Improve events in synapse service

This commit is contained in:
Hakan Shehu
2024-11-29 22:46:16 +01:00
parent 16fdbbd71f
commit 6ae82332c4
3 changed files with 175 additions and 28 deletions

View File

@@ -93,8 +93,7 @@ class NodeService {
);
eventBus.publish({
type: 'node_transaction_created',
transactionId: transactionId,
type: 'node_created',
nodeId: input.nodeId,
nodeType: input.attributes.type,
workspaceId: input.workspaceId,
@@ -232,8 +231,7 @@ class NodeService {
});
eventBus.publish({
type: 'node_transaction_created',
transactionId: transactionId,
type: 'node_updated',
nodeId: input.nodeId,
nodeType: node.type,
workspaceId: input.workspaceId,
@@ -312,8 +310,7 @@ class NodeService {
);
eventBus.publish({
type: 'node_transaction_created',
transactionId: input.id,
type: 'node_created',
nodeId: input.nodeId,
nodeType: attributes.type,
workspaceId: context.workspaceId,
@@ -465,13 +462,26 @@ class NodeService {
});
eventBus.publish({
type: 'node_transaction_created',
transactionId: input.id,
type: 'node_updated',
nodeId: input.nodeId,
nodeType: node.type,
workspaceId: context.workspaceId,
});
const addedCollaboratorIds = Object.keys(
collaboratorChanges.addedCollaborators
);
if (addedCollaboratorIds.length > 0) {
for (const userId of addedCollaboratorIds) {
eventBus.publish({
type: 'collaborator_added',
userId,
nodeId: input.nodeId,
});
}
}
const removedCollaboratorIds = Object.keys(
collaboratorChanges.removedCollaborators
);
@@ -567,8 +577,7 @@ class NodeService {
});
eventBus.publish({
type: 'node_transaction_created',
transactionId: input.id,
type: 'node_deleted',
nodeId: input.nodeId,
nodeType: node.type,
workspaceId: workspaceUser.workspace_id,

View File

@@ -22,7 +22,10 @@ import { eventBus } from '@/lib/event-bus';
import {
CollaboratorRemovedEvent,
InteractionUpdatedEvent,
NodeTransactionCreatedEvent,
NodeCreatedEvent,
NodeUpdatedEvent,
NodeDeletedEvent,
CollaboratorAddedEvent,
} from '@/types/events';
interface SynapseUserCursor {
@@ -50,8 +53,14 @@ class SynapseService {
constructor() {
eventBus.subscribe((event) => {
if (event.type === 'node_transaction_created') {
this.handleNodeTransactionCreatedEvent(event);
if (event.type === 'node_created') {
this.handleNodeCreatedEvent(event);
} else if (event.type === 'node_updated') {
this.handleNodeUpdatedEvent(event);
} else if (event.type === 'node_deleted') {
this.handleNodeDeletedEvent(event);
} else if (event.type === 'collaborator_added') {
this.handleCollaboratorAddedEvent(event);
} else if (event.type === 'collaborator_removed') {
this.handleCollaboratorRemovedEvent(event);
} else if (event.type === 'interaction_updated') {
@@ -400,9 +409,96 @@ class SynapseService {
this.sendMessage(connection, message);
}
private async handleNodeTransactionCreatedEvent(
event: NodeTransactionCreatedEvent
) {
private async handleNodeCreatedEvent(event: NodeCreatedEvent) {
const userDevices = this.getPendingNodeTransactionCursors(
event.workspaceId
);
const userIds = Array.from(userDevices.keys());
if (userIds.length === 0) {
return;
}
let usersToSend: string[] = [];
if (PUBLIC_NODES.includes(event.nodeType)) {
usersToSend = userIds;
} else {
const collaborations = await database
.selectFrom('collaborations')
.selectAll()
.where((eb) =>
eb.and([
eb('user_id', 'in', userIds),
eb('node_id', '=', event.nodeId),
])
)
.execute();
usersToSend = collaborations.map((c) => c.user_id);
}
if (usersToSend.length === 0) {
return;
}
for (const userId of usersToSend) {
const deviceIds = userDevices.get(userId) ?? [];
for (const deviceId of deviceIds) {
const socketConnection = this.connections.get(deviceId);
if (socketConnection === undefined) {
continue;
}
this.sendPendingTransactions(socketConnection, userId);
this.sendPendingCollaborations(socketConnection, userId);
}
}
}
private async handleNodeUpdatedEvent(event: NodeUpdatedEvent) {
const userDevices = this.getPendingNodeTransactionCursors(
event.workspaceId
);
const userIds = Array.from(userDevices.keys());
if (userIds.length === 0) {
return;
}
let usersToSend: string[] = [];
if (PUBLIC_NODES.includes(event.nodeType)) {
usersToSend = userIds;
} else {
const collaborations = await database
.selectFrom('collaborations')
.selectAll()
.where((eb) =>
eb.and([
eb('user_id', 'in', userIds),
eb('node_id', '=', event.nodeId),
])
)
.execute();
usersToSend = collaborations.map((c) => c.user_id);
}
if (usersToSend.length === 0) {
return;
}
for (const userId of usersToSend) {
const deviceIds = userDevices.get(userId) ?? [];
for (const deviceId of deviceIds) {
const socketConnection = this.connections.get(deviceId);
if (socketConnection === undefined) {
continue;
}
this.sendPendingTransactions(socketConnection, userId);
}
}
}
private async handleNodeDeletedEvent(event: NodeDeletedEvent) {
const userDevices = this.getPendingNodeTransactionCursors(
event.workspaceId
);
@@ -453,14 +549,6 @@ class SynapseService {
return;
}
const collaborations = await database
.selectFrom('collaborations')
.selectAll()
.where((eb) =>
eb.and([eb('user_id', 'in', userIds), eb('node_id', '=', event.nodeId)])
)
.execute();
let usersToSend: string[] = [];
if (PUBLIC_NODES.includes(event.nodeType)) {
usersToSend = userIds;
@@ -492,6 +580,18 @@ class SynapseService {
}
}
private handleCollaboratorAddedEvent(event: CollaboratorAddedEvent) {
const deviceIds = this.getPendingCollaborationsCursors(event.userId);
for (const deviceId of deviceIds) {
const socketConnection = this.connections.get(deviceId);
if (socketConnection === undefined) {
continue;
}
this.sendPendingCollaborations(socketConnection, event.userId);
}
}
private handleCollaboratorRemovedEvent(event: CollaboratorRemovedEvent) {
const deviceIds = this.getPendingRevocationsCursors(event.userId);
for (const deviceId of deviceIds) {
@@ -544,6 +644,22 @@ class SynapseService {
return userDevices;
}
private getPendingCollaborationsCursors(userId: string): string[] {
const userDevices: string[] = [];
for (const connection of this.connections.values()) {
const connectionUsers = connection.collaborations.values();
for (const user of connectionUsers) {
if (user.userId !== userId || user.syncing) {
continue;
}
userDevices.push(connection.deviceId);
}
}
return userDevices;
}
private getPendingRevocationsCursors(userId: string): string[] {
const userDevices: string[] = [];
for (const connection of this.connections.values()) {

View File

@@ -1,13 +1,32 @@
import { NodeType } from '@colanode/core';
export type NodeTransactionCreatedEvent = {
type: 'node_transaction_created';
transactionId: string;
export type NodeCreatedEvent = {
type: 'node_created';
nodeId: string;
nodeType: NodeType;
workspaceId: string;
};
export type NodeUpdatedEvent = {
type: 'node_updated';
nodeId: string;
nodeType: NodeType;
workspaceId: string;
};
export type NodeDeletedEvent = {
type: 'node_deleted';
nodeId: string;
nodeType: NodeType;
workspaceId: string;
};
export type CollaboratorAddedEvent = {
type: 'collaborator_added';
userId: string;
nodeId: string;
};
export type CollaboratorRemovedEvent = {
type: 'collaborator_removed';
userId: string;
@@ -23,6 +42,9 @@ export type InteractionUpdatedEvent = {
};
export type Event =
| NodeTransactionCreatedEvent
| NodeCreatedEvent
| NodeUpdatedEvent
| NodeDeletedEvent
| CollaboratorAddedEvent
| CollaboratorRemovedEvent
| InteractionUpdatedEvent;