From 0412c6d7bf2e2a73380734c93c2ad1ecd1edc712 Mon Sep 17 00:00:00 2001 From: Hakan Shehu Date: Mon, 14 Oct 2024 14:25:48 +0200 Subject: [PATCH] Remove Kafka dependency --- desktop/src/main/data/app/migrations.ts | 18 +- desktop/src/main/data/app/schema.ts | 1 - .../main/handlers/mutations/email-login.ts | 35 ++- .../main/handlers/mutations/email-register.ts | 35 ++- .../handlers/mutations/workspace-create.ts | 39 ++- .../main/handlers/queries/workspace-list.ts | 1 - desktop/src/types/accounts.ts | 4 +- desktop/src/types/workspaces.ts | 17 +- server/package-lock.json | 268 ++++++++++++++++++ server/package.json | 1 + server/src/consumers/change-cdc.ts | 59 ---- server/src/consumers/changes-subcriber.ts | 54 +++- server/src/consumers/node-cdc.ts | 178 ------------ server/src/consumers/node-collaborator-cdc.ts | 192 ------------- server/src/consumers/node-reaction-cdc.ts | 143 ---------- server/src/data/database.ts | 32 ++- server/src/data/kafka.ts | 54 ---- server/src/data/migrations.ts | 30 +- server/src/data/schema.ts | 14 +- server/src/index.ts | 24 +- server/src/lib/nodes.ts | 27 +- server/src/queues/changes.ts | 171 +++++++++++ server/src/routes/accounts.ts | 29 +- server/src/routes/workspaces.ts | 252 ++++++++++++---- server/src/sockets/socket-connection.ts | 62 +++- server/src/sockets/socket-manager.ts | 12 + server/src/sync/node-collaborators.ts | 192 ++++++++++--- server/src/sync/node-reactions.ts | 111 ++++++-- server/src/sync/nodes.ts | 169 +++++++++-- server/src/types/cdc.ts | 74 ----- server/src/types/sync.ts | 8 +- server/src/types/workspaces.ts | 24 +- server/tsconfig.json | 2 +- 33 files changed, 1356 insertions(+), 976 deletions(-) delete mode 100644 server/src/consumers/change-cdc.ts delete mode 100644 server/src/consumers/node-cdc.ts delete mode 100644 server/src/consumers/node-collaborator-cdc.ts delete mode 100644 server/src/consumers/node-reaction-cdc.ts delete mode 100644 server/src/data/kafka.ts create mode 100644 server/src/queues/changes.ts delete mode 100644 server/src/types/cdc.ts diff --git a/desktop/src/main/data/app/migrations.ts b/desktop/src/main/data/app/migrations.ts index 836d311e..7d932a4a 100644 --- a/desktop/src/main/data/app/migrations.ts +++ b/desktop/src/main/data/app/migrations.ts @@ -12,14 +12,7 @@ const createServersTable: Migration = { .addColumn('created_at', 'text', (col) => col.notNull()) .addColumn('last_synced_at', 'text') .execute(); - }, - down: async (db) => { - await db.schema.dropTable('servers').execute(); - }, -}; -const insertNeuronServers: Migration = { - up: async (db) => { await db .insertInto('servers') .values([ @@ -51,10 +44,7 @@ const insertNeuronServers: Migration = { .execute(); }, down: async (db) => { - await db - .deleteFrom('servers') - .where('id', 'in', ['localhost', 'eu.neuronapp.io', 'us.neuronapp.io']) - .execute(); + await db.schema.dropTable('servers').execute(); }, }; @@ -91,7 +81,6 @@ const createWorkspacesTable: Migration = { .addColumn('avatar', 'text') .addColumn('version_id', 'text', (col) => col.notNull()) .addColumn('role', 'text', (col) => col.notNull()) - .addColumn('synced', 'integer') .execute(); }, down: async (db) => { @@ -101,7 +90,6 @@ const createWorkspacesTable: Migration = { export const appDatabaseMigrations: Record = { '00001_create_servers_table': createServersTable, - '00002_insert_neuron_servers': insertNeuronServers, - '00003_create_accounts_table': createAccountsTable, - '00004_create_workspaces_table': createWorkspacesTable, + '00002_create_accounts_table': createAccountsTable, + '00003_create_workspaces_table': createWorkspacesTable, }; diff --git a/desktop/src/main/data/app/schema.ts b/desktop/src/main/data/app/schema.ts index c1c7d42e..b305d454 100644 --- a/desktop/src/main/data/app/schema.ts +++ b/desktop/src/main/data/app/schema.ts @@ -38,7 +38,6 @@ interface WorkspaceTable { avatar: ColumnType; version_id: ColumnType; role: ColumnType; - synced: ColumnType; } export type SelectWorkspace = Selectable; diff --git a/desktop/src/main/handlers/mutations/email-login.ts b/desktop/src/main/handlers/mutations/email-login.ts index d84429e5..1ea49b59 100644 --- a/desktop/src/main/handlers/mutations/email-login.ts +++ b/desktop/src/main/handlers/mutations/email-login.ts @@ -66,10 +66,10 @@ export class EmailLoginMutationHandler name: workspace.name, account_id: data.account.id, avatar: workspace.avatar, - role: workspace.role, + role: workspace.user.role, description: workspace.description, synced: 0, - user_id: workspace.userId, + user_id: workspace.user.id, version_id: workspace.versionId, })), ) @@ -79,6 +79,37 @@ export class EmailLoginMutationHandler type: 'app', table: 'workspaces', }); + + for (const workspace of data.workspaces) { + const workspaceDatabase = await databaseManager.getWorkspaceDatabase( + workspace.id, + ); + + const user = workspace.user.node; + await workspaceDatabase + .insertInto('nodes') + .values({ + id: user.id, + attributes: JSON.stringify(user.attributes), + state: user.state, + created_at: user.createdAt, + created_by: user.createdBy, + updated_at: user.updatedAt, + updated_by: user.updatedBy, + server_created_at: user.serverCreatedAt, + server_updated_at: user.serverUpdatedAt, + version_id: user.versionId, + server_version_id: user.versionId, + }) + .onConflict((cb) => cb.doNothing()) + .execute(); + + changedTables.push({ + type: 'workspace', + table: 'nodes', + userId: workspace.user.id, + }); + } }); return { diff --git a/desktop/src/main/handlers/mutations/email-register.ts b/desktop/src/main/handlers/mutations/email-register.ts index 0eec7cf3..f5e53d07 100644 --- a/desktop/src/main/handlers/mutations/email-register.ts +++ b/desktop/src/main/handlers/mutations/email-register.ts @@ -70,10 +70,10 @@ export class EmailRegisterMutationHandler name: workspace.name, account_id: data.account.id, avatar: workspace.avatar, - role: workspace.role, + role: workspace.user.role, description: workspace.description, synced: 0, - user_id: workspace.userId, + user_id: workspace.user.id, version_id: workspace.versionId, })), ) @@ -83,6 +83,37 @@ export class EmailRegisterMutationHandler type: 'app', table: 'workspaces', }); + + for (const workspace of data.workspaces) { + const workspaceDatabase = await databaseManager.getWorkspaceDatabase( + workspace.id, + ); + + const user = workspace.user.node; + await workspaceDatabase + .insertInto('nodes') + .values({ + id: user.id, + attributes: JSON.stringify(user.attributes), + state: user.state, + created_at: user.createdAt, + created_by: user.createdBy, + updated_at: user.updatedAt, + updated_by: user.updatedBy, + server_created_at: user.serverCreatedAt, + server_updated_at: user.serverUpdatedAt, + version_id: user.versionId, + server_version_id: user.versionId, + }) + .onConflict((cb) => cb.doNothing()) + .execute(); + + changedTables.push({ + type: 'workspace', + table: 'nodes', + userId: workspace.user.id, + }); + } }); return { diff --git a/desktop/src/main/handlers/mutations/workspace-create.ts b/desktop/src/main/handlers/mutations/workspace-create.ts index 7a7238c9..cf374b89 100644 --- a/desktop/src/main/handlers/mutations/workspace-create.ts +++ b/desktop/src/main/handlers/mutations/workspace-create.ts @@ -6,7 +6,7 @@ import { MutationHandler, MutationResult, } from '@/operations/mutations'; -import { Workspace } from '@/types/workspaces'; +import { WorkspaceOutput } from '@/types/workspaces'; export class WorkspaceCreateMutationHandler implements MutationHandler @@ -39,7 +39,7 @@ export class WorkspaceCreateMutationHandler server.attributes, account.token, ); - const { data } = await axios.post(`/v1/workspaces`, { + const { data } = await axios.post(`/v1/workspaces`, { name: input.name, description: input.description, avatar: input.avatar, @@ -49,23 +49,50 @@ export class WorkspaceCreateMutationHandler .insertInto('workspaces') .values({ workspace_id: data.id ?? data.id, - account_id: data.accountId, + account_id: data.user.accountId, name: data.name, description: data.description, avatar: data.avatar, - role: data.role, - synced: 0, - user_id: data.userId, + role: data.user.role, + user_id: data.user.id, version_id: data.versionId, }) .onConflict((cb) => cb.doNothing()) .execute(); + const workspaceDatabase = await databaseManager.getWorkspaceDatabase( + data.user.id, + ); + + const user = data.user.node; + await workspaceDatabase + .insertInto('nodes') + .values({ + id: user.id, + attributes: JSON.stringify(user.attributes), + state: user.state, + created_at: user.createdAt, + created_by: user.createdBy, + updated_at: user.updatedAt, + updated_by: user.updatedBy, + server_created_at: user.serverCreatedAt, + server_updated_at: user.serverUpdatedAt, + version_id: user.versionId, + server_version_id: user.versionId, + }) + .onConflict((cb) => cb.doNothing()) + .execute(); + const changedTables: MutationChange[] = [ { type: 'app', table: 'workspaces', }, + { + type: 'workspace', + table: 'nodes', + userId: user.id, + }, ]; return { diff --git a/desktop/src/main/handlers/queries/workspace-list.ts b/desktop/src/main/handlers/queries/workspace-list.ts index 5bb39068..08a0c736 100644 --- a/desktop/src/main/handlers/queries/workspace-list.ts +++ b/desktop/src/main/handlers/queries/workspace-list.ts @@ -85,7 +85,6 @@ export class WorkspaceListQueryHandler accountId: row.account_id, role: row.role as WorkspaceRole, userId: row.user_id, - synced: row.synced === 1, }; }); } diff --git a/desktop/src/types/accounts.ts b/desktop/src/types/accounts.ts index 2996db64..4802265d 100644 --- a/desktop/src/types/accounts.ts +++ b/desktop/src/types/accounts.ts @@ -1,8 +1,8 @@ -import { Workspace } from '@/types/workspaces'; +import { WorkspaceOutput } from '@/types/workspaces'; export type LoginOutput = { account: Account; - workspaces: Workspace[]; + workspaces: WorkspaceOutput[]; }; export type Account = { diff --git a/desktop/src/types/workspaces.ts b/desktop/src/types/workspaces.ts index 1882cff3..a3e1229c 100644 --- a/desktop/src/types/workspaces.ts +++ b/desktop/src/types/workspaces.ts @@ -16,7 +16,22 @@ export type Workspace = { accountId: string; role: WorkspaceRole; userId: string; - synced: boolean; +}; + +export type WorkspaceOutput = { + id: string; + name: string; + description?: string | null; + avatar?: string | null; + versionId: string; + user: WorkspaceUserOutput; +}; + +export type WorkspaceUserOutput = { + id: string; + accountId: string; + role: WorkspaceRole; + node: ServerNode; }; export type WorkspaceAccountsInviteOutput = { diff --git a/server/package-lock.json b/server/package-lock.json index 3df12b5d..e2dd4de4 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -12,6 +12,7 @@ "@aws-sdk/client-s3": "^3.668.0", "axios": "^1.7.7", "bcrypt": "^5.1.1", + "bullmq": "^5.19.0", "cors": "^2.8.5", "express": "^4.21.1", "js-base64": "^3.7.7", @@ -1339,6 +1340,12 @@ "url": "https://opencollective.com/libvips" } }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==", + "license": "MIT" + }, "node_modules/@jridgewell/resolve-uri": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/@jridgewell/resolve-uri/-/resolve-uri-3.1.2.tgz", @@ -1387,6 +1394,84 @@ "node-pre-gyp": "bin/node-pre-gyp" } }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-arm64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.3.tgz", + "integrity": "sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==", + "cpu": [ + "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.3.tgz", + "integrity": "sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.3.tgz", + "integrity": "sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==", + "cpu": [ + "arm" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.3.tgz", + "integrity": "sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==", + "cpu": [ + "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.3.tgz", + "integrity": "sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-win32-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.3.tgz", + "integrity": "sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "win32" + ] + }, "node_modules/@redis/bloom": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz", @@ -2663,6 +2748,21 @@ "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==", "license": "MIT" }, + "node_modules/bullmq": { + "version": "5.19.0", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.19.0.tgz", + "integrity": "sha512-S6ZxVqPgzvKVkGjUN5Qwi0bDgM2aZPKsgJ8ESe5gUOOt3APDRPfDAzrkUz1FkTd1nfgc3HFBN8MCipWDGTdFGA==", + "license": "MIT", + "dependencies": { + "cron-parser": "^4.6.0", + "ioredis": "^5.4.1", + "msgpackr": "^1.10.1", + "node-abort-controller": "^3.1.1", + "semver": "^7.5.4", + "tslib": "^2.0.0", + "uuid": "^9.0.0" + } + }, "node_modules/busboy": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/busboy/-/busboy-1.6.0.tgz", @@ -3033,6 +3133,18 @@ "dev": true, "license": "MIT" }, + "node_modules/cron-parser": { + "version": "4.9.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", + "integrity": "sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==", + "license": "MIT", + "dependencies": { + "luxon": "^3.2.1" + }, + "engines": { + "node": ">=12.0.0" + } + }, "node_modules/debug": { "version": "2.6.9", "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", @@ -3074,6 +3186,15 @@ "integrity": "sha512-bd2L678uiWATM6m5Z1VzNCErI3jiGzt6HGY8OVICs40JQq/HALfbyNJmp0UDakEY4pMMaN0Ly5om/B1VI/+xfQ==", "license": "MIT" }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -3637,6 +3758,53 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "license": "ISC" }, + "node_modules/ioredis": { + "version": "5.4.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.4.1.tgz", + "integrity": "sha512-2YZsvl7jopIa1gaePkeMtd9rAcSjOOjPtpcLlOeusyO+XH2SK5ZcT+UCrElPP+WVIInh2TzeI4XW9ENaSLVVHA==", + "license": "MIT", + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, + "node_modules/ioredis/node_modules/debug": { + "version": "4.3.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz", + "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/ioredis/node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", + "license": "MIT" + }, "node_modules/ipaddr.js": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -3794,6 +3962,27 @@ "dev": true, "license": "MIT" }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "license": "MIT" + }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "license": "MIT" + }, + "node_modules/luxon": { + "version": "3.5.0", + "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.5.0.tgz", + "integrity": "sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ==", + "license": "MIT", + "engines": { + "node": ">=12" + } + }, "node_modules/make-dir": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/make-dir/-/make-dir-3.1.0.tgz", @@ -3958,6 +4147,37 @@ "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==", "license": "MIT" }, + "node_modules/msgpackr": { + "version": "1.11.0", + "resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.11.0.tgz", + "integrity": "sha512-I8qXuuALqJe5laEBYoFykChhSXLikZmUhccjGsPuSJ/7uPip2TJ7lwdIQwWSAi0jGZDXv4WOP8Qg65QZRuXxXw==", + "license": "MIT", + "optionalDependencies": { + "msgpackr-extract": "^3.0.2" + } + }, + "node_modules/msgpackr-extract": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/msgpackr-extract/-/msgpackr-extract-3.0.3.tgz", + "integrity": "sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==", + "hasInstallScript": true, + "license": "MIT", + "optional": true, + "dependencies": { + "node-gyp-build-optional-packages": "5.2.2" + }, + "bin": { + "download-msgpackr-prebuilds": "bin/download-prebuilds.js" + }, + "optionalDependencies": { + "@msgpackr-extract/msgpackr-extract-darwin-arm64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-darwin-x64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-arm": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-arm64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-x64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.3" + } + }, "node_modules/multer": { "version": "1.4.5-lts.1", "resolved": "https://registry.npmjs.org/multer/-/multer-1.4.5-lts.1.tgz", @@ -3997,6 +4217,12 @@ "node": ">= 0.6" } }, + "node_modules/node-abort-controller": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz", + "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==", + "license": "MIT" + }, "node_modules/node-addon-api": { "version": "5.1.0", "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-5.1.0.tgz", @@ -4023,6 +4249,21 @@ } } }, + "node_modules/node-gyp-build-optional-packages": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.2.2.tgz", + "integrity": "sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==", + "license": "MIT", + "optional": true, + "dependencies": { + "detect-libc": "^2.0.1" + }, + "bin": { + "node-gyp-build-optional-packages": "bin.js", + "node-gyp-build-optional-packages-optional": "optional.js", + "node-gyp-build-optional-packages-test": "build-test.js" + } + }, "node_modules/nodemon": { "version": "3.1.7", "resolved": "https://registry.npmjs.org/nodemon/-/nodemon-3.1.7.tgz", @@ -4490,6 +4731,27 @@ "@redis/time-series": "1.1.0" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "license": "MIT", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/require-directory": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz", @@ -4751,6 +5013,12 @@ "node": ">= 10.x" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "license": "MIT" + }, "node_modules/statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", diff --git a/server/package.json b/server/package.json index af853eac..d4e220f6 100644 --- a/server/package.json +++ b/server/package.json @@ -32,6 +32,7 @@ "@aws-sdk/client-s3": "^3.668.0", "axios": "^1.7.7", "bcrypt": "^5.1.1", + "bullmq": "^5.19.0", "cors": "^2.8.5", "express": "^4.21.1", "js-base64": "^3.7.7", diff --git a/server/src/consumers/change-cdc.ts b/server/src/consumers/change-cdc.ts deleted file mode 100644 index 7324c7f6..00000000 --- a/server/src/consumers/change-cdc.ts +++ /dev/null @@ -1,59 +0,0 @@ -import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka'; -import { CdcMessage, ChangeCdcData } from '@/types/cdc'; -import { redis, CHANNEL_NAMES } from '@/data/redis'; -import { PostgresOperation } from '@/lib/constants'; - -export const initChangeCdcConsumer = async () => { - const consumer = kafka.consumer({ groupId: CONSUMER_IDS.CHANGE_CDC }); - - await consumer.connect(); - await consumer.subscribe({ topic: TOPIC_NAMES.CHANGE_CDC }); - - await consumer.run({ - eachMessage: async ({ message }) => { - if (!message || !message.value) { - return; - } - - const change = JSON.parse( - message.value.toString(), - ) as CdcMessage; - - await handleChangeCdc(change); - }, - }); -}; - -const handleChangeCdc = async (change: CdcMessage) => { - switch (change.op) { - case PostgresOperation.CREATE: { - await handleChangeCreate(change); - break; - } - case PostgresOperation.UPDATE: { - await handleChangeUpdate(change); - break; - } - case PostgresOperation.DELETE: { - await handleChangeDelete(change); - break; - } - } -}; - -const handleChangeCreate = async (change: CdcMessage) => { - const changeData = change.after; - if (!changeData) { - return; - } - - await redis.publish(CHANNEL_NAMES.CHANGES, JSON.stringify(changeData)); -}; - -const handleChangeUpdate = async (change: CdcMessage) => { - console.log('Change update:', change.after?.id); -}; - -const handleChangeDelete = async (change: CdcMessage) => { - console.log('Change delete:', change.before?.id); -}; diff --git a/server/src/consumers/changes-subcriber.ts b/server/src/consumers/changes-subcriber.ts index 5680e51d..1fd7f019 100644 --- a/server/src/consumers/changes-subcriber.ts +++ b/server/src/consumers/changes-subcriber.ts @@ -1,8 +1,11 @@ +import { database } from '@/data/database'; import { redis, CHANNEL_NAMES } from '@/data/redis'; import { socketManager } from '@/sockets/socket-manager'; -import { ServerChange } from '@/types/sync'; -import { ChangeCdcData } from '@/types/cdc'; -import { ServerChangeMessageInput } from '@/messages/server-change'; +import { + ServerChange, + ServerChangeBroadcastMessage, + ServerChangeData, +} from '@/types/sync'; export const initChangesSubscriber = async () => { const subscriber = redis.duplicate(); @@ -11,20 +14,41 @@ export const initChangesSubscriber = async () => { }; const handleMessage = async (message: string) => { - const changeData = JSON.parse(message) as ChangeCdcData; + const data: ServerChangeBroadcastMessage = JSON.parse(message); + if (!data.deviceIds) { + return; + } + + if (!data.changeId) { + return; + } + + const connections = socketManager.getConnections(data.deviceIds); + if (connections.length === 0) { + return; + } + + const change = await database + .selectFrom('changes') + .selectAll() + .where('id', '=', data.changeId) + .executeTakeFirst(); + + if (!change) { + return; + } const serverChange: ServerChange = { - id: changeData.id, - workspaceId: changeData.workspace_id, - deviceId: changeData.device_id, - data: JSON.parse(changeData.data), - createdAt: changeData.created_at, + id: change.id, + workspaceId: change.workspace_id, + data: change.data as ServerChangeData, + createdAt: change.created_at.toISOString(), }; - const input: ServerChangeMessageInput = { - type: 'server_change', - change: serverChange, - }; - - socketManager.send(changeData.device_id, input); + connections.forEach((connection) => { + connection.send({ + type: 'server_change', + change: serverChange, + }); + }); }; diff --git a/server/src/consumers/node-cdc.ts b/server/src/consumers/node-cdc.ts deleted file mode 100644 index 732503aa..00000000 --- a/server/src/consumers/node-cdc.ts +++ /dev/null @@ -1,178 +0,0 @@ -import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka'; -import { CdcMessage, NodeCdcData } from '@/types/cdc'; -import { PostgresOperation } from '@/lib/constants'; -import { database } from '@/data/database'; -import { generateId, IdType } from '@/lib/id'; -import { - ServerNodeCreateChangeData, - ServerNodeDeleteChangeData, - ServerNodeUpdateChangeData, -} from '@/types/sync'; - -export const initNodeChangesConsumer = async () => { - const consumer = kafka.consumer({ groupId: CONSUMER_IDS.NODE_CDC }); - - await consumer.connect(); - await consumer.subscribe({ topic: TOPIC_NAMES.NODE_CDC }); - - await consumer.run({ - eachMessage: async ({ message }) => { - if (!message || !message.value) { - return; - } - - const change = JSON.parse( - message.value.toString(), - ) as CdcMessage; - - await handleNodeCdc(change); - }, - }); -}; - -const handleNodeCdc = async (change: CdcMessage) => { - switch (change.op) { - case PostgresOperation.CREATE: { - await handleNodeCreate(change); - break; - } - case PostgresOperation.UPDATE: { - await handleNodeUpdate(change); - break; - } - case PostgresOperation.DELETE: { - await handleNodeDelete(change); - break; - } - } -}; - -const handleNodeCreate = async (change: CdcMessage) => { - const node = change.after; - if (!node) { - return; - } - - const deviceIds = await getDeviceIds(node.workspace_id); - if (deviceIds.length == 0) { - return; - } - - const data: ServerNodeCreateChangeData = { - type: 'node_create', - id: node.id, - workspaceId: node.workspace_id, - state: node.state, - createdAt: node.created_at, - createdBy: node.created_by, - serverCreatedAt: node.server_created_at, - versionId: node.version_id, - }; - - await database - .insertInto('changes') - .values( - deviceIds.map((deviceId) => { - return { - id: generateId(IdType.Change), - device_id: deviceId, - workspace_id: node.workspace_id, - data: JSON.stringify(data), - created_at: new Date(), - retry_count: 0, - }; - }), - ) - .execute(); -}; - -const handleNodeUpdate = async (change: CdcMessage) => { - const node = change.after; - if (!node) { - return; - } - - const deviceIds = await getDeviceIds(node.workspace_id); - if (deviceIds.length == 0) { - return; - } - - const data: ServerNodeUpdateChangeData = { - type: 'node_update', - id: node.id, - workspaceId: node.workspace_id, - update: node.state, - updatedAt: node.updated_at ?? new Date().toISOString(), - updatedBy: node.updated_by ?? node.created_by, - serverUpdatedAt: node.server_updated_at ?? new Date().toISOString(), - versionId: node.version_id, - }; - - await database - .insertInto('changes') - .values( - deviceIds.map((deviceId) => { - return { - id: generateId(IdType.Change), - device_id: deviceId, - workspace_id: node.workspace_id, - data: JSON.stringify(data), - created_at: new Date(), - retry_count: 0, - }; - }), - ) - .execute(); -}; - -const handleNodeDelete = async (change: CdcMessage) => { - const node = change.before; - if (!node) { - return; - } - - const deviceIds = await getDeviceIds(node.workspace_id); - if (deviceIds.length == 0) { - return; - } - - const data: ServerNodeDeleteChangeData = { - type: 'node_delete', - id: node.id, - workspaceId: node.workspace_id, - }; - - await database - .insertInto('changes') - .values( - deviceIds.map((deviceId) => { - return { - id: generateId(IdType.Change), - device_id: deviceId, - workspace_id: node.workspace_id, - data: JSON.stringify(data), - created_at: new Date(), - retry_count: 0, - }; - }), - ) - .execute(); -}; - -const getDeviceIds = async (workspaceId: string) => { - const accountDevices = await database - .selectFrom('account_devices') - .where( - 'account_id', - 'in', - database - .selectFrom('workspace_users') - .where('workspace_id', '=', workspaceId) - .select('account_id'), - ) - .select('id') - .execute(); - - const deviceIds = accountDevices.map((account) => account.id); - return deviceIds; -}; diff --git a/server/src/consumers/node-collaborator-cdc.ts b/server/src/consumers/node-collaborator-cdc.ts deleted file mode 100644 index 075897cd..00000000 --- a/server/src/consumers/node-collaborator-cdc.ts +++ /dev/null @@ -1,192 +0,0 @@ -import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka'; -import { CdcMessage, NodeCollaboratorCdcData } from '@/types/cdc'; -import { PostgresOperation } from '@/lib/constants'; -import { database } from '@/data/database'; -import { generateId, IdType } from '@/lib/id'; -import { - ServerNodeCollaboratorCreateChangeData, - ServerNodeCollaboratorDeleteChangeData, - ServerNodeCollaboratorUpdateChangeData, -} from '@/types/sync'; - -export const initNodeCollaboratorChangesConsumer = async () => { - const consumer = kafka.consumer({ - groupId: CONSUMER_IDS.NODE_COLLABORATOR_CDC, - }); - - await consumer.connect(); - await consumer.subscribe({ topic: TOPIC_NAMES.NODE_COLLABORATOR_CDC }); - - await consumer.run({ - eachMessage: async ({ message }) => { - if (!message || !message.value) { - return; - } - - const change = JSON.parse( - message.value.toString(), - ) as CdcMessage; - - await handleNodeCollaboratorCdc(change); - }, - }); -}; - -const handleNodeCollaboratorCdc = async ( - change: CdcMessage, -) => { - switch (change.op) { - case PostgresOperation.CREATE: { - await handleNodeCollaboratorCreate(change); - break; - } - case PostgresOperation.UPDATE: { - await handleNodeCollaboratorUpdate(change); - break; - } - case PostgresOperation.DELETE: { - await handleNodeCollaboratorDelete(change); - break; - } - } -}; - -const handleNodeCollaboratorCreate = async ( - change: CdcMessage, -) => { - const nodeCollaborator = change.after; - if (!nodeCollaborator) { - return; - } - - const deviceIds = await getDeviceIds(nodeCollaborator.workspace_id); - if (deviceIds.length == 0) { - return; - } - - const data: ServerNodeCollaboratorCreateChangeData = { - type: 'node_collaborator_create', - nodeId: nodeCollaborator.node_id, - collaboratorId: nodeCollaborator.collaborator_id, - role: nodeCollaborator.role, - workspaceId: nodeCollaborator.workspace_id, - createdAt: nodeCollaborator.created_at, - createdBy: nodeCollaborator.created_by, - versionId: nodeCollaborator.version_id, - serverCreatedAt: nodeCollaborator.server_created_at, - }; - - await database - .insertInto('changes') - .values( - deviceIds.map((deviceId) => { - return { - id: generateId(IdType.Change), - device_id: deviceId, - workspace_id: nodeCollaborator.workspace_id, - data: JSON.stringify(data), - created_at: new Date(), - retry_count: 0, - }; - }), - ) - .execute(); -}; - -const handleNodeCollaboratorUpdate = async ( - change: CdcMessage, -) => { - const nodeCollaborator = change.after; - if (!nodeCollaborator) { - return; - } - - const deviceIds = await getDeviceIds(nodeCollaborator.workspace_id); - if (deviceIds.length == 0) { - return; - } - - const data: ServerNodeCollaboratorUpdateChangeData = { - type: 'node_collaborator_update', - nodeId: nodeCollaborator.node_id, - collaboratorId: nodeCollaborator.collaborator_id, - role: nodeCollaborator.role, - workspaceId: nodeCollaborator.workspace_id, - updatedAt: nodeCollaborator.updated_at ?? new Date().toISOString(), - updatedBy: nodeCollaborator.updated_by ?? nodeCollaborator.created_by, - versionId: nodeCollaborator.version_id, - serverUpdatedAt: - nodeCollaborator.server_updated_at ?? new Date().toISOString(), - }; - - await database - .insertInto('changes') - .values( - deviceIds.map((deviceId) => { - return { - id: generateId(IdType.Change), - device_id: deviceId, - workspace_id: nodeCollaborator.workspace_id, - data: JSON.stringify(data), - created_at: new Date(), - retry_count: 0, - }; - }), - ) - .execute(); -}; - -const handleNodeCollaboratorDelete = async ( - change: CdcMessage, -) => { - const nodeCollaborator = change.before; - if (!nodeCollaborator) { - return; - } - - const deviceIds = await getDeviceIds(nodeCollaborator.workspace_id); - if (deviceIds.length == 0) { - return; - } - - const data: ServerNodeCollaboratorDeleteChangeData = { - type: 'node_collaborator_delete', - nodeId: nodeCollaborator.node_id, - collaboratorId: nodeCollaborator.collaborator_id, - workspaceId: nodeCollaborator.workspace_id, - }; - - await database - .insertInto('changes') - .values( - deviceIds.map((deviceId) => { - return { - id: generateId(IdType.Change), - device_id: deviceId, - workspace_id: nodeCollaborator.workspace_id, - data: JSON.stringify(data), - created_at: new Date(), - retry_count: 0, - }; - }), - ) - .execute(); -}; - -const getDeviceIds = async (workspaceId: string) => { - const accountDevices = await database - .selectFrom('account_devices') - .where( - 'account_id', - 'in', - database - .selectFrom('workspace_users') - .where('workspace_id', '=', workspaceId) - .select('account_id'), - ) - .select('id') - .execute(); - - const deviceIds = accountDevices.map((account) => account.id); - return deviceIds; -}; diff --git a/server/src/consumers/node-reaction-cdc.ts b/server/src/consumers/node-reaction-cdc.ts deleted file mode 100644 index f408c6f3..00000000 --- a/server/src/consumers/node-reaction-cdc.ts +++ /dev/null @@ -1,143 +0,0 @@ -import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka'; -import { CdcMessage, NodeReactionCdcData } from '@/types/cdc'; -import { PostgresOperation } from '@/lib/constants'; -import { database } from '@/data/database'; -import { generateId, IdType } from '@/lib/id'; -import { - ServerNodeReactionCreateChangeData, - ServerNodeReactionDeleteChangeData, -} from '@/types/sync'; - -export const initNodeReactionChangesConsumer = async () => { - const consumer = kafka.consumer({ - groupId: CONSUMER_IDS.NODE_REACTION_CDC, - }); - - await consumer.connect(); - await consumer.subscribe({ topic: TOPIC_NAMES.NODE_REACTION_CDC }); - - await consumer.run({ - eachMessage: async ({ message }) => { - if (!message || !message.value) { - return; - } - - const change = JSON.parse( - message.value.toString(), - ) as CdcMessage; - - await handleNodeReactionCdc(change); - }, - }); -}; - -const handleNodeReactionCdc = async ( - change: CdcMessage, -) => { - switch (change.op) { - case PostgresOperation.CREATE: { - await handleNodeReactionCreate(change); - break; - } - case PostgresOperation.DELETE: { - await handleNodeReactionDelete(change); - break; - } - } -}; - -const handleNodeReactionCreate = async ( - change: CdcMessage, -) => { - const reaction = change.after; - if (!reaction) { - return; - } - - const deviceIds = await getDeviceIds(reaction.workspace_id); - if (deviceIds.length == 0) { - return; - } - - const data: ServerNodeReactionCreateChangeData = { - type: 'node_reaction_create', - nodeId: reaction.node_id, - actorId: reaction.actor_id, - reaction: reaction.reaction, - workspaceId: reaction.workspace_id, - createdAt: reaction.created_at, - serverCreatedAt: reaction.server_created_at, - }; - - await database - .insertInto('changes') - .values( - deviceIds.map((deviceId) => { - return { - id: generateId(IdType.Change), - device_id: deviceId, - workspace_id: reaction.workspace_id, - data: JSON.stringify(data), - created_at: new Date(), - retry_count: 0, - }; - }), - ) - .execute(); -}; - -const handleNodeReactionDelete = async ( - change: CdcMessage, -) => { - const reaction = change.before; - if (!reaction) { - return; - } - - const deviceIds = await getDeviceIds(reaction.workspace_id); - if (deviceIds.length == 0) { - return; - } - - const data: ServerNodeReactionDeleteChangeData = { - type: 'node_reaction_delete', - nodeId: reaction.node_id, - actorId: reaction.actor_id, - reaction: reaction.reaction, - workspaceId: reaction.workspace_id, - }; - - await database - .insertInto('changes') - .values( - deviceIds.map((deviceId) => { - return { - id: generateId(IdType.Change), - device_id: deviceId, - workspace_id: reaction.workspace_id, - data: JSON.stringify(data), - created_at: new Date(), - retry_count: 0, - }; - }), - ) - .execute(); -}; - -const getDeviceIds = async (workspaceId: string) => { - const accountDevices = await database - .selectFrom('account_devices') - .where( - 'account_id', - 'in', - database - .selectFrom('workspace_users') - .where('workspace_id', '=', workspaceId) - .select('account_id'), - ) - .select('id') - .execute(); - - const deviceIds = accountDevices.map((account) => account.id); - return deviceIds; -}; diff --git a/server/src/data/database.ts b/server/src/data/database.ts index d4ae0618..8bca02f7 100644 --- a/server/src/data/database.ts +++ b/server/src/data/database.ts @@ -1,4 +1,12 @@ -import { Kysely, Migration, Migrator, PostgresDialect } from 'kysely'; +import { + DeleteResult, + InsertResult, + Kysely, + Migration, + Migrator, + PostgresDialect, + UpdateResult, +} from 'kysely'; import { Pool } from 'pg'; import { DatabaseSchema } from '@/data/schema'; import { databaseMigrations } from '@/data/migrations'; @@ -25,3 +33,25 @@ export const migrate = async () => { await migrator.migrateToLatest(); }; + +export const hasInsertChanges = (result: InsertResult[]): boolean => { + if (result.length === 0) { + return false; + } + + return result.some( + (r) => r.numInsertedOrUpdatedRows && r.numInsertedOrUpdatedRows > 0n, + ); +}; + +export const hasUpdateChanges = (result: UpdateResult[]): boolean => { + if (result.length === 0) { + return false; + } + + return result.some((r) => r.numUpdatedRows && r.numUpdatedRows > 0n); +}; + +export const hasDeleteChanges = (result: DeleteResult[]): boolean => { + return result.some((r) => r.numDeletedRows && r.numDeletedRows > 0n); +}; diff --git a/server/src/data/kafka.ts b/server/src/data/kafka.ts deleted file mode 100644 index cb34a357..00000000 --- a/server/src/data/kafka.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { Kafka } from 'kafkajs'; - -const KAFKA_CLIENT_ID = process.env.KAFKA_CLIENT_ID ?? 'neuron'; -const KAFKA_BROKERS = process.env.KAFKA_BROKERS ?? ''; -const KAFKA_USERNAME = process.env.KAFKA_USERNAME; -const KAFKA_PASSWORD = process.env.KAFKA_PASSWORD; - -export const kafka = new Kafka({ - clientId: KAFKA_CLIENT_ID, - brokers: KAFKA_BROKERS.split(','), - sasl: - KAFKA_USERNAME && KAFKA_PASSWORD - ? { - username: KAFKA_USERNAME, - password: KAFKA_PASSWORD, - mechanism: 'plain', - } - : undefined, -}); - -export const producer = kafka.producer(); - -export const TOPIC_NAMES = { - NODE_CDC: process.env.KAFKA_NODE_CDC_TOPIC_NAME ?? 'neuron_node_cdc', - NODE_COLLABORATOR_CDC: - process.env.KAFKA_NODE_COLLABORATOR_CDC_TOPIC_NAME ?? - 'neuron_node_collaborator_cdc', - NODE_REACTION_CDC: - process.env.KAFKA_NODE_REACTION_CDC_TOPIC_NAME ?? - 'neuron_node_reaction_cdc', - CHANGE_CDC: process.env.KAFKA_CHANGE_CDC_TOPIC_NAME ?? 'neuron_change_cdc', -}; - -export const CONSUMER_IDS = { - NODE_CDC: - process.env.KAFKA_NODE_CDC_CONSUMER_ID ?? 'neuron_node_cdc_consumer', - NODE_COLLABORATOR_CDC: - process.env.KAFKA_NODE_COLLABORATOR_CDC_CONSUMER_ID ?? - 'neuron_node_collaborator_cdc_consumer', - NODE_REACTION_CDC: - process.env.KAFKA_NODE_REACTION_CDC_CONSUMER_ID ?? - 'neuron_node_reaction_cdc_consumer', - CHANGE_CDC: - process.env.KAFKA_CHANGE_CDC_CONSUMER_ID ?? 'neuron_change_cdc_consumer', -}; - -const connectProducer = async () => { - await producer.connect(); - console.log('Kafka Producer connected'); -}; - -connectProducer().catch((err) => { - console.error('Failed to connect Kafka Producer', err); -}); diff --git a/server/src/data/migrations.ts b/server/src/data/migrations.ts index efa07310..0b4241a2 100644 --- a/server/src/data/migrations.ts +++ b/server/src/data/migrations.ts @@ -192,17 +192,10 @@ const createChangesTable: Migration = { await db.schema .createTable('changes') .addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey()) - .addColumn('device_id', 'varchar(30)', (col) => col.notNull()) .addColumn('workspace_id', 'varchar(30)', (col) => col.notNull()) .addColumn('data', 'jsonb') .addColumn('created_at', 'timestamptz', (col) => col.notNull()) - .addColumn('retry_count', 'integer', (col) => col.notNull().defaultTo(0)) - .execute(); - - await db.schema - .createIndex('changes_device_id_index') - .on('changes') - .column('device_id') + .addColumn('notified_at', 'timestamptz') .execute(); }, down: async (db) => { @@ -210,6 +203,26 @@ const createChangesTable: Migration = { }, }; +const createChangeDevicesTable: Migration = { + up: async (db) => { + await db.schema + .createTable('change_devices') + .addColumn('change_id', 'varchar(30)', (col) => + col.notNull().references('changes.id').onDelete('cascade'), + ) + .addColumn('device_id', 'varchar(30)', (col) => col.notNull()) + .addColumn('retry_count', 'integer', (col) => col.notNull().defaultTo(0)) + .addPrimaryKeyConstraint('change_devices_pkey', [ + 'change_id', + 'device_id', + ]) + .execute(); + }, + down: async (db) => { + await db.schema.dropTable('change_devices').execute(); + }, +}; + export const databaseMigrations: Record = { '00001_create_accounts_table': createAccountsTable, '00002_create_workspaces_table': createWorkspacesTable, @@ -219,4 +232,5 @@ export const databaseMigrations: Record = { '00006_create_node_reactions_table': createNodeReactionsTable, '00007_create_account_devices_table': createAccountDevicesTable, '00008_create_changes_table': createChangesTable, + '00009_create_change_devices_table': createChangeDevicesTable, }; diff --git a/server/src/data/schema.ts b/server/src/data/schema.ts index 4e66cc58..abf1794b 100644 --- a/server/src/data/schema.ts +++ b/server/src/data/schema.ts @@ -139,17 +139,26 @@ export type UpdateNodeReaction = Updateable; interface ChangeTable { id: ColumnType; - device_id: ColumnType; workspace_id: ColumnType; data: JSONColumnType; created_at: ColumnType; - retry_count: ColumnType; + notified_at: ColumnType; } export type SelectChange = Selectable; export type CreateChange = Insertable; export type UpdateChange = Updateable; +interface ChangeDeviceTable { + change_id: ColumnType; + device_id: ColumnType; + retry_count: ColumnType; +} + +export type SelectChangeDevice = Selectable; +export type CreateChangeDevice = Insertable; +export type UpdateChangeDevice = Updateable; + export interface DatabaseSchema { accounts: AccountTable; workspaces: WorkspaceTable; @@ -159,4 +168,5 @@ export interface DatabaseSchema { node_collaborators: NodeCollaboratorTable; node_reactions: NodeReactionTable; changes: ChangeTable; + change_devices: ChangeDeviceTable; } diff --git a/server/src/index.ts b/server/src/index.ts index c9b79a8b..ee999d1a 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -1,33 +1,17 @@ import { initApi } from '@/api'; import { initRedis } from '@/data/redis'; -import { initNodeChangesConsumer } from '@/consumers/node-cdc'; -import { initChangeCdcConsumer } from '@/consumers/change-cdc'; import { initChangesSubscriber } from '@/consumers/changes-subcriber'; -import { initNodeCollaboratorChangesConsumer } from '@/consumers/node-collaborator-cdc'; -import { initNodeReactionChangesConsumer } from '@/consumers/node-reaction-cdc'; import { migrate } from '@/data/database'; +import { initChangeWorker } from '@/queues/changes'; migrate().then(() => { initApi(); - initNodeChangesConsumer().then(() => { - console.log('Node cdc consumer started'); - }); - - initNodeCollaboratorChangesConsumer().then(() => { - console.log('Node collaborator cdc consumer started'); - }); - - initNodeReactionChangesConsumer().then(() => { - console.log('Node reaction cdc consumer started'); - }); - - initChangeCdcConsumer().then(() => { - console.log('Change cdc consumer started'); - }); - initRedis().then(() => { console.log('Redis initialized'); + + initChangeWorker(); + initChangesSubscriber().then(() => { console.log('Change subscriber started'); }); diff --git a/server/src/lib/nodes.ts b/server/src/lib/nodes.ts index 47600421..b9efa8db 100644 --- a/server/src/lib/nodes.ts +++ b/server/src/lib/nodes.ts @@ -28,7 +28,11 @@ type NodeCollaboratorRow = { role: string; }; -export const getCollaboratorRole = async ( +type NodeIdRow = { + id: string; +}; + +export const fetchCollaboratorRole = async ( nodeId: string, collaboratorId: string, ): Promise => { @@ -65,3 +69,24 @@ export const getCollaboratorRole = async ( return highestRole; }; + +export const fetchNodeTree = async (nodeId: string): Promise => { + const query = sql` + WITH RECURSIVE ancestors(id, parent_id, level) AS ( + SELECT id, parent_id, 0 AS level + FROM nodes + WHERE id = ${nodeId} + UNION ALL + SELECT n.id, n.parent_id, a.level + 1 + FROM nodes n + INNER JOIN ancestors a ON n.id = a.parent_id + ) + SELECT n.id + FROM nodes n + JOIN ancestors a ON n.id = a.id + ORDER BY a.level DESC; + `.compile(database); + + const result = await database.executeQuery(query); + return result.rows.map((row) => row.id); +}; diff --git a/server/src/queues/changes.ts b/server/src/queues/changes.ts new file mode 100644 index 00000000..fdd6c4c2 --- /dev/null +++ b/server/src/queues/changes.ts @@ -0,0 +1,171 @@ +import { database } from '@/data/database'; +import { CHANNEL_NAMES, redis } from '@/data/redis'; +import { getIdType, IdType } from '@/lib/id'; +import { fetchNodeTree } from '@/lib/nodes'; +import { ServerChangeData } from '@/types/sync'; +import { Job, Queue, Worker } from 'bullmq'; + +const REDIS_HOST = process.env.REDIS_HOST; +const REDIS_PASSWORD = process.env.REDIS_PASSWORD; +const REDIS_PORT = process.env.REDIS_PORT; +const REDIS_DB = process.env.REDIS_DB; + +if (!REDIS_HOST || !REDIS_PASSWORD || !REDIS_PORT || !REDIS_DB) { + throw new Error('Redis configuration is missing'); +} + +export const queue = new Queue('changes', { + connection: { + host: REDIS_HOST, + password: REDIS_PASSWORD, + port: parseInt(REDIS_PORT), + db: parseInt(REDIS_DB), + }, + defaultJobOptions: { + removeOnComplete: true, + }, +}); + +export const enqueueChange = async (id: string): Promise => { + await queue.add('change', { id }, { jobId: id }); +}; + +export const enqueueChanges = async (ids: string[]): Promise => { + await queue.addBulk(ids.map((id) => ({ name: 'change', data: { id } }))); +}; + +export const initChangeWorker = () => { + return new Worker('changes', handleChangeJob, { + connection: { + host: REDIS_HOST, + password: REDIS_PASSWORD, + port: parseInt(REDIS_PORT), + db: parseInt(REDIS_DB), + }, + }); +}; + +const handleChangeJob = async (job: Job) => { + try { + const changeId = job.data.id; + const change = await database + .selectFrom('changes') + .selectAll() + .where('id', '=', changeId) + .executeTakeFirst(); + + if (!change) { + return; + } + + if (change.notified_at) { + console.log('change already notified'); + return; + } + + const changeData = change.data as ServerChangeData; + const nodeId = getNodeId(changeData); + if (!nodeId) { + return; + } + + const deviceIds = await fetchDevicesForNode(change.workspace_id, nodeId); + if (deviceIds.length === 0) { + return; + } + + await database.transaction().execute(async (trx) => { + await trx + .insertInto('change_devices') + .values( + deviceIds.map((deviceId) => ({ + change_id: changeId, + device_id: deviceId, + retry_count: 0, + })), + ) + .execute(); + + await trx + .updateTable('changes') + .set({ notified_at: new Date() }) + .where('id', '=', changeId) + .execute(); + }); + + redis.publish( + CHANNEL_NAMES.CHANGES, + JSON.stringify({ + changeId, + deviceIds, + }), + ); + } catch (error) { + console.error('error', error); + } +}; + +const getNodeId = (changeData: ServerChangeData): string | null => { + switch (changeData.type) { + case 'node_create': + return changeData.id; + case 'node_update': + return changeData.id; + case 'node_delete': + return changeData.id; + case 'node_collaborator_create': + return changeData.nodeId; + case 'node_collaborator_update': + return changeData.nodeId; + case 'node_collaborator_delete': + return changeData.nodeId; + case 'node_reaction_create': + return changeData.nodeId; + case 'node_reaction_delete': + return changeData.nodeId; + default: + return null; + } +}; + +const fetchDevicesForNode = async ( + workspaceId: string, + nodeId: string, +): Promise => { + const idType = getIdType(nodeId); + if (idType === IdType.User) { + return fetchAllWorkspaceDevices(workspaceId); + } + + const nodeTree = await fetchNodeTree(nodeId); + return fetchDevicesForNodes(nodeTree); +}; + +const fetchAllWorkspaceDevices = async ( + workspaceId: string, +): Promise => { + const deviceIds = await database + .selectFrom('workspace_users as wu') + .fullJoin('account_devices as ad', 'wu.account_id', 'ad.account_id') + .select('ad.id') + .where('wu.workspace_id', '=', workspaceId) + .execute(); + + return deviceIds + .map((row) => row.id) + .filter((id): id is string => id !== null); +}; + +const fetchDevicesForNodes = async (nodeIds: string[]): Promise => { + const deviceIds = await database + .selectFrom('node_collaborators as nc') + .fullJoin('workspace_users as wu', 'nc.collaborator_id', 'wu.id') + .fullJoin('account_devices as ad', 'wu.account_id', 'ad.account_id') + .select('ad.id') + .where('nc.node_id', 'in', nodeIds) + .execute(); + + return deviceIds + .map((row) => row.id) + .filter((id): id is string => id !== null); +}; diff --git a/server/src/routes/accounts.ts b/server/src/routes/accounts.ts index 443ec17e..7b137146 100644 --- a/server/src/routes/accounts.ts +++ b/server/src/routes/accounts.ts @@ -12,9 +12,10 @@ import { ApiError, NeuronRequest, NeuronResponse } from '@/types/api'; import { generateId, IdType } from '@/lib/id'; import { database } from '@/data/database'; import bcrypt from 'bcrypt'; -import { WorkspaceOutput } from '@/types/workspaces'; +import { WorkspaceOutput, WorkspaceRole } from '@/types/workspaces'; import { authMiddleware } from '@/middlewares/auth'; import { generateToken } from '@/lib/tokens'; +import { mapNode } from '@/lib/nodes'; const GoogleUserInfoUrl = 'https://www.googleapis.com/oauth2/v1/userinfo'; const SaltRounds = 10; @@ -233,31 +234,47 @@ const buildLoginOutput = async ( .execute(); const workspaceOutputs: WorkspaceOutput[] = []; - const workspaceIds = workspaceUsers.map((wa) => wa.workspace_id); - if (workspaceIds.length > 0) { + if (workspaceUsers.length > 0) { + const workspaceIds = workspaceUsers.map((wu) => wu.workspace_id); const workspaces = await database .selectFrom('workspaces') .where('id', 'in', workspaceIds) .selectAll() .execute(); + const userIds = workspaceUsers.map((wu) => wu.id); + const userNodes = await database + .selectFrom('nodes') + .selectAll() + .where('id', 'in', userIds) + .execute(); + for (const workspaceUser of workspaceUsers) { const workspace = workspaces.find( (w) => w.id === workspaceUser.workspace_id, ); + if (!workspace) { continue; } + const userNode = userNodes.find((n) => n.id === workspaceUser.id); + if (!userNode) { + continue; + } + workspaceOutputs.push({ id: workspace.id, name: workspace.name, - role: workspaceUser.role, - userId: workspaceUser.id, versionId: workspaceUser.version_id, - accountId: workspaceUser.account_id, avatar: workspace.avatar, description: workspace.description, + user: { + id: workspaceUser.id, + accountId: workspaceUser.account_id, + role: workspaceUser.role as WorkspaceRole, + node: mapNode(userNode), + }, }); } } diff --git a/server/src/routes/workspaces.ts b/server/src/routes/workspaces.ts index faba9f3d..7e4bc1a2 100644 --- a/server/src/routes/workspaces.ts +++ b/server/src/routes/workspaces.ts @@ -1,6 +1,4 @@ import { - Workspace, - WorkspaceUser, WorkspaceAccountRoleUpdateInput, WorkspaceAccountsInviteInput, WorkspaceUserStatus, @@ -17,6 +15,7 @@ import * as Y from 'yjs'; import { fromUint8Array, toUint8Array } from 'js-base64'; import { CreateAccount, + CreateChange, CreateNode, CreateWorkspaceUser, SelectNode, @@ -26,6 +25,11 @@ import { getNameFromEmail } from '@/lib/utils'; import { AccountStatus } from '@/types/accounts'; import { ServerNode } from '@/types/nodes'; import { mapNode } from '@/lib/nodes'; +import { + ServerNodeCreateChangeData, + ServerNodeUpdateChangeData, +} from '@/types/sync'; +import { enqueueChange, enqueueChanges } from '@/queues/changes'; export const workspacesRouter = Router(); @@ -59,16 +63,9 @@ workspacesRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => { }); } - const workspace: Workspace = { - id: generateId(IdType.Workspace), - name: input.name, - description: input.description, - avatar: input.avatar, - createdAt: new Date(), - createdBy: account.id, - status: WorkspaceStatus.Active, - versionId: generateId(IdType.Version), - }; + const createdAt = new Date(); + const workspaceId = generateId(IdType.Workspace); + const workspaceVersionId = generateId(IdType.Version); const userId = generateId(IdType.User); const userVersionId = generateId(IdType.Version); @@ -88,30 +85,44 @@ workspacesRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => { const userAttributes = JSON.stringify(userAttributesMap.toJSON()); const userState = fromUint8Array(Y.encodeStateAsUpdate(userDoc)); - - const workspaceUser: WorkspaceUser = { + const userChangeId = generateId(IdType.Change); + const changeData: ServerNodeCreateChangeData = { + type: 'node_create', id: userId, - accountId: account.id, - workspaceId: workspace.id, - role: WorkspaceRole.Owner, - createdAt: new Date(), + workspaceId: workspaceId, + state: userState, + createdAt: createdAt.toISOString(), + serverCreatedAt: createdAt.toISOString(), + versionId: userVersionId, createdBy: account.id, - status: WorkspaceUserStatus.Active, - versionId: generateId(IdType.Version), }; await database.transaction().execute(async (trx) => { await trx .insertInto('workspaces') .values({ - id: workspace.id, - name: workspace.name, - description: workspace.description, - avatar: workspace.avatar, - created_at: workspace.createdAt, - created_by: workspace.createdBy, - status: workspace.status, - version_id: workspace.versionId, + id: workspaceId, + name: input.name, + description: input.description, + avatar: input.avatar, + created_at: createdAt, + created_by: account.id, + status: WorkspaceStatus.Active, + version_id: workspaceVersionId, + }) + .execute(); + + await trx + .insertInto('workspace_users') + .values({ + id: userId, + account_id: account.id, + workspace_id: workspaceId, + role: WorkspaceRole.Owner, + created_at: createdAt, + created_by: account.id, + status: WorkspaceUserStatus.Active, + version_id: generateId(IdType.Version), }) .execute(); @@ -119,40 +130,52 @@ workspacesRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => { .insertInto('nodes') .values({ id: userId, - workspace_id: workspace.id, + workspace_id: workspaceId, attributes: userAttributes, state: userState, - created_at: workspaceUser.createdAt, - created_by: workspaceUser.createdBy, + created_at: createdAt, + created_by: account.id, version_id: userVersionId, - server_created_at: new Date(), + server_created_at: createdAt, }) .execute(); await trx - .insertInto('workspace_users') + .insertInto('changes') .values({ - id: workspaceUser.id, - account_id: workspaceUser.accountId, - workspace_id: workspaceUser.workspaceId, - role: workspaceUser.role, - created_at: workspaceUser.createdAt, - created_by: workspaceUser.createdBy, - status: workspaceUser.status, - version_id: workspaceUser.versionId, + id: userChangeId, + workspace_id: workspaceId, + data: JSON.stringify(changeData), + created_at: createdAt, }) .execute(); }); + await enqueueChange(userChangeId); + const output: WorkspaceOutput = { - id: workspace.id, - name: workspace.name, - description: workspace.description, - avatar: workspace.avatar, - versionId: workspace.versionId, - accountId: account.id, - role: workspaceUser.role, - userId: userId, + id: workspaceId, + name: input.name, + description: input.description, + avatar: input.avatar, + versionId: workspaceVersionId, + user: { + id: userId, + accountId: account.id, + role: WorkspaceRole.Owner, + node: { + id: userId, + workspaceId: workspaceId, + type: 'user', + attributes: JSON.parse(userAttributes), + state: userState, + createdAt: new Date(), + createdBy: account.id, + versionId: userVersionId, + serverCreatedAt: new Date(), + index: null, + }, + }, }; return res.status(200).json(output); @@ -219,6 +242,19 @@ workspacesRouter.put( }); } + const userNode = await database + .selectFrom('nodes') + .selectAll() + .where('id', '=', workspaceUser.id) + .executeTakeFirst(); + + if (!userNode) { + return res.status(500).json({ + code: ApiError.InternalServerError, + message: 'Internal server error.', + }); + } + const updatedWorkspace = await database .updateTable('workspaces') .set({ @@ -245,9 +281,12 @@ workspacesRouter.put( description: updatedWorkspace.description, avatar: updatedWorkspace.avatar, versionId: updatedWorkspace.version_id, - accountId: req.account.id, - role: workspaceUser.role, - userId: workspaceUser.id, + user: { + id: workspaceUser.id, + accountId: workspaceUser.account_id, + role: workspaceUser.role, + node: mapNode(userNode), + }, }; return res.status(200).json(output); @@ -347,15 +386,31 @@ workspacesRouter.get( }); } + const userNode = await database + .selectFrom('nodes') + .selectAll() + .where('id', '=', workspaceUser.id) + .executeTakeFirst(); + + if (!userNode) { + return res.status(500).json({ + code: ApiError.InternalServerError, + message: 'Internal server error.', + }); + } + const output: WorkspaceOutput = { id: workspace.id, name: workspace.name, description: workspace.description, avatar: workspace.avatar, versionId: workspace.version_id, - accountId: req.account.id, - role: workspaceUser.role, - userId: workspaceUser.id, + user: { + id: workspaceUser.id, + accountId: workspaceUser.account_id, + role: workspaceUser.role as WorkspaceRole, + node: mapNode(userNode), + }, }; return res.status(200).json(output); @@ -383,14 +438,30 @@ workspacesRouter.get('/', async (req: NeuronRequest, res: NeuronResponse) => { .where('id', 'in', workspaceIds) .execute(); + const userNodes = await database + .selectFrom('nodes') + .selectAll() + .where( + 'id', + 'in', + workspaceUsers.map((wa) => wa.id), + ) + .execute(); + const outputs: WorkspaceOutput[] = []; for (const workspace of workspaces) { - const workspaceAccount = workspaceUsers.find( + const workspaceUser = workspaceUsers.find( (wa) => wa.workspace_id === workspace.id, ); - if (!workspaceAccount) { + if (!workspaceUser) { + continue; + } + + const userNode = userNodes.find((un) => un.id === workspaceUser.id); + + if (!userNode) { continue; } @@ -400,9 +471,12 @@ workspacesRouter.get('/', async (req: NeuronRequest, res: NeuronResponse) => { description: workspace.description, avatar: workspace.avatar, versionId: workspace.version_id, - accountId: req.account.id, - role: workspaceAccount.role, - userId: workspaceAccount.id, + user: { + id: workspaceUser.id, + accountId: workspaceUser.account_id, + role: workspaceUser.role as WorkspaceRole, + node: mapNode(userNode), + }, }; outputs.push(output); @@ -504,6 +578,7 @@ workspacesRouter.post( const accountsToCreate: CreateAccount[] = []; const workspaceUsersToCreate: CreateWorkspaceUser[] = []; const usersToCreate: CreateNode[] = []; + const changesToCreate: CreateChange[] = []; const users: ServerNode[] = []; for (const email of input.emails) { @@ -568,6 +643,7 @@ workspacesRouter.post( const userAttributes = JSON.stringify(userAttributesMap.toJSON()); const userState = fromUint8Array(Y.encodeStateAsUpdate(userDoc)); + const userChangeId = generateId(IdType.Change); workspaceUsersToCreate.push({ id: userId, @@ -604,6 +680,24 @@ workspacesRouter.post( workspace_id: user.workspaceId, }); + const changeData: ServerNodeCreateChangeData = { + type: 'node_create', + id: user.id, + workspaceId: user.workspaceId, + state: user.state, + createdAt: user.createdAt.toISOString(), + createdBy: user.createdBy, + versionId: user.versionId, + serverCreatedAt: user.serverCreatedAt.toISOString(), + }; + + changesToCreate.push({ + id: userChangeId, + workspace_id: workspace.id, + data: JSON.stringify(changeData), + created_at: new Date(), + }); + users.push(user); } @@ -627,7 +721,16 @@ workspacesRouter.post( if (usersToCreate.length > 0) { await trx.insertInto('nodes').values(usersToCreate).execute(); } + + if (changesToCreate.length > 0) { + await trx.insertInto('changes').values(changesToCreate).execute(); + } }); + + if (changesToCreate.length > 0) { + const changeIds = changesToCreate.map((change) => change.id); + await enqueueChanges(changeIds); + } } return res.status(200).json({ @@ -719,12 +822,18 @@ workspacesRouter.put( Y.applyUpdate(userDoc, toUint8Array(user.state)); + const userUpdates: string[] = []; + userDoc.on('update', (update) => { + userUpdates.push(fromUint8Array(update)); + }); + const userAttributesMap = userDoc.getMap('attributes'); userAttributesMap.set('role', input.role); const userAttributes = JSON.stringify(userAttributesMap.toJSON()); const encodedState = fromUint8Array(Y.encodeStateAsUpdate(userDoc)); const updatedAt = new Date(); + const userChangeId = generateId(IdType.Change); const userNode: ServerNode = { id: user.id, @@ -743,6 +852,17 @@ workspacesRouter.put( updatedBy: currentWorkspaceUser.id, }; + const changeData: ServerNodeUpdateChangeData = { + type: 'node_update', + id: userNode.id, + workspaceId: userNode.workspaceId, + updates: userUpdates, + updatedAt: updatedAt.toISOString(), + updatedBy: currentWorkspaceUser.id, + versionId: userNode.versionId, + serverUpdatedAt: updatedAt.toISOString(), + }; + await database.transaction().execute(async (trx) => { await database .updateTable('workspace_users') @@ -767,8 +887,20 @@ workspacesRouter.put( }) .where('id', '=', userNode.id) .execute(); + + await trx + .insertInto('changes') + .values({ + id: userChangeId, + workspace_id: workspace.id, + data: JSON.stringify(changeData), + created_at: new Date(), + }) + .execute(); }); + await enqueueChange(userChangeId); + return res.status(200).json({ user: userNode, }); diff --git a/server/src/sockets/socket-connection.ts b/server/src/sockets/socket-connection.ts index 336e7c6b..1e8061c5 100644 --- a/server/src/sockets/socket-connection.ts +++ b/server/src/sockets/socket-connection.ts @@ -1,16 +1,20 @@ import { database } from '@/data/database'; import { MessageInput } from '@/messages'; import { NeuronRequestAccount } from '@/types/api'; +import { ServerChange } from '@/types/sync'; import { WebSocket } from 'ws'; export class SocketConnection { private readonly socket: WebSocket; - private readonly account: NeuronRequestAccount; private readonly pendingChanges: Set = new Set(); + public accountId: string; + public deviceId: string; + constructor(socket: WebSocket, account: NeuronRequestAccount) { this.socket = socket; - this.account = account; + this.accountId = account.id; + this.deviceId = account.deviceId; socket.on('message', (message) => { this.handleMessage(message.toString()); @@ -46,14 +50,24 @@ export class SocketConnection { if (messageInput.type === 'server_change_result') { if (messageInput.success) { await database - .deleteFrom('changes') - .where('id', '=', messageInput.changeId) + .deleteFrom('change_devices') + .where((eb) => + eb.and([ + eb('change_id', '=', messageInput.changeId), + eb('device_id', '=', this.deviceId), + ]), + ) .execute(); } else { await database - .updateTable('changes') + .updateTable('change_devices') .set((eb) => ({ retry_count: eb('retry_count', '+', 1) })) - .where('id', '=', messageInput.changeId) + .where((eb) => + eb.and([ + eb('change_id', '=', messageInput.changeId), + eb('device_id', '=', this.deviceId), + ]), + ) .execute(); } @@ -66,9 +80,16 @@ export class SocketConnection { private async sendPendingChanges() { const changes = await database - .selectFrom('changes') - .selectAll() - .where('device_id', '=', this.account.deviceId) + .selectFrom('changes as c') + .fullJoin('change_devices as cd', 'c.id', 'cd.change_id') + .select([ + 'c.id', + 'c.workspace_id', + 'cd.device_id', + 'c.created_at', + 'c.data', + ]) + .where('cd.device_id', '=', this.deviceId) .orderBy('id', 'asc') .limit(100) .execute(); @@ -77,15 +98,28 @@ export class SocketConnection { return; } - this.send({ - type: 'server_change_batch', - changes: changes.map((change) => ({ + const serverChanges: ServerChange[] = []; + for (const change of changes) { + if ( + !change.id || + !change.workspace_id || + !change.created_at || + !change.data + ) { + continue; + } + + serverChanges.push({ id: change.id, workspaceId: change.workspace_id, - deviceId: change.device_id, createdAt: change.created_at.toISOString(), data: change.data, - })), + }); + } + + this.send({ + type: 'server_change_batch', + changes: serverChanges, }); } } diff --git a/server/src/sockets/socket-manager.ts b/server/src/sockets/socket-manager.ts index b6f70346..cced814b 100644 --- a/server/src/sockets/socket-manager.ts +++ b/server/src/sockets/socket-manager.ts @@ -49,6 +49,18 @@ class SocketManager { }); } + public getConnections(deviceIds: string[]): SocketConnection[] { + const result: SocketConnection[] = []; + for (const deviceId of deviceIds) { + const connection = this.sockets.get(deviceId); + if (connection) { + result.push(connection); + } + } + + return result; + } + public send(deviceId: string, message: MessageInput) { const connection = this.sockets.get(deviceId); if (!connection) { diff --git a/server/src/sync/node-collaborators.ts b/server/src/sync/node-collaborators.ts index 7e7dceaf..7e29e1b4 100644 --- a/server/src/sync/node-collaborators.ts +++ b/server/src/sync/node-collaborators.ts @@ -1,10 +1,20 @@ -import { database } from '@/data/database'; +import { + database, + hasDeleteChanges, + hasInsertChanges, + hasUpdateChanges, +} from '@/data/database'; import { SelectWorkspaceUser } from '@/data/schema'; -import { getCollaboratorRole } from '@/lib/nodes'; +import { generateId, IdType } from '@/lib/id'; +import { fetchCollaboratorRole } from '@/lib/nodes'; +import { enqueueChange } from '@/queues/changes'; import { SyncLocalChangeResult, LocalChange, LocalNodeCollaboratorChangeData, + ServerNodeCollaboratorCreateChangeData, + ServerNodeCollaboratorUpdateChangeData, + ServerNodeCollaboratorDeleteChangeData, } from '@/types/sync'; export const handleNodeCollaboratorChange = async ( @@ -49,20 +59,52 @@ const handleCreateNodeCollaboratorChange = async ( }; } - await database - .insertInto('node_collaborators') - .values({ - node_id: nodeCollaboratorData.node_id, - collaborator_id: nodeCollaboratorData.collaborator_id, - role: nodeCollaboratorData.role, - workspace_id: workspaceUser.workspace_id, - created_at: new Date(nodeCollaboratorData.created_at), - created_by: nodeCollaboratorData.created_by, - server_created_at: new Date(), - version_id: nodeCollaboratorData.version_id, - }) - .onConflict((ob) => ob.doNothing()) - .execute(); + const serverCreatedAt = new Date(); + const changeId = generateId(IdType.Change); + const changeData: ServerNodeCollaboratorCreateChangeData = { + type: 'node_collaborator_create', + nodeId: nodeCollaboratorData.node_id, + collaboratorId: nodeCollaboratorData.collaborator_id, + role: nodeCollaboratorData.role, + createdAt: nodeCollaboratorData.created_at, + serverCreatedAt: serverCreatedAt.toISOString(), + workspaceId: workspaceUser.workspace_id, + createdBy: nodeCollaboratorData.created_by, + versionId: nodeCollaboratorData.version_id, + }; + + await database.transaction().execute(async (trx) => { + const result = await trx + .insertInto('node_collaborators') + .values({ + node_id: nodeCollaboratorData.node_id, + collaborator_id: nodeCollaboratorData.collaborator_id, + role: nodeCollaboratorData.role, + workspace_id: workspaceUser.workspace_id, + created_at: new Date(nodeCollaboratorData.created_at), + created_by: nodeCollaboratorData.created_by, + server_created_at: new Date(), + version_id: nodeCollaboratorData.version_id, + }) + .onConflict((ob) => ob.doNothing()) + .execute(); + + if (!hasInsertChanges(result)) { + return; + } + + await trx + .insertInto('changes') + .values({ + id: changeId, + workspace_id: workspaceUser.workspace_id, + data: JSON.stringify(changeData), + created_at: new Date(), + }) + .execute(); + }); + + await enqueueChange(changeId); return { status: 'success', @@ -93,7 +135,7 @@ const canCreateNodeCollaborator = async ( } // Get the current user's role for the node or its ancestors - const currentUserRole = await getCollaboratorRole( + const currentUserRole = await fetchCollaboratorRole( data.node_id, workspaceUser.id, ); @@ -185,23 +227,57 @@ const handleUpdateNodeCollaboratorChange = async ( } } - await database - .updateTable('node_collaborators') - .set({ - role: nodeCollaboratorData.role, - updated_at: updatedAt, - updated_by: - nodeCollaboratorData.updated_by ?? existingNodeCollaborator.created_by, - version_id: nodeCollaboratorData.version_id, - server_updated_at: new Date(), - }) - .where((eb) => - eb.and([ - eb('node_id', '=', nodeCollaboratorData.node_id), - eb('collaborator_id', '=', nodeCollaboratorData.collaborator_id), - ]), - ) - .execute(); + const serverUpdatedAt = new Date(); + const changeId = generateId(IdType.Change); + const changeData: ServerNodeCollaboratorUpdateChangeData = { + type: 'node_collaborator_update', + nodeId: nodeCollaboratorData.node_id, + collaboratorId: nodeCollaboratorData.collaborator_id, + role: nodeCollaboratorData.role, + serverUpdatedAt: serverUpdatedAt.toISOString(), + workspaceId: workspaceUser.workspace_id, + versionId: nodeCollaboratorData.version_id, + updatedAt: updatedAt.toISOString(), + updatedBy: + nodeCollaboratorData.updated_by ?? existingNodeCollaborator.created_by, + }; + + await database.transaction().execute(async (trx) => { + const result = await trx + .updateTable('node_collaborators') + .set({ + role: nodeCollaboratorData.role, + updated_at: updatedAt, + updated_by: + nodeCollaboratorData.updated_by ?? + existingNodeCollaborator.created_by, + version_id: nodeCollaboratorData.version_id, + server_updated_at: new Date(), + }) + .where((eb) => + eb.and([ + eb('node_id', '=', nodeCollaboratorData.node_id), + eb('collaborator_id', '=', nodeCollaboratorData.collaborator_id), + ]), + ) + .execute(); + + if (!hasUpdateChanges(result)) { + return; + } + + await trx + .insertInto('changes') + .values({ + id: changeId, + workspace_id: workspaceUser.workspace_id, + data: JSON.stringify(changeData), + created_at: new Date(), + }) + .execute(); + }); + + await enqueueChange(changeId); return { status: 'success', @@ -223,7 +299,7 @@ const canUpdateNodeCollaborator = async ( } // Get the current user's role for the node or its ancestors - const currentUserRole = await getCollaboratorRole( + const currentUserRole = await fetchCollaboratorRole( data.node_id, workspaceUser.id, ); @@ -294,15 +370,41 @@ const handleDeleteNodeCollaboratorChange = async ( }; } - await database - .deleteFrom('node_collaborators') - .where((eb) => - eb.and([ - eb('node_id', '=', nodeCollaboratorData.node_id), - eb('collaborator_id', '=', nodeCollaboratorData.collaborator_id), - ]), - ) - .execute(); + const changeId = generateId(IdType.Change); + const changeData: ServerNodeCollaboratorDeleteChangeData = { + type: 'node_collaborator_delete', + nodeId: nodeCollaboratorData.node_id, + collaboratorId: nodeCollaboratorData.collaborator_id, + workspaceId: workspaceUser.workspace_id, + }; + + await database.transaction().execute(async (trx) => { + const result = await trx + .deleteFrom('node_collaborators') + .where((eb) => + eb.and([ + eb('node_id', '=', nodeCollaboratorData.node_id), + eb('collaborator_id', '=', nodeCollaboratorData.collaborator_id), + ]), + ) + .execute(); + + if (!hasDeleteChanges(result)) { + return; + } + + await trx + .insertInto('changes') + .values({ + id: changeId, + workspace_id: workspaceUser.workspace_id, + data: JSON.stringify(changeData), + created_at: new Date(), + }) + .execute(); + }); + + await enqueueChange(changeId); return { status: 'success', @@ -324,7 +426,7 @@ const canDeleteNodeCollaborator = async ( } // Get the current user's role for the node or its ancestors - const currentUserRole = await getCollaboratorRole( + const currentUserRole = await fetchCollaboratorRole( data.node_id, workspaceUser.id, ); diff --git a/server/src/sync/node-reactions.ts b/server/src/sync/node-reactions.ts index 0f60722d..0cee095d 100644 --- a/server/src/sync/node-reactions.ts +++ b/server/src/sync/node-reactions.ts @@ -1,10 +1,14 @@ -import { database } from '@/data/database'; +import { database, hasInsertChanges, hasDeleteChanges } from '@/data/database'; import { SelectWorkspaceUser } from '@/data/schema'; -import { getCollaboratorRole } from '@/lib/nodes'; +import { generateId, IdType } from '@/lib/id'; +import { fetchCollaboratorRole } from '@/lib/nodes'; +import { enqueueChange } from '@/queues/changes'; import { SyncLocalChangeResult, LocalChange, LocalNodeReactionChangeData, + ServerNodeReactionCreateChangeData, + ServerNodeReactionDeleteChangeData, } from '@/types/sync'; export const handleNodeReactionChange = async ( @@ -45,7 +49,7 @@ const handleCreateNodeReactionChange = async ( }; } - const nodeRole = await getCollaboratorRole( + const nodeRole = await fetchCollaboratorRole( nodeReactionData.node_id, workspaceUser.id, ); @@ -56,18 +60,48 @@ const handleCreateNodeReactionChange = async ( }; } - await database - .insertInto('node_reactions') - .values({ - node_id: nodeReactionData.node_id, - actor_id: nodeReactionData.actor_id, - reaction: nodeReactionData.reaction, - created_at: new Date(nodeReactionData.created_at), - workspace_id: workspaceUser.workspace_id, - server_created_at: new Date(), - }) - .onConflict((ob) => ob.doNothing()) - .execute(); + const serverCreatedAt = new Date(); + const changeId = generateId(IdType.Change); + const changeData: ServerNodeReactionCreateChangeData = { + type: 'node_reaction_create', + nodeId: nodeReactionData.node_id, + actorId: nodeReactionData.actor_id, + reaction: nodeReactionData.reaction, + createdAt: nodeReactionData.created_at, + serverCreatedAt: serverCreatedAt.toISOString(), + workspaceId: workspaceUser.workspace_id, + }; + + await database.transaction().execute(async (trx) => { + const result = await database + .insertInto('node_reactions') + .values({ + node_id: nodeReactionData.node_id, + actor_id: nodeReactionData.actor_id, + reaction: nodeReactionData.reaction, + created_at: new Date(nodeReactionData.created_at), + workspace_id: workspaceUser.workspace_id, + server_created_at: new Date(), + }) + .onConflict((ob) => ob.doNothing()) + .execute(); + + if (!hasInsertChanges(result)) { + return; + } + + await trx + .insertInto('changes') + .values({ + id: changeId, + workspace_id: workspaceUser.workspace_id, + data: JSON.stringify(changeData), + created_at: new Date(), + }) + .execute(); + }); + + await enqueueChange(changeId); return { status: 'success', @@ -94,16 +128,43 @@ const handleDeleteNodeReactionChange = async ( }; } - await database - .deleteFrom('node_reactions') - .where((eb) => - eb.and([ - eb('node_id', '=', nodeReactionData.node_id), - eb('actor_id', '=', nodeReactionData.actor_id), - eb('reaction', '=', nodeReactionData.reaction), - ]), - ) - .execute(); + const changeId = generateId(IdType.Change); + const changeData: ServerNodeReactionDeleteChangeData = { + type: 'node_reaction_delete', + nodeId: nodeReactionData.node_id, + actorId: nodeReactionData.actor_id, + reaction: nodeReactionData.reaction, + workspaceId: workspaceUser.workspace_id, + }; + + await database.transaction().execute(async (trx) => { + const result = await trx + .deleteFrom('node_reactions') + .where((eb) => + eb.and([ + eb('node_id', '=', nodeReactionData.node_id), + eb('actor_id', '=', nodeReactionData.actor_id), + eb('reaction', '=', nodeReactionData.reaction), + ]), + ) + .execute(); + + if (!hasDeleteChanges(result)) { + return; + } + + await trx + .insertInto('changes') + .values({ + id: changeId, + workspace_id: workspaceUser.workspace_id, + data: JSON.stringify(changeData), + created_at: new Date(), + }) + .execute(); + }); + + await enqueueChange(changeId); return { status: 'success', diff --git a/server/src/sync/nodes.ts b/server/src/sync/nodes.ts index 3afbd37f..c94296c8 100644 --- a/server/src/sync/nodes.ts +++ b/server/src/sync/nodes.ts @@ -1,14 +1,24 @@ -import { database } from '@/data/database'; +import { + database, + hasInsertChanges, + hasUpdateChanges, + hasDeleteChanges, +} from '@/data/database'; import { SelectWorkspaceUser } from '@/data/schema'; -import { getCollaboratorRole } from '@/lib/nodes'; +import { fetchCollaboratorRole } from '@/lib/nodes'; import { SyncLocalChangeResult, LocalChange, LocalNodeChangeData, + ServerNodeCreateChangeData, + ServerNodeUpdateChangeData, + ServerNodeDeleteChangeData, } from '@/types/sync'; import { ServerNodeAttributes } from '@/types/nodes'; import { fromUint8Array, toUint8Array } from 'js-base64'; import * as Y from 'yjs'; +import { generateId, IdType } from '@/lib/id'; +import { enqueueChange } from '@/queues/changes'; export const handleNodeChange = async ( workspaceUser: SelectWorkspaceUser, @@ -51,7 +61,7 @@ const handleCreateNodeChange = async ( const attributes: ServerNodeAttributes = JSON.parse(nodeData.attributes); if (attributes.parentId) { - const parentRole = await getCollaboratorRole( + const parentRole = await fetchCollaboratorRole( attributes.parentId, workspaceUser.id, ); @@ -66,19 +76,50 @@ const handleCreateNodeChange = async ( } } - await database - .insertInto('nodes') - .values({ - id: nodeData.id, - attributes: nodeData.attributes, - workspace_id: workspaceUser.workspace_id, - state: nodeData.state, - created_at: new Date(nodeData.created_at), - created_by: nodeData.created_by, - version_id: nodeData.version_id, - server_created_at: new Date(), - }) - .execute(); + const serverCreatedAt = new Date(); + const changeId = generateId(IdType.Change); + const changeData: ServerNodeCreateChangeData = { + type: 'node_create', + id: nodeData.id, + workspaceId: workspaceUser.workspace_id, + state: nodeData.state, + createdAt: nodeData.created_at, + createdBy: nodeData.created_by, + serverCreatedAt: serverCreatedAt.toISOString(), + versionId: nodeData.version_id, + }; + + await database.transaction().execute(async (trx) => { + const result = await trx + .insertInto('nodes') + .values({ + id: nodeData.id, + attributes: nodeData.attributes, + workspace_id: workspaceUser.workspace_id, + state: nodeData.state, + created_at: new Date(nodeData.created_at), + created_by: nodeData.created_by, + version_id: nodeData.version_id, + server_created_at: serverCreatedAt, + }) + .execute(); + + if (!hasInsertChanges(result)) { + return; + } + + await trx + .insertInto('changes') + .values({ + id: changeId, + workspace_id: workspaceUser.workspace_id, + data: JSON.stringify(changeData), + created_at: new Date(), + }) + .execute(); + }); + + await enqueueChange(changeId); return { status: 'success', @@ -111,7 +152,7 @@ const handleUpdateNodeChange = async ( }; } - const role = await getCollaboratorRole(nodeData.id, workspaceUser.id); + const role = await fetchCollaboratorRole(nodeData.id, workspaceUser.id); if (role === null) { return { status: 'error', @@ -122,30 +163,67 @@ const handleUpdateNodeChange = async ( ? new Date(nodeData.updated_at) : new Date(); const updatedBy = nodeData.updated_by ?? workspaceUser.id; + const serverUpdatedAt = new Date(); const doc = new Y.Doc({ guid: nodeData.id, }); Y.applyUpdate(doc, toUint8Array(existingNode.state)); + + const updates: string[] = []; + doc.on('update', (update) => { + updates.push(fromUint8Array(update)); + }); + Y.applyUpdate(doc, toUint8Array(nodeData.state)); const attributesMap = doc.getMap('attributes'); const attributes = JSON.stringify(attributesMap.toJSON()); const encodedState = fromUint8Array(Y.encodeStateAsUpdate(doc)); - await database - .updateTable('nodes') - .set({ - attributes: attributes, - state: encodedState, - updated_at: updatedAt, - updated_by: updatedBy, - version_id: nodeData.version_id, - server_updated_at: new Date(), - }) - .where('id', '=', nodeData.id) - .execute(); + const changeId = generateId(IdType.Change); + const changeData: ServerNodeUpdateChangeData = { + type: 'node_update', + id: nodeData.id, + workspaceId: workspaceUser.workspace_id, + updates: updates, + updatedAt: updatedAt.toISOString(), + updatedBy: updatedBy, + serverUpdatedAt: serverUpdatedAt.toISOString(), + versionId: nodeData.version_id, + }; + + await database.transaction().execute(async (trx) => { + const result = await trx + .updateTable('nodes') + .set({ + attributes: attributes, + state: encodedState, + updated_at: updatedAt, + updated_by: updatedBy, + version_id: nodeData.version_id, + server_updated_at: new Date(), + }) + .where('id', '=', nodeData.id) + .execute(); + + if (!hasUpdateChanges(result)) { + return; + } + + await trx + .insertInto('changes') + .values({ + id: changeId, + workspace_id: workspaceUser.workspace_id, + data: JSON.stringify(changeData), + created_at: new Date(), + }) + .execute(); + }); + + await enqueueChange(changeId); return { status: 'success', @@ -181,14 +259,43 @@ const handleDeleteNodeChange = async ( }; } - const role = await getCollaboratorRole(nodeData.id, workspaceUser.id); + const role = await fetchCollaboratorRole(nodeData.id, workspaceUser.id); if (role === null) { return { status: 'error', }; } - await database.deleteFrom('nodes').where('id', '=', nodeData.id).execute(); + const changeId = generateId(IdType.Change); + const changeData: ServerNodeDeleteChangeData = { + type: 'node_delete', + id: nodeData.id, + workspaceId: workspaceUser.workspace_id, + }; + + await database.transaction().execute(async (trx) => { + const result = await trx + .deleteFrom('nodes') + .where('id', '=', nodeData.id) + .execute(); + + if (!hasDeleteChanges(result)) { + return; + } + + await trx + .insertInto('changes') + .values({ + id: changeId, + workspace_id: workspaceUser.workspace_id, + data: JSON.stringify(changeData), + created_at: new Date(), + }) + .execute(); + }); + + await enqueueChange(changeId); + return { status: 'success', }; diff --git a/server/src/types/cdc.ts b/server/src/types/cdc.ts deleted file mode 100644 index ec4de10c..00000000 --- a/server/src/types/cdc.ts +++ /dev/null @@ -1,74 +0,0 @@ -export type CdcMessage = { - before?: T | null; - after: T; - source: CdcSource; - op: string; - ts_ms: number; - ts_ns: number; - ts_us: number; - transaction: any; -}; - -type CdcSource = { - version: string; - connector: string; - name: string; - ts_ms: number; - snapshot: string; - db: string; - sequence: string; - ts_us: number; - ts_ns: number; - schema: string; - table: string; - txId: number; - lsn: number; -}; - -export type ChangeCdcData = { - id: string; - device_id: string; - workspace_id: string; - data: string; - created_at: string; -}; - -export type NodeCdcData = { - id: string; - workspace_id: string; - parent_id: string | null; - type: string; - index: string | null; - attributes: string; - state: string; - created_at: string; - created_by: string; - updated_at: string | null; - updated_by: string | null; - version_id: string; - server_created_at: string; - server_updated_at: string | null; -}; - -export type NodeCollaboratorCdcData = { - node_id: string; - collaborator_id: string; - role: string; - workspace_id: string; - created_at: string; - created_by: string; - updated_at: string | null; - updated_by: string | null; - version_id: string; - server_created_at: string; - server_updated_at: string | null; -}; - -export type NodeReactionCdcData = { - node_id: string; - actor_id: string; - reaction: string; - workspace_id: string; - created_at: string; - server_created_at: string; -}; diff --git a/server/src/types/sync.ts b/server/src/types/sync.ts index 70d96a14..78671aa7 100644 --- a/server/src/types/sync.ts +++ b/server/src/types/sync.ts @@ -60,11 +60,15 @@ export type LocalNodeReactionChangeData = { export type ServerChange = { id: string; workspaceId: string; - deviceId: string; data: ServerChangeData; createdAt: string; }; +export type ServerChangeBroadcastMessage = { + changeId: string; + deviceIds: string[]; +}; + export type ServerChangeData = | ServerNodeCreateChangeData | ServerNodeUpdateChangeData @@ -90,7 +94,7 @@ export type ServerNodeUpdateChangeData = { type: 'node_update'; id: string; workspaceId: string; - update: string; + updates: string[]; updatedAt: string; updatedBy: string; versionId: string; diff --git a/server/src/types/workspaces.ts b/server/src/types/workspaces.ts index dcb3fc6b..3e2f7442 100644 --- a/server/src/types/workspaces.ts +++ b/server/src/types/workspaces.ts @@ -1,3 +1,5 @@ +import { ServerNode } from '@/types/nodes'; + export enum WorkspaceStatus { Active = 1, Inactive = 2, @@ -15,19 +17,6 @@ export enum WorkspaceUserStatus { Inactive = 2, } -export type Workspace = { - id: string; - name: string; - description?: string | null; - avatar?: string | null; - createdAt: Date; - createdBy: string; - updatedAt?: Date | null; - updatedBy?: string | null; - status: WorkspaceStatus; - versionId: string; -}; - export type WorkspaceUser = { id: string; workspaceId: string; @@ -53,9 +42,14 @@ export type WorkspaceOutput = { description?: string | null; avatar?: string | null; versionId: string; + user: WorkspaceUserOutput; +}; + +export type WorkspaceUserOutput = { + id: string; accountId: string; - role: string; - userId: string; + role: WorkspaceRole; + node: ServerNode; }; export type WorkspaceAccountsInviteInput = { diff --git a/server/tsconfig.json b/server/tsconfig.json index ed3fa107..79a3b5e1 100644 --- a/server/tsconfig.json +++ b/server/tsconfig.json @@ -1,6 +1,6 @@ { "compilerOptions": { - "target": "ES6", + "target": "ES2020", "module": "commonjs", "outDir": "./dist", "rootDir": ".",