# open-webui/backend/open_webui/utils/anthropic.py
import json
import logging

import aiohttp

from open_webui.env import (
    AIOHTTP_CLIENT_SESSION_SSL,
    AIOHTTP_CLIENT_TIMEOUT_MODEL_LIST,
    ENABLE_FORWARD_USER_INFO_HEADERS,
)
from open_webui.models.users import UserModel
from open_webui.utils.headers import include_user_info_headers

log = logging.getLogger(__name__)


def is_anthropic_url(url: str) -> bool:
    """Check if the URL is an Anthropic API endpoint."""
    return "api.anthropic.com" in url
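
# Examples (for illustration):
#   is_anthropic_url("https://api.anthropic.com/v1")  -> True
#   is_anthropic_url("https://api.openai.com/v1")     -> False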


async def get_anthropic_models(
    url: str, key: str, user: UserModel | None = None
) -> dict | None:
    """
    Fetch models from Anthropic's /v1/models endpoint with pagination.
    Normalizes the response to OpenAI format.
    Returns None on connection errors.
    """
    timeout = aiohttp.ClientTimeout(total=AIOHTTP_CLIENT_TIMEOUT_MODEL_LIST)

    all_models = []
    after_id = None

    try:
        async with aiohttp.ClientSession(timeout=timeout, trust_env=True) as session:
            headers = {
                "x-api-key": key,
                "anthropic-version": "2023-06-01",
            }
            if ENABLE_FORWARD_USER_INFO_HEADERS and user:
                headers = include_user_info_headers(headers, user)

            while True:
                params = {"limit": 1000}
                if after_id:
                    params["after_id"] = after_id

                async with session.get(
                    f"{url}/models",
                    headers=headers,
                    params=params,
                    ssl=AIOHTTP_CLIENT_SESSION_SSL,
                ) as response:
                    if response.status != 200:
                        error_detail = f"HTTP Error: {response.status}"
                        try:
                            res = await response.json()
                            if "error" in res:
                                error_detail = f"External Error: {res['error']}"
                        except Exception:
                            pass
                        return {"object": "list", "data": [], "error": error_detail}

                    data = await response.json()
                    for model in data.get("data", []):
                        all_models.append(
                            {
                                "id": model.get("id"),
                                "object": "model",
                                "created": 0,
                                "owned_by": "anthropic",
                                "name": model.get("display_name", model.get("id")),
                            }
                        )

                    if not data.get("has_more", False):
                        break
                    after_id = data.get("last_id")
    except Exception as e:
        log.error(f"Anthropic connection error: {e}")
        return None

    return {"object": "list", "data": all_models}
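

# Hedged usage sketch (illustration only, not part of the module API): the base
# URL and key below are placeholder assumptions; Open WebUI normally supplies
# them from its configured connection settings.
async def _demo_list_models():
    models = await get_anthropic_models("https://api.anthropic.com/v1", "sk-ant-...")
    if models and not models.get("error"):
        for m in models["data"]:
            # Each entry is normalized to the OpenAI model-object shape.
            print(m["id"], "->", m["name"])
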
##############################
#
# Anthropic Messages API Conversion Utilities
#
##############################


def convert_anthropic_to_openai_payload(anthropic_payload: dict) -> dict:
    """
    Convert an Anthropic Messages API request to OpenAI Chat Completions format.

    Anthropic format:
        {model, messages: [{role, content}], system, max_tokens, ...}
    OpenAI format:
        {model, messages: [{role, content}], max_tokens, ...}
    """
    openai_payload = {}

    # Model
    openai_payload["model"] = anthropic_payload.get("model", "")

    # Build messages list
    messages = []

    # System prompt (Anthropic has it as top-level, OpenAI as a system message)
    system = anthropic_payload.get("system")
    if system:
        if isinstance(system, str):
            messages.append({"role": "system", "content": system})
        elif isinstance(system, list):
            # Anthropic supports system as a list of content blocks
            text_parts = []
            for block in system:
                if isinstance(block, dict) and block.get("type") == "text":
                    text_parts.append(block.get("text", ""))
                elif isinstance(block, str):
                    text_parts.append(block)
            messages.append({"role": "system", "content": "\n".join(text_parts)})

    # Convert messages
    for msg in anthropic_payload.get("messages", []):
        role = msg.get("role", "user")
        content = msg.get("content")

        if isinstance(content, str):
            messages.append({"role": role, "content": content})
        elif isinstance(content, list):
            # Convert Anthropic content blocks to OpenAI format
            openai_content = []
            tool_calls = []

            for block in content:
                block_type = block.get("type", "text")

                if block_type == "text":
                    openai_content.append(
                        {
                            "type": "text",
                            "text": block.get("text", ""),
                        }
                    )
                elif block_type == "image":
                    source = block.get("source", {})
                    if source.get("type") == "base64":
                        media_type = source.get("media_type", "image/png")
                        data = source.get("data", "")
                        openai_content.append(
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:{media_type};base64,{data}",
                                },
                            }
                        )
                    elif source.get("type") == "url":
                        openai_content.append(
                            {
                                "type": "image_url",
                                "image_url": {"url": source.get("url", "")},
                            }
                        )
                elif block_type == "tool_use":
                    tool_calls.append(
                        {
                            "id": block.get("id", ""),
                            "type": "function",
                            "function": {
                                "name": block.get("name", ""),
                                "arguments": (
                                    json.dumps(block.get("input", {}))
                                    if isinstance(block.get("input"), dict)
                                    else str(block.get("input", "{}"))
                                ),
                            },
                        }
                    )
                elif block_type == "tool_result":
                    # Tool results become separate tool messages in OpenAI format
                    tool_content = block.get("content", "")
                    if isinstance(tool_content, list):
                        tool_text_parts = []
                        for tc in tool_content:
                            if isinstance(tc, dict) and tc.get("type") == "text":
                                tool_text_parts.append(tc.get("text", ""))
                        tool_content = "\n".join(tool_text_parts)
                    # Propagate error status if present
                    if block.get("is_error"):
                        tool_content = f"Error: {tool_content}"
                    messages.append(
                        {
                            "role": "tool",
                            "tool_call_id": block.get("tool_use_id", ""),
                            "content": tool_content,
                        }
                    )

            # Build the message
            if tool_calls:
                # Assistant message with tool calls
                msg_dict = {"role": role}
                if openai_content:
                    # If there's only text, flatten it
                    if len(openai_content) == 1 and openai_content[0]["type"] == "text":
                        msg_dict["content"] = openai_content[0]["text"]
                    else:
                        msg_dict["content"] = openai_content
                else:
                    msg_dict["content"] = ""
                msg_dict["tool_calls"] = tool_calls
                messages.append(msg_dict)
            elif openai_content:
                # If there's only a single text block, flatten it to a string
                if len(openai_content) == 1 and openai_content[0]["type"] == "text":
                    messages.append(
                        {"role": role, "content": openai_content[0]["text"]}
                    )
                else:
                    messages.append({"role": role, "content": openai_content})
        else:
            messages.append(
                {"role": role, "content": str(content) if content else ""}
            )

    openai_payload["messages"] = messages

    # max_tokens
    if "max_tokens" in anthropic_payload:
        openai_payload["max_tokens"] = anthropic_payload["max_tokens"]

    # Common parameters
    for param in ("temperature", "top_p", "stop_sequences", "stream"):
        if param in anthropic_payload:
            if param == "stop_sequences":
                openai_payload["stop"] = anthropic_payload[param]
            else:
                openai_payload[param] = anthropic_payload[param]

    # Tools conversion: Anthropic → OpenAI
    if "tools" in anthropic_payload:
        openai_tools = []
        for tool in anthropic_payload["tools"]:
            openai_tools.append(
                {
                    "type": "function",
                    "function": {
                        "name": tool.get("name", ""),
                        "description": tool.get("description", ""),
                        "parameters": tool.get("input_schema", {}),
                    },
                }
            )
        openai_payload["tools"] = openai_tools

    # tool_choice
    if "tool_choice" in anthropic_payload:
        tc = anthropic_payload["tool_choice"]
        if isinstance(tc, dict):
            tc_type = tc.get("type", "auto")
            if tc_type == "auto":
                openai_payload["tool_choice"] = "auto"
            elif tc_type == "any":
                openai_payload["tool_choice"] = "required"
            elif tc_type == "tool":
                openai_payload["tool_choice"] = {
                    "type": "function",
                    "function": {"name": tc.get("name", "")},
                }

    return openai_payload
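
# Hedged sketch of the mapping (model name and values invented for illustration):
#
#   convert_anthropic_to_openai_payload(
#       {
#           "model": "claude-sonnet-4-5",
#           "system": "You are terse.",
#           "max_tokens": 256,
#           "messages": [{"role": "user", "content": "Hi"}],
#       }
#   )
#   # => {
#   #     "model": "claude-sonnet-4-5",
#   #     "messages": [
#   #         {"role": "system", "content": "You are terse."},
#   #         {"role": "user", "content": "Hi"},
#   #     ],
#   #     "max_tokens": 256,
#   # }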


def convert_openai_to_anthropic_response(
    openai_response: dict, model: str = ""
) -> dict:
    """
    Convert a non-streaming OpenAI Chat Completions response to Anthropic
    Messages format.
    """
    import uuid as _uuid

    choice = {}
    if openai_response.get("choices"):
        choice = openai_response["choices"][0]

    message = choice.get("message", {})
    finish_reason = choice.get("finish_reason", "stop")

    # Map finish_reason to stop_reason
    stop_reason_map = {
        "stop": "end_turn",
        "length": "max_tokens",
        "tool_calls": "tool_use",
        "content_filter": "end_turn",
    }
    stop_reason = stop_reason_map.get(finish_reason, "end_turn")

    # Build content blocks
    content = []
    msg_content = message.get("content")
    if msg_content:
        content.append({"type": "text", "text": msg_content})

    # Tool calls → tool_use blocks (some providers send an explicit null here,
    # so fall back to an empty list rather than assuming a missing key)
    tool_calls = message.get("tool_calls") or []
    for tc in tool_calls:
        func = tc.get("function", {})
        try:
            tool_input = json.loads(func.get("arguments", "{}"))
        except (json.JSONDecodeError, TypeError):
            tool_input = {}
        content.append(
            {
                "type": "tool_use",
                "id": tc.get("id", f"toolu_{_uuid.uuid4().hex[:24]}"),
                "name": func.get("name", ""),
                "input": tool_input,
            }
        )

    # Usage
    openai_usage = openai_response.get("usage", {})
    usage = {
        "input_tokens": openai_usage.get("prompt_tokens", 0),
        "output_tokens": openai_usage.get("completion_tokens", 0),
    }

    return {
        "id": openai_response.get("id", f"msg_{_uuid.uuid4().hex[:24]}"),
        "type": "message",
        "role": "assistant",
        "content": content,
        "model": model or openai_response.get("model", ""),
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": usage,
    }
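
# Hedged sketch of the mapping (response values invented for illustration):
#
#   convert_openai_to_anthropic_response(
#       {
#           "id": "chatcmpl-123",
#           "model": "gpt-4o",
#           "choices": [
#               {
#                   "finish_reason": "stop",
#                   "message": {"role": "assistant", "content": "Hello!"},
#               }
#           ],
#           "usage": {"prompt_tokens": 9, "completion_tokens": 3},
#       }
#   )
#   # => {"id": "chatcmpl-123", "type": "message", "role": "assistant",
#   #     "content": [{"type": "text", "text": "Hello!"}], "model": "gpt-4o",
#   #     "stop_reason": "end_turn", "stop_sequence": None,
#   #     "usage": {"input_tokens": 9, "output_tokens": 3}}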


async def openai_stream_to_anthropic_stream(openai_stream_generator, model: str = ""):
    """
    Convert an OpenAI SSE streaming response to Anthropic Messages SSE format.

    OpenAI sends:    data: {"choices": [{"delta": {"content": "..."}}]}
    Anthropic sends: event: content_block_delta\\ndata: {"type": "content_block_delta", ...}

    Handles text content, tool calls, and mixed content with proper
    multi-block indexing as required by Anthropic's streaming protocol.
    """
    import uuid as _uuid

    msg_id = f"msg_{_uuid.uuid4().hex[:24]}"
    input_tokens = 0
    output_tokens = 0
    stop_reason = "end_turn"

    # Track content blocks with a running index.
    # Each text block or tool_use block gets its own index.
    current_block_index = 0
    text_block_open = False

    # Track tool call state: maps OpenAI tool_call index -> Anthropic block index.
    # This allows handling multiple concurrent tool calls.
    tool_call_blocks = {}  # {openai_tc_index: anthropic_block_index}
    tool_call_started = {}  # {openai_tc_index: bool}

    # Emit message_start
    message_start = {
        "type": "message_start",
        "message": {
            "id": msg_id,
            "type": "message",
            "role": "assistant",
            "content": [],
            "model": model,
            "stop_reason": None,
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0},
        },
    }
    yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n".encode()

    try:
        async for chunk in openai_stream_generator:
            if isinstance(chunk, bytes):
                chunk = chunk.decode("utf-8", errors="ignore")

            for line in chunk.strip().split("\n"):
                line = line.strip()
                if not line or not line.startswith("data:"):
                    continue

                data_str = line[5:].strip()
                if data_str == "[DONE]":
                    continue
                if data_str == "{}":
                    continue

                try:
                    data = json.loads(data_str)
                except (json.JSONDecodeError, TypeError):
                    continue

                choices = data.get("choices", [])
                if not choices:
                    # Check for usage in the final chunk
                    if data.get("usage"):
                        input_tokens = data["usage"].get("prompt_tokens", input_tokens)
                        output_tokens = data["usage"].get(
                            "completion_tokens", output_tokens
                        )
                    continue

                delta = choices[0].get("delta", {})
                finish_reason = choices[0].get("finish_reason")

                # Update usage if present
                if data.get("usage"):
                    input_tokens = data["usage"].get("prompt_tokens", input_tokens)
                    output_tokens = data["usage"].get(
                        "completion_tokens", output_tokens
                    )

                # --- Handle text content ---
                content = delta.get("content")
                if content is not None:
                    if not text_block_open:
                        # Start a new text content block
                        block_start = {
                            "type": "content_block_start",
                            "index": current_block_index,
                            "content_block": {"type": "text", "text": ""},
                        }
                        yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n".encode()
                        text_block_open = True

                    # Send text delta
                    block_delta = {
                        "type": "content_block_delta",
                        "index": current_block_index,
                        "delta": {"type": "text_delta", "text": content},
                    }
                    yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n".encode()

                # --- Handle tool calls ---
                tool_calls = delta.get("tool_calls")
                if tool_calls:
                    # Close the text block if one is open (text comes before tools)
                    if text_block_open:
                        block_stop = {
                            "type": "content_block_stop",
                            "index": current_block_index,
                        }
                        yield f"event: content_block_stop\ndata: {json.dumps(block_stop)}\n\n".encode()
                        text_block_open = False
                        current_block_index += 1

                    for tc in tool_calls:
                        tc_index = tc.get("index", 0)

                        if tc_index not in tool_call_started:
                            # First time seeing this tool call — emit content_block_start
                            tool_call_blocks[tc_index] = current_block_index
                            tool_call_started[tc_index] = True

                            # Extract tool call ID and name from the first chunk
                            tc_id = tc.get("id", f"toolu_{_uuid.uuid4().hex[:24]}")
                            tc_name = tc.get("function", {}).get("name", "")

                            block_start = {
                                "type": "content_block_start",
                                "index": current_block_index,
                                "content_block": {
                                    "type": "tool_use",
                                    "id": tc_id,
                                    "name": tc_name,
                                    "input": {},
                                },
                            }
                            yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n".encode()
                            current_block_index += 1

                        # Emit argument chunks as input_json_delta
                        args_chunk = tc.get("function", {}).get("arguments", "")
                        if args_chunk:
                            block_delta = {
                                "type": "content_block_delta",
                                "index": tool_call_blocks[tc_index],
                                "delta": {
                                    "type": "input_json_delta",
                                    "partial_json": args_chunk,
                                },
                            }
                            yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n".encode()

                # --- Handle finish reason ---
                if finish_reason is not None:
                    stop_reason_map = {
                        "stop": "end_turn",
                        "length": "max_tokens",
                        "tool_calls": "tool_use",
                    }
                    stop_reason = stop_reason_map.get(finish_reason, "end_turn")
    except Exception as e:
        log.error(f"Error in Anthropic stream conversion: {e}")

    # Close any open text block
    if text_block_open:
        block_stop = {"type": "content_block_stop", "index": current_block_index}
        yield f"event: content_block_stop\ndata: {json.dumps(block_stop)}\n\n".encode()

    # Close any open tool call blocks
    for tc_index, block_index in tool_call_blocks.items():
        block_stop = {"type": "content_block_stop", "index": block_index}
        yield f"event: content_block_stop\ndata: {json.dumps(block_stop)}\n\n".encode()

    # Emit message_delta with the stop reason
    message_delta = {
        "type": "message_delta",
        "delta": {
            "stop_reason": stop_reason,
            "stop_sequence": None,
        },
        "usage": {"output_tokens": output_tokens},
    }
    yield f"event: message_delta\ndata: {json.dumps(message_delta)}\n\n".encode()

    # Emit message_stop
    yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n".encode()
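
# Hedged sketch of the event sequence this generator yields for a plain text
# completion (payloads abridged; ids and token counts vary):
#
#   event: message_start        data: {"type": "message_start", "message": {...}}
#   event: content_block_start  data: {"type": "content_block_start", "index": 0,
#                                      "content_block": {"type": "text", "text": ""}}
#   event: content_block_delta  data: {"type": "content_block_delta", "index": 0,
#                                      "delta": {"type": "text_delta", "text": "Hel"}}
#   ... one content_block_delta per OpenAI delta chunk ...
#   event: content_block_stop   data: {"type": "content_block_stop", "index": 0}
#   event: message_delta        data: {"type": "message_delta",
#                                      "delta": {"stop_reason": "end_turn", ...}, ...}
#   event: message_stop         data: {"type": "message_stop"}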