From 8e644724205f96b892c023aed4853c201e35145d Mon Sep 17 00:00:00 2001 From: Hakan Shehu Date: Fri, 17 Jan 2025 15:16:57 +0100 Subject: [PATCH] Switch from eventemitter to eventbus --- .../src/main/mutations/accounts/base.ts | 12 +++- .../src/main/mutations/files/file-create.ts | 4 +- .../src/main/mutations/files/file-delete.ts | 4 +- .../src/main/mutations/files/file-download.ts | 2 + .../main/mutations/messages/message-create.ts | 4 +- .../main/mutations/messages/message-delete.ts | 4 +- .../services/accounts/account-connection.ts | 45 +++++++++----- .../main/services/accounts/account-service.ts | 58 +++++++++++++++---- apps/desktop/src/main/services/app-service.ts | 7 +++ .../src/main/services/server-service.ts | 18 +++--- .../main/services/workspaces/file-service.ts | 4 +- .../services/workspaces/mutation-service.ts | 2 +- .../main/services/workspaces/synchronizer.ts | 39 +++++++------ apps/desktop/src/shared/types/events.ts | 23 +++++++- 14 files changed, 162 insertions(+), 64 deletions(-) diff --git a/apps/desktop/src/main/mutations/accounts/base.ts b/apps/desktop/src/main/mutations/accounts/base.ts index 28d884d6..e73e561f 100644 --- a/apps/desktop/src/main/mutations/accounts/base.ts +++ b/apps/desktop/src/main/mutations/accounts/base.ts @@ -4,6 +4,7 @@ import { appService } from '@/main/services/app-service'; import { ServerService } from '@/main/services/server-service'; import { MutationError, MutationErrorCode } from '@/shared/mutations'; import { mapAccount, mapWorkspace } from '@/main/utils'; +import { eventBus } from '@/shared/lib/event-bus'; export abstract class AccountMutationHandlerBase { protected async handleLoginSuccess( @@ -17,7 +18,7 @@ export abstract class AccountMutationHandlerBase { id: login.account.id, email: login.account.email, name: login.account.name, - server: server.server.domain, + server: server.domain, token: login.token, device_id: login.deviceId, avatar: login.account.avatar, @@ -34,6 +35,11 @@ export abstract class AccountMutationHandlerBase { const account = mapAccount(createdAccount); const accountService = await appService.initAccount(account); + eventBus.publish({ + type: 'account_created', + account: account, + }); + if (login.workspaces.length === 0) { return; } @@ -58,6 +64,10 @@ export abstract class AccountMutationHandlerBase { } await accountService.initWorkspace(mapWorkspace(createdWorkspace)); + eventBus.publish({ + type: 'workspace_created', + workspace: mapWorkspace(createdWorkspace), + }); } } } diff --git a/apps/desktop/src/main/mutations/files/file-create.ts b/apps/desktop/src/main/mutations/files/file-create.ts index dcf4aed3..9e50a485 100644 --- a/apps/desktop/src/main/mutations/files/file-create.ts +++ b/apps/desktop/src/main/mutations/files/file-create.ts @@ -193,6 +193,8 @@ export class FileCreateMutationHandler throw new Error('Failed to create file.'); } + workspace.mutations.triggerSync(); + eventBus.publish({ type: 'file_created', accountId: workspace.accountId, @@ -200,8 +202,6 @@ export class FileCreateMutationHandler file: mapFile(createdFile), }); - workspace.mutations.triggerSync(); - eventBus.publish({ type: 'file_state_created', accountId: workspace.accountId, diff --git a/apps/desktop/src/main/mutations/files/file-delete.ts b/apps/desktop/src/main/mutations/files/file-delete.ts index 8d5d3df9..af55d80b 100644 --- a/apps/desktop/src/main/mutations/files/file-delete.ts +++ b/apps/desktop/src/main/mutations/files/file-delete.ts @@ -102,6 +102,8 @@ export class FileDeleteMutationHandler .execute(); }); + workspace.mutations.triggerSync(); + eventBus.publish({ type: 'file_deleted', accountId: workspace.accountId, @@ -109,8 +111,6 @@ export class FileDeleteMutationHandler file: mapFile(file), }); - workspace.mutations.triggerSync(); - return { success: true, }; diff --git a/apps/desktop/src/main/mutations/files/file-download.ts b/apps/desktop/src/main/mutations/files/file-download.ts index 5511d71b..63787f20 100644 --- a/apps/desktop/src/main/mutations/files/file-download.ts +++ b/apps/desktop/src/main/mutations/files/file-download.ts @@ -82,6 +82,8 @@ export class FileDownloadMutationHandler throw new Error('Failed to create file state.'); } + workspace.files.triggerDownloads(); + eventBus.publish({ type: 'file_state_created', accountId: workspace.accountId, diff --git a/apps/desktop/src/main/mutations/messages/message-create.ts b/apps/desktop/src/main/mutations/messages/message-create.ts index 7ebc9c39..5945b645 100644 --- a/apps/desktop/src/main/mutations/messages/message-create.ts +++ b/apps/desktop/src/main/mutations/messages/message-create.ts @@ -292,6 +292,8 @@ export class MessageCreateMutationHandler }; }); + workspace.mutations.triggerSync(); + if (createdMessage) { eventBus.publish({ type: 'message_created', @@ -323,8 +325,6 @@ export class MessageCreateMutationHandler } } - workspace.mutations.triggerSync(); - return { id: messageId, }; diff --git a/apps/desktop/src/main/mutations/messages/message-delete.ts b/apps/desktop/src/main/mutations/messages/message-delete.ts index f9f16130..211b9df5 100644 --- a/apps/desktop/src/main/mutations/messages/message-delete.ts +++ b/apps/desktop/src/main/mutations/messages/message-delete.ts @@ -99,6 +99,8 @@ export class MessageDeleteMutationHandler .execute(); }); + workspace.mutations.triggerSync(); + eventBus.publish({ type: 'message_deleted', accountId: workspace.accountId, @@ -106,8 +108,6 @@ export class MessageDeleteMutationHandler message: mapMessage(message), }); - workspace.mutations.triggerSync(); - return { success: true, }; diff --git a/apps/desktop/src/main/services/accounts/account-connection.ts b/apps/desktop/src/main/services/accounts/account-connection.ts index 44c09f07..4e07bae9 100644 --- a/apps/desktop/src/main/services/accounts/account-connection.ts +++ b/apps/desktop/src/main/services/accounts/account-connection.ts @@ -2,13 +2,12 @@ import { Message, createDebugger } from '@colanode/core'; import { WebSocket } from 'ws'; import ms from 'ms'; -import { EventEmitter } from 'events'; - import { BackoffCalculator } from '@/shared/lib/backoff-calculator'; import { AccountService } from '@/main/services/accounts/account-service'; import { EventLoop } from '@/shared/lib/event-loop'; +import { eventBus } from '@/shared/lib/event-bus'; -export class AccountConnection extends EventEmitter { +export class AccountConnection { private readonly debug = createDebugger('service:account-connection'); private readonly account: AccountService; private readonly eventLoop: EventLoop; @@ -17,9 +16,9 @@ export class AccountConnection extends EventEmitter { private backoffCalculator: BackoffCalculator; private closingCount: number; - constructor(accountService: AccountService) { - super(); + private eventSubscriptionId: string; + constructor(accountService: AccountService) { this.account = accountService; this.socket = null; this.backoffCalculator = new BackoffCalculator(); @@ -29,15 +28,20 @@ export class AccountConnection extends EventEmitter { this.checkConnection(); }); - this.account.server.on('availability_change', () => { - this.eventLoop.trigger(); + this.eventSubscriptionId = eventBus.subscribe((event) => { + if ( + event.type === 'server_availability_changed' && + event.server.domain === this.account.server.domain + ) { + this.eventLoop.trigger(); + } }); } public init(): void { this.eventLoop.start(); - if (!this.account.server.isAvailable()) { + if (!this.account.server.isAvailable) { return; } @@ -66,26 +70,39 @@ export class AccountConnection extends EventEmitter { `Received message of type ${message.type} for account ${this.account.id}` ); - this.emit('message', message); + eventBus.publish({ + type: 'account_connection_message', + accountId: this.account.id, + message, + }); }; this.socket.onopen = () => { this.debug(`Socket connection for account ${this.account.id} opened`); this.backoffCalculator.reset(); - this.emit('open'); + eventBus.publish({ + type: 'account_connection_opened', + accountId: this.account.id, + }); }; this.socket.onerror = () => { this.debug(`Socket connection for account ${this.account.id} errored`); this.backoffCalculator.increaseError(); - this.emit('close'); + eventBus.publish({ + type: 'account_connection_closed', + accountId: this.account.id, + }); }; this.socket.onclose = () => { this.debug(`Socket connection for account ${this.account.id} closed`); this.backoffCalculator.increaseError(); - this.emit('close'); + eventBus.publish({ + type: 'account_connection_closed', + accountId: this.account.id, + }); }; } @@ -113,13 +130,13 @@ export class AccountConnection extends EventEmitter { this.socket = null; } - this.removeAllListeners(); this.eventLoop.stop(); + eventBus.unsubscribe(this.eventSubscriptionId); } private checkConnection(): void { this.debug(`Checking connection for account ${this.account.id}`); - if (!this.account.server.isAvailable()) { + if (!this.account.server.isAvailable) { return; } diff --git a/apps/desktop/src/main/services/accounts/account-service.ts b/apps/desktop/src/main/services/accounts/account-service.ts index fd198966..5d642366 100644 --- a/apps/desktop/src/main/services/accounts/account-service.ts +++ b/apps/desktop/src/main/services/accounts/account-service.ts @@ -44,6 +44,7 @@ export class AccountService { public readonly connection: AccountConnection; public readonly client: AccountClient; + private readonly eventSubscriptionId: string; constructor(account: Account, server: ServerService, app: AppService) { this.debug(`Initializing account service for account ${account.id}`); @@ -71,15 +72,24 @@ export class AccountService { this.client = new AccountClient(this); this.connection = new AccountConnection(this); - this.handleMessage = this.handleMessage.bind(this); - this.connection.on('message', this.handleMessage); - - this.triggerSync = this.triggerSync.bind(this); - this.server.on('availability_change', this.triggerSync); - this.eventLoop = new EventLoop(ms('1 minute'), ms('1 second'), () => { this.sync(); }); + + this.eventSubscriptionId = eventBus.subscribe((event) => { + if ( + event.type === 'server_availability_changed' && + event.server.domain === this.server.domain && + event.isAvailable + ) { + this.eventLoop.trigger(); + } else if ( + event.type === 'account_connection_message' && + event.accountId === this.account.id + ) { + this.handleMessage(event.message); + } + }); } public get id(): string { @@ -118,19 +128,47 @@ export class AccountService { public async logout(): Promise { try { + await this.app.database.transaction().execute(async (tx) => { + const deletedAccount = await tx + .deleteFrom('accounts') + .where('id', '=', this.account.id) + .executeTakeFirst(); + + if (!deletedAccount) { + throw new Error('Failed to delete account'); + } + + await tx + .insertInto('deleted_tokens') + .values({ + account_id: this.account.id, + token: this.account.token, + server: this.server.domain, + created_at: new Date().toISOString(), + }) + .execute(); + }); + const workspaces = this.workspaces.values(); for (const workspace of workspaces) { await workspace.delete(); + this.workspaces.delete(workspace.id); } this.database.destroy(); this.connection.close(); this.eventLoop.stop(); + eventBus.unsubscribe(this.eventSubscriptionId); const accountPath = getAccountDirectoryPath(this.account.id); if (fs.existsSync(accountPath)) { fs.rmSync(accountPath, { recursive: true, force: true }); } + + eventBus.publish({ + type: 'account_deleted', + account: this.account, + }); } catch (error) { this.debug(`Error logging out of account ${this.account.id}: ${error}`); } @@ -192,16 +230,12 @@ export class AccountService { } } - private triggerSync(): void { - this.eventLoop.trigger(); - } - private async sync(): Promise { this.debug(`Syncing account ${this.account.id}`); - if (!this.server.isAvailable()) { + if (!this.server.isAvailable) { this.debug( - `Server ${this.server.server.domain} is not available for syncing account ${this.account.email}` + `Server ${this.server.domain} is not available for syncing account ${this.account.email}` ); return; } diff --git a/apps/desktop/src/main/services/app-service.ts b/apps/desktop/src/main/services/app-service.ts index 1db63259..a748414e 100644 --- a/apps/desktop/src/main/services/app-service.ts +++ b/apps/desktop/src/main/services/app-service.ts @@ -23,6 +23,7 @@ export class AppService { private readonly servers: Map = new Map(); private readonly accounts: Map = new Map(); private readonly cleanupEventLoop: EventLoop; + private readonly eventSubscriptionId: string; public readonly database: Kysely; public readonly metadata: MetadataService; @@ -51,6 +52,12 @@ export class AppService { this.cleanup(); } ); + + this.eventSubscriptionId = eventBus.subscribe((event) => { + if (event.type === 'account_deleted') { + this.accounts.delete(event.account.id); + } + }); } public async migrate(): Promise { diff --git a/apps/desktop/src/main/services/server-service.ts b/apps/desktop/src/main/services/server-service.ts index 2c6ff304..cc3c6d69 100644 --- a/apps/desktop/src/main/services/server-service.ts +++ b/apps/desktop/src/main/services/server-service.ts @@ -2,8 +2,6 @@ import { createDebugger, ServerConfig } from '@colanode/core'; import axios from 'axios'; import ms from 'ms'; -import { EventEmitter } from 'events'; - import { mapServer } from '@/main/utils'; import { eventBus } from '@/shared/lib/event-bus'; import { Server } from '@/shared/types/servers'; @@ -17,7 +15,7 @@ type ServerState = { count: number; }; -export class ServerService extends EventEmitter { +export class ServerService { private readonly debug = createDebugger('desktop:service:server'); private readonly appService: AppService; @@ -29,8 +27,6 @@ export class ServerService extends EventEmitter { public readonly apiBaseUrl: string; constructor(appService: AppService, server: Server) { - super(); - this.appService = appService; this.server = server; this.synapseUrl = ServerService.buildSynapseUrl(server.domain); @@ -42,10 +38,14 @@ export class ServerService extends EventEmitter { this.eventLoop.start(); } - public isAvailable() { + public get isAvailable() { return this.state?.isAvailable ?? false; } + public get domain() { + return this.server.domain; + } + private async sync() { const config = await ServerService.fetchServerConfig(this.server.domain); const existingState = this.state; @@ -62,7 +62,11 @@ export class ServerService extends EventEmitter { const wasAvailable = existingState?.isAvailable ?? false; const isAvailable = newState.isAvailable; if (wasAvailable !== isAvailable) { - this.emit('availability_change', isAvailable); + eventBus.publish({ + type: 'server_availability_changed', + server: this.server, + isAvailable, + }); } this.debug( diff --git a/apps/desktop/src/main/services/workspaces/file-service.ts b/apps/desktop/src/main/services/workspaces/file-service.ts index d74d9dd9..d7cabb77 100644 --- a/apps/desktop/src/main/services/workspaces/file-service.ts +++ b/apps/desktop/src/main/services/workspaces/file-service.ts @@ -135,7 +135,7 @@ export class FileService { } private async uploadFiles(): Promise { - if (!this.workspace.account.server.isAvailable()) { + if (!this.workspace.account.server.isAvailable) { return; } @@ -341,7 +341,7 @@ export class FileService { } public async downloadFiles(): Promise { - if (!this.workspace.account.server.isAvailable()) { + if (!this.workspace.account.server.isAvailable) { return; } diff --git a/apps/desktop/src/main/services/workspaces/mutation-service.ts b/apps/desktop/src/main/services/workspaces/mutation-service.ts index c6eedf2b..7c14a33b 100644 --- a/apps/desktop/src/main/services/workspaces/mutation-service.ts +++ b/apps/desktop/src/main/services/workspaces/mutation-service.ts @@ -46,7 +46,7 @@ export class MutationService { } private async sendMutations(): Promise { - if (!this.workspace.account.server.isAvailable()) { + if (!this.workspace.account.server.isAvailable) { return false; } diff --git a/apps/desktop/src/main/services/workspaces/synchronizer.ts b/apps/desktop/src/main/services/workspaces/synchronizer.ts index e31ad9e6..77d251c6 100644 --- a/apps/desktop/src/main/services/workspaces/synchronizer.ts +++ b/apps/desktop/src/main/services/workspaces/synchronizer.ts @@ -12,6 +12,7 @@ import ms from 'ms'; import { WorkspaceService } from '@/main/services/workspaces/workspace-service'; import { AccountConnection } from '@/main/services/accounts/account-connection'; import { EventLoop } from '@/shared/lib/event-loop'; +import { eventBus } from '@/shared/lib/event-bus'; export type SynchronizerStatus = 'idle' | 'waiting' | 'processing'; @@ -23,6 +24,7 @@ export class Synchronizer { private readonly connection: AccountConnection; private readonly cursorKey: string; private readonly eventLoop: EventLoop; + private readonly eventSubscriptionId: string; private readonly processor: ( data: SynchronizerMap[TInput['type']]['data'] @@ -49,21 +51,25 @@ export class Synchronizer { this.ping(); }); - this.connection.on('open', () => { - this.eventLoop.trigger(); + this.eventSubscriptionId = eventBus.subscribe((event) => { + if ( + event.type === 'account_connection_message' && + event.accountId === this.workspace.account.id + ) { + this.handleMessage(event.message); + } else if ( + event.type === 'account_connection_opened' && + event.accountId === this.workspace.account.id + ) { + this.eventLoop.trigger(); + } else if ( + event.type === 'account_connection_closed' && + event.accountId === this.workspace.account.id + ) { + this.eventLoop.stop(); + } }); - this.connection.on('close', () => { - this.eventLoop.stop(); - }); - - this.connection.on('error', () => { - this.eventLoop.stop(); - }); - - this.handleMessage = this.handleMessage.bind(this); - this.connection.on('message', this.handleMessage); - this.eventLoop.start(); } @@ -172,17 +178,16 @@ export class Synchronizer { public destroy() { this.eventLoop.stop(); - this.connection.off('message', this.handleMessage); + eventBus.unsubscribe(this.eventSubscriptionId); } public async delete() { + this.destroy(); + await this.workspace.database .deleteFrom('cursors') .where('key', '=', this.cursorKey) .execute(); - - this.eventLoop.stop(); - this.connection.off('message', this.handleMessage); } private generateId() { diff --git a/apps/desktop/src/shared/types/events.ts b/apps/desktop/src/shared/types/events.ts index 20065460..1a2b9d86 100644 --- a/apps/desktop/src/shared/types/events.ts +++ b/apps/desktop/src/shared/types/events.ts @@ -1,4 +1,4 @@ -import { Entry } from '@colanode/core'; +import { Entry, Message } from '@colanode/core'; import { EntryInteraction } from '@/shared/types/entries'; import { @@ -222,6 +222,22 @@ export type ServerAvailabilityChangedEvent = { isAvailable: boolean; }; +export type AccountConnectionOpenedEvent = { + type: 'account_connection_opened'; + accountId: string; +}; + +export type AccountConnectionClosedEvent = { + type: 'account_connection_closed'; + accountId: string; +}; + +export type AccountConnectionMessageEvent = { + type: 'account_connection_message'; + accountId: string; + message: Message; +}; + export type Event = | UserCreatedEvent | UserUpdatedEvent @@ -255,4 +271,7 @@ export type Event = | RadarDataUpdatedEvent | ServerAvailabilityChangedEvent | CollaborationCreatedEvent - | CollaborationDeletedEvent; + | CollaborationDeletedEvent + | AccountConnectionOpenedEvent + | AccountConnectionClosedEvent + | AccountConnectionMessageEvent;