mirror of
https://github.com/colanode/colanode.git
synced 2025-12-16 11:47:47 +01:00
Fix pub/sub communication for event-bus in server
This commit is contained in:
@@ -10,9 +10,12 @@ export interface Configuration {
|
||||
ai: AiConfiguration;
|
||||
}
|
||||
|
||||
export type ServerMode = 'standalone' | 'cluster';
|
||||
|
||||
export interface ServerConfiguration {
|
||||
name: string;
|
||||
avatar: string;
|
||||
mode: ServerMode;
|
||||
}
|
||||
|
||||
export type AccountVerificationType = 'automatic' | 'manual' | 'email';
|
||||
@@ -105,6 +108,7 @@ export const configuration: Configuration = {
|
||||
server: {
|
||||
name: getRequiredEnv('SERVER_NAME'),
|
||||
avatar: getOptionalEnv('SERVER_AVATAR') || '',
|
||||
mode: (getOptionalEnv('SERVER_MODE') as ServerMode) || 'standalone',
|
||||
},
|
||||
account: {
|
||||
verificationType:
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { generateId, IdType } from '@colanode/core';
|
||||
|
||||
import { Event } from '@/types/events';
|
||||
import { redis } from '@/data/redis';
|
||||
import { host } from '@/host';
|
||||
import { configuration } from '@/lib/configuration';
|
||||
|
||||
export interface Subscription {
|
||||
@@ -20,12 +21,14 @@ export type DistributedEventEnvelope = {
|
||||
};
|
||||
|
||||
export class EventBusService {
|
||||
private subscriptions: Map<string, Subscription>;
|
||||
private id = 0;
|
||||
private readonly subscriptions: Map<string, Subscription>;
|
||||
private readonly hostId: string;
|
||||
private subscriberId = 0;
|
||||
private initialized = false;
|
||||
|
||||
public constructor() {
|
||||
this.subscriptions = new Map<string, Subscription>();
|
||||
this.hostId = generateId(IdType.Host);
|
||||
}
|
||||
|
||||
public async init() {
|
||||
@@ -35,14 +38,16 @@ export class EventBusService {
|
||||
|
||||
this.initialized = true;
|
||||
|
||||
if (host.environment === 'development') {
|
||||
if (configuration.server.mode === 'standalone') {
|
||||
return;
|
||||
}
|
||||
|
||||
const client = redis.duplicate();
|
||||
await client.connect();
|
||||
|
||||
client.subscribe(configuration.redis.eventsChannel, (message) => {
|
||||
const envelope = JSON.parse(message) as DistributedEventEnvelope;
|
||||
if (envelope.hostId === host.id) {
|
||||
if (envelope.hostId === this.hostId) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -51,7 +56,7 @@ export class EventBusService {
|
||||
}
|
||||
|
||||
public subscribe(callback: (event: Event) => void): string {
|
||||
const id = (this.id++).toLocaleString();
|
||||
const id = (this.subscriberId++).toLocaleString();
|
||||
this.subscriptions.set(id, {
|
||||
callback,
|
||||
id,
|
||||
@@ -68,12 +73,14 @@ export class EventBusService {
|
||||
public publish(event: Event) {
|
||||
this.processEvent(event);
|
||||
|
||||
if (host.environment === 'production') {
|
||||
redis.publish(
|
||||
configuration.redis.eventsChannel,
|
||||
JSON.stringify({ event, hostId: host.id })
|
||||
);
|
||||
if (configuration.server.mode === 'standalone') {
|
||||
return;
|
||||
}
|
||||
|
||||
redis.publish(
|
||||
configuration.redis.eventsChannel,
|
||||
JSON.stringify({ event, hostId: this.hostId })
|
||||
);
|
||||
}
|
||||
|
||||
private processEvent(event: Event) {
|
||||
|
||||
@@ -64,6 +64,8 @@ services:
|
||||
# The server requires a name and avatar URL which will be displayed in the desktop app login screen.
|
||||
SERVER_NAME: 'Colanode'
|
||||
SERVER_AVATAR: ''
|
||||
# Possible values for SERVER_MODE: 'standalone', 'cluster'
|
||||
SERVER_MODE: 'standalone'
|
||||
|
||||
# ───────────────────────────────────────────────────────────────
|
||||
# Account Configuration
|
||||
|
||||
Reference in New Issue
Block a user