From 207347190dd506384de1b6ee57487527e2e56309 Mon Sep 17 00:00:00 2001 From: Abdullah Atta Date: Wed, 20 Sep 2023 15:37:27 +0500 Subject: [PATCH] core: fix download/upload cancellation --- packages/core/src/api/sync/index.js | 22 +--- packages/core/src/collections/attachments.js | 82 +++++++------ packages/core/src/collections/content.js | 33 ++++-- packages/core/src/common.js | 12 +- .../content-types/__tests__/tiptap.test.js | 8 +- packages/core/src/content-types/tiptap.js | 18 +-- packages/core/src/database/fs.js | 109 +++++++++++++----- 7 files changed, 160 insertions(+), 124 deletions(-) diff --git a/packages/core/src/api/sync/index.js b/packages/core/src/api/sync/index.js index 81d902090..b8d2e6794 100644 --- a/packages/core/src/api/sync/index.js +++ b/packages/core/src/api/sync/index.js @@ -21,7 +21,6 @@ import { checkSyncStatus, EV, EVENTS, - sendAttachmentsProgressEvent, sendSyncProgressEvent, SYNC_CHECK_IDS } from "../../common"; @@ -339,23 +338,10 @@ class Sync { const attachments = this.db.attachments.pending; this.logger.info("Uploading attachments...", { total: attachments.length }); - for (var i = 0; i < attachments.length; ++i) { - const attachment = attachments[i]; - const { hash } = attachment.metadata; - sendAttachmentsProgressEvent("upload", hash, attachments.length, i); - - try { - const isUploaded = await this.db.fs.uploadFile(hash, hash); - if (!isUploaded) throw new Error("Failed to upload file."); - - await this.db.attachments.markAsUploaded(attachment.id); - } catch (e) { - logger.error(e, { attachment }); - const error = e.message; - await this.db.attachments.markAsFailed(attachment.id, error); - } - } - sendAttachmentsProgressEvent("upload", null, attachments.length); + await this.db.fs.queueUploads( + attachments.map((a) => ({ filename: a.metadata.hash })), + "sync-uploads" + ); } /** diff --git a/packages/core/src/collections/attachments.js b/packages/core/src/collections/attachments.js index 21d77da13..ee82418f0 100644 --- a/packages/core/src/collections/attachments.js +++ b/packages/core/src/collections/attachments.js @@ -20,7 +20,7 @@ along with this program. If not, see . import Collection from "./collection"; import { getId } from "../utils/id"; import { deleteItem, hasItem } from "../utils/array"; -import { EV, EVENTS, sendAttachmentsProgressEvent } from "../common"; +import { EV, EVENTS } from "../common"; import dataurl from "../utils/dataurl"; import dayjs from "dayjs"; import setManipulator from "../utils/set"; @@ -34,6 +34,36 @@ export default class Attachments extends Collection { constructor(db, name, cached) { super(db, name, cached); this.key = null; + + EV.subscribe( + EVENTS.fileDownloaded, + async ({ success, filename, groupId, eventData }) => { + if (!success || !eventData || !eventData.readOnDownload) return; + const attachment = this.attachment(filename); + if (!attachment) return; + + const src = await this.read(filename, getOutputType(attachment)); + if (!src) return; + + EV.publish(EVENTS.mediaAttachmentDownloaded, { + groupId, + hash: attachment.metadata.hash, + attachmentType: getAttachmentType(attachment), + src + }); + } + ); + + EV.subscribe(EVENTS.fileUploaded, async ({ success, error, filename }) => { + const attachment = this.attachment(filename); + if (!attachment) return; + if (success) await this.markAsUploaded(attachment.id); + else + await this.markAsFailed( + attachment.id, + error || "Failed to upload attachment." + ); + }); } merge(localAttachment, remoteAttachment) { @@ -317,47 +347,15 @@ export default class Attachments extends Collection { (!hashesToLoad || hasItem(hashesToLoad, attachment.metadata.hash)) ); - try { - for (let i = 0; i < attachments.length; i++) { - const attachment = attachments[i]; - await this._download(attachment, { - total: attachments.length, - current: i, - groupId: noteId - }); - } - } finally { - sendAttachmentsProgressEvent("download", noteId, attachments.length); - } - } - - async _download(attachment, { total, current, groupId }, notify = true) { - const { metadata, chunkSize } = attachment; - const filename = metadata.hash; - - if (notify) - sendAttachmentsProgressEvent("download", groupId, total, current); - - const isDownloaded = await this._db.fs.downloadFile( - groupId, - filename, - chunkSize, - metadata + await this._db.fs.queueDownloads( + attachments.map((a) => ({ + filename: a.metadata.hash, + metadata: a.metadata, + chunkSize: a.chunkSize + })), + noteId, + { readOnDownload: true } ); - if (!isDownloaded) return; - - const src = await this.read(metadata.hash, getOutputType(attachment)); - if (!src) return; - - if (notify) - EV.publish(EVENTS.mediaAttachmentDownloaded, { - groupId, - hash: metadata.hash, - attachmentType: getAttachmentType(attachment), - src - }); - - return src; } async cleanup() { @@ -451,7 +449,7 @@ export default class Attachments extends Collection { } } -function getOutputType(attachment) { +export function getOutputType(attachment) { if (attachment.metadata.type === "application/vnd.notesnook.web-clip") return "text"; else if (attachment.metadata.type.startsWith("image/")) return "base64"; diff --git a/packages/core/src/collections/content.js b/packages/core/src/collections/content.js index 699c216d6..5d5ee4bca 100644 --- a/packages/core/src/collections/content.js +++ b/packages/core/src/collections/content.js @@ -21,6 +21,7 @@ import Collection from "./collection"; import { getId } from "../utils/id"; import { getContentFromData } from "../content-types"; import { hasItem } from "../utils/array"; +import { getOutputType } from "./attachments"; export default class Content extends Collection { async add(content) { @@ -116,17 +117,27 @@ export default class Content extends Collection { async downloadMedia(groupId, contentItem, notify = true) { const content = getContentFromData(contentItem.type, contentItem.data); - contentItem.data = await content.insertMedia((hash, { total, current }) => { - const attachment = this._db.attachments.attachment(hash); - if (!attachment) return; - - const progressData = { - total, - current, - groupId - }; - - return this._db.attachments._download(attachment, progressData, notify); + contentItem.data = await content.insertMedia(async (hashes) => { + const attachments = hashes.map((h) => this._db.attachments.attachment(h)); + await this._db.fs.queueDownloads( + attachments.map((a) => ({ + filename: a.metadata.hash, + metadata: a.metadata, + chunkSize: a.chunkSize + })), + groupId, + notify ? { readOnDownload: false } : undefined + ); + const sources = {}; + for (const attachment of attachments) { + const src = await this._db.attachments.read( + attachment.metadata.hash, + getOutputType(attachment) + ); + if (!src) continue; + sources[attachment.metadata.hash] = src; + } + return sources; }); return contentItem; } diff --git a/packages/core/src/common.js b/packages/core/src/common.js index 8bf6016f5..ca6d2e661 100644 --- a/packages/core/src/common.js +++ b/packages/core/src/common.js @@ -36,15 +36,6 @@ export async function checkSyncStatus(type) { return results.some((r) => r.type === type && r.result === true); } -export function sendAttachmentsProgressEvent(type, groupId, total, current) { - EV.publish(EVENTS.attachmentsLoading, { - type, - groupId, - total, - current: current === undefined ? total : current - }); -} - export function sendSyncProgressEvent(EV, type, current) { EV.publish(EVENTS.syncProgress, { type, @@ -98,7 +89,8 @@ export const EVENTS = { noteRemoved: "note:removed", tokenRefreshed: "token:refreshed", userUnauthorized: "user:unauthorized", - attachmentsLoading: "attachments:loading", + fileDownloaded: "file:downloaded", + fileUploaded: "file:uploaded", attachmentDeleted: "attachment:deleted", mediaAttachmentDownloaded: "attachments:mediaDownloaded", vaultLocked: "vault:locked", diff --git a/packages/core/src/content-types/__tests__/tiptap.test.js b/packages/core/src/content-types/__tests__/tiptap.test.js index bac15b48c..177145c6f 100644 --- a/packages/core/src/content-types/__tests__/tiptap.test.js +++ b/packages/core/src/content-types/__tests__/tiptap.test.js @@ -41,7 +41,13 @@ test("img src is present after insert attachments", async () => { return { key: "hello", metadata: {} }; }); const tiptap2 = new Tiptap(result.data); - const result2 = await tiptap2.insertMedia(() => "i am a data"); + const result2 = await tiptap2.insertMedia((hashes) => { + const images = {}; + for (const hash of hashes) { + images[hash] = "i am a data"; + } + return images; + }); expect(result2).toContain(`src="i am a data"`); }); diff --git a/packages/core/src/content-types/tiptap.js b/packages/core/src/content-types/tiptap.js index f2fc937d3..361ef9353 100644 --- a/packages/core/src/content-types/tiptap.js +++ b/packages/core/src/content-types/tiptap.js @@ -100,7 +100,7 @@ export class Tiptap { return tokens.some((token) => lowercase.indexOf(token) > -1); } - async insertMedia(getData) { + async insertMedia(resolve) { let hashes = []; new HTMLParser({ ontag: (name, attr) => { @@ -108,21 +108,9 @@ export class Tiptap { if (name === "img" && hash) hashes.push(hash); } }).parse(this.data); + if (!hashes.length) return this.data; - const images = {}; - let hasImages = false; - for (let i = 0; i < hashes.length; ++i) { - const hash = hashes[i]; - const src = await getData(hash, { - total: hashes.length, - current: i - }); - if (!src) continue; - images[hash] = src; - hasImages = true; - } - - if (!hasImages) return this.data; + const images = await resolve(hashes); return new HTMLRewriter({ ontag: (name, attr) => { const hash = attr[ATTRIBUTES.hash]; diff --git a/packages/core/src/database/fs.js b/packages/core/src/database/fs.js index 024c67707..21a3f0e27 100644 --- a/packages/core/src/database/fs.js +++ b/packages/core/src/database/fs.js @@ -19,12 +19,79 @@ along with this program. If not, see . import hosts from "../utils/constants"; import TokenManager from "../api/token-manager"; +import { EV, EVENTS } from "../common"; export default class FileStorage { constructor(fs, storage) { this.fs = fs; this.tokenManager = new TokenManager(storage); - this._queue = []; + this.downloads = new Map(); + this.uploads = new Map(); + } + + async queueDownloads(files, groupId, eventData) { + const token = await this.tokenManager.getAccessToken(); + const total = files.length; + let current = 0; + this.downloads.set(groupId, files); + for (const file of files) { + const { filename, metadata, chunkSize } = file; + const url = `${hosts.API_HOST}/s3?name=${filename}`; + const { execute, cancel } = this.fs.downloadFile(filename, { + metadata, + url, + chunkSize, + headers: { Authorization: `Bearer ${token}` } + }); + file.cancel = cancel; + + const result = await execute().catch(() => false); + + if (eventData) + EV.publish(EVENTS.fileDownloaded, { + success: result, + total, + current: ++current, + groupId, + filename, + eventData + }); + } + this.downloads.delete(groupId); + } + + async queueUploads(files, groupId) { + const token = await this.tokenManager.getAccessToken(); + const total = files.length; + let current = 0; + this.uploads.set(groupId, files); + + for (const file of files) { + const { filename } = file; + const url = `${hosts.API_HOST}/s3?name=${filename}`; + const { execute, cancel } = this.fs.uploadFile(filename, { + url, + headers: { Authorization: `Bearer ${token}` } + }); + file.cancel = cancel; + + let error = null; + const result = await execute().catch((e) => { + console.error("Failed to upload attachment:", e); + error = e; + return false; + }); + + EV.publish(EVENTS.fileUploaded, { + error, + success: result, + total, + current: ++current, + groupId, + filename + }); + } + this.uploads.delete(groupId); } async downloadFile(groupId, filename, chunkSize, metadata) { @@ -36,37 +103,25 @@ export default class FileStorage { chunkSize, headers: { Authorization: `Bearer ${token}` } }); - this._queue.push({ groupId, filename, cancel, type: "download" }); + this.downloads.set(groupId, [{ cancel }]); const result = await execute(); - this._deleteOp(groupId, "download"); - return result; - } - - async uploadFile(groupId, filename) { - const token = await this.tokenManager.getAccessToken(); - const url = `${hosts.API_HOST}/s3?name=${filename}`; - const { execute, cancel } = this.fs.uploadFile(filename, { - url, - headers: { Authorization: `Bearer ${token}` } - }); - this._queue.push({ groupId, filename, cancel, type: "upload" }); - const result = await execute(); - this._deleteOp(groupId, "upload"); + this.downloads.delete(groupId); return result; } async cancel(groupId, type) { - const [op] = this._deleteOp(groupId, type); - if (!op) return; - await op.cancel("Operation canceled."); - } - - _deleteOp(groupId, type) { - const opIndex = this._queue.findIndex( - (item) => item.groupId === groupId && (!type || item.type === type) - ); - if (opIndex < 0) return []; - return this._queue.splice(opIndex, 1); + const queue = + type === "downloads" + ? this.downloads.get(groupId) + : this.uploads.get(groupId); + if (!queue) return; + for (let i = 0; i < queue.length; ++i) { + const file = queue[i]; + if (file.cancel) await file.cancel("Operation canceled."); + queue.splice(i, 1); + } + if (type === "download") this.downloads.delete(groupId); + else if (type === "upload") this.uploads.delete(groupId); } readEncrypted(filename, encryptionKey, cipherData) {