Implement pre-aggregated counters for users, nodes and upload storage used (#82)

This commit is contained in:
Hakan Shehu
2025-06-23 21:45:52 +02:00
committed by GitHub
parent 3fd7932f9c
commit b3590a70ed
9 changed files with 426 additions and 9 deletions

View File

@@ -13,6 +13,7 @@ import {
import { database } from '@colanode/server/data/database';
import { s3Client } from '@colanode/server/data/storage';
import { config } from '@colanode/server/lib/config';
import { fetchCounter } from '@colanode/server/lib/counters';
import { buildFilePath } from '@colanode/server/lib/files';
import { mapNode, updateNode } from '@colanode/server/lib/nodes';
@@ -87,16 +88,12 @@ export const fileUploadRoute: FastifyPluginCallbackZod = (
});
}
const storageUsedRow = await database
.selectFrom('uploads')
.select(({ fn }) => [fn.sum('size').as('storage_used')])
.where('created_by', '=', request.user.id)
.executeTakeFirst();
const storageUsed = await fetchCounter(
database,
`${user.id}.storage.used`
);
const storageUsed = BigInt(storageUsedRow?.storage_used ?? 0);
const storageLimit = BigInt(user.storage_limit);
if (storageUsed >= storageLimit) {
if (storageUsed >= BigInt(user.storage_limit)) {
return reply.code(400).send({
code: ApiErrorCode.FileUploadInitFailed,
message: 'You have reached the maximum storage limit.',

View File

@@ -0,0 +1,16 @@
import { Migration } from 'kysely';
export const createCountersTable: Migration = {
up: async (db) => {
await db.schema
.createTable('counters')
.addColumn('key', 'varchar(500)', (col) => col.notNull().primaryKey())
.addColumn('value', 'bigint', (col) => col.notNull().defaultTo(0))
.addColumn('created_at', 'timestamptz', (col) => col.notNull())
.addColumn('updated_at', 'timestamptz')
.execute();
},
down: async (db) => {
await db.schema.dropTable('counters').execute();
},
};

View File

@@ -0,0 +1,80 @@
import { Migration, sql } from 'kysely';
export const createWorkspaceUserCounterTriggers: Migration = {
up: async (db) => {
await db
.insertInto('counters')
.columns(['key', 'value', 'created_at'])
.expression((eb) =>
eb
.selectFrom('users')
.select([
eb
.fn('concat', [
eb.cast(eb.val(''), 'varchar'),
eb.ref('workspace_id'),
eb.cast(eb.val('.users.count'), 'varchar'),
])
.as('key'),
eb.fn.count('id').as('value'),
eb.val(new Date()).as('created_at'),
])
.groupBy('workspace_id')
)
.execute();
// Create trigger function to increment user counter on user insert
await sql`
CREATE OR REPLACE FUNCTION fn_increment_workspace_user_counter() RETURNS TRIGGER AS $$
BEGIN
INSERT INTO counters (key, value, created_at, updated_at)
VALUES (
CONCAT(NEW.workspace_id, '.users.count'),
1,
NOW(),
NOW()
)
ON CONFLICT (key)
DO UPDATE SET
value = counters.value + 1,
updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_increment_workspace_user_counter
AFTER INSERT ON users
FOR EACH ROW
EXECUTE FUNCTION fn_increment_workspace_user_counter();
`.execute(db);
// Create trigger function to decrement user counter on user delete
await sql`
CREATE OR REPLACE FUNCTION fn_decrement_workspace_user_counter() RETURNS TRIGGER AS $$
BEGIN
UPDATE counters
SET
value = GREATEST(0, value - 1),
updated_at = NOW()
WHERE key = CONCAT(OLD.workspace_id, '.users.count');
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_decrement_workspace_user_counter
AFTER DELETE ON users
FOR EACH ROW
EXECUTE FUNCTION fn_decrement_workspace_user_counter();
`.execute(db);
},
down: async (db) => {
await sql`
DROP TRIGGER IF EXISTS trg_increment_workspace_user_counter ON users;
DROP TRIGGER IF EXISTS trg_decrement_workspace_user_counter ON users;
DROP FUNCTION IF EXISTS fn_increment_workspace_user_counter();
DROP FUNCTION IF EXISTS fn_decrement_workspace_user_counter();
`.execute(db);
},
};

View File

@@ -0,0 +1,77 @@
import { Migration, sql } from 'kysely';
export const createWorkspaceNodeCounterTriggers: Migration = {
up: async (db) => {
await db
.insertInto('counters')
.columns(['key', 'value', 'created_at'])
.expression((eb) =>
eb
.selectFrom('nodes')
.select([
eb
.fn('concat', [
eb.ref('workspace_id'),
eb.cast(eb.val('.nodes.count'), 'varchar'),
])
.as('key'),
eb.fn.count('id').as('value'),
eb.val(new Date()).as('created_at'),
])
.groupBy('workspace_id')
)
.execute();
await sql`
CREATE OR REPLACE FUNCTION fn_increment_workspace_node_counter() RETURNS TRIGGER AS $$
BEGIN
INSERT INTO counters (key, value, created_at, updated_at)
VALUES (
CONCAT(NEW.workspace_id, '.nodes.count'),
1,
NOW(),
NOW()
)
ON CONFLICT (key)
DO UPDATE SET
value = counters.value + 1,
updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_increment_workspace_node_counter
AFTER INSERT ON nodes
FOR EACH ROW
EXECUTE FUNCTION fn_increment_workspace_node_counter();
`.execute(db);
await sql`
CREATE OR REPLACE FUNCTION fn_decrement_workspace_node_counter() RETURNS TRIGGER AS $$
BEGIN
UPDATE counters
SET
value = GREATEST(0, value - 1),
updated_at = NOW()
WHERE key = CONCAT(OLD.workspace_id, '.nodes.count');
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_decrement_workspace_node_counter
AFTER DELETE ON nodes
FOR EACH ROW
EXECUTE FUNCTION fn_decrement_workspace_node_counter();
`.execute(db);
},
down: async (db) => {
await sql`
DROP TRIGGER IF EXISTS trg_increment_workspace_node_counter ON nodes;
DROP TRIGGER IF EXISTS trg_decrement_workspace_node_counter ON nodes;
DROP FUNCTION IF EXISTS fn_increment_workspace_node_counter();
DROP FUNCTION IF EXISTS fn_decrement_workspace_node_counter();
`.execute(db);
},
};

View File

@@ -0,0 +1,104 @@
import { Migration, sql } from 'kysely';
export const createWorkspaceUploadCounterTriggers: Migration = {
up: async (db) => {
await db
.insertInto('counters')
.columns(['key', 'value', 'created_at'])
.expression((eb) =>
eb
.selectFrom('uploads')
.select([
eb
.fn('concat', [
eb.ref('workspace_id'),
eb.cast(eb.val('.storage.used'), 'varchar'),
])
.as('key'),
eb.fn.sum('size').as('value'),
eb.val(new Date()).as('created_at'),
])
.groupBy('workspace_id')
)
.execute();
await sql`
CREATE OR REPLACE FUNCTION fn_increment_workspace_storage_counter() RETURNS TRIGGER AS $$
BEGIN
INSERT INTO counters (key, value, created_at, updated_at)
VALUES (
CONCAT(NEW.workspace_id, '.storage.used'),
NEW.size,
NOW(),
NOW()
)
ON CONFLICT (key)
DO UPDATE SET
value = counters.value + NEW.size,
updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_increment_workspace_storage_counter
AFTER INSERT ON uploads
FOR EACH ROW
EXECUTE FUNCTION fn_increment_workspace_storage_counter();
`.execute(db);
await sql`
CREATE OR REPLACE FUNCTION fn_decrement_workspace_storage_counter() RETURNS TRIGGER AS $$
BEGIN
UPDATE counters
SET
value = GREATEST(0, value - OLD.size),
updated_at = NOW()
WHERE key = CONCAT(OLD.workspace_id, '.storage.used');
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_decrement_workspace_storage_counter
AFTER DELETE ON uploads
FOR EACH ROW
EXECUTE FUNCTION fn_decrement_workspace_storage_counter();
`.execute(db);
await sql`
CREATE OR REPLACE FUNCTION fn_update_workspace_storage_counter() RETURNS TRIGGER AS $$
DECLARE
size_difference BIGINT;
BEGIN
IF OLD.size IS DISTINCT FROM NEW.size THEN
size_difference := NEW.size - OLD.size;
UPDATE counters
SET
value = GREATEST(0, value + size_difference),
updated_at = NOW()
WHERE key = CONCAT(NEW.workspace_id, '.storage.used');
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_update_workspace_storage_counter
AFTER UPDATE ON uploads
FOR EACH ROW
EXECUTE FUNCTION fn_update_workspace_storage_counter();
`.execute(db);
},
down: async (db) => {
await sql`
DROP TRIGGER IF EXISTS trg_increment_workspace_storage_counter ON uploads;
DROP TRIGGER IF EXISTS trg_decrement_workspace_storage_counter ON uploads;
DROP TRIGGER IF EXISTS trg_update_workspace_storage_counter ON uploads;
DROP FUNCTION IF EXISTS fn_increment_workspace_storage_counter();
DROP FUNCTION IF EXISTS fn_decrement_workspace_storage_counter();
DROP FUNCTION IF EXISTS fn_update_workspace_storage_counter();
`.execute(db);
},
};

View File

@@ -0,0 +1,104 @@
import { Migration, sql } from 'kysely';
export const createUserUploadCounterTriggers: Migration = {
up: async (db) => {
await db
.insertInto('counters')
.columns(['key', 'value', 'created_at'])
.expression((eb) =>
eb
.selectFrom('uploads')
.select([
eb
.fn('concat', [
eb.ref('created_by'),
eb.cast(eb.val('.storage.used'), 'varchar'),
])
.as('key'),
eb.fn.sum('size').as('value'),
eb.val(new Date()).as('created_at'),
])
.groupBy('created_by')
)
.execute();
await sql`
CREATE OR REPLACE FUNCTION fn_increment_user_storage_counter() RETURNS TRIGGER AS $$
BEGIN
INSERT INTO counters (key, value, created_at, updated_at)
VALUES (
CONCAT(NEW.created_by, '.storage.used'),
NEW.size,
NOW(),
NOW()
)
ON CONFLICT (key)
DO UPDATE SET
value = counters.value + NEW.size,
updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_increment_user_storage_counter
AFTER INSERT ON uploads
FOR EACH ROW
EXECUTE FUNCTION fn_increment_user_storage_counter();
`.execute(db);
await sql`
CREATE OR REPLACE FUNCTION fn_decrement_user_storage_counter() RETURNS TRIGGER AS $$
BEGIN
UPDATE counters
SET
value = GREATEST(0, value - OLD.size),
updated_at = NOW()
WHERE key = CONCAT(OLD.created_by, '.storage.used');
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_decrement_user_storage_counter
AFTER DELETE ON uploads
FOR EACH ROW
EXECUTE FUNCTION fn_decrement_user_storage_counter();
`.execute(db);
await sql`
CREATE OR REPLACE FUNCTION fn_update_user_storage_counter() RETURNS TRIGGER AS $$
DECLARE
size_difference BIGINT;
BEGIN
IF OLD.size IS DISTINCT FROM NEW.size THEN
size_difference := NEW.size - OLD.size;
UPDATE counters
SET
value = GREATEST(0, value + size_difference),
updated_at = NOW()
WHERE key = CONCAT(NEW.created_by, '.storage.used');
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_update_user_storage_counter
AFTER UPDATE ON uploads
FOR EACH ROW
EXECUTE FUNCTION fn_update_user_storage_counter();
`.execute(db);
},
down: async (db) => {
await sql`
DROP TRIGGER IF EXISTS trg_increment_user_storage_counter ON uploads;
DROP TRIGGER IF EXISTS trg_decrement_user_storage_counter ON uploads;
DROP TRIGGER IF EXISTS trg_update_user_storage_counter ON uploads;
DROP FUNCTION IF EXISTS fn_increment_user_storage_counter();
DROP FUNCTION IF EXISTS fn_decrement_user_storage_counter();
DROP FUNCTION IF EXISTS fn_update_user_storage_counter();
`.execute(db);
},
};

View File

@@ -21,6 +21,11 @@ import { createNodeEmbeddingsTable } from './00018-create-node-embeddings-table'
import { createDocumentEmbeddingsTable } from './00019-create-document-embeddings-table';
import { alterDevicesPlatformColumn } from './00020-alter-devices-platform-column';
import { renameAccountAttributesColumn } from './00021-rename-account-attributes-column';
import { createCountersTable } from './00022-create-counters-table';
import { createWorkspaceUserCounterTriggers } from './00023-create-workspace-user-counter-triggers';
import { createWorkspaceNodeCounterTriggers } from './00024-create-workspace-node-counter-triggers';
import { createWorkspaceUploadCounterTriggers } from './00025-create-workspace-upload-counter-triggers';
import { createUserUploadCounterTriggers } from './00026-create-user-upload-counter-triggers';
export const databaseMigrations: Record<string, Migration> = {
'00001_create_accounts_table': createAccountsTable,
@@ -44,4 +49,12 @@ export const databaseMigrations: Record<string, Migration> = {
'00019_create_document_embeddings_table': createDocumentEmbeddingsTable,
'00020_alter_devices_platform_column': alterDevicesPlatformColumn,
'00021_rename_account_attributes_column': renameAccountAttributesColumn,
'00022_create_counters_table': createCountersTable,
'00023_create_workspace_user_counter_triggers':
createWorkspaceUserCounterTriggers,
'00024_create_workspace_node_counter_triggers':
createWorkspaceNodeCounterTriggers,
'00025_create_workspace_upload_counter_triggers':
createWorkspaceUploadCounterTriggers,
'00026_create_user_upload_counter_triggers': createUserUploadCounterTriggers,
};

View File

@@ -298,6 +298,13 @@ export type SelectDocumentEmbedding = Selectable<DocumentEmbeddingTable>;
export type CreateDocumentEmbedding = Insertable<DocumentEmbeddingTable>;
export type UpdateDocumentEmbedding = Updateable<DocumentEmbeddingTable>;
interface CounterTable {
key: ColumnType<string, string, never>;
value: ColumnType<string, string, string>;
created_at: ColumnType<Date, Date, never>;
updated_at: ColumnType<Date | null, Date | null, Date | null>;
}
export interface DatabaseSchema {
accounts: AccountTable;
devices: DeviceTable;
@@ -315,4 +322,5 @@ export interface DatabaseSchema {
uploads: UploadTable;
node_embeddings: NodeEmbeddingTable;
document_embeddings: DocumentEmbeddingTable;
counters: CounterTable;
}

View File

@@ -0,0 +1,18 @@
import { Kysely, Transaction } from 'kysely';
import { DatabaseSchema } from '@colanode/server/data/schema';
export type CounterKey = `${string}.storage.used` | `${string}.nodes.count`;
export const fetchCounter = async (
database: Kysely<DatabaseSchema> | Transaction<DatabaseSchema>,
key: CounterKey
) => {
const counter = await database
.selectFrom('counters')
.selectAll()
.where('key', '=', key)
.executeTakeFirst();
return counter?.value ? BigInt(counter.value) : BigInt(0);
};