Sync improvements

This commit is contained in:
Hakan Shehu
2024-10-28 20:35:30 +01:00
parent 70503d5a98
commit 9141d60b5c
42 changed files with 1176 additions and 591 deletions

View File

@@ -9,9 +9,9 @@ import { syncRouter } from '@/routes/sync';
import { configRouter } from '@/routes/config';
import { avatarsRouter } from '@/routes/avatars';
import { filesRouter } from '@/routes/files';
import { socketManager } from '@/sockets/socket-manager';
import { synapse } from '@/services/synapse';
export const initApi = () => {
export const initApi = async () => {
const app = express();
const port = 3000;
@@ -34,7 +34,7 @@ export const initApi = () => {
app.use('/v1/files', authMiddleware, filesRouter);
const server = http.createServer(app);
socketManager.init(server);
await synapse.init(server);
server.listen(port, () => {
console.log(`Server is running at http://localhost:${port}`);

View File

@@ -1,155 +0,0 @@
import { database } from '@/data/database';
import { redis, CHANNEL_NAMES } from '@/data/redis';
import { getIdType, IdType } from '@/lib/id';
import { socketManager } from '@/sockets/socket-manager';
import { ServerNodeChangeEvent } from '@/types/sync';
export const initChangesSubscriber = async () => {
const subscriber = redis.duplicate();
await subscriber.connect();
await subscriber.subscribe(CHANNEL_NAMES.CHANGES, handleEvent);
};
const handleEvent = async (event: string) => {
const data: ServerNodeChangeEvent = JSON.parse(event);
if (data.type === 'node_create') {
const id = data.nodeId;
const idType = getIdType(id);
if (idType === IdType.User) {
const workspaceUser = await database
.selectFrom('workspace_users')
.selectAll()
.where('id', '=', id)
.executeTakeFirst();
if (!workspaceUser) {
return;
}
const devices = await database
.selectFrom('devices')
.selectAll()
.where('account_id', '=', workspaceUser.account_id)
.execute();
for (const device of devices) {
const socketConnection = socketManager.getConnection(device.id);
if (socketConnection === undefined) {
continue;
}
socketConnection.addWorkspaceUser({
workspaceId: data.workspaceId,
userId: id,
});
}
}
}
const userDevices = new Map<string, string[]>();
for (const connection of socketManager.getConnections()) {
const workspaceUsers = connection.getWorkspaceUsers();
for (const workspaceUser of workspaceUsers) {
if (workspaceUser.workspaceId !== data.workspaceId) {
continue;
}
const userIds = userDevices.get(workspaceUser.userId) ?? [];
userIds.push(connection.getDeviceId());
userDevices.set(workspaceUser.userId, userIds);
}
}
const userIds = Array.from(userDevices.keys());
if (userIds.length === 0) {
return;
}
const nodeUserStates = await database
.selectFrom('node_user_states')
.selectAll()
.where((eb) =>
eb.and([eb('user_id', 'in', userIds), eb('node_id', '=', data.nodeId)]),
)
.execute();
if (nodeUserStates.length === 0) {
return;
}
if (data.type === 'node_delete') {
for (const nodeUserState of nodeUserStates) {
const deviceIds = userDevices.get(nodeUserState.user_id) ?? [];
for (const deviceId of deviceIds) {
const socketConnection = socketManager.getConnection(deviceId);
if (socketConnection === undefined) {
continue;
}
socketConnection.send({
type: 'server_node_delete',
id: data.nodeId,
workspaceId: data.workspaceId,
});
}
}
return;
}
const node = await database
.selectFrom('nodes')
.select([
'id',
'state',
'created_at',
'created_by',
'updated_at',
'updated_by',
'server_created_at',
'server_updated_at',
'version_id',
])
.where('id', '=', data.nodeId)
.executeTakeFirst();
if (!node) {
return;
}
for (const nodeUserState of nodeUserStates) {
const deviceIds = userDevices.get(nodeUserState.user_id) ?? [];
if (deviceIds.length === 0) {
continue;
}
for (const deviceId of deviceIds) {
const socketConnection = socketManager.getConnection(deviceId);
if (socketConnection === undefined) {
continue;
}
if (nodeUserState.access_removed_at !== null) {
socketConnection.send({
type: 'server_node_delete',
id: data.nodeId,
workspaceId: data.workspaceId,
});
} else {
socketConnection.send({
type: 'server_node_sync',
id: node.id,
workspaceId: data.workspaceId,
state: node.state!,
createdAt: node.created_at.toISOString(),
createdBy: node.created_by,
updatedAt: node.updated_at?.toISOString() ?? null,
updatedBy: node.updated_by ?? null,
serverCreatedAt: node.server_created_at.toISOString(),
serverUpdatedAt: node.server_updated_at?.toISOString() ?? null,
versionId: node.version_id,
});
}
}
}
};

View File

@@ -31,5 +31,5 @@ export const initRedis = async () => {
};
export const CHANNEL_NAMES = {
CHANGES: process.env.REDIS_CHANGES_CHANNEL_NAME || 'neuron_changes',
SYNAPSE: process.env.REDIS_SYNAPSE_CHANNEL_NAME || 'neuron_synapse',
};

View File

@@ -1,21 +1,16 @@
import { initApi } from '@/api';
import { initRedis } from '@/data/redis';
import { initChangesSubscriber } from '@/consumers/changes-subcriber';
import { migrate } from '@/data/database';
import { initEventWorker } from '@/queues/events';
import { initTaskWorker } from '@/queues/tasks';
migrate().then(() => {
initApi();
const init = async () => {
await migrate();
await initRedis();
await initApi();
initRedis().then(() => {
console.log('Redis initialized');
initEventWorker();
initTaskWorker();
};
initEventWorker();
initTaskWorker();
initChangesSubscriber().then(() => {
console.log('Change subscriber started');
});
});
});
init();

View File

@@ -1,18 +0,0 @@
import { LocalNodeSyncMessageInput } from '@/messages/local-node-sync';
import { LocalNodeDeleteMessageInput } from '@/messages/local-node-delete';
import { ServerNodeSyncMessageInput } from '@/messages/server-node-sync';
import { ServerNodeDeleteMessageInput } from '@/messages/server-node-delete';
export interface MessageMap {
local_node_sync: LocalNodeSyncMessageInput;
local_node_delete: LocalNodeDeleteMessageInput;
server_node_sync: ServerNodeSyncMessageInput;
server_node_delete: ServerNodeDeleteMessageInput;
}
export type MessageInput = MessageMap[keyof MessageMap];
export type MessageContext = {
accountId: string;
deviceId: string;
};

View File

@@ -1,5 +0,0 @@
export type LocalNodeDeleteMessageInput = {
type: 'local_node_delete';
nodeId: string;
workspaceId: string;
};

View File

@@ -1,6 +0,0 @@
export type LocalNodeSyncMessageInput = {
type: 'local_node_sync';
nodeId: string;
versionId: string;
workspaceId: string;
};

View File

@@ -1,5 +0,0 @@
export type ServerNodeDeleteMessageInput = {
type: 'server_node_delete';
id: string;
workspaceId: string;
};

View File

@@ -1,13 +0,0 @@
export type ServerNodeSyncMessageInput = {
type: 'server_node_sync';
id: string;
workspaceId: string;
state: string;
createdAt: string;
createdBy: string;
updatedAt: string | null;
updatedBy: string | null;
serverCreatedAt: string;
serverUpdatedAt: string | null;
versionId: string;
};

View File

@@ -1,11 +1,12 @@
import { database } from '@/data/database';
import { CHANNEL_NAMES, redis, redisConfig } from '@/data/redis';
import { redisConfig } from '@/data/redis';
import { CreateNodeUserState } from '@/data/schema';
import { filesStorage } from '@/data/storage';
import { BUCKET_NAMES } from '@/data/storage';
import { NodeTypes } from '@/lib/constants';
import { generateId, IdType } from '@/lib/id';
import { fetchNodeCollaborators, fetchWorkspaceUsers } from '@/lib/nodes';
import { synapse } from '@/services/synapse';
import {
NodeCreatedEvent,
NodeDeletedEvent,
@@ -61,14 +62,22 @@ const handleNodeCreatedEvent = async (
event: NodeCreatedEvent,
): Promise<void> => {
await createNodeUserStates(event);
await publishChange(event.id, event.workspaceId, 'node_create');
await synapse.sendSynapseMessage({
type: 'node_create',
nodeId: event.id,
workspaceId: event.workspaceId,
});
};
const handleNodeUpdatedEvent = async (
event: NodeUpdatedEvent,
): Promise<void> => {
await checkForCollaboratorsChange(event);
await publishChange(event.id, event.workspaceId, 'node_update');
await synapse.sendSynapseMessage({
type: 'node_update',
nodeId: event.id,
workspaceId: event.workspaceId,
});
};
const handleNodeDeletedEvent = async (
@@ -83,7 +92,11 @@ const handleNodeDeletedEvent = async (
await filesStorage.send(command);
}
await publishChange(event.id, event.workspaceId, 'node_delete');
await synapse.sendSynapseMessage({
type: 'node_delete',
nodeId: event.id,
workspaceId: event.workspaceId,
});
};
const createNodeUserStates = async (event: NodeCreatedEvent): Promise<void> => {
@@ -95,9 +108,9 @@ const createNodeUserStates = async (event: NodeCreatedEvent): Promise<void> => {
userStatesToCreate.push({
user_id: userId,
node_id: event.id,
last_seen_version_id: null,
workspace_id: event.workspaceId,
last_seen_at: null,
last_seen_version_id: null,
mentions_count: 0,
created_at: new Date(),
access_removed_at: null,
@@ -112,8 +125,8 @@ const createNodeUserStates = async (event: NodeCreatedEvent): Promise<void> => {
userStatesToCreate.push({
user_id: event.id,
node_id: userId,
last_seen_version_id: null,
workspace_id: event.workspaceId,
last_seen_version_id: null,
last_seen_at: null,
mentions_count: 0,
created_at: new Date(),
@@ -130,9 +143,11 @@ const createNodeUserStates = async (event: NodeCreatedEvent): Promise<void> => {
userStatesToCreate.push({
user_id: collaboratorId,
node_id: event.id,
last_seen_version_id: null,
workspace_id: event.workspaceId,
last_seen_at: null,
last_seen_at:
collaboratorId === event.createdBy ? new Date(event.createdAt) : null,
last_seen_version_id:
collaboratorId === event.createdBy ? event.versionId : null,
mentions_count: 0,
created_at: new Date(),
access_removed_at: null,
@@ -258,17 +273,3 @@ const extractCollaboratorIds = (collaborators: ServerNodeAttributes) => {
return Object.keys(collaborators.collaborators).sort();
};
const publishChange = async (
nodeId: string,
workspaceId: string,
type: 'node_create' | 'node_update' | 'node_delete',
): Promise<void> => {
const changeJson = JSON.stringify({
nodeId,
workspaceId,
type,
});
await redis.publish(CHANNEL_NAMES.CHANGES, changeJson);
};

View File

@@ -1,3 +1,4 @@
import * as Y from 'yjs';
import { ApiError, NeuronRequest, NeuronResponse } from '@/types/api';
import { database } from '@/data/database';
import { Router } from 'express';
@@ -6,6 +7,7 @@ import {
LocalCreateNodeChangeData,
LocalDeleteNodeChangeData,
LocalNodeChangeData,
LocalNodeUserStateChangeData,
LocalUpdateNodeChangeData,
ServerSyncChangeResult,
SyncLocalChangeResult,
@@ -15,13 +17,13 @@ import { SelectWorkspaceUser } from '@/data/schema';
import { fetchCollaboratorRole } from '@/lib/nodes';
import { ServerNodeAttributes } from '@/types/nodes';
import { fromUint8Array, toUint8Array } from 'js-base64';
import * as Y from 'yjs';
import {
NodeCreatedEvent,
NodeDeletedEvent,
NodeUpdatedEvent,
} from '@/types/events';
import { enqueueEvent } from '@/queues/events';
import { synapse } from '@/services/synapse';
export const syncRouter = Router();
@@ -101,6 +103,9 @@ const handleLocalChange = async (
case 'node_delete': {
return handleDeleteNodeChange(workspaceUser, changeData);
}
case 'node_user_state_update': {
return handleNodeUserStateChange(workspaceUser, changeData);
}
default: {
return {
status: 'error',
@@ -298,3 +303,36 @@ const handleDeleteNodeChange = async (
status: 'success',
};
};
const handleNodeUserStateChange = async (
workspaceUser: SelectWorkspaceUser,
changeData: LocalNodeUserStateChangeData,
): Promise<SyncLocalChangeResult> => {
if (workspaceUser.id !== changeData.userId) {
return {
status: 'error',
};
}
await database
.updateTable('node_user_states')
.set({
last_seen_version_id: changeData.lastSeenVersionId,
last_seen_at: new Date(changeData.lastSeenAt),
mentions_count: changeData.mentionsCount,
version_id: changeData.versionId,
updated_at: new Date(changeData.lastSeenAt),
})
.where('node_id', '=', changeData.nodeId)
.where('user_id', '=', changeData.userId)
.execute();
await synapse.sendSynapseMessage({
type: 'node_user_state_update',
nodeId: changeData.nodeId,
userId: changeData.userId,
workspaceId: workspaceUser.workspace_id,
});
return { status: 'success' };
};

View File

@@ -0,0 +1,526 @@
import { database } from '@/data/database';
import { Server } from 'http';
import { WebSocketServer, WebSocket } from 'ws';
import { verifyToken } from '@/lib/tokens';
import { CHANNEL_NAMES } from '@/data/redis';
import { redis } from '@/data/redis';
import {
SynapseMessage,
SynapseNodeChangeMessage,
SynapseNodeUserStateChangeMessage,
} from '@/types/events';
import { getIdType, IdType } from '@/lib/id';
import { MessageInput } from '@/types/messages';
interface SynapseConnection {
accountId: string;
deviceId: string;
workspaceUsers: {
workspaceId: string;
userId: string;
}[];
pendingSyncs: Set<string>;
socket: WebSocket;
pendingSyncTimeout: NodeJS.Timeout | null;
}
class SynapseService {
private readonly connections: Map<string, SynapseConnection> = new Map();
public async init(server: Server) {
const wss = new WebSocketServer({
server,
path: '/v1/synapse',
verifyClient: async (info, callback) => {
const req = info.req;
const token = req.headers['authorization'];
if (!token) {
return callback(false, 401, 'Unauthorized');
}
callback(true);
},
});
wss.on('connection', async (socket, req) => {
const token = req.headers['authorization'];
if (!token) {
socket.close();
return;
}
const result = await verifyToken(token);
if (!result.authenticated) {
socket.close();
return;
}
const account = result.account;
socket.on('close', () => {
const connection = this.connections.get(account.deviceId);
if (connection) {
if (connection.pendingSyncTimeout) {
clearTimeout(connection.pendingSyncTimeout);
}
this.connections.delete(account.deviceId);
}
});
const connection: SynapseConnection = {
accountId: account.id,
deviceId: account.deviceId,
workspaceUsers: [],
pendingSyncs: new Set(),
pendingSyncTimeout: null,
socket,
};
socket.on('message', (message) => {
this.handleSocketMessage(connection, JSON.parse(message.toString()));
});
this.connections.set(account.deviceId, connection);
this.fetchWorkspaceUsers(connection).then(() => {
this.sendPendingChangesDebounced(connection);
});
});
const subscriber = redis.duplicate();
await subscriber.connect();
await subscriber.subscribe(CHANNEL_NAMES.SYNAPSE, (message) =>
this.handleSynapseMessage(message.toString()),
);
}
private sendSocketMessage(
connection: SynapseConnection,
message: MessageInput,
) {
connection.socket.send(JSON.stringify(message));
}
private async handleSocketMessage(
connection: SynapseConnection,
message: MessageInput,
) {
if (message.type === 'local_node_sync') {
await database
.insertInto('node_device_states')
.values({
node_id: message.nodeId,
device_id: connection.deviceId,
node_version_id: message.versionId,
user_state_version_id: null,
user_state_synced_at: null,
workspace_id: message.workspaceId,
node_synced_at: new Date(),
})
.onConflict((cb) =>
cb.columns(['node_id', 'device_id']).doUpdateSet({
workspace_id: message.workspaceId,
node_version_id: message.versionId,
node_synced_at: new Date(),
}),
)
.execute();
} else if (message.type === 'local_node_user_state_sync') {
await database
.insertInto('node_device_states')
.values({
node_id: message.nodeId,
device_id: connection.deviceId,
node_version_id: null,
user_state_version_id: message.versionId,
user_state_synced_at: new Date(),
workspace_id: message.workspaceId,
node_synced_at: new Date(),
})
.onConflict((cb) =>
cb.columns(['node_id', 'device_id']).doUpdateSet({
workspace_id: message.workspaceId,
user_state_version_id: message.versionId,
user_state_synced_at: new Date(),
}),
)
.execute();
} else if (message.type === 'local_node_delete') {
await database
.deleteFrom('node_device_states')
.where('device_id', '=', connection.deviceId)
.where('node_id', '=', message.nodeId)
.execute();
const userId = connection.workspaceUsers.find(
(wu) => wu.workspaceId === message.workspaceId,
)?.userId;
if (userId) {
await database
.deleteFrom('node_user_states')
.where('node_id', '=', message.nodeId)
.where('user_id', '=', userId)
.execute();
}
}
this.sendPendingChangesDebounced(connection);
}
public async sendSynapseMessage(message: SynapseMessage) {
await redis.publish(CHANNEL_NAMES.SYNAPSE, JSON.stringify(message));
}
private async handleSynapseMessage(message: string) {
const data: SynapseMessage = JSON.parse(message);
if (
data.type === 'node_create' ||
data.type === 'node_update' ||
data.type === 'node_delete'
) {
this.handleNodeChangeMessage(data);
} else if (data.type === 'node_user_state_update') {
this.handleNodeUserStateUpdateMessage(data);
}
}
private async handleNodeChangeMessage(data: SynapseNodeChangeMessage) {
const idType = getIdType(data.nodeId);
if (idType === IdType.User) {
await this.addNewWorkspaceUser(data.nodeId, data.workspaceId);
}
await this.broadcastNodeChange(data);
}
private async broadcastNodeChange(data: SynapseNodeChangeMessage) {
const userDevices = this.getWorkspaceUserDevices(data.workspaceId);
const userIds = Array.from(userDevices.keys());
if (userIds.length === 0) {
return;
}
const nodeUserStates = await database
.selectFrom('node_user_states')
.selectAll()
.where((eb) =>
eb.and([eb('user_id', 'in', userIds), eb('node_id', '=', data.nodeId)]),
)
.execute();
if (nodeUserStates.length === 0) {
return;
}
if (data.type === 'node_delete') {
for (const nodeUserState of nodeUserStates) {
const deviceIds = userDevices.get(nodeUserState.user_id) ?? [];
for (const deviceId of deviceIds) {
const socketConnection = this.connections.get(deviceId);
if (socketConnection === undefined) {
continue;
}
this.sendSocketMessage(socketConnection, {
type: 'server_node_delete',
id: data.nodeId,
workspaceId: data.workspaceId,
});
}
}
return;
}
const node = await database
.selectFrom('nodes')
.select([
'id',
'state',
'created_at',
'created_by',
'updated_at',
'updated_by',
'server_created_at',
'server_updated_at',
'version_id',
])
.where('id', '=', data.nodeId)
.executeTakeFirst();
if (!node) {
return;
}
for (const nodeUserState of nodeUserStates) {
const deviceIds = userDevices.get(nodeUserState.user_id) ?? [];
if (deviceIds.length === 0) {
continue;
}
for (const deviceId of deviceIds) {
const socketConnection = this.connections.get(deviceId);
if (socketConnection === undefined) {
continue;
}
if (nodeUserState.access_removed_at !== null) {
this.sendSocketMessage(socketConnection, {
type: 'server_node_delete',
id: data.nodeId,
workspaceId: data.workspaceId,
});
} else {
this.sendSocketMessage(socketConnection, {
type: 'server_node_sync',
id: node.id,
workspaceId: data.workspaceId,
state: node.state!,
createdAt: node.created_at.toISOString(),
createdBy: node.created_by,
updatedAt: node.updated_at?.toISOString() ?? null,
updatedBy: node.updated_by ?? null,
serverCreatedAt: node.server_created_at.toISOString(),
serverUpdatedAt: node.server_updated_at?.toISOString() ?? null,
versionId: node.version_id,
});
}
}
}
}
private async handleNodeUserStateUpdateMessage(
data: SynapseNodeUserStateChangeMessage,
) {
const userDevices = this.getWorkspaceUserDevices(data.workspaceId);
if (!userDevices.has(data.userId)) {
return;
}
const userState = await database
.selectFrom('node_user_states')
.selectAll()
.where('user_id', '=', data.userId)
.where('node_id', '=', data.nodeId)
.executeTakeFirst();
if (!userState) {
return;
}
const deviceIds = userDevices.get(data.userId) ?? [];
for (const deviceId of deviceIds) {
const socketConnection = this.connections.get(deviceId);
if (socketConnection === undefined) {
continue;
}
this.sendSocketMessage(socketConnection, {
type: 'server_node_user_state_sync',
userId: data.userId,
nodeId: data.nodeId,
lastSeenVersionId: userState.last_seen_version_id,
workspaceId: data.workspaceId,
versionId: userState.version_id,
lastSeenAt: userState.last_seen_at?.toISOString() ?? null,
createdAt: userState.created_at.toISOString(),
updatedAt: userState.updated_at?.toISOString() ?? null,
mentionsCount: userState.mentions_count,
});
}
}
private sendPendingChangesDebounced(connection: SynapseConnection) {
if (connection.pendingSyncTimeout) {
clearTimeout(connection.pendingSyncTimeout);
}
connection.pendingSyncTimeout = setTimeout(async () => {
await this.sendPendingChanges(connection);
}, 500);
}
private async sendPendingChanges(connection: SynapseConnection) {
const userIds = connection.workspaceUsers.map(
(workspaceUser) => workspaceUser.userId,
);
console.log('sendPendingChanges', userIds);
if (userIds.length === 0) {
return;
}
const unsyncedNodes = await database
.selectFrom('node_user_states as nus')
.leftJoin('nodes as n', 'n.id', 'nus.node_id')
.leftJoin('node_device_states as nds', (join) =>
join
.onRef('nds.node_id', '=', 'nus.node_id')
.on('nds.device_id', '=', connection.deviceId),
)
.select([
'n.id',
'n.state',
'n.created_at',
'n.created_by',
'n.updated_at',
'n.updated_by',
'n.server_created_at',
'n.server_updated_at',
'nus.access_removed_at',
'n.version_id as node_version_id',
'nus.node_id',
'nus.user_id',
'nus.workspace_id',
'nus.last_seen_version_id',
'nus.last_seen_at',
'nus.mentions_count',
'nus.created_at',
'nus.updated_at',
'nus.version_id as user_state_version_id',
'nds.node_version_id as device_node_version_id',
'nds.user_state_version_id as device_user_state_version_id',
])
.where((eb) =>
eb.and([
eb('nus.user_id', 'in', userIds),
eb.or([
eb('n.id', 'is', null),
eb('nus.access_removed_at', 'is not', null),
eb('nds.node_version_id', 'is', null),
eb('nds.node_version_id', '!=', eb.ref('n.version_id')),
eb('nds.user_state_version_id', 'is', null),
eb('nds.user_state_version_id', '!=', eb.ref('nus.version_id')),
]),
]),
)
.orderBy('n.id', 'asc')
.limit(100)
.execute();
if (unsyncedNodes.length === 0) {
return;
}
for (const row of unsyncedNodes) {
connection.pendingSyncs.add(row.node_id);
if (row.id === null) {
this.sendSocketMessage(connection, {
type: 'server_node_delete',
id: row.node_id,
workspaceId: row.workspace_id,
});
continue;
}
if (row.user_state_version_id !== row.device_user_state_version_id) {
this.sendSocketMessage(connection, {
type: 'server_node_user_state_sync',
nodeId: row.node_id,
userId: row.user_id,
workspaceId: row.workspace_id,
versionId: row.user_state_version_id!,
lastSeenAt: row.last_seen_at?.toISOString() ?? null,
lastSeenVersionId: row.last_seen_version_id ?? null,
mentionsCount: row.mentions_count,
createdAt: row.created_at!.toISOString(),
updatedAt: row.updated_at?.toISOString() ?? null,
});
}
if (row.node_version_id !== row.device_node_version_id) {
this.sendSocketMessage(connection, {
type: 'server_node_sync',
id: row.id,
workspaceId: row.workspace_id,
state: row.state!,
createdAt: row.created_at!.toISOString(),
createdBy: row.created_by!,
updatedAt: row.updated_at?.toISOString() ?? null,
updatedBy: row.updated_by ?? null,
serverCreatedAt: row.server_created_at!.toISOString(),
serverUpdatedAt: row.server_updated_at?.toISOString() ?? null,
versionId: row.node_version_id!,
});
}
}
}
private async addNewWorkspaceUser(userId: string, workspaceId: string) {
const workspaceUser = await database
.selectFrom('workspace_users')
.selectAll()
.where('id', '=', userId)
.executeTakeFirst();
if (!workspaceUser) {
return;
}
const devices = await database
.selectFrom('devices')
.selectAll()
.where('account_id', '=', workspaceUser.account_id)
.execute();
for (const device of devices) {
const connection = this.connections.get(device.id);
if (!connection) {
continue;
}
if (connection.workspaceUsers.find((wu) => wu.userId === userId)) {
continue;
}
connection.workspaceUsers.push({
workspaceId,
userId,
});
}
}
private async fetchWorkspaceUsers(
connection: SynapseConnection,
): Promise<void> {
const workspaceUsers = await database
.selectFrom('workspace_users')
.selectAll()
.where('account_id', '=', connection.accountId)
.execute();
for (const workspaceUser of workspaceUsers) {
if (
!connection.workspaceUsers.find((wu) => wu.userId === workspaceUser.id)
) {
connection.workspaceUsers.push({
workspaceId: workspaceUser.workspace_id,
userId: workspaceUser.id,
});
}
}
}
private getWorkspaceUserDevices(workspaceId: string): Map<string, string[]> {
const userDevices = new Map<string, string[]>();
for (const connection of this.connections.values()) {
const workspaceUsers = connection.workspaceUsers;
for (const workspaceUser of workspaceUsers) {
if (workspaceUser.workspaceId !== workspaceId) {
continue;
}
const userIds = userDevices.get(workspaceUser.userId) ?? [];
userIds.push(connection.deviceId);
userDevices.set(workspaceUser.userId, userIds);
}
}
return userDevices;
}
}
export const synapse = new SynapseService();

View File

@@ -1,204 +0,0 @@
import { database } from '@/data/database';
import { MessageInput } from '@/messages';
import { NeuronRequestAccount } from '@/types/api';
import { WebSocket } from 'ws';
interface WorkspaceUser {
workspaceId: string;
userId: string;
}
export class SocketConnection {
private readonly socket: WebSocket;
private readonly accountId: string;
private readonly deviceId: string;
private readonly workspaceUsers: WorkspaceUser[];
private readonly pendingSyncs: Set<string>;
constructor(socket: WebSocket, account: NeuronRequestAccount) {
this.socket = socket;
this.accountId = account.id;
this.deviceId = account.deviceId;
this.workspaceUsers = [];
this.pendingSyncs = new Set();
socket.on('message', (message) => {
this.handleMessage(message.toString());
});
}
public send(message: MessageInput): void {
this.socket.send(JSON.stringify(message));
}
public init(): void {
this.fetchWorkspaceUsers().then(() => {
this.sendPendingChanges();
});
}
public getDeviceId(): string {
return this.deviceId;
}
public getWorkspaceUsers(): WorkspaceUser[] {
return this.workspaceUsers;
}
public addWorkspaceUser(workspaceUser: WorkspaceUser): void {
if (this.workspaceUsers.find((wu) => wu.userId === workspaceUser.userId)) {
return;
}
this.workspaceUsers.push(workspaceUser);
this.sendPendingChanges();
}
private async fetchWorkspaceUsers(): Promise<void> {
const workspaceUsers = await database
.selectFrom('workspace_users')
.selectAll()
.where('account_id', '=', this.accountId)
.execute();
for (const workspaceUser of workspaceUsers) {
this.workspaceUsers.push({
workspaceId: workspaceUser.workspace_id,
userId: workspaceUser.id,
});
}
}
private async handleMessage(message: string): Promise<void> {
const messageInput: MessageInput = JSON.parse(message);
console.log(messageInput);
if (messageInput.type === 'local_node_sync') {
this.pendingSyncs.delete(messageInput.nodeId);
await database
.insertInto('node_device_states')
.values({
node_id: messageInput.nodeId,
device_id: this.deviceId,
node_version_id: messageInput.versionId,
user_state_version_id: null,
user_state_synced_at: null,
workspace_id: messageInput.workspaceId,
node_synced_at: new Date(),
})
.onConflict((cb) =>
cb.columns(['node_id', 'device_id']).doUpdateSet({
workspace_id: messageInput.workspaceId,
node_version_id: messageInput.versionId,
node_synced_at: new Date(),
}),
)
.execute();
if (this.pendingSyncs.size === 0) {
this.sendPendingChanges();
}
} else if (messageInput.type === 'local_node_delete') {
this.pendingSyncs.delete(messageInput.nodeId);
await database
.deleteFrom('node_device_states')
.where('device_id', '=', this.deviceId)
.where('node_id', '=', messageInput.nodeId)
.execute();
const userId = this.workspaceUsers.find(
(wu) => wu.workspaceId === messageInput.workspaceId,
)?.userId;
if (userId) {
await database
.deleteFrom('node_user_states')
.where('node_id', '=', messageInput.nodeId)
.where('user_id', '=', userId)
.execute();
}
if (this.pendingSyncs.size === 0) {
this.sendPendingChanges();
}
}
}
private async sendPendingChanges() {
const userIds = this.workspaceUsers.map(
(workspaceUser) => workspaceUser.userId,
);
if (userIds.length === 0) {
return;
}
console.log('userIds', userIds);
const unsyncedNodes = await database
.selectFrom('node_user_states as nus')
.leftJoin('nodes as n', 'n.id', 'nus.node_id')
.leftJoin('node_device_states as nds', (join) =>
join
.onRef('nds.node_id', '=', 'n.id')
.on('nds.device_id', '=', this.deviceId),
)
.select([
'n.id',
'n.state',
'n.created_at',
'n.created_by',
'n.updated_at',
'n.updated_by',
'n.server_created_at',
'n.server_updated_at',
'nus.access_removed_at',
'n.version_id',
'nus.node_id',
'nus.workspace_id',
])
.where((eb) =>
eb.and([
eb('nus.user_id', 'in', userIds),
eb.or([
eb('n.id', 'is', null),
eb('nds.node_version_id', 'is', null),
eb('nds.node_version_id', '!=', eb.ref('n.version_id')),
eb('nus.access_removed_at', 'is not', null),
]),
]),
)
.orderBy('n.id', 'asc')
.limit(100)
.execute();
if (unsyncedNodes.length === 0) {
return;
}
for (const row of unsyncedNodes) {
this.pendingSyncs.add(row.node_id);
if (row.id === null) {
this.send({
type: 'server_node_delete',
id: row.node_id,
workspaceId: row.workspace_id,
});
} else {
this.send({
type: 'server_node_sync',
id: row.id,
workspaceId: row.workspace_id,
state: row.state!,
createdAt: row.created_at!.toISOString(),
createdBy: row.created_by!,
updatedAt: row.updated_at?.toISOString() ?? null,
updatedBy: row.updated_by ?? null,
serverCreatedAt: row.server_created_at!.toISOString(),
serverUpdatedAt: row.server_updated_at?.toISOString() ?? null,
versionId: row.version_id!,
});
}
}
}
}

View File

@@ -1,70 +0,0 @@
import http from 'http';
import { WebSocketServer } from 'ws';
import { verifyToken } from '@/lib/tokens';
import { SocketConnection } from '@/sockets/socket-connection';
import { MessageInput } from '@/messages';
class SocketManager {
private readonly sockets: Map<string, SocketConnection> = new Map();
public init(server: http.Server) {
const wss = new WebSocketServer({
server,
path: '/v1/synapse',
verifyClient: async (info, callback) => {
const req = info.req;
const token = req.headers['authorization'];
if (!token) {
return callback(false, 401, 'Unauthorized');
}
callback(true);
},
});
wss.on('connection', async (socket, req) => {
const token = req.headers['authorization'];
if (!token) {
socket.close();
return;
}
const result = await verifyToken(token);
if (!result.authenticated) {
socket.close();
return;
}
const account = result.account;
socket.on('close', () => {
this.sockets.delete(account.deviceId);
});
const connection = new SocketConnection(socket, account);
connection.init();
this.sockets.set(account.deviceId, connection);
});
}
public getConnections(): IterableIterator<SocketConnection> {
return this.sockets.values();
}
public getConnection(deviceId: string): SocketConnection | undefined {
return this.sockets.get(deviceId);
}
public send(deviceId: string, message: MessageInput) {
const connection = this.sockets.get(deviceId);
if (!connection) {
return;
}
connection.send(message);
}
}
export const socketManager = new SocketManager();

View File

@@ -32,3 +32,20 @@ export type NodeDeletedEvent = {
attributes: ServerNodeAttributes;
deletedAt: string;
};
export type SynapseNodeChangeMessage = {
workspaceId: string;
nodeId: string;
type: 'node_create' | 'node_update' | 'node_delete';
};
export type SynapseNodeUserStateChangeMessage = {
workspaceId: string;
nodeId: string;
userId: string;
type: 'node_user_state_update';
};
export type SynapseMessage =
| SynapseNodeChangeMessage
| SynapseNodeUserStateChangeMessage;

View File

@@ -0,0 +1,61 @@
export type LocalNodeSyncMessageInput = {
type: 'local_node_sync';
nodeId: string;
versionId: string;
workspaceId: string;
};
export type LocalNodeDeleteMessageInput = {
type: 'local_node_delete';
nodeId: string;
workspaceId: string;
};
export type LocalNodeUserStateSyncMessageInput = {
type: 'local_node_user_state_sync';
nodeId: string;
userId: string;
workspaceId: string;
versionId: string;
};
export type ServerNodeSyncMessageInput = {
type: 'server_node_sync';
id: string;
workspaceId: string;
state: string;
createdAt: string;
createdBy: string;
updatedAt: string | null;
updatedBy: string | null;
serverCreatedAt: string;
serverUpdatedAt: string | null;
versionId: string;
};
export type ServerNodeUserStateSyncMessageInput = {
type: 'server_node_user_state_sync';
nodeId: string;
userId: string;
workspaceId: string;
versionId: string;
lastSeenAt: string | null;
lastSeenVersionId: string | null;
mentionsCount: number;
createdAt: string;
updatedAt: string | null;
};
export type ServerNodeDeleteMessageInput = {
type: 'server_node_delete';
id: string;
workspaceId: string;
};
export type MessageInput =
| LocalNodeSyncMessageInput
| LocalNodeDeleteMessageInput
| ServerNodeSyncMessageInput
| ServerNodeDeleteMessageInput
| LocalNodeUserStateSyncMessageInput
| ServerNodeUserStateSyncMessageInput;

View File

@@ -44,13 +44,18 @@ export type LocalDeleteNodeChangeData = {
deletedBy: string;
};
export type LocalNodeUserStateChangeData = {
type: 'node_user_state_update';
nodeId: string;
userId: string;
lastSeenVersionId: string;
lastSeenAt: string;
mentionsCount: number;
versionId: string;
};
export type LocalNodeChangeData =
| LocalCreateNodeChangeData
| LocalUpdateNodeChangeData
| LocalDeleteNodeChangeData;
export type ServerNodeChangeEvent = {
workspaceId: string;
nodeId: string;
type: 'node_create' | 'node_update' | 'node_delete';
};
| LocalDeleteNodeChangeData
| LocalNodeUserStateChangeData;