Improve web socket connections

This commit is contained in:
Hakan Shehu
2024-10-07 19:17:52 +02:00
parent 1b9f0acc5c
commit 68a0f4c6a1
21 changed files with 227 additions and 94 deletions

View File

@@ -1,7 +1,5 @@
import { hashCode } from '@/lib/utils';
import { getEmojiUrl } from '@/lib/emojis';
import { getIconUrl } from '@/lib/icons';
import { getIdType, IdType } from '@/lib/id';
import { IdType } from '@/lib/id';
export const getAvatarSizeClasses = (size?: string) => {
if (size === 'small') {

View File

@@ -1,10 +1,12 @@
import { MessageHandler, MessageMap } from '@/operations/messages';
import { ServerMutationMessageHandler } from '@/main/handlers/messages/server-mutation';
import { ServerChangeMessageHandler } from '@/main/handlers/messages/server-change';
import { ServerChangeAckMessageHandler } from '@/main/handlers/messages/server-change-ack';
type MessageHandlerMap = {
[K in keyof MessageMap]: MessageHandler<MessageMap[K]>;
};
export const messageHandlerMap: MessageHandlerMap = {
server_mutation: new ServerMutationMessageHandler(),
server_change: new ServerChangeMessageHandler(),
server_change_ack: new ServerChangeAckMessageHandler(),
};

View File

@@ -0,0 +1,14 @@
import { MessageContext, MessageHandler } from '@/operations/messages';
import { ServerChangeAckMessageInput } from '@/operations/messages/server-change-ack';
import { socketManager } from '@/main/sockets/socket-manager';
export class ServerChangeAckMessageHandler
implements MessageHandler<ServerChangeAckMessageInput>
{
public async handleMessage(
context: MessageContext,
input: ServerChangeAckMessageInput,
): Promise<void> {
socketManager.sendMessage(context.accountId, input);
}
}

View File

@@ -1,16 +1,19 @@
import { MessageHandler } from '@/operations/messages';
import { ServerMutationMessageInput } from '@/operations/messages/server-mutation';
import { MessageContext, MessageHandler } from '@/operations/messages';
import { ServerChangeMessageInput } from '@/operations/messages/server-change';
import { mediator } from '@/main/mediator';
export class ServerMutationMessageHandler
implements MessageHandler<ServerMutationMessageInput>
export class ServerChangeMessageHandler
implements MessageHandler<ServerChangeMessageInput>
{
public async handleMessage(input: ServerMutationMessageInput): Promise<void> {
public async handleMessage(
context: MessageContext,
input: ServerChangeMessageInput,
): Promise<void> {
if (input.change.table === 'nodes' && input.change.workspaceId) {
await mediator.executeMutation({
type: 'node_sync',
id: input.change.id,
accountId: input.accountId,
accountId: context.accountId,
workspaceId: input.change.workspaceId,
action: input.change.action,
after: input.change.after,
@@ -23,7 +26,7 @@ export class ServerMutationMessageHandler
await mediator.executeMutation({
type: 'node_reaction_sync',
id: input.change.id,
accountId: input.accountId,
accountId: context.accountId,
workspaceId: input.change.workspaceId,
action: input.change.action,
after: input.change.after,
@@ -36,12 +39,23 @@ export class ServerMutationMessageHandler
await mediator.executeMutation({
type: 'node_collaborator_sync',
id: input.change.id,
accountId: input.accountId,
accountId: context.accountId,
workspaceId: input.change.workspaceId,
action: input.change.action,
after: input.change.after,
before: input.change.before,
});
}
await mediator.executeMessage(
{
accountId: context.accountId,
deviceId: context.deviceId,
},
{
type: 'server_change_ack',
changeId: input.change.id,
},
);
}
}

View File

@@ -8,7 +8,6 @@ export class NodeReactionCreateMutationHandler
async handleMutation(
input: NodeReactionCreateMutationInput,
): Promise<MutationResult<NodeReactionCreateMutationInput>> {
console.log('create node reaction', input);
const workspaceDatabase = await databaseManager.getWorkspaceDatabase(
input.userId,
);

View File

@@ -13,7 +13,11 @@ import {
} from '@/operations/queries';
import { queryHandlerMap } from '@/main/handlers/queries';
import { eventBus } from '@/lib/event-bus';
import { MessageHandler, MessageInput } from '@/operations/messages';
import {
MessageContext,
MessageHandler,
MessageInput,
} from '@/operations/messages';
import { messageHandlerMap } from '@/main/handlers/messages';
class Mediator {
@@ -54,9 +58,12 @@ class Mediator {
return result.output;
}
public async executeMessage<T extends MessageInput>(input: T): Promise<void> {
public async executeMessage<T extends MessageInput>(
context: MessageContext,
input: T,
): Promise<void> {
const handler = messageHandlerMap[input.type] as MessageHandler<T>;
await handler.handleMessage(input);
await handler.handleMessage(context, input);
}
public unsubscribeQuery(id: string) {

View File

@@ -1,7 +1,7 @@
import { WebSocket } from 'ws';
import { buildSynapseUrl } from '@/lib/servers';
import { BackoffCalculator } from '@/lib/backoff-calculator';
import { MessageInput } from '@/operations/messages';
import { MessageContext, MessageInput } from '@/operations/messages';
import { mediator } from '@/main/mediator';
import { SelectAccount, SelectServer } from '@/main/data/app/schema';
@@ -50,8 +50,12 @@ export class SocketConnection {
return;
}
const context: MessageContext = {
accountId: this.account.id,
deviceId: this.account.device_id,
};
const message: MessageInput = JSON.parse(data);
await mediator.executeMessage(message);
await mediator.executeMessage(context, message);
};
this.socket.onopen = () => {
@@ -67,6 +71,12 @@ export class SocketConnection {
return this.socket !== null && this.socket.readyState === WebSocket.OPEN;
}
public sendMessage(message: MessageInput): void {
if (this.socket) {
this.socket.send(JSON.stringify(message));
}
}
public close(): void {
if (this.socket) {
this.socket.close();

View File

@@ -1,5 +1,6 @@
import { SocketConnection } from '@/main/sockets/socket-connection';
import { databaseManager } from '@/main/data/database-manager';
import { MessageInput } from '@/operations/messages';
const EVENT_LOOP_INTERVAL = 5000;
@@ -21,6 +22,15 @@ class SocketManager {
this.initiated = true;
}
public sendMessage(accountId: string, message: MessageInput) {
const connection = this.accounts.get(accountId);
if (!connection) {
return;
}
connection.sendMessage(message);
}
private async executeEventLoop() {
await this.checkAccounts();
this.checkConnections();

View File

@@ -22,12 +22,12 @@ class Synchronizer {
}
private async executeEventLoop() {
// try {
await this.checkForWorkspaceSyncs();
await this.checkForWorkspaceChanges();
// } catch (error) {
// console.log('error', error);
// }
try {
await this.checkForWorkspaceSyncs();
await this.checkForWorkspaceChanges();
} catch (error) {
console.log('error', error);
}
setTimeout(this.executeEventLoop, EVENT_LOOP_INTERVAL);
}

View File

@@ -2,6 +2,11 @@ export interface MessageMap {}
export type MessageInput = MessageMap[keyof MessageMap];
export type MessageContext = {
accountId: string;
deviceId: string;
};
export interface MessageHandler<T extends MessageInput> {
handleMessage: (input: T) => Promise<void>;
handleMessage: (context: MessageContext, input: T) => Promise<void>;
}

View File

@@ -0,0 +1,10 @@
export type ServerChangeAckMessageInput = {
type: 'server_change_ack';
changeId: string;
};
declare module '@/operations/messages' {
interface MessageMap {
server_change_ack: ServerChangeAckMessageInput;
}
}

View File

@@ -1,13 +1,12 @@
import { ServerChange } from '@/types/sync';
export type ServerMutationMessageInput = {
type: 'server_mutation';
accountId: string;
export type ServerChangeMessageInput = {
type: 'server_change';
change: ServerChange;
};
declare module '@/operations/messages' {
interface MessageMap {
server_mutation: ServerMutationMessageInput;
server_change: ServerChangeMessageInput;
}
}

View File

@@ -37,8 +37,7 @@ export const Avatar = (props: AvatarProps) => {
};
const AvatarFallback = ({ id, name, size, className }: AvatarProps) => {
const idType = getIdType(id);
if (idType === IdType.User && name) {
if (name) {
const color = getColorForId(id);
return (
<div
@@ -54,6 +53,7 @@ const AvatarFallback = ({ id, name, size, className }: AvatarProps) => {
);
}
const idType = getIdType(id);
const icon = getDefaultNodeIcon(idType);
return (
<Icon name={icon} className={cn(getAvatarSizeClasses(size), className)} />

View File

@@ -1,7 +1,6 @@
import express, { Request, Response } from 'express';
import cors from 'cors';
import http from 'http';
import { WebSocketServer } from 'ws';
import { accountsRouter } from '@/routes/accounts';
import { workspacesRouter } from '@/routes/workspaces';
@@ -18,7 +17,7 @@ export const initApi = () => {
app.use(express.json());
app.use(cors());
app.get('/', (req: Request, res: Response) => {
app.get('/', (_: Request, res: Response) => {
res.send('Neuron');
});
@@ -29,15 +28,7 @@ export const initApi = () => {
app.use('/v1/avatars', authMiddleware, avatarsRouter);
const server = http.createServer(app);
const wss = new WebSocketServer({
server,
path: '/v1/synapse',
});
wss.on('connection', async (socket, req) => {
await synapse.addConnection(socket, req);
});
synapse.init(server);
server.listen(port, () => {
console.log(`Server is running at http://localhost:${port}`);

View File

@@ -2,6 +2,7 @@ import { redis, CHANNEL_NAMES } from '@/data/redis';
import { synapse } from '@/synapse';
import { ServerChange } from '@/types/sync';
import { ChangeCdcData } from '@/types/cdc';
import { ServerChangeMessageInput } from '@/messages/server-change';
export const initChangesSubscriber = async () => {
const subscriber = redis.duplicate();
@@ -24,10 +25,12 @@ const handleMessage = async (message: string) => {
after: changeData.after ? JSON.parse(changeData.after) : null,
};
const input: ServerChangeMessageInput = {
type: 'server_change',
change: serverChange,
};
for (const deviceId of changeData.device_ids) {
synapse.send(deviceId, {
type: 'change',
payload: serverChange,
});
synapse.send(deviceId, input);
}
};

View File

@@ -2,7 +2,7 @@ import { initApi } from '@/api';
import { initRedis } from '@/data/redis';
import { initNodeChangesConsumer } from '@/consumers/node-cdc';
import { initChangeCdcConsumer } from '@/consumers/change-cdc';
import { initChangesSubscriber } from '@/consumers/mutations';
import { initChangesSubscriber } from '@/consumers/changes-subcriber';
import { initNodeCollaboratorChangesConsumer } from '@/consumers/node-collaborator-cdc';
import { initNodeReactionChangesConsumer } from '@/consumers/node-reaction-cdc';
import { migrate } from '@/data/database';

View File

@@ -0,0 +1,8 @@
export interface MessageMap {}
export type MessageInput = MessageMap[keyof MessageMap];
export type MessageContext = {
accountId: string;
deviceId: string;
};

View File

@@ -0,0 +1,27 @@
import { database } from '@/data/database';
import { MessageContext } from '@/messages';
import { sql } from 'kysely';
export type ServerChangeAckMessageInput = {
type: 'server_change_ack';
changeId: string;
};
declare module '@/messages' {
interface MessageMap {
server_change_ack: ServerChangeAckMessageInput;
}
}
export const handleChangeAck = async (
context: MessageContext,
input: ServerChangeAckMessageInput,
) => {
await database
.updateTable('changes')
.set({
device_ids: sql`array_remove(device_ids, ${context.deviceId})`,
})
.where('id', '=', input.changeId)
.execute();
};

View File

@@ -0,0 +1,12 @@
import { ServerChange } from '@/types/sync';
export type ServerChangeMessageInput = {
type: 'server_change';
change: ServerChange;
};
declare module '@/messages' {
interface MessageMap {
server_change: ServerChangeMessageInput;
}
}

View File

@@ -1,67 +1,94 @@
import { WebSocket } from 'ws';
import { IncomingMessage } from 'http';
import { SocketMessage } from '@/types/sockets';
import http from 'http';
import { WebSocket, WebSocketServer } from 'ws';
import { database } from '@/data/database';
import { sql } from 'kysely';
import { SelectChange } from '@/data/schema';
import { ServerChange } from '@/types/sync';
import { MessageInput } from '@/messages';
import { handleChangeAck } from '@/messages/server-change-ack';
import { verifyToken } from '@/lib/tokens';
import { NeuronRequestAccount } from '@/types/api';
interface SynapseConnection {
socket: WebSocket;
account: NeuronRequestAccount;
}
class SynapseManager {
private readonly sockets: Map<string, WebSocket> = new Map();
private readonly sockets: Map<string, SynapseConnection> = new Map();
public async addConnection(socket: WebSocket, req: IncomingMessage) {
const deviceId = req.url?.split('device_id=')[1];
if (!deviceId) {
socket.close();
return;
}
public init(server: http.Server) {
const wss = new WebSocketServer({
server,
path: '/v1/synapse',
verifyClient: async (info, callback) => {
const req = info.req;
const token = req.headers['authorization'];
socket.on('message', (message) => {
this.handleMessage(deviceId, message.toString());
if (!token) {
return callback(false, 401, 'Unauthorized');
}
callback(true);
},
});
socket.on('close', () => {
this.sockets.delete(deviceId);
});
wss.on('connection', async (socket, req) => {
const token = req.headers['authorization'];
if (!token) {
socket.close();
return;
}
this.sockets.set(deviceId, socket);
await this.sendPendingChanges(deviceId);
const result = await verifyToken(token);
if (!result.authenticated) {
socket.close();
return;
}
const account = result.account;
socket.on('message', (message) => {
this.handleMessage(account.deviceId, message.toString());
});
socket.on('close', () => {
this.sockets.delete(account.deviceId);
});
this.sockets.set(account.deviceId, {
socket,
account,
});
await this.sendPendingChanges(account.deviceId);
});
}
public send(deviceId: string, message: SocketMessage) {
const socket = this.sockets.get(deviceId);
if (!socket) {
public send(deviceId: string, message: MessageInput) {
const connection = this.sockets.get(deviceId);
if (!connection || !connection.socket) {
return;
}
socket.send(JSON.stringify(message));
connection.socket.send(JSON.stringify(message));
}
private async handleMessage(
deviceId: string,
account: NeuronRequestAccount,
message: string,
): Promise<void> {
const socketMessage: SocketMessage = JSON.parse(message);
if (socketMessage.type === 'mutation_ack') {
await this.handleMutationAck(deviceId, socketMessage);
const messageInput: MessageInput = JSON.parse(message);
if (messageInput.type === 'server_change_ack') {
await handleChangeAck(
{
accountId: account.id,
deviceId: account.deviceId,
},
messageInput,
);
}
}
private async handleMutationAck(deviceId: string, message: SocketMessage) {
const mutationId = message.payload.id;
if (!mutationId) {
return;
}
await database
.updateTable('changes')
.set({
device_ids: sql`array_remove(device_ids, ${deviceId})`,
})
.where('id', '=', mutationId)
.execute();
}
private async sendPendingChanges(deviceId: string) {
let lastId = '0';
let hasMore = true;
@@ -85,9 +112,10 @@ class SynapseManager {
before: change.before,
after: change.after,
};
this.send(deviceId, {
type: 'change',
payload: serverChange,
type: 'server_change',
change: serverChange,
});
lastId = change.id;
}

View File

@@ -1,4 +0,0 @@
export type SocketMessage = {
type: string;
payload: any;
};