From 6ae82332c46b50466c1b9df8a066c0bd16d8a8b5 Mon Sep 17 00:00:00 2001 From: Hakan Shehu Date: Fri, 29 Nov 2024 22:46:16 +0100 Subject: [PATCH] Improve events in synapse service --- apps/server/src/services/node-service.ts | 29 ++-- apps/server/src/services/synapse-service.ts | 144 ++++++++++++++++++-- apps/server/src/types/events.ts | 30 +++- 3 files changed, 175 insertions(+), 28 deletions(-) diff --git a/apps/server/src/services/node-service.ts b/apps/server/src/services/node-service.ts index 32acfa49..cf0d5b2f 100644 --- a/apps/server/src/services/node-service.ts +++ b/apps/server/src/services/node-service.ts @@ -93,8 +93,7 @@ class NodeService { ); eventBus.publish({ - type: 'node_transaction_created', - transactionId: transactionId, + type: 'node_created', nodeId: input.nodeId, nodeType: input.attributes.type, workspaceId: input.workspaceId, @@ -232,8 +231,7 @@ class NodeService { }); eventBus.publish({ - type: 'node_transaction_created', - transactionId: transactionId, + type: 'node_updated', nodeId: input.nodeId, nodeType: node.type, workspaceId: input.workspaceId, @@ -312,8 +310,7 @@ class NodeService { ); eventBus.publish({ - type: 'node_transaction_created', - transactionId: input.id, + type: 'node_created', nodeId: input.nodeId, nodeType: attributes.type, workspaceId: context.workspaceId, @@ -465,13 +462,26 @@ class NodeService { }); eventBus.publish({ - type: 'node_transaction_created', - transactionId: input.id, + type: 'node_updated', nodeId: input.nodeId, nodeType: node.type, workspaceId: context.workspaceId, }); + const addedCollaboratorIds = Object.keys( + collaboratorChanges.addedCollaborators + ); + + if (addedCollaboratorIds.length > 0) { + for (const userId of addedCollaboratorIds) { + eventBus.publish({ + type: 'collaborator_added', + userId, + nodeId: input.nodeId, + }); + } + } + const removedCollaboratorIds = Object.keys( collaboratorChanges.removedCollaborators ); @@ -567,8 +577,7 @@ class NodeService { }); eventBus.publish({ - type: 'node_transaction_created', - transactionId: input.id, + type: 'node_deleted', nodeId: input.nodeId, nodeType: node.type, workspaceId: workspaceUser.workspace_id, diff --git a/apps/server/src/services/synapse-service.ts b/apps/server/src/services/synapse-service.ts index b9ded2ae..67e678af 100644 --- a/apps/server/src/services/synapse-service.ts +++ b/apps/server/src/services/synapse-service.ts @@ -22,7 +22,10 @@ import { eventBus } from '@/lib/event-bus'; import { CollaboratorRemovedEvent, InteractionUpdatedEvent, - NodeTransactionCreatedEvent, + NodeCreatedEvent, + NodeUpdatedEvent, + NodeDeletedEvent, + CollaboratorAddedEvent, } from '@/types/events'; interface SynapseUserCursor { @@ -50,8 +53,14 @@ class SynapseService { constructor() { eventBus.subscribe((event) => { - if (event.type === 'node_transaction_created') { - this.handleNodeTransactionCreatedEvent(event); + if (event.type === 'node_created') { + this.handleNodeCreatedEvent(event); + } else if (event.type === 'node_updated') { + this.handleNodeUpdatedEvent(event); + } else if (event.type === 'node_deleted') { + this.handleNodeDeletedEvent(event); + } else if (event.type === 'collaborator_added') { + this.handleCollaboratorAddedEvent(event); } else if (event.type === 'collaborator_removed') { this.handleCollaboratorRemovedEvent(event); } else if (event.type === 'interaction_updated') { @@ -400,9 +409,96 @@ class SynapseService { this.sendMessage(connection, message); } - private async handleNodeTransactionCreatedEvent( - event: NodeTransactionCreatedEvent - ) { + private async handleNodeCreatedEvent(event: NodeCreatedEvent) { + const userDevices = this.getPendingNodeTransactionCursors( + event.workspaceId + ); + const userIds = Array.from(userDevices.keys()); + if (userIds.length === 0) { + return; + } + + let usersToSend: string[] = []; + if (PUBLIC_NODES.includes(event.nodeType)) { + usersToSend = userIds; + } else { + const collaborations = await database + .selectFrom('collaborations') + .selectAll() + .where((eb) => + eb.and([ + eb('user_id', 'in', userIds), + eb('node_id', '=', event.nodeId), + ]) + ) + .execute(); + + usersToSend = collaborations.map((c) => c.user_id); + } + + if (usersToSend.length === 0) { + return; + } + + for (const userId of usersToSend) { + const deviceIds = userDevices.get(userId) ?? []; + for (const deviceId of deviceIds) { + const socketConnection = this.connections.get(deviceId); + if (socketConnection === undefined) { + continue; + } + + this.sendPendingTransactions(socketConnection, userId); + this.sendPendingCollaborations(socketConnection, userId); + } + } + } + + private async handleNodeUpdatedEvent(event: NodeUpdatedEvent) { + const userDevices = this.getPendingNodeTransactionCursors( + event.workspaceId + ); + const userIds = Array.from(userDevices.keys()); + if (userIds.length === 0) { + return; + } + + let usersToSend: string[] = []; + if (PUBLIC_NODES.includes(event.nodeType)) { + usersToSend = userIds; + } else { + const collaborations = await database + .selectFrom('collaborations') + .selectAll() + .where((eb) => + eb.and([ + eb('user_id', 'in', userIds), + eb('node_id', '=', event.nodeId), + ]) + ) + .execute(); + + usersToSend = collaborations.map((c) => c.user_id); + } + + if (usersToSend.length === 0) { + return; + } + + for (const userId of usersToSend) { + const deviceIds = userDevices.get(userId) ?? []; + for (const deviceId of deviceIds) { + const socketConnection = this.connections.get(deviceId); + if (socketConnection === undefined) { + continue; + } + + this.sendPendingTransactions(socketConnection, userId); + } + } + } + + private async handleNodeDeletedEvent(event: NodeDeletedEvent) { const userDevices = this.getPendingNodeTransactionCursors( event.workspaceId ); @@ -453,14 +549,6 @@ class SynapseService { return; } - const collaborations = await database - .selectFrom('collaborations') - .selectAll() - .where((eb) => - eb.and([eb('user_id', 'in', userIds), eb('node_id', '=', event.nodeId)]) - ) - .execute(); - let usersToSend: string[] = []; if (PUBLIC_NODES.includes(event.nodeType)) { usersToSend = userIds; @@ -492,6 +580,18 @@ class SynapseService { } } + private handleCollaboratorAddedEvent(event: CollaboratorAddedEvent) { + const deviceIds = this.getPendingCollaborationsCursors(event.userId); + for (const deviceId of deviceIds) { + const socketConnection = this.connections.get(deviceId); + if (socketConnection === undefined) { + continue; + } + + this.sendPendingCollaborations(socketConnection, event.userId); + } + } + private handleCollaboratorRemovedEvent(event: CollaboratorRemovedEvent) { const deviceIds = this.getPendingRevocationsCursors(event.userId); for (const deviceId of deviceIds) { @@ -544,6 +644,22 @@ class SynapseService { return userDevices; } + private getPendingCollaborationsCursors(userId: string): string[] { + const userDevices: string[] = []; + for (const connection of this.connections.values()) { + const connectionUsers = connection.collaborations.values(); + for (const user of connectionUsers) { + if (user.userId !== userId || user.syncing) { + continue; + } + + userDevices.push(connection.deviceId); + } + } + + return userDevices; + } + private getPendingRevocationsCursors(userId: string): string[] { const userDevices: string[] = []; for (const connection of this.connections.values()) { diff --git a/apps/server/src/types/events.ts b/apps/server/src/types/events.ts index 83ed2745..923778c1 100644 --- a/apps/server/src/types/events.ts +++ b/apps/server/src/types/events.ts @@ -1,13 +1,32 @@ import { NodeType } from '@colanode/core'; -export type NodeTransactionCreatedEvent = { - type: 'node_transaction_created'; - transactionId: string; +export type NodeCreatedEvent = { + type: 'node_created'; nodeId: string; nodeType: NodeType; workspaceId: string; }; +export type NodeUpdatedEvent = { + type: 'node_updated'; + nodeId: string; + nodeType: NodeType; + workspaceId: string; +}; + +export type NodeDeletedEvent = { + type: 'node_deleted'; + nodeId: string; + nodeType: NodeType; + workspaceId: string; +}; + +export type CollaboratorAddedEvent = { + type: 'collaborator_added'; + userId: string; + nodeId: string; +}; + export type CollaboratorRemovedEvent = { type: 'collaborator_removed'; userId: string; @@ -23,6 +42,9 @@ export type InteractionUpdatedEvent = { }; export type Event = - | NodeTransactionCreatedEvent + | NodeCreatedEvent + | NodeUpdatedEvent + | NodeDeletedEvent + | CollaboratorAddedEvent | CollaboratorRemovedEvent | InteractionUpdatedEvent;