Add recurring jobs to check node and document embeddings

Hakan Shehu
2025-03-03 23:07:50 +01:00
parent e83b60b513
commit d095dbbd16
18 changed files with 358 additions and 88 deletions

View File

@@ -6,6 +6,7 @@ export const createNodeEmbeddingsTable: Migration = {
.createTable('node_embeddings')
.addColumn('node_id', 'varchar(30)', (col) => col.notNull())
.addColumn('chunk', 'integer', (col) => col.notNull())
.addColumn('revision', 'bigint', (col) => col.notNull())
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
.addColumn('text', 'text', (col) => col.notNull())
.addColumn('summary', 'text')

View File

@@ -6,6 +6,7 @@ export const createDocumentEmbeddingsTable: Migration = {
.createTable('document_embeddings')
.addColumn('document_id', 'varchar(30)', (col) => col.notNull())
.addColumn('chunk', 'integer', (col) => col.notNull())
.addColumn('revision', 'bigint', (col) => col.notNull())
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
.addColumn('text', 'text', (col) => col.notNull())
.addColumn('summary', 'text')
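
Both hunks edit the original create-table migrations, which only covers fresh databases; an instance that already ran them would need a follow-up migration to add the column. A minimal sketch, assuming the project's Migration type matches Kysely's (the migration name is hypothetical, not part of this commit):

import { Migration, sql } from 'kysely';

export const addRevisionToEmbeddingTables: Migration = {
  up: async (db) => {
    for (const table of ['node_embeddings', 'document_embeddings'] as const) {
      await db.schema
        .alterTable(table)
        // defaultTo(0) marks existing rows as stale, so the recurring
        // check jobs below will re-schedule their embeddings.
        .addColumn('revision', 'bigint', (col) => col.notNull().defaultTo(sql`0`))
        .execute();
    }
  },
  down: async (db) => {
    for (const table of ['node_embeddings', 'document_embeddings'] as const) {
      await db.schema.alterTable(table).dropColumn('revision').execute();
    }
  },
};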

View File

@@ -261,6 +261,7 @@ export type UpdateUpload = Updateable<UploadTable>;
interface NodeEmbeddingTable {
node_id: ColumnType<string, string, never>;
chunk: ColumnType<number, number, number>;
revision: ColumnType<bigint, bigint, bigint>;
workspace_id: ColumnType<string, string, never>;
text: ColumnType<string, string, string>;
summary: ColumnType<string | null, string | null, string | null>;
@@ -277,6 +278,7 @@ export type UpdateNodeEmbedding = Updateable<NodeEmbeddingTable>;
interface DocumentEmbeddingTable {
document_id: ColumnType<string, string, never>;
chunk: ColumnType<number, number, number>;
revision: ColumnType<bigint, bigint, bigint>;
workspace_id: ColumnType<string, string, never>;
text: ColumnType<string, string, string>;
summary: ColumnType<string | null, string | null, string | null>;
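
For reference, Kysely's ColumnType<Select, Insert, Update> controls what each operation sees, so the new column behaves as follows (a sketch using Kysely's standard helpers):

import { Insertable, Selectable, Updateable } from 'kysely';

type NodeEmbeddingRow = Selectable<NodeEmbeddingTable>;    // revision: bigint
type NewNodeEmbedding = Insertable<NodeEmbeddingTable>;    // revision required on insert
type NodeEmbeddingUpdate = Updateable<NodeEmbeddingTable>; // revision?: bigint; node_id remains never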

View File

@@ -0,0 +1,84 @@
import { extractDocumentText } from '@colanode/core';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import {
fetchEmbeddingCursor,
scheduleDocumentEmbedding,
updateEmbeddingCursor,
} from '@/lib/embeddings';
const BATCH_SIZE = 100;
export type CheckDocumentEmbeddingsInput = {
type: 'check_document_embeddings';
};
declare module '@/types/jobs' {
interface JobMap {
check_document_embeddings: {
input: CheckDocumentEmbeddingsInput;
};
}
}
export const checkDocumentEmbeddingsHandler = async () => {
if (!configuration.ai.enabled) {
return;
}
const cursor = await fetchEmbeddingCursor('document_embeddings');
let hasMore = true;
let lastRevision = cursor;
while (hasMore) {
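// Sweep documents in revision order starting from the persisted cursor.
// '>=' re-reads the batch-boundary row on the next iteration; the
// per-document revision check below makes re-scheduling idempotent.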
const documents = await database
.selectFrom('documents')
.selectAll()
.where('revision', '>=', lastRevision)
.orderBy('revision', 'asc')
.limit(BATCH_SIZE)
.execute();
if (documents.length === 0) {
hasMore = false;
continue;
}
for (const document of documents) {
const text = extractDocumentText(document.id, document.content);
if (!text || text.trim() === '') {
await database
.deleteFrom('document_embeddings')
.where('document_id', '=', document.id)
.execute();
continue;
}
const firstEmbedding = await database
.selectFrom('document_embeddings')
.select(['revision'])
.where('document_id', '=', document.id)
.orderBy('created_at', 'asc')
.executeTakeFirst();
const revision = firstEmbedding?.revision ?? 0n;
if (revision >= document.revision) {
continue;
}
await scheduleDocumentEmbedding(document.id);
}
if (documents.length > 0) {
lastRevision = documents[documents.length - 1]?.revision ?? 0n;
}
if (documents.length < BATCH_SIZE) {
hasMore = false;
}
}
await updateEmbeddingCursor('document_embeddings', lastRevision);
};

View File

@@ -0,0 +1,97 @@
import { getNodeModel } from '@colanode/core';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import {
fetchEmbeddingCursor,
scheduleNodeEmbedding,
updateEmbeddingCursor,
} from '@/lib/embeddings';
const BATCH_SIZE = 100;
export type CheckNodeEmbeddingsInput = {
type: 'check_node_embeddings';
};
declare module '@/types/jobs' {
interface JobMap {
check_node_embeddings: {
input: CheckNodeEmbeddingsInput;
};
}
}
export const checkNodeEmbeddingsHandler = async () => {
if (!configuration.ai.enabled) {
return;
}
const cursor = await fetchEmbeddingCursor('node_embeddings');
let hasMore = true;
let lastRevision = cursor;
while (hasMore) {
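// Same resumable sweep as the document checker: revision order from the
// persisted cursor, with a per-node revision check making retries idempotent.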
const nodes = await database
.selectFrom('nodes')
.selectAll()
.where('revision', '>=', lastRevision)
.orderBy('revision', 'asc')
.limit(BATCH_SIZE)
.execute();
if (nodes.length === 0) {
hasMore = false;
continue;
}
for (const node of nodes) {
const nodeModel = getNodeModel(node.attributes.type);
if (!nodeModel) {
continue;
}
const nodeText = nodeModel.extractNodeText(node.id, node.attributes);
if (!nodeText) {
continue;
}
if (!nodeText.attributes || nodeText.attributes.trim() === '') {
await database
.deleteFrom('node_embeddings')
.where('node_id', '=', node.id)
.execute();
continue;
}
const firstEmbedding = await database
.selectFrom('node_embeddings')
.select(['revision'])
.where('node_id', '=', node.id)
.orderBy('created_at', 'asc')
.executeTakeFirst();
const revision = firstEmbedding?.revision ?? 0n;
if (revision >= node.revision) {
continue;
}
await scheduleNodeEmbedding(node);
}
if (nodes.length > 0) {
lastRevision = nodes[nodes.length - 1]?.revision ?? 0n;
}
if (nodes.length < BATCH_SIZE) {
hasMore = false;
}
}
await updateEmbeddingCursor('node_embeddings', lastRevision);
};

View File

@@ -32,7 +32,7 @@ export const embedDocumentHandler = async (input: {
const { documentId } = input;
const document = await database
.selectFrom('documents')
.select(['id', 'content', 'workspace_id', 'created_at'])
.select(['id', 'content', 'workspace_id', 'created_at', 'revision'])
.where('id', '=', documentId)
.executeTakeFirst();
@@ -68,10 +68,17 @@ export const embedDocumentHandler = async (input: {
const existingEmbeddings = await database
.selectFrom('document_embeddings')
.select(['chunk', 'text', 'summary'])
.select(['chunk', 'revision', 'text', 'summary'])
.where('document_id', '=', documentId)
.execute();
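// All chunks of a document are upserted with the same revision, so the
// first row is representative of the whole set.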
const revision =
existingEmbeddings.length > 0 ? existingEmbeddings[0]!.revision : 0n;
if (revision >= document.revision) {
return;
}
const textChunks = await chunkText(
text,
existingEmbeddings.map((e) => ({
@@ -96,6 +103,7 @@ export const embedDocumentHandler = async (input: {
embeddingsToUpsert.push({
document_id: documentId,
chunk: i,
revision: document.revision,
workspace_id: document.workspace_id,
text: chunk.text,
summary: chunk.summary,
@@ -131,6 +139,7 @@ export const embedDocumentHandler = async (input: {
embeddingsToUpsert.map((embedding) => ({
document_id: embedding.document_id,
chunk: embedding.chunk,
revision: embedding.revision,
workspace_id: embedding.workspace_id,
text: embedding.text,
summary: embedding.summary,

View File

@@ -67,12 +67,20 @@ export const embedNodeHandler = async (input: {
const existingEmbeddings = await database
.selectFrom('node_embeddings')
.select(['chunk', 'text', 'summary'])
.select(['chunk', 'revision', 'text', 'summary'])
.where('node_id', '=', nodeId)
.execute();
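// As with documents, all chunks of a node share one revision; the first
// row is representative.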
const revision =
existingEmbeddings.length > 0 ? existingEmbeddings[0]!.revision : 0n;
if (revision >= node.revision) {
return;
}
const fullText =
`${nodeText.name ?? ''}\n\n${nodeText.attributes ?? ''}`.trim();
const textChunks = await chunkText(
fullText,
existingEmbeddings.map((e) => ({
@@ -97,6 +105,7 @@ export const embedNodeHandler = async (input: {
embeddingsToUpsert.push({
node_id: nodeId,
chunk: i,
revision: node.revision,
workspace_id: node.workspace_id,
text: chunk.text,
summary: chunk.summary,
@@ -132,6 +141,7 @@ export const embedNodeHandler = async (input: {
embeddingsToUpsert.map((embedding) => ({
node_id: embedding.node_id,
chunk: embedding.chunk,
revision: embedding.revision,
workspace_id: embedding.workspace_id,
text: embedding.text,
summary: embedding.summary,

View File

@@ -2,9 +2,12 @@ import { cleanNodeDataHandler } from '@/jobs/clean-node-data';
import { cleanWorkspaceDataHandler } from '@/jobs/clean-workspace-data';
import { JobHandler, JobMap } from '@/types/jobs';
import { sendEmailVerifyEmailHandler } from '@/jobs/send-email-verify-email';
import { embedNodeHandler } from './embed-node';
import { embedDocumentHandler } from './embed-document';
import { assistantResponseHandler } from './assistant-response';
import { embedNodeHandler } from '@/jobs/embed-node';
import { embedDocumentHandler } from '@/jobs/embed-document';
import { assistantResponseHandler } from '@/jobs/assistant-response';
import { checkNodeEmbeddingsHandler } from '@/jobs/check-node-embeddings';
import { checkDocumentEmbeddingsHandler } from '@/jobs/check-document-embeddings';
type JobHandlerMap = {
[K in keyof JobMap]: JobHandler<JobMap[K]['input']>;
};
@@ -16,4 +19,6 @@ export const jobHandlerMap: JobHandlerMap = {
embed_node: embedNodeHandler,
embed_document: embedDocumentHandler,
assistant_response: assistantResponseHandler,
check_node_embeddings: checkNodeEmbeddingsHandler,
check_document_embeddings: checkDocumentEmbeddingsHandler,
};

View File

@@ -1,4 +1,5 @@
import { Document } from '@langchain/core/documents';
import { SearchResult } from '@/types/retrieval';
import { RerankedContextItem } from '@/types/assistant';
import { NodeMetadata, DocumentMetadata } from '@/types/metadata';
@@ -151,7 +152,7 @@ const processSearchResult = (
) => {
const key = createKey(result);
const recencyBoost = calculateRecencyBoost(result.createdAt);
let normalizedScore = isKeyword
const normalizedScore = isKeyword
? (result.score / maxScore) * weight
: ((maxScore - result.score) / maxScore) * weight;

View File

@@ -1,8 +1,14 @@
import { StateGraph } from '@langchain/langgraph';
import { Document } from '@langchain/core/documents';
import { CallbackHandler } from 'langfuse-langchain';
import {
DatabaseAttributes,
getNodeModel,
RecordAttributes,
} from '@colanode/core';
import { database } from '@/data/database';
import { configuration } from '@/lib/configuration';
import { CallbackHandler } from 'langfuse-langchain';
import { fetchNode, fetchNodeDescendants } from '@/lib/nodes';
import {
rewriteQuery,
@@ -15,11 +21,6 @@ import {
import { nodeRetrievalService } from '@/services/node-retrieval-service';
import { documentRetrievalService } from '@/services/document-retrieval-service';
import { recordsRetrievalService } from '@/services/records-retrieval-service';
import {
DatabaseAttributes,
getNodeModel,
RecordAttributes,
} from '@colanode/core';
import {
AssistantChainState,
ResponseState,

View File

@@ -1,6 +1,7 @@
import { RecursiveCharacterTextSplitter } from 'langchain/text_splitter';
import { configuration } from '@/lib/configuration';
import type { NodeType } from '@colanode/core';
import { configuration } from '@/lib/configuration';
import { enrichChunk } from '@/services/llm-service';
import { TextChunk } from '@/types/chunking';
@@ -21,24 +22,25 @@ export const chunkText = async (
.map((doc) => ({ text: doc.pageContent }))
.filter((c) => c.text.trim().length > 5);
if (configuration.ai.chunking.enhanceWithContext) {
const enrichedChunks: TextChunk[] = [];
for (const chunk of chunks) {
const existingChunk = existingChunks.find((ec) => ec.text === chunk.text);
if (existingChunk?.summary) {
enrichedChunks.push({
text: chunk.text,
summary: existingChunk.summary,
});
continue;
}
const summary = await enrichChunk(chunk.text, text, nodeType);
enrichedChunks.push({ text: chunk.text, summary });
}
return enrichedChunks;
if (!configuration.ai.chunking.enhanceWithContext) {
return chunks;
}
return chunks;
const enrichedChunks: TextChunk[] = [];
for (const chunk of chunks) {
const existingChunk = existingChunks.find((ec) => ec.text === chunk.text);
if (existingChunk?.summary) {
enrichedChunks.push({
text: chunk.text,
summary: existingChunk.summary,
});
continue;
}
const summary = await enrichChunk(chunk.text, text, nodeType);
enrichedChunks.push({ text: chunk.text, summary });
}
return enrichedChunks;
};
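
A hedged usage sketch of the refactored chunkText, mirroring the embed handlers above (the 'document' node-type argument is illustrative; the real values are NodeType members from @colanode/core):

const chunks = await chunkText(
  text,
  existingEmbeddings.map((e) => ({ text: e.text, summary: e.summary })),
  'document'
);
// Chunks whose text matches an existing chunk keep the cached summary;
// only new or changed chunks are sent through enrichChunk.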

View File

@@ -15,8 +15,7 @@ import { ConcurrentUpdateResult, UpdateDocumentOutput } from '@/types/nodes';
import { eventBus } from '@/lib/event-bus';
import { fetchNode, fetchNodeTree, mapNode } from '@/lib/nodes';
import { CreateDocumentInput, CreateDocumentOutput } from '@/types/documents';
import { jobService } from '@/services/job-service';
import { configuration } from '@/lib/configuration';
import { scheduleDocumentEmbedding } from '@/lib/embeddings';
const debug = createDebugger('server:lib:documents');
@@ -299,16 +298,3 @@ const tryUpdateDocumentFromMutation = async (
return { type: 'retry', output: null };
}
};
const scheduleDocumentEmbedding = async (documentId: string) => {
await jobService.addJob(
{
type: 'embed_document',
documentId,
},
{
jobId: `embed_document:${documentId}`,
delay: configuration.ai.documentEmbeddingDelay,
}
);
};

View File

@@ -0,0 +1,67 @@
import { MessageAttributes } from '@colanode/core';
import { redis } from '@/data/redis';
import { SelectNode } from '@/data/schema';
import { configuration } from '@/lib/configuration';
import { jobService } from '@/services/job-service';
export const fetchEmbeddingCursor = async (
cursorId: string
): Promise<bigint> => {
const cursorStringValue = await redis.get(`embedding_cursor:${cursorId}`);
if (!cursorStringValue) {
return 0n;
}
return BigInt(cursorStringValue);
};
export const updateEmbeddingCursor = async (
cursorId: string,
value: bigint
) => {
await redis.set(`embedding_cursor:${cursorId}`, value.toString());
};
export const deleteEmbeddingCursor = async (cursorId: string) => {
await redis.del(`embedding_cursor:${cursorId}`);
};
export const scheduleNodeEmbedding = async (node: SelectNode) => {
if (node.type === 'message') {
const attributes = node.attributes as MessageAttributes;
if (attributes.subtype === 'question' || attributes.subtype === 'answer') {
return;
}
}
const jobOptions: { jobId: string; delay?: number } = {
jobId: `embed_node:${node.id}`,
};
// Only add delay for non-message nodes
if (node.type !== 'message') {
jobOptions.delay = configuration.ai.nodeEmbeddingDelay;
}
await jobService.addJob(
{
type: 'embed_node',
nodeId: node.id,
},
jobOptions
);
};
export const scheduleDocumentEmbedding = async (documentId: string) => {
await jobService.addJob(
{
type: 'embed_document',
documentId,
},
{
jobId: `embed_document:${documentId}`,
delay: configuration.ai.documentEmbeddingDelay,
}
);
};
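
For context, the cursor is just a stringified bigint in Redis; a minimal round-trip with the helpers above:

const before = await fetchEmbeddingCursor('node_embeddings'); // 0n on first run
await updateEmbeddingCursor('node_embeddings', 42n);          // stored as the string '42'
const after = await fetchEmbeddingCursor('node_embeddings');  // BigInt('42') === 42n
await deleteEmbeddingCursor('node_embeddings');               // next sweep restarts at 0n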

View File

@@ -2,7 +2,9 @@ import {
getNodeModel,
FieldAttributes,
RecordAttributes,
DatabaseAttributes,
} from '@colanode/core';
import { database } from '@/data/database';
import {
NodeMetadata,
@@ -14,7 +16,6 @@ import {
DatabaseInfo,
DatabaseFieldInfo,
} from '@/types/metadata';
import { DatabaseAttributes } from '@colanode/core';
const fetchBaseMetadata = async (
id: string,

View File

@@ -39,7 +39,7 @@ import {
} from '@/lib/collaborations';
import { jobService } from '@/services/job-service';
import { deleteFile } from '@/lib/files';
import { configuration } from '@/lib/configuration';
import { scheduleNodeEmbedding } from '@/lib/embeddings';
const debug = createDebugger('server:lib:nodes');
@@ -803,29 +803,3 @@ export const deleteNode = async (
node: deletedNode,
};
};
const scheduleNodeEmbedding = async (node: SelectNode) => {
if (node.type === 'message') {
const attributes = node.attributes as MessageAttributes;
if (attributes.subtype === 'question' || attributes.subtype === 'answer') {
return;
}
}
const jobOptions: { jobId: string; delay?: number } = {
jobId: `embed_node:${node.id}`,
};
// Only add delay for non-message nodes
if (node.type !== 'message') {
jobOptions.delay = configuration.ai.nodeEmbeddingDelay;
}
await jobService.addJob(
{
type: 'embed_node',
nodeId: node.id,
},
jobOptions
);
};

View File

@@ -39,6 +39,34 @@ class JobService {
this.jobQueue.on('error', (error) => {
debug(`Job queue error: ${error}`);
});
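// Recurring sweeps via BullMQ job schedulers. The six-field cron pattern
// includes seconds, so '0 */30 * * * *' fires at second 0 every 30 minutes.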
this.jobQueue.upsertJobScheduler(
'check_node_embeddings',
{ pattern: '0 */30 * * * *' },
{
name: 'check_node_embeddings',
data: { type: 'check_node_embeddings' } as JobInput,
opts: {
backoff: 3,
attempts: 5,
removeOnFail: 1000,
},
}
);
this.jobQueue.upsertJobScheduler(
'check_document_embeddings',
{ pattern: '0 */30 * * * *' },
{
name: 'check_document_embeddings',
data: { type: 'check_document_embeddings' } as JobInput,
opts: {
backoff: 3,
attempts: 5,
removeOnFail: 1000,
},
}
);
}
public async initWorker() {
@@ -67,6 +95,8 @@ class JobService {
const input = job.data as JobInput;
const handler = jobHandlerMap[input.type] as JobHandler<typeof input>;
await handler(input);
debug(`Job ${job.id} with type ${input.type} completed.`);
};
}

View File

@@ -2,8 +2,7 @@ import { ChatOpenAI } from '@langchain/openai';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { Document } from '@langchain/core/documents';
import { CallbackHandler } from 'langfuse-langchain';
import { SystemMessage } from '@langchain/core/messages';
import { NodeType, RecordNode } from '@colanode/core';
import { configuration } from '@/lib/configuration';
import {
@@ -26,15 +25,6 @@ import {
databaseFilterPrompt,
chunkSummarizationPrompt,
} from '@/lib/llm-prompts';
import { NodeType } from '@colanode/core';
const langfuseCallback = configuration.ai.langfuse.enabled
? new CallbackHandler({
publicKey: configuration.ai.langfuse.publicKey,
secretKey: configuration.ai.langfuse.secretKey,
baseUrl: configuration.ai.langfuse.baseUrl,
})
: undefined;
const getChatModel = (
task: keyof typeof configuration.ai.models
@@ -158,7 +148,7 @@ export const generateDatabaseFilters = async (args: {
id: string;
name: string;
fields: Record<string, { type: string; name: string }>;
sampleRecords: any[];
sampleRecords: RecordNode[];
}>;
}): Promise<DatabaseFilterResult> => {
const task = 'databaseFilter';

View File

@@ -90,9 +90,18 @@ export const recordModel: NodeModel = {
throw new Error('Invalid node type');
}
const texts: string[] = [];
for (const field of Object.values(attributes.fields)) {
if (field.type === 'text' || field.type === 'string') {
texts.push(field.value);
}
}
return {
name: attributes.name,
attributes: null,
attributes: texts.join('\n'),
};
},
};
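
To illustrate the extractNodeText change with a hypothetical record: given fields { title: { type: 'text', value: 'hello' }, status: { type: 'string', value: 'open' }, count: { type: 'number', value: 3 } }, the method previously returned attributes: null and now returns attributes: 'hello\nopen', making record field text available for embedding.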