Improve clean entry data job

This commit is contained in:
Hakan Shehu
2024-12-30 09:39:24 +01:00
parent 7939da3c29
commit 8d23d18491

View File

@@ -1,12 +1,16 @@
import { DeleteObjectCommand } from '@aws-sdk/client-s3';
import { generateId, IdType } from '@colanode/core';
// import { DeleteObjectCommand } from '@aws-sdk/client-s3';
import { database } from '@/data/database';
import { CreateEntryTransaction } from '@/data/schema';
import {
CreateEntryTransaction,
CreateFileTombstone,
CreateMessageTombstone,
} from '@/data/schema';
import { JobHandler } from '@/types/jobs';
// import { filesStorage, BUCKET_NAMES } from '@/data/storage';
import { eventBus } from '@/lib/event-bus';
import { createLogger } from '@/lib/logger';
import { BUCKET_NAMES, filesStorage } from '@/data/storage';
const BATCH_SIZE = 100;
@@ -51,24 +55,21 @@ export const cleanEntryDataHandler: JobHandler<CleanEntryDataInput> = async (
return;
}
const userId = deleteTransaction.created_by;
const parentIds = [input.entryId];
while (parentIds.length > 0) {
const tempParentIds = parentIds.splice(0, BATCH_SIZE);
const deletedEntryIds = await deleteChildren(
tempParentIds,
input.workspaceId,
deleteTransaction.created_by
);
const deletedEntryIds = await deleteChildren(tempParentIds, userId);
await deleteMessages(tempParentIds, userId);
await deleteFiles(tempParentIds, userId);
parentIds.push(...deletedEntryIds);
}
};
const deleteChildren = async (
parentIds: string[],
workspaceId: string,
userId: string
) => {
const deleteChildren = async (parentIds: string[], userId: string) => {
const deletedEntryIds: string[] = [];
let hasMore = true;
while (hasMore) {
@@ -87,33 +88,19 @@ const deleteChildren = async (
break;
}
// const fileIds: string[] = descendants
// .filter((d) => d.type === 'file')
// .map((d) => d.id);
// const uploads: SelectUpload[] =
// fileIds.length > 0
// ? await database
// .selectFrom('uploads')
// .selectAll()
// .where('node_id', 'in', fileIds)
// .execute()
// : [];
const entryIds: string[] = descendants.map((d) => d.id);
const transactionsToCreate: CreateEntryTransaction[] = descendants.map(
(descendant) => ({
id: generateId(IdType.Transaction),
entry_id: descendant.id,
root_id: descendant.root_id,
workspace_id: workspaceId,
workspace_id: descendant.workspace_id,
operation: 'delete',
created_at: new Date(),
created_by: userId,
server_created_at: new Date(),
})
);
// const uploadsToDelete: string[] = uploads.map((u) => u.node_id);
await database.transaction().execute(async (trx) => {
await trx
@@ -131,13 +118,6 @@ const deleteChildren = async (
throw new Error('Failed to create transactions');
}
// if (uploadsToDelete.length > 0) {
// await trx
// .deleteFrom('uploads')
// .where('node_id', 'in', uploadsToDelete)
// .execute();
// }
await trx.deleteFrom('entries').where('id', 'in', entryIds).execute();
await trx
.updateTable('collaborations')
@@ -149,26 +129,13 @@ const deleteChildren = async (
.execute();
});
// for (const upload of uploads) {
// const command = new DeleteObjectCommand({
// Bucket: BUCKET_NAMES.FILES,
// Key: upload.path,
// });
// logger.trace(
// `Deleting file as a descendant of ${parentIds}: ${upload.path}`
// );
// await filesStorage.send(command);
// }
for (const entry of descendants) {
eventBus.publish({
type: 'entry_deleted',
entryId: entry.id,
entryType: entry.type,
rootId: entry.root_id,
workspaceId: workspaceId,
workspaceId: entry.workspace_id,
});
deletedEntryIds.push(entry.id);
@@ -183,3 +150,137 @@ const deleteChildren = async (
return deletedEntryIds;
};
// Permanently removes every message attached to the given entries, in
// batches of BATCH_SIZE. Each batch is deleted inside one transaction
// together with its reactions and interactions, and a tombstone row is
// written per message so clients can sync the deletion. A
// `message_deleted` event is published for each removed message. On any
// failure the error is logged and the loop stops (best-effort cleanup).
const deleteMessages = async (entryIds: string[], userId: string) => {
  let hasMore = true;
  while (hasMore) {
    try {
      const batch = await database
        .selectFrom('messages')
        .selectAll()
        .where('entry_id', 'in', entryIds)
        .orderBy('id', 'asc')
        .limit(BATCH_SIZE)
        .execute();

      if (batch.length === 0) {
        hasMore = false;
        break;
      }

      const ids: string[] = batch.map((row) => row.id);
      const tombstones: CreateMessageTombstone[] = batch.map((row) => ({
        id: row.id,
        root_id: row.root_id,
        workspace_id: row.workspace_id,
        deleted_at: new Date(),
        deleted_by: userId,
      }));

      // Delete the messages plus their dependent rows and record the
      // tombstones atomically.
      await database.transaction().execute(async (trx) => {
        await trx.deleteFrom('messages').where('id', 'in', ids).execute();
        await trx
          .deleteFrom('message_reactions')
          .where('message_id', 'in', ids)
          .execute();
        await trx
          .deleteFrom('message_interactions')
          .where('message_id', 'in', ids)
          .execute();
        await trx
          .insertInto('message_tombstones')
          .values(tombstones)
          .execute();
      });

      // Notify listeners only after the transaction has committed.
      for (const row of batch) {
        eventBus.publish({
          type: 'message_deleted',
          messageId: row.id,
          rootId: row.root_id,
          workspaceId: row.workspace_id,
        });
      }

      // A short batch means no matching rows remain.
      hasMore = batch.length === BATCH_SIZE;
    } catch (error) {
      logger.error(`Error deleting messages for ${entryIds}: ${error}`);
      hasMore = false;
    }
  }
};
// Permanently removes every file attached to the given entries, in
// batches of BATCH_SIZE. Each batch is deleted inside one transaction
// together with its interactions, and a tombstone row is written per
// file so clients can sync the deletion. After the commit, a
// `file_deleted` event is published and the file's object is deleted
// from the storage bucket. On any failure the error is logged and the
// loop stops (best-effort cleanup).
const deleteFiles = async (entryIds: string[], userId: string) => {
  let hasMore = true;
  while (hasMore) {
    try {
      const batch = await database
        .selectFrom('files')
        .selectAll()
        .where('entry_id', 'in', entryIds)
        .orderBy('id', 'asc')
        .limit(BATCH_SIZE)
        .execute();

      if (batch.length === 0) {
        hasMore = false;
        break;
      }

      const ids: string[] = batch.map((row) => row.id);
      const tombstones: CreateFileTombstone[] = batch.map((row) => ({
        id: row.id,
        root_id: row.root_id,
        workspace_id: row.workspace_id,
        deleted_at: new Date(),
        deleted_by: userId,
      }));

      // Delete the file rows plus their dependent rows and record the
      // tombstones atomically.
      await database.transaction().execute(async (trx) => {
        await trx.deleteFrom('files').where('id', 'in', ids).execute();
        await trx
          .deleteFrom('file_interactions')
          .where('file_id', 'in', ids)
          .execute();
        await trx.insertInto('file_tombstones').values(tombstones).execute();
      });

      for (const file of batch) {
        eventBus.publish({
          type: 'file_deleted',
          fileId: file.id,
          rootId: file.root_id,
          workspaceId: file.workspace_id,
        });

        // Remove the binary from object storage. The key layout is
        // assumed to mirror how uploads are written — confirm against
        // the upload code path.
        const path = `files/${file.workspace_id}/${file.id}${file.extension}`;
        await filesStorage.send(
          new DeleteObjectCommand({
            Bucket: BUCKET_NAMES.FILES,
            Key: path,
          })
        );
      }

      // A short batch means no matching rows remain.
      hasMore = batch.length === BATCH_SIZE;
    } catch (error) {
      logger.error(`Error deleting files for ${entryIds}: ${error}`);
      hasMore = false;
    }
  }
};