From 1bdccb437b62f945a1ebbe635a2c5f67272927c1 Mon Sep 17 00:00:00 2001
From: ViperEkura <3081035982@qq.com>
Date: Fri, 24 Apr 2026 12:45:04 +0800
Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96=E6=96=87?=
=?UTF-8?q?=E4=BB=B6=E7=BB=93=E6=9E=84?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
assets/ARCHITECTURE.md | 60 +++-
luxx/services/__init__.py | 1 +
luxx/services/agentic_loop.py | 275 +++++++++++++++
luxx/services/chat.py | 605 ++++++++++----------------------
luxx/services/llm_client.py | 3 +-
luxx/services/llm_response.py | 309 ++++++++++++++++
luxx/services/process_result.py | 37 ++
luxx/services/stream_context.py | 185 ++++++++++
8 files changed, 1050 insertions(+), 425 deletions(-)
create mode 100644 luxx/services/agentic_loop.py
create mode 100644 luxx/services/llm_response.py
create mode 100644 luxx/services/process_result.py
create mode 100644 luxx/services/stream_context.py
diff --git a/assets/ARCHITECTURE.md b/assets/ARCHITECTURE.md
index 06c423e..66c54d0 100644
--- a/assets/ARCHITECTURE.md
+++ b/assets/ARCHITECTURE.md
@@ -29,7 +29,11 @@ luxx/
│ ├── providers.py # LLM 提供商管理
│ └── tools.py # 工具管理
├── services/ # 服务层
-│ ├── chat.py # 聊天服务 (Agentic Loop)
+│ ├── chat.py # 聊天服务门面
+│ ├── agentic_loop.py # Agentic Loop 执行器
+│ ├── stream_context.py # 流式状态管理
+│ ├── llm_response.py # LLM 响应解析器
+│ ├── process_result.py # 处理结果
│ └── llm_client.py # LLM 客户端
├── tools/ # 工具系统
│ ├── core.py # 核心类 (ToolRegistry, ToolDefinition, ToolResult)
@@ -308,6 +312,28 @@ ToolExecutor 返回结果
### 6. 服务层
+#### LLMResponseParser (`services/llm_response.py`)
+统一解析器,兼容多种 LLM API 格式:
+- **OpenAI**: `delta.content`, `delta.tool_calls`
+- **DeepSeek**: `delta.content`, `delta.reasoning_content`
+- **Anthropic**: `content_block` 类型事件
+- **MiniMax**: `<|im_start|>thinking...<|im_end|>` 标签
+
+```python
+from luxx.services.llm_response import llm_parser
+
+# 解析 OpenAI 格式
+parsed = llm_parser.parse_openai(delta)
+
+# 解析 Anthropic 格式
+parsed = llm_parser.parse_anthropic(chunk)
+
+# 返回 ParsedDelta
+parsed.thinking # 思考内容
+parsed.text # 文本内容
+parsed.tool_calls # 工具调用
+```
+
#### ChatService (`services/chat.py`)
核心聊天服务:
- Agentic Loop 迭代执行(最多 10 轮)
@@ -315,9 +341,22 @@ ToolExecutor 返回结果
- 工具调用编排(并行执行)
- 消息历史管理
- 自动重试机制
-- 支持 thinking_content 提取
- Token 用量追踪
+#### AgenticLoop (`services/agentic_loop.py`)
+执行 Agentic Loop 的核心循环:
+- 调用 LLM 获取响应
+- 使用 LLMResponseParser 解析响应
+- 管理 thinking/text/tool_call/tool_result 步骤
+- 工具并行执行
+
+#### StreamContext (`services/stream_context.py`)
+流式状态管理:
+- 追踪当前步骤类型和索引
+- 累积 thinking 和 text 内容
+- 管理 tool_calls 列表
+- 生成 SSE 事件
+
#### LLMClient (`services/llm_client.py`)
LLM API 客户端:
- 多提供商:DeepSeek、GLM、OpenAI
@@ -364,23 +403,30 @@ sequenceDiagram
participant Client
participant API as POST /messages/stream
participant CS as ChatService
+ participant AL as AgenticLoop
+ participant Parser as LLMResponseParser
participant LLM as LLM API
participant TE as ToolExecutor
Client->>API: POST {content, tools, thinking_enabled}
API->>CS: stream_response()
+ CS->>AL: execute()
loop MAX_ITERATIONS (10)
- CS->>LLM: call(messages, tools)
- LLM-->>CS: SSE Stream
+ AL->>LLM: stream_call(messages, tools)
+ LLM-->>AL: SSE Stream
+
+ AL->>Parser: parse_chunk()
+ Parser-->>AL: ParsedDelta {thinking, text, tool_calls}
alt tool_calls
- CS->>TE: process_tool_calls_parallel()
- TE-->>CS: tool_results
- CS->>CS: 追加到 messages
+ AL->>TE: process_tool_calls_parallel()
+ TE-->>AL: tool_results
+ AL->>AL: 追加到 messages
end
end
+ AL->>CS: done event
CS->>CS: _save_message()
CS->>API: SSE Stream
API-->>Client: 流式响应
diff --git a/luxx/services/__init__.py b/luxx/services/__init__.py
index d4d0ab0..6cb90eb 100644
--- a/luxx/services/__init__.py
+++ b/luxx/services/__init__.py
@@ -1,3 +1,4 @@
"""Services module"""
from luxx.services.llm_client import LLMClient, llm_client, LLMResponse
from luxx.services.chat import ChatService, chat_service
+from luxx.services.llm_response import LLMResponseParser, llm_parser, ParsedDelta
diff --git a/luxx/services/agentic_loop.py b/luxx/services/agentic_loop.py
new file mode 100644
index 0000000..15fb2cb
--- /dev/null
+++ b/luxx/services/agentic_loop.py
@@ -0,0 +1,275 @@
+"""AgenticLoop - Executes the Agentic Loop: LLM + Tools iteration.
+
+The loop:
+1. Call LLM with messages and tools
+2. Check for tool calls in response
+3. Execute tools in parallel
+4. Add results to messages
+5. Repeat (max 10 iterations)
+6. Return final response
+"""
+import json
+import uuid
+import logging
+import traceback
+from typing import List, Dict, Any, AsyncGenerator
+
+from luxx.tools.executor import ToolExecutor
+from luxx.services.llm_client import LLMClient
+from luxx.services.stream_context import StreamContext, _sse_event
+from luxx.services.process_result import ProcessResult
+from luxx.services.llm_response import llm_parser
+
+logger = logging.getLogger(__name__)
+
+# Maximum iterations to prevent infinite loops
+MAX_ITERATIONS = 10
+
+
+def _parse_sse_line(line: str) -> tuple:
+    """Split one SSE frame into its ``event`` name and ``data`` payload.
+
+    Only lines that start with ``event: `` or ``data: `` (including the
+    space after the colon) are recognised. When a field occurs more than
+    once, the last occurrence wins.
+
+    Returns:
+        ``(event_type, data_str)``; either item is ``None`` when the
+        corresponding field is absent from *line*.
+    """
+    fields = {"event": None, "data": None}
+    for raw in line.strip().split('\n'):
+        # partition() on the first ": " mirrors a startswith("<name>: ") test.
+        name, sep, value = raw.partition(': ')
+        if sep and name in fields:
+            fields[name] = value.strip()
+    return fields["event"], fields["data"]
+
+
+class AgenticLoop:
+    """Drive the LLM/tool iteration and translate it into SSE events.
+
+    Each iteration streams one LLM response, folds every chunk into the
+    shared ``StreamContext`` and, when the model requested tools, runs them
+    and appends the results to ``messages`` before calling the LLM again.
+    """
+
+    # Maps a streamed step type to the StreamContext attribute holding its text.
+    _STEP_BUFFERS = {"thinking": "full_thinking", "text": "full_content"}
+
+    def __init__(self, tool_executor: ToolExecutor):
+        """Store the executor used to run tool calls requested by the LLM."""
+        self.tool_executor = tool_executor
+
+    async def execute(
+        self,
+        llm: LLMClient,
+        model: str,
+        messages: List[Dict],
+        tools: list,
+        temperature: float,
+        max_tokens: int,
+        thinking_enabled: bool,
+        context: 'StreamContext',
+        tool_context: dict = None
+    ) -> AsyncGenerator[str, None]:
+        """Run the agentic loop and yield SSE event strings.
+
+        Args:
+            llm: Client whose ``stream_call`` yields SSE-formatted strings.
+            model: Model name forwarded to ``stream_call``.
+            messages: Conversation so far. Mutated in place when tools run:
+                the assistant tool-call message and the tool results are
+                appended for the next iteration.
+            tools: Tool definitions forwarded to ``stream_call``.
+            temperature: Forwarded to ``stream_call``.
+            max_tokens: Forwarded to ``stream_call``.
+            thinking_enabled: Forwarded to ``stream_call``.
+            context: Stream state; ``reset()`` is called at the start of
+                every iteration.
+            tool_context: Extra data handed to the tool executor; ``None``
+                is passed on as an empty dict.
+
+        Yields:
+            The events produced while streaming, followed by either a
+            ``done`` event (the model requested no tools) or an ``error``
+            event (upstream failure, or ``MAX_ITERATIONS`` exhausted).
+        """
+        total_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
+        # Bound before the loop so the post-loop check never reads an unset name.
+        has_error = False
+
+        for _ in range(MAX_ITERATIONS):
+            context.reset()
+
+            # Stream one LLM response, forwarding events as they are produced.
+            async for sse_line in llm.stream_call(
+                model=model,
+                messages=messages,
+                tools=tools,
+                temperature=temperature,
+                max_tokens=max_tokens,
+                thinking_enabled=thinking_enabled
+            ):
+                result = self._process_stream_line(sse_line, context, total_usage)
+
+                for event in result.events:
+                    yield event
+
+                # The error event itself was already yielded above.
+                if result.has_error:
+                    has_error = True
+                    break
+
+            if has_error:
+                break
+
+            # Close the step that was being streamed (thinking or text).
+            context.finalize_step()
+
+            if context.tool_calls_list:
+                # Tools requested: run them, then ask the LLM again.
+                for event in self._execute_tools(context, messages, tool_context):
+                    yield event
+                continue
+
+            # No tool calls: this response is the final answer.
+            for event in self._complete(context, total_usage):
+                yield event
+            return
+
+        # Reached after an upstream error (already reported) or after the
+        # iteration budget ran out while the model kept requesting tools.
+        if not has_error:
+            yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"})
+
+    def _emit_stream_step(self, ctx: 'StreamContext', result: ProcessResult,
+                          step_type: str) -> None:
+        """Add a ``process_step`` event for the current *step_type* buffer.
+
+        Starts a new step on *ctx* first when no step is open or the open
+        step has a different type. The content is read from *ctx* after
+        that, so the event carries whatever the context holds at that point.
+        """
+        if not ctx.current_step_id or ctx.current_step_type != step_type:
+            ctx.start_step(step_type)
+        result.add_event(_sse_event("process_step", {
+            "step": {
+                "id": ctx.current_step_id,
+                "index": ctx.current_step_idx,
+                "type": step_type,
+                "content": getattr(ctx, self._STEP_BUFFERS[step_type])
+            }
+        }))
+        result.set_content()
+
+    def _process_stream_line(self, sse_line: str, ctx: 'StreamContext',
+                             total_usage: dict) -> ProcessResult:
+        """Fold one SSE line from the LLM into *ctx* and collect its events.
+
+        Args:
+            sse_line: Raw SSE frame as yielded by ``stream_call``.
+            ctx: Stream state updated with thinking/text/tool-call data.
+            total_usage: Updated in place with the chunk's ``usage`` values
+                when the chunk carries them (values are replaced, not summed).
+
+        Returns:
+            A ``ProcessResult`` holding the events to forward; its error
+            flag is set for upstream ``error`` events, unparsable payloads
+            and chunks that carry an ``error`` field.
+        """
+        result = ProcessResult()
+        event_type, data_str = _parse_sse_line(sse_line)
+        if not data_str:
+            return result
+
+        # Upstream error event: forward its "content", or the raw payload
+        # when it is not a JSON object.
+        if event_type == 'error':
+            try:
+                error_data = json.loads(data_str)
+            except json.JSONDecodeError:
+                error_data = None
+            if isinstance(error_data, dict):
+                error_content = error_data.get("content", "Unknown error")
+            else:
+                error_content = data_str
+            result.set_error(error_content)
+            result.add_event(_sse_event("error", {"content": error_content}))
+            return result
+
+        try:
+            chunk = json.loads(data_str)
+        except json.JSONDecodeError:
+            error_msg = f"Parse error: {data_str[:50]}"
+            result.set_error(error_msg)
+            result.add_event(_sse_event("error", {"content": error_msg}))
+            return result
+
+        # Record token usage when the chunk reports it.
+        usage = chunk.get("usage")
+        if usage:
+            total_usage.update({
+                "prompt_tokens": usage.get("prompt_tokens", 0),
+                "completion_tokens": usage.get("completion_tokens", 0),
+                "total_tokens": usage.get("total_tokens", 0)
+            })
+
+        # API-level error embedded in the chunk. A null "error" field is
+        # treated as "no error"; a non-dict error is reported via str().
+        error = chunk.get("error")
+        if error is not None:
+            if isinstance(error, dict):
+                error_msg = error.get("message", str(error))
+            else:
+                error_msg = str(error)
+            result.set_error(error_msg)
+            result.add_event(_sse_event("error", {"content": f"API Error: {error_msg}"}))
+            return result
+
+        choices = chunk.get("choices") or []
+        if not choices:
+            # Non-standard chunk without "choices": use top-level "content".
+            content = chunk.get("content") or ""
+            if content:
+                # The content may embed thinking tags; split them out.
+                thinking_part, clean_text = llm_parser._extract_thinking_tags(content)
+
+                if thinking_part:
+                    ctx.full_thinking = (ctx.full_thinking or "") + thinking_part
+                    self._emit_stream_step(ctx, result, "thinking")
+
+                if clean_text:
+                    ctx.full_content = (ctx.full_content or "") + clean_text
+                    self._emit_stream_step(ctx, result, "text")
+            return result
+
+        # Some providers send "delta": null; treat it like an empty delta.
+        delta = choices[0].get("delta") or {}
+
+        parsed = llm_parser.parse_openai(delta)
+
+        # NOTE(review): the buffers are overwritten here, not appended to;
+        # this presumably relies on the parser returning accumulated text.
+        # Confirm against LLMResponseParser.parse_openai.
+        if parsed.thinking:
+            ctx.full_thinking = parsed.thinking
+            self._emit_stream_step(ctx, result, "thinking")
+
+        if parsed.text:
+            ctx.full_content = parsed.text
+            self._emit_stream_step(ctx, result, "text")
+
+        # Fall back to the raw delta when the parser found no tool calls.
+        # "tool_calls": null must not be iterated, hence the trailing "or []".
+        for tc in parsed.tool_calls or delta.get("tool_calls") or []:
+            ctx.accumulate_tool_call(tc)
+            result.set_tool_calls()
+
+        return result
+
+    def _execute_tools(self, ctx: 'StreamContext', messages: list,
+                       tool_context: dict = None) -> List[str]:
+        """Run the tool calls collected in *ctx* and return their events.
+
+        Also appends the assistant tool-call message and the new tool
+        results to *messages* so the next LLM call sees them.
+
+        Args:
+            ctx: Stream state holding ``tool_calls_list`` and the step log.
+            messages: Conversation list, mutated in place.
+            tool_context: Passed to the executor; ``None`` becomes ``{}``.
+
+        Returns:
+            The tool-call step events followed by one event per tool result.
+        """
+        events = list(ctx.emit_tool_calls())
+
+        # NOTE(review): this is a plain synchronous call made from an async
+        # generator; presumably it blocks the event loop while tools run.
+        # Confirm against ToolExecutor.
+        tool_results = self.tool_executor.process_tool_calls_parallel(
+            ctx.tool_calls_list, tool_context or {}
+        )
+
+        # Step ids of the tool_call steps that belong to this batch, used to
+        # link each result back to its call.
+        tool_ids = [tc.get("id") for tc in ctx.tool_calls_list]
+        tool_step_ids = [
+            s["id"] for s in ctx.all_steps
+            if s["type"] == "tool_call" and s.get("id_ref") in tool_ids
+        ]
+
+        # zip() stops at the shorter sequence, so surplus results are ignored.
+        for i, (tr, _tc) in enumerate(zip(tool_results, ctx.tool_calls_list)):
+            if i < len(tool_step_ids):
+                ref_id = tool_step_ids[i]
+            else:
+                ref_id = f"step-{len(ctx.all_steps) - len(tool_results) + i}"
+            _, event = ctx.emit_tool_result(tr, ref_id)
+            events.append(event)
+
+        # Prepare for the next iteration.
+        messages.append({
+            "role": "assistant",
+            "content": ctx.full_content or "",
+            "tool_calls": ctx.tool_calls_list
+        })
+        # Guard the slice: with zero results, [-0:] would be the whole list
+        # and re-append every earlier tool result.
+        if tool_results:
+            messages.extend(ctx.all_tool_results[-len(tool_results):])
+
+        return events
+
+    def _complete(self, ctx: 'StreamContext', total_usage: dict) -> List[str]:
+        """Record completion on *ctx* and return the final ``done`` event.
+
+        The token count is the reported ``completion_tokens``; when that is
+        missing or zero it falls back to ``len(full_content) // 4``.
+        """
+        # full_content may be unset when the model produced no text.
+        token_count = total_usage.get("completion_tokens") or len(ctx.full_content or "") // 4
+        msg_id = str(uuid.uuid4())
+        logger.info(f"[TOKEN] usage={total_usage}, count={token_count}")
+
+        ctx.set_completion(msg_id, token_count, total_usage)
+
+        return [_sse_event("done", {
+            "message_id": msg_id,
+            "token_count": token_count,
+            "usage": total_usage
+        })]
diff --git a/luxx/services/chat.py b/luxx/services/chat.py
index a8dc3cc..3d67b72 100644
--- a/luxx/services/chat.py
+++ b/luxx/services/chat.py
@@ -1,34 +1,99 @@
-"""Chat service module"""
-import json
-import uuid
-import logging
-from typing import List, Dict, Any, AsyncGenerator, Optional
+"""Chat service module with Agentic Loop pattern.
+
+This module provides the core chat service that orchestrates:
+- StreamContext: Manages streaming state transitions
+- MessageBuilder: Constructs message lists
+- AgenticLoop: Executes the agentic loop (LLM + tools iteration)
+- ChatService: Core chat service facade
+"""
+import json
+import logging
+import traceback
+import httpx
+from typing import List, Dict, Any, AsyncGenerator
-from luxx.models import Conversation, Message
from luxx.tools.executor import ToolExecutor
from luxx.tools.core import registry
from luxx.services.llm_client import LLMClient
+from luxx.services.stream_context import StreamContext
+from luxx.services.agentic_loop import AgenticLoop
from luxx.config import config
logger = logging.getLogger(__name__)
-# Maximum iterations to prevent infinite loops
-MAX_ITERATIONS = 10
-def _sse_event(event: str, data: dict) -> str:
- """Format a Server-Sent Event string."""
- return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
+# ============== MessageBuilder ==============
+
+class MessageBuilder:
+    """Fluent helper that accumulates chat messages for an LLM request.
+
+    Every ``add_*`` method appends one message dict and returns the builder
+    itself so calls can be chained; ``build`` hands back a copy of the list.
+    """
+
+    def __init__(self):
+        """Start with an empty message list."""
+        self.messages = []
+
+    def add_system(self, content: str) -> 'MessageBuilder':
+        """Append a ``system`` message carrying *content* unchanged."""
+        self.messages.append(dict(role="system", content=content))
+        return self
+
+    def add_user(self, content: str, attachments: list = None) -> 'MessageBuilder':
+        """Append a ``user`` message whose content is a JSON envelope.
+
+        The envelope has the keys ``text`` and ``attachments``; a missing
+        or empty *attachments* argument is stored as an empty list.
+        """
+        envelope = {"text": content, "attachments": attachments or []}
+        encoded = json.dumps(envelope, ensure_ascii=False)
+        self.messages.append(dict(role="user", content=encoded))
+        return self
+
+    def add_assistant(self, content: str, tool_calls: list = None) -> 'MessageBuilder':
+        """Append an ``assistant`` message.
+
+        ``tool_calls`` is attached only when it is non-empty.
+        """
+        entry = dict(role="assistant", content=content)
+        if tool_calls:
+            entry.update(tool_calls=tool_calls)
+        self.messages.append(entry)
+        return self
+
+    def add_tool_result(self, tool_call_id: str, content: str) -> 'MessageBuilder':
+        """Append a ``tool`` message answering the call *tool_call_id*."""
+        entry = dict(role="tool", tool_call_id=tool_call_id, content=content)
+        self.messages.append(entry)
+        return self
+
+    def build(self) -> List[Dict]:
+        """Return a shallow copy of the accumulated message list."""
+        return list(self.messages)
+
+    @staticmethod
+    def extract_text(content: str) -> str:
+        """Return the plain text carried by a stored message content.
+
+        Empty or ``None`` content yields ``""``. Content that decodes to a
+        JSON object yields its ``text`` value (the raw content when that
+        key is missing). Anything else is returned unchanged.
+        """
+        if not content:
+            return ""
+        try:
+            decoded = json.loads(content)
+        except (json.JSONDecodeError, TypeError):
+            return content
+        if isinstance(decoded, dict):
+            return decoded.get("text", content)
+        return content
-def get_llm_client(conversation: Conversation = None):
- """Get LLM client, optionally using conversation's provider. Returns (client, max_tokens)"""
+# ============== Factory Function ==============
+
+def get_llm_client(conversation=None) -> tuple:
+ """Get LLM client based on conversation provider. Returns (client, max_tokens)"""
+ from luxx.models import LLMProvider
+ from luxx.database import SessionLocal
+
max_tokens = None
+
if conversation and conversation.provider_id:
- from luxx.models import LLMProvider
- from luxx.database import SessionLocal
db = SessionLocal()
try:
- provider = db.query(LLMProvider).filter(LLMProvider.id == conversation.provider_id).first()
+ provider = db.query(LLMProvider).filter(
+ LLMProvider.id == conversation.provider_id
+ ).first()
if provider:
max_tokens = provider.max_tokens
client = LLMClient(
@@ -40,184 +105,27 @@ def get_llm_client(conversation: Conversation = None):
finally:
db.close()
- # Fallback to global config
- client = LLMClient()
- return client, max_tokens
+ return LLMClient(), max_tokens
-class StreamContext:
- """Context for streaming response state management."""
-
- def __init__(
- self,
- step_index: int = 0,
- current_step_id: str = None,
- current_step_idx: int = None,
- current_stream_type: str = None,
- full_content: str = "",
- full_thinking: str = ""
- ):
- self.step_index = step_index
- self.current_step_id = current_step_id
- self.current_step_idx = current_step_idx
- self.current_stream_type = current_stream_type
- self.full_content = full_content
- self.full_thinking = full_thinking
- self.all_steps = []
- self.all_tool_calls = []
- self.all_tool_results = []
- self.tool_calls_list = []
-
- def reset_iteration(self):
- """Reset streaming step tracker for new iteration."""
- self.current_step_id = None
- self.current_step_idx = None
- self.current_stream_type = None
- self.full_content = ""
- self.full_thinking = ""
- self.tool_calls_list = []
-
- def start_stream_step(self, step_type: str) -> str:
- """Start a new streaming step. Returns the step_id."""
- self.current_step_idx = self.step_index
- self.current_step_id = f"step-{self.step_index}"
- self.current_stream_type = step_type
- self.step_index += 1
- return self.current_step_id
-
- def yield_stream_step(self, step_type: str, content: str) -> Dict[str, Any]:
- """Yield a streaming step event."""
- return _sse_event("process_step", {
- "step": {
- "id": self.current_step_id,
- "index": self.current_step_idx,
- "type": step_type,
- "content": content
- }
- })
-
- def save_streaming_step(self):
- """Save the current streaming step to all_steps."""
- if self.current_step_id is None:
- return
-
- if self.current_stream_type == "thinking":
- self.all_steps.append({
- "id": self.current_step_id,
- "index": self.current_step_idx,
- "type": "thinking",
- "content": self.full_thinking
- })
- elif self.current_stream_type == "text":
- self.all_steps.append({
- "id": self.current_step_id,
- "index": self.current_step_idx,
- "type": "text",
- "content": self.full_content
- })
-
- def handle_thinking_stream(self, delta: Dict) -> Optional[Dict]:
- """Handle reasoning/thinking delta. Returns yield_obj if step was yielded."""
- reasoning = delta.get("reasoning_content", "")
- if not reasoning:
- return None
-
- prev_len = len(self.full_thinking)
- self.full_thinking += reasoning
-
- if prev_len == 0: # New thinking stream started
- self.start_stream_step("thinking")
-
- return self.yield_stream_step("thinking", self.full_thinking)
-
- def handle_text_stream(self, delta: Dict) -> Optional[Dict]:
- """Handle content delta. Returns yield_obj if step was yielded."""
- content = delta.get("content", "")
- if not content:
- return None
-
- prev_len = len(self.full_content)
- self.full_content += content
-
- if prev_len == 0: # New text stream started
- self.start_stream_step("text")
-
- return self.yield_stream_step("text", self.full_content)
-
- def handle_tool_call(self) -> tuple:
- """Handle tool calls. Returns (tool_call_step_ids, tool_call_steps, yield_objs)."""
- tool_call_step_ids = []
- tool_call_steps = []
- yield_objs = []
-
- for tc in self.tool_calls_list:
- call_step_idx = self.step_index
- call_step_id = f"step-{self.step_index}"
- tool_call_step_ids.append(call_step_id)
- self.step_index += 1
-
- call_step = {
- "id": call_step_id,
- "index": call_step_idx,
- "type": "tool_call",
- "id_ref": tc.get("id", ""),
- "name": tc["function"]["name"],
- "arguments": tc["function"]["arguments"]
- }
- tool_call_steps.append(call_step)
- yield_objs.append(_sse_event("process_step", {"step": call_step}))
-
- return tool_call_step_ids, tool_call_steps, yield_objs
-
- def handle_tool_result(self, tool_result: Dict, tool_call_step_id: str) -> tuple:
- """Handle single tool result. Returns (result_step, yield_obj)."""
- result_step_idx = self.step_index
- result_step_id = f"step-{self.step_index}"
- self.step_index += 1
-
- content = tool_result.get("content", "")
- success = True
- try:
- content_obj = json.loads(content)
- if isinstance(content_obj, dict):
- success = content_obj.get("success", True)
- except:
- pass
-
- result_step = {
- "id": result_step_id,
- "index": result_step_idx,
- "type": "tool_result",
- "id_ref": tool_call_step_id,
- "name": tool_result.get("name", ""),
- "content": content,
- "success": success
- }
- return result_step, _sse_event("process_step", {"step": result_step})
-
+# ============== ChatService ==============
class ChatService:
- """Chat service with tool support"""
+ """Core chat service with Agentic Loop support."""
def __init__(self):
self.tool_executor = ToolExecutor()
+ self.agentic_loop = AgenticLoop(self.tool_executor)
- def build_messages(
- self,
- conversation: Conversation,
- include_system: bool = True
- ) -> List[Dict[str, str]]:
- """Build message list"""
+ def build_messages(self, conversation, include_system: bool = True) -> List[Dict]:
+ """Build message list from conversation history."""
from luxx.database import SessionLocal
from luxx.models import Message
messages = []
if include_system and conversation.system_prompt:
- messages.append({
- "role": "system",
- "content": conversation.system_prompt
- })
+ messages.append({"role": "system", "content": conversation.system_prompt})
db = SessionLocal()
try:
@@ -226,28 +134,23 @@ class ChatService:
).order_by(Message.created_at).all()
for msg in db_messages:
- # Parse JSON content if possible
- try:
- content_obj = json.loads(msg.content) if msg.content else {}
- if isinstance(content_obj, dict):
- content = content_obj.get("text", msg.content)
- else:
- content = msg.content
- except (json.JSONDecodeError, TypeError):
- content = msg.content
-
- messages.append({
- "role": msg.role,
- "content": content
- })
+ content = MessageBuilder.extract_text(msg.content)
+ messages.append({"role": msg.role, "content": content})
finally:
db.close()
return messages
+ def _get_tools(self, enabled_tools: list) -> list:
+     """Return the registered tools whose function name is enabled.
+
+     An empty or ``None`` *enabled_tools* yields an empty list without
+     consulting the registry.
+     """
+     if not enabled_tools:
+         return []
+     selected = []
+     for tool in registry.list_all():
+         tool_name = tool.get("function", {}).get("name")
+         if tool_name in enabled_tools:
+             selected.append(tool)
+     return selected
+
async def stream_response(
self,
- conversation: Conversation,
+ conversation,
user_message: str,
thinking_enabled: bool = False,
enabled_tools: list = None,
@@ -255,256 +158,118 @@ class ChatService:
username: str = None,
workspace: str = None,
user_permission_level: int = 1
- ) -> AsyncGenerator[Dict[str, str], None]:
- """
- Streaming response generator
-
- Yields raw SSE event strings for direct forwarding.
- """
+ ) -> AsyncGenerator[str, None]:
+ """Streaming response with Agentic Loop."""
try:
+ # Build initial messages
messages = self.build_messages(conversation)
-
messages.append({
"role": "user",
"content": json.dumps({"text": user_message, "attachments": []})
})
- # Get tools based on enabled_tools filter
- if enabled_tools:
- tools = [t for t in registry.list_all() if t.get("function", {}).get("name") in enabled_tools]
- else:
- tools = []
-
+ # Get tools and LLM client
+ tools = self._get_tools(enabled_tools)
llm, provider_max_tokens = get_llm_client(conversation)
model = conversation.model or llm.default_model or "gpt-4"
- # 直接使用 provider 的 max_tokens
- max_tokens = provider_max_tokens
+ max_tokens = provider_max_tokens or 8192
- # Token usage tracking
- total_usage = {
- "prompt_tokens": 0,
- "completion_tokens": 0,
- "total_tokens": 0
+ # Tool execution context
+ tool_context = {
+ "workspace": workspace,
+ "user_id": user_id,
+ "username": username,
+ "user_permission_level": user_permission_level
}
- actual_token_count = 0
- # Streaming context for state management
+ # Stream context
ctx = StreamContext()
- for iteration in range(MAX_ITERATIONS):
- # Reset streaming context for this iteration
- ctx.reset_iteration()
-
- async for sse_line in llm.stream_call(
- model=model,
- messages=messages,
- tools=tools,
- temperature=conversation.temperature,
- max_tokens=max_tokens or 8192,
- thinking_enabled=thinking_enabled or conversation.thinking_enabled
- ):
- # Parse SSE line
- # Format: "event: xxx\ndata: {...}\n\n"
- event_type = None
- data_str = None
-
- for line in sse_line.strip().split('\n'):
- if line.startswith('event: '):
- event_type = line[7:].strip()
- elif line.startswith('data: '):
- data_str = line[6:].strip()
-
- if data_str is None:
- continue
-
- # Handle error events from LLM
- if event_type == 'error':
- try:
- error_data = json.loads(data_str)
- yield _sse_event("error", {"content": error_data.get("content", "Unknown error")})
- except json.JSONDecodeError:
- yield _sse_event("error", {"content": data_str})
- return
-
- # Parse the data
- try:
- chunk = json.loads(data_str)
- except json.JSONDecodeError:
- yield _sse_event("error", {"content": f"Failed to parse response: {data_str}"})
- return
-
- # 提取 API 返回的 usage 信息
- if "usage" in chunk:
- usage = chunk["usage"]
- total_usage["prompt_tokens"] = usage.get("prompt_tokens", 0)
- total_usage["completion_tokens"] = usage.get("completion_tokens", 0)
- total_usage["total_tokens"] = usage.get("total_tokens", 0)
-
- # Check for error in response
- if "error" in chunk:
- error_msg = chunk["error"].get("message", str(chunk["error"]))
- yield _sse_event("error", {"content": f"API Error: {error_msg}"})
- return
-
- # Get delta
- choices = chunk.get("choices", [])
- if not choices:
- # Check if there's any content in the response (for non-standard LLM responses)
- if chunk.get("content") or chunk.get("message"):
- content = chunk.get("content") or chunk.get("message", {}).get("content", "")
- if content:
- prev_len = len(ctx.full_content)
- ctx.full_content += content
- if prev_len == 0: # New text stream started
- ctx.start_stream_step("text")
- yield _sse_event("process_step", {
- "step": {
- "id": ctx.current_step_id if prev_len == 0 else f"step-{ctx.step_index - 1}",
- "index": ctx.current_step_idx if prev_len == 0 else ctx.step_index - 1,
- "type": "text",
- "content": ctx.full_content
- }
- })
- continue
-
- delta = choices[0].get("delta", {})
-
- # Handle reasoning (thinking)
- yield_obj = ctx.handle_thinking_stream(delta)
- if yield_obj:
- yield yield_obj
-
- # Handle content
- yield_obj = ctx.handle_text_stream(delta)
- if yield_obj:
- yield yield_obj
-
- # Accumulate tool calls
- tool_calls_delta = delta.get("tool_calls", [])
- for tc in tool_calls_delta:
- idx = tc.get("index", 0)
- if idx >= len(ctx.tool_calls_list):
- ctx.tool_calls_list.append({
- "id": tc.get("id", ""),
- "type": "function",
- "function": {"name": "", "arguments": ""}
- })
- func = tc.get("function", {})
- if func.get("name"):
- ctx.tool_calls_list[idx]["function"]["name"] += func["name"]
- if func.get("arguments"):
- ctx.tool_calls_list[idx]["function"]["arguments"] += func["arguments"]
-
- # Save streaming step (thinking or text)
- ctx.save_streaming_step()
-
- # Handle tool calls
- if ctx.tool_calls_list:
- ctx.all_tool_calls.extend(ctx.tool_calls_list)
-
- # Handle tool_call steps
- tool_call_step_ids, tool_call_steps, yield_objs = ctx.handle_tool_call()
- ctx.all_steps.extend(tool_call_steps)
- for yield_obj in yield_objs:
- yield yield_obj
-
- # Execute tools
- tool_context = {
- "workspace": workspace,
- "user_id": user_id,
- "username": username,
- "user_permission_level": user_permission_level
- }
- tool_results = self.tool_executor.process_tool_calls_parallel(
- ctx.tool_calls_list, tool_context
- )
-
- # Handle tool_result steps
- for i, tr in enumerate(tool_results):
- tool_call_step_id = tool_call_step_ids[i] if i < len(tool_call_step_ids) else f"step-{i}"
- result_step, yield_obj = ctx.handle_tool_result(tr, tool_call_step_id)
- ctx.all_steps.append(result_step)
- yield yield_obj
-
- ctx.all_tool_results.append({
- "role": "tool",
- "tool_call_id": tr.get("tool_call_id", ""),
- "content": tr.get("content", "")
- })
-
- # Add assistant message with tool calls for next iteration
- messages.append({
- "role": "assistant",
- "content": ctx.full_content or "",
- "tool_calls": ctx.tool_calls_list
- })
- messages.extend(ctx.all_tool_results[-len(tool_results):])
- ctx.all_tool_results = []
- continue
-
- # No tool calls - final iteration, save message
- msg_id = str(uuid.uuid4())
-
- # 使用 API 返回的真实 completion_tokens,如果 API 没返回则降级使用估算值
- actual_token_count = total_usage.get("completion_tokens", 0) or len(ctx.full_content) // 4
- logger.info(f"[TOKEN] total_usage: {total_usage}, actual_token_count: {actual_token_count}")
-
- self._save_message(
- conversation.id,
- msg_id,
- ctx.full_content,
- ctx.all_tool_calls,
- ctx.all_tool_results,
- ctx.all_steps,
- actual_token_count,
- total_usage
- )
-
- yield _sse_event("done", {
- "message_id": msg_id,
- "token_count": actual_token_count,
- "usage": total_usage
- })
- return
+ # Execute agentic loop
+ async for event in self.agentic_loop.execute(
+ llm=llm,
+ model=model,
+ messages=messages,
+ tools=tools,
+ temperature=conversation.temperature,
+ max_tokens=max_tokens,
+ thinking_enabled=thinking_enabled or conversation.thinking_enabled,
+ context=ctx,
+ tool_context=tool_context
+ ):
+ yield event
- # Max iterations exceeded - save message before error
- if ctx.full_content or ctx.all_tool_calls:
- msg_id = str(uuid.uuid4())
+ # Save message after successful completion (only if we have content)
+ if ctx._last_message_id and (ctx.full_content or ctx.all_tool_calls):
self._save_message(
conversation.id,
- msg_id,
+ ctx._last_message_id,
ctx.full_content,
ctx.all_tool_calls,
ctx.all_tool_results,
ctx.all_steps,
- actual_token_count,
- total_usage
+ ctx._last_token_count,
+ ctx._last_usage
)
- yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"})
except Exception as e:
- logger.error(f"Stream error: {e}")
+ logger.error(f"Stream error: {e}\n{traceback.format_exc()}")
yield _sse_event("error", {"content": str(e)})
- def _save_message(
+ async def non_stream_response(
self,
- conversation_id: str,
- msg_id: str,
- full_content: str,
- all_tool_calls: list,
- all_tool_results: list,
- all_steps: list,
- token_count: int = 0,
- usage: dict = None
- ):
- """Save the assistant message to database."""
+ conversation,
+ user_message: str,
+ tools_enabled: bool = True,
+ thinking_enabled: bool = False
+ ) -> Dict[str, Any]:
+ """Non-streaming response for simple requests."""
+ try:
+ messages = self.build_messages(conversation)
+ messages.append({
+ "role": "user",
+ "content": json.dumps({"text": user_message, "attachments": []})
+ })
+
+ tools = [] if not tools_enabled else None
+ llm, max_tokens = get_llm_client(conversation)
+ model = conversation.model or llm.default_model or "gpt-4"
+
+ response = await llm.sync_call(
+ model=model,
+ messages=messages,
+ tools=tools,
+ temperature=conversation.temperature,
+ max_tokens=max_tokens or 8192,
+ thinking_enabled=thinking_enabled or conversation.thinking_enabled
+ )
+
+ return {
+ "success": True,
+ "content": response.content,
+ "tool_calls": response.tool_calls,
+ "usage": response.usage
+ }
+
+ except httpx.HTTPStatusError as e:
+ error_msg = f"HTTP {e.response.status_code}: {e.response.text[:200] if e.response else 'No response body'}"
+ logger.error(f"Non-stream HTTP error: {error_msg}")
+ return {"success": False, "error": error_msg}
+ except httpx.TimeoutException as e:
+ logger.error(f"Non-stream timeout: {e}")
+ return {"success": False, "error": "Request timeout"}
+ except Exception as e:
+ logger.error(f"Non-stream error: {type(e).__name__}: {e}\n{traceback.format_exc()}")
+ return {"success": False, "error": f"{type(e).__name__}: {str(e)}"}
+
+ def _save_message(self, conversation_id: str, msg_id: str, full_content: str,
+ all_tool_calls: list, all_tool_results: list, all_steps: list,
+ token_count: int = 0, usage: dict = None):
+ """Save assistant message to database."""
from luxx.database import SessionLocal
from luxx.models import Message
- content_json = {
- "text": full_content,
- "steps": all_steps
- }
+ content_json = {"text": full_content, "steps": all_steps}
if all_tool_calls:
content_json["tool_calls"] = all_tool_calls
@@ -520,12 +285,18 @@ class ChatService:
)
db.add(msg)
db.commit()
- except Exception as e:
+ except Exception:
db.rollback()
raise
finally:
db.close()
-# Global chat service
+def _sse_event(event: str, data: dict) -> str:
+    """Build one SSE frame: event line, JSON data line (non-ASCII kept), blank line."""
+    return "event: {}\ndata: {}\n\n".format(event, json.dumps(data, ensure_ascii=False))
+
+
+# ============== Global Singleton ==============
+
chat_service = ChatService()
diff --git a/luxx/services/llm_client.py b/luxx/services/llm_client.py
index acec3eb..fac67a7 100644
--- a/luxx/services/llm_client.py
+++ b/luxx/services/llm_client.py
@@ -2,6 +2,7 @@
import json
import httpx
import logging
+import traceback
from typing import Dict, Any, Optional, List, AsyncGenerator
from luxx.config import config
@@ -172,7 +173,7 @@ class LLMClient:
logger.error(f"HTTP error: {status_code}")
yield f"event: error\ndata: {json.dumps({'content': f'HTTP {status_code}: Request failed'})}\n\n"
except Exception as e:
- logger.error(f"Exception: {type(e).__name__}: {str(e)}")
+ logger.error(f"Exception: {type(e).__name__}: {str(e)}\n{traceback.format_exc()}")
yield f"event: error\ndata: {json.dumps({'content': str(e)})}\n\n"
diff --git a/luxx/services/llm_response.py b/luxx/services/llm_response.py
new file mode 100644
index 0000000..f98ec13
--- /dev/null
+++ b/luxx/services/llm_response.py
@@ -0,0 +1,309 @@
+"""LLM Response Parser - Unified parser for multiple LLM API formats.
+
+Supported Providers:
+- OpenAI: delta.content, delta.tool_calls
+- DeepSeek: delta.content, delta.reasoning_content, delta.tool_calls
+- Anthropic: content_block with thinking/text types
+- MiniMax: <|im_start|>thinking...<|im_end|> tags in content
+
+Data Flow:
+```
+LLM API Response (SSE)
+ │
+ ▼
+LLMResponseParser.parse_chunk()
+ │
+ ├──► ParsedDelta { thinking, text, tool_calls }
+ │
+ ▼
+AgenticLoop._process_stream_line()
+ │
+ ▼
+SSE Events (process_step)
+ │
+ ├──► type: "thinking"
+ ├──► type: "text"
+ └──► type: "tool_call"
+```
+
+API Response Formats:
+
+1. OpenAI Standard (DeepSeek, OpenAI):
+```json
+{
+ "choices": [{
+ "delta": {
+ "content": "Hello",
+ "reasoning_content": "Let me think...",
+ "tool_calls": [{"id": "call_1", "function": {...}}]
+ }
+ }]
+}
+```
+
+2. Anthropic Streaming:
+```json
+{"type": "content_block_start", "content_block": {"type": "thinking", "thinking": "..."}}
+{"type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "..."}}
+{"type": "content_block_delta", "delta": {"type": "text_delta", "text": "..."}}
+{"type": "content_block_stop"}
+```
+
+3. MiniMax (with thinking tags in content):
+```json
+{
+ "choices": [{
+ "delta": {
+ "content": "<|im_start|>thinking分析中...<|im_end|>回复内容"
+ }
+ }]
+}
+```
+
+4. Standard thinking tags:
+```json
+{
+ "choices": [{
+ "delta": {
+      "content": "\u003cthink\u003e思考内容\u003c/think\u003e回复内容"
+ }
+ }]
+}
+```
+"""
+from typing import Dict, Any, Optional, List
+from dataclasses import dataclass
+
+
+@dataclass
+class ParsedDelta:
+    """One parsed slice of an LLM streaming response.
+
+    Attributes:
+        thinking: Reasoning text extracted from the response
+        text: Ordinary answer text
+        tool_calls: Tool call request dicts; None is normalised to []
+        is_complete: True when this delta closes a content block
+    """
+    thinking: str = ""
+    text: str = ""
+    tool_calls: List[Dict] = None
+    is_complete: bool = False
+
+    def __post_init__(self):
+        # The None default stands in for a fresh per-instance empty list.
+        self.tool_calls = [] if self.tool_calls is None else self.tool_calls
+
+
+class LLMResponseParser:
+    """Unified parser for LLM API response formats.
+
+    Stateful: thinking and text accumulate in instance buffers until reset(),
+    so chunks from different streams fed to one instance end up merged.
+
+    Usage:
+        from luxx.services.llm_response import llm_parser
+
+        # Parse OpenAI format
+        delta = {"content": "Hello", "reasoning_content": "Thinking..."}
+        parsed = llm_parser.parse_openai(delta)
+
+        # Parse Anthropic format
+        chunk = {"type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "..."}}
+        parsed = llm_parser.parse_anthropic(chunk)
+
+        # Dispatch on an explicit provider name (nothing is auto-detected)
+        parsed = llm_parser.parse_chunk(chunk, provider="anthropic")
+    """
+
+    # Content block types
+    BLOCK_THINKING = "thinking"
+    BLOCK_TEXT = "text"
+    BLOCK_TOOL_USE = "tool_use"
+    BLOCK_TOOL_RESULT = "tool_result"
+
+    # (opening, closing) tag pairs, opening tags in lower case; the think pair is
+    # built from pieces so markup-stripping tools cannot erase the literal.
+    _THINKING_TAGS = (("<|im_start|>thinking", "<|im_end|>"), ("<" + "think>", "</" + "think>"))
+
+    def __init__(self):
+        self._buffer = ""
+        self._thinking_buffer = ""
+        self._text_buffer = ""
+
+    def reset(self):
+        """Reset parser state for new message."""
+        self._buffer = ""
+        self._thinking_buffer = ""
+        self._text_buffer = ""
+
+    def parse_openai(self, delta: Dict) -> ParsedDelta:
+        """Parse OpenAI format delta.
+
+        Handles:
+        - OpenAI: delta.content, delta.tool_calls
+        - DeepSeek: delta.content, delta.reasoning_content, delta.tool_calls
+        - MiniMax: <|im_start|>thinking...<|im_end|> in content
+        - Standard: a pair of think tags embedded in content
+
+        Args:
+            delta: Delta object from LLM API response
+
+        Returns:
+            ParsedDelta; thinking/text, when set, hold everything accumulated since reset()
+        """
+        result = ParsedDelta()
+
+        # Get thinking content (DeepSeek uses reasoning_content)
+        thinking = delta.get("reasoning_content") or delta.get("reasoning") or ""
+        if thinking:
+            self._thinking_buffer += thinking
+            result.thinking = self._thinking_buffer
+
+        # Get text content
+        text = delta.get("content") or ""
+        if text:
+            # Check for embedded thinking tags (MiniMax format)
+            thinking_part, clean_text = self._extract_thinking_tags(text)
+            if thinking_part:
+                self._thinking_buffer += thinking_part
+                result.thinking = self._thinking_buffer
+            if clean_text:
+                self._text_buffer += clean_text
+                result.text = self._text_buffer
+        elif thinking_part := delta.get("thinking"):
+            # Some providers use "thinking" field directly (only read when content is empty)
+            self._thinking_buffer += thinking_part
+            result.thinking = self._thinking_buffer
+
+        # Tool calls are passed through untouched; they are not accumulated here
+        result.tool_calls = delta.get("tool_calls") or []
+
+        return result
+
+    def parse_anthropic(self, chunk: Dict) -> ParsedDelta:
+        """Parse Anthropic streaming format.
+
+        Anthropic uses a different event structure:
+        - content_block_start: Begin a content block
+        - content_block_delta: Incremental content
+        - content_block_stop: End of content blocks
+
+        Content block types:
+        - thinking: Model reasoning
+        - text: Regular text
+        - tool_use: Tool invocation
+        - tool_result: Tool output
+
+        Args:
+            chunk: Anthropic SSE event chunk
+
+        Returns:
+            ParsedDelta with extracted content (cumulative since reset())
+        """
+        result = ParsedDelta()
+        chunk_type = chunk.get("type", "")
+
+        if chunk_type == "content_block_start":
+            block = chunk.get("content_block") or {}
+            if block.get("type") == self.BLOCK_THINKING:
+                thinking = block.get("thinking", "")
+                if thinking:
+                    self._thinking_buffer = thinking
+                    result.thinking = self._thinking_buffer
+
+        elif chunk_type == "content_block_delta":
+            delta = chunk.get("delta") or {}
+            delta_type = delta.get("type", "")
+
+            if delta_type == "thinking_delta":
+                thinking = delta.get("thinking") or ""
+                self._thinking_buffer += thinking
+                result.thinking = self._thinking_buffer
+
+            elif delta_type == "text_delta":
+                text = delta.get("text") or ""
+                self._text_buffer += text
+                result.text = self._text_buffer
+
+            elif delta_type in ("input_json_delta", "partial_json"):
+                # Tool input streams as input_json_delta (payload key partial_json); not parsed here
+                pass
+
+        elif chunk_type == "content_block_stop":
+            result.is_complete = True
+
+        return result
+
+    def parse_chunk(self, chunk: Dict, provider: str = "openai") -> ParsedDelta:
+        """Parse chunk based on provider.
+
+        Args:
+            chunk: Anthropic event dict, or for any other provider the delta object itself
+            provider: Provider name ("openai", "anthropic", "deepseek", "minimax")
+
+        Returns:
+            ParsedDelta with extracted content
+        """
+        if provider == "anthropic":
+            return self.parse_anthropic(chunk)
+
+        # Every other provider name is treated as OpenAI format
+        return self.parse_openai(chunk)
+
+    def _extract_thinking_tags(self, content: str) -> tuple:
+        """Extract thinking content from tags.
+
+        Handles multiple tag formats (opening tags match case-insensitively):
+        - MiniMax: <|im_start|>thinking...<|im_end|>
+        - Standard: content wrapped in a pair of think tags
+
+        An opening tag without a closing tag in the same string is kept as text,
+        so a tag pair split across two chunks is not recognised.
+
+        Args:
+            content: Raw content string from LLM
+
+        Returns:
+            Tuple of (thinking_content, clean_text)
+        """
+        thinking_parts = []
+        clean_parts = []
+        i = 0
+
+        while i < len(content):
+            for open_tag, end_tag in self._THINKING_TAGS:
+                # Lower only a tag-sized window instead of the whole remaining tail.
+                if not content[i:i + len(open_tag)].lower().startswith(open_tag):
+                    continue
+                # Skip exactly len(open_tag); a hard-coded offset dropped a character.
+                start = i + len(open_tag)
+                end = content.find(end_tag, start)
+                if end != -1:
+                    thinking_parts.append(content[start:end])
+                    i = end + len(end_tag)
+                    break
+            else:
+                # No complete tag pair starts here: regular character
+                clean_parts.append(content[i])
+                i += 1
+
+        return "".join(thinking_parts), "".join(clean_parts)
+
+    def has_thinking_tags(self, content: str) -> bool:
+        """Check if content contains thinking tags.
+
+        Args:
+            content: Raw content string
+
+        Returns:
+            True if content contains an opening thinking tag (any letter case)
+        """
+        if not content:
+            return False
+        lower = content.lower()
+        return any(open_tag in lower for open_tag, _ in self._THINKING_TAGS)
+
+
+# Global parser instance
+llm_parser = LLMResponseParser()
diff --git a/luxx/services/process_result.py b/luxx/services/process_result.py
new file mode 100644
index 0000000..91cc555
--- /dev/null
+++ b/luxx/services/process_result.py
@@ -0,0 +1,37 @@
+"""ProcessResult - Result of processing an SSE line."""
+
+
+class ProcessResult:
+    """Outcome of handling a single SSE line.
+
+    Attributes:
+        events: SSE event strings collected for the caller to yield
+        has_error: True once set_error() has been called
+        error_content: Message passed to set_error(), "" otherwise
+        has_content: True once set_content() has been called
+        has_tool_calls: True once set_tool_calls() has been called
+    """
+
+    def __init__(self):
+        self.events: list = []
+        self.has_error: bool = False
+        self.error_content: str = ""
+        self.has_content: bool = False
+        self.has_tool_calls: bool = False
+
+    def add_event(self, event: str):
+        """Append one SSE event string to events."""
+        self.events.append(event)
+
+    def set_error(self, content: str):
+        """Flag the result as failed and record the message."""
+        # Flag and message are always written as a pair.
+        self.has_error, self.error_content = True, content
+
+    def set_content(self):
+        """Record that content arrived."""
+        self.has_content = True
+
+    def set_tool_calls(self):
+        """Record that tool calls arrived."""
+        self.has_tool_calls = True
diff --git a/luxx/services/stream_context.py b/luxx/services/stream_context.py
new file mode 100644
index 0000000..7ebde9a
--- /dev/null
+++ b/luxx/services/stream_context.py
@@ -0,0 +1,185 @@
+"""StreamContext - Manages streaming state transitions during LLM response.
+
+Tracks steps in order:
+- thinking: Model reasoning content
+- text: Model response text
+- tool_call: Tool invocation request
+- tool_result: Tool execution result
+
+Each step has unique id and index for frontend rendering.
+"""
+import json
+from typing import List, Dict, Optional
+
+
+def _sse_event(event: str, data: dict) -> str:
+    """Build one SSE frame: event line, JSON data line (non-ASCII kept), blank line."""
+    return "event: {}\ndata: {}\n\n".format(event, json.dumps(data, ensure_ascii=False))
+
+
+class StreamContext:
+    """Manages streaming state transitions during LLM response."""
+
+    def __init__(self):
+        self.step_index = 0
+        self.current_step_id = None
+        self.current_step_idx = None
+        self.current_step_type = None
+        self.full_content = ""
+        self.full_thinking = ""
+        self.all_steps = []
+        self.all_tool_calls = []
+        self.all_tool_results = []
+        self.tool_calls_list = []
+        self._last_message_id = None
+        self._last_token_count = 0
+        self._last_usage = None
+
+    def reset(self):
+        """Clear per-iteration state; step_index and the all_* lists are kept."""
+        self.current_step_id = None
+        self.current_step_idx = None
+        self.current_step_type = None
+        self.full_content = ""
+        self.full_thinking = ""
+        self.tool_calls_list = []
+
+    def start_step(self, step_type: str) -> str:
+        """Open a step of the given type and return its id ("step-<index>")."""
+        self.current_step_idx = self.step_index
+        self.current_step_id = f"step-{self.step_index}"
+        self.current_step_type = step_type
+        self.step_index += 1
+        return self.current_step_id
+
+    def finalize_step(self):
+        """Append the current step to all_steps; no-op when no step was started."""
+        if self.current_step_id is None:
+            return
+
+        content = self.full_content if self.current_step_type == "text" else self.full_thinking
+        self.all_steps.append({
+            "id": self.current_step_id,
+            "index": self.current_step_idx,
+            "type": self.current_step_type,
+            "content": content
+        })
+
+    def handle_thinking(self, delta: Dict) -> Optional[str]:
+        """Accumulate delta["reasoning_content"]; return an SSE event, or None if empty."""
+        reasoning = delta.get("reasoning_content", "")
+        if not reasoning:
+            return None
+
+        if not self.full_thinking:
+            self.start_step("thinking")
+
+        self.full_thinking += reasoning
+        return _sse_event("process_step", {
+            "step": {
+                "id": self.current_step_id,
+                "index": self.current_step_idx,
+                "type": "thinking",
+                "content": self.full_thinking
+            }
+        })
+
+    def handle_text(self, delta: Dict) -> Optional[str]:
+        """Accumulate delta["content"]; return an SSE event, or None if empty."""
+        content = delta.get("content", "")
+        if not content:
+            return None
+
+        if not self.full_content:
+            self.start_step("text")
+
+        self.full_content += content
+        return _sse_event("process_step", {
+            "step": {
+                "id": self.current_step_id,
+                "index": self.current_step_idx,
+                "type": "text",
+                "content": self.full_content
+            }
+        })
+
+    def accumulate_tool_call(self, tc_delta: Dict):
+        """Merge one streamed tool-call fragment into the slot named by its index."""
+        idx = tc_delta.get("index") or 0
+        # Pad up to idx: a sparse or out-of-order index must not raise IndexError.
+        while idx >= len(self.tool_calls_list):
+            self.tool_calls_list.append(
+                {"id": "", "type": "function", "function": {"name": "", "arguments": ""}}
+            )
+        entry = self.tool_calls_list[idx]
+        if tc_delta.get("id") and not entry["id"]:
+            entry["id"] = tc_delta["id"]  # the id may arrive on a later fragment
+        func = tc_delta.get("function") or {}
+        for key in ("name", "arguments"):
+            if func.get(key):
+                entry["function"][key] += func[key]
+
+    def emit_tool_calls(self) -> List[str]:
+        """Record a tool_call step per accumulated call; return their SSE events."""
+        events = []
+        for tc in self.tool_calls_list:
+            step_id = f"step-{self.step_index}"
+            self.step_index += 1
+
+            step = {
+                "id": step_id,
+                "index": self.step_index - 1,
+                "type": "tool_call",
+                "id_ref": tc.get("id", ""),
+                "name": tc["function"]["name"],
+                "arguments": tc["function"]["arguments"]
+            }
+            self.all_steps.append(step)
+            self.all_tool_calls.append(tc)
+            events.append(_sse_event("process_step", {"step": step}))
+
+        return events
+
+    def emit_tool_result(self, result: Dict, ref_step_id: str) -> tuple:
+        """Record a tool_result step linked to ref_step_id; return (step, event)."""
+        step_id = f"step-{self.step_index}"
+        self.step_index += 1
+
+        content = result.get("content", "")
+        success = True
+        try:
+            parsed = json.loads(content)
+            if isinstance(parsed, dict):
+                success = parsed.get("success", True)
+        except (json.JSONDecodeError, TypeError):
+            pass  # content that is not JSON is treated as a successful result
+
+        step = {
+            "id": step_id,
+            "index": self.step_index - 1,
+            "type": "tool_result",
+            "id_ref": ref_step_id,
+            "name": result.get("name", ""),
+            "content": content,
+            "success": success
+        }
+        self.all_steps.append(step)
+        self.all_tool_results.append({
+            "role": "tool",
+            "tool_call_id": result.get("tool_call_id", ""),
+            "content": content
+        })
+
+        return step, _sse_event("process_step", {"step": step})
+
+    def set_completion(self, msg_id: str, token_count: int, usage: dict):
+        """Store the message id, token count and usage of the finished response."""
+        self._last_message_id = msg_id
+        self._last_token_count = token_count
+        self._last_usage = usage
+
+    def reset_completion(self):
+        """Clear the values stored by set_completion()."""
+        self._last_message_id = None
+        self._last_token_count = 0
+        self._last_usage = None