mirror of https://github.com/colanode/colanode.git (synced 2025-12-16 11:47:47 +01:00)
Implement a cleanup job for deleting orphan data (#242)
apps/server/src/jobs/cleanup.ts (new file, 153 lines)
@@ -0,0 +1,153 @@
+import { DeleteResult } from 'kysely';
+import ms from 'ms';
+
+import { database } from '@colanode/server/data/database';
+import { JobHandler } from '@colanode/server/jobs';
+import { createLogger } from '@colanode/server/lib/logger';
+import { storage } from '@colanode/server/lib/storage';
+
+const logger = createLogger('server:job:cleanup');
+
+export type CleanupInput = {
+  type: 'cleanup';
+};
+
+declare module '@colanode/server/jobs' {
+  interface JobMap {
+    cleanup: {
+      input: CleanupInput;
+    };
+  }
+}
+
+export const cleanupHandler: JobHandler<CleanupInput> = async () => {
+  logger.debug(`Cleaning up`);
+  await cleanNodeRelations();
+  await cleanDocuments();
+  await cleanUploads();
+};
+
+const cleanNodeRelations = async () => {
+  try {
+    // clean node relations that are not referenced by any node
+    const nodeUpdates = await database
+      .deleteFrom('node_updates')
+      .where('node_id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(nodeUpdates, 'node_updates');
+
+    const nodeInteractions = await database
+      .deleteFrom('node_interactions')
+      .where('node_id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(nodeInteractions, 'node_interactions');
+
+    const nodeReactions = await database
+      .deleteFrom('node_reactions')
+      .where('node_id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(nodeReactions, 'node_reactions');
+
+    const nodeEmbeddings = await database
+      .deleteFrom('node_embeddings')
+      .where('node_id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(nodeEmbeddings, 'node_embeddings');
+
+    const collaborations = await database
+      .deleteFrom('collaborations')
+      .where('node_id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(collaborations, 'collaborations');
+  } catch (error) {
+    logger.error(error, `Error cleaning node relations`);
+    throw error;
+  }
+};
+
+const cleanDocuments = async () => {
+  try {
+    // clean documents that are not referenced by any node
+    const documents = await database
+      .deleteFrom('documents')
+      .where('id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
+      .execute();
+
+    logDeletedRows(documents, 'documents');
+
+    const documentUpdates = await database
+      .deleteFrom('document_updates')
+      .where('document_id', 'not in', (qb) =>
+        qb.selectFrom('nodes').select('id')
+      )
+      .execute();
+
+    logDeletedRows(documentUpdates, 'document_updates');
+
+    const documentEmbeddings = await database
+      .deleteFrom('document_embeddings')
+      .where('document_id', 'not in', (qb) =>
+        qb.selectFrom('nodes').select('id')
+      )
+      .execute();
+
+    logDeletedRows(documentEmbeddings, 'document_embeddings');
+  } catch (error) {
+    logger.error(error, `Error cleaning documents`);
+    throw error;
+  }
+};
+
+const cleanUploads = async () => {
+  try {
+    const sevenDaysAgo = new Date(Date.now() - ms('7 days'));
+    // Select uploads where file node does not exist OR (not uploaded and created at is older than 7 days)
+    const uploads = await database
+      .selectFrom('uploads')
+      .selectAll()
+      .where((eb) =>
+        eb.or([
+          eb('file_id', 'not in', (qb) => qb.selectFrom('nodes').select('id')),
+          eb.and([
+            eb('uploaded_at', 'is', null),
+            eb('created_at', '<', sevenDaysAgo),
+          ]),
+        ])
+      )
+      .execute();
+
+    if (uploads.length === 0) {
+      return;
+    }
+
+    for (const upload of uploads) {
+      await storage.delete(upload.path);
+
+      await database
+        .deleteFrom('uploads')
+        .where('file_id', '=', upload.file_id)
+        .where('upload_id', '=', upload.upload_id)
+        .execute();
+    }
+
+    logger.debug(`Deleted ${uploads.length.toLocaleString()} uploads`);
+  } catch (error) {
+    logger.error(error, `Error cleaning uploads`);
+  }
+};
+
+const logDeletedRows = (result: DeleteResult[], label: string) => {
+  let count = BigInt(0);
+  for (const row of result) {
+    count += row.numDeletedRows;
+  }
+
+  if (count > 0) {
+    logger.debug(`Deleted ${count.toLocaleString()} ${label}`);
+  }
+};
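
For reference, the `not in (select …)` anti-join used throughout the new job is plain Kysely. A minimal, self-contained sketch of the same orphan-sweep pattern follows; the two-table schema and the connection setup are illustrative stand-ins, not Colanode code:

import { DeleteResult, Generated, Kysely, PostgresDialect } from 'kysely';
import pg from 'pg';

// Hypothetical schema: node_updates references nodes without a foreign-key
// constraint, so orphaned rows have to be swept manually.
interface DB {
  nodes: { id: string };
  node_updates: { id: Generated<number>; node_id: string };
}

const db = new Kysely<DB>({
  dialect: new PostgresDialect({
    pool: new pg.Pool({ connectionString: process.env.DATABASE_URL }),
  }),
});

// DELETE FROM node_updates WHERE node_id NOT IN (SELECT id FROM nodes)
const sweepOrphans = async (): Promise<bigint> => {
  const results: DeleteResult[] = await db
    .deleteFrom('node_updates')
    .where('node_id', 'not in', (qb) => qb.selectFrom('nodes').select('id'))
    .execute();

  // numDeletedRows is a bigint, so totals are summed the same way
  // logDeletedRows does above.
  return results.reduce((sum, r) => sum + r.numDeletedRows, BigInt(0));
};

One SQL caveat worth knowing with this shape: `NOT IN` over a subquery that can yield NULL matches nothing at all, so the pattern relies on `nodes.id` being non-nullable; a `NOT EXISTS` form sidesteps that edge case if the column ever becomes nullable.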
@@ -1,4 +1,5 @@
 import { assistantRespondHandler } from '@colanode/server/jobs/assistant-response';
+import { cleanupHandler } from '@colanode/server/jobs/cleanup';
 import { documentEmbedHandler } from '@colanode/server/jobs/document-embed';
 import { documentEmbedScanHandler } from '@colanode/server/jobs/document-embed-scan';
 import { documentUpdatesMergeHandler } from '@colanode/server/jobs/document-updates-merge';
@@ -8,7 +9,6 @@ import { nodeCleanHandler } from '@colanode/server/jobs/node-clean';
 import { nodeEmbedHandler } from '@colanode/server/jobs/node-embed';
 import { nodeEmbedScanHandler } from '@colanode/server/jobs/node-embed-scan';
 import { nodeUpdatesMergeHandler } from '@colanode/server/jobs/node-updates-merge';
-import { uploadsCleanHandler } from '@colanode/server/jobs/uploads-clean';
 import { workspaceCleanHandler } from '@colanode/server/jobs/workspace-clean';
 
 // eslint-disable-next-line @typescript-eslint/no-empty-object-type
@@ -34,5 +34,5 @@ export const jobHandlerMap: JobHandlerMap = {
   'document.embed.scan': documentEmbedScanHandler,
   'node.updates.merge': nodeUpdatesMergeHandler,
   'document.updates.merge': documentUpdatesMergeHandler,
-  'uploads.clean': uploadsCleanHandler,
+  cleanup: cleanupHandler,
 };
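
The `declare module '@colanode/server/jobs'` blocks in each job file rely on TypeScript declaration merging to build a typed job registry, which is what keeps `jobHandlerMap` honest. A stripped-down sketch of that pattern, with illustrative names rather than the repo's actual types:

// Each job module merges its entry into JobMap, so jobHandlerMap below is
// checked for exhaustiveness and per-key input types at compile time.
interface JobMap {
  cleanup: { input: { type: 'cleanup' } };
  'node.clean': { input: { type: 'node.clean'; nodeId: string } };
}

type JobInput = JobMap[keyof JobMap]['input'];
type JobHandler<T extends JobInput> = (input: T) => Promise<void>;
type JobHandlerMap = { [K in keyof JobMap]: JobHandler<JobMap[K]['input']> };

const jobHandlerMap: JobHandlerMap = {
  cleanup: async () => {
    // sweep orphans
  },
  'node.clean': async (input) => {
    console.log(input.nodeId); // input is narrowed to the node.clean shape
  },
};

// Dispatch on the discriminant, as the worker does with job.data:
const dispatch = async (input: JobInput) => {
  const handler = jobHandlerMap[input.type] as JobHandler<typeof input>;
  await handler(input);
};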
@@ -137,6 +137,8 @@ const cleanNodeRelations = async (nodeIds: string[]) => {
     .where('node_id', 'in', nodeIds)
     .execute();
 
+  await database.deleteFrom('documents').where('id', 'in', nodeIds).execute();
+
   await database
     .deleteFrom('document_embeddings')
     .where('document_id', 'in', nodeIds)
@@ -146,11 +148,6 @@ const cleanNodeRelations = async (nodeIds: string[]) => {
     .deleteFrom('document_updates')
     .where('document_id', 'in', nodeIds)
     .execute();
-
-  await database
-    .deleteFrom('document_embeddings')
-    .where('document_id', 'in', nodeIds)
-    .execute();
 };
 
 const cleanNodeFiles = async (nodeIds: string[]) => {
@@ -1,63 +0,0 @@
-import ms from 'ms';
-
-import { database } from '@colanode/server/data/database';
-import { redis } from '@colanode/server/data/redis';
-import { JobHandler } from '@colanode/server/jobs';
-import { config } from '@colanode/server/lib/config';
-import { createLogger } from '@colanode/server/lib/logger';
-import { storage } from '@colanode/server/lib/storage';
-import { RedisKvStore } from '@colanode/server/lib/storage/tus/redis-kv';
-
-const logger = createLogger('server:job:uploads-clean');
-
-export type UploadsCleanInput = {
-  type: 'uploads.clean';
-};
-
-declare module '@colanode/server/jobs' {
-  interface JobMap {
-    'uploads.clean': {
-      input: UploadsCleanInput;
-    };
-  }
-}
-
-export const uploadsCleanHandler: JobHandler<UploadsCleanInput> = async () => {
-  logger.debug(`Cleaning uploads`);
-
-  try {
-    // Delete uploads that are older than 7 days
-    const sevenDaysAgo = new Date(Date.now() - ms('7 days'));
-    const expiredUploads = await database
-      .selectFrom('uploads')
-      .selectAll()
-      .where('created_at', '<', sevenDaysAgo)
-      .where('uploaded_at', 'is', null)
-      .execute();
-
-    if (expiredUploads.length === 0) {
-      logger.debug(`No expired uploads found`);
-      return;
-    }
-
-    const redisKv = new RedisKvStore(redis, config.redis.tus.kvPrefix);
-    for (const upload of expiredUploads) {
-      await storage.delete(upload.path);
-      await redisKv.delete(upload.path);
-
-      const infoPath = `${upload.path}.info`;
-      await storage.delete(infoPath);
-
-      await database
-        .deleteFrom('uploads')
-        .where('file_id', '=', upload.file_id)
-        .where('upload_id', '=', upload.upload_id)
-        .execute();
-    }
-
-    logger.debug(`Deleted ${expiredUploads.length} expired uploads`);
-  } catch (error) {
-    logger.error(error, `Error cleaning workspace data`);
-    throw error;
-  }
-};
@@ -1,7 +1,7 @@
 import { database } from '@colanode/server/data/database';
 import { JobHandler } from '@colanode/server/jobs';
-import { storage } from '@colanode/server/lib/storage';
 import { createLogger } from '@colanode/server/lib/logger';
+import { storage } from '@colanode/server/lib/storage';
 
 const BATCH_SIZE = 500;
 const logger = createLogger('server:job:clean-workspace-data');
@@ -35,7 +35,7 @@ export const documentUpdatesMergeJobConfigSchema = z.discriminatedUnion(
   ]
 );
 
-export const uploadsCleanJobConfigSchema = z.discriminatedUnion('enabled', [
+export const cleanupJobConfigSchema = z.discriminatedUnion('enabled', [
   z.object({
     enabled: z.literal(true),
     cron: z.string().default(DEFAULT_CRON_PATTERN),
@@ -48,7 +48,7 @@ export const uploadsCleanJobConfigSchema = z.discriminatedUnion('enabled', [
 export const jobsConfigSchema = z.object({
   nodeUpdatesMerge: nodeUpdatesMergeJobConfigSchema,
   documentUpdatesMerge: documentUpdatesMergeJobConfigSchema,
-  uploadsClean: uploadsCleanJobConfigSchema,
+  cleanup: cleanupJobConfigSchema,
 });
 
 export type JobsConfig = z.infer<typeof jobsConfigSchema>;
@@ -69,9 +69,9 @@ export const readJobsConfigVariables = () => {
       mergeWindow: process.env.JOBS_DOCUMENT_UPDATES_MERGE_MERGE_WINDOW,
       cutoffWindow: process.env.JOBS_DOCUMENT_UPDATES_MERGE_CUTOFF_WINDOW,
     },
-    uploadsClean: {
-      enabled: process.env.JOBS_UPLOADS_CLEAN_ENABLED === 'true',
-      cron: process.env.JOBS_UPLOADS_CLEAN_CRON,
-    },
+    cleanup: {
+      enabled: process.env.JOBS_CLEANUP_ENABLED === 'true',
+      cron: process.env.JOBS_CLEANUP_CRON,
+    },
   };
 };
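
The renamed schema keeps zod's discriminated-union shape: `cleanup` is either enabled with a cron pattern or disabled. A standalone sketch of how the block above likely parses; the disabled branch and the fallback pattern are assumptions, and only the `JOBS_CLEANUP_*` names come from this diff:

import { z } from 'zod';

// Stand-in for DEFAULT_CRON_PATTERN; the real default lives in the config
// module and is not shown in this diff.
const DEFAULT_CRON_PATTERN = '0 * * * *';

const cleanupJobConfigSchema = z.discriminatedUnion('enabled', [
  z.object({
    enabled: z.literal(true),
    cron: z.string().default(DEFAULT_CRON_PATTERN),
  }),
  z.object({ enabled: z.literal(false) }),
]);

// Mirrors readJobsConfigVariables: JOBS_CLEANUP_CRON may be unset, in which
// case the default applies when enabled is true.
const cleanup = cleanupJobConfigSchema.parse({
  enabled: process.env.JOBS_CLEANUP_ENABLED === 'true',
  cron: process.env.JOBS_CLEANUP_CRON,
});

console.log(cleanup);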
@@ -1,12 +1,13 @@
 import { Readable } from 'stream';
 
-import { DataStore } from '@tus/server';
-import { AzureStore } from '@tus/azure-store';
 import {
   BlobServiceClient,
   StorageSharedKeyCredential,
   BlockBlobClient,
 } from '@azure/storage-blob';
+import { AzureStore } from '@tus/azure-store';
+import { DataStore } from '@tus/server';
 
 import type { AzureStorageConfig } from '@colanode/server/lib/config/storage';
 
 import type { Storage } from './core';
@@ -15,7 +16,7 @@ export class AzureBlobStorage implements Storage {
   private readonly containerName: string;
   private readonly blobServiceClient: BlobServiceClient;
   private readonly config: AzureStorageConfig;
-  public readonly tusStore: DataStore;
+  private readonly azureStore: AzureStore;
 
   constructor(config: AzureStorageConfig) {
     this.config = { ...config };
@@ -30,13 +31,17 @@ export class AzureBlobStorage implements Storage {
     );
     this.containerName = this.config.containerName;
 
-    this.tusStore = new AzureStore({
+    this.azureStore = new AzureStore({
       account: this.config.account,
       accountKey: this.config.accountKey,
       containerName: this.containerName,
     });
   }
 
+  public get tusStore(): DataStore {
+    return this.azureStore;
+  }
+
   private getBlockBlobClient(path: string): BlockBlobClient {
     const containerClient = this.blobServiceClient.getContainerClient(
       this.containerName
@@ -1,7 +1,9 @@
 import { Readable } from 'stream';
 
 import { DataStore } from '@tus/server';
 
 export interface Storage {
+  readonly tusStore: DataStore;
+
   download(path: string): Promise<{ stream: Readable; contentType?: string }>;
   delete(path: string): Promise<void>;
   upload(
@@ -10,5 +12,4 @@ export interface Storage {
     contentType: string,
     contentLength?: bigint
   ): Promise<void>;
-  readonly tusStore: DataStore;
 }
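
Moving `readonly tusStore` to the top of the interface pairs with the backend refactors below: a TypeScript `get` accessor satisfies a readonly interface property, so each backend can keep its concrete store private and expose only the base type. A reduced sketch using structural stand-ins, not the real `@tus/server` types:

// Stand-in for the DataStore surface; the real type comes from @tus/server.
interface DataStoreLike {
  create(upload: unknown): Promise<unknown>;
}

interface StorageLike {
  readonly tusStore: DataStoreLike;
}

class ConcreteStore implements DataStoreLike {
  async create(upload: unknown): Promise<unknown> {
    return upload;
  }
  async backendOnly(): Promise<void> {} // extra API callers should not see
}

class Backend implements StorageLike {
  private readonly store = new ConcreteStore();

  // A getter fulfills `readonly tusStore`, widening ConcreteStore to the
  // interface type and keeping backendOnly() off the typed surface.
  public get tusStore(): DataStoreLike {
    return this.store;
  }
}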
@@ -1,8 +1,9 @@
 import { createReadStream, createWriteStream, promises as fs } from 'fs';
 import { Readable } from 'stream';
 
-import { DataStore } from '@tus/server';
 import { FileStore } from '@tus/file-store';
+import { DataStore } from '@tus/server';
 
 import type { FileStorageConfig } from '@colanode/server/lib/config/storage';
 
 import type { Storage } from './core';
@@ -1,31 +1,36 @@
 import { Readable } from 'stream';
 
-import { Storage, Bucket, File } from '@google-cloud/storage';
-import { DataStore } from '@tus/server';
+import { Storage as GoogleStorage, Bucket, File } from '@google-cloud/storage';
 import { GCSStore } from '@tus/gcs-store';
+import { DataStore } from '@tus/server';
 
 import type { GCSStorageConfig } from '@colanode/server/lib/config/storage';
 
-import type { Storage as StorageInterface } from './core';
+import type { Storage } from './core';
 
-export class GCSStorage implements StorageInterface {
+export class GCSStorage implements Storage {
   private readonly bucket: Bucket;
-  public readonly tusStore: DataStore;
+  private readonly gcsStore: GCSStore;
 
   constructor(config: GCSStorageConfig) {
-    const storage = new Storage({
+    const storage = new GoogleStorage({
       projectId: config.projectId,
       keyFilename: config.credentials,
     });
 
     this.bucket = storage.bucket(config.bucket);
-    this.tusStore = new GCSStore({ bucket: this.bucket });
+    this.gcsStore = new GCSStore({ bucket: this.bucket });
+  }
+
+  public get tusStore(): DataStore {
+    return this.gcsStore;
   }
 
   private getFile(path: string): File {
     return this.bucket.file(path);
   }
 
-  async download(
+  public async download(
     path: string
   ): Promise<{ stream: Readable; contentType?: string }> {
     const file = this.getFile(path);
@@ -38,12 +43,12 @@ export class GCSStorage implements StorageInterface {
     };
   }
 
-  async delete(path: string): Promise<void> {
+  public async delete(path: string): Promise<void> {
     const file = this.getFile(path);
     await file.delete();
   }
 
-  async upload(
+  public async upload(
     path: string,
     data: Buffer | Readable,
     contentType: string,
@@ -1,9 +1,8 @@
 import { config } from '@colanode/server/lib/config';
+import type { StorageConfig } from '@colanode/server/lib/config/storage';
 
-import type { StorageConfig } from '../config/storage';
-
-import type { Storage } from './core';
 import { AzureBlobStorage } from './azure';
+import type { Storage } from './core';
 import { FileSystemStorage } from './fs';
 import { GCSStorage } from './gcs';
 import { S3Storage } from './s3';
@@ -20,7 +19,7 @@ const buildStorage = (storageConfig: StorageConfig): Storage => {
       return new AzureBlobStorage(storageConfig);
     default:
       throw new Error(
-        `Unsupported storage type: ${(storageConfig as any).type}`
+        `Unsupported storage type: ${JSON.stringify(storageConfig)}`
       );
   }
 };
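
`JSON.stringify(storageConfig)` in the default branch avoids the `as any` escape hatch. Since `StorageConfig` is a discriminated union, the same branch can also double as a compile-time exhaustiveness check; a sketch with illustrative union members, not the repo's actual config types:

type StorageConfig =
  | { type: 's3'; bucket: string }
  | { type: 'file'; directory: string };

const describeStorage = (storageConfig: StorageConfig): string => {
  switch (storageConfig.type) {
    case 's3':
      return `s3://${storageConfig.bucket}`;
    case 'file':
      return `file://${storageConfig.directory}`;
    default: {
      // Adding a variant to StorageConfig without a case above makes this
      // assignment a type error, long before the runtime throw fires.
      const exhausted: never = storageConfig;
      throw new Error(`Unsupported storage type: ${JSON.stringify(exhausted)}`);
    }
  }
};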
@@ -6,22 +6,23 @@ import {
   PutObjectCommand,
   S3Client,
 } from '@aws-sdk/client-s3';
-import { RedisClientType } from '@redis/client';
-import { FILE_UPLOAD_PART_SIZE } from '@colanode/core';
+import { MetadataValue, S3Store } from '@tus/s3-store';
 import { DataStore } from '@tus/server';
-import { S3Store } from '@tus/s3-store';
 
-import { config } from '@colanode/server/lib/config';
-import { RedisKvStore } from '@colanode/server/lib/storage/tus/redis-kv';
-import type { S3StorageConfig } from '@colanode/server/lib/config/storage';
+import { FILE_UPLOAD_PART_SIZE } from '@colanode/core';
 import { redis } from '@colanode/server/data/redis';
+import { config } from '@colanode/server/lib/config';
+import type { S3StorageConfig } from '@colanode/server/lib/config/storage';
+import { RedisKvStore } from '@colanode/server/lib/storage/tus/redis-kv';
 
 import type { Storage } from './core';
 
 export class S3Storage implements Storage {
   private readonly client: S3Client;
   private readonly bucket: string;
   private readonly s3Config: S3StorageConfig;
-  public readonly tusStore: DataStore;
+  private readonly s3Store: S3Store;
+  private readonly redisKv: RedisKvStore<MetadataValue>;
 
   constructor(s3Config: S3StorageConfig) {
     this.s3Config = { ...s3Config };
@@ -37,9 +38,10 @@ export class S3Storage implements Storage {
 
     this.bucket = this.s3Config.bucket;
 
-    this.tusStore = new S3Store({
+    this.redisKv = new RedisKvStore(redis, config.redis.tus.kvPrefix);
+    this.s3Store = new S3Store({
       partSize: FILE_UPLOAD_PART_SIZE,
-      cache: new RedisKvStore(redis, config.redis.tus.kvPrefix),
+      cache: this.redisKv,
       s3ClientConfig: {
         bucket: this.bucket,
         endpoint: this.s3Config.endpoint,
@@ -53,7 +55,11 @@ export class S3Storage implements Storage {
     });
   }
 
-  async download(
+  public get tusStore(): DataStore {
+    return this.s3Store;
+  }
+
+  public async download(
     path: string
   ): Promise<{ stream: Readable; contentType?: string }> {
     const command = new GetObjectCommand({ Bucket: this.bucket, Key: path });
@@ -69,12 +75,16 @@ export class S3Storage implements Storage {
     };
   }
 
-  async delete(path: string): Promise<void> {
+  public async delete(path: string): Promise<void> {
     const command = new DeleteObjectCommand({ Bucket: this.bucket, Key: path });
     await this.client.send(command);
+    await this.redisKv.delete(path);
+
+    const infoPath = `${path}.info`;
+    await this.redisKv.delete(infoPath);
   }
 
-  async upload(
+  public async upload(
     path: string,
     data: Buffer | Readable,
     contentType: string,
@@ -1,5 +1,4 @@
 import type { RedisClientType } from '@redis/client';
-import type { Upload } from '@tus/server';
 import type { KvStore } from '@tus/utils';
 import { sha256 } from 'js-sha256';
 
@@ -11,7 +10,7 @@ import { sha256 } from 'js-sha256';
  * Original author: Mitja Puzigaća <mitjap@gmail.com>
  */
 
-export class RedisKvStore<T = Upload> implements KvStore<T> {
+export class RedisKvStore<T> implements KvStore<T> {
   private readonly redis: RedisClientType;
   private readonly prefix: string;
 
@@ -47,7 +47,7 @@ class JobService {
       return;
     }
 
-    this.jobWorker = new Worker(this.queueName, this.handleJobJob, {
+    this.jobWorker = new Worker(this.queueName, this.handleJob, {
       prefix: this.prefix,
       connection: {
         url: config.redis.url,
@@ -64,7 +64,7 @@ class JobService {
     await this.jobQueue.add(job.type, job, options);
   }
 
-  private handleJobJob = async (job: Job) => {
+  private handleJob = async (job: Job) => {
     const input = job.data as JobInput;
     const handler = jobHandlerMap[input.type] as JobHandler<typeof input>;
     if (!handler) {
@@ -89,7 +89,7 @@ class JobService {
     await this.initDocumentEmbedScanRecurringJob();
     await this.initNodeUpdatesMergeRecurringJob();
     await this.initDocumentUpdatesMergeRecurringJob();
-    await this.initUploadsCleanRecurringJob();
+    await this.initCleanupRecurringJob();
   }
 
   private async initNodeEmbedScanRecurringJob(): Promise<void> {
@@ -183,19 +183,19 @@ class JobService {
     }
   }
 
-  private async initUploadsCleanRecurringJob(): Promise<void> {
+  private async initCleanupRecurringJob(): Promise<void> {
     if (!this.jobQueue) {
       return;
     }
 
-    const id = 'uploads.clean';
-    if (config.jobs.uploadsClean.enabled) {
+    const id = 'cleanup';
+    if (config.jobs.cleanup.enabled) {
       this.jobQueue.upsertJobScheduler(
         id,
-        { pattern: config.jobs.uploadsClean.cron },
+        { pattern: config.jobs.cleanup.cron },
         {
           name: id,
-          data: { type: 'uploads.clean' } as JobInput,
+          data: { type: 'cleanup' } as JobInput,
         }
       );
     } else {
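
`upsertJobScheduler` is BullMQ's repeatable-job API: re-upserting the same scheduler id on boot updates the cron pattern in place instead of stacking duplicates, which is why the rename from 'uploads.clean' to 'cleanup' above is safe to roll out. A minimal standalone sketch of the scheduling half; the queue name, Redis URL fallback, and cron value are placeholders:

import { Job, Queue, Worker } from 'bullmq';

const connection = { url: process.env.REDIS_URL ?? 'redis://localhost:6379' };

const main = async () => {
  const queue = new Queue('jobs', { connection });

  // Idempotent: the 'cleanup' scheduler is created or updated, never duplicated.
  await queue.upsertJobScheduler(
    'cleanup',
    { pattern: '0 * * * *' }, // example pattern only
    { name: 'cleanup', data: { type: 'cleanup' } }
  );

  // The worker receives each scheduled run as an ordinary job.
  new Worker(
    'jobs',
    async (job: Job) => {
      if (job.data.type === 'cleanup') {
        // invoke the cleanup handler here
      }
    },
    { connection }
  );
};

void main();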