From 5d4547f934b6fbe751bb2041f9597fe11ddf8e43 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Sat, 21 Feb 2026 14:33:48 -0600 Subject: [PATCH] enh: RAG_EMBEDDING_CONCURRENT_REQUESTS --- backend/open_webui/config.py | 6 ++++ backend/open_webui/main.py | 3 ++ backend/open_webui/retrieval/utils.py | 21 +++++++++++--- backend/open_webui/routers/retrieval.py | 13 +++++++-- .../admin/Settings/Documents.svelte | 28 ++++++++++++++++++- 5 files changed, 64 insertions(+), 7 deletions(-) diff --git a/backend/open_webui/config.py b/backend/open_webui/config.py index 9ec08cc4c3..af7234bb2c 100644 --- a/backend/open_webui/config.py +++ b/backend/open_webui/config.py @@ -2903,6 +2903,12 @@ ENABLE_ASYNC_EMBEDDING = PersistentConfig( os.environ.get("ENABLE_ASYNC_EMBEDDING", "True").lower() == "true", ) +RAG_EMBEDDING_CONCURRENT_REQUESTS = PersistentConfig( + "RAG_EMBEDDING_CONCURRENT_REQUESTS", + "rag.embedding_concurrent_requests", + int(os.getenv("RAG_EMBEDDING_CONCURRENT_REQUESTS", "0")), +) + RAG_EMBEDDING_QUERY_PREFIX = os.environ.get("RAG_EMBEDDING_QUERY_PREFIX", None) RAG_EMBEDDING_CONTENT_PREFIX = os.environ.get("RAG_EMBEDDING_CONTENT_PREFIX", None) diff --git a/backend/open_webui/main.py b/backend/open_webui/main.py index 3f1536244d..693a611266 100644 --- a/backend/open_webui/main.py +++ b/backend/open_webui/main.py @@ -240,6 +240,7 @@ from open_webui.config import ( RAG_EMBEDDING_ENGINE, RAG_EMBEDDING_BATCH_SIZE, ENABLE_ASYNC_EMBEDDING, + RAG_EMBEDDING_CONCURRENT_REQUESTS, RAG_TOP_K, RAG_TOP_K_RERANKER, RAG_RELEVANCE_THRESHOLD, @@ -980,6 +981,7 @@ app.state.config.RAG_EMBEDDING_ENGINE = RAG_EMBEDDING_ENGINE app.state.config.RAG_EMBEDDING_MODEL = RAG_EMBEDDING_MODEL app.state.config.RAG_EMBEDDING_BATCH_SIZE = RAG_EMBEDDING_BATCH_SIZE app.state.config.ENABLE_ASYNC_EMBEDDING = ENABLE_ASYNC_EMBEDDING +app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS = RAG_EMBEDDING_CONCURRENT_REQUESTS app.state.config.RAG_RERANKING_ENGINE = RAG_RERANKING_ENGINE app.state.config.RAG_RERANKING_MODEL = RAG_RERANKING_MODEL @@ -1133,6 +1135,7 @@ app.state.EMBEDDING_FUNCTION = get_embedding_function( else None ), enable_async=app.state.config.ENABLE_ASYNC_EMBEDDING, + concurrent_requests=app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS, ) app.state.RERANKING_FUNCTION = get_reranking_function( diff --git a/backend/open_webui/retrieval/utils.py b/backend/open_webui/retrieval/utils.py index 536ddb4c32..33d3779a0c 100644 --- a/backend/open_webui/retrieval/utils.py +++ b/backend/open_webui/retrieval/utils.py @@ -803,6 +803,7 @@ def get_embedding_function( embedding_batch_size, azure_api_version=None, enable_async=True, + concurrent_requests=0, ) -> Awaitable: if embedding_engine == "": # Sentence transformers: CPU-bound sync operation @@ -844,10 +845,22 @@ def get_embedding_function( log.debug( f"generate_multiple_async: Processing {len(batches)} batches in parallel" ) - # Execute all batches in parallel - tasks = [ - embedding_function(batch, prefix=prefix, user=user) - for batch in batches + # Use semaphore to limit concurrent embedding API requests + # 0 = unlimited (no semaphore) + if concurrent_requests: + semaphore = asyncio.Semaphore(concurrent_requests) + + async def generate_batch_with_semaphore(batch): + async with semaphore: + return await embedding_function( + batch, prefix=prefix, user=user + ) + + tasks = [generate_batch_with_semaphore(batch) for batch in batches] + else: + tasks = [ + embedding_function(batch, prefix=prefix, user=user) + for batch in batches ] batch_results = await asyncio.gather(*tasks) else: diff --git a/backend/open_webui/routers/retrieval.py b/backend/open_webui/routers/retrieval.py index b16bc022ee..9630462613 100644 --- a/backend/open_webui/routers/retrieval.py +++ b/backend/open_webui/routers/retrieval.py @@ -270,6 +270,7 @@ async def get_status(request: Request): "RAG_RERANKING_MODEL": request.app.state.config.RAG_RERANKING_MODEL, "RAG_EMBEDDING_BATCH_SIZE": request.app.state.config.RAG_EMBEDDING_BATCH_SIZE, "ENABLE_ASYNC_EMBEDDING": request.app.state.config.ENABLE_ASYNC_EMBEDDING, + "RAG_EMBEDDING_CONCURRENT_REQUESTS": request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS, } @@ -281,6 +282,7 @@ async def get_embedding_config(request: Request, user=Depends(get_admin_user)): "RAG_EMBEDDING_MODEL": request.app.state.config.RAG_EMBEDDING_MODEL, "RAG_EMBEDDING_BATCH_SIZE": request.app.state.config.RAG_EMBEDDING_BATCH_SIZE, "ENABLE_ASYNC_EMBEDDING": request.app.state.config.ENABLE_ASYNC_EMBEDDING, + "RAG_EMBEDDING_CONCURRENT_REQUESTS": request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS, "openai_config": { "url": request.app.state.config.RAG_OPENAI_API_BASE_URL, "key": request.app.state.config.RAG_OPENAI_API_KEY, @@ -321,6 +323,7 @@ class EmbeddingModelUpdateForm(BaseModel): RAG_EMBEDDING_MODEL: str RAG_EMBEDDING_BATCH_SIZE: Optional[int] = 1 ENABLE_ASYNC_EMBEDDING: Optional[bool] = True + RAG_EMBEDDING_CONCURRENT_REQUESTS: Optional[int] = 0 def unload_embedding_model(request: Request): @@ -355,6 +358,9 @@ async def update_embedding_config( request.app.state.config.ENABLE_ASYNC_EMBEDDING = ( form_data.ENABLE_ASYNC_EMBEDDING ) + request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS = ( + form_data.RAG_EMBEDDING_CONCURRENT_REQUESTS + ) if request.app.state.config.RAG_EMBEDDING_ENGINE in [ "ollama", @@ -422,6 +428,7 @@ async def update_embedding_config( else None ), enable_async=request.app.state.config.ENABLE_ASYNC_EMBEDDING, + concurrent_requests=request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS, ) return { @@ -430,6 +437,7 @@ async def update_embedding_config( "RAG_EMBEDDING_MODEL": request.app.state.config.RAG_EMBEDDING_MODEL, "RAG_EMBEDDING_BATCH_SIZE": request.app.state.config.RAG_EMBEDDING_BATCH_SIZE, "ENABLE_ASYNC_EMBEDDING": request.app.state.config.ENABLE_ASYNC_EMBEDDING, + "RAG_EMBEDDING_CONCURRENT_REQUESTS": request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS, "openai_config": { "url": request.app.state.config.RAG_OPENAI_API_BASE_URL, "key": request.app.state.config.RAG_OPENAI_API_KEY, @@ -1589,6 +1597,7 @@ def save_docs_to_vector_db( else None ), enable_async=request.app.state.config.ENABLE_ASYNC_EMBEDDING, + concurrent_requests=request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS, ) # Run async embedding in sync context using the main event loop @@ -2329,7 +2338,7 @@ async def process_web_search( # Limited concurrency with semaphore semaphore = asyncio.Semaphore(concurrent_limit) - async def search_with_limit(query): + async def search_query_with_semaphore(query): async with semaphore: return await run_in_threadpool( search_web, @@ -2339,7 +2348,7 @@ async def process_web_search( user, ) - search_tasks = [search_with_limit(query) for query in form_data.queries] + search_tasks = [search_query_with_semaphore(query) for query in form_data.queries] else: # Unlimited parallel execution (previous behavior) search_tasks = [ diff --git a/src/lib/components/admin/Settings/Documents.svelte b/src/lib/components/admin/Settings/Documents.svelte index 3f4434084b..ca0d86ebbd 100644 --- a/src/lib/components/admin/Settings/Documents.svelte +++ b/src/lib/components/admin/Settings/Documents.svelte @@ -42,6 +42,7 @@ let RAG_EMBEDDING_MODEL = ''; let RAG_EMBEDDING_BATCH_SIZE = 1; let ENABLE_ASYNC_EMBEDDING = true; + let RAG_EMBEDDING_CONCURRENT_REQUESTS = 0; let rerankingModel = ''; @@ -104,7 +105,8 @@ RAG_EMBEDDING_ENGINE, RAG_EMBEDDING_MODEL, RAG_EMBEDDING_BATCH_SIZE, - ENABLE_ASYNC_EMBEDDING + ENABLE_ASYNC_EMBEDDING, + RAG_EMBEDDING_CONCURRENT_REQUESTS }); updateEmbeddingModelLoading = true; @@ -113,6 +115,7 @@ RAG_EMBEDDING_MODEL: RAG_EMBEDDING_MODEL, RAG_EMBEDDING_BATCH_SIZE: RAG_EMBEDDING_BATCH_SIZE, ENABLE_ASYNC_EMBEDDING: ENABLE_ASYNC_EMBEDDING, + RAG_EMBEDDING_CONCURRENT_REQUESTS: RAG_EMBEDDING_CONCURRENT_REQUESTS, ollama_config: { key: OllamaKey, url: OllamaUrl @@ -241,6 +244,7 @@ RAG_EMBEDDING_MODEL = embeddingConfig.RAG_EMBEDDING_MODEL; RAG_EMBEDDING_BATCH_SIZE = embeddingConfig.RAG_EMBEDDING_BATCH_SIZE ?? 1; ENABLE_ASYNC_EMBEDDING = embeddingConfig.ENABLE_ASYNC_EMBEDDING ?? true; + RAG_EMBEDDING_CONCURRENT_REQUESTS = embeddingConfig.RAG_EMBEDDING_CONCURRENT_REQUESTS ?? 0; OpenAIKey = embeddingConfig.openai_config.key; OpenAIUrl = embeddingConfig.openai_config.url; @@ -1043,6 +1047,28 @@ + +
+
+ + {$i18n.t('Embedding Concurrent Requests')} + +
+
+ +
+
{/if}