Refactor jobs abstraction

This commit is contained in:
Hakan Shehu
2024-11-27 11:20:29 +01:00
parent e4dbb56d13
commit c76af35e0d
11 changed files with 173 additions and 116 deletions

View File

@@ -1,7 +1,7 @@
import { initApi } from '@/api';
import { initRedis } from '@/data/redis';
import { migrate } from '@/data/database';
import { initTaskWorker } from '@/queues/tasks';
import { jobService } from '@/services/job-service';
import dotenv from 'dotenv';
dotenv.config();
@@ -11,7 +11,8 @@ const init = async () => {
await initRedis();
await initApi();
initTaskWorker();
jobService.initQueue();
await jobService.initWorker();
};
init();

View File

@@ -0,0 +1,20 @@
import { JobHandler } from '@/types/jobs';
export type CleanWorkspaceDataInput = {
type: 'clean_workspace_data';
workspaceId: string;
};
declare module '@/types/jobs' {
interface JobMap {
clean_workspace_data: {
input: CleanWorkspaceDataInput;
};
}
}
export const cleanWorkspaceDataHandler: JobHandler<
CleanWorkspaceDataInput
> = async (input) => {
console.log(input);
};

View File

@@ -0,0 +1,14 @@
import { JobHandler } from '@/types/jobs';
import { JobMap } from '@/types/jobs';
import { sendEmailHandler } from '@/jobs/send-email';
import { cleanWorkspaceDataHandler } from '@/jobs/clean-workspace-data';
type JobHandlerMap = {
[K in keyof JobMap]: JobHandler<JobMap[K]['input']>;
};
export const jobHandlerMap: JobHandlerMap = {
send_email: sendEmailHandler,
clean_workspace_data: cleanWorkspaceDataHandler,
};

View File

@@ -0,0 +1,22 @@
import { emailService } from '@/services/email-service';
import { JobHandler } from '@/types/jobs';
export type SendEmailInput = {
type: 'send_email';
to: string | string[];
subject: string;
text?: string;
html?: string;
};
declare module '@/types/jobs' {
interface JobMap {
send_email: {
input: SendEmailInput;
};
}
}
export const sendEmailHandler: JobHandler<SendEmailInput> = async (input) => {
await emailService.sendEmail(input);
};

View File

@@ -1,44 +0,0 @@
import { redisConfig } from '@/data/redis';
import { SendEmailTask, Task } from '@/types/tasks';
import { Job, Queue, Worker } from 'bullmq';
import { sendEmail } from '@/services/email';
const taskQueue = new Queue('tasks', {
connection: {
host: redisConfig.host,
password: redisConfig.password,
port: redisConfig.port,
db: redisConfig.db,
},
defaultJobOptions: {
removeOnComplete: true,
},
});
export const enqueueTask = async (task: Task): Promise<void> => {
await taskQueue.add('task', task);
};
export const initTaskWorker = () => {
return new Worker('tasks', handleTaskJob, {
connection: {
host: redisConfig.host,
password: redisConfig.password,
port: redisConfig.port,
db: redisConfig.db,
},
});
};
const handleTaskJob = async (job: Job) => {
const task = job.data as Task;
switch (task.type) {
case 'send_email':
return handleSendEmailTask(task);
}
};
const handleSendEmailTask = async (task: SendEmailTask): Promise<void> => {
await sendEmail(task.message);
};

View File

@@ -0,0 +1,51 @@
import nodemailer from 'nodemailer';
const SMTP_HOST = process.env.SMTP_HOST;
const SMTP_PORT = process.env.SMTP_PORT;
const SMTP_USER = process.env.SMTP_USER;
const SMTP_PASS = process.env.SMTP_PASS;
const EMAIL_FROM = process.env.EMAIL_FROM;
interface EmailMessage {
to: string | string[];
subject: string;
text?: string;
html?: string;
}
class EmailService {
private transporter: nodemailer.Transporter | undefined;
constructor() {}
public async init() {
if (!SMTP_HOST || !SMTP_PORT || !SMTP_USER || !SMTP_PASS || !EMAIL_FROM) {
throw new Error('SMTP configuration is missing');
}
this.transporter = nodemailer.createTransport({
host: SMTP_HOST,
port: parseInt(SMTP_PORT),
secure: true,
auth: {
user: SMTP_USER,
pass: SMTP_PASS,
},
});
await this.transporter.verify();
}
public async sendEmail(message: EmailMessage): Promise<void> {
if (!this.transporter) {
throw new Error('Email service not initialized');
}
await this.transporter.sendMail({
from: EMAIL_FROM,
...message,
});
}
}
export const emailService = new EmailService();

View File

@@ -1,43 +0,0 @@
import nodemailer from 'nodemailer';
import { EmailConfig, EmailMessage } from '@/types/email';
const SMTP_HOST = process.env.SMTP_HOST;
const SMTP_PORT = process.env.SMTP_PORT;
const SMTP_USER = process.env.SMTP_USER;
const SMTP_PASS = process.env.SMTP_PASS;
const EMAIL_FROM = process.env.EMAIL_FROM;
if (!SMTP_HOST || !SMTP_PORT || !SMTP_USER || !SMTP_PASS || !EMAIL_FROM) {
throw new Error('SMTP configuration is missing');
}
let transporter: nodemailer.Transporter;
const emailConfig: EmailConfig = {
from: EMAIL_FROM,
smtp: {
host: SMTP_HOST,
port: parseInt(SMTP_PORT),
secure: true,
auth: {
user: SMTP_USER,
pass: SMTP_PASS,
},
},
};
export const initEmail = async () => {
transporter = nodemailer.createTransport(emailConfig.smtp);
await transporter.verify();
};
export const sendEmail = async (message: EmailMessage): Promise<void> => {
if (!transporter) {
throw new Error('Email service not initialized');
}
await transporter.sendMail({
from: emailConfig.from,
...message,
});
};

View File

@@ -0,0 +1,58 @@
import { redisConfig } from '@/data/redis';
import { jobHandlerMap } from '@/jobs';
import { JobHandler, JobInput } from '@/types/jobs';
import { Job, Queue, Worker, JobsOptions } from 'bullmq';
class JobService {
private jobQueue: Queue | undefined;
private jobWorker: Worker | undefined;
public initQueue() {
if (this.jobQueue) {
return;
}
this.jobQueue = new Queue('jobs', {
connection: {
host: redisConfig.host,
password: redisConfig.password,
port: redisConfig.port,
db: redisConfig.db,
},
defaultJobOptions: {
removeOnComplete: true,
},
});
}
public async initWorker() {
if (this.jobWorker) {
return;
}
this.jobWorker = new Worker('jobs', this.handleJobJob, {
connection: {
host: redisConfig.host,
password: redisConfig.password,
port: redisConfig.port,
db: redisConfig.db,
},
});
}
public async addJob(job: JobInput, options?: JobsOptions) {
if (!this.jobQueue) {
throw new Error('Job queue not initialized.');
}
await this.jobQueue.add(job.type, job, options);
}
private handleJobJob = async (job: Job) => {
const input = job.data as JobInput;
const handler = jobHandlerMap[input.type] as JobHandler<typeof input>;
await handler(input);
};
}
export const jobService = new JobService();

View File

@@ -1,19 +0,0 @@
export interface EmailConfig {
from: string;
smtp?: {
host: string;
port: number;
secure: boolean;
auth: {
user: string;
pass: string;
};
};
}
export interface EmailMessage {
to: string | string[];
subject: string;
text?: string;
html?: string;
}

View File

@@ -0,0 +1,5 @@
export interface JobMap {}
export type JobInput = JobMap[keyof JobMap]['input'];
export type JobHandler<T extends JobInput> = (input: T) => Promise<void>;

View File

@@ -1,8 +0,0 @@
import { EmailMessage } from '@/types/email';
export type Task = SendEmailTask;
export type SendEmailTask = {
type: 'send_email';
message: EmailMessage;
};