Use zod for config parsing and some folder restructure (#29)

This commit is contained in:
Hakan Shehu
2025-05-04 11:31:41 +02:00
committed by GitHub
parent 14bcb55ea0
commit f22e6d43bf
48 changed files with 1599 additions and 1495 deletions

View File

@@ -9,7 +9,7 @@ import {
import { database } from '@/data/database';
import { isAuthEmailRateLimited } from '@/lib/rate-limits';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import {
buildLoginSuccessOutput,
buildLoginVerifyOutput,
@@ -59,7 +59,7 @@ export const emailLoginRoute: FastifyPluginCallbackZod = (
}
if (account.status === AccountStatus.Unverified) {
if (configuration.account.verificationType === 'email') {
if (config.account.verificationType === 'email') {
const output = await buildLoginVerifyOutput(account);
return output;
}

View File

@@ -11,7 +11,7 @@ import {
import { database } from '@/data/database';
import { isAuthEmailRateLimited } from '@/lib/rate-limits';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import { generateOtpCode, saveOtp } from '@/lib/otps';
import { AccountPasswordResetOtpAttributes, Otp } from '@/types/otps';
import { jobService } from '@/services/job-service';
@@ -45,9 +45,7 @@ export const emailPasswordResetInitRoute: FastifyPluginCallbackZod = (
}
const id = generateId(IdType.OtpCode);
const expiresAt = new Date(
Date.now() + configuration.account.otpTimeout * 1000
);
const expiresAt = new Date(Date.now() + config.account.otpTimeout * 1000);
const otpCode = await generateOtpCode();
const account = await database

View File

@@ -12,7 +12,7 @@ import {
import { database } from '@/data/database';
import { SelectAccount } from '@/data/schema';
import { isAuthEmailRateLimited } from '@/lib/rate-limits';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import {
buildLoginSuccessOutput,
buildLoginVerifyOutput,
@@ -58,7 +58,7 @@ export const emailRegisterRoute: FastifyPluginCallbackZod = (
let account: SelectAccount | null | undefined = null;
const status =
configuration.account.verificationType === 'automatic'
config.account.verificationType === 'automatic'
? AccountStatus.Active
: AccountStatus.Unverified;
@@ -104,7 +104,7 @@ export const emailRegisterRoute: FastifyPluginCallbackZod = (
}
if (account.status === AccountStatus.Unverified) {
if (configuration.account.verificationType === 'email') {
if (config.account.verificationType === 'email') {
const output = await buildLoginVerifyOutput(account);
return output;
}

View File

@@ -12,7 +12,7 @@ import {
import axios from 'axios';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import { buildLoginSuccessOutput } from '@/lib/accounts';
const GoogleUserInfoUrl = 'https://www.googleapis.com/oauth2/v1/userinfo';
@@ -34,7 +34,7 @@ export const googleLoginRoute: FastifyPluginCallbackZod = (
},
},
handler: async (request, reply) => {
if (!configuration.account.allowGoogleLogin) {
if (!config.account.allowGoogleLogin) {
return reply.code(400).send({
code: ApiErrorCode.GoogleAuthFailed,
message: 'Google login is not allowed.',

View File

@@ -6,7 +6,7 @@ import { ApiErrorCode } from '@colanode/core';
import { Readable } from 'stream';
import { avatarS3 } from '@/data/storage';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
export const avatarDownloadRoute: FastifyPluginCallbackZod = (
instance,
@@ -25,7 +25,7 @@ export const avatarDownloadRoute: FastifyPluginCallbackZod = (
try {
const avatarId = request.params.avatarId;
const command = new GetObjectCommand({
Bucket: configuration.avatarS3.bucketName,
Bucket: config.avatarS3.bucketName,
Key: `avatars/${avatarId}.jpeg`,
});

View File

@@ -10,7 +10,7 @@ import {
} from '@colanode/core';
import { avatarS3 } from '@/data/storage';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
export const avatarUploadRoute: FastifyPluginCallbackZod = (
instance,
@@ -64,7 +64,7 @@ export const avatarUploadRoute: FastifyPluginCallbackZod = (
const avatarId = generateId(IdType.Avatar);
const command = new PutObjectCommand({
Bucket: configuration.avatarS3.bucketName,
Bucket: config.avatarS3.bucketName,
Key: `avatars/${avatarId}.jpeg`,
Body: jpegBuffer,
ContentType: 'image/jpeg',

View File

@@ -1,7 +1,7 @@
import { FastifyPluginCallbackZod } from 'fastify-type-provider-zod';
import { ServerConfig, serverConfigSchema } from '@colanode/core';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
export const configGetRoute: FastifyPluginCallbackZod = (instance, _, done) => {
instance.route({
@@ -13,16 +13,16 @@ export const configGetRoute: FastifyPluginCallbackZod = (instance, _, done) => {
},
},
handler: async (request) => {
const config: ServerConfig = {
name: configuration.server.name,
avatar: configuration.server.avatar,
version: configuration.server.version,
sha: configuration.server.sha,
const output: ServerConfig = {
name: config.server.name,
avatar: config.server.avatar ?? '',
version: config.server.version,
sha: config.server.sha,
ip: request.client.ip,
attributes: {},
};
return config;
return output;
},
});

View File

@@ -16,7 +16,7 @@ import { database } from '@/data/database';
import { getNameFromEmail } from '@/lib/utils';
import { SelectAccount } from '@/data/schema';
import { eventBus } from '@/lib/event-bus';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
export const userCreateRoute: FastifyPluginCallbackZod = (
instance,
@@ -95,8 +95,8 @@ export const userCreateRoute: FastifyPluginCallbackZod = (
name: account.name,
email: account.email,
avatar: account.avatar,
storage_limit: configuration.user.storageLimit.toString(),
max_file_size: configuration.user.maxFileSize.toString(),
storage_limit: config.user.storageLimit.toString(),
max_file_size: config.user.maxFileSize.toString(),
created_at: new Date(),
created_by: request.account.id,
status: UserStatus.Active,

View File

@@ -12,7 +12,7 @@ import { createDebugger } from '@colanode/core';
import { databaseMigrations } from '@/data/migrations';
import { DatabaseSchema } from '@/data/schema';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
const debug = createDebugger('server:database');
@@ -26,11 +26,11 @@ pg.types.setTypeParser(pg.types.builtins.INT4, (val) => {
const dialect = new PostgresDialect({
pool: new pg.Pool({
connectionString: configuration.postgres.url,
connectionString: config.postgres.url,
ssl:
configuration.postgres.ssl &&
Object.values(configuration.postgres.ssl).some((value) => value)
? configuration.postgres.ssl
config.postgres.ssl &&
Object.values(config.postgres.ssl).some((value) => value)
? config.postgres.ssl
: undefined,
}),
});

View File

@@ -1,10 +1,10 @@
import { createClient } from 'redis';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
export const redis = createClient({
url: configuration.redis.url,
database: configuration.redis.db,
url: config.redis.url,
database: config.redis.db,
});
export const initRedis = async () => {

View File

@@ -1,21 +1,21 @@
import { S3Client } from '@aws-sdk/client-s3';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
export const avatarS3 = new S3Client({
endpoint: configuration.avatarS3.endpoint,
region: configuration.avatarS3.region,
endpoint: config.avatarS3.endpoint,
region: config.avatarS3.region,
credentials: {
accessKeyId: configuration.avatarS3.accessKey,
secretAccessKey: configuration.avatarS3.secretKey,
accessKeyId: config.avatarS3.accessKey,
secretAccessKey: config.avatarS3.secretKey,
},
});
export const fileS3 = new S3Client({
endpoint: configuration.fileS3.endpoint,
region: configuration.fileS3.region,
endpoint: config.fileS3.endpoint,
region: config.fileS3.region,
credentials: {
accessKeyId: configuration.fileS3.accessKey,
secretAccessKey: configuration.fileS3.secretKey,
accessKeyId: config.fileS3.accessKey,
secretAccessKey: config.fileS3.secretKey,
},
});

View File

@@ -7,10 +7,10 @@ import {
} from '@colanode/core';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import { fetchNode, createNode } from '@/lib/nodes';
import { JobHandler } from '@/types/jobs';
import { runAssistantResponseChain } from '@/lib/assistant';
import { runAssistantResponseChain } from '@/lib/ai/assistants';
import { SelectNode } from '@/data/schema';
import { Citation } from '@/types/assistant';
@@ -39,7 +39,7 @@ export const assistantResponseHandler: JobHandler<
selectedContextNodeIds,
});
if (!configuration.ai.enabled) {
if (!config.ai.enabled) {
return;
}

View File

@@ -1,12 +1,12 @@
import { extractDocumentText } from '@colanode/core';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import {
fetchEmbeddingCursor,
scheduleDocumentEmbedding,
updateEmbeddingCursor,
} from '@/lib/embeddings';
} from '@/lib/ai/embeddings';
const BATCH_SIZE = 100;
@@ -23,7 +23,7 @@ declare module '@/types/jobs' {
}
export const checkDocumentEmbeddingsHandler = async () => {
if (!configuration.ai.enabled) {
if (!config.ai.enabled) {
return;
}

View File

@@ -1,12 +1,12 @@
import { getNodeModel } from '@colanode/core';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import {
fetchEmbeddingCursor,
scheduleNodeEmbedding,
updateEmbeddingCursor,
} from '@/lib/embeddings';
} from '@/lib/ai/embeddings';
const BATCH_SIZE = 100;
@@ -23,7 +23,7 @@ declare module '@/types/jobs' {
}
export const checkNodeEmbeddingsHandler = async () => {
if (!configuration.ai.enabled) {
if (!config.ai.enabled) {
return;
}

View File

@@ -2,9 +2,9 @@ import { OpenAIEmbeddings } from '@langchain/openai';
import { sql } from 'kysely';
import { extractDocumentText, getNodeModel } from '@colanode/core';
import { chunkText } from '@/lib/chunking';
import { chunkText } from '@/lib/ai/chunking';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import { CreateDocumentEmbedding } from '@/data/schema';
import { fetchNode } from '@/lib/nodes';
@@ -25,7 +25,7 @@ export const embedDocumentHandler = async (input: {
type: 'embed_document';
documentId: string;
}) => {
if (!configuration.ai.enabled) {
if (!config.ai.enabled) {
return;
}
@@ -61,9 +61,9 @@ export const embedDocumentHandler = async (input: {
}
const embeddings = new OpenAIEmbeddings({
apiKey: configuration.ai.embedding.apiKey,
modelName: configuration.ai.embedding.modelName,
dimensions: configuration.ai.embedding.dimensions,
apiKey: config.ai.embedding.apiKey,
modelName: config.ai.embedding.modelName,
dimensions: config.ai.embedding.dimensions,
});
const existingEmbeddings = await database
@@ -112,7 +112,7 @@ export const embedDocumentHandler = async (input: {
});
}
const batchSize = configuration.ai.embedding.batchSize;
const batchSize = config.ai.embedding.batchSize;
for (let i = 0; i < embeddingsToUpsert.length; i += batchSize) {
const batch = embeddingsToUpsert.slice(i, i + batchSize);
const textsToEmbed = batch.map((item) =>

View File

@@ -2,9 +2,9 @@ import { OpenAIEmbeddings } from '@langchain/openai';
import { sql } from 'kysely';
import { getNodeModel } from '@colanode/core';
import { chunkText } from '@/lib/chunking';
import { chunkText } from '@/lib/ai/chunking';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import { CreateNodeEmbedding } from '@/data/schema';
import { fetchNode } from '@/lib/nodes';
@@ -25,7 +25,7 @@ export const embedNodeHandler = async (input: {
type: 'embed_node';
nodeId: string;
}) => {
if (!configuration.ai.enabled) {
if (!config.ai.enabled) {
return;
}
@@ -60,9 +60,9 @@ export const embedNodeHandler = async (input: {
}
const embeddings = new OpenAIEmbeddings({
apiKey: configuration.ai.embedding.apiKey,
modelName: configuration.ai.embedding.modelName,
dimensions: configuration.ai.embedding.dimensions,
apiKey: config.ai.embedding.apiKey,
modelName: config.ai.embedding.modelName,
dimensions: config.ai.embedding.dimensions,
});
const existingEmbeddings = await database
@@ -118,7 +118,7 @@ export const embedNodeHandler = async (input: {
return;
}
const batchSize = configuration.ai.embedding.batchSize;
const batchSize = config.ai.embedding.batchSize;
for (let i = 0; i < embeddingsToUpsert.length; i += batchSize) {
const batch = embeddingsToUpsert.slice(i, i + batchSize);
const textsToEmbed = batch.map((item) =>

View File

@@ -9,7 +9,7 @@ import {
} from '@colanode/core';
import argon2 from '@node-rs/argon2';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import { database } from '@/data/database';
import { SelectAccount } from '@/data/schema';
import { generateToken } from '@/lib/tokens';
@@ -137,9 +137,7 @@ export const buildLoginVerifyOutput = async (
account: SelectAccount
): Promise<LoginVerifyOutput> => {
const id = generateId(IdType.OtpCode);
const expiresAt = new Date(
Date.now() + configuration.account.otpTimeout * 1000
);
const expiresAt = new Date(Date.now() + config.account.otpTimeout * 1000);
const otpCode = await generateOtpCode();
const otp: Otp<AccountVerifyOtpAttributes> = {

View File

@@ -8,7 +8,7 @@ import {
} from '@colanode/core';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import { fetchNode, fetchNodeDescendants } from '@/lib/nodes';
import {
rewriteQuery,
@@ -17,10 +17,10 @@ import {
rerankDocuments,
generateFinalAnswer,
generateDatabaseFilters,
} from '@/services/llm-service';
import { nodeRetrievalService } from '@/services/node-retrieval-service';
import { documentRetrievalService } from '@/services/document-retrieval-service';
import { recordsRetrievalService } from '@/services/records-retrieval-service';
} from '@/lib/ai/llms';
import { retrieveNodes } from '@/lib/ai/node-retrievals';
import { retrieveDocuments } from '@/lib/ai/document-retrievals';
import { retrieveByFilters } from '@/lib/records';
import {
AssistantChainState,
ResponseState,
@@ -29,49 +29,53 @@ import {
AssistantResponse,
AssistantInput,
} from '@/types/assistant';
import { fetchMetadataForContextItems } from '@/lib/metadata';
import { fetchMetadataForContextItems } from '@/lib/ai/metadata';
import { SelectNode } from '@/data/schema';
import {
formatChatHistory,
formatContextDocuments,
selectTopContext,
formatMetadataForPrompt,
} from '@/lib/ai-utils';
} from '@/lib/ai/utils';
async function generateRewrittenQuery(state: AssistantChainState) {
const generateRewrittenQuery = async (state: AssistantChainState) => {
const rewrittenQuery = await rewriteQuery(state.userInput);
return { rewrittenQuery };
}
};
async function assessIntent(state: AssistantChainState) {
const assessIntent = async (state: AssistantChainState) => {
const chatHistory = formatChatHistory(state.chatHistory);
const intent = await assessUserIntent(state.userInput, chatHistory);
return { intent };
}
};
async function generateNoContextResponse(state: AssistantChainState) {
const generateNoContextResponse = async (state: AssistantChainState) => {
const chatHistory = formatChatHistory(state.chatHistory);
const finalAnswer = await generateNoContextAnswer(
state.userInput,
chatHistory
);
return { finalAnswer };
}
};
const fetchContextDocuments = async (state: AssistantChainState) => {
if (!config.ai.enabled) {
return { contextDocuments: [] };
}
async function fetchContextDocuments(state: AssistantChainState) {
const [nodeResults, documentResults] = await Promise.all([
nodeRetrievalService.retrieve(
retrieveNodes(
state.rewrittenQuery,
state.workspaceId,
state.userId,
configuration.ai.retrieval.hybridSearch.maxResults,
config.ai.retrieval.hybridSearch.maxResults,
state.selectedContextNodeIds
),
documentRetrievalService.retrieve(
retrieveDocuments(
state.rewrittenQuery,
state.workspaceId,
state.userId,
configuration.ai.retrieval.hybridSearch.maxResults,
config.ai.retrieval.hybridSearch.maxResults,
state.selectedContextNodeIds
),
]);
@@ -79,7 +83,7 @@ async function fetchContextDocuments(state: AssistantChainState) {
if (state.databaseFilters.shouldFilter) {
const filteredRecords = await Promise.all(
state.databaseFilters.filters.map(async (filter) => {
const records = await recordsRetrievalService.retrieveByFilters(
const records = await retrieveByFilters(
filter.databaseId,
state.workspaceId,
state.userId,
@@ -112,9 +116,9 @@ async function fetchContextDocuments(state: AssistantChainState) {
return {
contextDocuments: [...nodeResults, ...documentResults, ...databaseResults],
};
}
};
async function fetchChatHistory(state: AssistantChainState) {
const fetchChatHistory = async (state: AssistantChainState) => {
const messages = await database
.selectFrom('nodes')
.where('parent_id', '=', state.parentMessageId)
@@ -146,9 +150,9 @@ async function fetchChatHistory(state: AssistantChainState) {
});
return { chatHistory };
}
};
async function rerankContextDocuments(state: AssistantChainState) {
const rerankContextDocuments = async (state: AssistantChainState) => {
const docsForRerank = state.contextDocuments.map((doc) => ({
content: doc.pageContent,
type: doc.metadata.type,
@@ -160,9 +164,9 @@ async function rerankContextDocuments(state: AssistantChainState) {
);
return { rerankedContext };
}
};
async function selectRelevantDocuments(state: AssistantChainState) {
const selectRelevantDocuments = async (state: AssistantChainState) => {
if (state.rerankedContext.length === 0) {
return { topContext: [] };
}
@@ -189,17 +193,17 @@ async function selectRelevantDocuments(state: AssistantChainState) {
});
return { topContext };
}
};
async function fetchWorkspaceDetails(workspaceId: string) {
const fetchWorkspaceDetails = async (workspaceId: string) => {
return database
.selectFrom('workspaces')
.where('id', '=', workspaceId)
.select(['name', 'id'])
.executeTakeFirst();
}
};
async function generateResponse(state: AssistantChainState) {
const generateResponse = async (state: AssistantChainState) => {
const workspace = await fetchWorkspaceDetails(state.workspaceId);
const formattedChatHistory = formatChatHistory(state.chatHistory);
const formattedContext = formatContextDocuments(state.topContext);
@@ -216,9 +220,9 @@ async function generateResponse(state: AssistantChainState) {
});
return { finalAnswer: result.answer, citations: result.citations };
}
};
async function fetchDatabaseContext(state: AssistantChainState) {
const fetchDatabaseContext = async (state: AssistantChainState) => {
const databases = await database
.selectFrom('nodes as n')
.innerJoin('collaborations as c', 'c.node_id', 'n.root_id')
@@ -232,7 +236,7 @@ async function fetchDatabaseContext(state: AssistantChainState) {
const databaseContext: DatabaseContextItem[] = await Promise.all(
databases.map(async (db) => {
const dbNode = db as SelectNode;
const sampleRecords = await recordsRetrievalService.retrieveByFilters(
const sampleRecords = await retrieveByFilters(
db.id,
state.workspaceId,
state.userId,
@@ -261,9 +265,9 @@ async function fetchDatabaseContext(state: AssistantChainState) {
);
return { databaseContext };
}
};
async function generateDatabaseFilterAttributes(state: AssistantChainState) {
const generateDatabaseFilterAttributes = async (state: AssistantChainState) => {
if (state.intent === 'no_context' || !state.databaseContext.length) {
return {
databaseFilters: { shouldFilter: false, filters: [] } as DatabaseFilters,
@@ -275,7 +279,7 @@ async function generateDatabaseFilterAttributes(state: AssistantChainState) {
});
return { databaseFilters };
}
};
const assistantResponseChain = new StateGraph(ResponseState)
.addNode('generateRewrittenQuery', generateRewrittenQuery)
@@ -303,15 +307,18 @@ const assistantResponseChain = new StateGraph(ResponseState)
.addEdge('generateNoContextResponse', '__end__')
.compile();
const langfuseCallback = configuration.ai.langfuse.enabled
? new CallbackHandler({
publicKey: configuration.ai.langfuse.publicKey,
secretKey: configuration.ai.langfuse.secretKey,
baseUrl: configuration.ai.langfuse.baseUrl,
})
: undefined;
const langfuseCallback =
config.ai.enabled && config.ai.langfuse.enabled
? new CallbackHandler({
publicKey: config.ai.langfuse.publicKey,
secretKey: config.ai.langfuse.secretKey,
baseUrl: config.ai.langfuse.baseUrl,
})
: undefined;
async function getFullContextNodeIds(selectedIds: string[]): Promise<string[]> {
const getFullContextNodeIds = async (
selectedIds: string[]
): Promise<string[]> => {
const fullSet = new Set<string>();
for (const id of selectedIds) {
fullSet.add(id);
@@ -324,11 +331,11 @@ async function getFullContextNodeIds(selectedIds: string[]): Promise<string[]> {
}
return Array.from(fullSet);
}
};
export async function runAssistantResponseChain(
export const runAssistantResponseChain = async (
input: AssistantInput
): Promise<AssistantResponse> {
): Promise<AssistantResponse> => {
let fullContextNodeIds: string[] = [];
if (input.selectedContextNodeIds && input.selectedContextNodeIds.length > 0) {
fullContextNodeIds = await getFullContextNodeIds(
@@ -349,4 +356,4 @@ export async function runAssistantResponseChain(
callbacks,
});
return { finalAnswer: result.finalAnswer, citations: result.citations };
}
};

View File

@@ -1,8 +1,8 @@
import { RecursiveCharacterTextSplitter } from 'langchain/text_splitter';
import type { NodeType } from '@colanode/core';
import { configuration } from '@/lib/configuration';
import { enrichChunk } from '@/services/llm-service';
import { config } from '@/lib/config';
import { enrichChunk } from '@/lib/ai/llms';
import { TextChunk } from '@/types/chunking';
export const chunkText = async (
@@ -10,8 +10,12 @@ export const chunkText = async (
existingChunks: TextChunk[],
nodeType: NodeType
): Promise<TextChunk[]> => {
const chunkSize = configuration.ai.chunking.defaultChunkSize;
const chunkOverlap = configuration.ai.chunking.defaultOverlap;
if (!config.ai.enabled) {
return [];
}
const chunkSize = config.ai.chunking.defaultChunkSize;
const chunkOverlap = config.ai.chunking.defaultOverlap;
const splitter = new RecursiveCharacterTextSplitter({
chunkSize,
chunkOverlap,
@@ -22,7 +26,7 @@ export const chunkText = async (
.map((doc) => ({ text: doc.pageContent }))
.filter((c) => c.text.trim().length > 5);
if (!configuration.ai.chunking.enhanceWithContext) {
if (!config.ai.chunking.enhanceWithContext) {
return chunks;
}

View File

@@ -0,0 +1,224 @@
import { OpenAIEmbeddings } from '@langchain/openai';
import { Document } from '@langchain/core/documents';
import { sql } from 'kysely';
import { database } from '@/data/database';
import { config } from '@/lib/config';
import { SearchResult } from '@/types/retrieval';
import { RewrittenQuery } from '@/types/llm';
import { combineAndScoreSearchResults } from '@/lib/ai/utils';
// Module-level embeddings client, created once at import time. It is only
// instantiated when AI is enabled — presumably config.ai.embedding is not
// guaranteed to be populated otherwise (TODO confirm against config schema).
// All retrieval helpers below must treat `embeddings` as possibly undefined.
const embeddings = config.ai.enabled
  ? new OpenAIEmbeddings({
      apiKey: config.ai.embedding.apiKey,
      modelName: config.ai.embedding.modelName,
      dimensions: config.ai.embedding.dimensions,
    })
  : undefined;
export const retrieveDocuments = async (
rewrittenQuery: RewrittenQuery,
workspaceId: string,
userId: string,
limit?: number,
contextNodeIds?: string[]
): Promise<Document[]> => {
if (!config.ai.enabled || !embeddings) {
return [];
}
const maxResults = limit ?? config.ai.retrieval.hybridSearch.maxResults;
const embedding = await embeddings.embedQuery(rewrittenQuery.semanticQuery);
if (!embedding) {
return [];
}
const [semanticResults, keywordResults] = await Promise.all([
semanticSearchDocuments(
embedding,
workspaceId,
userId,
maxResults,
contextNodeIds
),
keywordSearchDocuments(
rewrittenQuery.keywordQuery,
workspaceId,
userId,
maxResults,
contextNodeIds
),
]);
return combineSearchResults(semanticResults, keywordResults);
};
/**
 * Vector-similarity search over document embedding chunks.
 *
 * Access control is enforced in SQL: the join through nodes/collaborations
 * only admits documents whose root the user collaborates on with a
 * non-deleted collaboration. Results are ordered by pgvector cosine distance
 * (`<=>`) ascending, so a LOWER `score` means MORE similar — callers must
 * interpret `score` with that polarity for type 'semantic'.
 */
const semanticSearchDocuments = async (
  embedding: number[],
  workspaceId: string,
  userId: string,
  limit: number,
  contextNodeIds?: string[]
): Promise<SearchResult[]> => {
  let queryBuilder = database
    .selectFrom('document_embeddings')
    .innerJoin('documents', 'documents.id', 'document_embeddings.document_id')
    .innerJoin('nodes', 'nodes.id', 'documents.id')
    .innerJoin('collaborations', (join) =>
      join
        .onRef('collaborations.node_id', '=', 'nodes.root_id')
        .on('collaborations.collaborator_id', '=', sql.lit(userId))
        .on('collaborations.deleted_at', 'is', null)
    )
    .select([
      'document_embeddings.document_id as id',
      'document_embeddings.text',
      'document_embeddings.summary',
      'documents.created_at',
      'documents.created_by',
      'document_embeddings.chunk as chunk_index',
      // NOTE(review): the query vector is spliced into the SQL via sql.raw.
      // This is acceptable only because `embedding` is a number[] produced by
      // the embeddings client, never user-supplied text; revisit if the
      // source of this value ever changes.
      sql<number>`${sql.raw(`'[${embedding}]'::vector`)} <=> document_embeddings.embedding_vector`.as(
        'similarity'
      ),
    ])
    .where('document_embeddings.workspace_id', '=', workspaceId);

  // Optionally restrict the search to an explicit set of context documents.
  if (contextNodeIds && contextNodeIds.length > 0) {
    queryBuilder = queryBuilder.where(
      'document_embeddings.document_id',
      'in',
      contextNodeIds
    );
  }

  const results = await queryBuilder
    .groupBy([
      'document_embeddings.document_id',
      'document_embeddings.text',
      'document_embeddings.summary',
      'documents.created_at',
      'documents.created_by',
      'document_embeddings.chunk',
    ])
    .orderBy('similarity', 'asc')
    .limit(limit)
    .execute();

  // Map raw rows into the module's SearchResult shape; `score` carries the
  // cosine distance (smaller = closer).
  return results.map((result) => ({
    id: result.id,
    text: result.text,
    summary: result.summary,
    score: result.similarity,
    type: 'semantic',
    createdAt: result.created_at,
    createdBy: result.created_by,
    chunkIndex: result.chunk_index,
  }));
};
/**
 * Full-text keyword search over document embedding chunks.
 *
 * Mirrors semanticSearchDocuments' access-control joins. The query text is
 * passed through websearch_to_tsquery as a bound parameter (safe against
 * injection) and results are ranked with ts_rank, ordered descending — for
 * type 'keyword' a HIGHER `score` means a BETTER match, the opposite
 * polarity to the semantic branch.
 */
const keywordSearchDocuments = async (
  query: string,
  workspaceId: string,
  userId: string,
  limit: number,
  contextNodeIds?: string[]
): Promise<SearchResult[]> => {
  let queryBuilder = database
    .selectFrom('document_embeddings')
    .innerJoin('documents', 'documents.id', 'document_embeddings.document_id')
    .innerJoin('nodes', 'nodes.id', 'documents.id')
    .innerJoin('collaborations', (join) =>
      join
        .onRef('collaborations.node_id', '=', 'nodes.root_id')
        .on('collaborations.collaborator_id', '=', sql.lit(userId))
        .on('collaborations.deleted_at', 'is', null)
    )
    .select([
      'document_embeddings.document_id as id',
      'document_embeddings.text',
      'document_embeddings.summary',
      'documents.created_at',
      'documents.created_by',
      'document_embeddings.chunk as chunk_index',
      sql<number>`ts_rank(document_embeddings.search_vector, websearch_to_tsquery('english', ${query}))`.as(
        'rank'
      ),
    ])
    .where('document_embeddings.workspace_id', '=', workspaceId)
    .where(
      // Keep only chunks whose tsvector actually matches the query.
      () =>
        sql`document_embeddings.search_vector @@ websearch_to_tsquery('english', ${query})`
    );

  // Optionally restrict the search to an explicit set of context documents.
  if (contextNodeIds && contextNodeIds.length > 0) {
    queryBuilder = queryBuilder.where(
      'document_embeddings.document_id',
      'in',
      contextNodeIds
    );
  }

  const results = await queryBuilder
    .groupBy([
      'document_embeddings.document_id',
      'document_embeddings.text',
      'documents.created_at',
      'documents.created_by',
      'document_embeddings.chunk',
      'document_embeddings.summary',
    ])
    .orderBy('rank', 'desc')
    .limit(limit)
    .execute();

  // Map raw rows into the module's SearchResult shape; `score` carries the
  // ts_rank value (larger = better).
  return results.map((result) => ({
    id: result.id,
    text: result.text,
    summary: result.summary,
    score: result.rank,
    type: 'keyword',
    createdAt: result.created_at,
    createdBy: result.created_by,
    chunkIndex: result.chunk_index,
  }));
};
/**
 * Merge semantic and keyword hits into scored LangChain documents.
 *
 * Batch-loads the author names referenced by either result set so the
 * combiner can attach author metadata, then delegates weighting/merging to
 * combineAndScoreSearchResults with the configured hybrid-search weights.
 */
const combineSearchResults = async (
  semanticResults: SearchResult[],
  keywordResults: SearchResult[]
): Promise<Document[]> => {
  if (!config.ai.enabled || !embeddings) {
    return [];
  }

  const { semanticSearchWeight, keywordSearchWeight } =
    config.ai.retrieval.hybridSearch;

  // Collect the distinct, non-null author ids across both result sets.
  const uniqueAuthorIds = new Set<string>();
  for (const result of [...semanticResults, ...keywordResults]) {
    if (result.createdBy != null) {
      uniqueAuthorIds.add(result.createdBy);
    }
  }
  const authorIds = [...uniqueAuthorIds];

  // Single batched lookup; skip the query entirely when there are no authors.
  const authors =
    authorIds.length > 0
      ? await database
          .selectFrom('users')
          .select(['id', 'name'])
          .where('id', 'in', authorIds)
          .execute()
      : [];
  const authorMap = new Map(authors.map((author) => [author.id, author]));

  return combineAndScoreSearchResults(
    semanticResults,
    keywordResults,
    semanticSearchWeight,
    keywordSearchWeight,
    authorMap
  );
};

View File

@@ -2,7 +2,7 @@ import { MessageAttributes } from '@colanode/core';
import { redis } from '@/data/redis';
import { SelectNode } from '@/data/schema';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import { jobService } from '@/services/job-service';
export const fetchEmbeddingCursor = async (
@@ -28,7 +28,7 @@ export const deleteEmbeddingCursor = async (cursorId: string) => {
};
export const scheduleNodeEmbedding = async (node: SelectNode) => {
if (!configuration.ai.enabled) {
if (!config.ai.enabled) {
return;
}
@@ -45,7 +45,7 @@ export const scheduleNodeEmbedding = async (node: SelectNode) => {
// Only add delay for non-message nodes
if (node.type !== 'message') {
jobOptions.delay = configuration.ai.nodeEmbeddingDelay;
jobOptions.delay = config.ai.nodeEmbeddingDelay;
}
await jobService.addJob(
@@ -58,7 +58,7 @@ export const scheduleNodeEmbedding = async (node: SelectNode) => {
};
export const scheduleDocumentEmbedding = async (documentId: string) => {
if (!configuration.ai.enabled) {
if (!config.ai.enabled) {
return;
}
@@ -69,7 +69,7 @@ export const scheduleDocumentEmbedding = async (documentId: string) => {
},
{
jobId: `embed_document:${documentId}`,
delay: configuration.ai.documentEmbeddingDelay,
delay: config.ai.documentEmbeddingDelay,
}
);
};

View File

@@ -4,7 +4,7 @@ import { StringOutputParser } from '@langchain/core/output_parsers';
import { Document } from '@langchain/core/documents';
import { NodeType, RecordNode } from '@colanode/core';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import {
rerankedDocumentsSchema,
RerankedDocuments,
@@ -24,17 +24,16 @@ import {
noContextPrompt,
databaseFilterPrompt,
chunkSummarizationPrompt,
} from '@/lib/llm-prompts';
} from '@/lib/ai/prompts';
const getChatModel = (
task: keyof typeof configuration.ai.models
): ChatOpenAI | ChatGoogleGenerativeAI => {
const modelConfig = configuration.ai.models[task];
if (!configuration.ai.enabled) {
const getChatModel = (task: string): ChatOpenAI | ChatGoogleGenerativeAI => {
if (!config.ai.enabled) {
throw new Error('AI is disabled.');
}
const providerConfig = configuration.ai.providers[modelConfig.provider];
const modelConfig = config.ai.models[task as keyof typeof config.ai.models];
const providerConfig = config.ai.providers[modelConfig.provider];
if (!providerConfig.enabled) {
throw new Error(`${modelConfig.provider} provider is disabled.`);
}

View File

@@ -0,0 +1,222 @@
import { OpenAIEmbeddings } from '@langchain/openai';
import { Document } from '@langchain/core/documents';
import { sql } from 'kysely';
import { database } from '@/data/database';
import { config } from '@/lib/config';
import { SearchResult } from '@/types/retrieval';
import { RewrittenQuery } from '@/types/llm';
import { combineAndScoreSearchResults } from '@/lib/ai/utils';
// Module-level embeddings client shared by the node-retrieval helpers,
// created once at import time. Only instantiated when AI is enabled —
// presumably config.ai.embedding is not guaranteed to be populated
// otherwise (TODO confirm against the config schema). Callers must handle
// the undefined case.
const embeddings = config.ai.enabled
  ? new OpenAIEmbeddings({
      apiKey: config.ai.embedding.apiKey,
      modelName: config.ai.embedding.modelName,
      dimensions: config.ai.embedding.dimensions,
    })
  : undefined;
export const retrieveNodes = async (
rewrittenQuery: RewrittenQuery,
workspaceId: string,
userId: string,
limit?: number,
contextNodeIds?: string[]
): Promise<Document[]> => {
if (!config.ai.enabled || !embeddings) {
return [];
}
const maxResults = limit ?? config.ai.retrieval.hybridSearch.maxResults;
const embedding = await embeddings.embedQuery(rewrittenQuery.semanticQuery);
if (!embedding) {
return [];
}
const [semanticResults, keywordResults] = await Promise.all([
semanticSearchNodes(
embedding,
workspaceId,
userId,
maxResults,
contextNodeIds
),
keywordSearchNodes(
rewrittenQuery.keywordQuery,
workspaceId,
userId,
maxResults,
contextNodeIds
),
]);
return combineSearchResults(semanticResults, keywordResults);
};
/**
 * Vector-similarity search over node embedding chunks.
 *
 * The join against `collaborations` restricts results to nodes whose root
 * the user collaborates on (and whose collaboration is not soft-deleted).
 * Results are ordered by pgvector cosine distance (`<=>`) ascending, i.e.
 * best match first.
 */
const semanticSearchNodes = async (
  embedding: number[],
  workspaceId: string,
  userId: string,
  limit: number,
  contextNodeIds?: string[]
): Promise<SearchResult[]> => {
  let queryBuilder = database
    .selectFrom('node_embeddings')
    .innerJoin('nodes', 'nodes.id', 'node_embeddings.node_id')
    .innerJoin('collaborations', (join) =>
      join
        .onRef('collaborations.node_id', '=', 'nodes.root_id')
        // NOTE(review): sql.lit inlines userId into the SQL text instead of
        // binding a parameter — confirm userId is always a trusted,
        // server-generated id and never raw user input.
        .on('collaborations.collaborator_id', '=', sql.lit(userId))
        .on('collaborations.deleted_at', 'is', null)
    )
    .select([
      'node_embeddings.node_id as id',
      'node_embeddings.text',
      'node_embeddings.summary',
      'nodes.created_at',
      'nodes.created_by',
      'node_embeddings.chunk as chunk_index',
      // Cosine distance between the query vector and the stored embedding;
      // '[...]'::vector is the literal form pgvector expects.
      sql<number>`${sql.raw(`'[${embedding}]'::vector`)} <=> node_embeddings.embedding_vector`.as(
        'similarity'
      ),
    ])
    .where('node_embeddings.workspace_id', '=', workspaceId);
  // Optionally limit the search to an explicit set of context nodes.
  if (contextNodeIds && contextNodeIds.length > 0) {
    queryBuilder = queryBuilder.where(
      'node_embeddings.node_id',
      'in',
      contextNodeIds
    );
  }
  const results = await queryBuilder
    .groupBy([
      'node_embeddings.node_id',
      'node_embeddings.text',
      'nodes.created_at',
      'nodes.created_by',
      'node_embeddings.chunk',
      'node_embeddings.summary',
    ])
    .orderBy('similarity', 'asc')
    .limit(limit)
    .execute();
  // Lower distance = better match; the score is passed through as-is and
  // weighted later in combineSearchResults.
  return results.map((result) => ({
    id: result.id,
    text: result.text,
    summary: result.summary,
    score: result.similarity,
    type: 'semantic',
    createdAt: result.created_at,
    createdBy: result.created_by,
    chunkIndex: result.chunk_index,
  }));
};
/**
 * Full-text (keyword) search over node embedding chunks using Postgres
 * `websearch_to_tsquery` / `ts_rank`.
 *
 * Access control mirrors semanticSearchNodes: results are limited to nodes
 * whose root the user collaborates on. Results are ordered by rank
 * descending (best match first).
 */
const keywordSearchNodes = async (
  query: string,
  workspaceId: string,
  userId: string,
  limit: number,
  contextNodeIds?: string[]
): Promise<SearchResult[]> => {
  let queryBuilder = database
    .selectFrom('node_embeddings')
    .innerJoin('nodes', 'nodes.id', 'node_embeddings.node_id')
    .innerJoin('collaborations', (join) =>
      join
        .onRef('collaborations.node_id', '=', 'nodes.root_id')
        // NOTE(review): sql.lit inlines userId into the SQL text instead of
        // binding a parameter — confirm userId is always a trusted,
        // server-generated id and never raw user input.
        .on('collaborations.collaborator_id', '=', sql.lit(userId))
        .on('collaborations.deleted_at', 'is', null)
    )
    .select([
      'node_embeddings.node_id as id',
      'node_embeddings.text',
      'node_embeddings.summary',
      'nodes.created_at',
      'nodes.created_by',
      'node_embeddings.chunk as chunk_index',
      // Relevance rank of the chunk's search vector against the query.
      sql<number>`ts_rank(node_embeddings.search_vector, websearch_to_tsquery('english', ${query}))`.as(
        'rank'
      ),
    ])
    .where('node_embeddings.workspace_id', '=', workspaceId)
    // Only keep chunks that actually match the query at all.
    .where(
      () =>
        sql`node_embeddings.search_vector @@ websearch_to_tsquery('english', ${query})`
    );
  // Optionally limit the search to an explicit set of context nodes.
  if (contextNodeIds && contextNodeIds.length > 0) {
    queryBuilder = queryBuilder.where(
      'node_embeddings.node_id',
      'in',
      contextNodeIds
    );
  }
  const results = await queryBuilder
    .groupBy([
      'node_embeddings.node_id',
      'node_embeddings.text',
      'nodes.created_at',
      'nodes.created_by',
      'node_embeddings.chunk',
      'node_embeddings.summary',
    ])
    .orderBy('rank', 'desc')
    .limit(limit)
    .execute();
  return results.map((result) => ({
    id: result.id,
    text: result.text,
    summary: result.summary,
    score: result.rank,
    type: 'keyword',
    createdAt: result.created_at,
    createdBy: result.created_by,
    chunkIndex: result.chunk_index,
  }));
};
/**
 * Merges semantic and keyword hits into scored Documents, enriching each
 * result with its author's name from the `users` table. Returns an empty
 * list when AI is disabled.
 */
const combineSearchResults = async (
  semanticResults: SearchResult[],
  keywordResults: SearchResult[]
): Promise<Document[]> => {
  if (!embeddings || !config.ai.enabled) {
    return [];
  }

  const weights = config.ai.retrieval.hybridSearch;

  // Collect the distinct, non-null author ids across both result sets.
  const uniqueAuthorIds = new Set<string>();
  for (const result of [...semanticResults, ...keywordResults]) {
    if (result.createdBy !== undefined && result.createdBy !== null) {
      uniqueAuthorIds.add(result.createdBy);
    }
  }

  const authorRows =
    uniqueAuthorIds.size > 0
      ? await database
          .selectFrom('users')
          .select(['id', 'name'])
          .where('id', 'in', Array.from(uniqueAuthorIds))
          .execute()
      : [];
  const authorsById = new Map(authorRows.map((row) => [row.id, row]));

  return combineAndScoreSearchResults(
    semanticResults,
    keywordResults,
    weights.semanticSearchWeight,
    weights.keywordSearchWeight,
    authorsById
  );
};

View File

@@ -0,0 +1,26 @@
import { z } from 'zod';
// Allowed strategies for verifying a newly registered account.
export const accountVerificationTypeSchema = z.enum([
  'automatic',
  'manual',
  'email',
]);
export type AccountVerificationType = z.infer<
  typeof accountVerificationTypeSchema
>;
// Account-related settings, parsed and defaulted from environment variables.
export const accountConfigSchema = z.object({
  // How new accounts get verified; defaults to manual approval.
  verificationType: accountVerificationTypeSchema.default('manual'),
  // OTP lifetime in seconds (callers multiply by 1000 to get an expiry date).
  otpTimeout: z.coerce.number().default(600),
  allowGoogleLogin: z.boolean().default(false),
});
export type AccountConfig = z.infer<typeof accountConfigSchema>;
/**
 * Reads the raw account-related environment variables. Coercion, defaulting
 * and validation happen later in `accountConfigSchema`; only the google
 * login flag is converted to a boolean here.
 */
export const readAccountConfigVariables = () => {
  const env = process.env;
  return {
    verificationType: env.ACCOUNT_VERIFICATION_TYPE,
    otpTimeout: env.ACCOUNT_OTP_TIMEOUT,
    allowGoogleLogin: env.ACCOUNT_ALLOW_GOOGLE_LOGIN === 'true',
  };
};

View File

@@ -0,0 +1,173 @@
import { z } from 'zod';
// LLM providers the server can talk to.
export const aiProviderSchema = z.enum(['openai', 'google']);
export type AiProvider = z.infer<typeof aiProviderSchema>;
// Per-provider credentials and enable switch.
export const aiProviderConfigSchema = z.object({
  apiKey: z.string().default(''),
  // Env values arrive as strings; only the literal 'true' enables a provider.
  enabled: z
    .preprocess((val) => val === 'true', z.boolean().optional())
    .default(false),
});
export type AiProviderConfig = z.infer<typeof aiProviderConfigSchema>;
// Settings for a single chat-model task (provider, model name, temperature).
export const aiModelConfigSchema = z.object({
  provider: aiProviderSchema.default('openai'),
  modelName: z.string().default('gpt-4o'),
  temperature: z.coerce.number().default(0.5),
});
export type AiModelConfig = z.infer<typeof aiModelConfigSchema>;
// Text-chunking parameters used when generating embeddings.
export const chunkingConfigSchema = z.object({
  defaultChunkSize: z.coerce.number().default(1000),
  defaultOverlap: z.coerce.number().default(200),
  enhanceWithContext: z.preprocess(
    (val) => val === 'true',
    z.boolean().default(false)
  ),
});
export type ChunkingConfig = z.infer<typeof chunkingConfigSchema>;
// Hybrid-search weights and result cap.
export const retrievalConfigSchema = z.object({
  hybridSearch: z.object({
    semanticSearchWeight: z.coerce.number().default(0.7),
    keywordSearchWeight: z.coerce.number().default(0.3),
    maxResults: z.coerce.number().default(20),
  }),
});
export type RetrievalConfig = z.infer<typeof retrievalConfigSchema>;
// Discriminated on `enabled`: when AI is off, no other AI settings are
// required or validated.
export const aiConfigSchema = z.discriminatedUnion('enabled', [
  z.object({
    enabled: z.literal(true),
    nodeEmbeddingDelay: z.coerce.number().default(5000),
    documentEmbeddingDelay: z.coerce.number().default(10000),
    providers: z.object({
      openai: aiProviderConfigSchema,
      google: aiProviderConfigSchema,
    }),
    // Langfuse observability settings.
    langfuse: z.object({
      enabled: z.preprocess(
        (val) => val === 'true',
        z.boolean().default(false)
      ),
      publicKey: z.string().default(''),
      secretKey: z.string().default(''),
      baseUrl: z.string().default('https://cloud.langfuse.com'),
    }),
    // One model configuration per LLM task.
    models: z.object({
      queryRewrite: aiModelConfigSchema,
      response: aiModelConfigSchema,
      rerank: aiModelConfigSchema,
      summarization: aiModelConfigSchema,
      contextEnhancer: aiModelConfigSchema,
      noContext: aiModelConfigSchema,
      intentRecognition: aiModelConfigSchema,
      databaseFilter: aiModelConfigSchema,
    }),
    embedding: z.object({
      provider: aiProviderSchema.default('openai'),
      modelName: z.string().default('text-embedding-3-large'),
      dimensions: z.coerce.number().default(2000),
      apiKey: z.string().default(''),
      batchSize: z.coerce.number().default(50),
    }),
    chunking: chunkingConfigSchema,
    retrieval: retrievalConfigSchema,
  }),
  z.object({
    enabled: z.literal(false),
  }),
]);
export type AiConfig = z.infer<typeof aiConfigSchema>;
/**
 * Reads the raw AI environment variables. Values are passed through as
 * strings (or undefined) and are coerced/validated later by `aiConfigSchema`;
 * only the top-level `enabled` flag is converted here because it drives the
 * discriminated union.
 */
export const readAiConfigVariables = () => {
  // Every chat-model task is configured through the same trio of variables:
  // <PREFIX>_PROVIDER, <PREFIX>_MODEL and <PREFIX>_TEMPERATURE.
  const readModelVariables = (prefix: string) => ({
    provider: process.env[`${prefix}_PROVIDER`],
    modelName: process.env[`${prefix}_MODEL`],
    temperature: process.env[`${prefix}_TEMPERATURE`],
  });

  return {
    enabled: process.env.AI_ENABLED === 'true',
    nodeEmbeddingDelay: process.env.AI_NODE_EMBEDDING_DELAY,
    documentEmbeddingDelay: process.env.AI_DOCUMENT_EMBEDDING_DELAY,
    providers: {
      openai: {
        apiKey: process.env.OPENAI_API_KEY,
        enabled: process.env.OPENAI_ENABLED,
      },
      google: {
        apiKey: process.env.GOOGLE_API_KEY,
        enabled: process.env.GOOGLE_ENABLED,
      },
    },
    langfuse: {
      enabled: process.env.LANGFUSE_ENABLED,
      publicKey: process.env.LANGFUSE_PUBLIC_KEY,
      secretKey: process.env.LANGFUSE_SECRET_KEY,
      baseUrl: process.env.LANGFUSE_BASE_URL,
    },
    models: {
      queryRewrite: readModelVariables('QUERY_REWRITE'),
      response: readModelVariables('RESPONSE'),
      rerank: readModelVariables('RERANK'),
      summarization: readModelVariables('SUMMARIZATION'),
      contextEnhancer: readModelVariables('CHUNK_CONTEXT'),
      noContext: readModelVariables('NO_CONTEXT'),
      intentRecognition: readModelVariables('INTENT_RECOGNITION'),
      databaseFilter: readModelVariables('DATABASE_FILTER'),
    },
    embedding: {
      provider: process.env.EMBEDDING_PROVIDER,
      modelName: process.env.EMBEDDING_MODEL,
      dimensions: process.env.EMBEDDING_DIMENSIONS,
      apiKey: process.env.EMBEDDING_API_KEY,
      batchSize: process.env.EMBEDDING_BATCH_SIZE,
    },
    chunking: {
      defaultChunkSize: process.env.CHUNK_DEFAULT_CHUNK_SIZE,
      defaultOverlap: process.env.CHUNK_DEFAULT_OVERLAP,
      enhanceWithContext: process.env.CHUNK_ENHANCE_WITH_CONTEXT,
    },
    retrieval: {
      hybridSearch: {
        semanticSearchWeight:
          process.env.RETRIEVAL_HYBRID_SEARCH_SEMANTIC_WEIGHT,
        keywordSearchWeight: process.env.RETRIEVAL_HYBRID_SEARCH_KEYWORD_WEIGHT,
        maxResults: process.env.RETRIEVAL_HYBRID_SEARCH_MAX_RESULTS,
      },
    },
  };
};

View File

@@ -0,0 +1,59 @@
import { z } from 'zod';
import { accountConfigSchema, readAccountConfigVariables } from './account';
import { readServerConfigVariables, serverConfigSchema } from './server';
import { readUserConfigVariables, userConfigSchema } from './user';
import { postgresConfigSchema, readPostgresConfigVariables } from './postgres';
import {
readAvatarsS3ConfigVariables,
readFilesS3ConfigVariables,
s3ConfigSchema,
} from './s3';
import { aiConfigSchema, readAiConfigVariables } from './ai';
import { readSmtpConfigVariables, smtpConfigSchema } from './smtp';
import { readRedisConfigVariables, redisConfigSchema } from './redis';
// Top-level shape of the validated server configuration, one sub-schema per
// subsystem. Avatars and files use the same S3 schema against different
// environment variables.
const configSchema = z.object({
  server: serverConfigSchema,
  account: accountConfigSchema,
  user: userConfigSchema,
  postgres: postgresConfigSchema,
  redis: redisConfigSchema,
  avatarS3: s3ConfigSchema,
  fileS3: s3ConfigSchema,
  smtp: smtpConfigSchema,
  ai: aiConfigSchema,
});
export type Configuration = z.infer<typeof configSchema>;
/**
 * Assembles the raw environment input for every config section and validates
 * it against `configSchema`. The server cannot run with an invalid
 * configuration, so any failure is reported in a readable form and the
 * process exits immediately.
 */
const readConfigVariables = (): Configuration => {
  try {
    const input = {
      server: readServerConfigVariables(),
      account: readAccountConfigVariables(),
      user: readUserConfigVariables(),
      postgres: readPostgresConfigVariables(),
      redis: readRedisConfigVariables(),
      avatarS3: readAvatarsS3ConfigVariables(),
      fileS3: readFilesS3ConfigVariables(),
      smtp: readSmtpConfigVariables(),
      ai: readAiConfigVariables(),
    };
    return configSchema.parse(input);
  } catch (error) {
    if (error instanceof z.ZodError) {
      // One readable line per offending field.
      console.error('Configuration validation error:');
      for (const issue of error.errors) {
        console.error(`- ${issue.path.join('.')}: ${issue.message}`);
      }
    } else {
      console.error('Configuration validation error:', error);
    }
    process.exit(1);
  }
};

// Parsed and validated once at module load; import `config` everywhere else.
export const config = readConfigVariables();

View File

@@ -0,0 +1,31 @@
import { z } from 'zod';
// Postgres connection settings.
export const postgresConfigSchema = z.object({
  url: z.string({
    required_error:
      'POSTGRES_URL is required (e.g. postgres://postgres:postgres@localhost:5432/postgres)',
  }),
  // Optional TLS material. rejectUnauthorized stays undefined when the env
  // var is unset so the driver's own default applies.
  ssl: z.object({
    rejectUnauthorized: z.preprocess(
      (val) => (val === undefined ? undefined : val === 'true'),
      z.boolean().optional()
    ),
    ca: z.string().optional(),
    key: z.string().optional(),
    cert: z.string().optional(),
  }),
});
export type PostgresConfig = z.infer<typeof postgresConfigSchema>;
/**
 * Reads the raw Postgres environment variables; coercion and validation
 * happen later in `postgresConfigSchema`.
 */
export const readPostgresConfigVariables = () => {
  const env = process.env;
  return {
    url: env.POSTGRES_URL,
    ssl: {
      rejectUnauthorized: env.POSTGRES_SSL_REJECT_UNAUTHORIZED,
      ca: env.POSTGRES_SSL_CA,
      key: env.POSTGRES_SSL_KEY,
      cert: env.POSTGRES_SSL_CERT,
    },
  };
};

View File

@@ -0,0 +1,25 @@
import { z } from 'zod';
// Redis connection and channel/queue naming.
export const redisConfigSchema = z.object({
  url: z.string({ required_error: 'REDIS_URL is required' }),
  db: z.coerce.number().default(0),
  // Job queue naming (queue name plus key prefix).
  jobs: z.object({
    name: z.string().optional().default('jobs'),
    prefix: z.string().optional().default('colanode'),
  }),
  // Pub/sub channel used to fan out events between cluster nodes.
  eventsChannel: z.string().optional().default('events'),
});
export type RedisConfig = z.infer<typeof redisConfigSchema>;
/**
 * Reads the raw Redis environment variables; coercion and defaulting happen
 * later in `redisConfigSchema`.
 */
export const readRedisConfigVariables = () => {
  const env = process.env;
  return {
    url: env.REDIS_URL,
    db: env.REDIS_DB,
    jobs: {
      name: env.REDIS_JOBS_NAME,
      prefix: env.REDIS_JOBS_PREFIX,
    },
    eventsChannel: env.REDIS_EVENTS_CHANNEL,
  };
};

View File

@@ -0,0 +1,31 @@
import { z } from 'zod';
// Credentials for one S3-compatible bucket. Reused for both the files and
// avatars buckets; the error messages name the generic variables.
export const s3ConfigSchema = z.object({
  endpoint: z.string({ required_error: 'S3_ENDPOINT is required' }),
  accessKey: z.string({ required_error: 'S3_ACCESS_KEY is required' }),
  secretKey: z.string({ required_error: 'S3_SECRET_KEY is required' }),
  bucketName: z.string({ required_error: 'S3_BUCKET_NAME is required' }),
  region: z.string({ required_error: 'S3_REGION is required' }),
});
export type S3Config = z.infer<typeof s3ConfigSchema>;
// Both S3 targets share the same variable layout:
// S3_<TARGET>_ENDPOINT, S3_<TARGET>_ACCESS_KEY, S3_<TARGET>_SECRET_KEY,
// S3_<TARGET>_BUCKET_NAME, S3_<TARGET>_REGION.
const readS3TargetVariables = (target: 'FILES' | 'AVATARS') => ({
  endpoint: process.env[`S3_${target}_ENDPOINT`],
  accessKey: process.env[`S3_${target}_ACCESS_KEY`],
  secretKey: process.env[`S3_${target}_SECRET_KEY`],
  bucketName: process.env[`S3_${target}_BUCKET_NAME`],
  region: process.env[`S3_${target}_REGION`],
});

// Raw variables for the bucket that stores user-uploaded files.
export const readFilesS3ConfigVariables = () => readS3TargetVariables('FILES');

// Raw variables for the bucket that stores avatars.
export const readAvatarsS3ConfigVariables = () =>
  readS3TargetVariables('AVATARS');

View File

@@ -0,0 +1,58 @@
import { z } from 'zod';
import fs from 'fs';
// Deployment topology: single node or multi-node cluster.
const serverModeSchema = z.enum(['standalone', 'cluster']);
export type ServerMode = z.infer<typeof serverModeSchema>;
// Shape of the build metadata file baked into the image at /app/build.json.
const buildInfoSchema = z.object({
  version: z.string(),
  sha: z.string(),
});
export type BuildInfo = z.infer<typeof buildInfoSchema>;
/**
 * Loads the build metadata (version + git sha) baked into the Docker image
 * at /app/build.json. Falls back to 'dev' values when the file is missing,
 * empty, malformed JSON, or fails schema validation.
 */
const parseBuildInfo = (): BuildInfo => {
  const defaultBuildInfo: BuildInfo = {
    version: 'dev',
    sha: 'dev',
  };
  if (!fs.existsSync('/app/build.json')) {
    return defaultBuildInfo;
  }
  const json = fs.readFileSync('/app/build.json', 'utf8');
  if (!json || json.length === 0) {
    return defaultBuildInfo;
  }
  try {
    // BUG FIX: the file content is a JSON *string*; it must be parsed into
    // an object before schema validation. Passing the raw string to
    // buildInfoSchema.parse always failed, silently discarding the real
    // build info and reporting 'dev' for every build.
    return buildInfoSchema.parse(JSON.parse(json));
  } catch (error) {
    console.error('Failed to parse build info:', error);
    return defaultBuildInfo;
  }
};
// Server identity and mode. version/sha come from the baked-in build info
// (see parseBuildInfo), not from environment variables.
export const serverConfigSchema = z.object({
  version: z.string().default('dev'),
  sha: z.string().default('dev'),
  name: z.string().default('Colanode Server'),
  avatar: z.string().optional(),
  mode: serverModeSchema.default('standalone'),
});
export type ServerConfig = z.infer<typeof serverConfigSchema>;
/**
 * Reads the raw server environment variables and merges in the build
 * metadata from the image. Validation/defaults happen later in
 * `serverConfigSchema`.
 */
export const readServerConfigVariables = () => {
  const { version, sha } = parseBuildInfo();
  return {
    version,
    sha,
    name: process.env.SERVER_NAME,
    avatar: process.env.SERVER_AVATAR,
    mode: process.env.SERVER_MODE,
  };
};

View File

@@ -0,0 +1,44 @@
import { z } from 'zod';
// Discriminated on `enabled`: when SMTP is off, no other mail settings are
// required or validated.
export const smtpConfigSchema = z.discriminatedUnion('enabled', [
  z.object({
    enabled: z.literal(true),
    host: z.string({
      required_error: 'SMTP_HOST is required when SMTP is enabled',
    }),
    port: z.coerce.number().default(587),
    // 'true' enables implicit TLS; anything else means STARTTLS/plain.
    secure: z.preprocess((val) => val === 'true', z.boolean().default(false)),
    user: z.string({
      required_error: 'SMTP_USER is required when SMTP is enabled',
    }),
    password: z.string({
      required_error: 'SMTP_PASSWORD is required when SMTP is enabled',
    }),
    // Sender identity for outgoing mail.
    from: z.object({
      email: z.string({
        required_error: 'SMTP_EMAIL_FROM is required when SMTP is enabled',
      }),
      name: z.string().default('Colanode'),
    }),
  }),
  z.object({
    enabled: z.literal(false),
  }),
]);
export type SmtpConfig = z.infer<typeof smtpConfigSchema>;
/**
 * Reads the raw SMTP environment variables. Only the boolean flags are
 * converted here (comparing against the literal string 'true'); everything
 * else is validated later by `smtpConfigSchema`.
 */
export const readSmtpConfigVariables = () => {
  const env = process.env;
  return {
    enabled: env.SMTP_ENABLED === 'true',
    host: env.SMTP_HOST,
    port: env.SMTP_PORT,
    secure: env.SMTP_SECURE === 'true',
    user: env.SMTP_USER,
    password: env.SMTP_PASSWORD,
    from: {
      email: env.SMTP_EMAIL_FROM,
      name: env.SMTP_NAME,
    },
  };
};

View File

@@ -0,0 +1,21 @@
import { z } from 'zod';
// Per-user resource limits, in bytes. Env values arrive as decimal strings
// and are converted to bigint.
// NOTE(review): the preprocess passes falsy values (e.g. '') through
// unchanged, and BigInt() throws on non-numeric strings *before* zod can
// report a friendly error — confirm env values are always valid integers.
export const userConfigSchema = z.object({
  // Total storage quota per user (default 10737418240 = 10 GiB).
  storageLimit: z.preprocess(
    (val) => val && BigInt(val as string),
    z.bigint().default(10737418240n)
  ),
  // Maximum single-file size (default 104857600 = 100 MiB).
  maxFileSize: z.preprocess(
    (val) => val && BigInt(val as string),
    z.bigint().default(104857600n)
  ),
});
export type UserConfig = z.infer<typeof userConfigSchema>;
// Raw user-limit environment variables; converted to bigint and defaulted
// later by `userConfigSchema`.
export const readUserConfigVariables = () => ({
  storageLimit: process.env.USER_STORAGE_LIMIT,
  maxFileSize: process.env.USER_MAX_FILE_SIZE,
});

View File

@@ -1,371 +0,0 @@
import fs from 'fs';
// Build metadata baked into the Docker image (see parseBuildInfo below).
interface BuildInfo {
  version: string;
  sha: string;
}
// Root configuration object assembled from environment variables at startup.
export interface Configuration {
  server: ServerConfiguration;
  account: AccountConfiguration;
  user: UserConfiguration;
  postgres: PostgresConfiguration;
  redis: RedisConfiguration;
  avatarS3: S3Configuration;
  fileS3: S3Configuration;
  smtp: SmtpConfiguration;
  ai: AiConfiguration;
}
// Deployment topology: single node or multi-node cluster.
export type ServerMode = 'standalone' | 'cluster';
export interface ServerConfiguration {
  version: string;
  sha: string;
  name: string;
  avatar: string;
  mode: ServerMode;
}
// How newly registered accounts get verified.
export type AccountVerificationType = 'automatic' | 'manual' | 'email';
export interface AccountConfiguration {
  verificationType: AccountVerificationType;
  // OTP lifetime in seconds.
  otpTimeout: number;
  allowGoogleLogin: boolean;
}
// Per-user resource limits, in bytes.
export interface UserConfiguration {
  storageLimit: bigint;
  maxFileSize: bigint;
}
export interface PostgresConfiguration {
  url: string;
  // Optional TLS material for the Postgres connection.
  ssl: {
    rejectUnauthorized?: boolean;
    ca?: string;
    key?: string;
    cert?: string;
  };
}
export interface RedisConfiguration {
  url: string;
  db: number;
  // Job queue naming (key prefix plus queue name).
  jobs: {
    prefix: string;
    name: string;
  };
  // Pub/sub channel used to fan out events between cluster nodes.
  eventsChannel: string;
}
// Credentials for one S3-compatible bucket.
export interface S3Configuration {
  endpoint: string;
  accessKey: string;
  secretKey: string;
  bucketName: string;
  region: string;
}
export interface SmtpConfiguration {
  host: string;
  port: number;
  user: string;
  password: string;
  secure: boolean;
  // Sender identity for outgoing mail.
  from: {
    email: string;
    name: string;
  };
}
export type AiProvider = 'openai' | 'google';
export interface AiProviderConfiguration {
  apiKey: string;
  enabled?: boolean;
}
// Settings for a single chat-model task.
export interface AiModelConfiguration {
  provider: AiProvider;
  modelName: string;
  temperature: number;
}
export interface AiConfiguration {
  enabled: boolean;
  nodeEmbeddingDelay: number;
  documentEmbeddingDelay: number;
  providers: {
    openai: AiProviderConfiguration;
    google: AiProviderConfiguration;
  };
  // Langfuse observability settings.
  langfuse: {
    enabled: boolean;
    publicKey: string;
    secretKey: string;
    baseUrl: string;
  };
  // One model configuration per LLM task.
  models: {
    queryRewrite: AiModelConfiguration;
    response: AiModelConfiguration;
    rerank: AiModelConfiguration;
    summarization: AiModelConfiguration;
    contextEnhancer: AiModelConfiguration;
    noContext: AiModelConfiguration;
    intentRecognition: AiModelConfiguration;
    databaseFilter: AiModelConfiguration;
  };
  embedding: {
    provider: AiProvider;
    apiKey: string;
    modelName: string;
    dimensions: number;
    batchSize: number;
  };
  chunking: ChunkingConfiguration;
  retrieval: RetrievalConfiguration;
}
// Text-chunking parameters used when generating embeddings.
export interface ChunkingConfiguration {
  defaultChunkSize: number;
  defaultOverlap: number;
  enhanceWithContext: boolean;
}
// Weights and result cap for hybrid (semantic + keyword) search.
export interface RetrievalConfiguration {
  hybridSearch: {
    semanticSearchWeight: number;
    keywordSearchWeight: number;
    maxResults: number;
  };
}
/**
 * Returns the value of a required environment variable, throwing when it is
 * unset or empty.
 */
const getRequiredEnv = (env: string): string => {
  const value = process.env[env];
  if (value) {
    return value;
  }
  throw new Error(`${env} is not set`);
};
// Returns the environment variable's value, or undefined when unset.
const getOptionalEnv = (env: string): string | undefined => process.env[env];
/**
 * Loads the build metadata (version + git sha) baked into the Docker image
 * at /app/build.json. Falls back to 'dev' values when the file is missing,
 * empty, malformed, or lacks the expected fields.
 */
const parseBuildInfo = (): BuildInfo => {
  const defaultBuildInfo: BuildInfo = {
    version: 'dev',
    sha: 'dev',
  };
  if (!fs.existsSync('/app/build.json')) {
    return defaultBuildInfo;
  }
  const json = fs.readFileSync('/app/build.json', 'utf8');
  if (!json || json.length === 0) {
    return defaultBuildInfo;
  }
  try {
    const buildInfo = JSON.parse(json);
    if (!buildInfo.version || !buildInfo.sha) {
      return defaultBuildInfo;
    }
    return {
      version: buildInfo.version,
      sha: buildInfo.sha,
    };
  } catch {
    // BUG FIX: JSON.parse throws on malformed content; a broken build.json
    // must not crash startup, so fall back to the dev values instead.
    return defaultBuildInfo;
  }
};
const buildInfo: BuildInfo = parseBuildInfo();
// Global configuration, read eagerly from environment variables at module
// load. getRequiredEnv throws at startup for missing mandatory variables;
// getOptionalEnv falls back to the inline defaults.
export const configuration: Configuration = {
  server: {
    version: buildInfo.version,
    sha: buildInfo.sha,
    name: getRequiredEnv('SERVER_NAME'),
    avatar: getOptionalEnv('SERVER_AVATAR') || '',
    mode: (getOptionalEnv('SERVER_MODE') as ServerMode) || 'standalone',
  },
  account: {
    verificationType:
      (getOptionalEnv(
        'ACCOUNT_VERIFICATION_TYPE'
      ) as AccountVerificationType) || 'manual',
    // Seconds; callers multiply by 1000 to build an expiry timestamp.
    otpTimeout: parseInt(getOptionalEnv('ACCOUNT_OTP_TIMEOUT') || '600'),
    allowGoogleLogin: getOptionalEnv('ACCOUNT_ALLOW_GOOGLE_LOGIN') === 'true',
  },
  user: {
    // Bytes: 10 GiB storage quota, 100 MiB max file size by default.
    storageLimit: BigInt(getOptionalEnv('USER_STORAGE_LIMIT') || '10737418240'),
    maxFileSize: BigInt(getOptionalEnv('USER_MAX_FILE_SIZE') || '104857600'),
  },
  postgres: {
    url: getRequiredEnv('POSTGRES_URL'),
    ssl: {
      // Left undefined when unset so the driver default applies.
      rejectUnauthorized:
        getOptionalEnv('POSTGRES_SSL_REJECT_UNAUTHORIZED') === undefined
          ? undefined
          : getOptionalEnv('POSTGRES_SSL_REJECT_UNAUTHORIZED') === 'true',
      ca: getOptionalEnv('POSTGRES_SSL_CA'),
      key: getOptionalEnv('POSTGRES_SSL_KEY'),
      cert: getOptionalEnv('POSTGRES_SSL_CERT'),
    },
  },
  redis: {
    url: getRequiredEnv('REDIS_URL'),
    db: parseInt(getOptionalEnv('REDIS_DB') || '0'),
    jobs: {
      name: getOptionalEnv('REDIS_JOBS_QUEUE_NAME') || 'jobs',
      prefix: getOptionalEnv('REDIS_JOBS_QUEUE_PREFIX') || 'colanode',
    },
    eventsChannel: getOptionalEnv('REDIS_EVENTS_CHANNEL') || 'events',
  },
  avatarS3: {
    endpoint: getRequiredEnv('S3_AVATARS_ENDPOINT'),
    accessKey: getRequiredEnv('S3_AVATARS_ACCESS_KEY'),
    secretKey: getRequiredEnv('S3_AVATARS_SECRET_KEY'),
    bucketName: getRequiredEnv('S3_AVATARS_BUCKET_NAME'),
    region: getRequiredEnv('S3_AVATARS_REGION'),
  },
  fileS3: {
    endpoint: getRequiredEnv('S3_FILES_ENDPOINT'),
    accessKey: getRequiredEnv('S3_FILES_ACCESS_KEY'),
    secretKey: getRequiredEnv('S3_FILES_SECRET_KEY'),
    bucketName: getRequiredEnv('S3_FILES_BUCKET_NAME'),
    region: getRequiredEnv('S3_FILES_REGION'),
  },
  smtp: {
    host: getOptionalEnv('SMTP_HOST') || '',
    port: parseInt(getOptionalEnv('SMTP_PORT') || '587'),
    secure: getOptionalEnv('SMTP_SECURE') === 'true',
    user: getOptionalEnv('SMTP_USER') || '',
    password: getOptionalEnv('SMTP_PASSWORD') || '',
    from: {
      email: getRequiredEnv('SMTP_EMAIL_FROM'),
      name: getRequiredEnv('SMTP_EMAIL_FROM_NAME'),
    },
  },
  ai: {
    enabled: getOptionalEnv('AI_ENABLED') === 'true',
    nodeEmbeddingDelay: parseInt(
      getOptionalEnv('AI_NODE_EMBEDDING_DELAY') || '5000'
    ),
    documentEmbeddingDelay: parseInt(
      getOptionalEnv('AI_DOCUMENT_EMBEDDING_DELAY') || '10000'
    ),
    providers: {
      openai: {
        apiKey: getOptionalEnv('OPENAI_API_KEY') || '',
        enabled: getOptionalEnv('OPENAI_ENABLED') === 'true',
      },
      google: {
        apiKey: getOptionalEnv('GOOGLE_API_KEY') || '',
        enabled: getOptionalEnv('GOOGLE_ENABLED') === 'true',
      },
    },
    langfuse: {
      enabled: getOptionalEnv('LANGFUSE_ENABLED') === 'true',
      publicKey: getOptionalEnv('LANGFUSE_PUBLIC_KEY') || '',
      secretKey: getOptionalEnv('LANGFUSE_SECRET_KEY') || '',
      baseUrl:
        getOptionalEnv('LANGFUSE_BASE_URL') || 'https://cloud.langfuse.com',
    },
    // One provider/model/temperature trio per LLM task.
    models: {
      queryRewrite: {
        provider: (getOptionalEnv('QUERY_REWRITE_PROVIDER') ||
          'openai') as AiProvider,
        modelName: getOptionalEnv('QUERY_REWRITE_MODEL') || 'gpt-4o-mini',
        temperature: parseFloat(
          getOptionalEnv('QUERY_REWRITE_TEMPERATURE') || '0.3'
        ),
      },
      response: {
        provider: (getOptionalEnv('RESPONSE_PROVIDER') ||
          'openai') as AiProvider,
        modelName: getOptionalEnv('RESPONSE_MODEL') || 'gpt-4o-mini',
        temperature: parseFloat(
          getOptionalEnv('RESPONSE_TEMPERATURE') || '0.3'
        ),
      },
      rerank: {
        provider: (getOptionalEnv('RERANK_PROVIDER') || 'openai') as AiProvider,
        modelName: getOptionalEnv('RERANK_MODEL') || 'gpt-4o-mini',
        temperature: parseFloat(getOptionalEnv('RERANK_TEMPERATURE') || '0.3'),
      },
      summarization: {
        provider: (getOptionalEnv('SUMMARIZATION_PROVIDER') ||
          'openai') as AiProvider,
        modelName: getOptionalEnv('SUMMARIZATION_MODEL') || 'gpt-4o-mini',
        temperature: parseFloat(
          getOptionalEnv('SUMMARIZATION_TEMPERATURE') || '0.3'
        ),
      },
      contextEnhancer: {
        provider: (getOptionalEnv('CHUNK_CONTEXT_PROVIDER') ||
          'openai') as AiProvider,
        modelName: getOptionalEnv('CHUNK_CONTEXT_MODEL') || 'gpt-4o-mini',
        temperature: parseFloat(
          getOptionalEnv('CHUNK_CONTEXT_TEMPERATURE') || '0.3'
        ),
      },
      noContext: {
        provider: (getOptionalEnv('NO_CONTEXT_PROVIDER') ||
          'openai') as AiProvider,
        modelName: getOptionalEnv('NO_CONTEXT_MODEL') || 'gpt-4o-mini',
        temperature: parseFloat(
          getOptionalEnv('NO_CONTEXT_TEMPERATURE') || '0.3'
        ),
      },
      intentRecognition: {
        provider: (getOptionalEnv('INTENT_RECOGNITION_PROVIDER') ||
          'openai') as AiProvider,
        modelName: getOptionalEnv('INTENT_RECOGNITION_MODEL') || 'gpt-4o-mini',
        temperature: parseFloat(
          getOptionalEnv('INTENT_RECOGNITION_TEMPERATURE') || '0.3'
        ),
      },
      databaseFilter: {
        provider: (getOptionalEnv('DATABASE_FILTER_PROVIDER') ||
          'openai') as AiProvider,
        modelName: getOptionalEnv('DATABASE_FILTER_MODEL') || 'gpt-4o-mini',
        temperature: parseFloat(
          getOptionalEnv('DATABASE_FILTER_TEMPERATURE') || '0.3'
        ),
      },
    },
    embedding: {
      provider: (getOptionalEnv('EMBEDDING_PROVIDER') ||
        'openai') as AiProvider,
      modelName: getOptionalEnv('EMBEDDING_MODEL') || 'text-embedding-3-large',
      dimensions: parseInt(getOptionalEnv('EMBEDDING_DIMENSIONS') || '2000'),
      apiKey: getOptionalEnv('EMBEDDING_API_KEY') || '',
      batchSize: parseInt(getOptionalEnv('EMBEDDING_BATCH_SIZE') || '50'),
    },
    chunking: {
      defaultChunkSize: parseInt(
        getOptionalEnv('CHUNK_DEFAULT_CHUNK_SIZE') || '1000'
      ),
      defaultOverlap: parseInt(
        getOptionalEnv('CHUNK_DEFAULT_OVERLAP') || '200'
      ),
      enhanceWithContext:
        getOptionalEnv('CHUNK_ENHANCE_WITH_CONTEXT') === 'true',
    },
    retrieval: {
      hybridSearch: {
        semanticSearchWeight: parseFloat(
          getOptionalEnv('RETRIEVAL_HYBRID_SEARCH_SEMANTIC_WEIGHT') || '0.7'
        ),
        keywordSearchWeight: parseFloat(
          getOptionalEnv('RETRIEVAL_HYBRID_SEARCH_KEYWORD_WEIGHT') || '0.3'
        ),
        maxResults: parseInt(
          getOptionalEnv('RETRIEVAL_HYBRID_SEARCH_MAX_RESULTS') || '20'
        ),
      },
    },
  },
};

View File

@@ -15,7 +15,7 @@ import { ConcurrentUpdateResult, UpdateDocumentOutput } from '@/types/nodes';
import { eventBus } from '@/lib/event-bus';
import { fetchNode, fetchNodeTree, mapNode } from '@/lib/nodes';
import { CreateDocumentInput, CreateDocumentOutput } from '@/types/documents';
import { scheduleDocumentEmbedding } from '@/lib/embeddings';
import { scheduleDocumentEmbedding } from '@/lib/ai/embeddings';
const debug = createDebugger('server:lib:documents');

View File

@@ -2,7 +2,7 @@ import { generateId, IdType } from '@colanode/core';
import { Event } from '@/types/events';
import { redis } from '@/data/redis';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
export interface Subscription {
id: string;
@@ -38,14 +38,14 @@ export class EventBusService {
this.initialized = true;
if (configuration.server.mode === 'standalone') {
if (config.server.mode === 'standalone') {
return;
}
const client = redis.duplicate();
await client.connect();
client.subscribe(configuration.redis.eventsChannel, (message) => {
client.subscribe(config.redis.eventsChannel, (message) => {
const envelope = JSON.parse(message) as DistributedEventEnvelope;
if (envelope.hostId === this.hostId) {
return;
@@ -73,12 +73,12 @@ export class EventBusService {
public publish(event: Event) {
this.processEvent(event);
if (configuration.server.mode === 'standalone') {
if (config.server.mode === 'standalone') {
return;
}
redis.publish(
configuration.redis.eventsChannel,
config.redis.eventsChannel,
JSON.stringify({ event, hostId: this.hostId })
);
}

View File

@@ -7,7 +7,7 @@ import {
import { FileAttributes } from '@colanode/core';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import { fileS3 } from '@/data/storage';
export const buildFilePath = (
@@ -24,7 +24,7 @@ export const buildUploadUrl = async (
mimeType: string
) => {
const command = new PutObjectCommand({
Bucket: configuration.fileS3.bucketName,
Bucket: config.fileS3.bucketName,
Key: path,
ContentLength: size,
ContentType: mimeType,
@@ -40,7 +40,7 @@ export const buildUploadUrl = async (
export const buildDownloadUrl = async (path: string) => {
const command = new GetObjectCommand({
Bucket: configuration.fileS3.bucketName,
Bucket: config.fileS3.bucketName,
Key: path,
});
@@ -53,7 +53,7 @@ export const buildDownloadUrl = async (path: string) => {
export const fetchFileMetadata = async (path: string) => {
const command = new HeadObjectCommand({
Bucket: configuration.fileS3.bucketName,
Bucket: config.fileS3.bucketName,
Key: path,
});
@@ -70,7 +70,7 @@ export const fetchFileMetadata = async (path: string) => {
export const deleteFile = async (path: string) => {
const command = new DeleteObjectCommand({
Bucket: configuration.fileS3.bucketName,
Bucket: config.fileS3.bucketName,
Key: path,
});

View File

@@ -39,7 +39,7 @@ import {
} from '@/lib/collaborations';
import { jobService } from '@/services/job-service';
import { deleteFile } from '@/lib/files';
import { scheduleNodeEmbedding } from '@/lib/embeddings';
import { scheduleNodeEmbedding } from '@/lib/ai/embeddings';
const debug = createDebugger('server:lib:nodes');

View File

@@ -0,0 +1,499 @@
import { sql, Expression, SqlBool } from 'kysely';
import {
BooleanFieldAttributes,
CreatedAtFieldAttributes,
DatabaseNode,
DateFieldAttributes,
EmailFieldAttributes,
FieldAttributes,
isStringArray,
NumberFieldAttributes,
PhoneFieldAttributes,
SelectFieldAttributes,
TextFieldAttributes,
UrlFieldAttributes,
DatabaseViewFieldFilterAttributes,
DatabaseViewFilterAttributes,
DatabaseViewSortAttributes,
MultiSelectFieldAttributes,
} from '@colanode/core';
import { database } from '@/data/database';
// Paged, filtered record-listing request for a database node.
type FilterInput = {
  filters: DatabaseViewFilterAttributes[];
  sorts: DatabaseViewSortAttributes[];
  page: number; // 1-based (offset is computed as (page - 1) * count)
  count: number; // page size
};
// Free-text record search request.
type SearchInput = {
  searchQuery: string;
  exclude?: string[]; // record ids to omit from the results
};
// Field kinds whose values are stored and filtered as plain text.
type TextBasedFieldAttributes =
  | TextFieldAttributes
  | EmailFieldAttributes
  | PhoneFieldAttributes
  | UrlFieldAttributes;
/**
 * Fetches a page of record nodes from a database node, applying the given
 * view filters and sort orders. Access is restricted to roots the user
 * collaborates on.
 */
export const retrieveByFilters = async (
  databaseId: string,
  workspaceId: string,
  userId: string,
  input: FilterInput
) => {
  const databaseNode = await fetchDatabase(databaseId, workspaceId);

  const filterQuery = buildFiltersQuery(
    input.filters,
    databaseNode.attributes.fields
  );

  const orderByQuery =
    input.sorts.length > 0
      ? buildSortOrdersQuery(input.sorts, databaseNode.attributes.fields)
      : 'n.id ASC';

  // 1-based page -> row offset.
  const offset = (input.page - 1) * input.count;

  let query = database
    .selectFrom('nodes as n')
    .innerJoin('collaborations as c', 'c.node_id', 'n.root_id')
    .where('n.parent_id', '=', databaseId)
    .where('n.type', '=', 'record')
    .where('n.workspace_id', '=', workspaceId)
    .where('c.collaborator_id', '=', userId)
    .where('c.deleted_at', 'is', null);

  if (filterQuery) {
    // BUG FIX: kysely query builders are immutable — the previous code
    // discarded the result of `.where(filterQuery)`, so view filters were
    // silently never applied. Reassign the builder instead.
    query = query.where(filterQuery);
  }

  const result = await query
    .orderBy(sql.raw(orderByQuery))
    .limit(input.count)
    .offset(offset)
    .selectAll()
    .execute();

  return result;
};
/**
 * Full-text search over the record nodes of a database (record name plus
 * every field value). Falls back to returning all records when the query
 * string is empty. Access is restricted to roots the user collaborates on.
 */
export const searchRecords = async (
  databaseId: string,
  workspaceId: string,
  userId: string,
  input: SearchInput
) => {
  if (!input.searchQuery) {
    return fetchAllRecords(databaseId, workspaceId, userId, input.exclude);
  }

  // Match either the record name or any of its field values.
  const searchCondition = sql<SqlBool>`
    to_tsvector('english', n.attributes->>'name') @@ plainto_tsquery('english', ${input.searchQuery})
    OR EXISTS (
      SELECT 1
      FROM jsonb_each_text(n.attributes->'fields') fields
      WHERE to_tsvector('english', fields.value::text) @@ plainto_tsquery('english', ${input.searchQuery})
    )
  `;

  let query = database
    .selectFrom('nodes as n')
    .innerJoin('collaborations as c', 'c.node_id', 'n.root_id')
    .where('n.parent_id', '=', databaseId)
    .where('n.type', '=', 'record')
    .where('n.workspace_id', '=', workspaceId)
    .where('c.collaborator_id', '=', userId)
    .where('c.deleted_at', 'is', null)
    .where(searchCondition);

  if (input.exclude?.length) {
    // BUG FIX: kysely query builders are immutable — the previous code
    // discarded the result of `.where(...)`, so excluded record ids were
    // still returned. Reassign the builder instead.
    query = query.where('n.id', 'not in', input.exclude);
  }

  return query.selectAll().execute();
};
/**
 * Lists every record of a database that the user can access through a
 * non-deleted collaboration, optionally excluding specific record ids.
 *
 * @param databaseId - id of the parent database node
 * @param workspaceId - workspace scope for the lookup
 * @param userId - collaborator whose access is checked
 * @param exclude - record ids to leave out of the result
 * @returns the matching record node rows
 */
export const fetchAllRecords = async (
  databaseId: string,
  workspaceId: string,
  userId: string,
  exclude?: string[]
) => {
  let builder = database
    .selectFrom('nodes as n')
    .innerJoin('collaborations as c', 'c.node_id', 'n.root_id')
    .where('n.parent_id', '=', databaseId)
    .where('n.type', '=', 'record')
    .where('n.workspace_id', '=', workspaceId)
    .where('c.collaborator_id', '=', userId)
    .where('c.deleted_at', 'is', null);

  if (exclude && exclude.length > 0) {
    builder = builder.where('n.id', 'not in', exclude);
  }

  return builder.selectAll().execute();
};
/**
 * Loads the database node for the given id within a workspace.
 *
 * @param databaseId - id of the node, which must have type 'database'
 * @param workspaceId - workspace scope for the lookup
 * @returns the node row cast to DatabaseNode
 * @throws Error when no matching database node exists
 */
export const fetchDatabase = async (
  databaseId: string,
  workspaceId: string
): Promise<DatabaseNode> => {
  const node = await database
    .selectFrom('nodes')
    .selectAll()
    .where('type', '=', 'database')
    .where('id', '=', databaseId)
    .where('workspace_id', '=', workspaceId)
    .executeTakeFirst();

  if (node === undefined) {
    throw new Error('Database not found');
  }

  return node as unknown as DatabaseNode;
};
/**
 * Combines all view filters into a single AND-ed SQL boolean expression.
 * Filters that cannot be translated (groups, unknown fields, unsupported
 * operators) are skipped.
 *
 * @param filters - view filter attributes to translate
 * @param fields - field definitions of the database, keyed by field id
 * @returns the combined expression, or undefined when nothing is applicable
 */
export const buildFiltersQuery = (
  filters: DatabaseViewFilterAttributes[],
  fields: Record<string, FieldAttributes>
): Expression<SqlBool> | undefined => {
  const expressions: Expression<SqlBool>[] = [];
  for (const filter of filters) {
    const expression = buildFilterQuery(filter, fields);
    if (expression !== null) {
      expressions.push(expression);
    }
  }

  if (expressions.length === 0) {
    return undefined;
  }

  return sql<SqlBool>`(${sql.join(expressions, sql` AND `)})`;
};
export const buildFilterQuery = (
filter: DatabaseViewFilterAttributes,
fields: Record<string, FieldAttributes>
): Expression<SqlBool> | null => {
if (filter.type === 'group') {
return null;
}
const field = fields[filter.fieldId];
if (!field) {
return null;
}
switch (field.type) {
case 'boolean':
return buildBooleanFilterQuery(filter, field);
case 'created_at':
return buildCreatedAtFilterQuery(filter, field);
case 'date':
return buildDateFilterQuery(filter, field);
case 'email':
case 'phone':
case 'url':
case 'text':
return buildTextFilterQuery(filter, field as TextBasedFieldAttributes);
case 'multi_select':
return buildMultiSelectFilterQuery(filter, field);
case 'number':
return buildNumberFilterQuery(filter, field);
case 'select':
return buildSelectFilterQuery(filter, field);
default:
return null;
}
};
/**
 * Builds the SQL expression for a boolean field filter.
 * A missing value is treated as false by the 'is_false' operator.
 *
 * @returns the expression, or null for unsupported operators
 */
export const buildBooleanFilterQuery = (
  filter: DatabaseViewFieldFilterAttributes,
  field: BooleanFieldAttributes
): Expression<SqlBool> | null => {
  switch (filter.operator) {
    case 'is_true':
      return sql<SqlBool>`(n.attributes->'fields'->${field.id}->>'value')::boolean = true`;
    case 'is_false':
      // NULL (never set) counts as false.
      return sql<SqlBool>`((n.attributes->'fields'->${field.id}->>'value')::boolean = false OR n.attributes->'fields'->${field.id}->>'value' IS NULL)`;
    default:
      return null;
  }
};
/**
 * Builds the SQL expression for a number field filter.
 * Emptiness operators check for NULL; comparison operators require a
 * numeric filter value and compare against the value cast to numeric.
 *
 * @returns the expression, or null when the operator/value is unsupported
 */
export const buildNumberFilterQuery = (
  filter: DatabaseViewFieldFilterAttributes,
  field: NumberFieldAttributes
): Expression<SqlBool> | null => {
  if (filter.operator === 'is_empty') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NULL`;
  }
  if (filter.operator === 'is_not_empty') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NOT NULL`;
  }

  // typeof null !== 'number', so this also rejects null values.
  if (typeof filter.value !== 'number') {
    return null;
  }

  const comparisons: Record<string, string> = {
    is_equal_to: '=',
    is_not_equal_to: '!=',
    is_greater_than: '>',
    is_less_than: '<',
    is_greater_than_or_equal_to: '>=',
    is_less_than_or_equal_to: '<=',
  };

  const comparison = comparisons[filter.operator];
  if (comparison === undefined) {
    return null;
  }

  // The operator comes from the fixed table above, so sql.raw is safe here;
  // the value itself is bound as a parameter.
  return sql<SqlBool>`(n.attributes->'fields'->${field.id}->>'value')::numeric ${sql.raw(comparison)} ${filter.value}`;
};
/**
 * Builds the SQL expression for a text-based field filter (text, email,
 * phone, url). Substring operators use case-insensitive ILIKE matching.
 *
 * @returns the expression, or null when the operator/value is unsupported
 */
export const buildTextFilterQuery = (
  filter: DatabaseViewFieldFilterAttributes,
  field: TextBasedFieldAttributes
): Expression<SqlBool> | null => {
  if (filter.operator === 'is_empty') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NULL`;
  }
  if (filter.operator === 'is_not_empty') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NOT NULL`;
  }

  // typeof null !== 'string', so this also rejects null values.
  if (typeof filter.value !== 'string') {
    return null;
  }

  const text = filter.value;
  if (filter.operator === 'is_equal_to') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' = ${text}`;
  }
  if (filter.operator === 'is_not_equal_to') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' != ${text}`;
  }
  if (filter.operator === 'contains') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' ILIKE ${'%' + text + '%'}`;
  }
  if (filter.operator === 'does_not_contain') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' NOT ILIKE ${'%' + text + '%'}`;
  }
  if (filter.operator === 'starts_with') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' ILIKE ${text + '%'}`;
  }
  if (filter.operator === 'ends_with') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' ILIKE ${'%' + text}`;
  }

  return null;
};
// Email, phone and url fields store plain strings, so they reuse the text
// filter implementation unchanged.
export const buildEmailFilterQuery = buildTextFilterQuery;
export const buildPhoneFilterQuery = buildTextFilterQuery;
export const buildUrlFilterQuery = buildTextFilterQuery;
/**
 * Builds the SQL expression for a single-select field filter.
 * Membership operators require a non-empty array of option ids.
 *
 * @returns the expression, or null when the operator/value is unsupported
 */
export const buildSelectFilterQuery = (
  filter: DatabaseViewFieldFilterAttributes,
  field: SelectFieldAttributes
): Expression<SqlBool> | null => {
  if (filter.operator === 'is_empty') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NULL`;
  }
  if (filter.operator === 'is_not_empty') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NOT NULL`;
  }

  if (!isStringArray(filter.value) || filter.value.length === 0) {
    return null;
  }

  const options = filter.value;
  if (filter.operator === 'is_in') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IN (${sql.join(options)})`;
  }
  if (filter.operator === 'is_not_in') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' NOT IN (${sql.join(options)})`;
  }

  return null;
};
/**
 * Builds the SQL expression for a multi-select field filter.
 * The stored value is a JSON array of option ids; emptiness treats both a
 * missing value and an empty array as empty, and membership operators test
 * whether any stored element is in the filter's option list.
 *
 * @returns the expression, or null when the operator/value is unsupported
 */
export const buildMultiSelectFilterQuery = (
  filter: DatabaseViewFieldFilterAttributes,
  field: MultiSelectFieldAttributes
): Expression<SqlBool> | null => {
  if (filter.operator === 'is_empty') {
    return sql<SqlBool>`(n.attributes->'fields'->${field.id}->>'value' IS NULL OR jsonb_array_length(n.attributes->'fields'->${field.id}->'value') = 0)`;
  }
  if (filter.operator === 'is_not_empty') {
    return sql<SqlBool>`(n.attributes->'fields'->${field.id}->>'value' IS NOT NULL AND jsonb_array_length(n.attributes->'fields'->${field.id}->'value') > 0)`;
  }

  if (!isStringArray(filter.value) || filter.value.length === 0) {
    return null;
  }

  const options = filter.value;
  if (filter.operator === 'is_in') {
    return sql<SqlBool>`EXISTS (
      SELECT 1
      FROM jsonb_array_elements_text(n.attributes->'fields'->${field.id}->'value') val
      WHERE val IN (${sql.join(options)})
    )`;
  }
  if (filter.operator === 'is_not_in') {
    return sql<SqlBool>`NOT EXISTS (
      SELECT 1
      FROM jsonb_array_elements_text(n.attributes->'fields'->${field.id}->'value') val
      WHERE val IN (${sql.join(options)})
    )`;
  }

  return null;
};
/**
 * Builds the SQL expression for a date field filter.
 * Comparison operators require a parseable date string; the stored value
 * and the filter value are both compared at day granularity via DATE().
 *
 * @returns the expression, or null when the operator/value is unsupported
 */
export const buildDateFilterQuery = (
  filter: DatabaseViewFieldFilterAttributes,
  field: DateFieldAttributes
): Expression<SqlBool> | null => {
  if (filter.operator === 'is_empty') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NULL`;
  }
  if (filter.operator === 'is_not_empty') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NOT NULL`;
  }

  // typeof null !== 'string', so this also rejects null values.
  if (typeof filter.value !== 'string') {
    return null;
  }

  const parsed = new Date(filter.value);
  if (Number.isNaN(parsed.getTime())) {
    return null;
  }

  // First 10 chars of an ISO timestamp are the YYYY-MM-DD date part.
  const day = parsed.toISOString().slice(0, 10);

  const comparisons: Record<string, string> = {
    is_equal_to: '=',
    is_not_equal_to: '!=',
    is_on_or_after: '>=',
    is_on_or_before: '<=',
    is_after: '>',
    is_before: '<',
  };

  const comparison = comparisons[filter.operator];
  if (comparison === undefined) {
    return null;
  }

  // Operator comes from the fixed table above, so sql.raw is safe; the
  // date string is bound as a parameter.
  return sql<SqlBool>`DATE(n.attributes->'fields'->${field.id}->>'value') ${sql.raw(comparison)} ${day}`;
};
/**
 * Builds the SQL expression for the built-in created_at field filter.
 * Unlike regular fields this reads the node's created_at column, compared
 * at day granularity via DATE().
 *
 * @returns the expression, or null when the operator/value is unsupported
 */
export const buildCreatedAtFilterQuery = (
  filter: DatabaseViewFieldFilterAttributes,
  _: CreatedAtFieldAttributes
): Expression<SqlBool> | null => {
  if (filter.operator === 'is_empty') {
    return sql<SqlBool>`n.created_at IS NULL`;
  }
  if (filter.operator === 'is_not_empty') {
    return sql<SqlBool>`n.created_at IS NOT NULL`;
  }

  // typeof null !== 'string', so this also rejects null values.
  if (typeof filter.value !== 'string') {
    return null;
  }

  const parsed = new Date(filter.value);
  if (Number.isNaN(parsed.getTime())) {
    return null;
  }

  // First 10 chars of an ISO timestamp are the YYYY-MM-DD date part.
  const day = parsed.toISOString().slice(0, 10);

  const comparisons: Record<string, string> = {
    is_equal_to: '=',
    is_not_equal_to: '!=',
    is_on_or_after: '>=',
    is_on_or_before: '<=',
    is_after: '>',
    is_before: '<',
  };

  const comparison = comparisons[filter.operator];
  if (comparison === undefined) {
    return null;
  }

  // Operator comes from the fixed table above, so sql.raw is safe; the
  // date string is bound as a parameter.
  return sql<SqlBool>`DATE(n.created_at) ${sql.raw(comparison)} ${day}`;
};
/**
 * Builds a raw ORDER BY clause body from the view's sort attributes.
 * Sorts on unknown fields are skipped.
 *
 * @param sorts - view sort attributes to translate
 * @param fields - field definitions of the database, keyed by field id
 * @returns a comma-separated ORDER BY expression; never empty
 */
export const buildSortOrdersQuery = (
  sorts: DatabaseViewSortAttributes[],
  fields: Record<string, FieldAttributes>
): string => {
  const orders = sorts
    .map((sort) => buildSortOrderQuery(sort, fields))
    .filter((query): query is string => query !== null);

  // If every sort referenced an unknown field, fall back to a stable
  // default: the caller feeds this string to sql.raw inside ORDER BY, and
  // an empty string would produce invalid SQL ("ORDER BY ").
  if (orders.length === 0) {
    return 'n.id ASC';
  }

  return orders.join(', ');
};
/**
 * Builds a single ORDER BY term for one view sort attribute.
 * Built-in created_at/created_by fields sort on node columns; every other
 * field type sorts on the JSONB field value.
 *
 * @param sort - the sort attribute (field id + direction)
 * @param fields - field definitions of the database, keyed by field id
 * @returns the ORDER BY term, or null when the field is unknown
 */
export const buildSortOrderQuery = (
  sort: DatabaseViewSortAttributes,
  fields: Record<string, FieldAttributes>
): string | null => {
  const field = fields[sort.fieldId];
  if (!field) {
    return null;
  }

  if (field.type === 'created_at') {
    return `n.created_at ${sort.direction}`;
  }

  if (field.type === 'created_by') {
    return `n.created_by ${sort.direction}`;
  }

  // Quote the field id in the JSON path: the previous interpolation emitted
  // it bare (n.attributes->'fields'->abc->>'value'), which is not valid SQL
  // once the caller passes this string to sql.raw. This now matches the
  // quoted-key access used by the filter builders.
  // NOTE(review): fieldId/direction come from stored view attributes and are
  // interpolated into raw SQL — assumed validated upstream; confirm they
  // cannot carry user-controlled SQL.
  return `n.attributes->'fields'->'${sort.fieldId}'->>'value' ${sort.direction}`;
};

View File

@@ -10,7 +10,7 @@ import {
import { createDocument } from '@/lib/documents';
import { SelectAccount } from '@/data/schema';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import { eventBus } from '@/lib/event-bus';
import {
generateInitialMessageBlocks,
@@ -55,8 +55,8 @@ export const createWorkspace = async (
name: account.name,
email: account.email,
avatar: account.avatar,
storage_limit: configuration.user.storageLimit.toString(),
max_file_size: configuration.user.maxFileSize.toString(),
storage_limit: config.user.storageLimit.toString(),
max_file_size: config.user.maxFileSize.toString(),
created_at: date,
created_by: account.id,
status: UserStatus.Active,

View File

@@ -1,219 +0,0 @@
import { OpenAIEmbeddings } from '@langchain/openai';
import { Document } from '@langchain/core/documents';
import { sql } from 'kysely';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import { SearchResult } from '@/types/retrieval';
import { RewrittenQuery } from '@/types/llm';
import { combineAndScoreSearchResults } from '@/lib/ai-utils';
export class DocumentRetrievalService {
private embeddings = new OpenAIEmbeddings({
apiKey: configuration.ai.embedding.apiKey,
modelName: configuration.ai.embedding.modelName,
dimensions: configuration.ai.embedding.dimensions,
});
public async retrieve(
rewrittenQuery: RewrittenQuery,
workspaceId: string,
userId: string,
limit = configuration.ai.retrieval.hybridSearch.maxResults,
contextNodeIds?: string[]
): Promise<Document[]> {
const embedding = await this.embeddings.embedQuery(
rewrittenQuery.semanticQuery
);
if (!embedding) {
return [];
}
const [semanticResults, keywordResults] = await Promise.all([
this.semanticSearch(
embedding,
workspaceId,
userId,
limit,
contextNodeIds
),
this.keywordSearch(
rewrittenQuery.keywordQuery,
workspaceId,
userId,
limit,
contextNodeIds
),
]);
return this.combineSearchResults(semanticResults, keywordResults);
}
private async semanticSearch(
embedding: number[],
workspaceId: string,
userId: string,
limit: number,
contextNodeIds?: string[]
): Promise<SearchResult[]> {
let queryBuilder = database
.selectFrom('document_embeddings')
.innerJoin('documents', 'documents.id', 'document_embeddings.document_id')
.innerJoin('nodes', 'nodes.id', 'documents.id')
.innerJoin('collaborations', (join) =>
join
.onRef('collaborations.node_id', '=', 'nodes.root_id')
.on('collaborations.collaborator_id', '=', sql.lit(userId))
.on('collaborations.deleted_at', 'is', null)
)
.select([
'document_embeddings.document_id as id',
'document_embeddings.text',
'document_embeddings.summary',
'documents.created_at',
'documents.created_by',
'document_embeddings.chunk as chunk_index',
sql<number>`${sql.raw(`'[${embedding}]'::vector`)} <=> document_embeddings.embedding_vector`.as(
'similarity'
),
])
.where('document_embeddings.workspace_id', '=', workspaceId);
if (contextNodeIds && contextNodeIds.length > 0) {
queryBuilder = queryBuilder.where(
'document_embeddings.document_id',
'in',
contextNodeIds
);
}
const results = await queryBuilder
.groupBy([
'document_embeddings.document_id',
'document_embeddings.text',
'document_embeddings.summary',
'documents.created_at',
'documents.created_by',
'document_embeddings.chunk',
])
.orderBy('similarity', 'asc')
.limit(limit)
.execute();
return results.map((result) => ({
id: result.id,
text: result.text,
summary: result.summary,
score: result.similarity,
type: 'semantic',
createdAt: result.created_at,
createdBy: result.created_by,
chunkIndex: result.chunk_index,
}));
}
private async keywordSearch(
query: string,
workspaceId: string,
userId: string,
limit: number,
contextNodeIds?: string[]
): Promise<SearchResult[]> {
let queryBuilder = database
.selectFrom('document_embeddings')
.innerJoin('documents', 'documents.id', 'document_embeddings.document_id')
.innerJoin('nodes', 'nodes.id', 'documents.id')
.innerJoin('collaborations', (join) =>
join
.onRef('collaborations.node_id', '=', 'nodes.root_id')
.on('collaborations.collaborator_id', '=', sql.lit(userId))
.on('collaborations.deleted_at', 'is', null)
)
.select([
'document_embeddings.document_id as id',
'document_embeddings.text',
'document_embeddings.summary',
'documents.created_at',
'documents.created_by',
'document_embeddings.chunk as chunk_index',
sql<number>`ts_rank(document_embeddings.search_vector, websearch_to_tsquery('english', ${query}))`.as(
'rank'
),
])
.where('document_embeddings.workspace_id', '=', workspaceId)
.where(
() =>
sql`document_embeddings.search_vector @@ websearch_to_tsquery('english', ${query})`
);
if (contextNodeIds && contextNodeIds.length > 0) {
queryBuilder = queryBuilder.where(
'document_embeddings.document_id',
'in',
contextNodeIds
);
}
const results = await queryBuilder
.groupBy([
'document_embeddings.document_id',
'document_embeddings.text',
'documents.created_at',
'documents.created_by',
'document_embeddings.chunk',
'document_embeddings.summary',
])
.orderBy('rank', 'desc')
.limit(limit)
.execute();
return results.map((result) => ({
id: result.id,
text: result.text,
summary: result.summary,
score: result.rank,
type: 'keyword',
createdAt: result.created_at,
createdBy: result.created_by,
chunkIndex: result.chunk_index,
}));
}
private async combineSearchResults(
semanticResults: SearchResult[],
keywordResults: SearchResult[]
): Promise<Document[]> {
const { semanticSearchWeight, keywordSearchWeight } =
configuration.ai.retrieval.hybridSearch;
const authorIds = Array.from(
new Set(
[...semanticResults, ...keywordResults]
.map((r) => r.createdBy)
.filter((id): id is string => id !== undefined && id !== null)
)
);
const authors =
authorIds.length > 0
? await database
.selectFrom('users')
.select(['id', 'name'])
.where('id', 'in', authorIds)
.execute()
: [];
const authorMap = new Map(authors.map((author) => [author.id, author]));
return combineAndScoreSearchResults(
semanticResults,
keywordResults,
semanticSearchWeight,
keywordSearchWeight,
authorMap
);
}
}
export const documentRetrievalService = new DocumentRetrievalService();

View File

@@ -1,7 +1,7 @@
import nodemailer from 'nodemailer';
import { createDebugger } from '@colanode/core';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
interface EmailMessage {
to: string | string[];
@@ -14,27 +14,22 @@ const debug = createDebugger('server:service:email');
class EmailService {
private transporter: nodemailer.Transporter | undefined;
constructor() {}
private from: string | undefined;
public async init() {
if (
!configuration.smtp.host ||
!configuration.smtp.port ||
!configuration.smtp.from.email ||
!configuration.smtp.from.name
) {
if (!config.smtp.enabled) {
debug('SMTP configuration is not set, skipping initialization');
return;
}
this.from = `${config.smtp.from.name} <${config.smtp.from.email}>`;
this.transporter = nodemailer.createTransport({
host: configuration.smtp.host,
port: configuration.smtp.port,
secure: configuration.smtp.secure,
host: config.smtp.host,
port: config.smtp.port,
secure: config.smtp.secure,
auth: {
user: configuration.smtp.user,
pass: configuration.smtp.password,
user: config.smtp.user,
pass: config.smtp.password,
},
});
@@ -42,13 +37,13 @@ class EmailService {
}
public async sendEmail(message: EmailMessage): Promise<void> {
if (!this.transporter) {
if (!this.transporter || !this.from) {
debug('Email service not initialized, skipping email send');
return;
}
await this.transporter.sendMail({
from: `${configuration.smtp.from.name} <${configuration.smtp.from.email}>`,
from: this.from,
...message,
});
}

View File

@@ -1,7 +1,7 @@
import { Job, JobsOptions, Queue, Worker } from 'bullmq';
import { createDebugger } from '@colanode/core';
import { configuration } from '@/lib/configuration';
import { config } from '@/lib/config';
import { jobHandlerMap } from '@/jobs';
import { JobHandler, JobInput } from '@/types/jobs';
@@ -17,8 +17,8 @@ class JobService {
// for more information, see: https://docs.bullmq.io/bull/patterns/redis-cluster
private readonly queueName = configuration.redis.jobs.name;
private readonly prefix = `{${configuration.redis.jobs.prefix}}`;
private readonly queueName = config.redis.jobs.name;
private readonly prefix = `{${config.redis.jobs.prefix}}`;
public initQueue() {
if (this.jobQueue) {
@@ -28,8 +28,8 @@ class JobService {
this.jobQueue = new Queue(this.queueName, {
prefix: this.prefix,
connection: {
db: configuration.redis.db,
url: configuration.redis.url,
db: config.redis.db,
url: config.redis.url,
},
defaultJobOptions: {
removeOnComplete: true,
@@ -40,7 +40,7 @@ class JobService {
debug(`Job queue error: ${error}`);
});
if (configuration.ai.enabled) {
if (config.ai.enabled) {
this.jobQueue.upsertJobScheduler(
'check_node_embeddings',
{ pattern: '0 */30 * * * *' },
@@ -79,8 +79,8 @@ class JobService {
this.jobWorker = new Worker(this.queueName, this.handleJobJob, {
prefix: this.prefix,
connection: {
url: configuration.redis.url,
db: configuration.redis.db,
url: config.redis.url,
db: config.redis.db,
},
});
}

View File

@@ -1,216 +0,0 @@
import { OpenAIEmbeddings } from '@langchain/openai';
import { Document } from '@langchain/core/documents';
import { sql } from 'kysely';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import { SearchResult } from '@/types/retrieval';
import { RewrittenQuery } from '@/types/llm';
import { combineAndScoreSearchResults } from '@/lib/ai-utils';
export class NodeRetrievalService {
private embeddings = new OpenAIEmbeddings({
apiKey: configuration.ai.embedding.apiKey,
modelName: configuration.ai.embedding.modelName,
dimensions: configuration.ai.embedding.dimensions,
});
public async retrieve(
rewrittenQuery: RewrittenQuery,
workspaceId: string,
userId: string,
limit = configuration.ai.retrieval.hybridSearch.maxResults,
contextNodeIds?: string[]
): Promise<Document[]> {
const embedding = await this.embeddings.embedQuery(
rewrittenQuery.semanticQuery
);
if (!embedding) {
return [];
}
const [semanticResults, keywordResults] = await Promise.all([
this.semanticSearch(
embedding,
workspaceId,
userId,
limit,
contextNodeIds
),
this.keywordSearch(
rewrittenQuery.keywordQuery,
workspaceId,
userId,
limit,
contextNodeIds
),
]);
return this.combineSearchResults(semanticResults, keywordResults);
}
private async semanticSearch(
embedding: number[],
workspaceId: string,
userId: string,
limit: number,
contextNodeIds?: string[]
): Promise<SearchResult[]> {
let queryBuilder = database
.selectFrom('node_embeddings')
.innerJoin('nodes', 'nodes.id', 'node_embeddings.node_id')
.innerJoin('collaborations', (join) =>
join
.onRef('collaborations.node_id', '=', 'nodes.root_id')
.on('collaborations.collaborator_id', '=', sql.lit(userId))
.on('collaborations.deleted_at', 'is', null)
)
.select([
'node_embeddings.node_id as id',
'node_embeddings.text',
'node_embeddings.summary',
'nodes.created_at',
'nodes.created_by',
'node_embeddings.chunk as chunk_index',
sql<number>`${sql.raw(`'[${embedding}]'::vector`)} <=> node_embeddings.embedding_vector`.as(
'similarity'
),
])
.where('node_embeddings.workspace_id', '=', workspaceId);
if (contextNodeIds && contextNodeIds.length > 0) {
queryBuilder = queryBuilder.where(
'node_embeddings.node_id',
'in',
contextNodeIds
);
}
const results = await queryBuilder
.groupBy([
'node_embeddings.node_id',
'node_embeddings.text',
'nodes.created_at',
'nodes.created_by',
'node_embeddings.chunk',
'node_embeddings.summary',
])
.orderBy('similarity', 'asc')
.limit(limit)
.execute();
return results.map((result) => ({
id: result.id,
text: result.text,
summary: result.summary,
score: result.similarity,
type: 'semantic',
createdAt: result.created_at,
createdBy: result.created_by,
chunkIndex: result.chunk_index,
}));
}
private async keywordSearch(
query: string,
workspaceId: string,
userId: string,
limit: number,
contextNodeIds?: string[]
): Promise<SearchResult[]> {
let queryBuilder = database
.selectFrom('node_embeddings')
.innerJoin('nodes', 'nodes.id', 'node_embeddings.node_id')
.innerJoin('collaborations', (join) =>
join
.onRef('collaborations.node_id', '=', 'nodes.root_id')
.on('collaborations.collaborator_id', '=', sql.lit(userId))
.on('collaborations.deleted_at', 'is', null)
)
.select([
'node_embeddings.node_id as id',
'node_embeddings.text',
'node_embeddings.summary',
'nodes.created_at',
'nodes.created_by',
'node_embeddings.chunk as chunk_index',
sql<number>`ts_rank(node_embeddings.search_vector, websearch_to_tsquery('english', ${query}))`.as(
'rank'
),
])
.where('node_embeddings.workspace_id', '=', workspaceId)
.where(
() =>
sql`node_embeddings.search_vector @@ websearch_to_tsquery('english', ${query})`
);
if (contextNodeIds && contextNodeIds.length > 0) {
queryBuilder = queryBuilder.where(
'node_embeddings.node_id',
'in',
contextNodeIds
);
}
const results = await queryBuilder
.groupBy([
'node_embeddings.node_id',
'node_embeddings.text',
'nodes.created_at',
'nodes.created_by',
'node_embeddings.chunk',
'node_embeddings.summary',
])
.orderBy('rank', 'desc')
.limit(limit)
.execute();
return results.map((result) => ({
id: result.id,
text: result.text,
summary: result.summary,
score: result.rank,
type: 'keyword',
createdAt: result.created_at,
createdBy: result.created_by,
chunkIndex: result.chunk_index,
}));
}
private async combineSearchResults(
semanticResults: SearchResult[],
keywordResults: SearchResult[]
): Promise<Document[]> {
const { semanticSearchWeight, keywordSearchWeight } =
configuration.ai.retrieval.hybridSearch;
const authorIds = Array.from(
new Set(
[...semanticResults, ...keywordResults]
.map((r) => r.createdBy)
.filter((id): id is string => id !== undefined && id !== null)
)
);
const authors =
authorIds.length > 0
? await database
.selectFrom('users')
.select(['id', 'name'])
.where('id', 'in', authorIds)
.execute()
: [];
const authorMap = new Map(authors.map((author) => [author.id, author]));
return combineAndScoreSearchResults(
semanticResults,
keywordResults,
semanticSearchWeight,
keywordSearchWeight,
authorMap
);
}
}
export const nodeRetrievalService = new NodeRetrievalService();

View File

@@ -1,514 +0,0 @@
import { sql, Kysely, Expression, SqlBool } from 'kysely';
import {
BooleanFieldAttributes,
CreatedAtFieldAttributes,
DatabaseNode,
DateFieldAttributes,
EmailFieldAttributes,
FieldAttributes,
isStringArray,
NumberFieldAttributes,
PhoneFieldAttributes,
SelectFieldAttributes,
TextFieldAttributes,
UrlFieldAttributes,
DatabaseViewFieldFilterAttributes,
DatabaseViewFilterAttributes,
DatabaseViewSortAttributes,
MultiSelectFieldAttributes,
} from '@colanode/core';
import { database } from '@/data/database';
import { DatabaseSchema } from '@/data/schema';
type FilterInput = {
filters: DatabaseViewFilterAttributes[];
sorts: DatabaseViewSortAttributes[];
page: number;
count: number;
};
type SearchInput = {
searchQuery: string;
exclude?: string[];
};
type TextBasedFieldAttributes =
| TextFieldAttributes
| EmailFieldAttributes
| PhoneFieldAttributes
| UrlFieldAttributes;
export class RecordsRetrievalService {
constructor(private readonly db: Kysely<DatabaseSchema>) {}
public async retrieveByFilters(
databaseId: string,
workspaceId: string,
userId: string,
input: FilterInput
) {
const database = await this.fetchDatabase(databaseId, workspaceId);
const filterQuery = this.buildFiltersQuery(
input.filters,
database.attributes.fields
);
const orderByQuery =
input.sorts.length > 0
? this.buildSortOrdersQuery(input.sorts, database.attributes.fields)
: 'n.id ASC';
const offset = (input.page - 1) * input.count;
const query = this.db
.selectFrom('nodes as n')
.innerJoin('collaborations as c', 'c.node_id', 'n.root_id')
.where('n.parent_id', '=', databaseId)
.where('n.type', '=', 'record')
.where('n.workspace_id', '=', workspaceId)
.where('c.collaborator_id', '=', userId)
.where('c.deleted_at', 'is', null);
if (filterQuery) {
query.where(filterQuery);
}
const result = await query
.orderBy(sql.raw(orderByQuery))
.limit(input.count)
.offset(offset)
.selectAll()
.execute();
return result;
}
public async searchRecords(
databaseId: string,
workspaceId: string,
userId: string,
input: SearchInput
) {
if (!input.searchQuery) {
return this.fetchAllRecords(
databaseId,
workspaceId,
userId,
input.exclude
);
}
const searchCondition = sql<SqlBool>`
to_tsvector('english', n.attributes->>'name') @@ plainto_tsquery('english', ${input.searchQuery})
OR EXISTS (
SELECT 1
FROM jsonb_each_text(n.attributes->'fields') fields
WHERE to_tsvector('english', fields.value::text) @@ plainto_tsquery('english', ${input.searchQuery})
)
`;
const query = this.db
.selectFrom('nodes as n')
.innerJoin('collaborations as c', 'c.node_id', 'n.root_id')
.where('n.parent_id', '=', databaseId)
.where('n.type', '=', 'record')
.where('n.workspace_id', '=', workspaceId)
.where('c.collaborator_id', '=', userId)
.where('c.deleted_at', 'is', null)
.where(searchCondition);
if (input.exclude?.length) {
query.where('n.id', 'not in', input.exclude);
}
return query.selectAll().execute();
}
private async fetchAllRecords(
databaseId: string,
workspaceId: string,
userId: string,
exclude?: string[]
) {
return this.db
.selectFrom('nodes as n')
.innerJoin('collaborations as c', 'c.node_id', 'n.root_id')
.where('n.parent_id', '=', databaseId)
.where('n.type', '=', 'record')
.where('n.workspace_id', '=', workspaceId)
.where('c.collaborator_id', '=', userId)
.where('c.deleted_at', 'is', null)
.$if(!!exclude?.length, (qb) => qb.where('n.id', 'not in', exclude!))
.selectAll()
.execute();
}
private async fetchDatabase(
databaseId: string,
workspaceId: string
): Promise<DatabaseNode> {
const row = await this.db
.selectFrom('nodes')
.where('id', '=', databaseId)
.where('workspace_id', '=', workspaceId)
.where('type', '=', 'database')
.selectAll()
.executeTakeFirst();
if (!row) {
throw new Error('Database not found');
}
return row as unknown as DatabaseNode;
}
private buildFiltersQuery(
filters: DatabaseViewFilterAttributes[],
fields: Record<string, FieldAttributes>
): Expression<SqlBool> | undefined {
if (filters.length === 0) {
return undefined;
}
const filterQueries = filters
.map((filter) => this.buildFilterQuery(filter, fields))
.filter((query): query is Expression<SqlBool> => query !== null);
if (filterQueries.length === 0) {
return undefined;
}
return sql<SqlBool>`(${sql.join(filterQueries, sql` AND `)})`;
}
private buildFilterQuery(
filter: DatabaseViewFilterAttributes,
fields: Record<string, FieldAttributes>
): Expression<SqlBool> | null {
if (filter.type === 'group') {
return null;
}
const field = fields[filter.fieldId];
if (!field) {
return null;
}
switch (field.type) {
case 'boolean':
return this.buildBooleanFilterQuery(filter, field);
case 'created_at':
return this.buildCreatedAtFilterQuery(filter, field);
case 'date':
return this.buildDateFilterQuery(filter, field);
case 'email':
case 'phone':
case 'url':
case 'text':
return this.buildTextFilterQuery(
filter,
field as TextBasedFieldAttributes
);
case 'multi_select':
return this.buildMultiSelectFilterQuery(filter, field);
case 'number':
return this.buildNumberFilterQuery(filter, field);
case 'select':
return this.buildSelectFilterQuery(filter, field);
default:
return null;
}
}
private buildBooleanFilterQuery(
filter: DatabaseViewFieldFilterAttributes,
field: BooleanFieldAttributes
): Expression<SqlBool> | null {
if (filter.operator === 'is_true') {
return sql<SqlBool>`(n.attributes->'fields'->${field.id}->>'value')::boolean = true`;
}
if (filter.operator === 'is_false') {
return sql<SqlBool>`((n.attributes->'fields'->${field.id}->>'value')::boolean = false OR n.attributes->'fields'->${field.id}->>'value' IS NULL)`;
}
return null;
}
private buildNumberFilterQuery(
filter: DatabaseViewFieldFilterAttributes,
field: NumberFieldAttributes
): Expression<SqlBool> | null {
if (filter.operator === 'is_empty') {
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NULL`;
}
if (filter.operator === 'is_not_empty') {
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NOT NULL`;
}
if (filter.value === null || typeof filter.value !== 'number') {
return null;
}
const value = filter.value;
let operator: string;
switch (filter.operator) {
case 'is_equal_to':
operator = '=';
break;
case 'is_not_equal_to':
operator = '!=';
break;
case 'is_greater_than':
operator = '>';
break;
case 'is_less_than':
operator = '<';
break;
case 'is_greater_than_or_equal_to':
operator = '>=';
break;
case 'is_less_than_or_equal_to':
operator = '<=';
break;
default:
return null;
}
return sql<SqlBool>`(n.attributes->'fields'->${field.id}->>'value')::numeric ${sql.raw(operator)} ${value}`;
}
private buildTextFilterQuery(
filter: DatabaseViewFieldFilterAttributes,
field: TextBasedFieldAttributes
): Expression<SqlBool> | null {
if (filter.operator === 'is_empty') {
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NULL`;
}
if (filter.operator === 'is_not_empty') {
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NOT NULL`;
}
if (filter.value === null || typeof filter.value !== 'string') {
return null;
}
const value = filter.value;
switch (filter.operator) {
case 'is_equal_to':
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' = ${value}`;
case 'is_not_equal_to':
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' != ${value}`;
case 'contains':
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' ILIKE ${'%' + value + '%'}`;
case 'does_not_contain':
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' NOT ILIKE ${'%' + value + '%'}`;
case 'starts_with':
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' ILIKE ${value + '%'}`;
case 'ends_with':
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' ILIKE ${'%' + value}`;
default:
return null;
}
}
private buildEmailFilterQuery = this.buildTextFilterQuery;
private buildPhoneFilterQuery = this.buildTextFilterQuery;
private buildUrlFilterQuery = this.buildTextFilterQuery;
private buildSelectFilterQuery(
filter: DatabaseViewFieldFilterAttributes,
field: SelectFieldAttributes
): Expression<SqlBool> | null {
if (filter.operator === 'is_empty') {
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NULL`;
}
if (filter.operator === 'is_not_empty') {
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NOT NULL`;
}
if (!isStringArray(filter.value) || filter.value.length === 0) {
return null;
}
switch (filter.operator) {
case 'is_in':
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IN (${sql.join(filter.value)})`;
case 'is_not_in':
return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' NOT IN (${sql.join(filter.value)})`;
default:
return null;
}
}
/**
 * Builds a SQL predicate for a multi-select field filter.
 *
 * Multi-select values are stored as a JSON array of option ids, so emptiness
 * checks also treat a present-but-empty array as empty, and membership checks
 * unnest the array with jsonb_array_elements_text before comparing.
 * Returns null when the filter is not applicable.
 */
private buildMultiSelectFilterQuery(
  filter: DatabaseViewFieldFilterAttributes,
  field: MultiSelectFieldAttributes
): Expression<SqlBool> | null {
  // "Empty" means either no value at all or an empty JSON array.
  if (filter.operator === 'is_empty') {
    return sql<SqlBool>`(n.attributes->'fields'->${field.id}->>'value' IS NULL OR jsonb_array_length(n.attributes->'fields'->${field.id}->'value') = 0)`;
  }

  if (filter.operator === 'is_not_empty') {
    return sql<SqlBool>`(n.attributes->'fields'->${field.id}->>'value' IS NOT NULL AND jsonb_array_length(n.attributes->'fields'->${field.id}->'value') > 0)`;
  }

  // Membership operators require a non-empty list of option ids.
  const optionIds = filter.value;
  if (!isStringArray(optionIds) || optionIds.length === 0) {
    return null;
  }

  if (filter.operator === 'is_in') {
    return sql<SqlBool>`EXISTS (
      SELECT 1
      FROM jsonb_array_elements_text(n.attributes->'fields'->${field.id}->'value') val
      WHERE val IN (${sql.join(optionIds)})
    )`;
  }

  if (filter.operator === 'is_not_in') {
    return sql<SqlBool>`NOT EXISTS (
      SELECT 1
      FROM jsonb_array_elements_text(n.attributes->'fields'->${field.id}->'value') val
      WHERE val IN (${sql.join(optionIds)})
    )`;
  }

  // Any other operator is not meaningful for multi-select fields.
  return null;
}
/**
 * Builds a SQL predicate for a date field filter.
 *
 * The stored value is parsed as a date and compared on the calendar day
 * only (time component dropped via toISOString, i.e. in UTC). Returns null
 * for unsupported operators or unparseable filter values.
 */
private buildDateFilterQuery(
  filter: DatabaseViewFieldFilterAttributes,
  field: DateFieldAttributes
): Expression<SqlBool> | null {
  if (filter.operator === 'is_empty') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NULL`;
  }

  if (filter.operator === 'is_not_empty') {
    return sql<SqlBool>`n.attributes->'fields'->${field.id}->>'value' IS NOT NULL`;
  }

  // Comparison operators need a parseable date string.
  if (typeof filter.value !== 'string') {
    return null;
  }

  const parsed = new Date(filter.value);
  if (Number.isNaN(parsed.getTime())) {
    return null;
  }

  // Keep only the YYYY-MM-DD portion for a day-granularity comparison.
  const day = parsed.toISOString().split('T')[0];

  // Map each supported filter operator to its SQL comparison operator.
  const comparators: Record<string, string> = {
    is_equal_to: '=',
    is_not_equal_to: '!=',
    is_on_or_after: '>=',
    is_on_or_before: '<=',
    is_after: '>',
    is_before: '<',
  };

  const comparator = comparators[filter.operator];
  if (!comparator) {
    return null;
  }

  return sql<SqlBool>`DATE(n.attributes->'fields'->${field.id}->>'value') ${sql.raw(comparator)} ${day}`;
}
/**
 * Builds a SQL predicate for the built-in created_at field.
 *
 * Mirrors buildDateFilterQuery but compares against the node's own
 * `created_at` column rather than a JSON attribute value. Comparison is at
 * day granularity (UTC, via toISOString). Returns null for unsupported
 * operators or unparseable filter values.
 */
private buildCreatedAtFilterQuery(
  filter: DatabaseViewFieldFilterAttributes,
  _: CreatedAtFieldAttributes
): Expression<SqlBool> | null {
  if (filter.operator === 'is_empty') {
    return sql<SqlBool>`n.created_at IS NULL`;
  }

  if (filter.operator === 'is_not_empty') {
    return sql<SqlBool>`n.created_at IS NOT NULL`;
  }

  // Comparison operators need a parseable date string.
  if (typeof filter.value !== 'string') {
    return null;
  }

  const parsed = new Date(filter.value);
  if (Number.isNaN(parsed.getTime())) {
    return null;
  }

  // Keep only the YYYY-MM-DD portion for a day-granularity comparison.
  const day = parsed.toISOString().split('T')[0];

  // Map each supported filter operator to its SQL comparison operator.
  const comparators: Record<string, string> = {
    is_equal_to: '=',
    is_not_equal_to: '!=',
    is_on_or_after: '>=',
    is_on_or_before: '<=',
    is_after: '>',
    is_before: '<',
  };

  const comparator = comparators[filter.operator];
  if (!comparator) {
    return null;
  }

  return sql<SqlBool>`DATE(n.created_at) ${sql.raw(comparator)} ${day}`;
}
/**
 * Builds the full ORDER BY clause body for a view's sort configuration.
 *
 * Sorts that cannot be resolved (unknown field id) are skipped; the
 * remaining clauses are joined with ", ".
 */
private buildSortOrdersQuery(
  sorts: DatabaseViewSortAttributes[],
  fields: Record<string, FieldAttributes>
): string {
  const clauses: string[] = [];
  for (const sort of sorts) {
    const clause = this.buildSortOrderQuery(sort, fields);
    if (clause !== null) {
      clauses.push(clause);
    }
  }
  return clauses.join(', ');
}
/**
 * Builds a single ORDER BY clause for one sort entry, or null when the
 * referenced field does not exist in the view's field map.
 *
 * Built-in created_at/created_by fields sort on their dedicated columns;
 * every other field type sorts on its JSON attribute value.
 */
private buildSortOrderQuery(
  sort: DatabaseViewSortAttributes,
  fields: Record<string, FieldAttributes>
): string | null {
  const field = fields[sort.fieldId];
  if (!field) {
    return null;
  }

  switch (field.type) {
    case 'created_at':
      return `n.created_at ${sort.direction}`;
    case 'created_by':
      return `n.created_by ${sort.direction}`;
    default:
      // NOTE(review): unlike the filter builders above (which bind
      // ${field.id} as a parameter), this interpolates sort.fieldId unquoted
      // into the JSON path and sort.direction raw into a plain string —
      // confirm both values are validated/quoted before this clause reaches
      // the query, since Postgres `->` expects a quoted text key.
      return `n.attributes->'fields'->${sort.fieldId}->>'value' ${sort.direction}`;
  }
}
}
export const recordsRetrievalService = new RecordsRetrievalService(database);

View File

@@ -151,18 +151,28 @@ services:
# ───────────────────────────────────────────────────────────────
# SMTP configuration
# ---------------------------------------------------------------
# If you don't want to use SMTP, set:
# SMTP_ENABLED: 'false'
# ---------------------------------------------------------------
# If using the local Mailpit service (defined above), use:
# SMTP_ENABLED: 'true'
# SMTP_HOST: 'smtp'
# SMTP_PORT: '1025'
# SMTP_USER: ''
# SMTP_PASSWORD: ''
#
# SMTP_EMAIL_FROM: 'your_email@example.com'
# SMTP_EMAIL_FROM_NAME: 'Colanode'
# ---------------------------------------------------------------
# If using a real SMTP provider, update these:
# SMTP_ENABLED: 'true'
# SMTP_HOST: 'your_smtp_provider_host'
# SMTP_PORT: '587' # Or 465, etc.
# SMTP_USER: 'your_smtp_username'
# SMTP_PASSWORD: 'your_smtp_password'
# SMTP_EMAIL_FROM: 'your_email@example.com'
# SMTP_EMAIL_FROM_NAME: 'Colanode'
# ---------------------------------------------------------------
SMTP_ENABLED: 'true'
SMTP_HOST: 'smtp' # Defaulting to Mailpit service name
SMTP_PORT: '1025' # Defaulting to Mailpit port
SMTP_USER: ''
@@ -173,7 +183,7 @@ services:
# ───────────────────────────────────────────────────────────────
# AI Configuration
# ───────────────────────────────────────────────────────────────
AI_ENABLED: 'true'
AI_ENABLED: 'false'
AI_NODE_EMBEDDING_DELAY: '5000'
AI_DOCUMENT_EMBEDDING_DELAY: '10000'