Files
game-marathon/backend/app/api/v1/shop.py
mamonov.ep 58c390c768 Исправлена ошибка MissingGreenlet при использовании Wild Card
Добавлена загрузка bonus_assignments через selectinload для wild_card,
чтобы избежать lazy loading в асинхронном контексте.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 18:54:46 +03:00

905 lines
32 KiB
Python

"""
Shop API endpoints - catalog, purchases, inventory, cosmetics, consumables
"""
from datetime import datetime
from fastapi import APIRouter, HTTPException
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.api.deps import CurrentUser, DbSession, require_participant, require_admin_with_2fa
from app.models import (
User, Marathon, Participant, Assignment, AssignmentStatus,
ShopItem, UserInventory, CoinTransaction, ShopItemType,
CertificationStatus, Challenge, Game,
)
from app.schemas import (
ShopItemResponse, ShopItemCreate, ShopItemUpdate,
InventoryItemResponse, PurchaseRequest, PurchaseResponse,
UseConsumableRequest, UseConsumableResponse,
EquipItemRequest, EquipItemResponse,
CoinTransactionResponse, CoinsBalanceResponse, AdminCoinsRequest,
CertificationRequestSchema, CertificationReviewRequest, CertificationStatusResponse,
ConsumablesStatusResponse, MessageResponse, SwapCandidate,
AdminGrantItemRequest,
)
from app.schemas.user import UserPublic
from app.services.shop import shop_service
from app.services.coins import coins_service
from app.services.consumables import consumables_service
from app.services.telegram_notifier import telegram_notifier
router = APIRouter(prefix="/shop", tags=["shop"])
# === Catalog ===
@router.get("/items", response_model=list[ShopItemResponse])
async def get_shop_items(
current_user: CurrentUser,
db: DbSession,
item_type: str | None = None,
include_unavailable: bool = False,
):
"""Get list of shop items"""
items = await shop_service.get_available_items(db, item_type, include_unavailable)
# Get user's inventory to mark owned/equipped items
user_inventory = await shop_service.get_user_inventory(db, current_user.id)
owned_ids = {inv.item_id for inv in user_inventory}
equipped_ids = {inv.item_id for inv in user_inventory if inv.equipped}
result = []
for item in items:
item_dict = ShopItemResponse.model_validate(item).model_dump()
item_dict["is_owned"] = item.id in owned_ids
item_dict["is_equipped"] = item.id in equipped_ids
result.append(ShopItemResponse(**item_dict))
return result
@router.get("/items/{item_id}", response_model=ShopItemResponse)
async def get_shop_item(
item_id: int,
current_user: CurrentUser,
db: DbSession,
):
"""Get single shop item by ID"""
item = await shop_service.get_item_by_id(db, item_id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
is_owned = await shop_service.check_user_owns_item(db, current_user.id, item_id)
# Check if equipped
is_equipped = False
if is_owned:
inventory = await shop_service.get_user_inventory(db, current_user.id, item.item_type)
is_equipped = any(inv.equipped and inv.item_id == item_id for inv in inventory)
response = ShopItemResponse.model_validate(item)
response.is_owned = is_owned
response.is_equipped = is_equipped
return response
# === Purchases ===
@router.post("/purchase", response_model=PurchaseResponse)
async def purchase_item(
data: PurchaseRequest,
current_user: CurrentUser,
db: DbSession,
):
"""Purchase an item from the shop"""
inv_item, total_cost = await shop_service.purchase_item(
db, current_user, data.item_id, data.quantity
)
await db.commit()
await db.refresh(current_user)
item = await shop_service.get_item_by_id(db, data.item_id)
return PurchaseResponse(
success=True,
item=ShopItemResponse.model_validate(item),
quantity=data.quantity,
total_cost=total_cost,
new_balance=current_user.coins_balance,
message=f"Successfully purchased {item.name} x{data.quantity}",
)
# === Inventory ===
@router.get("/inventory", response_model=list[InventoryItemResponse])
async def get_my_inventory(
current_user: CurrentUser,
db: DbSession,
item_type: str | None = None,
):
"""Get current user's inventory"""
inventory = await shop_service.get_user_inventory(db, current_user.id, item_type)
return [InventoryItemResponse.model_validate(inv) for inv in inventory]
# === Equip/Unequip ===
@router.post("/equip", response_model=EquipItemResponse)
async def equip_item(
data: EquipItemRequest,
current_user: CurrentUser,
db: DbSession,
):
"""Equip a cosmetic item from inventory"""
item = await shop_service.equip_item(db, current_user, data.inventory_id)
await db.commit()
return EquipItemResponse(
success=True,
item_type=item.item_type,
equipped_item=ShopItemResponse.model_validate(item),
message=f"Equipped {item.name}",
)
@router.post("/unequip/{item_type}", response_model=EquipItemResponse)
async def unequip_item(
item_type: str,
current_user: CurrentUser,
db: DbSession,
):
"""Unequip item of specified type"""
valid_types = [ShopItemType.FRAME.value, ShopItemType.TITLE.value,
ShopItemType.NAME_COLOR.value, ShopItemType.BACKGROUND.value]
if item_type not in valid_types:
raise HTTPException(status_code=400, detail=f"Invalid item type: {item_type}")
await shop_service.unequip_item(db, current_user, item_type)
await db.commit()
return EquipItemResponse(
success=True,
item_type=item_type,
equipped_item=None,
message=f"Unequipped {item_type}",
)
# === Consumables ===
@router.post("/use", response_model=UseConsumableResponse)
async def use_consumable(
data: UseConsumableRequest,
current_user: CurrentUser,
db: DbSession,
):
"""Use a consumable item"""
# Get marathon
result = await db.execute(select(Marathon).where(Marathon.id == data.marathon_id))
marathon = result.scalar_one_or_none()
if not marathon:
raise HTTPException(status_code=404, detail="Marathon not found")
# Get participant
participant = await require_participant(db, current_user.id, data.marathon_id)
# For some consumables, we need the assignment
assignment = None
if data.item_code in ["skip", "skip_exile", "wild_card", "copycat"]:
if not data.assignment_id:
raise HTTPException(status_code=400, detail=f"assignment_id is required for {data.item_code}")
# For copycat and wild_card, we need bonus_assignments to properly handle playthrough
if data.item_code in ("copycat", "wild_card"):
result = await db.execute(
select(Assignment)
.options(selectinload(Assignment.bonus_assignments))
.where(
Assignment.id == data.assignment_id,
Assignment.participant_id == participant.id,
)
)
else:
result = await db.execute(
select(Assignment).where(
Assignment.id == data.assignment_id,
Assignment.participant_id == participant.id,
)
)
assignment = result.scalar_one_or_none()
if not assignment:
raise HTTPException(status_code=404, detail="Assignment not found")
# Use the consumable
if data.item_code == "skip":
effect = await consumables_service.use_skip(db, current_user, participant, marathon, assignment)
effect_description = "Assignment skipped without penalty"
elif data.item_code == "skip_exile":
effect = await consumables_service.use_skip_exile(db, current_user, participant, marathon, assignment)
effect_description = "Assignment skipped, game exiled from pool"
elif data.item_code == "boost":
effect = await consumables_service.use_boost(db, current_user, participant, marathon)
effect_description = f"Boost x{effect['multiplier']} activated for current assignment"
elif data.item_code == "wild_card":
if data.game_id is None:
raise HTTPException(status_code=400, detail="game_id is required for wild_card")
effect = await consumables_service.use_wild_card(
db, current_user, participant, marathon, assignment, data.game_id
)
effect_description = f"New challenge from {effect['game_name']}: {effect['challenge_title']}"
elif data.item_code == "lucky_dice":
effect = await consumables_service.use_lucky_dice(db, current_user, participant, marathon)
effect_description = f"Lucky Dice rolled: x{effect['multiplier']} multiplier"
elif data.item_code == "copycat":
if data.target_participant_id is None:
raise HTTPException(status_code=400, detail="target_participant_id is required for copycat")
effect = await consumables_service.use_copycat(
db, current_user, participant, marathon, assignment, data.target_participant_id
)
effect_description = f"Copied challenge: {effect['challenge_title']}"
elif data.item_code == "undo":
effect = await consumables_service.use_undo(db, current_user, participant, marathon)
effect_description = f"Restored {effect['points_restored']} points and streak {effect['streak_restored']}"
else:
raise HTTPException(status_code=400, detail=f"Unknown consumable: {data.item_code}")
await db.commit()
# Get remaining quantity
remaining = await consumables_service.get_consumable_count(db, current_user.id, data.item_code)
return UseConsumableResponse(
success=True,
item_code=data.item_code,
remaining_quantity=remaining,
effect_description=effect_description,
effect_data=effect,
)
@router.get("/consumables/{marathon_id}", response_model=ConsumablesStatusResponse)
async def get_consumables_status(
marathon_id: int,
current_user: CurrentUser,
db: DbSession,
):
"""Get consumables status for participant in marathon"""
result = await db.execute(select(Marathon).where(Marathon.id == marathon_id))
marathon = result.scalar_one_or_none()
if not marathon:
raise HTTPException(status_code=404, detail="Marathon not found")
participant = await require_participant(db, current_user.id, marathon_id)
# Get inventory counts for all consumables
skips_available = await consumables_service.get_consumable_count(db, current_user.id, "skip")
skip_exiles_available = await consumables_service.get_consumable_count(db, current_user.id, "skip_exile")
boosts_available = await consumables_service.get_consumable_count(db, current_user.id, "boost")
wild_cards_available = await consumables_service.get_consumable_count(db, current_user.id, "wild_card")
lucky_dice_available = await consumables_service.get_consumable_count(db, current_user.id, "lucky_dice")
copycats_available = await consumables_service.get_consumable_count(db, current_user.id, "copycat")
undos_available = await consumables_service.get_consumable_count(db, current_user.id, "undo")
# Calculate remaining skips for this marathon
skips_remaining = None
if marathon.max_skips_per_participant is not None:
skips_remaining = max(0, marathon.max_skips_per_participant - participant.skips_used)
return ConsumablesStatusResponse(
skips_available=skips_available,
skip_exiles_available=skip_exiles_available,
skips_used=participant.skips_used,
skips_remaining=skips_remaining,
boosts_available=boosts_available,
has_active_boost=participant.has_active_boost,
boost_multiplier=consumables_service.BOOST_MULTIPLIER if participant.has_active_boost else None,
wild_cards_available=wild_cards_available,
lucky_dice_available=lucky_dice_available,
has_lucky_dice=participant.has_lucky_dice,
lucky_dice_multiplier=participant.lucky_dice_multiplier,
copycats_available=copycats_available,
undos_available=undos_available,
can_undo=participant.can_undo,
)
@router.get("/copycat-candidates/{marathon_id}", response_model=list[SwapCandidate])
async def get_copycat_candidates(
marathon_id: int,
current_user: CurrentUser,
db: DbSession,
):
"""Get participants with active assignments available for copycat (no event required)"""
result = await db.execute(select(Marathon).where(Marathon.id == marathon_id))
marathon = result.scalar_one_or_none()
if not marathon:
raise HTTPException(status_code=404, detail="Marathon not found")
participant = await require_participant(db, current_user.id, marathon_id)
# Get all participants except current user with active assignments
# Support both challenge assignments and playthrough assignments
result = await db.execute(
select(Participant, Assignment, Challenge, Game)
.join(Assignment, Assignment.participant_id == Participant.id)
.outerjoin(Challenge, Assignment.challenge_id == Challenge.id)
.outerjoin(Game, Challenge.game_id == Game.id)
.options(selectinload(Participant.user))
.where(
Participant.marathon_id == marathon_id,
Participant.id != participant.id,
Assignment.status == AssignmentStatus.ACTIVE.value,
)
)
rows = result.all()
candidates = []
for p, assignment, challenge, game in rows:
# For playthrough assignments, challenge is None
if assignment.is_playthrough:
# Need to get game info for playthrough
game_result = await db.execute(
select(Game).where(Game.id == assignment.game_id)
)
playthrough_game = game_result.scalar_one_or_none()
if playthrough_game:
candidates.append(SwapCandidate(
participant_id=p.id,
user=UserPublic(
id=p.user.id,
nickname=p.user.nickname,
avatar_url=p.user.avatar_url,
role=p.user.role,
telegram_avatar_url=p.user.telegram_avatar_url,
created_at=p.user.created_at,
equipped_frame=None,
equipped_title=None,
equipped_name_color=None,
equipped_background=None,
),
challenge_title=f"Прохождение: {playthrough_game.title}",
challenge_description=playthrough_game.playthrough_description or "Прохождение игры",
challenge_points=playthrough_game.playthrough_points or 0,
challenge_difficulty="medium",
game_title=playthrough_game.title,
))
elif challenge and game:
candidates.append(SwapCandidate(
participant_id=p.id,
user=UserPublic(
id=p.user.id,
nickname=p.user.nickname,
avatar_url=p.user.avatar_url,
role=p.user.role,
telegram_avatar_url=p.user.telegram_avatar_url,
created_at=p.user.created_at,
equipped_frame=None,
equipped_title=None,
equipped_name_color=None,
equipped_background=None,
),
challenge_title=challenge.title,
challenge_description=challenge.description,
challenge_points=challenge.points,
challenge_difficulty=challenge.difficulty,
game_title=game.title,
))
return candidates
# === Coins ===
@router.get("/balance", response_model=CoinsBalanceResponse)
async def get_coins_balance(
current_user: CurrentUser,
db: DbSession,
):
"""Get current user's coins balance with recent transactions"""
result = await db.execute(
select(CoinTransaction)
.where(CoinTransaction.user_id == current_user.id)
.order_by(CoinTransaction.created_at.desc())
.limit(10)
)
transactions = result.scalars().all()
return CoinsBalanceResponse(
balance=current_user.coins_balance,
recent_transactions=[CoinTransactionResponse.model_validate(t) for t in transactions],
)
@router.get("/transactions", response_model=list[CoinTransactionResponse])
async def get_coin_transactions(
current_user: CurrentUser,
db: DbSession,
limit: int = 50,
offset: int = 0,
):
"""Get user's coin transaction history"""
result = await db.execute(
select(CoinTransaction)
.where(CoinTransaction.user_id == current_user.id)
.order_by(CoinTransaction.created_at.desc())
.offset(offset)
.limit(min(limit, 100))
)
transactions = result.scalars().all()
return [CoinTransactionResponse.model_validate(t) for t in transactions]
# === Certification (organizer endpoints) ===
@router.post("/certification/{marathon_id}/request", response_model=CertificationStatusResponse)
async def request_certification(
marathon_id: int,
current_user: CurrentUser,
db: DbSession,
):
"""Request certification for a marathon (organizer only)"""
# Check user is organizer
result = await db.execute(
select(Marathon).where(Marathon.id == marathon_id)
)
marathon = result.scalar_one_or_none()
if not marathon:
raise HTTPException(status_code=404, detail="Marathon not found")
if marathon.creator_id != current_user.id and not current_user.is_admin:
raise HTTPException(status_code=403, detail="Only the creator can request certification")
if marathon.certification_status != CertificationStatus.NONE.value:
raise HTTPException(
status_code=400,
detail=f"Marathon already has certification status: {marathon.certification_status}"
)
marathon.certification_status = CertificationStatus.PENDING.value
marathon.certification_requested_at = datetime.utcnow()
await db.commit()
await db.refresh(marathon)
return CertificationStatusResponse(
marathon_id=marathon.id,
certification_status=marathon.certification_status,
is_certified=marathon.is_certified,
certification_requested_at=marathon.certification_requested_at,
certified_at=marathon.certified_at,
certified_by_nickname=None,
rejection_reason=None,
)
@router.delete("/certification/{marathon_id}/request", response_model=MessageResponse)
async def cancel_certification_request(
marathon_id: int,
current_user: CurrentUser,
db: DbSession,
):
"""Cancel certification request (organizer only)"""
result = await db.execute(
select(Marathon).where(Marathon.id == marathon_id)
)
marathon = result.scalar_one_or_none()
if not marathon:
raise HTTPException(status_code=404, detail="Marathon not found")
if marathon.creator_id != current_user.id and not current_user.is_admin:
raise HTTPException(status_code=403, detail="Only the creator can cancel certification request")
if marathon.certification_status != CertificationStatus.PENDING.value:
raise HTTPException(status_code=400, detail="No pending certification request to cancel")
marathon.certification_status = CertificationStatus.NONE.value
marathon.certification_requested_at = None
await db.commit()
return MessageResponse(message="Certification request cancelled")
@router.get("/certification/{marathon_id}", response_model=CertificationStatusResponse)
async def get_certification_status(
marathon_id: int,
current_user: CurrentUser,
db: DbSession,
):
"""Get certification status of a marathon"""
result = await db.execute(
select(Marathon)
.options(selectinload(Marathon.certified_by))
.where(Marathon.id == marathon_id)
)
marathon = result.scalar_one_or_none()
if not marathon:
raise HTTPException(status_code=404, detail="Marathon not found")
return CertificationStatusResponse(
marathon_id=marathon.id,
certification_status=marathon.certification_status,
is_certified=marathon.is_certified,
certification_requested_at=marathon.certification_requested_at,
certified_at=marathon.certified_at,
certified_by_nickname=marathon.certified_by.nickname if marathon.certified_by else None,
rejection_reason=marathon.certification_rejection_reason,
)
# === Admin endpoints ===
@router.get("/admin/items", response_model=list[ShopItemResponse])
async def admin_get_all_items(
current_user: CurrentUser,
db: DbSession,
):
"""Get all shop items including inactive (admin only)"""
require_admin_with_2fa(current_user)
items = await shop_service.get_available_items(db, include_unavailable=True)
return [ShopItemResponse.model_validate(item) for item in items]
@router.post("/admin/items", response_model=ShopItemResponse)
async def admin_create_item(
data: ShopItemCreate,
current_user: CurrentUser,
db: DbSession,
):
"""Create a new shop item (admin only)"""
require_admin_with_2fa(current_user)
# Check code uniqueness
existing = await shop_service.get_item_by_code(db, data.code)
if existing:
raise HTTPException(status_code=400, detail=f"Item with code '{data.code}' already exists")
item = ShopItem(
item_type=data.item_type,
code=data.code,
name=data.name,
description=data.description,
price=data.price,
rarity=data.rarity,
asset_data=data.asset_data,
is_active=data.is_active,
available_from=data.available_from,
available_until=data.available_until,
stock_limit=data.stock_limit,
stock_remaining=data.stock_limit, # Initialize remaining = limit
)
db.add(item)
await db.commit()
await db.refresh(item)
return ShopItemResponse.model_validate(item)
@router.put("/admin/items/{item_id}", response_model=ShopItemResponse)
async def admin_update_item(
item_id: int,
data: ShopItemUpdate,
current_user: CurrentUser,
db: DbSession,
):
"""Update a shop item (admin only)"""
require_admin_with_2fa(current_user)
item = await shop_service.get_item_by_id(db, item_id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
# Update fields
if data.name is not None:
item.name = data.name
if data.description is not None:
item.description = data.description
if data.price is not None:
item.price = data.price
if data.rarity is not None:
item.rarity = data.rarity
if data.asset_data is not None:
item.asset_data = data.asset_data
if data.is_active is not None:
item.is_active = data.is_active
if data.available_from is not None:
item.available_from = data.available_from
if data.available_until is not None:
item.available_until = data.available_until
if data.stock_limit is not None:
# If increasing limit, also increase remaining
if item.stock_limit is not None and data.stock_limit > item.stock_limit:
diff = data.stock_limit - item.stock_limit
item.stock_remaining = (item.stock_remaining or 0) + diff
item.stock_limit = data.stock_limit
await db.commit()
await db.refresh(item)
return ShopItemResponse.model_validate(item)
@router.delete("/admin/items/{item_id}", response_model=MessageResponse)
async def admin_delete_item(
item_id: int,
current_user: CurrentUser,
db: DbSession,
):
"""Delete a shop item (admin only)"""
require_admin_with_2fa(current_user)
item = await shop_service.get_item_by_id(db, item_id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
await db.delete(item)
await db.commit()
return MessageResponse(message=f"Item '{item.name}' deleted")
@router.post("/admin/users/{user_id}/coins/grant", response_model=MessageResponse)
async def admin_grant_coins(
user_id: int,
data: AdminCoinsRequest,
current_user: CurrentUser,
db: DbSession,
):
"""Grant coins to a user (admin only)"""
require_admin_with_2fa(current_user)
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
await coins_service.admin_grant_coins(db, user, data.amount, data.reason, current_user.id)
await db.commit()
return MessageResponse(message=f"Granted {data.amount} coins to {user.nickname}")
@router.post("/admin/users/{user_id}/coins/deduct", response_model=MessageResponse)
async def admin_deduct_coins(
user_id: int,
data: AdminCoinsRequest,
current_user: CurrentUser,
db: DbSession,
):
"""Deduct coins from a user (admin only)"""
require_admin_with_2fa(current_user)
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
success = await coins_service.admin_deduct_coins(db, user, data.amount, data.reason, current_user.id)
if not success:
raise HTTPException(status_code=400, detail="User doesn't have enough coins")
await db.commit()
return MessageResponse(message=f"Deducted {data.amount} coins from {user.nickname}")
@router.get("/admin/certification/pending", response_model=list[dict])
async def admin_get_pending_certifications(
current_user: CurrentUser,
db: DbSession,
):
"""Get list of marathons pending certification (admin only)"""
require_admin_with_2fa(current_user)
result = await db.execute(
select(Marathon)
.options(selectinload(Marathon.creator))
.where(Marathon.certification_status == CertificationStatus.PENDING.value)
.order_by(Marathon.certification_requested_at.asc())
)
marathons = result.scalars().all()
return [
{
"id": m.id,
"title": m.title,
"creator_nickname": m.creator.nickname,
"status": m.status,
"participants_count": len(m.participants) if m.participants else 0,
"certification_requested_at": m.certification_requested_at,
}
for m in marathons
]
@router.post("/admin/certification/{marathon_id}/review", response_model=CertificationStatusResponse)
async def admin_review_certification(
marathon_id: int,
data: CertificationReviewRequest,
current_user: CurrentUser,
db: DbSession,
):
"""Approve or reject marathon certification (admin only)"""
require_admin_with_2fa(current_user)
result = await db.execute(
select(Marathon).where(Marathon.id == marathon_id)
)
marathon = result.scalar_one_or_none()
if not marathon:
raise HTTPException(status_code=404, detail="Marathon not found")
if marathon.certification_status != CertificationStatus.PENDING.value:
raise HTTPException(status_code=400, detail="Marathon is not pending certification")
if data.approve:
marathon.certification_status = CertificationStatus.CERTIFIED.value
marathon.certified_at = datetime.utcnow()
marathon.certified_by_id = current_user.id
marathon.certification_rejection_reason = None
else:
if not data.rejection_reason:
raise HTTPException(status_code=400, detail="Rejection reason is required")
marathon.certification_status = CertificationStatus.REJECTED.value
marathon.certification_rejection_reason = data.rejection_reason
await db.commit()
await db.refresh(marathon)
return CertificationStatusResponse(
marathon_id=marathon.id,
certification_status=marathon.certification_status,
is_certified=marathon.is_certified,
certification_requested_at=marathon.certification_requested_at,
certified_at=marathon.certified_at,
certified_by_nickname=current_user.nickname if data.approve else None,
rejection_reason=marathon.certification_rejection_reason,
)
# === Admin Item Granting ===
@router.post("/admin/users/{user_id}/items/grant", response_model=MessageResponse)
async def admin_grant_item(
user_id: int,
data: AdminGrantItemRequest,
current_user: CurrentUser,
db: DbSession,
):
"""Grant an item to a user (admin only)"""
require_admin_with_2fa(current_user)
# Get target user
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Get item
item = await shop_service.get_item_by_id(db, data.item_id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
# Check if user already has this item in inventory
result = await db.execute(
select(UserInventory).where(
UserInventory.user_id == user_id,
UserInventory.item_id == data.item_id,
)
)
existing = result.scalar_one_or_none()
if existing:
# Add to quantity
existing.quantity += data.quantity
else:
# Create new inventory item
inventory_item = UserInventory(
user_id=user_id,
item_id=data.item_id,
quantity=data.quantity,
)
db.add(inventory_item)
# Log the action (using coin transaction as audit log)
transaction = CoinTransaction(
user_id=user_id,
amount=0,
transaction_type="admin_grant_item",
description=f"Admin granted {item.name} x{data.quantity}: {data.reason}",
reference_type="admin_action",
reference_id=current_user.id,
)
db.add(transaction)
await db.commit()
# Send Telegram notification
await telegram_notifier.notify_item_granted(
user=user,
item_name=item.name,
quantity=data.quantity,
reason=data.reason,
admin_nickname=current_user.nickname,
)
return MessageResponse(message=f"Granted {item.name} x{data.quantity} to {user.nickname}")
@router.get("/admin/users/{user_id}/inventory", response_model=list[InventoryItemResponse])
async def admin_get_user_inventory(
user_id: int,
current_user: CurrentUser,
db: DbSession,
item_type: str | None = None,
):
"""Get a user's inventory (admin only)"""
require_admin_with_2fa(current_user)
# Check user exists
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
inventory = await shop_service.get_user_inventory(db, user_id, item_type)
return [InventoryItemResponse.model_validate(inv) for inv in inventory]
@router.delete("/admin/users/{user_id}/inventory/{inventory_id}", response_model=MessageResponse)
async def admin_remove_inventory_item(
user_id: int,
inventory_id: int,
current_user: CurrentUser,
db: DbSession,
quantity: int = 1,
):
"""Remove an item from user's inventory (admin only)"""
require_admin_with_2fa(current_user)
# Check user exists
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Get inventory item
result = await db.execute(
select(UserInventory)
.options(selectinload(UserInventory.item))
.where(
UserInventory.id == inventory_id,
UserInventory.user_id == user_id,
)
)
inv = result.scalar_one_or_none()
if not inv:
raise HTTPException(status_code=404, detail="Inventory item not found")
item_name = inv.item.name
if quantity >= inv.quantity:
# Remove entirely
await db.delete(inv)
removed_qty = inv.quantity
else:
# Reduce quantity
inv.quantity -= quantity
removed_qty = quantity
# Log the action
transaction = CoinTransaction(
user_id=user_id,
amount=0,
transaction_type="admin_remove_item",
description=f"Admin removed {item_name} x{removed_qty}",
reference_type="admin_action",
reference_id=current_user.id,
)
db.add(transaction)
await db.commit()
return MessageResponse(message=f"Removed {item_name} x{removed_qty} from {user.nickname}")