# Luxx/luxx/services/llm_adapters/anthropic_adapter.py
#
# 343 lines
# 11 KiB
# Python

"""Anthropic Adapter - Anthropic Claude API adapter
Supports Anthropic Claude API streaming and non-streaming responses.
"""
import json
import logging
from typing import Any, AsyncGenerator, Dict, List, Optional

from .base import ProviderAdapter
from ..llm_response import ParsedDelta
logger = logging.getLogger(__name__)
class AnthropicAdapter(ProviderAdapter):
    """Adapter for the Anthropic Claude Messages API.

    Pure parsing adapter - no internal state management.
    Each parse_stream_chunk call returns incremental content.
    Accumulation is handled by the consumer (AgenticLoop).

    Anthropic API uses a completely different format from OpenAI:
    - Endpoint: POST /v1/messages
    - Streaming: SSE events (content_block_start, content_block_delta, etc.)
    - Thinking: Independent thinking type content block
    - Tools: tool_use type content block

    Reference: https://docs.anthropic.com/claude/reference/messages
    """

    # Anthropic API endpoint suffix
    MESSAGES_PATH = "/v1/messages"
    # Anthropic API version
    ANTHROPIC_VERSION = "2023-06-01"

    # Defaults used by build_request when the caller passes no override
    DEFAULT_MAX_TOKENS = 4096
    DEFAULT_THINKING_BUDGET_TOKENS = 10000

    # Stream event types
    BLOCK_MESSAGE_START = "message_start"
    BLOCK_CONTENT_BLOCK_START = "content_block_start"
    BLOCK_CONTENT_BLOCK_DELTA = "content_block_delta"
    BLOCK_CONTENT_BLOCK_STOP = "content_block_stop"
    BLOCK_MESSAGE_DELTA = "message_delta"
    BLOCK_MESSAGE_STOP = "message_stop"
    BLOCK_ERROR = "error"

    # Delta types
    DELTA_THINKING = "thinking_delta"
    DELTA_TEXT = "text_delta"
    DELTA_INPUT_JSON = "input_json_delta"

    # Content block subtypes
    SUBTYPE_THINKING = "thinking"
    SUBTYPE_TEXT = "text"
    SUBTYPE_TOOL_USE = "tool_use"

    def __init__(self):
        """Create the adapter; it keeps no state of its own."""

    @property
    def provider_type(self) -> str:
        """Provider identifier string for this adapter."""
        return "anthropic"

    @property
    def api_path(self) -> str:
        """Endpoint path suffix appended to the provider base URL."""
        return self.MESSAGES_PATH

    def build_request(
        self,
        model: str,
        messages: List[Dict[str, Any]],
        tools: Optional[List[Dict[str, Any]]] = None,
        **kwargs
    ) -> tuple[Dict[str, Any], Dict[str, str]]:
        """Build the request body and headers for POST /v1/messages.

        Args:
            model: Model name placed in the request body.
            messages: Conversation history; system messages are lifted into
                the top-level ``system`` field, the rest are converted by
                ``_convert_messages``.
            tools: Optional tool definitions, converted by ``_convert_tools``.
            **kwargs: Recognised keys are ``api_key``, ``stream`` (default
                True), ``max_tokens`` (default 4096), ``system``,
                ``thinking_enabled``, ``thinking_budget_tokens`` (default
                10000), ``temperature``, ``top_p`` and ``stop_sequences``.

        Returns:
            A ``(body, headers)`` tuple.
        """
        api_key = kwargs.get("api_key") or ""
        headers = {
            "Content-Type": "application/json",
            # Authentication header documented for the Anthropic API.
            "x-api-key": api_key,
            # Retained so Anthropic-compatible gateways that expect bearer
            # authentication keep working exactly as before.
            "Authorization": f"Bearer {api_key}",
            "anthropic-version": self.ANTHROPIC_VERSION,
        }

        max_tokens = kwargs.get("max_tokens")
        if max_tokens is None:
            max_tokens = self.DEFAULT_MAX_TOKENS

        body: Dict[str, Any] = {
            "model": model,
            "messages": self._convert_messages(messages),
            "stream": kwargs.get("stream", True),
            "max_tokens": max_tokens,
        }

        # System prompt: an explicit kwarg wins, otherwise the first system
        # message found in the history is used.
        if "system" in kwargs:
            body["system"] = kwargs["system"]
        else:
            for msg in messages:
                if msg.get("role") == "system":
                    body["system"] = msg.get("content", "")
                    break

        # Extended thinking
        if kwargs.get("thinking_enabled"):
            budget = kwargs.get("thinking_budget_tokens")
            if budget is None:
                budget = self.DEFAULT_THINKING_BUDGET_TOKENS
            body["thinking"] = {"type": "enabled", "budget_tokens": budget}
            # The API requires max_tokens to be larger than the thinking
            # budget (the defaults 4096 / 10000 would be rejected). Keep the
            # requested max_tokens as the answer allowance on top of it.
            if body["max_tokens"] <= budget:
                body["max_tokens"] = budget + body["max_tokens"]

        # Tool definitions
        if tools:
            body["tools"] = self._convert_tools(tools)

        # Optional sampling parameters are forwarded only when supplied.
        for option in ("temperature", "top_p", "stop_sequences"):
            if option in kwargs:
                body[option] = kwargs[option]

        return body, headers

    def _convert_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Convert a message history to Anthropic format.

        - ``system`` messages are skipped (sent via the top-level field).
        - ``tool`` messages (OpenAI-style tool results) become ``tool_result``
          blocks inside a ``user`` message; consecutive ones share a message.
        - ``user`` stays ``user``; every other role becomes ``assistant``.
        - ``tool_calls`` on an assistant message become ``tool_use`` blocks.
        - Messages that end up with no content are dropped, because the API
          does not accept empty content.
        """
        anthropic_messages: List[Dict[str, Any]] = []
        # User message currently collecting consecutive tool results, if any.
        tool_result_msg: Optional[Dict[str, Any]] = None

        for msg in messages:
            role = msg.get("role", "user")
            if role == "system":
                continue  # System messages handled separately

            if role == "tool":
                raw_result = msg.get("content")
                result_block = {
                    "type": "tool_result",
                    "tool_use_id": msg.get("tool_call_id") or msg.get("tool_use_id", ""),
                    "content": "" if raw_result is None else raw_result,
                }
                if tool_result_msg is None:
                    tool_result_msg = {"role": "user", "content": []}
                    anthropic_messages.append(tool_result_msg)
                tool_result_msg["content"].append(result_block)
                continue

            tool_result_msg = None
            anthropic_role = "user" if role == "user" else "assistant"
            content = self._convert_content(msg.get("content", ""))

            tool_calls = msg.get("tool_calls")
            if anthropic_role == "assistant" and tool_calls:
                # Tool calls require block form, so promote plain text first.
                blocks: List[Any] = []
                if isinstance(content, str):
                    if content:
                        blocks.append({"type": "text", "text": content})
                else:
                    blocks.extend(content)
                blocks.extend(self._convert_tool_calls(tool_calls))
                content = blocks

            if not content:
                continue
            anthropic_messages.append({"role": anthropic_role, "content": content})

        return anthropic_messages

    @staticmethod
    def _convert_content(content: Any) -> Any:
        """Convert one message's content to a string or a list of blocks.

        Returns the original string when it has non-whitespace text, a list
        of content blocks for list input, and an empty value otherwise.
        """
        if isinstance(content, str):
            return content if content.strip() else ""
        if not isinstance(content, list):
            return []

        blocks: List[Dict[str, Any]] = []
        for item in content:
            if not isinstance(item, dict):
                # Bare values are not valid content blocks; wrap them as text.
                blocks.append({"type": "text", "text": str(item)})
            elif item.get("type") == "tool_result":
                block = {
                    "type": "tool_result",
                    "tool_use_id": item.get("tool_use_id") or item.get("tool_call_id", ""),
                    "content": item.get("content", ""),
                }
                if "is_error" in item:
                    block["is_error"] = item["is_error"]
                blocks.append(block)
            else:
                blocks.append(item)
        return blocks

    @staticmethod
    def _convert_tool_calls(tool_calls: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Convert tool calls to ``tool_use`` blocks.

        Accepts the OpenAI shape (``function.name`` plus JSON-encoded
        ``function.arguments``) and the flat ``name`` / ``input`` shape that
        ``parse_response`` returns.
        """
        blocks: List[Dict[str, Any]] = []
        for call in tool_calls:
            if not isinstance(call, dict):
                continue
            function = call.get("function")
            if not isinstance(function, dict):
                function = {}

            arguments = call.get("input", function.get("arguments"))
            if isinstance(arguments, str):
                try:
                    arguments = json.loads(arguments) if arguments.strip() else {}
                except json.JSONDecodeError:
                    logger.warning(
                        "Dropping unparseable arguments for tool call %s",
                        call.get("id", ""),
                    )
                    arguments = {}
            if not isinstance(arguments, dict):
                arguments = {}

            blocks.append({
                "type": "tool_use",
                "id": call.get("id", ""),
                "name": function.get("name") or call.get("name", ""),
                "input": arguments,
            })
        return blocks

    def _convert_tools(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Convert tool definitions to Anthropic format.

        Accepts flat definitions (``name`` / ``description`` / ``parameters``)
        as well as OpenAI-wrapped ones (``{"type": "function", "function":
        {...}}``). A missing or empty schema falls back to an empty object
        schema.
        """
        anthropic_tools = []
        for tool in tools:
            wrapped = tool.get("function")
            spec = wrapped if isinstance(wrapped, dict) else tool
            schema = (
                spec.get("input_schema")
                or spec.get("parameters")
                or {"type": "object", "properties": {}}
            )
            anthropic_tools.append({
                "name": spec.get("name", ""),
                "description": spec.get("description", ""),
                "input_schema": schema,
            })
        return anthropic_tools

    def reset(self):
        """No-op for pure parsing adapter"""
        pass

    async def parse_stream_chunk(
        self,
        raw_chunk: str
    ) -> AsyncGenerator[ParsedDelta, None]:
        """Parse one JSON event payload from the Anthropic stream.

        Returns incremental content - no accumulation. Blank, non-JSON and
        non-object payloads yield nothing. An ``error`` event is logged and
        yields a single empty ``ParsedDelta``.
        """
        if not raw_chunk or not raw_chunk.strip():
            return

        try:
            chunk = json.loads(raw_chunk)
        except json.JSONDecodeError:
            # Best effort: payloads that are not JSON are ignored.
            return
        if not isinstance(chunk, dict):
            return

        chunk_type = chunk.get("type", "")

        # Handle errors
        if chunk_type == self.BLOCK_ERROR:
            error = chunk.get("error")
            if not isinstance(error, dict):
                error = {}
            logger.error(
                "Anthropic API error: %s %s",
                error.get("type", "unknown_error"),
                error.get("message", ""),
            )
            yield ParsedDelta()
            return

        result = ParsedDelta()

        if chunk_type == self.BLOCK_CONTENT_BLOCK_START:
            block = chunk.get("content_block") or {}
            block_type = block.get("type")
            if block_type == self.SUBTYPE_THINKING:
                result.thinking = block.get("thinking", "")
            elif block_type == self.SUBTYPE_TOOL_USE:
                # The id must be forwarded so the tool result can later be
                # matched to this call.
                result.tool_call = {
                    "index": chunk.get("index", 0),
                    "id": block.get("id", ""),
                    "type": "function",
                    "function": {"name": block.get("name", ""), "arguments": ""},
                }
            # Text block start carries nothing to output yet.

        elif chunk_type == self.BLOCK_CONTENT_BLOCK_DELTA:
            delta = chunk.get("delta") or {}
            delta_type = delta.get("type", "")
            if delta_type == self.DELTA_THINKING:
                result.thinking = delta.get("thinking", "")
            elif delta_type == self.DELTA_TEXT:
                result.text = delta.get("text", "")
            elif delta_type == self.DELTA_INPUT_JSON:
                partial_json = delta.get("partial_json", "")
                if partial_json:
                    # Use the event's block index so the fragment lands on the
                    # same tool call that content_block_start announced.
                    result.tool_call = {
                        "index": chunk.get("index", 0),
                        "function": {"arguments": partial_json},
                    }

        elif chunk_type == self.BLOCK_MESSAGE_DELTA:
            delta = chunk.get("delta") or {}
            usage = chunk.get("usage") or {}
            # NOTE(review): input_tokens may be absent here (it is reported in
            # message_start, which this stateless parser does not forward) -
            # confirm whether the consumer needs the prompt token count.
            input_tokens = usage.get("input_tokens") or 0
            output_tokens = usage.get("output_tokens") or 0
            result.usage = {
                "prompt_tokens": input_tokens,
                "completion_tokens": output_tokens,
                "total_tokens": input_tokens + output_tokens,
            }
            if delta.get("stop_reason"):
                result.is_complete = True

        elif chunk_type == self.BLOCK_MESSAGE_STOP:
            result.is_complete = True

        # message_start and content_block_stop carry nothing to forward.

        # Yield result if there's any content
        if result.has_content() or result.is_complete:
            yield result

    def parse_response(self, data: Dict[str, Any]) -> Dict:
        """Parse a non-streaming response.

        Returns:
            Dict with ``content`` (all text blocks concatenated), ``thinking``
            (all thinking blocks concatenated), ``tool_calls`` (list of
            ``{"id", "name", "input"}``) and ``usage`` (prompt / completion /
            total token counts).
        """
        thinking_parts: List[str] = []
        text_parts: List[str] = []
        tool_calls: List[Dict[str, Any]] = []

        for block in data.get("content") or []:
            if not isinstance(block, dict):
                continue
            block_type = block.get("type")
            if block_type == self.SUBTYPE_THINKING:
                # Accumulate so earlier thinking blocks are not overwritten.
                thinking_parts.append(block.get("thinking", ""))
            elif block_type == self.SUBTYPE_TEXT:
                text_parts.append(block.get("text", ""))
            elif block_type == self.SUBTYPE_TOOL_USE:
                tool_calls.append({
                    "id": block.get("id", ""),
                    "name": block.get("name", ""),
                    "input": block.get("input", {}),
                })

        usage = data.get("usage") or {}
        input_tokens = usage.get("input_tokens") or 0
        output_tokens = usage.get("output_tokens") or 0
        return {
            "content": "".join(text_parts),
            "thinking": "".join(thinking_parts),
            "tool_calls": tool_calls,
            "usage": {
                "prompt_tokens": input_tokens,
                "completion_tokens": output_tokens,
                "total_tokens": input_tokens + output_tokens,
            },
        }

    def supports_thinking(self) -> bool:
        """Report that this provider supports thinking blocks."""
        return True

    def supports_tools(self) -> bool:
        """Report that this provider supports tool use."""
        return True