From 7b699b2b1e42a96a6a5c077852dfaf137f4b4049 Mon Sep 17 00:00:00 2001 From: Hakan Shehu Date: Sun, 27 Oct 2024 19:00:41 +0100 Subject: [PATCH] Refactor events --- server/src/queues/events.ts | 75 ++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/server/src/queues/events.ts b/server/src/queues/events.ts index 90488c6b..e0d0ee28 100644 --- a/server/src/queues/events.ts +++ b/server/src/queues/events.ts @@ -57,9 +57,27 @@ const handleEventJob = async (job: Job) => { const handleNodeCreatedEvent = async ( event: NodeCreatedEvent, ): Promise => { + await createNodeUserStates(event); + await publishChange(event.id, event.workspaceId, 'node_create'); +}; + +const handleNodeUpdatedEvent = async ( + event: NodeUpdatedEvent, +): Promise => { + await checkForCollaboratorsChange(event); + await publishChange(event.id, event.workspaceId, 'node_update'); +}; + +const handleNodeDeletedEvent = async ( + event: NodeDeletedEvent, +): Promise => { + await publishChange(event.id, event.workspaceId, 'node_delete'); +}; + +const createNodeUserStates = async (event: NodeCreatedEvent): Promise => { + const userStatesToCreate: CreateNodeUserState[] = []; if (event.attributes.type === NodeTypes.User) { const userIds = await fetchWorkspaceUsers(event.workspaceId); - const userStatesToCreate: CreateNodeUserState[] = []; for (const userId of userIds) { userStatesToCreate.push({ @@ -92,46 +110,33 @@ const handleNodeCreatedEvent = async ( updated_at: null, }); } - - if (userStatesToCreate.length > 0) { - await database - .insertInto('node_user_states') - .values(userStatesToCreate) - .onConflict((cb) => cb.doNothing()) - .execute(); - } } else { const collaborators = await fetchNodeCollaborators(event.id); const collaboratorIds = collaborators.map((c) => c.collaboratorId); + for (const collaboratorId of collaboratorIds) { + userStatesToCreate.push({ + user_id: collaboratorId, + node_id: event.id, + last_seen_version_id: null, + workspace_id: event.workspaceId, + last_seen_at: null, + mentions_count: 0, + created_at: new Date(), + access_removed_at: null, + version_id: generateId(IdType.Version), + updated_at: null, + }); + } + } + + if (userStatesToCreate.length > 0) { await database .insertInto('node_user_states') - .values( - collaboratorIds.map((collaboratorId) => ({ - user_id: collaboratorId, - node_id: event.id, - last_seen_version_id: null, - workspace_id: event.workspaceId, - last_seen_at: null, - mentions_count: 0, - created_at: new Date(), - access_removed_at: null, - version_id: generateId(IdType.Version), - updated_at: null, - })), - ) + .values(userStatesToCreate) .onConflict((cb) => cb.doNothing()) .execute(); } - - await publishChange(event.id, event.workspaceId, 'node_create'); -}; - -const handleNodeUpdatedEvent = async ( - event: NodeUpdatedEvent, -): Promise => { - await checkForCollaboratorsChange(event); - await publishChange(event.id, event.workspaceId, 'node_update'); }; const checkForCollaboratorsChange = async ( @@ -242,12 +247,6 @@ const extractCollaboratorIds = (collaborators: ServerNodeAttributes) => { return Object.keys(collaborators.collaborators).sort(); }; -const handleNodeDeletedEvent = async ( - event: NodeDeletedEvent, -): Promise => { - await publishChange(event.id, event.workspaceId, 'node_delete'); -}; - const publishChange = async ( nodeId: string, workspaceId: string,