From 78af48c8c444f731e1c7b32094b418ef871ee57f Mon Sep 17 00:00:00 2001
From: ViperEkura <3081035982@qq.com>
Date: Sat, 25 Apr 2026 13:17:05 +0800
Subject: [PATCH] =?UTF-8?q?refactor:=20=E5=A2=9E=E5=8A=A0=E6=B6=88?=
=?UTF-8?q?=E6=81=AF=E9=80=82=E9=85=8D?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
luxx/routes/messages.py | 3 +-
luxx/services/__init__.py | 6 +-
luxx/services/agentic_loop.py | 148 +++----
luxx/services/chat.py | 6 +-
luxx/services/llm_adapters/__init__.py | 24 ++
.../llm_adapters/anthropic_adapter.py | 336 ++++++++++++++++
luxx/services/llm_adapters/base.py | 97 +++++
luxx/services/llm_adapters/openai_adapter.py | 191 +++++++++
luxx/services/llm_client.py | 362 ++++++++++++------
luxx/services/llm_response.py | 338 +++-------------
luxx/services/stream_context.py | 38 --
11 files changed, 990 insertions(+), 559 deletions(-)
create mode 100644 luxx/services/llm_adapters/__init__.py
create mode 100644 luxx/services/llm_adapters/anthropic_adapter.py
create mode 100644 luxx/services/llm_adapters/base.py
create mode 100644 luxx/services/llm_adapters/openai_adapter.py
diff --git a/luxx/routes/messages.py b/luxx/routes/messages.py
index fac223d..f9bc76f 100644
--- a/luxx/routes/messages.py
+++ b/luxx/routes/messages.py
@@ -10,11 +10,12 @@ from datetime import datetime
from luxx.database import get_db
from luxx.models import Conversation, Message, User
from luxx.routes.auth import get_current_user
-from luxx.services.chat import chat_service
+from luxx.services.chat import create_chat_service
from luxx.utils.helpers import generate_id, success_response, error_response
router = APIRouter(prefix="/messages", tags=["Messages"])
+chat_service = create_chat_service()
class MessageCreate(BaseModel):
diff --git a/luxx/services/__init__.py b/luxx/services/__init__.py
index 6cb90eb..a18b4b9 100644
--- a/luxx/services/__init__.py
+++ b/luxx/services/__init__.py
@@ -1,4 +1,4 @@
"""Services module"""
-from luxx.services.llm_client import LLMClient, llm_client, LLMResponse
-from luxx.services.chat import ChatService, chat_service
-from luxx.services.llm_response import LLMResponseParser, llm_parser, ParsedDelta
+from luxx.services.llm_client import LLMClient
+from luxx.services.llm_response import ParsedDelta, LLMResponse
+from luxx.services.chat import ChatService, create_chat_service
diff --git a/luxx/services/agentic_loop.py b/luxx/services/agentic_loop.py
index 15fb2cb..d5ed504 100644
--- a/luxx/services/agentic_loop.py
+++ b/luxx/services/agentic_loop.py
@@ -8,17 +8,15 @@ The loop:
5. Repeat (max 10 iterations)
6. Return final response
"""
-import json
import uuid
import logging
-import traceback
-from typing import List, Dict, Any, AsyncGenerator
+from typing import List, Dict, AsyncGenerator
from luxx.tools.executor import ToolExecutor
from luxx.services.llm_client import LLMClient
from luxx.services.stream_context import StreamContext, _sse_event
from luxx.services.process_result import ProcessResult
-from luxx.services.llm_response import llm_parser
+from luxx.services.llm_response import ParsedDelta
logger = logging.getLogger(__name__)
@@ -26,20 +24,11 @@ logger = logging.getLogger(__name__)
MAX_ITERATIONS = 10
-def _parse_sse_line(line: str) -> tuple:
- """Parse SSE line into (event_type, data_str)."""
- event_type = None
- data_str = None
- for part in line.strip().split('\n'):
- if part.startswith('event: '):
- event_type = part[7:].strip()
- elif part.startswith('data: '):
- data_str = part[6:].strip()
- return event_type, data_str
-
-
class AgenticLoop:
- """Executes the Agentic Loop: LLM + Tools iteration."""
+ """Executes the Agentic Loop: LLM + Tools iteration.
+
+ Supports multiple LLM Providers, auto-adapts response format.
+ """
def __init__(self, tool_executor: ToolExecutor):
self.tool_executor = tool_executor
@@ -66,8 +55,8 @@ class AgenticLoop:
context.reset()
has_error = False
- # Stream LLM response
- async for sse_line in llm.stream_call(
+ # Stream LLM response - now yields ParsedDelta directly
+ async for delta in llm.stream_call(
model=model,
messages=messages,
tools=tools,
@@ -75,8 +64,8 @@ class AgenticLoop:
max_tokens=max_tokens,
thinking_enabled=thinking_enabled
):
- # Process stream line
- result = self._process_stream_line(sse_line, context, total_usage)
+ # Process parsed delta
+ result = self._process_delta(delta, context, total_usage)
# Yield events
for event in result.events:
@@ -110,85 +99,41 @@ class AgenticLoop:
if not has_error:
yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"})
- def _process_stream_line(self, sse_line: str, ctx: 'StreamContext',
- total_usage: dict) -> ProcessResult:
- """Process single SSE line from LLM, return result with events and flags."""
+ def _process_delta(
+ self,
+ delta: ParsedDelta,
+ ctx: 'StreamContext',
+ total_usage: dict
+ ) -> ProcessResult:
+ """Process ParsedDelta from adapter, return result with events and flags.
+
+ Args:
+ delta: ParsedDelta from LLM adapter
+ ctx: StreamContext for state management
+ total_usage: Accumulated token usage
+
+ Returns:
+ ProcessResult with events and flags
+ """
result = ProcessResult()
- event_type, data_str = _parse_sse_line(sse_line)
- if not data_str:
+
+ # Check for error (empty delta with no content)
+ if not delta.has_content() and not delta.is_complete:
+ # Empty delta, possibly an error
return result
- # Handle upstream errors
- if event_type == 'error':
- try:
- error_data = json.loads(data_str)
- error_content = error_data.get("content", "Unknown error")
- except json.JSONDecodeError:
- error_content = data_str
- result.set_error(error_content)
- result.add_event(_sse_event("error", {"content": error_content}))
- return result
-
- try:
- chunk = json.loads(data_str)
- except json.JSONDecodeError:
- error_msg = f"Parse error: {data_str[:50]}"
- result.set_error(error_msg)
- result.add_event(_sse_event("error", {"content": error_msg}))
- return result
-
- # Extract usage
- if "usage" in chunk and chunk["usage"]:
- usage = chunk["usage"]
+ # Update usage
+ if delta.usage:
total_usage.update({
- "prompt_tokens": usage.get("prompt_tokens", 0),
- "completion_tokens": usage.get("completion_tokens", 0),
- "total_tokens": usage.get("total_tokens", 0)
+ "prompt_tokens": delta.usage.get("prompt_tokens", 0),
+ "completion_tokens": delta.usage.get("completion_tokens", 0),
+ "total_tokens": delta.usage.get("total_tokens", 0)
})
- # Handle API errors
- if "error" in chunk:
- error_msg = chunk["error"].get("message", str(chunk["error"]))
- result.set_error(error_msg)
- result.add_event(_sse_event("error", {"content": f"API Error: {error_msg}"}))
- return result
-
- # Get delta
- choices = chunk.get("choices", [])
- if not choices:
- # Non-standard format: check for content directly
- content = chunk.get("content") or ""
- if content:
- # Check for thinking tags in content
- thinking_part, clean_text = llm_parser._extract_thinking_tags(content)
-
- if thinking_part:
- ctx.full_thinking = (ctx.full_thinking or "") + thinking_part
- if not ctx.current_step_id or ctx.current_step_type != "thinking":
- ctx.start_step("thinking")
- result.add_event(_sse_event("process_step", {
- "step": {"id": ctx.current_step_id, "index": ctx.current_step_idx, "type": "thinking", "content": ctx.full_thinking}
- }))
- result.set_content()
-
- if clean_text:
- ctx.full_content = (ctx.full_content or "") + clean_text
- if not ctx.current_step_id or ctx.current_step_type != "text":
- ctx.start_step("text")
- result.add_event(_sse_event("process_step", {
- "step": {"id": ctx.current_step_id, "index": ctx.current_step_idx, "type": "text", "content": ctx.full_content}
- }))
- result.set_content()
- return result
-
- delta = choices[0].get("delta", {})
-
- # Parse delta using unified parser
- parsed = llm_parser.parse_openai(delta)
-
- # Process thinking content
- if parsed.thinking:
- ctx.full_thinking = parsed.thinking
+ # Process thinking content (incremental)
+ if delta.thinking:
+ logger.debug(f"Processing thinking: {delta.thinking[:50]}...")
+ ctx.full_thinking += delta.thinking # Accumulate incremental content
if not ctx.current_step_id or ctx.current_step_type != "thinking":
ctx.start_step("thinking")
result.add_event(_sse_event("process_step", {
@@ -201,9 +146,9 @@ class AgenticLoop:
}))
result.set_content()
- # Process text content
- if parsed.text:
- ctx.full_content = parsed.text
+ # Process text content (incremental)
+ if delta.text:
+ ctx.full_content += delta.text # Accumulate incremental content
if not ctx.current_step_id or ctx.current_step_type != "text":
ctx.start_step("text")
result.add_event(_sse_event("process_step", {
@@ -216,10 +161,11 @@ class AgenticLoop:
}))
result.set_content()
- # Accumulate tool calls
- for tc in parsed.tool_calls or delta.get("tool_calls", []):
- ctx.accumulate_tool_call(tc)
- result.set_tool_calls()
+ # Process tool calls
+ if delta.tool_calls:
+ for tc in delta.tool_calls:
+ ctx.accumulate_tool_call(tc)
+ result.set_tool_calls()
return result
diff --git a/luxx/services/chat.py b/luxx/services/chat.py
index 3d67b72..a38f893 100644
--- a/luxx/services/chat.py
+++ b/luxx/services/chat.py
@@ -297,6 +297,6 @@ def _sse_event(event: str, data: dict) -> str:
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
-# ============== Global Singleton ==============
-
-chat_service = ChatService()
+def create_chat_service() -> ChatService:
+    """Return a new ChatService; every call constructs a separate instance."""
+    return ChatService()
diff --git a/luxx/services/llm_adapters/__init__.py b/luxx/services/llm_adapters/__init__.py
new file mode 100644
index 0000000..298197e
--- /dev/null
+++ b/luxx/services/llm_adapters/__init__.py
@@ -0,0 +1,24 @@
+"""LLM Provider Adapters
+
+Adapter module for various LLM API formats.
+
+Adapter types:
+- OpenAIAdapter: OpenAI/DeepSeek/GLM compatible APIs
+- AnthropicAdapter: Anthropic Claude API
+
+Usage:
+ from luxx.services.llm_adapters import OpenAIAdapter, AnthropicAdapter
+
+ adapter = OpenAIAdapter()
+ # Or use LLMClient for automatic selection
+"""
+
+from .base import ProviderAdapter
+from .openai_adapter import OpenAIAdapter
+from .anthropic_adapter import AnthropicAdapter
+
+__all__ = [
+ "ProviderAdapter",
+ "OpenAIAdapter",
+ "AnthropicAdapter",
+]
diff --git a/luxx/services/llm_adapters/anthropic_adapter.py b/luxx/services/llm_adapters/anthropic_adapter.py
new file mode 100644
index 0000000..e316a95
--- /dev/null
+++ b/luxx/services/llm_adapters/anthropic_adapter.py
@@ -0,0 +1,395 @@
+"""Anthropic Adapter - Anthropic Claude API adapter
+
+Supports Anthropic Claude API streaming and non-streaming responses.
+"""
+import json
+import logging
+from typing import Dict, List, Any, AsyncGenerator
+
+from .base import ProviderAdapter
+from ..llm_response import ParsedDelta, LLMResponse
+
+logger = logging.getLogger(__name__)
+
+
+class AnthropicAdapter(ProviderAdapter):
+    """Anthropic Claude API adapter
+
+    Pure parsing adapter - no internal state management.
+    Each parse_stream_chunk call returns incremental content.
+    Accumulation is handled by the consumer (AgenticLoop).
+
+    Anthropic API uses a completely different format from OpenAI:
+    - Endpoint: POST /v1/messages
+    - Auth: "x-api-key" header (not "Authorization: Bearer")
+    - Streaming: SSE events (content_block_start, content_block_delta, etc.)
+    - Thinking: Independent thinking type content block
+    - Tools: tool_use type content block
+
+    Incoming messages and tools may be in OpenAI chat-completions shape
+    (role "tool", assistant "tool_calls", {"type": "function", ...} tools);
+    they are translated to the Anthropic equivalents by build_request.
+
+    Reference: https://docs.anthropic.com/claude/reference/messages
+    """
+
+    # Anthropic API endpoint suffix
+    MESSAGES_PATH = "/v1/messages"
+
+    # Anthropic API version
+    ANTHROPIC_VERSION = "2023-06-01"
+
+    # Content block types
+    BLOCK_MESSAGE_START = "message_start"
+    BLOCK_CONTENT_BLOCK_START = "content_block_start"
+    BLOCK_CONTENT_BLOCK_DELTA = "content_block_delta"
+    BLOCK_CONTENT_BLOCK_STOP = "content_block_stop"
+    BLOCK_MESSAGE_DELTA = "message_delta"
+    BLOCK_MESSAGE_STOP = "message_stop"
+    BLOCK_ERROR = "error"
+
+    # Delta types
+    DELTA_THINKING = "thinking_delta"
+    DELTA_TEXT = "text_delta"
+    DELTA_INPUT_JSON = "input_json_delta"
+
+    # Content block subtypes
+    SUBTYPE_THINKING = "thinking"
+    SUBTYPE_TEXT = "text"
+    SUBTYPE_TOOL_USE = "tool_use"
+
+    @property
+    def provider_type(self) -> str:
+        return "anthropic"
+
+    def build_request(
+        self,
+        model: str,
+        messages: List[Dict[str, Any]],
+        tools: List[Dict[str, Any]] = None,
+        **kwargs
+    ) -> tuple[Dict[str, Any], Dict[str, str]]:
+        """Build an Anthropic Messages API request
+
+        Differences from the OpenAI request this project otherwise builds:
+        - The API key travels in the "x-api-key" header
+        - "max_tokens" is required (defaults to 4096 here)
+        - The system prompt is a top-level "system" field, not a message
+        - Tools use "input_schema" instead of "parameters"
+
+        Args:
+            model: Model name
+            messages: Message list (system entries are lifted into "system")
+            tools: Tool definition list, OpenAI-style or flat
+            **kwargs: api_key, stream, max_tokens, system, thinking_enabled,
+                thinking_budget_tokens, temperature, top_p, stop_sequences
+
+        Returns:
+            tuple: (body, headers) tuple
+        """
+        api_key = kwargs.get("api_key", "")
+
+        headers = {
+            "Content-Type": "application/json",
+            "x-api-key": api_key,
+            "anthropic-version": self.ANTHROPIC_VERSION
+        }
+
+        body = {
+            "model": model,
+            "messages": self._convert_messages(messages),
+            "stream": kwargs.get("stream", True),
+            # Callers may pass max_tokens=None explicitly; the field is required
+            "max_tokens": kwargs.get("max_tokens") or 4096
+        }
+
+        # System message: explicit kwarg wins, else the first system message
+        if "system" in kwargs:
+            body["system"] = kwargs["system"]
+        else:
+            for msg in messages:
+                if msg.get("role") == "system":
+                    body["system"] = msg.get("content", "")
+                    break
+
+        # Extended thinking
+        if kwargs.get("thinking_enabled"):
+            budget = kwargs.get("thinking_budget_tokens") or 10000
+            body["thinking"] = {"type": "enabled", "budget_tokens": budget}
+            # The API rejects max_tokens <= budget_tokens, so keep the
+            # requested answer length on top of the thinking budget
+            if body["max_tokens"] <= budget:
+                body["max_tokens"] += budget
+
+        # Tool definitions
+        if tools:
+            body["tools"] = self._convert_tools(tools)
+
+        # Optional parameters (None means "not set" and is left out)
+        for name in ("temperature", "top_p", "stop_sequences"):
+            if kwargs.get(name) is not None:
+                body[name] = kwargs[name]
+
+        return body, headers
+
+    def _convert_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """Convert messages to Anthropic format
+
+        Anthropic message format:
+        - role: user, assistant
+        - content: str or List[Dict]
+
+        OpenAI-style tool traffic is translated as well:
+        - role "tool" becomes a user message holding a tool_result block
+        - assistant "tool_calls" become tool_use blocks
+        System messages are skipped (build_request sends them separately).
+        """
+        anthropic_messages = []
+
+        for msg in messages:
+            role = msg.get("role", "user")
+
+            if role == "system":
+                continue  # System messages handled separately
+
+            anthropic_msg = {
+                # Tool results are sent from the user side in Anthropic's format
+                "role": "user" if role in ("user", "tool") else "assistant",
+                "content": []
+            }
+
+            content = msg.get("content", "")
+            tool_calls = msg.get("tool_calls") or []
+
+            if role == "tool" and not isinstance(content, list):
+                anthropic_msg["content"].append({
+                    "type": "tool_result",
+                    "tool_use_id": msg.get("tool_call_id", ""),
+                    "content": "" if content is None else str(content)
+                })
+            elif isinstance(content, str):
+                if content.strip():
+                    if tool_calls:
+                        # Needs block form so tool_use blocks can follow
+                        anthropic_msg["content"].append({"type": "text", "text": content})
+                    else:
+                        anthropic_msg["content"] = content
+            elif isinstance(content, list):
+                for item in content:
+                    if not isinstance(item, dict):
+                        # Bare values are not valid blocks; wrap them as text
+                        anthropic_msg["content"].append({"type": "text", "text": str(item)})
+                    elif item.get("type") == "tool_result":
+                        anthropic_msg["content"].append({
+                            "type": "tool_result",
+                            "tool_use_id": item.get("tool_use_id") or item.get("tool_call_id", ""),
+                            "content": item.get("content", "")
+                        })
+                    else:
+                        anthropic_msg["content"].append(item)
+
+            for call in tool_calls:
+                anthropic_msg["content"].append(self._convert_tool_call(call))
+
+            anthropic_messages.append(anthropic_msg)
+
+        return anthropic_messages
+
+    def _convert_tool_call(self, call: Dict[str, Any]) -> Dict[str, Any]:
+        """Turn one OpenAI-style tool call into an Anthropic tool_use block"""
+        function = call.get("function") or {}
+        arguments = function.get("arguments") or {}
+        if isinstance(arguments, str):
+            # OpenAI carries arguments as a JSON string; Anthropic wants an object
+            try:
+                arguments = json.loads(arguments)
+            except json.JSONDecodeError:
+                logger.warning(f"Unparseable tool arguments for call {call.get('id', '')}")
+                arguments = {}
+        if not isinstance(arguments, dict):
+            arguments = {}
+        return {
+            "type": "tool_use",
+            "id": call.get("id", ""),
+            "name": function.get("name", ""),
+            "input": arguments
+        }
+
+    def _convert_tools(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """Convert tool definitions to Anthropic format
+
+        Accepts both flat definitions ({"name", "description", "parameters"})
+        and the OpenAI wrapper ({"type": "function", "function": {...}}).
+        """
+        anthropic_tools = []
+
+        for tool in tools:
+            # Unwrap the OpenAI envelope when present
+            spec = tool.get("function") or tool
+            schema = spec.get("input_schema") or spec.get("parameters")
+            anthropic_tools.append({
+                "name": spec.get("name", ""),
+                "description": spec.get("description", ""),
+                "input_schema": schema or {"type": "object", "properties": {}}
+            })
+
+        return anthropic_tools
+
+    def reset(self):
+        """No-op for pure parsing adapter"""
+        pass
+
+    async def parse_stream_chunk(
+        self,
+        raw_chunk: str
+    ) -> AsyncGenerator[ParsedDelta, None]:
+        """Parse one Anthropic stream event
+
+        Returns incremental content - no accumulation.
+
+        Args:
+            raw_chunk: JSON payload of one event; a leading "data:" is tolerated
+
+        Yields:
+            At most one ParsedDelta; tool calls use the OpenAI delta shape
+            (index/id/function), the same shape OpenAIAdapter passes through.
+        """
+        payload = (raw_chunk or "").strip()
+        # Tolerate raw SSE lines: drop the "data:" field name, skip the rest
+        if payload.startswith("data:"):
+            payload = payload[5:].strip()
+        if not payload.startswith("{"):
+            return
+
+        try:
+            chunk = json.loads(payload)
+        except json.JSONDecodeError:
+            logger.debug(f"Skipping undecodable Anthropic chunk: {payload[:50]}")
+            return
+
+        chunk_type = chunk.get("type", "")
+
+        # Handle errors
+        if chunk_type == self.BLOCK_ERROR:
+            error = chunk.get("error") or {}
+            logger.error(
+                f"Anthropic API error: {error.get('type', 'unknown_error')}"
+                f" - {error.get('message', '')}"
+            )
+            yield ParsedDelta()
+            return
+
+        result = ParsedDelta()
+
+        if chunk_type == self.BLOCK_CONTENT_BLOCK_START:
+            block = chunk.get("content_block") or {}
+            block_type = block.get("type")
+
+            if block_type == self.SUBTYPE_THINKING:
+                # "thinking" holds the text itself (a string), not a nested dict
+                thinking_text = block.get("thinking")
+                result.thinking = thinking_text if isinstance(thinking_text, str) else ""
+
+            elif block_type == self.SUBTYPE_TOOL_USE:
+                # Opens a tool call; arguments follow as input_json_delta events
+                result.tool_calls = [{
+                    "index": chunk.get("index", 0),
+                    "id": block.get("id", ""),
+                    "type": "function",
+                    "function": {"name": block.get("name", ""), "arguments": ""}
+                }]
+
+        elif chunk_type == self.BLOCK_CONTENT_BLOCK_DELTA:
+            delta = chunk.get("delta") or {}
+            delta_type = delta.get("type", "")
+
+            if delta_type == self.DELTA_THINKING:
+                result.thinking = delta.get("thinking", "")
+
+            elif delta_type == self.DELTA_TEXT:
+                result.text = delta.get("text", "")
+
+            elif delta_type == self.DELTA_INPUT_JSON:
+                partial_json = delta.get("partial_json", "")
+                if partial_json:
+                    # Same index as the content_block_start that opened the
+                    # call, so fragments attach to the right tool
+                    result.tool_calls = [{
+                        "index": chunk.get("index", 0),
+                        "function": {"arguments": partial_json}
+                    }]
+
+        elif chunk_type == self.BLOCK_MESSAGE_DELTA:
+            delta = chunk.get("delta") or {}
+            usage = chunk.get("usage") or {}
+            input_tokens = usage.get("input_tokens", 0)
+            output_tokens = usage.get("output_tokens", 0)
+
+            result.usage = {
+                "prompt_tokens": input_tokens,
+                "completion_tokens": output_tokens,
+                "total_tokens": input_tokens + output_tokens
+            }
+
+            if delta.get("stop_reason"):
+                result.is_complete = True
+
+        elif chunk_type == self.BLOCK_MESSAGE_STOP:
+            result.is_complete = True
+
+        # message_start, content_block_stop and unknown event types set
+        # nothing on result
+        if result.has_content() or result.is_complete:
+            yield result
+
+    def parse_response(self, data: Dict[str, Any]) -> LLMResponse:
+        """Parse non-streaming response
+
+        Args:
+            data: Decoded JSON body of a Messages API response
+
+        Returns:
+            LLMResponse with text, thinking, tool calls and token usage
+        """
+        thinking = ""
+        text_content = ""
+        tool_calls = []
+
+        for block in data.get("content") or []:
+            if not isinstance(block, dict):
+                continue
+            block_type = block.get("type")
+
+            if block_type == self.SUBTYPE_THINKING:
+                # Concatenate so an earlier thinking block is not overwritten
+                thinking += block.get("thinking", "")
+            elif block_type == self.SUBTYPE_TEXT:
+                text_content += block.get("text", "")
+            elif block_type == self.SUBTYPE_TOOL_USE:
+                tool_calls.append({
+                    "id": block.get("id", ""),
+                    "name": block.get("name", ""),
+                    "input": block.get("input", {})
+                })
+
+        usage = data.get("usage") or {}
+        input_tokens = usage.get("input_tokens", 0)
+        output_tokens = usage.get("output_tokens", 0)
+
+        return LLMResponse(
+            content=text_content,
+            thinking=thinking,
+            tool_calls=tool_calls,
+            usage={
+                "prompt_tokens": input_tokens,
+                "completion_tokens": output_tokens,
+                "total_tokens": input_tokens + output_tokens
+            }
+        )
+
+    def supports_thinking(self) -> bool:
+        return True
+
+    def supports_tools(self) -> bool:
+        # build_request sends tools and the parsers surface tool_use blocks
+        return True
diff --git a/luxx/services/llm_adapters/base.py b/luxx/services/llm_adapters/base.py
new file mode 100644
index 0000000..33c3bfd
--- /dev/null
+++ b/luxx/services/llm_adapters/base.py
@@ -0,0 +1,97 @@
+"""ProviderAdapter - LLM Provider adapter base class
+
+Defines unified adapter interface that all Provider adapters must implement.
+"""
+from abc import ABC, abstractmethod
+from typing import Dict, List, Any, AsyncGenerator
+
+
+class ProviderAdapter(ABC):
+    """LLM Provider adapter base class
+
+    All LLM API adapters inherit from this class and implement its abstract members.
+    Adapters are responsible for:
+    1. Building requests in provider-specific format
+    2. Parsing streaming and non-streaming responses
+    3. Providing provider capability information
+
+    Attributes:
+        provider_type: Provider type identifier (abstract read-only property)
+    """
+
+    @property
+    @abstractmethod
+    def provider_type(self) -> str:
+        """Return provider type identifier
+
+        Returns:
+            str: Provider type, e.g., "openai", "anthropic"
+        """
+        pass
+
+    @abstractmethod
+    def build_request(
+        self,
+        model: str,
+        messages: List[Dict[str, Any]],
+        tools: List[Dict[str, Any]] = None,
+        **kwargs
+    ) -> tuple[Dict[str, Any], Dict[str, str]]:
+        """Build request body and headers
+
+        Args:
+            model: Model name
+            messages: Message list
+            tools: Tool definition list (None when no tools are offered)
+            **kwargs: Other parameters (temperature, max_tokens, thinking_enabled, etc.)
+
+        Returns:
+            tuple: (body, headers) tuple
+        """
+        pass
+
+    @abstractmethod
+    async def parse_stream_chunk(
+        self,
+        raw_chunk: str
+    ) -> AsyncGenerator[Any, None]:
+        """Parse one streaming response chunk (implementations are async generators)
+
+        Args:
+            raw_chunk: Raw SSE line or chunk data
+
+        Yields:
+            ParsedDelta object or other parsed results
+        """
+        pass
+
+    @abstractmethod
+    def parse_response(
+        self,
+        data: Dict[str, Any]
+    ) -> Any:
+        """Parse non-streaming response
+
+        Args:
+            data: API response data
+
+        Returns:
+            LLMResponse object
+        """
+        pass
+
+    def supports_thinking(self) -> bool:
+        """Whether provider supports thinking/reasoning (not abstract)
+
+        Returns:
+            bool: True if supported; this base implementation returns False
+        """
+        return False
+
+    def supports_tools(self) -> bool:
+        """Whether provider supports tool/function calls (not abstract)
+
+        Returns:
+            bool: True if supported; this base implementation returns False
+        """
+        return False
diff --git a/luxx/services/llm_adapters/openai_adapter.py b/luxx/services/llm_adapters/openai_adapter.py
new file mode 100644
index 0000000..3463d13
--- /dev/null
+++ b/luxx/services/llm_adapters/openai_adapter.py
@@ -0,0 +1,207 @@
+"""OpenAI Adapter - OpenAI-compatible API adapter
+
+Supports OpenAI, DeepSeek, GLM and other OpenAI-compatible APIs.
+"""
+import json
+import logging
+import re
+from typing import Dict, List, Any, AsyncGenerator, Optional
+
+from .base import ProviderAdapter
+from ..llm_response import ParsedDelta, LLMResponse
+
+logger = logging.getLogger(__name__)
+
+# <think>...</think> spans; an unterminated opening tag runs to end of text
+_THINK_BLOCK_RE = re.compile(r"<think>(.*?)(?:</think>|\Z)", re.IGNORECASE | re.DOTALL)
+# Closing tags left over without a matching opening tag
+_THINK_CLOSE_RE = re.compile(r"</think>", re.IGNORECASE)
+
+
+class OpenAIAdapter(ProviderAdapter):
+    """OpenAI-compatible API adapter
+
+    Pure parsing adapter - no internal state management.
+    Each parse_stream_chunk call returns incremental content.
+    Accumulation is handled by the consumer (AgenticLoop).
+
+    Reasoning text is taken from <think>...</think> tags in the content and
+    from the "reasoning_content" field some compatible APIs return.
+    """
+
+    # Request options copied into the body when the caller supplies a value
+    OPTIONAL_PARAMS = (
+        "temperature", "max_tokens", "top_p",
+        "frequency_penalty", "presence_penalty", "stop",
+    )
+
+    @property
+    def provider_type(self) -> str:
+        return "openai"
+
+    def build_request(
+        self,
+        model: str,
+        messages: List[Dict[str, Any]],
+        tools: List[Dict[str, Any]] = None,
+        **kwargs
+    ) -> tuple[Dict[str, Any], Dict[str, str]]:
+        """Build OpenAI-format request
+
+        Args:
+            model: Model name
+            messages: Message list, sent as-is
+            tools: Tool definition list, sent as-is
+            **kwargs: api_key, stream, thinking_enabled and OPTIONAL_PARAMS
+
+        Returns:
+            tuple: (body, headers) tuple
+        """
+        headers = {
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {kwargs.get('api_key', '')}"
+        }
+
+        body = {
+            "model": model,
+            "messages": messages,
+            "stream": kwargs.get("stream", True)
+        }
+
+        # None means "not set": omit it so the API default applies instead
+        # of sending an explicit null
+        for name in self.OPTIONAL_PARAMS:
+            if kwargs.get(name) is not None:
+                body[name] = kwargs[name]
+        if tools:
+            body["tools"] = tools
+        if kwargs.get("thinking_enabled"):
+            body["thinking_enabled"] = True
+
+        return body, headers
+
+    def reset(self):
+        """No-op for pure parsing adapter"""
+        pass
+
+    async def parse_stream_chunk(
+        self,
+        raw_chunk: str
+    ) -> AsyncGenerator[ParsedDelta, None]:
+        """Parse one line of an OpenAI-format SSE stream
+
+        Returns incremental content - no accumulation.
+
+        Args:
+            raw_chunk: A single SSE line ("data: {...}", "data: [DONE]", ...)
+
+        Yields:
+            One ParsedDelta per choice, or a usage-only / completion delta
+        """
+        event_type, data_str = self._parse_sse_line(raw_chunk)
+
+        if not data_str:
+            return
+        if data_str == "[DONE]":
+            yield ParsedDelta(is_complete=True)
+            return
+
+        try:
+            chunk = json.loads(data_str)
+        except json.JSONDecodeError:
+            logger.debug(f"Skipping undecodable stream data: {data_str[:50]}")
+            return
+        if not isinstance(chunk, dict):
+            return
+
+        # Handle errors
+        if event_type == "error" or "error" in chunk:
+            logger.error(f"LLM API error: {chunk.get('error', chunk)}")
+            yield ParsedDelta()
+            return
+
+        usage = chunk.get("usage") or {}
+        choices = chunk.get("choices") or []
+
+        if not choices:
+            # With stream_options.include_usage the totals arrive in a final
+            # chunk whose "choices" list is empty
+            if usage:
+                yield ParsedDelta(usage=usage)
+            return
+
+        for choice in choices:
+            delta = choice.get("delta") or {}
+            thinking, clean_text = self._extract_tags(delta.get("content") or "")
+            # Same fallback field parse_response reads for reasoning models
+            thinking = (delta.get("reasoning_content") or "") + thinking
+            tool_calls = delta.get("tool_calls") or []
+            is_complete = bool(choice.get("finish_reason"))
+
+            if thinking or clean_text or tool_calls or is_complete or usage:
+                yield ParsedDelta(
+                    thinking=thinking,
+                    text=clean_text,
+                    tool_calls=tool_calls,
+                    is_complete=is_complete,
+                    usage=usage
+                )
+
+    def parse_response(self, data: Dict[str, Any]) -> LLMResponse:
+        """Parse non-streaming response
+
+        Args:
+            data: Decoded JSON body of a chat completion
+
+        Returns:
+            LLMResponse built from the first choice
+        """
+        # "choices" can be missing, null or empty on malformed replies
+        choice = (data.get("choices") or [{}])[0]
+        message = choice.get("message") or {}
+
+        thinking, clean_content = self._extract_tags(message.get("content") or "")
+        if not thinking:
+            thinking = message.get("reasoning_content") or ""
+
+        return LLMResponse(
+            content=clean_content,
+            thinking=thinking,
+            tool_calls=message.get("tool_calls") or [],
+            usage=data.get("usage") or {}
+        )
+
+    def _parse_sse_line(self, line: str) -> tuple:
+        """Parse a single SSE line, return (event_type, data)"""
+        line = (line or "").lstrip()
+        if line.startswith("event:"):
+            return line[6:].strip(), None
+        elif line.startswith("data:"):
+            return "", line[5:].strip()
+        return "", None
+
+    def _extract_tags(self, content: str) -> tuple:
+        """Extract thinking tags and return (thinking, clean_text)
+
+        Text inside <think>...</think> (case-insensitive) becomes thinking; an
+        opening tag with no closing tag claims the rest of the content, and
+        closing tags without an opening tag are dropped. Whitespace is kept
+        as-is because streamed fragments are concatenated by the consumer.
+        """
+        if not content:
+            return "", ""
+
+        thinking_parts = []
+
+        def _collect(match):
+            thinking_parts.append(match.group(1))
+            return ""
+
+        clean_text = _THINK_BLOCK_RE.sub(_collect, content)
+        clean_text = _THINK_CLOSE_RE.sub("", clean_text)
+
+        return "".join(thinking_parts), clean_text
+
+    def supports_tools(self) -> bool:
+        # build_request forwards tools and the parsers return tool_calls
+        return True
diff --git a/luxx/services/llm_client.py b/luxx/services/llm_client.py
index fac67a7..680c5a5 100644
--- a/luxx/services/llm_client.py
+++ b/luxx/services/llm_client.py
@@ -1,165 +1,258 @@
-"""LLM API client"""
+"""LLM API Client - Unified client with multi-Provider support
+
+Supports various LLM API formats:
+- OpenAI (api.openai.com)
+- DeepSeek (api.deepseek.com)
+- Anthropic (api.anthropic.com)
+- GLM/Zhipu AI
+
+Usage:
+ from luxx.services.llm_client import LLMClient
+
+ # Auto-detect provider
+ client = LLMClient(api_key="...", api_url="...")
+
+ # Specify provider
+ client = LLMClient(api_key="...", api_url="...", provider_type="anthropic")
+
+ # Streaming call
+ async for delta in client.stream_call(model, messages, tools=tools):
+ print(delta.text, delta.thinking, delta.tool_calls)
+"""
import json
-import httpx
import logging
import traceback
-from typing import Dict, Any, Optional, List, AsyncGenerator
+from typing import Dict, List, Any, Optional, AsyncGenerator
+
+import httpx
from luxx.config import config
+from luxx.services.llm_adapters import (
+ ProviderAdapter,
+ OpenAIAdapter,
+ AnthropicAdapter,
+)
+from luxx.services.llm_response import ParsedDelta, LLMResponse
logger = logging.getLogger(__name__)
-class LLMResponse:
- """LLM response"""
- content: str
- tool_calls: Optional[List[Dict]] = None
- usage: Optional[Dict] = None
+class LLMClient:
+ """LLM API Client with multi-Provider support
+
+ Uses adapter pattern to support different API formats, auto-detects or manually specifies Provider type.
+
+ Attributes:
+ api_key: API key
+ api_url: API base URL
+ default_model: Default model
+ provider_type: Provider type
+ adapter: Current adapter instance
+ """
+
+ # Provider type to adapter class mapping
+ PROVIDER_ADAPTERS: Dict[str, type] = {
+ # OpenAI-compatible formats
+ "openai": OpenAIAdapter,
+ "deepseek": OpenAIAdapter,
+ "glm": OpenAIAdapter,
+ "zhipu": OpenAIAdapter,
+ # Anthropic formats
+ "anthropic": AnthropicAdapter,
+ "claude": AnthropicAdapter,
+ }
+
+ # URL keywords for provider detection
+ PROVIDER_URL_KEYWORDS: Dict[str, List[str]] = {
+ "anthropic": ["anthropic", "claude"],
+ "deepseek": ["deepseek"],
+ "glm": ["glm", "zhipu", "chatglm"],
+ "openai": ["openai"],
+ }
def __init__(
self,
- content: str = "",
- tool_calls: Optional[List[Dict]] = None,
- usage: Optional[Dict] = None
+ api_key: str = None,
+ api_url: str = None,
+ model: str = None,
+ provider_type: str = None
):
- self.content = content
- self.tool_calls = tool_calls
- self.usage = usage
-
-
-class LLMClient:
- """LLM API client with multi-provider support"""
-
- def __init__(self, api_key: str = None, api_url: str = None, model: str = None):
+ """Initialize LLM client
+
+ Args:
+ api_key: API key, defaults to config value
+ api_url: API base URL, defaults to config value
+ model: Default model name
+ provider_type: Specify Provider type, defaults to auto-detect
+ """
self.api_key = api_key or config.llm_api_key
self.api_url = api_url or config.llm_api_url
self.default_model = model
- self.provider = self._detect_provider()
+
+ # Detect or use specified provider
+ if provider_type:
+ self.provider_type = provider_type
+ else:
+ self.provider_type = self._detect_provider_type(api_url)
+
+ self.adapter = self._create_adapter()
self._client: Optional[httpx.AsyncClient] = None
- def _detect_provider(self) -> str:
- """Detect provider from URL"""
- url = self.api_url.lower()
- if "deepseek" in url:
- return "deepseek"
- elif "glm" in url or "zhipu" in url:
- return "glm"
- elif "openai" in url:
- return "openai"
+ def _detect_provider_type(self, url: str = None) -> str:
+ """Detect Provider type from URL
+
+ Args:
+ url: API URL, uses self.api_url if None
+
+ Returns:
+ Provider type string
+ """
+ url = url or self.api_url
+ url_lower = url.lower()
+
+ for provider, keywords in self.PROVIDER_URL_KEYWORDS.items():
+ for keyword in keywords:
+ if keyword in url_lower:
+ logger.debug(f"Detected provider '{provider}' from URL: {url}")
+ return provider
+
+ logger.debug(f"Defaulting to 'openai' for URL: {url}")
return "openai"
- async def close(self):
- """Close client"""
- if self._client:
- await self._client.aclose()
- self._client = None
-
- def _build_headers(self) -> Dict[str, str]:
- """Build request headers"""
- return {
- "Content-Type": "application/json",
- "Authorization": f"Bearer {self.api_key}"
- }
-
- def _build_body(
- self,
- model: str,
- messages: List[Dict],
- tools: Optional[List[Dict]] = None,
- stream: bool = False,
- **kwargs
- ) -> Dict[str, Any]:
- """Build request body"""
- body = {
- "model": model,
- "messages": messages,
- "stream": stream
- }
+ def _create_adapter(self) -> ProviderAdapter:
+ """Create adapter instance
- if "temperature" in kwargs:
- body["temperature"] = kwargs["temperature"]
-
- if "max_tokens" in kwargs:
- body["max_tokens"] = kwargs["max_tokens"]
-
- if "thinking_enabled" in kwargs and kwargs["thinking_enabled"]:
- body["thinking_enabled"] = True
-
- if tools:
- body["tools"] = tools
-
- return body
-
- def _parse_response(self, data: Dict) -> LLMResponse:
- """Parse response"""
- content = ""
- tool_calls = None
- usage = None
-
- if "choices" in data:
- choice = data["choices"][0]
- content = choice.get("message", {}).get("content", "")
- tool_calls = choice.get("message", {}).get("tool_calls")
-
- if "usage" in data:
- usage = data["usage"]
-
- return LLMResponse(
- content=content,
- tool_calls=tool_calls,
- usage=usage
+ Returns:
+ ProviderAdapter subclass instance
+ """
+ adapter_class = self.PROVIDER_ADAPTERS.get(
+ self.provider_type,
+ OpenAIAdapter
)
+ logger.info(f"Created {adapter_class.__name__} for provider: {self.provider_type}")
+ return adapter_class()
+
+ @property
+ def supports_thinking(self) -> bool:
+ """Whether current Provider supports thinking content"""
+ return self.adapter.supports_thinking()
+
+ @property
+ def supports_tools(self) -> bool:
+ """Whether current Provider supports tool calls"""
+ return self.adapter.supports_tools()
async def client(self) -> httpx.AsyncClient:
- """Get HTTP client"""
- if self._client is None:
+ """Return the cached httpx.AsyncClient, creating one (120 s timeout) if none exists or the cached one is closed."""
+ if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(timeout=120.0)
return self._client
- async def sync_call(
+ async def close(self):
+ """Close the cached HTTP client when one exists and is still open, then drop the reference."""
+ if self._client and not self._client.is_closed:
+ await self._client.aclose()
+ self._client = None
+
+ def sync_call(
self,
model: str,
- messages: List[Dict],
- tools: Optional[List[Dict]] = None,
+ messages: List[Dict[str, Any]],
+ tools: List[Dict[str, Any]] = None,
**kwargs
) -> LLMResponse:
- """Call LLM API (non-streaming)"""
- body = self._build_body(model, messages, tools, stream=False, **kwargs)
+ """Blocking (non-streaming) call to the LLM.
- async with httpx.AsyncClient(timeout=120.0) as client:
- response = await client.post(
- self.api_url,
- headers=self._build_headers(),
- json=body
- )
- response.raise_for_status()
- data = response.json()
+ Runs ``async_sync_call`` to completion on a fresh event loop, so it
+ works from any thread that has no running loop.
+
+ Args:
+ model: Model name
+ messages: Message list
+ tools: Tool definition list
+ **kwargs: Other parameters (temperature, max_tokens, thinking_enabled, etc.)
+
+ Returns:
+ LLMResponse object
+
+ Raises:
+ RuntimeError: If called from inside a running event loop; await
+ ``async_sync_call`` there instead.
+ """
+ import asyncio
+ # asyncio.run() creates and closes its own loop. get_event_loop() would
+ # raise in threads without a current loop and is deprecated for this use.
+ return asyncio.run(
+ self.async_sync_call(model, messages, tools, **kwargs)
+ )
+
+ async def async_sync_call(
+ self,
+ model: str,
+ messages: List[Dict[str, Any]],
+ tools: List[Dict[str, Any]] = None,
+ **kwargs
+ ) -> LLMResponse:
+ """Send one non-streaming request and return the adapter-parsed response.
+
+ Uses ``self.default_model`` when ``model`` is falsy and injects
+ ``self.api_key`` into ``kwargs`` so the adapter can build the headers.
+ A fresh ``httpx.AsyncClient`` (120 s timeout) is opened for the call.
+
+ Raises:
+ httpx.HTTPStatusError: Logged with status and body, then re-raised.
+ Exception: Any other failure is logged with its traceback and re-raised.
+ """
+ model = model or self.default_model
+ kwargs["api_key"] = self.api_key
- return self._parse_response(data)
+ body, headers = self.adapter.build_request(
+ model, messages, tools, stream=False, **kwargs
+ )
+
+ endpoint = self.api_url
+ logger.info(f"Sync call to {endpoint} with model {model}")
+
+ try:
+ async with httpx.AsyncClient(timeout=120.0) as client:
+ response = await client.post(
+ endpoint,
+ headers=headers,
+ json=body
+ )
+ response.raise_for_status()
+ data = response.json()
+
+ return self.adapter.parse_response(data)
+
+ except httpx.HTTPStatusError as e:
+ logger.error(f"HTTP error: {e.response.status_code} - {e.response.text}")
+ raise
+ except Exception as e:
+ logger.error(f"Sync call error: {e}\n{traceback.format_exc()}")
+ raise
async def stream_call(
self,
model: str,
- messages: List[Dict],
- tools: Optional[List[Dict]] = None,
+ messages: List[Dict[str, Any]],
+ tools: List[Dict[str, Any]] = None,
**kwargs
- ) -> AsyncGenerator[str, None]:
- """Stream call LLM API - yields raw SSE event lines
+ ) -> AsyncGenerator[ParsedDelta, None]:
+ """Stream a completion from the LLM and yield adapter-parsed deltas.
+ Args:
+ model: Model name; ``self.default_model`` is used when falsy
+ messages: Message list
+ tools: Tool definition list
+ **kwargs: Other parameters; ``api_key`` and ``stream`` are overwritten here
+
Yields:
- str: Raw SSE event lines for direct forwarding
+ ParsedDelta objects with accumulated content, as produced by the
+ adapter's ``parse_stream_chunk()`` for each non-blank response line.
+ If an HTTP status error or any other exception is caught, it is
+ logged and a single empty ParsedDelta is yielded instead of raising.
"""
- body = self._build_body(model, messages, tools, stream=True, **kwargs)
+ # Reset adapter buffers for new stream
+ if hasattr(self.adapter, 'reset'):
+ self.adapter.reset()
- logger.info(f"Starting stream_call for model: {model}, messages count: {len(messages)}")
+ model = model or self.default_model
+ kwargs["api_key"] = self.api_key
+ kwargs["stream"] = True
+
+ body, headers = self.adapter.build_request(
+ model, messages, tools, **kwargs
+ )
+
+ endpoint = self.api_url
+ logger.info(f"Stream call to {endpoint} with model {model}")
try:
async with httpx.AsyncClient(timeout=120.0) as client:
- logger.info(f"Sending request to {self.api_url}")
async with client.stream(
"POST",
- self.api_url,
- headers=self._build_headers(),
+ endpoint,
+ headers=headers,
json=body
) as response:
logger.info(f"Response status: {response.status_code}")
@@ -167,15 +260,40 @@ class LLMClient:
async for line in response.aiter_lines():
if line.strip():
- yield line + "\n"
+ async for delta in self.adapter.parse_stream_chunk(line):
+ yield delta
+
except httpx.HTTPStatusError as e:
status_code = e.response.status_code if e.response else "?"
- logger.error(f"HTTP error: {status_code}")
- yield f"event: error\ndata: {json.dumps({'content': f'HTTP {status_code}: Request failed'})}\n\n"
+ # NOTE(review): .text on a streamed response needs the body to have been
+ # read first; confirm the elided section reads it before raising.
+ error_body = e.response.text if e.response else ""
+ logger.error(f"HTTP error: {status_code} - {error_body}")
+ # NOTE(review): the failure is only logged; consumers receive an empty
+ # ParsedDelta and cannot tell that the stream failed.
+ yield ParsedDelta()
except Exception as e:
- logger.error(f"Exception: {type(e).__name__}: {str(e)}\n{traceback.format_exc()}")
- yield f"event: error\ndata: {json.dumps({'content': str(e)})}\n\n"
+ logger.error(f"Stream error: {type(e).__name__}: {e}\n{traceback.format_exc()}")
+ yield ParsedDelta()
-# Global LLM client
-llm_client = LLMClient()
+# Convenience function
+def create_client(
+ api_key: str = None,
+ api_url: str = None,
+ model: str = None,
+ provider_type: str = None
+) -> LLMClient:
+ """Build an LLMClient from the given settings.
+
+ Every argument is forwarded unchanged, by keyword, to the LLMClient
+ constructor.
+
+ Args:
+ api_key: API key
+ api_url: API URL
+ model: Model
+ provider_type: Provider type
+
+ Returns:
+ The newly constructed LLMClient
+ """
+ client_options = {
+ "api_key": api_key,
+ "api_url": api_url,
+ "model": model,
+ "provider_type": provider_type,
+ }
+ return LLMClient(**client_options)
diff --git a/luxx/services/llm_response.py b/luxx/services/llm_response.py
index f98ec13..a31f403 100644
--- a/luxx/services/llm_response.py
+++ b/luxx/services/llm_response.py
@@ -1,309 +1,65 @@
-"""LLM Response Parser - Unified parser for multiple LLM API formats.
+"""LLM Response - Unified message classes for LLM communication
-Supported Providers:
-- OpenAI: delta.content, delta.tool_calls
-- DeepSeek: delta.content, delta.reasoning_content, delta.tool_calls
-- Anthropic: content_block with thinking/text types
-- MiniMax: <|im_start|>thinking...<|im_end|> tags in content
-
-Data Flow:
-```
-LLM API Response (SSE)
- │
- ▼
-LLMResponseParser.parse_chunk()
- │
- ├──► ParsedDelta { thinking, text, tool_calls }
- │
- ▼
-AgenticLoop._process_stream_line()
- │
- ▼
-SSE Events (process_step)
- │
- ├──► type: "thinking"
- ├──► type: "text"
- └──► type: "tool_call"
-```
-
-API Response Formats:
-
-1. OpenAI Standard (DeepSeek, OpenAI):
-```json
-{
- "choices": [{
- "delta": {
- "content": "Hello",
- "reasoning_content": "Let me think...",
- "tool_calls": [{"id": "call_1", "function": {...}}]
- }
- }]
-}
-```
-
-2. Anthropic Streaming:
-```json
-{"type": "content_block_start", "content_block": {"type": "thinking", "thinking": "..."}}
-{"type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "..."}}
-{"type": "content_block_delta", "delta": {"type": "text_delta", "text": "..."}}
-{"type": "content_block_stop"}
-```
-
-3. MiniMax (with thinking tags in content):
-```json
-{
- "choices": [{
- "delta": {
- "content": "<|im_start|>thinking分析中...<|im_end|>回复内容"
- }
- }]
-}
-```
-
-4. Standard thinking tags:
-```json
-{
- "choices": [{
- "delta": {
- "content": "<think>思考内容</think>回复内容"
- }
- }]
-}
-```
+This module provides unified data classes for message passing throughout the LLM pipeline.
"""
-from typing import Dict, Any, Optional, List
-from dataclasses import dataclass
+from typing import Dict, Any, List, Optional
+from dataclasses import dataclass, field
@dataclass
class ParsedDelta:
- """Parsed response delta from LLM.
+ """Streaming response delta
+
+ Represents a single unit of streaming response data.
+ Used for streaming responses where content is accumulated incrementally.
Attributes:
- thinking: Thinking/reasoning content
- text: Regular text content
- tool_calls: Tool call requests
- is_complete: Whether this delta completes a content block
+ thinking: Accumulated thinking/reasoning content
+ text: Accumulated text content
+ tool_calls: List of tool call requests (empty list by default)
+ is_complete: Whether this is the final delta
+ usage: Token usage statistics (empty dict by default)
"""
thinking: str = ""
text: str = ""
- tool_calls: List[Dict] = None
+ # default_factory gives each instance its own list/dict; this replaces the
+ # former None default that __post_init__ converted to [].
+ tool_calls: List[Dict] = field(default_factory=list)
is_complete: bool = False
+ usage: Dict[str, int] = field(default_factory=dict)
- def __post_init__(self):
- if self.tool_calls is None:
- self.tool_calls = []
+ def has_thinking(self) -> bool:
+ """Return True when ``thinking`` is non-empty."""
+ return bool(self.thinking)
+
+ def has_text(self) -> bool:
+ """Return True when ``text`` is non-empty."""
+ return bool(self.text)
+
+ def has_tool_calls(self) -> bool:
+ """Return True when ``tool_calls`` is non-empty."""
+ return bool(self.tool_calls)
+
+ def has_content(self) -> bool:
+ """Return True when the delta carries thinking, text, or tool calls."""
+ return self.has_thinking() or self.has_text() or self.has_tool_calls()
-class LLMResponseParser:
- """Unified parser for LLM API response formats.
+@dataclass
+class LLMResponse:
+ """Complete LLM response
- Usage:
- from luxx.services.llm_response import llm_parser
-
- # Parse OpenAI format
- delta = {"content": "Hello", "reasoning_content": "Thinking..."}
- parsed = llm_parser.parse_openai(delta)
-
- # Parse Anthropic format
- chunk = {"type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "..."}}
- parsed = llm_parser.parse_anthropic(chunk)
-
- # Auto-detect format
- parsed = llm_parser.parse_chunk(chunk, provider="anthropic")
+ Represents a complete non-streaming response.
+
+ Attributes:
+ content: Final text content
+ thinking: Final thinking content (if any)
+ tool_calls: List of tool calls (empty list by default)
+ usage: Token usage statistics (empty dict by default)
"""
+ content: str = ""
+ thinking: str = ""
+ tool_calls: List[Dict] = field(default_factory=list)
+ # Must be default_factory: default=dict would make the attribute the dict
+ # type itself rather than a fresh empty dict per instance.
+ usage: Dict[str, int] = field(default_factory=dict)
- # Content block types
- BLOCK_THINKING = "thinking"
- BLOCK_TEXT = "text"
- BLOCK_TOOL_USE = "tool_use"
- BLOCK_TOOL_RESULT = "tool_result"
-
- def __init__(self):
- self._buffer = ""
- self._thinking_buffer = ""
- self._text_buffer = ""
-
- def reset(self):
- """Reset parser state for new message."""
- self._buffer = ""
- self._thinking_buffer = ""
- self._text_buffer = ""
-
- def parse_openai(self, delta: Dict) -> ParsedDelta:
- """Parse OpenAI format delta.
-
- Handles:
- - OpenAI: delta.content, delta.tool_calls
- - DeepSeek: delta.content, delta.reasoning_content, delta.tool_calls
- - MiniMax: <|im_start|>thinking...<|im_end|> in content
- - Standard: <think>...</think> in content
-
- Args:
- delta: Delta object from LLM API response
-
- Returns:
- ParsedDelta with extracted thinking, text, and tool_calls
- """
- result = ParsedDelta()
-
- # Get thinking content (DeepSeek uses reasoning_content)
- thinking = delta.get("reasoning_content") or delta.get("reasoning") or ""
- if thinking:
- self._thinking_buffer += thinking
- result.thinking = self._thinking_buffer
-
- # Get text content
- text = delta.get("content") or ""
- if text:
- # Check for embedded thinking tags (MiniMax format)
- thinking_part, clean_text = self._extract_thinking_tags(text)
- if thinking_part:
- self._thinking_buffer += thinking_part
- result.thinking = self._thinking_buffer
- if clean_text:
- self._text_buffer += clean_text
- result.text = self._text_buffer
- elif thinking_part := delta.get("thinking"):
- # Some providers use "thinking" field directly
- self._thinking_buffer += thinking_part
- result.thinking = self._thinking_buffer
-
- # Tool calls
- result.tool_calls = delta.get("tool_calls") or []
-
- return result
-
- def parse_anthropic(self, chunk: Dict) -> ParsedDelta:
- """Parse Anthropic streaming format.
-
- Anthropic uses a different event structure:
- - content_block_start: Begin a content block
- - content_block_delta: Incremental content
- - content_block_stop: End of content blocks
-
- Content block types:
- - thinking: Model reasoning
- - text: Regular text
- - tool_use: Tool invocation
- - tool_result: Tool output
-
- Args:
- chunk: Anthropic SSE event chunk
-
- Returns:
- ParsedDelta with extracted content
- """
- result = ParsedDelta()
- chunk_type = chunk.get("type", "")
-
- if chunk_type == "content_block_start":
- block = chunk.get("content_block", {})
- if block.get("type") == self.BLOCK_THINKING:
- thinking = block.get("thinking", "")
- if thinking:
- self._thinking_buffer = thinking
- result.thinking = self._thinking_buffer
-
- elif chunk_type == "content_block_delta":
- delta = chunk.get("delta", {})
- delta_type = delta.get("type", "")
-
- if delta_type == "thinking_delta":
- thinking = delta.get("thinking", "")
- self._thinking_buffer += thinking
- result.thinking = self._thinking_buffer
-
- elif delta_type == "text_delta":
- text = delta.get("text", "")
- self._text_buffer += text
- result.text = self._text_buffer
-
- elif delta_type == "partial_json":
- # Partial JSON for tool calls
- pass
-
- elif chunk_type == "content_block_stop":
- result.is_complete = True
-
- return result
-
- def parse_chunk(self, chunk: Dict, provider: str = "openai") -> ParsedDelta:
- """Parse chunk based on provider.
-
- Args:
- chunk: Response chunk from LLM
- provider: Provider name ("openai", "anthropic", "deepseek", "minimax")
-
- Returns:
- ParsedDelta with extracted content
- """
- if provider == "anthropic":
- return self.parse_anthropic(chunk)
-
- # Default to OpenAI format
- return self.parse_openai(chunk)
-
- def _extract_thinking_tags(self, content: str) -> tuple:
- """Extract thinking content from tags.
-
- Handles multiple tag formats:
- - MiniMax: <|im_start|>thinking...<|im_end|>
- - Standard: <think>...</think>
-
- Args:
- content: Raw content string from LLM
-
- Returns:
- Tuple of (thinking_content, clean_text)
- """
- thinking_parts = []
- clean_parts = []
- i = 0
-
- while i < len(content):
- remaining = content[i:].lower()
-
- # Check for MiniMax format
- if remaining.startswith("<|im_start|>thinking"):
- end_tag = "<|im_end|>"
- start = i + 21 # len("<|im_start|>thinking")
- end = content.find(end_tag, start)
- if end != -1:
- thinking_parts.append(content[start:end])
- i = end + len(end_tag)
- continue
-
- # Check for standard format
- if remaining.startswith(""):
- end_tag = ""
- start = i + 7 # len("")
- end = content.find(end_tag, start)
- if end != -1:
- thinking_parts.append(content[start:end])
- i = end + len(end_tag)
- continue
-
- # Regular character
- clean_parts.append(content[i])
- i += 1
-
- return "".join(thinking_parts), "".join(clean_parts)
-
- def has_thinking_tags(self, content: str) -> bool:
- """Check if content contains thinking tags.
-
- Args:
- content: Raw content string
-
- Returns:
- True if content contains thinking tags
- """
- if not content:
- return False
- lower = content.lower()
- return "<|im_start|>thinking" in lower or "" in lower
-
-
-# Global parser instance
-llm_parser = LLMResponseParser()
+ def has_tool_calls(self) -> bool:
+ """Return True when ``tool_calls`` is non-empty."""
+ return bool(self.tool_calls)
diff --git a/luxx/services/stream_context.py b/luxx/services/stream_context.py
index 7ebde9a..39debbf 100644
--- a/luxx/services/stream_context.py
+++ b/luxx/services/stream_context.py
@@ -65,44 +65,6 @@ class StreamContext:
"content": content
})
- def handle_thinking(self, delta: Dict) -> Optional[str]:
- """Handle reasoning delta from LLM."""
- reasoning = delta.get("reasoning_content", "")
- if not reasoning:
- return None
-
- if not self.full_thinking:
- self.start_step("thinking")
-
- self.full_thinking += reasoning
- return _sse_event("process_step", {
- "step": {
- "id": self.current_step_id,
- "index": self.current_step_idx,
- "type": "thinking",
- "content": self.full_thinking
- }
- })
-
- def handle_text(self, delta: Dict) -> Optional[str]:
- """Handle content delta from LLM."""
- content = delta.get("content", "")
- if not content:
- return None
-
- if not self.full_content:
- self.start_step("text")
-
- self.full_content += content
- return _sse_event("process_step", {
- "step": {
- "id": self.current_step_id,
- "index": self.current_step_idx,
- "type": "text",
- "content": self.full_content
- }
- })
-
def accumulate_tool_call(self, tc_delta: Dict):
"""Accumulate tool call delta."""
idx = tc_delta.get("index", 0)