diff --git a/dashboard/src/components/AppHeader.vue b/dashboard/src/components/AppHeader.vue index 3877473..8e60ba0 100644 --- a/dashboard/src/components/AppHeader.vue +++ b/dashboard/src/components/AppHeader.vue @@ -32,10 +32,6 @@ const navItems = [ path: '/conversations', icon: `` }, - { - path: '/chat-rooms', - icon: `` - }, { path: '/tools', icon: `` diff --git a/dashboard/src/components/SupervisionGraphEditor.vue b/dashboard/src/components/SupervisionGraphEditor.vue deleted file mode 100644 index e30bfa8..0000000 --- a/dashboard/src/components/SupervisionGraphEditor.vue +++ /dev/null @@ -1,1159 +0,0 @@ - - - - - diff --git a/dashboard/src/components/room/AddToRoomModal.vue b/dashboard/src/components/room/AddToRoomModal.vue deleted file mode 100644 index b83f9a8..0000000 --- a/dashboard/src/components/room/AddToRoomModal.vue +++ /dev/null @@ -1,166 +0,0 @@ - - - - - diff --git a/dashboard/src/components/room/AgentFormModal.vue b/dashboard/src/components/room/AgentFormModal.vue deleted file mode 100644 index 01fb67f..0000000 --- a/dashboard/src/components/room/AgentFormModal.vue +++ /dev/null @@ -1,377 +0,0 @@ - - - - - diff --git a/dashboard/src/components/room/CreateRoomModal.vue b/dashboard/src/components/room/CreateRoomModal.vue deleted file mode 100644 index dfe8e3f..0000000 --- a/dashboard/src/components/room/CreateRoomModal.vue +++ /dev/null @@ -1,381 +0,0 @@ - - - - - diff --git a/dashboard/src/components/room/RoomMessageList.vue b/dashboard/src/components/room/RoomMessageList.vue deleted file mode 100644 index a208453..0000000 --- a/dashboard/src/components/room/RoomMessageList.vue +++ /dev/null @@ -1,399 +0,0 @@ - - - - - diff --git a/dashboard/src/components/room/RoomSidebar.vue b/dashboard/src/components/room/RoomSidebar.vue deleted file mode 100644 index 8ed7f29..0000000 --- a/dashboard/src/components/room/RoomSidebar.vue +++ /dev/null @@ -1,695 +0,0 @@ - - - - - diff --git a/dashboard/src/components/room/RoomToolbar.vue b/dashboard/src/components/room/RoomToolbar.vue deleted file mode 100644 index f7ac190..0000000 --- a/dashboard/src/components/room/RoomToolbar.vue +++ /dev/null @@ -1,423 +0,0 @@ - - - - - diff --git a/dashboard/src/router/index.js b/dashboard/src/router/index.js index 82dd97d..9e41d99 100644 --- a/dashboard/src/router/index.js +++ b/dashboard/src/router/index.js @@ -32,12 +32,6 @@ const routes = [ component: () => import('../views/ToolsView.vue'), meta: { requiresAuth: true } }, - { - path: '/chat-rooms', - name: 'ChatRooms', - component: () => import('../views/ChatRoomView.vue'), - meta: { requiresAuth: true } - }, // 首页重定向 { path: '/home', diff --git a/dashboard/src/utils/api.js b/dashboard/src/utils/api.js index 70749d8..d1cc6cf 100644 --- a/dashboard/src/utils/api.js +++ b/dashboard/src/utils/api.js @@ -91,32 +91,4 @@ export const providersAPI = { test: (id) => api.post(`/providers/${id}/test`) } -// ============ Agent 接口 ============ - -export const agentsAPI = { - list: () => api.get('/agents/'), - create: (data) => api.post('/agents/', data), - get: (id) => api.get(`/agents/${id}`), - update: (id, data) => api.put(`/agents/${id}`, data), - delete: (id) => api.delete(`/agents/${id}`) -} - -// ============ 聊天室接口 ============ - -export const chatRoomsAPI = { - list: (params) => api.get('/chat-rooms/', { params }), - create: (data) => api.post('/chat-rooms/', data), - get: (id) => api.get(`/chat-rooms/${id}`), - update: (id, data) => api.put(`/chat-rooms/${id}`, data), - delete: (id) => api.delete(`/chat-rooms/${id}`), - getMessages: (id) => api.get(`/chat-rooms/${id}/messages`), - start: (id) => `/api/chat-rooms/${id}/start`, - // 注意: start 返回路径字符串,由调用方使用 fetch 处理 SSE 流 - stop: (id) => api.post(`/chat-rooms/${id}/stop`), - reset: (id) => api.post(`/chat-rooms/${id}/reset`), - addAgent: (roomId, data) => api.post(`/chat-rooms/${roomId}/agents`, data), - updateAgent: (roomId, agentId, data) => api.put(`/chat-rooms/${roomId}/agents/${agentId}`, data), - deleteAgent: (roomId, agentId) => api.delete(`/chat-rooms/${roomId}/agents/${agentId}`) -} - export default api diff --git a/dashboard/src/utils/parallelStreamManager.js b/dashboard/src/utils/parallelStreamManager.js deleted file mode 100644 index ba690d4..0000000 --- a/dashboard/src/utils/parallelStreamManager.js +++ /dev/null @@ -1,183 +0,0 @@ -import { useParallelStreamStore } from './parallelStreamStore.js' - -/** - * Parallel stream manager for handling multi-agent parallel chat room SSE connections. - */ -class ParallelStreamManager { - constructor() { - this.activeRooms = {} // roomId -> { controller, streams } - this.decoder = new TextDecoder() - } - - /** - * Start parallel room stream. - * @param {string} roomId - Room identifier - * @param {string} token - Auth token - * @returns {Promise} - */ - async startParallelRoom(roomId, token) { - const controller = new AbortController() - this.activeRooms[roomId] = { - controller, - streams: new Map() // agentId -> stream info - } - - const store = useParallelStreamStore() - - try { - const response = await fetch(`/api/chat-rooms/${roomId}/start`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${token}`, - 'Content-Type': 'application/json' - }, - signal: controller.signal - }) - - if (!response.ok) { - throw new Error(`HTTP error: ${response.status}`) - } - - const reader = response.body.getReader() - await this._processStream(roomId, reader, store) - } catch (e) { - if (e.name !== 'AbortError') { - console.error('Parallel stream error:', e) - throw e - } - } finally { - delete this.activeRooms[roomId] - } - } - - /** - * Process SSE stream from server. - * @param {string} roomId - * @param {ReadableStreamReader} reader - * @param {Object} store - Pinia store - */ - async _processStream(roomId, reader, store) { - let buffer = '' - - while (true) { - const { done, value } = await reader.read() - if (done) break - - buffer += this.decoder.decode(value, { stream: true }) - const lines = buffer.split('\n') - buffer = lines.pop() || '' - - let currentEvent = '' - for (const line of lines) { - if (line.startsWith('event: ')) { - currentEvent = line.slice(7).trim() - } else if (line.startsWith('data: ')) { - try { - const data = JSON.parse(line.slice(6)) - this._handleParallelEvent(roomId, currentEvent, data, store) - } catch (e) { - console.error('Parse error:', e) - } - } - } - } - } - - /** - * Handle parallel event from SSE. - * @param {string} roomId - * @param {string} eventType - * @param {Object} data - * @param {Object} store - Pinia store - */ - _handleParallelEvent(roomId, eventType, data, store) { - switch (eventType) { - case 'parallel_start': - store.initRoom(roomId, data.agents || [], 'parallel') - if (data.round) { - store.updateRoundInfo(roomId, data.round, data.max_rounds || 0) - } - break - - case 'round_start': - store.updateRoundInfo(roomId, data.round, data.max_rounds || 0) - break - - case 'agent_status': - store.updateAgentStatus(roomId, data.agent_id, data.status) - break - - case 'message_start': - store.startAgentStream(roomId, data.agent_id, data) - break - - case 'message_chunk': - store.updateAgentContent(roomId, data.agent_id, { - content: data.content || '', - progress: data.progress || 0 - }) - break - - case 'message_end': - store.completeAgentStream(roomId, data.agent_id, data) - break - - case 'agent_error': - store.errorAgentStream(roomId, data.agent_id, { - message: data.error, - agentName: data.agent_name - }) - break - - case 'parallel_end': - // Parallel round completed - break - - case 'round_end': - // Round completed - break - - case 'room_completed': - store.cleanupRoom(roomId) - break - - case 'error': - console.error('Room error:', data.content) - store.cleanupRoom(roomId) - break - } - } - - /** - * Cancel room stream. - * @param {string} roomId - */ - cancelRoom(roomId) { - const room = this.activeRooms[roomId] - if (room) { - room.controller.abort() - delete this.activeRooms[roomId] - } - } - - /** - * Cancel all active room streams. - */ - cancelAll() { - for (const roomId of Object.keys(this.activeRooms)) { - this.cancelRoom(roomId) - } - } - - /** - * Check if a room is currently streaming. - * @param {string} roomId - * @returns {boolean} - */ - isStreaming(roomId) { - return roomId in this.activeRooms - } -} - -// Export singleton instance -export const parallelStreamManager = new ParallelStreamManager() \ No newline at end of file diff --git a/dashboard/src/utils/parallelStreamStore.js b/dashboard/src/utils/parallelStreamStore.js deleted file mode 100644 index 92cba64..0000000 --- a/dashboard/src/utils/parallelStreamStore.js +++ /dev/null @@ -1,168 +0,0 @@ -import { defineStore } from 'pinia' - -/** - * Parallel stream store for managing multi-agent parallel chat room state. - */ -export const useParallelStreamStore = defineStore('parallelStream', { - state: () => ({ - // Per room ID storage for parallel stream state - rooms: {} - }), - - actions: { - /** - * Initialize a room for parallel execution. - * @param {string} roomId - Room identifier - * @param {Array} agents - List of agents with id and name - * @param {string} mode - Execution mode ('parallel' or 'sequential') - */ - initRoom(roomId, agents, mode = 'sequential') { - this.rooms[roomId] = { - mode, - agents: {}, - roundInfo: { current: 0, max: 0 } - } - agents.forEach(agent => { - this.rooms[roomId].agents[agent.id] = { - id: agent.id, - name: agent.name, - status: 'pending', // pending, streaming, completed, error - message: null, - progress: 0, - error: null - } - }) - }, - - /** - * Update round information. - * @param {string} roomId - * @param {number} current - * @param {number} max - */ - updateRoundInfo(roomId, current, max) { - const room = this.rooms[roomId] - if (room) { - room.roundInfo = { current, max } - } - }, - - /** - * Start streaming for an agent. - * @param {string} roomId - * @param {string|number} agentId - * @param {Object} messageStart - Initial message data - */ - startAgentStream(roomId, agentId, messageStart) { - const room = this.rooms[roomId] - if (!room) return - - const agentIdStr = String(agentId) - room.agents[agentIdStr] = { - ...room.agents[agentIdStr], - status: 'streaming', - message: { - id: messageStart.id, - sender_name: messageStart.sender_name, - sender_color: messageStart.sender_color, - content: '', - process_steps: [] - }, - progress: 0 - } - }, - - /** - * Update agent content with a chunk. - * @param {string} roomId - * @param {string|number} agentId - * @param {Object} chunk - Chunk data with content and progress - */ - updateAgentContent(roomId, agentId, chunk) { - const room = this.rooms[roomId] - if (!room) return - - const agentIdStr = String(agentId) - const agent = room.agents[agentIdStr] - if (!agent || agent.status !== 'streaming') return - - if (agent.message) { - agent.message.content += chunk.content - } - agent.progress = chunk.progress || 0 - }, - - /** - * Complete agent stream with final message. - * @param {string} roomId - * @param {string|number} agentId - * @param {Object} finalMessage - Complete message data - */ - completeAgentStream(roomId, agentId, finalMessage) { - const room = this.rooms[roomId] - if (!room) return - - const agentIdStr = String(agentId) - const agent = room.agents[agentIdStr] - if (!agent) return - - agent.status = 'completed' - if (finalMessage) { - agent.message = finalMessage - } - agent.progress = 100 - }, - - /** - * Mark agent as error. - * @param {string} roomId - * @param {string|number} agentId - * @param {string} error - Error message - */ - errorAgentStream(roomId, agentId, error) { - const room = this.rooms[roomId] - if (!room) return - - const agentIdStr = String(agentId) - const agent = room.agents[agentIdStr] - if (!agent) return - - agent.status = 'error' - agent.error = error - }, - - /** - * Update agent status. - * @param {string} roomId - * @param {string|number} agentId - * @param {string} status - */ - updateAgentStatus(roomId, agentId, status) { - const room = this.rooms[roomId] - if (!room) return - - const agentIdStr = String(agentId) - const agent = room.agents[agentIdStr] - if (agent) { - agent.status = status - } - }, - - /** - * Clean up room data. - * @param {string} roomId - */ - cleanupRoom(roomId) { - if (this.rooms[roomId]) { - delete this.rooms[roomId] - } - }, - - /** - * Reset all room data. - */ - resetAll() { - this.rooms = {} - } - } -}) \ No newline at end of file diff --git a/dashboard/src/views/ChatRoomView.vue b/dashboard/src/views/ChatRoomView.vue deleted file mode 100644 index a8f8ad0..0000000 --- a/dashboard/src/views/ChatRoomView.vue +++ /dev/null @@ -1,1010 +0,0 @@ - - - - - diff --git a/luxx/__init__.py b/luxx/__init__.py index 0d5e1f9..31ca677 100644 --- a/luxx/__init__.py +++ b/luxx/__init__.py @@ -15,7 +15,7 @@ logger = logging.getLogger(__name__) async def lifespan(app: FastAPI): """Application lifespan manager""" # Import all models to ensure they are registered with Base - from luxx.models import User, Conversation, Message, Project, LLMProvider, ChatRoom, RoomAgent # noqa + from luxx.models import User, Conversation, Message, Project, LLMProvider # noqa init_db() # Create default admin user if not exists, using config values diff --git a/luxx/models.py b/luxx/models.py index b628786..3c0417c 100644 --- a/luxx/models.py +++ b/luxx/models.py @@ -171,29 +171,18 @@ class Conversation(Base): class Message(Base): - """Message model - - 同时服务于普通会话和聊天室: - - 普通会话:conversation_id 非空,room_id 为空 - - 聊天室:room_id 非空,conversation_id 为空,sender_name/sender_color/round_number 有值 - """ + """Message model for conversations""" __tablename__ = "messages" id: Mapped[str] = mapped_column(String(64), primary_key=True) conversation_id: Mapped[Optional[str]] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=True) - room_id: Mapped[Optional[str]] = mapped_column(String(64), ForeignKey("chat_rooms.id"), nullable=True) role: Mapped[str] = mapped_column(String(16), nullable=False) content: Mapped[str] = mapped_column(Text, nullable=False, default="") token_count: Mapped[int] = mapped_column(Integer, default=0) usage: Mapped[Optional[str]] = mapped_column(Text, nullable=True) - # 聊天室专属字段(普通会话为空) - sender_name: Mapped[Optional[str]] = mapped_column(String(100), nullable=True) - sender_color: Mapped[Optional[str]] = mapped_column(String(7), nullable=True, default="#2563eb") - round_number: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) conversation: Mapped[Optional["Conversation"]] = relationship("Conversation", back_populates="messages") - room: Mapped[Optional["ChatRoom"]] = relationship("ChatRoom", back_populates="messages") def to_dict(self): import json @@ -201,7 +190,6 @@ class Message(Base): result = { "id": self.id, "conversation_id": self.conversation_id, - "room_id": self.room_id, "role": self.role, "token_count": self.token_count, "created_at": self.created_at.isoformat() if self.created_at else None @@ -213,14 +201,6 @@ class Message(Base): except json.JSONDecodeError: result["usage"] = None - # 聊天室专属字段 - if self.sender_name: - result["sender_name"] = self.sender_name - if self.sender_color: - result["sender_color"] = self.sender_color - if self.round_number is not None: - result["round_number"] = self.round_number - try: content_obj = json.loads(self.content) if self.content else {} except json.JSONDecodeError: @@ -242,117 +222,3 @@ class Message(Base): result["attachments"] = content_obj.get("attachments", []) return result - - -# ============ Chat Room Models ============ - -class Agent(Base): - """Standalone reusable Agent template""" - __tablename__ = "agents" - - id: Mapped[int] = mapped_column(Integer, primary_key=True) - user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) - name: Mapped[str] = mapped_column(String(100), nullable=False) - role: Mapped[str] = mapped_column(String(255), nullable=False, default="") - provider_id: Mapped[Optional[int]] = mapped_column(Integer, ForeignKey("llm_providers.id"), nullable=True) - model: Mapped[str] = mapped_column(String(100), nullable=False, default="") - system_prompt: Mapped[str] = mapped_column(Text, nullable=False, default="You are a helpful AI assistant.") - color: Mapped[str] = mapped_column(String(7), nullable=False, default="#2563eb") - created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) - - user: Mapped["User"] = relationship("User", backref="agents") - provider: Mapped[Optional["LLMProvider"]] = relationship("LLMProvider") - - def to_dict(self): - return { - "id": self.id, - "user_id": self.user_id, - "name": self.name, - "role": self.role, - "provider_id": self.provider_id, - "model": self.model, - "system_prompt": self.system_prompt, - "color": self.color, - "created_at": self.created_at.isoformat() if self.created_at else None, - "updated_at": self.updated_at.isoformat() if self.updated_at else None - } - - -class ChatRoom(Base): - """Multi-agent chat room model""" - __tablename__ = "chat_rooms" - - id: Mapped[str] = mapped_column(String(64), primary_key=True) - user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) - title: Mapped[str] = mapped_column(String(255), nullable=False) - task: Mapped[str] = mapped_column(Text, nullable=False, default="") - status: Mapped[str] = mapped_column(String(20), nullable=False, default="idle") # idle, running, paused, completed, error - execution_mode: Mapped[str] = mapped_column(String(20), nullable=False, default="sequential") # sequential, parallel - max_rounds: Mapped[int] = mapped_column(Integer, default=5) - current_round: Mapped[int] = mapped_column(Integer, default=0) - created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) - - user: Mapped["User"] = relationship("User", backref="chat_rooms") - agents: Mapped[List["RoomAgent"]] = relationship( - "RoomAgent", back_populates="room", cascade="all, delete-orphan", order_by="RoomAgent.turn_order" - ) - messages: Mapped[List["Message"]] = relationship( - "Message", back_populates="room", cascade="all, delete-orphan", - primaryjoin="ChatRoom.id == foreign(Message.room_id)", - order_by="Message.created_at" - ) - - def to_dict(self, include_messages: bool = False): - result = { - "id": self.id, - "user_id": self.user_id, - "title": self.title, - "task": self.task, - "status": self.status, - "execution_mode": self.execution_mode, - "max_rounds": self.max_rounds, - "current_round": self.current_round, - "created_at": self.created_at.isoformat() if self.created_at else None, - "updated_at": self.updated_at.isoformat() if self.updated_at else None, - "agents": [a.to_dict() for a in self.agents] - } - if include_messages: - result["messages"] = [m.to_dict() for m in self.messages] - return result - - -class RoomAgent(Base): - """Agent assignment in a chat room (links Agent to Room with room-specific config)""" - __tablename__ = "room_agents" - - id: Mapped[int] = mapped_column(Integer, primary_key=True) - room_id: Mapped[str] = mapped_column(String(64), ForeignKey("chat_rooms.id"), nullable=False) - agent_id: Mapped[Optional[int]] = mapped_column(Integer, ForeignKey("agents.id"), nullable=True) - name: Mapped[str] = mapped_column(String(100), nullable=False) - role: Mapped[str] = mapped_column(String(255), nullable=False, default="") - provider_id: Mapped[Optional[int]] = mapped_column(Integer, ForeignKey("llm_providers.id"), nullable=True) - model: Mapped[str] = mapped_column(String(100), nullable=False, default="") - system_prompt: Mapped[str] = mapped_column(Text, nullable=False, default="You are a helpful AI assistant.") - color: Mapped[str] = mapped_column(String(7), nullable=False, default="#2563eb") - turn_order: Mapped[int] = mapped_column(Integer, default=0) - - room: Mapped["ChatRoom"] = relationship("ChatRoom", back_populates="agents") - provider: Mapped[Optional["LLMProvider"]] = relationship("LLMProvider") - agent: Mapped[Optional["Agent"]] = relationship("Agent") - - def to_dict(self): - return { - "id": self.id, - "room_id": self.room_id, - "agent_id": self.agent_id, - "name": self.name, - "role": self.role, - "provider_id": self.provider_id, - "model": self.model, - "system_prompt": self.system_prompt, - "color": self.color, - "turn_order": self.turn_order - } - diff --git a/luxx/routes/__init__.py b/luxx/routes/__init__.py index d2e599c..545065f 100644 --- a/luxx/routes/__init__.py +++ b/luxx/routes/__init__.py @@ -1,7 +1,7 @@ """API routes module""" from fastapi import APIRouter -from luxx.routes import auth, conversations, messages, tools, providers, chat_rooms, agents, tasks +from luxx.routes import auth, conversations, messages, tools, providers, tasks api_router = APIRouter() @@ -12,6 +12,4 @@ api_router.include_router(conversations.router) api_router.include_router(messages.router) api_router.include_router(tools.router) api_router.include_router(providers.router) -api_router.include_router(chat_rooms.router) -api_router.include_router(agents.router) api_router.include_router(tasks.router) diff --git a/luxx/routes/agents.py b/luxx/routes/agents.py deleted file mode 100644 index 6f12dda..0000000 --- a/luxx/routes/agents.py +++ /dev/null @@ -1,143 +0,0 @@ -"""Standalone Agent CRUD routes""" -from typing import Optional -from fastapi import APIRouter, Depends -from pydantic import BaseModel -from sqlalchemy.orm import Session - -from luxx.database import get_db -from luxx.models import Agent, LLMProvider, User -from luxx.routes.auth import get_current_user -from luxx.utils.helpers import success_response, error_response - -router = APIRouter(prefix="/agents", tags=["Agents"]) - - -class AgentCreate(BaseModel): - name: str - role: str = "" - provider_id: Optional[int] = None - model: str = "" - system_prompt: str = "You are a helpful AI assistant." - color: str = "#2563eb" - - -class AgentUpdate(BaseModel): - name: Optional[str] = None - role: Optional[str] = None - provider_id: Optional[int] = None - model: Optional[str] = None - system_prompt: Optional[str] = None - color: Optional[str] = None - - -@router.get("/", response_model=dict) -def list_agents( - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """List all agents for current user""" - agents = db.query(Agent).filter( - Agent.user_id == current_user.id - ).order_by(Agent.updated_at.desc()).all() - return success_response(data=[a.to_dict() for a in agents]) - - -@router.post("/", response_model=dict) -def create_agent( - data: AgentCreate, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Create a new agent""" - model = data.model - provider_id = data.provider_id - if provider_id and not model: - provider = db.query(LLMProvider).filter( - LLMProvider.id == provider_id, - LLMProvider.user_id == current_user.id - ).first() - if provider: - model = provider.default_model - if not model: - default_provider = db.query(LLMProvider).filter( - LLMProvider.user_id == current_user.id, - LLMProvider.is_default == True - ).first() - if default_provider: - provider_id = default_provider.id - model = default_provider.default_model - if not model: - model = "gpt-4" - - agent = Agent( - user_id=current_user.id, - name=data.name, - role=data.role, - provider_id=provider_id, - model=model, - system_prompt=data.system_prompt, - color=data.color - ) - db.add(agent) - db.commit() - db.refresh(agent) - return success_response(data=agent.to_dict(), message="Agent created") - - -@router.get("/{agent_id}", response_model=dict) -def get_agent( - agent_id: int, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Get agent details""" - agent = db.query(Agent).filter( - Agent.id == agent_id, - Agent.user_id == current_user.id - ).first() - if not agent: - return error_response("Agent not found", 404) - return success_response(data=agent.to_dict()) - - -@router.put("/{agent_id}", response_model=dict) -def update_agent( - agent_id: int, - data: AgentUpdate, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Update an agent""" - agent = db.query(Agent).filter( - Agent.id == agent_id, - Agent.user_id == current_user.id - ).first() - if not agent: - return error_response("Agent not found", 404) - - update_data = data.dict(exclude_unset=True) - for key, value in update_data.items(): - setattr(agent, key, value) - - db.commit() - db.refresh(agent) - return success_response(data=agent.to_dict(), message="Agent updated") - - -@router.delete("/{agent_id}", response_model=dict) -def delete_agent( - agent_id: int, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Delete an agent""" - agent = db.query(Agent).filter( - Agent.id == agent_id, - Agent.user_id == current_user.id - ).first() - if not agent: - return error_response("Agent not found", 404) - - db.delete(agent) - db.commit() - return success_response(message="Agent deleted") diff --git a/luxx/routes/chat_rooms.py b/luxx/routes/chat_rooms.py deleted file mode 100644 index f56bf6e..0000000 --- a/luxx/routes/chat_rooms.py +++ /dev/null @@ -1,476 +0,0 @@ -"""Chat room routes for multi-agent conversations""" -from typing import Optional, List -from fastapi import APIRouter, Depends -from fastapi.responses import StreamingResponse -from pydantic import BaseModel -from sqlalchemy.orm import Session -from datetime import datetime - -from luxx.database import get_db, SessionLocal -from luxx.models import ChatRoom, RoomAgent, Agent, Message, LLMProvider, User -from luxx.routes.auth import get_current_user -from luxx.services.chat_room import orchestrator -from luxx.utils.helpers import generate_id, success_response, error_response, paginate - -router = APIRouter(prefix="/chat-rooms", tags=["Chat Rooms"]) - - -# ============ Request Models ============ - -class AgentConfig(BaseModel): - agent_id: Optional[int] = None # Link to existing Agent - name: str = "" - role: str = "" - provider_id: Optional[int] = None - model: str = "" - system_prompt: str = "You are a helpful AI assistant." - color: str = "#2563eb" - # Supervision fields - agent_type: str = "producer" # producer | reviewer | executor | observer - reviews_for: Optional[str] = None # JSON: [agent_id_1, agent_id_2] - reviewed_by: Optional[str] = None # JSON: [agent_id_1, agent_id_2] - review_strictness: int = 3 - capability_tags: Optional[str] = None # JSON: ["security", "performance"] - - -class ChatRoomCreate(BaseModel): - title: str - task: str - max_rounds: int = 5 - execution_mode: str = "sequential" # sequential | parallel | review_loop - agents: List[AgentConfig] = [] - - -class ChatRoomUpdate(BaseModel): - title: Optional[str] = None - task: Optional[str] = None - max_rounds: Optional[int] = None - status: Optional[str] = None - execution_mode: Optional[str] = None - - -class AgentCreate(BaseModel): - agent_id: Optional[int] = None # Link to existing Agent - name: str = "" - role: str = "" - provider_id: Optional[int] = None - model: str = "" - system_prompt: str = "You are a helpful AI assistant." - color: str = "#2563eb" - - -class AgentUpdate(BaseModel): - name: Optional[str] = None - role: Optional[str] = None - provider_id: Optional[int] = None - model: Optional[str] = None - system_prompt: Optional[str] = None - color: Optional[str] = None - turn_order: Optional[int] = None - - -# ============ Room CRUD ============ - -@router.get("/", response_model=dict) -def list_rooms( - page: int = 1, - page_size: int = 20, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """List chat rooms""" - query = db.query(ChatRoom).filter(ChatRoom.user_id == current_user.id) - result = paginate(query.order_by(ChatRoom.updated_at.desc()), page, page_size) - return success_response(data={ - "items": [r.to_dict() for r in result["items"]], - "total": result["total"], - "page": result["page"], - "page_size": result["page_size"] - }) - - -@router.post("/", response_model=dict) -def create_room( - data: ChatRoomCreate, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Create a chat room with agents""" - room = ChatRoom( - id=generate_id("room"), - user_id=current_user.id, - title=data.title, - task=data.task, - max_rounds=data.max_rounds, - execution_mode=data.execution_mode - ) - db.add(room) - db.flush() - - for i, agent_cfg in enumerate(data.agents): - # If agent_id provided, copy config from existing Agent - if agent_cfg.agent_id: - existing = db.query(Agent).filter( - Agent.id == agent_cfg.agent_id, - Agent.user_id == current_user.id - ).first() - if existing: - name = agent_cfg.name or existing.name - role = agent_cfg.role or existing.role - provider_id = agent_cfg.provider_id or existing.provider_id - model = agent_cfg.model or existing.model - system_prompt = agent_cfg.system_prompt if agent_cfg.system_prompt != "You are a helpful AI assistant." else existing.system_prompt - color = agent_cfg.color if agent_cfg.color != "#2563eb" else existing.color - agent_id = existing.id - else: - return error_response(f"Agent {agent_cfg.agent_id} not found", 404) - else: - name = agent_cfg.name or f"Agent {i+1}" - role = agent_cfg.role - provider_id = agent_cfg.provider_id - model = agent_cfg.model - system_prompt = agent_cfg.system_prompt - color = agent_cfg.color - agent_id = None - - # Resolve model from provider if not specified - if provider_id and not model: - provider = db.query(LLMProvider).filter( - LLMProvider.id == provider_id, - LLMProvider.user_id == current_user.id - ).first() - if provider: - model = provider.default_model - if not model: - default_provider = db.query(LLMProvider).filter( - LLMProvider.user_id == current_user.id, - LLMProvider.is_default == True - ).first() - if default_provider: - provider_id = default_provider.id - model = default_provider.default_model - if not model: - model = "gpt-4" - - agent = RoomAgent( - room_id=room.id, - agent_id=agent_id, - name=name, - role=role, - provider_id=provider_id, - model=model, - system_prompt=system_prompt, - color=color, - turn_order=i, - agent_type=agent_cfg.agent_type, - reviews_for=agent_cfg.reviews_for, - reviewed_by=agent_cfg.reviewed_by, - review_strictness=agent_cfg.review_strictness, - capability_tags=agent_cfg.capability_tags - ) - db.add(agent) - - db.commit() - db.refresh(room) - return success_response(data=room.to_dict(include_messages=False), message="Room created") - - -@router.get("/{room_id}", response_model=dict) -def get_room( - room_id: str, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Get room details with agents""" - room = db.query(ChatRoom).filter( - ChatRoom.id == room_id, - ChatRoom.user_id == current_user.id - ).first() - if not room: - return error_response("Room not found", 404) - - result = room.to_dict(include_messages=False) - # Also get message count - msg_count = db.query(Message).filter(Message.room_id == room_id).count() - result["message_count"] = msg_count - return success_response(data=result) - - -@router.put("/{room_id}", response_model=dict) -def update_room( - room_id: str, - data: ChatRoomUpdate, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Update room""" - room = db.query(ChatRoom).filter( - ChatRoom.id == room_id, - ChatRoom.user_id == current_user.id - ).first() - if not room: - return error_response("Room not found", 404) - - if room.status == "running": - return error_response("Cannot update a running room", 400) - - update_data = data.dict(exclude_unset=True) - for key, value in update_data.items(): - setattr(room, key, value) - - db.commit() - db.refresh(room) - return success_response(data=room.to_dict(), message="Room updated") - - -@router.delete("/{room_id}", response_model=dict) -def delete_room( - room_id: str, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Delete room""" - room = db.query(ChatRoom).filter( - ChatRoom.id == room_id, - ChatRoom.user_id == current_user.id - ).first() - if not room: - return error_response("Room not found", 404) - - if room.status == "running": - return error_response("Cannot delete a running room. Stop it first.", 400) - - db.delete(room) - db.commit() - return success_response(message="Room deleted") - - -# ============ Room Actions ============ - -@router.post("/{room_id}/start") -async def start_room( - room_id: str, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Start the multi-agent conversation as SSE stream""" - room = db.query(ChatRoom).filter( - ChatRoom.id == room_id, - ChatRoom.user_id == current_user.id - ).first() - if not room: - return error_response("Room not found", 404) - - if room.status == "running": - return error_response("Room is already running", 400) - - async def event_generator(): - async for sse_str in orchestrator.run_room(room_id): - yield sse_str - - return StreamingResponse( - event_generator(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "X-Accel-Buffering": "no" - } - ) - - -@router.post("/{room_id}/stop", response_model=dict) -def stop_room( - room_id: str, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Stop a running room""" - room = db.query(ChatRoom).filter( - ChatRoom.id == room_id, - ChatRoom.user_id == current_user.id - ).first() - if not room: - return error_response("Room not found", 404) - - orchestrator.cancel(room_id) - room.status = "paused" - db.commit() - return success_response(message="Room stopped") - - -@router.post("/{room_id}/reset", response_model=dict) -def reset_room( - room_id: str, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Reset room to initial state, clearing all messages""" - room = db.query(ChatRoom).filter( - ChatRoom.id == room_id, - ChatRoom.user_id == current_user.id - ).first() - if not room: - return error_response("Room not found", 404) - - if room.status == "running": - return error_response("Cannot reset a running room", 400) - - # Delete all messages in this room - db.query(Message).filter(Message.room_id == room_id).delete() - room.status = "idle" - room.current_round = 0 - db.commit() - return success_response(message="Room reset") - - -# ============ Messages ============ - -@router.get("/{room_id}/messages", response_model=dict) -def get_room_messages( - room_id: str, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Get all messages in a room""" - room = db.query(ChatRoom).filter( - ChatRoom.id == room_id, - ChatRoom.user_id == current_user.id - ).first() - if not room: - return error_response("Room not found", 404) - - messages = db.query(Message).filter( - Message.room_id == room_id - ).order_by(Message.created_at).all() - - return success_response(data={ - "messages": [m.to_dict() for m in messages], - "room": room.to_dict() - }) - - -# ============ Agent CRUD ============ - -@router.post("/{room_id}/agents", response_model=dict) -def add_agent( - room_id: str, - data: AgentCreate, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Add an agent to a room""" - room = db.query(ChatRoom).filter( - ChatRoom.id == room_id, - ChatRoom.user_id == current_user.id - ).first() - if not room: - return error_response("Room not found", 404) - - if room.status == "running": - return error_response("Cannot modify agents while room is running", 400) - - # Get max turn_order - max_order = db.query(RoomAgent).filter( - RoomAgent.room_id == room_id - ).count() - - # If agent_id provided, copy from existing Agent - if data.agent_id: - existing = db.query(Agent).filter( - Agent.id == data.agent_id, - Agent.user_id == current_user.id - ).first() - if not existing: - return error_response(f"Agent {data.agent_id} not found", 404) - name = data.name or existing.name - role = data.role or existing.role - provider_id = data.provider_id or existing.provider_id - model = data.model or existing.model - system_prompt = data.system_prompt if data.system_prompt != "You are a helpful AI assistant." else existing.system_prompt - color = data.color if data.color != "#2563eb" else existing.color - agent_id = existing.id - else: - name = data.name or f"Agent {max_order + 1}" - role = data.role - provider_id = data.provider_id - model = data.model - system_prompt = data.system_prompt - color = data.color - agent_id = None - - model = model - if provider_id and not model: - provider = db.query(LLMProvider).filter(LLMProvider.id == provider_id).first() - if provider: - model = provider.default_model - if not model: - model = "gpt-4" - - agent = RoomAgent( - room_id=room_id, - agent_id=agent_id, - name=name, - role=role, - provider_id=provider_id, - model=model, - system_prompt=system_prompt, - color=color, - turn_order=max_order - ) - db.add(agent) - db.commit() - db.refresh(agent) - return success_response(data=agent.to_dict(), message="Agent added") - - -@router.put("/{room_id}/agents/{agent_id}", response_model=dict) -def update_agent( - room_id: str, - agent_id: int, - data: AgentUpdate, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Update an agent""" - agent = db.query(RoomAgent).filter( - RoomAgent.id == agent_id, - RoomAgent.room_id == room_id - ).first() - if not agent: - return error_response("Agent not found", 404) - - room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() - if room and room.status == "running": - return error_response("Cannot modify agents while room is running", 400) - - update_data = data.dict(exclude_unset=True) - for key, value in update_data.items(): - setattr(agent, key, value) - - db.commit() - return success_response(data=agent.to_dict(), message="Agent updated") - - -@router.delete("/{room_id}/agents/{agent_id}", response_model=dict) -def delete_agent( - room_id: str, - agent_id: int, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """Remove an agent from a room""" - agent = db.query(RoomAgent).filter( - RoomAgent.id == agent_id, - RoomAgent.room_id == room_id - ).first() - if not agent: - return error_response("Agent not found", 404) - - room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() - if room and room.status == "running": - return error_response("Cannot remove agents while room is running", 400) - - db.delete(agent) - db.commit() - return success_response(message="Agent removed") diff --git a/luxx/routes/providers.py b/luxx/routes/providers.py index f3f16bf..cf21df3 100644 --- a/luxx/routes/providers.py +++ b/luxx/routes/providers.py @@ -7,6 +7,7 @@ from sqlalchemy.orm import Session from luxx.database import get_db from luxx.models import User, LLMProvider from luxx.routes.auth import get_current_user +from luxx.services.llm_client import LLMClient from luxx.utils.helpers import success_response import httpx import asyncio @@ -178,9 +179,16 @@ def test_provider( # Test the connection async def test(): - async with httpx.AsyncClient(timeout=10.0) as client: - response = await client.post( - provider.base_url, + client = LLMClient( + api_key=provider.api_key, + api_url=provider.base_url, + model=provider.default_model, + provider_type=provider.provider_type + ) + endpoint = client.build_endpoint() + async with httpx.AsyncClient(timeout=10.0) as http_client: + response = await http_client.post( + endpoint, headers={ "Authorization": f"Bearer {provider.api_key}", "Content-Type": "application/json" diff --git a/luxx/services/chat_room.py b/luxx/services/chat_room.py deleted file mode 100644 index 1b7b9d5..0000000 --- a/luxx/services/chat_room.py +++ /dev/null @@ -1,540 +0,0 @@ -"""Multi-agent chat room service. - -Orchestrates multiple agents taking turns to discuss and solve a task. -Each agent uses its own LLM provider/model and system prompt. -""" -import json -import logging -import asyncio -import traceback -from typing import List, Dict, Any, AsyncGenerator, Optional - -from luxx.database import SessionLocal -from luxx.models import ChatRoom, RoomAgent, Message, LLMProvider, Agent, User -from luxx.services.llm_client import LLMClient -from luxx.services.stream_context import StreamState, StepType -from luxx.services.events import sse_event -from luxx.utils.helpers import generate_id -from luxx.tools.core import CommandPermission - -logger = logging.getLogger(__name__) - - -class ChatRoomOrchestrator: - """Orchestrates multi-agent conversations in a chat room.""" - - def __init__(self): - self._running_rooms: Dict[str, asyncio.Task] = {} - - def is_running(self, room_id: str) -> bool: - return room_id in self._running_rooms and not self._running_rooms[room_id].done() - - def cancel(self, room_id: str): - task = self._running_rooms.get(room_id) - if task and not task.done(): - task.cancel() - - async def run_room( - self, - room_id: str, - db_session=None - ) -> AsyncGenerator[str, None]: - """Run a chat room: agents take turns discussing the task.""" - db = db_session or SessionLocal() - own_session = db_session is None - - try: - room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() - if not room: - yield sse_event("error", {"content": "Room not found"}) - return - - agents = db.query(RoomAgent).filter( - RoomAgent.room_id == room_id - ).order_by(RoomAgent.turn_order).all() - - if not agents: - yield sse_event("error", {"content": "No agents in room"}) - return - - room.status = "running" - db.commit() - - # Yield room started event - yield sse_event("room_started", {"room_id": room_id, "task": room.task}) - - # Build conversation history from existing messages - history = self._load_history(room_id, db) - - # If no messages yet, add the task as the initial user message - if not history: - task_msg = Message( - id=generate_id("msg"), - room_id=room_id, - role="user", - content=json.dumps({"text": room.task}, ensure_ascii=False), - sender_name="用户", - sender_color="#10b981", - round_number=0 - ) - db.add(task_msg) - db.commit() - history.append({"role": "user", "content": room.task}) - yield sse_event("message", task_msg.to_dict()) - - # Run rounds based on execution mode - for round_num in range(room.current_round + 1, room.max_rounds + 1): - room.current_round = round_num - db.commit() - - yield sse_event("round_start", { - "round": round_num, - "max_rounds": room.max_rounds - }) - - if room.execution_mode == "parallel": - # Parallel execution: all agents at once - try: - async for event in self._parallel_round( - room_id, agents, history, round_num, db - ): - yield event - except asyncio.CancelledError: - room.status = "paused" - db.commit() - yield sse_event("room_paused", {"room_id": room_id, "round": round_num}) - return - else: - # Sequential execution: agents take turns - for agent in agents: - try: - async for event in self._agent_turn( - room_id, agent, history, round_num, db - ): - yield event - except asyncio.CancelledError: - room.status = "paused" - db.commit() - yield sse_event("room_paused", {"room_id": room_id, "round": round_num}) - return - except Exception as e: - logger.error(f"Agent {agent.name} error: {e}\n{traceback.format_exc()}") - yield sse_event("agent_error", { - "agent": agent.name, - "error": str(e) - }) - - yield sse_event("round_end", {"round": round_num}) - - # Completed - room.status = "completed" - db.commit() - yield sse_event("room_completed", { - "room_id": room_id, - "total_rounds": room.max_rounds - }) - - except asyncio.CancelledError: - room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() - if room: - room.status = "paused" - db.commit() - yield sse_event("room_paused", {"room_id": room_id}) - except Exception as e: - logger.error(f"Room error: {e}\n{traceback.format_exc()}") - room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() - if room: - room.status = "error" - db.commit() - yield sse_event("error", {"content": str(e)}) - finally: - if own_session: - db.close() - self._running_rooms.pop(room_id, None) - - def _get_creator_permission_level(self, agent: RoomAgent, db) -> int: - """Get the creator's permission level for this agent. - - If the agent is linked to a reusable Agent template, use that template's owner. - Otherwise, use the ChatRoom owner's permission. - """ - # If agent is linked to a reusable Agent template, use that template's owner - if agent.agent_id: - template_agent = db.query(Agent).filter(Agent.id == agent.agent_id).first() - if template_agent: - user = db.query(User).filter(User.id == template_agent.user_id).first() - if user: - return user.permission_level - - # Fallback to ChatRoom owner - room = db.query(ChatRoom).filter(ChatRoom.id == agent.room_id).first() - if room: - user = db.query(User).filter(User.id == room.user_id).first() - if user: - return user.permission_level - - # Default to READ_ONLY if no user found - return CommandPermission.READ_ONLY - - async def _agent_turn( - self, - room_id: str, - agent: RoomAgent, - history: List[Dict], - round_num: int, - db - ) -> AsyncGenerator[str, None]: - """Execute one agent's turn in the conversation with streaming output.""" - # Get LLM client for this agent - llm, max_tokens = self._create_llm_client(agent, db) - if not llm: - yield sse_event("agent_error", { - "agent": agent.name, - "error": "No LLM provider configured" - }) - return - - model = agent.model or llm.default_model or "gpt-4" - - # Get creator's permission level for tool execution - creator_permission = self._get_creator_permission_level(agent, db) - - # Build messages for this agent - messages = self._build_agent_messages(agent, history) - - # Create placeholder message for streaming updates - msg_id = generate_id("msg") - accumulated_content = "" - - # Yield streaming start event with placeholder - yield sse_event("message_start", { - "id": msg_id, - "room_id": room_id, - "role": "assistant", - "sender_name": agent.name, - "sender_color": agent.color, - "round_number": round_num - }) - - # Stream LLM response (without tools for now - chat room agents are text-only) - try: - async for delta in llm.stream_call( - model=model, - messages=messages, - temperature=0.7, - max_tokens=max_tokens or 2000 - ): - if delta.content: - accumulated_content += delta.content - yield sse_event("message_chunk", { - "id": msg_id, - "content": delta.content, - "accumulated": accumulated_content - }) - - if delta.is_complete: - break - except Exception as e: - logger.error(f"LLM stream failed for {agent.name}: {e}") - yield sse_event("agent_error", { - "agent": agent.name, - "error": f"LLM stream failed: {str(e)}" - }) - await llm.close() - return - - # Estimate token count - token_count = len(accumulated_content) // 4 - - # Build steps for storage - steps = [{"id": "step-0", "index": 0, "type": "text", "content": accumulated_content}] - content_json = {"steps": steps} - - # Save complete message to DB - msg = Message( - id=msg_id, - room_id=room_id, - role="assistant", - content=json.dumps(content_json, ensure_ascii=False), - token_count=token_count, - sender_name=agent.name, - sender_color=agent.color, - round_number=round_num - ) - db.add(msg) - db.commit() - - # Update history - history.append({"role": "assistant", "content": accumulated_content, "sender": agent.name}) - - # Yield message end event - yield sse_event("message_end", { - "id": msg_id, - "content": accumulated_content, - "token_count": token_count - }) - - # Also yield the complete message for consistency - msg_dict = msg.to_dict() - yield sse_event("message", msg_dict) - - # Close client - await llm.close() - - async def _parallel_round( - self, - room_id: str, - agents: List[RoomAgent], - history: List[Dict], - round_num: int, - db - ) -> AsyncGenerator[str, None]: - """Execute all agents in parallel for one round.""" - if not agents: - return - - # Yield parallel start event - yield sse_event("parallel_start", { - "round": round_num, - "max_rounds": self.max_rounds, - "agents": [{"id": a.id, "name": a.name} for a in agents] - }) - - # Create all agent tasks - tasks = [] - for agent in agents: - task = self._agent_turn_async( - room_id, agent, list(history), round_num, db - ) - tasks.append(task) - - # Execute in parallel and merge streams - async for event in self._merge_streams(tasks): - yield event - - # Yield parallel end event - yield sse_event("parallel_end", { - "round": round_num, - "agent_count": len(agents) - }) - - async def _agent_turn_async( - self, - room_id: str, - agent: RoomAgent, - history: List[Dict], - round_num: int, - db - ) -> AsyncGenerator[Dict[str, Any], None]: - """Execute a single agent turn asynchronously, yielding event stream.""" - # Yield agent status - pending - yield {"type": "agent_status", "agent_id": agent.id, "agent_name": agent.name, "status": "pending"} - - # Get LLM client for this agent - llm, max_tokens = self._create_llm_client(agent, db) - if not llm: - yield {"type": "agent_error", "agent_id": agent.id, "agent_name": agent.name, "error": "No LLM provider configured"} - return - - model = agent.model or llm.default_model or "gpt-4" - - # Get creator's permission level for tool execution (for future use) - creator_permission = self._get_creator_permission_level(agent, db) - - # Build messages for this agent - messages = self._build_agent_messages(agent, history) - - # Create placeholder message for streaming updates - msg_id = generate_id("msg") - accumulated_content = "" - - # Yield agent status - streaming - yield {"type": "agent_status", "agent_id": agent.id, "agent_name": agent.name, "status": "streaming"} - - # Yield streaming start event with placeholder - yield {"type": "message_start", "id": msg_id, "room_id": room_id, "role": "assistant", - "sender_name": agent.name, "sender_color": agent.color, "round_number": round_num, "agent_id": agent.id} - - # Stream LLM response - try: - async for delta in llm.stream_call( - model=model, - messages=messages, - temperature=0.7, - max_tokens=max_tokens or 2000 - ): - if delta.content: - accumulated_content += delta.content - # Estimate progress based on content length (assume max ~2000 chars) - progress = min(95, int(len(accumulated_content) / 20)) - yield {"type": "message_chunk", "id": msg_id, "content": delta.content, - "accumulated": accumulated_content, "agent_id": agent.id, - "progress": progress} - - if delta.is_complete: - break - except Exception as e: - logger.error(f"LLM stream failed for {agent.name}: {e}") - yield {"type": "agent_error", "agent_id": agent.id, "agent_name": agent.name, "error": f"LLM stream failed: {str(e)}"} - await llm.close() - return - - # Estimate token count - token_count = len(accumulated_content) // 4 - - # Build steps for storage - steps = [{"id": "step-0", "index": 0, "type": "text", "content": accumulated_content}] - content_json = {"steps": steps} - - # Save complete message to DB - msg = Message( - id=msg_id, - room_id=room_id, - role="assistant", - content=json.dumps(content_json, ensure_ascii=False), - token_count=token_count, - sender_name=agent.name, - sender_color=agent.color, - round_number=round_num - ) - db.add(msg) - db.commit() - - # Update history - history.append({"role": "assistant", "content": accumulated_content, "sender": agent.name}) - - # Yield agent status - completed - yield {"type": "agent_status", "agent_id": agent.id, "agent_name": agent.name, "status": "completed"} - - # Yield message end event - yield {"type": "message_end", "id": msg_id, "content": accumulated_content, - "token_count": token_count, "agent_id": agent.id} - - # Also yield the complete message for consistency - msg_dict = msg.to_dict() - yield {"type": "message", "message": msg_dict} - - # Close client - await llm.close() - - async def _merge_streams( - self, tasks: List[AsyncGenerator] - ) -> AsyncGenerator[str, None]: - """Merge multiple streams while maintaining real-time output.""" - import asyncio - - async def consume_stream(stream, queue): - try: - async for event in stream: - await queue.put(event) - except Exception as e: - logger.error(f"Stream error: {e}") - finally: - await queue.put(None) # Mark end - - queue = asyncio.Queue() - consumers = [asyncio.create_task(consume_stream(t, queue)) for t in tasks] - - completed = 0 - while completed < len(tasks): - event = await queue.get() - if event is None: - completed += 1 - else: - # Convert dict event to SSE format - if isinstance(event, dict) and "type" in event: - if event["type"] == "message": - yield sse_event("message", event.get("message", {})) - elif event["type"] == "message_start": - yield sse_event("message_start", {k: v for k, v in event.items() if k != "type"}) - elif event["type"] == "message_chunk": - yield sse_event("message_chunk", {k: v for k, v in event.items() if k != "type"}) - elif event["type"] == "message_end": - yield sse_event("message_end", {k: v for k, v in event.items() if k != "type"}) - elif event["type"] == "agent_status": - yield sse_event("agent_status", {k: v for k, v in event.items() if k != "type"}) - elif event["type"] == "agent_error": - yield sse_event("agent_error", {k: v for k, v in event.items() if k != "type"}) - else: - yield sse_event(event["type"], {k: v for k, v in event.items() if k != "type"}) - - # Ensure all tasks complete - await asyncio.gather(*consumers, return_exceptions=True) - - def _create_llm_client(self, agent: RoomAgent, db) -> tuple: - """Create LLM client for an agent.""" - if agent.provider_id: - provider = db.query(LLMProvider).filter( - LLMProvider.id == agent.provider_id - ).first() - if provider: - client = LLMClient( - api_key=provider.api_key, - api_url=provider.base_url, - model=agent.model or provider.default_model, - provider_type=provider.provider_type - ) - return client, provider.max_tokens - - return None, None - - def _build_agent_messages(self, agent: RoomAgent, history: List[Dict]) -> List[Dict]: - """Build the message list for an agent's LLM call.""" - messages = [{"role": "system", "content": agent.system_prompt}] - - for h in history: - role = h.get("role", "user") - content = h.get("content", "") - sender = h.get("sender", "") - - if role == "user": - messages.append({"role": "user", "content": content}) - elif role == "assistant": - # Prefix with sender name so the agent knows who said what - prefix = f"[{sender}]: " if sender else "" - messages.append({"role": "assistant", "content": prefix + content}) - - return messages - - def _load_history(self, room_id: str, db) -> List[Dict]: - """Load conversation history from existing room messages.""" - messages = db.query(Message).filter( - Message.room_id == room_id - ).order_by(Message.created_at).all() - - history = [] - for msg in messages: - # Extract text from message content - text = self._extract_text(msg.content) - entry = {"role": msg.role, "content": text} - if msg.sender_name and msg.role == "assistant": - entry["sender"] = msg.sender_name - history.append(entry) - - return history - - @staticmethod - def _extract_text(content: str) -> str: - """Extract text from message content JSON.""" - if not content: - return "" - try: - parsed = json.loads(content) - if isinstance(parsed, dict): - # Try steps-based format - steps = parsed.get("steps", []) - if steps: - return "".join( - s.get("content", "") for s in steps - if s.get("type") == "text" - ) - # Try simple text format - if "text" in parsed: - return parsed["text"] - return content - except (json.JSONDecodeError, TypeError): - return content - - -# Singleton orchestrator -orchestrator = ChatRoomOrchestrator() diff --git a/luxx/services/llm_adapters/anthropic_adapter.py b/luxx/services/llm_adapters/anthropic_adapter.py index 353bf7a..83c7ff6 100644 --- a/luxx/services/llm_adapters/anthropic_adapter.py +++ b/luxx/services/llm_adapters/anthropic_adapter.py @@ -61,6 +61,10 @@ class AnthropicAdapter(ProviderAdapter): def provider_type(self) -> str: return "anthropic" + @property + def api_path(self) -> str: + return self.MESSAGES_PATH + def build_request( self, model: str, diff --git a/luxx/services/llm_adapters/base.py b/luxx/services/llm_adapters/base.py index 33c3bfd..8330ab1 100644 --- a/luxx/services/llm_adapters/base.py +++ b/luxx/services/llm_adapters/base.py @@ -28,7 +28,16 @@ class ProviderAdapter(ABC): str: Provider type, e.g., "openai", "anthropic" """ pass - + + @property + def api_path(self) -> str: + """API endpoint path suffix to append to base URL + + Returns: + str: Path suffix, e.g., "/chat/completions" for OpenAI + """ + return "" + @abstractmethod def build_request( self, diff --git a/luxx/services/llm_adapters/openai_adapter.py b/luxx/services/llm_adapters/openai_adapter.py index fbce776..fc41c01 100644 --- a/luxx/services/llm_adapters/openai_adapter.py +++ b/luxx/services/llm_adapters/openai_adapter.py @@ -20,6 +20,10 @@ class OpenAIAdapter(ProviderAdapter): def provider_type(self) -> str: return "openai" + @property + def api_path(self) -> str: + return "/chat/completions" + def build_request(self, model: str, messages: List[Dict], tools=None, **kwargs) -> tuple: api_key = kwargs.get("api_key", "") headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"} @@ -59,9 +63,15 @@ class OpenAIAdapter(ProviderAdapter): choices = chunk.get("choices", []) if not choices: + # DeepSeek may send usage in a separate chunk without choices usage = chunk.get("usage") if usage: - logger.debug(f"Usage chunk: {usage}") + logger.info(f"[TOKEN] Received usage from stream: {usage}") + yield ParsedDelta(usage={ + "prompt_tokens": usage.get("prompt_tokens", 0), + "completion_tokens": usage.get("completion_tokens", 0), + "total_tokens": usage.get("total_tokens", 0) + }) return choice = choices[0] @@ -80,8 +90,8 @@ class OpenAIAdapter(ProviderAdapter): for tc in tool_calls: yield ParsedDelta(tool_call=tc) - # Set is_complete for final chunks - if finish_reason in ("stop", "tool_calls"): + # Set is_complete for final chunks (DeepSeek may return null, "length", "content_filter") + if finish_reason and finish_reason not in (None, ""): yield ParsedDelta(is_complete=True) def parse_response(self, data: Dict) -> Dict: diff --git a/luxx/services/llm_client.py b/luxx/services/llm_client.py index 60f3424..7da0eef 100644 --- a/luxx/services/llm_client.py +++ b/luxx/services/llm_client.py @@ -174,6 +174,23 @@ class LLMClient: """Whether current Provider supports tool calls""" return self.adapter.supports_tools() + def build_endpoint(self) -> str: + """Build full API endpoint URL by appending adapter's API path + + Handles cases where base_url already contains the path: + - https://api.deepseek.com/v1 + /chat/completions → keep as-is + - https://api.deepseek.com + /chat/completions → https://api.deepseek.com/chat/completions + """ + base = self.api_url.rstrip('/') + api_path = self.adapter.api_path + if not api_path: + return base + known_endings = ['/chat/completions', '/v1/messages', '/v1/chat/completions'] + for ending in known_endings: + if base.endswith(ending): + return base + return base + api_path + async def client(self) -> httpx.AsyncClient: """Get HTTP client (lazy load)""" if self._client is None or self._client.is_closed: @@ -224,7 +241,7 @@ class LLMClient: model, messages, tools, stream=False, **kwargs ) - endpoint = self.api_url + endpoint = self.build_endpoint() logger.info(f"Sync call to {endpoint} with model {model}") try: @@ -240,7 +257,8 @@ class LLMClient: return self.adapter.parse_response(data) except httpx.HTTPStatusError as e: - logger.error(f"HTTP error: {e.response.status_code} - {e.response.text}") + error_body = e.response.text if e.response else "" + logger.error(f"HTTP error: {e.response.status_code} - {error_body}") raise except Exception as e: logger.error(f"Sync call error: {e}\n{traceback.format_exc()}") @@ -276,7 +294,7 @@ class LLMClient: model, messages, tools, **kwargs ) - endpoint = self.api_url + endpoint = self.build_endpoint() logger.info(f"Stream call to {endpoint} with model {model}") try: @@ -304,12 +322,18 @@ class LLMClient: yield ParsedDelta(is_complete=True) continue async for delta in self.adapter.parse_stream_chunk(event): - if delta.content or delta.has_tool_call() or delta.is_complete: + if delta.content or delta.has_tool_call() or delta.is_complete or delta.usage: yield delta except httpx.HTTPStatusError as e: status_code = e.response.status_code if e.response else "?" - error_body = e.response.text if e.response else "" + error_body = "" + if e.response: + try: + await e.response.aread() + error_body = e.response.text + except Exception: + pass logger.error(f"HTTP error: {status_code} - {error_body}") yield ParsedDelta() except Exception as e: