From 87ea22f85cf0069e75d57945a3e51d0c2a11617e Mon Sep 17 00:00:00 2001 From: Hakan Shehu Date: Tue, 29 Oct 2024 00:48:46 +0100 Subject: [PATCH] Add a condition on version id for document updates --- .../main/handlers/mutations/document-save.ts | 209 ++++++++++-------- desktop/src/main/utils.ts | 23 ++ server/src/routes/sync.ts | 136 ++++++------ 3 files changed, 211 insertions(+), 157 deletions(-) diff --git a/desktop/src/main/handlers/mutations/document-save.ts b/desktop/src/main/handlers/mutations/document-save.ts index 5f54ddf6..69cb5d95 100644 --- a/desktop/src/main/handlers/mutations/document-save.ts +++ b/desktop/src/main/handlers/mutations/document-save.ts @@ -7,6 +7,7 @@ import { generateId, IdType } from '@/lib/id'; import { LocalUpdateNodeChangeData } from '@/types/sync'; import { applyChangeToYDoc, mapContentsToBlocks } from '@/lib/editor'; import { LocalNodeAttributes, NodeBlock } from '@/types/nodes'; +import { hasUpdateChanges } from '@/main/utils'; export class DocumentSaveMutationHandler implements MutationHandler @@ -18,112 +19,132 @@ export class DocumentSaveMutationHandler input.userId, ); - const document = await workspaceDatabase - .selectFrom('nodes') - .selectAll() - .where('id', '=', input.documentId) - .executeTakeFirst(); + let count = 0; + while (count++ < 10) { + const document = await workspaceDatabase + .selectFrom('nodes') + .selectAll() + .where('id', '=', input.documentId) + .executeTakeFirst(); - if (!document) { - return { - output: { - success: false, - }, - changes: [], + if (!document) { + return { + output: { + success: false, + }, + changes: [], + }; + } + + const versionId = generateId(IdType.Version); + const updatedAt = new Date().toISOString(); + const updates: string[] = []; + + const doc = new Y.Doc({ + guid: document.id, + }); + Y.applyUpdate(doc, toUint8Array(document.state)); + + doc.on('update', (update) => { + updates.push(fromUint8Array(update)); + }); + + const attributes = JSON.parse(document.attributes) as LocalNodeAttributes; + const blocksMap = new Map(); + if (attributes.content) { + for (const [key, value] of Object.entries(attributes.content)) { + blocksMap.set(key, value); + } + } + + const blocks = mapContentsToBlocks( + document.id, + input.content.content, + blocksMap, + ); + + doc.transact(() => { + applyChangeToYDoc(doc, blocks); + }); + + if (updates.length === 0) { + return { + output: { + success: true, + }, + changes: [], + }; + } + + const attributesMap = doc.getMap('attributes'); + const attributesJson = JSON.stringify(attributesMap.toJSON()); + const encodedState = fromUint8Array(Y.encodeStateAsUpdate(doc)); + + const changeData: LocalUpdateNodeChangeData = { + type: 'node_update', + id: document.id, + updatedAt: updatedAt, + updatedBy: input.userId, + versionId: versionId, + updates: updates, }; - } - const versionId = generateId(IdType.Version); - const updatedAt = new Date().toISOString(); - const updates: string[] = []; + const result = await workspaceDatabase + .transaction() + .execute(async (trx) => { + const result = await trx + .updateTable('nodes') + .set({ + attributes: attributesJson, + state: encodedState, + updated_at: updatedAt, + updated_by: input.userId, + version_id: versionId, + }) + .where('id', '=', input.documentId) + .where('version_id', '=', document.version_id) + .execute(); - const doc = new Y.Doc({ - guid: document.id, - }); - Y.applyUpdate(doc, toUint8Array(document.state)); + const hasChanges = hasUpdateChanges(result); - doc.on('update', (update) => { - updates.push(fromUint8Array(update)); - }); + if (hasChanges) { + await trx + .insertInto('changes') + .values({ + data: JSON.stringify(changeData), + created_at: updatedAt, + }) + .execute(); + } - const attributes = JSON.parse(document.attributes) as LocalNodeAttributes; - const blocksMap = new Map(); - if (attributes.content) { - for (const [key, value] of Object.entries(attributes.content)) { - blocksMap.set(key, value); + return hasChanges; + }); + + if (result) { + return { + output: { + success: true, + }, + changes: [ + { + type: 'workspace', + table: 'nodes', + userId: input.userId, + }, + { + type: 'workspace', + table: 'changes', + userId: input.userId, + }, + ], + }; } } - const blocks = mapContentsToBlocks( - document.id, - input.content.content, - blocksMap, - ); - - doc.transact(() => { - applyChangeToYDoc(doc, blocks); - }); - - if (updates.length === 0) { - return { - output: { - success: true, - }, - changes: [], - }; - } - - const attributesMap = doc.getMap('attributes'); - const attributesJson = JSON.stringify(attributesMap.toJSON()); - const encodedState = fromUint8Array(Y.encodeStateAsUpdate(doc)); - - const changeData: LocalUpdateNodeChangeData = { - type: 'node_update', - id: document.id, - updatedAt: updatedAt, - updatedBy: input.userId, - versionId: versionId, - updates: updates, - }; - - await workspaceDatabase.transaction().execute(async (trx) => { - await trx - .updateTable('nodes') - .set({ - attributes: attributesJson, - state: encodedState, - updated_at: updatedAt, - updated_by: input.userId, - version_id: versionId, - }) - .where('id', '=', input.documentId) - .execute(); - - await trx - .insertInto('changes') - .values({ - data: JSON.stringify(changeData), - created_at: updatedAt, - }) - .execute(); - }); - return { output: { - success: true, + success: false, }, - changes: [ - { - type: 'workspace', - table: 'nodes', - userId: input.userId, - }, - { - type: 'workspace', - table: 'changes', - userId: input.userId, - }, - ], }; } } diff --git a/desktop/src/main/utils.ts b/desktop/src/main/utils.ts index fa644367..9dfc380b 100644 --- a/desktop/src/main/utils.ts +++ b/desktop/src/main/utils.ts @@ -1,4 +1,5 @@ import { app } from 'electron'; +import { DeleteResult, InsertResult, UpdateResult } from 'kysely'; import path from 'path'; export const appPath = app.getPath('userData'); @@ -16,3 +17,25 @@ export const getWorkspaceFilesDirectoryPath = (userId: string): string => { export const getAccountAvatarsDirectoryPath = (accountId: string): string => { return path.join(appPath, 'avatars', accountId); }; + +export const hasInsertChanges = (result: InsertResult[]): boolean => { + if (result.length === 0) { + return false; + } + + return result.some( + (r) => r.numInsertedOrUpdatedRows && r.numInsertedOrUpdatedRows > 0n, + ); +}; + +export const hasUpdateChanges = (result: UpdateResult[]): boolean => { + if (result.length === 0) { + return false; + } + + return result.some((r) => r.numUpdatedRows && r.numUpdatedRows > 0n); +}; + +export const hasDeleteChanges = (result: DeleteResult[]): boolean => { + return result.some((r) => r.numDeletedRows && r.numDeletedRows > 0n); +}; diff --git a/server/src/routes/sync.ts b/server/src/routes/sync.ts index 3112b0db..4ed7f6fa 100644 --- a/server/src/routes/sync.ts +++ b/server/src/routes/sync.ts @@ -1,6 +1,6 @@ import * as Y from 'yjs'; import { ApiError, NeuronRequest, NeuronResponse } from '@/types/api'; -import { database } from '@/data/database'; +import { database, hasUpdateChanges } from '@/data/database'; import { Router } from 'express'; import { LocalChange, @@ -190,72 +190,82 @@ const handleUpdateNodeChange = async ( workspaceUser: SelectWorkspaceUser, changeData: LocalUpdateNodeChangeData, ): Promise => { - const existingNode = await database - .selectFrom('nodes') - .select(['id', 'workspace_id', 'attributes', 'state']) - .where('id', '=', changeData.id) - .executeTakeFirst(); + let count = 0; + while (count++ < 10) { + const existingNode = await database + .selectFrom('nodes') + .select(['id', 'workspace_id', 'attributes', 'state']) + .where('id', '=', changeData.id) + .executeTakeFirst(); - if ( - !existingNode || - existingNode.workspace_id != workspaceUser.workspace_id - ) { - return { - status: 'error', - }; + if ( + !existingNode || + existingNode.workspace_id != workspaceUser.workspace_id + ) { + return { + status: 'error', + }; + } + + const role = await fetchCollaboratorRole(changeData.id, workspaceUser.id); + if (role === null) { + return { + status: 'error', + }; + } + + const doc = new Y.Doc({ + guid: changeData.id, + }); + + Y.applyUpdate(doc, toUint8Array(existingNode.state)); + + for (const update of changeData.updates) { + Y.applyUpdate(doc, toUint8Array(update)); + } + + const attributesMap = doc.getMap('attributes'); + const attributes = attributesMap.toJSON() as ServerNodeAttributes; + const attributesJson = JSON.stringify(attributes); + const encodedState = fromUint8Array(Y.encodeStateAsUpdate(doc)); + + const result = await database + .updateTable('nodes') + .set({ + attributes: attributesJson, + state: encodedState, + updated_at: new Date(changeData.updatedAt), + updated_by: changeData.updatedBy, + version_id: changeData.versionId, + server_updated_at: new Date(), + }) + .where('id', '=', changeData.id) + .where('version_id', '=', changeData.versionId) + .execute(); + + if (hasUpdateChanges(result)) { + const event: NodeUpdatedEvent = { + type: 'node_updated', + id: changeData.id, + workspaceId: workspaceUser.workspace_id, + beforeAttributes: existingNode.attributes, + afterAttributes: attributes, + updatedBy: changeData.updatedBy, + updatedAt: changeData.updatedAt, + serverUpdatedAt: new Date().toISOString(), + versionId: changeData.versionId, + }; + + await enqueueEvent(event); + + return { + status: 'success', + }; + } } - const role = await fetchCollaboratorRole(changeData.id, workspaceUser.id); - if (role === null) { - return { - status: 'error', - }; - } - - const doc = new Y.Doc({ - guid: changeData.id, - }); - - Y.applyUpdate(doc, toUint8Array(existingNode.state)); - - for (const update of changeData.updates) { - Y.applyUpdate(doc, toUint8Array(update)); - } - - const attributesMap = doc.getMap('attributes'); - const attributes = attributesMap.toJSON() as ServerNodeAttributes; - const attributesJson = JSON.stringify(attributes); - const encodedState = fromUint8Array(Y.encodeStateAsUpdate(doc)); - - await database - .updateTable('nodes') - .set({ - attributes: attributesJson, - state: encodedState, - updated_at: new Date(changeData.updatedAt), - updated_by: changeData.updatedBy, - version_id: changeData.versionId, - server_updated_at: new Date(), - }) - .where('id', '=', changeData.id) - .execute(); - - const event: NodeUpdatedEvent = { - type: 'node_updated', - id: changeData.id, - workspaceId: workspaceUser.workspace_id, - beforeAttributes: existingNode.attributes, - afterAttributes: attributes, - updatedBy: changeData.updatedBy, - updatedAt: changeData.updatedAt, - serverUpdatedAt: new Date().toISOString(), - versionId: changeData.versionId, - }; - - await enqueueEvent(event); - return { - status: 'success', + status: 'error', }; };