From db83d0bcdd19f51c44b1a42350e3f62bd5b89177 Mon Sep 17 00:00:00 2001
From: Hakan Shehu
Date: Thu, 23 Oct 2025 10:25:39 +0200
Subject: [PATCH] Implement a cleanup job for deleting orphan data (#242)

---
 apps/server/src/jobs/cleanup.ts             | 153 ++++++++++++++++++++
 apps/server/src/jobs/index.ts               |   4 +-
 apps/server/src/jobs/node-clean.ts          |   7 +-
 apps/server/src/jobs/uploads-clean.ts       |  63 --------
 apps/server/src/jobs/workspace-clean.ts     |   2 +-
 apps/server/src/lib/config/jobs.ts          |  10 +-
 apps/server/src/lib/storage/azure.ts        |  13 +-
 apps/server/src/lib/storage/core.ts         |   3 +-
 apps/server/src/lib/storage/fs.ts           |   3 +-
 apps/server/src/lib/storage/gcs.ts          |  25 ++--
 apps/server/src/lib/storage/index.ts        |   7 +-
 apps/server/src/lib/storage/s3.ts           |  34 +++--
 apps/server/src/lib/storage/tus/redis-kv.ts |   3 +-
 apps/server/src/services/job-service.ts     |  16 +-
 14 files changed, 225 insertions(+), 118 deletions(-)
 create mode 100644 apps/server/src/jobs/cleanup.ts
 delete mode 100644 apps/server/src/jobs/uploads-clean.ts

diff --git a/apps/server/src/jobs/cleanup.ts b/apps/server/src/jobs/cleanup.ts
new file mode 100644
index 00000000..770c0b71
--- /dev/null
+++ b/apps/server/src/jobs/cleanup.ts
@@ -0,0 +1,153 @@
+import { DeleteResult } from 'kysely';
+import ms from 'ms';
+
+import { database } from '@colanode/server/data/database';
+import { JobHandler } from '@colanode/server/jobs';
+import { createLogger } from '@colanode/server/lib/logger';
+import { storage } from '@colanode/server/lib/storage';
+
+const logger = createLogger('server:job:cleanup');
+
+export type CleanupInput = {
+  type: 'cleanup';
+};
+
+declare module '@colanode/server/jobs' {
+  interface JobMap {
+    cleanup: {
+      input: CleanupInput;
+    };
+  }
+}
+
+export const cleanupHandler: JobHandler<CleanupInput> = async () => {
+  logger.debug(`Cleaning up`);
+  await cleanNodeRelations();
+  await cleanDocuments();
+  await cleanUploads();
+};
+
+const cleanNodeRelations = async () => {
+  try {
+    // clean node relations that are not referenced by any node
+    const nodeUpdates = await database
+      .deleteFrom('node_updates')
+      .where('node_id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(nodeUpdates, 'node_updates');
+
+    const nodeInteractions = await database
+      .deleteFrom('node_interactions')
+      .where('node_id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(nodeInteractions, 'node_interactions');
+
+    const nodeReactions = await database
+      .deleteFrom('node_reactions')
+      .where('node_id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(nodeReactions, 'node_reactions');
+
+    const nodeEmbeddings = await database
+      .deleteFrom('node_embeddings')
+      .where('node_id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(nodeEmbeddings, 'node_embeddings');
+
+    const collaborations = await database
+      .deleteFrom('collaborations')
+      .where('node_id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(collaborations, 'collaborations');
+  } catch (error) {
+    logger.error(error, `Error cleaning node relations`);
+    throw error;
+  }
+};
+
+const cleanDocuments = async () => {
+  try {
+    // clean documents that are not referenced by any node
+    const documents = await database
+      .deleteFrom('documents')
+      .where('id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(documents, 'documents');
+
+    const documentUpdates = await database
+      .deleteFrom('document_updates')
+      .where('document_id', 'not in', (qb) =>
+        qb.selectFrom('nodes').select('id')
+      )
+      .execute();
+
+    logDeletedRows(documentUpdates, 'document_updates');
+
+    const documentEmbeddings = await database
+      .deleteFrom('document_embeddings')
+      .where('document_id', 'not in', (qb) =>
+        qb.selectFrom('nodes').select('id')
+      )
+      .execute();
+
+    logDeletedRows(documentEmbeddings, 'document_embeddings');
+  } catch (error) {
+    logger.error(error, `Error cleaning documents`);
+    throw error;
+  }
+};
+
+const cleanUploads = async () => {
+  try {
+    const sevenDaysAgo = new Date(Date.now() - ms('7 days'));
+    // Select uploads where file node does not exist OR (not uploaded and created at is older than 7 days)
+    const uploads = await database
+      .selectFrom('uploads')
+      .selectAll()
+      .where((eb) =>
+        eb.or([
+          eb('file_id', 'not in', (qb) => qb.selectFrom('nodes').select('id')),
+          eb.and([
+            eb('uploaded_at', 'is', null),
+            eb('created_at', '<', sevenDaysAgo),
+          ]),
+        ])
+      )
+      .execute();
+
+    if (uploads.length === 0) {
+      return;
+    }
+
+    for (const upload of uploads) {
+      await storage.delete(upload.path);
+
+      await database
+        .deleteFrom('uploads')
+        .where('file_id', '=', upload.file_id)
+        .where('upload_id', '=', upload.upload_id)
+        .execute();
+    }
+
+    logger.debug(`Deleted ${uploads.length.toLocaleString()} uploads`);
+  } catch (error) {
+    logger.error(error, `Error cleaning uploads`);
+  }
+};
+
+const logDeletedRows = (result: DeleteResult[], label: string) => {
+  let count = BigInt(0);
+  for (const row of result) {
+    count += row.numDeletedRows;
+  }
+
+  if (count > 0) {
+    logger.debug(`Deleted ${count.toLocaleString()} ${label}`);
+  }
+};
diff --git a/apps/server/src/jobs/index.ts b/apps/server/src/jobs/index.ts
index c16372f9..f5b25280 100644
--- a/apps/server/src/jobs/index.ts
+++ b/apps/server/src/jobs/index.ts
@@ -1,4 +1,5 @@
 import { assistantRespondHandler } from '@colanode/server/jobs/assistant-response';
+import { cleanupHandler } from '@colanode/server/jobs/cleanup';
 import { documentEmbedHandler } from '@colanode/server/jobs/document-embed';
 import { documentEmbedScanHandler } from '@colanode/server/jobs/document-embed-scan';
 import { documentUpdatesMergeHandler } from '@colanode/server/jobs/document-updates-merge';
@@ -8,7 +9,6 @@ import { nodeCleanHandler } from '@colanode/server/jobs/node-clean';
 import { nodeEmbedHandler } from '@colanode/server/jobs/node-embed';
 import { nodeEmbedScanHandler } from '@colanode/server/jobs/node-embed-scan';
 import { nodeUpdatesMergeHandler } from '@colanode/server/jobs/node-updates-merge';
-import { uploadsCleanHandler } from '@colanode/server/jobs/uploads-clean';
 import { workspaceCleanHandler } from '@colanode/server/jobs/workspace-clean';
 
 // eslint-disable-next-line @typescript-eslint/no-empty-object-type
@@ -34,5 +34,5 @@ export const jobHandlerMap: JobHandlerMap = {
   'document.embed.scan': documentEmbedScanHandler,
   'node.updates.merge': nodeUpdatesMergeHandler,
   'document.updates.merge': documentUpdatesMergeHandler,
-  'uploads.clean': uploadsCleanHandler,
+  cleanup: cleanupHandler,
 };
diff --git a/apps/server/src/jobs/node-clean.ts b/apps/server/src/jobs/node-clean.ts
index dcff3a07..9ef7c7da 100644
--- a/apps/server/src/jobs/node-clean.ts
+++ b/apps/server/src/jobs/node-clean.ts
@@ -137,6 +137,8 @@ const cleanNodeRelations = async (nodeIds: string[]) => {
     .where('node_id', 'in', nodeIds)
     .execute();
 
+  await database.deleteFrom('documents').where('id', 'in', nodeIds).execute();
+
   await database
    .deleteFrom('document_embeddings')
     .where('document_id', 'in', nodeIds)
@@ -146,11 +148,6 @@
     .deleteFrom('document_updates')
     .where('document_id', 'in', nodeIds)
     .execute();
-
-  await database
-    .deleteFrom('document_embeddings')
-    .where('document_id', 'in', nodeIds)
-    .execute();
 };
 
 const cleanNodeFiles = async (nodeIds: string[]) => {
diff --git a/apps/server/src/jobs/uploads-clean.ts b/apps/server/src/jobs/uploads-clean.ts
deleted file mode 100644
index 92689d61..00000000
--- a/apps/server/src/jobs/uploads-clean.ts
+++ /dev/null
@@ -1,63 +0,0 @@
-import ms from 'ms';
-
-import { database } from '@colanode/server/data/database';
-import { redis } from '@colanode/server/data/redis';
-import { JobHandler } from '@colanode/server/jobs';
-import { config } from '@colanode/server/lib/config';
-import { createLogger } from '@colanode/server/lib/logger';
-import { storage } from '@colanode/server/lib/storage';
-import { RedisKvStore } from '@colanode/server/lib/storage/tus/redis-kv';
-
-const logger = createLogger('server:job:uploads-clean');
-
-export type UploadsCleanInput = {
-  type: 'uploads.clean';
-};
-
-declare module '@colanode/server/jobs' {
-  interface JobMap {
-    'uploads.clean': {
-      input: UploadsCleanInput;
-    };
-  }
-}
-
-export const uploadsCleanHandler: JobHandler<UploadsCleanInput> = async () => {
-  logger.debug(`Cleaning uploads`);
-
-  try {
-    // Delete uploads that are older than 7 days
-    const sevenDaysAgo = new Date(Date.now() - ms('7 days'));
-    const expiredUploads = await database
-      .selectFrom('uploads')
-      .selectAll()
-      .where('created_at', '<', sevenDaysAgo)
-      .where('uploaded_at', 'is', null)
-      .execute();
-
-    if (expiredUploads.length === 0) {
-      logger.debug(`No expired uploads found`);
-      return;
-    }
-
-    const redisKv = new RedisKvStore(redis, config.redis.tus.kvPrefix);
-    for (const upload of expiredUploads) {
-      await storage.delete(upload.path);
-      await redisKv.delete(upload.path);
-
-      const infoPath = `${upload.path}.info`;
-      await storage.delete(infoPath);
-
-      await database
-        .deleteFrom('uploads')
-        .where('file_id', '=', upload.file_id)
-        .where('upload_id', '=', upload.upload_id)
-        .execute();
-    }
-
-    logger.debug(`Deleted ${expiredUploads.length} expired uploads`);
-  } catch (error) {
-    logger.error(error, `Error cleaning workspace data`);
-    throw error;
-  }
-};
diff --git a/apps/server/src/jobs/workspace-clean.ts b/apps/server/src/jobs/workspace-clean.ts
index de1df0f7..380a102a 100644
--- a/apps/server/src/jobs/workspace-clean.ts
+++ b/apps/server/src/jobs/workspace-clean.ts
@@ -1,7 +1,7 @@
 import { database } from '@colanode/server/data/database';
 import { JobHandler } from '@colanode/server/jobs';
-import { storage } from '@colanode/server/lib/storage';
 import { createLogger } from '@colanode/server/lib/logger';
+import { storage } from '@colanode/server/lib/storage';
 
 const BATCH_SIZE = 500;
 const logger = createLogger('server:job:clean-workspace-data');
diff --git a/apps/server/src/lib/config/jobs.ts b/apps/server/src/lib/config/jobs.ts
index 24959a24..088e8cac 100644
--- a/apps/server/src/lib/config/jobs.ts
+++ b/apps/server/src/lib/config/jobs.ts
@@ -35,7 +35,7 @@ export const documentUpdatesMergeJobConfigSchema = z.discriminatedUnion(
   ]
 );
 
-export const uploadsCleanJobConfigSchema = z.discriminatedUnion('enabled', [
+export const cleanupJobConfigSchema = z.discriminatedUnion('enabled', [
   z.object({
     enabled: z.literal(true),
     cron: z.string().default(DEFAULT_CRON_PATTERN),
@@ -48,7 +48,7 @@ export const uploadsCleanJobConfigSchema = z.discriminatedUnion('enabled', [
 export const jobsConfigSchema = z.object({
   nodeUpdatesMerge: nodeUpdatesMergeJobConfigSchema,
   documentUpdatesMerge: documentUpdatesMergeJobConfigSchema,
-  uploadsClean: uploadsCleanJobConfigSchema,
+  cleanup: cleanupJobConfigSchema,
 });
 
 export type JobsConfig = z.infer<typeof jobsConfigSchema>;
@@ -69,9 +69,9 @@
       mergeWindow: process.env.JOBS_DOCUMENT_UPDATES_MERGE_MERGE_WINDOW,
       cutoffWindow: process.env.JOBS_DOCUMENT_UPDATES_MERGE_CUTOFF_WINDOW,
     },
-    uploadsClean: {
-      enabled: process.env.JOBS_UPLOADS_CLEAN_ENABLED === 'true',
-      cron: process.env.JOBS_UPLOADS_CLEAN_CRON,
+    cleanup: {
+      enabled: process.env.JOBS_CLEANUP_ENABLED === 'true',
+      cron: process.env.JOBS_CLEANUP_CRON,
     },
   };
 };
diff --git a/apps/server/src/lib/storage/azure.ts b/apps/server/src/lib/storage/azure.ts
index f49780d5..b2479328 100644
--- a/apps/server/src/lib/storage/azure.ts
+++ b/apps/server/src/lib/storage/azure.ts
@@ -1,12 +1,13 @@
 import { Readable } from 'stream';
 
-import { DataStore } from '@tus/server';
-import { AzureStore } from '@tus/azure-store';
 import {
   BlobServiceClient,
   StorageSharedKeyCredential,
   BlockBlobClient,
 } from '@azure/storage-blob';
+import { AzureStore } from '@tus/azure-store';
+import { DataStore } from '@tus/server';
+
 import type { AzureStorageConfig } from '@colanode/server/lib/config/storage';
 
 import type { Storage } from './core';
@@ -15,7 +16,7 @@ export class AzureBlobStorage implements Storage {
   private readonly containerName: string;
   private readonly blobServiceClient: BlobServiceClient;
   private readonly config: AzureStorageConfig;
-  public readonly tusStore: DataStore;
+  private readonly azureStore: AzureStore;
 
   constructor(config: AzureStorageConfig) {
     this.config = { ...config };
@@ -30,13 +31,17 @@
     );
 
     this.containerName = this.config.containerName;
-    this.tusStore = new AzureStore({
+    this.azureStore = new AzureStore({
       account: this.config.account,
       accountKey: this.config.accountKey,
       containerName: this.containerName,
     });
   }
 
+  public get tusStore(): DataStore {
+    return this.azureStore;
+  }
+
   private getBlockBlobClient(path: string): BlockBlobClient {
     const containerClient = this.blobServiceClient.getContainerClient(
       this.containerName
diff --git a/apps/server/src/lib/storage/core.ts b/apps/server/src/lib/storage/core.ts
index 72cf81c5..2b99db89 100644
--- a/apps/server/src/lib/storage/core.ts
+++ b/apps/server/src/lib/storage/core.ts
@@ -1,7 +1,9 @@
 import { Readable } from 'stream';
+
 import { DataStore } from '@tus/server';
 
 export interface Storage {
+  readonly tusStore: DataStore;
   download(path: string): Promise<{ stream: Readable; contentType?: string }>;
   delete(path: string): Promise<void>;
   upload(
@@ -10,5 +12,4 @@
     path: string,
     data: Buffer | Readable,
     contentType: string,
     contentLength?: bigint
   ): Promise<void>;
-  readonly tusStore: DataStore;
 }
diff --git a/apps/server/src/lib/storage/fs.ts b/apps/server/src/lib/storage/fs.ts
index 54d66287..e3de2a45 100644
--- a/apps/server/src/lib/storage/fs.ts
+++ b/apps/server/src/lib/storage/fs.ts
@@ -1,8 +1,9 @@
 import { createReadStream, createWriteStream, promises as fs } from 'fs';
 import { Readable } from 'stream';
 
-import { DataStore } from '@tus/server';
 import { FileStore } from '@tus/file-store';
+import { DataStore } from '@tus/server';
+
 import type { FileStorageConfig } from '@colanode/server/lib/config/storage';
 
 import type { Storage } from './core';
diff --git a/apps/server/src/lib/storage/gcs.ts b/apps/server/src/lib/storage/gcs.ts
index 05522df0..f900ae80 100644
--- a/apps/server/src/lib/storage/gcs.ts
+++ b/apps/server/src/lib/storage/gcs.ts
@@ -1,31 +1,36 @@
 import { Readable } from 'stream';
 
-import { Storage, Bucket, File } from '@google-cloud/storage';
-import { DataStore } from '@tus/server';
+import { Storage as GoogleStorage, Bucket, File } from '@google-cloud/storage';
 import { GCSStore } from '@tus/gcs-store';
+import { DataStore } from '@tus/server';
+
 import type { GCSStorageConfig } from '@colanode/server/lib/config/storage';
 
-import type { Storage as StorageInterface } from './core';
+import type { Storage } from './core';
 
-export class GCSStorage implements StorageInterface {
+export class GCSStorage implements Storage {
   private readonly bucket: Bucket;
-  public readonly tusStore: DataStore;
+  private readonly gcsStore: GCSStore;
 
   constructor(config: GCSStorageConfig) {
-    const storage = new Storage({
+    const storage = new GoogleStorage({
       projectId: config.projectId,
       keyFilename: config.credentials,
     });
 
    this.bucket = storage.bucket(config.bucket);
-    this.tusStore = new GCSStore({ bucket: this.bucket });
+    this.gcsStore = new GCSStore({ bucket: this.bucket });
+  }
+
+  public get tusStore(): DataStore {
+    return this.gcsStore;
   }
 
   private getFile(path: string): File {
     return this.bucket.file(path);
   }
 
-  async download(
+  public async download(
     path: string
   ): Promise<{ stream: Readable; contentType?: string }> {
     const file = this.getFile(path);
@@ -38,12 +43,12 @@
     };
   }
 
-  async delete(path: string): Promise<void> {
+  public async delete(path: string): Promise<void> {
     const file = this.getFile(path);
     await file.delete();
   }
 
-  async upload(
+  public async upload(
     path: string,
     data: Buffer | Readable,
     contentType: string,
diff --git a/apps/server/src/lib/storage/index.ts b/apps/server/src/lib/storage/index.ts
index 005a0f22..6d921c4b 100644
--- a/apps/server/src/lib/storage/index.ts
+++ b/apps/server/src/lib/storage/index.ts
@@ -1,9 +1,8 @@
 import { config } from '@colanode/server/lib/config';
+import type { StorageConfig } from '@colanode/server/lib/config/storage';
 
-import type { StorageConfig } from '../config/storage';
-
-import type { Storage } from './core';
 import { AzureBlobStorage } from './azure';
+import type { Storage } from './core';
 import { FileSystemStorage } from './fs';
 import { GCSStorage } from './gcs';
 import { S3Storage } from './s3';
@@ -20,7 +19,7 @@ const buildStorage = (storageConfig: StorageConfig): Storage => {
       return new AzureBlobStorage(storageConfig);
     default:
       throw new Error(
-        `Unsupported storage type: ${(storageConfig as any).type}`
+        `Unsupported storage type: ${JSON.stringify(storageConfig)}`
       );
   }
 };
diff --git a/apps/server/src/lib/storage/s3.ts b/apps/server/src/lib/storage/s3.ts
index 50233afe..c69ae6c2 100644
--- a/apps/server/src/lib/storage/s3.ts
+++ b/apps/server/src/lib/storage/s3.ts
@@ -6,22 +6,23 @@ import {
   PutObjectCommand,
   S3Client,
 } from '@aws-sdk/client-s3';
-import { RedisClientType } from '@redis/client';
-import { FILE_UPLOAD_PART_SIZE } from '@colanode/core';
+import { MetadataValue, S3Store } from '@tus/s3-store';
 import { DataStore } from '@tus/server';
-import { S3Store } from '@tus/s3-store';
-import { config } from '@colanode/server/lib/config';
-import { RedisKvStore } from '@colanode/server/lib/storage/tus/redis-kv';
-import type { S3StorageConfig } from '@colanode/server/lib/config/storage';
+
+import { FILE_UPLOAD_PART_SIZE } from '@colanode/core';
 import { redis } from '@colanode/server/data/redis';
+import { config } from '@colanode/server/lib/config';
+import type { S3StorageConfig } from '@colanode/server/lib/config/storage';
+import { RedisKvStore } from '@colanode/server/lib/storage/tus/redis-kv';
+
 import type { Storage } from './core';
 
 export class S3Storage implements Storage {
   private readonly client: S3Client;
   private readonly bucket: string;
   private readonly s3Config: S3StorageConfig;
-  public readonly tusStore: DataStore;
+  private readonly s3Store: S3Store;
+  private readonly redisKv: RedisKvStore<MetadataValue>;
 
   constructor(s3Config: S3StorageConfig) {
     this.s3Config = { ...s3Config };
@@ -37,9 +38,10 @@
 
     this.bucket = this.s3Config.bucket;
 
-    this.tusStore = new S3Store({
+    this.redisKv = new RedisKvStore<MetadataValue>(redis, config.redis.tus.kvPrefix);
+    this.s3Store = new S3Store({
       partSize: FILE_UPLOAD_PART_SIZE,
-      cache: new RedisKvStore(redis, config.redis.tus.kvPrefix),
+      cache: this.redisKv,
       s3ClientConfig: {
         bucket: this.bucket,
         endpoint: this.s3Config.endpoint,
@@ -53,7 +55,11 @@
     });
   }
 
-  async download(
+  public get tusStore(): DataStore {
+    return this.s3Store;
+  }
+
+  public async download(
     path: string
   ): Promise<{ stream: Readable; contentType?: string }> {
     const command = new GetObjectCommand({ Bucket: this.bucket, Key: path });
@@ -69,12 +75,16 @@
     };
   }
 
-  async delete(path: string): Promise<void> {
+  public async delete(path: string): Promise<void> {
     const command = new DeleteObjectCommand({ Bucket: this.bucket, Key: path });
     await this.client.send(command);
+    await this.redisKv.delete(path);
+
+    const infoPath = `${path}.info`;
+    await this.redisKv.delete(infoPath);
   }
 
-  async upload(
+  public async upload(
     path: string,
     data: Buffer | Readable,
     contentType: string,
diff --git a/apps/server/src/lib/storage/tus/redis-kv.ts b/apps/server/src/lib/storage/tus/redis-kv.ts
index c735cf82..681aaf98 100644
--- a/apps/server/src/lib/storage/tus/redis-kv.ts
+++ b/apps/server/src/lib/storage/tus/redis-kv.ts
@@ -1,5 +1,4 @@
 import type { RedisClientType } from '@redis/client';
-import type { Upload } from '@tus/server';
 import type { KvStore } from '@tus/utils';
 import { sha256 } from 'js-sha256';
 
@@ -11,7 +10,7 @@
  * Original author: Mitja Puzigaća
 */
 
-export class RedisKvStore implements KvStore<Upload> {
+export class RedisKvStore<T> implements KvStore<T> {
   private readonly redis: RedisClientType;
   private readonly prefix: string;
 
diff --git a/apps/server/src/services/job-service.ts b/apps/server/src/services/job-service.ts
index c91d77aa..979ab9c0 100644
--- a/apps/server/src/services/job-service.ts
+++ b/apps/server/src/services/job-service.ts
@@ -47,7 +47,7 @@ class JobService {
       return;
     }
 
-    this.jobWorker = new Worker(this.queueName, this.handleJobJob, {
+    this.jobWorker = new Worker(this.queueName, this.handleJob, {
       prefix: this.prefix,
       connection: {
         url: config.redis.url,
@@ -64,7 +64,7 @@
     await this.jobQueue.add(job.type, job, options);
   }
 
-  private handleJobJob = async (job: Job) => {
+  private handleJob = async (job: Job) => {
     const input = job.data as JobInput;
     const handler = jobHandlerMap[input.type] as JobHandler;
     if (!handler) {
@@ -89,7 +89,7 @@
     await this.initDocumentEmbedScanRecurringJob();
     await this.initNodeUpdatesMergeRecurringJob();
     await this.initDocumentUpdatesMergeRecurringJob();
-    await this.initUploadsCleanRecurringJob();
+    await this.initCleanupRecurringJob();
   }
   private async initNodeEmbedScanRecurringJob(): Promise<void> {
@@ -183,19 +183,19 @@ class JobService {
     }
   }
 
-  private async initUploadsCleanRecurringJob(): Promise<void> {
+  private async initCleanupRecurringJob(): Promise<void> {
     if (!this.jobQueue) {
       return;
     }
 
-    const id = 'uploads.clean';
-    if (config.jobs.uploadsClean.enabled) {
+    const id = 'cleanup';
+    if (config.jobs.cleanup.enabled) {
       this.jobQueue.upsertJobScheduler(
         id,
-        { pattern: config.jobs.uploadsClean.cron },
+        { pattern: config.jobs.cleanup.cron },
         {
           name: id,
-          data: { type: 'uploads.clean' } as JobInput,
+          data: { type: 'cleanup' } as JobInput,
         }
       );
     } else {
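
Rollout note: the recurring job id and its configuration keys are renamed by this
patch, so deployments that enabled the old `uploads.clean` job need the new
variables. A minimal sketch of the expected environment configuration, assuming the
defaults in lib/config/jobs.ts (the cron value below is only an illustrative hourly
pattern, not a project default):

    JOBS_CLEANUP_ENABLED=true
    JOBS_CLEANUP_CRON=0 * * * *

Because the `declare module` block in cleanup.ts registers `cleanup` in JobMap, the
job can also be enqueued ad hoc through the job service. A hedged sketch, assuming
the JobService instance shown above is exported as `jobService` from the same
module:

    import { jobService } from '@colanode/server/services/job-service';

    // one-off orphan-data sweep, outside the cron schedule
    await jobService.addJob({ type: 'cleanup' });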