Files
2026-01-05 08:42:49 +07:00

298 lines
9.5 KiB
Python

"""
Shop Service - handles shop items, purchases, and inventory management
"""
from datetime import datetime
from fastapi import HTTPException
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models import User, ShopItem, UserInventory, ShopItemType
from app.services.coins import coins_service
class ShopService:
    """Service for shop operations: catalogue queries, purchases, inventory, equipping.

    Every public method receives the ``AsyncSession`` to operate on. No method
    in this class calls ``commit``; ``purchase_item`` is the only one that
    flushes.
    """

    async def get_available_items(
        self,
        db: AsyncSession,
        item_type: str | None = None,
        include_unavailable: bool = False,
    ) -> list[ShopItem]:
        """
        Get list of shop items, cheapest first.

        Args:
            db: Session used to run the query.
            item_type: Filter by item type (frame, title, etc.); a falsy value
                disables the filter.
            include_unavailable: Include inactive, out-of-window and
                out-of-stock items.

        Returns:
            Matching items ordered by ascending price.
        """
        query = select(ShopItem)
        if item_type:
            query = query.where(ShopItem.item_type == item_type)
        if not include_unavailable:
            # Naive UTC timestamp.
            # NOTE(review): presumably available_from/available_until are
            # stored as naive UTC as well - confirm against the model.
            now = datetime.utcnow()
            query = query.where(
                # SQL expression, not a Python comparison, hence "== True".
                ShopItem.is_active == True,  # noqa: E712
                # A NULL bound means "unbounded" on that side of the window.
                (ShopItem.available_from.is_(None)) | (ShopItem.available_from <= now),
                (ShopItem.available_until.is_(None)) | (ShopItem.available_until >= now),
                # NULL stock means unlimited stock.
                (ShopItem.stock_remaining.is_(None)) | (ShopItem.stock_remaining > 0),
            )
        query = query.order_by(ShopItem.price.asc())
        result = await db.execute(query)
        return list(result.scalars().all())

    async def get_item_by_id(self, db: AsyncSession, item_id: int) -> ShopItem | None:
        """Get shop item by ID, or None when no row matches."""
        result = await db.execute(select(ShopItem).where(ShopItem.id == item_id))
        return result.scalar_one_or_none()

    async def get_item_by_code(self, db: AsyncSession, code: str) -> ShopItem | None:
        """Get shop item by code, or None when no row matches."""
        result = await db.execute(select(ShopItem).where(ShopItem.code == code))
        return result.scalar_one_or_none()

    async def _find_inventory_row(
        self,
        db: AsyncSession,
        user_id: int,
        item_id: int,
    ) -> UserInventory | None:
        """Return the inventory row linking user_id to item_id, if any (any quantity)."""
        result = await db.execute(
            select(UserInventory).where(
                UserInventory.user_id == user_id,
                UserInventory.item_id == item_id,
            )
        )
        return result.scalar_one_or_none()

    async def purchase_item(
        self,
        db: AsyncSession,
        user: User,
        item_id: int,
        quantity: int = 1,
    ) -> tuple[UserInventory, int]:
        """
        Purchase an item from the shop.

        The session is flushed before returning; this method does not itself
        commit.

        Args:
            db: Session the purchase is performed in.
            user: The purchasing user
            item_id: ID of item to purchase
            quantity: Number to purchase (only for consumables; ignored and
                treated as 1 for every other item type)

        Returns:
            Tuple of (inventory item, total cost)

        Raises:
            HTTPException: 404 if the item does not exist; 400 if it is not
                available, is a non-consumable the user already owns, the
                quantity is below 1, stock is insufficient, the user cannot
                afford it, or the coin deduction reports failure.
        """
        # Get item
        item = await self.get_item_by_id(db, item_id)
        if not item:
            raise HTTPException(status_code=404, detail="Item not found")

        # Check availability
        if not item.is_available:
            raise HTTPException(status_code=400, detail="Item is not available")

        is_consumable = item.item_type == ShopItemType.CONSUMABLE.value
        if not is_consumable:
            # For non-consumables, quantity is always 1 and the item can be
            # owned only once.
            quantity = 1
            if await self._find_inventory_row(db, user.id, item.id):
                raise HTTPException(status_code=400, detail="You already own this item")
        elif quantity < 1:
            # A zero/negative quantity would produce a non-positive cost,
            # shrink the inventory row and inflate the remaining stock.
            raise HTTPException(status_code=400, detail="Quantity must be at least 1")

        # Check stock (None means unlimited)
        if item.stock_remaining is not None and item.stock_remaining < quantity:
            raise HTTPException(status_code=400, detail="Not enough stock available")

        # Calculate total cost
        total_cost = item.price * quantity

        # Check balance
        if user.coins_balance < total_cost:
            raise HTTPException(status_code=400, detail="Not enough coins")

        # Deduct coins
        success = await coins_service.spend_coins(
            db, user, total_cost,
            f"Purchase: {item.name} x{quantity}",
            "shop_item", item.id,
        )
        if not success:
            raise HTTPException(status_code=400, detail="Payment failed")

        # Add to inventory: consumables top up an existing row when there is
        # one; everything else (ownership was ruled out above) gets a new row.
        inv_item = None
        if is_consumable:
            inv_item = await self._find_inventory_row(db, user.id, item.id)
        if inv_item:
            inv_item.quantity += quantity
        else:
            inv_item = UserInventory(
                user_id=user.id,
                item_id=item.id,
                quantity=quantity,
            )
            db.add(inv_item)

        # Decrease stock if limited
        if item.stock_remaining is not None:
            item.stock_remaining -= quantity

        await db.flush()
        return inv_item, total_cost

    async def get_user_inventory(
        self,
        db: AsyncSession,
        user_id: int,
        item_type: str | None = None,
    ) -> list[UserInventory]:
        """
        Get user's inventory.

        Args:
            db: Session used to run the query.
            user_id: Owner of the inventory rows.
            item_type: Optional item type filter; a falsy value disables it.

        Returns:
            Inventory rows with quantity > 0, each with its ``item``
            relationship eagerly loaded.
        """
        query = (
            select(UserInventory)
            .options(selectinload(UserInventory.item))
            .where(UserInventory.user_id == user_id)
        )
        if item_type:
            query = query.join(ShopItem).where(ShopItem.item_type == item_type)
        # Exclude empty consumables
        query = query.where(UserInventory.quantity > 0)
        result = await db.execute(query)
        return list(result.scalars().all())

    async def get_inventory_item(
        self,
        db: AsyncSession,
        user_id: int,
        inventory_id: int,
    ) -> UserInventory | None:
        """Get a specific inventory row of the user (with ``item`` loaded), or None."""
        result = await db.execute(
            select(UserInventory)
            .options(selectinload(UserInventory.item))
            .where(
                UserInventory.id == inventory_id,
                # Scope by owner so a user cannot reach another user's row.
                UserInventory.user_id == user_id,
            )
        )
        return result.scalar_one_or_none()

    @staticmethod
    def _equipped_attr_for(item_type: str) -> str | None:
        """Return the name of the User column tracking the equipped item of item_type."""
        slots = (
            (ShopItemType.FRAME.value, "equipped_frame_id"),
            (ShopItemType.TITLE.value, "equipped_title_id"),
            (ShopItemType.NAME_COLOR.value, "equipped_name_color_id"),
            (ShopItemType.BACKGROUND.value, "equipped_background_id"),
        )
        # Compare with == (not a dict lookup) so matching follows exactly the
        # same equality semantics as a plain if/elif chain.
        for type_value, attr_name in slots:
            if item_type == type_value:
                return attr_name
        return None

    async def _clear_equipped_flags(
        self,
        db: AsyncSession,
        user_id: int,
        item_type: str,
    ) -> None:
        """Set equipped=False on every equipped inventory row of user_id with item_type."""
        await db.execute(
            update(UserInventory)
            .where(
                UserInventory.user_id == user_id,
                UserInventory.equipped == True,  # noqa: E712 - SQL expression
                UserInventory.item_id.in_(
                    select(ShopItem.id).where(ShopItem.item_type == item_type)
                ),
            )
            .values(equipped=False)
        )

    async def equip_item(
        self,
        db: AsyncSession,
        user: User,
        inventory_id: int,
    ) -> ShopItem:
        """
        Equip a cosmetic item from inventory.

        Any item of the same type the user currently has equipped is
        unequipped first. No flush or commit is issued here.

        Args:
            db: Session the change is performed in.
            user: Owner of the inventory row; the matching equipped_*_id
                field is updated for frame, title, name color and background.
            inventory_id: ID of the inventory row to equip.

        Returns: The equipped item

        Raises:
            HTTPException: 404 if the row is not in the user's inventory,
                400 if the item is a consumable.
        """
        # Get inventory item
        inv_item = await self.get_inventory_item(db, user.id, inventory_id)
        if not inv_item:
            raise HTTPException(status_code=404, detail="Item not found in inventory")

        item = inv_item.item
        if item.item_type == ShopItemType.CONSUMABLE.value:
            raise HTTPException(status_code=400, detail="Cannot equip consumables")

        # Unequip current item of same type
        await self._clear_equipped_flags(db, user.id, item.item_type)

        # Equip new item
        inv_item.equipped = True

        # Update user's equipped_*_id (types without such a field are skipped)
        attr_name = self._equipped_attr_for(item.item_type)
        if attr_name is not None:
            setattr(user, attr_name, item.id)
        return item

    async def unequip_item(
        self,
        db: AsyncSession,
        user: User,
        item_type: str,
    ) -> None:
        """
        Unequip item of specified type.

        Clears the equipped flag on the user's inventory rows of that type and
        resets the matching equipped_*_id field on the user, when there is
        one. No flush or commit is issued here.
        """
        # Unequip from inventory
        await self._clear_equipped_flags(db, user.id, item_type)

        # Clear user's equipped_*_id
        attr_name = self._equipped_attr_for(item_type)
        if attr_name is not None:
            setattr(user, attr_name, None)

    async def check_user_owns_item(
        self,
        db: AsyncSession,
        user_id: int,
        item_id: int,
    ) -> bool:
        """Check if user owns an item (an inventory row with quantity > 0 exists)."""
        result = await db.execute(
            select(UserInventory).where(
                UserInventory.user_id == user_id,
                UserInventory.item_id == item_id,
                UserInventory.quantity > 0,
            )
        )
        return result.scalar_one_or_none() is not None
# Module-level ShopService instance, created once when this module is imported
shop_service = ShopService()