Improve synchronizer

This commit is contained in:
Hakan Shehu
2024-10-07 19:35:03 +02:00
parent 200c00af18
commit 6bb0296763

View File

@@ -1,3 +1,4 @@
import { BackoffCalculator } from '@/lib/backoff-calculator';
import { buildAxiosInstance } from '@/lib/servers';
import { SelectWorkspace } from '@/main/data/app/schema';
import { databaseManager } from '@/main/data/database-manager';
@@ -7,6 +8,8 @@ const EVENT_LOOP_INTERVAL = 100;
class Synchronizer {
private initiated: boolean = false;
private readonly workspaceBackoffs: Map<string, BackoffCalculator> =
new Map();
constructor() {
this.executeEventLoop = this.executeEventLoop.bind(this);
@@ -64,146 +67,184 @@ class Synchronizer {
.execute();
for (const workspace of workspaces) {
const workspaceDatabase = await databaseManager.getWorkspaceDatabase(
workspace.user_id,
);
const changes = await workspaceDatabase
.selectFrom('changes')
.selectAll()
.orderBy('id asc')
.limit(20)
.execute();
if (changes.length === 0) {
return;
}
const axios = buildAxiosInstance(
workspace.domain,
workspace.attributes,
workspace.token,
);
const { data } = await axios.post<ServerSyncResponse>(
`/v1/sync/${workspace.workspace_id}`,
{
changes: changes,
},
);
const syncedChangeIds: number[] = [];
const unsyncedChangeIds: number[] = [];
for (const result of data.results) {
if (result.status === 'success') {
syncedChangeIds.push(result.id);
} else {
unsyncedChangeIds.push(result.id);
if (this.workspaceBackoffs.has(workspace.workspace_id)) {
const backoff = this.workspaceBackoffs.get(workspace.workspace_id);
if (!backoff.canRetry()) {
return;
}
}
if (syncedChangeIds.length > 0) {
await workspaceDatabase
.deleteFrom('changes')
.where('id', 'in', syncedChangeIds)
.execute();
}
try {
const workspaceDatabase = await databaseManager.getWorkspaceDatabase(
workspace.user_id,
);
if (unsyncedChangeIds.length > 0) {
await workspaceDatabase
.updateTable('changes')
.set((eb) => ({ retry_count: eb('retry_count', '+', 1) }))
.where('id', 'in', unsyncedChangeIds)
const changes = await workspaceDatabase
.selectFrom('changes')
.selectAll()
.orderBy('id asc')
.limit(20)
.execute();
//we just delete changes that have failed to sync for more than 5 times.
//in the future we might need to revert the change locally.
await workspaceDatabase
.deleteFrom('changes')
.where('retry_count', '>=', 5)
.execute();
if (changes.length === 0) {
return;
}
const axios = buildAxiosInstance(
workspace.domain,
workspace.attributes,
workspace.token,
);
const { data } = await axios.post<ServerSyncResponse>(
`/v1/sync/${workspace.workspace_id}`,
{
changes: changes,
},
);
const syncedChangeIds: number[] = [];
const unsyncedChangeIds: number[] = [];
for (const result of data.results) {
if (result.status === 'success') {
syncedChangeIds.push(result.id);
} else {
unsyncedChangeIds.push(result.id);
}
}
if (syncedChangeIds.length > 0) {
await workspaceDatabase
.deleteFrom('changes')
.where('id', 'in', syncedChangeIds)
.execute();
}
if (unsyncedChangeIds.length > 0) {
await workspaceDatabase
.updateTable('changes')
.set((eb) => ({ retry_count: eb('retry_count', '+', 1) }))
.where('id', 'in', unsyncedChangeIds)
.execute();
//we just delete changes that have failed to sync for more than 5 times.
//in the future we might need to revert the change locally.
await workspaceDatabase
.deleteFrom('changes')
.where('retry_count', '>=', 5)
.execute();
}
} catch (error) {
if (!this.workspaceBackoffs.has(workspace.workspace_id)) {
this.workspaceBackoffs.set(
workspace.workspace_id,
new BackoffCalculator(),
);
}
const backoff = this.workspaceBackoffs.get(workspace.workspace_id);
backoff.increaseError();
}
}
}
private async syncWorkspace(workspace: SelectWorkspace): Promise<void> {
const credentials = await databaseManager.appDatabase
.selectFrom('accounts')
.innerJoin('servers', 'accounts.server', 'servers.domain')
.select(['domain', 'attributes', 'token'])
.where('id', '=', workspace.account_id)
.executeTakeFirst();
if (!credentials) {
return;
if (this.workspaceBackoffs.has(workspace.workspace_id)) {
const backoff = this.workspaceBackoffs.get(workspace.workspace_id);
if (!backoff.canRetry()) {
return;
}
}
const axios = buildAxiosInstance(
credentials.domain,
credentials.attributes,
credentials.token,
);
const { data } = await axios.get<WorkspaceSyncData>(
`/v1/sync/${workspace.workspace_id}`,
);
try {
const credentials = await databaseManager.appDatabase
.selectFrom('accounts')
.innerJoin('servers', 'accounts.server', 'servers.domain')
.select(['domain', 'attributes', 'token'])
.where('id', '=', workspace.account_id)
.executeTakeFirst();
const workspaceDatabase = await databaseManager.getWorkspaceDatabase(
workspace.user_id,
);
await workspaceDatabase.transaction().execute(async (trx) => {
await trx.deleteFrom('nodes').execute();
await trx.deleteFrom('node_reactions').execute();
await trx.deleteFrom('node_collaborators').execute();
if (data.nodes.length > 0) {
await trx
.insertInto('nodes')
.values(
data.nodes.map((node) => {
return {
id: node.id,
attributes: JSON.stringify(node.attributes),
state: node.state,
created_at: node.createdAt,
created_by: node.createdBy,
updated_at: node.updatedAt,
updated_by: node.updatedBy,
version_id: node.versionId,
server_created_at: node.serverCreatedAt,
server_updated_at: node.serverUpdatedAt,
server_version_id: node.versionId,
};
}),
)
.execute();
if (!credentials) {
return;
}
if (data.nodeReactions.length > 0) {
await trx
.insertInto('node_reactions')
.values(
data.nodeReactions.map((nodeReaction) => {
return {
node_id: nodeReaction.nodeId,
actor_id: nodeReaction.actorId,
reaction: nodeReaction.reaction,
created_at: nodeReaction.createdAt,
server_created_at: nodeReaction.serverCreatedAt,
};
}),
)
.execute();
}
});
const axios = buildAxiosInstance(
credentials.domain,
credentials.attributes,
credentials.token,
);
const { data } = await axios.get<WorkspaceSyncData>(
`/v1/sync/${workspace.workspace_id}`,
);
await databaseManager.appDatabase
.updateTable('workspaces')
.set({
synced: 1,
})
.where('user_id', '=', workspace.user_id)
.execute();
const workspaceDatabase = await databaseManager.getWorkspaceDatabase(
workspace.user_id,
);
await workspaceDatabase.transaction().execute(async (trx) => {
await trx.deleteFrom('nodes').execute();
await trx.deleteFrom('node_reactions').execute();
await trx.deleteFrom('node_collaborators').execute();
if (data.nodes.length > 0) {
await trx
.insertInto('nodes')
.values(
data.nodes.map((node) => {
return {
id: node.id,
attributes: JSON.stringify(node.attributes),
state: node.state,
created_at: node.createdAt,
created_by: node.createdBy,
updated_at: node.updatedAt,
updated_by: node.updatedBy,
version_id: node.versionId,
server_created_at: node.serverCreatedAt,
server_updated_at: node.serverUpdatedAt,
server_version_id: node.versionId,
};
}),
)
.execute();
}
if (data.nodeReactions.length > 0) {
await trx
.insertInto('node_reactions')
.values(
data.nodeReactions.map((nodeReaction) => {
return {
node_id: nodeReaction.nodeId,
actor_id: nodeReaction.actorId,
reaction: nodeReaction.reaction,
created_at: nodeReaction.createdAt,
server_created_at: nodeReaction.serverCreatedAt,
};
}),
)
.execute();
}
});
await databaseManager.appDatabase
.updateTable('workspaces')
.set({
synced: 1,
})
.where('user_id', '=', workspace.user_id)
.execute();
} catch (error) {
if (!this.workspaceBackoffs.has(workspace.workspace_id)) {
this.workspaceBackoffs.set(
workspace.workspace_id,
new BackoffCalculator(),
);
}
const backoff = this.workspaceBackoffs.get(workspace.workspace_id);
backoff.increaseError();
}
}
}