From 712d0da8aaf7362bacaf63e18a9cc268b39b21fb Mon Sep 17 00:00:00 2001 From: Hakan Shehu Date: Fri, 15 Nov 2024 10:49:07 +0100 Subject: [PATCH] Improve sync service and introduce account sync --- .../main/handlers/mutations/account-logout.ts | 32 +++ .../main/handlers/mutations/account-update.ts | 2 +- .../main/handlers/mutations/email-login.ts | 36 ++- .../main/handlers/mutations/email-register.ts | 36 ++- .../src/main/handlers/mutations/index.ts | 4 +- .../src/main/handlers/mutations/logout.ts | 119 -------- .../src/main/services/account-service.ts | 228 ++++++++++++++++ .../desktop/src/main/services/sync-service.ts | 255 +----------------- .../src/main/services/workspace-service.ts | 240 +++++++++++++++++ apps/desktop/src/main/utils.ts | 40 ++- .../components/accounts/account-logout.tsx | 2 +- .../src/shared/mutations/account-logout.ts | 17 ++ apps/desktop/src/shared/mutations/logout.ts | 17 -- apps/desktop/src/shared/types/accounts.ts | 13 - apps/server/src/routes/accounts.ts | 91 ++++++- apps/server/src/routes/workspaces.ts | 2 +- packages/core/src/index.ts | 1 + .../core}/src/types/accounts.ts | 12 +- 18 files changed, 726 insertions(+), 421 deletions(-) create mode 100644 apps/desktop/src/main/handlers/mutations/account-logout.ts delete mode 100644 apps/desktop/src/main/handlers/mutations/logout.ts create mode 100644 apps/desktop/src/main/services/account-service.ts create mode 100644 apps/desktop/src/main/services/workspace-service.ts create mode 100644 apps/desktop/src/shared/mutations/account-logout.ts delete mode 100644 apps/desktop/src/shared/mutations/logout.ts rename {apps/server => packages/core}/src/types/accounts.ts (83%) diff --git a/apps/desktop/src/main/handlers/mutations/account-logout.ts b/apps/desktop/src/main/handlers/mutations/account-logout.ts new file mode 100644 index 00000000..dd22f9ce --- /dev/null +++ b/apps/desktop/src/main/handlers/mutations/account-logout.ts @@ -0,0 +1,32 @@ +import { databaseService } from '@/main/data/database-service'; +import { + AccountLogoutMutationInput, + AccountLogoutMutationOutput, +} from '@/shared/mutations/account-logout'; +import { MutationHandler } from '@/main/types'; +import { accountService } from '@/main/services/account-service'; + +export class AccountLogoutMutationHandler + implements MutationHandler +{ + async handleMutation( + input: AccountLogoutMutationInput + ): Promise { + const account = await databaseService.appDatabase + .selectFrom('accounts') + .selectAll() + .where('id', '=', input.accountId) + .executeTakeFirst(); + + if (!account) { + return { + success: false, + }; + } + + await accountService.logoutAccount(account); + return { + success: true, + }; + } +} diff --git a/apps/desktop/src/main/handlers/mutations/account-update.ts b/apps/desktop/src/main/handlers/mutations/account-update.ts index 8282e2ef..0deeec45 100644 --- a/apps/desktop/src/main/handlers/mutations/account-update.ts +++ b/apps/desktop/src/main/handlers/mutations/account-update.ts @@ -1,7 +1,7 @@ import { databaseService } from '@/main/data/database-service'; import { httpClient } from '@/shared/lib/http-client'; import { MutationHandler } from '@/main/types'; -import { AccountUpdateOutput } from '@/shared/types/accounts'; +import { AccountUpdateOutput } from '@colanode/core'; import { AccountUpdateMutationInput, AccountUpdateMutationOutput, diff --git a/apps/desktop/src/main/handlers/mutations/email-login.ts b/apps/desktop/src/main/handlers/mutations/email-login.ts index 35aa20b8..fbfd4fd8 100644 --- a/apps/desktop/src/main/handlers/mutations/email-login.ts +++ b/apps/desktop/src/main/handlers/mutations/email-login.ts @@ -1,4 +1,4 @@ -import { LoginOutput } from '@/shared/types/accounts'; +import { LoginOutput } from '@colanode/core'; import { databaseService } from '@/main/data/database-service'; import { EmailLoginMutationInput, @@ -7,6 +7,7 @@ import { import { MutationHandler } from '@/main/types'; import { httpClient } from '@/shared/lib/http-client'; import { eventBus } from '@/shared/lib/event-bus'; +import { Account } from '@/shared/types/accounts'; export class EmailLoginMutationHandler implements MutationHandler @@ -38,20 +39,37 @@ export class EmailLoginMutationHandler } ); + let account: Account | undefined; await databaseService.appDatabase.transaction().execute(async (trx) => { - await trx + const createdAccount = await trx .insertInto('accounts') + .returningAll() .values({ id: data.account.id, name: data.account.name, avatar: data.account.avatar, - device_id: data.account.deviceId, + device_id: data.deviceId, email: data.account.email, - token: data.account.token, + token: data.token, server: server.domain, status: 'active', }) - .execute(); + .executeTakeFirst(); + + if (!createdAccount) { + throw new Error('Failed to create account!'); + } + + account = { + id: createdAccount.id, + name: createdAccount.name, + email: createdAccount.email, + avatar: createdAccount.avatar, + deviceId: data.deviceId, + token: data.token, + status: 'active', + server: server.domain, + }; if (data.workspaces.length === 0) { return; @@ -74,9 +92,13 @@ export class EmailLoginMutationHandler .execute(); }); + if (!account) { + throw new Error('Failed to create account!'); + } + eventBus.publish({ type: 'account_created', - account: data.account, + account, }); if (data.workspaces.length > 0) { @@ -97,7 +119,7 @@ export class EmailLoginMutationHandler return { success: true, - account: data.account, + account, workspaces: data.workspaces, }; } diff --git a/apps/desktop/src/main/handlers/mutations/email-register.ts b/apps/desktop/src/main/handlers/mutations/email-register.ts index 019c762e..ba104be6 100644 --- a/apps/desktop/src/main/handlers/mutations/email-register.ts +++ b/apps/desktop/src/main/handlers/mutations/email-register.ts @@ -1,4 +1,4 @@ -import { LoginOutput } from '@/shared/types/accounts'; +import { LoginOutput } from '@colanode/core'; import { databaseService } from '@/main/data/database-service'; import { httpClient } from '@/shared/lib/http-client'; import { @@ -7,6 +7,7 @@ import { } from '@/shared/mutations/email-register'; import { MutationHandler } from '@/main/types'; import { eventBus } from '@/shared/lib/event-bus'; +import { Account } from '@/shared/types/accounts'; export class EmailRegisterMutationHandler implements MutationHandler @@ -39,20 +40,37 @@ export class EmailRegisterMutationHandler } ); + let account: Account | undefined; await databaseService.appDatabase.transaction().execute(async (trx) => { - await trx + const createdAccount = await trx .insertInto('accounts') + .returningAll() .values({ id: data.account.id, name: data.account.name, avatar: data.account.avatar, - device_id: data.account.deviceId, + device_id: data.deviceId, email: data.account.email, - token: data.account.token, + token: data.token, server: server.domain, status: 'active', }) - .execute(); + .executeTakeFirst(); + + if (!createdAccount) { + throw new Error('Failed to create account!'); + } + + account = { + id: createdAccount.id, + name: createdAccount.name, + email: createdAccount.email, + avatar: createdAccount.avatar, + deviceId: data.deviceId, + token: data.token, + status: 'active', + server: server.domain, + }; if (data.workspaces.length === 0) { return; @@ -75,9 +93,13 @@ export class EmailRegisterMutationHandler .execute(); }); + if (!account) { + throw new Error('Failed to create account!'); + } + eventBus.publish({ type: 'account_created', - account: data.account, + account, }); if (data.workspaces.length > 0) { @@ -98,7 +120,7 @@ export class EmailRegisterMutationHandler return { success: true, - account: data.account, + account, workspaces: data.workspaces, }; } diff --git a/apps/desktop/src/main/handlers/mutations/index.ts b/apps/desktop/src/main/handlers/mutations/index.ts index 6f1b247f..f7bd0639 100644 --- a/apps/desktop/src/main/handlers/mutations/index.ts +++ b/apps/desktop/src/main/handlers/mutations/index.ts @@ -27,7 +27,7 @@ import { WorkspaceCreateMutationHandler } from '@/main/handlers/mutations/worksp import { WorkspaceUpdateMutationHandler } from '@/main/handlers/mutations/workspace-update'; import { DocumentSaveMutationHandler } from '@/main/handlers/mutations/document-save'; import { AvatarUploadMutationHandler } from '@/main/handlers/mutations/avatar-upload'; -import { LogoutMutationHandler } from '@/main/handlers/mutations/logout'; +import { AccountLogoutMutationHandler } from '@/main/handlers/mutations/account-logout'; import { ServerNodeSyncMutationHandler } from '@/main/handlers/mutations/server-node-sync'; import { ServerNodeDeleteMutationHandler } from '@/main/handlers/mutations/server-node-delete'; import { FolderCreateMutationHandler } from '@/main/handlers/mutations/folder-create'; @@ -72,7 +72,7 @@ export const mutationHandlerMap: MutationHandlerMap = { workspace_update: new WorkspaceUpdateMutationHandler(), document_save: new DocumentSaveMutationHandler(), avatar_upload: new AvatarUploadMutationHandler(), - logout: new LogoutMutationHandler(), + account_logout: new AccountLogoutMutationHandler(), server_node_sync: new ServerNodeSyncMutationHandler(), server_node_delete: new ServerNodeDeleteMutationHandler(), folder_create: new FolderCreateMutationHandler(), diff --git a/apps/desktop/src/main/handlers/mutations/logout.ts b/apps/desktop/src/main/handlers/mutations/logout.ts deleted file mode 100644 index 4848c009..00000000 --- a/apps/desktop/src/main/handlers/mutations/logout.ts +++ /dev/null @@ -1,119 +0,0 @@ -import fs from 'fs'; -import { databaseService } from '@/main/data/database-service'; -import { - LogoutMutationInput, - LogoutMutationOutput, -} from '@/shared/mutations/logout'; -import { MutationHandler } from '@/main/types'; -import { - getAccountAvatarsDirectoryPath, - getWorkspaceDirectoryPath, -} from '@/main/utils'; -import { eventBus } from '@/shared/lib/event-bus'; - -export class LogoutMutationHandler - implements MutationHandler -{ - async handleMutation( - input: LogoutMutationInput - ): Promise { - const account = await databaseService.appDatabase - .selectFrom('accounts') - .selectAll() - .where('id', '=', input.accountId) - .executeTakeFirst(); - - if (!account) { - return { - success: false, - }; - } - - const workspaces = await databaseService.appDatabase - .selectFrom('workspaces') - .selectAll() - .where('account_id', '=', account.id) - .execute(); - - for (const workspace of workspaces) { - await databaseService.deleteWorkspaceDatabase(workspace.user_id); - - const workspaceDir = getWorkspaceDirectoryPath(workspace.user_id); - if (fs.existsSync(workspaceDir)) { - fs.rmSync(workspaceDir, { recursive: true }); - } - } - - const avatarsDir = getAccountAvatarsDirectoryPath(account.id); - if (fs.existsSync(avatarsDir)) { - fs.rmSync(avatarsDir, { recursive: true }); - } - - const deletedAccount = await databaseService.appDatabase - .deleteFrom('accounts') - .returningAll() - .where('id', '=', account.id) - .executeTakeFirst(); - - if (!deletedAccount) { - return { - success: false, - }; - } - - eventBus.publish({ - type: 'account_deleted', - account: { - id: deletedAccount.id, - server: deletedAccount.server, - name: deletedAccount.name, - email: deletedAccount.email, - avatar: deletedAccount.avatar, - token: deletedAccount.token, - deviceId: deletedAccount.device_id, - status: deletedAccount.status, - }, - }); - - const deletedWorkspaces = await databaseService.appDatabase - .deleteFrom('workspaces') - .where('account_id', '=', account.id) - .execute(); - - if (deletedWorkspaces.length !== workspaces.length) { - return { - success: false, - }; - } - - for (const workspace of workspaces) { - eventBus.publish({ - type: 'workspace_deleted', - workspace: { - id: workspace.workspace_id, - userId: workspace.user_id, - name: workspace.name, - avatar: workspace.avatar, - description: workspace.description, - role: workspace.role, - versionId: workspace.version_id, - accountId: workspace.account_id, - }, - }); - } - - await databaseService.appDatabase - .insertInto('deleted_tokens') - .values({ - token: account.token, - account_id: account.id, - server: account.server, - created_at: new Date().toISOString(), - }) - .execute(); - - return { - success: true, - }; - } -} diff --git a/apps/desktop/src/main/services/account-service.ts b/apps/desktop/src/main/services/account-service.ts new file mode 100644 index 00000000..dbfdec45 --- /dev/null +++ b/apps/desktop/src/main/services/account-service.ts @@ -0,0 +1,228 @@ +import fs from 'fs'; +import { databaseService } from '@/main/data/database-service'; +import { SelectAccount } from '@/main/data/app/schema'; +import { AccountSyncOutput } from '@colanode/core'; +import { httpClient } from '@/shared/lib/http-client'; +import { mapAccount, mapWorkspace } from '@/main/utils'; +import { getAccountAvatarsDirectoryPath } from '@/main/utils'; +import { eventBus } from '@/shared/lib/event-bus'; +import { workspaceService } from '@/main/services/workspace-service'; + +class AccountService { + async syncAccounts() { + const accounts = await databaseService.appDatabase + .selectFrom('accounts') + .selectAll() + .execute(); + + for (const account of accounts) { + this.syncAccount(account); + } + } + + private async syncAccount(account: SelectAccount) { + const server = await databaseService.appDatabase + .selectFrom('servers') + .selectAll() + .where('domain', '=', account.server) + .executeTakeFirst(); + + if (!server) { + throw new Error('Server not found!'); + } + + const { data, status } = await httpClient.get( + '/v1/accounts/sync', + { + serverDomain: server.domain, + serverAttributes: server.attributes, + token: account.token, + } + ); + + if (status >= 400 && status < 500) { + await this.logoutAccount(account); + return; + } + + if (status !== 200) { + return; + } + + const currentWorkspaces = await databaseService.appDatabase + .selectFrom('workspaces') + .selectAll() + .where('account_id', '=', account.id) + .execute(); + + const updatedAccount = await databaseService.appDatabase + .updateTable('accounts') + .returningAll() + .set({ + name: data.account.name, + avatar: data.account.avatar, + }) + .where('id', '=', account.id) + .executeTakeFirst(); + + if (!updatedAccount) { + return; + } + + eventBus.publish({ + type: 'account_updated', + account: mapAccount(updatedAccount), + }); + + for (const workspace of data.workspaces) { + const currentWorkspace = currentWorkspaces.find( + (w) => w.workspace_id === workspace.id + ); + + if (!currentWorkspace) { + // create workspace here + const createdWorkspace = await databaseService.appDatabase + .insertInto('workspaces') + .values({ + workspace_id: workspace.id, + user_id: workspace.user.id, + account_id: account.id, + name: workspace.name, + avatar: workspace.avatar, + description: workspace.description, + role: workspace.user.role, + version_id: workspace.versionId, + }) + .returningAll() + .executeTakeFirst(); + + if (!createdWorkspace) { + return; + } + + eventBus.publish({ + type: 'workspace_created', + workspace: mapWorkspace(createdWorkspace), + }); + } else { + // update workspace here + const updatedWorkspace = await databaseService.appDatabase + .updateTable('workspaces') + .returningAll() + .set({ + name: workspace.name, + avatar: workspace.avatar, + description: workspace.description, + role: workspace.user.role, + version_id: workspace.versionId, + }) + .where('user_id', '=', currentWorkspace.user_id) + .executeTakeFirst(); + + if (!updatedWorkspace) { + return; + } + + eventBus.publish({ + type: 'workspace_updated', + workspace: mapWorkspace(updatedWorkspace), + }); + } + } + + for (const workspace of currentWorkspaces) { + const updatedWorkspace = data.workspaces.find( + (w) => w.id === workspace.workspace_id + ); + + if (!updatedWorkspace) { + await workspaceService.deleteWorkspace(workspace.user_id); + } + } + } + + public async logoutAccount(account: SelectAccount): Promise { + const workspaces = await databaseService.appDatabase + .selectFrom('workspaces') + .select(['user_id']) + .where('account_id', '=', account.id) + .execute(); + + for (const workspace of workspaces) { + await workspaceService.deleteWorkspace(workspace.user_id); + } + + const avatarsDir = getAccountAvatarsDirectoryPath(account.id); + if (fs.existsSync(avatarsDir)) { + fs.rmSync(avatarsDir, { recursive: true }); + } + + const deletedAccount = await databaseService.appDatabase + .deleteFrom('accounts') + .returningAll() + .where('id', '=', account.id) + .executeTakeFirst(); + + if (!deletedAccount) { + return false; + } + + eventBus.publish({ + type: 'account_deleted', + account: mapAccount(deletedAccount), + }); + + await databaseService.appDatabase + .insertInto('deleted_tokens') + .values({ + token: account.token, + account_id: account.id, + server: account.server, + created_at: new Date().toISOString(), + }) + .execute(); + + return true; + } + + public async syncDeletedTokens() { + const deletedTokens = await databaseService.appDatabase + .selectFrom('deleted_tokens') + .innerJoin('servers', 'deleted_tokens.server', 'servers.domain') + .select([ + 'deleted_tokens.token', + 'deleted_tokens.account_id', + 'servers.domain', + 'servers.attributes', + ]) + .execute(); + + if (deletedTokens.length === 0) { + return; + } + + for (const deletedToken of deletedTokens) { + try { + const { status } = await httpClient.delete(`/v1/accounts/logout`, { + serverDomain: deletedToken.domain, + serverAttributes: deletedToken.attributes, + token: deletedToken.token, + }); + + if (status !== 200) { + return; + } + + await databaseService.appDatabase + .deleteFrom('deleted_tokens') + .where('token', '=', deletedToken.token) + .where('account_id', '=', deletedToken.account_id) + .execute(); + } catch (error) { + // console.log('error', error); + } + } + } +} + +export const accountService = new AccountService(); diff --git a/apps/desktop/src/main/services/sync-service.ts b/apps/desktop/src/main/services/sync-service.ts index 323134b7..4fbd6f5e 100644 --- a/apps/desktop/src/main/services/sync-service.ts +++ b/apps/desktop/src/main/services/sync-service.ts @@ -1,28 +1,14 @@ -import { httpClient } from '@/shared/lib/http-client'; -import { databaseService } from '@/main/data/database-service'; -import { LocalChange, SyncChangesOutput } from '@colanode/core'; -import { - SelectChange, - WorkspaceDatabaseSchema, -} from '@/main/data/workspace/schema'; -import { Kysely } from 'kysely'; -import { eventBus } from '@/shared/lib/event-bus'; +import { accountService } from '@/main/services/account-service'; +import { workspaceService } from '@/main/services/workspace-service'; -type SyncState = { - isSyncing: boolean; - scheduledSync: boolean; -}; +// one minute +const EVENT_LOOP_INTERVAL = 1000 * 60; class SyncService { private initiated: boolean = false; - private syncStates: Map = new Map(); constructor() { - eventBus.subscribe((event) => { - if (event.type === 'change_created') { - this.syncWorkspace(event.userId); - } - }); + this.executeEventLoop = this.executeEventLoop.bind(this); } public init() { @@ -31,238 +17,19 @@ class SyncService { } this.initiated = true; - this.syncAllWorkspaces(); + setTimeout(this.executeEventLoop, 10); } - private async syncAllWorkspaces() { - const workspaces = await databaseService.appDatabase - .selectFrom('workspaces') - .select(['user_id']) - .execute(); - - for (const workspace of workspaces) { - this.syncWorkspace(workspace.user_id); - } - } - - public async syncWorkspace(userId: string) { - if (!this.syncStates.has(userId)) { - this.syncStates.set(userId, { - isSyncing: false, - scheduledSync: false, - }); - } - - const syncState = this.syncStates.get(userId)!; - if (syncState.isSyncing) { - syncState.scheduledSync = true; - return; - } - - syncState.isSyncing = true; + private async executeEventLoop() { try { - await this.syncWorkspaceChanges(userId); + await accountService.syncAccounts(); + await accountService.syncDeletedTokens(); + await workspaceService.syncAllWorkspaces(); } catch (error) { console.log('error', error); - } finally { - syncState.isSyncing = false; - - if (syncState.scheduledSync) { - syncState.scheduledSync = false; - this.syncWorkspace(userId); - } - } - } - - // private async syncDeletedTokens() { - // const deletedTokens = await databaseService.appDatabase - // .selectFrom('deleted_tokens') - // .innerJoin('servers', 'deleted_tokens.server', 'servers.domain') - // .select([ - // 'deleted_tokens.token', - // 'deleted_tokens.account_id', - // 'servers.domain', - // 'servers.attributes', - // ]) - // .execute(); - - // if (deletedTokens.length === 0) { - // return; - // } - - // for (const deletedToken of deletedTokens) { - // try { - // const { status } = await httpClient.delete(`/v1/accounts/logout`, { - // serverDomain: deletedToken.domain, - // serverAttributes: deletedToken.attributes, - // token: deletedToken.token, - // }); - - // if (status !== 200) { - // return; - // } - - // await databaseService.appDatabase - // .deleteFrom('deleted_tokens') - // .where('token', '=', deletedToken.token) - // .where('account_id', '=', deletedToken.account_id) - // .execute(); - // } catch (error) { - // // console.log('error', error); - // } - // } - // } - - private async syncWorkspaceChanges(userId: string) { - const workspaceDatabase = - await databaseService.getWorkspaceDatabase(userId); - - const changes = - await this.fetchAndCompactWorkspaceChanges(workspaceDatabase); - if (changes.length === 0) { - return; } - const workspace = await databaseService.appDatabase - .selectFrom('workspaces') - .innerJoin('accounts', 'workspaces.account_id', 'accounts.id') - .innerJoin('servers', 'accounts.server', 'servers.domain') - .select([ - 'workspaces.workspace_id', - 'workspaces.user_id', - 'workspaces.account_id', - 'accounts.token', - 'servers.domain', - 'servers.attributes', - ]) - .where('workspaces.user_id', '=', userId) - .executeTakeFirst(); - - if (!workspace) { - return; - } - - while (changes.length > 0) { - const changesToSync = changes.splice(0, 20); - const { data } = await httpClient.post( - `/v1/sync/${workspace.workspace_id}`, - { - changes: changesToSync, - }, - { - serverDomain: workspace.domain, - serverAttributes: workspace.attributes, - token: workspace.token, - } - ); - - const syncedChangeIds: number[] = []; - const unsyncedChangeIds: number[] = []; - for (const result of data.results) { - if (result.status === 'success') { - syncedChangeIds.push(result.id); - } else { - unsyncedChangeIds.push(result.id); - } - } - - if (syncedChangeIds.length > 0) { - await workspaceDatabase - .deleteFrom('changes') - .where('id', 'in', syncedChangeIds) - .execute(); - } - - if (unsyncedChangeIds.length > 0) { - await workspaceDatabase - .updateTable('changes') - .set((eb) => ({ retry_count: eb('retry_count', '+', 1) })) - .where('id', 'in', unsyncedChangeIds) - .execute(); - - //we just delete changes that have failed to sync for more than 5 times. - //in the future we might need to revert the change locally. - await workspaceDatabase - .deleteFrom('changes') - .where('retry_count', '>=', 5) - .execute(); - } - } - } - - private async fetchAndCompactWorkspaceChanges( - database: Kysely - ): Promise { - const changeRows = await database - .selectFrom('changes') - .selectAll() - .orderBy('id asc') - .limit(1000) - .execute(); - - if (changeRows.length === 0) { - return []; - } - - const changes: LocalChange[] = changeRows.map(this.mapChange); - const changesToDelete = new Set(); - for (let i = changes.length - 1; i >= 0; i--) { - const change = changes[i]; - - if (changesToDelete.has(change.id)) { - continue; - } - - if (change.data.type === 'node_delete') { - for (let j = i - 1; j >= 0; j--) { - const otherChange = changes[j]; - if ( - otherChange.data.type === 'node_create' && - otherChange.data.id === change.data.id - ) { - // if the node has been created and then deleted, we don't need to sync the delete - changesToDelete.add(change.id); - changesToDelete.add(otherChange.id); - } - - if ( - otherChange.data.type === 'node_update' && - otherChange.data.id === change.data.id - ) { - changesToDelete.add(otherChange.id); - } - } - } else if (change.data.type === 'user_node_update') { - for (let j = i - 1; j >= 0; j--) { - const otherChange = changes[j]; - if ( - otherChange.data.type === 'user_node_update' && - otherChange.data.nodeId === change.data.nodeId && - otherChange.data.userId === change.data.userId - ) { - changesToDelete.add(otherChange.id); - } - } - } - } - - if (changesToDelete.size > 0) { - const toDeleteIds = Array.from(changesToDelete); - await database - .deleteFrom('changes') - .where('id', 'in', toDeleteIds) - .execute(); - } - - return changes.filter((change) => !changesToDelete.has(change.id)); - } - - private mapChange(change: SelectChange): LocalChange { - return { - id: change.id, - data: JSON.parse(change.data), - createdAt: change.created_at, - }; + setTimeout(this.executeEventLoop, EVENT_LOOP_INTERVAL); } } diff --git a/apps/desktop/src/main/services/workspace-service.ts b/apps/desktop/src/main/services/workspace-service.ts new file mode 100644 index 00000000..004d5ea7 --- /dev/null +++ b/apps/desktop/src/main/services/workspace-service.ts @@ -0,0 +1,240 @@ +import fs from 'fs'; +import { databaseService } from '@/main/data/database-service'; +import { + getWorkspaceDirectoryPath, + mapChange, + mapWorkspace, +} from '@/main/utils'; +import { eventBus } from '@/shared/lib/event-bus'; +import { LocalChange, SyncChangesOutput } from '@colanode/core'; +import { WorkspaceDatabaseSchema } from '@/main/data/workspace/schema'; +import { Kysely } from 'kysely'; +import { httpClient } from '@/shared/lib/http-client'; + +type WorkspaceSyncState = { + isSyncing: boolean; + scheduledSync: boolean; +}; + +class WorkspaceService { + private syncStates: Map = new Map(); + + constructor() { + eventBus.subscribe((event) => { + if (event.type === 'change_created') { + this.syncWorkspace(event.userId); + } + }); + } + + public async deleteWorkspace(userId: string): Promise { + const deletedWorkspace = await databaseService.appDatabase + .deleteFrom('workspaces') + .returningAll() + .where('user_id', '=', userId) + .executeTakeFirst(); + + if (!deletedWorkspace) { + return false; + } + + await databaseService.deleteWorkspaceDatabase(userId); + const workspaceDir = getWorkspaceDirectoryPath(userId); + if (fs.existsSync(workspaceDir)) { + fs.rmSync(workspaceDir, { recursive: true }); + } + + eventBus.publish({ + type: 'workspace_deleted', + workspace: mapWorkspace(deletedWorkspace), + }); + + return true; + } + + public async syncAllWorkspaces() { + const workspaces = await databaseService.appDatabase + .selectFrom('workspaces') + .select(['user_id']) + .execute(); + + for (const workspace of workspaces) { + this.syncWorkspace(workspace.user_id); + } + } + + public async syncWorkspace(userId: string) { + if (!this.syncStates.has(userId)) { + this.syncStates.set(userId, { + isSyncing: false, + scheduledSync: false, + }); + } + + const syncState = this.syncStates.get(userId)!; + if (syncState.isSyncing) { + syncState.scheduledSync = true; + return; + } + + syncState.isSyncing = true; + try { + await this.syncWorkspaceChanges(userId); + } catch (error) { + console.log('error', error); + } finally { + syncState.isSyncing = false; + + if (syncState.scheduledSync) { + syncState.scheduledSync = false; + this.syncWorkspace(userId); + } + } + } + + private async syncWorkspaceChanges(userId: string) { + const workspaceDatabase = + await databaseService.getWorkspaceDatabase(userId); + + const changes = + await this.fetchAndCompactWorkspaceChanges(workspaceDatabase); + if (changes.length === 0) { + return; + } + + const workspace = await databaseService.appDatabase + .selectFrom('workspaces') + .innerJoin('accounts', 'workspaces.account_id', 'accounts.id') + .innerJoin('servers', 'accounts.server', 'servers.domain') + .select([ + 'workspaces.workspace_id', + 'workspaces.user_id', + 'workspaces.account_id', + 'accounts.token', + 'servers.domain', + 'servers.attributes', + ]) + .where('workspaces.user_id', '=', userId) + .executeTakeFirst(); + + if (!workspace) { + return; + } + + while (changes.length > 0) { + const changesToSync = changes.splice(0, 20); + const { data } = await httpClient.post( + `/v1/sync/${workspace.workspace_id}`, + { + changes: changesToSync, + }, + { + serverDomain: workspace.domain, + serverAttributes: workspace.attributes, + token: workspace.token, + } + ); + + const syncedChangeIds: number[] = []; + const unsyncedChangeIds: number[] = []; + for (const result of data.results) { + if (result.status === 'success') { + syncedChangeIds.push(result.id); + } else { + unsyncedChangeIds.push(result.id); + } + } + + if (syncedChangeIds.length > 0) { + await workspaceDatabase + .deleteFrom('changes') + .where('id', 'in', syncedChangeIds) + .execute(); + } + + if (unsyncedChangeIds.length > 0) { + await workspaceDatabase + .updateTable('changes') + .set((eb) => ({ retry_count: eb('retry_count', '+', 1) })) + .where('id', 'in', unsyncedChangeIds) + .execute(); + + //we just delete changes that have failed to sync for more than 5 times. + //in the future we might need to revert the change locally. + await workspaceDatabase + .deleteFrom('changes') + .where('retry_count', '>=', 5) + .execute(); + } + } + } + + private async fetchAndCompactWorkspaceChanges( + database: Kysely + ): Promise { + const changeRows = await database + .selectFrom('changes') + .selectAll() + .orderBy('id asc') + .limit(1000) + .execute(); + + if (changeRows.length === 0) { + return []; + } + + const changes: LocalChange[] = changeRows.map(mapChange); + const changesToDelete = new Set(); + for (let i = changes.length - 1; i >= 0; i--) { + const change = changes[i]; + + if (changesToDelete.has(change.id)) { + continue; + } + + if (change.data.type === 'node_delete') { + for (let j = i - 1; j >= 0; j--) { + const otherChange = changes[j]; + if ( + otherChange.data.type === 'node_create' && + otherChange.data.id === change.data.id + ) { + // if the node has been created and then deleted, we don't need to sync the delete + changesToDelete.add(change.id); + changesToDelete.add(otherChange.id); + } + + if ( + otherChange.data.type === 'node_update' && + otherChange.data.id === change.data.id + ) { + changesToDelete.add(otherChange.id); + } + } + } else if (change.data.type === 'user_node_update') { + for (let j = i - 1; j >= 0; j--) { + const otherChange = changes[j]; + if ( + otherChange.data.type === 'user_node_update' && + otherChange.data.nodeId === change.data.nodeId && + otherChange.data.userId === change.data.userId + ) { + changesToDelete.add(otherChange.id); + } + } + } + } + + if (changesToDelete.size > 0) { + const toDeleteIds = Array.from(changesToDelete); + await database + .deleteFrom('changes') + .where('id', 'in', toDeleteIds) + .execute(); + } + + return changes.filter((change) => !changesToDelete.has(change.id)); + } +} + +export const workspaceService = new WorkspaceService(); diff --git a/apps/desktop/src/main/utils.ts b/apps/desktop/src/main/utils.ts index f4a004ec..ec36c43a 100644 --- a/apps/desktop/src/main/utils.ts +++ b/apps/desktop/src/main/utils.ts @@ -8,10 +8,14 @@ import { } from 'kysely'; import path from 'path'; import { + SelectChange, SelectNode, WorkspaceDatabaseSchema, } from '@/main/data/workspace/schema'; -import { Node, NodeTypes } from '@colanode/core'; +import { LocalChange, Node, NodeTypes } from '@colanode/core'; +import { Account } from '@/shared/types/accounts'; +import { SelectAccount, SelectWorkspace } from './data/app/schema'; +import { Workspace } from '@/shared/types/workspaces'; export const appPath = app.getPath('userData'); @@ -89,3 +93,37 @@ export const mapNode = (row: SelectNode): Node => { serverVersionId: row.server_version_id, }; }; + +export const mapAccount = (row: SelectAccount): Account => { + return { + id: row.id, + server: row.server, + name: row.name, + avatar: row.avatar, + deviceId: row.device_id, + email: row.email, + token: row.token, + status: row.status, + }; +}; + +export const mapWorkspace = (row: SelectWorkspace): Workspace => { + return { + id: row.workspace_id, + name: row.name, + versionId: row.version_id, + accountId: row.account_id, + role: row.role, + userId: row.user_id, + avatar: row.avatar, + description: row.description, + }; +}; + +export const mapChange = (change: SelectChange): LocalChange => { + return { + id: change.id, + data: JSON.parse(change.data), + createdAt: change.created_at, + }; +}; diff --git a/apps/desktop/src/renderer/components/accounts/account-logout.tsx b/apps/desktop/src/renderer/components/accounts/account-logout.tsx index 47fad341..1ebdfa5a 100644 --- a/apps/desktop/src/renderer/components/accounts/account-logout.tsx +++ b/apps/desktop/src/renderer/components/accounts/account-logout.tsx @@ -45,7 +45,7 @@ export const AccountLogout = ({ onCancel, onLogout }: AccountLogoutProps) => { onClick={async () => { mutate({ input: { - type: 'logout', + type: 'account_logout', accountId: account.id, }, onSuccess() { diff --git a/apps/desktop/src/shared/mutations/account-logout.ts b/apps/desktop/src/shared/mutations/account-logout.ts new file mode 100644 index 00000000..f871a6a6 --- /dev/null +++ b/apps/desktop/src/shared/mutations/account-logout.ts @@ -0,0 +1,17 @@ +export type AccountLogoutMutationInput = { + type: 'account_logout'; + accountId: string; +}; + +export type AccountLogoutMutationOutput = { + success: boolean; +}; + +declare module '@/shared/mutations' { + interface MutationMap { + account_logout: { + input: AccountLogoutMutationInput; + output: AccountLogoutMutationOutput; + }; + } +} diff --git a/apps/desktop/src/shared/mutations/logout.ts b/apps/desktop/src/shared/mutations/logout.ts deleted file mode 100644 index 679a67c7..00000000 --- a/apps/desktop/src/shared/mutations/logout.ts +++ /dev/null @@ -1,17 +0,0 @@ -export type LogoutMutationInput = { - type: 'logout'; - accountId: string; -}; - -export type LogoutMutationOutput = { - success: boolean; -}; - -declare module '@/shared/mutations' { - interface MutationMap { - logout: { - input: LogoutMutationInput; - output: LogoutMutationOutput; - }; - } -} diff --git a/apps/desktop/src/shared/types/accounts.ts b/apps/desktop/src/shared/types/accounts.ts index ee15a62a..85336f88 100644 --- a/apps/desktop/src/shared/types/accounts.ts +++ b/apps/desktop/src/shared/types/accounts.ts @@ -1,10 +1,3 @@ -import { WorkspaceOutput } from '@colanode/core'; - -export type LoginOutput = { - account: Account; - workspaces: WorkspaceOutput[]; -}; - export type Account = { id: string; name: string; @@ -15,9 +8,3 @@ export type Account = { status: string; server: string; }; - -export type AccountUpdateOutput = { - id: string; - name: string; - avatar?: string | null; -}; diff --git a/apps/server/src/routes/accounts.ts b/apps/server/src/routes/accounts.ts index 3569ca28..358c46d6 100644 --- a/apps/server/src/routes/accounts.ts +++ b/apps/server/src/routes/accounts.ts @@ -8,13 +8,16 @@ import { GoogleLoginInput, GoogleUserInfo, LoginOutput, -} from '@/types/accounts'; + WorkspaceOutput, + WorkspaceRole, + generateId, + IdType, + AccountSyncOutput, +} from '@colanode/core'; import axios from 'axios'; import { ApiError, ColanodeRequest, ColanodeResponse } from '@/types/api'; -import { generateId, IdType } from '@colanode/core'; import { database } from '@/data/database'; import bcrypt from 'bcrypt'; -import { WorkspaceOutput, WorkspaceRole } from '@colanode/core'; import { authMiddleware } from '@/middlewares/auth'; import { generateToken } from '@/lib/tokens'; import { enqueueTask } from '@/queues/tasks'; @@ -383,6 +386,83 @@ accountsRouter.put( } ); +accountsRouter.get( + '/sync', + authMiddleware, + async (req: ColanodeRequest, res: ColanodeResponse) => { + if (!req.account) { + return res.status(401).json({ + code: ApiError.Unauthorized, + message: 'Unauthorized.', + }); + } + + const account = await database + .selectFrom('accounts') + .where('id', '=', req.account.id) + .selectAll() + .executeTakeFirst(); + + if (!account) { + return res.status(404).json({ + code: ApiError.ResourceNotFound, + message: 'Account not found.', + }); + } + + const workspaceOutputs: WorkspaceOutput[] = []; + const workspaceUsers = await database + .selectFrom('workspace_users') + .where('account_id', '=', account.id) + .selectAll() + .execute(); + + if (workspaceUsers.length > 0) { + const workspaceIds = workspaceUsers.map((wu) => wu.workspace_id); + const workspaces = await database + .selectFrom('workspaces') + .where('id', 'in', workspaceIds) + .selectAll() + .execute(); + + for (const workspaceUser of workspaceUsers) { + const workspace = workspaces.find( + (w) => w.id === workspaceUser.workspace_id + ); + + if (!workspace) { + continue; + } + + workspaceOutputs.push({ + id: workspace.id, + name: workspace.name, + versionId: workspaceUser.version_id, + avatar: workspace.avatar, + description: workspace.description, + user: { + id: workspaceUser.id, + accountId: workspaceUser.account_id, + role: workspaceUser.role as WorkspaceRole, + }, + }); + } + } + + const output: AccountSyncOutput = { + account: { + id: account.id, + name: account.name, + email: account.email, + avatar: account.avatar, + }, + workspaces: workspaceOutputs, + }; + + return res.status(200).json(output); + } +); + const buildLoginOutput = async ( account: SelectAccount ): Promise => { @@ -459,12 +539,13 @@ const buildLoginOutput = async ( return { account: { - token, id: account.id, name: account.name, email: account.email, - deviceId: device.id, + avatar: account.avatar, }, workspaces: workspaceOutputs, + deviceId: device.id, + token, }; }; diff --git a/apps/server/src/routes/workspaces.ts b/apps/server/src/routes/workspaces.ts index a26b59b5..82da85e0 100644 --- a/apps/server/src/routes/workspaces.ts +++ b/apps/server/src/routes/workspaces.ts @@ -15,6 +15,7 @@ import { IdType, UserAttributes, WorkspaceAttributes, + AccountStatus, } from '@colanode/core'; import { database } from '@/data/database'; import { Router } from 'express'; @@ -26,7 +27,6 @@ import { SelectWorkspaceUser, } from '@/data/schema'; import { getNameFromEmail } from '@/lib/utils'; -import { AccountStatus } from '@/types/accounts'; import { mapNodeOutput } from '@/lib/nodes'; import { NodeCreatedEvent, NodeUpdatedEvent } from '@/types/events'; import { enqueueEvent } from '@/queues/events'; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index c0d58526..4c9e6efe 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -20,3 +20,4 @@ export * from './registry/workspace'; export * from './types/workspaces'; export * from './types/nodes'; export * from './types/sync'; +export * from './types/accounts'; diff --git a/apps/server/src/types/accounts.ts b/packages/core/src/types/accounts.ts similarity index 83% rename from apps/server/src/types/accounts.ts rename to packages/core/src/types/accounts.ts index a045cb99..eb8d4f0d 100644 --- a/apps/server/src/types/accounts.ts +++ b/packages/core/src/types/accounts.ts @@ -1,4 +1,4 @@ -import { WorkspaceOutput } from '@colanode/core'; +import { WorkspaceOutput } from './workspaces'; export type GoogleLoginInput = { access_token: string; @@ -27,6 +27,8 @@ export type GoogleUserInfo = { export type LoginOutput = { account: AccountOutput; workspaces: WorkspaceOutput[]; + deviceId: string; + token: string; }; export type AccountOutput = { @@ -34,8 +36,6 @@ export type AccountOutput = { name: string; email: string; avatar?: string | null; - token: string; - deviceId: string; }; export enum AccountStatus { @@ -53,3 +53,9 @@ export type AccountUpdateOutput = { name: string; avatar?: string | null; }; + +export type AccountSyncOutput = { + account: AccountOutput; + workspaces: WorkspaceOutput[]; + token?: string; +};