diff --git a/luxx/__init__.py b/luxx/__init__.py index bd1e09d..6082ccc 100644 --- a/luxx/__init__.py +++ b/luxx/__init__.py @@ -3,10 +3,11 @@ import logging from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +from fastapi.websockets import WebSocket -from luxx.config import config -from luxx.database import init_db -from luxx.routes import api_router +from luxx.core.config import config +from luxx.core.database import init_db +from luxx.api import api_router logger = logging.getLogger(__name__) @@ -15,14 +16,15 @@ logger = logging.getLogger(__name__) async def lifespan(app: FastAPI): """Application lifespan manager""" # Import all models to ensure they are registered with Base - from luxx.models import User, Conversation, Message, Project, LLMProvider # noqa + from luxx.models.user import User, LLMProvider, Project + from luxx.models.chat import Conversation, Message + from luxx.models.room import ChatRoom, Agent, ChatRoomAgent, ChatRoomMessage init_db() - + # Create default test user if not exists - from luxx.database import SessionLocal - from luxx.models import User + from luxx.core.database import SessionLocal from luxx.utils.helpers import hash_password - + db = SessionLocal() try: default_user = db.query(User).filter(User.username == "admin").first() @@ -37,9 +39,10 @@ async def lifespan(app: FastAPI): logger.info("Default admin user created: admin / admin") finally: db.close() - + # Import and register tools from luxx.tools.builtin import crawler, code, data + yield @@ -51,24 +54,31 @@ def create_app() -> FastAPI: version="1.0.0", lifespan=lifespan ) - + # Configure CORS app.add_middleware( CORSMiddleware, - allow_origins=["*"], # Should be restricted in production + allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) - + # Register routes app.include_router(api_router, prefix="/api") - + + # WebSocket endpoint for chat rooms + from luxx.services.room_ws import websocket_handler + + @app.websocket("/ws/chat-room/{room_id}") + async def chat_room_websocket(websocket: WebSocket, room_id: str): + await websocket_handler(websocket, room_id) + # Health check @app.get("/health") async def health_check(): return {"status": "healthy", "service": "luxx"} - + @app.get("/") async def root(): return { @@ -76,7 +86,7 @@ def create_app() -> FastAPI: "version": "1.0.0", "docs": "/docs" } - + return app diff --git a/luxx/agents/__init__.py b/luxx/agents/__init__.py new file mode 100644 index 0000000..b87966c --- /dev/null +++ b/luxx/agents/__init__.py @@ -0,0 +1,5 @@ +"""Agents package""" +from luxx.agents.base import BaseAgent +from luxx.agents.registry import agent_registry + +__all__ = ["BaseAgent", "agent_registry"] diff --git a/luxx/agents/base.py b/luxx/agents/base.py new file mode 100644 index 0000000..481ba7e --- /dev/null +++ b/luxx/agents/base.py @@ -0,0 +1,288 @@ +"""Base Agent class""" +import json +import uuid +import logging +from typing import List, Dict, Any, Optional, AsyncGenerator +from abc import ABC, abstractmethod + +from luxx.services.llm_client import LLMClient + +logger = logging.getLogger(__name__) + + +class BaseAgent(ABC): + """Base class for all agents""" + + def __init__( + self, + agent_id: str, + name: str, + role: str, + system_prompt: str, + provider_id: int = None, + model: str = None, + tools: List[str] = None, + temperature: float = 0.7, + max_tokens: int = 2048, + priority: int = 5, + auto_response: bool = True, + mention_trigger: bool = False, + avatar: str = None + ): + self.agent_id = agent_id + self.name = name + self.role = role + self.system_prompt = system_prompt + self.provider_id = provider_id + self.model = model + self.tools = tools or [] + self.temperature = temperature + self.max_tokens = max_tokens + self.priority = priority + self.auto_response = auto_response + self.mention_trigger = mention_trigger + self.avatar = avatar + self.llm_client = None + + def _get_llm_client(self, room_id: str = None): + """Get LLM client, optionally using agent's provider""" + if self.llm_client: + return self.llm_client + + if self.provider_id: + from luxx.database import SessionLocal + from luxx.models import LLMProvider + db = SessionLocal() + try: + provider = db.query(LLMProvider).filter(LLMProvider.id == self.provider_id).first() + if provider: + self.llm_client = LLMClient( + api_key=provider.api_key, + api_url=provider.base_url, + model=provider.default_model + ) + return self.llm_client + finally: + db.close() + + # Fallback to global config + self.llm_client = LLMClient() + return self.llm_client + + async def stream_response( + self, + user_message: str, + conversation_history: List[Dict] = None, + context: Dict = None, + thinking_enabled: bool = False + ) -> AsyncGenerator[Dict[str, Any], None]: + """ + Generate streaming response for the agent. + + Args: + user_message: The user's message + conversation_history: Previous messages in the room + context: Additional context (workspace, user info, etc.) + thinking_enabled: Enable reasoning chain + + Yields: + SSE-formatted event dictionaries + """ + messages = [] + + # Add system prompt + final_system_prompt = self._build_system_prompt(context) + messages.append({"role": "system", "content": final_system_prompt}) + + # Add conversation history (last 10 messages) + if conversation_history: + for msg in conversation_history[-10:]: + role = "assistant" if msg["sender_type"] == "agent" else "user" + messages.append({ + "role": role, + "content": msg["content"] + }) + + # Add current user message + messages.append({"role": "user", "content": user_message}) + + # Get LLM client + llm = self._get_llm_client() + + # Get tools if enabled + enabled_tools = [] + if self.tools: + from luxx.tools.core import registry + for tool_name in self.tools: + tool = registry.get(tool_name) + if tool: + enabled_tools.append(tool) + + # Stream response + step_index = 0 + full_content = "" + + try: + async for sse_line in llm.stream_call( + model=self.model or llm.default_model, + messages=messages, + tools=enabled_tools if enabled_tools else None, + temperature=self.temperature, + max_tokens=self.max_tokens, + thinking_enabled=thinking_enabled + ): + # Parse SSE line + event_type = None + data_str = None + + for line in sse_line.strip().split('\n'): + if line.startswith('event: '): + event_type = line[7:].strip() + elif line.startswith('data: '): + data_str = line[6:].strip() + + if data_str is None: + continue + + # Handle error events + if event_type == 'error': + try: + error_data = json.loads(data_str) + yield { + "event": "error", + "data": {"content": error_data.get("content", "Unknown error")} + } + except json.JSONDecodeError: + yield { + "event": "error", + "data": {"content": data_str} + } + return + + # Parse the data + try: + chunk = json.loads(data_str) + except json.JSONDecodeError: + continue + + # Check for error in response + if "error" in chunk: + error_msg = chunk["error"].get("message", str(chunk["error"])) + yield { + "event": "error", + "data": {"content": f"API Error: {error_msg}"} + } + return + + # Get delta + choices = chunk.get("choices", []) + if not choices: + continue + + delta = choices[0].get("delta", {}) + + # Handle reasoning (thinking) + reasoning = delta.get("reasoning_content", "") + if reasoning: + step_index += 1 + yield { + "event": "process_step", + "data": { + "step": { + "id": f"{self.agent_id}-step-{step_index}", + "type": "thinking", + "content": reasoning + } + } + } + + # Handle content + content = delta.get("content", "") + if content: + step_index += 1 + full_content += content + yield { + "event": "process_step", + "data": { + "step": { + "id": f"{self.agent_id}-step-{step_index}", + "type": "text", + "content": full_content + } + } + } + + # Final message + yield { + "event": "done", + "data": { + "message_id": str(uuid.uuid4()), + "agent_id": self.agent_id, + "agent_name": self.name, + "content": full_content, + "token_count": len(full_content) // 4 + } + } + + except Exception as e: + logger.error(f"Agent {self.name} stream error: {e}") + yield { + "event": "error", + "data": {"content": str(e)} + } + + def _build_system_prompt(self, context: Dict = None) -> str: + """Build the final system prompt with context""" + prompt = self.system_prompt + if context: + workspace = context.get("workspace", "") + if workspace: + prompt += f"\n\nCurrent workspace: {workspace}" + user_name = context.get("username", "") + if user_name: + prompt += f"\nCurrent user: {user_name}" + return prompt + + def to_dict(self) -> Dict[str, Any]: + """Convert agent to dictionary""" + return { + "id": self.agent_id, + "name": self.name, + "role": self.role, + "avatar": self.avatar, + "system_prompt": self.system_prompt, + "model": self.model, + "tools": self.tools, + "priority": self.priority, + "auto_response": self.auto_response, + "mention_trigger": self.mention_trigger, + "temperature": self.temperature, + "max_tokens": self.max_tokens + } + + @classmethod + def from_model(cls, agent_db_model) -> "BaseAgent": + """Create agent instance from database model""" + import json + tools = [] + if agent_db_model.tools: + try: + tools = json.loads(agent_db_model.tools) + except json.JSONDecodeError: + pass + + return cls( + agent_id=agent_db_model.id, + name=agent_db_model.name, + role=agent_db_model.role, + system_prompt=agent_db_model.system_prompt, + provider_id=agent_db_model.provider_id, + model=agent_db_model.model, + tools=tools, + temperature=float(agent_db_model.temperature) if agent_db_model.temperature else 0.7, + max_tokens=agent_db_model.max_tokens or 2048, + priority=agent_db_model.priority or 5, + auto_response=agent_db_model.auto_response, + mention_trigger=agent_db_model.mention_trigger, + avatar=agent_db_model.avatar + ) diff --git a/luxx/agents/builtins/__init__.py b/luxx/agents/builtins/__init__.py new file mode 100644 index 0000000..fcffce8 --- /dev/null +++ b/luxx/agents/builtins/__init__.py @@ -0,0 +1 @@ +# Builtins package - user-defined agent templates can be placed here diff --git a/luxx/agents/registry.py b/luxx/agents/registry.py new file mode 100644 index 0000000..443cb5e --- /dev/null +++ b/luxx/agents/registry.py @@ -0,0 +1,63 @@ +"""Agent Registry - manages agent instances in memory""" +from typing import Dict, List, Optional +import logging + +from luxx.agents.base import BaseAgent + +logger = logging.getLogger(__name__) + + +class AgentRegistry: + """Registry for managing agent instances""" + + def __init__(self): + self._agents: Dict[str, BaseAgent] = {} + + def register(self, agent: BaseAgent) -> None: + """Register an agent instance""" + self._agents[agent.agent_id] = agent + logger.info(f"Registered agent: {agent.name} ({agent.role})") + + def unregister(self, agent_id: str) -> bool: + """Unregister an agent""" + if agent_id in self._agents: + agent = self._agents.pop(agent_id) + logger.info(f"Unregistered agent: {agent.name}") + return True + return False + + def get(self, agent_id: str) -> Optional[BaseAgent]: + """Get an agent by ID""" + return self._agents.get(agent_id) + + def get_by_role(self, role: str) -> Optional[BaseAgent]: + """Get an agent by role (returns first match)""" + for agent in self._agents.values(): + if agent.role == role: + return agent + return None + + def list_all(self) -> List[BaseAgent]: + """List all registered agents""" + return list(self._agents.values()) + + def list_by_role(self, role: str) -> List[BaseAgent]: + """List all agents with a specific role""" + return [a for a in self._agents.values() if a.role == role] + + def list_auto_response(self) -> List[BaseAgent]: + """List all agents that auto-respond""" + return [a for a in self._agents.values() if a.auto_response and not a.mention_trigger] + + def list_mention_trigger(self) -> List[BaseAgent]: + """List all agents that respond on mention""" + return [a for a in self._agents.values() if a.mention_trigger] + + def clear(self) -> None: + """Clear all registered agents""" + self._agents.clear() + logger.info("Cleared all registered agents") + + +# Global registry instance +agent_registry = AgentRegistry() diff --git a/luxx/api/__init__.py b/luxx/api/__init__.py new file mode 100644 index 0000000..9508760 --- /dev/null +++ b/luxx/api/__init__.py @@ -0,0 +1,16 @@ +"""API routes module""" +from fastapi import APIRouter + +from luxx.api import auth, chat, tools, providers, agents, rooms + + +api_router = APIRouter() + +# Register sub-routes +api_router.include_router(auth.router) +api_router.include_router(chat.conversations.router) +api_router.include_router(chat.messages.router) +api_router.include_router(tools.router) +api_router.include_router(providers.router) +api_router.include_router(agents.router) +api_router.include_router(rooms.router) diff --git a/luxx/api/agents.py b/luxx/api/agents.py new file mode 100644 index 0000000..d2d5926 --- /dev/null +++ b/luxx/api/agents.py @@ -0,0 +1,108 @@ +"""Agent Management API Routes""" +from typing import List, Optional +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from luxx.services.agent import agent_manager + +router = APIRouter(prefix="/agents", tags=["agents"]) + + +class CreateAgentRequest(BaseModel): + name: str + role: str + system_prompt: str + provider_id: Optional[int] = None + model: Optional[str] = None + tools: Optional[List[str]] = None + priority: int = 5 + auto_response: bool = True + mention_trigger: bool = False + temperature: float = 0.7 + max_tokens: int = 2048 + avatar: Optional[str] = None + + +class UpdateAgentRequest(BaseModel): + name: Optional[str] = None + role: Optional[str] = None + system_prompt: Optional[str] = None + provider_id: Optional[int] = None + model: Optional[str] = None + tools: Optional[List[str]] = None + priority: Optional[int] = None + auto_response: Optional[bool] = None + mention_trigger: Optional[bool] = None + temperature: Optional[float] = None + max_tokens: Optional[int] = None + is_active: Optional[bool] = None + avatar: Optional[str] = None + + +@router.get("") +async def list_agents(include_inactive: bool = False): + """List all agents""" + agents = agent_manager.list_agents(include_inactive=include_inactive) + return {"agents": agents} + + +@router.post("") +async def create_agent(request: CreateAgentRequest): + """Create a new agent""" + agent = agent_manager.create_agent( + name=request.name, + role=request.role, + system_prompt=request.system_prompt, + provider_id=request.provider_id, + model=request.model, + tools=request.tools, + priority=request.priority, + auto_response=request.auto_response, + mention_trigger=request.mention_trigger, + temperature=request.temperature, + max_tokens=request.max_tokens, + avatar=request.avatar + ) + return {"agent": agent} + + +@router.get("/{agent_id}") +async def get_agent(agent_id: str): + """Get an agent by ID""" + agent = agent_manager.get_agent(agent_id) + if not agent: + raise HTTPException(status_code=404, detail="Agent not found") + return {"agent": agent} + + +@router.put("/{agent_id}") +async def update_agent(agent_id: str, request: UpdateAgentRequest): + """Update an agent""" + agent = agent_manager.update_agent( + agent_id=agent_id, + name=request.name, + role=request.role, + system_prompt=request.system_prompt, + provider_id=request.provider_id, + model=request.model, + tools=request.tools, + priority=request.priority, + auto_response=request.auto_response, + mention_trigger=request.mention_trigger, + temperature=request.temperature, + max_tokens=request.max_tokens, + is_active=request.is_active, + avatar=request.avatar + ) + if not agent: + raise HTTPException(status_code=404, detail="Agent not found") + return {"agent": agent} + + +@router.delete("/{agent_id}") +async def delete_agent(agent_id: str): + """Delete an agent""" + success = agent_manager.delete_agent(agent_id) + if not success: + raise HTTPException(status_code=404, detail="Agent not found") + return {"success": True} diff --git a/luxx/routes/auth.py b/luxx/api/auth.py similarity index 93% rename from luxx/routes/auth.py rename to luxx/api/auth.py index 00daaae..d3f5b40 100644 --- a/luxx/routes/auth.py +++ b/luxx/api/auth.py @@ -5,8 +5,8 @@ from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from sqlalchemy.orm import Session from pydantic import BaseModel -from luxx.database import get_db -from luxx.models import User +from luxx.core.database import get_db +from luxx.models.user import User from luxx.utils.helpers import ( hash_password, verify_password, @@ -23,20 +23,17 @@ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login") class UserRegister(BaseModel): - """User registration model""" username: str email: str | None = None password: str class UserLogin(BaseModel): - """User login model""" username: str password: str class UserResponse(BaseModel): - """User response model""" id: int username: str email: str | None @@ -45,12 +42,10 @@ class UserResponse(BaseModel): class UserPermissionUpdate(BaseModel): - """User permission update model""" permission_level: int class TokenResponse(BaseModel): - """Token response model""" access_token: str token_type: str @@ -85,22 +80,23 @@ def register(user_data: UserRegister, db: Session = Depends(get_db)): existing_user = db.query(User).filter(User.username == user_data.username).first() if existing_user: return error_response("Username already exists", 400) - + if user_data.email: existing_email = db.query(User).filter(User.email == user_data.email).first() if existing_email: return error_response("Email already registered", 400) - + password_hash = hash_password(user_data.password) user = User( username=user_data.username, email=user_data.email, password_hash=password_hash ) + db.add(user) db.commit() db.refresh(user) - + return success_response( data={"id": user.id, "username": user.username}, message="Registration successful" @@ -111,18 +107,18 @@ def register(user_data: UserRegister, db: Session = Depends(get_db)): def login(user_data: UserLogin, db: Session = Depends(get_db)): """User login""" user = db.query(User).filter(User.username == user_data.username).first() - + if not user or not verify_password(user_data.password, user.password_hash or ""): return error_response("Invalid username or password", 401) - + if not user.is_active: return error_response("User account is disabled", 403) - + access_token = create_access_token( data={"sub": str(user.id)}, expires_delta=timedelta(days=7) ) - + return success_response( data={ "access_token": access_token, @@ -135,7 +131,7 @@ def login(user_data: UserLogin, db: Session = Depends(get_db)): @router.post("/logout") def logout(current_user: User = Depends(get_current_user)): - """User logout (client should delete token)""" + """User logout""" return success_response(message="Logout successful") @@ -158,12 +154,11 @@ def update_user(user_id: int, data: UserPermissionUpdate, admin_user: User = Dep user = db.query(User).filter(User.id == user_id).first() if not user: return error_response("User not found", 404) - - # Validate permission level + if data.permission_level < 1 or data.permission_level > 4: return error_response("Invalid permission level (1-4)", 400) - + user.permission_level = data.permission_level db.commit() - + return success_response(data=user.to_dict(), message="User permission updated") diff --git a/luxx/api/chat/__init__.py b/luxx/api/chat/__init__.py new file mode 100644 index 0000000..8abe63d --- /dev/null +++ b/luxx/api/chat/__init__.py @@ -0,0 +1 @@ +"""Chat API package""" diff --git a/luxx/routes/conversations.py b/luxx/api/chat/conversations.py similarity index 89% rename from luxx/routes/conversations.py rename to luxx/api/chat/conversations.py index d4c0b11..1a666d3 100644 --- a/luxx/routes/conversations.py +++ b/luxx/api/chat/conversations.py @@ -1,12 +1,13 @@ """Conversation routes""" -from typing import Optional, List +from typing import Optional from fastapi import APIRouter, Depends from pydantic import BaseModel from sqlalchemy.orm import Session -from luxx.database import get_db -from luxx.models import Conversation, Message, User -from luxx.routes.auth import get_current_user +from luxx.core.database import get_db +from luxx.models.chat import Conversation, Message +from luxx.models.user import User +from luxx.api.auth import get_current_user from luxx.utils.helpers import generate_id, success_response, error_response, paginate @@ -14,7 +15,6 @@ router = APIRouter(prefix="/conversations", tags=["Conversations"]) class ConversationCreate(BaseModel): - """Create conversation model""" project_id: Optional[str] = None provider_id: Optional[int] = None title: Optional[str] = None @@ -26,7 +26,6 @@ class ConversationCreate(BaseModel): class ConversationUpdate(BaseModel): - """Update conversation model""" title: Optional[str] = None model: Optional[str] = None system_prompt: Optional[str] = None @@ -46,19 +45,17 @@ def list_conversations( import json query = db.query(Conversation).filter(Conversation.user_id == current_user.id) result = paginate(query.order_by(Conversation.updated_at.desc()), page, page_size) - + items = [] for c in result["items"]: conv_dict = c.to_dict() - # Get first user message for fallback title first_msg = db.query(Message).filter( Message.conversation_id == c.id, Message.role == 'user' ).order_by(Message.created_at).first() if first_msg: conv_dict['first_message'] = first_msg.content[:50] + ('...' if len(first_msg.content) > 50 else '') - - # Calculate total tokens from all assistant messages in this conversation + assistant_messages = db.query(Message).filter( Message.conversation_id == c.id, Message.role == 'assistant' @@ -66,7 +63,6 @@ def list_conversations( total_tokens = 0 for msg in assistant_messages: total_tokens += msg.token_count or 0 - # Also try to get usage from the usage field if msg.usage: try: usage_obj = json.loads(msg.usage) @@ -75,7 +71,7 @@ def list_conversations( pass conv_dict['token_count'] = total_tokens items.append(conv_dict) - + return success_response(data={ "items": items, "total": result["total"], @@ -91,21 +87,19 @@ def create_conversation( db: Session = Depends(get_db) ): """Create conversation""" - from luxx.models import LLMProvider - - # Get provider info - use default provider if not specified + from luxx.models.user import LLMProvider + provider_id = data.provider_id model = data.model - + if not provider_id: - # Find default provider default_provider = db.query(LLMProvider).filter( LLMProvider.user_id == current_user.id, LLMProvider.is_default == True ).first() if default_provider: provider_id = default_provider.id - + if provider_id and not model: provider = db.query(LLMProvider).filter( LLMProvider.id == provider_id, @@ -113,10 +107,10 @@ def create_conversation( ).first() if provider: model = provider.default_model - + if not model: model = "gpt-4" - + conversation = Conversation( id=generate_id("conv"), user_id=current_user.id, @@ -129,11 +123,11 @@ def create_conversation( max_tokens=data.max_tokens, thinking_enabled=data.thinking_enabled ) - + db.add(conversation) db.commit() db.refresh(conversation) - + return success_response(data=conversation.to_dict(), message="Conversation created successfully") @@ -148,10 +142,10 @@ def get_conversation( Conversation.id == conversation_id, Conversation.user_id == current_user.id ).first() - + if not conversation: return error_response("Conversation not found", 404) - + return success_response(data=conversation.to_dict()) @@ -167,17 +161,17 @@ def update_conversation( Conversation.id == conversation_id, Conversation.user_id == current_user.id ).first() - + if not conversation: return error_response("Conversation not found", 404) - + update_data = data.dict(exclude_unset=True) for key, value in update_data.items(): setattr(conversation, key, value) - + db.commit() db.refresh(conversation) - + return success_response(data=conversation.to_dict(), message="Conversation updated successfully") @@ -192,11 +186,11 @@ def delete_conversation( Conversation.id == conversation_id, Conversation.user_id == current_user.id ).first() - + if not conversation: return error_response("Conversation not found", 404) - + db.delete(conversation) db.commit() - + return success_response(message="Conversation deleted successfully") diff --git a/luxx/routes/messages.py b/luxx/api/chat/messages.py similarity index 91% rename from luxx/routes/messages.py rename to luxx/api/chat/messages.py index fac223d..e8a39ee 100644 --- a/luxx/routes/messages.py +++ b/luxx/api/chat/messages.py @@ -1,15 +1,15 @@ """Message routes""" -import json -from typing import List, Optional +from typing import List from fastapi import APIRouter, Depends, Response from fastapi.responses import StreamingResponse from pydantic import BaseModel from sqlalchemy.orm import Session from datetime import datetime -from luxx.database import get_db -from luxx.models import Conversation, Message, User -from luxx.routes.auth import get_current_user +from luxx.core.database import get_db +from luxx.models.chat import Conversation, Message +from luxx.models.user import User +from luxx.api.auth import get_current_user from luxx.services.chat import chat_service from luxx.utils.helpers import generate_id, success_response, error_response @@ -18,15 +18,13 @@ router = APIRouter(prefix="/messages", tags=["Messages"]) class MessageCreate(BaseModel): - """Create message model""" conversation_id: str content: str thinking_enabled: bool = False - enabled_tools: List[str] = [] # 启用的工具名称列表 + enabled_tools: List[str] = [] class MessageResponse(BaseModel): - """Message response model""" id: str role: str content: str @@ -44,14 +42,14 @@ def list_messages( Conversation.id == conversation_id, Conversation.user_id == current_user.id ).first() - + if not conversation: return error_response("Conversation not found", 404) - + messages = db.query(Message).filter( Message.conversation_id == conversation_id ).order_by(Message.created_at).all() - + return success_response(data={ "messages": [m.to_dict() for m in messages], "title": conversation.title, @@ -70,10 +68,10 @@ def send_message( Conversation.id == data.conversation_id, Conversation.user_id == current_user.id ).first() - + if not conversation: return error_response("Conversation not found", 404) - + user_message = Message( id=generate_id("msg"), conversation_id=data.conversation_id, @@ -81,21 +79,22 @@ def send_message( content=data.content, token_count=len(data.content) // 4 ) + db.add(user_message) - + conversation.updated_at = datetime.now() - + response = chat_service.non_stream_response( conversation=conversation, user_message=data.content, tools_enabled=False ) - + if not response.get("success"): return error_response(response.get("error", "Failed to generate response"), 500) - + ai_content = response.get("content", "") - + ai_message = Message( id=generate_id("msg"), conversation_id=data.conversation_id, @@ -103,9 +102,10 @@ def send_message( content=ai_content, token_count=len(ai_content) // 4 ) + db.add(ai_message) db.commit() - + return success_response(data={ "user_message": user_message.to_dict(), "assistant_message": ai_message.to_dict() @@ -123,10 +123,10 @@ async def stream_message( Conversation.id == data.conversation_id, Conversation.user_id == current_user.id ).first() - + if not conversation: return error_response("Conversation not found", 404) - + user_message = Message( id=generate_id("msg"), conversation_id=data.conversation_id, @@ -134,12 +134,13 @@ async def stream_message( content=data.content, token_count=len(data.content) // 4 ) + db.add(user_message) conversation.updated_at = datetime.now() db.commit() - + workspace = current_user.workspace_path if current_user.workspace_path else None - + async def event_generator(): async for sse_str in chat_service.stream_response( conversation=conversation, @@ -151,9 +152,8 @@ async def stream_message( workspace=workspace, user_permission_level=current_user.permission_level ): - # Chat service returns raw SSE strings (including done event) yield sse_str - + return StreamingResponse( event_generator(), media_type="text/event-stream", @@ -176,11 +176,11 @@ def delete_message( Message.id == message_id, Conversation.user_id == current_user.id ).first() - + if not message: return error_response("Message not found", 404) - + db.delete(message) db.commit() - + return success_response(message="Message deleted successfully") diff --git a/luxx/routes/providers.py b/luxx/api/providers.py similarity index 93% rename from luxx/routes/providers.py rename to luxx/api/providers.py index fddbb5f..ecf1893 100644 --- a/luxx/routes/providers.py +++ b/luxx/api/providers.py @@ -3,9 +3,9 @@ from typing import Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel -from luxx.database import get_db, SessionLocal -from luxx.models import User, LLMProvider -from luxx.routes.auth import get_current_user +from luxx.core.database import SessionLocal +from luxx.models.user import User, LLMProvider +from luxx.api.auth import get_current_user from luxx.utils.helpers import success_response import httpx import asyncio @@ -45,7 +45,7 @@ def list_providers( providers = db.query(LLMProvider).filter( LLMProvider.user_id == current_user.id ).order_by(LLMProvider.is_default.desc(), LLMProvider.created_at.desc()).all() - + return success_response(data={ "providers": [p.to_dict() for p in providers], "total": len(providers) @@ -62,7 +62,6 @@ def create_provider( """Create a new LLM provider""" db = SessionLocal() try: - db_provider = LLMProvider( user_id=current_user.id, name=provider.name, @@ -75,7 +74,7 @@ def create_provider( db.add(db_provider) db.commit() db.refresh(db_provider) - + return success_response(data=db_provider.to_dict(include_key=True)) except Exception as e: db.rollback() @@ -96,10 +95,10 @@ def get_provider( LLMProvider.id == provider_id, LLMProvider.user_id == current_user.id ).first() - + if not provider: raise HTTPException(status_code=404, detail="Provider not found") - + return success_response(data=provider.to_dict(include_key=True)) finally: db.close() @@ -118,28 +117,25 @@ def update_provider( LLMProvider.id == provider_id, LLMProvider.user_id == current_user.id ).first() - + if not provider: raise HTTPException(status_code=404, detail="Provider not found") - - # If setting as default, unset others + if update.is_default: db.query(LLMProvider).filter( LLMProvider.user_id == current_user.id, LLMProvider.id != provider_id ).update({"is_default": False}) - - # Update fields + update_data = update.dict(exclude_unset=True) - # Keep existing API key if the new one is empty if update_data.get('api_key') == '': update_data.pop('api_key') for key, value in update_data.items(): setattr(provider, key, value) - + db.commit() db.refresh(provider) - + return success_response(data=provider.to_dict(include_key=True)) except HTTPException: raise @@ -162,13 +158,13 @@ def delete_provider( LLMProvider.id == provider_id, LLMProvider.user_id == current_user.id ).first() - + if not provider: raise HTTPException(status_code=404, detail="Provider not found") - + db.delete(provider) db.commit() - + return success_response(message="Provider deleted") finally: db.close() @@ -180,7 +176,6 @@ def test_provider( current_user: User = Depends(get_current_user) ): """Test provider connection""" - try: db = SessionLocal() try: @@ -188,11 +183,10 @@ def test_provider( LLMProvider.id == provider_id, LLMProvider.user_id == current_user.id ).first() - + if not provider: return {"success": False, "message": "Provider not found"} - - # Test the connection + async def test(): async with httpx.AsyncClient(timeout=10.0) as client: response = await client.post( @@ -213,7 +207,7 @@ def test_provider( "success": True, "response_body": response.text[:500] if response.text else None } - + result = asyncio.run(test()) return { "success": result.get("success", False), diff --git a/luxx/api/rooms.py b/luxx/api/rooms.py new file mode 100644 index 0000000..062a68a --- /dev/null +++ b/luxx/api/rooms.py @@ -0,0 +1,156 @@ +"""Chat Room API Routes""" +from typing import List, Optional +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from luxx.services.room import chat_room_service + +router = APIRouter(prefix="/chat-rooms", tags=["chat-rooms"]) + + +class CreateChatRoomRequest(BaseModel): + name: str + description: Optional[str] = None + agent_ids: Optional[List[str]] = None + + +class UpdateChatRoomRequest(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + is_active: Optional[bool] = None + + +class SendMessageRequest(BaseModel): + content: str + + +class AddAgentRequest(BaseModel): + agent_id: str + + +def get_current_user_id() -> int: + """Get current user ID from auth context""" + return 1 + + +@router.get("") +async def list_chat_rooms(): + """List all chat rooms""" + user_id = get_current_user_id() + rooms = chat_room_service.list_rooms(user_id=user_id) + return {"rooms": rooms} + + +@router.post("") +async def create_chat_room(request: CreateChatRoomRequest): + """Create a new chat room""" + user_id = get_current_user_id() + room = chat_room_service.create_room( + name=request.name, + owner_id=user_id, + description=request.description, + agent_ids=request.agent_ids + ) + return {"room": room} + + +@router.get("/{room_id}") +async def get_chat_room(room_id: str): + """Get a chat room by ID""" + room = chat_room_service.get_room(room_id) + if not room: + raise HTTPException(status_code=404, detail="Chat room not found") + return {"room": room.to_dict(include_agents=True)} + + +@router.put("/{room_id}") +async def update_chat_room(room_id: str, request: UpdateChatRoomRequest): + """Update a chat room""" + room = chat_room_service.update_room( + room_id=room_id, + name=request.name, + description=request.description, + is_active=request.is_active + ) + if not room: + raise HTTPException(status_code=404, detail="Chat room not found") + return {"room": room} + + +@router.delete("/{room_id}") +async def delete_chat_room(room_id: str): + """Delete a chat room""" + success = chat_room_service.delete_room(room_id) + if not success: + raise HTTPException(status_code=404, detail="Chat room not found") + return {"success": True} + + +@router.get("/{room_id}/agents") +async def get_room_agents(room_id: str): + """Get all agents in a chat room""" + agents = chat_room_service.get_room_agents(room_id) + return {"agents": [a.to_dict() for a in agents]} + + +@router.post("/{room_id}/agents") +async def add_agent_to_room(room_id: str, request: AddAgentRequest): + """Add an agent to a chat room""" + success = chat_room_service.add_agent_to_room(room_id, request.agent_id) + if not success: + raise HTTPException(status_code=400, detail="Failed to add agent") + return {"success": True} + + +@router.delete("/{room_id}/agents/{agent_id}") +async def remove_agent_from_room(room_id: str, agent_id: str): + """Remove an agent from a chat room""" + success = chat_room_service.remove_agent_from_room(room_id, agent_id) + if not success: + raise HTTPException(status_code=404, detail="Agent not found in room") + return {"success": True} + + +@router.get("/{room_id}/messages") +async def get_room_messages(room_id: str, limit: int = 50, before_id: str = None): + """Get messages from a chat room""" + messages = chat_room_service.get_messages(room_id, limit=limit, before_id=before_id) + return {"messages": messages} + + +@router.post("/{room_id}/messages") +async def send_message(room_id: str, request: SendMessageRequest): + """Send a message to a chat room. Returns a streaming response via SSE.""" + from fastapi.responses import StreamingResponse + import json + + user_id = str(get_current_user_id()) + user_name = "User" + + async def generate(): + async for event in chat_room_service.process_message( + room_id=room_id, + user_message=request.content, + user_id=user_id, + user_name=user_name + ): + if event.get("event") in ["process_step", "done"]: + chat_room_service.save_message( + room_id=room_id, + sender_type="user", + sender_id=user_id, + sender_name=user_name, + content=request.content + ) + + yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n" + + return StreamingResponse( + generate(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no" + } + ) diff --git a/luxx/routes/tools.py b/luxx/api/tools.py similarity index 83% rename from luxx/routes/tools.py rename to luxx/api/tools.py index cfadf64..2ff47dd 100644 --- a/luxx/routes/tools.py +++ b/luxx/api/tools.py @@ -1,11 +1,10 @@ """Tool routes""" -from typing import Optional, List, Dict, Any +from typing import Optional, Dict, Any from fastapi import APIRouter, Depends, Body from pydantic import BaseModel -from luxx.database import get_db -from luxx.models import User -from luxx.routes.auth import get_current_user +from luxx.models.user import User +from luxx.api.auth import get_current_user from luxx.tools.core import registry from luxx.utils.helpers import success_response @@ -19,24 +18,22 @@ def list_tools( current_user: User = Depends(get_current_user) ): """Get available tools list""" - # Get tool definitions directly from registry to access category - if category: all_tools = [t for t in registry._tools.values() if t.category == category] tools = [t.to_openai_format() for t in all_tools] - categorized_tools = [t for t in registry._tools.values() if t.category == category] + categorized_tools = all_tools else: all_tools = list(registry._tools.values()) tools = registry.list_all() categorized_tools = all_tools - + categorized = {} for tool in categorized_tools: cat = tool.category if cat not in categorized: categorized[cat] = [] categorized[cat].append(tool.to_openai_format()) - + return success_response(data={ "tools": tools, "categorized": categorized, @@ -51,10 +48,10 @@ def get_tool( ): """Get tool details""" tool = registry.get(name) - + if not tool: return {"success": False, "message": "Tool not found", "code": 404} - + return success_response(data=tool.to_openai_format()) diff --git a/luxx/core/__init__.py b/luxx/core/__init__.py new file mode 100644 index 0000000..b78977c --- /dev/null +++ b/luxx/core/__init__.py @@ -0,0 +1,5 @@ +"""Core package - configuration and database""" +from luxx.core.config import config +from luxx.core.database import Base, SessionLocal, get_db, init_db, engine + +__all__ = ["config", "Base", "SessionLocal", "get_db", "init_db", "engine"] diff --git a/luxx/config.py b/luxx/core/config.py similarity index 96% rename from luxx/config.py rename to luxx/core/config.py index 7d0e372..343e29d 100644 --- a/luxx/config.py +++ b/luxx/core/config.py @@ -7,16 +7,16 @@ from typing import Any, Dict, Optional class Config: """Configuration class (singleton pattern)""" - + _instance: Optional["Config"] = None _config: Dict[str, Any] = {} - + def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._load_config() return cls._instance - + def _load_config(self) -> None: """Load configuration from YAML file""" yaml_paths = [ @@ -24,16 +24,16 @@ class Config: Path(__file__).parent.parent / "config.yaml", Path.cwd() / "config.yaml", ] - + for path in yaml_paths: if path.exists(): with open(path, "r", encoding="utf-8") as f: self._config = yaml.safe_load(f) or {} self._resolve_env_vars() return - + self._config = {} - + def _resolve_env_vars(self) -> None: """Resolve environment variable references""" def resolve(value: Any) -> Any: @@ -44,9 +44,9 @@ class Config: elif isinstance(value, list): return [resolve(item) for item in value] return value - + self._config = resolve(self._config) - + def get(self, key: str, default: Any = None) -> Any: """Get configuration value, supports dot-separated keys""" keys = key.split(".") @@ -59,77 +59,77 @@ class Config: if value is None: return default return value - + # App configuration @property def secret_key(self) -> str: return self.get("app.secret_key", "change-me-in-production") - + @property def debug(self) -> bool: return self.get("app.debug", True) - + @property def app_host(self) -> str: return self.get("app.host", "0.0.0.0") - + @property def app_port(self) -> int: return self.get("app.port", 8000) - + # Database configuration @property def database_url(self) -> str: return self.get("database.url", "sqlite:///./chat.db") - + # LLM configuration @property def llm_api_key(self) -> str: return self.get("llm.api_key", "") or os.environ.get("DEEPSEEK_API_KEY", "") - + @property def llm_api_url(self) -> str: return self.get("llm.api_url", "https://api.deepseek.com/v1") - + @property def llm_provider(self) -> str: return self.get("llm.provider", "deepseek") - + # Tools configuration @property def tools_enable_cache(self) -> bool: return self.get("tools.enable_cache", True) - + @property def tools_cache_ttl(self) -> int: return self.get("tools.cache_ttl", 300) - + @property def tools_max_workers(self) -> int: return self.get("tools.max_workers", 4) - + @property def tools_max_iterations(self) -> int: return self.get("tools.max_iterations", 10) - + # Logging configuration @property def log_level(self) -> str: return self.get("logging.level", "INFO") - + @property def log_format(self) -> str: return self.get("logging.format", "%(asctime)s | %(levelname)-8s | %(message)s") - + @property def log_date_format(self) -> str: return self.get("logging.date_format", "%Y-%m-%d %H:%M:%S") - + # Workspace configuration @property def workspace_root(self) -> str: return self.get("workspace.root", "./workspaces") - + @property def workspace_auto_create(self) -> bool: return self.get("workspace.auto_create", True) diff --git a/luxx/database.py b/luxx/core/database.py similarity index 87% rename from luxx/database.py rename to luxx/core/database.py index c4a9dba..ec4072f 100644 --- a/luxx/database.py +++ b/luxx/core/database.py @@ -1,9 +1,9 @@ """Database connection module""" from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker, declarative_base, Mapped +from sqlalchemy.orm import sessionmaker, declarative_base from typing import Generator -from luxx.config import config +from luxx.core.config import config # Create database engine @@ -13,9 +13,11 @@ engine = create_engine( echo=False ) + # Create session factory SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + # Create base class Base = declarative_base() diff --git a/luxx/models.py b/luxx/models.py deleted file mode 100644 index e2198a8..0000000 --- a/luxx/models.py +++ /dev/null @@ -1,223 +0,0 @@ -"""ORM model definitions""" -from datetime import datetime -from typing import Optional, List -from sqlalchemy import String, Integer, Boolean, Float, Text, DateTime, ForeignKey -from sqlalchemy.orm import Mapped, mapped_column, relationship - -from luxx.database import Base - - -def local_now(): - return datetime.now() - - -class LLMProvider(Base): - """LLM Provider configuration model""" - __tablename__ = "llm_providers" - - id: Mapped[int] = mapped_column(Integer, primary_key=True) - user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) - name: Mapped[str] = mapped_column(String(100), nullable=False) - provider_type: Mapped[str] = mapped_column(String(50), nullable=False, default="openai") # openai, deepseek, glm, etc. - base_url: Mapped[str] = mapped_column(String(500), nullable=False) - api_key: Mapped[str] = mapped_column(String(500), nullable=False) - default_model: Mapped[str] = mapped_column(String(100), nullable=False, default="gpt-4") - max_tokens: Mapped[int] = mapped_column(Integer, default=8192) # 默认 8192 - is_default: Mapped[bool] = mapped_column(Boolean, default=False) - enabled: Mapped[bool] = mapped_column(Boolean, default=True) - created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) - - # Relationships - user: Mapped["User"] = relationship("User", backref="llm_providers") - - def to_dict(self, include_key: bool = False): - """Convert to dictionary, optionally include API key""" - result = { - "id": self.id, - "user_id": self.user_id, - "name": self.name, - "provider_type": self.provider_type, - "base_url": self.base_url, - "default_model": self.default_model, - "max_tokens": self.max_tokens, - "is_default": self.is_default, - "enabled": self.enabled, - "created_at": self.created_at.isoformat() if self.created_at else None, - "updated_at": self.updated_at.isoformat() if self.updated_at else None - } - if include_key: - result["api_key"] = self.api_key - return result - - -class Project(Base): - """Project model""" - __tablename__ = "projects" - - id: Mapped[str] = mapped_column(String(64), primary_key=True) - user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) - name: Mapped[str] = mapped_column(String(255), nullable=False) - description: Mapped[Optional[str]] = mapped_column(Text, nullable=True) - created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) - - # Relationships - user: Mapped["User"] = relationship("User", backref="projects") - - -class User(Base): - """User model""" - __tablename__ = "users" - - id: Mapped[int] = mapped_column(Integer, primary_key=True) - username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False) - email: Mapped[Optional[str]] = mapped_column(String(120), unique=True, nullable=True) - password_hash: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) - role: Mapped[str] = mapped_column(String(20), default="user") - permission_level: Mapped[int] = mapped_column(Integer, default=1) # 1=READ_ONLY, 2=WRITE, 3=EXECUTE, 4=ADMIN - workspace_path: Mapped[Optional[str]] = mapped_column(String(500), nullable=True) # 用户工作空间路径 - is_active: Mapped[bool] = mapped_column(Boolean, default=True) - created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - - # Relationships - conversations: Mapped[List["Conversation"]] = relationship( - "Conversation", back_populates="user", cascade="all, delete-orphan" - ) - - def to_dict(self): - return { - "id": self.id, - "username": self.username, - "email": self.email, - "role": self.role, - "permission_level": self.permission_level, - "workspace_path": self.workspace_path, - "is_active": self.is_active, - "created_at": self.created_at.isoformat() if self.created_at else None - } - - -class Conversation(Base): - """Conversation model""" - __tablename__ = "conversations" - - id: Mapped[str] = mapped_column(String(64), primary_key=True) - user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) - provider_id: Mapped[Optional[int]] = mapped_column(Integer, ForeignKey("llm_providers.id"), nullable=True) - project_id: Mapped[Optional[str]] = mapped_column(String(64), ForeignKey("projects.id"), nullable=True) - title: Mapped[str] = mapped_column(String(255), nullable=False) - model: Mapped[str] = mapped_column(String(64), nullable=False, default="deepseek-chat") - system_prompt: Mapped[str] = mapped_column(Text, nullable=False, default="You are a helpful assistant.") - temperature: Mapped[float] = mapped_column(Float, default=0.7) - max_tokens: Mapped[int] = mapped_column(Integer, default=2000) - thinking_enabled: Mapped[bool] = mapped_column(Boolean, default=False) - created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) - - # Relationships - user: Mapped["User"] = relationship("User", back_populates="conversations") - provider: Mapped[Optional["LLMProvider"]] = relationship("LLMProvider") - messages: Mapped[List["Message"]] = relationship( - "Message", back_populates="conversation", cascade="all, delete-orphan" - ) - - def to_dict(self): - return { - "id": self.id, - "user_id": self.user_id, - "provider_id": self.provider_id, - "project_id": self.project_id, - "title": self.title, - "model": self.model, - "system_prompt": self.system_prompt, - "temperature": self.temperature, - "max_tokens": self.max_tokens, - "thinking_enabled": self.thinking_enabled, - "created_at": self.created_at.isoformat() if self.created_at else None, - "updated_at": self.updated_at.isoformat() if self.updated_at else None - } - - -class Message(Base): - """Message model - - content 字段统一使用 JSON 格式存储: - - **User 消息:** - { - "text": "用户输入的文本内容", - "attachments": [ - {"name": "utils.py", "extension": "py", "content": "..."} - ] - } - - **Assistant 消息:** - { - "text": "AI 回复的文本内容", - "tool_calls": [...], // 遗留的扁平结构 - "steps": [ // 有序步骤,用于渲染(主要数据源) - {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}, - {"id": "step-1", "index": 1, "type": "text", "content": "..."}, - {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."}, - {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "..."} - ] - } - """ - __tablename__ = "messages" - - id: Mapped[str] = mapped_column(String(64), primary_key=True) - conversation_id: Mapped[str] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=False) - role: Mapped[str] = mapped_column(String(16), nullable=False) # user, assistant, system, tool - content: Mapped[str] = mapped_column(Text, nullable=False, default="") - token_count: Mapped[int] = mapped_column(Integer, default=0) - usage: Mapped[Optional[str]] = mapped_column(Text, nullable=True) # JSON string for usage info - created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - - # Relationships - conversation: Mapped["Conversation"] = relationship("Conversation", back_populates="messages") - - def to_dict(self): - """Convert to dictionary, extracting process_steps for frontend""" - import json - - result = { - "id": self.id, - "conversation_id": self.conversation_id, - "role": self.role, - "token_count": self.token_count, - "created_at": self.created_at.isoformat() if self.created_at else None - } - - # Parse usage JSON - if self.usage: - try: - result["usage"] = json.loads(self.usage) - except json.JSONDecodeError: - result["usage"] = None - - # Parse content JSON - try: - content_obj = json.loads(self.content) if self.content else {} - except json.JSONDecodeError: - # Legacy plain text content - result["content"] = self.content - result["text"] = self.content - result["attachments"] = [] - result["tool_calls"] = [] - result["process_steps"] = [] - return result - - # Extract common fields - result["text"] = content_obj.get("text", "") - result["attachments"] = content_obj.get("attachments", []) - result["tool_calls"] = content_obj.get("tool_calls", []) - - # Extract steps as process_steps for frontend rendering - result["process_steps"] = content_obj.get("steps", []) - - # For backward compatibility - if "content" not in result: - result["content"] = result["text"] - - return result diff --git a/luxx/models/__init__.py b/luxx/models/__init__.py new file mode 100644 index 0000000..600c2b3 --- /dev/null +++ b/luxx/models/__init__.py @@ -0,0 +1,10 @@ +"""Models package""" +from luxx.models.user import User, LLMProvider, Project +from luxx.models.chat import Conversation, Message +from luxx.models.room import ChatRoom, Agent, ChatRoomAgent, ChatRoomMessage + +__all__ = [ + "User", "LLMProvider", "Project", + "Conversation", "Message", + "ChatRoom", "Agent", "ChatRoomAgent", "ChatRoomMessage" +] diff --git a/luxx/models/chat.py b/luxx/models/chat.py new file mode 100644 index 0000000..e86cc20 --- /dev/null +++ b/luxx/models/chat.py @@ -0,0 +1,111 @@ +"""Chat-related models""" +from datetime import datetime +from typing import Optional, List +from sqlalchemy import String, Integer, Boolean, Float, Text, DateTime, ForeignKey +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from luxx.core.database import Base + + +def local_now(): + return datetime.now() + + +class Conversation(Base): + """Conversation model""" + __tablename__ = "conversations" + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) + provider_id: Mapped[Optional[int]] = mapped_column(Integer, ForeignKey("llm_providers.id"), nullable=True) + project_id: Mapped[Optional[str]] = mapped_column(String(64), ForeignKey("projects.id"), nullable=True) + title: Mapped[str] = mapped_column(String(255), nullable=False) + model: Mapped[str] = mapped_column(String(64), nullable=False, default="deepseek-chat") + system_prompt: Mapped[str] = mapped_column(Text, nullable=False, default="You are a helpful assistant.") + temperature: Mapped[float] = mapped_column(Float, default=0.7) + max_tokens: Mapped[int] = mapped_column(Integer, default=2000) + thinking_enabled: Mapped[bool] = mapped_column(Boolean, default=False) + created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) + updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) + + # Relationships + user: Mapped["User"] = relationship("User", back_populates="conversations") + provider: Mapped[Optional["LLMProvider"]] = relationship("LLMProvider") + messages: Mapped[List["Message"]] = relationship( + "Message", back_populates="conversation", cascade="all, delete-orphan" + ) + + def to_dict(self): + return { + "id": self.id, + "user_id": self.user_id, + "provider_id": self.provider_id, + "project_id": self.project_id, + "title": self.title, + "model": self.model, + "system_prompt": self.system_prompt, + "temperature": self.temperature, + "max_tokens": self.max_tokens, + "thinking_enabled": self.thinking_enabled, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None + } + + +class Message(Base): + """Message model. + + content 字段统一使用 JSON 格式存储: + """ + __tablename__ = "messages" + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + conversation_id: Mapped[str] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=False) + role: Mapped[str] = mapped_column(String(16), nullable=False) + content: Mapped[str] = mapped_column(Text, nullable=False, default="") + token_count: Mapped[int] = mapped_column(Integer, default=0) + usage: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) + + # Relationships + conversation: Mapped["Conversation"] = relationship("Conversation", back_populates="messages") + + def to_dict(self): + """Convert to dictionary, extracting process_steps for frontend""" + import json + + result = { + "id": self.id, + "conversation_id": self.conversation_id, + "role": self.role, + "token_count": self.token_count, + "created_at": self.created_at.isoformat() if self.created_at else None + } + + # Parse usage JSON + if self.usage: + try: + result["usage"] = json.loads(self.usage) + except json.JSONDecodeError: + result["usage"] = None + + # Parse content JSON + try: + content_obj = json.loads(self.content) if self.content else {} + except json.JSONDecodeError: + result["content"] = self.content + result["text"] = self.content + result["attachments"] = [] + result["tool_calls"] = [] + result["process_steps"] = [] + return result + + result["text"] = content_obj.get("text", "") + result["attachments"] = content_obj.get("attachments", []) + result["tool_calls"] = content_obj.get("tool_calls", []) + result["process_steps"] = content_obj.get("steps", []) + + if "content" not in result: + result["content"] = result["text"] + + return result diff --git a/luxx/models/room.py b/luxx/models/room.py new file mode 100644 index 0000000..dc5f41f --- /dev/null +++ b/luxx/models/room.py @@ -0,0 +1,172 @@ +"""ChatRoom models""" +from datetime import datetime +from typing import Optional, List +from sqlalchemy import String, Integer, Boolean, Text, DateTime, ForeignKey +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from luxx.core.database import Base + + +def local_now(): + return datetime.now() + + +class ChatRoom(Base): + """Chat Room model - like a group chat for multiple agents""" + __tablename__ = "chat_rooms" + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + name: Mapped[str] = mapped_column(String(100), nullable=False) + description: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + owner_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) + is_active: Mapped[bool] = mapped_column(Boolean, default=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) + updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) + + # Relationships + owner: Mapped["User"] = relationship("User", backref="chat_rooms") + agents: Mapped[List["ChatRoomAgent"]] = relationship( + "ChatRoomAgent", back_populates="chat_room", cascade="all, delete-orphan" + ) + messages: Mapped[List["ChatRoomMessage"]] = relationship( + "ChatRoomMessage", back_populates="chat_room", cascade="all, delete-orphan" + ) + + def to_dict(self, include_agents: bool = False): + result = { + "id": self.id, + "name": self.name, + "description": self.description, + "owner_id": self.owner_id, + "is_active": self.is_active, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None + } + if include_agents and self.agents: + result["agents"] = [ca.to_dict() for ca in self.agents] + return result + + +class Agent(Base): + """Agent model - defines an AI agent with specific role""" + __tablename__ = "agents" + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + name: Mapped[str] = mapped_column(String(50), nullable=False) + role: Mapped[str] = mapped_column(String(50), nullable=False, default="helper") + avatar: Mapped[Optional[str]] = mapped_column(String(500), nullable=True) + system_prompt: Mapped[str] = mapped_column(Text, nullable=False, default="You are a helpful assistant.") + provider_id: Mapped[Optional[int]] = mapped_column(Integer, ForeignKey("llm_providers.id"), nullable=True) + model: Mapped[Optional[str]] = mapped_column(String(100), nullable=True) + tools: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + is_active: Mapped[bool] = mapped_column(Boolean, default=True) + priority: Mapped[int] = mapped_column(Integer, default=5) + auto_response: Mapped[bool] = mapped_column(Boolean, default=True) + mention_trigger: Mapped[bool] = mapped_column(Boolean, default=False) + temperature: Mapped[float] = mapped_column(Text, default="0.7") + max_tokens: Mapped[int] = mapped_column(Integer, default=2048) + created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) + updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) + + # Relationships + provider: Mapped[Optional["LLMProvider"]] = relationship("LLMProvider") + chat_rooms: Mapped[List["ChatRoomAgent"]] = relationship( + "ChatRoomAgent", back_populates="agent", cascade="all, delete-orphan" + ) + + def to_dict(self, include_secrets: bool = False): + import json + result = { + "id": self.id, + "name": self.name, + "role": self.role, + "avatar": self.avatar, + "system_prompt": self.system_prompt, + "provider_id": self.provider_id, + "model": self.model, + "is_active": self.is_active, + "priority": self.priority, + "auto_response": self.auto_response, + "mention_trigger": self.mention_trigger, + "temperature": float(self.temperature) if self.temperature else 0.7, + "max_tokens": self.max_tokens, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None + } + if self.tools: + try: + result["tools"] = json.loads(self.tools) + except json.JSONDecodeError: + result["tools"] = [] + else: + result["tools"] = [] + + if include_secrets and self.provider: + result["provider"] = self.provider.to_dict(include_key=True) + return result + + +class ChatRoomAgent(Base): + """Association table for ChatRoom and Agent""" + __tablename__ = "chat_room_agents" + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + chat_room_id: Mapped[str] = mapped_column(String(64), ForeignKey("chat_rooms.id"), nullable=False) + agent_id: Mapped[str] = mapped_column(String(64), ForeignKey("agents.id"), nullable=False) + is_active: Mapped[bool] = mapped_column(Boolean, default=True) + joined_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) + + # Relationships + chat_room: Mapped["ChatRoom"] = relationship("ChatRoom", back_populates="agents") + agent: Mapped["Agent"] = relationship("Agent", back_populates="chat_rooms") + + def to_dict(self): + return { + "id": self.id, + "chat_room_id": self.chat_room_id, + "agent_id": self.agent_id, + "is_active": self.is_active, + "joined_at": self.joined_at.isoformat() if self.joined_at else None, + "agent": self.agent.to_dict() if self.agent else None + } + + +class ChatRoomMessage(Base): + """Chat Room Message model""" + __tablename__ = "chat_room_messages" + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + room_id: Mapped[str] = mapped_column(String(64), ForeignKey("chat_rooms.id"), nullable=False) + sender_type: Mapped[str] = mapped_column(String(16), nullable=False) + sender_id: Mapped[str] = mapped_column(String(64), nullable=False) + sender_name: Mapped[str] = mapped_column(String(50), nullable=False) + content: Mapped[str] = mapped_column(Text, nullable=False, default="") + mentions: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + parent_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=True) + token_count: Mapped[int] = mapped_column(Integer, default=0) + created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) + + # Relationships + chat_room: Mapped["ChatRoom"] = relationship("ChatRoom", back_populates="messages") + + def to_dict(self): + import json + result = { + "id": self.id, + "room_id": self.room_id, + "sender_type": self.sender_type, + "sender_id": self.sender_id, + "sender_name": self.sender_name, + "content": self.content, + "parent_id": self.parent_id, + "token_count": self.token_count, + "created_at": self.created_at.isoformat() if self.created_at else None + } + if self.mentions: + try: + result["mentions"] = json.loads(self.mentions) + except json.JSONDecodeError: + result["mentions"] = [] + else: + result["mentions"] = [] + return result diff --git a/luxx/models/user.py b/luxx/models/user.py new file mode 100644 index 0000000..c0196bf --- /dev/null +++ b/luxx/models/user.py @@ -0,0 +1,99 @@ +"""User-related models""" +from datetime import datetime +from typing import Optional, List +from sqlalchemy import String, Integer, Boolean, Text, DateTime, ForeignKey +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from luxx.core.database import Base + + +def local_now(): + return datetime.now() + + +class User(Base): + """User model""" + __tablename__ = "users" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False) + email: Mapped[Optional[str]] = mapped_column(String(120), unique=True, nullable=True) + password_hash: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) + role: Mapped[str] = mapped_column(String(20), default="user") + permission_level: Mapped[int] = mapped_column(Integer, default=1) + workspace_path: Mapped[Optional[str]] = mapped_column(String(500), nullable=True) + is_active: Mapped[bool] = mapped_column(Boolean, default=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) + + # Relationships + conversations: Mapped[List["Conversation"]] = relationship( + "Conversation", back_populates="user", cascade="all, delete-orphan" + ) + llm_providers: Mapped[List["LLMProvider"]] = relationship("LLMProvider", back_populates="user") + projects: Mapped[List["Project"]] = relationship("Project", back_populates="user") + + def to_dict(self): + return { + "id": self.id, + "username": self.username, + "email": self.email, + "role": self.role, + "permission_level": self.permission_level, + "workspace_path": self.workspace_path, + "is_active": self.is_active, + "created_at": self.created_at.isoformat() if self.created_at else None + } + + +class LLMProvider(Base): + """LLM Provider configuration model""" + __tablename__ = "llm_providers" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) + name: Mapped[str] = mapped_column(String(100), nullable=False) + provider_type: Mapped[str] = mapped_column(String(50), nullable=False, default="openai") + base_url: Mapped[str] = mapped_column(String(500), nullable=False) + api_key: Mapped[str] = mapped_column(String(500), nullable=False) + default_model: Mapped[str] = mapped_column(String(100), nullable=False, default="gpt-4") + max_tokens: Mapped[int] = mapped_column(Integer, default=8192) + is_default: Mapped[bool] = mapped_column(Boolean, default=False) + enabled: Mapped[bool] = mapped_column(Boolean, default=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) + updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) + + # Relationships + user: Mapped["User"] = relationship("User", back_populates="llm_providers") + + def to_dict(self, include_key: bool = False): + result = { + "id": self.id, + "user_id": self.user_id, + "name": self.name, + "provider_type": self.provider_type, + "base_url": self.base_url, + "default_model": self.default_model, + "max_tokens": self.max_tokens, + "is_default": self.is_default, + "enabled": self.enabled, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None + } + if include_key: + result["api_key"] = self.api_key + return result + + +class Project(Base): + """Project model""" + __tablename__ = "projects" + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) + name: Mapped[str] = mapped_column(String(255), nullable=False) + description: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) + updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) + + # Relationships + user: Mapped["User"] = relationship("User", back_populates="projects") diff --git a/luxx/routes/__init__.py b/luxx/routes/__init__.py deleted file mode 100644 index c6519c8..0000000 --- a/luxx/routes/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -"""API routes module""" -from fastapi import APIRouter - -from luxx.routes import auth, conversations, messages, tools, providers - - -api_router = APIRouter() - -# Register sub-routes -api_router.include_router(auth.router) -api_router.include_router(conversations.router) -api_router.include_router(messages.router) -api_router.include_router(tools.router) -api_router.include_router(providers.router) diff --git a/luxx/services/__init__.py b/luxx/services/__init__.py index d4d0ab0..c53c74b 100644 --- a/luxx/services/__init__.py +++ b/luxx/services/__init__.py @@ -1,3 +1,6 @@ -"""Services module""" -from luxx.services.llm_client import LLMClient, llm_client, LLMResponse -from luxx.services.chat import ChatService, chat_service +"""Services package""" +from luxx.services.chat import chat_service +from luxx.services.room import chat_room_service +from luxx.services.agent import agent_manager + +__all__ = ["chat_service", "chat_room_service", "agent_manager"] diff --git a/luxx/services/agent.py b/luxx/services/agent.py new file mode 100644 index 0000000..247f9e1 --- /dev/null +++ b/luxx/services/agent.py @@ -0,0 +1,125 @@ +"""Agent Manager Service""" +import uuid +import json +import logging +from typing import List, Dict, Optional + +from luxx.core.database import SessionLocal +from luxx.models.room import Agent + +logger = logging.getLogger(__name__) + + +class AgentManager: + """Service for managing agents""" + + def get_agent(self, agent_id: str) -> Optional[Dict]: + """Get an agent by ID""" + db = SessionLocal() + try: + agent = db.query(Agent).filter(Agent.id == agent_id).first() + return agent.to_dict() if agent else None + finally: + db.close() + + def list_agents(self, include_inactive: bool = False) -> List[Dict]: + """List all agents""" + db = SessionLocal() + try: + query = db.query(Agent) + if not include_inactive: + query = query.filter(Agent.is_active == True) + agents = query.order_by(Agent.priority).all() + return [a.to_dict() for a in agents] + finally: + db.close() + + def create_agent(self, name: str, role: str, system_prompt: str, provider_id: int = None, model: str = None, + tools: List[str] = None, priority: int = 5, auto_response: bool = True, mention_trigger: bool = False, + temperature: float = 0.7, max_tokens: int = 2048, avatar: str = None) -> Dict: + """Create a new agent""" + if not system_prompt: + system_prompt = "You are a helpful assistant." + + db = SessionLocal() + try: + agent = Agent( + id=str(uuid.uuid4()), + name=name, + role=role, + system_prompt=system_prompt, + provider_id=provider_id, + model=model, + tools=json.dumps(tools) if tools else None, + priority=priority, + auto_response=auto_response, + mention_trigger=mention_trigger, + temperature=str(temperature), + max_tokens=max_tokens, + avatar=avatar + ) + db.add(agent) + db.commit() + return agent.to_dict() + finally: + db.close() + + def update_agent(self, agent_id: str, name: str = None, role: str = None, system_prompt: str = None, + provider_id: int = None, model: str = None, tools: List[str] = None, priority: int = None, + auto_response: bool = None, mention_trigger: bool = None, temperature: float = None, + max_tokens: int = None, is_active: bool = None, avatar: str = None) -> Optional[Dict]: + """Update an agent""" + db = SessionLocal() + try: + agent = db.query(Agent).filter(Agent.id == agent_id).first() + if not agent: + return None + + if name is not None: + agent.name = name + if role is not None: + agent.role = role + if system_prompt is not None: + agent.system_prompt = system_prompt + if provider_id is not None: + agent.provider_id = provider_id + if model is not None: + agent.model = model + if tools is not None: + agent.tools = json.dumps(tools) + if priority is not None: + agent.priority = priority + if auto_response is not None: + agent.auto_response = auto_response + if mention_trigger is not None: + agent.mention_trigger = mention_trigger + if temperature is not None: + agent.temperature = str(temperature) + if max_tokens is not None: + agent.max_tokens = max_tokens + if is_active is not None: + agent.is_active = is_active + if avatar is not None: + agent.avatar = avatar + + db.commit() + return agent.to_dict() + finally: + db.close() + + def delete_agent(self, agent_id: str) -> bool: + """Delete an agent""" + db = SessionLocal() + try: + agent = db.query(Agent).filter(Agent.id == agent_id).first() + if not agent: + return False + db.delete(agent) + db.commit() + return True + finally: + db.close() + + +# Global manager instance +agent_manager = AgentManager() diff --git a/luxx/services/room.py b/luxx/services/room.py new file mode 100644 index 0000000..1e7c798 --- /dev/null +++ b/luxx/services/room.py @@ -0,0 +1,361 @@ +"""Chat Room Service - orchestrates multi-agent chat""" +import json +import re +import uuid +import asyncio +import logging +from typing import List, Dict, Any, Optional, AsyncGenerator +from dataclasses import dataclass + +from luxx.core.database import SessionLocal +from luxx.models.room import ChatRoom, Agent, ChatRoomAgent, ChatRoomMessage +from luxx.agents.base import BaseAgent + +logger = logging.getLogger(__name__) + + +# ==================== Dispatcher ==================== + +@dataclass +class DispatchResult: + """Result of message dispatch""" + triggered_agents: List[BaseAgent] + mentions: List[str] + should_respond: bool + + +class MessageDispatcher: + """Dispatcher for routing messages to agents""" + + @staticmethod + def parse_mentions(content: str) -> List[str]: + """Parse @mentions from message content""" + pattern = r'@(\w+)' + return re.findall(pattern, content) + + @staticmethod + def get_agents_by_names(names: List[str], room_agents: List[BaseAgent]) -> List[BaseAgent]: + """Get agents by their names (case-insensitive)""" + name_lower_map = {a.name.lower(): a for a in room_agents} + matched = [] + for name in names: + agent = name_lower_map.get(name.lower()) + if agent: + matched.append(agent) + return matched + + @staticmethod + def get_agents_by_ids(agent_ids: List[str], room_agents: List[BaseAgent]) -> List[BaseAgent]: + """Get agents by their IDs""" + id_set = set(agent_ids) + return [a for a in room_agents if a.agent_id in id_set] + + def dispatch(self, content: str, room_agents: List[BaseAgent], sender_id: str, sender_type: str = "user") -> DispatchResult: + """Dispatch a message to appropriate agents.""" + available_agents = [a for a in room_agents if a.agent_id != sender_id] + mentions = self.parse_mentions(content) + + if mentions: + triggered = self.get_agents_by_names(mentions, available_agents) + logger.info(f"Message with mentions: {mentions} -> triggered: {[a.name for a in triggered]}") + return DispatchResult(triggered_agents=triggered, mentions=mentions, should_respond=len(triggered) > 0) + + auto_agents = [a for a in available_agents if a.auto_response] + auto_agents.sort(key=lambda a: a.priority) + logger.info(f"Auto-response agents triggered: {[a.name for a in auto_agents]}") + return DispatchResult(triggered_agents=auto_agents, mentions=[], should_respond=len(auto_agents) > 0) + + +# ==================== Aggregator ==================== + +class ResponseAggregator: + """Aggregates responses from multiple agents""" + + def __init__(self, room_id: str): + self.room_id = room_id + self._agent_responses: Dict[str, Dict[str, Any]] = {} + + async def aggregate_stream(self, agent_streams: Dict[str, AsyncGenerator]) -> AsyncGenerator[Dict[str, Any], None]: + """Aggregate streaming responses from multiple agents.""" + if not agent_streams: + return + + async def collect_agent_stream(agent_id: str, stream): + try: + async for event in stream: + event["agent_id"] = agent_id + yield event + except Exception as e: + logger.error(f"Agent {agent_id} stream error: {e}") + yield {"event": "error", "agent_id": agent_id, "data": {"content": str(e)}} + + tasks = [collect_agent_stream(agent_id, stream) for agent_id, stream in agent_streams.items()] + async for event in asyncio.merge(*tasks): + yield event + + def aggregate_final(self, responses: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]: + """Aggregate final responses from agents.""" + results = [] + for agent_id, response in responses.items(): + if response.get("event") == "done": + results.append({ + "agent_id": agent_id, + "agent_name": response.get("agent_name"), + "message_id": response.get("message_id"), + "content": response.get("content"), + "token_count": response.get("token_count", 0) + }) + return results + + +# ==================== Chat Room Service ==================== + +class ChatRoomService: + """Service for managing chat rooms with multi-agent support""" + + def __init__(self): + self.dispatcher = MessageDispatcher() + + def get_room(self, room_id: str) -> Optional[ChatRoom]: + """Get a chat room by ID""" + db = SessionLocal() + try: + return db.query(ChatRoom).filter(ChatRoom.id == room_id).first() + finally: + db.close() + + def get_room_agents(self, room_id: str) -> List[BaseAgent]: + """Get all active agents in a chat room""" + db = SessionLocal() + try: + room_agents = db.query(ChatRoomAgent).filter( + ChatRoomAgent.chat_room_id == room_id, + ChatRoomAgent.is_active == True + ).all() + + agents = [] + for ra in room_agents: + agent_db = db.query(Agent).filter(Agent.id == ra.agent_id, Agent.is_active == True).first() + if agent_db: + agents.append(BaseAgent.from_model(agent_db)) + + agents.sort(key=lambda a: a.priority) + return agents + finally: + db.close() + + def get_agent(self, agent_id: str) -> Optional[BaseAgent]: + """Get an agent by ID""" + db = SessionLocal() + try: + agent_db = db.query(Agent).filter(Agent.id == agent_id).first() + if agent_db: + return BaseAgent.from_model(agent_db) + return None + finally: + db.close() + + def list_rooms(self, user_id: int = None, include_agents: bool = True) -> List[Dict]: + """List all chat rooms""" + db = SessionLocal() + try: + query = db.query(ChatRoom) + if user_id: + query = query.filter(ChatRoom.owner_id == user_id) + rooms = query.order_by(ChatRoom.updated_at.desc()).all() + return [r.to_dict(include_agents=include_agents) for r in rooms] + finally: + db.close() + + def create_room(self, name: str, owner_id: int, description: str = None, agent_ids: List[str] = None) -> Dict: + """Create a new chat room""" + db = SessionLocal() + try: + room = ChatRoom( + id=str(uuid.uuid4()), + name=name, + description=description, + owner_id=owner_id + ) + db.add(room) + + if agent_ids: + for agent_id in agent_ids: + room_agent = ChatRoomAgent( + id=str(uuid.uuid4()), + chat_room_id=room.id, + agent_id=agent_id + ) + db.add(room_agent) + + db.commit() + return room.to_dict(include_agents=True) + finally: + db.close() + + def update_room(self, room_id: str, name: str = None, description: str = None, is_active: bool = None) -> Optional[Dict]: + """Update a chat room""" + db = SessionLocal() + try: + room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() + if not room: + return None + + if name is not None: + room.name = name + if description is not None: + room.description = description + if is_active is not None: + room.is_active = is_active + + db.commit() + return room.to_dict(include_agents=True) + finally: + db.close() + + def delete_room(self, room_id: str) -> bool: + """Delete a chat room""" + db = SessionLocal() + try: + room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() + if not room: + return False + db.delete(room) + db.commit() + return True + finally: + db.close() + + def add_agent_to_room(self, room_id: str, agent_id: str) -> bool: + """Add an agent to a chat room""" + db = SessionLocal() + try: + existing = db.query(ChatRoomAgent).filter( + ChatRoomAgent.chat_room_id == room_id, + ChatRoomAgent.agent_id == agent_id + ).first() + + if existing: + existing.is_active = True + else: + room_agent = ChatRoomAgent( + id=str(uuid.uuid4()), + chat_room_id=room_id, + agent_id=agent_id + ) + db.add(room_agent) + + db.commit() + return True + finally: + db.close() + + def remove_agent_from_room(self, room_id: str, agent_id: str) -> bool: + """Remove an agent from a chat room""" + db = SessionLocal() + try: + room_agent = db.query(ChatRoomAgent).filter( + ChatRoomAgent.chat_room_id == room_id, + ChatRoomAgent.agent_id == agent_id + ).first() + + if room_agent: + room_agent.is_active = False + db.commit() + return True + return False + finally: + db.close() + + def get_messages(self, room_id: str, limit: int = 50, before_id: str = None) -> List[Dict]: + """Get messages from a chat room""" + db = SessionLocal() + try: + query = db.query(ChatRoomMessage).filter( + ChatRoomMessage.room_id == room_id + ).order_by(ChatRoomMessage.created_at.desc()) + + if before_id: + before_msg = db.query(ChatRoomMessage).filter( + ChatRoomMessage.id == before_id + ).first() + if before_msg: + query = query.filter(ChatRoomMessage.created_at < before_msg.created_at) + + messages = query.limit(limit).all() + return [m.to_dict() for m in reversed(messages)] + finally: + db.close() + + def save_message(self, room_id: str, sender_type: str, sender_id: str, sender_name: str, content: str, + mentions: List[str] = None, parent_id: str = None, token_count: int = 0) -> Dict: + """Save a message to a chat room""" + db = SessionLocal() + try: + msg = ChatRoomMessage( + id=str(uuid.uuid4()), + room_id=room_id, + sender_type=sender_type, + sender_id=sender_id, + sender_name=sender_name, + content=content, + mentions=json.dumps(mentions) if mentions else None, + parent_id=parent_id, + token_count=token_count + ) + db.add(msg) + + room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() + if room: + from datetime import datetime + room.updated_at = datetime.now() + + db.commit() + return msg.to_dict() + finally: + db.close() + + async def process_message(self, room_id: str, user_message: str, user_id: str, user_name: str, context: Dict = None) -> AsyncGenerator[Dict[str, Any], None]: + """Process a user message and dispatch to appropriate agents.""" + room = self.get_room(room_id) + if not room: + yield {"event": "error", "data": {"content": "Chat room not found"}} + return + + room_agents = self.get_room_agents(room_id) + if not room_agents: + yield {"event": "error", "data": {"content": "No agents available in this room"}} + return + + dispatch_result = self.dispatcher.dispatch( + content=user_message, + room_agents=room_agents, + sender_id=user_id, + sender_type="user" + ) + + if not dispatch_result.should_respond: + yield {"event": "no_response", "data": {"message": "No agents triggered"}} + return + + messages = self.get_messages(room_id, limit=20) + + agent_streams = {} + for agent in dispatch_result.triggered_agents: + stream = agent.stream_response( + user_message=user_message, + conversation_history=messages, + context=context + ) + agent_streams[agent.agent_id] = stream + + aggregator = ResponseAggregator(room_id) + async for event in aggregator.aggregate_stream(agent_streams): + yield event + + +# Global service instance +chat_room_service = ChatRoomService() + +# Export for backward compatibility +dispatcher = chat_room_service.dispatcher diff --git a/luxx/services/room_ws.py b/luxx/services/room_ws.py new file mode 100644 index 0000000..b6e83ce --- /dev/null +++ b/luxx/services/room_ws.py @@ -0,0 +1,170 @@ +"""WebSocket handler for Chat Rooms""" +import json +import asyncio +import logging +from typing import Dict, Set +from fastapi import WebSocket, WebSocketDisconnect + +from luxx.services.room import chat_room_service + +logger = logging.getLogger(__name__) + + +class ChatRoomConnectionManager: + """Manages WebSocket connections for chat rooms""" + + def __init__(self): + self._room_connections: Dict[str, Set[WebSocket]] = {} + self._connection_rooms: Dict[WebSocket, str] = {} + + async def connect(self, websocket: WebSocket, room_id: str): + """Connect a WebSocket to a chat room""" + await websocket.accept() + + if room_id not in self._room_connections: + self._room_connections[room_id] = set() + + self._room_connections[room_id].add(websocket) + self._connection_rooms[websocket] = room_id + + logger.info(f"WebSocket connected to room: {room_id}") + + await websocket.send_json({ + "event": "connected", + "data": {"room_id": room_id, "message": "Connected to chat room"} + }) + + def disconnect(self, websocket: WebSocket): + """Disconnect a WebSocket from its room""" + room_id = self._connection_rooms.pop(websocket, None) + if room_id and room_id in self._room_connections: + self._room_connections[room_id].discard(websocket) + if not self._room_connections[room_id]: + del self._room_connections[room_id] + logger.info(f"WebSocket disconnected from room: {room_id}") + + async def broadcast_to_room(self, room_id: str, message: dict, exclude: WebSocket = None): + """Broadcast a message to all connections in a room""" + if room_id not in self._room_connections: + return + + disconnected = [] + for connection in self._room_connections[room_id]: + if connection == exclude: + continue + try: + await connection.send_json(message) + except Exception as e: + logger.error(f"Failed to send to connection: {e}") + disconnected.append(connection) + + for conn in disconnected: + self.disconnect(conn) + + async def send_to_room(self, room_id: str, message: dict): + """Send a message to all connections in a room""" + await self.broadcast_to_room(room_id, message) + + def get_room_size(self, room_id: str) -> int: + """Get the number of connections in a room""" + return len(self._room_connections.get(room_id, set())) + + +# Global connection manager +connection_manager = ChatRoomConnectionManager() + + +async def websocket_handler(websocket: WebSocket, room_id: str): + """Handle WebSocket connection for a chat room.""" + await connection_manager.connect(websocket, room_id) + + room = chat_room_service.get_room(room_id) + if not room: + await websocket.send_json({"event": "error", "data": {"content": "Chat room not found"}}) + await websocket.close() + return + + try: + messages = chat_room_service.get_messages(room_id, limit=50) + await websocket.send_json({"event": "history", "data": {"messages": messages}}) + + agents = chat_room_service.get_room_agents(room_id) + await websocket.send_json({"event": "agents", "data": {"agents": [a.to_dict() for a in agents]}}) + + await connection_manager.broadcast_to_room(room_id, { + "event": "system", + "data": {"content": "User joined the room", "type": "join"} + }, exclude=websocket) + + while True: + data = await websocket.receive_json() + action = data.get("action") + + if action == "send_message": + content = data.get("content", "") + if not content: + continue + + user_id = str(data.get("user_id", "anonymous")) + user_name = data.get("user_name", "Anonymous") + + msg = chat_room_service.save_message( + room_id=room_id, sender_type="user", sender_id=user_id, + sender_name=user_name, content=content + ) + + await connection_manager.broadcast_to_room(room_id, {"event": "message", "data": msg}) + + agents = chat_room_service.get_room_agents(room_id) + for agent in agents: + await connection_manager.broadcast_to_room(room_id, { + "event": "typing", + "data": {"agent_id": agent.agent_id, "agent_name": agent.name, "is_typing": True} + }) + + context = {"user_id": user_id, "username": user_name} + + async for event in chat_room_service.process_message( + room_id=room_id, user_message=content, user_id=user_id, + user_name=user_name, context=context + ): + if event.get("event") in ["process_step", "done", "error"]: + await connection_manager.broadcast_to_room(room_id, { + "event": event.get("event"), + "data": event.get("data", {}), + "agent_id": event.get("agent_id"), + "agent_name": event.get("agent_name") + }) + + if event.get("event") == "done": + chat_room_service.save_message( + room_id=room_id, sender_type="agent", + sender_id=event.get("agent_id"), + sender_name=event.get("agent_name"), + content=event.get("content", ""), + token_count=event.get("token_count", 0) + ) + + for agent in agents: + await connection_manager.broadcast_to_room(room_id, { + "event": "typing", + "data": {"agent_id": agent.agent_id, "agent_name": agent.name, "is_typing": False} + }) + + elif action == "ping": + await websocket.send_json({"event": "pong", "data": {}}) + + except WebSocketDisconnect: + logger.info(f"WebSocket disconnected from room: {room_id}") + connection_manager.disconnect(websocket) + await connection_manager.broadcast_to_room(room_id, { + "event": "system", + "data": {"content": "User left the room", "type": "leave"} + }) + except Exception as e: + logger.error(f"WebSocket error in room {room_id}: {e}") + connection_manager.disconnect(websocket) + await connection_manager.broadcast_to_room(room_id, { + "event": "error", + "data": {"content": str(e)} + })