From ea9c58ea80646cef05e06d0beaf5e81cc2f78cb1 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Sun, 1 Feb 2026 19:39:28 -0600 Subject: [PATCH] feat: experimental responses api support --- backend/open_webui/routers/openai.py | 92 ++- backend/open_webui/utils/middleware.py | 634 +++++++++++++++---- src/lib/components/AddConnectionModal.svelte | 46 +- 3 files changed, 644 insertions(+), 128 deletions(-) diff --git a/backend/open_webui/routers/openai.py b/backend/open_webui/routers/openai.py index 44575e57f2..b1d31afb8f 100644 --- a/backend/open_webui/routers/openai.py +++ b/backend/open_webui/routers/openai.py @@ -794,6 +794,79 @@ def convert_to_azure_payload(url, payload: dict, api_version: str): return url, payload +def convert_to_responses_payload(payload: dict) -> dict: + """ + Convert Chat Completions payload to Responses API format. + + Chat Completions: { messages: [{role, content}], ... } + Responses API: { input: [{type: "message", role, content: [...]}], instructions: "system" } + """ + messages = payload.pop("messages", []) + + system_content = "" + input_items = [] + + for msg in messages: + role = msg.get("role", "user") + content = msg.get("content", "") + + # Check for stored output items (from previous Responses API turn) + stored_output = msg.get("output") + if stored_output and isinstance(stored_output, list): + input_items.extend(stored_output) + continue + + if role == "system": + if isinstance(content, str): + system_content = content + elif isinstance(content, list): + system_content = "\n".join(p.get("text", "") for p in content if p.get("type") == "text") + continue + + # Convert content format + text_type = "output_text" if role == "assistant" else "input_text" + + if isinstance(content, str): + content_parts = [{"type": text_type, "text": content}] + elif isinstance(content, list): + content_parts = [] + for part in content: + if part.get("type") == "text": + content_parts.append({"type": text_type, "text": part.get("text", "")}) + elif part.get("type") == "image_url": + url_data = part.get("image_url", {}) + url = url_data.get("url", "") if isinstance(url_data, dict) else url_data + content_parts.append({"type": "input_image", "image_url": url}) + else: + content_parts = [{"type": text_type, "text": str(content)}] + + input_items.append({ + "type": "message", + "role": role, + "content": content_parts + }) + + responses_payload = {**payload, "input": input_items} + + if system_content: + responses_payload["instructions"] = system_content + + if "max_tokens" in responses_payload: + responses_payload["max_output_tokens"] = responses_payload.pop("max_tokens") + + return responses_payload + + + +def convert_responses_result(response: dict) -> dict: + """ + Convert non-streaming Responses API result. + Just add done flag - pass through raw response, frontend handles output. 
+ """ + response["done"] = True + return response + + @router.post("/chat/completions") async def generate_chat_completion( request: Request, @@ -915,6 +988,8 @@ async def generate_chat_completion( request, url, key, api_config, metadata, user=user ) + is_responses = api_config.get("api_type") == "responses" + if api_config.get("azure", False): api_version = api_config.get("api_version", "2023-03-15-preview") request_url, payload = convert_to_azure_payload(url, payload, api_version) @@ -925,9 +1000,18 @@ async def generate_chat_completion( headers["api-key"] = key headers["api-version"] = api_version - request_url = f"{request_url}/chat/completions?api-version={api_version}" + + if is_responses: + payload = convert_to_responses_payload(payload) + request_url = f"{request_url}/responses?api-version={api_version}" + else: + request_url = f"{request_url}/chat/completions?api-version={api_version}" else: - request_url = f"{url}/chat/completions" + if is_responses: + payload = convert_to_responses_payload(payload) + request_url = f"{url}/responses" + else: + request_url = f"{url}/chat/completions" payload = json.dumps(payload) @@ -974,6 +1058,10 @@ async def generate_chat_completion( else: return PlainTextResponse(status_code=r.status, content=response) + # Convert Responses API result to simple format + if is_responses and isinstance(response, dict): + response = convert_responses_result(response) + return response except Exception as e: log.exception(e) diff --git a/backend/open_webui/utils/middleware.py b/backend/open_webui/utils/middleware.py index 81e07df94e..fc2465f1a8 100644 --- a/backend/open_webui/utils/middleware.py +++ b/backend/open_webui/utils/middleware.py @@ -293,6 +293,472 @@ def get_citation_source_from_tool_result( ] +def split_content_and_whitespace(content): + content_stripped = content.rstrip() + original_whitespace = ( + content[len(content_stripped) :] if len(content) > len(content_stripped) else "" + ) + return content_stripped, original_whitespace + + +def is_opening_code_block(content): + backtick_segments = content.split("```") + # Even number of segments means the last backticks are opening a new block + return len(backtick_segments) > 1 and len(backtick_segments) % 2 == 0 + + +def serialize_output(output: list) -> str: + """ + Convert OR-aligned output items to HTML for display. + For LLM consumption, use convert_output_to_messages() instead. + """ + content = "" + + # First pass: collect function_call_output items by call_id for lookup + tool_outputs = {} + for item in output: + if item.get("type") == "function_call_output": + tool_outputs[item.get("call_id")] = item + + # Second pass: render items in order + for item in output: + item_type = item.get("type", "") + + if item_type == "message": + for content_part in item.get("content", []): + if "text" in content_part: + text = content_part.get("text", "").strip() + if text: + content = f"{content}{text}\n" + + elif item_type == "function_call": + # Render tool call inline with its result (if available) + if content and not content.endswith("\n"): + content += "\n" + + call_id = item.get("call_id", "") + name = item.get("name", "") + arguments = item.get("arguments", "") + + result_item = tool_outputs.get(call_id) + if result_item: + result_text = "" + for out in result_item.get("output", []): + if "text" in out: + result_text += out.get("text", "") + files = result_item.get("files") + embeds = result_item.get("embeds", "") + + content += f'
<details type="tool_calls" done="true" id="{call_id}" name="{name}" arguments="{html.escape(json.dumps(arguments))}" result="{html.escape(json.dumps(result_text))}" files="{html.escape(json.dumps(files)) if files else ""}" embeds="{html.escape(json.dumps(embeds)) if embeds else ""}">\n<summary>Tool Executed</summary>\n</details>
\n' + else: + content += f'
<details type="tool_calls" done="false" id="{call_id}" name="{name}" arguments="{html.escape(json.dumps(arguments))}">\n<summary>Executing...</summary>\n</details>
\n' + + elif item_type == "function_call_output": + # Already handled inline with function_call above + pass + + elif item_type == "reasoning": + reasoning_content = "" + # Check for 'summary' (new structure) or 'content' (legacy/fallback) + source_list = item.get("summary", []) or item.get("content", []) + for content_part in source_list: + if "text" in content_part: + reasoning_content += content_part.get("text", "") + elif "summary" in content_part: # Handle potential nested logic if any + pass + + reasoning_content = reasoning_content.strip() + + duration = item.get("duration") + status = item.get("status", "in_progress") + + if content and not content.endswith("\n"): + content += "\n" + + display = html.escape( + "\n".join( + (f"> {line}" if not line.startswith(">") else line) + for line in reasoning_content.splitlines() + ) + ) + + if status == "completed" or duration is not None: + content = f'{content}
<details type="reasoning" done="true" duration="{duration or 0}">\n<summary>Thought for {duration or 0} seconds</summary>\n{display}\n</details>
\n' + else: + content = f'{content}
<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{display}\n</details>
\n' + + elif item_type == "open_webui:code_interpreter": + content_stripped, original_whitespace = split_content_and_whitespace( + content + ) + if is_opening_code_block(content_stripped): + content = content_stripped.rstrip("`").rstrip() + original_whitespace + else: + content = content_stripped + original_whitespace + + if content and not content.endswith("\n"): + content += "\n" + + return content.strip() + + +def deep_merge(target, source): + """ + Merge source into target recursively (returning new structure). + - Dicts: Recursive merge. + - Strings: Concatenation. + - Others: Overwrite. + """ + if isinstance(target, dict) and isinstance(source, dict): + new_target = target.copy() + for k, v in source.items(): + if k in new_target: + new_target[k] = deep_merge(new_target[k], v) + else: + new_target[k] = v + return new_target + elif isinstance(target, str) and isinstance(source, str): + return target + source + else: + return source + + +def handle_responses_streaming_event( + data: dict, + current_output: list, +) -> tuple[list, dict | None]: + """ + Handle Responses API streaming events in a pure functional way. + + Args: + data: The event data + current_output: List of output items (treated as immutable) + + Returns: + tuple[list, dict | None]: (new_output, metadata) + - new_output: The updated output list. + - metadata: Metadata to emit (e.g. usage), {} if update occurred, None if skip. + """ + # Default: no change + # Note: treating current_output as immutable, but avoiding full deepcopy for perf. + # We will shallow copy only if we need to modify the list structure or items. + + event_type = data.get("type", "") + + if event_type == "response.output_item.added": + item = data.get("item", {}) + if item: + new_output = list(current_output) + new_output.append(item) + return new_output, None + return current_output, None + + elif event_type == "response.content_part.added": + part = data.get("part", {}) + output_index = data.get("output_index", len(current_output) - 1) + + if current_output and 0 <= output_index < len(current_output): + new_output = list(current_output) + # Copy the item to mutate it + item = new_output[output_index].copy() + new_output[output_index] = item + + if "content" not in item: + item["content"] = [] + else: + # Copy content list + item["content"] = list(item["content"]) + + if item.get("type") == "reasoning": + # Reasoning items should not have content parts + pass + else: + item["content"].append(part) + item["content"].append(part) + return new_output, None + return current_output, None + + elif event_type == "response.reasoning_summary_part.added": + part = data.get("part", {}) + output_index = data.get("output_index", len(current_output) - 1) + + if current_output and 0 <= output_index < len(current_output): + new_output = list(current_output) + item = new_output[output_index].copy() + new_output[output_index] = item + + if "summary" not in item: + item["summary"] = [] + else: + item["summary"] = list(item["summary"]) + + item["summary"].append(part) + return new_output, None + return current_output, None + + elif event_type.startswith("response.") and event_type.endswith(".delta"): + # Generic Delta Handling + parts = event_type.split(".") + if len(parts) >= 3: + delta_type = parts[1] + delta = data.get("delta", "") + + output_index = data.get("output_index", len(current_output) - 1) + + if current_output and 0 <= output_index < len(current_output): + new_output = list(current_output) + item = new_output[output_index].copy() + new_output[output_index] 
= item + item_type = item.get("type", "") + + # Determine target field and object based on delta_type and item_type + if delta_type == "function_call_arguments": + key = "arguments" + if item_type == "function_call": + # Function call args are usually strings + item[key] = item.get(key, "") + str(delta) + else: + # Generic handling, refined by item type below + pass + + if item_type == "message": + # Message items: "text"/"output_text" -> "text" + # "reasoning_text" -> Skipped (should use reasoning item) + if delta_type in ["text", "output_text"]: + key = "text" + elif delta_type in ["reasoning_text", "reasoning_summary_text"]: + # Skip reasoning updates for message items + return new_output, None + else: + key = delta_type + + content_index = data.get("content_index", 0) + if "content" not in item: + item["content"] = [] + else: + item["content"] = list(item["content"]) + content_list = item["content"] + + while len(content_list) <= content_index: + content_list.append({"type": "text", "text": ""}) + + # Copy the part to mutate it + part = content_list[content_index].copy() + content_list[content_index] = part + + current_val = part.get(key) + if current_val is None: + # Initialize based on delta type + current_val = {} if isinstance(delta, dict) else "" + + part[key] = deep_merge(current_val, delta) + + elif item_type == "reasoning": + # Reasoning items: "reasoning_text"/"reasoning_summary_text" -> "text" + # "text"/"output_text" -> Skipped (should use message item) + if delta_type == "reasoning_summary_text": + # Summary updates -> item['summary'] + key = "text" + summary_index = data.get("summary_index", 0) + if "summary" not in item: + item["summary"] = [] + else: + item["summary"] = list(item["summary"]) + summary_list = item["summary"] + + while len(summary_list) <= summary_index: + summary_list.append( + {"type": "summary_text", "text": ""} + ) + + part = summary_list[summary_index].copy() + summary_list[summary_index] = part + + target_val = part.get(key, "") + part[key] = deep_merge(target_val, delta) + + elif delta_type == "reasoning_text": + # Reasoning body updates -> item['content'] + key = "text" + content_index = data.get("content_index", 0) + if "content" not in item: + item["content"] = [] + else: + item["content"] = list(item["content"]) + content_list = item["content"] + + while len(content_list) <= content_index: + # Reasoning content parts default to text + content_list.append({"type": "text", "text": ""}) + + part = content_list[content_index].copy() + content_list[content_index] = part + + target_val = part.get(key, "") + part[key] = deep_merge(target_val, delta) + + elif delta_type in ["text", "output_text"]: + return new_output, None + else: + # Fallback just in case other deltas target reasoning? + pass + + else: + # Fallback for other item types + if delta_type in ["text", "output_text"]: + key = "text" + else: + key = delta_type + + current_val = item.get(key) + if current_val is None: + current_val = {} if isinstance(delta, dict) else "" + item[key] = deep_merge(current_val, delta) + + return new_output, None + + elif event_type.startswith("response.") and event_type.endswith(".done"): + # Delta Events: response.content_part.done, response.text.done, etc. + parts = event_type.split(".") + if len(parts) >= 3: + type_name = parts[1] + + # 1. Handle specific Delta "done" signals + if type_name == "content_part": + # "Signaling that no further changes will occur to a content part" + # If payloads contains the full part, we could update it. 
+ # Usually purely signaling in standard implementation, but we check payload. + part = data.get("part") + output_index = data.get("output_index", len(current_output) - 1) + + if part and current_output and 0 <= output_index < len(current_output): + new_output = list(current_output) + item = new_output[output_index].copy() + new_output[output_index] = item + + if "content" in item: + item["content"] = list(item["content"]) + content_index = data.get( + "content_index", len(item["content"]) - 1 + ) + if 0 <= content_index < len(item["content"]): + item["content"][content_index] = part + return new_output, {} + return current_output, None + + elif type_name == "reasoning_summary_part": + part = data.get("part") + output_index = data.get("output_index", len(current_output) - 1) + + if part and current_output and 0 <= output_index < len(current_output): + new_output = list(current_output) + item = new_output[output_index].copy() + new_output[output_index] = item + + if "summary" in item: + item["summary"] = list(item["summary"]) + summary_index = data.get( + "summary_index", len(item["summary"]) - 1 + ) + if 0 <= summary_index < len(item["summary"]): + item["summary"][summary_index] = part + return new_output, {} + return current_output, None + + # 2. Skip Output Item done (handled specifically below) + if type_name == "output_item": + pass + + # 3. Generic Field Done (text.done, audio.done) + elif type_name not in ["completed", "failed"]: + output_index = data.get("output_index", len(current_output) - 1) + if current_output and 0 <= output_index < len(current_output): + + key = ( + "text" + if type_name + in [ + "text", + "output_text", + "reasoning_text", + "reasoning_summary_text", + ] + else type_name + ) + if type_name == "function_call_arguments": + key = "arguments" + + if key in data: + final_value = data[key] + new_output = list(current_output) + item = new_output[output_index].copy() + new_output[output_index] = item + item_type = item.get("type", "") + + if type_name == "function_call_arguments": + if item_type == "function_call": + item["arguments"] = final_value + elif item_type == "message": + content_index = data.get("content_index", 0) + if "content" in item: + item["content"] = list(item["content"]) + if len(item["content"]) > content_index: + part = item["content"][content_index].copy() + item["content"][content_index] = part + part[key] = final_value + elif item_type == "reasoning": + item["status"] = "completed" + else: + item[key] = final_value + + return new_output, {} + + return current_output, None + + elif event_type == "response.output_item.done": + # Delta Event: Output item complete + item = data.get("item") + output_index = data.get("output_index", len(current_output) - 1) + + new_output = list(current_output) + if item and 0 <= output_index < len(current_output): + new_output[output_index] = item + elif item: + new_output.append(item) + return new_output, {} + + elif event_type == "response.completed": + # State Machine Event: Completed + response_data = data.get("response", {}) + final_output = response_data.get("output") + + new_output = final_output if final_output is not None else current_output + + # Ensure reasoning items are marked as completed in the final output + if new_output: + for item in new_output: + if ( + item.get("type") == "reasoning" + and item.get("status") != "completed" + ): + item["status"] = "completed" + + return new_output, {"usage": response_data.get("usage"), "done": True} + + elif event_type == "response.in_progress": + # State 
Machine Event: In Progress + # We could extract metadata if needed, but for now just acknowledge iteration + return current_output, None + + elif event_type == "response.failed": + # State Machine Event: Failed + error = data.get("response", {}).get("error", {}) + return current_output, {"error": error} + + else: + return current_output, None + + def apply_source_context_to_messages( request: Request, messages: list, @@ -1570,7 +2036,9 @@ async def process_chat_payload(request, form_data, user, metadata, model): raise e try: - filter_ids = get_sorted_filter_ids(request, model, metadata.get("filter_ids", [])) + filter_ids = get_sorted_filter_ids( + request, model, metadata.get("filter_ids", []) + ) filter_functions = Functions.get_functions_by_ids(filter_ids) form_data, flags = await process_filter_functions( @@ -2367,20 +2835,6 @@ async def process_chat_response( task_id = str(uuid4()) # Create a unique task ID. model_id = form_data.get("model", "") - def split_content_and_whitespace(content): - content_stripped = content.rstrip() - original_whitespace = ( - content[len(content_stripped) :] - if len(content) > len(content_stripped) - else "" - ) - return content_stripped, original_whitespace - - def is_opening_code_block(content): - backtick_segments = content.split("```") - # Even number of segments means the last backticks are opening a new block - return len(backtick_segments) > 1 and len(backtick_segments) % 2 == 0 - # Handle as a background task async def response_handler(response, events): def serialize_content_blocks(content_blocks, raw=False): @@ -2518,105 +2972,6 @@ async def process_chat_response( return content.strip() - def serialize_output(output: list) -> str: - """ - Convert OR-aligned output items to HTML for display. - For LLM consumption, use convert_output_to_messages() instead. - """ - content = "" - - # First pass: collect function_call_output items by call_id for lookup - tool_outputs = {} - for item in output: - if item.get("type") == "function_call_output": - tool_outputs[item.get("call_id")] = item - - # Second pass: render items in order - for item in output: - item_type = item.get("type", "") - - if item_type == "message": - for content_part in item.get("content", []): - if content_part.get("type") == "output_text": - text = content_part.get("text", "").strip() - if text: - content = f"{content}{text}\n" - - elif item_type == "function_call": - # Render tool call inline with its result (if available) - if content and not content.endswith("\n"): - content += "\n" - - call_id = item.get("call_id", "") - name = item.get("name", "") - arguments = item.get("arguments", "") - - result_item = tool_outputs.get(call_id) - if result_item: - result_text = "" - for out in result_item.get("output", []): - if out.get("type") == "input_text": - result_text += out.get("text", "") - files = result_item.get("files") - embeds = result_item.get("embeds", "") - - content += f'
<details type="tool_calls" done="true" id="{call_id}" name="{name}" arguments="{html.escape(json.dumps(arguments))}" result="{html.escape(json.dumps(result_text))}" files="{html.escape(json.dumps(files)) if files else ""}" embeds="{html.escape(json.dumps(embeds)) if embeds else ""}">\n<summary>Tool Executed</summary>\n</details>
\n' - else: - content += f'
<details type="tool_calls" done="false" id="{call_id}" name="{name}" arguments="{html.escape(json.dumps(arguments))}">\n<summary>Executing...</summary>\n</details>
\n' - - elif item_type == "function_call_output": - # Already handled inline with function_call above - pass - - elif item_type == "reasoning": - reasoning_content = "" - for content_part in item.get("content", []): - if content_part.get("type") == "output_text": - reasoning_content = content_part.get("text", "").strip() - - duration = item.get("duration") - status = item.get("status", "in_progress") - - if content and not content.endswith("\n"): - content += "\n" - - display = html.escape( - "\n".join( - (f"> {line}" if not line.startswith(">") else line) - for line in reasoning_content.splitlines() - ) - ) - - if status == "completed" or duration is not None: - content = f'{content}
<details type="reasoning" done="true" duration="{duration or 0}">\n<summary>Thought for {duration or 0} seconds</summary>\n{display}\n</details>
\n' - else: - content = f'{content}
<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{display}\n</details>
\n' - - elif item_type == "open_webui:code_interpreter": - code = item.get("code", "") - output_val = item.get("output") - lang = item.get("lang", "") - - content_stripped, original_whitespace = ( - split_content_and_whitespace(content) - ) - if is_opening_code_block(content_stripped): - content = ( - content_stripped.rstrip("`").rstrip() - + original_whitespace - ) - else: - content = content_stripped + original_whitespace - - if content and not content.endswith("\n"): - content += "\n" - - if output_val: - output_escaped = html.escape(json.dumps(output_val)) - content = f'{content}
<details type="code_interpreter" done="true" output="{output_escaped}">\n<summary>Analyzed</summary>\n```{lang}\n{code}\n```\n</details>
\n' - else: - content = f'{content}
<details type="code_interpreter" done="false">\n<summary>Analyzing...</summary>\n```{lang}\n{code}\n```\n</details>
\n' - return content.strip() def convert_content_blocks_to_messages(content_blocks, raw=False): @@ -2982,16 +3337,19 @@ async def process_chat_response( if existing_output: output = existing_output else: - # Always create an initial message item (even if content is empty) - output = [ - { - "type": "message", - "id": output_id("msg"), - "status": "in_progress", - "role": "assistant", - "content": [{"type": "output_text", "text": content}], - } - ] + # Only create an initial message item if there is content to initialize with + if content: + output = [ + { + "type": "message", + "id": output_id("msg"), + "status": "in_progress", + "role": "assistant", + "content": [{"type": "output_text", "text": content}], + } + ] + else: + output = [] # Keep content_blocks for backward compatibility during transition content_blocks = [ @@ -3040,6 +3398,7 @@ async def process_chat_response( async def stream_body_handler(response, form_data): nonlocal content nonlocal content_blocks + nonlocal output response_tool_calls = [] @@ -3118,6 +3477,33 @@ async def process_chat_response( "data": data, } ) + # Check for Responses API events (type field starts with "response.") + elif data.get("type", "").startswith("response."): + + print(data) + + output, response_metadata = ( + handle_responses_streaming_event(data, output) + ) + + processed_data = { + "output": output, + "content": serialize_output(output), + } + + print(processed_data) + + # Merge any metadata (usage, done, etc.) + if response_metadata: + processed_data.update(response_metadata) + + await event_emitter( + { + "type": "chat:completion", + "data": processed_data, + } + ) + continue else: choices = data.get("choices", []) diff --git a/src/lib/components/AddConnectionModal.svelte b/src/lib/components/AddConnectionModal.svelte index 557549098c..a455627e11 100644 --- a/src/lib/components/AddConnectionModal.svelte +++ b/src/lib/components/AddConnectionModal.svelte @@ -42,6 +42,7 @@ let prefixId = ''; let enable = true; let apiVersion = ''; + let apiType = ''; // '' = chat completions (default), 'responses' = Responses API let headers = ''; @@ -183,7 +184,8 @@ connection_type: connectionType, auth_type, headers: headers ? JSON.parse(headers) : undefined, - ...(!ollama && azure ? { azure: true, api_version: apiVersion } : {}) + ...(!ollama && azure ? { azure: true, api_version: apiVersion } : {}), + ...(apiType ? { api_type: apiType } : {}) } }; @@ -221,6 +223,7 @@ connectionType = connection.config?.connection_type ?? 'external'; azure = connection.config?.azure ?? false; apiVersion = connection.config?.api_version ?? ''; + apiType = connection.config?.api_type ?? ''; } } }; @@ -506,7 +509,7 @@
 					{/if}
+
+					{#if !ollama && !direct}
+						<div class="flex gap-2 mt-2">
+							<div class="flex flex-col w-full">
+								<select
+									class="w-full text-sm bg-transparent outline-hidden"
+									bind:value={apiType}
+								>
+									<option value="">{$i18n.t('Chat Completions')}</option>
+									<option value="responses">{$i18n.t('Responses (Experimental)')}</option>
+								</select>
+							</div>
+						</div>
+					{/if}
+
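
Reviewer note: the sketch below is illustrative only and not part of the patch. It assumes the helpers exactly as defined above; the model name, event payloads, and the commented "expected" shape are placeholders, not fixtures.

# Sketch 1: what convert_to_responses_payload() does to a Chat Completions
# payload (model name is a placeholder).
payload = {
    "model": "example-model",
    "max_tokens": 256,
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello!"},
    ],
}
converted = convert_to_responses_payload(payload)
# Expected shape (abridged):
# {
#     "model": "example-model",
#     "max_output_tokens": 256,                         # renamed from max_tokens
#     "instructions": "You are a helpful assistant.",   # system message hoisted
#     "input": [
#         {
#             "type": "message",
#             "role": "user",
#             "content": [{"type": "input_text", "text": "Hello!"}],
#         }
#     ],
# }

# Sketch 2: how the pure streaming handler is folded over SSE events
# (abridged payloads; real streams also carry ids and sequence numbers).
output: list = []
for event in [
    {
        "type": "response.output_item.added",
        "item": {"type": "message", "role": "assistant", "content": []},
    },
    {
        "type": "response.output_text.delta",
        "output_index": 0,
        "content_index": 0,
        "delta": "Hello",
    },
    {"type": "response.completed", "response": {"usage": {}}},
]:
    output, metadata = handle_responses_streaming_event(event, output)
    # metadata is None for pure state updates; it carries values such as
    # {"usage": ..., "done": True} when the UI should be notified.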