mirror of
https://github.com/colanode/colanode.git
synced 2026-05-18 13:15:12 +02:00
Refactor node updates
This commit is contained in:
@@ -215,41 +215,41 @@ export class MutationService {
|
||||
|
||||
if (
|
||||
previousMutation.type === 'create_node' &&
|
||||
previousMutation.data.id === mutation.data.id
|
||||
previousMutation.data.nodeId === mutation.data.nodeId
|
||||
) {
|
||||
deletedMutationIds.add(mutation.id);
|
||||
deletedMutationIds.add(previousMutation.id);
|
||||
} else if (
|
||||
previousMutation.type === 'update_node' &&
|
||||
previousMutation.data.id === mutation.data.id
|
||||
previousMutation.data.nodeId === mutation.data.nodeId
|
||||
) {
|
||||
deletedMutationIds.add(mutation.id);
|
||||
deletedMutationIds.add(previousMutation.id);
|
||||
} else if (
|
||||
previousMutation.type === 'delete_node' &&
|
||||
previousMutation.data.id === mutation.data.id
|
||||
previousMutation.data.nodeId === mutation.data.nodeId
|
||||
) {
|
||||
deletedMutationIds.add(previousMutation.id);
|
||||
} else if (previousMutation.type === 'update_document') {
|
||||
deletedMutationIds.add(previousMutation.id);
|
||||
} else if (
|
||||
previousMutation.type === 'mark_node_seen' &&
|
||||
previousMutation.data.nodeId === mutation.data.id
|
||||
previousMutation.data.nodeId === mutation.data.nodeId
|
||||
) {
|
||||
deletedMutationIds.add(previousMutation.id);
|
||||
} else if (
|
||||
previousMutation.type === 'mark_node_opened' &&
|
||||
previousMutation.data.nodeId === mutation.data.id
|
||||
previousMutation.data.nodeId === mutation.data.nodeId
|
||||
) {
|
||||
deletedMutationIds.add(previousMutation.id);
|
||||
} else if (
|
||||
previousMutation.type === 'create_node_reaction' &&
|
||||
previousMutation.data.nodeId === mutation.data.id
|
||||
previousMutation.data.nodeId === mutation.data.nodeId
|
||||
) {
|
||||
deletedMutationIds.add(previousMutation.id);
|
||||
} else if (
|
||||
previousMutation.type === 'delete_node_reaction' &&
|
||||
previousMutation.data.nodeId === mutation.data.id
|
||||
previousMutation.data.nodeId === mutation.data.nodeId
|
||||
) {
|
||||
deletedMutationIds.add(previousMutation.id);
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ import {
|
||||
createDebugger,
|
||||
NodeAttributes,
|
||||
DeleteNodeMutationData,
|
||||
SyncNodeData,
|
||||
SyncNodeUpdateData,
|
||||
SyncNodeTombstoneData,
|
||||
getNodeModel,
|
||||
CreateNodeMutationData,
|
||||
@@ -74,6 +74,7 @@ export class NodeService {
|
||||
throw new Error('Invalid attributes');
|
||||
}
|
||||
|
||||
const updateId = generateId(IdType.Update);
|
||||
const createdAt = new Date().toISOString();
|
||||
const rootId = tree[0]?.id ?? input.id;
|
||||
const nodeText = model.extractNodeText(input.id, input.attributes);
|
||||
@@ -99,22 +100,24 @@ export class NodeService {
|
||||
throw new Error('Failed to create entry');
|
||||
}
|
||||
|
||||
const createdState = await trx
|
||||
.insertInto('node_states')
|
||||
const createdNodeUpdate = await trx
|
||||
.insertInto('node_updates')
|
||||
.returningAll()
|
||||
.values({
|
||||
id: input.id,
|
||||
state: update,
|
||||
revision: BigInt(0),
|
||||
id: updateId,
|
||||
node_id: input.id,
|
||||
data: update,
|
||||
created_at: createdAt,
|
||||
})
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!createdState) {
|
||||
throw new Error('Failed to create state');
|
||||
if (!createdNodeUpdate) {
|
||||
throw new Error('Failed to create node update');
|
||||
}
|
||||
|
||||
const mutationData: CreateNodeMutationData = {
|
||||
id: input.id,
|
||||
nodeId: input.id,
|
||||
updateId: updateId,
|
||||
data: encodeState(update),
|
||||
createdAt: createdAt,
|
||||
};
|
||||
@@ -203,8 +206,6 @@ export class NodeService {
|
||||
const updatedAt = new Date().toISOString();
|
||||
const updatedAttributes = updater(node.attributes as T);
|
||||
|
||||
const model = getNodeModel(updatedAttributes.type);
|
||||
|
||||
const canUpdateAttributesContext: CanUpdateAttributesContext = {
|
||||
user: {
|
||||
id: this.workspace.userId,
|
||||
@@ -217,29 +218,25 @@ export class NodeService {
|
||||
attributes: updatedAttributes,
|
||||
};
|
||||
|
||||
const model = getNodeModel(updatedAttributes.type);
|
||||
if (!model.canUpdateAttributes(canUpdateAttributesContext)) {
|
||||
return 'unauthorized';
|
||||
}
|
||||
|
||||
const state = await this.workspace.database
|
||||
const nodeState = await this.workspace.database
|
||||
.selectFrom('node_states')
|
||||
.where('id', '=', nodeId)
|
||||
.selectAll()
|
||||
.executeTakeFirst();
|
||||
|
||||
const updates = await this.workspace.database
|
||||
const nodeUpdates = await this.workspace.database
|
||||
.selectFrom('node_updates')
|
||||
.where('node_id', '=', nodeId)
|
||||
.selectAll()
|
||||
.execute();
|
||||
|
||||
const ydoc = new YDoc();
|
||||
|
||||
if (state) {
|
||||
ydoc.applyUpdate(state.state);
|
||||
}
|
||||
|
||||
for (const update of updates) {
|
||||
const ydoc = new YDoc(nodeState?.state);
|
||||
for (const update of nodeUpdates) {
|
||||
ydoc.applyUpdate(update.data);
|
||||
}
|
||||
|
||||
@@ -289,7 +286,7 @@ export class NodeService {
|
||||
}
|
||||
|
||||
const mutationData: UpdateNodeMutationData = {
|
||||
id: nodeId,
|
||||
nodeId: nodeId,
|
||||
updateId: updateId,
|
||||
data: encodeState(update),
|
||||
createdAt: updatedAt,
|
||||
@@ -400,7 +397,7 @@ export class NodeService {
|
||||
.execute();
|
||||
|
||||
const deleteMutationData: DeleteNodeMutationData = {
|
||||
id: nodeId,
|
||||
nodeId: nodeId,
|
||||
rootId: node.rootId,
|
||||
deletedAt: new Date().toISOString(),
|
||||
};
|
||||
@@ -440,28 +437,42 @@ export class NodeService {
|
||||
}
|
||||
}
|
||||
|
||||
public async syncServerNode(node: SyncNodeData) {
|
||||
const existingNode = await this.workspace.database
|
||||
.selectFrom('nodes')
|
||||
.where('id', '=', node.id)
|
||||
.selectAll()
|
||||
.executeTakeFirst();
|
||||
public async syncServerNodeUpdate(update: SyncNodeUpdateData) {
|
||||
for (let count = 0; count < UPDATE_RETRIES_LIMIT; count++) {
|
||||
try {
|
||||
const existingNode = await this.workspace.database
|
||||
.selectFrom('nodes')
|
||||
.where('id', '=', update.nodeId)
|
||||
.selectAll()
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!existingNode) {
|
||||
return this.createServerNode(node);
|
||||
} else {
|
||||
return this.updateServerNode(node);
|
||||
if (!existingNode) {
|
||||
const result = await this.tryCreateServerNode(update);
|
||||
if (result) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
const result = await this.tryUpdateServerNode(existingNode, update);
|
||||
if (result) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.debug(`Failed to update node ${update.id}: ${error}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async createServerNode(node: SyncNodeData): Promise<void> {
|
||||
const serverRevision = BigInt(node.revision);
|
||||
private async tryCreateServerNode(
|
||||
update: SyncNodeUpdateData
|
||||
): Promise<boolean> {
|
||||
const serverRevision = BigInt(update.revision);
|
||||
|
||||
const ydoc = new YDoc(node.state);
|
||||
const ydoc = new YDoc(update.state);
|
||||
const attributes = ydoc.getObject<NodeAttributes>();
|
||||
|
||||
const model = getNodeModel(attributes.type);
|
||||
const nodeText = model.extractNodeText(node.id, attributes);
|
||||
const nodeText = model.extractNodeText(update.id, attributes);
|
||||
|
||||
const { createdNode } = await this.workspace.database
|
||||
.transaction()
|
||||
@@ -470,11 +481,11 @@ export class NodeService {
|
||||
.insertInto('nodes')
|
||||
.returningAll()
|
||||
.values({
|
||||
id: node.id,
|
||||
root_id: node.rootId,
|
||||
id: update.nodeId,
|
||||
root_id: update.rootId,
|
||||
attributes: JSON.stringify(attributes),
|
||||
created_at: node.createdAt,
|
||||
created_by: node.createdBy,
|
||||
created_at: update.createdAt,
|
||||
created_by: update.createdBy,
|
||||
local_revision: 0n,
|
||||
server_revision: serverRevision,
|
||||
})
|
||||
@@ -488,9 +499,9 @@ export class NodeService {
|
||||
.insertInto('node_states')
|
||||
.returningAll()
|
||||
.values({
|
||||
id: node.id,
|
||||
id: update.nodeId,
|
||||
revision: serverRevision,
|
||||
state: decodeState(node.state),
|
||||
state: decodeState(update.state),
|
||||
})
|
||||
.executeTakeFirst();
|
||||
|
||||
@@ -498,7 +509,7 @@ export class NodeService {
|
||||
await trx
|
||||
.insertInto('node_texts')
|
||||
.values({
|
||||
id: node.id,
|
||||
id: update.nodeId,
|
||||
name: nodeText.name,
|
||||
attributes: nodeText.attributes,
|
||||
})
|
||||
@@ -509,8 +520,8 @@ export class NodeService {
|
||||
});
|
||||
|
||||
if (!createdNode) {
|
||||
this.debug(`Failed to create node ${node.id}`);
|
||||
return;
|
||||
this.debug(`Failed to create node ${update.nodeId}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
this.debug(`Created node ${createdNode.id} with type ${createdNode.type}`);
|
||||
@@ -521,44 +532,30 @@ export class NodeService {
|
||||
workspaceId: this.workspace.id,
|
||||
node: mapNode(createdNode),
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public async updateServerNode(node: SyncNodeData): Promise<void> {
|
||||
for (let count = 0; count < UPDATE_RETRIES_LIMIT; count++) {
|
||||
try {
|
||||
const result = await this.tryUpdateServerNode(node);
|
||||
private async tryUpdateServerNode(
|
||||
existingNode: SelectNode,
|
||||
update: SyncNodeUpdateData
|
||||
): Promise<boolean> {
|
||||
const serverRevision = BigInt(update.revision);
|
||||
|
||||
if (result) {
|
||||
return;
|
||||
}
|
||||
} catch (error) {
|
||||
this.debug(`Failed to update node ${node.id}: ${error}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async tryUpdateServerNode(node: SyncNodeData): Promise<boolean> {
|
||||
const serverRevision = BigInt(node.revision);
|
||||
|
||||
const existingNode = await this.workspace.database
|
||||
.selectFrom('nodes')
|
||||
.where('id', '=', node.id)
|
||||
const nodeState = await this.workspace.database
|
||||
.selectFrom('node_states')
|
||||
.where('id', '=', existingNode.id)
|
||||
.selectAll()
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!existingNode) {
|
||||
await this.createServerNode(node);
|
||||
return true;
|
||||
}
|
||||
|
||||
const ydoc = new YDoc(node.state);
|
||||
const updates = await this.workspace.database
|
||||
.selectFrom('node_updates')
|
||||
.selectAll()
|
||||
.where('node_id', '=', node.id)
|
||||
.where('node_id', '=', existingNode.id)
|
||||
.orderBy('id', 'asc')
|
||||
.execute();
|
||||
|
||||
const ydoc = new YDoc(nodeState?.state);
|
||||
for (const update of updates) {
|
||||
ydoc.applyUpdate(update.data);
|
||||
}
|
||||
@@ -567,7 +564,10 @@ export class NodeService {
|
||||
const localRevision = BigInt(existingNode.local_revision) + BigInt(1);
|
||||
|
||||
const model = getNodeModel(attributes.type);
|
||||
const nodeText = model.extractNodeText(node.id, attributes);
|
||||
const nodeText = model.extractNodeText(existingNode.id, attributes);
|
||||
|
||||
const mergedUpdateIds = update.mergedUpdates?.map((u) => u.id) ?? [];
|
||||
const updatesToDelete = [update.id, ...mergedUpdateIds];
|
||||
|
||||
const { updatedNode } = await this.workspace.database
|
||||
.transaction()
|
||||
@@ -577,12 +577,12 @@ export class NodeService {
|
||||
.returningAll()
|
||||
.set({
|
||||
attributes: JSON.stringify(attributes),
|
||||
updated_at: node.updatedAt,
|
||||
updated_by: node.updatedBy,
|
||||
updated_at: update.createdAt,
|
||||
updated_by: update.createdBy,
|
||||
local_revision: localRevision,
|
||||
server_revision: serverRevision,
|
||||
})
|
||||
.where('id', '=', node.id)
|
||||
.where('id', '=', existingNode.id)
|
||||
.where('local_revision', '=', existingNode.local_revision)
|
||||
.executeTakeFirst();
|
||||
|
||||
@@ -590,31 +590,51 @@ export class NodeService {
|
||||
throw new Error('Failed to update node');
|
||||
}
|
||||
|
||||
await trx
|
||||
.updateTable('node_states')
|
||||
.set({
|
||||
const upsertedState = await trx
|
||||
.insertInto('node_states')
|
||||
.returningAll()
|
||||
.values({
|
||||
id: existingNode.id,
|
||||
revision: serverRevision,
|
||||
state: decodeState(node.state),
|
||||
state: decodeState(update.state),
|
||||
})
|
||||
.where('id', '=', node.id)
|
||||
.onConflict((cb) =>
|
||||
cb
|
||||
.doUpdateSet({
|
||||
revision: serverRevision,
|
||||
state: decodeState(update.state),
|
||||
})
|
||||
.where('revision', '=', BigInt(nodeState?.revision ?? 0))
|
||||
)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!upsertedState) {
|
||||
throw new Error('Failed to update node state');
|
||||
}
|
||||
|
||||
if (nodeText) {
|
||||
await trx
|
||||
.insertInto('node_texts')
|
||||
.values({
|
||||
id: node.id,
|
||||
id: existingNode.id,
|
||||
name: nodeText.name,
|
||||
attributes: nodeText.attributes,
|
||||
})
|
||||
.execute();
|
||||
}
|
||||
|
||||
if (updatesToDelete.length > 0) {
|
||||
await trx
|
||||
.deleteFrom('node_updates')
|
||||
.where('id', 'in', updatesToDelete)
|
||||
.execute();
|
||||
}
|
||||
|
||||
return { updatedNode };
|
||||
});
|
||||
|
||||
if (!updatedNode) {
|
||||
this.debug(`Failed to update node ${node.id}`);
|
||||
this.debug(`Failed to update node ${existingNode.id}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -701,7 +721,7 @@ export class NodeService {
|
||||
const node = await this.workspace.database
|
||||
.selectFrom('nodes')
|
||||
.selectAll()
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!node) {
|
||||
@@ -709,33 +729,36 @@ export class NodeService {
|
||||
}
|
||||
|
||||
await this.workspace.database.transaction().execute(async (tx) => {
|
||||
await tx.deleteFrom('nodes').where('id', '=', mutation.id).execute();
|
||||
await tx.deleteFrom('nodes').where('id', '=', mutation.nodeId).execute();
|
||||
|
||||
await tx
|
||||
.deleteFrom('node_updates')
|
||||
.where('node_id', '=', mutation.id)
|
||||
.where('node_id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
|
||||
await tx
|
||||
.deleteFrom('node_interactions')
|
||||
.where('node_id', '=', mutation.id)
|
||||
.where('node_id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
|
||||
await tx
|
||||
.deleteFrom('node_reactions')
|
||||
.where('node_id', '=', mutation.id)
|
||||
.where('node_id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
|
||||
await tx
|
||||
.deleteFrom('node_states')
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
|
||||
await tx.deleteFrom('documents').where('id', '=', mutation.id).execute();
|
||||
await tx
|
||||
.deleteFrom('documents')
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
|
||||
await tx
|
||||
.deleteFrom('document_updates')
|
||||
.where('document_id', '=', mutation.id)
|
||||
.where('document_id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
});
|
||||
|
||||
@@ -767,39 +790,39 @@ export class NodeService {
|
||||
const node = await this.workspace.database
|
||||
.selectFrom('nodes')
|
||||
.selectAll()
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!node) {
|
||||
// Make sure we don't have any data left behind
|
||||
await this.workspace.database
|
||||
.deleteFrom('node_updates')
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.updateId)
|
||||
.execute();
|
||||
|
||||
await this.workspace.database
|
||||
.deleteFrom('node_interactions')
|
||||
.where('node_id', '=', mutation.id)
|
||||
.where('node_id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
|
||||
await this.workspace.database
|
||||
.deleteFrom('node_reactions')
|
||||
.where('node_id', '=', mutation.id)
|
||||
.where('node_id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
|
||||
await this.workspace.database
|
||||
.deleteFrom('node_states')
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
|
||||
await this.workspace.database
|
||||
.deleteFrom('documents')
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
|
||||
await this.workspace.database
|
||||
.deleteFrom('document_updates')
|
||||
.where('document_id', '=', mutation.id)
|
||||
.where('document_id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
|
||||
return true;
|
||||
@@ -808,7 +831,7 @@ export class NodeService {
|
||||
const updateRow = await this.workspace.database
|
||||
.selectFrom('node_updates')
|
||||
.selectAll()
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.updateId)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!updateRow) {
|
||||
@@ -818,21 +841,17 @@ export class NodeService {
|
||||
const nodeUpdates = await this.workspace.database
|
||||
.selectFrom('node_updates')
|
||||
.selectAll()
|
||||
.where('node_id', '=', mutation.id)
|
||||
.where('node_id', '=', mutation.nodeId)
|
||||
.orderBy('id', 'asc')
|
||||
.execute();
|
||||
|
||||
const state = await this.workspace.database
|
||||
.selectFrom('node_states')
|
||||
.selectAll()
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!state) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const ydoc = new YDoc(state.state);
|
||||
const ydoc = new YDoc(state?.state);
|
||||
for (const nodeUpdate of nodeUpdates) {
|
||||
if (nodeUpdate.id === mutation.updateId) {
|
||||
continue;
|
||||
@@ -858,7 +877,7 @@ export class NodeService {
|
||||
attributes: JSON.stringify(attributes),
|
||||
local_revision: localRevision,
|
||||
})
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.where('local_revision', '=', node.local_revision)
|
||||
.executeTakeFirst();
|
||||
|
||||
@@ -901,7 +920,7 @@ export class NodeService {
|
||||
const tombstone = await this.workspace.database
|
||||
.selectFrom('tombstones')
|
||||
.selectAll()
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!tombstone) {
|
||||
@@ -911,21 +930,17 @@ export class NodeService {
|
||||
const state = await this.workspace.database
|
||||
.selectFrom('node_states')
|
||||
.selectAll()
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!state) {
|
||||
return;
|
||||
}
|
||||
|
||||
const nodeUpdates = await this.workspace.database
|
||||
.selectFrom('node_updates')
|
||||
.selectAll()
|
||||
.where('node_id', '=', mutation.id)
|
||||
.where('node_id', '=', mutation.nodeId)
|
||||
.orderBy('id', 'asc')
|
||||
.execute();
|
||||
|
||||
const ydoc = new YDoc(state.state);
|
||||
const ydoc = new YDoc(state?.state);
|
||||
for (const nodeUpdate of nodeUpdates) {
|
||||
ydoc.applyUpdate(nodeUpdate.data);
|
||||
}
|
||||
@@ -959,7 +974,7 @@ export class NodeService {
|
||||
|
||||
await trx
|
||||
.deleteFrom('tombstones')
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.execute();
|
||||
});
|
||||
|
||||
|
||||
@@ -2,14 +2,14 @@ import {
|
||||
createDebugger,
|
||||
SyncCollaborationsInput,
|
||||
SyncUsersInput,
|
||||
SyncNodesInput,
|
||||
SyncNodesUpdatesInput,
|
||||
SyncNodeInteractionsInput,
|
||||
SyncNodeReactionsInput,
|
||||
SyncNodeTombstonesInput,
|
||||
SyncNodeInteractionData,
|
||||
SyncNodeReactionData,
|
||||
SyncNodeTombstoneData,
|
||||
SyncNodeData,
|
||||
SyncNodeUpdateData,
|
||||
SyncUserData,
|
||||
SyncCollaborationData,
|
||||
SyncDocumentUpdatesInput,
|
||||
@@ -22,7 +22,7 @@ import { Synchronizer } from '@/main/services/workspaces/synchronizer';
|
||||
import { eventBus } from '@/shared/lib/event-bus';
|
||||
|
||||
interface RootSynchronizers {
|
||||
nodes: Synchronizer<SyncNodesInput>;
|
||||
nodeUpdates: Synchronizer<SyncNodesUpdatesInput>;
|
||||
nodeInteractions: Synchronizer<SyncNodeInteractionsInput>;
|
||||
nodeReactions: Synchronizer<SyncNodeReactionsInput>;
|
||||
nodeTombstones: Synchronizer<SyncNodeTombstonesInput>;
|
||||
@@ -32,7 +32,7 @@ interface RootSynchronizers {
|
||||
type SyncHandlers = {
|
||||
users: (data: SyncUserData) => Promise<void>;
|
||||
collaborations: (data: SyncCollaborationData) => Promise<void>;
|
||||
nodes: (data: SyncNodeData) => Promise<void>;
|
||||
nodeUpdates: (data: SyncNodeUpdateData) => Promise<void>;
|
||||
nodeInteractions: (data: SyncNodeInteractionData) => Promise<void>;
|
||||
nodeReactions: (data: SyncNodeReactionData) => Promise<void>;
|
||||
nodeTombstones: (data: SyncNodeTombstoneData) => Promise<void>;
|
||||
@@ -61,7 +61,9 @@ export class SyncService {
|
||||
this.workspace.collaborations.syncServerCollaboration.bind(
|
||||
this.workspace.collaborations
|
||||
),
|
||||
nodes: this.workspace.nodes.syncServerNode.bind(this.workspace.nodes),
|
||||
nodeUpdates: this.workspace.nodes.syncServerNodeUpdate.bind(
|
||||
this.workspace.nodes
|
||||
),
|
||||
nodeInteractions:
|
||||
this.workspace.nodeInteractions.syncServerNodeInteraction.bind(
|
||||
this.workspace.nodes
|
||||
@@ -140,7 +142,7 @@ export class SyncService {
|
||||
}
|
||||
|
||||
private destroyRootSynchronizers(rootSynchronizers: RootSynchronizers): void {
|
||||
rootSynchronizers.nodes.destroy();
|
||||
rootSynchronizers.nodeUpdates.destroy();
|
||||
rootSynchronizers.nodeInteractions.destroy();
|
||||
rootSynchronizers.nodeReactions.destroy();
|
||||
rootSynchronizers.nodeTombstones.destroy();
|
||||
@@ -157,11 +159,11 @@ export class SyncService {
|
||||
);
|
||||
|
||||
const rootSynchronizers = {
|
||||
nodes: new Synchronizer(
|
||||
nodeUpdates: new Synchronizer(
|
||||
this.workspace,
|
||||
{ type: 'nodes', rootId },
|
||||
`${rootId}_nodes`,
|
||||
this.syncHandlers.nodes
|
||||
{ type: 'nodes_updates', rootId },
|
||||
`${rootId}_nodes_updates`,
|
||||
this.syncHandlers.nodeUpdates
|
||||
),
|
||||
nodeInteractions: new Synchronizer(
|
||||
this.workspace,
|
||||
|
||||
@@ -107,7 +107,7 @@ const handleDeleteNode = async (
|
||||
mutation: DeleteNodeMutation
|
||||
): Promise<SyncMutationStatus> => {
|
||||
const output = await deleteNode(user, {
|
||||
id: mutation.data.id,
|
||||
nodeId: mutation.data.nodeId,
|
||||
rootId: mutation.data.rootId,
|
||||
deletedAt: mutation.data.deletedAt,
|
||||
});
|
||||
|
||||
@@ -2,15 +2,6 @@ import { Migration, sql } from 'kysely';
|
||||
|
||||
export const createNodesTable: Migration = {
|
||||
up: async (db) => {
|
||||
await sql`
|
||||
CREATE SEQUENCE IF NOT EXISTS nodes_revision_sequence
|
||||
START WITH 1000000000
|
||||
INCREMENT BY 1
|
||||
NO MINVALUE
|
||||
NO MAXVALUE
|
||||
CACHE 1;
|
||||
`.execute(db);
|
||||
|
||||
await db.schema
|
||||
.createTable('nodes')
|
||||
.addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey())
|
||||
@@ -24,43 +15,15 @@ export const createNodesTable: Migration = {
|
||||
)
|
||||
.addColumn('root_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('revision', 'bigint', (col) =>
|
||||
col.notNull().defaultTo(sql`nextval('nodes_revision_sequence')`)
|
||||
)
|
||||
.addColumn('revision', 'bigint', (col) => col.notNull())
|
||||
.addColumn('attributes', 'jsonb', (col) => col.notNull())
|
||||
.addColumn('state', 'bytea', (col) => col.notNull())
|
||||
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
|
||||
.addColumn('created_by', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('updated_at', 'timestamptz')
|
||||
.addColumn('updated_by', 'varchar(30)')
|
||||
.execute();
|
||||
|
||||
await db.schema
|
||||
.createIndex('nodes_root_id_revision_idx')
|
||||
.on('nodes')
|
||||
.columns(['root_id', 'revision'])
|
||||
.execute();
|
||||
|
||||
await sql`
|
||||
CREATE OR REPLACE FUNCTION update_node_revision() RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
NEW.revision = nextval('nodes_revision_sequence');
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER trg_update_node_revision
|
||||
BEFORE UPDATE ON nodes
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION update_node_revision();
|
||||
`.execute(db);
|
||||
},
|
||||
down: async (db) => {
|
||||
await sql`
|
||||
DROP TRIGGER IF EXISTS trg_update_node_revision ON nodes;
|
||||
DROP FUNCTION IF EXISTS update_node_revision();
|
||||
`.execute(db);
|
||||
|
||||
await db.schema.dropTable('nodes').execute();
|
||||
},
|
||||
};
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
import { sql, Migration } from 'kysely';
|
||||
|
||||
export const createNodeUpdatesTable: Migration = {
|
||||
up: async (db) => {
|
||||
await sql`
|
||||
CREATE SEQUENCE IF NOT EXISTS node_updates_revision_sequence
|
||||
START WITH 1000000000
|
||||
INCREMENT BY 1
|
||||
NO MINVALUE
|
||||
NO MAXVALUE
|
||||
CACHE 1;
|
||||
`.execute(db);
|
||||
|
||||
await db.schema
|
||||
.createTable('node_updates')
|
||||
.addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey())
|
||||
.addColumn('node_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('root_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('revision', 'bigint', (col) =>
|
||||
col.notNull().defaultTo(sql`nextval('node_updates_revision_sequence')`)
|
||||
)
|
||||
.addColumn('data', 'bytea', (col) => col.notNull())
|
||||
.addColumn('merged_updates', 'jsonb')
|
||||
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
|
||||
.addColumn('created_by', 'varchar(30)', (col) => col.notNull())
|
||||
.execute();
|
||||
|
||||
await sql`
|
||||
CREATE OR REPLACE FUNCTION update_node_update_revision() RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
NEW.revision = nextval('node_updates_revision_sequence');
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER trg_update_node_update_revision
|
||||
BEFORE UPDATE ON node_updates
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION update_node_update_revision();
|
||||
`.execute(db);
|
||||
|
||||
await db.schema
|
||||
.createIndex('node_updates_root_id_revision_idx')
|
||||
.on('node_updates')
|
||||
.columns(['root_id', 'revision'])
|
||||
.execute();
|
||||
|
||||
await db.schema
|
||||
.createIndex('node_updates_node_id_idx')
|
||||
.on('node_updates')
|
||||
.columns(['node_id'])
|
||||
.execute();
|
||||
},
|
||||
down: async (db) => {
|
||||
await sql`
|
||||
DROP TRIGGER IF EXISTS trg_update_node_update_revision ON node_updates;
|
||||
DROP FUNCTION IF EXISTS update_document_update_revision();
|
||||
`.execute(db);
|
||||
|
||||
await db.schema.dropTable('node_updates').execute();
|
||||
await sql`DROP SEQUENCE IF EXISTS node_updates_revision_sequence`.execute(
|
||||
db
|
||||
);
|
||||
},
|
||||
};
|
||||
@@ -5,13 +5,14 @@ import { createDevicesTable } from './00002-create-devices-table';
|
||||
import { createWorkspacesTable } from './00003-create-workspaces-table';
|
||||
import { createUsersTable } from './00004-create-users-table';
|
||||
import { createNodesTable } from './00005-create-nodes-table';
|
||||
import { createNodeReactionsTable } from './00006-create-node-reactions-table';
|
||||
import { createNodeInteractionsTable } from './00007-create-node-interactions-table';
|
||||
import { createNodeTombstonesTable } from './00008-create-node-tombstones-table';
|
||||
import { createNodePathsTable } from './00009-create-node-paths-table';
|
||||
import { createCollaborationsTable } from './00010-create-collaborations-table';
|
||||
import { createDocumentsTable } from './00011-create-documents-table';
|
||||
import { createDocumentUpdatesTable } from './00012-create-document-updates-table';
|
||||
import { createNodeUpdatesTable } from './00006-create-node-updates-table';
|
||||
import { createNodeReactionsTable } from './00007-create-node-reactions-table';
|
||||
import { createNodeInteractionsTable } from './00008-create-node-interactions-table';
|
||||
import { createNodeTombstonesTable } from './00009-create-node-tombstones-table';
|
||||
import { createNodePathsTable } from './00010-create-node-paths-table';
|
||||
import { createCollaborationsTable } from './00011-create-collaborations-table';
|
||||
import { createDocumentsTable } from './00012-create-documents-table';
|
||||
import { createDocumentUpdatesTable } from './00013-create-document-updates-table';
|
||||
|
||||
export const databaseMigrations: Record<string, Migration> = {
|
||||
'00001_create_accounts_table': createAccountsTable,
|
||||
@@ -19,11 +20,12 @@ export const databaseMigrations: Record<string, Migration> = {
|
||||
'00003_create_workspaces_table': createWorkspacesTable,
|
||||
'00004_create_users_table': createUsersTable,
|
||||
'00005_create_nodes_table': createNodesTable,
|
||||
'00006_create_node_reactions_table': createNodeReactionsTable,
|
||||
'00007_create_node_interactions_table': createNodeInteractionsTable,
|
||||
'00008_create_node_tombstones_table': createNodeTombstonesTable,
|
||||
'00009_create_node_paths_table': createNodePathsTable,
|
||||
'00010_create_collaborations_table': createCollaborationsTable,
|
||||
'00011_create_documents_table': createDocumentsTable,
|
||||
'00012_create_document_updates_table': createDocumentUpdatesTable,
|
||||
'00006_create_node_updates_table': createNodeUpdatesTable,
|
||||
'00007_create_node_reactions_table': createNodeReactionsTable,
|
||||
'00008_create_node_interactions_table': createNodeInteractionsTable,
|
||||
'00009_create_node_tombstones_table': createNodeTombstonesTable,
|
||||
'00010_create_node_paths_table': createNodePathsTable,
|
||||
'00011_create_collaborations_table': createCollaborationsTable,
|
||||
'00012_create_documents_table': createDocumentsTable,
|
||||
'00013_create_document_updates_table': createDocumentUpdatesTable,
|
||||
};
|
||||
|
||||
@@ -6,7 +6,7 @@ import {
|
||||
UserStatus,
|
||||
DocumentType,
|
||||
DocumentContent,
|
||||
DocumentUpdateMergeMetadata,
|
||||
UpdateMergeMetadata,
|
||||
} from '@colanode/core';
|
||||
import {
|
||||
ColumnType,
|
||||
@@ -99,9 +99,8 @@ interface NodeTable {
|
||||
parent_id: ColumnType<string | null, never, never>;
|
||||
root_id: ColumnType<string, string, never>;
|
||||
workspace_id: ColumnType<string, string, never>;
|
||||
revision: ColumnType<bigint, never, never>;
|
||||
revision: ColumnType<bigint, bigint, bigint>;
|
||||
attributes: JSONColumnType<NodeAttributes, string | null, string | null>;
|
||||
state: ColumnType<Uint8Array, Uint8Array, Uint8Array>;
|
||||
created_at: ColumnType<Date, Date, never>;
|
||||
created_by: ColumnType<string, string, never>;
|
||||
updated_at: ColumnType<Date | null, Date | null, Date>;
|
||||
@@ -112,6 +111,26 @@ export type SelectNode = Selectable<NodeTable>;
|
||||
export type CreateNode = Insertable<NodeTable>;
|
||||
export type UpdateNode = Updateable<NodeTable>;
|
||||
|
||||
interface NodeUpdateTable {
|
||||
id: ColumnType<string, string, never>;
|
||||
node_id: ColumnType<string, string, never>;
|
||||
root_id: ColumnType<string, string, never>;
|
||||
workspace_id: ColumnType<string, string, never>;
|
||||
revision: ColumnType<bigint, never, never>;
|
||||
data: ColumnType<Uint8Array, Uint8Array, Uint8Array>;
|
||||
created_at: ColumnType<Date, Date, never>;
|
||||
created_by: ColumnType<string, string, never>;
|
||||
merged_updates: ColumnType<
|
||||
UpdateMergeMetadata[] | null,
|
||||
string | null,
|
||||
string | null
|
||||
>;
|
||||
}
|
||||
|
||||
export type SelectNodeUpdate = Selectable<NodeUpdateTable>;
|
||||
export type CreateNodeUpdate = Insertable<NodeUpdateTable>;
|
||||
export type UpdateNodeUpdate = Updateable<NodeUpdateTable>;
|
||||
|
||||
interface NodeInteractionTable {
|
||||
node_id: ColumnType<string, string, never>;
|
||||
collaborator_id: ColumnType<string, string, never>;
|
||||
@@ -211,7 +230,7 @@ interface DocumentUpdateTable {
|
||||
created_at: ColumnType<Date, Date, never>;
|
||||
created_by: ColumnType<string, string, never>;
|
||||
merged_updates: ColumnType<
|
||||
DocumentUpdateMergeMetadata[] | null,
|
||||
UpdateMergeMetadata[] | null,
|
||||
string | null,
|
||||
string | null
|
||||
>;
|
||||
@@ -227,6 +246,7 @@ export interface DatabaseSchema {
|
||||
workspaces: WorkspaceTable;
|
||||
users: UserTable;
|
||||
nodes: NodeTable;
|
||||
node_updates: NodeUpdateTable;
|
||||
node_interactions: NodeInteractionTable;
|
||||
node_reactions: NodeReactionTable;
|
||||
node_paths: NodePathTable;
|
||||
|
||||
@@ -5,19 +5,21 @@ import {
|
||||
createDebugger,
|
||||
CreateNodeMutationData,
|
||||
extractNodeCollaborators,
|
||||
generateId,
|
||||
getNodeModel,
|
||||
IdType,
|
||||
Node,
|
||||
NodeAttributes,
|
||||
UpdateNodeMutationData,
|
||||
} from '@colanode/core';
|
||||
import { YDoc } from '@colanode/crdt';
|
||||
import { decodeState, YDoc } from '@colanode/crdt';
|
||||
import { cloneDeep } from 'lodash-es';
|
||||
|
||||
import { database } from '@/data/database';
|
||||
import {
|
||||
CreateCollaboration,
|
||||
CreateNode,
|
||||
SelectNode,
|
||||
SelectNodeUpdate,
|
||||
SelectUser,
|
||||
} from '@/data/schema';
|
||||
import {
|
||||
@@ -64,6 +66,19 @@ export const fetchNode = async (nodeId: string): Promise<SelectNode | null> => {
|
||||
return result ?? null;
|
||||
};
|
||||
|
||||
export const fetchNodeUpdates = async (
|
||||
nodeId: string
|
||||
): Promise<SelectNodeUpdate[]> => {
|
||||
const result = await database
|
||||
.selectFrom('node_updates')
|
||||
.selectAll()
|
||||
.where('node_id', '=', nodeId)
|
||||
.orderBy('id', 'desc')
|
||||
.execute();
|
||||
|
||||
return result;
|
||||
};
|
||||
|
||||
export const fetchNodeTree = async (nodeId: string): Promise<SelectNode[]> => {
|
||||
const result = await database
|
||||
.selectFrom('nodes')
|
||||
@@ -104,16 +119,7 @@ export const createNode = async (
|
||||
const attributesJson = JSON.stringify(attributes);
|
||||
const state = ydoc.getState();
|
||||
const date = new Date();
|
||||
|
||||
const createNode: CreateNode = {
|
||||
id: input.nodeId,
|
||||
root_id: input.rootId,
|
||||
workspace_id: input.workspaceId,
|
||||
attributes: attributesJson,
|
||||
created_at: date,
|
||||
created_by: input.userId,
|
||||
state: state,
|
||||
};
|
||||
const updateId = generateId(IdType.Update);
|
||||
|
||||
const collaborationsToCreate: CreateCollaboration[] = Object.entries(
|
||||
extractNodeCollaborators(attributes)
|
||||
@@ -130,10 +136,36 @@ export const createNode = async (
|
||||
const { createdNode, createdCollaborations } = await database
|
||||
.transaction()
|
||||
.execute(async (trx) => {
|
||||
const createdNodeUpdate = await trx
|
||||
.insertInto('node_updates')
|
||||
.returningAll()
|
||||
.values({
|
||||
id: updateId,
|
||||
node_id: input.nodeId,
|
||||
root_id: input.rootId,
|
||||
workspace_id: input.workspaceId,
|
||||
data: state,
|
||||
created_at: date,
|
||||
created_by: input.userId,
|
||||
})
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!createdNodeUpdate) {
|
||||
throw new Error('Failed to create node update');
|
||||
}
|
||||
|
||||
const createdNode = await trx
|
||||
.insertInto('nodes')
|
||||
.returningAll()
|
||||
.values(createNode)
|
||||
.values({
|
||||
id: input.nodeId,
|
||||
root_id: input.rootId,
|
||||
workspace_id: input.workspaceId,
|
||||
attributes: attributesJson,
|
||||
created_at: date,
|
||||
created_by: input.userId,
|
||||
revision: createdNodeUpdate.revision,
|
||||
})
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!createdNode) {
|
||||
@@ -204,14 +236,19 @@ export const tryUpdateNode = async (
|
||||
return { type: 'error', output: null };
|
||||
}
|
||||
|
||||
const model = getNodeModel(node.type);
|
||||
const ydoc = new YDoc(node.state);
|
||||
const nodeUpdates = await fetchNodeUpdates(input.nodeId);
|
||||
const ydoc = new YDoc();
|
||||
for (const nodeUpdate of nodeUpdates) {
|
||||
ydoc.applyUpdate(nodeUpdate.data);
|
||||
}
|
||||
|
||||
const currentAttributes = ydoc.getObject<NodeAttributes>();
|
||||
const updatedAttributes = input.updater(cloneDeep(currentAttributes));
|
||||
if (!updatedAttributes) {
|
||||
return { type: 'error', output: null };
|
||||
}
|
||||
|
||||
const model = getNodeModel(node.type);
|
||||
const update = ydoc.update(model.attributesSchema, updatedAttributes);
|
||||
|
||||
if (!update) {
|
||||
@@ -220,9 +257,8 @@ export const tryUpdateNode = async (
|
||||
|
||||
const attributes = ydoc.getObject<NodeAttributes>();
|
||||
const attributesJson = JSON.stringify(attributes);
|
||||
|
||||
const date = new Date();
|
||||
const state = ydoc.getState();
|
||||
const updateId = generateId(IdType.Update);
|
||||
|
||||
const collaboratorChanges = checkCollaboratorChanges(
|
||||
node.attributes,
|
||||
@@ -232,6 +268,24 @@ export const tryUpdateNode = async (
|
||||
try {
|
||||
const { updatedNode, createdCollaborations, updatedCollaborations } =
|
||||
await database.transaction().execute(async (trx) => {
|
||||
const createdNodeUpdate = await trx
|
||||
.insertInto('node_updates')
|
||||
.returningAll()
|
||||
.values({
|
||||
id: updateId,
|
||||
node_id: input.nodeId,
|
||||
root_id: node.root_id,
|
||||
workspace_id: node.workspace_id,
|
||||
data: update,
|
||||
created_at: date,
|
||||
created_by: input.userId,
|
||||
})
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!createdNodeUpdate) {
|
||||
throw new Error('Failed to create node update');
|
||||
}
|
||||
|
||||
const updatedNode = await trx
|
||||
.updateTable('nodes')
|
||||
.returningAll()
|
||||
@@ -239,7 +293,7 @@ export const tryUpdateNode = async (
|
||||
attributes: attributesJson,
|
||||
updated_at: date,
|
||||
updated_by: input.userId,
|
||||
state: state,
|
||||
revision: createdNodeUpdate.revision,
|
||||
})
|
||||
.where('id', '=', input.nodeId)
|
||||
.where('revision', '=', node.revision)
|
||||
@@ -331,22 +385,12 @@ export const createNodeFromMutation = async (
|
||||
return null;
|
||||
}
|
||||
|
||||
const rootId = tree[0]?.id ?? mutation.id;
|
||||
const createNode: CreateNode = {
|
||||
id: mutation.id,
|
||||
root_id: rootId,
|
||||
attributes: JSON.stringify(attributes),
|
||||
workspace_id: user.workspace_id,
|
||||
created_at: new Date(mutation.createdAt),
|
||||
created_by: user.id,
|
||||
state: ydoc.getState(),
|
||||
};
|
||||
|
||||
const rootId = tree[0]?.id ?? mutation.nodeId;
|
||||
const collaborationsToCreate: CreateCollaboration[] = Object.entries(
|
||||
extractNodeCollaborators(attributes)
|
||||
).map(([userId, role]) => ({
|
||||
collaborator_id: userId,
|
||||
node_id: mutation.id,
|
||||
node_id: mutation.nodeId,
|
||||
workspace_id: user.workspace_id,
|
||||
role,
|
||||
created_at: new Date(),
|
||||
@@ -357,10 +401,36 @@ export const createNodeFromMutation = async (
|
||||
const { createdNode, createdCollaborations } = await database
|
||||
.transaction()
|
||||
.execute(async (trx) => {
|
||||
const createdNodeUpdate = await trx
|
||||
.insertInto('node_updates')
|
||||
.returningAll()
|
||||
.values({
|
||||
id: mutation.updateId,
|
||||
node_id: mutation.nodeId,
|
||||
root_id: rootId,
|
||||
workspace_id: user.workspace_id,
|
||||
data: ydoc.getState(),
|
||||
created_at: new Date(mutation.createdAt),
|
||||
created_by: user.id,
|
||||
})
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!createdNodeUpdate) {
|
||||
throw new Error('Failed to create node update');
|
||||
}
|
||||
|
||||
const createdNode = await trx
|
||||
.insertInto('nodes')
|
||||
.returningAll()
|
||||
.values(createNode)
|
||||
.values({
|
||||
id: mutation.nodeId,
|
||||
root_id: rootId,
|
||||
attributes: JSON.stringify(attributes),
|
||||
workspace_id: user.workspace_id,
|
||||
created_at: new Date(mutation.createdAt),
|
||||
created_by: user.id,
|
||||
revision: createdNodeUpdate.revision,
|
||||
})
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!createdNode) {
|
||||
@@ -382,7 +452,7 @@ export const createNodeFromMutation = async (
|
||||
|
||||
eventBus.publish({
|
||||
type: 'node_created',
|
||||
nodeId: mutation.id,
|
||||
nodeId: mutation.nodeId,
|
||||
rootId,
|
||||
workspaceId: user.workspace_id,
|
||||
});
|
||||
@@ -391,7 +461,7 @@ export const createNodeFromMutation = async (
|
||||
eventBus.publish({
|
||||
type: 'collaboration_created',
|
||||
collaboratorId: createdCollaboration.collaborator_id,
|
||||
nodeId: mutation.id,
|
||||
nodeId: mutation.nodeId,
|
||||
workspaceId: user.workspace_id,
|
||||
});
|
||||
}
|
||||
@@ -428,19 +498,24 @@ const tryUpdateNodeFromMutation = async (
|
||||
user: SelectUser,
|
||||
mutation: UpdateNodeMutationData
|
||||
): Promise<ConcurrentUpdateResult<UpdateNodeOutput>> => {
|
||||
const tree = await fetchNodeTree(mutation.id);
|
||||
const tree = await fetchNodeTree(mutation.nodeId);
|
||||
if (tree.length === 0) {
|
||||
return { type: 'error', output: null };
|
||||
}
|
||||
|
||||
const node = tree[tree.length - 1];
|
||||
if (!node || node.id !== mutation.id) {
|
||||
if (!node || node.id !== mutation.nodeId) {
|
||||
return { type: 'error', output: null };
|
||||
}
|
||||
|
||||
const model = getNodeModel(node.type);
|
||||
const ydoc = new YDoc(node.state);
|
||||
ydoc.applyUpdate(mutation.data);
|
||||
const nodeUpdates = await fetchNodeUpdates(mutation.nodeId);
|
||||
const ydoc = new YDoc();
|
||||
for (const nodeUpdate of nodeUpdates) {
|
||||
ydoc.applyUpdate(nodeUpdate.data);
|
||||
}
|
||||
|
||||
const update = decodeState(mutation.data);
|
||||
ydoc.applyUpdate(update);
|
||||
|
||||
const attributes = ydoc.getObject<NodeAttributes>();
|
||||
const attributesJson = JSON.stringify(attributes);
|
||||
@@ -457,6 +532,7 @@ const tryUpdateNodeFromMutation = async (
|
||||
attributes,
|
||||
};
|
||||
|
||||
const model = getNodeModel(node.type);
|
||||
if (!model.canUpdateAttributes(canUpdateNodeContext)) {
|
||||
return { type: 'error', output: null };
|
||||
}
|
||||
@@ -469,6 +545,24 @@ const tryUpdateNodeFromMutation = async (
|
||||
try {
|
||||
const { updatedNode, createdCollaborations, updatedCollaborations } =
|
||||
await database.transaction().execute(async (trx) => {
|
||||
const createdNodeUpdate = await trx
|
||||
.insertInto('node_updates')
|
||||
.returningAll()
|
||||
.values({
|
||||
id: mutation.updateId,
|
||||
node_id: mutation.nodeId,
|
||||
root_id: node.root_id,
|
||||
workspace_id: user.workspace_id,
|
||||
data: update,
|
||||
created_at: new Date(mutation.createdAt),
|
||||
created_by: user.id,
|
||||
})
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!createdNodeUpdate) {
|
||||
throw new Error('Failed to create node update');
|
||||
}
|
||||
|
||||
const updatedNode = await trx
|
||||
.updateTable('nodes')
|
||||
.returningAll()
|
||||
@@ -476,9 +570,9 @@ const tryUpdateNodeFromMutation = async (
|
||||
attributes: attributesJson,
|
||||
updated_at: new Date(mutation.createdAt),
|
||||
updated_by: user.id,
|
||||
state: ydoc.getState(),
|
||||
revision: createdNodeUpdate.revision,
|
||||
})
|
||||
.where('id', '=', mutation.id)
|
||||
.where('id', '=', mutation.nodeId)
|
||||
.where('revision', '=', node.revision)
|
||||
.executeTakeFirst();
|
||||
|
||||
@@ -489,7 +583,7 @@ const tryUpdateNodeFromMutation = async (
|
||||
const { createdCollaborations, updatedCollaborations } =
|
||||
await applyCollaboratorUpdates(
|
||||
trx,
|
||||
mutation.id,
|
||||
mutation.nodeId,
|
||||
user.id,
|
||||
user.workspace_id,
|
||||
collaboratorChanges
|
||||
@@ -504,7 +598,7 @@ const tryUpdateNodeFromMutation = async (
|
||||
|
||||
eventBus.publish({
|
||||
type: 'node_updated',
|
||||
nodeId: mutation.id,
|
||||
nodeId: mutation.nodeId,
|
||||
rootId: node.root_id,
|
||||
workspaceId: user.workspace_id,
|
||||
});
|
||||
@@ -513,7 +607,7 @@ const tryUpdateNodeFromMutation = async (
|
||||
eventBus.publish({
|
||||
type: 'collaboration_created',
|
||||
collaboratorId: createdCollaboration.collaborator_id,
|
||||
nodeId: mutation.id,
|
||||
nodeId: mutation.nodeId,
|
||||
workspaceId: user.workspace_id,
|
||||
});
|
||||
}
|
||||
@@ -522,7 +616,7 @@ const tryUpdateNodeFromMutation = async (
|
||||
eventBus.publish({
|
||||
type: 'collaboration_updated',
|
||||
collaboratorId: updatedCollaboration.collaborator_id,
|
||||
nodeId: mutation.id,
|
||||
nodeId: mutation.nodeId,
|
||||
workspaceId: user.workspace_id,
|
||||
});
|
||||
}
|
||||
@@ -542,13 +636,13 @@ export const deleteNode = async (
|
||||
user: SelectUser,
|
||||
input: DeleteNodeInput
|
||||
): Promise<DeleteNodeOutput | null> => {
|
||||
const tree = await fetchNodeTree(input.id);
|
||||
const tree = await fetchNodeTree(input.nodeId);
|
||||
if (tree.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const node = tree[tree.length - 1];
|
||||
if (!node || node.id !== input.id) {
|
||||
if (!node || node.id !== input.nodeId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -574,7 +668,7 @@ export const deleteNode = async (
|
||||
const deletedNode = await trx
|
||||
.deleteFrom('nodes')
|
||||
.returningAll()
|
||||
.where('id', '=', input.id)
|
||||
.where('id', '=', input.nodeId)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!deletedNode) {
|
||||
@@ -597,14 +691,19 @@ export const deleteNode = async (
|
||||
throw new Error('Failed to create tombstone');
|
||||
}
|
||||
|
||||
await trx
|
||||
.deleteFrom('node_updates')
|
||||
.where('node_id', '=', input.nodeId)
|
||||
.execute();
|
||||
|
||||
await trx
|
||||
.deleteFrom('node_reactions')
|
||||
.where('node_id', '=', input.id)
|
||||
.where('node_id', '=', input.nodeId)
|
||||
.execute();
|
||||
|
||||
await trx
|
||||
.deleteFrom('node_interactions')
|
||||
.where('node_id', '=', input.id)
|
||||
.where('node_id', '=', input.nodeId)
|
||||
.execute();
|
||||
|
||||
const updatedCollaborations = await trx
|
||||
@@ -614,7 +713,7 @@ export const deleteNode = async (
|
||||
deleted_by: user.id,
|
||||
})
|
||||
.returningAll()
|
||||
.where('node_id', '=', input.id)
|
||||
.where('node_id', '=', input.nodeId)
|
||||
.execute();
|
||||
|
||||
return {
|
||||
@@ -625,7 +724,7 @@ export const deleteNode = async (
|
||||
|
||||
eventBus.publish({
|
||||
type: 'node_deleted',
|
||||
nodeId: input.id,
|
||||
nodeId: input.nodeId,
|
||||
rootId: node.root_id,
|
||||
workspaceId: user.workspace_id,
|
||||
});
|
||||
@@ -634,14 +733,14 @@ export const deleteNode = async (
|
||||
eventBus.publish({
|
||||
type: 'collaboration_updated',
|
||||
collaboratorId: updatedCollaboration.collaborator_id,
|
||||
nodeId: input.id,
|
||||
nodeId: input.nodeId,
|
||||
workspaceId: user.workspace_id,
|
||||
});
|
||||
}
|
||||
|
||||
await jobService.addJob({
|
||||
type: 'clean_node_data',
|
||||
nodeId: input.id,
|
||||
nodeId: input.nodeId,
|
||||
workspaceId: user.workspace_id,
|
||||
userId: user.id,
|
||||
});
|
||||
|
||||
@@ -23,7 +23,7 @@ import { ConnectedUser } from '@/types/users';
|
||||
import { BaseSynchronizer } from '@/synchronizers/base';
|
||||
import { UserSynchronizer } from '@/synchronizers/users';
|
||||
import { CollaborationSynchronizer } from '@/synchronizers/collaborations';
|
||||
import { NodeSynchronizer } from '@/synchronizers/nodes';
|
||||
import { NodeUpdatesSynchronizer } from '@/synchronizers/node-updates';
|
||||
import { NodeReactionSynchronizer } from '@/synchronizers/node-reactions';
|
||||
import { NodeTombstoneSynchronizer } from '@/synchronizers/node-tombstones';
|
||||
import { NodeInteractionSynchronizer } from '@/synchronizers/node-interactions';
|
||||
@@ -146,12 +146,17 @@ export class SocketConnection {
|
||||
message.input,
|
||||
cursor
|
||||
);
|
||||
} else if (message.input.type === 'nodes') {
|
||||
} else if (message.input.type === 'nodes_updates') {
|
||||
if (!user.rootIds.has(message.input.rootId)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new NodeSynchronizer(message.id, user.user, message.input, cursor);
|
||||
return new NodeUpdatesSynchronizer(
|
||||
message.id,
|
||||
user.user,
|
||||
message.input,
|
||||
cursor
|
||||
);
|
||||
} else if (message.input.type === 'node_reactions') {
|
||||
return new NodeReactionSynchronizer(
|
||||
message.id,
|
||||
|
||||
98
apps/server/src/synchronizers/node-updates.ts
Normal file
98
apps/server/src/synchronizers/node-updates.ts
Normal file
@@ -0,0 +1,98 @@
|
||||
import {
|
||||
SynchronizerOutputMessage,
|
||||
SyncNodesUpdatesInput,
|
||||
SyncNodeUpdateData,
|
||||
} from '@colanode/core';
|
||||
import { encodeState } from '@colanode/crdt';
|
||||
|
||||
import { BaseSynchronizer } from '@/synchronizers/base';
|
||||
import { Event } from '@/types/events';
|
||||
import { database } from '@/data/database';
|
||||
import { SelectNodeUpdate } from '@/data/schema';
|
||||
|
||||
export class NodeUpdatesSynchronizer extends BaseSynchronizer<SyncNodesUpdatesInput> {
|
||||
public async fetchData(): Promise<SynchronizerOutputMessage<SyncNodesUpdatesInput> | null> {
|
||||
const nodeUpdates = await this.fetchNodeUpdates();
|
||||
if (nodeUpdates.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return this.buildMessage(nodeUpdates);
|
||||
}
|
||||
|
||||
public async fetchDataFromEvent(
|
||||
event: Event
|
||||
): Promise<SynchronizerOutputMessage<SyncNodesUpdatesInput> | null> {
|
||||
if (!this.shouldFetch(event)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const nodeUpdates = await this.fetchNodeUpdates();
|
||||
if (nodeUpdates.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return this.buildMessage(nodeUpdates);
|
||||
}
|
||||
|
||||
private async fetchNodeUpdates() {
|
||||
if (this.status === 'fetching') {
|
||||
return [];
|
||||
}
|
||||
|
||||
this.status = 'fetching';
|
||||
const nodesUpdates = await database
|
||||
.selectFrom('node_updates')
|
||||
.selectAll()
|
||||
.where('root_id', '=', this.input.rootId)
|
||||
.where('revision', '>', this.cursor)
|
||||
.orderBy('revision', 'asc')
|
||||
.limit(20)
|
||||
.execute();
|
||||
|
||||
this.status = 'pending';
|
||||
return nodesUpdates;
|
||||
}
|
||||
|
||||
private buildMessage(
|
||||
unsyncedNodeUpdates: SelectNodeUpdate[]
|
||||
): SynchronizerOutputMessage<SyncNodesUpdatesInput> {
|
||||
const items: SyncNodeUpdateData[] = unsyncedNodeUpdates.map(
|
||||
(nodeUpdate) => {
|
||||
return {
|
||||
id: nodeUpdate.id,
|
||||
nodeId: nodeUpdate.node_id,
|
||||
rootId: nodeUpdate.root_id,
|
||||
workspaceId: nodeUpdate.workspace_id,
|
||||
revision: nodeUpdate.revision.toString(),
|
||||
state: encodeState(nodeUpdate.data),
|
||||
createdAt: nodeUpdate.created_at.toISOString(),
|
||||
createdBy: nodeUpdate.created_by,
|
||||
mergedUpdates: nodeUpdate.merged_updates,
|
||||
};
|
||||
}
|
||||
);
|
||||
|
||||
return {
|
||||
type: 'synchronizer_output',
|
||||
userId: this.user.userId,
|
||||
id: this.id,
|
||||
items: items.map((item) => ({
|
||||
cursor: item.revision,
|
||||
data: item,
|
||||
})),
|
||||
};
|
||||
}
|
||||
|
||||
private shouldFetch(event: Event) {
|
||||
if (event.type === 'node_created' && event.rootId === this.input.rootId) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (event.type === 'node_updated' && event.rootId === this.input.rootId) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -1,100 +0,0 @@
|
||||
import {
|
||||
SynchronizerOutputMessage,
|
||||
SyncNodesInput,
|
||||
SyncNodeData,
|
||||
} from '@colanode/core';
|
||||
import { encodeState } from '@colanode/crdt';
|
||||
|
||||
import { BaseSynchronizer } from '@/synchronizers/base';
|
||||
import { Event } from '@/types/events';
|
||||
import { database } from '@/data/database';
|
||||
import { SelectNode } from '@/data/schema';
|
||||
|
||||
export class NodeSynchronizer extends BaseSynchronizer<SyncNodesInput> {
|
||||
public async fetchData(): Promise<SynchronizerOutputMessage<SyncNodesInput> | null> {
|
||||
const nodes = await this.fetchNodes();
|
||||
if (nodes.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return this.buildMessage(nodes);
|
||||
}
|
||||
|
||||
public async fetchDataFromEvent(
|
||||
event: Event
|
||||
): Promise<SynchronizerOutputMessage<SyncNodesInput> | null> {
|
||||
if (!this.shouldFetch(event)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const nodes = await this.fetchNodes();
|
||||
if (nodes.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return this.buildMessage(nodes);
|
||||
}
|
||||
|
||||
private async fetchNodes() {
|
||||
if (this.status === 'fetching') {
|
||||
return [];
|
||||
}
|
||||
|
||||
this.status = 'fetching';
|
||||
const nodes = await database
|
||||
.selectFrom('nodes')
|
||||
.selectAll()
|
||||
.where('root_id', '=', this.input.rootId)
|
||||
.where('revision', '>', this.cursor)
|
||||
.orderBy('revision', 'asc')
|
||||
.limit(20)
|
||||
.execute();
|
||||
|
||||
this.status = 'pending';
|
||||
return nodes;
|
||||
}
|
||||
|
||||
private buildMessage(
|
||||
unsyncedNodes: SelectNode[]
|
||||
): SynchronizerOutputMessage<SyncNodesInput> {
|
||||
const items: SyncNodeData[] = unsyncedNodes.map((node) => {
|
||||
return {
|
||||
id: node.id,
|
||||
rootId: node.root_id,
|
||||
workspaceId: node.workspace_id,
|
||||
revision: node.revision.toString(),
|
||||
state: encodeState(node.state),
|
||||
createdAt: node.created_at.toISOString(),
|
||||
createdBy: node.created_by,
|
||||
updatedAt: node.updated_at?.toISOString() ?? null,
|
||||
updatedBy: node.updated_by,
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
type: 'synchronizer_output',
|
||||
userId: this.user.userId,
|
||||
id: this.id,
|
||||
items: items.map((item) => ({
|
||||
cursor: item.revision,
|
||||
data: item,
|
||||
})),
|
||||
};
|
||||
}
|
||||
|
||||
private shouldFetch(event: Event) {
|
||||
if (event.type === 'node_created' && event.rootId === this.input.rootId) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (event.type === 'node_updated' && event.rootId === this.input.rootId) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (event.type === 'node_deleted' && event.rootId === this.input.rootId) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -32,7 +32,7 @@ export type UpdateNodeOutput = {
|
||||
};
|
||||
|
||||
export type DeleteNodeInput = {
|
||||
id: string;
|
||||
nodeId: string;
|
||||
rootId: string;
|
||||
deletedAt: string;
|
||||
};
|
||||
|
||||
@@ -32,4 +32,4 @@ export * from './lib/texts';
|
||||
export * from './lib/permissions';
|
||||
export * from './types/api';
|
||||
export * from './lib/debugger';
|
||||
export * from './types/documents';
|
||||
export * from './types/crdt';
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { DocumentUpdateMergeMetadata } from '../types/documents';
|
||||
import { UpdateMergeMetadata } from '../types/crdt';
|
||||
|
||||
export type SyncDocumentUpdatesInput = {
|
||||
type: 'document_updates';
|
||||
@@ -12,7 +12,7 @@ export type SyncDocumentUpdateData = {
|
||||
revision: string;
|
||||
createdBy: string;
|
||||
createdAt: string;
|
||||
mergedUpdates: DocumentUpdateMergeMetadata[] | null | undefined;
|
||||
mergedUpdates: UpdateMergeMetadata[] | null | undefined;
|
||||
};
|
||||
|
||||
declare module '@colanode/core' {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
export * from './nodes';
|
||||
export * from './nodes-updates';
|
||||
export * from './users';
|
||||
export * from './node-reactions';
|
||||
export * from './node-interactions';
|
||||
|
||||
27
packages/core/src/synchronizers/nodes-updates.ts
Normal file
27
packages/core/src/synchronizers/nodes-updates.ts
Normal file
@@ -0,0 +1,27 @@
|
||||
import { UpdateMergeMetadata } from '@colanode/core';
|
||||
|
||||
export type SyncNodesUpdatesInput = {
|
||||
type: 'nodes_updates';
|
||||
rootId: string;
|
||||
};
|
||||
|
||||
export type SyncNodeUpdateData = {
|
||||
id: string;
|
||||
nodeId: string;
|
||||
rootId: string;
|
||||
workspaceId: string;
|
||||
revision: string;
|
||||
state: string;
|
||||
createdAt: string;
|
||||
createdBy: string;
|
||||
mergedUpdates: UpdateMergeMetadata[] | null | undefined;
|
||||
};
|
||||
|
||||
declare module '@colanode/core' {
|
||||
interface SynchronizerMap {
|
||||
nodes_updates: {
|
||||
input: SyncNodesUpdatesInput;
|
||||
data: SyncNodeUpdateData;
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -1,25 +0,0 @@
|
||||
export type SyncNodesInput = {
|
||||
type: 'nodes';
|
||||
rootId: string;
|
||||
};
|
||||
|
||||
export type SyncNodeData = {
|
||||
id: string;
|
||||
rootId: string;
|
||||
workspaceId: string;
|
||||
revision: string;
|
||||
state: string;
|
||||
createdAt: string;
|
||||
createdBy: string;
|
||||
updatedAt: string | null;
|
||||
updatedBy: string | null;
|
||||
};
|
||||
|
||||
declare module '@colanode/core' {
|
||||
interface SynchronizerMap {
|
||||
nodes: {
|
||||
input: SyncNodesInput;
|
||||
data: SyncNodeData;
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
export type DocumentUpdateMergeMetadata = {
|
||||
export type UpdateMergeMetadata = {
|
||||
id: string;
|
||||
createdAt: string;
|
||||
createdBy: string;
|
||||
@@ -19,7 +19,8 @@ export type MutationBase = {
|
||||
};
|
||||
|
||||
export type CreateNodeMutationData = {
|
||||
id: string;
|
||||
nodeId: string;
|
||||
updateId: string;
|
||||
createdAt: string;
|
||||
data: string;
|
||||
};
|
||||
@@ -29,13 +30,8 @@ export type CreateNodeMutation = MutationBase & {
|
||||
data: CreateNodeMutationData;
|
||||
};
|
||||
|
||||
export type ApplyNodeUpdateMutationData = {
|
||||
id: string;
|
||||
data: string;
|
||||
};
|
||||
|
||||
export type UpdateNodeMutationData = {
|
||||
id: string;
|
||||
nodeId: string;
|
||||
updateId: string;
|
||||
data: string;
|
||||
createdAt: string;
|
||||
@@ -47,7 +43,7 @@ export type UpdateNodeMutation = MutationBase & {
|
||||
};
|
||||
|
||||
export type DeleteNodeMutationData = {
|
||||
id: string;
|
||||
nodeId: string;
|
||||
rootId: string;
|
||||
deletedAt: string;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user