Implement background sync for nodes

Hakan Shehu
2024-10-14 19:15:20 +02:00
parent 0412c6d7bf
commit ff32bf03a6
18 changed files with 776 additions and 110 deletions

View File

@@ -37,6 +37,7 @@ import { NodeCollaboratorServerDeleteMutationHandler } from '@/main/handlers/mut
import { NodeReactionServerCreateMutationHandler } from '@/main/handlers/mutations/node-reaction-server-create';
import { NodeReactionServerDeleteMutationHandler } from '@/main/handlers/mutations/node-reaction-server-delete';
import { LogoutMutationHandler } from '@/main/handlers/mutations/logout';
import { NodeBatchSyncMutationHandler } from '@/main/handlers/mutations/node-batch-sync';
type MutationHandlerMap = {
[K in keyof MutationMap]: MutationHandler<MutationMap[K]['input']>;
@@ -85,4 +86,5 @@ export const mutationHandlerMap: MutationHandlerMap = {
node_reaction_server_create: new NodeReactionServerCreateMutationHandler(),
node_reaction_server_delete: new NodeReactionServerDeleteMutationHandler(),
logout: new LogoutMutationHandler(),
node_batch_sync: new NodeBatchSyncMutationHandler(),
};
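For context, a minimal sketch of how a handler is presumably looked up from this map when a mutation is dispatched; the mediator itself is not part of the hunks shown here, so the shape below is an assumption rather than the project's actual dispatch code.

import { MutationMap } from '@/operations/mutations';

// Hypothetical dispatch helper living next to mutationHandlerMap above.
const executeMutation = async <T extends keyof MutationMap>(
  input: MutationMap[T]['input'] & { type: T },
) => {
  // Resolve the registered handler for this mutation type,
  // e.g. 'node_batch_sync' -> NodeBatchSyncMutationHandler, and run it.
  const handler = mutationHandlerMap[input.type];
  return handler.handleMutation(input);
};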

View File

@@ -0,0 +1,115 @@
import { databaseManager } from '@/main/data/database-manager';
import { MutationHandler, MutationResult } from '@/operations/mutations';
import { NodeBatchSyncMutationInput } from '@/operations/mutations/node-batch-sync';
import { toUint8Array } from 'js-base64';
import * as Y from 'yjs';
export class NodeBatchSyncMutationHandler
implements MutationHandler<NodeBatchSyncMutationInput>
{
public async handleMutation(
input: NodeBatchSyncMutationInput,
): Promise<MutationResult<NodeBatchSyncMutationInput>> {
const workspace = await databaseManager.appDatabase
.selectFrom('workspaces')
.selectAll()
.where((eb) =>
eb.and([
eb('account_id', '=', input.accountId),
eb('workspace_id', '=', input.workspaceId),
]),
)
.executeTakeFirst();
if (!workspace) {
return {
output: {
success: false,
},
};
}
const userId = workspace.user_id;
const workspaceDatabase =
await databaseManager.getWorkspaceDatabase(userId);
for (const node of input.nodes) {
const existingNode = await workspaceDatabase
.selectFrom('nodes')
.where('id', '=', node.id)
.selectAll()
.executeTakeFirst();
if (!existingNode) {
const doc = new Y.Doc({
guid: node.id,
});
Y.applyUpdate(doc, toUint8Array(node.state));
const attributesMap = doc.getMap('attributes');
const attributes = JSON.stringify(attributesMap.toJSON());
await workspaceDatabase
.insertInto('nodes')
.values({
id: node.id,
attributes: attributes,
state: node.state,
created_at: node.createdAt,
created_by: node.createdBy,
version_id: node.versionId,
server_created_at: node.serverCreatedAt,
server_version_id: node.versionId,
})
.onConflict((cb) =>
cb
// SQLite requires an explicit conflict target for DO UPDATE;
// 'id' is assumed to be the nodes table's primary key here.
.column('id')
.doUpdateSet({
server_created_at: node.serverCreatedAt,
server_updated_at: node.serverUpdatedAt,
server_version_id: node.versionId,
})
.where('version_id', '=', node.versionId),
)
.execute();
} else if (existingNode.version_id !== node.versionId) {
const doc = new Y.Doc({
guid: node.id,
});
Y.applyUpdate(doc, toUint8Array(existingNode.state));
Y.applyUpdate(doc, toUint8Array(node.state));
const attributesMap = doc.getMap('attributes');
const attributes = JSON.stringify(attributesMap.toJSON());
await workspaceDatabase
.updateTable('nodes')
.set({
state: node.state,
attributes: attributes,
server_updated_at: node.serverUpdatedAt,
server_version_id: node.versionId,
updated_at: node.updatedAt,
updated_by: node.updatedBy,
version_id: node.versionId,
})
.where('id', '=', node.id)
.execute();
}
}
return {
output: {
success: true,
},
changes: [
{
type: 'workspace',
table: 'nodes',
userId: userId,
},
],
};
}
}
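A standalone illustration of the Yjs merge performed in the else-if branch above: applying the locally stored state and the incoming server state to the same document converges on a single attributes map, because Yjs updates are commutative and idempotent. Note that the handler persists the server-sent state string as-is; the re-encoded snapshot below is only included to show how a merged state could be produced.

import { fromUint8Array, toUint8Array } from 'js-base64';
import * as Y from 'yjs';

// Merge two base64-encoded Yjs states for the same node and extract
// the resulting attributes JSON, mirroring the update path above.
const mergeNodeState = (
  nodeId: string,
  localState: string,
  serverState: string,
) => {
  const doc = new Y.Doc({ guid: nodeId });
  Y.applyUpdate(doc, toUint8Array(localState));
  Y.applyUpdate(doc, toUint8Array(serverState));
  const attributes = JSON.stringify(doc.getMap('attributes').toJSON());
  const mergedState = fromUint8Array(Y.encodeStateAsUpdate(doc));
  return { attributes, mergedState };
};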

View File

@@ -39,7 +39,11 @@ class Synchronizer {
}
public async handleServerChange(accountId: string, change: ServerChange) {
const executed = await this.executeServerChange(accountId, change.data);
const executed = await this.executeServerChange(
accountId,
change.workspaceId,
change.data,
);
if (executed) {
await mediator.executeMessage(
{
@@ -57,6 +61,7 @@ class Synchronizer {
private async executeServerChange(
accountId: string,
workspaceId: string,
data: ServerChangeData,
): Promise<boolean> {
switch (data.type) {
@@ -64,7 +69,7 @@ class Synchronizer {
const result = await mediator.executeMutation({
type: 'node_server_create',
accountId: accountId,
workspaceId: data.workspaceId,
workspaceId: workspaceId,
id: data.id,
state: data.state,
createdAt: data.createdAt,
@@ -79,7 +84,7 @@ class Synchronizer {
const result = await mediator.executeMutation({
type: 'node_server_update',
accountId: accountId,
workspaceId: data.workspaceId,
workspaceId: workspaceId,
id: data.id,
update: data.update,
updatedAt: data.updatedAt,
@@ -94,7 +99,7 @@ class Synchronizer {
const result = await mediator.executeMutation({
type: 'node_server_delete',
accountId: accountId,
workspaceId: data.workspaceId,
workspaceId: workspaceId,
id: data.id,
});
@@ -104,7 +109,7 @@ class Synchronizer {
const result = await mediator.executeMutation({
type: 'node_collaborator_server_create',
accountId: accountId,
workspaceId: data.workspaceId,
workspaceId: workspaceId,
nodeId: data.nodeId,
collaboratorId: data.collaboratorId,
role: data.role,
@@ -120,7 +125,7 @@ class Synchronizer {
const result = await mediator.executeMutation({
type: 'node_collaborator_server_update',
accountId: accountId,
workspaceId: data.workspaceId,
workspaceId: workspaceId,
nodeId: data.nodeId,
collaboratorId: data.collaboratorId,
role: data.role,
@@ -136,7 +141,7 @@ class Synchronizer {
const result = await mediator.executeMutation({
type: 'node_collaborator_server_delete',
accountId: accountId,
workspaceId: data.workspaceId,
workspaceId: workspaceId,
nodeId: data.nodeId,
collaboratorId: data.collaboratorId,
});
@@ -147,7 +152,7 @@ class Synchronizer {
const result = await mediator.executeMutation({
type: 'node_reaction_server_create',
accountId: accountId,
workspaceId: data.workspaceId,
workspaceId: workspaceId,
nodeId: data.nodeId,
actorId: data.actorId,
reaction: data.reaction,
@@ -161,7 +166,7 @@ class Synchronizer {
const result = await mediator.executeMutation({
type: 'node_reaction_server_delete',
accountId: accountId,
workspaceId: data.workspaceId,
workspaceId: workspaceId,
nodeId: data.nodeId,
actorId: data.actorId,
reaction: data.reaction,
@@ -169,6 +174,16 @@ class Synchronizer {
return result.success;
}
case 'node_batch_sync': {
const result = await mediator.executeMutation({
type: 'node_batch_sync',
accountId: accountId,
workspaceId: workspaceId,
nodes: data.nodes,
});
return result.success;
}
default: {
return false;
}

View File

@@ -0,0 +1,21 @@
import { ServerNodeBatchSyncData } from '@/types/sync';
export type NodeBatchSyncMutationInput = {
type: 'node_batch_sync';
accountId: string;
workspaceId: string;
nodes: ServerNodeBatchSyncData[];
};
export type NodeBatchSyncMutationOutput = {
success: boolean;
};
declare module '@/operations/mutations' {
interface MutationMap {
node_batch_sync: {
input: NodeBatchSyncMutationInput;
output: NodeBatchSyncMutationOutput;
};
}
}

View File

@@ -37,12 +37,12 @@ export type ServerChangeData =
| ServerNodeCollaboratorUpdateChangeData
| ServerNodeCollaboratorDeleteChangeData
| ServerNodeReactionCreateChangeData
| ServerNodeReactionDeleteChangeData;
| ServerNodeReactionDeleteChangeData
| ServerNodeBatchSyncChangeData;
export type ServerNodeCreateChangeData = {
type: 'node_create';
id: string;
workspaceId: string;
state: string;
createdAt: string;
createdBy: string;
@@ -53,7 +53,6 @@ export type ServerNodeCreateChangeData = {
export type ServerNodeUpdateChangeData = {
type: 'node_update';
id: string;
workspaceId: string;
update: string;
updatedAt: string;
updatedBy: string;
@@ -64,7 +63,6 @@ export type ServerNodeUpdateChangeData = {
export type ServerNodeDeleteChangeData = {
type: 'node_delete';
id: string;
workspaceId: string;
};
export type ServerNodeCollaboratorCreateChangeData = {
@@ -72,7 +70,6 @@ export type ServerNodeCollaboratorCreateChangeData = {
nodeId: string;
collaboratorId: string;
role: string;
workspaceId: string;
createdAt: string;
createdBy: string;
versionId: string;
@@ -83,7 +80,6 @@ export type ServerNodeCollaboratorUpdateChangeData = {
type: 'node_collaborator_update';
nodeId: string;
collaboratorId: string;
workspaceId: string;
role: string;
updatedAt: string;
updatedBy: string;
@@ -95,7 +91,6 @@ export type ServerNodeCollaboratorDeleteChangeData = {
type: 'node_collaborator_delete';
nodeId: string;
collaboratorId: string;
workspaceId: string;
};
export type ServerNodeReactionCreateChangeData = {
@@ -103,7 +98,6 @@ export type ServerNodeReactionCreateChangeData = {
nodeId: string;
actorId: string;
reaction: string;
workspaceId: string;
createdAt: string;
serverCreatedAt: string;
};
@@ -113,5 +107,21 @@ export type ServerNodeReactionDeleteChangeData = {
nodeId: string;
actorId: string;
reaction: string;
workspaceId: string;
};
export type ServerNodeBatchSyncChangeData = {
type: 'node_batch_sync';
nodes: ServerNodeBatchSyncData[];
};
export type ServerNodeBatchSyncData = {
id: string;
state: string;
createdAt: string;
createdBy: string;
versionId: string;
updatedAt?: string | null;
updatedBy?: string | null;
serverCreatedAt: string;
serverUpdatedAt?: string | null;
};
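For reference, a hypothetical node_batch_sync change payload (all ids, timestamps, and the state string are made up) as the synchronizer receives it and forwards to the node_batch_sync mutation:

import { ServerNodeBatchSyncChangeData } from '@/types/sync';

const exampleChange: ServerNodeBatchSyncChangeData = {
  type: 'node_batch_sync',
  nodes: [
    {
      id: 'node_01',
      state: 'AAEC...', // base64-encoded Yjs state
      createdAt: '2024-10-14T17:00:00.000Z',
      createdBy: 'user_01',
      versionId: 'version_01',
      updatedAt: null,
      updatedBy: null,
      serverCreatedAt: '2024-10-14T17:00:01.000Z',
      serverUpdatedAt: null,
    },
  ],
};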

View File

@@ -105,6 +105,83 @@ const createNodesTable: Migration = {
},
};
const createClosureTable: Migration = {
up: async (db) => {
// Create closure table for storing paths between nodes
await db.schema
.createTable('node_paths')
.addColumn('ancestor_id', 'varchar(30)', (col) =>
col.notNull().references('nodes.id').onDelete('cascade'),
)
.addColumn('descendant_id', 'varchar(30)', (col) =>
col.notNull().references('nodes.id').onDelete('cascade'),
)
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
.addColumn('level', 'integer', (col) => col.notNull())
.addPrimaryKeyConstraint('node_paths_pkey', [
'ancestor_id',
'descendant_id',
])
.execute();
await sql`
CREATE OR REPLACE FUNCTION fn_insert_node_path() RETURNS TRIGGER AS $$
BEGIN
-- Insert direct path from the new node to itself
INSERT INTO node_paths (ancestor_id, descendant_id, workspace_id, level)
VALUES (NEW.id, NEW.id, NEW.workspace_id, 0);
-- Insert paths from ancestors to the new node
INSERT INTO node_paths (ancestor_id, descendant_id, workspace_id, level)
SELECT ancestor_id, NEW.id, NEW.workspace_id, level + 1
FROM node_paths
WHERE descendant_id = NEW.parent_id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_insert_node_path
AFTER INSERT ON nodes
FOR EACH ROW
EXECUTE FUNCTION fn_insert_node_path();
CREATE OR REPLACE FUNCTION fn_update_node_path() RETURNS TRIGGER AS $$
BEGIN
IF OLD.parent_id IS DISTINCT FROM NEW.parent_id THEN
-- Delete old paths involving the updated node
DELETE FROM node_paths
WHERE descendant_id = NEW.id AND ancestor_id <> NEW.id;
INSERT INTO node_paths (ancestor_id, descendant_id, workspace_id, level)
SELECT ancestor_id, NEW.id, NEW.workspace_id, level + 1
FROM node_paths
WHERE descendant_id = NEW.parent_id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_update_node_path
AFTER UPDATE OF parent_id ON nodes
FOR EACH ROW
EXECUTE FUNCTION fn_update_node_path();
`.execute(db);
},
down: async (db) => {
await sql`
DROP TRIGGER IF EXISTS trg_insert_node_path ON nodes;
DROP TRIGGER IF EXISTS trg_update_node_path ON nodes;
DROP FUNCTION IF EXISTS fn_insert_node_path();
DROP FUNCTION IF EXISTS fn_update_node_path();
`.execute(db);
// Drop closure table
await db.schema.dropTable('node_paths').execute();
},
};
const createNodeCollaboratorsTable: Migration = {
up: async (db) => {
await db.schema
@@ -228,9 +305,10 @@ export const databaseMigrations: Record<string, Migration> = {
'00002_create_workspaces_table': createWorkspacesTable,
'00003_create_workspace_users_table': createWorkspaceUsersTable,
'00004_create_nodes_table': createNodesTable,
'00005_create_node_collaborators_table': createNodeCollaboratorsTable,
'00006_create_node_reactions_table': createNodeReactionsTable,
'00007_create_account_devices_table': createAccountDevicesTable,
'00008_create_changes_table': createChangesTable,
'00009_create_change_devices_table': createChangeDevicesTable,
'00005_create_closure_table': createClosureTable,
'00006_create_node_collaborators_table': createNodeCollaboratorsTable,
'00007_create_node_reactions_table': createNodeReactionsTable,
'00008_create_account_devices_table': createAccountDevicesTable,
'00009_create_changes_table': createChangesTable,
'00010_create_change_devices_table': createChangeDevicesTable,
};
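To make the trigger behaviour above concrete: for a small tree space_1 -> page_1 -> message_1 (hypothetical ids), fn_insert_node_path maintains one row per ancestor/descendant pair, including a level-0 self row for every node. Any subtree or ancestor-chain question then becomes a single indexed lookup on node_paths; a minimal Kysely sketch of such a lookup, the same shape as the helpers further down in this commit:

import { database } from '@/data/database';

// node_paths rows for space_1 -> page_1 -> message_1 (workspace_id omitted):
//   ancestor_id  descendant_id  level
//   space_1      space_1        0
//   page_1       page_1         0
//   space_1      page_1         1
//   message_1    message_1      0
//   page_1       message_1      1
//   space_1      message_1      2
const fetchSubtreeIds = async (rootId: string): Promise<string[]> => {
  const rows = await database
    .selectFrom('node_paths')
    .select('descendant_id')
    .where('ancestor_id', '=', rootId)
    .orderBy('level', 'asc')
    .execute();
  return rows.map((row) => row.descendant_id);
};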

View File

@@ -106,6 +106,17 @@ export type SelectNode = Selectable<NodeTable>;
export type CreateNode = Insertable<NodeTable>;
export type UpdateNode = Updateable<NodeTable>;
interface NodePathTable {
ancestor_id: ColumnType<string, string, never>;
descendant_id: ColumnType<string, string, never>;
workspace_id: ColumnType<string, string, never>;
level: ColumnType<number, number, number>;
}
export type SelectNodePath = Selectable<NodePathTable>;
export type CreateNodePath = Insertable<NodePathTable>;
export type UpdateNodePath = Updateable<NodePathTable>;
interface NodeCollaboratorTable {
node_id: ColumnType<string, string, never>;
collaborator_id: ColumnType<string, string, never>;
@@ -165,6 +176,7 @@ export interface DatabaseSchema {
workspace_users: WorkspaceUserTable;
account_devices: AccountDeviceTable;
nodes: NodeTable;
node_paths: NodePathTable;
node_collaborators: NodeCollaboratorTable;
node_reactions: NodeReactionTable;
changes: ChangeTable;

View File

@@ -3,3 +3,22 @@ export const PostgresOperation = {
UPDATE: 'u',
DELETE: 'd',
} as const;
export const NodeTypes = {
User: 'user',
Space: 'space',
Page: 'page',
Channel: 'channel',
Chat: 'chat',
Message: 'message',
HorizontalRule: 'horizontal_rule',
Database: 'database',
DatabaseReplica: 'database_replica',
Record: 'record',
Folder: 'folder',
TableView: 'table_view',
BoardView: 'board_view',
CalendarView: 'calendar_view',
Field: 'field',
SelectOption: 'select_option',
};

View File

@@ -1,7 +1,6 @@
import { database } from '@/data/database';
import { SelectNode } from '@/data/schema';
import { ServerNode } from '@/types/nodes';
import { sql } from 'kysely';
export const mapNode = (node: SelectNode): ServerNode => {
return {
@@ -36,32 +35,20 @@ export const fetchCollaboratorRole = async (
nodeId: string,
collaboratorId: string,
): Promise<string | null> => {
const query = sql<NodeCollaboratorRow>`
WITH RECURSIVE ancestors(id, parent_id, level) AS (
SELECT id, parent_id, 0 AS level
FROM nodes
WHERE id = ${nodeId}
UNION ALL
SELECT n.id, n.parent_id, a.level + 1
FROM nodes n
INNER JOIN ancestors a ON n.id = a.parent_id
)
SELECT
nc.node_id,
a.level AS node_level,
nc.role
FROM node_collaborators nc
JOIN ancestors a ON nc.node_id = a.id
WHERE nc.collaborator_id = ${collaboratorId};
`.compile(database);
const result = await database
.selectFrom('node_paths as np')
.innerJoin('node_collaborators as nc', 'np.ancestor_id', 'nc.node_id')
.select(['np.ancestor_id as node_id', 'np.level as node_level', 'nc.role'])
.where('np.descendant_id', '=', nodeId)
.where('nc.collaborator_id', '=', collaboratorId)
.execute();
const result = await database.executeQuery(query);
if (result.rows.length === 0) {
if (result.length === 0) {
return null;
}
const roleHierarchy = ['owner', 'admin', 'collaborator'];
const highestRole = result.rows.reduce((highest, row) => {
const highestRole = result.reduce((highest, row) => {
const currentRoleIndex = roleHierarchy.indexOf(row.role);
const highestRoleIndex = roleHierarchy.indexOf(highest);
return currentRoleIndex < highestRoleIndex ? row.role : highest;
@@ -70,23 +57,28 @@ export const fetchCollaboratorRole = async (
return highestRole;
};
export const fetchNodeTree = async (nodeId: string): Promise<string[]> => {
const query = sql<NodeIdRow>`
WITH RECURSIVE ancestors(id, parent_id, level) AS (
SELECT id, parent_id, 0 AS level
FROM nodes
WHERE id = ${nodeId}
UNION ALL
SELECT n.id, n.parent_id, a.level + 1
FROM nodes n
INNER JOIN ancestors a ON n.id = a.parent_id
)
SELECT n.id
FROM nodes n
JOIN ancestors a ON n.id = a.id
ORDER BY a.level DESC;
`.compile(database);
export const fetchNodeAscendants = async (
nodeId: string,
): Promise<string[]> => {
const result = await database
.selectFrom('node_paths')
.select('ancestor_id')
.where('descendant_id', '=', nodeId)
.orderBy('level', 'desc')
.execute();
const result = await database.executeQuery(query);
return result.rows.map((row) => row.id);
return result.map((row) => row.ancestor_id);
};
export const fetchNodeDescendants = async (
nodeId: string,
): Promise<string[]> => {
const result = await database
.selectFrom('node_paths')
.select('descendant_id')
.where('ancestor_id', '=', nodeId)
.orderBy('level', 'asc')
.execute();
return result.map((row) => row.descendant_id);
};
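A hypothetical usage of the rewritten helpers: with node_paths in place, both calls are plain indexed queries rather than recursive CTEs, and the role reducer in fetchCollaboratorRole resolves to the highest role found along the ancestor chain (e.g. 'admin' inherited from a parent space beats 'collaborator' granted on the page itself).

import { fetchCollaboratorRole, fetchNodeDescendants } from '@/lib/nodes';

// Illustrative ids only.
const checkAccess = async () => {
  const role = await fetchCollaboratorRole('page_1', 'user_1');
  const subtreeIds = await fetchNodeDescendants('page_1');
  return { role, subtreeIds };
};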

View File

@@ -1,7 +1,7 @@
import { database } from '@/data/database';
import { CHANNEL_NAMES, redis } from '@/data/redis';
import { getIdType, IdType } from '@/lib/id';
import { fetchNodeTree } from '@/lib/nodes';
import { fetchNodeAscendants } from '@/lib/nodes';
import { ServerChangeData } from '@/types/sync';
import { Job, Queue, Worker } from 'bullmq';
@@ -137,7 +137,7 @@ const fetchDevicesForNode = async (
return fetchAllWorkspaceDevices(workspaceId);
}
const nodeTree = await fetchNodeTree(nodeId);
const nodeTree = await fetchNodeAscendants(nodeId);
return fetchDevicesForNodes(nodeTree);
};
@@ -146,7 +146,7 @@ const fetchAllWorkspaceDevices = async (
): Promise<string[]> => {
const deviceIds = await database
.selectFrom('workspace_users as wu')
.fullJoin('account_devices as ad', 'wu.account_id', 'ad.account_id')
.innerJoin('account_devices as ad', 'wu.account_id', 'ad.account_id')
.select('ad.id')
.where('wu.workspace_id', '=', workspaceId)
.execute();
@@ -159,8 +159,8 @@ const fetchAllWorkspaceDevices = async (
const fetchDevicesForNodes = async (nodeIds: string[]): Promise<string[]> => {
const deviceIds = await database
.selectFrom('node_collaborators as nc')
.fullJoin('workspace_users as wu', 'nc.collaborator_id', 'wu.id')
.fullJoin('account_devices as ad', 'wu.account_id', 'ad.account_id')
.innerJoin('workspace_users as wu', 'nc.collaborator_id', 'wu.id')
.innerJoin('account_devices as ad', 'wu.account_id', 'ad.account_id')
.select('ad.id')
.where('nc.node_id', 'in', nodeIds)
.execute();

server/src/queues/sync.ts
View File

@@ -0,0 +1,407 @@
import { database } from '@/data/database';
import { CHANNEL_NAMES, redis } from '@/data/redis';
import { generateId, IdType } from '@/lib/id';
import { ServerChange, ServerNodeBatchSyncData } from '@/types/sync';
import { Job, Queue, Worker } from 'bullmq';
import { NodeTypes } from '@/lib/constants';
const SYNC_BATCH_SIZE = 100;
const REDIS_HOST = process.env.REDIS_HOST;
const REDIS_PASSWORD = process.env.REDIS_PASSWORD;
const REDIS_PORT = process.env.REDIS_PORT;
const REDIS_DB = process.env.REDIS_DB;
if (!REDIS_HOST || !REDIS_PASSWORD || !REDIS_PORT || !REDIS_DB) {
throw new Error('Redis configuration is missing');
}
export const queue = new Queue('sync', {
connection: {
host: REDIS_HOST,
password: REDIS_PASSWORD,
port: parseInt(REDIS_PORT),
db: parseInt(REDIS_DB),
},
defaultJobOptions: {
removeOnComplete: true,
},
});
export const enqueueCollaboratorSync = async (
nodeId: string,
collaboratorId: string,
): Promise<void> => {
await queue.add('collaborator-sync', { nodeId, collaboratorId });
};
export const enqueueWorkspaceUserSync = async (
workspaceId: string,
userId: string,
): Promise<void> => {
await queue.add('workspace-user-sync', { workspaceId, userId });
};
export const enqueueAccountDeviceSync = async (
accountId: string,
deviceId: string,
): Promise<void> => {
await queue.add('account-device-sync', { accountId, deviceId });
};
export const initSyncWorker = () => {
return new Worker('sync', handleSyncJob, {
connection: {
host: REDIS_HOST,
password: REDIS_PASSWORD,
port: parseInt(REDIS_PORT),
db: parseInt(REDIS_DB),
},
});
};
const handleSyncJob = async (job: Job) => {
switch (job.name) {
case 'collaborator-sync':
return handleCollaboratorSync(job.data);
case 'workspace-user-sync':
return handleWorkspaceUserSync(job.data);
case 'account-device-sync':
return handleAccountDeviceSync(job.data);
}
};
const handleCollaboratorSync = async (data: {
nodeId: string;
collaboratorId: string;
}) => {
const { nodeId, collaboratorId } = data;
const collaboratorNode = await database
.selectFrom('nodes')
.where('id', '=', nodeId)
.selectAll()
.executeTakeFirst();
if (!collaboratorNode) {
return;
}
const workspaceUser = await database
.selectFrom('workspace_users')
.where('id', '=', collaboratorId)
.selectAll()
.executeTakeFirst();
if (!workspaceUser) {
return;
}
const accountDevices = await database
.selectFrom('account_devices')
.where('account_id', '=', workspaceUser.account_id)
.selectAll()
.execute();
const deviceIds = accountDevices.map((device) => device.id);
if (deviceIds.length === 0) {
return;
}
await processDescendantsSync(
collaboratorNode.workspace_id,
[collaboratorNode.id],
deviceIds,
);
};
const handleWorkspaceUserSync = async (data: {
workspaceId: string;
userId: string;
}) => {
const { workspaceId, userId } = data;
const userNode = await database
.selectFrom('nodes')
.where('id', '=', userId)
.selectAll()
.executeTakeFirst();
if (!userNode) {
return;
}
const workspaceUser = await database
.selectFrom('workspace_users')
.where('id', '=', userId)
.selectAll()
.executeTakeFirst();
if (!workspaceUser) {
return;
}
const accountDevices = await database
.selectFrom('account_devices')
.where('account_id', '=', workspaceUser.account_id)
.selectAll()
.execute();
const deviceIds = accountDevices.map((device) => device.id);
if (deviceIds.length === 0) {
return;
}
const nodesWithAccess = await database
.selectFrom('node_collaborators')
.where('collaborator_id', '=', userNode.id)
.select('node_id')
.execute();
const nodeIds = nodesWithAccess.map((node) => node.node_id);
if (nodeIds.length === 0) {
return;
}
await processUsersSync(workspaceId, deviceIds);
await processDescendantsSync(workspaceId, nodeIds, deviceIds);
};
const handleAccountDeviceSync = async (data: {
accountId: string;
deviceId: string;
}) => {
const { accountId, deviceId } = data;
const workspaceUsers = await database
.selectFrom('workspace_users')
.where('account_id', '=', accountId)
.selectAll()
.execute();
for (const workspaceUser of workspaceUsers) {
await processUsersSync(workspaceUser.workspace_id, [deviceId]);
const nodesWithAccess = await database
.selectFrom('node_collaborators')
.where('collaborator_id', '=', workspaceUser.id)
.select('node_id')
.execute();
const nodeIds = nodesWithAccess.map((node) => node.node_id);
if (nodeIds.length === 0) {
continue;
}
await processDescendantsSync(workspaceUser.workspace_id, nodeIds, [
deviceId,
]);
}
};
const processUsersSync = async (workspaceId: string, deviceIds: string[]) => {
let lastProcessedId = '';
let hasMoreUsers = true;
while (hasMoreUsers) {
const users = await database
.selectFrom('nodes')
.select([
'id',
'state',
'created_at',
'created_by',
'version_id',
'updated_at',
'updated_by',
'server_created_at',
'server_updated_at',
])
.where((eb) =>
eb.and([
eb('workspace_id', '=', workspaceId),
eb('type', '=', NodeTypes.User),
eb('id', '>', lastProcessedId),
]),
)
.orderBy('id', 'asc')
.limit(SYNC_BATCH_SIZE)
.execute();
if (users.length === 0) {
hasMoreUsers = false;
break;
}
lastProcessedId = users[users.length - 1].id;
const nodeSyncData: ServerNodeBatchSyncData[] = [];
for (const user of users) {
nodeSyncData.push({
id: user.id,
state: user.state,
createdAt: user.created_at.toISOString(),
createdBy: user.created_by,
versionId: user.version_id,
updatedAt: user.updated_at?.toISOString(),
updatedBy: user.updated_by,
serverCreatedAt: user.server_created_at.toISOString(),
serverUpdatedAt: user.server_updated_at?.toISOString(),
});
}
if (nodeSyncData.length === 0) {
continue;
}
const change: ServerChange = {
id: generateId(IdType.Change),
workspaceId,
data: {
type: 'node_batch_sync',
nodes: nodeSyncData,
},
createdAt: new Date().toISOString(),
};
await database.transaction().execute(async (tx) => {
await tx
.insertInto('changes')
.values({
id: change.id,
workspace_id: change.workspaceId,
created_at: new Date(change.createdAt),
data: JSON.stringify(change.data),
})
.execute();
await tx
.insertInto('change_devices')
.values(
deviceIds.map((deviceId) => ({
change_id: change.id,
device_id: deviceId,
retry_count: 0,
})),
)
.execute();
});
await redis.publish(
CHANNEL_NAMES.CHANGES,
JSON.stringify({
changeId: change.id,
deviceIds,
}),
);
if (users.length < SYNC_BATCH_SIZE) {
hasMoreUsers = false;
}
}
};
const processDescendantsSync = async (
workspaceId: string,
nodeIds: string[],
deviceIds: string[],
) => {
let lastProcessedId = '';
let hasMoreDescendants = true;
while (hasMoreDescendants) {
const descendantNodes = await database
.selectFrom('node_paths as np')
.innerJoin('nodes as n', 'np.descendant_id', 'n.id')
.select([
'n.id',
'n.state',
'n.created_at',
'n.created_by',
'n.version_id',
'n.updated_at',
'n.updated_by',
'n.server_created_at',
'n.server_updated_at',
])
.where((eb) =>
eb.and([
eb('np.ancestor_id', 'in', nodeIds),
eb('np.descendant_id', '>', lastProcessedId),
]),
)
.orderBy('np.descendant_id', 'asc')
.limit(SYNC_BATCH_SIZE)
.execute();
if (descendantNodes.length === 0) {
hasMoreDescendants = false;
break;
}
const nodeSyncData: ServerNodeBatchSyncData[] = [];
for (const node of descendantNodes) {
nodeSyncData.push({
id: node.id,
state: node.state,
createdAt: node.created_at.toISOString(),
createdBy: node.created_by,
versionId: node.version_id,
updatedAt: node.updated_at?.toISOString(),
updatedBy: node.updated_by,
serverCreatedAt: node.server_created_at.toISOString(),
serverUpdatedAt: node.server_updated_at?.toISOString(),
});
}
lastProcessedId = descendantNodes[descendantNodes.length - 1].id;
if (nodeSyncData.length === 0) {
continue;
}
const change: ServerChange = {
id: generateId(IdType.Change),
workspaceId,
data: {
type: 'node_batch_sync',
nodes: nodeSyncData,
},
createdAt: new Date().toISOString(),
};
await database.transaction().execute(async (tx) => {
await tx
.insertInto('changes')
.values({
id: change.id,
workspace_id: change.workspaceId,
created_at: new Date(change.createdAt),
data: JSON.stringify(change.data),
})
.execute();
await tx
.insertInto('change_devices')
.values(
deviceIds.map((deviceId) => ({
change_id: change.id,
device_id: deviceId,
retry_count: 0,
})),
)
.execute();
});
await redis.publish(
CHANNEL_NAMES.CHANGES,
JSON.stringify({
changeId: change.id,
deviceIds,
}),
);
if (descendantNodes.length < SYNC_BATCH_SIZE) {
hasMoreDescendants = false;
}
}
};
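The worker started by initSyncWorker is presumably wired up during server bootstrap; that file is not part of the hunks shown here, so the snippet below is only a sketch of how it could be started and observed:

import { initSyncWorker } from '@/queues/sync';

// Start the BullMQ worker that processes collaborator, workspace-user,
// and account-device sync jobs, and log any job failures.
const syncWorker = initSyncWorker();
syncWorker.on('failed', (job, error) => {
  console.error(`sync job ${job?.name ?? 'unknown'} failed`, error);
});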

View File

@@ -16,6 +16,7 @@ import { WorkspaceOutput, WorkspaceRole } from '@/types/workspaces';
import { authMiddleware } from '@/middlewares/auth';
import { generateToken } from '@/lib/tokens';
import { mapNode } from '@/lib/nodes';
import { enqueueAccountDeviceSync } from '@/queues/sync';
const GoogleUserInfoUrl = 'https://www.googleapis.com/oauth2/v1/userinfo';
const SaltRounds = 10;
@@ -301,6 +302,8 @@ const buildLoginOutput = async (
throw new Error('Failed to create account device.');
}
await enqueueAccountDeviceSync(id, deviceId);
return {
account: {
token,

View File

@@ -30,6 +30,7 @@ import {
ServerNodeUpdateChangeData,
} from '@/types/sync';
import { enqueueChange, enqueueChanges } from '@/queues/changes';
import { enqueueWorkspaceUserSync } from '@/queues/sync';
export const workspacesRouter = Router();
@@ -85,17 +86,6 @@ workspacesRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
const userAttributes = JSON.stringify(userAttributesMap.toJSON());
const userState = fromUint8Array(Y.encodeStateAsUpdate(userDoc));
const userChangeId = generateId(IdType.Change);
const changeData: ServerNodeCreateChangeData = {
type: 'node_create',
id: userId,
workspaceId: workspaceId,
state: userState,
createdAt: createdAt.toISOString(),
serverCreatedAt: createdAt.toISOString(),
versionId: userVersionId,
createdBy: account.id,
};
await database.transaction().execute(async (trx) => {
await trx
@@ -139,19 +129,9 @@ workspacesRouter.post('/', async (req: NeuronRequest, res: NeuronResponse) => {
server_created_at: createdAt,
})
.execute();
await trx
.insertInto('changes')
.values({
id: userChangeId,
workspace_id: workspaceId,
data: JSON.stringify(changeData),
created_at: createdAt,
})
.execute();
});
await enqueueChange(userChangeId);
await enqueueWorkspaceUserSync(workspaceId, userId);
const output: WorkspaceOutput = {
id: workspaceId,
@@ -683,7 +663,6 @@ workspacesRouter.post(
const changeData: ServerNodeCreateChangeData = {
type: 'node_create',
id: user.id,
workspaceId: user.workspaceId,
state: user.state,
createdAt: user.createdAt.toISOString(),
createdBy: user.createdBy,
@@ -716,6 +695,13 @@ workspacesRouter.post(
.insertInto('workspace_users')
.values(workspaceUsersToCreate)
.execute();
for (const workspaceUser of workspaceUsersToCreate) {
await enqueueWorkspaceUserSync(
workspaceUser.workspace_id,
workspaceUser.id,
);
}
}
if (usersToCreate.length > 0) {
@@ -855,7 +841,6 @@ workspacesRouter.put(
const changeData: ServerNodeUpdateChangeData = {
type: 'node_update',
id: userNode.id,
workspaceId: userNode.workspaceId,
updates: userUpdates,
updatedAt: updatedAt.toISOString(),
updatedBy: currentWorkspaceUser.id,

View File

@@ -81,7 +81,7 @@ export class SocketConnection {
private async sendPendingChanges() {
const changes = await database
.selectFrom('changes as c')
.fullJoin('change_devices as cd', 'c.id', 'cd.change_id')
.innerJoin('change_devices as cd', 'c.id', 'cd.change_id')
.select([
'c.id',
'c.workspace_id',

View File

@@ -8,6 +8,7 @@ import { SelectWorkspaceUser } from '@/data/schema';
import { generateId, IdType } from '@/lib/id';
import { fetchCollaboratorRole } from '@/lib/nodes';
import { enqueueChange } from '@/queues/changes';
import { enqueueCollaboratorSync } from '@/queues/sync';
import {
SyncLocalChangeResult,
LocalChange,
@@ -68,7 +69,6 @@ const handleCreateNodeCollaboratorChange = async (
role: nodeCollaboratorData.role,
createdAt: nodeCollaboratorData.created_at,
serverCreatedAt: serverCreatedAt.toISOString(),
workspaceId: workspaceUser.workspace_id,
createdBy: nodeCollaboratorData.created_by,
versionId: nodeCollaboratorData.version_id,
};
@@ -105,6 +105,10 @@ const handleCreateNodeCollaboratorChange = async (
});
await enqueueChange(changeId);
await enqueueCollaboratorSync(
nodeCollaboratorData.node_id,
nodeCollaboratorData.collaborator_id,
);
return {
status: 'success',
@@ -235,7 +239,6 @@ const handleUpdateNodeCollaboratorChange = async (
collaboratorId: nodeCollaboratorData.collaborator_id,
role: nodeCollaboratorData.role,
serverUpdatedAt: serverUpdatedAt.toISOString(),
workspaceId: workspaceUser.workspace_id,
versionId: nodeCollaboratorData.version_id,
updatedAt: updatedAt.toISOString(),
updatedBy:
@@ -375,7 +378,6 @@ const handleDeleteNodeCollaboratorChange = async (
type: 'node_collaborator_delete',
nodeId: nodeCollaboratorData.node_id,
collaboratorId: nodeCollaboratorData.collaborator_id,
workspaceId: workspaceUser.workspace_id,
};
await database.transaction().execute(async (trx) => {

View File

@@ -69,7 +69,6 @@ const handleCreateNodeReactionChange = async (
reaction: nodeReactionData.reaction,
createdAt: nodeReactionData.created_at,
serverCreatedAt: serverCreatedAt.toISOString(),
workspaceId: workspaceUser.workspace_id,
};
await database.transaction().execute(async (trx) => {
@@ -134,7 +133,6 @@ const handleDeleteNodeReactionChange = async (
nodeId: nodeReactionData.node_id,
actorId: nodeReactionData.actor_id,
reaction: nodeReactionData.reaction,
workspaceId: workspaceUser.workspace_id,
};
await database.transaction().execute(async (trx) => {

View File

@@ -81,7 +81,6 @@ const handleCreateNodeChange = async (
const changeData: ServerNodeCreateChangeData = {
type: 'node_create',
id: nodeData.id,
workspaceId: workspaceUser.workspace_id,
state: nodeData.state,
createdAt: nodeData.created_at,
createdBy: nodeData.created_by,
@@ -186,7 +185,6 @@ const handleUpdateNodeChange = async (
const changeData: ServerNodeUpdateChangeData = {
type: 'node_update',
id: nodeData.id,
workspaceId: workspaceUser.workspace_id,
updates: updates,
updatedAt: updatedAt.toISOString(),
updatedBy: updatedBy,
@@ -270,7 +268,6 @@ const handleDeleteNodeChange = async (
const changeData: ServerNodeDeleteChangeData = {
type: 'node_delete',
id: nodeData.id,
workspaceId: workspaceUser.workspace_id,
};
await database.transaction().execute(async (trx) => {

View File

@@ -77,12 +77,12 @@ export type ServerChangeData =
| ServerNodeCollaboratorUpdateChangeData
| ServerNodeCollaboratorDeleteChangeData
| ServerNodeReactionCreateChangeData
| ServerNodeReactionDeleteChangeData;
| ServerNodeReactionDeleteChangeData
| ServerNodeBatchSyncChangeData;
export type ServerNodeCreateChangeData = {
type: 'node_create';
id: string;
workspaceId: string;
state: string;
createdAt: string;
createdBy: string;
@@ -93,7 +93,6 @@ export type ServerNodeCreateChangeData = {
export type ServerNodeUpdateChangeData = {
type: 'node_update';
id: string;
workspaceId: string;
updates: string[];
updatedAt: string;
updatedBy: string;
@@ -104,7 +103,6 @@ export type ServerNodeUpdateChangeData = {
export type ServerNodeDeleteChangeData = {
type: 'node_delete';
id: string;
workspaceId: string;
};
export type ServerNodeCollaboratorCreateChangeData = {
@@ -112,7 +110,6 @@ export type ServerNodeCollaboratorCreateChangeData = {
nodeId: string;
collaboratorId: string;
role: string;
workspaceId: string;
createdAt: string;
createdBy: string;
versionId: string;
@@ -123,7 +120,6 @@ export type ServerNodeCollaboratorUpdateChangeData = {
type: 'node_collaborator_update';
nodeId: string;
collaboratorId: string;
workspaceId: string;
role: string;
updatedAt: string;
updatedBy: string;
@@ -135,7 +131,6 @@ export type ServerNodeCollaboratorDeleteChangeData = {
type: 'node_collaborator_delete';
nodeId: string;
collaboratorId: string;
workspaceId: string;
};
export type ServerNodeReactionCreateChangeData = {
@@ -143,7 +138,6 @@ export type ServerNodeReactionCreateChangeData = {
nodeId: string;
actorId: string;
reaction: string;
workspaceId: string;
createdAt: string;
serverCreatedAt: string;
};
@@ -153,5 +147,21 @@ export type ServerNodeReactionDeleteChangeData = {
nodeId: string;
actorId: string;
reaction: string;
workspaceId: string;
};
export type ServerNodeBatchSyncChangeData = {
type: 'node_batch_sync';
nodes: ServerNodeBatchSyncData[];
};
export type ServerNodeBatchSyncData = {
id: string;
state: string;
createdAt: string;
createdBy: string;
versionId: string;
updatedAt?: string | null;
updatedBy?: string | null;
serverCreatedAt: string;
serverUpdatedAt?: string | null;
};