"""Chat Room Service - orchestrates multi-agent chat with unified participants""" import json import re import uuid import asyncio import logging from typing import List, Dict, Any, Optional, AsyncGenerator from dataclasses import dataclass from luxx.core.database import SessionLocal from luxx.models.room import ChatRoom, Agent, RoomParticipant from luxx.models.chat import Message from luxx.agents.base import BaseAgent logger = logging.getLogger(__name__) @dataclass class DispatchResult: triggered_agents: List[BaseAgent] mentions: List[str] should_respond: bool class MessageDispatcher: @staticmethod def parse_mentions(content: str) -> List[str]: return re.findall(r'@(\w+)', content) def dispatch(self, content: str, room_agents: List[BaseAgent], sender_id: str) -> DispatchResult: available = [a for a in room_agents if a.agent_id != sender_id] mentions = self.parse_mentions(content) if mentions: name_map = {a.name.lower(): a for a in available} triggered = [name_map[n.lower()] for n in mentions if n.lower() in name_map] else: triggered = [a for a in available if a.auto_response] triggered.sort(key=lambda a: a.priority) return DispatchResult(triggered_agents=triggered, mentions=mentions, should_respond=bool(triggered)) class ResponseAggregator: def __init__(self, room_id: str): self.room_id = room_id async def aggregate_stream(self, streams: Dict[str, AsyncGenerator]) -> AsyncGenerator[Dict[str, Any], None]: if not streams: return def parse_sse(s: str) -> Dict[str, Any]: result = {"event": None, "data": {}} for line in s.strip().split('\n'): if line.startswith('event: '): result["event"] = line[7:].strip() elif line.startswith('data: '): try: result["data"] = json.loads(line[6:].strip()) except json.JSONDecodeError: result["data"] = {"content": line[6:].strip()} return result queue = asyncio.Queue() async def producer(agent_id: str, stream): try: async for event in stream: parsed = parse_sse(event) if isinstance(event, str) else event parsed["agent_id"] = agent_id await queue.put((agent_id, parsed)) except Exception as e: logger.error(f"Agent {agent_id} stream error: {e}") await queue.put((agent_id, {"event": "error", "agent_id": agent_id, "data": {"content": str(e)}})) finally: await queue.put((agent_id, None)) producers = [asyncio.create_task(producer(aid, s)) for aid, s in streams.items()] active = len(producers) while active > 0: aid, event = await queue.get() if event is None: active -= 1 else: yield event await asyncio.gather(*producers, return_exceptions=True) class ChatRoomService: def __init__(self): self.dispatcher = MessageDispatcher() def _query(self, model, filters=None): db = SessionLocal() try: q = db.query(model) if filters: q = q.filter(*filters) if isinstance(filters, list) else q.filter(filters) return q, db except: db.close() raise def get_room(self, room_id: str) -> Optional[ChatRoom]: db = SessionLocal() try: return db.query(ChatRoom).filter(ChatRoom.id == room_id).first() finally: db.close() def get_room_agents(self, room_id: str) -> List[BaseAgent]: db = SessionLocal() try: rps = db.query(RoomParticipant).filter( RoomParticipant.room_id == room_id, RoomParticipant.is_active == True, RoomParticipant.agent_id != None ).all() agents = [] for rp in rps: agent_db = db.query(Agent).filter(Agent.id == rp.agent_id, Agent.is_active == True).first() if agent_db: agents.append(BaseAgent.from_model(agent_db)) return sorted(agents, key=lambda a: a.priority) finally: db.close() def get_agent(self, agent_id: str) -> Optional[BaseAgent]: db = SessionLocal() try: agent_db = db.query(Agent).filter(Agent.id == agent_id).first() return BaseAgent.from_model(agent_db) if agent_db else None finally: db.close() def list_rooms(self, user_id: int = None) -> List[Dict]: db = SessionLocal() try: q = db.query(ChatRoom) if user_id: q = q.filter(ChatRoom.owner_id == user_id) return [r.to_dict(include_agents=True) for r in q.order_by(ChatRoom.updated_at.desc()).all()] finally: db.close() def create_room(self, name: str, owner_id: int, description: str = None, agent_ids: List[str] = None) -> Dict: db = SessionLocal() try: room = ChatRoom(id=str(uuid.uuid4()), name=name, description=description, owner_id=owner_id) db.add(room) for agent_id in (agent_ids or []): db.add(RoomParticipant(id=str(uuid.uuid4()), room_id=room.id, agent_id=agent_id)) db.commit() return room.to_dict(include_agents=True) finally: db.close() def update_room(self, room_id: str, **kwargs) -> Optional[Dict]: db = SessionLocal() try: room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() if not room: return None for key, value in kwargs.items(): if value is not None and hasattr(room, key): setattr(room, key, value) db.commit() return room.to_dict(include_agents=True) finally: db.close() def delete_room(self, room_id: str) -> bool: db = SessionLocal() try: room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() if room: db.delete(room) db.commit() return True return False finally: db.close() def add_participant(self, room_id: str, agent_id: str = None, user_id: int = None, role: str = "member") -> bool: db = SessionLocal() try: filter_cond = RoomParticipant.room_id == room_id if agent_id: filter_cond = filter_cond & (RoomParticipant.agent_id == agent_id) if user_id: filter_cond = filter_cond & (RoomParticipant.user_id == user_id) existing = db.query(RoomParticipant).filter(filter_cond).first() if existing: existing.is_active = True else: db.add(RoomParticipant(id=str(uuid.uuid4()), room_id=room_id, agent_id=agent_id, user_id=user_id, role=role)) db.commit() return True finally: db.close() def remove_participant(self, room_id: str, participant_id: str) -> bool: db = SessionLocal() try: rp = db.query(RoomParticipant).filter( RoomParticipant.room_id == room_id, RoomParticipant.id == participant_id ).first() if rp: rp.is_active = False db.commit() return True return False finally: db.close() def get_messages(self, room_id: str, limit: int = 50, before_id: str = None) -> List[Dict]: db = SessionLocal() try: q = db.query(Message).filter( Message.room_id == room_id ).order_by(Message.created_at.desc()) if before_id: before = db.query(Message).filter(Message.id == before_id).first() if before: q = q.filter(Message.created_at < before.created_at) return [m.to_dict() for m in reversed(q.limit(limit).all())] finally: db.close() def save_message(self, room_id: str, sender_type: str, sender_id: str, sender_name: str, content: str, mentions: List[str] = None, token_count: int = 0) -> Dict: db = SessionLocal() try: msg = Message( id=str(uuid.uuid4()), room_id=room_id, participant_id=f"{sender_type}:{sender_id}", role="assistant" if sender_type == "agent" else "user", sender_name=sender_name, content=content, mentions=json.dumps(mentions) if mentions else None, token_count=token_count ) db.add(msg) room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() if room: from datetime import datetime room.updated_at = datetime.now() db.commit() return msg.to_dict() finally: db.close() async def process_message(self, room_id: str, user_message: str, user_id: str, _user_name: str = None, context: Dict = None): room = self.get_room(room_id) if not room: yield {"event": "error", "data": {"content": "Chat room not found"}} return room_agents = self.get_room_agents(room_id) if not room_agents: yield {"event": "error", "data": {"content": "No agents available"}} return result = self.dispatcher.dispatch(user_message, room_agents, user_id) if not result.should_respond: yield {"event": "no_response", "data": {"message": "No agents triggered"}} return messages = self.get_messages(room_id, limit=20) streams = {a.agent_id: a.stream_response(user_message, messages, context) for a in result.triggered_agents} aggregator = ResponseAggregator(room_id) async for event in aggregator.aggregate_stream(streams): yield event chat_room_service = ChatRoomService() dispatcher = chat_room_service.dispatcher