mirror of
https://github.com/open-webui/open-webui.git
synced 2025-12-16 11:57:51 +01:00
wip
This commit is contained in:
152
backend/open_webui/routers/prompts.py
Normal file
152
backend/open_webui/routers/prompts.py
Normal file
@@ -0,0 +1,152 @@
|
||||
from typing import Optional
|
||||
|
||||
from open_webui.models.prompts import (
|
||||
PromptForm,
|
||||
PromptUserResponse,
|
||||
PromptModel,
|
||||
Prompts,
|
||||
)
|
||||
from open_webui.constants import ERROR_MESSAGES
|
||||
from fastapi import APIRouter, Depends, HTTPException, status, Request
|
||||
from open_webui.utils.auth import get_admin_user, get_verified_user
|
||||
from open_webui.utils.access_control import has_access, has_permission
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
############################
|
||||
# GetPrompts
|
||||
############################
|
||||
|
||||
|
||||
@router.get("/", response_model=list[PromptModel])
|
||||
async def get_prompts(user=Depends(get_verified_user)):
|
||||
if user.role == "admin":
|
||||
prompts = Prompts.get_prompts()
|
||||
else:
|
||||
prompts = Prompts.get_prompts_by_user_id(user.id, "read")
|
||||
|
||||
return prompts
|
||||
|
||||
|
||||
@router.get("/list", response_model=list[PromptUserResponse])
|
||||
async def get_prompt_list(user=Depends(get_verified_user)):
|
||||
if user.role == "admin":
|
||||
prompts = Prompts.get_prompts()
|
||||
else:
|
||||
prompts = Prompts.get_prompts_by_user_id(user.id, "write")
|
||||
|
||||
return prompts
|
||||
|
||||
|
||||
############################
|
||||
# CreateNewPrompt
|
||||
############################
|
||||
|
||||
|
||||
@router.post("/create", response_model=Optional[PromptModel])
|
||||
async def create_new_prompt(
|
||||
request: Request, form_data: PromptForm, user=Depends(get_verified_user)
|
||||
):
|
||||
if user.role != "admin" and not has_permission(
|
||||
user.id, "workspace.prompts", request.app.state.config.USER_PERMISSIONS
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=ERROR_MESSAGES.UNAUTHORIZED,
|
||||
)
|
||||
|
||||
prompt = Prompts.get_prompt_by_command(form_data.command)
|
||||
if prompt is None:
|
||||
prompt = Prompts.insert_new_prompt(user.id, form_data)
|
||||
|
||||
if prompt:
|
||||
return prompt
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=ERROR_MESSAGES.DEFAULT(),
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=ERROR_MESSAGES.COMMAND_TAKEN,
|
||||
)
|
||||
|
||||
|
||||
############################
|
||||
# GetPromptByCommand
|
||||
############################
|
||||
|
||||
|
||||
@router.get("/command/{command}", response_model=Optional[PromptModel])
|
||||
async def get_prompt_by_command(command: str, user=Depends(get_verified_user)):
|
||||
prompt = Prompts.get_prompt_by_command(f"/{command}")
|
||||
|
||||
if prompt:
|
||||
if (
|
||||
user.role == "admin"
|
||||
or prompt.user_id == user.id
|
||||
or has_access(user.id, "read", prompt.access_control)
|
||||
):
|
||||
return prompt
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=ERROR_MESSAGES.NOT_FOUND,
|
||||
)
|
||||
|
||||
|
||||
############################
|
||||
# UpdatePromptByCommand
|
||||
############################
|
||||
|
||||
|
||||
@router.post("/command/{command}/update", response_model=Optional[PromptModel])
|
||||
async def update_prompt_by_command(
|
||||
command: str,
|
||||
form_data: PromptForm,
|
||||
user=Depends(get_verified_user),
|
||||
):
|
||||
prompt = Prompts.get_prompt_by_command(f"/{command}")
|
||||
if not prompt:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=ERROR_MESSAGES.NOT_FOUND,
|
||||
)
|
||||
|
||||
if prompt.user_id != user.id and user.role != "admin":
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
|
||||
)
|
||||
|
||||
prompt = Prompts.update_prompt_by_command(f"/{command}", form_data)
|
||||
if prompt:
|
||||
return prompt
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
|
||||
)
|
||||
|
||||
|
||||
############################
|
||||
# DeletePromptByCommand
|
||||
############################
|
||||
|
||||
|
||||
@router.delete("/command/{command}/delete", response_model=bool)
|
||||
async def delete_prompt_by_command(command: str, user=Depends(get_verified_user)):
|
||||
prompt = Prompts.get_prompt_by_command(f"/{command}")
|
||||
if not prompt:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=ERROR_MESSAGES.NOT_FOUND,
|
||||
)
|
||||
|
||||
if prompt.user_id != user.id and user.role != "admin":
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
|
||||
)
|
||||
|
||||
result = Prompts.delete_prompt_by_command(f"/{command}")
|
||||
return result
|
||||
Reference in New Issue
Block a user