diff --git a/luxx/services/__init__.py b/luxx/services/__init__.py index 6cb90eb..82029a8 100644 --- a/luxx/services/__init__.py +++ b/luxx/services/__init__.py @@ -1,4 +1,4 @@ """Services module""" -from luxx.services.llm_client import LLMClient, llm_client, LLMResponse +from luxx.services.llm_client import LLMClient +from luxx.services.llm_response import ParsedDelta, LLMResponse, StreamAccumulator from luxx.services.chat import ChatService, chat_service -from luxx.services.llm_response import LLMResponseParser, llm_parser, ParsedDelta diff --git a/luxx/services/agentic_loop.py b/luxx/services/agentic_loop.py index 15fb2cb..efed7ee 100644 --- a/luxx/services/agentic_loop.py +++ b/luxx/services/agentic_loop.py @@ -8,17 +8,15 @@ The loop: 5. Repeat (max 10 iterations) 6. Return final response """ -import json import uuid import logging -import traceback -from typing import List, Dict, Any, AsyncGenerator +from typing import List, Dict, AsyncGenerator from luxx.tools.executor import ToolExecutor from luxx.services.llm_client import LLMClient from luxx.services.stream_context import StreamContext, _sse_event from luxx.services.process_result import ProcessResult -from luxx.services.llm_response import llm_parser +from luxx.services.llm_response import ParsedDelta logger = logging.getLogger(__name__) @@ -26,20 +24,11 @@ logger = logging.getLogger(__name__) MAX_ITERATIONS = 10 -def _parse_sse_line(line: str) -> tuple: - """Parse SSE line into (event_type, data_str).""" - event_type = None - data_str = None - for part in line.strip().split('\n'): - if part.startswith('event: '): - event_type = part[7:].strip() - elif part.startswith('data: '): - data_str = part[6:].strip() - return event_type, data_str - - class AgenticLoop: - """Executes the Agentic Loop: LLM + Tools iteration.""" + """Executes the Agentic Loop: LLM + Tools iteration. + + Supports multiple LLM Providers, auto-adapts response format. + """ def __init__(self, tool_executor: ToolExecutor): self.tool_executor = tool_executor @@ -66,8 +55,8 @@ class AgenticLoop: context.reset() has_error = False - # Stream LLM response - async for sse_line in llm.stream_call( + # Stream LLM response - now yields ParsedDelta directly + async for delta in llm.stream_call( model=model, messages=messages, tools=tools, @@ -75,8 +64,8 @@ class AgenticLoop: max_tokens=max_tokens, thinking_enabled=thinking_enabled ): - # Process stream line - result = self._process_stream_line(sse_line, context, total_usage) + # Process parsed delta + result = self._process_delta(delta, context, total_usage) # Yield events for event in result.events: @@ -110,85 +99,40 @@ class AgenticLoop: if not has_error: yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"}) - def _process_stream_line(self, sse_line: str, ctx: 'StreamContext', - total_usage: dict) -> ProcessResult: - """Process single SSE line from LLM, return result with events and flags.""" + def _process_delta( + self, + delta: ParsedDelta, + ctx: 'StreamContext', + total_usage: dict + ) -> ProcessResult: + """Process ParsedDelta from adapter, return result with events and flags. + + Args: + delta: ParsedDelta from LLM adapter + ctx: StreamContext for state management + total_usage: Accumulated token usage + + Returns: + ProcessResult with events and flags + """ result = ProcessResult() - event_type, data_str = _parse_sse_line(sse_line) - if not data_str: + + # Check for error (empty delta with no content) + if not delta.has_content() and not delta.is_complete: + # Empty delta, possibly an error return result - # Handle upstream errors - if event_type == 'error': - try: - error_data = json.loads(data_str) - error_content = error_data.get("content", "Unknown error") - except json.JSONDecodeError: - error_content = data_str - result.set_error(error_content) - result.add_event(_sse_event("error", {"content": error_content})) - return result - - try: - chunk = json.loads(data_str) - except json.JSONDecodeError: - error_msg = f"Parse error: {data_str[:50]}" - result.set_error(error_msg) - result.add_event(_sse_event("error", {"content": error_msg})) - return result - - # Extract usage - if "usage" in chunk and chunk["usage"]: - usage = chunk["usage"] + # Update usage + if delta.usage: total_usage.update({ - "prompt_tokens": usage.get("prompt_tokens", 0), - "completion_tokens": usage.get("completion_tokens", 0), - "total_tokens": usage.get("total_tokens", 0) + "prompt_tokens": delta.usage.get("prompt_tokens", 0), + "completion_tokens": delta.usage.get("completion_tokens", 0), + "total_tokens": delta.usage.get("total_tokens", 0) }) - # Handle API errors - if "error" in chunk: - error_msg = chunk["error"].get("message", str(chunk["error"])) - result.set_error(error_msg) - result.add_event(_sse_event("error", {"content": f"API Error: {error_msg}"})) - return result - - # Get delta - choices = chunk.get("choices", []) - if not choices: - # Non-standard format: check for content directly - content = chunk.get("content") or "" - if content: - # Check for thinking tags in content - thinking_part, clean_text = llm_parser._extract_thinking_tags(content) - - if thinking_part: - ctx.full_thinking = (ctx.full_thinking or "") + thinking_part - if not ctx.current_step_id or ctx.current_step_type != "thinking": - ctx.start_step("thinking") - result.add_event(_sse_event("process_step", { - "step": {"id": ctx.current_step_id, "index": ctx.current_step_idx, "type": "thinking", "content": ctx.full_thinking} - })) - result.set_content() - - if clean_text: - ctx.full_content = (ctx.full_content or "") + clean_text - if not ctx.current_step_id or ctx.current_step_type != "text": - ctx.start_step("text") - result.add_event(_sse_event("process_step", { - "step": {"id": ctx.current_step_id, "index": ctx.current_step_idx, "type": "text", "content": ctx.full_content} - })) - result.set_content() - return result - - delta = choices[0].get("delta", {}) - - # Parse delta using unified parser - parsed = llm_parser.parse_openai(delta) - # Process thinking content - if parsed.thinking: - ctx.full_thinking = parsed.thinking + if delta.thinking: + ctx.full_thinking = delta.thinking if not ctx.current_step_id or ctx.current_step_type != "thinking": ctx.start_step("thinking") result.add_event(_sse_event("process_step", { @@ -202,8 +146,8 @@ class AgenticLoop: result.set_content() # Process text content - if parsed.text: - ctx.full_content = parsed.text + if delta.text: + ctx.full_content = delta.text if not ctx.current_step_id or ctx.current_step_type != "text": ctx.start_step("text") result.add_event(_sse_event("process_step", { @@ -216,10 +160,11 @@ class AgenticLoop: })) result.set_content() - # Accumulate tool calls - for tc in parsed.tool_calls or delta.get("tool_calls", []): - ctx.accumulate_tool_call(tc) - result.set_tool_calls() + # Process tool calls + if delta.tool_calls: + for tc in delta.tool_calls: + ctx.accumulate_tool_call(tc) + result.set_tool_calls() return result diff --git a/luxx/services/llm_adapters/__init__.py b/luxx/services/llm_adapters/__init__.py new file mode 100644 index 0000000..298197e --- /dev/null +++ b/luxx/services/llm_adapters/__init__.py @@ -0,0 +1,24 @@ +"""LLM Provider Adapters + +Adapter module for various LLM API formats. + +Adapter types: +- OpenAIAdapter: OpenAI/DeepSeek/GLM compatible APIs +- AnthropicAdapter: Anthropic Claude API + +Usage: + from luxx.services.llm_adapters import OpenAIAdapter, AnthropicAdapter + + adapter = OpenAIAdapter() + # Or use LLMClient for automatic selection +""" + +from .base import ProviderAdapter +from .openai_adapter import OpenAIAdapter +from .anthropic_adapter import AnthropicAdapter + +__all__ = [ + "ProviderAdapter", + "OpenAIAdapter", + "AnthropicAdapter", +] diff --git a/luxx/services/llm_adapters/anthropic_adapter.py b/luxx/services/llm_adapters/anthropic_adapter.py new file mode 100644 index 0000000..5c2f6ba --- /dev/null +++ b/luxx/services/llm_adapters/anthropic_adapter.py @@ -0,0 +1,398 @@ +"""Anthropic Adapter - Anthropic Claude API adapter + +Supports Anthropic Claude API streaming and non-streaming responses. +""" +import json +import logging +from typing import Dict, List, Any, AsyncGenerator + +from .base import ProviderAdapter +from ..llm_response import ParsedDelta, LLMResponse + +logger = logging.getLogger(__name__) + + +class AnthropicAdapter(ProviderAdapter): + """Anthropic Claude API adapter + + Anthropic API uses a completely different format from OpenAI: + - Endpoint: POST /v1/messages + - Streaming: SSE events (content_block_start, content_block_delta, etc.) + - Thinking: Independent thinking type content block + - Tools: tool_use type content block + + Reference: https://docs.anthropic.com/claude/reference/messages + """ + + # Anthropic API endpoint suffix + MESSAGES_PATH = "/v1/messages" + + # Anthropic API version + ANTHROPIC_VERSION = "2023-06-01" + + # Content block types + BLOCK_MESSAGE_START = "message_start" + BLOCK_CONTENT_BLOCK_START = "content_block_start" + BLOCK_CONTENT_BLOCK_DELTA = "content_block_delta" + BLOCK_CONTENT_BLOCK_STOP = "content_block_stop" + BLOCK_MESSAGE_DELTA = "message_delta" + BLOCK_MESSAGE_STOP = "message_stop" + BLOCK_ERROR = "error" + + # Delta types + DELTA_THINKING = "thinking_delta" + DELTA_TEXT = "text_delta" + DELTA_INPUT_JSON = "input_json_delta" + + # Content block subtypes + SUBTYPE_THINKING = "thinking" + SUBTYPE_TEXT = "text" + SUBTYPE_TOOL_USE = "tool_use" + + def __init__(self): + # Buffers for accumulating streaming content + self._thinking_buffer = "" + self._text_buffer = "" + # Buffers for accumulating deltas + self._tool_args_buffer = "" + self._current_tool_index = -1 + self._current_tool_name = "" + self._usage = {} + + @property + def provider_type(self) -> str: + return "anthropic" + + def build_request( + self, + model: str, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, + **kwargs + ) -> tuple[Dict[str, Any], Dict[str, str]]: + """Build Anthropic-format request + + Anthropic request format differs from OpenAI: + - Uses "messages" instead of "message" + - Requires "max_tokens" + - Different tool format + + Args: + model: Model name (e.g., claude-3-5-sonnet-20241022) + messages: Message list + tools: Tool definition list + **kwargs: Other parameters + + Returns: + tuple: (body, headers) + """ + api_key = kwargs.get("api_key", "") + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}", + "anthropic-version": self.ANTHROPIC_VERSION + } + + # Convert messages to Anthropic format + anthropic_messages = self._convert_messages(messages) + + body = { + "model": model, + "messages": anthropic_messages, + "stream": kwargs.get("stream", True), + "max_tokens": kwargs.get("max_tokens", 4096) + } + + # System message + if "system" in kwargs: + body["system"] = kwargs["system"] + else: + # Extract from first message + for msg in messages: + if msg.get("role") == "system": + body["system"] = msg.get("content", "") + break + + # Thinking capability (Claude 3.5+) + if kwargs.get("thinking_enabled"): + body["thinking"] = { + "type": "enabled", + "budget_tokens": kwargs.get("thinking_budget_tokens", 10000) + } + + # Tool definitions + if tools: + body["tools"] = self._convert_tools(tools) + + # Optional parameters + if "temperature" in kwargs: + body["temperature"] = kwargs["temperature"] + + if "top_p" in kwargs: + body["top_p"] = kwargs["top_p"] + + if "stop_sequences" in kwargs: + body["stop_sequences"] = kwargs["stop_sequences"] + + return body, headers + + def _convert_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Convert messages to Anthropic format + + Anthropic message format: + - role: user, assistant + - content: str or List[Dict] + + Args: + messages: OpenAI-format message list + + Returns: + Anthropic-format message list + """ + result = [] + + for msg in messages: + role = msg.get("role") + content = msg.get("content", "") + + # Skip system messages (handled separately) + if role == "system": + continue + + # Process content + if isinstance(content, str): + anthropic_content = content + elif isinstance(content, dict): + anthropic_content = content.get("text", "") + else: + anthropic_content = str(content) + + # Anthropic doesn't support tool role as message + if role == "tool": + # Tool results passed via tool_use block + continue + + result.append({ + "role": role, + "content": anthropic_content + }) + + return result + + def _convert_tools(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Convert tool definitions to Anthropic format + + Anthropic tool format: + { + "name": "function_name", + "description": "...", + "input_schema": {...} # JSON Schema + } + + Args: + tools: OpenAI-format tool list + + Returns: + Anthropic-format tool list + """ + result = [] + + for tool in tools: + func = tool.get("function", {}) + result.append({ + "name": func.get("name"), + "description": func.get("description", ""), + "input_schema": func.get("parameters", {"type": "object", "properties": {}}) + }) + + return result + + async def parse_stream_chunk( + self, + raw_chunk: str + ) -> AsyncGenerator[ParsedDelta, None]: + """Parse Anthropic-format SSE stream + + Anthropic streaming events: + - message_start: Message start + - content_block_start: Content block start (thinking/text/tool_use) + - content_block_delta: Content block delta + - content_block_stop: Content block stop + - message_delta: Message delta (usage) + - message_stop: Message completely stopped + - error: Error + + Args: + raw_chunk: Raw SSE line + + Yields: + ParsedDelta objects + """ + # Reset buffers + self._reset_buffers() + + try: + chunk = json.loads(raw_chunk.strip()) + except json.JSONDecodeError: + return + + chunk_type = chunk.get("type", "") + result = ParsedDelta() + + if chunk_type == self.BLOCK_MESSAGE_START: + # Message start + pass + + elif chunk_type == self.BLOCK_CONTENT_BLOCK_START: + # Content block start + block = chunk.get("content_block", {}) + block_type = block.get("type") + index = chunk.get("index", 0) + + if block_type == self.SUBTYPE_THINKING: + # Thinking block start + thinking_text = block.get("thinking", {}).get("thinking", "") + self._thinking_buffer = thinking_text + result.thinking = self._thinking_buffer + + elif block_type == self.SUBTYPE_TOOL_USE: + # Tool use block start + self._current_tool_index = index + self._current_tool_name = block.get("name", "") + self._tool_args_buffer = "" + + elif chunk_type == self.BLOCK_CONTENT_BLOCK_DELTA: + # Content block delta + delta = chunk.get("delta", {}) + delta_type = delta.get("type", "") + + if delta_type == self.DELTA_THINKING: + # Thinking delta + thinking = delta.get("thinking", "") + self._thinking_buffer += thinking + result.thinking = self._thinking_buffer + + elif delta_type == self.DELTA_TEXT: + # Text delta + text = delta.get("text", "") + self._text_buffer += text + result.text = self._text_buffer + + elif delta_type == self.DELTA_INPUT_JSON: + # Tool arguments delta - accumulate but don't return in result + partial_json = delta.get("partial_json", "") + self._tool_args_buffer += partial_json + + elif chunk_type == self.BLOCK_CONTENT_BLOCK_STOP: + # Content block stop + pass + + elif chunk_type == self.BLOCK_MESSAGE_DELTA: + # Message delta (usually contains usage) + delta = chunk.get("delta", {}) + usage = chunk.get("usage", {}) + + self._usage = { + "prompt_tokens": usage.get("input_tokens", 0), + "completion_tokens": usage.get("output_tokens", 0), + "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0) + } + result.usage = self._usage + + # Check if complete by stop reason + if delta.get("stop_reason"): + result.is_complete = True + + elif chunk_type == self.BLOCK_MESSAGE_STOP: + # Message completely stopped + result.is_complete = True + + elif chunk_type == self.BLOCK_ERROR: + # Error + error = chunk.get("error", {}) + error_msg = error.get("type", "") + ": " + error.get("message", "") + logger.error(f"Anthropic API error: {error_msg}") + yield ParsedDelta() + return + + yield result + + def parse_response( + self, + data: Dict[str, Any] + ) -> LLMResponse: + """Parse Anthropic-format non-streaming response + + Anthropic response format: + { + "id": "...", + "type": "message", + "role": "assistant", + "content": [ + {"type": "text", "text": "..."}, + {"type": "thinking", "thinking": "..."}, + {"type": "tool_use", "id": "...", "name": "...", "input": {...}} + ], + "model": "...", + "usage": {"input_tokens": ..., "output_tokens": ...} + } + + Args: + data: API response data + + Returns: + LLMResponse object + """ + contents = data.get("content", []) + + text_parts = [] + thinking = "" + tool_calls = [] + + for block in contents: + block_type = block.get("type") + + if block_type == self.SUBTYPE_TEXT: + text_parts.append(block.get("text", "")) + + elif block_type == self.SUBTYPE_THINKING: + thinking = block.get("thinking", "") + + elif block_type == self.SUBTYPE_TOOL_USE: + tool_calls.append({ + "id": block.get("id"), + "type": "function", + "function": { + "name": block.get("name"), + "arguments": json.dumps(block.get("input", {})) + } + }) + + usage = data.get("usage", {}) + + return LLMResponse( + content="\n".join(text_parts), + thinking=thinking, + tool_calls=tool_calls if tool_calls else None, + usage={ + "prompt_tokens": usage.get("input_tokens", 0), + "completion_tokens": usage.get("output_tokens", 0), + "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0) + } + ) + + def supports_thinking(self) -> bool: + return True + + def supports_tools(self) -> bool: + return True + + def _reset_buffers(self): + """Reset buffers (call when starting new message)""" + self._thinking_buffer = "" + self._text_buffer = "" + self._tool_args_buffer = "" + self._current_tool_index = -1 + self._current_tool_name = "" + self._usage = {} diff --git a/luxx/services/llm_adapters/base.py b/luxx/services/llm_adapters/base.py new file mode 100644 index 0000000..33c3bfd --- /dev/null +++ b/luxx/services/llm_adapters/base.py @@ -0,0 +1,97 @@ +"""ProviderAdapter - LLM Provider adapter base class + +Defines unified adapter interface that all Provider adapters must implement. +""" +from abc import ABC, abstractmethod +from typing import Dict, List, Any, AsyncGenerator + + +class ProviderAdapter(ABC): + """LLM Provider adapter base class + + All LLM API adapters must inherit from this class and implement its methods. + Adapters are responsible for: + 1. Building requests in provider-specific format + 2. Parsing streaming and non-streaming responses + 3. Providing provider capability information + + Attributes: + provider_type: Provider type identifier + """ + + @property + @abstractmethod + def provider_type(self) -> str: + """Return provider type identifier + + Returns: + str: Provider type, e.g., "openai", "anthropic" + """ + pass + + @abstractmethod + def build_request( + self, + model: str, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, + **kwargs + ) -> tuple[Dict[str, Any], Dict[str, str]]: + """Build request body and headers + + Args: + model: Model name + messages: Message list + tools: Tool definition list + **kwargs: Other parameters (temperature, max_tokens, thinking_enabled, etc.) + + Returns: + tuple: (body, headers) tuple + """ + pass + + @abstractmethod + async def parse_stream_chunk( + self, + raw_chunk: str + ) -> AsyncGenerator[Any, None]: + """Parse streaming response chunk + + Args: + raw_chunk: Raw SSE line or chunk data + + Yields: + ParsedDelta object or other parsed results + """ + pass + + @abstractmethod + def parse_response( + self, + data: Dict[str, Any] + ) -> Any: + """Parse non-streaming response + + Args: + data: API response data + + Returns: + LLMResponse object + """ + pass + + def supports_thinking(self) -> bool: + """Whether provider supports thinking/reasoning + + Returns: + bool: True if provider supports thinking/reasoning content + """ + return False + + def supports_tools(self) -> bool: + """Whether provider supports tool/function calls + + Returns: + bool: True if provider supports function calling + """ + return False diff --git a/luxx/services/llm_adapters/openai_adapter.py b/luxx/services/llm_adapters/openai_adapter.py new file mode 100644 index 0000000..2b1e38f --- /dev/null +++ b/luxx/services/llm_adapters/openai_adapter.py @@ -0,0 +1,238 @@ +"""OpenAI Adapter - OpenAI-compatible API adapter + +Supports OpenAI, DeepSeek, GLM and other OpenAI-compatible APIs. +""" +import json +import logging +from typing import Dict, List, Any, AsyncGenerator + +from .base import ProviderAdapter +from ..llm_response import ParsedDelta, LLMResponse, StreamAccumulator, llm_parser_factory + +logger = logging.getLogger(__name__) + + +class OpenAIAdapter(ProviderAdapter): + """OpenAI-compatible API adapter + + Supported Providers: + - OpenAI (api.openai.com) + - DeepSeek (api.deepseek.com) + - GLM/Zhipu AI + - Any service compatible with OpenAI Chat Completions API + + Features: + - Thinking content (reasoning_content, reasoning) + - Tool calls (tool_calls) + - Streaming responses (SSE) + """ + + @property + def provider_type(self) -> str: + return "openai" + + def __init__(self): + self._accumulator = llm_parser_factory() + + def build_request( + self, + model: str, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, + **kwargs + ) -> tuple[Dict[str, Any], Dict[str, str]]: + """Build OpenAI-format request""" + api_key = kwargs.get("api_key", "") + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + + body = { + "model": model, + "messages": messages, + "stream": kwargs.get("stream", True) + } + + # Optional parameters + if "temperature" in kwargs: + body["temperature"] = kwargs["temperature"] + + if "max_tokens" in kwargs: + body["max_tokens"] = kwargs["max_tokens"] + + if "top_p" in kwargs: + body["top_p"] = kwargs["top_p"] + + if "frequency_penalty" in kwargs: + body["frequency_penalty"] = kwargs["frequency_penalty"] + + if "presence_penalty" in kwargs: + body["presence_penalty"] = kwargs["presence_penalty"] + + if "stop" in kwargs: + body["stop"] = kwargs["stop"] + + # Tool definitions + if tools: + body["tools"] = tools + + # Thinking capability (DeepSeek, etc.) + if kwargs.get("thinking_enabled"): + body["thinking_enabled"] = True + body["thoughts"] = [{"type": "thought", "text": ""}] + + return body, headers + + def reset(self): + """Reset accumulator for new stream""" + self._accumulator.reset() + + async def parse_stream_chunk( + self, + raw_chunk: str + ) -> AsyncGenerator[ParsedDelta, None]: + """Parse OpenAI-format SSE stream""" + # Parse SSE line + event_type, data_str = self._parse_sse_line(raw_chunk) + + # Skip empty data + if not data_str: + return + + # Handle [DONE] marker + if data_str == "[DONE]": + self._accumulator.set_complete() + yield self._accumulator._create_delta() + return + + try: + chunk = json.loads(data_str) + except json.JSONDecodeError: + logger.warning(f"Failed to parse chunk: {data_str[:100]}") + return + + # Handle errors + if event_type == "error" or "error" in chunk: + error_content = chunk.get("error", {}).get("message", str(chunk)) + logger.error(f"Stream error: {error_content}") + yield ParsedDelta() + return + + # Extract usage (usually in the last chunk) + usage = chunk.get("usage") + if usage: + self._accumulator.set_usage(usage) + + # Parse choices + for choice in chunk.get("choices", []): + delta = choice.get("delta", {}) + + # Handle thinking content (DeepSeek, etc.) + thinking = delta.get("reasoning_content") or delta.get("reasoning") or "" + if thinking: + self._accumulator.thinking += thinking + self._accumulator.thinking = self._accumulator.thinking # trigger setter + + # Handle text content + content = delta.get("content") or "" + if content: + # Check for embedded thinking tags + thinking_part, clean_text = self._extract_thinking_tags(content) + + if thinking_part: + self._accumulator.thinking += thinking_part + if clean_text: + self._accumulator.text += clean_text + + # Tool calls + tool_calls = delta.get("tool_calls") + if tool_calls: + self._accumulator.tool_calls = tool_calls + + # Check if complete + finish_reason = choice.get("finish_reason") + if finish_reason: + self._accumulator.is_complete = True + + # Only yield if there's meaningful content + if self._accumulator.has_content() or self._accumulator.is_complete: + yield self._accumulator._create_delta() + + def parse_response( + self, + data: Dict[str, Any] + ) -> LLMResponse: + """Parse OpenAI-format non-streaming response""" + choice = data.get("choices", [{}])[0] + message = choice.get("message", {}) + + content = message.get("content", "") or "" + tool_calls = message.get("tool_calls") + usage = data.get("usage") + + # Extract thinking content + thinking = "" + if content: + thinking, clean_content = self._extract_thinking_tags(content) + content = clean_content + + # DeepSeek may put thinking content in separate field + if not thinking: + thinking = message.get("reasoning_content") or "" + + return LLMResponse( + content=content, + thinking=thinking, + tool_calls=tool_calls, + usage=usage + ) + + def supports_thinking(self) -> bool: + return True + + def supports_tools(self) -> bool: + return True + + def _parse_sse_line(self, line: str) -> tuple: + """Parse SSE line""" + event_type = None + data_str = None + + for part in line.strip().split('\n'): + if part.startswith('event: '): + event_type = part[7:].strip() + elif part.startswith('data: '): + data_str = part[6:].strip() + + return event_type, data_str + + def _extract_thinking_tags(self, content: str) -> tuple: + """Extract thinking tags from content + + Supported formats: + - Standard: ... + """ + thinking_parts = [] + clean_parts = [] + i = 0 + + while i < len(content): + remaining = content[i:].lower() + + # Standard format + if remaining.startswith(""): + end_tag = "" + start = i + 7 # len("") + end = content.find(end_tag, start) + if end != -1: + thinking_parts.append(content[start:end]) + i = end + len(end_tag) + continue + + # Regular character + clean_parts.append(content[i]) + i += 1 + + return "".join(thinking_parts), "".join(clean_parts) diff --git a/luxx/services/llm_client.py b/luxx/services/llm_client.py index fac67a7..680c5a5 100644 --- a/luxx/services/llm_client.py +++ b/luxx/services/llm_client.py @@ -1,165 +1,258 @@ -"""LLM API client""" +"""LLM API Client - Unified client with multi-Provider support + +Supports various LLM API formats: +- OpenAI (api.openai.com) +- DeepSeek (api.deepseek.com) +- Anthropic (api.anthropic.com) +- GLM/Zhipu AI + +Usage: + from luxx.services.llm_client import LLMClient + + # Auto-detect provider + client = LLMClient(api_key="...", api_url="...") + + # Specify provider + client = LLMClient(api_key="...", api_url="...", provider_type="anthropic") + + # Streaming call + async for delta in client.stream_call(model, messages, tools=tools): + print(delta.text, delta.thinking, delta.tool_calls) +""" import json -import httpx import logging import traceback -from typing import Dict, Any, Optional, List, AsyncGenerator +from typing import Dict, List, Any, Optional, AsyncGenerator + +import httpx from luxx.config import config +from luxx.services.llm_adapters import ( + ProviderAdapter, + OpenAIAdapter, + AnthropicAdapter, +) +from luxx.services.llm_response import ParsedDelta, LLMResponse logger = logging.getLogger(__name__) -class LLMResponse: - """LLM response""" - content: str - tool_calls: Optional[List[Dict]] = None - usage: Optional[Dict] = None +class LLMClient: + """LLM API Client with multi-Provider support + + Uses adapter pattern to support different API formats, auto-detects or manually specifies Provider type. + + Attributes: + api_key: API key + api_url: API base URL + default_model: Default model + provider_type: Provider type + adapter: Current adapter instance + """ + + # Provider type to adapter class mapping + PROVIDER_ADAPTERS: Dict[str, type] = { + # OpenAI-compatible formats + "openai": OpenAIAdapter, + "deepseek": OpenAIAdapter, + "glm": OpenAIAdapter, + "zhipu": OpenAIAdapter, + # Anthropic formats + "anthropic": AnthropicAdapter, + "claude": AnthropicAdapter, + } + + # URL keywords for provider detection + PROVIDER_URL_KEYWORDS: Dict[str, List[str]] = { + "anthropic": ["anthropic", "claude"], + "deepseek": ["deepseek"], + "glm": ["glm", "zhipu", "chatglm"], + "openai": ["openai"], + } def __init__( self, - content: str = "", - tool_calls: Optional[List[Dict]] = None, - usage: Optional[Dict] = None + api_key: str = None, + api_url: str = None, + model: str = None, + provider_type: str = None ): - self.content = content - self.tool_calls = tool_calls - self.usage = usage - - -class LLMClient: - """LLM API client with multi-provider support""" - - def __init__(self, api_key: str = None, api_url: str = None, model: str = None): + """Initialize LLM client + + Args: + api_key: API key, defaults to config value + api_url: API base URL, defaults to config value + model: Default model name + provider_type: Specify Provider type, defaults to auto-detect + """ self.api_key = api_key or config.llm_api_key self.api_url = api_url or config.llm_api_url self.default_model = model - self.provider = self._detect_provider() + + # Detect or use specified provider + if provider_type: + self.provider_type = provider_type + else: + self.provider_type = self._detect_provider_type(api_url) + + self.adapter = self._create_adapter() self._client: Optional[httpx.AsyncClient] = None - def _detect_provider(self) -> str: - """Detect provider from URL""" - url = self.api_url.lower() - if "deepseek" in url: - return "deepseek" - elif "glm" in url or "zhipu" in url: - return "glm" - elif "openai" in url: - return "openai" + def _detect_provider_type(self, url: str = None) -> str: + """Detect Provider type from URL + + Args: + url: API URL, uses self.api_url if None + + Returns: + Provider type string + """ + url = url or self.api_url + url_lower = url.lower() + + for provider, keywords in self.PROVIDER_URL_KEYWORDS.items(): + for keyword in keywords: + if keyword in url_lower: + logger.debug(f"Detected provider '{provider}' from URL: {url}") + return provider + + logger.debug(f"Defaulting to 'openai' for URL: {url}") return "openai" - async def close(self): - """Close client""" - if self._client: - await self._client.aclose() - self._client = None - - def _build_headers(self) -> Dict[str, str]: - """Build request headers""" - return { - "Content-Type": "application/json", - "Authorization": f"Bearer {self.api_key}" - } - - def _build_body( - self, - model: str, - messages: List[Dict], - tools: Optional[List[Dict]] = None, - stream: bool = False, - **kwargs - ) -> Dict[str, Any]: - """Build request body""" - body = { - "model": model, - "messages": messages, - "stream": stream - } + def _create_adapter(self) -> ProviderAdapter: + """Create adapter instance - if "temperature" in kwargs: - body["temperature"] = kwargs["temperature"] - - if "max_tokens" in kwargs: - body["max_tokens"] = kwargs["max_tokens"] - - if "thinking_enabled" in kwargs and kwargs["thinking_enabled"]: - body["thinking_enabled"] = True - - if tools: - body["tools"] = tools - - return body - - def _parse_response(self, data: Dict) -> LLMResponse: - """Parse response""" - content = "" - tool_calls = None - usage = None - - if "choices" in data: - choice = data["choices"][0] - content = choice.get("message", {}).get("content", "") - tool_calls = choice.get("message", {}).get("tool_calls") - - if "usage" in data: - usage = data["usage"] - - return LLMResponse( - content=content, - tool_calls=tool_calls, - usage=usage + Returns: + ProviderAdapter subclass instance + """ + adapter_class = self.PROVIDER_ADAPTERS.get( + self.provider_type, + OpenAIAdapter ) + logger.info(f"Created {adapter_class.__name__} for provider: {self.provider_type}") + return adapter_class() + + @property + def supports_thinking(self) -> bool: + """Whether current Provider supports thinking content""" + return self.adapter.supports_thinking() + + @property + def supports_tools(self) -> bool: + """Whether current Provider supports tool calls""" + return self.adapter.supports_tools() async def client(self) -> httpx.AsyncClient: - """Get HTTP client""" - if self._client is None: + """Get HTTP client (lazy load)""" + if self._client is None or self._client.is_closed: self._client = httpx.AsyncClient(timeout=120.0) return self._client - async def sync_call( + async def close(self): + """Close HTTP client""" + if self._client and not self._client.is_closed: + await self._client.aclose() + self._client = None + + def sync_call( self, model: str, - messages: List[Dict], - tools: Optional[List[Dict]] = None, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, **kwargs ) -> LLMResponse: - """Call LLM API (non-streaming)""" - body = self._build_body(model, messages, tools, stream=False, **kwargs) + """Synchronous call to LLM (non-streaming) - async with httpx.AsyncClient(timeout=120.0) as client: - response = await client.post( - self.api_url, - headers=self._build_headers(), - json=body - ) - response.raise_for_status() - data = response.json() + Args: + model: Model name + messages: Message list + tools: Tool definition list + **kwargs: Other parameters (temperature, max_tokens, thinking_enabled, etc.) + + Returns: + LLMResponse object + """ + import asyncio + return asyncio.get_event_loop().run_until_complete( + self.async_sync_call(model, messages, tools, **kwargs) + ) + + async def async_sync_call( + self, + model: str, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, + **kwargs + ) -> LLMResponse: + """Internal async sync call""" + model = model or self.default_model + kwargs["api_key"] = self.api_key - return self._parse_response(data) + body, headers = self.adapter.build_request( + model, messages, tools, stream=False, **kwargs + ) + + endpoint = self.api_url + logger.info(f"Sync call to {endpoint} with model {model}") + + try: + async with httpx.AsyncClient(timeout=120.0) as client: + response = await client.post( + endpoint, + headers=headers, + json=body + ) + response.raise_for_status() + data = response.json() + + return self.adapter.parse_response(data) + + except httpx.HTTPStatusError as e: + logger.error(f"HTTP error: {e.response.status_code} - {e.response.text}") + raise + except Exception as e: + logger.error(f"Sync call error: {e}\n{traceback.format_exc()}") + raise async def stream_call( self, model: str, - messages: List[Dict], - tools: Optional[List[Dict]] = None, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, **kwargs - ) -> AsyncGenerator[str, None]: - """Stream call LLM API - yields raw SSE event lines + ) -> AsyncGenerator[ParsedDelta, None]: + """Streaming call to LLM + Args: + model: Model name + messages: Message list + tools: Tool definition list + **kwargs: Other parameters + Yields: - str: Raw SSE event lines for direct forwarding + ParsedDelta objects with accumulated content """ - body = self._build_body(model, messages, tools, stream=True, **kwargs) + # Reset adapter buffers for new stream + if hasattr(self.adapter, 'reset'): + self.adapter.reset() - logger.info(f"Starting stream_call for model: {model}, messages count: {len(messages)}") + model = model or self.default_model + kwargs["api_key"] = self.api_key + kwargs["stream"] = True + + body, headers = self.adapter.build_request( + model, messages, tools, **kwargs + ) + + endpoint = self.api_url + logger.info(f"Stream call to {endpoint} with model {model}") try: async with httpx.AsyncClient(timeout=120.0) as client: - logger.info(f"Sending request to {self.api_url}") async with client.stream( "POST", - self.api_url, - headers=self._build_headers(), + endpoint, + headers=headers, json=body ) as response: logger.info(f"Response status: {response.status_code}") @@ -167,15 +260,40 @@ class LLMClient: async for line in response.aiter_lines(): if line.strip(): - yield line + "\n" + async for delta in self.adapter.parse_stream_chunk(line): + yield delta + except httpx.HTTPStatusError as e: status_code = e.response.status_code if e.response else "?" - logger.error(f"HTTP error: {status_code}") - yield f"event: error\ndata: {json.dumps({'content': f'HTTP {status_code}: Request failed'})}\n\n" + error_body = e.response.text if e.response else "" + logger.error(f"HTTP error: {status_code} - {error_body}") + yield ParsedDelta() except Exception as e: - logger.error(f"Exception: {type(e).__name__}: {str(e)}\n{traceback.format_exc()}") - yield f"event: error\ndata: {json.dumps({'content': str(e)})}\n\n" + logger.error(f"Stream error: {type(e).__name__}: {e}\n{traceback.format_exc()}") + yield ParsedDelta() -# Global LLM client -llm_client = LLMClient() +# Convenience function +def create_client( + api_key: str = None, + api_url: str = None, + model: str = None, + provider_type: str = None +) -> LLMClient: + """Convenience function to create LLM client + + Args: + api_key: API key + api_url: API URL + model: Model + provider_type: Provider type + + Returns: + LLMClient instance + """ + return LLMClient( + api_key=api_key, + api_url=api_url, + model=model, + provider_type=provider_type + ) diff --git a/luxx/services/llm_response.py b/luxx/services/llm_response.py index f98ec13..5d6cfbf 100644 --- a/luxx/services/llm_response.py +++ b/luxx/services/llm_response.py @@ -1,309 +1,162 @@ -"""LLM Response Parser - Unified parser for multiple LLM API formats. +"""LLM Response - Unified LLM response data structures -Supported Providers: -- OpenAI: delta.content, delta.tool_calls -- DeepSeek: delta.content, delta.reasoning_content, delta.tool_calls -- Anthropic: content_block with thinking/text types -- MiniMax: <|im_start|>thinking...<|im_end|> tags in content +Provides unified response format for data transfer between adapter layer and business layer. -Data Flow: -``` -LLM API Response (SSE) - │ - ▼ -LLMResponseParser.parse_chunk() - │ - ├──► ParsedDelta { thinking, text, tool_calls } - │ - ▼ -AgenticLoop._process_stream_line() - │ - ▼ -SSE Events (process_step) - │ - ├──► type: "thinking" - ├──► type: "text" - └──► type: "tool_call" -``` - -API Response Formats: - -1. OpenAI Standard (DeepSeek, OpenAI): -```json -{ - "choices": [{ - "delta": { - "content": "Hello", - "reasoning_content": "Let me think...", - "tool_calls": [{"id": "call_1", "function": {...}}] - } - }] -} -``` - -2. Anthropic Streaming: -```json -{"type": "content_block_start", "content_block": {"type": "thinking", "thinking": "..."}} -{"type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "..."}} -{"type": "content_block_delta", "delta": {"type": "text_delta", "text": "..."}} -{"type": "content_block_stop"} -``` - -3. MiniMax (with thinking tags in content): -```json -{ - "choices": [{ - "delta": { - "content": "<|im_start|>thinking分析中...<|im_end|>回复内容" - } - }] -} -``` - -4. Standard thinking tags: -```json -{ - "choices": [{ - "delta": { - "content": "思考内容回复内容" - } - }] -} -``` +Data format: +1. ParsedDelta: Single delta in streaming response (with accumulated content) +2. LLMResponse: Complete LLM response +3. StreamAccumulator: Helper class for accumulating streaming content """ from typing import Dict, Any, Optional, List -from dataclasses import dataclass +from dataclasses import dataclass, field @dataclass class ParsedDelta: - """Parsed response delta from LLM. + """Streaming response delta + + Contains accumulated content from streaming response. Attributes: - thinking: Thinking/reasoning content - text: Regular text content - tool_calls: Tool call requests - is_complete: Whether this delta completes a content block + thinking: Accumulated thinking/reasoning content + text: Accumulated text content + tool_calls: List of tool calls + is_complete: Whether the response is complete + usage: Token usage statistics """ thinking: str = "" text: str = "" - tool_calls: List[Dict] = None + tool_calls: List[Dict] = field(default_factory=list) is_complete: bool = False + usage: Dict[str, int] = None def __post_init__(self): if self.tool_calls is None: self.tool_calls = [] - - -class LLMResponseParser: - """Unified parser for LLM API response formats. + if self.usage is None: + self.usage = {} - Usage: - from luxx.services.llm_response import llm_parser - - # Parse OpenAI format - delta = {"content": "Hello", "reasoning_content": "Thinking..."} - parsed = llm_parser.parse_openai(delta) - - # Parse Anthropic format - chunk = {"type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "..."}} - parsed = llm_parser.parse_anthropic(chunk) - - # Auto-detect format - parsed = llm_parser.parse_chunk(chunk, provider="anthropic") + def has_content(self) -> bool: + """Check if there's any meaningful content""" + return bool(self.thinking or self.text or self.tool_calls) + + def merge(self, other: 'ParsedDelta') -> 'ParsedDelta': + """Merge another delta""" + return ParsedDelta( + thinking=self.thinking + other.thinking, + text=self.text + other.text, + tool_calls=self.tool_calls or other.tool_calls, + is_complete=other.is_complete, + usage=other.usage or self.usage + ) + + +@dataclass +class LLMResponse: + """Complete LLM response + + Attributes: + content: Final text content + thinking: Final thinking content (if any) + tool_calls: List of tool calls (if any) + usage: Token usage + """ + content: str = "" + thinking: str = "" + tool_calls: List[Dict] = field(default_factory=list) + usage: Dict[str, int] = None + + def __post_init__(self): + if self.tool_calls is None: + self.tool_calls = [] + if self.usage is None: + self.usage = {} + + @property + def has_tool_calls(self) -> bool: + """Check if there are tool calls""" + return bool(self.tool_calls) + + def to_message_content(self) -> str: + """Convert to message content format""" + return self.content + + +class StreamAccumulator: + """Accumulates streaming response content + + Helper class for adapters to accumulate streaming content + and create ParsedDelta with accumulated results. """ - # Content block types - BLOCK_THINKING = "thinking" - BLOCK_TEXT = "text" - BLOCK_TOOL_USE = "tool_use" - BLOCK_TOOL_RESULT = "tool_result" - def __init__(self): - self._buffer = "" - self._thinking_buffer = "" - self._text_buffer = "" + self.reset() def reset(self): - """Reset parser state for new message.""" - self._buffer = "" - self._thinking_buffer = "" - self._text_buffer = "" + """Reset all buffers for new stream""" + self.thinking = "" + self.text = "" + self.tool_calls: List[Dict] = [] + self.is_complete = False + self.usage: Dict[str, int] = {} - def parse_openai(self, delta: Dict) -> ParsedDelta: - """Parse OpenAI format delta. - - Handles: - - OpenAI: delta.content, delta.tool_calls - - DeepSeek: delta.content, delta.reasoning_content, delta.tool_calls - - MiniMax: <|im_start|>thinking...<|im_end|> in content - - Standard: ... in content - - Args: - delta: Delta object from LLM API response - - Returns: - ParsedDelta with extracted thinking, text, and tool_calls - """ - result = ParsedDelta() - - # Get thinking content (DeepSeek uses reasoning_content) - thinking = delta.get("reasoning_content") or delta.get("reasoning") or "" - if thinking: - self._thinking_buffer += thinking - result.thinking = self._thinking_buffer - - # Get text content - text = delta.get("content") or "" - if text: - # Check for embedded thinking tags (MiniMax format) - thinking_part, clean_text = self._extract_thinking_tags(text) - if thinking_part: - self._thinking_buffer += thinking_part - result.thinking = self._thinking_buffer - if clean_text: - self._text_buffer += clean_text - result.text = self._text_buffer - elif thinking_part := delta.get("thinking"): - # Some providers use "thinking" field directly - self._thinking_buffer += thinking_part - result.thinking = self._thinking_buffer - - # Tool calls - result.tool_calls = delta.get("tool_calls") or [] - - return result + def has_content(self) -> bool: + """Check if there's any meaningful content""" + return bool(self.thinking or self.text or self.tool_calls) - def parse_anthropic(self, chunk: Dict) -> ParsedDelta: - """Parse Anthropic streaming format. - - Anthropic uses a different event structure: - - content_block_start: Begin a content block - - content_block_delta: Incremental content - - content_block_stop: End of content blocks - - Content block types: - - thinking: Model reasoning - - text: Regular text - - tool_use: Tool invocation - - tool_result: Tool output - - Args: - chunk: Anthropic SSE event chunk - - Returns: - ParsedDelta with extracted content - """ - result = ParsedDelta() - chunk_type = chunk.get("type", "") - - if chunk_type == "content_block_start": - block = chunk.get("content_block", {}) - if block.get("type") == self.BLOCK_THINKING: - thinking = block.get("thinking", "") - if thinking: - self._thinking_buffer = thinking - result.thinking = self._thinking_buffer - - elif chunk_type == "content_block_delta": - delta = chunk.get("delta", {}) - delta_type = delta.get("type", "") - - if delta_type == "thinking_delta": - thinking = delta.get("thinking", "") - self._thinking_buffer += thinking - result.thinking = self._thinking_buffer - - elif delta_type == "text_delta": - text = delta.get("text", "") - self._text_buffer += text - result.text = self._text_buffer - - elif delta_type == "partial_json": - # Partial JSON for tool calls - pass - - elif chunk_type == "content_block_stop": - result.is_complete = True - - return result + def add_thinking(self, content: str) -> 'ParsedDelta': + """Add thinking/reasoning content""" + if content: + self.thinking = content + return self._create_delta() - def parse_chunk(self, chunk: Dict, provider: str = "openai") -> ParsedDelta: - """Parse chunk based on provider. - - Args: - chunk: Response chunk from LLM - provider: Provider name ("openai", "anthropic", "deepseek", "minimax") - - Returns: - ParsedDelta with extracted content - """ - if provider == "anthropic": - return self.parse_anthropic(chunk) - - # Default to OpenAI format - return self.parse_openai(chunk) + def add_text(self, content: str) -> 'ParsedDelta': + """Add text content""" + if content: + self.text = content + return self._create_delta() - def _extract_thinking_tags(self, content: str) -> tuple: - """Extract thinking content from tags. - - Handles multiple tag formats: - - MiniMax: <|im_start|>thinking...<|im_end|> - - Standard: ... - - Args: - content: Raw content string from LLM - - Returns: - Tuple of (thinking_content, clean_text) - """ - thinking_parts = [] - clean_parts = [] - i = 0 - - while i < len(content): - remaining = content[i:].lower() - - # Check for MiniMax format - if remaining.startswith("<|im_start|>thinking"): - end_tag = "<|im_end|>" - start = i + 21 # len("<|im_start|>thinking") - end = content.find(end_tag, start) - if end != -1: - thinking_parts.append(content[start:end]) - i = end + len(end_tag) - continue - - # Check for standard format - if remaining.startswith(""): - end_tag = "" - start = i + 7 # len("") - end = content.find(end_tag, start) - if end != -1: - thinking_parts.append(content[start:end]) - i = end + len(end_tag) - continue - - # Regular character - clean_parts.append(content[i]) - i += 1 - - return "".join(thinking_parts), "".join(clean_parts) + def add_tool_calls(self, tool_calls: List[Dict]) -> 'ParsedDelta': + """Add tool calls""" + if tool_calls: + self.tool_calls = tool_calls + return self._create_delta() - def has_thinking_tags(self, content: str) -> bool: - """Check if content contains thinking tags. - - Args: - content: Raw content string - - Returns: - True if content contains thinking tags - """ - if not content: - return False - lower = content.lower() - return "<|im_start|>thinking" in lower or "" in lower + def set_complete(self, is_complete: bool = True) -> 'ParsedDelta': + """Mark response as complete""" + self.is_complete = is_complete + return self._create_delta() + + def set_usage(self, usage: Dict[str, int]) -> 'ParsedDelta': + """Set token usage""" + if usage: + self.usage = usage + return self._create_delta() + + def _create_delta(self) -> ParsedDelta: + """Create ParsedDelta from current accumulated state""" + return ParsedDelta( + thinking=self.thinking, + text=self.text, + tool_calls=self.tool_calls, + is_complete=self.is_complete, + usage=self.usage + ) + + def to_response(self) -> LLMResponse: + """Convert to complete LLMResponse""" + return LLMResponse( + content=self.text, + thinking=self.thinking, + tool_calls=self.tool_calls, + usage=self.usage + ) -# Global parser instance -llm_parser = LLMResponseParser() +# Backward compatibility alias +LLMResponseParser = StreamAccumulator + + +def llm_parser_factory() -> StreamAccumulator: + """Factory function to create a new StreamAccumulator""" + return StreamAccumulator()