2026-01-05 04:45:17 +04:00
"""
Built-in tools for Open WebUI.

These tools are automatically available when native function calling is enabled.

IMPORTANT: DO NOT IMPORT THIS MODULE DIRECTLY IN OTHER PARTS OF THE CODEBASE.
2026-01-05 04:45:17 +04:00
"""
import json
import logging
import time
2026-01-06 01:53:00 +04:00
import asyncio
2026-01-05 04:45:17 +04:00
from typing import Optional
from fastapi import Request
from open_webui . models . users import UserModel
2026-01-07 10:21:05 -05:00
from open_webui . routers . retrieval import search_web as _search_web
2026-01-05 04:45:17 +04:00
from open_webui . retrieval . utils import get_content_from_url
2026-01-05 17:45:39 +04:00
from open_webui . routers . images import (
image_generations ,
image_edits ,
CreateImageForm ,
EditImageForm ,
)
from open_webui . routers . memories import (
query_memory ,
2026-01-06 01:42:49 +04:00
add_memory as _add_memory ,
update_memory_by_id ,
2026-01-05 17:45:39 +04:00
QueryMemoryForm ,
AddMemoryForm ,
2026-01-06 01:42:49 +04:00
MemoryUpdateModel ,
2026-01-05 17:45:39 +04:00
)
from open_webui . models . notes import Notes
from open_webui . models . chats import Chats
from open_webui . models . channels import Channels , ChannelMember , Channel
from open_webui . models . messages import Messages , Message
from open_webui . models . groups import Groups
2026-02-19 14:06:24 -06:00
from open_webui . models . memories import Memories
from open_webui . retrieval . vector . factory import VECTOR_DB_CLIENT
2026-02-06 22:25:18 +04:00
from open_webui . utils . sanitize import sanitize_code
2026-01-05 04:45:17 +04:00
# Module-level logger named after this module; the tool functions in this file
# report their failures through it via log.exception(...).
log = logging . getLogger ( __name__ )
2026-01-09 22:21:00 +04:00
# NOTE(review): presumably an upper bound on items scanned by the knowledge-base
# search tools; it is not referenced in this part of the file - confirm at the use site.
MAX_KNOWLEDGE_BASE_SEARCH_ITEMS = 10_000
2026-01-05 17:45:39 +04:00
# =============================================================================
# TIME UTILITIES
# =============================================================================
async def get_current_timestamp(
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the current Unix timestamp in seconds.

    :return: JSON with current_timestamp (seconds) and current_iso (ISO format)
    """
    try:
        import datetime

        # Timezone-aware "now" in UTC, so the ISO string carries an explicit offset.
        utc_now = datetime.datetime.now(datetime.timezone.utc)
        payload = {
            "current_timestamp": int(utc_now.timestamp()),
            "current_iso": utc_now.isoformat(),
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as exc:
        # Failures are reported as a JSON error object rather than raised.
        log.exception(f"get_current_timestamp error: {exc}")
        return json.dumps({"error": str(exc)})
async def calculate_timestamp(
    days_ago: int = 0,
    weeks_ago: int = 0,
    months_ago: int = 0,
    years_ago: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the current Unix timestamp, optionally adjusted by days, weeks, months, or years.
    Use this to calculate timestamps for date filtering in search functions.
    Examples: "last week" = weeks_ago=1, "3 days ago" = days_ago=3, "a year ago" = years_ago=1

    :param days_ago: Number of days to subtract from current time (default: 0)
    :param weeks_ago: Number of weeks to subtract from current time (default: 0)
    :param months_ago: Number of months to subtract from current time (default: 0)
    :param years_ago: Number of years to subtract from current time (default: 0)
    :return: JSON with current_timestamp and calculated_timestamp (both in seconds)
    """
    try:
        import datetime

        # dateutil gives calendar-accurate month/year arithmetic. When it is not
        # installed we approximate with 30-day months and 365-day years.
        try:
            from dateutil.relativedelta import relativedelta
        except ImportError:
            relativedelta = None

        now = datetime.datetime.now(datetime.timezone.utc)
        current_ts = int(now.timestamp())

        # Days and weeks have a fixed length, so they always go through timedelta.
        total_days = days_ago + (weeks_ago * 7)
        if relativedelta is None:
            total_days += (months_ago * 30) + (years_ago * 365)
        adjusted = now - datetime.timedelta(days=total_days)

        # Months and years have variable length. The truthiness test (rather than
        # "> 0") makes negative offsets behave the same as they do for days/weeks
        # and for the fallback path, instead of being silently dropped.
        if relativedelta is not None and (months_ago or years_ago):
            adjusted = adjusted - relativedelta(months=months_ago, years=years_ago)

        return json.dumps(
            {
                "current_timestamp": current_ts,
                "current_iso": now.isoformat(),
                "calculated_timestamp": int(adjusted.timestamp()),
                "calculated_iso": adjusted.isoformat(),
            },
            ensure_ascii=False,
        )
    except Exception as e:
        # Failures (bad argument types, out-of-range dates, ...) are reported as
        # a JSON error object rather than raised.
        log.exception(f"calculate_timestamp error: {e}")
        return json.dumps({"error": str(e)})
# =============================================================================
# WEB SEARCH TOOLS
# =============================================================================
2026-01-05 04:45:17 +04:00
2026-01-07 10:21:05 -05:00
async def search_web(
    query: str,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the public web for information. Best for current events, external references,
    or topics not covered in internal documents. If knowledge base tools are available,
    consider checking those first for internal information.

    :param query: The search query to look up
    :param count: Number of results to return (default: 5)
    :return: JSON with search results containing title, link, and snippet for each result
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        config = __request__.app.state.config
        engine = config.WEB_SEARCH_ENGINE

        user = None
        if __user__:
            user = UserModel(**__user__)

        # Use the admin-configured result count if set, falling back to the
        # model-provided count if provided, else the default of 5.
        count = config.WEB_SEARCH_RESULT_COUNT or count

        # The search call is run in a worker thread so it does not block the event loop.
        hits = await asyncio.to_thread(_search_web, __request__, engine, query, user)

        # Limit results
        limited = hits[:count] if hits else []

        summaries = []
        for hit in limited:
            summaries.append(
                {"title": hit.title, "link": hit.link, "snippet": hit.snippet}
            )
        return json.dumps(summaries, ensure_ascii=False)
    except Exception as exc:
        log.exception(f"search_web error: {exc}")
        return json.dumps({"error": str(exc)})
async def fetch_url(
    url: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Fetch and extract the main text content from a web page URL.

    :param url: The URL to fetch content from
    :return: The extracted text content from the page
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        # Run the fetch in a worker thread; only the first element of the
        # returned pair (the text) is used here.
        page_text, _ = await asyncio.to_thread(get_content_from_url, __request__, url)

        # Truncate if too long (avoid overwhelming context)
        char_limit = 50000
        if len(page_text) <= char_limit:
            return page_text
        return page_text[:char_limit] + "\n\n[Content truncated...]"
    except Exception as exc:
        log.exception(f"fetch_url error: {exc}")
        return json.dumps({"error": str(exc)})
2026-01-05 17:45:39 +04:00
# =============================================================================
# IMAGE GENERATION TOOLS
# =============================================================================
2026-01-05 04:45:17 +04:00
async def generate_image(
    prompt: str,
    __request__: Request = None,
    __user__: dict = None,
    __event_emitter__: callable = None,
    __chat_id__: str = None,
    __message_id__: str = None,
) -> str:
    """
    Generate an image based on a text prompt.

    :param prompt: A detailed description of the image to generate
    :return: Confirmation that the image was generated, or an error message
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        user = None
        if __user__:
            user = UserModel(**__user__)

        images = await image_generations(
            request=__request__,
            form_data=CreateImageForm(prompt=prompt),
            user=user,
        )

        # Prepare file entries for the images
        image_files = []
        for img in images:
            image_files.append({"type": "image", "url": img["url"]})

        # Persist files to DB if chat context is available; the value returned
        # by the persistence call replaces the locally built list.
        has_chat_context = __chat_id__ and __message_id__
        if has_chat_context and images:
            image_files = Chats.add_message_files_by_id_and_message_id(
                __chat_id__,
                __message_id__,
                image_files,
            )

        # Without an emitter (or without files) nothing was shown to the user,
        # so return the plain success payload.
        if not (__event_emitter__ and image_files):
            return json.dumps(
                {"status": "success", "images": images}, ensure_ascii=False
            )

        # Emit the images to the UI
        event = {
            "type": "chat:message:files",
            "data": {
                "files": image_files,
            },
        }
        await __event_emitter__(event)

        # Return a message indicating the image is already displayed
        return json.dumps(
            {
                "status": "success",
                "message": "The image has been successfully generated and is already visible to the user in the chat. You do not need to display or embed the image again - just acknowledge that it has been created.",
                "images": images,
            },
            ensure_ascii=False,
        )
    except Exception as exc:
        log.exception(f"generate_image error: {exc}")
        return json.dumps({"error": str(exc)})
async def edit_image(
    prompt: str,
    image_urls: list[str],
    __request__: Request = None,
    __user__: dict = None,
    __event_emitter__: callable = None,
    __chat_id__: str = None,
    __message_id__: str = None,
) -> str:
    """
    Edit existing images based on a text prompt.

    :param prompt: A description of the changes to make to the images
    :param image_urls: A list of URLs of the images to edit
    :return: Confirmation that the images were edited, or an error message
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        user = None
        if __user__:
            user = UserModel(**__user__)

        images = await image_edits(
            request=__request__,
            form_data=EditImageForm(prompt=prompt, image=image_urls),
            user=user,
        )

        # Prepare file entries for the images
        image_files = []
        for img in images:
            image_files.append({"type": "image", "url": img["url"]})

        # Persist files to DB if chat context is available; the value returned
        # by the persistence call replaces the locally built list.
        has_chat_context = __chat_id__ and __message_id__
        if has_chat_context and images:
            image_files = Chats.add_message_files_by_id_and_message_id(
                __chat_id__,
                __message_id__,
                image_files,
            )

        # Without an emitter (or without files) nothing was shown to the user,
        # so return the plain success payload.
        if not (__event_emitter__ and image_files):
            return json.dumps(
                {"status": "success", "images": images}, ensure_ascii=False
            )

        # Emit the images to the UI
        event = {
            "type": "chat:message:files",
            "data": {
                "files": image_files,
            },
        }
        await __event_emitter__(event)

        # Return a message indicating the image is already displayed
        return json.dumps(
            {
                "status": "success",
                "message": "The edited image has been successfully generated and is already visible to the user in the chat. You do not need to display or embed the image again - just acknowledge that it has been created.",
                "images": images,
            },
            ensure_ascii=False,
        )
    except Exception as exc:
        log.exception(f"edit_image error: {exc}")
        return json.dumps({"error": str(exc)})
2026-01-11 21:18:41 +01:00
# =============================================================================
# CODE INTERPRETER TOOLS
# =============================================================================
def _swap_base64_images_for_urls(text, request, metadata, user):
    """Replace every line of *text* that carries a base64 PNG data URI with a
    markdown image link to the uploaded copy; other input is returned unchanged."""
    from open_webui.utils.files import get_image_url_from_base64

    if not text or not isinstance(text, str):
        return text

    lines = text.split("\n")
    for idx, line in enumerate(lines):
        if "data:image/png;base64" not in line:
            continue
        image_url = get_image_url_from_base64(request, line, metadata, user)
        # Keep the original line when the upload yields no URL.
        if image_url:
            lines[idx] = f"![Output Image]({image_url})"
    return "\n".join(lines)


async def execute_code(
    code: str,
    __request__: Request = None,
    __user__: dict = None,
    __event_emitter__: callable = None,
    __event_call__: callable = None,
    __chat_id__: str = None,
    __message_id__: str = None,
    __metadata__: dict = None,
) -> str:
    """
    Execute Python code in a sandboxed environment and return the output.
    Use this to perform calculations, data analysis, generate visualizations,
    or run any Python code that would help answer the user's question.

    :param code: The Python code to execute
    :return: JSON with stdout, stderr, and result from execution
    """
    from uuid import uuid4

    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        # Sanitize code (strips ANSI codes and markdown fences)
        code = sanitize_code(code)

        # Import blocked modules from config (same as middleware)
        from open_webui.config import CODE_INTERPRETER_BLOCKED_MODULES

        # Prepend an import hook that rejects blocked modules when they are
        # imported directly from the executed script (__main__).
        if CODE_INTERPRETER_BLOCKED_MODULES:
            import textwrap

            blocking_code = textwrap.dedent(f"""
                import builtins

                BLOCKED_MODULES = {CODE_INTERPRETER_BLOCKED_MODULES}

                _real_import = builtins.__import__
                def restricted_import(name, globals=None, locals=None, fromlist=(), level=0):
                    if name.split('.')[0] in BLOCKED_MODULES:
                        importer_name = globals.get('__name__') if globals else None
                        if importer_name == '__main__':
                            raise ImportError(
                                f"Direct import of module {{name}} is restricted."
                            )
                    return _real_import(name, globals, locals, fromlist, level)

                builtins.__import__ = restricted_import
            """)
            code = blocking_code + "\n" + code

        config = __request__.app.state.config
        engine = getattr(config, "CODE_INTERPRETER_ENGINE", "pyodide")

        if engine == "pyodide":
            # Execute via frontend pyodide using bidirectional event call
            if __event_call__ is None:
                return json.dumps(
                    {
                        "error": "Event call not available. WebSocket connection required for pyodide execution."
                    }
                )

            output = await __event_call__(
                {
                    "type": "execute:python",
                    "data": {
                        "id": str(uuid4()),
                        "code": code,
                        "session_id": (
                            __metadata__.get("session_id") if __metadata__ else None
                        ),
                    },
                }
            )

            # Parse the output - pyodide returns dict with stdout, stderr, result
            if isinstance(output, dict):
                stdout = output.get("stdout", "")
                stderr = output.get("stderr", "")
                result = output.get("result", "")
            else:
                stdout = ""
                stderr = ""
                result = str(output) if output else ""

        elif engine == "jupyter":
            from open_webui.utils.code_interpreter import execute_code_jupyter

            # Only the credential matching the configured auth mode is passed.
            auth_mode = config.CODE_INTERPRETER_JUPYTER_AUTH
            output = await execute_code_jupyter(
                config.CODE_INTERPRETER_JUPYTER_URL,
                code,
                (
                    config.CODE_INTERPRETER_JUPYTER_AUTH_TOKEN
                    if auth_mode == "token"
                    else None
                ),
                (
                    config.CODE_INTERPRETER_JUPYTER_AUTH_PASSWORD
                    if auth_mode == "password"
                    else None
                ),
                config.CODE_INTERPRETER_JUPYTER_TIMEOUT,
            )

            stdout = output.get("stdout", "")
            stderr = output.get("stderr", "")
            result = output.get("result", "")

        else:
            return json.dumps({"error": f"Unknown code interpreter engine: {engine}"})

        # Handle image outputs (base64 encoded) - replace with uploaded URLs.
        # The upload needs the stored user object, so this step is skipped
        # when no user id is available.
        if __user__ and __user__.get("id"):
            from open_webui.models.users import Users

            user = Users.get_user_by_id(__user__["id"])
            metadata = __metadata__ or {}

            stdout = _swap_base64_images_for_urls(stdout, __request__, metadata, user)
            result = _swap_base64_images_for_urls(result, __request__, metadata, user)

        response = {
            "status": "success",
            "stdout": stdout,
            "stderr": stderr,
            "result": result,
        }

        return json.dumps(response, ensure_ascii=False)
    except Exception as e:
        log.exception(f"execute_code error: {e}")
        return json.dumps({"error": str(e)})
2026-01-05 17:45:39 +04:00
# =============================================================================
# MEMORY TOOLS
# =============================================================================
2026-01-06 01:42:49 +04:00
async def search_memories(
    query: str,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the user's stored memories for relevant information.

    :param query: The search query to find relevant memories
    :param count: Number of memories to return (default 5)
    :return: JSON with matching memories and their dates
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        user = None
        if __user__:
            user = UserModel(**__user__)

        results = await query_memory(
            __request__,
            QueryMemoryForm(content=query, k=count),
            user,
        )

        # No usable result set -> empty list.
        if not (results and hasattr(results, "documents") and results.documents):
            return json.dumps([])

        # Only the first row of documents/ids/metadatas is read, since a single
        # query was issued.
        memories = []
        for position, document in enumerate(results.documents[0]):
            if results.ids and results.ids[0]:
                memory_id = results.ids[0][position]
            else:
                memory_id = None

            # Fall back to "Unknown" when no creation time is recorded.
            date_label = "Unknown"
            if results.metadatas:
                stamp = results.metadatas[0][position].get("created_at")
                if stamp:
                    date_label = time.strftime("%Y-%m-%d", time.localtime(stamp))

            memories.append(
                {"id": memory_id, "date": date_label, "content": document}
            )

        return json.dumps(memories, ensure_ascii=False)
    except Exception as exc:
        log.exception(f"search_memories error: {exc}")
        return json.dumps({"error": str(exc)})
2026-01-06 01:42:49 +04:00
async def add_memory(
    content: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Store a new memory for the user.

    :param content: The memory content to store
    :return: Confirmation that the memory was stored
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        user = None
        if __user__:
            user = UserModel(**__user__)

        # Delegate to the memories router handler (imported as _add_memory so
        # this tool can keep the public name add_memory).
        form = AddMemoryForm(content=content)
        stored = await _add_memory(__request__, form, user)

        confirmation = {"status": "success", "id": stored.id}
        return json.dumps(confirmation, ensure_ascii=False)
    except Exception as exc:
        log.exception(f"add_memory error: {exc}")
        return json.dumps({"error": str(exc)})
async def replace_memory_content(
    memory_id: str,
    content: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Update the content of an existing memory by its ID.

    :param memory_id: The ID of the memory to update
    :param content: The new content for the memory
    :return: Confirmation that the memory was updated
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        user = None
        if __user__:
            user = UserModel(**__user__)

        # Delegate to the memories router handler.
        updated = await update_memory_by_id(
            memory_id=memory_id,
            request=__request__,
            form_data=MemoryUpdateModel(content=content),
            user=user,
        )

        confirmation = {
            "status": "success",
            "id": updated.id,
            "content": updated.content,
        }
        return json.dumps(confirmation, ensure_ascii=False)
    except Exception as exc:
        log.exception(f"replace_memory_content error: {exc}")
        return json.dumps({"error": str(exc)})
2026-01-05 17:45:39 +04:00
2026-02-19 14:06:24 -06:00
async def delete_memory(
    memory_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Delete a memory by its ID.

    :param memory_id: The ID of the memory to delete
    :return: Confirmation that the memory was deleted
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})
    # The delete is scoped by user id, so a missing user is reported explicitly
    # instead of failing later on `None.id`.
    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user = UserModel(**__user__)

        # Remove the database row first; it is matched on both memory id and
        # owner, so a falsy result means "not found or not owned by this user".
        result = Memories.delete_memory_by_id_and_user_id(memory_id, user.id)
        if not result:
            return json.dumps({"error": "Memory not found or access denied"})

        # Then drop the matching entry from the user's vector collection.
        VECTOR_DB_CLIENT.delete(
            collection_name=f"user-memory-{user.id}", ids=[memory_id]
        )
        return json.dumps(
            {"status": "success", "message": f"Memory {memory_id} deleted"},
            ensure_ascii=False,
        )
    except Exception as e:
        log.exception(f"delete_memory error: {e}")
        return json.dumps({"error": str(e)})
async def list_memories(
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    List all stored memories for the user.

    :return: JSON list of all memories with id, content, and dates
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})
    # The listing is scoped by user id, so a missing user is reported explicitly
    # instead of failing later on `None.id`.
    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user = UserModel(**__user__)
        memories = Memories.get_memories_by_user_id(user.id)

        # A falsy result (None or empty) yields an empty JSON list.
        if not memories:
            return json.dumps([])

        # Timestamps are rendered in the server's local time zone.
        stamp_format = "%Y-%m-%d %H:%M"
        result = [
            {
                "id": m.id,
                "content": m.content,
                "created_at": time.strftime(stamp_format, time.localtime(m.created_at)),
                "updated_at": time.strftime(stamp_format, time.localtime(m.updated_at)),
            }
            for m in memories
        ]
        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        log.exception(f"list_memories error: {e}")
        return json.dumps({"error": str(e)})
2026-01-05 17:45:39 +04:00
# =============================================================================
# NOTES TOOLS
# =============================================================================
async def search_notes(
    query: str,
    count: int = 5,
    start_timestamp: Optional[int] = None,
    end_timestamp: Optional[int] = None,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the user's notes by title and content.

    :param query: The search query to find matching notes
    :param count: Maximum number of results to return (default: 5)
    :param start_timestamp: Only include notes updated after this Unix timestamp (seconds)
    :param end_timestamp: Only include notes updated before this Unix timestamp (seconds)
    :return: JSON with matching notes containing id, title, and content snippet
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    def _snippet_from(markdown):
        # Show the text around the first case-insensitive match, or the head
        # of the note when the query is not found in the body.
        match_at = markdown.lower().find(query.lower())
        if match_at == -1:
            return markdown[:150] + ("..." if len(markdown) > 150 else "")

        begin = max(0, match_at - 50)
        finish = min(len(markdown), match_at + len(query) + 100)
        prefix = "..." if begin > 0 else ""
        suffix = "..." if finish < len(markdown) else ""
        return prefix + markdown[begin:finish] + suffix

    try:
        user_id = __user__.get("id")
        group_ids = []
        for group in Groups.get_groups_by_member_id(user_id):
            group_ids.append(group.id)

        found = Notes.search_notes(
            user_id=user_id,
            filter={
                "query": query,
                "user_id": user_id,
                "group_ids": group_ids,
                "permission": "read",
            },
            skip=0,
            limit=count * 3,  # Fetch more for filtering
        )

        # Convert timestamps to nanoseconds for comparison
        lower_bound = start_timestamp * 1_000_000_000 if start_timestamp else None
        upper_bound = end_timestamp * 1_000_000_000 if end_timestamp else None

        matches = []
        for note in found.items:
            # Apply date filters (updated_at is in nanoseconds)
            too_old = lower_bound and note.updated_at < lower_bound
            if too_old:
                continue
            too_new = upper_bound and note.updated_at > upper_bound
            if too_new:
                continue

            # Extract a snippet from the markdown content
            snippet = ""
            if note.data and note.data.get("content", {}).get("md"):
                snippet = _snippet_from(note.data["content"]["md"])

            matches.append(
                {
                    "id": note.id,
                    "title": note.title,
                    "snippet": snippet,
                    "updated_at": note.updated_at,
                }
            )

            if len(matches) >= count:
                break

        return json.dumps(matches, ensure_ascii=False)
    except Exception as exc:
        log.exception(f"search_notes error: {exc}")
        return json.dumps({"error": str(exc)})
async def view_note(
    note_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full content of a note by its ID.

    :param note_id: The ID of the note to retrieve
    :return: JSON with the note's id, title, and full markdown content
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        note = Notes.get_note_by_id(note_id)
        if not note:
            return json.dumps({"error": "Note not found"})

        # Check access permission: the owner always may read; anyone else
        # needs a "read" grant, directly or through one of their groups.
        user_id = __user__.get("id")
        group_ids = []
        for group in Groups.get_groups_by_member_id(user_id):
            group_ids.append(group.id)

        from open_webui.models.access_grants import AccessGrants

        is_owner = note.user_id == user_id
        if not is_owner:
            may_read = AccessGrants.has_access(
                user_id=user_id,
                resource_type="note",
                resource_id=note.id,
                permission="read",
                user_group_ids=set(group_ids),
            )
            if not may_read:
                return json.dumps({"error": "Access denied"})

        # Extract markdown content (empty string when the note has none)
        has_markdown = note.data and note.data.get("content", {}).get("md")
        markdown = note.data["content"]["md"] if has_markdown else ""

        payload = {
            "id": note.id,
            "title": note.title,
            "content": markdown,
            "updated_at": note.updated_at,
            "created_at": note.created_at,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as exc:
        log.exception(f"view_note error: {exc}")
        return json.dumps({"error": str(exc)})
async def write_note(
    title: str,
    content: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Create a new note with the given title and content.

    :param title: The title of the new note
    :param content: The markdown content for the note
    :return: JSON with success status and new note id
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.notes import NoteForm

        owner_id = __user__.get("id")

        # Private by default - only owner can access (no access grants).
        note_form = NoteForm(
            title=title,
            data={"content": {"md": content}},
            access_grants=[],
        )

        created = Notes.insert_new_note(owner_id, note_form)
        if not created:
            return json.dumps({"error": "Failed to create note"})

        payload = {
            "status": "success",
            "id": created.id,
            "title": created.title,
            "created_at": created.created_at,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as exc:
        log.exception(f"write_note error: {exc}")
        return json.dumps({"error": str(exc)})
async def replace_note_content(
    note_id: str,
    content: str,
    title: Optional[str] = None,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Update the content of a note. Use this to modify task lists, add notes, or update content.

    :param note_id: The ID of the note to update
    :param content: The new markdown content for the note
    :param title: Optional new title for the note
    :return: JSON with success status and updated note info
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.notes import NoteUpdateForm

        note = Notes.get_note_by_id(note_id)
        if not note:
            return json.dumps({"error": "Note not found"})

        # Check write permission: the owner always may write; anyone else
        # needs a "write" grant, directly or through one of their groups.
        user_id = __user__.get("id")
        group_ids = []
        for group in Groups.get_groups_by_member_id(user_id):
            group_ids.append(group.id)

        from open_webui.models.access_grants import AccessGrants

        is_owner = note.user_id == user_id
        if not is_owner:
            may_write = AccessGrants.has_access(
                user_id=user_id,
                resource_type="note",
                resource_id=note.id,
                permission="write",
                user_group_ids=set(group_ids),
            )
            if not may_write:
                return json.dumps({"error": "Write access denied"})

        # Build update form; the title is only sent when a non-empty one was given.
        form_fields = {"data": {"content": {"md": content}}}
        if title:
            form_fields["title"] = title

        saved = Notes.update_note_by_id(note_id, NoteUpdateForm(**form_fields))
        if not saved:
            return json.dumps({"error": "Failed to update note"})

        payload = {
            "status": "success",
            "id": saved.id,
            "title": saved.title,
            "updated_at": saved.updated_at,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as exc:
        log.exception(f"replace_note_content error: {exc}")
        return json.dumps({"error": str(exc)})
# =============================================================================
# CHATS TOOLS
# =============================================================================
async def search_chats(
    query: str,
    count: int = 5,
    start_timestamp: Optional[int] = None,
    end_timestamp: Optional[int] = None,
    __request__: Request = None,
    __user__: dict = None,
    __chat_id__: str = None,
) -> str:
    """
    Search the user's previous chat conversations by title and message content.

    :param query: The search query to find matching chats
    :param count: Maximum number of results to return (default: 5)
    :param start_timestamp: Only include chats updated after this Unix timestamp (seconds)
    :param end_timestamp: Only include chats updated before this Unix timestamp (seconds)
    :return: JSON with matching chats containing id, title, updated_at, and content snippet
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")

        chats = Chats.get_chats_by_user_id_and_search_text(
            user_id=user_id,
            search_text=query,
            include_archived=False,
            skip=0,
            # Over-fetch: the current chat and the date filters below may
            # discard rows, and up to `count` results are still wanted.
            limit=count * 3,
        )

        # Loop-invariant: lower-case the query once instead of once per chat.
        lower_query = query.lower()

        results = []
        for chat in chats:
            # Skip the current chat to avoid showing it in search results
            if __chat_id__ and chat.id == __chat_id__:
                continue

            # Apply date filters (updated_at is in seconds)
            if start_timestamp and chat.updated_at < start_timestamp:
                continue
            if end_timestamp and chat.updated_at > end_timestamp:
                continue

            # Find a matching message snippet: first message containing the
            # query, with 50 chars of leading and 100 chars of trailing context.
            snippet = ""
            messages = chat.chat.get("history", {}).get("messages", {})
            for msg in messages.values():
                content = msg.get("content", "")
                if not isinstance(content, str):
                    continue

                idx = content.lower().find(lower_query)
                if idx == -1:
                    continue

                start = max(0, idx - 50)
                end = min(len(content), idx + len(query) + 100)
                snippet = (
                    ("..." if start > 0 else "")
                    + content[start:end]
                    + ("..." if end < len(content) else "")
                )
                break

            # Fall back to a title match; tolerate chats without a title so a
            # single untitled chat cannot fail the whole search.
            if not snippet and lower_query in (chat.title or "").lower():
                snippet = f"Title match: {chat.title}"

            results.append(
                {
                    "id": chat.id,
                    "title": chat.title,
                    "snippet": snippet,
                    "updated_at": chat.updated_at,
                }
            )

            if len(results) >= count:
                break

        return json.dumps(results, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_chats error: {e}")
        return json.dumps({"error": str(e)})
async def view_chat(
    chat_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full conversation history of a chat by its ID.

    :param chat_id: The ID of the chat to retrieve
    :return: JSON with the chat's id, title, and messages
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})
    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        requester_id = __user__.get("id")

        # The lookup takes the user id as well, so a miss is reported as
        # "not found or access denied" without distinguishing the two.
        chat = Chats.get_chat_by_id_and_user_id(chat_id, requester_id)
        if not chat:
            return json.dumps({"error": "Chat not found or access denied"})

        history = chat.chat.get("history", {})
        nodes = history.get("messages", {})

        # Walk parentId links starting at currentId. `seen` stops the walk if
        # the links ever loop back on themselves.
        trail = []
        seen = set()
        cursor = history.get("currentId")
        while cursor and cursor not in seen:
            seen.add(cursor)
            node = nodes.get(cursor)
            if not node:
                break
            trail.append(
                {
                    "role": node.get("role", ""),
                    "content": node.get("content", ""),
                }
            )
            cursor = node.get("parentId")

        payload = {
            "id": chat.id,
            "title": chat.title,
            # The walk collected newest-first; flip to chronological order.
            "messages": trail[::-1],
            "updated_at": chat.updated_at,
            "created_at": chat.created_at,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_chat error: {e}")
        return json.dumps({"error": str(e)})
# =============================================================================
# CHANNELS TOOLS
# =============================================================================
async def search_channels(
    query: str,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search for channels by name and description that the user has access to.

    :param query: The search query to find matching channels
    :param count: Maximum number of results to return (default: 5)
    :return: JSON with matching channels containing id, name, description, and type
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})
    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        member_id = __user__.get("id")

        # Candidate set: channels returned for this user id.
        candidates = Channels.get_channels_by_user_id(member_id)

        # Case-insensitive substring match on name or description.
        needle = query.lower()
        hits = []
        for channel in candidates:
            in_name = bool(channel.name) and needle in channel.name.lower()
            in_description = needle in (channel.description or "").lower()
            if not (in_name or in_description):
                continue

            hits.append(
                {
                    "id": channel.id,
                    "name": channel.name,
                    "description": channel.description or "",
                    "type": channel.type or "public",
                }
            )
            # Stop scanning as soon as enough matches were collected.
            if len(hits) >= count:
                break

        return json.dumps(hits, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_channels error: {e}")
        return json.dumps({"error": str(e)})
async def search_channel_messages(
    query: str,
    count: int = 10,
    start_timestamp: Optional[int] = None,
    end_timestamp: Optional[int] = None,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search for messages in channels the user is a member of, including thread replies.

    :param query: The search query to find matching messages
    :param count: Maximum number of results to return (default: 10)
    :param start_timestamp: Only include messages created after this Unix timestamp (seconds)
    :param end_timestamp: Only include messages created before this Unix timestamp (seconds)
    :return: JSON with matching messages containing channel info, message content, and thread context
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")

        # Get all channels the user has access to
        user_channels = Channels.get_channels_by_user_id(user_id)
        channel_map = {c.id: c for c in user_channels}
        channel_ids = list(channel_map)

        if not channel_ids:
            return json.dumps([])

        # Convert timestamps to nanoseconds (Message.created_at is in nanoseconds)
        start_ts = start_timestamp * 1_000_000_000 if start_timestamp else None
        end_ts = end_timestamp * 1_000_000_000 if end_timestamp else None

        # Search messages using the model method
        matching_messages = Messages.search_messages_by_channel_ids(
            channel_ids=channel_ids,
            query=query,
            start_timestamp=start_ts,
            end_timestamp=end_ts,
            limit=count,
        )

        # Loop-invariant: lower-case the query once, not once per message.
        lower_query = query.lower()

        results = []
        for msg in matching_messages:
            channel = channel_map.get(msg.channel_id)

            # Extract snippet around the match (50 chars before, 100 after);
            # if the query is not found verbatim, use the first 150 chars.
            content = msg.content or ""
            idx = content.lower().find(lower_query)
            if idx != -1:
                start = max(0, idx - 50)
                end = min(len(content), idx + len(query) + 100)
                snippet = (
                    ("..." if start > 0 else "")
                    + content[start:end]
                    + ("..." if end < len(content) else "")
                )
            else:
                snippet = content[:150] + ("..." if len(content) > 150 else "")

            results.append(
                {
                    "channel_id": msg.channel_id,
                    "channel_name": channel.name if channel else "Unknown",
                    "message_id": msg.id,
                    "content_snippet": snippet,
                    "is_thread_reply": msg.parent_id is not None,
                    "parent_id": msg.parent_id,
                    "created_at": msg.created_at,
                }
            )

        return json.dumps(results, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_channel_messages error: {e}")
        return json.dumps({"error": str(e)})
async def view_channel_message(
    message_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full content of a channel message by its ID, along with its thread metadata.

    :param message_id: The ID of the message to retrieve
    :return: JSON with the message content, channel info, and thread metadata (parent_id, reply_count); the replies themselves are not included
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")

        message = Messages.get_message_by_id(message_id)
        if not message:
            return json.dumps({"error": "Message not found"})

        # The channel record is needed for its name in the response.
        channel = Channels.get_channel_by_id(message.channel_id)
        if not channel:
            return json.dumps({"error": "Channel not found"})

        # Check if user has access to the channel: it must be among the
        # channels returned for this user id.
        user_channels = Channels.get_channels_by_user_id(user_id)
        if not any(c.id == message.channel_id for c in user_channels):
            return json.dumps({"error": "Access denied"})

        # Build response with thread information (metadata only: whether the
        # message is a reply, its parent, and how many replies it has).
        result = {
            "id": message.id,
            "channel_id": message.channel_id,
            "channel_name": channel.name,
            "content": message.content,
            "user_id": message.user_id,
            "is_thread_reply": message.parent_id is not None,
            "parent_id": message.parent_id,
            "reply_count": message.reply_count,
            "created_at": message.created_at,
            "updated_at": message.updated_at,
        }

        # Include user info if available
        if message.user:
            result["user_name"] = message.user.name

        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_channel_message error: {e}")
        return json.dumps({"error": str(e)})
async def view_channel_thread(
    parent_message_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get all messages in a channel thread, including the parent message and all replies.

    :param parent_message_id: The ID of the parent message that started the thread
    :return: JSON with the parent message and all thread replies in chronological order
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})
    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        requester_id = __user__.get("id")

        root = Messages.get_message_by_id(parent_message_id)
        if not root:
            return json.dumps({"error": "Message not found"})

        channel = Channels.get_channel_by_id(root.channel_id)
        if not channel:
            return json.dumps({"error": "Channel not found"})

        # Access rule: the thread's channel must be among the channels
        # returned for the requesting user.
        allowed_channel_ids = [
            c.id for c in Channels.get_channels_by_user_id(requester_id)
        ]
        if root.channel_id not in allowed_channel_ids:
            return json.dumps({"error": "Access denied"})

        replies = Messages.get_thread_replies_by_message_id(parent_message_id)

        # Parent first, then the replies in reversed order.
        # NOTE(review): the reversal presumably turns a newest-first result
        # into chronological order — confirm against the model method.
        thread = [
            {
                "id": root.id,
                "content": root.content,
                "user_id": root.user_id,
                "user_name": root.user.name if root.user else None,
                "is_parent": True,
                "created_at": root.created_at,
            },
            *[
                {
                    "id": reply.id,
                    "content": reply.content,
                    "user_id": reply.user_id,
                    "user_name": reply.user.name if reply.user else None,
                    "is_parent": False,
                    "reply_to_id": reply.reply_to_id,
                    "created_at": reply.created_at,
                }
                for reply in reversed(replies)
            ],
        ]

        payload = {
            "channel_id": root.channel_id,
            "channel_name": channel.name,
            "thread_id": parent_message_id,
            "message_count": len(thread),
            "messages": thread,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_channel_thread error: {e}")
        return json.dumps({"error": str(e)})
2026-01-07 08:58:58 -05:00
# =============================================================================
# KNOWLEDGE BASE TOOLS
# =============================================================================
async def list_knowledge_bases(
    count: int = 10,
    skip: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    List the user's accessible knowledge bases.

    :param count: Maximum number of KBs to return (default: 10)
    :param skip: Number of results to skip for pagination (default: 0)
    :return: JSON with KBs containing id, name, description, and file_count
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})
    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.knowledge import Knowledges

        user_id = __user__.get("id")
        group_ids = [g.id for g in Groups.get_groups_by_member_id(user_id)]

        # An empty query string is passed so no text filter narrows the page;
        # only the user/group filter and the skip/limit window apply.
        page = Knowledges.search_knowledge_bases(
            user_id,
            filter={"query": "", "user_id": user_id, "group_ids": group_ids},
            skip=skip,
            limit=count,
        )

        def summarize(kb):
            # One extra lookup per knowledge base to report its file count.
            attached = Knowledges.get_files_by_id(kb.id)
            return {
                "id": kb.id,
                "name": kb.name,
                "description": kb.description or "",
                "file_count": len(attached) if attached else 0,
                "updated_at": kb.updated_at,
            }

        summaries = [summarize(kb) for kb in page.items]
        return json.dumps(summaries, ensure_ascii=False)
    except Exception as e:
        log.exception(f"list_knowledge_bases error: {e}")
        return json.dumps({"error": str(e)})
2026-01-07 09:46:07 -05:00
2026-01-07 08:58:58 -05:00
async def search_knowledge_bases(
    query: str,
    count: int = 5,
    skip: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the user's accessible knowledge bases by name and description.

    :param query: The search query to find matching knowledge bases
    :param count: Maximum number of results to return (default: 5)
    :param skip: Number of results to skip for pagination (default: 0)
    :return: JSON with matching KBs containing id, name, description, and file_count
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})
    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.knowledge import Knowledges

        user_id = __user__.get("id")
        group_ids = [g.id for g in Groups.get_groups_by_member_id(user_id)]

        # Text matching and access filtering are delegated to the model
        # layer; this function only shapes the returned page.
        search_filter = {
            "query": query,
            "user_id": user_id,
            "group_ids": group_ids,
        }
        page = Knowledges.search_knowledge_bases(
            user_id,
            filter=search_filter,
            skip=skip,
            limit=count,
        )

        def summarize(kb):
            # One extra lookup per knowledge base to report its file count.
            attached = Knowledges.get_files_by_id(kb.id)
            return {
                "id": kb.id,
                "name": kb.name,
                "description": kb.description or "",
                "file_count": len(attached) if attached else 0,
                "updated_at": kb.updated_at,
            }

        summaries = [summarize(kb) for kb in page.items]
        return json.dumps(summaries, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_knowledge_bases error: {e}")
        return json.dumps({"error": str(e)})
async def search_knowledge_files(
    query: str,
    knowledge_id: Optional[str] = None,
    count: int = 5,
    skip: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search files across knowledge bases the user has access to.

    :param query: The search query to find matching files by filename
    :param knowledge_id: Optional KB id to limit search to a specific knowledge base
    :param count: Maximum number of results to return (default: 5)
    :param skip: Number of results to skip for pagination (default: 0)
    :return: JSON with matching files containing id, filename, and updated_at
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.knowledge import Knowledges

        user_id = __user__.get("id")

        if knowledge_id:
            # Search inside one knowledge base.
            # NOTE(review): access to `knowledge_id` is presumably enforced by
            # search_files_by_id via `user_id` — confirm in the model layer.
            result = Knowledges.search_files_by_id(
                knowledge_id=knowledge_id,
                user_id=user_id,
                filter={"query": query},
                skip=skip,
                limit=count,
            )
        else:
            # Group membership is only needed for the cross-KB search, so the
            # lookup is done here rather than unconditionally.
            user_group_ids = [
                group.id for group in Groups.get_groups_by_member_id(user_id)
            ]
            result = Knowledges.search_knowledge_files(
                filter={
                    "query": query,
                    "user_id": user_id,
                    "group_ids": user_group_ids,
                },
                skip=skip,
                limit=count,
            )

        files = []
        for file in result.items:
            file_info = {
                "id": file.id,
                "filename": file.filename,
                "updated_at": file.updated_at,
            }
            # Attach the owning knowledge base when the row carries one.
            collection = getattr(file, "collection", None)
            if collection:
                file_info["knowledge_id"] = collection.get("id", "")
                file_info["knowledge_name"] = collection.get("name", "")
            files.append(file_info)

        return json.dumps(files, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_knowledge_files error: {e}")
        return json.dumps({"error": str(e)})
2026-02-17 00:48:49 -06:00
async def view_file(
    file_id: str,
    __request__: Request = None,
    __user__: dict = None,
    __model_knowledge__: Optional[list[dict]] = None,
) -> str:
    """
    Get the full content of a file by its ID.

    :param file_id: The ID of the file to retrieve
    :return: JSON with the file's id, filename, and full text content
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})
    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.files import Files
        from open_webui.routers.files import has_access_to_file

        requester_id = __user__.get("id")
        requester_role = __user__.get("role", "user")

        file = Files.get_file_by_id(file_id)
        if not file:
            return json.dumps({"error": "File not found"})

        # Any one of these grants read access; they are evaluated in order and
        # short-circuit, so the access lookup only runs as a last resort.
        permitted = (
            file.user_id == requester_id
            or requester_role == "admin"
            # the file is attached directly to the model's knowledge
            or any(
                entry.get("type") == "file" and entry.get("id") == file_id
                for entry in (__model_knowledge__ or [])
            )
            or has_access_to_file(
                file_id=file_id,
                access_type="read",
                user=UserModel(**__user__),
            )
        )
        if not permitted:
            # Same message as a missing file, so the response does not reveal
            # whether the id exists.
            return json.dumps({"error": "File not found"})

        text = file.data.get("content", "") if file.data else ""

        payload = {
            "id": file.id,
            "filename": file.filename,
            "content": text,
            "updated_at": file.updated_at,
            "created_at": file.created_at,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_file error: {e}")
        return json.dumps({"error": str(e)})
2026-01-07 08:58:58 -05:00
async def view_knowledge_file(
    file_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full content of a file from a knowledge base.

    :param file_id: The ID of the file to retrieve
    :return: JSON with the file's id, filename, and full text content
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})
    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.files import Files
        from open_webui.models.knowledge import Knowledges
        from open_webui.models.access_grants import AccessGrants

        user_id = __user__.get("id")
        is_admin = __user__.get("role", "user") == "admin"
        group_ids = {g.id for g in Groups.get_groups_by_member_id(user_id)}

        file = Files.get_file_by_id(file_id)
        if not file:
            return json.dumps({"error": "File not found"})

        def grants_read(kb):
            # Admins and KB owners always qualify; everyone else needs a
            # "read" grant on the knowledge base.
            return (
                is_admin
                or kb.user_id == user_id
                or AccessGrants.has_access(
                    user_id=user_id,
                    resource_type="knowledge",
                    resource_id=kb.id,
                    permission="read",
                    user_group_ids=group_ids,
                )
            )

        # First knowledge base containing this file that the user may read.
        granting_kb = next(
            (
                kb
                for kb in Knowledges.get_knowledges_by_file_id(file_id)
                if grants_read(kb)
            ),
            None,
        )

        # Without KB-derived access, only the file owner or an admin may read.
        if granting_kb is None and file.user_id != user_id and not is_admin:
            return json.dumps({"error": "Access denied"})

        payload = {
            "id": file.id,
            "filename": file.filename,
            "content": file.data.get("content", "") if file.data else "",
            "updated_at": file.updated_at,
            "created_at": file.created_at,
        }
        if granting_kb is not None:
            payload["knowledge_id"] = granting_kb.id
            payload["knowledge_name"] = granting_kb.name

        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_knowledge_file error: {e}")
        return json.dumps({"error": str(e)})
2026-01-09 22:21:00 +04:00
async def query_knowledge_files(
    query: str,
    knowledge_ids: Optional[list[str]] = None,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
    __model_knowledge__: list[dict] = None,
) -> str:
    """
    Search knowledge base files using semantic/vector search. Searches across collections (KBs),
    individual files, and notes that the user has access to.

    :param query: The search query to find semantically relevant content
    :param knowledge_ids: Optional list of KB ids to limit search to specific knowledge bases
    :param count: Maximum number of results to return (default: 5)
    :return: JSON with relevant chunks containing content, source filename, and relevance score
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    # Coerce parameters from LLM tool calls (may come as strings)
    if not isinstance(count, int):
        try:
            count = int(count)
        except (TypeError, ValueError):
            count = 5  # Default fallback

    # Handle knowledge_ids being string "None", "null", or empty
    if isinstance(knowledge_ids, str):
        raw_ids = knowledge_ids.strip()
        if raw_ids.lower() in ("none", "null", ""):
            knowledge_ids = None
        else:
            # Try to parse as JSON array if it looks like one
            try:
                parsed_ids = json.loads(raw_ids)
            except json.JSONDecodeError:
                # Treat as single ID
                knowledge_ids = [raw_ids]
            else:
                if isinstance(parsed_ids, list):
                    knowledge_ids = parsed_ids
                elif isinstance(parsed_ids, str):
                    # JSON-quoted single ID, e.g. '"abc"'
                    knowledge_ids = [parsed_ids]
                else:
                    # Valid JSON that is not a list (number, object, ...):
                    # fall back to treating the raw text as one ID instead of
                    # iterating something that is not a list of IDs.
                    knowledge_ids = [raw_ids]

    try:
        from open_webui.models.knowledge import Knowledges
        from open_webui.models.files import Files
        from open_webui.models.notes import Notes
        from open_webui.retrieval.utils import query_collection
        from open_webui.models.access_grants import AccessGrants

        user_id = __user__.get("id")
        user_role = __user__.get("role", "user")
        user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]
        group_id_set = set(user_group_ids)

        embedding_function = __request__.app.state.EMBEDDING_FUNCTION
        if not embedding_function:
            return json.dumps({"error": "Embedding function not configured"})

        def can_read(resource_type, resource):
            # Admins and owners always qualify; otherwise a "read" grant is
            # required. Group ids are passed for every resource type so that
            # group-level grants are honoured for notes as well as KBs.
            return (
                user_role == "admin"
                or resource.user_id == user_id
                or AccessGrants.has_access(
                    user_id=user_id,
                    resource_type=resource_type,
                    resource_id=resource.id,
                    permission="read",
                    user_group_ids=group_id_set,
                )
            )

        collection_names = []
        note_results = []  # Notes aren't vectorized, handle separately

        # If model has attached knowledge, use those
        if __model_knowledge__:
            for item in __model_knowledge__:
                item_type = item.get("type")
                item_id = item.get("id")

                if item_type == "collection":
                    # Knowledge base - use KB ID as collection name
                    knowledge = Knowledges.get_knowledge_by_id(item_id)
                    if knowledge and can_read("knowledge", knowledge):
                        collection_names.append(item_id)

                elif item_type == "file":
                    # Individual file - use file-{id} as collection name
                    file = Files.get_file_by_id(item_id)
                    if file and (user_role == "admin" or file.user_id == user_id):
                        collection_names.append(f"file-{item_id}")

                elif item_type == "note":
                    # Note - always return full content as context
                    note = Notes.get_note_by_id(item_id)
                    if note and can_read("note", note):
                        # Tolerate notes with no data / no content section.
                        note_content = (note.data or {}).get("content") or {}
                        note_results.append(
                            {
                                "content": note_content.get("md", ""),
                                "source": note.title,
                                "note_id": note.id,
                                "type": "note",
                            }
                        )

        elif knowledge_ids:
            # User specified specific KBs
            for knowledge_id in knowledge_ids:
                knowledge = Knowledges.get_knowledge_by_id(knowledge_id)
                if knowledge and can_read("knowledge", knowledge):
                    collection_names.append(knowledge_id)

        else:
            # No model knowledge and no specific IDs - search all accessible KBs
            result = Knowledges.search_knowledge_bases(
                user_id,
                filter={
                    "query": "",
                    "user_id": user_id,
                    "group_ids": user_group_ids,
                },
                skip=0,
                limit=50,
            )
            collection_names = [knowledge_base.id for knowledge_base in result.items]

        # Add note results first
        chunks = list(note_results)

        # Query vector collections if any
        if collection_names:
            query_results = await query_collection(
                collection_names=collection_names,
                queries=[query],
                embedding_function=embedding_function,
                k=count,
            )

            if query_results and "documents" in query_results:
                # One query was sent, so only the first row of each field is
                # read; `or [[]]` guards against empty or missing rows.
                documents = (query_results.get("documents") or [[]])[0]
                metadatas = (query_results.get("metadatas") or [[]])[0]
                distances = (query_results.get("distances") or [[]])[0]

                for idx, doc in enumerate(documents):
                    # Metadata may be shorter than documents or hold None.
                    metadata = (metadatas[idx] if idx < len(metadatas) else None) or {}
                    chunk_info = {
                        "content": doc,
                        "source": metadata.get(
                            "source", metadata.get("name", "Unknown")
                        ),
                        "file_id": metadata.get("file_id", ""),
                    }
                    if idx < len(distances):
                        chunk_info["distance"] = distances[idx]
                    chunks.append(chunk_info)

        # Limit to requested count
        chunks = chunks[:count]

        return json.dumps(chunks, ensure_ascii=False)
    except Exception as e:
        log.exception(f"query_knowledge_files error: {e}")
        return json.dumps({"error": str(e)})
async def query_knowledge_bases(
    query: str,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search knowledge bases by semantic similarity to query.
    Finds KBs whose name/description match the meaning of your query.
    Use this to discover relevant knowledge bases before querying their files.

    :param query: Natural language query describing what you're looking for
    :param count: Maximum results (default: 5)
    :return: JSON with matching KBs (id, name, description, similarity)
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    # Coerce `count` from LLM tool calls (may come as a string)
    if not isinstance(count, int):
        try:
            count = int(count)
        except (TypeError, ValueError):
            count = 5  # Default fallback

    # Nothing was asked for. This also keeps the heap logic below from
    # peeking at an empty heap when count is 0 or negative.
    if count <= 0:
        return json.dumps([])

    try:
        # Check the cheap precondition first, before importing project
        # modules or touching the database.
        embedding_function = __request__.app.state.EMBEDDING_FUNCTION
        if not embedding_function:
            return json.dumps({"error": "Embedding function not configured"})

        import heapq
        from open_webui.models.knowledge import Knowledges
        from open_webui.routers.knowledge import KNOWLEDGE_BASES_COLLECTION
        from open_webui.retrieval.vector.factory import VECTOR_DB_CLIENT

        user_id = __user__.get("id")
        user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]
        query_embedding = await embedding_function(query)

        # Min-heap of (distance, knowledge_base_id) - only holds top `count` results.
        # NOTE(review): a larger `distance` is kept as the better match and is
        # reported as "similarity" — presumably the vector client returns
        # higher-is-better scores; confirm.
        top_results_heap = []
        seen_ids = set()
        page_offset = 0
        page_size = 100

        # Page through the accessible knowledge bases and run one filtered
        # vector search per page, merging each page's best hits into the heap.
        while True:
            accessible_knowledge_bases = Knowledges.search_knowledge_bases(
                user_id,
                filter={"user_id": user_id, "group_ids": user_group_ids},
                skip=page_offset,
                limit=page_size,
            )

            if not accessible_knowledge_bases.items:
                break

            accessible_ids = [kb.id for kb in accessible_knowledge_bases.items]

            search_results = VECTOR_DB_CLIENT.search(
                collection_name=KNOWLEDGE_BASES_COLLECTION,
                vectors=[query_embedding],
                filter={"knowledge_base_id": {"$in": accessible_ids}},
                limit=count,
            )

            if search_results and search_results.ids and search_results.ids[0]:
                result_ids = search_results.ids[0]
                # Without distances every hit is scored 0.
                result_distances = (
                    search_results.distances[0]
                    if search_results.distances
                    else [0] * len(result_ids)
                )

                for knowledge_base_id, distance in zip(result_ids, result_distances):
                    if knowledge_base_id in seen_ids:
                        continue
                    seen_ids.add(knowledge_base_id)

                    if len(top_results_heap) < count:
                        heapq.heappush(top_results_heap, (distance, knowledge_base_id))
                    elif distance > top_results_heap[0][0]:
                        # Better than the weakest kept result: swap it in.
                        heapq.heapreplace(
                            top_results_heap, (distance, knowledge_base_id)
                        )

            page_offset += page_size
            # A short page means the listing is exhausted.
            if len(accessible_knowledge_bases.items) < page_size:
                break
            # Hard cap on how many knowledge bases are scanned per call.
            if page_offset >= MAX_KNOWLEDGE_BASE_SEARCH_ITEMS:
                break

        # Sort by distance descending (best first) and fetch KB details
        sorted_results = sorted(top_results_heap, key=lambda x: x[0], reverse=True)

        matching_knowledge_bases = []
        for distance, knowledge_base_id in sorted_results:
            knowledge_base = Knowledges.get_knowledge_by_id(knowledge_base_id)
            if knowledge_base:
                matching_knowledge_bases.append(
                    {
                        "id": knowledge_base.id,
                        "name": knowledge_base.name,
                        "description": knowledge_base.description or "",
                        "similarity": round(distance, 4),
                    }
                )

        return json.dumps(matching_knowledge_bases, ensure_ascii=False)
    except Exception as e:
        log.exception(f"query_knowledge_bases error: {e}")
        return json.dumps({"error": str(e)})
2026-02-11 14:00:34 -06:00
# =============================================================================
# SKILLS TOOLS
# =============================================================================
async def view_skill(
    name: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Load the full instructions of a skill by its name from the available skills manifest.
    Use this when you need detailed instructions for a skill listed in <available_skills>.

    :param name: The name of the skill to load (as shown in the manifest)
    :return: The full skill instructions as markdown content
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})
    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.skills import Skills
        from open_webui.models.access_grants import AccessGrants

        requester_id = __user__.get("id")

        # Inactive skills are reported exactly like missing ones.
        skill = Skills.get_skill_by_name(name)
        if not (skill and skill.is_active):
            return json.dumps({"error": f"Skill '{name}' not found"})

        requester_role = __user__.get("role", "user")
        is_privileged = requester_role == "admin" or skill.user_id == requester_id

        # Admins and the skill's owner skip the grant lookup; everyone else
        # needs a "read" grant, directly or through one of their groups.
        if not is_privileged:
            group_ids = {
                group.id for group in Groups.get_groups_by_member_id(requester_id)
            }
            granted = AccessGrants.has_access(
                user_id=requester_id,
                resource_type="skill",
                resource_id=skill.id,
                permission="read",
                user_group_ids=group_ids,
            )
            if not granted:
                return json.dumps({"error": "Access denied"})

        payload = {"name": skill.name, "content": skill.content}
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_skill error: {e}")
        return json.dumps({"error": str(e)})