diff --git a/luxx/services/agentic_loop.py b/luxx/services/agentic_loop.py index 324a888..e4e02f4 100644 --- a/luxx/services/agentic_loop.py +++ b/luxx/services/agentic_loop.py @@ -96,20 +96,17 @@ class AgenticLoop: need_new_text = result["text"] and ctx.current_step_type != StepType.TEXT if result["thinking"]: - ctx.full_thinking += result["thinking"] if need_new_thinking: ctx.start_step(StepType.THINKING) + ctx.full_thinking += result["thinking"] events.append(StreamRenderer.render_thinking(ctx)) if result["text"]: - ctx.full_content += result["text"] if need_new_text: ctx.start_step(StepType.TEXT) + ctx.full_content += result["text"] events.append(StreamRenderer.render_text(ctx)) - ctx._thinking_buf = "" - ctx._text_buf = "" - if delta.has_tool_call(): ctx.accumulate_tool_call(delta.tool_call) @@ -155,12 +152,14 @@ class AgenticLoop: if thinking: ctx.full_thinking += thinking - ctx.start_step(StepType.THINKING) + if ctx.current_step_type != StepType.THINKING: + ctx.start_step(StepType.THINKING) events.append(StreamRenderer.render_thinking(ctx)) ctx.finalize_step() if text: ctx.full_content += text - ctx.start_step(StepType.TEXT) + if ctx.current_step_type != StepType.TEXT: + ctx.start_step(StepType.TEXT) events.append(StreamRenderer.render_text(ctx)) ctx.finalize_step() diff --git a/luxx/services/stream_context.py b/luxx/services/stream_context.py index ffee77d..1343204 100644 --- a/luxx/services/stream_context.py +++ b/luxx/services/stream_context.py @@ -88,6 +88,9 @@ class StreamState: self._in_thinking = False self._thinking_buf = "" self._text_buf = "" + # Track content offset per step to avoid duplication across steps + self._text_offset = 0 + self._thinking_offset = 0 def process_content(self, content: str) -> Dict: """Process raw content, handling thinking tags.""" @@ -103,43 +106,45 @@ class StreamState: if not self._in_thinking and THINK_START in content: self._in_thinking = True idx = content.find(THINK_START) - # Any text before THINK_START goes to text buffer + # Any text before THINK_START is emitted immediately if idx > 0: - self._text_buf += content[:idx] + text += content[:idx] content = content[idx + len(THINK_START):] # Handle THINK_END - can appear anywhere if THINK_END in content: idx = content.find(THINK_END) thinking_content = content[:idx] - self._thinking_buf += thinking_content content = content[idx + len(THINK_END):] self._in_thinking = False should_emit = True + thinking = thinking_content - # Handle any remaining text after THINK_END (may have more thinking tags) + # Handle any remaining text after THINK_END while THINK_END in content: second_idx = content.find(THINK_END) - # Text between THINK_END and next THINK_END - self._text_buf += content[:second_idx] + text += content[:second_idx] content = content[second_idx + len(THINK_END):] # Any remaining content after last THINK_END is text if content: - self._text_buf += content + text += content - thinking_only = not bool(self._text_buf) + thinking_only = not bool(text) elif self._in_thinking: - # In thinking mode, accumulate - self._thinking_buf += content + # In thinking mode, stream content directly + thinking = content + should_emit = True + thinking_only = True else: - # Not in thinking mode, accumulate as text - self._text_buf += content + # Not in thinking mode, emit text immediately for streaming + text = content + should_emit = True - if should_emit: - thinking = self._thinking_buf - text = self._text_buf + # Emit text that appeared before THINK_START + if text and not should_emit: + should_emit = True return { "thinking": thinking, @@ -164,6 +169,11 @@ class StreamState: self.current_step_idx = self.step_index self.current_step_id = f"step-{self.step_index}" self.current_step_type = step_type + # Record content offset so this step only includes content added during it + if step_type == StepType.TEXT: + self._text_offset = len(self.full_content) + elif step_type == StepType.THINKING: + self._thinking_offset = len(self.full_thinking) self.step_index += 1 return self.current_step_id @@ -171,7 +181,12 @@ class StreamState: """Finalize the current step and add to all_steps""" if self.current_step_id is None: return - content = self.full_content if self.current_step_type == StepType.TEXT else self.full_thinking + if self.current_step_type == StepType.TEXT: + content = self.full_content[self._text_offset:] + elif self.current_step_type == StepType.THINKING: + content = self.full_thinking[self._thinking_offset:] + else: + content = "" step = Step( id=self.current_step_id, index=self.current_step_idx, @@ -279,7 +294,7 @@ class StreamRenderer: id=state.current_step_id, index=state.current_step_idx, type=StepType.THINKING, - content=state.full_thinking + content=state.full_thinking[state._thinking_offset:] ) return sse_event("process_step", {"step": step.to_dict()}) @@ -290,7 +305,7 @@ class StreamRenderer: id=state.current_step_id, index=state.current_step_idx, type=StepType.TEXT, - content=state.full_content + content=state.full_content[state._text_offset:] ) return sse_event("process_step", {"step": step.to_dict()}) diff --git a/luxx/tools/executor.py b/luxx/tools/executor.py index cd56ecd..2bd3c51 100644 --- a/luxx/tools/executor.py +++ b/luxx/tools/executor.py @@ -169,6 +169,7 @@ class ToolExecutor: from concurrent.futures import ThreadPoolExecutor, as_completed futures = {} + cached_results = [] with ThreadPoolExecutor(max_workers=self.max_workers) as executor: for call in tool_calls: @@ -181,7 +182,8 @@ class ToolExecutor: cached = self.cache.get(cache_key) if cached is not None: - futures[call_id] = (name, args, cached) + self.history.record(name, args, cached) + cached_results.append(self._create_tool_result(call_id, name, cached)) else: # Submit task future = executor.submit( @@ -189,19 +191,12 @@ class ToolExecutor: ) futures[future] = (call_id, name, args, cache_key) - results = [] + results = list(cached_results) - for future in as_completed(futures.keys()): - item = futures[future] - if len(item) == 3: - call_id, name, args = item - cache_key = self.cache.make_key(name, args, tool_ctx.workspace) - result = item[2] - else: - call_id, name, args, cache_key = item - result = future.result() - self.cache.set(cache_key, result) - + for future in as_completed(futures): + call_id, name, args, cache_key = futures[future] + result = future.result() + self.cache.set(cache_key, result) self.history.record(name, args, result) results.append(self._create_tool_result(call_id, name, result))