enh: knowledge access control

This commit is contained in:
Timothy Jaeryang Baek
2024-11-16 16:51:55 -08:00
parent 8da24d81a4
commit 227cca35e8
23 changed files with 241 additions and 149 deletions

View File

@@ -36,7 +36,9 @@ from open_webui.utils.payload import (
apply_model_system_prompt_to_body,
)
from open_webui.utils.utils import get_admin_user, get_verified_user, has_access
from open_webui.utils.utils import get_admin_user, get_verified_user
from open_webui.utils.access_control import has_access
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["OPENAI"])

View File

@@ -13,6 +13,7 @@ from open_webui.apps.webui.models.files import FileMetadataResponse
from pydantic import BaseModel, ConfigDict
from sqlalchemy import BigInteger, Column, String, Text, JSON
from open_webui.utils.access_control import has_access
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
@@ -129,7 +130,7 @@ class KnowledgeTable:
except Exception:
return None
def get_knowledge_items(self) -> list[KnowledgeModel]:
def get_knowledge_bases(self) -> list[KnowledgeModel]:
with get_db() as db:
return [
KnowledgeModel.model_validate(knowledge)
@@ -138,6 +139,17 @@ class KnowledgeTable:
.all()
]
def get_knowledge_bases_by_user_id(
self, user_id: str, permission: str = "write"
) -> list[KnowledgeModel]:
knowledge_bases = self.get_knowledge_bases()
return [
knowledge_base
for knowledge_base in knowledge_bases
if knowledge_base.user_id == user_id
or has_access(user_id, permission, knowledge_base.access_control)
]
def get_knowledge_by_id(self, id: str) -> Optional[KnowledgeModel]:
try:
with get_db() as db:

View File

@@ -15,7 +15,7 @@ from sqlalchemy.dialects import postgresql, sqlite
from sqlalchemy import BigInteger, Column, Text, JSON, Boolean
from open_webui.utils.utils import has_access
from open_webui.utils.access_control import has_access
log = logging.getLogger(__name__)

View File

@@ -26,64 +26,98 @@ log.setLevel(SRC_LOG_LEVELS["MODELS"])
router = APIRouter()
############################
# GetKnowledgeItems
# getKnowledgeBases
############################
@router.get(
"/", response_model=Optional[Union[list[KnowledgeResponse], KnowledgeResponse]]
)
async def get_knowledge_items(
id: Optional[str] = None, user=Depends(get_verified_user)
):
if id:
knowledge = Knowledges.get_knowledge_by_id(id=id)
@router.get("/", response_model=list[KnowledgeResponse])
async def get_knowledge(user=Depends(get_verified_user)):
knowledge_bases = []
if knowledge:
return knowledge
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if user.role == "admin":
knowledge_bases = Knowledges.get_knowledge_bases()
else:
knowledge_bases = []
knowledge_bases = Knowledges.get_knowledge_bases_by_user_id(user.id, "read")
for knowledge in Knowledges.get_knowledge_items():
files = []
if knowledge.data:
files = Files.get_file_metadatas_by_ids(
knowledge.data.get("file_ids", [])
)
# Check if all files exist
if len(files) != len(knowledge.data.get("file_ids", [])):
missing_files = list(
set(knowledge.data.get("file_ids", []))
- set([file.id for file in files])
)
if missing_files:
data = knowledge.data or {}
file_ids = data.get("file_ids", [])
for missing_file in missing_files:
file_ids.remove(missing_file)
data["file_ids"] = file_ids
Knowledges.update_knowledge_by_id(
id=knowledge.id, form_data=KnowledgeUpdateForm(data=data)
)
files = Files.get_file_metadatas_by_ids(file_ids)
knowledge_bases.append(
KnowledgeResponse(
**knowledge.model_dump(),
files=files,
)
# Get files for each knowledge base
for knowledge_base in knowledge_bases:
files = []
if knowledge_base.data:
files = Files.get_file_metadatas_by_ids(
knowledge_base.data.get("file_ids", [])
)
return knowledge_bases
# Check if all files exist
if len(files) != len(knowledge_base.data.get("file_ids", [])):
missing_files = list(
set(knowledge_base.data.get("file_ids", []))
- set([file.id for file in files])
)
if missing_files:
data = knowledge_base.data or {}
file_ids = data.get("file_ids", [])
for missing_file in missing_files:
file_ids.remove(missing_file)
data["file_ids"] = file_ids
Knowledges.update_knowledge_by_id(
id=knowledge_base.id, form_data=KnowledgeUpdateForm(data=data)
)
files = Files.get_file_metadatas_by_ids(file_ids)
knowledge_base = KnowledgeResponse(
**knowledge_base.model_dump(),
files=files,
)
return knowledge_bases
@router.get("/list", response_model=list[KnowledgeResponse])
async def get_knowledge_list(user=Depends(get_verified_user)):
knowledge_bases = []
if user.role == "admin":
knowledge_bases = Knowledges.get_knowledge_bases()
else:
knowledge_bases = Knowledges.get_knowledge_bases_by_user_id(user.id, "write")
# Get files for each knowledge base
for knowledge_base in knowledge_bases:
files = []
if knowledge_base.data:
files = Files.get_file_metadatas_by_ids(
knowledge_base.data.get("file_ids", [])
)
# Check if all files exist
if len(files) != len(knowledge_base.data.get("file_ids", [])):
missing_files = list(
set(knowledge_base.data.get("file_ids", []))
- set([file.id for file in files])
)
if missing_files:
data = knowledge_base.data or {}
file_ids = data.get("file_ids", [])
for missing_file in missing_files:
file_ids.remove(missing_file)
data["file_ids"] = file_ids
Knowledges.update_knowledge_by_id(
id=knowledge_base.id, form_data=KnowledgeUpdateForm(data=data)
)
files = Files.get_file_metadatas_by_ids(file_ids)
knowledge_base = KnowledgeResponse(
**knowledge_base.model_dump(),
files=files,
)
return knowledge_bases
############################
@@ -92,7 +126,9 @@ async def get_knowledge_items(
@router.post("/create", response_model=Optional[KnowledgeResponse])
async def create_new_knowledge(form_data: KnowledgeForm, user=Depends(get_admin_user)):
async def create_new_knowledge(
form_data: KnowledgeForm, user=Depends(get_verified_user)
):
knowledge = Knowledges.insert_new_knowledge(user.id, form_data)
if knowledge:
@@ -141,7 +177,7 @@ async def get_knowledge_by_id(id: str, user=Depends(get_verified_user)):
async def update_knowledge_by_id(
id: str,
form_data: KnowledgeUpdateForm,
user=Depends(get_admin_user),
user=Depends(get_verified_user),
):
knowledge = Knowledges.update_knowledge_by_id(id=id, form_data=form_data)
@@ -173,7 +209,7 @@ class KnowledgeFileIdForm(BaseModel):
def add_file_to_knowledge_by_id(
id: str,
form_data: KnowledgeFileIdForm,
user=Depends(get_admin_user),
user=Depends(get_verified_user),
):
knowledge = Knowledges.get_knowledge_by_id(id=id)
file = Files.get_file_by_id(form_data.file_id)
@@ -238,7 +274,7 @@ def add_file_to_knowledge_by_id(
def update_file_from_knowledge_by_id(
id: str,
form_data: KnowledgeFileIdForm,
user=Depends(get_admin_user),
user=Depends(get_verified_user),
):
knowledge = Knowledges.get_knowledge_by_id(id=id)
file = Files.get_file_by_id(form_data.file_id)
@@ -288,7 +324,7 @@ def update_file_from_knowledge_by_id(
def remove_file_from_knowledge_by_id(
id: str,
form_data: KnowledgeFileIdForm,
user=Depends(get_admin_user),
user=Depends(get_verified_user),
):
knowledge = Knowledges.get_knowledge_by_id(id=id)
file = Files.get_file_by_id(form_data.file_id)
@@ -371,7 +407,7 @@ async def reset_knowledge_by_id(id: str, user=Depends(get_admin_user)):
@router.delete("/{id}/delete", response_model=bool)
async def delete_knowledge_by_id(id: str, user=Depends(get_admin_user)):
async def delete_knowledge_by_id(id: str, user=Depends(get_verified_user)):
try:
VECTOR_DB_CLIENT.delete_collection(collection_name=id)
except Exception as e:

View File

@@ -10,7 +10,9 @@ from open_webui.constants import ERROR_MESSAGES
from fastapi import APIRouter, Depends, HTTPException, Request, status
from open_webui.utils.utils import get_admin_user, get_verified_user, has_access
from open_webui.utils.utils import get_admin_user, get_verified_user
from open_webui.utils.access_control import has_access
router = APIRouter()

View File

@@ -134,8 +134,8 @@ from open_webui.utils.utils import (
get_current_user,
get_http_authorization_cred,
get_verified_user,
has_access,
)
from open_webui.utils.access_control import has_access
if SAFE_MODE:
print("SAFE MODE ENABLED")

View File

@@ -0,0 +1,57 @@
from typing import Optional, Union, List, Dict
from open_webui.apps.webui.models.groups import Groups
def has_permission(
user_id: str,
permission_key: str,
default_permissions: Dict[str, bool] = {},
) -> bool:
"""
Check if a user has a specific permission by checking the group permissions
and falls back to default permissions if not found in any group.
Permission keys can be hierarchical and separated by dots ('.').
"""
def get_permission(permissions: Dict[str, bool], keys: List[str]) -> bool:
"""Traverse permissions dict using a list of keys (from dot-split permission_key)."""
for key in keys:
if key not in permissions:
return False # If any part of the hierarchy is missing, deny access
permissions = permissions[key] # Go one level deeper
return bool(permissions) # Return the boolean at the final level
permission_hierarchy = permission_key.split(".")
# Retrieve user group permissions
user_groups = Groups.get_groups_by_member_id(user_id)
for group in user_groups:
group_permissions = group.permissions
if get_permission(group_permissions, permission_hierarchy):
return True
# Check default permissions afterwards if the group permissions don't allow it
return get_permission(default_permissions, permission_hierarchy)
def has_access(
user_id: str,
type: str = "write",
access_control: Optional[dict] = None,
) -> bool:
print("user_id", user_id, "type", type, "access_control", access_control)
if access_control is None:
return type == "read"
user_groups = Groups.get_groups_by_member_id(user_id)
user_group_ids = [group.id for group in user_groups]
permission_access = access_control.get(type, {})
permitted_group_ids = permission_access.get("group_ids", [])
permitted_user_ids = permission_access.get("user_ids", [])
return user_id in permitted_user_ids or any(
group_id in permitted_group_ids for group_id in user_group_ids
)

View File

@@ -7,7 +7,6 @@ from typing import Optional, Union, List, Dict
from open_webui.apps.webui.models.users import Users
from open_webui.apps.webui.models.groups import Groups
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import WEBUI_SECRET_KEY
@@ -153,58 +152,3 @@ def get_admin_user(user=Depends(get_current_user)):
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
return user
def has_permission(
user_id: str,
permission_key: str,
default_permissions: Dict[str, bool] = {},
) -> bool:
"""
Check if a user has a specific permission by checking the group permissions
and falls back to default permissions if not found in any group.
Permission keys can be hierarchical and separated by dots ('.').
"""
def get_permission(permissions: Dict[str, bool], keys: List[str]) -> bool:
"""Traverse permissions dict using a list of keys (from dot-split permission_key)."""
for key in keys:
if key not in permissions:
return False # If any part of the hierarchy is missing, deny access
permissions = permissions[key] # Go one level deeper
return bool(permissions) # Return the boolean at the final level
permission_hierarchy = permission_key.split(".")
# Retrieve user group permissions
user_groups = Groups.get_groups_by_member_id(user_id)
for group in user_groups:
group_permissions = group.permissions
if get_permission(group_permissions, permission_hierarchy):
return True
# Check default permissions afterwards if the group permissions don't allow it
return get_permission(default_permissions, permission_hierarchy)
def has_access(
user_id: str,
type: str = "write",
access_control: Optional[dict] = None,
) -> bool:
print("user_id", user_id, "type", type, "access_control", access_control)
if access_control is None:
return type == "read"
user_groups = Groups.get_groups_by_member_id(user_id)
user_group_ids = [group.id for group in user_groups]
permission_access = access_control.get(type, {})
permitted_group_ids = permission_access.get("group_ids", [])
permitted_user_ids = permission_access.get("user_ids", [])
return user_id in permitted_user_ids or any(
group_id in permitted_group_ids for group_id in user_group_ids
)