"""AgenticLoop - Executes the Agentic Loop: LLM + Tools iteration.

This module follows the Single Responsibility Principle.
"""
import json
import uuid
import logging
from typing import List, Dict, AsyncGenerator, Optional

from luxx.tools.executor import ToolExecutor
from luxx.services.llm_client import LLMClient
from luxx.services.stream_context import StreamState, StreamRenderer, StepType
from luxx.services.llm_response import ParsedDelta
from luxx.services.events import sse_event

logger = logging.getLogger(__name__)

MAX_ITERATIONS = 10

# Counters tracked in every usage dict (per-call and aggregate).
_USAGE_KEYS = ("prompt_tokens", "completion_tokens", "total_tokens")


def _empty_usage() -> Dict[str, int]:
    """Return a fresh usage dict with every tracked counter set to zero."""
    return {key: 0 for key in _USAGE_KEYS}


def _add_usage(total: Dict[str, int], usage: Dict[str, int]) -> None:
    """Add the counters of one LLM call (``usage``) into ``total`` in place.

    ``None`` values (a provider sending ``null`` for a counter) count as 0.
    """
    for key in _USAGE_KEYS:
        total[key] = (total.get(key) or 0) + (usage.get(key) or 0)


class AgenticLoop:
    """Executes the agentic loop (LLM + Tools iteration).

    Each iteration streams one LLM call. If the model requested tool calls,
    the tools are executed, their results are appended to ``messages``, and
    the next iteration calls the LLM again. Otherwise a ``done`` event is
    emitted and the loop ends.

    The reasoning text of the call in progress is stored on the instance
    (``self._reasoning_content``). Concurrent ``execute()`` calls on the same
    instance would therefore interfere with each other.
    """

    def __init__(self, tool_executor: ToolExecutor):
        self.tool_executor = tool_executor
        # DeepSeek thinking mode: reasoning text streamed during the *current*
        # LLM call. It is reset at the start of every iteration and attached
        # to the assistant message that this iteration appends.
        self._reasoning_content = ""

    async def execute(
        self,
        llm: LLMClient,
        model: str,
        messages: List[Dict],
        tools: list,
        temperature: float,
        max_tokens: int,
        thinking_enabled: bool,
        context: StreamState,
        tool_context: Optional[dict] = None,
    ) -> AsyncGenerator[str, None]:
        """Run LLM calls and tool executions until the model stops calling tools.

        Args:
            llm: Client whose ``stream_call`` yields parsed deltas.
            model: Model name forwarded to ``llm.stream_call``.
            messages: Chat history. It is mutated in place: each tool round
                appends one assistant message (with ``tool_calls``) followed
                by that round's tool results.
            tools: Tool definitions forwarded to ``llm.stream_call``. They are
                sent on the first call only.
            temperature: Forwarded to ``llm.stream_call``.
            max_tokens: Forwarded to ``llm.stream_call``.
            thinking_enabled: Forwarded to ``llm.stream_call``.
            context: Streaming state that accumulates steps and tool results.
            tool_context: Extra context passed to the tool executor
                (``{}`` when ``None``).

        Yields:
            SSE event strings for thinking, text, tool calls, tool results,
            and finally either ``done`` or ``error``.
        """
        # Usage summed over every LLM call made during this execution.
        total_usage = _empty_usage()

        for iteration in range(MAX_ITERATIONS):
            # Per-iteration reset, keep previous steps and tool results
            context.reset(full_reset=False)
            # DeepSeek requires reasoning_content to be passed back with the
            # assistant message of the call that produced it. Each call's
            # reasoning belongs to its own message, so start empty here.
            # _execute_tools attaches it to the assistant message it appends.
            self._reasoning_content = ""
            # Usage of this single call. Providers may repeat usage on several
            # chunks, so it is overwritten within the stream and only added
            # to the total once the call is over.
            call_usage = _empty_usage()

            async for delta in llm.stream_call(
                model=model,
                messages=messages,
                # NOTE(review): tools are only offered on the first call, so
                # the model cannot request a further tool round and later
                # iterations act as final-answer calls. Confirm this is
                # intended, given MAX_ITERATIONS.
                tools=tools if iteration == 0 else None,
                temperature=temperature,
                max_tokens=max_tokens,
                thinking_enabled=thinking_enabled,
            ):
                # Handle error delta - propagate as SSE error event
                if delta.has_error():
                    logger.error(f"[AGENTIC_LOOP] Stream error: {delta.error_msg}")
                    yield sse_event("error", {"content": delta.error_msg})
                    return

                # Accumulate reasoning_content for DeepSeek thinking mode
                if delta.has_thinking():
                    self._reasoning_content += delta.thinking

                for event in self._process_delta(delta, context, call_usage):
                    yield event

                if not delta.is_complete:
                    continue

                for event in self._flush_remaining(context):
                    yield event
                context.finalize_step()

                if context.tool_calls_list:
                    for event in self._execute_tools(context, messages, tool_context):
                        yield event
                    # Keep draining this stream. Trailing deltas, such as a
                    # usage-only chunk, are still recorded. The next LLM call
                    # starts once the stream is exhausted.
                    continue

                _add_usage(total_usage, call_usage)
                for event in self._complete(context, total_usage):
                    yield event
                return

            # The stream for this call is exhausted: fold its usage into the total.
            _add_usage(total_usage, call_usage)

        # Exceeded max iterations
        yield sse_event("error", {"content": "Exceeded maximum tool call iterations"})

    @staticmethod
    def _emit_thinking(ctx: StreamState, chunk: str) -> str:
        """Append ``chunk`` to the thinking step (starting one if needed) and render it."""
        # Decide at emit time so a step started earlier in the same delta is respected.
        if ctx.current_step_type != StepType.THINKING:
            ctx.start_step(StepType.THINKING)
        ctx.full_thinking += chunk
        event = StreamRenderer.render_thinking(ctx)
        # Advance offset to avoid resending accumulated content on next delta
        ctx._thinking_offset = len(ctx.full_thinking)
        return event

    @staticmethod
    def _emit_text(ctx: StreamState, chunk: str) -> str:
        """Append ``chunk`` to the text step (starting one if needed) and render it."""
        if ctx.current_step_type != StepType.TEXT:
            ctx.start_step(StepType.TEXT)
        ctx.full_content += chunk
        event = StreamRenderer.render_text(ctx)
        # Advance offset to avoid resending accumulated content on next delta
        ctx._text_offset = len(ctx.full_content)
        return event

    def _process_delta(self, delta: ParsedDelta, ctx: StreamState, total_usage: dict) -> List[str]:
        """Process a single delta from the LLM stream.

        Args:
            delta: One parsed chunk of the LLM stream.
            ctx: Streaming state updated in place.
            total_usage: Usage dict that is *overwritten* with the counters in
                ``delta.usage`` when present. ``execute`` passes a per-call
                dict.

        Returns:
            SSE events rendered for this delta (possibly empty).
        """
        events = []

        if delta.usage:
            total_usage.update({key: delta.usage.get(key, 0) for key in _USAGE_KEYS})

        # DeepSeek sends reasoning_content (delta.thinking) in deltas separate
        # from normal content. It is rendered as a thinking step here. The
        # caller accumulates it separately so it can be passed back to the API.
        if delta.thinking:
            events.append(self._emit_thinking(ctx, delta.thinking))

        if delta.content:
            result = ctx.process_content(delta.content)
            if result["should_emit"]:
                if result["thinking"]:
                    events.append(self._emit_thinking(ctx, result["thinking"]))
                if result["text"]:
                    events.append(self._emit_text(ctx, result["text"]))

        if delta.has_tool_call():
            ctx.accumulate_tool_call(delta.tool_call)

        return events

    @staticmethod
    def _match_tool_step(tc: dict, i: int, tool_call_steps: list):
        """Return the id of the TOOL_CALL step for tool call ``tc`` (at index ``i``), or None.

        The LLM tool_call id is tried first. Otherwise the step is matched by
        position.
        """
        tc_id = tc.get("id", "")
        ref_id = None

        # First try to match by LLM tool_call_id
        if tc_id:
            for s in tool_call_steps:
                if s.id_ref == tc_id:
                    ref_id = s.id
                    logger.debug(f"[EXECUTE_TOOLS] Matched by id: tc_id={tc_id} -> step_id={ref_id}")
                    break

        # Fallback: use index-based matching (most reliable for parallel execution)
        if ref_id is None and i < len(tool_call_steps):
            ref_id = tool_call_steps[i].id
            logger.debug(f"[EXECUTE_TOOLS] Matched by index: i={i} -> step_id={ref_id}")

        return ref_id

    def _execute_tools(self, ctx: StreamState, messages: list, tool_context: dict = None) -> List[str]:
        """Execute the accumulated tool calls and append the round to ``messages``.

        The assistant message (with ``tool_calls`` and, when present,
        ``reasoning_content``) is appended first, followed by the tool results
        that were rendered in this round.

        Returns:
            SSE events for the tool calls and their results.
        """
        events = []

        # Check if there's an active streaming step that needs to be finalized
        # before tool execution (e.g. text step started but not finalized)
        if ctx.current_step_id is not None:
            ctx.finalize_step()

        events.extend(StreamRenderer.render_tool_calls(ctx))

        # Execute tools and handle any exceptions
        try:
            tool_results = self.tool_executor.process_tool_calls_parallel(
                ctx.tool_calls_list, tool_context or {}
            )
        except Exception as e:
            logger.error(f"[EXECUTE_TOOLS] Tool execution failed: {type(e).__name__}: {e}")
            # Generate error results for each tool call
            error_content = json.dumps({
                "success": False,
                "error": f"Tool execution failed: {type(e).__name__}: {str(e)}",
            })
            tool_results = [
                {
                    "tool_call_id": tc.get("id", ""),
                    "role": "tool",
                    "name": tc.get("function", {}).get("name", "unknown"),
                    "content": error_content,
                }
                for tc in ctx.tool_calls_list
            ]

        # Build mapping from LLM tool_call_id to step id
        # Use index-based matching as fallback when id is empty
        tool_call_steps = [s for s in ctx.all_steps if s.type == StepType.TOOL_CALL]
        logger.debug(f"[EXECUTE_TOOLS] tool_call_steps: {[(s.id, s.name, s.id_ref) for s in tool_call_steps]}")
        logger.debug(f"[EXECUTE_TOOLS] tool_calls_list: {[(tc.get('id'), tc.get('function', {}).get('name')) for tc in ctx.tool_calls_list]}")

        for i, (tr, tc) in enumerate(zip(tool_results, ctx.tool_calls_list)):
            ref_id = self._match_tool_step(tc, i, tool_call_steps)
            # Last resort: generate a step id
            if ref_id is None:
                ref_id = f"step-{len(ctx.all_steps) - len(tool_results) + i}"
                logger.warning(f"[EXECUTE_TOOLS] Could not match tool result, generated ref_id: {ref_id}")

            _, event = StreamRenderer.render_tool_result(ctx, tr, ref_id)
            events.append(event)

        # When tool_calls exist, content is omitted (null per the OpenAI API
        # spec). This method is only called with a non-empty tool_calls_list,
        # so the content branch below is purely defensive.
        assistant_msg = {
            "role": "assistant",
            "tool_calls": ctx.tool_calls_list,
        }
        if not ctx.tool_calls_list and ctx.full_content and not ctx.full_content.isspace():
            assistant_msg["content"] = ctx.full_content

        # DeepSeek's thinking mode requires reasoning_content to be echoed back
        # with this assistant message. Otherwise the next request fails with
        # HTTP 400: "The `reasoning_content` in the thinking mode must be
        # passed back to the API." It holds this call's reasoning only.
        if self._reasoning_content:
            assistant_msg["reasoning_content"] = self._reasoning_content
        messages.append(assistant_msg)

        # Append exactly the results rendered above. zip() stops at the shorter
        # input, and a count of 0 must be skipped because ``seq[-0:]`` is the
        # whole list, which would re-send every earlier tool result.
        rendered = min(len(tool_results), len(ctx.tool_calls_list))
        if rendered != len(ctx.tool_calls_list):
            logger.warning(
                f"[EXECUTE_TOOLS] Got {len(tool_results)} results for "
                f"{len(ctx.tool_calls_list)} tool calls"
            )
        if rendered:
            messages.extend(ctx.all_tool_results[-rendered:])

        return events

    def _flush_remaining(self, ctx: StreamState) -> List[str]:
        """Flush buffered thinking/text left in ``ctx`` when the stream completes.

        Each non-empty buffer is appended, rendered and finalized as its own
        step. Both buffers are then cleared.
        """
        events = []

        # Use current buffers (content not yet flushed by process_content)
        thinking = ctx._thinking_buf
        text = ctx._text_buf

        if thinking:
            ctx.full_thinking += thinking
            if ctx.current_step_type != StepType.THINKING:
                ctx.start_step(StepType.THINKING)
            events.append(StreamRenderer.render_thinking(ctx))
            ctx.finalize_step()

        if text:
            ctx.full_content += text
            if ctx.current_step_type != StepType.TEXT:
                ctx.start_step(StepType.TEXT)
            events.append(StreamRenderer.render_text(ctx))
            ctx.finalize_step()

        ctx._thinking_buf = ""
        ctx._text_buf = ""

        return events

    def _complete(self, ctx: StreamState, total_usage: dict) -> List[str]:
        """Signal completion of the agentic loop.

        ``token_count`` is the reported ``completion_tokens``. It falls back to
        a rough ``len(full_content) // 4`` estimate when that is missing or 0.
        """
        token_count = total_usage.get("completion_tokens") or len(ctx.full_content) // 4
        msg_id = str(uuid.uuid4())
        logger.info(f"[TOKEN] usage={total_usage}, count={token_count}")
        ctx.set_completion(msg_id, token_count, total_usage)

        return [sse_event("done", {
            "message_id": msg_id,
            "token_count": token_count,
            "usage": total_usage,
        })]