141 lines
4.0 KiB
Python
141 lines
4.0 KiB
Python
"""SSL certificate monitoring."""
|
|
import ssl
|
|
import socket
|
|
from datetime import datetime, timezone
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
from urllib.parse import urlparse
|
|
|
|
from database import save_ssl_info, get_ssl_info
|
|
from alerts import alert_ssl_expiring, alert_ssl_expired
|
|
|
|
|
|
@dataclass
|
|
class SSLInfo:
|
|
domain: str
|
|
issuer: str
|
|
expires_at: datetime
|
|
days_until_expiry: int
|
|
is_valid: bool
|
|
error: Optional[str] = None
|
|
|
|
|
|
def check_ssl_certificate(url: str) -> Optional[SSLInfo]:
|
|
"""Check SSL certificate for a URL."""
|
|
try:
|
|
parsed = urlparse(url)
|
|
hostname = parsed.hostname
|
|
|
|
if not hostname:
|
|
return None
|
|
|
|
# Skip non-HTTPS or localhost
|
|
if parsed.scheme != "https" or hostname in ("localhost", "127.0.0.1"):
|
|
return None
|
|
|
|
context = ssl.create_default_context()
|
|
conn = context.wrap_socket(
|
|
socket.socket(socket.AF_INET),
|
|
server_hostname=hostname
|
|
)
|
|
conn.settimeout(10.0)
|
|
|
|
try:
|
|
conn.connect((hostname, parsed.port or 443))
|
|
cert = conn.getpeercert()
|
|
finally:
|
|
conn.close()
|
|
|
|
if not cert:
|
|
return SSLInfo(
|
|
domain=hostname,
|
|
issuer="Unknown",
|
|
expires_at=datetime.now(timezone.utc),
|
|
days_until_expiry=0,
|
|
is_valid=False,
|
|
error="No certificate found"
|
|
)
|
|
|
|
# Parse expiry date
|
|
not_after = cert.get("notAfter", "")
|
|
expires_at = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
|
|
expires_at = expires_at.replace(tzinfo=timezone.utc)
|
|
|
|
# Calculate days until expiry
|
|
now = datetime.now(timezone.utc)
|
|
days_until_expiry = (expires_at - now).days
|
|
|
|
# Get issuer
|
|
issuer_parts = cert.get("issuer", ())
|
|
issuer = "Unknown"
|
|
for part in issuer_parts:
|
|
for key, value in part:
|
|
if key == "organizationName":
|
|
issuer = value
|
|
break
|
|
|
|
return SSLInfo(
|
|
domain=hostname,
|
|
issuer=issuer,
|
|
expires_at=expires_at,
|
|
days_until_expiry=days_until_expiry,
|
|
is_valid=days_until_expiry > 0
|
|
)
|
|
|
|
except ssl.SSLCertVerificationError as e:
|
|
hostname = urlparse(url).hostname or url
|
|
return SSLInfo(
|
|
domain=hostname,
|
|
issuer="Invalid",
|
|
expires_at=datetime.now(timezone.utc),
|
|
days_until_expiry=0,
|
|
is_valid=False,
|
|
error=f"SSL verification failed: {str(e)[:100]}"
|
|
)
|
|
except Exception as e:
|
|
hostname = urlparse(url).hostname or url
|
|
return SSLInfo(
|
|
domain=hostname,
|
|
issuer="Unknown",
|
|
expires_at=datetime.now(timezone.utc),
|
|
days_until_expiry=0,
|
|
is_valid=False,
|
|
error=str(e)[:100]
|
|
)
|
|
|
|
|
|
async def check_and_alert_ssl(url: str, warn_days: int = 14) -> Optional[SSLInfo]:
|
|
"""Check SSL and send alerts if needed."""
|
|
ssl_info = check_ssl_certificate(url)
|
|
|
|
if not ssl_info:
|
|
return None
|
|
|
|
# Save to database
|
|
save_ssl_info(
|
|
domain=ssl_info.domain,
|
|
issuer=ssl_info.issuer,
|
|
expires_at=ssl_info.expires_at,
|
|
days_until_expiry=ssl_info.days_until_expiry
|
|
)
|
|
|
|
# Check if we need to alert
|
|
prev_info = get_ssl_info(ssl_info.domain)
|
|
|
|
if ssl_info.days_until_expiry <= 0:
|
|
# Certificate expired
|
|
await alert_ssl_expired(ssl_info.domain)
|
|
elif ssl_info.days_until_expiry <= warn_days:
|
|
# Certificate expiring soon - alert once per day
|
|
should_alert = True
|
|
if prev_info and prev_info.get("checked_at"):
|
|
# Check if we already alerted today
|
|
last_check = datetime.fromisoformat(prev_info["checked_at"])
|
|
if (datetime.now() - last_check).days < 1:
|
|
should_alert = False
|
|
|
|
if should_alert:
|
|
await alert_ssl_expiring(ssl_info.domain, ssl_info.days_until_expiry)
|
|
|
|
return ssl_info
|