import logging import time import uuid from typing import Optional from sqlalchemy.orm import Session from open_webui.internal.db import Base, get_db_context from pydantic import BaseModel, ConfigDict from sqlalchemy import BigInteger, Column, Text, UniqueConstraint, or_, and_ from sqlalchemy.dialects.postgresql import JSONB log = logging.getLogger(__name__) #################### # AccessGrant DB Schema #################### class AccessGrant(Base): __tablename__ = "access_grant" id = Column(Text, primary_key=True) resource_type = Column( Text, nullable=False ) # "knowledge", "model", "prompt", "tool", "note", "channel", "file" resource_id = Column(Text, nullable=False) principal_type = Column(Text, nullable=False) # "user" or "group" principal_id = Column( Text, nullable=False ) # user_id, group_id, or "*" (wildcard for public) permission = Column(Text, nullable=False) # "read" or "write" created_at = Column(BigInteger, nullable=False) __table_args__ = ( UniqueConstraint( "resource_type", "resource_id", "principal_type", "principal_id", "permission", name="uq_access_grant_grant", ), ) class AccessGrantModel(BaseModel): model_config = ConfigDict(from_attributes=True) id: str resource_type: str resource_id: str principal_type: str principal_id: str permission: str created_at: int class AccessGrantResponse(BaseModel): """Slim grant model for API responses — resource context is implicit from the parent.""" id: str principal_type: str principal_id: str permission: str @classmethod def from_grant(cls, grant: "AccessGrantModel") -> "AccessGrantResponse": return cls( id=grant.id, principal_type=grant.principal_type, principal_id=grant.principal_id, permission=grant.permission, ) #################### # Conversion utilities #################### def access_control_to_grants( resource_type: str, resource_id: str, access_control: Optional[dict], ) -> list[dict]: """ Convert an old-style access_control JSON dict to a flat list of grant dicts. Semantics: - None → public read (user:* read) — except files which are private - {} → private/owner-only (no grants) - {read: {group_ids, user_ids}, write: {group_ids, user_ids}} → specific grants Returns a list of dicts with keys: resource_type, resource_id, principal_type, principal_id, permission """ grants = [] if access_control is None: # NULL → public read (user:* for read) # Exception: files with NULL are private (owner-only), no grants needed if resource_type != "file": grants.append( { "resource_type": resource_type, "resource_id": resource_id, "principal_type": "user", "principal_id": "*", "permission": "read", } ) return grants # {} → private/owner-only, no grants if not access_control: return grants # Parse structured permissions for permission in ["read", "write"]: perm_data = access_control.get(permission, {}) if not perm_data: continue for group_id in perm_data.get("group_ids", []): grants.append( { "resource_type": resource_type, "resource_id": resource_id, "principal_type": "group", "principal_id": group_id, "permission": permission, } ) for user_id in perm_data.get("user_ids", []): grants.append( { "resource_type": resource_type, "resource_id": resource_id, "principal_type": "user", "principal_id": user_id, "permission": permission, } ) return grants def normalize_access_grants(access_grants: Optional[list]) -> list[dict]: """ Normalize direct access_grants payloads from API forms. Keeps only valid grants and removes duplicates by (principal_type, principal_id, permission). """ if not access_grants: return [] deduped = {} for grant in access_grants: if isinstance(grant, BaseModel): grant = grant.model_dump() if not isinstance(grant, dict): continue principal_type = grant.get("principal_type") principal_id = grant.get("principal_id") permission = grant.get("permission") if principal_type not in ("user", "group"): continue if permission not in ("read", "write"): continue if not isinstance(principal_id, str) or not principal_id: continue key = (principal_type, principal_id, permission) deduped[key] = { "id": ( grant.get("id") if isinstance(grant.get("id"), str) and grant.get("id") else str(uuid.uuid4()) ), "principal_type": principal_type, "principal_id": principal_id, "permission": permission, } return list(deduped.values()) def has_public_read_access_grant(access_grants: Optional[list]) -> bool: """ Returns True when a direct grant list includes wildcard public-read. """ for grant in normalize_access_grants(access_grants): if ( grant["principal_type"] == "user" and grant["principal_id"] == "*" and grant["permission"] == "read" ): return True return False def has_user_access_grant(access_grants: Optional[list]) -> bool: """ Returns True when a direct grant list includes any non-wildcard user grant. """ for grant in normalize_access_grants(access_grants): if grant["principal_type"] == "user" and grant["principal_id"] != "*": return True return False def strip_user_access_grants(access_grants: Optional[list]) -> list: """ Remove all non-wildcard user grants from the list. Keeps group grants and the public wildcard (user:*) intact. """ if not access_grants: return [] return [ grant for grant in access_grants if not ( (grant.get("principal_type") if isinstance(grant, dict) else getattr(grant, "principal_type", None)) == "user" and (grant.get("principal_id") if isinstance(grant, dict) else getattr(grant, "principal_id", None)) != "*" ) ] def grants_to_access_control(grants: list) -> Optional[dict]: """ Convert a list of grant objects (AccessGrantModel or AccessGrantResponse) back to the old-style access_control JSON dict for backward compatibility. Semantics: - [] (empty) → {} (private/owner-only) - Contains user:*:read → None (public), but write grants are preserved - Otherwise → {read: {group_ids, user_ids}, write: {group_ids, user_ids}} Note: "public" (user:*:read) still allows additional write permissions to coexist. When the wildcard read is present the function returns None for the legacy dict, so callers that need write info should inspect the grants list directly. """ if not grants: return {} # No grants = private/owner-only result = { "read": {"group_ids": [], "user_ids": []}, "write": {"group_ids": [], "user_ids": []}, } is_public = False for grant in grants: if ( grant.principal_type == "user" and grant.principal_id == "*" and grant.permission == "read" ): is_public = True continue # Don't add wildcard to user_ids list if grant.permission not in ("read", "write"): continue if grant.principal_type == "group": if grant.principal_id not in result[grant.permission]["group_ids"]: result[grant.permission]["group_ids"].append(grant.principal_id) elif grant.principal_type == "user": if grant.principal_id not in result[grant.permission]["user_ids"]: result[grant.permission]["user_ids"].append(grant.principal_id) if is_public: return None # Public read access return result #################### # Table Operations #################### class AccessGrantsTable: def grant_access( self, resource_type: str, resource_id: str, principal_type: str, principal_id: str, permission: str, db: Optional[Session] = None, ) -> Optional[AccessGrantModel]: """Add a single access grant. Idempotent (ignores duplicates).""" with get_db_context(db) as db: # Check for existing grant existing = ( db.query(AccessGrant) .filter_by( resource_type=resource_type, resource_id=resource_id, principal_type=principal_type, principal_id=principal_id, permission=permission, ) .first() ) if existing: return AccessGrantModel.model_validate(existing) grant = AccessGrant( id=str(uuid.uuid4()), resource_type=resource_type, resource_id=resource_id, principal_type=principal_type, principal_id=principal_id, permission=permission, created_at=int(time.time()), ) db.add(grant) db.commit() db.refresh(grant) return AccessGrantModel.model_validate(grant) def revoke_access( self, resource_type: str, resource_id: str, principal_type: str, principal_id: str, permission: str, db: Optional[Session] = None, ) -> bool: """Remove a single access grant.""" with get_db_context(db) as db: deleted = ( db.query(AccessGrant) .filter_by( resource_type=resource_type, resource_id=resource_id, principal_type=principal_type, principal_id=principal_id, permission=permission, ) .delete() ) db.commit() return deleted > 0 def revoke_all_access( self, resource_type: str, resource_id: str, db: Optional[Session] = None, ) -> int: """Remove all access grants for a resource.""" with get_db_context(db) as db: deleted = ( db.query(AccessGrant) .filter_by( resource_type=resource_type, resource_id=resource_id, ) .delete() ) db.commit() return deleted def set_access_control( self, resource_type: str, resource_id: str, access_control: Optional[dict], db: Optional[Session] = None, ) -> list[AccessGrantModel]: """ Replace all grants for a resource from an access_control JSON dict. This is the primary bridge for backward compat with the frontend. """ with get_db_context(db) as db: # Delete all existing grants for this resource db.query(AccessGrant).filter_by( resource_type=resource_type, resource_id=resource_id, ).delete() # Convert JSON to grant dicts grant_dicts = access_control_to_grants( resource_type, resource_id, access_control ) # Insert new grants results = [] for grant_dict in grant_dicts: grant = AccessGrant( id=str(uuid.uuid4()), **grant_dict, created_at=int(time.time()), ) db.add(grant) results.append(grant) db.commit() return [AccessGrantModel.model_validate(g) for g in results] def set_access_grants( self, resource_type: str, resource_id: str, access_grants: Optional[list], db: Optional[Session] = None, ) -> list[AccessGrantModel]: """ Replace all grants for a resource from a direct access_grants list. """ with get_db_context(db) as db: db.query(AccessGrant).filter_by( resource_type=resource_type, resource_id=resource_id, ).delete() normalized_grants = normalize_access_grants(access_grants) results = [] for grant_dict in normalized_grants: grant = AccessGrant( id=str(uuid.uuid4()), resource_type=resource_type, resource_id=resource_id, principal_type=grant_dict["principal_type"], principal_id=grant_dict["principal_id"], permission=grant_dict["permission"], created_at=int(time.time()), ) db.add(grant) results.append(grant) db.commit() return [AccessGrantModel.model_validate(g) for g in results] def get_access_control( self, resource_type: str, resource_id: str, db: Optional[Session] = None, ) -> Optional[dict]: """ Reconstruct the old-style access_control JSON dict from grants. For backward compat with the frontend. """ with get_db_context(db) as db: grants = ( db.query(AccessGrant) .filter_by( resource_type=resource_type, resource_id=resource_id, ) .all() ) grant_models = [AccessGrantModel.model_validate(g) for g in grants] return grants_to_access_control(grant_models) def get_grants_by_resource( self, resource_type: str, resource_id: str, db: Optional[Session] = None, ) -> list[AccessGrantModel]: """Get all grants for a specific resource.""" with get_db_context(db) as db: grants = ( db.query(AccessGrant) .filter_by( resource_type=resource_type, resource_id=resource_id, ) .all() ) return [AccessGrantModel.model_validate(g) for g in grants] def get_grants_by_resources( self, resource_type: str, resource_ids: list[str], db: Optional[Session] = None, ) -> dict[str, list[AccessGrantModel]]: """Batch-fetch grants for multiple resources. Returns {resource_id: [grants]}.""" if not resource_ids: return {} with get_db_context(db) as db: grants = ( db.query(AccessGrant) .filter( AccessGrant.resource_type == resource_type, AccessGrant.resource_id.in_(resource_ids), ) .all() ) result: dict[str, list[AccessGrantModel]] = { rid: [] for rid in resource_ids } for g in grants: result[g.resource_id].append(AccessGrantModel.model_validate(g)) return result def has_access( self, user_id: str, resource_type: str, resource_id: str, permission: str = "read", user_group_ids: Optional[set[str]] = None, db: Optional[Session] = None, ) -> bool: """ Check if a user has the specified permission on a resource. Access is granted if any of the following is true: - There's a grant for user:* (public) with the requested permission - There's a grant for the specific user with the requested permission - There's a grant for any of the user's groups with the requested permission """ with get_db_context(db) as db: # Build conditions for matching grants conditions = [ # Public access and_( AccessGrant.principal_type == "user", AccessGrant.principal_id == "*", ), # Direct user access and_( AccessGrant.principal_type == "user", AccessGrant.principal_id == user_id, ), ] # Group access if user_group_ids is None: from open_webui.models.groups import Groups user_groups = Groups.get_groups_by_member_id(user_id, db=db) user_group_ids = {group.id for group in user_groups} if user_group_ids: conditions.append( and_( AccessGrant.principal_type == "group", AccessGrant.principal_id.in_(user_group_ids), ) ) exists = ( db.query(AccessGrant) .filter( AccessGrant.resource_type == resource_type, AccessGrant.resource_id == resource_id, AccessGrant.permission == permission, or_(*conditions), ) .first() ) return exists is not None def get_accessible_resource_ids( self, user_id: str, resource_type: str, resource_ids: list[str], permission: str = "read", user_group_ids: Optional[set[str]] = None, db: Optional[Session] = None, ) -> set[str]: """ Batch check: return the subset of resource_ids that the user can access. This replaces calling has_access() in a loop (N+1) with a single query. """ if not resource_ids: return set() with get_db_context(db) as db: conditions = [ and_( AccessGrant.principal_type == "user", AccessGrant.principal_id == "*", ), and_( AccessGrant.principal_type == "user", AccessGrant.principal_id == user_id, ), ] if user_group_ids is None: from open_webui.models.groups import Groups user_groups = Groups.get_groups_by_member_id(user_id, db=db) user_group_ids = {group.id for group in user_groups} if user_group_ids: conditions.append( and_( AccessGrant.principal_type == "group", AccessGrant.principal_id.in_(user_group_ids), ) ) rows = ( db.query(AccessGrant.resource_id) .filter( AccessGrant.resource_type == resource_type, AccessGrant.resource_id.in_(resource_ids), AccessGrant.permission == permission, or_(*conditions), ) .distinct() .all() ) return {row[0] for row in rows} def get_users_with_access( self, resource_type: str, resource_id: str, permission: str = "read", db: Optional[Session] = None, ) -> list: """ Get all users who have the specified permission on a resource. Returns a list of UserModel instances. """ from open_webui.models.users import Users, UserModel from open_webui.models.groups import Groups with get_db_context(db) as db: grants = ( db.query(AccessGrant) .filter_by( resource_type=resource_type, resource_id=resource_id, permission=permission, ) .all() ) # Check for public access for grant in grants: if grant.principal_type == "user" and grant.principal_id == "*": result = Users.get_users(filter={"roles": ["!pending"]}, db=db) return result.get("users", []) user_ids_with_access = set() for grant in grants: if grant.principal_type == "user": user_ids_with_access.add(grant.principal_id) elif grant.principal_type == "group": group_user_ids = Groups.get_group_user_ids_by_id( grant.principal_id, db=db ) if group_user_ids: user_ids_with_access.update(group_user_ids) if not user_ids_with_access: return [] return Users.get_users_by_user_ids(list(user_ids_with_access), db=db) def has_permission_filter( self, db, query, DocumentModel, filter: dict, resource_type: str, permission: str = "read", ): """ Apply access control filtering to a SQLAlchemy query by JOINing with access_grant. This replaces the old JSON-column-based filtering with a proper relational JOIN. """ group_ids = filter.get("group_ids", []) user_id = filter.get("user_id") if permission == "read_only": return self._has_read_only_permission_filter( db, query, DocumentModel, filter, resource_type ) # Build principal conditions principal_conditions = [] if group_ids or user_id: # Public access: user:* read principal_conditions.append( and_( AccessGrant.principal_type == "user", AccessGrant.principal_id == "*", ) ) if user_id: # Owner always has access principal_conditions.append(DocumentModel.user_id == user_id) # Direct user grant principal_conditions.append( and_( AccessGrant.principal_type == "user", AccessGrant.principal_id == user_id, ) ) if group_ids: # Group grants principal_conditions.append( and_( AccessGrant.principal_type == "group", AccessGrant.principal_id.in_(group_ids), ) ) if not principal_conditions: return query # LEFT JOIN access_grant and filter # We use a subquery approach to avoid duplicates from multiple matching grants from sqlalchemy import exists as sa_exists, select grant_exists = ( select(AccessGrant.id) .where( AccessGrant.resource_type == resource_type, AccessGrant.resource_id == DocumentModel.id, AccessGrant.permission == permission, or_( and_( AccessGrant.principal_type == "user", AccessGrant.principal_id == "*", ), *( [ and_( AccessGrant.principal_type == "user", AccessGrant.principal_id == user_id, ) ] if user_id else [] ), *( [ and_( AccessGrant.principal_type == "group", AccessGrant.principal_id.in_(group_ids), ) ] if group_ids else [] ), ), ) .correlate(DocumentModel) .exists() ) # Owner OR has a matching grant owner_or_grant = [grant_exists] if user_id: owner_or_grant.append(DocumentModel.user_id == user_id) query = query.filter(or_(*owner_or_grant)) return query def _has_read_only_permission_filter( self, db, query, DocumentModel, filter: dict, resource_type: str, ): """ Filter for items where user has read BUT NOT write access. Public items are NOT considered read_only. """ group_ids = filter.get("group_ids", []) user_id = filter.get("user_id") from sqlalchemy import exists as sa_exists, select # Has read grant (not public) read_grant_exists = ( select(AccessGrant.id) .where( AccessGrant.resource_type == resource_type, AccessGrant.resource_id == DocumentModel.id, AccessGrant.permission == "read", or_( *( [ and_( AccessGrant.principal_type == "user", AccessGrant.principal_id == user_id, ) ] if user_id else [] ), *( [ and_( AccessGrant.principal_type == "group", AccessGrant.principal_id.in_(group_ids), ) ] if group_ids else [] ), ), ) .correlate(DocumentModel) .exists() ) # Does NOT have write grant write_grant_exists = ( select(AccessGrant.id) .where( AccessGrant.resource_type == resource_type, AccessGrant.resource_id == DocumentModel.id, AccessGrant.permission == "write", or_( *( [ and_( AccessGrant.principal_type == "user", AccessGrant.principal_id == user_id, ) ] if user_id else [] ), *( [ and_( AccessGrant.principal_type == "group", AccessGrant.principal_id.in_(group_ids), ) ] if group_ids else [] ), ), ) .correlate(DocumentModel) .exists() ) # Is NOT public public_grant_exists = ( select(AccessGrant.id) .where( AccessGrant.resource_type == resource_type, AccessGrant.resource_id == DocumentModel.id, AccessGrant.permission == "read", AccessGrant.principal_type == "user", AccessGrant.principal_id == "*", ) .correlate(DocumentModel) .exists() ) conditions = [read_grant_exists, ~write_grant_exists, ~public_grant_exists] # Not owner if user_id: conditions.append(DocumentModel.user_id != user_id) query = query.filter(and_(*conditions)) return query AccessGrants = AccessGrantsTable()