"""Service monitoring with persistence and alerting."""

import asyncio
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional

import httpx

from database import (
    save_metric,
    get_latency_history,
    get_uptime_stats,
    get_avg_latency,
    create_incident,
    resolve_incident,
    get_open_incident,
    mark_incident_notified,
)
from alerts import alert_service_down, alert_service_recovered
from ssl_monitor import check_and_alert_ssl, SSLInfo

logger = logging.getLogger(__name__)


class Status(str, Enum):
    """Health state of a monitored service."""

    OPERATIONAL = "operational"
    DEGRADED = "degraded"
    DOWN = "down"
    UNKNOWN = "unknown"


@dataclass
class ServiceStatus:
    """Latest known state of a single monitored service."""

    name: str
    display_name: str
    status: Status = Status.UNKNOWN
    latency_ms: Optional[float] = None
    last_check: Optional[datetime] = None
    last_incident: Optional[datetime] = None
    uptime_percent: float = 100.0
    message: Optional[str] = None
    version: Optional[str] = None
    avg_latency_24h: Optional[float] = None
    # None is replaced by a fresh list in __post_init__ so instances never
    # share one mutable default.
    latency_history: Optional[list] = None
    # For uptime calculation (in-memory, backed by DB)
    total_checks: int = 0
    successful_checks: int = 0

    def __post_init__(self):
        if self.latency_history is None:
            self.latency_history = []

    def to_dict(self) -> dict:
        """Return a JSON-friendly snapshot of this service's status.

        Latencies and uptime are rounded to two decimals; datetimes are
        rendered with ``isoformat()``. ``latency_history`` and the raw check
        counters are not included.
        """
        return {
            "name": self.name,
            "display_name": self.display_name,
            "status": self.status.value,
            # Compare against None explicitly: a measured 0.0 is a real value
            # and must not be reported as "no data".
            "latency_ms": (
                round(self.latency_ms, 2) if self.latency_ms is not None else None
            ),
            "last_check": self.last_check.isoformat() if self.last_check else None,
            "last_incident": (
                self.last_incident.isoformat() if self.last_incident else None
            ),
            "uptime_percent": round(self.uptime_percent, 2),
            "message": self.message,
            "version": self.version,
            "avg_latency_24h": (
                round(self.avg_latency_24h, 2)
                if self.avg_latency_24h is not None
                else None
            ),
        }

    def update_uptime(self, is_success: bool):
        """Record one check outcome and recompute the in-memory uptime."""
        self.total_checks += 1
        if is_success:
            self.successful_checks += 1
        if self.total_checks > 0:
            self.uptime_percent = (self.successful_checks / self.total_checks) * 100


class ServiceMonitor:
    """Runs health checks for a fixed set of services and tracks their state."""

    def __init__(self):
        self.services: dict[str, ServiceStatus] = {
            "backend": ServiceStatus(name="backend", display_name="Backend API"),
            "database": ServiceStatus(name="database", display_name="Database"),
            "frontend": ServiceStatus(name="frontend", display_name="Frontend"),
            "bot": ServiceStatus(name="bot", display_name="Telegram Bot"),
            "external": ServiceStatus(
                name="external", display_name="External Access"
            ),
        }
        self.last_check: Optional[datetime] = None
        self.ssl_info: Optional[SSLInfo] = None

    async def _timed_get(
        self,
        url: str,
        *,
        timeout: float = 10.0,
        follow_redirects: bool = False,
    ) -> tuple[httpx.Response, float]:
        """GET ``url`` and return ``(response, latency_ms)``.

        Latency covers only the request itself (not client setup) and is
        measured with a monotonic clock so wall-clock adjustments cannot
        distort it. Any httpx error propagates to the caller.
        """
        async with httpx.AsyncClient(
            timeout=timeout, follow_redirects=follow_redirects
        ) as client:
            started = time.perf_counter()
            response = await client.get(url)
            latency_ms = (time.perf_counter() - started) * 1000
        return response, latency_ms

    async def _check_http(
        self,
        url: str,
        *,
        timeout: float = 10.0,
        follow_redirects: bool = False,
    ) -> tuple[Status, Optional[float], Optional[str]]:
        """Probe ``url`` and map the outcome to ``(status, latency_ms, message)``.

        HTTP 200 is OPERATIONAL, any other response is DEGRADED with an
        ``"HTTP <code>"`` message, and a timeout or any other error is DOWN
        with no latency.
        """
        try:
            response, latency = await self._timed_get(
                url, timeout=timeout, follow_redirects=follow_redirects
            )
        except httpx.TimeoutException:
            return Status.DOWN, None, "Timeout"
        except Exception as exc:
            # Truncate so long transport errors stay readable downstream.
            return Status.DOWN, None, str(exc)[:100]

        if response.status_code == 200:
            return Status.OPERATIONAL, latency, None
        return Status.DEGRADED, latency, f"HTTP {response.status_code}"

    @staticmethod
    def _extract_version(response: httpx.Response) -> Optional[str]:
        """Return the ``version`` field of a JSON object body, else ``None``."""
        try:
            payload = response.json()
        except ValueError:
            # Body is not valid JSON; the endpoint still answered 200.
            return None
        if not isinstance(payload, dict):
            return None
        return payload.get("version")

    async def check_backend(
        self, url: str
    ) -> tuple[Status, Optional[float], Optional[str], Optional[str]]:
        """Check backend API health.

        Requests ``{url}/health`` and returns
        ``(status, latency_ms, message, version)``. A 200 response is
        OPERATIONAL even when its body carries no parsable version; in that
        case ``version`` is ``None``.
        """
        try:
            response, latency = await self._timed_get(f"{url}/health")
        except httpx.TimeoutException:
            return Status.DOWN, None, "Timeout", None
        except Exception as exc:
            return Status.DOWN, None, str(exc)[:100], None

        if response.status_code != 200:
            return Status.DEGRADED, latency, f"HTTP {response.status_code}", None
        return Status.OPERATIONAL, latency, None, self._extract_version(response)

    async def check_database(
        self, backend_url: str
    ) -> tuple[Status, Optional[float], Optional[str]]:
        """Check database through backend.

        Requests ``{backend_url}/health``; the response body is not inspected.
        HTTP 200 is OPERATIONAL, any other response or any error is DOWN.
        """
        try:
            response, latency = await self._timed_get(f"{backend_url}/health")
        except Exception:
            return Status.DOWN, None, "Cannot reach backend"

        if response.status_code == 200:
            return Status.OPERATIONAL, latency, None
        return Status.DOWN, latency, "Backend reports unhealthy"

    async def check_frontend(
        self, url: str
    ) -> tuple[Status, Optional[float], Optional[str]]:
        """Check frontend availability by requesting ``url`` directly."""
        return await self._check_http(url)

    async def check_bot(
        self, url: str
    ) -> tuple[Status, Optional[float], Optional[str]]:
        """Check Telegram bot health via ``{url}/health``."""
        return await self._check_http(f"{url}/health")

    async def check_external(
        self, url: str
    ) -> tuple[Status, Optional[float], Optional[str]]:
        """Check external (public) URL availability.

        An empty ``url`` yields ``UNKNOWN`` with the message
        ``"Not configured"``. Otherwise redirects are followed and a longer
        15 second timeout is used.
        """
        if not url:
            return Status.UNKNOWN, None, "Not configured"
        return await self._check_http(url, timeout=15.0, follow_redirects=True)

    def _refresh_from_db(self, service_name: str, svc: ServiceStatus) -> None:
        """Reload the 24h latency history, average and uptime for ``svc``."""
        svc.latency_history = get_latency_history(service_name, hours=24)
        svc.avg_latency_24h = get_avg_latency(service_name, hours=24)

        # Stored stats override the in-memory uptime once any checks exist.
        stats = get_uptime_stats(service_name, hours=24)
        if stats["total_checks"] > 0:
            svc.uptime_percent = stats["uptime_percent"]

    async def _handle_transition(
        self,
        service_name: str,
        svc: ServiceStatus,
        was_down: bool,
        is_down: bool,
        now: datetime,
    ) -> None:
        """Open or resolve an incident and alert when health changes."""
        if is_down and not was_down:
            # Service just went down
            svc.last_incident = now
            incident_id = create_incident(
                service_name, svc.status.value, svc.message
            )
            await alert_service_down(service_name, svc.display_name, svc.message)
            mark_incident_notified(incident_id)
        elif was_down and not is_down:
            # Service recovered
            open_incident = get_open_incident(service_name)
            if open_incident:
                # NOTE(review): assumes "started_at" is an ISO string with the
                # same timezone awareness as `now` — confirm against database.
                started_at = datetime.fromisoformat(open_incident["started_at"])
                downtime_minutes = int((now - started_at).total_seconds() / 60)
                resolve_incident(service_name)
                await alert_service_recovered(
                    service_name, svc.display_name, downtime_minutes
                )

    async def _process_check_result(
        self,
        service_name: str,
        result: tuple,
        now: datetime
    ):
        """Process check result with DB persistence and alerting.

        ``result`` is a 3-tuple ``(status, latency, message)``, a 4-tuple that
        also carries ``version``, or an exception object returned by
        ``asyncio.gather(..., return_exceptions=True)``. An exception result
        leaves the service's previous state untouched.
        """
        # BaseException rather than Exception: gather can also hand back a
        # CancelledError, which would otherwise reach len() and raise.
        if isinstance(result, BaseException):
            logger.warning(
                "Check for %s raised %r; keeping previous status",
                service_name,
                result,
            )
            return

        if len(result) == 4:
            status, latency, message, version = result
        else:
            status, latency, message = result
            version = None

        svc = self.services[service_name]
        # DEGRADED counts as unhealthy for incident tracking.
        unhealthy = (Status.DOWN, Status.DEGRADED)
        was_down = svc.status in unhealthy
        is_down = status in unhealthy

        # Update service status
        svc.status = status
        svc.latency_ms = latency
        svc.message = message
        if version:
            # Keep the last known version when a check reports none.
            svc.version = version
        svc.last_check = now
        svc.update_uptime(status == Status.OPERATIONAL)

        # Save metric to database
        save_metric(service_name, status.value, latency, message)

        self._refresh_from_db(service_name, svc)
        await self._handle_transition(service_name, svc, was_down, is_down, now)

    async def check_all_services(
        self,
        backend_url: str,
        frontend_url: str,
        bot_url: str,
        external_url: str = "",
        public_url: str = ""
    ):
        """Check all services concurrently.

        Results are processed one service at a time after all checks finish.
        When ``public_url`` starts with ``https://`` its certificate is
        checked as well and stored in ``self.ssl_info``.
        """
        now = datetime.now()

        # Run all checks concurrently
        results = await asyncio.gather(
            self.check_backend(backend_url),
            self.check_database(backend_url),
            self.check_frontend(frontend_url),
            self.check_bot(bot_url),
            self.check_external(external_url),
            return_exceptions=True
        )

        # Order must match the gather() call above.
        service_names = ["backend", "database", "frontend", "bot", "external"]
        for service_name, result in zip(service_names, results):
            await self._process_check_result(service_name, result, now)

        # Check SSL certificate (if public URL is HTTPS)
        if public_url and public_url.startswith("https://"):
            self.ssl_info = await check_and_alert_ssl(public_url)

        self.last_check = now

    def get_all_statuses(self) -> dict[str, ServiceStatus]:
        """Return the live mapping of service name to status (not a copy)."""
        return self.services

    def get_overall_status(self) -> Status:
        """Get overall system status based on all services.

        OPERATIONAL only if every considered service is operational;
        otherwise DOWN takes precedence over DEGRADED, and UNKNOWN is the
        fallback.
        """
        # Exclude external from overall status if not configured
        statuses = [
            svc.status
            for name, svc in self.services.items()
            if name != "external" or svc.status != Status.UNKNOWN
        ]

        if all(s == Status.OPERATIONAL for s in statuses):
            return Status.OPERATIONAL
        if any(s == Status.DOWN for s in statuses):
            return Status.DOWN
        if any(s == Status.DEGRADED for s in statuses):
            return Status.DEGRADED
        return Status.UNKNOWN

    def get_ssl_status(self) -> Optional[dict]:
        """Get SSL certificate status, or ``None`` before any SSL check ran."""
        info = self.ssl_info
        if not info:
            return None

        # NOTE(review): presumably a failed certificate check can leave
        # expires_at unset while `error` is filled — confirm against SSLInfo.
        expires_at = info.expires_at
        return {
            "domain": info.domain,
            "issuer": info.issuer,
            "expires_at": expires_at.isoformat() if expires_at else None,
            "days_until_expiry": info.days_until_expiry,
            "is_valid": info.is_valid,
            "error": info.error
        }