This commit is contained in:
Timothy Jaeryang Baek
2026-01-23 04:31:39 +04:00
parent 655420fd25
commit 46cf40ec82

View File

@@ -92,10 +92,19 @@ signin_rate_limiter = RateLimiter(
)
def create_session_response(request: Request, user, db) -> dict:
def create_session_response(
request: Request, user, db, response: Response = None, set_cookie: bool = False
) -> dict:
"""
Create JWT token and build session response for a user.
Shared helper for signin, signup, ldap_auth, add_user, and token_exchange endpoints.
Args:
request: FastAPI request object
user: User object
db: Database session
response: FastAPI response object (required if set_cookie is True)
set_cookie: Whether to set the auth cookie on the response
"""
expires_delta = parse_duration(request.app.state.config.JWT_EXPIRES_IN)
expires_at = None
@@ -107,6 +116,21 @@ def create_session_response(request: Request, user, db) -> dict:
expires_delta=expires_delta,
)
if set_cookie and response:
datetime_expires_at = (
datetime.datetime.fromtimestamp(expires_at, datetime.timezone.utc)
if expires_at
else None
)
response.set_cookie(
key="token",
value=token,
expires=datetime_expires_at,
httponly=True,
samesite=WEBUI_AUTH_COOKIE_SAME_SITE,
secure=WEBUI_AUTH_COOKIE_SECURE,
)
user_permissions = get_permissions(
user.id, request.app.state.config.USER_PERMISSIONS, db=db
)
@@ -519,36 +543,6 @@ async def ldap_auth(
user = Auths.authenticate_user_by_email(email, db=db)
if user:
expires_delta = parse_duration(request.app.state.config.JWT_EXPIRES_IN)
expires_at = None
if expires_delta:
expires_at = int(time.time()) + int(expires_delta.total_seconds())
token = create_token(
data={"id": user.id},
expires_delta=expires_delta,
)
# Set the cookie token
response.set_cookie(
key="token",
value=token,
expires=(
datetime.datetime.fromtimestamp(
expires_at, datetime.timezone.utc
)
if expires_at
else None
),
httponly=True, # Ensures the cookie is not accessible via JavaScript
samesite=WEBUI_AUTH_COOKIE_SAME_SITE,
secure=WEBUI_AUTH_COOKIE_SECURE,
)
user_permissions = get_permissions(
user.id, request.app.state.config.USER_PERMISSIONS, db=db
)
if (
user.role != "admin"
and ENABLE_LDAP_GROUP_MANAGEMENT
@@ -564,17 +558,7 @@ async def ldap_auth(
except Exception as e:
log.error(f"Failed to sync groups for user {user.id}: {e}")
return {
"token": token,
"token_type": "Bearer",
"expires_at": expires_at,
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role,
"profile_image_url": user.profile_image_url,
"permissions": user_permissions,
}
return create_session_response(request, user, db, response, set_cookie=True)
else:
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
else:
@@ -683,48 +667,7 @@ async def signin(
)
if user:
expires_delta = parse_duration(request.app.state.config.JWT_EXPIRES_IN)
expires_at = None
if expires_delta:
expires_at = int(time.time()) + int(expires_delta.total_seconds())
token = create_token(
data={"id": user.id},
expires_delta=expires_delta,
)
datetime_expires_at = (
datetime.datetime.fromtimestamp(expires_at, datetime.timezone.utc)
if expires_at
else None
)
# Set the cookie token
response.set_cookie(
key="token",
value=token,
expires=datetime_expires_at,
httponly=True, # Ensures the cookie is not accessible via JavaScript
samesite=WEBUI_AUTH_COOKIE_SAME_SITE,
secure=WEBUI_AUTH_COOKIE_SECURE,
)
user_permissions = get_permissions(
user.id, request.app.state.config.USER_PERMISSIONS, db=db
)
return {
"token": token,
"token_type": "Bearer",
"expires_at": expires_at,
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role,
"profile_image_url": user.profile_image_url,
"permissions": user_permissions,
}
return create_session_response(request, user, db, response, set_cookie=True)
else:
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
@@ -785,32 +728,6 @@ async def signup(
)
if user:
expires_delta = parse_duration(request.app.state.config.JWT_EXPIRES_IN)
expires_at = None
if expires_delta:
expires_at = int(time.time()) + int(expires_delta.total_seconds())
token = create_token(
data={"id": user.id},
expires_delta=expires_delta,
)
datetime_expires_at = (
datetime.datetime.fromtimestamp(expires_at, datetime.timezone.utc)
if expires_at
else None
)
# Set the cookie token
response.set_cookie(
key="token",
value=token,
expires=datetime_expires_at,
httponly=True, # Ensures the cookie is not accessible via JavaScript
samesite=WEBUI_AUTH_COOKIE_SAME_SITE,
secure=WEBUI_AUTH_COOKIE_SECURE,
)
if request.app.state.config.WEBHOOK_URL:
await post_webhook(
request.app.state.WEBUI_NAME,
@@ -823,10 +740,6 @@ async def signup(
},
)
user_permissions = get_permissions(
user.id, request.app.state.config.USER_PERMISSIONS, db=db
)
if not has_users:
# Disable signup after the first user is created
request.app.state.config.ENABLE_SIGNUP = False
@@ -837,17 +750,7 @@ async def signup(
db=db,
)
return {
"token": token,
"token_type": "Bearer",
"expires_at": expires_at,
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role,
"profile_image_url": user.profile_image_url,
"permissions": user_permissions,
}
return create_session_response(request, user, db, response, set_cookie=True)
else:
raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_USER_ERROR)
except HTTPException: