Rename some tables for clarity

This commit is contained in:
Hakan Shehu
2024-12-05 01:11:41 +01:00
parent a462077a8c
commit a688c2b719
22 changed files with 313 additions and 317 deletions

View File

@@ -34,10 +34,10 @@ const createNodesTable: Migration = {
},
};
const createNodeTransactionsTable: Migration = {
const createTransactionsTable: Migration = {
up: async (db) => {
await db.schema
.createTable('node_transactions')
.createTable('transactions')
.addColumn('id', 'text', (col) => col.notNull().primaryKey())
.addColumn('node_id', 'text', (col) => col.notNull())
.addColumn('node_type', 'text', (col) => col.notNull())
@@ -52,7 +52,7 @@ const createNodeTransactionsTable: Migration = {
.execute();
},
down: async (db) => {
await db.schema.dropTable('node_transactions').execute();
await db.schema.dropTable('transactions').execute();
},
};
@@ -317,7 +317,7 @@ const createCursorsTable: Migration = {
export const workspaceDatabaseMigrations: Record<string, Migration> = {
'00001_create_nodes_table': createNodesTable,
'00002_create_node_transactions_table': createNodeTransactionsTable,
'00002_create_transactions_table': createTransactionsTable,
'00003_create_collaborations_table': createCollaborationsTable,
'00004_create_uploads_table': createUploadsTable,
'00005_create_downloads_table': createDownloadsTable,

View File

@@ -29,7 +29,7 @@ interface NodePathTable {
export type SelectNodePath = Selectable<NodePathTable>;
interface NodeTransactionTable {
interface TransactionTable {
id: ColumnType<string, string, never>;
node_id: ColumnType<string, string, never>;
node_type: ColumnType<NodeType, NodeType, never>;
@@ -43,9 +43,9 @@ interface NodeTransactionTable {
version: ColumnType<bigint | null, bigint | null, bigint | null>;
}
export type SelectNodeTransaction = Selectable<NodeTransactionTable>;
export type CreateNodeTransaction = Insertable<NodeTransactionTable>;
export type UpdateNodeTransaction = Updateable<NodeTransactionTable>;
export type SelectTransaction = Selectable<TransactionTable>;
export type CreateTransaction = Insertable<TransactionTable>;
export type UpdateTransaction = Updateable<TransactionTable>;
interface CollaborationTable {
user_id: ColumnType<string, string, never>;
@@ -128,7 +128,7 @@ interface CursorTable {
export interface WorkspaceDatabaseSchema {
nodes: NodeTable;
node_transactions: NodeTransactionTable;
transactions: TransactionTable;
node_paths: NodePathTable;
collaborations: CollaborationTable;
uploads: UploadTable;

View File

@@ -1,6 +1,6 @@
import {
ServerCollaboration,
ServerCollaborationRevocation,
ServerDeletedCollaboration,
} from '@colanode/core';
import { createDebugger } from '@/main/debugger';
@@ -51,12 +51,12 @@ class CollaborationService {
});
}
public async applyServerCollaborationRevocation(
public async applyServerDeletedCollaboration(
userId: string,
revocation: ServerCollaborationRevocation
deletedCollaboration: ServerDeletedCollaboration
) {
this.debug(
`Applying server collaboration revocation: ${revocation.nodeId} for user ${userId}`
`Applying server deleted collaboration: ${deletedCollaboration.nodeId} for user ${userId}`
);
const workspaceDatabase =
@@ -65,27 +65,27 @@ class CollaborationService {
await workspaceDatabase.transaction().execute(async (tx) => {
await tx
.deleteFrom('nodes')
.where('id', '=', revocation.nodeId)
.where('id', '=', deletedCollaboration.nodeId)
.execute();
await tx
.deleteFrom('node_transactions')
.where('node_id', '=', revocation.nodeId)
.deleteFrom('transactions')
.where('node_id', '=', deletedCollaboration.nodeId)
.execute();
await tx
.deleteFrom('collaborations')
.where('node_id', '=', revocation.nodeId)
.where('node_id', '=', deletedCollaboration.nodeId)
.execute();
await tx
.deleteFrom('interaction_events')
.where('node_id', '=', revocation.nodeId)
.where('node_id', '=', deletedCollaboration.nodeId)
.execute();
await tx
.deleteFrom('interactions')
.where('node_id', '=', revocation.nodeId)
.where('node_id', '=', deletedCollaboration.nodeId)
.execute();
});
}

View File

@@ -272,16 +272,16 @@ class FileService {
continue;
}
const nodeTransactions = await workspaceDatabase
.selectFrom('node_transactions')
const transactions = await workspaceDatabase
.selectFrom('transactions')
.selectAll()
.where('node_id', '=', upload.node_id)
.where('status', '=', 'pending')
.execute();
if (nodeTransactions.length > 0) {
if (transactions.length > 0) {
this.debug(
`Node transactions found for node ${upload.node_id}, skipping upload until transactions are completed`
`Transactions found for node ${upload.node_id}, skipping upload until transactions are completed`
);
continue;
}

View File

@@ -5,10 +5,10 @@ import {
NodeAttributes,
NodeMutationContext,
registry,
ServerNodeCreateTransaction,
ServerNodeDeleteTransaction,
ServerNodeTransaction,
ServerNodeUpdateTransaction,
ServerCreateTransaction,
ServerDeleteTransaction,
ServerTransaction,
ServerUpdateTransaction,
} from '@colanode/core';
import { decodeState, YDoc } from '@colanode/crdt';
import { sql } from 'kysely';
@@ -21,7 +21,7 @@ import {
CreateUpload,
SelectDownload,
SelectNode,
SelectNodeTransaction,
SelectTransaction,
SelectUpload,
} from '@/main/data/workspace/schema';
import { interactionService } from '@/main/services/interaction-service';
@@ -77,7 +77,7 @@ class NodeService {
const inputs = Array.isArray(input) ? input : [input];
const createdNodes: SelectNode[] = [];
const createdNodeTransactions: SelectNodeTransaction[] = [];
const createdTransactions: SelectTransaction[] = [];
const createdUploads: SelectUpload[] = [];
const createdDownloads: SelectDownload[] = [];
@@ -140,7 +140,7 @@ class NodeService {
createdNodes.push(createdNode);
const createdTransaction = await transaction
.insertInto('node_transactions')
.insertInto('transactions')
.returningAll()
.values({
id: transactionId,
@@ -159,7 +159,7 @@ class NodeService {
throw new Error('Failed to create transaction');
}
createdNodeTransactions.push(createdTransaction);
createdTransactions.push(createdTransaction);
if (inputItem.upload) {
const createdUpload = await transaction
@@ -203,13 +203,13 @@ class NodeService {
});
}
for (const createdTransaction of createdNodeTransactions) {
for (const createdTransaction of createdTransactions) {
this.debug(
`Created transaction ${createdTransaction.id} for node ${createdTransaction.node_id} with operation ${createdTransaction.operation}`
);
eventBus.publish({
type: 'node_transaction_created',
type: 'transaction_created',
userId,
transaction: mapTransaction(createdTransaction),
});
@@ -312,7 +312,7 @@ class NodeService {
const ydoc = new YDoc();
const previousTransactions = await workspaceDatabase
.selectFrom('node_transactions')
.selectFrom('transactions')
.where('node_id', '=', nodeId)
.selectAll()
.execute();
@@ -345,7 +345,7 @@ class NodeService {
if (updatedNode) {
const createdTransaction = await trx
.insertInto('node_transactions')
.insertInto('transactions')
.returningAll()
.values({
id: transactionId,
@@ -389,7 +389,7 @@ class NodeService {
);
eventBus.publish({
type: 'node_transaction_created',
type: 'transaction_created',
userId,
transaction: mapTransaction(createdTransaction),
});
@@ -452,7 +452,7 @@ class NodeService {
}
await trx
.deleteFrom('node_transactions')
.deleteFrom('transactions')
.where('node_id', '=', nodeId)
.execute();
@@ -472,7 +472,7 @@ class NodeService {
.execute();
const createdTransaction = await trx
.insertInto('node_transactions')
.insertInto('transactions')
.returningAll()
.values({
id: generateId(IdType.Transaction),
@@ -510,7 +510,7 @@ class NodeService {
);
eventBus.publish({
type: 'node_transaction_created',
type: 'transaction_created',
userId,
transaction: mapTransaction(createdTransaction),
});
@@ -521,7 +521,7 @@ class NodeService {
public async applyServerTransaction(
userId: string,
transaction: ServerNodeTransaction
transaction: ServerTransaction
) {
if (transaction.operation === 'create') {
await this.applyServerCreateTransaction(userId, transaction);
@@ -535,7 +535,7 @@ class NodeService {
public async replaceTransactions(
userId: string,
nodeId: string,
transactions: ServerNodeTransaction[]
transactions: ServerTransaction[]
): Promise<boolean> {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
@@ -592,7 +592,7 @@ class NodeService {
.execute();
await trx
.insertInto('node_transactions')
.insertInto('transactions')
.values(
transactions.map((t) => ({
id: t.id,
@@ -624,7 +624,7 @@ class NodeService {
private async applyServerCreateTransaction(
userId: string,
transaction: ServerNodeCreateTransaction
transaction: ServerCreateTransaction
) {
this.debug(
`Applying server create transaction ${transaction.id} for node ${transaction.nodeId}`
@@ -635,7 +635,7 @@ class NodeService {
const version = BigInt(transaction.version);
const existingTransaction = await workspaceDatabase
.selectFrom('node_transactions')
.selectFrom('transactions')
.select(['id', 'status', 'version', 'server_created_at'])
.where('id', '=', transaction.id)
.executeTakeFirst();
@@ -653,7 +653,7 @@ class NodeService {
}
await workspaceDatabase
.updateTable('node_transactions')
.updateTable('transactions')
.set({
status: 'synced',
version,
@@ -689,7 +689,7 @@ class NodeService {
.executeTakeFirst();
await trx
.insertInto('node_transactions')
.insertInto('transactions')
.values({
id: transaction.id,
node_id: transaction.nodeId,
@@ -732,7 +732,7 @@ class NodeService {
);
eventBus.publish({
type: 'node_transaction_incomplete',
type: 'transaction_incomplete',
userId,
transactionId: transaction.id,
});
@@ -741,14 +741,14 @@ class NodeService {
private async applyServerUpdateTransaction(
userId: string,
transaction: ServerNodeUpdateTransaction
transaction: ServerUpdateTransaction
) {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const version = BigInt(transaction.version);
const existingTransaction = await workspaceDatabase
.selectFrom('node_transactions')
.selectFrom('transactions')
.select(['id', 'status', 'version', 'server_created_at'])
.where('id', '=', transaction.id)
.executeTakeFirst();
@@ -766,7 +766,7 @@ class NodeService {
}
await workspaceDatabase
.updateTable('node_transactions')
.updateTable('transactions')
.set({
status: 'synced',
version,
@@ -782,7 +782,7 @@ class NodeService {
}
const previousTransactions = await workspaceDatabase
.selectFrom('node_transactions')
.selectFrom('transactions')
.selectAll()
.where('node_id', '=', transaction.nodeId)
.orderBy('id', 'asc')
@@ -815,7 +815,7 @@ class NodeService {
.executeTakeFirst();
await trx
.insertInto('node_transactions')
.insertInto('transactions')
.values({
id: transaction.id,
node_id: transaction.nodeId,
@@ -858,7 +858,7 @@ class NodeService {
);
eventBus.publish({
type: 'node_transaction_incomplete',
type: 'transaction_incomplete',
userId,
transactionId: transaction.id,
});
@@ -867,7 +867,7 @@ class NodeService {
private async applyServerDeleteTransaction(
userId: string,
transaction: ServerNodeDeleteTransaction
transaction: ServerDeleteTransaction
) {
this.debug(
`Applying server delete transaction ${transaction.id} for node ${transaction.nodeId}`
@@ -880,7 +880,7 @@ class NodeService {
.transaction()
.execute(async (trx) => {
await trx
.deleteFrom('node_transactions')
.deleteFrom('transactions')
.where('node_id', '=', transaction.nodeId)
.execute();

View File

@@ -57,10 +57,10 @@ export class SocketConnection {
`Received message of type ${message.type} for account ${this.account.id}`
);
if (message.type === 'node_transactions_batch') {
if (message.type === 'transactions_batch') {
syncService.syncServerTransactions(message);
} else if (message.type === 'collaboration_revocations_batch') {
syncService.syncServerRevocations(message);
} else if (message.type === 'deleted_collaborations_batch') {
syncService.syncServerDeletedCollaborations(message);
} else if (message.type === 'collaborations_batch') {
syncService.syncServerCollaborations(message);
} else if (message.type === 'interactions_batch') {

View File

@@ -1,13 +1,13 @@
import {
CollaborationRevocationsBatchMessage,
CollaborationsBatchMessage,
DeletedCollaborationsBatchMessage,
InitSyncConsumerMessage,
GetNodeTransactionsOutput,
GetTransactionsOutput,
InteractionsBatchMessage,
LocalNodeTransaction,
NodeTransactionsBatchMessage,
LocalTransaction,
TransactionsBatchMessage,
SyncInteractionsMessage,
SyncNodeTransactionsOutput,
SyncTransactionsOutput,
SyncConsumerType,
} from '@colanode/core';
import { sql } from 'kysely';
@@ -16,7 +16,7 @@ import { createDebugger } from '@/main/debugger';
import { databaseService } from '@/main/data/database-service';
import {
SelectInteractionEvent,
SelectNodeTransaction,
SelectTransaction,
} from '@/main/data/workspace/schema';
import { collaborationService } from '@/main/services/collaboration-service';
import { interactionService } from '@/main/services/interaction-service';
@@ -51,14 +51,14 @@ class SyncService {
private readonly syncingTransactions: Set<string> = new Set();
private readonly syncingCollaborations: Set<string> = new Set();
private readonly syncingRevocations: Set<string> = new Set();
private readonly syncingDeletedCollaborations: Set<string> = new Set();
private readonly syncingInteractions: Set<string> = new Set();
constructor() {
eventBus.subscribe((event) => {
if (event.type === 'node_transaction_created') {
if (event.type === 'transaction_created') {
this.syncLocalPendingTransactions(event.userId);
} else if (event.type === 'node_transaction_incomplete') {
} else if (event.type === 'transaction_incomplete') {
this.syncLocalIncompleteTransactions(event.userId);
} else if (event.type === 'workspace_created') {
this.syncWorkspace(event.workspace.userId);
@@ -92,7 +92,7 @@ class SyncService {
this.initSyncConsumer(userId, 'transactions');
this.initSyncConsumer(userId, 'collaborations');
this.initSyncConsumer(userId, 'revocations');
this.initSyncConsumer(userId, 'deleted_collaborations');
this.initSyncConsumer(userId, 'interactions');
this.syncMissingNodes(userId);
@@ -212,7 +212,7 @@ class SyncService {
}
}
public async syncServerTransactions(message: NodeTransactionsBatchMessage) {
public async syncServerTransactions(message: TransactionsBatchMessage) {
this.debug(`Syncing server transactions for user ${message.userId}`);
if (this.syncingTransactions.has(message.userId)) {
@@ -283,44 +283,46 @@ class SyncService {
}
}
public async syncServerRevocations(
message: CollaborationRevocationsBatchMessage
public async syncServerDeletedCollaborations(
message: DeletedCollaborationsBatchMessage
) {
this.debug(`Syncing server revocations for user ${message.userId}`);
this.debug(
`Syncing server deleted collaborations for user ${message.userId}`
);
if (this.syncingRevocations.has(message.userId)) {
if (this.syncingDeletedCollaborations.has(message.userId)) {
this.debug(
`Syncing of server revocations already in progress for user ${message.userId}, skipping`
`Syncing of server deleted collaborations already in progress for user ${message.userId}, skipping`
);
return;
}
this.syncingRevocations.add(message.userId);
this.syncingDeletedCollaborations.add(message.userId);
let cursor: bigint | null = null;
try {
for (const revocation of message.revocations) {
await collaborationService.applyServerCollaborationRevocation(
for (const deletedCollaboration of message.deletedCollaborations) {
await collaborationService.applyServerDeletedCollaboration(
message.userId,
revocation
deletedCollaboration
);
cursor = BigInt(revocation.version);
cursor = BigInt(deletedCollaboration.version);
}
if (cursor) {
this.updateCursor(message.userId, 'revocations', cursor);
this.updateCursor(message.userId, 'deleted_collaborations', cursor);
}
} catch (error) {
this.debug(
error,
`Error syncing server revocations for user ${message.userId}`
`Error syncing server deleted collaborations for user ${message.userId}`
);
} finally {
this.debug(
`Syncing of server revocations completed for user ${message.userId}`
`Syncing of server deleted collaborations completed for user ${message.userId}`
);
this.syncingRevocations.delete(message.userId);
this.initSyncConsumer(message.userId, 'revocations');
this.syncingDeletedCollaborations.delete(message.userId);
this.initSyncConsumer(message.userId, 'deleted_collaborations');
}
}
@@ -370,7 +372,7 @@ class SyncService {
await databaseService.getWorkspaceDatabase(userId);
const incompleteTransactions = await workspaceDatabase
.selectFrom('node_transactions')
.selectFrom('transactions')
.selectAll()
.where('status', '=', 'incomplete')
.execute();
@@ -396,7 +398,7 @@ class SyncService {
}
const groupedByNodeId = incompleteTransactions.reduce<
Record<string, SelectNodeTransaction[]>
Record<string, SelectTransaction[]>
>((acc, transaction) => {
acc[transaction.node_id] = [
...(acc[transaction.node_id] ?? []),
@@ -411,7 +413,7 @@ class SyncService {
`Syncing incomplete transactions for node ${nodeId} for user ${userId}`
);
const { data } = await httpClient.get<GetNodeTransactionsOutput>(
const { data } = await httpClient.get<GetTransactionsOutput>(
`/v1/nodes/${credentials.workspaceId}/transactions/${nodeId}`,
{
domain: credentials.serverDomain,
@@ -425,7 +427,7 @@ class SyncService {
);
await workspaceDatabase
.deleteFrom('node_transactions')
.deleteFrom('transactions')
.where(
'id',
'in',
@@ -447,7 +449,7 @@ class SyncService {
);
await workspaceDatabase
.updateTable('node_transactions')
.updateTable('transactions')
.set({ retry_count: sql`retry_count + 1` })
.where(
'id',
@@ -461,7 +463,7 @@ class SyncService {
);
await workspaceDatabase
.updateTable('node_transactions')
.updateTable('transactions')
.set({ retry_count: sql`retry_count + 1` })
.where(
'id',
@@ -515,7 +517,7 @@ class SyncService {
try {
this.debug(`Syncing missing node ${node.node_id} for user ${userId}`);
const { data } = await httpClient.get<GetNodeTransactionsOutput>(
const { data } = await httpClient.get<GetTransactionsOutput>(
`/v1/nodes/${credentials.workspaceId}/transactions/${node.node_id}`,
{
domain: credentials.serverDomain,
@@ -568,7 +570,7 @@ class SyncService {
}
try {
const { data } = await httpClient.get<GetNodeTransactionsOutput>(
const { data } = await httpClient.get<GetTransactionsOutput>(
`/v1/nodes/${credentials.workspaceId}/transactions/${nodeId}`,
{
domain: credentials.serverDomain,
@@ -592,7 +594,7 @@ class SyncService {
await databaseService.getWorkspaceDatabase(userId);
const unsyncedTransactions = await workspaceDatabase
.selectFrom('node_transactions')
.selectFrom('transactions')
.selectAll()
.where('status', '=', 'pending')
.orderBy('id', 'asc')
@@ -622,9 +624,9 @@ class SyncService {
return;
}
const transactions: LocalNodeTransaction[] =
const transactions: LocalTransaction[] =
unsyncedTransactions.map(mapTransaction);
const { data } = await httpClient.post<SyncNodeTransactionsOutput>(
const { data } = await httpClient.post<SyncTransactionsOutput>(
`/v1/sync/${credentials.workspaceId}`,
{
transactions,
@@ -652,7 +654,7 @@ class SyncService {
);
await workspaceDatabase
.updateTable('node_transactions')
.updateTable('transactions')
.set({ status: 'sent' })
.where('id', 'in', syncedTransactionIds)
.where('status', '=', 'pending')
@@ -665,7 +667,7 @@ class SyncService {
);
await workspaceDatabase
.updateTable('node_transactions')
.updateTable('transactions')
.set((eb) => ({ retry_count: eb('retry_count', '+', 1) }))
.where('id', 'in', unsyncedTransactionIds)
.where('status', '=', 'pending')

View File

@@ -1,4 +1,4 @@
import { LocalNodeTransaction, Node } from '@colanode/core';
import { LocalTransaction, Node } from '@colanode/core';
import { encodeState } from '@colanode/crdt';
import {
DeleteResult,
@@ -22,7 +22,7 @@ import {
SelectDownload,
SelectInteraction,
SelectNode,
SelectNodeTransaction,
SelectTransaction,
SelectUpload,
WorkspaceDatabaseSchema,
} from '@/main/data/workspace/schema';
@@ -175,9 +175,7 @@ export const mapWorkspace = (row: SelectWorkspace): Workspace => {
};
};
export const mapTransaction = (
row: SelectNodeTransaction
): LocalNodeTransaction => {
export const mapTransaction = (row: SelectTransaction): LocalTransaction => {
if (row.operation === 'create' && row.data) {
return {
id: row.id,

View File

@@ -1,4 +1,4 @@
import { LocalNodeTransaction, Node } from '@colanode/core';
import { LocalTransaction, Node } from '@colanode/core';
import { Account } from '@/shared/types/accounts';
import { Interaction } from '@/shared/types/interactions';
@@ -110,14 +110,14 @@ export type RadarDataUpdatedEvent = {
type: 'radar_data_updated';
};
export type NodeTransactionCreatedEvent = {
type: 'node_transaction_created';
export type TransactionCreatedEvent = {
type: 'transaction_created';
userId: string;
transaction: LocalNodeTransaction;
transaction: LocalTransaction;
};
export type NodeTransactionIncompleteEvent = {
type: 'node_transaction_incomplete';
export type TransactionIncompleteEvent = {
type: 'transaction_incomplete';
userId: string;
transactionId: string;
};
@@ -172,8 +172,8 @@ export type Event =
| UploadDeletedEvent
| QueryResultUpdatedEvent
| RadarDataUpdatedEvent
| NodeTransactionCreatedEvent
| NodeTransactionIncompleteEvent
| TransactionCreatedEvent
| TransactionIncompleteEvent
| ServerAvailabilityChangedEvent
| SocketConnectionOpenedEvent
| CollaborationCreatedEvent

View File

@@ -122,10 +122,10 @@ const createNodesTable: Migration = {
},
};
const createNodeTransactionsTable: Migration = {
const createTransactionsTable: Migration = {
up: async (db) => {
await sql`
CREATE SEQUENCE IF NOT EXISTS node_transactions_version_seq
CREATE SEQUENCE IF NOT EXISTS transactions_version_seq
START WITH 1000000000
INCREMENT BY 1
NO MINVALUE
@@ -134,7 +134,7 @@ const createNodeTransactionsTable: Migration = {
`.execute(db);
await db.schema
.createTable('node_transactions')
.createTable('transactions')
.addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey())
.addColumn('node_id', 'varchar(30)', (col) => col.notNull())
.addColumn('node_type', 'varchar(30)', (col) => col.notNull())
@@ -145,15 +145,13 @@ const createNodeTransactionsTable: Migration = {
.addColumn('created_by', 'varchar(30)', (col) => col.notNull())
.addColumn('server_created_at', 'timestamptz', (col) => col.notNull())
.addColumn('version', 'bigint', (col) =>
col.notNull().defaultTo(sql`nextval('node_transactions_version_seq')`)
col.notNull().defaultTo(sql`nextval('transactions_version_seq')`)
)
.execute();
},
down: async (db) => {
await db.schema.dropTable('node_transactions').execute();
await sql`DROP SEQUENCE IF EXISTS node_transactions_version_seq`.execute(
db
);
await db.schema.dropTable('transactions').execute();
await sql`DROP SEQUENCE IF EXISTS transactions_version_seq`.execute(db);
},
};
@@ -283,10 +281,10 @@ const createCollaborationsTable: Migration = {
},
};
const createCollaborationRevocationsTable: Migration = {
const createDeletedCollaborationsTable: Migration = {
up: async (db) => {
await sql`
CREATE SEQUENCE IF NOT EXISTS collaboration_revocations_version_seq
CREATE SEQUENCE IF NOT EXISTS deleted_collaborations_version_seq
START WITH 1000000000
INCREMENT BY 1
NO MINVALUE
@@ -295,7 +293,7 @@ const createCollaborationRevocationsTable: Migration = {
`.execute(db);
await db.schema
.createTable('collaboration_revocations')
.createTable('deleted_collaborations')
.addColumn('user_id', 'varchar(30)', (col) => col.notNull())
.addColumn('node_id', 'varchar(30)', (col) => col.notNull())
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
@@ -303,9 +301,9 @@ const createCollaborationRevocationsTable: Migration = {
.addColumn('version', 'bigint', (col) =>
col
.notNull()
.defaultTo(sql`nextval('collaboration_revocations_version_seq')`)
.defaultTo(sql`nextval('deleted_collaborations_version_seq')`)
)
.addPrimaryKeyConstraint('collaboration_revocations_pkey', [
.addPrimaryKeyConstraint('deleted_collaborations_pkey', [
'user_id',
'node_id',
])
@@ -316,10 +314,10 @@ const createCollaborationRevocationsTable: Migration = {
BEGIN
IF NEW.roles = '{}' THEN
DELETE FROM collaborations WHERE user_id = NEW.user_id AND node_id = NEW.node_id;
INSERT INTO collaboration_revocations (user_id, node_id, workspace_id, created_at)
INSERT INTO deleted_collaborations (user_id, node_id, workspace_id, created_at)
VALUES (NEW.user_id, NEW.node_id, NEW.workspace_id, NOW());
ELSE
DELETE FROM collaboration_revocations WHERE user_id = NEW.user_id AND node_id = NEW.node_id;
DELETE FROM deleted_collaborations WHERE user_id = NEW.user_id AND node_id = NEW.node_id;
END IF;
RETURN NULL;
END;
@@ -335,7 +333,7 @@ const createCollaborationRevocationsTable: Migration = {
`.execute(db);
},
down: async (db) => {
await db.schema.dropTable('collaboration_revocations').execute();
await db.schema.dropTable('deleted_collaborations').execute();
},
};
@@ -421,11 +419,10 @@ export const databaseMigrations: Record<string, Migration> = {
'00003_create_workspaces_table': createWorkspacesTable,
'00004_create_workspace_users_table': createWorkspaceUsersTable,
'00005_create_nodes_table': createNodesTable,
'00006_create_node_transactions_table': createNodeTransactionsTable,
'00006_create_transactions_table': createTransactionsTable,
'00007_create_node_paths_table': createNodePathsTable,
'00008_create_collaborations_table': createCollaborationsTable,
'00009_create_collaboration_revocations_table':
createCollaborationRevocationsTable,
'00009_create_deleted_collaborations_table': createDeletedCollaborationsTable,
'00010_create_uploads_table': createUploadsTable,
'00011_create_interactions_table': createInteractionsTable,
};

View File

@@ -104,7 +104,7 @@ export type SelectNode = Selectable<NodeTable>;
export type CreateNode = Insertable<NodeTable>;
export type UpdateNode = Updateable<NodeTable>;
interface NodeTransactionTable {
interface TransactionTable {
id: ColumnType<string, string, never>;
node_id: ColumnType<string, string, never>;
node_type: ColumnType<NodeType, NodeType, never>;
@@ -117,9 +117,9 @@ interface NodeTransactionTable {
version: ColumnType<bigint, never, never>;
}
export type SelectNodeTransaction = Selectable<NodeTransactionTable>;
export type CreateNodeTransaction = Insertable<NodeTransactionTable>;
export type UpdateNodeTransaction = Updateable<NodeTransactionTable>;
export type SelectTransaction = Selectable<TransactionTable>;
export type CreateTransaction = Insertable<TransactionTable>;
export type UpdateTransaction = Updateable<TransactionTable>;
interface CollaborationTable {
user_id: ColumnType<string, string, never>;
@@ -135,7 +135,7 @@ export type SelectCollaboration = Selectable<CollaborationTable>;
export type CreateCollaboration = Insertable<CollaborationTable>;
export type UpdateCollaboration = Updateable<CollaborationTable>;
interface CollaborationRevocationTable {
interface DeletedCollaborationTable {
user_id: ColumnType<string, never, never>;
node_id: ColumnType<string, never, never>;
workspace_id: ColumnType<string, string, never>;
@@ -143,12 +143,9 @@ interface CollaborationRevocationTable {
version: ColumnType<bigint, never, never>;
}
export type SelectCollaborationRevocation =
Selectable<CollaborationRevocationTable>;
export type CreateCollaborationRevocation =
Insertable<CollaborationRevocationTable>;
export type UpdateCollaborationRevocation =
Updateable<CollaborationRevocationTable>;
export type SelectDeletedCollaboration = Selectable<DeletedCollaborationTable>;
export type CreateDeletedCollaboration = Insertable<DeletedCollaborationTable>;
export type UpdateDeletedCollaboration = Updateable<DeletedCollaborationTable>;
interface NodePathTable {
ancestor_id: ColumnType<string, string, never>;
@@ -205,9 +202,9 @@ export interface DatabaseSchema {
workspaces: WorkspaceTable;
workspace_users: WorkspaceUserTable;
nodes: NodeTable;
node_transactions: NodeTransactionTable;
transactions: TransactionTable;
collaborations: CollaborationTable;
collaboration_revocations: CollaborationRevocationTable;
deleted_collaborations: DeletedCollaborationTable;
node_paths: NodePathTable;
uploads: UploadTable;
interactions: InteractionTable;

View File

@@ -2,7 +2,7 @@ import { generateId, IdType } from '@colanode/core';
import { DeleteObjectCommand } from '@aws-sdk/client-s3';
import { database } from '@/data/database';
import { CreateNodeTransaction, SelectUpload } from '@/data/schema';
import { CreateTransaction, SelectUpload } from '@/data/schema';
import { JobHandler } from '@/types/jobs';
import { filesStorage, BUCKET_NAMES } from '@/data/storage';
import { eventBus } from '@/lib/event-bus';
@@ -32,7 +32,7 @@ export const cleanNodeDataHandler: JobHandler<CleanNodeDataInput> = async (
logger.trace(`Cleaning node data for ${input.nodeId}`);
const deleteTransactions = await database
.selectFrom('node_transactions')
.selectFrom('transactions')
.selectAll()
.where('node_id', '=', input.nodeId)
.execute();
@@ -101,7 +101,7 @@ const deleteChildren = async (
: [];
const nodeIds: string[] = descendants.map((d) => d.id);
const transactionsToCreate: CreateNodeTransaction[] = descendants.map(
const transactionsToCreate: CreateTransaction[] = descendants.map(
(descendant) => ({
id: generateId(IdType.Transaction),
node_id: descendant.id,
@@ -117,12 +117,12 @@ const deleteChildren = async (
await database.transaction().execute(async (trx) => {
await trx
.deleteFrom('node_transactions')
.deleteFrom('transactions')
.where('node_id', 'in', nodeIds)
.execute();
const createdTransactions = await trx
.insertInto('node_transactions')
.insertInto('transactions')
.returningAll()
.values(transactionsToCreate)
.execute();

View File

@@ -4,19 +4,21 @@ import {
Node,
NodeOutput,
NodeRole,
NodeType, ServerCollaboration,
ServerCollaborationRevocation,
NodeType,
ServerCollaboration,
ServerDeletedCollaboration,
ServerInteraction,
ServerNodeTransaction} from '@colanode/core';
ServerTransaction,
} from '@colanode/core';
import { encodeState } from '@colanode/crdt';
import { database } from '@/data/database';
import {
SelectCollaboration,
SelectCollaborationRevocation,
SelectDeletedCollaboration,
SelectInteraction,
SelectNode,
SelectNodeTransaction,
SelectTransaction,
} from '@/data/schema';
import { NodeCollaborator } from '@/types/nodes';
@@ -50,9 +52,9 @@ export const mapNode = (node: SelectNode): Node => {
} as Node;
};
export const mapNodeTransaction = (
transaction: SelectNodeTransaction
): ServerNodeTransaction => {
export const mapTransaction = (
transaction: SelectTransaction
): ServerTransaction => {
if (transaction.operation === 'create' && transaction.data) {
return {
id: transaction.id,
@@ -100,15 +102,15 @@ export const mapNodeTransaction = (
throw new Error('Unknown transaction type');
};
export const mapCollaborationRevocation = (
revocation: SelectCollaborationRevocation
): ServerCollaborationRevocation => {
export const mapDeletedCollaboration = (
deletedCollaboration: SelectDeletedCollaboration
): ServerDeletedCollaboration => {
return {
userId: revocation.user_id,
nodeId: revocation.node_id,
workspaceId: revocation.workspace_id,
createdAt: revocation.created_at.toISOString(),
version: revocation.version.toString(),
userId: deletedCollaboration.user_id,
nodeId: deletedCollaboration.node_id,
workspaceId: deletedCollaboration.workspace_id,
createdAt: deletedCollaboration.created_at.toISOString(),
version: deletedCollaboration.version.toString(),
};
};

View File

@@ -1,8 +1,8 @@
import { GetNodeTransactionsOutput, hasViewerAccess } from '@colanode/core';
import { GetTransactionsOutput, hasViewerAccess } from '@colanode/core';
import { Request, Response, Router } from 'express';
import { database } from '@/data/database';
import { fetchNodeRole, mapNodeTransaction } from '@/lib/nodes';
import { fetchNodeRole, mapTransaction } from '@/lib/nodes';
import { ApiError } from '@/types/api';
export const nodesRouter = Router();
@@ -60,14 +60,14 @@ nodesRouter.get(
}
const transactions = await database
.selectFrom('node_transactions')
.selectFrom('transactions')
.selectAll()
.where('node_id', '=', nodeId)
.orderBy('version', 'desc')
.execute();
const output: GetNodeTransactionsOutput = {
transactions: transactions.map(mapNodeTransaction),
const output: GetTransactionsOutput = {
transactions: transactions.map(mapTransaction),
};
res.status(200).json(output);

View File

@@ -1,11 +1,11 @@
import {
LocalCreateNodeTransaction,
LocalDeleteNodeTransaction,
LocalNodeTransaction,
LocalUpdateNodeTransaction,
SyncNodeTransactionResult,
SyncNodeTransactionsInput,
SyncNodeTransactionStatus,
LocalCreateTransaction,
LocalDeleteTransaction,
LocalTransaction,
LocalUpdateTransaction,
SyncTransactionResult,
SyncTransactionsInput,
SyncTransactionStatus,
} from '@colanode/core';
import { Request, Response, Router } from 'express';
@@ -26,7 +26,7 @@ syncRouter.post('/:workspaceId', async (req: Request, res: Response) => {
}
const workspaceId = req.params.workspaceId as string;
const input = req.body as SyncNodeTransactionsInput;
const input = req.body as SyncTransactionsInput;
const workspaceUser = await database
.selectFrom('workspace_users')
@@ -43,13 +43,10 @@ syncRouter.post('/:workspaceId', async (req: Request, res: Response) => {
return;
}
const results: SyncNodeTransactionResult[] = [];
const results: SyncTransactionResult[] = [];
for (const transaction of input.transactions) {
try {
const status = await handleLocalNodeTransaction(
workspaceUser,
transaction
);
const status = await handleLocalTransaction(workspaceUser, transaction);
results.push({
id: transaction.id,
status: status,
@@ -67,26 +64,26 @@ syncRouter.post('/:workspaceId', async (req: Request, res: Response) => {
res.status(200).json({ results });
});
const handleLocalNodeTransaction = async (
const handleLocalTransaction = async (
workspaceUser: SelectWorkspaceUser,
transaction: LocalNodeTransaction
): Promise<SyncNodeTransactionStatus> => {
transaction: LocalTransaction
): Promise<SyncTransactionStatus> => {
if (transaction.operation === 'create') {
return await handleCreateNodeTransaction(workspaceUser, transaction);
return await handleCreateTransaction(workspaceUser, transaction);
} else if (transaction.operation === 'update') {
return await handleUpdateNodeTransaction(workspaceUser, transaction);
return await handleUpdateTransaction(workspaceUser, transaction);
} else if (transaction.operation === 'delete') {
return await handleDeleteNodeTransaction(workspaceUser, transaction);
return await handleDeleteTransaction(workspaceUser, transaction);
} else {
return 'error';
}
};
const handleCreateNodeTransaction = async (
const handleCreateTransaction = async (
workspaceUser: SelectWorkspaceUser,
transaction: LocalCreateNodeTransaction
): Promise<SyncNodeTransactionStatus> => {
const output = await nodeService.applyNodeCreateTransaction(workspaceUser, {
transaction: LocalCreateTransaction
): Promise<SyncTransactionStatus> => {
const output = await nodeService.applyCreateTransaction(workspaceUser, {
id: transaction.id,
nodeId: transaction.nodeId,
data: transaction.data,
@@ -100,11 +97,11 @@ const handleCreateNodeTransaction = async (
return 'success';
};
const handleUpdateNodeTransaction = async (
const handleUpdateTransaction = async (
workspaceUser: SelectWorkspaceUser,
transaction: LocalUpdateNodeTransaction
): Promise<SyncNodeTransactionStatus> => {
const output = await nodeService.applyNodeUpdateTransaction(workspaceUser, {
transaction: LocalUpdateTransaction
): Promise<SyncTransactionStatus> => {
const output = await nodeService.applyUpdateTransaction(workspaceUser, {
id: transaction.id,
nodeId: transaction.nodeId,
userId: transaction.createdBy,
@@ -119,11 +116,11 @@ const handleUpdateNodeTransaction = async (
return 'success';
};
const handleDeleteNodeTransaction = async (
const handleDeleteTransaction = async (
workspaceUser: SelectWorkspaceUser,
transaction: LocalDeleteNodeTransaction
): Promise<SyncNodeTransactionStatus> => {
const output = await nodeService.applyNodeDeleteTransaction(workspaceUser, {
transaction: LocalDeleteTransaction
): Promise<SyncTransactionStatus> => {
const output = await nodeService.applyDeleteTransaction(workspaceUser, {
id: transaction.id,
nodeId: transaction.nodeId,
createdAt: new Date(transaction.createdAt),

View File

@@ -14,7 +14,7 @@ import {
import { Request, Response, Router } from 'express';
import { database } from '@/data/database';
import { fetchNode, mapNode, mapNodeTransaction } from '@/lib/nodes';
import { fetchNode, mapNode, mapTransaction } from '@/lib/nodes';
import { getNameFromEmail } from '@/lib/utils';
import { nodeService } from '@/services/node-service';
import { workspaceService } from '@/services/workspace-service';
@@ -606,7 +606,7 @@ workspacesRouter.put(
}
res.status(200).json({
transaction: mapNodeTransaction(updateUserOutput.transaction),
transaction: mapTransaction(updateUserOutput.transaction),
});
}
);

View File

@@ -16,7 +16,7 @@ import { database } from '@/data/database';
import {
CreateCollaboration,
CreateNode,
CreateNodeTransaction,
CreateTransaction,
DatabaseSchema,
SelectWorkspaceUser,
} from '@/data/schema';
@@ -24,12 +24,12 @@ import { eventBus } from '@/lib/event-bus';
import { fetchNodeAncestors, mapNode } from '@/lib/nodes';
import { createLogger } from '@/lib/logger';
import {
ApplyNodeCreateTransactionInput,
ApplyNodeCreateTransactionOutput,
ApplyNodeDeleteTransactionInput,
ApplyNodeDeleteTransactionOutput,
ApplyNodeUpdateTransactionInput,
ApplyNodeUpdateTransactionOutput,
ApplyCreateTransactionInput,
ApplyCreateTransactionOutput,
ApplyDeleteTransactionInput,
ApplyDeleteTransactionOutput,
ApplyUpdateTransactionInput,
ApplyUpdateTransactionOutput,
CreateNodeInput,
CreateNodeOutput,
UpdateNodeInput,
@@ -73,7 +73,7 @@ class NodeService {
transaction_id: transactionId,
};
const createTransaction: CreateNodeTransaction = {
const createTransaction: CreateTransaction = {
id: transactionId,
node_id: input.nodeId,
node_type: input.attributes.type,
@@ -143,7 +143,7 @@ class NodeService {
}
const previousTransactions = await database
.selectFrom('node_transactions')
.selectFrom('transactions')
.selectAll()
.where('node_id', '=', input.nodeId)
.orderBy('id', 'asc')
@@ -200,7 +200,7 @@ class NodeService {
}
const createdTransaction = await trx
.insertInto('node_transactions')
.insertInto('transactions')
.returningAll()
.values({
id: transactionId,
@@ -250,10 +250,10 @@ class NodeService {
}
}
public async applyNodeCreateTransaction(
public async applyCreateTransaction(
workspaceUser: SelectWorkspaceUser,
input: ApplyNodeCreateTransactionInput
): Promise<ApplyNodeCreateTransactionOutput | null> {
input: ApplyCreateTransactionInput
): Promise<ApplyCreateTransactionOutput | null> {
const ydoc = new YDoc();
ydoc.applyUpdate(input.data);
const attributes = ydoc.getAttributes<NodeAttributes>();
@@ -289,7 +289,7 @@ class NodeService {
transaction_id: input.id,
};
const createTransaction: CreateNodeTransaction = {
const createTransaction: CreateTransaction = {
id: input.id,
node_id: input.nodeId,
node_type: attributes.type,
@@ -327,16 +327,13 @@ class NodeService {
}
}
public async applyNodeUpdateTransaction(
public async applyUpdateTransaction(
workspaceUser: SelectWorkspaceUser,
input: ApplyNodeUpdateTransactionInput
): Promise<ApplyNodeUpdateTransactionOutput | null> {
input: ApplyUpdateTransactionInput
): Promise<ApplyUpdateTransactionOutput | null> {
let count = 0;
while (count < UPDATE_RETRIES_LIMIT) {
const result = await this.tryApplyNodeUpdateTransaction(
workspaceUser,
input
);
const result = await this.tryApplyUpdateTransaction(workspaceUser, input);
if (result.type === 'success') {
return result.output;
@@ -352,10 +349,10 @@ class NodeService {
return null;
}
private async tryApplyNodeUpdateTransaction(
private async tryApplyUpdateTransaction(
workspaceUser: SelectWorkspaceUser,
input: ApplyNodeUpdateTransactionInput
): Promise<UpdateResult<ApplyNodeUpdateTransactionOutput>> {
input: ApplyUpdateTransactionInput
): Promise<UpdateResult<ApplyUpdateTransactionOutput>> {
const ancestorRows = await fetchNodeAncestors(input.nodeId);
const ancestors = ancestorRows.map(mapNode);
@@ -365,7 +362,7 @@ class NodeService {
}
const previousTransactions = await database
.selectFrom('node_transactions')
.selectFrom('transactions')
.selectAll()
.where('node_id', '=', input.nodeId)
.orderBy('version', 'asc')
@@ -428,7 +425,7 @@ class NodeService {
}
const createdTransaction = await trx
.insertInto('node_transactions')
.insertInto('transactions')
.returningAll()
.values({
id: input.id,
@@ -509,10 +506,10 @@ class NodeService {
}
}
public async applyNodeDeleteTransaction(
public async applyDeleteTransaction(
workspaceUser: SelectWorkspaceUser,
input: ApplyNodeDeleteTransactionInput
): Promise<ApplyNodeDeleteTransactionOutput | null> {
input: ApplyDeleteTransactionInput
): Promise<ApplyDeleteTransactionOutput | null> {
const ancestorRows = await fetchNodeAncestors(input.nodeId);
const ancestors = ancestorRows.map(mapNode);
const node = ancestors.find((ancestor) => ancestor.id === input.nodeId);
@@ -547,12 +544,12 @@ class NodeService {
}
await trx
.deleteFrom('node_transactions')
.deleteFrom('transactions')
.where('node_id', '=', input.nodeId)
.execute();
const createdTransaction = await trx
.insertInto('node_transactions')
.insertInto('transactions')
.returningAll()
.values({
id: input.id,
@@ -607,7 +604,7 @@ class NodeService {
private async applyDatabaseCreateTransaction(
attributes: NodeAttributes,
node: CreateNode,
transaction: CreateNodeTransaction
transaction: CreateTransaction
) {
const collaborationsToCreate: CreateCollaboration[] = Object.entries(
extractNodeCollaborators(attributes)
@@ -631,7 +628,7 @@ class NodeService {
}
const createdTransaction = await trx
.insertInto('node_transactions')
.insertInto('transactions')
.returningAll()
.values(transaction)
.executeTakeFirst();

View File

@@ -1,14 +1,14 @@
import { WebSocket } from 'ws';
import {
CollaborationRevocationsBatchMessage,
CollaborationsBatchMessage,
InitSyncConsumerMessage,
InteractionsBatchMessage,
Message,
NodeTransactionsBatchMessage,
TransactionsBatchMessage,
NodeType,
SyncConsumerType,
WorkspaceStatus,
DeletedCollaborationsBatchMessage,
} from '@colanode/core';
import { interactionService } from '@/services/interaction-service';
@@ -17,9 +17,9 @@ import { RequestAccount } from '@/types/api';
import { database } from '@/data/database';
import {
mapCollaboration,
mapCollaborationRevocation,
mapDeletedCollaboration,
mapInteraction,
mapNodeTransaction,
mapTransaction,
} from '@/lib/nodes';
import {
CollaboratorAddedEvent,
@@ -131,8 +131,8 @@ export class SocketConnection {
) {
if (consumer.type === 'transactions') {
this.consumeTransactions(user, consumer);
} else if (consumer.type === 'revocations') {
this.consumeRevocations(user, consumer);
} else if (consumer.type === 'deleted_collaborations') {
this.consumeDeletedCollaborations(user, consumer);
} else if (consumer.type === 'collaborations') {
this.consumeCollaborations(user, consumer);
} else if (consumer.type === 'interactions') {
@@ -154,24 +154,24 @@ export class SocketConnection {
);
const unsyncedTransactions = await database
.selectFrom('node_transactions as nt')
.selectFrom('transactions as t')
.leftJoin('collaborations as c', (join) =>
join
.on('c.user_id', '=', user.userId)
.onRef('c.node_id', '=', 'nt.node_id')
.onRef('c.node_id', '=', 't.node_id')
)
.selectAll('nt')
.selectAll('t')
.where((eb) =>
eb.or([
eb.and([
eb('nt.workspace_id', '=', user.workspaceId),
eb('nt.node_type', 'in', PUBLIC_NODES),
eb('t.workspace_id', '=', user.workspaceId),
eb('t.node_type', 'in', PUBLIC_NODES),
]),
eb('c.node_id', '=', eb.ref('nt.node_id')),
eb('c.node_id', '=', eb.ref('t.node_id')),
])
)
.where('nt.version', '>', BigInt(consumer.cursor))
.orderBy('nt.version', 'asc')
.where('t.version', '>', BigInt(consumer.cursor))
.orderBy('t.version', 'asc')
.limit(20)
.execute();
@@ -180,9 +180,9 @@ export class SocketConnection {
return;
}
const transactions = unsyncedTransactions.map(mapNodeTransaction);
const message: NodeTransactionsBatchMessage = {
type: 'node_transactions_batch',
const transactions = unsyncedTransactions.map(mapTransaction);
const message: TransactionsBatchMessage = {
type: 'transactions_batch',
userId: user.userId,
transactions,
};
@@ -191,7 +191,7 @@ export class SocketConnection {
this.sendMessage(message);
}
private async consumeRevocations(
private async consumeDeletedCollaborations(
user: SocketUser,
consumer: SocketSyncConsumer
) {
@@ -202,28 +202,30 @@ export class SocketConnection {
consumer.fetching = true;
this.logger.trace(
consumer,
`Sending pending collaboration revocations for ${this.account.id} with ${consumer.type}`
`Sending pending deleted collaborations for ${this.account.id} with ${consumer.type}`
);
const unsyncedRevocations = await database
.selectFrom('collaboration_revocations as cr')
const unsyncedDeletedCollaborations = await database
.selectFrom('deleted_collaborations as dc')
.selectAll()
.where('cr.user_id', '=', user.userId)
.where('cr.version', '>', BigInt(consumer.cursor))
.orderBy('cr.version', 'asc')
.where('dc.user_id', '=', user.userId)
.where('dc.version', '>', BigInt(consumer.cursor))
.orderBy('dc.version', 'asc')
.limit(50)
.execute();
if (unsyncedRevocations.length === 0) {
if (unsyncedDeletedCollaborations.length === 0) {
consumer.fetching = false;
return;
}
const revocations = unsyncedRevocations.map(mapCollaborationRevocation);
const message: CollaborationRevocationsBatchMessage = {
type: 'collaboration_revocations_batch',
const deletedCollaborations = unsyncedDeletedCollaborations.map(
mapDeletedCollaboration
);
const message: DeletedCollaborationsBatchMessage = {
type: 'deleted_collaborations_batch',
userId: user.userId,
revocations,
deletedCollaborations,
};
user.consumers.delete(consumer.type);
@@ -368,9 +370,11 @@ export class SocketConnection {
this.consumePendingSync(user, transactionsConsumer);
}
const revocationsConsumer = user.consumers.get('revocations');
if (revocationsConsumer) {
this.consumePendingSync(user, revocationsConsumer);
const deletedCollaborationsConsumer = user.consumers.get(
'deleted_collaborations'
);
if (deletedCollaborationsConsumer) {
this.consumePendingSync(user, deletedCollaborationsConsumer);
}
}
}
@@ -396,9 +400,11 @@ export class SocketConnection {
continue;
}
const revocationsConsumer = user.consumers.get('revocations');
if (revocationsConsumer) {
this.consumePendingSync(user, revocationsConsumer);
const deletedCollaborationsConsumer = user.consumers.get(
'deleted_collaborations'
);
if (deletedCollaborationsConsumer) {
this.consumePendingSync(user, deletedCollaborationsConsumer);
}
}
}

View File

@@ -1,6 +1,6 @@
import { Node, NodeAttributes } from '@colanode/core';
import { SelectNode, SelectNodeTransaction } from '@/data/schema';
import { SelectNode, SelectTransaction } from '@/data/schema';
export type NodeCollaborator = {
nodeId: string;
@@ -18,7 +18,7 @@ export type CreateNodeInput = {
export type CreateNodeOutput = {
node: SelectNode;
transaction: SelectNodeTransaction;
transaction: SelectTransaction;
};
export type UpdateNodeInput = {
@@ -30,22 +30,22 @@ export type UpdateNodeInput = {
export type UpdateNodeOutput = {
node: SelectNode;
transaction: SelectNodeTransaction;
transaction: SelectTransaction;
};
export type ApplyNodeCreateTransactionInput = {
export type ApplyCreateTransactionInput = {
id: string;
nodeId: string;
data: string | Uint8Array;
createdAt: Date;
};
export type ApplyNodeCreateTransactionOutput = {
export type ApplyCreateTransactionOutput = {
node: SelectNode;
transaction: SelectNodeTransaction;
transaction: SelectTransaction;
};
export type ApplyNodeUpdateTransactionInput = {
export type ApplyUpdateTransactionInput = {
id: string;
nodeId: string;
userId: string;
@@ -53,18 +53,18 @@ export type ApplyNodeUpdateTransactionInput = {
createdAt: Date;
};
export type ApplyNodeUpdateTransactionOutput = {
export type ApplyUpdateTransactionOutput = {
node: SelectNode;
transaction: SelectNodeTransaction;
transaction: SelectTransaction;
};
export type ApplyNodeDeleteTransactionInput = {
export type ApplyDeleteTransactionInput = {
id: string;
nodeId: string;
createdAt: Date;
};
export type ApplyNodeDeleteTransactionOutput = {
export type ApplyDeleteTransactionOutput = {
node: SelectNode;
transaction: SelectNodeTransaction;
transaction: SelectTransaction;
};

View File

@@ -1,24 +1,24 @@
import { InteractionEvent } from './interactions';
import {
ServerCollaboration,
ServerCollaborationRevocation,
ServerDeletedCollaboration,
ServerInteraction,
ServerNodeTransaction,
ServerTransaction,
SyncConsumerType,
} from './sync';
import { NodeType } from '../registry';
export type NodeTransactionsBatchMessage = {
type: 'node_transactions_batch';
export type TransactionsBatchMessage = {
type: 'transactions_batch';
userId: string;
transactions: ServerNodeTransaction[];
transactions: ServerTransaction[];
};
export type CollaborationRevocationsBatchMessage = {
type: 'collaboration_revocations_batch';
export type DeletedCollaborationsBatchMessage = {
type: 'deleted_collaborations_batch';
userId: string;
revocations: ServerCollaborationRevocation[];
deletedCollaborations: ServerDeletedCollaboration[];
};
export type CollaborationsBatchMessage = {
@@ -49,9 +49,9 @@ export type InitSyncConsumerMessage = {
};
export type Message =
| NodeTransactionsBatchMessage
| CollaborationRevocationsBatchMessage
| TransactionsBatchMessage
| CollaborationsBatchMessage
| DeletedCollaborationsBatchMessage
| InteractionsBatchMessage
| SyncInteractionsMessage
| InitSyncConsumerMessage;

View File

@@ -1,4 +1,4 @@
import { ServerNodeTransaction } from './sync';
import { ServerTransaction } from './sync';
import { NodeAttributes } from '../registry';
@@ -16,6 +16,6 @@ export type NodeOutput = {
transactionId: string;
};
export type GetNodeTransactionsOutput = {
transactions: ServerNodeTransaction[];
export type GetTransactionsOutput = {
transactions: ServerTransaction[];
};

View File

@@ -3,27 +3,27 @@ import { InteractionAttributes } from './interactions';
import { NodeType } from '../registry';
import { NodeRole } from '../registry/core';
export type SyncNodeTransactionsInput = {
transactions: LocalNodeTransaction[];
export type SyncTransactionsInput = {
transactions: LocalTransaction[];
};
export type SyncNodeTransactionsOutput = {
results: SyncNodeTransactionResult[];
export type SyncTransactionsOutput = {
results: SyncTransactionResult[];
};
export type SyncNodeTransactionStatus = 'success' | 'error';
export type SyncTransactionStatus = 'success' | 'error';
export type SyncNodeTransactionResult = {
export type SyncTransactionResult = {
id: string;
status: SyncNodeTransactionStatus;
status: SyncTransactionStatus;
};
export type LocalNodeTransaction =
| LocalCreateNodeTransaction
| LocalUpdateNodeTransaction
| LocalDeleteNodeTransaction;
export type LocalTransaction =
| LocalCreateTransaction
| LocalUpdateTransaction
| LocalDeleteTransaction;
export type LocalCreateNodeTransaction = {
export type LocalCreateTransaction = {
id: string;
nodeId: string;
nodeType: string;
@@ -33,7 +33,7 @@ export type LocalCreateNodeTransaction = {
createdBy: string;
};
export type LocalUpdateNodeTransaction = {
export type LocalUpdateTransaction = {
id: string;
nodeId: string;
nodeType: string;
@@ -43,7 +43,7 @@ export type LocalUpdateNodeTransaction = {
createdBy: string;
};
export type LocalDeleteNodeTransaction = {
export type LocalDeleteTransaction = {
id: string;
nodeId: string;
nodeType: string;
@@ -52,7 +52,7 @@ export type LocalDeleteNodeTransaction = {
createdBy: string;
};
export type ServerNodeCreateTransaction = {
export type ServerCreateTransaction = {
id: string;
operation: 'create';
nodeId: string;
@@ -65,7 +65,7 @@ export type ServerNodeCreateTransaction = {
version: string;
};
export type ServerNodeUpdateTransaction = {
export type ServerUpdateTransaction = {
id: string;
operation: 'update';
nodeId: string;
@@ -78,7 +78,7 @@ export type ServerNodeUpdateTransaction = {
version: string;
};
export type ServerNodeDeleteTransaction = {
export type ServerDeleteTransaction = {
id: string;
operation: 'delete';
nodeId: string;
@@ -90,12 +90,12 @@ export type ServerNodeDeleteTransaction = {
version: string;
};
export type ServerNodeTransaction =
| ServerNodeCreateTransaction
| ServerNodeUpdateTransaction
| ServerNodeDeleteTransaction;
export type ServerTransaction =
| ServerCreateTransaction
| ServerUpdateTransaction
| ServerDeleteTransaction;
export type ServerCollaborationRevocation = {
export type ServerDeletedCollaboration = {
userId: string;
nodeId: string;
workspaceId: string;
@@ -129,5 +129,5 @@ export type ServerInteraction = {
export type SyncConsumerType =
| 'transactions'
| 'collaborations'
| 'revocations'
| 'deleted_collaborations'
| 'interactions';