From b086819e0945a3edb2b66088c0a9baea514134e3 Mon Sep 17 00:00:00 2001 From: Hakan Shehu Date: Mon, 13 Jan 2025 20:46:43 +0100 Subject: [PATCH] Optimize mutations --- .../src/main/data/workspace/migrations.ts | 1 - .../desktop/src/main/data/workspace/schema.ts | 1 - .../src/main/jobs/sync-pending-mutations.ts | 314 +++++++++++++++--- .../mutations/entries/entry-mark-opened.ts | 1 - .../main/mutations/entries/entry-mark-seen.ts | 1 - .../src/main/mutations/files/file-create.ts | 1 - .../src/main/mutations/files/file-delete.ts | 1 - .../main/mutations/files/file-mark-opened.ts | 1 - .../main/mutations/files/file-mark-seen.ts | 1 - .../main/mutations/messages/message-create.ts | 2 - .../main/mutations/messages/message-delete.ts | 1 - .../mutations/messages/message-mark-seen.ts | 1 - .../messages/message-reaction-create.ts | 1 - .../messages/message-reaction-delete.ts | 1 - .../src/main/services/entry-service.ts | 3 - scripts/src/seed/index.ts | 4 +- 16 files changed, 272 insertions(+), 63 deletions(-) diff --git a/apps/desktop/src/main/data/workspace/migrations.ts b/apps/desktop/src/main/data/workspace/migrations.ts index 00ea6b67..9f32ec3c 100644 --- a/apps/desktop/src/main/data/workspace/migrations.ts +++ b/apps/desktop/src/main/data/workspace/migrations.ts @@ -282,7 +282,6 @@ const createMutationsTable: Migration = { .createTable('mutations') .addColumn('id', 'text', (col) => col.notNull().primaryKey()) .addColumn('type', 'text', (col) => col.notNull()) - .addColumn('node_id', 'text', (col) => col.notNull()) .addColumn('data', 'text', (col) => col.notNull()) .addColumn('created_at', 'text', (col) => col.notNull()) .addColumn('retries', 'integer', (col) => col.notNull()) diff --git a/apps/desktop/src/main/data/workspace/schema.ts b/apps/desktop/src/main/data/workspace/schema.ts index 4dd08f4b..800e8d89 100644 --- a/apps/desktop/src/main/data/workspace/schema.ts +++ b/apps/desktop/src/main/data/workspace/schema.ts @@ -207,7 +207,6 @@ export type UpdateFileInteraction = Updateable; interface MutationTable { id: ColumnType; type: ColumnType; - node_id: ColumnType; data: ColumnType; created_at: ColumnType; retries: ColumnType; diff --git a/apps/desktop/src/main/jobs/sync-pending-mutations.ts b/apps/desktop/src/main/jobs/sync-pending-mutations.ts index 47a1b081..da631197 100644 --- a/apps/desktop/src/main/jobs/sync-pending-mutations.ts +++ b/apps/desktop/src/main/jobs/sync-pending-mutations.ts @@ -20,6 +20,9 @@ declare module '@/main/jobs' { } } +const READ_SIZE = 500; +const BATCH_SIZE = 50; + export class SyncPendingMutationsJobHandler implements JobHandler { @@ -29,6 +32,16 @@ export class SyncPendingMutationsJobHandler private readonly debug = createDebugger('job:sync-pending-mutations'); public async handleJob(input: SyncPendingMutationsInput) { + let hasMore = true; + + while (hasMore) { + hasMore = await this.syncMutations(input); + } + } + + private async syncMutations( + input: SyncPendingMutationsInput + ): Promise { this.debug(`Sending local pending mutations for user ${input.userId}`); const workspaceDatabase = await databaseService.getWorkspaceDatabase( @@ -39,11 +52,26 @@ export class SyncPendingMutationsJobHandler .selectFrom('mutations') .selectAll() .orderBy('id', 'asc') - .limit(20) + .limit(READ_SIZE) .execute(); if (unsyncedMutations.length === 0) { - return; + return false; + } + + const allMutations: Mutation[] = unsyncedMutations.map(mapMutation); + const { validMutations, deletedMutationIds } = + await this.consolidateMutations(allMutations); + + if (deletedMutationIds.size > 0) { + this.debug( + `Deleting ${deletedMutationIds.size} redundant local pending mutations for user ${input.userId}` + ); + + await workspaceDatabase + .deleteFrom('mutations') + .where('id', 'in', Array.from(deletedMutationIds)) + .execute(); } this.debug( @@ -55,60 +83,260 @@ export class SyncPendingMutationsJobHandler this.debug( `No workspace credentials found for user ${input.userId}, skipping sending local pending mutations` ); - return; + return false; } if (!serverService.isAvailable(credentials.serverDomain)) { this.debug( `Server ${credentials.serverDomain} is not available, skipping sending local pending mutations` ); - return; + return false; } - const mutations: Mutation[] = unsyncedMutations.map(mapMutation); - const { data } = await httpClient.post( - `/v1/workspaces/${credentials.workspaceId}/mutations`, - { - mutations, - }, - { - domain: credentials.serverDomain, - token: credentials.token, + const totalBatches = Math.ceil(validMutations.length / BATCH_SIZE); + let currentBatch = 1; + + try { + while (validMutations.length > 0) { + const batch = validMutations.splice(0, BATCH_SIZE); + + this.debug( + `Sending batch ${currentBatch++} of ${totalBatches} mutations for user ${input.userId}` + ); + + const { data } = await httpClient.post( + `/v1/workspaces/${credentials.workspaceId}/mutations`, + { + mutations: batch, + }, + { + domain: credentials.serverDomain, + token: credentials.token, + } + ); + + const syncedMutationIds: string[] = []; + const unsyncedMutationIds: string[] = []; + + for (const result of data.results) { + if (result.status === 'success') { + syncedMutationIds.push(result.id); + } else { + unsyncedMutationIds.push(result.id); + } + } + + if (syncedMutationIds.length > 0) { + this.debug( + `Marking ${syncedMutationIds.length} local pending mutations as sent for user ${input.userId}` + ); + + await workspaceDatabase + .deleteFrom('mutations') + .where('id', 'in', syncedMutationIds) + .execute(); + } + + if (unsyncedMutationIds.length > 0) { + this.debug( + `Marking ${unsyncedMutationIds.length} local pending mutations as failed for user ${input.userId}` + ); + + await workspaceDatabase + .updateTable('mutations') + .set((eb) => ({ retries: eb('retries', '+', 1) })) + .where('id', 'in', unsyncedMutationIds) + .execute(); + } } - ); + } catch (error) { + this.debug( + `Failed to send local pending mutations for user ${input.userId}: ${error}` + ); + } - const syncedMutationIds: string[] = []; - const unsyncedMutationIds: string[] = []; + return unsyncedMutations.length === READ_SIZE; + } - for (const result of data.results) { - if (result.status === 'success') { - syncedMutationIds.push(result.id); - } else { - unsyncedMutationIds.push(result.id); + private async consolidateMutations(mutations: Mutation[]) { + const validMutations: Mutation[] = []; + const deletedMutationIds: Set = new Set(); + + for (let i = mutations.length - 1; i >= 0; i--) { + const mutation = mutations[i]; + if (!mutation) { + continue; + } + + if (deletedMutationIds.has(mutation.id)) { + continue; + } + + if (mutation.type === 'apply_delete_transaction') { + for (let j = i - 1; j >= 0; j--) { + const previousMutation = mutations[j]; + if (!previousMutation) { + continue; + } + + if ( + previousMutation.type === 'apply_create_transaction' && + previousMutation.data.id === mutation.data.id + ) { + deletedMutationIds.add(mutation.id); + deletedMutationIds.add(previousMutation.id); + } else if ( + previousMutation.type === 'apply_update_transaction' && + previousMutation.data.id === mutation.data.id + ) { + deletedMutationIds.add(previousMutation.id); + } else if ( + previousMutation.type === 'mark_entry_opened' && + previousMutation.data.entryId === mutation.data.id + ) { + deletedMutationIds.add(previousMutation.id); + } else if ( + previousMutation.type === 'mark_entry_seen' && + previousMutation.data.entryId === mutation.data.id + ) { + deletedMutationIds.add(previousMutation.id); + } + } + } else if (mutation.type === 'delete_file') { + for (let j = i - 1; j >= 0; j--) { + const previousMutation = mutations[j]; + if (!previousMutation) { + continue; + } + + if ( + previousMutation.type === 'create_file' && + previousMutation.data.id === mutation.data.id + ) { + deletedMutationIds.add(mutation.id); + deletedMutationIds.add(previousMutation.id); + } else if ( + previousMutation.type === 'mark_file_seen' && + previousMutation.data.fileId === mutation.data.id + ) { + deletedMutationIds.add(previousMutation.id); + } else if ( + previousMutation.type === 'mark_file_opened' && + previousMutation.data.fileId === mutation.data.id + ) { + deletedMutationIds.add(previousMutation.id); + } + } + } else if (mutation.type === 'delete_message') { + for (let j = i - 1; j >= 0; j--) { + const previousMutation = mutations[j]; + if (!previousMutation) { + continue; + } + + if ( + previousMutation.type === 'create_message' && + previousMutation.data.id === mutation.data.id + ) { + deletedMutationIds.add(mutation.id); + deletedMutationIds.add(previousMutation.id); + } else if ( + previousMutation.type === 'mark_message_seen' && + previousMutation.data.messageId === mutation.data.id + ) { + deletedMutationIds.add(previousMutation.id); + } else if ( + previousMutation.type === 'create_message_reaction' && + previousMutation.data.messageId === mutation.data.id + ) { + deletedMutationIds.add(previousMutation.id); + } else if ( + previousMutation.type === 'delete_message_reaction' && + previousMutation.data.messageId === mutation.data.id + ) { + deletedMutationIds.add(previousMutation.id); + } + } + } else if (mutation.type === 'mark_entry_seen') { + for (let j = i - 1; j >= 0; j--) { + const previousMutation = mutations[j]; + if (!previousMutation) { + continue; + } + + if ( + previousMutation.type === 'mark_entry_seen' && + previousMutation.data.entryId === mutation.data.entryId + ) { + deletedMutationIds.add(previousMutation.id); + } + } + } else if (mutation.type === 'mark_entry_opened') { + for (let j = i - 1; j >= 0; j--) { + const previousMutation = mutations[j]; + if (!previousMutation) { + continue; + } + + if ( + previousMutation.type === 'mark_entry_opened' && + previousMutation.data.entryId === mutation.data.entryId + ) { + deletedMutationIds.add(previousMutation.id); + } + } + } else if (mutation.type === 'mark_file_seen') { + for (let j = i - 1; j >= 0; j--) { + const previousMutation = mutations[j]; + if (!previousMutation) { + continue; + } + + if ( + previousMutation.type === 'mark_file_seen' && + previousMutation.data.fileId === mutation.data.fileId + ) { + deletedMutationIds.add(previousMutation.id); + } + } + } else if (mutation.type === 'mark_file_opened') { + for (let j = i - 1; j >= 0; j--) { + const previousMutation = mutations[j]; + if (!previousMutation) { + continue; + } + + if ( + previousMutation.type === 'mark_file_opened' && + previousMutation.data.fileId === mutation.data.fileId + ) { + deletedMutationIds.add(previousMutation.id); + } + } + } else if (mutation.type === 'mark_message_seen') { + for (let j = i - 1; j >= 0; j--) { + const previousMutation = mutations[j]; + if (!previousMutation) { + continue; + } + + if ( + previousMutation.type === 'mark_message_seen' && + previousMutation.data.messageId === mutation.data.messageId + ) { + deletedMutationIds.add(previousMutation.id); + } + } + } + + if (!deletedMutationIds.has(mutation.id)) { + validMutations.push(mutation); } } - if (syncedMutationIds.length > 0) { - this.debug( - `Marking ${syncedMutationIds.length} local pending mutations as sent for user ${input.userId}` - ); - - await workspaceDatabase - .deleteFrom('mutations') - .where('id', 'in', syncedMutationIds) - .execute(); - } - - if (unsyncedMutationIds.length > 0) { - this.debug( - `Marking ${unsyncedMutationIds.length} local pending mutations as failed for user ${input.userId}` - ); - - await workspaceDatabase - .updateTable('mutations') - .set((eb) => ({ retries: eb('retries', '+', 1) })) - .where('id', 'in', unsyncedMutationIds) - .execute(); - } + return { + validMutations, + deletedMutationIds, + }; } } diff --git a/apps/desktop/src/main/mutations/entries/entry-mark-opened.ts b/apps/desktop/src/main/mutations/entries/entry-mark-opened.ts index 3fb89a84..cf7204de 100644 --- a/apps/desktop/src/main/mutations/entries/entry-mark-opened.ts +++ b/apps/desktop/src/main/mutations/entries/entry-mark-opened.ts @@ -100,7 +100,6 @@ export class EntryMarkOpenedMutationHandler type: mutation.type, data: JSON.stringify(mutation.data), created_at: mutation.createdAt, - node_id: input.entryId, retries: 0, }) .executeTakeFirst(); diff --git a/apps/desktop/src/main/mutations/entries/entry-mark-seen.ts b/apps/desktop/src/main/mutations/entries/entry-mark-seen.ts index ae66279b..a9dba475 100644 --- a/apps/desktop/src/main/mutations/entries/entry-mark-seen.ts +++ b/apps/desktop/src/main/mutations/entries/entry-mark-seen.ts @@ -100,7 +100,6 @@ export class EntryMarkSeenMutationHandler type: mutation.type, data: JSON.stringify(mutation.data), created_at: mutation.createdAt, - node_id: input.entryId, retries: 0, }) .executeTakeFirst(); diff --git a/apps/desktop/src/main/mutations/files/file-create.ts b/apps/desktop/src/main/mutations/files/file-create.ts index bd63dcf6..54a98a39 100644 --- a/apps/desktop/src/main/mutations/files/file-create.ts +++ b/apps/desktop/src/main/mutations/files/file-create.ts @@ -181,7 +181,6 @@ export class FileCreateMutationHandler .values({ id: generateId(IdType.Mutation), type: 'create_file', - node_id: fileId, data: JSON.stringify(mutationData), created_at: new Date().toISOString(), retries: 0, diff --git a/apps/desktop/src/main/mutations/files/file-delete.ts b/apps/desktop/src/main/mutations/files/file-delete.ts index 5fef77e5..5ed57f93 100644 --- a/apps/desktop/src/main/mutations/files/file-delete.ts +++ b/apps/desktop/src/main/mutations/files/file-delete.ts @@ -104,7 +104,6 @@ export class FileDeleteMutationHandler .values({ id: generateId(IdType.Mutation), type: 'delete_file', - node_id: input.fileId, data: JSON.stringify(deleteFileMutationData), created_at: deletedAt, retries: 0, diff --git a/apps/desktop/src/main/mutations/files/file-mark-opened.ts b/apps/desktop/src/main/mutations/files/file-mark-opened.ts index def9cb0a..2245afa9 100644 --- a/apps/desktop/src/main/mutations/files/file-mark-opened.ts +++ b/apps/desktop/src/main/mutations/files/file-mark-opened.ts @@ -100,7 +100,6 @@ export class FileMarkOpenedMutationHandler type: mutation.type, data: JSON.stringify(mutation.data), created_at: mutation.createdAt, - node_id: input.fileId, retries: 0, }) .executeTakeFirst(); diff --git a/apps/desktop/src/main/mutations/files/file-mark-seen.ts b/apps/desktop/src/main/mutations/files/file-mark-seen.ts index 0c657652..2205f03a 100644 --- a/apps/desktop/src/main/mutations/files/file-mark-seen.ts +++ b/apps/desktop/src/main/mutations/files/file-mark-seen.ts @@ -100,7 +100,6 @@ export class FileMarkSeenMutationHandler type: mutation.type, data: JSON.stringify(mutation.data), created_at: mutation.createdAt, - node_id: input.fileId, retries: 0, }) .executeTakeFirst(); diff --git a/apps/desktop/src/main/mutations/messages/message-create.ts b/apps/desktop/src/main/mutations/messages/message-create.ts index 73f18f1a..f3d554f5 100644 --- a/apps/desktop/src/main/mutations/messages/message-create.ts +++ b/apps/desktop/src/main/mutations/messages/message-create.ts @@ -163,7 +163,6 @@ export class MessageCreateMutationHandler fileMutations.push({ id: generateId(IdType.Mutation), type: 'create_file', - node_id: fileId, data: JSON.stringify(mutationData), created_at: createdAt, retries: 0, @@ -258,7 +257,6 @@ export class MessageCreateMutationHandler .values({ id: generateId(IdType.Mutation), type: 'create_message', - node_id: messageId, data: JSON.stringify(createMessageMutationData), created_at: createdAt, retries: 0, diff --git a/apps/desktop/src/main/mutations/messages/message-delete.ts b/apps/desktop/src/main/mutations/messages/message-delete.ts index e01f330f..c84c2a34 100644 --- a/apps/desktop/src/main/mutations/messages/message-delete.ts +++ b/apps/desktop/src/main/mutations/messages/message-delete.ts @@ -102,7 +102,6 @@ export class MessageDeleteMutationHandler .values({ id: generateId(IdType.Mutation), type: 'delete_message', - node_id: input.messageId, data: JSON.stringify(deleteMessageMutationData), created_at: deletedAt, retries: 0, diff --git a/apps/desktop/src/main/mutations/messages/message-mark-seen.ts b/apps/desktop/src/main/mutations/messages/message-mark-seen.ts index c72d5414..58d3369a 100644 --- a/apps/desktop/src/main/mutations/messages/message-mark-seen.ts +++ b/apps/desktop/src/main/mutations/messages/message-mark-seen.ts @@ -100,7 +100,6 @@ export class MessageMarkSeenMutationHandler type: mutation.type, data: JSON.stringify(mutation.data), created_at: mutation.createdAt, - node_id: input.messageId, retries: 0, }) .executeTakeFirst(); diff --git a/apps/desktop/src/main/mutations/messages/message-reaction-create.ts b/apps/desktop/src/main/mutations/messages/message-reaction-create.ts index 41f9bfa1..9f73862b 100644 --- a/apps/desktop/src/main/mutations/messages/message-reaction-create.ts +++ b/apps/desktop/src/main/mutations/messages/message-reaction-create.ts @@ -132,7 +132,6 @@ export class MessageReactionCreateMutationHandler type: mutation.type, data: JSON.stringify(mutation.data), created_at: mutation.createdAt, - node_id: input.messageId, retries: 0, }) .executeTakeFirst(); diff --git a/apps/desktop/src/main/mutations/messages/message-reaction-delete.ts b/apps/desktop/src/main/mutations/messages/message-reaction-delete.ts index a6007332..4646238e 100644 --- a/apps/desktop/src/main/mutations/messages/message-reaction-delete.ts +++ b/apps/desktop/src/main/mutations/messages/message-reaction-delete.ts @@ -75,7 +75,6 @@ export class MessageReactionDeleteMutationHandler type: mutation.type, data: JSON.stringify(mutation.data), created_at: mutation.createdAt, - node_id: input.messageId, retries: 0, }) .executeTakeFirst(); diff --git a/apps/desktop/src/main/services/entry-service.ts b/apps/desktop/src/main/services/entry-service.ts index 68a252cb..a93edfa3 100644 --- a/apps/desktop/src/main/services/entry-service.ts +++ b/apps/desktop/src/main/services/entry-service.ts @@ -147,7 +147,6 @@ class EntryService { .returningAll() .values({ id: generateId(IdType.Mutation), - node_id: input.id, type: 'apply_create_transaction', data: JSON.stringify(mapEntryTransaction(createdTransaction)), created_at: createdAt, @@ -325,7 +324,6 @@ class EntryService { .returningAll() .values({ id: generateId(IdType.Mutation), - node_id: entryId, type: 'apply_update_transaction', data: JSON.stringify(mapEntryTransaction(createdTransaction)), created_at: updatedAt, @@ -457,7 +455,6 @@ class EntryService { .returningAll() .values({ id: generateId(IdType.Mutation), - node_id: entryId, type: 'apply_delete_transaction', data: JSON.stringify(mapEntryTransaction(createdTransaction)), created_at: new Date().toISOString(), diff --git a/scripts/src/seed/index.ts b/scripts/src/seed/index.ts index 7a9101dc..23f98a84 100644 --- a/scripts/src/seed/index.ts +++ b/scripts/src/seed/index.ts @@ -146,7 +146,7 @@ const sendMutations = async (user: User, workspaceId: string) => { const batch = remainingMutations.splice(0, batchSize); console.log( - `Sending batch ${currentBatch} of ${totalBatches} mutations for user ${user.login.account.email}` + `Sending batch ${currentBatch++} of ${totalBatches} mutations for user ${user.login.account.email}` ); await axios.post( @@ -158,8 +158,6 @@ const sendMutations = async (user: User, workspaceId: string) => { headers: { Authorization: `Bearer ${user.login.token}` }, } ); - - currentBatch++; } };