diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index 870529a3..48aae2b1 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -1,7 +1,7 @@ import { initApi } from '@/api'; import { initRedis } from '@/data/redis'; import { migrate } from '@/data/database'; -import { initTaskWorker } from '@/queues/tasks'; +import { jobService } from '@/services/job-service'; import dotenv from 'dotenv'; dotenv.config(); @@ -11,7 +11,8 @@ const init = async () => { await initRedis(); await initApi(); - initTaskWorker(); + jobService.initQueue(); + await jobService.initWorker(); }; init(); diff --git a/apps/server/src/jobs/clean-workspace-data.ts b/apps/server/src/jobs/clean-workspace-data.ts new file mode 100644 index 00000000..b542fcee --- /dev/null +++ b/apps/server/src/jobs/clean-workspace-data.ts @@ -0,0 +1,20 @@ +import { JobHandler } from '@/types/jobs'; + +export type CleanWorkspaceDataInput = { + type: 'clean_workspace_data'; + workspaceId: string; +}; + +declare module '@/types/jobs' { + interface JobMap { + clean_workspace_data: { + input: CleanWorkspaceDataInput; + }; + } +} + +export const cleanWorkspaceDataHandler: JobHandler< + CleanWorkspaceDataInput +> = async (input) => { + console.log(input); +}; diff --git a/apps/server/src/jobs/index.ts b/apps/server/src/jobs/index.ts new file mode 100644 index 00000000..1cbaa530 --- /dev/null +++ b/apps/server/src/jobs/index.ts @@ -0,0 +1,14 @@ +import { JobHandler } from '@/types/jobs'; +import { JobMap } from '@/types/jobs'; + +import { sendEmailHandler } from '@/jobs/send-email'; +import { cleanWorkspaceDataHandler } from '@/jobs/clean-workspace-data'; + +type JobHandlerMap = { + [K in keyof JobMap]: JobHandler; +}; + +export const jobHandlerMap: JobHandlerMap = { + send_email: sendEmailHandler, + clean_workspace_data: cleanWorkspaceDataHandler, +}; diff --git a/apps/server/src/jobs/send-email.ts b/apps/server/src/jobs/send-email.ts new file mode 100644 index 00000000..21ecd5c0 --- /dev/null +++ b/apps/server/src/jobs/send-email.ts @@ -0,0 +1,22 @@ +import { emailService } from '@/services/email-service'; +import { JobHandler } from '@/types/jobs'; + +export type SendEmailInput = { + type: 'send_email'; + to: string | string[]; + subject: string; + text?: string; + html?: string; +}; + +declare module '@/types/jobs' { + interface JobMap { + send_email: { + input: SendEmailInput; + }; + } +} + +export const sendEmailHandler: JobHandler = async (input) => { + await emailService.sendEmail(input); +}; diff --git a/apps/server/src/queues/tasks.ts b/apps/server/src/queues/tasks.ts deleted file mode 100644 index 95163082..00000000 --- a/apps/server/src/queues/tasks.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { redisConfig } from '@/data/redis'; -import { SendEmailTask, Task } from '@/types/tasks'; -import { Job, Queue, Worker } from 'bullmq'; -import { sendEmail } from '@/services/email'; - -const taskQueue = new Queue('tasks', { - connection: { - host: redisConfig.host, - password: redisConfig.password, - port: redisConfig.port, - db: redisConfig.db, - }, - defaultJobOptions: { - removeOnComplete: true, - }, -}); - -export const enqueueTask = async (task: Task): Promise => { - await taskQueue.add('task', task); -}; - -export const initTaskWorker = () => { - return new Worker('tasks', handleTaskJob, { - connection: { - host: redisConfig.host, - password: redisConfig.password, - port: redisConfig.port, - db: redisConfig.db, - }, - }); -}; - -const handleTaskJob = async (job: Job) => { - const task = job.data as Task; - - switch (task.type) { - case 'send_email': - return handleSendEmailTask(task); - } -}; - -const handleSendEmailTask = async (task: SendEmailTask): Promise => { - await sendEmail(task.message); -}; diff --git a/apps/server/src/services/email-service.ts b/apps/server/src/services/email-service.ts new file mode 100644 index 00000000..5fce97e8 --- /dev/null +++ b/apps/server/src/services/email-service.ts @@ -0,0 +1,51 @@ +import nodemailer from 'nodemailer'; + +const SMTP_HOST = process.env.SMTP_HOST; +const SMTP_PORT = process.env.SMTP_PORT; +const SMTP_USER = process.env.SMTP_USER; +const SMTP_PASS = process.env.SMTP_PASS; +const EMAIL_FROM = process.env.EMAIL_FROM; + +interface EmailMessage { + to: string | string[]; + subject: string; + text?: string; + html?: string; +} + +class EmailService { + private transporter: nodemailer.Transporter | undefined; + + constructor() {} + + public async init() { + if (!SMTP_HOST || !SMTP_PORT || !SMTP_USER || !SMTP_PASS || !EMAIL_FROM) { + throw new Error('SMTP configuration is missing'); + } + + this.transporter = nodemailer.createTransport({ + host: SMTP_HOST, + port: parseInt(SMTP_PORT), + secure: true, + auth: { + user: SMTP_USER, + pass: SMTP_PASS, + }, + }); + + await this.transporter.verify(); + } + + public async sendEmail(message: EmailMessage): Promise { + if (!this.transporter) { + throw new Error('Email service not initialized'); + } + + await this.transporter.sendMail({ + from: EMAIL_FROM, + ...message, + }); + } +} + +export const emailService = new EmailService(); diff --git a/apps/server/src/services/email.ts b/apps/server/src/services/email.ts deleted file mode 100644 index 292eae4c..00000000 --- a/apps/server/src/services/email.ts +++ /dev/null @@ -1,43 +0,0 @@ -import nodemailer from 'nodemailer'; -import { EmailConfig, EmailMessage } from '@/types/email'; - -const SMTP_HOST = process.env.SMTP_HOST; -const SMTP_PORT = process.env.SMTP_PORT; -const SMTP_USER = process.env.SMTP_USER; -const SMTP_PASS = process.env.SMTP_PASS; -const EMAIL_FROM = process.env.EMAIL_FROM; - -if (!SMTP_HOST || !SMTP_PORT || !SMTP_USER || !SMTP_PASS || !EMAIL_FROM) { - throw new Error('SMTP configuration is missing'); -} - -let transporter: nodemailer.Transporter; - -const emailConfig: EmailConfig = { - from: EMAIL_FROM, - smtp: { - host: SMTP_HOST, - port: parseInt(SMTP_PORT), - secure: true, - auth: { - user: SMTP_USER, - pass: SMTP_PASS, - }, - }, -}; - -export const initEmail = async () => { - transporter = nodemailer.createTransport(emailConfig.smtp); - await transporter.verify(); -}; - -export const sendEmail = async (message: EmailMessage): Promise => { - if (!transporter) { - throw new Error('Email service not initialized'); - } - - await transporter.sendMail({ - from: emailConfig.from, - ...message, - }); -}; \ No newline at end of file diff --git a/apps/server/src/services/job-service.ts b/apps/server/src/services/job-service.ts new file mode 100644 index 00000000..caae1633 --- /dev/null +++ b/apps/server/src/services/job-service.ts @@ -0,0 +1,58 @@ +import { redisConfig } from '@/data/redis'; +import { jobHandlerMap } from '@/jobs'; +import { JobHandler, JobInput } from '@/types/jobs'; +import { Job, Queue, Worker, JobsOptions } from 'bullmq'; + +class JobService { + private jobQueue: Queue | undefined; + private jobWorker: Worker | undefined; + + public initQueue() { + if (this.jobQueue) { + return; + } + + this.jobQueue = new Queue('jobs', { + connection: { + host: redisConfig.host, + password: redisConfig.password, + port: redisConfig.port, + db: redisConfig.db, + }, + defaultJobOptions: { + removeOnComplete: true, + }, + }); + } + + public async initWorker() { + if (this.jobWorker) { + return; + } + + this.jobWorker = new Worker('jobs', this.handleJobJob, { + connection: { + host: redisConfig.host, + password: redisConfig.password, + port: redisConfig.port, + db: redisConfig.db, + }, + }); + } + + public async addJob(job: JobInput, options?: JobsOptions) { + if (!this.jobQueue) { + throw new Error('Job queue not initialized.'); + } + + await this.jobQueue.add(job.type, job, options); + } + + private handleJobJob = async (job: Job) => { + const input = job.data as JobInput; + const handler = jobHandlerMap[input.type] as JobHandler; + await handler(input); + }; +} + +export const jobService = new JobService(); diff --git a/apps/server/src/types/email.ts b/apps/server/src/types/email.ts deleted file mode 100644 index 2da4ae2a..00000000 --- a/apps/server/src/types/email.ts +++ /dev/null @@ -1,19 +0,0 @@ -export interface EmailConfig { - from: string; - smtp?: { - host: string; - port: number; - secure: boolean; - auth: { - user: string; - pass: string; - }; - }; -} - -export interface EmailMessage { - to: string | string[]; - subject: string; - text?: string; - html?: string; -} \ No newline at end of file diff --git a/apps/server/src/types/jobs.ts b/apps/server/src/types/jobs.ts new file mode 100644 index 00000000..98dc8bf2 --- /dev/null +++ b/apps/server/src/types/jobs.ts @@ -0,0 +1,5 @@ +export interface JobMap {} + +export type JobInput = JobMap[keyof JobMap]['input']; + +export type JobHandler = (input: T) => Promise; diff --git a/apps/server/src/types/tasks.ts b/apps/server/src/types/tasks.ts deleted file mode 100644 index 668689b6..00000000 --- a/apps/server/src/types/tasks.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { EmailMessage } from '@/types/email'; - -export type Task = SendEmailTask; - -export type SendEmailTask = { - type: 'send_email'; - message: EmailMessage; -};