Enable radar service with new data structure

This commit is contained in:
Hakan Shehu
2024-12-26 17:35:49 +01:00
parent 94ce64545a
commit dd6bd87ab5
3 changed files with 179 additions and 117 deletions

View File

@@ -1,11 +1,21 @@
import { IdType } from '@colanode/core';
import { getIdType, IdType } from '@colanode/core';
import { Kysely } from 'kysely';
import { WorkspaceDatabaseSchema } from '@/main/data/workspace/schema';
import {
SelectCollaboration,
WorkspaceDatabaseSchema,
} from '@/main/data/workspace/schema';
import { mapWorkspace } from '@/main/utils';
import { databaseService } from '@/main/data/database-service';
import { eventBus } from '@/shared/lib/event-bus';
import { Event, InteractionUpdatedEvent } from '@/shared/types/events';
import {
CollaborationCreatedEvent,
CollaborationDeletedEvent,
Event,
MessageCreatedEvent,
MessageDeletedEvent,
MessageInteractionUpdatedEvent,
} from '@/shared/types/events';
import {
WorkspaceRadarData,
ChannelReadState,
@@ -23,6 +33,7 @@ class RadarWorkspace {
public readonly workspace: Workspace;
private readonly workspaceDatabase: Kysely<WorkspaceDatabaseSchema>;
private readonly unreadMessages: Map<string, UndreadMessage> = new Map();
private readonly collaborations: Map<string, SelectCollaboration> = new Map();
constructor(
workspace: Workspace,
@@ -82,93 +93,151 @@ class RadarWorkspace {
}
public async init(): Promise<void> {
// const unreadMessagesRows = await this.workspaceDatabase
// .selectFrom('entries as entry')
// .innerJoin('interactions as entry_interactions', (join) =>
// join
// .onRef('entry.id', '=', 'entry_interactions.entry_id')
// .on('entry_interactions.user_id', '=', this.workspace.userId)
// )
// .innerJoin('interactions as parent_interactions', (join) =>
// join
// .onRef('node.parent_id', '=', 'parent_interactions.node_id')
// .on('parent_interactions.user_id', '=', this.workspace.userId)
// )
// .select(['node.id as node_id', 'node.parent_id as parent_id'])
// .where('node.created_by', '!=', this.workspace.userId)
// .where('node_interactions.last_seen_at', 'is', null)
// .where('parent_interactions.last_seen_at', 'is not', null)
// .whereRef(
// 'node.created_at',
// '>=',
// sql`json_extract(parent_interactions.attributes, '$.firstSeenAt')`
// )
// .execute();
// for (const unreadMessageRow of unreadMessagesRows) {
// this.unreadMessages.set(unreadMessageRow.node_id, {
// messageId: unreadMessageRow.node_id,
// parentId: unreadMessageRow.parent_id,
// parentIdType: getIdType(unreadMessageRow.parent_id),
// });
// }
const collaborations = await this.workspaceDatabase
.selectFrom('collaborations')
.selectAll()
.execute();
for (const collaboration of collaborations) {
this.collaborations.set(collaboration.entry_id, collaboration);
}
if (this.collaborations.size === 0) {
return;
}
const unreadMessagesRows = await this.workspaceDatabase
.selectFrom('messages as message')
.leftJoin('message_interactions as message_interactions', (join) =>
join
.onRef('message.id', '=', 'message_interactions.message_id')
.on(
'message_interactions.collaborator_id',
'=',
this.workspace.userId
)
)
.innerJoin('entry_interactions as entry_interactions', (join) =>
join
.onRef('message.entry_id', '=', 'entry_interactions.entry_id')
.on('entry_interactions.collaborator_id', '=', this.workspace.userId)
)
.select(['message.id as message_id', 'message.entry_id as entry_id'])
.where('message.created_by', '!=', this.workspace.userId)
.where('message_interactions.seen_at', 'is', null)
.where('entry_interactions.last_seen_at', 'is not', null)
.whereRef('message.created_at', '>=', 'entry_interactions.first_seen_at')
.execute();
for (const unreadMessageRow of unreadMessagesRows) {
this.unreadMessages.set(unreadMessageRow.message_id, {
messageId: unreadMessageRow.message_id,
parentId: unreadMessageRow.entry_id,
parentIdType: getIdType(unreadMessageRow.entry_id),
});
}
}
public async handleInteractionUpdated(
_: InteractionUpdatedEvent
public async handleMessageInteractionUpdated(
event: MessageInteractionUpdatedEvent
): Promise<void> {
// const interaction = event.interaction;
// if (
// event.userId !== this.workspace.userId ||
// interaction.userId !== this.workspace.userId
// ) {
// return;
// }
// if (interaction.attributes.lastSeenAt) {
// const unreadMessage = this.unreadMessages.get(interaction.nodeId);
// if (unreadMessage) {
// this.unreadMessages.delete(interaction.nodeId);
// eventBus.publish({
// type: 'radar_data_updated',
// });
// }
// return;
// }
// if (this.unreadMessages.has(interaction.nodeId)) {
// return;
// }
// const node = await this.workspaceDatabase
// .selectFrom('nodes')
// .selectAll()
// .where('id', '=', interaction.nodeId)
// .executeTakeFirst();
// if (!node) {
// return;
// }
// const parentInteraction = await this.workspaceDatabase
// .selectFrom('interactions')
// .selectAll()
// .where('node_id', '=', node.parent_id)
// .executeTakeFirst();
// if (!parentInteraction || !parentInteraction.last_seen_at) {
// return;
// }
// const parentInteractionAttributes: InteractionAttributes = JSON.parse(
// parentInteraction.attributes
// );
// if (
// !parentInteractionAttributes.firstSeenAt ||
// compareDate(parentInteractionAttributes.firstSeenAt, node.created_at) > 0
// ) {
// return;
// }
// this.unreadMessages.set(interaction.nodeId, {
// messageId: interaction.nodeId,
// parentId: node.parent_id,
// parentIdType: getIdType(node.parent_id),
// });
// eventBus.publish({
// type: 'radar_data_updated',
// });
const interaction = event.messageInteraction;
if (
event.userId !== this.workspace.userId ||
interaction.collaboratorId !== this.workspace.userId
) {
return;
}
if (interaction.seenAt) {
const unreadMessage = this.unreadMessages.get(interaction.messageId);
if (unreadMessage) {
this.unreadMessages.delete(interaction.messageId);
eventBus.publish({
type: 'radar_data_updated',
});
}
return;
}
}
public async handleMessageCreated(event: MessageCreatedEvent): Promise<void> {
const message = event.message;
if (message.createdBy === this.workspace.userId) {
return;
}
if (this.unreadMessages.has(message.id)) {
return;
}
const collaboration = this.collaborations.get(message.rootId);
if (!collaboration) {
return;
}
if (collaboration.created_at > message.createdAt) {
return;
}
const messageInteraction = await this.workspaceDatabase
.selectFrom('message_interactions')
.selectAll()
.where('message_id', '=', message.id)
.where('collaborator_id', '=', this.workspace.userId)
.executeTakeFirst();
if (messageInteraction && messageInteraction.seen_at) {
return;
}
this.unreadMessages.set(message.id, {
messageId: message.id,
parentId: message.rootId,
parentIdType: getIdType(message.rootId),
});
eventBus.publish({
type: 'radar_data_updated',
});
}
public async handleMessageDeleted(event: MessageDeletedEvent): Promise<void> {
const message = event.message;
if (message.createdBy === this.workspace.userId) {
return;
}
if (!this.unreadMessages.has(message.id)) {
return;
}
this.unreadMessages.delete(message.id);
eventBus.publish({
type: 'radar_data_updated',
});
}
public async handleCollaborationCreated(
event: CollaborationCreatedEvent
): Promise<void> {
const collaboration = await this.workspaceDatabase
.selectFrom('collaborations')
.selectAll()
.where('entry_id', '=', event.entryId)
.executeTakeFirst();
if (!collaboration) {
return;
}
this.collaborations.set(event.entryId, collaboration);
}
public async handleCollaborationDeleted(
event: CollaborationDeletedEvent
): Promise<void> {
this.collaborations.delete(event.entryId);
}
}
@@ -217,10 +286,10 @@ class RadarService {
eventBus.publish({
type: 'radar_data_updated',
});
} else if (event.type === 'interaction_updated') {
} else if (event.type === 'message_interaction_updated') {
const radarWorkspace = this.workspaces.get(event.userId);
if (radarWorkspace) {
radarWorkspace.handleInteractionUpdated(event);
radarWorkspace.handleMessageInteractionUpdated(event);
}
} else if (event.type === 'workspace_created') {
const workspaceDatabase = await databaseService.getWorkspaceDatabase(
@@ -233,6 +302,26 @@ class RadarService {
);
this.workspaces.set(event.workspace.userId, radarWorkspace);
await radarWorkspace.init();
} else if (event.type === 'message_created') {
const radarWorkspace = this.workspaces.get(event.userId);
if (radarWorkspace) {
radarWorkspace.handleMessageCreated(event);
}
} else if (event.type === 'message_deleted') {
const radarWorkspace = this.workspaces.get(event.userId);
if (radarWorkspace) {
radarWorkspace.handleMessageDeleted(event);
}
} else if (event.type === 'collaboration_created') {
const radarWorkspace = this.workspaces.get(event.userId);
if (radarWorkspace) {
radarWorkspace.handleCollaborationCreated(event);
}
} else if (event.type === 'collaboration_deleted') {
const radarWorkspace = this.workspaces.get(event.userId);
if (radarWorkspace) {
radarWorkspace.handleCollaborationDeleted(event);
}
}
}
}

View File

@@ -7,7 +7,6 @@ import {
MessageReaction,
} from '@/shared/types/messages';
import { Account } from '@/shared/types/accounts';
import { Interaction } from '@/shared/types/interactions';
import { Server } from '@/shared/types/servers';
import { Workspace } from '@/shared/types/workspaces';
import { User } from '@/shared/types/users';
@@ -211,18 +210,6 @@ export type SocketConnectionOpenedEvent = {
accountId: string;
};
export type InteractionEventCreatedEvent = {
type: 'interaction_event_created';
userId: string;
entryId: string;
};
export type InteractionUpdatedEvent = {
type: 'interaction_updated';
userId: string;
interaction: Interaction;
};
export type Event =
| UserCreatedEvent
| UserUpdatedEvent
@@ -258,6 +245,4 @@ export type Event =
| ServerAvailabilityChangedEvent
| SocketConnectionOpenedEvent
| CollaborationCreatedEvent
| CollaborationDeletedEvent
| InteractionEventCreatedEvent
| InteractionUpdatedEvent;
| CollaborationDeletedEvent;

View File

@@ -1,12 +0,0 @@
import { InteractionAttributes } from '@colanode/core';
export type Interaction = {
userId: string;
entryId: string;
attributes: InteractionAttributes;
createdAt: Date;
updatedAt: Date | null;
serverCreatedAt: Date | null;
serverUpdatedAt: Date | null;
version: bigint | null;
};