diff --git a/dashboard/src/components/AppHeader.vue b/dashboard/src/components/AppHeader.vue index 8e60ba0..3877473 100644 --- a/dashboard/src/components/AppHeader.vue +++ b/dashboard/src/components/AppHeader.vue @@ -32,6 +32,10 @@ const navItems = [ path: '/conversations', icon: `` }, + { + path: '/chat-rooms', + icon: `` + }, { path: '/tools', icon: `` diff --git a/dashboard/src/components/MessageBubble.vue b/dashboard/src/components/MessageBubble.vue index 400270b..8ef6853 100644 --- a/dashboard/src/components/MessageBubble.vue +++ b/dashboard/src/components/MessageBubble.vue @@ -1,8 +1,11 @@ + + diff --git a/luxx/__init__.py b/luxx/__init__.py index bd1e09d..3075c3a 100644 --- a/luxx/__init__.py +++ b/luxx/__init__.py @@ -15,7 +15,7 @@ logger = logging.getLogger(__name__) async def lifespan(app: FastAPI): """Application lifespan manager""" # Import all models to ensure they are registered with Base - from luxx.models import User, Conversation, Message, Project, LLMProvider # noqa + from luxx.models import User, Conversation, Message, Project, LLMProvider, ChatRoom, RoomAgent # noqa init_db() # Create default test user if not exists diff --git a/luxx/models.py b/luxx/models.py index 374e571..baca411 100644 --- a/luxx/models.py +++ b/luxx/models.py @@ -18,21 +18,19 @@ class LLMProvider(Base): id: Mapped[int] = mapped_column(Integer, primary_key=True) user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) name: Mapped[str] = mapped_column(String(100), nullable=False) - provider_type: Mapped[str] = mapped_column(String(50), nullable=False, default="openai") # openai, deepseek, glm, etc. + provider_type: Mapped[str] = mapped_column(String(50), nullable=False, default="openai") base_url: Mapped[str] = mapped_column(String(500), nullable=False) api_key: Mapped[str] = mapped_column(String(500), nullable=False) default_model: Mapped[str] = mapped_column(String(100), nullable=False, default="gpt-4") - max_tokens: Mapped[int] = mapped_column(Integer, default=8192) # 默认 8192 + max_tokens: Mapped[int] = mapped_column(Integer, default=8192) is_default: Mapped[bool] = mapped_column(Boolean, default=False) enabled: Mapped[bool] = mapped_column(Boolean, default=True) created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) - # Relationships user: Mapped["User"] = relationship("User", backref="llm_providers") def to_dict(self, include_key: bool = False): - """Convert to dictionary, optionally include API key""" result = { "id": self.id, "user_id": self.user_id, @@ -62,7 +60,6 @@ class Project(Base): created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) - # Relationships user: Mapped["User"] = relationship("User", backref="projects") @@ -75,12 +72,11 @@ class User(Base): email: Mapped[Optional[str]] = mapped_column(String(120), unique=True, nullable=True) password_hash: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) role: Mapped[str] = mapped_column(String(20), default="user") - permission_level: Mapped[int] = mapped_column(Integer, default=1) # 1=READ_ONLY, 2=WRITE, 3=EXECUTE, 4=ADMIN - workspace_path: Mapped[Optional[str]] = mapped_column(String(500), nullable=True) # 用户工作空间路径 + permission_level: Mapped[int] = mapped_column(Integer, default=1) + workspace_path: Mapped[Optional[str]] = mapped_column(String(500), nullable=True) is_active: Mapped[bool] = mapped_column(Boolean, default=True) created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - # Relationships conversations: Mapped[List["Conversation"]] = relationship( "Conversation", back_populates="user", cascade="all, delete-orphan" ) @@ -115,11 +111,11 @@ class Conversation(Base): created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) - # Relationships user: Mapped["User"] = relationship("User", back_populates="conversations") provider: Mapped[Optional["LLMProvider"]] = relationship("LLMProvider") messages: Mapped[List["Message"]] = relationship( - "Message", back_populates="conversation", cascade="all, delete-orphan" + "Message", back_populates="conversation", cascade="all, delete-orphan", + primaryjoin="Conversation.id == foreign(Message.conversation_id)" ) def to_dict(self): @@ -142,84 +138,148 @@ class Conversation(Base): class Message(Base): """Message model - content 字段统一使用 JSON 格式存储: - - **User 消息:** - { - "text": "用户输入的文本内容", - "attachments": [ - {"name": "utils.py", "extension": "py", "content": "..."} - ] - } - - **Assistant 消息:** - { - "steps": [ // 有序步骤,用于渲染(主要数据源) - {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}, - {"id": "step-1", "index": 1, "type": "text", "content": "..."}, - {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."}, - {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "..."} - ] - } - - 注意:to_dict() 返回时会从 steps 动态计算 text 和 content 字段。 + 同时服务于普通会话和聊天室: + - 普通会话:conversation_id 非空,room_id 为空 + - 聊天室:room_id 非空,conversation_id 为空,sender_name/sender_color/round_number 有值 """ __tablename__ = "messages" id: Mapped[str] = mapped_column(String(64), primary_key=True) - conversation_id: Mapped[str] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=False) - role: Mapped[str] = mapped_column(String(16), nullable=False) # user, assistant, system, tool + conversation_id: Mapped[Optional[str]] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=True) + room_id: Mapped[Optional[str]] = mapped_column(String(64), ForeignKey("chat_rooms.id"), nullable=True) + role: Mapped[str] = mapped_column(String(16), nullable=False) content: Mapped[str] = mapped_column(Text, nullable=False, default="") token_count: Mapped[int] = mapped_column(Integer, default=0) - usage: Mapped[Optional[str]] = mapped_column(Text, nullable=True) # JSON string for usage info + usage: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + # 聊天室专属字段(普通会话为空) + sender_name: Mapped[Optional[str]] = mapped_column(String(100), nullable=True) + sender_color: Mapped[Optional[str]] = mapped_column(String(7), nullable=True, default="#2563eb") + round_number: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - # Relationships - conversation: Mapped["Conversation"] = relationship("Conversation", back_populates="messages") + conversation: Mapped[Optional["Conversation"]] = relationship("Conversation", back_populates="messages") + room: Mapped[Optional["ChatRoom"]] = relationship("ChatRoom", back_populates="messages") def to_dict(self): - """Convert to dictionary, extracting process_steps for frontend""" import json result = { "id": self.id, "conversation_id": self.conversation_id, + "room_id": self.room_id, "role": self.role, "token_count": self.token_count, "created_at": self.created_at.isoformat() if self.created_at else None } - # Parse usage JSON if self.usage: try: result["usage"] = json.loads(self.usage) except json.JSONDecodeError: result["usage"] = None - # Parse content JSON + # 聊天室专属字段 + if self.sender_name: + result["sender_name"] = self.sender_name + if self.sender_color: + result["sender_color"] = self.sender_color + if self.round_number is not None: + result["round_number"] = self.round_number + try: content_obj = json.loads(self.content) if self.content else {} except json.JSONDecodeError: - # Legacy plain text content result["content"] = self.content result["text"] = self.content result["attachments"] = [] result["process_steps"] = [] return result - # Extract steps as process_steps for frontend rendering steps = content_obj.get("steps", []) result["process_steps"] = steps - # Extract text from steps (concatenate all text type steps) text_content = "".join( s.get("content", "") for s in steps if s.get("type") == "text" ) result["text"] = text_content - result["content"] = text_content # Alias for convenience - - # Extract attachments + result["content"] = text_content result["attachments"] = content_obj.get("attachments", []) return result + + +# ============ Chat Room Models ============ + +class ChatRoom(Base): + """Multi-agent chat room model""" + __tablename__ = "chat_rooms" + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) + title: Mapped[str] = mapped_column(String(255), nullable=False) + task: Mapped[str] = mapped_column(Text, nullable=False, default="") + status: Mapped[str] = mapped_column(String(20), nullable=False, default="idle") # idle, running, paused, completed, error + max_rounds: Mapped[int] = mapped_column(Integer, default=5) + current_round: Mapped[int] = mapped_column(Integer, default=0) + created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) + updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) + + user: Mapped["User"] = relationship("User", backref="chat_rooms") + agents: Mapped[List["RoomAgent"]] = relationship( + "RoomAgent", back_populates="room", cascade="all, delete-orphan", order_by="RoomAgent.turn_order" + ) + messages: Mapped[List["Message"]] = relationship( + "Message", back_populates="room", cascade="all, delete-orphan", + primaryjoin="ChatRoom.id == foreign(Message.room_id)", + order_by="Message.created_at" + ) + + def to_dict(self, include_messages: bool = False): + result = { + "id": self.id, + "user_id": self.user_id, + "title": self.title, + "task": self.task, + "status": self.status, + "max_rounds": self.max_rounds, + "current_round": self.current_round, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None, + "agents": [a.to_dict() for a in self.agents] + } + if include_messages: + result["messages"] = [m.to_dict() for m in self.messages] + return result + + +class RoomAgent(Base): + """Agent configuration in a chat room""" + __tablename__ = "room_agents" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + room_id: Mapped[str] = mapped_column(String(64), ForeignKey("chat_rooms.id"), nullable=False) + name: Mapped[str] = mapped_column(String(100), nullable=False) + role: Mapped[str] = mapped_column(String(255), nullable=False, default="") + provider_id: Mapped[Optional[int]] = mapped_column(Integer, ForeignKey("llm_providers.id"), nullable=True) + model: Mapped[str] = mapped_column(String(100), nullable=False, default="") + system_prompt: Mapped[str] = mapped_column(Text, nullable=False, default="You are a helpful AI assistant.") + color: Mapped[str] = mapped_column(String(7), nullable=False, default="#2563eb") + turn_order: Mapped[int] = mapped_column(Integer, default=0) + + room: Mapped["ChatRoom"] = relationship("ChatRoom", back_populates="agents") + provider: Mapped[Optional["LLMProvider"]] = relationship("LLMProvider") + + def to_dict(self): + return { + "id": self.id, + "room_id": self.room_id, + "name": self.name, + "role": self.role, + "provider_id": self.provider_id, + "model": self.model, + "system_prompt": self.system_prompt, + "color": self.color, + "turn_order": self.turn_order + } + diff --git a/luxx/routes/__init__.py b/luxx/routes/__init__.py index c6519c8..9b7b74d 100644 --- a/luxx/routes/__init__.py +++ b/luxx/routes/__init__.py @@ -1,7 +1,7 @@ """API routes module""" from fastapi import APIRouter -from luxx.routes import auth, conversations, messages, tools, providers +from luxx.routes import auth, conversations, messages, tools, providers, chat_rooms api_router = APIRouter() @@ -12,3 +12,4 @@ api_router.include_router(conversations.router) api_router.include_router(messages.router) api_router.include_router(tools.router) api_router.include_router(providers.router) +api_router.include_router(chat_rooms.router) diff --git a/luxx/routes/chat_rooms.py b/luxx/routes/chat_rooms.py new file mode 100644 index 0000000..59c6f15 --- /dev/null +++ b/luxx/routes/chat_rooms.py @@ -0,0 +1,413 @@ +"""Chat room routes for multi-agent conversations""" +from typing import Optional, List +from fastapi import APIRouter, Depends +from fastapi.responses import StreamingResponse +from pydantic import BaseModel +from sqlalchemy.orm import Session +from datetime import datetime + +from luxx.database import get_db, SessionLocal +from luxx.models import ChatRoom, RoomAgent, Message, LLMProvider, User +from luxx.routes.auth import get_current_user +from luxx.services.chat_room import orchestrator +from luxx.utils.helpers import generate_id, success_response, error_response, paginate + +router = APIRouter(prefix="/chat-rooms", tags=["Chat Rooms"]) + + +# ============ Request Models ============ + +class AgentConfig(BaseModel): + name: str + role: str = "" + provider_id: Optional[int] = None + model: str = "" + system_prompt: str = "You are a helpful AI assistant." + color: str = "#2563eb" + + +class ChatRoomCreate(BaseModel): + title: str + task: str + max_rounds: int = 5 + agents: List[AgentConfig] = [] + + +class ChatRoomUpdate(BaseModel): + title: Optional[str] = None + task: Optional[str] = None + max_rounds: Optional[int] = None + status: Optional[str] = None + + +class AgentCreate(BaseModel): + name: str + role: str = "" + provider_id: Optional[int] = None + model: str = "" + system_prompt: str = "You are a helpful AI assistant." + color: str = "#2563eb" + + +class AgentUpdate(BaseModel): + name: Optional[str] = None + role: Optional[str] = None + provider_id: Optional[int] = None + model: Optional[str] = None + system_prompt: Optional[str] = None + color: Optional[str] = None + turn_order: Optional[int] = None + + +# ============ Room CRUD ============ + +@router.get("/", response_model=dict) +def list_rooms( + page: int = 1, + page_size: int = 20, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """List chat rooms""" + query = db.query(ChatRoom).filter(ChatRoom.user_id == current_user.id) + result = paginate(query.order_by(ChatRoom.updated_at.desc()), page, page_size) + return success_response(data={ + "items": [r.to_dict() for r in result["items"]], + "total": result["total"], + "page": result["page"], + "page_size": result["page_size"] + }) + + +@router.post("/", response_model=dict) +def create_room( + data: ChatRoomCreate, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Create a chat room with agents""" + room = ChatRoom( + id=generate_id("room"), + user_id=current_user.id, + title=data.title, + task=data.task, + max_rounds=data.max_rounds + ) + db.add(room) + db.flush() + + for i, agent_cfg in enumerate(data.agents): + # Resolve model from provider if not specified + model = agent_cfg.model + provider_id = agent_cfg.provider_id + if provider_id and not model: + provider = db.query(LLMProvider).filter( + LLMProvider.id == provider_id, + LLMProvider.user_id == current_user.id + ).first() + if provider: + model = provider.default_model + if not model: + # Use default provider + default_provider = db.query(LLMProvider).filter( + LLMProvider.user_id == current_user.id, + LLMProvider.is_default == True + ).first() + if default_provider: + provider_id = default_provider.id + model = default_provider.default_model + if not model: + model = "gpt-4" + + agent = RoomAgent( + room_id=room.id, + name=agent_cfg.name, + role=agent_cfg.role, + provider_id=provider_id, + model=model, + system_prompt=agent_cfg.system_prompt, + color=agent_cfg.color, + turn_order=i + ) + db.add(agent) + + db.commit() + db.refresh(room) + return success_response(data=room.to_dict(include_messages=False), message="Room created") + + +@router.get("/{room_id}", response_model=dict) +def get_room( + room_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Get room details with agents""" + room = db.query(ChatRoom).filter( + ChatRoom.id == room_id, + ChatRoom.user_id == current_user.id + ).first() + if not room: + return error_response("Room not found", 404) + + result = room.to_dict(include_messages=False) + # Also get message count + msg_count = db.query(Message).filter(Message.room_id == room_id).count() + result["message_count"] = msg_count + return success_response(data=result) + + +@router.put("/{room_id}", response_model=dict) +def update_room( + room_id: str, + data: ChatRoomUpdate, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Update room""" + room = db.query(ChatRoom).filter( + ChatRoom.id == room_id, + ChatRoom.user_id == current_user.id + ).first() + if not room: + return error_response("Room not found", 404) + + if room.status == "running": + return error_response("Cannot update a running room", 400) + + update_data = data.dict(exclude_unset=True) + for key, value in update_data.items(): + setattr(room, key, value) + + db.commit() + db.refresh(room) + return success_response(data=room.to_dict(), message="Room updated") + + +@router.delete("/{room_id}", response_model=dict) +def delete_room( + room_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Delete room""" + room = db.query(ChatRoom).filter( + ChatRoom.id == room_id, + ChatRoom.user_id == current_user.id + ).first() + if not room: + return error_response("Room not found", 404) + + if room.status == "running": + return error_response("Cannot delete a running room. Stop it first.", 400) + + db.delete(room) + db.commit() + return success_response(message="Room deleted") + + +# ============ Room Actions ============ + +@router.post("/{room_id}/start") +async def start_room( + room_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Start the multi-agent conversation as SSE stream""" + room = db.query(ChatRoom).filter( + ChatRoom.id == room_id, + ChatRoom.user_id == current_user.id + ).first() + if not room: + return error_response("Room not found", 404) + + if room.status == "running": + return error_response("Room is already running", 400) + + async def event_generator(): + async for sse_str in orchestrator.run_room(room_id): + yield sse_str + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no" + } + ) + + +@router.post("/{room_id}/stop", response_model=dict) +def stop_room( + room_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Stop a running room""" + room = db.query(ChatRoom).filter( + ChatRoom.id == room_id, + ChatRoom.user_id == current_user.id + ).first() + if not room: + return error_response("Room not found", 404) + + orchestrator.cancel(room_id) + room.status = "paused" + db.commit() + return success_response(message="Room stopped") + + +@router.post("/{room_id}/reset", response_model=dict) +def reset_room( + room_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Reset room to initial state, clearing all messages""" + room = db.query(ChatRoom).filter( + ChatRoom.id == room_id, + ChatRoom.user_id == current_user.id + ).first() + if not room: + return error_response("Room not found", 404) + + if room.status == "running": + return error_response("Cannot reset a running room", 400) + + # Delete all messages in this room + db.query(Message).filter(Message.room_id == room_id).delete() + room.status = "idle" + room.current_round = 0 + db.commit() + return success_response(message="Room reset") + + +# ============ Messages ============ + +@router.get("/{room_id}/messages", response_model=dict) +def get_room_messages( + room_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Get all messages in a room""" + room = db.query(ChatRoom).filter( + ChatRoom.id == room_id, + ChatRoom.user_id == current_user.id + ).first() + if not room: + return error_response("Room not found", 404) + + messages = db.query(Message).filter( + Message.room_id == room_id + ).order_by(Message.created_at).all() + + return success_response(data={ + "messages": [m.to_dict() for m in messages], + "room": room.to_dict() + }) + + +# ============ Agent CRUD ============ + +@router.post("/{room_id}/agents", response_model=dict) +def add_agent( + room_id: str, + data: AgentCreate, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Add an agent to a room""" + room = db.query(ChatRoom).filter( + ChatRoom.id == room_id, + ChatRoom.user_id == current_user.id + ).first() + if not room: + return error_response("Room not found", 404) + + if room.status == "running": + return error_response("Cannot modify agents while room is running", 400) + + # Get max turn_order + max_order = db.query(RoomAgent).filter( + RoomAgent.room_id == room_id + ).count() + + model = data.model + provider_id = data.provider_id + if provider_id and not model: + provider = db.query(LLMProvider).filter(LLMProvider.id == provider_id).first() + if provider: + model = provider.default_model + if not model: + model = "gpt-4" + + agent = RoomAgent( + room_id=room_id, + name=data.name, + role=data.role, + provider_id=provider_id, + model=model, + system_prompt=data.system_prompt, + color=data.color, + turn_order=max_order + ) + db.add(agent) + db.commit() + db.refresh(agent) + return success_response(data=agent.to_dict(), message="Agent added") + + +@router.put("/{room_id}/agents/{agent_id}", response_model=dict) +def update_agent( + room_id: str, + agent_id: int, + data: AgentUpdate, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Update an agent""" + agent = db.query(RoomAgent).filter( + RoomAgent.id == agent_id, + RoomAgent.room_id == room_id + ).first() + if not agent: + return error_response("Agent not found", 404) + + room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() + if room and room.status == "running": + return error_response("Cannot modify agents while room is running", 400) + + update_data = data.dict(exclude_unset=True) + for key, value in update_data.items(): + setattr(agent, key, value) + + db.commit() + return success_response(data=agent.to_dict(), message="Agent updated") + + +@router.delete("/{room_id}/agents/{agent_id}", response_model=dict) +def delete_agent( + room_id: str, + agent_id: int, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Remove an agent from a room""" + agent = db.query(RoomAgent).filter( + RoomAgent.id == agent_id, + RoomAgent.room_id == room_id + ).first() + if not agent: + return error_response("Agent not found", 404) + + room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() + if room and room.status == "running": + return error_response("Cannot remove agents while room is running", 400) + + db.delete(agent) + db.commit() + return success_response(message="Agent removed") diff --git a/luxx/services/chat_room.py b/luxx/services/chat_room.py new file mode 100644 index 0000000..e6f4967 --- /dev/null +++ b/luxx/services/chat_room.py @@ -0,0 +1,288 @@ +"""Multi-agent chat room service. + +Orchestrates multiple agents taking turns to discuss and solve a task. +Each agent uses its own LLM provider/model and system prompt. +""" +import json +import logging +import asyncio +import traceback +from typing import List, Dict, Any, AsyncGenerator, Optional + +from luxx.database import SessionLocal +from luxx.models import ChatRoom, RoomAgent, Message, LLMProvider +from luxx.services.llm_client import LLMClient +from luxx.services.stream_context import StreamState, StepType +from luxx.services.events import sse_event +from luxx.utils.helpers import generate_id + +logger = logging.getLogger(__name__) + + +class ChatRoomOrchestrator: + """Orchestrates multi-agent conversations in a chat room.""" + + def __init__(self): + self._running_rooms: Dict[str, asyncio.Task] = {} + + def is_running(self, room_id: str) -> bool: + return room_id in self._running_rooms and not self._running_rooms[room_id].done() + + def cancel(self, room_id: str): + task = self._running_rooms.get(room_id) + if task and not task.done(): + task.cancel() + + async def run_room( + self, + room_id: str, + db_session=None + ) -> AsyncGenerator[str, None]: + """Run a chat room: agents take turns discussing the task.""" + db = db_session or SessionLocal() + own_session = db_session is None + + try: + room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() + if not room: + yield sse_event("error", {"content": "Room not found"}) + return + + agents = db.query(RoomAgent).filter( + RoomAgent.room_id == room_id + ).order_by(RoomAgent.turn_order).all() + + if not agents: + yield sse_event("error", {"content": "No agents in room"}) + return + + room.status = "running" + db.commit() + + # Yield room started event + yield sse_event("room_started", {"room_id": room_id, "task": room.task}) + + # Build conversation history from existing messages + history = self._load_history(room_id, db) + + # If no messages yet, add the task as the initial user message + if not history: + task_msg = Message( + id=generate_id("msg"), + room_id=room_id, + role="user", + content=json.dumps({"text": room.task}, ensure_ascii=False), + sender_name="用户", + sender_color="#10b981", + round_number=0 + ) + db.add(task_msg) + db.commit() + history.append({"role": "user", "content": room.task}) + yield sse_event("message", task_msg.to_dict()) + + # Run rounds + for round_num in range(room.current_round + 1, room.max_rounds + 1): + room.current_round = round_num + db.commit() + + yield sse_event("round_start", { + "round": round_num, + "max_rounds": room.max_rounds + }) + + for agent in agents: + try: + async for event in self._agent_turn( + room_id, agent, history, round_num, db + ): + yield event + except asyncio.CancelledError: + room.status = "paused" + db.commit() + yield sse_event("room_paused", {"room_id": room_id, "round": round_num}) + return + except Exception as e: + logger.error(f"Agent {agent.name} error: {e}\n{traceback.format_exc()}") + yield sse_event("agent_error", { + "agent": agent.name, + "error": str(e) + }) + + yield sse_event("round_end", {"round": round_num}) + + # Completed + room.status = "completed" + db.commit() + yield sse_event("room_completed", { + "room_id": room_id, + "total_rounds": room.max_rounds + }) + + except asyncio.CancelledError: + room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() + if room: + room.status = "paused" + db.commit() + yield sse_event("room_paused", {"room_id": room_id}) + except Exception as e: + logger.error(f"Room error: {e}\n{traceback.format_exc()}") + room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() + if room: + room.status = "error" + db.commit() + yield sse_event("error", {"content": str(e)}) + finally: + if own_session: + db.close() + self._running_rooms.pop(room_id, None) + + async def _agent_turn( + self, + room_id: str, + agent: RoomAgent, + history: List[Dict], + round_num: int, + db + ) -> AsyncGenerator[str, None]: + """Execute one agent's turn in the conversation.""" + # Get LLM client for this agent + llm, max_tokens = self._create_llm_client(agent, db) + if not llm: + yield sse_event("agent_error", { + "agent": agent.name, + "error": "No LLM provider configured" + }) + return + + model = agent.model or llm.default_model or "gpt-4" + + # Build messages for this agent + messages = self._build_agent_messages(agent, history) + + # Call LLM (non-streaming for simplicity in multi-agent context) + try: + response = await llm.async_sync_call( + model=model, + messages=messages, + temperature=0.7, + max_tokens=max_tokens or 2000 + ) + except Exception as e: + logger.error(f"LLM call failed for {agent.name}: {e}") + yield sse_event("agent_error", { + "agent": agent.name, + "error": f"LLM call failed: {str(e)}" + }) + return + + content = response.get("content", "") + usage = response.get("usage", {}) + token_count = usage.get("total_tokens", len(content) // 4) + + # Build steps for storage (compatible with Message content format) + steps = [{"id": "step-0", "index": 0, "type": "text", "content": content}] + content_json = {"steps": steps} + + # Save message + msg = Message( + id=generate_id("msg"), + room_id=room_id, + role="assistant", + content=json.dumps(content_json, ensure_ascii=False), + token_count=token_count, + usage=json.dumps(usage) if usage else None, + sender_name=agent.name, + sender_color=agent.color, + round_number=round_num + ) + db.add(msg) + db.commit() + + # Update history + history.append({"role": "assistant", "content": content, "sender": agent.name}) + + # Yield message event + msg_dict = msg.to_dict() + yield sse_event("message", msg_dict) + + # Close client + await llm.close() + + def _create_llm_client(self, agent: RoomAgent, db) -> tuple: + """Create LLM client for an agent.""" + if agent.provider_id: + provider = db.query(LLMProvider).filter( + LLMProvider.id == agent.provider_id + ).first() + if provider: + client = LLMClient( + api_key=provider.api_key, + api_url=provider.base_url, + model=agent.model or provider.default_model, + provider_type=provider.provider_type + ) + return client, provider.max_tokens + + return None, None + + def _build_agent_messages(self, agent: RoomAgent, history: List[Dict]) -> List[Dict]: + """Build the message list for an agent's LLM call.""" + messages = [{"role": "system", "content": agent.system_prompt}] + + for h in history: + role = h.get("role", "user") + content = h.get("content", "") + sender = h.get("sender", "") + + if role == "user": + messages.append({"role": "user", "content": content}) + elif role == "assistant": + # Prefix with sender name so the agent knows who said what + prefix = f"[{sender}]: " if sender else "" + messages.append({"role": "assistant", "content": prefix + content}) + + return messages + + def _load_history(self, room_id: str, db) -> List[Dict]: + """Load conversation history from existing room messages.""" + messages = db.query(Message).filter( + Message.room_id == room_id + ).order_by(Message.created_at).all() + + history = [] + for msg in messages: + # Extract text from message content + text = self._extract_text(msg.content) + entry = {"role": msg.role, "content": text} + if msg.sender_name and msg.role == "assistant": + entry["sender"] = msg.sender_name + history.append(entry) + + return history + + @staticmethod + def _extract_text(content: str) -> str: + """Extract text from message content JSON.""" + if not content: + return "" + try: + parsed = json.loads(content) + if isinstance(parsed, dict): + # Try steps-based format + steps = parsed.get("steps", []) + if steps: + return "".join( + s.get("content", "") for s in steps + if s.get("type") == "text" + ) + # Try simple text format + if "text" in parsed: + return parsed["text"] + return content + except (json.JSONDecodeError, TypeError): + return content + + +# Singleton orchestrator +orchestrator = ChatRoomOrchestrator()