mirror of
https://github.com/open-webui/open-webui.git
synced 2026-02-24 12:11:56 +01:00
535 lines
20 KiB
Python
535 lines
20 KiB
Python
import json
|
|
import logging
|
|
|
|
import aiohttp
|
|
|
|
from open_webui.env import (
|
|
AIOHTTP_CLIENT_SESSION_SSL,
|
|
AIOHTTP_CLIENT_TIMEOUT_MODEL_LIST,
|
|
ENABLE_FORWARD_USER_INFO_HEADERS,
|
|
)
|
|
from open_webui.models.users import UserModel
|
|
from open_webui.utils.headers import include_user_info_headers
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def is_anthropic_url(url: str) -> bool:
    """Return True when *url* points at the Anthropic API host."""
    return url.find("api.anthropic.com") != -1
|
|
|
|
|
|
async def get_anthropic_models(url: str, key: str, user: UserModel = None) -> dict:
    """
    Fetch the model list from Anthropic's /v1/models endpoint, following
    pagination, and normalize each entry to the OpenAI model-list format.

    Args:
        url: Base URL of the Anthropic API (e.g. "https://api.anthropic.com/v1").
        key: Anthropic API key, sent via the "x-api-key" header.
        user: Optional user whose info is forwarded as request headers when
            ENABLE_FORWARD_USER_INFO_HEADERS is enabled.

    Returns:
        {"object": "list", "data": [...]} on success; the same shape with an
        extra "error" key on a non-200 response; None on a connection error.
    """
    timeout = aiohttp.ClientTimeout(total=AIOHTTP_CLIENT_TIMEOUT_MODEL_LIST)
    all_models = []
    after_id = None  # pagination cursor returned by the previous page

    try:
        async with aiohttp.ClientSession(timeout=timeout, trust_env=True) as session:
            headers = {
                "x-api-key": key,
                "anthropic-version": "2023-06-01",
            }

            if ENABLE_FORWARD_USER_INFO_HEADERS and user:
                headers = include_user_info_headers(headers, user)

            while True:
                params = {"limit": 1000}
                if after_id:
                    params["after_id"] = after_id

                async with session.get(
                    f"{url}/models",
                    headers=headers,
                    params=params,
                    ssl=AIOHTTP_CLIENT_SESSION_SSL,
                ) as response:
                    if response.status != 200:
                        error_detail = f"HTTP Error: {response.status}"
                        try:
                            res = await response.json()
                            if "error" in res:
                                error_detail = f"External Error: {res['error']}"
                        except Exception:
                            # Non-JSON error body; keep the generic HTTP detail.
                            pass
                        return {"object": "list", "data": [], "error": error_detail}

                    data = await response.json()

                    for model in data.get("data", []):
                        all_models.append(
                            {
                                "id": model.get("id"),
                                "object": "model",
                                # Anthropic does not expose a creation
                                # timestamp in this listing; use 0.
                                "created": 0,
                                "owned_by": "anthropic",
                                "name": model.get("display_name", model.get("id")),
                            }
                        )

                    if not data.get("has_more", False):
                        break
                    after_id = data.get("last_id")
                    # Guard against a malformed response that claims has_more
                    # without providing a cursor — without this the loop would
                    # refetch the first page forever.
                    if not after_id:
                        break

    except Exception as e:
        log.error(f"Anthropic connection error: {e}")
        return None

    return {"object": "list", "data": all_models}
|
|
|
|
|
|
##############################
|
|
#
|
|
# Anthropic Messages API Conversion Utilities
|
|
#
|
|
##############################
|
|
|
|
|
|
def _system_blocks_to_text(system: list) -> str:
    """Join the text of an Anthropic system prompt given as content blocks.

    Accepts dict blocks of type "text" (their "text" field is used) and plain
    strings; anything else is ignored.
    """
    text_parts = []
    for block in system:
        if isinstance(block, dict) and block.get("type") == "text":
            text_parts.append(block.get("text", ""))
        elif isinstance(block, str):
            text_parts.append(block)
    return "\n".join(text_parts)


def _convert_content_blocks(content: list, messages: list) -> tuple:
    """Translate a list of Anthropic content blocks to OpenAI equivalents.

    Returns (openai_content, tool_calls). "tool_result" blocks have no OpenAI
    content-part equivalent — they become standalone "tool" role messages and
    are appended to *messages* directly.
    """
    openai_content = []
    tool_calls = []

    for block in content:
        block_type = block.get("type", "text")

        if block_type == "text":
            openai_content.append(
                {
                    "type": "text",
                    "text": block.get("text", ""),
                }
            )
        elif block_type == "image":
            source = block.get("source", {})
            if source.get("type") == "base64":
                media_type = source.get("media_type", "image/png")
                data = source.get("data", "")
                # OpenAI expects inline images as data: URLs.
                openai_content.append(
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{media_type};base64,{data}",
                        },
                    }
                )
            elif source.get("type") == "url":
                openai_content.append(
                    {
                        "type": "image_url",
                        "image_url": {"url": source.get("url", "")},
                    }
                )
        elif block_type == "tool_use":
            # OpenAI encodes tool arguments as a JSON string.
            tool_calls.append(
                {
                    "id": block.get("id", ""),
                    "type": "function",
                    "function": {
                        "name": block.get("name", ""),
                        "arguments": (
                            json.dumps(block.get("input", {}))
                            if isinstance(block.get("input"), dict)
                            else str(block.get("input", "{}"))
                        ),
                    },
                }
            )
        elif block_type == "tool_result":
            # Tool results become separate tool messages in OpenAI format.
            tool_content = block.get("content", "")
            if isinstance(tool_content, list):
                # Collapse nested text blocks into a single string.
                tool_text_parts = []
                for tc in tool_content:
                    if isinstance(tc, dict) and tc.get("type") == "text":
                        tool_text_parts.append(tc.get("text", ""))
                tool_content = "\n".join(tool_text_parts)

            # Propagate error status if present.
            if block.get("is_error"):
                tool_content = f"Error: {tool_content}"

            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": block.get("tool_use_id", ""),
                    "content": tool_content,
                }
            )

    return openai_content, tool_calls


def _flatten_content(openai_content: list):
    """Collapse a single text part to a plain string; OpenAI accepts both."""
    if len(openai_content) == 1 and openai_content[0]["type"] == "text":
        return openai_content[0]["text"]
    return openai_content


def convert_anthropic_to_openai_payload(anthropic_payload: dict) -> dict:
    """
    Convert an Anthropic Messages API request to OpenAI Chat Completions format.

    Anthropic format:
        {model, messages: [{role, content}], system, max_tokens, ...}
    OpenAI format:
        {model, messages: [{role, content}], max_tokens, ...}

    Args:
        anthropic_payload: Request body in Anthropic Messages format.

    Returns:
        Equivalent request body in OpenAI Chat Completions format.
    """
    openai_payload = {"model": anthropic_payload.get("model", "")}

    messages = []

    # System prompt: Anthropic uses a top-level field, OpenAI a system message.
    system = anthropic_payload.get("system")
    if system:
        if isinstance(system, str):
            messages.append({"role": "system", "content": system})
        elif isinstance(system, list):
            messages.append(
                {"role": "system", "content": _system_blocks_to_text(system)}
            )

    # Convert conversation messages.
    for msg in anthropic_payload.get("messages", []):
        role = msg.get("role", "user")
        content = msg.get("content")

        if isinstance(content, str):
            messages.append({"role": role, "content": content})
        elif isinstance(content, list):
            openai_content, tool_calls = _convert_content_blocks(content, messages)

            if tool_calls:
                # Assistant message carrying tool calls.
                msg_dict = {"role": role}
                msg_dict["content"] = (
                    _flatten_content(openai_content) if openai_content else ""
                )
                msg_dict["tool_calls"] = tool_calls
                messages.append(msg_dict)
            elif openai_content:
                messages.append(
                    {"role": role, "content": _flatten_content(openai_content)}
                )
            # A list that produced neither content nor tool calls (e.g. only
            # tool_result blocks) adds no message of its own.
        else:
            # Unexpected content type (None or scalar): stringify defensively.
            messages.append({"role": role, "content": str(content) if content else ""})

    openai_payload["messages"] = messages

    if "max_tokens" in anthropic_payload:
        openai_payload["max_tokens"] = anthropic_payload["max_tokens"]

    # Common parameters; Anthropic "stop_sequences" maps to OpenAI "stop".
    for param in ("temperature", "top_p", "stop_sequences", "stream"):
        if param in anthropic_payload:
            target = "stop" if param == "stop_sequences" else param
            openai_payload[target] = anthropic_payload[param]

    # Tools: Anthropic {name, description, input_schema} -> OpenAI function tools.
    if "tools" in anthropic_payload:
        openai_payload["tools"] = [
            {
                "type": "function",
                "function": {
                    "name": tool.get("name", ""),
                    "description": tool.get("description", ""),
                    "parameters": tool.get("input_schema", {}),
                },
            }
            for tool in anthropic_payload["tools"]
        ]

    # tool_choice mapping: auto -> "auto", any -> "required", tool -> named fn.
    if "tool_choice" in anthropic_payload:
        tc = anthropic_payload["tool_choice"]
        if isinstance(tc, dict):
            tc_type = tc.get("type", "auto")
            if tc_type == "auto":
                openai_payload["tool_choice"] = "auto"
            elif tc_type == "any":
                openai_payload["tool_choice"] = "required"
            elif tc_type == "tool":
                openai_payload["tool_choice"] = {
                    "type": "function",
                    "function": {"name": tc.get("name", "")},
                }

    return openai_payload
|
|
|
|
|
|
def convert_openai_to_anthropic_response(
    openai_response: dict, model: str = ""
) -> dict:
    """
    Convert a non-streaming OpenAI Chat Completions response to Anthropic Messages format.
    """
    import uuid as _uuid

    choices = openai_response.get("choices") or []
    choice = choices[0] if choices else {}
    message = choice.get("message", {})

    # Map finish_reason to Anthropic stop_reason; unknown values fall back
    # to "end_turn".
    finish_reason = choice.get("finish_reason", "stop")
    stop_reason = {
        "stop": "end_turn",
        "length": "max_tokens",
        "tool_calls": "tool_use",
        "content_filter": "end_turn",
    }.get(finish_reason, "end_turn")

    # Assemble Anthropic content blocks: text first, then tool_use blocks.
    content = []
    text = message.get("content")
    if text:
        content.append({"type": "text", "text": text})

    for call in message.get("tool_calls", []):
        fn = call.get("function", {})
        try:
            parsed_input = json.loads(fn.get("arguments", "{}"))
        except (json.JSONDecodeError, TypeError):
            parsed_input = {}
        content.append(
            {
                "type": "tool_use",
                "id": call.get("id", f"toolu_{_uuid.uuid4().hex[:24]}"),
                "name": fn.get("name", ""),
                "input": parsed_input,
            }
        )

    raw_usage = openai_response.get("usage", {})

    return {
        "id": openai_response.get("id", f"msg_{_uuid.uuid4().hex[:24]}"),
        "type": "message",
        "role": "assistant",
        "content": content,
        "model": model or openai_response.get("model", ""),
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": {
            "input_tokens": raw_usage.get("prompt_tokens", 0),
            "output_tokens": raw_usage.get("completion_tokens", 0),
        },
    }
|
|
|
|
|
|
async def openai_stream_to_anthropic_stream(openai_stream_generator, model: str = ""):
    """
    Convert an OpenAI SSE streaming response to Anthropic Messages SSE format.

    OpenAI sends: data: {"choices": [{"delta": {"content": "..."}}]}
    Anthropic sends: event: content_block_delta\\ndata: {"type": "content_block_delta", ...}

    Handles text content, tool calls, and mixed content with proper
    multi-block indexing as required by Anthropic's streaming protocol.

    Args:
        openai_stream_generator: Async iterable yielding OpenAI SSE chunks
            (bytes or str; a chunk may contain several "data:" lines).
        model: Model name echoed back in the message_start event.

    Yields:
        Encoded Anthropic SSE events (bytes), in protocol order:
        message_start, content_block_start/delta/stop sequences,
        message_delta, message_stop.
    """
    import uuid as _uuid

    msg_id = f"msg_{_uuid.uuid4().hex[:24]}"
    input_tokens = 0
    output_tokens = 0
    stop_reason = "end_turn"

    # Track content blocks with a running index.
    # Each text block or tool_use block gets its own index.
    current_block_index = 0
    text_block_open = False

    # Track tool call state: maps OpenAI tool_call index -> Anthropic block index
    # This allows handling multiple concurrent tool calls.
    tool_call_blocks = {}  # {openai_tc_index: anthropic_block_index}
    tool_call_started = {}  # {openai_tc_index: bool}

    # Emit message_start
    message_start = {
        "type": "message_start",
        "message": {
            "id": msg_id,
            "type": "message",
            "role": "assistant",
            "content": [],
            "model": model,
            "stop_reason": None,
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0},
        },
    }
    yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n".encode()

    try:
        async for chunk in openai_stream_generator:
            if isinstance(chunk, bytes):
                chunk = chunk.decode("utf-8", errors="ignore")

            # A chunk may carry multiple SSE lines; process each "data:" line.
            for line in chunk.strip().split("\n"):
                line = line.strip()

                if not line or not line.startswith("data:"):
                    continue

                data_str = line[5:].strip()
                if data_str == "[DONE]":
                    continue
                # Skip empty placeholder payloads.
                if data_str == "{}":
                    continue

                try:
                    data = json.loads(data_str)
                except (json.JSONDecodeError, TypeError):
                    # Malformed/partial JSON line — skip rather than abort the stream.
                    continue

                choices = data.get("choices", [])
                if not choices:
                    # Check for usage in the final chunk
                    if data.get("usage"):
                        input_tokens = data["usage"].get("prompt_tokens", input_tokens)
                        output_tokens = data["usage"].get(
                            "completion_tokens", output_tokens
                        )
                    continue

                delta = choices[0].get("delta", {})
                finish_reason = choices[0].get("finish_reason")

                # Update usage if present
                if data.get("usage"):
                    input_tokens = data["usage"].get("prompt_tokens", input_tokens)
                    output_tokens = data["usage"].get(
                        "completion_tokens", output_tokens
                    )

                # --- Handle text content ---
                content = delta.get("content")
                if content is not None:
                    if not text_block_open:
                        # Start a new text content block
                        block_start = {
                            "type": "content_block_start",
                            "index": current_block_index,
                            "content_block": {"type": "text", "text": ""},
                        }
                        yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n".encode()
                        text_block_open = True

                    # Send text delta
                    block_delta = {
                        "type": "content_block_delta",
                        "index": current_block_index,
                        "delta": {"type": "text_delta", "text": content},
                    }
                    yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n".encode()

                # --- Handle tool calls ---
                tool_calls = delta.get("tool_calls")
                if tool_calls:
                    # Close text block if one is open (text comes before tools)
                    if text_block_open:
                        block_stop = {
                            "type": "content_block_stop",
                            "index": current_block_index,
                        }
                        yield f"event: content_block_stop\ndata: {json.dumps(block_stop)}\n\n".encode()
                        text_block_open = False
                        current_block_index += 1

                    for tc in tool_calls:
                        tc_index = tc.get("index", 0)

                        if tc_index not in tool_call_started:
                            # First time seeing this tool call — emit content_block_start
                            tool_call_blocks[tc_index] = current_block_index
                            tool_call_started[tc_index] = True

                            # Extract tool call ID and name from the first chunk
                            tc_id = tc.get("id", f"toolu_{_uuid.uuid4().hex[:24]}")
                            tc_name = tc.get("function", {}).get("name", "")

                            block_start = {
                                "type": "content_block_start",
                                "index": current_block_index,
                                "content_block": {
                                    "type": "tool_use",
                                    "id": tc_id,
                                    "name": tc_name,
                                    "input": {},
                                },
                            }
                            yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n".encode()
                            current_block_index += 1

                        # Emit argument chunks as input_json_delta
                        args_chunk = tc.get("function", {}).get("arguments", "")
                        if args_chunk:
                            block_delta = {
                                "type": "content_block_delta",
                                # Use the block index recorded when this tool
                                # call's block was opened, not the running index.
                                "index": tool_call_blocks[tc_index],
                                "delta": {
                                    "type": "input_json_delta",
                                    "partial_json": args_chunk,
                                },
                            }
                            yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n".encode()

                # --- Handle finish reason ---
                if finish_reason is not None:
                    stop_reason_map = {
                        "stop": "end_turn",
                        "length": "max_tokens",
                        "tool_calls": "tool_use",
                    }
                    stop_reason = stop_reason_map.get(finish_reason, "end_turn")

    except Exception as e:
        # Best-effort: log and fall through so the closing events below still
        # terminate the Anthropic stream cleanly.
        log.error(f"Error in Anthropic stream conversion: {e}")

    # Close any open text block
    if text_block_open:
        block_stop = {"type": "content_block_stop", "index": current_block_index}
        yield f"event: content_block_stop\ndata: {json.dumps(block_stop)}\n\n".encode()

    # Close any open tool call blocks
    for tc_index, block_index in tool_call_blocks.items():
        block_stop = {"type": "content_block_stop", "index": block_index}
        yield f"event: content_block_stop\ndata: {json.dumps(block_stop)}\n\n".encode()

    # Emit message_delta with stop reason
    # NOTE(review): input_tokens is accumulated above but not reported here;
    # Anthropic's message_delta usage carries output_tokens only.
    message_delta = {
        "type": "message_delta",
        "delta": {
            "stop_reason": stop_reason,
            "stop_sequence": None,
        },
        "usage": {"output_tokens": output_tokens},
    }
    yield f"event: message_delta\ndata: {json.dumps(message_delta)}\n\n".encode()

    # Emit message_stop
    yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n".encode()
|