mirror of
https://github.com/colanode/colanode.git
synced 2025-12-29 00:25:03 +01:00
Implement new node structure with crdt in server
This commit is contained in:
58
server/package-lock.json
generated
58
server/package-lock.json
generated
@@ -14,6 +14,7 @@
|
||||
"bcrypt": "^5.1.1",
|
||||
"cors": "^2.8.5",
|
||||
"express": "^4.21.0",
|
||||
"js-base64": "^3.7.7",
|
||||
"jsonwebtoken": "^9.0.2",
|
||||
"kafkajs": "^2.2.4",
|
||||
"kysely": "^0.27.4",
|
||||
@@ -21,7 +22,8 @@
|
||||
"postgres": "^3.4.4",
|
||||
"redis": "^4.7.0",
|
||||
"ulid": "^2.3.0",
|
||||
"ws": "^8.18.0"
|
||||
"ws": "^8.18.0",
|
||||
"yjs": "^13.6.19"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/bcrypt": "^5.0.2",
|
||||
@@ -1603,6 +1605,22 @@
|
||||
"node": ">=0.12.0"
|
||||
}
|
||||
},
|
||||
"node_modules/isomorphic.js": {
|
||||
"version": "0.2.5",
|
||||
"resolved": "https://registry.npmjs.org/isomorphic.js/-/isomorphic.js-0.2.5.tgz",
|
||||
"integrity": "sha512-PIeMbHqMt4DnUP3MA/Flc0HElYjMXArsw1qwJZcm9sqR8mq3l8NYizFMty0pWwE/tzIGH3EKK5+jes5mAr85yw==",
|
||||
"license": "MIT",
|
||||
"funding": {
|
||||
"type": "GitHub Sponsors ❤",
|
||||
"url": "https://github.com/sponsors/dmonad"
|
||||
}
|
||||
},
|
||||
"node_modules/js-base64": {
|
||||
"version": "3.7.7",
|
||||
"resolved": "https://registry.npmjs.org/js-base64/-/js-base64-3.7.7.tgz",
|
||||
"integrity": "sha512-7rCnleh0z2CkXhH67J8K1Ytz0b2Y+yxTPL+/KOJoa20hfnVQ/3/T6W/KflYI4bRHRagNeXeU2bkNGI3v1oS/lw==",
|
||||
"license": "BSD-3-Clause"
|
||||
},
|
||||
"node_modules/json5": {
|
||||
"version": "2.2.3",
|
||||
"resolved": "https://registry.npmjs.org/json5/-/json5-2.2.3.tgz",
|
||||
@@ -1683,6 +1701,27 @@
|
||||
"node": ">=14.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/lib0": {
|
||||
"version": "0.2.97",
|
||||
"resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.97.tgz",
|
||||
"integrity": "sha512-Q4d1ekgvufi9FiHkkL46AhecfNjznSL9MRNoJRQ76gBHS9OqU2ArfQK0FvBpuxgWeJeNI0LVgAYMIpsGeX4gYg==",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"isomorphic.js": "^0.2.4"
|
||||
},
|
||||
"bin": {
|
||||
"0ecdsa-generate-keypair": "bin/0ecdsa-generate-keypair.js",
|
||||
"0gentesthtml": "bin/gentesthtml.js",
|
||||
"0serve": "bin/0serve.js"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=16"
|
||||
},
|
||||
"funding": {
|
||||
"type": "GitHub Sponsors ❤",
|
||||
"url": "https://github.com/sponsors/dmonad"
|
||||
}
|
||||
},
|
||||
"node_modules/lodash": {
|
||||
"version": "4.17.21",
|
||||
"resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
|
||||
@@ -3017,6 +3056,23 @@
|
||||
"node": ">=12"
|
||||
}
|
||||
},
|
||||
"node_modules/yjs": {
|
||||
"version": "13.6.19",
|
||||
"resolved": "https://registry.npmjs.org/yjs/-/yjs-13.6.19.tgz",
|
||||
"integrity": "sha512-GNKw4mEUn5yWU2QPHRx8jppxmCm9KzbBhB4qJLUJFiiYD0g/tDVgXQ7aPkyh01YO28kbs2J/BEbWBagjuWyejw==",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"lib0": "^0.2.86"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=16.0.0",
|
||||
"npm": ">=8.0.0"
|
||||
},
|
||||
"funding": {
|
||||
"type": "GitHub Sponsors ❤",
|
||||
"url": "https://github.com/sponsors/dmonad"
|
||||
}
|
||||
},
|
||||
"node_modules/yn": {
|
||||
"version": "3.1.1",
|
||||
"resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz",
|
||||
|
||||
@@ -32,6 +32,7 @@
|
||||
"bcrypt": "^5.1.1",
|
||||
"cors": "^2.8.5",
|
||||
"express": "^4.21.0",
|
||||
"js-base64": "^3.7.7",
|
||||
"jsonwebtoken": "^9.0.2",
|
||||
"kafkajs": "^2.2.4",
|
||||
"kysely": "^0.27.4",
|
||||
@@ -39,6 +40,7 @@
|
||||
"postgres": "^3.4.4",
|
||||
"redis": "^4.7.0",
|
||||
"ulid": "^2.3.0",
|
||||
"ws": "^8.18.0"
|
||||
"ws": "^8.18.0",
|
||||
"yjs": "^13.6.19"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,173 +0,0 @@
|
||||
import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka';
|
||||
import { ChangeMessage, NodeAttributeChangeData } from '@/types/changes';
|
||||
import { PostgresOperation } from '@/lib/constants';
|
||||
import { database } from '@/data/database';
|
||||
import { NeuronId } from '@/lib/id';
|
||||
import { ServerNode, ServerNodeAttribute } from '@/types/nodes';
|
||||
|
||||
export const initNodeAttributeChangesConsumer = async () => {
|
||||
const consumer = kafka.consumer({
|
||||
groupId: CONSUMER_IDS.NODE_ATTRIBUTE_CHANGES,
|
||||
});
|
||||
|
||||
await consumer.connect();
|
||||
await consumer.subscribe({ topic: TOPIC_NAMES.NODE_ATTRIBUTE_CHANGES });
|
||||
|
||||
await consumer.run({
|
||||
eachMessage: async ({ message }) => {
|
||||
if (!message || !message.value) {
|
||||
return;
|
||||
}
|
||||
|
||||
const change = JSON.parse(
|
||||
message.value.toString(),
|
||||
) as ChangeMessage<NodeAttributeChangeData>;
|
||||
|
||||
await handleNodeAttributeChange(change);
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
const handleNodeAttributeChange = async (
|
||||
change: ChangeMessage<NodeAttributeChangeData>,
|
||||
) => {
|
||||
switch (change.op) {
|
||||
case PostgresOperation.CREATE: {
|
||||
await handleNodeAttributeCreate(change);
|
||||
break;
|
||||
}
|
||||
case PostgresOperation.UPDATE: {
|
||||
await handleNodeAttributeUpdate(change);
|
||||
break;
|
||||
}
|
||||
case PostgresOperation.DELETE: {
|
||||
await handleNodeAttributeDelete(change);
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const handleNodeAttributeCreate = async (
|
||||
change: ChangeMessage<NodeAttributeChangeData>,
|
||||
) => {
|
||||
const node = change.after;
|
||||
if (!node) {
|
||||
return;
|
||||
}
|
||||
|
||||
const deviceIds = await getDeviceIds(node.workspace_id);
|
||||
if (deviceIds.length == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const serverNodeAttribute: ServerNodeAttribute = mapNodeAttribute(node);
|
||||
await database
|
||||
.insertInto('mutations')
|
||||
.values({
|
||||
id: NeuronId.generate(NeuronId.Type.Mutation),
|
||||
table: 'node_attributes',
|
||||
action: 'insert',
|
||||
workspace_id: node.workspace_id,
|
||||
created_at: new Date(),
|
||||
after: JSON.stringify(serverNodeAttribute),
|
||||
device_ids: deviceIds,
|
||||
})
|
||||
.execute();
|
||||
};
|
||||
|
||||
const handleNodeAttributeUpdate = async (
|
||||
change: ChangeMessage<NodeAttributeChangeData>,
|
||||
) => {
|
||||
const node = change.after;
|
||||
if (!node) {
|
||||
return;
|
||||
}
|
||||
|
||||
const deviceIds = await getDeviceIds(node.workspace_id);
|
||||
if (deviceIds.length == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const serverNodeAttribute: ServerNodeAttribute = mapNodeAttribute(node);
|
||||
await database
|
||||
.insertInto('mutations')
|
||||
.values({
|
||||
id: NeuronId.generate(NeuronId.Type.Mutation),
|
||||
table: 'node_attributes',
|
||||
action: 'update',
|
||||
workspace_id: node.workspace_id,
|
||||
created_at: new Date(),
|
||||
before: change.before ? JSON.stringify(change.before) : null,
|
||||
after: JSON.stringify(serverNodeAttribute),
|
||||
device_ids: deviceIds,
|
||||
})
|
||||
.execute();
|
||||
};
|
||||
|
||||
const handleNodeAttributeDelete = async (
|
||||
change: ChangeMessage<NodeAttributeChangeData>,
|
||||
) => {
|
||||
const node = change.before;
|
||||
if (!node) {
|
||||
return;
|
||||
}
|
||||
|
||||
const deviceIds = await getDeviceIds(node.workspace_id);
|
||||
if (deviceIds.length == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const serverNodeAttribute: ServerNodeAttribute = mapNodeAttribute(node);
|
||||
await database
|
||||
.insertInto('mutations')
|
||||
.values({
|
||||
id: NeuronId.generate(NeuronId.Type.Mutation),
|
||||
table: 'node_attributes',
|
||||
action: 'delete',
|
||||
workspace_id: node.workspace_id,
|
||||
created_at: new Date(),
|
||||
before: JSON.stringify(serverNodeAttribute),
|
||||
after: null,
|
||||
device_ids: deviceIds,
|
||||
})
|
||||
.execute();
|
||||
};
|
||||
|
||||
const getDeviceIds = async (workspaceId: string) => {
|
||||
const accountDevices = await database
|
||||
.selectFrom('account_devices')
|
||||
.where(
|
||||
'account_id',
|
||||
'in',
|
||||
database
|
||||
.selectFrom('workspace_accounts')
|
||||
.where('workspace_id', '=', workspaceId)
|
||||
.select('account_id'),
|
||||
)
|
||||
.select('id')
|
||||
.execute();
|
||||
|
||||
const deviceIds = accountDevices.map((account) => account.id);
|
||||
return deviceIds;
|
||||
};
|
||||
|
||||
const mapNodeAttribute = (
|
||||
node: NodeAttributeChangeData,
|
||||
): ServerNodeAttribute => {
|
||||
return {
|
||||
nodeId: node.node_id,
|
||||
type: node.type,
|
||||
key: node.key,
|
||||
workspaceId: node.workspace_id,
|
||||
textValue: node.text_value,
|
||||
numberValue: node.number_value,
|
||||
foreignNodeId: node.foreign_node_id,
|
||||
createdAt: node.created_at,
|
||||
createdBy: node.created_by,
|
||||
updatedAt: node.updated_at,
|
||||
updatedBy: node.updated_by,
|
||||
versionId: node.version_id,
|
||||
serverCreatedAt: node.server_created_at,
|
||||
serverUpdatedAt: node.server_updated_at,
|
||||
};
|
||||
};
|
||||
@@ -148,7 +148,8 @@ const mapNode = (node: NodeChangeData): ServerNode => {
|
||||
parentId: node.parent_id,
|
||||
type: node.type,
|
||||
index: node.index,
|
||||
content: node.content ? JSON.parse(node.content) : null,
|
||||
attributes: JSON.parse(node.attributes),
|
||||
state: node.state,
|
||||
createdAt: node.created_at,
|
||||
createdBy: node.created_by,
|
||||
updatedAt: node.updated_at,
|
||||
|
||||
@@ -23,9 +23,6 @@ export const producer = kafka.producer();
|
||||
export const TOPIC_NAMES = {
|
||||
NODE_CHANGES:
|
||||
process.env.KAFKA_NODE_CHANGES_TOPIC_NAME ?? 'neuron_node_changes',
|
||||
NODE_ATTRIBUTE_CHANGES:
|
||||
process.env.KAFKA_NODE_ATTRIBUTE_CHANGES_TOPIC_NAME ??
|
||||
'neuron_node_attribute_changes',
|
||||
NODE_REACTION_CHANGES:
|
||||
process.env.KAFKA_NODE_REACTION_CHANGES_TOPIC_NAME ??
|
||||
'neuron_node_reaction_changes',
|
||||
@@ -37,9 +34,6 @@ export const CONSUMER_IDS = {
|
||||
NODE_CHANGES:
|
||||
process.env.KAFKA_NODE_CHANGES_CONSUMER_ID ??
|
||||
'neuron_node_changes_consumer',
|
||||
NODE_ATTRIBUTE_CHANGES:
|
||||
process.env.KAFKA_NODE_ATTRIBUTE_CHANGES_CONSUMER_ID ??
|
||||
'neuron_node_attribute_changes_consumer',
|
||||
NODE_REACTION_CHANGES:
|
||||
process.env.KAFKA_NODE_REACTION_CHANGES_CONSUMER_ID ??
|
||||
'neuron_node_reaction_changes_consumer',
|
||||
|
||||
@@ -70,51 +70,24 @@ const createWorkspaceAccountsTable: Migration = {
|
||||
|
||||
const createNodesTable: Migration = {
|
||||
up: async (db) => {
|
||||
await db.schema
|
||||
.createTable('nodes')
|
||||
.addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey())
|
||||
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('parent_id', 'varchar(30)', (col) =>
|
||||
col.references('nodes.id').onDelete('cascade'),
|
||||
)
|
||||
.addColumn('type', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('index', 'varchar(30)')
|
||||
.addColumn('content', 'jsonb')
|
||||
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
|
||||
.addColumn('created_by', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('updated_at', 'timestamptz')
|
||||
.addColumn('updated_by', 'varchar(30)')
|
||||
.addColumn('version_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('server_created_at', 'timestamptz', (col) => col.notNull())
|
||||
.addColumn('server_updated_at', 'timestamptz')
|
||||
.execute();
|
||||
},
|
||||
down: async (db) => {
|
||||
await db.schema.dropTable('nodes').execute();
|
||||
},
|
||||
};
|
||||
|
||||
const createNodeAttributesTable: Migration = {
|
||||
up: async (db) => {
|
||||
await db.schema
|
||||
.createTable('node_attributes')
|
||||
.addColumn('node_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('type', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('key', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('text_value', 'varchar(30)')
|
||||
.addColumn('number_value', 'numeric')
|
||||
.addColumn('foreign_node_id', 'varchar(30)', (col) =>
|
||||
col.references('nodes.id').onDelete('cascade'),
|
||||
)
|
||||
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
|
||||
.addColumn('created_by', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('updated_at', 'timestamptz')
|
||||
.addColumn('updated_by', 'varchar(30)')
|
||||
.addColumn('version_id', 'varchar(30)', (col) => col.notNull())
|
||||
.addColumn('server_created_at', 'timestamptz', (col) => col.notNull())
|
||||
.addColumn('server_updated_at', 'timestamptz')
|
||||
.execute();
|
||||
await sql`
|
||||
CREATE TABLE nodes (
|
||||
id VARCHAR(30) PRIMARY KEY NOT NULL,
|
||||
workspace_id VARCHAR(30) NOT NULL,
|
||||
type VARCHAR(30) GENERATED ALWAYS AS ((attributes->>'type')::VARCHAR(30)) STORED,
|
||||
parent_id VARCHAR(30) GENERATED ALWAYS AS ((attributes->>'parentId')::VARCHAR(30)) STORED,
|
||||
"index" VARCHAR(30) GENERATED ALWAYS AS ((attributes->>'index')::VARCHAR(30)) STORED,
|
||||
attributes JSONB NOT NULL,
|
||||
state TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL,
|
||||
updated_at TIMESTAMPTZ,
|
||||
created_by VARCHAR(30) NOT NULL,
|
||||
updated_by VARCHAR(30),
|
||||
version_id VARCHAR(30) NOT NULL,
|
||||
server_created_at TIMESTAMPTZ,
|
||||
server_updated_at TIMESTAMPTZ,
|
||||
FOREIGN KEY (parent_id) REFERENCES nodes(id) ON DELETE CASCADE
|
||||
)`.execute(db);
|
||||
},
|
||||
down: async (db) => {
|
||||
await db.schema.dropTable('nodes').execute();
|
||||
@@ -192,8 +165,7 @@ export const databaseMigrations: Record<string, Migration> = {
|
||||
'00002_create_workspaces_table': createWorkspacesTable,
|
||||
'00003_create_workspace_accounts_table': createWorkspaceAccountsTable,
|
||||
'00004_create_nodes_table': createNodesTable,
|
||||
'00005_create_node_attributes_table': createNodeAttributesTable,
|
||||
'00006_create_account_devices_table': createAccountDevicesTable,
|
||||
'00007_create_mutations_table': createMutationsTable,
|
||||
'00008_create_node_reactions_table': createNodeReactionsTable,
|
||||
'00005_create_account_devices_table': createAccountDevicesTable,
|
||||
'00006_create_mutations_table': createMutationsTable,
|
||||
'00007_create_node_reactions_table': createNodeReactionsTable,
|
||||
};
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { NodeBlock } from '@/types/nodes';
|
||||
import { ServerNodeAttributes } from '@/types/nodes';
|
||||
import {
|
||||
ColumnType,
|
||||
Insertable,
|
||||
@@ -79,10 +79,15 @@ export type UpdateAccountDevice = Updateable<AccountDeviceTable>;
|
||||
interface NodeTable {
|
||||
id: ColumnType<string, string, never>;
|
||||
workspace_id: ColumnType<string, string, never>;
|
||||
parent_id: ColumnType<string | null, string | null, string | null>;
|
||||
type: ColumnType<string, string, string>;
|
||||
index: ColumnType<string | null, string | null, string | null>;
|
||||
content: JSONColumnType<NodeBlock[] | null, string | null, string | null>;
|
||||
parent_id: ColumnType<string | null, never, never>;
|
||||
type: ColumnType<string, never, never>;
|
||||
index: ColumnType<string | null, never, never>;
|
||||
attributes: JSONColumnType<
|
||||
ServerNodeAttributes,
|
||||
string | null,
|
||||
string | null
|
||||
>;
|
||||
state: ColumnType<string, string, string>;
|
||||
created_at: ColumnType<Date, Date, never>;
|
||||
updated_at: ColumnType<Date | null, Date | null, Date>;
|
||||
created_by: ColumnType<string, string, never>;
|
||||
@@ -96,27 +101,6 @@ export type SelectNode = Selectable<NodeTable>;
|
||||
export type CreateNode = Insertable<NodeTable>;
|
||||
export type UpdateNode = Updateable<NodeTable>;
|
||||
|
||||
interface NodeAttributeTable {
|
||||
node_id: ColumnType<string, string, never>;
|
||||
type: ColumnType<string, string, never>;
|
||||
key: ColumnType<string, string, string>;
|
||||
workspace_id: ColumnType<string, string, never>;
|
||||
text_value: ColumnType<string | null, string | null, string | null>;
|
||||
number_value: ColumnType<number | null, number | null, number | null>;
|
||||
foreign_node_id: ColumnType<string | null, string | null, string | null>;
|
||||
created_at: ColumnType<Date, Date, never>;
|
||||
updated_at: ColumnType<Date | null, Date | null, Date>;
|
||||
created_by: ColumnType<string, string, never>;
|
||||
updated_by: ColumnType<string | null, string | null, string>;
|
||||
version_id: ColumnType<string, string, string>;
|
||||
server_created_at: ColumnType<Date, Date, never>;
|
||||
server_updated_at: ColumnType<Date | null, Date | null, Date>;
|
||||
}
|
||||
|
||||
export type SelectNodeAttribute = Selectable<NodeAttributeTable>;
|
||||
export type CreateNodeAttribute = Insertable<NodeAttributeTable>;
|
||||
export type UpdateNodeAttribute = Updateable<NodeAttributeTable>;
|
||||
|
||||
interface NodeReactionTable {
|
||||
node_id: ColumnType<string, string, never>;
|
||||
reactor_id: ColumnType<string, string, never>;
|
||||
@@ -151,7 +135,6 @@ export interface DatabaseSchema {
|
||||
workspace_accounts: WorkspaceAccountTable;
|
||||
account_devices: AccountDeviceTable;
|
||||
nodes: NodeTable;
|
||||
node_attributes: NodeAttributeTable;
|
||||
node_reactions: NodeReactionTable;
|
||||
mutations: MutationTable;
|
||||
}
|
||||
|
||||
@@ -3,11 +3,12 @@ import { Router } from 'express';
|
||||
import {
|
||||
ExecuteLocalMutationsInput,
|
||||
LocalMutation,
|
||||
LocalNodeAttributeMutationData,
|
||||
LocalNodeMutationData,
|
||||
LocalNodeReactionMutationData,
|
||||
} from '@/types/mutations';
|
||||
import { database } from '@/data/database';
|
||||
import * as Y from 'yjs';
|
||||
import { fromUint8Array, toUint8Array } from 'js-base64';
|
||||
|
||||
export const mutationsRouter = Router();
|
||||
|
||||
@@ -30,31 +31,7 @@ mutationsRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
case 'node_attributes': {
|
||||
switch (mutation.action) {
|
||||
case 'insert': {
|
||||
await handleCreateNodeAttributeMutation(
|
||||
input.workspaceId,
|
||||
mutation,
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'update': {
|
||||
await handleUpdateNodeAttributeMutation(
|
||||
input.workspaceId,
|
||||
mutation,
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'delete': {
|
||||
await handleDeleteNodeAttributeMutation(
|
||||
input.workspaceId,
|
||||
mutation,
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'node_reactions': {
|
||||
switch (mutation.action) {
|
||||
@@ -67,6 +44,7 @@ mutationsRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -96,11 +74,9 @@ const handleCreateNodeMutation = async (
|
||||
.insertInto('nodes')
|
||||
.values({
|
||||
id: nodeData.id,
|
||||
parent_id: nodeData.parent_id,
|
||||
attributes: nodeData.attributes,
|
||||
workspace_id: workspaceId,
|
||||
type: nodeData.type,
|
||||
index: nodeData.index,
|
||||
content: nodeData.content,
|
||||
state: nodeData.state,
|
||||
created_at: new Date(nodeData.created_at),
|
||||
created_by: nodeData.created_by,
|
||||
version_id: nodeData.version_id,
|
||||
@@ -120,10 +96,11 @@ const handleUpdateNodeMutation = async (
|
||||
const nodeData = JSON.parse(mutation.after) as LocalNodeMutationData;
|
||||
const existingNode = await database
|
||||
.selectFrom('nodes')
|
||||
.select(['id', 'workspace_id', 'state'])
|
||||
.where('id', '=', nodeData.id)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!existingNode) {
|
||||
if (!existingNode || existingNode.workspace_id != workspaceId) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -132,13 +109,22 @@ const handleUpdateNodeMutation = async (
|
||||
: new Date();
|
||||
const updatedBy = nodeData.updated_by ?? nodeData.created_by;
|
||||
|
||||
const doc = new Y.Doc({
|
||||
guid: nodeData.id,
|
||||
});
|
||||
|
||||
Y.applyUpdate(doc, toUint8Array(existingNode.state));
|
||||
Y.applyUpdate(doc, toUint8Array(nodeData.state));
|
||||
|
||||
const attributesMap = doc.getMap('attributes');
|
||||
const attributes = JSON.stringify(attributesMap.toJSON());
|
||||
const encodedState = fromUint8Array(Y.encodeStateAsUpdate(doc));
|
||||
|
||||
await database
|
||||
.updateTable('nodes')
|
||||
.set({
|
||||
parent_id: nodeData.parent_id,
|
||||
type: nodeData.type,
|
||||
index: nodeData.index,
|
||||
content: nodeData.content,
|
||||
attributes: attributes,
|
||||
state: encodedState,
|
||||
updated_at: updatedAt,
|
||||
updated_by: updatedBy,
|
||||
version_id: nodeData.version_id,
|
||||
@@ -170,146 +156,6 @@ const handleDeleteNodeMutation = async (
|
||||
await database.deleteFrom('nodes').where('id', '=', nodeData.id).execute();
|
||||
};
|
||||
|
||||
const handleCreateNodeAttributeMutation = async (
|
||||
workspaceId: string,
|
||||
mutation: LocalMutation,
|
||||
): Promise<void> => {
|
||||
if (!mutation.after) {
|
||||
return;
|
||||
}
|
||||
|
||||
const nodeAttributeData = JSON.parse(
|
||||
mutation.after,
|
||||
) as LocalNodeAttributeMutationData;
|
||||
const existingNodeAttribute = await database
|
||||
.selectFrom('node_attributes')
|
||||
.where((eb) =>
|
||||
eb.and([
|
||||
eb('node_id', '=', nodeAttributeData.node_id),
|
||||
eb('type', '=', nodeAttributeData.type),
|
||||
eb('key', '=', nodeAttributeData.key),
|
||||
]),
|
||||
)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (existingNodeAttribute) {
|
||||
return;
|
||||
}
|
||||
|
||||
await database
|
||||
.insertInto('node_attributes')
|
||||
.values({
|
||||
node_id: nodeAttributeData.node_id,
|
||||
type: nodeAttributeData.type,
|
||||
key: nodeAttributeData.key,
|
||||
workspace_id: workspaceId,
|
||||
text_value: nodeAttributeData.text_value,
|
||||
number_value: nodeAttributeData.number_value,
|
||||
foreign_node_id: nodeAttributeData.foreign_node_id,
|
||||
created_at: new Date(nodeAttributeData.created_at),
|
||||
created_by: nodeAttributeData.created_by,
|
||||
version_id: nodeAttributeData.version_id,
|
||||
server_created_at: new Date(),
|
||||
})
|
||||
.execute();
|
||||
};
|
||||
|
||||
const handleUpdateNodeAttributeMutation = async (
|
||||
workspaceId: string,
|
||||
mutation: LocalMutation,
|
||||
): Promise<void> => {
|
||||
if (!mutation.after) {
|
||||
return;
|
||||
}
|
||||
|
||||
const nodeAttributeData = JSON.parse(
|
||||
mutation.after,
|
||||
) as LocalNodeAttributeMutationData;
|
||||
const existingNodeAttribute = await database
|
||||
.selectFrom('node_attributes')
|
||||
.where((eb) =>
|
||||
eb.and([
|
||||
eb('node_id', '=', nodeAttributeData.node_id),
|
||||
eb('type', '=', nodeAttributeData.type),
|
||||
eb('key', '=', nodeAttributeData.key),
|
||||
]),
|
||||
)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!existingNodeAttribute) {
|
||||
return;
|
||||
}
|
||||
|
||||
const updatedAt = nodeAttributeData.updated_at
|
||||
? new Date(nodeAttributeData.updated_at)
|
||||
: new Date();
|
||||
|
||||
const updatedBy =
|
||||
nodeAttributeData.updated_by ?? nodeAttributeData.created_by;
|
||||
|
||||
await database
|
||||
.updateTable('node_attributes')
|
||||
.set({
|
||||
text_value: nodeAttributeData.text_value,
|
||||
number_value: nodeAttributeData.number_value,
|
||||
foreign_node_id: nodeAttributeData.foreign_node_id,
|
||||
updated_at: updatedAt,
|
||||
updated_by: updatedBy,
|
||||
version_id: nodeAttributeData.version_id,
|
||||
server_updated_at: new Date(),
|
||||
})
|
||||
.where((eb) =>
|
||||
eb.and([
|
||||
eb('node_id', '=', nodeAttributeData.node_id),
|
||||
eb('type', '=', nodeAttributeData.type),
|
||||
eb('key', '=', nodeAttributeData.key),
|
||||
]),
|
||||
)
|
||||
.execute();
|
||||
};
|
||||
|
||||
const handleDeleteNodeAttributeMutation = async (
|
||||
workspaceId: string,
|
||||
mutation: LocalMutation,
|
||||
): Promise<void> => {
|
||||
if (!mutation.before) {
|
||||
return;
|
||||
}
|
||||
|
||||
const nodeAttributeData = JSON.parse(
|
||||
mutation.before,
|
||||
) as LocalNodeAttributeMutationData;
|
||||
const existingNodeAttribute = await database
|
||||
.selectFrom('node_attributes')
|
||||
.selectAll()
|
||||
.where((eb) =>
|
||||
eb.and([
|
||||
eb('node_id', '=', nodeAttributeData.node_id),
|
||||
eb('type', '=', nodeAttributeData.type),
|
||||
eb('key', '=', nodeAttributeData.key),
|
||||
]),
|
||||
)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (
|
||||
!existingNodeAttribute ||
|
||||
existingNodeAttribute.workspace_id !== workspaceId
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
await database
|
||||
.deleteFrom('node_attributes')
|
||||
.where((eb) =>
|
||||
eb.and([
|
||||
eb('node_id', '=', nodeAttributeData.node_id),
|
||||
eb('type', '=', nodeAttributeData.type),
|
||||
eb('key', '=', nodeAttributeData.key),
|
||||
]),
|
||||
)
|
||||
.execute();
|
||||
};
|
||||
|
||||
const handleCreateNodeReactionMutation = async (
|
||||
workspaceId: string,
|
||||
mutation: LocalMutation,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { NeuronRequest, NeuronResponse } from '@/types/api';
|
||||
import { database } from '@/data/database';
|
||||
import { Router } from 'express';
|
||||
import { ServerNode, ServerNodeAttribute } from '@/types/nodes';
|
||||
import { ServerNode } from '@/types/nodes';
|
||||
|
||||
export const syncRouter = Router();
|
||||
|
||||
@@ -15,12 +15,6 @@ syncRouter.get(
|
||||
.where('workspace_id', '=', workspaceId)
|
||||
.execute();
|
||||
|
||||
const nodeAttributeRows = await database
|
||||
.selectFrom('node_attributes')
|
||||
.selectAll()
|
||||
.where('workspace_id', '=', workspaceId)
|
||||
.execute();
|
||||
|
||||
const nodes: ServerNode[] = nodeRows.map((node) => {
|
||||
return {
|
||||
id: node.id,
|
||||
@@ -28,10 +22,11 @@ syncRouter.get(
|
||||
workspaceId: node.workspace_id,
|
||||
type: node.type,
|
||||
index: node.index,
|
||||
attributes: node.attributes,
|
||||
state: node.state,
|
||||
createdAt: node.created_at.toISOString(),
|
||||
createdBy: node.created_by,
|
||||
versionId: node.version_id,
|
||||
content: node.content,
|
||||
updatedAt: node.updated_at?.toISOString(),
|
||||
updatedBy: node.updated_by,
|
||||
serverCreatedAt: node.server_created_at.toISOString(),
|
||||
@@ -39,28 +34,8 @@ syncRouter.get(
|
||||
};
|
||||
});
|
||||
|
||||
const nodeAttributes: ServerNodeAttribute[] = nodeAttributeRows.map(
|
||||
(nodeAttribute) => {
|
||||
return {
|
||||
nodeId: nodeAttribute.node_id,
|
||||
type: nodeAttribute.type,
|
||||
key: nodeAttribute.key,
|
||||
textValue: nodeAttribute.text_value,
|
||||
numberValue: nodeAttribute.number_value,
|
||||
foreignNodeId: nodeAttribute.foreign_node_id,
|
||||
workspaceId: nodeAttribute.workspace_id,
|
||||
createdAt: nodeAttribute.created_at.toISOString(),
|
||||
createdBy: nodeAttribute.created_by,
|
||||
versionId: nodeAttribute.version_id,
|
||||
serverCreatedAt: nodeAttribute.server_created_at.toISOString(),
|
||||
serverUpdatedAt: nodeAttribute.server_updated_at?.toISOString(),
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
res.status(200).json({
|
||||
nodes,
|
||||
nodeAttributes,
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
@@ -11,6 +11,8 @@ import { ApiError, NeuronRequest, NeuronResponse } from '@/types/api';
|
||||
import { NeuronId } from '@/lib/id';
|
||||
import { database } from '@/data/database';
|
||||
import { Router } from 'express';
|
||||
import * as Y from 'yjs';
|
||||
import { fromUint8Array } from 'js-base64';
|
||||
|
||||
export const workspacesRouter = Router();
|
||||
|
||||
@@ -57,6 +59,20 @@ workspacesRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
|
||||
|
||||
const userId = NeuronId.generate(NeuronId.Type.User);
|
||||
const userVersionId = NeuronId.generate(NeuronId.Type.Version);
|
||||
const userDoc = new Y.Doc({
|
||||
guid: userId,
|
||||
});
|
||||
|
||||
const userAttributesMap = userDoc.getMap('attributes');
|
||||
userDoc.transact(() => {
|
||||
userAttributesMap.set('type', 'user');
|
||||
userAttributesMap.set('name', account.name);
|
||||
userAttributesMap.set('avatar', account.avatar);
|
||||
});
|
||||
|
||||
const userAttributes = JSON.stringify(userAttributesMap.toJSON());
|
||||
const userState = fromUint8Array(Y.encodeStateAsUpdate(userDoc));
|
||||
|
||||
const workspaceAccount: WorkspaceAccount = {
|
||||
accountId: req.accountId,
|
||||
workspaceId: workspace.id,
|
||||
@@ -88,7 +104,8 @@ workspacesRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
|
||||
.values({
|
||||
id: userId,
|
||||
workspace_id: workspace.id,
|
||||
type: 'user',
|
||||
attributes: userAttributes,
|
||||
state: userState,
|
||||
created_at: workspaceAccount.createdAt,
|
||||
created_by: workspaceAccount.createdBy,
|
||||
version_id: userVersionId,
|
||||
@@ -96,34 +113,6 @@ workspacesRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
|
||||
})
|
||||
.execute();
|
||||
|
||||
await trx
|
||||
.insertInto('node_attributes')
|
||||
.values([
|
||||
{
|
||||
node_id: userId,
|
||||
type: 'name',
|
||||
key: '1',
|
||||
workspace_id: workspace.id,
|
||||
text_value: account.name,
|
||||
created_at: workspaceAccount.createdAt,
|
||||
created_by: workspaceAccount.createdBy,
|
||||
version_id: userVersionId,
|
||||
server_created_at: new Date(),
|
||||
},
|
||||
{
|
||||
node_id: userId,
|
||||
type: 'avatar',
|
||||
key: '1',
|
||||
workspace_id: workspace.id,
|
||||
text_value: account.avatar,
|
||||
created_at: workspaceAccount.createdAt,
|
||||
created_by: workspaceAccount.createdBy,
|
||||
version_id: userVersionId,
|
||||
server_created_at: new Date(),
|
||||
},
|
||||
])
|
||||
.execute();
|
||||
|
||||
await trx
|
||||
.insertInto('workspace_accounts')
|
||||
.values({
|
||||
|
||||
@@ -42,25 +42,8 @@ export type NodeChangeData = {
|
||||
parent_id: string | null;
|
||||
type: string;
|
||||
index: string | null;
|
||||
attrs: string | null;
|
||||
content: string | null;
|
||||
created_at: string;
|
||||
created_by: string;
|
||||
updated_at: string | null;
|
||||
updated_by: string | null;
|
||||
version_id: string;
|
||||
server_created_at: string;
|
||||
server_updated_at: string | null;
|
||||
};
|
||||
|
||||
export type NodeAttributeChangeData = {
|
||||
node_id: string;
|
||||
type: string;
|
||||
key: string;
|
||||
workspace_id: string;
|
||||
text_value: string | null;
|
||||
number_value: number | null;
|
||||
foreign_node_id: string | null;
|
||||
attributes: string;
|
||||
state: string;
|
||||
created_at: string;
|
||||
created_by: string;
|
||||
updated_at: string | null;
|
||||
|
||||
@@ -14,28 +14,8 @@ export type LocalMutation = {
|
||||
|
||||
export type LocalNodeMutationData = {
|
||||
id: string;
|
||||
type: string;
|
||||
parent_id: string | null;
|
||||
index: string | null;
|
||||
attrs?: string | null;
|
||||
content?: string | null;
|
||||
created_at: string;
|
||||
updated_at?: string | null;
|
||||
created_by: string;
|
||||
updated_by?: string | null;
|
||||
version_id: string;
|
||||
server_created_at: string;
|
||||
server_updated_at: string;
|
||||
server_version_id: string;
|
||||
};
|
||||
|
||||
export type LocalNodeAttributeMutationData = {
|
||||
node_id: string;
|
||||
type: string;
|
||||
key: string;
|
||||
text_value: string | null;
|
||||
number_value: number | null;
|
||||
foreign_node_id: string | null;
|
||||
attributes: string;
|
||||
state: string;
|
||||
created_at: string;
|
||||
updated_at?: string | null;
|
||||
created_by: string;
|
||||
|
||||
@@ -4,7 +4,8 @@ export type ServerNode = {
|
||||
parentId?: string | null;
|
||||
type: string;
|
||||
index: string | null;
|
||||
content?: NodeBlock[] | null;
|
||||
attributes: ServerNodeAttributes | null;
|
||||
state: string;
|
||||
createdAt: string;
|
||||
createdBy: string;
|
||||
updatedAt?: string | null;
|
||||
@@ -14,21 +15,12 @@ export type ServerNode = {
|
||||
serverUpdatedAt?: string | null;
|
||||
};
|
||||
|
||||
export type ServerNodeAttribute = {
|
||||
nodeId: string;
|
||||
export type ServerNodeAttributes = {
|
||||
type: string;
|
||||
key: string;
|
||||
workspaceId: string;
|
||||
textValue: string | null;
|
||||
numberValue: number | null;
|
||||
foreignNodeId: string | null;
|
||||
createdAt: string;
|
||||
createdBy: string;
|
||||
updatedAt?: string | null;
|
||||
updatedBy?: string | null;
|
||||
versionId: string;
|
||||
serverCreatedAt: string;
|
||||
serverUpdatedAt?: string | null;
|
||||
parentId?: string | null;
|
||||
index?: string | null;
|
||||
content?: NodeBlock[] | null;
|
||||
[key: string]: any;
|
||||
};
|
||||
|
||||
export type ServerNodeReaction = {
|
||||
|
||||
Reference in New Issue
Block a user