Files
game-marathon/backend/app/services/consumables.py
mamonov.ep 72089d1b47 Исправлены ошибки Wild Card и skip-assignment
- Wild Card: исправлен game.name → game.title
- Wild Card: добавлена поддержка игр типа playthrough
- points.py: добавлена проверка на None для challenge_points
- PlaythroughInfo: поля сделаны Optional (description, points, proof_type)
- organizer_skip_assignment: добавлен фильтр is_event_assignment

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 18:10:40 +03:00

726 lines
24 KiB
Python

"""
Consumables Service - handles consumable items usage
Consumables:
- skip: Skip current assignment without penalty
- skip_exile: Skip + permanently exile game from pool
- boost: x1.5 multiplier for current assignment
- wild_card: Choose a game, get random challenge from it
- lucky_dice: Random multiplier (0.5, 1.0, 1.5, 2.0, 2.5, 3.0)
- copycat: Copy another participant's assignment
- undo: Restore points and streak from last drop
"""
import random
from datetime import datetime
from fastapi import HTTPException
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models import (
User, Participant, Marathon, Assignment, AssignmentStatus,
ShopItem, UserInventory, ConsumableUsage, ConsumableType, Game, Challenge,
BonusAssignment, ExiledGame, GameType
)
class ConsumablesService:
"""Service for consumable items"""
# Boost settings
BOOST_MULTIPLIER = 1.5
# Lucky Dice multipliers (equal probability, starts from 1.5x)
LUCKY_DICE_MULTIPLIERS = [1.5, 2.0, 2.5, 3.0, 3.5, 4.0]
async def use_skip(
self,
db: AsyncSession,
user: User,
participant: Participant,
marathon: Marathon,
assignment: Assignment,
) -> dict:
"""
Use a Skip to bypass current assignment without penalty.
- No streak loss
- No drop penalty
- Assignment marked as dropped but without negative effects
Returns: dict with result info
Raises:
HTTPException: If skips not allowed or limit reached
"""
# Check marathon settings
if not marathon.allow_skips:
raise HTTPException(status_code=400, detail="Skips are not allowed in this marathon")
if marathon.max_skips_per_participant is not None:
if participant.skips_used >= marathon.max_skips_per_participant:
raise HTTPException(
status_code=400,
detail=f"Skip limit reached ({marathon.max_skips_per_participant} per participant)"
)
# Check assignment is active
if assignment.status != AssignmentStatus.ACTIVE.value:
raise HTTPException(status_code=400, detail="Can only skip active assignments")
# Consume skip from inventory
item = await self._consume_item(db, user, ConsumableType.SKIP.value)
# Mark assignment as dropped (but without penalty)
assignment.status = AssignmentStatus.DROPPED.value
assignment.completed_at = datetime.utcnow()
# Note: We do NOT increase drop_count or reset streak
# Track skip usage
participant.skips_used += 1
# Log usage
usage = ConsumableUsage(
user_id=user.id,
item_id=item.id,
marathon_id=marathon.id,
assignment_id=assignment.id,
effect_data={
"type": "skip",
"skipped_without_penalty": True,
},
)
db.add(usage)
return {
"success": True,
"skipped": True,
"penalty": 0,
"streak_preserved": True,
}
async def use_skip_exile(
self,
db: AsyncSession,
user: User,
participant: Participant,
marathon: Marathon,
assignment: Assignment,
) -> dict:
"""
Use Skip with Exile - skip assignment AND permanently exile game from pool.
- No streak loss
- No drop penalty
- Game is permanently excluded from participant's pool
Returns: dict with result info
Raises:
HTTPException: If skips not allowed or limit reached
"""
# Check marathon settings (same as regular skip)
if not marathon.allow_skips:
raise HTTPException(status_code=400, detail="Skips are not allowed in this marathon")
if marathon.max_skips_per_participant is not None:
if participant.skips_used >= marathon.max_skips_per_participant:
raise HTTPException(
status_code=400,
detail=f"Skip limit reached ({marathon.max_skips_per_participant} per participant)"
)
# Check assignment is active
if assignment.status != AssignmentStatus.ACTIVE.value:
raise HTTPException(status_code=400, detail="Can only skip active assignments")
# Get game_id (different for playthrough vs challenges)
if assignment.is_playthrough:
game_id = assignment.game_id
else:
# Need to load challenge to get game_id
if assignment.challenge:
game_id = assignment.challenge.game_id
else:
# Load challenge if not already loaded
result = await db.execute(
select(Challenge).where(Challenge.id == assignment.challenge_id)
)
challenge = result.scalar_one()
game_id = challenge.game_id
# Check if game is already exiled
existing = await db.execute(
select(ExiledGame).where(
ExiledGame.participant_id == participant.id,
ExiledGame.game_id == game_id,
ExiledGame.is_active == True,
)
)
if existing.scalar_one_or_none():
raise HTTPException(status_code=400, detail="Game is already exiled")
# Consume skip_exile from inventory
item = await self._consume_item(db, user, ConsumableType.SKIP_EXILE.value)
# Mark assignment as dropped (without penalty)
assignment.status = AssignmentStatus.DROPPED.value
assignment.completed_at = datetime.utcnow()
# Track skip usage
participant.skips_used += 1
# Add game to exiled list
exiled = ExiledGame(
participant_id=participant.id,
game_id=game_id,
assignment_id=assignment.id,
exiled_by="user",
)
db.add(exiled)
# Log usage
usage = ConsumableUsage(
user_id=user.id,
item_id=item.id,
marathon_id=marathon.id,
assignment_id=assignment.id,
effect_data={
"type": "skip_exile",
"skipped_without_penalty": True,
"game_exiled": True,
"game_id": game_id,
},
)
db.add(usage)
return {
"success": True,
"skipped": True,
"exiled": True,
"game_id": game_id,
"penalty": 0,
"streak_preserved": True,
}
async def use_boost(
self,
db: AsyncSession,
user: User,
participant: Participant,
marathon: Marathon,
) -> dict:
"""
Activate a Boost - multiplies points for current assignment on complete.
- Points for completed challenge are multiplied by BOOST_MULTIPLIER
- One-time use (consumed on complete)
Returns: dict with result info
Raises:
HTTPException: If consumables not allowed or boost already active
"""
if not marathon.allow_consumables:
raise HTTPException(status_code=400, detail="Consumables are not allowed in this marathon")
if participant.has_active_boost:
raise HTTPException(status_code=400, detail="Boost is already activated")
# Consume boost from inventory
item = await self._consume_item(db, user, ConsumableType.BOOST.value)
# Activate boost (one-time use)
participant.has_active_boost = True
# Log usage
usage = ConsumableUsage(
user_id=user.id,
item_id=item.id,
marathon_id=marathon.id,
effect_data={
"type": "boost",
"multiplier": self.BOOST_MULTIPLIER,
"one_time": True,
},
)
db.add(usage)
return {
"success": True,
"boost_activated": True,
"multiplier": self.BOOST_MULTIPLIER,
}
async def use_wild_card(
self,
db: AsyncSession,
user: User,
participant: Participant,
marathon: Marathon,
assignment: Assignment,
game_id: int,
) -> dict:
"""
Use Wild Card - choose a game and switch to it.
For challenges game type:
- New challenge is randomly selected from the chosen game
- Assignment becomes a regular challenge
For playthrough game type:
- Assignment becomes a playthrough of the chosen game
- Bonus assignments are created from game's challenges
Returns: dict with new assignment info
Raises:
HTTPException: If game not in marathon or no challenges available
"""
if not marathon.allow_consumables:
raise HTTPException(status_code=400, detail="Consumables are not allowed in this marathon")
if assignment.status != AssignmentStatus.ACTIVE.value:
raise HTTPException(status_code=400, detail="Can only use wild card on active assignments")
# Verify game is in this marathon and load challenges
result = await db.execute(
select(Game)
.options(selectinload(Game.challenges))
.where(
Game.id == game_id,
Game.marathon_id == marathon.id,
)
)
game = result.scalar_one_or_none()
if not game:
raise HTTPException(status_code=400, detail="Game not found in this marathon")
# Store old assignment info for logging
old_game_id = assignment.game_id
old_challenge_id = assignment.challenge_id
old_is_playthrough = assignment.is_playthrough
# Consume wild card from inventory
item = await self._consume_item(db, user, ConsumableType.WILD_CARD.value)
# Delete existing bonus assignments if any
if assignment.bonus_assignments:
for ba in assignment.bonus_assignments:
await db.delete(ba)
new_challenge_id = None
new_challenge_title = None
if game.game_type == GameType.PLAYTHROUGH.value:
# Switch to playthrough mode
assignment.game_id = game_id
assignment.challenge_id = None
assignment.is_playthrough = True
# Create bonus assignments from game's challenges
for ch in game.challenges:
bonus = BonusAssignment(
main_assignment_id=assignment.id,
challenge_id=ch.id,
)
db.add(bonus)
else:
# Switch to challenge mode - get random challenge
if not game.challenges:
raise HTTPException(status_code=400, detail="No challenges available for this game")
new_challenge = random.choice(game.challenges)
new_challenge_id = new_challenge.id
new_challenge_title = new_challenge.title
assignment.game_id = game_id
assignment.challenge_id = new_challenge_id
assignment.is_playthrough = False
# Reset timestamps since it's a new assignment
assignment.started_at = datetime.utcnow()
assignment.deadline = None
# Log usage
usage = ConsumableUsage(
user_id=user.id,
item_id=item.id,
marathon_id=marathon.id,
assignment_id=assignment.id,
effect_data={
"type": "wild_card",
"old_game_id": old_game_id,
"old_challenge_id": old_challenge_id,
"old_is_playthrough": old_is_playthrough,
"new_game_id": game_id,
"new_challenge_id": new_challenge_id,
"new_is_playthrough": game.game_type == GameType.PLAYTHROUGH.value,
},
)
db.add(usage)
return {
"success": True,
"game_id": game_id,
"game_name": game.title,
"game_type": game.game_type,
"is_playthrough": game.game_type == GameType.PLAYTHROUGH.value,
"challenge_id": new_challenge_id,
"challenge_title": new_challenge_title,
}
async def use_lucky_dice(
self,
db: AsyncSession,
user: User,
participant: Participant,
marathon: Marathon,
) -> dict:
"""
Use Lucky Dice - get a random multiplier for current assignment.
- Random multiplier from [0.5, 1.0, 1.5, 2.0, 2.5, 3.0]
- Applied on next complete (stacks with boost if both active)
- One-time use
Returns: dict with rolled multiplier
Raises:
HTTPException: If consumables not allowed or lucky dice already active
"""
if not marathon.allow_consumables:
raise HTTPException(status_code=400, detail="Consumables are not allowed in this marathon")
if participant.has_lucky_dice:
raise HTTPException(status_code=400, detail="Lucky Dice is already active")
# Consume lucky dice from inventory
item = await self._consume_item(db, user, ConsumableType.LUCKY_DICE.value)
# Roll the dice
multiplier = random.choice(self.LUCKY_DICE_MULTIPLIERS)
# Activate lucky dice
participant.has_lucky_dice = True
participant.lucky_dice_multiplier = multiplier
# Log usage
usage = ConsumableUsage(
user_id=user.id,
item_id=item.id,
marathon_id=marathon.id,
effect_data={
"type": "lucky_dice",
"multiplier": multiplier,
},
)
db.add(usage)
return {
"success": True,
"lucky_dice_activated": True,
"multiplier": multiplier,
}
async def use_copycat(
self,
db: AsyncSession,
user: User,
participant: Participant,
marathon: Marathon,
assignment: Assignment,
target_participant_id: int,
) -> dict:
"""
Use Copycat - copy another participant's assignment.
- Current assignment is replaced with target's current/last assignment
- Can copy even if target already completed theirs
- Cannot copy your own assignment
Returns: dict with copied assignment info
Raises:
HTTPException: If target not found or no assignment to copy
"""
if not marathon.allow_consumables:
raise HTTPException(status_code=400, detail="Consumables are not allowed in this marathon")
if assignment.status != AssignmentStatus.ACTIVE.value:
raise HTTPException(status_code=400, detail="Can only use copycat on active assignments")
if target_participant_id == participant.id:
raise HTTPException(status_code=400, detail="Cannot copy your own assignment")
# Find target participant
result = await db.execute(
select(Participant)
.where(
Participant.id == target_participant_id,
Participant.marathon_id == marathon.id,
)
)
target_participant = result.scalar_one_or_none()
if not target_participant:
raise HTTPException(status_code=400, detail="Target participant not found")
# Get target's most recent assignment (active or completed)
result = await db.execute(
select(Assignment)
.options(
selectinload(Assignment.challenge),
selectinload(Assignment.game).selectinload(Game.challenges),
)
.where(
Assignment.participant_id == target_participant_id,
Assignment.status.in_([
AssignmentStatus.ACTIVE.value,
AssignmentStatus.COMPLETED.value
])
)
.order_by(Assignment.started_at.desc())
.limit(1)
)
target_assignment = result.scalar_one_or_none()
if not target_assignment:
raise HTTPException(status_code=400, detail="Target has no assignment to copy")
# Consume copycat from inventory
item = await self._consume_item(db, user, ConsumableType.COPYCAT.value)
# Store old assignment info for logging
old_game_id = assignment.game_id
old_challenge_id = assignment.challenge_id
old_is_playthrough = assignment.is_playthrough
# Copy the assignment - handle both challenge and playthrough
assignment.game_id = target_assignment.game_id
assignment.challenge_id = target_assignment.challenge_id
assignment.is_playthrough = target_assignment.is_playthrough
# Reset timestamps
assignment.started_at = datetime.utcnow()
assignment.deadline = None
# If copying a playthrough, recreate bonus assignments
if target_assignment.is_playthrough:
# Delete existing bonus assignments
for ba in assignment.bonus_assignments:
await db.delete(ba)
# Create new bonus assignments from target game's challenges
if target_assignment.game and target_assignment.game.challenges:
for ch in target_assignment.game.challenges:
bonus = BonusAssignment(
main_assignment_id=assignment.id,
challenge_id=ch.id,
)
db.add(bonus)
# Log usage
usage = ConsumableUsage(
user_id=user.id,
item_id=item.id,
marathon_id=marathon.id,
assignment_id=assignment.id,
effect_data={
"type": "copycat",
"old_challenge_id": old_challenge_id,
"old_game_id": old_game_id,
"old_is_playthrough": old_is_playthrough,
"copied_from_participant_id": target_participant_id,
"new_challenge_id": target_assignment.challenge_id,
"new_game_id": target_assignment.game_id,
"new_is_playthrough": target_assignment.is_playthrough,
},
)
db.add(usage)
# Prepare response
if target_assignment.is_playthrough:
title = f"Прохождение: {target_assignment.game.title}" if target_assignment.game else "Прохождение"
else:
title = target_assignment.challenge.title if target_assignment.challenge else None
return {
"success": True,
"copied": True,
"game_id": target_assignment.game_id,
"challenge_id": target_assignment.challenge_id,
"is_playthrough": target_assignment.is_playthrough,
"challenge_title": title,
}
async def use_undo(
self,
db: AsyncSession,
user: User,
participant: Participant,
marathon: Marathon,
) -> dict:
"""
Use Undo - restore points and streak from last drop.
- Only works if there was a drop in this marathon
- Can only undo once per drop
- Restores both points and streak
Returns: dict with restored values
Raises:
HTTPException: If no drop to undo
"""
if not marathon.allow_consumables:
raise HTTPException(status_code=400, detail="Consumables are not allowed in this marathon")
if not participant.can_undo:
raise HTTPException(status_code=400, detail="No drop to undo")
if participant.last_drop_points is None or participant.last_drop_streak_before is None:
raise HTTPException(status_code=400, detail="No drop data to restore")
# Consume undo from inventory
item = await self._consume_item(db, user, ConsumableType.UNDO.value)
# Store values for logging
points_restored = participant.last_drop_points
streak_restored = participant.last_drop_streak_before
current_points = participant.total_points
current_streak = participant.current_streak
# Restore points and streak
participant.total_points += points_restored
participant.current_streak = streak_restored
participant.drop_count = max(0, participant.drop_count - 1)
# Clear undo data
participant.can_undo = False
participant.last_drop_points = None
participant.last_drop_streak_before = None
# Log usage
usage = ConsumableUsage(
user_id=user.id,
item_id=item.id,
marathon_id=marathon.id,
effect_data={
"type": "undo",
"points_restored": points_restored,
"streak_restored_to": streak_restored,
"points_before": current_points,
"streak_before": current_streak,
},
)
db.add(usage)
return {
"success": True,
"undone": True,
"points_restored": points_restored,
"streak_restored": streak_restored,
"new_total_points": participant.total_points,
"new_streak": participant.current_streak,
}
async def _consume_item(
self,
db: AsyncSession,
user: User,
item_code: str,
) -> ShopItem:
"""
Consume 1 unit of a consumable from user's inventory.
Returns: The consumed ShopItem
Raises:
HTTPException: If user doesn't have the item
"""
result = await db.execute(
select(UserInventory)
.options(selectinload(UserInventory.item))
.join(ShopItem)
.where(
UserInventory.user_id == user.id,
ShopItem.code == item_code,
UserInventory.quantity > 0,
)
)
inv_item = result.scalar_one_or_none()
if not inv_item:
raise HTTPException(
status_code=400,
detail=f"You don't have any {item_code} in your inventory"
)
# Decrease quantity
inv_item.quantity -= 1
return inv_item.item
async def get_consumable_count(
self,
db: AsyncSession,
user_id: int,
item_code: str,
) -> int:
"""Get how many of a consumable user has"""
result = await db.execute(
select(UserInventory.quantity)
.join(ShopItem)
.where(
UserInventory.user_id == user_id,
ShopItem.code == item_code,
)
)
quantity = result.scalar_one_or_none()
return quantity or 0
def consume_boost_on_complete(self, participant: Participant) -> float:
"""
Consume boost when completing assignment (called from wheel.py).
One-time use - boost is consumed after single complete.
Returns: Multiplier value (BOOST_MULTIPLIER if boost was active, 1.0 otherwise)
"""
if participant.has_active_boost:
participant.has_active_boost = False
return self.BOOST_MULTIPLIER
return 1.0
def consume_lucky_dice_on_complete(self, participant: Participant) -> float:
"""
Consume lucky dice when completing assignment (called from wheel.py).
One-time use - consumed after single complete.
Returns: Multiplier value (rolled multiplier if active, 1.0 otherwise)
"""
if participant.has_lucky_dice and participant.lucky_dice_multiplier is not None:
multiplier = participant.lucky_dice_multiplier
participant.has_lucky_dice = False
participant.lucky_dice_multiplier = None
return multiplier
return 1.0
def save_drop_for_undo(
self,
participant: Participant,
points_lost: int,
streak_before: int,
) -> None:
"""
Save drop data for potential undo (called from wheel.py before dropping).
"""
participant.last_drop_points = points_lost
participant.last_drop_streak_before = streak_before
participant.can_undo = True
# Singleton instance
consumables_service = ConsumablesService()