From ae8064fbc310e3ded48ad0556e63b5a4ad1b280b Mon Sep 17 00:00:00 2001 From: Hakan Shehu Date: Thu, 28 Nov 2024 11:35:17 +0100 Subject: [PATCH] Improve synchronization and collaboration changes --- apps/desktop/src/main/data/app/migrations.ts | 4 +- apps/desktop/src/main/data/app/schema.ts | 4 +- .../src/main/data/workspace/migrations.ts | 5 +- .../desktop/src/main/data/workspace/schema.ts | 5 +- .../main/services/collaboration-service.ts | 84 +--- .../desktop/src/main/services/node-service.ts | 41 +- .../src/main/services/socket-connection.ts | 4 +- .../desktop/src/main/services/sync-service.ts | 70 ++-- apps/desktop/src/main/utils.ts | 15 +- apps/server/src/data/migrations.ts | 90 ++-- apps/server/src/data/schema.ts | 28 +- apps/server/src/jobs/create-collaborations.ts | 77 ---- apps/server/src/jobs/delete-collaborations.ts | 102 ----- apps/server/src/jobs/index.ts | 4 - apps/server/src/lib/collaborations.ts | 28 -- apps/server/src/lib/nodes.ts | 45 +- apps/server/src/routes/sync.ts | 6 +- apps/server/src/services/node-service.ts | 387 ++++++++---------- apps/server/src/services/synapse.ts | 135 +++--- apps/server/src/types/nodes.ts | 9 +- packages/core/src/types/messages.ts | 20 +- packages/core/src/types/sync.ts | 32 +- 22 files changed, 440 insertions(+), 755 deletions(-) delete mode 100644 apps/server/src/jobs/create-collaborations.ts delete mode 100644 apps/server/src/jobs/delete-collaborations.ts delete mode 100644 apps/server/src/lib/collaborations.ts diff --git a/apps/desktop/src/main/data/app/migrations.ts b/apps/desktop/src/main/data/app/migrations.ts index ec462e80..74a25fbc 100644 --- a/apps/desktop/src/main/data/app/migrations.ts +++ b/apps/desktop/src/main/data/app/migrations.ts @@ -91,8 +91,8 @@ const createWorkspaceCursorsTable: Migration = { .references('workspaces.user_id') .onDelete('cascade') ) - .addColumn('node_transactions', 'integer', (col) => col.notNull()) - .addColumn('collaborations', 'integer', (col) => col.notNull()) + .addColumn('transactions', 'integer', (col) => col.notNull().defaultTo(0)) + .addColumn('revocations', 'integer', (col) => col.notNull().defaultTo(0)) .addColumn('created_at', 'text', (col) => col.notNull()) .addColumn('updated_at', 'text') .execute(); diff --git a/apps/desktop/src/main/data/app/schema.ts b/apps/desktop/src/main/data/app/schema.ts index 1971ceb2..21f9f7d0 100644 --- a/apps/desktop/src/main/data/app/schema.ts +++ b/apps/desktop/src/main/data/app/schema.ts @@ -47,8 +47,8 @@ export type UpdateWorkspace = Updateable; interface WorkspaceCursorTable { user_id: ColumnType; - node_transactions: ColumnType; - collaborations: ColumnType; + transactions: ColumnType; + revocations: ColumnType; created_at: ColumnType; updated_at: ColumnType; } diff --git a/apps/desktop/src/main/data/workspace/migrations.ts b/apps/desktop/src/main/data/workspace/migrations.ts index 9892b1e8..d38823d9 100644 --- a/apps/desktop/src/main/data/workspace/migrations.ts +++ b/apps/desktop/src/main/data/workspace/migrations.ts @@ -40,14 +40,15 @@ const createNodeTransactionsTable: Migration = { .createTable('node_transactions') .addColumn('id', 'text', (col) => col.notNull().primaryKey()) .addColumn('node_id', 'text', (col) => col.notNull()) - .addColumn('type', 'text', (col) => col.notNull()) + .addColumn('node_type', 'text', (col) => col.notNull()) + .addColumn('operation', 'text', (col) => col.notNull()) .addColumn('data', 'blob') .addColumn('created_at', 'text', (col) => col.notNull()) .addColumn('created_by', 'text', (col) => col.notNull()) .addColumn('server_created_at', 'text') .addColumn('retry_count', 'integer', (col) => col.defaultTo(0)) .addColumn('status', 'text', (col) => col.defaultTo('pending')) - .addColumn('number', 'integer') + .addColumn('version', 'integer') .execute(); }, down: async (db) => { diff --git a/apps/desktop/src/main/data/workspace/schema.ts b/apps/desktop/src/main/data/workspace/schema.ts index 8bb3d81d..c3d8e434 100644 --- a/apps/desktop/src/main/data/workspace/schema.ts +++ b/apps/desktop/src/main/data/workspace/schema.ts @@ -27,14 +27,15 @@ export type SelectNodePath = Selectable; interface NodeTransactionTable { id: ColumnType; node_id: ColumnType; - type: ColumnType; + node_type: ColumnType; + operation: ColumnType; data: ColumnType; created_at: ColumnType; created_by: ColumnType; server_created_at: ColumnType; retry_count: ColumnType; status: ColumnType; - number: ColumnType; + version: ColumnType; } export type SelectNodeTransaction = Selectable; diff --git a/apps/desktop/src/main/services/collaboration-service.ts b/apps/desktop/src/main/services/collaboration-service.ts index d0adedd7..632f42d0 100644 --- a/apps/desktop/src/main/services/collaboration-service.ts +++ b/apps/desktop/src/main/services/collaboration-service.ts @@ -1,79 +1,31 @@ -import { CollaborationAttributes, ServerCollaboration } from '@colanode/core'; +import { ServerCollaborationRevocation } from '@colanode/core'; import { databaseService } from '@/main/data/database-service'; -import { decodeState, YDoc } from '@colanode/crdt'; class CollaborationService { - public async applyServerCollaboration( + public async applyServerCollaborationRevocation( userId: string, - collaboration: ServerCollaboration - ) { - if (collaboration.deletedAt) { - return this.deleteCollaboration(userId, collaboration); - } - - return this.upsertCollaboration(userId, collaboration); - } - - private async deleteCollaboration( - userId: string, - collaboration: ServerCollaboration + revocation: ServerCollaborationRevocation ) { const workspaceDatabase = await databaseService.getWorkspaceDatabase(userId); - await workspaceDatabase - .deleteFrom('collaborations') - .where('user_id', '=', collaboration.userId) - .where('node_id', '=', collaboration.nodeId) - .execute(); - } + await workspaceDatabase.transaction().execute(async (tx) => { + await tx + .deleteFrom('nodes') + .where('id', '=', revocation.nodeId) + .execute(); - private async upsertCollaboration( - userId: string, - collaboration: ServerCollaboration - ) { - const workspaceDatabase = - await databaseService.getWorkspaceDatabase(userId); + await tx + .deleteFrom('collaborations') + .where('user_id', '=', userId) + .where('node_id', '=', revocation.nodeId) + .execute(); - const existingCollaboration = await workspaceDatabase - .selectFrom('collaborations') - .selectAll() - .where('user_id', '=', userId) - .where('node_id', '=', collaboration.nodeId) - .executeTakeFirst(); - - const ydoc = new YDoc(); - if (existingCollaboration) { - ydoc.applyUpdate(existingCollaboration.state); - } - - const state = decodeState(collaboration.state); - const number = BigInt(collaboration.number); - - ydoc.applyUpdate(state); - const attributes = ydoc.getAttributes(); - const attributesJson = JSON.stringify(attributes); - - await workspaceDatabase - .insertInto('collaborations') - .values({ - node_id: collaboration.nodeId, - user_id: userId, - attributes: attributesJson, - state: state, - created_at: collaboration.createdAt, - number: number, - updated_at: collaboration.updatedAt, - }) - .onConflict((b) => - b.columns(['user_id', 'node_id']).doUpdateSet({ - attributes: attributesJson, - state: state, - number: number, - updated_at: collaboration.updatedAt, - }) - ) - .execute(); + await tx + .deleteFrom('node_transactions') + .where('node_id', '=', revocation.nodeId) + .execute(); + }); } } diff --git a/apps/desktop/src/main/services/node-service.ts b/apps/desktop/src/main/services/node-service.ts index c225377c..d11f7843 100644 --- a/apps/desktop/src/main/services/node-service.ts +++ b/apps/desktop/src/main/services/node-service.ts @@ -130,7 +130,8 @@ class NodeService { .values({ id: transactionId, node_id: inputItem.id, - type: 'create', + operation: 'create', + node_type: inputItem.attributes.type, data: update, created_at: createdAt, created_by: context.userId, @@ -308,7 +309,8 @@ class NodeService { .values({ id: transactionId, node_id: nodeId, - type: 'update', + node_type: node.type, + operation: 'update', data: update, created_at: updatedAt, created_by: context.userId, @@ -392,7 +394,8 @@ class NodeService { .values({ id: generateId(IdType.Transaction), node_id: nodeId, - type: 'delete', + node_type: node.type, + operation: 'delete', data: null, created_at: new Date().toISOString(), created_by: context.userId, @@ -425,11 +428,11 @@ class NodeService { userId: string, transaction: ServerNodeTransaction ) { - if (transaction.type === 'create') { + if (transaction.operation === 'create') { await this.applyServerCreateTransaction(userId, transaction); - } else if (transaction.type === 'update') { + } else if (transaction.operation === 'update') { await this.applyServerUpdateTransaction(userId, transaction); - } else if (transaction.type === 'delete') { + } else if (transaction.operation === 'delete') { await this.applyServerDeleteTransaction(userId, transaction); } } @@ -441,17 +444,17 @@ class NodeService { const workspaceDatabase = await databaseService.getWorkspaceDatabase(userId); - const number = BigInt(transaction.number); + const version = BigInt(transaction.version); const existingTransaction = await workspaceDatabase .selectFrom('node_transactions') - .select(['id', 'status', 'number', 'server_created_at']) + .select(['id', 'status', 'version', 'server_created_at']) .where('id', '=', transaction.id) .executeTakeFirst(); if (existingTransaction) { if ( existingTransaction.status === 'synced' && - existingTransaction.number === number && + existingTransaction.version === version && existingTransaction.server_created_at === transaction.serverCreatedAt ) { return; @@ -461,7 +464,7 @@ class NodeService { .updateTable('node_transactions') .set({ status: 'synced', - number, + version, server_created_at: transaction.serverCreatedAt, }) .where('id', '=', transaction.id) @@ -483,13 +486,14 @@ class NodeService { .values({ id: transaction.id, node_id: transaction.nodeId, - type: 'create', + node_type: transaction.nodeType, + operation: 'create', data: decodeState(transaction.data), created_at: transaction.createdAt, created_by: transaction.createdBy, retry_count: 0, status: 'synced', - number, + version, server_created_at: transaction.serverCreatedAt, }) .execute(); @@ -525,17 +529,17 @@ class NodeService { const workspaceDatabase = await databaseService.getWorkspaceDatabase(userId); - const number = BigInt(transaction.number); + const version = BigInt(transaction.version); const existingTransaction = await workspaceDatabase .selectFrom('node_transactions') - .select(['id', 'status', 'number', 'server_created_at']) + .select(['id', 'status', 'version', 'server_created_at']) .where('id', '=', transaction.id) .executeTakeFirst(); if (existingTransaction) { if ( existingTransaction.status === 'synced' && - existingTransaction.number === number && + existingTransaction.version === version && existingTransaction.server_created_at === transaction.serverCreatedAt ) { return; @@ -545,7 +549,7 @@ class NodeService { .updateTable('node_transactions') .set({ status: 'synced', - number, + version, server_created_at: transaction.serverCreatedAt, }) .where('id', '=', transaction.id) @@ -580,13 +584,14 @@ class NodeService { .values({ id: transaction.id, node_id: transaction.nodeId, - type: 'update', + node_type: transaction.nodeType, + operation: 'update', data: decodeState(transaction.data), created_at: transaction.createdAt, created_by: transaction.createdBy, retry_count: 0, status: 'synced', - number, + version, server_created_at: transaction.serverCreatedAt, }) .execute(); diff --git a/apps/desktop/src/main/services/socket-connection.ts b/apps/desktop/src/main/services/socket-connection.ts index 9aa5a891..99379c60 100644 --- a/apps/desktop/src/main/services/socket-connection.ts +++ b/apps/desktop/src/main/services/socket-connection.ts @@ -49,8 +49,8 @@ export class SocketConnection { const message: Message = JSON.parse(data); if (message.type === 'node_transactions_batch') { syncService.syncServerTransactions(message); - } else if (message.type === 'collaborations_batch') { - syncService.syncServerCollaborations(message); + } else if (message.type === 'collaboration_revocations_batch') { + syncService.syncServerRevocations(message); } }; diff --git a/apps/desktop/src/main/services/sync-service.ts b/apps/desktop/src/main/services/sync-service.ts index af38bd13..1042912d 100644 --- a/apps/desktop/src/main/services/sync-service.ts +++ b/apps/desktop/src/main/services/sync-service.ts @@ -4,8 +4,8 @@ import { eventBus } from '@/shared/lib/event-bus'; import { httpClient } from '@/shared/lib/http-client'; import { serverService } from '@/main/services/server-service'; import { - CollaborationsBatchMessage, - FetchCollaborationsMessage, + CollaborationRevocationsBatchMessage, + FetchCollaborationRevocationsMessage, FetchNodeTransactionsMessage, LocalNodeTransaction, NodeTransactionsBatchMessage, @@ -26,7 +26,7 @@ class SyncService { private readonly localSyncStates: Map = new Map(); private readonly syncingTransactions: Set = new Set(); - private readonly syncingCollaborations: Set = new Set(); + private readonly syncingRevocations: Set = new Set(); constructor() { eventBus.subscribe((event) => { @@ -49,7 +49,7 @@ class SyncService { for (const workspace of workspaces) { this.syncLocalTransactions(workspace.user_id); this.requireNodeTransactions(workspace.user_id); - this.requireCollaborations(workspace.user_id); + this.requireCollaborationRevocations(workspace.user_id); } } @@ -95,7 +95,7 @@ class SyncService { try { for (const transaction of message.transactions) { await nodeService.applyServerTransaction(message.userId, transaction); - cursor = BigInt(transaction.number); + cursor = BigInt(transaction.version); } if (cursor) { @@ -112,33 +112,35 @@ class SyncService { } } - public async syncServerCollaborations(message: CollaborationsBatchMessage) { - if (this.syncingCollaborations.has(message.userId)) { + public async syncServerRevocations( + message: CollaborationRevocationsBatchMessage + ) { + if (this.syncingRevocations.has(message.userId)) { return; } - this.syncingCollaborations.add(message.userId); + this.syncingRevocations.add(message.userId); let cursor: bigint | null = null; try { - for (const collaboration of message.collaborations) { - await collaborationService.applyServerCollaboration( + for (const revocation of message.revocations) { + await collaborationService.applyServerCollaborationRevocation( message.userId, - collaboration + revocation ); - cursor = BigInt(collaboration.number); + cursor = BigInt(revocation.version); } if (cursor) { - this.updateNodeCollaborationCursor(message.userId, cursor); + this.updateCollaborationRevocationCursor(message.userId, cursor); } } catch (error) { this.logger.error( error, - `Error syncing server collaborations for user ${message.userId}` + `Error syncing server revocations for user ${message.userId}` ); } finally { - this.syncingCollaborations.delete(message.userId); - this.requireCollaborations(message.userId); + this.syncingRevocations.delete(message.userId); + this.requireCollaborationRevocations(message.userId); } } @@ -232,7 +234,7 @@ class SyncService { 'w.user_id', 'w.workspace_id', 'w.account_id', - 'wc.node_transactions', + 'wc.transactions', ]) .where('w.user_id', '=', userId) .executeTakeFirst(); @@ -245,22 +247,17 @@ class SyncService { type: 'fetch_node_transactions', userId: workspaceWithCursor.user_id, workspaceId: workspaceWithCursor.workspace_id, - cursor: workspaceWithCursor.node_transactions?.toString() ?? null, + cursor: workspaceWithCursor.transactions?.toString() ?? '0', }; socketService.sendMessage(workspaceWithCursor.account_id, message); } - private async requireCollaborations(userId: string) { + private async requireCollaborationRevocations(userId: string) { const workspaceWithCursor = await databaseService.appDatabase .selectFrom('workspaces as w') .leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id') - .select([ - 'w.user_id', - 'w.workspace_id', - 'w.account_id', - 'wc.collaborations', - ]) + .select(['w.user_id', 'w.workspace_id', 'w.account_id', 'wc.revocations']) .where('w.user_id', '=', userId) .executeTakeFirst(); @@ -268,11 +265,11 @@ class SyncService { return; } - const message: FetchCollaborationsMessage = { - type: 'fetch_collaborations', + const message: FetchCollaborationRevocationsMessage = { + type: 'fetch_collaboration_revocations', userId: workspaceWithCursor.user_id, workspaceId: workspaceWithCursor.workspace_id, - cursor: workspaceWithCursor.collaborations?.toString() ?? null, + cursor: workspaceWithCursor.revocations?.toString() ?? '0', }; socketService.sendMessage(workspaceWithCursor.account_id, message); @@ -283,31 +280,34 @@ class SyncService { .insertInto('workspace_cursors') .values({ user_id: userId, - node_transactions: cursor, - collaborations: 0n, + transactions: cursor, + revocations: 0n, created_at: new Date().toISOString(), }) .onConflict((eb) => eb.column('user_id').doUpdateSet({ - node_transactions: cursor, + transactions: cursor, updated_at: new Date().toISOString(), }) ) .execute(); } - private async updateNodeCollaborationCursor(userId: string, cursor: bigint) { + private async updateCollaborationRevocationCursor( + userId: string, + cursor: bigint + ) { await databaseService.appDatabase .insertInto('workspace_cursors') .values({ user_id: userId, - collaborations: cursor, - node_transactions: 0n, + revocations: cursor, + transactions: 0n, created_at: new Date().toISOString(), }) .onConflict((eb) => eb.column('user_id').doUpdateSet({ - collaborations: cursor, + revocations: cursor, updated_at: new Date().toISOString(), }) ) diff --git a/apps/desktop/src/main/utils.ts b/apps/desktop/src/main/utils.ts index 966070f6..b1215a97 100644 --- a/apps/desktop/src/main/utils.ts +++ b/apps/desktop/src/main/utils.ts @@ -140,33 +140,36 @@ export const mapWorkspace = (row: SelectWorkspace): Workspace => { export const mapTransaction = ( row: SelectNodeTransaction ): LocalNodeTransaction => { - if (row.type === 'create' && row.data) { + if (row.operation === 'create' && row.data) { return { id: row.id, nodeId: row.node_id, - type: row.type, + nodeType: row.node_type, + operation: 'create', data: encodeState(row.data), createdAt: row.created_at, createdBy: row.created_by, }; } - if (row.type === 'update' && row.data) { + if (row.operation === 'update' && row.data) { return { id: row.id, nodeId: row.node_id, - type: row.type, + nodeType: row.node_type, + operation: 'update', data: encodeState(row.data), createdAt: row.created_at, createdBy: row.created_by, }; } - if (row.type === 'delete') { + if (row.operation === 'delete') { return { id: row.id, nodeId: row.node_id, - type: row.type, + nodeType: row.node_type, + operation: 'delete', createdAt: row.created_at, createdBy: row.created_by, }; diff --git a/apps/server/src/data/migrations.ts b/apps/server/src/data/migrations.ts index d7166e33..f41deec7 100644 --- a/apps/server/src/data/migrations.ts +++ b/apps/server/src/data/migrations.ts @@ -125,7 +125,7 @@ const createNodesTable: Migration = { const createNodeTransactionsTable: Migration = { up: async (db) => { await sql` - CREATE SEQUENCE IF NOT EXISTS node_transactions_number_seq + CREATE SEQUENCE IF NOT EXISTS node_transactions_version_seq START WITH 1000000000 INCREMENT BY 1 NO MINVALUE @@ -138,19 +138,22 @@ const createNodeTransactionsTable: Migration = { .addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey()) .addColumn('workspace_id', 'varchar(30)', (col) => col.notNull()) .addColumn('node_id', 'varchar(30)', (col) => col.notNull()) - .addColumn('type', 'varchar(30)', (col) => col.notNull()) + .addColumn('node_type', 'varchar(30)', (col) => col.notNull()) + .addColumn('operation', 'varchar(30)', (col) => col.notNull()) .addColumn('data', 'bytea') .addColumn('created_at', 'timestamptz', (col) => col.notNull()) .addColumn('created_by', 'varchar(30)', (col) => col.notNull()) .addColumn('server_created_at', 'timestamptz', (col) => col.notNull()) - .addColumn('number', 'bigint', (col) => - col.notNull().defaultTo(sql`nextval('node_transactions_number_seq')`) + .addColumn('version', 'bigint', (col) => + col.notNull().defaultTo(sql`nextval('node_transactions_version_seq')`) ) .execute(); }, down: async (db) => { await db.schema.dropTable('node_transactions').execute(); - await sql`DROP SEQUENCE IF EXISTS node_transactions_number_seq`.execute(db); + await sql`DROP SEQUENCE IF EXISTS node_transactions_version_seq`.execute( + db + ); }, }; @@ -229,9 +232,27 @@ const createNodePathsTable: Migration = { }; const createCollaborationsTable: Migration = { + up: async (db) => { + await db.schema + .createTable('collaborations') + .addColumn('user_id', 'varchar(30)', (col) => col.notNull()) + .addColumn('node_id', 'varchar(30)', (col) => col.notNull()) + .addColumn('workspace_id', 'varchar(30)', (col) => col.notNull()) + .addColumn('roles', 'jsonb', (col) => col.notNull().defaultTo('{}')) + .addColumn('created_at', 'timestamptz', (col) => col.notNull()) + .addColumn('updated_at', 'timestamptz') + .addPrimaryKeyConstraint('collaborations_pkey', ['user_id', 'node_id']) + .execute(); + }, + down: async (db) => { + await db.schema.dropTable('collaborations').execute(); + }, +}; + +const createCollaborationRevocationsTable: Migration = { up: async (db) => { await sql` - CREATE SEQUENCE IF NOT EXISTS collaboration_number_seq + CREATE SEQUENCE IF NOT EXISTS collaboration_revocations_version_seq START WITH 1000000000 INCREMENT BY 1 NO MINVALUE @@ -240,48 +261,47 @@ const createCollaborationsTable: Migration = { `.execute(db); await db.schema - .createTable('collaborations') + .createTable('collaboration_revocations') .addColumn('user_id', 'varchar(30)', (col) => col.notNull()) .addColumn('node_id', 'varchar(30)', (col) => col.notNull()) - .addColumn('type', 'varchar(30)', (col) => - col.generatedAlwaysAs(sql`(attributes->>'type')::VARCHAR(30)`).stored() - ) .addColumn('workspace_id', 'varchar(30)', (col) => col.notNull()) - .addColumn('attributes', 'jsonb', (col) => col.notNull()) - .addColumn('state', 'bytea', (col) => col.notNull()) .addColumn('created_at', 'timestamptz', (col) => col.notNull()) - .addColumn('updated_at', 'timestamptz') - .addColumn('deleted_at', 'timestamptz') - .addColumn('number', 'bigint', (col) => - col.notNull().defaultTo(sql`nextval('collaboration_number_seq')`) + .addColumn('version', 'bigint', (col) => + col + .notNull() + .defaultTo(sql`nextval('collaboration_revocations_version_seq')`) ) - .addPrimaryKeyConstraint('collaborations_pkey', ['user_id', 'node_id']) + .addPrimaryKeyConstraint('collaboration_revocations_pkey', [ + 'user_id', + 'node_id', + ]) .execute(); - // Add trigger to update number on each update await sql` - CREATE OR REPLACE FUNCTION fn_update_collaboration_number() RETURNS TRIGGER AS $$ + CREATE OR REPLACE FUNCTION handle_collaboration_update() RETURNS TRIGGER AS $$ BEGIN - NEW.number = nextval('collaboration_number_seq'); - RETURN NEW; + IF NEW.roles = '{}' THEN + DELETE FROM collaborations WHERE user_id = NEW.user_id AND node_id = NEW.node_id; + INSERT INTO collaboration_revocations (user_id, node_id, workspace_id, created_at) + VALUES (NEW.user_id, NEW.node_id, NEW.workspace_id, NOW()); + ELSE + DELETE FROM collaboration_revocations WHERE user_id = NEW.user_id AND node_id = NEW.node_id; + END IF; + RETURN NULL; END; $$ LANGUAGE plpgsql; - CREATE TRIGGER trg_update_collaboration_number - BEFORE UPDATE ON collaborations - FOR EACH ROW - EXECUTE FUNCTION fn_update_collaboration_number(); + CREATE TRIGGER after_collaboration_insert + AFTER INSERT ON collaborations + FOR EACH ROW EXECUTE FUNCTION handle_collaboration_update(); + + CREATE TRIGGER after_collaboration_update + AFTER UPDATE ON collaborations + FOR EACH ROW EXECUTE FUNCTION handle_collaboration_update(); `.execute(db); }, down: async (db) => { - // Drop trigger and function first - await sql` - DROP TRIGGER IF EXISTS trg_update_collaboration_number ON collaborations; - DROP FUNCTION IF EXISTS fn_update_collaboration_number(); - DROP SEQUENCE IF EXISTS collaboration_number_seq; - `.execute(db); - - await db.schema.dropTable('collaborations').execute(); + await db.schema.dropTable('collaboration_revocations').execute(); }, }; @@ -317,5 +337,7 @@ export const databaseMigrations: Record = { '00006_create_node_transactions_table': createNodeTransactionsTable, '00007_create_node_paths_table': createNodePathsTable, '00008_create_collaborations_table': createCollaborationsTable, - '00009_create_uploads_table': createUploadsTable, + '00009_create_collaboration_revocations_table': + createCollaborationRevocationsTable, + '00010_create_uploads_table': createUploadsTable, }; diff --git a/apps/server/src/data/schema.ts b/apps/server/src/data/schema.ts index ec164dc2..20f6d0c2 100644 --- a/apps/server/src/data/schema.ts +++ b/apps/server/src/data/schema.ts @@ -1,4 +1,4 @@ -import { NodeAttributes, WorkspaceRole } from '@colanode/core'; +import { NodeAttributes, NodeRole, WorkspaceRole } from '@colanode/core'; import { ColumnType, Insertable, @@ -101,13 +101,14 @@ export type UpdateNode = Updateable; interface NodeTransactionTable { id: ColumnType; node_id: ColumnType; + node_type: ColumnType; + operation: ColumnType; workspace_id: ColumnType; - type: ColumnType; data: ColumnType; created_at: ColumnType; created_by: ColumnType; server_created_at: ColumnType; - number: ColumnType; + version: ColumnType; } export type SelectNodeTransaction = Selectable; @@ -119,18 +120,30 @@ interface CollaborationTable { node_id: ColumnType; type: ColumnType; workspace_id: ColumnType; - attributes: JSONColumnType; - state: ColumnType; + roles: JSONColumnType, string, string>; created_at: ColumnType; updated_at: ColumnType; - deleted_at: ColumnType; - number: ColumnType; } export type SelectCollaboration = Selectable; export type CreateCollaboration = Insertable; export type UpdateCollaboration = Updateable; +interface CollaborationRevocationTable { + user_id: ColumnType; + node_id: ColumnType; + workspace_id: ColumnType; + created_at: ColumnType; + version: ColumnType; +} + +export type SelectCollaborationRevocation = + Selectable; +export type CreateCollaborationRevocation = + Insertable; +export type UpdateCollaborationRevocation = + Updateable; + interface NodePathTable { ancestor_id: ColumnType; descendant_id: ColumnType; @@ -163,6 +176,7 @@ export interface DatabaseSchema { nodes: NodeTable; node_transactions: NodeTransactionTable; collaborations: CollaborationTable; + collaboration_revocations: CollaborationRevocationTable; node_paths: NodePathTable; uploads: UploadTable; } diff --git a/apps/server/src/jobs/create-collaborations.ts b/apps/server/src/jobs/create-collaborations.ts deleted file mode 100644 index 2aa66007..00000000 --- a/apps/server/src/jobs/create-collaborations.ts +++ /dev/null @@ -1,77 +0,0 @@ -import { database } from '@/data/database'; -import { CreateCollaboration } from '@/data/schema'; -import { JobHandler } from '@/types/jobs'; -import { buildDefaultCollaboration } from '@/lib/collaborations'; -import { NodeType } from '@colanode/core'; -import { eventBus } from '@/lib/event-bus'; - -export type CreateCollaborationsInput = { - type: 'create_collaborations'; - userId: string; - nodeId: string; - workspaceId: string; -}; - -declare module '@/types/jobs' { - interface JobMap { - create_collaborations: { - input: CreateCollaborationsInput; - }; - } -} - -export const createCollaborationsHandler: JobHandler< - CreateCollaborationsInput -> = async (input) => { - const nodeRow = await database - .selectFrom('nodes') - .where('id', '=', input.nodeId) - .select(['id', 'type']) - .executeTakeFirst(); - - if (!nodeRow) { - return; - } - - const descendants = await database - .selectFrom('node_paths') - .where('ancestor_id', '=', input.nodeId) - .selectAll() - .execute(); - - if (descendants.length === 0) { - return; - } - - const collaborationsToCreate: CreateCollaboration[] = []; - for (const descendant of descendants) { - collaborationsToCreate.push( - buildDefaultCollaboration( - input.userId, - descendant.descendant_id, - nodeRow.type as NodeType, - input.workspaceId - ) - ); - } - - const createdCollaborations = await database - .insertInto('collaborations') - .returning(['node_id', 'user_id', 'workspace_id']) - .values(collaborationsToCreate) - .onConflict((b) => - b.columns(['node_id', 'user_id']).doUpdateSet({ - deleted_at: null, - }) - ) - .execute(); - - for (const createdCollaboration of createdCollaborations) { - eventBus.publish({ - type: 'collaboration_created', - nodeId: createdCollaboration.node_id, - userId: createdCollaboration.user_id, - workspaceId: createdCollaboration.workspace_id, - }); - } -}; diff --git a/apps/server/src/jobs/delete-collaborations.ts b/apps/server/src/jobs/delete-collaborations.ts deleted file mode 100644 index 255961ed..00000000 --- a/apps/server/src/jobs/delete-collaborations.ts +++ /dev/null @@ -1,102 +0,0 @@ -import { database } from '@/data/database'; -import { eventBus } from '@/lib/event-bus'; -import { JobHandler } from '@/types/jobs'; -import { extractNodeCollaborators, NodeAttributes } from '@colanode/core'; - -export type DeleteCollaborationsInput = { - type: 'delete_collaborations'; - nodeId: string; - userId: string; - workspaceId: string; -}; - -declare module '@/types/jobs' { - interface JobMap { - delete_collaborations: { - input: DeleteCollaborationsInput; - }; - } -} - -export const deleteCollaborationsHandler: JobHandler< - DeleteCollaborationsInput -> = async (input) => { - const updatedCollaboration = await database - .updateTable('collaborations') - .returning(['node_id', 'user_id']) - .where('node_id', '=', input.nodeId) - .where('user_id', '=', input.userId) - .set({ deleted_at: new Date() }) - .executeTakeFirst(); - - if (updatedCollaboration) { - eventBus.publish({ - type: 'collaboration_updated', - nodeId: updatedCollaboration.node_id, - userId: updatedCollaboration.user_id, - workspaceId: input.workspaceId, - }); - } - - await checkChildCollaborations(input.nodeId, input.userId, input.workspaceId); -}; - -const checkChildCollaborations = async ( - parentId: string, - userId: string, - workspaceId: string -) => { - let lastId = parentId; - - const parentIdsToCheck: string[] = []; - const nodeIdsToDelete: string[] = []; - while (true) { - const children = await database - .selectFrom('nodes') - .select(['id', 'type', 'attributes']) - .where('parent_id', '=', parentId) - .where('id', '>', lastId) - .orderBy('id', 'asc') - .limit(100) - .execute(); - - for (const child of children) { - const collaborators = extractNodeCollaborators(child.attributes); - if (!collaborators[userId]) { - nodeIdsToDelete.push(child.id); - } - - parentIdsToCheck.push(child.id); - lastId = child.id; - } - - if (children.length < 100) { - break; - } - } - - if (nodeIdsToDelete.length > 0) { - const updatedCollaborations = await database - .updateTable('collaborations') - .returning(['node_id', 'user_id']) - .where('node_id', 'in', nodeIdsToDelete) - .where('user_id', '=', userId) - .set({ deleted_at: new Date() }) - .execute(); - - for (const updatedCollaboration of updatedCollaborations) { - eventBus.publish({ - type: 'collaboration_updated', - nodeId: updatedCollaboration.node_id, - userId: updatedCollaboration.user_id, - workspaceId, - }); - } - } - - if (parentIdsToCheck.length > 0) { - for (const parentId of parentIdsToCheck) { - await checkChildCollaborations(parentId, userId, workspaceId); - } - } -}; diff --git a/apps/server/src/jobs/index.ts b/apps/server/src/jobs/index.ts index 13d79281..1cbaa530 100644 --- a/apps/server/src/jobs/index.ts +++ b/apps/server/src/jobs/index.ts @@ -3,8 +3,6 @@ import { JobMap } from '@/types/jobs'; import { sendEmailHandler } from '@/jobs/send-email'; import { cleanWorkspaceDataHandler } from '@/jobs/clean-workspace-data'; -import { createCollaborationsHandler } from '@/jobs/create-collaborations'; -import { deleteCollaborationsHandler } from '@/jobs/delete-collaborations'; type JobHandlerMap = { [K in keyof JobMap]: JobHandler; @@ -13,6 +11,4 @@ type JobHandlerMap = { export const jobHandlerMap: JobHandlerMap = { send_email: sendEmailHandler, clean_workspace_data: cleanWorkspaceDataHandler, - create_collaborations: createCollaborationsHandler, - delete_collaborations: deleteCollaborationsHandler, }; diff --git a/apps/server/src/lib/collaborations.ts b/apps/server/src/lib/collaborations.ts deleted file mode 100644 index 5d666f79..00000000 --- a/apps/server/src/lib/collaborations.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { CreateCollaboration } from '@/data/schema'; -import { CollaborationAttributes, NodeType, registry } from '@colanode/core'; -import { YDoc } from '@colanode/crdt'; - -export const buildDefaultCollaboration = ( - userId: string, - nodeId: string, - type: NodeType, - workspaceId: string -): CreateCollaboration => { - const model = registry.getCollaborationModel(type); - - const attributes: CollaborationAttributes = { - type, - }; - - const ydoc = new YDoc(); - ydoc.updateAttributes(model.schema, attributes); - - return { - user_id: userId, - node_id: nodeId, - workspace_id: workspaceId, - attributes: JSON.stringify(ydoc.getAttributes()), - state: ydoc.getState(), - created_at: new Date(), - }; -}; diff --git a/apps/server/src/lib/nodes.ts b/apps/server/src/lib/nodes.ts index f34f2a32..734af81c 100644 --- a/apps/server/src/lib/nodes.ts +++ b/apps/server/src/lib/nodes.ts @@ -1,13 +1,13 @@ import { database } from '@/data/database'; import { - SelectCollaboration, + SelectCollaborationRevocation, SelectNode, SelectNodeTransaction, } from '@/data/schema'; import { NodeCollaborator } from '@/types/nodes'; import { NodeOutput, - ServerCollaboration, + ServerCollaborationRevocation, ServerNodeTransaction, } from '@colanode/core'; import { @@ -51,63 +51,62 @@ export const mapNode = (node: SelectNode): Node => { export const mapNodeTransaction = ( transaction: SelectNodeTransaction ): ServerNodeTransaction => { - if (transaction.type === 'create' && transaction.data) { + if (transaction.operation === 'create' && transaction.data) { return { id: transaction.id, - type: 'create', + operation: 'create', nodeId: transaction.node_id, + nodeType: transaction.node_type, workspaceId: transaction.workspace_id, data: encodeState(transaction.data), createdAt: transaction.created_at.toISOString(), createdBy: transaction.created_by, serverCreatedAt: transaction.server_created_at.toISOString(), - number: transaction.number.toString(), + version: transaction.version.toString(), }; } - if (transaction.type === 'update' && transaction.data) { + if (transaction.operation === 'update' && transaction.data) { return { id: transaction.id, - type: 'update', + operation: 'update', nodeId: transaction.node_id, + nodeType: transaction.node_type, workspaceId: transaction.workspace_id, data: encodeState(transaction.data), createdAt: transaction.created_at.toISOString(), createdBy: transaction.created_by, serverCreatedAt: transaction.server_created_at.toISOString(), - number: transaction.number.toString(), + version: transaction.version.toString(), }; } - if (transaction.type === 'delete') { + if (transaction.operation === 'delete') { return { id: transaction.id, - type: 'delete', + operation: 'delete', nodeId: transaction.node_id, + nodeType: transaction.node_type, workspaceId: transaction.workspace_id, createdAt: transaction.created_at.toISOString(), createdBy: transaction.created_by, serverCreatedAt: transaction.server_created_at.toISOString(), - number: transaction.number.toString(), + version: transaction.version.toString(), }; } throw new Error('Unknown transaction type'); }; -export const mapCollaboration = ( - collaboration: SelectCollaboration -): ServerCollaboration => { +export const mapCollaborationRevocation = ( + revocation: SelectCollaborationRevocation +): ServerCollaborationRevocation => { return { - userId: collaboration.user_id, - nodeId: collaboration.node_id, - type: collaboration.type, - workspaceId: collaboration.workspace_id, - state: encodeState(collaboration.state), - createdAt: collaboration.created_at.toISOString(), - updatedAt: collaboration.updated_at?.toISOString() ?? null, - deletedAt: collaboration.deleted_at?.toISOString() ?? null, - number: collaboration.number.toString(), + userId: revocation.user_id, + nodeId: revocation.node_id, + workspaceId: revocation.workspace_id, + createdAt: revocation.created_at.toISOString(), + version: revocation.version.toString(), }; }; diff --git a/apps/server/src/routes/sync.ts b/apps/server/src/routes/sync.ts index 69f06fd4..0e8aae57 100644 --- a/apps/server/src/routes/sync.ts +++ b/apps/server/src/routes/sync.ts @@ -71,11 +71,11 @@ const handleLocalNodeTransaction = async ( workspaceUser: SelectWorkspaceUser, transaction: LocalNodeTransaction ): Promise => { - if (transaction.type === 'create') { + if (transaction.operation === 'create') { return await handleCreateNodeTransaction(workspaceUser, transaction); - } else if (transaction.type === 'update') { + } else if (transaction.operation === 'update') { return await handleUpdateNodeTransaction(workspaceUser, transaction); - } else if (transaction.type === 'delete') { + } else if (transaction.operation === 'delete') { return await handleDeleteNodeTransaction(workspaceUser, transaction); } else { return 'error'; diff --git a/apps/server/src/services/node-service.ts b/apps/server/src/services/node-service.ts index fff44089..1214bf33 100644 --- a/apps/server/src/services/node-service.ts +++ b/apps/server/src/services/node-service.ts @@ -2,9 +2,9 @@ import { extractNodeCollaborators, generateId, IdType, - Node, NodeAttributes, NodeMutationContext, + NodeRole, registry, } from '@colanode/core'; import { decodeState, YDoc } from '@colanode/crdt'; @@ -12,11 +12,11 @@ import { CreateCollaboration, CreateNode, CreateNodeTransaction, - SelectCollaboration, + DatabaseSchema, SelectWorkspaceUser, } from '@/data/schema'; -import { cloneDeep, difference } from 'lodash-es'; -import { fetchWorkspaceUsers, mapNode } from '@/lib/nodes'; +import { cloneDeep } from 'lodash-es'; +import { mapNode } from '@/lib/nodes'; import { database } from '@/data/database'; import { fetchNodeAncestors } from '@/lib/nodes'; import { @@ -31,10 +31,9 @@ import { UpdateNodeInput, UpdateNodeOutput, } from '@/types/nodes'; -import { buildDefaultCollaboration } from '@/lib/collaborations'; import { eventBus } from '@/lib/event-bus'; import { logService } from '@/services/log'; -import { jobService } from '@/services/job-service'; +import { sql, Transaction } from 'kysely'; const UPDATE_RETRIES_LIMIT = 10; @@ -44,8 +43,9 @@ type UpdateResult = { }; type CollaboratorChangeResult = { - removedCollaborators: string[]; - addedCollaborators: string[]; + addedCollaborators: Record; + updatedCollaborators: Record; + removedCollaborators: Record; }; class NodeService { @@ -57,7 +57,8 @@ class NodeService { const model = registry.getNodeModel(input.attributes.type); const ydoc = new YDoc(); const update = ydoc.updateAttributes(model.schema, input.attributes); - const attributesJson = JSON.stringify(ydoc.getAttributes()); + const attributes = ydoc.getAttributes(); + const attributesJson = JSON.stringify(attributes); const date = new Date(); const transactionId = generateId(IdType.Transaction); @@ -74,27 +75,21 @@ class NodeService { const createTransaction: CreateNodeTransaction = { id: transactionId, node_id: input.nodeId, + node_type: input.attributes.type, workspace_id: input.workspaceId, - type: 'create', + operation: 'create', data: update, created_at: date, created_by: input.userId, server_created_at: date, }; - const createCollaborations = await this.buildCollaborations( - input.nodeId, - input.workspaceId, - input.attributes, - input.ancestors - ); - try { - const { createdNode, createdTransaction, createdCollaborations } = + const { createdNode, createdTransaction } = await this.applyDatabaseCreateTransaction( + attributes, createNode, - createTransaction, - createCollaborations + createTransaction ); eventBus.publish({ @@ -104,19 +99,9 @@ class NodeService { workspaceId: input.workspaceId, }); - for (const collaboration of createdCollaborations) { - eventBus.publish({ - type: 'collaboration_created', - userId: collaboration.user_id, - nodeId: collaboration.node_id, - workspaceId: collaboration.workspace_id, - }); - } - return { node: createdNode, transaction: createdTransaction, - createdCollaborations: createdCollaborations, }; } catch (error) { this.logger.error(error, 'Failed to create node transaction'); @@ -187,12 +172,10 @@ class NodeService { const date = new Date(); const transactionId = generateId(IdType.Transaction); - const { addedCollaborators, removedCollaborators } = - this.checkCollaboratorChanges( - node, - ancestors.filter((a) => a.id !== input.nodeId), - attributes - ); + const collaboratorChanges = this.checkCollaboratorChanges( + node.attributes, + attributes + ); try { const { updatedNode, createdTransaction } = await database @@ -221,8 +204,9 @@ class NodeService { .values({ id: transactionId, node_id: input.nodeId, + node_type: node.type, workspace_id: input.workspaceId, - type: 'update', + operation: 'update', data: update, created_at: date, created_by: input.userId, @@ -234,6 +218,12 @@ class NodeService { throw new Error('Failed to create transaction'); } + await this.applyCollaborationUpdates( + trx, + input.nodeId, + collaboratorChanges + ); + return { updatedNode, createdTransaction, @@ -247,24 +237,6 @@ class NodeService { workspaceId: input.workspaceId, }); - for (const addedCollaborator of addedCollaborators) { - jobService.addJob({ - type: 'create_collaborations', - nodeId: input.nodeId, - userId: addedCollaborator, - workspaceId: input.workspaceId, - }); - } - - for (const removedCollaborator of removedCollaborators) { - jobService.addJob({ - type: 'delete_collaborations', - nodeId: input.nodeId, - userId: removedCollaborator, - workspaceId: input.workspaceId, - }); - } - return { type: 'success', output: { @@ -319,8 +291,9 @@ class NodeService { const createTransaction: CreateNodeTransaction = { id: input.id, node_id: input.nodeId, + node_type: attributes.type, workspace_id: context.workspaceId, - type: 'create', + operation: 'create', data: typeof input.data === 'string' ? decodeState(input.data) : input.data, created_at: input.createdAt, @@ -328,20 +301,12 @@ class NodeService { server_created_at: new Date(), }; - const createCollaborations: CreateCollaboration[] = - await this.buildCollaborations( - input.nodeId, - context.workspaceId, - attributes, - ancestors - ); - try { - const { createdNode, createdTransaction, createdCollaborations } = + const { createdNode, createdTransaction } = await this.applyDatabaseCreateTransaction( + attributes, createNode, - createTransaction, - createCollaborations + createTransaction ); eventBus.publish({ @@ -351,19 +316,9 @@ class NodeService { workspaceId: context.workspaceId, }); - for (const collaboration of createdCollaborations) { - eventBus.publish({ - type: 'collaboration_created', - userId: collaboration.user_id, - nodeId: collaboration.node_id, - workspaceId: collaboration.workspace_id, - }); - } - return { node: createdNode, transaction: createdTransaction, - collaborations: createdCollaborations, }; } catch (error) { this.logger.error(error, 'Failed to apply node create transaction'); @@ -412,7 +367,7 @@ class NodeService { .selectFrom('node_transactions') .selectAll() .where('node_id', '=', input.nodeId) - .orderBy('number', 'asc') + .orderBy('version', 'asc') .execute(); const ydoc = new YDoc(); @@ -445,12 +400,10 @@ class NodeService { return { type: 'error', output: null }; } - const { addedCollaborators, removedCollaborators } = - this.checkCollaboratorChanges( - node, - ancestors.filter((a) => a.id !== input.nodeId), - attributes - ); + const collaboratorChanges = this.checkCollaboratorChanges( + node.attributes, + attributes + ); try { const { updatedNode, createdTransaction } = await database @@ -480,7 +433,8 @@ class NodeService { id: input.id, node_id: input.nodeId, workspace_id: context.workspaceId, - type: 'update', + node_type: node.type, + operation: 'update', data: typeof input.data === 'string' ? decodeState(input.data) @@ -495,6 +449,12 @@ class NodeService { throw new Error('Failed to create transaction'); } + await this.applyCollaborationUpdates( + trx, + input.nodeId, + collaboratorChanges + ); + return { updatedNode, createdTransaction, @@ -508,24 +468,6 @@ class NodeService { workspaceId: context.workspaceId, }); - for (const addedCollaborator of addedCollaborators) { - jobService.addJob({ - type: 'create_collaborations', - nodeId: input.nodeId, - userId: addedCollaborator, - workspaceId: context.workspaceId, - }); - } - - for (const removedCollaborator of removedCollaborators) { - jobService.addJob({ - type: 'delete_collaborations', - nodeId: input.nodeId, - userId: removedCollaborator, - workspaceId: context.workspaceId, - }); - } - return { type: 'success', output: { @@ -534,6 +476,7 @@ class NodeService { }, }; } catch (error) { + console.log('error', error); return { type: 'retry', output: null }; } } @@ -562,8 +505,9 @@ class NodeService { return null; } - const { deletedNode, createdTransaction, updatedCollaborations } = - await database.transaction().execute(async (trx) => { + const { deletedNode, createdTransaction } = await database + .transaction() + .execute(async (trx) => { const deletedNode = await trx .deleteFrom('nodes') .returningAll() @@ -586,7 +530,8 @@ class NodeService { id: input.id, node_id: input.nodeId, workspace_id: workspaceUser.workspace_id, - type: 'delete', + node_type: node.type, + operation: 'delete', created_at: input.createdAt, created_by: workspaceUser.id, server_created_at: new Date(), @@ -597,19 +542,9 @@ class NodeService { throw new Error('Failed to create transaction'); } - const updatedCollaborations = await trx - .updateTable('collaborations') - .returningAll() - .set({ - deleted_at: input.createdAt, - }) - .where('node_id', '=', input.nodeId) - .execute(); - return { deletedNode, createdTransaction, - updatedCollaborations, }; }); @@ -620,27 +555,27 @@ class NodeService { workspaceId: workspaceUser.workspace_id, }); - for (const collaboration of updatedCollaborations) { - eventBus.publish({ - type: 'collaboration_updated', - userId: collaboration.user_id, - nodeId: collaboration.node_id, - workspaceId: collaboration.workspace_id, - }); - } - return { node: deletedNode, transaction: createdTransaction, - updatedCollaborations, }; } private async applyDatabaseCreateTransaction( + attributes: NodeAttributes, node: CreateNode, - transaction: CreateNodeTransaction, - collaborations: CreateCollaboration[] + transaction: CreateNodeTransaction ) { + const collaborationsToCreate: CreateCollaboration[] = Object.entries( + extractNodeCollaborators(attributes) + ).map(([userId, role]) => ({ + user_id: userId, + node_id: node.id, + workspace_id: node.workspace_id, + roles: JSON.stringify({ [node.id]: role }), + created_at: new Date(), + })); + return await database.transaction().execute(async (trx) => { const createdNode = await trx .insertInto('nodes') @@ -662,128 +597,138 @@ class NodeService { throw new Error('Failed to create transaction'); } - let createdCollaborations: SelectCollaboration[] = []; - if (collaborations.length > 0) { - createdCollaborations = await trx + if (collaborationsToCreate.length > 0) { + const createdCollaborations = await trx .insertInto('collaborations') .returningAll() - .values(collaborations) + .values(collaborationsToCreate) .execute(); - if (createdCollaborations.length !== collaborations.length) { + if (createdCollaborations.length !== collaborationsToCreate.length) { throw new Error('Failed to create collaborations'); } } - return { createdNode, createdTransaction, createdCollaborations }; + await sql` + INSERT INTO collaborations (user_id, node_id, workspace_id, roles, created_at) + SELECT + c.user_id, + ${node.id} as node_id, + ${node.workspace_id} as workspace_id, + c.roles, + ${new Date()} as created_at + FROM collaborations as c + WHERE c.node_id = ${attributes.parentId} + ON CONFLICT (user_id, node_id) DO UPDATE + SET + roles = collaborations.roles || EXCLUDED.roles, + updated_at = NOW() + `.execute(trx); + + return { createdNode, createdTransaction }; }); } - private async buildCollaborations( + private async applyCollaborationUpdates( + transaction: Transaction, nodeId: string, - workspaceId: string, - attributes: NodeAttributes, - ancestors: Node[] + updateResult: CollaboratorChangeResult ) { - if (attributes.type === 'user') { - return this.buildUserCollaborations(nodeId, workspaceId); + for (const [userId, role] of Object.entries( + updateResult.addedCollaborators + )) { + const roles = JSON.stringify({ [nodeId]: role }); + await sql` + INSERT INTO collaborations (user_id, node_id, workspace_id, roles, created_at) + SELECT + ${userId} as user_id, + np.descendant_id as node_id, + np.workspace_id as workspace_id, + ${roles}, + ${new Date()} as created_at + FROM node_paths as np + WHERE np.ancestor_id = ${nodeId} + ON CONFLICT (user_id, node_id) DO UPDATE + SET + roles = collaborations.roles || EXCLUDED.roles, + updated_at = NOW() + `.execute(transaction); } - return this.buildNodeCollaborations( - nodeId, - workspaceId, - attributes, - ancestors - ); - } - - private async buildUserCollaborations( - userId: string, - workspaceId: string - ): Promise { - const createCollaborations: CreateCollaboration[] = []; - createCollaborations.push( - buildDefaultCollaboration(userId, workspaceId, 'workspace', workspaceId) - ); - - const workspaceUserIds = await fetchWorkspaceUsers(workspaceId); - - for (const workspaceUserId of workspaceUserIds) { - createCollaborations.push( - buildDefaultCollaboration(workspaceUserId, userId, 'user', workspaceId) - ); - - if (workspaceUserId === userId) { - continue; - } - - createCollaborations.push( - buildDefaultCollaboration(userId, workspaceUserId, 'user', workspaceId) - ); + for (const [userId, role] of Object.entries( + updateResult.updatedCollaborators + )) { + const roles = JSON.stringify({ [nodeId]: role }); + await sql` + INSERT INTO collaborations (user_id, node_id, workspace_id, roles, created_at) + SELECT + ${userId} as user_id, + np.descendant_id as node_id, + np.workspace_id as workspace_id, + ${roles}, + ${new Date()} as created_at + FROM node_paths as np + WHERE np.ancestor_id = ${nodeId} + ON CONFLICT (user_id, node_id) DO UPDATE + SET + roles = collaborations.roles || EXCLUDED.roles, + updated_at = NOW() + `.execute(transaction); } - return createCollaborations; - } - - private buildNodeCollaborations( - nodeId: string, - workspaceId: string, - attributes: NodeAttributes, - ancestors: Node[] - ): CreateCollaboration[] { - const collaborators = extractNodeCollaborators([ - ...ancestors.map((a) => a.attributes), - attributes, - ]); - - const collaboratorIds = Object.keys(collaborators); - const createCollaborations: CreateCollaboration[] = collaboratorIds.map( - (userId) => - buildDefaultCollaboration(userId, nodeId, attributes.type, workspaceId) + const removedCollaboratorIds = Object.keys( + updateResult.removedCollaborators ); - return createCollaborations; + if (removedCollaboratorIds.length > 0) { + await sql` + UPDATE collaborations + SET + roles = collaborations.roles - ${nodeId}, + updated_at = NOW() + WHERE user_id IN (${sql.join(removedCollaboratorIds, sql`, `)}) + AND node_id IN + ( + SELECT np.descendant_id + FROM node_paths as np + WHERE np.ancestor_id = ${nodeId} + ) + `.execute(transaction); + } } private checkCollaboratorChanges( - node: Node, - ancestors: Node[], - newAttributes: NodeAttributes + beforeAttributes: NodeAttributes, + afterAttributes: NodeAttributes ): CollaboratorChangeResult { - const beforeCollaborators = Object.keys( - extractNodeCollaborators(node.attributes) - ); + const beforeCollaborators = extractNodeCollaborators(beforeAttributes); + const afterCollaborators = extractNodeCollaborators(afterAttributes); - const afterCollaborators = Object.keys( - extractNodeCollaborators(newAttributes) - ); + const addedCollaborators: Record = {}; + const updatedCollaborators: Record = {}; + const removedCollaborators: Record = {}; - if (beforeCollaborators.length === 0 && afterCollaborators.length === 0) { - return { removedCollaborators: [], addedCollaborators: [] }; + // Check for added and updated collaborators + for (const [userId, newRole] of Object.entries(afterCollaborators)) { + if (!(userId in beforeCollaborators)) { + addedCollaborators[userId] = newRole; + } else if (beforeCollaborators[userId] !== newRole) { + updatedCollaborators[userId] = newRole; + } } - const addedCollaborators = difference( - afterCollaborators, - beforeCollaborators - ); - - const removedCollaborators = difference( - beforeCollaborators, - afterCollaborators - ); - - if (addedCollaborators.length === 0 && removedCollaborators.length === 0) { - return { removedCollaborators: [], addedCollaborators: [] }; + // Check for removed collaborators + for (const [userId, oldRole] of Object.entries(beforeCollaborators)) { + if (!(userId in afterCollaborators)) { + removedCollaborators[userId] = oldRole; + } } - const inheritedCollaborators = Object.keys( - extractNodeCollaborators(ancestors.map((a) => a.attributes)) - ); - - const added = difference(addedCollaborators, inheritedCollaborators); - const removed = difference(removedCollaborators, inheritedCollaborators); - - return { removedCollaborators: removed, addedCollaborators: added }; + return { + addedCollaborators, + updatedCollaborators, + removedCollaborators, + }; } } diff --git a/apps/server/src/services/synapse.ts b/apps/server/src/services/synapse.ts index b8081a87..8a9c0528 100644 --- a/apps/server/src/services/synapse.ts +++ b/apps/server/src/services/synapse.ts @@ -3,23 +3,19 @@ import { Server } from 'http'; import { WebSocketServer, WebSocket } from 'ws'; import { verifyToken } from '@/lib/tokens'; import { - CollaborationsBatchMessage, + CollaborationRevocationsBatchMessage, Message, NodeTransactionsBatchMessage, } from '@colanode/core'; import { logService } from '@/services/log'; -import { mapCollaboration, mapNodeTransaction } from '@/lib/nodes'; +import { mapCollaborationRevocation, mapNodeTransaction } from '@/lib/nodes'; import { eventBus } from '@/lib/event-bus'; -import { - CollaborationCreatedEvent, - CollaborationUpdatedEvent, - NodeTransactionCreatedEvent, -} from '@/types/events'; +import { NodeTransactionCreatedEvent } from '@/types/events'; interface SynapseUserCursor { workspaceId: string; userId: string; - cursor: string | null; + cursor: string; syncing: boolean; } @@ -27,8 +23,8 @@ interface SynapseConnection { accountId: string; deviceId: string; socket: WebSocket; - nodeTransactions: Map; - collaborations: Map; + transactions: Map; + revocations: Map; } class SynapseService { @@ -39,10 +35,6 @@ class SynapseService { eventBus.subscribe((event) => { if (event.type === 'node_transaction_created') { this.handleNodeTransactionCreatedEvent(event); - } else if (event.type === 'collaboration_created') { - this.handleCollaborationCreatedEvent(event); - } else if (event.type === 'collaboration_updated') { - this.handleCollaborationUpdatedEvent(event); } }); } @@ -114,8 +106,8 @@ class SynapseService { accountId: account.id, deviceId: account.deviceId, socket, - nodeTransactions: new Map(), - collaborations: new Map(), + transactions: new Map(), + revocations: new Map(), }; this.connections.set(account.deviceId, connection); @@ -139,9 +131,9 @@ class SynapseService { this.logger.trace(message, `Socket message from ${connection.deviceId}`); if (message.type === 'fetch_node_transactions') { - const state = connection.nodeTransactions.get(message.userId); + const state = connection.transactions.get(message.userId); if (!state) { - connection.nodeTransactions.set(message.userId, { + connection.transactions.set(message.userId, { userId: message.userId, workspaceId: message.workspaceId, cursor: message.cursor, @@ -153,20 +145,20 @@ class SynapseService { state.cursor = message.cursor; this.sendPendingTransactions(connection, message.userId); } - } else if (message.type === 'fetch_collaborations') { - const state = connection.collaborations.get(message.userId); + } else if (message.type === 'fetch_collaboration_revocations') { + const state = connection.revocations.get(message.userId); if (!state) { - connection.collaborations.set(message.userId, { + connection.revocations.set(message.userId, { userId: message.userId, workspaceId: message.workspaceId, cursor: message.cursor, syncing: false, }); - this.sendPendingCollaborations(connection, message.userId); + this.sendPendingRevocations(connection, message.userId); } else if (!state.syncing && state.cursor !== message.cursor) { state.cursor = message.cursor; - this.sendPendingCollaborations(connection, message.userId); + this.sendPendingRevocations(connection, message.userId); } } } @@ -175,7 +167,7 @@ class SynapseService { connection: SynapseConnection, userId: string ) { - const state = connection.nodeTransactions.get(userId); + const state = connection.transactions.get(userId); if (!state || state.syncing) { return; } @@ -186,19 +178,23 @@ class SynapseService { `Sending pending node transactions for ${connection.deviceId} with ${userId}` ); - let query = database + const unsyncedTransactions = await database .selectFrom('node_transactions as nt') - .innerJoin('collaborations as c', (join) => + .leftJoin('collaborations as c', (join) => join.on('c.user_id', '=', userId).onRef('c.node_id', '=', 'nt.node_id') ) - .selectAll('nt'); - - if (state.cursor) { - query = query.where('nt.number', '>', BigInt(state.cursor)); - } - - const unsyncedTransactions = await query - .orderBy('nt.number', 'asc') + .selectAll('nt') + .where((eb) => + eb.or([ + eb.and([ + eb('nt.workspace_id', '=', state.workspaceId), + eb('nt.node_type', 'in', ['workspace', 'user']), + ]), + eb('c.node_id', '=', eb.ref('nt.node_id')), + ]) + ) + .where('nt.version', '>', BigInt(state.cursor)) + .orderBy('nt.version', 'asc') .limit(20) .execute(); @@ -214,15 +210,15 @@ class SynapseService { transactions, }; - connection.nodeTransactions.delete(userId); + connection.transactions.delete(userId); this.sendMessage(connection, message); } - private async sendPendingCollaborations( + private async sendPendingRevocations( connection: SynapseConnection, userId: string ) { - const state = connection.collaborations.get(userId); + const state = connection.revocations.get(userId); if (!state || state.syncing) { return; } @@ -230,36 +226,31 @@ class SynapseService { state.syncing = true; this.logger.trace( state, - `Sending pending collaborations for ${connection.deviceId} with ${userId}` + `Sending pending collaboration revocations for ${connection.deviceId} with ${userId}` ); - let query = database - .selectFrom('collaborations as c') + const unsyncedRevocations = await database + .selectFrom('collaboration_revocations as cr') .selectAll() - .where('c.user_id', '=', userId); - - if (state.cursor) { - query = query.where('c.number', '>', BigInt(state.cursor)); - } - - const unsyncedCollaborations = await query - .orderBy('c.number', 'asc') + .where('cr.user_id', '=', userId) + .where('cr.version', '>', BigInt(state.cursor)) + .orderBy('cr.version', 'asc') .limit(20) .execute(); - if (unsyncedCollaborations.length === 0) { + if (unsyncedRevocations.length === 0) { state.syncing = false; return; } - const collaborations = unsyncedCollaborations.map(mapCollaboration); - const message: CollaborationsBatchMessage = { - type: 'collaborations_batch', + const revocations = unsyncedRevocations.map(mapCollaborationRevocation); + const message: CollaborationRevocationsBatchMessage = { + type: 'collaboration_revocations_batch', userId, - collaborations, + revocations, }; - connection.collaborations.delete(userId); + connection.revocations.delete(userId); this.sendMessage(connection, message); } @@ -299,44 +290,12 @@ class SynapseService { } } - private handleCollaborationCreatedEvent(event: CollaborationCreatedEvent) { - const userDevices = this.getPendingCollaborationCursors(event.userId); - if (userDevices.length === 0) { - return; - } - - for (const deviceId of userDevices) { - const socketConnection = this.connections.get(deviceId); - if (socketConnection === undefined) { - continue; - } - - this.sendPendingCollaborations(socketConnection, event.userId); - } - } - - private handleCollaborationUpdatedEvent(event: CollaborationUpdatedEvent) { - const userDevices = this.getPendingCollaborationCursors(event.userId); - if (userDevices.length === 0) { - return; - } - - for (const deviceId of userDevices) { - const socketConnection = this.connections.get(deviceId); - if (socketConnection === undefined) { - continue; - } - - this.sendPendingCollaborations(socketConnection, event.userId); - } - } - private getPendingNodeTransactionCursors( workspaceId: string ): Map { const userDevices = new Map(); for (const connection of this.connections.values()) { - const connectionUsers = connection.nodeTransactions.values(); + const connectionUsers = connection.transactions.values(); for (const user of connectionUsers) { if (user.workspaceId !== workspaceId || user.syncing) { continue; @@ -354,7 +313,7 @@ class SynapseService { private getPendingCollaborationCursors(userId: string): string[] { const userDevices: string[] = []; for (const connection of this.connections.values()) { - const connectionUsers = connection.collaborations.values(); + const connectionUsers = connection.revocations.values(); for (const user of connectionUsers) { if (user.userId !== userId || user.syncing) { continue; diff --git a/apps/server/src/types/nodes.ts b/apps/server/src/types/nodes.ts index 7e290198..50b5a22b 100644 --- a/apps/server/src/types/nodes.ts +++ b/apps/server/src/types/nodes.ts @@ -1,8 +1,4 @@ -import { - SelectCollaboration, - SelectNode, - SelectNodeTransaction, -} from '@/data/schema'; +import { SelectNode, SelectNodeTransaction } from '@/data/schema'; import { Node, NodeAttributes } from '@colanode/core'; export type NodeCollaborator = { @@ -22,7 +18,6 @@ export type CreateNodeInput = { export type CreateNodeOutput = { node: SelectNode; transaction: SelectNodeTransaction; - createdCollaborations: SelectCollaboration[]; }; export type UpdateNodeInput = { @@ -47,7 +42,6 @@ export type ApplyNodeCreateTransactionInput = { export type ApplyNodeCreateTransactionOutput = { node: SelectNode; transaction: SelectNodeTransaction; - collaborations: SelectCollaboration[]; }; export type ApplyNodeUpdateTransactionInput = { @@ -72,5 +66,4 @@ export type ApplyNodeDeleteTransactionInput = { export type ApplyNodeDeleteTransactionOutput = { node: SelectNode; transaction: SelectNodeTransaction; - updatedCollaborations: SelectCollaboration[]; }; diff --git a/packages/core/src/types/messages.ts b/packages/core/src/types/messages.ts index 68e81a4d..bb206087 100644 --- a/packages/core/src/types/messages.ts +++ b/packages/core/src/types/messages.ts @@ -1,17 +1,17 @@ -import { ServerCollaboration, ServerNodeTransaction } from './sync'; +import { ServerCollaborationRevocation, ServerNodeTransaction } from './sync'; export type FetchNodeTransactionsMessage = { type: 'fetch_node_transactions'; userId: string; workspaceId: string; - cursor: string | null; + cursor: string; }; -export type FetchCollaborationsMessage = { - type: 'fetch_collaborations'; +export type FetchCollaborationRevocationsMessage = { + type: 'fetch_collaboration_revocations'; userId: string; workspaceId: string; - cursor: string | null; + cursor: string; }; export type NodeTransactionsBatchMessage = { @@ -20,14 +20,14 @@ export type NodeTransactionsBatchMessage = { transactions: ServerNodeTransaction[]; }; -export type CollaborationsBatchMessage = { - type: 'collaborations_batch'; +export type CollaborationRevocationsBatchMessage = { + type: 'collaboration_revocations_batch'; userId: string; - collaborations: ServerCollaboration[]; + revocations: ServerCollaborationRevocation[]; }; export type Message = | FetchNodeTransactionsMessage | NodeTransactionsBatchMessage - | FetchCollaborationsMessage - | CollaborationsBatchMessage; + | FetchCollaborationRevocationsMessage + | CollaborationRevocationsBatchMessage; diff --git a/packages/core/src/types/sync.ts b/packages/core/src/types/sync.ts index 35e1aec1..35f38425 100644 --- a/packages/core/src/types/sync.ts +++ b/packages/core/src/types/sync.ts @@ -21,7 +21,8 @@ export type LocalNodeTransaction = export type LocalCreateNodeTransaction = { id: string; nodeId: string; - type: 'create'; + nodeType: string; + operation: 'create'; data: string; createdAt: string; createdBy: string; @@ -30,7 +31,8 @@ export type LocalCreateNodeTransaction = { export type LocalUpdateNodeTransaction = { id: string; nodeId: string; - type: 'update'; + nodeType: string; + operation: 'update'; data: string; createdAt: string; createdBy: string; @@ -39,44 +41,48 @@ export type LocalUpdateNodeTransaction = { export type LocalDeleteNodeTransaction = { id: string; nodeId: string; - type: 'delete'; + nodeType: string; + operation: 'delete'; createdAt: string; createdBy: string; }; export type ServerNodeCreateTransaction = { id: string; - type: 'create'; + operation: 'create'; nodeId: string; + nodeType: string; workspaceId: string; data: string; createdAt: string; createdBy: string; serverCreatedAt: string; - number: string; + version: string; }; export type ServerNodeUpdateTransaction = { id: string; - type: 'update'; + operation: 'update'; nodeId: string; + nodeType: string; workspaceId: string; data: string; createdAt: string; createdBy: string; serverCreatedAt: string; - number: string; + version: string; }; export type ServerNodeDeleteTransaction = { id: string; - type: 'delete'; + operation: 'delete'; nodeId: string; + nodeType: string; workspaceId: string; createdAt: string; createdBy: string; serverCreatedAt: string; - number: string; + version: string; }; export type ServerNodeTransaction = @@ -84,14 +90,10 @@ export type ServerNodeTransaction = | ServerNodeUpdateTransaction | ServerNodeDeleteTransaction; -export type ServerCollaboration = { +export type ServerCollaborationRevocation = { userId: string; nodeId: string; - type: string; workspaceId: string; - state: string; createdAt: string; - updatedAt: string | null; - deletedAt: string | null; - number: string; + version: string; };