refactor: 优化文件结构

This commit is contained in:
ViperEkura 2026-04-24 12:45:04 +08:00
parent 08d2a2be98
commit 1bdccb437b
8 changed files with 1050 additions and 425 deletions

View File

@ -29,7 +29,11 @@ luxx/
│ ├── providers.py # LLM 提供商管理
│ └── tools.py # 工具管理
├── services/ # 服务层
│ ├── chat.py # 聊天服务 (Agentic Loop)
│ ├── chat.py # 聊天服务门面
│ ├── agentic_loop.py # Agentic Loop 执行器
│ ├── stream_context.py# 流式状态管理
│ ├── llm_response.py # LLM 响应解析器
│ ├── process_result.py# 处理结果
│ └── llm_client.py # LLM 客户端
├── tools/ # 工具系统
│ ├── core.py # 核心类 (ToolRegistry, ToolDefinition, ToolResult)
@ -308,6 +312,28 @@ ToolExecutor 返回结果
### 6. 服务层
#### LLMResponseParser (`services/llm_response.py`)
统一解析器,兼容多种 LLM API 格式:
- **OpenAI**: `delta.content`, `delta.tool_calls`
- **DeepSeek**: `delta.content`, `delta.reasoning_content`
- **Anthropic**: `content_block` 类型事件
- **MiniMax**: `<|im_start|>thinking...<|im_end|>` 标签
```python
from luxx.services.llm_response import llm_parser
# 解析 OpenAI 格式
parsed = llm_parser.parse_openai(delta)
# 解析 Anthropic 格式
parsed = llm_parser.parse_anthropic(chunk)
# 返回 ParsedDelta
parsed.thinking # 思考内容
parsed.text # 文本内容
parsed.tool_calls # 工具调用
```
#### ChatService (`services/chat.py`)
核心聊天服务:
- Agentic Loop 迭代执行(最多 10 轮)
@ -315,9 +341,22 @@ ToolExecutor 返回结果
- 工具调用编排(并行执行)
- 消息历史管理
- 自动重试机制
- 支持 thinking_content 提取
- Token 用量追踪
#### AgenticLoop (`services/agentic_loop.py`)
执行 Agentic Loop 的核心循环:
- 调用 LLM 获取响应
- 使用 LLMResponseParser 解析响应
- 管理 thinking/text/tool_call/tool_result 步骤
- 工具并行执行
#### StreamContext (`services/stream_context.py`)
流式状态管理:
- 追踪当前步骤类型和索引
- 累积 thinking 和 text 内容
- 管理 tool_calls 列表
- 生成 SSE 事件
#### LLMClient (`services/llm_client.py`)
LLM API 客户端:
- 多提供商DeepSeek、GLM、OpenAI
@ -364,23 +403,30 @@ sequenceDiagram
participant Client
participant API as POST /messages/stream
participant CS as ChatService
participant AL as AgenticLoop
participant Parser as LLMResponseParser
participant LLM as LLM API
participant TE as ToolExecutor
Client->>API: POST {content, tools, thinking_enabled}
API->>CS: stream_response()
CS->>AL: execute()
loop MAX_ITERATIONS (10)
CS->>LLM: call(messages, tools)
LLM-->>CS: SSE Stream
AL->>LLM: stream_call(messages, tools)
LLM-->>AL: SSE Stream
AL->>Parser: parse_chunk()
Parser-->>AL: ParsedDelta {thinking, text, tool_calls}
alt tool_calls
CS->>TE: process_tool_calls_parallel()
TE-->>CS: tool_results
CS->>CS: 追加到 messages
AL->>TE: process_tool_calls_parallel()
TE-->>AL: tool_results
AL->>AL: 追加到 messages
end
end
AL->>CS: done event
CS->>CS: _save_message()
CS->>API: SSE Stream
API-->>Client: 流式响应

View File

@ -1,3 +1,4 @@
"""Services module"""
from luxx.services.llm_client import LLMClient, llm_client, LLMResponse
from luxx.services.chat import ChatService, chat_service
from luxx.services.llm_response import LLMResponseParser, llm_parser, ParsedDelta

View File

@ -0,0 +1,275 @@
"""AgenticLoop - Executes the Agentic Loop: LLM + Tools iteration.
The loop:
1. Call LLM with messages and tools
2. Check for tool calls in response
3. Execute tools in parallel
4. Add results to messages
5. Repeat (max 10 iterations)
6. Return final response
"""
import json
import uuid
import logging
import traceback
from typing import List, Dict, Any, AsyncGenerator
from luxx.tools.executor import ToolExecutor
from luxx.services.llm_client import LLMClient
from luxx.services.stream_context import StreamContext, _sse_event
from luxx.services.process_result import ProcessResult
from luxx.services.llm_response import llm_parser
logger = logging.getLogger(__name__)
# Maximum iterations to prevent infinite loops
MAX_ITERATIONS = 10
def _parse_sse_line(line: str) -> tuple:
"""Parse SSE line into (event_type, data_str)."""
event_type = None
data_str = None
for part in line.strip().split('\n'):
if part.startswith('event: '):
event_type = part[7:].strip()
elif part.startswith('data: '):
data_str = part[6:].strip()
return event_type, data_str
class AgenticLoop:
"""Executes the Agentic Loop: LLM + Tools iteration."""
def __init__(self, tool_executor: ToolExecutor):
self.tool_executor = tool_executor
async def execute(
self,
llm: LLMClient,
model: str,
messages: List[Dict],
tools: list,
temperature: float,
max_tokens: int,
thinking_enabled: bool,
context: 'StreamContext',
tool_context: dict = None
) -> AsyncGenerator[str, None]:
"""Execute the agentic loop.
Yields SSE events for each step.
"""
total_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
for iteration in range(MAX_ITERATIONS):
context.reset()
has_error = False
# Stream LLM response
async for sse_line in llm.stream_call(
model=model,
messages=messages,
tools=tools,
temperature=temperature,
max_tokens=max_tokens,
thinking_enabled=thinking_enabled
):
# Process stream line
result = self._process_stream_line(sse_line, context, total_usage)
# Yield events
for event in result.events:
yield event
# Check for errors
if result.has_error:
has_error = True
break
# If error occurred, break the loop
if has_error:
break
# Finalize current step
context.finalize_step()
# Check for tool calls
if context.tool_calls_list:
# Execute tools and yield events
for event in self._execute_tools(context, messages, tool_context):
yield event
continue
# No tools - complete
for event in self._complete(context, total_usage):
yield event
return
# Max iterations exceeded or error occurred
if not has_error:
yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"})
def _process_stream_line(self, sse_line: str, ctx: 'StreamContext',
total_usage: dict) -> ProcessResult:
"""Process single SSE line from LLM, return result with events and flags."""
result = ProcessResult()
event_type, data_str = _parse_sse_line(sse_line)
if not data_str:
return result
# Handle upstream errors
if event_type == 'error':
try:
error_data = json.loads(data_str)
error_content = error_data.get("content", "Unknown error")
except json.JSONDecodeError:
error_content = data_str
result.set_error(error_content)
result.add_event(_sse_event("error", {"content": error_content}))
return result
try:
chunk = json.loads(data_str)
except json.JSONDecodeError:
error_msg = f"Parse error: {data_str[:50]}"
result.set_error(error_msg)
result.add_event(_sse_event("error", {"content": error_msg}))
return result
# Extract usage
if "usage" in chunk and chunk["usage"]:
usage = chunk["usage"]
total_usage.update({
"prompt_tokens": usage.get("prompt_tokens", 0),
"completion_tokens": usage.get("completion_tokens", 0),
"total_tokens": usage.get("total_tokens", 0)
})
# Handle API errors
if "error" in chunk:
error_msg = chunk["error"].get("message", str(chunk["error"]))
result.set_error(error_msg)
result.add_event(_sse_event("error", {"content": f"API Error: {error_msg}"}))
return result
# Get delta
choices = chunk.get("choices", [])
if not choices:
# Non-standard format: check for content directly
content = chunk.get("content") or ""
if content:
# Check for thinking tags in content
thinking_part, clean_text = llm_parser._extract_thinking_tags(content)
if thinking_part:
ctx.full_thinking = (ctx.full_thinking or "") + thinking_part
if not ctx.current_step_id or ctx.current_step_type != "thinking":
ctx.start_step("thinking")
result.add_event(_sse_event("process_step", {
"step": {"id": ctx.current_step_id, "index": ctx.current_step_idx, "type": "thinking", "content": ctx.full_thinking}
}))
result.set_content()
if clean_text:
ctx.full_content = (ctx.full_content or "") + clean_text
if not ctx.current_step_id or ctx.current_step_type != "text":
ctx.start_step("text")
result.add_event(_sse_event("process_step", {
"step": {"id": ctx.current_step_id, "index": ctx.current_step_idx, "type": "text", "content": ctx.full_content}
}))
result.set_content()
return result
delta = choices[0].get("delta", {})
# Parse delta using unified parser
parsed = llm_parser.parse_openai(delta)
# Process thinking content
if parsed.thinking:
ctx.full_thinking = parsed.thinking
if not ctx.current_step_id or ctx.current_step_type != "thinking":
ctx.start_step("thinking")
result.add_event(_sse_event("process_step", {
"step": {
"id": ctx.current_step_id,
"index": ctx.current_step_idx,
"type": "thinking",
"content": ctx.full_thinking
}
}))
result.set_content()
# Process text content
if parsed.text:
ctx.full_content = parsed.text
if not ctx.current_step_id or ctx.current_step_type != "text":
ctx.start_step("text")
result.add_event(_sse_event("process_step", {
"step": {
"id": ctx.current_step_id,
"index": ctx.current_step_idx,
"type": "text",
"content": ctx.full_content
}
}))
result.set_content()
# Accumulate tool calls
for tc in parsed.tool_calls or delta.get("tool_calls", []):
ctx.accumulate_tool_call(tc)
result.set_tool_calls()
return result
def _execute_tools(self, ctx: 'StreamContext', messages: list,
tool_context: dict = None) -> List[str]:
"""Execute tools and return list of events."""
events = []
# Emit tool call steps
for event in ctx.emit_tool_calls():
events.append(event)
# Execute in parallel
tool_results = self.tool_executor.process_tool_calls_parallel(
ctx.tool_calls_list, tool_context or {}
)
# Get tool call IDs for result linking
tool_ids = [tc.get("id") for tc in ctx.tool_calls_list]
tool_step_ids = [
s["id"] for s in ctx.all_steps
if s["type"] == "tool_call" and s.get("id_ref") in tool_ids
]
# Emit tool result steps
for i, (tr, tc) in enumerate(zip(tool_results, ctx.tool_calls_list)):
ref_id = tool_step_ids[i] if i < len(tool_step_ids) else f"step-{len(ctx.all_steps) - len(tool_results) + i}"
_, event = ctx.emit_tool_result(tr, ref_id)
events.append(event)
# Prepare for next iteration
messages.append({
"role": "assistant",
"content": ctx.full_content or "",
"tool_calls": ctx.tool_calls_list
})
messages.extend(ctx.all_tool_results[-len(tool_results):])
return events
def _complete(self, ctx: 'StreamContext', total_usage: dict) -> List[str]:
"""Complete the loop and return list of events."""
token_count = total_usage.get("completion_tokens") or len(ctx.full_content) // 4
msg_id = str(uuid.uuid4())
logger.info(f"[TOKEN] usage={total_usage}, count={token_count}")
ctx.set_completion(msg_id, token_count, total_usage)
return [_sse_event("done", {
"message_id": msg_id,
"token_count": token_count,
"usage": total_usage
})]

View File

@ -1,34 +1,99 @@
"""Chat service module"""
import json
import uuid
import logging
from typing import List, Dict, Any, AsyncGenerator, Optional
"""Chat service module with Agentic Loop pattern.
This module provides the core chat service that orchestrates:
- StreamContext: Manages streaming state transitions
- MessageBuilder: Constructs message lists
- AgenticLoop: Executes the agentic loop (LLM + tools iteration)
- ChatService: Core chat service facade
"""
import json
import logging
import traceback
import httpx
from typing import List, Dict, Any, AsyncGenerator
from luxx.models import Conversation, Message
from luxx.tools.executor import ToolExecutor
from luxx.tools.core import registry
from luxx.services.llm_client import LLMClient
from luxx.services.stream_context import StreamContext
from luxx.services.agentic_loop import AgenticLoop
from luxx.config import config
logger = logging.getLogger(__name__)
# Maximum iterations to prevent infinite loops
MAX_ITERATIONS = 10
def _sse_event(event: str, data: dict) -> str:
"""Format a Server-Sent Event string."""
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
# ============== MessageBuilder ==============
class MessageBuilder:
"""Builds message lists for LLM requests."""
def __init__(self):
self.messages = []
def add_system(self, content: str) -> 'MessageBuilder':
"""Add system message."""
self.messages.append({"role": "system", "content": content})
return self
def add_user(self, content: str, attachments: list = None) -> 'MessageBuilder':
"""Add user message in JSON format."""
msg_content = json.dumps({
"text": content,
"attachments": attachments or []
}, ensure_ascii=False)
self.messages.append({"role": "user", "content": msg_content})
return self
def add_assistant(self, content: str, tool_calls: list = None) -> 'MessageBuilder':
"""Add assistant message."""
msg = {"role": "assistant", "content": content}
if tool_calls:
msg["tool_calls"] = tool_calls
self.messages.append(msg)
return self
def add_tool_result(self, tool_call_id: str, content: str) -> 'MessageBuilder':
"""Add tool result message."""
self.messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"content": content
})
return self
def build(self) -> List[Dict]:
"""Build and return message list."""
return self.messages.copy()
@staticmethod
def extract_text(content: str) -> str:
"""Extract text from message content (supports JSON format)."""
if not content:
return ""
try:
parsed = json.loads(content)
if isinstance(parsed, dict):
return parsed.get("text", content)
except (json.JSONDecodeError, TypeError):
pass
return content
def get_llm_client(conversation: Conversation = None):
"""Get LLM client, optionally using conversation's provider. Returns (client, max_tokens)"""
max_tokens = None
if conversation and conversation.provider_id:
# ============== Factory Function ==============
def get_llm_client(conversation=None) -> tuple:
"""Get LLM client based on conversation provider. Returns (client, max_tokens)"""
from luxx.models import LLMProvider
from luxx.database import SessionLocal
max_tokens = None
if conversation and conversation.provider_id:
db = SessionLocal()
try:
provider = db.query(LLMProvider).filter(LLMProvider.id == conversation.provider_id).first()
provider = db.query(LLMProvider).filter(
LLMProvider.id == conversation.provider_id
).first()
if provider:
max_tokens = provider.max_tokens
client = LLMClient(
@ -40,184 +105,27 @@ def get_llm_client(conversation: Conversation = None):
finally:
db.close()
# Fallback to global config
client = LLMClient()
return client, max_tokens
return LLMClient(), max_tokens
class StreamContext:
"""Context for streaming response state management."""
def __init__(
self,
step_index: int = 0,
current_step_id: str = None,
current_step_idx: int = None,
current_stream_type: str = None,
full_content: str = "",
full_thinking: str = ""
):
self.step_index = step_index
self.current_step_id = current_step_id
self.current_step_idx = current_step_idx
self.current_stream_type = current_stream_type
self.full_content = full_content
self.full_thinking = full_thinking
self.all_steps = []
self.all_tool_calls = []
self.all_tool_results = []
self.tool_calls_list = []
def reset_iteration(self):
"""Reset streaming step tracker for new iteration."""
self.current_step_id = None
self.current_step_idx = None
self.current_stream_type = None
self.full_content = ""
self.full_thinking = ""
self.tool_calls_list = []
def start_stream_step(self, step_type: str) -> str:
"""Start a new streaming step. Returns the step_id."""
self.current_step_idx = self.step_index
self.current_step_id = f"step-{self.step_index}"
self.current_stream_type = step_type
self.step_index += 1
return self.current_step_id
def yield_stream_step(self, step_type: str, content: str) -> Dict[str, Any]:
"""Yield a streaming step event."""
return _sse_event("process_step", {
"step": {
"id": self.current_step_id,
"index": self.current_step_idx,
"type": step_type,
"content": content
}
})
def save_streaming_step(self):
"""Save the current streaming step to all_steps."""
if self.current_step_id is None:
return
if self.current_stream_type == "thinking":
self.all_steps.append({
"id": self.current_step_id,
"index": self.current_step_idx,
"type": "thinking",
"content": self.full_thinking
})
elif self.current_stream_type == "text":
self.all_steps.append({
"id": self.current_step_id,
"index": self.current_step_idx,
"type": "text",
"content": self.full_content
})
def handle_thinking_stream(self, delta: Dict) -> Optional[Dict]:
"""Handle reasoning/thinking delta. Returns yield_obj if step was yielded."""
reasoning = delta.get("reasoning_content", "")
if not reasoning:
return None
prev_len = len(self.full_thinking)
self.full_thinking += reasoning
if prev_len == 0: # New thinking stream started
self.start_stream_step("thinking")
return self.yield_stream_step("thinking", self.full_thinking)
def handle_text_stream(self, delta: Dict) -> Optional[Dict]:
"""Handle content delta. Returns yield_obj if step was yielded."""
content = delta.get("content", "")
if not content:
return None
prev_len = len(self.full_content)
self.full_content += content
if prev_len == 0: # New text stream started
self.start_stream_step("text")
return self.yield_stream_step("text", self.full_content)
def handle_tool_call(self) -> tuple:
"""Handle tool calls. Returns (tool_call_step_ids, tool_call_steps, yield_objs)."""
tool_call_step_ids = []
tool_call_steps = []
yield_objs = []
for tc in self.tool_calls_list:
call_step_idx = self.step_index
call_step_id = f"step-{self.step_index}"
tool_call_step_ids.append(call_step_id)
self.step_index += 1
call_step = {
"id": call_step_id,
"index": call_step_idx,
"type": "tool_call",
"id_ref": tc.get("id", ""),
"name": tc["function"]["name"],
"arguments": tc["function"]["arguments"]
}
tool_call_steps.append(call_step)
yield_objs.append(_sse_event("process_step", {"step": call_step}))
return tool_call_step_ids, tool_call_steps, yield_objs
def handle_tool_result(self, tool_result: Dict, tool_call_step_id: str) -> tuple:
"""Handle single tool result. Returns (result_step, yield_obj)."""
result_step_idx = self.step_index
result_step_id = f"step-{self.step_index}"
self.step_index += 1
content = tool_result.get("content", "")
success = True
try:
content_obj = json.loads(content)
if isinstance(content_obj, dict):
success = content_obj.get("success", True)
except:
pass
result_step = {
"id": result_step_id,
"index": result_step_idx,
"type": "tool_result",
"id_ref": tool_call_step_id,
"name": tool_result.get("name", ""),
"content": content,
"success": success
}
return result_step, _sse_event("process_step", {"step": result_step})
# ============== ChatService ==============
class ChatService:
"""Chat service with tool support"""
"""Core chat service with Agentic Loop support."""
def __init__(self):
self.tool_executor = ToolExecutor()
self.agentic_loop = AgenticLoop(self.tool_executor)
def build_messages(
self,
conversation: Conversation,
include_system: bool = True
) -> List[Dict[str, str]]:
"""Build message list"""
def build_messages(self, conversation, include_system: bool = True) -> List[Dict]:
"""Build message list from conversation history."""
from luxx.database import SessionLocal
from luxx.models import Message
messages = []
if include_system and conversation.system_prompt:
messages.append({
"role": "system",
"content": conversation.system_prompt
})
messages.append({"role": "system", "content": conversation.system_prompt})
db = SessionLocal()
try:
@ -226,28 +134,23 @@ class ChatService:
).order_by(Message.created_at).all()
for msg in db_messages:
# Parse JSON content if possible
try:
content_obj = json.loads(msg.content) if msg.content else {}
if isinstance(content_obj, dict):
content = content_obj.get("text", msg.content)
else:
content = msg.content
except (json.JSONDecodeError, TypeError):
content = msg.content
messages.append({
"role": msg.role,
"content": content
})
content = MessageBuilder.extract_text(msg.content)
messages.append({"role": msg.role, "content": content})
finally:
db.close()
return messages
def _get_tools(self, enabled_tools: list) -> list:
"""Filter tools based on enabled_tools list."""
if not enabled_tools:
return []
return [t for t in registry.list_all()
if t.get("function", {}).get("name") in enabled_tools]
async def stream_response(
self,
conversation: Conversation,
conversation,
user_message: str,
thinking_enabled: bool = False,
enabled_tools: list = None,
@ -255,256 +158,118 @@ class ChatService:
username: str = None,
workspace: str = None,
user_permission_level: int = 1
) -> AsyncGenerator[Dict[str, str], None]:
"""
Streaming response generator
Yields raw SSE event strings for direct forwarding.
"""
) -> AsyncGenerator[str, None]:
"""Streaming response with Agentic Loop."""
try:
# Build initial messages
messages = self.build_messages(conversation)
messages.append({
"role": "user",
"content": json.dumps({"text": user_message, "attachments": []})
})
# Get tools based on enabled_tools filter
if enabled_tools:
tools = [t for t in registry.list_all() if t.get("function", {}).get("name") in enabled_tools]
else:
tools = []
# Get tools and LLM client
tools = self._get_tools(enabled_tools)
llm, provider_max_tokens = get_llm_client(conversation)
model = conversation.model or llm.default_model or "gpt-4"
# 直接使用 provider 的 max_tokens
max_tokens = provider_max_tokens
max_tokens = provider_max_tokens or 8192
# Token usage tracking
total_usage = {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
}
actual_token_count = 0
# Streaming context for state management
ctx = StreamContext()
for iteration in range(MAX_ITERATIONS):
# Reset streaming context for this iteration
ctx.reset_iteration()
async for sse_line in llm.stream_call(
model=model,
messages=messages,
tools=tools,
temperature=conversation.temperature,
max_tokens=max_tokens or 8192,
thinking_enabled=thinking_enabled or conversation.thinking_enabled
):
# Parse SSE line
# Format: "event: xxx\ndata: {...}\n\n"
event_type = None
data_str = None
for line in sse_line.strip().split('\n'):
if line.startswith('event: '):
event_type = line[7:].strip()
elif line.startswith('data: '):
data_str = line[6:].strip()
if data_str is None:
continue
# Handle error events from LLM
if event_type == 'error':
try:
error_data = json.loads(data_str)
yield _sse_event("error", {"content": error_data.get("content", "Unknown error")})
except json.JSONDecodeError:
yield _sse_event("error", {"content": data_str})
return
# Parse the data
try:
chunk = json.loads(data_str)
except json.JSONDecodeError:
yield _sse_event("error", {"content": f"Failed to parse response: {data_str}"})
return
# 提取 API 返回的 usage 信息
if "usage" in chunk:
usage = chunk["usage"]
total_usage["prompt_tokens"] = usage.get("prompt_tokens", 0)
total_usage["completion_tokens"] = usage.get("completion_tokens", 0)
total_usage["total_tokens"] = usage.get("total_tokens", 0)
# Check for error in response
if "error" in chunk:
error_msg = chunk["error"].get("message", str(chunk["error"]))
yield _sse_event("error", {"content": f"API Error: {error_msg}"})
return
# Get delta
choices = chunk.get("choices", [])
if not choices:
# Check if there's any content in the response (for non-standard LLM responses)
if chunk.get("content") or chunk.get("message"):
content = chunk.get("content") or chunk.get("message", {}).get("content", "")
if content:
prev_len = len(ctx.full_content)
ctx.full_content += content
if prev_len == 0: # New text stream started
ctx.start_stream_step("text")
yield _sse_event("process_step", {
"step": {
"id": ctx.current_step_id if prev_len == 0 else f"step-{ctx.step_index - 1}",
"index": ctx.current_step_idx if prev_len == 0 else ctx.step_index - 1,
"type": "text",
"content": ctx.full_content
}
})
continue
delta = choices[0].get("delta", {})
# Handle reasoning (thinking)
yield_obj = ctx.handle_thinking_stream(delta)
if yield_obj:
yield yield_obj
# Handle content
yield_obj = ctx.handle_text_stream(delta)
if yield_obj:
yield yield_obj
# Accumulate tool calls
tool_calls_delta = delta.get("tool_calls", [])
for tc in tool_calls_delta:
idx = tc.get("index", 0)
if idx >= len(ctx.tool_calls_list):
ctx.tool_calls_list.append({
"id": tc.get("id", ""),
"type": "function",
"function": {"name": "", "arguments": ""}
})
func = tc.get("function", {})
if func.get("name"):
ctx.tool_calls_list[idx]["function"]["name"] += func["name"]
if func.get("arguments"):
ctx.tool_calls_list[idx]["function"]["arguments"] += func["arguments"]
# Save streaming step (thinking or text)
ctx.save_streaming_step()
# Handle tool calls
if ctx.tool_calls_list:
ctx.all_tool_calls.extend(ctx.tool_calls_list)
# Handle tool_call steps
tool_call_step_ids, tool_call_steps, yield_objs = ctx.handle_tool_call()
ctx.all_steps.extend(tool_call_steps)
for yield_obj in yield_objs:
yield yield_obj
# Execute tools
# Tool execution context
tool_context = {
"workspace": workspace,
"user_id": user_id,
"username": username,
"user_permission_level": user_permission_level
}
tool_results = self.tool_executor.process_tool_calls_parallel(
ctx.tool_calls_list, tool_context
)
# Handle tool_result steps
for i, tr in enumerate(tool_results):
tool_call_step_id = tool_call_step_ids[i] if i < len(tool_call_step_ids) else f"step-{i}"
result_step, yield_obj = ctx.handle_tool_result(tr, tool_call_step_id)
ctx.all_steps.append(result_step)
yield yield_obj
# Stream context
ctx = StreamContext()
ctx.all_tool_results.append({
"role": "tool",
"tool_call_id": tr.get("tool_call_id", ""),
"content": tr.get("content", "")
})
# Add assistant message with tool calls for next iteration
messages.append({
"role": "assistant",
"content": ctx.full_content or "",
"tool_calls": ctx.tool_calls_list
})
messages.extend(ctx.all_tool_results[-len(tool_results):])
ctx.all_tool_results = []
continue
# No tool calls - final iteration, save message
msg_id = str(uuid.uuid4())
# 使用 API 返回的真实 completion_tokens如果 API 没返回则降级使用估算值
actual_token_count = total_usage.get("completion_tokens", 0) or len(ctx.full_content) // 4
logger.info(f"[TOKEN] total_usage: {total_usage}, actual_token_count: {actual_token_count}")
# Execute agentic loop
async for event in self.agentic_loop.execute(
llm=llm,
model=model,
messages=messages,
tools=tools,
temperature=conversation.temperature,
max_tokens=max_tokens,
thinking_enabled=thinking_enabled or conversation.thinking_enabled,
context=ctx,
tool_context=tool_context
):
yield event
# Save message after successful completion (only if we have content)
if ctx._last_message_id and (ctx.full_content or ctx.all_tool_calls):
self._save_message(
conversation.id,
msg_id,
ctx._last_message_id,
ctx.full_content,
ctx.all_tool_calls,
ctx.all_tool_results,
ctx.all_steps,
actual_token_count,
total_usage
ctx._last_token_count,
ctx._last_usage
)
yield _sse_event("done", {
"message_id": msg_id,
"token_count": actual_token_count,
"usage": total_usage
})
return
# Max iterations exceeded - save message before error
if ctx.full_content or ctx.all_tool_calls:
msg_id = str(uuid.uuid4())
self._save_message(
conversation.id,
msg_id,
ctx.full_content,
ctx.all_tool_calls,
ctx.all_tool_results,
ctx.all_steps,
actual_token_count,
total_usage
)
yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"})
except Exception as e:
logger.error(f"Stream error: {e}")
logger.error(f"Stream error: {e}\n{traceback.format_exc()}")
yield _sse_event("error", {"content": str(e)})
def _save_message(
async def non_stream_response(
self,
conversation_id: str,
msg_id: str,
full_content: str,
all_tool_calls: list,
all_tool_results: list,
all_steps: list,
token_count: int = 0,
usage: dict = None
):
"""Save the assistant message to database."""
conversation,
user_message: str,
tools_enabled: bool = True,
thinking_enabled: bool = False
) -> Dict[str, Any]:
"""Non-streaming response for simple requests."""
try:
messages = self.build_messages(conversation)
messages.append({
"role": "user",
"content": json.dumps({"text": user_message, "attachments": []})
})
tools = [] if not tools_enabled else None
llm, max_tokens = get_llm_client(conversation)
model = conversation.model or llm.default_model or "gpt-4"
response = await llm.sync_call(
model=model,
messages=messages,
tools=tools,
temperature=conversation.temperature,
max_tokens=max_tokens or 8192,
thinking_enabled=thinking_enabled or conversation.thinking_enabled
)
return {
"success": True,
"content": response.content,
"tool_calls": response.tool_calls,
"usage": response.usage
}
except httpx.HTTPStatusError as e:
error_msg = f"HTTP {e.response.status_code}: {e.response.text[:200] if e.response else 'No response body'}"
logger.error(f"Non-stream HTTP error: {error_msg}")
return {"success": False, "error": error_msg}
except httpx.TimeoutException as e:
logger.error(f"Non-stream timeout: {e}")
return {"success": False, "error": "Request timeout"}
except Exception as e:
logger.error(f"Non-stream error: {type(e).__name__}: {e}\n{traceback.format_exc()}")
return {"success": False, "error": f"{type(e).__name__}: {str(e)}"}
def _save_message(self, conversation_id: str, msg_id: str, full_content: str,
all_tool_calls: list, all_tool_results: list, all_steps: list,
token_count: int = 0, usage: dict = None):
"""Save assistant message to database."""
from luxx.database import SessionLocal
from luxx.models import Message
content_json = {
"text": full_content,
"steps": all_steps
}
content_json = {"text": full_content, "steps": all_steps}
if all_tool_calls:
content_json["tool_calls"] = all_tool_calls
@ -520,12 +285,18 @@ class ChatService:
)
db.add(msg)
db.commit()
except Exception as e:
except Exception:
db.rollback()
raise
finally:
db.close()
# Global chat service
def _sse_event(event: str, data: dict) -> str:
"""Format a Server-Sent Event string."""
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
# ============== Global Singleton ==============
chat_service = ChatService()

View File

@ -2,6 +2,7 @@
import json
import httpx
import logging
import traceback
from typing import Dict, Any, Optional, List, AsyncGenerator
from luxx.config import config
@ -172,7 +173,7 @@ class LLMClient:
logger.error(f"HTTP error: {status_code}")
yield f"event: error\ndata: {json.dumps({'content': f'HTTP {status_code}: Request failed'})}\n\n"
except Exception as e:
logger.error(f"Exception: {type(e).__name__}: {str(e)}")
logger.error(f"Exception: {type(e).__name__}: {str(e)}\n{traceback.format_exc()}")
yield f"event: error\ndata: {json.dumps({'content': str(e)})}\n\n"

View File

@ -0,0 +1,309 @@
"""LLM Response Parser - Unified parser for multiple LLM API formats.
Supported Providers:
- OpenAI: delta.content, delta.tool_calls
- DeepSeek: delta.content, delta.reasoning_content, delta.tool_calls
- Anthropic: content_block with thinking/text types
- MiniMax: <|im_start|>thinking...<|im_end|> tags in content
Data Flow:
```
LLM API Response (SSE)
LLMResponseParser.parse_chunk()
ParsedDelta { thinking, text, tool_calls }
AgenticLoop._process_stream_line()
SSE Events (process_step)
type: "thinking"
type: "text"
type: "tool_call"
```
API Response Formats:
1. OpenAI Standard (DeepSeek, OpenAI):
```json
{
"choices": [{
"delta": {
"content": "Hello",
"reasoning_content": "Let me think...",
"tool_calls": [{"id": "call_1", "function": {...}}]
}
}]
}
```
2. Anthropic Streaming:
```json
{"type": "content_block_start", "content_block": {"type": "thinking", "thinking": "..."}}
{"type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "..."}}
{"type": "content_block_delta", "delta": {"type": "text_delta", "text": "..."}}
{"type": "content_block_stop"}
```
3. MiniMax (with thinking tags in content):
```json
{
"choices": [{
"delta": {
"content": "<|im_start|>thinking分析中...<|im_end|>回复内容"
}
}]
}
```
4. Standard thinking tags:
```json
{
"choices": [{
"delta": {
"content": "<think>思考内容</think>回复内容"
}
}]
}
```
"""
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
@dataclass
class ParsedDelta:
"""Parsed response delta from LLM.
Attributes:
thinking: Thinking/reasoning content
text: Regular text content
tool_calls: Tool call requests
is_complete: Whether this delta completes a content block
"""
thinking: str = ""
text: str = ""
tool_calls: List[Dict] = None
is_complete: bool = False
def __post_init__(self):
if self.tool_calls is None:
self.tool_calls = []
class LLMResponseParser:
"""Unified parser for LLM API response formats.
Usage:
from luxx.services.llm_response import llm_parser
# Parse OpenAI format
delta = {"content": "Hello", "reasoning_content": "Thinking..."}
parsed = llm_parser.parse_openai(delta)
# Parse Anthropic format
chunk = {"type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "..."}}
parsed = llm_parser.parse_anthropic(chunk)
# Auto-detect format
parsed = llm_parser.parse_chunk(chunk, provider="anthropic")
"""
# Content block types
BLOCK_THINKING = "thinking"
BLOCK_TEXT = "text"
BLOCK_TOOL_USE = "tool_use"
BLOCK_TOOL_RESULT = "tool_result"
def __init__(self):
self._buffer = ""
self._thinking_buffer = ""
self._text_buffer = ""
def reset(self):
"""Reset parser state for new message."""
self._buffer = ""
self._thinking_buffer = ""
self._text_buffer = ""
def parse_openai(self, delta: Dict) -> ParsedDelta:
"""Parse OpenAI format delta.
Handles:
- OpenAI: delta.content, delta.tool_calls
- DeepSeek: delta.content, delta.reasoning_content, delta.tool_calls
- MiniMax: <|im_start|>thinking...<|im_end|> in content
- Standard: <think>...</think> in content
Args:
delta: Delta object from LLM API response
Returns:
ParsedDelta with extracted thinking, text, and tool_calls
"""
result = ParsedDelta()
# Get thinking content (DeepSeek uses reasoning_content)
thinking = delta.get("reasoning_content") or delta.get("reasoning") or ""
if thinking:
self._thinking_buffer += thinking
result.thinking = self._thinking_buffer
# Get text content
text = delta.get("content") or ""
if text:
# Check for embedded thinking tags (MiniMax format)
thinking_part, clean_text = self._extract_thinking_tags(text)
if thinking_part:
self._thinking_buffer += thinking_part
result.thinking = self._thinking_buffer
if clean_text:
self._text_buffer += clean_text
result.text = self._text_buffer
elif thinking_part := delta.get("thinking"):
# Some providers use "thinking" field directly
self._thinking_buffer += thinking_part
result.thinking = self._thinking_buffer
# Tool calls
result.tool_calls = delta.get("tool_calls") or []
return result
def parse_anthropic(self, chunk: Dict) -> ParsedDelta:
"""Parse Anthropic streaming format.
Anthropic uses a different event structure:
- content_block_start: Begin a content block
- content_block_delta: Incremental content
- content_block_stop: End of content blocks
Content block types:
- thinking: Model reasoning
- text: Regular text
- tool_use: Tool invocation
- tool_result: Tool output
Args:
chunk: Anthropic SSE event chunk
Returns:
ParsedDelta with extracted content
"""
result = ParsedDelta()
chunk_type = chunk.get("type", "")
if chunk_type == "content_block_start":
block = chunk.get("content_block", {})
if block.get("type") == self.BLOCK_THINKING:
thinking = block.get("thinking", "")
if thinking:
self._thinking_buffer = thinking
result.thinking = self._thinking_buffer
elif chunk_type == "content_block_delta":
delta = chunk.get("delta", {})
delta_type = delta.get("type", "")
if delta_type == "thinking_delta":
thinking = delta.get("thinking", "")
self._thinking_buffer += thinking
result.thinking = self._thinking_buffer
elif delta_type == "text_delta":
text = delta.get("text", "")
self._text_buffer += text
result.text = self._text_buffer
elif delta_type == "partial_json":
# Partial JSON for tool calls
pass
elif chunk_type == "content_block_stop":
result.is_complete = True
return result
def parse_chunk(self, chunk: Dict, provider: str = "openai") -> ParsedDelta:
"""Parse chunk based on provider.
Args:
chunk: Response chunk from LLM
provider: Provider name ("openai", "anthropic", "deepseek", "minimax")
Returns:
ParsedDelta with extracted content
"""
if provider == "anthropic":
return self.parse_anthropic(chunk)
# Default to OpenAI format
return self.parse_openai(chunk)
def _extract_thinking_tags(self, content: str) -> tuple:
"""Extract thinking content from tags.
Handles multiple tag formats:
- MiniMax: <|im_start|>thinking...<|im_end|>
- Standard: <think>...</think>
Args:
content: Raw content string from LLM
Returns:
Tuple of (thinking_content, clean_text)
"""
thinking_parts = []
clean_parts = []
i = 0
while i < len(content):
remaining = content[i:].lower()
# Check for MiniMax format
if remaining.startswith("<|im_start|>thinking"):
end_tag = "<|im_end|>"
start = i + 21 # len("<|im_start|>thinking")
end = content.find(end_tag, start)
if end != -1:
thinking_parts.append(content[start:end])
i = end + len(end_tag)
continue
# Check for standard format
if remaining.startswith("<think>"):
end_tag = "</think>"
start = i + 7 # len("<think>")
end = content.find(end_tag, start)
if end != -1:
thinking_parts.append(content[start:end])
i = end + len(end_tag)
continue
# Regular character
clean_parts.append(content[i])
i += 1
return "".join(thinking_parts), "".join(clean_parts)
def has_thinking_tags(self, content: str) -> bool:
"""Check if content contains thinking tags.
Args:
content: Raw content string
Returns:
True if content contains thinking tags
"""
if not content:
return False
lower = content.lower()
return "<|im_start|>thinking" in lower or "<think>" in lower
# Global parser instance
llm_parser = LLMResponseParser()

View File

@ -0,0 +1,37 @@
"""ProcessResult - Result of processing an SSE line."""
class ProcessResult:
"""Result of processing an SSE line.
Attributes:
events: List of SSE event strings to yield
has_error: Whether an error occurred
error_content: Error message if any
has_content: Whether content was received
has_tool_calls: Whether tool calls were received
"""
def __init__(self):
self.events: list = []
self.has_error: bool = False
self.error_content: str = ""
self.has_content: bool = False
self.has_tool_calls: bool = False
def add_event(self, event: str):
"""Add an event to the result."""
self.events.append(event)
def set_error(self, content: str):
"""Set error state."""
self.has_error = True
self.error_content = content
def set_content(self):
"""Mark that content was received."""
self.has_content = True
def set_tool_calls(self):
"""Mark that tool calls were received."""
self.has_tool_calls = True

View File

@ -0,0 +1,185 @@
"""StreamContext - Manages streaming state transitions during LLM response.
Tracks steps in order:
- thinking: Model reasoning content
- text: Model response text
- tool_call: Tool invocation request
- tool_result: Tool execution result
Each step has unique id and index for frontend rendering.
"""
import json
from typing import List, Dict, Optional
def _sse_event(event: str, data: dict) -> str:
"""Format a Server-Sent Event string."""
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
class StreamContext:
"""Manages streaming state transitions during LLM response."""
def __init__(self):
self.step_index = 0
self.current_step_id = None
self.current_step_idx = None
self.current_step_type = None
self.full_content = ""
self.full_thinking = ""
self.all_steps = []
self.all_tool_calls = []
self.all_tool_results = []
self.tool_calls_list = []
self._last_message_id = None
self._last_token_count = 0
self._last_usage = None
def reset(self):
"""Reset state for new iteration."""
self.current_step_id = None
self.current_step_idx = None
self.current_step_type = None
self.full_content = ""
self.full_thinking = ""
self.tool_calls_list = []
def start_step(self, step_type: str) -> str:
"""Start a new step with unique ID."""
self.current_step_idx = self.step_index
self.current_step_id = f"step-{self.step_index}"
self.current_step_type = step_type
self.step_index += 1
return self.current_step_id
def finalize_step(self):
"""Save current step to all_steps."""
if self.current_step_id is None:
return
content = self.full_content if self.current_step_type == "text" else self.full_thinking
self.all_steps.append({
"id": self.current_step_id,
"index": self.current_step_idx,
"type": self.current_step_type,
"content": content
})
def handle_thinking(self, delta: Dict) -> Optional[str]:
"""Handle reasoning delta from LLM."""
reasoning = delta.get("reasoning_content", "")
if not reasoning:
return None
if not self.full_thinking:
self.start_step("thinking")
self.full_thinking += reasoning
return _sse_event("process_step", {
"step": {
"id": self.current_step_id,
"index": self.current_step_idx,
"type": "thinking",
"content": self.full_thinking
}
})
def handle_text(self, delta: Dict) -> Optional[str]:
"""Handle content delta from LLM."""
content = delta.get("content", "")
if not content:
return None
if not self.full_content:
self.start_step("text")
self.full_content += content
return _sse_event("process_step", {
"step": {
"id": self.current_step_id,
"index": self.current_step_idx,
"type": "text",
"content": self.full_content
}
})
def accumulate_tool_call(self, tc_delta: Dict):
"""Accumulate tool call delta."""
idx = tc_delta.get("index", 0)
if idx >= len(self.tool_calls_list):
self.tool_calls_list.append({
"id": tc_delta.get("id", ""),
"type": "function",
"function": {"name": "", "arguments": ""}
})
func = tc_delta.get("function", {})
if func.get("name"):
self.tool_calls_list[idx]["function"]["name"] += func["name"]
if func.get("arguments"):
self.tool_calls_list[idx]["function"]["arguments"] += func["arguments"]
def emit_tool_calls(self) -> List[str]:
"""Emit tool call steps, return SSE events."""
events = []
for tc in self.tool_calls_list:
step_id = f"step-{self.step_index}"
self.step_index += 1
step = {
"id": step_id,
"index": self.step_index - 1,
"type": "tool_call",
"id_ref": tc.get("id", ""),
"name": tc["function"]["name"],
"arguments": tc["function"]["arguments"]
}
self.all_steps.append(step)
self.all_tool_calls.append(tc)
events.append(_sse_event("process_step", {"step": step}))
return events
def emit_tool_result(self, result: Dict, ref_step_id: str) -> tuple:
"""Emit tool result step, return (step, event)."""
step_id = f"step-{self.step_index}"
self.step_index += 1
content = result.get("content", "")
success = True
try:
parsed = json.loads(content)
if isinstance(parsed, dict):
success = parsed.get("success", True)
except (json.JSONDecodeError, TypeError):
pass
step = {
"id": step_id,
"index": self.step_index - 1,
"type": "tool_result",
"id_ref": ref_step_id,
"name": result.get("name", ""),
"content": content,
"success": success
}
self.all_steps.append(step)
self.all_tool_results.append({
"role": "tool",
"tool_call_id": result.get("tool_call_id", ""),
"content": content
})
return step, _sse_event("process_step", {"step": step})
def set_completion(self, msg_id: str, token_count: int, usage: dict):
"""Set completion info for saving."""
self._last_message_id = msg_id
self._last_token_count = token_count
self._last_usage = usage
def reset_completion(self):
"""Reset completion info."""
self._last_message_id = None
self._last_token_count = 0
self._last_usage = None