diff --git a/assets/ARCHITECTURE.md b/assets/ARCHITECTURE.md index 06c423e..66c54d0 100644 --- a/assets/ARCHITECTURE.md +++ b/assets/ARCHITECTURE.md @@ -29,7 +29,11 @@ luxx/ │ ├── providers.py # LLM 提供商管理 │ └── tools.py # 工具管理 ├── services/ # 服务层 -│ ├── chat.py # 聊天服务 (Agentic Loop) +│ ├── chat.py # 聊天服务门面 +│ ├── agentic_loop.py # Agentic Loop 执行器 +│ ├── stream_context.py# 流式状态管理 +│ ├── llm_response.py # LLM 响应解析器 +│ ├── process_result.py# 处理结果 │ └── llm_client.py # LLM 客户端 ├── tools/ # 工具系统 │ ├── core.py # 核心类 (ToolRegistry, ToolDefinition, ToolResult) @@ -308,6 +312,28 @@ ToolExecutor 返回结果 ### 6. 服务层 +#### LLMResponseParser (`services/llm_response.py`) +统一解析器,兼容多种 LLM API 格式: +- **OpenAI**: `delta.content`, `delta.tool_calls` +- **DeepSeek**: `delta.content`, `delta.reasoning_content` +- **Anthropic**: `content_block` 类型事件 +- **MiniMax**: `<|im_start|>thinking...<|im_end|>` 标签 + +```python +from luxx.services.llm_response import llm_parser + +# 解析 OpenAI 格式 +parsed = llm_parser.parse_openai(delta) + +# 解析 Anthropic 格式 +parsed = llm_parser.parse_anthropic(chunk) + +# 返回 ParsedDelta +parsed.thinking # 思考内容 +parsed.text # 文本内容 +parsed.tool_calls # 工具调用 +``` + #### ChatService (`services/chat.py`) 核心聊天服务: - Agentic Loop 迭代执行(最多 10 轮) @@ -315,9 +341,22 @@ ToolExecutor 返回结果 - 工具调用编排(并行执行) - 消息历史管理 - 自动重试机制 -- 支持 thinking_content 提取 - Token 用量追踪 +#### AgenticLoop (`services/agentic_loop.py`) +执行 Agentic Loop 的核心循环: +- 调用 LLM 获取响应 +- 使用 LLMResponseParser 解析响应 +- 管理 thinking/text/tool_call/tool_result 步骤 +- 工具并行执行 + +#### StreamContext (`services/stream_context.py`) +流式状态管理: +- 追踪当前步骤类型和索引 +- 累积 thinking 和 text 内容 +- 管理 tool_calls 列表 +- 生成 SSE 事件 + #### LLMClient (`services/llm_client.py`) LLM API 客户端: - 多提供商:DeepSeek、GLM、OpenAI @@ -364,23 +403,30 @@ sequenceDiagram participant Client participant API as POST /messages/stream participant CS as ChatService + participant AL as AgenticLoop + participant Parser as LLMResponseParser participant LLM as LLM API participant TE as 
ToolExecutor Client->>API: POST {content, tools, thinking_enabled} API->>CS: stream_response() + CS->>AL: execute() loop MAX_ITERATIONS (10) - CS->>LLM: call(messages, tools) - LLM-->>CS: SSE Stream + AL->>LLM: stream_call(messages, tools) + LLM-->>AL: SSE Stream + + AL->>Parser: parse_chunk() + Parser-->>AL: ParsedDelta {thinking, text, tool_calls} alt tool_calls - CS->>TE: process_tool_calls_parallel() - TE-->>CS: tool_results - CS->>CS: 追加到 messages + AL->>TE: process_tool_calls_parallel() + TE-->>AL: tool_results + AL->>AL: 追加到 messages end end + AL->>CS: done event CS->>CS: _save_message() CS->>API: SSE Stream API-->>Client: 流式响应 diff --git a/luxx/services/__init__.py b/luxx/services/__init__.py index d4d0ab0..6cb90eb 100644 --- a/luxx/services/__init__.py +++ b/luxx/services/__init__.py @@ -1,3 +1,4 @@ """Services module""" from luxx.services.llm_client import LLMClient, llm_client, LLMResponse from luxx.services.chat import ChatService, chat_service +from luxx.services.llm_response import LLMResponseParser, llm_parser, ParsedDelta diff --git a/luxx/services/agentic_loop.py b/luxx/services/agentic_loop.py new file mode 100644 index 0000000..15fb2cb --- /dev/null +++ b/luxx/services/agentic_loop.py @@ -0,0 +1,275 @@ +"""AgenticLoop - Executes the Agentic Loop: LLM + Tools iteration. + +The loop: +1. Call LLM with messages and tools +2. Check for tool calls in response +3. Execute tools in parallel +4. Add results to messages +5. Repeat (max 10 iterations) +6. 
def _parse_sse_line(line: str) -> tuple:
    """Split one raw SSE frame into its ``event`` and ``data`` fields.

    Args:
        line: A complete SSE frame, e.g. ``"event: error\\ndata: {...}\\n\\n"``.

    Returns:
        ``(event_type, data_str)``. Either element is ``None`` when the frame
        carries no such field. Field values are whitespace-stripped. When a
        frame contains several ``data`` lines they are joined with ``"\\n"``;
        when it contains several ``event`` lines the last one wins.
    """
    event_type = None
    data_parts = []
    # Split on "\n" only: str.splitlines() would also break on characters such
    # as U+2028 / U+0085, which may appear unescaped inside a JSON payload.
    for part in line.strip().split('\n'):
        field, sep, value = part.partition(':')
        if not sep:
            # No colon: not a "field: value" line.
            continue
        # The space after the colon is optional in the SSE format, so match on
        # the field name and strip the value rather than on a "name: " prefix.
        if field == 'event':
            event_type = value.strip()
        elif field == 'data':
            data_parts.append(value.strip())
    data_str = '\n'.join(data_parts) if data_parts else None
    return event_type, data_str
class AgenticLoop:
    """Run the agentic loop: stream an LLM reply, execute requested tools, repeat.

    Each iteration streams one LLM response through ``_process_stream_line``.
    When the response requested tools, they are executed and the assistant
    message plus tool results are appended to ``messages`` for the next
    iteration; otherwise a ``done`` event is emitted and the loop ends.
    At most ``MAX_ITERATIONS`` iterations are attempted.
    """

    def __init__(self, tool_executor: ToolExecutor):
        """Store the executor used to run tool calls requested by the LLM."""
        self.tool_executor = tool_executor

    async def execute(
        self,
        llm: LLMClient,
        model: str,
        messages: List[Dict],
        tools: list,
        temperature: float,
        max_tokens: int,
        thinking_enabled: bool,
        context: 'StreamContext',
        tool_context: dict = None
    ) -> AsyncGenerator[str, None]:
        """Execute the agentic loop, yielding SSE event strings as they occur.

        Args:
            llm: Client whose ``stream_call`` yields raw SSE frames.
            model: Model name forwarded to ``stream_call``.
            messages: Conversation so far. Mutated in place: after each tool
                round the assistant message and the tool results are appended.
            tools: Tool definitions forwarded to ``stream_call``.
            temperature: Sampling temperature forwarded to ``stream_call``.
            max_tokens: Token limit forwarded to ``stream_call``.
            thinking_enabled: Forwarded to ``stream_call``.
            context: Streaming state shared with the caller; ``reset()`` is
                called on it at the start of every iteration.
            tool_context: Extra context passed to the tool executor
                (an empty dict is used when omitted).

        Yields:
            SSE event strings: the events produced per stream line, tool
            call/result events, and finally either ``done`` or ``error``.
        """
        total_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}

        for _ in range(MAX_ITERATIONS):
            context.reset()
            has_error = False

            async for sse_line in llm.stream_call(
                model=model,
                messages=messages,
                tools=tools,
                temperature=temperature,
                max_tokens=max_tokens,
                thinking_enabled=thinking_enabled
            ):
                result = self._process_stream_line(sse_line, context, total_usage)

                for event in result.events:
                    yield event

                if result.has_error:
                    has_error = True
                    break

            if has_error:
                # The error event was already forwarded above; end the stream
                # without emitting "done" or the max-iterations error.
                return

            context.finalize_step()

            if context.tool_calls_list:
                # Tools were requested: run them, then ask the LLM again.
                for event in self._execute_tools(context, messages, tool_context):
                    yield event
                continue

            # No tool calls: this was the final answer.
            for event in self._complete(context, total_usage):
                yield event
            return

        # Every iteration ended with tool calls, so the budget is exhausted.
        yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"})

    def _emit_stream_step(self, ctx: 'StreamContext', result: ProcessResult,
                          step_type: str, content: str) -> None:
        """Open a ``step_type`` step if needed and queue its process_step event."""
        if not ctx.current_step_id or ctx.current_step_type != step_type:
            ctx.start_step(step_type)
        result.add_event(_sse_event("process_step", {
            "step": {
                "id": ctx.current_step_id,
                "index": ctx.current_step_idx,
                "type": step_type,
                "content": content
            }
        }))
        result.set_content()

    def _process_stream_line(self, sse_line: str, ctx: 'StreamContext',
                             total_usage: dict) -> ProcessResult:
        """Process a single SSE frame from the LLM.

        Args:
            sse_line: Raw SSE frame as yielded by ``llm.stream_call``.
            ctx: Streaming state; thinking/text content and tool calls are
                recorded on it.
            total_usage: Usage dict, overwritten in place whenever the chunk
                carries a non-empty ``usage`` object.

        Returns:
            A ``ProcessResult`` holding the SSE events to forward and, on
            failure, the error set via ``set_error``.
        """
        result = ProcessResult()
        event_type, data_str = _parse_sse_line(sse_line)
        if not data_str:
            return result

        # Error event emitted upstream by the LLM client.
        if event_type == 'error':
            try:
                error_data = json.loads(data_str)
            except json.JSONDecodeError:
                error_content = data_str
            else:
                # A non-object payload has no "content" key; show it verbatim.
                if isinstance(error_data, dict):
                    error_content = error_data.get("content", "Unknown error")
                else:
                    error_content = data_str
            result.set_error(error_content)
            result.add_event(_sse_event("error", {"content": error_content}))
            return result

        try:
            chunk = json.loads(data_str)
        except json.JSONDecodeError:
            chunk = None
        if not isinstance(chunk, dict):
            # Invalid JSON, or JSON that is not an object: nothing below can
            # interpret it, so report it instead of raising on ``.get``.
            error_msg = f"Parse error: {data_str[:50]}"
            result.set_error(error_msg)
            result.add_event(_sse_event("error", {"content": error_msg}))
            return result

        # Usage is overwritten (not summed) with the latest reported figures.
        usage = chunk.get("usage")
        if usage:
            total_usage.update({
                "prompt_tokens": usage.get("prompt_tokens", 0),
                "completion_tokens": usage.get("completion_tokens", 0),
                "total_tokens": usage.get("total_tokens", 0)
            })

        # API-level error embedded in the chunk. A null "error" field is
        # ignored; a non-dict error (e.g. a plain string) is shown as-is.
        api_error = chunk.get("error")
        if api_error is not None:
            if isinstance(api_error, dict):
                error_msg = api_error.get("message", str(api_error))
            else:
                error_msg = str(api_error)
            result.set_error(error_msg)
            result.add_event(_sse_event("error", {"content": f"API Error: {error_msg}"}))
            return result

        choices = chunk.get("choices", [])
        if not choices:
            # Non-standard format: content sits directly on the chunk and may
            # embed thinking tags, which the parser separates from the text.
            content = chunk.get("content") or ""
            if content:
                thinking_part, clean_text = llm_parser._extract_thinking_tags(content)

                if thinking_part:
                    ctx.full_thinking = (ctx.full_thinking or "") + thinking_part
                    self._emit_stream_step(ctx, result, "thinking", ctx.full_thinking)

                if clean_text:
                    ctx.full_content = (ctx.full_content or "") + clean_text
                    self._emit_stream_step(ctx, result, "text", ctx.full_content)
            return result

        # "delta" may be present but null; treat that like an empty delta.
        delta = choices[0].get("delta") or {}
        parsed = llm_parser.parse_openai(delta)

        # NOTE(review): unlike the branch above, thinking/text are assigned
        # here, not appended. That is only correct if parse_openai returns the
        # cumulative content rather than the per-chunk fragment - confirm
        # against LLMResponseParser.
        if parsed.thinking:
            ctx.full_thinking = parsed.thinking
            self._emit_stream_step(ctx, result, "thinking", ctx.full_thinking)

        if parsed.text:
            ctx.full_content = parsed.text
            self._emit_stream_step(ctx, result, "text", ctx.full_content)

        # Fall back to the raw delta when the parser extracted no tool calls;
        # "tool_calls" may be present but null, hence the trailing ``or []``.
        for tc in parsed.tool_calls or delta.get("tool_calls") or []:
            ctx.accumulate_tool_call(tc)
            result.set_tool_calls()

        return result

    def _execute_tools(self, ctx: 'StreamContext', messages: list,
                       tool_context: dict = None) -> List[str]:
        """Run the accumulated tool calls and return the SSE events to emit.

        Side effect: appends the assistant message (with its tool calls) and
        the tool results of this round to ``messages``.
        """
        # Tool-call steps come first so they precede their results.
        events = list(ctx.emit_tool_calls())

        tool_results = self.tool_executor.process_tool_calls_parallel(
            ctx.tool_calls_list, tool_context or {}
        )

        # Map each tool call to the id of the step that announced it.
        tool_ids = [tc.get("id") for tc in ctx.tool_calls_list]
        tool_step_ids = [
            s["id"] for s in ctx.all_steps
            if s["type"] == "tool_call" and s.get("id_ref") in tool_ids
        ]

        # zip() keeps results and calls paired and stops at the shorter list.
        for i, (tr, _tc) in enumerate(zip(tool_results, ctx.tool_calls_list)):
            if i < len(tool_step_ids):
                ref_id = tool_step_ids[i]
            else:
                ref_id = f"step-{len(ctx.all_steps) - len(tool_results) + i}"
            _, event = ctx.emit_tool_result(tr, ref_id)
            events.append(event)

        # Prepare the conversation for the next LLM call.
        messages.append({
            "role": "assistant",
            "content": ctx.full_content or "",
            "tool_calls": ctx.tool_calls_list
        })
        if tool_results:
            # Guarded because a slice of [-0:] selects the WHOLE list, which
            # would re-append every earlier tool result.
            messages.extend(ctx.all_tool_results[-len(tool_results):])

        return events

    def _complete(self, ctx: 'StreamContext', total_usage: dict) -> List[str]:
        """Record completion on ``ctx`` and return the final ``done`` event."""
        # Prefer the API-reported completion tokens; otherwise estimate at
        # roughly four characters per token.
        token_count = total_usage.get("completion_tokens") or len(ctx.full_content or "") // 4
        msg_id = str(uuid.uuid4())
        logger.info(f"[TOKEN] usage={total_usage}, count={token_count}")

        ctx.set_completion(msg_id, token_count, total_usage)

        return [_sse_event("done", {
            "message_id": msg_id,
            "token_count": token_count,
            "usage": total_usage
        })]
class MessageBuilder:
    """Fluent builder for the message list sent to the LLM.

    Every ``add_*`` method appends one message dict and returns ``self`` so
    calls can be chained; ``build()`` returns the accumulated list.
    """

    def __init__(self):
        self.messages = []

    def add_system(self, content: str) -> 'MessageBuilder':
        """Append a system message."""
        self.messages.append({"role": "system", "content": content})
        return self

    def add_user(self, content: str, attachments: list = None) -> 'MessageBuilder':
        """Append a user message.

        The content is stored as a JSON string of the form
        ``{"text": ..., "attachments": [...]}``; ``extract_text`` reads the
        ``text`` field back out of that shape.
        """
        msg_content = json.dumps({
            "text": content,
            "attachments": attachments or []
        }, ensure_ascii=False)
        self.messages.append({"role": "user", "content": msg_content})
        return self

    def add_assistant(self, content: str, tool_calls: list = None) -> 'MessageBuilder':
        """Append an assistant message; ``tool_calls`` is included only if non-empty."""
        msg = {"role": "assistant", "content": content}
        if tool_calls:
            msg["tool_calls"] = tool_calls
        self.messages.append(msg)
        return self

    def add_tool_result(self, tool_call_id: str, content: str) -> 'MessageBuilder':
        """Append a tool-result message answering ``tool_call_id``."""
        self.messages.append({
            "role": "tool",
            "tool_call_id": tool_call_id,
            "content": content
        })
        return self

    def build(self) -> List[Dict]:
        """Return a shallow copy of the message list.

        The list is new, but the message dicts are shared with the builder.
        """
        return self.messages.copy()

    @staticmethod
    def extract_text(content: str) -> str:
        """Extract the plain text from stored message content.

        Args:
            content: Either plain text or a JSON object string carrying a
                ``"text"`` field.

        Returns:
            ``""`` for empty/``None`` content; the ``"text"`` value when
            ``content`` is a JSON object whose ``"text"`` is a string;
            otherwise ``content`` unchanged.
        """
        if not content:
            return ""
        try:
            parsed = json.loads(content)
        except (json.JSONDecodeError, TypeError):
            # Not JSON (or not a str/bytes at all): treat as plain content.
            return content
        if isinstance(parsed, dict):
            text = parsed.get("text", content)
            # Only a string is usable as message text; for null/number/list
            # values fall back to the raw content instead of returning them.
            if isinstance(text, str):
                return text
        return content
Returns (client, max_tokens)""" + from luxx.models import LLMProvider + from luxx.database import SessionLocal + max_tokens = None + if conversation and conversation.provider_id: - from luxx.models import LLMProvider - from luxx.database import SessionLocal db = SessionLocal() try: - provider = db.query(LLMProvider).filter(LLMProvider.id == conversation.provider_id).first() + provider = db.query(LLMProvider).filter( + LLMProvider.id == conversation.provider_id + ).first() if provider: max_tokens = provider.max_tokens client = LLMClient( @@ -40,184 +105,27 @@ def get_llm_client(conversation: Conversation = None): finally: db.close() - # Fallback to global config - client = LLMClient() - return client, max_tokens + return LLMClient(), max_tokens -class StreamContext: - """Context for streaming response state management.""" - - def __init__( - self, - step_index: int = 0, - current_step_id: str = None, - current_step_idx: int = None, - current_stream_type: str = None, - full_content: str = "", - full_thinking: str = "" - ): - self.step_index = step_index - self.current_step_id = current_step_id - self.current_step_idx = current_step_idx - self.current_stream_type = current_stream_type - self.full_content = full_content - self.full_thinking = full_thinking - self.all_steps = [] - self.all_tool_calls = [] - self.all_tool_results = [] - self.tool_calls_list = [] - - def reset_iteration(self): - """Reset streaming step tracker for new iteration.""" - self.current_step_id = None - self.current_step_idx = None - self.current_stream_type = None - self.full_content = "" - self.full_thinking = "" - self.tool_calls_list = [] - - def start_stream_step(self, step_type: str) -> str: - """Start a new streaming step. 
Returns the step_id.""" - self.current_step_idx = self.step_index - self.current_step_id = f"step-{self.step_index}" - self.current_stream_type = step_type - self.step_index += 1 - return self.current_step_id - - def yield_stream_step(self, step_type: str, content: str) -> Dict[str, Any]: - """Yield a streaming step event.""" - return _sse_event("process_step", { - "step": { - "id": self.current_step_id, - "index": self.current_step_idx, - "type": step_type, - "content": content - } - }) - - def save_streaming_step(self): - """Save the current streaming step to all_steps.""" - if self.current_step_id is None: - return - - if self.current_stream_type == "thinking": - self.all_steps.append({ - "id": self.current_step_id, - "index": self.current_step_idx, - "type": "thinking", - "content": self.full_thinking - }) - elif self.current_stream_type == "text": - self.all_steps.append({ - "id": self.current_step_id, - "index": self.current_step_idx, - "type": "text", - "content": self.full_content - }) - - def handle_thinking_stream(self, delta: Dict) -> Optional[Dict]: - """Handle reasoning/thinking delta. Returns yield_obj if step was yielded.""" - reasoning = delta.get("reasoning_content", "") - if not reasoning: - return None - - prev_len = len(self.full_thinking) - self.full_thinking += reasoning - - if prev_len == 0: # New thinking stream started - self.start_stream_step("thinking") - - return self.yield_stream_step("thinking", self.full_thinking) - - def handle_text_stream(self, delta: Dict) -> Optional[Dict]: - """Handle content delta. Returns yield_obj if step was yielded.""" - content = delta.get("content", "") - if not content: - return None - - prev_len = len(self.full_content) - self.full_content += content - - if prev_len == 0: # New text stream started - self.start_stream_step("text") - - return self.yield_stream_step("text", self.full_content) - - def handle_tool_call(self) -> tuple: - """Handle tool calls. 
Returns (tool_call_step_ids, tool_call_steps, yield_objs).""" - tool_call_step_ids = [] - tool_call_steps = [] - yield_objs = [] - - for tc in self.tool_calls_list: - call_step_idx = self.step_index - call_step_id = f"step-{self.step_index}" - tool_call_step_ids.append(call_step_id) - self.step_index += 1 - - call_step = { - "id": call_step_id, - "index": call_step_idx, - "type": "tool_call", - "id_ref": tc.get("id", ""), - "name": tc["function"]["name"], - "arguments": tc["function"]["arguments"] - } - tool_call_steps.append(call_step) - yield_objs.append(_sse_event("process_step", {"step": call_step})) - - return tool_call_step_ids, tool_call_steps, yield_objs - - def handle_tool_result(self, tool_result: Dict, tool_call_step_id: str) -> tuple: - """Handle single tool result. Returns (result_step, yield_obj).""" - result_step_idx = self.step_index - result_step_id = f"step-{self.step_index}" - self.step_index += 1 - - content = tool_result.get("content", "") - success = True - try: - content_obj = json.loads(content) - if isinstance(content_obj, dict): - success = content_obj.get("success", True) - except: - pass - - result_step = { - "id": result_step_id, - "index": result_step_idx, - "type": "tool_result", - "id_ref": tool_call_step_id, - "name": tool_result.get("name", ""), - "content": content, - "success": success - } - return result_step, _sse_event("process_step", {"step": result_step}) - +# ============== ChatService ============== class ChatService: - """Chat service with tool support""" + """Core chat service with Agentic Loop support.""" def __init__(self): self.tool_executor = ToolExecutor() + self.agentic_loop = AgenticLoop(self.tool_executor) - def build_messages( - self, - conversation: Conversation, - include_system: bool = True - ) -> List[Dict[str, str]]: - """Build message list""" + def build_messages(self, conversation, include_system: bool = True) -> List[Dict]: + """Build message list from conversation history.""" from luxx.database import 
SessionLocal from luxx.models import Message messages = [] if include_system and conversation.system_prompt: - messages.append({ - "role": "system", - "content": conversation.system_prompt - }) + messages.append({"role": "system", "content": conversation.system_prompt}) db = SessionLocal() try: @@ -226,28 +134,23 @@ class ChatService: ).order_by(Message.created_at).all() for msg in db_messages: - # Parse JSON content if possible - try: - content_obj = json.loads(msg.content) if msg.content else {} - if isinstance(content_obj, dict): - content = content_obj.get("text", msg.content) - else: - content = msg.content - except (json.JSONDecodeError, TypeError): - content = msg.content - - messages.append({ - "role": msg.role, - "content": content - }) + content = MessageBuilder.extract_text(msg.content) + messages.append({"role": msg.role, "content": content}) finally: db.close() return messages + def _get_tools(self, enabled_tools: list) -> list: + """Filter tools based on enabled_tools list.""" + if not enabled_tools: + return [] + return [t for t in registry.list_all() + if t.get("function", {}).get("name") in enabled_tools] + async def stream_response( self, - conversation: Conversation, + conversation, user_message: str, thinking_enabled: bool = False, enabled_tools: list = None, @@ -255,256 +158,118 @@ class ChatService: username: str = None, workspace: str = None, user_permission_level: int = 1 - ) -> AsyncGenerator[Dict[str, str], None]: - """ - Streaming response generator - - Yields raw SSE event strings for direct forwarding. 
- """ + ) -> AsyncGenerator[str, None]: + """Streaming response with Agentic Loop.""" try: + # Build initial messages messages = self.build_messages(conversation) - messages.append({ "role": "user", "content": json.dumps({"text": user_message, "attachments": []}) }) - # Get tools based on enabled_tools filter - if enabled_tools: - tools = [t for t in registry.list_all() if t.get("function", {}).get("name") in enabled_tools] - else: - tools = [] - + # Get tools and LLM client + tools = self._get_tools(enabled_tools) llm, provider_max_tokens = get_llm_client(conversation) model = conversation.model or llm.default_model or "gpt-4" - # 直接使用 provider 的 max_tokens - max_tokens = provider_max_tokens + max_tokens = provider_max_tokens or 8192 - # Token usage tracking - total_usage = { - "prompt_tokens": 0, - "completion_tokens": 0, - "total_tokens": 0 + # Tool execution context + tool_context = { + "workspace": workspace, + "user_id": user_id, + "username": username, + "user_permission_level": user_permission_level } - actual_token_count = 0 - # Streaming context for state management + # Stream context ctx = StreamContext() - for iteration in range(MAX_ITERATIONS): - # Reset streaming context for this iteration - ctx.reset_iteration() - - async for sse_line in llm.stream_call( - model=model, - messages=messages, - tools=tools, - temperature=conversation.temperature, - max_tokens=max_tokens or 8192, - thinking_enabled=thinking_enabled or conversation.thinking_enabled - ): - # Parse SSE line - # Format: "event: xxx\ndata: {...}\n\n" - event_type = None - data_str = None - - for line in sse_line.strip().split('\n'): - if line.startswith('event: '): - event_type = line[7:].strip() - elif line.startswith('data: '): - data_str = line[6:].strip() - - if data_str is None: - continue - - # Handle error events from LLM - if event_type == 'error': - try: - error_data = json.loads(data_str) - yield _sse_event("error", {"content": error_data.get("content", "Unknown error")}) - except 
json.JSONDecodeError: - yield _sse_event("error", {"content": data_str}) - return - - # Parse the data - try: - chunk = json.loads(data_str) - except json.JSONDecodeError: - yield _sse_event("error", {"content": f"Failed to parse response: {data_str}"}) - return - - # 提取 API 返回的 usage 信息 - if "usage" in chunk: - usage = chunk["usage"] - total_usage["prompt_tokens"] = usage.get("prompt_tokens", 0) - total_usage["completion_tokens"] = usage.get("completion_tokens", 0) - total_usage["total_tokens"] = usage.get("total_tokens", 0) - - # Check for error in response - if "error" in chunk: - error_msg = chunk["error"].get("message", str(chunk["error"])) - yield _sse_event("error", {"content": f"API Error: {error_msg}"}) - return - - # Get delta - choices = chunk.get("choices", []) - if not choices: - # Check if there's any content in the response (for non-standard LLM responses) - if chunk.get("content") or chunk.get("message"): - content = chunk.get("content") or chunk.get("message", {}).get("content", "") - if content: - prev_len = len(ctx.full_content) - ctx.full_content += content - if prev_len == 0: # New text stream started - ctx.start_stream_step("text") - yield _sse_event("process_step", { - "step": { - "id": ctx.current_step_id if prev_len == 0 else f"step-{ctx.step_index - 1}", - "index": ctx.current_step_idx if prev_len == 0 else ctx.step_index - 1, - "type": "text", - "content": ctx.full_content - } - }) - continue - - delta = choices[0].get("delta", {}) - - # Handle reasoning (thinking) - yield_obj = ctx.handle_thinking_stream(delta) - if yield_obj: - yield yield_obj - - # Handle content - yield_obj = ctx.handle_text_stream(delta) - if yield_obj: - yield yield_obj - - # Accumulate tool calls - tool_calls_delta = delta.get("tool_calls", []) - for tc in tool_calls_delta: - idx = tc.get("index", 0) - if idx >= len(ctx.tool_calls_list): - ctx.tool_calls_list.append({ - "id": tc.get("id", ""), - "type": "function", - "function": {"name": "", "arguments": ""} - }) - 
func = tc.get("function", {}) - if func.get("name"): - ctx.tool_calls_list[idx]["function"]["name"] += func["name"] - if func.get("arguments"): - ctx.tool_calls_list[idx]["function"]["arguments"] += func["arguments"] - - # Save streaming step (thinking or text) - ctx.save_streaming_step() - - # Handle tool calls - if ctx.tool_calls_list: - ctx.all_tool_calls.extend(ctx.tool_calls_list) - - # Handle tool_call steps - tool_call_step_ids, tool_call_steps, yield_objs = ctx.handle_tool_call() - ctx.all_steps.extend(tool_call_steps) - for yield_obj in yield_objs: - yield yield_obj - - # Execute tools - tool_context = { - "workspace": workspace, - "user_id": user_id, - "username": username, - "user_permission_level": user_permission_level - } - tool_results = self.tool_executor.process_tool_calls_parallel( - ctx.tool_calls_list, tool_context - ) - - # Handle tool_result steps - for i, tr in enumerate(tool_results): - tool_call_step_id = tool_call_step_ids[i] if i < len(tool_call_step_ids) else f"step-{i}" - result_step, yield_obj = ctx.handle_tool_result(tr, tool_call_step_id) - ctx.all_steps.append(result_step) - yield yield_obj - - ctx.all_tool_results.append({ - "role": "tool", - "tool_call_id": tr.get("tool_call_id", ""), - "content": tr.get("content", "") - }) - - # Add assistant message with tool calls for next iteration - messages.append({ - "role": "assistant", - "content": ctx.full_content or "", - "tool_calls": ctx.tool_calls_list - }) - messages.extend(ctx.all_tool_results[-len(tool_results):]) - ctx.all_tool_results = [] - continue - - # No tool calls - final iteration, save message - msg_id = str(uuid.uuid4()) - - # 使用 API 返回的真实 completion_tokens,如果 API 没返回则降级使用估算值 - actual_token_count = total_usage.get("completion_tokens", 0) or len(ctx.full_content) // 4 - logger.info(f"[TOKEN] total_usage: {total_usage}, actual_token_count: {actual_token_count}") - - self._save_message( - conversation.id, - msg_id, - ctx.full_content, - ctx.all_tool_calls, - 
ctx.all_tool_results, - ctx.all_steps, - actual_token_count, - total_usage - ) - - yield _sse_event("done", { - "message_id": msg_id, - "token_count": actual_token_count, - "usage": total_usage - }) - return + # Execute agentic loop + async for event in self.agentic_loop.execute( + llm=llm, + model=model, + messages=messages, + tools=tools, + temperature=conversation.temperature, + max_tokens=max_tokens, + thinking_enabled=thinking_enabled or conversation.thinking_enabled, + context=ctx, + tool_context=tool_context + ): + yield event - # Max iterations exceeded - save message before error - if ctx.full_content or ctx.all_tool_calls: - msg_id = str(uuid.uuid4()) + # Save message after successful completion (only if we have content) + if ctx._last_message_id and (ctx.full_content or ctx.all_tool_calls): self._save_message( conversation.id, - msg_id, + ctx._last_message_id, ctx.full_content, ctx.all_tool_calls, ctx.all_tool_results, ctx.all_steps, - actual_token_count, - total_usage + ctx._last_token_count, + ctx._last_usage ) - yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"}) except Exception as e: - logger.error(f"Stream error: {e}") + logger.error(f"Stream error: {e}\n{traceback.format_exc()}") yield _sse_event("error", {"content": str(e)}) - def _save_message( + async def non_stream_response( self, - conversation_id: str, - msg_id: str, - full_content: str, - all_tool_calls: list, - all_tool_results: list, - all_steps: list, - token_count: int = 0, - usage: dict = None - ): - """Save the assistant message to database.""" + conversation, + user_message: str, + tools_enabled: bool = True, + thinking_enabled: bool = False + ) -> Dict[str, Any]: + """Non-streaming response for simple requests.""" + try: + messages = self.build_messages(conversation) + messages.append({ + "role": "user", + "content": json.dumps({"text": user_message, "attachments": []}) + }) + + tools = [] if not tools_enabled else None + llm, max_tokens = 
get_llm_client(conversation) + model = conversation.model or llm.default_model or "gpt-4" + + response = await llm.sync_call( + model=model, + messages=messages, + tools=tools, + temperature=conversation.temperature, + max_tokens=max_tokens or 8192, + thinking_enabled=thinking_enabled or conversation.thinking_enabled + ) + + return { + "success": True, + "content": response.content, + "tool_calls": response.tool_calls, + "usage": response.usage + } + + except httpx.HTTPStatusError as e: + error_msg = f"HTTP {e.response.status_code}: {e.response.text[:200] if e.response else 'No response body'}" + logger.error(f"Non-stream HTTP error: {error_msg}") + return {"success": False, "error": error_msg} + except httpx.TimeoutException as e: + logger.error(f"Non-stream timeout: {e}") + return {"success": False, "error": "Request timeout"} + except Exception as e: + logger.error(f"Non-stream error: {type(e).__name__}: {e}\n{traceback.format_exc()}") + return {"success": False, "error": f"{type(e).__name__}: {str(e)}"} + + def _save_message(self, conversation_id: str, msg_id: str, full_content: str, + all_tool_calls: list, all_tool_results: list, all_steps: list, + token_count: int = 0, usage: dict = None): + """Save assistant message to database.""" from luxx.database import SessionLocal from luxx.models import Message - content_json = { - "text": full_content, - "steps": all_steps - } + content_json = {"text": full_content, "steps": all_steps} if all_tool_calls: content_json["tool_calls"] = all_tool_calls @@ -520,12 +285,18 @@ class ChatService: ) db.add(msg) db.commit() - except Exception as e: + except Exception: db.rollback() raise finally: db.close() -# Global chat service +def _sse_event(event: str, data: dict) -> str: + """Format a Server-Sent Event string.""" + return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" + + +# ============== Global Singleton ============== + chat_service = ChatService() diff --git a/luxx/services/llm_client.py 
b/luxx/services/llm_client.py index acec3eb..fac67a7 100644 --- a/luxx/services/llm_client.py +++ b/luxx/services/llm_client.py @@ -2,6 +2,7 @@ import json import httpx import logging +import traceback from typing import Dict, Any, Optional, List, AsyncGenerator from luxx.config import config @@ -172,7 +173,7 @@ class LLMClient: logger.error(f"HTTP error: {status_code}") yield f"event: error\ndata: {json.dumps({'content': f'HTTP {status_code}: Request failed'})}\n\n" except Exception as e: - logger.error(f"Exception: {type(e).__name__}: {str(e)}") + logger.error(f"Exception: {type(e).__name__}: {str(e)}\n{traceback.format_exc()}") yield f"event: error\ndata: {json.dumps({'content': str(e)})}\n\n" diff --git a/luxx/services/llm_response.py b/luxx/services/llm_response.py new file mode 100644 index 0000000..f98ec13 --- /dev/null +++ b/luxx/services/llm_response.py @@ -0,0 +1,309 @@ +"""LLM Response Parser - Unified parser for multiple LLM API formats. + +Supported Providers: +- OpenAI: delta.content, delta.tool_calls +- DeepSeek: delta.content, delta.reasoning_content, delta.tool_calls +- Anthropic: content_block with thinking/text types +- MiniMax: <|im_start|>thinking...<|im_end|> tags in content + +Data Flow: +``` +LLM API Response (SSE) + │ + ▼ +LLMResponseParser.parse_chunk() + │ + ├──► ParsedDelta { thinking, text, tool_calls } + │ + ▼ +AgenticLoop._process_stream_line() + │ + ▼ +SSE Events (process_step) + │ + ├──► type: "thinking" + ├──► type: "text" + └──► type: "tool_call" +``` + +API Response Formats: + +1. OpenAI Standard (DeepSeek, OpenAI): +```json +{ + "choices": [{ + "delta": { + "content": "Hello", + "reasoning_content": "Let me think...", + "tool_calls": [{"id": "call_1", "function": {...}}] + } + }] +} +``` + +2. 
import re
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List, Tuple


@dataclass
class ParsedDelta:
    """Content extracted from one streamed LLM chunk.

    Attributes:
        thinking: Thinking/reasoning content.
        text: Regular text content.
        tool_calls: Tool call requests carried by the chunk.
        is_complete: Whether this chunk closes a content block.
    """
    thinking: str = ""
    text: str = ""
    tool_calls: Optional[List[Dict]] = field(default_factory=list)
    is_complete: bool = False

    def __post_init__(self):
        # An explicit ``tool_calls=None`` is still normalised to an empty list.
        if self.tool_calls is None:
            self.tool_calls = []


class LLMResponseParser:
    """Unified parser for several LLM streaming response formats.

    Supported formats:
        * OpenAI-style deltas: ``delta.content``, ``delta.tool_calls``.
        * DeepSeek: additionally ``delta.reasoning_content``.
        * Anthropic: ``content_block_start`` / ``content_block_delta`` /
          ``content_block_stop`` events with ``thinking_delta`` and
          ``text_delta`` payloads.
        * MiniMax: ``<|im_start|>thinking...<|im_end|>`` embedded in content.
        * Standard tags: ``<think>...</think>`` embedded in content.

    The parser is stateful: thinking and text are accumulated across calls and
    every returned ``ParsedDelta`` carries the accumulated value, not just the
    latest fragment. Call ``reset()`` before parsing a new message.

    Usage:
        from luxx.services.llm_response import llm_parser

        parsed = llm_parser.parse_openai({"content": "Hello"})
        parsed = llm_parser.parse_anthropic(
            {"type": "content_block_delta",
             "delta": {"type": "thinking_delta", "thinking": "..."}}
        )
        parsed = llm_parser.parse_chunk(chunk, provider="anthropic")
    """

    # Content block types
    BLOCK_THINKING = "thinking"
    BLOCK_TEXT = "text"
    BLOCK_TOOL_USE = "tool_use"
    BLOCK_TOOL_RESULT = "tool_result"

    # Start tags that introduce embedded thinking content (lower-case).
    _THINKING_START_TAGS = ("<|im_start|>thinking", "<think>")

    # One complete thinking span in either supported tag format. Matching is
    # case-insensitive and spans newlines; a start tag without its end tag
    # does not match and is therefore left in the text untouched.
    _THINKING_SPAN_RE = re.compile(
        r"<\|im_start\|>thinking(.*?)<\|im_end\|>|<think>(.*?)</think>",
        re.IGNORECASE | re.DOTALL,
    )

    def __init__(self):
        self._buffer = ""
        self._thinking_buffer = ""
        self._text_buffer = ""

    def reset(self):
        """Reset parser state for a new message."""
        self._buffer = ""
        self._thinking_buffer = ""
        self._text_buffer = ""

    def parse_openai(self, delta: Dict) -> ParsedDelta:
        """Parse an OpenAI-style ``delta`` object.

        Reads ``reasoning_content`` / ``reasoning`` as thinking, ``content`` as
        text (after stripping embedded thinking tags), and ``tool_calls``.
        When ``content`` is empty, a bare ``thinking`` field is also accepted.

        Args:
            delta: The ``choices[0].delta`` object of a streamed chunk.

        Returns:
            ParsedDelta holding the accumulated thinking/text so far and the
            tool calls of this delta.
        """
        result = ParsedDelta()

        # DeepSeek uses reasoning_content; some providers use reasoning.
        thinking = delta.get("reasoning_content") or delta.get("reasoning") or ""
        if thinking:
            self._thinking_buffer += thinking
            result.thinking = self._thinking_buffer

        text = delta.get("content") or ""
        if text:
            # Content may carry embedded thinking tags (MiniMax / <think>).
            thinking_part, clean_text = self._extract_thinking_tags(text)
            if thinking_part:
                self._thinking_buffer += thinking_part
                result.thinking = self._thinking_buffer
            if clean_text:
                self._text_buffer += clean_text
                result.text = self._text_buffer
        elif thinking_part := delta.get("thinking"):
            # Some providers put thinking in a dedicated "thinking" field.
            self._thinking_buffer += thinking_part
            result.thinking = self._thinking_buffer

        result.tool_calls = delta.get("tool_calls") or []

        return result

    def parse_anthropic(self, chunk: Dict) -> ParsedDelta:
        """Parse one Anthropic streaming event.

        Handled event types:
            * ``content_block_start``: a thinking block's initial content
              replaces the thinking buffer.
            * ``content_block_delta``: ``thinking_delta`` / ``text_delta``
              fragments are appended to the matching buffer.
            * ``content_block_stop``: marks the result as complete.

        Args:
            chunk: Decoded Anthropic SSE event.

        Returns:
            ParsedDelta holding the accumulated content so far.
        """
        result = ParsedDelta()
        chunk_type = chunk.get("type", "")

        if chunk_type == "content_block_start":
            block = chunk.get("content_block") or {}
            if block.get("type") == self.BLOCK_THINKING:
                thinking = block.get("thinking") or ""
                if thinking:
                    self._thinking_buffer = thinking
                    result.thinking = self._thinking_buffer

        elif chunk_type == "content_block_delta":
            delta = chunk.get("delta") or {}
            delta_type = delta.get("type", "")

            if delta_type == "thinking_delta":
                # ``or ""`` guards against an explicit null payload.
                self._thinking_buffer += delta.get("thinking") or ""
                result.thinking = self._thinking_buffer

            elif delta_type == "text_delta":
                self._text_buffer += delta.get("text") or ""
                result.text = self._text_buffer

            elif delta_type in ("input_json_delta", "partial_json"):
                # Tool-call argument fragments are not assembled here.
                pass

        elif chunk_type == "content_block_stop":
            result.is_complete = True

        return result

    def parse_chunk(self, chunk: Dict, provider: str = "openai") -> ParsedDelta:
        """Parse a chunk according to the provider's format.

        Args:
            chunk: Response chunk from the LLM. For OpenAI-style providers
                this may be either the full chunk (``{"choices": [...]}``) or
                the already-extracted ``delta`` object.
            provider: Provider name ("openai", "anthropic", "deepseek",
                "minimax"). Anything other than "anthropic" is treated as
                OpenAI-style.

        Returns:
            ParsedDelta with the extracted content.
        """
        if provider == "anthropic":
            return self.parse_anthropic(chunk)

        choices = chunk.get("choices")
        if isinstance(choices, list):
            # Full chunk: unwrap the first choice's delta. A chunk with an
            # empty ``choices`` list simply yields an empty result.
            first_choice = choices[0] if choices else None
            return self.parse_openai((first_choice or {}).get("delta") or {})

        # Already a delta object.
        return self.parse_openai(chunk)

    def _extract_thinking_tags(self, content: str) -> Tuple[str, str]:
        """Split content into embedded thinking and remaining text.

        Recognised spans (tags matched case-insensitively):
            * ``<|im_start|>thinking ... <|im_end|>``
            * ``<think> ... </think>``

        A start tag with no matching end tag inside ``content`` is left in the
        text as-is.

        Args:
            content: Raw content string from the LLM.

        Returns:
            Tuple of (thinking_content, clean_text).
        """
        thinking_parts = []
        clean_parts = []
        cursor = 0

        for match in self._THINKING_SPAN_RE.finditer(content):
            clean_parts.append(content[cursor:match.start()])
            minimax_body, standard_body = match.groups()
            # Exactly one alternative matched; the other group is None.
            thinking_parts.append(
                minimax_body if minimax_body is not None else standard_body
            )
            cursor = match.end()

        clean_parts.append(content[cursor:])
        return "".join(thinking_parts), "".join(clean_parts)

    def has_thinking_tags(self, content: str) -> bool:
        """Return True if content contains a thinking start tag.

        Args:
            content: Raw content string.

        Returns:
            True when either supported start tag occurs (case-insensitive).
        """
        if not content:
            return False
        lowered = content.lower()
        return any(tag in lowered for tag in self._THINKING_START_TAGS)


# Global parser instance.
# NOTE(review): the instance keeps per-message buffers, so sharing it between
# concurrently streamed messages would interleave their content - confirm
# callers reset() it and do not use it concurrently.
llm_parser = LLMResponseParser()
class ProcessResult:
    """Outcome of processing a single SSE line.

    Attributes:
        events: SSE event strings to yield, in the order they were added.
        has_error: True once an error has been recorded.
        error_content: Message of the recorded error, empty if none.
        has_content: True once content has been received.
        has_tool_calls: True once tool calls have been received.
    """

    def __init__(self):
        # Start empty: nothing received, nothing failed.
        self.events: list = []
        self.error_content: str = ""
        self.has_error: bool = False
        self.has_content: bool = False
        self.has_tool_calls: bool = False

    def add_event(self, event: str):
        """Queue one SSE event string."""
        self.events.append(event)

    def set_error(self, content: str):
        """Record an error together with its message."""
        self.error_content = content
        self.has_error = True

    def set_content(self):
        """Flag that content was received."""
        self.has_content = True

    def set_tool_calls(self):
        """Flag that tool calls were received."""
        self.has_tool_calls = True
import json
from typing import List, Dict, Optional


def _sse_event(event: str, data: dict) -> str:
    """Format a Server-Sent Event string.

    Args:
        event: Event name placed on the ``event:`` line.
        data: Payload serialised as JSON on the ``data:`` line; non-ASCII
            characters are emitted unescaped.

    Returns:
        The event text terminated by a blank line.
    """
    return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"


class StreamContext:
    """Manages streaming state transitions during an LLM response.

    Steps are recorded in order, each with a unique ``step-N`` id and index:
        * thinking: model reasoning content
        * text: model response text
        * tool_call: tool invocation request
        * tool_result: tool execution result

    ``step_index`` and the ``all_*`` lists persist across ``reset()``; only the
    state of the step currently being streamed is cleared by it.
    """

    def __init__(self):
        self.step_index = 0
        self.current_step_id = None
        self.current_step_idx = None
        self.current_step_type = None
        self.full_content = ""
        self.full_thinking = ""
        self.all_steps = []
        self.all_tool_calls = []
        self.all_tool_results = []
        self.tool_calls_list = []
        self._last_message_id = None
        self._last_token_count = 0
        self._last_usage = None

    def reset(self):
        """Reset the current-step state for a new iteration.

        The step counter and the accumulated ``all_*`` lists are kept.
        """
        self.current_step_id = None
        self.current_step_idx = None
        self.current_step_type = None
        self.full_content = ""
        self.full_thinking = ""
        self.tool_calls_list = []

    def start_step(self, step_type: str) -> str:
        """Start a new step and return its unique id (``step-N``)."""
        self.current_step_idx = self.step_index
        self.current_step_id = f"step-{self.step_index}"
        self.current_step_type = step_type
        self.step_index += 1
        return self.current_step_id

    def finalize_step(self):
        """Append the current step to ``all_steps``.

        Does nothing when no step has been started. The current step is not
        cleared, so calling this again records the same step again.
        """
        if self.current_step_id is None:
            return

        # Text steps store the text; every other step type stores the thinking.
        content = self.full_content if self.current_step_type == "text" else self.full_thinking
        self.all_steps.append({
            "id": self.current_step_id,
            "index": self.current_step_idx,
            "type": self.current_step_type,
            "content": content
        })

    def handle_thinking(self, delta: Dict) -> Optional[str]:
        """Handle a reasoning delta.

        Args:
            delta: Delta object; only ``reasoning_content`` is read.

        Returns:
            A ``process_step`` SSE event carrying the accumulated thinking,
            or None when the delta has no reasoning content.
        """
        reasoning = delta.get("reasoning_content", "")
        if not reasoning:
            return None

        # The first reasoning fragment opens the thinking step.
        if not self.full_thinking:
            self.start_step("thinking")

        self.full_thinking += reasoning
        return _sse_event("process_step", {
            "step": {
                "id": self.current_step_id,
                "index": self.current_step_idx,
                "type": "thinking",
                "content": self.full_thinking
            }
        })

    def handle_text(self, delta: Dict) -> Optional[str]:
        """Handle a content delta.

        Args:
            delta: Delta object; only ``content`` is read.

        Returns:
            A ``process_step`` SSE event carrying the accumulated text, or
            None when the delta has no content.
        """
        content = delta.get("content", "")
        if not content:
            return None

        # The first text fragment opens the text step.
        if not self.full_content:
            self.start_step("text")

        self.full_content += content
        return _sse_event("process_step", {
            "step": {
                "id": self.current_step_id,
                "index": self.current_step_idx,
                "type": "text",
                "content": self.full_content
            }
        })

    def accumulate_tool_call(self, tc_delta: Dict):
        """Merge one streamed tool-call fragment into ``tool_calls_list``.

        Args:
            tc_delta: Fragment with an optional ``index`` (default 0), ``id``
                and ``function`` holding partial ``name`` / ``arguments``.
        """
        idx = tc_delta.get("index") or 0

        # Grow the list up to idx so a fragment whose index is not exactly
        # the next slot cannot index past the end.
        while len(self.tool_calls_list) <= idx:
            self.tool_calls_list.append({
                "id": "",
                "type": "function",
                "function": {"name": "", "arguments": ""}
            })

        entry = self.tool_calls_list[idx]

        # Record the id from whichever fragment first carries it.
        if tc_delta.get("id") and not entry["id"]:
            entry["id"] = tc_delta["id"]

        func = tc_delta.get("function") or {}
        if func.get("name"):
            entry["function"]["name"] += func["name"]
        if func.get("arguments"):
            entry["function"]["arguments"] += func["arguments"]

    def emit_tool_calls(self) -> List[str]:
        """Record a ``tool_call`` step per accumulated call.

        Returns:
            The ``process_step`` SSE events, one per tool call.
        """
        events = []
        for tc in self.tool_calls_list:
            step_id = f"step-{self.step_index}"
            self.step_index += 1

            step = {
                "id": step_id,
                "index": self.step_index - 1,
                "type": "tool_call",
                "id_ref": tc.get("id", ""),
                "name": tc["function"]["name"],
                "arguments": tc["function"]["arguments"]
            }
            self.all_steps.append(step)
            self.all_tool_calls.append(tc)
            events.append(_sse_event("process_step", {"step": step}))

        return events

    def emit_tool_result(self, result: Dict, ref_step_id: str) -> tuple:
        """Record a ``tool_result`` step.

        Args:
            result: Tool result; ``content``, ``name`` and ``tool_call_id``
                are read.
            ref_step_id: Value stored as the step's ``id_ref``.

        Returns:
            Tuple of (step dict, ``process_step`` SSE event).
        """
        step_id = f"step-{self.step_index}"
        self.step_index += 1

        content = result.get("content", "")
        # Success is assumed unless the content is a JSON object that carries
        # its own "success" value.
        success = True
        try:
            parsed = json.loads(content)
            if isinstance(parsed, dict):
                success = parsed.get("success", True)
        except (json.JSONDecodeError, TypeError):
            pass

        step = {
            "id": step_id,
            "index": self.step_index - 1,
            "type": "tool_result",
            "id_ref": ref_step_id,
            "name": result.get("name", ""),
            "content": content,
            "success": success
        }
        self.all_steps.append(step)
        self.all_tool_results.append({
            "role": "tool",
            "tool_call_id": result.get("tool_call_id", ""),
            "content": content
        })

        return step, _sse_event("process_step", {"step": step})

    def set_completion(self, msg_id: str, token_count: int, usage: dict):
        """Store completion info for saving."""
        self._last_message_id = msg_id
        self._last_token_count = token_count
        self._last_usage = usage

    def reset_completion(self):
        """Clear the stored completion info."""
        self._last_message_id = None
        self._last_token_count = 0
        self._last_usage = None