Implement server handlers for node reactions

This commit is contained in:
Hakan Shehu
2024-09-15 17:58:09 +02:00
parent 702f5dce45
commit cb286b01f5
9 changed files with 267 additions and 2 deletions

View File

@@ -0,0 +1,132 @@
import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka';
import { ChangeMessage, NodeReactionChangeData } from '@/types/changes';
import { PostgresOperation } from '@/lib/constants';
import { database } from '@/data/database';
import { NeuronId } from '@/lib/id';
import { ServerNodeReaction } from '@/types/nodes';
export const initNodeReactionChangesConsumer = async () => {
const consumer = kafka.consumer({
groupId: CONSUMER_IDS.NODE_REACTION_CHANGES,
});
await consumer.connect();
await consumer.subscribe({ topic: TOPIC_NAMES.NODE_REACTION_CHANGES });
await consumer.run({
eachMessage: async ({ message }) => {
if (!message || !message.value) {
return;
}
const change = JSON.parse(
message.value.toString(),
) as ChangeMessage<NodeReactionChangeData>;
await handleNodeReactionChange(change);
},
});
};
const handleNodeReactionChange = async (
change: ChangeMessage<NodeReactionChangeData>,
) => {
switch (change.op) {
case PostgresOperation.CREATE: {
await handleNodeReactionCreate(change);
break;
}
case PostgresOperation.DELETE: {
await handleNodeReactionDelete(change);
break;
}
}
};
const handleNodeReactionCreate = async (
change: ChangeMessage<NodeReactionChangeData>,
) => {
const reaction = change.after;
if (!reaction) {
return;
}
const deviceIds = await getDeviceIds(reaction.workspace_id);
if (deviceIds.length == 0) {
return;
}
const serverNodeReaction: ServerNodeReaction = mapNodeReaction(reaction);
await database
.insertInto('mutations')
.values({
id: NeuronId.generate(NeuronId.Type.Mutation),
table: 'node_reactions',
action: 'insert',
workspace_id: reaction.workspace_id,
created_at: new Date(),
after: JSON.stringify(serverNodeReaction),
device_ids: deviceIds,
})
.execute();
};
const handleNodeReactionDelete = async (
change: ChangeMessage<NodeReactionChangeData>,
) => {
const reaction = change.before;
if (!reaction) {
return;
}
const deviceIds = await getDeviceIds(reaction.workspace_id);
if (deviceIds.length == 0) {
return;
}
const serverNodeReaction: ServerNodeReaction = mapNodeReaction(reaction);
await database
.insertInto('mutations')
.values({
id: NeuronId.generate(NeuronId.Type.Mutation),
table: 'node_reactions',
action: 'delete',
workspace_id: reaction.workspace_id,
created_at: new Date(),
before: JSON.stringify(serverNodeReaction),
after: null,
device_ids: deviceIds,
})
.execute();
};
const getDeviceIds = async (workspaceId: string) => {
const accountDevices = await database
.selectFrom('account_devices')
.where(
'account_id',
'in',
database
.selectFrom('workspace_accounts')
.where('workspace_id', '=', workspaceId)
.select('account_id'),
)
.select('id')
.execute();
const deviceIds = accountDevices.map((account) => account.id);
return deviceIds;
};
const mapNodeReaction = (
reaction: NodeReactionChangeData,
): ServerNodeReaction => {
return {
nodeId: reaction.node_id,
reactorId: reaction.reactor_id,
reaction: reaction.reaction,
workspaceId: reaction.workspace_id,
createdAt: reaction.created_at,
serverCreatedAt: reaction.server_created_at,
};
};

View File

@@ -26,6 +26,9 @@ export const TOPIC_NAMES = {
NODE_ATTRIBUTE_CHANGES:
process.env.KAFKA_NODE_ATTRIBUTE_CHANGES_TOPIC_NAME ??
'neuron_node_attribute_changes',
NODE_REACTION_CHANGES:
process.env.KAFKA_NODE_REACTION_CHANGES_TOPIC_NAME ??
'neuron_node_reaction_changes',
MUTATION_CHANGES:
process.env.KAFKA_MUTATION_CHANGES_TOPIC_NAME ?? 'neuron_mutation_changes',
};
@@ -37,6 +40,9 @@ export const CONSUMER_IDS = {
NODE_ATTRIBUTE_CHANGES:
process.env.KAFKA_NODE_ATTRIBUTE_CHANGES_CONSUMER_ID ??
'neuron_node_attribute_changes_consumer',
NODE_REACTION_CHANGES:
process.env.KAFKA_NODE_REACTION_CHANGES_CONSUMER_ID ??
'neuron_node_reaction_changes_consumer',
MUTATION_CHANGES:
process.env.KAFKA_MUTATION_CHANGES_CONSUMER_ID ??
'neuron_mutation_changes_consumer',

View File

@@ -121,6 +121,32 @@ const createNodeAttributesTable: Migration = {
},
};
const createNodeReactionsTable: Migration = {
up: async (db) => {
await db.schema
.createTable('node_reactions')
.addColumn('node_id', 'varchar(30)', (col) =>
col.notNull().references('nodes.id').onDelete('cascade'),
)
.addColumn('reactor_id', 'varchar(30)', (col) =>
col.notNull().references('nodes.id').onDelete('cascade'),
)
.addColumn('reaction', 'varchar(30)', (col) => col.notNull())
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
.addColumn('server_created_at', 'timestamptz', (col) => col.notNull())
.addPrimaryKeyConstraint('node_reactions_pkey', [
'node_id',
'reactor_id',
'reaction',
])
.execute();
},
down: async (db) => {
await db.schema.dropTable('node_reactions').execute();
},
};
const createAccountDevicesTable: Migration = {
up: async (db) => {
await db.schema
@@ -169,4 +195,5 @@ export const databaseMigrations: Record<string, Migration> = {
'00005_create_node_attributes_table': createNodeAttributesTable,
'00006_create_account_devices_table': createAccountDevicesTable,
'00007_create_mutations_table': createMutationsTable,
'00008_create_node_reactions_table': createNodeReactionsTable,
};

View File

@@ -117,6 +117,19 @@ export type SelectNodeAttribute = Selectable<NodeAttributeTable>;
export type CreateNodeAttribute = Insertable<NodeAttributeTable>;
export type UpdateNodeAttribute = Updateable<NodeAttributeTable>;
interface NodeReactionTable {
node_id: ColumnType<string, string, never>;
reactor_id: ColumnType<string, string, never>;
reaction: ColumnType<string, string, never>;
workspace_id: ColumnType<string, string, never>;
created_at: ColumnType<Date, Date, never>;
server_created_at: ColumnType<Date, Date, never>;
}
export type SelectNodeReaction = Selectable<NodeReactionTable>;
export type CreateNodeReaction = Insertable<NodeReactionTable>;
export type UpdateNodeReaction = Updateable<NodeReactionTable>;
interface MutationTable {
id: ColumnType<string, string, never>;
workspace_id: ColumnType<string, string, never>;
@@ -139,5 +152,6 @@ export interface DatabaseSchema {
account_devices: AccountDeviceTable;
nodes: NodeTable;
node_attributes: NodeAttributeTable;
node_reactions: NodeReactionTable;
mutations: MutationTable;
}

View File

@@ -5,6 +5,7 @@ import {
LocalMutation,
LocalNodeAttributeMutationData,
LocalNodeMutationData,
LocalNodeReactionMutationData,
} from '@/types/mutations';
import { database } from '@/data/database';
@@ -55,6 +56,18 @@ mutationsRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
}
}
}
case 'node_reactions': {
switch (mutation.action) {
case 'insert': {
await handleCreateNodeReactionMutation(input.workspaceId, mutation);
break;
}
case 'delete': {
await handleDeleteNodeReactionMutation(mutation);
break;
}
}
}
}
}
@@ -296,3 +309,51 @@ const handleDeleteNodeAttributeMutation = async (
)
.execute();
};
const handleCreateNodeReactionMutation = async (
workspaceId: string,
mutation: LocalMutation,
): Promise<void> => {
if (!mutation.after) {
return;
}
const nodeReactionData = JSON.parse(
mutation.after,
) as LocalNodeReactionMutationData;
await database
.insertInto('node_reactions')
.values({
node_id: nodeReactionData.node_id,
reactor_id: nodeReactionData.reactor_id,
reaction: nodeReactionData.reaction,
created_at: new Date(nodeReactionData.created_at),
workspace_id: workspaceId,
server_created_at: new Date(),
})
.onConflict((ob) => ob.doNothing())
.execute();
};
const handleDeleteNodeReactionMutation = async (
mutation: LocalMutation,
): Promise<void> => {
if (!mutation.before) {
return;
}
const nodeReactionData = JSON.parse(
mutation.before,
) as LocalNodeReactionMutationData;
await database
.deleteFrom('node_reactions')
.where((eb) =>
eb.and([
eb('node_id', '=', nodeReactionData.node_id),
eb('reactor_id', '=', nodeReactionData.reactor_id),
eb('reaction', '=', nodeReactionData.reaction),
]),
)
.execute();
};

View File

@@ -69,3 +69,12 @@ export type NodeAttributeChangeData = {
server_created_at: string;
server_updated_at: string | null;
};
export type NodeReactionChangeData = {
node_id: string;
reactor_id: string;
reaction: string;
workspace_id: string;
created_at: string;
server_created_at: string;
};

View File

@@ -46,6 +46,13 @@ export type LocalNodeAttributeMutationData = {
server_version_id: string;
};
export type LocalNodeReactionMutationData = {
node_id: string;
reactor_id: string;
reaction: string;
created_at: string;
};
export type ServerMutation = {
id: string;
table: string;

View File

@@ -31,6 +31,15 @@ export type ServerNodeAttribute = {
serverUpdatedAt?: string | null;
};
export type ServerNodeReaction = {
nodeId: string;
reactorId: string;
reaction: string;
workspaceId: string;
createdAt: string;
serverCreatedAt: string;
};
export type NodeBlock = {
type: string;
text?: string | null;