fix: 修复重复显示的bug

This commit is contained in:
ViperEkura 2026-04-25 17:15:25 +08:00
parent 3e5c76cd83
commit 9b5766ba65
3 changed files with 47 additions and 38 deletions

View File

@ -96,20 +96,17 @@ class AgenticLoop:
need_new_text = result["text"] and ctx.current_step_type != StepType.TEXT need_new_text = result["text"] and ctx.current_step_type != StepType.TEXT
if result["thinking"]: if result["thinking"]:
ctx.full_thinking += result["thinking"]
if need_new_thinking: if need_new_thinking:
ctx.start_step(StepType.THINKING) ctx.start_step(StepType.THINKING)
ctx.full_thinking += result["thinking"]
events.append(StreamRenderer.render_thinking(ctx)) events.append(StreamRenderer.render_thinking(ctx))
if result["text"]: if result["text"]:
ctx.full_content += result["text"]
if need_new_text: if need_new_text:
ctx.start_step(StepType.TEXT) ctx.start_step(StepType.TEXT)
ctx.full_content += result["text"]
events.append(StreamRenderer.render_text(ctx)) events.append(StreamRenderer.render_text(ctx))
ctx._thinking_buf = ""
ctx._text_buf = ""
if delta.has_tool_call(): if delta.has_tool_call():
ctx.accumulate_tool_call(delta.tool_call) ctx.accumulate_tool_call(delta.tool_call)
@ -155,12 +152,14 @@ class AgenticLoop:
if thinking: if thinking:
ctx.full_thinking += thinking ctx.full_thinking += thinking
ctx.start_step(StepType.THINKING) if ctx.current_step_type != StepType.THINKING:
ctx.start_step(StepType.THINKING)
events.append(StreamRenderer.render_thinking(ctx)) events.append(StreamRenderer.render_thinking(ctx))
ctx.finalize_step() ctx.finalize_step()
if text: if text:
ctx.full_content += text ctx.full_content += text
ctx.start_step(StepType.TEXT) if ctx.current_step_type != StepType.TEXT:
ctx.start_step(StepType.TEXT)
events.append(StreamRenderer.render_text(ctx)) events.append(StreamRenderer.render_text(ctx))
ctx.finalize_step() ctx.finalize_step()

View File

@ -88,6 +88,9 @@ class StreamState:
self._in_thinking = False self._in_thinking = False
self._thinking_buf = "" self._thinking_buf = ""
self._text_buf = "" self._text_buf = ""
# Track content offset per step to avoid duplication across steps
self._text_offset = 0
self._thinking_offset = 0
def process_content(self, content: str) -> Dict: def process_content(self, content: str) -> Dict:
"""Process raw content, handling thinking tags.""" """Process raw content, handling thinking tags."""
@ -103,43 +106,45 @@ class StreamState:
if not self._in_thinking and THINK_START in content: if not self._in_thinking and THINK_START in content:
self._in_thinking = True self._in_thinking = True
idx = content.find(THINK_START) idx = content.find(THINK_START)
# Any text before THINK_START goes to text buffer # Any text before THINK_START is emitted immediately
if idx > 0: if idx > 0:
self._text_buf += content[:idx] text += content[:idx]
content = content[idx + len(THINK_START):] content = content[idx + len(THINK_START):]
# Handle THINK_END - can appear anywhere # Handle THINK_END - can appear anywhere
if THINK_END in content: if THINK_END in content:
idx = content.find(THINK_END) idx = content.find(THINK_END)
thinking_content = content[:idx] thinking_content = content[:idx]
self._thinking_buf += thinking_content
content = content[idx + len(THINK_END):] content = content[idx + len(THINK_END):]
self._in_thinking = False self._in_thinking = False
should_emit = True should_emit = True
thinking = thinking_content
# Handle any remaining text after THINK_END (may have more thinking tags) # Handle any remaining text after THINK_END
while THINK_END in content: while THINK_END in content:
second_idx = content.find(THINK_END) second_idx = content.find(THINK_END)
# Text between THINK_END and next THINK_END text += content[:second_idx]
self._text_buf += content[:second_idx]
content = content[second_idx + len(THINK_END):] content = content[second_idx + len(THINK_END):]
# Any remaining content after last THINK_END is text # Any remaining content after last THINK_END is text
if content: if content:
self._text_buf += content text += content
thinking_only = not bool(self._text_buf) thinking_only = not bool(text)
elif self._in_thinking: elif self._in_thinking:
# In thinking mode, accumulate # In thinking mode, stream content directly
self._thinking_buf += content thinking = content
should_emit = True
thinking_only = True
else: else:
# Not in thinking mode, accumulate as text # Not in thinking mode, emit text immediately for streaming
self._text_buf += content text = content
should_emit = True
if should_emit: # Emit text that appeared before THINK_START
thinking = self._thinking_buf if text and not should_emit:
text = self._text_buf should_emit = True
return { return {
"thinking": thinking, "thinking": thinking,
@ -164,6 +169,11 @@ class StreamState:
self.current_step_idx = self.step_index self.current_step_idx = self.step_index
self.current_step_id = f"step-{self.step_index}" self.current_step_id = f"step-{self.step_index}"
self.current_step_type = step_type self.current_step_type = step_type
# Record content offset so this step only includes content added during it
if step_type == StepType.TEXT:
self._text_offset = len(self.full_content)
elif step_type == StepType.THINKING:
self._thinking_offset = len(self.full_thinking)
self.step_index += 1 self.step_index += 1
return self.current_step_id return self.current_step_id
@ -171,7 +181,12 @@ class StreamState:
"""Finalize the current step and add to all_steps""" """Finalize the current step and add to all_steps"""
if self.current_step_id is None: if self.current_step_id is None:
return return
content = self.full_content if self.current_step_type == StepType.TEXT else self.full_thinking if self.current_step_type == StepType.TEXT:
content = self.full_content[self._text_offset:]
elif self.current_step_type == StepType.THINKING:
content = self.full_thinking[self._thinking_offset:]
else:
content = ""
step = Step( step = Step(
id=self.current_step_id, id=self.current_step_id,
index=self.current_step_idx, index=self.current_step_idx,
@ -279,7 +294,7 @@ class StreamRenderer:
id=state.current_step_id, id=state.current_step_id,
index=state.current_step_idx, index=state.current_step_idx,
type=StepType.THINKING, type=StepType.THINKING,
content=state.full_thinking content=state.full_thinking[state._thinking_offset:]
) )
return sse_event("process_step", {"step": step.to_dict()}) return sse_event("process_step", {"step": step.to_dict()})
@ -290,7 +305,7 @@ class StreamRenderer:
id=state.current_step_id, id=state.current_step_id,
index=state.current_step_idx, index=state.current_step_idx,
type=StepType.TEXT, type=StepType.TEXT,
content=state.full_content content=state.full_content[state._text_offset:]
) )
return sse_event("process_step", {"step": step.to_dict()}) return sse_event("process_step", {"step": step.to_dict()})

View File

@ -169,6 +169,7 @@ class ToolExecutor:
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
futures = {} futures = {}
cached_results = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor: with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for call in tool_calls: for call in tool_calls:
@ -181,7 +182,8 @@ class ToolExecutor:
cached = self.cache.get(cache_key) cached = self.cache.get(cache_key)
if cached is not None: if cached is not None:
futures[call_id] = (name, args, cached) self.history.record(name, args, cached)
cached_results.append(self._create_tool_result(call_id, name, cached))
else: else:
# Submit task # Submit task
future = executor.submit( future = executor.submit(
@ -189,19 +191,12 @@ class ToolExecutor:
) )
futures[future] = (call_id, name, args, cache_key) futures[future] = (call_id, name, args, cache_key)
results = [] results = list(cached_results)
for future in as_completed(futures.keys()): for future in as_completed(futures):
item = futures[future] call_id, name, args, cache_key = futures[future]
if len(item) == 3: result = future.result()
call_id, name, args = item self.cache.set(cache_key, result)
cache_key = self.cache.make_key(name, args, tool_ctx.workspace)
result = item[2]
else:
call_id, name, args, cache_key = item
result = future.result()
self.cache.set(cache_key, result)
self.history.record(name, args, result) self.history.record(name, args, result)
results.append(self._create_tool_result(call_id, name, result)) results.append(self._create_tool_result(call_id, name, result))