Improvements in event loop and some syncs

This commit is contained in:
Hakan Shehu
2025-01-18 01:27:09 +01:00
parent 8587ebf704
commit 6ffa9c96e8
10 changed files with 31 additions and 19 deletions

View File

@@ -3,13 +3,17 @@ type EventLoopStatus = 'idle' | 'scheduled' | 'processing';
export class EventLoop {
private readonly interval: number;
private readonly debounce: number;
private readonly callback: () => void;
private readonly callback: () => void | Promise<void>;
private timeout: NodeJS.Timeout | null;
private status: EventLoopStatus = 'idle';
private triggered: boolean = false;
constructor(interval: number, debounce: number, callback: () => void) {
constructor(
interval: number,
debounce: number,
callback: () => void | Promise<void>
) {
this.interval = interval;
this.debounce = debounce;
@@ -47,14 +51,22 @@ export class EventLoop {
this.start();
}
private execute(): void {
private async execute(): Promise<void> {
if (this.status !== 'scheduled') {
return;
}
this.status = 'processing';
this.callback();
try {
await Promise.resolve(this.callback());
} catch (error) {
console.error('Callback execution failed:', error);
}
if (this.status !== 'processing') {
return;
}
const timeout = this.triggered ? this.debounce : this.interval;
this.timeout = setTimeout(() => {

View File

@@ -8,7 +8,7 @@ import axios, {
import { createDebugger } from '@colanode/core';
import { AccountService } from '@/main/services/accounts/account-service';
import { BackoffCalculator } from '@/shared/lib/backoff-calculator';
import { BackoffCalculator } from '@/main/lib/backoff-calculator';
export class AccountClient {
private readonly debug = createDebugger('desktop:service:account');

View File

@@ -2,9 +2,9 @@ import { Message, createDebugger } from '@colanode/core';
import { WebSocket } from 'ws';
import ms from 'ms';
import { BackoffCalculator } from '@/shared/lib/backoff-calculator';
import { BackoffCalculator } from '@/main/lib/backoff-calculator';
import { AccountService } from '@/main/services/accounts/account-service';
import { EventLoop } from '@/shared/lib/event-loop';
import { EventLoop } from '@/main/lib/event-loop';
import { eventBus } from '@/shared/lib/event-bus';
export class AccountConnection {

View File

@@ -30,7 +30,7 @@ import { eventBus } from '@/shared/lib/event-bus';
import { parseApiError } from '@/shared/lib/axios';
import { Account } from '@/shared/types/accounts';
import { Workspace } from '@/shared/types/workspaces';
import { EventLoop } from '@/shared/lib/event-loop';
import { EventLoop } from '@/main/lib/event-loop';
export class AccountService {
private readonly debug = createDebugger('desktop:service:account');

View File

@@ -13,7 +13,7 @@ import { AccountService } from '@/main/services/accounts/account-service';
import { ServerService } from '@/main/services/server-service';
import { Account } from '@/shared/types/accounts';
import { Server } from '@/shared/types/servers';
import { EventLoop } from '@/shared/lib/event-loop';
import { EventLoop } from '@/main/lib/event-loop';
import { parseApiError } from '@/shared/lib/axios';
import { NotificationService } from '@/main/services/notification-service';
import { eventBus } from '@/shared/lib/event-bus';

View File

@@ -6,7 +6,7 @@ import { mapServer } from '@/main/utils';
import { eventBus } from '@/shared/lib/event-bus';
import { Server } from '@/shared/types/servers';
import { AppService } from '@/main/services/app-service';
import { EventLoop } from '@/shared/lib/event-loop';
import { EventLoop } from '@/main/lib/event-loop';
type ServerState = {
isAvailable: boolean;

View File

@@ -24,7 +24,7 @@ import {
import { eventBus } from '@/shared/lib/event-bus';
import { DownloadStatus, UploadStatus } from '@/shared/types/files';
import { WorkspaceService } from '@/main/services/workspaces/workspace-service';
import { EventLoop } from '@/shared/lib/event-loop';
import { EventLoop } from '@/main/lib/event-loop';
import { SelectFileState } from '@/main/databases/workspace';
const UPLOAD_RETRIES_LIMIT = 10;

View File

@@ -3,7 +3,7 @@ import ms from 'ms';
import { WorkspaceService } from '@/main/services/workspaces/workspace-service';
import { mapMutation } from '@/main/utils';
import { EventLoop } from '@/shared/lib/event-loop';
import { EventLoop } from '@/main/lib/event-loop';
const READ_SIZE = 500;
const BATCH_SIZE = 50;
@@ -16,7 +16,7 @@ export class MutationService {
constructor(workspaceService: WorkspaceService) {
this.workspace = workspaceService;
this.eventLoop = new EventLoop(ms('1 minute'), ms('1 second'), () => {
this.eventLoop = new EventLoop(ms('1 minute'), 100, () => {
this.sync();
});
@@ -33,10 +33,10 @@ export class MutationService {
private async sync(): Promise<void> {
try {
let hasMore = true;
let hasMutations = true;
while (hasMore) {
hasMore = await this.sendMutations();
while (hasMutations) {
hasMutations = await this.sendMutations();
}
await this.revertInvalidMutations();
@@ -120,7 +120,7 @@ export class MutationService {
);
}
return unsyncedMutations.length === READ_SIZE;
return unsyncedMutations.length > 0;
}
private async revertInvalidMutations(): Promise<void> {

View File

@@ -11,7 +11,7 @@ import ms from 'ms';
import { WorkspaceService } from '@/main/services/workspaces/workspace-service';
import { AccountConnection } from '@/main/services/accounts/account-connection';
import { EventLoop } from '@/shared/lib/event-loop';
import { EventLoop } from '@/main/lib/event-loop';
import { eventBus } from '@/shared/lib/event-bus';
export type SynchronizerStatus = 'idle' | 'waiting' | 'processing';
@@ -112,7 +112,7 @@ export class Synchronizer<TInput extends SynchronizerInput> {
lastCursor = BigInt(item.cursor);
}
} catch (error) {
this.debug('Error consuming items', error);
this.debug(`Error consuming items: ${error}`);
} finally {
if (lastCursor !== null) {
this.cursor = lastCursor;