mirror of
https://github.com/colanode/colanode.git
synced 2025-12-16 11:47:47 +01:00
Improve sleep scheduler implementation for jobs (#223)
This commit is contained in:
@@ -1,45 +1,63 @@
|
||||
interface SleepState {
|
||||
date: Date;
|
||||
timestamp: number;
|
||||
timeout: NodeJS.Timeout;
|
||||
resolve: () => void;
|
||||
promise: Promise<void>;
|
||||
}
|
||||
|
||||
export class SleepScheduler {
|
||||
private sleepMap = new Map<string, SleepState>();
|
||||
|
||||
public sleepUntil(id: string, date: Date): Promise<void> {
|
||||
if (this.sleepMap.has(id)) {
|
||||
throw new Error(`Sleep already exists for id: ${id}`);
|
||||
public sleepUntil(id: string, timestamp: number): Promise<void> {
|
||||
const now = Date.now();
|
||||
if (timestamp <= now) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
const delay = date.getTime() - Date.now();
|
||||
const timeout = setTimeout(() => {
|
||||
this.sleepMap.delete(id);
|
||||
resolve();
|
||||
}, delay);
|
||||
const existing = this.sleepMap.get(id);
|
||||
if (existing) {
|
||||
this.updateResolveTimeIfEarlier(id, timestamp);
|
||||
return existing.promise;
|
||||
}
|
||||
|
||||
this.sleepMap.set(id, {
|
||||
date,
|
||||
timeout,
|
||||
resolve,
|
||||
});
|
||||
});
|
||||
let resolve!: () => void;
|
||||
const promise = new Promise<void>((res) => (resolve = res));
|
||||
|
||||
const state: SleepState = {
|
||||
timestamp,
|
||||
timeout: undefined as unknown as NodeJS.Timeout,
|
||||
resolve,
|
||||
promise,
|
||||
};
|
||||
|
||||
const delay = Math.max(0, timestamp - Date.now());
|
||||
if (delay === 0) {
|
||||
state.resolve();
|
||||
return promise;
|
||||
}
|
||||
|
||||
state.timeout = setTimeout(() => {
|
||||
this.sleepMap.delete(id);
|
||||
state.resolve();
|
||||
}, delay);
|
||||
|
||||
this.sleepMap.set(id, state);
|
||||
return promise;
|
||||
}
|
||||
|
||||
public updateResolveTimeIfEarlier(id: string, date: Date): boolean {
|
||||
public updateResolveTimeIfEarlier(id: string, timestamp: number): boolean {
|
||||
const existingSleep = this.sleepMap.get(id);
|
||||
if (!existingSleep) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (date >= existingSleep.date) {
|
||||
if (timestamp >= existingSleep.timestamp) {
|
||||
return false;
|
||||
}
|
||||
|
||||
clearTimeout(existingSleep.timeout);
|
||||
|
||||
const delay = Math.max(0, date.getTime() - Date.now());
|
||||
const delay = Math.max(0, timestamp - Date.now());
|
||||
if (delay === 0) {
|
||||
this.sleepMap.delete(id);
|
||||
existingSleep.resolve();
|
||||
@@ -51,7 +69,7 @@ export class SleepScheduler {
|
||||
existingSleep.resolve();
|
||||
}, delay);
|
||||
|
||||
existingSleep.date = date;
|
||||
existingSleep.timestamp = timestamp;
|
||||
existingSleep.timeout = newTimeout;
|
||||
|
||||
return true;
|
||||
|
||||
@@ -128,8 +128,8 @@ export class JobService {
|
||||
});
|
||||
|
||||
if (result) {
|
||||
const date = new Date(result.scheduled_at);
|
||||
this.sleepScheduler.updateResolveTimeIfEarlier(JOB_LOOP_ID, date);
|
||||
const timestamp = new Date(result.scheduled_at).getTime();
|
||||
this.sleepScheduler.updateResolveTimeIfEarlier(JOB_LOOP_ID, timestamp);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -155,8 +155,8 @@ export class JobService {
|
||||
.executeTakeFirst();
|
||||
|
||||
if (job) {
|
||||
const date = new Date(job.scheduled_at);
|
||||
this.sleepScheduler.updateResolveTimeIfEarlier(JOB_LOOP_ID, date);
|
||||
const timestamp = new Date(job.scheduled_at).getTime();
|
||||
this.sleepScheduler.updateResolveTimeIfEarlier(JOB_LOOP_ID, timestamp);
|
||||
}
|
||||
|
||||
return job ?? null;
|
||||
@@ -198,8 +198,11 @@ export class JobService {
|
||||
.executeTakeFirst();
|
||||
|
||||
if (schedule) {
|
||||
const date = new Date(schedule.next_run_at);
|
||||
this.sleepScheduler.updateResolveTimeIfEarlier(SCHEDULE_LOOP_ID, date);
|
||||
const timestamp = new Date(schedule.next_run_at).getTime();
|
||||
this.sleepScheduler.updateResolveTimeIfEarlier(
|
||||
SCHEDULE_LOOP_ID,
|
||||
timestamp
|
||||
);
|
||||
}
|
||||
|
||||
return schedule ?? null;
|
||||
@@ -237,8 +240,8 @@ export class JobService {
|
||||
private async jobLoop() {
|
||||
while (!this.stopped) {
|
||||
if (this.runningJobs >= MAX_CONCURRENCY) {
|
||||
const date = new Date(Date.now() + JOBS_CONCURRENCY_TIMEOUT);
|
||||
await this.sleepScheduler.sleepUntil(JOB_LOOP_ID, date);
|
||||
const timestamp = Date.now() + JOBS_CONCURRENCY_TIMEOUT;
|
||||
await this.sleepScheduler.sleepUntil(JOB_LOOP_ID, timestamp);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -284,11 +287,13 @@ export class JobService {
|
||||
.executeTakeFirst();
|
||||
|
||||
if (nextScheduledJob) {
|
||||
const date = new Date(nextScheduledJob.scheduled_at);
|
||||
await this.sleepScheduler.sleepUntil(JOB_LOOP_ID, date);
|
||||
const timestamp = new Date(nextScheduledJob.scheduled_at).getTime();
|
||||
await this.sleepScheduler.sleepUntil(JOB_LOOP_ID, timestamp);
|
||||
} else {
|
||||
const date = new Date(now.getTime() + JOBS_MAX_TIMEOUT);
|
||||
await this.sleepScheduler.sleepUntil(JOB_LOOP_ID, date);
|
||||
const timestamp = new Date(
|
||||
now.getTime() + JOBS_MAX_TIMEOUT
|
||||
).getTime();
|
||||
await this.sleepScheduler.sleepUntil(JOB_LOOP_ID, timestamp);
|
||||
}
|
||||
|
||||
continue;
|
||||
@@ -356,11 +361,13 @@ export class JobService {
|
||||
.executeTakeFirst();
|
||||
|
||||
if (nextSchedule) {
|
||||
const date = new Date(nextSchedule.next_run_at);
|
||||
await this.sleepScheduler.sleepUntil(SCHEDULE_LOOP_ID, date);
|
||||
const timestamp = new Date(nextSchedule.next_run_at).getTime();
|
||||
await this.sleepScheduler.sleepUntil(SCHEDULE_LOOP_ID, timestamp);
|
||||
} else {
|
||||
const date = new Date(now.getTime() + SCHEDULES_MAX_TIMEOUT);
|
||||
await this.sleepScheduler.sleepUntil(SCHEDULE_LOOP_ID, date);
|
||||
const timestamp = new Date(
|
||||
now.getTime() + SCHEDULES_MAX_TIMEOUT
|
||||
).getTime();
|
||||
await this.sleepScheduler.sleepUntil(SCHEDULE_LOOP_ID, timestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user