switched back to ai-sdk only because of mastra's incompatibility with zod v4

This commit is contained in:
Ylber Gashi
2025-08-27 22:06:50 +02:00
parent 03c3523206
commit 58efe1931c
28 changed files with 1356 additions and 3180 deletions

View File

@@ -19,6 +19,7 @@
},
"description": "",
"devDependencies": {
"@types/ms": "^2.1.0",
"@types/node": "^24.2.0",
"@types/nodemailer": "^6.4.17",
"@types/pg": "^8.15.5",
@@ -28,22 +29,20 @@
"tsx": "^4.20.3"
},
"dependencies": {
"@ai-sdk/google": "^1.2.22",
"@ai-sdk/openai": "^1.3.22",
"@ai-sdk/google": "^2.0.6",
"@ai-sdk/openai": "^2.0.15",
"@aws-sdk/client-s3": "^3.863.0",
"@colanode/core": "*",
"@colanode/crdt": "*",
"@fastify/cors": "^11.0.1",
"@fastify/websocket": "^11.1.0",
"@mastra/core": "latest",
"@mastra/memory": "latest",
"@mastra/pg": "^0.13.1",
"@mastra/rag": "^1.0.6",
"@node-rs/argon2": "^2.0.2",
"ai": "^4.3.19",
"@opentelemetry/auto-instrumentations-node": "^0.62.1",
"@opentelemetry/sdk-node": "^0.203.0",
"@redis/client": "^5.8.0",
"@tus/s3-store": "^2.0.0",
"@tus/server": "^2.3.0",
"ai": "^5.0.15",
"bullmq": "^5.56.9",
"diff": "^8.0.2",
"dotenv": "^17.2.1",
@@ -53,8 +52,7 @@
"js-sha256": "^0.11.0",
"ky": "^1.8.2",
"kysely": "^0.28.4",
"langchain": "^0.3.30",
"langfuse-langchain": "^3.38.4",
"langfuse-vercel": "^3.38.4",
"ms": "^2.1.3",
"nodemailer": "^7.0.5",
"pg": "^8.16.3",

View File

@@ -6,6 +6,8 @@ import { initRedis } from '@colanode/server/data/redis';
import { eventBus } from '@colanode/server/lib/event-bus';
import { emailService } from '@colanode/server/services/email-service';
import { jobService } from '@colanode/server/services/job-service';
import { initObservability } from '@colanode/server/lib/observability/otel';
import { initAssistantTrigger } from '@colanode/server/services/assistant-trigger';
dotenv.config({
quiet: true,
@@ -22,6 +24,11 @@ const init = async () => {
await eventBus.init();
await emailService.init();
// Subscribe after event bus init and job queue ready
initAssistantTrigger();
initObservability();
};
init();

View File

@@ -5,12 +5,10 @@ import {
getNodeModel,
MessageAttributes,
} from '@colanode/core';
import { Mastra } from '@mastra/core';
import { RuntimeContext } from '@mastra/core/runtime-context';
import { database } from '@colanode/server/data/database';
import { SelectNode } from '@colanode/server/data/schema';
import { JobHandler } from '@colanode/server/jobs';
import { assistantWorkflow } from '@colanode/server/lib/ai/ai-workflow';
import { runAssistantWorkflow } from '@colanode/server/lib/ai/ai-workflow';
import {
AssistantWorkflowInput,
AssistantWorkflowOutput,
@@ -91,10 +89,10 @@ export const assistantRespondHandler: JobHandler<
console.log(`🚀 Processing AI assistant request for message: ${messageId}`);
const startTime = Date.now();
// Prepare request for the AI service
const assistantRequest: AssistantWorkflowInput = {
userInput: messageText,
workspaceId,
workspaceName: workspace.name || workspaceId,
userId: user.id,
userDetails: {
name: user.name || 'User',
@@ -105,54 +103,19 @@ export const assistantRespondHandler: JobHandler<
selectedContextNodeIds,
};
// Prepare runtime context
const runtimeContext = new RuntimeContext();
runtimeContext.set('workspaceName', workspace.name || workspaceId);
runtimeContext.set('userName', user.name || 'User');
runtimeContext.set('userEmail', user.email || '');
runtimeContext.set('workspaceId', workspaceId);
runtimeContext.set('userId', user.id);
runtimeContext.set('selectedContextNodeIds', selectedContextNodeIds || []);
runtimeContext.set('userInput', messageText);
// Initialize Mastra and get the workflow
const mastra = new Mastra({
workflows: {
assistantWorkflow,
},
});
const workflow = mastra.getWorkflow('assistantWorkflow');
const run = await workflow.createRunAsync();
// Execute the workflow
const result = await run.start({
inputData: assistantRequest,
runtimeContext,
});
if (result.status !== 'success' || !result.result) {
const errorMessage =
result.status === 'suspended'
? 'Workflow was suspended unexpectedly'
: (result as any).error || 'Workflow execution failed';
console.error('❌ Workflow failed:', errorMessage);
throw new Error(errorMessage);
}
const assistantResult: AssistantWorkflowOutput = {
...result.result,
processingTimeMs: Date.now() - startTime,
};
const result: AssistantWorkflowOutput =
await runAssistantWorkflow(assistantRequest);
result.processingTimeMs = Date.now() - startTime;
console.log(
`✅ AI response generated (${assistantResult.processingTimeMs}ms): ${
assistantResult.searchPerformed ? 'with search' : 'no search'
`✅ AI response generated (${result.processingTimeMs}ms): ${
result.searchPerformed ? 'with search' : 'no search'
}`
);
await createAndPublishResponse(
assistantResult.finalAnswer,
assistantResult.citations,
result.finalAnswer,
result.citations,
message,
workspaceId
);

View File

@@ -25,6 +25,7 @@ declare module '@colanode/server/jobs' {
export const documentEmbedScanHandler: JobHandler<
DocumentEmbedScanInput
> = async () => {
console.log('document embed scan job');
if (!config.ai.enabled) {
return;
}
@@ -55,7 +56,7 @@ export const documentEmbedScanHandler: JobHandler<
.where('document_id', '=', document.id)
.execute();
return;
continue;
}
const firstEmbedding = await database

View File

@@ -1,4 +1,3 @@
import { createOpenAI } from '@ai-sdk/openai';
import { embedMany } from 'ai';
import { sql } from 'kysely';
@@ -7,6 +6,7 @@ import { database } from '@colanode/server/data/database';
import { CreateDocumentEmbedding } from '@colanode/server/data/schema';
import { JobHandler } from '@colanode/server/jobs';
import { chunkText } from '@colanode/server/lib/ai/chunking';
import { getEmbeddingModel } from '@colanode/server/lib/ai/ai-models';
import { config } from '@colanode/server/lib/config';
import { fetchNode } from '@colanode/server/lib/nodes';
@@ -26,10 +26,13 @@ declare module '@colanode/server/jobs' {
export const documentEmbedHandler: JobHandler<DocumentEmbedInput> = async (
input
) => {
try {
if (!config.ai.enabled) {
return;
}
console.log('document embed job', input);
const { documentId } = input;
const document = await database
.selectFrom('documents')
@@ -37,17 +40,21 @@ export const documentEmbedHandler: JobHandler<DocumentEmbedInput> = async (
.where('id', '=', documentId)
.executeTakeFirst();
console.log('document', document?.id);
if (!document) {
console.log('document not found');
return;
}
const node = await fetchNode(documentId);
if (!node) {
console.log('node not found');
return;
}
const nodeModel = getNodeModel(node.type);
if (!nodeModel?.documentSchema) {
console.log('node model not found');
return;
}
@@ -58,28 +65,36 @@ export const documentEmbedHandler: JobHandler<DocumentEmbedInput> = async (
.where('document_id', '=', documentId)
.execute();
console.log('no text');
return;
}
const openaiClient = createOpenAI({ apiKey: config.ai.embedding.apiKey });
const embeddingModel = openaiClient.embedding(config.ai.embedding.modelName);
console.log('sending request to openai');
console.log('config.ai.embedding.apiKey', config.ai.embedding.apiKey);
const embeddingModel = getEmbeddingModel();
console.log('getting existing embeddings');
const existingEmbeddings = await database
.selectFrom('document_embeddings')
.select(['chunk', 'revision', 'text', 'summary'])
.where('document_id', '=', documentId)
.execute();
console.log('existing embeddings', existingEmbeddings.length);
const revision =
existingEmbeddings.length > 0 ? existingEmbeddings[0]!.revision : 0n;
if (revision >= document.revision) {
console.log('revision is up to date');
return;
}
const textChunks = await chunkText(
text,
existingEmbeddings.map((e) => ({
existingEmbeddings.map((e: { text: string; summary: string | null }) => ({
text: e.text,
summary: e.summary ?? undefined,
})),
@@ -87,14 +102,20 @@ export const documentEmbedHandler: JobHandler<DocumentEmbedInput> = async (
);
const embeddingsToUpsert: CreateDocumentEmbedding[] = [];
console.log('textChunks', textChunks.length);
for (let i = 0; i < textChunks.length; i++) {
console.log('chunk', i);
const chunk = textChunks[i];
if (!chunk) {
console.log('chunk is undefined');
continue;
}
const existing = existingEmbeddings.find((e) => e.chunk === i);
const existing = existingEmbeddings.find(
(e: { chunk: number; text: string }) => e.chunk === i
);
if (existing && existing.text === chunk.text) {
console.log('chunk already exists');
continue;
}
@@ -110,6 +131,8 @@ export const documentEmbedHandler: JobHandler<DocumentEmbedInput> = async (
});
}
console.log('embeddingsToUpsert', embeddingsToUpsert.length);
const batchSize = config.ai.embedding.batchSize;
for (let i = 0; i < embeddingsToUpsert.length; i += batchSize) {
const batch = embeddingsToUpsert.slice(i, i + batchSize);
@@ -117,10 +140,18 @@ export const documentEmbedHandler: JobHandler<DocumentEmbedInput> = async (
item.summary ? `${item.summary}\n\n${item.text}` : item.text
);
console.log('calling embedMany');
const { embeddings: embeddingVectors } = await embedMany({
model: embeddingModel,
values: textsToEmbed,
providerOptions: {
openai: {
dimensions: config.ai.embedding.dimensions,
},
},
});
console.log('embedding vectors', embeddingVectors.length);
for (let j = 0; j < batch.length; j++) {
const vector = embeddingVectors[j];
const batchItem = batch[j];
@@ -130,33 +161,43 @@ export const documentEmbedHandler: JobHandler<DocumentEmbedInput> = async (
}
}
if (embeddingsToUpsert.length === 0) {
// Filter out entries with empty vectors
const ready = embeddingsToUpsert.filter((e) => e.embedding_vector.length > 0);
if (ready.length === 0) {
console.log('no embeddings to upsert');
return;
}
await database
.insertInto('document_embeddings')
.values(
embeddingsToUpsert.map((embedding) => ({
document_id: embedding.document_id,
chunk: embedding.chunk,
revision: embedding.revision,
workspace_id: embedding.workspace_id,
text: embedding.text,
summary: embedding.summary,
embedding_vector: sql.raw(
`'[${embedding.embedding_vector.join(',')}]'::vector`
),
created_at: embedding.created_at,
}))
)
.onConflict((oc) =>
oc.columns(['document_id', 'chunk']).doUpdateSet({
text: sql.ref('excluded.text'),
summary: sql.ref('excluded.summary'),
embedding_vector: sql.ref('excluded.embedding_vector'),
updated_at: new Date(),
})
)
.execute();
console.log('upserting embeddings');
await database
.insertInto('document_embeddings')
.values(
ready.map((embedding) => ({
document_id: embedding.document_id,
chunk: embedding.chunk,
revision: embedding.revision,
workspace_id: embedding.workspace_id,
text: embedding.text,
summary: embedding.summary,
embedding_vector: sql.raw(
`'[${embedding.embedding_vector.join(',')}]'::vector`
),
created_at: embedding.created_at,
}))
)
.onConflict((oc: any) =>
oc.columns(['document_id', 'chunk']).doUpdateSet({
text: sql.ref('excluded.text'),
summary: sql.ref('excluded.summary'),
embedding_vector: sql.ref('excluded.embedding_vector'),
updated_at: new Date(),
})
)
.execute();
} catch (error) {
console.log('error upserting embeddings', error);
throw error;
} finally {
console.log('clearing document embedding schedule');
}
};

View File

@@ -25,6 +25,7 @@ declare module '@colanode/server/jobs' {
export const nodeEmbedScanHandler: JobHandler<
NodeEmbedScanInput
> = async () => {
console.log('node embed scan job');
if (!config.ai.enabled) {
return;
}

View File

@@ -1,4 +1,3 @@
import { createOpenAI } from '@ai-sdk/openai';
import { embedMany } from 'ai';
import { sql } from 'kysely';
@@ -7,6 +6,7 @@ import { database } from '@colanode/server/data/database';
import { CreateNodeEmbedding } from '@colanode/server/data/schema';
import { JobHandler } from '@colanode/server/jobs';
import { chunkText } from '@colanode/server/lib/ai/chunking';
import { getEmbeddingModel } from '@colanode/server/lib/ai/ai-models';
import { config } from '@colanode/server/lib/config';
import { fetchNode } from '@colanode/server/lib/nodes';
@@ -58,8 +58,7 @@ export const nodeEmbedHandler: JobHandler<NodeEmbedInput> = async (input) => {
return;
}
const openaiClient = createOpenAI({ apiKey: config.ai.embedding.apiKey });
const embeddingModel = openaiClient.embedding(config.ai.embedding.modelName);
const embeddingModel = getEmbeddingModel();
const existingEmbeddings = await database
.selectFrom('node_embeddings')
@@ -79,7 +78,7 @@ export const nodeEmbedHandler: JobHandler<NodeEmbedInput> = async (input) => {
const textChunks = await chunkText(
fullText,
existingEmbeddings.map((e) => ({
existingEmbeddings.map((e: { text: string; summary: string | null }) => ({
text: e.text,
summary: e.summary ?? undefined,
})),
@@ -93,7 +92,9 @@ export const nodeEmbedHandler: JobHandler<NodeEmbedInput> = async (input) => {
continue;
}
const existing = existingEmbeddings.find((e) => e.chunk === i);
const existing = existingEmbeddings.find(
(e: { chunk: number; text: string }) => e.chunk === i
);
if (existing && existing.text === chunk.text) {
continue;
}
@@ -124,6 +125,11 @@ export const nodeEmbedHandler: JobHandler<NodeEmbedInput> = async (input) => {
const { embeddings: embeddingVectors } = await embedMany({
model: embeddingModel,
values: textsToEmbed,
providerOptions: {
openai: {
dimensions: config.ai.embedding.dimensions,
},
},
});
for (let j = 0; j < batch.length; j++) {
const vector = embeddingVectors[j];
@@ -134,10 +140,16 @@ export const nodeEmbedHandler: JobHandler<NodeEmbedInput> = async (input) => {
}
}
// Filter out entries with empty vectors
const ready = embeddingsToUpsert.filter((e) => e.embedding_vector.length > 0);
if (ready.length === 0) {
return;
}
await database
.insertInto('node_embeddings')
.values(
embeddingsToUpsert.map((embedding) => ({
ready.map((embedding) => ({
node_id: embedding.node_id,
chunk: embedding.chunk,
revision: embedding.revision,

View File

@@ -1,167 +0,0 @@
import { Agent } from '@mastra/core/agent';
import { ModelConfig } from './ai-models';
export const createIntentAgent = () => {
return new Agent({
name: 'Intent Classifier',
description:
'Classifies whether queries need workspace data or general knowledge',
instructions: ({ runtimeContext }) => {
const workspaceName = runtimeContext?.get('workspaceName') || 'workspace';
const userName = runtimeContext?.get('userName') || 'user';
return `You classify user queries. Your only job is to determine if the query needs workspace data.
WORKSPACE: ${workspaceName}
USER: ${userName}
CLASSIFICATION RULES:
- "no_context" = General knowledge, explanations, how-to questions
- "retrieve" = Workspace content, documents, people, or data
EXAMPLES:
"What is JavaScript?" → no_context
"How do I code?" → no_context
"Explain databases" → no_context
"Find my documents" → retrieve
"Show recent files" → retrieve
"What did John write?" → retrieve
Respond with just the classification and confidence (0-1).`;
},
model: () => ModelConfig.forIntentRecognition(),
});
};
export const createQueryAgent = () => {
return new Agent({
name: 'Query Optimizer',
description: 'Rewrites queries for better search performance',
instructions: () => {
return `You optimize search queries. Your only job is to rewrite queries for better search results.
TASK: Take a user query and create two optimized versions:
1. SEMANTIC QUERY: Natural language for vector search
2. KEYWORD QUERY: Key terms for text search
RULES:
- Remove filler words (the, a, an, is, are, etc.)
- Focus on core concepts and entities
- Keep important context words
- Make queries concise but complete
EXAMPLES:
Input: "Show me documents about the marketing campaign John worked on last month"
Semantic: "marketing campaign documents John authored recent"
Keyword: "marketing campaign John documents month"
Input: "Find all completed projects from Q3"
Semantic: "completed projects third quarter finished"
Keyword: "completed projects Q3 done finished"
Respond with just the two optimized queries.`;
},
model: () => ModelConfig.forIntentRecognition(),
});
};
export const createAnswerAgent = () => {
return new Agent({
name: 'Answer Generator',
description:
'Generates helpful responses using context or general knowledge',
instructions: ({ runtimeContext }) => {
const workspaceName = runtimeContext?.get('workspaceName') || 'workspace';
const userName = runtimeContext?.get('userName') || 'user';
return `You are a helpful assistant for the ${workspaceName} workspace.
USER: ${userName}
YOUR JOB:
- Answer questions clearly and helpfully
- Use provided context when available
- Cite sources when using context: "According to [Source 1]..."
- If no context, use general knowledge
- Be professional but conversational
CONTEXT RULES:
- WITH context: Use it as primary source, cite sources
- WITHOUT context: Use general knowledge, don't mention workspace
RESPONSE STYLE:
- Clear and direct
- Professional but friendly
- Actionable when possible
- Acknowledge limitations honestly
Keep responses focused and helpful.`;
},
model: () => ModelConfig.forAssistant(),
});
};
export const createRerankAgent = () => {
return new Agent({
name: 'Relevance Scorer',
description: 'Scores search results for relevance to user query',
instructions: () => {
return `You score search results for relevance. Your only job is to rate how well each result matches the user's query.
TASK: Score each result from 0.0 to 1.0 based on relevance.
SCORING GUIDE:
- 1.0 = Perfect match, exactly what user needs
- 0.8 = Very relevant, mostly matches query
- 0.6 = Somewhat relevant, partially matches
- 0.4 = Minimally relevant, tangentially related
- 0.2 = Low relevance, barely related
- 0.0 = Not relevant, unrelated to query
CONSIDER:
- Topic match (does content match the query topic?)
- Specificity (does it answer the specific question?)
- Completeness (does it provide useful information?)
- Recency (newer content often more relevant)
Return just the scores for each item.`;
},
model: () => ModelConfig.forReranking(),
});
};
export const createChunkEnrichmentAgent = () => {
return new Agent({
name: 'Chunk Enrichment',
description:
'Creates contextual summaries for text chunks to enhance search retrieval',
instructions: () => {
return `You create concise summaries of text chunks to improve search retrieval. Your only job is to summarize the given chunk within its document context.
TASK: Generate a brief summary (30-50 words) of the chunk that captures its key points and role in the larger document.
GUIDELINES:
- Focus on the main ideas and key information in the chunk
- Consider how the chunk fits into the complete document
- Identify the chunk's purpose or role (introduction, data, conclusion, etc.)
- Use descriptive, neutral language
- Make the summary useful for search and retrieval
- Different content types need different approaches:
* "message": Focus on communication content and context
* "page": Identify document structure and main topics
* "record": Describe the type of data and key fields
* Other types: Adapt summary to content purpose
OUTPUT: Provide only the summary with no additional text or explanations.`;
},
model: () => ModelConfig.forIntentRecognition(),
});
};
export const AgentConfig = {
INTENT_AGENT: 'intent-classifier',
QUERY_AGENT: 'query-optimizer',
ANSWER_AGENT: 'answer-generator',
RERANK_AGENT: 'relevance-scorer',
CHUNK_ENRICHMENT_AGENT: 'chunk-enrichment',
} as const;

View File

@@ -1,460 +0,0 @@
/**
* AI Assistant Demo and Examples
*
* This file demonstrates how to use the new AI workflow-based assistant system
* and provides examples for different use cases.
*/
import { Mastra } from '@mastra/core';
import { RuntimeContext } from '@mastra/core/runtime-context';
import { assistantWorkflow } from './ai-workflow';
import {
AssistantWorkflowInput,
AssistantWorkflowOutput,
} from '@colanode/server/types/ai';
/**
* Process an AI request using the Mastra workflow directly.
*
* @param request - The user's request with context
* @returns Promise resolving to the assistant's response
*/
async function processAIRequest(
request: AssistantWorkflowInput
): Promise<AssistantWorkflowOutput> {
const startTime = Date.now();
// Prepare runtime context
const runtimeContext = new RuntimeContext();
runtimeContext.set('workspaceName', request.workspaceId);
runtimeContext.set('userName', request.userDetails.name);
runtimeContext.set('userEmail', request.userDetails.email);
runtimeContext.set('workspaceId', request.workspaceId);
runtimeContext.set('userId', request.userId);
runtimeContext.set(
'selectedContextNodeIds',
request.selectedContextNodeIds || []
);
runtimeContext.set('userInput', request.userInput);
// Initialize Mastra and get the workflow
const mastra = new Mastra({
workflows: {
assistantWorkflow,
},
});
const workflow = mastra.getWorkflow('assistantWorkflow');
const run = await workflow.createRunAsync();
// Execute the workflow
const result = await run.start({
inputData: request,
runtimeContext,
});
if (result.status !== 'success' || !result.result) {
const errorMessage =
result.status === 'suspended'
? 'Workflow was suspended unexpectedly'
: (result as any).error || 'Workflow execution failed';
console.error('❌ Workflow failed:', errorMessage);
throw new Error(errorMessage);
}
return {
...result.result,
processingTimeMs: Date.now() - startTime,
};
}
/**
* Demo: Basic AI Assistant Usage
*
* Shows how to use the AI service for simple requests
*/
export async function demoBasicUsage() {
console.log('\n🤖 === Basic AI Assistant Usage Demo ===');
try {
const response = await processAIRequest({
userInput: 'What are the latest documents about project management?',
workspaceId: 'demo_workspace_123',
userId: 'demo_user_456',
userDetails: {
name: 'John Doe',
email: 'john@example.com',
},
});
console.log(
'📝 User Query:',
'What are the latest documents about project management?'
);
console.log('🤖 AI Response:', response.finalAnswer);
console.log('🔍 Search Performed:', response.searchPerformed);
console.log('⏱️ Processing Time:', `${response.processingTimeMs}ms`);
console.log('📚 Citations:', response.citations.length);
} catch (error) {
console.error('❌ Demo failed:', error);
}
}
/**
* Demo: Complex Database Query Processing
*
* Shows how the service handles complex database queries
*/
export async function demoComplexDatabaseQuery() {
console.log('\n🗄 === Complex Database Query Demo ===');
try {
const response = await processAIRequest({
userInput:
'Can you help me understand the Q3 sales data from our database?',
workspaceId: 'demo_workspace_123',
userId: 'demo_user_456',
userDetails: {
name: 'Jane Smith',
email: 'jane@example.com',
},
});
console.log(
'📝 User Query:',
'Can you help me understand the Q3 sales data from our database?'
);
console.log('🤖 AI Response:', response.finalAnswer);
console.log('🔍 Search Performed:', response.searchPerformed);
console.log('⏱️ Processing Time:', `${response.processingTimeMs}ms`);
console.log('📚 Citations Found:', response.citations.length);
} catch (error) {
console.error('❌ Complex query demo failed:', error);
}
}
/**
* Demo: Workflow System Usage
*
* Shows how the new workflow system handles different types of queries
*/
export async function demoWorkflowUsage() {
console.log('\n🔄 === Workflow System Demo ===');
try {
// Test general knowledge query (no_context intent)
console.log('\n📚 Testing general knowledge query...');
const generalResponse = await processAIRequest({
userInput: 'What is TypeScript and why is it useful?',
workspaceId: 'demo_workspace_123',
userId: 'demo_user_456',
userDetails: {
name: 'John Doe',
email: 'john@example.com',
},
});
console.log('📝 Query: What is TypeScript and why is it useful?');
console.log(
'🤖 Response:',
generalResponse.finalAnswer.substring(0, 200) + '...'
);
console.log('🔍 Search Performed:', generalResponse.searchPerformed);
console.log('📚 Citations:', generalResponse.citations.length);
// Test workspace-specific query (retrieve intent)
console.log('\n🔍 Testing workspace-specific query...');
const workspaceResponse = await processAIRequest({
userInput: 'Show me recent documents about project planning',
workspaceId: 'demo_workspace_123',
userId: 'demo_user_456',
userDetails: {
name: 'Jane Smith',
email: 'jane@example.com',
},
});
console.log('📝 Query: Show me recent documents about project planning');
console.log(
'🤖 Response:',
workspaceResponse.finalAnswer.substring(0, 200) + '...'
);
console.log('🔍 Search Performed:', workspaceResponse.searchPerformed);
console.log('📚 Citations:', workspaceResponse.citations.length);
} catch (error) {
console.error('❌ Workflow demo failed:', error);
}
}
/**
* Demo: Error Handling
*
* Shows how the system handles various error scenarios
*/
export async function demoErrorHandling() {
console.log('\n⚠ === Error Handling Demo ===');
// Test with invalid workspace ID
try {
console.log('Testing with invalid workspace ID...');
const response = await processAIRequest({
userInput: 'Test query',
workspaceId: 'invalid_workspace',
userId: 'test_user',
userDetails: {
name: 'Test User',
email: 'test@example.com',
},
});
console.log('🤖 Response (graceful error):', response.finalAnswer);
console.log('⏱️ Processing Time:', `${response.processingTimeMs}ms`);
} catch (error) {
console.log(
'❌ Handled error gracefully:',
error instanceof Error ? error.message : 'Unknown error'
);
}
// Test with empty input
try {
console.log('Testing with empty input...');
const response = await processAIRequest({
userInput: '',
workspaceId: 'demo_workspace_123',
userId: 'test_user',
userDetails: {
name: 'Test User',
email: 'test@example.com',
},
});
console.log('🤖 Response (empty input):', response.finalAnswer);
} catch (error) {
console.log(
'❌ Handled empty input gracefully:',
error instanceof Error ? error.message : 'Unknown error'
);
}
}
/**
* Demo: Performance Testing
*
* Shows performance characteristics with different query types
*/
export async function demoPerformance() {
console.log('\n⚡ === Performance Testing Demo ===');
const queries = [
{ type: 'General Knowledge', query: 'What is TypeScript?' },
{ type: 'Simple Workspace', query: 'Show me recent documents' },
{
type: 'Complex Database',
query: 'Find all projects where status is completed and priority is high',
},
];
for (const { type, query } of queries) {
try {
console.log(`\n🔬 Testing ${type} query...`);
console.log(`📝 Query: "${query}"`);
const startTime = Date.now();
const response = await processAIRequest({
userInput: query,
workspaceId: 'demo_workspace_123',
userId: 'perf_test_user',
userDetails: {
name: 'Performance Tester',
email: 'perf@example.com',
},
});
const totalTime = Date.now() - startTime;
console.log('🔍 Search Performed:', response.searchPerformed);
console.log('⏱️ Service Time:', `${response.processingTimeMs}ms`);
console.log('⏱️ Total Time:', `${totalTime}ms`);
console.log(
'📏 Response Length:',
`${response.finalAnswer.length} chars`
);
} catch (error) {
console.error(`❌ Performance test failed for ${type}:`, error);
}
}
}
/**
* Demo: New Workflow Architecture
*
* Shows the new declarative workflow with proper branching
*/
export async function demoNewWorkflowArchitecture() {
console.log('\n🏗 === New Workflow Architecture Demo ===');
try {
// Test 1: No-context branch (general knowledge)
console.log('\n1⃣ Testing NO_CONTEXT branch (general knowledge)...');
const generalResponse = await processAIRequest({
userInput: 'What is TypeScript and why should I use it?',
workspaceId: 'demo_workspace_123',
userId: 'demo_user_456',
userDetails: {
name: 'Alice Developer',
email: 'alice@example.com',
},
});
console.log('📝 Query: "What is TypeScript and why should I use it?"');
console.log('🎯 Expected Branch: no_context');
console.log(
'🔍 Search Performed:',
generalResponse.searchPerformed ? '❌ Unexpected' : '✅ None (correct)'
);
console.log(
'📚 Citations:',
generalResponse.citations.length === 0
? '✅ None (correct)'
: '❌ Unexpected'
);
console.log(
'💬 Response Preview:',
generalResponse.finalAnswer.substring(0, 150) + '...'
);
// Test 2: Retrieve branch (workspace-specific)
console.log('\n2⃣ Testing RETRIEVE branch (workspace-specific)...');
const workspaceResponse = await processAIRequest({
userInput: 'Show me recent documents about project planning',
workspaceId: 'demo_workspace_123',
userId: 'demo_user_456',
userDetails: {
name: 'Bob Manager',
email: 'bob@example.com',
},
});
console.log('📝 Query: "Show me recent documents about project planning"');
console.log('🎯 Expected Branch: retrieve');
console.log(
'🔍 Search Performed:',
workspaceResponse.searchPerformed
? '✅ Yes (correct)'
: '❌ None (unexpected)'
);
console.log(
'📚 Citations:',
workspaceResponse.citations.length > 0
? '✅ Present (good)'
: '⚠️ None (no results)'
);
console.log(
'💬 Response Preview:',
workspaceResponse.finalAnswer.substring(0, 150) + '...'
);
console.log('\n✅ New workflow architecture demo completed successfully!');
} catch (error) {
console.error('❌ New workflow architecture demo failed:', error);
}
}
/**
* Run all demos including new architecture demonstrations
*
* Executes all demo functions to show the complete system capabilities
*/
export async function runAllDemos() {
console.log('🎬 === AI Assistant System Demo Suite ===');
console.log('This demo showcases the new Mastra-based AI assistant system\n');
try {
// Show the migration comparison first
showMigrationComparison();
// Demo new architecture
await demoNewWorkflowArchitecture();
// Run original demos for compatibility
await demoBasicUsage();
await demoWorkflowUsage();
await demoErrorHandling();
await demoPerformance();
console.log('\n🎉 === All Demos Completed Successfully ===');
console.log('The new Mastra-based AI system is ready for production! 🚀');
} catch (error) {
console.error('❌ Demo suite failed:', error);
}
}
/**
* Migration comparison showing before vs after
*/
export function showMigrationComparison() {
console.log(`
🔄 === MASTRA MIGRATION COMPLETED ===
📊 BEFORE (Complex LangChain System):
┌─────────────────────────────────────────────────────────┐
│ ❌ 600+ lines across multiple complex files │
│ ❌ Manual LangGraph workflow with 14+ imperative nodes │
│ ❌ Complex state management between workflow steps │
│ ❌ Monolithic agents with multi-purpose prompts │
│ ❌ Tightly coupled search and reranking logic │
│ ❌ Poor observability and debugging capabilities │
│ ❌ Not optimized for smaller/self-hosted LLMs │
│ ❌ Difficult to test, maintain, and extend │
└─────────────────────────────────────────────────────────┘
📊 AFTER (Declarative Mastra Workflow System):
┌─────────────────────────────────────────────────────────┐
│ ✅ Fully declarative workflow with proper branching │
│ ✅ Single-purpose agents optimized for smaller LLMs │
│ ✅ Granular tools with focused responsibilities │
│ ✅ Intelligent intent-based routing │
│ ✅ Full observability through Mastra's workflow engine │
│ ✅ Type-safe, maintainable, and easily extensible │
└─────────────────────────────────────────────────────────┘
🎯 KEY ACHIEVEMENTS:
• TRUE Mastra idiomatic implementation with .branch() routing
• BYOM (Bring Your Own Model) optimization for self-hosted LLMs
• Granular tools: semantic search, keyword search, database tools
• Step-by-step observability for debugging and optimization
🏗️ ARCHITECTURE HIGHLIGHTS:
• Declarative workflow: intentClassification → branch(intent) → publish
• Simplified agents: intentClassifier, queryOptimizer, answerGenerator
📁 REFACTORED FILE STRUCTURE:
├── ai-workflow.ts → Declarative Mastra workflow with branching
├── ai-agents.ts → Single-purpose agents optimized for small LLMs
├── ai-tools.ts → Granular tools (semantic, keyword, database)
├── ai-models.ts → Multi-provider model configuration
└── ai-demo.ts → Workflow path demonstrations
`);
}
// Run demos if this file is executed directly
if (require.main === module) {
async function main() {
try {
showMigrationComparison();
console.log(
'\n⚠ Note: These demos require valid workspace and user IDs to run properly.'
);
console.log(
'To run actual demos, update the IDs in the demo functions and uncomment the line below.\n'
);
// Uncomment to run actual demonstrations (requires valid workspace/user IDs)
// await runAllDemos();
} catch (error) {
console.error('Demo error:', error);
}
}
main();
}

View File

@@ -9,7 +9,9 @@ export type AITask =
| 'response' // Main assistant responses
| 'intentRecognition' // Intent classification
| 'rerank' // Document reranking
| 'databaseFilter'; // Database filtering
| 'databaseFilter' // Database filtering
| 'queryRewrite' // Query rewriting
| 'contextEnhancer'; // Context enhancement for chunks
export const getModelForTask = (task: AITask) => {
if (!config.ai.enabled) {
@@ -75,9 +77,31 @@ export const getAvailableProviders = (): AIProvider[] => {
) as AIProvider[];
};
/**
 * Resolve the configured embedding model for the workspace.
 *
 * NOTE: the provider API key is injected by mutating process.env, because the
 * ai-sdk provider singletons read their key from the environment.
 *
 * @throws Error when AI is disabled or the provider is unsupported.
 */
export const getEmbeddingModel = () => {
  if (!config.ai.enabled) {
    throw new Error('AI is disabled in configuration.');
  }
  const embeddingConfig = config.ai.embedding;
  switch (embeddingConfig.provider) {
    case 'openai':
      process.env.OPENAI_API_KEY = embeddingConfig.apiKey;
      return openai.textEmbeddingModel(embeddingConfig.modelName);
    case 'google':
      process.env.GOOGLE_GENERATIVE_AI_API_KEY = embeddingConfig.apiKey;
      return google.textEmbedding(embeddingConfig.modelName);
    default:
      throw new Error(
        `Unsupported embedding provider: ${embeddingConfig.provider}`
      );
  }
};
/**
 * Convenience accessors mapping each assistant task to its configured model.
 * Thin wrappers around getModelForTask so call sites stay self-describing.
 */
export const ModelConfig = {
  forAssistant() {
    return getModelForTask('response');
  },
  forIntentRecognition() {
    return getModelForTask('intentRecognition');
  },
  forReranking() {
    return getModelForTask('rerank');
  },
  forDatabaseFilter() {
    return getModelForTask('databaseFilter');
  },
  forQueryRewrite() {
    return getModelForTask('queryRewrite');
  },
  forContextEnhancer() {
    return getModelForTask('contextEnhancer');
  },
} as const;

View File

@@ -1,343 +0,0 @@
import { createTool } from '@mastra/core/tools';
import { z } from 'zod';
/**
 * Factory for the Mastra 'hybrid-search' tool.
 *
 * Runs node and document retrieval in parallel (each performs combined
 * semantic + keyword search internally) and returns results in a
 * normalized { content, type, sourceId, score, metadata } shape.
 */
export const createHybridSearchTool = () =>
  createTool({
    id: 'hybrid-search',
    description:
      'Perform hybrid (semantic + keyword) search on workspace documents and nodes',
    inputSchema: z.object({
      semanticQuery: z.string().describe('The semantic search query'),
      keywordQuery: z.string().describe('The keyword search query'),
      workspaceId: z.string().describe('The workspace ID to search in'),
      userId: z.string().describe('The user ID for access control'),
      maxResults: z
        .number()
        .default(10)
        .describe('Maximum number of results to return'),
      selectedContextNodeIds: z
        .array(z.string())
        .optional()
        .describe('Specific node IDs to search within'),
    }),
    outputSchema: z.object({
      results: z.array(
        z.object({
          content: z
            .string()
            .describe('The content of the found document/node'),
          type: z
            .string()
            .describe('The type of content (document, node, etc.)'),
          sourceId: z.string().describe('Unique identifier for the source'),
          score: z.number().describe('Hybrid relevance score (0-1 scaled)'),
          metadata: z
            .record(z.any())
            .describe('Additional metadata about the source'),
        })
      ),
      totalFound: z.number().describe('Total number of results returned'),
      searchType: z.literal('hybrid').describe('Hybrid search'),
    }),
    execute: async ({ context }) => {
      // Imported lazily so tool registration stays cheap.
      const { retrieveNodes } = await import(
        '@colanode/server/lib/ai/node-retrievals'
      );
      const { retrieveDocuments } = await import(
        '@colanode/server/lib/ai/document-retrievals'
      );
      const {
        semanticQuery,
        keywordQuery,
        workspaceId,
        userId,
        maxResults = 10,
        selectedContextNodeIds = [],
      } = context;
      try {
        console.log(
          `🔎 Hybrid search for: semantic="${semanticQuery}" keyword="${keywordQuery}" (max: ${maxResults})`
        );
        // Perform hybrid retrieval per source type
        const [nodeResults, documentResults] = await Promise.all([
          retrieveNodes(
            {
              semanticQuery,
              keywordQuery,
              // Fall back to the keyword query when the semantic one is empty.
              originalQuery: semanticQuery || keywordQuery,
              intent: 'retrieve',
            },
            workspaceId,
            userId,
            maxResults,
            selectedContextNodeIds
          ),
          retrieveDocuments(
            {
              semanticQuery,
              keywordQuery,
              originalQuery: semanticQuery || keywordQuery,
              intent: 'retrieve',
            },
            workspaceId,
            userId,
            maxResults,
            selectedContextNodeIds
          ),
        ]);
        // Node results come first; the slice truncates rather than
        // interleaving the two result sets by score.
        const allResults = [...nodeResults, ...documentResults].slice(
          0,
          maxResults
        );
        // Convert to standardized format (loosen typing for MDocument)
        const results = allResults.map((doc) => {
          const md = doc as any;
          return {
            content: md.pageContent || md.text || '',
            type: md.metadata?.type || 'document',
            sourceId: md.metadata?.id || md.id || '',
            score: md.metadata?.score || md.score || 0,
            metadata: md.metadata || {},
          };
        });
        console.log(`✅ Hybrid search returned ${results.length} results`);
        return {
          results,
          totalFound: results.length,
          searchType: 'hybrid' as const,
        };
      } catch (error) {
        // Log with context and rethrow so the workflow layer can handle it.
        console.error('❌ Hybrid search error:', error);
        throw error;
      }
    },
  });
/**
 * Factory for the Mastra 'database-schema-inspection' tool.
 *
 * Lists every database node the user can access in the workspace and returns
 * each database's field definitions plus up to 3 sample records for context.
 */
export const createDatabaseSchemaInspectionTool = () =>
  createTool({
    id: 'database-schema-inspection',
    description: 'Get database schemas and structure for a workspace',
    inputSchema: z.object({
      workspaceId: z.string().describe('The workspace ID'),
      userId: z.string().describe('The user ID for access control'),
    }),
    outputSchema: z.object({
      databases: z.array(
        z.object({
          id: z.string(),
          name: z.string(),
          fields: z.record(
            z.object({
              type: z.string(),
              name: z.string(),
            })
          ),
          sampleRecords: z
            .array(z.any())
            .describe('Sample records for context'),
        })
      ),
      totalDatabases: z.number(),
    }),
    execute: async ({ context }) => {
      // Imported lazily so tool registration stays cheap.
      const { retrieveByFilters } = await import(
        '@colanode/server/lib/records'
      );
      const { database } = await import('@colanode/server/data/database');
      const { workspaceId, userId } = context;
      try {
        console.log(
          `🗄️ Inspecting database schemas for workspace: ${workspaceId}`
        );
        // Get available databases for the user; access is enforced by joining
        // live collaborations (deleted_at is null) on the node's root_id.
        const databases = await database
          .selectFrom('nodes as n')
          .innerJoin('collaborations as c', 'c.node_id', 'n.root_id')
          .where('n.type', '=', 'database')
          .where('n.workspace_id', '=', workspaceId)
          .where('c.collaborator_id', '=', userId)
          .where('c.deleted_at', 'is', null)
          .selectAll()
          .execute();
        if (databases.length === 0) {
          console.log('📭 No accessible databases found');
          return {
            databases: [],
            totalDatabases: 0,
          };
        }
        console.log(`🔍 Found ${databases.length} accessible databases`);
        // Get database schemas and sample data
        const databaseSchemas = await Promise.all(
          databases.map(async (db: any) => {
            const sampleRecords = await retrieveByFilters(
              db.id,
              workspaceId,
              userId,
              { filters: [], sorts: [], page: 1, count: 3 } // Just 3 samples
            );
            const dbAttrs = db.attributes as any;
            const fields = dbAttrs.fields || {};
            // Project each field down to just { type, name } for the summary.
            const formattedFields = Object.entries(fields).reduce(
              (acc: any, [id, field]: [string, any]) => ({
                ...acc,
                [id]: {
                  type: field.type,
                  name: field.name,
                },
              }),
              {}
            );
            return {
              id: db.id,
              name: dbAttrs.name || 'Untitled Database',
              fields: formattedFields,
              sampleRecords,
            };
          })
        );
        console.log(
          `✅ Retrieved schemas for ${databaseSchemas.length} databases`
        );
        return {
          databases: databaseSchemas,
          totalDatabases: databaseSchemas.length,
        };
      } catch (error) {
        // Log with context and rethrow so the workflow layer can handle it.
        console.error('❌ Database schema inspection error:', error);
        throw error;
      }
    },
  });
/**
 * Factory for the Mastra 'database-query' tool.
 *
 * Executes structured filter queries against one workspace database and
 * returns each matching record formatted as readable text plus metadata.
 *
 * Fix: field values are stringified explicitly; the previous template
 * interpolation rendered object-valued fields as "[object Object]".
 */
export const createDatabaseQueryTool = () =>
  createTool({
    id: 'database-query',
    description: 'Execute structured queries against workspace databases',
    inputSchema: z.object({
      databaseId: z.string().describe('The database ID to query'),
      workspaceId: z.string().describe('The workspace ID'),
      userId: z.string().describe('The user ID for access control'),
      filters: z.array(z.any()).describe('Structured filter conditions'),
      maxResults: z.number().default(10).describe('Maximum number of results'),
    }),
    outputSchema: z.object({
      results: z.array(
        z.object({
          content: z
            .string()
            .describe('Formatted content of the database record'),
          metadata: z.object({
            id: z.string(),
            type: z.literal('record'),
            databaseId: z.string(),
            databaseName: z.string(),
            createdAt: z.date(),
            createdBy: z.string(),
          }),
        })
      ),
      totalFound: z.number(),
      databaseName: z.string(),
    }),
    execute: async ({ context }) => {
      // Imported lazily so tool registration stays cheap.
      const { retrieveByFilters } = await import(
        '@colanode/server/lib/records'
      );
      const { fetchNode } = await import('@colanode/server/lib/nodes');
      const {
        databaseId,
        workspaceId,
        userId,
        filters,
        maxResults = 10,
      } = context;
      try {
        console.log(
          `🗄️ Querying database ${databaseId} with ${filters.length} filters`
        );
        // Get the database node for metadata; bail out gracefully when the
        // target is missing or not actually a database.
        const dbNode = await fetchNode(databaseId);
        if (!dbNode || dbNode.type !== 'database') {
          console.log('❌ Database not found or not accessible');
          return {
            results: [],
            totalFound: 0,
            databaseName: 'Unknown Database',
          };
        }
        const databaseName =
          (dbNode.attributes as any).name || 'Untitled Database';
        // Execute the query
        const records = await retrieveByFilters(
          databaseId,
          workspaceId,
          userId,
          { filters, sorts: [], page: 1, count: maxResults }
        );
        // Format results. Field values may be non-primitive (presumably
        // select options, dates, etc. — confirm against the records schema),
        // so stringify objects with JSON.stringify instead of letting them
        // collapse to "[object Object]".
        const results = records.map((record: any) => {
          const fields = Object.entries(
            (record.attributes as any).fields || {}
          )
            .map(([key, value]) => {
              const rendered =
                value !== null && typeof value === 'object'
                  ? JSON.stringify(value)
                  : String(value);
              return `${key}: ${rendered}`;
            })
            .join('\n');
          const content = `Database Record from ${databaseName}:\n${fields}`;
          return {
            content,
            metadata: {
              id: record.id,
              type: 'record' as const,
              databaseId,
              databaseName,
              createdAt: record.created_at,
              createdBy: record.created_by,
            },
          };
        });
        console.log(
          `✅ Database query returned ${results.length} records from ${databaseName}`
        );
        return {
          results,
          totalFound: results.length,
          databaseName,
        };
      } catch (error) {
        // Log with context and rethrow so the workflow layer can handle it.
        console.error('❌ Database query error:', error);
        throw error;
      }
    },
  });
export const createAITools = () => ({
hybridSearch: createHybridSearchTool(),
databaseSchemaInspection: createDatabaseSchemaInspectionTool(),
databaseQuery: createDatabaseQueryTool(),
});

View File

@@ -1,500 +1,319 @@
import { createWorkflow, createStep } from '@mastra/core/workflows';
import { generateObject, generateText } from 'ai';
import { z } from 'zod';
import { ModelConfig } from './ai-models';
import { retrieveNodes } from '@colanode/server/lib/ai/node-retrievals';
import { retrieveDocuments } from '@colanode/server/lib/ai/document-retrievals';
import {
createIntentAgent,
createAnswerAgent,
createQueryAgent,
createRerankAgent,
} from './ai-agents';
import { createAITools } from './ai-tools';
import {
assistantWorkflowInputSchema,
assistantWorkflowOutputSchema,
intentClassificationOutputSchema,
queryRewriteOutputSchema,
searchResultsOutputSchema,
rankedResultsOutputSchema,
answerOutputSchema,
AssistantWorkflowInput,
AssistantWorkflowOutput,
HybridSearchArgs,
} from '@colanode/server/types/ai';
const intentClassificationStep = createStep({
id: 'intent-classification-step',
description:
'Classify user intent: general knowledge vs workspace-specific query',
inputSchema: assistantWorkflowInputSchema,
outputSchema: intentClassificationOutputSchema,
execute: async ({ inputData, runtimeContext }) => {
const intentAgent = createIntentAgent();
export async function runAssistantWorkflow(
input: AssistantWorkflowInput
): Promise<AssistantWorkflowOutput> {
const intent = await classifyIntent(input.userInput);
const prompt = `You are an intent classifier. Your only job is to determine if this query needs workspace data.
if (intent.intent === 'retrieve') {
const rewrites = await rewriteQuery(input.userInput);
Query: "${inputData.userInput}"
Classify as:
- "no_context": General knowledge, explanations, calculations, definitions
- "retrieve": Workspace-specific content, documents, people, or data
Examples:
- "What is TypeScript?" → no_context
- "How do I write clean code?" → no_context
- "Show me recent documents" → retrieve
- "Find projects by John" → retrieve
Respond with just the classification and your confidence (0-1).`;
try {
const response = await intentAgent.generate(
[{ role: 'user', content: prompt }],
{
runtimeContext,
output: z.object({
intent: z.enum(['no_context', 'retrieve']),
confidence: z.number().min(0).max(1),
reasoning: z.string().optional(),
}),
}
);
console.log(
`🎯 Intent: ${response.object.intent} (confidence: ${response.object.confidence})`
);
return {
intent: response.object.intent,
confidence: response.object.confidence,
reasoning: response.object.reasoning,
originalInput: inputData.userInput,
};
} catch (error) {
throw error;
}
},
});
const queryRewriteStep = createStep({
id: 'query-rewrite-step',
description: 'Optimize user query for semantic and keyword search',
inputSchema: intentClassificationOutputSchema,
outputSchema: queryRewriteOutputSchema,
execute: async ({ inputData, runtimeContext }) => {
const queryAgent = createQueryAgent();
const prompt = `Original Query: "${inputData.originalInput}"
Create two optimized versions for search.`;
try {
const response = await queryAgent.generate(
[{ role: 'user', content: prompt }],
{
runtimeContext,
output: z.object({
semanticQuery: z.string(),
keywordQuery: z.string(),
}),
}
);
console.log(`🔍 Semantic query: "${response.object.semanticQuery}"`);
console.log(`🔑 Keyword query: "${response.object.keywordQuery}"`);
return {
semanticQuery: response.object.semanticQuery,
keywordQuery: response.object.keywordQuery,
originalQuery: inputData.originalInput,
intent: inputData.intent,
};
} catch (error) {
throw error;
}
},
});
const runSearchesStep = createStep({
id: 'run-searches-step',
description: 'Execute hybrid (semantic + keyword) search via single tool',
inputSchema: queryRewriteOutputSchema,
outputSchema: searchResultsOutputSchema,
execute: async ({ inputData, runtimeContext }) => {
const workspaceId = runtimeContext?.get('workspaceId') as string;
const userId = runtimeContext?.get('userId') as string;
const selectedContextNodeIds =
(runtimeContext?.get('selectedContextNodeIds') as string[]) || [];
if (!workspaceId || !userId) {
console.error('❌ Missing required runtime context for search');
throw new Error('Missing required runtime context for search');
}
console.log(`🔍 Running hybrid search...`);
console.log(` Semantic: "${inputData.semanticQuery}"`);
console.log(` Keyword: "${inputData.keywordQuery}"`);
try {
const tools = createAITools();
const hybridResults = await tools.hybridSearch.execute({
context: {
semanticQuery: inputData.semanticQuery,
keywordQuery: inputData.keywordQuery,
workspaceId,
userId,
maxResults: 20,
selectedContextNodeIds,
},
runtimeContext,
});
console.log(
`📊 Hybrid search completed: ${hybridResults.results.length} results`
);
return {
results: hybridResults.results,
searchType: 'hybrid' as const,
totalFound: hybridResults.totalFound,
};
} catch (error) {
console.error('❌ Search execution failed:', error);
throw error;
}
},
});
const combineResultsStep = createStep({
id: 'combine-results-step',
description: 'Combine and score search results using RRF algorithm',
inputSchema: searchResultsOutputSchema,
outputSchema: searchResultsOutputSchema, // Same schema for now
execute: async ({ inputData }) => {
if (inputData.results.length === 0) {
console.log('📭 No results to combine');
return inputData; // Pass through if no results
}
console.log(`🔄 Combining ${inputData.results.length} search results`);
// Simple combination: remove duplicates and apply recency boost
const uniqueResults = new Map();
inputData.results.forEach((result, index) => {
const key = result.sourceId;
if (!uniqueResults.has(key)) {
// Apply position-based scoring (earlier results get higher scores)
const positionScore = 1 / (index + 1);
const recencyBoost = result.metadata.createdAt
? Math.max(
0,
1 -
(Date.now() - new Date(result.metadata.createdAt).getTime()) /
(1000 * 60 * 60 * 24 * 30)
) // 30 days
: 0;
const combinedScore =
result.score * 0.7 + positionScore * 0.2 + recencyBoost * 0.1;
uniqueResults.set(key, {
...result,
score: Math.min(1, combinedScore), // Cap at 1.0
});
}
const { results } = await hybridSearch({
semanticQuery: rewrites.semanticQuery,
keywordQuery: rewrites.keywordQuery,
workspaceId: input.workspaceId,
userId: input.userId,
maxResults: 20,
selectedContextNodeIds: input.selectedContextNodeIds,
});
// Sort by combined score
const combinedResults = Array.from(uniqueResults.values())
.sort((a, b) => b.score - a.score)
.slice(0, 20); // Top 20 results
const combined = combineResults(results);
console.log(`✅ Combined to ${combinedResults.length} unique results`);
return {
results: combinedResults,
searchType: inputData.searchType,
totalFound: combinedResults.length,
};
},
});
const rerankStep = createStep({
id: 'rerank-step',
description: 'Rerank search results for relevance using LLM',
inputSchema: searchResultsOutputSchema,
outputSchema: rankedResultsOutputSchema,
execute: async ({ inputData, runtimeContext }) => {
if (inputData.results.length === 0) {
console.log('📭 No results to rerank');
if (combined.length === 0) {
return {
rankedResults: [],
finalAnswer:
"I couldn't find relevant workspace context for that. Try rephrasing or selecting different context.",
citations: [],
searchPerformed: false,
};
}
const originalQuery =
(runtimeContext?.get('userInput') as string) || 'query';
console.log(
`🎯 Reranking ${inputData.results.length} results for query: "${originalQuery}"`
);
try {
const rerankAgent = createRerankAgent();
// Format results for reranking
const resultsText = inputData.results
.map(
(result, index) => `[${index}] ${result.content.substring(0, 300)}`
)
.join('\n\n');
const prompt = `Query: "${originalQuery}"
Results to score (0.0 to 1.0 for relevance):
${resultsText}
Score each result based on how well it answers the query.`;
const response = await rerankAgent.generate(
[{ role: 'user', content: prompt }],
{
runtimeContext,
output: z.object({
scores: z.array(z.number().min(0).max(1)),
}),
}
);
// Apply scores and sort by relevance
const rankedResults = inputData.results
.map((result, index) => ({
content: result.content,
sourceId: result.sourceId,
relevanceScore: response.object.scores[index] || 0.5,
type: result.type,
metadata: result.metadata,
}))
.sort((a, b) => b.relevanceScore - a.relevanceScore)
.slice(0, 20); // Top 20 results
// Generate citations from top results
const citations = rankedResults
.slice(0, 10) // Top 10 for citations
.map((result) => ({
sourceId: result.sourceId,
quote:
result.content.substring(0, 200).trim() +
(result.content.length > 200 ? '...' : ''),
}));
console.log(
`✅ Reranked to ${rankedResults.length} results with ${citations.length} citations`
);
return {
rankedResults,
citations,
searchPerformed: true,
};
} catch (error) {
throw error;
}
},
});
const answerWithContextStep = createStep({
id: 'answer-with-context-step',
description: 'Generate response using retrieved context and citations',
inputSchema: rankedResultsOutputSchema,
outputSchema: answerOutputSchema,
execute: async ({ inputData, runtimeContext }) => {
const answerAgent = createAnswerAgent();
const userQuery =
(runtimeContext?.get('userInput') as string) || 'User query';
if (inputData.rankedResults.length === 0) {
throw new Error('No context available for answering the query');
}
// Format context for the LLM
const contextText = inputData.rankedResults
.slice(0, 10) // Top 10 results
.map((item, index) => `[Source ${index + 1}] ${item.content}`)
.join('\n\n');
const ranked = await rerankLLM(input.userInput, combined);
console.log(
`📚 Generating response with ${inputData.rankedResults.length} context items`
const { answer, citations } = await answerWithContext(
input.userInput,
ranked,
{
workspaceName: input.workspaceName,
userName: input.userDetails.name,
}
);
const prompt = `Answer the user's question using the provided workspace context.
return { finalAnswer: answer, citations, searchPerformed: true };
}
**User Question:** ${userQuery}
const answer = await answerDirect(input.userInput, {
workspaceName: input.workspaceName,
userName: input.userDetails.name,
});
**Workspace Context:**
${contextText}
return { finalAnswer: answer, citations: [], searchPerformed: false };
}
**Instructions:**
- Use the context as your primary information source
- Cite sources using [Source N] format when referencing context
- If the context doesn't fully answer the question, combine it with general knowledge but clearly distinguish between the two
- Provide specific, actionable information when possible
- Be conversational but professional`;
async function classifyIntent(query: string) {
const schema = z.object({
intent: z.enum(['no_context', 'retrieve']),
confidence: z.number().min(0).max(1),
reasoning: z.string().optional(),
});
try {
const response = await answerAgent.generate(
[{ role: 'user', content: prompt }],
{ runtimeContext }
);
const { object } = await generateObject({
model: ModelConfig.forIntentRecognition(),
schema,
prompt: `
Decide if the user question requires searching workspace context.
console.log(
`✅ Generated contextual response (${response.text.length} characters)`
);
Return JSON { "intent": "retrieve"|"no_context", "confidence": 0..1, "reasoning": "<one short sentence>" }.
Guidelines:
- "retrieve" if the answer likely depends on workspace content (notes, pages, records, files, chats) OR references specific people, dates, projects, or "this/that" items.
- "no_context" for general knowledge, chit-chat, or requests that don't reference workspace data.
Question: "${query}"
`,
experimental_telemetry: {
isEnabled: true,
functionId: 'intent-classification',
mxetadata: { stage: 'intent', userQuery: query },
},
});
return object;
}
/**
 * Rewrite the raw user input into two search-optimized queries: a
 * natural-language semantic query and a Postgres websearch-style
 * keyword query.
 *
 * Fix: the prompt read "(520 tokens)" — a mis-encoded "(5-20 tokens)"
 * (lost en dash) — which would mislead the model about target length.
 */
async function rewriteQuery(original: string) {
  const schema = z.object({
    semanticQuery: z.string(),
    keywordQuery: z.string(),
  });

  const { object } = await generateObject({
    model: ModelConfig.forQueryRewrite(),
    schema,
    prompt: `
Rewrite the user's input into two queries:
1) "semanticQuery": A short natural-language search query (5-20 tokens), with expanded entities, acronyms, and likely synonyms. Drop punctuation and noise.
2) "keywordQuery": A Postgres websearch_to_tsquery string with:
- quoted phrases for exact matches
- important terms first
- optional synonyms after OR
- minus terms if the user excluded something
- keep it <= 15 tokens
Example:
Input: "oncall runbook for payments db; not the legacy doc"
semanticQuery: "payments database oncall runbook escalation procedures current"
keywordQuery: "\"payments database\" runbook escalation -legacy"
Input: "${original}"
Return JSON with both fields only.
`,
    experimental_telemetry: {
      isEnabled: true,
      functionId: 'query-rewrite',
      metadata: { stage: 'rewrite', original },
    },
  });

  return object;
}
async function hybridSearch(args: HybridSearchArgs) {
const {
semanticQuery,
keywordQuery,
workspaceId,
userId,
maxResults = 20,
selectedContextNodeIds = [],
} = args;
const [nodeResults, documentResults] = await Promise.all([
retrieveNodes(
{
semanticQuery,
keywordQuery,
originalQuery: semanticQuery || keywordQuery,
intent: 'retrieve',
},
workspaceId,
userId,
maxResults,
selectedContextNodeIds
),
retrieveDocuments(
{
semanticQuery,
keywordQuery,
originalQuery: semanticQuery || keywordQuery,
intent: 'retrieve',
},
workspaceId,
userId,
maxResults,
selectedContextNodeIds
),
]);
const merged = [...nodeResults, ...documentResults]
.slice(0, maxResults)
.map((md: any) => {
const meta = Array.isArray(md.metadata) ? (md.metadata[0] ?? {}) : {};
return {
finalAnswer: response.text,
additionalCitations: inputData.citations,
usedContext: true,
content: md.text || '',
type: meta.type ?? 'document',
sourceId: `${md.id}:${meta.chunkIndex ?? 0}`,
score: md.score ?? 0,
metadata: meta,
};
} catch (error) {
console.error('❌ Contextual answer generation failed:', error);
throw error;
});
return {
results: merged,
totalFound: merged.length,
searchType: 'hybrid' as const,
};
}
function combineResults(results: any[]) {
const unique = new Map<string, any>();
results.forEach((r, i) => {
const key = r.sourceId as string;
if (!unique.has(key)) {
const positionScore = 1 / (i + 1);
// Remove recency boost here - it's already applied in combineAndScoreSearchResults
const combined = r.score * 0.8 + positionScore * 0.2;
unique.set(key, { ...r, score: Math.min(1, combined) });
}
},
});
});
return Array.from(unique.values())
.sort((a, b) => b.score - a.score)
.slice(0, 20);
}
const answerDirectStep = createStep({
id: 'answer-direct-step',
description: 'Generate direct response using general knowledge',
inputSchema: intentClassificationOutputSchema,
outputSchema: answerOutputSchema,
execute: async ({ inputData, runtimeContext }) => {
const answerAgent = createAnswerAgent();
async function rerankLLM(query: string, items: any[]) {
const schema = z.object({ scores: z.array(z.number().min(0).max(1)) });
console.log('📝 Generating direct response for general knowledge query');
const preview = items
.map((it, i) => `[${i}] ${String(it.content).slice(0, 300)}`)
.join('\n\n');
const prompt = `Answer this general knowledge question with a comprehensive, helpful response.
const { object } = await generateObject({
model: ModelConfig.forReranking(),
schema,
prompt: `
Query: "${query}"
Score each result from 0.01.0 for relevance.
**Question:** ${inputData.originalInput}
Scoring rubric:
- 50% Semantic match to the query intent.
- 25% Specificity (concrete details, matches entities).
- 15% Recency (newer is better).
- 10% Diversity (penalize near-duplicates).
**Instructions:**
- Provide accurate, detailed information
- Use examples and explanations where helpful
- Be educational and thorough
- Use clear, professional language
- Don't reference any workspace-specific information`;
You may use metadata if present: type, createdAt, author, chunkIndex.
Return only JSON: { "scores": [number...] } in the same order as provided.
try {
const response = await answerAgent.generate(
[{ role: 'user', content: prompt }],
{ runtimeContext }
);
Results:
${preview}
`,
experimental_telemetry: {
isEnabled: true,
functionId: 'rerank',
metadata: { stage: 'rerank', itemCount: items.length },
},
});
console.log(
`✅ Generated direct response (${response.text.length} characters)`
);
return items
.map((it, i) => ({
content: it.content,
sourceId: it.sourceId,
relevanceScore: object.scores[i] ?? 0.5,
type: it.type,
metadata: it.metadata,
}))
.sort((a, b) => b.relevanceScore - a.relevanceScore)
.slice(0, 20);
}
return {
finalAnswer: response.text,
additionalCitations: [],
usedContext: false,
};
} catch (error) {
console.error('❌ Direct answer generation failed:', error);
throw error;
}
},
});
async function answerWithContext(
userQuery: string,
ranked: any[],
{ workspaceName, userName }: { workspaceName: string; userName: string }
) {
const context = ranked
.slice(0, 10)
.map((r, i) => `[Source ${i + 1}] ${String(r.content)}`)
.join('\n\n');
const formatContextOutputStep = createStep({
id: 'format-context-output-step',
description: 'Format the final assistant response output with context',
inputSchema: answerOutputSchema,
outputSchema: assistantWorkflowOutputSchema,
execute: async ({ inputData }) => {
console.log(`📋 Formatting context-based response output`);
const { text } = await generateText({
model: ModelConfig.forAssistant(),
system: `You are a precise, helpful assistant for the ${workspaceName} workspace. User: ${userName}. When you state a fact from context, add [Source N]. If unsure, say so briefly.`,
prompt: `
User question: ${userQuery}
const response = {
finalAnswer: inputData.finalAnswer,
citations: inputData.additionalCitations || [],
searchPerformed: inputData.usedContext,
};
Use only the context below. If the answer requires assumptions not supported by the context, say what is missing.
Prefer bullet points for lists. Include dates and names when available.
console.log(`✅ Formatted response with context`);
console.log(
`📊 Search performed: ${response.searchPerformed ? 'Yes' : 'No'}`
);
console.log(
`💬 Response length: ${response.finalAnswer.length} characters`
);
Context:
${context}
`,
experimental_telemetry: {
isEnabled: true,
functionId: 'answer-with-context',
metadata: {
stage: 'answer',
withContext: true,
contextItems: ranked.length,
},
},
});
return response;
},
});
const used = new Set<number>();
for (const m of text.matchAll(/\[Source\s+(\d+)\]/gi)) used.add(Number(m[1]));
const citations = Array.from(used)
.slice(0, 10)
.map((i) => {
const r = ranked[i - 1];
return r
? {
sourceId: r.sourceId,
quote:
String(r.content).slice(0, 200).trim() +
(String(r.content).length > 200 ? '...' : ''),
}
: null;
})
.filter(Boolean) as { sourceId: string; quote: string }[];
const formatDirectOutputStep = createStep({
id: 'format-direct-output-step',
description: 'Format the final assistant direct response output',
inputSchema: answerOutputSchema,
outputSchema: assistantWorkflowOutputSchema,
execute: async ({ inputData }) => {
console.log(`📋 Formatting direct response output (no context used)`);
return { answer: text, citations };
}
const response = {
finalAnswer: inputData.finalAnswer,
citations: inputData.additionalCitations || [],
searchPerformed: false,
};
async function answerDirect(
userQuery: string,
{ workspaceName, userName }: { workspaceName: string; userName: string }
) {
const { text } = await generateText({
model: ModelConfig.forAssistant(),
system: `You are a precise, helpful assistant. Keep answers concise but complete. You are working in the ${workspaceName} workspace. User: ${userName}.`,
prompt: `Answer the question clearly. If you need extra context from the workspace, say what would help.
Question: ${userQuery}`,
experimental_telemetry: {
isEnabled: true,
functionId: 'answer-direct',
metadata: { stage: 'answer', withContext: false },
},
});
console.log(`✅ Formatted direct response`);
console.log(
`💬 Response length: ${response.finalAnswer.length} characters`
);
return response;
},
});
export const assistantWorkflow = createWorkflow({
id: 'assistant-workflow',
description:
'Declarative AI assistant workflow optimized for smaller LLMs with proper branching',
inputSchema: assistantWorkflowInputSchema,
outputSchema: assistantWorkflowOutputSchema,
})
// Step 1: Always classify intent first
.then(intentClassificationStep)
// Step 2: Branch based on intent classification
.branch([
// RETRIEVE BRANCH: RAG pipeline for workspace-specific queries
[
async ({ inputData }) => inputData.intent === 'retrieve',
createWorkflow({
id: 'retrieve-branch',
inputSchema: intentClassificationOutputSchema,
outputSchema: assistantWorkflowOutputSchema,
})
.then(queryRewriteStep) // Optimize query for search
.then(runSearchesStep) // Execute parallel searches
.then(combineResultsStep) // Combine results algorithmically
.then(rerankStep) // LLM-based reranking
.then(answerWithContextStep) // Generate answer with context
.then(formatContextOutputStep) // Format output
.commit(),
],
// NO_CONTEXT BRANCH: Direct answer for general knowledge queries
[
async ({ inputData }) => inputData.intent === 'no_context',
createWorkflow({
id: 'no-context-branch',
inputSchema: intentClassificationOutputSchema,
outputSchema: assistantWorkflowOutputSchema,
})
.then(answerDirectStep) // Generate direct answer
.then(formatDirectOutputStep) // Format output
.commit(),
],
])
.commit();
return text;
}

View File

@@ -1,8 +1,8 @@
import { MDocument } from '@mastra/rag';
import { generateText } from 'ai';
import type { NodeType } from '@colanode/core';
import { config } from '@colanode/server/lib/config';
import { TextChunk } from '@colanode/server/types/chunking';
import { createChunkEnrichmentAgent } from './ai-agents';
import { ModelConfig } from './ai-models';
export const chunkText = async (
text: string,
@@ -15,20 +15,20 @@ export const chunkText = async (
const chunkSize = config.ai.chunking.defaultChunkSize;
const chunkOverlap = config.ai.chunking.defaultOverlap;
const doc = MDocument.fromText(text);
const docChunks = await doc.chunk({
strategy: 'recursive',
maxSize: chunkSize,
overlap: chunkOverlap,
const docChunks = await recursiveCharacterSplit(text, {
chunkSize,
chunkOverlap,
keepSeparator: true,
});
const chunks = docChunks
.map((chunk) => ({ text: chunk.text }))
.filter((c) => c.text.trim().length > 5);
const chunks: TextChunk[] = docChunks
.map((c) => ({ text: c }))
.filter((c: TextChunk) => c.text.trim().length > 5);
console.log('chunks', chunks);
if (!config.ai.chunking.enhanceWithContext) {
return chunks;
return chunks; // return plain chunks without summaries
}
const enrichedChunks: TextChunk[] = [];
@@ -56,10 +56,8 @@ const enrichChunk = async (
nodeType: NodeType
): Promise<string | undefined> => {
try {
const enrichmentAgent = createChunkEnrichmentAgent();
const prompt = `<task>
Generate a concise summary of the following text chunk that is part of a larger document.
Generate a concise summary of the following text chunk that is part of a larger document.
This summary will be used to enhance vector search retrieval by providing additional context about this specific chunk.
</task>
@@ -92,13 +90,170 @@ ${chunk}
Provide only the summary with no additional commentary or explanations.
</output_format>`;
const response = await enrichmentAgent.generate([
{ role: 'user', content: prompt },
]);
const { text } = await generateText({
model: ModelConfig.forContextEnhancer(),
prompt,
experimental_telemetry: {
isEnabled: true,
functionId: 'chunk-enrichment',
metadata: {
stage: 'chunk-enrichment',
nodeType,
chunkLength: chunk.length,
},
},
});
return response.text.trim();
return text.trim();
} catch (error) {
console.warn('Failed to enrich chunk:', error);
return undefined;
}
};
/**
 * Recursive character splitter (LangChain-compatible behavior).
 *
 * Tries separators in order (["\n\n", "\n", " ", ""] by default); pieces that
 * are still too large are re-split with the remaining, finer separators.
 * With keepSeparator=true a lookahead split keeps each separator attached to
 * the piece that follows it. chunkOverlap is honored while merging pieces
 * back into chunks.
 *
 * @param text      input text to split
 * @param chunkSize maximum chunk length (by lengthFn, default char count)
 * @param chunkOverlap desired overlap carried between consecutive chunks
 * @param keepSeparator preserve separators on the following piece
 * @param separators ordered list of separators, coarsest first
 * @param lengthFn  optional custom (possibly async) length measure
 * @returns ordered list of non-empty, trimmed chunks
 * @throws Error on non-positive chunkSize, negative overlap, or overlap >= size
 */
export async function recursiveCharacterSplit(
  text: string,
  {
    chunkSize,
    chunkOverlap = 0,
    keepSeparator = true,
    separators = ['\n\n', '\n', ' ', ''],
    lengthFn,
  }: {
    chunkSize: number;
    chunkOverlap?: number;
    keepSeparator?: boolean;
    separators?: string[];
    lengthFn?: (s: string) => number | Promise<number>;
  }
): Promise<string[]> {
  if (chunkSize <= 0) throw new Error('chunkSize must be > 0');
  if (chunkOverlap < 0) throw new Error('chunkOverlap must be >= 0');
  if (chunkOverlap >= chunkSize) {
    throw new Error('Cannot have chunkOverlap >= chunkSize');
  }

  // Length of a piece, using the custom measure when one is supplied.
  const measure = async (piece: string): Promise<number> =>
    lengthFn ? await lengthFn(piece) : piece.length;

  // Escape a literal string for safe embedding inside a RegExp.
  const toPattern = (literal: string) =>
    literal.replace(/[\/\\^$*+?.()|[\]{}-]/g, '\\$&');

  // Split on a separator. keep=true uses a zero-width lookahead so the
  // separator stays attached to the following piece; '' splits per character.
  const tokenize = (
    input: string,
    separator: string,
    keep: boolean
  ): string[] => {
    let pieces: string[];
    if (!separator) {
      pieces = input.split('');
    } else if (keep) {
      pieces = input.split(new RegExp(`(?=${toPattern(separator)})`, 'g'));
    } else {
      pieces = input.split(separator);
    }
    return pieces.filter((piece) => piece !== '');
  };

  // Join pieces into one chunk; empty-after-trim chunks are dropped.
  const assemble = (pieces: string[], separator: string): string | null => {
    const merged = pieces.join(separator).trim();
    return merged.length === 0 ? null : merged;
  };

  // Greedily pack small pieces into chunks of at most chunkSize, retaining
  // up to chunkOverlap worth of trailing pieces as overlap for the next chunk.
  const pack = async (
    pieces: string[],
    separator: string
  ): Promise<string[]> => {
    const packed: string[] = [];
    const window: string[] = [];
    let windowLen = 0;

    for (const piece of pieces) {
      const pieceLen = await measure(piece);
      if (windowLen + pieceLen + window.length * separator.length > chunkSize) {
        if (windowLen > chunkSize) {
          // Parity with LangChain: warn about the oversized chunk but continue.
          console.warn(
            `Created a chunk of size ${windowLen}, which is longer than the specified ${chunkSize}`
          );
        }
        if (window.length > 0) {
          const chunk = assemble(window, separator);
          if (chunk !== null) packed.push(chunk);
          // Drop pieces from the left until the retained overlap fits and
          // the incoming piece no longer overflows the window.
          while (
            windowLen > chunkOverlap ||
            (windowLen + pieceLen + window.length * separator.length >
              chunkSize &&
              windowLen > 0)
          ) {
            windowLen -= await measure(window[0]!);
            window.shift();
          }
        }
      }
      window.push(piece);
      windowLen += pieceLen;
    }

    const tail = assemble(window, separator);
    if (tail !== null) packed.push(tail);
    return packed;
  };

  // Recursive driver: choose the first separator present in the input,
  // split, pack the small pieces, and recurse on oversized pieces with
  // the remaining (finer) separators.
  const divide = async (input: string, seps: string[]): Promise<string[]> => {
    const result: string[] = [];

    let separator = seps[seps.length - 1] || '';
    let remaining: string[] | undefined;
    for (let i = 0; i < seps.length; i++) {
      const candidate = seps[i];
      if (candidate === undefined) continue;
      if (candidate === '') {
        separator = candidate;
        break;
      }
      if (input.includes(candidate)) {
        separator = candidate;
        remaining = seps.slice(i + 1);
        break;
      }
    }

    const pieces = tokenize(input, separator, keepSeparator);
    // When separators are kept on the pieces, join with '' to avoid doubling.
    const joinSep = keepSeparator ? '' : separator;

    let pending: string[] = [];
    for (const piece of pieces) {
      if ((await measure(piece)) < chunkSize) {
        pending.push(piece);
        continue;
      }
      if (pending.length) {
        result.push(...(await pack(pending, joinSep)));
        pending = [];
      }
      if (!remaining) {
        // No finer separators left; emit the oversized piece as-is.
        result.push(piece);
      } else {
        result.push(...(await divide(piece, remaining)));
      }
    }
    if (pending.length) {
      result.push(...(await pack(pending, joinSep)));
    }
    return result;
  };

  return divide(text, separators);
}

View File

@@ -1,28 +1,22 @@
import { MDocument } from '@mastra/rag';
import { createOpenAI } from '@ai-sdk/openai';
import { AIDocument } from '@colanode/server/lib/ai/utils';
import { embed } from 'ai';
import { sql } from 'kysely';
import { database } from '@colanode/server/data/database';
import { combineAndScoreSearchResults } from '@colanode/server/lib/ai/utils';
import { config } from '@colanode/server/lib/config';
import { getEmbeddingModel } from '@colanode/server/lib/ai/ai-models';
import { QueryRewriteOutput } from '@colanode/server/types/ai';
import { SearchResult } from '@colanode/server/types/retrieval';
const embeddingModel = config.ai.enabled
? createOpenAI({ apiKey: config.ai.embedding.apiKey }).embedding(
config.ai.embedding.modelName
)
: undefined;
export const retrieveDocuments = async (
rewrittenQuery: QueryRewriteOutput,
workspaceId: string,
userId: string,
limit?: number,
contextNodeIds?: string[]
): Promise<MDocument[]> => {
if (!config.ai.enabled || !embeddingModel) {
): Promise<AIDocument[]> => {
if (!config.ai.enabled) {
return [];
}
@@ -33,9 +27,15 @@ export const retrieveDocuments = async (
let semanticResults: SearchResult[] = [];
if (doSemantic) {
const embeddingModel = getEmbeddingModel();
const { embedding } = await embed({
model: embeddingModel,
value: rewrittenQuery.semanticQuery,
providerOptions: {
openai: {
dimensions: config.ai.embedding.dimensions,
},
},
});
if (embedding) {
semanticResults = await semanticSearchDocuments(
@@ -85,7 +85,7 @@ const semanticSearchDocuments = async (
'documents.created_at',
'documents.created_by',
'document_embeddings.chunk as chunk_index',
sql<number>`${sql.raw(`'[${embedding}]'::vector`)} <=> document_embeddings.embedding_vector`.as(
sql<number>`1 - (${sql.raw(`'[${embedding}]'::vector`)} <=> document_embeddings.embedding_vector)`.as(
'similarity'
),
])
@@ -100,18 +100,21 @@ const semanticSearchDocuments = async (
}
const results = await queryBuilder
.groupBy([
.distinctOn([
'document_embeddings.document_id',
'document_embeddings.text',
'document_embeddings.summary',
'documents.created_at',
'documents.created_by',
'document_embeddings.chunk',
])
.orderBy('similarity', 'asc')
.orderBy('document_embeddings.document_id')
.orderBy('document_embeddings.chunk')
.orderBy('similarity', 'desc')
.limit(limit)
.execute();
console.log(
'results text',
results.map((r) => r.text)
);
return results.map((result) => ({
id: result.id,
text: result.text,
@@ -196,8 +199,8 @@ const keywordSearchDocuments = async (
const combineSearchResults = async (
semanticResults: SearchResult[],
keywordResults: SearchResult[]
): Promise<MDocument[]> => {
if (!config.ai.enabled || !embeddingModel) {
): Promise<AIDocument[]> => {
if (!config.ai.enabled) {
return [];
}

View File

@@ -1,28 +1,22 @@
import { MDocument } from '@mastra/rag';
import { createOpenAI } from '@ai-sdk/openai';
import { AIDocument } from '@colanode/server/lib/ai/utils';
import { embed } from 'ai';
import { sql } from 'kysely';
import { database } from '@colanode/server/data/database';
import { combineAndScoreSearchResults } from '@colanode/server/lib/ai/utils';
import { config } from '@colanode/server/lib/config';
import { getEmbeddingModel } from '@colanode/server/lib/ai/ai-models';
import { QueryRewriteOutput } from '@colanode/server/types/ai';
import { SearchResult } from '@colanode/server/types/retrieval';
const embeddingModel = config.ai.enabled
? createOpenAI({ apiKey: config.ai.embedding.apiKey }).embedding(
config.ai.embedding.modelName
)
: undefined;
export const retrieveNodes = async (
rewrittenQuery: QueryRewriteOutput,
workspaceId: string,
userId: string,
limit?: number,
contextNodeIds?: string[]
): Promise<MDocument[]> => {
if (!config.ai.enabled || !embeddingModel) {
): Promise<AIDocument[]> => {
if (!config.ai.enabled) {
return [];
}
@@ -33,9 +27,15 @@ export const retrieveNodes = async (
let semanticResults: SearchResult[] = [];
if (doSemantic) {
const embeddingModel = getEmbeddingModel();
const { embedding } = await embed({
model: embeddingModel,
value: rewrittenQuery.semanticQuery,
providerOptions: {
openai: {
dimensions: config.ai.embedding.dimensions,
},
},
});
if (embedding) {
semanticResults = await semanticSearchNodes(
@@ -84,7 +84,7 @@ const semanticSearchNodes = async (
'nodes.created_at',
'nodes.created_by',
'node_embeddings.chunk as chunk_index',
sql<number>`${sql.raw(`'[${embedding}]'::vector`)} <=> node_embeddings.embedding_vector`.as(
sql<number>`1 - (${sql.raw(`'[${embedding}]'::vector`)} <=> node_embeddings.embedding_vector)`.as(
'similarity'
),
])
@@ -99,15 +99,10 @@ const semanticSearchNodes = async (
}
const results = await queryBuilder
.groupBy([
'node_embeddings.node_id',
'node_embeddings.text',
'nodes.created_at',
'nodes.created_by',
'node_embeddings.chunk',
'node_embeddings.summary',
])
.orderBy('similarity', 'asc')
.distinctOn(['node_embeddings.node_id', 'node_embeddings.chunk'])
.orderBy('node_embeddings.node_id')
.orderBy('node_embeddings.chunk')
.orderBy('similarity', 'desc')
.limit(limit)
.execute();
@@ -194,8 +189,8 @@ const keywordSearchNodes = async (
const combineSearchResults = async (
semanticResults: SearchResult[],
keywordResults: SearchResult[]
): Promise<MDocument[]> => {
if (!config.ai.enabled || !embeddingModel) {
): Promise<AIDocument[]> => {
if (!config.ai.enabled) {
return [];
}

View File

@@ -1,338 +0,0 @@
import { PromptTemplate, ChatPromptTemplate } from '@langchain/core/prompts';
/**
 * Rewrites a raw user query into two search-optimized variants: one for
 * vector/semantic search and one for PostgreSQL full-text search
 * (websearch_to_tsquery syntax).
 *
 * Input variables: {query}.
 * The model is asked to return JSON: { semanticQuery, keywordQuery }.
 * Doubled braces ({{ }}) render as literal braces in LangChain templates.
 */
export const queryRewritePrompt = PromptTemplate.fromTemplate(
`<task>
You are an expert at rewriting search queries to optimize for both semantic similarity and keyword-based search in a document retrieval system.
Your task is to generate two separate optimized queries:
1. A semantic search query optimized for vector embeddings and semantic similarity
2. A keyword search query optimized for full-text search using PostgreSQL's tsquery
</task>
<guidelines>
For semantic search query:
1. Focus on conceptual meaning and intent
2. Include context-indicating terms
3. Preserve relationship words between concepts
4. Expand concepts with related terms
5. Remove noise words and syntax-specific terms
For keyword search query:
1. Focus on specific technical terms and exact matches
2. Include variations of key terms
3. Keep proper nouns and domain-specific vocabulary
4. Optimize for PostgreSQL's websearch_to_tsquery syntax
5. Include essential filters and constraints
</guidelines>
<input>
Original query: {query}
</input>
<output_format>
Return a JSON object with:
{{
"semanticQuery": "optimized query for semantic search",
"keywordQuery": "optimized query for keyword search"
}}
</output_format>`
);
/**
 * Summarizes a retrieved text with respect to the user's query; the model
 * is instructed to return short texts (<100 characters) unchanged.
 *
 * Input variables: {text}, {query}.
 */
export const summarizationPrompt = PromptTemplate.fromTemplate(
`<task>
Summarize the following text focusing on key points relevant to the user's query.
If the text is short (<100 characters), return it as is.
</task>
<input>
Text: {text}
User Query: {query}
</input>`
);
/**
 * Final re-ranking prompt for hybrid search: scores each candidate
 * document's relevance to the query on a 0-1 scale.
 *
 * Input variables: {context} (formatted documents), {query}.
 * The model is asked to return a JSON array of
 * { index, score, type, sourceId } objects.
 */
export const rerankPrompt = PromptTemplate.fromTemplate(
`<task>
You are the final relevance judge in a hybrid search system. Your task is to re-rank search results by analyzing their true relevance to the user's query.
These documents have already passed through:
1. Semantic search (vector similarity)
2. Keyword-based search (full-text search)
Your ranking will determine the final order and which documents are shown to the user.
</task>
<context>
Each document contains:
- Main content text
- Optional summary/context
- Metadata (type, creation info)
The documents can be:
- Workspace nodes (various content types)
- Documents (files, notes)
- Database records
</context>
<ranking_criteria>
Evaluate relevance based on:
1. Direct answer presence (highest priority)
- Does the content directly answer the query?
- Are key details or facts present?
2. Contextual relevance
- How well does the content relate to the query topic?
- Is the context/summary relevant?
- Does it provide important background information?
3. Information freshness
- For time-sensitive queries, prefer recent content
- For conceptual queries, recency matters less
4. Content completeness
- Does it provide comprehensive information?
- Are related concepts explained?
5. Source appropriateness
- Is the document type appropriate for the query?
- Does the source authority match the information need?
</ranking_criteria>
<scoring_guidelines>
Score from 0 to 1, where:
1.0: Perfect match, directly answers query
0.8-0.9: Highly relevant, contains most key information
0.5-0.7: Moderately relevant, contains some useful information
0.2-0.4: Tangentially relevant, minimal useful information
0.0-0.1: Not relevant or useful for the query
</scoring_guidelines>
<documents>
{context}
</documents>
<user_query>
{query}
</user_query>
<output_format>
Return a JSON array of objects, each containing:
- "index": original position (integer)
- "score": relevance score (0-1 float)
- "type": document type (string)
- "sourceId": original source ID (string)
Example:
[
{{"index": 2, "score": 0.95, "type": "document", "sourceId": "doc123"}},
{{"index": 0, "score": 0.7, "type": "node", "sourceId": "node456"}}
]
</output_format>`
);
/**
 * Main answer-generation chat prompt: answers the user's question strictly
 * from the conversation history and retrieved context, with exact-quote
 * citations.
 *
 * Input variables: {currentTimestamp}, {workspaceName}, {userName},
 * {userEmail}, {formattedChatHistory}, {formattedDocuments}, {question}.
 * The model is asked to return JSON: { answer, citations: [{ sourceId, quote }] }.
 */
export const answerPrompt = ChatPromptTemplate.fromTemplate(
`<system_context>
You are an AI assistant in a collaboration workspace app called Colanode.
CURRENT TIME: {currentTimestamp}
WORKSPACE: {workspaceName}
USER: {userName} ({userEmail})
</system_context>
<current_conversation_history>
{formattedChatHistory}
</current_conversation_history>
<context>
{formattedDocuments}
</context>
<user_query>
{question}
</user_query>
<task>
Based solely on the current conversation history and the relevant context above, provide a clear and professional answer to the user's query.
Pay attention to the metadata provided for each source (like creation date, author, path, document type, etc.) to:
- Properly attribute information to its source
- Consider the recency and relevance of the information
- Understand the relationships between different content pieces
- Recognize the context in which content was created or modified
In your answer, include exact quotes from the provided context that support your answer.
If the relevant context does not contain any information that answers the user's query, respond with "No relevant information found." This is a critical step to ensure correct answers.
</task>
<output_format>
Return your response as a JSON object with the following structure:
{{
"answer": <your answer as a string>,
"citations": [
{{ "sourceId": <source id>, "quote": <exact quote from the context> }},
...
]
}}
</output_format>`
);
/**
 * Intent-classification prompt: decides whether a query needs workspace
 * retrieval ("retrieve") or can be answered from general knowledge
 * ("no_context").
 *
 * Input variables: {formattedChatHistory}, {question}.
 * The model is asked to return exactly one of: "retrieve" | "no_context".
 */
export const intentRecognitionPrompt = PromptTemplate.fromTemplate(
`<task>
Determine if the following user query requires retrieving context from the workspace's knowledge base.
You are a crucial decision point in an AI assistant system that must decide between:
1. Retrieving and using specific context from the workspace ("retrieve")
2. Answering directly from general knowledge ("no_context")
</task>
<context>
This system has access to:
- Documents and their embeddings
- Node content (various types of workspace items)
- Database records and their fields
- Previous conversation history
</context>
<guidelines>
Return "retrieve" when the query:
- Asks about specific workspace content, documents, or data
- References previous conversations or shared content
- Mentions specific projects, tasks, or workspace items
- Requires up-to-date information from the workspace
- Contains temporal references to workspace activity
- Asks about specific people or collaborators
- Needs details about database records or fields
Return "no_context" when the query:
- Asks for general knowledge or common facts
- Requests simple calculations or conversions
- Asks about general concepts without workspace specifics
- Makes small talk
- Requests explanations of universal concepts
- Can be answered correctly without workspace-specific information
</guidelines>
<examples>
"retrieve" examples:
- "What did John say about the API design yesterday?"
- "Show me the latest documentation about user authentication"
- "Find records in the Projects database where status is completed"
- "What were the key points from our last meeting?"
"no_context" examples:
- "What is REST API?"
- "How do I write a good commit message?"
- "Convert 42 kilometers to miles"
- "What's your name?"
- "Explain what is Docker in simple terms"
</examples>
<conversation_history>
{formattedChatHistory}
</conversation_history>
<user_query>
{question}
</user_query>
<output_format>
Return exactly one value: "retrieve" or "no_context"
</output_format>`
);
/**
 * Direct-answer prompt used when intent classification decides no workspace
 * retrieval is needed; answers from general knowledge only.
 *
 * Input variables: {formattedChatHistory}, {question}.
 */
export const noContextPrompt = PromptTemplate.fromTemplate(
`<task>
Answer the following query concisely using general knowledge, without retrieving additional context. Return only the answer.
</task>
<conversation_history>
{formattedChatHistory}
</conversation_history>
<user_query>
{question}
</user_query>`
);
/**
 * Converts a natural-language query into structured database filters over
 * the workspace's databases, or opts out with shouldFilter: false.
 *
 * Input variables: {databasesInfo} (available database schemas), {query}.
 * The model is asked to return JSON:
 * { shouldFilter, filters: [{ databaseId, filters: DatabaseViewFilterAttributes[] }] }.
 */
export const databaseFilterPrompt = ChatPromptTemplate.fromTemplate(
`<task>
You are an expert at analyzing natural language queries and converting them into structured database filters.
Your task is to:
1. Determine if this query is asking or makes sense to answer by filtering/searching databases
2. If yes, generate appropriate filter attributes for each relevant database
3. If no, return shouldFilter: false
</task>
<context>
Available Databases:
{databasesInfo}
</context>
<user_query>
{query}
</user_query>
<guidelines>
Only include databases that are relevant to the query.
For each filter, use the exact field IDs from the database schema.
Use appropriate operators based on field types.
</guidelines>
<output_format>
Return a JSON object with:
- shouldFilter: boolean
- filters: array of objects with:
- databaseId: string
- filters: array of DatabaseViewFilterAttributes
Example Response:
{{
"shouldFilter": true,
"filters": [
{{
"databaseId": "db1",
"filters": [
{{
"type": "field",
"fieldId": "field1",
"operator": "contains",
"value": "search term"
}}
]
}}
]
}}
</output_format>`
);
/**
 * Generates a short (30-50 word) contextual summary of one chunk of a larger
 * document; the summary is stored alongside the chunk to improve vector
 * search retrieval.
 *
 * Input variables: {nodeType}, {fullText} (the whole document), {chunk}.
 */
export const chunkSummarizationPrompt = PromptTemplate.fromTemplate(
`<task>
Generate a concise summary of the following text chunk that is part of a larger document.
This summary will be used to enhance vector search retrieval by providing additional context about this specific chunk.
</task>
<context>
Content Type: {nodeType}
</context>
<guidelines>
1. Create a brief (30-50 words) summary that captures the key points and main idea of the chunk
2. Consider how this chunk fits into the overall document provided
3. If the chunk appears to be part of a specific section, identify its role or purpose
4. If the chunk contains structured data (like a database record), identify the type of information it represents
5. Use neutral, descriptive language
6. Consider the content type ("{nodeType}") when creating the summary - different types have different purposes:
- "message": Communication content in a conversation
- "page": Document-like content with structured information
- "record": Database record with specific fields and values
- Other types: Adapt your summary accordingly
</guidelines>
<complete_document>
{fullText}
</complete_document>
<chunk_to_summarize>
{chunk}
</chunk_to_summarize>
<output_format>
Provide only the summary with no additional commentary or explanations.
</output_format>`
);

View File

@@ -1,4 +1,4 @@
import { MDocument } from '@mastra/rag';
import ms from 'ms';
import { SearchResult } from '@colanode/server/types/retrieval';
export const formatDate = (date?: Date | null): string => {
@@ -33,8 +33,8 @@ const processSearchResult = (
const key = createKey(result);
const recencyBoost = calculateRecencyBoost(result.createdAt);
const normalizedScore = isKeyword
? (result.score / maxScore) * weight
: ((maxScore - result.score) / maxScore) * weight;
? (result.score / maxScore) * weight // rank normalized
: Math.max(0, Math.min(1, result.score)) * weight; // similarity already 0..1
if (combined.has(key)) {
const existing = combined.get(key)!;
@@ -47,27 +47,12 @@ const processSearchResult = (
}
};
const createDocumentFromResult = (
result: SearchResult & { finalScore: number },
authorMap: Map<string, { id: string; name: string | null }>
): MDocument => {
const author = result.createdBy ? authorMap.get(result.createdBy) : null;
const summaryPart = (result.summary ?? '').trim();
const content = summaryPart
? `${summaryPart}\n\n${result.text}`
: result.text;
const doc = MDocument.fromText(content, {
id: result.id,
score: result.finalScore,
createdAt: result.createdAt,
type: (result as any).sourceType ?? 'document',
chunkIndex: result.chunkIndex,
author: author ? { id: author.id, name: author.name || 'Unknown' } : null,
});
return doc;
/**
 * Provider-agnostic retrieval result passed between RAG steps
 * (replaces the former Mastra MDocument representation).
 */
export type AIDocument = {
  // Source row id (node id or document id).
  id: string;
  // Chunk text content.
  text: string;
  // Optional LLM-generated contextual summary of the chunk.
  summary: string | null;
  // Combined relevance score after hybrid-search weighting.
  score: number;
  // Per-source metadata entries (type, createdAt, createdBy, chunkIndex,
  // author) — values are heterogeneous, hence Record<string, any>.
  metadata: Record<string, any>[];
};
export const combineAndScoreSearchResults = (
@@ -76,7 +61,7 @@ export const combineAndScoreSearchResults = (
semanticSearchWeight: number,
keywordSearchWeight: number,
authorMap: Map<string, { id: string; name: string | null }>
): Promise<MDocument[]> => {
): Promise<AIDocument[]> => {
const maxSemanticScore = Math.max(...semanticResults.map((r) => r.score), 1);
const maxKeywordScore = Math.max(...keywordResults.map((r) => r.score), 1);
@@ -103,6 +88,26 @@ export const combineAndScoreSearchResults = (
return Promise.resolve(
Array.from(combined.values())
.sort((a, b) => b.finalScore - a.finalScore)
.map((result) => createDocumentFromResult(result, authorMap))
.map((result) => {
const author = result.createdBy
? authorMap.get(result.createdBy)
: null;
const type = (result as any).sourceType ?? 'document';
const meta: Record<string, any> = {
type,
createdAt: result.createdAt ?? null,
createdBy: result.createdBy ?? null,
chunkIndex: result.chunkIndex,
};
if (author)
meta.author = { id: author.id, name: author.name || 'Unknown' };
return {
id: result.id,
text: result.text,
summary: result.summary,
score: result.finalScore,
metadata: [meta],
};
})
);
};

View File

@@ -87,91 +87,91 @@ export const aiConfigSchema = z.discriminatedUnion('enabled', [
export type AiConfig = z.infer<typeof aiConfigSchema>;
export const readAiConfigVariables = () => {
return {
enabled: false,
};
// return {
// enabled: process.env.AI_ENABLED === 'true',
// nodeEmbeddingDelay: process.env.AI_NODE_EMBEDDING_DELAY,
// documentEmbeddingDelay: process.env.AI_DOCUMENT_EMBEDDING_DELAY,
// providers: {
// openai: {
// apiKey: process.env.OPENAI_API_KEY,
// enabled: process.env.OPENAI_ENABLED,
// },
// google: {
// apiKey: process.env.GOOGLE_API_KEY,
// enabled: process.env.GOOGLE_ENABLED,
// },
// },
// langfuse: {
// enabled: process.env.LANGFUSE_ENABLED,
// publicKey: process.env.LANGFUSE_PUBLIC_KEY,
// secretKey: process.env.LANGFUSE_SECRET_KEY,
// baseUrl: process.env.LANGFUSE_BASE_URL,
// },
// models: {
// queryRewrite: {
// provider: process.env.QUERY_REWRITE_PROVIDER,
// modelName: process.env.QUERY_REWRITE_MODEL,
// temperature: process.env.QUERY_REWRITE_TEMPERATURE,
// },
// response: {
// provider: process.env.RESPONSE_PROVIDER,
// modelName: process.env.RESPONSE_MODEL,
// temperature: process.env.RESPONSE_TEMPERATURE,
// },
// rerank: {
// provider: process.env.RERANK_PROVIDER,
// modelName: process.env.RERANK_MODEL,
// temperature: process.env.RERANK_TEMPERATURE,
// },
// summarization: {
// provider: process.env.SUMMARIZATION_PROVIDER,
// modelName: process.env.SUMMARIZATION_MODEL,
// temperature: process.env.SUMMARIZATION_TEMPERATURE,
// },
// contextEnhancer: {
// provider: process.env.CHUNK_CONTEXT_PROVIDER,
// modelName: process.env.CHUNK_CONTEXT_MODEL,
// temperature: process.env.CHUNK_CONTEXT_TEMPERATURE,
// },
// noContext: {
// provider: process.env.NO_CONTEXT_PROVIDER,
// modelName: process.env.NO_CONTEXT_MODEL,
// temperature: process.env.NO_CONTEXT_TEMPERATURE,
// },
// intentRecognition: {
// provider: process.env.INTENT_RECOGNITION_PROVIDER,
// modelName: process.env.INTENT_RECOGNITION_MODEL,
// temperature: process.env.INTENT_RECOGNITION_TEMPERATURE,
// },
// databaseFilter: {
// provider: process.env.DATABASE_FILTER_PROVIDER,
// modelName: process.env.DATABASE_FILTER_MODEL,
// temperature: process.env.DATABASE_FILTER_TEMPERATURE,
// },
// },
// embedding: {
// provider: process.env.EMBEDDING_PROVIDER,
// modelName: process.env.EMBEDDING_MODEL,
// dimensions: process.env.EMBEDDING_DIMENSIONS,
// apiKey: process.env.EMBEDDING_API_KEY,
// batchSize: process.env.EMBEDDING_BATCH_SIZE,
// },
// chunking: {
// defaultChunkSize: process.env.CHUNK_DEFAULT_CHUNK_SIZE,
// defaultOverlap: process.env.CHUNK_DEFAULT_OVERLAP,
// enhanceWithContext: process.env.CHUNK_ENHANCE_WITH_CONTEXT,
// },
// retrieval: {
// hybridSearch: {
// semanticSearchWeight:
// process.env.RETRIEVAL_HYBRID_SEARCH_SEMANTIC_WEIGHT,
// keywordSearchWeight: process.env.RETRIEVAL_HYBRID_SEARCH_KEYWORD_WEIGHT,
// maxResults: process.env.RETRIEVAL_HYBRID_SEARCH_MAX_RESULTS,
// },
// },
// enabled: true,
// };
return {
enabled: process.env.AI_ENABLED === 'true',
nodeEmbeddingDelay: process.env.AI_NODE_EMBEDDING_DELAY,
documentEmbeddingDelay: process.env.AI_DOCUMENT_EMBEDDING_DELAY,
providers: {
openai: {
apiKey: process.env.OPENAI_API_KEY,
enabled: process.env.OPENAI_ENABLED,
},
google: {
apiKey: process.env.GOOGLE_API_KEY,
enabled: process.env.GOOGLE_ENABLED,
},
},
langfuse: {
enabled: process.env.LANGFUSE_ENABLED,
publicKey: process.env.LANGFUSE_PUBLIC_KEY,
secretKey: process.env.LANGFUSE_SECRET_KEY,
baseUrl: process.env.LANGFUSE_BASE_URL,
},
models: {
queryRewrite: {
provider: process.env.QUERY_REWRITE_PROVIDER,
modelName: process.env.QUERY_REWRITE_MODEL,
temperature: process.env.QUERY_REWRITE_TEMPERATURE,
},
response: {
provider: process.env.RESPONSE_PROVIDER,
modelName: process.env.RESPONSE_MODEL,
temperature: process.env.RESPONSE_TEMPERATURE,
},
rerank: {
provider: process.env.RERANK_PROVIDER,
modelName: process.env.RERANK_MODEL,
temperature: process.env.RERANK_TEMPERATURE,
},
summarization: {
provider: process.env.SUMMARIZATION_PROVIDER,
modelName: process.env.SUMMARIZATION_MODEL,
temperature: process.env.SUMMARIZATION_TEMPERATURE,
},
contextEnhancer: {
provider: process.env.CHUNK_CONTEXT_PROVIDER,
modelName: process.env.CHUNK_CONTEXT_MODEL,
temperature: process.env.CHUNK_CONTEXT_TEMPERATURE,
},
noContext: {
provider: process.env.NO_CONTEXT_PROVIDER,
modelName: process.env.NO_CONTEXT_MODEL,
temperature: process.env.NO_CONTEXT_TEMPERATURE,
},
intentRecognition: {
provider: process.env.INTENT_RECOGNITION_PROVIDER,
modelName: process.env.INTENT_RECOGNITION_MODEL,
temperature: process.env.INTENT_RECOGNITION_TEMPERATURE,
},
databaseFilter: {
provider: process.env.DATABASE_FILTER_PROVIDER,
modelName: process.env.DATABASE_FILTER_MODEL,
temperature: process.env.DATABASE_FILTER_TEMPERATURE,
},
},
embedding: {
provider: process.env.EMBEDDING_PROVIDER,
modelName: process.env.EMBEDDING_MODEL,
dimensions: process.env.EMBEDDING_DIMENSIONS,
apiKey: process.env.EMBEDDING_API_KEY,
batchSize: process.env.EMBEDDING_BATCH_SIZE,
},
chunking: {
defaultChunkSize: process.env.CHUNK_DEFAULT_CHUNK_SIZE,
defaultOverlap: process.env.CHUNK_DEFAULT_OVERLAP,
enhanceWithContext: process.env.CHUNK_ENHANCE_WITH_CONTEXT,
},
retrieval: {
hybridSearch: {
semanticSearchWeight:
process.env.RETRIEVAL_HYBRID_SEARCH_SEMANTIC_WEIGHT,
keywordSearchWeight: process.env.RETRIEVAL_HYBRID_SEARCH_KEYWORD_WEIGHT,
maxResults: process.env.RETRIEVAL_HYBRID_SEARCH_MAX_RESULTS,
},
},
};
};

View File

@@ -0,0 +1,36 @@
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { LangfuseExporter } from 'langfuse-vercel';
let sdk: NodeSDK | null = null;
let started = false;
/**
 * Starts OpenTelemetry tracing with the Langfuse exporter, once per process.
 *
 * No-op when already started or when Langfuse keys are not configured.
 * Reads LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY, and accepts both base-URL
 * spellings: LANGFUSE_BASEURL (langfuse-vercel convention) and
 * LANGFUSE_BASE_URL (used by this project's config loader).
 */
export const initObservability = () => {
  if (started) return;

  const secretKey = process.env.LANGFUSE_SECRET_KEY;
  const publicKey = process.env.LANGFUSE_PUBLIC_KEY;
  // Support both env-var spellings; the config loader elsewhere reads
  // LANGFUSE_BASE_URL while langfuse-vercel documents LANGFUSE_BASEURL.
  const baseUrl =
    process.env.LANGFUSE_BASEURL ?? process.env.LANGFUSE_BASE_URL;

  if (!secretKey || !publicKey) {
    // Langfuse not configured: leave tracing disabled.
    return;
  }

  // Note: deliberately no logging of keys/URLs here — avoid leaking
  // configuration values into application logs.
  const exporter = new LangfuseExporter({
    secretKey,
    publicKey,
    baseUrl: baseUrl || 'https://cloud.langfuse.com',
  });

  sdk = new NodeSDK({
    traceExporter: exporter,
    instrumentations: [getNodeAutoInstrumentations()],
  });

  sdk.start();
  started = true;
};

View File

@@ -0,0 +1,42 @@
import { eventBus } from '@colanode/server/lib/event-bus';
import { fetchNode } from '@colanode/server/lib/nodes';
import { createLogger } from '@colanode/server/lib/logger';
import { jobService } from '@colanode/server/services/job-service';
const logger = createLogger('server:service:assistant-trigger');
/**
 * Subscribes to the event bus and enqueues an 'assistant.respond' job
 * whenever a newly created node is a message with subtype 'question'.
 *
 * Must be called after the event bus and job queue are initialized
 * (addJob throws when the queue is not ready — see job service).
 * Errors are caught and logged so a failure never breaks other
 * event-bus subscribers.
 */
export const initAssistantTrigger = () => {
  eventBus.subscribe(async (event) => {
    // Only react to node creation events.
    if (event.type !== 'node.created') {
      return;
    }

    try {
      const node = await fetchNode(event.nodeId);
      if (!node) {
        // Node may have been deleted between event emission and handling.
        return;
      }

      // Attributes shape is not typed on the event; narrow to the two
      // fields we inspect. NOTE(review): assumes message nodes carry a
      // 'subtype' attribute — confirm against the message mutation handler.
      const attributes = node.attributes as {
        type?: string;
        subtype?: string;
      } | null;

      if (
        attributes?.type === 'message' &&
        attributes?.subtype === 'question'
      ) {
        await jobService.addJob({
          type: 'assistant.respond',
          messageId: node.id,
          workspaceId: event.workspaceId,
        });

        logger.debug(
          `Queued assistant response for question message ${node.id} (workspace ${event.workspaceId})`
        );
      }
    } catch (error) {
      logger.error(error, 'Failed handling node.created for assistant trigger');
    }
  });
};

View File

@@ -20,7 +20,9 @@ class JobService {
private readonly prefix = `{${config.redis.jobs.prefix}}`;
public async initQueue(): Promise<void> {
console.log('initQueue');
if (this.jobQueue) {
console.log('jobQueue already initialized');
return;
}
@@ -39,11 +41,14 @@ class JobService {
logger.error(error, `Job queue error`);
});
console.log('initRecurringJobs');
await this.initRecurringJobs();
}
public async initWorker() {
console.log('initWorker');
if (this.jobWorker) {
console.log('jobWorker already initialized');
return;
}
@@ -61,9 +66,38 @@ class JobService {
throw new Error('Job queue not initialized.');
}
console.log('adding job', job.type);
await this.jobQueue.add(job.type, job, options);
}
/**
 * Removes a queued job by id.
 *
 * Looks the job up first; when it does not exist this is a no-op apart
 * from a log line.
 *
 * @param jobId id of the job to remove
 * @throws Error when the job queue has not been initialized
 */
public async removeJob(jobId: string) {
  if (!this.jobQueue) {
    throw new Error('Job queue not initialized.');
  }
  console.log(`Removing job ${jobId}`);
  const job = await this.jobQueue.getJob(jobId);
  if (job) {
    await job.remove();
    console.log(`Successfully removed job ${jobId}`);
  } else {
    console.log(`Job ${jobId} not found, nothing to remove`);
  }
}
/**
 * Fetches a job from the queue by id.
 *
 * @param jobId id of the job to fetch
 * @returns the job, or null when the lookup fails; may also resolve to a
 *   falsy value for unknown ids (see the existence check in removeJob)
 * @throws Error when the job queue has not been initialized
 */
public async getJob(jobId: string): Promise<Job | null> {
  if (!this.jobQueue) {
    throw new Error('Job queue not initialized.');
  }
  try {
    return await this.jobQueue.getJob(jobId);
  } catch (error) {
    // Lookup failures are logged and mapped to null so callers only
    // deal with job-or-null.
    console.error(`Error getting job ${jobId}:`, error);
    return null;
  }
}
private handleJobJob = async (job: Job) => {
const input = job.data as JobInput;
const handler = jobHandlerMap[input.type] as JobHandler<typeof input>;

View File

@@ -8,6 +8,7 @@ export const assistantWorkflowInputSchema = z.object({
userInput: z.string(),
workspaceId: z.string(),
userId: z.string(),
workspaceName: z.string(),
userDetails: z.object({
name: z.string(),
email: z.string(),
@@ -54,7 +55,7 @@ export const searchResultsOutputSchema = z.object({
sourceId: z.string(),
score: z.number(),
type: z.string(),
metadata: z.record(z.any()),
metadata: z.array(z.record(z.string(), z.any())),
})
),
searchType: z.enum(['semantic', 'keyword', 'database', 'hybrid']),
@@ -68,7 +69,7 @@ export const rankedResultsOutputSchema = z.object({
sourceId: z.string(),
relevanceScore: z.number(),
type: z.string(),
metadata: z.record(z.any()),
metadata: z.array(z.record(z.string(), z.any())),
})
),
citations: z.array(
@@ -109,10 +110,29 @@ export const WorkflowConfig = {
// TYPE EXPORTS
// ============================================================================
export type AssistantWorkflowInput = z.infer<typeof assistantWorkflowInputSchema>;
export type AssistantWorkflowOutput = z.infer<typeof assistantWorkflowOutputSchema>;
export type IntentClassificationOutput = z.infer<typeof intentClassificationOutputSchema>;
export type AssistantWorkflowInput = z.infer<
typeof assistantWorkflowInputSchema
>;
export type AssistantWorkflowOutput = z.infer<
typeof assistantWorkflowOutputSchema
>;
export type IntentClassificationOutput = z.infer<
typeof intentClassificationOutputSchema
>;
export type QueryRewriteOutput = z.infer<typeof queryRewriteOutputSchema>;
export type SearchResultsOutput = z.infer<typeof searchResultsOutputSchema>;
export type RankedResultsOutput = z.infer<typeof rankedResultsOutputSchema>;
export type AnswerOutput = z.infer<typeof answerOutputSchema>;
export type AnswerOutput = z.infer<typeof answerOutputSchema>;
// =========================================================================
// HYBRID SEARCH ARGS
// =========================================================================
export type HybridSearchArgs = {
semanticQuery: string;
keywordQuery: string;
workspaceId: string;
userId: string;
maxResults?: number;
selectedContextNodeIds?: string[];
};

View File

@@ -151,11 +151,11 @@ services:
REDIS_URL: 'redis://:your_valkey_password@valkey:6379/0'
REDIS_DB: '0'
# Optional variables:
# REDIS_JOBS_QUEUE_NAME: 'jobs'
# REDIS_JOBS_QUEUE_PREFIX: 'colanode'
# REDIS_TUS_LOCK_PREFIX: 'colanode:tus:lock'
# REDIS_TUS_KV_PREFIX: 'colanode:tus:kv'
# REDIS_EVENTS_CHANNEL: 'events'
REDIS_JOBS_QUEUE_NAME: 'jobs'
REDIS_JOBS_QUEUE_PREFIX: 'colanode'
REDIS_TUS_LOCK_PREFIX: 'colanode:tus:lock'
REDIS_TUS_KV_PREFIX: 'colanode:tus:kv'
REDIS_EVENTS_CHANNEL: 'events'
# ───────────────────────────────────────────────────────────────
# S3 configuration for files.

1582
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -59,7 +59,7 @@ export class MessageCreateMutationHandler
const messageAttributes: MessageAttributes = {
type: 'message',
subtype: 'standard',
subtype: input.subtype ?? 'standard',
parentId: input.parentId,
content: blocks,
referenceId: input.referenceId,

View File

@@ -7,6 +7,7 @@ export type MessageCreateMutationInput = {
parentId: string;
content: JSONContent;
referenceId?: string;
subtype?: 'standard' | 'question';
};
export type MessageCreateMutationOutput = {

View File

@@ -70,6 +70,7 @@
"cmdk": "^1.1.1",
"date-fns": "^4.1.0",
"is-hotkey": "^0.2.0",
"lowlight": "^3.3.0",
"lucide-react": "^0.539.0",
"re-resizable": "^6.11.2",
"react": "^19.1.1",

View File

@@ -41,6 +41,7 @@ export const MessageCreate = forwardRef<MessageCreateRefProps>((_, ref) => {
const messageEditorRef = useRef<MessageEditorRefProps>(null);
const [content, setContent] = useState<JSONContent | null>(null);
const [replyTo, setReplyTo] = useState<LocalMessageNode | null>(null);
const [askAI, setAskAI] = useState<boolean>(false);
const hasContent = content != null && editorHasContent(content);
@@ -79,9 +80,11 @@ export const MessageCreate = forwardRef<MessageCreateRefProps>((_, ref) => {
workspaceId: workspace.id,
referenceId: replyTo?.id,
rootId: conversation.rootId,
subtype: askAI ? 'question' : undefined,
},
onSuccess: () => {
setReplyTo(null);
setAskAI(false);
if (messageEditorRef.current) {
messageEditorRef.current.clear();
messageEditorRef.current.focus();
@@ -170,6 +173,21 @@ export const MessageCreate = forwardRef<MessageCreateRefProps>((_, ref) => {
)}
</div>
<div className="flex flex-row gap-2">
<button
type="button"
className={`${
conversation.canCreateMessage
? askAI
? 'text-blue-600'
: 'text-muted-foreground'
: 'text-muted-foreground'
}`}
disabled={isPending || !conversation.canCreateMessage}
onClick={() => setAskAI((v) => !v)}
title="Ask AI"
>
AI
</button>
{isPending ? (
<Spinner size={20} />
) : (