diff --git a/dashboard/src/components/ProcessBlock.vue b/dashboard/src/components/ProcessBlock.vue index c79dbbd..ab86c82 100644 --- a/dashboard/src/components/ProcessBlock.vue +++ b/dashboard/src/components/ProcessBlock.vue @@ -112,7 +112,16 @@ const allItems = computed(() => { } else if (step.type === 'tool_result') { // 合并 tool_result 到对应的 tool_call const toolId = step.id_ref || step.id - const match = items.findLast(it => it.type === 'tool_call' && it.id === toolId) + // Use a manual reverse loop instead of findLast() for better compatibility + // Search in reverse order to find the most recent matching tool_call + let match = null + for (let j = items.length - 1; j >= 0; j--) { + if (items[j].type === 'tool_call' && items[j].id === toolId) { + match = items[j] + break + } + } + if (match) { let resultContent = step.content || '' let displayContent = resultContent diff --git a/luxx/services/agentic_loop.py b/luxx/services/agentic_loop.py index e4e02f4..efa9359 100644 --- a/luxx/services/agentic_loop.py +++ b/luxx/services/agentic_loop.py @@ -123,14 +123,35 @@ class AgenticLoop: ctx.tool_calls_list, tool_context or {} ) - tool_ids = [tc.get("id") for tc in ctx.tool_calls_list] - tool_step_ids = [ - s.id for s in ctx.all_steps - if s.type == StepType.TOOL_CALL and s.id_ref in tool_ids - ] - + # Build mapping from LLM tool_call_id to step id + # Use index-based matching as fallback when id is empty or matches no step + tool_call_steps = [s for s in ctx.all_steps if s.type == StepType.TOOL_CALL] + logger.debug(f"[EXECUTE_TOOLS] tool_call_steps: {[(s.id, s.name, s.id_ref) for s in tool_call_steps]}") + logger.debug(f"[EXECUTE_TOOLS] tool_calls_list: {[(tc.get('id'), tc.get('function', {}).get('name')) for tc in ctx.tool_calls_list]}") + for i, (tr, tc) in enumerate(zip(tool_results, ctx.tool_calls_list)): - ref_id = tool_step_ids[i] if i < len(tool_step_ids) else f"step-{len(ctx.all_steps) - len(tool_results) + i}" + # Find the corresponding tool_call step + tc_id = tc.get("id", "") + ref_id = None 
+ + # First try to match by LLM tool_call_id + if tc_id: + for s in tool_call_steps: + if s.id_ref == tc_id: + ref_id = s.id + logger.debug(f"[EXECUTE_TOOLS] Matched by id: tc_id={tc_id} -> step_id={ref_id}") + break + + # Fallback: use index-based matching (NOTE(review): tool_call_steps holds every TOOL_CALL step in ctx.all_steps, so index i may not belong to the current batch - confirm) + if ref_id is None and i < len(tool_call_steps): + ref_id = tool_call_steps[i].id + logger.debug(f"[EXECUTE_TOOLS] Matched by index: i={i} -> step_id={ref_id}") + + # Last resort: generate a step id + if ref_id is None: + ref_id = f"step-{len(ctx.all_steps) - len(tool_results) + i}" + logger.debug(f"[EXECUTE_TOOLS] Generated ref_id: {ref_id}") + _, event = StreamRenderer.render_tool_result(ctx, tr, ref_id) events.append(event) diff --git a/luxx/services/stream_context.py b/luxx/services/stream_context.py index 1343204..2f35eb2 100644 --- a/luxx/services/stream_context.py +++ b/luxx/services/stream_context.py @@ -181,6 +181,14 @@ class StreamState: """Finalize the current step and add to all_steps""" if self.current_step_id is None: return + # TOOL_CALL and TOOL_RESULT steps are handled manually in render methods + # They have their own data (name, arguments, id_ref) that can't be auto-generated + if self.current_step_type in (StepType.TOOL_CALL, StepType.TOOL_RESULT): + # Just reset the state without adding a step + self.current_step_id = None + self.current_step_idx = None + self.current_step_type = None + return if self.current_step_type == StepType.TEXT: content = self.full_content[self._text_offset:] elif self.current_step_type == StepType.THINKING: @@ -208,6 +216,10 @@ class StreamState: "type": "function", "function": {"name": "", "arguments": ""} }) + else: + # Update id if provided (LLM may send id in a later delta) + if tc_delta.get("id"): + self.tool_calls_list[idx]["id"] = tc_delta["id"] func = tc_delta.get("function", {}) if func.get("name"): self.tool_calls_list[idx]["function"]["name"] += func["name"] @@ -238,31 +250,49 @@ class StreamRenderer: @staticmethod def render_tool_calls(state: 
StreamState) -> List[str]: - """Render tool calls as SSE events""" + """Render tool calls as SSE events + + Note: This manually manages step creation for TOOL_CALL type. + The start_step is called to get a unique step id and index, + but the step is manually created and added to avoid duplication. + """ + import logging + logger = logging.getLogger(__name__) + events = [] for tc in state.tool_calls_list: - # Use start_step to auto-finalize previous and create new step - state.start_step(StepType.TOOL_CALL) + # Get step id and index from start_step + step_id = state.start_step(StepType.TOOL_CALL) step = Step( - id=state.current_step_id, + id=step_id, index=state.current_step_idx, type=StepType.TOOL_CALL, name=tc["function"]["name"], arguments=tc["function"]["arguments"], id_ref=tc.get("id", "") ) - # Append again since start_step finalized previous (if any) + # Manually add the step (finalize_step won't add it for TOOL_CALL type) state.all_steps.append(step) + logger.debug(f"[TOOL_CALL] Created step: id={step.id}, index={step.index}, name={step.name}, id_ref={step.id_ref}") events.append(sse_event("process_step", {"step": step.to_dict()})) + # Reset current step state since we manually handled it + state.current_step_id = None + state.current_step_idx = None + state.current_step_type = None return events @staticmethod def render_tool_result(state: StreamState, result: Dict, ref_step_id: str) -> tuple: - """Render a tool result as SSE event""" - import json + """Render a tool result as SSE event + + Note: This does NOT call start_step because tool_result should be linked + to its corresponding tool_call via id_ref, not create a new independent step. + The index is derived from the tool_call step to maintain proper ordering. 
+ """ + import json + import logging + logger = logging.getLogger(__name__) - # Use start_step to auto-finalize previous and create new step - state.start_step(StepType.TOOL_RESULT) content = result.get("content", "") success = True @@ -273,9 +303,19 @@ class StreamRenderer: except (json.JSONDecodeError, TypeError): pass + # Find the corresponding tool_call step to get its index + tool_call_step = None + for s in state.all_steps: + if s.type == StepType.TOOL_CALL and s.id == ref_step_id: + tool_call_step = s + break + + # Use the same index as the tool_call for proper ordering in frontend + step_index = tool_call_step.index if tool_call_step else state.step_index + step = Step( - id=state.current_step_id, - index=state.current_step_idx, + id=f"result-{ref_step_id}", # Unique id based on tool_call step id + index=step_index, # Same index as tool_call for proper ordering type=StepType.TOOL_RESULT, name=result.get("name", ""), content=content, @@ -285,6 +325,8 @@ class StreamRenderer: state.all_steps.append(step) state.add_tool_result(result) + logger.debug(f"[TOOL_RESULT] Created step: id={step.id}, index={step.index}, id_ref={step.id_ref}, ref_step_id={ref_step_id}") + return step, sse_event("process_step", {"step": step.to_dict()}) @staticmethod diff --git a/luxx/tools/executor.py b/luxx/tools/executor.py index 2bd3c51..0926dfb 100644 --- a/luxx/tools/executor.py +++ b/luxx/tools/executor.py @@ -159,20 +159,26 @@ class ToolExecutor: tool_calls: List[Dict[str, Any]], context: Dict[str, Any] ) -> List[Dict[str, Any]]: - """Process tool calls in parallel""" + """Process tool calls in parallel + + IMPORTANT: Results are returned in the SAME ORDER as input tool_calls, + not in completion order. This ensures proper matching between tool_call + and tool_result steps in the frontend. 
+ """ if len(tool_calls) <= 1: return self.process_tool_calls(tool_calls, context) tool_ctx = self._build_tool_context(context) try: - from concurrent.futures import ThreadPoolExecutor, as_completed + from concurrent.futures import ThreadPoolExecutor - futures = {} - cached_results = [] + # Store futures with their original index to maintain order + futures_with_index = {} + results = [None] * len(tool_calls) with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - for call in tool_calls: + for idx, call in enumerate(tool_calls): call_id = call.get("id", "") name = call.get("function", {}).get("name", "") args = self._parse_arguments(call) @@ -183,24 +189,25 @@ class ToolExecutor: if cached is not None: self.history.record(name, args, cached) - cached_results.append(self._create_tool_result(call_id, name, cached)) + # Store result at the correct index + results[idx] = self._create_tool_result(call_id, name, cached) else: - # Submit task + # Submit task with index future = executor.submit( registry.execute, name, args, context=tool_ctx ) - futures[future] = (call_id, name, args, cache_key) + futures_with_index[future] = (idx, call_id, name, args, cache_key) - results = list(cached_results) - - for future in as_completed(futures): - call_id, name, args, cache_key = futures[future] + # Wait for all futures and store results at correct indices + for future in futures_with_index: + idx, call_id, name, args, cache_key = futures_with_index[future] result = future.result() self.cache.set(cache_key, result) self.history.record(name, args, result) - results.append(self._create_tool_result(call_id, name, result)) - - return results + results[idx] = self._create_tool_result(call_id, name, result) + + # Filter out None values (shouldn't happen, but safety check; dropping an entry would shift later results out of positional alignment with tool_calls) + return [r for r in results if r is not None] except ImportError: return self.process_tool_calls(tool_calls, context)