web: upload files < 25mb in a single request

This commit is contained in:
Abdullah Atta
2023-08-03 04:09:32 +05:00
committed by Abdullah Atta
parent 0f9452f6ca
commit c38a98c239

View File

@@ -45,6 +45,7 @@ const ENCRYPTED_CHUNK_SIZE = CHUNK_SIZE + ABYTES;
const UPLOAD_PART_REQUIRED_CHUNKS = Math.ceil(
(5 * 1024 * 1024) / ENCRYPTED_CHUNK_SIZE
);
const MINIMUM_MULTIPART_FILE_SIZE = 25 * 1024 * 1024;
const streamablefs = new StreamableFS("streamable-fs");
async function writeEncryptedFile(
@@ -195,126 +196,38 @@ type RequestOptions = {
chunkSize: number;
};
type UploadAdditionalData = {
uploadedBytes?: number;
uploadId?: string;
uploaded?: boolean;
uploadedChunks?: { PartNumber: number; ETag: string }[];
};
async function uploadFile(filename: string, requestOptions: RequestOptions) {
const fileHandle = await streamablefs.readFile(filename);
if (!fileHandle)
throw new Error(`File stream not found. (File hash: ${filename})`);
const TOTAL_PARTS = Math.ceil(
fileHandle.file.chunks / UPLOAD_PART_REQUIRED_CHUNKS
);
const additionalData = (fileHandle.file.additionalData || {}) as {
uploadedBytes?: number;
uploadId?: string;
uploaded?: boolean;
uploadedChunks?: { PartNumber: number; ETag: string }[];
};
const { uploadedChunks = [], uploaded = false } = additionalData;
let { uploadedBytes = 0, uploadId = "" } = additionalData;
try {
if (uploaded) {
if (fileHandle.file.additionalData?.uploaded) {
await checkUpload(filename);
return true;
}
const { headers, signal } = requestOptions;
const uploaded =
fileHandle.file.size < MINIMUM_MULTIPART_FILE_SIZE
? await singlePartUploadFile(fileHandle, filename, requestOptions)
: await multiPartUploadFile(fileHandle, filename, requestOptions);
const initiateMultiPartUpload = await axios
.get(
`${hosts.API_HOST}/s3/multipart?name=${filename}&parts=${TOTAL_PARTS}&uploadId=${uploadId}`,
{
headers,
signal
}
)
.catch((e) => {
throw new WrappedError("Could not initiate multi-part upload.", e);
});
if (uploaded) {
await checkUpload(filename);
uploadId = initiateMultiPartUpload.data.uploadId;
const { parts } = initiateMultiPartUpload.data;
if (!parts)
throw new Error(
"Could not initiate multi-part upload: invalid response."
);
await fileHandle.addAdditionalData("uploadId", uploadId);
const onUploadProgress = (ev: AxiosProgressEvent) => {
reportProgress(
{
total: fileHandle.file.size + ABYTES,
loaded: uploadedBytes + ev.loaded
},
{
type: "upload",
hash: filename
}
);
};
for (let i = uploadedChunks.length; i < TOTAL_PARTS; ++i) {
const blob = await fileHandle.readChunks(
i * UPLOAD_PART_REQUIRED_CHUNKS,
UPLOAD_PART_REQUIRED_CHUNKS
);
const url = parts[i];
const data = await blob.arrayBuffer();
const response = await axios
.request({
url,
method: "PUT",
headers: { "Content-Type": "" },
signal,
data,
onUploadProgress
})
.catch((e) => {
throw new WrappedError(`Failed to upload part at offset ${i}`, e);
});
if (!response.headers.etag || typeof response.headers.etag !== "string")
throw new Error(
`Failed to upload part at offset ${i}: invalid etag. ETag: ${response.headers.etag}`
);
uploadedBytes += blob.size;
uploadedChunks.push({
PartNumber: i + 1,
ETag: JSON.parse(response.headers.etag)
});
await fileHandle.addAdditionalData("uploadedChunks", uploadedChunks);
await fileHandle.addAdditionalData("uploadedBytes", uploadedBytes);
await fileHandle.addAdditionalData("uploaded", true);
if (isAttachmentDeletable(fileHandle.file.type)) {
await streamablefs.deleteFile(filename);
}
}
await axios
.post(
`${hosts.API_HOST}/s3/multipart`,
{
Key: filename,
UploadId: uploadId,
PartETags: uploadedChunks
},
{
headers,
signal
}
)
.catch(async (e) => {
await resetUpload(fileHandle);
throw new WrappedError("Could not complete multi-part upload.", e);
});
await fileHandle.addAdditionalData("uploaded", true);
if (isAttachmentDeletable(fileHandle.file.type)) {
await streamablefs.deleteFile(filename);
}
await checkUpload(filename);
return true;
return uploaded;
} catch (e) {
reportProgress(undefined, { type: "upload", hash: filename });
const error = toS3Error(e);
@@ -336,6 +249,148 @@ async function uploadFile(filename: string, requestOptions: RequestOptions) {
}
}
async function singlePartUploadFile(
fileHandle: FileHandle,
filename: string,
requestOptions: RequestOptions
) {
console.log("Streaming file upload!");
const { url, headers, signal } = requestOptions;
const uploadUrl = await fetch(url, {
method: "PUT",
headers,
signal
}).then((res) => (res.ok ? res.text() : null));
if (!uploadUrl) throw new Error("Unable to resolve attachment upload url.");
const response = await axios.request({
url: uploadUrl,
method: "PUT",
headers: {
"Content-Type": "application/octet-stream"
},
data: await fileHandle.toBlob(),
signal,
onUploadProgress: (ev) =>
reportProgress(
{
total: fileHandle.file.size + ABYTES,
loaded: ev.loaded
},
{
type: "upload",
hash: filename
}
)
});
return isSuccessStatusCode(response.status);
}
async function multiPartUploadFile(
fileHandle: FileHandle,
filename: string,
requestOptions: RequestOptions
) {
const { headers, signal } = requestOptions;
const additionalData = (fileHandle.file.additionalData ||
{}) as UploadAdditionalData;
const TOTAL_PARTS = Math.ceil(
fileHandle.file.chunks / UPLOAD_PART_REQUIRED_CHUNKS
);
const { uploadedChunks = [] } = additionalData;
let { uploadedBytes = 0, uploadId = "" } = additionalData;
const initiateMultiPartUpload = await axios
.get(
`${hosts.API_HOST}/s3/multipart?name=${filename}&parts=${TOTAL_PARTS}&uploadId=${uploadId}`,
{
headers,
signal
}
)
.catch((e) => {
throw new WrappedError("Could not initiate multi-part upload.", e);
});
uploadId = initiateMultiPartUpload.data.uploadId;
const { parts } = initiateMultiPartUpload.data;
if (!parts)
throw new Error("Could not initiate multi-part upload: invalid response.");
await fileHandle.addAdditionalData("uploadId", uploadId);
const onUploadProgress = (ev: AxiosProgressEvent) => {
reportProgress(
{
total: fileHandle.file.size + ABYTES,
loaded: uploadedBytes + ev.loaded
},
{
type: "upload",
hash: filename
}
);
};
for (let i = uploadedChunks.length; i < TOTAL_PARTS; ++i) {
const blob = await fileHandle.readChunks(
i * UPLOAD_PART_REQUIRED_CHUNKS,
UPLOAD_PART_REQUIRED_CHUNKS
);
const url = parts[i];
const data = await blob.arrayBuffer();
const response = await axios
.request({
url,
method: "PUT",
headers: { "Content-Type": "" },
signal,
data,
onUploadProgress
})
.catch((e) => {
throw new WrappedError(`Failed to upload part at offset ${i}`, e);
});
if (!response.headers.etag || typeof response.headers.etag !== "string")
throw new Error(
`Failed to upload part at offset ${i}: invalid etag. ETag: ${response.headers.etag}`
);
uploadedBytes += blob.size;
uploadedChunks.push({
PartNumber: i + 1,
ETag: JSON.parse(response.headers.etag)
});
await fileHandle.addAdditionalData("uploadedChunks", uploadedChunks);
await fileHandle.addAdditionalData("uploadedBytes", uploadedBytes);
}
await axios
.post(
`${hosts.API_HOST}/s3/multipart`,
{
Key: filename,
UploadId: uploadId,
PartETags: uploadedChunks
},
{
headers,
signal
}
)
.catch(async (e) => {
await resetUpload(fileHandle);
throw new WrappedError("Could not complete multi-part upload.", e);
});
return true;
}
async function resetUpload(fileHandle: FileHandle) {
await fileHandle.addAdditionalData("uploadId", undefined);
await fileHandle.addAdditionalData("uploadedChunks", undefined);