diff --git a/luxx/routes/messages.py b/luxx/routes/messages.py index fac223d..f9bc76f 100644 --- a/luxx/routes/messages.py +++ b/luxx/routes/messages.py @@ -10,11 +10,12 @@ from datetime import datetime from luxx.database import get_db from luxx.models import Conversation, Message, User from luxx.routes.auth import get_current_user -from luxx.services.chat import chat_service +from luxx.services.chat import create_chat_service from luxx.utils.helpers import generate_id, success_response, error_response router = APIRouter(prefix="/messages", tags=["Messages"]) +chat_service = create_chat_service() class MessageCreate(BaseModel): diff --git a/luxx/services/__init__.py b/luxx/services/__init__.py index 6cb90eb..a18b4b9 100644 --- a/luxx/services/__init__.py +++ b/luxx/services/__init__.py @@ -1,4 +1,4 @@ """Services module""" -from luxx.services.llm_client import LLMClient, llm_client, LLMResponse -from luxx.services.chat import ChatService, chat_service -from luxx.services.llm_response import LLMResponseParser, llm_parser, ParsedDelta +from luxx.services.llm_client import LLMClient +from luxx.services.llm_response import ParsedDelta, LLMResponse +from luxx.services.chat import ChatService, create_chat_service diff --git a/luxx/services/agentic_loop.py b/luxx/services/agentic_loop.py index 15fb2cb..d5ed504 100644 --- a/luxx/services/agentic_loop.py +++ b/luxx/services/agentic_loop.py @@ -8,17 +8,15 @@ The loop: 5. Repeat (max 10 iterations) 6. Return final response """ -import json import uuid import logging -import traceback -from typing import List, Dict, Any, AsyncGenerator +from typing import List, Dict, AsyncGenerator from luxx.tools.executor import ToolExecutor from luxx.services.llm_client import LLMClient from luxx.services.stream_context import StreamContext, _sse_event from luxx.services.process_result import ProcessResult -from luxx.services.llm_response import llm_parser +from luxx.services.llm_response import ParsedDelta logger = logging.getLogger(__name__) @@ -26,20 +24,11 @@ logger = logging.getLogger(__name__) MAX_ITERATIONS = 10 -def _parse_sse_line(line: str) -> tuple: - """Parse SSE line into (event_type, data_str).""" - event_type = None - data_str = None - for part in line.strip().split('\n'): - if part.startswith('event: '): - event_type = part[7:].strip() - elif part.startswith('data: '): - data_str = part[6:].strip() - return event_type, data_str - - class AgenticLoop: - """Executes the Agentic Loop: LLM + Tools iteration.""" + """Executes the Agentic Loop: LLM + Tools iteration. + + Supports multiple LLM Providers, auto-adapts response format. + """ def __init__(self, tool_executor: ToolExecutor): self.tool_executor = tool_executor @@ -66,8 +55,8 @@ class AgenticLoop: context.reset() has_error = False - # Stream LLM response - async for sse_line in llm.stream_call( + # Stream LLM response - now yields ParsedDelta directly + async for delta in llm.stream_call( model=model, messages=messages, tools=tools, @@ -75,8 +64,8 @@ class AgenticLoop: max_tokens=max_tokens, thinking_enabled=thinking_enabled ): - # Process stream line - result = self._process_stream_line(sse_line, context, total_usage) + # Process parsed delta + result = self._process_delta(delta, context, total_usage) # Yield events for event in result.events: @@ -110,85 +99,41 @@ class AgenticLoop: if not has_error: yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"}) - def _process_stream_line(self, sse_line: str, ctx: 'StreamContext', - total_usage: dict) -> ProcessResult: - """Process single SSE line from LLM, return result with events and flags.""" + def _process_delta( + self, + delta: ParsedDelta, + ctx: 'StreamContext', + total_usage: dict + ) -> ProcessResult: + """Process ParsedDelta from adapter, return result with events and flags. + + Args: + delta: ParsedDelta from LLM adapter + ctx: StreamContext for state management + total_usage: Accumulated token usage + + Returns: + ProcessResult with events and flags + """ result = ProcessResult() - event_type, data_str = _parse_sse_line(sse_line) - if not data_str: + + # Check for error (empty delta with no content) + if not delta.has_content() and not delta.is_complete: + # Empty delta, possibly an error return result - # Handle upstream errors - if event_type == 'error': - try: - error_data = json.loads(data_str) - error_content = error_data.get("content", "Unknown error") - except json.JSONDecodeError: - error_content = data_str - result.set_error(error_content) - result.add_event(_sse_event("error", {"content": error_content})) - return result - - try: - chunk = json.loads(data_str) - except json.JSONDecodeError: - error_msg = f"Parse error: {data_str[:50]}" - result.set_error(error_msg) - result.add_event(_sse_event("error", {"content": error_msg})) - return result - - # Extract usage - if "usage" in chunk and chunk["usage"]: - usage = chunk["usage"] + # Update usage + if delta.usage: total_usage.update({ - "prompt_tokens": usage.get("prompt_tokens", 0), - "completion_tokens": usage.get("completion_tokens", 0), - "total_tokens": usage.get("total_tokens", 0) + "prompt_tokens": delta.usage.get("prompt_tokens", 0), + "completion_tokens": delta.usage.get("completion_tokens", 0), + "total_tokens": delta.usage.get("total_tokens", 0) }) - # Handle API errors - if "error" in chunk: - error_msg = chunk["error"].get("message", str(chunk["error"])) - result.set_error(error_msg) - result.add_event(_sse_event("error", {"content": f"API Error: {error_msg}"})) - return result - - # Get delta - choices = chunk.get("choices", []) - if not choices: - # Non-standard format: check for content directly - content = chunk.get("content") or "" - if content: - # Check for thinking tags in content - thinking_part, clean_text = llm_parser._extract_thinking_tags(content) - - if thinking_part: - ctx.full_thinking = (ctx.full_thinking or "") + thinking_part - if not ctx.current_step_id or ctx.current_step_type != "thinking": - ctx.start_step("thinking") - result.add_event(_sse_event("process_step", { - "step": {"id": ctx.current_step_id, "index": ctx.current_step_idx, "type": "thinking", "content": ctx.full_thinking} - })) - result.set_content() - - if clean_text: - ctx.full_content = (ctx.full_content or "") + clean_text - if not ctx.current_step_id or ctx.current_step_type != "text": - ctx.start_step("text") - result.add_event(_sse_event("process_step", { - "step": {"id": ctx.current_step_id, "index": ctx.current_step_idx, "type": "text", "content": ctx.full_content} - })) - result.set_content() - return result - - delta = choices[0].get("delta", {}) - - # Parse delta using unified parser - parsed = llm_parser.parse_openai(delta) - - # Process thinking content - if parsed.thinking: - ctx.full_thinking = parsed.thinking + # Process thinking content (incremental) + if delta.thinking: + logger.debug(f"Processing thinking: {delta.thinking[:50]}...") + ctx.full_thinking += delta.thinking # Accumulate incremental content if not ctx.current_step_id or ctx.current_step_type != "thinking": ctx.start_step("thinking") result.add_event(_sse_event("process_step", { @@ -201,9 +146,9 @@ class AgenticLoop: })) result.set_content() - # Process text content - if parsed.text: - ctx.full_content = parsed.text + # Process text content (incremental) + if delta.text: + ctx.full_content += delta.text # Accumulate incremental content if not ctx.current_step_id or ctx.current_step_type != "text": ctx.start_step("text") result.add_event(_sse_event("process_step", { @@ -216,10 +161,11 @@ class AgenticLoop: })) result.set_content() - # Accumulate tool calls - for tc in parsed.tool_calls or delta.get("tool_calls", []): - ctx.accumulate_tool_call(tc) - result.set_tool_calls() + # Process tool calls + if delta.tool_calls: + for tc in delta.tool_calls: + ctx.accumulate_tool_call(tc) + result.set_tool_calls() return result diff --git a/luxx/services/chat.py b/luxx/services/chat.py index 3d67b72..a38f893 100644 --- a/luxx/services/chat.py +++ b/luxx/services/chat.py @@ -297,6 +297,6 @@ def _sse_event(event: str, data: dict) -> str: return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" -# ============== Global Singleton ============== - -chat_service = ChatService() +def create_chat_service() -> ChatService: + """Factory function to create ChatService instances.""" + return ChatService() diff --git a/luxx/services/llm_adapters/__init__.py b/luxx/services/llm_adapters/__init__.py new file mode 100644 index 0000000..298197e --- /dev/null +++ b/luxx/services/llm_adapters/__init__.py @@ -0,0 +1,24 @@ +"""LLM Provider Adapters + +Adapter module for various LLM API formats. + +Adapter types: +- OpenAIAdapter: OpenAI/DeepSeek/GLM compatible APIs +- AnthropicAdapter: Anthropic Claude API + +Usage: + from luxx.services.llm_adapters import OpenAIAdapter, AnthropicAdapter + + adapter = OpenAIAdapter() + # Or use LLMClient for automatic selection +""" + +from .base import ProviderAdapter +from .openai_adapter import OpenAIAdapter +from .anthropic_adapter import AnthropicAdapter + +__all__ = [ + "ProviderAdapter", + "OpenAIAdapter", + "AnthropicAdapter", +] diff --git a/luxx/services/llm_adapters/anthropic_adapter.py b/luxx/services/llm_adapters/anthropic_adapter.py new file mode 100644 index 0000000..e316a95 --- /dev/null +++ b/luxx/services/llm_adapters/anthropic_adapter.py @@ -0,0 +1,336 @@ +"""Anthropic Adapter - Anthropic Claude API adapter + +Supports Anthropic Claude API streaming and non-streaming responses. +""" +import json +import logging +from typing import Dict, List, Any, AsyncGenerator + +from .base import ProviderAdapter +from ..llm_response import ParsedDelta, LLMResponse + +logger = logging.getLogger(__name__) + + +class AnthropicAdapter(ProviderAdapter): + """Anthropic Claude API adapter + + Pure parsing adapter - no internal state management. + Each parse_stream_chunk call returns incremental content. + Accumulation is handled by the consumer (AgenticLoop). + + Anthropic API uses a completely different format from OpenAI: + - Endpoint: POST /v1/messages + - Streaming: SSE events (content_block_start, content_block_delta, etc.) + - Thinking: Independent thinking type content block + - Tools: tool_use type content block + + Reference: https://docs.anthropic.com/claude/reference/messages + """ + + # Anthropic API endpoint suffix + MESSAGES_PATH = "/v1/messages" + + # Anthropic API version + ANTHROPIC_VERSION = "2023-06-01" + + # Content block types + BLOCK_MESSAGE_START = "message_start" + BLOCK_CONTENT_BLOCK_START = "content_block_start" + BLOCK_CONTENT_BLOCK_DELTA = "content_block_delta" + BLOCK_CONTENT_BLOCK_STOP = "content_block_stop" + BLOCK_MESSAGE_DELTA = "message_delta" + BLOCK_MESSAGE_STOP = "message_stop" + BLOCK_ERROR = "error" + + # Delta types + DELTA_THINKING = "thinking_delta" + DELTA_TEXT = "text_delta" + DELTA_INPUT_JSON = "input_json_delta" + + # Content block subtypes + SUBTYPE_THINKING = "thinking" + SUBTYPE_TEXT = "text" + SUBTYPE_TOOL_USE = "tool_use" + + def __init__(self): + pass + + @property + def provider_type(self) -> str: + return "anthropic" + + def build_request( + self, + model: str, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, + **kwargs + ) -> tuple[Dict[str, Any], Dict[str, str]]: + """Build Anthropic-format request + + Anthropic request format differs from OpenAI: + - Uses "messages" instead of "message" + - Requires "max_tokens" + - Different tool format + """ + api_key = kwargs.get("api_key", "") + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}", + "anthropic-version": self.ANTHROPIC_VERSION + } + + # Convert messages to Anthropic format + anthropic_messages = self._convert_messages(messages) + + body = { + "model": model, + "messages": anthropic_messages, + "stream": kwargs.get("stream", True), + "max_tokens": kwargs.get("max_tokens", 4096) + } + + # System message + if "system" in kwargs: + body["system"] = kwargs["system"] + else: + # Extract from first message + for msg in messages: + if msg.get("role") == "system": + body["system"] = msg.get("content", "") + break + + # Thinking capability (Claude 3.5+) + if kwargs.get("thinking_enabled"): + body["thinking"] = { + "type": "enabled", + "budget_tokens": kwargs.get("thinking_budget_tokens", 10000) + } + + # Tool definitions + if tools: + body["tools"] = self._convert_tools(tools) + + # Optional parameters + if "temperature" in kwargs: + body["temperature"] = kwargs["temperature"] + + if "top_p" in kwargs: + body["top_p"] = kwargs["top_p"] + + if "stop_sequences" in kwargs: + body["stop_sequences"] = kwargs["stop_sequences"] + + return body, headers + + def _convert_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Convert messages to Anthropic format + + Anthropic message format: + - role: user, assistant + - content: str or List[Dict] + """ + anthropic_messages = [] + + for msg in messages: + role = msg.get("role", "user") + + # Convert role + if role == "system": + continue # System messages handled separately + + anthropic_msg = { + "role": "user" if role == "user" else "assistant", + "content": [] + } + + # Handle content + content = msg.get("content", "") + if isinstance(content, str): + if content.strip(): + anthropic_msg["content"] = content + elif isinstance(content, list): + # Convert tool results + for item in content: + if isinstance(item, dict): + if item.get("type") == "tool_result": + anthropic_msg["content"].append({ + "type": "tool_result", + "tool_use_id": item.get("tool_call_id", ""), + "content": item.get("content", "") + }) + else: + anthropic_msg["content"].append(item) + else: + anthropic_msg["content"].append(str(item)) + + anthropic_messages.append(anthropic_msg) + + return anthropic_messages + + def _convert_tools(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Convert tool definitions to Anthropic format""" + anthropic_tools = [] + + for tool in tools: + anthropic_tool = { + "name": tool.get("name", ""), + "description": tool.get("description", ""), + "input_schema": tool.get("parameters", {"type": "object", "properties": {}}) + } + anthropic_tools.append(anthropic_tool) + + return anthropic_tools + + def reset(self): + """No-op for pure parsing adapter""" + pass + + async def parse_stream_chunk( + self, + raw_chunk: str + ) -> AsyncGenerator[ParsedDelta, None]: + """Parse Anthropic-format SSE stream + + Returns incremental content - no accumulation. + """ + if not raw_chunk or raw_chunk.strip() == "": + return + + try: + chunk = json.loads(raw_chunk) + except json.JSONDecodeError: + return + + chunk_type = chunk.get("type", "") + + # Handle errors + if chunk_type == self.BLOCK_ERROR: + error_msg = chunk.get("error", {}).get("type", "unknown_error") + logger.error(f"Anthropic API error: {error_msg}") + yield ParsedDelta() + return + + result = ParsedDelta() + + if chunk_type == self.BLOCK_MESSAGE_START: + # Message start - no content yet + pass + + elif chunk_type == self.BLOCK_CONTENT_BLOCK_START: + # Content block start + block = chunk.get("content_block", {}) + block_type = block.get("type") + + if block_type == self.SUBTYPE_THINKING: + # Thinking block start + thinking_text = block.get("thinking", {}).get("thinking", "") + result.thinking = thinking_text + + elif block_type == self.SUBTYPE_TOOL_USE: + # Tool use block start + tool_index = chunk.get("index", 0) + tool_name = block.get("name", "") + result.tool_calls = [{ + "index": tool_index, + "id": "", + "type": "function", + "function": {"name": tool_name, "arguments": ""} + }] + + elif block_type == self.SUBTYPE_TEXT: + # Text block start - nothing to output yet + pass + + elif chunk_type == self.BLOCK_CONTENT_BLOCK_DELTA: + # Content block delta + delta = chunk.get("delta", {}) + delta_type = delta.get("type", "") + + if delta_type == self.DELTA_THINKING: + # Thinking delta (incremental) + thinking = delta.get("thinking", "") + result.thinking = thinking + + elif delta_type == self.DELTA_TEXT: + # Text delta (incremental) + text = delta.get("text", "") + result.text = text + + elif delta_type == self.DELTA_INPUT_JSON: + # Tool arguments delta (incremental) + partial_json = delta.get("partial_json", "") + # For tool calls, we need to update the arguments + # This is handled by the consumer (AgenticLoop) + if partial_json: + result.tool_calls = [{ + "index": 0, + "function": {"arguments": partial_json} + }] + + elif chunk_type == self.BLOCK_CONTENT_BLOCK_STOP: + # Content block stop + pass + + elif chunk_type == self.BLOCK_MESSAGE_DELTA: + # Message delta (usually contains usage) + delta = chunk.get("delta", {}) + usage = chunk.get("usage", {}) + + result.usage = { + "prompt_tokens": usage.get("input_tokens", 0), + "completion_tokens": usage.get("output_tokens", 0), + "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0) + } + + # Check if complete by stop reason + if delta.get("stop_reason"): + result.is_complete = True + + elif chunk_type == self.BLOCK_MESSAGE_STOP: + # Message stop + result.is_complete = True + + # Yield result if there's any content + if result.has_content() or result.is_complete: + yield result + + def parse_response(self, data: Dict[str, Any]) -> LLMResponse: + """Parse non-streaming response""" + content = data.get("content", []) + thinking = "" + text_content = "" + tool_calls = [] + + for block in content: + if isinstance(block, dict): + block_type = block.get("type") + + if block_type == "thinking": + thinking = block.get("thinking", "") + elif block_type == "text": + text_content += block.get("text", "") + elif block_type == "tool_use": + tool_calls.append({ + "id": block.get("id", ""), + "name": block.get("name", ""), + "input": block.get("input", {}) + }) + + usage = data.get("usage", {}) + + return LLMResponse( + content=text_content, + thinking=thinking, + tool_calls=tool_calls, + usage={ + "prompt_tokens": usage.get("input_tokens", 0), + "completion_tokens": usage.get("output_tokens", 0), + "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0) + } + ) + + def supports_thinking(self) -> bool: + return True diff --git a/luxx/services/llm_adapters/base.py b/luxx/services/llm_adapters/base.py new file mode 100644 index 0000000..33c3bfd --- /dev/null +++ b/luxx/services/llm_adapters/base.py @@ -0,0 +1,97 @@ +"""ProviderAdapter - LLM Provider adapter base class + +Defines unified adapter interface that all Provider adapters must implement. +""" +from abc import ABC, abstractmethod +from typing import Dict, List, Any, AsyncGenerator + + +class ProviderAdapter(ABC): + """LLM Provider adapter base class + + All LLM API adapters must inherit from this class and implement its methods. + Adapters are responsible for: + 1. Building requests in provider-specific format + 2. Parsing streaming and non-streaming responses + 3. Providing provider capability information + + Attributes: + provider_type: Provider type identifier + """ + + @property + @abstractmethod + def provider_type(self) -> str: + """Return provider type identifier + + Returns: + str: Provider type, e.g., "openai", "anthropic" + """ + pass + + @abstractmethod + def build_request( + self, + model: str, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, + **kwargs + ) -> tuple[Dict[str, Any], Dict[str, str]]: + """Build request body and headers + + Args: + model: Model name + messages: Message list + tools: Tool definition list + **kwargs: Other parameters (temperature, max_tokens, thinking_enabled, etc.) + + Returns: + tuple: (body, headers) tuple + """ + pass + + @abstractmethod + async def parse_stream_chunk( + self, + raw_chunk: str + ) -> AsyncGenerator[Any, None]: + """Parse streaming response chunk + + Args: + raw_chunk: Raw SSE line or chunk data + + Yields: + ParsedDelta object or other parsed results + """ + pass + + @abstractmethod + def parse_response( + self, + data: Dict[str, Any] + ) -> Any: + """Parse non-streaming response + + Args: + data: API response data + + Returns: + LLMResponse object + """ + pass + + def supports_thinking(self) -> bool: + """Whether provider supports thinking/reasoning + + Returns: + bool: True if provider supports thinking/reasoning content + """ + return False + + def supports_tools(self) -> bool: + """Whether provider supports tool/function calls + + Returns: + bool: True if provider supports function calling + """ + return False diff --git a/luxx/services/llm_adapters/openai_adapter.py b/luxx/services/llm_adapters/openai_adapter.py new file mode 100644 index 0000000..3463d13 --- /dev/null +++ b/luxx/services/llm_adapters/openai_adapter.py @@ -0,0 +1,191 @@ +"""OpenAI Adapter - OpenAI-compatible API adapter + +Supports OpenAI, DeepSeek, GLM and other OpenAI-compatible APIs. +""" +import json +import logging +from typing import Dict, List, Any, AsyncGenerator, Optional + +from .base import ProviderAdapter +from ..llm_response import ParsedDelta, LLMResponse + +logger = logging.getLogger(__name__) + + +class OpenAIAdapter(ProviderAdapter): + """OpenAI-compatible API adapter + + Pure parsing adapter - no internal state management. + Each parse_stream_chunk call returns incremental content. + Accumulation is handled by the consumer (AgenticLoop). + """ + + @property + def provider_type(self) -> str: + return "openai" + + def __init__(self): + pass + + def build_request( + self, + model: str, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, + **kwargs + ) -> tuple[Dict[str, Any], Dict[str, str]]: + """Build OpenAI-format request""" + api_key = kwargs.get("api_key", "") + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + + body = { + "model": model, + "messages": messages, + "stream": kwargs.get("stream", True) + } + + # Optional parameters + if "temperature" in kwargs: + body["temperature"] = kwargs["temperature"] + if "max_tokens" in kwargs: + body["max_tokens"] = kwargs["max_tokens"] + if "top_p" in kwargs: + body["top_p"] = kwargs["top_p"] + if "frequency_penalty" in kwargs: + body["frequency_penalty"] = kwargs["frequency_penalty"] + if "presence_penalty" in kwargs: + body["presence_penalty"] = kwargs["presence_penalty"] + if "stop" in kwargs: + body["stop"] = kwargs["stop"] + if tools: + body["tools"] = tools + if kwargs.get("thinking_enabled"): + body["thinking_enabled"] = True + + return body, headers + + def reset(self): + """No-op for pure parsing adapter""" + pass + + async def parse_stream_chunk( + self, + raw_chunk: str + ) -> AsyncGenerator[ParsedDelta, None]: + """Parse OpenAI-format SSE stream + + Returns incremental content - no accumulation. + """ + # Parse SSE line + event_type, data_str = self._parse_sse_line(raw_chunk) + + if not data_str or data_str == "[DONE]": + if data_str == "[DONE]": + yield ParsedDelta(is_complete=True) + return + + try: + chunk = json.loads(data_str) + except json.JSONDecodeError: + return + + # Handle errors + if event_type == "error" or "error" in chunk: + yield ParsedDelta() + return + + # Extract usage + usage = chunk.get("usage", {}) + + # Parse choices + for choice in chunk.get("choices", []): + delta = choice.get("delta", {}) + content = delta.get("content") or "" + + # Extract thinking tags if present + thinking, clean_text = self._extract_tags(content) + + # Tool calls + tool_calls = delta.get("tool_calls", []) + + # Check if this is the final delta + is_complete = bool(choice.get("finish_reason")) + + if thinking or clean_text or tool_calls or is_complete or usage: + yield ParsedDelta( + thinking=thinking, + text=clean_text, + tool_calls=tool_calls if tool_calls else [], + is_complete=is_complete, + usage=usage if usage else {} + ) + + def parse_response(self, data: Dict[str, Any]) -> LLMResponse: + """Parse non-streaming response""" + choice = data.get("choices", [{}])[0] + message = choice.get("message", {}) + + content = message.get("content", "") or "" + thinking, clean_content = self._extract_tags(content) + if not thinking: + thinking = message.get("reasoning_content") or "" + + tool_calls = message.get("tool_calls", []) + + usage = data.get("usage", {}) + + return LLMResponse( + content=clean_content, + thinking=thinking, + tool_calls=tool_calls, + usage=usage + ) + + def _parse_sse_line(self, line: str) -> tuple: + """Parse a single SSE line, return (event_type, data)""" + if line.startswith("event:"): + return line[6:].strip(), None + elif line.startswith("data:"): + return "", line[5:].strip() + return "", None + + def _extract_tags(self, content: str) -> tuple: + """Extract thinking tags and return (thinking, clean_text)""" + if not content: + return "", "" + + thinking_parts = [] + clean_parts = [] + i = 0 + + while i < len(content): + remaining = content[i:] + remaining_lower = remaining.lower() + + if remaining_lower.startswith(""): + # Found start of thinking tag + end_pos = i + 7 + remaining_after_tag = content[end_pos:] + end_idx = remaining_after_tag.lower().find("") + + if end_idx != -1: + thinking_parts.append(remaining_after_tag[:end_idx]) + i = end_pos + end_idx + 9 + continue + else: + # No end tag - all remaining is thinking + thinking_parts.append(remaining.strip()) + break + + if remaining_lower.startswith(""): + i += 9 + continue + + clean_parts.append(content[i]) + i += 1 + + return "".join(thinking_parts), "".join(clean_parts) diff --git a/luxx/services/llm_client.py b/luxx/services/llm_client.py index fac67a7..680c5a5 100644 --- a/luxx/services/llm_client.py +++ b/luxx/services/llm_client.py @@ -1,165 +1,258 @@ -"""LLM API client""" +"""LLM API Client - Unified client with multi-Provider support + +Supports various LLM API formats: +- OpenAI (api.openai.com) +- DeepSeek (api.deepseek.com) +- Anthropic (api.anthropic.com) +- GLM/Zhipu AI + +Usage: + from luxx.services.llm_client import LLMClient + + # Auto-detect provider + client = LLMClient(api_key="...", api_url="...") + + # Specify provider + client = LLMClient(api_key="...", api_url="...", provider_type="anthropic") + + # Streaming call + async for delta in client.stream_call(model, messages, tools=tools): + print(delta.text, delta.thinking, delta.tool_calls) +""" import json -import httpx import logging import traceback -from typing import Dict, Any, Optional, List, AsyncGenerator +from typing import Dict, List, Any, Optional, AsyncGenerator + +import httpx from luxx.config import config +from luxx.services.llm_adapters import ( + ProviderAdapter, + OpenAIAdapter, + AnthropicAdapter, +) +from luxx.services.llm_response import ParsedDelta, LLMResponse logger = logging.getLogger(__name__) -class LLMResponse: - """LLM response""" - content: str - tool_calls: Optional[List[Dict]] = None - usage: Optional[Dict] = None +class LLMClient: + """LLM API Client with multi-Provider support + + Uses adapter pattern to support different API formats, auto-detects or manually specifies Provider type. + + Attributes: + api_key: API key + api_url: API base URL + default_model: Default model + provider_type: Provider type + adapter: Current adapter instance + """ + + # Provider type to adapter class mapping + PROVIDER_ADAPTERS: Dict[str, type] = { + # OpenAI-compatible formats + "openai": OpenAIAdapter, + "deepseek": OpenAIAdapter, + "glm": OpenAIAdapter, + "zhipu": OpenAIAdapter, + # Anthropic formats + "anthropic": AnthropicAdapter, + "claude": AnthropicAdapter, + } + + # URL keywords for provider detection + PROVIDER_URL_KEYWORDS: Dict[str, List[str]] = { + "anthropic": ["anthropic", "claude"], + "deepseek": ["deepseek"], + "glm": ["glm", "zhipu", "chatglm"], + "openai": ["openai"], + } def __init__( self, - content: str = "", - tool_calls: Optional[List[Dict]] = None, - usage: Optional[Dict] = None + api_key: str = None, + api_url: str = None, + model: str = None, + provider_type: str = None ): - self.content = content - self.tool_calls = tool_calls - self.usage = usage - - -class LLMClient: - """LLM API client with multi-provider support""" - - def __init__(self, api_key: str = None, api_url: str = None, model: str = None): + """Initialize LLM client + + Args: + api_key: API key, defaults to config value + api_url: API base URL, defaults to config value + model: Default model name + provider_type: Specify Provider type, defaults to auto-detect + """ self.api_key = api_key or config.llm_api_key self.api_url = api_url or config.llm_api_url self.default_model = model - self.provider = self._detect_provider() + + # Detect or use specified provider + if provider_type: + self.provider_type = provider_type + else: + self.provider_type = self._detect_provider_type(api_url) + + self.adapter = self._create_adapter() self._client: Optional[httpx.AsyncClient] = None - def _detect_provider(self) -> str: - """Detect provider from URL""" - url = self.api_url.lower() - if "deepseek" in url: - return "deepseek" - elif "glm" in url or "zhipu" in url: - return "glm" - elif "openai" in url: - return "openai" + def _detect_provider_type(self, url: str = None) -> str: + """Detect Provider type from URL + + Args: + url: API URL, uses self.api_url if None + + Returns: + Provider type string + """ + url = url or self.api_url + url_lower = url.lower() + + for provider, keywords in self.PROVIDER_URL_KEYWORDS.items(): + for keyword in keywords: + if keyword in url_lower: + logger.debug(f"Detected provider '{provider}' from URL: {url}") + return provider + + logger.debug(f"Defaulting to 'openai' for URL: {url}") return "openai" - async def close(self): - """Close client""" - if self._client: - await self._client.aclose() - self._client = None - - def _build_headers(self) -> Dict[str, str]: - """Build request headers""" - return { - "Content-Type": "application/json", - "Authorization": f"Bearer {self.api_key}" - } - - def _build_body( - self, - model: str, - messages: List[Dict], - tools: Optional[List[Dict]] = None, - stream: bool = False, - **kwargs - ) -> Dict[str, Any]: - """Build request body""" - body = { - "model": model, - "messages": messages, - "stream": stream - } + def _create_adapter(self) -> ProviderAdapter: + """Create adapter instance - if "temperature" in kwargs: - body["temperature"] = kwargs["temperature"] - - if "max_tokens" in kwargs: - body["max_tokens"] = kwargs["max_tokens"] - - if "thinking_enabled" in kwargs and kwargs["thinking_enabled"]: - body["thinking_enabled"] = True - - if tools: - body["tools"] = tools - - return body - - def _parse_response(self, data: Dict) -> LLMResponse: - """Parse response""" - content = "" - tool_calls = None - usage = None - - if "choices" in data: - choice = data["choices"][0] - content = choice.get("message", {}).get("content", "") - tool_calls = choice.get("message", {}).get("tool_calls") - - if "usage" in data: - usage = data["usage"] - - return LLMResponse( - content=content, - tool_calls=tool_calls, - usage=usage + Returns: + ProviderAdapter subclass instance + """ + adapter_class = self.PROVIDER_ADAPTERS.get( + self.provider_type, + OpenAIAdapter ) + logger.info(f"Created {adapter_class.__name__} for provider: {self.provider_type}") + return adapter_class() + + @property + def supports_thinking(self) -> bool: + """Whether current Provider supports thinking content""" + return self.adapter.supports_thinking() + + @property + def supports_tools(self) -> bool: + """Whether current Provider supports tool calls""" + return self.adapter.supports_tools() async def client(self) -> httpx.AsyncClient: - """Get HTTP client""" - if self._client is None: + """Get HTTP client (lazy load)""" + if self._client is None or self._client.is_closed: self._client = httpx.AsyncClient(timeout=120.0) return self._client - async def sync_call( + async def close(self): + """Close HTTP client""" + if self._client and not self._client.is_closed: + await self._client.aclose() + self._client = None + + def sync_call( self, model: str, - messages: List[Dict], - tools: Optional[List[Dict]] = None, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, **kwargs ) -> LLMResponse: - """Call LLM API (non-streaming)""" - body = self._build_body(model, messages, tools, stream=False, **kwargs) + """Synchronous call to LLM (non-streaming) - async with httpx.AsyncClient(timeout=120.0) as client: - response = await client.post( - self.api_url, - headers=self._build_headers(), - json=body - ) - response.raise_for_status() - data = response.json() + Args: + model: Model name + messages: Message list + tools: Tool definition list + **kwargs: Other parameters (temperature, max_tokens, thinking_enabled, etc.) + + Returns: + LLMResponse object + """ + import asyncio + return asyncio.get_event_loop().run_until_complete( + self.async_sync_call(model, messages, tools, **kwargs) + ) + + async def async_sync_call( + self, + model: str, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, + **kwargs + ) -> LLMResponse: + """Internal async sync call""" + model = model or self.default_model + kwargs["api_key"] = self.api_key - return self._parse_response(data) + body, headers = self.adapter.build_request( + model, messages, tools, stream=False, **kwargs + ) + + endpoint = self.api_url + logger.info(f"Sync call to {endpoint} with model {model}") + + try: + async with httpx.AsyncClient(timeout=120.0) as client: + response = await client.post( + endpoint, + headers=headers, + json=body + ) + response.raise_for_status() + data = response.json() + + return self.adapter.parse_response(data) + + except httpx.HTTPStatusError as e: + logger.error(f"HTTP error: {e.response.status_code} - {e.response.text}") + raise + except Exception as e: + logger.error(f"Sync call error: {e}\n{traceback.format_exc()}") + raise async def stream_call( self, model: str, - messages: List[Dict], - tools: Optional[List[Dict]] = None, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]] = None, **kwargs - ) -> AsyncGenerator[str, None]: - """Stream call LLM API - yields raw SSE event lines + ) -> AsyncGenerator[ParsedDelta, None]: + """Streaming call to LLM + Args: + model: Model name + messages: Message list + tools: Tool definition list + **kwargs: Other parameters + Yields: - str: Raw SSE event lines for direct forwarding + ParsedDelta objects with accumulated content """ - body = self._build_body(model, messages, tools, stream=True, **kwargs) + # Reset adapter buffers for new stream + if hasattr(self.adapter, 'reset'): + self.adapter.reset() - logger.info(f"Starting stream_call for model: {model}, messages count: {len(messages)}") + model = model or self.default_model + kwargs["api_key"] = self.api_key + kwargs["stream"] = True + + body, headers = self.adapter.build_request( + model, messages, tools, **kwargs + ) + + endpoint = self.api_url + logger.info(f"Stream call to {endpoint} with model {model}") try: async with httpx.AsyncClient(timeout=120.0) as client: - logger.info(f"Sending request to {self.api_url}") async with client.stream( "POST", - self.api_url, - headers=self._build_headers(), + endpoint, + headers=headers, json=body ) as response: logger.info(f"Response status: {response.status_code}") @@ -167,15 +260,40 @@ class LLMClient: async for line in response.aiter_lines(): if line.strip(): - yield line + "\n" + async for delta in self.adapter.parse_stream_chunk(line): + yield delta + except httpx.HTTPStatusError as e: status_code = e.response.status_code if e.response else "?" - logger.error(f"HTTP error: {status_code}") - yield f"event: error\ndata: {json.dumps({'content': f'HTTP {status_code}: Request failed'})}\n\n" + error_body = e.response.text if e.response else "" + logger.error(f"HTTP error: {status_code} - {error_body}") + yield ParsedDelta() except Exception as e: - logger.error(f"Exception: {type(e).__name__}: {str(e)}\n{traceback.format_exc()}") - yield f"event: error\ndata: {json.dumps({'content': str(e)})}\n\n" + logger.error(f"Stream error: {type(e).__name__}: {e}\n{traceback.format_exc()}") + yield ParsedDelta() -# Global LLM client -llm_client = LLMClient() +# Convenience function +def create_client( + api_key: str = None, + api_url: str = None, + model: str = None, + provider_type: str = None +) -> LLMClient: + """Convenience function to create LLM client + + Args: + api_key: API key + api_url: API URL + model: Model + provider_type: Provider type + + Returns: + LLMClient instance + """ + return LLMClient( + api_key=api_key, + api_url=api_url, + model=model, + provider_type=provider_type + ) diff --git a/luxx/services/llm_response.py b/luxx/services/llm_response.py index f98ec13..a31f403 100644 --- a/luxx/services/llm_response.py +++ b/luxx/services/llm_response.py @@ -1,309 +1,65 @@ -"""LLM Response Parser - Unified parser for multiple LLM API formats. +"""LLM Response - Unified message classes for LLM communication -Supported Providers: -- OpenAI: delta.content, delta.tool_calls -- DeepSeek: delta.content, delta.reasoning_content, delta.tool_calls -- Anthropic: content_block with thinking/text types -- MiniMax: <|im_start|>thinking...<|im_end|> tags in content - -Data Flow: -``` -LLM API Response (SSE) - │ - ▼ -LLMResponseParser.parse_chunk() - │ - ├──► ParsedDelta { thinking, text, tool_calls } - │ - ▼ -AgenticLoop._process_stream_line() - │ - ▼ -SSE Events (process_step) - │ - ├──► type: "thinking" - ├──► type: "text" - └──► type: "tool_call" -``` - -API Response Formats: - -1. OpenAI Standard (DeepSeek, OpenAI): -```json -{ - "choices": [{ - "delta": { - "content": "Hello", - "reasoning_content": "Let me think...", - "tool_calls": [{"id": "call_1", "function": {...}}] - } - }] -} -``` - -2. Anthropic Streaming: -```json -{"type": "content_block_start", "content_block": {"type": "thinking", "thinking": "..."}} -{"type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "..."}} -{"type": "content_block_delta", "delta": {"type": "text_delta", "text": "..."}} -{"type": "content_block_stop"} -``` - -3. MiniMax (with thinking tags in content): -```json -{ - "choices": [{ - "delta": { - "content": "<|im_start|>thinking分析中...<|im_end|>回复内容" - } - }] -} -``` - -4. Standard thinking tags: -```json -{ - "choices": [{ - "delta": { - "content": "思考内容回复内容" - } - }] -} -``` +This module provides unified data classes for message passing throughout the LLM pipeline. """ -from typing import Dict, Any, Optional, List -from dataclasses import dataclass +from typing import Dict, Any, List, Optional +from dataclasses import dataclass, field @dataclass class ParsedDelta: - """Parsed response delta from LLM. + """Streaming response delta + + Represents a single unit of streaming response data. + Used for streaming responses where content is accumulated incrementally. Attributes: - thinking: Thinking/reasoning content - text: Regular text content - tool_calls: Tool call requests - is_complete: Whether this delta completes a content block + thinking: Accumulated thinking/reasoning content + text: Accumulated text content + tool_calls: List of tool call requests + is_complete: Whether this is the final delta + usage: Token usage statistics """ thinking: str = "" text: str = "" - tool_calls: List[Dict] = None + tool_calls: List[Dict] = field(default_factory=list) is_complete: bool = False + usage: Dict[str, int] = field(default_factory=dict) - def __post_init__(self): - if self.tool_calls is None: - self.tool_calls = [] + def has_thinking(self) -> bool: + """Check if there's thinking content""" + return bool(self.thinking) + + def has_text(self) -> bool: + """Check if there's text content""" + return bool(self.text) + + def has_tool_calls(self) -> bool: + """Check if there are tool calls""" + return bool(self.tool_calls) + + def has_content(self) -> bool: + """Check if there's any content""" + return self.has_thinking() or self.has_text() or self.has_tool_calls() -class LLMResponseParser: - """Unified parser for LLM API response formats. +@dataclass +class LLMResponse: + """Complete LLM response - Usage: - from luxx.services.llm_response import llm_parser - - # Parse OpenAI format - delta = {"content": "Hello", "reasoning_content": "Thinking..."} - parsed = llm_parser.parse_openai(delta) - - # Parse Anthropic format - chunk = {"type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "..."}} - parsed = llm_parser.parse_anthropic(chunk) - - # Auto-detect format - parsed = llm_parser.parse_chunk(chunk, provider="anthropic") + Represents a complete non-streaming response. + + Attributes: + content: Final text content + thinking: Final thinking content (if any) + tool_calls: List of tool calls (if any) + usage: Token usage statistics """ + content: str = "" + thinking: str = "" + tool_calls: List[Dict] = field(default_factory=list) + usage: Dict[str, int] = field(default=dict) - # Content block types - BLOCK_THINKING = "thinking" - BLOCK_TEXT = "text" - BLOCK_TOOL_USE = "tool_use" - BLOCK_TOOL_RESULT = "tool_result" - - def __init__(self): - self._buffer = "" - self._thinking_buffer = "" - self._text_buffer = "" - - def reset(self): - """Reset parser state for new message.""" - self._buffer = "" - self._thinking_buffer = "" - self._text_buffer = "" - - def parse_openai(self, delta: Dict) -> ParsedDelta: - """Parse OpenAI format delta. - - Handles: - - OpenAI: delta.content, delta.tool_calls - - DeepSeek: delta.content, delta.reasoning_content, delta.tool_calls - - MiniMax: <|im_start|>thinking...<|im_end|> in content - - Standard: ... in content - - Args: - delta: Delta object from LLM API response - - Returns: - ParsedDelta with extracted thinking, text, and tool_calls - """ - result = ParsedDelta() - - # Get thinking content (DeepSeek uses reasoning_content) - thinking = delta.get("reasoning_content") or delta.get("reasoning") or "" - if thinking: - self._thinking_buffer += thinking - result.thinking = self._thinking_buffer - - # Get text content - text = delta.get("content") or "" - if text: - # Check for embedded thinking tags (MiniMax format) - thinking_part, clean_text = self._extract_thinking_tags(text) - if thinking_part: - self._thinking_buffer += thinking_part - result.thinking = self._thinking_buffer - if clean_text: - self._text_buffer += clean_text - result.text = self._text_buffer - elif thinking_part := delta.get("thinking"): - # Some providers use "thinking" field directly - self._thinking_buffer += thinking_part - result.thinking = self._thinking_buffer - - # Tool calls - result.tool_calls = delta.get("tool_calls") or [] - - return result - - def parse_anthropic(self, chunk: Dict) -> ParsedDelta: - """Parse Anthropic streaming format. - - Anthropic uses a different event structure: - - content_block_start: Begin a content block - - content_block_delta: Incremental content - - content_block_stop: End of content blocks - - Content block types: - - thinking: Model reasoning - - text: Regular text - - tool_use: Tool invocation - - tool_result: Tool output - - Args: - chunk: Anthropic SSE event chunk - - Returns: - ParsedDelta with extracted content - """ - result = ParsedDelta() - chunk_type = chunk.get("type", "") - - if chunk_type == "content_block_start": - block = chunk.get("content_block", {}) - if block.get("type") == self.BLOCK_THINKING: - thinking = block.get("thinking", "") - if thinking: - self._thinking_buffer = thinking - result.thinking = self._thinking_buffer - - elif chunk_type == "content_block_delta": - delta = chunk.get("delta", {}) - delta_type = delta.get("type", "") - - if delta_type == "thinking_delta": - thinking = delta.get("thinking", "") - self._thinking_buffer += thinking - result.thinking = self._thinking_buffer - - elif delta_type == "text_delta": - text = delta.get("text", "") - self._text_buffer += text - result.text = self._text_buffer - - elif delta_type == "partial_json": - # Partial JSON for tool calls - pass - - elif chunk_type == "content_block_stop": - result.is_complete = True - - return result - - def parse_chunk(self, chunk: Dict, provider: str = "openai") -> ParsedDelta: - """Parse chunk based on provider. - - Args: - chunk: Response chunk from LLM - provider: Provider name ("openai", "anthropic", "deepseek", "minimax") - - Returns: - ParsedDelta with extracted content - """ - if provider == "anthropic": - return self.parse_anthropic(chunk) - - # Default to OpenAI format - return self.parse_openai(chunk) - - def _extract_thinking_tags(self, content: str) -> tuple: - """Extract thinking content from tags. - - Handles multiple tag formats: - - MiniMax: <|im_start|>thinking...<|im_end|> - - Standard: ... - - Args: - content: Raw content string from LLM - - Returns: - Tuple of (thinking_content, clean_text) - """ - thinking_parts = [] - clean_parts = [] - i = 0 - - while i < len(content): - remaining = content[i:].lower() - - # Check for MiniMax format - if remaining.startswith("<|im_start|>thinking"): - end_tag = "<|im_end|>" - start = i + 21 # len("<|im_start|>thinking") - end = content.find(end_tag, start) - if end != -1: - thinking_parts.append(content[start:end]) - i = end + len(end_tag) - continue - - # Check for standard format - if remaining.startswith(""): - end_tag = "" - start = i + 7 # len("") - end = content.find(end_tag, start) - if end != -1: - thinking_parts.append(content[start:end]) - i = end + len(end_tag) - continue - - # Regular character - clean_parts.append(content[i]) - i += 1 - - return "".join(thinking_parts), "".join(clean_parts) - - def has_thinking_tags(self, content: str) -> bool: - """Check if content contains thinking tags. - - Args: - content: Raw content string - - Returns: - True if content contains thinking tags - """ - if not content: - return False - lower = content.lower() - return "<|im_start|>thinking" in lower or "" in lower - - -# Global parser instance -llm_parser = LLMResponseParser() + def has_tool_calls(self) -> bool: + """Check if there are tool calls""" + return bool(self.tool_calls) diff --git a/luxx/services/stream_context.py b/luxx/services/stream_context.py index 7ebde9a..39debbf 100644 --- a/luxx/services/stream_context.py +++ b/luxx/services/stream_context.py @@ -65,44 +65,6 @@ class StreamContext: "content": content }) - def handle_thinking(self, delta: Dict) -> Optional[str]: - """Handle reasoning delta from LLM.""" - reasoning = delta.get("reasoning_content", "") - if not reasoning: - return None - - if not self.full_thinking: - self.start_step("thinking") - - self.full_thinking += reasoning - return _sse_event("process_step", { - "step": { - "id": self.current_step_id, - "index": self.current_step_idx, - "type": "thinking", - "content": self.full_thinking - } - }) - - def handle_text(self, delta: Dict) -> Optional[str]: - """Handle content delta from LLM.""" - content = delta.get("content", "") - if not content: - return None - - if not self.full_content: - self.start_step("text") - - self.full_content += content - return _sse_event("process_step", { - "step": { - "id": self.current_step_id, - "index": self.current_step_idx, - "type": "text", - "content": self.full_content - } - }) - def accumulate_tool_call(self, tc_delta: Dict): """Accumulate tool call delta.""" idx = tc_delta.get("index", 0)