wip: access control backend

This commit is contained in:
Timothy Jaeryang Baek
2024-11-15 01:29:07 -08:00
parent b80ec76435
commit 2ab5b2fd71
8 changed files with 282 additions and 52 deletions

View File

@@ -1,12 +1,18 @@
import logging
import uuid
from datetime import UTC, datetime, timedelta
from typing import Optional, Union
import jwt
from datetime import UTC, datetime, timedelta
from typing import Optional, Union, List, Dict
from open_webui.apps.webui.models.users import Users
from open_webui.apps.webui.models.groups import Groups
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import WEBUI_SECRET_KEY
from fastapi import Depends, HTTPException, Request, Response, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from passlib.context import CryptContext
@@ -147,3 +153,57 @@ def get_admin_user(user=Depends(get_current_user)):
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
return user
def has_permission(
user_id: str,
permission_key: str,
default_permissions: Dict[str, bool] = {},
) -> bool:
"""
Check if a user has a specific permission by checking the group permissions
and falls back to default permissions if not found in any group.
Permission keys can be hierarchical and separated by dots ('.').
"""
def get_permission(permissions: Dict[str, bool], keys: List[str]) -> bool:
"""Traverse permissions dict using a list of keys (from dot-split permission_key)."""
for key in keys:
if key not in permissions:
return False # If any part of the hierarchy is missing, deny access
permissions = permissions[key] # Go one level deeper
return bool(permissions) # Return the boolean at the final level
permission_hierarchy = permission_key.split(".")
# Retrieve user group permissions
user_groups = Groups.get_groups_by_member_id(user_id)
for group in user_groups:
group_permissions = group.permissions
if get_permission(group_permissions, permission_hierarchy):
return True
# Check default permissions afterwards if the group permissions don't allow it
return get_permission(default_permissions, permission_hierarchy)
def has_access(
user_id: str,
action: str = "write",
access_control: Optional[dict] = None,
) -> bool:
if access_control is None:
return action == "read"
user_groups = Groups.get_groups_by_member_id(user_id)
user_group_ids = [group.id for group in user_groups]
permission_access = access_control.get(action, {})
permitted_group_ids = permission_access.get("group_ids", [])
permitted_user_ids = permission_access.get("user_ids", [])
return user_id in permitted_user_ids or any(
group_id in permitted_group_ids for group_id in user_group_ids
)