From 16fdbbd71f7a56ddfe50bae38c58e779e902e337 Mon Sep 17 00:00:00 2001 From: Hakan Shehu Date: Fri, 29 Nov 2024 22:27:23 +0100 Subject: [PATCH] Implement interactions --- apps/desktop/src/main/data/app/migrations.ts | 1 + apps/desktop/src/main/data/app/schema.ts | 7 +- .../src/main/data/workspace/migrations.ts | 55 ++- .../desktop/src/main/data/workspace/schema.ts | 41 +- .../src/main/mutations/field-create.ts | 8 +- .../main/mutations/select-option-create.ts | 8 +- .../desktop/src/main/mutations/view-create.ts | 8 +- apps/desktop/src/main/queries/file-list.ts | 8 +- apps/desktop/src/main/queries/message-list.ts | 5 +- apps/desktop/src/main/queries/record-list.ts | 2 +- apps/desktop/src/main/queries/user-search.ts | 2 +- .../src/main/queries/workspace-user-list.ts | 2 +- .../src/main/services/interaction-service.ts | 354 ++++++++++++++++++ .../desktop/src/main/services/node-service.ts | 72 +++- .../src/main/services/socket-connection.ts | 7 +- .../src/main/services/socket-service.ts | 8 +- .../desktop/src/main/services/sync-service.ts | 214 ++++++++++- apps/desktop/src/main/utils.ts | 21 +- .../collaborators/node-collaborator-audit.tsx | 2 +- .../databases/calendars/calendar-view-day.tsx | 4 +- .../calendars/calendar-view-grid.tsx | 14 +- .../renderer/components/databases/view.tsx | 2 +- .../components/files/file-sidebar.tsx | 3 +- .../components/messages/message-list.tsx | 2 +- .../components/messages/message-time.tsx | 3 +- .../records/values/record-url-value.tsx | 4 +- .../components/spaces/space-sidebar-item.tsx | 3 +- .../renderer/components/ui/date-picker.tsx | 3 +- .../workspaces/workspace-user-invite.tsx | 2 +- apps/desktop/src/shared/lib/avatars.ts | 3 +- apps/desktop/src/shared/lib/databases.ts | 3 +- apps/desktop/src/shared/lib/editor.ts | 2 +- apps/desktop/src/shared/lib/utils.ts | 186 --------- apps/desktop/src/shared/types/events.ts | 12 +- apps/desktop/src/shared/types/interactions.ts | 12 + apps/server/src/data/migrations.ts | 58 ++- apps/server/src/data/schema.ts | 38 +- apps/server/src/lib/nodes.ts | 19 + .../src/services/interaction-service.ts | 157 ++++++++ apps/server/src/services/synapse-service.ts | 144 ++++++- apps/server/src/types/events.ts | 13 +- packages/core/src/index.ts | 3 + packages/core/src/lib/id.ts | 1 + packages/core/src/lib/interactions.ts | 57 +++ packages/core/src/lib/utils.ts | 210 +++++++++++ packages/core/src/types/interactions.ts | 14 + packages/core/src/types/messages.ts | 29 +- packages/core/src/types/sync.ts | 21 +- 48 files changed, 1566 insertions(+), 281 deletions(-) create mode 100644 apps/desktop/src/main/services/interaction-service.ts create mode 100644 apps/desktop/src/shared/types/interactions.ts create mode 100644 apps/server/src/services/interaction-service.ts create mode 100644 packages/core/src/lib/interactions.ts create mode 100644 packages/core/src/lib/utils.ts create mode 100644 packages/core/src/types/interactions.ts diff --git a/apps/desktop/src/main/data/app/migrations.ts b/apps/desktop/src/main/data/app/migrations.ts index e27baae2..6709f060 100644 --- a/apps/desktop/src/main/data/app/migrations.ts +++ b/apps/desktop/src/main/data/app/migrations.ts @@ -95,6 +95,7 @@ const createWorkspaceCursorsTable: Migration = { .addColumn('collaborations', 'integer', (col) => col.notNull().defaultTo(0) ) + .addColumn('interactions', 'integer', (col) => col.notNull().defaultTo(0)) .addColumn('revocations', 'integer', (col) => col.notNull().defaultTo(0)) .addColumn('created_at', 'text', (col) => col.notNull()) .addColumn('updated_at', 'text') diff --git a/apps/desktop/src/main/data/app/schema.ts b/apps/desktop/src/main/data/app/schema.ts index fd5915ca..8bad7606 100644 --- a/apps/desktop/src/main/data/app/schema.ts +++ b/apps/desktop/src/main/data/app/schema.ts @@ -47,9 +47,10 @@ export type UpdateWorkspace = Updateable; interface WorkspaceCursorTable { user_id: ColumnType; - transactions: ColumnType; - collaborations: ColumnType; - revocations: ColumnType; + transactions: ColumnType; + collaborations: ColumnType; + interactions: ColumnType; + revocations: ColumnType; created_at: ColumnType; updated_at: ColumnType; } diff --git a/apps/desktop/src/main/data/workspace/migrations.ts b/apps/desktop/src/main/data/workspace/migrations.ts index 41db741c..1eb84a6e 100644 --- a/apps/desktop/src/main/data/workspace/migrations.ts +++ b/apps/desktop/src/main/data/workspace/migrations.ts @@ -114,6 +114,49 @@ const createUploadsTable: Migration = { }, }; +const createInteractionsTable: Migration = { + up: async (db) => { + await db.schema + .createTable('interactions') + .addColumn('user_id', 'text', (col) => col.notNull()) + .addColumn('node_id', 'text', (col) => col.notNull()) + .addColumn('node_type', 'text', (col) => col.notNull()) + .addColumn('attributes', 'text') + .addColumn('created_at', 'text', (col) => col.notNull()) + .addColumn('updated_at', 'text') + .addColumn('server_created_at', 'text') + .addColumn('server_updated_at', 'text') + .addColumn('version', 'integer', (col) => col.notNull()) + .addPrimaryKeyConstraint('interactions_pkey', ['user_id', 'node_id']) + .execute(); + }, + down: async (db) => { + await db.schema.dropTable('interactions').execute(); + }, +}; + +const createInteractionEventsTable: Migration = { + up: async (db) => { + await db.schema + .createTable('interaction_events') + .addColumn('node_id', 'text', (col) => col.notNull()) + .addColumn('node_type', 'text', (col) => col.notNull()) + .addColumn('attribute', 'text', (col) => col.notNull()) + .addColumn('value', 'text', (col) => col.notNull()) + .addColumn('created_at', 'text', (col) => col.notNull()) + .addColumn('sent_at', 'text') + .addColumn('event_id', 'text', (col) => col.notNull()) + .addPrimaryKeyConstraint('interaction_events_pkey', [ + 'node_id', + 'attribute', + ]) + .execute(); + }, + down: async (db) => { + await db.schema.dropTable('interaction_events').execute(); + }, +}; + const createNodePathsTable: Migration = { up: async (db) => { await db.schema @@ -258,9 +301,11 @@ export const workspaceDatabaseMigrations: Record = { '00003_create_collaborations_table': createCollaborationsTable, '00004_create_uploads_table': createUploadsTable, '00005_create_downloads_table': createDownloadsTable, - '00006_create_node_paths_table': createNodePathsTable, - '00007_create_node_names_table': createNodeNamesTable, - '00008_create_node_insert_name_trigger': createNodeInsertNameTrigger, - '00009_create_node_update_name_trigger': createNodeUpdateNameTrigger, - '00010_create_node_delete_name_trigger': createNodeDeleteNameTrigger, + '00006_create_interactions_table': createInteractionsTable, + '00007_create_interaction_events_table': createInteractionEventsTable, + '00008_create_node_paths_table': createNodePathsTable, + '00009_create_node_names_table': createNodeNamesTable, + '00010_create_node_insert_name_trigger': createNodeInsertNameTrigger, + '00011_create_node_update_name_trigger': createNodeUpdateNameTrigger, + '00012_create_node_delete_name_trigger': createNodeDeleteNameTrigger, }; diff --git a/apps/desktop/src/main/data/workspace/schema.ts b/apps/desktop/src/main/data/workspace/schema.ts index f23667ee..50fc9c00 100644 --- a/apps/desktop/src/main/data/workspace/schema.ts +++ b/apps/desktop/src/main/data/workspace/schema.ts @@ -1,9 +1,14 @@ +import { + InteractionAttribute, + InteractionAttributes, + NodeType, +} from '@colanode/core'; import { ColumnType, Insertable, Selectable, Updateable } from 'kysely'; interface NodeTable { id: ColumnType; parent_id: ColumnType; - type: ColumnType; + type: ColumnType; attributes: ColumnType; created_at: ColumnType; updated_at: ColumnType; @@ -27,7 +32,7 @@ export type SelectNodePath = Selectable; interface NodeTransactionTable { id: ColumnType; node_id: ColumnType; - node_type: ColumnType; + node_type: ColumnType; operation: ColumnType; data: ColumnType; created_at: ColumnType; @@ -83,6 +88,36 @@ export type SelectDownload = Selectable; export type CreateDownload = Insertable; export type UpdateDownload = Updateable; +interface InteractionTable { + user_id: ColumnType; + node_id: ColumnType; + node_type: ColumnType; + attributes: ColumnType; + created_at: ColumnType; + updated_at: ColumnType; + server_created_at: ColumnType; + server_updated_at: ColumnType; + version: ColumnType; +} + +export type SelectInteraction = Selectable; +export type CreateInteraction = Insertable; +export type UpdateInteraction = Updateable; + +interface InteractionEventTable { + node_id: ColumnType; + node_type: ColumnType; + attribute: ColumnType; + value: ColumnType; + created_at: ColumnType; + sent_at: ColumnType; + event_id: ColumnType; +} + +export type SelectInteractionEvent = Selectable; +export type CreateInteractionEvent = Insertable; +export type UpdateInteractionEvent = Updateable; + export interface WorkspaceDatabaseSchema { nodes: NodeTable; node_transactions: NodeTransactionTable; @@ -90,4 +125,6 @@ export interface WorkspaceDatabaseSchema { collaborations: CollaborationTable; uploads: UploadTable; downloads: DownloadTable; + interactions: InteractionTable; + interaction_events: InteractionEventTable; } diff --git a/apps/desktop/src/main/mutations/field-create.ts b/apps/desktop/src/main/mutations/field-create.ts index e2c8748b..ba02526c 100644 --- a/apps/desktop/src/main/mutations/field-create.ts +++ b/apps/desktop/src/main/mutations/field-create.ts @@ -1,5 +1,9 @@ -import { generateId, IdType, generateNodeIndex } from '@colanode/core'; -import { compareString } from '@/shared/lib/utils'; +import { + generateId, + IdType, + generateNodeIndex, + compareString, +} from '@colanode/core'; import { MutationHandler } from '@/main/types'; import { FieldCreateMutationInput, diff --git a/apps/desktop/src/main/mutations/select-option-create.ts b/apps/desktop/src/main/mutations/select-option-create.ts index 9bc4cfc5..42860060 100644 --- a/apps/desktop/src/main/mutations/select-option-create.ts +++ b/apps/desktop/src/main/mutations/select-option-create.ts @@ -1,10 +1,14 @@ -import { generateId, IdType, generateNodeIndex } from '@colanode/core'; +import { + generateId, + IdType, + generateNodeIndex, + compareString, +} from '@colanode/core'; import { MutationHandler } from '@/main/types'; import { SelectOptionCreateMutationInput, SelectOptionCreateMutationOutput, } from '@/shared/mutations/select-option-create'; -import { compareString } from '@/shared/lib/utils'; import { nodeService } from '@/main/services/node-service'; export class SelectOptionCreateMutationHandler diff --git a/apps/desktop/src/main/mutations/view-create.ts b/apps/desktop/src/main/mutations/view-create.ts index 61250af8..e845094a 100644 --- a/apps/desktop/src/main/mutations/view-create.ts +++ b/apps/desktop/src/main/mutations/view-create.ts @@ -1,10 +1,14 @@ -import { generateId, IdType, generateNodeIndex } from '@colanode/core'; +import { + generateId, + IdType, + generateNodeIndex, + compareString, +} from '@colanode/core'; import { MutationHandler } from '@/main/types'; import { ViewCreateMutationInput, ViewCreateMutationOutput, } from '@/shared/mutations/view-create'; -import { compareString } from '@/shared/lib/utils'; import { nodeService } from '@/main/services/node-service'; export class ViewCreateMutationHandler diff --git a/apps/desktop/src/main/queries/file-list.ts b/apps/desktop/src/main/queries/file-list.ts index 10f65116..2a742430 100644 --- a/apps/desktop/src/main/queries/file-list.ts +++ b/apps/desktop/src/main/queries/file-list.ts @@ -1,8 +1,7 @@ import { FileListQueryInput } from '@/shared/queries/file-list'; import { databaseService } from '@/main/data/database-service'; import { ChangeCheckResult, QueryHandler } from '@/main/types'; -import { NodeTypes, FileNode } from '@colanode/core'; -import { compareString } from '@/shared/lib/utils'; +import { FileNode, compareString } from '@colanode/core'; import { Event } from '@/shared/types/events'; import { SelectNode } from '@/main/data/workspace/schema'; import { mapNode } from '@/main/utils'; @@ -95,10 +94,7 @@ export class FileListQueryHandler implements QueryHandler { .selectFrom('nodes') .selectAll() .where((eb) => - eb.and([ - eb('parent_id', '=', input.parentId), - eb('type', '=', NodeTypes.File), - ]) + eb.and([eb('parent_id', '=', input.parentId), eb('type', '=', 'file')]) ) .orderBy('id', 'asc') .limit(input.count) diff --git a/apps/desktop/src/main/queries/message-list.ts b/apps/desktop/src/main/queries/message-list.ts index a950e745..735c7a4d 100644 --- a/apps/desktop/src/main/queries/message-list.ts +++ b/apps/desktop/src/main/queries/message-list.ts @@ -2,9 +2,8 @@ import { MessageListQueryInput } from '@/shared/queries/message-list'; import { databaseService } from '@/main/data/database-service'; import { ChangeCheckResult, QueryHandler } from '@/main/types'; import { SelectNode } from '@/main/data/workspace/schema'; -import { MessageNode, NodeTypes } from '@colanode/core'; +import { MessageNode, compareString } from '@colanode/core'; import { mapNode } from '@/main/utils'; -import { compareString } from '@/shared/lib/utils'; import { Event } from '@/shared/types/events'; export class MessageListQueryHandler @@ -104,7 +103,7 @@ export class MessageListQueryHandler .where((eb) => eb.and([ eb('parent_id', '=', input.conversationId), - eb('type', '=', NodeTypes.Message), + eb('type', '=', 'message'), ]) ) .orderBy('id', 'desc') diff --git a/apps/desktop/src/main/queries/record-list.ts b/apps/desktop/src/main/queries/record-list.ts index 0ee611d2..7216acd8 100644 --- a/apps/desktop/src/main/queries/record-list.ts +++ b/apps/desktop/src/main/queries/record-list.ts @@ -20,8 +20,8 @@ import { ViewSortAttributes, DatabaseNode, RecordNode, + isStringArray, } from '@colanode/core'; -import { isStringArray } from '@/shared/lib/utils'; import { mapNode } from '@/main/utils'; import { NodeTypes } from '@colanode/core'; import { Event } from '@/shared/types/events'; diff --git a/apps/desktop/src/main/queries/user-search.ts b/apps/desktop/src/main/queries/user-search.ts index 7e429fb6..1419e39c 100644 --- a/apps/desktop/src/main/queries/user-search.ts +++ b/apps/desktop/src/main/queries/user-search.ts @@ -113,7 +113,7 @@ export class UserSearchQueryHandler const exclude = input.exclude ?? []; return workspaceDatabase .selectFrom('nodes') - .where('type', '=', NodeTypes.User) + .where('type', '=', 'user') .where('id', '!=', input.userId) .where('id', 'not in', exclude) .selectAll() diff --git a/apps/desktop/src/main/queries/workspace-user-list.ts b/apps/desktop/src/main/queries/workspace-user-list.ts index 9c470a92..477e9d68 100644 --- a/apps/desktop/src/main/queries/workspace-user-list.ts +++ b/apps/desktop/src/main/queries/workspace-user-list.ts @@ -93,7 +93,7 @@ export class WorkspaceUserListQueryHandler const rows = await workspaceDatabase .selectFrom('nodes') .selectAll() - .where('type', '=', NodeTypes.User) + .where('type', '=', 'user') .orderBy('created_at asc') .offset(offset) .limit(input.count) diff --git a/apps/desktop/src/main/services/interaction-service.ts b/apps/desktop/src/main/services/interaction-service.ts new file mode 100644 index 00000000..e6cc213a --- /dev/null +++ b/apps/desktop/src/main/services/interaction-service.ts @@ -0,0 +1,354 @@ +import { + generateId, + IdType, + InteractionAttribute, + InteractionAttributes, + mergeInteractionAttributes, + NodeType, + ServerInteraction, +} from '@colanode/core'; +import { databaseService } from '@/main/data/database-service'; +import { SelectInteractionEvent } from '@/main/data/workspace/schema'; +import { eventBus } from '@/shared/lib/event-bus'; + +const UPDATE_RETRIES_COUNT = 10; + +type ServerAttributesMergeResult = { + attributes: InteractionAttributes; + toDeleteEventIds: string[]; +}; + +class InteractionService { + public async setInteraction( + userId: string, + nodeId: string, + nodeType: NodeType, + attribute: InteractionAttribute, + value: string + ) { + for (let i = 0; i < UPDATE_RETRIES_COUNT; i++) { + const updated = await this.tryUpdateInteraction( + userId, + nodeId, + nodeType, + attribute, + value + ); + + if (updated) { + return true; + } + } + + return false; + } + + private async tryUpdateInteraction( + userId: string, + nodeId: string, + nodeType: NodeType, + attribute: InteractionAttribute, + value: string + ): Promise { + const workspaceDatabase = + await databaseService.getWorkspaceDatabase(userId); + + const interaction = await workspaceDatabase + .selectFrom('interactions') + .selectAll() + .where('node_id', '=', nodeId) + .executeTakeFirst(); + + const attributes = mergeInteractionAttributes( + interaction?.attributes, + attribute, + value + ); + + if (!attributes) { + return true; + } + + if (interaction) { + const result = await workspaceDatabase + .transaction() + .execute(async (tx) => { + const updatedInteraction = await tx + .updateTable('interactions') + .returningAll() + .set({ + attributes: JSON.stringify(attributes), + updated_at: new Date().toISOString(), + }) + .where('node_id', '=', nodeId) + .where('user_id', '=', userId) + .where('version', '=', interaction.version) + .executeTakeFirst(); + + if (!updatedInteraction) { + return false; + } + + await tx + .insertInto('interaction_events') + .values({ + node_id: nodeId, + node_type: nodeType, + attribute, + value, + created_at: new Date().toISOString(), + event_id: generateId(IdType.Event), + }) + .onConflict((b) => + b.columns(['node_id', 'attribute']).doUpdateSet({ + value, + sent_at: null, + event_id: generateId(IdType.Event), + }) + ) + .execute(); + + return true; + }); + + if (result) { + eventBus.publish({ + type: 'interaction_event_created', + userId, + nodeId, + }); + } + + return result; + } + + const result = await workspaceDatabase.transaction().execute(async (tx) => { + const createdInteraction = await tx + .insertInto('interactions') + .returningAll() + .values({ + node_id: nodeId, + node_type: nodeType, + user_id: userId, + attributes: JSON.stringify(attributes), + created_at: new Date().toISOString(), + version: BigInt(0), + }) + .onConflict((b) => b.columns(['node_id', 'user_id']).doNothing()) + .executeTakeFirst(); + + if (!createdInteraction) { + return false; + } + + await tx + .insertInto('interaction_events') + .values({ + node_id: nodeId, + node_type: nodeType, + attribute, + value, + created_at: new Date().toISOString(), + event_id: generateId(IdType.Event), + }) + .onConflict((b) => + b.columns(['node_id', 'attribute']).doUpdateSet({ + value, + sent_at: null, + event_id: generateId(IdType.Event), + }) + ) + .execute(); + + return true; + }); + + if (result) { + eventBus.publish({ + type: 'interaction_event_created', + userId, + nodeId, + }); + } + + return result; + } + + public async applyServerInteraction( + userId: string, + interaction: ServerInteraction + ) { + if (interaction.userId !== userId) { + await this.replaceInteraction(userId, interaction); + return true; + } + + for (let i = 0; i < UPDATE_RETRIES_COUNT; i++) { + const updated = await this.tryApplyServerInteraction(userId, interaction); + + if (updated) { + return true; + } + } + + return false; + } + + private async replaceInteraction( + userId: string, + interaction: ServerInteraction + ) { + const workspaceDatabase = + await databaseService.getWorkspaceDatabase(userId); + + await workspaceDatabase + .insertInto('interactions') + .values({ + user_id: interaction.userId, + node_id: interaction.nodeId, + node_type: interaction.nodeType, + attributes: JSON.stringify(interaction.attributes), + created_at: interaction.createdAt, + updated_at: interaction.updatedAt, + server_created_at: interaction.serverCreatedAt, + server_updated_at: interaction.serverUpdatedAt, + version: BigInt(interaction.version), + }) + .onConflict((b) => + b.columns(['node_id', 'user_id']).doUpdateSet({ + attributes: JSON.stringify(interaction.attributes), + updated_at: interaction.updatedAt, + server_updated_at: interaction.serverUpdatedAt, + version: BigInt(interaction.version), + }) + ) + .execute(); + } + + private async tryApplyServerInteraction( + userId: string, + interaction: ServerInteraction + ): Promise { + console.log('trying to apply server interaction', interaction); + const workspaceDatabase = + await databaseService.getWorkspaceDatabase(userId); + + const existingInteraction = await workspaceDatabase + .selectFrom('interactions') + .selectAll() + .where('node_id', '=', interaction.nodeId) + .executeTakeFirst(); + + const interactionEvents = await workspaceDatabase + .selectFrom('interaction_events') + .selectAll() + .where('node_id', '=', interaction.nodeId) + .execute(); + + const { attributes, toDeleteEventIds } = this.mergeServerAttributes( + interaction.attributes, + interactionEvents + ); + + if (existingInteraction) { + const result = await workspaceDatabase + .transaction() + .execute(async (tx) => { + const updatedInteraction = await tx + .updateTable('interactions') + .returningAll() + .set({ + attributes: JSON.stringify(attributes), + updated_at: interaction.updatedAt, + server_updated_at: interaction.serverUpdatedAt, + version: BigInt(interaction.version), + }) + .where('node_id', '=', interaction.nodeId) + .where('user_id', '=', userId) + .where('version', '=', existingInteraction.version) + .executeTakeFirst(); + + if (!updatedInteraction) { + return false; + } + + if (toDeleteEventIds.length > 0) { + await tx + .deleteFrom('interaction_events') + .where('node_id', '=', interaction.nodeId) + .where('event_id', 'in', toDeleteEventIds) + .execute(); + } + + return true; + }); + + return result; + } + + const result = await workspaceDatabase.transaction().execute(async (tx) => { + const createdInteraction = await tx + .insertInto('interactions') + .returningAll() + .values({ + user_id: interaction.userId, + node_id: interaction.nodeId, + node_type: interaction.nodeType, + attributes: JSON.stringify(attributes), + created_at: interaction.createdAt, + updated_at: interaction.updatedAt, + server_created_at: interaction.serverCreatedAt, + server_updated_at: interaction.serverUpdatedAt, + version: BigInt(interaction.version), + }) + .onConflict((b) => b.columns(['node_id', 'user_id']).doNothing()) + .executeTakeFirst(); + + if (!createdInteraction) { + return false; + } + + if (toDeleteEventIds.length > 0) { + await tx + .deleteFrom('interaction_events') + .where('node_id', '=', interaction.nodeId) + .where('event_id', 'in', toDeleteEventIds) + .execute(); + } + + return true; + }); + + return result; + } + + private mergeServerAttributes( + attributes: InteractionAttributes, + events: SelectInteractionEvent[] + ): ServerAttributesMergeResult { + if (events.length === 0) { + return { attributes, toDeleteEventIds: [] }; + } + + let result = { ...attributes }; + const toDeleteEventIds: string[] = []; + + for (const event of events) { + const merged = mergeInteractionAttributes( + result, + event.attribute, + event.value + ); + + if (merged) { + result = merged; + } else { + toDeleteEventIds.push(event.event_id); + } + } + + return { attributes: result, toDeleteEventIds }; + } +} + +export const interactionService = new InteractionService(); diff --git a/apps/desktop/src/main/services/node-service.ts b/apps/desktop/src/main/services/node-service.ts index 24af2390..d6fa8f64 100644 --- a/apps/desktop/src/main/services/node-service.ts +++ b/apps/desktop/src/main/services/node-service.ts @@ -29,6 +29,7 @@ import { import { eventBus } from '@/shared/lib/event-bus'; import { SelectWorkspace } from '@/main/data/app/schema'; import { sql } from 'kysely'; +import { interactionService } from '@/main/services/interaction-service'; export type CreateNodeInput = { id: string; @@ -191,6 +192,14 @@ class NodeService { userId, transaction: mapTransaction(createdTransaction), }); + + await interactionService.setInteraction( + userId, + createdTransaction.node_id, + createdTransaction.node_type, + 'lastReceivedTransactionId', + createdTransaction.id + ); } for (const createdUpload of createdUploads) { @@ -323,7 +332,10 @@ class NodeService { return { updatedNode, createdTransaction }; } - return { updatedNode: undefined, createdTransaction: undefined }; + return { + updatedNode: undefined, + createdTransaction: undefined, + }; }); if (updatedNode) { @@ -340,6 +352,14 @@ class NodeService { userId, transaction: mapTransaction(createdTransaction), }); + + await interactionService.setInteraction( + userId, + createdTransaction.node_id, + createdTransaction.node_type, + 'lastReceivedTransactionId', + createdTransaction.id + ); } return updatedNode !== undefined; @@ -569,10 +589,10 @@ class NodeService { const attributes = ydoc.getAttributes(); - const result = await workspaceDatabase + const { createdNode } = await workspaceDatabase .transaction() .execute(async (trx) => { - const nodeRow = await trx + const createdNode = await trx .insertInto('nodes') .returningAll() .values({ @@ -595,21 +615,29 @@ class NodeService { created_at: transaction.createdAt, created_by: transaction.createdBy, retry_count: 0, - status: nodeRow ? 'synced' : 'incomplete', + status: createdNode ? 'synced' : 'incomplete', version, server_created_at: transaction.serverCreatedAt, }) .execute(); - return nodeRow; + return { createdNode }; }); - if (result) { + if (createdNode) { eventBus.publish({ type: 'node_created', userId, - node: mapNode(result), + node: mapNode(createdNode), }); + + await interactionService.setInteraction( + userId, + createdNode.id, + createdNode.type, + 'lastReceivedTransactionId', + transaction.id + ); } else { eventBus.publish({ type: 'node_transaction_incomplete', @@ -673,10 +701,10 @@ class NodeService { ydoc.applyUpdate(transaction.data); const attributes = ydoc.getAttributes(); - const result = await workspaceDatabase + const { updatedNode } = await workspaceDatabase .transaction() .execute(async (trx) => { - const nodeRow = await trx + const updatedNode = await trx .updateTable('nodes') .returningAll() .set({ @@ -699,21 +727,29 @@ class NodeService { created_at: transaction.createdAt, created_by: transaction.createdBy, retry_count: 0, - status: nodeRow ? 'synced' : 'incomplete', + status: updatedNode ? 'synced' : 'incomplete', version, server_created_at: transaction.serverCreatedAt, }) .execute(); - return nodeRow; + return { updatedNode }; }); - if (result) { + if (updatedNode) { eventBus.publish({ type: 'node_updated', userId, - node: mapNode(result), + node: mapNode(updatedNode), }); + + await interactionService.setInteraction( + userId, + updatedNode.id, + updatedNode.type, + 'lastReceivedTransactionId', + transaction.id + ); } else { eventBus.publish({ type: 'node_transaction_incomplete', @@ -738,6 +774,16 @@ class NodeService { .where('node_id', '=', transaction.nodeId) .execute(); + await trx + .deleteFrom('interactions') + .where('node_id', '=', transaction.nodeId) + .execute(); + + await trx + .deleteFrom('interaction_events') + .where('node_id', '=', transaction.nodeId) + .execute(); + const nodeRow = await trx .deleteFrom('nodes') .returningAll() diff --git a/apps/desktop/src/main/services/socket-connection.ts b/apps/desktop/src/main/services/socket-connection.ts index 9bc41680..187fdeb4 100644 --- a/apps/desktop/src/main/services/socket-connection.ts +++ b/apps/desktop/src/main/services/socket-connection.ts @@ -53,6 +53,8 @@ export class SocketConnection { syncService.syncServerRevocations(message); } else if (message.type === 'collaborations_batch') { syncService.syncServerCollaborations(message); + } else if (message.type === 'interactions_batch') { + syncService.syncServerInteractions(message); } }; @@ -77,10 +79,13 @@ export class SocketConnection { return this.socket !== null && this.socket.readyState === WebSocket.OPEN; } - public sendMessage(message: Message): void { + public sendMessage(message: Message): boolean { if (this.socket && this.isConnected()) { this.socket.send(JSON.stringify(message)); + return true; } + + return false; } public close(): void { diff --git a/apps/desktop/src/main/services/socket-service.ts b/apps/desktop/src/main/services/socket-service.ts index 74a550e5..d0f7a375 100644 --- a/apps/desktop/src/main/services/socket-service.ts +++ b/apps/desktop/src/main/services/socket-service.ts @@ -1,7 +1,7 @@ import { SocketConnection } from '@/main/services/socket-connection'; import { databaseService } from '@/main/data/database-service'; import { Message } from '@colanode/core'; -import { serverService } from './server-service'; +import { serverService } from '@/main/services/server-service'; import { eventBus } from '@/shared/lib/event-bus'; class SocketService { @@ -21,13 +21,13 @@ class SocketService { }); } - public sendMessage(accountId: string, message: Message) { + public sendMessage(accountId: string, message: Message): boolean { const connection = this.sockets.get(accountId); if (!connection) { - return; + return false; } - connection.sendMessage(message); + return connection.sendMessage(message); } public async checkConnections() { diff --git a/apps/desktop/src/main/services/sync-service.ts b/apps/desktop/src/main/services/sync-service.ts index de76ef13..77dd3316 100644 --- a/apps/desktop/src/main/services/sync-service.ts +++ b/apps/desktop/src/main/services/sync-service.ts @@ -4,21 +4,28 @@ import { eventBus } from '@/shared/lib/event-bus'; import { httpClient } from '@/shared/lib/http-client'; import { serverService } from '@/main/services/server-service'; import { + InteractionsBatchMessage, CollaborationRevocationsBatchMessage, CollaborationsBatchMessage, + FetchInteractionsMessage, FetchCollaborationRevocationsMessage, FetchCollaborationsMessage, FetchNodeTransactionsMessage, GetNodeTransactionsOutput, LocalNodeTransaction, NodeTransactionsBatchMessage, + SyncInteractionsMessage, SyncNodeTransactionsOutput, } from '@colanode/core'; import { logService } from '@/main/services/log-service'; import { nodeService } from '@/main/services/node-service'; import { socketService } from '@/main/services/socket-service'; import { collaborationService } from '@/main/services/collaboration-service'; -import { SelectNodeTransaction } from '@/main/data/workspace/schema'; +import { interactionService } from '@/main/services/interaction-service'; +import { + SelectInteractionEvent, + SelectNodeTransaction, +} from '@/main/data/workspace/schema'; import { sql } from 'kysely'; type WorkspaceSyncState = { @@ -38,9 +45,15 @@ class SyncService { WorkspaceSyncState > = new Map(); + private readonly localPendingInteractionStates: Map< + string, + WorkspaceSyncState + > = new Map(); + private readonly syncingTransactions: Set = new Set(); private readonly syncingCollaborations: Set = new Set(); private readonly syncingRevocations: Set = new Set(); + private readonly syncingInteractions: Set = new Set(); constructor() { eventBus.subscribe((event) => { @@ -54,6 +67,8 @@ class SyncService { this.syncAllWorkspaces(); } else if (event.type === 'collaboration_synced') { this.checkForMissingNode(event.userId, event.nodeId); + } else if (event.type === 'interaction_event_created') { + this.syncLocalPendingInteractions(event.userId); } }); } @@ -67,9 +82,13 @@ class SyncService { for (const workspace of workspaces) { this.syncLocalPendingTransactions(workspace.user_id); this.syncLocalIncompleteTransactions(workspace.user_id); + this.syncLocalPendingInteractions(workspace.user_id); + this.requireNodeTransactions(workspace.user_id); this.requireCollaborations(workspace.user_id); this.requireCollaborationRevocations(workspace.user_id); + this.requireInteractions(workspace.user_id); + this.syncMissingNodes(workspace.user_id); } } @@ -106,6 +125,38 @@ class SyncService { } } + public async syncLocalPendingInteractions(userId: string) { + if (!this.localPendingInteractionStates.has(userId)) { + this.localPendingInteractionStates.set(userId, { + isSyncing: false, + scheduledSync: false, + }); + } + + const syncState = this.localPendingInteractionStates.get(userId)!; + if (syncState.isSyncing) { + syncState.scheduledSync = true; + return; + } + + syncState.isSyncing = true; + try { + await this.sendLocalInteractions(userId); + } catch (error) { + this.logger.error( + error, + `Error syncing local interactions for user ${userId}` + ); + } finally { + syncState.isSyncing = false; + + if (syncState.scheduledSync) { + syncState.scheduledSync = false; + this.syncLocalPendingInteractions(userId); + } + } + } + public async syncLocalIncompleteTransactions(userId: string) { if (!this.localIncompleteTransactionStates.has(userId)) { this.localIncompleteTransactionStates.set(userId, { @@ -227,6 +278,38 @@ class SyncService { } } + public async syncServerInteractions(message: InteractionsBatchMessage) { + if (this.syncingInteractions.has(message.userId)) { + return; + } + + this.syncingInteractions.add(message.userId); + let cursor: bigint | null = null; + try { + for (const interaction of message.interactions) { + console.log('applying server interaction', interaction); + await interactionService.applyServerInteraction( + message.userId, + interaction + ); + cursor = BigInt(interaction.version); + } + + if (cursor) { + this.updateInteractionCursor(message.userId, cursor); + } + } catch (error) { + console.log('error syncing server interactions', error); + this.logger.error( + error, + `Error syncing server interactions for user ${message.userId}` + ); + } finally { + this.syncingInteractions.delete(message.userId); + this.requireInteractions(message.userId); + } + } + private async syncIncompleteTransactions(userId: string) { const workspaceDatabase = await databaseService.getWorkspaceDatabase(userId); @@ -532,6 +615,85 @@ class SyncService { } } + private async sendLocalInteractions(userId: string) { + const workspaceDatabase = + await databaseService.getWorkspaceDatabase(userId); + + const workspace = await databaseService.appDatabase + .selectFrom('workspaces') + .select(['user_id', 'workspace_id', 'account_id']) + .where('user_id', '=', userId) + .executeTakeFirst(); + + if (!workspace) { + return; + } + + const cutoff = new Date(Date.now() - 1000 * 60 * 5).toISOString(); + let hasMore = true; + + while (hasMore) { + const interactionEvents = await workspaceDatabase + .selectFrom('interaction_events') + .selectAll() + .where((eb) => + eb.or([eb('sent_at', 'is', null), eb('sent_at', '<', cutoff)]) + ) + .limit(50) + .execute(); + + if (interactionEvents.length === 0) { + hasMore = false; + break; + } + + const groupedByNodeId: Record = {}; + for (const event of interactionEvents) { + groupedByNodeId[event.node_id] = [ + ...(groupedByNodeId[event.node_id] ?? []), + event, + ]; + } + + const sentEventIds: string[] = []; + for (const [nodeId, events] of Object.entries(groupedByNodeId)) { + if (events.length === 0) { + continue; + } + + const firstEvent = events[0]; + if (!firstEvent) { + continue; + } + + const message: SyncInteractionsMessage = { + type: 'sync_interactions', + nodeId, + nodeType: firstEvent.node_type, + userId: workspace.user_id, + events: events.map((e) => ({ + attribute: e.attribute, + value: e.value, + createdAt: e.created_at, + })), + }; + + const sent = socketService.sendMessage(workspace.account_id, message); + if (sent) { + sentEventIds.push(...events.map((e) => e.event_id)); + } + } + + if (sentEventIds.length > 0) { + await workspaceDatabase + .updateTable('interaction_events') + .set({ sent_at: new Date().toISOString() }) + .where('event_id', 'in', sentEventIds) + .execute(); + } + } + } + private async requireNodeTransactions(userId: string) { const workspaceWithCursor = await databaseService.appDatabase .selectFrom('workspaces as w') @@ -608,14 +770,39 @@ class SyncService { socketService.sendMessage(workspaceWithCursor.account_id, message); } + private async requireInteractions(userId: string) { + const workspaceWithCursor = await databaseService.appDatabase + .selectFrom('workspaces as w') + .leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id') + .select([ + 'w.user_id', + 'w.workspace_id', + 'w.account_id', + 'wc.interactions', + ]) + .where('w.user_id', '=', userId) + .executeTakeFirst(); + + if (!workspaceWithCursor) { + return; + } + + const message: FetchInteractionsMessage = { + type: 'fetch_interactions', + userId: workspaceWithCursor.user_id, + workspaceId: workspaceWithCursor.workspace_id, + cursor: workspaceWithCursor.interactions?.toString() ?? '0', + }; + + socketService.sendMessage(workspaceWithCursor.account_id, message); + } + private async updateNodeTransactionCursor(userId: string, cursor: bigint) { await databaseService.appDatabase .insertInto('workspace_cursors') .values({ user_id: userId, transactions: cursor, - collaborations: 0n, - revocations: 0n, created_at: new Date().toISOString(), }) .onConflict((eb) => @@ -633,8 +820,6 @@ class SyncService { .values({ user_id: userId, collaborations: cursor, - revocations: 0n, - transactions: 0n, created_at: new Date().toISOString(), }) .onConflict((eb) => @@ -655,8 +840,6 @@ class SyncService { .values({ user_id: userId, revocations: cursor, - transactions: 0n, - collaborations: 0n, created_at: new Date().toISOString(), }) .onConflict((eb) => @@ -667,6 +850,23 @@ class SyncService { ) .execute(); } + + private async updateInteractionCursor(userId: string, cursor: bigint) { + await databaseService.appDatabase + .insertInto('workspace_cursors') + .values({ + user_id: userId, + interactions: cursor, + created_at: new Date().toISOString(), + }) + .onConflict((eb) => + eb.column('user_id').doUpdateSet({ + interactions: cursor, + updated_at: new Date().toISOString(), + }) + ) + .execute(); + } } export const syncService = new SyncService(); diff --git a/apps/desktop/src/main/utils.ts b/apps/desktop/src/main/utils.ts index b1215a97..9d593c34 100644 --- a/apps/desktop/src/main/utils.ts +++ b/apps/desktop/src/main/utils.ts @@ -8,6 +8,7 @@ import { } from 'kysely'; import path from 'path'; import { + SelectInteraction, SelectDownload, SelectNode, SelectNodeTransaction, @@ -25,6 +26,7 @@ import { Workspace } from '@/shared/types/workspaces'; import { Server } from '@/shared/types/servers'; import { Download, Upload } from '@/shared/types/nodes'; import { encodeState } from '@colanode/crdt'; +import { Interaction } from '@/shared/types/interactions'; export const appPath = app.getPath('userData'); @@ -93,7 +95,7 @@ export const fetchNodeAncestors = ( .select('ancestor_id') .where('descendant_id', '=', nodeId) ) - .where('type', '!=', NodeTypes.Workspace) + .where('type', '!=', 'workspace') .execute(); }; @@ -210,3 +212,20 @@ export const mapDownload = (row: SelectDownload): Download => { retryCount: row.retry_count, }; }; + +export const mapInteraction = (row: SelectInteraction): Interaction => { + return { + nodeId: row.node_id, + userId: row.user_id, + attributes: row.attributes, + createdAt: new Date(row.created_at), + updatedAt: row.updated_at ? new Date(row.updated_at) : null, + serverCreatedAt: row.server_created_at + ? new Date(row.server_created_at) + : null, + serverUpdatedAt: row.server_updated_at + ? new Date(row.server_updated_at) + : null, + version: row.version ? BigInt(row.version) : null, + }; +}; diff --git a/apps/desktop/src/renderer/components/collaborators/node-collaborator-audit.tsx b/apps/desktop/src/renderer/components/collaborators/node-collaborator-audit.tsx index 56001cd8..1a50309b 100644 --- a/apps/desktop/src/renderer/components/collaborators/node-collaborator-audit.tsx +++ b/apps/desktop/src/renderer/components/collaborators/node-collaborator-audit.tsx @@ -1,7 +1,7 @@ import { useWorkspace } from '@/renderer/contexts/workspace'; import { useQuery } from '@/renderer/hooks/use-query'; import { Avatar } from '@/renderer/components/avatars/avatar'; -import { timeAgo } from '@/shared/lib/utils'; +import { timeAgo } from '@colanode/core'; interface NodeCollaboratorAuditProps { collaboratorId: string; diff --git a/apps/desktop/src/renderer/components/databases/calendars/calendar-view-day.tsx b/apps/desktop/src/renderer/components/databases/calendars/calendar-view-day.tsx index 25962208..bc939ca9 100644 --- a/apps/desktop/src/renderer/components/databases/calendars/calendar-view-day.tsx +++ b/apps/desktop/src/renderer/components/databases/calendars/calendar-view-day.tsx @@ -1,5 +1,5 @@ -import { cn, isSameDay } from '@/shared/lib/utils'; -import { extractNodeRole, RecordNode } from '@colanode/core'; +import { cn } from '@/shared/lib/utils'; +import { extractNodeRole, RecordNode, isSameDay } from '@colanode/core'; import { CalendarViewRecordCard } from '@/renderer/components/databases/calendars/calendar-view-record-card'; import { Plus } from 'lucide-react'; import { useWorkspace } from '@/renderer/contexts/workspace'; diff --git a/apps/desktop/src/renderer/components/databases/calendars/calendar-view-grid.tsx b/apps/desktop/src/renderer/components/databases/calendars/calendar-view-grid.tsx index 3a90482c..6d9e75c0 100644 --- a/apps/desktop/src/renderer/components/databases/calendars/calendar-view-grid.tsx +++ b/apps/desktop/src/renderer/components/databases/calendars/calendar-view-grid.tsx @@ -1,14 +1,14 @@ import React from 'react'; import { buttonVariants } from '@/renderer/components/ui/button'; -import { - cn, - getDisplayedDates, - isSameDay, - toUTCDate, -} from '@/shared/lib/utils'; +import { cn, getDisplayedDates } from '@/shared/lib/utils'; import { DayPicker, DayProps } from 'react-day-picker'; import { CalendarViewDay } from '@/renderer/components/databases/calendars/calendar-view-day'; -import { FieldAttributes, ViewFilterAttributes } from '@colanode/core'; +import { + FieldAttributes, + ViewFilterAttributes, + isSameDay, + toUTCDate, +} from '@colanode/core'; import { useRecordsQuery } from '@/renderer/hooks/user-records-query'; import { filterRecords } from '@/shared/lib/databases'; import { useWorkspace } from '@/renderer/contexts/workspace'; diff --git a/apps/desktop/src/renderer/components/databases/view.tsx b/apps/desktop/src/renderer/components/databases/view.tsx index 9e0bb32e..8c9dd3e8 100644 --- a/apps/desktop/src/renderer/components/databases/view.tsx +++ b/apps/desktop/src/renderer/components/databases/view.tsx @@ -6,6 +6,7 @@ import { ViewFieldFilterAttributes, ViewFilterAttributes, ViewSortAttributes, + compareString, } from '@colanode/core'; import { TableView } from '@/renderer/components/databases/tables/table-view'; import { BoardView } from '@/renderer/components/databases/boards/board-view'; @@ -13,7 +14,6 @@ import { CalendarView } from '@/renderer/components/databases/calendars/calendar import { ViewContext } from '@/renderer/contexts/view'; import { useDatabase } from '@/renderer/contexts/database'; import { useWorkspace } from '@/renderer/contexts/workspace'; -import { compareString } from '@/shared/lib/utils'; import { generateFieldValuesFromFilters, generateViewFieldIndex, diff --git a/apps/desktop/src/renderer/components/files/file-sidebar.tsx b/apps/desktop/src/renderer/components/files/file-sidebar.tsx index 6d65f8e1..1d3d4222 100644 --- a/apps/desktop/src/renderer/components/files/file-sidebar.tsx +++ b/apps/desktop/src/renderer/components/files/file-sidebar.tsx @@ -1,9 +1,8 @@ import React from 'react'; import { formatBytes } from '@/shared/lib/files'; -import { formatDate } from '@/shared/lib/utils'; import { FileThumbnail } from '@/renderer/components/files/file-thumbnail'; import { Avatar } from '@/renderer/components/avatars/avatar'; -import { FileNode } from '@colanode/core'; +import { FileNode, formatDate } from '@colanode/core'; import { useWorkspace } from '@/renderer/contexts/workspace'; import { useQuery } from '@/renderer/hooks/use-query'; diff --git a/apps/desktop/src/renderer/components/messages/message-list.tsx b/apps/desktop/src/renderer/components/messages/message-list.tsx index 7982f0a9..f2ed419d 100644 --- a/apps/desktop/src/renderer/components/messages/message-list.tsx +++ b/apps/desktop/src/renderer/components/messages/message-list.tsx @@ -2,7 +2,7 @@ import React from 'react'; import { InView } from 'react-intersection-observer'; import { Message } from '@/renderer/components/messages/message'; import { useWorkspace } from '@/renderer/contexts/workspace'; -import { compareString } from '@/shared/lib/utils'; +import { compareString } from '@colanode/core'; import { useQueries } from '@/renderer/hooks/use-queries'; import { MessageListQueryInput } from '@/shared/queries/message-list'; import { useConversation } from '@/renderer/contexts/conversation'; diff --git a/apps/desktop/src/renderer/components/messages/message-time.tsx b/apps/desktop/src/renderer/components/messages/message-time.tsx index 0bd4b455..8cfdfc89 100644 --- a/apps/desktop/src/renderer/components/messages/message-time.tsx +++ b/apps/desktop/src/renderer/components/messages/message-time.tsx @@ -1,10 +1,9 @@ -import { MessageNode } from '@colanode/core'; +import { MessageNode, formatDate, timeAgo } from '@colanode/core'; import { Tooltip, TooltipContent, TooltipTrigger, } from '@/renderer/components/ui/tooltip'; -import { formatDate, timeAgo } from '@/shared/lib/utils'; interface MessageTimeProps { message: MessageNode; diff --git a/apps/desktop/src/renderer/components/records/values/record-url-value.tsx b/apps/desktop/src/renderer/components/records/values/record-url-value.tsx index 61a8acee..08c4de62 100644 --- a/apps/desktop/src/renderer/components/records/values/record-url-value.tsx +++ b/apps/desktop/src/renderer/components/records/values/record-url-value.tsx @@ -1,5 +1,4 @@ -import { UrlFieldAttributes } from '@colanode/core'; -import { cn, isValidUrl } from '@/shared/lib/utils'; +import { UrlFieldAttributes, isValidUrl } from '@colanode/core'; import { HoverCard, HoverCardContent, @@ -8,6 +7,7 @@ import { import { SmartTextInput } from '@/renderer/components/ui/smart-text-input'; import { ExternalLink } from 'lucide-react'; import { useRecord } from '@/renderer/contexts/record'; +import { cn } from '@/shared/lib/utils'; interface RecordUrlValueProps { field: UrlFieldAttributes; diff --git a/apps/desktop/src/renderer/components/spaces/space-sidebar-item.tsx b/apps/desktop/src/renderer/components/spaces/space-sidebar-item.tsx index b3836bcb..101b0b92 100644 --- a/apps/desktop/src/renderer/components/spaces/space-sidebar-item.tsx +++ b/apps/desktop/src/renderer/components/spaces/space-sidebar-item.tsx @@ -1,5 +1,5 @@ import React from 'react'; -import { SpaceNode } from '@colanode/core'; +import { SpaceNode, compareString } from '@colanode/core'; import { Avatar } from '@/renderer/components/avatars/avatar'; import { DropdownMenu, @@ -40,7 +40,6 @@ import { ChevronRight, } from 'lucide-react'; import { useQuery } from '@/renderer/hooks/use-query'; -import { compareString } from '@/shared/lib/utils'; interface SettingsState { open: boolean; diff --git a/apps/desktop/src/renderer/components/ui/date-picker.tsx b/apps/desktop/src/renderer/components/ui/date-picker.tsx index 1304beed..58c9c7bf 100644 --- a/apps/desktop/src/renderer/components/ui/date-picker.tsx +++ b/apps/desktop/src/renderer/components/ui/date-picker.tsx @@ -5,7 +5,8 @@ import { PopoverContent, PopoverTrigger, } from '@/renderer/components/ui/popover'; -import { cn, toUTCDate } from '@/shared/lib/utils'; +import { cn } from '@/shared/lib/utils'; +import { toUTCDate } from '@colanode/core'; interface DatePickerProps { value: Date | null; diff --git a/apps/desktop/src/renderer/components/workspaces/workspace-user-invite.tsx b/apps/desktop/src/renderer/components/workspaces/workspace-user-invite.tsx index 84b7df0f..42406c68 100644 --- a/apps/desktop/src/renderer/components/workspaces/workspace-user-invite.tsx +++ b/apps/desktop/src/renderer/components/workspaces/workspace-user-invite.tsx @@ -1,5 +1,5 @@ import React from 'react'; -import { isValidEmail } from '@/shared/lib/utils'; +import { isValidEmail } from '@colanode/core'; import { Button } from '@/renderer/components/ui/button'; import { useMutation } from '@/renderer/hooks/use-mutation'; import { Spinner } from '@/renderer/components/ui/spinner'; diff --git a/apps/desktop/src/shared/lib/avatars.ts b/apps/desktop/src/shared/lib/avatars.ts index d29f9098..58861524 100644 --- a/apps/desktop/src/shared/lib/avatars.ts +++ b/apps/desktop/src/shared/lib/avatars.ts @@ -1,5 +1,4 @@ -import { hashCode } from '@/shared/lib/utils'; -import { IdType } from '@colanode/core'; +import { IdType, hashCode } from '@colanode/core'; export const getAvatarSizeClasses = (size?: string) => { if (size === 'small') { diff --git a/apps/desktop/src/shared/lib/databases.ts b/apps/desktop/src/shared/lib/databases.ts index 84b5fde7..8a5c25c6 100644 --- a/apps/desktop/src/shared/lib/databases.ts +++ b/apps/desktop/src/shared/lib/databases.ts @@ -10,8 +10,9 @@ import { SelectFieldAttributes, ViewType, generateNodeIndex, + compareString, + isStringArray, } from '@colanode/core'; -import { compareString, isStringArray } from '@/shared/lib/utils'; export const getDefaultFieldWidth = (type: FieldType): number => { if (!type) return 0; diff --git a/apps/desktop/src/shared/lib/editor.ts b/apps/desktop/src/shared/lib/editor.ts index cb51ff5f..2a7e40d7 100644 --- a/apps/desktop/src/shared/lib/editor.ts +++ b/apps/desktop/src/shared/lib/editor.ts @@ -3,8 +3,8 @@ import { generateId, getIdTypeFromNode, generateNodeIndex, + compareString, } from '@colanode/core'; -import { compareString } from '@/shared/lib/utils'; import { JSONContent } from '@tiptap/core'; import { Block, BlockLeaf } from '@colanode/core'; diff --git a/apps/desktop/src/shared/lib/utils.ts b/apps/desktop/src/shared/lib/utils.ts index 78cb2b0f..50208ee6 100644 --- a/apps/desktop/src/shared/lib/utils.ts +++ b/apps/desktop/src/shared/lib/utils.ts @@ -5,16 +5,6 @@ export const cn = (...inputs: ClassValue[]) => { return twMerge(clsx(inputs)); }; -export const hashCode = (str: string) => { - let hash = 0; - for (let i = 0; i < str.length; i++) { - const character = str.charCodeAt(i); - hash = (hash << 5) - hash + character; - hash |= 0; // Convert to 32bit integer - } - return hash; -}; - export const updateScrollView = (container: HTMLElement, item: HTMLElement) => { const containerHeight = container.offsetHeight; const itemHeight = item ? item.offsetHeight : 0; @@ -29,137 +19,6 @@ export const updateScrollView = (container: HTMLElement, item: HTMLElement) => { } }; -export const timeAgo = (dateParam: Date | string) => { - if (dateParam == null) { - return 'N/A'; - } - - let date = dateParam; - if (typeof date === 'string') { - date = new Date(date); - } - - const diff = Number(new Date()) - date.getTime(); - const minute = 60 * 1000; - const hour = minute * 60; - const day = hour * 24; - const month = day * 30; - const year = day * 365; - switch (true) { - case diff < minute: { - const seconds = Math.round(diff / 1000); - return seconds < 5 ? 'Now' : `${seconds} seconds ago`; - } - case diff < hour: { - const minutes = Math.round(diff / minute); - return minutes === 1 ? '1 minute ago' : `${minutes} minutes ago`; - } - case diff < day: { - const hours = Math.round(diff / hour); - return hours === 1 ? '1 hour ago' : `${hours} hours ago`; - } - case diff < month: { - const days = Math.round(diff / day); - return days === 1 ? '1 day ago' : `${days} days ago`; - } - case diff < year: { - const months = Math.round(diff / month); - return months === 1 ? '1 month ago' : `${months} months ago`; - } - case diff > year: { - const years = Math.round(diff / year); - return years === 1 ? '1 year ago' : `${years} years ago`; - } - default: - return ''; - } -}; - -export const formatDate = (dateParam: Date | string | undefined): string => { - if (dateParam == null) { - return 'N/A'; - } - - const date = typeof dateParam === 'string' ? new Date(dateParam) : dateParam; - - const monthNames = [ - 'January', - 'February', - 'March', - 'April', - 'May', - 'June', - 'July', - 'August', - 'September', - 'October', - 'November', - 'December', - ]; - - const day = date.getDate(); - const monthIndex = date.getMonth(); - const year = date.getFullYear(); - const hour = date.getHours().toString().padStart(2, '0'); - const minute = date.getMinutes().toString().padStart(2, '0'); - - return `${monthNames[monthIndex]} ${day}, ${year} at ${hour}:${minute}`; -}; - -export const compareString = (a?: string | null, b?: string | null): number => { - if (a === b) { - return 0; - } - - if (a === undefined || a === null) { - return -1; - } - - if (b === undefined || b === null) { - return 1; - } - - if (a > b) { - return 1; - } - - return -1; -}; - -export const isValidUrl = (url: string) => { - try { - new URL(url); - return true; - } catch (err) { - return false; - } -}; - -const emailRegex = - /^[-!#$%&'*+/0-9=?A-Z^_a-z`{|}~](\.?[-!#$%&'*+/0-9=?A-Z^_a-z`{|}~])*@[a-zA-Z0-9](-*\.?[a-zA-Z0-9])*\.[a-zA-Z](-?[a-zA-Z0-9])+$/; -export const isValidEmail = (email: string) => { - if (!email) return false; - - const emailParts = email.split('@'); - - if (emailParts.length !== 2) return false; - - const account = emailParts[0]; - const address = emailParts[1]; - - if (!account || !address) return false; - - if (account.length > 64) return false; - - if (address.length > 255) return false; - - const domainParts = address.split('.'); - - if (domainParts.some((part) => part.length > 63)) return false; - - return emailRegex.test(email); -}; - export const getDisplayedDates = ( month: Date ): { @@ -187,48 +46,3 @@ export const getDisplayedDates = ( return { first: firstDayDisplayed, last: lastDayDisplayed }; }; - -export const isSameDay = ( - date1: Date | string | null, - date2: Date | string | null -) => { - if (date1 == null) { - return false; - } - - if (date2 == null) { - return false; - } - - const d1 = typeof date1 === 'string' ? new Date(date1) : date1; - const d2 = typeof date2 === 'string' ? new Date(date2) : date2; - - return d1.getDate() === d2.getDate() && d1.getMonth() === d2.getMonth(); -}; - -export const toUTCDate = (dateParam: Date | string): Date => { - const date = typeof dateParam === 'string' ? new Date(dateParam) : dateParam; - return new Date( - Date.UTC(date.getFullYear(), date.getMonth(), date.getDate()) - ); -}; - -export const isStringArray = ( - value: unknown | null | undefined -): value is string[] => { - if (value == null) { - return false; - } - - if (value === undefined) { - return false; - } - - if (value === null) { - return false; - } - - return ( - Array.isArray(value) && value.every((item) => typeof item === 'string') - ); -}; diff --git a/apps/desktop/src/shared/types/events.ts b/apps/desktop/src/shared/types/events.ts index f5e8e203..569833ed 100644 --- a/apps/desktop/src/shared/types/events.ts +++ b/apps/desktop/src/shared/types/events.ts @@ -2,7 +2,8 @@ import { LocalNodeTransaction, Node } from '@colanode/core'; import { Account } from '@/shared/types/accounts'; import { Workspace } from '@/shared/types/workspaces'; import { Server } from '@/shared/types/servers'; -import { Download, Upload, UserNode } from '@/shared/types/nodes'; +import { Download, Upload } from '@/shared/types/nodes'; +import { Interaction } from '@/shared/types/interactions'; export type NodeCreatedEvent = { type: 'node_created'; @@ -138,6 +139,12 @@ export type SocketConnectionOpenedEvent = { accountId: string; }; +export type InteractionEventCreatedEvent = { + type: 'interaction_event_created'; + userId: string; + nodeId: string; +}; + export type Event = | NodeCreatedEvent | NodeUpdatedEvent @@ -162,4 +169,5 @@ export type Event = | NodeTransactionIncompleteEvent | ServerAvailabilityChangedEvent | SocketConnectionOpenedEvent - | CollaborationCreatedEvent; + | CollaborationCreatedEvent + | InteractionEventCreatedEvent; diff --git a/apps/desktop/src/shared/types/interactions.ts b/apps/desktop/src/shared/types/interactions.ts new file mode 100644 index 00000000..1ac73ef3 --- /dev/null +++ b/apps/desktop/src/shared/types/interactions.ts @@ -0,0 +1,12 @@ +import { InteractionAttributes } from '@colanode/core'; + +export type Interaction = { + nodeId: string; + userId: string; + attributes: InteractionAttributes; + createdAt: Date; + updatedAt: Date | null; + serverCreatedAt: Date | null; + serverUpdatedAt: Date | null; + version: bigint | null; +}; diff --git a/apps/server/src/data/migrations.ts b/apps/server/src/data/migrations.ts index d5ce5a61..4b2e9638 100644 --- a/apps/server/src/data/migrations.ts +++ b/apps/server/src/data/migrations.ts @@ -99,7 +99,6 @@ const createNodesTable: Migration = { await db.schema .createTable('nodes') .addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey()) - .addColumn('workspace_id', 'varchar(30)', (col) => col.notNull()) .addColumn('type', 'varchar(30)', (col) => col.generatedAlwaysAs(sql`(attributes->>'type')::VARCHAR(30)`).stored() ) @@ -109,6 +108,7 @@ const createNodesTable: Migration = { .stored() .notNull() ) + .addColumn('workspace_id', 'varchar(30)', (col) => col.notNull()) .addColumn('attributes', 'jsonb', (col) => col.notNull()) .addColumn('created_at', 'timestamptz', (col) => col.notNull()) .addColumn('updated_at', 'timestamptz') @@ -136,9 +136,9 @@ const createNodeTransactionsTable: Migration = { await db.schema .createTable('node_transactions') .addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey()) - .addColumn('workspace_id', 'varchar(30)', (col) => col.notNull()) .addColumn('node_id', 'varchar(30)', (col) => col.notNull()) .addColumn('node_type', 'varchar(30)', (col) => col.notNull()) + .addColumn('workspace_id', 'varchar(30)', (col) => col.notNull()) .addColumn('operation', 'varchar(30)', (col) => col.notNull()) .addColumn('data', 'bytea') .addColumn('created_at', 'timestamptz', (col) => col.notNull()) @@ -362,6 +362,59 @@ const createUploadsTable: Migration = { }, }; +const createInteractionsTable: Migration = { + up: async (db) => { + await sql` + CREATE SEQUENCE IF NOT EXISTS interactions_version_seq + START WITH 1000000000 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + `.execute(db); + + await db.schema + .createTable('interactions') + .addColumn('user_id', 'varchar(30)', (col) => col.notNull()) + .addColumn('node_id', 'varchar(30)', (col) => col.notNull()) + .addColumn('node_type', 'varchar(30)', (col) => col.notNull()) + .addColumn('workspace_id', 'varchar(30)', (col) => col.notNull()) + .addColumn('attributes', 'jsonb') + .addColumn('created_at', 'timestamptz', (col) => col.notNull()) + .addColumn('updated_at', 'timestamptz') + .addColumn('server_created_at', 'timestamptz', (col) => col.notNull()) + .addColumn('server_updated_at', 'timestamptz') + .addColumn('version', 'bigint', (col) => + col.notNull().defaultTo(sql`nextval('interactions_version_seq')`) + ) + .addPrimaryKeyConstraint('interactions_pkey', ['user_id', 'node_id']) + .execute(); + + await sql` + CREATE OR REPLACE FUNCTION update_interaction_version() RETURNS TRIGGER AS $$ + BEGIN + NEW.version = nextval('interactions_version_seq'); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + CREATE TRIGGER trg_update_interaction_version + BEFORE UPDATE ON interactions + FOR EACH ROW + EXECUTE FUNCTION update_interaction_version(); + `.execute(db); + }, + down: async (db) => { + await sql` + DROP TRIGGER IF EXISTS trg_update_interaction_version ON interactions; + DROP FUNCTION IF EXISTS update_interaction_version(); + DROP SEQUENCE IF EXISTS interactions_version_seq; + `.execute(db); + + await db.schema.dropTable('interactions').execute(); + }, +}; + export const databaseMigrations: Record = { '00001_create_accounts_table': createAccountsTable, '00002_create_devices_table': createDevicesTable, @@ -374,4 +427,5 @@ export const databaseMigrations: Record = { '00009_create_collaboration_revocations_table': createCollaborationRevocationsTable, '00010_create_uploads_table': createUploadsTable, + '00011_create_interactions_table': createInteractionsTable, }; diff --git a/apps/server/src/data/schema.ts b/apps/server/src/data/schema.ts index 526f6cdb..d12c9e8f 100644 --- a/apps/server/src/data/schema.ts +++ b/apps/server/src/data/schema.ts @@ -1,4 +1,10 @@ -import { NodeAttributes, NodeRole, WorkspaceRole } from '@colanode/core'; +import { + InteractionAttributes, + NodeAttributes, + NodeRole, + NodeType, + WorkspaceRole, +} from '@colanode/core'; import { ColumnType, Insertable, @@ -83,9 +89,9 @@ export type UpdateWorkspaceUser = Updateable; interface NodeTable { id: ColumnType; - workspace_id: ColumnType; + type: ColumnType; parent_id: ColumnType; - type: ColumnType; + workspace_id: ColumnType; attributes: JSONColumnType; created_at: ColumnType; updated_at: ColumnType; @@ -101,9 +107,9 @@ export type UpdateNode = Updateable; interface NodeTransactionTable { id: ColumnType; node_id: ColumnType; - node_type: ColumnType; - operation: ColumnType; + node_type: ColumnType; workspace_id: ColumnType; + operation: ColumnType; data: ColumnType; created_at: ColumnType; created_by: ColumnType; @@ -168,6 +174,27 @@ interface UploadTable { completed_at: ColumnType; } +interface InteractionTable { + user_id: ColumnType; + node_id: ColumnType; + node_type: ColumnType; + workspace_id: ColumnType; + attributes: JSONColumnType< + InteractionAttributes, + string | null, + string | null + >; + created_at: ColumnType; + updated_at: ColumnType; + server_created_at: ColumnType; + server_updated_at: ColumnType; + version: ColumnType; +} + +export type SelectInteraction = Selectable; +export type CreateInteraction = Insertable; +export type UpdateInteraction = Updateable; + export interface DatabaseSchema { accounts: AccountTable; devices: DeviceTable; @@ -179,4 +206,5 @@ export interface DatabaseSchema { collaboration_revocations: CollaborationRevocationTable; node_paths: NodePathTable; uploads: UploadTable; + interactions: InteractionTable; } diff --git a/apps/server/src/lib/nodes.ts b/apps/server/src/lib/nodes.ts index 940bc17e..b2320d44 100644 --- a/apps/server/src/lib/nodes.ts +++ b/apps/server/src/lib/nodes.ts @@ -2,6 +2,7 @@ import { database } from '@/data/database'; import { SelectCollaboration, SelectCollaborationRevocation, + SelectInteraction, SelectNode, SelectNodeTransaction, } from '@/data/schema'; @@ -9,6 +10,7 @@ import { NodeCollaborator } from '@/types/nodes'; import { NodeOutput, NodeRole, + ServerInteraction, ServerCollaboration, ServerCollaborationRevocation, ServerNodeTransaction, @@ -127,6 +129,23 @@ export const mapCollaboration = ( }; }; +export const mapInteraction = ( + interaction: SelectInteraction +): ServerInteraction => { + return { + userId: interaction.user_id, + nodeId: interaction.node_id, + nodeType: interaction.node_type, + workspaceId: interaction.workspace_id, + attributes: interaction.attributes, + createdAt: interaction.created_at.toISOString(), + updatedAt: interaction.updated_at?.toISOString() ?? null, + serverCreatedAt: interaction.server_created_at.toISOString(), + serverUpdatedAt: interaction.server_updated_at?.toISOString() ?? null, + version: interaction.version.toString(), + }; +}; + export const fetchNode = async (nodeId: string): Promise => { const result = await database .selectFrom('nodes') diff --git a/apps/server/src/services/interaction-service.ts b/apps/server/src/services/interaction-service.ts new file mode 100644 index 00000000..c45ebed1 --- /dev/null +++ b/apps/server/src/services/interaction-service.ts @@ -0,0 +1,157 @@ +import { database } from '@/data/database'; +import { eventBus } from '@/lib/event-bus'; +import { + InteractionAttributes, + InteractionEvent, + mergeInteractionAttributes, + SyncInteractionsMessage, +} from '@colanode/core'; + +const UPDATE_RETRIES_COUNT = 10; + +class InteractionService { + public async syncLocalInteractions( + accountId: string, + message: SyncInteractionsMessage + ) { + const workspaceUser = await database + .selectFrom('workspace_users') + .selectAll() + .where('id', '=', message.userId) + .executeTakeFirst(); + + if (!workspaceUser || workspaceUser.account_id !== accountId) { + console.log('workspace user not found', accountId, message.userId); + return false; + } + + for (let i = 0; i < UPDATE_RETRIES_COUNT; i++) { + const synced = await this.trySyncInteraction( + workspaceUser.workspace_id, + message + ); + + if (synced) { + return true; + } + } + + return false; + } + + private async trySyncInteraction( + workspaceId: string, + message: SyncInteractionsMessage + ): Promise { + if (message.events.length === 0) { + return true; + } + + const firstEvent = message.events[0]; + if (!firstEvent) { + return true; + } + + const lastEvent = message.events[message.events.length - 1]; + if (!lastEvent) { + return true; + } + + const interaction = await database + .selectFrom('interactions') + .selectAll() + .where('user_id', '=', message.userId) + .where('node_id', '=', message.nodeId) + .executeTakeFirst(); + + const attributes = this.buildInteractionAttributes( + interaction?.attributes, + message.events + ); + + if (!attributes) { + return true; + } + + if (interaction) { + const updatedInteraction = await database + .updateTable('interactions') + .returningAll() + .set({ + attributes: JSON.stringify(attributes), + updated_at: new Date(lastEvent.createdAt), + server_updated_at: new Date(), + }) + .where('user_id', '=', message.userId) + .where('node_id', '=', message.nodeId) + .where('version', '=', interaction.version) + .executeTakeFirst(); + + if (updatedInteraction) { + eventBus.publish({ + type: 'interaction_updated', + userId: message.userId, + nodeId: message.nodeId, + nodeType: message.nodeType, + workspaceId, + }); + + return true; + } + } + + const createdInteraction = await database + .insertInto('interactions') + .returningAll() + .values({ + user_id: message.userId, + node_id: message.nodeId, + node_type: message.nodeType, + workspace_id: workspaceId, + attributes: JSON.stringify(attributes), + created_at: new Date(firstEvent.createdAt), + server_created_at: new Date(), + }) + .onConflict((oc) => oc.columns(['user_id', 'node_id']).doNothing()) + .executeTakeFirst(); + + if (createdInteraction) { + eventBus.publish({ + type: 'interaction_updated', + userId: message.userId, + nodeId: message.nodeId, + nodeType: message.nodeType, + workspaceId, + }); + + return true; + } + + return false; + } + + private buildInteractionAttributes( + attributes: InteractionAttributes | undefined, + events: InteractionEvent[] + ): InteractionAttributes | null { + if (events.length === 0) { + return null; + } + + let result = { ...attributes }; + for (const event of events) { + const merged = mergeInteractionAttributes( + result, + event.attribute, + event.value + ); + if (merged) { + result = merged; + } + } + + return result; + } +} + +export const interactionService = new InteractionService(); diff --git a/apps/server/src/services/synapse-service.ts b/apps/server/src/services/synapse-service.ts index 44580bf2..b9ded2ae 100644 --- a/apps/server/src/services/synapse-service.ts +++ b/apps/server/src/services/synapse-service.ts @@ -3,6 +3,7 @@ import { Server } from 'http'; import { WebSocketServer, WebSocket } from 'ws'; import { verifyToken } from '@/lib/tokens'; import { + InteractionsBatchMessage, CollaborationRevocationsBatchMessage, CollaborationsBatchMessage, Message, @@ -10,7 +11,9 @@ import { NodeType, } from '@colanode/core'; import { logService } from '@/services/log-service'; +import { interactionService } from '@/services/interaction-service'; import { + mapInteraction, mapCollaboration, mapCollaborationRevocation, mapNodeTransaction, @@ -18,6 +21,7 @@ import { import { eventBus } from '@/lib/event-bus'; import { CollaboratorRemovedEvent, + InteractionUpdatedEvent, NodeTransactionCreatedEvent, } from '@/types/events'; @@ -35,6 +39,7 @@ interface SynapseConnection { transactions: Map; revocations: Map; collaborations: Map; + interactions: Map; } const PUBLIC_NODES: NodeType[] = ['workspace', 'user']; @@ -49,6 +54,8 @@ class SynapseService { this.handleNodeTransactionCreatedEvent(event); } else if (event.type === 'collaborator_removed') { this.handleCollaboratorRemovedEvent(event); + } else if (event.type === 'interaction_updated') { + this.handleInteractionUpdatedEvent(event); } }); } @@ -123,6 +130,7 @@ class SynapseService { transactions: new Map(), revocations: new Map(), collaborations: new Map(), + interactions: new Map(), }; this.connections.set(account.deviceId, connection); @@ -190,6 +198,23 @@ class SynapseService { state.cursor = message.cursor; this.sendPendingCollaborations(connection, message.userId); } + } else if (message.type === 'fetch_interactions') { + const state = connection.interactions.get(message.userId); + if (!state) { + connection.interactions.set(message.userId, { + userId: message.userId, + workspaceId: message.workspaceId, + cursor: message.cursor, + syncing: false, + }); + + this.sendPendingInteractions(connection, message.userId); + } else if (!state.syncing && state.cursor !== message.cursor) { + state.cursor = message.cursor; + this.sendPendingInteractions(connection, message.userId); + } + } else if (message.type === 'sync_interactions') { + interactionService.syncLocalInteractions(connection.accountId, message); } } @@ -218,7 +243,7 @@ class SynapseService { eb.or([ eb.and([ eb('nt.workspace_id', '=', state.workspaceId), - eb('nt.node_type', 'in', ['workspace', 'user']), + eb('nt.node_type', 'in', PUBLIC_NODES), ]), eb('c.node_id', '=', eb.ref('nt.node_id')), ]) @@ -324,6 +349,57 @@ class SynapseService { this.sendMessage(connection, message); } + private async sendPendingInteractions( + connection: SynapseConnection, + userId: string + ) { + const state = connection.interactions.get(userId); + if (!state || state.syncing) { + return; + } + + state.syncing = true; + this.logger.trace( + state, + `Sending pending interactions for ${connection.deviceId} with ${userId}` + ); + + const unsyncedInteractions = await database + .selectFrom('interactions as i') + .leftJoin('collaborations as c', (join) => + join.on('c.user_id', '=', userId).onRef('c.node_id', '=', 'i.node_id') + ) + .where((eb) => + eb.or([ + eb.and([ + eb('i.workspace_id', '=', state.workspaceId), + eb('i.node_type', 'in', PUBLIC_NODES), + ]), + eb('c.node_id', '=', eb.ref('i.node_id')), + ]) + ) + .selectAll('i') + .where('i.version', '>', BigInt(state.cursor)) + .orderBy('i.version', 'asc') + .limit(20) + .execute(); + + if (unsyncedInteractions.length === 0) { + state.syncing = false; + return; + } + + const interactions = unsyncedInteractions.map(mapInteraction); + const message: InteractionsBatchMessage = { + type: 'interactions_batch', + userId, + interactions, + }; + + connection.interactions.delete(userId); + this.sendMessage(connection, message); + } + private async handleNodeTransactionCreatedEvent( event: NodeTransactionCreatedEvent ) { @@ -370,6 +446,52 @@ class SynapseService { } } + private async handleInteractionUpdatedEvent(event: InteractionUpdatedEvent) { + const userDevices = this.getPendingInteractionsCursors(event.workspaceId); + const userIds = Array.from(userDevices.keys()); + if (userIds.length === 0) { + return; + } + + const collaborations = await database + .selectFrom('collaborations') + .selectAll() + .where((eb) => + eb.and([eb('user_id', 'in', userIds), eb('node_id', '=', event.nodeId)]) + ) + .execute(); + + let usersToSend: string[] = []; + if (PUBLIC_NODES.includes(event.nodeType)) { + usersToSend = userIds; + } else { + const collaborations = await database + .selectFrom('collaborations') + .selectAll() + .where((eb) => + eb.and([ + eb('user_id', 'in', userIds), + eb('node_id', '=', event.nodeId), + ]) + ) + .execute(); + + usersToSend = collaborations.map((c) => c.user_id); + } + + for (const userId of usersToSend) { + const deviceIds = userDevices.get(userId) ?? []; + for (const deviceId of deviceIds) { + const socketConnection = this.connections.get(deviceId); + if (socketConnection === undefined) { + continue; + } + + this.sendPendingTransactions(socketConnection, userId); + } + } + } + private handleCollaboratorRemovedEvent(event: CollaboratorRemovedEvent) { const deviceIds = this.getPendingRevocationsCursors(event.userId); for (const deviceId of deviceIds) { @@ -402,6 +524,26 @@ class SynapseService { return userDevices; } + private getPendingInteractionsCursors( + workspaceId: string + ): Map { + const userDevices = new Map(); + for (const connection of this.connections.values()) { + const connectionUsers = connection.interactions.values(); + for (const user of connectionUsers) { + if (user.workspaceId !== workspaceId || user.syncing) { + continue; + } + + const userIds = userDevices.get(user.userId) ?? []; + userIds.push(connection.deviceId); + userDevices.set(user.userId, userIds); + } + } + + return userDevices; + } + private getPendingRevocationsCursors(userId: string): string[] { const userDevices: string[] = []; for (const connection of this.connections.values()) { diff --git a/apps/server/src/types/events.ts b/apps/server/src/types/events.ts index 52758102..83ed2745 100644 --- a/apps/server/src/types/events.ts +++ b/apps/server/src/types/events.ts @@ -14,4 +14,15 @@ export type CollaboratorRemovedEvent = { nodeId: string; }; -export type Event = NodeTransactionCreatedEvent | CollaboratorRemovedEvent; +export type InteractionUpdatedEvent = { + type: 'interaction_updated'; + userId: string; + nodeId: string; + nodeType: NodeType; + workspaceId: string; +}; + +export type Event = + | NodeTransactionCreatedEvent + | CollaboratorRemovedEvent + | InteractionUpdatedEvent; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index a7571ac3..fe678182 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -2,6 +2,8 @@ export * from './lib/constants'; export * from './lib/id'; export * from './lib/nodes'; export * from './lib/files'; +export * from './lib/interactions'; +export * from './lib/utils'; export * from './registry/block'; export * from './registry/channel'; export * from './registry/chat'; @@ -25,3 +27,4 @@ export * from './types/accounts'; export * from './types/messages'; export * from './types/servers'; export * from './types/files'; +export * from './types/interactions'; diff --git a/packages/core/src/lib/id.ts b/packages/core/src/lib/id.ts index 2bf1857b..e25af9e1 100644 --- a/packages/core/src/lib/id.ts +++ b/packages/core/src/lib/id.ts @@ -46,6 +46,7 @@ export enum IdType { Device = 'dv', Upload = 'up', Transaction = 'tx', + Event = 'ev', } export const generateId = (type: IdType): string => { diff --git a/packages/core/src/lib/interactions.ts b/packages/core/src/lib/interactions.ts new file mode 100644 index 00000000..de6cd0d8 --- /dev/null +++ b/packages/core/src/lib/interactions.ts @@ -0,0 +1,57 @@ +import { + InteractionAttribute, + InteractionAttributes, +} from '../types/interactions'; +import { compareDate, compareString } from './utils'; + +export const mergeInteractionAttributes = ( + attributes: InteractionAttributes | null | undefined, + attribute: InteractionAttribute, + value: string +): InteractionAttributes | null => { + if (!attributes) { + return { [attribute]: value }; + } + + if (attribute === 'firstSeenAt') { + const date = new Date(value); + + if ( + !attributes.firstSeenAt || + compareDate(attributes.firstSeenAt, date) < 0 + ) { + return { ...attributes, firstSeenAt: date }; + } + } + + if (attribute === 'lastSeenAt') { + const date = new Date(value); + + if ( + !attributes.lastSeenAt || + compareDate(attributes.lastSeenAt, date) > 0 + ) { + return { ...attributes, lastSeenAt: date }; + } + } + + if (attribute === 'lastReceivedTransactionId') { + if ( + !attributes.lastReceivedTransactionId || + compareString(attributes.lastReceivedTransactionId, value) > 0 + ) { + return { ...attributes, lastReceivedTransactionId: value }; + } + } + + if (attribute === 'lastSeenTransactionId') { + if ( + !attributes.lastSeenTransactionId || + compareString(attributes.lastSeenTransactionId, value) > 0 + ) { + return { ...attributes, lastSeenTransactionId: value }; + } + } + + return null; +}; diff --git a/packages/core/src/lib/utils.ts b/packages/core/src/lib/utils.ts new file mode 100644 index 00000000..19102096 --- /dev/null +++ b/packages/core/src/lib/utils.ts @@ -0,0 +1,210 @@ +export const compareString = (a?: string | null, b?: string | null): number => { + if (a === b) { + return 0; + } + + if (a === undefined || a === null) { + return -1; + } + + if (b === undefined || b === null) { + return 1; + } + + if (a > b) { + return 1; + } + + return -1; +}; + +export const compareDate = ( + a?: Date | string | null, + b?: Date | string | null +): number => { + const aIsNull = a == null || a === undefined; + const bIsNull = b == null || b === undefined; + + if (aIsNull && bIsNull) { + return 0; + } + + if (aIsNull) { + return -1; + } + + if (bIsNull) { + return 1; + } + + const aDate = typeof a === 'string' ? new Date(a) : a; + const bDate = typeof b === 'string' ? new Date(b) : b; + + return aDate.getTime() - bDate.getTime(); +}; + +export const isValidUrl = (url: string) => { + try { + new URL(url); + return true; + } catch (err) { + return false; + } +}; + +const emailRegex = + /^[-!#$%&'*+/0-9=?A-Z^_a-z`{|}~](\.?[-!#$%&'*+/0-9=?A-Z^_a-z`{|}~])*@[a-zA-Z0-9](-*\.?[a-zA-Z0-9])*\.[a-zA-Z](-?[a-zA-Z0-9])+$/; +export const isValidEmail = (email: string) => { + if (!email) return false; + + const emailParts = email.split('@'); + + if (emailParts.length !== 2) return false; + + const account = emailParts[0]; + const address = emailParts[1]; + + if (!account || !address) return false; + + if (account.length > 64) return false; + + if (address.length > 255) return false; + + const domainParts = address.split('.'); + + if (domainParts.some((part) => part.length > 63)) return false; + + return emailRegex.test(email); +}; + +export const isSameDay = ( + date1: Date | string | null, + date2: Date | string | null +) => { + if (date1 == null) { + return false; + } + + if (date2 == null) { + return false; + } + + const d1 = typeof date1 === 'string' ? new Date(date1) : date1; + const d2 = typeof date2 === 'string' ? new Date(date2) : date2; + + return d1.getDate() === d2.getDate() && d1.getMonth() === d2.getMonth(); +}; + +export const toUTCDate = (dateParam: Date | string): Date => { + const date = typeof dateParam === 'string' ? new Date(dateParam) : dateParam; + return new Date( + Date.UTC(date.getFullYear(), date.getMonth(), date.getDate()) + ); +}; + +export const isStringArray = ( + value: unknown | null | undefined +): value is string[] => { + if (value == null) { + return false; + } + + if (value === undefined) { + return false; + } + + if (value === null) { + return false; + } + + return ( + Array.isArray(value) && value.every((item) => typeof item === 'string') + ); +}; + +export const formatDate = (dateParam: Date | string | undefined): string => { + if (dateParam == null) { + return 'N/A'; + } + + const date = typeof dateParam === 'string' ? new Date(dateParam) : dateParam; + + const monthNames = [ + 'January', + 'February', + 'March', + 'April', + 'May', + 'June', + 'July', + 'August', + 'September', + 'October', + 'November', + 'December', + ]; + + const day = date.getDate(); + const monthIndex = date.getMonth(); + const year = date.getFullYear(); + const hour = date.getHours().toString().padStart(2, '0'); + const minute = date.getMinutes().toString().padStart(2, '0'); + + return `${monthNames[monthIndex]} ${day}, ${year} at ${hour}:${minute}`; +}; + +export const timeAgo = (dateParam: Date | string) => { + if (dateParam == null) { + return 'N/A'; + } + + let date = dateParam; + if (typeof date === 'string') { + date = new Date(date); + } + + const diff = Number(new Date()) - date.getTime(); + const minute = 60 * 1000; + const hour = minute * 60; + const day = hour * 24; + const month = day * 30; + const year = day * 365; + switch (true) { + case diff < minute: { + const seconds = Math.round(diff / 1000); + return seconds < 5 ? 'Now' : `${seconds} seconds ago`; + } + case diff < hour: { + const minutes = Math.round(diff / minute); + return minutes === 1 ? '1 minute ago' : `${minutes} minutes ago`; + } + case diff < day: { + const hours = Math.round(diff / hour); + return hours === 1 ? '1 hour ago' : `${hours} hours ago`; + } + case diff < month: { + const days = Math.round(diff / day); + return days === 1 ? '1 day ago' : `${days} days ago`; + } + case diff < year: { + const months = Math.round(diff / month); + return months === 1 ? '1 month ago' : `${months} months ago`; + } + case diff > year: { + const years = Math.round(diff / year); + return years === 1 ? '1 year ago' : `${years} years ago`; + } + default: + return ''; + } +}; + +export const hashCode = (str: string) => { + let hash = 0; + for (let i = 0; i < str.length; i++) { + const character = str.charCodeAt(i); + hash = (hash << 5) - hash + character; + hash |= 0; // Convert to 32bit integer + } + return hash; +}; diff --git a/packages/core/src/types/interactions.ts b/packages/core/src/types/interactions.ts new file mode 100644 index 00000000..057b0e9f --- /dev/null +++ b/packages/core/src/types/interactions.ts @@ -0,0 +1,14 @@ +export type InteractionAttributes = { + lastReceivedTransactionId?: string | null; + lastSeenTransactionId?: string | null; + firstSeenAt?: Date | null; + lastSeenAt?: Date | null; +}; + +export type InteractionAttribute = keyof InteractionAttributes; + +export type InteractionEvent = { + attribute: InteractionAttribute; + value: string; + createdAt: string; +}; diff --git a/packages/core/src/types/messages.ts b/packages/core/src/types/messages.ts index 76d80e33..4f60648d 100644 --- a/packages/core/src/types/messages.ts +++ b/packages/core/src/types/messages.ts @@ -1,8 +1,11 @@ +import { NodeType } from '~/registry'; import { + ServerInteraction, ServerCollaboration, ServerCollaborationRevocation, ServerNodeTransaction, } from './sync'; +import { InteractionEvent } from './interactions'; export type FetchNodeTransactionsMessage = { type: 'fetch_node_transactions'; @@ -25,6 +28,13 @@ export type FetchCollaborationsMessage = { cursor: string; }; +export type FetchInteractionsMessage = { + type: 'fetch_interactions'; + userId: string; + workspaceId: string; + cursor: string; +}; + export type NodeTransactionsBatchMessage = { type: 'node_transactions_batch'; userId: string; @@ -43,10 +53,27 @@ export type CollaborationsBatchMessage = { collaborations: ServerCollaboration[]; }; +export type InteractionsBatchMessage = { + type: 'interactions_batch'; + userId: string; + interactions: ServerInteraction[]; +}; + +export type SyncInteractionsMessage = { + type: 'sync_interactions'; + userId: string; + nodeId: string; + nodeType: NodeType; + events: InteractionEvent[]; +}; + export type Message = | FetchNodeTransactionsMessage | NodeTransactionsBatchMessage | FetchCollaborationRevocationsMessage | CollaborationRevocationsBatchMessage | FetchCollaborationsMessage - | CollaborationsBatchMessage; + | CollaborationsBatchMessage + | FetchInteractionsMessage + | InteractionsBatchMessage + | SyncInteractionsMessage; diff --git a/packages/core/src/types/sync.ts b/packages/core/src/types/sync.ts index a7c9d7ee..62f6e5ac 100644 --- a/packages/core/src/types/sync.ts +++ b/packages/core/src/types/sync.ts @@ -1,4 +1,6 @@ import { NodeRole } from '~/registry/core'; +import { InteractionAttributes } from './interactions'; +import { NodeType } from '~/registry'; export type SyncNodeTransactionsInput = { transactions: LocalNodeTransaction[]; @@ -53,7 +55,7 @@ export type ServerNodeCreateTransaction = { id: string; operation: 'create'; nodeId: string; - nodeType: string; + nodeType: NodeType; workspaceId: string; data: string; createdAt: string; @@ -66,7 +68,7 @@ export type ServerNodeUpdateTransaction = { id: string; operation: 'update'; nodeId: string; - nodeType: string; + nodeType: NodeType; workspaceId: string; data: string; createdAt: string; @@ -79,7 +81,7 @@ export type ServerNodeDeleteTransaction = { id: string; operation: 'delete'; nodeId: string; - nodeType: string; + nodeType: NodeType; workspaceId: string; createdAt: string; createdBy: string; @@ -109,3 +111,16 @@ export type ServerCollaboration = { updatedAt: string | null; version: string; }; + +export type ServerInteraction = { + userId: string; + nodeId: string; + nodeType: NodeType; + workspaceId: string; + attributes: InteractionAttributes; + createdAt: string; + updatedAt: string | null; + serverCreatedAt: string; + serverUpdatedAt: string | null; + version: string; +};