mirror of
https://github.com/colanode/colanode.git
synced 2025-12-29 00:25:03 +01:00
Improve synchronization and collaboration changes
This commit is contained in:
@@ -91,8 +91,8 @@ const createWorkspaceCursorsTable: Migration = {
|
||||
.references('workspaces.user_id')
|
||||
.onDelete('cascade')
|
||||
)
|
||||
.addColumn('node_transactions', 'integer', (col) => col.notNull())
|
||||
.addColumn('collaborations', 'integer', (col) => col.notNull())
|
||||
.addColumn('transactions', 'integer', (col) => col.notNull().defaultTo(0))
|
||||
.addColumn('revocations', 'integer', (col) => col.notNull().defaultTo(0))
|
||||
.addColumn('created_at', 'text', (col) => col.notNull())
|
||||
.addColumn('updated_at', 'text')
|
||||
.execute();
|
||||
|
||||
@@ -47,8 +47,8 @@ export type UpdateWorkspace = Updateable<WorkspaceTable>;
|
||||
|
||||
interface WorkspaceCursorTable {
|
||||
user_id: ColumnType<string, string, never>;
|
||||
node_transactions: ColumnType<bigint, bigint, bigint>;
|
||||
collaborations: ColumnType<bigint, bigint, bigint>;
|
||||
transactions: ColumnType<bigint, bigint, bigint>;
|
||||
revocations: ColumnType<bigint, bigint, bigint>;
|
||||
created_at: ColumnType<string, string, string>;
|
||||
updated_at: ColumnType<string | null, string | null, string>;
|
||||
}
|
||||
|
||||
@@ -40,14 +40,15 @@ const createNodeTransactionsTable: Migration = {
|
||||
.createTable('node_transactions')
|
||||
.addColumn('id', 'text', (col) => col.notNull().primaryKey())
|
||||
.addColumn('node_id', 'text', (col) => col.notNull())
|
||||
.addColumn('type', 'text', (col) => col.notNull())
|
||||
.addColumn('node_type', 'text', (col) => col.notNull())
|
||||
.addColumn('operation', 'text', (col) => col.notNull())
|
||||
.addColumn('data', 'blob')
|
||||
.addColumn('created_at', 'text', (col) => col.notNull())
|
||||
.addColumn('created_by', 'text', (col) => col.notNull())
|
||||
.addColumn('server_created_at', 'text')
|
||||
.addColumn('retry_count', 'integer', (col) => col.defaultTo(0))
|
||||
.addColumn('status', 'text', (col) => col.defaultTo('pending'))
|
||||
.addColumn('number', 'integer')
|
||||
.addColumn('version', 'integer')
|
||||
.execute();
|
||||
},
|
||||
down: async (db) => {
|
||||
|
||||
@@ -27,14 +27,15 @@ export type SelectNodePath = Selectable<NodePathTable>;
|
||||
interface NodeTransactionTable {
|
||||
id: ColumnType<string, string, never>;
|
||||
node_id: ColumnType<string, string, never>;
|
||||
type: ColumnType<string, string, never>;
|
||||
node_type: ColumnType<string, string, never>;
|
||||
operation: ColumnType<string, string, never>;
|
||||
data: ColumnType<Uint8Array | null, Uint8Array | null, never>;
|
||||
created_at: ColumnType<string, string, never>;
|
||||
created_by: ColumnType<string, string, never>;
|
||||
server_created_at: ColumnType<string | null, string | null, string | null>;
|
||||
retry_count: ColumnType<number, number, number>;
|
||||
status: ColumnType<string, string, string>;
|
||||
number: ColumnType<bigint | null, bigint | null, bigint | null>;
|
||||
version: ColumnType<bigint | null, bigint | null, bigint | null>;
|
||||
}
|
||||
|
||||
export type SelectNodeTransaction = Selectable<NodeTransactionTable>;
|
||||
|
||||
@@ -1,79 +1,31 @@
|
||||
import { CollaborationAttributes, ServerCollaboration } from '@colanode/core';
|
||||
import { ServerCollaborationRevocation } from '@colanode/core';
|
||||
import { databaseService } from '@/main/data/database-service';
|
||||
import { decodeState, YDoc } from '@colanode/crdt';
|
||||
|
||||
class CollaborationService {
|
||||
public async applyServerCollaboration(
|
||||
public async applyServerCollaborationRevocation(
|
||||
userId: string,
|
||||
collaboration: ServerCollaboration
|
||||
) {
|
||||
if (collaboration.deletedAt) {
|
||||
return this.deleteCollaboration(userId, collaboration);
|
||||
}
|
||||
|
||||
return this.upsertCollaboration(userId, collaboration);
|
||||
}
|
||||
|
||||
private async deleteCollaboration(
|
||||
userId: string,
|
||||
collaboration: ServerCollaboration
|
||||
revocation: ServerCollaborationRevocation
|
||||
) {
|
||||
const workspaceDatabase =
|
||||
await databaseService.getWorkspaceDatabase(userId);
|
||||
|
||||
await workspaceDatabase
|
||||
.deleteFrom('collaborations')
|
||||
.where('user_id', '=', collaboration.userId)
|
||||
.where('node_id', '=', collaboration.nodeId)
|
||||
.execute();
|
||||
}
|
||||
await workspaceDatabase.transaction().execute(async (tx) => {
|
||||
await tx
|
||||
.deleteFrom('nodes')
|
||||
.where('id', '=', revocation.nodeId)
|
||||
.execute();
|
||||
|
||||
private async upsertCollaboration(
|
||||
userId: string,
|
||||
collaboration: ServerCollaboration
|
||||
) {
|
||||
const workspaceDatabase =
|
||||
await databaseService.getWorkspaceDatabase(userId);
|
||||
await tx
|
||||
.deleteFrom('collaborations')
|
||||
.where('user_id', '=', userId)
|
||||
.where('node_id', '=', revocation.nodeId)
|
||||
.execute();
|
||||
|
||||
const existingCollaboration = await workspaceDatabase
|
||||
.selectFrom('collaborations')
|
||||
.selectAll()
|
||||
.where('user_id', '=', userId)
|
||||
.where('node_id', '=', collaboration.nodeId)
|
||||
.executeTakeFirst();
|
||||
|
||||
const ydoc = new YDoc();
|
||||
if (existingCollaboration) {
|
||||
ydoc.applyUpdate(existingCollaboration.state);
|
||||
}
|
||||
|
||||
const state = decodeState(collaboration.state);
|
||||
const number = BigInt(collaboration.number);
|
||||
|
||||
ydoc.applyUpdate(state);
|
||||
const attributes = ydoc.getAttributes<CollaborationAttributes>();
|
||||
const attributesJson = JSON.stringify(attributes);
|
||||
|
||||
await workspaceDatabase
|
||||
.insertInto('collaborations')
|
||||
.values({
|
||||
node_id: collaboration.nodeId,
|
||||
user_id: userId,
|
||||
attributes: attributesJson,
|
||||
state: state,
|
||||
created_at: collaboration.createdAt,
|
||||
number: number,
|
||||
updated_at: collaboration.updatedAt,
|
||||
})
|
||||
.onConflict((b) =>
|
||||
b.columns(['user_id', 'node_id']).doUpdateSet({
|
||||
attributes: attributesJson,
|
||||
state: state,
|
||||
number: number,
|
||||
updated_at: collaboration.updatedAt,
|
||||
})
|
||||
)
|
||||
.execute();
|
||||
await tx
|
||||
.deleteFrom('node_transactions')
|
||||
.where('node_id', '=', revocation.nodeId)
|
||||
.execute();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -130,7 +130,8 @@ class NodeService {
|
||||
.values({
|
||||
id: transactionId,
|
||||
node_id: inputItem.id,
|
||||
type: 'create',
|
||||
operation: 'create',
|
||||
node_type: inputItem.attributes.type,
|
||||
data: update,
|
||||
created_at: createdAt,
|
||||
created_by: context.userId,
|
||||
@@ -308,7 +309,8 @@ class NodeService {
|
||||
.values({
|
||||
id: transactionId,
|
||||
node_id: nodeId,
|
||||
type: 'update',
|
||||
node_type: node.type,
|
||||
operation: 'update',
|
||||
data: update,
|
||||
created_at: updatedAt,
|
||||
created_by: context.userId,
|
||||
@@ -392,7 +394,8 @@ class NodeService {
|
||||
.values({
|
||||
id: generateId(IdType.Transaction),
|
||||
node_id: nodeId,
|
||||
type: 'delete',
|
||||
node_type: node.type,
|
||||
operation: 'delete',
|
||||
data: null,
|
||||
created_at: new Date().toISOString(),
|
||||
created_by: context.userId,
|
||||
@@ -425,11 +428,11 @@ class NodeService {
|
||||
userId: string,
|
||||
transaction: ServerNodeTransaction
|
||||
) {
|
||||
if (transaction.type === 'create') {
|
||||
if (transaction.operation === 'create') {
|
||||
await this.applyServerCreateTransaction(userId, transaction);
|
||||
} else if (transaction.type === 'update') {
|
||||
} else if (transaction.operation === 'update') {
|
||||
await this.applyServerUpdateTransaction(userId, transaction);
|
||||
} else if (transaction.type === 'delete') {
|
||||
} else if (transaction.operation === 'delete') {
|
||||
await this.applyServerDeleteTransaction(userId, transaction);
|
||||
}
|
||||
}
|
||||
@@ -441,17 +444,17 @@ class NodeService {
|
||||
const workspaceDatabase =
|
||||
await databaseService.getWorkspaceDatabase(userId);
|
||||
|
||||
const number = BigInt(transaction.number);
|
||||
const version = BigInt(transaction.version);
|
||||
const existingTransaction = await workspaceDatabase
|
||||
.selectFrom('node_transactions')
|
||||
.select(['id', 'status', 'number', 'server_created_at'])
|
||||
.select(['id', 'status', 'version', 'server_created_at'])
|
||||
.where('id', '=', transaction.id)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (existingTransaction) {
|
||||
if (
|
||||
existingTransaction.status === 'synced' &&
|
||||
existingTransaction.number === number &&
|
||||
existingTransaction.version === version &&
|
||||
existingTransaction.server_created_at === transaction.serverCreatedAt
|
||||
) {
|
||||
return;
|
||||
@@ -461,7 +464,7 @@ class NodeService {
|
||||
.updateTable('node_transactions')
|
||||
.set({
|
||||
status: 'synced',
|
||||
number,
|
||||
version,
|
||||
server_created_at: transaction.serverCreatedAt,
|
||||
})
|
||||
.where('id', '=', transaction.id)
|
||||
@@ -483,13 +486,14 @@ class NodeService {
|
||||
.values({
|
||||
id: transaction.id,
|
||||
node_id: transaction.nodeId,
|
||||
type: 'create',
|
||||
node_type: transaction.nodeType,
|
||||
operation: 'create',
|
||||
data: decodeState(transaction.data),
|
||||
created_at: transaction.createdAt,
|
||||
created_by: transaction.createdBy,
|
||||
retry_count: 0,
|
||||
status: 'synced',
|
||||
number,
|
||||
version,
|
||||
server_created_at: transaction.serverCreatedAt,
|
||||
})
|
||||
.execute();
|
||||
@@ -525,17 +529,17 @@ class NodeService {
|
||||
const workspaceDatabase =
|
||||
await databaseService.getWorkspaceDatabase(userId);
|
||||
|
||||
const number = BigInt(transaction.number);
|
||||
const version = BigInt(transaction.version);
|
||||
const existingTransaction = await workspaceDatabase
|
||||
.selectFrom('node_transactions')
|
||||
.select(['id', 'status', 'number', 'server_created_at'])
|
||||
.select(['id', 'status', 'version', 'server_created_at'])
|
||||
.where('id', '=', transaction.id)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (existingTransaction) {
|
||||
if (
|
||||
existingTransaction.status === 'synced' &&
|
||||
existingTransaction.number === number &&
|
||||
existingTransaction.version === version &&
|
||||
existingTransaction.server_created_at === transaction.serverCreatedAt
|
||||
) {
|
||||
return;
|
||||
@@ -545,7 +549,7 @@ class NodeService {
|
||||
.updateTable('node_transactions')
|
||||
.set({
|
||||
status: 'synced',
|
||||
number,
|
||||
version,
|
||||
server_created_at: transaction.serverCreatedAt,
|
||||
})
|
||||
.where('id', '=', transaction.id)
|
||||
@@ -580,13 +584,14 @@ class NodeService {
|
||||
.values({
|
||||
id: transaction.id,
|
||||
node_id: transaction.nodeId,
|
||||
type: 'update',
|
||||
node_type: transaction.nodeType,
|
||||
operation: 'update',
|
||||
data: decodeState(transaction.data),
|
||||
created_at: transaction.createdAt,
|
||||
created_by: transaction.createdBy,
|
||||
retry_count: 0,
|
||||
status: 'synced',
|
||||
number,
|
||||
version,
|
||||
server_created_at: transaction.serverCreatedAt,
|
||||
})
|
||||
.execute();
|
||||
|
||||
@@ -49,8 +49,8 @@ export class SocketConnection {
|
||||
const message: Message = JSON.parse(data);
|
||||
if (message.type === 'node_transactions_batch') {
|
||||
syncService.syncServerTransactions(message);
|
||||
} else if (message.type === 'collaborations_batch') {
|
||||
syncService.syncServerCollaborations(message);
|
||||
} else if (message.type === 'collaboration_revocations_batch') {
|
||||
syncService.syncServerRevocations(message);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -4,8 +4,8 @@ import { eventBus } from '@/shared/lib/event-bus';
|
||||
import { httpClient } from '@/shared/lib/http-client';
|
||||
import { serverService } from '@/main/services/server-service';
|
||||
import {
|
||||
CollaborationsBatchMessage,
|
||||
FetchCollaborationsMessage,
|
||||
CollaborationRevocationsBatchMessage,
|
||||
FetchCollaborationRevocationsMessage,
|
||||
FetchNodeTransactionsMessage,
|
||||
LocalNodeTransaction,
|
||||
NodeTransactionsBatchMessage,
|
||||
@@ -26,7 +26,7 @@ class SyncService {
|
||||
private readonly localSyncStates: Map<string, WorkspaceSyncState> = new Map();
|
||||
|
||||
private readonly syncingTransactions: Set<string> = new Set();
|
||||
private readonly syncingCollaborations: Set<string> = new Set();
|
||||
private readonly syncingRevocations: Set<string> = new Set();
|
||||
|
||||
constructor() {
|
||||
eventBus.subscribe((event) => {
|
||||
@@ -49,7 +49,7 @@ class SyncService {
|
||||
for (const workspace of workspaces) {
|
||||
this.syncLocalTransactions(workspace.user_id);
|
||||
this.requireNodeTransactions(workspace.user_id);
|
||||
this.requireCollaborations(workspace.user_id);
|
||||
this.requireCollaborationRevocations(workspace.user_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ class SyncService {
|
||||
try {
|
||||
for (const transaction of message.transactions) {
|
||||
await nodeService.applyServerTransaction(message.userId, transaction);
|
||||
cursor = BigInt(transaction.number);
|
||||
cursor = BigInt(transaction.version);
|
||||
}
|
||||
|
||||
if (cursor) {
|
||||
@@ -112,33 +112,35 @@ class SyncService {
|
||||
}
|
||||
}
|
||||
|
||||
public async syncServerCollaborations(message: CollaborationsBatchMessage) {
|
||||
if (this.syncingCollaborations.has(message.userId)) {
|
||||
public async syncServerRevocations(
|
||||
message: CollaborationRevocationsBatchMessage
|
||||
) {
|
||||
if (this.syncingRevocations.has(message.userId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.syncingCollaborations.add(message.userId);
|
||||
this.syncingRevocations.add(message.userId);
|
||||
let cursor: bigint | null = null;
|
||||
try {
|
||||
for (const collaboration of message.collaborations) {
|
||||
await collaborationService.applyServerCollaboration(
|
||||
for (const revocation of message.revocations) {
|
||||
await collaborationService.applyServerCollaborationRevocation(
|
||||
message.userId,
|
||||
collaboration
|
||||
revocation
|
||||
);
|
||||
cursor = BigInt(collaboration.number);
|
||||
cursor = BigInt(revocation.version);
|
||||
}
|
||||
|
||||
if (cursor) {
|
||||
this.updateNodeCollaborationCursor(message.userId, cursor);
|
||||
this.updateCollaborationRevocationCursor(message.userId, cursor);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
error,
|
||||
`Error syncing server collaborations for user ${message.userId}`
|
||||
`Error syncing server revocations for user ${message.userId}`
|
||||
);
|
||||
} finally {
|
||||
this.syncingCollaborations.delete(message.userId);
|
||||
this.requireCollaborations(message.userId);
|
||||
this.syncingRevocations.delete(message.userId);
|
||||
this.requireCollaborationRevocations(message.userId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -232,7 +234,7 @@ class SyncService {
|
||||
'w.user_id',
|
||||
'w.workspace_id',
|
||||
'w.account_id',
|
||||
'wc.node_transactions',
|
||||
'wc.transactions',
|
||||
])
|
||||
.where('w.user_id', '=', userId)
|
||||
.executeTakeFirst();
|
||||
@@ -245,22 +247,17 @@ class SyncService {
|
||||
type: 'fetch_node_transactions',
|
||||
userId: workspaceWithCursor.user_id,
|
||||
workspaceId: workspaceWithCursor.workspace_id,
|
||||
cursor: workspaceWithCursor.node_transactions?.toString() ?? null,
|
||||
cursor: workspaceWithCursor.transactions?.toString() ?? '0',
|
||||
};
|
||||
|
||||
socketService.sendMessage(workspaceWithCursor.account_id, message);
|
||||
}
|
||||
|
||||
private async requireCollaborations(userId: string) {
|
||||
private async requireCollaborationRevocations(userId: string) {
|
||||
const workspaceWithCursor = await databaseService.appDatabase
|
||||
.selectFrom('workspaces as w')
|
||||
.leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id')
|
||||
.select([
|
||||
'w.user_id',
|
||||
'w.workspace_id',
|
||||
'w.account_id',
|
||||
'wc.collaborations',
|
||||
])
|
||||
.select(['w.user_id', 'w.workspace_id', 'w.account_id', 'wc.revocations'])
|
||||
.where('w.user_id', '=', userId)
|
||||
.executeTakeFirst();
|
||||
|
||||
@@ -268,11 +265,11 @@ class SyncService {
|
||||
return;
|
||||
}
|
||||
|
||||
const message: FetchCollaborationsMessage = {
|
||||
type: 'fetch_collaborations',
|
||||
const message: FetchCollaborationRevocationsMessage = {
|
||||
type: 'fetch_collaboration_revocations',
|
||||
userId: workspaceWithCursor.user_id,
|
||||
workspaceId: workspaceWithCursor.workspace_id,
|
||||
cursor: workspaceWithCursor.collaborations?.toString() ?? null,
|
||||
cursor: workspaceWithCursor.revocations?.toString() ?? '0',
|
||||
};
|
||||
|
||||
socketService.sendMessage(workspaceWithCursor.account_id, message);
|
||||
@@ -283,31 +280,34 @@ class SyncService {
|
||||
.insertInto('workspace_cursors')
|
||||
.values({
|
||||
user_id: userId,
|
||||
node_transactions: cursor,
|
||||
collaborations: 0n,
|
||||
transactions: cursor,
|
||||
revocations: 0n,
|
||||
created_at: new Date().toISOString(),
|
||||
})
|
||||
.onConflict((eb) =>
|
||||
eb.column('user_id').doUpdateSet({
|
||||
node_transactions: cursor,
|
||||
transactions: cursor,
|
||||
updated_at: new Date().toISOString(),
|
||||
})
|
||||
)
|
||||
.execute();
|
||||
}
|
||||
|
||||
private async updateNodeCollaborationCursor(userId: string, cursor: bigint) {
|
||||
private async updateCollaborationRevocationCursor(
|
||||
userId: string,
|
||||
cursor: bigint
|
||||
) {
|
||||
await databaseService.appDatabase
|
||||
.insertInto('workspace_cursors')
|
||||
.values({
|
||||
user_id: userId,
|
||||
collaborations: cursor,
|
||||
node_transactions: 0n,
|
||||
revocations: cursor,
|
||||
transactions: 0n,
|
||||
created_at: new Date().toISOString(),
|
||||
})
|
||||
.onConflict((eb) =>
|
||||
eb.column('user_id').doUpdateSet({
|
||||
collaborations: cursor,
|
||||
revocations: cursor,
|
||||
updated_at: new Date().toISOString(),
|
||||
})
|
||||
)
|
||||
|
||||
@@ -140,33 +140,36 @@ export const mapWorkspace = (row: SelectWorkspace): Workspace => {
|
||||
export const mapTransaction = (
|
||||
row: SelectNodeTransaction
|
||||
): LocalNodeTransaction => {
|
||||
if (row.type === 'create' && row.data) {
|
||||
if (row.operation === 'create' && row.data) {
|
||||
return {
|
||||
id: row.id,
|
||||
nodeId: row.node_id,
|
||||
type: row.type,
|
||||
nodeType: row.node_type,
|
||||
operation: 'create',
|
||||
data: encodeState(row.data),
|
||||
createdAt: row.created_at,
|
||||
createdBy: row.created_by,
|
||||
};
|
||||
}
|
||||
|
||||
if (row.type === 'update' && row.data) {
|
||||
if (row.operation === 'update' && row.data) {
|
||||
return {
|
||||
id: row.id,
|
||||
nodeId: row.node_id,
|
||||
type: row.type,
|
||||
nodeType: row.node_type,
|
||||
operation: 'update',
|
||||
data: encodeState(row.data),
|
||||
createdAt: row.created_at,
|
||||
createdBy: row.created_by,
|
||||
};
|
||||
}
|
||||
|
||||
if (row.type === 'delete') {
|
||||
if (row.operation === 'delete') {
|
||||
return {
|
||||
id: row.id,
|
||||
nodeId: row.node_id,
|
||||
type: row.type,
|
||||
nodeType: row.node_type,
|
||||
operation: 'delete',
|
||||
createdAt: row.created_at,
|
||||
createdBy: row.created_by,
|
||||
};
|
||||
|
||||
@@ -125,7 +125,7 @@ const createNodesTable: Migration = {
|
||||
const createNodeTransactionsTable: Migration = {
|
||||
up: async (db) => {
|
||||
await sql`
|
||||
CREATE SEQUENCE IF NOT EXISTS node_transactions_number_seq
|
||||
CREATE SEQUENCE IF NOT EXISTS node_transactions_version_seq
|
||||
START WITH 1000000000
|
||||
INCREMENT BY 1
|
||||
NO MINVALUE
|
||||
@@ -138,19 +138,22 @@ const createNodeTransactionsTable: Migration = {
|
||||
.addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey())
|
||||
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('node_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('type', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('node_type', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('operation', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('data', 'bytea')
|
||||
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
|
||||
.addColumn('created_by', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('server_created_at', 'timestamptz', (col) => col.notNull())
|
||||
.addColumn('number', 'bigint', (col) =>
|
||||
col.notNull().defaultTo(sql`nextval('node_transactions_number_seq')`)
|
||||
.addColumn('version', 'bigint', (col) =>
|
||||
col.notNull().defaultTo(sql`nextval('node_transactions_version_seq')`)
|
||||
)
|
||||
.execute();
|
||||
},
|
||||
down: async (db) => {
|
||||
await db.schema.dropTable('node_transactions').execute();
|
||||
await sql`DROP SEQUENCE IF EXISTS node_transactions_number_seq`.execute(db);
|
||||
await sql`DROP SEQUENCE IF EXISTS node_transactions_version_seq`.execute(
|
||||
db
|
||||
);
|
||||
},
|
||||
};
|
||||
|
||||
@@ -229,9 +232,27 @@ const createNodePathsTable: Migration = {
|
||||
};
|
||||
|
||||
const createCollaborationsTable: Migration = {
|
||||
up: async (db) => {
|
||||
await db.schema
|
||||
.createTable('collaborations')
|
||||
.addColumn('user_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('node_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('roles', 'jsonb', (col) => col.notNull().defaultTo('{}'))
|
||||
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
|
||||
.addColumn('updated_at', 'timestamptz')
|
||||
.addPrimaryKeyConstraint('collaborations_pkey', ['user_id', 'node_id'])
|
||||
.execute();
|
||||
},
|
||||
down: async (db) => {
|
||||
await db.schema.dropTable('collaborations').execute();
|
||||
},
|
||||
};
|
||||
|
||||
const createCollaborationRevocationsTable: Migration = {
|
||||
up: async (db) => {
|
||||
await sql`
|
||||
CREATE SEQUENCE IF NOT EXISTS collaboration_number_seq
|
||||
CREATE SEQUENCE IF NOT EXISTS collaboration_revocations_version_seq
|
||||
START WITH 1000000000
|
||||
INCREMENT BY 1
|
||||
NO MINVALUE
|
||||
@@ -240,48 +261,47 @@ const createCollaborationsTable: Migration = {
|
||||
`.execute(db);
|
||||
|
||||
await db.schema
|
||||
.createTable('collaborations')
|
||||
.createTable('collaboration_revocations')
|
||||
.addColumn('user_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('node_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('type', 'varchar(30)', (col) =>
|
||||
col.generatedAlwaysAs(sql`(attributes->>'type')::VARCHAR(30)`).stored()
|
||||
)
|
||||
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('attributes', 'jsonb', (col) => col.notNull())
|
||||
.addColumn('state', 'bytea', (col) => col.notNull())
|
||||
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
|
||||
.addColumn('updated_at', 'timestamptz')
|
||||
.addColumn('deleted_at', 'timestamptz')
|
||||
.addColumn('number', 'bigint', (col) =>
|
||||
col.notNull().defaultTo(sql`nextval('collaboration_number_seq')`)
|
||||
.addColumn('version', 'bigint', (col) =>
|
||||
col
|
||||
.notNull()
|
||||
.defaultTo(sql`nextval('collaboration_revocations_version_seq')`)
|
||||
)
|
||||
.addPrimaryKeyConstraint('collaborations_pkey', ['user_id', 'node_id'])
|
||||
.addPrimaryKeyConstraint('collaboration_revocations_pkey', [
|
||||
'user_id',
|
||||
'node_id',
|
||||
])
|
||||
.execute();
|
||||
|
||||
// Add trigger to update number on each update
|
||||
await sql`
|
||||
CREATE OR REPLACE FUNCTION fn_update_collaboration_number() RETURNS TRIGGER AS $$
|
||||
CREATE OR REPLACE FUNCTION handle_collaboration_update() RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
NEW.number = nextval('collaboration_number_seq');
|
||||
RETURN NEW;
|
||||
IF NEW.roles = '{}' THEN
|
||||
DELETE FROM collaborations WHERE user_id = NEW.user_id AND node_id = NEW.node_id;
|
||||
INSERT INTO collaboration_revocations (user_id, node_id, workspace_id, created_at)
|
||||
VALUES (NEW.user_id, NEW.node_id, NEW.workspace_id, NOW());
|
||||
ELSE
|
||||
DELETE FROM collaboration_revocations WHERE user_id = NEW.user_id AND node_id = NEW.node_id;
|
||||
END IF;
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER trg_update_collaboration_number
|
||||
BEFORE UPDATE ON collaborations
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION fn_update_collaboration_number();
|
||||
CREATE TRIGGER after_collaboration_insert
|
||||
AFTER INSERT ON collaborations
|
||||
FOR EACH ROW EXECUTE FUNCTION handle_collaboration_update();
|
||||
|
||||
CREATE TRIGGER after_collaboration_update
|
||||
AFTER UPDATE ON collaborations
|
||||
FOR EACH ROW EXECUTE FUNCTION handle_collaboration_update();
|
||||
`.execute(db);
|
||||
},
|
||||
down: async (db) => {
|
||||
// Drop trigger and function first
|
||||
await sql`
|
||||
DROP TRIGGER IF EXISTS trg_update_collaboration_number ON collaborations;
|
||||
DROP FUNCTION IF EXISTS fn_update_collaboration_number();
|
||||
DROP SEQUENCE IF EXISTS collaboration_number_seq;
|
||||
`.execute(db);
|
||||
|
||||
await db.schema.dropTable('collaborations').execute();
|
||||
await db.schema.dropTable('collaboration_revocations').execute();
|
||||
},
|
||||
};
|
||||
|
||||
@@ -317,5 +337,7 @@ export const databaseMigrations: Record<string, Migration> = {
|
||||
'00006_create_node_transactions_table': createNodeTransactionsTable,
|
||||
'00007_create_node_paths_table': createNodePathsTable,
|
||||
'00008_create_collaborations_table': createCollaborationsTable,
|
||||
'00009_create_uploads_table': createUploadsTable,
|
||||
'00009_create_collaboration_revocations_table':
|
||||
createCollaborationRevocationsTable,
|
||||
'00010_create_uploads_table': createUploadsTable,
|
||||
};
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { NodeAttributes, WorkspaceRole } from '@colanode/core';
|
||||
import { NodeAttributes, NodeRole, WorkspaceRole } from '@colanode/core';
|
||||
import {
|
||||
ColumnType,
|
||||
Insertable,
|
||||
@@ -101,13 +101,14 @@ export type UpdateNode = Updateable<NodeTable>;
|
||||
interface NodeTransactionTable {
|
||||
id: ColumnType<string, string, never>;
|
||||
node_id: ColumnType<string, string, never>;
|
||||
node_type: ColumnType<string, string, never>;
|
||||
operation: ColumnType<string, string, never>;
|
||||
workspace_id: ColumnType<string, string, never>;
|
||||
type: ColumnType<string, string, never>;
|
||||
data: ColumnType<Uint8Array | null, Uint8Array | null, Uint8Array | null>;
|
||||
created_at: ColumnType<Date, Date, never>;
|
||||
created_by: ColumnType<string, string, never>;
|
||||
server_created_at: ColumnType<Date, Date, never>;
|
||||
number: ColumnType<bigint, never, never>;
|
||||
version: ColumnType<bigint, never, never>;
|
||||
}
|
||||
|
||||
export type SelectNodeTransaction = Selectable<NodeTransactionTable>;
|
||||
@@ -119,18 +120,30 @@ interface CollaborationTable {
|
||||
node_id: ColumnType<string, string, never>;
|
||||
type: ColumnType<string, never, never>;
|
||||
workspace_id: ColumnType<string, string, never>;
|
||||
attributes: JSONColumnType<any, string, string>;
|
||||
state: ColumnType<Uint8Array, Uint8Array, Uint8Array>;
|
||||
roles: JSONColumnType<Record<string, NodeRole>, string, string>;
|
||||
created_at: ColumnType<Date, Date, never>;
|
||||
updated_at: ColumnType<Date | null, Date | null, Date>;
|
||||
deleted_at: ColumnType<Date | null, Date | null, Date | null>;
|
||||
number: ColumnType<bigint, never, never>;
|
||||
}
|
||||
|
||||
export type SelectCollaboration = Selectable<CollaborationTable>;
|
||||
export type CreateCollaboration = Insertable<CollaborationTable>;
|
||||
export type UpdateCollaboration = Updateable<CollaborationTable>;
|
||||
|
||||
interface CollaborationRevocationTable {
|
||||
user_id: ColumnType<string, never, never>;
|
||||
node_id: ColumnType<string, never, never>;
|
||||
workspace_id: ColumnType<string, string, never>;
|
||||
created_at: ColumnType<Date, never, never>;
|
||||
version: ColumnType<bigint, never, never>;
|
||||
}
|
||||
|
||||
export type SelectCollaborationRevocation =
|
||||
Selectable<CollaborationRevocationTable>;
|
||||
export type CreateCollaborationRevocation =
|
||||
Insertable<CollaborationRevocationTable>;
|
||||
export type UpdateCollaborationRevocation =
|
||||
Updateable<CollaborationRevocationTable>;
|
||||
|
||||
interface NodePathTable {
|
||||
ancestor_id: ColumnType<string, string, never>;
|
||||
descendant_id: ColumnType<string, string, never>;
|
||||
@@ -163,6 +176,7 @@ export interface DatabaseSchema {
|
||||
nodes: NodeTable;
|
||||
node_transactions: NodeTransactionTable;
|
||||
collaborations: CollaborationTable;
|
||||
collaboration_revocations: CollaborationRevocationTable;
|
||||
node_paths: NodePathTable;
|
||||
uploads: UploadTable;
|
||||
}
|
||||
|
||||
@@ -1,77 +0,0 @@
|
||||
import { database } from '@/data/database';
|
||||
import { CreateCollaboration } from '@/data/schema';
|
||||
import { JobHandler } from '@/types/jobs';
|
||||
import { buildDefaultCollaboration } from '@/lib/collaborations';
|
||||
import { NodeType } from '@colanode/core';
|
||||
import { eventBus } from '@/lib/event-bus';
|
||||
|
||||
export type CreateCollaborationsInput = {
|
||||
type: 'create_collaborations';
|
||||
userId: string;
|
||||
nodeId: string;
|
||||
workspaceId: string;
|
||||
};
|
||||
|
||||
declare module '@/types/jobs' {
|
||||
interface JobMap {
|
||||
create_collaborations: {
|
||||
input: CreateCollaborationsInput;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export const createCollaborationsHandler: JobHandler<
|
||||
CreateCollaborationsInput
|
||||
> = async (input) => {
|
||||
const nodeRow = await database
|
||||
.selectFrom('nodes')
|
||||
.where('id', '=', input.nodeId)
|
||||
.select(['id', 'type'])
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!nodeRow) {
|
||||
return;
|
||||
}
|
||||
|
||||
const descendants = await database
|
||||
.selectFrom('node_paths')
|
||||
.where('ancestor_id', '=', input.nodeId)
|
||||
.selectAll()
|
||||
.execute();
|
||||
|
||||
if (descendants.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const collaborationsToCreate: CreateCollaboration[] = [];
|
||||
for (const descendant of descendants) {
|
||||
collaborationsToCreate.push(
|
||||
buildDefaultCollaboration(
|
||||
input.userId,
|
||||
descendant.descendant_id,
|
||||
nodeRow.type as NodeType,
|
||||
input.workspaceId
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
const createdCollaborations = await database
|
||||
.insertInto('collaborations')
|
||||
.returning(['node_id', 'user_id', 'workspace_id'])
|
||||
.values(collaborationsToCreate)
|
||||
.onConflict((b) =>
|
||||
b.columns(['node_id', 'user_id']).doUpdateSet({
|
||||
deleted_at: null,
|
||||
})
|
||||
)
|
||||
.execute();
|
||||
|
||||
for (const createdCollaboration of createdCollaborations) {
|
||||
eventBus.publish({
|
||||
type: 'collaboration_created',
|
||||
nodeId: createdCollaboration.node_id,
|
||||
userId: createdCollaboration.user_id,
|
||||
workspaceId: createdCollaboration.workspace_id,
|
||||
});
|
||||
}
|
||||
};
|
||||
@@ -1,102 +0,0 @@
|
||||
import { database } from '@/data/database';
|
||||
import { eventBus } from '@/lib/event-bus';
|
||||
import { JobHandler } from '@/types/jobs';
|
||||
import { extractNodeCollaborators, NodeAttributes } from '@colanode/core';
|
||||
|
||||
export type DeleteCollaborationsInput = {
|
||||
type: 'delete_collaborations';
|
||||
nodeId: string;
|
||||
userId: string;
|
||||
workspaceId: string;
|
||||
};
|
||||
|
||||
declare module '@/types/jobs' {
|
||||
interface JobMap {
|
||||
delete_collaborations: {
|
||||
input: DeleteCollaborationsInput;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export const deleteCollaborationsHandler: JobHandler<
|
||||
DeleteCollaborationsInput
|
||||
> = async (input) => {
|
||||
const updatedCollaboration = await database
|
||||
.updateTable('collaborations')
|
||||
.returning(['node_id', 'user_id'])
|
||||
.where('node_id', '=', input.nodeId)
|
||||
.where('user_id', '=', input.userId)
|
||||
.set({ deleted_at: new Date() })
|
||||
.executeTakeFirst();
|
||||
|
||||
if (updatedCollaboration) {
|
||||
eventBus.publish({
|
||||
type: 'collaboration_updated',
|
||||
nodeId: updatedCollaboration.node_id,
|
||||
userId: updatedCollaboration.user_id,
|
||||
workspaceId: input.workspaceId,
|
||||
});
|
||||
}
|
||||
|
||||
await checkChildCollaborations(input.nodeId, input.userId, input.workspaceId);
|
||||
};
|
||||
|
||||
const checkChildCollaborations = async (
|
||||
parentId: string,
|
||||
userId: string,
|
||||
workspaceId: string
|
||||
) => {
|
||||
let lastId = parentId;
|
||||
|
||||
const parentIdsToCheck: string[] = [];
|
||||
const nodeIdsToDelete: string[] = [];
|
||||
while (true) {
|
||||
const children = await database
|
||||
.selectFrom('nodes')
|
||||
.select(['id', 'type', 'attributes'])
|
||||
.where('parent_id', '=', parentId)
|
||||
.where('id', '>', lastId)
|
||||
.orderBy('id', 'asc')
|
||||
.limit(100)
|
||||
.execute();
|
||||
|
||||
for (const child of children) {
|
||||
const collaborators = extractNodeCollaborators(child.attributes);
|
||||
if (!collaborators[userId]) {
|
||||
nodeIdsToDelete.push(child.id);
|
||||
}
|
||||
|
||||
parentIdsToCheck.push(child.id);
|
||||
lastId = child.id;
|
||||
}
|
||||
|
||||
if (children.length < 100) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (nodeIdsToDelete.length > 0) {
|
||||
const updatedCollaborations = await database
|
||||
.updateTable('collaborations')
|
||||
.returning(['node_id', 'user_id'])
|
||||
.where('node_id', 'in', nodeIdsToDelete)
|
||||
.where('user_id', '=', userId)
|
||||
.set({ deleted_at: new Date() })
|
||||
.execute();
|
||||
|
||||
for (const updatedCollaboration of updatedCollaborations) {
|
||||
eventBus.publish({
|
||||
type: 'collaboration_updated',
|
||||
nodeId: updatedCollaboration.node_id,
|
||||
userId: updatedCollaboration.user_id,
|
||||
workspaceId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (parentIdsToCheck.length > 0) {
|
||||
for (const parentId of parentIdsToCheck) {
|
||||
await checkChildCollaborations(parentId, userId, workspaceId);
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -3,8 +3,6 @@ import { JobMap } from '@/types/jobs';
|
||||
|
||||
import { sendEmailHandler } from '@/jobs/send-email';
|
||||
import { cleanWorkspaceDataHandler } from '@/jobs/clean-workspace-data';
|
||||
import { createCollaborationsHandler } from '@/jobs/create-collaborations';
|
||||
import { deleteCollaborationsHandler } from '@/jobs/delete-collaborations';
|
||||
|
||||
type JobHandlerMap = {
|
||||
[K in keyof JobMap]: JobHandler<JobMap[K]['input']>;
|
||||
@@ -13,6 +11,4 @@ type JobHandlerMap = {
|
||||
export const jobHandlerMap: JobHandlerMap = {
|
||||
send_email: sendEmailHandler,
|
||||
clean_workspace_data: cleanWorkspaceDataHandler,
|
||||
create_collaborations: createCollaborationsHandler,
|
||||
delete_collaborations: deleteCollaborationsHandler,
|
||||
};
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
import { CreateCollaboration } from '@/data/schema';
|
||||
import { CollaborationAttributes, NodeType, registry } from '@colanode/core';
|
||||
import { YDoc } from '@colanode/crdt';
|
||||
|
||||
export const buildDefaultCollaboration = (
|
||||
userId: string,
|
||||
nodeId: string,
|
||||
type: NodeType,
|
||||
workspaceId: string
|
||||
): CreateCollaboration => {
|
||||
const model = registry.getCollaborationModel(type);
|
||||
|
||||
const attributes: CollaborationAttributes = {
|
||||
type,
|
||||
};
|
||||
|
||||
const ydoc = new YDoc();
|
||||
ydoc.updateAttributes(model.schema, attributes);
|
||||
|
||||
return {
|
||||
user_id: userId,
|
||||
node_id: nodeId,
|
||||
workspace_id: workspaceId,
|
||||
attributes: JSON.stringify(ydoc.getAttributes()),
|
||||
state: ydoc.getState(),
|
||||
created_at: new Date(),
|
||||
};
|
||||
};
|
||||
@@ -1,13 +1,13 @@
|
||||
import { database } from '@/data/database';
|
||||
import {
|
||||
SelectCollaboration,
|
||||
SelectCollaborationRevocation,
|
||||
SelectNode,
|
||||
SelectNodeTransaction,
|
||||
} from '@/data/schema';
|
||||
import { NodeCollaborator } from '@/types/nodes';
|
||||
import {
|
||||
NodeOutput,
|
||||
ServerCollaboration,
|
||||
ServerCollaborationRevocation,
|
||||
ServerNodeTransaction,
|
||||
} from '@colanode/core';
|
||||
import {
|
||||
@@ -51,63 +51,62 @@ export const mapNode = (node: SelectNode): Node => {
|
||||
export const mapNodeTransaction = (
|
||||
transaction: SelectNodeTransaction
|
||||
): ServerNodeTransaction => {
|
||||
if (transaction.type === 'create' && transaction.data) {
|
||||
if (transaction.operation === 'create' && transaction.data) {
|
||||
return {
|
||||
id: transaction.id,
|
||||
type: 'create',
|
||||
operation: 'create',
|
||||
nodeId: transaction.node_id,
|
||||
nodeType: transaction.node_type,
|
||||
workspaceId: transaction.workspace_id,
|
||||
data: encodeState(transaction.data),
|
||||
createdAt: transaction.created_at.toISOString(),
|
||||
createdBy: transaction.created_by,
|
||||
serverCreatedAt: transaction.server_created_at.toISOString(),
|
||||
number: transaction.number.toString(),
|
||||
version: transaction.version.toString(),
|
||||
};
|
||||
}
|
||||
|
||||
if (transaction.type === 'update' && transaction.data) {
|
||||
if (transaction.operation === 'update' && transaction.data) {
|
||||
return {
|
||||
id: transaction.id,
|
||||
type: 'update',
|
||||
operation: 'update',
|
||||
nodeId: transaction.node_id,
|
||||
nodeType: transaction.node_type,
|
||||
workspaceId: transaction.workspace_id,
|
||||
data: encodeState(transaction.data),
|
||||
createdAt: transaction.created_at.toISOString(),
|
||||
createdBy: transaction.created_by,
|
||||
serverCreatedAt: transaction.server_created_at.toISOString(),
|
||||
number: transaction.number.toString(),
|
||||
version: transaction.version.toString(),
|
||||
};
|
||||
}
|
||||
|
||||
if (transaction.type === 'delete') {
|
||||
if (transaction.operation === 'delete') {
|
||||
return {
|
||||
id: transaction.id,
|
||||
type: 'delete',
|
||||
operation: 'delete',
|
||||
nodeId: transaction.node_id,
|
||||
nodeType: transaction.node_type,
|
||||
workspaceId: transaction.workspace_id,
|
||||
createdAt: transaction.created_at.toISOString(),
|
||||
createdBy: transaction.created_by,
|
||||
serverCreatedAt: transaction.server_created_at.toISOString(),
|
||||
number: transaction.number.toString(),
|
||||
version: transaction.version.toString(),
|
||||
};
|
||||
}
|
||||
|
||||
throw new Error('Unknown transaction type');
|
||||
};
|
||||
|
||||
export const mapCollaboration = (
|
||||
collaboration: SelectCollaboration
|
||||
): ServerCollaboration => {
|
||||
export const mapCollaborationRevocation = (
|
||||
revocation: SelectCollaborationRevocation
|
||||
): ServerCollaborationRevocation => {
|
||||
return {
|
||||
userId: collaboration.user_id,
|
||||
nodeId: collaboration.node_id,
|
||||
type: collaboration.type,
|
||||
workspaceId: collaboration.workspace_id,
|
||||
state: encodeState(collaboration.state),
|
||||
createdAt: collaboration.created_at.toISOString(),
|
||||
updatedAt: collaboration.updated_at?.toISOString() ?? null,
|
||||
deletedAt: collaboration.deleted_at?.toISOString() ?? null,
|
||||
number: collaboration.number.toString(),
|
||||
userId: revocation.user_id,
|
||||
nodeId: revocation.node_id,
|
||||
workspaceId: revocation.workspace_id,
|
||||
createdAt: revocation.created_at.toISOString(),
|
||||
version: revocation.version.toString(),
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
@@ -71,11 +71,11 @@ const handleLocalNodeTransaction = async (
|
||||
workspaceUser: SelectWorkspaceUser,
|
||||
transaction: LocalNodeTransaction
|
||||
): Promise<SyncNodeTransactionStatus> => {
|
||||
if (transaction.type === 'create') {
|
||||
if (transaction.operation === 'create') {
|
||||
return await handleCreateNodeTransaction(workspaceUser, transaction);
|
||||
} else if (transaction.type === 'update') {
|
||||
} else if (transaction.operation === 'update') {
|
||||
return await handleUpdateNodeTransaction(workspaceUser, transaction);
|
||||
} else if (transaction.type === 'delete') {
|
||||
} else if (transaction.operation === 'delete') {
|
||||
return await handleDeleteNodeTransaction(workspaceUser, transaction);
|
||||
} else {
|
||||
return 'error';
|
||||
|
||||
@@ -2,9 +2,9 @@ import {
|
||||
extractNodeCollaborators,
|
||||
generateId,
|
||||
IdType,
|
||||
Node,
|
||||
NodeAttributes,
|
||||
NodeMutationContext,
|
||||
NodeRole,
|
||||
registry,
|
||||
} from '@colanode/core';
|
||||
import { decodeState, YDoc } from '@colanode/crdt';
|
||||
@@ -12,11 +12,11 @@ import {
|
||||
CreateCollaboration,
|
||||
CreateNode,
|
||||
CreateNodeTransaction,
|
||||
SelectCollaboration,
|
||||
DatabaseSchema,
|
||||
SelectWorkspaceUser,
|
||||
} from '@/data/schema';
|
||||
import { cloneDeep, difference } from 'lodash-es';
|
||||
import { fetchWorkspaceUsers, mapNode } from '@/lib/nodes';
|
||||
import { cloneDeep } from 'lodash-es';
|
||||
import { mapNode } from '@/lib/nodes';
|
||||
import { database } from '@/data/database';
|
||||
import { fetchNodeAncestors } from '@/lib/nodes';
|
||||
import {
|
||||
@@ -31,10 +31,9 @@ import {
|
||||
UpdateNodeInput,
|
||||
UpdateNodeOutput,
|
||||
} from '@/types/nodes';
|
||||
import { buildDefaultCollaboration } from '@/lib/collaborations';
|
||||
import { eventBus } from '@/lib/event-bus';
|
||||
import { logService } from '@/services/log';
|
||||
import { jobService } from '@/services/job-service';
|
||||
import { sql, Transaction } from 'kysely';
|
||||
|
||||
const UPDATE_RETRIES_LIMIT = 10;
|
||||
|
||||
@@ -44,8 +43,9 @@ type UpdateResult<T> = {
|
||||
};
|
||||
|
||||
type CollaboratorChangeResult = {
|
||||
removedCollaborators: string[];
|
||||
addedCollaborators: string[];
|
||||
addedCollaborators: Record<string, NodeRole>;
|
||||
updatedCollaborators: Record<string, NodeRole>;
|
||||
removedCollaborators: Record<string, NodeRole>;
|
||||
};
|
||||
|
||||
class NodeService {
|
||||
@@ -57,7 +57,8 @@ class NodeService {
|
||||
const model = registry.getNodeModel(input.attributes.type);
|
||||
const ydoc = new YDoc();
|
||||
const update = ydoc.updateAttributes(model.schema, input.attributes);
|
||||
const attributesJson = JSON.stringify(ydoc.getAttributes<NodeAttributes>());
|
||||
const attributes = ydoc.getAttributes<NodeAttributes>();
|
||||
const attributesJson = JSON.stringify(attributes);
|
||||
|
||||
const date = new Date();
|
||||
const transactionId = generateId(IdType.Transaction);
|
||||
@@ -74,27 +75,21 @@ class NodeService {
|
||||
const createTransaction: CreateNodeTransaction = {
|
||||
id: transactionId,
|
||||
node_id: input.nodeId,
|
||||
node_type: input.attributes.type,
|
||||
workspace_id: input.workspaceId,
|
||||
type: 'create',
|
||||
operation: 'create',
|
||||
data: update,
|
||||
created_at: date,
|
||||
created_by: input.userId,
|
||||
server_created_at: date,
|
||||
};
|
||||
|
||||
const createCollaborations = await this.buildCollaborations(
|
||||
input.nodeId,
|
||||
input.workspaceId,
|
||||
input.attributes,
|
||||
input.ancestors
|
||||
);
|
||||
|
||||
try {
|
||||
const { createdNode, createdTransaction, createdCollaborations } =
|
||||
const { createdNode, createdTransaction } =
|
||||
await this.applyDatabaseCreateTransaction(
|
||||
attributes,
|
||||
createNode,
|
||||
createTransaction,
|
||||
createCollaborations
|
||||
createTransaction
|
||||
);
|
||||
|
||||
eventBus.publish({
|
||||
@@ -104,19 +99,9 @@ class NodeService {
|
||||
workspaceId: input.workspaceId,
|
||||
});
|
||||
|
||||
for (const collaboration of createdCollaborations) {
|
||||
eventBus.publish({
|
||||
type: 'collaboration_created',
|
||||
userId: collaboration.user_id,
|
||||
nodeId: collaboration.node_id,
|
||||
workspaceId: collaboration.workspace_id,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
node: createdNode,
|
||||
transaction: createdTransaction,
|
||||
createdCollaborations: createdCollaborations,
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.error(error, 'Failed to create node transaction');
|
||||
@@ -187,12 +172,10 @@ class NodeService {
|
||||
const date = new Date();
|
||||
const transactionId = generateId(IdType.Transaction);
|
||||
|
||||
const { addedCollaborators, removedCollaborators } =
|
||||
this.checkCollaboratorChanges(
|
||||
node,
|
||||
ancestors.filter((a) => a.id !== input.nodeId),
|
||||
attributes
|
||||
);
|
||||
const collaboratorChanges = this.checkCollaboratorChanges(
|
||||
node.attributes,
|
||||
attributes
|
||||
);
|
||||
|
||||
try {
|
||||
const { updatedNode, createdTransaction } = await database
|
||||
@@ -221,8 +204,9 @@ class NodeService {
|
||||
.values({
|
||||
id: transactionId,
|
||||
node_id: input.nodeId,
|
||||
node_type: node.type,
|
||||
workspace_id: input.workspaceId,
|
||||
type: 'update',
|
||||
operation: 'update',
|
||||
data: update,
|
||||
created_at: date,
|
||||
created_by: input.userId,
|
||||
@@ -234,6 +218,12 @@ class NodeService {
|
||||
throw new Error('Failed to create transaction');
|
||||
}
|
||||
|
||||
await this.applyCollaborationUpdates(
|
||||
trx,
|
||||
input.nodeId,
|
||||
collaboratorChanges
|
||||
);
|
||||
|
||||
return {
|
||||
updatedNode,
|
||||
createdTransaction,
|
||||
@@ -247,24 +237,6 @@ class NodeService {
|
||||
workspaceId: input.workspaceId,
|
||||
});
|
||||
|
||||
for (const addedCollaborator of addedCollaborators) {
|
||||
jobService.addJob({
|
||||
type: 'create_collaborations',
|
||||
nodeId: input.nodeId,
|
||||
userId: addedCollaborator,
|
||||
workspaceId: input.workspaceId,
|
||||
});
|
||||
}
|
||||
|
||||
for (const removedCollaborator of removedCollaborators) {
|
||||
jobService.addJob({
|
||||
type: 'delete_collaborations',
|
||||
nodeId: input.nodeId,
|
||||
userId: removedCollaborator,
|
||||
workspaceId: input.workspaceId,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
type: 'success',
|
||||
output: {
|
||||
@@ -319,8 +291,9 @@ class NodeService {
|
||||
const createTransaction: CreateNodeTransaction = {
|
||||
id: input.id,
|
||||
node_id: input.nodeId,
|
||||
node_type: attributes.type,
|
||||
workspace_id: context.workspaceId,
|
||||
type: 'create',
|
||||
operation: 'create',
|
||||
data:
|
||||
typeof input.data === 'string' ? decodeState(input.data) : input.data,
|
||||
created_at: input.createdAt,
|
||||
@@ -328,20 +301,12 @@ class NodeService {
|
||||
server_created_at: new Date(),
|
||||
};
|
||||
|
||||
const createCollaborations: CreateCollaboration[] =
|
||||
await this.buildCollaborations(
|
||||
input.nodeId,
|
||||
context.workspaceId,
|
||||
attributes,
|
||||
ancestors
|
||||
);
|
||||
|
||||
try {
|
||||
const { createdNode, createdTransaction, createdCollaborations } =
|
||||
const { createdNode, createdTransaction } =
|
||||
await this.applyDatabaseCreateTransaction(
|
||||
attributes,
|
||||
createNode,
|
||||
createTransaction,
|
||||
createCollaborations
|
||||
createTransaction
|
||||
);
|
||||
|
||||
eventBus.publish({
|
||||
@@ -351,19 +316,9 @@ class NodeService {
|
||||
workspaceId: context.workspaceId,
|
||||
});
|
||||
|
||||
for (const collaboration of createdCollaborations) {
|
||||
eventBus.publish({
|
||||
type: 'collaboration_created',
|
||||
userId: collaboration.user_id,
|
||||
nodeId: collaboration.node_id,
|
||||
workspaceId: collaboration.workspace_id,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
node: createdNode,
|
||||
transaction: createdTransaction,
|
||||
collaborations: createdCollaborations,
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.error(error, 'Failed to apply node create transaction');
|
||||
@@ -412,7 +367,7 @@ class NodeService {
|
||||
.selectFrom('node_transactions')
|
||||
.selectAll()
|
||||
.where('node_id', '=', input.nodeId)
|
||||
.orderBy('number', 'asc')
|
||||
.orderBy('version', 'asc')
|
||||
.execute();
|
||||
|
||||
const ydoc = new YDoc();
|
||||
@@ -445,12 +400,10 @@ class NodeService {
|
||||
return { type: 'error', output: null };
|
||||
}
|
||||
|
||||
const { addedCollaborators, removedCollaborators } =
|
||||
this.checkCollaboratorChanges(
|
||||
node,
|
||||
ancestors.filter((a) => a.id !== input.nodeId),
|
||||
attributes
|
||||
);
|
||||
const collaboratorChanges = this.checkCollaboratorChanges(
|
||||
node.attributes,
|
||||
attributes
|
||||
);
|
||||
|
||||
try {
|
||||
const { updatedNode, createdTransaction } = await database
|
||||
@@ -480,7 +433,8 @@ class NodeService {
|
||||
id: input.id,
|
||||
node_id: input.nodeId,
|
||||
workspace_id: context.workspaceId,
|
||||
type: 'update',
|
||||
node_type: node.type,
|
||||
operation: 'update',
|
||||
data:
|
||||
typeof input.data === 'string'
|
||||
? decodeState(input.data)
|
||||
@@ -495,6 +449,12 @@ class NodeService {
|
||||
throw new Error('Failed to create transaction');
|
||||
}
|
||||
|
||||
await this.applyCollaborationUpdates(
|
||||
trx,
|
||||
input.nodeId,
|
||||
collaboratorChanges
|
||||
);
|
||||
|
||||
return {
|
||||
updatedNode,
|
||||
createdTransaction,
|
||||
@@ -508,24 +468,6 @@ class NodeService {
|
||||
workspaceId: context.workspaceId,
|
||||
});
|
||||
|
||||
for (const addedCollaborator of addedCollaborators) {
|
||||
jobService.addJob({
|
||||
type: 'create_collaborations',
|
||||
nodeId: input.nodeId,
|
||||
userId: addedCollaborator,
|
||||
workspaceId: context.workspaceId,
|
||||
});
|
||||
}
|
||||
|
||||
for (const removedCollaborator of removedCollaborators) {
|
||||
jobService.addJob({
|
||||
type: 'delete_collaborations',
|
||||
nodeId: input.nodeId,
|
||||
userId: removedCollaborator,
|
||||
workspaceId: context.workspaceId,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
type: 'success',
|
||||
output: {
|
||||
@@ -534,6 +476,7 @@ class NodeService {
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
console.log('error', error);
|
||||
return { type: 'retry', output: null };
|
||||
}
|
||||
}
|
||||
@@ -562,8 +505,9 @@ class NodeService {
|
||||
return null;
|
||||
}
|
||||
|
||||
const { deletedNode, createdTransaction, updatedCollaborations } =
|
||||
await database.transaction().execute(async (trx) => {
|
||||
const { deletedNode, createdTransaction } = await database
|
||||
.transaction()
|
||||
.execute(async (trx) => {
|
||||
const deletedNode = await trx
|
||||
.deleteFrom('nodes')
|
||||
.returningAll()
|
||||
@@ -586,7 +530,8 @@ class NodeService {
|
||||
id: input.id,
|
||||
node_id: input.nodeId,
|
||||
workspace_id: workspaceUser.workspace_id,
|
||||
type: 'delete',
|
||||
node_type: node.type,
|
||||
operation: 'delete',
|
||||
created_at: input.createdAt,
|
||||
created_by: workspaceUser.id,
|
||||
server_created_at: new Date(),
|
||||
@@ -597,19 +542,9 @@ class NodeService {
|
||||
throw new Error('Failed to create transaction');
|
||||
}
|
||||
|
||||
const updatedCollaborations = await trx
|
||||
.updateTable('collaborations')
|
||||
.returningAll()
|
||||
.set({
|
||||
deleted_at: input.createdAt,
|
||||
})
|
||||
.where('node_id', '=', input.nodeId)
|
||||
.execute();
|
||||
|
||||
return {
|
||||
deletedNode,
|
||||
createdTransaction,
|
||||
updatedCollaborations,
|
||||
};
|
||||
});
|
||||
|
||||
@@ -620,27 +555,27 @@ class NodeService {
|
||||
workspaceId: workspaceUser.workspace_id,
|
||||
});
|
||||
|
||||
for (const collaboration of updatedCollaborations) {
|
||||
eventBus.publish({
|
||||
type: 'collaboration_updated',
|
||||
userId: collaboration.user_id,
|
||||
nodeId: collaboration.node_id,
|
||||
workspaceId: collaboration.workspace_id,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
node: deletedNode,
|
||||
transaction: createdTransaction,
|
||||
updatedCollaborations,
|
||||
};
|
||||
}
|
||||
|
||||
private async applyDatabaseCreateTransaction(
|
||||
attributes: NodeAttributes,
|
||||
node: CreateNode,
|
||||
transaction: CreateNodeTransaction,
|
||||
collaborations: CreateCollaboration[]
|
||||
transaction: CreateNodeTransaction
|
||||
) {
|
||||
const collaborationsToCreate: CreateCollaboration[] = Object.entries(
|
||||
extractNodeCollaborators(attributes)
|
||||
).map(([userId, role]) => ({
|
||||
user_id: userId,
|
||||
node_id: node.id,
|
||||
workspace_id: node.workspace_id,
|
||||
roles: JSON.stringify({ [node.id]: role }),
|
||||
created_at: new Date(),
|
||||
}));
|
||||
|
||||
return await database.transaction().execute(async (trx) => {
|
||||
const createdNode = await trx
|
||||
.insertInto('nodes')
|
||||
@@ -662,128 +597,138 @@ class NodeService {
|
||||
throw new Error('Failed to create transaction');
|
||||
}
|
||||
|
||||
let createdCollaborations: SelectCollaboration[] = [];
|
||||
if (collaborations.length > 0) {
|
||||
createdCollaborations = await trx
|
||||
if (collaborationsToCreate.length > 0) {
|
||||
const createdCollaborations = await trx
|
||||
.insertInto('collaborations')
|
||||
.returningAll()
|
||||
.values(collaborations)
|
||||
.values(collaborationsToCreate)
|
||||
.execute();
|
||||
|
||||
if (createdCollaborations.length !== collaborations.length) {
|
||||
if (createdCollaborations.length !== collaborationsToCreate.length) {
|
||||
throw new Error('Failed to create collaborations');
|
||||
}
|
||||
}
|
||||
|
||||
return { createdNode, createdTransaction, createdCollaborations };
|
||||
await sql`
|
||||
INSERT INTO collaborations (user_id, node_id, workspace_id, roles, created_at)
|
||||
SELECT
|
||||
c.user_id,
|
||||
${node.id} as node_id,
|
||||
${node.workspace_id} as workspace_id,
|
||||
c.roles,
|
||||
${new Date()} as created_at
|
||||
FROM collaborations as c
|
||||
WHERE c.node_id = ${attributes.parentId}
|
||||
ON CONFLICT (user_id, node_id) DO UPDATE
|
||||
SET
|
||||
roles = collaborations.roles || EXCLUDED.roles,
|
||||
updated_at = NOW()
|
||||
`.execute(trx);
|
||||
|
||||
return { createdNode, createdTransaction };
|
||||
});
|
||||
}
|
||||
|
||||
private async buildCollaborations(
|
||||
private async applyCollaborationUpdates(
|
||||
transaction: Transaction<DatabaseSchema>,
|
||||
nodeId: string,
|
||||
workspaceId: string,
|
||||
attributes: NodeAttributes,
|
||||
ancestors: Node[]
|
||||
updateResult: CollaboratorChangeResult
|
||||
) {
|
||||
if (attributes.type === 'user') {
|
||||
return this.buildUserCollaborations(nodeId, workspaceId);
|
||||
for (const [userId, role] of Object.entries(
|
||||
updateResult.addedCollaborators
|
||||
)) {
|
||||
const roles = JSON.stringify({ [nodeId]: role });
|
||||
await sql`
|
||||
INSERT INTO collaborations (user_id, node_id, workspace_id, roles, created_at)
|
||||
SELECT
|
||||
${userId} as user_id,
|
||||
np.descendant_id as node_id,
|
||||
np.workspace_id as workspace_id,
|
||||
${roles},
|
||||
${new Date()} as created_at
|
||||
FROM node_paths as np
|
||||
WHERE np.ancestor_id = ${nodeId}
|
||||
ON CONFLICT (user_id, node_id) DO UPDATE
|
||||
SET
|
||||
roles = collaborations.roles || EXCLUDED.roles,
|
||||
updated_at = NOW()
|
||||
`.execute(transaction);
|
||||
}
|
||||
|
||||
return this.buildNodeCollaborations(
|
||||
nodeId,
|
||||
workspaceId,
|
||||
attributes,
|
||||
ancestors
|
||||
);
|
||||
}
|
||||
|
||||
private async buildUserCollaborations(
|
||||
userId: string,
|
||||
workspaceId: string
|
||||
): Promise<CreateCollaboration[]> {
|
||||
const createCollaborations: CreateCollaboration[] = [];
|
||||
createCollaborations.push(
|
||||
buildDefaultCollaboration(userId, workspaceId, 'workspace', workspaceId)
|
||||
);
|
||||
|
||||
const workspaceUserIds = await fetchWorkspaceUsers(workspaceId);
|
||||
|
||||
for (const workspaceUserId of workspaceUserIds) {
|
||||
createCollaborations.push(
|
||||
buildDefaultCollaboration(workspaceUserId, userId, 'user', workspaceId)
|
||||
);
|
||||
|
||||
if (workspaceUserId === userId) {
|
||||
continue;
|
||||
}
|
||||
|
||||
createCollaborations.push(
|
||||
buildDefaultCollaboration(userId, workspaceUserId, 'user', workspaceId)
|
||||
);
|
||||
for (const [userId, role] of Object.entries(
|
||||
updateResult.updatedCollaborators
|
||||
)) {
|
||||
const roles = JSON.stringify({ [nodeId]: role });
|
||||
await sql`
|
||||
INSERT INTO collaborations (user_id, node_id, workspace_id, roles, created_at)
|
||||
SELECT
|
||||
${userId} as user_id,
|
||||
np.descendant_id as node_id,
|
||||
np.workspace_id as workspace_id,
|
||||
${roles},
|
||||
${new Date()} as created_at
|
||||
FROM node_paths as np
|
||||
WHERE np.ancestor_id = ${nodeId}
|
||||
ON CONFLICT (user_id, node_id) DO UPDATE
|
||||
SET
|
||||
roles = collaborations.roles || EXCLUDED.roles,
|
||||
updated_at = NOW()
|
||||
`.execute(transaction);
|
||||
}
|
||||
|
||||
return createCollaborations;
|
||||
}
|
||||
|
||||
private buildNodeCollaborations(
|
||||
nodeId: string,
|
||||
workspaceId: string,
|
||||
attributes: NodeAttributes,
|
||||
ancestors: Node[]
|
||||
): CreateCollaboration[] {
|
||||
const collaborators = extractNodeCollaborators([
|
||||
...ancestors.map((a) => a.attributes),
|
||||
attributes,
|
||||
]);
|
||||
|
||||
const collaboratorIds = Object.keys(collaborators);
|
||||
const createCollaborations: CreateCollaboration[] = collaboratorIds.map(
|
||||
(userId) =>
|
||||
buildDefaultCollaboration(userId, nodeId, attributes.type, workspaceId)
|
||||
const removedCollaboratorIds = Object.keys(
|
||||
updateResult.removedCollaborators
|
||||
);
|
||||
|
||||
return createCollaborations;
|
||||
if (removedCollaboratorIds.length > 0) {
|
||||
await sql`
|
||||
UPDATE collaborations
|
||||
SET
|
||||
roles = collaborations.roles - ${nodeId},
|
||||
updated_at = NOW()
|
||||
WHERE user_id IN (${sql.join(removedCollaboratorIds, sql`, `)})
|
||||
AND node_id IN
|
||||
(
|
||||
SELECT np.descendant_id
|
||||
FROM node_paths as np
|
||||
WHERE np.ancestor_id = ${nodeId}
|
||||
)
|
||||
`.execute(transaction);
|
||||
}
|
||||
}
|
||||
|
||||
private checkCollaboratorChanges(
|
||||
node: Node,
|
||||
ancestors: Node[],
|
||||
newAttributes: NodeAttributes
|
||||
beforeAttributes: NodeAttributes,
|
||||
afterAttributes: NodeAttributes
|
||||
): CollaboratorChangeResult {
|
||||
const beforeCollaborators = Object.keys(
|
||||
extractNodeCollaborators(node.attributes)
|
||||
);
|
||||
const beforeCollaborators = extractNodeCollaborators(beforeAttributes);
|
||||
const afterCollaborators = extractNodeCollaborators(afterAttributes);
|
||||
|
||||
const afterCollaborators = Object.keys(
|
||||
extractNodeCollaborators(newAttributes)
|
||||
);
|
||||
const addedCollaborators: Record<string, NodeRole> = {};
|
||||
const updatedCollaborators: Record<string, NodeRole> = {};
|
||||
const removedCollaborators: Record<string, NodeRole> = {};
|
||||
|
||||
if (beforeCollaborators.length === 0 && afterCollaborators.length === 0) {
|
||||
return { removedCollaborators: [], addedCollaborators: [] };
|
||||
// Check for added and updated collaborators
|
||||
for (const [userId, newRole] of Object.entries(afterCollaborators)) {
|
||||
if (!(userId in beforeCollaborators)) {
|
||||
addedCollaborators[userId] = newRole;
|
||||
} else if (beforeCollaborators[userId] !== newRole) {
|
||||
updatedCollaborators[userId] = newRole;
|
||||
}
|
||||
}
|
||||
|
||||
const addedCollaborators = difference(
|
||||
afterCollaborators,
|
||||
beforeCollaborators
|
||||
);
|
||||
|
||||
const removedCollaborators = difference(
|
||||
beforeCollaborators,
|
||||
afterCollaborators
|
||||
);
|
||||
|
||||
if (addedCollaborators.length === 0 && removedCollaborators.length === 0) {
|
||||
return { removedCollaborators: [], addedCollaborators: [] };
|
||||
// Check for removed collaborators
|
||||
for (const [userId, oldRole] of Object.entries(beforeCollaborators)) {
|
||||
if (!(userId in afterCollaborators)) {
|
||||
removedCollaborators[userId] = oldRole;
|
||||
}
|
||||
}
|
||||
|
||||
const inheritedCollaborators = Object.keys(
|
||||
extractNodeCollaborators(ancestors.map((a) => a.attributes))
|
||||
);
|
||||
|
||||
const added = difference(addedCollaborators, inheritedCollaborators);
|
||||
const removed = difference(removedCollaborators, inheritedCollaborators);
|
||||
|
||||
return { removedCollaborators: removed, addedCollaborators: added };
|
||||
return {
|
||||
addedCollaborators,
|
||||
updatedCollaborators,
|
||||
removedCollaborators,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,23 +3,19 @@ import { Server } from 'http';
|
||||
import { WebSocketServer, WebSocket } from 'ws';
|
||||
import { verifyToken } from '@/lib/tokens';
|
||||
import {
|
||||
CollaborationsBatchMessage,
|
||||
CollaborationRevocationsBatchMessage,
|
||||
Message,
|
||||
NodeTransactionsBatchMessage,
|
||||
} from '@colanode/core';
|
||||
import { logService } from '@/services/log';
|
||||
import { mapCollaboration, mapNodeTransaction } from '@/lib/nodes';
|
||||
import { mapCollaborationRevocation, mapNodeTransaction } from '@/lib/nodes';
|
||||
import { eventBus } from '@/lib/event-bus';
|
||||
import {
|
||||
CollaborationCreatedEvent,
|
||||
CollaborationUpdatedEvent,
|
||||
NodeTransactionCreatedEvent,
|
||||
} from '@/types/events';
|
||||
import { NodeTransactionCreatedEvent } from '@/types/events';
|
||||
|
||||
interface SynapseUserCursor {
|
||||
workspaceId: string;
|
||||
userId: string;
|
||||
cursor: string | null;
|
||||
cursor: string;
|
||||
syncing: boolean;
|
||||
}
|
||||
|
||||
@@ -27,8 +23,8 @@ interface SynapseConnection {
|
||||
accountId: string;
|
||||
deviceId: string;
|
||||
socket: WebSocket;
|
||||
nodeTransactions: Map<string, SynapseUserCursor>;
|
||||
collaborations: Map<string, SynapseUserCursor>;
|
||||
transactions: Map<string, SynapseUserCursor>;
|
||||
revocations: Map<string, SynapseUserCursor>;
|
||||
}
|
||||
|
||||
class SynapseService {
|
||||
@@ -39,10 +35,6 @@ class SynapseService {
|
||||
eventBus.subscribe((event) => {
|
||||
if (event.type === 'node_transaction_created') {
|
||||
this.handleNodeTransactionCreatedEvent(event);
|
||||
} else if (event.type === 'collaboration_created') {
|
||||
this.handleCollaborationCreatedEvent(event);
|
||||
} else if (event.type === 'collaboration_updated') {
|
||||
this.handleCollaborationUpdatedEvent(event);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -114,8 +106,8 @@ class SynapseService {
|
||||
accountId: account.id,
|
||||
deviceId: account.deviceId,
|
||||
socket,
|
||||
nodeTransactions: new Map(),
|
||||
collaborations: new Map(),
|
||||
transactions: new Map(),
|
||||
revocations: new Map(),
|
||||
};
|
||||
|
||||
this.connections.set(account.deviceId, connection);
|
||||
@@ -139,9 +131,9 @@ class SynapseService {
|
||||
this.logger.trace(message, `Socket message from ${connection.deviceId}`);
|
||||
|
||||
if (message.type === 'fetch_node_transactions') {
|
||||
const state = connection.nodeTransactions.get(message.userId);
|
||||
const state = connection.transactions.get(message.userId);
|
||||
if (!state) {
|
||||
connection.nodeTransactions.set(message.userId, {
|
||||
connection.transactions.set(message.userId, {
|
||||
userId: message.userId,
|
||||
workspaceId: message.workspaceId,
|
||||
cursor: message.cursor,
|
||||
@@ -153,20 +145,20 @@ class SynapseService {
|
||||
state.cursor = message.cursor;
|
||||
this.sendPendingTransactions(connection, message.userId);
|
||||
}
|
||||
} else if (message.type === 'fetch_collaborations') {
|
||||
const state = connection.collaborations.get(message.userId);
|
||||
} else if (message.type === 'fetch_collaboration_revocations') {
|
||||
const state = connection.revocations.get(message.userId);
|
||||
if (!state) {
|
||||
connection.collaborations.set(message.userId, {
|
||||
connection.revocations.set(message.userId, {
|
||||
userId: message.userId,
|
||||
workspaceId: message.workspaceId,
|
||||
cursor: message.cursor,
|
||||
syncing: false,
|
||||
});
|
||||
|
||||
this.sendPendingCollaborations(connection, message.userId);
|
||||
this.sendPendingRevocations(connection, message.userId);
|
||||
} else if (!state.syncing && state.cursor !== message.cursor) {
|
||||
state.cursor = message.cursor;
|
||||
this.sendPendingCollaborations(connection, message.userId);
|
||||
this.sendPendingRevocations(connection, message.userId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -175,7 +167,7 @@ class SynapseService {
|
||||
connection: SynapseConnection,
|
||||
userId: string
|
||||
) {
|
||||
const state = connection.nodeTransactions.get(userId);
|
||||
const state = connection.transactions.get(userId);
|
||||
if (!state || state.syncing) {
|
||||
return;
|
||||
}
|
||||
@@ -186,19 +178,23 @@ class SynapseService {
|
||||
`Sending pending node transactions for ${connection.deviceId} with ${userId}`
|
||||
);
|
||||
|
||||
let query = database
|
||||
const unsyncedTransactions = await database
|
||||
.selectFrom('node_transactions as nt')
|
||||
.innerJoin('collaborations as c', (join) =>
|
||||
.leftJoin('collaborations as c', (join) =>
|
||||
join.on('c.user_id', '=', userId).onRef('c.node_id', '=', 'nt.node_id')
|
||||
)
|
||||
.selectAll('nt');
|
||||
|
||||
if (state.cursor) {
|
||||
query = query.where('nt.number', '>', BigInt(state.cursor));
|
||||
}
|
||||
|
||||
const unsyncedTransactions = await query
|
||||
.orderBy('nt.number', 'asc')
|
||||
.selectAll('nt')
|
||||
.where((eb) =>
|
||||
eb.or([
|
||||
eb.and([
|
||||
eb('nt.workspace_id', '=', state.workspaceId),
|
||||
eb('nt.node_type', 'in', ['workspace', 'user']),
|
||||
]),
|
||||
eb('c.node_id', '=', eb.ref('nt.node_id')),
|
||||
])
|
||||
)
|
||||
.where('nt.version', '>', BigInt(state.cursor))
|
||||
.orderBy('nt.version', 'asc')
|
||||
.limit(20)
|
||||
.execute();
|
||||
|
||||
@@ -214,15 +210,15 @@ class SynapseService {
|
||||
transactions,
|
||||
};
|
||||
|
||||
connection.nodeTransactions.delete(userId);
|
||||
connection.transactions.delete(userId);
|
||||
this.sendMessage(connection, message);
|
||||
}
|
||||
|
||||
private async sendPendingCollaborations(
|
||||
private async sendPendingRevocations(
|
||||
connection: SynapseConnection,
|
||||
userId: string
|
||||
) {
|
||||
const state = connection.collaborations.get(userId);
|
||||
const state = connection.revocations.get(userId);
|
||||
if (!state || state.syncing) {
|
||||
return;
|
||||
}
|
||||
@@ -230,36 +226,31 @@ class SynapseService {
|
||||
state.syncing = true;
|
||||
this.logger.trace(
|
||||
state,
|
||||
`Sending pending collaborations for ${connection.deviceId} with ${userId}`
|
||||
`Sending pending collaboration revocations for ${connection.deviceId} with ${userId}`
|
||||
);
|
||||
|
||||
let query = database
|
||||
.selectFrom('collaborations as c')
|
||||
const unsyncedRevocations = await database
|
||||
.selectFrom('collaboration_revocations as cr')
|
||||
.selectAll()
|
||||
.where('c.user_id', '=', userId);
|
||||
|
||||
if (state.cursor) {
|
||||
query = query.where('c.number', '>', BigInt(state.cursor));
|
||||
}
|
||||
|
||||
const unsyncedCollaborations = await query
|
||||
.orderBy('c.number', 'asc')
|
||||
.where('cr.user_id', '=', userId)
|
||||
.where('cr.version', '>', BigInt(state.cursor))
|
||||
.orderBy('cr.version', 'asc')
|
||||
.limit(20)
|
||||
.execute();
|
||||
|
||||
if (unsyncedCollaborations.length === 0) {
|
||||
if (unsyncedRevocations.length === 0) {
|
||||
state.syncing = false;
|
||||
return;
|
||||
}
|
||||
|
||||
const collaborations = unsyncedCollaborations.map(mapCollaboration);
|
||||
const message: CollaborationsBatchMessage = {
|
||||
type: 'collaborations_batch',
|
||||
const revocations = unsyncedRevocations.map(mapCollaborationRevocation);
|
||||
const message: CollaborationRevocationsBatchMessage = {
|
||||
type: 'collaboration_revocations_batch',
|
||||
userId,
|
||||
collaborations,
|
||||
revocations,
|
||||
};
|
||||
|
||||
connection.collaborations.delete(userId);
|
||||
connection.revocations.delete(userId);
|
||||
this.sendMessage(connection, message);
|
||||
}
|
||||
|
||||
@@ -299,44 +290,12 @@ class SynapseService {
|
||||
}
|
||||
}
|
||||
|
||||
private handleCollaborationCreatedEvent(event: CollaborationCreatedEvent) {
|
||||
const userDevices = this.getPendingCollaborationCursors(event.userId);
|
||||
if (userDevices.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const deviceId of userDevices) {
|
||||
const socketConnection = this.connections.get(deviceId);
|
||||
if (socketConnection === undefined) {
|
||||
continue;
|
||||
}
|
||||
|
||||
this.sendPendingCollaborations(socketConnection, event.userId);
|
||||
}
|
||||
}
|
||||
|
||||
private handleCollaborationUpdatedEvent(event: CollaborationUpdatedEvent) {
|
||||
const userDevices = this.getPendingCollaborationCursors(event.userId);
|
||||
if (userDevices.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const deviceId of userDevices) {
|
||||
const socketConnection = this.connections.get(deviceId);
|
||||
if (socketConnection === undefined) {
|
||||
continue;
|
||||
}
|
||||
|
||||
this.sendPendingCollaborations(socketConnection, event.userId);
|
||||
}
|
||||
}
|
||||
|
||||
private getPendingNodeTransactionCursors(
|
||||
workspaceId: string
|
||||
): Map<string, string[]> {
|
||||
const userDevices = new Map<string, string[]>();
|
||||
for (const connection of this.connections.values()) {
|
||||
const connectionUsers = connection.nodeTransactions.values();
|
||||
const connectionUsers = connection.transactions.values();
|
||||
for (const user of connectionUsers) {
|
||||
if (user.workspaceId !== workspaceId || user.syncing) {
|
||||
continue;
|
||||
@@ -354,7 +313,7 @@ class SynapseService {
|
||||
private getPendingCollaborationCursors(userId: string): string[] {
|
||||
const userDevices: string[] = [];
|
||||
for (const connection of this.connections.values()) {
|
||||
const connectionUsers = connection.collaborations.values();
|
||||
const connectionUsers = connection.revocations.values();
|
||||
for (const user of connectionUsers) {
|
||||
if (user.userId !== userId || user.syncing) {
|
||||
continue;
|
||||
|
||||
@@ -1,8 +1,4 @@
|
||||
import {
|
||||
SelectCollaboration,
|
||||
SelectNode,
|
||||
SelectNodeTransaction,
|
||||
} from '@/data/schema';
|
||||
import { SelectNode, SelectNodeTransaction } from '@/data/schema';
|
||||
import { Node, NodeAttributes } from '@colanode/core';
|
||||
|
||||
export type NodeCollaborator = {
|
||||
@@ -22,7 +18,6 @@ export type CreateNodeInput = {
|
||||
export type CreateNodeOutput = {
|
||||
node: SelectNode;
|
||||
transaction: SelectNodeTransaction;
|
||||
createdCollaborations: SelectCollaboration[];
|
||||
};
|
||||
|
||||
export type UpdateNodeInput = {
|
||||
@@ -47,7 +42,6 @@ export type ApplyNodeCreateTransactionInput = {
|
||||
export type ApplyNodeCreateTransactionOutput = {
|
||||
node: SelectNode;
|
||||
transaction: SelectNodeTransaction;
|
||||
collaborations: SelectCollaboration[];
|
||||
};
|
||||
|
||||
export type ApplyNodeUpdateTransactionInput = {
|
||||
@@ -72,5 +66,4 @@ export type ApplyNodeDeleteTransactionInput = {
|
||||
export type ApplyNodeDeleteTransactionOutput = {
|
||||
node: SelectNode;
|
||||
transaction: SelectNodeTransaction;
|
||||
updatedCollaborations: SelectCollaboration[];
|
||||
};
|
||||
|
||||
@@ -1,17 +1,17 @@
|
||||
import { ServerCollaboration, ServerNodeTransaction } from './sync';
|
||||
import { ServerCollaborationRevocation, ServerNodeTransaction } from './sync';
|
||||
|
||||
export type FetchNodeTransactionsMessage = {
|
||||
type: 'fetch_node_transactions';
|
||||
userId: string;
|
||||
workspaceId: string;
|
||||
cursor: string | null;
|
||||
cursor: string;
|
||||
};
|
||||
|
||||
export type FetchCollaborationsMessage = {
|
||||
type: 'fetch_collaborations';
|
||||
export type FetchCollaborationRevocationsMessage = {
|
||||
type: 'fetch_collaboration_revocations';
|
||||
userId: string;
|
||||
workspaceId: string;
|
||||
cursor: string | null;
|
||||
cursor: string;
|
||||
};
|
||||
|
||||
export type NodeTransactionsBatchMessage = {
|
||||
@@ -20,14 +20,14 @@ export type NodeTransactionsBatchMessage = {
|
||||
transactions: ServerNodeTransaction[];
|
||||
};
|
||||
|
||||
export type CollaborationsBatchMessage = {
|
||||
type: 'collaborations_batch';
|
||||
export type CollaborationRevocationsBatchMessage = {
|
||||
type: 'collaboration_revocations_batch';
|
||||
userId: string;
|
||||
collaborations: ServerCollaboration[];
|
||||
revocations: ServerCollaborationRevocation[];
|
||||
};
|
||||
|
||||
export type Message =
|
||||
| FetchNodeTransactionsMessage
|
||||
| NodeTransactionsBatchMessage
|
||||
| FetchCollaborationsMessage
|
||||
| CollaborationsBatchMessage;
|
||||
| FetchCollaborationRevocationsMessage
|
||||
| CollaborationRevocationsBatchMessage;
|
||||
|
||||
@@ -21,7 +21,8 @@ export type LocalNodeTransaction =
|
||||
export type LocalCreateNodeTransaction = {
|
||||
id: string;
|
||||
nodeId: string;
|
||||
type: 'create';
|
||||
nodeType: string;
|
||||
operation: 'create';
|
||||
data: string;
|
||||
createdAt: string;
|
||||
createdBy: string;
|
||||
@@ -30,7 +31,8 @@ export type LocalCreateNodeTransaction = {
|
||||
export type LocalUpdateNodeTransaction = {
|
||||
id: string;
|
||||
nodeId: string;
|
||||
type: 'update';
|
||||
nodeType: string;
|
||||
operation: 'update';
|
||||
data: string;
|
||||
createdAt: string;
|
||||
createdBy: string;
|
||||
@@ -39,44 +41,48 @@ export type LocalUpdateNodeTransaction = {
|
||||
export type LocalDeleteNodeTransaction = {
|
||||
id: string;
|
||||
nodeId: string;
|
||||
type: 'delete';
|
||||
nodeType: string;
|
||||
operation: 'delete';
|
||||
createdAt: string;
|
||||
createdBy: string;
|
||||
};
|
||||
|
||||
export type ServerNodeCreateTransaction = {
|
||||
id: string;
|
||||
type: 'create';
|
||||
operation: 'create';
|
||||
nodeId: string;
|
||||
nodeType: string;
|
||||
workspaceId: string;
|
||||
data: string;
|
||||
createdAt: string;
|
||||
createdBy: string;
|
||||
serverCreatedAt: string;
|
||||
number: string;
|
||||
version: string;
|
||||
};
|
||||
|
||||
export type ServerNodeUpdateTransaction = {
|
||||
id: string;
|
||||
type: 'update';
|
||||
operation: 'update';
|
||||
nodeId: string;
|
||||
nodeType: string;
|
||||
workspaceId: string;
|
||||
data: string;
|
||||
createdAt: string;
|
||||
createdBy: string;
|
||||
serverCreatedAt: string;
|
||||
number: string;
|
||||
version: string;
|
||||
};
|
||||
|
||||
export type ServerNodeDeleteTransaction = {
|
||||
id: string;
|
||||
type: 'delete';
|
||||
operation: 'delete';
|
||||
nodeId: string;
|
||||
nodeType: string;
|
||||
workspaceId: string;
|
||||
createdAt: string;
|
||||
createdBy: string;
|
||||
serverCreatedAt: string;
|
||||
number: string;
|
||||
version: string;
|
||||
};
|
||||
|
||||
export type ServerNodeTransaction =
|
||||
@@ -84,14 +90,10 @@ export type ServerNodeTransaction =
|
||||
| ServerNodeUpdateTransaction
|
||||
| ServerNodeDeleteTransaction;
|
||||
|
||||
export type ServerCollaboration = {
|
||||
export type ServerCollaborationRevocation = {
|
||||
userId: string;
|
||||
nodeId: string;
|
||||
type: string;
|
||||
workspaceId: string;
|
||||
state: string;
|
||||
createdAt: string;
|
||||
updatedAt: string | null;
|
||||
deletedAt: string | null;
|
||||
number: string;
|
||||
version: string;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user