diff --git a/apps/server/src/data/schema.ts b/apps/server/src/data/schema.ts index d741acc9..ec164dc2 100644 --- a/apps/server/src/data/schema.ts +++ b/apps/server/src/data/schema.ts @@ -123,7 +123,7 @@ interface CollaborationTable { state: ColumnType; created_at: ColumnType; updated_at: ColumnType; - deleted_at: ColumnType; + deleted_at: ColumnType; number: ColumnType; } diff --git a/apps/server/src/jobs/create-collaborations.ts b/apps/server/src/jobs/create-collaborations.ts new file mode 100644 index 00000000..2aa66007 --- /dev/null +++ b/apps/server/src/jobs/create-collaborations.ts @@ -0,0 +1,77 @@ +import { database } from '@/data/database'; +import { CreateCollaboration } from '@/data/schema'; +import { JobHandler } from '@/types/jobs'; +import { buildDefaultCollaboration } from '@/lib/collaborations'; +import { NodeType } from '@colanode/core'; +import { eventBus } from '@/lib/event-bus'; + +export type CreateCollaborationsInput = { + type: 'create_collaborations'; + userId: string; + nodeId: string; + workspaceId: string; +}; + +declare module '@/types/jobs' { + interface JobMap { + create_collaborations: { + input: CreateCollaborationsInput; + }; + } +} + +export const createCollaborationsHandler: JobHandler< + CreateCollaborationsInput +> = async (input) => { + const nodeRow = await database + .selectFrom('nodes') + .where('id', '=', input.nodeId) + .select(['id', 'type']) + .executeTakeFirst(); + + if (!nodeRow) { + return; + } + + const descendants = await database + .selectFrom('node_paths') + .where('ancestor_id', '=', input.nodeId) + .selectAll() + .execute(); + + if (descendants.length === 0) { + return; + } + + const collaborationsToCreate: CreateCollaboration[] = []; + for (const descendant of descendants) { + collaborationsToCreate.push( + buildDefaultCollaboration( + input.userId, + descendant.descendant_id, + nodeRow.type as NodeType, + input.workspaceId + ) + ); + } + + const createdCollaborations = await database + .insertInto('collaborations') + .returning(['node_id', 'user_id', 'workspace_id']) + .values(collaborationsToCreate) + .onConflict((b) => + b.columns(['node_id', 'user_id']).doUpdateSet({ + deleted_at: null, + }) + ) + .execute(); + + for (const createdCollaboration of createdCollaborations) { + eventBus.publish({ + type: 'collaboration_created', + nodeId: createdCollaboration.node_id, + userId: createdCollaboration.user_id, + workspaceId: createdCollaboration.workspace_id, + }); + } +}; diff --git a/apps/server/src/jobs/delete-collaborations.ts b/apps/server/src/jobs/delete-collaborations.ts new file mode 100644 index 00000000..255961ed --- /dev/null +++ b/apps/server/src/jobs/delete-collaborations.ts @@ -0,0 +1,102 @@ +import { database } from '@/data/database'; +import { eventBus } from '@/lib/event-bus'; +import { JobHandler } from '@/types/jobs'; +import { extractNodeCollaborators, NodeAttributes } from '@colanode/core'; + +export type DeleteCollaborationsInput = { + type: 'delete_collaborations'; + nodeId: string; + userId: string; + workspaceId: string; +}; + +declare module '@/types/jobs' { + interface JobMap { + delete_collaborations: { + input: DeleteCollaborationsInput; + }; + } +} + +export const deleteCollaborationsHandler: JobHandler< + DeleteCollaborationsInput +> = async (input) => { + const updatedCollaboration = await database + .updateTable('collaborations') + .returning(['node_id', 'user_id']) + .where('node_id', '=', input.nodeId) + .where('user_id', '=', input.userId) + .set({ deleted_at: new Date() }) + .executeTakeFirst(); + + if (updatedCollaboration) { + eventBus.publish({ + type: 'collaboration_updated', + nodeId: updatedCollaboration.node_id, + userId: updatedCollaboration.user_id, + workspaceId: input.workspaceId, + }); + } + + await checkChildCollaborations(input.nodeId, input.userId, input.workspaceId); +}; + +const checkChildCollaborations = async ( + parentId: string, + userId: string, + workspaceId: string +) => { + let lastId = parentId; + + const parentIdsToCheck: string[] = []; + const nodeIdsToDelete: string[] = []; + while (true) { + const children = await database + .selectFrom('nodes') + .select(['id', 'type', 'attributes']) + .where('parent_id', '=', parentId) + .where('id', '>', lastId) + .orderBy('id', 'asc') + .limit(100) + .execute(); + + for (const child of children) { + const collaborators = extractNodeCollaborators(child.attributes); + if (!collaborators[userId]) { + nodeIdsToDelete.push(child.id); + } + + parentIdsToCheck.push(child.id); + lastId = child.id; + } + + if (children.length < 100) { + break; + } + } + + if (nodeIdsToDelete.length > 0) { + const updatedCollaborations = await database + .updateTable('collaborations') + .returning(['node_id', 'user_id']) + .where('node_id', 'in', nodeIdsToDelete) + .where('user_id', '=', userId) + .set({ deleted_at: new Date() }) + .execute(); + + for (const updatedCollaboration of updatedCollaborations) { + eventBus.publish({ + type: 'collaboration_updated', + nodeId: updatedCollaboration.node_id, + userId: updatedCollaboration.user_id, + workspaceId, + }); + } + } + + if (parentIdsToCheck.length > 0) { + for (const parentId of parentIdsToCheck) { + await checkChildCollaborations(parentId, userId, workspaceId); + } + } +}; diff --git a/apps/server/src/jobs/index.ts b/apps/server/src/jobs/index.ts index 1cbaa530..13d79281 100644 --- a/apps/server/src/jobs/index.ts +++ b/apps/server/src/jobs/index.ts @@ -3,6 +3,8 @@ import { JobMap } from '@/types/jobs'; import { sendEmailHandler } from '@/jobs/send-email'; import { cleanWorkspaceDataHandler } from '@/jobs/clean-workspace-data'; +import { createCollaborationsHandler } from '@/jobs/create-collaborations'; +import { deleteCollaborationsHandler } from '@/jobs/delete-collaborations'; type JobHandlerMap = { [K in keyof JobMap]: JobHandler; @@ -11,4 +13,6 @@ type JobHandlerMap = { export const jobHandlerMap: JobHandlerMap = { send_email: sendEmailHandler, clean_workspace_data: cleanWorkspaceDataHandler, + create_collaborations: createCollaborationsHandler, + delete_collaborations: deleteCollaborationsHandler, }; diff --git a/apps/server/src/services/node-service.ts b/apps/server/src/services/node-service.ts index 6a35e04f..fff44089 100644 --- a/apps/server/src/services/node-service.ts +++ b/apps/server/src/services/node-service.ts @@ -33,7 +33,8 @@ import { } from '@/types/nodes'; import { buildDefaultCollaboration } from '@/lib/collaborations'; import { eventBus } from '@/lib/event-bus'; -import { logService } from './log'; +import { logService } from '@/services/log'; +import { jobService } from '@/services/job-service'; const UPDATE_RETRIES_LIMIT = 10; @@ -186,103 +187,58 @@ class NodeService { const date = new Date(); const transactionId = generateId(IdType.Transaction); - const { removedCollaborators, addedCollaborators } = + const { addedCollaborators, removedCollaborators } = this.checkCollaboratorChanges( node, ancestors.filter((a) => a.id !== input.nodeId), attributes ); - const collaborations: CreateCollaboration[] = addedCollaborators.map( - (collaboratorId) => - buildDefaultCollaboration( - collaboratorId, - input.nodeId, - attributes.type, - input.workspaceId - ) - ); - try { - const { - updatedNode, - createdTransaction, - createdCollaborations, - updatedCollaborations, - } = await database.transaction().execute(async (trx) => { - const updatedNode = await trx - .updateTable('nodes') - .returningAll() - .set({ - attributes: attributesJson, - updated_at: date, - updated_by: input.userId, - transaction_id: transactionId, - }) - .where('id', '=', input.nodeId) - .where('transaction_id', '=', node.transactionId) - .executeTakeFirst(); - - if (!updatedNode) { - throw new Error('Failed to update node'); - } - - const createdTransaction = await trx - .insertInto('node_transactions') - .returningAll() - .values({ - id: transactionId, - node_id: input.nodeId, - workspace_id: input.workspaceId, - type: 'update', - data: update, - created_at: date, - created_by: input.userId, - server_created_at: date, - }) - .executeTakeFirst(); - - if (!createdTransaction) { - throw new Error('Failed to create transaction'); - } - - let createdCollaborations: SelectCollaboration[] = []; - let updatedCollaborations: SelectCollaboration[] = []; - if (collaborations.length > 0) { - createdCollaborations = await trx - .insertInto('collaborations') - .returningAll() - .values(collaborations) - .execute(); - - if (createdCollaborations.length !== collaborations.length) { - throw new Error('Failed to create collaborations'); - } - } - - if (removedCollaborators.length > 0) { - updatedCollaborations = await trx - .updateTable('collaborations') + const { updatedNode, createdTransaction } = await database + .transaction() + .execute(async (trx) => { + const updatedNode = await trx + .updateTable('nodes') .returningAll() .set({ - deleted_at: date, + attributes: attributesJson, + updated_at: date, + updated_by: input.userId, + transaction_id: transactionId, }) - .where('node_id', '=', input.nodeId) - .where('user_id', 'in', removedCollaborators) - .execute(); + .where('id', '=', input.nodeId) + .where('transaction_id', '=', node.transactionId) + .executeTakeFirst(); - if (updatedCollaborations.length !== removedCollaborators.length) { - throw new Error('Failed to remove collaborations'); + if (!updatedNode) { + throw new Error('Failed to update node'); } - } - return { - updatedNode, - createdTransaction, - createdCollaborations, - updatedCollaborations, - }; - }); + const createdTransaction = await trx + .insertInto('node_transactions') + .returningAll() + .values({ + id: transactionId, + node_id: input.nodeId, + workspace_id: input.workspaceId, + type: 'update', + data: update, + created_at: date, + created_by: input.userId, + server_created_at: date, + }) + .executeTakeFirst(); + + if (!createdTransaction) { + throw new Error('Failed to create transaction'); + } + + return { + updatedNode, + createdTransaction, + }; + }); eventBus.publish({ type: 'node_transaction_created', @@ -291,21 +247,21 @@ class NodeService { workspaceId: input.workspaceId, }); - for (const collaboration of createdCollaborations) { - eventBus.publish({ - type: 'collaboration_created', - userId: collaboration.user_id, - nodeId: collaboration.node_id, - workspaceId: collaboration.workspace_id, + for (const addedCollaborator of addedCollaborators) { + jobService.addJob({ + type: 'create_collaborations', + nodeId: input.nodeId, + userId: addedCollaborator, + workspaceId: input.workspaceId, }); } - for (const collaboration of updatedCollaborations) { - eventBus.publish({ - type: 'collaboration_updated', - userId: collaboration.user_id, - nodeId: collaboration.node_id, - workspaceId: collaboration.workspace_id, + for (const removedCollaborator of removedCollaborators) { + jobService.addJob({ + type: 'delete_collaborations', + nodeId: input.nodeId, + userId: removedCollaborator, + workspaceId: input.workspaceId, }); } @@ -314,8 +270,6 @@ class NodeService { output: { node: updatedNode, transaction: createdTransaction, - createdCollaborations, - updatedCollaborations, }, }; } catch (error) { @@ -491,106 +445,61 @@ class NodeService { return { type: 'error', output: null }; } - const { removedCollaborators, addedCollaborators } = + const { addedCollaborators, removedCollaborators } = this.checkCollaboratorChanges( node, ancestors.filter((a) => a.id !== input.nodeId), attributes ); - const collaborations: CreateCollaboration[] = addedCollaborators.map( - (collaboratorId) => - buildDefaultCollaboration( - collaboratorId, - input.nodeId, - attributes.type, - context.workspaceId - ) - ); - try { - const { - updatedNode, - createdTransaction, - createdCollaborations, - updatedCollaborations, - } = await database.transaction().execute(async (trx) => { - const updatedNode = await trx - .updateTable('nodes') - .returningAll() - .set({ - attributes: attributesJson, - updated_at: input.createdAt, - updated_by: input.userId, - transaction_id: input.id, - }) - .where('id', '=', input.nodeId) - .where('transaction_id', '=', node.transactionId) - .executeTakeFirst(); - - if (!updatedNode) { - throw new Error('Failed to update node'); - } - - const createdTransaction = await trx - .insertInto('node_transactions') - .returningAll() - .values({ - id: input.id, - node_id: input.nodeId, - workspace_id: context.workspaceId, - type: 'update', - data: - typeof input.data === 'string' - ? decodeState(input.data) - : input.data, - created_at: input.createdAt, - created_by: input.userId, - server_created_at: new Date(), - }) - .executeTakeFirst(); - - if (!createdTransaction) { - throw new Error('Failed to create transaction'); - } - - let createdCollaborations: SelectCollaboration[] = []; - let updatedCollaborations: SelectCollaboration[] = []; - if (collaborations.length > 0) { - createdCollaborations = await trx - .insertInto('collaborations') - .returningAll() - .values(collaborations) - .execute(); - - if (createdCollaborations.length !== collaborations.length) { - throw new Error('Failed to create collaborations'); - } - } - - if (removedCollaborators.length > 0) { - updatedCollaborations = await trx - .updateTable('collaborations') + const { updatedNode, createdTransaction } = await database + .transaction() + .execute(async (trx) => { + const updatedNode = await trx + .updateTable('nodes') .returningAll() .set({ - deleted_at: input.createdAt, + attributes: attributesJson, + updated_at: input.createdAt, + updated_by: input.userId, + transaction_id: input.id, }) - .where('node_id', '=', input.nodeId) - .where('user_id', 'in', removedCollaborators) - .execute(); + .where('id', '=', input.nodeId) + .where('transaction_id', '=', node.transactionId) + .executeTakeFirst(); - if (updatedCollaborations.length !== removedCollaborators.length) { - throw new Error('Failed to remove collaborations'); + if (!updatedNode) { + throw new Error('Failed to update node'); } - } - return { - updatedNode, - createdTransaction, - createdCollaborations, - updatedCollaborations, - }; - }); + const createdTransaction = await trx + .insertInto('node_transactions') + .returningAll() + .values({ + id: input.id, + node_id: input.nodeId, + workspace_id: context.workspaceId, + type: 'update', + data: + typeof input.data === 'string' + ? decodeState(input.data) + : input.data, + created_at: input.createdAt, + created_by: input.userId, + server_created_at: new Date(), + }) + .executeTakeFirst(); + + if (!createdTransaction) { + throw new Error('Failed to create transaction'); + } + + return { + updatedNode, + createdTransaction, + }; + }); eventBus.publish({ type: 'node_transaction_created', @@ -599,21 +508,21 @@ class NodeService { workspaceId: context.workspaceId, }); - for (const collaboration of createdCollaborations) { - eventBus.publish({ - type: 'collaboration_created', - userId: collaboration.user_id, - nodeId: collaboration.node_id, - workspaceId: collaboration.workspace_id, + for (const addedCollaborator of addedCollaborators) { + jobService.addJob({ + type: 'create_collaborations', + nodeId: input.nodeId, + userId: addedCollaborator, + workspaceId: context.workspaceId, }); } - for (const collaboration of updatedCollaborations) { - eventBus.publish({ - type: 'collaboration_updated', - userId: collaboration.user_id, - nodeId: collaboration.node_id, - workspaceId: collaboration.workspace_id, + for (const removedCollaborator of removedCollaborators) { + jobService.addJob({ + type: 'delete_collaborations', + nodeId: input.nodeId, + userId: removedCollaborator, + workspaceId: context.workspaceId, }); } @@ -622,8 +531,6 @@ class NodeService { output: { node: updatedNode, transaction: createdTransaction, - createdCollaborations, - updatedCollaborations, }, }; } catch (error) { diff --git a/apps/server/src/types/nodes.ts b/apps/server/src/types/nodes.ts index 244a4242..7e290198 100644 --- a/apps/server/src/types/nodes.ts +++ b/apps/server/src/types/nodes.ts @@ -35,8 +35,6 @@ export type UpdateNodeInput = { export type UpdateNodeOutput = { node: SelectNode; transaction: SelectNodeTransaction; - createdCollaborations: SelectCollaboration[]; - updatedCollaborations: SelectCollaboration[]; }; export type ApplyNodeCreateTransactionInput = { @@ -63,8 +61,6 @@ export type ApplyNodeUpdateTransactionInput = { export type ApplyNodeUpdateTransactionOutput = { node: SelectNode; transaction: SelectNodeTransaction; - createdCollaborations: SelectCollaboration[]; - updatedCollaborations: SelectCollaboration[]; }; export type ApplyNodeDeleteTransactionInput = {