Files
2025-12-18 21:13:49 +03:00

122 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Симулятор движения транспортных средств.
Генерирует реалистичные данные о перемещении объектов.
"""
import asyncio
import random
import math
import httpx
from datetime import datetime
# Backend API URL (внутри Docker сети, без /api — root_path только для nginx)
API_URL = "http://backend:8000"
# Начальные координаты (Новосибирск - центр)
START_COORDS = [
(55.0304, 82.9204), # Центр
(55.0411, 82.9344), # Север
(55.0198, 82.9064), # Юг
(55.0350, 82.8904), # Запад
(55.0250, 82.9504), # Восток
]
class VehicleSimulator:
def __init__(self, vehicle_id: int, start_lat: float, start_lon: float):
self.vehicle_id = vehicle_id
self.lat = start_lat
self.lon = start_lon
self.speed = random.uniform(20, 60) # km/h
self.heading = random.uniform(0, 360) # degrees
self.is_stopped = False
self.stop_duration = 0
def update(self):
"""Обновить позицию транспортного средства"""
# Случайная остановка
if not self.is_stopped and random.random() < 0.02: # 2% шанс остановиться
self.is_stopped = True
self.stop_duration = random.randint(5, 30) # секунд
self.speed = 0
if self.is_stopped:
self.stop_duration -= 1
if self.stop_duration <= 0:
self.is_stopped = False
self.speed = random.uniform(20, 60)
if not self.is_stopped:
# Случайное изменение направления
self.heading += random.uniform(-15, 15)
self.heading = self.heading % 360
# Случайное изменение скорости
self.speed += random.uniform(-5, 5)
self.speed = max(10, min(90, self.speed)) # Ограничение 10-90 км/ч
# Расчёт нового положения
# Примерно: 1 градус широты = 111 км, 1 градус долготы = 111 * cos(lat) км
speed_ms = self.speed / 3.6 # м/с
distance = speed_ms * 2 # за 2 секунды
# Перевод в градусы
delta_lat = (distance * math.cos(math.radians(self.heading))) / 111000
delta_lon = (distance * math.sin(math.radians(self.heading))) / (111000 * math.cos(math.radians(self.lat)))
self.lat += delta_lat
self.lon += delta_lon
return {
"vehicle_id": self.vehicle_id,
"lat": round(self.lat, 6),
"lon": round(self.lon, 6),
"speed": round(self.speed, 1),
"heading": round(self.heading, 1),
"timestamp": datetime.utcnow().isoformat()
}
async def send_position(client: httpx.AsyncClient, position: dict):
"""Отправить позицию на сервер"""
try:
response = await client.post(f"{API_URL}/ingest/position", json=position)
if response.status_code == 201:
print(f"✓ Vehicle {position['vehicle_id']}: ({position['lat']}, {position['lon']}) @ {position['speed']} km/h")
else:
print(f"✗ Vehicle {position['vehicle_id']}: Error {response.status_code}")
except Exception as e:
print(f"✗ Vehicle {position['vehicle_id']}: {e}")
async def main():
print("🚗 Запуск симулятора транспорта...")
print(f"📡 API URL: {API_URL}")
# Создаём симуляторы для каждого транспортного средства
simulators = []
for i, (lat, lon) in enumerate(START_COORDS, start=1):
sim = VehicleSimulator(vehicle_id=i, start_lat=lat, start_lon=lon)
simulators.append(sim)
print(f" → Vehicle {i}: начальная позиция ({lat}, {lon})")
print("\n🔄 Начинаем отправку данных (Ctrl+C для остановки)...\n")
async with httpx.AsyncClient(timeout=10.0) as client:
while True:
# Обновляем и отправляем позиции всех транспортных средств
tasks = []
for sim in simulators:
position = sim.update()
tasks.append(send_position(client, position))
await asyncio.gather(*tasks)
await asyncio.sleep(2) # Интервал 2 секунды
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n⏹ Симулятор остановлен")