core: fix download/upload cancellation

This commit is contained in:
Abdullah Atta
2023-09-20 15:37:27 +05:00
committed by Ammar Ahmed
parent 0cf76e1f11
commit 207347190d
7 changed files with 160 additions and 124 deletions

View File

@@ -21,7 +21,6 @@ import {
checkSyncStatus,
EV,
EVENTS,
sendAttachmentsProgressEvent,
sendSyncProgressEvent,
SYNC_CHECK_IDS
} from "../../common";
@@ -339,23 +338,10 @@ class Sync {
const attachments = this.db.attachments.pending;
this.logger.info("Uploading attachments...", { total: attachments.length });
for (var i = 0; i < attachments.length; ++i) {
const attachment = attachments[i];
const { hash } = attachment.metadata;
sendAttachmentsProgressEvent("upload", hash, attachments.length, i);
try {
const isUploaded = await this.db.fs.uploadFile(hash, hash);
if (!isUploaded) throw new Error("Failed to upload file.");
await this.db.attachments.markAsUploaded(attachment.id);
} catch (e) {
logger.error(e, { attachment });
const error = e.message;
await this.db.attachments.markAsFailed(attachment.id, error);
}
}
sendAttachmentsProgressEvent("upload", null, attachments.length);
await this.db.fs.queueUploads(
attachments.map((a) => ({ filename: a.metadata.hash })),
"sync-uploads"
);
}
/**

View File

@@ -20,7 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
import Collection from "./collection";
import { getId } from "../utils/id";
import { deleteItem, hasItem } from "../utils/array";
import { EV, EVENTS, sendAttachmentsProgressEvent } from "../common";
import { EV, EVENTS } from "../common";
import dataurl from "../utils/dataurl";
import dayjs from "dayjs";
import setManipulator from "../utils/set";
@@ -34,6 +34,36 @@ export default class Attachments extends Collection {
constructor(db, name, cached) {
super(db, name, cached);
this.key = null;
EV.subscribe(
EVENTS.fileDownloaded,
async ({ success, filename, groupId, eventData }) => {
if (!success || !eventData || !eventData.readOnDownload) return;
const attachment = this.attachment(filename);
if (!attachment) return;
const src = await this.read(filename, getOutputType(attachment));
if (!src) return;
EV.publish(EVENTS.mediaAttachmentDownloaded, {
groupId,
hash: attachment.metadata.hash,
attachmentType: getAttachmentType(attachment),
src
});
}
);
EV.subscribe(EVENTS.fileUploaded, async ({ success, error, filename }) => {
const attachment = this.attachment(filename);
if (!attachment) return;
if (success) await this.markAsUploaded(attachment.id);
else
await this.markAsFailed(
attachment.id,
error || "Failed to upload attachment."
);
});
}
merge(localAttachment, remoteAttachment) {
@@ -317,47 +347,15 @@ export default class Attachments extends Collection {
(!hashesToLoad || hasItem(hashesToLoad, attachment.metadata.hash))
);
try {
for (let i = 0; i < attachments.length; i++) {
const attachment = attachments[i];
await this._download(attachment, {
total: attachments.length,
current: i,
groupId: noteId
});
}
} finally {
sendAttachmentsProgressEvent("download", noteId, attachments.length);
}
}
async _download(attachment, { total, current, groupId }, notify = true) {
const { metadata, chunkSize } = attachment;
const filename = metadata.hash;
if (notify)
sendAttachmentsProgressEvent("download", groupId, total, current);
const isDownloaded = await this._db.fs.downloadFile(
groupId,
filename,
chunkSize,
metadata
await this._db.fs.queueDownloads(
attachments.map((a) => ({
filename: a.metadata.hash,
metadata: a.metadata,
chunkSize: a.chunkSize
})),
noteId,
{ readOnDownload: true }
);
if (!isDownloaded) return;
const src = await this.read(metadata.hash, getOutputType(attachment));
if (!src) return;
if (notify)
EV.publish(EVENTS.mediaAttachmentDownloaded, {
groupId,
hash: metadata.hash,
attachmentType: getAttachmentType(attachment),
src
});
return src;
}
async cleanup() {
@@ -451,7 +449,7 @@ export default class Attachments extends Collection {
}
}
function getOutputType(attachment) {
export function getOutputType(attachment) {
if (attachment.metadata.type === "application/vnd.notesnook.web-clip")
return "text";
else if (attachment.metadata.type.startsWith("image/")) return "base64";

View File

@@ -21,6 +21,7 @@ import Collection from "./collection";
import { getId } from "../utils/id";
import { getContentFromData } from "../content-types";
import { hasItem } from "../utils/array";
import { getOutputType } from "./attachments";
export default class Content extends Collection {
async add(content) {
@@ -116,17 +117,27 @@ export default class Content extends Collection {
async downloadMedia(groupId, contentItem, notify = true) {
const content = getContentFromData(contentItem.type, contentItem.data);
contentItem.data = await content.insertMedia((hash, { total, current }) => {
const attachment = this._db.attachments.attachment(hash);
if (!attachment) return;
const progressData = {
total,
current,
groupId
};
return this._db.attachments._download(attachment, progressData, notify);
contentItem.data = await content.insertMedia(async (hashes) => {
const attachments = hashes.map((h) => this._db.attachments.attachment(h));
await this._db.fs.queueDownloads(
attachments.map((a) => ({
filename: a.metadata.hash,
metadata: a.metadata,
chunkSize: a.chunkSize
})),
groupId,
notify ? { readOnDownload: false } : undefined
);
const sources = {};
for (const attachment of attachments) {
const src = await this._db.attachments.read(
attachment.metadata.hash,
getOutputType(attachment)
);
if (!src) continue;
sources[attachment.metadata.hash] = src;
}
return sources;
});
return contentItem;
}

View File

@@ -36,15 +36,6 @@ export async function checkSyncStatus(type) {
return results.some((r) => r.type === type && r.result === true);
}
export function sendAttachmentsProgressEvent(type, groupId, total, current) {
EV.publish(EVENTS.attachmentsLoading, {
type,
groupId,
total,
current: current === undefined ? total : current
});
}
export function sendSyncProgressEvent(EV, type, current) {
EV.publish(EVENTS.syncProgress, {
type,
@@ -98,7 +89,8 @@ export const EVENTS = {
noteRemoved: "note:removed",
tokenRefreshed: "token:refreshed",
userUnauthorized: "user:unauthorized",
attachmentsLoading: "attachments:loading",
fileDownloaded: "file:downloaded",
fileUploaded: "file:uploaded",
attachmentDeleted: "attachment:deleted",
mediaAttachmentDownloaded: "attachments:mediaDownloaded",
vaultLocked: "vault:locked",

View File

@@ -41,7 +41,13 @@ test("img src is present after insert attachments", async () => {
return { key: "hello", metadata: {} };
});
const tiptap2 = new Tiptap(result.data);
const result2 = await tiptap2.insertMedia(() => "i am a data");
const result2 = await tiptap2.insertMedia((hashes) => {
const images = {};
for (const hash of hashes) {
images[hash] = "i am a data";
}
return images;
});
expect(result2).toContain(`src="i am a data"`);
});

View File

@@ -100,7 +100,7 @@ export class Tiptap {
return tokens.some((token) => lowercase.indexOf(token) > -1);
}
async insertMedia(getData) {
async insertMedia(resolve) {
let hashes = [];
new HTMLParser({
ontag: (name, attr) => {
@@ -108,21 +108,9 @@ export class Tiptap {
if (name === "img" && hash) hashes.push(hash);
}
}).parse(this.data);
if (!hashes.length) return this.data;
const images = {};
let hasImages = false;
for (let i = 0; i < hashes.length; ++i) {
const hash = hashes[i];
const src = await getData(hash, {
total: hashes.length,
current: i
});
if (!src) continue;
images[hash] = src;
hasImages = true;
}
if (!hasImages) return this.data;
const images = await resolve(hashes);
return new HTMLRewriter({
ontag: (name, attr) => {
const hash = attr[ATTRIBUTES.hash];

View File

@@ -19,12 +19,79 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
import hosts from "../utils/constants";
import TokenManager from "../api/token-manager";
import { EV, EVENTS } from "../common";
export default class FileStorage {
constructor(fs, storage) {
this.fs = fs;
this.tokenManager = new TokenManager(storage);
this._queue = [];
this.downloads = new Map();
this.uploads = new Map();
}
async queueDownloads(files, groupId, eventData) {
const token = await this.tokenManager.getAccessToken();
const total = files.length;
let current = 0;
this.downloads.set(groupId, files);
for (const file of files) {
const { filename, metadata, chunkSize } = file;
const url = `${hosts.API_HOST}/s3?name=${filename}`;
const { execute, cancel } = this.fs.downloadFile(filename, {
metadata,
url,
chunkSize,
headers: { Authorization: `Bearer ${token}` }
});
file.cancel = cancel;
const result = await execute().catch(() => false);
if (eventData)
EV.publish(EVENTS.fileDownloaded, {
success: result,
total,
current: ++current,
groupId,
filename,
eventData
});
}
this.downloads.delete(groupId);
}
async queueUploads(files, groupId) {
const token = await this.tokenManager.getAccessToken();
const total = files.length;
let current = 0;
this.uploads.set(groupId, files);
for (const file of files) {
const { filename } = file;
const url = `${hosts.API_HOST}/s3?name=${filename}`;
const { execute, cancel } = this.fs.uploadFile(filename, {
url,
headers: { Authorization: `Bearer ${token}` }
});
file.cancel = cancel;
let error = null;
const result = await execute().catch((e) => {
console.error("Failed to upload attachment:", e);
error = e;
return false;
});
EV.publish(EVENTS.fileUploaded, {
error,
success: result,
total,
current: ++current,
groupId,
filename
});
}
this.uploads.delete(groupId);
}
async downloadFile(groupId, filename, chunkSize, metadata) {
@@ -36,37 +103,25 @@ export default class FileStorage {
chunkSize,
headers: { Authorization: `Bearer ${token}` }
});
this._queue.push({ groupId, filename, cancel, type: "download" });
this.downloads.set(groupId, [{ cancel }]);
const result = await execute();
this._deleteOp(groupId, "download");
return result;
}
async uploadFile(groupId, filename) {
const token = await this.tokenManager.getAccessToken();
const url = `${hosts.API_HOST}/s3?name=${filename}`;
const { execute, cancel } = this.fs.uploadFile(filename, {
url,
headers: { Authorization: `Bearer ${token}` }
});
this._queue.push({ groupId, filename, cancel, type: "upload" });
const result = await execute();
this._deleteOp(groupId, "upload");
this.downloads.delete(groupId);
return result;
}
async cancel(groupId, type) {
const [op] = this._deleteOp(groupId, type);
if (!op) return;
await op.cancel("Operation canceled.");
}
_deleteOp(groupId, type) {
const opIndex = this._queue.findIndex(
(item) => item.groupId === groupId && (!type || item.type === type)
);
if (opIndex < 0) return [];
return this._queue.splice(opIndex, 1);
const queue =
type === "downloads"
? this.downloads.get(groupId)
: this.uploads.get(groupId);
if (!queue) return;
for (let i = 0; i < queue.length; ++i) {
const file = queue[i];
if (file.cancel) await file.cancel("Operation canceled.");
queue.splice(i, 1);
}
if (type === "download") this.downloads.delete(groupId);
else if (type === "upload") this.uploads.delete(groupId);
}
readEncrypted(filename, encryptionKey, cipherData) {