import logging from typing import List import httpx from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.models import User, Participant, Marathon logger = logging.getLogger(__name__) class TelegramNotifier: """Service for sending Telegram notifications.""" def __init__(self): self.bot_token = settings.TELEGRAM_BOT_TOKEN self.api_url = f"https://api.telegram.org/bot{self.bot_token}" async def send_message( self, chat_id: int, text: str, parse_mode: str = "HTML", reply_markup: dict | None = None ) -> bool: """Send a message to a Telegram chat.""" if not self.bot_token: logger.warning("Telegram bot token not configured") return False try: async with httpx.AsyncClient() as client: payload = { "chat_id": chat_id, "text": text, "parse_mode": parse_mode } if reply_markup: payload["reply_markup"] = reply_markup response = await client.post( f"{self.api_url}/sendMessage", json=payload, timeout=10.0 ) if response.status_code == 200: return True else: logger.error(f"Failed to send message: {response.text}") return False except Exception as e: logger.error(f"Error sending Telegram message: {e}") return False async def send_photo( self, chat_id: int, photo: bytes, caption: str | None = None, parse_mode: str = "HTML", filename: str = "photo.jpg", content_type: str = "image/jpeg" ) -> bool: """Send a photo to a Telegram chat.""" if not self.bot_token: logger.warning("Telegram bot token not configured") return False try: timeout = httpx.Timeout(connect=30.0, read=60.0, write=120.0, pool=30.0) async with httpx.AsyncClient(timeout=timeout) as client: data = {"chat_id": str(chat_id)} if caption: data["caption"] = caption data["parse_mode"] = parse_mode files = {"photo": (filename, photo, content_type)} response = await client.post( f"{self.api_url}/sendPhoto", data=data, files=files, ) if response.status_code == 200: return True else: logger.error(f"Failed to send photo to {chat_id}: {response.status_code} - {response.text}") return False except Exception as e: logger.error(f"Error sending Telegram photo to {chat_id}: {type(e).__name__}: {e}") return False async def send_video( self, chat_id: int, video: bytes, caption: str | None = None, parse_mode: str = "HTML", filename: str = "video.mp4", content_type: str = "video/mp4" ) -> bool: """Send a video to a Telegram chat.""" if not self.bot_token: logger.warning("Telegram bot token not configured") return False try: timeout = httpx.Timeout(connect=30.0, read=120.0, write=300.0, pool=30.0) async with httpx.AsyncClient(timeout=timeout) as client: data = {"chat_id": str(chat_id)} if caption: data["caption"] = caption data["parse_mode"] = parse_mode files = {"video": (filename, video, content_type)} response = await client.post( f"{self.api_url}/sendVideo", data=data, files=files, ) if response.status_code == 200: return True else: logger.error(f"Failed to send video to {chat_id}: {response.status_code} - {response.text}") return False except Exception as e: logger.error(f"Error sending Telegram video to {chat_id}: {type(e).__name__}: {e}") return False async def send_media_group( self, chat_id: int, media_items: list[dict], caption: str | None = None, parse_mode: str = "HTML" ) -> bool: """ Send a media group (multiple photos/videos) to a Telegram chat. media_items: list of dicts with keys: - type: "photo" or "video" - data: bytes - filename: str - content_type: str """ if not self.bot_token: logger.warning("Telegram bot token not configured") return False if not media_items: return False try: import json # Use longer timeouts for file uploads timeout = httpx.Timeout( connect=30.0, read=120.0, write=300.0, # 5 minutes for uploading files pool=30.0 ) async with httpx.AsyncClient(timeout=timeout) as client: # Build media array and files dict media_array = [] files_dict = {} for i, item in enumerate(media_items): attach_name = f"media{i}" media_obj = { "type": item["type"], "media": f"attach://{attach_name}" } # Only first item gets the caption if i == 0 and caption: media_obj["caption"] = caption media_obj["parse_mode"] = parse_mode media_array.append(media_obj) files_dict[attach_name] = ( item.get("filename", f"file{i}"), item["data"], item.get("content_type", "application/octet-stream") ) data = { "chat_id": str(chat_id), "media": json.dumps(media_array) } logger.info(f"Sending media group to {chat_id}: {len(media_items)} files") response = await client.post( f"{self.api_url}/sendMediaGroup", data=data, files=files_dict, ) if response.status_code == 200: logger.info(f"Successfully sent media group to {chat_id}") return True else: logger.error(f"Failed to send media group to {chat_id}: {response.status_code} - {response.text}") return False except Exception as e: logger.error(f"Error sending Telegram media group to {chat_id}: {type(e).__name__}: {e}") return False async def send_media_message( self, chat_id: int, text: str | None = None, media_type: str | None = None, media_data: bytes | None = None, media_items: list[dict] | None = None, parse_mode: str = "HTML" ) -> bool: """ Send a message with optional media. For single media: use media_type and media_data For multiple media: use media_items list with dicts containing: - type: "photo" or "video" - data: bytes - filename: str (optional) - content_type: str (optional) """ # Multiple media - use media group if media_items and len(media_items) > 1: return await self.send_media_group(chat_id, media_items, text, parse_mode) # Single media from media_items if media_items and len(media_items) == 1: item = media_items[0] if item["type"] == "photo": return await self.send_photo( chat_id, item["data"], text, parse_mode, item.get("filename", "photo.jpg"), item.get("content_type", "image/jpeg") ) elif item["type"] == "video": return await self.send_video( chat_id, item["data"], text, parse_mode, item.get("filename", "video.mp4"), item.get("content_type", "video/mp4") ) # Legacy single media support if media_data and media_type: if media_type == "photo": return await self.send_photo(chat_id, media_data, text, parse_mode) elif media_type == "video": return await self.send_video(chat_id, media_data, text, parse_mode) if text: return await self.send_message(chat_id, text, parse_mode) return False async def notify_user( self, db: AsyncSession, user_id: int, message: str, reply_markup: dict | None = None ) -> bool: """Send notification to a user by user_id.""" result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: logger.warning(f"[Notify] User {user_id} not found") return False if not user.telegram_id: logger.warning(f"[Notify] User {user_id} ({user.nickname}) has no telegram_id") return False logger.info(f"[Notify] Sending to user {user.nickname} (telegram_id={user.telegram_id})") return await self.send_message(user.telegram_id, message, reply_markup=reply_markup) async def notify_marathon_participants( self, db: AsyncSession, marathon_id: int, message: str, exclude_user_id: int | None = None, check_setting: str | None = None ) -> int: """Send notification to all marathon participants with linked Telegram. Args: check_setting: If provided, only send to users with this setting enabled. Options: 'notify_events', 'notify_disputes', 'notify_moderation' """ result = await db.execute( select(User) .join(Participant, Participant.user_id == User.id) .where( Participant.marathon_id == marathon_id, User.telegram_id.isnot(None) ) ) users = result.scalars().all() sent_count = 0 for user in users: if exclude_user_id and user.id == exclude_user_id: continue # Check notification setting if specified if check_setting and not getattr(user, check_setting, True): logger.info(f"[Notify] Skipping user {user.nickname} - {check_setting} is disabled") continue if await self.send_message(user.telegram_id, message): sent_count += 1 return sent_count # Notification templates async def notify_event_start( self, db: AsyncSession, marathon_id: int, event_type: str, marathon_title: str ) -> int: """Notify participants about event start (respects notify_events setting).""" event_messages = { "golden_hour": f"🌟 Начался Golden Hour в «{marathon_title}»!\n\nВсе очки x1.5 в течение часа!", "jackpot": f"🎰 JACKPOT в «{marathon_title}»!\n\nОчки x3 за следующий сложный челлендж!", "double_risk": f"⚡ Double Risk в «{marathon_title}»!\n\nПоловина очков, но дропы бесплатны!", "common_enemy": f"👥 Common Enemy в «{marathon_title}»!\n\nВсе получают одинаковый челлендж. Первые 3 — бонус!", "swap": f"🔄 Swap в «{marathon_title}»!\n\nМожно поменяться заданием с другим участником!", "game_choice": f"🎲 Выбор игры в «{marathon_title}»!\n\nВыбери игру и один из 3 челленджей!" } message = event_messages.get( event_type, f"📌 Новое событие в «{marathon_title}»!" ) return await self.notify_marathon_participants( db, marathon_id, message, check_setting='notify_events' ) async def notify_event_end( self, db: AsyncSession, marathon_id: int, event_type: str, marathon_title: str ) -> int: """Notify participants about event end (respects notify_events setting).""" event_names = { "golden_hour": "Golden Hour", "jackpot": "Jackpot", "double_risk": "Double Risk", "common_enemy": "Common Enemy", "swap": "Swap", "game_choice": "Выбор игры" } event_name = event_names.get(event_type, "Событие") message = f"⏰ {event_name} в «{marathon_title}» завершён" return await self.notify_marathon_participants( db, marathon_id, message, check_setting='notify_events' ) async def notify_marathon_start( self, db: AsyncSession, marathon_id: int, marathon_title: str ) -> int: """Notify participants about marathon start.""" message = ( f"🚀 Марафон «{marathon_title}» начался!\n\n" f"Время крутить колесо и получить первое задание!" ) return await self.notify_marathon_participants(db, marathon_id, message) async def notify_marathon_finish( self, db: AsyncSession, marathon_id: int, marathon_title: str ) -> int: """Notify participants about marathon finish.""" message = ( f"🏆 Марафон «{marathon_title}» завершён!\n\n" f"Зайди на сайт, чтобы увидеть итоговую таблицу!" ) return await self.notify_marathon_participants(db, marathon_id, message) async def notify_dispute_raised( self, db: AsyncSession, user_id: int, marathon_title: str, challenge_title: str, assignment_id: int ) -> bool: """Notify user about dispute raised on their assignment (respects notify_disputes setting).""" # Check user's notification settings result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if user and not user.notify_disputes: logger.info(f"[Dispute] Skipping user {user.nickname} - notify_disputes is disabled") return False logger.info(f"[Dispute] Sending notification to user_id={user_id} for assignment_id={assignment_id}") dispute_url = f"{settings.FRONTEND_URL}/assignments/{assignment_id}" logger.info(f"[Dispute] URL: {dispute_url}") # Telegram requires HTTPS for inline keyboard URLs use_inline_button = dispute_url.startswith("https://") if use_inline_button: message = ( f"⚠️ На твоё задание подан спор\n\n" f"Марафон: {marathon_title}\n" f"Задание: {challenge_title}" ) reply_markup = { "inline_keyboard": [[ {"text": "Открыть спор", "url": dispute_url} ]] } else: message = ( f"⚠️ На твоё задание подан спор\n\n" f"Марафон: {marathon_title}\n" f"Задание: {challenge_title}\n\n" f"🔗 {dispute_url}" ) reply_markup = None result = await self.notify_user(db, user_id, message, reply_markup=reply_markup) logger.info(f"[Dispute] Notification result: {result}") return result async def notify_dispute_resolved( self, db: AsyncSession, user_id: int, marathon_title: str, challenge_title: str, is_valid: bool ) -> bool: """Notify user about dispute resolution (respects notify_disputes setting).""" # Check user's notification settings result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if user and not user.notify_disputes: logger.info(f"[Dispute] Skipping user {user.nickname} - notify_disputes is disabled") return False if is_valid: message = ( f"❌ Спор признан обоснованным\n\n" f"Марафон: {marathon_title}\n" f"Задание: {challenge_title}\n\n" f"Задание возвращено. Выполни его заново." ) else: message = ( f"✅ Спор отклонён\n\n" f"Марафон: {marathon_title}\n" f"Задание: {challenge_title}\n\n" f"Твоё выполнение засчитано!" ) return await self.notify_user(db, user_id, message) async def notify_game_approved( self, db: AsyncSession, user_id: int, marathon_title: str, game_title: str ) -> bool: """Notify user that their proposed game was approved (respects notify_moderation setting).""" # Check user's notification settings result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if user and not user.notify_moderation: logger.info(f"[Moderation] Skipping user {user.nickname} - notify_moderation is disabled") return False message = ( f"✅ Твоя игра одобрена!\n\n" f"Марафон: {marathon_title}\n" f"Игра: {game_title}\n\n" f"Теперь она доступна для всех участников." ) return await self.notify_user(db, user_id, message) async def notify_game_rejected( self, db: AsyncSession, user_id: int, marathon_title: str, game_title: str ) -> bool: """Notify user that their proposed game was rejected (respects notify_moderation setting).""" # Check user's notification settings result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if user and not user.notify_moderation: logger.info(f"[Moderation] Skipping user {user.nickname} - notify_moderation is disabled") return False message = ( f"❌ Твоя игра отклонена\n\n" f"Марафон: {marathon_title}\n" f"Игра: {game_title}\n\n" f"Ты можешь предложить другую игру." ) return await self.notify_user(db, user_id, message) async def notify_challenge_approved( self, db: AsyncSession, user_id: int, marathon_title: str, game_title: str, challenge_title: str ) -> bool: """Notify user that their proposed challenge was approved (respects notify_moderation setting).""" # Check user's notification settings result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if user and not user.notify_moderation: logger.info(f"[Moderation] Skipping user {user.nickname} - notify_moderation is disabled") return False message = ( f"✅ Твой челлендж одобрен!\n\n" f"Марафон: {marathon_title}\n" f"Игра: {game_title}\n" f"Задание: {challenge_title}\n\n" f"Теперь оно доступно для всех участников." ) return await self.notify_user(db, user_id, message) async def notify_challenge_rejected( self, db: AsyncSession, user_id: int, marathon_title: str, game_title: str, challenge_title: str ) -> bool: """Notify user that their proposed challenge was rejected (respects notify_moderation setting).""" # Check user's notification settings result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if user and not user.notify_moderation: logger.info(f"[Moderation] Skipping user {user.nickname} - notify_moderation is disabled") return False message = ( f"❌ Твой челлендж отклонён\n\n" f"Марафон: {marathon_title}\n" f"Игра: {game_title}\n" f"Задание: {challenge_title}\n\n" f"Ты можешь предложить другой челлендж." ) return await self.notify_user(db, user_id, message) async def notify_admin_disputes_pending( self, db: AsyncSession, count: int ) -> bool: """Notify admin about disputes waiting for decision.""" if not settings.TELEGRAM_ADMIN_ID: logger.warning("[Notify] No TELEGRAM_ADMIN_ID configured") return False admin_url = f"{settings.FRONTEND_URL}/admin/disputes" use_inline_button = admin_url.startswith("https://") if use_inline_button: message = ( f"⚠️ {count} оспаривани{'е' if count == 1 else 'й'} ожида{'ет' if count == 1 else 'ют'} решения\n\n" f"Голосование завершено, требуется ваше решение." ) reply_markup = { "inline_keyboard": [[ {"text": "Открыть оспаривания", "url": admin_url} ]] } else: message = ( f"⚠️ {count} оспаривани{'е' if count == 1 else 'й'} ожида{'ет' if count == 1 else 'ют'} решения\n\n" f"Голосование завершено, требуется ваше решение.\n\n" f"🔗 {admin_url}" ) reply_markup = None return await self.send_message( int(settings.TELEGRAM_ADMIN_ID), message, reply_markup=reply_markup ) # Global instance telegram_notifier = TelegramNotifier()