mirror of
https://github.com/open-webui/open-webui.git
synced 2026-02-24 04:00:31 +01:00
feat: analytics backend API with chat_message table
- Add chat_message table for message-level analytics with usage JSON field
- Add migration to backfill from existing chats
- Add /analytics endpoints: summary, models, users, daily
- Support hourly/daily granularity for time-series data
- Fill missing days/hours in date range
This commit is contained in:
@@ -68,6 +68,7 @@ from open_webui.socket.main import (
|
||||
get_models_in_use,
|
||||
)
|
||||
from open_webui.routers import (
|
||||
analytics,
|
||||
audio,
|
||||
images,
|
||||
ollama,
|
||||
@@ -1455,6 +1456,9 @@ app.include_router(functions.router, prefix="/api/v1/functions", tags=["function
|
||||
app.include_router(
|
||||
evaluations.router, prefix="/api/v1/evaluations", tags=["evaluations"]
|
||||
)
|
||||
app.include_router(
|
||||
analytics.router, prefix="/api/v1/analytics", tags=["analytics"]
|
||||
)
|
||||
app.include_router(utils.router, prefix="/api/v1/utils", tags=["utils"])
|
||||
|
||||
# SCIM 2.0 API for identity management
|
||||
|
||||
@@ -0,0 +1,160 @@
|
||||
"""Add chat_message table
|
||||
|
||||
Revision ID: 8452d01d26d7
|
||||
Revises: 374d2f66af06
|
||||
Create Date: 2026-02-01 04:00:00.000000
|
||||
|
||||
"""
|
||||
|
||||
import time
|
||||
import json
|
||||
import logging
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
revision: str = "8452d01d26d7"
|
||||
down_revision: Union[str, None] = "374d2f66af06"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Create the ``chat_message`` table and backfill it from existing chats.

    Step 1 creates the table and its three composite indexes. Step 2 reads
    every row of ``chat`` and copies each entry of
    ``chat["history"]["messages"]`` into ``chat_message``.

    The backfill is best-effort: chats whose JSON cannot be parsed or does not
    have the expected dict shape are skipped, and a message whose insert fails
    is logged and skipped without aborting the migration.
    """
    # Step 1: Create table
    op.create_table(
        "chat_message",
        sa.Column("id", sa.String(), primary_key=True),
        sa.Column("chat_id", sa.String(), nullable=False, index=True),
        sa.Column("user_id", sa.String(), index=True),
        sa.Column("role", sa.String(), nullable=False),
        sa.Column("parent_id", sa.String(), nullable=True),
        sa.Column("content", sa.JSON(), nullable=True),
        sa.Column("output", sa.JSON(), nullable=True),
        sa.Column("model_id", sa.String(), nullable=True, index=True),
        sa.Column("files", sa.JSON(), nullable=True),
        sa.Column("sources", sa.JSON(), nullable=True),
        sa.Column("embeds", sa.JSON(), nullable=True),
        sa.Column("done", sa.Boolean(), default=True),
        sa.Column("status_history", sa.JSON(), nullable=True),
        sa.Column("error", sa.JSON(), nullable=True),
        sa.Column("usage", sa.JSON(), nullable=True),
        sa.Column("created_at", sa.BigInteger(), index=True),
        sa.Column("updated_at", sa.BigInteger()),
        sa.ForeignKeyConstraint(["chat_id"], ["chat.id"], ondelete="CASCADE"),
    )

    # Create composite indexes
    op.create_index(
        "chat_message_chat_parent_idx", "chat_message", ["chat_id", "parent_id"]
    )
    op.create_index(
        "chat_message_model_created_idx", "chat_message", ["model_id", "created_at"]
    )
    op.create_index(
        "chat_message_user_created_idx", "chat_message", ["user_id", "created_at"]
    )

    # Step 2: Backfill from existing chats
    conn = op.get_bind()

    chat_table = sa.table(
        "chat",
        sa.column("id", sa.String()),
        sa.column("user_id", sa.String()),
        sa.column("chat", sa.JSON()),
    )

    # Lightweight description of the table created above. The column types
    # must match the real ones so values are bound the same way the ORM will
    # later read them; in particular ``content`` is JSON, not Text.
    chat_message_table = sa.table(
        "chat_message",
        sa.column("id", sa.String()),
        sa.column("chat_id", sa.String()),
        sa.column("user_id", sa.String()),
        sa.column("role", sa.String()),
        sa.column("parent_id", sa.String()),
        sa.column("content", sa.JSON()),
        sa.column("output", sa.JSON()),
        sa.column("model_id", sa.String()),
        sa.column("files", sa.JSON()),
        sa.column("sources", sa.JSON()),
        sa.column("embeds", sa.JSON()),
        sa.column("done", sa.Boolean()),
        sa.column("status_history", sa.JSON()),
        sa.column("error", sa.JSON()),
        sa.column("usage", sa.JSON()),
        sa.column("created_at", sa.BigInteger()),
        sa.column("updated_at", sa.BigInteger()),
    )

    # Fetch all chats
    chats = conn.execute(
        sa.select(chat_table.c.id, chat_table.c.user_id, chat_table.c.chat)
    ).fetchall()

    now = int(time.time())
    messages_inserted = 0

    for chat_id, user_id, chat_data in chats:
        if not chat_data:
            continue

        # Handle both string and dict chat data
        if isinstance(chat_data, str):
            try:
                chat_data = json.loads(chat_data)
            except Exception:
                continue

        # Anything that is not the expected nested-dict shape is skipped
        # instead of raising AttributeError and failing the whole migration.
        if not isinstance(chat_data, dict):
            continue
        history = chat_data.get("history")
        if not isinstance(history, dict):
            continue
        messages = history.get("messages")
        if not isinstance(messages, dict):
            continue

        for message_id, message in messages.items():
            if not isinstance(message, dict):
                continue

            role = message.get("role")
            if not role:
                continue

            # created_at/updated_at are BigInteger columns: coerce the stored
            # timestamp to int and fall back to "now" when it is missing,
            # null or not numeric.
            try:
                timestamp = int(message.get("timestamp", now))
            except (TypeError, ValueError, OverflowError):
                timestamp = now

            # Same extraction rule as ChatMessageTable.upsert_message:
            # usage is taken from message["info"]["usage"].
            info = message.get("info")
            usage = info.get("usage") if isinstance(info, dict) else None

            try:
                # Run each insert in its own SAVEPOINT. Without it, a single
                # failed statement leaves the surrounding transaction in an
                # aborted state on PostgreSQL and every later insert fails.
                with conn.begin_nested():
                    conn.execute(
                        sa.insert(chat_message_table).values(
                            id=message_id,
                            chat_id=chat_id,
                            user_id=user_id,
                            role=role,
                            parent_id=message.get("parentId"),
                            content=message.get("content"),
                            output=message.get("output"),
                            model_id=message.get("model"),
                            files=message.get("files"),
                            sources=message.get("sources"),
                            embeds=message.get("embeds"),
                            done=message.get("done", True),
                            status_history=message.get("statusHistory"),
                            error=message.get("error"),
                            usage=usage,
                            created_at=timestamp,
                            updated_at=timestamp,
                        )
                    )
            except Exception as e:
                log.warning(f"Failed to insert message {message_id}: {e}")
                continue

            messages_inserted += 1

    log.info(f"Backfilled {messages_inserted} messages into chat_message table")
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Drop the composite indexes, then the ``chat_message`` table itself."""
    # Indexes are removed in the reverse order of their creation in upgrade().
    for index_name in (
        "chat_message_user_created_idx",
        "chat_message_model_created_idx",
        "chat_message_chat_parent_idx",
    ):
        op.drop_index(index_name, table_name="chat_message")

    op.drop_table("chat_message")
|
||||
399
backend/open_webui/models/chat_messages.py
Normal file
399
backend/open_webui/models/chat_messages.py
Normal file
@@ -0,0 +1,399 @@
|
||||
import time
|
||||
import uuid
|
||||
from typing import Any, Optional
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
from open_webui.internal.db import Base, get_db_context
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
from sqlalchemy import (
|
||||
BigInteger,
|
||||
Boolean,
|
||||
Column,
|
||||
ForeignKey,
|
||||
String,
|
||||
Text,
|
||||
JSON,
|
||||
Index,
|
||||
)
|
||||
|
||||
####################
|
||||
# ChatMessage DB Schema
|
||||
####################
|
||||
|
||||
|
||||
class ChatMessage(Base):
    """ORM mapping for the ``chat_message`` table (one row per chat message)."""

    __tablename__ = "chat_message"

    # --- Identity -----------------------------------------------------------
    id = Column(String, primary_key=True)
    # Rows are removed together with their chat (ON DELETE CASCADE).
    chat_id = Column(
        String,
        ForeignKey("chat.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    user_id = Column(String, index=True)

    # --- Structure ----------------------------------------------------------
    role = Column(String, nullable=False)  # user, assistant, system
    parent_id = Column(String, nullable=True)

    # --- Content ------------------------------------------------------------
    content = Column(JSON, nullable=True)  # Can be str or list of blocks
    output = Column(JSON, nullable=True)

    # --- Model (for assistant messages) ---------------------------------------
    model_id = Column(String, nullable=True, index=True)

    # --- Attachments ----------------------------------------------------------
    files = Column(JSON, nullable=True)
    sources = Column(JSON, nullable=True)
    embeds = Column(JSON, nullable=True)

    # --- Status ---------------------------------------------------------------
    done = Column(Boolean, default=True)
    status_history = Column(JSON, nullable=True)
    error = Column(JSON, nullable=True)

    # --- Usage (tokens, timing, etc.) -------------------------------------------
    usage = Column(JSON, nullable=True)

    # --- Timestamps -------------------------------------------------------------
    created_at = Column(BigInteger, index=True)
    updated_at = Column(BigInteger)

    # Composite indexes; names match the ones created by the migration.
    __table_args__ = (
        Index("chat_message_chat_parent_idx", "chat_id", "parent_id"),
        Index("chat_message_model_created_idx", "model_id", "created_at"),
        Index("chat_message_user_created_idx", "user_id", "created_at"),
    )
|
||||
|
||||
|
||||
####################
|
||||
# Pydantic Models
|
||||
####################
|
||||
|
||||
|
||||
class ChatMessageModel(BaseModel):
    """Pydantic view of a ``ChatMessage`` row.

    ``from_attributes=True`` lets ``model_validate`` read the fields directly
    from an ORM instance.
    """

    model_config = ConfigDict(from_attributes=True)

    # Identity
    id: str
    chat_id: str
    user_id: str

    # Structure
    role: str
    parent_id: Optional[str] = None

    # Content
    content: Optional[Any] = None  # str or list of blocks
    output: Optional[list] = None

    # Model (for assistant messages)
    model_id: Optional[str] = None

    # Attachments
    files: Optional[list] = None
    sources: Optional[list] = None
    embeds: Optional[list] = None

    # Status
    done: bool = True
    status_history: Optional[list] = None
    error: Optional[dict] = None

    # Usage
    usage: Optional[dict] = None

    # Timestamps (integers, as stored in the BigInteger columns)
    created_at: int
    updated_at: int
|
||||
|
||||
|
||||
####################
|
||||
# Table Operations
|
||||
####################
|
||||
|
||||
|
||||
class ChatMessageTable:
    """Persistence and analytics queries for rows of ``chat_message``.

    Every public method accepts an optional ``db`` session, which is handed to
    ``get_db_context`` to obtain the session actually used.
    """

    # Message-dict keys copied verbatim onto the column of the same name.
    _DIRECT_FIELDS = (
        "role",
        "content",
        "output",
        "files",
        "sources",
        "embeds",
        "done",
        "error",
    )

    # Column name -> camelCase alias that is also accepted in the message dict.
    _ALIASED_FIELDS = {
        "parent_id": "parentId",
        "model_id": "model",
        "status_history": "statusHistory",
    }

    ####################
    # Private helpers
    ####################

    @staticmethod
    def _extract_usage(data: dict) -> Optional[dict]:
        """Return ``data["info"]["usage"]`` or None when it is absent."""
        info = data.get("info")
        # Guard against a non-dict "info" so a malformed message cannot raise.
        return info.get("usage") if isinstance(info, dict) else None

    @staticmethod
    def _apply_date_range(query, start_date: Optional[int], end_date: Optional[int]):
        """Restrict ``query`` to ``created_at`` within the inclusive range.

        A bound that is None (or 0) is treated as "no bound".
        """
        if start_date:
            query = query.filter(ChatMessage.created_at >= start_date)
        if end_date:
            query = query.filter(ChatMessage.created_at <= end_date)
        return query

    def _count_grouped_by(
        self,
        column,
        start_date: Optional[int],
        end_date: Optional[int],
        db: Optional[Session],
        *criteria,
    ) -> dict:
        """Count messages per distinct value of ``column``."""
        from sqlalchemy import func

        with get_db_context(db) as db:
            query = db.query(column, func.count(ChatMessage.id).label("count"))
            if criteria:
                query = query.filter(*criteria)
            query = self._apply_date_range(query, start_date, end_date)
            # Rows are unpacked positionally instead of via an attribute named
            # "count", which is also the name of a sequence method.
            return {key: count for key, count in query.group_by(column).all()}

    def _assistant_model_timestamps(
        self,
        start_date: Optional[int],
        end_date: Optional[int],
        db: Optional[Session],
    ) -> list:
        """Return ``(created_at, model_id)`` for assistant messages with a model."""
        with get_db_context(db) as db:
            query = db.query(ChatMessage.created_at, ChatMessage.model_id).filter(
                ChatMessage.role == "assistant",
                ChatMessage.model_id.isnot(None),
            )
            return self._apply_date_range(query, start_date, end_date).all()

    @staticmethod
    def _bucket_counts(rows, fmt: str) -> dict[str, dict[str, int]]:
        """Group ``(timestamp, model_id)`` rows as bucket label -> model -> count.

        The label is ``strftime(fmt)`` of the timestamp interpreted in the
        server's local time zone (``datetime.fromtimestamp``).
        """
        from datetime import datetime

        buckets: dict[str, dict[str, int]] = {}
        for timestamp, model_id in rows:
            # created_at is a nullable column; such rows cannot be bucketed.
            if timestamp is None:
                continue
            label = datetime.fromtimestamp(timestamp).strftime(fmt)
            per_model = buckets.setdefault(label, {})
            per_model[model_id] = per_model.get(model_id, 0) + 1
        return buckets

    @staticmethod
    def _fill_missing_buckets(buckets: dict, first, last, step, fmt: str) -> None:
        """Add an empty entry for every step from ``first`` to ``last`` inclusive."""
        current = first
        while current <= last:
            buckets.setdefault(current.strftime(fmt), {})
            current += step

    ####################
    # CRUD
    ####################

    def upsert_message(
        self,
        message_id: str,
        chat_id: str,
        user_id: str,
        data: dict,
        db: Optional[Session] = None,
    ) -> Optional[ChatMessageModel]:
        """Insert or update a chat message.

        Args:
            message_id: Primary key of the message row.
            chat_id: Owning chat; only used when the row is created.
            user_id: Owning user; only used when the row is created.
            data: Message dict. Both snake_case column names and the camelCase
                keys ``parentId``, ``model`` and ``statusHistory`` are accepted.
                Usage is read from ``data["info"]["usage"]``.
            db: Optional existing session.

        Returns:
            The stored row as a ``ChatMessageModel``.
        """
        with get_db_context(db) as db:
            now = int(time.time())
            usage = self._extract_usage(data)

            existing = db.get(ChatMessage, message_id)
            if existing:
                # Update existing: only touch columns whose key is present.
                for key in self._DIRECT_FIELDS:
                    if key in data:
                        setattr(existing, key, data[key])

                # Honour the camelCase alias for every aliased column. The
                # alias check matters for parent_id too: chat history stores
                # the parent under "parentId".
                for column, alias in self._ALIASED_FIELDS.items():
                    if column in data or alias in data:
                        setattr(existing, column, data.get(column) or data.get(alias))

                # An empty/missing usage never overwrites a stored one.
                if usage:
                    existing.usage = usage

                existing.updated_at = now
                db.commit()
                db.refresh(existing)
                return ChatMessageModel.model_validate(existing)

            # Insert new
            message = ChatMessage(
                id=message_id,
                chat_id=chat_id,
                user_id=user_id,
                role=data.get("role", "user"),
                parent_id=data.get("parent_id") or data.get("parentId"),
                content=data.get("content"),
                output=data.get("output"),
                model_id=data.get("model_id") or data.get("model"),
                files=data.get("files"),
                sources=data.get("sources"),
                embeds=data.get("embeds"),
                done=data.get("done", True),
                status_history=data.get("status_history")
                or data.get("statusHistory"),
                error=data.get("error"),
                usage=usage,
                # created_at keeps the message's own timestamp when it has one.
                created_at=data.get("timestamp", now),
                updated_at=now,
            )
            db.add(message)
            db.commit()
            db.refresh(message)
            return ChatMessageModel.model_validate(message)

    def get_message_by_id(
        self, id: str, db: Optional[Session] = None
    ) -> Optional[ChatMessageModel]:
        """Return the message with primary key ``id``, or None if absent."""
        with get_db_context(db) as db:
            message = db.get(ChatMessage, id)
            return ChatMessageModel.model_validate(message) if message else None

    def get_messages_by_chat_id(
        self, chat_id: str, db: Optional[Session] = None
    ) -> list[ChatMessageModel]:
        """Return all messages of a chat, oldest first."""
        with get_db_context(db) as db:
            messages = (
                db.query(ChatMessage)
                .filter_by(chat_id=chat_id)
                .order_by(ChatMessage.created_at.asc())
                .all()
            )
            return [ChatMessageModel.model_validate(message) for message in messages]

    def get_messages_by_user_id(
        self,
        user_id: str,
        skip: int = 0,
        limit: int = 50,
        db: Optional[Session] = None,
    ) -> list[ChatMessageModel]:
        """Return one page of a user's messages, newest first."""
        with get_db_context(db) as db:
            messages = (
                db.query(ChatMessage)
                .filter_by(user_id=user_id)
                .order_by(ChatMessage.created_at.desc())
                .offset(skip)
                .limit(limit)
                .all()
            )
            return [ChatMessageModel.model_validate(message) for message in messages]

    def get_messages_by_model_id(
        self,
        model_id: str,
        start_date: Optional[int] = None,
        end_date: Optional[int] = None,
        skip: int = 0,
        limit: int = 100,
        db: Optional[Session] = None,
    ) -> list[ChatMessageModel]:
        """Return one page of a model's messages in a date range, newest first."""
        with get_db_context(db) as db:
            query = db.query(ChatMessage).filter_by(model_id=model_id)
            query = self._apply_date_range(query, start_date, end_date)
            messages = (
                query.order_by(ChatMessage.created_at.desc())
                .offset(skip)
                .limit(limit)
                .all()
            )
            return [ChatMessageModel.model_validate(message) for message in messages]

    def delete_messages_by_chat_id(
        self, chat_id: str, db: Optional[Session] = None
    ) -> bool:
        """Delete every message of a chat. Always returns True."""
        with get_db_context(db) as db:
            db.query(ChatMessage).filter_by(chat_id=chat_id).delete()
            db.commit()
            return True

    ####################
    # Analytics
    ####################

    def get_message_count_by_model(
        self,
        start_date: Optional[int] = None,
        end_date: Optional[int] = None,
        db: Optional[Session] = None,
    ) -> dict[str, int]:
        """Count assistant messages per model (rows without a model are ignored)."""
        return self._count_grouped_by(
            ChatMessage.model_id,
            start_date,
            end_date,
            db,
            ChatMessage.role == "assistant",
            ChatMessage.model_id.isnot(None),
        )

    def get_message_count_by_user(
        self,
        start_date: Optional[int] = None,
        end_date: Optional[int] = None,
        db: Optional[Session] = None,
    ) -> dict[str, int]:
        """Count messages (any role) per user."""
        return self._count_grouped_by(ChatMessage.user_id, start_date, end_date, db)

    def get_message_count_by_chat(
        self,
        start_date: Optional[int] = None,
        end_date: Optional[int] = None,
        db: Optional[Session] = None,
    ) -> dict[str, int]:
        """Count messages (any role) per chat."""
        return self._count_grouped_by(ChatMessage.chat_id, start_date, end_date, db)

    def get_daily_message_counts_by_model(
        self,
        start_date: Optional[int] = None,
        end_date: Optional[int] = None,
        db: Optional[Session] = None,
    ) -> dict[str, dict[str, int]]:
        """Get message counts grouped by day and model.

        Keys are ``YYYY-MM-DD`` labels. When both bounds are given, every day
        from the start day to the end day is present, empty days mapping to
        ``{}``.
        """
        from datetime import datetime, timedelta

        day_format = "%Y-%m-%d"
        rows = self._assistant_model_timestamps(start_date, end_date, db)
        daily_counts = self._bucket_counts(rows, day_format)

        # Fill in missing days
        if start_date and end_date:
            # Step from midnight of the first day. Stepping from the raw start
            # time would skip the final day whenever the end's time of day is
            # earlier than the start's.
            first_day = datetime.fromtimestamp(start_date).replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            self._fill_missing_buckets(
                daily_counts,
                first_day,
                datetime.fromtimestamp(end_date),
                timedelta(days=1),
                day_format,
            )

        return daily_counts

    def get_hourly_message_counts_by_model(
        self,
        start_date: Optional[int] = None,
        end_date: Optional[int] = None,
        db: Optional[Session] = None,
    ) -> dict[str, dict[str, int]]:
        """Get message counts grouped by hour and model.

        Keys are ``YYYY-MM-DD HH:00`` labels. When both bounds are given, every
        hour in the range is present, empty hours mapping to ``{}``.
        """
        from datetime import datetime, timedelta

        hour_format = "%Y-%m-%d %H:00"
        rows = self._assistant_model_timestamps(start_date, end_date, db)
        hourly_counts = self._bucket_counts(rows, hour_format)

        # Fill in missing hours
        if start_date and end_date:
            first_hour = datetime.fromtimestamp(start_date).replace(
                minute=0, second=0, microsecond=0
            )
            self._fill_missing_buckets(
                hourly_counts,
                first_hour,
                datetime.fromtimestamp(end_date),
                timedelta(hours=1),
                hour_format,
            )

        return hourly_counts
|
||||
|
||||
|
||||
ChatMessages = ChatMessageTable()
|
||||
@@ -8,6 +8,7 @@ from sqlalchemy.orm import Session
|
||||
from open_webui.internal.db import Base, JSONField, get_db, get_db_context
|
||||
from open_webui.models.tags import TagModel, Tag, Tags
|
||||
from open_webui.models.folders import Folders
|
||||
from open_webui.models.chat_messages import ChatMessages
|
||||
from open_webui.utils.misc import sanitize_data_for_db, sanitize_text_for_db
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
@@ -314,6 +315,22 @@ class ChatTable:
|
||||
db.add(chat_item)
|
||||
db.commit()
|
||||
db.refresh(chat_item)
|
||||
|
||||
# Dual-write initial messages to chat_message table
|
||||
try:
|
||||
history = form_data.chat.get("history", {})
|
||||
messages = history.get("messages", {})
|
||||
for message_id, message in messages.items():
|
||||
if isinstance(message, dict) and message.get("role"):
|
||||
ChatMessages.upsert_message(
|
||||
message_id=message_id,
|
||||
chat_id=id,
|
||||
user_id=user_id,
|
||||
data=message,
|
||||
)
|
||||
except Exception as e:
|
||||
log.warning(f"Failed to write initial messages to chat_message table: {e}")
|
||||
|
||||
return ChatModel.model_validate(chat_item) if chat_item else None
|
||||
|
||||
def _chat_import_form_to_chat_model(
|
||||
@@ -356,6 +373,23 @@ class ChatTable:
|
||||
|
||||
db.add_all(chats)
|
||||
db.commit()
|
||||
|
||||
# Dual-write messages to chat_message table
|
||||
try:
|
||||
for form_data, chat_obj in zip(chat_import_forms, chats):
|
||||
history = form_data.chat.get("history", {})
|
||||
messages = history.get("messages", {})
|
||||
for message_id, message in messages.items():
|
||||
if isinstance(message, dict) and message.get("role"):
|
||||
ChatMessages.upsert_message(
|
||||
message_id=message_id,
|
||||
chat_id=chat_obj.id,
|
||||
user_id=user_id,
|
||||
data=message,
|
||||
)
|
||||
except Exception as e:
|
||||
log.warning(f"Failed to write imported messages to chat_message table: {e}")
|
||||
|
||||
return [ChatModel.model_validate(chat) for chat in chats]
|
||||
|
||||
def update_chat_by_id(
|
||||
@@ -458,6 +492,18 @@ class ChatTable:
|
||||
history["currentId"] = message_id
|
||||
|
||||
chat["history"] = history
|
||||
|
||||
# Dual-write to chat_message table
|
||||
try:
|
||||
ChatMessages.upsert_message(
|
||||
message_id=message_id,
|
||||
chat_id=id,
|
||||
user_id=self.get_chat_by_id(id).user_id,
|
||||
data=history["messages"][message_id],
|
||||
)
|
||||
except Exception as e:
|
||||
log.warning(f"Failed to write to chat_message table: {e}")
|
||||
|
||||
return self.update_chat_by_id(id, chat)
|
||||
|
||||
def add_message_status_to_chat_by_id_and_message_id(
|
||||
|
||||
194
backend/open_webui/routers/analytics.py
Normal file
194
backend/open_webui/routers/analytics.py
Normal file
@@ -0,0 +1,194 @@
|
||||
from typing import Optional
|
||||
import logging
|
||||
from fastapi import APIRouter, Depends, Query
|
||||
from pydantic import BaseModel
|
||||
|
||||
from open_webui.models.chat_messages import ChatMessages, ChatMessageModel
|
||||
from open_webui.utils.auth import get_admin_user
|
||||
from open_webui.internal.db import get_session
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
####################
|
||||
# Response Models
|
||||
####################
|
||||
|
||||
|
||||
class ModelAnalyticsEntry(BaseModel):
    """Message count for a single model."""

    model_id: str
    count: int
|
||||
|
||||
|
||||
class ModelAnalyticsResponse(BaseModel):
    """Response body of ``GET /models``: one entry per model."""

    models: list[ModelAnalyticsEntry]
|
||||
|
||||
|
||||
class UserAnalyticsEntry(BaseModel):
    """Message count for a single user.

    ``name`` and ``email`` stay None when no matching user record was found.
    """

    user_id: str
    name: Optional[str] = None
    email: Optional[str] = None
    count: int
|
||||
|
||||
|
||||
class UserAnalyticsResponse(BaseModel):
    """Response body of ``GET /users``: one entry per user."""

    users: list[UserAnalyticsEntry]
|
||||
|
||||
|
||||
####################
|
||||
# Endpoints
|
||||
####################
|
||||
|
||||
|
||||
@router.get("/models", response_model=ModelAnalyticsResponse)
async def get_model_analytics(
    start_date: Optional[int] = Query(None, description="Start timestamp (epoch)"),
    end_date: Optional[int] = Query(None, description="End timestamp (epoch)"),
    user=Depends(get_admin_user),
    db: Session = Depends(get_session),
):
    """Get message counts per model, highest count first (admin only)."""
    per_model = ChatMessages.get_message_count_by_model(
        start_date=start_date, end_date=end_date, db=db
    )

    # Stable descending sort: ties keep the order the counts were returned in.
    ranked = sorted(per_model.items(), key=lambda item: item[1], reverse=True)

    entries = []
    for model_id, count in ranked:
        entries.append(ModelAnalyticsEntry(model_id=model_id, count=count))

    return ModelAnalyticsResponse(models=entries)
|
||||
|
||||
|
||||
@router.get("/users", response_model=UserAnalyticsResponse)
async def get_user_analytics(
    start_date: Optional[int] = Query(None, description="Start timestamp (epoch)"),
    end_date: Optional[int] = Query(None, description="End timestamp (epoch)"),
    limit: int = Query(50, description="Max users to return"),
    user=Depends(get_admin_user),
    db: Session = Depends(get_session),
):
    """Get message counts per user with user info (admin only).

    Returns at most ``limit`` users, highest message count first. ``name`` and
    ``email`` are None for user ids that have no matching user record.
    """
    # Imported here rather than at module level, as in the original code.
    from open_webui.models.users import Users

    counts = ChatMessages.get_message_count_by_user(
        start_date=start_date, end_date=end_date, db=db
    )

    # chat_message.user_id is a nullable column, so the grouping can contain a
    # None key; it cannot be represented as a UserAnalyticsEntry (user_id: str).
    ranked = sorted(
        ((uid, count) for uid, count in counts.items() if uid is not None),
        key=lambda item: item[1],
        reverse=True,
    )

    # Clamp the limit: a negative value would otherwise slice from the end
    # (e.g. [:-1] returns everything except the last user).
    top = ranked[: max(limit, 0)]
    if not top:
        return UserAnalyticsResponse(users=[])

    # Get user info for top users
    top_user_ids = [uid for uid, _ in top]
    user_info = {u.id: u for u in Users.get_users_by_user_ids(top_user_ids, db=db)}

    users = []
    for user_id, count in top:
        u = user_info.get(user_id)
        users.append(
            UserAnalyticsEntry(
                user_id=user_id,
                name=u.name if u else None,
                email=u.email if u else None,
                count=count,
            )
        )

    return UserAnalyticsResponse(users=users)
|
||||
|
||||
|
||||
@router.get("/messages", response_model=list[ChatMessageModel])
async def get_messages(
    model_id: Optional[str] = Query(None, description="Filter by model ID"),
    user_id: Optional[str] = Query(None, description="Filter by user ID"),
    chat_id: Optional[str] = Query(None, description="Filter by chat ID"),
    start_date: Optional[int] = Query(None, description="Start timestamp (epoch)"),
    end_date: Optional[int] = Query(None, description="End timestamp (epoch)"),
    skip: int = Query(0),
    limit: int = Query(50, le=100),
    user=Depends(get_admin_user),
    db: Session = Depends(get_session),
):
    """Query messages with filters (admin only).

    Exactly one filter is applied, chosen in this order of precedence:
    ``chat_id``, then ``model_id``, then ``user_id``. The date range is only
    forwarded for the model filter, and ``skip``/``limit`` are not forwarded
    for the chat filter. Without any filter the result is empty.
    """
    # Highest precedence: a chat filter returns the chat's messages unpaged.
    if chat_id:
        return ChatMessages.get_messages_by_chat_id(chat_id=chat_id, db=db)

    if model_id:
        return ChatMessages.get_messages_by_model_id(
            model_id=model_id,
            start_date=start_date,
            end_date=end_date,
            skip=skip,
            limit=limit,
            db=db,
        )

    if user_id:
        return ChatMessages.get_messages_by_user_id(
            user_id=user_id, skip=skip, limit=limit, db=db
        )

    # Return empty if no filter specified
    return []
|
||||
|
||||
|
||||
class SummaryResponse(BaseModel):
    """Response body of ``GET /summary``: headline totals for the dashboard."""

    total_messages: int
    total_chats: int
    total_models: int
    total_users: int
|
||||
|
||||
|
||||
@router.get("/summary", response_model=SummaryResponse)
async def get_summary(
    start_date: Optional[int] = Query(None, description="Start timestamp (epoch)"),
    end_date: Optional[int] = Query(None, description="End timestamp (epoch)"),
    user=Depends(get_admin_user),
    db: Session = Depends(get_session),
):
    """Get summary statistics for the dashboard (admin only)."""
    # The same window and session are used for all three groupings.
    window = {"start_date": start_date, "end_date": end_date, "db": db}

    per_model = ChatMessages.get_message_count_by_model(**window)
    per_user = ChatMessages.get_message_count_by_user(**window)
    per_chat = ChatMessages.get_message_count_by_chat(**window)

    # NOTE(review): total_messages is the sum of the per-model counts, while
    # the chat and user totals come from the per-chat and per-user groupings;
    # confirm that deriving the message total from the model grouping is intended.
    return SummaryResponse(
        total_messages=sum(per_model.values()),
        total_chats=len(per_chat),
        total_models=len(per_model),
        total_users=len(per_user),
    )
|
||||
|
||||
|
||||
class DailyStatsEntry(BaseModel):
    """One time bucket of the time series: its label and per-model counts."""

    date: str
    models: dict[str, int]
|
||||
|
||||
|
||||
class DailyStatsResponse(BaseModel):
    """Response body of ``GET /daily``: the buckets of the time series."""

    data: list[DailyStatsEntry]
|
||||
|
||||
|
||||
@router.get("/daily", response_model=DailyStatsResponse)
async def get_daily_stats(
    start_date: Optional[int] = Query(None, description="Start timestamp (epoch)"),
    end_date: Optional[int] = Query(None, description="End timestamp (epoch)"),
    granularity: str = Query("daily", description="Granularity: 'hourly' or 'daily'"),
    user=Depends(get_admin_user),
    db: Session = Depends(get_session),
):
    """Get message counts grouped by model for time-series chart (admin only).

    ``granularity="hourly"`` selects hourly buckets; any other value falls back
    to daily buckets. Entries are sorted by their bucket label.
    """
    fetch_counts = (
        ChatMessages.get_hourly_message_counts_by_model
        if granularity == "hourly"
        else ChatMessages.get_daily_message_counts_by_model
    )
    buckets = fetch_counts(start_date=start_date, end_date=end_date, db=db)

    entries = []
    for label, per_model in sorted(buckets.items()):
        entries.append(DailyStatsEntry(date=label, models=per_model))

    return DailyStatsResponse(data=entries)
|
||||
Reference in New Issue
Block a user