core: fix File already exists error on duplicate downloads

Abdullah Atta
2024-05-07 12:57:45 +05:00
parent 9d5bec3bfb
commit 79f20d100c
2 changed files with 113 additions and 40 deletions

View File

@@ -95,6 +95,7 @@ class Database {
   isInitialized = false;
   eventManager = new EventManager();
   sseMutex = new Mutex();
+  _fs?: FileStorage;
   storage: StorageAccessor = () => {
     if (!this.options?.storage)
@@ -109,7 +110,10 @@ class Database {
       throw new Error(
         "Database not initialized. Did you forget to call db.setup()?"
      );
-    return new FileStorage(this.options.fs, this.tokenManager);
+    return (
+      this._fs ||
+      (this._fs = new FileStorage(this.options.fs, this.tokenManager))
+    );
   };
   crypto: CryptoAccessor = () => {

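The hunk above memoizes the FileStorage instance: fs() now returns the same object on every call instead of constructing a fresh one. This matters for the fix because the duplicate-transfer bookkeeping added in the second file lives in per-instance maps, which only deduplicate anything if every caller shares one FileStorage. A minimal sketch of the lazy-accessor pattern, using simplified stand-in classes rather than the real Database and FileStorage:

// Sketch only: "downloads" stands in for the per-instance state that must be shared.
class FileStorage {
  downloads = new Map<string, Promise<boolean>>();
}

class Database {
  private _fs?: FileStorage;

  // Lazily construct once, then always return the cached instance.
  fs = (): FileStorage => this._fs || (this._fs = new FileStorage());
}

const db = new Database();
console.assert(db.fs() === db.fs(), "accessor must return a stable instance");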
View File

@@ -30,16 +30,22 @@ import { logger } from "../logger";
 export type FileStorageAccessor = () => FileStorage;
 export type DownloadableFile = {
   filename: string;
   // metadata: AttachmentMetadata;
   chunkSize: number;
 };
 export type QueueItem = DownloadableFile & {
   cancel?: (reason?: string) => Promise<void>;
+  operation?: Promise<boolean>;
 };
 export class FileStorage {
-  downloads = new Map<string, QueueItem[]>();
-  uploads = new Map<string, QueueItem[]>();
+  id = Date.now();
+  downloads = new Map<string, QueueItem>();
+  uploads = new Map<string, QueueItem>();
+  groups = {
+    downloads: new Map<string, Set<string>>(),
+    uploads: new Map<string, Set<string>>()
+  };
   constructor(
     private readonly fs: IFileStorage,
     private readonly tokenManager: TokenManager
@@ -50,12 +56,26 @@ export class FileStorage {
     groupId: string,
     eventData?: Record<string, unknown>
   ) {
+    let current = 0;
     const token = await this.tokenManager.getAccessToken();
     const total = files.length;
-    let current = 0;
-    this.downloads.set(groupId, files);
+    const group = this.groups.downloads.get(groupId) || new Set();
+    files.forEach((f) => group.add(f.filename));
+    this.groups.downloads.set(groupId, group);
     for (const file of files as QueueItem[]) {
+      if (!group.has(file.filename)) continue;
+      const download = this.downloads.get(file.filename);
+      if (download && download.operation) {
+        logger.debug("[queueDownloads] duplicate download", {
+          filename: file.filename,
+          groupId
+        });
+        await download.operation;
+        continue;
+      }
       const { filename, chunkSize } = file;
       if (await this.exists(filename)) {
         current++;
@@ -68,14 +88,6 @@ export class FileStorage {
         continue;
       }
-      const url = `${hosts.API_HOST}/s3?name=${filename}`;
-      const { execute, cancel } = this.fs.downloadFile(filename, {
-        url,
-        chunkSize,
-        headers: { Authorization: `Bearer ${token}` }
-      });
-      file.cancel = cancel;
       EV.publish(EVENTS.fileDownload, {
         total,
         current,
@@ -83,7 +95,22 @@ export class FileStorage {
         filename
       });
-      const result = await execute().catch(() => false);
+      const url = `${hosts.API_HOST}/s3?name=${filename}`;
+      const { execute, cancel } = this.fs.downloadFile(filename, {
+        url,
+        chunkSize,
+        headers: { Authorization: `Bearer ${token}` }
+      });
+      file.cancel = cancel;
+      file.operation = execute()
+        .catch(() => false)
+        .finally(() => {
+          this.downloads.delete(filename);
+          group.delete(filename);
+        });
+      this.downloads.set(filename, file);
+      const result = await file.operation;
       if (eventData)
         EV.publish(EVENTS.fileDownloaded, {
           success: result,
@@ -94,17 +121,31 @@ export class FileStorage {
           eventData
         });
     }
-    this.downloads.delete(groupId);
   }
   async queueUploads(files: DownloadableFile[], groupId: string) {
+    let current = 0;
     const token = await this.tokenManager.getAccessToken();
     const total = files.length;
-    let current = 0;
-    this.uploads.set(groupId, files);
+    const group = this.groups.uploads.get(groupId) || new Set();
+    files.forEach((f) => group.add(f.filename));
+    this.groups.uploads.set(groupId, group);
     for (const file of files as QueueItem[]) {
+      if (!group.has(file.filename)) continue;
+      const upload = this.uploads.get(file.filename);
+      if (upload && upload.operation) {
+        logger.debug("[queueUploads] duplicate upload", {
+          filename: file.filename,
+          groupId
+        });
+        await file.operation;
+        continue;
+      }
       const { filename, chunkSize } = file;
+      let error = null;
       const url = `${hosts.API_HOST}/s3?name=${filename}`;
       const { execute, cancel } = this.fs.uploadFile(filename, {
         chunkSize,
@@ -112,6 +153,16 @@ export class FileStorage {
         headers: { Authorization: `Bearer ${token}` }
       });
       file.cancel = cancel;
+      file.operation = execute()
+        .catch((e) => {
+          logger.error(e, "failed to upload attachment", { hash: filename });
+          error = e;
+          return false;
+        })
+        .finally(() => {
+          this.uploads.delete(filename);
+          group.delete(filename);
+        });
       EV.publish(EVENTS.fileUpload, {
         total,
@@ -120,13 +171,8 @@ export class FileStorage {
         filename
       });
-      let error = null;
-      const result = await execute().catch((e) => {
-        logger.error(e, "failed to upload attachment", { hash: filename });
-        error = e;
-        return false;
-      });
+      this.uploads.set(filename, file);
+      const result = await file.operation;
       EV.publish(EVENTS.fileUploaded, {
         error,
         success: result,
@@ -136,44 +182,67 @@ export class FileStorage {
         filename
       });
     }
-    this.uploads.delete(groupId);
   }
   async downloadFile(groupId: string, filename: string, chunkSize: number) {
+    if (await this.exists(filename)) return true;
+    const download = this.downloads.get(filename);
+    if (download && download.operation) {
+      logger.debug("[downloadFile] duplicate download", { filename, groupId });
+      return await download.operation;
+    }
     logger.debug("[downloadFile] downloading", { filename, groupId });
     const url = `${hosts.API_HOST}/s3?name=${filename}`;
+    const file: QueueItem = { filename, chunkSize };
     const token = await this.tokenManager.getAccessToken();
+    const group = this.groups.downloads.get(groupId) || new Set();
     const { execute, cancel } = this.fs.downloadFile(filename, {
       url,
       chunkSize,
       headers: { Authorization: `Bearer ${token}` }
     });
-    this.downloads.set(groupId, [{ cancel, filename, chunkSize }]);
-    const result = await execute();
-    this.downloads.delete(groupId);
-    return result;
+    file.cancel = cancel;
+    file.operation = execute().finally(() => {
+      this.downloads.delete(filename);
+      group.delete(filename);
+    });
+    this.downloads.set(filename, file);
+    this.groups.downloads.set(groupId, group.add(filename));
+    return await file.operation;
   }
   async cancel(groupId: string) {
     const queues = [
-      { type: "download", files: this.downloads.get(groupId) },
-      { type: "upload", files: this.uploads.get(groupId) }
-    ].filter((a) => !!a.files);
+      {
+        type: "download",
+        ids: this.groups.downloads.get(groupId),
+        files: this.downloads
+      },
+      {
+        type: "upload",
+        ids: this.groups.uploads.get(groupId),
+        files: this.uploads
+      }
+    ].filter((a) => !!a.ids);
     for (const queue of queues) {
-      if (!queue.files) continue;
-      for (let i = 0; i < queue.files.length; ++i) {
-        const file = queue.files[i];
-        if (file.cancel) await file.cancel("Operation canceled.");
-        queue.files.splice(i, 1);
+      if (!queue.ids) continue;
+      for (const filename of queue.ids) {
+        const file = queue.files.get(filename);
+        if (file?.cancel) await file.cancel("Operation canceled.");
+        queue.ids.delete(filename);
       }
       if (queue.type === "download") {
-        this.downloads.delete(groupId);
+        this.groups.downloads.delete(groupId);
         EV.publish(EVENTS.downloadCanceled, { groupId, canceled: true });
       } else if (queue.type === "upload") {
-        this.uploads.delete(groupId);
+        this.groups.uploads.delete(groupId);
         EV.publish(EVENTS.uploadCanceled, { groupId, canceled: true });
       }
     }
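Taken together, these FileStorage changes key in-flight transfers by filename instead of by group: the first request stores its pending promise in the downloads/uploads map, and any later request for the same file awaits that promise rather than starting a second transfer, which is what previously failed with "File already exists". A simplified sketch of this promise-deduplication pattern, where fetchFile is a hypothetical stand-in for the real chunked download:

type QueueItem = { operation?: Promise<boolean> };

class Downloader {
  private downloads = new Map<string, QueueItem>();

  download(filename: string, fetchFile: () => Promise<boolean>): Promise<boolean> {
    // A download for this file is already in flight: reuse its promise.
    const existing = this.downloads.get(filename)?.operation;
    if (existing) return existing;

    const operation = fetchFile()
      .catch(() => false)
      // Always clear the entry so the file can be downloaded again later.
      .finally(() => this.downloads.delete(filename));
    this.downloads.set(filename, { operation });
    return operation;
  }
}

With this shape, two concurrent calls for the same filename trigger a single fetch and resolve with the same result, while different filenames still download in parallel.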