280 lines
12 KiB
Python
280 lines
12 KiB
Python
"""AgenticLoop - Executes the Agentic Loop: LLM + Tools iteration.
|
|
|
|
This module follows the Single Responsibility Principle.
|
|
"""
|
|
import json
|
|
import uuid
|
|
import logging
|
|
from typing import List, Dict, AsyncGenerator
|
|
|
|
from luxx.tools.executor import ToolExecutor
|
|
from luxx.services.llm_client import LLMClient
|
|
from luxx.services.stream_context import StreamState, StreamRenderer, StepType
|
|
from luxx.services.llm_response import ParsedDelta
|
|
from luxx.services.events import sse_event
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound on LLM-call iterations performed by a single
# AgenticLoop.execute() run before it gives up with an error event.
MAX_ITERATIONS = 10
|
|
|
|
|
|
class AgenticLoop:
    """Executes the agentic loop (LLM + Tools iteration).

    Each iteration streams one LLM response, turns its deltas into stream
    events, and, when the response requested tool calls, runs them and
    appends the assistant/tool messages to the conversation before the
    next iteration.
    """

    def __init__(self, tool_executor: ToolExecutor):
        """Store the tool executor and initialise the reasoning buffer.

        Args:
            tool_executor: Object whose ``process_tool_calls_parallel`` is
                used to run the tool calls requested by the LLM.
        """
        self.tool_executor = tool_executor
        # Thinking text collected from ``delta.thinking`` (DeepSeek thinking
        # mode). Reset at the start of execute() and then kept across all of
        # that run's iterations.
        self._reasoning_content = ""

    async def execute(
        self,
        llm: LLMClient,
        model: str,
        messages: List[Dict],
        tools: list,
        temperature: float,
        max_tokens: int,
        thinking_enabled: bool,
        context: StreamState,
        tool_context: dict = None
    ) -> AsyncGenerator[str, None]:
        """Run the loop, yielding stream events as they are produced.

        Args:
            llm: Client whose ``stream_call`` yields parsed deltas.
            model: Model identifier forwarded to ``stream_call``.
            messages: Conversation so far. Mutated in place: assistant and
                tool messages are appended after each tool round, and
                ``reasoning_content`` is written onto the latest assistant
                message before follow-up calls.
            tools: Tool definitions, forwarded on the first iteration only.
            temperature: Forwarded to ``stream_call``.
            max_tokens: Forwarded to ``stream_call``.
            thinking_enabled: Forwarded to ``stream_call``.
            context: Stream state shared with the renderer; partially reset
                at the start of every iteration.
            tool_context: Optional extra context handed to the tool executor.

        Yields:
            Events from ``_process_delta`` / ``_flush_remaining`` /
            ``_execute_tools``, then either a single "done" event or an
            "error" event (stream error or iteration limit reached).
        """
        # Running total over every LLM call made during this execution.
        total_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}

        # Reset reasoning_content for each execution (DeepSeek thinking mode)
        self._reasoning_content = ""

        for iteration in range(MAX_ITERATIONS):
            # Per-iteration reset, keep previous steps and tool results
            context.reset(full_reset=False)

            # DeepSeek thinking mode requires reasoning_content to be passed
            # back, otherwise the API answers:
            # "The `reasoning_content` in the thinking mode must be passed back to the API."
            # Attach the accumulated thinking text to the most recent
            # assistant message before calling the model again.
            # NOTE(review): the buffer is never cleared between iterations, so
            # this carries the thinking text of all previous iterations, not
            # only the last one - confirm that is intended.
            if iteration > 0 and self._reasoning_content:
                for message in reversed(messages):
                    if message.get("role") == "assistant":
                        message["reasoning_content"] = self._reasoning_content
                        break

            # Usage reported by this call only. It is folded into total_usage
            # once per call, so a provider repeating its usage block on several
            # chunks of the same stream is not counted more than once.
            call_usage = dict.fromkeys(total_usage, 0)

            # NOTE(review): tools are deliberately sent on the first iteration
            # only, so follow-up calls cannot request further tools - confirm
            # this matches the intended multi-step behaviour.
            async for delta in llm.stream_call(
                model=model,
                messages=messages,
                tools=tools if iteration == 0 else None,  # Only send tools on first iteration
                temperature=temperature,
                max_tokens=max_tokens,
                thinking_enabled=thinking_enabled
            ):
                # Handle error delta - propagate as SSE error event
                if delta.has_error():
                    logger.error(f"[AGENTIC_LOOP] Stream error: {delta.error_msg}")
                    yield sse_event("error", {"content": delta.error_msg})
                    return

                # Accumulate reasoning_content for DeepSeek thinking mode
                if delta.has_thinking():
                    self._reasoning_content += delta.thinking

                for event in self._process_delta(delta, context, call_usage):
                    yield event

                # Nothing more to do until the stream signals completion.
                if not delta.is_complete:
                    continue

                for event in self._flush_remaining(context):
                    yield event

                context.finalize_step()

                if context.tool_calls_list:
                    for event in self._execute_tools(context, messages, tool_context):
                        yield event
                    # Keep draining the stream; the next LLM call starts once
                    # it is exhausted.
                    continue

                self._accumulate_usage(total_usage, call_usage)
                for event in self._complete(context, total_usage):
                    yield event
                return

            # Stream exhausted without a final answer (tool round, or no
            # completion signal): keep this call's usage in the total.
            self._accumulate_usage(total_usage, call_usage)

        # Exceeded max iterations
        yield sse_event("error", {"content": "Exceeded maximum tool call iterations"})

    @staticmethod
    def _accumulate_usage(total_usage: dict, call_usage: dict) -> None:
        """Add one call's token counts onto the running total, in place.

        Missing or ``None`` counts are treated as 0.
        """
        for key, value in call_usage.items():
            total_usage[key] = (total_usage.get(key) or 0) + (value or 0)

    def _process_delta(self, delta: ParsedDelta, ctx: StreamState, total_usage: dict) -> List[str]:
        """Process a single delta from the LLM stream.

        Args:
            delta: Parsed stream chunk.
            ctx: Stream state updated with thinking/text/tool-call data.
            total_usage: Dict whose three token counters are overwritten with
                the delta's usage values when the delta carries usage.

        Returns:
            Rendered events for any thinking/text carried by the delta
            (possibly empty).
        """
        events = []

        if delta.usage:
            total_usage.update({
                "prompt_tokens": delta.usage.get("prompt_tokens", 0),
                "completion_tokens": delta.usage.get("completion_tokens", 0),
                "total_tokens": delta.usage.get("total_tokens", 0)
            })

        # DeepSeek sends reasoning_content (delta.thinking) in deltas separate
        # from normal content. It is rendered here as a thinking step; the
        # caller additionally accumulates it for later API calls.
        if delta.thinking:
            if ctx.current_step_type != StepType.THINKING:
                ctx.start_step(StepType.THINKING)
            ctx.full_thinking += delta.thinking
            events.append(StreamRenderer.render_thinking(ctx))
            # Advance offset to avoid resending accumulated content on next delta
            ctx._thinking_offset = len(ctx.full_thinking)

        if delta.content:
            result = ctx.process_content(delta.content)
            if result["should_emit"]:
                # Both decisions are taken before any step is started, so
                # starting a thinking step below does not affect whether a
                # text step is opened for the same delta.
                need_new_thinking = result["thinking"] and ctx.current_step_type != StepType.THINKING
                need_new_text = result["text"] and ctx.current_step_type != StepType.TEXT

                if result["thinking"]:
                    if need_new_thinking:
                        ctx.start_step(StepType.THINKING)
                    ctx.full_thinking += result["thinking"]
                    events.append(StreamRenderer.render_thinking(ctx))
                    # Advance offset to avoid resending accumulated content on next delta
                    ctx._thinking_offset = len(ctx.full_thinking)

                if result["text"]:
                    if need_new_text:
                        ctx.start_step(StepType.TEXT)
                    ctx.full_content += result["text"]
                    events.append(StreamRenderer.render_text(ctx))
                    # Advance offset to avoid resending accumulated content on next delta
                    ctx._text_offset = len(ctx.full_content)

        if delta.has_tool_call():
            ctx.accumulate_tool_call(delta.tool_call)

        return events

    def _execute_tools(self, ctx: StreamState, messages: list, tool_context: dict = None) -> List[str]:
        """Execute the pending tool calls and add the results to ``messages``.

        Args:
            ctx: Stream state holding ``tool_calls_list`` and the recorded steps.
            messages: Conversation list; one assistant message carrying the
                tool calls plus the tool result messages are appended.
            tool_context: Optional context forwarded to the tool executor
                (an empty dict is used when omitted).

        Returns:
            Rendered tool-call events followed by one tool-result event per
            executed call.
        """
        events = []

        # Check if there's an active streaming step that needs to be finalized
        # before tool execution (e.g. text step started but not finalized)
        if ctx.current_step_id is not None:
            ctx.finalize_step()

        events.extend(StreamRenderer.render_tool_calls(ctx))

        # Execute tools; a failure of the executor itself is converted into
        # one error result per requested call so the loop can carry on.
        # NOTE(review): this is a synchronous call made from an async
        # generator - presumably it blocks the event loop while tools run;
        # confirm against ToolExecutor.
        try:
            tool_results = self.tool_executor.process_tool_calls_parallel(
                ctx.tool_calls_list, tool_context or {}
            )
        except Exception as e:
            logger.exception(f"[EXECUTE_TOOLS] Tool execution failed: {type(e).__name__}: {e}")
            # Same payload for every call, so build it once.
            error_content = json.dumps(
                {"success": False, "error": f"Tool execution failed: {type(e).__name__}: {str(e)}"}
            )
            tool_results = [
                {
                    "tool_call_id": tc.get("id", ""),
                    "role": "tool",
                    "name": tc.get("function", {}).get("name", "unknown"),
                    "content": error_content
                }
                for tc in ctx.tool_calls_list
            ]

        # Build mapping from LLM tool_call_id to step id
        # Use index-based matching as fallback when id is empty
        tool_call_steps = [s for s in ctx.all_steps if s.type == StepType.TOOL_CALL]
        logger.debug(f"[EXECUTE_TOOLS] tool_call_steps: {[(s.id, s.name, s.id_ref) for s in tool_call_steps]}")
        logger.debug(f"[EXECUTE_TOOLS] tool_calls_list: {[(tc.get('id'), tc.get('function', {}).get('name')) for tc in ctx.tool_calls_list]}")

        rendered_count = 0
        for i, (tr, tc) in enumerate(zip(tool_results, ctx.tool_calls_list)):
            tc_id = tc.get("id", "")
            ref_id = None

            # First try to match by LLM tool_call_id (first matching step wins)
            if tc_id:
                ref_id = next((s.id for s in tool_call_steps if s.id_ref == tc_id), None)
                if ref_id is not None:
                    logger.debug(f"[EXECUTE_TOOLS] Matched by id: tc_id={tc_id} -> step_id={ref_id}")

            # Fallback: use index-based matching (most reliable for parallel execution)
            if ref_id is None and i < len(tool_call_steps):
                ref_id = tool_call_steps[i].id
                logger.debug(f"[EXECUTE_TOOLS] Matched by index: i={i} -> step_id={ref_id}")

            # Last resort: generate a step id
            if ref_id is None:
                ref_id = f"step-{len(ctx.all_steps) - len(tool_results) + i}"
                logger.warning(f"[EXECUTE_TOOLS] Could not match tool result, generated ref_id: {ref_id}")

            _, event = StreamRenderer.render_tool_result(ctx, tr, ref_id)
            events.append(event)
            rendered_count += 1

        # When tool_calls exist the content field is omitted entirely (text
        # produced before the tool calls is not echoed back).
        assistant_msg = {
            "role": "assistant",
            "tool_calls": ctx.tool_calls_list
        }
        if not ctx.tool_calls_list and ctx.full_content and not ctx.full_content.isspace():
            assistant_msg["content"] = ctx.full_content

        # DeepSeek's thinking mode requires reasoning_content to be echoed
        # back; without it the API returns HTTP 400: "The `reasoning_content`
        # in the thinking mode must be passed back to the API."
        if self._reasoning_content:
            assistant_msg["reasoning_content"] = self._reasoning_content

        messages.append(assistant_msg)
        # Guard the slice: with a count of 0, ``[-0:]`` would select the
        # whole list and re-append every earlier tool result.
        # Presumably render_tool_result records one entry in
        # ctx.all_tool_results per call - verify against StreamRenderer.
        if rendered_count:
            messages.extend(ctx.all_tool_results[-rendered_count:])

        return events

    def _flush_remaining(self, ctx: StreamState) -> List[str]:
        """Flush remaining buffers on complete.

        Moves whatever is left in ``ctx._thinking_buf`` / ``ctx._text_buf``
        into the full thinking/content strings, renders it, finalizes the
        affected step and clears both buffers.

        Returns:
            The rendered events (empty when both buffers were empty).
        """
        events = []
        # Use current buffers (not flushed by process_content if no </think>)
        thinking = ctx._thinking_buf
        text = ctx._text_buf

        if thinking:
            ctx.full_thinking += thinking
            if ctx.current_step_type != StepType.THINKING:
                ctx.start_step(StepType.THINKING)
            events.append(StreamRenderer.render_thinking(ctx))
            ctx.finalize_step()

        if text:
            ctx.full_content += text
            if ctx.current_step_type != StepType.TEXT:
                ctx.start_step(StepType.TEXT)
            events.append(StreamRenderer.render_text(ctx))
            ctx.finalize_step()

        ctx._thinking_buf = ""
        ctx._text_buf = ""
        return events

    def _complete(self, ctx: StreamState, total_usage: dict) -> List[str]:
        """Signal completion of the agentic loop.

        Records the completion on ``ctx`` and returns a single "done" event
        carrying a fresh message id, the token count and the usage dict.
        """
        # Fall back to a rough estimate (length // 4) when no completion
        # token count was reported.
        token_count = total_usage.get("completion_tokens") or len(ctx.full_content) // 4
        msg_id = str(uuid.uuid4())
        logger.info(f"[TOKEN] usage={total_usage}, count={token_count}")

        ctx.set_completion(msg_id, token_count, total_usage)

        return [sse_event("done", {
            "message_id": msg_id,
            "token_count": token_count,
            "usage": total_usage
        })]
|