"""LLM Response - Unified LLM response data structures Provides unified response format for data transfer between adapter layer and business layer. Data format: 1. ParsedDelta: Single delta in streaming response (with accumulated content) 2. LLMResponse: Complete LLM response 3. StreamAccumulator: Helper class for accumulating streaming content """ from typing import Dict, Any, Optional, List from dataclasses import dataclass, field @dataclass class ParsedDelta: """Streaming response delta Contains accumulated content from streaming response. Attributes: thinking: Accumulated thinking/reasoning content text: Accumulated text content tool_calls: List of tool calls is_complete: Whether the response is complete usage: Token usage statistics """ thinking: str = "" text: str = "" tool_calls: List[Dict] = field(default_factory=list) is_complete: bool = False usage: Dict[str, int] = None def __post_init__(self): if self.tool_calls is None: self.tool_calls = [] if self.usage is None: self.usage = {} def has_content(self) -> bool: """Check if there's any meaningful content""" return bool(self.thinking or self.text or self.tool_calls) def merge(self, other: 'ParsedDelta') -> 'ParsedDelta': """Merge another delta""" return ParsedDelta( thinking=self.thinking + other.thinking, text=self.text + other.text, tool_calls=self.tool_calls or other.tool_calls, is_complete=other.is_complete, usage=other.usage or self.usage ) @dataclass class LLMResponse: """Complete LLM response Attributes: content: Final text content thinking: Final thinking content (if any) tool_calls: List of tool calls (if any) usage: Token usage """ content: str = "" thinking: str = "" tool_calls: List[Dict] = field(default_factory=list) usage: Dict[str, int] = None def __post_init__(self): if self.tool_calls is None: self.tool_calls = [] if self.usage is None: self.usage = {} @property def has_tool_calls(self) -> bool: """Check if there are tool calls""" return bool(self.tool_calls) def to_message_content(self) -> str: """Convert to message content format""" return self.content class StreamAccumulator: """Accumulates streaming response content Helper class for adapters to accumulate streaming content and create ParsedDelta with accumulated results. """ def __init__(self): self.reset() def reset(self): """Reset all buffers for new stream""" self.thinking = "" self.text = "" self.tool_calls: List[Dict] = [] self.is_complete = False self.usage: Dict[str, int] = {} def has_content(self) -> bool: """Check if there's any meaningful content""" return bool(self.thinking or self.text or self.tool_calls) def add_thinking(self, content: str) -> 'ParsedDelta': """Add thinking/reasoning content""" if content: self.thinking = content return self._create_delta() def add_text(self, content: str) -> 'ParsedDelta': """Add text content""" if content: self.text = content return self._create_delta() def add_tool_calls(self, tool_calls: List[Dict]) -> 'ParsedDelta': """Add tool calls""" if tool_calls: self.tool_calls = tool_calls return self._create_delta() def set_complete(self, is_complete: bool = True) -> 'ParsedDelta': """Mark response as complete""" self.is_complete = is_complete return self._create_delta() def set_usage(self, usage: Dict[str, int]) -> 'ParsedDelta': """Set token usage""" if usage: self.usage = usage return self._create_delta() def _create_delta(self) -> ParsedDelta: """Create ParsedDelta from current accumulated state""" return ParsedDelta( thinking=self.thinking, text=self.text, tool_calls=self.tool_calls, is_complete=self.is_complete, usage=self.usage ) def to_response(self) -> LLMResponse: """Convert to complete LLMResponse""" return LLMResponse( content=self.text, thinking=self.thinking, tool_calls=self.tool_calls, usage=self.usage ) # Backward compatibility alias LLMResponseParser = StreamAccumulator def llm_parser_factory() -> StreamAccumulator: """Factory function to create a new StreamAccumulator""" return StreamAccumulator()