Files
game-marathon/backend/app/api/deps.py
2025-12-19 02:07:25 +07:00

227 lines
7.2 KiB
Python

from typing import Annotated
from datetime import datetime
from fastapi import Depends, HTTPException, status, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.database import get_db
from app.core.security import decode_access_token
from app.models import User, Participant, Marathon, UserRole, ParticipantRole, AdminLog, AdminActionType
security = HTTPBearer()
async def get_current_user(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
token = credentials.credentials
payload = decode_access_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"},
)
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
)
result = await db.execute(select(User).where(User.id == int(user_id)))
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
)
# Check if user is banned
if user.is_banned:
# Auto-unban if ban expired
if user.banned_until and datetime.utcnow() > user.banned_until:
# Save ban info for logging before clearing
old_ban_reason = user.ban_reason
old_banned_until = user.banned_until.isoformat() if user.banned_until else None
user.is_banned = False
user.banned_at = None
user.banned_until = None
user.banned_by_id = None
user.ban_reason = None
# Log system auto-unban action
log = AdminLog(
admin_id=None, # System action, no admin
action=AdminActionType.USER_AUTO_UNBAN.value,
target_type="user",
target_id=user.id,
details={
"nickname": user.nickname,
"reason": old_ban_reason,
"banned_until": old_banned_until,
"system": True,
},
ip_address=None,
)
db.add(log)
await db.commit()
await db.refresh(user)
else:
# Still banned - return ban info in error
ban_info = {
"banned_at": user.banned_at.isoformat() if user.banned_at else None,
"banned_until": user.banned_until.isoformat() if user.banned_until else None,
"reason": user.ban_reason,
}
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ban_info,
)
return user
def require_admin(user: User) -> User:
"""Check if user is admin"""
if not user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required",
)
return user
def require_admin_with_2fa(user: User) -> User:
"""Check if user is admin with Telegram linked (2FA enabled)"""
if not user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required",
)
if not user.telegram_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Для доступа к админ-панели необходимо привязать Telegram в профиле",
)
return user
async def get_participant(
db: AsyncSession,
user_id: int,
marathon_id: int,
) -> Participant | None:
"""Get participant record for user in marathon"""
result = await db.execute(
select(Participant).where(
Participant.user_id == user_id,
Participant.marathon_id == marathon_id,
)
)
return result.scalar_one_or_none()
async def require_participant(
db: AsyncSession,
user_id: int,
marathon_id: int,
) -> Participant:
"""Require user to be participant of marathon"""
participant = await get_participant(db, user_id, marathon_id)
if not participant:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not a participant of this marathon",
)
return participant
async def require_organizer(
db: AsyncSession,
user: User,
marathon_id: int,
) -> Participant:
"""Require user to be organizer of marathon (or admin)"""
if user.is_admin:
# Admins can act as organizers
participant = await get_participant(db, user.id, marathon_id)
if participant:
return participant
# Create virtual participant for admin
result = await db.execute(select(Marathon).where(Marathon.id == marathon_id))
marathon = result.scalar_one_or_none()
if not marathon:
raise HTTPException(status_code=404, detail="Marathon not found")
# Return a temporary object for admin
return Participant(
user_id=user.id,
marathon_id=marathon_id,
role=ParticipantRole.ORGANIZER.value
)
participant = await get_participant(db, user.id, marathon_id)
if not participant:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not a participant of this marathon",
)
if not participant.is_organizer:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only organizers can perform this action",
)
return participant
async def require_creator(
db: AsyncSession,
user: User,
marathon_id: int,
) -> Marathon:
"""Require user to be creator of marathon (or admin)"""
result = await db.execute(select(Marathon).where(Marathon.id == marathon_id))
marathon = result.scalar_one_or_none()
if not marathon:
raise HTTPException(status_code=404, detail="Marathon not found")
if not user.is_admin and marathon.creator_id != user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only the creator can perform this action",
)
return marathon
# Type aliases for cleaner dependency injection
CurrentUser = Annotated[User, Depends(get_current_user)]
DbSession = Annotated[AsyncSession, Depends(get_db)]
async def verify_bot_secret(
x_bot_secret: str | None = Header(None, alias="X-Bot-Secret")
) -> None:
"""Verify that request comes from trusted bot using secret key."""
if not settings.BOT_API_SECRET:
# If secret is not configured, skip check (for development)
return
if x_bot_secret != settings.BOT_API_SECRET:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid or missing bot secret"
)
BotSecretDep = Annotated[None, Depends(verify_bot_secret)]