Improve synchronization flow

This commit is contained in:
Hakan Shehu
2024-10-10 14:13:54 +02:00
parent cf72237e55
commit 4df3aefc41
54 changed files with 1529 additions and 1034 deletions

View File

@@ -41,7 +41,7 @@
"prettier": "^3.3.3",
"ts-node": "^10.9.2",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.6.2"
"typescript": "^5.6.3"
}
},
"node_modules/@aws-crypto/crc32": {
@@ -4974,9 +4974,9 @@
"license": "MIT"
},
"node_modules/typescript": {
"version": "5.6.2",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-5.6.2.tgz",
"integrity": "sha512-NW8ByodCSNCwZeghjN3o+JX5OFH0Ojg6sadjEKY4huZ52TqbJTJnDo5+Tw98lSy63NZvi4n+ez5m2u5d4PkZyw==",
"version": "5.6.3",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-5.6.3.tgz",
"integrity": "sha512-hjcS1mhfuyi4WW8IWtjP7brDrG2cuDZukyrYrSauoXGNgx0S7zceP07adYkJycEr56BOUTNPzbInooiN3fn1qw==",
"dev": true,
"license": "Apache-2.0",
"bin": {

View File

@@ -26,7 +26,7 @@
"prettier": "^3.3.3",
"ts-node": "^10.9.2",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.6.2"
"typescript": "^5.6.3"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.665.0",

View File

@@ -8,7 +8,7 @@ import { authMiddleware } from '@/middlewares/auth';
import { syncRouter } from '@/routes/sync';
import { configRouter } from '@/routes/config';
import { avatarsRouter } from '@/routes/avatars';
import { synapse } from '@/synapse';
import { socketManager } from '@/sockets/socket-manager';
export const initApi = () => {
const app = express();
@@ -28,7 +28,7 @@ export const initApi = () => {
app.use('/v1/avatars', authMiddleware, avatarsRouter);
const server = http.createServer(app);
synapse.init(server);
socketManager.init(server);
server.listen(port, () => {
console.log(`Server is running at http://localhost:${port}`);

View File

@@ -2,7 +2,6 @@ import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka';
import { CdcMessage, ChangeCdcData } from '@/types/cdc';
import { redis, CHANNEL_NAMES } from '@/data/redis';
import { PostgresOperation } from '@/lib/constants';
import { database } from '@/data/database';
export const initChangeCdcConsumer = async () => {
const consumer = kafka.consumer({ groupId: CONSUMER_IDS.CHANGE_CDC });
@@ -52,18 +51,7 @@ const handleChangeCreate = async (change: CdcMessage<ChangeCdcData>) => {
};
const handleChangeUpdate = async (change: CdcMessage<ChangeCdcData>) => {
const changeData = change.after;
if (!changeData) {
return;
}
// if all devices have acknowledged the mutation, delete it
if (changeData.device_ids == null || changeData.device_ids.length == 0) {
await database
.deleteFrom('changes')
.where('id', '=', changeData.id)
.execute();
}
console.log('Change update:', change.after?.id);
};
const handleChangeDelete = async (change: CdcMessage<ChangeCdcData>) => {

View File

@@ -1,5 +1,5 @@
import { redis, CHANNEL_NAMES } from '@/data/redis';
import { synapse } from '@/synapse';
import { socketManager } from '@/sockets/socket-manager';
import { ServerChange } from '@/types/sync';
import { ChangeCdcData } from '@/types/cdc';
import { ServerChangeMessageInput } from '@/messages/server-change';
@@ -12,17 +12,13 @@ export const initChangesSubscriber = async () => {
const handleMessage = async (message: string) => {
const changeData = JSON.parse(message) as ChangeCdcData;
if (!changeData.device_ids || !changeData.device_ids.length) {
return;
}
const serverChange: ServerChange = {
id: changeData.id,
action: changeData.action as 'insert' | 'update' | 'delete',
table: changeData.table,
workspaceId: changeData.workspace_id,
before: changeData.before ? JSON.parse(changeData.before) : null,
after: changeData.after ? JSON.parse(changeData.after) : null,
deviceId: changeData.device_id,
data: JSON.parse(changeData.data),
createdAt: changeData.created_at,
};
const input: ServerChangeMessageInput = {
@@ -30,7 +26,5 @@ const handleMessage = async (message: string) => {
change: serverChange,
};
for (const deviceId of changeData.device_ids) {
synapse.send(deviceId, input);
}
socketManager.send(changeData.device_id, input);
};

View File

@@ -3,7 +3,11 @@ import { CdcMessage, NodeCdcData } from '@/types/cdc';
import { PostgresOperation } from '@/lib/constants';
import { database } from '@/data/database';
import { generateId, IdType } from '@/lib/id';
import { ServerNode } from '@/types/nodes';
import {
ServerNodeCreateChangeData,
ServerNodeDeleteChangeData,
ServerNodeUpdateChangeData,
} from '@/types/sync';
export const initNodeChangesConsumer = async () => {
const consumer = kafka.consumer({ groupId: CONSUMER_IDS.NODE_CDC });
@@ -54,18 +58,31 @@ const handleNodeCreate = async (change: CdcMessage<NodeCdcData>) => {
return;
}
const serverNode: ServerNode = mapNode(node);
const data: ServerNodeCreateChangeData = {
type: 'node_create',
id: node.id,
workspaceId: node.workspace_id,
state: node.state,
createdAt: node.created_at,
createdBy: node.created_by,
serverCreatedAt: node.server_created_at,
versionId: node.version_id,
};
await database
.insertInto('changes')
.values({
id: generateId(IdType.Change),
table: 'nodes',
action: 'insert',
workspace_id: node.workspace_id,
created_at: new Date(),
after: JSON.stringify(serverNode),
device_ids: deviceIds,
})
.values(
deviceIds.map((deviceId) => {
return {
id: generateId(IdType.Change),
device_id: deviceId,
workspace_id: node.workspace_id,
data: JSON.stringify(data),
created_at: new Date(),
retry_count: 0,
};
}),
)
.execute();
};
@@ -80,19 +97,31 @@ const handleNodeUpdate = async (change: CdcMessage<NodeCdcData>) => {
return;
}
const serverNode: ServerNode = mapNode(node);
const data: ServerNodeUpdateChangeData = {
type: 'node_update',
id: node.id,
workspaceId: node.workspace_id,
update: node.state,
updatedAt: node.updated_at ?? new Date().toISOString(),
updatedBy: node.updated_by ?? node.created_by,
serverUpdatedAt: node.server_updated_at ?? new Date().toISOString(),
versionId: node.version_id,
};
await database
.insertInto('changes')
.values({
id: generateId(IdType.Change),
table: 'nodes',
action: 'update',
workspace_id: node.workspace_id,
created_at: new Date(),
before: change.before ? JSON.stringify(change.before) : null,
after: JSON.stringify(serverNode),
device_ids: deviceIds,
})
.values(
deviceIds.map((deviceId) => {
return {
id: generateId(IdType.Change),
device_id: deviceId,
workspace_id: node.workspace_id,
data: JSON.stringify(data),
created_at: new Date(),
retry_count: 0,
};
}),
)
.execute();
};
@@ -107,19 +136,26 @@ const handleNodeDelete = async (change: CdcMessage<NodeCdcData>) => {
return;
}
const serverNode: ServerNode = mapNode(node);
const data: ServerNodeDeleteChangeData = {
type: 'node_delete',
id: node.id,
workspaceId: node.workspace_id,
};
await database
.insertInto('changes')
.values({
id: generateId(IdType.Change),
table: 'nodes',
action: 'delete',
workspace_id: node.workspace_id,
created_at: new Date(),
before: JSON.stringify(serverNode),
after: null,
device_ids: deviceIds,
})
.values(
deviceIds.map((deviceId) => {
return {
id: generateId(IdType.Change),
device_id: deviceId,
workspace_id: node.workspace_id,
data: JSON.stringify(data),
created_at: new Date(),
retry_count: 0,
};
}),
)
.execute();
};
@@ -140,24 +176,3 @@ const getDeviceIds = async (workspaceId: string) => {
const deviceIds = accountDevices.map((account) => account.id);
return deviceIds;
};
const mapNode = (node: NodeCdcData): ServerNode => {
return {
id: node.id,
workspaceId: node.workspace_id,
parentId: node.parent_id,
type: node.type,
index: node.index,
attributes: JSON.parse(node.attributes),
state: node.state,
createdAt: new Date(node.created_at),
createdBy: node.created_by,
updatedAt: node.updated_at ? new Date(node.updated_at) : null,
updatedBy: node.updated_by,
versionId: node.version_id,
serverCreatedAt: new Date(node.server_created_at),
serverUpdatedAt: node.server_updated_at
? new Date(node.server_updated_at)
: null,
};
};

View File

@@ -3,7 +3,11 @@ import { CdcMessage, NodeCollaboratorCdcData } from '@/types/cdc';
import { PostgresOperation } from '@/lib/constants';
import { database } from '@/data/database';
import { generateId, IdType } from '@/lib/id';
import { ServerNodeCollaborator } from '@/types/nodes';
import {
ServerNodeCollaboratorCreateChangeData,
ServerNodeCollaboratorDeleteChangeData,
ServerNodeCollaboratorUpdateChangeData,
} from '@/types/sync';
export const initNodeCollaboratorChangesConsumer = async () => {
const consumer = kafka.consumer({
@@ -60,19 +64,32 @@ const handleNodeCollaboratorCreate = async (
return;
}
const serverNodeCollaborator: ServerNodeCollaborator =
mapNodeCollaborator(nodeCollaborator);
const data: ServerNodeCollaboratorCreateChangeData = {
type: 'node_collaborator_create',
nodeId: nodeCollaborator.node_id,
collaboratorId: nodeCollaborator.collaborator_id,
role: nodeCollaborator.role,
workspaceId: nodeCollaborator.workspace_id,
createdAt: nodeCollaborator.created_at,
createdBy: nodeCollaborator.created_by,
versionId: nodeCollaborator.version_id,
serverCreatedAt: nodeCollaborator.server_created_at,
};
await database
.insertInto('changes')
.values({
id: generateId(IdType.Change),
table: 'node_collaborators',
action: 'insert',
workspace_id: nodeCollaborator.workspace_id,
created_at: new Date(),
after: JSON.stringify(serverNodeCollaborator),
device_ids: deviceIds,
})
.values(
deviceIds.map((deviceId) => {
return {
id: generateId(IdType.Change),
device_id: deviceId,
workspace_id: nodeCollaborator.workspace_id,
data: JSON.stringify(data),
created_at: new Date(),
retry_count: 0,
};
}),
)
.execute();
};
@@ -89,19 +106,33 @@ const handleNodeCollaboratorUpdate = async (
return;
}
const serverNodeCollaborator: ServerNodeCollaborator =
mapNodeCollaborator(nodeCollaborator);
const data: ServerNodeCollaboratorUpdateChangeData = {
type: 'node_collaborator_update',
nodeId: nodeCollaborator.node_id,
collaboratorId: nodeCollaborator.collaborator_id,
role: nodeCollaborator.role,
workspaceId: nodeCollaborator.workspace_id,
updatedAt: nodeCollaborator.updated_at ?? new Date().toISOString(),
updatedBy: nodeCollaborator.updated_by ?? nodeCollaborator.created_by,
versionId: nodeCollaborator.version_id,
serverUpdatedAt:
nodeCollaborator.server_updated_at ?? new Date().toISOString(),
};
await database
.insertInto('changes')
.values({
id: generateId(IdType.Change),
table: 'node_collaborators',
action: 'update',
workspace_id: nodeCollaborator.workspace_id,
created_at: new Date(),
after: JSON.stringify(serverNodeCollaborator),
device_ids: deviceIds,
})
.values(
deviceIds.map((deviceId) => {
return {
id: generateId(IdType.Change),
device_id: deviceId,
workspace_id: nodeCollaborator.workspace_id,
data: JSON.stringify(data),
created_at: new Date(),
retry_count: 0,
};
}),
)
.execute();
};
@@ -118,20 +149,27 @@ const handleNodeCollaboratorDelete = async (
return;
}
const serverNodeCollaborator: ServerNodeCollaborator =
mapNodeCollaborator(nodeCollaborator);
const data: ServerNodeCollaboratorDeleteChangeData = {
type: 'node_collaborator_delete',
nodeId: nodeCollaborator.node_id,
collaboratorId: nodeCollaborator.collaborator_id,
workspaceId: nodeCollaborator.workspace_id,
};
await database
.insertInto('changes')
.values({
id: generateId(IdType.Change),
table: 'node_collaborators',
action: 'delete',
workspace_id: nodeCollaborator.workspace_id,
created_at: new Date(),
before: JSON.stringify(serverNodeCollaborator),
after: null,
device_ids: deviceIds,
})
.values(
deviceIds.map((deviceId) => {
return {
id: generateId(IdType.Change),
device_id: deviceId,
workspace_id: nodeCollaborator.workspace_id,
data: JSON.stringify(data),
created_at: new Date(),
retry_count: 0,
};
}),
)
.execute();
};
@@ -152,25 +190,3 @@ const getDeviceIds = async (workspaceId: string) => {
const deviceIds = accountDevices.map((account) => account.id);
return deviceIds;
};
const mapNodeCollaborator = (
nodeCollaborator: NodeCollaboratorCdcData,
): ServerNodeCollaborator => {
return {
nodeId: nodeCollaborator.node_id,
collaboratorId: nodeCollaborator.collaborator_id,
role: nodeCollaborator.role,
workspaceId: nodeCollaborator.workspace_id,
createdAt: new Date(nodeCollaborator.created_at),
createdBy: nodeCollaborator.created_by,
updatedAt: nodeCollaborator.updated_at
? new Date(nodeCollaborator.updated_at)
: null,
updatedBy: nodeCollaborator.updated_by,
versionId: nodeCollaborator.version_id,
serverCreatedAt: new Date(nodeCollaborator.server_created_at),
serverUpdatedAt: nodeCollaborator.server_updated_at
? new Date(nodeCollaborator.server_updated_at)
: null,
};
};

View File

@@ -3,7 +3,10 @@ import { CdcMessage, NodeReactionCdcData } from '@/types/cdc';
import { PostgresOperation } from '@/lib/constants';
import { database } from '@/data/database';
import { generateId, IdType } from '@/lib/id';
import { ServerNodeReaction } from '@/types/nodes';
import {
ServerNodeReactionCreateChangeData,
ServerNodeReactionDeleteChangeData,
} from '@/types/sync';
export const initNodeReactionChangesConsumer = async () => {
const consumer = kafka.consumer({
@@ -56,18 +59,30 @@ const handleNodeReactionCreate = async (
return;
}
const serverNodeReaction: ServerNodeReaction = mapNodeReaction(reaction);
const data: ServerNodeReactionCreateChangeData = {
type: 'node_reaction_create',
nodeId: reaction.node_id,
actorId: reaction.actor_id,
reaction: reaction.reaction,
workspaceId: reaction.workspace_id,
createdAt: reaction.created_at,
serverCreatedAt: reaction.server_created_at,
};
await database
.insertInto('changes')
.values({
id: generateId(IdType.Change),
table: 'node_reactions',
action: 'insert',
workspace_id: reaction.workspace_id,
created_at: new Date(),
after: JSON.stringify(serverNodeReaction),
device_ids: deviceIds,
})
.values(
deviceIds.map((deviceId) => {
return {
id: generateId(IdType.Change),
device_id: deviceId,
workspace_id: reaction.workspace_id,
data: JSON.stringify(data),
created_at: new Date(),
retry_count: 0,
};
}),
)
.execute();
};
@@ -84,19 +99,28 @@ const handleNodeReactionDelete = async (
return;
}
const serverNodeReaction: ServerNodeReaction = mapNodeReaction(reaction);
const data: ServerNodeReactionDeleteChangeData = {
type: 'node_reaction_delete',
nodeId: reaction.node_id,
actorId: reaction.actor_id,
reaction: reaction.reaction,
workspaceId: reaction.workspace_id,
};
await database
.insertInto('changes')
.values({
id: generateId(IdType.Change),
table: 'node_reactions',
action: 'delete',
workspace_id: reaction.workspace_id,
created_at: new Date(),
before: JSON.stringify(serverNodeReaction),
after: null,
device_ids: deviceIds,
})
.values(
deviceIds.map((deviceId) => {
return {
id: generateId(IdType.Change),
device_id: deviceId,
workspace_id: reaction.workspace_id,
data: JSON.stringify(data),
created_at: new Date(),
retry_count: 0,
};
}),
)
.execute();
};
@@ -117,14 +141,3 @@ const getDeviceIds = async (workspaceId: string) => {
const deviceIds = accountDevices.map((account) => account.id);
return deviceIds;
};
const mapNodeReaction = (reaction: NodeReactionCdcData): ServerNodeReaction => {
return {
nodeId: reaction.node_id,
actorId: reaction.actor_id,
reaction: reaction.reaction,
workspaceId: reaction.workspace_id,
createdAt: new Date(reaction.created_at),
serverCreatedAt: new Date(reaction.server_created_at),
};
};

View File

@@ -192,13 +192,17 @@ const createChangesTable: Migration = {
await db.schema
.createTable('changes')
.addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey())
.addColumn('device_id', 'varchar(30)', (col) => col.notNull())
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
.addColumn('table', 'varchar(30)', (col) => col.notNull())
.addColumn('action', 'varchar(30)', (col) => col.notNull())
.addColumn('after', 'jsonb')
.addColumn('before', 'jsonb')
.addColumn('data', 'jsonb')
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
.addColumn('device_ids', sql`text[]`, (col) => col.notNull())
.addColumn('retry_count', 'integer', (col) => col.notNull().defaultTo(0))
.execute();
await db.schema
.createIndex('changes_device_id_index')
.on('changes')
.column('device_id')
.execute();
},
down: async (db) => {

View File

@@ -139,13 +139,11 @@ export type UpdateNodeReaction = Updateable<NodeReactionTable>;
interface ChangeTable {
id: ColumnType<string, string, never>;
device_id: ColumnType<string, string, never>;
workspace_id: ColumnType<string, string, never>;
table: ColumnType<string, string, never>;
action: ColumnType<string, string, never>;
after: ColumnType<string | null, string | null, never>;
before: ColumnType<string | null, string | null, never>;
data: JSONColumnType<any, string | null, string | null>;
created_at: ColumnType<Date, Date, never>;
device_ids: ColumnType<string[], string[], string[]>;
retry_count: ColumnType<number, number, number>;
}
export type SelectChange = Selectable<ChangeTable>;

View File

@@ -1,4 +1,12 @@
export interface MessageMap {}
import { ServerChangeMessageInput } from '@/messages/server-change';
import { ServerChangeResultMessageInput } from '@/messages/server-change-result';
import { ServerChangeBatchMessageInput } from '@/messages/server-change-batch';
export interface MessageMap {
server_change: ServerChangeMessageInput;
server_change_result: ServerChangeResultMessageInput;
server_change_batch: ServerChangeBatchMessageInput;
}
export type MessageInput = MessageMap[keyof MessageMap];

View File

@@ -1,27 +0,0 @@
import { database } from '@/data/database';
import { MessageContext } from '@/messages';
import { sql } from 'kysely';
export type ServerChangeAckMessageInput = {
type: 'server_change_ack';
changeId: string;
};
declare module '@/messages' {
interface MessageMap {
server_change_ack: ServerChangeAckMessageInput;
}
}
export const handleChangeAck = async (
context: MessageContext,
input: ServerChangeAckMessageInput,
) => {
await database
.updateTable('changes')
.set({
device_ids: sql`array_remove(device_ids, ${context.deviceId})`,
})
.where('id', '=', input.changeId)
.execute();
};

View File

@@ -0,0 +1,6 @@
import { ServerChange } from '@/types/sync';
export type ServerChangeBatchMessageInput = {
type: 'server_change_batch';
changes: ServerChange[];
};

View File

@@ -0,0 +1,5 @@
export type ServerChangeResultMessageInput = {
type: 'server_change_result';
changeId: string;
success: boolean;
};

View File

@@ -4,9 +4,3 @@ export type ServerChangeMessageInput = {
type: 'server_change';
change: ServerChange;
};
declare module '@/messages' {
interface MessageMap {
server_change: ServerChangeMessageInput;
}
}

View File

@@ -0,0 +1,91 @@
import { database } from '@/data/database';
import { MessageInput } from '@/messages';
import { NeuronRequestAccount } from '@/types/api';
import { WebSocket } from 'ws';
export class SocketConnection {
private readonly socket: WebSocket;
private readonly account: NeuronRequestAccount;
private readonly pendingChanges: Set<string> = new Set();
constructor(socket: WebSocket, account: NeuronRequestAccount) {
this.socket = socket;
this.account = account;
socket.on('message', (message) => {
this.handleMessage(message.toString());
});
}
public send(message: MessageInput) {
if (message.type === 'server_change') {
const changeId = message.change.id;
if (this.pendingChanges.size > 0) {
return;
}
this.pendingChanges.add(changeId);
} else if (message.type === 'server_change_batch') {
if (this.pendingChanges.size > 0) {
return;
}
message.changes.forEach((change) => {
this.pendingChanges.add(change.id);
});
}
this.socket.send(JSON.stringify(message));
}
public init() {
this.sendPendingChanges();
}
private async handleMessage(message: string): Promise<void> {
const messageInput: MessageInput = JSON.parse(message);
if (messageInput.type === 'server_change_result') {
if (messageInput.success) {
await database
.deleteFrom('changes')
.where('id', '=', messageInput.changeId)
.execute();
} else {
await database
.updateTable('changes')
.set((eb) => ({ retry_count: eb('retry_count', '+', 1) }))
.where('id', '=', messageInput.changeId)
.execute();
}
this.pendingChanges.delete(messageInput.changeId);
if (this.pendingChanges.size === 0) {
this.sendPendingChanges();
}
}
}
private async sendPendingChanges() {
const changes = await database
.selectFrom('changes')
.selectAll()
.where('device_id', '=', this.account.deviceId)
.orderBy('id', 'asc')
.limit(100)
.execute();
if (changes.length === 0) {
return;
}
this.send({
type: 'server_change_batch',
changes: changes.map((change) => ({
id: change.id,
workspaceId: change.workspace_id,
deviceId: change.device_id,
createdAt: change.created_at.toISOString(),
data: change.data,
})),
});
}
}

View File

@@ -0,0 +1,62 @@
import http from 'http';
import { WebSocketServer } from 'ws';
import { verifyToken } from '@/lib/tokens';
import { SocketConnection } from '@/sockets/socket-connection';
import { MessageInput } from '@/messages';
class SocketManager {
private readonly sockets: Map<string, SocketConnection> = new Map();
public init(server: http.Server) {
const wss = new WebSocketServer({
server,
path: '/v1/synapse',
verifyClient: async (info, callback) => {
const req = info.req;
const token = req.headers['authorization'];
if (!token) {
return callback(false, 401, 'Unauthorized');
}
callback(true);
},
});
wss.on('connection', async (socket, req) => {
const token = req.headers['authorization'];
if (!token) {
socket.close();
return;
}
const result = await verifyToken(token);
if (!result.authenticated) {
socket.close();
return;
}
const account = result.account;
socket.on('close', () => {
this.sockets.delete(account.deviceId);
});
const connection = new SocketConnection(socket, account);
connection.init();
this.sockets.set(account.deviceId, connection);
});
}
public send(deviceId: string, message: MessageInput) {
const connection = this.sockets.get(deviceId);
if (!connection) {
return;
}
connection.send(message);
}
}
export const socketManager = new SocketManager();

View File

@@ -1,128 +0,0 @@
import http from 'http';
import { WebSocket, WebSocketServer } from 'ws';
import { database } from '@/data/database';
import { sql } from 'kysely';
import { SelectChange } from '@/data/schema';
import { ServerChange } from '@/types/sync';
import { MessageInput } from '@/messages';
import { handleChangeAck } from '@/messages/server-change-ack';
import { verifyToken } from '@/lib/tokens';
import { NeuronRequestAccount } from '@/types/api';
interface SynapseConnection {
socket: WebSocket;
account: NeuronRequestAccount;
}
class SynapseManager {
private readonly sockets: Map<string, SynapseConnection> = new Map();
public init(server: http.Server) {
const wss = new WebSocketServer({
server,
path: '/v1/synapse',
verifyClient: async (info, callback) => {
const req = info.req;
const token = req.headers['authorization'];
if (!token) {
return callback(false, 401, 'Unauthorized');
}
callback(true);
},
});
wss.on('connection', async (socket, req) => {
const token = req.headers['authorization'];
if (!token) {
socket.close();
return;
}
const result = await verifyToken(token);
if (!result.authenticated) {
socket.close();
return;
}
const account = result.account;
socket.on('message', (message) => {
this.handleMessage(account, message.toString());
});
socket.on('close', () => {
this.sockets.delete(account.deviceId);
});
this.sockets.set(account.deviceId, {
socket,
account,
});
await this.sendPendingChanges(account.deviceId);
});
}
public send(deviceId: string, message: MessageInput) {
const connection = this.sockets.get(deviceId);
if (!connection || !connection.socket) {
return;
}
connection.socket.send(JSON.stringify(message));
}
private async handleMessage(
account: NeuronRequestAccount,
message: string,
): Promise<void> {
const messageInput: MessageInput = JSON.parse(message);
if (messageInput.type === 'server_change_ack') {
await handleChangeAck(
{
accountId: account.id,
deviceId: account.deviceId,
},
messageInput,
);
}
}
private async sendPendingChanges(deviceId: string) {
let lastId = '0';
let hasMore = true;
while (hasMore) {
const pendingChanges = await sql<SelectChange>`
SELECT *
FROM changes
WHERE ${deviceId} = ANY(device_ids)
AND id > ${lastId}
ORDER BY id ASC
LIMIT 50
`.execute(database);
for (const change of pendingChanges.rows) {
const serverChange: ServerChange = {
id: change.id,
action: change.action as 'insert' | 'update' | 'delete',
table: change.table,
workspaceId: change.workspace_id,
before: change.before,
after: change.after,
};
this.send(deviceId, {
type: 'server_change',
change: serverChange,
});
lastId = change.id;
}
hasMore = pendingChanges.rows.length === 50;
}
}
}
export const synapse = new SynapseManager();

View File

@@ -27,13 +27,10 @@ type CdcSource = {
export type ChangeCdcData = {
id: string;
device_id: string;
workspace_id: string;
table: string;
action: string;
after: string | null;
before: string | null;
data: string;
created_at: string;
device_ids: string[];
};
export type NodeCdcData = {

View File

@@ -59,9 +59,95 @@ export type LocalNodeReactionChangeData = {
export type ServerChange = {
id: string;
table: string;
action: 'insert' | 'update' | 'delete';
workspaceId: string;
before: any | null;
after: any | null;
deviceId: string;
data: ServerChangeData;
createdAt: string;
};
export type ServerChangeData =
| ServerNodeCreateChangeData
| ServerNodeUpdateChangeData
| ServerNodeDeleteChangeData
| ServerNodeCollaboratorCreateChangeData
| ServerNodeCollaboratorUpdateChangeData
| ServerNodeCollaboratorDeleteChangeData
| ServerNodeReactionCreateChangeData
| ServerNodeReactionDeleteChangeData;
export type ServerNodeCreateChangeData = {
type: 'node_create';
id: string;
workspaceId: string;
state: string;
createdAt: string;
createdBy: string;
versionId: string;
serverCreatedAt: string;
};
export type ServerNodeUpdateChangeData = {
type: 'node_update';
id: string;
workspaceId: string;
update: string;
updatedAt: string;
updatedBy: string;
versionId: string;
serverUpdatedAt: string;
};
export type ServerNodeDeleteChangeData = {
type: 'node_delete';
id: string;
workspaceId: string;
};
export type ServerNodeCollaboratorCreateChangeData = {
type: 'node_collaborator_create';
nodeId: string;
collaboratorId: string;
role: string;
workspaceId: string;
createdAt: string;
createdBy: string;
versionId: string;
serverCreatedAt: string;
};
export type ServerNodeCollaboratorUpdateChangeData = {
type: 'node_collaborator_update';
nodeId: string;
collaboratorId: string;
workspaceId: string;
role: string;
updatedAt: string;
updatedBy: string;
versionId: string;
serverUpdatedAt: string;
};
export type ServerNodeCollaboratorDeleteChangeData = {
type: 'node_collaborator_delete';
nodeId: string;
collaboratorId: string;
workspaceId: string;
};
export type ServerNodeReactionCreateChangeData = {
type: 'node_reaction_create';
nodeId: string;
actorId: string;
reaction: string;
workspaceId: string;
createdAt: string;
serverCreatedAt: string;
};
export type ServerNodeReactionDeleteChangeData = {
type: 'node_reaction_delete';
nodeId: string;
actorId: string;
reaction: string;
workspaceId: string;
};