Files
Timothy Jaeryang Baek 9044abf3bb chore: format
2026-02-23 01:40:53 -06:00

912 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import os
import uuid
import json
from pathlib import Path
from typing import Optional
from urllib.parse import quote
import asyncio
from fastapi import (
BackgroundTasks,
APIRouter,
Depends,
File,
Form,
HTTPException,
Request,
UploadFile,
status,
Query,
)
from fastapi.responses import FileResponse, StreamingResponse
from sqlalchemy.orm import Session
from open_webui.internal.db import get_session, SessionLocal
from open_webui.constants import ERROR_MESSAGES
from open_webui.retrieval.vector.factory import VECTOR_DB_CLIENT
from open_webui.models.channels import Channels
from open_webui.models.users import Users
from open_webui.models.files import (
FileForm,
FileModel,
FileModelResponse,
Files,
)
from open_webui.models.chats import Chats
from open_webui.models.knowledge import Knowledges
from open_webui.models.groups import Groups
from open_webui.models.access_grants import AccessGrants
from open_webui.routers.retrieval import ProcessFileForm, process_file
from open_webui.routers.audio import transcribe
from open_webui.storage.provider import Storage
from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL
from open_webui.utils.auth import get_admin_user, get_verified_user
from open_webui.utils.misc import strict_match_mime_type
from pydantic import BaseModel
# Module-level logger for this router.
log = logging.getLogger(__name__)
# FastAPI router that the file endpoints below are registered on.
router = APIRouter()
############################
# Check if the current user has access to a file through any knowledge bases the user may be in.
############################
# TODO: Optimize this function to use the knowledge_file table for faster lookups.
def has_access_to_file(
    file_id: Optional[str],
    access_type: str,
    user=Depends(get_verified_user),
    db: Optional[Session] = None,
) -> bool:
    """Return whether ``user`` may access a file through a shared resource.

    Direct ownership and the admin role are NOT checked here; callers test
    those first. Access is granted when the file is linked to:

    * a knowledge base the user owns, or on which ``AccessGrants`` reports an
      ``access_type`` permission for the user (given the user's group ids);
    * the knowledge base named in ``file.meta["collection_name"]``, if it is
      among the knowledge bases returned for the user and ``access_type``;
    * a channel associated with the user (read access only);
    * any shared chat (read access only).

    Args:
        file_id: ID of the file to check.
        access_type: Requested permission, e.g. ``"read"`` or ``"write"``.
        user: The authenticated user.
        db: Optional SQLAlchemy session to reuse.

    Returns:
        True if one of the rules above grants access, otherwise False.

    Raises:
        HTTPException: 404 if the file does not exist.
    """
    file = Files.get_file_by_id(file_id, db=db)
    log.debug(f"Checking if user has {access_type} access to file")
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Knowledge bases that reference this file: owner or explicit grant wins.
    knowledge_bases = Knowledges.get_knowledges_by_file_id(file_id, db=db)
    if knowledge_bases:
        # Only needed for the grant check, so fetched lazily.
        user_group_ids = {
            group.id for group in Groups.get_groups_by_member_id(user.id, db=db)
        }
        for knowledge_base in knowledge_bases:
            if knowledge_base.user_id == user.id or AccessGrants.has_access(
                user_id=user.id,
                resource_type="knowledge",
                resource_id=knowledge_base.id,
                permission=access_type,
                user_group_ids=user_group_ids,
                db=db,
            ):
                return True

    # A file may also name its knowledge base in meta["collection_name"].
    knowledge_base_id = file.meta.get("collection_name") if file.meta else None
    if knowledge_base_id:
        accessible = Knowledges.get_knowledge_bases_by_user_id(
            user.id, access_type, db=db
        )
        if any(kb.id == knowledge_base_id for kb in accessible):
            return True

    # Channels and shared chats only ever confer read access. In particular,
    # a file appearing in someone's shared chat must not let arbitrary users
    # overwrite or delete it.
    if access_type != "read":
        return False

    # Check if the file is associated with any channels the user has access to
    if Channels.get_channels_by_file_id_and_user_id(file_id, user.id, db=db):
        return True

    # Check if the file is associated with any shared chats
    # TODO: Granular access control for chats
    if Chats.get_shared_chats_by_file_id(file_id, db=db):
        return True

    return False
############################
# Upload File
############################
def _is_text_file(file_path: str, chunk_size: int = 8192) -> bool:
    """Check whether a stored file is likely text by sampling its first bytes.

    Reads up to ``chunk_size`` bytes and accepts the file when the sample is
    non-empty, contains no NUL bytes, and is valid UTF-8. This catches files
    whose extensions are mis-mapped by mimetypes/browsers (e.g. TypeScript
    ``.ts`` -> ``video/mp2t``) without maintaining an extension whitelist.

    Args:
        file_path: Storage path, resolved to a local file via ``Storage``.
        chunk_size: Maximum number of leading bytes to inspect.

    Returns:
        True if the sample looks like UTF-8 text. Any error (storage lookup,
        I/O, decoding) yields False.
    """
    import codecs

    try:
        resolved = Storage.get_file(file_path)
        with open(resolved, "rb") as f:
            chunk = f.read(chunk_size)
    except Exception:
        return False

    if not chunk:
        return False
    # Null bytes are a strong indicator of binary content
    if b"\x00" in chunk:
        return False

    # A full-size read may cut a multi-byte UTF-8 character in half at its
    # end. Decoding incrementally with final=False tolerates only such a
    # truncated tail; genuinely invalid bytes anywhere still raise.
    may_be_truncated = len(chunk) == chunk_size
    try:
        codecs.getincrementaldecoder("utf-8")().decode(
            chunk, final=not may_be_truncated
        )
    except UnicodeDecodeError:
        return False
    return True
def process_uploaded_file(
    request,
    file,
    file_path,
    file_item,
    file_metadata,
    user,
    db: Optional[Session] = None,
):
    """Extract and index the content of an uploaded file.

    Content types matching ``STT_SUPPORTED_CONTENT_TYPES`` are transcribed and
    the transcript is passed to ``process_file``. Other non-image/video types,
    or any type when ``CONTENT_EXTRACTION_ENGINE == "external"``, go straight
    to ``process_file``. Remaining images/videos are rejected. Files labelled
    image/video whose bytes look like UTF-8 text are treated as text/plain.

    Errors are not raised. They are recorded on the file row as
    ``status="failed"`` with an ``error`` message.

    Args:
        request: Incoming request; ``request.app.state.config`` is read.
        file: The uploaded file object (only ``content_type`` is read).
        file_path: Storage path of the saved upload.
        file_item: The inserted file record (its ``id`` is used).
        file_metadata: Metadata forwarded to transcription.
        user: The uploading user.
        db: Session to reuse. When None, a fresh ``SessionLocal`` session is
            opened for the duration of processing.
    """

    def _process_handler(db_session):
        try:
            content_type = file.content_type
            # Detect mis-labeled text files (e.g. .ts → video/mp2t)
            if content_type and content_type.startswith(("image/", "video/")):
                if _is_text_file(file_path):
                    content_type = "text/plain"

            if content_type:
                stt_supported_content_types = getattr(
                    request.app.state.config, "STT_SUPPORTED_CONTENT_TYPES", []
                )
                if strict_match_mime_type(stt_supported_content_types, content_type):
                    file_path_processed = Storage.get_file(file_path)
                    result = transcribe(
                        request, file_path_processed, file_metadata, user
                    )
                    process_file(
                        request,
                        ProcessFileForm(
                            file_id=file_item.id, content=result.get("text", "")
                        ),
                        user=user,
                        db=db_session,
                    )
                elif (not content_type.startswith(("image/", "video/"))) or (
                    request.app.state.config.CONTENT_EXTRACTION_ENGINE == "external"
                ):
                    process_file(
                        request,
                        ProcessFileForm(file_id=file_item.id),
                        user=user,
                        db=db_session,
                    )
                else:
                    raise Exception(
                        f"File type {content_type} is not supported for processing"
                    )
            else:
                log.info(
                    f"File type {file.content_type} is not provided, but trying to process anyway"
                )
                process_file(
                    request,
                    ProcessFileForm(file_id=file_item.id),
                    user=user,
                    db=db_session,
                )
        except Exception as e:
            # log.exception keeps the traceback; the original log.error lost it.
            log.exception(f"Error processing file: {file_item.id}")
            Files.update_file_data_by_id(
                file_item.id,
                {
                    "status": "failed",
                    "error": str(e.detail) if hasattr(e, "detail") else str(e),
                },
                db=db_session,
            )

    if db is not None:
        _process_handler(db)
    else:
        with SessionLocal() as db_session:
            _process_handler(db_session)
@router.post("/", response_model=FileModelResponse)
def upload_file(
    request: Request,
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    metadata: Optional[dict | str] = Form(None),
    process: bool = Query(True),
    process_in_background: bool = Query(True),
    user=Depends(get_verified_user),
    db: Session = Depends(get_session),
):
    # HTTP entry point for uploads. All of the work is delegated to
    # upload_file_handler. This route's BackgroundTasks instance and
    # request-scoped DB session are forwarded so processing can be deferred.
    handler_kwargs = dict(
        background_tasks=background_tasks,
        db=db,
        user=user,
        file=file,
        metadata=metadata,
        process=process,
        process_in_background=process_in_background,
    )
    return upload_file_handler(request, **handler_kwargs)
def upload_file_handler(
    request: Request,
    file: UploadFile = File(...),
    metadata: Optional[dict | str] = Form(None),
    process: bool = Query(True),
    process_in_background: bool = Query(True),
    user=Depends(get_verified_user),
    background_tasks: Optional[BackgroundTasks] = None,
    db: Optional[Session] = None,
):
    """Store an uploaded file, record it in the DB and optionally process it.

    Args:
        request: Incoming request; ``request.app.state.config`` is read.
        file: The uploaded file.
        metadata: Extra metadata, as a dict or a JSON object string.
        process: When True, validate the extension against
            ``ALLOWED_FILE_EXTENSIONS`` and run ``process_uploaded_file``.
        process_in_background: When True and ``background_tasks`` is given,
            processing is queued instead of run inline.
        user: The uploading user.
        background_tasks: Optional task queue for deferred processing.
        db: Optional SQLAlchemy session to reuse.

    Returns:
        The file record when ``process`` is False. Otherwise, a dict of the
        record's fields plus ``"status": True``.

    Raises:
        HTTPException: 400 for invalid metadata, a disallowed extension, or
            any other upload failure.
    """
    log.info(f"file.content_type: {file.content_type} {process}")

    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except json.JSONDecodeError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ERROR_MESSAGES.DEFAULT("Invalid metadata format"),
            )
    file_metadata = metadata if metadata else {}
    # Later code does key lookups (e.g. "channel_id") on the metadata. Reject
    # non-object JSON up front, before anything is written to storage.
    if not isinstance(file_metadata, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Invalid metadata format"),
        )

    try:
        unsanitized_filename = file.filename
        filename = os.path.basename(unsanitized_filename)
        file_extension = os.path.splitext(filename)[1]
        # Remove the leading dot from the file extension
        file_extension = file_extension[1:] if file_extension else ""

        if process:
            # Filter blank entries into a local list. Do not write the cleaned
            # list back into app config on every request. An empty or
            # blank-only list means "no restriction", and matching is
            # case-insensitive (FILE.PDF matches "pdf").
            allowed_extensions = [
                str(ext).lower()
                for ext in (request.app.state.config.ALLOWED_FILE_EXTENSIONS or [])
                if ext
            ]
            if (
                allowed_extensions
                and file_extension.lower() not in allowed_extensions
            ):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=ERROR_MESSAGES.DEFAULT(
                        f"File type {file_extension} is not allowed"
                    ),
                )

        # Prefix the stored name with a fresh UUID, which doubles as the file id.
        file_id = str(uuid.uuid4())
        name = filename
        filename = f"{file_id}_{filename}"

        contents, file_path = Storage.upload_file(
            file.file,
            filename,
            {
                "OpenWebUI-User-Email": user.email,
                "OpenWebUI-User-Id": user.id,
                "OpenWebUI-User-Name": user.name,
                "OpenWebUI-File-Id": file_id,
            },
        )

        file_item = Files.insert_new_file(
            user.id,
            FileForm(
                **{
                    "id": file_id,
                    "filename": name,
                    "path": file_path,
                    "data": {
                        **({"status": "pending"} if process else {}),
                    },
                    "meta": {
                        "name": name,
                        "content_type": (
                            file.content_type
                            if isinstance(file.content_type, str)
                            else None
                        ),
                        "size": len(contents),
                        "data": file_metadata,
                    },
                }
            ),
            db=db,
        )
        if not file_item:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ERROR_MESSAGES.DEFAULT("Error uploading file"),
            )

        if "channel_id" in file_metadata:
            channel = Channels.get_channel_by_id_and_user_id(
                file_metadata["channel_id"], user.id, db=db
            )
            if channel:
                Channels.add_file_to_channel_by_id(
                    channel.id, file_item.id, user.id, db=db
                )

        if not process:
            return file_item

        if background_tasks and process_in_background:
            # No db is passed, so process_uploaded_file opens its own session
            # (presumably because the request session may be closed by then).
            background_tasks.add_task(
                process_uploaded_file,
                request,
                file,
                file_path,
                file_item,
                file_metadata,
                user,
            )
        else:
            process_uploaded_file(
                request,
                file,
                file_path,
                file_item,
                file_metadata,
                user,
                db=db,
            )
        return {"status": True, **file_item.model_dump()}

    except HTTPException:
        raise
    except Exception as e:
        log.exception(e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error uploading file"),
        )
############################
# List Files
############################
@router.get("/", response_model=list[FileModelResponse])
async def list_files(
    user=Depends(get_verified_user),
    content: bool = Query(True),
    db: Session = Depends(get_session),
):
    # List files: every file for an admin when BYPASS_ADMIN_ACCESS_CONTROL is
    # set, otherwise only the caller's own files. With content=false, the
    # data["content"] entry is removed from each returned record.
    if user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL:
        files = Files.get_files(db=db)
    else:
        files = Files.get_files_by_user_id(user.id, db=db)

    if not content:
        for file in files:
            # data may be None; guard as search_files does instead of crashing.
            if file.data:
                file.data.pop("content", None)
    return files
############################
# Search Files
############################
@router.get("/search", response_model=list[FileModelResponse])
async def search_files(
    filename: str = Query(
        ...,
        description="Filename pattern to search for. Supports wildcards such as '*.txt'",
    ),
    content: bool = Query(True),
    skip: int = Query(0, ge=0, description="Number of files to skip"),
    limit: int = Query(
        100, ge=1, le=1000, description="Maximum number of files to return"
    ),
    user=Depends(get_verified_user),
    db: Session = Depends(get_session),
):
    """
    Search for files by filename with support for wildcard patterns.
    Uses SQL-based filtering with pagination for better performance.
    """
    # An admin with BYPASS_ADMIN_ACCESS_CONTROL searches across all owners
    # (user_id=None). Everyone else is scoped to their own files.
    search_everyone = user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL
    matches = Files.search_files(
        user_id=None if search_everyone else user.id,
        filename=filename,
        skip=skip,
        limit=limit,
        db=db,
    )

    if not matches:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No files found matching the pattern.",
        )

    if content:
        return matches

    # Strip the extracted text from each result when content=false.
    for match in matches:
        if match.data and "content" in match.data:
            del match.data["content"]
    return matches
############################
# Delete All Files
############################
@router.delete("/all")
async def delete_all_files(
    user=Depends(get_admin_user), db: Session = Depends(get_session)
):
    # Admin-only: delete every file row, then wipe the backing storage and
    # reset the vector DB. Either failure is reported as a 400.
    if not Files.delete_all_files(db=db):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    try:
        Storage.delete_all_files()
        VECTOR_DB_CLIENT.reset()
    except Exception as e:
        log.exception(e)
        log.error("Error deleting files")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    return {"message": "All files deleted successfully"}
############################
# Get File By Id
############################
@router.get("/{id}", response_model=Optional[FileModel])
async def get_file_by_id(
    id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
    # Return the file record when the caller owns it, is an admin, or has
    # read access via has_access_to_file. Responds 404 both when the file is
    # missing and when access is denied.
    not_found = HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=ERROR_MESSAGES.NOT_FOUND,
    )

    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise not_found

    permitted = (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user, db=db)
    )
    if not permitted:
        raise not_found
    return file
@router.get("/{id}/process/status")
async def get_file_process_status(
    id: str,
    stream: bool = Query(False),
    user=Depends(get_verified_user),
    db: Session = Depends(get_session),
):
    # Report a file's processing status.
    # - stream=false: one JSON object {"status": ...}, "pending" if unset.
    # - stream=true: a text/event-stream that polls once per second until the
    #   status becomes "completed"/"failed" (the error is included on failure),
    #   the file disappears ("not_found"), or no status is recorded.
    # Requires ownership, admin role, or read access; otherwise 404.
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user, db=db)
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not stream:
        # data may be None for some records.
        return {"status": (file.data or {}).get("status", "pending")}

    # Maximum number of polls; with the 1s sleep below, roughly two hours.
    MAX_FILE_PROCESSING_DURATION = 3600 * 2

    async def event_stream(file_id):
        # NOTE: We intentionally do NOT capture the request's db session here.
        # Each poll creates its own short-lived session to avoid holding a
        # connection for hours. A WebSocket push would be more efficient.
        for _ in range(MAX_FILE_PROCESSING_DURATION):
            file_item = Files.get_file_by_id(file_id)  # Creates own session
            if not file_item:
                yield f"data: {json.dumps({'status': 'not_found'})}\n\n"
                break

            # `.get("data", {})` would return None when data is stored as None.
            data = file_item.model_dump().get("data") or {}
            # Named file_status to avoid shadowing fastapi's `status` module.
            file_status = data.get("status")
            if not file_status:
                # Legacy
                break

            event = {"status": file_status}
            if file_status == "failed":
                event["error"] = data.get("error")
            yield f"data: {json.dumps(event)}\n\n"
            if file_status in ("completed", "failed"):
                break

            await asyncio.sleep(1)

    return StreamingResponse(
        event_stream(file.id),
        media_type="text/event-stream",
    )
############################
# Get File Data Content By Id
############################
@router.get("/{id}/data/content")
async def get_file_data_content_by_id(
    id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
    # Return the text stored in file.data["content"] ("" when absent).
    # Requires ownership, admin role, or read access; otherwise 404.
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user, db=db)
    ):
        # data may be None for some records; treat that as "no content".
        return {"content": (file.data or {}).get("content", "")}

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=ERROR_MESSAGES.NOT_FOUND,
    )
############################
# Update File Data Content By Id
############################
class ContentForm(BaseModel):
    # Request body for POST /{id}/data/content/update. `content` is passed to
    # process_file as the file's new text content.
    content: str
@router.post("/{id}/data/content/update")
def update_file_data_content_by_id(
    request: Request,
    id: str,
    form_data: ContentForm,
    user=Depends(get_verified_user),
    db: Session = Depends(get_session),
):
    # Replace a file's content by re-running process_file with the supplied
    # text, then return the content now stored on the file. Requires
    # ownership, admin role, or write access; otherwise 404. Processing
    # errors are logged, and the previously stored content is returned.
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "write", user, db=db)
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        # Reuse the request session, as process_uploaded_file does, so the
        # re-read below observes the update made by process_file.
        process_file(
            request,
            ProcessFileForm(file_id=id, content=form_data.content),
            user=user,
            db=db,
        )
        # Keep the original record if the row vanished in the meantime.
        file = Files.get_file_by_id(id=id, db=db) or file
    except Exception as e:
        log.exception(e)
        log.error(f"Error processing file: {file.id}")

    return {"content": (file.data or {}).get("content", "")}
############################
# Get File Content By Id
############################
@router.get("/{id}/content")
async def get_file_content_by_id(
    id: str,
    user=Depends(get_verified_user),
    attachment: bool = Query(False),
    db: Session = Depends(get_session),
):
    # Serve the stored file bytes.
    # - attachment=true: always sent with Content-Disposition: attachment.
    # - PDFs (by stored type or .pdf name): sent inline as application/pdf.
    # - text/plain: sent without a Content-Disposition header.
    # - Anything else: sent as an attachment.
    # Requires ownership, admin role, or read access; otherwise 404.
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user, db=db)
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        file_path = Path(Storage.get_file(file.path))
        # Check if the file already exists in the cache
        if not file_path.is_file():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=ERROR_MESSAGES.NOT_FOUND,
            )

        meta = file.meta or {}
        # Handle Unicode filenames with RFC 5987 `filename*` encoding.
        filename = meta.get("name") or file.filename
        encoded_filename = quote(filename)
        content_type = meta.get("content_type")

        headers = {}
        if attachment:
            headers["Content-Disposition"] = (
                f"attachment; filename*=UTF-8''{encoded_filename}"
            )
        elif content_type == "application/pdf" or filename.lower().endswith(".pdf"):
            headers["Content-Disposition"] = (
                f"inline; filename*=UTF-8''{encoded_filename}"
            )
            content_type = "application/pdf"
        elif content_type != "text/plain":
            # Non-plain-text types are downloaded rather than rendered inline.
            headers["Content-Disposition"] = (
                f"attachment; filename*=UTF-8''{encoded_filename}"
            )

        return FileResponse(file_path, headers=headers, media_type=content_type)
    except HTTPException:
        raise
    except Exception as e:
        log.exception(e)
        log.error("Error getting file content")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error getting file content"),
        )
@router.get("/{id}/content/html")
async def get_html_file_content_by_id(
    id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
    # Serve a file's raw bytes without an explicit media type or
    # Content-Disposition. Only files owned by an admin user are served here
    # (presumably because such content may be rendered by the browser).
    # The caller also needs ownership, admin role, or read access; every
    # refusal is a 404.
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    file_user = Users.get_user_by_id(file.user_id, db=db)
    # The owner may no longer exist; treat that as "not admin" instead of
    # failing with an AttributeError (500).
    if not file_user or file_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user, db=db)
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        file_path = Path(Storage.get_file(file.path))
        # Check if the file already exists in the cache
        if not file_path.is_file():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=ERROR_MESSAGES.NOT_FOUND,
            )
        log.info(f"file_path: {file_path}")
        return FileResponse(file_path)
    except HTTPException:
        raise
    except Exception as e:
        log.exception(e)
        log.error("Error getting file content")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error getting file content"),
        )
@router.get("/{id}/content/{file_name}")
async def get_file_content_by_id(
    id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
    # Download a file as an attachment. The trailing {file_name} path segment
    # is not read by this handler. If the record has no storage path, its
    # stored text (data["content"]) is served as text/plain instead.
    # Requires ownership, admin role, or read access; otherwise 404.
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user, db=db)
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Handle Unicode filenames with RFC 5987 `filename*` encoding.
    meta = file.meta or {}
    filename = meta.get("name") or file.filename
    encoded_filename = quote(filename)
    headers = {
        "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"
    }

    if file.path:
        file_path = Path(Storage.get_file(file.path))
        # Check if the file already exists in the cache
        if file_path.is_file():
            return FileResponse(file_path, headers=headers)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # No stored file: fall back to the extracted text. The text lives in
    # file.data; the previous `file.content` attribute does not exist, so
    # this branch always crashed before.
    file_content = (file.data or {}).get("content") or ""

    def generator():
        yield file_content.encode("utf-8")

    return StreamingResponse(
        generator(),
        media_type="text/plain",
        headers=headers,
    )
############################
# Delete File By Id
############################
@router.delete("/{id}")
async def delete_file_by_id(
    id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
    # Delete a file. Requires ownership, admin role, or write access;
    # otherwise 404. Steps, in order:
    #   1. detach the file from each knowledge base and best-effort purge its
    #      embeddings there;
    #   2. delete the DB row (400 on failure);
    #   3. delete the stored bytes and the file's own vector collection
    #      (400 on failure).
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    may_write = (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "write", user, db=db)
    )
    if not may_write:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Same cleanup logic as /knowledge/{id}/file/remove.
    for kb in Knowledges.get_knowledges_by_file_id(id, db=db):
        Knowledges.remove_file_from_knowledge_by_id(kb.id, id, db=db)
        try:
            VECTOR_DB_CLIENT.delete(collection_name=kb.id, filter={"file_id": id})
            if file.hash:
                VECTOR_DB_CLIENT.delete(
                    collection_name=kb.id, filter={"hash": file.hash}
                )
        except Exception as e:
            # Best effort: embedding cleanup failures are only logged.
            log.debug(f"KB embedding cleanup for {kb.id}: {e}")

    if not Files.delete_file_by_id(id, db=db):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting file"),
        )

    try:
        Storage.delete_file(file.path)
        VECTOR_DB_CLIENT.delete(collection_name=f"file-{id}")
    except Exception as e:
        log.exception(e)
        log.error("Error deleting files")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    return {"message": "File deleted successfully"}