hyf-backend/th_agenter/core/simple_permissions.py

107 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""简化的权限检查系统."""
from functools import wraps
from typing import Optional
from fastapi import HTTPException, Depends
from loguru import logger
from sqlalchemy.orm import Session
from ..db.database import get_session
from ..models.user import User
from ..models.permission import Role
from ..services.auth import AuthService
async def is_super_admin(user: User, session: Session) -> bool:
"""检查用户是否为超级管理员."""
session.desc = f"检查用户 {user.id} 是否为超级管理员"
if not user or not user.is_active:
session.desc = f"用户 {user.id} 不是活跃状态"
return False
try:
# 直接使用提供的session查询避免MissingGreenlet错误
from sqlalchemy import select
from ..models.permission import UserRole, Role
stmt = select(UserRole).join(Role).filter(
UserRole.user_id == user.id,
Role.code == 'SUPER_ADMIN',
Role.is_active == True
)
user_role = await session.execute(stmt)
result = user_role.scalar_one_or_none() is not None
session.desc = f"用户 {user.id} 超级管理员角色查询结果: {result}"
return result
except Exception as e:
# 如果调用失败记录错误并返回False
session.desc = f"EXCEPTION: 用户 {user.id} 超级管理员角色查询失败: {str(e)}"
logger.error(f"检查用户 {user.id} 超级管理员角色失败: {str(e)}")
return False
async def require_super_admin(
current_user: User = Depends(AuthService.get_current_user),
session: Session = Depends(get_session)
) -> User:
"""要求超级管理员权限的依赖项."""
if not await is_super_admin(current_user, session):
raise HTTPException(
status_code=403,
detail="需要超级管理员权限"
)
return current_user
def require_authenticated_user(
current_user: User = Depends(AuthService.get_current_user)
) -> User:
"""要求已认证用户的依赖项."""
if not current_user or not current_user.is_active:
raise HTTPException(
status_code=401,
detail="需要登录"
)
return current_user
class SimplePermissionChecker:
"""简化的权限检查器."""
def __init__(self, db: Session):
self.db = db
async def check_super_admin(self, user: User) -> bool:
"""检查是否为超级管理员."""
return await is_super_admin(user, self.db)
async def check_user_access(self, user: User, target_user_id: int) -> bool:
"""检查用户访问权限(自己或超级管理员)."""
if not user or not user.is_active:
return False
# 超级管理员可以访问所有用户
if await self.check_super_admin(user):
return True
# 用户只能访问自己的信息
return user.id == target_user_id
# 权限装饰器
def super_admin_required(func):
"""超级管理员权限装饰器."""
@wraps(func)
def wrapper(*args, **kwargs):
# 这个装饰器主要用于服务层实际的FastAPI依赖项检查在路由层
return func(*args, **kwargs)
return wrapper
def authenticated_required(func):
"""认证用户权限装饰器."""
@wraps(func)
def wrapper(*args, **kwargs):
# 这个装饰器主要用于服务层实际的FastAPI依赖项检查在路由层
return func(*args, **kwargs)
return wrapper