diff --git a/desktop/src/electron/database/global/migrations.ts b/desktop/src/data/databases/global/migrations.ts similarity index 96% rename from desktop/src/electron/database/global/migrations.ts rename to desktop/src/data/databases/global/migrations.ts index 44fed7cd..cbf5b6cf 100644 --- a/desktop/src/electron/database/global/migrations.ts +++ b/desktop/src/data/databases/global/migrations.ts @@ -10,6 +10,8 @@ const createAccountsTable: Migration = { .addColumn('email', 'text', (col) => col.notNull()) .addColumn('avatar', 'text') .addColumn('token', 'text', (col) => col.notNull()) + .addColumn('last_update_id', 'text') + .addColumn('last_update_at', 'text') .execute(); }, down: async (db) => { diff --git a/desktop/src/electron/database/workspace/migrations.ts b/desktop/src/data/databases/workspace/migrations.ts similarity index 92% rename from desktop/src/electron/database/workspace/migrations.ts rename to desktop/src/data/databases/workspace/migrations.ts index 93f2e88e..387478a1 100644 --- a/desktop/src/electron/database/workspace/migrations.ts +++ b/desktop/src/data/databases/workspace/migrations.ts @@ -18,6 +18,8 @@ const createNodesTable: Migration = { .addColumn('created_by', 'text', (col) => col.notNull()) .addColumn('updated_by', 'text') .addColumn('version_id', 'text', (col) => col.notNull()) + .addColumn('server_created_at', 'text') + .addColumn('server_updated_at', 'text') .addColumn('state', 'text') .execute(); }, diff --git a/server/package-lock.json b/server/package-lock.json index 3c35bb55..cf4219f3 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -18,13 +18,16 @@ "jsonwebtoken": "^9.0.2", "kafkajs": "^2.2.4", "postgres": "^3.4.4", - "ulid": "^2.3.0" + "redis": "^4.7.0", + "ulid": "^2.3.0", + "ws": "^8.18.0" }, "devDependencies": { "@types/bcrypt": "^5.0.2", "@types/express": "^4.17.21", "@types/jsonwebtoken": "^9.0.6", "@types/node": "^22.0.0", + "@types/ws": "^8.5.12", "concurrently": "^8.2.2", "nodemon": "^3.1.4", "prettier": "^3.3.3", @@ -176,6 +179,65 @@ "@prisma/debug": "5.17.0" } }, + "node_modules/@redis/bloom": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz", + "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==", + "license": "MIT", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/client": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.6.0.tgz", + "integrity": "sha512-aR0uffYI700OEEH4gYnitAnv3vzVGXCFvYfdpu/CJKvk4pHfLPEy/JSZyrpQ+15WhXe1yJRXLtfQ84s4mEXnPg==", + "license": "MIT", + "dependencies": { + "cluster-key-slot": "1.1.2", + "generic-pool": "3.9.0", + "yallist": "4.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/@redis/graph": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.1.tgz", + "integrity": "sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw==", + "license": "MIT", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/json": { + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.7.tgz", + "integrity": "sha512-6UyXfjVaTBTJtKNG4/9Z8PSpKE6XgSyEb8iwaqDcy+uKrd/DGYHTWkUdnQDyzm727V7p21WUMhsqz5oy65kPcQ==", + "license": "MIT", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/search": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.2.0.tgz", + "integrity": "sha512-tYoDBbtqOVigEDMAcTGsRlMycIIjwMCgD8eR2t0NANeQmgK/lvxNAvYyb6bZDD4frHRhIHkJu2TBRvB0ERkOmw==", + "license": "MIT", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/time-series": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.1.0.tgz", + "integrity": "sha512-c1Q99M5ljsIuc4YdaCwfUEXsofakb9c8+Zse2qxTadu8TalLXuAESzLvFAvNVbkmSlvlzIQOLpBCmWI9wTOt+g==", + "license": "MIT", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, "node_modules/@tsconfig/node10": { "version": "1.0.11", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.11.tgz", @@ -340,6 +402,16 @@ "@types/send": "*" } }, + "node_modules/@types/ws": { + "version": "8.5.12", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.12.tgz", + "integrity": "sha512-3tPRkv1EtkDpzlgyKyI8pGsGZAGPEaXeu0DOj5DI25Ja91bdAYddYHbADRYVrZMRbfW+1l5YwXVDKohDJNQxkQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/abbrev": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/abbrev/-/abbrev-1.1.1.tgz", @@ -712,6 +784,15 @@ "node": ">=12" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/color-convert": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", @@ -1247,6 +1328,15 @@ "node": ">=10" } }, + "node_modules/generic-pool": { + "version": "3.9.0", + "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz", + "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==", + "license": "MIT", + "engines": { + "node": ">= 4" + } + }, "node_modules/get-caller-file": { "version": "2.0.5", "resolved": "https://registry.npmjs.org/get-caller-file/-/get-caller-file-2.0.5.tgz", @@ -2164,6 +2254,23 @@ "node": ">=8.10.0" } }, + "node_modules/redis": { + "version": "4.7.0", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.7.0.tgz", + "integrity": "sha512-zvmkHEAdGMn+hMRXuMBtu4Vo5P6rHQjLoHftu+lBqq8ZTA3RCVC/WzD790bkKKiNFp7d5/9PcSD19fJyyRvOdQ==", + "license": "MIT", + "workspaces": [ + "./packages/*" + ], + "dependencies": { + "@redis/bloom": "1.2.0", + "@redis/client": "1.6.0", + "@redis/graph": "1.1.1", + "@redis/json": "1.0.7", + "@redis/search": "1.2.0", + "@redis/time-series": "1.1.0" + } + }, "node_modules/regenerator-runtime": { "version": "0.14.1", "resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.14.1.tgz", @@ -2708,6 +2815,27 @@ "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==", "license": "ISC" }, + "node_modules/ws": { + "version": "8.18.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.0.tgz", + "integrity": "sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/y18n": { "version": "5.0.8", "resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.8.tgz", diff --git a/server/package.json b/server/package.json index 08ce725f..a20e7444 100644 --- a/server/package.json +++ b/server/package.json @@ -17,6 +17,7 @@ "@types/express": "^4.17.21", "@types/jsonwebtoken": "^9.0.6", "@types/node": "^22.0.0", + "@types/ws": "^8.5.12", "concurrently": "^8.2.2", "nodemon": "^3.1.4", "prettier": "^3.3.3", @@ -35,6 +36,8 @@ "jsonwebtoken": "^9.0.2", "kafkajs": "^2.2.4", "postgres": "^3.4.4", - "ulid": "^2.3.0" + "redis": "^4.7.0", + "ulid": "^2.3.0", + "ws": "^8.18.0" } } diff --git a/server/prisma/migrations/20240805122702_add_account_devices/migration.sql b/server/prisma/migrations/20240805122702_add_account_devices/migration.sql deleted file mode 100644 index 391b6929..00000000 --- a/server/prisma/migrations/20240805122702_add_account_devices/migration.sql +++ /dev/null @@ -1,15 +0,0 @@ --- CreateTable -CREATE TABLE "account_devices" ( - "id" VARCHAR(30) NOT NULL, - "account_id" VARCHAR(30) NOT NULL, - "type" INTEGER NOT NULL, - "version" VARCHAR(30) NOT NULL, - "platform" VARCHAR(30), - "cpu" VARCHAR(30), - "hostname" VARCHAR(30), - "created_at" TIMESTAMPTZ(6) NOT NULL, - "last_online_at" TIMESTAMPTZ(6), - "last_active_at" TIMESTAMPTZ(6), - - CONSTRAINT "account_devices_pkey" PRIMARY KEY ("id") -); diff --git a/server/prisma/migrations/20240805101219_init_db/migration.sql b/server/prisma/migrations/20240808134858_init_db/migration.sql similarity index 71% rename from server/prisma/migrations/20240805101219_init_db/migration.sql rename to server/prisma/migrations/20240808134858_init_db/migration.sql index f1c4177e..fd217f17 100644 --- a/server/prisma/migrations/20240805101219_init_db/migration.sql +++ b/server/prisma/migrations/20240808134858_init_db/migration.sql @@ -61,11 +61,41 @@ CREATE TABLE "nodes" ( "updated_at" TIMESTAMPTZ(6), "updated_by" VARCHAR(30), "version_id" VARCHAR(30) NOT NULL, + "server_created_at" TIMESTAMPTZ(6) NOT NULL, + "server_updated_at" TIMESTAMPTZ(6), "state" TEXT, CONSTRAINT "nodes_pkey" PRIMARY KEY ("id") ); +-- CreateTable +CREATE TABLE "account_devices" ( + "id" VARCHAR(30) NOT NULL, + "account_id" VARCHAR(30) NOT NULL, + "type" INTEGER NOT NULL, + "version" VARCHAR(30) NOT NULL, + "platform" VARCHAR(30), + "cpu" VARCHAR(30), + "hostname" VARCHAR(30), + "created_at" TIMESTAMPTZ(6) NOT NULL, + "last_online_at" TIMESTAMPTZ(6), + "last_active_at" TIMESTAMPTZ(6), + + CONSTRAINT "account_devices_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "updates" ( + "id" VARCHAR(30) NOT NULL, + "workspace_id" VARCHAR(30) NOT NULL, + "type" VARCHAR(30) NOT NULL, + "content" JSONB, + "created_at" TIMESTAMPTZ(6) NOT NULL, + "devices" TEXT[], + + CONSTRAINT "updates_pkey" PRIMARY KEY ("id") +); + -- CreateIndex CREATE UNIQUE INDEX "IX_accounts_email" ON "accounts"("email"); diff --git a/server/prisma/schema.prisma b/server/prisma/schema.prisma index fb6d8f86..6734c09f 100644 --- a/server/prisma/schema.prisma +++ b/server/prisma/schema.prisma @@ -55,19 +55,21 @@ model workspaceAccounts { } model nodes { - id String @id @db.VarChar(30) - workspaceId String @db.VarChar(30) @map("workspace_id") - parentId String? @db.VarChar(30) @map("parent_id") - type String @db.VarChar(30) - index String? @db.VarChar(30) - attrs Json? - content Json? - createdAt DateTime @db.Timestamptz(6) @map("created_at") - createdBy String @db.VarChar(30) @map("created_by") - updatedAt DateTime? @db.Timestamptz(6) @map("updated_at") - updatedBy String? @db.VarChar(30) @map("updated_by") - versionId String @db.VarChar(30) @map("version_id") - state String? @db.Text() + id String @id @db.VarChar(30) + workspaceId String @db.VarChar(30) @map("workspace_id") + parentId String? @db.VarChar(30) @map("parent_id") + type String @db.VarChar(30) + index String? @db.VarChar(30) + attrs Json? + content Json? + createdAt DateTime @db.Timestamptz(6) @map("created_at") + createdBy String @db.VarChar(30) @map("created_by") + updatedAt DateTime? @db.Timestamptz(6) @map("updated_at") + updatedBy String? @db.VarChar(30) @map("updated_by") + versionId String @db.VarChar(30) @map("version_id") + serverCreatedAt DateTime @db.Timestamptz(6) @map("server_created_at") + serverUpdatedAt DateTime? @db.Timestamptz(6) @map("server_updated_at") + state String? @db.Text() parent nodes? @relation("node_parent_child", fields: [parentId], references: [id], onDelete: Cascade) children nodes[] @relation("node_parent_child") @@ -77,7 +79,7 @@ model nodes { model accountDevices { id String @id @db.VarChar(30) - account_id String @db.VarChar(30) + accountId String @db.VarChar(30) @map("account_id") type Int version String @db.VarChar(30) @map("version") platform String? @db.VarChar(30) @@ -89,3 +91,14 @@ model accountDevices { @@map("account_devices") } + +model updates { + id String @id @db.VarChar(30) + workspaceId String @db.VarChar(30) @map("workspace_id") + type String @db.VarChar(30) + content Json? + createdAt DateTime @db.Timestamptz(6) @map("created_at") + devices String[] + + @@map("updates") +} diff --git a/server/src/consumers/node-changes.ts b/server/src/consumers/node-changes.ts new file mode 100644 index 00000000..0cb7e93c --- /dev/null +++ b/server/src/consumers/node-changes.ts @@ -0,0 +1,90 @@ +import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka'; +import { ChangeMessage, NodeChangeData } from '@/types/changes'; +import { prisma } from '@/data/prisma'; +import { NeuronId } from '@/lib/id'; +import { Node } from '@/types/nodes'; + +export const initNodeChangesConsumer = async () => { + const consumer = kafka.consumer({ groupId: CONSUMER_IDS.NODE_CHANGES }); + + await consumer.connect(); + await consumer.subscribe({ topic: TOPIC_NAMES.NODE_CHANGES }); + + await consumer.run({ + eachMessage: async ({ message }) => { + if (!message || !message.value) { + return; + } + + const change = JSON.parse( + message.value.toString(), + ) as ChangeMessage; + + await handleNodeChange(change); + }, + }); +}; + +const handleNodeChange = async (change: ChangeMessage) => { + const changeData = change.after; + if (!changeData) { + return; + } + + const workspaceAccounts = await prisma.workspaceAccounts.findMany({ + where: { + workspaceId: changeData.workspace_id, + }, + }); + + if (workspaceAccounts.length === 0) { + return; + } + + const accountIds = workspaceAccounts.map((account) => account.accountId); + const accountDevices = await prisma.accountDevices.findMany({ + where: { + accountId: { + in: accountIds, + }, + }, + }); + + if (accountDevices.length === 0) { + return; + } + + const deviceIds = accountDevices.map((device) => device.id); + const node: Node = { + id: changeData.id, + workspaceId: changeData.workspace_id, + parentId: changeData.parent_id, + type: changeData.type, + index: changeData.index, + attrs: changeData.attrs ? JSON.parse(changeData.attrs) : null, + content: changeData.content ? JSON.parse(changeData.content) : null, + createdAt: new Date(changeData.created_at), + createdBy: changeData.created_by, + updatedAt: changeData.updated_at ? new Date(changeData.updated_at) : null, + updatedBy: changeData.updated_by, + versionId: changeData.version_id, + serverCreatedAt: new Date(changeData.server_created_at), + serverUpdatedAt: changeData.server_updated_at + ? new Date(changeData.server_updated_at) + : null, + state: changeData.state, + }; + + await prisma.updates.create({ + data: { + id: NeuronId.generate(NeuronId.Type.Update), + workspaceId: changeData.workspace_id, + devices: deviceIds, + type: 'node_sync', + content: JSON.stringify(node), + createdAt: new Date(), + }, + }); +}; + +const handelNodeDelete = async (id: string) => {}; diff --git a/server/src/consumers/transactions.ts b/server/src/consumers/transactions.ts index 3b0cdd16..9b048e61 100644 --- a/server/src/consumers/transactions.ts +++ b/server/src/consumers/transactions.ts @@ -1,4 +1,4 @@ -import { kafka, TOPIC_NAMES, KAFKA_CONSUMER_GROUP } from '@/data/kafka'; +import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka'; import { CreateNodeTransactionInput, Transaction, @@ -8,7 +8,7 @@ import { prisma } from '@/data/prisma'; import { Prisma } from '@prisma/client'; export const initTransactionsConsumer = async () => { - const consumer = kafka.consumer({ groupId: KAFKA_CONSUMER_GROUP }); + const consumer = kafka.consumer({ groupId: CONSUMER_IDS.TRANSACTIONS }); await consumer.connect(); await consumer.subscribe({ topic: TOPIC_NAMES.TRANSACTIONS }); @@ -55,6 +55,7 @@ const handleCreateNodeTransaction = async (transaction: Transaction) => { createdAt: input.createdAt, createdBy: input.createdBy, versionId: input.versionId, + serverCreatedAt: new Date(), }; if (input.attrs !== undefined && input.attrs !== null) { @@ -82,6 +83,7 @@ const handleCreateNodesTransaction = async (transaction: Transaction) => { createdAt: node.createdAt, createdBy: node.createdBy, versionId: node.versionId, + serverCreatedAt: new Date(), }; if (node.attrs !== undefined && node.attrs !== null) { @@ -118,6 +120,7 @@ const handleUpdateNodeTransaction = async (transaction: Transaction) => { updatedAt: input.updatedAt, updatedBy: input.updatedBy, versionId: input.versionId, + serverUpdatedAt: new Date(), }; if (input.attrs !== undefined) { diff --git a/server/src/consumers/update-changes.ts b/server/src/consumers/update-changes.ts new file mode 100644 index 00000000..4a6e1da8 --- /dev/null +++ b/server/src/consumers/update-changes.ts @@ -0,0 +1,46 @@ +import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka'; +import { ChangeMessage, UpdateChangeData } from '@/types/changes'; +import { redis, CHANNEL_NAMES } from '@/data/redis'; +import { Update } from '@/types/updates'; + +export const initUpdateChangesConsumer = async () => { + const consumer = kafka.consumer({ groupId: CONSUMER_IDS.UPDATE_CHANGES }); + + await consumer.connect(); + await consumer.subscribe({ topic: TOPIC_NAMES.UPDATE_CHANGES }); + + await consumer.run({ + eachMessage: async ({ message }) => { + if (!message || !message.value) { + return; + } + + const change = JSON.parse( + message.value.toString(), + ) as ChangeMessage; + + await handleUpdateChange(change); + }, + }); +}; + +const handleUpdateChange = async (change: ChangeMessage) => { + const changeData = change.after; + if (!changeData) { + return; + } + + const devices = changeData.devices; + for (const deviceId of devices) { + const update: Update = { + id: changeData.id, + deviceId: deviceId, + type: changeData.type, + content: changeData.content, + workspaceId: changeData.workspace_id, + createdAt: changeData.created_at, + }; + + await redis.publish(CHANNEL_NAMES.UPDATES, JSON.stringify(update)); + } +}; diff --git a/server/src/consumers/updates.ts b/server/src/consumers/updates.ts new file mode 100644 index 00000000..6fba3353 --- /dev/null +++ b/server/src/consumers/updates.ts @@ -0,0 +1,19 @@ +import { redis, CHANNEL_NAMES } from '@/data/redis'; +import { sockets } from '@/lib/sockets'; +import { Update } from '@/types/updates'; + +export const initUpdatesSubscriber = async () => { + const subscriber = redis.duplicate(); + await subscriber.connect(); + await subscriber.subscribe(CHANNEL_NAMES.UPDATES, handleMessage); +}; + +const handleMessage = async (_: string, message: string) => { + const update = JSON.parse(message) as Update; + const socket = sockets.getSocket(update.deviceId); + if (!socket) { + return; + } + + socket.send(JSON.stringify(update)); +}; diff --git a/server/src/data/kafka.ts b/server/src/data/kafka.ts index b2308f73..41982211 100644 --- a/server/src/data/kafka.ts +++ b/server/src/data/kafka.ts @@ -4,10 +4,6 @@ const KAFKA_CLIENT_ID = process.env.KAFKA_CLIENT_ID ?? 'neuron'; const KAFKA_BROKERS = process.env.KAFKA_BROKERS ?? ''; const KAFKA_USERNAME = process.env.KAFKA_USERNAME; const KAFKA_PASSWORD = process.env.KAFKA_PASSWORD; -const KAFKA_TRANSACTIONS_TOPIC_NAME = - process.env.KAFKA_TRANSACTIONS_TOPIC_NAME ?? 'neuron_transactions'; -export const KAFKA_CONSUMER_GROUP = - process.env.KAFKA_CONSUMER_GROUP ?? 'neuron'; export const kafka = new Kafka({ clientId: KAFKA_CLIENT_ID, @@ -25,7 +21,24 @@ export const kafka = new Kafka({ export const producer = kafka.producer(); export const TOPIC_NAMES = { - TRANSACTIONS: KAFKA_TRANSACTIONS_TOPIC_NAME, + TRANSACTIONS: + process.env.KAFKA_TRANSACTIONS_TOPIC_NAME ?? 'neuron_transactions', + NODE_CHANGES: + process.env.KAFKA_NODE_CHANGES_TOPIC_NAME ?? 'neuron_node_changes', + UPDATE_CHANGES: + process.env.KAFKA_UPDATE_CHANGES_TOPIC_NAME ?? 'neuron_update_changes', +}; + +export const CONSUMER_IDS = { + TRANSACTIONS: + process.env.KAFKA_TRANSACTIONS_CONSUMER_ID ?? + 'neuron_transactions_consumer', + NODE_CHANGES: + process.env.KAFKA_NODE_CHANGES_CONSUMER_ID ?? + 'neuron_node_changes_consumer', + UPDATE_CHANGES: + process.env.KAFKA_UPDATE_CHANGES_CONSUMER_ID ?? + 'neuron_update_changes_consumer', }; const connectProducer = async () => { diff --git a/server/src/data/redis.ts b/server/src/data/redis.ts new file mode 100644 index 00000000..e0873f47 --- /dev/null +++ b/server/src/data/redis.ts @@ -0,0 +1,18 @@ +import { createClient } from 'redis'; + +const REDIS_URL = process.env.REDIS_URL || ''; +export const redis = createClient({ + url: REDIS_URL, +}); + +export const initRedis = async () => { + await redis.connect(); + + redis.on('error', (err) => { + console.error('Redis client error:', err); + }); +}; + +export const CHANNEL_NAMES = { + UPDATES: process.env.REDIS_UPDATES_CHANNEL_NAME || 'neuron_updates', +}; diff --git a/server/src/index.ts b/server/src/index.ts index d8599bf2..49f06386 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -1,7 +1,24 @@ import { initApi } from '@/api'; +import { initRedis } from '@/data/redis'; import { initTransactionsConsumer } from '@/consumers/transactions'; +import { initNodeChangesConsumer } from '@/consumers/node-changes'; +import { initUpdateChangesConsumer } from '@/consumers/update-changes'; +import { initUpdatesSubscriber } from '@/consumers/updates'; initApi(); initTransactionsConsumer().then(() => { console.log('Transactions consumer started'); }); +initNodeChangesConsumer().then(() => { + console.log('Node changes consumer started'); +}); +initUpdateChangesConsumer().then(() => { + console.log('Update changes consumer started'); +}); + +initRedis().then(() => { + console.log('Redis initialized'); + // initUpdatesSubscriber().then(() => { + // console.log('Updates subscriber started'); + // }); +}); diff --git a/server/src/lib/id.ts b/server/src/lib/id.ts index c6b7e412..36d0b2f3 100644 --- a/server/src/lib/id.ts +++ b/server/src/lib/id.ts @@ -14,6 +14,7 @@ enum IdType { Node = 'nd', Message = 'ms', Device = 'dv', + Update = 'up', } export class NeuronId { @@ -47,6 +48,12 @@ export class NeuronId { return IdType.Page; case 'channel': return IdType.Channel; + case 'message': + return IdType.Message; + case 'device': + return IdType.Device; + case 'update': + return IdType.Update; default: return IdType.Node; } diff --git a/server/src/lib/jwt.ts b/server/src/lib/jwt.ts new file mode 100644 index 00000000..0e6f7a11 --- /dev/null +++ b/server/src/lib/jwt.ts @@ -0,0 +1,34 @@ +import jwt from 'jsonwebtoken'; + +const JwtSecretKey = process.env.JWT_SECRET ?? ''; +const JwtAudience = process.env.JWT_AUDIENCE ?? ''; +const JwtIssuer = process.env.JWT_ISSUER ?? ''; + +export type JwtPayload = { + id: string; + name: string; + email: string; +}; + +export const createJwtToken = (payload: JwtPayload): string => { + const signOptions: jwt.SignOptions = { + issuer: JwtIssuer, + audience: JwtAudience, + subject: payload.id, + }; + + return jwt.sign(payload, JwtSecretKey, signOptions); +}; + +export const verifyJwtToken = (token: string): JwtPayload | null => { + try { + const decoded = jwt.verify(token, JwtSecretKey, { + issuer: JwtIssuer, + audience: JwtAudience, + }); + + return decoded as JwtPayload; + } catch (err) { + return null; + } +}; diff --git a/server/src/lib/sockets.ts b/server/src/lib/sockets.ts new file mode 100644 index 00000000..31cdeaee --- /dev/null +++ b/server/src/lib/sockets.ts @@ -0,0 +1,19 @@ +import { WebSocket } from 'ws'; + +class SocketConnections { + private sockets: Map = new Map(); + + public addSocket(deviceId: string, ws: WebSocket) { + this.sockets.set(deviceId, ws); + } + + public removeSocket(deviceId: string) { + this.sockets.delete(deviceId); + } + + public getSocket(deviceId: string): WebSocket | undefined { + return this.sockets.get(deviceId); + } +} + +export const sockets = new SocketConnections(); diff --git a/server/src/middlewares/auth.ts b/server/src/middlewares/auth.ts index 8e0b80aa..df37e187 100644 --- a/server/src/middlewares/auth.ts +++ b/server/src/middlewares/auth.ts @@ -1,14 +1,10 @@ -import jwt from 'jsonwebtoken'; import { ApiError, NeuronNextFunction, NeuronRequest, NeuronResponse, } from '@/types/api'; - -const JwtSecretKey = process.env.JWT_SECRET ?? ''; -const JwtAudience = process.env.JWT_AUDIENCE ?? ''; -const JwtIssuer = process.env.JWT_ISSUER ?? ''; +import { verifyJwtToken } from '@/lib/jwt'; export const authMiddleware = ( req: NeuronRequest, @@ -24,18 +20,14 @@ export const authMiddleware = ( }); } - try { - const decoded = jwt.verify(token, JwtSecretKey, { - issuer: JwtIssuer, - audience: JwtAudience, - }); - - req.accountId = decoded.sub as string; - next(); - } catch (err) { - res.status(400).json({ + const payload = verifyJwtToken(token); + if (!payload) { + return res.status(400).json({ code: ApiError.Unauthorized, message: 'Invalid Token', }); } + + req.accountId = payload.id as string; + next(); }; diff --git a/server/src/types/changes.ts b/server/src/types/changes.ts new file mode 100644 index 00000000..77e0f478 --- /dev/null +++ b/server/src/types/changes.ts @@ -0,0 +1,53 @@ +export type ChangeMessage = { + before?: T | null; + after: T; + source: ChangeSource; + op: string; + ts_ms: number; + ts_ns: number; + ts_us: number; + transaction: any; +}; + +type ChangeSource = { + version: string; + connector: string; + name: string; + ts_ms: number; + snapshot: string; + db: string; + sequence: string; + ts_us: number; + ts_ns: number; + schema: string; + table: string; + txId: number; + lsn: number; +}; + +export type NodeChangeData = { + id: string; + workspace_id: string; + parent_id?: string | null; + type: string; + index: string | null; + attrs?: string | null; + content?: string | null; + created_at: string; + created_by: string; + updated_at?: string | null; + updated_by?: string | null; + version_id: string; + server_created_at: string; + server_updated_at?: string | null; + state: string | null; +}; + +export type UpdateChangeData = { + id: string; + workspace_id: string; + devices: string[]; + type: string; + content: string; + created_at: string; +}; diff --git a/server/src/types/updates.ts b/server/src/types/updates.ts new file mode 100644 index 00000000..5795f05a --- /dev/null +++ b/server/src/types/updates.ts @@ -0,0 +1,8 @@ +export type Update = { + id: string; + workspaceId: string; + deviceId: string; + type: string; + content: string; + createdAt: string; +};