Files
open-webui/backend/open_webui/routers/knowledge.py
Classic298 33e8a09880 fix(db): release connection before embedding in knowledge /create (#20575)
Remove Depends(get_session) from POST /create endpoint to prevent database connections from being held during embedding API calls (1-5+ seconds).

The has_permission() and Knowledges.insert_new_knowledge() functions manage their own short-lived sessions internally, releasing connections before the slow embed_knowledge_base_metadata() call begins.
2026-01-11 23:37:05 +04:00

990 lines
30 KiB
Python

from typing import List, Optional
from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException, status, Request, Query
from fastapi.responses import StreamingResponse
from fastapi.concurrency import run_in_threadpool
import logging
import io
import zipfile
from sqlalchemy.orm import Session
from open_webui.internal.db import get_session
from open_webui.models.groups import Groups
from open_webui.models.knowledge import (
KnowledgeFileListResponse,
Knowledges,
KnowledgeForm,
KnowledgeResponse,
KnowledgeUserResponse,
)
from open_webui.models.files import Files, FileModel, FileMetadataResponse
from open_webui.retrieval.vector.factory import VECTOR_DB_CLIENT
from open_webui.routers.retrieval import (
process_file,
ProcessFileForm,
process_files_batch,
BatchProcessFilesForm,
)
from open_webui.storage.provider import Storage
from open_webui.constants import ERROR_MESSAGES
from open_webui.utils.auth import get_verified_user, get_admin_user
from open_webui.utils.access_control import has_access, has_permission
from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL
from open_webui.models.models import Models, ModelForm
log = logging.getLogger(__name__)
router = APIRouter()
############################
# getKnowledgeBases
############################
PAGE_ITEM_COUNT = 30
############################
# Knowledge Base Embedding
############################
KNOWLEDGE_BASES_COLLECTION = "knowledge-bases"
async def embed_knowledge_base_metadata(
request: Request,
knowledge_base_id: str,
name: str,
description: str,
) -> bool:
"""Generate and store embedding for knowledge base."""
try:
content = f"{name}\n\n{description}" if description else name
embedding = await request.app.state.EMBEDDING_FUNCTION(content)
VECTOR_DB_CLIENT.upsert(
collection_name=KNOWLEDGE_BASES_COLLECTION,
items=[
{
"id": knowledge_base_id,
"text": content,
"vector": embedding,
"metadata": {
"knowledge_base_id": knowledge_base_id,
},
}
],
)
return True
except Exception as e:
log.error(f"Failed to embed knowledge base {knowledge_base_id}: {e}")
return False
def remove_knowledge_base_metadata_embedding(knowledge_base_id: str) -> bool:
"""Remove knowledge base embedding."""
try:
VECTOR_DB_CLIENT.delete(
collection_name=KNOWLEDGE_BASES_COLLECTION,
ids=[knowledge_base_id],
)
return True
except Exception as e:
log.debug(f"Failed to remove embedding for {knowledge_base_id}: {e}")
return False
class KnowledgeAccessResponse(KnowledgeUserResponse):
write_access: Optional[bool] = False
class KnowledgeAccessListResponse(BaseModel):
items: list[KnowledgeAccessResponse]
total: int
@router.get("/", response_model=KnowledgeAccessListResponse)
async def get_knowledge_bases(
page: Optional[int] = 1,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
page = max(page, 1)
limit = PAGE_ITEM_COUNT
skip = (page - 1) * limit
filter = {}
if not user.role == "admin" or not BYPASS_ADMIN_ACCESS_CONTROL:
groups = Groups.get_groups_by_member_id(user.id, db=db)
if groups:
filter["group_ids"] = [group.id for group in groups]
filter["user_id"] = user.id
result = Knowledges.search_knowledge_bases(
user.id, filter=filter, skip=skip, limit=limit, db=db
)
return KnowledgeAccessListResponse(
items=[
KnowledgeAccessResponse(
**knowledge_base.model_dump(),
write_access=(
user.id == knowledge_base.user_id
or (user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL)
or has_access(
user.id, "write", knowledge_base.access_control, db=db
)
),
)
for knowledge_base in result.items
],
total=result.total,
)
@router.get("/search", response_model=KnowledgeAccessListResponse)
async def search_knowledge_bases(
query: Optional[str] = None,
view_option: Optional[str] = None,
page: Optional[int] = 1,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
page = max(page, 1)
limit = PAGE_ITEM_COUNT
skip = (page - 1) * limit
filter = {}
if query:
filter["query"] = query
if view_option:
filter["view_option"] = view_option
if not user.role == "admin" or not BYPASS_ADMIN_ACCESS_CONTROL:
groups = Groups.get_groups_by_member_id(user.id, db=db)
if groups:
filter["group_ids"] = [group.id for group in groups]
filter["user_id"] = user.id
result = Knowledges.search_knowledge_bases(
user.id, filter=filter, skip=skip, limit=limit, db=db
)
return KnowledgeAccessListResponse(
items=[
KnowledgeAccessResponse(
**knowledge_base.model_dump(),
write_access=(
user.id == knowledge_base.user_id
or (user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL)
or has_access(
user.id, "write", knowledge_base.access_control, db=db
)
),
)
for knowledge_base in result.items
],
total=result.total,
)
@router.get("/search/files", response_model=KnowledgeFileListResponse)
async def search_knowledge_files(
query: Optional[str] = None,
page: Optional[int] = 1,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
page = max(page, 1)
limit = PAGE_ITEM_COUNT
skip = (page - 1) * limit
filter = {}
if query:
filter["query"] = query
groups = Groups.get_groups_by_member_id(user.id, db=db)
if groups:
filter["group_ids"] = [group.id for group in groups]
filter["user_id"] = user.id
return Knowledges.search_knowledge_files(
filter=filter, skip=skip, limit=limit, db=db
)
############################
# CreateNewKnowledge
############################
@router.post("/create", response_model=Optional[KnowledgeResponse])
async def create_new_knowledge(
request: Request,
form_data: KnowledgeForm,
user=Depends(get_verified_user),
):
# NOTE: We intentionally do NOT use Depends(get_session) here.
# Database operations (has_permission, insert_new_knowledge) manage their own sessions.
# This prevents holding a connection during embed_knowledge_base_metadata()
# which makes external embedding API calls (1-5+ seconds).
if user.role != "admin" and not has_permission(
user.id, "workspace.knowledge", request.app.state.config.USER_PERMISSIONS
):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
# Check if user can share publicly
if (
user.role != "admin"
and form_data.access_control == None
and not has_permission(
user.id,
"sharing.public_knowledge",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_control = {}
knowledge = Knowledges.insert_new_knowledge(user.id, form_data)
if knowledge:
# Embed knowledge base for semantic search
await embed_knowledge_base_metadata(
request,
knowledge.id,
knowledge.name,
knowledge.description,
)
return knowledge
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.FILE_EXISTS,
)
############################
# ReindexKnowledgeFiles
############################
@router.post("/reindex", response_model=bool)
async def reindex_knowledge_files(
request: Request,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
if user.role != "admin":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
knowledge_bases = Knowledges.get_knowledge_bases(db=db)
log.info(f"Starting reindexing for {len(knowledge_bases)} knowledge bases")
for knowledge_base in knowledge_bases:
try:
files = Knowledges.get_files_by_id(knowledge_base.id, db=db)
try:
if VECTOR_DB_CLIENT.has_collection(collection_name=knowledge_base.id):
VECTOR_DB_CLIENT.delete_collection(
collection_name=knowledge_base.id
)
except Exception as e:
log.error(f"Error deleting collection {knowledge_base.id}: {str(e)}")
continue # Skip, don't raise
failed_files = []
for file in files:
try:
await run_in_threadpool(
process_file,
request,
ProcessFileForm(
file_id=file.id, collection_name=knowledge_base.id
),
user=user,
db=db,
)
except Exception as e:
log.error(
f"Error processing file {file.filename} (ID: {file.id}): {str(e)}"
)
failed_files.append({"file_id": file.id, "error": str(e)})
continue
except Exception as e:
log.error(f"Error processing knowledge base {knowledge_base.id}: {str(e)}")
# Don't raise, just continue
continue
if failed_files:
log.warning(
f"Failed to process {len(failed_files)} files in knowledge base {knowledge_base.id}"
)
for failed in failed_files:
log.warning(f"File ID: {failed['file_id']}, Error: {failed['error']}")
log.info(f"Reindexing completed.")
return True
############################
# ReindexKnowledgeBases
############################
@router.post("/metadata/reindex", response_model=dict)
async def reindex_knowledge_base_metadata_embeddings(
request: Request,
user=Depends(get_admin_user),
):
"""Batch embed all existing knowledge bases. Admin only.
NOTE: We intentionally do NOT use Depends(get_session) here.
This endpoint loops through ALL knowledge bases and calls embed_knowledge_base_metadata()
for each one, making N external embedding API calls. Holding a session during
this entire operation would exhaust the connection pool.
"""
knowledge_bases = Knowledges.get_knowledge_bases()
log.info(f"Reindexing embeddings for {len(knowledge_bases)} knowledge bases")
success_count = 0
for kb in knowledge_bases:
if await embed_knowledge_base_metadata(request, kb.id, kb.name, kb.description):
success_count += 1
log.info(f"Embedding reindex complete: {success_count}/{len(knowledge_bases)}")
return {"total": len(knowledge_bases), "success": success_count}
############################
# GetKnowledgeById
############################
class KnowledgeFilesResponse(KnowledgeResponse):
files: Optional[list[FileMetadataResponse]] = None
write_access: Optional[bool] = False
@router.get("/{id}", response_model=Optional[KnowledgeFilesResponse])
async def get_knowledge_by_id(
id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
knowledge = Knowledges.get_knowledge_by_id(id=id, db=db)
if knowledge:
if (
user.role == "admin"
or knowledge.user_id == user.id
or has_access(user.id, "read", knowledge.access_control, db=db)
):
return KnowledgeFilesResponse(
**knowledge.model_dump(),
write_access=(
user.id == knowledge.user_id
or (user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL)
or has_access(user.id, "write", knowledge.access_control, db=db)
),
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
############################
# UpdateKnowledgeById
############################
@router.post("/{id}/update", response_model=Optional[KnowledgeFilesResponse])
async def update_knowledge_by_id(
request: Request,
id: str,
form_data: KnowledgeForm,
user=Depends(get_verified_user),
):
# NOTE: We intentionally do NOT use Depends(get_session) here.
# Database operations manage their own short-lived sessions internally.
# This prevents holding a connection during embed_knowledge_base_metadata()
# which makes external embedding API calls (1-5+ seconds).
knowledge = Knowledges.get_knowledge_by_id(id=id)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
# Is the user the original creator, in a group with write access, or an admin
if (
knowledge.user_id != user.id
and not has_access(user.id, "write", knowledge.access_control)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
# Check if user can share publicly
if (
user.role != "admin"
and form_data.access_control == None
and not has_permission(
user.id,
"sharing.public_knowledge",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_control = {}
knowledge = Knowledges.update_knowledge_by_id(id=id, form_data=form_data)
if knowledge:
# Re-embed knowledge base for semantic search
await embed_knowledge_base_metadata(
request,
knowledge.id,
knowledge.name,
knowledge.description,
)
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=Knowledges.get_file_metadatas_by_id(knowledge.id),
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ID_TAKEN,
)
############################
# GetKnowledgeFilesById
############################
@router.get("/{id}/files", response_model=KnowledgeFileListResponse)
async def get_knowledge_files_by_id(
id: str,
query: Optional[str] = None,
view_option: Optional[str] = None,
order_by: Optional[str] = None,
direction: Optional[str] = None,
page: Optional[int] = 1,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
knowledge = Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if not (
user.role == "admin"
or knowledge.user_id == user.id
or has_access(user.id, "read", knowledge.access_control, db=db)
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
page = max(page, 1)
limit = 30
skip = (page - 1) * limit
filter = {}
if query:
filter["query"] = query
if view_option:
filter["view_option"] = view_option
if order_by:
filter["order_by"] = order_by
if direction:
filter["direction"] = direction
return Knowledges.search_files_by_id(
id, user.id, filter=filter, skip=skip, limit=limit, db=db
)
############################
# AddFileToKnowledge
############################
class KnowledgeFileIdForm(BaseModel):
file_id: str
@router.post("/{id}/file/add", response_model=Optional[KnowledgeFilesResponse])
def add_file_to_knowledge_by_id(
request: Request,
id: str,
form_data: KnowledgeFileIdForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
knowledge = Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not has_access(user.id, "write", knowledge.access_control, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
file = Files.get_file_by_id(form_data.file_id, db=db)
if not file:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if not file.data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.FILE_NOT_PROCESSED,
)
# Add content to the vector database
try:
process_file(
request,
ProcessFileForm(file_id=form_data.file_id, collection_name=id),
user=user,
db=db,
)
# Add file to knowledge base
Knowledges.add_file_to_knowledge_by_id(
knowledge_id=id, file_id=form_data.file_id, user_id=user.id, db=db
)
except Exception as e:
log.debug(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
if knowledge:
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=Knowledges.get_file_metadatas_by_id(knowledge.id, db=db),
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
@router.post("/{id}/file/update", response_model=Optional[KnowledgeFilesResponse])
def update_file_from_knowledge_by_id(
request: Request,
id: str,
form_data: KnowledgeFileIdForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
knowledge = Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not has_access(user.id, "write", knowledge.access_control, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
file = Files.get_file_by_id(form_data.file_id, db=db)
if not file:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
# Remove content from the vector database
VECTOR_DB_CLIENT.delete(
collection_name=knowledge.id, filter={"file_id": form_data.file_id}
)
# Add content to the vector database
try:
process_file(
request,
ProcessFileForm(file_id=form_data.file_id, collection_name=id),
user=user,
db=db,
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
if knowledge:
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=Knowledges.get_file_metadatas_by_id(knowledge.id, db=db),
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
############################
# RemoveFileFromKnowledge
############################
@router.post("/{id}/file/remove", response_model=Optional[KnowledgeFilesResponse])
def remove_file_from_knowledge_by_id(
id: str,
form_data: KnowledgeFileIdForm,
delete_file: bool = Query(True),
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
knowledge = Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not has_access(user.id, "write", knowledge.access_control, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
file = Files.get_file_by_id(form_data.file_id, db=db)
if not file:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
Knowledges.remove_file_from_knowledge_by_id(
knowledge_id=id, file_id=form_data.file_id, db=db
)
# Remove content from the vector database
try:
VECTOR_DB_CLIENT.delete(
collection_name=knowledge.id, filter={"file_id": form_data.file_id}
) # Remove by file_id first
VECTOR_DB_CLIENT.delete(
collection_name=knowledge.id, filter={"hash": file.hash}
) # Remove by hash as well in case of duplicates
except Exception as e:
log.debug("This was most likely caused by bypassing embedding processing")
log.debug(e)
pass
if delete_file:
try:
# Remove the file's collection from vector database
file_collection = f"file-{form_data.file_id}"
if VECTOR_DB_CLIENT.has_collection(collection_name=file_collection):
VECTOR_DB_CLIENT.delete_collection(collection_name=file_collection)
except Exception as e:
log.debug("This was most likely caused by bypassing embedding processing")
log.debug(e)
pass
# Delete file from database
Files.delete_file_by_id(form_data.file_id, db=db)
if knowledge:
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=Knowledges.get_file_metadatas_by_id(knowledge.id, db=db),
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
############################
# DeleteKnowledgeById
############################
@router.delete("/{id}/delete", response_model=bool)
async def delete_knowledge_by_id(
id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
knowledge = Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not has_access(user.id, "write", knowledge.access_control, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
log.info(f"Deleting knowledge base: {id} (name: {knowledge.name})")
# Get all models
models = Models.get_all_models(db=db)
log.info(f"Found {len(models)} models to check for knowledge base {id}")
# Update models that reference this knowledge base
for model in models:
if model.meta and hasattr(model.meta, "knowledge"):
knowledge_list = model.meta.knowledge or []
# Filter out the deleted knowledge base
updated_knowledge = [k for k in knowledge_list if k.get("id") != id]
# If the knowledge list changed, update the model
if len(updated_knowledge) != len(knowledge_list):
log.info(f"Updating model {model.id} to remove knowledge base {id}")
model.meta.knowledge = updated_knowledge
# Create a ModelForm for the update
model_form = ModelForm(
id=model.id,
name=model.name,
base_model_id=model.base_model_id,
meta=model.meta,
params=model.params,
access_control=model.access_control,
is_active=model.is_active,
)
Models.update_model_by_id(model.id, model_form, db=db)
# Clean up vector DB
try:
VECTOR_DB_CLIENT.delete_collection(collection_name=id)
except Exception as e:
log.debug(e)
pass
# Remove knowledge base embedding
remove_knowledge_base_metadata_embedding(id)
result = Knowledges.delete_knowledge_by_id(id=id, db=db)
return result
############################
# ResetKnowledgeById
############################
@router.post("/{id}/reset", response_model=Optional[KnowledgeResponse])
async def reset_knowledge_by_id(
id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
knowledge = Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not has_access(user.id, "write", knowledge.access_control, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
try:
VECTOR_DB_CLIENT.delete_collection(collection_name=id)
except Exception as e:
log.debug(e)
pass
knowledge = Knowledges.reset_knowledge_by_id(id=id, db=db)
return knowledge
############################
# AddFilesToKnowledge
############################
@router.post("/{id}/files/batch/add", response_model=Optional[KnowledgeFilesResponse])
async def add_files_to_knowledge_batch(
request: Request,
id: str,
form_data: list[KnowledgeFileIdForm],
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
"""
Add multiple files to a knowledge base
"""
knowledge = Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
knowledge.user_id != user.id
and not has_access(user.id, "write", knowledge.access_control, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
# Get files content
log.info(f"files/batch/add - {len(form_data)} files")
files: List[FileModel] = []
for form in form_data:
file = Files.get_file_by_id(form.file_id, db=db)
if not file:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File {form.file_id} not found",
)
files.append(file)
# Process files
try:
result = await process_files_batch(
request=request,
form_data=BatchProcessFilesForm(files=files, collection_name=id),
user=user,
db=db,
)
except Exception as e:
log.error(
f"add_files_to_knowledge_batch: Exception occurred: {e}", exc_info=True
)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
# Only add files that were successfully processed
successful_file_ids = [r.file_id for r in result.results if r.status == "completed"]
for file_id in successful_file_ids:
Knowledges.add_file_to_knowledge_by_id(
knowledge_id=id, file_id=file_id, user_id=user.id, db=db
)
# If there were any errors, include them in the response
if result.errors:
error_details = [f"{err.file_id}: {err.error}" for err in result.errors]
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=Knowledges.get_file_metadatas_by_id(knowledge.id, db=db),
warnings={
"message": "Some files failed to process",
"errors": error_details,
},
)
return KnowledgeFilesResponse(
**knowledge.model_dump(),
files=Knowledges.get_file_metadatas_by_id(knowledge.id, db=db),
)
############################
# ExportKnowledgeById
############################
@router.get("/{id}/export")
async def export_knowledge_by_id(
id: str, user=Depends(get_admin_user), db: Session = Depends(get_session)
):
"""
Export a knowledge base as a zip file containing .txt files.
Admin only.
"""
knowledge = Knowledges.get_knowledge_by_id(id=id, db=db)
if not knowledge:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
files = Knowledges.get_files_by_id(id, db=db)
# Create zip file in memory
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
for file in files:
content = file.data.get("content", "") if file.data else ""
if content:
# Use original filename with .txt extension
filename = file.filename
if not filename.endswith(".txt"):
filename = f"{filename}.txt"
zf.writestr(filename, content)
zip_buffer.seek(0)
# Sanitize knowledge name for filename
safe_name = "".join(c if c.isalnum() or c in " -_" else "_" for c in knowledge.name)
zip_filename = f"{safe_name}.zip"
return StreamingResponse(
zip_buffer,
media_type="application/zip",
headers={"Content-Disposition": f"attachment; filename={zip_filename}"},
)