Add a buffer for messages until the socket is authenticated

This commit is contained in:
Hakan Shehu
2024-11-26 23:59:16 +01:00
parent fa169b2f7e
commit f2feaf5a7b

View File

@@ -66,6 +66,28 @@ class SynapseService {
});
wss.on('connection', async (socket, req) => {
let verified = false;
const messageBuffer: Message[] = [];
socket.onmessage = async (event) => {
let data: string;
if (typeof event.data === 'string') {
data = event.data;
} else if (event.data instanceof ArrayBuffer) {
data = new TextDecoder().decode(event.data);
} else {
console.error('Unsupported message data type:', typeof event.data);
return;
}
const message: Message = JSON.parse(data);
if (!verified) {
messageBuffer.push(message);
} else {
this.handleSocketMessage(connection, message);
}
};
const token = req.headers['authorization'];
if (!token) {
socket.close();
@@ -81,12 +103,12 @@ class SynapseService {
const account = result.account;
this.logger.info(`New synapse connection from ${account.id}`);
socket.on('close', () => {
socket.onclose = () => {
const connection = this.connections.get(account.deviceId);
if (connection) {
this.connections.delete(account.deviceId);
}
});
};
const connection: SynapseConnection = {
accountId: account.id,
@@ -96,11 +118,13 @@ class SynapseService {
collaborations: new Map(),
};
socket.on('message', (message) => {
this.handleSocketMessage(connection, JSON.parse(message.toString()));
});
this.connections.set(account.deviceId, connection);
verified = true;
for (const message of messageBuffer) {
this.handleSocketMessage(connection, message);
}
messageBuffer.splice(0, messageBuffer.length);
});
}
@@ -125,7 +149,7 @@ class SynapseService {
});
this.sendPendingTransactions(connection, message.userId);
} else if (!state.syncing && !(state.cursor, message.cursor)) {
} else if (!state.syncing && state.cursor !== message.cursor) {
state.cursor = message.cursor;
this.sendPendingTransactions(connection, message.userId);
}
@@ -140,7 +164,7 @@ class SynapseService {
});
this.sendPendingCollaborations(connection, message.userId);
} else if (!state.syncing && !(state.cursor, message.cursor)) {
} else if (!state.syncing && state.cursor !== message.cursor) {
state.cursor = message.cursor;
this.sendPendingCollaborations(connection, message.userId);
}