Luxx/luxx/services/agentic_loop.py

204 lines
7.6 KiB
Python

"""AgenticLoop - Executes the Agentic Loop: LLM + Tools iteration.
This module follows the Single Responsibility Principle.
"""
import uuid
import logging
from typing import List, Dict, AsyncGenerator
from luxx.tools.executor import ToolExecutor
from luxx.services.llm_client import LLMClient
from luxx.services.stream_context import StreamState, StreamRenderer, StepType
from luxx.services.llm_response import ParsedDelta
from luxx.services.events import sse_event
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Upper bound on the number of LLM calls (tool-call rounds) made by one
# AgenticLoop.execute() run before it gives up with an error event.
MAX_ITERATIONS: int = 10
class AgenticLoop:
    """Executes the agentic loop (LLM + Tools iteration).

    Each iteration makes one streaming LLM call and turns its deltas into SSE
    events. If the call completes with tool calls, the tools are executed, the
    assistant message and tool results are appended to ``messages`` and the LLM
    is called again. If it completes without tool calls, a ``done`` event is
    emitted and the loop stops. At most ``MAX_ITERATIONS`` calls are made.
    """

    def __init__(self, tool_executor: ToolExecutor):
        """Store the executor used to run the tool calls requested by the model."""
        self.tool_executor = tool_executor

    async def execute(
        self,
        llm: LLMClient,
        model: str,
        messages: List[Dict],
        tools: list,
        temperature: float,
        max_tokens: int,
        thinking_enabled: bool,
        context: StreamState,
        tool_context: dict = None
    ) -> AsyncGenerator[str, None]:
        """Run the loop, yielding SSE event strings.

        Args:
            llm: Client whose ``stream_call`` yields ``ParsedDelta`` objects.
            model: Model name forwarded to ``llm.stream_call``.
            messages: Conversation history. Mutated in place: every tool round
                appends the assistant tool-call message and the tool results.
            tools: Tool definitions forwarded to ``llm.stream_call``.
            temperature: Sampling temperature forwarded to ``llm.stream_call``.
            max_tokens: Token limit forwarded to ``llm.stream_call``.
            thinking_enabled: Flag forwarded to ``llm.stream_call``.
            context: Stream state shared by all iterations.
            tool_context: Extra context for the tool executor (``{}`` when None).

        Yields:
            Rendered thinking/text/tool events, then a single ``done`` event on
            success, or an ``error`` event when a stream ends without a
            completion signal or ``MAX_ITERATIONS`` is exceeded.
        """
        # Usage summed over every LLM call made by this run.
        total_usage = self._empty_usage()

        for _ in range(MAX_ITERATIONS):
            # Per-iteration reset, keep previous steps and tool results
            context.reset(full_reset=False)
            # Latest usage reported by this call; added to total_usage once per
            # call so earlier calls' usage is not overwritten.
            call_usage = self._empty_usage()
            tools_executed = False

            async for delta in llm.stream_call(
                model=model,
                messages=messages,
                tools=tools,
                temperature=temperature,
                max_tokens=max_tokens,
                thinking_enabled=thinking_enabled
            ):
                for event in self._process_delta(delta, context, call_usage):
                    yield event

                if not delta.is_complete:
                    continue

                for event in self._flush_remaining(context):
                    yield event
                context.finalize_step()

                if context.tool_calls_list:
                    for event in self._execute_tools(context, messages, tool_context):
                        yield event
                    tools_executed = True
                    # Keep draining this stream (e.g. a trailing usage chunk);
                    # the follow-up LLM call happens on the next outer iteration.
                    continue

                self._add_usage(total_usage, call_usage)
                for event in self._complete(context, total_usage):
                    yield event
                return

            self._add_usage(total_usage, call_usage)
            if not tools_executed:
                # The stream ended without a completion signal. Looping again
                # would re-send the identical request and re-stream duplicate
                # content, so report the failure instead.
                yield sse_event("error", {"content": "LLM stream ended without a completion signal"})
                return

        # Exceeded max iterations
        yield sse_event("error", {"content": "Exceeded maximum tool call iterations"})

    @staticmethod
    def _empty_usage() -> dict:
        """Return a zeroed usage dict with the keys reported in the ``done`` event."""
        return {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}

    @staticmethod
    def _add_usage(total_usage: dict, call_usage: dict) -> None:
        """Add one call's usage into ``total_usage`` in place (missing/None counts as 0)."""
        for key in total_usage:
            total_usage[key] += call_usage.get(key) or 0

    def _process_delta(self, delta: ParsedDelta, ctx: StreamState, total_usage: dict) -> List[str]:
        """Process a single delta from the LLM stream.

        Args:
            delta: One parsed chunk from ``llm.stream_call``.
            ctx: Stream state; its steps, ``full_thinking``/``full_content`` and
                accumulated tool calls are updated.
            total_usage: Usage dict overwritten in place with the latest usage
                this delta reports (if any).

        Returns:
            The SSE events rendered for this delta's thinking/text, in order.
        """
        events = []

        if delta.usage:
            total_usage.update({
                "prompt_tokens": delta.usage.get("prompt_tokens", 0),
                "completion_tokens": delta.usage.get("completion_tokens", 0),
                "total_tokens": delta.usage.get("total_tokens", 0)
            })

        if delta.content:
            result = ctx.process_content(delta.content)
            if result["should_emit"]:
                thinking = result["thinking"]
                text = result["text"]
                if thinking:
                    if ctx.current_step_type != StepType.THINKING:
                        ctx.start_step(StepType.THINKING)
                    ctx.full_thinking += thinking
                    events.append(StreamRenderer.render_thinking(ctx))
                if text:
                    # Checked after the thinking branch, so text that follows
                    # thinking in the same chunk gets its own TEXT step instead
                    # of being appended under the THINKING step just started.
                    if ctx.current_step_type != StepType.TEXT:
                        ctx.start_step(StepType.TEXT)
                    ctx.full_content += text
                    events.append(StreamRenderer.render_text(ctx))

        if delta.has_tool_call():
            ctx.accumulate_tool_call(delta.tool_call)

        return events

    def _execute_tools(self, ctx: StreamState, messages: list, tool_context: dict = None) -> List[str]:
        """Run the accumulated tool calls and append this round to ``messages``.

        Emits the rendered tool-call events, runs the calls through the tool
        executor, renders one result event per (result, call) pair, then appends
        the assistant message carrying the tool calls followed by the matching
        tool-result messages taken from ``ctx.all_tool_results``.

        Returns:
            The SSE events to emit, in order.
        """
        events = list(StreamRenderer.render_tool_calls(ctx))

        # NOTE(review): called synchronously from async code; if the executor
        # blocks, it stalls the event loop -- confirm it is non-blocking.
        tool_results = self.tool_executor.process_tool_calls_parallel(
            ctx.tool_calls_list, tool_context or {}
        )

        tool_call_steps = [s for s in ctx.all_steps if s.type == StepType.TOOL_CALL]
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "[EXECUTE_TOOLS] tool_call_steps: %s",
                [(s.id, s.name, s.id_ref) for s in tool_call_steps],
            )
            logger.debug(
                "[EXECUTE_TOOLS] tool_calls_list: %s",
                [(tc.get('id'), tc.get('function', {}).get('name')) for tc in ctx.tool_calls_list],
            )

        # Map LLM tool_call_id -> step id; the first step with a given id_ref wins.
        step_id_by_ref = {}
        for step in tool_call_steps:
            step_id_by_ref.setdefault(step.id_ref, step.id)

        for i, (tr, tc) in enumerate(zip(tool_results, ctx.tool_calls_list)):
            # First try to match by LLM tool_call_id
            tc_id = tc.get("id", "")
            ref_id = step_id_by_ref.get(tc_id) if tc_id else None
            if ref_id is not None:
                logger.debug("[EXECUTE_TOOLS] Matched by id: tc_id=%s -> step_id=%s", tc_id, ref_id)
            # Fallback: use index-based matching (e.g. when the id is empty)
            if ref_id is None and i < len(tool_call_steps):
                ref_id = tool_call_steps[i].id
                logger.debug("[EXECUTE_TOOLS] Matched by index: i=%s -> step_id=%s", i, ref_id)
            # Last resort: generate a step id
            if ref_id is None:
                ref_id = f"step-{len(ctx.all_steps) - len(tool_results) + i}"
                logger.debug("[EXECUTE_TOOLS] Generated ref_id: %s", ref_id)
            _, event = StreamRenderer.render_tool_result(ctx, tr, ref_id)
            events.append(event)

        messages.append({
            "role": "assistant",
            "content": ctx.full_content or "",
            # Copy, so clearing ctx.tool_calls_list in place (if reset() does so)
            # cannot empty the tool calls already recorded in the history.
            "tool_calls": list(ctx.tool_calls_list)
        })
        # Guard: with no results, [-0:] would slice the whole list and
        # re-append every earlier tool result to the history.
        if tool_results:
            messages.extend(ctx.all_tool_results[-len(tool_results):])
        return events

    def _flush_remaining(self, ctx: StreamState) -> List[str]:
        """Flush remaining thinking/text buffers when the stream completes.

        Content still sitting in the buffers (not flushed by ``process_content``,
        e.g. when no ``</think>`` arrived) is appended to the matching step,
        rendered, and the buffers are cleared.

        Returns:
            The SSE events rendered for the flushed content.
        """
        events = []
        thinking = ctx._thinking_buf
        text = ctx._text_buf

        # Same order as _process_delta: open the step first, then append.
        if thinking:
            if ctx.current_step_type != StepType.THINKING:
                ctx.start_step(StepType.THINKING)
            ctx.full_thinking += thinking
            events.append(StreamRenderer.render_thinking(ctx))
            ctx.finalize_step()

        if text:
            if ctx.current_step_type != StepType.TEXT:
                ctx.start_step(StepType.TEXT)
            ctx.full_content += text
            events.append(StreamRenderer.render_text(ctx))
            ctx.finalize_step()

        ctx._thinking_buf = ""
        ctx._text_buf = ""
        return events

    def _complete(self, ctx: StreamState, total_usage: dict) -> List[str]:
        """Record completion on ``ctx`` and build the final ``done`` event.

        ``token_count`` is ``total_usage["completion_tokens"]`` or, when that is
        missing/zero, the fallback estimate ``len(ctx.full_content) // 4``.

        Returns:
            A one-element list with the ``done`` SSE event.
        """
        token_count = total_usage.get("completion_tokens") or len(ctx.full_content) // 4
        msg_id = str(uuid.uuid4())
        logger.info("[TOKEN] usage=%s, count=%s", total_usage, token_count)
        ctx.set_completion(msg_id, token_count, total_usage)
        return [sse_event("done", {
            "message_id": msg_id,
            "token_count": token_count,
            "usage": total_usage
        })]