core: remove group from queueDownloads when in progress

This commit is contained in:
Ammar Ahmed
2026-02-26 10:34:04 +05:00
parent 04d422b411
commit 4cea69e71d

View File

@@ -58,165 +58,173 @@ export class FileStorage {
groupId: string,
eventData?: Record<string, unknown>
) {
const newFiles = await this.fs.bulkExists(files.map((f) => f.filename));
files = files.filter((f) => newFiles.includes(f.filename));
if (files.length <= 0) return;
try {
const newFiles = await this.fs.bulkExists(files.map((f) => f.filename));
files = files.filter((f) => newFiles.includes(f.filename));
if (files.length <= 0) return;
let current = 0;
const token = await this.tokenManager.getAccessToken();
const total = files.length;
let current = 0;
const token = await this.tokenManager.getAccessToken();
const total = files.length;
if (this.groups.downloads.has(groupId)) {
logger.debug("[queueDownloads] group already exists", {
groupId
});
return this.groups.downloads.get(groupId);
}
const group = new Set<string>();
files.forEach((f) => group.add(f.filename));
this.groups.downloads.set(groupId, group);
for (const file of files as QueueItem[]) {
current++;
if (!group.has(file.filename)) {
this.eventManager.publish(EVENTS.fileDownloaded, {
success: false,
groupId,
filename: file.filename,
eventData,
current,
total
});
continue;
}
const download = this.downloads.get(file.filename);
if (download && download.operation) {
logger.debug("[queueDownloads] duplicate download", {
filename: file.filename,
if (this.groups.downloads.has(groupId)) {
logger.debug("[queueDownloads] group already exists", {
groupId
});
await download.operation;
continue;
return this.groups.downloads.get(groupId);
}
const { filename, chunkSize } = file;
if (await this.exists(filename)) {
this.eventManager.publish(EVENTS.fileDownloaded, {
success: true,
groupId,
filename,
eventData,
current,
total
});
continue;
}
const group = new Set<string>();
this.eventManager.publish(EVENTS.fileDownload, {
total,
current,
groupId,
filename
});
files.forEach((f) => group.add(f.filename));
this.groups.downloads.set(groupId, group);
const url = `${hosts.API_HOST}/s3?name=${filename}`;
const { execute, cancel } = this.fs.downloadFile(filename, {
url,
chunkSize,
headers: { Authorization: `Bearer ${token}` }
});
file.cancel = cancel;
file.operation = execute()
.catch(() => false)
.finally(() => {
this.downloads.delete(filename);
group.delete(filename);
});
for (const file of files as QueueItem[]) {
current++;
if (!group.has(file.filename)) {
this.eventManager.publish(EVENTS.fileDownloaded, {
success: false,
groupId,
filename: file.filename,
eventData,
current,
total
});
continue;
}
this.downloads.set(filename, file);
const result = await file.operation;
if (eventData)
this.eventManager.publish(EVENTS.fileDownloaded, {
success: result,
const download = this.downloads.get(file.filename);
if (download && download.operation) {
logger.debug("[queueDownloads] duplicate download", {
filename: file.filename,
groupId
});
await download.operation;
continue;
}
const { filename, chunkSize } = file;
if (await this.exists(filename)) {
this.eventManager.publish(EVENTS.fileDownloaded, {
success: true,
groupId,
filename,
eventData,
current,
total
});
continue;
}
this.eventManager.publish(EVENTS.fileDownload, {
total,
current,
groupId,
filename,
eventData
filename
});
const url = `${hosts.API_HOST}/s3?name=${filename}`;
const { execute, cancel } = this.fs.downloadFile(filename, {
url,
chunkSize,
headers: { Authorization: `Bearer ${token}` }
});
file.cancel = cancel;
file.operation = execute()
.catch(() => false)
.finally(() => {
this.downloads.delete(filename);
group.delete(filename);
});
this.downloads.set(filename, file);
const result = await file.operation;
if (eventData)
this.eventManager.publish(EVENTS.fileDownloaded, {
success: result,
total,
current,
groupId,
filename,
eventData
});
}
} finally {
this.groups.downloads.delete(groupId);
}
}
async queueUploads(files: DownloadableFile[], groupId: string) {
let current = 0;
const token = await this.tokenManager.getAccessToken();
const total = files.length;
try {
let current = 0;
const token = await this.tokenManager.getAccessToken();
const total = files.length;
if (this.groups.uploads.has(groupId)) {
logger.debug("[queueUploads] group already exists", {
groupId
});
return this.groups.uploads.get(groupId);
}
const group = new Set<string>();
files.forEach((f) => group.add(f.filename));
this.groups.uploads.set(groupId, group);
for (const file of files as QueueItem[]) {
if (!group.has(file.filename)) continue;
const upload = this.uploads.get(file.filename);
if (upload && upload.operation) {
logger.debug("[queueUploads] duplicate upload", {
filename: file.filename,
if (this.groups.uploads.has(groupId)) {
logger.debug("[queueUploads] group already exists", {
groupId
});
await file.operation;
continue;
return this.groups.uploads.get(groupId);
}
const { filename, chunkSize } = file;
let error = null;
const url = `${hosts.API_HOST}/s3?name=${filename}`;
const { execute, cancel } = this.fs.uploadFile(filename, {
chunkSize,
url,
headers: { Authorization: `Bearer ${token}` }
});
file.cancel = cancel;
file.operation = execute()
.catch((e) => {
logger.error(e, "failed to upload attachment", { hash: filename });
error = e;
return false;
})
.finally(() => {
this.uploads.delete(filename);
group.delete(filename);
const group = new Set<string>();
files.forEach((f) => group.add(f.filename));
this.groups.uploads.set(groupId, group);
for (const file of files as QueueItem[]) {
if (!group.has(file.filename)) continue;
const upload = this.uploads.get(file.filename);
if (upload && upload.operation) {
logger.debug("[queueUploads] duplicate upload", {
filename: file.filename,
groupId
});
await file.operation;
continue;
}
const { filename, chunkSize } = file;
let error = null;
const url = `${hosts.API_HOST}/s3?name=${filename}`;
const { execute, cancel } = this.fs.uploadFile(filename, {
chunkSize,
url,
headers: { Authorization: `Bearer ${token}` }
});
file.cancel = cancel;
file.operation = execute()
.catch((e) => {
logger.error(e, "failed to upload attachment", { hash: filename });
error = e;
return false;
})
.finally(() => {
this.uploads.delete(filename);
group.delete(filename);
});
this.eventManager.publish(EVENTS.fileUpload, {
total,
current,
groupId,
filename
});
this.eventManager.publish(EVENTS.fileUpload, {
total,
current,
groupId,
filename
});
this.uploads.set(filename, file);
const result = await file.operation;
this.eventManager.publish(EVENTS.fileUploaded, {
error,
success: result,
total,
current: ++current,
groupId,
filename
});
this.uploads.set(filename, file);
const result = await file.operation;
this.eventManager.publish(EVENTS.fileUploaded, {
error,
success: result,
total,
current: ++current,
groupId,
filename
});
}
} finally {
this.groups.uploads.delete(groupId);
}
}