86 lines
2.5 KiB
Python
86 lines
2.5 KiB
Python
"""Telegram alerting for status changes."""
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
|
|
|
|
# Telegram credentials are read from the environment once, at import time.
# An unset variable yields "", which send_telegram_alert treats as
# "alerting not configured".
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
# Sent as the ``chat_id`` of every alert message.
TELEGRAM_ADMIN_ID = os.environ.get("TELEGRAM_ADMIN_ID", "")
|
|
|
|
|
|
async def send_telegram_alert(message: str, is_recovery: bool = False) -> bool:
    """Send a status alert to the configured Telegram chat.

    The message is prefixed with an emoji (check mark for recoveries, warning
    sign otherwise) and a bold "Status Alert" header, then posted to the Bot
    API ``sendMessage`` endpoint with ``parse_mode`` set to ``Markdown``.

    Args:
        message: Alert body. It is sent as-is, so any Markdown markup it
            contains is interpreted by Telegram.
        is_recovery: When true, use the check-mark emoji instead of the
            warning emoji.

    Returns:
        True if the request completed with a success status. False if
        alerting is not configured (token or admin id empty) or the request
        failed for any reason; this function never raises.
    """
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_ADMIN_ID:
        print("Telegram alerting not configured")
        return False

    emoji = "\u2705" if is_recovery else "\u26a0\ufe0f"
    text = f"{emoji} *Status Alert*\n\n{message}"

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    data = {
        "chat_id": TELEGRAM_ADMIN_ID,
        "text": text,
        "parse_mode": "Markdown",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(url, json=data)
            response.raise_for_status()
            print(f"Telegram alert sent: {message[:50]}...")
            return True
    except Exception as e:
        # Alerting is best-effort: a failed notification must never take the
        # caller down, hence the broad catch at this boundary.
        # httpx error messages embed the request URL, and the URL contains the
        # bot token, so redact it before anything reaches stdout/logs.
        reason = str(e).replace(TELEGRAM_BOT_TOKEN, "<redacted>")
        print(f"Failed to send Telegram alert: {reason}")
        return False
|
|
|
|
|
|
async def alert_service_down(service_name: str, display_name: str, message: Optional[str]) -> None:
    """Send a "service is DOWN" alert.

    Args:
        service_name: Identifier of the service. Not used when building the
            message; kept so the signature stays compatible with callers.
        display_name: Human-readable name, shown in bold in the alert.
        message: Optional error description. When non-empty it is appended as
            an inline-code ``Error:`` line.
    """
    now = datetime.now().strftime("%d.%m.%Y %H:%M:%S")
    text = (
        f"*{display_name}* is DOWN\n\n"
        f"Time: `{now}`\n"
    )
    if message:
        # The error text is wrapped in a Markdown code span. A backtick inside
        # it would close the span early and can make Telegram reject the whole
        # alert with a parse error, so swap backticks for plain apostrophes.
        safe_message = message.replace("`", "'")
        text += f"Error: `{safe_message}`"

    await send_telegram_alert(text, is_recovery=False)
|
|
|
|
|
|
async def alert_service_recovered(service_name: str, display_name: str, downtime_minutes: int):
    """Send a "service is back ONLINE" alert.

    The alert shows the display name in bold, the current local time and the
    length of the outage in minutes, and is sent as a recovery message.
    ``service_name`` is accepted but not used when building the text.
    """
    timestamp = datetime.now().strftime("%d.%m.%Y %H:%M:%S")

    headline = f"*{display_name}* is back ONLINE\n\n"
    time_line = f"Time: `{timestamp}`\n"
    downtime_line = f"Downtime: `{downtime_minutes} min`"

    await send_telegram_alert(headline + time_line + downtime_line, is_recovery=True)
|
|
|
|
|
|
async def alert_ssl_expiring(domain: str, days_left: int):
    """Send a warning that the SSL certificate for ``domain`` expires soon.

    The alert names the domain and the number of days remaining, followed by
    a reminder to renew. It is sent as a non-recovery (warning) message.
    """
    parts = [
        "*SSL Certificate Expiring*\n\n",
        f"Domain: `{domain}`\n",
        f"Days left: `{days_left}`\n\n",
        "Please renew the certificate!",
    ]
    await send_telegram_alert("".join(parts), is_recovery=False)
|
|
|
|
|
|
async def alert_ssl_expired(domain: str):
    """Send a warning that the SSL certificate for ``domain`` has expired.

    The alert names the domain and notes that visitors may see security
    warnings. It is sent as a non-recovery (warning) message.
    """
    header = "*SSL Certificate EXPIRED*\n\n"
    domain_line = f"Domain: `{domain}`\n\n"
    footer = "Certificate has expired! Site may show security warnings."

    await send_telegram_alert(header + domain_line + footer, is_recovery=False)
|