Move cursors inside workspace database

This commit is contained in:
Hakan Shehu
2024-12-04 14:25:59 +01:00
parent 7aaf8acf0b
commit 0929e3a4b9
5 changed files with 88 additions and 169 deletions

View File

@@ -80,32 +80,6 @@ const createWorkspacesTable: Migration = {
},
};
const createWorkspaceCursorsTable: Migration = {
up: async (db) => {
await db.schema
.createTable('workspace_cursors')
.addColumn('user_id', 'text', (col) =>
col
.notNull()
.primaryKey()
.references('workspaces.user_id')
.onDelete('cascade')
)
.addColumn('transactions', 'integer', (col) => col.notNull().defaultTo(0))
.addColumn('collaborations', 'integer', (col) =>
col.notNull().defaultTo(0)
)
.addColumn('interactions', 'integer', (col) => col.notNull().defaultTo(0))
.addColumn('revocations', 'integer', (col) => col.notNull().defaultTo(0))
.addColumn('created_at', 'text', (col) => col.notNull())
.addColumn('updated_at', 'text')
.execute();
},
down: async (db) => {
await db.schema.dropTable('workspace_cursors').execute();
},
};
const createDeletedTokensTable: Migration = {
up: async (db) => {
await db.schema
@@ -125,6 +99,5 @@ export const appDatabaseMigrations: Record<string, Migration> = {
'00001_create_servers_table': createServersTable,
'00002_create_accounts_table': createAccountsTable,
'00003_create_workspaces_table': createWorkspacesTable,
'00004_create_workspace_cursors_table': createWorkspaceCursorsTable,
'00005_create_deleted_tokens_table': createDeletedTokensTable,
};

View File

@@ -45,20 +45,6 @@ export type SelectWorkspace = Selectable<WorkspaceTable>;
export type CreateWorkspace = Insertable<WorkspaceTable>;
export type UpdateWorkspace = Updateable<WorkspaceTable>;
interface WorkspaceCursorTable {
user_id: ColumnType<string, string, never>;
transactions: ColumnType<bigint, bigint | null, bigint>;
collaborations: ColumnType<bigint, bigint | null, bigint>;
interactions: ColumnType<bigint, bigint | null, bigint>;
revocations: ColumnType<bigint, bigint | null, bigint>;
created_at: ColumnType<string, string, string>;
updated_at: ColumnType<string | null, string | null, string>;
}
export type SelectWorkspaceCursor = Selectable<WorkspaceCursorTable>;
export type CreateWorkspaceCursor = Insertable<WorkspaceCursorTable>;
export type UpdateWorkspaceCursor = Updateable<WorkspaceCursorTable>;
interface DeletedTokenTable {
token: ColumnType<string, string, never>;
account_id: ColumnType<string, string, never>;
@@ -70,6 +56,5 @@ export interface AppDatabaseSchema {
servers: ServerTable;
accounts: AccountTable;
workspaces: WorkspaceTable;
workspace_cursors: WorkspaceCursorTable;
deleted_tokens: DeletedTokenTable;
}

View File

@@ -300,6 +300,21 @@ const createNodeDeleteNameTrigger: Migration = {
},
};
const createCursorsTable: Migration = {
up: async (db) => {
await db.schema
.createTable('cursors')
.addColumn('name', 'text', (col) => col.notNull().primaryKey())
.addColumn('value', 'integer', (col) => col.notNull().defaultTo(0))
.addColumn('created_at', 'text', (col) => col.notNull())
.addColumn('updated_at', 'text')
.execute();
},
down: async (db) => {
await db.schema.dropTable('cursors').execute();
},
};
export const workspaceDatabaseMigrations: Record<string, Migration> = {
'00001_create_nodes_table': createNodesTable,
'00002_create_node_transactions_table': createNodeTransactionsTable,
@@ -313,4 +328,5 @@ export const workspaceDatabaseMigrations: Record<string, Migration> = {
'00010_create_node_insert_name_trigger': createNodeInsertNameTrigger,
'00011_create_node_update_name_trigger': createNodeUpdateNameTrigger,
'00012_create_node_delete_name_trigger': createNodeDeleteNameTrigger,
'00013_create_cursors_table': createCursorsTable,
};

View File

@@ -115,6 +115,13 @@ export type SelectInteractionEvent = Selectable<InteractionEventTable>;
export type CreateInteractionEvent = Insertable<InteractionEventTable>;
export type UpdateInteractionEvent = Updateable<InteractionEventTable>;
interface CursorTable {
name: ColumnType<string, string, never>;
value: ColumnType<bigint, bigint, bigint>;
created_at: ColumnType<string, string, never>;
updated_at: ColumnType<string | null, string | null, string | null>;
}
export interface WorkspaceDatabaseSchema {
nodes: NodeTable;
node_transactions: NodeTransactionTable;
@@ -124,4 +131,5 @@ export interface WorkspaceDatabaseSchema {
downloads: DownloadTable;
interactions: InteractionTable;
interaction_events: InteractionEventTable;
cursors: CursorTable;
}

View File

@@ -233,7 +233,7 @@ class SyncService {
}
if (cursor) {
this.updateNodeTransactionCursor(message.userId, cursor);
this.updateCursor(message.userId, 'transactions', cursor);
}
} catch (error) {
this.debug(
@@ -272,7 +272,7 @@ class SyncService {
}
if (cursor) {
this.updateCollaborationCursor(message.userId, cursor);
this.updateCursor(message.userId, 'collaborations', cursor);
}
} catch (error) {
this.debug(
@@ -309,7 +309,7 @@ class SyncService {
}
if (cursor) {
this.updateCollaborationRevocationCursor(message.userId, cursor);
this.updateCursor(message.userId, 'revocations', cursor);
}
} catch (error) {
this.debug(
@@ -348,7 +348,7 @@ class SyncService {
}
if (cursor) {
this.updateInteractionCursor(message.userId, cursor);
this.updateCursor(message.userId, 'interactions', cursor);
}
} catch (error) {
this.debug(
@@ -779,207 +779,144 @@ class SyncService {
private async requireNodeTransactions(userId: string) {
this.debug(`Requiring node transactions for user ${userId}`);
const workspaceWithCursor = await databaseService.appDatabase
.selectFrom('workspaces as w')
.leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id')
.select([
'w.user_id',
'w.workspace_id',
'w.account_id',
'wc.transactions',
])
.where('w.user_id', '=', userId)
const workspace = await databaseService.appDatabase
.selectFrom('workspaces')
.select(['user_id', 'workspace_id', 'account_id'])
.where('user_id', '=', userId)
.executeTakeFirst();
if (!workspaceWithCursor) {
if (!workspace) {
this.debug(
`No workspace found for user ${userId}, skipping requiring node transactions`
);
return;
}
const cursor = await this.fetchCursor(userId, 'transactions');
const message: FetchNodeTransactionsMessage = {
type: 'fetch_node_transactions',
userId: workspaceWithCursor.user_id,
workspaceId: workspaceWithCursor.workspace_id,
cursor: workspaceWithCursor.transactions?.toString() ?? '0',
userId,
workspaceId: workspace.workspace_id,
cursor: cursor.toString(),
};
socketService.sendMessage(workspaceWithCursor.account_id, message);
socketService.sendMessage(workspace.account_id, message);
}
private async requireCollaborations(userId: string) {
this.debug(`Requiring collaborations for user ${userId}`);
const workspaceWithCursor = await databaseService.appDatabase
.selectFrom('workspaces as w')
.leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id')
.select([
'w.user_id',
'w.workspace_id',
'w.account_id',
'wc.collaborations',
])
.where('w.user_id', '=', userId)
const workspace = await databaseService.appDatabase
.selectFrom('workspaces')
.select(['user_id', 'workspace_id', 'account_id'])
.where('user_id', '=', userId)
.executeTakeFirst();
if (!workspaceWithCursor) {
if (!workspace) {
this.debug(
`No workspace found for user ${userId}, skipping requiring collaborations`
);
return;
}
const cursor = await this.fetchCursor(userId, 'collaborations');
const message: FetchCollaborationsMessage = {
type: 'fetch_collaborations',
userId: workspaceWithCursor.user_id,
workspaceId: workspaceWithCursor.workspace_id,
cursor: workspaceWithCursor.collaborations?.toString() ?? '0',
userId: workspace.user_id,
workspaceId: workspace.workspace_id,
cursor: cursor.toString(),
};
socketService.sendMessage(workspaceWithCursor.account_id, message);
socketService.sendMessage(workspace.account_id, message);
}
private async requireCollaborationRevocations(userId: string) {
this.debug(`Requiring collaboration revocations for user ${userId}`);
const workspaceWithCursor = await databaseService.appDatabase
.selectFrom('workspaces as w')
.leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id')
.select(['w.user_id', 'w.workspace_id', 'w.account_id', 'wc.revocations'])
.where('w.user_id', '=', userId)
const workspace = await databaseService.appDatabase
.selectFrom('workspaces')
.select(['user_id', 'workspace_id', 'account_id'])
.where('user_id', '=', userId)
.executeTakeFirst();
if (!workspaceWithCursor) {
if (!workspace) {
this.debug(
`No workspace found for user ${userId}, skipping requiring collaboration revocations`
);
return;
}
const cursor = await this.fetchCursor(userId, 'revocations');
const message: FetchCollaborationRevocationsMessage = {
type: 'fetch_collaboration_revocations',
userId: workspaceWithCursor.user_id,
workspaceId: workspaceWithCursor.workspace_id,
cursor: workspaceWithCursor.revocations?.toString() ?? '0',
userId: workspace.user_id,
workspaceId: workspace.workspace_id,
cursor: cursor.toString(),
};
socketService.sendMessage(workspaceWithCursor.account_id, message);
socketService.sendMessage(workspace.account_id, message);
}
private async requireInteractions(userId: string) {
this.debug(`Requiring interactions for user ${userId}`);
const workspaceWithCursor = await databaseService.appDatabase
.selectFrom('workspaces as w')
.leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id')
.select([
'w.user_id',
'w.workspace_id',
'w.account_id',
'wc.interactions',
])
.where('w.user_id', '=', userId)
const workspace = await databaseService.appDatabase
.selectFrom('workspaces')
.select(['user_id', 'workspace_id', 'account_id'])
.where('user_id', '=', userId)
.executeTakeFirst();
if (!workspaceWithCursor) {
if (!workspace) {
this.debug(
`No workspace found for user ${userId}, skipping requiring interactions`
);
return;
}
const cursor = await this.fetchCursor(userId, 'interactions');
const message: FetchInteractionsMessage = {
type: 'fetch_interactions',
userId: workspaceWithCursor.user_id,
workspaceId: workspaceWithCursor.workspace_id,
cursor: workspaceWithCursor.interactions?.toString() ?? '0',
userId: workspace.user_id,
workspaceId: workspace.workspace_id,
cursor: cursor.toString(),
};
socketService.sendMessage(workspaceWithCursor.account_id, message);
socketService.sendMessage(workspace.account_id, message);
}
private async updateNodeTransactionCursor(userId: string, cursor: bigint) {
this.debug(
`Updating node transaction cursor for user ${userId} to ${cursor}`
);
private async updateCursor(userId: string, name: string, cursor: bigint) {
this.debug(`Updating cursor ${name} for user ${userId} to ${cursor}`);
await databaseService.appDatabase
.insertInto('workspace_cursors')
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
await workspaceDatabase
.insertInto('cursors')
.values({
user_id: userId,
transactions: cursor,
name,
value: cursor,
created_at: new Date().toISOString(),
})
.onConflict((eb) =>
eb.column('user_id').doUpdateSet({
transactions: cursor,
eb.column('name').doUpdateSet({
value: cursor,
updated_at: new Date().toISOString(),
})
)
.execute();
}
private async updateCollaborationCursor(userId: string, cursor: bigint) {
this.debug(`Updating collaboration cursor for user ${userId} to ${cursor}`);
private async fetchCursor(userId: string, name: string): Promise<bigint> {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
await databaseService.appDatabase
.insertInto('workspace_cursors')
.values({
user_id: userId,
collaborations: cursor,
created_at: new Date().toISOString(),
})
.onConflict((eb) =>
eb.column('user_id').doUpdateSet({
collaborations: cursor,
updated_at: new Date().toISOString(),
})
)
.execute();
}
const cursor = await workspaceDatabase
.selectFrom('cursors')
.select('value')
.where('name', '=', name)
.executeTakeFirst();
private async updateCollaborationRevocationCursor(
userId: string,
cursor: bigint
) {
this.debug(
`Updating collaboration revocation cursor for user ${userId} to ${cursor}`
);
await databaseService.appDatabase
.insertInto('workspace_cursors')
.values({
user_id: userId,
revocations: cursor,
created_at: new Date().toISOString(),
})
.onConflict((eb) =>
eb.column('user_id').doUpdateSet({
revocations: cursor,
updated_at: new Date().toISOString(),
})
)
.execute();
}
private async updateInteractionCursor(userId: string, cursor: bigint) {
this.debug(`Updating interaction cursor for user ${userId} to ${cursor}`);
await databaseService.appDatabase
.insertInto('workspace_cursors')
.values({
user_id: userId,
interactions: cursor,
created_at: new Date().toISOString(),
})
.onConflict((eb) =>
eb.column('user_id').doUpdateSet({
interactions: cursor,
updated_at: new Date().toISOString(),
})
)
.execute();
return cursor?.value ?? 0n;
}
}