From ad899c1c7c15799619a0068dc0279efa673223e9 Mon Sep 17 00:00:00 2001
From: Hakan Shehu
Date: Tue, 3 Dec 2024 00:17:23 +0100
Subject: [PATCH] Implement node data cleanup job

---
 .../main/services/collaboration-service.ts    |  15 ++
 .../desktop/src/main/services/node-service.ts |  20 ++
 apps/server/src/data/redis.ts                 |   4 -
 apps/server/src/data/schema.ts                |   4 +
 apps/server/src/jobs/clean-node-data.ts       | 186 ++++++++++++++++++
 apps/server/src/jobs/index.ts                 |   4 +-
 apps/server/src/services/job-service.ts       |   8 +-
 apps/server/src/services/node-service.ts      |  16 ++
 apps/server/src/services/synapse-service.ts   |   7 +-
 9 files changed, 253 insertions(+), 11 deletions(-)
 create mode 100644 apps/server/src/jobs/clean-node-data.ts

diff --git a/apps/desktop/src/main/services/collaboration-service.ts b/apps/desktop/src/main/services/collaboration-service.ts
index 90dfe3f5..454c427f 100644
--- a/apps/desktop/src/main/services/collaboration-service.ts
+++ b/apps/desktop/src/main/services/collaboration-service.ts
@@ -72,6 +72,21 @@ class CollaborationService {
         .deleteFrom('node_transactions')
         .where('node_id', '=', revocation.nodeId)
         .execute();
+
+      await tx
+        .deleteFrom('collaborations')
+        .where('node_id', '=', revocation.nodeId)
+        .execute();
+
+      await tx
+        .deleteFrom('interaction_events')
+        .where('node_id', '=', revocation.nodeId)
+        .execute();
+
+      await tx
+        .deleteFrom('interactions')
+        .where('node_id', '=', revocation.nodeId)
+        .execute();
     });
   }
 }
diff --git a/apps/desktop/src/main/services/node-service.ts b/apps/desktop/src/main/services/node-service.ts
index fc56deb1..63d18257 100644
--- a/apps/desktop/src/main/services/node-service.ts
+++ b/apps/desktop/src/main/services/node-service.ts
@@ -456,6 +456,21 @@ class NodeService {
         .where('node_id', '=', nodeId)
         .execute();
 
+      await trx
+        .deleteFrom('collaborations')
+        .where('node_id', '=', nodeId)
+        .execute();
+
+      await trx
+        .deleteFrom('interaction_events')
+        .where('node_id', '=', nodeId)
+        .execute();
+
+      await trx
+        .deleteFrom('interactions')
+        .where('node_id', '=', nodeId)
+        .execute();
+
       const createdTransaction = await trx
         .insertInto('node_transactions')
         .returningAll()
@@ -879,6 +894,11 @@ class NodeService {
         .where('node_id', '=', transaction.nodeId)
         .execute();
 
+      await trx
+        .deleteFrom('collaborations')
+        .where('node_id', '=', transaction.nodeId)
+        .execute();
+
       const nodeRow = await trx
         .deleteFrom('nodes')
         .returningAll()
diff --git a/apps/server/src/data/redis.ts b/apps/server/src/data/redis.ts
index cdcf96dc..0fea673a 100644
--- a/apps/server/src/data/redis.ts
+++ b/apps/server/src/data/redis.ts
@@ -29,7 +29,3 @@ export const initRedis = async () => {
     console.error('Redis client error:', err);
   });
 };
-
-export const CHANNEL_NAMES = {
-  SYNAPSE: process.env.REDIS_SYNAPSE_CHANNEL_NAME || 'colanode_synapse',
-};
diff --git a/apps/server/src/data/schema.ts b/apps/server/src/data/schema.ts
index d12c9e8f..f0325502 100644
--- a/apps/server/src/data/schema.ts
+++ b/apps/server/src/data/schema.ts
@@ -174,6 +174,10 @@ interface UploadTable {
   completed_at: ColumnType;
 }
 
+export type SelectUpload = Selectable<UploadTable>;
+export type CreateUpload = Insertable<UploadTable>;
+export type UpdateUpload = Updateable<UploadTable>;
+
 interface InteractionTable {
   user_id: ColumnType;
   node_id: ColumnType;
diff --git a/apps/server/src/jobs/clean-node-data.ts b/apps/server/src/jobs/clean-node-data.ts
new file mode 100644
index 00000000..9b35a4ca
--- /dev/null
+++ b/apps/server/src/jobs/clean-node-data.ts
@@ -0,0 +1,186 @@
+import { generateId, IdType } from '@colanode/core';
+import { DeleteObjectCommand } from '@aws-sdk/client-s3';
+
+import { database } from '@/data/database';
+import { CreateNodeTransaction, SelectUpload } from '@/data/schema';
+import { JobHandler } from '@/types/jobs';
+import { filesStorage, BUCKET_NAMES } from '@/data/storage';
+import { eventBus } from '@/lib/event-bus';
+import { createLogger } from '@/lib/logger';
+
+const BATCH_SIZE = 100;
+
+const logger = createLogger('clean-node-data');
+
+export type CleanNodeDataInput = {
+  type: 'clean_node_data';
+  nodeId: string;
+  workspaceId: string;
+};
+
+declare module '@/types/jobs' {
+  interface JobMap {
+    clean_node_data: {
+      input: CleanNodeDataInput;
+    };
+  }
+}
+
+export const cleanNodeDataHandler: JobHandler<CleanNodeDataInput> = async (
+  input
+) => {
+  logger.trace(`Cleaning node data for ${input.nodeId}`);
+
+  const deleteTransactions = await database
+    .selectFrom('node_transactions')
+    .selectAll()
+    .where('node_id', '=', input.nodeId)
+    .execute();
+
+  if (deleteTransactions.length !== 1) {
+    logger.error(`Expected 1 delete transaction for ${input.nodeId}`);
+    return;
+  }
+
+  const deleteTransaction = deleteTransactions[0];
+  if (
+    !deleteTransaction?.operation ||
+    deleteTransaction.operation !== 'delete'
+  ) {
+    logger.error(`Expected delete transaction for ${input.nodeId}`);
+    return;
+  }
+
+  const parentIds = [input.nodeId];
+  while (parentIds.length > 0) {
+    const tempParentIds = parentIds.splice(0, BATCH_SIZE);
+    const deletedNodeIds = await deleteChildren(
+      tempParentIds,
+      input.workspaceId,
+      deleteTransaction.created_by
+    );
+
+    parentIds.push(...deletedNodeIds);
+  }
+};
+
+const deleteChildren = async (
+  parentIds: string[],
+  workspaceId: string,
+  userId: string
+) => {
+  const deletedNodeIds: string[] = [];
+  let hasMore = true;
+  while (hasMore) {
+    try {
+      const descendants = await database
+        .selectFrom('nodes')
+        .selectAll()
+        .where('parent_id', 'in', parentIds)
+        .orderBy('id', 'asc')
+        .limit(BATCH_SIZE)
+        .execute();
+
+      if (descendants.length === 0) {
+        logger.trace(`No descendants found for ${parentIds}`);
+        hasMore = false;
+        break;
+      }
+
+      const fileIds: string[] = descendants
+        .filter((d) => d.type === 'file')
+        .map((d) => d.id);
+
+      const uploads: SelectUpload[] =
+        fileIds.length > 0
+          ? await database
+              .selectFrom('uploads')
+              .selectAll()
+              .where('node_id', 'in', fileIds)
+              .execute()
+          : [];
+
+      const nodeIds: string[] = descendants.map((d) => d.id);
+      const transactionsToCreate: CreateNodeTransaction[] = descendants.map(
+        (descendant) => ({
+          id: generateId(IdType.Transaction),
+          node_id: descendant.id,
+          node_type: descendant.type,
+          workspace_id: workspaceId,
+          operation: 'delete',
+          created_at: new Date(),
+          created_by: userId,
+          server_created_at: new Date(),
+        })
+      );
+      const uploadsToDelete: string[] = uploads.map((u) => u.node_id);
+
+      await database.transaction().execute(async (trx) => {
+        await trx
+          .deleteFrom('node_transactions')
+          .where('node_id', 'in', nodeIds)
+          .execute();
+
+        const createdTransactions = await trx
+          .insertInto('node_transactions')
+          .returningAll()
+          .values(transactionsToCreate)
+          .execute();
+
+        if (createdTransactions.length !== transactionsToCreate.length) {
+          throw new Error('Failed to create transactions');
+        }
+
+        if (uploadsToDelete.length > 0) {
+          await trx
+            .deleteFrom('uploads')
+            .where('node_id', 'in', uploadsToDelete)
+            .execute();
+        }
+
+        await trx.deleteFrom('nodes').where('id', 'in', nodeIds).execute();
+        await trx
+          .updateTable('collaborations')
+          .set({
+            roles: '{}',
+            updated_at: new Date(),
+          })
+          .where('node_id', 'in', nodeIds)
+          .execute();
+      });
+
+      for (const upload of uploads) {
+        const command = new DeleteObjectCommand({
+          Bucket: BUCKET_NAMES.FILES,
+          Key: upload.path,
+        });
+
+        logger.trace(
+          `Deleting file as a descendant of ${parentIds}: ${upload.path}`
+        );
+
+        await filesStorage.send(command);
+      }
+
+      for (const node of descendants) {
+        logger.trace(`Publishing node deleted event for ${node.id}`);
+
+        eventBus.publish({
+          type: 'node_deleted',
+          nodeId: node.id,
+          nodeType: node.type,
+          workspaceId: workspaceId,
+        });
+
+        deletedNodeIds.push(node.id);
+      }
+
+      hasMore = descendants.length === BATCH_SIZE;
+    } catch (error) {
+      logger.error(`Error cleaning node data for ${parentIds}: ${error}`);
+      hasMore = false;
+    }
+  }
+
+  return deletedNodeIds;
+};
diff --git a/apps/server/src/jobs/index.ts b/apps/server/src/jobs/index.ts
index 08b65c7a..6a2b03bf 100644
--- a/apps/server/src/jobs/index.ts
+++ b/apps/server/src/jobs/index.ts
@@ -1,6 +1,7 @@
+import { cleanNodeDataHandler } from '@/jobs/clean-node-data';
 import { cleanWorkspaceDataHandler } from '@/jobs/clean-workspace-data';
 import { sendEmailHandler } from '@/jobs/send-email';
-import { JobHandler , JobMap } from '@/types/jobs';
+import { JobHandler, JobMap } from '@/types/jobs';
 type JobHandlerMap = {
   [K in keyof JobMap]: JobHandler<JobMap[K]['input']>;
 };
@@ -9,4 +10,5 @@ type JobHandlerMap = {
 export const jobHandlerMap: JobHandlerMap = {
   send_email: sendEmailHandler,
   clean_workspace_data: cleanWorkspaceDataHandler,
+  clean_node_data: cleanNodeDataHandler,
 };
diff --git a/apps/server/src/services/job-service.ts b/apps/server/src/services/job-service.ts
index 422e23ed..5583a555 100644
--- a/apps/server/src/services/job-service.ts
+++ b/apps/server/src/services/job-service.ts
@@ -1,9 +1,11 @@
-import { Job, JobsOptions,Queue, Worker } from 'bullmq';
+import { Job, JobsOptions, Queue, Worker } from 'bullmq';
 
 import { redisConfig } from '@/data/redis';
 import { jobHandlerMap } from '@/jobs';
 import { JobHandler, JobInput } from '@/types/jobs';
 
+const JOBS_QUEUE_NAME = process.env.REDIS_JOBS_QUEUE_NAME ?? 'jobs';
+
 class JobService {
   private jobQueue: Queue | undefined;
   private jobWorker: Worker | undefined;
@@ -13,7 +15,7 @@ class JobService {
       return;
     }
 
-    this.jobQueue = new Queue('jobs', {
+    this.jobQueue = new Queue(JOBS_QUEUE_NAME, {
       connection: {
         host: redisConfig.host,
         password: redisConfig.password,
@@ -31,7 +33,7 @@ class JobService {
       return;
     }
 
-    this.jobWorker = new Worker('jobs', this.handleJobJob, {
+    this.jobWorker = new Worker(JOBS_QUEUE_NAME, this.handleJobJob, {
       connection: {
         host: redisConfig.host,
         password: redisConfig.password,
diff --git a/apps/server/src/services/node-service.ts b/apps/server/src/services/node-service.ts
index 6a832bf4..47ef50b8 100644
--- a/apps/server/src/services/node-service.ts
+++ b/apps/server/src/services/node-service.ts
@@ -11,6 +11,7 @@ import { decodeState, YDoc } from '@colanode/crdt';
 import { sql, Transaction } from 'kysely';
 import { cloneDeep } from 'lodash-es';
 
+import { jobService } from '@/services/job-service';
 import { database } from '@/data/database';
 import {
   CreateCollaboration,
@@ -569,6 +570,15 @@ class NodeService {
         throw new Error('Failed to create transaction');
       }
 
+      await trx
+        .updateTable('collaborations')
+        .set({
+          roles: '{}',
+          updated_at: new Date(),
+        })
+        .where('node_id', '=', input.nodeId)
+        .execute();
+
       return {
         deletedNode,
         createdTransaction,
@@ -582,6 +592,12 @@ class NodeService {
       workspaceId: workspaceUser.workspace_id,
     });
 
+    await jobService.addJob({
+      type: 'clean_node_data',
+      nodeId: input.nodeId,
+      workspaceId: workspaceUser.workspace_id,
+    });
+
     return {
       node: deletedNode,
       transaction: createdTransaction,
diff --git a/apps/server/src/services/synapse-service.ts b/apps/server/src/services/synapse-service.ts
index 8affcfba..032943d2 100644
--- a/apps/server/src/services/synapse-service.ts
+++ b/apps/server/src/services/synapse-service.ts
@@ -514,8 +514,8 @@ class SynapseService {
     if (PUBLIC_NODES.includes(event.nodeType)) {
       usersToSend = userIds;
     } else {
-      const collaborations = await database
-        .selectFrom('collaborations')
+      const revocations = await database
+        .selectFrom('collaboration_revocations')
         .selectAll()
         .where((eb) =>
           eb.and([
@@ -525,7 +525,7 @@
         )
         .execute();
 
-      usersToSend = collaborations.map((c) => c.user_id);
+      usersToSend = revocations.map((r) => r.user_id);
     }
 
     if (usersToSend.length === 0) {
@@ -541,6 +541,7 @@
       }
 
       this.sendPendingTransactions(socketConnection, userId);
+      this.sendPendingRevocations(socketConnection, userId);
     }
   }
 }