"""SSL certificate monitoring.""" import ssl import socket from datetime import datetime, timezone from dataclasses import dataclass from typing import Optional from urllib.parse import urlparse from database import save_ssl_info, get_ssl_info from alerts import alert_ssl_expiring, alert_ssl_expired @dataclass class SSLInfo: domain: str issuer: str expires_at: datetime days_until_expiry: int is_valid: bool error: Optional[str] = None def check_ssl_certificate(url: str) -> Optional[SSLInfo]: """Check SSL certificate for a URL.""" try: parsed = urlparse(url) hostname = parsed.hostname if not hostname: return None # Skip non-HTTPS or localhost if parsed.scheme != "https" or hostname in ("localhost", "127.0.0.1"): return None context = ssl.create_default_context() conn = context.wrap_socket( socket.socket(socket.AF_INET), server_hostname=hostname ) conn.settimeout(10.0) try: conn.connect((hostname, parsed.port or 443)) cert = conn.getpeercert() finally: conn.close() if not cert: return SSLInfo( domain=hostname, issuer="Unknown", expires_at=datetime.now(timezone.utc), days_until_expiry=0, is_valid=False, error="No certificate found" ) # Parse expiry date not_after = cert.get("notAfter", "") expires_at = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z") expires_at = expires_at.replace(tzinfo=timezone.utc) # Calculate days until expiry now = datetime.now(timezone.utc) days_until_expiry = (expires_at - now).days # Get issuer issuer_parts = cert.get("issuer", ()) issuer = "Unknown" for part in issuer_parts: for key, value in part: if key == "organizationName": issuer = value break return SSLInfo( domain=hostname, issuer=issuer, expires_at=expires_at, days_until_expiry=days_until_expiry, is_valid=days_until_expiry > 0 ) except ssl.SSLCertVerificationError as e: hostname = urlparse(url).hostname or url return SSLInfo( domain=hostname, issuer="Invalid", expires_at=datetime.now(timezone.utc), days_until_expiry=0, is_valid=False, error=f"SSL verification failed: {str(e)[:100]}" ) except Exception as e: hostname = urlparse(url).hostname or url return SSLInfo( domain=hostname, issuer="Unknown", expires_at=datetime.now(timezone.utc), days_until_expiry=0, is_valid=False, error=str(e)[:100] ) async def check_and_alert_ssl(url: str, warn_days: int = 14) -> Optional[SSLInfo]: """Check SSL and send alerts if needed.""" ssl_info = check_ssl_certificate(url) if not ssl_info: return None # Save to database save_ssl_info( domain=ssl_info.domain, issuer=ssl_info.issuer, expires_at=ssl_info.expires_at, days_until_expiry=ssl_info.days_until_expiry ) # Check if we need to alert prev_info = get_ssl_info(ssl_info.domain) if ssl_info.days_until_expiry <= 0: # Certificate expired await alert_ssl_expired(ssl_info.domain) elif ssl_info.days_until_expiry <= warn_days: # Certificate expiring soon - alert once per day should_alert = True if prev_info and prev_info.get("checked_at"): # Check if we already alerted today last_check = datetime.fromisoformat(prev_info["checked_at"]) if (datetime.now() - last_check).days < 1: should_alert = False if should_alert: await alert_ssl_expiring(ssl_info.domain, ssl_info.days_until_expiry) return ssl_info