"""Telegram alerting for status changes.""" import os from datetime import datetime from typing import Optional import httpx TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "") TELEGRAM_ADMIN_ID = os.getenv("TELEGRAM_ADMIN_ID", "") async def send_telegram_alert(message: str, is_recovery: bool = False) -> bool: """Send alert to Telegram.""" if not TELEGRAM_BOT_TOKEN or not TELEGRAM_ADMIN_ID: print("Telegram alerting not configured") return False emoji = "\u2705" if is_recovery else "\u26a0\ufe0f" text = f"{emoji} *Status Alert*\n\n{message}" url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" data = { "chat_id": TELEGRAM_ADMIN_ID, "text": text, "parse_mode": "Markdown", } try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.post(url, json=data) response.raise_for_status() print(f"Telegram alert sent: {message[:50]}...") return True except Exception as e: print(f"Failed to send Telegram alert: {e}") return False async def alert_service_down(service_name: str, display_name: str, message: Optional[str]): """Alert when service goes down.""" now = datetime.now().strftime("%d.%m.%Y %H:%M:%S") text = ( f"*{display_name}* is DOWN\n\n" f"Time: `{now}`\n" ) if message: text += f"Error: `{message}`" await send_telegram_alert(text, is_recovery=False) async def alert_service_recovered(service_name: str, display_name: str, downtime_minutes: int): """Alert when service recovers.""" now = datetime.now().strftime("%d.%m.%Y %H:%M:%S") text = ( f"*{display_name}* is back ONLINE\n\n" f"Time: `{now}`\n" f"Downtime: `{downtime_minutes} min`" ) await send_telegram_alert(text, is_recovery=True) async def alert_ssl_expiring(domain: str, days_left: int): """Alert when SSL certificate is expiring soon.""" text = ( f"*SSL Certificate Expiring*\n\n" f"Domain: `{domain}`\n" f"Days left: `{days_left}`\n\n" f"Please renew the certificate!" ) await send_telegram_alert(text, is_recovery=False) async def alert_ssl_expired(domain: str): """Alert when SSL certificate has expired.""" text = ( f"*SSL Certificate EXPIRED*\n\n" f"Domain: `{domain}`\n\n" f"Certificate has expired! Site may show security warnings." ) await send_telegram_alert(text, is_recovery=False)