from typing import Generator, Optional from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from jose import JWTError from app.database import SessionLocal from app.crud import user_crud from app.core.security import verify_token from app.models import User, UserSession from app.core.config import settings import logging logger = logging.getLogger(__name__) security = HTTPBearer(auto_error=False) def get_db() -> Generator: """ دریافت connection به پایگاه داده """ db = SessionLocal() try: yield db finally: db.close() async def get_current_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), db: Session = Depends(get_db) ) -> User: """ دریافت کاربر جاری از توکن """ if not credentials: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="اعتبارسنجی مورد نیاز است", headers={"WWW-Authenticate": "Bearer"}, ) try: # بررسی توکن payload = verify_token(credentials.credentials) if not payload: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="توکن نامعتبر", headers={"WWW-Authenticate": "Bearer"}, ) user_id: int = payload.get("sub") session_id: str = payload.get("session_id") if user_id is None or session_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="توکن نامعتبر", headers={"WWW-Authenticate": "Bearer"}, ) # بررسی سشن session = user_crud.validate_session(db, session_id) if not session: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="سشن منقضی شده است", headers={"WWW-Authenticate": "Bearer"}, ) # دریافت کاربر user = user_crud.get_by_id(db, user_id=user_id) if not user or not user.is_active: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="کاربر غیرفعال است", headers={"WWW-Authenticate": "Bearer"}, ) return user except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="خطا در اعتبارسنجی", headers={"WWW-Authenticate": "Bearer"}, ) async def get_current_active_superuser( current_user: User = Depends(get_current_user), ) -> User: """ بررسی ادمین بودن کاربر """ if not current_user.is_superuser: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="دسترسی محدود", ) return current_user def get_client_info(request) -> dict: """ دریافت اطلاعات کلاینت """ return { "ip_address": request.client.host if request.client else "unknown", "user_agent": request.headers.get("user-agent", "unknown"), }