diff --git a/apps/server/src/jobs/node-clean.ts b/apps/server/src/jobs/node-clean.ts index 44f67fa0..a7837024 100644 --- a/apps/server/src/jobs/node-clean.ts +++ b/apps/server/src/jobs/node-clean.ts @@ -1,6 +1,8 @@ +import { getIdType, IdType } from '@colanode/core'; import { database } from '@colanode/server/data/database'; import { CreateNodeTombstone } from '@colanode/server/data/schema'; import { JobHandler } from '@colanode/server/jobs'; +import { updateDocument } from '@colanode/server/lib/documents'; import { eventBus } from '@colanode/server/lib/event-bus'; import { deleteFile } from '@colanode/server/lib/files'; import { createLogger } from '@colanode/server/lib/logger'; @@ -11,6 +13,7 @@ const logger = createLogger('server:job:clean-node-data'); export type NodeCleanInput = { type: 'node.clean'; nodeId: string; + parentId: string | null; workspaceId: string; userId: string; }; @@ -29,6 +32,10 @@ export const nodeCleanHandler: JobHandler = async (input) => { await cleanNodeRelations([input.nodeId]); await cleanNodeFiles([input.nodeId]); + if (input.parentId) { + await cleanNodeFromDocument(input); + } + let hasMore = true; while (hasMore) { const children = await database @@ -164,3 +171,28 @@ const cleanNodeFiles = async (nodeIds: string[]) => { .execute(); } }; + +const cleanNodeFromDocument = async (input: NodeCleanInput) => { + if (!input.parentId) { + return; + } + + const parentIdType = getIdType(input.parentId); + if (parentIdType !== IdType.Page && parentIdType !== IdType.Record) { + return; + } + + await updateDocument({ + documentId: input.parentId, + userId: input.userId, + workspaceId: input.workspaceId, + updater: (content) => { + if (!content.blocks[input.nodeId]) { + return content; + } + + delete content.blocks[input.nodeId]; + return content; + }, + }); +}; diff --git a/apps/server/src/lib/documents.ts b/apps/server/src/lib/documents.ts index 89829e4a..285c81b1 100644 --- a/apps/server/src/lib/documents.ts +++ b/apps/server/src/lib/documents.ts @@ -1,3 +1,5 @@ +import { cloneDeep } from 'lodash-es'; + import { CanUpdateDocumentContext, DocumentContent, @@ -17,6 +19,7 @@ import { fetchNode, fetchNodeTree, mapNode } from '@colanode/server/lib/nodes'; import { CreateDocumentInput, CreateDocumentOutput, + UpdateDocumentInput, } from '@colanode/server/types/documents'; import { ConcurrentUpdateResult } from '@colanode/server/types/nodes'; @@ -286,3 +289,148 @@ const tryUpdateDocumentFromMutation = async ( return { type: 'retry' }; } }; + +export const updateDocument = async ( + input: UpdateDocumentInput +): Promise => { + for (let count = 0; count < UPDATE_RETRIES_LIMIT; count++) { + const result = await tryUpdateDocument(input); + + if (result.type === 'success') { + return true; + } + + if (result.type === 'error') { + return false; + } + } + + return false; +}; + +const tryUpdateDocument = async ( + input: UpdateDocumentInput +): Promise> => { + const node = await fetchNode(input.documentId); + if (!node) { + return { type: 'error', error: 'Node not found' }; + } + + const model = getNodeModel(node.type); + if (!model.documentSchema) { + return { type: 'error', error: 'Node does not support documents' }; + } + + const documentUpdates = await database + .selectFrom('document_updates') + .where('document_id', '=', input.documentId) + .selectAll() + .execute(); + + const ydoc = new YDoc(); + for (const update of documentUpdates) { + ydoc.applyUpdate(update.data); + } + + const currentContent = ydoc.getObject(); + const updatedContent = input.updater(cloneDeep(currentContent)); + if (!updatedContent) { + return { type: 'error', error: 'Failed to update document' }; + } + + const update = ydoc.update(model.documentSchema, updatedContent); + if (!update) { + return { type: 'error', error: 'Failed to create document update' }; + } + + const content = ydoc.getObject(); + + if (!model.documentSchema.safeParse(content).success) { + return { type: 'error', error: 'Updated content is invalid' }; + } + + const date = new Date(); + const updateId = generateId(IdType.Update); + + try { + const { updatedDocument, createdDocumentUpdate } = await database + .transaction() + .execute(async (trx) => { + const createdDocumentUpdate = await trx + .insertInto('document_updates') + .returningAll() + .values({ + id: updateId, + document_id: input.documentId, + root_id: node.root_id, + workspace_id: input.workspaceId, + data: update, + created_at: date, + created_by: input.userId, + merged_updates: null, + }) + .executeTakeFirst(); + + if (!createdDocumentUpdate) { + throw new Error('Failed to create document update'); + } + + const updatedDocument = await trx + .insertInto('documents') + .returningAll() + .values({ + id: input.documentId, + workspace_id: input.workspaceId, + content: JSON.stringify(content), + created_at: date, + created_by: input.userId, + revision: createdDocumentUpdate.revision, + }) + .onConflict((cb) => + cb.column('id').doUpdateSet({ + content: JSON.stringify(content), + updated_at: date, + updated_by: input.userId, + revision: createdDocumentUpdate.revision, + }) + ) + .executeTakeFirst(); + + if (!updatedDocument) { + throw new Error('Failed to update document'); + } + + return { + updatedDocument, + createdDocumentUpdate, + }; + }); + + if (!updatedDocument || !createdDocumentUpdate) { + throw new Error('Failed to update document'); + } + + eventBus.publish({ + type: 'document.updated', + documentId: input.documentId, + workspaceId: input.workspaceId, + }); + + eventBus.publish({ + type: 'document.update.created', + documentId: input.documentId, + rootId: node.root_id, + workspaceId: input.workspaceId, + }); + + await scheduleDocumentEmbedding(input.documentId); + + return { + type: 'success', + output: true, + }; + } catch (error) { + logger.error(error, `Failed to update document`); + return { type: 'retry' }; + } +}; diff --git a/apps/server/src/lib/nodes.ts b/apps/server/src/lib/nodes.ts index 8d51228f..a7d5a75a 100644 --- a/apps/server/src/lib/nodes.ts +++ b/apps/server/src/lib/nodes.ts @@ -730,6 +730,7 @@ export const deleteNodeFromMutation = async ( await jobService.addJob({ type: 'node.clean', nodeId: mutation.nodeId, + parentId: node.parent_id, workspaceId: user.workspace_id, userId: user.id, }); diff --git a/apps/server/src/types/documents.ts b/apps/server/src/types/documents.ts index 9bf2adeb..e3f9394a 100644 --- a/apps/server/src/types/documents.ts +++ b/apps/server/src/types/documents.ts @@ -11,3 +11,10 @@ export type CreateDocumentInput = { export type CreateDocumentOutput = { document: SelectDocument; }; + +export type UpdateDocumentInput = { + documentId: string; + userId: string; + workspaceId: string; + updater: (content: DocumentContent) => DocumentContent | null; +};