Files
sibuti/transport/backend/app/services/connection_checker.py
2025-12-18 21:13:49 +03:00

84 lines
3.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Фоновая задача для проверки потери связи с объектами.
Создаёт события CONNECTION_LOST если нет данных более N минут.
"""
import asyncio
from datetime import datetime, timedelta
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import async_session_maker
from app.models import Vehicle, Position, Event
from app.config import settings
from app.services.websocket_manager import manager
async def check_connections():
"""Проверить все объекты на потерю связи"""
async with async_session_maker() as db:
# Получить все объекты
result = await db.execute(select(Vehicle))
vehicles = result.scalars().all()
now = datetime.utcnow()
threshold = now - timedelta(minutes=settings.connection_lost_minutes)
for vehicle in vehicles:
# Получить последнюю позицию
pos_result = await db.execute(
select(Position)
.where(Position.vehicle_id == vehicle.id)
.order_by(desc(Position.timestamp))
.limit(1)
)
last_pos = pos_result.scalar_one_or_none()
if not last_pos:
continue
# Проверить, прошло ли достаточно времени
if last_pos.timestamp < threshold:
# Проверить, не было ли уже события CONNECTION_LOST за последние N минут
event_result = await db.execute(
select(Event)
.where(Event.vehicle_id == vehicle.id)
.where(Event.type == "CONNECTION_LOST")
.where(Event.timestamp > threshold)
.limit(1)
)
existing_event = event_result.scalar_one_or_none()
if not existing_event:
# Создать событие
minutes_ago = (now - last_pos.timestamp).total_seconds() / 60
event = Event(
vehicle_id=vehicle.id,
timestamp=now,
type="CONNECTION_LOST",
payload={
"last_seen": last_pos.timestamp.isoformat(),
"minutes_ago": round(minutes_ago, 1),
"lat": last_pos.lat,
"lon": last_pos.lon
}
)
db.add(event)
await db.commit()
await db.refresh(event)
# Отправить через WebSocket
await manager.broadcast_event(event)
print(f"⚠️ CONNECTION_LOST: {vehicle.name} (нет данных {minutes_ago:.0f} мин)")
async def connection_checker_loop():
"""Бесконечный цикл проверки соединений"""
print("🔍 Connection checker started")
while True:
try:
await check_connections()
except Exception as e:
print(f"Connection checker error: {e}")
await asyncio.sleep(60) # Проверять каждую минуту