Rename mutations to changes and sync improvements

This commit is contained in:
Hakan Shehu
2024-10-04 12:23:36 +02:00
parent 2210306d32
commit 596bcba9f4
47 changed files with 969 additions and 842 deletions

View File

@@ -5,7 +5,6 @@ import { WebSocketServer } from 'ws';
import { accountsRouter } from '@/routes/accounts';
import { workspacesRouter } from '@/routes/workspaces';
import { mutationsRouter } from '@/routes/mutations';
import { authMiddleware } from '@/middlewares/auth';
import { syncRouter } from '@/routes/sync';
import { configRouter } from '@/routes/config';
@@ -25,8 +24,7 @@ export const initApi = () => {
app.use('/v1/accounts', accountsRouter);
app.use('/v1/config', configRouter);
app.use('/v1/workspaces', authMiddleware, workspacesRouter);
app.use('/v1/mutations', authMiddleware, mutationsRouter);
app.use('/v1/', authMiddleware, syncRouter);
app.use('/v1/sync', authMiddleware, syncRouter);
const server = http.createServer(app);

View File

@@ -0,0 +1,71 @@
import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka';
import { CdcMessage, ChangeCdcData } from '@/types/cdc';
import { redis, CHANNEL_NAMES } from '@/data/redis';
import { PostgresOperation } from '@/lib/constants';
import { database } from '@/data/database';
export const initChangeCdcConsumer = async () => {
const consumer = kafka.consumer({ groupId: CONSUMER_IDS.CHANGE_CDC });
await consumer.connect();
await consumer.subscribe({ topic: TOPIC_NAMES.CHANGE_CDC });
await consumer.run({
eachMessage: async ({ message }) => {
if (!message || !message.value) {
return;
}
const change = JSON.parse(
message.value.toString(),
) as CdcMessage<ChangeCdcData>;
await handleChangeCdc(change);
},
});
};
const handleChangeCdc = async (change: CdcMessage<ChangeCdcData>) => {
switch (change.op) {
case PostgresOperation.CREATE: {
await handleChangeCreate(change);
break;
}
case PostgresOperation.UPDATE: {
await handleChangeUpdate(change);
break;
}
case PostgresOperation.DELETE: {
await handleChangeDelete(change);
break;
}
}
};
const handleChangeCreate = async (change: CdcMessage<ChangeCdcData>) => {
const changeData = change.after;
if (!changeData) {
return;
}
await redis.publish(CHANNEL_NAMES.CHANGES, JSON.stringify(changeData));
};
const handleChangeUpdate = async (change: CdcMessage<ChangeCdcData>) => {
const changeData = change.after;
if (!changeData) {
return;
}
// if all devices have acknowledged the mutation, delete it
if (changeData.device_ids == null || changeData.device_ids.length == 0) {
await database
.deleteFrom('changes')
.where('id', '=', changeData.id)
.execute();
}
};
const handleChangeDelete = async (change: CdcMessage<ChangeCdcData>) => {
console.log('Change delete:', change.before?.id);
};

View File

@@ -1,79 +0,0 @@
import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka';
import { ChangeMessage, MutationChangeData } from '@/types/changes';
import { redis, CHANNEL_NAMES } from '@/data/redis';
import { PostgresOperation } from '@/lib/constants';
import { database } from '@/data/database';
export const initMutationChangesConsumer = async () => {
const consumer = kafka.consumer({ groupId: CONSUMER_IDS.MUTATION_CHANGES });
await consumer.connect();
await consumer.subscribe({ topic: TOPIC_NAMES.MUTATION_CHANGES });
await consumer.run({
eachMessage: async ({ message }) => {
if (!message || !message.value) {
return;
}
const change = JSON.parse(
message.value.toString(),
) as ChangeMessage<MutationChangeData>;
await handleMutationChange(change);
},
});
};
const handleMutationChange = async (
change: ChangeMessage<MutationChangeData>,
) => {
switch (change.op) {
case PostgresOperation.CREATE: {
await handleMutationCreate(change);
break;
}
case PostgresOperation.UPDATE: {
await handleMutationUpdate(change);
break;
}
case PostgresOperation.DELETE: {
await handleMutationDelete(change);
break;
}
}
};
const handleMutationCreate = async (
change: ChangeMessage<MutationChangeData>,
) => {
const mutationData = change.after;
if (!mutationData) {
return;
}
await redis.publish(CHANNEL_NAMES.MUTATIONS, JSON.stringify(mutationData));
};
const handleMutationUpdate = async (
change: ChangeMessage<MutationChangeData>,
) => {
const mutationData = change.after;
if (!mutationData) {
return;
}
// if all devices have acknowledged the mutation, delete it
if (mutationData.device_ids == null || mutationData.device_ids.length == 0) {
await database
.deleteFrom('mutations')
.where('id', '=', mutationData.id)
.execute();
}
};
const handleMutationDelete = async (
change: ChangeMessage<MutationChangeData>,
) => {
console.log('Mutation delete:', change.before?.id);
};

View File

@@ -1,33 +1,33 @@
import { redis, CHANNEL_NAMES } from '@/data/redis';
import { synapse } from '@/synapse';
import { ServerMutation } from '@/types/mutations';
import { MutationChangeData } from '@/types/changes';
import { ServerChange } from '@/types/sync';
import { ChangeCdcData } from '@/types/cdc';
export const initMutationsSubscriber = async () => {
export const initChangesSubscriber = async () => {
const subscriber = redis.duplicate();
await subscriber.connect();
await subscriber.subscribe(CHANNEL_NAMES.MUTATIONS, handleMessage);
await subscriber.subscribe(CHANNEL_NAMES.CHANGES, handleMessage);
};
const handleMessage = async (message: string) => {
const mutationData = JSON.parse(message) as MutationChangeData;
if (!mutationData.device_ids || !mutationData.device_ids.length) {
const changeData = JSON.parse(message) as ChangeCdcData;
if (!changeData.device_ids || !changeData.device_ids.length) {
return;
}
const serverMutation: ServerMutation = {
id: mutationData.id,
action: mutationData.action as 'insert' | 'update' | 'delete',
table: mutationData.table,
workspaceId: mutationData.workspace_id,
before: mutationData.before ? JSON.parse(mutationData.before) : null,
after: mutationData.after ? JSON.parse(mutationData.after) : null,
const serverChange: ServerChange = {
id: changeData.id,
action: changeData.action as 'insert' | 'update' | 'delete',
table: changeData.table,
workspaceId: changeData.workspace_id,
before: changeData.before ? JSON.parse(changeData.before) : null,
after: changeData.after ? JSON.parse(changeData.after) : null,
};
for (const deviceId of mutationData.device_ids) {
for (const deviceId of changeData.device_ids) {
synapse.send(deviceId, {
type: 'mutation',
payload: serverMutation,
type: 'change',
payload: serverChange,
});
}
};

View File

@@ -1,15 +1,15 @@
import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka';
import { ChangeMessage, NodeChangeData } from '@/types/changes';
import { CdcMessage, NodeCdcData } from '@/types/cdc';
import { PostgresOperation } from '@/lib/constants';
import { database } from '@/data/database';
import { NeuronId } from '@/lib/id';
import { ServerNode } from '@/types/nodes';
export const initNodeChangesConsumer = async () => {
const consumer = kafka.consumer({ groupId: CONSUMER_IDS.NODE_CHANGES });
const consumer = kafka.consumer({ groupId: CONSUMER_IDS.NODE_CDC });
await consumer.connect();
await consumer.subscribe({ topic: TOPIC_NAMES.NODE_CHANGES });
await consumer.subscribe({ topic: TOPIC_NAMES.NODE_CDC });
await consumer.run({
eachMessage: async ({ message }) => {
@@ -19,14 +19,14 @@ export const initNodeChangesConsumer = async () => {
const change = JSON.parse(
message.value.toString(),
) as ChangeMessage<NodeChangeData>;
) as CdcMessage<NodeCdcData>;
await handleNodeChange(change);
await handleNodeCdc(change);
},
});
};
const handleNodeChange = async (change: ChangeMessage<NodeChangeData>) => {
const handleNodeCdc = async (change: CdcMessage<NodeCdcData>) => {
switch (change.op) {
case PostgresOperation.CREATE: {
await handleNodeCreate(change);
@@ -43,7 +43,7 @@ const handleNodeChange = async (change: ChangeMessage<NodeChangeData>) => {
}
};
const handleNodeCreate = async (change: ChangeMessage<NodeChangeData>) => {
const handleNodeCreate = async (change: CdcMessage<NodeCdcData>) => {
const node = change.after;
if (!node) {
return;
@@ -56,9 +56,9 @@ const handleNodeCreate = async (change: ChangeMessage<NodeChangeData>) => {
const serverNode: ServerNode = mapNode(node);
await database
.insertInto('mutations')
.insertInto('changes')
.values({
id: NeuronId.generate(NeuronId.Type.Mutation),
id: NeuronId.generate(NeuronId.Type.Change),
table: 'nodes',
action: 'insert',
workspace_id: node.workspace_id,
@@ -69,7 +69,7 @@ const handleNodeCreate = async (change: ChangeMessage<NodeChangeData>) => {
.execute();
};
const handleNodeUpdate = async (change: ChangeMessage<NodeChangeData>) => {
const handleNodeUpdate = async (change: CdcMessage<NodeCdcData>) => {
const node = change.after;
if (!node) {
return;
@@ -82,9 +82,9 @@ const handleNodeUpdate = async (change: ChangeMessage<NodeChangeData>) => {
const serverNode: ServerNode = mapNode(node);
await database
.insertInto('mutations')
.insertInto('changes')
.values({
id: NeuronId.generate(NeuronId.Type.Mutation),
id: NeuronId.generate(NeuronId.Type.Change),
table: 'nodes',
action: 'update',
workspace_id: node.workspace_id,
@@ -96,7 +96,7 @@ const handleNodeUpdate = async (change: ChangeMessage<NodeChangeData>) => {
.execute();
};
const handleNodeDelete = async (change: ChangeMessage<NodeChangeData>) => {
const handleNodeDelete = async (change: CdcMessage<NodeCdcData>) => {
const node = change.before;
if (!node) {
return;
@@ -109,9 +109,9 @@ const handleNodeDelete = async (change: ChangeMessage<NodeChangeData>) => {
const serverNode: ServerNode = mapNode(node);
await database
.insertInto('mutations')
.insertInto('changes')
.values({
id: NeuronId.generate(NeuronId.Type.Mutation),
id: NeuronId.generate(NeuronId.Type.Change),
table: 'nodes',
action: 'delete',
workspace_id: node.workspace_id,
@@ -130,7 +130,7 @@ const getDeviceIds = async (workspaceId: string) => {
'account_id',
'in',
database
.selectFrom('workspace_accounts')
.selectFrom('workspace_users')
.where('workspace_id', '=', workspaceId)
.select('account_id'),
)
@@ -141,7 +141,7 @@ const getDeviceIds = async (workspaceId: string) => {
return deviceIds;
};
const mapNode = (node: NodeChangeData): ServerNode => {
const mapNode = (node: NodeCdcData): ServerNode => {
return {
id: node.id,
workspaceId: node.workspace_id,

View File

@@ -1,5 +1,5 @@
import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka';
import { ChangeMessage, NodeCollaboratorChangeData } from '@/types/changes';
import { CdcMessage, NodeCollaboratorCdcData } from '@/types/cdc';
import { PostgresOperation } from '@/lib/constants';
import { database } from '@/data/database';
import { NeuronId } from '@/lib/id';
@@ -7,11 +7,11 @@ import { ServerNodeCollaborator } from '@/types/nodes';
export const initNodeCollaboratorChangesConsumer = async () => {
const consumer = kafka.consumer({
groupId: CONSUMER_IDS.NODE_COLLABORATOR_CHANGES,
groupId: CONSUMER_IDS.NODE_COLLABORATOR_CDC,
});
await consumer.connect();
await consumer.subscribe({ topic: TOPIC_NAMES.NODE_COLLABORATOR_CHANGES });
await consumer.subscribe({ topic: TOPIC_NAMES.NODE_COLLABORATOR_CDC });
await consumer.run({
eachMessage: async ({ message }) => {
@@ -21,15 +21,15 @@ export const initNodeCollaboratorChangesConsumer = async () => {
const change = JSON.parse(
message.value.toString(),
) as ChangeMessage<NodeCollaboratorChangeData>;
) as CdcMessage<NodeCollaboratorCdcData>;
await handleNodeCollaboratorChange(change);
await handleNodeCollaboratorCdc(change);
},
});
};
const handleNodeCollaboratorChange = async (
change: ChangeMessage<NodeCollaboratorChangeData>,
const handleNodeCollaboratorCdc = async (
change: CdcMessage<NodeCollaboratorCdcData>,
) => {
switch (change.op) {
case PostgresOperation.CREATE: {
@@ -48,27 +48,27 @@ const handleNodeCollaboratorChange = async (
};
const handleNodeCollaboratorCreate = async (
change: ChangeMessage<NodeCollaboratorChangeData>,
change: CdcMessage<NodeCollaboratorCdcData>,
) => {
const reaction = change.after;
if (!reaction) {
const nodeCollaborator = change.after;
if (!nodeCollaborator) {
return;
}
const deviceIds = await getDeviceIds(reaction.workspace_id);
const deviceIds = await getDeviceIds(nodeCollaborator.workspace_id);
if (deviceIds.length == 0) {
return;
}
const serverNodeCollaborator: ServerNodeCollaborator =
mapNodeCollaborator(reaction);
mapNodeCollaborator(nodeCollaborator);
await database
.insertInto('mutations')
.insertInto('changes')
.values({
id: NeuronId.generate(NeuronId.Type.Mutation),
id: NeuronId.generate(NeuronId.Type.Change),
table: 'node_collaborators',
action: 'insert',
workspace_id: reaction.workspace_id,
workspace_id: nodeCollaborator.workspace_id,
created_at: new Date(),
after: JSON.stringify(serverNodeCollaborator),
device_ids: deviceIds,
@@ -77,27 +77,27 @@ const handleNodeCollaboratorCreate = async (
};
const handleNodeCollaboratorUpdate = async (
change: ChangeMessage<NodeCollaboratorChangeData>,
change: CdcMessage<NodeCollaboratorCdcData>,
) => {
const reaction = change.after;
if (!reaction) {
const nodeCollaborator = change.after;
if (!nodeCollaborator) {
return;
}
const deviceIds = await getDeviceIds(reaction.workspace_id);
const deviceIds = await getDeviceIds(nodeCollaborator.workspace_id);
if (deviceIds.length == 0) {
return;
}
const serverNodeCollaborator: ServerNodeCollaborator =
mapNodeCollaborator(reaction);
mapNodeCollaborator(nodeCollaborator);
await database
.insertInto('mutations')
.insertInto('changes')
.values({
id: NeuronId.generate(NeuronId.Type.Mutation),
id: NeuronId.generate(NeuronId.Type.Change),
table: 'node_collaborators',
action: 'update',
workspace_id: reaction.workspace_id,
workspace_id: nodeCollaborator.workspace_id,
created_at: new Date(),
after: JSON.stringify(serverNodeCollaborator),
device_ids: deviceIds,
@@ -106,27 +106,27 @@ const handleNodeCollaboratorUpdate = async (
};
const handleNodeCollaboratorDelete = async (
change: ChangeMessage<NodeCollaboratorChangeData>,
change: CdcMessage<NodeCollaboratorCdcData>,
) => {
const reaction = change.before;
if (!reaction) {
const nodeCollaborator = change.before;
if (!nodeCollaborator) {
return;
}
const deviceIds = await getDeviceIds(reaction.workspace_id);
const deviceIds = await getDeviceIds(nodeCollaborator.workspace_id);
if (deviceIds.length == 0) {
return;
}
const serverNodeCollaborator: ServerNodeCollaborator =
mapNodeCollaborator(reaction);
mapNodeCollaborator(nodeCollaborator);
await database
.insertInto('mutations')
.insertInto('changes')
.values({
id: NeuronId.generate(NeuronId.Type.Mutation),
id: NeuronId.generate(NeuronId.Type.Change),
table: 'node_collaborators',
action: 'delete',
workspace_id: reaction.workspace_id,
workspace_id: nodeCollaborator.workspace_id,
created_at: new Date(),
before: JSON.stringify(serverNodeCollaborator),
after: null,
@@ -142,7 +142,7 @@ const getDeviceIds = async (workspaceId: string) => {
'account_id',
'in',
database
.selectFrom('workspace_accounts')
.selectFrom('workspace_users')
.where('workspace_id', '=', workspaceId)
.select('account_id'),
)
@@ -154,21 +154,23 @@ const getDeviceIds = async (workspaceId: string) => {
};
const mapNodeCollaborator = (
reaction: NodeCollaboratorChangeData,
nodeCollaborator: NodeCollaboratorCdcData,
): ServerNodeCollaborator => {
return {
nodeId: reaction.node_id,
collaboratorId: reaction.collaborator_id,
role: reaction.role,
workspaceId: reaction.workspace_id,
createdAt: new Date(reaction.created_at),
createdBy: reaction.created_by,
updatedAt: reaction.updated_at ? new Date(reaction.updated_at) : null,
updatedBy: reaction.updated_by,
versionId: reaction.version_id,
serverCreatedAt: new Date(reaction.server_created_at),
serverUpdatedAt: reaction.server_updated_at
? new Date(reaction.server_updated_at)
nodeId: nodeCollaborator.node_id,
collaboratorId: nodeCollaborator.collaborator_id,
role: nodeCollaborator.role,
workspaceId: nodeCollaborator.workspace_id,
createdAt: new Date(nodeCollaborator.created_at),
createdBy: nodeCollaborator.created_by,
updatedAt: nodeCollaborator.updated_at
? new Date(nodeCollaborator.updated_at)
: null,
updatedBy: nodeCollaborator.updated_by,
versionId: nodeCollaborator.version_id,
serverCreatedAt: new Date(nodeCollaborator.server_created_at),
serverUpdatedAt: nodeCollaborator.server_updated_at
? new Date(nodeCollaborator.server_updated_at)
: null,
};
};

View File

@@ -1,5 +1,5 @@
import { kafka, TOPIC_NAMES, CONSUMER_IDS } from '@/data/kafka';
import { ChangeMessage, NodeReactionChangeData } from '@/types/changes';
import { CdcMessage, NodeReactionCdcData } from '@/types/cdc';
import { PostgresOperation } from '@/lib/constants';
import { database } from '@/data/database';
import { NeuronId } from '@/lib/id';
@@ -7,11 +7,11 @@ import { ServerNodeReaction } from '@/types/nodes';
export const initNodeReactionChangesConsumer = async () => {
const consumer = kafka.consumer({
groupId: CONSUMER_IDS.NODE_REACTION_CHANGES,
groupId: CONSUMER_IDS.NODE_REACTION_CDC,
});
await consumer.connect();
await consumer.subscribe({ topic: TOPIC_NAMES.NODE_REACTION_CHANGES });
await consumer.subscribe({ topic: TOPIC_NAMES.NODE_REACTION_CDC });
await consumer.run({
eachMessage: async ({ message }) => {
@@ -21,15 +21,15 @@ export const initNodeReactionChangesConsumer = async () => {
const change = JSON.parse(
message.value.toString(),
) as ChangeMessage<NodeReactionChangeData>;
) as CdcMessage<NodeReactionCdcData>;
await handleNodeReactionChange(change);
await handleNodeReactionCdc(change);
},
});
};
const handleNodeReactionChange = async (
change: ChangeMessage<NodeReactionChangeData>,
const handleNodeReactionCdc = async (
change: CdcMessage<NodeReactionCdcData>,
) => {
switch (change.op) {
case PostgresOperation.CREATE: {
@@ -44,7 +44,7 @@ const handleNodeReactionChange = async (
};
const handleNodeReactionCreate = async (
change: ChangeMessage<NodeReactionChangeData>,
change: CdcMessage<NodeReactionCdcData>,
) => {
const reaction = change.after;
if (!reaction) {
@@ -58,9 +58,9 @@ const handleNodeReactionCreate = async (
const serverNodeReaction: ServerNodeReaction = mapNodeReaction(reaction);
await database
.insertInto('mutations')
.insertInto('changes')
.values({
id: NeuronId.generate(NeuronId.Type.Mutation),
id: NeuronId.generate(NeuronId.Type.Change),
table: 'node_reactions',
action: 'insert',
workspace_id: reaction.workspace_id,
@@ -72,7 +72,7 @@ const handleNodeReactionCreate = async (
};
const handleNodeReactionDelete = async (
change: ChangeMessage<NodeReactionChangeData>,
change: CdcMessage<NodeReactionCdcData>,
) => {
const reaction = change.before;
if (!reaction) {
@@ -86,9 +86,9 @@ const handleNodeReactionDelete = async (
const serverNodeReaction: ServerNodeReaction = mapNodeReaction(reaction);
await database
.insertInto('mutations')
.insertInto('changes')
.values({
id: NeuronId.generate(NeuronId.Type.Mutation),
id: NeuronId.generate(NeuronId.Type.Change),
table: 'node_reactions',
action: 'delete',
workspace_id: reaction.workspace_id,
@@ -107,7 +107,7 @@ const getDeviceIds = async (workspaceId: string) => {
'account_id',
'in',
database
.selectFrom('workspace_accounts')
.selectFrom('workspace_users')
.where('workspace_id', '=', workspaceId)
.select('account_id'),
)
@@ -118,9 +118,7 @@ const getDeviceIds = async (workspaceId: string) => {
return deviceIds;
};
const mapNodeReaction = (
reaction: NodeReactionChangeData,
): ServerNodeReaction => {
const mapNodeReaction = (reaction: NodeReactionCdcData): ServerNodeReaction => {
return {
nodeId: reaction.node_id,
reactorId: reaction.actor_id,

View File

@@ -21,31 +21,27 @@ export const kafka = new Kafka({
export const producer = kafka.producer();
export const TOPIC_NAMES = {
NODE_CHANGES:
process.env.KAFKA_NODE_CHANGES_TOPIC_NAME ?? 'neuron_node_changes',
NODE_COLLABORATOR_CHANGES:
process.env.KAFKA_NODE_COLLABORATOR_CHANGES_TOPIC_NAME ??
'neuron_node_collaborator_changes',
NODE_REACTION_CHANGES:
process.env.KAFKA_NODE_REACTION_CHANGES_TOPIC_NAME ??
'neuron_node_reaction_changes',
MUTATION_CHANGES:
process.env.KAFKA_MUTATION_CHANGES_TOPIC_NAME ?? 'neuron_mutation_changes',
NODE_CDC: process.env.KAFKA_NODE_CDC_TOPIC_NAME ?? 'neuron_node_cdc',
NODE_COLLABORATOR_CDC:
process.env.KAFKA_NODE_COLLABORATOR_CDC_TOPIC_NAME ??
'neuron_node_collaborator_cdc',
NODE_REACTION_CDC:
process.env.KAFKA_NODE_REACTION_CDC_TOPIC_NAME ??
'neuron_node_reaction_cdc',
CHANGE_CDC: process.env.KAFKA_CHANGE_CDC_TOPIC_NAME ?? 'neuron_change_cdc',
};
export const CONSUMER_IDS = {
NODE_CHANGES:
process.env.KAFKA_NODE_CHANGES_CONSUMER_ID ??
'neuron_node_changes_consumer',
NODE_COLLABORATOR_CHANGES:
process.env.KAFKA_NODE_COLLABORATOR_CHANGES_CONSUMER_ID ??
'neuron_node_collaborator_changes_consumer',
NODE_REACTION_CHANGES:
process.env.KAFKA_NODE_REACTION_CHANGES_CONSUMER_ID ??
'neuron_node_reaction_changes_consumer',
MUTATION_CHANGES:
process.env.KAFKA_MUTATION_CHANGES_CONSUMER_ID ??
'neuron_mutation_changes_consumer',
NODE_CDC:
process.env.KAFKA_NODE_CDC_CONSUMER_ID ?? 'neuron_node_cdc_consumer',
NODE_COLLABORATOR_CDC:
process.env.KAFKA_NODE_COLLABORATOR_CDC_CONSUMER_ID ??
'neuron_node_collaborator_cdc_consumer',
NODE_REACTION_CDC:
process.env.KAFKA_NODE_REACTION_CDC_CONSUMER_ID ??
'neuron_node_reaction_cdc_consumer',
CHANGE_CDC:
process.env.KAFKA_CHANGE_CDC_CONSUMER_ID ?? 'neuron_change_cdc_consumer',
};
const connectProducer = async () => {

View File

@@ -42,13 +42,13 @@ const createWorkspacesTable: Migration = {
},
};
const createWorkspaceAccountsTable: Migration = {
const createWorkspaceUsersTable: Migration = {
up: async (db) => {
await db.schema
.createTable('workspace_accounts')
.createTable('workspace_users')
.addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey())
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
.addColumn('account_id', 'varchar(30)', (col) => col.notNull())
.addColumn('user_id', 'varchar(30)', (col) => col.notNull())
.addColumn('role', 'varchar(30)', (col) => col.notNull())
.addColumn('attrs', 'jsonb')
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
@@ -57,14 +57,14 @@ const createWorkspaceAccountsTable: Migration = {
.addColumn('updated_by', 'varchar(30)')
.addColumn('status', 'integer', (col) => col.notNull())
.addColumn('version_id', 'varchar(30)', (col) => col.notNull())
.addPrimaryKeyConstraint('workspace_accounts_pkey', [
.addUniqueConstraint('unique_workspace_account_combination', [
'workspace_id',
'account_id',
])
.execute();
},
down: async (db) => {
await db.schema.dropTable('workspace_accounts').execute();
await db.schema.dropTable('workspace_users').execute();
},
};
@@ -182,10 +182,10 @@ const createAccountDevicesTable: Migration = {
},
};
const createMutationsTable: Migration = {
const createChangesTable: Migration = {
up: async (db) => {
await db.schema
.createTable('mutations')
.createTable('changes')
.addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey())
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
.addColumn('table', 'varchar(30)', (col) => col.notNull())
@@ -197,17 +197,17 @@ const createMutationsTable: Migration = {
.execute();
},
down: async (db) => {
await db.schema.dropTable('mutations').execute();
await db.schema.dropTable('changes').execute();
},
};
export const databaseMigrations: Record<string, Migration> = {
'00001_create_accounts_table': createAccountsTable,
'00002_create_workspaces_table': createWorkspacesTable,
'00003_create_workspace_accounts_table': createWorkspaceAccountsTable,
'00003_create_workspace_users_table': createWorkspaceUsersTable,
'00004_create_nodes_table': createNodesTable,
'00005_create_node_collaborators_table': createNodeCollaboratorsTable,
'00006_create_node_reactions_table': createNodeReactionsTable,
'00007_create_account_devices_table': createAccountDevicesTable,
'00008_create_mutations_table': createMutationsTable,
'00008_create_changes_table': createChangesTable,
};

View File

@@ -14,5 +14,5 @@ export const initRedis = async () => {
};
export const CHANNEL_NAMES = {
MUTATIONS: process.env.REDIS_MUTATIONS_CHANNEL_NAME || 'neuron_mutations',
CHANGES: process.env.REDIS_CHANGES_CHANNEL_NAME || 'neuron_changes',
};

View File

@@ -41,10 +41,10 @@ export type SelectWorkspace = Selectable<WorkspaceTable>;
export type CreateWorkspace = Insertable<WorkspaceTable>;
export type UpdateWorkspace = Updateable<WorkspaceTable>;
interface WorkspaceAccountTable {
interface WorkspaceUserTable {
id: ColumnType<string, string, never>;
workspace_id: ColumnType<string, string, never>;
account_id: ColumnType<string, string, never>;
user_id: ColumnType<string, string, never>;
role: ColumnType<string, string, string>;
attrs: ColumnType<string | null, string | null, string | null>;
created_at: ColumnType<Date, Date, never>;
@@ -55,9 +55,9 @@ interface WorkspaceAccountTable {
version_id: ColumnType<string, string, string>;
}
export type SelectWorkspaceAccount = Selectable<WorkspaceAccountTable>;
export type CreateWorkspaceAccount = Insertable<WorkspaceAccountTable>;
export type UpdateWorkspaceAccount = Updateable<WorkspaceAccountTable>;
export type SelectWorkspaceUser = Selectable<WorkspaceUserTable>;
export type CreateWorkspaceUser = Insertable<WorkspaceUserTable>;
export type UpdateWorkspaceUser = Updateable<WorkspaceUserTable>;
interface AccountDeviceTable {
id: ColumnType<string, string, never>;
@@ -132,7 +132,7 @@ export type SelectNodeReaction = Selectable<NodeReactionTable>;
export type CreateNodeReaction = Insertable<NodeReactionTable>;
export type UpdateNodeReaction = Updateable<NodeReactionTable>;
interface MutationTable {
interface ChangeTable {
id: ColumnType<string, string, never>;
workspace_id: ColumnType<string, string, never>;
table: ColumnType<string, string, never>;
@@ -143,17 +143,17 @@ interface MutationTable {
device_ids: ColumnType<string[], string[], string[]>;
}
export type SelectMutation = Selectable<MutationTable>;
export type CreateMutation = Insertable<MutationTable>;
export type UpdateMutation = Updateable<MutationTable>;
export type SelectChange = Selectable<ChangeTable>;
export type CreateChange = Insertable<ChangeTable>;
export type UpdateChange = Updateable<ChangeTable>;
export interface DatabaseSchema {
accounts: AccountTable;
workspaces: WorkspaceTable;
workspace_accounts: WorkspaceAccountTable;
workspace_users: WorkspaceUserTable;
account_devices: AccountDeviceTable;
nodes: NodeTable;
node_collaborators: NodeCollaboratorTable;
node_reactions: NodeReactionTable;
mutations: MutationTable;
changes: ChangeTable;
}

View File

@@ -1,35 +1,35 @@
import { initApi } from '@/api';
import { initRedis } from '@/data/redis';
import { initNodeChangesConsumer } from '@/consumers/node-changes';
import { initMutationChangesConsumer } from '@/consumers/mutation-changes';
import { initMutationsSubscriber } from '@/consumers/mutations';
import { initNodeCollaboratorChangesConsumer } from '@/consumers/node-collaborator-changes';
import { initNodeReactionChangesConsumer } from '@/consumers/node-reaction-changes';
import { initNodeChangesConsumer } from '@/consumers/node-cdc';
import { initChangeCdcConsumer } from '@/consumers/change-cdc';
import { initChangesSubscriber } from '@/consumers/mutations';
import { initNodeCollaboratorChangesConsumer } from '@/consumers/node-collaborator-cdc';
import { initNodeReactionChangesConsumer } from '@/consumers/node-reaction-cdc';
import { migrate } from '@/data/database';
migrate().then(() => {
initApi();
initNodeChangesConsumer().then(() => {
console.log('Node changes consumer started');
console.log('Node cdc consumer started');
});
initNodeCollaboratorChangesConsumer().then(() => {
console.log('Node collaborator change consumer started');
console.log('Node collaborator cdc consumer started');
});
initNodeReactionChangesConsumer().then(() => {
console.log('Node reaction change consumer started');
console.log('Node reaction cdc consumer started');
});
initMutationChangesConsumer().then(() => {
console.log('Mutation changes consumer started');
initChangeCdcConsumer().then(() => {
console.log('Change cdc consumer started');
});
initRedis().then(() => {
console.log('Redis initialized');
initMutationsSubscriber().then(() => {
console.log('Mutation subscriber started');
initChangesSubscriber().then(() => {
console.log('Change subscriber started');
});
});
});

View File

@@ -7,7 +7,7 @@ enum IdType {
Workspace = 'wc',
User = 'us',
Version = 've',
Mutation = 'mu',
Change = 'cd',
Space = 'sp',
Page = 'pg',
Channel = 'ch',
@@ -40,8 +40,8 @@ export class NeuronId {
return IdType.User;
case 'version':
return IdType.Version;
case 'mutation':
return IdType.Mutation;
case 'change':
return IdType.Change;
case 'space':
return IdType.Space;
case 'page':

View File

@@ -258,14 +258,14 @@ const buildLoginOutput = async (
email,
});
const accountWorkspaces = await database
.selectFrom('workspace_accounts')
const workspaceUsers = await database
.selectFrom('workspace_users')
.where('account_id', '=', id)
.selectAll()
.execute();
const workspaceOutputs: WorkspaceOutput[] = [];
const workspaceIds = accountWorkspaces.map((wa) => wa.workspace_id);
const workspaceIds = workspaceUsers.map((wa) => wa.workspace_id);
if (workspaceIds.length > 0) {
const workspaces = await database
.selectFrom('workspaces')
@@ -273,9 +273,9 @@ const buildLoginOutput = async (
.selectAll()
.execute();
for (const accountWorkspace of accountWorkspaces) {
for (const workspaceUser of workspaceUsers) {
const workspace = workspaces.find(
(w) => w.id === accountWorkspace.workspace_id,
(w) => w.id === workspaceUser.workspace_id,
);
if (!workspace) {
continue;
@@ -284,10 +284,10 @@ const buildLoginOutput = async (
workspaceOutputs.push({
id: workspace.id,
name: workspace.name,
role: accountWorkspace.role,
userId: accountWorkspace.user_id,
versionId: accountWorkspace.version_id,
accountId: accountWorkspace.account_id,
role: workspaceUser.role,
userId: workspaceUser.id,
versionId: workspaceUser.version_id,
accountId: workspaceUser.account_id,
avatar: workspace.avatar,
description: workspace.description,
});

View File

@@ -1,92 +0,0 @@
import { ApiError, NeuronRequest, NeuronResponse } from '@/types/api';
import { Router } from 'express';
import {
ExecuteLocalMutationResult,
ExecuteLocalMutationsInput,
LocalMutation,
ServerExecuteMutationResult,
} from '@/types/mutations';
import { handleNodeMutation } from '@/mutations/nodes';
import { handleNodeCollaboratorMutation } from '@/mutations/node-collaborators';
import { handleNodeReactionMutation } from '@/mutations/node-reactions';
import { database } from '@/data/database';
import { SelectWorkspaceAccount } from '@/data/schema';
export const mutationsRouter = Router();
mutationsRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
const input = req.body as ExecuteLocalMutationsInput;
if (!req.accountId) {
return res.status(401).json({
code: ApiError.Unauthorized,
message: 'Unauthorized.',
});
}
const workspace = await database
.selectFrom('workspaces')
.selectAll()
.where('id', '=', input.workspaceId)
.executeTakeFirst();
if (!workspace) {
return res.status(404).json({
code: ApiError.ResourceNotFound,
message: 'Workspace not found.',
});
}
const workspaceAccount = await database
.selectFrom('workspace_accounts')
.selectAll()
.where('workspace_id', '=', workspace.id)
.where('account_id', '=', req.accountId)
.executeTakeFirst();
if (!workspaceAccount) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
});
}
const results: ServerExecuteMutationResult[] = [];
for (const mutation of input.mutations) {
try {
const result = await handleLocalMutation(workspaceAccount, mutation);
results.push({
id: mutation.id,
status: result.status,
});
} catch (error) {
results.push({
id: mutation.id,
status: 'error',
});
}
}
console.log('executed mutations', results);
res.status(200).json({ results });
});
const handleLocalMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
switch (mutation.table) {
case 'nodes': {
return handleNodeMutation(workspaceAccount, mutation);
}
case 'node_collaborators': {
return handleNodeCollaboratorMutation(workspaceAccount, mutation);
}
case 'node_reactions': {
return handleNodeReactionMutation(workspaceAccount, mutation);
}
}
return {
status: 'error',
};
};

View File

@@ -1,14 +1,28 @@
import { NeuronRequest, NeuronResponse } from '@/types/api';
import { ApiError, NeuronRequest, NeuronResponse } from '@/types/api';
import { database } from '@/data/database';
import { Router } from 'express';
import { ServerNode, ServerNodeReaction } from '@/types/nodes';
import {
ServerNode,
ServerNodeCollaborator,
ServerNodeReaction,
} from '@/types/nodes';
import { compareString } from '@/lib/utils';
import { mapNode } from '@/lib/nodes';
import {
LocalChange,
ServerSyncChangeResult,
SyncLocalChangeResult,
SyncLocalChangesInput,
} from '@/types/sync';
import { SelectWorkspaceUser } from '@/data/schema';
import { handleNodeChange } from '@/sync/nodes';
import { handleNodeCollaboratorChange } from '@/sync/node-collaborators';
import { handleNodeReactionChange } from '@/sync/node-reactions';
export const syncRouter = Router();
syncRouter.get(
'/:workspaceId/sync',
'/:workspaceId',
async (req: NeuronRequest, res: NeuronResponse) => {
const workspaceId = req.params.workspaceId as string;
const nodeRows = await database
@@ -23,6 +37,12 @@ syncRouter.get(
.where('workspace_id', '=', workspaceId)
.execute();
const nodeCollaboratorRows = await database
.selectFrom('node_collaborators')
.selectAll()
.where('workspace_id', '=', workspaceId)
.execute();
const nodes: ServerNode[] = nodeRows
.map((node) => mapNode(node))
.sort((a, b) => compareString(a.id, b.id));
@@ -40,9 +60,107 @@ syncRouter.get(
},
);
const nodeCollaborators: ServerNodeCollaborator[] =
nodeCollaboratorRows.map((nodeCollaborator) => {
return {
nodeId: nodeCollaborator.node_id,
collaboratorId: nodeCollaborator.collaborator_id,
role: nodeCollaborator.role,
workspaceId: nodeCollaborator.workspace_id,
createdAt: nodeCollaborator.created_at,
createdBy: nodeCollaborator.created_by,
updatedAt: nodeCollaborator.updated_at,
updatedBy: nodeCollaborator.updated_by,
versionId: nodeCollaborator.version_id,
serverCreatedAt: nodeCollaborator.server_created_at,
serverUpdatedAt: nodeCollaborator.server_created_at,
};
});
res.status(200).json({
nodes,
nodeReactions,
nodeCollabors: nodeCollaborators,
});
},
);
syncRouter.post(
'/:workspaceId',
async (req: NeuronRequest, res: NeuronResponse) => {
const input = req.body as SyncLocalChangesInput;
if (!req.accountId) {
return res.status(401).json({
code: ApiError.Unauthorized,
message: 'Unauthorized.',
});
}
const workspace = await database
.selectFrom('workspaces')
.selectAll()
.where('id', '=', input.workspaceId)
.executeTakeFirst();
if (!workspace) {
return res.status(404).json({
code: ApiError.ResourceNotFound,
message: 'Workspace not found.',
});
}
const workspaceAccount = await database
.selectFrom('workspace_users')
.selectAll()
.where('workspace_id', '=', workspace.id)
.where('account_id', '=', req.accountId)
.executeTakeFirst();
if (!workspaceAccount) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
});
}
const results: ServerSyncChangeResult[] = [];
for (const mutation of input.changes) {
try {
const result = await handleLocalChange(workspaceAccount, mutation);
results.push({
id: mutation.id,
status: result.status,
});
} catch (error) {
results.push({
id: mutation.id,
status: 'error',
});
}
}
console.log('executed mutations', results);
res.status(200).json({ results });
},
);
const handleLocalChange = async (
workspaceUser: SelectWorkspaceUser,
mutation: LocalChange,
): Promise<SyncLocalChangeResult> => {
switch (mutation.table) {
case 'nodes': {
return handleNodeChange(workspaceUser, mutation);
}
case 'node_collaborators': {
return handleNodeCollaboratorChange(workspaceUser, mutation);
}
case 'node_reactions': {
return handleNodeReactionChange(workspaceUser, mutation);
}
}
return {
status: 'error',
};
};

View File

@@ -1,9 +1,9 @@
import {
Workspace,
WorkspaceAccount,
WorkspaceUser,
WorkspaceAccountRoleUpdateInput,
WorkspaceAccountsInviteInput,
WorkspaceAccountStatus,
WorkspaceUserStatus,
WorkspaceInput,
WorkspaceOutput,
WorkspaceRole,
@@ -18,9 +18,9 @@ import { fromUint8Array, toUint8Array } from 'js-base64';
import {
CreateAccount,
CreateNode,
CreateWorkspaceAccount,
CreateWorkspaceUser,
SelectNode,
SelectWorkspaceAccount,
SelectWorkspaceUser,
} from '@/data/schema';
import { getNameFromEmail } from '@/lib/utils';
import { AccountStatus } from '@/types/accounts';
@@ -89,14 +89,14 @@ workspacesRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
const userAttributes = JSON.stringify(userAttributesMap.toJSON());
const userState = fromUint8Array(Y.encodeStateAsUpdate(userDoc));
const workspaceAccount: WorkspaceAccount = {
const workspaceUser: WorkspaceUser = {
id: userId,
accountId: req.accountId,
workspaceId: workspace.id,
userId: userId,
role: WorkspaceRole.Owner,
createdAt: new Date(),
createdBy: req.accountId,
status: WorkspaceAccountStatus.Active,
status: WorkspaceUserStatus.Active,
versionId: NeuronId.generate(NeuronId.Type.Version),
};
@@ -122,24 +122,24 @@ workspacesRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
workspace_id: workspace.id,
attributes: userAttributes,
state: userState,
created_at: workspaceAccount.createdAt,
created_by: workspaceAccount.createdBy,
created_at: workspaceUser.createdAt,
created_by: workspaceUser.createdBy,
version_id: userVersionId,
server_created_at: new Date(),
})
.execute();
await trx
.insertInto('workspace_accounts')
.insertInto('workspace_users')
.values({
account_id: workspaceAccount.accountId,
workspace_id: workspaceAccount.workspaceId,
user_id: workspaceAccount.userId,
role: workspaceAccount.role,
created_at: workspaceAccount.createdAt,
created_by: workspaceAccount.createdBy,
status: workspaceAccount.status,
version_id: workspaceAccount.versionId,
id: workspaceUser.id,
account_id: workspaceUser.accountId,
workspace_id: workspaceUser.workspaceId,
role: workspaceUser.role,
created_at: workspaceUser.createdAt,
created_by: workspaceUser.createdBy,
status: workspaceUser.status,
version_id: workspaceUser.versionId,
})
.execute();
});
@@ -151,7 +151,7 @@ workspacesRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
avatar: workspace.avatar,
versionId: workspace.versionId,
accountId: account.id,
role: workspaceAccount.role,
role: workspaceUser.role,
userId: userId,
};
@@ -184,35 +184,35 @@ workspacesRouter.put(
});
}
const workspaceAccount = await database
.selectFrom('workspace_accounts')
const workspaceUser = await database
.selectFrom('workspace_users')
.selectAll()
.where('workspace_id', '=', id)
.where('account_id', '=', req.accountId)
.executeTakeFirst();
if (!workspaceAccount) {
if (!workspaceUser) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
});
}
if (workspaceAccount.role !== WorkspaceRole.Owner) {
if (workspaceUser.role !== WorkspaceRole.Owner) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
});
}
if (!workspaceAccount) {
if (!workspaceUser) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
});
}
if (workspaceAccount.role !== WorkspaceRole.Owner) {
if (workspaceUser.role !== WorkspaceRole.Owner) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
@@ -246,8 +246,8 @@ workspacesRouter.put(
avatar: updatedWorkspace.avatar,
versionId: updatedWorkspace.version_id,
accountId: req.accountId,
role: workspaceAccount.role,
userId: workspaceAccount.user_id,
role: workspaceUser.role,
userId: workspaceUser.id,
};
return res.status(200).json(output);
@@ -279,21 +279,21 @@ workspacesRouter.delete(
});
}
const workspaceAccount = await database
.selectFrom('workspace_accounts')
const workspaceUser = await database
.selectFrom('workspace_users')
.selectAll()
.where('workspace_id', '=', id)
.where('account_id', '=', req.accountId)
.executeTakeFirst();
if (!workspaceAccount) {
if (!workspaceUser) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
});
}
if (workspaceAccount.role !== WorkspaceRole.Owner) {
if (workspaceUser.role !== WorkspaceRole.Owner) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
@@ -302,64 +302,65 @@ workspacesRouter.delete(
await database.deleteFrom('workspaces').where('id', '=', id).execute();
await database.deleteFrom('workspaces').where('id', '=', id).execute();
return res.status(200).json({
id: workspace.id,
});
},
);
workspacesRouter.get(':id', async (req: NeuronRequest, res: NeuronResponse) => {
const id = req.params.id;
workspacesRouter.get(
'/:id',
async (req: NeuronRequest, res: NeuronResponse) => {
const id = req.params.id;
if (!req.accountId) {
return res.status(401).json({
code: ApiError.Unauthorized,
message: 'Unauthorized.',
});
}
if (!req.accountId) {
return res.status(401).json({
code: ApiError.Unauthorized,
message: 'Unauthorized.',
});
}
const workspace = await database
.selectFrom('workspaces')
.selectAll()
.where('id', '=', id)
.executeTakeFirst();
const workspace = await database
.selectFrom('workspaces')
.selectAll()
.where('id', '=', id)
.executeTakeFirst();
if (!workspace) {
return res.status(404).json({
code: ApiError.ResourceNotFound,
message: 'Workspace not found.',
});
}
if (!workspace) {
return res.status(404).json({
code: ApiError.ResourceNotFound,
message: 'Workspace not found.',
});
}
const workspaceAccount = await database
.selectFrom('workspace_accounts')
.selectAll()
.where('workspace_id', '=', id)
.where('account_id', '=', req.accountId)
.executeTakeFirst();
const workspaceUser = await database
.selectFrom('workspace_users')
.selectAll()
.where('workspace_id', '=', id)
.where('account_id', '=', req.accountId)
.executeTakeFirst();
if (!workspaceAccount) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
});
}
if (!workspaceUser) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
});
}
const output: WorkspaceOutput = {
id: workspace.id,
name: workspace.name,
description: workspace.description,
avatar: workspace.avatar,
versionId: workspace.version_id,
accountId: req.accountId,
role: workspaceAccount.role,
userId: workspaceAccount.user_id,
};
const output: WorkspaceOutput = {
id: workspace.id,
name: workspace.name,
description: workspace.description,
avatar: workspace.avatar,
versionId: workspace.version_id,
accountId: req.accountId,
role: workspaceUser.role,
userId: workspaceUser.id,
};
return res.status(200).json(output);
});
return res.status(200).json(output);
},
);
workspacesRouter.get('/', async (req: NeuronRequest, res: NeuronResponse) => {
if (!req.accountId) {
@@ -369,13 +370,13 @@ workspacesRouter.get('/', async (req: NeuronRequest, res: NeuronResponse) => {
});
}
const workspaceAccounts = await database
.selectFrom('workspace_accounts')
const workspaceUsers = await database
.selectFrom('workspace_users')
.selectAll()
.where('account_id', '=', req.accountId)
.execute();
const workspaceIds = workspaceAccounts.map((wa) => wa.workspace_id);
const workspaceIds = workspaceUsers.map((wa) => wa.workspace_id);
const workspaces = await database
.selectFrom('workspaces')
.selectAll()
@@ -385,7 +386,7 @@ workspacesRouter.get('/', async (req: NeuronRequest, res: NeuronResponse) => {
const outputs: WorkspaceOutput[] = [];
for (const workspace of workspaces) {
const workspaceAccount = workspaceAccounts.find(
const workspaceAccount = workspaceUsers.find(
(wa) => wa.workspace_id === workspace.id,
);
@@ -401,7 +402,7 @@ workspacesRouter.get('/', async (req: NeuronRequest, res: NeuronResponse) => {
versionId: workspace.version_id,
accountId: req.accountId,
role: workspaceAccount.role,
userId: workspaceAccount.user_id,
userId: workspaceAccount.id,
};
outputs.push(output);
@@ -411,7 +412,7 @@ workspacesRouter.get('/', async (req: NeuronRequest, res: NeuronResponse) => {
});
workspacesRouter.post(
'/:id/accounts',
'/:id/users',
async (req: NeuronRequest, res: NeuronResponse) => {
const id = req.params.id;
const input: WorkspaceAccountsInviteInput = req.body;
@@ -443,14 +444,14 @@ workspacesRouter.post(
});
}
const workspaceAccount = await database
.selectFrom('workspace_accounts')
const workspaceUser = await database
.selectFrom('workspace_users')
.selectAll()
.where('workspace_id', '=', id)
.where('account_id', '=', req.accountId)
.executeTakeFirst();
if (!workspaceAccount) {
if (!workspaceUser) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
@@ -458,8 +459,8 @@ workspacesRouter.post(
}
if (
workspaceAccount.role !== WorkspaceRole.Owner &&
workspaceAccount.role !== WorkspaceRole.Admin
workspaceUser.role !== WorkspaceRole.Owner &&
workspaceUser.role !== WorkspaceRole.Admin
) {
return res.status(403).json({
code: ApiError.Forbidden,
@@ -473,12 +474,12 @@ workspacesRouter.post(
.where('email', 'in', input.emails)
.execute();
let existingWorkspaceAccounts: SelectWorkspaceAccount[] = [];
let existingWorkspaceUsers: SelectWorkspaceUser[] = [];
let existingUsers: SelectNode[] = [];
if (existingAccounts.length > 0) {
const existingAccountIds = existingAccounts.map((account) => account.id);
existingWorkspaceAccounts = await database
.selectFrom('workspace_accounts')
existingWorkspaceUsers = await database
.selectFrom('workspace_users')
.selectAll()
.where((eb) =>
eb.and([
@@ -489,9 +490,9 @@ workspacesRouter.post(
.execute();
}
if (existingWorkspaceAccounts.length > 0) {
const existingUserIds = existingWorkspaceAccounts.map(
(workspaceAccount) => workspaceAccount.user_id,
if (existingWorkspaceUsers.length > 0) {
const existingUserIds = existingWorkspaceUsers.map(
(workspaceAccount) => workspaceAccount.id,
);
existingUsers = await database
.selectFrom('nodes')
@@ -501,7 +502,7 @@ workspacesRouter.post(
}
const accountsToCreate: CreateAccount[] = [];
const workspaceAccountsToCreate: CreateWorkspaceAccount[] = [];
const workspaceUsersToCreate: CreateWorkspaceUser[] = [];
const usersToCreate: CreateNode[] = [];
const users: ServerNode[] = [];
@@ -530,13 +531,13 @@ workspacesRouter.post(
});
}
const existingWorkspaceAccount = existingWorkspaceAccounts.find(
(workspaceAccount) => workspaceAccount.account_id === account!.id,
const existingWorkspaceUser = existingWorkspaceUsers.find(
(workspaceUser) => workspaceUser.account_id === account!.id,
);
if (existingWorkspaceAccount) {
if (existingWorkspaceUser) {
const existingUser = existingUsers.find(
(user) => user.id === existingWorkspaceAccount.user_id,
(user) => user.id === existingWorkspaceUser.id,
);
if (!existingUser) {
return res.status(500).json({
@@ -568,14 +569,14 @@ workspacesRouter.post(
const userAttributes = JSON.stringify(userAttributesMap.toJSON());
const userState = fromUint8Array(Y.encodeStateAsUpdate(userDoc));
workspaceAccountsToCreate.push({
workspaceUsersToCreate.push({
id: userId,
account_id: account!.id,
workspace_id: workspace.id,
user_id: userId,
role: WorkspaceRole.Collaborator,
created_at: new Date(),
created_by: req.accountId,
status: WorkspaceAccountStatus.Active,
status: WorkspaceUserStatus.Active,
version_id: NeuronId.generate(NeuronId.Type.Version),
});
@@ -585,7 +586,7 @@ workspacesRouter.post(
attributes: JSON.parse(userAttributes),
state: userState,
createdAt: new Date(),
createdBy: workspaceAccount.user_id,
createdBy: workspaceUser.id,
serverCreatedAt: new Date(),
versionId: userVersionId,
workspaceId: workspace.id,
@@ -608,7 +609,7 @@ workspacesRouter.post(
if (
accountsToCreate.length > 0 ||
workspaceAccountsToCreate.length > 0 ||
workspaceUsersToCreate.length > 0 ||
usersToCreate.length > 0
) {
await database.transaction().execute(async (trx) => {
@@ -616,10 +617,10 @@ workspacesRouter.post(
await trx.insertInto('accounts').values(accountsToCreate).execute();
}
if (workspaceAccountsToCreate.length > 0) {
if (workspaceUsersToCreate.length > 0) {
await trx
.insertInto('workspace_accounts')
.values(workspaceAccountsToCreate)
.insertInto('workspace_users')
.values(workspaceUsersToCreate)
.execute();
}
@@ -636,10 +637,10 @@ workspacesRouter.post(
);
workspacesRouter.put(
'/:id/accounts/:accountId',
'/:id/users/:userId',
async (req: NeuronRequest, res: NeuronResponse) => {
const id = req.params.id;
const accountId = req.params.accountId;
const userId = req.params.userId;
const input: WorkspaceAccountRoleUpdateInput = req.body;
if (!req.accountId) {
@@ -662,18 +663,14 @@ workspacesRouter.put(
});
}
const workspaceAccounts = await database
.selectFrom('workspace_accounts')
const currentWorkspaceUser = await database
.selectFrom('workspace_users')
.selectAll()
.where('workspace_id', '=', id)
.where('account_id', 'in', [req.accountId, accountId])
.execute();
.where('account_id', '=', req.accountId)
.executeTakeFirst();
const currentWorkspaceAccount = workspaceAccounts.find(
(workspaceAccount) => workspaceAccount.account_id === req.accountId,
);
if (!currentWorkspaceAccount) {
if (!currentWorkspaceUser) {
return res.status(403).json({
code: ApiError.Forbidden,
message: 'Forbidden.',
@@ -681,8 +678,8 @@ workspacesRouter.put(
}
if (
currentWorkspaceAccount.role !== WorkspaceRole.Owner &&
currentWorkspaceAccount.role !== WorkspaceRole.Admin
currentWorkspaceUser.role !== WorkspaceRole.Owner &&
currentWorkspaceUser.role !== WorkspaceRole.Admin
) {
return res.status(403).json({
code: ApiError.Forbidden,
@@ -690,11 +687,13 @@ workspacesRouter.put(
});
}
const workspaceAccountToUpdate = workspaceAccounts.find(
(workspaceAccount) => workspaceAccount.account_id === accountId,
);
const workspaceUserToUpdate = await database
.selectFrom('workspace_users')
.selectAll()
.where('id', '=', userId)
.executeTakeFirst();
if (!workspaceAccountToUpdate) {
if (!workspaceUserToUpdate) {
return res.status(404).json({
code: ApiError.ResourceNotFound,
message: 'NotFound.',
@@ -704,7 +703,7 @@ workspacesRouter.put(
const user = await database
.selectFrom('nodes')
.selectAll()
.where('id', '=', workspaceAccountToUpdate.user_id)
.where('id', '=', workspaceUserToUpdate.id)
.executeTakeFirst();
if (!user) {
@@ -741,18 +740,19 @@ workspacesRouter.put(
serverUpdatedAt: updatedAt,
versionId: NeuronId.generate(NeuronId.Type.Version),
updatedAt: updatedAt,
updatedBy: currentWorkspaceAccount.user_id,
updatedBy: currentWorkspaceUser.id,
};
await database.transaction().execute(async (trx) => {
await database
.updateTable('workspace_accounts')
.updateTable('workspace_users')
.set({
role: input.role,
updated_at: new Date(),
updated_by: currentWorkspaceAccount.account_id,
updated_by: currentWorkspaceUser.account_id,
version_id: NeuronId.generate(NeuronId.Type.Version),
})
.where('id', '=', userId)
.execute();
await database
@@ -762,7 +762,7 @@ workspacesRouter.put(
state: encodedState,
server_updated_at: updatedAt,
updated_at: updatedAt,
updated_by: currentWorkspaceAccount.user_id,
updated_by: currentWorkspaceUser.id,
version_id: userNode.versionId,
})
.where('id', '=', userNode.id)

View File

@@ -3,8 +3,8 @@ import { IncomingMessage } from 'http';
import { SocketMessage } from '@/types/sockets';
import { database } from '@/data/database';
import { sql } from 'kysely';
import { SelectMutation } from '@/data/schema';
import { ServerMutation } from '@/types/mutations';
import { SelectChange } from '@/data/schema';
import { ServerChange } from '@/types/sync';
class SynapseManager {
private readonly sockets: Map<string, WebSocket> = new Map();
@@ -25,7 +25,7 @@ class SynapseManager {
});
this.sockets.set(deviceId, socket);
await this.sendPendingMutations(deviceId);
await this.sendPendingChanges(deviceId);
}
public send(deviceId: string, message: SocketMessage) {
@@ -54,7 +54,7 @@ class SynapseManager {
}
await database
.updateTable('mutations')
.updateTable('changes')
.set({
device_ids: sql`array_remove(device_ids, ${deviceId})`,
})
@@ -62,37 +62,37 @@ class SynapseManager {
.execute();
}
private async sendPendingMutations(deviceId: string) {
private async sendPendingChanges(deviceId: string) {
let lastId = '0';
let hasMore = true;
while (hasMore) {
const pendingMutations = await sql<SelectMutation>`
const pendingChanges = await sql<SelectChange>`
SELECT *
FROM mutations
FROM changes
WHERE ${deviceId} = ANY(device_ids)
AND id > ${lastId}
ORDER BY id ASC
LIMIT 50
`.execute(database);
for (const mutation of pendingMutations.rows) {
const serverMutation: ServerMutation = {
id: mutation.id,
action: mutation.action as 'insert' | 'update' | 'delete',
table: mutation.table,
workspaceId: mutation.workspace_id,
before: mutation.before,
after: mutation.after,
for (const change of pendingChanges.rows) {
const serverChange: ServerChange = {
id: change.id,
action: change.action as 'insert' | 'update' | 'delete',
table: change.table,
workspaceId: change.workspace_id,
before: change.before,
after: change.after,
};
this.send(deviceId, {
type: 'mutation',
payload: serverMutation,
type: 'change',
payload: serverChange,
});
lastId = mutation.id;
lastId = change.id;
}
hasMore = pendingMutations.rows.length === 50;
hasMore = pendingChanges.rows.length === 50;
}
}
}

View File

@@ -1,45 +1,45 @@
import { database } from '@/data/database';
import { SelectWorkspaceAccount } from '@/data/schema';
import { SelectWorkspaceUser } from '@/data/schema';
import { getCollaboratorRole } from '@/lib/nodes';
import {
ExecuteLocalMutationResult,
LocalMutation,
LocalNodeCollaboratorMutationData,
} from '@/types/mutations';
SyncLocalChangeResult,
LocalChange,
LocalNodeCollaboratorChangeData,
} from '@/types/sync';
export const handleNodeCollaboratorMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
switch (mutation.action) {
export const handleNodeCollaboratorChange = async (
workspaceUser: SelectWorkspaceUser,
change: LocalChange,
): Promise<SyncLocalChangeResult> => {
switch (change.action) {
case 'insert': {
return handleCreateNodeCollaboratorMutation(workspaceAccount, mutation);
return handleCreateNodeCollaboratorChange(workspaceUser, change);
}
case 'update': {
return handleUpdateNodeCollaboratorMutation(workspaceAccount, mutation);
return handleUpdateNodeCollaboratorChange(workspaceUser, change);
}
case 'delete': {
return handleDeleteNodeCollaboratorMutation(workspaceAccount, mutation);
return handleDeleteNodeCollaboratorChange(workspaceUser, change);
}
}
};
const handleCreateNodeCollaboratorMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
if (!mutation.after) {
const handleCreateNodeCollaboratorChange = async (
workspaceUser: SelectWorkspaceUser,
change: LocalChange,
): Promise<SyncLocalChangeResult> => {
if (!change.after) {
return {
status: 'error',
};
}
const nodeCollaboratorData = JSON.parse(
mutation.after,
) as LocalNodeCollaboratorMutationData;
change.after,
) as LocalNodeCollaboratorChangeData;
const canCreate = await canCreateNodeCollaborator(
workspaceAccount,
workspaceUser,
nodeCollaboratorData,
);
@@ -55,7 +55,7 @@ const handleCreateNodeCollaboratorMutation = async (
node_id: nodeCollaboratorData.node_id,
collaborator_id: nodeCollaboratorData.collaborator_id,
role: nodeCollaboratorData.role,
workspace_id: workspaceAccount.workspace_id,
workspace_id: workspaceUser.workspace_id,
created_at: new Date(nodeCollaboratorData.created_at),
created_by: nodeCollaboratorData.created_by,
server_created_at: new Date(),
@@ -70,8 +70,8 @@ const handleCreateNodeCollaboratorMutation = async (
};
const canCreateNodeCollaborator = async (
workspaceAccount: SelectWorkspaceAccount,
data: LocalNodeCollaboratorMutationData,
workspaceUser: SelectWorkspaceUser,
data: LocalNodeCollaboratorChangeData,
): Promise<boolean> => {
const node = await database
.selectFrom('nodes')
@@ -86,8 +86,8 @@ const canCreateNodeCollaborator = async (
// If the node is a root node and created by the current user
if (
node.parent_id === null &&
node.created_by === workspaceAccount.user_id &&
data.collaborator_id === workspaceAccount.user_id
node.created_by === workspaceUser.id &&
data.collaborator_id === workspaceUser.id
) {
return true;
}
@@ -95,7 +95,7 @@ const canCreateNodeCollaborator = async (
// Get the current user's role for the node or its ancestors
const currentUserRole = await getCollaboratorRole(
data.node_id,
workspaceAccount.user_id,
workspaceUser.id,
);
if (currentUserRole === null) {
@@ -120,19 +120,19 @@ const canCreateNodeCollaborator = async (
return false;
};
const handleUpdateNodeCollaboratorMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
if (!mutation.after) {
const handleUpdateNodeCollaboratorChange = async (
workspaceUser: SelectWorkspaceUser,
change: LocalChange,
): Promise<SyncLocalChangeResult> => {
if (!change.after) {
return {
status: 'error',
};
}
const nodeCollaboratorData = JSON.parse(
mutation.after,
) as LocalNodeCollaboratorMutationData;
change.after,
) as LocalNodeCollaboratorChangeData;
const existingNodeCollaborator = await database
.selectFrom('node_collaborators')
@@ -147,7 +147,7 @@ const handleUpdateNodeCollaboratorMutation = async (
if (
!existingNodeCollaborator ||
existingNodeCollaborator.workspace_id != workspaceAccount.workspace_id ||
existingNodeCollaborator.workspace_id != workspaceUser.workspace_id ||
existingNodeCollaborator.updated_at === null ||
existingNodeCollaborator.updated_by === null
) {
@@ -157,7 +157,7 @@ const handleUpdateNodeCollaboratorMutation = async (
}
const canUpdate = await canUpdateNodeCollaborator(
workspaceAccount,
workspaceUser,
nodeCollaboratorData,
);
@@ -209,8 +209,8 @@ const handleUpdateNodeCollaboratorMutation = async (
};
const canUpdateNodeCollaborator = async (
workspaceAccount: SelectWorkspaceAccount,
data: LocalNodeCollaboratorMutationData,
workspaceUser: SelectWorkspaceUser,
data: LocalNodeCollaboratorChangeData,
): Promise<boolean> => {
const node = await database
.selectFrom('nodes')
@@ -225,7 +225,7 @@ const canUpdateNodeCollaborator = async (
// Get the current user's role for the node or its ancestors
const currentUserRole = await getCollaboratorRole(
data.node_id,
workspaceAccount.user_id,
workspaceUser.id,
);
if (currentUserRole === null) {
@@ -250,19 +250,19 @@ const canUpdateNodeCollaborator = async (
return false;
};
const handleDeleteNodeCollaboratorMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
if (!mutation.before) {
const handleDeleteNodeCollaboratorChange = async (
workspaceUser: SelectWorkspaceUser,
change: LocalChange,
): Promise<SyncLocalChangeResult> => {
if (!change.before) {
return {
status: 'error',
};
}
const nodeCollaboratorData = JSON.parse(
mutation.before,
) as LocalNodeCollaboratorMutationData;
change.before,
) as LocalNodeCollaboratorChangeData;
const existingNodeCollaborator = await database
.selectFrom('node_collaborators')
@@ -277,7 +277,7 @@ const handleDeleteNodeCollaboratorMutation = async (
if (
!existingNodeCollaborator ||
existingNodeCollaborator.workspace_id != workspaceAccount.workspace_id
existingNodeCollaborator.workspace_id != workspaceUser.workspace_id
) {
return {
status: 'error',
@@ -285,7 +285,7 @@ const handleDeleteNodeCollaboratorMutation = async (
}
const canDelete = await canDeleteNodeCollaborator(
workspaceAccount,
workspaceUser,
nodeCollaboratorData,
);
if (!canDelete) {
@@ -310,8 +310,8 @@ const handleDeleteNodeCollaboratorMutation = async (
};
const canDeleteNodeCollaborator = async (
workspaceAccount: SelectWorkspaceAccount,
data: LocalNodeCollaboratorMutationData,
workspaceUser: SelectWorkspaceUser,
data: LocalNodeCollaboratorChangeData,
): Promise<boolean> => {
const node = await database
.selectFrom('nodes')
@@ -326,7 +326,7 @@ const canDeleteNodeCollaborator = async (
// Get the current user's role for the node or its ancestors
const currentUserRole = await getCollaboratorRole(
data.node_id,
workspaceAccount.user_id,
workspaceUser.id,
);
if (currentUserRole === null) {

View File

@@ -1,22 +1,22 @@
import { database } from '@/data/database';
import { SelectWorkspaceAccount } from '@/data/schema';
import { SelectWorkspaceUser } from '@/data/schema';
import { getCollaboratorRole } from '@/lib/nodes';
import {
ExecuteLocalMutationResult,
LocalMutation,
LocalNodeReactionMutationData,
} from '@/types/mutations';
SyncLocalChangeResult,
LocalChange,
LocalNodeReactionChangeData,
} from '@/types/sync';
export const handleNodeReactionMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
export const handleNodeReactionChange = async (
workspaceUser: SelectWorkspaceUser,
mutation: LocalChange,
): Promise<SyncLocalChangeResult> => {
switch (mutation.action) {
case 'insert': {
return handleCreateNodeReactionMutation(workspaceAccount, mutation);
return handleCreateNodeReactionChange(workspaceUser, mutation);
}
case 'delete': {
return handleDeleteNodeReactionMutation(workspaceAccount, mutation);
return handleDeleteNodeReactionChange(workspaceUser, mutation);
}
}
@@ -25,21 +25,21 @@ export const handleNodeReactionMutation = async (
};
};
const handleCreateNodeReactionMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
if (!mutation.after) {
const handleCreateNodeReactionChange = async (
workspaceUser: SelectWorkspaceUser,
change: LocalChange,
): Promise<SyncLocalChangeResult> => {
if (!change.after) {
return {
status: 'error',
};
}
const nodeReactionData = JSON.parse(
mutation.after,
) as LocalNodeReactionMutationData;
change.after,
) as LocalNodeReactionChangeData;
if (nodeReactionData.actor_id !== workspaceAccount.user_id) {
if (nodeReactionData.actor_id !== workspaceUser.id) {
return {
status: 'error',
};
@@ -47,7 +47,7 @@ const handleCreateNodeReactionMutation = async (
const nodeRole = await getCollaboratorRole(
nodeReactionData.node_id,
workspaceAccount.user_id,
workspaceUser.id,
);
if (nodeRole === null) {
@@ -63,7 +63,7 @@ const handleCreateNodeReactionMutation = async (
actor_id: nodeReactionData.actor_id,
reaction: nodeReactionData.reaction,
created_at: new Date(nodeReactionData.created_at),
workspace_id: workspaceAccount.workspace_id,
workspace_id: workspaceUser.workspace_id,
server_created_at: new Date(),
})
.onConflict((ob) => ob.doNothing())
@@ -74,21 +74,21 @@ const handleCreateNodeReactionMutation = async (
};
};
const handleDeleteNodeReactionMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
if (!mutation.before) {
const handleDeleteNodeReactionChange = async (
workspaceUser: SelectWorkspaceUser,
change: LocalChange,
): Promise<SyncLocalChangeResult> => {
if (!change.before) {
return {
status: 'error',
};
}
const nodeReactionData = JSON.parse(
mutation.before,
) as LocalNodeReactionMutationData;
change.before,
) as LocalNodeReactionChangeData;
if (nodeReactionData.actor_id !== workspaceAccount.user_id) {
if (nodeReactionData.actor_id !== workspaceUser.id) {
return {
status: 'error',
};

View File

@@ -1,43 +1,43 @@
import { database } from '@/data/database';
import { SelectWorkspaceAccount } from '@/data/schema';
import { SelectWorkspaceUser } from '@/data/schema';
import { getCollaboratorRole } from '@/lib/nodes';
import {
ExecuteLocalMutationResult,
LocalMutation,
LocalNodeMutationData,
} from '@/types/mutations';
SyncLocalChangeResult,
LocalChange,
LocalNodeChangeData,
} from '@/types/sync';
import { ServerNodeAttributes } from '@/types/nodes';
import { fromUint8Array, toUint8Array } from 'js-base64';
import * as Y from 'yjs';
export const handleNodeMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
switch (mutation.action) {
export const handleNodeChange = async (
workspaceUser: SelectWorkspaceUser,
change: LocalChange,
): Promise<SyncLocalChangeResult> => {
switch (change.action) {
case 'insert': {
return handleCreateNodeMutation(workspaceAccount, mutation);
return handleCreateNodeChange(workspaceUser, change);
}
case 'update': {
return handleUpdateNodeMutation(workspaceAccount, mutation);
return handleUpdateNodeChange(workspaceUser, change);
}
case 'delete': {
return handleDeleteNodeMutation(workspaceAccount, mutation);
return handleDeleteNodeChange(workspaceUser, change);
}
}
};
const handleCreateNodeMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
if (!mutation.after) {
const handleCreateNodeChange = async (
workspaceUser: SelectWorkspaceUser,
change: LocalChange,
): Promise<SyncLocalChangeResult> => {
if (!change.after) {
return {
status: 'error',
};
}
const nodeData = JSON.parse(mutation.after) as LocalNodeMutationData;
const nodeData = JSON.parse(change.after) as LocalNodeChangeData;
const existingNode = await database
.selectFrom('nodes')
.where('id', '=', nodeData.id)
@@ -53,7 +53,7 @@ const handleCreateNodeMutation = async (
if (attributes.parentId) {
const parentRole = await getCollaboratorRole(
attributes.parentId,
workspaceAccount.user_id,
workspaceUser.id,
);
if (
@@ -71,7 +71,7 @@ const handleCreateNodeMutation = async (
.values({
id: nodeData.id,
attributes: nodeData.attributes,
workspace_id: workspaceAccount.workspace_id,
workspace_id: workspaceUser.workspace_id,
state: nodeData.state,
created_at: new Date(nodeData.created_at),
created_by: nodeData.created_by,
@@ -85,17 +85,17 @@ const handleCreateNodeMutation = async (
};
};
const handleUpdateNodeMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
if (!mutation.after) {
const handleUpdateNodeChange = async (
workspaceUser: SelectWorkspaceUser,
change: LocalChange,
): Promise<SyncLocalChangeResult> => {
if (!change.after) {
return {
status: 'error',
};
}
const nodeData = JSON.parse(mutation.after) as LocalNodeMutationData;
const nodeData = JSON.parse(change.after) as LocalNodeChangeData;
const existingNode = await database
.selectFrom('nodes')
.select(['id', 'workspace_id', 'state'])
@@ -104,14 +104,14 @@ const handleUpdateNodeMutation = async (
if (
!existingNode ||
existingNode.workspace_id != workspaceAccount.workspace_id
existingNode.workspace_id != workspaceUser.workspace_id
) {
return {
status: 'error',
};
}
const role = await getCollaboratorRole(nodeData.id, workspaceAccount.user_id);
const role = await getCollaboratorRole(nodeData.id, workspaceUser.id);
if (role === null) {
return {
status: 'error',
@@ -121,7 +121,7 @@ const handleUpdateNodeMutation = async (
const updatedAt = nodeData.updated_at
? new Date(nodeData.updated_at)
: new Date();
const updatedBy = nodeData.updated_by ?? workspaceAccount.user_id;
const updatedBy = nodeData.updated_by ?? workspaceUser.id;
const doc = new Y.Doc({
guid: nodeData.id,
@@ -152,17 +152,17 @@ const handleUpdateNodeMutation = async (
};
};
const handleDeleteNodeMutation = async (
workspaceAccount: SelectWorkspaceAccount,
mutation: LocalMutation,
): Promise<ExecuteLocalMutationResult> => {
if (!mutation.before) {
const handleDeleteNodeChange = async (
workspaceUser: SelectWorkspaceUser,
change: LocalChange,
): Promise<SyncLocalChangeResult> => {
if (!change.before) {
return {
status: 'error',
};
}
const nodeData = JSON.parse(mutation.before) as LocalNodeMutationData;
const nodeData = JSON.parse(change.before) as LocalNodeChangeData;
const existingNode = await database
.selectFrom('nodes')
.where('id', '=', nodeData.id)
@@ -175,13 +175,13 @@ const handleDeleteNodeMutation = async (
};
}
if (existingNode.workspace_id !== workspaceAccount.workspace_id) {
if (existingNode.workspace_id !== workspaceUser.workspace_id) {
return {
status: 'error',
};
}
const role = await getCollaboratorRole(nodeData.id, workspaceAccount.user_id);
const role = await getCollaboratorRole(nodeData.id, workspaceUser.id);
if (role === null) {
return {
status: 'error',

View File

@@ -1,7 +1,7 @@
export type ChangeMessage<T> = {
export type CdcMessage<T> = {
before?: T | null;
after: T;
source: ChangeSource;
source: CdcSource;
op: string;
ts_ms: number;
ts_ns: number;
@@ -9,7 +9,7 @@ export type ChangeMessage<T> = {
transaction: any;
};
type ChangeSource = {
type CdcSource = {
version: string;
connector: string;
name: string;
@@ -25,7 +25,7 @@ type ChangeSource = {
lsn: number;
};
export type MutationChangeData = {
export type ChangeCdcData = {
id: string;
workspace_id: string;
table: string;
@@ -36,7 +36,7 @@ export type MutationChangeData = {
device_ids: string[];
};
export type NodeChangeData = {
export type NodeCdcData = {
id: string;
workspace_id: string;
parent_id: string | null;
@@ -53,7 +53,7 @@ export type NodeChangeData = {
server_updated_at: string | null;
};
export type NodeCollaboratorChangeData = {
export type NodeCollaboratorCdcData = {
node_id: string;
collaborator_id: string;
role: string;
@@ -67,7 +67,7 @@ export type NodeCollaboratorChangeData = {
server_updated_at: string | null;
};
export type NodeReactionChangeData = {
export type NodeReactionCdcData = {
node_id: string;
actor_id: string;
reaction: string;

View File

@@ -1,20 +1,20 @@
export type ExecuteLocalMutationsInput = {
export type SyncLocalChangesInput = {
workspaceId: string;
mutations: LocalMutation[];
changes: LocalChange[];
};
export type ExecuteLocalMutationResult = {
status: ExecuteLocalMutationStatus;
export type SyncLocalChangeResult = {
status: SyncLocalChangeStatus;
};
export type ExecuteLocalMutationStatus = 'success' | 'error';
export type SyncLocalChangeStatus = 'success' | 'error';
export type ServerExecuteMutationResult = {
export type ServerSyncChangeResult = {
id: number;
status: ExecuteLocalMutationStatus;
status: SyncLocalChangeStatus;
};
export type LocalMutation = {
export type LocalChange = {
id: number;
table: string;
action: 'insert' | 'update' | 'delete';
@@ -23,7 +23,7 @@ export type LocalMutation = {
createdAt: string;
};
export type LocalNodeMutationData = {
export type LocalNodeChangeData = {
id: string;
attributes: string;
state: string;
@@ -37,7 +37,7 @@ export type LocalNodeMutationData = {
server_version_id: string;
};
export type LocalNodeCollaboratorMutationData = {
export type LocalNodeCollaboratorChangeData = {
node_id: string;
collaborator_id: string;
role: string;
@@ -51,14 +51,14 @@ export type LocalNodeCollaboratorMutationData = {
server_version_id: string;
};
export type LocalNodeReactionMutationData = {
export type LocalNodeReactionChangeData = {
node_id: string;
actor_id: string;
reaction: string;
created_at: string;
};
export type ServerMutation = {
export type ServerChange = {
id: string;
table: string;
action: 'insert' | 'update' | 'delete';

View File

@@ -10,7 +10,7 @@ export enum WorkspaceRole {
Viewer = 'viewer',
}
export enum WorkspaceAccountStatus {
export enum WorkspaceUserStatus {
Active = 1,
Inactive = 2,
}
@@ -28,16 +28,16 @@ export type Workspace = {
versionId: string;
};
export type WorkspaceAccount = {
export type WorkspaceUser = {
id: string;
workspaceId: string;
accountId: string;
userId: string;
role: WorkspaceRole;
createdAt: Date;
createdBy: string;
updatedAt?: Date | null;
updatedBy?: string | null;
status: WorkspaceAccountStatus;
status: WorkspaceUserStatus;
versionId: string;
};