Add create and delete collaborations jobs

This commit is contained in:
Hakan Shehu
2024-11-27 12:41:34 +01:00
parent c76af35e0d
commit c320babe80
6 changed files with 291 additions and 205 deletions

View File

@@ -123,7 +123,7 @@ interface CollaborationTable {
state: ColumnType<Uint8Array, Uint8Array, Uint8Array>;
created_at: ColumnType<Date, Date, never>;
updated_at: ColumnType<Date | null, Date | null, Date>;
deleted_at: ColumnType<Date | null, Date | null, Date>;
deleted_at: ColumnType<Date | null, Date | null, Date | null>;
number: ColumnType<bigint, never, never>;
}

View File

@@ -0,0 +1,77 @@
import { database } from '@/data/database';
import { CreateCollaboration } from '@/data/schema';
import { JobHandler } from '@/types/jobs';
import { buildDefaultCollaboration } from '@/lib/collaborations';
import { NodeType } from '@colanode/core';
import { eventBus } from '@/lib/event-bus';
export type CreateCollaborationsInput = {
type: 'create_collaborations';
userId: string;
nodeId: string;
workspaceId: string;
};
declare module '@/types/jobs' {
interface JobMap {
create_collaborations: {
input: CreateCollaborationsInput;
};
}
}
export const createCollaborationsHandler: JobHandler<
CreateCollaborationsInput
> = async (input) => {
const nodeRow = await database
.selectFrom('nodes')
.where('id', '=', input.nodeId)
.select(['id', 'type'])
.executeTakeFirst();
if (!nodeRow) {
return;
}
const descendants = await database
.selectFrom('node_paths')
.where('ancestor_id', '=', input.nodeId)
.selectAll()
.execute();
if (descendants.length === 0) {
return;
}
const collaborationsToCreate: CreateCollaboration[] = [];
for (const descendant of descendants) {
collaborationsToCreate.push(
buildDefaultCollaboration(
input.userId,
descendant.descendant_id,
nodeRow.type as NodeType,
input.workspaceId
)
);
}
const createdCollaborations = await database
.insertInto('collaborations')
.returning(['node_id', 'user_id', 'workspace_id'])
.values(collaborationsToCreate)
.onConflict((b) =>
b.columns(['node_id', 'user_id']).doUpdateSet({
deleted_at: null,
})
)
.execute();
for (const createdCollaboration of createdCollaborations) {
eventBus.publish({
type: 'collaboration_created',
nodeId: createdCollaboration.node_id,
userId: createdCollaboration.user_id,
workspaceId: createdCollaboration.workspace_id,
});
}
};

View File

@@ -0,0 +1,102 @@
import { database } from '@/data/database';
import { eventBus } from '@/lib/event-bus';
import { JobHandler } from '@/types/jobs';
import { extractNodeCollaborators, NodeAttributes } from '@colanode/core';
export type DeleteCollaborationsInput = {
type: 'delete_collaborations';
nodeId: string;
userId: string;
workspaceId: string;
};
declare module '@/types/jobs' {
interface JobMap {
delete_collaborations: {
input: DeleteCollaborationsInput;
};
}
}
export const deleteCollaborationsHandler: JobHandler<
DeleteCollaborationsInput
> = async (input) => {
const updatedCollaboration = await database
.updateTable('collaborations')
.returning(['node_id', 'user_id'])
.where('node_id', '=', input.nodeId)
.where('user_id', '=', input.userId)
.set({ deleted_at: new Date() })
.executeTakeFirst();
if (updatedCollaboration) {
eventBus.publish({
type: 'collaboration_updated',
nodeId: updatedCollaboration.node_id,
userId: updatedCollaboration.user_id,
workspaceId: input.workspaceId,
});
}
await checkChildCollaborations(input.nodeId, input.userId, input.workspaceId);
};
const checkChildCollaborations = async (
parentId: string,
userId: string,
workspaceId: string
) => {
let lastId = parentId;
const parentIdsToCheck: string[] = [];
const nodeIdsToDelete: string[] = [];
while (true) {
const children = await database
.selectFrom('nodes')
.select(['id', 'type', 'attributes'])
.where('parent_id', '=', parentId)
.where('id', '>', lastId)
.orderBy('id', 'asc')
.limit(100)
.execute();
for (const child of children) {
const collaborators = extractNodeCollaborators(child.attributes);
if (!collaborators[userId]) {
nodeIdsToDelete.push(child.id);
}
parentIdsToCheck.push(child.id);
lastId = child.id;
}
if (children.length < 100) {
break;
}
}
if (nodeIdsToDelete.length > 0) {
const updatedCollaborations = await database
.updateTable('collaborations')
.returning(['node_id', 'user_id'])
.where('node_id', 'in', nodeIdsToDelete)
.where('user_id', '=', userId)
.set({ deleted_at: new Date() })
.execute();
for (const updatedCollaboration of updatedCollaborations) {
eventBus.publish({
type: 'collaboration_updated',
nodeId: updatedCollaboration.node_id,
userId: updatedCollaboration.user_id,
workspaceId,
});
}
}
if (parentIdsToCheck.length > 0) {
for (const parentId of parentIdsToCheck) {
await checkChildCollaborations(parentId, userId, workspaceId);
}
}
};

View File

@@ -3,6 +3,8 @@ import { JobMap } from '@/types/jobs';
import { sendEmailHandler } from '@/jobs/send-email';
import { cleanWorkspaceDataHandler } from '@/jobs/clean-workspace-data';
import { createCollaborationsHandler } from '@/jobs/create-collaborations';
import { deleteCollaborationsHandler } from '@/jobs/delete-collaborations';
type JobHandlerMap = {
[K in keyof JobMap]: JobHandler<JobMap[K]['input']>;
@@ -11,4 +13,6 @@ type JobHandlerMap = {
export const jobHandlerMap: JobHandlerMap = {
send_email: sendEmailHandler,
clean_workspace_data: cleanWorkspaceDataHandler,
create_collaborations: createCollaborationsHandler,
delete_collaborations: deleteCollaborationsHandler,
};

View File

@@ -33,7 +33,8 @@ import {
} from '@/types/nodes';
import { buildDefaultCollaboration } from '@/lib/collaborations';
import { eventBus } from '@/lib/event-bus';
import { logService } from './log';
import { logService } from '@/services/log';
import { jobService } from '@/services/job-service';
const UPDATE_RETRIES_LIMIT = 10;
@@ -186,103 +187,58 @@ class NodeService {
const date = new Date();
const transactionId = generateId(IdType.Transaction);
const { removedCollaborators, addedCollaborators } =
const { addedCollaborators, removedCollaborators } =
this.checkCollaboratorChanges(
node,
ancestors.filter((a) => a.id !== input.nodeId),
attributes
);
const collaborations: CreateCollaboration[] = addedCollaborators.map(
(collaboratorId) =>
buildDefaultCollaboration(
collaboratorId,
input.nodeId,
attributes.type,
input.workspaceId
)
);
try {
const {
updatedNode,
createdTransaction,
createdCollaborations,
updatedCollaborations,
} = await database.transaction().execute(async (trx) => {
const updatedNode = await trx
.updateTable('nodes')
.returningAll()
.set({
attributes: attributesJson,
updated_at: date,
updated_by: input.userId,
transaction_id: transactionId,
})
.where('id', '=', input.nodeId)
.where('transaction_id', '=', node.transactionId)
.executeTakeFirst();
if (!updatedNode) {
throw new Error('Failed to update node');
}
const createdTransaction = await trx
.insertInto('node_transactions')
.returningAll()
.values({
id: transactionId,
node_id: input.nodeId,
workspace_id: input.workspaceId,
type: 'update',
data: update,
created_at: date,
created_by: input.userId,
server_created_at: date,
})
.executeTakeFirst();
if (!createdTransaction) {
throw new Error('Failed to create transaction');
}
let createdCollaborations: SelectCollaboration[] = [];
let updatedCollaborations: SelectCollaboration[] = [];
if (collaborations.length > 0) {
createdCollaborations = await trx
.insertInto('collaborations')
.returningAll()
.values(collaborations)
.execute();
if (createdCollaborations.length !== collaborations.length) {
throw new Error('Failed to create collaborations');
}
}
if (removedCollaborators.length > 0) {
updatedCollaborations = await trx
.updateTable('collaborations')
const { updatedNode, createdTransaction } = await database
.transaction()
.execute(async (trx) => {
const updatedNode = await trx
.updateTable('nodes')
.returningAll()
.set({
deleted_at: date,
attributes: attributesJson,
updated_at: date,
updated_by: input.userId,
transaction_id: transactionId,
})
.where('node_id', '=', input.nodeId)
.where('user_id', 'in', removedCollaborators)
.execute();
.where('id', '=', input.nodeId)
.where('transaction_id', '=', node.transactionId)
.executeTakeFirst();
if (updatedCollaborations.length !== removedCollaborators.length) {
throw new Error('Failed to remove collaborations');
if (!updatedNode) {
throw new Error('Failed to update node');
}
}
return {
updatedNode,
createdTransaction,
createdCollaborations,
updatedCollaborations,
};
});
const createdTransaction = await trx
.insertInto('node_transactions')
.returningAll()
.values({
id: transactionId,
node_id: input.nodeId,
workspace_id: input.workspaceId,
type: 'update',
data: update,
created_at: date,
created_by: input.userId,
server_created_at: date,
})
.executeTakeFirst();
if (!createdTransaction) {
throw new Error('Failed to create transaction');
}
return {
updatedNode,
createdTransaction,
};
});
eventBus.publish({
type: 'node_transaction_created',
@@ -291,21 +247,21 @@ class NodeService {
workspaceId: input.workspaceId,
});
for (const collaboration of createdCollaborations) {
eventBus.publish({
type: 'collaboration_created',
userId: collaboration.user_id,
nodeId: collaboration.node_id,
workspaceId: collaboration.workspace_id,
for (const addedCollaborator of addedCollaborators) {
jobService.addJob({
type: 'create_collaborations',
nodeId: input.nodeId,
userId: addedCollaborator,
workspaceId: input.workspaceId,
});
}
for (const collaboration of updatedCollaborations) {
eventBus.publish({
type: 'collaboration_updated',
userId: collaboration.user_id,
nodeId: collaboration.node_id,
workspaceId: collaboration.workspace_id,
for (const removedCollaborator of removedCollaborators) {
jobService.addJob({
type: 'delete_collaborations',
nodeId: input.nodeId,
userId: removedCollaborator,
workspaceId: input.workspaceId,
});
}
@@ -314,8 +270,6 @@ class NodeService {
output: {
node: updatedNode,
transaction: createdTransaction,
createdCollaborations,
updatedCollaborations,
},
};
} catch (error) {
@@ -491,106 +445,61 @@ class NodeService {
return { type: 'error', output: null };
}
const { removedCollaborators, addedCollaborators } =
const { addedCollaborators, removedCollaborators } =
this.checkCollaboratorChanges(
node,
ancestors.filter((a) => a.id !== input.nodeId),
attributes
);
const collaborations: CreateCollaboration[] = addedCollaborators.map(
(collaboratorId) =>
buildDefaultCollaboration(
collaboratorId,
input.nodeId,
attributes.type,
context.workspaceId
)
);
try {
const {
updatedNode,
createdTransaction,
createdCollaborations,
updatedCollaborations,
} = await database.transaction().execute(async (trx) => {
const updatedNode = await trx
.updateTable('nodes')
.returningAll()
.set({
attributes: attributesJson,
updated_at: input.createdAt,
updated_by: input.userId,
transaction_id: input.id,
})
.where('id', '=', input.nodeId)
.where('transaction_id', '=', node.transactionId)
.executeTakeFirst();
if (!updatedNode) {
throw new Error('Failed to update node');
}
const createdTransaction = await trx
.insertInto('node_transactions')
.returningAll()
.values({
id: input.id,
node_id: input.nodeId,
workspace_id: context.workspaceId,
type: 'update',
data:
typeof input.data === 'string'
? decodeState(input.data)
: input.data,
created_at: input.createdAt,
created_by: input.userId,
server_created_at: new Date(),
})
.executeTakeFirst();
if (!createdTransaction) {
throw new Error('Failed to create transaction');
}
let createdCollaborations: SelectCollaboration[] = [];
let updatedCollaborations: SelectCollaboration[] = [];
if (collaborations.length > 0) {
createdCollaborations = await trx
.insertInto('collaborations')
.returningAll()
.values(collaborations)
.execute();
if (createdCollaborations.length !== collaborations.length) {
throw new Error('Failed to create collaborations');
}
}
if (removedCollaborators.length > 0) {
updatedCollaborations = await trx
.updateTable('collaborations')
const { updatedNode, createdTransaction } = await database
.transaction()
.execute(async (trx) => {
const updatedNode = await trx
.updateTable('nodes')
.returningAll()
.set({
deleted_at: input.createdAt,
attributes: attributesJson,
updated_at: input.createdAt,
updated_by: input.userId,
transaction_id: input.id,
})
.where('node_id', '=', input.nodeId)
.where('user_id', 'in', removedCollaborators)
.execute();
.where('id', '=', input.nodeId)
.where('transaction_id', '=', node.transactionId)
.executeTakeFirst();
if (updatedCollaborations.length !== removedCollaborators.length) {
throw new Error('Failed to remove collaborations');
if (!updatedNode) {
throw new Error('Failed to update node');
}
}
return {
updatedNode,
createdTransaction,
createdCollaborations,
updatedCollaborations,
};
});
const createdTransaction = await trx
.insertInto('node_transactions')
.returningAll()
.values({
id: input.id,
node_id: input.nodeId,
workspace_id: context.workspaceId,
type: 'update',
data:
typeof input.data === 'string'
? decodeState(input.data)
: input.data,
created_at: input.createdAt,
created_by: input.userId,
server_created_at: new Date(),
})
.executeTakeFirst();
if (!createdTransaction) {
throw new Error('Failed to create transaction');
}
return {
updatedNode,
createdTransaction,
};
});
eventBus.publish({
type: 'node_transaction_created',
@@ -599,21 +508,21 @@ class NodeService {
workspaceId: context.workspaceId,
});
for (const collaboration of createdCollaborations) {
eventBus.publish({
type: 'collaboration_created',
userId: collaboration.user_id,
nodeId: collaboration.node_id,
workspaceId: collaboration.workspace_id,
for (const addedCollaborator of addedCollaborators) {
jobService.addJob({
type: 'create_collaborations',
nodeId: input.nodeId,
userId: addedCollaborator,
workspaceId: context.workspaceId,
});
}
for (const collaboration of updatedCollaborations) {
eventBus.publish({
type: 'collaboration_updated',
userId: collaboration.user_id,
nodeId: collaboration.node_id,
workspaceId: collaboration.workspace_id,
for (const removedCollaborator of removedCollaborators) {
jobService.addJob({
type: 'delete_collaborations',
nodeId: input.nodeId,
userId: removedCollaborator,
workspaceId: context.workspaceId,
});
}
@@ -622,8 +531,6 @@ class NodeService {
output: {
node: updatedNode,
transaction: createdTransaction,
createdCollaborations,
updatedCollaborations,
},
};
} catch (error) {

View File

@@ -35,8 +35,6 @@ export type UpdateNodeInput = {
export type UpdateNodeOutput = {
node: SelectNode;
transaction: SelectNodeTransaction;
createdCollaborations: SelectCollaboration[];
updatedCollaborations: SelectCollaboration[];
};
export type ApplyNodeCreateTransactionInput = {
@@ -63,8 +61,6 @@ export type ApplyNodeUpdateTransactionInput = {
export type ApplyNodeUpdateTransactionOutput = {
node: SelectNode;
transaction: SelectNodeTransaction;
createdCollaborations: SelectCollaboration[];
updatedCollaborations: SelectCollaboration[];
};
export type ApplyNodeDeleteTransactionInput = {