diff --git a/packages/core/src/api/index.ts b/packages/core/src/api/index.ts index 20815e6bc..5049d3801 100644 --- a/packages/core/src/api/index.ts +++ b/packages/core/src/api/index.ts @@ -95,6 +95,7 @@ class Database { isInitialized = false; eventManager = new EventManager(); sseMutex = new Mutex(); + _fs?: FileStorage; storage: StorageAccessor = () => { if (!this.options?.storage) @@ -109,7 +110,10 @@ class Database { throw new Error( "Database not initialized. Did you forget to call db.setup()?" ); - return new FileStorage(this.options.fs, this.tokenManager); + return ( + this._fs || + (this._fs = new FileStorage(this.options.fs, this.tokenManager)) + ); }; crypto: CryptoAccessor = () => { diff --git a/packages/core/src/database/fs.ts b/packages/core/src/database/fs.ts index 9d65d1a31..51fa1aba5 100644 --- a/packages/core/src/database/fs.ts +++ b/packages/core/src/database/fs.ts @@ -30,16 +30,22 @@ import { logger } from "../logger"; export type FileStorageAccessor = () => FileStorage; export type DownloadableFile = { filename: string; - // metadata: AttachmentMetadata; chunkSize: number; }; export type QueueItem = DownloadableFile & { cancel?: (reason?: string) => Promise; + operation?: Promise; }; export class FileStorage { - downloads = new Map(); - uploads = new Map(); + id = Date.now(); + downloads = new Map(); + uploads = new Map(); + groups = { + downloads: new Map>(), + uploads: new Map>() + }; + constructor( private readonly fs: IFileStorage, private readonly tokenManager: TokenManager @@ -50,12 +56,26 @@ export class FileStorage { groupId: string, eventData?: Record ) { + let current = 0; const token = await this.tokenManager.getAccessToken(); const total = files.length; - let current = 0; - this.downloads.set(groupId, files); + const group = this.groups.downloads.get(groupId) || new Set(); + files.forEach((f) => group.add(f.filename)); + this.groups.downloads.set(groupId, group); for (const file of files as QueueItem[]) { + if (!group.has(file.filename)) continue; + + const download = this.downloads.get(file.filename); + if (download && download.operation) { + logger.debug("[queueDownloads] duplicate download", { + filename: file.filename, + groupId + }); + await download.operation; + continue; + } + const { filename, chunkSize } = file; if (await this.exists(filename)) { current++; @@ -68,14 +88,6 @@ export class FileStorage { continue; } - const url = `${hosts.API_HOST}/s3?name=${filename}`; - const { execute, cancel } = this.fs.downloadFile(filename, { - url, - chunkSize, - headers: { Authorization: `Bearer ${token}` } - }); - file.cancel = cancel; - EV.publish(EVENTS.fileDownload, { total, current, @@ -83,7 +95,22 @@ export class FileStorage { filename }); - const result = await execute().catch(() => false); + const url = `${hosts.API_HOST}/s3?name=${filename}`; + const { execute, cancel } = this.fs.downloadFile(filename, { + url, + chunkSize, + headers: { Authorization: `Bearer ${token}` } + }); + file.cancel = cancel; + file.operation = execute() + .catch(() => false) + .finally(() => { + this.downloads.delete(filename); + group.delete(filename); + }); + + this.downloads.set(filename, file); + const result = await file.operation; if (eventData) EV.publish(EVENTS.fileDownloaded, { success: result, @@ -94,17 +121,31 @@ export class FileStorage { eventData }); } - this.downloads.delete(groupId); } async queueUploads(files: DownloadableFile[], groupId: string) { + let current = 0; const token = await this.tokenManager.getAccessToken(); const total = files.length; - let current = 0; - this.uploads.set(groupId, files); + const group = this.groups.uploads.get(groupId) || new Set(); + files.forEach((f) => group.add(f.filename)); + this.groups.uploads.set(groupId, group); for (const file of files as QueueItem[]) { + if (!group.has(file.filename)) continue; + + const upload = this.uploads.get(file.filename); + if (upload && upload.operation) { + logger.debug("[queueUploads] duplicate upload", { + filename: file.filename, + groupId + }); + await file.operation; + continue; + } + const { filename, chunkSize } = file; + let error = null; const url = `${hosts.API_HOST}/s3?name=${filename}`; const { execute, cancel } = this.fs.uploadFile(filename, { chunkSize, @@ -112,6 +153,16 @@ export class FileStorage { headers: { Authorization: `Bearer ${token}` } }); file.cancel = cancel; + file.operation = execute() + .catch((e) => { + logger.error(e, "failed to upload attachment", { hash: filename }); + error = e; + return false; + }) + .finally(() => { + this.uploads.delete(filename); + group.delete(filename); + }); EV.publish(EVENTS.fileUpload, { total, @@ -120,13 +171,8 @@ export class FileStorage { filename }); - let error = null; - const result = await execute().catch((e) => { - logger.error(e, "failed to upload attachment", { hash: filename }); - error = e; - return false; - }); - + this.uploads.set(filename, file); + const result = await file.operation; EV.publish(EVENTS.fileUploaded, { error, success: result, @@ -136,44 +182,67 @@ export class FileStorage { filename }); } - this.uploads.delete(groupId); } async downloadFile(groupId: string, filename: string, chunkSize: number) { + if (await this.exists(filename)) return true; + + const download = this.downloads.get(filename); + if (download && download.operation) { + logger.debug("[downloadFile] duplicate download", { filename, groupId }); + return await download.operation; + } + logger.debug("[downloadFile] downloading", { filename, groupId }); const url = `${hosts.API_HOST}/s3?name=${filename}`; + const file: QueueItem = { filename, chunkSize }; const token = await this.tokenManager.getAccessToken(); + const group = this.groups.downloads.get(groupId) || new Set(); const { execute, cancel } = this.fs.downloadFile(filename, { url, chunkSize, headers: { Authorization: `Bearer ${token}` } }); - this.downloads.set(groupId, [{ cancel, filename, chunkSize }]); - const result = await execute(); - this.downloads.delete(groupId); - return result; + file.cancel = cancel; + file.operation = execute().finally(() => { + this.downloads.delete(filename); + group.delete(filename); + }); + + this.downloads.set(filename, file); + this.groups.downloads.set(groupId, group.add(filename)); + return await file.operation; } async cancel(groupId: string) { const queues = [ - { type: "download", files: this.downloads.get(groupId) }, - { type: "upload", files: this.uploads.get(groupId) } - ].filter((a) => !!a.files); + { + type: "download", + ids: this.groups.downloads.get(groupId), + files: this.downloads + }, + { + type: "upload", + ids: this.groups.uploads.get(groupId), + files: this.uploads + } + ].filter((a) => !!a.ids); for (const queue of queues) { - if (!queue.files) continue; - for (let i = 0; i < queue.files.length; ++i) { - const file = queue.files[i]; - if (file.cancel) await file.cancel("Operation canceled."); - queue.files.splice(i, 1); + if (!queue.ids) continue; + + for (const filename of queue.ids) { + const file = queue.files.get(filename); + if (file?.cancel) await file.cancel("Operation canceled."); + queue.ids.delete(filename); } if (queue.type === "download") { - this.downloads.delete(groupId); + this.groups.downloads.delete(groupId); EV.publish(EVENTS.downloadCanceled, { groupId, canceled: true }); } else if (queue.type === "upload") { - this.uploads.delete(groupId); + this.groups.uploads.delete(groupId); EV.publish(EVENTS.uploadCanceled, { groupId, canceled: true }); } }