"""Stream Context - Manages streaming state and content accumulation.

State and rendering are kept in separate classes (composition over
inheritance):

* ``StreamState`` holds all mutable per-stream state: accumulated text and
  thinking content, the step currently being built, finished steps, streamed
  tool-call deltas and tool results.
* ``StreamRenderer`` turns that state into SSE event strings via
  ``sse_event``; it keeps no state of its own.
"""

import json
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

from luxx.services.events import sse_event

logger = logging.getLogger(__name__)

# Tags delimiting inline "thinking" content inside the model's text stream.
# They must be non-empty: with "" as a delimiter, ``"" in s`` is always true
# and the tag-stripping in StreamState.process_content can never terminate.
# NOTE(review): confirm these match the tags emitted by the configured models.
THINK_START = "<think>"
THINK_END = "</think>"


class StepType(str, Enum):
    """Kinds of steps recorded while streaming a response."""

    THINKING = "thinking"
    TEXT = "text"
    TOOL_CALL = "tool_call"
    TOOL_RESULT = "tool_result"


@dataclass
class Step:
    """A single step in the response process (thinking, text, tool call or tool result)."""

    id: str
    index: int
    type: str
    content: str = ""
    # Tool name (TOOL_CALL steps; TOOL_RESULT steps copy it from the result dict)
    name: str = ""
    # Tool argument string accumulated from streamed deltas (TOOL_CALL steps)
    arguments: str = ""
    # TOOL_CALL: the provider's tool call id; TOOL_RESULT: id of the TOOL_CALL step
    id_ref: str = ""
    # Only serialized for TOOL_RESULT steps
    success: bool = True

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the step to a plain dict, omitting empty optional fields.

        ``type`` is always emitted as its plain string value. ``str()`` of a
        ``(str, Enum)`` member is ``"StepType.TEXT"`` rather than ``"text"``
        (and so is ``format()``/f-string output since Python 3.11), so storing
        the member itself could leak that form into saved or sent data.
        ``success`` is included only for TOOL_RESULT steps.
        """
        step_type = self.type.value if isinstance(self.type, Enum) else self.type
        result: Dict[str, Any] = {
            "id": self.id,
            "index": self.index,
            "type": step_type,
        }
        if self.content:
            result["content"] = self.content
        if self.name:
            result["name"] = self.name
        if self.arguments:
            result["arguments"] = self.arguments
        if self.id_ref:
            result["id_ref"] = self.id_ref
        if step_type == StepType.TOOL_RESULT:
            result["success"] = self.success
        return result


class StreamState:
    """Pure state management for streaming.

    Holds all state for one streamed response; converting it into SSE events
    is done by StreamRenderer.
    """

    def __init__(self):
        self.reset()

    def reset(self, full_reset: bool = True):
        """Reset state for a new iteration or a full reset.

        Args:
            full_reset: If True, also clear the step counter, the finished
                steps and the tool results. If False, only the per-iteration
                state is reset.
        """
        if full_reset:
            # Full reset - clear everything
            self.step_index = 0
            self.all_steps: List[Step] = []
            self.all_tool_results: List[Dict] = []

        # Per-iteration reset (always runs)
        self.current_step_id: Optional[str] = None
        self.current_step_idx: Optional[int] = None
        self.current_step_type: Optional[str] = None
        self.full_content = ""
        self.full_thinking = ""
        self.tool_calls_list: List[Dict] = []
        # Maps a delta's "index" field to its position in tool_calls_list, so
        # providers whose indices do not start at 0 (or skip values) still work.
        self._tool_call_slots: Dict[int, int] = {}
        self._last_message_id: Optional[str] = None
        self._last_token_count = 0
        self._last_usage: Optional[Dict] = None
        self._in_thinking = False
        self._thinking_buf = ""
        self._text_buf = ""
        # Content offset of the current step; render_text/render_thinking show
        # only content added since then, avoiding duplication across steps.
        self._text_offset = 0
        self._thinking_offset = 0
        # Same offsets, used by finalize_step to compute the complete step content
        self._step_start_text_offset = 0
        self._step_start_thinking_offset = 0

    def process_content(self, content: str) -> Dict:
        """Split a raw content chunk into thinking and visible text.

        Tags are recognized only when a whole tag arrives inside one chunk. A
        tag split across chunks is passed through verbatim.

        Args:
            content: Raw content delta from the model.

        Returns:
            Dict with keys:

            * ``"thinking"``: thinking text found in this chunk.
            * ``"text"``: visible text found in this chunk.
            * ``"should_emit"``: True for every non-empty chunk.
            * ``"thinking_only"``: True when the chunk produced no visible
              text. It is False for empty input.
        """
        if not content:
            return {"thinking": "", "text": "", "should_emit": False, "thinking_only": False}

        thinking = ""
        text = ""

        # Opening tag (can appear anywhere): text before it is visible text.
        if not self._in_thinking and THINK_START in content:
            self._in_thinking = True
            before, _, content = content.partition(THINK_START)
            text += before

        if THINK_END in content:
            # Closing tag ends thinking. It is honored even without a preceding
            # opening tag, so everything before it is treated as thinking.
            thinking, _, content = content.partition(THINK_END)
            self._in_thinking = False
            # Remaining content is visible text, with any stray extra
            # closing tags stripped out.
            text += content.replace(THINK_END, "")
        elif self._in_thinking:
            # Still inside thinking: stream it directly
            thinking = content
        else:
            # Not in thinking mode: emit text immediately for streaming
            text += content

        return {
            "thinking": thinking,
            "text": text,
            "should_emit": True,
            # Text preceding an unclosed opening tag still counts as visible text
            "thinking_only": not text,
        }

    def flush(self) -> Tuple[str, str]:
        """Return and clear the buffered ``(thinking, text)`` content.

        NOTE(review): nothing in this module writes these buffers besides
        reset(), so this currently returns empty strings unless a caller
        fills them.
        """
        thinking, text = self._thinking_buf, self._text_buf
        self._thinking_buf = ""
        self._text_buf = ""
        return thinking, text

    def start_step(self, step_type: str) -> str:
        """Start a new step and return its ID (``"step-<n>"``).

        The previous step, if any, is finalized first. For TEXT and THINKING
        steps, the current content length is recorded so the step covers only
        content added after this call.
        """
        self.finalize_step()

        self.current_step_idx = self.step_index
        self.current_step_id = f"step-{self.step_index}"
        self.current_step_type = step_type

        if step_type == StepType.TEXT:
            self._text_offset = len(self.full_content)
            self._step_start_text_offset = len(self.full_content)
        elif step_type == StepType.THINKING:
            self._thinking_offset = len(self.full_thinking)
            self._step_start_thinking_offset = len(self.full_thinking)

        self.step_index += 1
        return self.current_step_id

    def finalize_step(self):
        """Finalize the current step and append it to ``all_steps``.

        This is a no-op when no step is open. TOOL_CALL and TOOL_RESULT steps
        carry data (name, arguments, id_ref) that cannot be derived here. The
        renderers append those steps themselves, so for these types only the
        current-step state is cleared.
        """
        if self.current_step_id is None:
            return

        if self.current_step_type not in (StepType.TOOL_CALL, StepType.TOOL_RESULT):
            if self.current_step_type == StepType.TEXT:
                content = self.full_content[self._step_start_text_offset:]
            elif self.current_step_type == StepType.THINKING:
                content = self.full_thinking[self._step_start_thinking_offset:]
            else:
                content = ""
            self.all_steps.append(
                Step(
                    id=self.current_step_id,
                    index=self.current_step_idx,
                    type=self.current_step_type,
                    content=content,
                )
            )

        # Content buffers are kept; only the "current step" markers are cleared
        self._clear_current_step()

    def _clear_current_step(self):
        """Forget the currently open step without recording it."""
        self.current_step_id = None
        self.current_step_idx = None
        self.current_step_type = None

    def accumulate_tool_call(self, tc_delta: Dict):
        """Merge one streamed tool-call delta into ``tool_calls_list``.

        Deltas are grouped by their ``"index"`` field. A missing or null index
        counts as 0.

        * The first delta for an index appends a new entry. Entries are kept
          in order of first appearance, so sparse or non-zero-based indices do
          not raise IndexError.
        * Later deltas update the entry's id and type when a non-empty value
          is given.
        * ``function.name`` and ``function.arguments`` fragments are
          concatenated onto the entry.
        """
        idx = tc_delta.get("index")
        if idx is None:
            idx = 0

        pos = self._tool_call_slots.get(idx)
        if pos is None or pos >= len(self.tool_calls_list):
            # BUG FIX: DeepSeek may omit "type" in tool_call deltas.
            # Without defaulting to "function", the tool call would have
            # type=None, causing JSON serialization issues or API rejections.
            self.tool_calls_list.append({
                "id": tc_delta.get("id") or "",
                "type": tc_delta.get("type") or "function",
                "function": {"name": "", "arguments": ""},
            })
            pos = len(self.tool_calls_list) - 1
            self._tool_call_slots[idx] = pos
        else:
            entry = self.tool_calls_list[pos]
            # Update id if provided (LLM may send id in a later delta)
            if tc_delta.get("id"):
                entry["id"] = tc_delta["id"]
            # Only overwrite type with a real value, so it is never reset to None
            if tc_delta.get("type"):
                entry["type"] = tc_delta["type"]

        func = tc_delta.get("function") or {}
        target = self.tool_calls_list[pos]["function"]
        if func.get("name"):
            target["name"] += func["name"]
        if func.get("arguments"):
            target["arguments"] += func["arguments"]

    def add_tool_result(self, result: Dict):
        """Record a tool result as a ``role: "tool"`` message in ``all_tool_results``."""
        self.all_tool_results.append({
            "role": "tool",
            "tool_call_id": result.get("tool_call_id", ""),
            "content": result.get("content", ""),
        })

    def set_completion(self, msg_id: str, token_count: int, usage: dict):
        """Store completion metadata (message id, token count, usage)."""
        self._last_message_id = msg_id
        self._last_token_count = token_count
        self._last_usage = usage

    def get_steps_for_save(self) -> List[Dict]:
        """Return all finished steps as dictionaries (see Step.to_dict)."""
        return [step.to_dict() for step in self.all_steps]


class StreamRenderer:
    """Renders stream state to SSE events."""

    @staticmethod
    def render_tool_calls(state: StreamState) -> List[str]:
        """Render every accumulated tool call as a ``process_step`` SSE event.

        ``start_step`` is called only to obtain a unique step id and index.
        The TOOL_CALL step itself is built and appended here, because
        finalize_step does not record TOOL_CALL steps. That avoids duplicates.

        Returns:
            One SSE event string per tool call, in ``tool_calls_list`` order.
        """
        events = []
        for tc in state.tool_calls_list:
            step_id = state.start_step(StepType.TOOL_CALL)
            step = Step(
                id=step_id,
                index=state.current_step_idx,
                type=StepType.TOOL_CALL,
                name=tc["function"]["name"],
                arguments=tc["function"]["arguments"],
                id_ref=tc.get("id") or "",
            )
            # Manually add the step (finalize_step won't add it for TOOL_CALL type)
            state.all_steps.append(step)
            logger.debug(
                "[TOOL_CALL] Created step: id=%s, index=%s, name=%s, id_ref=%s",
                step.id, step.index, step.name, step.id_ref,
            )
            events.append(sse_event("process_step", {"step": step.to_dict()}))
            # Step was handled manually; close it so the next start_step starts clean
            state._clear_current_step()
        return events

    @staticmethod
    def render_tool_result(state: StreamState, result: Dict, ref_step_id: str) -> Tuple[Step, str]:
        """Render a tool result as a ``process_step`` SSE event.

        No new step is started. The result is linked to its TOOL_CALL step
        through ``id_ref`` and reuses that step's index, which keeps the
        frontend ordering correct.

        Args:
            state: Stream state; the result step is appended to ``all_steps``
                and the result is recorded via ``add_tool_result``.
            result: Tool result dict. Reads ``"content"`` and ``"name"``, and
                ``add_tool_result`` reads ``"tool_call_id"``.
            ref_step_id: Id of the TOOL_CALL step this result answers.

        Returns:
            ``(step, sse_event_string)``.
        """
        content = result.get("content", "")

        # A JSON-object payload may carry an explicit "success" flag
        success = True
        try:
            parsed = json.loads(content)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and undecodable bytes;
            # TypeError covers non-string content such as None.
            parsed = None
        if isinstance(parsed, dict):
            success = parsed.get("success", True)

        tool_call_step = next(
            (s for s in state.all_steps if s.type == StepType.TOOL_CALL and s.id == ref_step_id),
            None,
        )
        # Same index as the tool_call for proper ordering in the frontend.
        # NOTE(review): the fallback equals the index the next started step will receive.
        step_index = tool_call_step.index if tool_call_step is not None else state.step_index

        step = Step(
            id=f"result-{ref_step_id}",  # Unique id based on tool_call step id
            index=step_index,
            type=StepType.TOOL_RESULT,
            name=result.get("name", ""),
            content=content,
            id_ref=ref_step_id,
            success=success,
        )
        state.all_steps.append(step)
        state.add_tool_result(result)

        logger.debug(
            "[TOOL_RESULT] Created step: id=%s, index=%s, id_ref=%s, ref_step_id=%s",
            step.id, step.index, step.id_ref, ref_step_id,
        )
        return step, sse_event("process_step", {"step": step.to_dict()})

    @staticmethod
    def render_thinking(state: StreamState) -> str:
        """Render the current thinking step as an SSE event.

        The event contains only the thinking text accumulated since the step
        started.
        """
        step = Step(
            id=state.current_step_id,
            index=state.current_step_idx,
            type=StepType.THINKING,
            content=state.full_thinking[state._thinking_offset:],
        )
        return sse_event("process_step", {"step": step.to_dict()})

    @staticmethod
    def render_text(state: StreamState) -> str:
        """Render the current text step as an SSE event.

        The event contains only the text accumulated since the step started.
        """
        step = Step(
            id=state.current_step_id,
            index=state.current_step_idx,
            type=StepType.TEXT,
            content=state.full_content[state._text_offset:],
        )
        return sse_event("process_step", {"step": step.to_dict()})

    @staticmethod
    def render_error(error_msg: str) -> str:
        """Render an ``error`` SSE event carrying ``error_msg``."""
        return sse_event("error", {"content": error_msg})


# Convenience function for backward compatibility
def render_error(error_msg: str) -> str:
    """Render an ``error`` SSE event (delegates to StreamRenderer.render_error)."""
    return StreamRenderer.render_error(error_msg)