Files
game-marathon/backend/app/api/v1/users.py
2025-12-18 17:15:21 +07:00

304 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import APIRouter, HTTPException, status, UploadFile, File, Response
from sqlalchemy import select, func
from app.api.deps import DbSession, CurrentUser
from app.core.config import settings
from app.core.security import verify_password, get_password_hash
from app.models import User, Participant, Assignment, Marathon
from app.models.assignment import AssignmentStatus
from app.models.marathon import MarathonStatus
from app.schemas import (
UserPublic, UserPrivate, UserUpdate, TelegramLink, MessageResponse,
PasswordChange, UserStats, UserProfilePublic,
)
from app.services.storage import storage_service
# Router collecting the user endpoints defined in this module; every route
# path below is relative to the "/users" prefix.
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/{user_id}", response_model=UserPublic)
async def get_user(user_id: int, db: DbSession, current_user: CurrentUser):
    """Return the public view of the user identified by ``user_id``.

    ``current_user`` is not read in the body; it is presumably declared so
    that the dependency enforces authentication (confirm in ``app.api.deps``).

    Raises:
        HTTPException: 404 when no user row has this id.
    """
    lookup = select(User).where(User.id == user_id)
    found = (await db.execute(lookup)).scalar_one_or_none()
    if found:
        return UserPublic.model_validate(found)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found",
    )
@router.get("/{user_id}/avatar")
async def get_user_avatar(user_id: int, db: DbSession):
    """Serve the avatar image of the given user from the storage service.

    The response carries the content type reported by the storage service
    and a one-hour public ``Cache-Control`` header.

    Raises:
        HTTPException: 404 when the user does not exist, has no avatar path
            set, or the storage service returns nothing for that path.
    """
    query = select(User).where(User.id == user_id)
    owner = (await db.execute(query)).scalar_one_or_none()
    if not owner:
        raise HTTPException(status_code=404, detail="User not found")
    if not owner.avatar_path:
        raise HTTPException(status_code=404, detail="User has no avatar")

    # The storage service yields a (content, content_type) pair, or a falsy
    # value when the file is missing.
    stored = await storage_service.get_file(owner.avatar_path, "avatars")
    if not stored:
        raise HTTPException(status_code=404, detail="Avatar not found in storage")

    body, mime = stored
    cache_headers = {
        "Cache-Control": "public, max-age=3600",
    }
    return Response(content=body, media_type=mime, headers=cache_headers)
@router.patch("/me", response_model=UserPrivate)
async def update_me(data: UserUpdate, current_user: CurrentUser, db: DbSession):
    """Apply a partial profile update to the authenticated user.

    Only ``nickname`` is handled here, and only when the payload provides a
    non-``None`` value. The session is committed and the user refreshed
    either way before the private view is returned.
    """
    new_nickname = data.nickname
    if new_nickname is not None:
        current_user.nickname = new_nickname

    await db.commit()
    await db.refresh(current_user)

    return UserPrivate.model_validate(current_user)
@router.post("/me/avatar", response_model=UserPrivate)
async def upload_avatar(
    current_user: CurrentUser,
    db: DbSession,
    file: UploadFile = File(...),
):
    """Upload a new avatar for the authenticated user.

    The upload is validated (image content type, size limit, allowed file
    extension), any previous avatar is deleted from storage, the new file is
    stored in the "avatars" folder and the user's ``avatar_path`` is updated.

    Raises:
        HTTPException: 400 when the content type is missing or not
            ``image/*``, when the payload exceeds
            ``settings.MAX_UPLOAD_SIZE``, or when the file extension is not
            in ``settings.ALLOWED_IMAGE_EXTENSIONS``.
    """
    # ``content_type`` is None when the multipart part has no Content-Type
    # header. Normalise it so such uploads get a clean 400 instead of an
    # AttributeError (HTTP 500) on ``.startswith``.
    content_type = file.content_type or ""
    if not content_type.startswith("image/"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="File must be an image",
        )

    # The size check needs the whole payload, so it is read into memory first.
    contents = await file.read()
    if len(contents) > settings.MAX_UPLOAD_SIZE:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"File too large. Maximum size is {settings.MAX_UPLOAD_SIZE // 1024 // 1024} MB",
        )

    # Extension is whatever follows the last dot; a missing filename is
    # treated as "jpg".
    ext = file.filename.split(".")[-1].lower() if file.filename else "jpg"
    if ext not in settings.ALLOWED_IMAGE_EXTENSIONS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid file type. Allowed: {settings.ALLOWED_IMAGE_EXTENSIONS}",
        )

    # Delete old avatar if exists
    if current_user.avatar_path:
        await storage_service.delete_file(current_user.avatar_path)

    # Upload file
    filename = storage_service.generate_filename(current_user.id, file.filename)
    file_path = await storage_service.upload_file(
        content=contents,
        folder="avatars",
        filename=filename,
        # Guaranteed non-empty and image/* by the validation above.
        content_type=content_type,
    )

    # Update user
    current_user.avatar_path = file_path
    await db.commit()
    await db.refresh(current_user)

    return UserPrivate.model_validate(current_user)
@router.post("/me/telegram", response_model=MessageResponse)
async def link_telegram(
    data: TelegramLink,
    current_user: CurrentUser,
    db: DbSession,
):
    """Attach a Telegram account to the authenticated user.

    Raises:
        HTTPException: 400 when a different user already has the same
            ``telegram_id``.
    """
    # Look for any *other* user that already owns this Telegram id.
    conflict_query = select(User).where(
        User.telegram_id == data.telegram_id,
        User.id != current_user.id,
    )
    conflicting_user = (await db.execute(conflict_query)).scalar_one_or_none()
    if conflicting_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="This Telegram account is already linked to another user",
        )

    current_user.telegram_id = data.telegram_id
    current_user.telegram_username = data.telegram_username
    await db.commit()

    return MessageResponse(message="Telegram account linked successfully")
@router.post("/me/telegram/unlink", response_model=MessageResponse)
async def unlink_telegram(
    current_user: CurrentUser,
    db: DbSession,
):
    """Detach the Telegram account from the authenticated user.

    Clears both ``telegram_id`` and ``telegram_username`` and commits.

    Raises:
        HTTPException: 400 when the user has no Telegram account linked.
    """
    if current_user.telegram_id:
        current_user.telegram_id = None
        current_user.telegram_username = None
        await db.commit()
        return MessageResponse(message="Telegram account unlinked successfully")

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Telegram account is not linked",
    )
@router.post("/me/password", response_model=MessageResponse)
async def change_password(
    data: PasswordChange,
    current_user: CurrentUser,
    db: DbSession,
):
    """Change the authenticated user's password.

    The supplied current password is verified against the stored hash, and
    the new password must differ from it. On success the new hash is stored
    and committed.

    Raises:
        HTTPException: 400 when the current password is wrong or the new
            password equals the current one.
    """
    old_password = data.current_password
    new_password = data.new_password

    hash_matches = verify_password(old_password, current_user.password_hash)
    if not hash_matches:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Неверный текущий пароль",
        )

    if old_password == new_password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Новый пароль должен отличаться от текущего",
        )

    current_user.password_hash = get_password_hash(new_password)
    await db.commit()

    return MessageResponse(message="Пароль успешно изменен")
@router.get("/me/stats", response_model=UserStats)
async def get_my_stats(current_user: CurrentUser, db: DbSession):
    """Return the statistics of the authenticated user."""
    own_id = current_user.id
    stats = await _get_user_stats(own_id, db)
    return stats
@router.get("/{user_id}/stats", response_model=UserStats)
async def get_user_stats(user_id: int, db: DbSession, current_user: CurrentUser):
    """Return the statistics of the user identified by ``user_id``.

    ``current_user`` is not read in the body; it is presumably declared so
    that the dependency enforces authentication (confirm in ``app.api.deps``).

    Raises:
        HTTPException: 404 when no user row has this id.
    """
    lookup = select(User).where(User.id == user_id)
    target = (await db.execute(lookup)).scalar_one_or_none()
    if target:
        return await _get_user_stats(user_id, db)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found",
    )
@router.get("/{user_id}/profile", response_model=UserProfilePublic)
async def get_user_profile(user_id: int, db: DbSession, current_user: CurrentUser):
    """Return a user's public profile together with their statistics.

    ``current_user`` is not read in the body; it is presumably declared so
    that the dependency enforces authentication (confirm in ``app.api.deps``).

    Raises:
        HTTPException: 404 when no user row has this id.
    """
    lookup = select(User).where(User.id == user_id)
    profile_owner = (await db.execute(lookup)).scalar_one_or_none()
    if not profile_owner:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # Statistics are computed before the profile fields are read, so the
    # user lookup is the only work done for unknown ids.
    owner_stats = await _get_user_stats(user_id, db)
    profile_fields = {
        "id": profile_owner.id,
        "nickname": profile_owner.nickname,
        "avatar_url": profile_owner.avatar_url,
        "created_at": profile_owner.created_at,
        "stats": owner_stats,
    }
    return UserProfilePublic(**profile_fields)
async def _get_user_stats(user_id: int, db) -> UserStats:
    """Compute aggregate marathon statistics for one user.

    Runs three aggregate queries, independent of how many marathons the
    user took part in:

    1. number of participations (``Participant`` rows of the user);
    2. number of wins: finished marathons where the user's ``total_points``
       is positive and strictly greater than every other participant's
       (a tie for first place is not a win);
    3. number of completed assignments and the sum of their
       ``points_earned``.

    Args:
        user_id: Id of the user whose statistics are computed. The caller is
            responsible for checking that the user exists.
        db: Database session exposing an awaitable ``execute`` (the route
            handlers pass their ``DbSession`` dependency).

    Returns:
        A ``UserStats`` with ``marathons_count``, ``wins_count``,
        ``completed_assignments`` and ``total_points_earned``.
    """
    # Local import: only this helper needs a second alias of Participant.
    from sqlalchemy.orm import aliased

    # 1. Number of marathons the user participated in.
    marathons_result = await db.execute(
        select(func.count(Participant.id))
        .where(Participant.user_id == user_id)
    )
    marathons_count = marathons_result.scalar() or 0

    # 2. Number of wins (sole first place in finished marathons).
    # One correlated NOT EXISTS replaces the per-marathon MAX / COUNT
    # queries. The user wins a marathon exactly when their score is positive
    # and no other participant of that marathon has a score greater than or
    # equal to it. That is the same as "holds the maximum, and is the only
    # one holding it".
    rival = aliased(Participant)
    matched_or_beaten = (
        select(rival.id)
        .where(
            rival.marathon_id == Participant.marathon_id,
            rival.id != Participant.id,
            rival.total_points >= Participant.total_points,
        )
        .correlate(Participant)
        .exists()
    )
    wins_result = await db.execute(
        select(func.count(Participant.id))
        .join(Marathon, Marathon.id == Participant.marathon_id)
        .where(
            Participant.user_id == user_id,
            Marathon.status == MarathonStatus.FINISHED.value,
            Participant.total_points > 0,
            ~matched_or_beaten,
        )
    )
    wins_count = wins_result.scalar() or 0

    # 3 + 4. Completed assignments and total points earned. Both aggregates
    # share the same join and filter, so they are fetched in one round trip.
    completed_result = await db.execute(
        select(
            func.count(Assignment.id),
            func.coalesce(func.sum(Assignment.points_earned), 0),
        )
        .join(Participant, Participant.id == Assignment.participant_id)
        .where(
            Participant.user_id == user_id,
            Assignment.status == AssignmentStatus.COMPLETED.value,
        )
    )
    completed_assignments, total_points_earned = completed_result.one()

    return UserStats(
        marathons_count=marathons_count,
        wins_count=wins_count,
        completed_assignments=completed_assignments or 0,
        total_points_earned=total_points_earned or 0,
    )