Switch from eventemitter to eventbus

This commit is contained in:
Hakan Shehu
2025-01-17 15:16:57 +01:00
parent 60742806f5
commit 8e64472420
14 changed files with 162 additions and 64 deletions

View File

@@ -4,6 +4,7 @@ import { appService } from '@/main/services/app-service';
import { ServerService } from '@/main/services/server-service';
import { MutationError, MutationErrorCode } from '@/shared/mutations';
import { mapAccount, mapWorkspace } from '@/main/utils';
import { eventBus } from '@/shared/lib/event-bus';
export abstract class AccountMutationHandlerBase {
protected async handleLoginSuccess(
@@ -17,7 +18,7 @@ export abstract class AccountMutationHandlerBase {
id: login.account.id,
email: login.account.email,
name: login.account.name,
server: server.server.domain,
server: server.domain,
token: login.token,
device_id: login.deviceId,
avatar: login.account.avatar,
@@ -34,6 +35,11 @@ export abstract class AccountMutationHandlerBase {
const account = mapAccount(createdAccount);
const accountService = await appService.initAccount(account);
eventBus.publish({
type: 'account_created',
account: account,
});
if (login.workspaces.length === 0) {
return;
}
@@ -58,6 +64,10 @@ export abstract class AccountMutationHandlerBase {
}
await accountService.initWorkspace(mapWorkspace(createdWorkspace));
eventBus.publish({
type: 'workspace_created',
workspace: mapWorkspace(createdWorkspace),
});
}
}
}

View File

@@ -193,6 +193,8 @@ export class FileCreateMutationHandler
throw new Error('Failed to create file.');
}
workspace.mutations.triggerSync();
eventBus.publish({
type: 'file_created',
accountId: workspace.accountId,
@@ -200,8 +202,6 @@ export class FileCreateMutationHandler
file: mapFile(createdFile),
});
workspace.mutations.triggerSync();
eventBus.publish({
type: 'file_state_created',
accountId: workspace.accountId,

View File

@@ -102,6 +102,8 @@ export class FileDeleteMutationHandler
.execute();
});
workspace.mutations.triggerSync();
eventBus.publish({
type: 'file_deleted',
accountId: workspace.accountId,
@@ -109,8 +111,6 @@ export class FileDeleteMutationHandler
file: mapFile(file),
});
workspace.mutations.triggerSync();
return {
success: true,
};

View File

@@ -82,6 +82,8 @@ export class FileDownloadMutationHandler
throw new Error('Failed to create file state.');
}
workspace.files.triggerDownloads();
eventBus.publish({
type: 'file_state_created',
accountId: workspace.accountId,

View File

@@ -292,6 +292,8 @@ export class MessageCreateMutationHandler
};
});
workspace.mutations.triggerSync();
if (createdMessage) {
eventBus.publish({
type: 'message_created',
@@ -323,8 +325,6 @@ export class MessageCreateMutationHandler
}
}
workspace.mutations.triggerSync();
return {
id: messageId,
};

View File

@@ -99,6 +99,8 @@ export class MessageDeleteMutationHandler
.execute();
});
workspace.mutations.triggerSync();
eventBus.publish({
type: 'message_deleted',
accountId: workspace.accountId,
@@ -106,8 +108,6 @@ export class MessageDeleteMutationHandler
message: mapMessage(message),
});
workspace.mutations.triggerSync();
return {
success: true,
};

View File

@@ -2,13 +2,12 @@ import { Message, createDebugger } from '@colanode/core';
import { WebSocket } from 'ws';
import ms from 'ms';
import { EventEmitter } from 'events';
import { BackoffCalculator } from '@/shared/lib/backoff-calculator';
import { AccountService } from '@/main/services/accounts/account-service';
import { EventLoop } from '@/shared/lib/event-loop';
import { eventBus } from '@/shared/lib/event-bus';
export class AccountConnection extends EventEmitter {
export class AccountConnection {
private readonly debug = createDebugger('service:account-connection');
private readonly account: AccountService;
private readonly eventLoop: EventLoop;
@@ -17,9 +16,9 @@ export class AccountConnection extends EventEmitter {
private backoffCalculator: BackoffCalculator;
private closingCount: number;
constructor(accountService: AccountService) {
super();
private eventSubscriptionId: string;
constructor(accountService: AccountService) {
this.account = accountService;
this.socket = null;
this.backoffCalculator = new BackoffCalculator();
@@ -29,15 +28,20 @@ export class AccountConnection extends EventEmitter {
this.checkConnection();
});
this.account.server.on('availability_change', () => {
this.eventLoop.trigger();
this.eventSubscriptionId = eventBus.subscribe((event) => {
if (
event.type === 'server_availability_changed' &&
event.server.domain === this.account.server.domain
) {
this.eventLoop.trigger();
}
});
}
public init(): void {
this.eventLoop.start();
if (!this.account.server.isAvailable()) {
if (!this.account.server.isAvailable) {
return;
}
@@ -66,26 +70,39 @@ export class AccountConnection extends EventEmitter {
`Received message of type ${message.type} for account ${this.account.id}`
);
this.emit('message', message);
eventBus.publish({
type: 'account_connection_message',
accountId: this.account.id,
message,
});
};
this.socket.onopen = () => {
this.debug(`Socket connection for account ${this.account.id} opened`);
this.backoffCalculator.reset();
this.emit('open');
eventBus.publish({
type: 'account_connection_opened',
accountId: this.account.id,
});
};
this.socket.onerror = () => {
this.debug(`Socket connection for account ${this.account.id} errored`);
this.backoffCalculator.increaseError();
this.emit('close');
eventBus.publish({
type: 'account_connection_closed',
accountId: this.account.id,
});
};
this.socket.onclose = () => {
this.debug(`Socket connection for account ${this.account.id} closed`);
this.backoffCalculator.increaseError();
this.emit('close');
eventBus.publish({
type: 'account_connection_closed',
accountId: this.account.id,
});
};
}
@@ -113,13 +130,13 @@ export class AccountConnection extends EventEmitter {
this.socket = null;
}
this.removeAllListeners();
this.eventLoop.stop();
eventBus.unsubscribe(this.eventSubscriptionId);
}
private checkConnection(): void {
this.debug(`Checking connection for account ${this.account.id}`);
if (!this.account.server.isAvailable()) {
if (!this.account.server.isAvailable) {
return;
}

View File

@@ -44,6 +44,7 @@ export class AccountService {
public readonly connection: AccountConnection;
public readonly client: AccountClient;
private readonly eventSubscriptionId: string;
constructor(account: Account, server: ServerService, app: AppService) {
this.debug(`Initializing account service for account ${account.id}`);
@@ -71,15 +72,24 @@ export class AccountService {
this.client = new AccountClient(this);
this.connection = new AccountConnection(this);
this.handleMessage = this.handleMessage.bind(this);
this.connection.on('message', this.handleMessage);
this.triggerSync = this.triggerSync.bind(this);
this.server.on('availability_change', this.triggerSync);
this.eventLoop = new EventLoop(ms('1 minute'), ms('1 second'), () => {
this.sync();
});
this.eventSubscriptionId = eventBus.subscribe((event) => {
if (
event.type === 'server_availability_changed' &&
event.server.domain === this.server.domain &&
event.isAvailable
) {
this.eventLoop.trigger();
} else if (
event.type === 'account_connection_message' &&
event.accountId === this.account.id
) {
this.handleMessage(event.message);
}
});
}
public get id(): string {
@@ -118,19 +128,47 @@ export class AccountService {
public async logout(): Promise<void> {
try {
await this.app.database.transaction().execute(async (tx) => {
const deletedAccount = await tx
.deleteFrom('accounts')
.where('id', '=', this.account.id)
.executeTakeFirst();
if (!deletedAccount) {
throw new Error('Failed to delete account');
}
await tx
.insertInto('deleted_tokens')
.values({
account_id: this.account.id,
token: this.account.token,
server: this.server.domain,
created_at: new Date().toISOString(),
})
.execute();
});
const workspaces = this.workspaces.values();
for (const workspace of workspaces) {
await workspace.delete();
this.workspaces.delete(workspace.id);
}
this.database.destroy();
this.connection.close();
this.eventLoop.stop();
eventBus.unsubscribe(this.eventSubscriptionId);
const accountPath = getAccountDirectoryPath(this.account.id);
if (fs.existsSync(accountPath)) {
fs.rmSync(accountPath, { recursive: true, force: true });
}
eventBus.publish({
type: 'account_deleted',
account: this.account,
});
} catch (error) {
this.debug(`Error logging out of account ${this.account.id}: ${error}`);
}
@@ -192,16 +230,12 @@ export class AccountService {
}
}
private triggerSync(): void {
this.eventLoop.trigger();
}
private async sync(): Promise<void> {
this.debug(`Syncing account ${this.account.id}`);
if (!this.server.isAvailable()) {
if (!this.server.isAvailable) {
this.debug(
`Server ${this.server.server.domain} is not available for syncing account ${this.account.email}`
`Server ${this.server.domain} is not available for syncing account ${this.account.email}`
);
return;
}

View File

@@ -23,6 +23,7 @@ export class AppService {
private readonly servers: Map<string, ServerService> = new Map();
private readonly accounts: Map<string, AccountService> = new Map();
private readonly cleanupEventLoop: EventLoop;
private readonly eventSubscriptionId: string;
public readonly database: Kysely<AppDatabaseSchema>;
public readonly metadata: MetadataService;
@@ -51,6 +52,12 @@ export class AppService {
this.cleanup();
}
);
this.eventSubscriptionId = eventBus.subscribe((event) => {
if (event.type === 'account_deleted') {
this.accounts.delete(event.account.id);
}
});
}
public async migrate(): Promise<void> {

View File

@@ -2,8 +2,6 @@ import { createDebugger, ServerConfig } from '@colanode/core';
import axios from 'axios';
import ms from 'ms';
import { EventEmitter } from 'events';
import { mapServer } from '@/main/utils';
import { eventBus } from '@/shared/lib/event-bus';
import { Server } from '@/shared/types/servers';
@@ -17,7 +15,7 @@ type ServerState = {
count: number;
};
export class ServerService extends EventEmitter {
export class ServerService {
private readonly debug = createDebugger('desktop:service:server');
private readonly appService: AppService;
@@ -29,8 +27,6 @@ export class ServerService extends EventEmitter {
public readonly apiBaseUrl: string;
constructor(appService: AppService, server: Server) {
super();
this.appService = appService;
this.server = server;
this.synapseUrl = ServerService.buildSynapseUrl(server.domain);
@@ -42,10 +38,14 @@ export class ServerService extends EventEmitter {
this.eventLoop.start();
}
public isAvailable() {
public get isAvailable() {
return this.state?.isAvailable ?? false;
}
public get domain() {
return this.server.domain;
}
private async sync() {
const config = await ServerService.fetchServerConfig(this.server.domain);
const existingState = this.state;
@@ -62,7 +62,11 @@ export class ServerService extends EventEmitter {
const wasAvailable = existingState?.isAvailable ?? false;
const isAvailable = newState.isAvailable;
if (wasAvailable !== isAvailable) {
this.emit('availability_change', isAvailable);
eventBus.publish({
type: 'server_availability_changed',
server: this.server,
isAvailable,
});
}
this.debug(

View File

@@ -135,7 +135,7 @@ export class FileService {
}
private async uploadFiles(): Promise<void> {
if (!this.workspace.account.server.isAvailable()) {
if (!this.workspace.account.server.isAvailable) {
return;
}
@@ -341,7 +341,7 @@ export class FileService {
}
public async downloadFiles(): Promise<void> {
if (!this.workspace.account.server.isAvailable()) {
if (!this.workspace.account.server.isAvailable) {
return;
}

View File

@@ -46,7 +46,7 @@ export class MutationService {
}
private async sendMutations(): Promise<boolean> {
if (!this.workspace.account.server.isAvailable()) {
if (!this.workspace.account.server.isAvailable) {
return false;
}

View File

@@ -12,6 +12,7 @@ import ms from 'ms';
import { WorkspaceService } from '@/main/services/workspaces/workspace-service';
import { AccountConnection } from '@/main/services/accounts/account-connection';
import { EventLoop } from '@/shared/lib/event-loop';
import { eventBus } from '@/shared/lib/event-bus';
export type SynchronizerStatus = 'idle' | 'waiting' | 'processing';
@@ -23,6 +24,7 @@ export class Synchronizer<TInput extends SynchronizerInput> {
private readonly connection: AccountConnection;
private readonly cursorKey: string;
private readonly eventLoop: EventLoop;
private readonly eventSubscriptionId: string;
private readonly processor: (
data: SynchronizerMap[TInput['type']]['data']
@@ -49,21 +51,25 @@ export class Synchronizer<TInput extends SynchronizerInput> {
this.ping();
});
this.connection.on('open', () => {
this.eventLoop.trigger();
this.eventSubscriptionId = eventBus.subscribe((event) => {
if (
event.type === 'account_connection_message' &&
event.accountId === this.workspace.account.id
) {
this.handleMessage(event.message);
} else if (
event.type === 'account_connection_opened' &&
event.accountId === this.workspace.account.id
) {
this.eventLoop.trigger();
} else if (
event.type === 'account_connection_closed' &&
event.accountId === this.workspace.account.id
) {
this.eventLoop.stop();
}
});
this.connection.on('close', () => {
this.eventLoop.stop();
});
this.connection.on('error', () => {
this.eventLoop.stop();
});
this.handleMessage = this.handleMessage.bind(this);
this.connection.on('message', this.handleMessage);
this.eventLoop.start();
}
@@ -172,17 +178,16 @@ export class Synchronizer<TInput extends SynchronizerInput> {
public destroy() {
this.eventLoop.stop();
this.connection.off('message', this.handleMessage);
eventBus.unsubscribe(this.eventSubscriptionId);
}
public async delete() {
this.destroy();
await this.workspace.database
.deleteFrom('cursors')
.where('key', '=', this.cursorKey)
.execute();
this.eventLoop.stop();
this.connection.off('message', this.handleMessage);
}
private generateId() {

View File

@@ -1,4 +1,4 @@
import { Entry } from '@colanode/core';
import { Entry, Message } from '@colanode/core';
import { EntryInteraction } from '@/shared/types/entries';
import {
@@ -222,6 +222,22 @@ export type ServerAvailabilityChangedEvent = {
isAvailable: boolean;
};
export type AccountConnectionOpenedEvent = {
type: 'account_connection_opened';
accountId: string;
};
export type AccountConnectionClosedEvent = {
type: 'account_connection_closed';
accountId: string;
};
export type AccountConnectionMessageEvent = {
type: 'account_connection_message';
accountId: string;
message: Message;
};
export type Event =
| UserCreatedEvent
| UserUpdatedEvent
@@ -255,4 +271,7 @@ export type Event =
| RadarDataUpdatedEvent
| ServerAvailabilityChangedEvent
| CollaborationCreatedEvent
| CollaborationDeletedEvent;
| CollaborationDeletedEvent
| AccountConnectionOpenedEvent
| AccountConnectionClosedEvent
| AccountConnectionMessageEvent;