From b3590a70eded39958914df5aee4882d4968ce45c Mon Sep 17 00:00:00 2001 From: Hakan Shehu Date: Mon, 23 Jun 2025 21:45:52 +0200 Subject: [PATCH] Implement pre-aggregated counters for users, nodes and upload storage used (#82) --- .../routes/workspaces/files/file-upload.ts | 15 +-- .../migrations/00022-create-counters-table.ts | 16 +++ ...-create-workspace-user-counter-triggers.ts | 80 ++++++++++++++ ...-create-workspace-node-counter-triggers.ts | 77 +++++++++++++ ...reate-workspace-upload-counter-triggers.ts | 104 ++++++++++++++++++ ...026-create-user-upload-counter-triggers.ts | 104 ++++++++++++++++++ apps/server/src/data/migrations/index.ts | 13 +++ apps/server/src/data/schema.ts | 8 ++ apps/server/src/lib/counters.ts | 18 +++ 9 files changed, 426 insertions(+), 9 deletions(-) create mode 100644 apps/server/src/data/migrations/00022-create-counters-table.ts create mode 100644 apps/server/src/data/migrations/00023-create-workspace-user-counter-triggers.ts create mode 100644 apps/server/src/data/migrations/00024-create-workspace-node-counter-triggers.ts create mode 100644 apps/server/src/data/migrations/00025-create-workspace-upload-counter-triggers.ts create mode 100644 apps/server/src/data/migrations/00026-create-user-upload-counter-triggers.ts create mode 100644 apps/server/src/lib/counters.ts diff --git a/apps/server/src/api/client/routes/workspaces/files/file-upload.ts b/apps/server/src/api/client/routes/workspaces/files/file-upload.ts index e4232e3b..b2cf0894 100644 --- a/apps/server/src/api/client/routes/workspaces/files/file-upload.ts +++ b/apps/server/src/api/client/routes/workspaces/files/file-upload.ts @@ -13,6 +13,7 @@ import { import { database } from '@colanode/server/data/database'; import { s3Client } from '@colanode/server/data/storage'; import { config } from '@colanode/server/lib/config'; +import { fetchCounter } from '@colanode/server/lib/counters'; import { buildFilePath } from '@colanode/server/lib/files'; import { mapNode, updateNode } from '@colanode/server/lib/nodes'; @@ -87,16 +88,12 @@ export const fileUploadRoute: FastifyPluginCallbackZod = ( }); } - const storageUsedRow = await database - .selectFrom('uploads') - .select(({ fn }) => [fn.sum('size').as('storage_used')]) - .where('created_by', '=', request.user.id) - .executeTakeFirst(); + const storageUsed = await fetchCounter( + database, + `${user.id}.storage.used` + ); - const storageUsed = BigInt(storageUsedRow?.storage_used ?? 0); - const storageLimit = BigInt(user.storage_limit); - - if (storageUsed >= storageLimit) { + if (storageUsed >= BigInt(user.storage_limit)) { return reply.code(400).send({ code: ApiErrorCode.FileUploadInitFailed, message: 'You have reached the maximum storage limit.', diff --git a/apps/server/src/data/migrations/00022-create-counters-table.ts b/apps/server/src/data/migrations/00022-create-counters-table.ts new file mode 100644 index 00000000..dd7355b7 --- /dev/null +++ b/apps/server/src/data/migrations/00022-create-counters-table.ts @@ -0,0 +1,16 @@ +import { Migration } from 'kysely'; + +export const createCountersTable: Migration = { + up: async (db) => { + await db.schema + .createTable('counters') + .addColumn('key', 'varchar(500)', (col) => col.notNull().primaryKey()) + .addColumn('value', 'bigint', (col) => col.notNull().defaultTo(0)) + .addColumn('created_at', 'timestamptz', (col) => col.notNull()) + .addColumn('updated_at', 'timestamptz') + .execute(); + }, + down: async (db) => { + await db.schema.dropTable('counters').execute(); + }, +}; diff --git a/apps/server/src/data/migrations/00023-create-workspace-user-counter-triggers.ts b/apps/server/src/data/migrations/00023-create-workspace-user-counter-triggers.ts new file mode 100644 index 00000000..315258cd --- /dev/null +++ b/apps/server/src/data/migrations/00023-create-workspace-user-counter-triggers.ts @@ -0,0 +1,80 @@ +import { Migration, sql } from 'kysely'; + +export const createWorkspaceUserCounterTriggers: Migration = { + up: async (db) => { + await db + .insertInto('counters') + .columns(['key', 'value', 'created_at']) + .expression((eb) => + eb + .selectFrom('users') + .select([ + eb + .fn('concat', [ + eb.cast(eb.val(''), 'varchar'), + eb.ref('workspace_id'), + eb.cast(eb.val('.users.count'), 'varchar'), + ]) + .as('key'), + eb.fn.count('id').as('value'), + eb.val(new Date()).as('created_at'), + ]) + .groupBy('workspace_id') + ) + .execute(); + + // Create trigger function to increment user counter on user insert + await sql` + CREATE OR REPLACE FUNCTION fn_increment_workspace_user_counter() RETURNS TRIGGER AS $$ + BEGIN + INSERT INTO counters (key, value, created_at, updated_at) + VALUES ( + CONCAT(NEW.workspace_id, '.users.count'), + 1, + NOW(), + NOW() + ) + ON CONFLICT (key) + DO UPDATE SET + value = counters.value + 1, + updated_at = NOW(); + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + CREATE TRIGGER trg_increment_workspace_user_counter + AFTER INSERT ON users + FOR EACH ROW + EXECUTE FUNCTION fn_increment_workspace_user_counter(); + `.execute(db); + + // Create trigger function to decrement user counter on user delete + await sql` + CREATE OR REPLACE FUNCTION fn_decrement_workspace_user_counter() RETURNS TRIGGER AS $$ + BEGIN + UPDATE counters + SET + value = GREATEST(0, value - 1), + updated_at = NOW() + WHERE key = CONCAT(OLD.workspace_id, '.users.count'); + + RETURN OLD; + END; + $$ LANGUAGE plpgsql; + + CREATE TRIGGER trg_decrement_workspace_user_counter + AFTER DELETE ON users + FOR EACH ROW + EXECUTE FUNCTION fn_decrement_workspace_user_counter(); + `.execute(db); + }, + down: async (db) => { + await sql` + DROP TRIGGER IF EXISTS trg_increment_workspace_user_counter ON users; + DROP TRIGGER IF EXISTS trg_decrement_workspace_user_counter ON users; + DROP FUNCTION IF EXISTS fn_increment_workspace_user_counter(); + DROP FUNCTION IF EXISTS fn_decrement_workspace_user_counter(); + `.execute(db); + }, +}; diff --git a/apps/server/src/data/migrations/00024-create-workspace-node-counter-triggers.ts b/apps/server/src/data/migrations/00024-create-workspace-node-counter-triggers.ts new file mode 100644 index 00000000..057fde72 --- /dev/null +++ b/apps/server/src/data/migrations/00024-create-workspace-node-counter-triggers.ts @@ -0,0 +1,77 @@ +import { Migration, sql } from 'kysely'; + +export const createWorkspaceNodeCounterTriggers: Migration = { + up: async (db) => { + await db + .insertInto('counters') + .columns(['key', 'value', 'created_at']) + .expression((eb) => + eb + .selectFrom('nodes') + .select([ + eb + .fn('concat', [ + eb.ref('workspace_id'), + eb.cast(eb.val('.nodes.count'), 'varchar'), + ]) + .as('key'), + eb.fn.count('id').as('value'), + eb.val(new Date()).as('created_at'), + ]) + .groupBy('workspace_id') + ) + .execute(); + + await sql` + CREATE OR REPLACE FUNCTION fn_increment_workspace_node_counter() RETURNS TRIGGER AS $$ + BEGIN + INSERT INTO counters (key, value, created_at, updated_at) + VALUES ( + CONCAT(NEW.workspace_id, '.nodes.count'), + 1, + NOW(), + NOW() + ) + ON CONFLICT (key) + DO UPDATE SET + value = counters.value + 1, + updated_at = NOW(); + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + CREATE TRIGGER trg_increment_workspace_node_counter + AFTER INSERT ON nodes + FOR EACH ROW + EXECUTE FUNCTION fn_increment_workspace_node_counter(); + `.execute(db); + + await sql` + CREATE OR REPLACE FUNCTION fn_decrement_workspace_node_counter() RETURNS TRIGGER AS $$ + BEGIN + UPDATE counters + SET + value = GREATEST(0, value - 1), + updated_at = NOW() + WHERE key = CONCAT(OLD.workspace_id, '.nodes.count'); + + RETURN OLD; + END; + $$ LANGUAGE plpgsql; + + CREATE TRIGGER trg_decrement_workspace_node_counter + AFTER DELETE ON nodes + FOR EACH ROW + EXECUTE FUNCTION fn_decrement_workspace_node_counter(); + `.execute(db); + }, + down: async (db) => { + await sql` + DROP TRIGGER IF EXISTS trg_increment_workspace_node_counter ON nodes; + DROP TRIGGER IF EXISTS trg_decrement_workspace_node_counter ON nodes; + DROP FUNCTION IF EXISTS fn_increment_workspace_node_counter(); + DROP FUNCTION IF EXISTS fn_decrement_workspace_node_counter(); + `.execute(db); + }, +}; diff --git a/apps/server/src/data/migrations/00025-create-workspace-upload-counter-triggers.ts b/apps/server/src/data/migrations/00025-create-workspace-upload-counter-triggers.ts new file mode 100644 index 00000000..f26e84e8 --- /dev/null +++ b/apps/server/src/data/migrations/00025-create-workspace-upload-counter-triggers.ts @@ -0,0 +1,104 @@ +import { Migration, sql } from 'kysely'; + +export const createWorkspaceUploadCounterTriggers: Migration = { + up: async (db) => { + await db + .insertInto('counters') + .columns(['key', 'value', 'created_at']) + .expression((eb) => + eb + .selectFrom('uploads') + .select([ + eb + .fn('concat', [ + eb.ref('workspace_id'), + eb.cast(eb.val('.storage.used'), 'varchar'), + ]) + .as('key'), + eb.fn.sum('size').as('value'), + eb.val(new Date()).as('created_at'), + ]) + .groupBy('workspace_id') + ) + .execute(); + + await sql` + CREATE OR REPLACE FUNCTION fn_increment_workspace_storage_counter() RETURNS TRIGGER AS $$ + BEGIN + INSERT INTO counters (key, value, created_at, updated_at) + VALUES ( + CONCAT(NEW.workspace_id, '.storage.used'), + NEW.size, + NOW(), + NOW() + ) + ON CONFLICT (key) + DO UPDATE SET + value = counters.value + NEW.size, + updated_at = NOW(); + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + CREATE TRIGGER trg_increment_workspace_storage_counter + AFTER INSERT ON uploads + FOR EACH ROW + EXECUTE FUNCTION fn_increment_workspace_storage_counter(); + `.execute(db); + + await sql` + CREATE OR REPLACE FUNCTION fn_decrement_workspace_storage_counter() RETURNS TRIGGER AS $$ + BEGIN + UPDATE counters + SET + value = GREATEST(0, value - OLD.size), + updated_at = NOW() + WHERE key = CONCAT(OLD.workspace_id, '.storage.used'); + + RETURN OLD; + END; + $$ LANGUAGE plpgsql; + + CREATE TRIGGER trg_decrement_workspace_storage_counter + AFTER DELETE ON uploads + FOR EACH ROW + EXECUTE FUNCTION fn_decrement_workspace_storage_counter(); + `.execute(db); + + await sql` + CREATE OR REPLACE FUNCTION fn_update_workspace_storage_counter() RETURNS TRIGGER AS $$ + DECLARE + size_difference BIGINT; + BEGIN + IF OLD.size IS DISTINCT FROM NEW.size THEN + size_difference := NEW.size - OLD.size; + + UPDATE counters + SET + value = GREATEST(0, value + size_difference), + updated_at = NOW() + WHERE key = CONCAT(NEW.workspace_id, '.storage.used'); + END IF; + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + CREATE TRIGGER trg_update_workspace_storage_counter + AFTER UPDATE ON uploads + FOR EACH ROW + EXECUTE FUNCTION fn_update_workspace_storage_counter(); + `.execute(db); + }, + down: async (db) => { + await sql` + DROP TRIGGER IF EXISTS trg_increment_workspace_storage_counter ON uploads; + DROP TRIGGER IF EXISTS trg_decrement_workspace_storage_counter ON uploads; + DROP TRIGGER IF EXISTS trg_update_workspace_storage_counter ON uploads; + DROP FUNCTION IF EXISTS fn_increment_workspace_storage_counter(); + DROP FUNCTION IF EXISTS fn_decrement_workspace_storage_counter(); + DROP FUNCTION IF EXISTS fn_update_workspace_storage_counter(); + `.execute(db); + }, +}; diff --git a/apps/server/src/data/migrations/00026-create-user-upload-counter-triggers.ts b/apps/server/src/data/migrations/00026-create-user-upload-counter-triggers.ts new file mode 100644 index 00000000..1abc2d7d --- /dev/null +++ b/apps/server/src/data/migrations/00026-create-user-upload-counter-triggers.ts @@ -0,0 +1,104 @@ +import { Migration, sql } from 'kysely'; + +export const createUserUploadCounterTriggers: Migration = { + up: async (db) => { + await db + .insertInto('counters') + .columns(['key', 'value', 'created_at']) + .expression((eb) => + eb + .selectFrom('uploads') + .select([ + eb + .fn('concat', [ + eb.ref('created_by'), + eb.cast(eb.val('.storage.used'), 'varchar'), + ]) + .as('key'), + eb.fn.sum('size').as('value'), + eb.val(new Date()).as('created_at'), + ]) + .groupBy('created_by') + ) + .execute(); + + await sql` + CREATE OR REPLACE FUNCTION fn_increment_user_storage_counter() RETURNS TRIGGER AS $$ + BEGIN + INSERT INTO counters (key, value, created_at, updated_at) + VALUES ( + CONCAT(NEW.created_by, '.storage.used'), + NEW.size, + NOW(), + NOW() + ) + ON CONFLICT (key) + DO UPDATE SET + value = counters.value + NEW.size, + updated_at = NOW(); + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + CREATE TRIGGER trg_increment_user_storage_counter + AFTER INSERT ON uploads + FOR EACH ROW + EXECUTE FUNCTION fn_increment_user_storage_counter(); + `.execute(db); + + await sql` + CREATE OR REPLACE FUNCTION fn_decrement_user_storage_counter() RETURNS TRIGGER AS $$ + BEGIN + UPDATE counters + SET + value = GREATEST(0, value - OLD.size), + updated_at = NOW() + WHERE key = CONCAT(OLD.created_by, '.storage.used'); + + RETURN OLD; + END; + $$ LANGUAGE plpgsql; + + CREATE TRIGGER trg_decrement_user_storage_counter + AFTER DELETE ON uploads + FOR EACH ROW + EXECUTE FUNCTION fn_decrement_user_storage_counter(); + `.execute(db); + + await sql` + CREATE OR REPLACE FUNCTION fn_update_user_storage_counter() RETURNS TRIGGER AS $$ + DECLARE + size_difference BIGINT; + BEGIN + IF OLD.size IS DISTINCT FROM NEW.size THEN + size_difference := NEW.size - OLD.size; + + UPDATE counters + SET + value = GREATEST(0, value + size_difference), + updated_at = NOW() + WHERE key = CONCAT(NEW.created_by, '.storage.used'); + END IF; + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + CREATE TRIGGER trg_update_user_storage_counter + AFTER UPDATE ON uploads + FOR EACH ROW + EXECUTE FUNCTION fn_update_user_storage_counter(); + `.execute(db); + }, + down: async (db) => { + await sql` + DROP TRIGGER IF EXISTS trg_increment_user_storage_counter ON uploads; + DROP TRIGGER IF EXISTS trg_decrement_user_storage_counter ON uploads; + DROP TRIGGER IF EXISTS trg_update_user_storage_counter ON uploads; + DROP FUNCTION IF EXISTS fn_increment_user_storage_counter(); + DROP FUNCTION IF EXISTS fn_decrement_user_storage_counter(); + DROP FUNCTION IF EXISTS fn_update_user_storage_counter(); + `.execute(db); + }, +}; diff --git a/apps/server/src/data/migrations/index.ts b/apps/server/src/data/migrations/index.ts index 04823b85..76c2e146 100644 --- a/apps/server/src/data/migrations/index.ts +++ b/apps/server/src/data/migrations/index.ts @@ -21,6 +21,11 @@ import { createNodeEmbeddingsTable } from './00018-create-node-embeddings-table' import { createDocumentEmbeddingsTable } from './00019-create-document-embeddings-table'; import { alterDevicesPlatformColumn } from './00020-alter-devices-platform-column'; import { renameAccountAttributesColumn } from './00021-rename-account-attributes-column'; +import { createCountersTable } from './00022-create-counters-table'; +import { createWorkspaceUserCounterTriggers } from './00023-create-workspace-user-counter-triggers'; +import { createWorkspaceNodeCounterTriggers } from './00024-create-workspace-node-counter-triggers'; +import { createWorkspaceUploadCounterTriggers } from './00025-create-workspace-upload-counter-triggers'; +import { createUserUploadCounterTriggers } from './00026-create-user-upload-counter-triggers'; export const databaseMigrations: Record = { '00001_create_accounts_table': createAccountsTable, @@ -44,4 +49,12 @@ export const databaseMigrations: Record = { '00019_create_document_embeddings_table': createDocumentEmbeddingsTable, '00020_alter_devices_platform_column': alterDevicesPlatformColumn, '00021_rename_account_attributes_column': renameAccountAttributesColumn, + '00022_create_counters_table': createCountersTable, + '00023_create_workspace_user_counter_triggers': + createWorkspaceUserCounterTriggers, + '00024_create_workspace_node_counter_triggers': + createWorkspaceNodeCounterTriggers, + '00025_create_workspace_upload_counter_triggers': + createWorkspaceUploadCounterTriggers, + '00026_create_user_upload_counter_triggers': createUserUploadCounterTriggers, }; diff --git a/apps/server/src/data/schema.ts b/apps/server/src/data/schema.ts index 172ac76d..9d4588d1 100644 --- a/apps/server/src/data/schema.ts +++ b/apps/server/src/data/schema.ts @@ -298,6 +298,13 @@ export type SelectDocumentEmbedding = Selectable; export type CreateDocumentEmbedding = Insertable; export type UpdateDocumentEmbedding = Updateable; +interface CounterTable { + key: ColumnType; + value: ColumnType; + created_at: ColumnType; + updated_at: ColumnType; +} + export interface DatabaseSchema { accounts: AccountTable; devices: DeviceTable; @@ -315,4 +322,5 @@ export interface DatabaseSchema { uploads: UploadTable; node_embeddings: NodeEmbeddingTable; document_embeddings: DocumentEmbeddingTable; + counters: CounterTable; } diff --git a/apps/server/src/lib/counters.ts b/apps/server/src/lib/counters.ts new file mode 100644 index 00000000..a81f80b7 --- /dev/null +++ b/apps/server/src/lib/counters.ts @@ -0,0 +1,18 @@ +import { Kysely, Transaction } from 'kysely'; + +import { DatabaseSchema } from '@colanode/server/data/schema'; + +export type CounterKey = `${string}.storage.used` | `${string}.nodes.count`; + +export const fetchCounter = async ( + database: Kysely | Transaction, + key: CounterKey +) => { + const counter = await database + .selectFrom('counters') + .selectAll() + .where('key', '=', key) + .executeTakeFirst(); + + return counter?.value ? BigInt(counter.value) : BigInt(0); +};