Luxx/luxx/services/stream_context.py

376 lines
14 KiB
Python

"""Stream Context - manages streaming state and content accumulation.

State is kept in StreamState, while rendering to SSE events is delegated to a
separate StreamRenderer (composition over inheritance). The one exception is
StreamContext, which inherits from StreamState for simplicity.
"""
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum
from luxx.services.events import sse_event
class StepType(str, Enum):
    """Kinds of step recorded while a response is streamed.

    Members are ``str`` subclasses, so they compare equal to their plain
    string values and can be stored directly in a step's ``type`` field.
    """

    # Reasoning content produced inside think tags
    THINKING = "thinking"
    # Visible assistant text
    TEXT = "text"
    # A tool invocation requested by the model
    TOOL_CALL = "tool_call"
    # The output returned by a tool invocation
    TOOL_RESULT = "tool_result"
# Literal tags delimiting model reasoning inside the content stream;
# StreamState.process_content splits incoming chunks on them.
THINK_START, THINK_END = "<think>", "</think>"
@dataclass
class Step:
    """One step of the response process (thinking, text, tool call or result)."""

    id: str
    index: int
    type: str
    content: str = ""
    name: str = ""
    arguments: str = ""
    id_ref: str = ""  # id of the related tool call / tool_call step, if any
    success: bool = True  # only serialized for TOOL_RESULT steps

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the step into a plain dict.

        ``id``, ``index`` and ``type`` are always present. The optional string
        fields are included only when non-empty, and ``success`` is included
        only for TOOL_RESULT steps.
        """
        payload: Dict[str, Any] = {"id": self.id, "index": self.index, "type": self.type}
        optional_fields = (
            ("content", self.content),
            ("name", self.name),
            ("arguments", self.arguments),
            ("id_ref", self.id_ref),
        )
        payload.update((key, value) for key, value in optional_fields if value)
        if self.type == StepType.TOOL_RESULT:
            payload["success"] = self.success
        return payload
class StreamState:
    """Pure state management for streaming.

    Holds all per-stream state: recorded steps, accumulated text and thinking,
    tool-call deltas and completion metadata. Turning that state into SSE
    events is delegated to StreamRenderer.
    """

    def __init__(self):
        self.reset()

    def reset(self, full_reset: bool = True):
        """Reset state for a new iteration or perform a full reset.

        Args:
            full_reset: If True, also reset the step counter, the recorded
                steps and the tool-result history. If False, only reset
                per-iteration state.
        """
        if full_reset:
            # Full reset - clear everything that spans iterations
            self.step_index = 0
            self.all_steps: List[Step] = []
            self.all_tool_results: List[Dict] = []
        # Per-iteration reset (always runs)
        self.current_step_id: Optional[str] = None
        self.current_step_idx: Optional[int] = None
        self.current_step_type: Optional[str] = None
        self.full_content = ""
        self.full_thinking = ""
        self.tool_calls_list: List[Dict] = []
        self._last_message_id: Optional[str] = None
        self._last_token_count = 0
        self._last_usage: Optional[Dict] = None
        self._in_thinking = False
        self._thinking_buf = ""
        self._text_buf = ""
        # Track content offset per step to avoid duplication across steps
        self._text_offset = 0
        self._thinking_offset = 0
        # Track step start offset for finalize_step to compute complete step content
        self._step_start_text_offset = 0
        self._step_start_thinking_offset = 0

    def process_content(self, content: str) -> Dict:
        """Split a raw content chunk into thinking and visible text.

        THINK_START / THINK_END tags found inside the chunk are consumed and
        ``self._in_thinking`` is updated. A tag split across two chunks is
        not detected, because each chunk is inspected on its own.

        Args:
            content: Raw content delta from the model.

        Returns:
            A dict with these keys:
                ``thinking``: thinking text in this chunk.
                ``text``: visible text in this chunk.
                ``should_emit``: True for any non-empty chunk.
                ``thinking_only``: True when the chunk produced no visible text.
        """
        if not content:
            return {"thinking": "", "text": "", "should_emit": False, "thinking_only": False}

        thinking = ""
        text = ""

        # Handle THINK_START - can appear anywhere in content. Any text before
        # it is visible text.
        if not self._in_thinking and THINK_START in content:
            self._in_thinking = True
            before, _, content = content.partition(THINK_START)
            text += before

        if THINK_END in content:
            # Everything up to the first THINK_END is thinking. This also runs
            # when no THINK_START was seen in this chunk.
            thinking, _, content = content.partition(THINK_END)
            self._in_thinking = False
            # Further THINK_END tags are stripped; the segments around them are text
            while THINK_END in content:
                segment, _, content = content.partition(THINK_END)
                text += segment
            # Any remaining content after the last THINK_END is text
            text += content
        elif self._in_thinking:
            # In thinking mode, stream content directly
            thinking = content
        else:
            # Not in thinking mode, emit text immediately for streaming
            text = content

        return {
            "thinking": thinking,
            "text": text,
            "should_emit": True,
            # Derived from the actual text: text that preceded THINK_START in
            # this same chunk must not be reported as thinking-only.
            "thinking_only": not text,
        }

    def flush(self) -> tuple:
        """Return ``(thinking, text)`` from the internal buffers and clear them."""
        thinking = self._thinking_buf
        text = self._text_buf
        self._thinking_buf = ""
        self._text_buf = ""
        return thinking, text

    def start_step(self, step_type: str) -> str:
        """Start a new step and return its ID. Auto-finalizes the previous step.

        Args:
            step_type: A StepType value for the new step.

        Returns:
            The new step id, of the form ``"step-<n>"``.
        """
        # Auto-finalize previous step before starting new one
        self.finalize_step()
        self.current_step_idx = self.step_index
        self.current_step_id = f"step-{self.step_index}"
        self.current_step_type = step_type
        # Record content offset so this step only includes content added during it
        if step_type == StepType.TEXT:
            self._text_offset = len(self.full_content)
            self._step_start_text_offset = len(self.full_content)
        elif step_type == StepType.THINKING:
            self._thinking_offset = len(self.full_thinking)
            self._step_start_thinking_offset = len(self.full_thinking)
        self.step_index += 1
        return self.current_step_id

    def finalize_step(self):
        """Close the current step and, for non-tool steps, record it in all_steps."""
        if self.current_step_id is None:
            return
        # TOOL_CALL and TOOL_RESULT steps are handled manually in render methods.
        # They have their own data (name, arguments, id_ref) that can't be auto-generated.
        if self.current_step_type in (StepType.TOOL_CALL, StepType.TOOL_RESULT):
            # Just reset the state without adding a step
            self.current_step_id = None
            self.current_step_idx = None
            self.current_step_type = None
            return
        if self.current_step_type == StepType.TEXT:
            content = self.full_content[self._step_start_text_offset:]
        elif self.current_step_type == StepType.THINKING:
            content = self.full_thinking[self._step_start_thinking_offset:]
        else:
            content = ""
        step = Step(
            id=self.current_step_id,
            index=self.current_step_idx,
            type=self.current_step_type,
            content=content
        )
        self.all_steps.append(step)
        # Reset current step state but keep buffers for accumulation
        self.current_step_id = None
        self.current_step_idx = None
        self.current_step_type = None

    def accumulate_tool_call(self, tc_delta: Dict):
        """Merge one streamed tool-call delta into ``self.tool_calls_list``.

        Entries are addressed by the delta's ``index``. Each
        ``function.name`` and ``function.arguments`` fragment is appended to
        the entry's accumulated string in arrival order.

        Args:
            tc_delta: A tool-call delta dict with optional ``index``, ``id``,
                ``type`` and ``function`` keys. Any of them may be missing
                or None.
        """
        # A missing or None index is treated as 0
        idx = tc_delta.get("index") or 0
        # Create entries up to idx so a skipped or out-of-order index cannot
        # raise IndexError below.
        while len(self.tool_calls_list) <= idx:
            # DeepSeek may omit "type" in tool_call deltas. Without defaulting
            # to "function", the tool call would have type=None, causing JSON
            # serialization issues or API rejections.
            self.tool_calls_list.append({
                "id": "",
                "type": "function",
                "function": {"name": "", "arguments": ""},
            })
        entry = self.tool_calls_list[idx]
        # Update id/type only when a non-empty value is provided. A later delta
        # may carry None for these fields and must not clobber earlier values.
        if tc_delta.get("id"):
            entry["id"] = tc_delta["id"]
        if tc_delta.get("type"):
            entry["type"] = tc_delta["type"]
        func = tc_delta.get("function") or {}
        if func.get("name"):
            entry["function"]["name"] += func["name"]
        if func.get("arguments"):
            entry["function"]["arguments"] += func["arguments"]

    def add_tool_result(self, result: Dict):
        """Append a tool result to the history as a ``role: tool`` message dict."""
        self.all_tool_results.append({
            "role": "tool",
            "tool_call_id": result.get("tool_call_id", ""),
            "content": result.get("content", "")
        })

    def set_completion(self, msg_id: str, token_count: int, usage: dict):
        """Store completion metadata (message id, token count, usage)."""
        self._last_message_id = msg_id
        self._last_token_count = token_count
        self._last_usage = usage

    def get_steps_for_save(self) -> List[Dict]:
        """Return every recorded step serialized with ``Step.to_dict``."""
        return [step.to_dict() for step in self.all_steps]
class StreamRenderer:
    """Converts StreamState contents into SSE ``process_step`` / ``error`` events.

    All methods are static; the renderer holds no state of its own.
    """

    @staticmethod
    def render_tool_calls(state: StreamState) -> List[str]:
        """Emit one ``process_step`` event per accumulated tool call.

        start_step is called only to allocate a unique step id and index, and
        as a side effect it finalizes whatever step was open. finalize_step
        never records TOOL_CALL steps, so the Step is built and appended here.
        The current-step fields are then cleared by hand.
        """
        import logging
        log = logging.getLogger(__name__)

        rendered: List[str] = []
        for call in state.tool_calls_list:
            new_id = state.start_step(StepType.TOOL_CALL)
            fn = call["function"]
            tool_step = Step(
                id=new_id,
                index=state.current_step_idx,
                type=StepType.TOOL_CALL,
                name=fn["name"],
                arguments=fn["arguments"],
                id_ref=call.get("id", ""),
            )
            state.all_steps.append(tool_step)
            log.debug(f"[TOOL_CALL] Created step: id={tool_step.id}, index={tool_step.index}, name={tool_step.name}, id_ref={tool_step.id_ref}")
            rendered.append(sse_event("process_step", {"step": tool_step.to_dict()}))
            # The step was recorded by hand, so close it by hand as well
            state.current_step_id = state.current_step_idx = state.current_step_type = None
        return rendered

    @staticmethod
    def render_tool_result(state: StreamState, result: Dict, ref_step_id: str) -> tuple:
        """Record a tool result step and return ``(step, sse_event_string)``.

        No new step is started. The result is linked to its tool_call step via
        ``id_ref`` and reuses that step's index so the pair sorts together.
        If no matching tool_call step exists, ``state.step_index`` is used.
        """
        import json
        import logging
        log = logging.getLogger(__name__)

        payload = result.get("content", "")

        # A JSON-object payload may carry its own "success" flag. Anything
        # else (non-JSON text, non-object JSON, non-string content) counts as success.
        ok = True
        try:
            decoded = json.loads(payload)
        except (json.JSONDecodeError, TypeError):
            decoded = None
        if isinstance(decoded, dict):
            ok = decoded.get("success", True)

        call_step = next(
            (s for s in state.all_steps if s.type == StepType.TOOL_CALL and s.id == ref_step_id),
            None,
        )
        position = state.step_index if call_step is None else call_step.index

        result_step = Step(
            id=f"result-{ref_step_id}",
            index=position,
            type=StepType.TOOL_RESULT,
            name=result.get("name", ""),
            content=payload,
            id_ref=ref_step_id,
            success=ok,
        )
        state.all_steps.append(result_step)
        state.add_tool_result(result)
        log.debug(f"[TOOL_RESULT] Created step: id={result_step.id}, index={result_step.index}, id_ref={result_step.id_ref}, ref_step_id={ref_step_id}")
        return result_step, sse_event("process_step", {"step": result_step.to_dict()})

    @staticmethod
    def _live_step_event(state: StreamState, step_type: str, body: str) -> str:
        """Build a process_step event for the step currently open on *state*."""
        live = Step(
            id=state.current_step_id,
            index=state.current_step_idx,
            type=step_type,
            content=body,
        )
        return sse_event("process_step", {"step": live.to_dict()})

    @staticmethod
    def render_thinking(state: StreamState) -> str:
        """Render the thinking accumulated since the current step started."""
        return StreamRenderer._live_step_event(
            state, StepType.THINKING, state.full_thinking[state._thinking_offset:]
        )

    @staticmethod
    def render_text(state: StreamState) -> str:
        """Render the text accumulated since the current step started."""
        return StreamRenderer._live_step_event(
            state, StepType.TEXT, state.full_content[state._text_offset:]
        )

    @staticmethod
    def render_error(error_msg: str) -> str:
        """Build an SSE ``error`` event carrying *error_msg* as its content."""
        return sse_event("error", {"content": error_msg})
# Convenience function for backward compatibility
def render_error(error_msg: str) -> str:
    """Build an SSE ``error`` event carrying *error_msg* as its content.

    Module-level alias of StreamRenderer.render_error, kept for backward
    compatibility.
    """
    return StreamRenderer.render_error(error_msg)