Implement radar service with the new interactions structure

This commit is contained in:
Hakan Shehu
2024-11-30 11:43:00 +01:00
parent e63f84a559
commit e792785399
6 changed files with 195 additions and 224 deletions

View File

@@ -122,6 +122,11 @@ const createInteractionsTable: Migration = {
.addColumn('node_id', 'text', (col) => col.notNull())
.addColumn('node_type', 'text', (col) => col.notNull())
.addColumn('attributes', 'text')
.addColumn('last_seen_at', 'text', (col) =>
col
.generatedAlwaysAs(sql`json_extract(attributes, '$.lastSeenAt')`)
.stored()
)
.addColumn('created_at', 'text', (col) => col.notNull())
.addColumn('updated_at', 'text')
.addColumn('server_created_at', 'text')

View File

@@ -93,6 +93,7 @@ interface InteractionTable {
node_id: ColumnType<string, string, never>;
node_type: ColumnType<NodeType, NodeType, never>;
attributes: ColumnType<InteractionAttributes, string, string>;
last_seen_at: ColumnType<string | null, string | null, string | null>;
created_at: ColumnType<string, string, never>;
updated_at: ColumnType<string | null, string | null, string | null>;
server_created_at: ColumnType<string | null, string | null, string | null>;

View File

@@ -1,12 +1,9 @@
import { generateId, IdType } from '@colanode/core';
import { databaseService } from '@/main/data/database-service';
import { MutationHandler } from '@/main/types';
import {
MarkNodeAsSeenMutationInput,
MarkNodeAsSeenMutationOutput,
} from '@/shared/mutations/mark-node-as-seen';
import { UserNode } from '@/shared/types/nodes';
import { eventBus } from '@/shared/lib/event-bus';
import { interactionService } from '@/main/services/interaction-service';
export class MarkNodeAsSeenMutationHandler
implements MutationHandler<MarkNodeAsSeenMutationInput>
@@ -14,103 +11,13 @@ export class MarkNodeAsSeenMutationHandler
async handleMutation(
input: MarkNodeAsSeenMutationInput
): Promise<MarkNodeAsSeenMutationOutput> {
// const workspaceDatabase = await databaseService.getWorkspaceDatabase(
// input.userId
// );
// const existingUserNode = await workspaceDatabase
// .selectFrom('user_nodes')
// .where('node_id', '=', input.nodeId)
// .where('user_id', '=', input.userId)
// .selectAll()
// .executeTakeFirst();
// if (
// existingUserNode &&
// existingUserNode.last_seen_version_id === input.versionId
// ) {
// const lastSeenAt = existingUserNode.last_seen_at
// ? new Date(existingUserNode.last_seen_at)
// : null;
// // if has been seen in the last 10 minutes, skip it. We don't want to spam the server with seen events.
// if (lastSeenAt && Date.now() - lastSeenAt.getTime() < 10 * 60 * 1000) {
// return {
// success: true,
// };
// }
// }
// let changeId: number | undefined;
// let userNode: UserNode | undefined;
// const changeData: LocalUserNodeChangeData = {
// type: 'user_node_update',
// nodeId: input.nodeId,
// userId: input.userId,
// lastSeenVersionId: input.versionId,
// lastSeenAt: new Date().toISOString(),
// mentionsCount: 0,
// versionId: generateId(IdType.Version),
// };
// await workspaceDatabase.transaction().execute(async (trx) => {
// const updatedUserNode = await trx
// .updateTable('user_nodes')
// .set({
// last_seen_version_id: input.versionId,
// last_seen_at: new Date().toISOString(),
// mentions_count: 0,
// version_id: generateId(IdType.Version),
// })
// .where('node_id', '=', input.nodeId)
// .where('user_id', '=', input.userId)
// .returningAll()
// .executeTakeFirst();
// if (updatedUserNode) {
// userNode = {
// userId: updatedUserNode.user_id,
// nodeId: updatedUserNode.node_id,
// lastSeenAt: updatedUserNode.last_seen_at,
// lastSeenVersionId: updatedUserNode.last_seen_version_id,
// mentionsCount: updatedUserNode.mentions_count,
// attributes: updatedUserNode.attributes,
// versionId: updatedUserNode.version_id,
// createdAt: updatedUserNode.created_at,
// updatedAt: updatedUserNode.updated_at,
// };
// }
// const createdChange = await trx
// .insertInto('changes')
// .values({
// data: JSON.stringify(changeData),
// created_at: new Date().toISOString(),
// retry_count: 0,
// })
// .returning('id')
// .executeTakeFirst();
// if (createdChange) {
// changeId = createdChange.id;
// }
// });
// if (userNode) {
// eventBus.publish({
// type: 'user_node_created',
// userId: input.userId,
// userNode,
// });
// }
// if (changeId) {
// eventBus.publish({
// type: 'change_created',
// userId: input.userId,
// changeId,
// });
// }
await interactionService.setInteraction(
input.userId,
input.nodeId,
'message',
'lastSeenAt',
new Date().toISOString()
);
return {
success: true,

View File

@@ -10,6 +10,7 @@ import {
import { databaseService } from '@/main/data/database-service';
import { SelectInteractionEvent } from '@/main/data/workspace/schema';
import { eventBus } from '@/shared/lib/event-bus';
import { mapInteraction } from '../utils';
const UPDATE_RETRIES_COUNT = 10;
@@ -70,7 +71,7 @@ class InteractionService {
}
if (interaction) {
const result = await workspaceDatabase
const { updatedInteraction } = await workspaceDatabase
.transaction()
.execute(async (tx) => {
const updatedInteraction = await tx
@@ -86,7 +87,7 @@ class InteractionService {
.executeTakeFirst();
if (!updatedInteraction) {
return false;
return { updatedInteraction: undefined };
}
await tx
@@ -108,70 +109,88 @@ class InteractionService {
)
.execute();
return true;
return { updatedInteraction };
});
if (updatedInteraction) {
eventBus.publish({
type: 'interaction_updated',
userId,
interaction: mapInteraction(updatedInteraction),
});
if (result) {
eventBus.publish({
type: 'interaction_event_created',
userId,
nodeId,
});
return true;
}
return result;
return false;
}
const result = await workspaceDatabase.transaction().execute(async (tx) => {
const createdInteraction = await tx
.insertInto('interactions')
.returningAll()
.values({
node_id: nodeId,
node_type: nodeType,
user_id: userId,
attributes: JSON.stringify(attributes),
created_at: new Date().toISOString(),
version: BigInt(0),
})
.onConflict((b) => b.columns(['node_id', 'user_id']).doNothing())
.executeTakeFirst();
const { createdInteraction } = await workspaceDatabase
.transaction()
.execute(async (tx) => {
const createdInteraction = await tx
.insertInto('interactions')
.returningAll()
.values({
node_id: nodeId,
node_type: nodeType,
user_id: userId,
attributes: JSON.stringify(attributes),
created_at: new Date().toISOString(),
version: BigInt(0),
})
.onConflict((b) => b.columns(['node_id', 'user_id']).doNothing())
.executeTakeFirst();
if (!createdInteraction) {
return false;
}
if (!createdInteraction) {
return { createdInteraction: undefined };
}
await tx
.insertInto('interaction_events')
.values({
node_id: nodeId,
node_type: nodeType,
attribute,
value,
created_at: new Date().toISOString(),
event_id: generateId(IdType.Event),
})
.onConflict((b) =>
b.columns(['node_id', 'attribute']).doUpdateSet({
await tx
.insertInto('interaction_events')
.values({
node_id: nodeId,
node_type: nodeType,
attribute,
value,
sent_at: null,
created_at: new Date().toISOString(),
event_id: generateId(IdType.Event),
})
)
.execute();
.onConflict((b) =>
b.columns(['node_id', 'attribute']).doUpdateSet({
value,
sent_at: null,
event_id: generateId(IdType.Event),
})
)
.execute();
return true;
});
return { createdInteraction };
});
if (createdInteraction) {
eventBus.publish({
type: 'interaction_updated',
userId,
interaction: mapInteraction(createdInteraction),
});
if (result) {
eventBus.publish({
type: 'interaction_event_created',
userId,
nodeId,
});
return true;
}
return result;
return false;
}
public async applyServerInteraction(
@@ -201,8 +220,9 @@ class InteractionService {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
await workspaceDatabase
const createdInteraction = await workspaceDatabase
.insertInto('interactions')
.returningAll()
.values({
user_id: interaction.userId,
node_id: interaction.nodeId,
@@ -222,7 +242,17 @@ class InteractionService {
version: BigInt(interaction.version),
})
)
.execute();
.executeTakeFirst();
if (createdInteraction) {
eventBus.publish({
type: 'interaction_updated',
userId,
interaction: mapInteraction(createdInteraction),
});
}
return createdInteraction;
}
private async tryApplyServerInteraction(
@@ -250,7 +280,7 @@ class InteractionService {
);
if (existingInteraction) {
const result = await workspaceDatabase
const { updatedInteraction } = await workspaceDatabase
.transaction()
.execute(async (tx) => {
const updatedInteraction = await tx
@@ -268,7 +298,7 @@ class InteractionService {
.executeTakeFirst();
if (!updatedInteraction) {
return false;
return { updatedInteraction: undefined };
}
if (toDeleteEventIds.length > 0) {
@@ -279,46 +309,68 @@ class InteractionService {
.execute();
}
return true;
return { updatedInteraction };
});
return result;
if (updatedInteraction) {
eventBus.publish({
type: 'interaction_updated',
userId,
interaction: mapInteraction(updatedInteraction),
});
return true;
}
return false;
}
const result = await workspaceDatabase.transaction().execute(async (tx) => {
const createdInteraction = await tx
.insertInto('interactions')
.returningAll()
.values({
user_id: interaction.userId,
node_id: interaction.nodeId,
node_type: interaction.nodeType,
attributes: JSON.stringify(attributes),
created_at: interaction.createdAt,
updated_at: interaction.updatedAt,
server_created_at: interaction.serverCreatedAt,
server_updated_at: interaction.serverUpdatedAt,
version: BigInt(interaction.version),
})
.onConflict((b) => b.columns(['node_id', 'user_id']).doNothing())
.executeTakeFirst();
const { createdInteraction } = await workspaceDatabase
.transaction()
.execute(async (tx) => {
const createdInteraction = await tx
.insertInto('interactions')
.returningAll()
.values({
user_id: interaction.userId,
node_id: interaction.nodeId,
node_type: interaction.nodeType,
attributes: JSON.stringify(attributes),
created_at: interaction.createdAt,
updated_at: interaction.updatedAt,
server_created_at: interaction.serverCreatedAt,
server_updated_at: interaction.serverUpdatedAt,
version: BigInt(interaction.version),
})
.onConflict((b) => b.columns(['node_id', 'user_id']).doNothing())
.executeTakeFirst();
if (!createdInteraction) {
return false;
}
if (!createdInteraction) {
return { createdInteraction: undefined };
}
if (toDeleteEventIds.length > 0) {
await tx
.deleteFrom('interaction_events')
.where('node_id', '=', interaction.nodeId)
.where('event_id', 'in', toDeleteEventIds)
.execute();
}
if (toDeleteEventIds.length > 0) {
await tx
.deleteFrom('interaction_events')
.where('node_id', '=', interaction.nodeId)
.where('event_id', 'in', toDeleteEventIds)
.execute();
}
return { createdInteraction };
});
if (createdInteraction) {
eventBus.publish({
type: 'interaction_updated',
userId,
interaction: mapInteraction(createdInteraction),
});
return true;
});
}
return result;
return false;
}
private mergeServerAttributes(

View File

@@ -2,6 +2,7 @@ import { databaseService } from '@/main/data/database-service';
import { WorkspaceRadarData } from '@/shared/types/radars';
import { eventBus } from '@/shared/lib/event-bus';
import { Event } from '@/shared/types/events';
import { getIdType, IdType } from '@colanode/core';
class RadarService {
private readonly workspaceStates: Map<string, WorkspaceRadarData> = new Map();
@@ -69,56 +70,46 @@ class RadarService {
nodeStates: {},
};
// const nodeUnreadMessageCounts = await workspaceDatabase
// .selectFrom('user_nodes as un')
// .innerJoin('nodes as n', 'un.node_id', 'n.id')
// .where('un.user_id', '=', userId)
// .where('n.type', '=', NodeTypes.Message)
// .where('un.last_seen_version_id', 'is', null)
// .select(['n.parent_id as node_id'])
// .select((eb) => [
// eb.fn.count<number>('un.node_id').as('messages_count'),
// eb.fn.sum<number>('un.mentions_count').as('mentions_count'),
// ])
// .groupBy('n.parent_id')
// .execute();
const nodeUnreadMessageCounts = await workspaceDatabase
.selectFrom('interactions as i')
.innerJoin('nodes as n', 'i.node_id', 'n.id')
.where('i.user_id', '=', userId)
.where('n.type', '=', 'message')
.where('i.last_seen_at', 'is', null)
.select(['n.parent_id as node_id'])
.select((eb) => [eb.fn.count<number>('i.node_id').as('messages_count')])
.groupBy('n.parent_id')
.execute();
// for (const nodeUnreadMessageCount of nodeUnreadMessageCounts) {
// const idType = getIdType(nodeUnreadMessageCount.node_id);
// const nodeId = nodeUnreadMessageCount.node_id;
// const messagesCount = nodeUnreadMessageCount.messages_count;
// const mentionsCount = nodeUnreadMessageCount.mentions_count;
for (const nodeUnreadMessageCount of nodeUnreadMessageCounts) {
const idType = getIdType(nodeUnreadMessageCount.node_id);
const nodeId = nodeUnreadMessageCount.node_id;
const messagesCount = nodeUnreadMessageCount.messages_count;
// if (idType === IdType.Chat) {
// data.nodeStates[nodeId] = {
// type: 'chat',
// nodeId,
// unseenMessagesCount: messagesCount,
// mentionsCount,
// };
if (idType === IdType.Chat) {
data.nodeStates[nodeId] = {
type: 'chat',
nodeId,
unseenMessagesCount: messagesCount,
mentionsCount: 0,
};
// if (mentionsCount > 0) {
// data.importantCount += mentionsCount;
// }
if (messagesCount > 0) {
data.importantCount += messagesCount;
}
} else if (idType === IdType.Channel) {
data.nodeStates[nodeId] = {
type: 'channel',
nodeId,
unseenMessagesCount: messagesCount,
mentionsCount: 0,
};
// if (messagesCount > 0) {
// data.importantCount += messagesCount;
// }
// } else if (idType === IdType.Channel) {
// data.nodeStates[nodeId] = {
// type: 'channel',
// nodeId,
// unseenMessagesCount: messagesCount,
// mentionsCount,
// };
// if (messagesCount > 0) {
// data.hasUnseenChanges = true;
// } else if (mentionsCount > 0) {
// data.importantCount += messagesCount;
// }
// }
// }
if (messagesCount > 0) {
data.hasUnseenChanges = true;
}
}
}
this.workspaceStates.set(userId, data);
}
@@ -129,6 +120,14 @@ class RadarService {
eventBus.publish({
type: 'radar_data_updated',
});
} else if (
event.type === 'interaction_updated' &&
event.userId === event.interaction.userId
) {
await this.initWorkspace(event.userId);
eventBus.publish({
type: 'radar_data_updated',
});
}
}
}

View File

@@ -145,6 +145,12 @@ export type InteractionEventCreatedEvent = {
nodeId: string;
};
export type InteractionUpdatedEvent = {
type: 'interaction_updated';
userId: string;
interaction: Interaction;
};
export type Event =
| NodeCreatedEvent
| NodeUpdatedEvent
@@ -170,4 +176,5 @@ export type Event =
| ServerAvailabilityChangedEvent
| SocketConnectionOpenedEvent
| CollaborationCreatedEvent
| InteractionEventCreatedEvent;
| InteractionEventCreatedEvent
| InteractionUpdatedEvent;