# Luxx/luxx/services/agentic_loop.py
#
# 222 lines
# 7.5 KiB
# Python

"""AgenticLoop - Executes the Agentic Loop: LLM + Tools iteration.
The loop:
1. Call LLM with messages and tools
2. Check for tool calls in response
3. Execute tools in parallel
4. Add results to messages
5. Repeat (max 10 iterations)
6. Return final response
"""
import uuid
import logging
from typing import List, Dict, AsyncGenerator
from luxx.tools.executor import ToolExecutor
from luxx.services.llm_client import LLMClient
from luxx.services.stream_context import StreamContext, _sse_event
from luxx.services.process_result import ProcessResult
from luxx.services.llm_response import ParsedDelta
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Maximum iterations to prevent infinite loops: upper bound on the number of
# LLM-call/tool-execution rounds a single AgenticLoop.execute() run performs.
MAX_ITERATIONS = 10
class AgenticLoop:
    """Run the agentic loop: alternate LLM calls and tool execution.

    Each iteration streams one LLM response, converts the parsed deltas into
    SSE events, and, if the response contains tool calls, executes them and
    appends the results to ``messages`` for the next iteration. The loop ends
    when a response has no tool calls, when a processed delta reports an
    error, or after ``MAX_ITERATIONS`` iterations.
    """

    def __init__(self, tool_executor: "ToolExecutor"):
        """Store the executor used to run the tool calls requested by the LLM."""
        self.tool_executor = tool_executor

    async def execute(
        self,
        llm: "LLMClient",
        model: str,
        messages: List[Dict],
        tools: list,
        temperature: float,
        max_tokens: int,
        thinking_enabled: bool,
        context: "StreamContext",
        tool_context: dict = None
    ) -> AsyncGenerator[str, None]:
        """Execute the agentic loop, yielding SSE events for each step.

        Args:
            llm: Client whose ``stream_call`` yields parsed deltas.
            model: Model identifier forwarded to ``llm.stream_call``.
            messages: Conversation messages. Mutated in place: assistant
                tool-call messages and tool results are appended to it.
            tools: Tool definitions forwarded to ``llm.stream_call``.
            temperature: Forwarded to ``llm.stream_call``.
            max_tokens: Forwarded to ``llm.stream_call``.
            thinking_enabled: Forwarded to ``llm.stream_call``.
            context: Stream state; reset at the start of every iteration.
            tool_context: Optional context passed to the tool executor
                (an empty dict is used when omitted).

        Yields:
            SSE event strings. The last event is ``done`` on success or an
            ``error`` event when ``MAX_ITERATIONS`` is exhausted. When a delta
            reports an error the loop stops without emitting a further event.
        """
        total_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
        # Bound before the loop so the check after it is safe even when
        # MAX_ITERATIONS is 0 and the loop body never runs.
        has_error = False

        for _iteration in range(MAX_ITERATIONS):
            context.reset()
            has_error = False
            # Usage reported by this single LLM call. _process_delta overwrites
            # it with the latest report; it is folded into total_usage once the
            # stream ends so usage from earlier iterations is not lost.
            call_usage: Dict[str, int] = {}

            async for delta in llm.stream_call(
                model=model,
                messages=messages,
                tools=tools,
                temperature=temperature,
                max_tokens=max_tokens,
                thinking_enabled=thinking_enabled
            ):
                result = self._process_delta(delta, context, call_usage)

                for event in result.events:
                    yield event

                if result.has_error:
                    has_error = True
                    break

            # An error aborts the whole loop, not just the current stream.
            if has_error:
                break

            # Sum usage across iterations; `or 0` guards against None values.
            for key in total_usage:
                total_usage[key] += call_usage.get(key) or 0

            context.finalize_step()

            if context.tool_calls_list:
                # NOTE(review): _execute_tools is called synchronously here; if
                # process_tool_calls_parallel blocks, the event loop is blocked
                # for the duration of the tool run - confirm this is intended.
                for event in self._execute_tools(context, messages, tool_context):
                    yield event
                continue

            # No tool calls: this response is the final answer.
            for event in self._complete(context, total_usage):
                yield event
            return

        # Reached only via loop exhaustion or an error break.
        if not has_error:
            yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"})

    def _process_delta(
        self,
        delta: "ParsedDelta",
        ctx: "StreamContext",
        total_usage: dict
    ) -> "ProcessResult":
        """Fold one parsed delta into the stream context and build its events.

        Args:
            delta: ParsedDelta from the LLM adapter.
            ctx: StreamContext holding the accumulated thinking/text content,
                the current step, and the accumulated tool calls.
            total_usage: Usage dict that is overwritten in place with the
                usage reported by this delta, if any.

        Returns:
            ProcessResult carrying the SSE events produced by this delta and
            the content/tool-call flags set while processing it.
        """
        result = ProcessResult()

        # A delta with no content that is not the completion marker carries
        # nothing to process (possibly an error upstream); return an empty
        # result without flagging an error.
        if not delta.has_content() and not delta.is_complete:
            return result

        # Latest usage report wins for the dict passed in.
        if delta.usage:
            total_usage.update({
                "prompt_tokens": delta.usage.get("prompt_tokens", 0),
                "completion_tokens": delta.usage.get("completion_tokens", 0),
                "total_tokens": delta.usage.get("total_tokens", 0)
            })

        # Thinking content arrives incrementally; each event carries the full
        # accumulated text so far.
        if delta.thinking:
            logger.debug("Processing thinking: %s...", delta.thinking[:50])
            ctx.full_thinking += delta.thinking
            if not ctx.current_step_id or ctx.current_step_type != "thinking":
                ctx.start_step("thinking")
            result.add_event(_sse_event("process_step", {
                "step": {
                    "id": ctx.current_step_id,
                    "index": ctx.current_step_idx,
                    "type": "thinking",
                    "content": ctx.full_thinking
                }
            }))
            result.set_content()

        # Text content is handled the same way as thinking content.
        if delta.text:
            ctx.full_content += delta.text
            if not ctx.current_step_id or ctx.current_step_type != "text":
                ctx.start_step("text")
            result.add_event(_sse_event("process_step", {
                "step": {
                    "id": ctx.current_step_id,
                    "index": ctx.current_step_idx,
                    "type": "text",
                    "content": ctx.full_content
                }
            }))
            result.set_content()

        # Tool-call fragments are only accumulated here; their events are
        # emitted later by _execute_tools.
        if delta.tool_calls:
            for tc in delta.tool_calls:
                ctx.accumulate_tool_call(tc)
            result.set_tool_calls()

        return result

    def _execute_tools(self, ctx: "StreamContext", messages: list,
                       tool_context: dict = None) -> List[str]:
        """Run the accumulated tool calls and return the resulting SSE events.

        Appends the assistant tool-call message and the new tool results to
        ``messages`` so the next LLM call sees them.

        Args:
            ctx: StreamContext holding the tool calls to execute.
            messages: Conversation messages; mutated in place.
            tool_context: Optional context for the executor (``{}`` if omitted).

        Returns:
            The tool-call events followed by one tool-result event per
            executed (result, call) pair.
        """
        events = list(ctx.emit_tool_calls())

        tool_results = self.tool_executor.process_tool_calls_parallel(
            ctx.tool_calls_list, tool_context or {}
        )

        # Step ids of the tool_call steps that belong to the current calls,
        # in the order they appear in ctx.all_steps.
        tool_ids = {tc.get("id") for tc in ctx.tool_calls_list}
        tool_step_ids = [
            s["id"] for s in ctx.all_steps
            if s["type"] == "tool_call" and s.get("id_ref") in tool_ids
        ]

        emitted = 0
        for i, (tr, _call) in enumerate(zip(tool_results, ctx.tool_calls_list)):
            # Link positionally; fall back to a synthesized step id when no
            # matching tool_call step was found.
            ref_id = tool_step_ids[i] if i < len(tool_step_ids) else f"step-{len(ctx.all_steps) - len(tool_results) + i}"
            _, event = ctx.emit_tool_result(tr, ref_id)
            events.append(event)
            emitted += 1

        messages.append({
            "role": "assistant",
            "content": ctx.full_content or "",
            "tool_calls": ctx.tool_calls_list
        })
        # Assumes emit_tool_result appends one entry to ctx.all_tool_results
        # per call - TODO confirm. Guard the zero case explicitly: a `[-0:]`
        # slice selects the whole list and would re-send every earlier result.
        if emitted:
            messages.extend(ctx.all_tool_results[-emitted:])

        return events

    def _complete(self, ctx: "StreamContext", total_usage: dict) -> List[str]:
        """Record completion on the context and return the final ``done`` event.

        The token count is the reported ``completion_tokens``; when that is
        missing or zero it falls back to ``len(ctx.full_content) // 4``.
        """
        token_count = total_usage.get("completion_tokens") or len(ctx.full_content) // 4
        msg_id = str(uuid.uuid4())

        logger.info("[TOKEN] usage=%s, count=%s", total_usage, token_count)
        ctx.set_completion(msg_id, token_count, total_usage)

        return [_sse_event("done", {
            "message_id": msg_id,
            "token_count": token_count,
            "usage": total_usage
        })]