Make server event-bus work with multiple instances

This commit is contained in:
Hakan Shehu
2024-12-02 18:25:03 +01:00
parent 071d15ae72
commit 0c07068341
5 changed files with 59 additions and 9 deletions

8
apps/server/src/host.ts Normal file
View File

@@ -0,0 +1,8 @@
import { generateId, IdType } from '@colanode/core';
type HostEnvironment = 'development' | 'production';
export const host = {
id: generateId(IdType.Host),
environment: (process.env.NODE_ENV as HostEnvironment) || 'development',
};

View File

@@ -1,5 +1,6 @@
import dotenv from 'dotenv';
import { eventBus } from '@/lib/event-bus';
import { initApi } from '@/api';
import { migrate } from '@/data/database';
import { initRedis } from '@/data/redis';
@@ -14,6 +15,8 @@ const init = async () => {
jobService.initQueue();
await jobService.initWorker();
await eventBus.init();
};
init();

View File

@@ -1,4 +1,8 @@
import { Event } from '@/types/events';
import { redis } from '@/data/redis';
import { host } from '@/host';
const CHANNEL_NAME = process.env.REDIS_EVENTS_CHANNEL ?? 'events';
export interface Subscription {
id: string;
@@ -11,14 +15,42 @@ export interface EventBus {
publish(event: Event): void;
}
export type DistributedEventEnvelope = {
event: Event;
hostId: string;
};
export class EventBusService {
private subscriptions: Map<string, Subscription>;
private id = 0;
private initialized = false;
public constructor() {
this.subscriptions = new Map<string, Subscription>();
}
public async init() {
if (this.initialized) {
return;
}
this.initialized = true;
if (host.environment === 'development') {
return;
}
const client = redis.duplicate();
client.subscribe(CHANNEL_NAME, (message) => {
const envelope = JSON.parse(message) as DistributedEventEnvelope;
if (envelope.hostId === host.id) {
return;
}
this.publish(envelope.event);
});
}
public subscribe(callback: (event: Event) => void): string {
const id = (this.id++).toLocaleString();
this.subscriptions.set(id, {
@@ -38,6 +70,10 @@ export class EventBusService {
this.subscriptions.forEach((subscription) => {
subscription.callback(event);
});
if (host.environment === 'production') {
redis.publish(CHANNEL_NAME, JSON.stringify({ event, hostId: host.id }));
}
}
}

View File

@@ -1,5 +1,6 @@
import pino, { Level } from 'pino';
const isDev = true; //process.env.NODE_ENV === 'development';
import { host } from '@/host';
const logConfig: Record<string, Level> = {
api: 'trace',
@@ -11,14 +12,15 @@ class LogService {
return pino({
name,
level: logConfig[name] || 'trace',
transport: isDev
? {
target: 'pino-pretty',
options: {
colorize: true,
},
}
: undefined,
transport:
host.environment === 'development'
? {
target: 'pino-pretty',
options: {
colorize: true,
},
}
: undefined,
});
}
}

View File

@@ -48,6 +48,7 @@ export enum IdType {
Upload = 'up',
Transaction = 'tx',
Event = 'ev',
Host = 'ht',
}
export const generateId = (type: IdType): string => {