import json
from typing import Dict, Optional, Set
from uuid import UUID

from fastapi import WebSocket


class ConnectionManager:
    """Keep track of open WebSocket connections grouped by room.

    Connections are stored as ``(websocket, user_id)`` pairs in one set per
    room. A room's entry is removed as soon as its set becomes empty.
    """

    def __init__(self):
        # room_id -> set of (websocket, user_id)
        self.active_connections: Dict[UUID, Set[tuple[WebSocket, UUID]]] = {}

    async def connect(self, websocket: WebSocket, room_id: UUID, user_id: UUID):
        """Accept *websocket* and register it under *room_id* for *user_id*.

        The room entry is created on first use.
        """
        await websocket.accept()
        self.active_connections.setdefault(room_id, set()).add((websocket, user_id))

    def disconnect(self, websocket: WebSocket, room_id: UUID, user_id: UUID):
        """Forget the ``(websocket, user_id)`` pair registered in *room_id*.

        Unknown rooms and unknown pairs are ignored. The room entry is
        deleted once its last connection has been removed.
        """
        room = self.active_connections.get(room_id)
        if room is None:
            return
        room.discard((websocket, user_id))
        if not room:
            del self.active_connections[room_id]

    async def broadcast_to_room(
        self,
        room_id: UUID,
        message: dict,
        exclude_user: Optional[UUID] = None,
    ):
        """Send *message* as JSON text to every connection in *room_id*.

        Args:
            room_id: Room whose connections receive the message. Unknown
                rooms are a no-op.
            message: Payload to serialise. Values ``json`` cannot encode
                natively are converted with ``str``.
            exclude_user: When given, connections registered for this user
                id are skipped.

        Connections whose send raises are dropped from the room; the
        broadcast itself never propagates a send failure.
        """
        room = self.active_connections.get(room_id)
        if room is None:
            return

        message_json = json.dumps(message, default=str)
        failed = []
        # Iterate over a snapshot: every ``await`` below hands control back to
        # the event loop, where connect()/disconnect() may mutate the live set.
        # Iterating the live set directly would then raise
        # "RuntimeError: Set changed size during iteration".
        for websocket, user_id in tuple(room):
            if exclude_user is not None and user_id == exclude_user:
                continue
            try:
                await websocket.send_text(message_json)
            except Exception:
                # Best-effort delivery: a failed send marks the connection
                # for removal instead of aborting the whole broadcast.
                failed.append((websocket, user_id))

        # Prune through disconnect() so that a room deleted while we were
        # awaiting does not raise KeyError, and a room emptied by the pruning
        # is removed just as it would be on a normal disconnect.
        for websocket, user_id in failed:
            self.disconnect(websocket, room_id, user_id)

    def get_room_user_count(self, room_id: UUID) -> int:
        """Return the number of connections registered in *room_id*.

        This counts ``(websocket, user_id)`` pairs, so a user with several
        open sockets is counted once per socket. Unknown rooms yield 0.
        """
        return len(self.active_connections.get(room_id, ()))


# Module-level instance created at import time.
manager = ConnectionManager()