mirror of
https://github.com/colanode/colanode.git
synced 2025-12-16 11:47:47 +01:00
Delete node block from document when node is deleted (#199)
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
import { getIdType, IdType } from '@colanode/core';
|
||||
import { database } from '@colanode/server/data/database';
|
||||
import { CreateNodeTombstone } from '@colanode/server/data/schema';
|
||||
import { JobHandler } from '@colanode/server/jobs';
|
||||
import { updateDocument } from '@colanode/server/lib/documents';
|
||||
import { eventBus } from '@colanode/server/lib/event-bus';
|
||||
import { deleteFile } from '@colanode/server/lib/files';
|
||||
import { createLogger } from '@colanode/server/lib/logger';
|
||||
@@ -11,6 +13,7 @@ const logger = createLogger('server:job:clean-node-data');
|
||||
export type NodeCleanInput = {
|
||||
type: 'node.clean';
|
||||
nodeId: string;
|
||||
parentId: string | null;
|
||||
workspaceId: string;
|
||||
userId: string;
|
||||
};
|
||||
@@ -29,6 +32,10 @@ export const nodeCleanHandler: JobHandler<NodeCleanInput> = async (input) => {
|
||||
await cleanNodeRelations([input.nodeId]);
|
||||
await cleanNodeFiles([input.nodeId]);
|
||||
|
||||
if (input.parentId) {
|
||||
await cleanNodeFromDocument(input);
|
||||
}
|
||||
|
||||
let hasMore = true;
|
||||
while (hasMore) {
|
||||
const children = await database
|
||||
@@ -164,3 +171,28 @@ const cleanNodeFiles = async (nodeIds: string[]) => {
|
||||
.execute();
|
||||
}
|
||||
};
|
||||
|
||||
const cleanNodeFromDocument = async (input: NodeCleanInput) => {
|
||||
if (!input.parentId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const parentIdType = getIdType(input.parentId);
|
||||
if (parentIdType !== IdType.Page && parentIdType !== IdType.Record) {
|
||||
return;
|
||||
}
|
||||
|
||||
await updateDocument({
|
||||
documentId: input.parentId,
|
||||
userId: input.userId,
|
||||
workspaceId: input.workspaceId,
|
||||
updater: (content) => {
|
||||
if (!content.blocks[input.nodeId]) {
|
||||
return content;
|
||||
}
|
||||
|
||||
delete content.blocks[input.nodeId];
|
||||
return content;
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import { cloneDeep } from 'lodash-es';
|
||||
|
||||
import {
|
||||
CanUpdateDocumentContext,
|
||||
DocumentContent,
|
||||
@@ -17,6 +19,7 @@ import { fetchNode, fetchNodeTree, mapNode } from '@colanode/server/lib/nodes';
|
||||
import {
|
||||
CreateDocumentInput,
|
||||
CreateDocumentOutput,
|
||||
UpdateDocumentInput,
|
||||
} from '@colanode/server/types/documents';
|
||||
import { ConcurrentUpdateResult } from '@colanode/server/types/nodes';
|
||||
|
||||
@@ -286,3 +289,148 @@ const tryUpdateDocumentFromMutation = async (
|
||||
return { type: 'retry' };
|
||||
}
|
||||
};
|
||||
|
||||
export const updateDocument = async (
|
||||
input: UpdateDocumentInput
|
||||
): Promise<boolean> => {
|
||||
for (let count = 0; count < UPDATE_RETRIES_LIMIT; count++) {
|
||||
const result = await tryUpdateDocument(input);
|
||||
|
||||
if (result.type === 'success') {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (result.type === 'error') {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
const tryUpdateDocument = async (
|
||||
input: UpdateDocumentInput
|
||||
): Promise<ConcurrentUpdateResult<boolean>> => {
|
||||
const node = await fetchNode(input.documentId);
|
||||
if (!node) {
|
||||
return { type: 'error', error: 'Node not found' };
|
||||
}
|
||||
|
||||
const model = getNodeModel(node.type);
|
||||
if (!model.documentSchema) {
|
||||
return { type: 'error', error: 'Node does not support documents' };
|
||||
}
|
||||
|
||||
const documentUpdates = await database
|
||||
.selectFrom('document_updates')
|
||||
.where('document_id', '=', input.documentId)
|
||||
.selectAll()
|
||||
.execute();
|
||||
|
||||
const ydoc = new YDoc();
|
||||
for (const update of documentUpdates) {
|
||||
ydoc.applyUpdate(update.data);
|
||||
}
|
||||
|
||||
const currentContent = ydoc.getObject<DocumentContent>();
|
||||
const updatedContent = input.updater(cloneDeep(currentContent));
|
||||
if (!updatedContent) {
|
||||
return { type: 'error', error: 'Failed to update document' };
|
||||
}
|
||||
|
||||
const update = ydoc.update(model.documentSchema, updatedContent);
|
||||
if (!update) {
|
||||
return { type: 'error', error: 'Failed to create document update' };
|
||||
}
|
||||
|
||||
const content = ydoc.getObject<DocumentContent>();
|
||||
|
||||
if (!model.documentSchema.safeParse(content).success) {
|
||||
return { type: 'error', error: 'Updated content is invalid' };
|
||||
}
|
||||
|
||||
const date = new Date();
|
||||
const updateId = generateId(IdType.Update);
|
||||
|
||||
try {
|
||||
const { updatedDocument, createdDocumentUpdate } = await database
|
||||
.transaction()
|
||||
.execute(async (trx) => {
|
||||
const createdDocumentUpdate = await trx
|
||||
.insertInto('document_updates')
|
||||
.returningAll()
|
||||
.values({
|
||||
id: updateId,
|
||||
document_id: input.documentId,
|
||||
root_id: node.root_id,
|
||||
workspace_id: input.workspaceId,
|
||||
data: update,
|
||||
created_at: date,
|
||||
created_by: input.userId,
|
||||
merged_updates: null,
|
||||
})
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!createdDocumentUpdate) {
|
||||
throw new Error('Failed to create document update');
|
||||
}
|
||||
|
||||
const updatedDocument = await trx
|
||||
.insertInto('documents')
|
||||
.returningAll()
|
||||
.values({
|
||||
id: input.documentId,
|
||||
workspace_id: input.workspaceId,
|
||||
content: JSON.stringify(content),
|
||||
created_at: date,
|
||||
created_by: input.userId,
|
||||
revision: createdDocumentUpdate.revision,
|
||||
})
|
||||
.onConflict((cb) =>
|
||||
cb.column('id').doUpdateSet({
|
||||
content: JSON.stringify(content),
|
||||
updated_at: date,
|
||||
updated_by: input.userId,
|
||||
revision: createdDocumentUpdate.revision,
|
||||
})
|
||||
)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!updatedDocument) {
|
||||
throw new Error('Failed to update document');
|
||||
}
|
||||
|
||||
return {
|
||||
updatedDocument,
|
||||
createdDocumentUpdate,
|
||||
};
|
||||
});
|
||||
|
||||
if (!updatedDocument || !createdDocumentUpdate) {
|
||||
throw new Error('Failed to update document');
|
||||
}
|
||||
|
||||
eventBus.publish({
|
||||
type: 'document.updated',
|
||||
documentId: input.documentId,
|
||||
workspaceId: input.workspaceId,
|
||||
});
|
||||
|
||||
eventBus.publish({
|
||||
type: 'document.update.created',
|
||||
documentId: input.documentId,
|
||||
rootId: node.root_id,
|
||||
workspaceId: input.workspaceId,
|
||||
});
|
||||
|
||||
await scheduleDocumentEmbedding(input.documentId);
|
||||
|
||||
return {
|
||||
type: 'success',
|
||||
output: true,
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to update document`);
|
||||
return { type: 'retry' };
|
||||
}
|
||||
};
|
||||
|
||||
@@ -730,6 +730,7 @@ export const deleteNodeFromMutation = async (
|
||||
await jobService.addJob({
|
||||
type: 'node.clean',
|
||||
nodeId: mutation.nodeId,
|
||||
parentId: node.parent_id,
|
||||
workspaceId: user.workspace_id,
|
||||
userId: user.id,
|
||||
});
|
||||
|
||||
@@ -11,3 +11,10 @@ export type CreateDocumentInput = {
|
||||
export type CreateDocumentOutput = {
|
||||
document: SelectDocument;
|
||||
};
|
||||
|
||||
export type UpdateDocumentInput = {
|
||||
documentId: string;
|
||||
userId: string;
|
||||
workspaceId: string;
|
||||
updater: (content: DocumentContent) => DocumentContent | null;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user