Add resync for incomplete transactions

This commit is contained in:
Hakan Shehu
2024-11-28 17:55:17 +01:00
parent f33e301eab
commit 09aeec166b
8 changed files with 383 additions and 67 deletions

View File

@@ -28,6 +28,7 @@ import {
} from '@/main/data/workspace/schema';
import { eventBus } from '@/shared/lib/event-bus';
import { SelectWorkspace } from '@/main/data/app/schema';
import { sql } from 'kysely';
export type CreateNodeInput = {
id: string;
@@ -437,6 +438,96 @@ class NodeService {
}
}
public async replaceTransactions(
userId: string,
nodeId: string,
transactions: ServerNodeTransaction[]
): Promise<boolean> {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const firstTransaction = transactions[0];
if (!firstTransaction) {
return false;
}
const lastTransaction = transactions[transactions.length - 1];
if (!lastTransaction) {
return false;
}
const ydoc = new YDoc();
for (const transaction of transactions) {
if (transaction.operation === 'delete') {
await this.applyServerDeleteTransaction(userId, transaction);
return true;
}
ydoc.applyUpdate(transaction.data);
}
const attributes = ydoc.getAttributes<NodeAttributes>();
const attributesJson = JSON.stringify(attributes);
await workspaceDatabase.transaction().execute(async (trx) => {
await trx
.insertInto('nodes')
.values({
id: nodeId,
attributes: attributesJson,
created_at: firstTransaction.createdAt,
created_by: firstTransaction.createdBy,
updated_at:
firstTransaction.id !== lastTransaction.id
? lastTransaction.createdAt
: null,
updated_by:
firstTransaction.id !== lastTransaction.id
? lastTransaction.createdBy
: null,
transaction_id: lastTransaction.id,
})
.onConflict((oc) =>
oc.columns(['id']).doUpdateSet({
attributes: attributesJson,
updated_at: lastTransaction.createdAt,
updated_by: lastTransaction.createdBy,
transaction_id: lastTransaction.id,
})
)
.execute();
await trx
.insertInto('node_transactions')
.values(
transactions.map((t) => ({
id: t.id,
node_id: t.nodeId,
node_type: t.nodeType,
operation: t.operation,
data:
t.operation !== 'delete' && t.data ? decodeState(t.data) : null,
created_at: t.createdAt,
created_by: t.createdBy,
retry_count: 0,
status: 'synced',
version: BigInt(t.version),
server_created_at: t.serverCreatedAt,
}))
)
.onConflict((oc) =>
oc.columns(['id']).doUpdateSet({
status: 'synced',
version: sql`excluded.version`,
server_created_at: sql`excluded.server_created_at`,
})
)
.execute();
});
return true;
}
private async applyServerCreateTransaction(
userId: string,
transaction: ServerNodeCreateTransaction
@@ -481,23 +572,6 @@ class NodeService {
const result = await workspaceDatabase
.transaction()
.execute(async (trx) => {
await trx
.insertInto('node_transactions')
.values({
id: transaction.id,
node_id: transaction.nodeId,
node_type: transaction.nodeType,
operation: 'create',
data: decodeState(transaction.data),
created_at: transaction.createdAt,
created_by: transaction.createdBy,
retry_count: 0,
status: 'synced',
version,
server_created_at: transaction.serverCreatedAt,
})
.execute();
const nodeRow = await trx
.insertInto('nodes')
.returningAll()
@@ -510,6 +584,23 @@ class NodeService {
})
.executeTakeFirst();
await trx
.insertInto('node_transactions')
.values({
id: transaction.id,
node_id: transaction.nodeId,
node_type: transaction.nodeType,
operation: 'create',
data: decodeState(transaction.data),
created_at: transaction.createdAt,
created_by: transaction.createdBy,
retry_count: 0,
status: nodeRow ? 'synced' : 'incomplete',
version,
server_created_at: transaction.serverCreatedAt,
})
.execute();
return nodeRow;
});
@@ -519,6 +610,12 @@ class NodeService {
userId,
node: mapNode(result),
});
} else {
eventBus.publish({
type: 'node_transaction_incomplete',
userId,
transactionId: transaction.id,
});
}
}
@@ -579,23 +676,6 @@ class NodeService {
const result = await workspaceDatabase
.transaction()
.execute(async (trx) => {
await trx
.insertInto('node_transactions')
.values({
id: transaction.id,
node_id: transaction.nodeId,
node_type: transaction.nodeType,
operation: 'update',
data: decodeState(transaction.data),
created_at: transaction.createdAt,
created_by: transaction.createdBy,
retry_count: 0,
status: 'synced',
version,
server_created_at: transaction.serverCreatedAt,
})
.execute();
const nodeRow = await trx
.updateTable('nodes')
.returningAll()
@@ -608,6 +688,23 @@ class NodeService {
.where('id', '=', transaction.nodeId)
.executeTakeFirst();
await trx
.insertInto('node_transactions')
.values({
id: transaction.id,
node_id: transaction.nodeId,
node_type: transaction.nodeType,
operation: 'update',
data: decodeState(transaction.data),
created_at: transaction.createdAt,
created_by: transaction.createdBy,
retry_count: 0,
status: nodeRow ? 'synced' : 'incomplete',
version,
server_created_at: transaction.serverCreatedAt,
})
.execute();
return nodeRow;
});
@@ -617,6 +714,12 @@ class NodeService {
userId,
node: mapNode(result),
});
} else {
eventBus.publish({
type: 'node_transaction_incomplete',
userId,
transactionId: transaction.id,
});
}
}

View File

@@ -1,5 +1,4 @@
import { databaseService } from '@/main/data/database-service';
import { getIdType, IdType, NodeTypes } from '@colanode/core';
import { WorkspaceRadarData } from '@/shared/types/radars';
import { eventBus } from '@/shared/lib/event-bus';
import { Event } from '@/shared/types/events';
@@ -130,15 +129,6 @@ class RadarService {
eventBus.publish({
type: 'radar_data_updated',
});
} else if (
event.type === 'user_node_created' ||
event.type === 'user_node_updated'
) {
// to be optimized
await this.initWorkspace(event.userId);
eventBus.publish({
type: 'radar_data_updated',
});
}
}
}

View File

@@ -7,6 +7,7 @@ import {
CollaborationRevocationsBatchMessage,
FetchCollaborationRevocationsMessage,
FetchNodeTransactionsMessage,
GetNodeTransactionsOutput,
LocalNodeTransaction,
NodeTransactionsBatchMessage,
SyncNodeTransactionsOutput,
@@ -15,6 +16,8 @@ import { logService } from '@/main/services/log-service';
import { nodeService } from '@/main/services/node-service';
import { socketService } from '@/main/services/socket-service';
import { collaborationService } from '@/main/services/collaboration-service';
import { SelectNodeTransaction } from '@/main/data/workspace/schema';
import { sql } from 'kysely';
type WorkspaceSyncState = {
isSyncing: boolean;
@@ -23,7 +26,15 @@ type WorkspaceSyncState = {
class SyncService {
private readonly logger = logService.createLogger('sync-service');
private readonly localSyncStates: Map<string, WorkspaceSyncState> = new Map();
private readonly localPendingTransactionStates: Map<
string,
WorkspaceSyncState
> = new Map();
private readonly localIncompleteTransactionStates: Map<
string,
WorkspaceSyncState
> = new Map();
private readonly syncingTransactions: Set<string> = new Set();
private readonly syncingRevocations: Set<string> = new Set();
@@ -31,7 +42,9 @@ class SyncService {
constructor() {
eventBus.subscribe((event) => {
if (event.type === 'node_transaction_created') {
this.syncLocalTransactions(event.userId);
this.syncLocalPendingTransactions(event.userId);
} else if (event.type === 'node_transaction_incomplete') {
this.syncLocalIncompleteTransactions(event.userId);
} else if (event.type === 'workspace_created') {
this.requireNodeTransactions(event.workspace.userId);
} else if (event.type === 'socket_connection_opened') {
@@ -47,21 +60,22 @@ class SyncService {
.execute();
for (const workspace of workspaces) {
this.syncLocalTransactions(workspace.user_id);
this.syncLocalPendingTransactions(workspace.user_id);
this.syncLocalIncompleteTransactions(workspace.user_id);
this.requireNodeTransactions(workspace.user_id);
this.requireCollaborationRevocations(workspace.user_id);
}
}
public async syncLocalTransactions(userId: string) {
if (!this.localSyncStates.has(userId)) {
this.localSyncStates.set(userId, {
public async syncLocalPendingTransactions(userId: string) {
if (!this.localPendingTransactionStates.has(userId)) {
this.localPendingTransactionStates.set(userId, {
isSyncing: false,
scheduledSync: false,
});
}
const syncState = this.localSyncStates.get(userId)!;
const syncState = this.localPendingTransactionStates.get(userId)!;
if (syncState.isSyncing) {
syncState.scheduledSync = true;
return;
@@ -80,7 +94,39 @@ class SyncService {
if (syncState.scheduledSync) {
syncState.scheduledSync = false;
this.syncLocalTransactions(userId);
this.syncLocalPendingTransactions(userId);
}
}
}
public async syncLocalIncompleteTransactions(userId: string) {
if (!this.localIncompleteTransactionStates.has(userId)) {
this.localIncompleteTransactionStates.set(userId, {
isSyncing: false,
scheduledSync: false,
});
}
const syncState = this.localIncompleteTransactionStates.get(userId)!;
if (syncState.isSyncing) {
syncState.scheduledSync = true;
return;
}
syncState.isSyncing = true;
try {
await this.syncIncompleteTransactions(userId);
} catch (error) {
this.logger.error(
error,
`Error syncing incomplete transactions for user ${userId}`
);
} finally {
syncState.isSyncing = false;
if (syncState.scheduledSync) {
syncState.scheduledSync = false;
this.syncLocalIncompleteTransactions(userId);
}
}
}
@@ -144,6 +190,112 @@ class SyncService {
}
}
private async syncIncompleteTransactions(userId: string) {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const incompleteTransactions = await workspaceDatabase
.selectFrom('node_transactions')
.selectAll()
.where('status', '=', 'incomplete')
.execute();
if (incompleteTransactions.length === 0) {
return;
}
const workspace = await databaseService.appDatabase
.selectFrom('workspaces')
.innerJoin('accounts', 'workspaces.account_id', 'accounts.id')
.innerJoin('servers', 'accounts.server', 'servers.domain')
.select([
'workspaces.workspace_id',
'workspaces.user_id',
'workspaces.account_id',
'accounts.token',
'servers.domain',
'servers.attributes',
])
.where('workspaces.user_id', '=', userId)
.executeTakeFirst();
if (!workspace) {
return;
}
if (!serverService.isAvailable(workspace.domain)) {
return;
}
const groupedByNodeId = incompleteTransactions.reduce<
Record<string, SelectNodeTransaction[]>
>((acc, transaction) => {
acc[transaction.node_id] = [
...(acc[transaction.node_id] ?? []),
transaction,
];
return acc;
}, {});
for (const [nodeId, transactions] of Object.entries(groupedByNodeId)) {
try {
const { data } = await httpClient.get<GetNodeTransactionsOutput>(
`/v1/nodes/${workspace.workspace_id}/transactions/${nodeId}`,
{
domain: workspace.domain,
token: workspace.token,
}
);
if (data.transactions.length === 0) {
await workspaceDatabase
.deleteFrom('node_transactions')
.where(
'id',
'in',
transactions.map((t) => t.id)
)
.execute();
continue;
}
const synced = await nodeService.replaceTransactions(
userId,
nodeId,
data.transactions
);
if (!synced) {
await workspaceDatabase
.updateTable('node_transactions')
.set({ retry_count: sql`retry_count + 1` })
.where(
'id',
'in',
transactions.map((t) => t.id)
)
.execute();
} else {
await workspaceDatabase
.updateTable('node_transactions')
.set({ retry_count: sql`retry_count + 1` })
.where(
'id',
'in',
transactions.map((t) => t.id)
)
.where('status', '=', 'incomplete')
.execute();
}
} catch (error) {
this.logger.error(
error,
`Error syncing incomplete transactions for node ${nodeId} for user ${userId}`
);
}
}
}
private async sendLocalTransactions(userId: string) {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);

View File

@@ -62,18 +62,6 @@ export type ServerUpdatedEvent = {
server: Server;
};
export type UserNodeCreatedEvent = {
type: 'user_node_created';
userId: string;
userNode: UserNode;
};
export type UserNodeUpdatedEvent = {
type: 'user_node_updated';
userId: string;
userNode: UserNode;
};
export type DownloadCreatedEvent = {
type: 'download_created';
userId: string;
@@ -126,6 +114,12 @@ export type NodeTransactionCreatedEvent = {
transaction: LocalNodeTransaction;
};
export type NodeTransactionIncompleteEvent = {
type: 'node_transaction_incomplete';
userId: string;
transactionId: string;
};
export type ServerAvailabilityChangedEvent = {
type: 'server_availability_changed';
server: Server;
@@ -149,8 +143,6 @@ export type Event =
| WorkspaceDeletedEvent
| ServerCreatedEvent
| ServerUpdatedEvent
| UserNodeCreatedEvent
| UserNodeUpdatedEvent
| DownloadCreatedEvent
| DownloadUpdatedEvent
| DownloadDeletedEvent
@@ -160,5 +152,6 @@ export type Event =
| QueryResultUpdatedEvent
| RadarDataUpdatedEvent
| NodeTransactionCreatedEvent
| NodeTransactionIncompleteEvent
| ServerAvailabilityChangedEvent
| SocketConnectionOpenedEvent;

View File

@@ -9,6 +9,7 @@ import { syncRouter } from '@/routes/sync';
import { configRouter } from '@/routes/config';
import { avatarsRouter } from '@/routes/avatars';
import { filesRouter } from '@/routes/files';
import { nodesRouter } from '@/routes/nodes';
import { synapse } from '@/services/synapse-service';
import { logService } from '@/services/log-service';
@@ -35,6 +36,7 @@ export const initApi = async () => {
app.use('/v1/sync', authMiddleware, syncRouter);
app.use('/v1/avatars', authMiddleware, avatarsRouter);
app.use('/v1/files', authMiddleware, filesRouter);
app.use('/v1/nodes', authMiddleware, nodesRouter);
const server = http.createServer(app);
await synapse.init(server);

View File

@@ -7,6 +7,7 @@ import {
import { NodeCollaborator } from '@/types/nodes';
import {
NodeOutput,
NodeRole,
ServerCollaborationRevocation,
ServerNodeTransaction,
} from '@colanode/core';
@@ -172,7 +173,7 @@ export const fetchNodeCollaborators = async (
export const fetchNodeRole = async (
nodeId: string,
collaboratorId: string
): Promise<string | null> => {
): Promise<NodeRole | null> => {
const ancestors = await fetchNodeAncestors(nodeId);
if (ancestors.length === 0) {
return null;

View File

@@ -0,0 +1,70 @@
import { database } from '@/data/database';
import { fetchNodeRole, mapNodeTransaction } from '@/lib/nodes';
import { ApiError, ColanodeRequest, ColanodeResponse } from '@/types/api';
import { Router } from 'express';
import { GetNodeTransactionsOutput, hasViewerAccess } from '@colanode/core';
export const nodesRouter = Router();
nodesRouter.get(
'/:workspaceId/transactions/:nodeId',
async (req: ColanodeRequest, res: ColanodeResponse) => {
const workspaceId = req.params.workspaceId as string;
const nodeId = req.params.nodeId as string;
if (!req.account) {
return res.status(401).json({
code: ApiError.Unauthorized,
message: 'Unauthorized.',
});
}
const workspace = await database
.selectFrom('workspaces')
.selectAll()
.where('id', '=', workspaceId)
.executeTakeFirst();
if (!workspace) {
return res.status(404).json({
code: ApiError.ResourceNotFound,
message: 'Workspace not found.',
});
}
const workspaceUser = await database
.selectFrom('workspace_users')
.selectAll()
.where('workspace_id', '=', workspace.id)
.where('account_id', '=', req.account.id)
.executeTakeFirst();
if (!workspaceUser) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
});
}
const role = await fetchNodeRole(nodeId, workspaceUser.id);
if (role === null || !hasViewerAccess(role)) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
});
}
const transactions = await database
.selectFrom('node_transactions')
.selectAll()
.where('node_id', '=', nodeId)
.orderBy('version', 'desc')
.execute();
const output: GetNodeTransactionsOutput = {
transactions: transactions.map(mapNodeTransaction),
};
res.status(200).json(output);
}
);

View File

@@ -1,4 +1,5 @@
import { NodeAttributes } from '../registry';
import { ServerNodeTransaction } from './sync';
export type NodeOutput = {
id: string;
@@ -13,3 +14,7 @@ export type NodeOutput = {
updatedBy?: string | null;
transactionId: string;
};
export type GetNodeTransactionsOutput = {
transactions: ServerNodeTransaction[];
};