Refactor sync service

This commit is contained in:
Hakan Shehu
2024-12-11 11:43:56 +01:00
parent 689fa5e7d0
commit 1c8e6a6ecf
22 changed files with 1609 additions and 1100 deletions

View File

@@ -6,7 +6,7 @@ import path from 'path';
import { metadataService } from '@/main/services/metadata-service';
import { WindowSize } from '@/shared/types/metadata';
import { createDebugger } from '@/main/debugger';
import { bootstrapper } from '@/main/bootstrapper';
import { scheduler } from '@/main/scheduler';
import { assetService } from '@/main/services/asset-service';
import { avatarService } from '@/main/services/avatar-service';
import { commandService } from '@/main/services/command-service';
@@ -30,7 +30,7 @@ if (started) {
}
const createWindow = async () => {
await bootstrapper.init();
await scheduler.init();
// Create the browser window.
let windowSize = await metadataService.get<WindowSize>('window_size');
@@ -152,7 +152,7 @@ app.on('activate', () => {
// In this file you can include the rest of your app's specific main process
// code. You can also put them in separate files and import them here.
ipcMain.handle('init', async () => {
await bootstrapper.init();
await scheduler.init();
});
ipcMain.handle(

View File

@@ -1,61 +0,0 @@
import { createDebugger } from '@/main/debugger';
import { databaseService } from '@/main/data/database-service';
import { accountService } from '@/main/services/account-service';
import { fileService } from '@/main/services/file-service';
import { notificationService } from '@/main/services/notification-service';
import { radarService } from '@/main/services/radar-service';
import { serverService } from '@/main/services/server-service';
import { socketService } from '@/main/services/socket-service';
import { syncService } from '@/main/services/sync-service';
// one minute
const EVENT_LOOP_INTERVAL = 1000 * 60;
class Bootstrapper {
private readonly debug = createDebugger('bootstrapper');
private initPromise: Promise<void> | null = null;
private eventLoop: NodeJS.Timeout | null = null;
constructor() {
this.executeEventLoop = this.executeEventLoop.bind(this);
}
public init(): Promise<void> {
if (!this.initPromise) {
this.initPromise = this.executeInit();
}
return this.initPromise;
}
private async executeInit() {
this.debug('Initializing');
await databaseService.init();
radarService.init();
if (!this.eventLoop) {
this.eventLoop = setTimeout(this.executeEventLoop, 50);
}
}
private async executeEventLoop() {
this.debug('Executing event loop');
try {
await serverService.syncServers();
await accountService.syncAccounts();
await socketService.checkConnections();
await syncService.syncAllWorkspaces();
await fileService.syncFiles();
notificationService.checkBadge();
} catch (error) {
this.debug(error, 'Error executing event loop');
}
this.eventLoop = setTimeout(this.executeEventLoop, EVENT_LOOP_INTERVAL);
}
}
export const bootstrapper = new Bootstrapper();

View File

@@ -0,0 +1,40 @@
import { databaseService } from '@/main/data/database-service';
import { createDebugger } from '@/main/debugger';
import { socketService } from '@/main/services/socket-service';
import { JobHandler } from '@/main/jobs';
export type ConnectSocketInput = {
type: 'connect_socket';
accountId: string;
};
declare module '@/main/jobs' {
interface JobMap {
connect_socket: {
input: ConnectSocketInput;
};
}
}
export class ConnectSocketJobHandler implements JobHandler<ConnectSocketInput> {
public triggerDebounce = 0;
public interval = 1000 * 60;
private readonly debug = createDebugger('job:connect-socket');
public async handleJob(input: ConnectSocketInput) {
const account = await databaseService.appDatabase
.selectFrom('accounts')
.selectAll()
.where('id', '=', input.accountId)
.executeTakeFirst();
if (!account) {
this.debug(`Account ${input.accountId} not found`);
return;
}
this.debug(`Connecting to socket for account ${account.email}`);
socketService.checkConnection(account);
}
}

View File

@@ -0,0 +1,10 @@
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface JobMap {}
export type JobInput = JobMap[keyof JobMap]['input'];
export interface JobHandler<T extends JobInput> {
triggerDebounce: number;
interval: number;
handleJob: (input: T) => Promise<void>;
}

View File

@@ -0,0 +1,27 @@
import { syncService } from '@/main/services/sync-service';
import { JobHandler } from '@/main/jobs';
export type InitSyncConsumersInput = {
type: 'init_sync_consumers';
accountId: string;
userId: string;
};
declare module '@/main/jobs' {
interface JobMap {
init_sync_consumers: {
input: InitSyncConsumersInput;
};
}
}
export class InitSyncConsumersJobHandler
implements JobHandler<InitSyncConsumersInput>
{
public triggerDebounce = 100;
public interval = 1000 * 60;
public async handleJob(input: InitSyncConsumersInput) {
syncService.initUserConsumers(input.accountId, input.userId);
}
}

View File

@@ -0,0 +1,61 @@
import { nodeService } from '@/main/services/node-service';
import { createDebugger } from '@/main/debugger';
import { databaseService } from '@/main/data/database-service';
import { JobHandler } from '@/main/jobs';
import { mapTransaction } from '@/main/utils';
export type RevertInvalidTransactionsInput = {
type: 'revert_invalid_transactions';
userId: string;
};
declare module '@/main/jobs' {
interface JobMap {
revert_invalid_transactions: {
input: RevertInvalidTransactionsInput;
};
}
}
export class RevertInvalidTransactionsJobHandler
implements JobHandler<RevertInvalidTransactionsInput>
{
public triggerDebounce = 100;
public interval = 1000 * 60 * 5;
private readonly debug = createDebugger('job:revert-invalid-transactions');
public async handleJob(input: RevertInvalidTransactionsInput) {
this.debug(`Reverting invalid transactions for user ${input.userId}`);
const workspaceDatabase = await databaseService.getWorkspaceDatabase(
input.userId
);
const invalidTransactions = await workspaceDatabase
.selectFrom('transactions')
.selectAll()
.where('status', '=', 'pending')
.where('retry_count', '>=', 10)
.execute();
if (invalidTransactions.length === 0) {
this.debug(
`No invalid transactions found for user ${input.userId}, skipping`
);
return;
}
for (const transactionRow of invalidTransactions) {
const transaction = mapTransaction(transactionRow);
if (transaction.operation === 'create') {
await nodeService.revertCreateTransaction(input.userId, transaction);
} else if (transaction.operation === 'update') {
await nodeService.revertUpdateTransaction(input.userId, transaction);
} else if (transaction.operation === 'delete') {
await nodeService.revertDeleteTransaction(input.userId, transaction);
}
}
}
}

View File

@@ -0,0 +1,24 @@
import { accountService } from '@/main/services/account-service';
import { JobHandler } from '@/main/jobs';
export type SyncAccountInput = {
type: 'sync_account';
accountId: string;
};
declare module '@/main/jobs' {
interface JobMap {
sync_account: {
input: SyncAccountInput;
};
}
}
export class SyncAccountJobHandler implements JobHandler<SyncAccountInput> {
public triggerDebounce = 0;
public interval = 1000 * 60;
public async handleJob(input: SyncAccountInput) {
await accountService.syncAccount(input.accountId);
}
}

View File

@@ -0,0 +1,83 @@
import { serverService } from '@/main/services/server-service';
import { createDebugger } from '@/main/debugger';
import { databaseService } from '@/main/data/database-service';
import { JobHandler } from '@/main/jobs';
import { httpClient } from '@/shared/lib/http-client';
export type SyncDeletedTokensInput = {
type: 'sync_deleted_tokens';
};
declare module '@/main/jobs' {
interface JobMap {
sync_deleted_tokens: {
input: SyncDeletedTokensInput;
};
}
}
export class SyncDeletedTokensJobHandler
implements JobHandler<SyncDeletedTokensInput>
{
public triggerDebounce = 100;
public interval = 1000 * 60 * 5;
private readonly debug = createDebugger('job:sync-deleted-tokens');
public async handleJob(_: SyncDeletedTokensInput) {
this.debug('Syncing deleted tokens');
const deletedTokens = await databaseService.appDatabase
.selectFrom('deleted_tokens')
.innerJoin('servers', 'deleted_tokens.server', 'servers.domain')
.select([
'deleted_tokens.token',
'deleted_tokens.account_id',
'servers.domain',
'servers.attributes',
])
.execute();
if (deletedTokens.length === 0) {
this.debug('No deleted tokens found');
return;
}
for (const deletedToken of deletedTokens) {
if (!serverService.isAvailable(deletedToken.domain)) {
this.debug(
`Server ${deletedToken.domain} is not available for logging out account ${deletedToken.account_id}`
);
continue;
}
try {
const { status } = await httpClient.delete(`/v1/accounts/logout`, {
domain: deletedToken.domain,
token: deletedToken.token,
});
this.debug(`Deleted token logout response status code: ${status}`);
if (status !== 200) {
return;
}
await databaseService.appDatabase
.deleteFrom('deleted_tokens')
.where('token', '=', deletedToken.token)
.where('account_id', '=', deletedToken.account_id)
.execute();
this.debug(
`Logged out account ${deletedToken.account_id} from server ${deletedToken.domain}`
);
} catch (error) {
this.debug(
`Failed to logout account ${deletedToken.account_id} from server ${deletedToken.domain}`,
error
);
}
}
}
}

View File

@@ -0,0 +1,155 @@
import { GetTransactionsOutput } from '@colanode/core';
import { sql } from 'kysely';
import { SelectTransaction } from '@/main/data/workspace/schema';
import { nodeService } from '@/main/services/node-service';
import { fetchCursor, fetchWorkspaceCredentials } from '@/main/utils';
import { createDebugger } from '@/main/debugger';
import { databaseService } from '@/main/data/database-service';
import { serverService } from '@/main/services/server-service';
import { JobHandler } from '@/main/jobs';
import { httpClient } from '@/shared/lib/http-client';
export type SyncIncompleteTransactionsInput = {
type: 'sync_incomplete_transactions';
userId: string;
};
declare module '@/main/jobs' {
interface JobMap {
sync_incomplete_transactions: {
input: SyncIncompleteTransactionsInput;
};
}
}
export class SyncIncompleteTransactionsJobHandler
implements JobHandler<SyncIncompleteTransactionsInput>
{
public triggerDebounce = 100;
public interval = 1000 * 60;
private readonly debug = createDebugger('job:sync-incomplete-transactions');
public async handleJob(input: SyncIncompleteTransactionsInput) {
this.debug(`Syncing incomplete transactions for user ${input.userId}`);
const workspaceDatabase = await databaseService.getWorkspaceDatabase(
input.userId
);
const incompleteTransactions = await workspaceDatabase
.selectFrom('transactions')
.selectAll()
.where('status', '=', 'incomplete')
.execute();
if (incompleteTransactions.length === 0) {
this.debug(
`No incomplete transactions found for user ${input.userId}, skipping`
);
return;
}
const credentials = await fetchWorkspaceCredentials(input.userId);
if (!credentials) {
this.debug(
`No workspace credentials found for user ${input.userId}, skipping`
);
return;
}
if (!serverService.isAvailable(credentials.serverDomain)) {
this.debug(
`Server ${credentials.serverDomain} is not available, skipping`
);
return;
}
const groupedByNodeId = incompleteTransactions.reduce<
Record<string, SelectTransaction[]>
>((acc, transaction) => {
acc[transaction.node_id] = [
...(acc[transaction.node_id] ?? []),
transaction,
];
return acc;
}, {});
for (const [nodeId, transactions] of Object.entries(groupedByNodeId)) {
try {
this.debug(
`Syncing incomplete transactions for node ${nodeId} for user ${input.userId}`
);
const { data } = await httpClient.get<GetTransactionsOutput>(
`/v1/workspaces/${credentials.workspaceId}/transactions/${nodeId}`,
{
domain: credentials.serverDomain,
token: credentials.token,
}
);
if (data.transactions.length === 0) {
this.debug(
`No transactions found for node ${nodeId} for user ${input.userId}, deleting`
);
await workspaceDatabase
.deleteFrom('transactions')
.where(
'id',
'in',
transactions.map((t) => t.id)
)
.execute();
continue;
}
const cursor = await fetchCursor(input.userId, 'transactions');
const synced = await nodeService.replaceTransactions(
input.userId,
nodeId,
data.transactions,
cursor
);
if (!synced) {
this.debug(
`Failed to sync transactions for node ${nodeId} for user ${input.userId}, incrementing retry count`
);
await workspaceDatabase
.updateTable('transactions')
.set({ retry_count: sql`retry_count + 1` })
.where(
'id',
'in',
transactions.map((t) => t.id)
)
.execute();
} else {
this.debug(
`Successfully synced transactions for node ${nodeId} for user ${input.userId}, resetting retry count`
);
await workspaceDatabase
.updateTable('transactions')
.set({ retry_count: sql`retry_count + 1` })
.where(
'id',
'in',
transactions.map((t) => t.id)
)
.where('status', '=', 'incomplete')
.execute();
}
} catch (error) {
this.debug(
error,
`Error syncing incomplete transactions for node ${nodeId} for user ${input.userId}`
);
}
}
}
}

View File

@@ -0,0 +1,95 @@
import { GetTransactionsOutput } from '@colanode/core';
import { nodeService } from '@/main/services/node-service';
import { fetchCursor, fetchWorkspaceCredentials } from '@/main/utils';
import { createDebugger } from '@/main/debugger';
import { databaseService } from '@/main/data/database-service';
import { serverService } from '@/main/services/server-service';
import { JobHandler } from '@/main/jobs';
import { httpClient } from '@/shared/lib/http-client';
export type SyncMissingNodesInput = {
type: 'sync_missing_nodes';
userId: string;
};
declare module '@/main/jobs' {
interface JobMap {
sync_missing_nodes: {
input: SyncMissingNodesInput;
};
}
}
export class SyncMissingNodesJobHandler
implements JobHandler<SyncMissingNodesInput>
{
public triggerDebounce = 100;
public interval = 1000 * 60;
private readonly debug = createDebugger('job:sync-missing-nodes');
public async handleJob(input: SyncMissingNodesInput) {
this.debug(`Syncing missing nodes for user ${input.userId}`);
const workspaceDatabase = await databaseService.getWorkspaceDatabase(
input.userId
);
const missingNodes = await workspaceDatabase
.selectFrom('collaborations')
.leftJoin('nodes', 'collaborations.node_id', 'nodes.id')
.select('collaborations.node_id')
.where('nodes.id', 'is', null)
.execute();
if (missingNodes.length === 0) {
this.debug(`No missing nodes found for user ${input.userId}, skipping`);
return;
}
const credentials = await fetchWorkspaceCredentials(input.userId);
if (!credentials) {
this.debug(
`No workspace credentials found for user ${input.userId}, skipping`
);
return;
}
if (!serverService.isAvailable(credentials.serverDomain)) {
this.debug(
`Server ${credentials.serverDomain} is not available, skipping`
);
return;
}
for (const node of missingNodes) {
try {
this.debug(
`Syncing missing node ${node.node_id} for user ${input.userId}`
);
const { data } = await httpClient.get<GetTransactionsOutput>(
`/v1/workspaces/${credentials.workspaceId}/transactions/${node.node_id}`,
{
domain: credentials.serverDomain,
token: credentials.token,
}
);
const cursor = await fetchCursor(input.userId, 'transactions');
await nodeService.replaceTransactions(
input.userId,
node.node_id,
data.transactions,
cursor
);
} catch (error) {
this.debug(
error,
`Error syncing missing node ${node.node_id} for user ${input.userId}`
);
}
}
}
}

View File

@@ -0,0 +1,142 @@
import { SyncInteractionsMessage } from '@colanode/core';
import { sql } from 'kysely';
import { SelectInteractionEvent } from '@/main/data/workspace/schema';
import { socketService } from '@/main/services/socket-service';
import { fetchWorkspaceCredentials } from '@/main/utils';
import { createDebugger } from '@/main/debugger';
import { databaseService } from '@/main/data/database-service';
import { serverService } from '@/main/services/server-service';
import { JobHandler } from '@/main/jobs';
export type SyncPendingInteractionsInput = {
type: 'sync_pending_interactions';
userId: string;
};
declare module '@/main/jobs' {
interface JobMap {
sync_pending_interactions: {
input: SyncPendingInteractionsInput;
};
}
}
export class SyncPendingInteractionsJobHandler
implements JobHandler<SyncPendingInteractionsInput>
{
public triggerDebounce = 100;
public interval = 1000 * 60;
private readonly debug = createDebugger('job:sync-pending-interactions');
public async handleJob(input: SyncPendingInteractionsInput) {
this.debug(`Sending local pending interactions for user ${input.userId}`);
const workspaceDatabase = await databaseService.getWorkspaceDatabase(
input.userId
);
const credentials = await fetchWorkspaceCredentials(input.userId);
if (!credentials) {
this.debug(
`No workspace credentials found for user ${input.userId}, skipping sending local pending interactions`
);
return;
}
if (!serverService.isAvailable(credentials.serverDomain)) {
this.debug(
`Server ${credentials.serverDomain} is not available, skipping sending local pending interactions`
);
return;
}
const cutoff = new Date(Date.now() - 1000 * 60 * 5).toISOString();
let cursor = '0';
let hasMore = true;
while (hasMore) {
const interactionEvents = await workspaceDatabase
.selectFrom('interaction_events')
.selectAll()
.where((eb) =>
eb.or([eb('sent_at', 'is', null), eb('sent_at', '<', cutoff)])
)
.where('event_id', '>', cursor)
.limit(50)
.execute();
if (interactionEvents.length === 0) {
this.debug(
`No local pending interactions found for user ${input.userId}, stopping sync`
);
hasMore = false;
break;
}
this.debug(
`Sending ${interactionEvents.length} local pending interactions for user ${input.userId}`
);
const groupedByNodeId: Record<string, SelectInteractionEvent[]> = {};
for (const event of interactionEvents) {
groupedByNodeId[event.node_id] = [
...(groupedByNodeId[event.node_id] ?? []),
event,
];
cursor = event.event_id;
}
const sentEventIds: string[] = [];
for (const [nodeId, events] of Object.entries(groupedByNodeId)) {
if (events.length === 0) {
continue;
}
const firstEvent = events[0];
if (!firstEvent) {
continue;
}
const message: SyncInteractionsMessage = {
type: 'sync_interactions',
nodeId,
nodeType: firstEvent.node_type,
userId: credentials.userId,
events: events.map((e) => ({
attribute: e.attribute,
value: e.value,
createdAt: e.created_at,
})),
};
const sent = socketService.sendMessage(credentials.accountId, message);
if (sent) {
sentEventIds.push(...events.map((e) => e.event_id));
}
}
if (sentEventIds.length > 0) {
this.debug(
`Marking ${sentEventIds.length} local pending interactions as sent for user ${input.userId}`
);
await workspaceDatabase
.updateTable('interaction_events')
.set({
sent_at: new Date().toISOString(),
sent_count: sql`sent_count + 1`,
})
.where('event_id', 'in', sentEventIds)
.execute();
await workspaceDatabase
.deleteFrom('interaction_events')
.where('sent_count', '>', 20)
.execute();
}
}
}
}

View File

@@ -0,0 +1,119 @@
import { SyncTransactionsOutput, LocalTransaction } from '@colanode/core';
import { fetchWorkspaceCredentials, mapTransaction } from '@/main/utils';
import { createDebugger } from '@/main/debugger';
import { databaseService } from '@/main/data/database-service';
import { serverService } from '@/main/services/server-service';
import { JobHandler } from '@/main/jobs';
import { httpClient } from '@/shared/lib/http-client';
export type SyncPendingTransactionsInput = {
type: 'sync_pending_transactions';
userId: string;
};
declare module '@/main/jobs' {
interface JobMap {
sync_pending_transactions: {
input: SyncPendingTransactionsInput;
};
}
}
export class SyncPendingTransactionsJobHandler
implements JobHandler<SyncPendingTransactionsInput>
{
public triggerDebounce = 0;
public interval = 1000 * 60;
private readonly debug = createDebugger('job:sync-pending-transactions');
public async handleJob(input: SyncPendingTransactionsInput) {
this.debug(`Sending local pending transactions for user ${input.userId}`);
const workspaceDatabase = await databaseService.getWorkspaceDatabase(
input.userId
);
const unsyncedTransactions = await workspaceDatabase
.selectFrom('transactions')
.selectAll()
.where('status', '=', 'pending')
.orderBy('id', 'asc')
.limit(20)
.execute();
if (unsyncedTransactions.length === 0) {
return;
}
this.debug(
`Sending ${unsyncedTransactions.length} local pending transactions for user ${input.userId}`
);
const credentials = await fetchWorkspaceCredentials(input.userId);
if (!credentials) {
this.debug(
`No workspace credentials found for user ${input.userId}, skipping sending local pending transactions`
);
return;
}
if (!serverService.isAvailable(credentials.serverDomain)) {
this.debug(
`Server ${credentials.serverDomain} is not available, skipping sending local pending transactions`
);
return;
}
const transactions: LocalTransaction[] =
unsyncedTransactions.map(mapTransaction);
const { data } = await httpClient.post<SyncTransactionsOutput>(
`/v1/workspaces/${credentials.workspaceId}/transactions`,
{
transactions,
},
{
domain: credentials.serverDomain,
token: credentials.token,
}
);
const syncedTransactionIds: string[] = [];
const unsyncedTransactionIds: string[] = [];
for (const result of data.results) {
if (result.status === 'success') {
syncedTransactionIds.push(result.id);
} else {
unsyncedTransactionIds.push(result.id);
}
}
if (syncedTransactionIds.length > 0) {
this.debug(
`Marking ${syncedTransactionIds.length} local pending transactions as sent for user ${input.userId}`
);
await workspaceDatabase
.updateTable('transactions')
.set({ status: 'sent' })
.where('id', 'in', syncedTransactionIds)
.where('status', '=', 'pending')
.execute();
}
if (unsyncedTransactionIds.length > 0) {
this.debug(
`Marking ${unsyncedTransactionIds.length} local pending transactions as failed for user ${input.userId}`
);
await workspaceDatabase
.updateTable('transactions')
.set((eb) => ({ retry_count: eb('retry_count', '+', 1) }))
.where('id', 'in', unsyncedTransactionIds)
.where('status', '=', 'pending')
.execute();
}
}
}

View File

@@ -0,0 +1,23 @@
import { serverService } from '@/main/services/server-service';
import { JobHandler } from '@/main/jobs';
export type SyncServersInput = {
type: 'sync_servers';
};
declare module '@/main/jobs' {
interface JobMap {
sync_servers: {
input: SyncServersInput;
};
}
}
export class SyncServersJobHandler implements JobHandler<SyncServersInput> {
public triggerDebounce = 0;
public interval = 1000 * 60;
public async handleJob(_: SyncServersInput) {
await serverService.syncServers();
}
}

View File

@@ -0,0 +1,347 @@
import { sha256 } from 'js-sha256';
import { socketService } from './services/socket-service';
import { databaseService } from '@/main/data/database-service';
import { createDebugger } from '@/main/debugger';
import { JobHandler, JobInput, JobMap } from '@/main/jobs';
import { SyncServersJobHandler } from '@/main/jobs/sync-servers';
import { SyncAccountJobHandler } from '@/main/jobs/sync-account';
import { InitSyncConsumersJobHandler } from '@/main/jobs/init-sync-consumers';
import { RevertInvalidTransactionsJobHandler } from '@/main/jobs/revert-invalid-transactions';
import { SyncPendingTransactionsJobHandler } from '@/main/jobs/sync-pending-transactions';
import { SyncIncompleteTransactionsJobHandler } from '@/main/jobs/sync-incomplete-transactions';
import { SyncPendingInteractionsJobHandler } from '@/main/jobs/sync-pending-interactions';
import { SyncMissingNodesJobHandler } from '@/main/jobs/sync-missing-nodes';
import { SyncDeletedTokensJobHandler } from '@/main/jobs/sync-deleted-tokens';
import { ConnectSocketJobHandler } from '@/main/jobs/connect-socket';
import { eventBus } from '@/shared/lib/event-bus';
import { Event } from '@/shared/types/events';
type JobHandlerMap = {
[K in keyof JobMap]: JobHandler<JobMap[K]['input']>;
};
export const jobHandlerMap: JobHandlerMap = {
sync_servers: new SyncServersJobHandler(),
sync_account: new SyncAccountJobHandler(),
init_sync_consumers: new InitSyncConsumersJobHandler(),
revert_invalid_transactions: new RevertInvalidTransactionsJobHandler(),
sync_pending_transactions: new SyncPendingTransactionsJobHandler(),
sync_incomplete_transactions: new SyncIncompleteTransactionsJobHandler(),
sync_pending_interactions: new SyncPendingInteractionsJobHandler(),
sync_missing_nodes: new SyncMissingNodesJobHandler(),
sync_deleted_tokens: new SyncDeletedTokensJobHandler(),
connect_socket: new ConnectSocketJobHandler(),
};
type JobState = {
id: string;
input: JobMap[keyof JobMap]['input'];
running: boolean;
triggered: boolean;
handler: JobHandler<JobMap[keyof JobMap]['input']>;
timeout: NodeJS.Timeout | null;
};
class Scheduler {
private readonly debug = createDebugger('scheduler');
private initPromise: Promise<void> | null = null;
private initialized = false;
private states: Map<string, JobState> = new Map();
constructor() {
eventBus.subscribe((event) => {
this.handleEvent(event);
});
}
public init() {
if (!this.initPromise) {
this.initPromise = this.executeInit();
}
return this.initPromise;
}
private async executeInit() {
this.debug('Initializing scheduler');
await databaseService.init();
await this.scheduleJobs();
this.initialized = true;
}
public schedule(input: JobInput) {
const id = sha256(JSON.stringify(input));
if (this.states.has(id)) {
return;
}
const handler = jobHandlerMap[input.type] as JobHandler<
JobMap[keyof JobMap]['input']
>;
if (!handler) {
this.debug(`No handler found for job type: ${input.type}`);
return;
}
const state: JobState = {
id,
input,
running: false,
triggered: false,
handler,
timeout: null,
};
state.timeout = setTimeout(() => {
this.executeJob(state);
}, 0);
this.states.set(id, state);
}
public trigger(input: JobInput) {
const id = sha256(JSON.stringify(input));
const state = this.states.get(id);
if (!state) {
return;
}
if (state.running) {
state.triggered = true;
return;
}
if (state.timeout) {
clearTimeout(state.timeout);
}
state.timeout = setTimeout(() => {
this.executeJob(state);
}, state.handler.triggerDebounce);
}
private async executeJob(state: JobState) {
if (state.running) {
return;
}
state.running = true;
state.triggered = false;
if (state.timeout) {
clearTimeout(state.timeout);
}
try {
await state.handler.handleJob(state.input);
} catch (error) {
this.debug(error, `Error executing job: ${state.input.type}`);
} finally {
state.running = false;
if (state.timeout === null && state.handler.interval > 0) {
state.timeout = setTimeout(() => {
this.executeJob(state);
}, state.handler.interval);
}
}
}
private async scheduleJobs() {
this.schedule({ type: 'sync_servers' });
this.schedule({ type: 'sync_deleted_tokens' });
}
private async scheduleServerAccountsJobs(domain: string) {
const accounts = await databaseService.appDatabase
.selectFrom('accounts')
.selectAll()
.where('server', '=', domain)
.execute();
for (const account of accounts) {
this.scheduleAccountJobs(account.id);
}
}
private scheduleAccountJobs(accountId: string) {
this.schedule({ type: 'sync_account', accountId });
this.schedule({ type: 'connect_socket', accountId });
}
private deleteAccountJobs(accountId: string) {
const jobIds = Array.from(this.states.keys());
for (const jobId of jobIds) {
const state = this.states.get(jobId);
if (!state) {
continue;
}
if (
state.input.type === 'sync_account' &&
state.input.accountId === accountId
) {
this.states.delete(jobId);
} else if (
state.input.type === 'connect_socket' &&
state.input.accountId === accountId
) {
this.states.delete(jobId);
}
}
}
private async scheduleAccountWorkspacesJobs(accountId: string) {
const workspaces = await databaseService.appDatabase
.selectFrom('workspaces')
.selectAll()
.where('account_id', '=', accountId)
.execute();
for (const workspace of workspaces) {
this.scheduleWorkspaceJobs(accountId, workspace.user_id);
}
}
private scheduleWorkspaceJobs(accountId: string, userId: string) {
if (!socketService.isConnected(accountId)) {
return;
}
this.schedule({
type: 'sync_pending_transactions',
userId,
});
this.schedule({
type: 'sync_incomplete_transactions',
userId,
});
this.schedule({
type: 'sync_pending_interactions',
userId,
});
this.schedule({
type: 'sync_missing_nodes',
userId,
});
this.schedule({
type: 'revert_invalid_transactions',
userId,
});
this.schedule({
type: 'init_sync_consumers',
userId,
accountId,
});
}
private deleteWorkspaceJobs(userId: string) {
const jobIds = Array.from(this.states.keys());
for (const jobId of jobIds) {
const state = this.states.get(jobId);
if (!state) {
continue;
}
if (
state.input.type === 'init_sync_consumers' &&
state.input.userId === userId
) {
this.states.delete(jobId);
}
if (
state.input.type === 'sync_pending_transactions' &&
state.input.userId === userId
) {
this.states.delete(jobId);
}
if (
state.input.type === 'sync_incomplete_transactions' &&
state.input.userId === userId
) {
this.states.delete(jobId);
}
if (
state.input.type === 'sync_pending_interactions' &&
state.input.userId === userId
) {
this.states.delete(jobId);
}
if (
state.input.type === 'sync_missing_nodes' &&
state.input.userId === userId
) {
this.states.delete(jobId);
}
if (
state.input.type === 'revert_invalid_transactions' &&
state.input.userId === userId
) {
this.states.delete(jobId);
}
}
}
private handleEvent(event: Event) {
if (!this.initialized) {
return;
}
if (event.type === 'server_availability_changed' && event.isAvailable) {
this.scheduleServerAccountsJobs(event.server.domain);
} else if (event.type === 'server_created') {
this.scheduleServerAccountsJobs(event.server.domain);
} else if (event.type === 'account_created') {
this.scheduleAccountJobs(event.account.id);
} else if (event.type === 'account_deleted') {
this.trigger({
type: 'sync_deleted_tokens',
});
this.deleteAccountJobs(event.account.id);
} else if (event.type === 'workspace_created') {
this.scheduleWorkspaceJobs(
event.workspace.accountId,
event.workspace.userId
);
} else if (event.type === 'workspace_deleted') {
this.deleteWorkspaceJobs(event.workspace.userId);
} else if (event.type === 'socket_connection_opened') {
this.scheduleAccountWorkspacesJobs(event.accountId);
} else if (event.type === 'transaction_created') {
this.trigger({
type: 'sync_pending_transactions',
userId: event.userId,
});
} else if (event.type === 'transaction_incomplete') {
this.trigger({
type: 'sync_incomplete_transactions',
userId: event.userId,
});
} else if (event.type === 'interaction_event_created') {
this.trigger({
type: 'sync_pending_interactions',
userId: event.userId,
});
}
}
}
export const scheduler = new Scheduler();

View File

@@ -14,28 +14,25 @@ import {
} from '@/main/utils';
import { eventBus } from '@/shared/lib/event-bus';
import { httpClient } from '@/shared/lib/http-client';
import { socketService } from '@/main/services/socket-service';
class AccountService {
private readonly debug = createDebugger('service:account');
async syncAccounts() {
this.debug('Syncing all accounts');
public async syncAccount(accountId: string) {
this.debug(`Syncing account ${accountId}`);
const accounts = await databaseService.appDatabase
const account = await databaseService.appDatabase
.selectFrom('accounts')
.selectAll()
.execute();
.where('id', '=', accountId)
.executeTakeFirst();
for (const account of accounts) {
await this.syncAccount(account);
if (!account) {
this.debug(`Account ${accountId} not found`);
return;
}
await this.syncDeletedTokens();
}
private async syncAccount(account: SelectAccount) {
this.debug(`Syncing account ${account.email}`);
const server = await databaseService.appDatabase
.selectFrom('servers')
.selectAll()
@@ -69,6 +66,7 @@ class AccountService {
if (status >= 400 && status < 500) {
this.debug(`Account ${account.email} is not valid, logging out...`);
await this.logoutAccount(account);
socketService.removeConnection(account.id);
return;
}
@@ -82,28 +80,33 @@ class AccountService {
.where('account_id', '=', account.id)
.execute();
const updatedAccount = await databaseService.appDatabase
.updateTable('accounts')
.returningAll()
.set({
name: data.account.name,
avatar: data.account.avatar,
})
.where('id', '=', account.id)
.executeTakeFirst();
if (
data.account.name !== account.name ||
data.account.avatar !== account.avatar
) {
const updatedAccount = await databaseService.appDatabase
.updateTable('accounts')
.returningAll()
.set({
name: data.account.name,
avatar: data.account.avatar,
})
.where('id', '=', account.id)
.executeTakeFirst();
if (!updatedAccount) {
this.debug(`Failed to update account ${account.email} after sync`);
return;
} else {
this.debug(`Updated account ${account.email} after sync`);
if (!updatedAccount) {
this.debug(`Failed to update account ${account.email} after sync`);
return;
} else {
this.debug(`Updated account ${account.email} after sync`);
}
eventBus.publish({
type: 'account_updated',
account: mapAccount(updatedAccount),
});
}
eventBus.publish({
type: 'account_updated',
account: mapAccount(updatedAccount),
});
for (const workspace of data.workspaces) {
const currentWorkspace = currentWorkspaces.find(
(w) => w.workspace_id === workspace.id
@@ -234,62 +237,6 @@ class AccountService {
return true;
}
public async syncDeletedTokens() {
this.debug('Syncing deleted tokens');
const deletedTokens = await databaseService.appDatabase
.selectFrom('deleted_tokens')
.innerJoin('servers', 'deleted_tokens.server', 'servers.domain')
.select([
'deleted_tokens.token',
'deleted_tokens.account_id',
'servers.domain',
'servers.attributes',
])
.execute();
if (deletedTokens.length === 0) {
this.debug('No deleted tokens found');
return;
}
for (const deletedToken of deletedTokens) {
if (!serverService.isAvailable(deletedToken.domain)) {
this.debug(
`Server ${deletedToken.domain} is not available for logging out account ${deletedToken.account_id}`
);
continue;
}
try {
const { status } = await httpClient.delete(`/v1/accounts/logout`, {
domain: deletedToken.domain,
token: deletedToken.token,
});
this.debug(`Deleted token logout response status code: ${status}`);
if (status !== 200) {
return;
}
await databaseService.appDatabase
.deleteFrom('deleted_tokens')
.where('token', '=', deletedToken.token)
.where('account_id', '=', deletedToken.account_id)
.execute();
this.debug(
`Logged out account ${deletedToken.account_id} from server ${deletedToken.domain}`
);
} catch {
this.debug(
`Failed to logout account ${deletedToken.account_id} from server ${deletedToken.domain}`
);
}
}
}
private async deleteWorkspace(userId: string): Promise<boolean> {
this.debug(`Deleting workspace ${userId}`);

View File

@@ -1,7 +1,7 @@
import { Message } from '@colanode/core';
import { WebSocket } from 'ws';
import { accountService } from '@/main/services/account-service';
import { scheduler } from '@/main/scheduler';
import { createDebugger } from '@/main/debugger';
import { SelectAccount } from '@/main/data/app/schema';
import { syncService } from '@/main/services/sync-service';
@@ -67,13 +67,25 @@ export class SocketConnection {
} else if (message.type === 'interactions_batch') {
syncService.syncServerInteractions(message);
} else if (message.type === 'account_updated') {
accountService.syncAccounts();
scheduler.trigger({
type: 'sync_account',
accountId: this.account.id,
});
} else if (message.type === 'workspace_updated') {
accountService.syncAccounts();
scheduler.trigger({
type: 'sync_account',
accountId: this.account.id,
});
} else if (message.type === 'workspace_user_created') {
accountService.syncAccounts();
scheduler.trigger({
type: 'sync_account',
accountId: this.account.id,
});
} else if (message.type === 'workspace_deleted') {
accountService.syncAccounts();
scheduler.trigger({
type: 'sync_account',
accountId: this.account.id,
});
}
};

View File

@@ -1,28 +1,15 @@
import { Message } from '@colanode/core';
import { SelectAccount } from '@/main/data/app/schema';
import { createDebugger } from '@/main/debugger';
import { databaseService } from '@/main/data/database-service';
import { serverService } from '@/main/services/server-service';
import { SocketConnection } from '@/main/services/socket-connection';
import { eventBus } from '@/shared/lib/event-bus';
class SocketService {
private readonly debug = createDebugger('service:socket');
private readonly sockets: Map<string, SocketConnection> = new Map();
constructor() {
eventBus.subscribe((event) => {
if (event.type === 'server_availability_changed' && event.isAvailable) {
this.checkConnections();
} else if (
event.type === 'account_created' ||
event.type === 'account_updated' ||
event.type === 'account_deleted'
) {
this.checkConnections();
}
});
}
constructor() {}
public sendMessage(accountId: string, message: Message): boolean {
const connection = this.sockets.get(accountId);
@@ -33,47 +20,32 @@ class SocketService {
return connection.sendMessage(message);
}
public async checkConnections() {
this.debug('Checking socket connections');
const accounts = await databaseService.appDatabase
.selectFrom('accounts')
.selectAll()
.where('status', '=', 'active')
.execute();
// Update accounts map
for (const account of accounts) {
if (!serverService.isAvailable(account.server)) {
this.debug(
`Server ${account.server} is not available, skipping socket connection`
);
continue;
}
const socket = this.sockets.get(account.id);
if (socket) {
socket.checkConnection();
continue;
}
const synapseUrl = serverService.buildSynapseUrl(account.server);
const connection = new SocketConnection(synapseUrl, account);
connection.init();
this.sockets.set(account.id, connection);
public checkConnection(account: SelectAccount) {
const socket = this.sockets.get(account.id);
if (socket) {
socket.checkConnection();
return;
}
// Remove logged out or missing accounts
for (const [accountId, connection] of this.sockets.entries()) {
const account = accounts.find((acc) => acc.id === accountId);
if (!account) {
connection.close();
this.sockets.delete(accountId);
}
const synapseUrl = serverService.buildSynapseUrl(account.server);
const connection = new SocketConnection(synapseUrl, account);
connection.init();
this.sockets.set(account.id, connection);
}
public removeConnection(accountId: string) {
const connection = this.sockets.get(accountId);
if (connection) {
connection.close();
this.sockets.delete(accountId);
}
}
public isConnected(accountId: string): boolean {
const connection = this.sockets.get(accountId);
return connection?.isConnected() ?? false;
}
}
export const socketService = new SocketService();

View File

@@ -0,0 +1,267 @@
import {
CollaborationsBatchMessage,
DeletedCollaborationsBatchMessage,
InitSyncConsumerMessage,
InteractionsBatchMessage,
SyncConsumerType,
TransactionsBatchMessage,
} from '@colanode/core';
import { interactionService } from './interaction-service';
import { nodeService } from '@/main/services/node-service';
import { collaborationService } from '@/main/services/collaboration-service';
import { socketService } from '@/main/services/socket-service';
import { fetchCursor, updateCursor } from '@/main/utils';
import { createDebugger } from '@/main/debugger';
type SyncConsumerStatus = 'idle' | 'fetching' | 'syncing';
export class SyncConsumer {
private readonly debug = createDebugger('service:sync-consumer');
public readonly accountId: string;
public readonly userId: string;
public readonly type: SyncConsumerType;
private cursor: bigint | null = null;
private status: SyncConsumerStatus = 'idle';
constructor(accountId: string, userId: string, type: SyncConsumerType) {
this.accountId = accountId;
this.userId = userId;
this.type = type;
}
public async init() {
if (this.status === 'fetching') {
return;
}
const cursor = await fetchCursor(this.userId, this.type);
this.initConsumer(cursor);
}
public async ping() {
if (this.status === 'idle' || this.cursor === null) {
await this.init();
return;
}
if (this.status === 'syncing') {
return;
}
const cursor = await fetchCursor(this.userId, this.type);
if (cursor !== this.cursor) {
this.cursor = cursor;
this.initConsumer(cursor);
}
}
public async syncTransactions(message: TransactionsBatchMessage) {
if (this.type !== 'transactions') {
this.debug(
`Syncing of server transactions not supported for consumer type ${this.type}, skipping`
);
return;
}
if (this.status === 'syncing') {
this.debug(
`Syncing of server transactions already in progress for user ${message.userId}, skipping`
);
return;
}
this.debug(`Syncing server transactions for user ${message.userId}`);
this.status = 'syncing';
let cursor: bigint | null = null;
try {
for (const transaction of message.transactions) {
await nodeService.applyServerTransaction(message.userId, transaction);
cursor = BigInt(transaction.version);
}
} catch (error) {
this.debug(
error,
`Error syncing server transactions for user ${message.userId}`
);
} finally {
this.debug(
`Syncing of server transactions completed for user ${message.userId}`
);
if (cursor) {
this.cursor = cursor;
this.initConsumer(cursor);
await updateCursor(message.userId, 'transactions', cursor);
this.status = 'idle';
} else {
this.status = 'idle';
this.ping();
}
}
}
public async syncCollaborations(message: CollaborationsBatchMessage) {
if (this.type !== 'collaborations') {
this.debug(
`Syncing of server collaborations not supported for consumer type ${this.type}, skipping`
);
return;
}
if (this.status === 'syncing') {
this.debug(
`Syncing of server collaborations already in progress for user ${message.userId}, skipping`
);
return;
}
this.debug(`Syncing server collaborations for user ${message.userId}`);
this.status = 'syncing';
let cursor: bigint | null = null;
try {
for (const collaboration of message.collaborations) {
await collaborationService.applyServerCollaboration(
message.userId,
collaboration
);
cursor = BigInt(collaboration.version);
}
} catch (error) {
this.debug(
error,
`Error syncing server collaborations for user ${message.userId}`
);
} finally {
if (cursor) {
await updateCursor(message.userId, 'collaborations', cursor);
this.cursor = cursor;
this.status = 'idle';
this.initConsumer(cursor);
} else {
this.status = 'idle';
this.ping();
}
}
}
public async syncDeletedCollaborations(
message: DeletedCollaborationsBatchMessage
) {
if (this.type !== 'deleted_collaborations') {
this.debug(
`Syncing of server deleted collaborations not supported for consumer type ${this.type}, skipping`
);
return;
}
if (this.status === 'syncing') {
this.debug(
`Syncing of server deleted collaborations already in progress for user ${message.userId}, skipping`
);
return;
}
this.debug(
`Syncing server deleted collaborations for user ${message.userId}`
);
this.status = 'syncing';
let cursor: bigint | null = null;
try {
for (const deletedCollaboration of message.deletedCollaborations) {
await collaborationService.applyServerDeletedCollaboration(
message.userId,
deletedCollaboration
);
cursor = BigInt(deletedCollaboration.version);
}
} catch (error) {
this.debug(
error,
`Error syncing server deleted collaborations for user ${message.userId}`
);
} finally {
this.debug(
`Syncing of server deleted collaborations completed for user ${message.userId}`
);
if (cursor) {
await updateCursor(message.userId, 'deleted_collaborations', cursor);
this.cursor = cursor;
this.status = 'idle';
this.initConsumer(cursor);
} else {
this.status = 'idle';
this.ping();
}
}
}
public async syncInteractions(message: InteractionsBatchMessage) {
if (this.type !== 'interactions') {
this.debug(
`Syncing of server interactions not supported for consumer type ${this.type}, skipping`
);
return;
}
if (this.status === 'syncing') {
this.debug(
`Syncing of server interactions already in progress for user ${message.userId}, skipping`
);
return;
}
this.debug(`Syncing server interactions for user ${message.userId}`);
this.status = 'syncing';
let cursor: bigint | null = null;
try {
for (const interaction of message.interactions) {
await interactionService.applyServerInteraction(
message.userId,
interaction
);
cursor = BigInt(interaction.version);
}
} catch (error) {
this.debug(
error,
`Error syncing server interactions for user ${message.userId}`
);
} finally {
this.debug(
`Syncing of server interactions completed for user ${message.userId}`
);
if (cursor) {
await updateCursor(message.userId, 'interactions', cursor);
this.cursor = cursor;
this.status = 'idle';
this.initConsumer(cursor);
} else {
this.status = 'idle';
this.ping();
}
}
}
private initConsumer(cursor: bigint) {
const message: InitSyncConsumerMessage = {
type: 'init_sync_consumer',
userId: this.userId,
consumerType: this.type,
cursor: cursor.toString(),
};
const sent = socketService.sendMessage(this.accountId, message);
if (sent) {
this.status = 'fetching';
}
}
}

View File

@@ -1,916 +1,94 @@
import {
CollaborationsBatchMessage,
DeletedCollaborationsBatchMessage,
InitSyncConsumerMessage,
GetTransactionsOutput,
InteractionsBatchMessage,
LocalTransaction,
TransactionsBatchMessage,
SyncInteractionsMessage,
SyncTransactionsOutput,
SyncConsumerType,
TransactionsBatchMessage,
} from '@colanode/core';
import { sql } from 'kysely';
import { SyncConsumer } from '@/main/services/sync-consumer';
import { createDebugger } from '@/main/debugger';
import { databaseService } from '@/main/data/database-service';
import {
SelectInteractionEvent,
SelectTransaction,
} from '@/main/data/workspace/schema';
import { collaborationService } from '@/main/services/collaboration-service';
import { interactionService } from '@/main/services/interaction-service';
import { nodeService } from '@/main/services/node-service';
import { serverService } from '@/main/services/server-service';
import { socketService } from '@/main/services/socket-service';
import { fetchWorkspaceCredentials, mapTransaction } from '@/main/utils';
import { eventBus } from '@/shared/lib/event-bus';
import { httpClient } from '@/shared/lib/http-client';
import { CollaborationSyncedEvent } from '@/shared/types/events';
type WorkspaceSyncState = {
isSyncing: boolean;
scheduledSync: boolean;
};
class SyncService {
private readonly debug = createDebugger('service:sync');
private readonly localPendingTransactionStates: Map<
string,
WorkspaceSyncState
> = new Map();
private readonly consumers: Map<string, SyncConsumer> = new Map();
private readonly localIncompleteTransactionStates: Map<
string,
WorkspaceSyncState
> = new Map();
public async initUserConsumers(accountId: string, userId: string) {
await this.createConsumer(accountId, userId, 'transactions');
await this.createConsumer(accountId, userId, 'deleted_collaborations');
await this.createConsumer(accountId, userId, 'collaborations');
await this.createConsumer(accountId, userId, 'interactions');
}
private readonly localPendingInteractionStates: Map<
string,
WorkspaceSyncState
> = new Map();
private readonly syncingTransactions: Set<string> = new Set();
private readonly syncingCollaborations: Set<string> = new Set();
private readonly syncingDeletedCollaborations: Set<string> = new Set();
private readonly syncingInteractions: Set<string> = new Set();
constructor() {
eventBus.subscribe((event) => {
if (event.type === 'transaction_created') {
this.syncLocalPendingTransactions(event.userId);
} else if (event.type === 'transaction_incomplete') {
this.syncLocalIncompleteTransactions(event.userId);
} else if (event.type === 'workspace_created') {
this.syncWorkspace(event.workspace.userId);
} else if (event.type === 'socket_connection_opened') {
this.syncAllWorkspaces();
} else if (event.type === 'collaboration_synced') {
this.checkForMissingNode(event);
} else if (event.type === 'interaction_event_created') {
this.syncLocalPendingInteractions(event.userId);
public async deleteUserConsumers(userId: string) {
this.consumers.forEach((consumer, key) => {
if (consumer.userId === userId) {
this.consumers.delete(key);
}
});
}
public async syncAllWorkspaces() {
this.debug('Syncing all workspaces');
const workspaces = await databaseService.appDatabase
.selectFrom('workspaces')
.select(['user_id'])
.execute();
for (const workspace of workspaces) {
await this.syncWorkspace(workspace.user_id);
}
}
private async syncWorkspace(userId: string) {
this.syncLocalPendingTransactions(userId);
this.syncLocalIncompleteTransactions(userId);
this.syncInvalidTransactions(userId);
this.syncLocalPendingInteractions(userId);
this.initSyncConsumer(userId, 'transactions');
this.initSyncConsumer(userId, 'collaborations');
this.initSyncConsumer(userId, 'deleted_collaborations');
this.initSyncConsumer(userId, 'interactions');
this.syncMissingNodes(userId);
}
public async syncLocalPendingTransactions(userId: string) {
this.debug(`Syncing local pending transactions for user ${userId}`);
if (!this.localPendingTransactionStates.has(userId)) {
this.localPendingTransactionStates.set(userId, {
isSyncing: false,
scheduledSync: false,
});
}
const syncState = this.localPendingTransactionStates.get(userId)!;
if (syncState.isSyncing) {
this.debug(
`Syncing of local pending transactions already in progress for user ${userId}, scheduling sync`
);
syncState.scheduledSync = true;
private async createConsumer(
accountId: string,
userId: string,
type: SyncConsumerType
) {
const key = this.getConsumerKey(userId, type);
const consumer = this.consumers.get(key);
if (consumer) {
consumer.ping();
return;
}
syncState.isSyncing = true;
try {
await this.sendLocalTransactions(userId);
} catch (error) {
this.debug(error, `Error syncing local transactions for user ${userId}`);
} finally {
syncState.isSyncing = false;
this.debug(
`Syncing of local pending transactions completed for user ${userId}`
);
this.debug(
`Creating new sync consumer for account ${accountId} and user ${userId} of type ${type}`
);
const newConsumer = new SyncConsumer(accountId, userId, type);
this.consumers.set(key, newConsumer);
if (syncState.scheduledSync) {
syncState.scheduledSync = false;
this.syncLocalPendingTransactions(userId);
}
}
}
public async syncLocalPendingInteractions(userId: string) {
this.debug(`Syncing local pending interactions for user ${userId}`);
if (!this.localPendingInteractionStates.has(userId)) {
this.localPendingInteractionStates.set(userId, {
isSyncing: false,
scheduledSync: false,
});
}
const syncState = this.localPendingInteractionStates.get(userId)!;
if (syncState.isSyncing) {
this.debug(
`Syncing of local pending interactions already in progress for user ${userId}, scheduling sync`
);
syncState.scheduledSync = true;
return;
}
syncState.isSyncing = true;
try {
await this.sendLocalInteractions(userId);
} catch (error) {
this.debug(error, `Error syncing local interactions for user ${userId}`);
} finally {
syncState.isSyncing = false;
this.debug(
`Syncing of local pending interactions completed for user ${userId}`
);
if (syncState.scheduledSync) {
syncState.scheduledSync = false;
this.syncLocalPendingInteractions(userId);
}
}
}
public async syncLocalIncompleteTransactions(userId: string) {
this.debug(`Syncing local incomplete transactions for user ${userId}`);
if (!this.localIncompleteTransactionStates.has(userId)) {
this.localIncompleteTransactionStates.set(userId, {
isSyncing: false,
scheduledSync: false,
});
}
const syncState = this.localIncompleteTransactionStates.get(userId)!;
if (syncState.isSyncing) {
this.debug(
`Syncing of local incomplete transactions already in progress for user ${userId}, scheduling sync`
);
syncState.scheduledSync = true;
return;
}
syncState.isSyncing = true;
try {
await this.syncIncompleteTransactions(userId);
} catch (error) {
this.debug(
error,
`Error syncing incomplete transactions for user ${userId}`
);
} finally {
syncState.isSyncing = false;
this.debug(
`Syncing of local incomplete transactions completed for user ${userId}`
);
if (syncState.scheduledSync) {
syncState.scheduledSync = false;
this.syncLocalIncompleteTransactions(userId);
}
}
await newConsumer.init();
}
public async syncServerTransactions(message: TransactionsBatchMessage) {
this.debug(`Syncing server transactions for user ${message.userId}`);
if (this.syncingTransactions.has(message.userId)) {
this.debug(
`Syncing of server transactions already in progress for user ${message.userId}, skipping`
);
return;
}
this.syncingTransactions.add(message.userId);
let cursor: bigint | null = null;
try {
for (const transaction of message.transactions) {
await nodeService.applyServerTransaction(message.userId, transaction);
cursor = BigInt(transaction.version);
}
if (cursor) {
this.updateCursor(message.userId, 'transactions', cursor);
}
} catch (error) {
this.debug(
error,
`Error syncing server transactions for user ${message.userId}`
);
} finally {
this.debug(
`Syncing of server transactions completed for user ${message.userId}`
);
this.syncingTransactions.delete(message.userId);
this.initSyncConsumer(message.userId, 'transactions');
}
}
public async syncServerCollaborations(message: CollaborationsBatchMessage) {
this.debug(`Syncing server collaborations for user ${message.userId}`);
if (this.syncingCollaborations.has(message.userId)) {
this.debug(
`Syncing of server collaborations already in progress for user ${message.userId}, skipping`
);
return;
}
this.syncingCollaborations.add(message.userId);
let cursor: bigint | null = null;
try {
for (const collaboration of message.collaborations) {
await collaborationService.applyServerCollaboration(
message.userId,
collaboration
);
cursor = BigInt(collaboration.version);
}
if (cursor) {
this.updateCursor(message.userId, 'collaborations', cursor);
}
} catch (error) {
this.debug(
error,
`Error syncing server collaborations for user ${message.userId}`
);
} finally {
this.syncingCollaborations.delete(message.userId);
this.initSyncConsumer(message.userId, 'collaborations');
const consumer = this.consumers.get(
this.getConsumerKey(message.userId, 'transactions')
);
if (consumer) {
consumer.syncTransactions(message);
}
}
public async syncServerDeletedCollaborations(
message: DeletedCollaborationsBatchMessage
) {
this.debug(
`Syncing server deleted collaborations for user ${message.userId}`
const consumer = this.consumers.get(
this.getConsumerKey(message.userId, 'deleted_collaborations')
);
if (this.syncingDeletedCollaborations.has(message.userId)) {
this.debug(
`Syncing of server deleted collaborations already in progress for user ${message.userId}, skipping`
);
return;
if (consumer) {
consumer.syncDeletedCollaborations(message);
}
}
this.syncingDeletedCollaborations.add(message.userId);
let cursor: bigint | null = null;
try {
for (const deletedCollaboration of message.deletedCollaborations) {
await collaborationService.applyServerDeletedCollaboration(
message.userId,
deletedCollaboration
);
cursor = BigInt(deletedCollaboration.version);
}
if (cursor) {
this.updateCursor(message.userId, 'deleted_collaborations', cursor);
}
} catch (error) {
this.debug(
error,
`Error syncing server deleted collaborations for user ${message.userId}`
);
} finally {
this.debug(
`Syncing of server deleted collaborations completed for user ${message.userId}`
);
this.syncingDeletedCollaborations.delete(message.userId);
this.initSyncConsumer(message.userId, 'deleted_collaborations');
public async syncServerCollaborations(message: CollaborationsBatchMessage) {
const consumer = this.consumers.get(
this.getConsumerKey(message.userId, 'collaborations')
);
if (consumer) {
consumer.syncCollaborations(message);
}
}
public async syncServerInteractions(message: InteractionsBatchMessage) {
this.debug(`Syncing server interactions for user ${message.userId}`);
if (this.syncingInteractions.has(message.userId)) {
this.debug(
`Syncing of server interactions already in progress for user ${message.userId}, skipping`
);
return;
}
this.syncingInteractions.add(message.userId);
let cursor: bigint | null = null;
try {
for (const interaction of message.interactions) {
await interactionService.applyServerInteraction(
message.userId,
interaction
);
cursor = BigInt(interaction.version);
}
if (cursor) {
this.updateCursor(message.userId, 'interactions', cursor);
}
} catch (error) {
this.debug(
error,
`Error syncing server interactions for user ${message.userId}`
);
} finally {
this.debug(
`Syncing of server interactions completed for user ${message.userId}`
);
this.syncingInteractions.delete(message.userId);
this.initSyncConsumer(message.userId, 'interactions');
}
}
private async syncIncompleteTransactions(userId: string) {
this.debug(`Syncing incomplete transactions for user ${userId}`);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const incompleteTransactions = await workspaceDatabase
.selectFrom('transactions')
.selectAll()
.where('status', '=', 'incomplete')
.execute();
if (incompleteTransactions.length === 0) {
this.debug(
`No incomplete transactions found for user ${userId}, skipping`
);
return;
}
const credentials = await fetchWorkspaceCredentials(userId);
if (!credentials) {
this.debug(`No workspace credentials found for user ${userId}, skipping`);
return;
}
if (!serverService.isAvailable(credentials.serverDomain)) {
this.debug(
`Server ${credentials.serverDomain} is not available, skipping`
);
return;
}
const groupedByNodeId = incompleteTransactions.reduce<
Record<string, SelectTransaction[]>
>((acc, transaction) => {
acc[transaction.node_id] = [
...(acc[transaction.node_id] ?? []),
transaction,
];
return acc;
}, {});
for (const [nodeId, transactions] of Object.entries(groupedByNodeId)) {
try {
this.debug(
`Syncing incomplete transactions for node ${nodeId} for user ${userId}`
);
const { data } = await httpClient.get<GetTransactionsOutput>(
`/v1/workspaces/${credentials.workspaceId}/transactions/${nodeId}`,
{
domain: credentials.serverDomain,
token: credentials.token,
}
);
if (data.transactions.length === 0) {
this.debug(
`No transactions found for node ${nodeId} for user ${userId}, deleting`
);
await workspaceDatabase
.deleteFrom('transactions')
.where(
'id',
'in',
transactions.map((t) => t.id)
)
.execute();
continue;
}
const cursor = await this.fetchCursor(userId, 'transactions');
const synced = await nodeService.replaceTransactions(
userId,
nodeId,
data.transactions,
cursor
);
if (!synced) {
this.debug(
`Failed to sync transactions for node ${nodeId} for user ${userId}, incrementing retry count`
);
await workspaceDatabase
.updateTable('transactions')
.set({ retry_count: sql`retry_count + 1` })
.where(
'id',
'in',
transactions.map((t) => t.id)
)
.execute();
} else {
this.debug(
`Successfully synced transactions for node ${nodeId} for user ${userId}, resetting retry count`
);
await workspaceDatabase
.updateTable('transactions')
.set({ retry_count: sql`retry_count + 1` })
.where(
'id',
'in',
transactions.map((t) => t.id)
)
.where('status', '=', 'incomplete')
.execute();
}
} catch (error) {
this.debug(
error,
`Error syncing incomplete transactions for node ${nodeId} for user ${userId}`
);
}
}
}
private async syncInvalidTransactions(userId: string) {
this.debug(`Syncing invalid transactions for user ${userId}`);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const invalidTransactions = await workspaceDatabase
.selectFrom('transactions')
.selectAll()
.where('status', '=', 'pending')
.where('retry_count', '>=', 10)
.execute();
if (invalidTransactions.length === 0) {
this.debug(`No invalid transactions found for user ${userId}, skipping`);
return;
}
for (const transactionRow of invalidTransactions) {
const transaction = mapTransaction(transactionRow);
if (transaction.operation === 'create') {
await nodeService.revertCreateTransaction(userId, transaction);
} else if (transaction.operation === 'update') {
await nodeService.revertUpdateTransaction(userId, transaction);
} else if (transaction.operation === 'delete') {
await nodeService.revertDeleteTransaction(userId, transaction);
}
}
}
private async syncMissingNodes(userId: string) {
this.debug(`Syncing missing nodes for user ${userId}`);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const missingNodes = await workspaceDatabase
.selectFrom('collaborations')
.leftJoin('nodes', 'collaborations.node_id', 'nodes.id')
.select('collaborations.node_id')
.where('nodes.id', 'is', null)
.execute();
if (missingNodes.length === 0) {
this.debug(`No missing nodes found for user ${userId}, skipping`);
return;
}
const credentials = await fetchWorkspaceCredentials(userId);
if (!credentials) {
this.debug(`No workspace credentials found for user ${userId}, skipping`);
return;
}
if (!serverService.isAvailable(credentials.serverDomain)) {
this.debug(
`Server ${credentials.serverDomain} is not available, skipping`
);
return;
}
for (const node of missingNodes) {
try {
this.debug(`Syncing missing node ${node.node_id} for user ${userId}`);
const { data } = await httpClient.get<GetTransactionsOutput>(
`/v1/workspaces/${credentials.workspaceId}/transactions/${node.node_id}`,
{
domain: credentials.serverDomain,
token: credentials.token,
}
);
const cursor = await this.fetchCursor(userId, 'transactions');
await nodeService.replaceTransactions(
userId,
node.node_id,
data.transactions,
cursor
);
} catch (error) {
this.debug(
error,
`Error syncing missing node ${node.node_id} for user ${userId}`
);
}
}
}
private async checkForMissingNode(event: CollaborationSyncedEvent) {
this.debug(
`Checking for missing node ${event.nodeId} for user ${event.userId}`
const consumer = this.consumers.get(
this.getConsumerKey(message.userId, 'interactions')
);
// check only if the collaboration has been created in the last minute
if (new Date().getTime() - event.createdAt.getTime() > 60000) {
this.debug(
`Collaboration ${event.nodeId} for user ${event.userId} was created more than a minute ago, skipping`
);
return;
}
const workspaceDatabase = await databaseService.getWorkspaceDatabase(
event.userId
);
const node = await workspaceDatabase
.selectFrom('nodes')
.selectAll()
.where('id', '=', event.nodeId)
.executeTakeFirst();
if (node) {
this.debug(
`Node ${event.nodeId} for user ${event.userId} found, skipping`
);
return;
}
const credentials = await fetchWorkspaceCredentials(event.userId);
if (!credentials) {
this.debug(
`No workspace credentials found for user ${event.userId}, skipping`
);
return;
}
if (!serverService.isAvailable(credentials.serverDomain)) {
this.debug(
`Server ${credentials.serverDomain} is not available, skipping`
);
return;
}
try {
const { data } = await httpClient.get<GetTransactionsOutput>(
`/v1/workspaces/${credentials.workspaceId}/transactions/${event.nodeId}`,
{
domain: credentials.serverDomain,
token: credentials.token,
}
);
const cursor = await this.fetchCursor(event.userId, 'transactions');
await nodeService.replaceTransactions(
event.userId,
event.nodeId,
data.transactions,
cursor
);
} catch (error) {
this.debug(
error,
`Error checking for missing node ${event.nodeId} for user ${event.userId}`
);
if (consumer) {
consumer.syncInteractions(message);
}
}
private async sendLocalTransactions(userId: string) {
this.debug(`Sending local pending transactions for user ${userId}`);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const unsyncedTransactions = await workspaceDatabase
.selectFrom('transactions')
.selectAll()
.where('status', '=', 'pending')
.orderBy('id', 'asc')
.limit(20)
.execute();
if (unsyncedTransactions.length === 0) {
return;
}
this.debug(
`Sending ${unsyncedTransactions.length} local pending transactions for user ${userId}`
);
const credentials = await fetchWorkspaceCredentials(userId);
if (!credentials) {
this.debug(
`No workspace credentials found for user ${userId}, skipping sending local pending transactions`
);
return;
}
if (!serverService.isAvailable(credentials.serverDomain)) {
this.debug(
`Server ${credentials.serverDomain} is not available, skipping sending local pending transactions`
);
return;
}
const transactions: LocalTransaction[] =
unsyncedTransactions.map(mapTransaction);
const { data } = await httpClient.post<SyncTransactionsOutput>(
`/v1/workspaces/${credentials.workspaceId}/transactions`,
{
transactions,
},
{
domain: credentials.serverDomain,
token: credentials.token,
}
);
const syncedTransactionIds: string[] = [];
const unsyncedTransactionIds: string[] = [];
for (const result of data.results) {
if (result.status === 'success') {
syncedTransactionIds.push(result.id);
} else {
unsyncedTransactionIds.push(result.id);
}
}
if (syncedTransactionIds.length > 0) {
this.debug(
`Marking ${syncedTransactionIds.length} local pending transactions as sent for user ${userId}`
);
await workspaceDatabase
.updateTable('transactions')
.set({ status: 'sent' })
.where('id', 'in', syncedTransactionIds)
.where('status', '=', 'pending')
.execute();
}
if (unsyncedTransactionIds.length > 0) {
this.debug(
`Marking ${unsyncedTransactionIds.length} local pending transactions as failed for user ${userId}`
);
await workspaceDatabase
.updateTable('transactions')
.set((eb) => ({ retry_count: eb('retry_count', '+', 1) }))
.where('id', 'in', unsyncedTransactionIds)
.where('status', '=', 'pending')
.execute();
}
}
private async sendLocalInteractions(userId: string) {
this.debug(`Sending local pending interactions for user ${userId}`);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const credentials = await fetchWorkspaceCredentials(userId);
if (!credentials) {
this.debug(
`No workspace credentials found for user ${userId}, skipping sending local pending interactions`
);
return;
}
if (!serverService.isAvailable(credentials.serverDomain)) {
this.debug(
`Server ${credentials.serverDomain} is not available, skipping sending local pending interactions`
);
return;
}
const cutoff = new Date(Date.now() - 1000 * 60 * 5).toISOString();
let cursor = '0';
let hasMore = true;
while (hasMore) {
const interactionEvents = await workspaceDatabase
.selectFrom('interaction_events')
.selectAll()
.where((eb) =>
eb.or([eb('sent_at', 'is', null), eb('sent_at', '<', cutoff)])
)
.where('event_id', '>', cursor)
.limit(50)
.execute();
if (interactionEvents.length === 0) {
this.debug(
`No local pending interactions found for user ${userId}, stopping sync`
);
hasMore = false;
break;
}
this.debug(
`Sending ${interactionEvents.length} local pending interactions for user ${userId}`
);
const groupedByNodeId: Record<string, SelectInteractionEvent[]> = {};
for (const event of interactionEvents) {
groupedByNodeId[event.node_id] = [
...(groupedByNodeId[event.node_id] ?? []),
event,
];
cursor = event.event_id;
}
const sentEventIds: string[] = [];
for (const [nodeId, events] of Object.entries(groupedByNodeId)) {
if (events.length === 0) {
continue;
}
const firstEvent = events[0];
if (!firstEvent) {
continue;
}
const message: SyncInteractionsMessage = {
type: 'sync_interactions',
nodeId,
nodeType: firstEvent.node_type,
userId: credentials.userId,
events: events.map((e) => ({
attribute: e.attribute,
value: e.value,
createdAt: e.created_at,
})),
};
const sent = socketService.sendMessage(credentials.accountId, message);
if (sent) {
sentEventIds.push(...events.map((e) => e.event_id));
}
}
if (sentEventIds.length > 0) {
this.debug(
`Marking ${sentEventIds.length} local pending interactions as sent for user ${userId}`
);
await workspaceDatabase
.updateTable('interaction_events')
.set({
sent_at: new Date().toISOString(),
sent_count: sql`sent_count + 1`,
})
.where('event_id', 'in', sentEventIds)
.execute();
await workspaceDatabase
.deleteFrom('interaction_events')
.where('sent_count', '>', 20)
.execute();
}
}
}
private async initSyncConsumer(userId: string, type: SyncConsumerType) {
this.debug(
`Initializing sync consumer for user ${userId} with type ${type}`
);
const workspace = await databaseService.appDatabase
.selectFrom('workspaces')
.select(['user_id', 'workspace_id', 'account_id'])
.where('user_id', '=', userId)
.executeTakeFirst();
if (!workspace) {
this.debug(
`No workspace found for user ${userId}, skipping requiring interactions`
);
return;
}
const cursor = await this.fetchCursor(userId, type);
const message: InitSyncConsumerMessage = {
type: 'init_sync_consumer',
userId,
consumerType: type,
cursor: cursor.toString(),
};
socketService.sendMessage(workspace.account_id, message);
}
private async updateCursor(
userId: string,
type: SyncConsumerType,
cursor: bigint
) {
this.debug(`Updating cursor ${type} for user ${userId} to ${cursor}`);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
await workspaceDatabase
.insertInto('cursors')
.values({
type,
value: cursor,
created_at: new Date().toISOString(),
})
.onConflict((eb) =>
eb.column('type').doUpdateSet({
value: cursor,
updated_at: new Date().toISOString(),
})
)
.execute();
}
private async fetchCursor(
userId: string,
type: SyncConsumerType
): Promise<bigint> {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const cursor = await workspaceDatabase
.selectFrom('cursors')
.select('value')
.where('type', '=', type)
.executeTakeFirst();
return cursor?.value ?? 0n;
private getConsumerKey(userId: string, type: SyncConsumerType) {
return `${userId}-${type}`;
}
}

View File

@@ -1,4 +1,4 @@
import { LocalTransaction, Node } from '@colanode/core';
import { LocalTransaction, Node, SyncConsumerType } from '@colanode/core';
import { encodeState } from '@colanode/crdt';
import {
DeleteResult,
@@ -135,6 +135,44 @@ export const fetchWorkspaceCredentials = async (
};
};
export const updateCursor = async (
userId: string,
type: SyncConsumerType,
cursor: bigint
) => {
const workspaceDatabase = await databaseService.getWorkspaceDatabase(userId);
await workspaceDatabase
.insertInto('cursors')
.values({
type,
value: cursor,
created_at: new Date().toISOString(),
})
.onConflict((eb) =>
eb.column('type').doUpdateSet({
value: cursor,
updated_at: new Date().toISOString(),
})
)
.execute();
};
export const fetchCursor = async (
userId: string,
type: SyncConsumerType
): Promise<bigint> => {
const workspaceDatabase = await databaseService.getWorkspaceDatabase(userId);
const cursor = await workspaceDatabase
.selectFrom('cursors')
.select('value')
.where('type', '=', type)
.executeTakeFirst();
return cursor?.value ?? 0n;
};
export const mapNode = (row: SelectNode): Node => {
return {
id: row.id,

View File

@@ -36,7 +36,7 @@ clientRouter.post('/v1/accounts/login/google', loginWithGoogleHandler);
clientRouter.post('/v1/accounts/register/email', registerWithEmailHandler);
clientRouter.post('/v1/accounts/logout', authMiddleware, logoutHandler);
clientRouter.delete('/v1/accounts/logout', authMiddleware, logoutHandler);
clientRouter.put(
'/v1/accounts/:accountId',

View File

@@ -55,6 +55,8 @@ export class SocketConnection {
private readonly socket: WebSocket;
private readonly users: Map<string, SocketUser> = new Map();
private readonly pendingUsers: Map<string, Promise<SocketUser | null>> =
new Map();
constructor(account: RequestAccount, socket: WebSocket) {
this.account = account;
@@ -118,7 +120,7 @@ export class SocketConnection {
private async handleInitSyncConsumer(message: InitSyncConsumerMessage) {
this.logger.info(
`Init sync consumer from ${this.account.id} for ${message.consumerType}`
`Init sync consumer from ${this.account.id} and user ${message.userId} for ${message.consumerType}`
);
const user = await this.getOrCreateUser(message.userId);
@@ -166,7 +168,7 @@ export class SocketConnection {
consumer.fetching = true;
this.logger.trace(
`Sending pending node transactions for ${this.account.id} with ${consumer.type}`
`Checking for pending node transactions for ${this.account.id} and user ${user.userId} with cursor ${consumer.cursor}`
);
const unsyncedTransactions = await database
@@ -192,6 +194,9 @@ export class SocketConnection {
.execute();
if (unsyncedTransactions.length === 0) {
this.logger.trace(
`No pending node transactions found for ${this.account.id} and user ${user.userId} with cursor ${consumer.cursor}`
);
consumer.fetching = false;
return;
}
@@ -217,8 +222,7 @@ export class SocketConnection {
consumer.fetching = true;
this.logger.trace(
consumer,
`Sending pending deleted collaborations for ${this.account.id} with ${consumer.type}`
`Checking for pending deleted collaborations for ${this.account.id} and user ${user.userId} with cursor ${consumer.cursor}`
);
const unsyncedDeletedCollaborations = await database
@@ -231,6 +235,9 @@ export class SocketConnection {
.execute();
if (unsyncedDeletedCollaborations.length === 0) {
this.logger.trace(
`No pending deleted collaborations found for ${this.account.id} and user ${user.userId} with cursor ${consumer.cursor}`
);
consumer.fetching = false;
return;
}
@@ -258,8 +265,7 @@ export class SocketConnection {
consumer.fetching = true;
this.logger.trace(
consumer,
`Sending pending collaborations for ${this.account.id} with ${consumer.type}`
`Checking for pending collaborations for ${this.account.id} and user ${user.userId} with cursor ${consumer.cursor}`
);
const unsyncedCollaborations = await database
@@ -272,6 +278,9 @@ export class SocketConnection {
.execute();
if (unsyncedCollaborations.length === 0) {
this.logger.trace(
`No pending collaborations found for ${this.account.id} and user ${user.userId} with cursor ${consumer.cursor}`
);
consumer.fetching = false;
return;
}
@@ -297,8 +306,7 @@ export class SocketConnection {
consumer.fetching = true;
this.logger.trace(
consumer,
`Sending pending interactions for ${this.account.id} with ${consumer.type}`
`Checking for pending interactions for ${this.account.id} and user ${user.userId} with cursor ${consumer.cursor}`
);
const unsyncedInteractions = await database
@@ -324,6 +332,9 @@ export class SocketConnection {
.execute();
if (unsyncedInteractions.length === 0) {
this.logger.trace(
`No pending interactions found for ${this.account.id} and user ${user.userId} with cursor ${consumer.cursor}`
);
consumer.fetching = false;
return;
}
@@ -439,29 +450,48 @@ export class SocketConnection {
}
private async getOrCreateUser(userId: string): Promise<SocketUser | null> {
const socketUser = this.users.get(userId);
if (socketUser) {
return socketUser;
const existingUser = this.users.get(userId);
if (existingUser) {
return existingUser;
}
const pendingUser = this.pendingUsers.get(userId);
if (pendingUser) {
return pendingUser;
}
const userPromise = this.fetchAndCreateUser(userId);
this.pendingUsers.set(userId, userPromise);
try {
const user = await userPromise;
return user;
} finally {
this.pendingUsers.delete(userId);
}
}
private async fetchAndCreateUser(userId: string): Promise<SocketUser | null> {
const workspaceUser = await database
.selectFrom('workspace_users')
.where('id', '=', userId)
.selectAll()
.executeTakeFirst();
if (!workspaceUser) {
if (
!workspaceUser ||
workspaceUser.status !== WorkspaceStatus.Active ||
workspaceUser.account_id !== this.account.id
) {
return null;
}
if (workspaceUser.status !== WorkspaceStatus.Active) {
return null;
}
if (workspaceUser.account_id !== this.account.id) {
return null;
const addedSocketUser = this.users.get(userId);
if (addedSocketUser) {
return addedSocketUser;
}
// Create and store the new SocketUser
const newSocketUser: SocketUser = {
userId,
workspaceId: workspaceUser.workspace_id,