From 234c74dbf076461009b646386c885bbb4600bb20 Mon Sep 17 00:00:00 2001 From: ViperEkura <3081035982@qq.com> Date: Wed, 22 Apr 2026 10:15:56 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=AE=E6=94=B9=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- assets/API.md | 351 +++++++++++++----------- assets/ARCHITECTURE.md | 457 +++++++++++-------------------- assets/DATABASE.md | 52 ++-- luxx/__init__.py | 22 +- luxx/agents/builtins/__init__.py | 1 - luxx/models/__init__.py | 6 +- luxx/models/chat.py | 46 +++- luxx/models/participant.py | 72 +++++ luxx/models/room.py | 129 ++------- luxx/models/user.py | 6 +- luxx/services/__init__.py | 3 + luxx/services/participant.py | 128 +++++++++ luxx/services/room.py | 441 ++++++++++------------------- luxx/services/room_ws.py | 213 +++++--------- 14 files changed, 859 insertions(+), 1068 deletions(-) delete mode 100644 luxx/agents/builtins/__init__.py create mode 100644 luxx/models/participant.py create mode 100644 luxx/services/participant.py diff --git a/assets/API.md b/assets/API.md index b6ca15f..c079856 100644 --- a/assets/API.md +++ b/assets/API.md @@ -2,8 +2,14 @@ ## 认证 `/api/auth` +| 方法 | 路径 | 说明 | +|------|------|------| +| POST | `/api/auth/register` | 用户注册 | +| POST | `/api/auth/login` | 用户登录 | +| POST | `/api/auth/logout` | 用户登出 | +| GET | `/api/auth/me` | 获取当前用户信息 | + ### POST /api/auth/register -用户注册 **请求体:** ```json @@ -14,20 +20,7 @@ } ``` -**响应:** -```json -{ - "success": true, - "message": "注册成功", - "data": { - "id": 1, - "username": "string" - } -} -``` - ### POST /api/auth/login -用户登录 **请求体:** ```json @@ -37,88 +30,20 @@ } ``` -**响应:** -```json -{ - "success": true, - "message": "登录成功", - "data": { - "access_token": "eyJ...", - "token_type": "bearer", - "user": { - "id": 1, - "username": "string", - "role": "user" - } - } -} -``` - -### POST /api/auth/logout -用户登出 - -**请求头:** `Authorization: Bearer ` - -**响应:** -```json -{ - "success": true, - "message": "登出成功" -} -``` - -### GET /api/auth/me -获取当前用户信息 - -**请求头:** `Authorization: Bearer ` - -**响应:** -```json -{ - "success": true, - "data": { - "id": 1, - "username": "string", - "email": "user@example.com", - "role": "user", - "is_active": true - } -} -``` - --- ## 会话 `/api/conversations` -### GET /api/conversations/ -获取会话列表 +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/conversations/` | 获取会话列表 | +| POST | `/api/conversations/` | 创建会话 | +| GET | `/api/conversations/{id}` | 获取会话详情 | +| PUT | `/api/conversations/{id}` | 更新会话 | +| DELETE | `/api/conversations/{id}` | 删除会话 | -**查询参数:** -- `project_id` (可选): 项目ID -- `page` (可选): 页码,默认1 -- `page_size` (可选): 每页数量,默认20 +### 创建会话请求体 -**请求头:** `Authorization: Bearer ` - -**响应:** -```json -{ - "success": true, - "data": { - "items": [...], - "total": 100, - "page": 1, - "page_size": 20 - } -} -``` - -### POST /api/conversations/ -创建会话 - -**请求头:** `Authorization: Bearer ` - -**请求体:** ```json { "project_id": "string (可选)", @@ -131,56 +56,19 @@ } ``` -**响应:** -```json -{ - "success": true, - "message": "会话创建成功", - "data": { - "id": "conv_xxx", - "user_id": 1, - "title": "新会话", - "model": "glm-5", - ... - } -} -``` - -### GET /api/conversations/{id} -获取会话详情 - -**路径参数:** -- `id`: 会话ID - -**请求头:** `Authorization: Bearer ` - -### PUT /api/conversations/{id} -更新会话 - -### DELETE /api/conversations/{id} -删除会话 - --- ## 消息 `/api/messages` -### GET /api/messages/{conversation_id} -获取消息列表 +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/messages/{conversation_id}` | 获取消息列表 | +| POST | `/api/messages/` | 发送消息(非流式) | +| POST | `/api/messages/stream` | 发送消息(流式 SSE) | +| DELETE | `/api/messages/{id}` | 删除消息 | -**路径参数:** -- `conversation_id`: 会话ID +### 发送消息请求体 -**查询参数:** -- `limit` (可选): 返回数量,默认100 - -**请求头:** `Authorization: Bearer ` - -### POST /api/messages/ -发送消息(非流式) - -**请求头:** `Authorization: Bearer ` - -**请求体:** ```json { "conversation_id": "conv_xxx", @@ -189,45 +77,122 @@ } ``` -**响应:** +### SSE 事件类型 + +| 事件 | 说明 | +|------|------| +| `text` | 文本增量 | +| `tool_call` | 工具调用 | +| `tool_result` | 工具结果 | +| `done` | 完成 | +| `error` | 错误 | + +--- + +## 聊天室 `/api/chat-rooms` + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/chat-rooms/` | 获取聊天室列表 | +| POST | `/api/chat-rooms/` | 创建聊天室 | +| GET | `/api/chat-rooms/{room_id}` | 获取聊天室详情 | +| PUT | `/api/chat-rooms/{room_id}` | 更新聊天室 | +| DELETE | `/api/chat-rooms/{room_id}` | 删除聊天室 | +| GET | `/api/chat-rooms/{room_id}/agents` | 获取聊天室中的 Agent 列表 | +| POST | `/api/chat-rooms/{room_id}/agents` | 添加 Agent 到聊天室 | +| DELETE | `/api/chat-rooms/{room_id}/agents/{agent_id}` | 从聊天室移除 Agent | +| GET | `/api/chat-rooms/{room_id}/messages` | 获取聊天室消息历史 | +| POST | `/api/chat-rooms/{room_id}/messages` | 发送消息到聊天室(SSE 流式响应) | + +### 创建聊天室请求体 + ```json { - "success": true, - "data": { - "user_message": {...}, - "assistant_message": {...} - } + "name": "聊天室名称", + "description": "描述(可选)", + "agent_ids": ["agent-1", "agent-2"] } ``` -### POST /api/messages/stream -发送消息(流式响应) +### 添加 Agent 请求体 -使用 Server-Sent Events (SSE) 返回流式响应。 +```json +{ + "agent_id": "agent-1" +} +``` -**事件类型:** -- `text`: 文本增量 -- `tool_call`: 工具调用 -- `tool_result`: 工具结果 -- `done`: 完成 -- `error`: 错误 +--- -### DELETE /api/messages/{id} -删除消息 +## WebSocket `/ws/chat-room/{room_id}` + +### 连接参数 + +| 参数 | 类型 | 说明 | 示例 | +|------|------|------|------| +| `participant_type` | string | `user` 或 `agent` | `user` | +| `participant_id` | string | 用户 ID 或 Agent ID | `1` | +| `participant_name` | string | 显示名称 | `John` | + +### 连接示例 + +**用户连接:** +``` +ws://host/ws/chat-room/room-123?participant_type=user&participant_id=1&participant_name=John +``` + +**Agent 连接:** +``` +ws://host/ws/chat-room/room-123?participant_type=agent&participant_id=agent-1&participant_name=Assistant +``` + +### 发送消息 + +```json +{ + "action": "send_message", + "content": "Hello everyone!" +} +``` + +### 接收事件 + +| 事件 | 说明 | 数据示例 | +|------|------|---------| +| `connected` | 连接成功 | `{"room_id": "xxx", "type": "user"}` | +| `history` | 历史消息 | `{"messages": [...]}` | +| `agents` | Agent 列表 | `{"agents": [...]}` | +| `message` | 新消息 | 消息对象 | +| `typing` | 打字状态 | `{"agent_id": "xxx", "is_typing": true}` | +| `process_step` | 处理步骤 | 步骤对象 | +| `done` | 完成 | `{"message_id": "xxx", "token_count": 100}` | +| `error` | 错误 | `{"content": "error message"}` | +| `system` | 系统消息 | `{"content": "John joined", "type": "user_join"}` | + +--- + +## Agent `/api/agents` + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/agents/` | 获取 Agent 列表 | +| POST | `/api/agents/` | 创建 Agent | +| GET | `/api/agents/{id}` | 获取 Agent 详情 | +| PUT | `/api/agents/{id}` | 更新 Agent | +| DELETE | `/api/agents/{id}` | 删除 Agent | --- ## 工具 `/api/tools` -### GET /api/tools/ -获取可用工具列表 +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/tools/` | 获取可用工具列表 | +| GET | `/api/tools/{name}` | 获取工具详情 | +| POST | `/api/tools/{name}/execute` | 手动执行工具 | -**查询参数:** -- `category` (可选): 工具分类 +### 工具列表响应 -**请求头:** `Authorization: Bearer ` - -**响应:** ```json { "success": true, @@ -236,24 +201,78 @@ "categorized": { "crawler": [...], "code": [...], - "data": [...], - "weather": [...] + "data": [...] }, "total": 11 } } ``` -### GET /api/tools/{name} -获取工具详情 +### 执行工具请求体 -### POST /api/tools/{name}/execute -手动执行工具 - -**请求体:** ```json { "arg1": "value1", "arg2": "value2" } ``` + +--- + +## LLM 提供商 `/api/providers` + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/providers/` | 获取提供商列表 | +| POST | `/api/providers/` | 创建提供商 | +| GET | `/api/providers/{id}` | 获取提供商详情 | +| PUT | `/api/providers/{id}` | 更新提供商 | +| DELETE | `/api/providers/{id}` | 删除提供商 | +| POST | `/api/providers/{id}/test` | 测试提供商连接 | + +--- + +## 健康检查 + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/health` | 健康检查 | +| GET | `/` | 服务信息 | + +--- + +## 通用响应格式 + +### 成功响应 + +```json +{ + "success": true, + "message": "操作成功", + "data": {...} +} +``` + +### 错误响应 + +```json +{ + "success": false, + "error": "错误信息", + "code": 404 +} +``` + +### 分页响应 + +```json +{ + "success": true, + "data": { + "items": [...], + "total": 100, + "page": 1, + "page_size": 20 + } +} +``` diff --git a/assets/ARCHITECTURE.md b/assets/ARCHITECTURE.md index 06c423e..7b21d58 100644 --- a/assets/ARCHITECTURE.md +++ b/assets/ARCHITECTURE.md @@ -17,35 +17,57 @@ ``` luxx/ -├── __init__.py # FastAPI 应用工厂 -├── config.py # 配置管理(YAML) -├── database.py # 数据库连接 -├── models.py # ORM 模型 -├── routes/ # API 路由层 -│ ├── __init__.py # 路由聚合 -│ ├── auth.py # 认证 (登录/注册) -│ ├── conversations.py # 会话管理 (CRUD) -│ ├── messages.py # 消息处理 (流式/同步) -│ ├── providers.py # LLM 提供商管理 -│ └── tools.py # 工具管理 -├── services/ # 服务层 -│ ├── chat.py # 聊天服务 (Agentic Loop) -│ └── llm_client.py # LLM 客户端 -├── tools/ # 工具系统 -│ ├── core.py # 核心类 (ToolRegistry, ToolDefinition, ToolResult) -│ ├── factory.py # @tool 装饰器 -│ ├── executor.py # 工具执行器 (缓存/并行) -│ ├── services.py # 工具服务层 -│ └── builtin/ # 内置工具 -│ ├── __init__.py # 工具注册入口 -│ ├── code.py # 代码执行 (python_execute, python_eval) -│ ├── crawler.py # 网页爬虫 (web_search, web_fetch, batch_fetch) -│ └── data.py # 数据处理 (process_data) -└── utils/ # 工具函数 - └── helpers.py # 密码哈希、ID生成、响应封装 +├── __init__.py # FastAPI 应用工厂 +├── config.py # 配置管理(YAML) +├── core/ +│ ├── __init__.py +│ ├── config.py # 配置管理 +│ └── database.py # 数据库连接 +├── models/ # ORM 模型 +│ ├── __init__.py +│ ├── user.py # User, LLMProvider, Project +│ ├── chat.py # Conversation, Message +│ ├── room.py # ChatRoom, Agent, RoomParticipant, ChatRoomMessage +│ └── participant.py # Participant (统一参与者抽象) +├── services/ # 服务层 +│ ├── __init__.py +│ ├── chat.py # 聊天服务 (Agentic Loop) +│ ├── room.py # 聊天室服务 (多 Agent 编排) +│ ├── participant.py # 统一参与者服务 +│ ├── room_ws.py # WebSocket 处理 +│ ├── llm_service.py # LLM 服务 +│ ├── message_service.py # 消息服务 +│ ├── stream_service.py # 流式响应服务 +│ ├── task.py # 任务服务 +│ └── agent.py # Agent 管理 +├── agents/ # Agent 系统 +│ ├── __init__.py +│ ├── base.py # BaseAgent 抽象类 +│ └── registry.py # Agent 注册表 +├── api/ # API 路由层 +│ ├── __init__.py +│ ├── auth.py # 认证 (登录/注册) +│ ├── conversations.py # 会话管理 (CRUD) +│ ├── messages.py # 消息处理 +│ ├── providers.py # LLM 提供商管理 +│ ├── tools.py # 工具管理 +│ ├── agents.py # Agent 管理 +│ └── rooms.py # 聊天室管理 +├── tools/ # 工具系统 +│ ├── __init__.py +│ ├── core.py # 核心类 (ToolRegistry, ToolDefinition, ToolResult) +│ ├── factory.py # @tool 装饰器 +│ ├── executor.py # 工具执行器 +│ └── builtin/ # 内置工具 +│ ├── __init__.py +│ ├── code.py # 代码执行 +│ ├── crawler.py # 网页爬虫 +│ └── data.py # 数据处理 +└── utils/ # 工具函数 + └── helpers.py # 密码哈希、ID生成、响应封装 -run.py # 应用入口文件 -config.yaml # 配置文件 +run.py # 应用入口文件 +config.yaml # 配置文件 ``` ## 核心组件 @@ -55,144 +77,70 @@ FastAPI 应用入口,使用 lifespan 管理生命周期: - 启动:初始化数据库、注册内置工具、创建默认管理员用户 - 关闭:清理资源 -```python -# 默认管理员账号 -username: admin -password: admin123 -``` - -### 2. 配置管理 (`config.py`) +### 2. 配置管理 (`core/config.py`) 使用 YAML 文件管理配置: - 配置文件:`config.yaml` - 环境变量替换:`${VAR_NAME}` - 单例模式全局访问 -- 默认值支持 -```yaml -# config.yaml 示例 -app: - secret_key: ${APP_SECRET_KEY} - debug: true - -database: - type: sqlite - url: sqlite:///./chat.db - -llm: - provider: deepseek - api_key: ${DEEPSEEK_API_KEY} - api_url: https://api.deepseek.com/v1 -``` - -### 3. 数据库 (`database.py`) +### 3. 数据库 (`core/database.py`) - SQLAlchemy 同步支持 - SQLite 默认数据库 - 依赖注入获取会话 -### 4. ORM 模型 (`models.py`) +### 4. ORM 模型 + +#### 统一参与者架构 ```mermaid -erDiagram - USER { - int id PK - string username UK - string email UK - string password_hash - string role - boolean is_active - datetime created_at +classDiagram + class Participant { + +str participant_id + +str name + +ParticipantType participant_type + +from_user(User) Participant + +from_agent(...) Participant + +from_participant_id(str) Participant } - - PROJECT { - string id PK - int user_id FK - string name - text description - datetime created_at - datetime updated_at + + class ParticipantType { + <> + USER = "user" + AGENT = "agent" } - - CONVERSATION { - string id PK - int user_id FK - int provider_id FK "optional" - string project_id FK "optional" - string title - string model - text system_prompt - float temperature - int max_tokens - boolean thinking_enabled - datetime created_at - datetime updated_at + + class RoomParticipant { + +str room_id + +str agent_id + +int user_id + +str role + +participant_id property + +participant_type property } - - MESSAGE { - string id PK - string conversation_id FK - string role - longtext content "JSON 格式" - int token_count - text usage "JSON 格式" - datetime created_at + + class ChatRoomMessage { + +str participant_id + +sender_type property + +sender_id property } - - LLM_PROVIDER { - int id PK - int user_id FK - string name - string provider_type - string base_url - string api_key - string default_model - int max_tokens - boolean is_default - boolean enabled - datetime created_at - datetime updated_at - } - - USER ||--o{ PROJECT : "has" - USER ||--o{ CONVERSATION : "has" - USER ||--o{ LLM_PROVIDER : "configures" - PROJECT ||--o{ CONVERSATION : "contains" - LLM_PROVIDER ||--o{ CONVERSATION : "uses" - CONVERSATION ||--o{ MESSAGE : "has" + + Participant --> ParticipantType + RoomParticipant --> Participant + ChatRoomMessage --> Participant ``` -### Message Content JSON 结构 +### 5. 聊天室系统 -`content` 字段统一使用 JSON 格式存储: +#### WebSocket 端点 +- `/ws/chat-room/{room_id}?participant_type=user&participant_id=123&participant_name=John` +- `/ws/chat-room/{room_id}?participant_type=agent&participant_id=agent-1&participant_name=Assistant` -**User 消息:** +#### ParticipantService +统一处理用户和 Agent 的消息: +- `process_message()` - 处理来自任何参与者的消息 +- `send_message()` - 主动发送消息 -```json -{ - "text": "用户输入的文本内容", - "attachments": [ - {"name": "utils.py", "extension": "py", "content": "..."} - ] -} -``` - -**Assistant 消息:** - -```json -{ - "text": "AI 回复的文本内容", - "tool_calls": [...], - "steps": [ - {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}, - {"id": "step-1", "index": 1, "type": "text", "content": "..."}, - {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."}, - {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "...", "success": true} - ] -} -``` - -`steps` 字段是**渲染顺序的唯一数据源**,按 `index` 顺序排列。thinking、text、tool_call、tool_result 可以在多轮迭代中穿插出现。 - -### 5. 工具系统 +### 6. 工具系统 ```mermaid classDiagram @@ -219,21 +167,13 @@ classDiagram +register(tool) void +get(name) ToolDefinition? +list_all() List~dict~ - +list_by_category(category) List~dict~ +execute(name, arguments) dict - +remove(name) bool } class ToolExecutor { +enable_cache: bool - +cache_ttl: int - +max_workers: int - +_cache: Dict - +_call_history: List +process_tool_calls(tool_calls, context) list +process_tool_calls_parallel(tool_calls, context) list - +clear_cache() void - +get_history(limit) List } ``` @@ -250,8 +190,6 @@ classDiagram #### 工具开发规范 -所有工具必须遵循统一的开发规范,确保错误处理和返回格式一致。 - **核心原则:装饰器自动处理一切,工具函数只写业务逻辑** ```python @@ -268,7 +206,7 @@ def my_tool(arguments: dict): # 业务逻辑 - 只管返回数据 data = fetch_data(arguments["arg1"]) return {"items": data, "count": len(data)} - + # 或者直接抛出异常(装饰器自动捕获并转换) if invalid: raise ValueError("Invalid input") @@ -279,7 +217,7 @@ def my_tool(arguments: dict): 2. 所有异常捕获和转换 3. 结果格式统一包装 -**返回格式转换** +**返回格式转换:** | 工具函数返回/抛出 | 装饰器转换为 | |-------------------|-------------| @@ -287,73 +225,52 @@ def my_tool(arguments: dict): | `raise ValueError("msg")` | `{"success": false, "data": null, "error": "ValueError: msg"}` | | `raise Exception()` | `{"success": false, "data": null, "error": "..."}` | -**工具调用流程** +#### 工具调用流程 -``` -LLM 请求 - ↓ -ToolRegistry.execute(name, args) - ↓ -@tool 装饰器 - ├─ 验证 required_params - ├─ 执行工具函数 (try-except 包裹) - ├─ 捕获异常 → 转换为 error - ├─ 包装返回格式 - └─ 返回 ToolResult - ↓ -ToolExecutor 返回结果 - ↓ -前端 ProcessBlock 显示 +```mermaid +flowchart TD + A[LLM 请求] --> B{ToolRegistry.execute} + B --> C[@tool 装饰器] + C --> D{验证 required_params} + D -->|失败| E[返回 error] + D -->|成功| F[执行工具函数] + F --> G{try-except} + G -->|成功| H[包装 ToolResult] + G -->|异常| I[捕获并转换 error] + I --> H + H --> J[返回 ToolResult] + J --> K[前端 ProcessBlock 显示] ``` -### 6. 服务层 +#### Message Content JSON 结构 -#### ChatService (`services/chat.py`) -核心聊天服务: -- Agentic Loop 迭代执行(最多 10 轮) -- 流式 SSE 响应 -- 工具调用编排(并行执行) -- 消息历史管理 -- 自动重试机制 -- 支持 thinking_content 提取 -- Token 用量追踪 +`content` 字段统一使用 JSON 格式存储: -#### LLMClient (`services/llm_client.py`) -LLM API 客户端: -- 多提供商:DeepSeek、GLM、OpenAI -- 流式/同步调用 -- 错误处理和重试 -- Token 计数 +**User 消息:** +```json +{ + "text": "用户输入的文本内容", + "attachments": [ + {"name": "utils.py", "extension": "py", "content": "..."} + ] +} +``` -### 7. 认证系统 (`routes/auth.py`) -- JWT Bearer Token -- Bcrypt 密码哈希 -- 用户注册/登录 +**Assistant 消息:** +```json +{ + "text": "AI 回复的文本内容", + "tool_calls": [...], + "steps": [ + {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}, + {"id": "step-1", "index": 1, "type": "text", "content": "..."}, + {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."}, + {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "...", "success": true} + ] +} +``` -### 8. API 路由 - -| 路由 | 方法 | 说明 | -|------|------|------| -| `/auth/register` | POST | 用户注册 | -| `/auth/login` | POST | 用户登录 | -| `/conversations` | GET | 会话列表(分页) | -| `/conversations` | POST | 创建会话 | -| `/conversations/{id}` | GET | 会话详情 | -| `/conversations/{id}` | PUT | 更新会话 | -| `/conversations/{id}` | DELETE | 删除会话 | -| `/messages` | GET | 消息列表 | -| `/messages` | POST | 发送消息(同步) | -| `/messages/stream` | POST | 发送消息(流式 SSE) | -| `/messages/{id}` | DELETE | 删除消息 | -| `/providers` | GET | LLM 提供商列表 | -| `/providers` | POST | 创建提供商 | -| `/providers/{id}` | GET | 提供商详情 | -| `/providers/{id}` | PUT | 更新提供商 | -| `/providers/{id}` | DELETE | 删除提供商 | -| `/providers/{id}/test` | POST | 测试提供商连接 | -| `/tools` | GET | 可用工具列表 | -| `/health` | GET | 健康检查 | -| `/` | GET | 服务信息 | +**steps 字段是渲染顺序的唯一数据源**,按 `index` 顺序排列。 ## 数据流 @@ -362,63 +279,43 @@ LLM API 客户端: ```mermaid sequenceDiagram participant Client - participant API as POST /messages/stream - participant CS as ChatService + participant API as WebSocket/API + participant PS as ParticipantService + participant RS as ChatRoomService participant LLM as LLM API participant TE as ToolExecutor - - Client->>API: POST {content, tools, thinking_enabled} - API->>CS: stream_response() - - loop MAX_ITERATIONS (10) - CS->>LLM: call(messages, tools) - LLM-->>CS: SSE Stream - + + Client->>API: send_message(content) + API->>PS: process_message() + PS->>RS: save_message() + PS->>RS: dispatch to agents + + loop Agentic Loop + RS->>LLM: call(messages, tools) + LLM-->>RS: SSE Stream alt tool_calls - CS->>TE: process_tool_calls_parallel() - TE-->>CS: tool_results - CS->>CS: 追加到 messages + RS->>TE: process_tool_calls_parallel() + TE-->>RS: tool_results end end - - CS->>CS: _save_message() - CS->>API: SSE Stream - API-->>Client: 流式响应 + + RS-->>PS: process_step events + PS-->>Client: SSE Stream ``` -## SSE 事件 +## WebSocket 事件 | 事件 | 说明 | |------|------| -| `process_step` | 结构化步骤(thinking/text/tool_call/tool_result),携带 `id`、`index` 确保渲染顺序 | -| `done` | 响应完成,携带 message_id、token_count、usage | +| `connected` | 连接成功 | +| `history` | 历史消息 | +| `agents` | 聊天室 Agent 列表 | +| `message` | 新消息 | +| `typing` | 打字状态 | +| `process_step` | Agent 处理步骤 | +| `done` | 处理完成 | | `error` | 错误信息 | - -### process_step 事件格式 - -```json -{"type": "process_step", "step": {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}} -{"type": "process_step", "step": {"id": "step-1", "index": 1, "type": "text", "content": "回复文本..."}} -{"type": "process_step", "step": {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_abc", "name": "web_search", "arguments": "{\"query\": \"...\"}"}} -{"type": "process_step", "step": {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_abc", "name": "web_search", "content": "{...}", "success": true}} -``` - -| 字段 | 说明 | -|------|------| -| `id` | 步骤唯一标识(格式 `step-{index}`) | -| `index` | 步骤序号,确保按正确顺序显示 | -| `type` | 步骤类型:`thinking` / `text` / `tool_call` / `tool_result` | -| `id_ref` | 工具调用引用 ID(仅 tool_call/tool_result) | -| `name` | 工具名称(仅 tool_call/tool_result) | -| `arguments` | 工具调用参数 JSON 字符串(仅 tool_call) | -| `content` | 内容(thinking 的思考内容、text 的文本、tool_result 的返回结果) | -| `success` | 工具执行是否成功(仅 tool_result) | - -### done 事件格式 - -```json -{"type": "done", "message_id": "uuid", "token_count": 1234, "usage": {"prompt_tokens": 100, "completion_tokens": 200, "total_tokens": 300}} -``` +| `system` | 系统消息 (join/leave) | ## 配置示例 @@ -454,35 +351,3 @@ tools: | `APP_SECRET_KEY` | 应用密钥 | `your-secret-key` | | `DEEPSEEK_API_KEY` | DeepSeek API | `sk-xxxx` | | `DATABASE_URL` | 数据库连接 | `sqlite:///./chat.db` | - -## 项目结构说明 - -### 入口文件 - -- `run.py` - 启动 Uvicorn 服务器 - -### 响应格式 - -所有 API 统一使用响应封装: - -```json -// 成功 -{"success": true, "data": {...}, "message": "操作成功"} - -// 错误 -{"success": false, "error": "错误信息", "code": 404} -``` - -### 工具缓存机制 - -ToolExecutor 支持结果缓存: -- TTL: 5 分钟(可配置) -- 缓存 Key: `{tool_name}:{sorted_arguments_json}` -- 调用历史记录最近 1000 条 - -### 流式响应特点 - -1. 实时返回 thinking_content(模型思考过程) -2. 实时返回 text 增量更新 -3. 工具调用串行执行,结果批量返回 -4. 最终 `done` 事件包含完整 message_id 和 token 用量 diff --git a/assets/DATABASE.md b/assets/DATABASE.md index 2cf33e3..389ec3a 100644 --- a/assets/DATABASE.md +++ b/assets/DATABASE.md @@ -90,21 +90,13 @@ erDiagram int owner_id FK bool is_active datetime created_at - } - - CHAT_ROOM_AGENT { - string id PK - string chat_room_id FK - string agent_id FK - bool is_active - datetime joined_at + datetime updated_at } CHAT_ROOM_MESSAGE { string id PK string room_id FK - string sender_type - string sender_id + string participant_id string sender_name text content string mentions @@ -122,8 +114,6 @@ erDiagram LLM_PROVIDER ||--o{ AGENT : configures CONVERSATION ||--o{ MESSAGE : contains CHAT_ROOM ||--o{ CHAT_ROOM_MESSAGE : contains - CHAT_ROOM ||--o{ CHAT_ROOM_AGENT : has - AGENT ||--o{ CHAT_ROOM_AGENT : joins ``` ## 3. 表设计 @@ -227,24 +217,13 @@ erDiagram | is_active | BOOLEAN | DEFAULT 1 | 是否启用 | | created_at | DATETIME | - | 创建时间 | -### chat_room_agents 聊天室成员表 - -| 字段 | 类型 | 约束 | 说明 | -|------|------|------|------| -| id | VARCHAR(64) | PK | 关联 ID | -| chat_room_id | VARCHAR(64) | FK(chat_rooms.id) | 聊天室 ID | -| agent_id | VARCHAR(64) | FK(agents.id) | Agent ID | -| is_active | BOOLEAN | DEFAULT 1 | 是否启用 | -| joined_at | DATETIME | - | 加入时间 | - ### chat_room_messages 聊天室消息表 | 字段 | 类型 | 约束 | 说明 | |------|------|------|------| | id | VARCHAR(64) | PK | 消息 ID | | room_id | VARCHAR(64) | FK(chat_rooms.id) | 聊天室 ID | -| sender_type | VARCHAR(16) | NOT NULL | user/agent/system | -| sender_id | VARCHAR(64) | NOT NULL | 发送者 ID | +| participant_id | VARCHAR(64) | NOT NULL | 统一参与者 ID (格式: "user:123" 或 "agent:abc") | | sender_name | VARCHAR(50) | NOT NULL | 发送者名称 | | content | TEXT | NOT NULL | 消息内容 | | mentions | TEXT | - | @ 提及列表(JSON) | @@ -252,32 +231,41 @@ erDiagram | token_count | INTEGER | DEFAULT 0 | Token 数 | | created_at | DATETIME | - | 创建时间 | +**participant_id 格式:** +- 用户: `user:{user_id}` (如 `user:123`) +- Agent: `agent:{agent_id}` (如 `agent:abc-001`) + ## 4. 流程图 -### 消息流 +### 消息流(统一参与者) ```mermaid sequenceDiagram - participant U as 用户 + participant U as User participant WS as WebSocket - participant R as ChatRoomService + participant PS as ParticipantService + participant RS as ChatRoomService participant A as Agent participant LLM as LLM U->>WS: 发送消息 - WS->>R: dispatch - R->>A: 触发 Agent + WS->>PS: process_message() + PS->>RS: save_message() + RS->>RS: 创建 participant_id + RS-->>PS: 消息保存成功 + PS->>A: 触发 Agent A->>LLM: stream_call LLM-->>A: SSE 流 - A-->>WS: process_step - WS-->>U: 实时推送 + A-->>PS: process_step + PS-->>WS: 实时推送 + WS-->>U: 显示 ``` ### Agent 调度 ```mermaid flowchart TD - A[用户消息] --> B{有 @ 提及?} + A[用户/Agent 消息] --> B{有 @ 提及?} B -->|是| C[触发指定 Agent] B -->|否| D[触发 auto_response Agent] diff --git a/luxx/__init__.py b/luxx/__init__.py index 6082ccc..604cd54 100644 --- a/luxx/__init__.py +++ b/luxx/__init__.py @@ -18,7 +18,7 @@ async def lifespan(app: FastAPI): # Import all models to ensure they are registered with Base from luxx.models.user import User, LLMProvider, Project from luxx.models.chat import Conversation, Message - from luxx.models.room import ChatRoom, Agent, ChatRoomAgent, ChatRoomMessage + from luxx.models.room import ChatRoom, Agent init_db() # Create default test user if not exists @@ -71,7 +71,25 @@ def create_app() -> FastAPI: from luxx.services.room_ws import websocket_handler @app.websocket("/ws/chat-room/{room_id}") - async def chat_room_websocket(websocket: WebSocket, room_id: str): + async def chat_room_websocket( + websocket: WebSocket, + room_id: str, + participant_id: str = None, + participant_type: str = "user", + participant_name: str = None + ): + """ + WebSocket endpoint for chat room participation. + + Query parameters: + - participant_id: ID of the user or agent connecting + - participant_type: "user" or "agent" (default: "user") + - participant_name: Display name for the participant + + Example: + - User: ws://host/ws/chat-room/{room_id}?participant_id=123&participant_type=user&participant_name=John + - Agent: ws://host/ws/chat-room/{room_id}?participant_id=agent-1&participant_type=agent&participant_name=Assistant + """ await websocket_handler(websocket, room_id) # Health check diff --git a/luxx/agents/builtins/__init__.py b/luxx/agents/builtins/__init__.py deleted file mode 100644 index fcffce8..0000000 --- a/luxx/agents/builtins/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Builtins package - user-defined agent templates can be placed here diff --git a/luxx/models/__init__.py b/luxx/models/__init__.py index 600c2b3..838d782 100644 --- a/luxx/models/__init__.py +++ b/luxx/models/__init__.py @@ -1,10 +1,12 @@ """Models package""" from luxx.models.user import User, LLMProvider, Project from luxx.models.chat import Conversation, Message -from luxx.models.room import ChatRoom, Agent, ChatRoomAgent, ChatRoomMessage +from luxx.models.room import ChatRoom, Agent +from luxx.models.participant import Participant, ParticipantType __all__ = [ "User", "LLMProvider", "Project", "Conversation", "Message", - "ChatRoom", "Agent", "ChatRoomAgent", "ChatRoomMessage" + "ChatRoom", "Agent", + "Participant", "ParticipantType", ] diff --git a/luxx/models/chat.py b/luxx/models/chat.py index e86cc20..d5d1dda 100644 --- a/luxx/models/chat.py +++ b/luxx/models/chat.py @@ -1,4 +1,5 @@ """Chat-related models""" +import json from datetime import datetime from typing import Optional, List from sqlalchemy import String, Integer, Boolean, Float, Text, DateTime, ForeignKey @@ -18,17 +19,16 @@ class Conversation(Base): id: Mapped[str] = mapped_column(String(64), primary_key=True) user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) provider_id: Mapped[Optional[int]] = mapped_column(Integer, ForeignKey("llm_providers.id"), nullable=True) - project_id: Mapped[Optional[str]] = mapped_column(String(64), ForeignKey("projects.id"), nullable=True) + project_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=True) title: Mapped[str] = mapped_column(String(255), nullable=False) model: Mapped[str] = mapped_column(String(64), nullable=False, default="deepseek-chat") - system_prompt: Mapped[str] = mapped_column(Text, nullable=False, default="You are a helpful assistant.") + system_prompt: Mapped[str] = mapped_column(Text, nullable=False, default="You are helpful.") temperature: Mapped[float] = mapped_column(Float, default=0.7) max_tokens: Mapped[int] = mapped_column(Integer, default=2000) thinking_enabled: Mapped[bool] = mapped_column(Boolean, default=False) created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) - # Relationships user: Mapped["User"] = relationship("User", back_populates="conversations") provider: Mapped[Optional["LLMProvider"]] = relationship("LLMProvider") messages: Mapped[List["Message"]] = relationship( @@ -53,31 +53,43 @@ class Conversation(Base): class Message(Base): - """Message model. + """Unified Message model for Conversation and ChatRoom. - content 字段统一使用 JSON 格式存储: + role: user/assistant/system/tool + content: JSON format with text, attachments, tool_calls, steps """ __tablename__ = "messages" id: Mapped[str] = mapped_column(String(64), primary_key=True) - conversation_id: Mapped[str] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=False) + conversation_id: Mapped[Optional[str]] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=True) + room_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=True) role: Mapped[str] = mapped_column(String(16), nullable=False) content: Mapped[str] = mapped_column(Text, nullable=False, default="") + sender_name: Mapped[str] = mapped_column(String(50), nullable=False, default="") + mentions: Mapped[Optional[str]] = mapped_column(Text, nullable=True) token_count: Mapped[int] = mapped_column(Integer, default=0) usage: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + + conversation: Mapped[Optional["Conversation"]] = relationship("Conversation", back_populates="messages") created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - # Relationships - conversation: Mapped["Conversation"] = relationship("Conversation", back_populates="messages") + @property + def target_type(self) -> str: + return "conversation" if self.conversation_id else "room" + + @property + def target_id(self) -> str: + return self.conversation_id or self.room_id or "" def to_dict(self): - """Convert to dictionary, extracting process_steps for frontend""" - import json - result = { "id": self.id, "conversation_id": self.conversation_id, + "room_id": self.room_id, + "target_type": self.target_type, + "target_id": self.target_id, "role": self.role, + "sender_name": self.sender_name, "token_count": self.token_count, "created_at": self.created_at.isoformat() if self.created_at else None } @@ -89,11 +101,19 @@ class Message(Base): except json.JSONDecodeError: result["usage"] = None + # Parse mentions JSON + if self.mentions: + try: + result["mentions"] = json.loads(self.mentions) + except json.JSONDecodeError: + result["mentions"] = [] + else: + result["mentions"] = [] + # Parse content JSON try: content_obj = json.loads(self.content) if self.content else {} except json.JSONDecodeError: - result["content"] = self.content result["text"] = self.content result["attachments"] = [] result["tool_calls"] = [] @@ -105,7 +125,7 @@ class Message(Base): result["tool_calls"] = content_obj.get("tool_calls", []) result["process_steps"] = content_obj.get("steps", []) - if "content" not in result: + if "content" not in content_obj: result["content"] = result["text"] return result diff --git a/luxx/models/participant.py b/luxx/models/participant.py new file mode 100644 index 0000000..b90e15c --- /dev/null +++ b/luxx/models/participant.py @@ -0,0 +1,72 @@ +"""Participant model - unified participant for users and agents.""" +from datetime import datetime +from typing import Optional, Dict, Any, TYPE_CHECKING +from dataclasses import dataclass +from enum import Enum + +if TYPE_CHECKING: + from luxx.models.user import User + + +class ParticipantType(Enum): + USER = "user" + AGENT = "agent" + + +@dataclass +class Participant: + """Unified participant abstraction for users and agents.""" + participant_id: str + name: str + participant_type: ParticipantType + avatar: Optional[str] = None + role: Optional[str] = None # ChatRoom role: owner/admin/member + permission_level: int = 1 + is_active: bool = True + created_at: Optional[datetime] = None + + @property + def is_agent(self) -> bool: + return self.participant_type == ParticipantType.AGENT + + @property + def is_user(self) -> bool: + return self.participant_type == ParticipantType.USER + + def to_dict(self) -> Dict[str, Any]: + return { + "id": self.participant_id, + "name": self.name, + "type": self.participant_type.value, + "avatar": self.avatar, + "role": self.role, + "permission_level": self.permission_level, + "is_active": self.is_active, + "created_at": self.created_at.isoformat() if self.created_at else None + } + + @classmethod + def from_user(cls, user: "User") -> "Participant": + return cls( + participant_id=f"user:{user.id}", + name=user.username, + participant_type=ParticipantType.USER, + permission_level=user.permission_level, + is_active=user.is_active, + created_at=user.created_at + ) + + @classmethod + def from_agent( + cls, agent_id: str, name: str, role: str = None, + avatar: str = None, permission_level: int = 1, is_active: bool = True + ) -> "Participant": + return cls( + participant_id=f"agent:{agent_id}", + name=name, + participant_type=ParticipantType.AGENT, + role=role, + avatar=avatar, + permission_level=permission_level, + is_active=is_active + ) diff --git a/luxx/models/room.py b/luxx/models/room.py index dc5f41f..e07b28e 100644 --- a/luxx/models/room.py +++ b/luxx/models/room.py @@ -1,18 +1,21 @@ -"""ChatRoom models""" +"""ChatRoom models - unified participant architecture""" from datetime import datetime -from typing import Optional, List +from typing import Optional, List, TYPE_CHECKING from sqlalchemy import String, Integer, Boolean, Text, DateTime, ForeignKey from sqlalchemy.orm import Mapped, mapped_column, relationship from luxx.core.database import Base +if TYPE_CHECKING: + from luxx.models.user import User + def local_now(): return datetime.now() class ChatRoom(Base): - """Chat Room model - like a group chat for multiple agents""" + """Chat Room - group chat for multiple participants""" __tablename__ = "chat_rooms" id: Mapped[str] = mapped_column(String(64), primary_key=True) @@ -23,39 +26,26 @@ class ChatRoom(Base): created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) - # Relationships owner: Mapped["User"] = relationship("User", backref="chat_rooms") - agents: Mapped[List["ChatRoomAgent"]] = relationship( - "ChatRoomAgent", back_populates="chat_room", cascade="all, delete-orphan" - ) - messages: Mapped[List["ChatRoomMessage"]] = relationship( - "ChatRoomMessage", back_populates="chat_room", cascade="all, delete-orphan" - ) - def to_dict(self, include_agents: bool = False): - result = { - "id": self.id, - "name": self.name, - "description": self.description, - "owner_id": self.owner_id, - "is_active": self.is_active, + def to_dict(self): + return { + "id": self.id, "name": self.name, "description": self.description, + "owner_id": self.owner_id, "is_active": self.is_active, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } - if include_agents and self.agents: - result["agents"] = [ca.to_dict() for ca in self.agents] - return result class Agent(Base): - """Agent model - defines an AI agent with specific role""" + """Agent model - defines an AI agent""" __tablename__ = "agents" id: Mapped[str] = mapped_column(String(64), primary_key=True) name: Mapped[str] = mapped_column(String(50), nullable=False) role: Mapped[str] = mapped_column(String(50), nullable=False, default="helper") avatar: Mapped[Optional[str]] = mapped_column(String(500), nullable=True) - system_prompt: Mapped[str] = mapped_column(Text, nullable=False, default="You are a helpful assistant.") + system_prompt: Mapped[str] = mapped_column(Text, nullable=False, default="You are helpful.") provider_id: Mapped[Optional[int]] = mapped_column(Integer, ForeignKey("llm_providers.id"), nullable=True) model: Mapped[Optional[str]] = mapped_column(String(100), nullable=True) tools: Mapped[Optional[str]] = mapped_column(Text, nullable=True) @@ -68,105 +58,24 @@ class Agent(Base): created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now) - # Relationships provider: Mapped[Optional["LLMProvider"]] = relationship("LLMProvider") - chat_rooms: Mapped[List["ChatRoomAgent"]] = relationship( - "ChatRoomAgent", back_populates="agent", cascade="all, delete-orphan" - ) def to_dict(self, include_secrets: bool = False): import json result = { - "id": self.id, - "name": self.name, - "role": self.role, - "avatar": self.avatar, - "system_prompt": self.system_prompt, - "provider_id": self.provider_id, - "model": self.model, - "is_active": self.is_active, - "priority": self.priority, - "auto_response": self.auto_response, - "mention_trigger": self.mention_trigger, + "id": self.id, "name": self.name, "role": self.role, "avatar": self.avatar, + "system_prompt": self.system_prompt, "provider_id": self.provider_id, "model": self.model, + "is_active": self.is_active, "priority": self.priority, + "auto_response": self.auto_response, "mention_trigger": self.mention_trigger, "temperature": float(self.temperature) if self.temperature else 0.7, "max_tokens": self.max_tokens, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } - if self.tools: - try: - result["tools"] = json.loads(self.tools) - except json.JSONDecodeError: - result["tools"] = [] - else: + try: + result["tools"] = json.loads(self.tools) if self.tools else [] + except json.JSONDecodeError: result["tools"] = [] - if include_secrets and self.provider: result["provider"] = self.provider.to_dict(include_key=True) return result - - -class ChatRoomAgent(Base): - """Association table for ChatRoom and Agent""" - __tablename__ = "chat_room_agents" - - id: Mapped[str] = mapped_column(String(64), primary_key=True) - chat_room_id: Mapped[str] = mapped_column(String(64), ForeignKey("chat_rooms.id"), nullable=False) - agent_id: Mapped[str] = mapped_column(String(64), ForeignKey("agents.id"), nullable=False) - is_active: Mapped[bool] = mapped_column(Boolean, default=True) - joined_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - - # Relationships - chat_room: Mapped["ChatRoom"] = relationship("ChatRoom", back_populates="agents") - agent: Mapped["Agent"] = relationship("Agent", back_populates="chat_rooms") - - def to_dict(self): - return { - "id": self.id, - "chat_room_id": self.chat_room_id, - "agent_id": self.agent_id, - "is_active": self.is_active, - "joined_at": self.joined_at.isoformat() if self.joined_at else None, - "agent": self.agent.to_dict() if self.agent else None - } - - -class ChatRoomMessage(Base): - """Chat Room Message model""" - __tablename__ = "chat_room_messages" - - id: Mapped[str] = mapped_column(String(64), primary_key=True) - room_id: Mapped[str] = mapped_column(String(64), ForeignKey("chat_rooms.id"), nullable=False) - sender_type: Mapped[str] = mapped_column(String(16), nullable=False) - sender_id: Mapped[str] = mapped_column(String(64), nullable=False) - sender_name: Mapped[str] = mapped_column(String(50), nullable=False) - content: Mapped[str] = mapped_column(Text, nullable=False, default="") - mentions: Mapped[Optional[str]] = mapped_column(Text, nullable=True) - parent_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=True) - token_count: Mapped[int] = mapped_column(Integer, default=0) - created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now) - - # Relationships - chat_room: Mapped["ChatRoom"] = relationship("ChatRoom", back_populates="messages") - - def to_dict(self): - import json - result = { - "id": self.id, - "room_id": self.room_id, - "sender_type": self.sender_type, - "sender_id": self.sender_id, - "sender_name": self.sender_name, - "content": self.content, - "parent_id": self.parent_id, - "token_count": self.token_count, - "created_at": self.created_at.isoformat() if self.created_at else None - } - if self.mentions: - try: - result["mentions"] = json.loads(self.mentions) - except json.JSONDecodeError: - result["mentions"] = [] - else: - result["mentions"] = [] - return result diff --git a/luxx/models/user.py b/luxx/models/user.py index c0196bf..93ff31e 100644 --- a/luxx/models/user.py +++ b/luxx/models/user.py @@ -1,11 +1,15 @@ """User-related models""" from datetime import datetime -from typing import Optional, List +from typing import Optional, List, TYPE_CHECKING from sqlalchemy import String, Integer, Boolean, Text, DateTime, ForeignKey from sqlalchemy.orm import Mapped, mapped_column, relationship from luxx.core.database import Base +# Avoid circular import at runtime +if TYPE_CHECKING: + from luxx.models.chat import Conversation + def local_now(): return datetime.now() diff --git a/luxx/services/__init__.py b/luxx/services/__init__.py index 9bc4178..8222574 100644 --- a/luxx/services/__init__.py +++ b/luxx/services/__init__.py @@ -5,11 +5,14 @@ from luxx.services.agent import agent_manager from luxx.services.llm_service import llm_service, LLMService from luxx.services.message_service import message_service, MessageService from luxx.services.stream_service import stream_service, StreamService +from luxx.services.participant import participant_service, ParticipantService __all__ = [ "chat_service", "chat_room_service", "agent_manager", + "participant_service", + "ParticipantService", "ChatService", "llm_service", "LLMService", diff --git a/luxx/services/participant.py b/luxx/services/participant.py new file mode 100644 index 0000000..d5a526a --- /dev/null +++ b/luxx/services/participant.py @@ -0,0 +1,128 @@ +"""Participant Service - unified service for users and agents in chat rooms.""" +import logging +from typing import Dict, Any, Optional, AsyncGenerator + +from luxx.agents.base import BaseAgent +from luxx.agents.registry import agent_registry +from luxx.models.participant import Participant +from luxx.services.room import chat_room_service + +logger = logging.getLogger(__name__) + + +class ParticipantService: + """Unified service for managing participants in chat rooms.""" + + def __init__(self): + self._active_agents: Dict[str, BaseAgent] = {} + self._active_users: Dict[int, Any] = {} + + def _cm(self): + """Lazy import connection manager.""" + from luxx.services.room_ws import connection_manager + return connection_manager + + # ==================== Agent ==================== + + def register_agent(self, agent: BaseAgent) -> Participant: + self._active_agents[agent.agent_id] = agent + agent_registry.register(agent) + return Participant.from_agent( + agent.agent_id, agent.name, agent.role, agent.avatar, + agent.auto_response, agent.mention_trigger, agent.priority + ) + + def unregister_agent(self, agent_id: str) -> bool: + if agent_id in self._active_agents: + del self._active_agents[agent_id] + agent_registry.unregister(agent_id) + return True + return False + + def get_agent_participant(self, agent_id: str) -> Optional[Participant]: + agent = self._active_agents.get(agent_id) or chat_room_service.get_agent(agent_id) + if agent: + if agent_id not in self._active_agents: + self._active_agents[agent_id] = agent + return Participant.from_agent( + agent.agent_id, agent.name, agent.role, agent.avatar, + agent.auto_response, agent.mention_trigger, agent.priority + ) + return None + + # ==================== User ==================== + + def register_user(self, user) -> Participant: + self._active_users[user.id] = user + return Participant.from_user(user) + + def get_user_participant(self, user_id: int) -> Optional[Participant]: + user = self._active_users.get(user_id) + if not user: + from luxx.core.database import SessionLocal + from luxx.models.user import User + db = SessionLocal() + try: + user = db.query(User).filter(User.id == user_id).first() + if user: + self._active_users[user_id] = user + finally: + db.close() + return Participant.from_user(user) if user else None + + # ==================== Unified ==================== + + def get_participant(self, participant_id: str, ptype: str = "user") -> Optional[Participant]: + if ptype == "agent": + return self.get_agent_participant(participant_id) + try: + return self.get_user_participant(int(participant_id)) + except (ValueError, TypeError): + return None + + # ==================== Messages ==================== + + async def process_message( + self, room_id: str, content: str, sender_id: str, + sender_name: str, sender_type: str = "user", context: Dict = None + ) -> AsyncGenerator[Dict[str, Any], None]: + cm = self._cm() + + msg = chat_room_service.save_message(room_id, sender_type, sender_id, sender_name, content) + await cm.broadcast_to_room(room_id, {"event": "message", "data": msg}) + + room_agents = chat_room_service.get_room_agents(room_id) + if sender_type == "agent": + room_agents = [a for a in room_agents if a.agent_id != sender_id] + + for agent in room_agents: + await cm.broadcast_to_room(room_id, { + "event": "typing", + "data": {"agent_id": agent.agent_id, "agent_name": agent.name, "is_typing": True} + }) + + ctx = (context or {}) + ctx.update({"sender_type": sender_type, "sender_id": sender_id, "username": sender_name}) + + async for event in chat_room_service.process_message(room_id, content, sender_id, sender_name, ctx): + yield event + + for agent in room_agents: + await cm.broadcast_to_room(room_id, { + "event": "typing", + "data": {"agent_id": agent.agent_id, "agent_name": agent.name, "is_typing": False} + }) + + async def send_message( + self, room_id: str, participant_id: str, + participant_type: str, participant_name: str, content: str + ): + cm = self._cm() + msg = chat_room_service.save_message( + room_id, participant_type, participant_id, participant_name, content + ) + await cm.broadcast_to_room(room_id, {"event": "message", "data": msg}) + return msg + + +participant_service = ParticipantService() diff --git a/luxx/services/room.py b/luxx/services/room.py index b0209c9..e6fb445 100644 --- a/luxx/services/room.py +++ b/luxx/services/room.py @@ -1,177 +1,19 @@ """Chat Room Service - orchestrates multi-agent chat""" import json -import re import uuid -import asyncio import logging from typing import List, Dict, Any, Optional, AsyncGenerator -from dataclasses import dataclass from luxx.core.database import SessionLocal -from luxx.models.room import ChatRoom, Agent, ChatRoomAgent, ChatRoomMessage +from luxx.models.room import ChatRoom, Agent +from luxx.models.chat import Message from luxx.agents.base import BaseAgent logger = logging.getLogger(__name__) -# ==================== Dispatcher ==================== - -@dataclass -class DispatchResult: - """Result of message dispatch""" - triggered_agents: List[BaseAgent] - mentions: List[str] - should_respond: bool - - -class MessageDispatcher: - """Dispatcher for routing messages to agents""" - - @staticmethod - def parse_mentions(content: str) -> List[str]: - """Parse @mentions from message content""" - pattern = r'@(\w+)' - return re.findall(pattern, content) - - @staticmethod - def get_agents_by_names(names: List[str], room_agents: List[BaseAgent]) -> List[BaseAgent]: - """Get agents by their names (case-insensitive)""" - name_lower_map = {a.name.lower(): a for a in room_agents} - matched = [] - for name in names: - agent = name_lower_map.get(name.lower()) - if agent: - matched.append(agent) - return matched - - @staticmethod - def get_agents_by_ids(agent_ids: List[str], room_agents: List[BaseAgent]) -> List[BaseAgent]: - """Get agents by their IDs""" - id_set = set(agent_ids) - return [a for a in room_agents if a.agent_id in id_set] - - def dispatch(self, content: str, room_agents: List[BaseAgent], sender_id: str, sender_type: str = "user") -> DispatchResult: - """Dispatch a message to appropriate agents.""" - available_agents = [a for a in room_agents if a.agent_id != sender_id] - mentions = self.parse_mentions(content) - - if mentions: - triggered = self.get_agents_by_names(mentions, available_agents) - logger.info(f"Message with mentions: {mentions} -> triggered: {[a.name for a in triggered]}") - return DispatchResult(triggered_agents=triggered, mentions=mentions, should_respond=len(triggered) > 0) - - auto_agents = [a for a in available_agents if a.auto_response] - auto_agents.sort(key=lambda a: a.priority) - logger.info(f"Auto-response agents triggered: {[a.name for a in auto_agents]}") - return DispatchResult(triggered_agents=auto_agents, mentions=[], should_respond=len(auto_agents) > 0) - - -# ==================== Aggregator ==================== - -class ResponseAggregator: - """Aggregates responses from multiple agents""" - - def __init__(self, room_id: str): - self.room_id = room_id - self._agent_responses: Dict[str, Dict[str, Any]] = {} - - async def aggregate_stream(self, agent_streams: Dict[str, AsyncGenerator]) -> AsyncGenerator[Dict[str, Any], None]: - """Aggregate streaming responses from multiple agents.""" - if not agent_streams: - return - - import asyncio - - def parse_sse(event_str: str) -> Dict[str, Any]: - """Parse SSE string to dict.""" - lines = event_str.strip().split('\n') - result = {"event": None, "data": {}} - for line in lines: - if line.startswith('event: '): - result["event"] = line[7:].strip() - elif line.startswith('data: '): - try: - result["data"] = json.loads(line[6:].strip()) - except json.JSONDecodeError: - result["data"] = {"content": line[6:].strip()} - return result - - async def collect_agent_stream(agent_id: str, stream): - """Collect all events from a single agent stream.""" - try: - async for event in stream: - # Event is SSE string from BaseAgent - parsed = parse_sse(event) - parsed["agent_id"] = agent_id - yield parsed - except Exception as e: - logger.error(f"Agent {agent_id} stream error: {e}") - yield {"event": "error", "agent_id": agent_id, "data": {"content": str(e)}} - - # Use a queue-based approach for merging - queue = asyncio.Queue() - - async def producer(agent_id: str, stream): - try: - async for event in stream: - # Parse SSE string to dict if needed - if isinstance(event, str): - parsed = parse_sse(event) - parsed["agent_id"] = agent_id - await queue.put((agent_id, parsed)) - else: - # Already a dict, just add agent_id - if isinstance(event, dict): - event["agent_id"] = agent_id - await queue.put((agent_id, event)) - except Exception as e: - logger.error(f"Agent {agent_id} stream error: {e}") - await queue.put((agent_id, {"event": "error", "agent_id": agent_id, "data": {"content": str(e)}})) - finally: - await queue.put((agent_id, None)) # Signal done - - # Start all producers - producers = [ - asyncio.create_task(producer(agent_id, stream)) - for agent_id, stream in agent_streams.items() - ] - - active = len(producers) - while active > 0: - agent_id, event = await queue.get() - if event is None: - active -= 1 - else: - yield event - - # Wait for all producers to complete - await asyncio.gather(*producers, return_exceptions=True) - - def aggregate_final(self, responses: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]: - """Aggregate final responses from agents.""" - results = [] - for agent_id, response in responses.items(): - if response.get("event") == "done": - results.append({ - "agent_id": agent_id, - "agent_name": response.get("agent_name"), - "message_id": response.get("message_id"), - "content": response.get("content"), - "token_count": response.get("token_count", 0) - }) - return results - - -# ==================== Chat Room Service ==================== - class ChatRoomService: - """Service for managing chat rooms with multi-agent support""" - - def __init__(self): - self.dispatcher = MessageDispatcher() - def get_room(self, room_id: str) -> Optional[ChatRoom]: - """Get a chat room by ID""" db = SessionLocal() try: return db.query(ChatRoom).filter(ChatRoom.id == room_id).first() @@ -179,50 +21,60 @@ class ChatRoomService: db.close() def get_room_agents(self, room_id: str) -> List[BaseAgent]: - """Get all active agents in a chat room""" + """Get active agents in a room from Message table""" db = SessionLocal() try: - room_agents = db.query(ChatRoomAgent).filter( - ChatRoomAgent.chat_room_id == room_id, - ChatRoomAgent.is_active == True - ).all() + # Query distinct agent records from messages + messages = db.query(Message).filter( + Message.room_id == room_id, + Message.role == "agent" + ).distinct().all() + + agent_ids = [] + seen = set() + for msg in messages: + # Extract agent_id from content JSON + try: + content = json.loads(msg.content) if msg.content else {} + agent_id = content.get("agent_id") + if agent_id and agent_id not in seen: + seen.add(agent_id) + agent_ids.append(agent_id) + except json.JSONDecodeError: + pass agents = [] - for ra in room_agents: - agent_db = db.query(Agent).filter(Agent.id == ra.agent_id, Agent.is_active == True).first() + for agent_id in agent_ids: + agent_db = db.query(Agent).filter( + Agent.id == agent_id, + Agent.is_active == True + ).first() if agent_db: agents.append(BaseAgent.from_model(agent_db)) - - agents.sort(key=lambda a: a.priority) - return agents + + return sorted(agents, key=lambda a: a.priority) finally: db.close() def get_agent(self, agent_id: str) -> Optional[BaseAgent]: - """Get an agent by ID""" db = SessionLocal() try: agent_db = db.query(Agent).filter(Agent.id == agent_id).first() - if agent_db: - return BaseAgent.from_model(agent_db) - return None + return BaseAgent.from_model(agent_db) if agent_db else None finally: db.close() - def list_rooms(self, user_id: int = None, include_agents: bool = True) -> List[Dict]: - """List all chat rooms""" + def list_rooms(self, user_id: int = None) -> List[Dict]: db = SessionLocal() try: - query = db.query(ChatRoom) + q = db.query(ChatRoom) if user_id: - query = query.filter(ChatRoom.owner_id == user_id) - rooms = query.order_by(ChatRoom.updated_at.desc()).all() - return [r.to_dict(include_agents=include_agents) for r in rooms] + q = q.filter(ChatRoom.owner_id == user_id) + return [r.to_dict() for r in q.order_by(ChatRoom.updated_at.desc()).all()] finally: db.close() def create_room(self, name: str, owner_id: int, description: str = None, agent_ids: List[str] = None) -> Dict: - """Create a new chat room""" db = SessionLocal() try: room = ChatRoom( @@ -233,183 +85,174 @@ class ChatRoomService: ) db.add(room) - if agent_ids: - for agent_id in agent_ids: - room_agent = ChatRoomAgent( - id=str(uuid.uuid4()), - chat_room_id=room.id, - agent_id=agent_id - ) - db.add(room_agent) + # Record agents as join messages + for agent_id in (agent_ids or []): + msg = Message( + id=str(uuid.uuid4()), + room_id=room.id, + role="agent", + content=json.dumps({"type": "join", "agent_id": agent_id}), + sender_name=agent_id + ) + db.add(msg) db.commit() - return room.to_dict(include_agents=True) + return room.to_dict() finally: db.close() - def update_room(self, room_id: str, name: str = None, description: str = None, is_active: bool = None) -> Optional[Dict]: - """Update a chat room""" + def update_room(self, room_id: str, **kwargs) -> Optional[Dict]: db = SessionLocal() try: room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() if not room: return None - - if name is not None: - room.name = name - if description is not None: - room.description = description - if is_active is not None: - room.is_active = is_active - + for key, value in kwargs.items(): + if value is not None and hasattr(room, key): + setattr(room, key, value) db.commit() - return room.to_dict(include_agents=True) + return room.to_dict() finally: db.close() def delete_room(self, room_id: str) -> bool: - """Delete a chat room""" db = SessionLocal() try: room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() - if not room: - return False - db.delete(room) - db.commit() - return True - finally: - db.close() - - def add_agent_to_room(self, room_id: str, agent_id: str) -> bool: - """Add an agent to a chat room""" - db = SessionLocal() - try: - existing = db.query(ChatRoomAgent).filter( - ChatRoomAgent.chat_room_id == room_id, - ChatRoomAgent.agent_id == agent_id - ).first() - - if existing: - existing.is_active = True - else: - room_agent = ChatRoomAgent( - id=str(uuid.uuid4()), - chat_room_id=room_id, - agent_id=agent_id - ) - db.add(room_agent) - - db.commit() - return True - finally: - db.close() - - def remove_agent_from_room(self, room_id: str, agent_id: str) -> bool: - """Remove an agent from a chat room""" - db = SessionLocal() - try: - room_agent = db.query(ChatRoomAgent).filter( - ChatRoomAgent.chat_room_id == room_id, - ChatRoomAgent.agent_id == agent_id - ).first() - - if room_agent: - room_agent.is_active = False + if room: + db.delete(room) db.commit() return True return False finally: db.close() - def get_messages(self, room_id: str, limit: int = 50, before_id: str = None) -> List[Dict]: - """Get messages from a chat room""" + def add_participant( + self, room_id: str, agent_id: str = None, user_id: int = None + ) -> bool: db = SessionLocal() try: - query = db.query(ChatRoomMessage).filter( - ChatRoomMessage.room_id == room_id - ).order_by(ChatRoomMessage.created_at.desc()) - - if before_id: - before_msg = db.query(ChatRoomMessage).filter( - ChatRoomMessage.id == before_id - ).first() - if before_msg: - query = query.filter(ChatRoomMessage.created_at < before_msg.created_at) - - messages = query.limit(limit).all() - return [m.to_dict() for m in reversed(messages)] + role = "agent" if agent_id else "user" + sender_name = agent_id or f"user_{user_id}" + content_data = {"type": "join", "agent_id": agent_id} if agent_id else {"type": "join", "user_id": user_id} + + msg = Message( + id=str(uuid.uuid4()), + room_id=room_id, + role=role, + sender_name=sender_name, + content=json.dumps(content_data) + ) + db.add(msg) + db.commit() + return True finally: db.close() - def save_message(self, room_id: str, sender_type: str, sender_id: str, sender_name: str, content: str, - mentions: List[str] = None, parent_id: str = None, token_count: int = 0) -> Dict: - """Save a message to a chat room""" + def remove_participant(self, room_id: str, participant_id: str) -> bool: db = SessionLocal() try: - msg = ChatRoomMessage( + db.query(Message).filter( + Message.id == participant_id + ).delete() + db.commit() + return True + finally: + db.close() + + def get_messages(self, room_id: str, limit: int = 50, before_id: str = None) -> List[Dict]: + db = SessionLocal() + try: + q = db.query(Message).filter(Message.room_id == room_id).order_by(Message.created_at.desc()) + if before_id: + before = db.query(Message).filter(Message.id == before_id).first() + if before: + q = q.filter(Message.created_at < before.created_at) + return [m.to_dict() for m in reversed(q.limit(limit).all())] + finally: + db.close() + + def save_message( + self, + room_id: str, + role: str, + sender_name: str, + content: str, + mentions: List[str] = None, + token_count: int = 0 + ) -> Dict: + db = SessionLocal() + try: + msg = Message( id=str(uuid.uuid4()), room_id=room_id, - sender_type=sender_type, - sender_id=sender_id, + role=role, sender_name=sender_name, content=content, mentions=json.dumps(mentions) if mentions else None, - parent_id=parent_id, token_count=token_count ) db.add(msg) - + + # Update room updated_at room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first() if room: from datetime import datetime room.updated_at = datetime.now() - + db.commit() return msg.to_dict() finally: db.close() - async def process_message(self, room_id: str, user_message: str, user_id: str, user_name: str, context: Dict = None) -> AsyncGenerator[Dict[str, Any], None]: - """Process a user message and dispatch to appropriate agents.""" + async def process_message( + self, room_id: str, user_message: str, sender_id: str, sender_name: str = None + ): room = self.get_room(room_id) if not room: yield {"event": "error", "data": {"content": "Chat room not found"}} return - room_agents = self.get_room_agents(room_id) - if not room_agents: - yield {"event": "error", "data": {"content": "No agents available in this room"}} + agents = self.get_room_agents(room_id) + if not agents: + yield {"event": "error", "data": {"content": "No agents available"}} return - dispatch_result = self.dispatcher.dispatch( - content=user_message, - room_agents=room_agents, - sender_id=user_id, - sender_type="user" - ) - - if not dispatch_result.should_respond: + # Check if sender is agent + from luxx.agents.registry import agent_registry + sender_is_agent = agent_registry.get(sender_id) is not None + + # Filter out sender if agent + if sender_is_agent: + agents = [a for a in agents if a.agent_id != sender_id] + + # Check mentions + import re + mentions = re.findall(r'@(\w+)', user_message) + triggered = [] + + if mentions: + name_map = {a.name.lower(): a for a in agents} + triggered = [name_map[n.lower()] for n in mentions if n.lower() in name_map] + + if not triggered: + triggered = [a for a in agents if a.auto_response] + triggered.sort(key=lambda a: a.priority) + + if not triggered: yield {"event": "no_response", "data": {"message": "No agents triggered"}} return + # Get history messages = self.get_messages(room_id, limit=20) - - agent_streams = {} - for agent in dispatch_result.triggered_agents: - stream = agent.stream_response( - user_message=user_message, - conversation_history=messages, - context=context - ) - agent_streams[agent.agent_id] = stream - - aggregator = ResponseAggregator(room_id) - async for event in aggregator.aggregate_stream(agent_streams): - yield event + + # Stream responses + for agent in triggered: + async for event in agent.stream_response(user_message, messages): + yield event + + self.save_message(room_id, "user", sender_id, sender_name, user_message) -# Global service instance +# Global instance chat_room_service = ChatRoomService() - -# Export for backward compatibility -dispatcher = chat_room_service.dispatcher diff --git a/luxx/services/room_ws.py b/luxx/services/room_ws.py index fc143a4..0302d7c 100644 --- a/luxx/services/room_ws.py +++ b/luxx/services/room_ws.py @@ -1,103 +1,82 @@ -"""WebSocket handler for Chat Rooms""" -import json -import asyncio +"""WebSocket handler for Chat Rooms - unified user and agent participants.""" import logging from typing import Dict, Set from fastapi import WebSocket, WebSocketDisconnect from luxx.services.room import chat_room_service +from luxx.services.participant import participant_service logger = logging.getLogger(__name__) -class ChatRoomConnectionManager: - """Manages WebSocket connections for chat rooms""" - +class ConnectionManager: def __init__(self): - self._room_connections: Dict[str, Set[WebSocket]] = {} - self._connection_rooms: Dict[WebSocket, str] = {} + self._rooms: Dict[str, Set[WebSocket]] = {} + self._info: Dict[WebSocket, Dict] = {} - async def connect(self, websocket: WebSocket, room_id: str): - """Connect a WebSocket to a chat room""" - await websocket.accept() + async def connect(self, ws: WebSocket, room_id: str, ptype: str, pid: str, pname: str): + await ws.accept() + self._rooms.setdefault(room_id, set()).add(ws) + self._info[ws] = {"type": ptype, "id": pid, "name": pname} + await ws.send_json({"event": "connected", "data": {"room_id": room_id, "type": ptype}}) - if room_id not in self._room_connections: - self._room_connections[room_id] = set() + def disconnect(self, ws: WebSocket): + info = self._info.pop(ws, {}) + room = self._rooms.get(self._info.get(ws, {}).get("id")) + if room: + room.discard(ws) + if not room: + del self._rooms[room] + return info - self._room_connections[room_id].add(websocket) - self._connection_rooms[websocket] = room_id + async def broadcast(self, room_id: str, msg: dict, exclude: WebSocket = None): + for ws in self._rooms.get(room_id, set()): + if ws != exclude: + try: + await ws.send_json(msg) + except: + self.disconnect(ws) - logger.info(f"WebSocket connected to room: {room_id}") - - await websocket.send_json({ - "event": "connected", - "data": {"room_id": room_id, "message": "Connected to chat room"} - }) - - def disconnect(self, websocket: WebSocket): - """Disconnect a WebSocket from its room""" - room_id = self._connection_rooms.pop(websocket, None) - if room_id and room_id in self._room_connections: - self._room_connections[room_id].discard(websocket) - if not self._room_connections[room_id]: - del self._room_connections[room_id] - logger.info(f"WebSocket disconnected from room: {room_id}") - - async def broadcast_to_room(self, room_id: str, message: dict, exclude: WebSocket = None): - """Broadcast a message to all connections in a room""" - if room_id not in self._room_connections: - return - - disconnected = [] - for connection in self._room_connections[room_id]: - if connection == exclude: - continue - try: - await connection.send_json(message) - except Exception as e: - logger.error(f"Failed to send to connection: {e}") - disconnected.append(connection) - - for conn in disconnected: - self.disconnect(conn) - - async def send_to_room(self, room_id: str, message: dict): - """Send a message to all connections in a room""" - await self.broadcast_to_room(room_id, message) - - def get_room_size(self, room_id: str) -> int: - """Get the number of connections in a room""" - return len(self._room_connections.get(room_id, set())) + def size(self, room_id: str) -> int: + return len(self._rooms.get(room_id, set())) -# Global connection manager -connection_manager = ChatRoomConnectionManager() +cm = ConnectionManager() -async def websocket_handler(websocket: WebSocket, room_id: str): - """Handle WebSocket connection for a chat room.""" - await connection_manager.connect(websocket, room_id) +async def websocket_handler(ws: WebSocket, room_id: str): + params = dict(ws.query_params) + ptype = params.get("participant_type", "user") + pid = params.get("participant_id", "") + pname = params.get("participant_name", "Anonymous") + + await cm.connect(ws, room_id, ptype, pid, pname) room = chat_room_service.get_room(room_id) if not room: - await websocket.send_json({"event": "error", "data": {"content": "Chat room not found"}}) - await websocket.close() + await ws.send_json({"event": "error", "data": {"content": "Room not found"}}) + await ws.close() return + if ptype == "agent" and pid: + agent = chat_room_service.get_agent(pid) + if agent: + participant_service.register_agent(agent) + try: - messages = chat_room_service.get_messages(room_id, limit=50) - await websocket.send_json({"event": "history", "data": {"messages": messages}}) + # Send history + await ws.send_json({"event": "history", "data": {"messages": chat_room_service.get_messages(room_id)}}) + await ws.send_json({"event": "agents", "data": { + "agents": [a.to_dict() for a in chat_room_service.get_room_agents(room_id)] + }}) - agents = chat_room_service.get_room_agents(room_id) - await websocket.send_json({"event": "agents", "data": {"agents": [a.to_dict() for a in agents]}}) - - await connection_manager.broadcast_to_room(room_id, { + await cm.broadcast(room_id, { "event": "system", - "data": {"content": "User joined the room", "type": "join"} - }, exclude=websocket) + "data": {"content": f"{pname} joined", "type": f"{ptype}_join"} + }, exclude=ws) while True: - data = await websocket.receive_json() + data = await ws.receive_json() action = data.get("action") if action == "send_message": @@ -105,87 +84,29 @@ async def websocket_handler(websocket: WebSocket, room_id: str): if not content: continue - user_id = str(data.get("user_id", "anonymous")) - user_name = data.get("user_name", "Anonymous") + sid = pid if ptype == "agent" else str(data.get("user_id", pid or "anonymous")) + sname = pname if ptype == "agent" else data.get("user_name", pname or "Anonymous") - msg = chat_room_service.save_message( - room_id=room_id, sender_type="user", sender_id=user_id, - sender_name=user_name, content=content - ) - - await connection_manager.broadcast_to_room(room_id, {"event": "message", "data": msg}) - - agents = chat_room_service.get_room_agents(room_id) - for agent in agents: - await connection_manager.broadcast_to_room(room_id, { - "event": "typing", - "data": {"agent_id": agent.agent_id, "agent_name": agent.name, "is_typing": True} - }) - - context = {"user_id": user_id, "username": user_name} - logger.info(f"[ROOM_WS] Starting process_message, agents count: {len(agents)}") - - event_count = 0 - async for event in chat_room_service.process_message( - room_id=room_id, user_message=content, user_id=user_id, - user_name=user_name, context=context + async for event in participant_service.process_message( + room_id, content, sid, sname, ptype ): - event_count += 1 - logger.info(f"[ROOM_WS] Received event {event_count}: {event.get('event')}") - if event.get("event") in ["process_step", "done", "error"]: - # Find agent_name from agents list - agent_name = None - for agent in agents: - if agent.agent_id == event.get("agent_id"): - agent_name = agent.name - break - await connection_manager.broadcast_to_room(room_id, { - "event": event.get("event"), + await cm.broadcast(room_id, { + "event": event["event"], "data": event.get("data", {}), - "agent_id": event.get("agent_id"), - "agent_name": agent_name + "agent_id": event.get("agent_id") }) - if event.get("event") == "done": - # Find agent_name from agents list - agent_name = None - for agent in agents: - if agent.agent_id == event.get("agent_id"): - agent_name = agent.name - break - chat_room_service.save_message( - room_id=room_id, sender_type="agent", - sender_id=event.get("agent_id"), - sender_name=agent_name, - content=event.get("data", {}).get("content", "") if isinstance(event.get("data"), dict) else "", - token_count=event.get("data", {}).get("token_count", 0) if isinstance(event.get("data"), dict) else 0 - ) - else: - logger.info(f"[ROOM_WS] Skipping event: {event.get('event')}") - - logger.info(f"[ROOM_WS] process_message completed, total events: {event_count}") - - for agent in agents: - await connection_manager.broadcast_to_room(room_id, { - "event": "typing", - "data": {"agent_id": agent.agent_id, "agent_name": agent.name, "is_typing": False} - }) - elif action == "ping": - await websocket.send_json({"event": "pong", "data": {}}) + await ws.send_json({"event": "pong", "data": {}}) except WebSocketDisconnect: - logger.info(f"WebSocket disconnected from room: {room_id}") - connection_manager.disconnect(websocket) - await connection_manager.broadcast_to_room(room_id, { - "event": "system", - "data": {"content": "User left the room", "type": "leave"} - }) + await cm.broadcast(room_id, {"event": "system", "data": {"content": f"{pname} left", "type": "leave"}}) except Exception as e: - logger.error(f"WebSocket error in room {room_id}: {e}") - connection_manager.disconnect(websocket) - await connection_manager.broadcast_to_room(room_id, { - "event": "error", - "data": {"content": str(e)} - }) + logger.error(f"WebSocket error: {e}") + await cm.broadcast(room_id, {"event": "error", "data": {"content": str(e)}}) + finally: + cm.disconnect(ws) + + +connection_manager = cm