"""WebSocket handler for Chat Rooms""" import json import asyncio import logging from typing import Dict, Set from fastapi import WebSocket, WebSocketDisconnect from luxx.services.room import chat_room_service logger = logging.getLogger(__name__) class ChatRoomConnectionManager: """Manages WebSocket connections for chat rooms""" def __init__(self): self._room_connections: Dict[str, Set[WebSocket]] = {} self._connection_rooms: Dict[WebSocket, str] = {} async def connect(self, websocket: WebSocket, room_id: str): """Connect a WebSocket to a chat room""" await websocket.accept() if room_id not in self._room_connections: self._room_connections[room_id] = set() self._room_connections[room_id].add(websocket) self._connection_rooms[websocket] = room_id logger.info(f"WebSocket connected to room: {room_id}") await websocket.send_json({ "event": "connected", "data": {"room_id": room_id, "message": "Connected to chat room"} }) def disconnect(self, websocket: WebSocket): """Disconnect a WebSocket from its room""" room_id = self._connection_rooms.pop(websocket, None) if room_id and room_id in self._room_connections: self._room_connections[room_id].discard(websocket) if not self._room_connections[room_id]: del self._room_connections[room_id] logger.info(f"WebSocket disconnected from room: {room_id}") async def broadcast_to_room(self, room_id: str, message: dict, exclude: WebSocket = None): """Broadcast a message to all connections in a room""" if room_id not in self._room_connections: return disconnected = [] for connection in self._room_connections[room_id]: if connection == exclude: continue try: await connection.send_json(message) except Exception as e: logger.error(f"Failed to send to connection: {e}") disconnected.append(connection) for conn in disconnected: self.disconnect(conn) async def send_to_room(self, room_id: str, message: dict): """Send a message to all connections in a room""" await self.broadcast_to_room(room_id, message) def get_room_size(self, room_id: str) -> int: """Get the number of connections in a room""" return len(self._room_connections.get(room_id, set())) # Global connection manager connection_manager = ChatRoomConnectionManager() async def websocket_handler(websocket: WebSocket, room_id: str): """Handle WebSocket connection for a chat room.""" await connection_manager.connect(websocket, room_id) room = chat_room_service.get_room(room_id) if not room: await websocket.send_json({"event": "error", "data": {"content": "Chat room not found"}}) await websocket.close() return try: messages = chat_room_service.get_messages(room_id, limit=50) await websocket.send_json({"event": "history", "data": {"messages": messages}}) agents = chat_room_service.get_room_agents(room_id) await websocket.send_json({"event": "agents", "data": {"agents": [a.to_dict() for a in agents]}}) await connection_manager.broadcast_to_room(room_id, { "event": "system", "data": {"content": "User joined the room", "type": "join"} }, exclude=websocket) while True: data = await websocket.receive_json() action = data.get("action") if action == "send_message": content = data.get("content", "") if not content: continue user_id = str(data.get("user_id", "anonymous")) user_name = data.get("user_name", "Anonymous") msg = chat_room_service.save_message( room_id=room_id, sender_type="user", sender_id=user_id, sender_name=user_name, content=content ) await connection_manager.broadcast_to_room(room_id, {"event": "message", "data": msg}) agents = chat_room_service.get_room_agents(room_id) for agent in agents: await connection_manager.broadcast_to_room(room_id, { "event": "typing", "data": {"agent_id": agent.agent_id, "agent_name": agent.name, "is_typing": True} }) context = {"user_id": user_id, "username": user_name} logger.info(f"[ROOM_WS] Starting process_message, agents count: {len(agents)}") event_count = 0 async for event in chat_room_service.process_message( room_id=room_id, user_message=content, user_id=user_id, user_name=user_name, context=context ): event_count += 1 logger.info(f"[ROOM_WS] Received event {event_count}: {event.get('event')}") if event.get("event") in ["process_step", "done", "error"]: # Find agent_name from agents list agent_name = None for agent in agents: if agent.agent_id == event.get("agent_id"): agent_name = agent.name break await connection_manager.broadcast_to_room(room_id, { "event": event.get("event"), "data": event.get("data", {}), "agent_id": event.get("agent_id"), "agent_name": agent_name }) if event.get("event") == "done": # Find agent_name from agents list agent_name = None for agent in agents: if agent.agent_id == event.get("agent_id"): agent_name = agent.name break chat_room_service.save_message( room_id=room_id, sender_type="agent", sender_id=event.get("agent_id"), sender_name=agent_name, content=event.get("data", {}).get("content", "") if isinstance(event.get("data"), dict) else "", token_count=event.get("data", {}).get("token_count", 0) if isinstance(event.get("data"), dict) else 0 ) else: logger.info(f"[ROOM_WS] Skipping event: {event.get('event')}") logger.info(f"[ROOM_WS] process_message completed, total events: {event_count}") for agent in agents: await connection_manager.broadcast_to_room(room_id, { "event": "typing", "data": {"agent_id": agent.agent_id, "agent_name": agent.name, "is_typing": False} }) elif action == "ping": await websocket.send_json({"event": "pong", "data": {}}) except WebSocketDisconnect: logger.info(f"WebSocket disconnected from room: {room_id}") connection_manager.disconnect(websocket) await connection_manager.broadcast_to_room(room_id, { "event": "system", "data": {"content": "User left the room", "type": "leave"} }) except Exception as e: logger.error(f"WebSocket error in room {room_id}: {e}") connection_manager.disconnect(websocket) await connection_manager.broadcast_to_room(room_id, { "event": "error", "data": {"content": str(e)} })