diff --git a/apps/desktop/src/main/data/workspace/migrations.ts b/apps/desktop/src/main/data/workspace/migrations.ts index 1eb84a6e..064f4441 100644 --- a/apps/desktop/src/main/data/workspace/migrations.ts +++ b/apps/desktop/src/main/data/workspace/migrations.ts @@ -122,6 +122,11 @@ const createInteractionsTable: Migration = { .addColumn('node_id', 'text', (col) => col.notNull()) .addColumn('node_type', 'text', (col) => col.notNull()) .addColumn('attributes', 'text') + .addColumn('last_seen_at', 'text', (col) => + col + .generatedAlwaysAs(sql`json_extract(attributes, '$.lastSeenAt')`) + .stored() + ) .addColumn('created_at', 'text', (col) => col.notNull()) .addColumn('updated_at', 'text') .addColumn('server_created_at', 'text') diff --git a/apps/desktop/src/main/data/workspace/schema.ts b/apps/desktop/src/main/data/workspace/schema.ts index 50fc9c00..f46e31d5 100644 --- a/apps/desktop/src/main/data/workspace/schema.ts +++ b/apps/desktop/src/main/data/workspace/schema.ts @@ -93,6 +93,7 @@ interface InteractionTable { node_id: ColumnType; node_type: ColumnType; attributes: ColumnType; + last_seen_at: ColumnType; created_at: ColumnType; updated_at: ColumnType; server_created_at: ColumnType; diff --git a/apps/desktop/src/main/mutations/mark-node-as-seen.ts b/apps/desktop/src/main/mutations/mark-node-as-seen.ts index 325af264..9b2096ae 100644 --- a/apps/desktop/src/main/mutations/mark-node-as-seen.ts +++ b/apps/desktop/src/main/mutations/mark-node-as-seen.ts @@ -1,12 +1,9 @@ -import { generateId, IdType } from '@colanode/core'; -import { databaseService } from '@/main/data/database-service'; import { MutationHandler } from '@/main/types'; import { MarkNodeAsSeenMutationInput, MarkNodeAsSeenMutationOutput, } from '@/shared/mutations/mark-node-as-seen'; -import { UserNode } from '@/shared/types/nodes'; -import { eventBus } from '@/shared/lib/event-bus'; +import { interactionService } from '@/main/services/interaction-service'; export class MarkNodeAsSeenMutationHandler implements MutationHandler @@ -14,103 +11,13 @@ export class MarkNodeAsSeenMutationHandler async handleMutation( input: MarkNodeAsSeenMutationInput ): Promise { - // const workspaceDatabase = await databaseService.getWorkspaceDatabase( - // input.userId - // ); - - // const existingUserNode = await workspaceDatabase - // .selectFrom('user_nodes') - // .where('node_id', '=', input.nodeId) - // .where('user_id', '=', input.userId) - // .selectAll() - // .executeTakeFirst(); - - // if ( - // existingUserNode && - // existingUserNode.last_seen_version_id === input.versionId - // ) { - // const lastSeenAt = existingUserNode.last_seen_at - // ? new Date(existingUserNode.last_seen_at) - // : null; - // // if has been seen in the last 10 minutes, skip it. We don't want to spam the server with seen events. - // if (lastSeenAt && Date.now() - lastSeenAt.getTime() < 10 * 60 * 1000) { - // return { - // success: true, - // }; - // } - // } - - // let changeId: number | undefined; - // let userNode: UserNode | undefined; - - // const changeData: LocalUserNodeChangeData = { - // type: 'user_node_update', - // nodeId: input.nodeId, - // userId: input.userId, - // lastSeenVersionId: input.versionId, - // lastSeenAt: new Date().toISOString(), - // mentionsCount: 0, - // versionId: generateId(IdType.Version), - // }; - - // await workspaceDatabase.transaction().execute(async (trx) => { - // const updatedUserNode = await trx - // .updateTable('user_nodes') - // .set({ - // last_seen_version_id: input.versionId, - // last_seen_at: new Date().toISOString(), - // mentions_count: 0, - // version_id: generateId(IdType.Version), - // }) - // .where('node_id', '=', input.nodeId) - // .where('user_id', '=', input.userId) - // .returningAll() - // .executeTakeFirst(); - - // if (updatedUserNode) { - // userNode = { - // userId: updatedUserNode.user_id, - // nodeId: updatedUserNode.node_id, - // lastSeenAt: updatedUserNode.last_seen_at, - // lastSeenVersionId: updatedUserNode.last_seen_version_id, - // mentionsCount: updatedUserNode.mentions_count, - // attributes: updatedUserNode.attributes, - // versionId: updatedUserNode.version_id, - // createdAt: updatedUserNode.created_at, - // updatedAt: updatedUserNode.updated_at, - // }; - // } - - // const createdChange = await trx - // .insertInto('changes') - // .values({ - // data: JSON.stringify(changeData), - // created_at: new Date().toISOString(), - // retry_count: 0, - // }) - // .returning('id') - // .executeTakeFirst(); - - // if (createdChange) { - // changeId = createdChange.id; - // } - // }); - - // if (userNode) { - // eventBus.publish({ - // type: 'user_node_created', - // userId: input.userId, - // userNode, - // }); - // } - - // if (changeId) { - // eventBus.publish({ - // type: 'change_created', - // userId: input.userId, - // changeId, - // }); - // } + await interactionService.setInteraction( + input.userId, + input.nodeId, + 'message', + 'lastSeenAt', + new Date().toISOString() + ); return { success: true, diff --git a/apps/desktop/src/main/services/interaction-service.ts b/apps/desktop/src/main/services/interaction-service.ts index a120b4e7..883ef42f 100644 --- a/apps/desktop/src/main/services/interaction-service.ts +++ b/apps/desktop/src/main/services/interaction-service.ts @@ -10,6 +10,7 @@ import { import { databaseService } from '@/main/data/database-service'; import { SelectInteractionEvent } from '@/main/data/workspace/schema'; import { eventBus } from '@/shared/lib/event-bus'; +import { mapInteraction } from '../utils'; const UPDATE_RETRIES_COUNT = 10; @@ -70,7 +71,7 @@ class InteractionService { } if (interaction) { - const result = await workspaceDatabase + const { updatedInteraction } = await workspaceDatabase .transaction() .execute(async (tx) => { const updatedInteraction = await tx @@ -86,7 +87,7 @@ class InteractionService { .executeTakeFirst(); if (!updatedInteraction) { - return false; + return { updatedInteraction: undefined }; } await tx @@ -108,70 +109,88 @@ class InteractionService { ) .execute(); - return true; + return { updatedInteraction }; + }); + + if (updatedInteraction) { + eventBus.publish({ + type: 'interaction_updated', + userId, + interaction: mapInteraction(updatedInteraction), }); - if (result) { eventBus.publish({ type: 'interaction_event_created', userId, nodeId, }); + + return true; } - return result; + return false; } - const result = await workspaceDatabase.transaction().execute(async (tx) => { - const createdInteraction = await tx - .insertInto('interactions') - .returningAll() - .values({ - node_id: nodeId, - node_type: nodeType, - user_id: userId, - attributes: JSON.stringify(attributes), - created_at: new Date().toISOString(), - version: BigInt(0), - }) - .onConflict((b) => b.columns(['node_id', 'user_id']).doNothing()) - .executeTakeFirst(); + const { createdInteraction } = await workspaceDatabase + .transaction() + .execute(async (tx) => { + const createdInteraction = await tx + .insertInto('interactions') + .returningAll() + .values({ + node_id: nodeId, + node_type: nodeType, + user_id: userId, + attributes: JSON.stringify(attributes), + created_at: new Date().toISOString(), + version: BigInt(0), + }) + .onConflict((b) => b.columns(['node_id', 'user_id']).doNothing()) + .executeTakeFirst(); - if (!createdInteraction) { - return false; - } + if (!createdInteraction) { + return { createdInteraction: undefined }; + } - await tx - .insertInto('interaction_events') - .values({ - node_id: nodeId, - node_type: nodeType, - attribute, - value, - created_at: new Date().toISOString(), - event_id: generateId(IdType.Event), - }) - .onConflict((b) => - b.columns(['node_id', 'attribute']).doUpdateSet({ + await tx + .insertInto('interaction_events') + .values({ + node_id: nodeId, + node_type: nodeType, + attribute, value, - sent_at: null, + created_at: new Date().toISOString(), event_id: generateId(IdType.Event), }) - ) - .execute(); + .onConflict((b) => + b.columns(['node_id', 'attribute']).doUpdateSet({ + value, + sent_at: null, + event_id: generateId(IdType.Event), + }) + ) + .execute(); - return true; - }); + return { createdInteraction }; + }); + + if (createdInteraction) { + eventBus.publish({ + type: 'interaction_updated', + userId, + interaction: mapInteraction(createdInteraction), + }); - if (result) { eventBus.publish({ type: 'interaction_event_created', userId, nodeId, }); + + return true; } - return result; + return false; } public async applyServerInteraction( @@ -201,8 +220,9 @@ class InteractionService { const workspaceDatabase = await databaseService.getWorkspaceDatabase(userId); - await workspaceDatabase + const createdInteraction = await workspaceDatabase .insertInto('interactions') + .returningAll() .values({ user_id: interaction.userId, node_id: interaction.nodeId, @@ -222,7 +242,17 @@ class InteractionService { version: BigInt(interaction.version), }) ) - .execute(); + .executeTakeFirst(); + + if (createdInteraction) { + eventBus.publish({ + type: 'interaction_updated', + userId, + interaction: mapInteraction(createdInteraction), + }); + } + + return createdInteraction; } private async tryApplyServerInteraction( @@ -250,7 +280,7 @@ class InteractionService { ); if (existingInteraction) { - const result = await workspaceDatabase + const { updatedInteraction } = await workspaceDatabase .transaction() .execute(async (tx) => { const updatedInteraction = await tx @@ -268,7 +298,7 @@ class InteractionService { .executeTakeFirst(); if (!updatedInteraction) { - return false; + return { updatedInteraction: undefined }; } if (toDeleteEventIds.length > 0) { @@ -279,46 +309,68 @@ class InteractionService { .execute(); } - return true; + return { updatedInteraction }; }); - return result; + if (updatedInteraction) { + eventBus.publish({ + type: 'interaction_updated', + userId, + interaction: mapInteraction(updatedInteraction), + }); + + return true; + } + + return false; } - const result = await workspaceDatabase.transaction().execute(async (tx) => { - const createdInteraction = await tx - .insertInto('interactions') - .returningAll() - .values({ - user_id: interaction.userId, - node_id: interaction.nodeId, - node_type: interaction.nodeType, - attributes: JSON.stringify(attributes), - created_at: interaction.createdAt, - updated_at: interaction.updatedAt, - server_created_at: interaction.serverCreatedAt, - server_updated_at: interaction.serverUpdatedAt, - version: BigInt(interaction.version), - }) - .onConflict((b) => b.columns(['node_id', 'user_id']).doNothing()) - .executeTakeFirst(); + const { createdInteraction } = await workspaceDatabase + .transaction() + .execute(async (tx) => { + const createdInteraction = await tx + .insertInto('interactions') + .returningAll() + .values({ + user_id: interaction.userId, + node_id: interaction.nodeId, + node_type: interaction.nodeType, + attributes: JSON.stringify(attributes), + created_at: interaction.createdAt, + updated_at: interaction.updatedAt, + server_created_at: interaction.serverCreatedAt, + server_updated_at: interaction.serverUpdatedAt, + version: BigInt(interaction.version), + }) + .onConflict((b) => b.columns(['node_id', 'user_id']).doNothing()) + .executeTakeFirst(); - if (!createdInteraction) { - return false; - } + if (!createdInteraction) { + return { createdInteraction: undefined }; + } - if (toDeleteEventIds.length > 0) { - await tx - .deleteFrom('interaction_events') - .where('node_id', '=', interaction.nodeId) - .where('event_id', 'in', toDeleteEventIds) - .execute(); - } + if (toDeleteEventIds.length > 0) { + await tx + .deleteFrom('interaction_events') + .where('node_id', '=', interaction.nodeId) + .where('event_id', 'in', toDeleteEventIds) + .execute(); + } + + return { createdInteraction }; + }); + + if (createdInteraction) { + eventBus.publish({ + type: 'interaction_updated', + userId, + interaction: mapInteraction(createdInteraction), + }); return true; - }); + } - return result; + return false; } private mergeServerAttributes( diff --git a/apps/desktop/src/main/services/radar-service.ts b/apps/desktop/src/main/services/radar-service.ts index c65f255d..1d45ca23 100644 --- a/apps/desktop/src/main/services/radar-service.ts +++ b/apps/desktop/src/main/services/radar-service.ts @@ -2,6 +2,7 @@ import { databaseService } from '@/main/data/database-service'; import { WorkspaceRadarData } from '@/shared/types/radars'; import { eventBus } from '@/shared/lib/event-bus'; import { Event } from '@/shared/types/events'; +import { getIdType, IdType } from '@colanode/core'; class RadarService { private readonly workspaceStates: Map = new Map(); @@ -69,56 +70,46 @@ class RadarService { nodeStates: {}, }; - // const nodeUnreadMessageCounts = await workspaceDatabase - // .selectFrom('user_nodes as un') - // .innerJoin('nodes as n', 'un.node_id', 'n.id') - // .where('un.user_id', '=', userId) - // .where('n.type', '=', NodeTypes.Message) - // .where('un.last_seen_version_id', 'is', null) - // .select(['n.parent_id as node_id']) - // .select((eb) => [ - // eb.fn.count('un.node_id').as('messages_count'), - // eb.fn.sum('un.mentions_count').as('mentions_count'), - // ]) - // .groupBy('n.parent_id') - // .execute(); + const nodeUnreadMessageCounts = await workspaceDatabase + .selectFrom('interactions as i') + .innerJoin('nodes as n', 'i.node_id', 'n.id') + .where('i.user_id', '=', userId) + .where('n.type', '=', 'message') + .where('i.last_seen_at', 'is', null) + .select(['n.parent_id as node_id']) + .select((eb) => [eb.fn.count('i.node_id').as('messages_count')]) + .groupBy('n.parent_id') + .execute(); - // for (const nodeUnreadMessageCount of nodeUnreadMessageCounts) { - // const idType = getIdType(nodeUnreadMessageCount.node_id); - // const nodeId = nodeUnreadMessageCount.node_id; - // const messagesCount = nodeUnreadMessageCount.messages_count; - // const mentionsCount = nodeUnreadMessageCount.mentions_count; + for (const nodeUnreadMessageCount of nodeUnreadMessageCounts) { + const idType = getIdType(nodeUnreadMessageCount.node_id); + const nodeId = nodeUnreadMessageCount.node_id; + const messagesCount = nodeUnreadMessageCount.messages_count; - // if (idType === IdType.Chat) { - // data.nodeStates[nodeId] = { - // type: 'chat', - // nodeId, - // unseenMessagesCount: messagesCount, - // mentionsCount, - // }; + if (idType === IdType.Chat) { + data.nodeStates[nodeId] = { + type: 'chat', + nodeId, + unseenMessagesCount: messagesCount, + mentionsCount: 0, + }; - // if (mentionsCount > 0) { - // data.importantCount += mentionsCount; - // } + if (messagesCount > 0) { + data.importantCount += messagesCount; + } + } else if (idType === IdType.Channel) { + data.nodeStates[nodeId] = { + type: 'channel', + nodeId, + unseenMessagesCount: messagesCount, + mentionsCount: 0, + }; - // if (messagesCount > 0) { - // data.importantCount += messagesCount; - // } - // } else if (idType === IdType.Channel) { - // data.nodeStates[nodeId] = { - // type: 'channel', - // nodeId, - // unseenMessagesCount: messagesCount, - // mentionsCount, - // }; - - // if (messagesCount > 0) { - // data.hasUnseenChanges = true; - // } else if (mentionsCount > 0) { - // data.importantCount += messagesCount; - // } - // } - // } + if (messagesCount > 0) { + data.hasUnseenChanges = true; + } + } + } this.workspaceStates.set(userId, data); } @@ -129,6 +120,14 @@ class RadarService { eventBus.publish({ type: 'radar_data_updated', }); + } else if ( + event.type === 'interaction_updated' && + event.userId === event.interaction.userId + ) { + await this.initWorkspace(event.userId); + eventBus.publish({ + type: 'radar_data_updated', + }); } } } diff --git a/apps/desktop/src/shared/types/events.ts b/apps/desktop/src/shared/types/events.ts index 569833ed..a05ded1e 100644 --- a/apps/desktop/src/shared/types/events.ts +++ b/apps/desktop/src/shared/types/events.ts @@ -145,6 +145,12 @@ export type InteractionEventCreatedEvent = { nodeId: string; }; +export type InteractionUpdatedEvent = { + type: 'interaction_updated'; + userId: string; + interaction: Interaction; +}; + export type Event = | NodeCreatedEvent | NodeUpdatedEvent @@ -170,4 +176,5 @@ export type Event = | ServerAvailabilityChangedEvent | SocketConnectionOpenedEvent | CollaborationCreatedEvent - | InteractionEventCreatedEvent; + | InteractionEventCreatedEvent + | InteractionUpdatedEvent;