Files
game-marathon/backend/app/services/telegram_notifier.py
mamonov.ep f78eacb1a5 Добавлен Skip with Exile, модерация марафонов и выдача предметов
## Skip with Exile (новый расходник)
- Новая модель ExiledGame для хранения изгнанных игр
- Расходник skip_exile: пропуск без штрафа + игра исключается из пула навсегда
- Фильтрация изгнанных игр при выдаче заданий
- UI кнопка в PlayPage для использования skip_exile

## Модерация марафонов (для организаторов)
- Эндпоинты: skip-assignment, exiled-games, restore-exiled-game
- UI в LeaderboardPage: кнопка скипа у каждого участника
- Выбор типа скипа (обычный/с изгнанием) + причина
- Telegram уведомления о модерации

## Админская выдача предметов
- Эндпоинты: admin grant/remove items, get user inventory
- Новая страница AdminGrantItemPage (как магазин)
- Telegram уведомление при получении подарка

## Исправления миграций
- Миграции 029/030 теперь идемпотентны (проверка существования таблиц)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-10 23:02:37 +03:00

665 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
from typing import List
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.models import User, Participant, Marathon
logger = logging.getLogger(__name__)
class TelegramNotifier:
"""Service for sending Telegram notifications."""
def __init__(self):
self.bot_token = settings.TELEGRAM_BOT_TOKEN
self.api_url = f"https://api.telegram.org/bot{self.bot_token}"
async def send_message(
self,
chat_id: int,
text: str,
parse_mode: str = "HTML",
reply_markup: dict | None = None
) -> bool:
"""Send a message to a Telegram chat."""
if not self.bot_token:
logger.warning("Telegram bot token not configured")
return False
try:
async with httpx.AsyncClient() as client:
payload = {
"chat_id": chat_id,
"text": text,
"parse_mode": parse_mode
}
if reply_markup:
payload["reply_markup"] = reply_markup
response = await client.post(
f"{self.api_url}/sendMessage",
json=payload,
timeout=10.0
)
if response.status_code == 200:
return True
else:
logger.error(f"Failed to send message: {response.text}")
return False
except Exception as e:
logger.error(f"Error sending Telegram message: {e}")
return False
async def send_photo(
self,
chat_id: int,
photo: bytes,
caption: str | None = None,
parse_mode: str = "HTML",
filename: str = "photo.jpg",
content_type: str = "image/jpeg"
) -> bool:
"""Send a photo to a Telegram chat."""
if not self.bot_token:
logger.warning("Telegram bot token not configured")
return False
try:
timeout = httpx.Timeout(connect=30.0, read=60.0, write=120.0, pool=30.0)
async with httpx.AsyncClient(timeout=timeout) as client:
data = {"chat_id": str(chat_id)}
if caption:
data["caption"] = caption
data["parse_mode"] = parse_mode
files = {"photo": (filename, photo, content_type)}
response = await client.post(
f"{self.api_url}/sendPhoto",
data=data,
files=files,
)
if response.status_code == 200:
return True
else:
logger.error(f"Failed to send photo to {chat_id}: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"Error sending Telegram photo to {chat_id}: {type(e).__name__}: {e}")
return False
async def send_video(
self,
chat_id: int,
video: bytes,
caption: str | None = None,
parse_mode: str = "HTML",
filename: str = "video.mp4",
content_type: str = "video/mp4"
) -> bool:
"""Send a video to a Telegram chat."""
if not self.bot_token:
logger.warning("Telegram bot token not configured")
return False
try:
timeout = httpx.Timeout(connect=30.0, read=120.0, write=300.0, pool=30.0)
async with httpx.AsyncClient(timeout=timeout) as client:
data = {"chat_id": str(chat_id)}
if caption:
data["caption"] = caption
data["parse_mode"] = parse_mode
files = {"video": (filename, video, content_type)}
response = await client.post(
f"{self.api_url}/sendVideo",
data=data,
files=files,
)
if response.status_code == 200:
return True
else:
logger.error(f"Failed to send video to {chat_id}: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"Error sending Telegram video to {chat_id}: {type(e).__name__}: {e}")
return False
async def send_media_group(
self,
chat_id: int,
media_items: list[dict],
caption: str | None = None,
parse_mode: str = "HTML"
) -> bool:
"""
Send a media group (multiple photos/videos) to a Telegram chat.
media_items: list of dicts with keys:
- type: "photo" or "video"
- data: bytes
- filename: str
- content_type: str
"""
if not self.bot_token:
logger.warning("Telegram bot token not configured")
return False
if not media_items:
return False
try:
import json
# Use longer timeouts for file uploads
timeout = httpx.Timeout(
connect=30.0,
read=120.0,
write=300.0, # 5 minutes for uploading files
pool=30.0
)
async with httpx.AsyncClient(timeout=timeout) as client:
# Build media array and files dict
media_array = []
files_dict = {}
for i, item in enumerate(media_items):
attach_name = f"media{i}"
media_obj = {
"type": item["type"],
"media": f"attach://{attach_name}"
}
# Only first item gets the caption
if i == 0 and caption:
media_obj["caption"] = caption
media_obj["parse_mode"] = parse_mode
media_array.append(media_obj)
files_dict[attach_name] = (
item.get("filename", f"file{i}"),
item["data"],
item.get("content_type", "application/octet-stream")
)
data = {
"chat_id": str(chat_id),
"media": json.dumps(media_array)
}
logger.info(f"Sending media group to {chat_id}: {len(media_items)} files")
response = await client.post(
f"{self.api_url}/sendMediaGroup",
data=data,
files=files_dict,
)
if response.status_code == 200:
logger.info(f"Successfully sent media group to {chat_id}")
return True
else:
logger.error(f"Failed to send media group to {chat_id}: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"Error sending Telegram media group to {chat_id}: {type(e).__name__}: {e}")
return False
async def send_media_message(
self,
chat_id: int,
text: str | None = None,
media_type: str | None = None,
media_data: bytes | None = None,
media_items: list[dict] | None = None,
parse_mode: str = "HTML"
) -> bool:
"""
Send a message with optional media.
For single media: use media_type and media_data
For multiple media: use media_items list with dicts containing:
- type: "photo" or "video"
- data: bytes
- filename: str (optional)
- content_type: str (optional)
"""
# Multiple media - use media group
if media_items and len(media_items) > 1:
return await self.send_media_group(chat_id, media_items, text, parse_mode)
# Single media from media_items
if media_items and len(media_items) == 1:
item = media_items[0]
if item["type"] == "photo":
return await self.send_photo(
chat_id, item["data"], text, parse_mode,
item.get("filename", "photo.jpg"),
item.get("content_type", "image/jpeg")
)
elif item["type"] == "video":
return await self.send_video(
chat_id, item["data"], text, parse_mode,
item.get("filename", "video.mp4"),
item.get("content_type", "video/mp4")
)
# Legacy single media support
if media_data and media_type:
if media_type == "photo":
return await self.send_photo(chat_id, media_data, text, parse_mode)
elif media_type == "video":
return await self.send_video(chat_id, media_data, text, parse_mode)
if text:
return await self.send_message(chat_id, text, parse_mode)
return False
async def notify_user(
self,
db: AsyncSession,
user_id: int,
message: str,
reply_markup: dict | None = None
) -> bool:
"""Send notification to a user by user_id."""
result = await db.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
logger.warning(f"[Notify] User {user_id} not found")
return False
if not user.telegram_id:
logger.warning(f"[Notify] User {user_id} ({user.nickname}) has no telegram_id")
return False
logger.info(f"[Notify] Sending to user {user.nickname} (telegram_id={user.telegram_id})")
return await self.send_message(user.telegram_id, message, reply_markup=reply_markup)
async def notify_marathon_participants(
self,
db: AsyncSession,
marathon_id: int,
message: str,
exclude_user_id: int | None = None,
check_setting: str | None = None
) -> int:
"""Send notification to all marathon participants with linked Telegram.
Args:
check_setting: If provided, only send to users with this setting enabled.
Options: 'notify_events', 'notify_disputes', 'notify_moderation'
"""
result = await db.execute(
select(User)
.join(Participant, Participant.user_id == User.id)
.where(
Participant.marathon_id == marathon_id,
User.telegram_id.isnot(None)
)
)
users = result.scalars().all()
sent_count = 0
for user in users:
if exclude_user_id and user.id == exclude_user_id:
continue
# Check notification setting if specified
if check_setting and not getattr(user, check_setting, True):
logger.info(f"[Notify] Skipping user {user.nickname} - {check_setting} is disabled")
continue
if await self.send_message(user.telegram_id, message):
sent_count += 1
return sent_count
# Notification templates
async def notify_event_start(
self,
db: AsyncSession,
marathon_id: int,
event_type: str,
marathon_title: str
) -> int:
"""Notify participants about event start (respects notify_events setting)."""
event_messages = {
"golden_hour": f"🌟 <b>Начался Golden Hour</b> в «{marathon_title}»!\n\nВсе очки x1.5 в течение часа!",
"jackpot": f"🎰 <b>JACKPOT</b> в «{marathon_title}»!\n\nОчки x3 за следующий сложный челлендж!",
"double_risk": f"⚡ <b>Double Risk</b> в «{marathon_title}»!\n\nПоловина очков, но дропы бесплатны!",
"common_enemy": f"👥 <b>Common Enemy</b> в «{marathon_title}»!\n\nВсе получают одинаковый челлендж. Первые 3 — бонус!",
"swap": f"🔄 <b>Swap</b> в «{marathon_title}»!\n\nМожно поменяться заданием с другим участником!",
"game_choice": f"🎲 <b>Выбор игры</b> в «{marathon_title}»!\n\nВыбери игру и один из 3 челленджей!"
}
message = event_messages.get(
event_type,
f"📌 Новое событие в «{marathon_title}»!"
)
return await self.notify_marathon_participants(
db, marathon_id, message, check_setting='notify_events'
)
async def notify_event_end(
self,
db: AsyncSession,
marathon_id: int,
event_type: str,
marathon_title: str
) -> int:
"""Notify participants about event end (respects notify_events setting)."""
event_names = {
"golden_hour": "Golden Hour",
"jackpot": "Jackpot",
"double_risk": "Double Risk",
"common_enemy": "Common Enemy",
"swap": "Swap",
"game_choice": "Выбор игры"
}
event_name = event_names.get(event_type, "Событие")
message = f"⏰ <b>{event_name}</b> в «{marathon_title}» завершён"
return await self.notify_marathon_participants(
db, marathon_id, message, check_setting='notify_events'
)
async def notify_marathon_start(
self,
db: AsyncSession,
marathon_id: int,
marathon_title: str
) -> int:
"""Notify participants about marathon start."""
message = (
f"🚀 <b>Марафон «{marathon_title}» начался!</b>\n\n"
f"Время крутить колесо и получить первое задание!"
)
return await self.notify_marathon_participants(db, marathon_id, message)
async def notify_marathon_finish(
self,
db: AsyncSession,
marathon_id: int,
marathon_title: str
) -> int:
"""Notify participants about marathon finish."""
message = (
f"🏆 <b>Марафон «{marathon_title}» завершён!</b>\n\n"
f"Зайди на сайт, чтобы увидеть итоговую таблицу!"
)
return await self.notify_marathon_participants(db, marathon_id, message)
async def notify_dispute_raised(
self,
db: AsyncSession,
user_id: int,
marathon_title: str,
challenge_title: str,
assignment_id: int
) -> bool:
"""Notify user about dispute raised on their assignment (respects notify_disputes setting)."""
# Check user's notification settings
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user and not user.notify_disputes:
logger.info(f"[Dispute] Skipping user {user.nickname} - notify_disputes is disabled")
return False
logger.info(f"[Dispute] Sending notification to user_id={user_id} for assignment_id={assignment_id}")
dispute_url = f"{settings.FRONTEND_URL}/assignments/{assignment_id}"
logger.info(f"[Dispute] URL: {dispute_url}")
# Telegram requires HTTPS for inline keyboard URLs
use_inline_button = dispute_url.startswith("https://")
if use_inline_button:
message = (
f"⚠️ <b>На твоё задание подан спор</b>\n\n"
f"Марафон: {marathon_title}\n"
f"Задание: {challenge_title}"
)
reply_markup = {
"inline_keyboard": [[
{"text": "Открыть спор", "url": dispute_url}
]]
}
else:
message = (
f"⚠️ <b>На твоё задание подан спор</b>\n\n"
f"Марафон: {marathon_title}\n"
f"Задание: {challenge_title}\n\n"
f"🔗 {dispute_url}"
)
reply_markup = None
result = await self.notify_user(db, user_id, message, reply_markup=reply_markup)
logger.info(f"[Dispute] Notification result: {result}")
return result
async def notify_dispute_resolved(
self,
db: AsyncSession,
user_id: int,
marathon_title: str,
challenge_title: str,
is_valid: bool
) -> bool:
"""Notify user about dispute resolution (respects notify_disputes setting)."""
# Check user's notification settings
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user and not user.notify_disputes:
logger.info(f"[Dispute] Skipping user {user.nickname} - notify_disputes is disabled")
return False
if is_valid:
message = (
f"❌ <b>Спор признан обоснованным</b>\n\n"
f"Марафон: {marathon_title}\n"
f"Задание: {challenge_title}\n\n"
f"Задание возвращено. Выполни его заново."
)
else:
message = (
f"✅ <b>Спор отклонён</b>\n\n"
f"Марафон: {marathon_title}\n"
f"Задание: {challenge_title}\n\n"
f"Твоё выполнение засчитано!"
)
return await self.notify_user(db, user_id, message)
async def notify_game_approved(
self,
db: AsyncSession,
user_id: int,
marathon_title: str,
game_title: str
) -> bool:
"""Notify user that their proposed game was approved (respects notify_moderation setting)."""
# Check user's notification settings
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user and not user.notify_moderation:
logger.info(f"[Moderation] Skipping user {user.nickname} - notify_moderation is disabled")
return False
message = (
f"✅ <b>Твоя игра одобрена!</b>\n\n"
f"Марафон: {marathon_title}\n"
f"Игра: {game_title}\n\n"
f"Теперь она доступна для всех участников."
)
return await self.notify_user(db, user_id, message)
async def notify_game_rejected(
self,
db: AsyncSession,
user_id: int,
marathon_title: str,
game_title: str
) -> bool:
"""Notify user that their proposed game was rejected (respects notify_moderation setting)."""
# Check user's notification settings
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user and not user.notify_moderation:
logger.info(f"[Moderation] Skipping user {user.nickname} - notify_moderation is disabled")
return False
message = (
f"❌ <b>Твоя игра отклонена</b>\n\n"
f"Марафон: {marathon_title}\n"
f"Игра: {game_title}\n\n"
f"Ты можешь предложить другую игру."
)
return await self.notify_user(db, user_id, message)
async def notify_challenge_approved(
self,
db: AsyncSession,
user_id: int,
marathon_title: str,
game_title: str,
challenge_title: str
) -> bool:
"""Notify user that their proposed challenge was approved (respects notify_moderation setting)."""
# Check user's notification settings
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user and not user.notify_moderation:
logger.info(f"[Moderation] Skipping user {user.nickname} - notify_moderation is disabled")
return False
message = (
f"✅ <b>Твой челлендж одобрен!</b>\n\n"
f"Марафон: {marathon_title}\n"
f"Игра: {game_title}\n"
f"Задание: {challenge_title}\n\n"
f"Теперь оно доступно для всех участников."
)
return await self.notify_user(db, user_id, message)
async def notify_challenge_rejected(
self,
db: AsyncSession,
user_id: int,
marathon_title: str,
game_title: str,
challenge_title: str
) -> bool:
"""Notify user that their proposed challenge was rejected (respects notify_moderation setting)."""
# Check user's notification settings
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user and not user.notify_moderation:
logger.info(f"[Moderation] Skipping user {user.nickname} - notify_moderation is disabled")
return False
message = (
f"❌ <b>Твой челлендж отклонён</b>\n\n"
f"Марафон: {marathon_title}\n"
f"Игра: {game_title}\n"
f"Задание: {challenge_title}\n\n"
f"Ты можешь предложить другой челлендж."
)
return await self.notify_user(db, user_id, message)
async def notify_admin_disputes_pending(
self,
db: AsyncSession,
count: int
) -> bool:
"""Notify admin about disputes waiting for decision."""
if not settings.TELEGRAM_ADMIN_ID:
logger.warning("[Notify] No TELEGRAM_ADMIN_ID configured")
return False
admin_url = f"{settings.FRONTEND_URL}/admin/disputes"
use_inline_button = admin_url.startswith("https://")
if use_inline_button:
message = (
f"⚠️ <b>{count} оспаривани{'е' if count == 1 else 'й'} ожида{'ет' if count == 1 else 'ют'} решения</b>\n\n"
f"Голосование завершено, требуется ваше решение."
)
reply_markup = {
"inline_keyboard": [[
{"text": "Открыть оспаривания", "url": admin_url}
]]
}
else:
message = (
f"⚠️ <b>{count} оспаривани{'е' if count == 1 else 'й'} ожида{'ет' if count == 1 else 'ют'} решения</b>\n\n"
f"Голосование завершено, требуется ваше решение.\n\n"
f"🔗 {admin_url}"
)
reply_markup = None
return await self.send_message(
int(settings.TELEGRAM_ADMIN_ID),
message,
reply_markup=reply_markup
)
async def notify_assignment_skipped_by_moderator(
self,
db,
user,
marathon_title: str,
game_title: str,
exiled: bool,
reason: str | None,
moderator_nickname: str,
) -> bool:
"""Notify participant that their assignment was skipped by organizer"""
if not user.telegram_id or not user.notify_moderation:
return False
exile_text = "\n🚫 Игра исключена из вашего пула" if exiled else ""
reason_text = f"\n📝 Причина: {reason}" if reason else ""
message = (
f"⏭️ <b>Задание пропущено</b>\n\n"
f"Марафон: {marathon_title}\n"
f"Игра: {game_title}\n"
f"Организатор: {moderator_nickname}"
f"{exile_text}"
f"{reason_text}\n\n"
f"Вы можете крутить колесо заново."
)
return await self.send_message(user.telegram_id, message)
async def notify_item_granted(
self,
user,
item_name: str,
quantity: int,
reason: str,
admin_nickname: str,
) -> bool:
"""Notify user that they received an item from admin"""
if not user.telegram_id:
return False
message = (
f"🎁 <b>Вы получили подарок!</b>\n\n"
f"Предмет: {item_name}\n"
f"Количество: {quantity}\n"
f"От: {admin_nickname}\n"
f"Причина: {reason}"
)
return await self.send_message(user.telegram_id, message)
# Global instance
telegram_notifier = TelegramNotifier()