enh: RAG_EMBEDDING_CONCURRENT_REQUESTS

This commit is contained in:
Timothy Jaeryang Baek
2026-02-21 14:33:48 -06:00
parent 5522b91c32
commit 5d4547f934
5 changed files with 64 additions and 7 deletions

View File

@@ -2903,6 +2903,12 @@ ENABLE_ASYNC_EMBEDDING = PersistentConfig(
os.environ.get("ENABLE_ASYNC_EMBEDDING", "True").lower() == "true",
)
RAG_EMBEDDING_CONCURRENT_REQUESTS = PersistentConfig(
"RAG_EMBEDDING_CONCURRENT_REQUESTS",
"rag.embedding_concurrent_requests",
int(os.getenv("RAG_EMBEDDING_CONCURRENT_REQUESTS", "0")),
)
RAG_EMBEDDING_QUERY_PREFIX = os.environ.get("RAG_EMBEDDING_QUERY_PREFIX", None)
RAG_EMBEDDING_CONTENT_PREFIX = os.environ.get("RAG_EMBEDDING_CONTENT_PREFIX", None)

View File

@@ -240,6 +240,7 @@ from open_webui.config import (
RAG_EMBEDDING_ENGINE,
RAG_EMBEDDING_BATCH_SIZE,
ENABLE_ASYNC_EMBEDDING,
RAG_EMBEDDING_CONCURRENT_REQUESTS,
RAG_TOP_K,
RAG_TOP_K_RERANKER,
RAG_RELEVANCE_THRESHOLD,
@@ -980,6 +981,7 @@ app.state.config.RAG_EMBEDDING_ENGINE = RAG_EMBEDDING_ENGINE
app.state.config.RAG_EMBEDDING_MODEL = RAG_EMBEDDING_MODEL
app.state.config.RAG_EMBEDDING_BATCH_SIZE = RAG_EMBEDDING_BATCH_SIZE
app.state.config.ENABLE_ASYNC_EMBEDDING = ENABLE_ASYNC_EMBEDDING
app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS = RAG_EMBEDDING_CONCURRENT_REQUESTS
app.state.config.RAG_RERANKING_ENGINE = RAG_RERANKING_ENGINE
app.state.config.RAG_RERANKING_MODEL = RAG_RERANKING_MODEL
@@ -1133,6 +1135,7 @@ app.state.EMBEDDING_FUNCTION = get_embedding_function(
else None
),
enable_async=app.state.config.ENABLE_ASYNC_EMBEDDING,
concurrent_requests=app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS,
)
app.state.RERANKING_FUNCTION = get_reranking_function(

View File

@@ -803,6 +803,7 @@ def get_embedding_function(
embedding_batch_size,
azure_api_version=None,
enable_async=True,
concurrent_requests=0,
) -> Awaitable:
if embedding_engine == "":
# Sentence transformers: CPU-bound sync operation
@@ -844,10 +845,22 @@ def get_embedding_function(
log.debug(
f"generate_multiple_async: Processing {len(batches)} batches in parallel"
)
# Execute all batches in parallel
tasks = [
embedding_function(batch, prefix=prefix, user=user)
for batch in batches
# Use semaphore to limit concurrent embedding API requests
# 0 = unlimited (no semaphore)
if concurrent_requests:
semaphore = asyncio.Semaphore(concurrent_requests)
async def generate_batch_with_semaphore(batch):
async with semaphore:
return await embedding_function(
batch, prefix=prefix, user=user
)
tasks = [generate_batch_with_semaphore(batch) for batch in batches]
else:
tasks = [
embedding_function(batch, prefix=prefix, user=user)
for batch in batches
]
batch_results = await asyncio.gather(*tasks)
else:

View File

@@ -270,6 +270,7 @@ async def get_status(request: Request):
"RAG_RERANKING_MODEL": request.app.state.config.RAG_RERANKING_MODEL,
"RAG_EMBEDDING_BATCH_SIZE": request.app.state.config.RAG_EMBEDDING_BATCH_SIZE,
"ENABLE_ASYNC_EMBEDDING": request.app.state.config.ENABLE_ASYNC_EMBEDDING,
"RAG_EMBEDDING_CONCURRENT_REQUESTS": request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS,
}
@@ -281,6 +282,7 @@ async def get_embedding_config(request: Request, user=Depends(get_admin_user)):
"RAG_EMBEDDING_MODEL": request.app.state.config.RAG_EMBEDDING_MODEL,
"RAG_EMBEDDING_BATCH_SIZE": request.app.state.config.RAG_EMBEDDING_BATCH_SIZE,
"ENABLE_ASYNC_EMBEDDING": request.app.state.config.ENABLE_ASYNC_EMBEDDING,
"RAG_EMBEDDING_CONCURRENT_REQUESTS": request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS,
"openai_config": {
"url": request.app.state.config.RAG_OPENAI_API_BASE_URL,
"key": request.app.state.config.RAG_OPENAI_API_KEY,
@@ -321,6 +323,7 @@ class EmbeddingModelUpdateForm(BaseModel):
RAG_EMBEDDING_MODEL: str
RAG_EMBEDDING_BATCH_SIZE: Optional[int] = 1
ENABLE_ASYNC_EMBEDDING: Optional[bool] = True
RAG_EMBEDDING_CONCURRENT_REQUESTS: Optional[int] = 0
def unload_embedding_model(request: Request):
@@ -355,6 +358,9 @@ async def update_embedding_config(
request.app.state.config.ENABLE_ASYNC_EMBEDDING = (
form_data.ENABLE_ASYNC_EMBEDDING
)
request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS = (
form_data.RAG_EMBEDDING_CONCURRENT_REQUESTS
)
if request.app.state.config.RAG_EMBEDDING_ENGINE in [
"ollama",
@@ -422,6 +428,7 @@ async def update_embedding_config(
else None
),
enable_async=request.app.state.config.ENABLE_ASYNC_EMBEDDING,
concurrent_requests=request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS,
)
return {
@@ -430,6 +437,7 @@ async def update_embedding_config(
"RAG_EMBEDDING_MODEL": request.app.state.config.RAG_EMBEDDING_MODEL,
"RAG_EMBEDDING_BATCH_SIZE": request.app.state.config.RAG_EMBEDDING_BATCH_SIZE,
"ENABLE_ASYNC_EMBEDDING": request.app.state.config.ENABLE_ASYNC_EMBEDDING,
"RAG_EMBEDDING_CONCURRENT_REQUESTS": request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS,
"openai_config": {
"url": request.app.state.config.RAG_OPENAI_API_BASE_URL,
"key": request.app.state.config.RAG_OPENAI_API_KEY,
@@ -1589,6 +1597,7 @@ def save_docs_to_vector_db(
else None
),
enable_async=request.app.state.config.ENABLE_ASYNC_EMBEDDING,
concurrent_requests=request.app.state.config.RAG_EMBEDDING_CONCURRENT_REQUESTS,
)
# Run async embedding in sync context using the main event loop
@@ -2329,7 +2338,7 @@ async def process_web_search(
# Limited concurrency with semaphore
semaphore = asyncio.Semaphore(concurrent_limit)
async def search_with_limit(query):
async def search_query_with_semaphore(query):
async with semaphore:
return await run_in_threadpool(
search_web,
@@ -2339,7 +2348,7 @@ async def process_web_search(
user,
)
search_tasks = [search_with_limit(query) for query in form_data.queries]
search_tasks = [search_query_with_semaphore(query) for query in form_data.queries]
else:
# Unlimited parallel execution (previous behavior)
search_tasks = [

View File

@@ -42,6 +42,7 @@
let RAG_EMBEDDING_MODEL = '';
let RAG_EMBEDDING_BATCH_SIZE = 1;
let ENABLE_ASYNC_EMBEDDING = true;
let RAG_EMBEDDING_CONCURRENT_REQUESTS = 0;
let rerankingModel = '';
@@ -104,7 +105,8 @@
RAG_EMBEDDING_ENGINE,
RAG_EMBEDDING_MODEL,
RAG_EMBEDDING_BATCH_SIZE,
ENABLE_ASYNC_EMBEDDING
ENABLE_ASYNC_EMBEDDING,
RAG_EMBEDDING_CONCURRENT_REQUESTS
});
updateEmbeddingModelLoading = true;
@@ -113,6 +115,7 @@
RAG_EMBEDDING_MODEL: RAG_EMBEDDING_MODEL,
RAG_EMBEDDING_BATCH_SIZE: RAG_EMBEDDING_BATCH_SIZE,
ENABLE_ASYNC_EMBEDDING: ENABLE_ASYNC_EMBEDDING,
RAG_EMBEDDING_CONCURRENT_REQUESTS: RAG_EMBEDDING_CONCURRENT_REQUESTS,
ollama_config: {
key: OllamaKey,
url: OllamaUrl
@@ -241,6 +244,7 @@
RAG_EMBEDDING_MODEL = embeddingConfig.RAG_EMBEDDING_MODEL;
RAG_EMBEDDING_BATCH_SIZE = embeddingConfig.RAG_EMBEDDING_BATCH_SIZE ?? 1;
ENABLE_ASYNC_EMBEDDING = embeddingConfig.ENABLE_ASYNC_EMBEDDING ?? true;
RAG_EMBEDDING_CONCURRENT_REQUESTS = embeddingConfig.RAG_EMBEDDING_CONCURRENT_REQUESTS ?? 0;
OpenAIKey = embeddingConfig.openai_config.key;
OpenAIUrl = embeddingConfig.openai_config.url;
@@ -1043,6 +1047,28 @@
<Switch bind:state={ENABLE_ASYNC_EMBEDDING} />
</div>
</div>
<div class=" mb-2.5 flex w-full justify-between">
<div class="self-center text-xs font-medium">
<Tooltip
content={$i18n.t(
'Limits the number of concurrent embedding requests. Set to 0 for unlimited.'
)}
placement="top-start"
>
{$i18n.t('Embedding Concurrent Requests')}
</Tooltip>
</div>
<div class="">
<input
bind:value={RAG_EMBEDDING_CONCURRENT_REQUESTS}
type="number"
class=" bg-transparent text-center w-14 outline-none"
min="0"
step="1"
/>
</div>
</div>
{/if}
</div>