diff --git a/apps/web/src/interfaces/fs.ts b/apps/web/src/interfaces/fs.ts
index 7a1a3085d..95fe7c09a 100644
--- a/apps/web/src/interfaces/fs.ts
+++ b/apps/web/src/interfaces/fs.ts
@@ -45,6 +45,7 @@ const ENCRYPTED_CHUNK_SIZE = CHUNK_SIZE + ABYTES;
 const UPLOAD_PART_REQUIRED_CHUNKS = Math.ceil(
   (5 * 1024 * 1024) / ENCRYPTED_CHUNK_SIZE
 );
+const MINIMUM_MULTIPART_FILE_SIZE = 25 * 1024 * 1024;
 const streamablefs = new StreamableFS("streamable-fs");
 
 async function writeEncryptedFile(
@@ -195,126 +196,38 @@ type RequestOptions = {
   chunkSize: number;
 };
 
+type UploadAdditionalData = {
+  uploadedBytes?: number;
+  uploadId?: string;
+  uploaded?: boolean;
+  uploadedChunks?: { PartNumber: number; ETag: string }[];
+};
+
 async function uploadFile(filename: string, requestOptions: RequestOptions) {
   const fileHandle = await streamablefs.readFile(filename);
   if (!fileHandle)
     throw new Error(`File stream not found. (File hash: ${filename})`);
-  const TOTAL_PARTS = Math.ceil(
-    fileHandle.file.chunks / UPLOAD_PART_REQUIRED_CHUNKS
-  );
-
-  const additionalData = (fileHandle.file.additionalData || {}) as {
-    uploadedBytes?: number;
-    uploadId?: string;
-    uploaded?: boolean;
-    uploadedChunks?: { PartNumber: number; ETag: string }[];
-  };
-
-  const { uploadedChunks = [], uploaded = false } = additionalData;
-  let { uploadedBytes = 0, uploadId = "" } = additionalData;
-
   try {
-    if (uploaded) {
+    if (fileHandle.file.additionalData?.uploaded) {
       await checkUpload(filename);
       return true;
     }
 
-    const { headers, signal } = requestOptions;
+    const uploaded =
+      fileHandle.file.size < MINIMUM_MULTIPART_FILE_SIZE
+        ? await singlePartUploadFile(fileHandle, filename, requestOptions)
+        : await multiPartUploadFile(fileHandle, filename, requestOptions);
 
-    const initiateMultiPartUpload = await axios
-      .get(
-        `${hosts.API_HOST}/s3/multipart?name=${filename}&parts=${TOTAL_PARTS}&uploadId=${uploadId}`,
-        {
-          headers,
-          signal
-        }
-      )
-      .catch((e) => {
-        throw new WrappedError("Could not initiate multi-part upload.", e);
-      });
+    if (uploaded) {
+      await checkUpload(filename);
 
-    uploadId = initiateMultiPartUpload.data.uploadId;
-    const { parts } = initiateMultiPartUpload.data;
-
-    if (!parts)
-      throw new Error(
-        "Could not initiate multi-part upload: invalid response."
-      );
-
-    await fileHandle.addAdditionalData("uploadId", uploadId);
-
-    const onUploadProgress = (ev: AxiosProgressEvent) => {
-      reportProgress(
-        {
-          total: fileHandle.file.size + ABYTES,
-          loaded: uploadedBytes + ev.loaded
-        },
-        {
-          type: "upload",
-          hash: filename
-        }
-      );
-    };
-
-    for (let i = uploadedChunks.length; i < TOTAL_PARTS; ++i) {
-      const blob = await fileHandle.readChunks(
-        i * UPLOAD_PART_REQUIRED_CHUNKS,
-        UPLOAD_PART_REQUIRED_CHUNKS
-      );
-      const url = parts[i];
-      const data = await blob.arrayBuffer();
-      const response = await axios
-        .request({
-          url,
-          method: "PUT",
-          headers: { "Content-Type": "" },
-          signal,
-          data,
-          onUploadProgress
-        })
-        .catch((e) => {
-          throw new WrappedError(`Failed to upload part at offset ${i}`, e);
-        });
-
-      if (!response.headers.etag || typeof response.headers.etag !== "string")
-        throw new Error(
-          `Failed to upload part at offset ${i}: invalid etag. ETag: ${response.headers.etag}`
-        );
-
-      uploadedBytes += blob.size;
-      uploadedChunks.push({
-        PartNumber: i + 1,
-        ETag: JSON.parse(response.headers.etag)
-      });
-      await fileHandle.addAdditionalData("uploadedChunks", uploadedChunks);
-      await fileHandle.addAdditionalData("uploadedBytes", uploadedBytes);
+      await fileHandle.addAdditionalData("uploaded", true);
+      if (isAttachmentDeletable(fileHandle.file.type)) {
+        await streamablefs.deleteFile(filename);
+      }
     }
 
-    await axios
-      .post(
-        `${hosts.API_HOST}/s3/multipart`,
-        {
-          Key: filename,
-          UploadId: uploadId,
-          PartETags: uploadedChunks
-        },
-        {
-          headers,
-          signal
-        }
-      )
-      .catch(async (e) => {
-        await resetUpload(fileHandle);
-        throw new WrappedError("Could not complete multi-part upload.", e);
-      });
-
-    await fileHandle.addAdditionalData("uploaded", true);
-
-    if (isAttachmentDeletable(fileHandle.file.type)) {
-      await streamablefs.deleteFile(filename);
-    }
-    await checkUpload(filename);
-    return true;
+    return uploaded;
   } catch (e) {
     reportProgress(undefined, { type: "upload", hash: filename });
     const error = toS3Error(e);
@@ -336,6 +249,148 @@ async function uploadFile(filename: string, requestOptions: RequestOptions) {
   }
 }
 
+async function singlePartUploadFile(
+  fileHandle: FileHandle,
+  filename: string,
+  requestOptions: RequestOptions
+) {
+  console.log("Streaming file upload!");
+  const { url, headers, signal } = requestOptions;
+
+  const uploadUrl = await fetch(url, {
+    method: "PUT",
+    headers,
+    signal
+  }).then((res) => (res.ok ? res.text() : null));
+  if (!uploadUrl) throw new Error("Unable to resolve attachment upload url.");
+
+  const response = await axios.request({
+    url: uploadUrl,
+    method: "PUT",
+    headers: {
+      "Content-Type": "application/octet-stream"
+    },
+    data: await fileHandle.toBlob(),
+    signal,
+    onUploadProgress: (ev) =>
+      reportProgress(
+        {
+          total: fileHandle.file.size + ABYTES,
+          loaded: ev.loaded
+        },
+        {
+          type: "upload",
+          hash: filename
+        }
+      )
+  });
+  return isSuccessStatusCode(response.status);
+}
+
+async function multiPartUploadFile(
+  fileHandle: FileHandle,
+  filename: string,
+  requestOptions: RequestOptions
+) {
+  const { headers, signal } = requestOptions;
+
+  const additionalData = (fileHandle.file.additionalData ||
+    {}) as UploadAdditionalData;
+
+  const TOTAL_PARTS = Math.ceil(
+    fileHandle.file.chunks / UPLOAD_PART_REQUIRED_CHUNKS
+  );
+  const { uploadedChunks = [] } = additionalData;
+  let { uploadedBytes = 0, uploadId = "" } = additionalData;
+
+  const initiateMultiPartUpload = await axios
+    .get(
+      `${hosts.API_HOST}/s3/multipart?name=${filename}&parts=${TOTAL_PARTS}&uploadId=${uploadId}`,
+      {
+        headers,
+        signal
+      }
+    )
+    .catch((e) => {
+      throw new WrappedError("Could not initiate multi-part upload.", e);
+    });
+
+  uploadId = initiateMultiPartUpload.data.uploadId;
+  const { parts } = initiateMultiPartUpload.data;
+
+  if (!parts)
+    throw new Error("Could not initiate multi-part upload: invalid response.");
+
+  await fileHandle.addAdditionalData("uploadId", uploadId);
+
+  const onUploadProgress = (ev: AxiosProgressEvent) => {
+    reportProgress(
+      {
+        total: fileHandle.file.size + ABYTES,
+        loaded: uploadedBytes + ev.loaded
+      },
+      {
+        type: "upload",
+        hash: filename
+      }
+    );
+  };
+
+  for (let i = uploadedChunks.length; i < TOTAL_PARTS; ++i) {
+    const blob = await fileHandle.readChunks(
+      i * UPLOAD_PART_REQUIRED_CHUNKS,
+      UPLOAD_PART_REQUIRED_CHUNKS
+    );
+    const url = parts[i];
+    const data = await blob.arrayBuffer();
+    const response = await axios
+      .request({
+        url,
+        method: "PUT",
+        headers: { "Content-Type": "" },
+        signal,
+        data,
+        onUploadProgress
+      })
+      .catch((e) => {
+        throw new WrappedError(`Failed to upload part at offset ${i}`, e);
+      });
+
+    if (!response.headers.etag || typeof response.headers.etag !== "string")
+      throw new Error(
+        `Failed to upload part at offset ${i}: invalid etag. ETag: ${response.headers.etag}`
+      );
+
+    uploadedBytes += blob.size;
+    uploadedChunks.push({
+      PartNumber: i + 1,
+      ETag: JSON.parse(response.headers.etag)
+    });
+    await fileHandle.addAdditionalData("uploadedChunks", uploadedChunks);
+    await fileHandle.addAdditionalData("uploadedBytes", uploadedBytes);
+  }
+
+  await axios
+    .post(
+      `${hosts.API_HOST}/s3/multipart`,
+      {
+        Key: filename,
+        UploadId: uploadId,
+        PartETags: uploadedChunks
+      },
+      {
+        headers,
+        signal
+      }
+    )
+    .catch(async (e) => {
+      await resetUpload(fileHandle);
+      throw new WrappedError("Could not complete multi-part upload.", e);
+    });
+
+  return true;
+}
+
 async function resetUpload(fileHandle: FileHandle) {
   await fileHandle.addAdditionalData("uploadId", undefined);
   await fileHandle.addAdditionalData("uploadedChunks", undefined);