Luxx/luxx/services/room.py

362 lines
13 KiB
Python

"""Chat Room Service - orchestrates multi-agent chat"""
import json
import re
import uuid
import asyncio
import logging
from typing import List, Dict, Any, Optional, AsyncGenerator
from dataclasses import dataclass
from luxx.core.database import SessionLocal
from luxx.models.room import ChatRoom, Agent, ChatRoomAgent, ChatRoomMessage
from luxx.agents.base import BaseAgent
logger = logging.getLogger(__name__)
# ==================== Dispatcher ====================
@dataclass
class DispatchResult:
"""Result of message dispatch"""
triggered_agents: List[BaseAgent]
mentions: List[str]
should_respond: bool
class MessageDispatcher:
"""Dispatcher for routing messages to agents"""
@staticmethod
def parse_mentions(content: str) -> List[str]:
"""Parse @mentions from message content"""
pattern = r'@(\w+)'
return re.findall(pattern, content)
@staticmethod
def get_agents_by_names(names: List[str], room_agents: List[BaseAgent]) -> List[BaseAgent]:
"""Get agents by their names (case-insensitive)"""
name_lower_map = {a.name.lower(): a for a in room_agents}
matched = []
for name in names:
agent = name_lower_map.get(name.lower())
if agent:
matched.append(agent)
return matched
@staticmethod
def get_agents_by_ids(agent_ids: List[str], room_agents: List[BaseAgent]) -> List[BaseAgent]:
"""Get agents by their IDs"""
id_set = set(agent_ids)
return [a for a in room_agents if a.agent_id in id_set]
def dispatch(self, content: str, room_agents: List[BaseAgent], sender_id: str, sender_type: str = "user") -> DispatchResult:
"""Dispatch a message to appropriate agents."""
available_agents = [a for a in room_agents if a.agent_id != sender_id]
mentions = self.parse_mentions(content)
if mentions:
triggered = self.get_agents_by_names(mentions, available_agents)
logger.info(f"Message with mentions: {mentions} -> triggered: {[a.name for a in triggered]}")
return DispatchResult(triggered_agents=triggered, mentions=mentions, should_respond=len(triggered) > 0)
auto_agents = [a for a in available_agents if a.auto_response]
auto_agents.sort(key=lambda a: a.priority)
logger.info(f"Auto-response agents triggered: {[a.name for a in auto_agents]}")
return DispatchResult(triggered_agents=auto_agents, mentions=[], should_respond=len(auto_agents) > 0)
# ==================== Aggregator ====================
class ResponseAggregator:
"""Aggregates responses from multiple agents"""
def __init__(self, room_id: str):
self.room_id = room_id
self._agent_responses: Dict[str, Dict[str, Any]] = {}
async def aggregate_stream(self, agent_streams: Dict[str, AsyncGenerator]) -> AsyncGenerator[Dict[str, Any], None]:
"""Aggregate streaming responses from multiple agents."""
if not agent_streams:
return
async def collect_agent_stream(agent_id: str, stream):
try:
async for event in stream:
event["agent_id"] = agent_id
yield event
except Exception as e:
logger.error(f"Agent {agent_id} stream error: {e}")
yield {"event": "error", "agent_id": agent_id, "data": {"content": str(e)}}
tasks = [collect_agent_stream(agent_id, stream) for agent_id, stream in agent_streams.items()]
async for event in asyncio.merge(*tasks):
yield event
def aggregate_final(self, responses: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Aggregate final responses from agents."""
results = []
for agent_id, response in responses.items():
if response.get("event") == "done":
results.append({
"agent_id": agent_id,
"agent_name": response.get("agent_name"),
"message_id": response.get("message_id"),
"content": response.get("content"),
"token_count": response.get("token_count", 0)
})
return results
# ==================== Chat Room Service ====================
class ChatRoomService:
"""Service for managing chat rooms with multi-agent support"""
def __init__(self):
self.dispatcher = MessageDispatcher()
def get_room(self, room_id: str) -> Optional[ChatRoom]:
"""Get a chat room by ID"""
db = SessionLocal()
try:
return db.query(ChatRoom).filter(ChatRoom.id == room_id).first()
finally:
db.close()
def get_room_agents(self, room_id: str) -> List[BaseAgent]:
"""Get all active agents in a chat room"""
db = SessionLocal()
try:
room_agents = db.query(ChatRoomAgent).filter(
ChatRoomAgent.chat_room_id == room_id,
ChatRoomAgent.is_active == True
).all()
agents = []
for ra in room_agents:
agent_db = db.query(Agent).filter(Agent.id == ra.agent_id, Agent.is_active == True).first()
if agent_db:
agents.append(BaseAgent.from_model(agent_db))
agents.sort(key=lambda a: a.priority)
return agents
finally:
db.close()
def get_agent(self, agent_id: str) -> Optional[BaseAgent]:
"""Get an agent by ID"""
db = SessionLocal()
try:
agent_db = db.query(Agent).filter(Agent.id == agent_id).first()
if agent_db:
return BaseAgent.from_model(agent_db)
return None
finally:
db.close()
def list_rooms(self, user_id: int = None, include_agents: bool = True) -> List[Dict]:
"""List all chat rooms"""
db = SessionLocal()
try:
query = db.query(ChatRoom)
if user_id:
query = query.filter(ChatRoom.owner_id == user_id)
rooms = query.order_by(ChatRoom.updated_at.desc()).all()
return [r.to_dict(include_agents=include_agents) for r in rooms]
finally:
db.close()
def create_room(self, name: str, owner_id: int, description: str = None, agent_ids: List[str] = None) -> Dict:
"""Create a new chat room"""
db = SessionLocal()
try:
room = ChatRoom(
id=str(uuid.uuid4()),
name=name,
description=description,
owner_id=owner_id
)
db.add(room)
if agent_ids:
for agent_id in agent_ids:
room_agent = ChatRoomAgent(
id=str(uuid.uuid4()),
chat_room_id=room.id,
agent_id=agent_id
)
db.add(room_agent)
db.commit()
return room.to_dict(include_agents=True)
finally:
db.close()
def update_room(self, room_id: str, name: str = None, description: str = None, is_active: bool = None) -> Optional[Dict]:
"""Update a chat room"""
db = SessionLocal()
try:
room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first()
if not room:
return None
if name is not None:
room.name = name
if description is not None:
room.description = description
if is_active is not None:
room.is_active = is_active
db.commit()
return room.to_dict(include_agents=True)
finally:
db.close()
def delete_room(self, room_id: str) -> bool:
"""Delete a chat room"""
db = SessionLocal()
try:
room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first()
if not room:
return False
db.delete(room)
db.commit()
return True
finally:
db.close()
def add_agent_to_room(self, room_id: str, agent_id: str) -> bool:
"""Add an agent to a chat room"""
db = SessionLocal()
try:
existing = db.query(ChatRoomAgent).filter(
ChatRoomAgent.chat_room_id == room_id,
ChatRoomAgent.agent_id == agent_id
).first()
if existing:
existing.is_active = True
else:
room_agent = ChatRoomAgent(
id=str(uuid.uuid4()),
chat_room_id=room_id,
agent_id=agent_id
)
db.add(room_agent)
db.commit()
return True
finally:
db.close()
def remove_agent_from_room(self, room_id: str, agent_id: str) -> bool:
"""Remove an agent from a chat room"""
db = SessionLocal()
try:
room_agent = db.query(ChatRoomAgent).filter(
ChatRoomAgent.chat_room_id == room_id,
ChatRoomAgent.agent_id == agent_id
).first()
if room_agent:
room_agent.is_active = False
db.commit()
return True
return False
finally:
db.close()
def get_messages(self, room_id: str, limit: int = 50, before_id: str = None) -> List[Dict]:
"""Get messages from a chat room"""
db = SessionLocal()
try:
query = db.query(ChatRoomMessage).filter(
ChatRoomMessage.room_id == room_id
).order_by(ChatRoomMessage.created_at.desc())
if before_id:
before_msg = db.query(ChatRoomMessage).filter(
ChatRoomMessage.id == before_id
).first()
if before_msg:
query = query.filter(ChatRoomMessage.created_at < before_msg.created_at)
messages = query.limit(limit).all()
return [m.to_dict() for m in reversed(messages)]
finally:
db.close()
def save_message(self, room_id: str, sender_type: str, sender_id: str, sender_name: str, content: str,
mentions: List[str] = None, parent_id: str = None, token_count: int = 0) -> Dict:
"""Save a message to a chat room"""
db = SessionLocal()
try:
msg = ChatRoomMessage(
id=str(uuid.uuid4()),
room_id=room_id,
sender_type=sender_type,
sender_id=sender_id,
sender_name=sender_name,
content=content,
mentions=json.dumps(mentions) if mentions else None,
parent_id=parent_id,
token_count=token_count
)
db.add(msg)
room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first()
if room:
from datetime import datetime
room.updated_at = datetime.now()
db.commit()
return msg.to_dict()
finally:
db.close()
async def process_message(self, room_id: str, user_message: str, user_id: str, user_name: str, context: Dict = None) -> AsyncGenerator[Dict[str, Any], None]:
"""Process a user message and dispatch to appropriate agents."""
room = self.get_room(room_id)
if not room:
yield {"event": "error", "data": {"content": "Chat room not found"}}
return
room_agents = self.get_room_agents(room_id)
if not room_agents:
yield {"event": "error", "data": {"content": "No agents available in this room"}}
return
dispatch_result = self.dispatcher.dispatch(
content=user_message,
room_agents=room_agents,
sender_id=user_id,
sender_type="user"
)
if not dispatch_result.should_respond:
yield {"event": "no_response", "data": {"message": "No agents triggered"}}
return
messages = self.get_messages(room_id, limit=20)
agent_streams = {}
for agent in dispatch_result.triggered_agents:
stream = agent.stream_response(
user_message=user_message,
conversation_history=messages,
context=context
)
agent_streams[agent.agent_id] = stream
aggregator = ResponseAggregator(room_id)
async for event in aggregator.aggregate_stream(agent_streams):
yield event
# Global service instance
chat_room_service = ChatRoomService()
# Export for backward compatibility
dispatcher = chat_room_service.dispatcher