Files
Timothy Jaeryang Baek b36f8d9314 chore: format
2026-02-13 15:00:47 -06:00

2134 lines
66 KiB
Python

import json
import logging
import base64
import io
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request, status, BackgroundTasks
from fastapi.responses import Response, StreamingResponse, FileResponse
from pydantic import BaseModel
from pydantic import field_validator
from open_webui.socket.main import (
emit_to_users,
enter_room_for_users,
sio,
get_user_ids_from_room,
)
from open_webui.models.users import (
UserIdNameResponse,
UserIdNameStatusResponse,
UserListResponse,
UserModelResponse,
Users,
UserModel,
UserNameResponse,
)
from open_webui.models.groups import Groups
from open_webui.models.channels import (
Channels,
ChannelModel,
ChannelForm,
ChannelResponse,
CreateChannelForm,
ChannelWebhookModel,
ChannelWebhookForm,
)
from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant
from open_webui.models.messages import (
Messages,
MessageModel,
MessageResponse,
MessageWithReactionsResponse,
MessageForm,
)
from open_webui.utils.files import get_image_base64_from_file_id
from open_webui.config import ENABLE_ADMIN_CHAT_ACCESS, ENABLE_ADMIN_EXPORT
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import STATIC_DIR
from open_webui.utils.models import (
get_all_models,
get_filtered_models,
)
from open_webui.utils.chat import generate_chat_completion
from open_webui.utils.auth import get_admin_user, get_verified_user
from open_webui.utils.access_control import has_permission
from open_webui.utils.webhook import post_webhook
from open_webui.utils.channels import extract_mentions, replace_mentions
from open_webui.internal.db import get_session
from sqlalchemy.orm import Session
log = logging.getLogger(__name__)
router = APIRouter()
def channel_has_access(
user_id: str,
channel: ChannelModel,
permission: str = "read",
strict: bool = True,
db: Optional[Session] = None,
) -> bool:
if AccessGrants.has_access(
user_id=user_id,
resource_type="channel",
resource_id=channel.id,
permission=permission,
db=db,
):
return True
if (
not strict
and permission == "write"
and has_public_read_access_grant(channel.access_grants)
):
return True
return False
def get_channel_users_with_access(
channel: ChannelModel, permission: str = "read", db: Optional[Session] = None
):
return AccessGrants.get_users_with_access(
resource_type="channel",
resource_id=channel.id,
permission=permission,
db=db,
)
def get_channel_permitted_group_and_user_ids(
channel: ChannelModel, permission: str = "read"
) -> Optional[dict[str, list[str]]]:
if permission == "read" and has_public_read_access_grant(channel.access_grants):
return None
user_ids = []
group_ids = []
for grant in channel.access_grants:
if grant.permission != permission:
continue
if grant.principal_type == "group":
group_ids.append(grant.principal_id)
elif grant.principal_type == "user" and grant.principal_id != "*":
user_ids.append(grant.principal_id)
return {
"user_ids": list(dict.fromkeys(user_ids)),
"group_ids": list(dict.fromkeys(group_ids)),
}
############################
# Channels Enabled Dependency
############################
def check_channels_access(request: Request, user: Optional[UserModel] = None):
"""Dependency to ensure channels are globally enabled."""
if not request.app.state.config.ENABLE_CHANNELS:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Channels are not enabled",
)
if user:
if user.role != "admin" and not has_permission(
user.id, "features.channels", request.app.state.config.USER_PERMISSIONS
):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
############################
# GetChatList
############################
class ChannelListItemResponse(ChannelModel):
user_ids: Optional[list[str]] = None # 'dm' channels only
users: Optional[list[UserIdNameStatusResponse]] = None # 'dm' channels only
last_message_at: Optional[int] = None # timestamp in epoch (time_ns)
unread_count: int = 0
@router.get("/", response_model=list[ChannelListItemResponse])
async def get_channels(
request: Request,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request, user)
channels = Channels.get_channels_by_user_id(user.id, db=db)
channel_list = []
for channel in channels:
last_message = Messages.get_last_message_by_channel_id(channel.id, db=db)
last_message_at = last_message.created_at if last_message else None
channel_member = Channels.get_member_by_channel_and_user_id(
channel.id, user.id, db=db
)
unread_count = (
Messages.get_unread_message_count(
channel.id, user.id, channel_member.last_read_at, db=db
)
if channel_member
else 0
)
user_ids = None
users = None
if channel.type == "dm":
user_ids = [
member.user_id
for member in Channels.get_members_by_channel_id(channel.id, db=db)
]
users = [
UserIdNameStatusResponse(
**{
**user.model_dump(),
"is_active": Users.is_active(user),
}
)
for user in Users.get_users_by_user_ids(user_ids, db=db)
]
channel_list.append(
ChannelListItemResponse(
**channel.model_dump(),
user_ids=user_ids,
users=users,
last_message_at=last_message_at,
unread_count=unread_count,
)
)
return channel_list
@router.get("/list", response_model=list[ChannelModel])
async def get_all_channels(
request: Request,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
if user.role == "admin":
return Channels.get_channels(db=db)
return Channels.get_channels_by_user_id(user.id, db=db)
############################
# GetDMChannelByUserId
############################
@router.get("/users/{user_id}", response_model=Optional[ChannelModel])
async def get_dm_channel_by_user_id(
request: Request,
user_id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request, user)
try:
existing_channel = Channels.get_dm_channel_by_user_ids(
[user.id, user_id], db=db
)
if existing_channel:
participant_ids = [
member.user_id
for member in Channels.get_members_by_channel_id(
existing_channel.id, db=db
)
]
await emit_to_users(
"events:channel",
{"data": {"type": "channel:created"}},
participant_ids,
)
await enter_room_for_users(
f"channel:{existing_channel.id}", participant_ids
)
Channels.update_member_active_status(
existing_channel.id, user.id, True, db=db
)
return ChannelModel(**existing_channel.model_dump())
channel = Channels.insert_new_channel(
CreateChannelForm(
type="dm",
name="",
user_ids=[user_id],
),
user.id,
db=db,
)
if channel:
participant_ids = [
member.user_id
for member in Channels.get_members_by_channel_id(channel.id, db=db)
]
await emit_to_users(
"events:channel",
{"data": {"type": "channel:created"}},
participant_ids,
)
await enter_room_for_users(f"channel:{channel.id}", participant_ids)
return ChannelModel(**channel.model_dump())
else:
raise Exception("Error creating channel")
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# CreateNewChannel
############################
@router.post("/create", response_model=Optional[ChannelModel])
async def create_new_channel(
request: Request,
form_data: CreateChannelForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request, user)
if form_data.type not in ["group", "dm"] and user.role != "admin":
# Only admins can create standard channels (joined by default)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
try:
if form_data.type == "dm":
existing_channel = Channels.get_dm_channel_by_user_ids(
[user.id, *form_data.user_ids], db=db
)
if existing_channel:
participant_ids = [
member.user_id
for member in Channels.get_members_by_channel_id(
existing_channel.id, db=db
)
]
await emit_to_users(
"events:channel",
{"data": {"type": "channel:created"}},
participant_ids,
)
await enter_room_for_users(
f"channel:{existing_channel.id}", participant_ids
)
Channels.update_member_active_status(
existing_channel.id, user.id, True, db=db
)
return ChannelModel(**existing_channel.model_dump())
channel = Channels.insert_new_channel(form_data, user.id, db=db)
if channel:
participant_ids = [
member.user_id
for member in Channels.get_members_by_channel_id(channel.id, db=db)
]
await emit_to_users(
"events:channel",
{"data": {"type": "channel:created"}},
participant_ids,
)
await enter_room_for_users(f"channel:{channel.id}", participant_ids)
return ChannelModel(**channel.model_dump())
else:
raise Exception("Error creating channel")
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# GetChannelById
############################
class ChannelFullResponse(ChannelResponse):
user_ids: Optional[list[str]] = None # 'group'/'dm' channels only
users: Optional[list[UserIdNameStatusResponse]] = None # 'group'/'dm' channels only
last_read_at: Optional[int] = None # timestamp in epoch (time_ns)
unread_count: int = 0
@router.get("/{id}", response_model=Optional[ChannelFullResponse])
async def get_channel_by_id(
request: Request,
id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request, user)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
user_ids = None
users = None
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
user_ids = [
member.user_id
for member in Channels.get_members_by_channel_id(channel.id, db=db)
]
users = [
UserIdNameStatusResponse(
**{
**user.model_dump(),
"is_active": Users.is_active(user),
}
)
for user in Users.get_users_by_user_ids(user_ids, db=db)
]
channel_member = Channels.get_member_by_channel_and_user_id(
channel.id, user.id, db=db
)
unread_count = Messages.get_unread_message_count(
channel.id, user.id, channel_member.last_read_at if channel_member else None
)
return ChannelFullResponse(
**{
**channel.model_dump(),
"user_ids": user_ids,
"users": users,
"is_manager": Channels.is_user_channel_manager(
channel.id, user.id, db=db
),
"write_access": True,
"user_count": len(user_ids),
"last_read_at": channel_member.last_read_at if channel_member else None,
"unread_count": unread_count,
}
)
else:
if user.role != "admin" and not channel_has_access(
user.id, channel, permission="read", db=db
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
write_access = channel_has_access(
user.id,
channel,
permission="write",
strict=False,
db=db,
)
user_count = len(get_channel_users_with_access(channel, "read", db=db))
channel_member = Channels.get_member_by_channel_and_user_id(
channel.id, user.id, db=db
)
unread_count = Messages.get_unread_message_count(
channel.id, user.id, channel_member.last_read_at if channel_member else None
)
return ChannelFullResponse(
**{
**channel.model_dump(),
"user_ids": user_ids,
"users": users,
"is_manager": Channels.is_user_channel_manager(
channel.id, user.id, db=db
),
"write_access": write_access or user.role == "admin",
"user_count": user_count,
"last_read_at": channel_member.last_read_at if channel_member else None,
"unread_count": unread_count,
}
)
############################
# GetChannelMembersById
############################
PAGE_ITEM_COUNT = 30
@router.get("/{id}/members", response_model=UserListResponse)
async def get_channel_members_by_id(
request: Request,
id: str,
query: Optional[str] = None,
order_by: Optional[str] = None,
direction: Optional[str] = None,
page: Optional[int] = 1,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request, user)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
limit = PAGE_ITEM_COUNT
page = max(1, page)
skip = (page - 1) * limit
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
if channel.type == "dm":
user_ids = [
member.user_id
for member in Channels.get_members_by_channel_id(channel.id, db=db)
]
users = Users.get_users_by_user_ids(user_ids, db=db)
total = len(users)
return {
"users": [
UserModelResponse(**user.model_dump(), is_active=Users.is_active(user))
for user in users
],
"total": total,
}
else:
filter = {}
if query:
filter["query"] = query
if order_by:
filter["order_by"] = order_by
if direction:
filter["direction"] = direction
if channel.type == "group":
filter["channel_id"] = channel.id
else:
filter["roles"] = ["!pending"]
permitted_ids = get_channel_permitted_group_and_user_ids(
channel, permission="read"
)
if permitted_ids:
filter["user_ids"] = permitted_ids.get("user_ids")
filter["group_ids"] = permitted_ids.get("group_ids")
result = Users.get_users(filter=filter, skip=skip, limit=limit, db=db)
users = result["users"]
total = result["total"]
return {
"users": [
UserModelResponse(**user.model_dump(), is_active=Users.is_active(user))
for user in users
],
"total": total,
}
#################################################
# UpdateIsActiveMemberByIdAndUserId
#################################################
class UpdateActiveMemberForm(BaseModel):
is_active: bool
@router.post("/{id}/members/active", response_model=bool)
async def update_is_active_member_by_id_and_user_id(
request: Request,
id: str,
form_data: UpdateActiveMemberForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
Channels.update_member_active_status(
channel.id, user.id, form_data.is_active, db=db
)
return True
#################################################
# AddMembersById
#################################################
class UpdateMembersForm(BaseModel):
user_ids: list[str] = []
group_ids: list[str] = []
@router.post("/{id}/update/members/add")
async def add_members_by_id(
request: Request,
id: str,
form_data: UpdateMembersForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request, user)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.user_id != user.id and user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
try:
memberships = Channels.add_members_to_channel(
channel.id, user.id, form_data.user_ids, form_data.group_ids, db=db
)
return memberships
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
#################################################
#
#################################################
class RemoveMembersForm(BaseModel):
user_ids: list[str] = []
@router.post("/{id}/update/members/remove")
async def remove_members_by_id(
request: Request,
id: str,
form_data: RemoveMembersForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request, user)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.user_id != user.id and user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
try:
deleted = Channels.remove_members_from_channel(
channel.id, form_data.user_ids, db=db
)
return deleted
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# UpdateChannelById
############################
@router.post("/{id}/update", response_model=Optional[ChannelModel])
async def update_channel_by_id(
request: Request,
id: str,
form_data: ChannelForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request, user)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.user_id != user.id and user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
try:
channel = Channels.update_channel_by_id(id, form_data, db=db)
return ChannelModel(**channel.model_dump())
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# DeleteChannelById
############################
@router.delete("/{id}/delete", response_model=bool)
async def delete_channel_by_id(
request: Request,
id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request, user)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.user_id != user.id and user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
try:
Channels.delete_channel_by_id(id, db=db)
return True
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# GetChannelMessages
############################
class MessageUserResponse(MessageResponse):
data: bool | None = None
@field_validator("data", mode="before")
def convert_data_to_bool(cls, v):
# No data or not a dict → False
if not isinstance(v, dict):
return False
# True if ANY value in the dict is non-empty
return any(bool(val) for val in v.values())
@router.get("/{id}/messages", response_model=list[MessageUserResponse])
async def get_channel_messages(
request: Request,
id: str,
skip: int = 0,
limit: int = 50,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request, user)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
else:
if user.role != "admin" and not channel_has_access(
user.id, channel, permission="read", db=db
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
channel_member = Channels.join_channel(
id, user.id, db=db
) # Ensure user is a member of the channel
message_list = Messages.get_messages_by_channel_id(id, skip, limit, db=db)
if not message_list:
return []
# Batch fetch all users in a single query (fixes N+1 problem)
user_ids = list(set(m.user_id for m in message_list))
users = {u.id: u for u in Users.get_users_by_user_ids(user_ids, db=db)}
messages = []
for message in message_list:
thread_replies = Messages.get_thread_replies_by_message_id(message.id, db=db)
latest_thread_reply_at = (
thread_replies[0].created_at if thread_replies else None
)
# Use message.user if present (for webhooks), otherwise look up by user_id
user_info = message.user
if user_info is None and message.user_id in users:
user_info = UserNameResponse(**users[message.user_id].model_dump())
messages.append(
MessageUserResponse(
**{
**message.model_dump(),
"reply_count": len(thread_replies),
"latest_reply_at": latest_thread_reply_at,
"reactions": Messages.get_reactions_by_message_id(
message.id, db=db
),
"user": user_info,
}
)
)
return messages
############################
# GetPinnedChannelMessages
############################
PAGE_ITEM_COUNT_PINNED = 20
@router.get("/{id}/messages/pinned", response_model=list[MessageWithReactionsResponse])
async def get_pinned_channel_messages(
request: Request,
id: str,
page: int = 1,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
else:
if user.role != "admin" and not channel_has_access(
user.id, channel, permission="read", db=db
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
page = max(1, page)
skip = (page - 1) * PAGE_ITEM_COUNT_PINNED
limit = PAGE_ITEM_COUNT_PINNED
message_list = Messages.get_pinned_messages_by_channel_id(id, skip, limit, db=db)
if not message_list:
return []
# Batch fetch all users in a single query (fixes N+1 problem)
user_ids = list(set(m.user_id for m in message_list))
users = {u.id: u for u in Users.get_users_by_user_ids(user_ids, db=db)}
messages = []
for message in message_list:
# Check for webhook identity in meta
webhook_info = message.meta.get("webhook") if message.meta else None
if webhook_info:
user_info = UserNameResponse(
id=webhook_info.get("id"),
name=webhook_info.get("name"),
role="webhook",
)
elif message.user_id in users:
user_info = UserNameResponse(**users[message.user_id].model_dump())
else:
user_info = None
messages.append(
MessageWithReactionsResponse(
**{
**message.model_dump(),
"reactions": Messages.get_reactions_by_message_id(
message.id, db=db
),
"user": user_info,
}
)
)
return messages
############################
# PostNewMessage
############################
async def send_notification(
name, webui_url, channel, message, active_user_ids, db=None
):
users = get_channel_users_with_access(channel, "read", db=db)
for user in users:
if (user.id not in active_user_ids) and Channels.is_user_channel_member(
channel.id, user.id, db=db
):
if user.settings:
webhook_url = user.settings.ui.get("notifications", {}).get(
"webhook_url", None
)
if webhook_url:
await post_webhook(
name,
webhook_url,
f"#{channel.name} - {webui_url}/channels/{channel.id}\n\n{message.content}",
{
"action": "channel",
"message": message.content,
"title": channel.name,
"url": f"{webui_url}/channels/{channel.id}",
},
)
return True
async def model_response_handler(request, channel, message, user, db=None):
MODELS = {
model["id"]: model
for model in get_filtered_models(await get_all_models(request, user=user), user)
}
mentions = extract_mentions(message.content)
message_content = replace_mentions(message.content)
model_mentions = {}
# check if the message is a reply to a message sent by a model
if (
message.reply_to_message
and message.reply_to_message.meta
and message.reply_to_message.meta.get("model_id", None)
):
model_id = message.reply_to_message.meta.get("model_id", None)
model_mentions[model_id] = {"id": model_id, "id_type": "M"}
# check if any of the mentions are models
for mention in mentions:
if mention["id_type"] == "M" and mention["id"] not in model_mentions:
model_mentions[mention["id"]] = mention
if not model_mentions:
return False
for mention in model_mentions.values():
model_id = mention["id"]
model = MODELS.get(model_id, None)
if model:
try:
# reverse to get in chronological order
thread_messages = Messages.get_messages_by_parent_id(
channel.id,
message.parent_id if message.parent_id else message.id,
db=db,
)[::-1]
response_message, channel = await new_message_handler(
request,
channel.id,
MessageForm(
**{
"parent_id": (
message.parent_id if message.parent_id else message.id
),
"content": f"",
"data": {},
"meta": {
"model_id": model_id,
"model_name": model.get("name", model_id),
},
}
),
user,
db,
)
thread_history = []
images = []
message_users = {}
for thread_message in thread_messages:
message_user = None
if thread_message.user_id not in message_users:
message_user = Users.get_user_by_id(
thread_message.user_id, db=db
)
message_users[thread_message.user_id] = message_user
else:
message_user = message_users[thread_message.user_id]
if thread_message.meta and thread_message.meta.get(
"model_id", None
):
# If the message was sent by a model, use the model name
message_model_id = thread_message.meta.get("model_id", None)
message_model = MODELS.get(message_model_id, None)
username = (
message_model.get("name", message_model_id)
if message_model
else message_model_id
)
else:
username = message_user.name if message_user else "Unknown"
thread_history.append(
f"{username}: {replace_mentions(thread_message.content)}"
)
thread_message_files = (thread_message.data or {}).get("files", [])
for file in thread_message_files:
if file.get("type", "") == "image":
images.append(file.get("url", ""))
elif file.get("content_type", "").startswith("image/"):
image = get_image_base64_from_file_id(file.get("id", ""))
if image:
images.append(image)
thread_history_string = "\n\n".join(thread_history)
system_message = {
"role": "system",
"content": f"You are {model.get('name', model_id)}, participating in a threaded conversation. Be concise and conversational."
+ (
f"Here's the thread history:\n\n\n{thread_history_string}\n\n\nContinue the conversation naturally as {model.get('name', model_id)}, addressing the most recent message while being aware of the full context."
if thread_history
else ""
),
}
content = f"{user.name if user else 'User'}: {message_content}"
if images:
content = [
{
"type": "text",
"text": content,
},
*[
{
"type": "image_url",
"image_url": {
"url": image,
},
}
for image in images
],
]
form_data = {
"model": model_id,
"messages": [
system_message,
{"role": "user", "content": content},
],
"stream": False,
}
res = await generate_chat_completion(
request,
form_data=form_data,
user=user,
)
if res:
if res.get("choices", []) and len(res["choices"]) > 0:
await update_message_by_id(
request,
channel.id,
response_message.id,
MessageForm(
**{
"content": res["choices"][0]["message"]["content"],
"meta": {
"done": True,
},
}
),
user,
db,
)
elif res.get("error", None):
await update_message_by_id(
request,
channel.id,
response_message.id,
MessageForm(
**{
"content": f"Error: {res['error']}",
"meta": {
"done": True,
},
}
),
user,
db,
)
except Exception as e:
log.info(e)
pass
return True
async def new_message_handler(
request: Request, id: str, form_data: MessageForm, user, db
):
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
else:
if user.role != "admin" and not channel_has_access(
user.id,
channel,
permission="write",
strict=False,
db=db,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
try:
message = Messages.insert_new_message(form_data, channel.id, user.id, db=db)
if message:
if channel.type in ["group", "dm"]:
members = Channels.get_members_by_channel_id(channel.id, db=db)
for member in members:
if not member.is_active:
Channels.update_member_active_status(
channel.id, member.user_id, True, db=db
)
message = Messages.get_message_by_id(message.id, db=db)
event_data = {
"channel_id": channel.id,
"message_id": message.id,
"data": {
"type": "message",
"data": {"temp_id": form_data.temp_id, **message.model_dump()},
},
"user": UserNameResponse(**user.model_dump()).model_dump(),
"channel": channel.model_dump(),
}
await sio.emit(
"events:channel",
event_data,
to=f"channel:{channel.id}",
)
if message.parent_id:
# If this message is a reply, emit to the parent message as well
parent_message = Messages.get_message_by_id(message.parent_id, db=db)
if parent_message:
await sio.emit(
"events:channel",
{
"channel_id": channel.id,
"message_id": parent_message.id,
"data": {
"type": "message:reply",
"data": parent_message.model_dump(),
},
"user": UserNameResponse(**user.model_dump()).model_dump(),
"channel": channel.model_dump(),
},
to=f"channel:{channel.id}",
)
return message, channel
else:
raise Exception("Error creating message")
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
@router.post("/{id}/messages/post", response_model=Optional[MessageModel])
async def post_new_message(
request: Request,
id: str,
form_data: MessageForm,
background_tasks: BackgroundTasks,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
try:
message, channel = await new_message_handler(request, id, form_data, user, db)
try:
if files := message.data.get("files", []):
for file in files:
Channels.set_file_message_id_in_channel_by_id(
channel.id, file.get("id", ""), message.id, db=db
)
except Exception as e:
log.debug(e)
active_user_ids = get_user_ids_from_room(f"channel:{channel.id}")
# NOTE: We intentionally do NOT pass db to background_handler.
# Background tasks should manage their own short-lived sessions to avoid
# holding database connections during slow operations (e.g., LLM calls).
async def background_handler():
await model_response_handler(request, channel, message, user)
await send_notification(
request.app.state.WEBUI_NAME,
request.app.state.config.WEBUI_URL,
channel,
message,
active_user_ids,
)
background_tasks.add_task(background_handler)
return message
except HTTPException as e:
raise e
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# GetChannelMessage
############################
@router.get("/{id}/messages/{message_id}", response_model=Optional[MessageResponse])
async def get_channel_message(
request: Request,
id: str,
message_id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
else:
if user.role != "admin" and not channel_has_access(
user.id, channel, permission="read", db=db
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
message = Messages.get_message_by_id(message_id, db=db)
if not message:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if message.channel_id != id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
return MessageResponse(
**{
**message.model_dump(),
"user": UserNameResponse(
**Users.get_user_by_id(message.user_id, db=db).model_dump()
),
}
)
############################
# GetChannelMessageData
############################
@router.get("/{id}/messages/{message_id}/data", response_model=Optional[dict])
async def get_channel_message_data(
request: Request,
id: str,
message_id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
else:
if user.role != "admin" and not channel_has_access(
user.id, channel, permission="read", db=db
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
message = Messages.get_message_by_id(message_id, db=db)
if not message:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if message.channel_id != id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
return message.data
############################
# PinChannelMessage
############################
class PinMessageForm(BaseModel):
is_pinned: bool
@router.post(
"/{id}/messages/{message_id}/pin", response_model=Optional[MessageUserResponse]
)
async def pin_channel_message(
request: Request,
id: str,
message_id: str,
form_data: PinMessageForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
else:
if user.role != "admin" and not channel_has_access(
user.id, channel, permission="read", db=db
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
message = Messages.get_message_by_id(message_id, db=db)
if not message:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if message.channel_id != id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
try:
Messages.update_is_pinned_by_id(message_id, form_data.is_pinned, user.id, db=db)
message = Messages.get_message_by_id(message_id, db=db)
return MessageUserResponse(
**{
**message.model_dump(),
"user": UserNameResponse(
**Users.get_user_by_id(message.user_id, db=db).model_dump()
),
}
)
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# GetChannelThreadMessages
############################
@router.get(
"/{id}/messages/{message_id}/thread", response_model=list[MessageUserResponse]
)
async def get_channel_thread_messages(
request: Request,
id: str,
message_id: str,
skip: int = 0,
limit: int = 50,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
else:
if user.role != "admin" and not channel_has_access(
user.id, channel, permission="read", db=db
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
message_list = Messages.get_messages_by_parent_id(
id, message_id, skip, limit, db=db
)
if not message_list:
return []
# Batch fetch all users in a single query (fixes N+1 problem)
user_ids = list(set(m.user_id for m in message_list))
users = {u.id: u for u in Users.get_users_by_user_ids(user_ids, db=db)}
messages = []
for message in message_list:
# Use message.user if present (for webhooks), otherwise look up by user_id
user_info = message.user
if user_info is None and message.user_id in users:
user_info = UserNameResponse(**users[message.user_id].model_dump())
messages.append(
MessageUserResponse(
**{
**message.model_dump(),
"reply_count": 0,
"latest_reply_at": None,
"reactions": Messages.get_reactions_by_message_id(
message.id, db=db
),
"user": user_info,
}
)
)
return messages
############################
# UpdateMessageById
############################
@router.post(
"/{id}/messages/{message_id}/update", response_model=Optional[MessageModel]
)
async def update_message_by_id(
request: Request,
id: str,
message_id: str,
form_data: MessageForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
message = Messages.get_message_by_id(message_id, db=db)
if not message:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if message.channel_id != id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
else:
if (
user.role != "admin"
and message.user_id != user.id
and not channel_has_access(user.id, channel, permission="read", db=db)
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
try:
message = Messages.update_message_by_id(message_id, form_data, db=db)
message = Messages.get_message_by_id(message_id, db=db)
if message:
await sio.emit(
"events:channel",
{
"channel_id": channel.id,
"message_id": message.id,
"data": {
"type": "message:update",
"data": message.model_dump(),
},
"user": UserNameResponse(**user.model_dump()).model_dump(),
"channel": channel.model_dump(),
},
to=f"channel:{channel.id}",
)
return MessageModel(**message.model_dump())
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# AddReactionToMessage
############################
class ReactionForm(BaseModel):
name: str
@router.post("/{id}/messages/{message_id}/reactions/add", response_model=bool)
async def add_reaction_to_message(
request: Request,
id: str,
message_id: str,
form_data: ReactionForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
else:
if user.role != "admin" and not channel_has_access(
user.id,
channel,
permission="write",
strict=False,
db=db,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
message = Messages.get_message_by_id(message_id, db=db)
if not message:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if message.channel_id != id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
try:
Messages.add_reaction_to_message(message_id, user.id, form_data.name, db=db)
message = Messages.get_message_by_id(message_id, db=db)
await sio.emit(
"events:channel",
{
"channel_id": channel.id,
"message_id": message.id,
"data": {
"type": "message:reaction:add",
"data": {
**message.model_dump(),
"name": form_data.name,
},
},
"user": UserNameResponse(**user.model_dump()).model_dump(),
"channel": channel.model_dump(),
},
to=f"channel:{channel.id}",
)
return True
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# RemoveReactionById
############################
@router.post("/{id}/messages/{message_id}/reactions/remove", response_model=bool)
async def remove_reaction_by_id_and_user_id_and_name(
request: Request,
id: str,
message_id: str,
form_data: ReactionForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
else:
if user.role != "admin" and not channel_has_access(
user.id,
channel,
permission="write",
strict=False,
db=db,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
message = Messages.get_message_by_id(message_id, db=db)
if not message:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if message.channel_id != id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
try:
Messages.remove_reaction_by_id_and_user_id_and_name(
message_id, user.id, form_data.name, db=db
)
message = Messages.get_message_by_id(message_id, db=db)
await sio.emit(
"events:channel",
{
"channel_id": channel.id,
"message_id": message.id,
"data": {
"type": "message:reaction:remove",
"data": {
**message.model_dump(),
"name": form_data.name,
},
},
"user": UserNameResponse(**user.model_dump()).model_dump(),
"channel": channel.model_dump(),
},
to=f"channel:{channel.id}",
)
return True
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# DeleteMessageById
############################
@router.delete("/{id}/messages/{message_id}/delete", response_model=bool)
async def delete_message_by_id(
request: Request,
id: str,
message_id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
message = Messages.get_message_by_id(message_id, db=db)
if not message:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
if message.channel_id != id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
if channel.type in ["group", "dm"]:
if not Channels.is_user_channel_member(channel.id, user.id, db=db):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
else:
if (
user.role != "admin"
and message.user_id != user.id
and not channel_has_access(
user.id,
channel,
permission="write",
strict=False,
db=db,
)
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
try:
Messages.delete_message_by_id(message_id, db=db)
await sio.emit(
"events:channel",
{
"channel_id": channel.id,
"message_id": message.id,
"data": {
"type": "message:delete",
"data": {
**message.model_dump(),
"user": UserNameResponse(**user.model_dump()).model_dump(),
},
},
"user": UserNameResponse(**user.model_dump()).model_dump(),
"channel": channel.model_dump(),
},
to=f"channel:{channel.id}",
)
if message.parent_id:
# If this message is a reply, emit to the parent message as well
parent_message = Messages.get_message_by_id(message.parent_id, db=db)
if parent_message:
await sio.emit(
"events:channel",
{
"channel_id": channel.id,
"message_id": parent_message.id,
"data": {
"type": "message:reply",
"data": parent_message.model_dump(),
},
"user": UserNameResponse(**user.model_dump()).model_dump(),
"channel": channel.model_dump(),
},
to=f"channel:{channel.id}",
)
return True
except Exception as e:
log.exception(e)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# Webhooks
############################
@router.get("/webhooks/{webhook_id}/profile/image")
def get_webhook_profile_image(webhook_id: str, user=Depends(get_verified_user)):
"""Get webhook profile image by webhook ID."""
webhook = Channels.get_webhook_by_id(webhook_id)
if not webhook:
# Return default favicon if webhook not found
return FileResponse(f"{STATIC_DIR}/favicon.png")
if webhook.profile_image_url:
# Check if it's url or base64
if webhook.profile_image_url.startswith("http"):
return Response(
status_code=status.HTTP_302_FOUND,
headers={"Location": webhook.profile_image_url},
)
elif webhook.profile_image_url.startswith("data:image"):
try:
header, base64_data = webhook.profile_image_url.split(",", 1)
image_data = base64.b64decode(base64_data)
image_buffer = io.BytesIO(image_data)
media_type = header.split(";")[0].lstrip("data:")
return StreamingResponse(
image_buffer,
media_type=media_type,
headers={"Content-Disposition": "inline"},
)
except Exception as e:
pass
# Return default favicon if no profile image
return FileResponse(f"{STATIC_DIR}/favicon.png")
@router.get("/{id}/webhooks", response_model=list[ChannelWebhookModel])
async def get_channel_webhooks(
request: Request,
id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
# Only channel managers can view webhooks
if (
not Channels.is_user_channel_manager(channel.id, user.id, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED
)
return Channels.get_webhooks_by_channel_id(id, db=db)
@router.post("/{id}/webhooks/create", response_model=ChannelWebhookModel)
async def create_channel_webhook(
request: Request,
id: str,
form_data: ChannelWebhookForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
# Only channel managers can create webhooks
if (
not Channels.is_user_channel_manager(channel.id, user.id, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED
)
webhook = Channels.insert_webhook(id, user.id, form_data, db=db)
if not webhook:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
return webhook
@router.post("/{id}/webhooks/{webhook_id}/update", response_model=ChannelWebhookModel)
async def update_channel_webhook(
request: Request,
id: str,
webhook_id: str,
form_data: ChannelWebhookForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
# Only channel managers can update webhooks
if (
not Channels.is_user_channel_manager(channel.id, user.id, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED
)
webhook = Channels.get_webhook_by_id(webhook_id, db=db)
if not webhook or webhook.channel_id != id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
updated = Channels.update_webhook_by_id(webhook_id, form_data, db=db)
if not updated:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
return updated
@router.delete("/{id}/webhooks/{webhook_id}/delete", response_model=bool)
async def delete_channel_webhook(
request: Request,
id: str,
webhook_id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
# Only channel managers can delete webhooks
if (
not Channels.is_user_channel_manager(channel.id, user.id, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED
)
webhook = Channels.get_webhook_by_id(webhook_id, db=db)
if not webhook or webhook.channel_id != id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
return Channels.delete_webhook_by_id(webhook_id, db=db)
############################
# Public Webhook Endpoint
############################
class WebhookMessageForm(BaseModel):
content: str
@router.post("/webhooks/{webhook_id}/{token}")
async def post_webhook_message(
request: Request,
webhook_id: str,
token: str,
form_data: WebhookMessageForm,
db: Session = Depends(get_session),
):
"""Public endpoint to post messages via webhook. No authentication required."""
check_channels_access(request)
# Validate webhook
webhook = Channels.get_webhook_by_id_and_token(webhook_id, token, db=db)
if not webhook:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid webhook URL",
)
channel = Channels.get_channel_by_id(webhook.channel_id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
# Create message with webhook identity stored in meta
message = Messages.insert_new_message(
MessageForm(content=form_data.content, meta={"webhook": {"id": webhook.id}}),
webhook.channel_id,
webhook.user_id, # Required for DB but webhook info in meta takes precedence
db=db,
)
if not message:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Failed to create message",
)
# Update last_used_at
Channels.update_webhook_last_used_at(webhook_id, db=db)
# Get full message and emit event
message = Messages.get_message_by_id(message.id, db=db)
event_data = {
"channel_id": channel.id,
"message_id": message.id,
"data": {
"type": "message",
"data": {
**message.model_dump(),
"user": {
"id": webhook.id,
"name": webhook.name,
"role": "webhook",
},
},
},
"user": {
"id": webhook.id,
"name": webhook.name,
"role": "webhook",
},
"channel": channel.model_dump(),
}
await sio.emit(
"events:channel",
event_data,
to=f"channel:{channel.id}",
)
return {"success": True, "message_id": message.id}