Improve sync service and introduce account sync

This commit is contained in:
Hakan Shehu
2024-11-15 10:49:07 +01:00
parent 54bb91a866
commit 712d0da8aa
18 changed files with 726 additions and 421 deletions

View File

@@ -0,0 +1,32 @@
import { databaseService } from '@/main/data/database-service';
import {
AccountLogoutMutationInput,
AccountLogoutMutationOutput,
} from '@/shared/mutations/account-logout';
import { MutationHandler } from '@/main/types';
import { accountService } from '@/main/services/account-service';
export class AccountLogoutMutationHandler
implements MutationHandler<AccountLogoutMutationInput>
{
async handleMutation(
input: AccountLogoutMutationInput
): Promise<AccountLogoutMutationOutput> {
const account = await databaseService.appDatabase
.selectFrom('accounts')
.selectAll()
.where('id', '=', input.accountId)
.executeTakeFirst();
if (!account) {
return {
success: false,
};
}
await accountService.logoutAccount(account);
return {
success: true,
};
}
}

View File

@@ -1,7 +1,7 @@
import { databaseService } from '@/main/data/database-service';
import { httpClient } from '@/shared/lib/http-client';
import { MutationHandler } from '@/main/types';
import { AccountUpdateOutput } from '@/shared/types/accounts';
import { AccountUpdateOutput } from '@colanode/core';
import {
AccountUpdateMutationInput,
AccountUpdateMutationOutput,

View File

@@ -1,4 +1,4 @@
import { LoginOutput } from '@/shared/types/accounts';
import { LoginOutput } from '@colanode/core';
import { databaseService } from '@/main/data/database-service';
import {
EmailLoginMutationInput,
@@ -7,6 +7,7 @@ import {
import { MutationHandler } from '@/main/types';
import { httpClient } from '@/shared/lib/http-client';
import { eventBus } from '@/shared/lib/event-bus';
import { Account } from '@/shared/types/accounts';
export class EmailLoginMutationHandler
implements MutationHandler<EmailLoginMutationInput>
@@ -38,20 +39,37 @@ export class EmailLoginMutationHandler
}
);
let account: Account | undefined;
await databaseService.appDatabase.transaction().execute(async (trx) => {
await trx
const createdAccount = await trx
.insertInto('accounts')
.returningAll()
.values({
id: data.account.id,
name: data.account.name,
avatar: data.account.avatar,
device_id: data.account.deviceId,
device_id: data.deviceId,
email: data.account.email,
token: data.account.token,
token: data.token,
server: server.domain,
status: 'active',
})
.execute();
.executeTakeFirst();
if (!createdAccount) {
throw new Error('Failed to create account!');
}
account = {
id: createdAccount.id,
name: createdAccount.name,
email: createdAccount.email,
avatar: createdAccount.avatar,
deviceId: data.deviceId,
token: data.token,
status: 'active',
server: server.domain,
};
if (data.workspaces.length === 0) {
return;
@@ -74,9 +92,13 @@ export class EmailLoginMutationHandler
.execute();
});
if (!account) {
throw new Error('Failed to create account!');
}
eventBus.publish({
type: 'account_created',
account: data.account,
account,
});
if (data.workspaces.length > 0) {
@@ -97,7 +119,7 @@ export class EmailLoginMutationHandler
return {
success: true,
account: data.account,
account,
workspaces: data.workspaces,
};
}

View File

@@ -1,4 +1,4 @@
import { LoginOutput } from '@/shared/types/accounts';
import { LoginOutput } from '@colanode/core';
import { databaseService } from '@/main/data/database-service';
import { httpClient } from '@/shared/lib/http-client';
import {
@@ -7,6 +7,7 @@ import {
} from '@/shared/mutations/email-register';
import { MutationHandler } from '@/main/types';
import { eventBus } from '@/shared/lib/event-bus';
import { Account } from '@/shared/types/accounts';
export class EmailRegisterMutationHandler
implements MutationHandler<EmailRegisterMutationInput>
@@ -39,20 +40,37 @@ export class EmailRegisterMutationHandler
}
);
let account: Account | undefined;
await databaseService.appDatabase.transaction().execute(async (trx) => {
await trx
const createdAccount = await trx
.insertInto('accounts')
.returningAll()
.values({
id: data.account.id,
name: data.account.name,
avatar: data.account.avatar,
device_id: data.account.deviceId,
device_id: data.deviceId,
email: data.account.email,
token: data.account.token,
token: data.token,
server: server.domain,
status: 'active',
})
.execute();
.executeTakeFirst();
if (!createdAccount) {
throw new Error('Failed to create account!');
}
account = {
id: createdAccount.id,
name: createdAccount.name,
email: createdAccount.email,
avatar: createdAccount.avatar,
deviceId: data.deviceId,
token: data.token,
status: 'active',
server: server.domain,
};
if (data.workspaces.length === 0) {
return;
@@ -75,9 +93,13 @@ export class EmailRegisterMutationHandler
.execute();
});
if (!account) {
throw new Error('Failed to create account!');
}
eventBus.publish({
type: 'account_created',
account: data.account,
account,
});
if (data.workspaces.length > 0) {
@@ -98,7 +120,7 @@ export class EmailRegisterMutationHandler
return {
success: true,
account: data.account,
account,
workspaces: data.workspaces,
};
}

View File

@@ -27,7 +27,7 @@ import { WorkspaceCreateMutationHandler } from '@/main/handlers/mutations/worksp
import { WorkspaceUpdateMutationHandler } from '@/main/handlers/mutations/workspace-update';
import { DocumentSaveMutationHandler } from '@/main/handlers/mutations/document-save';
import { AvatarUploadMutationHandler } from '@/main/handlers/mutations/avatar-upload';
import { LogoutMutationHandler } from '@/main/handlers/mutations/logout';
import { AccountLogoutMutationHandler } from '@/main/handlers/mutations/account-logout';
import { ServerNodeSyncMutationHandler } from '@/main/handlers/mutations/server-node-sync';
import { ServerNodeDeleteMutationHandler } from '@/main/handlers/mutations/server-node-delete';
import { FolderCreateMutationHandler } from '@/main/handlers/mutations/folder-create';
@@ -72,7 +72,7 @@ export const mutationHandlerMap: MutationHandlerMap = {
workspace_update: new WorkspaceUpdateMutationHandler(),
document_save: new DocumentSaveMutationHandler(),
avatar_upload: new AvatarUploadMutationHandler(),
logout: new LogoutMutationHandler(),
account_logout: new AccountLogoutMutationHandler(),
server_node_sync: new ServerNodeSyncMutationHandler(),
server_node_delete: new ServerNodeDeleteMutationHandler(),
folder_create: new FolderCreateMutationHandler(),

View File

@@ -1,119 +0,0 @@
import fs from 'fs';
import { databaseService } from '@/main/data/database-service';
import {
LogoutMutationInput,
LogoutMutationOutput,
} from '@/shared/mutations/logout';
import { MutationHandler } from '@/main/types';
import {
getAccountAvatarsDirectoryPath,
getWorkspaceDirectoryPath,
} from '@/main/utils';
import { eventBus } from '@/shared/lib/event-bus';
export class LogoutMutationHandler
implements MutationHandler<LogoutMutationInput>
{
async handleMutation(
input: LogoutMutationInput
): Promise<LogoutMutationOutput> {
const account = await databaseService.appDatabase
.selectFrom('accounts')
.selectAll()
.where('id', '=', input.accountId)
.executeTakeFirst();
if (!account) {
return {
success: false,
};
}
const workspaces = await databaseService.appDatabase
.selectFrom('workspaces')
.selectAll()
.where('account_id', '=', account.id)
.execute();
for (const workspace of workspaces) {
await databaseService.deleteWorkspaceDatabase(workspace.user_id);
const workspaceDir = getWorkspaceDirectoryPath(workspace.user_id);
if (fs.existsSync(workspaceDir)) {
fs.rmSync(workspaceDir, { recursive: true });
}
}
const avatarsDir = getAccountAvatarsDirectoryPath(account.id);
if (fs.existsSync(avatarsDir)) {
fs.rmSync(avatarsDir, { recursive: true });
}
const deletedAccount = await databaseService.appDatabase
.deleteFrom('accounts')
.returningAll()
.where('id', '=', account.id)
.executeTakeFirst();
if (!deletedAccount) {
return {
success: false,
};
}
eventBus.publish({
type: 'account_deleted',
account: {
id: deletedAccount.id,
server: deletedAccount.server,
name: deletedAccount.name,
email: deletedAccount.email,
avatar: deletedAccount.avatar,
token: deletedAccount.token,
deviceId: deletedAccount.device_id,
status: deletedAccount.status,
},
});
const deletedWorkspaces = await databaseService.appDatabase
.deleteFrom('workspaces')
.where('account_id', '=', account.id)
.execute();
if (deletedWorkspaces.length !== workspaces.length) {
return {
success: false,
};
}
for (const workspace of workspaces) {
eventBus.publish({
type: 'workspace_deleted',
workspace: {
id: workspace.workspace_id,
userId: workspace.user_id,
name: workspace.name,
avatar: workspace.avatar,
description: workspace.description,
role: workspace.role,
versionId: workspace.version_id,
accountId: workspace.account_id,
},
});
}
await databaseService.appDatabase
.insertInto('deleted_tokens')
.values({
token: account.token,
account_id: account.id,
server: account.server,
created_at: new Date().toISOString(),
})
.execute();
return {
success: true,
};
}
}

View File

@@ -0,0 +1,228 @@
import fs from 'fs';
import { databaseService } from '@/main/data/database-service';
import { SelectAccount } from '@/main/data/app/schema';
import { AccountSyncOutput } from '@colanode/core';
import { httpClient } from '@/shared/lib/http-client';
import { mapAccount, mapWorkspace } from '@/main/utils';
import { getAccountAvatarsDirectoryPath } from '@/main/utils';
import { eventBus } from '@/shared/lib/event-bus';
import { workspaceService } from '@/main/services/workspace-service';
class AccountService {
async syncAccounts() {
const accounts = await databaseService.appDatabase
.selectFrom('accounts')
.selectAll()
.execute();
for (const account of accounts) {
this.syncAccount(account);
}
}
private async syncAccount(account: SelectAccount) {
const server = await databaseService.appDatabase
.selectFrom('servers')
.selectAll()
.where('domain', '=', account.server)
.executeTakeFirst();
if (!server) {
throw new Error('Server not found!');
}
const { data, status } = await httpClient.get<AccountSyncOutput>(
'/v1/accounts/sync',
{
serverDomain: server.domain,
serverAttributes: server.attributes,
token: account.token,
}
);
if (status >= 400 && status < 500) {
await this.logoutAccount(account);
return;
}
if (status !== 200) {
return;
}
const currentWorkspaces = await databaseService.appDatabase
.selectFrom('workspaces')
.selectAll()
.where('account_id', '=', account.id)
.execute();
const updatedAccount = await databaseService.appDatabase
.updateTable('accounts')
.returningAll()
.set({
name: data.account.name,
avatar: data.account.avatar,
})
.where('id', '=', account.id)
.executeTakeFirst();
if (!updatedAccount) {
return;
}
eventBus.publish({
type: 'account_updated',
account: mapAccount(updatedAccount),
});
for (const workspace of data.workspaces) {
const currentWorkspace = currentWorkspaces.find(
(w) => w.workspace_id === workspace.id
);
if (!currentWorkspace) {
// create workspace here
const createdWorkspace = await databaseService.appDatabase
.insertInto('workspaces')
.values({
workspace_id: workspace.id,
user_id: workspace.user.id,
account_id: account.id,
name: workspace.name,
avatar: workspace.avatar,
description: workspace.description,
role: workspace.user.role,
version_id: workspace.versionId,
})
.returningAll()
.executeTakeFirst();
if (!createdWorkspace) {
return;
}
eventBus.publish({
type: 'workspace_created',
workspace: mapWorkspace(createdWorkspace),
});
} else {
// update workspace here
const updatedWorkspace = await databaseService.appDatabase
.updateTable('workspaces')
.returningAll()
.set({
name: workspace.name,
avatar: workspace.avatar,
description: workspace.description,
role: workspace.user.role,
version_id: workspace.versionId,
})
.where('user_id', '=', currentWorkspace.user_id)
.executeTakeFirst();
if (!updatedWorkspace) {
return;
}
eventBus.publish({
type: 'workspace_updated',
workspace: mapWorkspace(updatedWorkspace),
});
}
}
for (const workspace of currentWorkspaces) {
const updatedWorkspace = data.workspaces.find(
(w) => w.id === workspace.workspace_id
);
if (!updatedWorkspace) {
await workspaceService.deleteWorkspace(workspace.user_id);
}
}
}
public async logoutAccount(account: SelectAccount): Promise<boolean> {
const workspaces = await databaseService.appDatabase
.selectFrom('workspaces')
.select(['user_id'])
.where('account_id', '=', account.id)
.execute();
for (const workspace of workspaces) {
await workspaceService.deleteWorkspace(workspace.user_id);
}
const avatarsDir = getAccountAvatarsDirectoryPath(account.id);
if (fs.existsSync(avatarsDir)) {
fs.rmSync(avatarsDir, { recursive: true });
}
const deletedAccount = await databaseService.appDatabase
.deleteFrom('accounts')
.returningAll()
.where('id', '=', account.id)
.executeTakeFirst();
if (!deletedAccount) {
return false;
}
eventBus.publish({
type: 'account_deleted',
account: mapAccount(deletedAccount),
});
await databaseService.appDatabase
.insertInto('deleted_tokens')
.values({
token: account.token,
account_id: account.id,
server: account.server,
created_at: new Date().toISOString(),
})
.execute();
return true;
}
public async syncDeletedTokens() {
const deletedTokens = await databaseService.appDatabase
.selectFrom('deleted_tokens')
.innerJoin('servers', 'deleted_tokens.server', 'servers.domain')
.select([
'deleted_tokens.token',
'deleted_tokens.account_id',
'servers.domain',
'servers.attributes',
])
.execute();
if (deletedTokens.length === 0) {
return;
}
for (const deletedToken of deletedTokens) {
try {
const { status } = await httpClient.delete(`/v1/accounts/logout`, {
serverDomain: deletedToken.domain,
serverAttributes: deletedToken.attributes,
token: deletedToken.token,
});
if (status !== 200) {
return;
}
await databaseService.appDatabase
.deleteFrom('deleted_tokens')
.where('token', '=', deletedToken.token)
.where('account_id', '=', deletedToken.account_id)
.execute();
} catch (error) {
// console.log('error', error);
}
}
}
}
export const accountService = new AccountService();

View File

@@ -1,28 +1,14 @@
import { httpClient } from '@/shared/lib/http-client';
import { databaseService } from '@/main/data/database-service';
import { LocalChange, SyncChangesOutput } from '@colanode/core';
import {
SelectChange,
WorkspaceDatabaseSchema,
} from '@/main/data/workspace/schema';
import { Kysely } from 'kysely';
import { eventBus } from '@/shared/lib/event-bus';
import { accountService } from '@/main/services/account-service';
import { workspaceService } from '@/main/services/workspace-service';
type SyncState = {
isSyncing: boolean;
scheduledSync: boolean;
};
// one minute
const EVENT_LOOP_INTERVAL = 1000 * 60;
class SyncService {
private initiated: boolean = false;
private syncStates: Map<string, SyncState> = new Map();
constructor() {
eventBus.subscribe((event) => {
if (event.type === 'change_created') {
this.syncWorkspace(event.userId);
}
});
this.executeEventLoop = this.executeEventLoop.bind(this);
}
public init() {
@@ -31,238 +17,19 @@ class SyncService {
}
this.initiated = true;
this.syncAllWorkspaces();
setTimeout(this.executeEventLoop, 10);
}
private async syncAllWorkspaces() {
const workspaces = await databaseService.appDatabase
.selectFrom('workspaces')
.select(['user_id'])
.execute();
for (const workspace of workspaces) {
this.syncWorkspace(workspace.user_id);
}
}
public async syncWorkspace(userId: string) {
if (!this.syncStates.has(userId)) {
this.syncStates.set(userId, {
isSyncing: false,
scheduledSync: false,
});
}
const syncState = this.syncStates.get(userId)!;
if (syncState.isSyncing) {
syncState.scheduledSync = true;
return;
}
syncState.isSyncing = true;
private async executeEventLoop() {
try {
await this.syncWorkspaceChanges(userId);
await accountService.syncAccounts();
await accountService.syncDeletedTokens();
await workspaceService.syncAllWorkspaces();
} catch (error) {
console.log('error', error);
} finally {
syncState.isSyncing = false;
if (syncState.scheduledSync) {
syncState.scheduledSync = false;
this.syncWorkspace(userId);
}
}
}
// private async syncDeletedTokens() {
// const deletedTokens = await databaseService.appDatabase
// .selectFrom('deleted_tokens')
// .innerJoin('servers', 'deleted_tokens.server', 'servers.domain')
// .select([
// 'deleted_tokens.token',
// 'deleted_tokens.account_id',
// 'servers.domain',
// 'servers.attributes',
// ])
// .execute();
// if (deletedTokens.length === 0) {
// return;
// }
// for (const deletedToken of deletedTokens) {
// try {
// const { status } = await httpClient.delete(`/v1/accounts/logout`, {
// serverDomain: deletedToken.domain,
// serverAttributes: deletedToken.attributes,
// token: deletedToken.token,
// });
// if (status !== 200) {
// return;
// }
// await databaseService.appDatabase
// .deleteFrom('deleted_tokens')
// .where('token', '=', deletedToken.token)
// .where('account_id', '=', deletedToken.account_id)
// .execute();
// } catch (error) {
// // console.log('error', error);
// }
// }
// }
private async syncWorkspaceChanges(userId: string) {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const changes =
await this.fetchAndCompactWorkspaceChanges(workspaceDatabase);
if (changes.length === 0) {
return;
}
const workspace = await databaseService.appDatabase
.selectFrom('workspaces')
.innerJoin('accounts', 'workspaces.account_id', 'accounts.id')
.innerJoin('servers', 'accounts.server', 'servers.domain')
.select([
'workspaces.workspace_id',
'workspaces.user_id',
'workspaces.account_id',
'accounts.token',
'servers.domain',
'servers.attributes',
])
.where('workspaces.user_id', '=', userId)
.executeTakeFirst();
if (!workspace) {
return;
}
while (changes.length > 0) {
const changesToSync = changes.splice(0, 20);
const { data } = await httpClient.post<SyncChangesOutput>(
`/v1/sync/${workspace.workspace_id}`,
{
changes: changesToSync,
},
{
serverDomain: workspace.domain,
serverAttributes: workspace.attributes,
token: workspace.token,
}
);
const syncedChangeIds: number[] = [];
const unsyncedChangeIds: number[] = [];
for (const result of data.results) {
if (result.status === 'success') {
syncedChangeIds.push(result.id);
} else {
unsyncedChangeIds.push(result.id);
}
}
if (syncedChangeIds.length > 0) {
await workspaceDatabase
.deleteFrom('changes')
.where('id', 'in', syncedChangeIds)
.execute();
}
if (unsyncedChangeIds.length > 0) {
await workspaceDatabase
.updateTable('changes')
.set((eb) => ({ retry_count: eb('retry_count', '+', 1) }))
.where('id', 'in', unsyncedChangeIds)
.execute();
//we just delete changes that have failed to sync for more than 5 times.
//in the future we might need to revert the change locally.
await workspaceDatabase
.deleteFrom('changes')
.where('retry_count', '>=', 5)
.execute();
}
}
}
private async fetchAndCompactWorkspaceChanges(
database: Kysely<WorkspaceDatabaseSchema>
): Promise<LocalChange[]> {
const changeRows = await database
.selectFrom('changes')
.selectAll()
.orderBy('id asc')
.limit(1000)
.execute();
if (changeRows.length === 0) {
return [];
}
const changes: LocalChange[] = changeRows.map(this.mapChange);
const changesToDelete = new Set<number>();
for (let i = changes.length - 1; i >= 0; i--) {
const change = changes[i];
if (changesToDelete.has(change.id)) {
continue;
}
if (change.data.type === 'node_delete') {
for (let j = i - 1; j >= 0; j--) {
const otherChange = changes[j];
if (
otherChange.data.type === 'node_create' &&
otherChange.data.id === change.data.id
) {
// if the node has been created and then deleted, we don't need to sync the delete
changesToDelete.add(change.id);
changesToDelete.add(otherChange.id);
}
if (
otherChange.data.type === 'node_update' &&
otherChange.data.id === change.data.id
) {
changesToDelete.add(otherChange.id);
}
}
} else if (change.data.type === 'user_node_update') {
for (let j = i - 1; j >= 0; j--) {
const otherChange = changes[j];
if (
otherChange.data.type === 'user_node_update' &&
otherChange.data.nodeId === change.data.nodeId &&
otherChange.data.userId === change.data.userId
) {
changesToDelete.add(otherChange.id);
}
}
}
}
if (changesToDelete.size > 0) {
const toDeleteIds = Array.from(changesToDelete);
await database
.deleteFrom('changes')
.where('id', 'in', toDeleteIds)
.execute();
}
return changes.filter((change) => !changesToDelete.has(change.id));
}
private mapChange(change: SelectChange): LocalChange {
return {
id: change.id,
data: JSON.parse(change.data),
createdAt: change.created_at,
};
setTimeout(this.executeEventLoop, EVENT_LOOP_INTERVAL);
}
}

View File

@@ -0,0 +1,240 @@
import fs from 'fs';
import { databaseService } from '@/main/data/database-service';
import {
getWorkspaceDirectoryPath,
mapChange,
mapWorkspace,
} from '@/main/utils';
import { eventBus } from '@/shared/lib/event-bus';
import { LocalChange, SyncChangesOutput } from '@colanode/core';
import { WorkspaceDatabaseSchema } from '@/main/data/workspace/schema';
import { Kysely } from 'kysely';
import { httpClient } from '@/shared/lib/http-client';
type WorkspaceSyncState = {
isSyncing: boolean;
scheduledSync: boolean;
};
class WorkspaceService {
private syncStates: Map<string, WorkspaceSyncState> = new Map();
constructor() {
eventBus.subscribe((event) => {
if (event.type === 'change_created') {
this.syncWorkspace(event.userId);
}
});
}
public async deleteWorkspace(userId: string): Promise<boolean> {
const deletedWorkspace = await databaseService.appDatabase
.deleteFrom('workspaces')
.returningAll()
.where('user_id', '=', userId)
.executeTakeFirst();
if (!deletedWorkspace) {
return false;
}
await databaseService.deleteWorkspaceDatabase(userId);
const workspaceDir = getWorkspaceDirectoryPath(userId);
if (fs.existsSync(workspaceDir)) {
fs.rmSync(workspaceDir, { recursive: true });
}
eventBus.publish({
type: 'workspace_deleted',
workspace: mapWorkspace(deletedWorkspace),
});
return true;
}
public async syncAllWorkspaces() {
const workspaces = await databaseService.appDatabase
.selectFrom('workspaces')
.select(['user_id'])
.execute();
for (const workspace of workspaces) {
this.syncWorkspace(workspace.user_id);
}
}
public async syncWorkspace(userId: string) {
if (!this.syncStates.has(userId)) {
this.syncStates.set(userId, {
isSyncing: false,
scheduledSync: false,
});
}
const syncState = this.syncStates.get(userId)!;
if (syncState.isSyncing) {
syncState.scheduledSync = true;
return;
}
syncState.isSyncing = true;
try {
await this.syncWorkspaceChanges(userId);
} catch (error) {
console.log('error', error);
} finally {
syncState.isSyncing = false;
if (syncState.scheduledSync) {
syncState.scheduledSync = false;
this.syncWorkspace(userId);
}
}
}
private async syncWorkspaceChanges(userId: string) {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const changes =
await this.fetchAndCompactWorkspaceChanges(workspaceDatabase);
if (changes.length === 0) {
return;
}
const workspace = await databaseService.appDatabase
.selectFrom('workspaces')
.innerJoin('accounts', 'workspaces.account_id', 'accounts.id')
.innerJoin('servers', 'accounts.server', 'servers.domain')
.select([
'workspaces.workspace_id',
'workspaces.user_id',
'workspaces.account_id',
'accounts.token',
'servers.domain',
'servers.attributes',
])
.where('workspaces.user_id', '=', userId)
.executeTakeFirst();
if (!workspace) {
return;
}
while (changes.length > 0) {
const changesToSync = changes.splice(0, 20);
const { data } = await httpClient.post<SyncChangesOutput>(
`/v1/sync/${workspace.workspace_id}`,
{
changes: changesToSync,
},
{
serverDomain: workspace.domain,
serverAttributes: workspace.attributes,
token: workspace.token,
}
);
const syncedChangeIds: number[] = [];
const unsyncedChangeIds: number[] = [];
for (const result of data.results) {
if (result.status === 'success') {
syncedChangeIds.push(result.id);
} else {
unsyncedChangeIds.push(result.id);
}
}
if (syncedChangeIds.length > 0) {
await workspaceDatabase
.deleteFrom('changes')
.where('id', 'in', syncedChangeIds)
.execute();
}
if (unsyncedChangeIds.length > 0) {
await workspaceDatabase
.updateTable('changes')
.set((eb) => ({ retry_count: eb('retry_count', '+', 1) }))
.where('id', 'in', unsyncedChangeIds)
.execute();
//we just delete changes that have failed to sync for more than 5 times.
//in the future we might need to revert the change locally.
await workspaceDatabase
.deleteFrom('changes')
.where('retry_count', '>=', 5)
.execute();
}
}
}
private async fetchAndCompactWorkspaceChanges(
database: Kysely<WorkspaceDatabaseSchema>
): Promise<LocalChange[]> {
const changeRows = await database
.selectFrom('changes')
.selectAll()
.orderBy('id asc')
.limit(1000)
.execute();
if (changeRows.length === 0) {
return [];
}
const changes: LocalChange[] = changeRows.map(mapChange);
const changesToDelete = new Set<number>();
for (let i = changes.length - 1; i >= 0; i--) {
const change = changes[i];
if (changesToDelete.has(change.id)) {
continue;
}
if (change.data.type === 'node_delete') {
for (let j = i - 1; j >= 0; j--) {
const otherChange = changes[j];
if (
otherChange.data.type === 'node_create' &&
otherChange.data.id === change.data.id
) {
// if the node has been created and then deleted, we don't need to sync the delete
changesToDelete.add(change.id);
changesToDelete.add(otherChange.id);
}
if (
otherChange.data.type === 'node_update' &&
otherChange.data.id === change.data.id
) {
changesToDelete.add(otherChange.id);
}
}
} else if (change.data.type === 'user_node_update') {
for (let j = i - 1; j >= 0; j--) {
const otherChange = changes[j];
if (
otherChange.data.type === 'user_node_update' &&
otherChange.data.nodeId === change.data.nodeId &&
otherChange.data.userId === change.data.userId
) {
changesToDelete.add(otherChange.id);
}
}
}
}
if (changesToDelete.size > 0) {
const toDeleteIds = Array.from(changesToDelete);
await database
.deleteFrom('changes')
.where('id', 'in', toDeleteIds)
.execute();
}
return changes.filter((change) => !changesToDelete.has(change.id));
}
}
export const workspaceService = new WorkspaceService();

View File

@@ -8,10 +8,14 @@ import {
} from 'kysely';
import path from 'path';
import {
SelectChange,
SelectNode,
WorkspaceDatabaseSchema,
} from '@/main/data/workspace/schema';
import { Node, NodeTypes } from '@colanode/core';
import { LocalChange, Node, NodeTypes } from '@colanode/core';
import { Account } from '@/shared/types/accounts';
import { SelectAccount, SelectWorkspace } from './data/app/schema';
import { Workspace } from '@/shared/types/workspaces';
export const appPath = app.getPath('userData');
@@ -89,3 +93,37 @@ export const mapNode = (row: SelectNode): Node => {
serverVersionId: row.server_version_id,
};
};
export const mapAccount = (row: SelectAccount): Account => {
return {
id: row.id,
server: row.server,
name: row.name,
avatar: row.avatar,
deviceId: row.device_id,
email: row.email,
token: row.token,
status: row.status,
};
};
export const mapWorkspace = (row: SelectWorkspace): Workspace => {
return {
id: row.workspace_id,
name: row.name,
versionId: row.version_id,
accountId: row.account_id,
role: row.role,
userId: row.user_id,
avatar: row.avatar,
description: row.description,
};
};
export const mapChange = (change: SelectChange): LocalChange => {
return {
id: change.id,
data: JSON.parse(change.data),
createdAt: change.created_at,
};
};

View File

@@ -45,7 +45,7 @@ export const AccountLogout = ({ onCancel, onLogout }: AccountLogoutProps) => {
onClick={async () => {
mutate({
input: {
type: 'logout',
type: 'account_logout',
accountId: account.id,
},
onSuccess() {

View File

@@ -0,0 +1,17 @@
export type AccountLogoutMutationInput = {
type: 'account_logout';
accountId: string;
};
export type AccountLogoutMutationOutput = {
success: boolean;
};
declare module '@/shared/mutations' {
interface MutationMap {
account_logout: {
input: AccountLogoutMutationInput;
output: AccountLogoutMutationOutput;
};
}
}

View File

@@ -1,17 +0,0 @@
export type LogoutMutationInput = {
type: 'logout';
accountId: string;
};
export type LogoutMutationOutput = {
success: boolean;
};
declare module '@/shared/mutations' {
interface MutationMap {
logout: {
input: LogoutMutationInput;
output: LogoutMutationOutput;
};
}
}

View File

@@ -1,10 +1,3 @@
import { WorkspaceOutput } from '@colanode/core';
export type LoginOutput = {
account: Account;
workspaces: WorkspaceOutput[];
};
export type Account = {
id: string;
name: string;
@@ -15,9 +8,3 @@ export type Account = {
status: string;
server: string;
};
export type AccountUpdateOutput = {
id: string;
name: string;
avatar?: string | null;
};