"""Anthropic Adapter - Anthropic Claude API adapter Supports Anthropic Claude API streaming and non-streaming responses. """ import json import logging from typing import Dict, List, Any, AsyncGenerator from .base import ProviderAdapter from ..llm_response import ParsedDelta, LLMResponse logger = logging.getLogger(__name__) class AnthropicAdapter(ProviderAdapter): """Anthropic Claude API adapter Pure parsing adapter - no internal state management. Each parse_stream_chunk call returns incremental content. Accumulation is handled by the consumer (AgenticLoop). Anthropic API uses a completely different format from OpenAI: - Endpoint: POST /v1/messages - Streaming: SSE events (content_block_start, content_block_delta, etc.) - Thinking: Independent thinking type content block - Tools: tool_use type content block Reference: https://docs.anthropic.com/claude/reference/messages """ # Anthropic API endpoint suffix MESSAGES_PATH = "/v1/messages" # Anthropic API version ANTHROPIC_VERSION = "2023-06-01" # Content block types BLOCK_MESSAGE_START = "message_start" BLOCK_CONTENT_BLOCK_START = "content_block_start" BLOCK_CONTENT_BLOCK_DELTA = "content_block_delta" BLOCK_CONTENT_BLOCK_STOP = "content_block_stop" BLOCK_MESSAGE_DELTA = "message_delta" BLOCK_MESSAGE_STOP = "message_stop" BLOCK_ERROR = "error" # Delta types DELTA_THINKING = "thinking_delta" DELTA_TEXT = "text_delta" DELTA_INPUT_JSON = "input_json_delta" # Content block subtypes SUBTYPE_THINKING = "thinking" SUBTYPE_TEXT = "text" SUBTYPE_TOOL_USE = "tool_use" def __init__(self): pass @property def provider_type(self) -> str: return "anthropic" def build_request( self, model: str, messages: List[Dict[str, Any]], tools: List[Dict[str, Any]] = None, **kwargs ) -> tuple[Dict[str, Any], Dict[str, str]]: """Build Anthropic-format request Anthropic request format differs from OpenAI: - Uses "messages" instead of "message" - Requires "max_tokens" - Different tool format """ api_key = kwargs.get("api_key", "") headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}", "anthropic-version": self.ANTHROPIC_VERSION } # Convert messages to Anthropic format anthropic_messages = self._convert_messages(messages) body = { "model": model, "messages": anthropic_messages, "stream": kwargs.get("stream", True), "max_tokens": kwargs.get("max_tokens", 4096) } # System message if "system" in kwargs: body["system"] = kwargs["system"] else: # Extract from first message for msg in messages: if msg.get("role") == "system": body["system"] = msg.get("content", "") break # Thinking capability (Claude 3.5+) if kwargs.get("thinking_enabled"): body["thinking"] = { "type": "enabled", "budget_tokens": kwargs.get("thinking_budget_tokens", 10000) } # Tool definitions if tools: body["tools"] = self._convert_tools(tools) # Optional parameters if "temperature" in kwargs: body["temperature"] = kwargs["temperature"] if "top_p" in kwargs: body["top_p"] = kwargs["top_p"] if "stop_sequences" in kwargs: body["stop_sequences"] = kwargs["stop_sequences"] return body, headers def _convert_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Convert messages to Anthropic format Anthropic message format: - role: user, assistant - content: str or List[Dict] """ anthropic_messages = [] for msg in messages: role = msg.get("role", "user") # Convert role if role == "system": continue # System messages handled separately anthropic_msg = { "role": "user" if role == "user" else "assistant", "content": [] } # Handle content content = msg.get("content", "") if isinstance(content, str): if content.strip(): anthropic_msg["content"] = content elif isinstance(content, list): # Convert tool results for item in content: if isinstance(item, dict): if item.get("type") == "tool_result": anthropic_msg["content"].append({ "type": "tool_result", "tool_use_id": item.get("tool_call_id", ""), "content": item.get("content", "") }) else: anthropic_msg["content"].append(item) else: anthropic_msg["content"].append(str(item)) anthropic_messages.append(anthropic_msg) return anthropic_messages def _convert_tools(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Convert tool definitions to Anthropic format""" anthropic_tools = [] for tool in tools: anthropic_tool = { "name": tool.get("name", ""), "description": tool.get("description", ""), "input_schema": tool.get("parameters", {"type": "object", "properties": {}}) } anthropic_tools.append(anthropic_tool) return anthropic_tools def reset(self): """No-op for pure parsing adapter""" pass async def parse_stream_chunk( self, raw_chunk: str ) -> AsyncGenerator[ParsedDelta, None]: """Parse Anthropic-format SSE stream Returns incremental content - no accumulation. """ if not raw_chunk or raw_chunk.strip() == "": return try: chunk = json.loads(raw_chunk) except json.JSONDecodeError: return chunk_type = chunk.get("type", "") # Handle errors if chunk_type == self.BLOCK_ERROR: error_msg = chunk.get("error", {}).get("type", "unknown_error") logger.error(f"Anthropic API error: {error_msg}") yield ParsedDelta() return result = ParsedDelta() if chunk_type == self.BLOCK_MESSAGE_START: # Message start - no content yet pass elif chunk_type == self.BLOCK_CONTENT_BLOCK_START: # Content block start block = chunk.get("content_block", {}) block_type = block.get("type") if block_type == self.SUBTYPE_THINKING: # Thinking block start thinking_text = block.get("thinking", {}).get("thinking", "") result.thinking = thinking_text elif block_type == self.SUBTYPE_TOOL_USE: # Tool use block start tool_index = chunk.get("index", 0) tool_name = block.get("name", "") result.tool_calls = [{ "index": tool_index, "id": "", "type": "function", "function": {"name": tool_name, "arguments": ""} }] elif block_type == self.SUBTYPE_TEXT: # Text block start - nothing to output yet pass elif chunk_type == self.BLOCK_CONTENT_BLOCK_DELTA: # Content block delta delta = chunk.get("delta", {}) delta_type = delta.get("type", "") if delta_type == self.DELTA_THINKING: # Thinking delta (incremental) thinking = delta.get("thinking", "") result.thinking = thinking elif delta_type == self.DELTA_TEXT: # Text delta (incremental) text = delta.get("text", "") result.text = text elif delta_type == self.DELTA_INPUT_JSON: # Tool arguments delta (incremental) partial_json = delta.get("partial_json", "") # For tool calls, we need to update the arguments # This is handled by the consumer (AgenticLoop) if partial_json: result.tool_calls = [{ "index": 0, "function": {"arguments": partial_json} }] elif chunk_type == self.BLOCK_CONTENT_BLOCK_STOP: # Content block stop pass elif chunk_type == self.BLOCK_MESSAGE_DELTA: # Message delta (usually contains usage) delta = chunk.get("delta", {}) usage = chunk.get("usage", {}) result.usage = { "prompt_tokens": usage.get("input_tokens", 0), "completion_tokens": usage.get("output_tokens", 0), "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0) } # Check if complete by stop reason if delta.get("stop_reason"): result.is_complete = True elif chunk_type == self.BLOCK_MESSAGE_STOP: # Message stop result.is_complete = True # Yield result if there's any content if result.has_content() or result.is_complete: yield result def parse_response(self, data: Dict[str, Any]) -> LLMResponse: """Parse non-streaming response""" content = data.get("content", []) thinking = "" text_content = "" tool_calls = [] for block in content: if isinstance(block, dict): block_type = block.get("type") if block_type == "thinking": thinking = block.get("thinking", "") elif block_type == "text": text_content += block.get("text", "") elif block_type == "tool_use": tool_calls.append({ "id": block.get("id", ""), "name": block.get("name", ""), "input": block.get("input", {}) }) usage = data.get("usage", {}) return LLMResponse( content=text_content, thinking=thinking, tool_calls=tool_calls, usage={ "prompt_tokens": usage.get("input_tokens", 0), "completion_tokens": usage.get("output_tokens", 0), "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0) } ) def supports_thinking(self) -> bool: return True