diff --git a/assets/API.md b/assets/API.md index b6ca15f..24aec97 100644 --- a/assets/API.md +++ b/assets/API.md @@ -18,7 +18,7 @@ ```json { "success": true, - "message": "注册成功", + "message": "Registration successful", "data": { "id": 1, "username": "string" @@ -41,19 +41,31 @@ ```json { "success": true, - "message": "登录成功", + "message": "Login successful", "data": { "access_token": "eyJ...", "token_type": "bearer", "user": { "id": 1, "username": "string", - "role": "user" + "email": "user@example.com", + "role": "user", + "permission_level": 1, + "workspace_path": null, + "is_active": true } } } ``` +**用户权限级别:** +| 级别 | 名称 | 说明 | +|------|------|------| +| 1 | READ_ONLY | 只读权限 | +| 2 | WRITE | 写入权限 | +| 3 | EXECUTE | 执行权限 | +| 4 | ADMIN | 管理员权限 | + ### POST /api/auth/logout 用户登出 @@ -63,7 +75,7 @@ ```json { "success": true, - "message": "登出成功" + "message": "Logout successful" } ``` @@ -81,11 +93,39 @@ "username": "string", "email": "user@example.com", "role": "user", - "is_active": true + "permission_level": 1, + "workspace_path": null, + "is_active": true, + "created_at": "2024-01-01T00:00:00" } } ``` +### GET /api/auth/users +获取所有用户(管理员专用) + +**请求头:** `Authorization: Bearer ` + +**响应:** +```json +{ + "success": true, + "data": { + "users": [...] + } +} +``` + +### PUT /api/auth/users/{user_id} +更新用户权限(管理员专用) + +**请求体:** +```json +{ + "permission_level": 2 +} +``` + --- ## 会话 `/api/conversations` @@ -123,10 +163,11 @@ { "project_id": "string (可选)", "title": "新会话", - "model": "glm-5", - "system_prompt": "string (可选)", - "temperature": 1.0, - "max_tokens": 65536, + "model": "deepseek-chat", + "provider_id": 1, + "system_prompt": "You are a helpful assistant. (可选)", + "temperature": 0.7, + "max_tokens": 2000, "thinking_enabled": false } ``` @@ -139,9 +180,15 @@ "data": { "id": "conv_xxx", "user_id": 1, + "provider_id": 1, "title": "新会话", - "model": "glm-5", - ... + "model": "deepseek-chat", + "system_prompt": "You are a helpful assistant.", + "temperature": 0.7, + "max_tokens": 2000, + "thinking_enabled": false, + "created_at": "2024-01-01T00:00:00", + "updated_at": "2024-01-01T00:00:00" } } ``` @@ -149,32 +196,92 @@ ### GET /api/conversations/{id} 获取会话详情 -**路径参数:** -- `id`: 会话ID +**路径参数:** `id`: 会话ID **请求头:** `Authorization: Bearer ` ### PUT /api/conversations/{id} 更新会话 +**路径参数:** `id`: 会话ID + +**请求头:** `Authorization: Bearer ` + +**请求体:** +```json +{ + "title": "新标题", + "model": "gpt-4", + "provider_id": 1, + "system_prompt": "You are...", + "temperature": 0.8, + "max_tokens": 4000, + "thinking_enabled": true +} +``` + ### DELETE /api/conversations/{id} 删除会话 +**路径参数:** `id`: 会话ID + +**请求头:** `Authorization: Bearer ` + --- ## 消息 `/api/messages` -### GET /api/messages/{conversation_id} +### GET /api/messages/ 获取消息列表 -**路径参数:** -- `conversation_id`: 会话ID - **查询参数:** +- `conversation_id`: 会话ID(必需) - `limit` (可选): 返回数量,默认100 **请求头:** `Authorization: Bearer ` +**响应:** +```json +{ + "success": true, + "data": { + "messages": [ + { + "id": "msg_xxx", + "conversation_id": "conv_xxx", + "role": "user", + "content": "用户消息", + "text": "用户消息", + "attachments": [], + "process_steps": [], + "token_count": 10, + "usage": null, + "created_at": "2024-01-01T00:00:00" + }, + { + "id": "msg_yyy", + "conversation_id": "conv_xxx", + "role": "assistant", + "content": "AI 回复文本内容", + "text": "AI 回复文本内容", + "attachments": [], + "process_steps": [ + {"id": "step-0", "index": 0, "type": "thinking", "content": "让我思考..."}, + {"id": "step-1", "index": 1, "type": "text", "content": "根据搜索结果..."}, + {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "web_search", "arguments": "..."}, + {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "web_search", "content": "...", "success": true} + ], + "token_count": 100, + "usage": {"prompt_tokens": 50, "completion_tokens": 50, "total_tokens": 100}, + "created_at": "2024-01-01T00:00:01" + } + ], + "title": "会话标题", + "first_message": "用户的第一条消息..." + } +} +``` + ### POST /api/messages/ 发送消息(非流式) @@ -185,7 +292,7 @@ { "conversation_id": "conv_xxx", "content": "用户消息", - "tools_enabled": true + "thinking_enabled": false } ``` @@ -201,20 +308,182 @@ ``` ### POST /api/messages/stream -发送消息(流式响应) +发送消息(流式响应 - SSE) 使用 Server-Sent Events (SSE) 返回流式响应。 -**事件类型:** -- `text`: 文本增量 -- `tool_call`: 工具调用 -- `tool_result`: 工具结果 -- `done`: 完成 -- `error`: 错误 +**请求头:** `Authorization: Bearer ` -### DELETE /api/messages/{id} +**请求体:** +```json +{ + "conversation_id": "conv_xxx", + "content": "用户消息", + "thinking_enabled": true, + "enabled_tools": ["web_search", "file_read", "python_execute"] +} +``` + +**SSE 事件类型:** + +#### process_step +结构化步骤事件(渲染顺序的唯一数据源) + +```json +event: process_step +data: {"step": {"id": "step-0", "index": 0, "type": "thinking", "content": "让我思考一下..."}} + +event: process_step +data: {"step": {"id": "step-1", "index": 1, "type": "text", "content": "以下是搜索结果:"}} + +event: process_step +data: {"step": {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_abc", "name": "web_search", "arguments": "{\"query\": \"...\"}"}} + +event: process_step +data: {"step": {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_abc", "name": "web_search", "content": "{...}", "success": true}} +``` + +**步骤类型说明:** + +| type | 说明 | 额外字段 | +|------|------|---------| +| `thinking` | 模型思考过程 | `content` | +| `text` | 文本回复 | `content` | +| `tool_call` | 工具调用 | `id_ref`, `name`, `arguments` | +| `tool_result` | 工具执行结果 | `id_ref`, `name`, `content`, `success` | + +#### done +响应完成 + +```json +event: done +data: {"message_id": "msg_xxx", "token_count": 150, "usage": {"prompt_tokens": 100, "completion_tokens": 50, "total_tokens": 150}} +``` + +#### error +错误信息 + +```json +event: error +data: {"content": "错误信息描述"} +``` + +### DELETE /api/messages/{message_id} 删除消息 +**路径参数:** `message_id`: 消息ID + +**请求头:** `Authorization: Bearer ` + +--- + +## LLM 提供商 `/api/providers` + +### GET /api/providers/ +获取用户的 LLM 提供商列表 + +**请求头:** `Authorization: Bearer ` + +**响应:** +```json +{ + "success": true, + "data": { + "providers": [ + { + "id": 1, + "user_id": 1, + "name": "DeepSeek", + "provider_type": "openai", + "base_url": "https://api.deepseek.com/v1", + "default_model": "deepseek-chat", + "max_tokens": 8192, + "is_default": true, + "enabled": true, + "created_at": "2024-01-01T00:00:00", + "updated_at": "2024-01-01T00:00:00" + } + ], + "total": 1 + } +} +``` + +### POST /api/providers/ +创建 LLM 提供商 + +**请求头:** `Authorization: Bearer ` + +**请求体:** +```json +{ + "name": "DeepSeek", + "provider_type": "openai", + "base_url": "https://api.deepseek.com/v1", + "api_key": "sk-xxxx", + "default_model": "deepseek-chat", + "is_default": true +} +``` + +**provider_type 可选值:** +- `openai` - OpenAI/DeepSeek/GLM 兼容 API +- `anthropic` - Anthropic Claude API + +### GET /api/providers/{provider_id} +获取提供商详情 + +**路径参数:** `provider_id`: 提供商ID + +**请求头:** `Authorization: Bearer ` + +### PUT /api/providers/{provider_id} +更新提供商 + +**路径参数:** `provider_id`: 提供商ID + +**请求头:** `Authorization: Bearer ` + +**请求体:** +```json +{ + "name": "新名称", + "base_url": "https://api.example.com/v1", + "api_key": "sk-yyyy", + "default_model": "gpt-4", + "max_tokens": 16384, + "is_default": false, + "enabled": true +} +``` + +### DELETE /api/providers/{provider_id} +删除提供商 + +**路径参数:** `provider_id`: 提供商ID + +**请求头:** `Authorization: Bearer ` + +### POST /api/providers/{provider_id}/test +测试提供商连接 + +**路径参数:** `provider_id`: 提供商ID + +**请求头:** `Authorization: Bearer ` + +**响应:** +```json +{ + "success": true, + "message": "HTTP 200: ...", + "data": { + "status_code": 200, + "success": true, + "response_body": "..." + } +} +``` + --- ## 工具 `/api/tools` @@ -223,7 +492,7 @@ 获取可用工具列表 **查询参数:** -- `category` (可选): 工具分类 +- `category` (可选): 工具分类(code/file/shell/crawler/data) **请求头:** `Authorization: Bearer ` @@ -232,12 +501,21 @@ { "success": true, "data": { - "tools": [...], + "tools": [ + { + "name": "python_execute", + "description": "Execute Python code", + "category": "code", + "parameters": {...} + }, + ... + ], "categorized": { - "crawler": [...], "code": [...], - "data": [...], - "weather": [...] + "file": [...], + "shell": [...], + "crawler": [...], + "data": [...] }, "total": 11 } @@ -247,9 +525,39 @@ ### GET /api/tools/{name} 获取工具详情 +**路径参数:** `name`: 工具名称 + +**请求头:** `Authorization: Bearer ` + +**响应:** +```json +{ + "success": true, + "data": { + "name": "web_search", + "description": "Search the web using DuckDuckGo", + "category": "crawler", + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Search query" + } + }, + "required": ["query"] + } + } +} +``` + ### POST /api/tools/{name}/execute 手动执行工具 +**路径参数:** `name`: 工具名称 + +**请求头:** `Authorization: Bearer ` + **请求体:** ```json { @@ -257,3 +565,89 @@ "arg2": "value2" } ``` + +**响应:** +```json +{ + "success": true, + "data": { + "result": "..." + } +} +``` + +--- + +## 公共端点 + +### GET /api/health +健康检查 + +**响应:** +```json +{ + "status": "ok", + "message": "Luxx API is running" +} +``` + +### GET /api/ +服务信息 + +**响应:** +```json +{ + "name": "Luxx", + "version": "1.0.0", + "description": "AI Chat API" +} +``` + +--- + +## 工具说明 + +### 内置工具 + +#### 代码执行 (code) +| 工具 | 功能 | 权限 | +|------|------|------| +| `python_execute` | 执行 Python 代码 | EXECUTE | +| `python_eval` | 计算表达式 | EXECUTE | + +#### 文件操作 (file) +| 工具 | 功能 | 权限 | +|------|------|------| +| `file_read` | 读取文件内容 | READ_ONLY | +| `file_write` | 写入文件内容 | WRITE | +| `file_list` | 列出目录内容 | READ_ONLY | +| `file_exists` | 检查文件是否存在 | READ_ONLY | +| `file_grep` | 正则搜索文件 | READ_ONLY | + +#### Shell 命令 (shell) +| 工具 | 功能 | 权限 | +|------|------|------| +| `shell_execute` | 执行 Shell 命令 | EXECUTE | + +#### 网页爬虫 (crawler) +| 工具 | 功能 | 权限 | +|------|------|------| +| `web_search` | DuckDuckGo 搜索 | READ_ONLY | +| `web_fetch` | 网页抓取 | READ_ONLY | +| `batch_fetch` | 批量并发抓取 | READ_ONLY | + +#### 数据处理 (data) +| 工具 | 功能 | 权限 | +|------|------|------| +| `process_data` | JSON 转换、格式化 | READ_ONLY | + +### 权限检查 + +工具执行时自动检查用户权限: + +``` +工具要求的权限 <= 用户拥有的权限 → 允许执行 +工具要求的权限 > 用户拥有的权限 → 返回错误 +``` + +用户通过 `/api/auth/users/{user_id}` 接口设置权限级别。 diff --git a/assets/ARCHITECTURE.md b/assets/ARCHITECTURE.md index 66c54d0..ba55a5b 100644 --- a/assets/ARCHITECTURE.md +++ b/assets/ARCHITECTURE.md @@ -17,39 +17,50 @@ ``` luxx/ -├── __init__.py # FastAPI 应用工厂 -├── config.py # 配置管理(YAML) -├── database.py # 数据库连接 -├── models.py # ORM 模型 -├── routes/ # API 路由层 -│ ├── __init__.py # 路由聚合 -│ ├── auth.py # 认证 (登录/注册) -│ ├── conversations.py # 会话管理 (CRUD) -│ ├── messages.py # 消息处理 (流式/同步) -│ ├── providers.py # LLM 提供商管理 -│ └── tools.py # 工具管理 -├── services/ # 服务层 -│ ├── chat.py # 聊天服务门面 -│ ├── agentic_loop.py # Agentic Loop 执行器 -│ ├── stream_context.py# 流式状态管理 -│ ├── llm_response.py # LLM 响应解析器 -│ ├── process_result.py# 处理结果 -│ └── llm_client.py # LLM 客户端 -├── tools/ # 工具系统 -│ ├── core.py # 核心类 (ToolRegistry, ToolDefinition, ToolResult) -│ ├── factory.py # @tool 装饰器 -│ ├── executor.py # 工具执行器 (缓存/并行) -│ ├── services.py # 工具服务层 -│ └── builtin/ # 内置工具 -│ ├── __init__.py # 工具注册入口 -│ ├── code.py # 代码执行 (python_execute, python_eval) -│ ├── crawler.py # 网页爬虫 (web_search, web_fetch, batch_fetch) -│ └── data.py # 数据处理 (process_data) -└── utils/ # 工具函数 - └── helpers.py # 密码哈希、ID生成、响应封装 +├── __init__.py # FastAPI 应用工厂 +├── config.py # 配置管理(YAML) +├── database.py # 数据库连接 +├── models.py # ORM 模型 +├── routes/ # API 路由层 +│ ├── __init__.py # 路由聚合 +│ ├── auth.py # 认证 (登录/注册) +│ ├── conversations.py # 会话管理 (CRUD) +│ ├── messages.py # 消息处理 (流式/同步) +│ ├── providers.py # LLM 提供商管理 +│ └── tools.py # 工具管理 +├── services/ # 服务层 +│ ├── __init__.py # 服务导出 +│ ├── chat.py # 聊天服务门面 +│ ├── agentic_loop.py # Agentic Loop 执行器 +│ ├── stream_context.py # 流式状态管理 +│ ├── llm_response.py # LLM 响应数据类 +│ ├── process_result.py # [已移除] +│ ├── task.py # 任务系统 (Task/TaskGraph/TaskService) +│ ├── llm_client.py # LLM 客户端 +│ └── llm_adapters/ # LLM API 适配器 +│ ├── __init__.py # 适配器导出 +│ ├── base.py # ProviderAdapter 基类 +│ ├── openai_adapter.py # OpenAI/DeepSeek/GLM 适配器 +│ └── anthropic_adapter.py # Anthropic Claude 适配器 +├── tools/ # 工具系统 +│ ├── __init__.py # 工具注册入口 +│ ├── core.py # 核心类 (ToolRegistry, ToolDefinition, ToolResult, ToolContext) +│ ├── factory.py # @tool 装饰器 +│ ├── executor.py # 工具执行器 (缓存/并行) +│ ├── services.py # 工具服务层 +│ └── builtin/ # 内置工具 +│ ├── __init__.py # 工具注册入口 +│ ├── code.py # 代码执行 (python_execute, python_eval) +│ ├── crawler.py # 网页爬虫 (web_search, web_fetch, batch_fetch) +│ ├── data.py # 数据处理 (process_data) +│ ├── file.py # 文件操作 (file_read, file_write, file_list, file_exists, file_grep) +│ └── shell.py # Shell 命令 (shell_execute) +└── utils/ # 工具函数 + ├── __init__.py + └── helpers.py # 密码哈希、ID生成、响应封装 -run.py # 应用入口文件 -config.yaml # 配置文件 +run.py # 应用入口文件 +config.yaml # 配置文件 ``` ## 核心组件 @@ -77,15 +88,36 @@ password: admin123 app: secret_key: ${APP_SECRET_KEY} debug: true + host: 0.0.0.0 + port: 8000 database: type: sqlite url: sqlite:///./chat.db +workspace: + root: ./workspaces # 用户工作空间根目录 + auto_create: true # 自动创建用户目录 + llm: provider: deepseek api_key: ${DEEPSEEK_API_KEY} api_url: https://api.deepseek.com/v1 + +tools: + enable_cache: true + cache_ttl: 300 + max_workers: 4 + max_iterations: 10 + +logging: + level: INFO +``` + +**工作空间隔离机制:** +- 每个用户的工作空间路径基于 `user_id` 的 SHA256 哈希值 +- 格式:`{workspace_root}/{hash_of_user_id}` +- 所有文件操作必须在用户工作空间内,防止路径穿越攻击 ``` ### 3. 数据库 (`database.py`) @@ -103,10 +135,12 @@ erDiagram string email UK string password_hash string role + int permission_level "1=READ_ONLY, 2=WRITE, 3=EXECUTE, 4=ADMIN" + string workspace_path "用户工作空间路径" boolean is_active datetime created_at } - + PROJECT { string id PK int user_id FK @@ -115,7 +149,7 @@ erDiagram datetime created_at datetime updated_at } - + CONVERSATION { string id PK int user_id FK @@ -130,7 +164,7 @@ erDiagram datetime created_at datetime updated_at } - + MESSAGE { string id PK string conversation_id FK @@ -140,7 +174,7 @@ erDiagram text usage "JSON 格式" datetime created_at } - + LLM_PROVIDER { int id PK int user_id FK @@ -155,7 +189,7 @@ erDiagram datetime created_at datetime updated_at } - + USER ||--o{ PROJECT : "has" USER ||--o{ CONVERSATION : "has" USER ||--o{ LLM_PROVIDER : "configures" @@ -164,6 +198,14 @@ erDiagram CONVERSATION ||--o{ MESSAGE : "has" ``` +**用户权限级别 (permission_level):** +| 级别 | 名称 | 说明 | +|------|------|------| +| 1 | READ_ONLY | 只读权限 | +| 2 | WRITE | 写入权限(文件写入) | +| 3 | EXECUTE | 执行权限(代码执行、Shell命令) | +| 4 | ADMIN | 管理员权限 | + ### Message Content JSON 结构 `content` 字段统一使用 JSON 格式存储: @@ -183,8 +225,6 @@ erDiagram ```json { - "text": "AI 回复的文本内容", - "tool_calls": [...], "steps": [ {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}, {"id": "step-1", "index": 1, "type": "text", "content": "..."}, @@ -194,7 +234,9 @@ erDiagram } ``` -`steps` 字段是**渲染顺序的唯一数据源**,按 `index` 顺序排列。thinking、text、tool_call、tool_result 可以在多轮迭代中穿插出现。 +`steps` 字段是**唯一数据源**,按 `index` 顺序排列。thinking、text、tool_call、tool_result 可以在多轮迭代中穿插出现。 + +**注意**:`text` 和 `content` 字段通过解析 `steps` 中所有 `type: "text"` 的内容动态计算得出。 ### 5. 工具系统 @@ -206,9 +248,25 @@ classDiagram +dict parameters +Callable handler +str category + +CommandPermission required_permission +to_openai_format() dict } + class ToolContext { + +int user_id + +str username + +str workspace + +int user_permission_level + } + + class CommandPermission { + <> + READ_ONLY = 1 + WRITE = 2 + EXECUTE = 3 + ADMIN = 4 + } + class ToolResult { +bool success +Any data @@ -224,7 +282,7 @@ classDiagram +get(name) ToolDefinition? +list_all() List~dict~ +list_by_category(category) List~dict~ - +execute(name, arguments) dict + +execute(name, arguments, context) dict +remove(name) bool } @@ -243,14 +301,51 @@ classDiagram #### 内置工具 -| 工具 | 功能 | 说明 | +**代码执行 (code.py)** + +| 工具 | 功能 | 权限 | |------|------|------| -| `python_execute` | 执行 Python 代码 | 支持 print 输出、变量访问 | -| `python_eval` | 计算表达式 | 快速求值 | -| `web_search` | DuckDuckGo HTML | DuckDuckGo HTML 搜索 | -| `web_fetch` | 网页抓取 | httpx + BeautifulSoup,支持 text/links/structured | -| `batch_fetch` | 批量抓取 | 并发获取多个页面 | -| `process_data` | 数据处理 | JSON 转换、格式化等 | +| `python_execute` | 执行 Python 代码 | EXECUTE | +| `python_eval` | 计算表达式 | EXECUTE | + +**文件操作 (file.py)** + +| 工具 | 功能 | 权限 | +|------|------|------| +| `file_read` | 读取文件内容 | READ_ONLY | +| `file_write` | 写入文件内容 | WRITE | +| `file_list` | 列出目录内容 | READ_ONLY | +| `file_exists` | 检查文件是否存在 | READ_ONLY | +| `file_grep` | 正则搜索文件内容 | READ_ONLY | + +**Shell 命令 (shell.py)** + +| 工具 | 功能 | 权限 | +|------|------|------| +| `shell_execute` | 执行 Shell 命令 | EXECUTE | + +**网页爬虫 (crawler.py)** + +| 工具 | 功能 | 权限 | +|------|------|------| +| `web_search` | DuckDuckGo HTML 搜索 | READ_ONLY | +| `web_fetch` | 网页抓取 | READ_ONLY | +| `batch_fetch` | 批量并发抓取 | READ_ONLY | + +**数据处理 (data.py)** + +| 工具 | 功能 | 权限 | +|------|------|------| +| `process_data` | JSON 转换、格式化 | READ_ONLY | + +#### 权限检查机制 + +工具执行时自动检查用户权限: + +``` +工具要求的权限 <= 用户拥有的权限 → 允许执行 +工具要求的权限 > 用户拥有的权限 → 拒绝执行 +``` #### 工具开发规范 @@ -312,26 +407,82 @@ ToolExecutor 返回结果 ### 6. 服务层 -#### LLMResponseParser (`services/llm_response.py`) -统一解析器,兼容多种 LLM API 格式: -- **OpenAI**: `delta.content`, `delta.tool_calls` -- **DeepSeek**: `delta.content`, `delta.reasoning_content` -- **Anthropic**: `content_block` 类型事件 -- **MiniMax**: `<|im_start|>thinking...<|im_end|>` 标签 +#### LLM 适配器 (`services/llm_adapters/`) + +适配器模式统一处理不同 LLM API 格式: + +```mermaid +classDiagram + class ProviderAdapter { + <> + +str provider_type + +build_request() tuple + +parse_stream_chunk() AsyncGenerator + +parse_response() Dict + +supports_thinking() bool + +supports_tools() bool + } + + class OpenAIAdapter { + +str provider_type = "openai" + +build_request() tuple + +parse_stream_chunk() AsyncGenerator + +parse_response() Dict + +supports_tools() bool + } + + class AnthropicAdapter { + +str provider_type = "anthropic" + +build_request() tuple + +parse_stream_chunk() AsyncGenerator + +parse_response() Dict + +supports_thinking() bool + +supports_tools() bool + } + + ProviderAdapter <|-- OpenAIAdapter + ProviderAdapter <|-- AnthropicAdapter +``` + +**支持的功能对比:** + +| 适配器 | 工具调用 | Thinking/Reasoning | 流式响应 | +|--------|----------|-------------------|----------| +| OpenAI | ✅ | ✅ (DeepSeek) | ✅ | +| Anthropic | ✅ | ✅ | ✅ | + +#### LLM 响应数据类 (`services/llm_response.py`) ```python -from luxx.services.llm_response import llm_parser +class StepType: + """步骤类型常量""" + THINKING = "thinking" + TEXT = "text" + TOOL_CALL = "tool_call" + TOOL_RESULT = "tool_result" -# 解析 OpenAI 格式 -parsed = llm_parser.parse_openai(delta) -# 解析 Anthropic 格式 -parsed = llm_parser.parse_anthropic(chunk) +@dataclass +class Step: + """单个步骤 - 用于存储和传输""" + id: str + index: int + type: str # thinking, text, tool_call, tool_result + content: str = "" + name: str = "" # tool_call/tool_result + arguments: str = "" # tool_call + id_ref: str = "" # tool_result + success: bool = True -# 返回 ParsedDelta -parsed.thinking # 思考内容 -parsed.text # 文本内容 -parsed.tool_calls # 工具调用 + +@dataclass +class ParsedDelta: + """LLM 流式响应增量""" + thinking: str = "" # 思考内容(增量) + text: str = "" # 文本内容(增量) + tool_call: Optional[Dict] = None # 单个工具调用 + usage: Dict[str, int] = {} # Token 用量 + is_complete: bool = False ``` #### ChatService (`services/chat.py`) @@ -340,30 +491,101 @@ parsed.tool_calls # 工具调用 - 流式 SSE 响应 - 工具调用编排(并行执行) - 消息历史管理 -- 自动重试机制 - Token 用量追踪 +- 工作空间上下文传递 #### AgenticLoop (`services/agentic_loop.py`) 执行 Agentic Loop 的核心循环: -- 调用 LLM 获取响应 -- 使用 LLMResponseParser 解析响应 +- 调用 LLM 获取响应(流式) +- 解析 ParsedDelta,更新步骤状态 - 管理 thinking/text/tool_call/tool_result 步骤 - 工具并行执行 +- 最大迭代次数:10 + +```python +# 执行流程 +async for delta in llm.stream_call(...): + events = self._process_delta(delta, context, total_usage) + yield from events + +# 工具调用时 +tool_results = self.tool_executor.process_tool_calls_parallel(...) +messages.append({"role": "assistant", ...}) +messages.extend(tool_results) +``` #### StreamContext (`services/stream_context.py`) 流式状态管理: - 追踪当前步骤类型和索引 - 累积 thinking 和 text 内容 -- 管理 tool_calls 列表 +- 管理 tool_calls 列表和 tool_results - 生成 SSE 事件 +- 构建完整消息内容 #### LLMClient (`services/llm_client.py`) LLM API 客户端: -- 多提供商:DeepSeek、GLM、OpenAI +- 多提供商:OpenAI、DeepSeek、Anthropic +- 自动适配器选择 - 流式/同步调用 - 错误处理和重试 - Token 计数 +### 7. 任务系统 (`services/task.py`) + +用于自主任务执行和依赖管理: + +```mermaid +classDiagram + class Task { + +str id + +str name + +str goal + +TaskStatus status + +List~Step~ steps + +List~Task~ subtasks + } + + class Step { + +str id + +str name + +List~str~ depends_on + +StepStatus status + } + + class TaskGraph { + +topological_sort() List~Step~ + +get_ready_steps() List~Step~ + +detect_cycles() List~List~str~~ + +validate() tuple + } + + class TaskService { + +create_task() Task + +get_task() Task + +update_task_status() Task + +add_steps() List~Step~ + +build_graph() TaskGraph + } + + Task "1" o-- "*" Step + Task "1" o-- "*" Task + TaskService ..> TaskGraph +``` + +**任务状态 (TaskStatus):** +- `PENDING` - 待处理 +- `READY` - 就绪 +- `RUNNING` - 运行中 +- `BLOCK` - 阻塞 +- `TERMINATED` - 已终止 + +**步骤状态 (StepStatus):** +- `PENDING` - 待执行 +- `RUNNING` - 执行中 +- `COMPLETED` - 已完成 +- `FAILED` - 失败 +- `SKIPPED` - 跳过 + ### 7. 认证系统 (`routes/auth.py`) - JWT Bearer Token - Bcrypt 密码哈希 @@ -481,6 +703,10 @@ database: type: sqlite url: sqlite:///./chat.db +workspace: + root: ./workspaces # 用户工作空间根目录 + auto_create: true # 自动创建用户工作空间 + llm: provider: deepseek api_key: ${DEEPSEEK_API_KEY} @@ -491,6 +717,9 @@ tools: cache_ttl: 300 max_workers: 4 max_iterations: 10 + +logging: + level: INFO ``` ## 环境变量 @@ -501,6 +730,26 @@ tools: | `DEEPSEEK_API_KEY` | DeepSeek API | `sk-xxxx` | | `DATABASE_URL` | 数据库连接 | `sqlite:///./chat.db` | +## LLM 适配器配置 + +### OpenAI 兼容 (DeepSeek/GLM 等) + +```yaml +llm: + provider: openai + api_key: ${API_KEY} + api_url: https://api.deepseek.com/v1 # 或其他兼容端点 +``` + +### Anthropic Claude + +```yaml +llm: + provider: anthropic + api_key: ${ANTHROPIC_API_KEY} + api_url: https://api.anthropic.com/v1 +``` + ## 项目结构说明 ### 入口文件 @@ -530,5 +779,21 @@ ToolExecutor 支持结果缓存: 1. 实时返回 thinking_content(模型思考过程) 2. 实时返回 text 增量更新 -3. 工具调用串行执行,结果批量返回 +3. 工具调用并行执行,结果批量返回 4. 最终 `done` 事件包含完整 message_id 和 token 用量 + +### 工作空间隔离 + +每个用户的工作空间完全隔离: +- 用户目录基于 user_id 的 SHA256 哈希生成 +- 所有文件操作强制在用户工作空间内 +- 支持权限级别控制文件操作能力 + +### MessageBuilder + +用于构建发送给 LLM 的消息列表: +- `add_system()` - 添加系统消息 +- `add_user()` - 添加用户消息(JSON 格式) +- `add_assistant()` - 添加助手消息 +- `add_tool_result()` - 添加工具结果消息 +- `extract_text()` - 从 JSON 内容中提取文本 diff --git a/luxx/models.py b/luxx/models.py index e2198a8..374e571 100644 --- a/luxx/models.py +++ b/luxx/models.py @@ -141,9 +141,9 @@ class Conversation(Base): class Message(Base): """Message model - + content 字段统一使用 JSON 格式存储: - + **User 消息:** { "text": "用户输入的文本内容", @@ -151,11 +151,9 @@ class Message(Base): {"name": "utils.py", "extension": "py", "content": "..."} ] } - + **Assistant 消息:** { - "text": "AI 回复的文本内容", - "tool_calls": [...], // 遗留的扁平结构 "steps": [ // 有序步骤,用于渲染(主要数据源) {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}, {"id": "step-1", "index": 1, "type": "text", "content": "..."}, @@ -163,6 +161,8 @@ class Message(Base): {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "..."} ] } + + 注意:to_dict() 返回时会从 steps 动态计算 text 和 content 字段。 """ __tablename__ = "messages" @@ -204,20 +204,22 @@ class Message(Base): result["content"] = self.content result["text"] = self.content result["attachments"] = [] - result["tool_calls"] = [] result["process_steps"] = [] return result - - # Extract common fields - result["text"] = content_obj.get("text", "") - result["attachments"] = content_obj.get("attachments", []) - result["tool_calls"] = content_obj.get("tool_calls", []) - + # Extract steps as process_steps for frontend rendering - result["process_steps"] = content_obj.get("steps", []) - - # For backward compatibility - if "content" not in result: - result["content"] = result["text"] - + steps = content_obj.get("steps", []) + result["process_steps"] = steps + + # Extract text from steps (concatenate all text type steps) + text_content = "".join( + s.get("content", "") for s in steps + if s.get("type") == "text" + ) + result["text"] = text_content + result["content"] = text_content # Alias for convenience + + # Extract attachments + result["attachments"] = content_obj.get("attachments", []) + return result diff --git a/luxx/services/__init__.py b/luxx/services/__init__.py index a18b4b9..25abfe3 100644 --- a/luxx/services/__init__.py +++ b/luxx/services/__init__.py @@ -1,4 +1,4 @@ """Services module""" from luxx.services.llm_client import LLMClient -from luxx.services.llm_response import ParsedDelta, LLMResponse +from luxx.services.llm_response import ParsedDelta, Step, StepType from luxx.services.chat import ChatService, create_chat_service diff --git a/luxx/services/agentic_loop.py b/luxx/services/agentic_loop.py index d5ed504..8dafe43 100644 --- a/luxx/services/agentic_loop.py +++ b/luxx/services/agentic_loop.py @@ -1,13 +1,5 @@ -"""AgenticLoop - Executes the Agentic Loop: LLM + Tools iteration. +"""AgenticLoop - Executes the Agentic Loop: LLM + Tools iteration.""" -The loop: -1. Call LLM with messages and tools -2. Check for tool calls in response -3. Execute tools in parallel -4. Add results to messages -5. Repeat (max 10 iterations) -6. Return final response -""" import uuid import logging from typing import List, Dict, AsyncGenerator @@ -15,24 +7,17 @@ from typing import List, Dict, AsyncGenerator from luxx.tools.executor import ToolExecutor from luxx.services.llm_client import LLMClient from luxx.services.stream_context import StreamContext, _sse_event -from luxx.services.process_result import ProcessResult -from luxx.services.llm_response import ParsedDelta +from luxx.services.llm_response import ParsedDelta, StepType logger = logging.getLogger(__name__) -# Maximum iterations to prevent infinite loops MAX_ITERATIONS = 10 class AgenticLoop: - """Executes the Agentic Loop: LLM + Tools iteration. - - Supports multiple LLM Providers, auto-adapts response format. - """ - def __init__(self, tool_executor: ToolExecutor): self.tool_executor = tool_executor - + async def execute( self, llm: LLMClient, @@ -45,17 +30,12 @@ class AgenticLoop: context: 'StreamContext', tool_context: dict = None ) -> AsyncGenerator[str, None]: - """Execute the agentic loop. - - Yields SSE events for each step. - """ total_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} - + for iteration in range(MAX_ITERATIONS): context.reset() has_error = False - - # Stream LLM response - now yields ParsedDelta directly + async for delta in llm.stream_call( model=model, messages=messages, @@ -64,156 +44,129 @@ class AgenticLoop: max_tokens=max_tokens, thinking_enabled=thinking_enabled ): - # Process parsed delta - result = self._process_delta(delta, context, total_usage) - - # Yield events - for event in result.events: + events = self._process_delta(delta, context, total_usage) + for event in events: yield event - - # Check for errors - if result.has_error: + + if not delta.has_content() and not delta.is_complete: has_error = True break - - # If error occurred, break the loop + if has_error: break - - # Finalize current step + + # Flush remaining content on complete + if delta.is_complete: + for event in self._flush_remaining(context): + yield event + context.finalize_step() - - # Check for tool calls + if context.tool_calls_list: - # Execute tools and yield events for event in self._execute_tools(context, messages, tool_context): yield event continue - - # No tools - complete + for event in self._complete(context, total_usage): yield event return - - # Max iterations exceeded or error occurred + if not has_error: yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"}) - + def _process_delta( - self, - delta: ParsedDelta, - ctx: 'StreamContext', + self, + delta: ParsedDelta, + ctx: 'StreamContext', total_usage: dict - ) -> ProcessResult: - """Process ParsedDelta from adapter, return result with events and flags. - - Args: - delta: ParsedDelta from LLM adapter - ctx: StreamContext for state management - total_usage: Accumulated token usage - - Returns: - ProcessResult with events and flags - """ - result = ProcessResult() - - # Check for error (empty delta with no content) - if not delta.has_content() and not delta.is_complete: - # Empty delta, possibly an error - return result - - # Update usage + ) -> List[str]: + events = [] + if delta.usage: total_usage.update({ "prompt_tokens": delta.usage.get("prompt_tokens", 0), "completion_tokens": delta.usage.get("completion_tokens", 0), "total_tokens": delta.usage.get("total_tokens", 0) }) - - # Process thinking content (incremental) - if delta.thinking: - logger.debug(f"Processing thinking: {delta.thinking[:50]}...") - ctx.full_thinking += delta.thinking # Accumulate incremental content - if not ctx.current_step_id or ctx.current_step_type != "thinking": - ctx.start_step("thinking") - result.add_event(_sse_event("process_step", { - "step": { - "id": ctx.current_step_id, - "index": ctx.current_step_idx, - "type": "thinking", - "content": ctx.full_thinking - } - })) - result.set_content() - - # Process text content (incremental) - if delta.text: - ctx.full_content += delta.text # Accumulate incremental content - if not ctx.current_step_id or ctx.current_step_type != "text": - ctx.start_step("text") - result.add_event(_sse_event("process_step", { - "step": { - "id": ctx.current_step_id, - "index": ctx.current_step_idx, - "type": "text", - "content": ctx.full_content - } - })) - result.set_content() - - # Process tool calls - if delta.tool_calls: - for tc in delta.tool_calls: - ctx.accumulate_tool_call(tc) - result.set_tool_calls() - - return result - - def _execute_tools(self, ctx: 'StreamContext', messages: list, + + if delta.content: + result = ctx.process_content(delta.content) + if result["should_emit"]: + # Only emit if there's content + if result["thinking"]: + ctx.full_thinking += result["thinking"] + ctx.start_step(StepType.THINKING) + events.append(ctx.emit_thinking()) + + if result["text"]: + ctx.full_content += result["text"] + ctx.start_step(StepType.TEXT) + events.append(ctx.emit_text()) + + # Clear buffers after emit + ctx._thinking_buf = "" + ctx._text_buf = "" + + if delta.has_tool_call(): + ctx.accumulate_tool_call(delta.tool_call) + + return events + + def _execute_tools(self, ctx: 'StreamContext', messages: list, tool_context: dict = None) -> List[str]: - """Execute tools and return list of events.""" events = [] - - # Emit tool call steps + for event in ctx.emit_tool_calls(): events.append(event) - - # Execute in parallel + tool_results = self.tool_executor.process_tool_calls_parallel( ctx.tool_calls_list, tool_context or {} ) - - # Get tool call IDs for result linking + tool_ids = [tc.get("id") for tc in ctx.tool_calls_list] tool_step_ids = [ - s["id"] for s in ctx.all_steps - if s["type"] == "tool_call" and s.get("id_ref") in tool_ids + s.id for s in ctx.all_steps + if s.type == StepType.TOOL_CALL and s.id_ref in tool_ids ] - - # Emit tool result steps + for i, (tr, tc) in enumerate(zip(tool_results, ctx.tool_calls_list)): ref_id = tool_step_ids[i] if i < len(tool_step_ids) else f"step-{len(ctx.all_steps) - len(tool_results) + i}" _, event = ctx.emit_tool_result(tr, ref_id) events.append(event) - - # Prepare for next iteration + messages.append({ "role": "assistant", "content": ctx.full_content or "", "tool_calls": ctx.tool_calls_list }) messages.extend(ctx.all_tool_results[-len(tool_results):]) - + return events - + + def _flush_remaining(self, ctx: 'StreamContext') -> List[str]: + """Flush remaining buffers on complete.""" + events = [] + thinking, text = ctx.flush() + if thinking: + ctx.full_thinking += thinking + ctx.start_step(StepType.THINKING) + events.append(ctx.emit_thinking()) + ctx.finalize_step() + if text: + ctx.full_content += text + ctx.start_step(StepType.TEXT) + events.append(ctx.emit_text()) + ctx.finalize_step() + return events + def _complete(self, ctx: 'StreamContext', total_usage: dict) -> List[str]: - """Complete the loop and return list of events.""" + # Note: buffers already flushed in _flush_remaining or _process_delta token_count = total_usage.get("completion_tokens") or len(ctx.full_content) // 4 msg_id = str(uuid.uuid4()) logger.info(f"[TOKEN] usage={total_usage}, count={token_count}") - + ctx.set_completion(msg_id, token_count, total_usage) - + return [_sse_event("done", { "message_id": msg_id, "token_count": token_count, diff --git a/luxx/services/chat.py b/luxx/services/chat.py index a38f893..b5e3956 100644 --- a/luxx/services/chat.py +++ b/luxx/services/chat.py @@ -17,7 +17,6 @@ from luxx.tools.core import registry from luxx.services.llm_client import LLMClient from luxx.services.stream_context import StreamContext from luxx.services.agentic_loop import AgenticLoop -from luxx.config import config logger = logging.getLogger(__name__) @@ -199,15 +198,12 @@ class ChatService: ): yield event - # Save message after successful completion (only if we have content) - if ctx._last_message_id and (ctx.full_content or ctx.all_tool_calls): + # Save message after successful completion + if ctx._last_message_id and ctx.all_steps: self._save_message( conversation.id, ctx._last_message_id, - ctx.full_content, - ctx.all_tool_calls, - ctx.all_tool_results, - ctx.all_steps, + ctx.get_steps_for_save(), ctx._last_token_count, ctx._last_usage ) @@ -223,18 +219,22 @@ class ChatService: tools_enabled: bool = True, thinking_enabled: bool = False ) -> Dict[str, Any]: - """Non-streaming response for simple requests.""" + """Non-streaming response for simple requests. + + Note: For non-streaming, we return the raw LLM response. + Tool calls should be handled by the streaming endpoint. + """ try: messages = self.build_messages(conversation) messages.append({ "role": "user", "content": json.dumps({"text": user_message, "attachments": []}) }) - + tools = [] if not tools_enabled else None llm, max_tokens = get_llm_client(conversation) model = conversation.model or llm.default_model or "gpt-4" - + response = await llm.sync_call( model=model, messages=messages, @@ -243,14 +243,14 @@ class ChatService: max_tokens=max_tokens or 8192, thinking_enabled=thinking_enabled or conversation.thinking_enabled ) - + return { "success": True, - "content": response.content, - "tool_calls": response.tool_calls, - "usage": response.usage + "content": response.get("content", ""), + "tool_calls": response.get("tool_calls", []), + "usage": response.get("usage", {}) } - + except httpx.HTTPStatusError as e: error_msg = f"HTTP {e.response.status_code}: {e.response.text[:200] if e.response else 'No response body'}" logger.error(f"Non-stream HTTP error: {error_msg}") @@ -262,16 +262,13 @@ class ChatService: logger.error(f"Non-stream error: {type(e).__name__}: {e}\n{traceback.format_exc()}") return {"success": False, "error": f"{type(e).__name__}: {str(e)}"} - def _save_message(self, conversation_id: str, msg_id: str, full_content: str, - all_tool_calls: list, all_tool_results: list, all_steps: list, - token_count: int = 0, usage: dict = None): + def _save_message(self, conversation_id: str, msg_id: str, + all_steps: list, token_count: int = 0, usage: dict = None): """Save assistant message to database.""" from luxx.database import SessionLocal from luxx.models import Message - - content_json = {"text": full_content, "steps": all_steps} - if all_tool_calls: - content_json["tool_calls"] = all_tool_calls + + content_json = {"steps": all_steps} db = SessionLocal() try: diff --git a/luxx/services/llm_adapters/anthropic_adapter.py b/luxx/services/llm_adapters/anthropic_adapter.py index e316a95..353bf7a 100644 --- a/luxx/services/llm_adapters/anthropic_adapter.py +++ b/luxx/services/llm_adapters/anthropic_adapter.py @@ -2,38 +2,39 @@ Supports Anthropic Claude API streaming and non-streaming responses. """ + import json import logging from typing import Dict, List, Any, AsyncGenerator from .base import ProviderAdapter -from ..llm_response import ParsedDelta, LLMResponse +from ..llm_response import ParsedDelta logger = logging.getLogger(__name__) class AnthropicAdapter(ProviderAdapter): """Anthropic Claude API adapter - + Pure parsing adapter - no internal state management. Each parse_stream_chunk call returns incremental content. Accumulation is handled by the consumer (AgenticLoop). - + Anthropic API uses a completely different format from OpenAI: - Endpoint: POST /v1/messages - Streaming: SSE events (content_block_start, content_block_delta, etc.) - Thinking: Independent thinking type content block - Tools: tool_use type content block - + Reference: https://docs.anthropic.com/claude/reference/messages """ - + # Anthropic API endpoint suffix MESSAGES_PATH = "/v1/messages" - + # Anthropic API version ANTHROPIC_VERSION = "2023-06-01" - + # Content block types BLOCK_MESSAGE_START = "message_start" BLOCK_CONTENT_BLOCK_START = "content_block_start" @@ -42,24 +43,24 @@ class AnthropicAdapter(ProviderAdapter): BLOCK_MESSAGE_DELTA = "message_delta" BLOCK_MESSAGE_STOP = "message_stop" BLOCK_ERROR = "error" - + # Delta types DELTA_THINKING = "thinking_delta" DELTA_TEXT = "text_delta" DELTA_INPUT_JSON = "input_json_delta" - + # Content block subtypes SUBTYPE_THINKING = "thinking" SUBTYPE_TEXT = "text" SUBTYPE_TOOL_USE = "tool_use" - + def __init__(self): pass - + @property def provider_type(self) -> str: return "anthropic" - + def build_request( self, model: str, @@ -68,30 +69,30 @@ class AnthropicAdapter(ProviderAdapter): **kwargs ) -> tuple[Dict[str, Any], Dict[str, str]]: """Build Anthropic-format request - + Anthropic request format differs from OpenAI: - Uses "messages" instead of "message" - Requires "max_tokens" - Different tool format """ api_key = kwargs.get("api_key", "") - + headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}", "anthropic-version": self.ANTHROPIC_VERSION } - + # Convert messages to Anthropic format anthropic_messages = self._convert_messages(messages) - + body = { "model": model, "messages": anthropic_messages, "stream": kwargs.get("stream", True), "max_tokens": kwargs.get("max_tokens", 4096) } - + # System message if "system" in kwargs: body["system"] = kwargs["system"] @@ -101,51 +102,51 @@ class AnthropicAdapter(ProviderAdapter): if msg.get("role") == "system": body["system"] = msg.get("content", "") break - + # Thinking capability (Claude 3.5+) if kwargs.get("thinking_enabled"): body["thinking"] = { "type": "enabled", "budget_tokens": kwargs.get("thinking_budget_tokens", 10000) } - + # Tool definitions if tools: body["tools"] = self._convert_tools(tools) - + # Optional parameters if "temperature" in kwargs: body["temperature"] = kwargs["temperature"] - + if "top_p" in kwargs: body["top_p"] = kwargs["top_p"] - + if "stop_sequences" in kwargs: body["stop_sequences"] = kwargs["stop_sequences"] - + return body, headers - + def _convert_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Convert messages to Anthropic format - + Anthropic message format: - role: user, assistant - content: str or List[Dict] """ anthropic_messages = [] - + for msg in messages: role = msg.get("role", "user") - + # Convert role if role == "system": continue # System messages handled separately - + anthropic_msg = { "role": "user" if role == "user" else "assistant", "content": [] } - + # Handle content content = msg.get("content", "") if isinstance(content, str): @@ -165,15 +166,15 @@ class AnthropicAdapter(ProviderAdapter): anthropic_msg["content"].append(item) else: anthropic_msg["content"].append(str(item)) - + anthropic_messages.append(anthropic_msg) - + return anthropic_messages - + def _convert_tools(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Convert tool definitions to Anthropic format""" anthropic_tools = [] - + for tool in tools: anthropic_tool = { "name": tool.get("name", ""), @@ -181,133 +182,131 @@ class AnthropicAdapter(ProviderAdapter): "input_schema": tool.get("parameters", {"type": "object", "properties": {}}) } anthropic_tools.append(anthropic_tool) - + return anthropic_tools - + def reset(self): """No-op for pure parsing adapter""" pass - + async def parse_stream_chunk( - self, + self, raw_chunk: str ) -> AsyncGenerator[ParsedDelta, None]: """Parse Anthropic-format SSE stream - + Returns incremental content - no accumulation. """ if not raw_chunk or raw_chunk.strip() == "": return - + try: chunk = json.loads(raw_chunk) except json.JSONDecodeError: return - + chunk_type = chunk.get("type", "") - + # Handle errors if chunk_type == self.BLOCK_ERROR: error_msg = chunk.get("error", {}).get("type", "unknown_error") logger.error(f"Anthropic API error: {error_msg}") yield ParsedDelta() return - + result = ParsedDelta() - + if chunk_type == self.BLOCK_MESSAGE_START: # Message start - no content yet pass - + elif chunk_type == self.BLOCK_CONTENT_BLOCK_START: # Content block start block = chunk.get("content_block", {}) block_type = block.get("type") - + if block_type == self.SUBTYPE_THINKING: # Thinking block start thinking_text = block.get("thinking", {}).get("thinking", "") result.thinking = thinking_text - + elif block_type == self.SUBTYPE_TOOL_USE: # Tool use block start tool_index = chunk.get("index", 0) tool_name = block.get("name", "") - result.tool_calls = [{ + result.tool_call = { "index": tool_index, "id": "", "type": "function", "function": {"name": tool_name, "arguments": ""} - }] - + } + elif block_type == self.SUBTYPE_TEXT: # Text block start - nothing to output yet pass - + elif chunk_type == self.BLOCK_CONTENT_BLOCK_DELTA: # Content block delta delta = chunk.get("delta", {}) delta_type = delta.get("type", "") - + if delta_type == self.DELTA_THINKING: # Thinking delta (incremental) thinking = delta.get("thinking", "") result.thinking = thinking - + elif delta_type == self.DELTA_TEXT: # Text delta (incremental) text = delta.get("text", "") result.text = text - + elif delta_type == self.DELTA_INPUT_JSON: # Tool arguments delta (incremental) partial_json = delta.get("partial_json", "") - # For tool calls, we need to update the arguments - # This is handled by the consumer (AgenticLoop) if partial_json: - result.tool_calls = [{ + result.tool_call = { "index": 0, "function": {"arguments": partial_json} - }] - + } + elif chunk_type == self.BLOCK_CONTENT_BLOCK_STOP: # Content block stop pass - + elif chunk_type == self.BLOCK_MESSAGE_DELTA: # Message delta (usually contains usage) delta = chunk.get("delta", {}) usage = chunk.get("usage", {}) - + result.usage = { "prompt_tokens": usage.get("input_tokens", 0), "completion_tokens": usage.get("output_tokens", 0), "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0) } - + # Check if complete by stop reason if delta.get("stop_reason"): result.is_complete = True - + elif chunk_type == self.BLOCK_MESSAGE_STOP: # Message stop result.is_complete = True - + # Yield result if there's any content if result.has_content() or result.is_complete: yield result - - def parse_response(self, data: Dict[str, Any]) -> LLMResponse: + + def parse_response(self, data: Dict[str, Any]) -> Dict: """Parse non-streaming response""" content = data.get("content", []) thinking = "" text_content = "" tool_calls = [] - + for block in content: if isinstance(block, dict): block_type = block.get("type") - + if block_type == "thinking": thinking = block.get("thinking", "") elif block_type == "text": @@ -318,19 +317,22 @@ class AnthropicAdapter(ProviderAdapter): "name": block.get("name", ""), "input": block.get("input", {}) }) - + usage = data.get("usage", {}) - - return LLMResponse( - content=text_content, - thinking=thinking, - tool_calls=tool_calls, - usage={ + + return { + "content": text_content, + "thinking": thinking, + "tool_calls": tool_calls, + "usage": { "prompt_tokens": usage.get("input_tokens", 0), "completion_tokens": usage.get("output_tokens", 0), "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0) } - ) - + } + def supports_thinking(self) -> bool: return True + + def supports_tools(self) -> bool: + return True diff --git a/luxx/services/llm_adapters/openai_adapter.py b/luxx/services/llm_adapters/openai_adapter.py index 722f4e1..587d90c 100644 --- a/luxx/services/llm_adapters/openai_adapter.py +++ b/luxx/services/llm_adapters/openai_adapter.py @@ -1,200 +1,86 @@ -"""OpenAI Adapter - OpenAI-compatible API adapter +"""OpenAI Adapter - OpenAI/DeepSeek/GLM/MiniMax compatible API adapter""" -Supports OpenAI, DeepSeek, GLM and other OpenAI-compatible APIs. -""" import json import logging -from typing import Dict, List, Any, AsyncGenerator, Optional +from typing import Dict, List, Any, AsyncGenerator from .base import ProviderAdapter -from ..llm_response import ParsedDelta, LLMResponse +from ..llm_response import ParsedDelta logger = logging.getLogger(__name__) class OpenAIAdapter(ProviderAdapter): - """OpenAI-compatible API adapter - - Pure parsing adapter - no internal state management. - Each parse_stream_chunk call returns incremental content. - Accumulation is handled by the consumer (AgenticLoop). - """ - + """OpenAI-compatible API adapter""" + + def __init__(self): + pass + @property def provider_type(self) -> str: return "openai" - - def __init__(self): - pass - - def build_request( - self, - model: str, - messages: List[Dict[str, Any]], - tools: List[Dict[str, Any]] = None, - **kwargs - ) -> tuple[Dict[str, Any], Dict[str, str]]: - """Build OpenAI-format request""" + + def build_request(self, model: str, messages: List[Dict], tools=None, **kwargs) -> tuple: api_key = kwargs.get("api_key", "") - - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {api_key}" - } - - body = { - "model": model, - "messages": messages, - "stream": kwargs.get("stream", True) - } - - # Optional parameters + headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"} + body = {"model": model, "messages": messages, "stream": kwargs.get("stream", True)} if "temperature" in kwargs: body["temperature"] = kwargs["temperature"] if "max_tokens" in kwargs: body["max_tokens"] = kwargs["max_tokens"] - if "top_p" in kwargs: - body["top_p"] = kwargs["top_p"] - if "frequency_penalty" in kwargs: - body["frequency_penalty"] = kwargs["frequency_penalty"] - if "presence_penalty" in kwargs: - body["presence_penalty"] = kwargs["presence_penalty"] - if "stop" in kwargs: - body["stop"] = kwargs["stop"] if tools: body["tools"] = tools - if kwargs.get("thinking_enabled"): - body["thinking_enabled"] = True - + body["tool_choice"] = "auto" return body, headers - + def reset(self): - """No-op for pure parsing adapter""" pass - - async def parse_stream_chunk( - self, - raw_chunk: str - ) -> AsyncGenerator[ParsedDelta, None]: - """Parse OpenAI-format SSE stream - - Returns incremental content - no accumulation. - """ - # Parse SSE line - event_type, data_str = self._parse_sse_line(raw_chunk) - - if not data_str or data_str == "[DONE]": - if data_str == "[DONE]": - yield ParsedDelta(is_complete=True) + + async def parse_stream_chunk(self, raw_chunk: str) -> AsyncGenerator[ParsedDelta, None]: + """Parse OpenAI/MiniMax format. Returns raw content for accumulation.""" + if not raw_chunk or not raw_chunk.strip(): return - + + chunk_str = raw_chunk.strip() + if chunk_str.startswith("data: "): + chunk_str = chunk_str[6:] + elif chunk_str.startswith("data:"): + chunk_str = chunk_str[5:] + + if chunk_str.strip() == "[DONE]": + yield ParsedDelta(is_complete=True) + return + try: - chunk = json.loads(data_str) + chunk = json.loads(chunk_str) except json.JSONDecodeError: return - - # Handle errors - if event_type == "error" or "error" in chunk: - yield ParsedDelta() + + choices = chunk.get("choices", []) + if not choices: return - - # Extract usage - usage = chunk.get("usage", {}) - - # Parse choices - for choice in chunk.get("choices", []): - delta = choice.get("delta", {}) - content = delta.get("content") or "" - - # Extract thinking tags if present - thinking, clean_text = self._extract_tags(content) - - # Tool calls - tool_calls = delta.get("tool_calls", []) - - # Check if this is the final delta - is_complete = bool(choice.get("finish_reason")) - - if thinking or clean_text or tool_calls or is_complete or usage: - yield ParsedDelta( - thinking=thinking, - text=clean_text, - tool_calls=tool_calls if tool_calls else [], - is_complete=is_complete, - usage=usage if usage else {} - ) - - def parse_response(self, data: Dict[str, Any]) -> LLMResponse: - """Parse non-streaming response""" - choice = data.get("choices", [{}])[0] - message = choice.get("message", {}) - - content = message.get("content", "") or "" - thinking, clean_content = self._extract_tags(content) - if not thinking: - thinking = message.get("reasoning_content") or "" - - tool_calls = message.get("tool_calls", []) - - usage = data.get("usage", {}) - - return LLMResponse( - content=clean_content, - thinking=thinking, - tool_calls=tool_calls, - usage=usage - ) - - def _parse_sse_line(self, line: str) -> tuple: - """Parse a single SSE line, return (event_type, data)""" - if line.startswith("event:"): - return line[6:].strip(), None - elif line.startswith("data:"): - return "", line[5:].strip() - return "", None - - def _extract_tags(self, content: str) -> tuple: - """Extract thinking tags and return (thinking, clean_text) - - Handles thinking tags that may be split across chunks: - - First in content closes any thinking block - - Everything before first is thinking - - Everything after first is clean text - """ + + delta = choices[0].get("delta", {}) + finish_reason = choices[0].get("finish_reason") + content = delta.get("content", "") + if not content: - return "", "" - - content_lower = content.lower() - - # Find first (marks end of thinking block) - end_idx = content_lower.find("") - - if end_idx != -1: - # Found end tag - split at this point - thinking_content = content[:end_idx].strip() - # Find if there's also a start tag before this - start_idx = content_lower.rfind("", 0, end_idx) - - if start_idx != -1: - # There's a complete thinking block - thinking = content[start_idx + 7:end_idx] - clean = content[end_idx + 9:] - else: - # No start tag - this is the end of a split thinking block - # Everything before was thinking - thinking = content[:end_idx] - clean = content[end_idx + 9:] - - return thinking, clean - - # No end tag found - # Check if there's a start tag - start_idx = content_lower.find("") - - if start_idx != -1: - # Has start tag but no end - all content after start is thinking - thinking = content[start_idx + 7:] - return thinking, "" - else: - # No tags at all - everything is clean - return "", content + if finish_reason is not None: + yield ParsedDelta(is_complete=True) + return + + yield ParsedDelta(content=content) + + def parse_response(self, data: Dict) -> Dict: + """Parse non-streaming response.""" + choices = data.get("choices", []) + if not choices: + return {"content": "", "tool_calls": [], "usage": {}} + message = choices[0].get("message", {}) + content = message.get("content", "") + tool_calls = message.get("tool_calls", []) + usage = data.get("usage", {}) + return {"content": content, "tool_calls": tool_calls, "usage": usage} + + def supports_tools(self) -> bool: + return True diff --git a/luxx/services/llm_client.py b/luxx/services/llm_client.py index 680c5a5..ee36d4c 100644 --- a/luxx/services/llm_client.py +++ b/luxx/services/llm_client.py @@ -8,16 +8,16 @@ Supports various LLM API formats: Usage: from luxx.services.llm_client import LLMClient - + # Auto-detect provider client = LLMClient(api_key="...", api_url="...") - + # Specify provider client = LLMClient(api_key="...", api_url="...", provider_type="anthropic") - + # Streaming call async for delta in client.stream_call(model, messages, tools=tools): - print(delta.text, delta.thinking, delta.tool_calls) + print(delta.text, delta.thinking, delta.tool_call) """ import json import logging @@ -32,7 +32,7 @@ from luxx.services.llm_adapters import ( OpenAIAdapter, AnthropicAdapter, ) -from luxx.services.llm_response import ParsedDelta, LLMResponse +from luxx.services.llm_response import ParsedDelta logger = logging.getLogger(__name__) @@ -160,30 +160,30 @@ class LLMClient: messages: List[Dict[str, Any]], tools: List[Dict[str, Any]] = None, **kwargs - ) -> LLMResponse: + ) -> Dict: """Synchronous call to LLM (non-streaming) - + Args: model: Model name messages: Message list tools: Tool definition list **kwargs: Other parameters (temperature, max_tokens, thinking_enabled, etc.) - + Returns: - LLMResponse object + Dict with keys: content, thinking, tool_calls, usage """ import asyncio return asyncio.get_event_loop().run_until_complete( self.async_sync_call(model, messages, tools, **kwargs) ) - + async def async_sync_call( self, model: str, messages: List[Dict[str, Any]], tools: List[Dict[str, Any]] = None, **kwargs - ) -> LLMResponse: + ) -> Dict: """Internal async sync call""" model = model or self.default_model kwargs["api_key"] = self.api_key @@ -259,9 +259,14 @@ class LLMClient: response.raise_for_status() async for line in response.aiter_lines(): - if line.strip(): - async for delta in self.adapter.parse_stream_chunk(line): - yield delta + # MiniMax may send multiple SSE events concatenated on one line + # Format: data: {...}\ndata: {...}\n + parts = line.split("data: ") + for part in parts: + part = part.strip() + if part and part != "[DONE]" and part.startswith("{"): + async for delta in self.adapter.parse_stream_chunk("data: " + part): + yield delta except httpx.HTTPStatusError as e: status_code = e.response.status_code if e.response else "?" diff --git a/luxx/services/llm_response.py b/luxx/services/llm_response.py index a31f403..ae35a73 100644 --- a/luxx/services/llm_response.py +++ b/luxx/services/llm_response.py @@ -1,65 +1,60 @@ -"""LLM Response - Unified message classes for LLM communication +"""LLM Response - Unified message classes for LLM communication""" -This module provides unified data classes for message passing throughout the LLM pipeline. -""" -from typing import Dict, Any, List, Optional from dataclasses import dataclass, field +from typing import Dict, Optional + + +class StepType: + """Step type constants""" + THINKING = "thinking" + TEXT = "text" + TOOL_CALL = "tool_call" + TOOL_RESULT = "tool_result" + + +@dataclass +class Step: + """Single step - used for storage and transport""" + id: str + index: int + type: str + content: str = "" + name: str = "" + arguments: str = "" + id_ref: str = "" + success: bool = True + + def to_dict(self) -> Dict: + return { + "id": self.id, + "index": self.index, + "type": self.type, + "content": self.content, + "name": self.name, + "arguments": self.arguments, + "id_ref": self.id_ref, + "success": self.success + } @dataclass class ParsedDelta: - """Streaming response delta - - Represents a single unit of streaming response data. - Used for streaming responses where content is accumulated incrementally. - - Attributes: - thinking: Accumulated thinking/reasoning content - text: Accumulated text content - tool_calls: List of tool call requests - is_complete: Whether this is the final delta - usage: Token usage statistics - """ - thinking: str = "" - text: str = "" - tool_calls: List[Dict] = field(default_factory=list) - is_complete: bool = False - usage: Dict[str, int] = field(default_factory=dict) - - def has_thinking(self) -> bool: - """Check if there's thinking content""" - return bool(self.thinking) - - def has_text(self) -> bool: - """Check if there's text content""" - return bool(self.text) - - def has_tool_calls(self) -> bool: - """Check if there are tool calls""" - return bool(self.tool_calls) - - def has_content(self) -> bool: - """Check if there's any content""" - return self.has_thinking() or self.has_text() or self.has_tool_calls() - - -@dataclass -class LLMResponse: - """Complete LLM response - - Represents a complete non-streaming response. - - Attributes: - content: Final text content - thinking: Final thinking content (if any) - tool_calls: List of tool calls (if any) - usage: Token usage statistics - """ + """LLM streaming response delta""" content: str = "" thinking: str = "" - tool_calls: List[Dict] = field(default_factory=list) - usage: Dict[str, int] = field(default=dict) - - def has_tool_calls(self) -> bool: - """Check if there are tool calls""" - return bool(self.tool_calls) + text: str = "" + tool_call: Optional[Dict] = None + usage: Dict[str, int] = field(default_factory=dict) + is_complete: bool = False + + def has_thinking(self) -> bool: + return bool(self.thinking) + + def has_text(self) -> bool: + return bool(self.text) + + def has_tool_call(self) -> bool: + return self.tool_call is not None + + def has_content(self) -> bool: + return bool(self.content) or self.has_thinking() or self.has_text() or self.has_tool_call() diff --git a/luxx/services/process_result.py b/luxx/services/process_result.py deleted file mode 100644 index 91cc555..0000000 --- a/luxx/services/process_result.py +++ /dev/null @@ -1,37 +0,0 @@ -"""ProcessResult - Result of processing an SSE line.""" - - -class ProcessResult: - """Result of processing an SSE line. - - Attributes: - events: List of SSE event strings to yield - has_error: Whether an error occurred - error_content: Error message if any - has_content: Whether content was received - has_tool_calls: Whether tool calls were received - """ - - def __init__(self): - self.events: list = [] - self.has_error: bool = False - self.error_content: str = "" - self.has_content: bool = False - self.has_tool_calls: bool = False - - def add_event(self, event: str): - """Add an event to the result.""" - self.events.append(event) - - def set_error(self, content: str): - """Set error state.""" - self.has_error = True - self.error_content = content - - def set_content(self): - """Mark that content was received.""" - self.has_content = True - - def set_tool_calls(self): - """Mark that tool calls were received.""" - self.has_tool_calls = True diff --git a/luxx/services/stream_context.py b/luxx/services/stream_context.py index 39debbf..c6f3caa 100644 --- a/luxx/services/stream_context.py +++ b/luxx/services/stream_context.py @@ -1,25 +1,19 @@ -"""StreamContext - Manages streaming state transitions during LLM response. +"""StreamContext - Manages streaming state transitions during LLM response.""" -Tracks steps in order: -- thinking: Model reasoning content -- text: Model response text -- tool_call: Tool invocation request -- tool_result: Tool execution result - -Each step has unique id and index for frontend rendering. -""" import json from typing import List, Dict, Optional +from luxx.services.llm_response import Step, StepType + +THINK_START = "" +THINK_END = "" + def _sse_event(event: str, data: dict) -> str: - """Format a Server-Sent Event string.""" return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" class StreamContext: - """Manages streaming state transitions during LLM response.""" - def __init__(self): self.step_index = 0 self.current_step_id = None @@ -27,46 +21,114 @@ class StreamContext: self.current_step_type = None self.full_content = "" self.full_thinking = "" - self.all_steps = [] - self.all_tool_calls = [] - self.all_tool_results = [] - self.tool_calls_list = [] + self.all_steps: List[Step] = [] + self.all_tool_results: List[Dict] = [] + self.tool_calls_list: List[Dict] = [] self._last_message_id = None self._last_token_count = 0 self._last_usage = None - + self._in_thinking = False + self._thinking_buf = "" + self._text_buf = "" + def reset(self): - """Reset state for new iteration.""" self.current_step_id = None self.current_step_idx = None self.current_step_type = None self.full_content = "" self.full_thinking = "" self.tool_calls_list = [] - + self._in_thinking = False + self._thinking_buf = "" + self._text_buf = "" + + def process_content(self, content: str) -> Dict: + """Process raw content, handling thinking tags. + + Returns dict with: + - thinking: accumulated thinking content (when thinking block ends) + - text: accumulated text content (when thinking block ends) + - should_emit: whether to emit a step + - thinking_only: whether only thinking was found (no text yet) + """ + if not content: + return {"thinking": "", "text": "", "should_emit": False, "thinking_only": False} + + thinking = "" + text = "" + should_emit = False + thinking_only = False + + # Check for thinking start + if THINK_START in content and not self._in_thinking: + self._in_thinking = True + idx = content.find(THINK_START) + len(THINK_START) + content = content[idx:] + + # Check for thinking end + if THINK_END in content: + idx = content.find(THINK_END) + # Extract thinking content + thinking_content = content[:idx] + self._thinking_buf += thinking_content + # Extract text after first + content = content[idx + len(THINK_END):] + + # Look for second (MiniMax format: 正文 正文) + if THINK_END in content: + second_idx = content.find(THINK_END) + text_content = content[:second_idx] + self._text_buf += text_content + content = content[second_idx + len(THINK_END):] + + self._in_thinking = False + should_emit = True + thinking_only = not bool(self._text_buf) + + # Accumulate to buffers + if self._in_thinking: + self._thinking_buf += content + else: + self._text_buf += content + + if should_emit: + thinking = self._thinking_buf + text = self._text_buf + + return { + "thinking": thinking, + "text": text, + "should_emit": should_emit, + "thinking_only": thinking_only + } + + def flush(self): + thinking = self._thinking_buf + text = self._text_buf + self._thinking_buf = "" + self._text_buf = "" + return thinking, text + def start_step(self, step_type: str) -> str: - """Start a new step with unique ID.""" self.current_step_idx = self.step_index self.current_step_id = f"step-{self.step_index}" self.current_step_type = step_type self.step_index += 1 return self.current_step_id - + def finalize_step(self): - """Save current step to all_steps.""" if self.current_step_id is None: return - - content = self.full_content if self.current_step_type == "text" else self.full_thinking - self.all_steps.append({ - "id": self.current_step_id, - "index": self.current_step_idx, - "type": self.current_step_type, - "content": content - }) - + content = self.full_content if self.current_step_type == StepType.TEXT else self.full_thinking + step = Step( + id=self.current_step_id, + index=self.current_step_idx, + type=self.current_step_type, + content=content + ) + self.all_steps.append(step) + def accumulate_tool_call(self, tc_delta: Dict): - """Accumulate tool call delta.""" idx = tc_delta.get("index", 0) if idx >= len(self.tool_calls_list): self.tool_calls_list.append({ @@ -74,39 +136,32 @@ class StreamContext: "type": "function", "function": {"name": "", "arguments": ""} }) - func = tc_delta.get("function", {}) if func.get("name"): self.tool_calls_list[idx]["function"]["name"] += func["name"] if func.get("arguments"): self.tool_calls_list[idx]["function"]["arguments"] += func["arguments"] - + def emit_tool_calls(self) -> List[str]: - """Emit tool call steps, return SSE events.""" events = [] for tc in self.tool_calls_list: step_id = f"step-{self.step_index}" self.step_index += 1 - - step = { - "id": step_id, - "index": self.step_index - 1, - "type": "tool_call", - "id_ref": tc.get("id", ""), - "name": tc["function"]["name"], - "arguments": tc["function"]["arguments"] - } + step = Step( + id=step_id, + index=self.step_index - 1, + type=StepType.TOOL_CALL, + name=tc["function"]["name"], + arguments=tc["function"]["arguments"], + id_ref=tc.get("id", "") + ) self.all_steps.append(step) - self.all_tool_calls.append(tc) - events.append(_sse_event("process_step", {"step": step})) - + events.append(_sse_event("process_step", {"step": step.to_dict()})) return events - + def emit_tool_result(self, result: Dict, ref_step_id: str) -> tuple: - """Emit tool result step, return (step, event).""" step_id = f"step-{self.step_index}" self.step_index += 1 - content = result.get("content", "") success = True try: @@ -115,33 +170,45 @@ class StreamContext: success = parsed.get("success", True) except (json.JSONDecodeError, TypeError): pass - - step = { - "id": step_id, - "index": self.step_index - 1, - "type": "tool_result", - "id_ref": ref_step_id, - "name": result.get("name", ""), - "content": content, - "success": success - } + step = Step( + id=step_id, + index=self.step_index - 1, + type=StepType.TOOL_RESULT, + name=result.get("name", ""), + content=content, + id_ref=ref_step_id, + success=success + ) self.all_steps.append(step) self.all_tool_results.append({ "role": "tool", "tool_call_id": result.get("tool_call_id", ""), "content": content }) - - return step, _sse_event("process_step", {"step": step}) - + return step, _sse_event("process_step", {"step": step.to_dict()}) + + def emit_thinking(self) -> str: + step = Step( + id=self.current_step_id, + index=self.current_step_idx, + type=StepType.THINKING, + content=self.full_thinking + ) + return _sse_event("process_step", {"step": step.to_dict()}) + + def emit_text(self) -> str: + step = Step( + id=self.current_step_id, + index=self.current_step_idx, + type=StepType.TEXT, + content=self.full_content + ) + return _sse_event("process_step", {"step": step.to_dict()}) + def set_completion(self, msg_id: str, token_count: int, usage: dict): - """Set completion info for saving.""" self._last_message_id = msg_id self._last_token_count = token_count self._last_usage = usage - - def reset_completion(self): - """Reset completion info.""" - self._last_message_id = None - self._last_token_count = 0 - self._last_usage = None + + def get_steps_for_save(self) -> List[Dict]: + return [step.to_dict() for step in self.all_steps]