343 lines
11 KiB
Python
343 lines
11 KiB
Python
"""Anthropic Adapter - Anthropic Claude API adapter
|
|
|
|
Supports Anthropic Claude API streaming and non-streaming responses.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from typing import Dict, List, Any, AsyncGenerator
|
|
|
|
from .base import ProviderAdapter
|
|
from ..llm_response import ParsedDelta
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AnthropicAdapter(ProviderAdapter):
|
|
"""Anthropic Claude API adapter
|
|
|
|
Pure parsing adapter - no internal state management.
|
|
Each parse_stream_chunk call returns incremental content.
|
|
Accumulation is handled by the consumer (AgenticLoop).
|
|
|
|
Anthropic API uses a completely different format from OpenAI:
|
|
- Endpoint: POST /v1/messages
|
|
- Streaming: SSE events (content_block_start, content_block_delta, etc.)
|
|
- Thinking: Independent thinking type content block
|
|
- Tools: tool_use type content block
|
|
|
|
Reference: https://docs.anthropic.com/claude/reference/messages
|
|
"""
|
|
|
|
# Anthropic API endpoint suffix
|
|
MESSAGES_PATH = "/v1/messages"
|
|
|
|
# Anthropic API version
|
|
ANTHROPIC_VERSION = "2023-06-01"
|
|
|
|
# Content block types
|
|
BLOCK_MESSAGE_START = "message_start"
|
|
BLOCK_CONTENT_BLOCK_START = "content_block_start"
|
|
BLOCK_CONTENT_BLOCK_DELTA = "content_block_delta"
|
|
BLOCK_CONTENT_BLOCK_STOP = "content_block_stop"
|
|
BLOCK_MESSAGE_DELTA = "message_delta"
|
|
BLOCK_MESSAGE_STOP = "message_stop"
|
|
BLOCK_ERROR = "error"
|
|
|
|
# Delta types
|
|
DELTA_THINKING = "thinking_delta"
|
|
DELTA_TEXT = "text_delta"
|
|
DELTA_INPUT_JSON = "input_json_delta"
|
|
|
|
# Content block subtypes
|
|
SUBTYPE_THINKING = "thinking"
|
|
SUBTYPE_TEXT = "text"
|
|
SUBTYPE_TOOL_USE = "tool_use"
|
|
|
|
def __init__(self):
|
|
pass
|
|
|
|
@property
|
|
def provider_type(self) -> str:
|
|
return "anthropic"
|
|
|
|
@property
|
|
def api_path(self) -> str:
|
|
return self.MESSAGES_PATH
|
|
|
|
def build_request(
|
|
self,
|
|
model: str,
|
|
messages: List[Dict[str, Any]],
|
|
tools: List[Dict[str, Any]] = None,
|
|
**kwargs
|
|
) -> tuple[Dict[str, Any], Dict[str, str]]:
|
|
"""Build Anthropic-format request
|
|
|
|
Anthropic request format differs from OpenAI:
|
|
- Uses "messages" instead of "message"
|
|
- Requires "max_tokens"
|
|
- Different tool format
|
|
"""
|
|
api_key = kwargs.get("api_key", "")
|
|
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"Authorization": f"Bearer {api_key}",
|
|
"anthropic-version": self.ANTHROPIC_VERSION
|
|
}
|
|
|
|
# Convert messages to Anthropic format
|
|
anthropic_messages = self._convert_messages(messages)
|
|
|
|
body = {
|
|
"model": model,
|
|
"messages": anthropic_messages,
|
|
"stream": kwargs.get("stream", True),
|
|
"max_tokens": kwargs.get("max_tokens", 4096)
|
|
}
|
|
|
|
# System message
|
|
if "system" in kwargs:
|
|
body["system"] = kwargs["system"]
|
|
else:
|
|
# Extract from first message
|
|
for msg in messages:
|
|
if msg.get("role") == "system":
|
|
body["system"] = msg.get("content", "")
|
|
break
|
|
|
|
# Thinking capability (Claude 3.5+)
|
|
if kwargs.get("thinking_enabled"):
|
|
body["thinking"] = {
|
|
"type": "enabled",
|
|
"budget_tokens": kwargs.get("thinking_budget_tokens", 10000)
|
|
}
|
|
|
|
# Tool definitions
|
|
if tools:
|
|
body["tools"] = self._convert_tools(tools)
|
|
|
|
# Optional parameters
|
|
if "temperature" in kwargs:
|
|
body["temperature"] = kwargs["temperature"]
|
|
|
|
if "top_p" in kwargs:
|
|
body["top_p"] = kwargs["top_p"]
|
|
|
|
if "stop_sequences" in kwargs:
|
|
body["stop_sequences"] = kwargs["stop_sequences"]
|
|
|
|
return body, headers
|
|
|
|
def _convert_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
"""Convert messages to Anthropic format
|
|
|
|
Anthropic message format:
|
|
- role: user, assistant
|
|
- content: str or List[Dict]
|
|
"""
|
|
anthropic_messages = []
|
|
|
|
for msg in messages:
|
|
role = msg.get("role", "user")
|
|
|
|
# Convert role
|
|
if role == "system":
|
|
continue # System messages handled separately
|
|
|
|
anthropic_msg = {
|
|
"role": "user" if role == "user" else "assistant",
|
|
"content": []
|
|
}
|
|
|
|
# Handle content
|
|
content = msg.get("content", "")
|
|
if isinstance(content, str):
|
|
if content.strip():
|
|
anthropic_msg["content"] = content
|
|
elif isinstance(content, list):
|
|
# Convert tool results
|
|
for item in content:
|
|
if isinstance(item, dict):
|
|
if item.get("type") == "tool_result":
|
|
anthropic_msg["content"].append({
|
|
"type": "tool_result",
|
|
"tool_use_id": item.get("tool_call_id", ""),
|
|
"content": item.get("content", "")
|
|
})
|
|
else:
|
|
anthropic_msg["content"].append(item)
|
|
else:
|
|
anthropic_msg["content"].append(str(item))
|
|
|
|
anthropic_messages.append(anthropic_msg)
|
|
|
|
return anthropic_messages
|
|
|
|
def _convert_tools(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
"""Convert tool definitions to Anthropic format"""
|
|
anthropic_tools = []
|
|
|
|
for tool in tools:
|
|
anthropic_tool = {
|
|
"name": tool.get("name", ""),
|
|
"description": tool.get("description", ""),
|
|
"input_schema": tool.get("parameters", {"type": "object", "properties": {}})
|
|
}
|
|
anthropic_tools.append(anthropic_tool)
|
|
|
|
return anthropic_tools
|
|
|
|
def reset(self):
|
|
"""No-op for pure parsing adapter"""
|
|
pass
|
|
|
|
async def parse_stream_chunk(
|
|
self,
|
|
raw_chunk: str
|
|
) -> AsyncGenerator[ParsedDelta, None]:
|
|
"""Parse Anthropic-format SSE stream
|
|
|
|
Returns incremental content - no accumulation.
|
|
"""
|
|
if not raw_chunk or raw_chunk.strip() == "":
|
|
return
|
|
|
|
try:
|
|
chunk = json.loads(raw_chunk)
|
|
except json.JSONDecodeError:
|
|
return
|
|
|
|
chunk_type = chunk.get("type", "")
|
|
|
|
# Handle errors
|
|
if chunk_type == self.BLOCK_ERROR:
|
|
error_msg = chunk.get("error", {}).get("type", "unknown_error")
|
|
logger.error(f"Anthropic API error: {error_msg}")
|
|
yield ParsedDelta()
|
|
return
|
|
|
|
result = ParsedDelta()
|
|
|
|
if chunk_type == self.BLOCK_MESSAGE_START:
|
|
# Message start - no content yet
|
|
pass
|
|
|
|
elif chunk_type == self.BLOCK_CONTENT_BLOCK_START:
|
|
# Content block start
|
|
block = chunk.get("content_block", {})
|
|
block_type = block.get("type")
|
|
|
|
if block_type == self.SUBTYPE_THINKING:
|
|
# Thinking block start
|
|
thinking_text = block.get("thinking", {}).get("thinking", "")
|
|
result.thinking = thinking_text
|
|
|
|
elif block_type == self.SUBTYPE_TOOL_USE:
|
|
# Tool use block start
|
|
tool_index = chunk.get("index", 0)
|
|
tool_name = block.get("name", "")
|
|
result.tool_call = {
|
|
"index": tool_index,
|
|
"id": "",
|
|
"type": "function",
|
|
"function": {"name": tool_name, "arguments": ""}
|
|
}
|
|
|
|
elif block_type == self.SUBTYPE_TEXT:
|
|
# Text block start - nothing to output yet
|
|
pass
|
|
|
|
elif chunk_type == self.BLOCK_CONTENT_BLOCK_DELTA:
|
|
# Content block delta
|
|
delta = chunk.get("delta", {})
|
|
delta_type = delta.get("type", "")
|
|
|
|
if delta_type == self.DELTA_THINKING:
|
|
# Thinking delta (incremental)
|
|
thinking = delta.get("thinking", "")
|
|
result.thinking = thinking
|
|
|
|
elif delta_type == self.DELTA_TEXT:
|
|
# Text delta (incremental)
|
|
text = delta.get("text", "")
|
|
result.text = text
|
|
|
|
elif delta_type == self.DELTA_INPUT_JSON:
|
|
# Tool arguments delta (incremental)
|
|
partial_json = delta.get("partial_json", "")
|
|
if partial_json:
|
|
result.tool_call = {
|
|
"index": 0,
|
|
"function": {"arguments": partial_json}
|
|
}
|
|
|
|
elif chunk_type == self.BLOCK_CONTENT_BLOCK_STOP:
|
|
# Content block stop
|
|
pass
|
|
|
|
elif chunk_type == self.BLOCK_MESSAGE_DELTA:
|
|
# Message delta (usually contains usage)
|
|
delta = chunk.get("delta", {})
|
|
usage = chunk.get("usage", {})
|
|
|
|
result.usage = {
|
|
"prompt_tokens": usage.get("input_tokens", 0),
|
|
"completion_tokens": usage.get("output_tokens", 0),
|
|
"total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
|
|
}
|
|
|
|
# Check if complete by stop reason
|
|
if delta.get("stop_reason"):
|
|
result.is_complete = True
|
|
|
|
elif chunk_type == self.BLOCK_MESSAGE_STOP:
|
|
# Message stop
|
|
result.is_complete = True
|
|
|
|
# Yield result if there's any content
|
|
if result.has_content() or result.is_complete:
|
|
yield result
|
|
|
|
def parse_response(self, data: Dict[str, Any]) -> Dict:
|
|
"""Parse non-streaming response"""
|
|
content = data.get("content", [])
|
|
thinking = ""
|
|
text_content = ""
|
|
tool_calls = []
|
|
|
|
for block in content:
|
|
if isinstance(block, dict):
|
|
block_type = block.get("type")
|
|
|
|
if block_type == "thinking":
|
|
thinking = block.get("thinking", "")
|
|
elif block_type == "text":
|
|
text_content += block.get("text", "")
|
|
elif block_type == "tool_use":
|
|
tool_calls.append({
|
|
"id": block.get("id", ""),
|
|
"name": block.get("name", ""),
|
|
"input": block.get("input", {})
|
|
})
|
|
|
|
usage = data.get("usage", {})
|
|
|
|
return {
|
|
"content": text_content,
|
|
"thinking": thinking,
|
|
"tool_calls": tool_calls,
|
|
"usage": {
|
|
"prompt_tokens": usage.get("input_tokens", 0),
|
|
"completion_tokens": usage.get("output_tokens", 0),
|
|
"total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
|
|
}
|
|
}
|
|
|
|
def supports_thinking(self) -> bool:
|
|
return True
|
|
|
|
def supports_tools(self) -> bool:
|
|
return True
|