From 78402bbf64af3c05c28d51458a7e357723fd2a12 Mon Sep 17 00:00:00 2001 From: Hakan Shehu Date: Mon, 28 Oct 2024 21:47:32 +0100 Subject: [PATCH] Table renames --- desktop/src/main/data/workspace/migrations.ts | 13 +-- desktop/src/main/data/workspace/schema.ts | 13 +-- desktop/src/main/handlers/messages/index.ts | 8 +- ...-state-sync.ts => local-user-node-sync.ts} | 8 +- ...state-sync.ts => server-user-node-sync.ts} | 10 +-- desktop/src/main/handlers/mutations/index.ts | 4 +- .../handlers/mutations/mark-node-as-seen.ts | 10 +-- ...state-sync.ts => server-user-node-sync.ts} | 26 +++--- .../handlers/queries/sidebar-chat-list.ts | 14 +-- .../handlers/queries/sidebar-space-list.ts | 14 +-- .../messages/local-node-user-state-sync.ts | 13 --- .../messages/local-user-node-sync.ts | 13 +++ ...state-sync.ts => server-user-node-sync.ts} | 6 +- ...state-sync.ts => server-user-node-sync.ts} | 12 +-- desktop/src/types/sync.ts | 6 +- server/src/data/migrations.ts | 32 ++++--- server/src/data/schema.ts | 27 +++--- server/src/queues/events.ts | 40 ++++----- server/src/queues/tasks.ts | 2 +- server/src/routes/sync.ts | 4 +- server/src/services/synapse.ts | 88 +++++++++---------- server/src/types/events.ts | 17 ---- server/src/types/messages.ts | 14 +-- server/src/types/synapse.ts | 16 ++++ 24 files changed, 202 insertions(+), 208 deletions(-) rename desktop/src/main/handlers/messages/{local-node-user-state-sync.ts => local-user-node-sync.ts} (51%) rename desktop/src/main/handlers/messages/{server-node-user-state-sync.ts => server-user-node-sync.ts} (66%) rename desktop/src/main/handlers/mutations/{server-node-user-state-sync.ts => server-user-node-sync.ts} (76%) delete mode 100644 desktop/src/operations/messages/local-node-user-state-sync.ts create mode 100644 desktop/src/operations/messages/local-user-node-sync.ts rename desktop/src/operations/messages/{server-node-user-state-sync.ts => server-user-node-sync.ts} (64%) rename desktop/src/operations/mutations/{server-node-user-state-sync.ts => server-user-node-sync.ts} (55%) create mode 100644 server/src/types/synapse.ts diff --git a/desktop/src/main/data/workspace/migrations.ts b/desktop/src/main/data/workspace/migrations.ts index 7cdd453c..8611d906 100644 --- a/desktop/src/main/data/workspace/migrations.ts +++ b/desktop/src/main/data/workspace/migrations.ts @@ -44,27 +44,28 @@ const createNodesTable: Migration = { }, }; -const createNodeUserStatesTable: Migration = { +const createUserNodesTable: Migration = { up: async (db) => { await db.schema - .createTable('node_user_states') + .createTable('user_nodes') + .addColumn('user_id', 'text', (col) => col.notNull()) .addColumn('node_id', 'text', (col) => col.notNull().references('nodes.id'), ) - .addColumn('user_id', 'text', (col) => col.notNull()) .addColumn('last_seen_version_id', 'text') .addColumn('last_seen_at', 'text') .addColumn('mentions_count', 'integer', (col) => col.notNull().defaultTo(0), ) + .addColumn('attributes', 'text') .addColumn('version_id', 'text', (col) => col.notNull()) .addColumn('created_at', 'text', (col) => col.notNull()) .addColumn('updated_at', 'text') - .addPrimaryKeyConstraint('node_user_states_pk', ['node_id', 'user_id']) + .addPrimaryKeyConstraint('user_nodes_pk', ['user_id', 'node_id']) .execute(); }, down: async (db) => { - await db.schema.dropTable('node_user_states').execute(); + await db.schema.dropTable('user_nodes').execute(); }, }; @@ -167,7 +168,7 @@ const createDownloadsTable: Migration = { export const workspaceDatabaseMigrations: Record = { '00001_create_nodes_table': createNodesTable, - '00002_create_node_user_states_table': createNodeUserStatesTable, + '00002_create_user_nodes_table': createUserNodesTable, '00003_create_changes_table': createChangesTable, '00004_create_node_names_table': createNodeNamesTable, '00005_create_uploads_table': createUploadsTable, diff --git a/desktop/src/main/data/workspace/schema.ts b/desktop/src/main/data/workspace/schema.ts index 1cc73ebe..4e9f5783 100644 --- a/desktop/src/main/data/workspace/schema.ts +++ b/desktop/src/main/data/workspace/schema.ts @@ -21,20 +21,21 @@ export type SelectNode = Selectable; export type CreateNode = Insertable; export type UpdateNode = Updateable; -interface NodeUserStateTable { - node_id: ColumnType; +interface UserNodeTable { user_id: ColumnType; + node_id: ColumnType; last_seen_version_id: ColumnType; last_seen_at: ColumnType; mentions_count: ColumnType; + attributes: ColumnType; version_id: ColumnType; created_at: ColumnType; updated_at: ColumnType; } -export type SelectNodeUserState = Selectable; -export type CreateNodeUserState = Insertable; -export type UpdateNodeUserState = Updateable; +export type SelectUserNode = Selectable; +export type CreateUserNode = Insertable; +export type UpdateUserNode = Updateable; interface ChangeTable { id: ColumnType; @@ -73,7 +74,7 @@ export type UpdateDownload = Updateable; export interface WorkspaceDatabaseSchema { nodes: NodeTable; - node_user_states: NodeUserStateTable; + user_nodes: UserNodeTable; changes: ChangeTable; uploads: UploadTable; downloads: DownloadTable; diff --git a/desktop/src/main/handlers/messages/index.ts b/desktop/src/main/handlers/messages/index.ts index 153069a6..c417e0cc 100644 --- a/desktop/src/main/handlers/messages/index.ts +++ b/desktop/src/main/handlers/messages/index.ts @@ -3,8 +3,8 @@ import { LocalNodeSyncMessageHandler } from '@/main/handlers/messages/local-node import { LocalNodeDeleteMessageHandler } from '@/main/handlers/messages/local-node-delete'; import { ServerNodeSyncMessageHandler } from '@/main/handlers/messages/server-node-sync'; import { ServerNodeDeleteMessageHandler } from '@/main/handlers/messages/server-node-delete'; -import { ServerNodeUserStateSyncMessageHandler } from '@/main/handlers/messages/server-node-user-state-sync'; -import { LocalNodeUserStateSyncMessageHandler } from '@/main/handlers/messages/local-node-user-state-sync'; +import { ServerUserNodeSyncMessageHandler } from '@/main/handlers/messages/server-user-node-sync'; +import { LocalUserNodeSyncMessageHandler } from '@/main/handlers/messages/local-user-node-sync'; type MessageHandlerMap = { [K in keyof MessageMap]: MessageHandler; @@ -15,6 +15,6 @@ export const messageHandlerMap: MessageHandlerMap = { local_node_delete: new LocalNodeDeleteMessageHandler(), server_node_sync: new ServerNodeSyncMessageHandler(), server_node_delete: new ServerNodeDeleteMessageHandler(), - server_node_user_state_sync: new ServerNodeUserStateSyncMessageHandler(), - local_node_user_state_sync: new LocalNodeUserStateSyncMessageHandler(), + server_user_node_sync: new ServerUserNodeSyncMessageHandler(), + local_user_node_sync: new LocalUserNodeSyncMessageHandler(), }; diff --git a/desktop/src/main/handlers/messages/local-node-user-state-sync.ts b/desktop/src/main/handlers/messages/local-user-node-sync.ts similarity index 51% rename from desktop/src/main/handlers/messages/local-node-user-state-sync.ts rename to desktop/src/main/handlers/messages/local-user-node-sync.ts index a77adc39..a5835f3f 100644 --- a/desktop/src/main/handlers/messages/local-node-user-state-sync.ts +++ b/desktop/src/main/handlers/messages/local-user-node-sync.ts @@ -1,13 +1,13 @@ import { MessageContext, MessageHandler } from '@/operations/messages'; -import { LocalNodeUserStateSyncMessageInput } from '@/operations/messages/local-node-user-state-sync'; +import { LocalUserNodeSyncMessageInput } from '@/operations/messages/local-user-node-sync'; import { socketManager } from '@/main/sockets/socket-manager'; -export class LocalNodeUserStateSyncMessageHandler - implements MessageHandler +export class LocalUserNodeSyncMessageHandler + implements MessageHandler { public async handleMessage( context: MessageContext, - input: LocalNodeUserStateSyncMessageInput, + input: LocalUserNodeSyncMessageInput, ): Promise { socketManager.sendMessage(context.accountId, input); } diff --git a/desktop/src/main/handlers/messages/server-node-user-state-sync.ts b/desktop/src/main/handlers/messages/server-user-node-sync.ts similarity index 66% rename from desktop/src/main/handlers/messages/server-node-user-state-sync.ts rename to desktop/src/main/handlers/messages/server-user-node-sync.ts index 03c1e019..59597c31 100644 --- a/desktop/src/main/handlers/messages/server-node-user-state-sync.ts +++ b/desktop/src/main/handlers/messages/server-user-node-sync.ts @@ -1,16 +1,16 @@ import { MessageContext, MessageHandler } from '@/operations/messages'; -import { ServerNodeUserStateSyncMessageInput } from '@/operations/messages/server-node-user-state-sync'; +import { ServerUserNodeSyncMessageInput } from '@/operations/messages/server-user-node-sync'; import { mediator } from '@/main/mediator'; -export class ServerNodeUserStateSyncMessageHandler - implements MessageHandler +export class ServerUserNodeSyncMessageHandler + implements MessageHandler { public async handleMessage( context: MessageContext, - input: ServerNodeUserStateSyncMessageInput, + input: ServerUserNodeSyncMessageInput, ): Promise { await mediator.executeMutation({ - type: 'server_node_user_state_sync', + type: 'server_user_node_sync', accountId: context.accountId, nodeId: input.nodeId, userId: input.userId, diff --git a/desktop/src/main/handlers/mutations/index.ts b/desktop/src/main/handlers/mutations/index.ts index a5d9286d..9a664a6c 100644 --- a/desktop/src/main/handlers/mutations/index.ts +++ b/desktop/src/main/handlers/mutations/index.ts @@ -36,7 +36,7 @@ import { FileCreateMutationHandler } from '@/main/handlers/mutations/file-create import { FileDownloadMutationHandler } from '@/main/handlers/mutations/file-download'; import { SpaceUpdateMutationHandler } from '@/main/handlers/mutations/space-update'; import { AccountUpdateMutationHandler } from '@/main/handlers/mutations/account-update'; -import { ServerNodeUserStateSyncMutationHandler } from '@/main/handlers/mutations/server-node-user-state-sync'; +import { ServerUserNodeSyncMutationHandler } from '@/main/handlers/mutations/server-user-node-sync'; import { MarkNodeAsSeenMutationHandler } from '@/main/handlers/mutations/mark-node-as-seen'; type MutationHandlerMap = { @@ -81,6 +81,6 @@ export const mutationHandlerMap: MutationHandlerMap = { file_download: new FileDownloadMutationHandler(), space_update: new SpaceUpdateMutationHandler(), account_update: new AccountUpdateMutationHandler(), - server_node_user_state_sync: new ServerNodeUserStateSyncMutationHandler(), + server_user_node_sync: new ServerUserNodeSyncMutationHandler(), mark_node_as_seen: new MarkNodeAsSeenMutationHandler(), }; diff --git a/desktop/src/main/handlers/mutations/mark-node-as-seen.ts b/desktop/src/main/handlers/mutations/mark-node-as-seen.ts index 355c9775..398e722a 100644 --- a/desktop/src/main/handlers/mutations/mark-node-as-seen.ts +++ b/desktop/src/main/handlers/mutations/mark-node-as-seen.ts @@ -2,7 +2,7 @@ import { generateId, IdType } from '@/lib/id'; import { databaseManager } from '@/main/data/database-manager'; import { MutationHandler, MutationResult } from '@/operations/mutations'; import { MarkNodeAsSeenMutationInput } from '@/operations/mutations/mark-node-as-seen'; -import { LocalNodeUserStateChangeData } from '@/types/sync'; +import { LocalUserNodeChangeData } from '@/types/sync'; export class MarkNodeAsSeenMutationHandler implements MutationHandler @@ -14,8 +14,8 @@ export class MarkNodeAsSeenMutationHandler input.userId, ); - const changeData: LocalNodeUserStateChangeData = { - type: 'node_user_state_update', + const changeData: LocalUserNodeChangeData = { + type: 'user_node_update', nodeId: input.nodeId, userId: input.userId, lastSeenVersionId: input.versionId, @@ -26,7 +26,7 @@ export class MarkNodeAsSeenMutationHandler await workspaceDatabase.transaction().execute(async (trx) => { await trx - .updateTable('node_user_states') + .updateTable('user_nodes') .set({ last_seen_version_id: input.versionId, last_seen_at: new Date().toISOString(), @@ -53,7 +53,7 @@ export class MarkNodeAsSeenMutationHandler changes: [ { type: 'workspace', - table: 'node_user_states', + table: 'user_nodes', userId: input.userId, }, { diff --git a/desktop/src/main/handlers/mutations/server-node-user-state-sync.ts b/desktop/src/main/handlers/mutations/server-user-node-sync.ts similarity index 76% rename from desktop/src/main/handlers/mutations/server-node-user-state-sync.ts rename to desktop/src/main/handlers/mutations/server-user-node-sync.ts index 5b42b3f0..7530362f 100644 --- a/desktop/src/main/handlers/mutations/server-node-user-state-sync.ts +++ b/desktop/src/main/handlers/mutations/server-user-node-sync.ts @@ -1,18 +1,14 @@ import { databaseManager } from '@/main/data/database-manager'; import { socketManager } from '@/main/sockets/socket-manager'; -import { - MutationChange, - MutationHandler, - MutationResult, -} from '@/operations/mutations'; -import { ServerNodeUserStateSyncMutationInput } from '@/operations/mutations/server-node-user-state-sync'; +import { MutationHandler, MutationResult } from '@/operations/mutations'; +import { ServerUserNodeSyncMutationInput } from '@/operations/mutations/server-user-node-sync'; -export class ServerNodeUserStateSyncMutationHandler - implements MutationHandler +export class ServerUserNodeSyncMutationHandler + implements MutationHandler { public async handleMutation( - input: ServerNodeUserStateSyncMutationInput, - ): Promise> { + input: ServerUserNodeSyncMutationInput, + ): Promise> { const workspace = await databaseManager.appDatabase .selectFrom('workspaces') .selectAll() @@ -37,10 +33,10 @@ export class ServerNodeUserStateSyncMutationHandler await databaseManager.getWorkspaceDatabase(userId); await workspaceDatabase - .insertInto('node_user_states') + .insertInto('user_nodes') .values({ - node_id: input.nodeId, user_id: input.userId, + node_id: input.nodeId, version_id: input.versionId, last_seen_at: input.lastSeenAt, last_seen_version_id: input.lastSeenVersionId, @@ -60,11 +56,11 @@ export class ServerNodeUserStateSyncMutationHandler .execute(); socketManager.sendMessage(workspace.account_id, { - type: 'local_node_user_state_sync', + type: 'local_user_node_sync', + userId: input.userId, nodeId: input.nodeId, versionId: input.versionId, workspaceId: input.workspaceId, - userId: input.userId, }); return { @@ -74,7 +70,7 @@ export class ServerNodeUserStateSyncMutationHandler changes: [ { type: 'workspace', - table: 'node_user_states', + table: 'user_nodes', userId: userId, }, ], diff --git a/desktop/src/main/handlers/queries/sidebar-chat-list.ts b/desktop/src/main/handlers/queries/sidebar-chat-list.ts index 8cd34370..b5264ebc 100644 --- a/desktop/src/main/handlers/queries/sidebar-chat-list.ts +++ b/desktop/src/main/handlers/queries/sidebar-chat-list.ts @@ -52,7 +52,7 @@ export class SidebarChatListQueryHandler !changes.some( (change) => change.type === 'workspace' && - (change.table === 'nodes' || change.table === 'node_user_states') && + (change.table === 'nodes' || change.table === 'user_nodes') && change.userId === input.userId, ) ) { @@ -157,16 +157,16 @@ export class SidebarChatListQueryHandler const chatIds = chats.map((chat) => chat.id); const unreadCounts = await workspaceDatabase - .selectFrom('node_user_states as nus') - .innerJoin('nodes as n', 'nus.node_id', 'n.id') - .where('nus.user_id', '=', input.userId) + .selectFrom('user_nodes as un') + .innerJoin('nodes as n', 'un.node_id', 'n.id') + .where('un.user_id', '=', input.userId) .where('n.type', '=', NodeTypes.Message) .where('n.parent_id', 'in', chatIds) - .where('nus.last_seen_version_id', 'is', null) + .where('un.last_seen_version_id', 'is', null) .select(['n.parent_id as node_id']) .select((eb) => [ - eb.fn.count('nus.node_id').as('unread_count'), - eb.fn.sum('nus.mentions_count').as('mentions_count'), + eb.fn.count('un.node_id').as('unread_count'), + eb.fn.sum('un.mentions_count').as('mentions_count'), ]) .groupBy('n.parent_id') .execute(); diff --git a/desktop/src/main/handlers/queries/sidebar-space-list.ts b/desktop/src/main/handlers/queries/sidebar-space-list.ts index 74ac5fb5..9d340e34 100644 --- a/desktop/src/main/handlers/queries/sidebar-space-list.ts +++ b/desktop/src/main/handlers/queries/sidebar-space-list.ts @@ -48,7 +48,7 @@ export class SidebarSpaceListQueryHandler !changes.some( (change) => change.type === 'workspace' && - (change.table === 'nodes' || change.table === 'node_user_states') && + (change.table === 'nodes' || change.table === 'user_nodes') && change.userId === input.userId, ) ) { @@ -119,16 +119,16 @@ export class SidebarSpaceListQueryHandler .map((r) => r.id); const unreadCounts = await workspaceDatabase - .selectFrom('node_user_states as nus') - .innerJoin('nodes as n', 'nus.node_id', 'n.id') - .where('nus.user_id', '=', input.userId) + .selectFrom('user_nodes as un') + .innerJoin('nodes as n', 'un.node_id', 'n.id') + .where('un.user_id', '=', input.userId) .where('n.type', '=', NodeTypes.Message) .where('n.parent_id', 'in', channelIds) - .where('nus.last_seen_version_id', 'is', null) + .where('un.last_seen_version_id', 'is', null) .select(['n.parent_id as node_id']) .select((eb) => [ - eb.fn.count('nus.node_id').as('unread_count'), - eb.fn.sum('nus.mentions_count').as('mentions_count'), + eb.fn.count('un.node_id').as('unread_count'), + eb.fn.sum('un.mentions_count').as('mentions_count'), ]) .groupBy('n.parent_id') .execute(); diff --git a/desktop/src/operations/messages/local-node-user-state-sync.ts b/desktop/src/operations/messages/local-node-user-state-sync.ts deleted file mode 100644 index 0a7f0030..00000000 --- a/desktop/src/operations/messages/local-node-user-state-sync.ts +++ /dev/null @@ -1,13 +0,0 @@ -export type LocalNodeUserStateSyncMessageInput = { - type: 'local_node_user_state_sync'; - nodeId: string; - userId: string; - workspaceId: string; - versionId: string; -}; - -declare module '@/operations/messages' { - interface MessageMap { - local_node_user_state_sync: LocalNodeUserStateSyncMessageInput; - } -} diff --git a/desktop/src/operations/messages/local-user-node-sync.ts b/desktop/src/operations/messages/local-user-node-sync.ts new file mode 100644 index 00000000..1d74bbf9 --- /dev/null +++ b/desktop/src/operations/messages/local-user-node-sync.ts @@ -0,0 +1,13 @@ +export type LocalUserNodeSyncMessageInput = { + type: 'local_user_node_sync'; + nodeId: string; + userId: string; + workspaceId: string; + versionId: string; +}; + +declare module '@/operations/messages' { + interface MessageMap { + local_user_node_sync: LocalUserNodeSyncMessageInput; + } +} diff --git a/desktop/src/operations/messages/server-node-user-state-sync.ts b/desktop/src/operations/messages/server-user-node-sync.ts similarity index 64% rename from desktop/src/operations/messages/server-node-user-state-sync.ts rename to desktop/src/operations/messages/server-user-node-sync.ts index da56eaac..d5cabdb9 100644 --- a/desktop/src/operations/messages/server-node-user-state-sync.ts +++ b/desktop/src/operations/messages/server-user-node-sync.ts @@ -1,5 +1,5 @@ -export type ServerNodeUserStateSyncMessageInput = { - type: 'server_node_user_state_sync'; +export type ServerUserNodeSyncMessageInput = { + type: 'server_user_node_sync'; nodeId: string; userId: string; workspaceId: string; @@ -13,6 +13,6 @@ export type ServerNodeUserStateSyncMessageInput = { declare module '@/operations/messages' { interface MessageMap { - server_node_user_state_sync: ServerNodeUserStateSyncMessageInput; + server_user_node_sync: ServerUserNodeSyncMessageInput; } } diff --git a/desktop/src/operations/mutations/server-node-user-state-sync.ts b/desktop/src/operations/mutations/server-user-node-sync.ts similarity index 55% rename from desktop/src/operations/mutations/server-node-user-state-sync.ts rename to desktop/src/operations/mutations/server-user-node-sync.ts index 90be3c30..bf2eb6c6 100644 --- a/desktop/src/operations/mutations/server-node-user-state-sync.ts +++ b/desktop/src/operations/mutations/server-user-node-sync.ts @@ -1,5 +1,5 @@ -export type ServerNodeUserStateSyncMutationInput = { - type: 'server_node_user_state_sync'; +export type ServerUserNodeSyncMutationInput = { + type: 'server_user_node_sync'; accountId: string; nodeId: string; userId: string; @@ -12,15 +12,15 @@ export type ServerNodeUserStateSyncMutationInput = { updatedAt: string | null; }; -export type ServerNodeUserStateSyncMutationOutput = { +export type ServerUserNodeSyncMutationOutput = { success: boolean; }; declare module '@/operations/mutations' { interface MutationMap { - server_node_user_state_sync: { - input: ServerNodeUserStateSyncMutationInput; - output: ServerNodeUserStateSyncMutationOutput; + server_user_node_sync: { + input: ServerUserNodeSyncMutationInput; + output: ServerUserNodeSyncMutationOutput; }; } } diff --git a/desktop/src/types/sync.ts b/desktop/src/types/sync.ts index 85234241..254d0d3f 100644 --- a/desktop/src/types/sync.ts +++ b/desktop/src/types/sync.ts @@ -34,8 +34,8 @@ export type LocalDeleteNodeChangeData = { deletedBy: string; }; -export type LocalNodeUserStateChangeData = { - type: 'node_user_state_update'; +export type LocalUserNodeChangeData = { + type: 'user_node_update'; nodeId: string; userId: string; lastSeenVersionId: string; @@ -48,4 +48,4 @@ export type LocalNodeChangeData = | LocalCreateNodeChangeData | LocalUpdateNodeChangeData | LocalDeleteNodeChangeData - | LocalNodeUserStateChangeData; + | LocalUserNodeChangeData; diff --git a/server/src/data/migrations.ts b/server/src/data/migrations.ts index dbd754a7..709ffc0f 100644 --- a/server/src/data/migrations.ts +++ b/server/src/data/migrations.ts @@ -281,49 +281,47 @@ const createNodeCollaboratorsTable: Migration = { }, }; -const createNodeUserStatesTable: Migration = { +const createUserNodesTable: Migration = { up: async (db) => { await db.schema - .createTable('node_user_states') - .addColumn('node_id', 'varchar(30)', (col) => col.notNull()) + .createTable('user_nodes') .addColumn('user_id', 'varchar(30)', (col) => col.notNull()) + .addColumn('node_id', 'varchar(30)', (col) => col.notNull()) .addColumn('workspace_id', 'varchar(30)', (col) => col.notNull()) .addColumn('last_seen_version_id', 'varchar(30)') .addColumn('last_seen_at', 'timestamptz') .addColumn('mentions_count', 'integer', (col) => col.notNull().defaultTo(0), ) + .addColumn('attributes', 'jsonb') .addColumn('created_at', 'timestamptz', (col) => col.notNull()) .addColumn('updated_at', 'timestamptz') .addColumn('access_removed_at', 'timestamptz') .addColumn('version_id', 'varchar(30)', (col) => col.notNull()) - .addPrimaryKeyConstraint('node_user_states_pkey', ['node_id', 'user_id']) + .addPrimaryKeyConstraint('user_nodes_pkey', ['user_id', 'node_id']) .execute(); }, down: async (db) => { - await db.schema.dropTable('node_user_states').execute(); + await db.schema.dropTable('user_nodes').execute(); }, }; -const createNodeDeviceStatesTable: Migration = { +const createDeviceNodesTable: Migration = { up: async (db) => { await db.schema - .createTable('node_device_states') - .addColumn('node_id', 'varchar(30)', (col) => col.notNull()) + .createTable('device_nodes') .addColumn('device_id', 'varchar(30)', (col) => col.notNull()) + .addColumn('node_id', 'varchar(30)', (col) => col.notNull()) .addColumn('workspace_id', 'varchar(30)', (col) => col.notNull()) .addColumn('node_version_id', 'varchar(30)') - .addColumn('user_state_version_id', 'varchar(30)') + .addColumn('user_node_version_id', 'varchar(30)') .addColumn('node_synced_at', 'timestamptz') - .addColumn('user_state_synced_at', 'timestamptz') - .addPrimaryKeyConstraint('node_device_states_pkey', [ - 'node_id', - 'device_id', - ]) + .addColumn('user_node_synced_at', 'timestamptz') + .addPrimaryKeyConstraint('device_nodes_pkey', ['device_id', 'node_id']) .execute(); }, down: async (db) => { - await db.schema.dropTable('node_device_states').execute(); + await db.schema.dropTable('device_nodes').execute(); }, }; @@ -335,6 +333,6 @@ export const databaseMigrations: Record = { '00005_create_nodes_table': createNodesTable, '00006_create_node_paths_table': createNodePathsTable, '00007_create_node_collaborators_table': createNodeCollaboratorsTable, - '00008_create_node_user_states_table': createNodeUserStatesTable, - '00009_create_node_device_states_table': createNodeDeviceStatesTable, + '00008_create_user_nodes_table': createUserNodesTable, + '00009_create_device_nodes_table': createDeviceNodesTable, }; diff --git a/server/src/data/schema.ts b/server/src/data/schema.ts index 2f1649ca..f4d37032 100644 --- a/server/src/data/schema.ts +++ b/server/src/data/schema.ts @@ -128,37 +128,38 @@ export type SelectNodeCollaborator = Selectable; export type CreateNodeCollaborator = Insertable; export type UpdateNodeCollaborator = Updateable; -interface NodeUserStateTable { +interface UserNodeTable { node_id: ColumnType; user_id: ColumnType; workspace_id: ColumnType; last_seen_version_id: ColumnType; last_seen_at: ColumnType; mentions_count: ColumnType; + attributes: JSONColumnType; created_at: ColumnType; updated_at: ColumnType; access_removed_at: ColumnType; version_id: ColumnType; } -export type SelectNodeUserState = Selectable; -export type CreateNodeUserState = Insertable; -export type UpdateNodeUserState = Updateable; +export type SelectUserNode = Selectable; +export type CreateUserNode = Insertable; +export type UpdateUserNode = Updateable; -interface NodeDeviceStateTable { +interface DeviceNodeTable { node_id: ColumnType; device_id: ColumnType; workspace_id: ColumnType; node_version_id: ColumnType; - user_state_version_id: ColumnType< - string | null, - string | null, - string | null - >; + user_node_version_id: ColumnType; node_synced_at: ColumnType; - user_state_synced_at: ColumnType; + user_node_synced_at: ColumnType; } +export type SelectDeviceNode = Selectable; +export type CreateDeviceNode = Insertable; +export type UpdateDeviceNode = Updateable; + export interface DatabaseSchema { accounts: AccountTable; devices: DeviceTable; @@ -167,6 +168,6 @@ export interface DatabaseSchema { nodes: NodeTable; node_paths: NodePathTable; node_collaborators: NodeCollaboratorTable; - node_user_states: NodeUserStateTable; - node_device_states: NodeDeviceStateTable; + user_nodes: UserNodeTable; + device_nodes: DeviceNodeTable; } diff --git a/server/src/queues/events.ts b/server/src/queues/events.ts index c18bca6b..723f6774 100644 --- a/server/src/queues/events.ts +++ b/server/src/queues/events.ts @@ -1,6 +1,6 @@ import { database } from '@/data/database'; import { redisConfig } from '@/data/redis'; -import { CreateNodeUserState } from '@/data/schema'; +import { CreateUserNode } from '@/data/schema'; import { filesStorage } from '@/data/storage'; import { BUCKET_NAMES } from '@/data/storage'; import { NodeTypes } from '@/lib/constants'; @@ -61,7 +61,7 @@ const handleEventJob = async (job: Job) => { const handleNodeCreatedEvent = async ( event: NodeCreatedEvent, ): Promise => { - await createNodeUserStates(event); + await createUserNodes(event); await synapse.sendSynapseMessage({ type: 'node_create', nodeId: event.id, @@ -99,13 +99,13 @@ const handleNodeDeletedEvent = async ( }); }; -const createNodeUserStates = async (event: NodeCreatedEvent): Promise => { - const userStatesToCreate: CreateNodeUserState[] = []; +const createUserNodes = async (event: NodeCreatedEvent): Promise => { + const userNodesToCreate: CreateUserNode[] = []; if (event.attributes.type === NodeTypes.User) { const userIds = await fetchWorkspaceUsers(event.workspaceId); for (const userId of userIds) { - userStatesToCreate.push({ + userNodesToCreate.push({ user_id: userId, node_id: event.id, workspace_id: event.workspaceId, @@ -122,7 +122,7 @@ const createNodeUserStates = async (event: NodeCreatedEvent): Promise => { continue; } - userStatesToCreate.push({ + userNodesToCreate.push({ user_id: event.id, node_id: userId, workspace_id: event.workspaceId, @@ -140,7 +140,7 @@ const createNodeUserStates = async (event: NodeCreatedEvent): Promise => { const collaboratorIds = collaborators.map((c) => c.collaboratorId); for (const collaboratorId of collaboratorIds) { - userStatesToCreate.push({ + userNodesToCreate.push({ user_id: collaboratorId, node_id: event.id, workspace_id: event.workspaceId, @@ -157,10 +157,10 @@ const createNodeUserStates = async (event: NodeCreatedEvent): Promise => { } } - if (userStatesToCreate.length > 0) { + if (userNodesToCreate.length > 0) { await database - .insertInto('node_user_states') - .values(userStatesToCreate) + .insertInto('user_nodes') + .values(userNodesToCreate) .onConflict((cb) => cb.doNothing()) .execute(); } @@ -187,16 +187,14 @@ const checkForCollaboratorsChange = async ( } if (addedCollaborators.length > 0) { - const existingNodeUserStates = await database - .selectFrom('node_user_states') + const existingUserNodes = await database + .selectFrom('user_nodes') .select('user_id') .where('node_id', '=', event.id) .where('user_id', 'in', addedCollaborators) .execute(); - const existingCollaboratorIds = existingNodeUserStates.map( - (e) => e.user_id, - ); + const existingCollaboratorIds = existingUserNodes.map((e) => e.user_id); const actualAddedCollaborators = difference( addedCollaborators, @@ -211,10 +209,10 @@ const checkForCollaboratorsChange = async ( .execute(); const descendantIds = descendants.map((d) => d.descendant_id); - const userStatesToCreate: CreateNodeUserState[] = []; + const userNodesToCreate: CreateUserNode[] = []; for (const collaboratorId of addedCollaborators) { for (const descendantId of descendantIds) { - userStatesToCreate.push({ + userNodesToCreate.push({ user_id: collaboratorId, node_id: descendantId, last_seen_version_id: null, @@ -228,10 +226,10 @@ const checkForCollaboratorsChange = async ( } } - if (userStatesToCreate.length > 0) { + if (userNodesToCreate.length > 0) { await database - .insertInto('node_user_states') - .values(userStatesToCreate) + .insertInto('user_nodes') + .values(userNodesToCreate) .onConflict((cb) => cb.doNothing()) .execute(); } @@ -255,7 +253,7 @@ const checkForCollaboratorsChange = async ( const descendantIds = descendants.map((d) => d.descendant_id); await database - .updateTable('node_user_states') + .updateTable('user_nodes') .set({ access_removed_at: new Date(), }) diff --git a/server/src/queues/tasks.ts b/server/src/queues/tasks.ts index c8ba8e05..3448bab2 100644 --- a/server/src/queues/tasks.ts +++ b/server/src/queues/tasks.ts @@ -54,7 +54,7 @@ const handleCleanDeviceDataTask = async ( } await database - .deleteFrom('node_device_states') + .deleteFrom('device_nodes') .where('device_id', '=', task.deviceId) .execute(); }; diff --git a/server/src/routes/sync.ts b/server/src/routes/sync.ts index 12aa895e..a717da65 100644 --- a/server/src/routes/sync.ts +++ b/server/src/routes/sync.ts @@ -315,7 +315,7 @@ const handleNodeUserStateChange = async ( } await database - .updateTable('node_user_states') + .updateTable('user_nodes') .set({ last_seen_version_id: changeData.lastSeenVersionId, last_seen_at: new Date(changeData.lastSeenAt), @@ -328,7 +328,7 @@ const handleNodeUserStateChange = async ( .execute(); await synapse.sendSynapseMessage({ - type: 'node_user_state_update', + type: 'user_node_update', nodeId: changeData.nodeId, userId: changeData.userId, workspaceId: workspaceUser.workspace_id, diff --git a/server/src/services/synapse.ts b/server/src/services/synapse.ts index be6a0aa4..56a71506 100644 --- a/server/src/services/synapse.ts +++ b/server/src/services/synapse.ts @@ -7,8 +7,8 @@ import { redis } from '@/data/redis'; import { SynapseMessage, SynapseNodeChangeMessage, - SynapseNodeUserStateChangeMessage, -} from '@/types/events'; + SynapseUserNodeChangeMessage, +} from '@/types/synapse'; import { getIdType, IdType } from '@/lib/id'; import { MessageInput } from '@/types/messages'; @@ -108,13 +108,13 @@ class SynapseService { ) { if (message.type === 'local_node_sync') { await database - .insertInto('node_device_states') + .insertInto('device_nodes') .values({ node_id: message.nodeId, device_id: connection.deviceId, node_version_id: message.versionId, - user_state_version_id: null, - user_state_synced_at: null, + user_node_version_id: null, + user_node_synced_at: null, workspace_id: message.workspaceId, node_synced_at: new Date(), }) @@ -126,29 +126,29 @@ class SynapseService { }), ) .execute(); - } else if (message.type === 'local_node_user_state_sync') { + } else if (message.type === 'local_user_node_sync') { await database - .insertInto('node_device_states') + .insertInto('device_nodes') .values({ node_id: message.nodeId, device_id: connection.deviceId, node_version_id: null, - user_state_version_id: message.versionId, - user_state_synced_at: new Date(), + user_node_version_id: message.versionId, + user_node_synced_at: new Date(), workspace_id: message.workspaceId, node_synced_at: new Date(), }) .onConflict((cb) => cb.columns(['node_id', 'device_id']).doUpdateSet({ workspace_id: message.workspaceId, - user_state_version_id: message.versionId, - user_state_synced_at: new Date(), + user_node_version_id: message.versionId, + user_node_synced_at: new Date(), }), ) .execute(); } else if (message.type === 'local_node_delete') { await database - .deleteFrom('node_device_states') + .deleteFrom('device_nodes') .where('device_id', '=', connection.deviceId) .where('node_id', '=', message.nodeId) .execute(); @@ -159,7 +159,7 @@ class SynapseService { if (userId) { await database - .deleteFrom('node_user_states') + .deleteFrom('user_nodes') .where('node_id', '=', message.nodeId) .where('user_id', '=', userId) .execute(); @@ -181,8 +181,8 @@ class SynapseService { data.type === 'node_delete' ) { this.handleNodeChangeMessage(data); - } else if (data.type === 'node_user_state_update') { - this.handleNodeUserStateUpdateMessage(data); + } else if (data.type === 'user_node_update') { + this.handleUserNodeUpdateMessage(data); } } @@ -202,21 +202,21 @@ class SynapseService { return; } - const nodeUserStates = await database - .selectFrom('node_user_states') + const userNodes = await database + .selectFrom('user_nodes') .selectAll() .where((eb) => eb.and([eb('user_id', 'in', userIds), eb('node_id', '=', data.nodeId)]), ) .execute(); - if (nodeUserStates.length === 0) { + if (userNodes.length === 0) { return; } if (data.type === 'node_delete') { - for (const nodeUserState of nodeUserStates) { - const deviceIds = userDevices.get(nodeUserState.user_id) ?? []; + for (const userNode of userNodes) { + const deviceIds = userDevices.get(userNode.user_id) ?? []; for (const deviceId of deviceIds) { const socketConnection = this.connections.get(deviceId); if (socketConnection === undefined) { @@ -254,8 +254,8 @@ class SynapseService { return; } - for (const nodeUserState of nodeUserStates) { - const deviceIds = userDevices.get(nodeUserState.user_id) ?? []; + for (const userNode of userNodes) { + const deviceIds = userDevices.get(userNode.user_id) ?? []; if (deviceIds.length === 0) { continue; } @@ -266,7 +266,7 @@ class SynapseService { continue; } - if (nodeUserState.access_removed_at !== null) { + if (userNode.access_removed_at !== null) { this.sendSocketMessage(socketConnection, { type: 'server_node_delete', id: data.nodeId, @@ -291,22 +291,22 @@ class SynapseService { } } - private async handleNodeUserStateUpdateMessage( - data: SynapseNodeUserStateChangeMessage, + private async handleUserNodeUpdateMessage( + data: SynapseUserNodeChangeMessage, ) { const userDevices = this.getWorkspaceUserDevices(data.workspaceId); if (!userDevices.has(data.userId)) { return; } - const userState = await database - .selectFrom('node_user_states') + const userNode = await database + .selectFrom('user_nodes') .selectAll() .where('user_id', '=', data.userId) .where('node_id', '=', data.nodeId) .executeTakeFirst(); - if (!userState) { + if (!userNode) { return; } @@ -318,16 +318,16 @@ class SynapseService { } this.sendSocketMessage(socketConnection, { - type: 'server_node_user_state_sync', + type: 'server_user_node_sync', userId: data.userId, nodeId: data.nodeId, - lastSeenVersionId: userState.last_seen_version_id, + lastSeenVersionId: userNode.last_seen_version_id, workspaceId: data.workspaceId, - versionId: userState.version_id, - lastSeenAt: userState.last_seen_at?.toISOString() ?? null, - createdAt: userState.created_at.toISOString(), - updatedAt: userState.updated_at?.toISOString() ?? null, - mentionsCount: userState.mentions_count, + versionId: userNode.version_id, + lastSeenAt: userNode.last_seen_at?.toISOString() ?? null, + createdAt: userNode.created_at.toISOString(), + updatedAt: userNode.updated_at?.toISOString() ?? null, + mentionsCount: userNode.mentions_count, }); } } @@ -354,9 +354,9 @@ class SynapseService { } const unsyncedNodes = await database - .selectFrom('node_user_states as nus') + .selectFrom('user_nodes as nus') .leftJoin('nodes as n', 'n.id', 'nus.node_id') - .leftJoin('node_device_states as nds', (join) => + .leftJoin('device_nodes as nds', (join) => join .onRef('nds.node_id', '=', 'nus.node_id') .on('nds.device_id', '=', connection.deviceId), @@ -380,9 +380,9 @@ class SynapseService { 'nus.mentions_count', 'nus.created_at', 'nus.updated_at', - 'nus.version_id as user_state_version_id', + 'nus.version_id as user_node_version_id', 'nds.node_version_id as device_node_version_id', - 'nds.user_state_version_id as device_user_state_version_id', + 'nds.user_node_version_id as device_user_node_version_id', ]) .where((eb) => eb.and([ @@ -392,8 +392,8 @@ class SynapseService { eb('nus.access_removed_at', 'is not', null), eb('nds.node_version_id', 'is', null), eb('nds.node_version_id', '!=', eb.ref('n.version_id')), - eb('nds.user_state_version_id', 'is', null), - eb('nds.user_state_version_id', '!=', eb.ref('nus.version_id')), + eb('nds.user_node_version_id', 'is', null), + eb('nds.user_node_version_id', '!=', eb.ref('nus.version_id')), ]), ]), ) @@ -416,13 +416,13 @@ class SynapseService { continue; } - if (row.user_state_version_id !== row.device_user_state_version_id) { + if (row.user_node_version_id !== row.device_user_node_version_id) { this.sendSocketMessage(connection, { - type: 'server_node_user_state_sync', + type: 'server_user_node_sync', nodeId: row.node_id, userId: row.user_id, workspaceId: row.workspace_id, - versionId: row.user_state_version_id!, + versionId: row.user_node_version_id!, lastSeenAt: row.last_seen_at?.toISOString() ?? null, lastSeenVersionId: row.last_seen_version_id ?? null, mentionsCount: row.mentions_count, diff --git a/server/src/types/events.ts b/server/src/types/events.ts index 12ecda6c..6b7bb2b1 100644 --- a/server/src/types/events.ts +++ b/server/src/types/events.ts @@ -32,20 +32,3 @@ export type NodeDeletedEvent = { attributes: ServerNodeAttributes; deletedAt: string; }; - -export type SynapseNodeChangeMessage = { - workspaceId: string; - nodeId: string; - type: 'node_create' | 'node_update' | 'node_delete'; -}; - -export type SynapseNodeUserStateChangeMessage = { - workspaceId: string; - nodeId: string; - userId: string; - type: 'node_user_state_update'; -}; - -export type SynapseMessage = - | SynapseNodeChangeMessage - | SynapseNodeUserStateChangeMessage; diff --git a/server/src/types/messages.ts b/server/src/types/messages.ts index a79ed16d..0fb28f5a 100644 --- a/server/src/types/messages.ts +++ b/server/src/types/messages.ts @@ -11,8 +11,8 @@ export type LocalNodeDeleteMessageInput = { workspaceId: string; }; -export type LocalNodeUserStateSyncMessageInput = { - type: 'local_node_user_state_sync'; +export type LocalUserNodeSyncMessageInput = { + type: 'local_user_node_sync'; nodeId: string; userId: string; workspaceId: string; @@ -33,10 +33,10 @@ export type ServerNodeSyncMessageInput = { versionId: string; }; -export type ServerNodeUserStateSyncMessageInput = { - type: 'server_node_user_state_sync'; - nodeId: string; +export type ServerUserNodeSyncMessageInput = { + type: 'server_user_node_sync'; userId: string; + nodeId: string; workspaceId: string; versionId: string; lastSeenAt: string | null; @@ -57,5 +57,5 @@ export type MessageInput = | LocalNodeDeleteMessageInput | ServerNodeSyncMessageInput | ServerNodeDeleteMessageInput - | LocalNodeUserStateSyncMessageInput - | ServerNodeUserStateSyncMessageInput; + | LocalUserNodeSyncMessageInput + | ServerUserNodeSyncMessageInput; diff --git a/server/src/types/synapse.ts b/server/src/types/synapse.ts new file mode 100644 index 00000000..ef4d71b0 --- /dev/null +++ b/server/src/types/synapse.ts @@ -0,0 +1,16 @@ +export type SynapseNodeChangeMessage = { + workspaceId: string; + nodeId: string; + type: 'node_create' | 'node_update' | 'node_delete'; +}; + +export type SynapseUserNodeChangeMessage = { + workspaceId: string; + nodeId: string; + userId: string; + type: 'user_node_update'; +}; + +export type SynapseMessage = + | SynapseNodeChangeMessage + | SynapseUserNodeChangeMessage;