Add collaboration sync for desktop

This commit is contained in:
Hakan Shehu
2024-11-28 19:12:03 +01:00
parent 09aeec166b
commit cb6972918d
14 changed files with 452 additions and 13 deletions

View File

@@ -92,6 +92,9 @@ const createWorkspaceCursorsTable: Migration = {
.onDelete('cascade')
)
.addColumn('transactions', 'integer', (col) => col.notNull().defaultTo(0))
.addColumn('collaborations', 'integer', (col) =>
col.notNull().defaultTo(0)
)
.addColumn('revocations', 'integer', (col) => col.notNull().defaultTo(0))
.addColumn('created_at', 'text', (col) => col.notNull())
.addColumn('updated_at', 'text')

View File

@@ -48,6 +48,7 @@ export type UpdateWorkspace = Updateable<WorkspaceTable>;
interface WorkspaceCursorTable {
user_id: ColumnType<string, string, never>;
transactions: ColumnType<bigint, bigint, bigint>;
collaborations: ColumnType<bigint, bigint, bigint>;
revocations: ColumnType<bigint, bigint, bigint>;
created_at: ColumnType<string, string, string>;
updated_at: ColumnType<string | null, string | null, string>;

View File

@@ -56,6 +56,25 @@ const createNodeTransactionsTable: Migration = {
},
};
const createCollaborationsTable: Migration = {
up: async (db) => {
await db.schema
.createTable('collaborations')
.addColumn('user_id', 'text', (col) => col.notNull())
.addColumn('node_id', 'text', (col) => col.notNull())
.addColumn('workspace_id', 'text', (col) => col.notNull())
.addColumn('roles', 'text')
.addColumn('created_at', 'text', (col) => col.notNull())
.addColumn('updated_at', 'text')
.addColumn('version', 'integer')
.addPrimaryKeyConstraint('collaborations_pkey', ['user_id', 'node_id'])
.execute();
},
down: async (db) => {
await db.schema.dropTable('collaborations').execute();
},
};
const createDownloadsTable: Migration = {
up: async (db) => {
await db.schema
@@ -236,11 +255,12 @@ const createNodeDeleteNameTrigger: Migration = {
export const workspaceDatabaseMigrations: Record<string, Migration> = {
'00001_create_nodes_table': createNodesTable,
'00002_create_node_transactions_table': createNodeTransactionsTable,
'00003_create_uploads_table': createUploadsTable,
'00004_create_downloads_table': createDownloadsTable,
'00005_create_node_paths_table': createNodePathsTable,
'00006_create_node_names_table': createNodeNamesTable,
'00007_create_node_insert_name_trigger': createNodeInsertNameTrigger,
'00008_create_node_update_name_trigger': createNodeUpdateNameTrigger,
'00009_create_node_delete_name_trigger': createNodeDeleteNameTrigger,
'00003_create_collaborations_table': createCollaborationsTable,
'00004_create_uploads_table': createUploadsTable,
'00005_create_downloads_table': createDownloadsTable,
'00006_create_node_paths_table': createNodePathsTable,
'00007_create_node_names_table': createNodeNamesTable,
'00008_create_node_insert_name_trigger': createNodeInsertNameTrigger,
'00009_create_node_update_name_trigger': createNodeUpdateNameTrigger,
'00010_create_node_delete_name_trigger': createNodeDeleteNameTrigger,
};

View File

@@ -42,6 +42,20 @@ export type SelectNodeTransaction = Selectable<NodeTransactionTable>;
export type CreateNodeTransaction = Insertable<NodeTransactionTable>;
export type UpdateNodeTransaction = Updateable<NodeTransactionTable>;
interface CollaborationTable {
user_id: ColumnType<string, string, never>;
node_id: ColumnType<string, string, never>;
workspace_id: ColumnType<string, string, never>;
roles: ColumnType<string, string, string>;
created_at: ColumnType<string, string, never>;
updated_at: ColumnType<string | null, string | null, string | null>;
version: ColumnType<bigint, bigint, bigint>;
}
export type SelectCollaboration = Selectable<CollaborationTable>;
export type CreateCollaboration = Insertable<CollaborationTable>;
export type UpdateCollaboration = Updateable<CollaborationTable>;
interface UploadTable {
node_id: ColumnType<string, string, never>;
upload_id: ColumnType<string, string, never>;
@@ -73,6 +87,7 @@ export interface WorkspaceDatabaseSchema {
nodes: NodeTable;
node_transactions: NodeTransactionTable;
node_paths: NodePathTable;
collaborations: CollaborationTable;
uploads: UploadTable;
downloads: DownloadTable;
}

View File

@@ -1,7 +1,48 @@
import { ServerCollaborationRevocation } from '@colanode/core';
import {
ServerCollaboration,
ServerCollaborationRevocation,
} from '@colanode/core';
import { databaseService } from '@/main/data/database-service';
import { eventBus } from '@/shared/lib/event-bus';
class CollaborationService {
public async applyServerCollaboration(
userId: string,
collaboration: ServerCollaboration
) {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
await workspaceDatabase
.insertInto('collaborations')
.values({
user_id: userId,
node_id: collaboration.nodeId,
workspace_id: collaboration.workspaceId,
roles: JSON.stringify(collaboration.roles),
created_at: collaboration.createdAt,
version: BigInt(collaboration.version),
})
.onConflict((oc) =>
oc
.columns(['user_id', 'node_id'])
.doUpdateSet({
roles: JSON.stringify(collaboration.roles),
version: BigInt(collaboration.version),
updated_at: collaboration.updatedAt,
})
.where('version', '<', BigInt(collaboration.version))
)
.execute();
eventBus.publish({
type: 'collaboration_synced',
userId,
nodeId: collaboration.nodeId,
workspaceId: collaboration.workspaceId,
});
}
public async applyServerCollaborationRevocation(
userId: string,
revocation: ServerCollaborationRevocation

View File

@@ -51,6 +51,8 @@ export class SocketConnection {
syncService.syncServerTransactions(message);
} else if (message.type === 'collaboration_revocations_batch') {
syncService.syncServerRevocations(message);
} else if (message.type === 'collaborations_batch') {
syncService.syncServerCollaborations(message);
}
};

View File

@@ -5,7 +5,9 @@ import { httpClient } from '@/shared/lib/http-client';
import { serverService } from '@/main/services/server-service';
import {
CollaborationRevocationsBatchMessage,
CollaborationsBatchMessage,
FetchCollaborationRevocationsMessage,
FetchCollaborationsMessage,
FetchNodeTransactionsMessage,
GetNodeTransactionsOutput,
LocalNodeTransaction,
@@ -37,6 +39,7 @@ class SyncService {
> = new Map();
private readonly syncingTransactions: Set<string> = new Set();
private readonly syncingCollaborations: Set<string> = new Set();
private readonly syncingRevocations: Set<string> = new Set();
constructor() {
@@ -49,6 +52,8 @@ class SyncService {
this.requireNodeTransactions(event.workspace.userId);
} else if (event.type === 'socket_connection_opened') {
this.syncAllWorkspaces();
} else if (event.type === 'collaboration_synced') {
this.checkForMissingNode(event.userId, event.nodeId);
}
});
}
@@ -63,7 +68,9 @@ class SyncService {
this.syncLocalPendingTransactions(workspace.user_id);
this.syncLocalIncompleteTransactions(workspace.user_id);
this.requireNodeTransactions(workspace.user_id);
this.requireCollaborations(workspace.user_id);
this.requireCollaborationRevocations(workspace.user_id);
this.syncMissingNodes(workspace.user_id);
}
}
@@ -158,6 +165,36 @@ class SyncService {
}
}
public async syncServerCollaborations(message: CollaborationsBatchMessage) {
if (this.syncingCollaborations.has(message.userId)) {
return;
}
this.syncingCollaborations.add(message.userId);
let cursor: bigint | null = null;
try {
for (const collaboration of message.collaborations) {
await collaborationService.applyServerCollaboration(
message.userId,
collaboration
);
cursor = BigInt(collaboration.version);
}
if (cursor) {
this.updateCollaborationCursor(message.userId, cursor);
}
} catch (error) {
this.logger.error(
error,
`Error syncing server collaborations for user ${message.userId}`
);
} finally {
this.syncingCollaborations.delete(message.userId);
this.requireCollaborations(message.userId);
}
}
public async syncServerRevocations(
message: CollaborationRevocationsBatchMessage
) {
@@ -296,6 +333,127 @@ class SyncService {
}
}
private async syncMissingNodes(userId: string) {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const missingNodes = await workspaceDatabase
.selectFrom('collaborations')
.leftJoin('nodes', 'collaborations.node_id', 'nodes.id')
.select('collaborations.node_id')
.where('nodes.id', 'is', null)
.execute();
console.log('missingNodes', missingNodes);
if (missingNodes.length === 0) {
return;
}
const workspace = await databaseService.appDatabase
.selectFrom('workspaces')
.innerJoin('accounts', 'workspaces.account_id', 'accounts.id')
.innerJoin('servers', 'accounts.server', 'servers.domain')
.select([
'workspaces.workspace_id',
'workspaces.user_id',
'workspaces.account_id',
'accounts.token',
'servers.domain',
'servers.attributes',
])
.where('workspaces.user_id', '=', userId)
.executeTakeFirst();
if (!workspace) {
return;
}
if (!serverService.isAvailable(workspace.domain)) {
return;
}
for (const node of missingNodes) {
try {
const { data } = await httpClient.get<GetNodeTransactionsOutput>(
`/v1/nodes/${workspace.workspace_id}/transactions/${node.node_id}`,
{
domain: workspace.domain,
token: workspace.token,
}
);
console.log('missing node data', data);
await nodeService.replaceTransactions(
userId,
node.node_id,
data.transactions
);
} catch (error) {
this.logger.error(
error,
`Error syncing missing node ${node.node_id} for user ${userId}`
);
}
}
}
private async checkForMissingNode(userId: string, nodeId: string) {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const node = await workspaceDatabase
.selectFrom('nodes')
.selectAll()
.where('id', '=', nodeId)
.executeTakeFirst();
if (node) {
return;
}
const workspace = await databaseService.appDatabase
.selectFrom('workspaces')
.innerJoin('accounts', 'workspaces.account_id', 'accounts.id')
.innerJoin('servers', 'accounts.server', 'servers.domain')
.select([
'workspaces.workspace_id',
'workspaces.user_id',
'workspaces.account_id',
'accounts.token',
'servers.domain',
'servers.attributes',
])
.where('workspaces.user_id', '=', userId)
.executeTakeFirst();
if (!workspace) {
return;
}
if (!serverService.isAvailable(workspace.domain)) {
return;
}
try {
const { data } = await httpClient.get<GetNodeTransactionsOutput>(
`/v1/nodes/${workspace.workspace_id}/transactions/${nodeId}`,
{
domain: workspace.domain,
token: workspace.token,
}
);
await nodeService.replaceTransactions(userId, nodeId, data.transactions);
} catch (error) {
this.logger.error(
error,
`Error checking for missing node ${nodeId} for user ${userId}`
);
}
}
private async sendLocalTransactions(userId: string) {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
@@ -405,6 +563,33 @@ class SyncService {
socketService.sendMessage(workspaceWithCursor.account_id, message);
}
private async requireCollaborations(userId: string) {
const workspaceWithCursor = await databaseService.appDatabase
.selectFrom('workspaces as w')
.leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id')
.select([
'w.user_id',
'w.workspace_id',
'w.account_id',
'wc.collaborations',
])
.where('w.user_id', '=', userId)
.executeTakeFirst();
if (!workspaceWithCursor) {
return;
}
const message: FetchCollaborationsMessage = {
type: 'fetch_collaborations',
userId: workspaceWithCursor.user_id,
workspaceId: workspaceWithCursor.workspace_id,
cursor: workspaceWithCursor.collaborations?.toString() ?? '0',
};
socketService.sendMessage(workspaceWithCursor.account_id, message);
}
private async requireCollaborationRevocations(userId: string) {
const workspaceWithCursor = await databaseService.appDatabase
.selectFrom('workspaces as w')
@@ -433,6 +618,7 @@ class SyncService {
.values({
user_id: userId,
transactions: cursor,
collaborations: 0n,
revocations: 0n,
created_at: new Date().toISOString(),
})
@@ -445,6 +631,25 @@ class SyncService {
.execute();
}
private async updateCollaborationCursor(userId: string, cursor: bigint) {
await databaseService.appDatabase
.insertInto('workspace_cursors')
.values({
user_id: userId,
collaborations: cursor,
revocations: 0n,
transactions: 0n,
created_at: new Date().toISOString(),
})
.onConflict((eb) =>
eb.column('user_id').doUpdateSet({
collaborations: cursor,
updated_at: new Date().toISOString(),
})
)
.execute();
}
private async updateCollaborationRevocationCursor(
userId: string,
cursor: bigint
@@ -455,6 +660,7 @@ class SyncService {
user_id: userId,
revocations: cursor,
transactions: 0n,
collaborations: 0n,
created_at: new Date().toISOString(),
})
.onConflict((eb) =>

View File

@@ -120,6 +120,13 @@ export type NodeTransactionIncompleteEvent = {
transactionId: string;
};
export type CollaborationCreatedEvent = {
type: 'collaboration_synced';
userId: string;
nodeId: string;
workspaceId: string;
};
export type ServerAvailabilityChangedEvent = {
type: 'server_availability_changed';
server: Server;
@@ -154,4 +161,5 @@ export type Event =
| NodeTransactionCreatedEvent
| NodeTransactionIncompleteEvent
| ServerAvailabilityChangedEvent
| SocketConnectionOpenedEvent;
| SocketConnectionOpenedEvent
| CollaborationCreatedEvent;

View File

@@ -233,6 +233,17 @@ const createNodePathsTable: Migration = {
const createCollaborationsTable: Migration = {
up: async (db) => {
// Create sequence first
await sql`
CREATE SEQUENCE IF NOT EXISTS collaborations_version_seq
START WITH 1000000000
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
`.execute(db);
// Create table with version column
await db.schema
.createTable('collaborations')
.addColumn('user_id', 'varchar(30)', (col) => col.notNull())
@@ -241,10 +252,33 @@ const createCollaborationsTable: Migration = {
.addColumn('roles', 'jsonb', (col) => col.notNull().defaultTo('{}'))
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
.addColumn('updated_at', 'timestamptz')
.addColumn('version', 'bigint', (col) =>
col.notNull().defaultTo(sql`nextval('collaborations_version_seq')`)
)
.addPrimaryKeyConstraint('collaborations_pkey', ['user_id', 'node_id'])
.execute();
// Add trigger to update version on update
await sql`
CREATE OR REPLACE FUNCTION update_collaboration_version() RETURNS TRIGGER AS $$
BEGIN
NEW.version = nextval('collaborations_version_seq');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_update_collaboration_version
BEFORE UPDATE ON collaborations
FOR EACH ROW
EXECUTE FUNCTION update_collaboration_version();
`.execute(db);
},
down: async (db) => {
await sql`
DROP TRIGGER IF EXISTS trg_update_collaboration_version ON collaborations;
DROP FUNCTION IF EXISTS update_collaboration_version();
DROP SEQUENCE IF EXISTS collaborations_version_seq;
`.execute(db);
await db.schema.dropTable('collaborations').execute();
},
};

View File

@@ -118,11 +118,11 @@ export type UpdateNodeTransaction = Updateable<NodeTransactionTable>;
interface CollaborationTable {
user_id: ColumnType<string, string, never>;
node_id: ColumnType<string, string, never>;
type: ColumnType<string, never, never>;
workspace_id: ColumnType<string, string, never>;
roles: JSONColumnType<Record<string, NodeRole>, string, string>;
created_at: ColumnType<Date, Date, never>;
updated_at: ColumnType<Date | null, Date | null, Date>;
version: ColumnType<bigint, never, never>;
}
export type SelectCollaboration = Selectable<CollaborationTable>;

View File

@@ -1,5 +1,6 @@
import { database } from '@/data/database';
import {
SelectCollaboration,
SelectCollaborationRevocation,
SelectNode,
SelectNodeTransaction,
@@ -8,6 +9,7 @@ import { NodeCollaborator } from '@/types/nodes';
import {
NodeOutput,
NodeRole,
ServerCollaboration,
ServerCollaborationRevocation,
ServerNodeTransaction,
} from '@colanode/core';
@@ -111,6 +113,20 @@ export const mapCollaborationRevocation = (
};
};
export const mapCollaboration = (
collaboration: SelectCollaboration
): ServerCollaboration => {
return {
userId: collaboration.user_id,
nodeId: collaboration.node_id,
workspaceId: collaboration.workspace_id,
roles: collaboration.roles,
createdAt: collaboration.created_at.toISOString(),
updatedAt: collaboration.updated_at?.toISOString() ?? null,
version: collaboration.version.toString(),
};
};
export const fetchNode = async (nodeId: string): Promise<SelectNode | null> => {
const result = await database
.selectFrom('nodes')

View File

@@ -4,12 +4,17 @@ import { WebSocketServer, WebSocket } from 'ws';
import { verifyToken } from '@/lib/tokens';
import {
CollaborationRevocationsBatchMessage,
CollaborationsBatchMessage,
Message,
NodeTransactionsBatchMessage,
NodeType,
} from '@colanode/core';
import { logService } from '@/services/log-service';
import { mapCollaborationRevocation, mapNodeTransaction } from '@/lib/nodes';
import {
mapCollaboration,
mapCollaborationRevocation,
mapNodeTransaction,
} from '@/lib/nodes';
import { eventBus } from '@/lib/event-bus';
import {
CollaboratorRemovedEvent,
@@ -29,6 +34,7 @@ interface SynapseConnection {
socket: WebSocket;
transactions: Map<string, SynapseUserCursor>;
revocations: Map<string, SynapseUserCursor>;
collaborations: Map<string, SynapseUserCursor>;
}
const PUBLIC_NODES: NodeType[] = ['workspace', 'user'];
@@ -116,6 +122,7 @@ class SynapseService {
socket,
transactions: new Map(),
revocations: new Map(),
collaborations: new Map(),
};
this.connections.set(account.deviceId, connection);
@@ -168,6 +175,21 @@ class SynapseService {
state.cursor = message.cursor;
this.sendPendingRevocations(connection, message.userId);
}
} else if (message.type === 'fetch_collaborations') {
const state = connection.collaborations.get(message.userId);
if (!state) {
connection.collaborations.set(message.userId, {
userId: message.userId,
workspaceId: message.workspaceId,
cursor: message.cursor,
syncing: false,
});
this.sendPendingCollaborations(connection, message.userId);
} else if (!state.syncing && state.cursor !== message.cursor) {
state.cursor = message.cursor;
this.sendPendingCollaborations(connection, message.userId);
}
}
}
@@ -262,6 +284,46 @@ class SynapseService {
this.sendMessage(connection, message);
}
private async sendPendingCollaborations(
connection: SynapseConnection,
userId: string
) {
const state = connection.collaborations.get(userId);
if (!state || state.syncing) {
return;
}
state.syncing = true;
this.logger.trace(
state,
`Sending pending collaborations for ${connection.deviceId} with ${userId}`
);
const unsyncedCollaborations = await database
.selectFrom('collaborations as c')
.selectAll()
.where('c.user_id', '=', userId)
.where('c.version', '>', BigInt(state.cursor))
.orderBy('c.version', 'asc')
.limit(50)
.execute();
if (unsyncedCollaborations.length === 0) {
state.syncing = false;
return;
}
const collaborations = unsyncedCollaborations.map(mapCollaboration);
const message: CollaborationsBatchMessage = {
type: 'collaborations_batch',
userId,
collaborations,
};
connection.collaborations.delete(userId);
this.sendMessage(connection, message);
}
private async handleNodeTransactionCreatedEvent(
event: NodeTransactionCreatedEvent
) {