fix: prevent worker death during document upload by using run_coroutine_threadsafe (#21158)

* fix: prevent worker death during document upload by using run_coroutine_threadsafe

Replace asyncio.run() with asyncio.run_coroutine_threadsafe() in
save_docs_to_vector_db() to prevent uvicorn worker health check failures.

The issue: asyncio.run() creates a new event loop and blocks the thread
completely, preventing the worker from responding to health checks during
long-running embedding operations (>5 seconds default timeout).

The fix: Schedule the async embedding work on the main event loop using
run_coroutine_threadsafe(). This keeps the main loop responsive to health
check pings while the sync caller waits for the result.

Changes:
- main.py: Store main event loop reference in app.state.main_loop at startup
- retrieval.py: Use run_coroutine_threadsafe() instead of asyncio.run()

https://claude.ai/code/session_01UQSYvSTkXb57sFb7M85Kcw

* add env var

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Classic298
2026-02-12 22:22:57 +01:00
committed by GitHub
parent c6af296b60
commit 8cf32ae2a7
2 changed files with 13 additions and 3 deletions

View File

@@ -591,6 +591,10 @@ https://github.com/open-webui/open-webui
@asynccontextmanager
async def lifespan(app: FastAPI):
# Store reference to main event loop for sync->async calls (e.g., embedding generation)
# This allows sync functions to schedule work on the main loop without blocking health checks
app.state.main_loop = asyncio.get_running_loop()
app.state.instance_id = INSTANCE_ID
start_logger()

View File

@@ -1590,14 +1590,20 @@ def save_docs_to_vector_db(
enable_async=request.app.state.config.ENABLE_ASYNC_EMBEDDING,
)
# Run async embedding in sync context
embeddings = asyncio.run(
# Run async embedding in sync context using the main event loop
# This allows the main loop to stay responsive to health checks during long operations
embedding_timeout_str = os.environ.get("RAG_EMBEDDING_TIMEOUT")
embedding_timeout = int(embedding_timeout_str) if embedding_timeout_str else None
future = asyncio.run_coroutine_threadsafe(
embedding_function(
list(map(lambda x: x.replace("\n", " "), texts)),
prefix=RAG_EMBEDDING_CONTENT_PREFIX,
user=user,
)
),
request.app.state.main_loop,
)
embeddings = future.result(timeout=embedding_timeout)
log.info(f"embeddings generated {len(embeddings)} for {len(texts)} items")
items = [