web: optimize attachment uploading

Abdullah Atta
2024-07-24 12:39:05 +05:00
committed by Abdullah Atta
parent 053f70cdb4
commit e4a755c69f
5 changed files with 8101 additions and 1950 deletions

File diff suppressed because it is too large

View File

@@ -15,6 +15,7 @@
     "@dnd-kit/sortable": "^8.0.0",
     "@emotion/react": "11.11.1",
     "@hazae41/foras": "^2.1.4",
+    "@henrygd/queue": "^1.0.6",
     "@mdi/js": "^7.2.96",
     "@mdi/react": "^1.6.1",
     "@notesnook-importer/core": "^2.0.0",

View File

@@ -51,6 +51,12 @@ export class IndexedDBFileStore implements IFileStorage {
   readChunk(chunkName: string): Promise<Uint8Array | undefined> {
     return this.storage.get(chunkName);
   }
+
+  async listChunks(chunkPrefix: string): Promise<string[]> {
+    const keys = await this.storage.keys();
+    return keys.filter((k) =>
+      (k as string).startsWith(chunkPrefix)
+    ) as string[];
+  }
 }
 
 export class CacheStorageFileStore implements IFileStorage {
@@ -108,6 +114,14 @@ export class CacheStorageFileStore implements IFileStorage {
     return response ? new Uint8Array(await response.arrayBuffer()) : undefined;
   }
+
+  async listChunks(chunkPrefix: string): Promise<string[]> {
+    const cache = await this.getCache();
+    const keys = await cache.keys();
+    return keys
+      .filter((k) => k.url.startsWith(`/${chunkPrefix}`))
+      .map((r) => r.url.slice(1));
+  }
 
   private toURL(chunkName: string) {
     return `/${chunkName}`;
   }
@@ -158,4 +172,8 @@ export class OriginPrivateFileSystem implements IFileStorage {
     await this.create();
     return this.worker.readChunk(this.name, chunkName);
   }
+
+  async listChunks(chunkPrefix: string): Promise<string[]> {
+    await this.create();
+    return (await this.worker.listChunks(this.name, chunkPrefix)) || [];
+  }
 }
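
Each `IFileStorage` backend (IndexedDB, Cache Storage, OPFS) gains the same `listChunks(chunkPrefix)` method: return the names of every stored chunk whose key starts with the given prefix. A hypothetical in-memory backend makes the contract easy to see; the `Map`-based store below is illustrative only and not part of the codebase:

```ts
// Illustrative in-memory chunk store; listChunks mirrors the new method's contract.
class InMemoryFileStore {
  private chunks = new Map<string, Uint8Array>();

  async writeChunk(chunkName: string, data: Uint8Array): Promise<void> {
    this.chunks.set(chunkName, data);
  }

  async listChunks(chunkPrefix: string): Promise<string[]> {
    return [...this.chunks.keys()].filter((key) => key.startsWith(chunkPrefix));
  }
}
```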

View File

@@ -19,7 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
 import "web-streams-polyfill/dist/ponyfill";
 import { xxhash64, createXXHash64 } from "hash-wasm";
-import axios, { AxiosProgressEvent } from "axios";
+import axios from "axios";
 import { AppEventManager, AppEvents } from "../common/app-events";
 import { StreamableFS } from "@notesnook/streamable-fs";
 import { NNCrypto } from "./nncrypto";
@@ -50,12 +50,13 @@ import {
   RequestOptions
 } from "@notesnook/core/dist/interfaces";
 import { logger } from "../utils/logger";
+import { newQueue } from "@henrygd/queue";
 
 const ABYTES = 17;
 const CHUNK_SIZE = 512 * 1024;
 const ENCRYPTED_CHUNK_SIZE = CHUNK_SIZE + ABYTES;
 const UPLOAD_PART_REQUIRED_CHUNKS = Math.ceil(
-  (5 * 1024 * 1024) / ENCRYPTED_CHUNK_SIZE
+  (10 * 1024 * 1024) / ENCRYPTED_CHUNK_SIZE
 );
 const MINIMUM_MULTIPART_FILE_SIZE = 25 * 1024 * 1024;
 const streamablefs = new StreamableFS(
@@ -241,7 +242,7 @@ async function uploadFile(
   if (uploadedFileSize > 0) return true;
 
   const fileHandle = await streamablefs.readFile(filename);
-  if (!fileHandle || !(await exists(filename)))
+  if (!fileHandle || !(await exists(fileHandle)))
     throw new Error(
       `File is corrupt or missing data. Please upload the file again. (File hash: ${filename})`
     );
@@ -260,9 +261,6 @@
     await checkUpload(filename);
 
     await fileHandle.addAdditionalData("uploaded", true);
-
-    if (isAttachmentDeletable(fileHandle.file.type)) {
-      await streamablefs.deleteFile(filename);
-    }
   }
 
   return uploaded;
@@ -340,7 +338,7 @@ async function multiPartUploadFile(
     {}) as UploadAdditionalData;
 
   const TOTAL_PARTS = Math.ceil(
-    fileHandle.file.chunks / UPLOAD_PART_REQUIRED_CHUNKS
+    fileHandle.chunks.length / UPLOAD_PART_REQUIRED_CHUNKS
   );
   const { uploadedChunks = [] } = additionalData;
   let { uploadedBytes = 0, uploadId = "" } = additionalData;
@@ -368,11 +366,11 @@
     await fileHandle.addAdditionalData("uploadId", uploadId);
 
-  const onUploadProgress = (ev: AxiosProgressEvent) => {
+  const onUploadProgress = () => {
     reportProgress(
       {
         total: fileHandle.file.size + ABYTES * TOTAL_PARTS,
-        loaded: uploadedBytes + ev.loaded
+        loaded: uploadedBytes
       },
       {
         type: "upload",
@@ -381,41 +379,49 @@ async function multiPartUploadFile(
     );
   };
 
-  onUploadProgress({ bytes: 0, loaded: 0 });
+  onUploadProgress();
+  const queue = newQueue(4);
   for (let i = uploadedChunks.length; i < TOTAL_PARTS; ++i) {
-    const blob = await fileHandle.readChunks(
-      i * UPLOAD_PART_REQUIRED_CHUNKS,
+    const from = i * UPLOAD_PART_REQUIRED_CHUNKS;
+    const length = Math.min(
+      fileHandle.chunks.length - from,
       UPLOAD_PART_REQUIRED_CHUNKS
     );
     const url = parts[i];
-    const response = await axios
-      .request({
-        url,
-        method: "PUT",
-        headers: { "Content-Type": "" },
-        signal,
-        data: blob,
-        onUploadProgress
-      })
-      .catch((e) => {
-        throw new WrappedError(`Failed to upload part at offset ${i}`, e);
-      });
-    if (!response.headers.etag || typeof response.headers.etag !== "string")
-      throw new Error(
-        `Failed to upload part at offset ${i}: invalid etag. ETag: ${response.headers.etag}`
-      );
-    uploadedBytes += blob.size;
-    uploadedChunks.push({
-      PartNumber: i + 1,
-      ETag: JSON.parse(response.headers.etag)
-    });
-    await fileHandle.addAdditionalData("uploadedChunks", uploadedChunks);
-    await fileHandle.addAdditionalData("uploadedBytes", uploadedBytes);
-
-    onUploadProgress({ bytes: 0, loaded: 0 });
+    queue.add(async () => {
+      const blob = await fileHandle.readChunks(
+        i * UPLOAD_PART_REQUIRED_CHUNKS,
+        length
+      );
+      const response = await axios
+        .request({
+          url,
+          method: "PUT",
+          headers: { "Content-Type": "" },
+          signal,
+          data: blob,
+          onUploadProgress: (ev) => {
+            uploadedBytes += ev.bytes;
+            onUploadProgress();
+          }
+        })
+        .catch((e) => {
+          throw new WrappedError(`Failed to upload part at offset ${i}`, e);
+        });
+      if (!response.headers.etag || typeof response.headers.etag !== "string")
+        throw new Error(
+          `Failed to upload part at offset ${i}: invalid etag. ETag: ${response.headers.etag}`
+        );
+      uploadedChunks.push({
+        PartNumber: i + 1,
+        ETag: JSON.parse(response.headers.etag)
+      });
+      await fileHandle.addAdditionalData("uploadedChunks", uploadedChunks);
+      await fileHandle.addAdditionalData("uploadedBytes", uploadedBytes);
+    });
   }
+  await queue.done();
 
   await axios
     .post(
@@ -423,7 +429,7 @@ async function multiPartUploadFile(
       {
         Key: filename,
         UploadId: uploadId,
-        PartETags: uploadedChunks
+        PartETags: uploadedChunks.sort((a, b) => a.PartNumber - b.PartNumber)
       },
       {
         headers,
@@ -473,12 +479,8 @@ async function downloadFile(
   const { url, headers, chunkSize, signal } = requestOptions;
 
   const handle = await streamablefs.readFile(filename);
-  if (
-    handle &&
-    handle.file.size === (await handle.size()) - handle.file.chunks * ABYTES
-  )
-    return true;
-  else if (handle) await handle.delete();
+  if (handle && (await exists(handle))) return true;
+  if (handle) await handle.delete();
 
   const attachment = await db.attachments.attachment(filename);
   if (!attachment) throw new Error("Attachment doesn't exist.");
@@ -488,6 +490,21 @@
     { type: "download", hash: filename }
   );
 
+  const size = await getUploadedFileSize(filename);
+  if (size <= 0) {
+    const error = `File length is 0. Please upload this file again from the attachment manager. (File hash: ${filename})`;
+    await db.attachments.markAsFailed(attachment.id, error);
+    throw new Error(error);
+  }
+
+  const totalChunks = Math.ceil(size / chunkSize);
+  const decryptedLength = size - totalChunks * ABYTES;
+  if (attachment && attachment.size !== decryptedLength) {
+    const error = `File length mismatch. Expected ${attachment.size} but got ${decryptedLength} bytes. Please upload this file again from the attachment manager. (File hash: ${filename})`;
+    await db.attachments.markAsFailed(attachment.id, error);
+    throw new Error(error);
+  }
+
   const signedUrl = (
     await axios.get(url, {
       headers,
@@ -497,55 +514,22 @@
   logger.debug("Got attachment signed url", { filename });
 
-  const response = await fetch(signedUrl, {
-    signal
-  });
-  logger.debug("Got attachment", { filename });
-
-  const contentType = response.headers.get("content-type");
-  if (contentType === "application/xml") {
-    const error = parseS3Error(await response.text());
-    if (error.Code !== "Unknown") {
-      throw new Error(`[${error.Code}] ${error.Message}`);
-    }
-  }
-
-  const contentLength = parseInt(
-    response.headers.get("content-length") || "0"
-  );
-  if (contentLength === 0 || isNaN(contentLength)) {
-    const error = `File length is 0. Please upload this file again from the attachment manager. (File hash: ${filename})`;
-    await db.attachments.markAsFailed(attachment.id, error);
-    throw new Error(error);
-  }
-  if (!response.body) {
-    const error = `The download response does not contain a body. Please upload this file again from the attachment manager. (File hash: ${filename})`;
-    await db.attachments.markAsFailed(attachment.id, error);
-    throw new Error(error);
-  }
-
-  const totalChunks = Math.ceil(contentLength / chunkSize);
-  const decryptedLength = contentLength - totalChunks * ABYTES;
-  if (attachment && attachment.size !== decryptedLength) {
-    const error = `File length mismatch. Please upload this file again from the attachment manager. (File hash: ${filename})`;
-    await db.attachments.markAsFailed(attachment.id, error);
-    throw new Error(error);
-  }
-
   const fileHandle = await streamablefs.createFile(
     filename,
     decryptedLength,
-    attachment?.mimeType || "application/octet-stream"
+    attachment.mimeType || "application/octet-stream"
   );
 
+  const response = await fetch(signedUrl, {
+    signal
+  });
+
   await response.body
-    .pipeThrough(
+    ?.pipeThrough(
       new ProgressStream((totalRead, done) => {
         reportProgress(
           {
-            total: contentLength,
-            loaded: done ? contentLength : totalRead
+            total: size,
+            loaded: done ? size : totalRead
           },
           { type: "download", hash: filename }
         );
@@ -569,11 +553,14 @@ async function downloadFile(
   }
 }
 
-async function exists(filename: string) {
-  const handle = await streamablefs.readFile(filename);
+async function exists(filename: string | FileHandle) {
+  const handle =
+    typeof filename === "string"
+      ? await streamablefs.readFile(filename)
+      : filename;
   return (
     !!handle &&
-    handle.file.size === (await handle.size()) - handle.file.chunks * ABYTES
+    handle.file.size === (await handle.size()) - handle.chunks.length * ABYTES
   );
 }
@@ -610,14 +597,11 @@ export async function streamingDecryptFile(
 export async function saveFile(filename: string, fileMetadata: FileMetadata) {
   logger.debug("Saving file", { filename });
 
-  const { name, type, isUploaded } = fileMetadata;
+  const { name, type } = fileMetadata;
 
   const decrypted = await decryptFile(filename, fileMetadata);
   logger.debug("Decrypting file", { filename, result: !!decrypted });
   if (decrypted) saveAs(decrypted, getFileNameWithExtension(name, type));
-
-  if (isUploaded && isAttachmentDeletable(type))
-    await streamablefs.deleteFile(filename);
 }
 
 async function deleteFile(
@@ -684,10 +668,6 @@ export const FileStorage: IFileStorage = {
   hashBase64
 };
 
-function isAttachmentDeletable(type: string) {
-  return !type.startsWith("image/") && !type.startsWith("application/pdf");
-}
-
 function isSuccessStatusCode(statusCode: number) {
   return statusCode >= 200 && statusCode <= 299;
 }
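
Besides parallelizing the part uploads, the commit doubles the multipart part size from roughly 5 MiB to roughly 10 MiB, halving the number of PUT requests per large file. Worked numbers using the constants from the diff above:

```ts
const ABYTES = 17; // per-chunk authentication overhead added by encryption
const CHUNK_SIZE = 512 * 1024; // 512 KiB of plaintext per chunk
const ENCRYPTED_CHUNK_SIZE = CHUNK_SIZE + ABYTES; // 524,305 bytes

// ceil(10 MiB / 524,305) = 20 encrypted chunks per uploaded part,
// i.e. each part is about 10 MiB plus 20 * 17 = 340 bytes of overhead.
const UPLOAD_PART_REQUIRED_CHUNKS = Math.ceil(
  (10 * 1024 * 1024) / ENCRYPTED_CHUNK_SIZE
);
```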

View File

@@ -93,6 +93,14 @@ class OriginPrivateFileStore implements IFileStorage {
     }
   }
 
+  async listChunks(chunkPrefix: string): Promise<string[]> {
+    const chunks: string[] = [];
+    for await (const entry of this.directory.keys()) {
+      if (entry.startsWith(chunkPrefix)) chunks.push(entry);
+    }
+    return chunks;
+  }
+
   private async safeOp<T>(chunkName: string, createPromise: () => Promise<T>) {
     const lock = this.locks.get(chunkName);
     if (lock) await lock;
@@ -139,6 +147,9 @@ const workerModule = {
   async readChunk(directoryName: string, chunkName: string) {
     const chunk = await fileStores.get(directoryName)?.readChunk(chunkName);
     return chunk ? transfer(chunk, [chunk.buffer]) : undefined;
-  }
+  },
+  async listChunks(directoryName: string, chunkPrefix: string) {
+    return (await fileStores.get(directoryName)?.listChunks(chunkPrefix)) || [];
+  }
 };
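
The worker-side `listChunks` simply iterates the Origin Private File System directory handle and filters entry names by prefix. A standalone sketch of the same scan against OPFS directly; the directory name is illustrative, and the real worker reuses the directory handle it already holds:

```ts
async function listChunksInOpfs(chunkPrefix: string): Promise<string[]> {
  // Open (or create) a directory under the origin-private root.
  const root = await navigator.storage.getDirectory();
  const dir = await root.getDirectoryHandle("attachments", { create: true });

  // FileSystemDirectoryHandle.keys() is an async iterator of entry names.
  const chunks: string[] = [];
  for await (const name of dir.keys()) {
    if (name.startsWith(chunkPrefix)) chunks.push(name);
  }
  return chunks;
}
```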