From ce9ee6dd89cca4acdaeddd680962dd4d8d8f0718 Mon Sep 17 00:00:00 2001 From: ViperEkura <3081035982@qq.com> Date: Fri, 24 Apr 2026 09:33:45 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- luxx/services/agentic_loop.py | 238 +++++++++++++ luxx/services/chat.py | 605 ++++++++++---------------------- luxx/services/llm_client.py | 3 +- luxx/services/process_result.py | 37 ++ luxx/services/stream_context.py | 185 ++++++++++ 5 files changed, 650 insertions(+), 418 deletions(-) create mode 100644 luxx/services/agentic_loop.py create mode 100644 luxx/services/process_result.py create mode 100644 luxx/services/stream_context.py diff --git a/luxx/services/agentic_loop.py b/luxx/services/agentic_loop.py new file mode 100644 index 0000000..95db111 --- /dev/null +++ b/luxx/services/agentic_loop.py @@ -0,0 +1,238 @@ +"""AgenticLoop - Executes the Agentic Loop: LLM + Tools iteration. + +The loop: +1. Call LLM with messages and tools +2. Check for tool calls in response +3. Execute tools in parallel +4. Add results to messages +5. Repeat (max 10 iterations) +6. Return final response +""" +import json +import uuid +import logging +import traceback +from typing import List, Dict, Any, AsyncGenerator + +from luxx.tools.executor import ToolExecutor +from luxx.services.llm_client import LLMClient +from luxx.services.stream_context import StreamContext, _sse_event +from luxx.services.process_result import ProcessResult + +logger = logging.getLogger(__name__) + +# Maximum iterations to prevent infinite loops +MAX_ITERATIONS = 10 + + +def _parse_sse_line(line: str) -> tuple: + """Parse SSE line into (event_type, data_str).""" + event_type = None + data_str = None + for part in line.strip().split('\n'): + if part.startswith('event: '): + event_type = part[7:].strip() + elif part.startswith('data: '): + data_str = part[6:].strip() + return event_type, data_str + + +class AgenticLoop: + """Executes the Agentic Loop: LLM + Tools iteration.""" + + def __init__(self, tool_executor: ToolExecutor): + self.tool_executor = tool_executor + + async def execute( + self, + llm: LLMClient, + model: str, + messages: List[Dict], + tools: list, + temperature: float, + max_tokens: int, + thinking_enabled: bool, + context: 'StreamContext', + tool_context: dict = None + ) -> AsyncGenerator[str, None]: + """Execute the agentic loop. + + Yields SSE events for each step. + """ + total_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} + + for iteration in range(MAX_ITERATIONS): + context.reset() + has_error = False + + # Stream LLM response + async for sse_line in llm.stream_call( + model=model, + messages=messages, + tools=tools, + temperature=temperature, + max_tokens=max_tokens, + thinking_enabled=thinking_enabled + ): + # Process stream line + result = self._process_stream_line(sse_line, context, total_usage) + + # Yield events + for event in result.events: + yield event + + # Check for errors + if result.has_error: + has_error = True + break + + # If error occurred, break the loop + if has_error: + break + + # Finalize current step + context.finalize_step() + + # Check for tool calls + if context.tool_calls_list: + # Execute tools and yield events + for event in self._execute_tools(context, messages, tool_context): + yield event + continue + + # No tools - complete + for event in self._complete(context, total_usage): + yield event + return + + # Max iterations exceeded or error occurred + if not has_error: + yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"}) + + def _process_stream_line(self, sse_line: str, ctx: 'StreamContext', + total_usage: dict) -> ProcessResult: + """Process single SSE line from LLM, return result with events and flags.""" + result = ProcessResult() + event_type, data_str = _parse_sse_line(sse_line) + if not data_str: + return result + + # Handle upstream errors + if event_type == 'error': + try: + error_data = json.loads(data_str) + error_content = error_data.get("content", "Unknown error") + except json.JSONDecodeError: + error_content = data_str + result.set_error(error_content) + result.add_event(_sse_event("error", {"content": error_content})) + return result + + try: + chunk = json.loads(data_str) + except json.JSONDecodeError: + error_msg = f"Parse error: {data_str[:50]}" + result.set_error(error_msg) + result.add_event(_sse_event("error", {"content": error_msg})) + return result + + # Extract usage + if "usage" in chunk and chunk["usage"]: + usage = chunk["usage"] + total_usage.update({ + "prompt_tokens": usage.get("prompt_tokens", 0), + "completion_tokens": usage.get("completion_tokens", 0), + "total_tokens": usage.get("total_tokens", 0) + }) + + # Handle API errors + if "error" in chunk: + error_msg = chunk["error"].get("message", str(chunk["error"])) + result.set_error(error_msg) + result.add_event(_sse_event("error", {"content": f"API Error: {error_msg}"})) + return result + + # Get delta + choices = chunk.get("choices", []) + if not choices: + content = chunk.get("content") or chunk.get("message", {}).get("content", "") + if content: + if not ctx.full_content: + ctx.start_step("text") + ctx.full_content += content + result.set_content() + result.add_event(_sse_event("process_step", { + "step": {"id": ctx.current_step_id, "index": ctx.current_step_idx, "type": "text", "content": ctx.full_content} + })) + return result + + delta = choices[0].get("delta", {}) + + # Process deltas + event = ctx.handle_thinking(delta) + if event: + result.set_content() + result.add_event(event) + + event = ctx.handle_text(delta) + if event: + result.set_content() + result.add_event(event) + + # Accumulate tool calls + for tc in delta.get("tool_calls", []): + ctx.accumulate_tool_call(tc) + result.set_tool_calls() + + return result + + def _execute_tools(self, ctx: 'StreamContext', messages: list, + tool_context: dict = None) -> List[str]: + """Execute tools and return list of events.""" + events = [] + + # Emit tool call steps + for event in ctx.emit_tool_calls(): + events.append(event) + + # Execute in parallel + tool_results = self.tool_executor.process_tool_calls_parallel( + ctx.tool_calls_list, tool_context or {} + ) + + # Get tool call IDs for result linking + tool_ids = [tc.get("id") for tc in ctx.tool_calls_list] + tool_step_ids = [ + s["id"] for s in ctx.all_steps + if s["type"] == "tool_call" and s.get("id_ref") in tool_ids + ] + + # Emit tool result steps + for i, (tr, tc) in enumerate(zip(tool_results, ctx.tool_calls_list)): + ref_id = tool_step_ids[i] if i < len(tool_step_ids) else f"step-{len(ctx.all_steps) - len(tool_results) + i}" + _, event = ctx.emit_tool_result(tr, ref_id) + events.append(event) + + # Prepare for next iteration + messages.append({ + "role": "assistant", + "content": ctx.full_content or "", + "tool_calls": ctx.tool_calls_list + }) + messages.extend(ctx.all_tool_results[-len(tool_results):]) + + return events + + def _complete(self, ctx: 'StreamContext', total_usage: dict) -> List[str]: + """Complete the loop and return list of events.""" + token_count = total_usage.get("completion_tokens") or len(ctx.full_content) // 4 + msg_id = str(uuid.uuid4()) + logger.info(f"[TOKEN] usage={total_usage}, count={token_count}") + + ctx.set_completion(msg_id, token_count, total_usage) + + return [_sse_event("done", { + "message_id": msg_id, + "token_count": token_count, + "usage": total_usage + })] diff --git a/luxx/services/chat.py b/luxx/services/chat.py index a8dc3cc..3d67b72 100644 --- a/luxx/services/chat.py +++ b/luxx/services/chat.py @@ -1,34 +1,99 @@ -"""Chat service module""" -import json -import uuid -import logging -from typing import List, Dict, Any, AsyncGenerator, Optional +"""Chat service module with Agentic Loop pattern. + +This module provides the core chat service that orchestrates: +- StreamContext: Manages streaming state transitions +- MessageBuilder: Constructs message lists +- AgenticLoop: Executes the agentic loop (LLM + tools iteration) +- ChatService: Core chat service facade +""" +import json +import logging +import traceback +import httpx +from typing import List, Dict, Any, AsyncGenerator -from luxx.models import Conversation, Message from luxx.tools.executor import ToolExecutor from luxx.tools.core import registry from luxx.services.llm_client import LLMClient +from luxx.services.stream_context import StreamContext +from luxx.services.agentic_loop import AgenticLoop from luxx.config import config logger = logging.getLogger(__name__) -# Maximum iterations to prevent infinite loops -MAX_ITERATIONS = 10 -def _sse_event(event: str, data: dict) -> str: - """Format a Server-Sent Event string.""" - return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" +# ============== MessageBuilder ============== + +class MessageBuilder: + """Builds message lists for LLM requests.""" + + def __init__(self): + self.messages = [] + + def add_system(self, content: str) -> 'MessageBuilder': + """Add system message.""" + self.messages.append({"role": "system", "content": content}) + return self + + def add_user(self, content: str, attachments: list = None) -> 'MessageBuilder': + """Add user message in JSON format.""" + msg_content = json.dumps({ + "text": content, + "attachments": attachments or [] + }, ensure_ascii=False) + self.messages.append({"role": "user", "content": msg_content}) + return self + + def add_assistant(self, content: str, tool_calls: list = None) -> 'MessageBuilder': + """Add assistant message.""" + msg = {"role": "assistant", "content": content} + if tool_calls: + msg["tool_calls"] = tool_calls + self.messages.append(msg) + return self + + def add_tool_result(self, tool_call_id: str, content: str) -> 'MessageBuilder': + """Add tool result message.""" + self.messages.append({ + "role": "tool", + "tool_call_id": tool_call_id, + "content": content + }) + return self + + def build(self) -> List[Dict]: + """Build and return message list.""" + return self.messages.copy() + + @staticmethod + def extract_text(content: str) -> str: + """Extract text from message content (supports JSON format).""" + if not content: + return "" + try: + parsed = json.loads(content) + if isinstance(parsed, dict): + return parsed.get("text", content) + except (json.JSONDecodeError, TypeError): + pass + return content -def get_llm_client(conversation: Conversation = None): - """Get LLM client, optionally using conversation's provider. Returns (client, max_tokens)""" +# ============== Factory Function ============== + +def get_llm_client(conversation=None) -> tuple: + """Get LLM client based on conversation provider. Returns (client, max_tokens)""" + from luxx.models import LLMProvider + from luxx.database import SessionLocal + max_tokens = None + if conversation and conversation.provider_id: - from luxx.models import LLMProvider - from luxx.database import SessionLocal db = SessionLocal() try: - provider = db.query(LLMProvider).filter(LLMProvider.id == conversation.provider_id).first() + provider = db.query(LLMProvider).filter( + LLMProvider.id == conversation.provider_id + ).first() if provider: max_tokens = provider.max_tokens client = LLMClient( @@ -40,184 +105,27 @@ def get_llm_client(conversation: Conversation = None): finally: db.close() - # Fallback to global config - client = LLMClient() - return client, max_tokens + return LLMClient(), max_tokens -class StreamContext: - """Context for streaming response state management.""" - - def __init__( - self, - step_index: int = 0, - current_step_id: str = None, - current_step_idx: int = None, - current_stream_type: str = None, - full_content: str = "", - full_thinking: str = "" - ): - self.step_index = step_index - self.current_step_id = current_step_id - self.current_step_idx = current_step_idx - self.current_stream_type = current_stream_type - self.full_content = full_content - self.full_thinking = full_thinking - self.all_steps = [] - self.all_tool_calls = [] - self.all_tool_results = [] - self.tool_calls_list = [] - - def reset_iteration(self): - """Reset streaming step tracker for new iteration.""" - self.current_step_id = None - self.current_step_idx = None - self.current_stream_type = None - self.full_content = "" - self.full_thinking = "" - self.tool_calls_list = [] - - def start_stream_step(self, step_type: str) -> str: - """Start a new streaming step. Returns the step_id.""" - self.current_step_idx = self.step_index - self.current_step_id = f"step-{self.step_index}" - self.current_stream_type = step_type - self.step_index += 1 - return self.current_step_id - - def yield_stream_step(self, step_type: str, content: str) -> Dict[str, Any]: - """Yield a streaming step event.""" - return _sse_event("process_step", { - "step": { - "id": self.current_step_id, - "index": self.current_step_idx, - "type": step_type, - "content": content - } - }) - - def save_streaming_step(self): - """Save the current streaming step to all_steps.""" - if self.current_step_id is None: - return - - if self.current_stream_type == "thinking": - self.all_steps.append({ - "id": self.current_step_id, - "index": self.current_step_idx, - "type": "thinking", - "content": self.full_thinking - }) - elif self.current_stream_type == "text": - self.all_steps.append({ - "id": self.current_step_id, - "index": self.current_step_idx, - "type": "text", - "content": self.full_content - }) - - def handle_thinking_stream(self, delta: Dict) -> Optional[Dict]: - """Handle reasoning/thinking delta. Returns yield_obj if step was yielded.""" - reasoning = delta.get("reasoning_content", "") - if not reasoning: - return None - - prev_len = len(self.full_thinking) - self.full_thinking += reasoning - - if prev_len == 0: # New thinking stream started - self.start_stream_step("thinking") - - return self.yield_stream_step("thinking", self.full_thinking) - - def handle_text_stream(self, delta: Dict) -> Optional[Dict]: - """Handle content delta. Returns yield_obj if step was yielded.""" - content = delta.get("content", "") - if not content: - return None - - prev_len = len(self.full_content) - self.full_content += content - - if prev_len == 0: # New text stream started - self.start_stream_step("text") - - return self.yield_stream_step("text", self.full_content) - - def handle_tool_call(self) -> tuple: - """Handle tool calls. Returns (tool_call_step_ids, tool_call_steps, yield_objs).""" - tool_call_step_ids = [] - tool_call_steps = [] - yield_objs = [] - - for tc in self.tool_calls_list: - call_step_idx = self.step_index - call_step_id = f"step-{self.step_index}" - tool_call_step_ids.append(call_step_id) - self.step_index += 1 - - call_step = { - "id": call_step_id, - "index": call_step_idx, - "type": "tool_call", - "id_ref": tc.get("id", ""), - "name": tc["function"]["name"], - "arguments": tc["function"]["arguments"] - } - tool_call_steps.append(call_step) - yield_objs.append(_sse_event("process_step", {"step": call_step})) - - return tool_call_step_ids, tool_call_steps, yield_objs - - def handle_tool_result(self, tool_result: Dict, tool_call_step_id: str) -> tuple: - """Handle single tool result. Returns (result_step, yield_obj).""" - result_step_idx = self.step_index - result_step_id = f"step-{self.step_index}" - self.step_index += 1 - - content = tool_result.get("content", "") - success = True - try: - content_obj = json.loads(content) - if isinstance(content_obj, dict): - success = content_obj.get("success", True) - except: - pass - - result_step = { - "id": result_step_id, - "index": result_step_idx, - "type": "tool_result", - "id_ref": tool_call_step_id, - "name": tool_result.get("name", ""), - "content": content, - "success": success - } - return result_step, _sse_event("process_step", {"step": result_step}) - +# ============== ChatService ============== class ChatService: - """Chat service with tool support""" + """Core chat service with Agentic Loop support.""" def __init__(self): self.tool_executor = ToolExecutor() + self.agentic_loop = AgenticLoop(self.tool_executor) - def build_messages( - self, - conversation: Conversation, - include_system: bool = True - ) -> List[Dict[str, str]]: - """Build message list""" + def build_messages(self, conversation, include_system: bool = True) -> List[Dict]: + """Build message list from conversation history.""" from luxx.database import SessionLocal from luxx.models import Message messages = [] if include_system and conversation.system_prompt: - messages.append({ - "role": "system", - "content": conversation.system_prompt - }) + messages.append({"role": "system", "content": conversation.system_prompt}) db = SessionLocal() try: @@ -226,28 +134,23 @@ class ChatService: ).order_by(Message.created_at).all() for msg in db_messages: - # Parse JSON content if possible - try: - content_obj = json.loads(msg.content) if msg.content else {} - if isinstance(content_obj, dict): - content = content_obj.get("text", msg.content) - else: - content = msg.content - except (json.JSONDecodeError, TypeError): - content = msg.content - - messages.append({ - "role": msg.role, - "content": content - }) + content = MessageBuilder.extract_text(msg.content) + messages.append({"role": msg.role, "content": content}) finally: db.close() return messages + def _get_tools(self, enabled_tools: list) -> list: + """Filter tools based on enabled_tools list.""" + if not enabled_tools: + return [] + return [t for t in registry.list_all() + if t.get("function", {}).get("name") in enabled_tools] + async def stream_response( self, - conversation: Conversation, + conversation, user_message: str, thinking_enabled: bool = False, enabled_tools: list = None, @@ -255,256 +158,118 @@ class ChatService: username: str = None, workspace: str = None, user_permission_level: int = 1 - ) -> AsyncGenerator[Dict[str, str], None]: - """ - Streaming response generator - - Yields raw SSE event strings for direct forwarding. - """ + ) -> AsyncGenerator[str, None]: + """Streaming response with Agentic Loop.""" try: + # Build initial messages messages = self.build_messages(conversation) - messages.append({ "role": "user", "content": json.dumps({"text": user_message, "attachments": []}) }) - # Get tools based on enabled_tools filter - if enabled_tools: - tools = [t for t in registry.list_all() if t.get("function", {}).get("name") in enabled_tools] - else: - tools = [] - + # Get tools and LLM client + tools = self._get_tools(enabled_tools) llm, provider_max_tokens = get_llm_client(conversation) model = conversation.model or llm.default_model or "gpt-4" - # 直接使用 provider 的 max_tokens - max_tokens = provider_max_tokens + max_tokens = provider_max_tokens or 8192 - # Token usage tracking - total_usage = { - "prompt_tokens": 0, - "completion_tokens": 0, - "total_tokens": 0 + # Tool execution context + tool_context = { + "workspace": workspace, + "user_id": user_id, + "username": username, + "user_permission_level": user_permission_level } - actual_token_count = 0 - # Streaming context for state management + # Stream context ctx = StreamContext() - for iteration in range(MAX_ITERATIONS): - # Reset streaming context for this iteration - ctx.reset_iteration() - - async for sse_line in llm.stream_call( - model=model, - messages=messages, - tools=tools, - temperature=conversation.temperature, - max_tokens=max_tokens or 8192, - thinking_enabled=thinking_enabled or conversation.thinking_enabled - ): - # Parse SSE line - # Format: "event: xxx\ndata: {...}\n\n" - event_type = None - data_str = None - - for line in sse_line.strip().split('\n'): - if line.startswith('event: '): - event_type = line[7:].strip() - elif line.startswith('data: '): - data_str = line[6:].strip() - - if data_str is None: - continue - - # Handle error events from LLM - if event_type == 'error': - try: - error_data = json.loads(data_str) - yield _sse_event("error", {"content": error_data.get("content", "Unknown error")}) - except json.JSONDecodeError: - yield _sse_event("error", {"content": data_str}) - return - - # Parse the data - try: - chunk = json.loads(data_str) - except json.JSONDecodeError: - yield _sse_event("error", {"content": f"Failed to parse response: {data_str}"}) - return - - # 提取 API 返回的 usage 信息 - if "usage" in chunk: - usage = chunk["usage"] - total_usage["prompt_tokens"] = usage.get("prompt_tokens", 0) - total_usage["completion_tokens"] = usage.get("completion_tokens", 0) - total_usage["total_tokens"] = usage.get("total_tokens", 0) - - # Check for error in response - if "error" in chunk: - error_msg = chunk["error"].get("message", str(chunk["error"])) - yield _sse_event("error", {"content": f"API Error: {error_msg}"}) - return - - # Get delta - choices = chunk.get("choices", []) - if not choices: - # Check if there's any content in the response (for non-standard LLM responses) - if chunk.get("content") or chunk.get("message"): - content = chunk.get("content") or chunk.get("message", {}).get("content", "") - if content: - prev_len = len(ctx.full_content) - ctx.full_content += content - if prev_len == 0: # New text stream started - ctx.start_stream_step("text") - yield _sse_event("process_step", { - "step": { - "id": ctx.current_step_id if prev_len == 0 else f"step-{ctx.step_index - 1}", - "index": ctx.current_step_idx if prev_len == 0 else ctx.step_index - 1, - "type": "text", - "content": ctx.full_content - } - }) - continue - - delta = choices[0].get("delta", {}) - - # Handle reasoning (thinking) - yield_obj = ctx.handle_thinking_stream(delta) - if yield_obj: - yield yield_obj - - # Handle content - yield_obj = ctx.handle_text_stream(delta) - if yield_obj: - yield yield_obj - - # Accumulate tool calls - tool_calls_delta = delta.get("tool_calls", []) - for tc in tool_calls_delta: - idx = tc.get("index", 0) - if idx >= len(ctx.tool_calls_list): - ctx.tool_calls_list.append({ - "id": tc.get("id", ""), - "type": "function", - "function": {"name": "", "arguments": ""} - }) - func = tc.get("function", {}) - if func.get("name"): - ctx.tool_calls_list[idx]["function"]["name"] += func["name"] - if func.get("arguments"): - ctx.tool_calls_list[idx]["function"]["arguments"] += func["arguments"] - - # Save streaming step (thinking or text) - ctx.save_streaming_step() - - # Handle tool calls - if ctx.tool_calls_list: - ctx.all_tool_calls.extend(ctx.tool_calls_list) - - # Handle tool_call steps - tool_call_step_ids, tool_call_steps, yield_objs = ctx.handle_tool_call() - ctx.all_steps.extend(tool_call_steps) - for yield_obj in yield_objs: - yield yield_obj - - # Execute tools - tool_context = { - "workspace": workspace, - "user_id": user_id, - "username": username, - "user_permission_level": user_permission_level - } - tool_results = self.tool_executor.process_tool_calls_parallel( - ctx.tool_calls_list, tool_context - ) - - # Handle tool_result steps - for i, tr in enumerate(tool_results): - tool_call_step_id = tool_call_step_ids[i] if i < len(tool_call_step_ids) else f"step-{i}" - result_step, yield_obj = ctx.handle_tool_result(tr, tool_call_step_id) - ctx.all_steps.append(result_step) - yield yield_obj - - ctx.all_tool_results.append({ - "role": "tool", - "tool_call_id": tr.get("tool_call_id", ""), - "content": tr.get("content", "") - }) - - # Add assistant message with tool calls for next iteration - messages.append({ - "role": "assistant", - "content": ctx.full_content or "", - "tool_calls": ctx.tool_calls_list - }) - messages.extend(ctx.all_tool_results[-len(tool_results):]) - ctx.all_tool_results = [] - continue - - # No tool calls - final iteration, save message - msg_id = str(uuid.uuid4()) - - # 使用 API 返回的真实 completion_tokens,如果 API 没返回则降级使用估算值 - actual_token_count = total_usage.get("completion_tokens", 0) or len(ctx.full_content) // 4 - logger.info(f"[TOKEN] total_usage: {total_usage}, actual_token_count: {actual_token_count}") - - self._save_message( - conversation.id, - msg_id, - ctx.full_content, - ctx.all_tool_calls, - ctx.all_tool_results, - ctx.all_steps, - actual_token_count, - total_usage - ) - - yield _sse_event("done", { - "message_id": msg_id, - "token_count": actual_token_count, - "usage": total_usage - }) - return + # Execute agentic loop + async for event in self.agentic_loop.execute( + llm=llm, + model=model, + messages=messages, + tools=tools, + temperature=conversation.temperature, + max_tokens=max_tokens, + thinking_enabled=thinking_enabled or conversation.thinking_enabled, + context=ctx, + tool_context=tool_context + ): + yield event - # Max iterations exceeded - save message before error - if ctx.full_content or ctx.all_tool_calls: - msg_id = str(uuid.uuid4()) + # Save message after successful completion (only if we have content) + if ctx._last_message_id and (ctx.full_content or ctx.all_tool_calls): self._save_message( conversation.id, - msg_id, + ctx._last_message_id, ctx.full_content, ctx.all_tool_calls, ctx.all_tool_results, ctx.all_steps, - actual_token_count, - total_usage + ctx._last_token_count, + ctx._last_usage ) - yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"}) except Exception as e: - logger.error(f"Stream error: {e}") + logger.error(f"Stream error: {e}\n{traceback.format_exc()}") yield _sse_event("error", {"content": str(e)}) - def _save_message( + async def non_stream_response( self, - conversation_id: str, - msg_id: str, - full_content: str, - all_tool_calls: list, - all_tool_results: list, - all_steps: list, - token_count: int = 0, - usage: dict = None - ): - """Save the assistant message to database.""" + conversation, + user_message: str, + tools_enabled: bool = True, + thinking_enabled: bool = False + ) -> Dict[str, Any]: + """Non-streaming response for simple requests.""" + try: + messages = self.build_messages(conversation) + messages.append({ + "role": "user", + "content": json.dumps({"text": user_message, "attachments": []}) + }) + + tools = [] if not tools_enabled else None + llm, max_tokens = get_llm_client(conversation) + model = conversation.model or llm.default_model or "gpt-4" + + response = await llm.sync_call( + model=model, + messages=messages, + tools=tools, + temperature=conversation.temperature, + max_tokens=max_tokens or 8192, + thinking_enabled=thinking_enabled or conversation.thinking_enabled + ) + + return { + "success": True, + "content": response.content, + "tool_calls": response.tool_calls, + "usage": response.usage + } + + except httpx.HTTPStatusError as e: + error_msg = f"HTTP {e.response.status_code}: {e.response.text[:200] if e.response else 'No response body'}" + logger.error(f"Non-stream HTTP error: {error_msg}") + return {"success": False, "error": error_msg} + except httpx.TimeoutException as e: + logger.error(f"Non-stream timeout: {e}") + return {"success": False, "error": "Request timeout"} + except Exception as e: + logger.error(f"Non-stream error: {type(e).__name__}: {e}\n{traceback.format_exc()}") + return {"success": False, "error": f"{type(e).__name__}: {str(e)}"} + + def _save_message(self, conversation_id: str, msg_id: str, full_content: str, + all_tool_calls: list, all_tool_results: list, all_steps: list, + token_count: int = 0, usage: dict = None): + """Save assistant message to database.""" from luxx.database import SessionLocal from luxx.models import Message - content_json = { - "text": full_content, - "steps": all_steps - } + content_json = {"text": full_content, "steps": all_steps} if all_tool_calls: content_json["tool_calls"] = all_tool_calls @@ -520,12 +285,18 @@ class ChatService: ) db.add(msg) db.commit() - except Exception as e: + except Exception: db.rollback() raise finally: db.close() -# Global chat service +def _sse_event(event: str, data: dict) -> str: + """Format a Server-Sent Event string.""" + return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" + + +# ============== Global Singleton ============== + chat_service = ChatService() diff --git a/luxx/services/llm_client.py b/luxx/services/llm_client.py index acec3eb..fac67a7 100644 --- a/luxx/services/llm_client.py +++ b/luxx/services/llm_client.py @@ -2,6 +2,7 @@ import json import httpx import logging +import traceback from typing import Dict, Any, Optional, List, AsyncGenerator from luxx.config import config @@ -172,7 +173,7 @@ class LLMClient: logger.error(f"HTTP error: {status_code}") yield f"event: error\ndata: {json.dumps({'content': f'HTTP {status_code}: Request failed'})}\n\n" except Exception as e: - logger.error(f"Exception: {type(e).__name__}: {str(e)}") + logger.error(f"Exception: {type(e).__name__}: {str(e)}\n{traceback.format_exc()}") yield f"event: error\ndata: {json.dumps({'content': str(e)})}\n\n" diff --git a/luxx/services/process_result.py b/luxx/services/process_result.py new file mode 100644 index 0000000..91cc555 --- /dev/null +++ b/luxx/services/process_result.py @@ -0,0 +1,37 @@ +"""ProcessResult - Result of processing an SSE line.""" + + +class ProcessResult: + """Result of processing an SSE line. + + Attributes: + events: List of SSE event strings to yield + has_error: Whether an error occurred + error_content: Error message if any + has_content: Whether content was received + has_tool_calls: Whether tool calls were received + """ + + def __init__(self): + self.events: list = [] + self.has_error: bool = False + self.error_content: str = "" + self.has_content: bool = False + self.has_tool_calls: bool = False + + def add_event(self, event: str): + """Add an event to the result.""" + self.events.append(event) + + def set_error(self, content: str): + """Set error state.""" + self.has_error = True + self.error_content = content + + def set_content(self): + """Mark that content was received.""" + self.has_content = True + + def set_tool_calls(self): + """Mark that tool calls were received.""" + self.has_tool_calls = True diff --git a/luxx/services/stream_context.py b/luxx/services/stream_context.py new file mode 100644 index 0000000..7ebde9a --- /dev/null +++ b/luxx/services/stream_context.py @@ -0,0 +1,185 @@ +"""StreamContext - Manages streaming state transitions during LLM response. + +Tracks steps in order: +- thinking: Model reasoning content +- text: Model response text +- tool_call: Tool invocation request +- tool_result: Tool execution result + +Each step has unique id and index for frontend rendering. +""" +import json +from typing import List, Dict, Optional + + +def _sse_event(event: str, data: dict) -> str: + """Format a Server-Sent Event string.""" + return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" + + +class StreamContext: + """Manages streaming state transitions during LLM response.""" + + def __init__(self): + self.step_index = 0 + self.current_step_id = None + self.current_step_idx = None + self.current_step_type = None + self.full_content = "" + self.full_thinking = "" + self.all_steps = [] + self.all_tool_calls = [] + self.all_tool_results = [] + self.tool_calls_list = [] + self._last_message_id = None + self._last_token_count = 0 + self._last_usage = None + + def reset(self): + """Reset state for new iteration.""" + self.current_step_id = None + self.current_step_idx = None + self.current_step_type = None + self.full_content = "" + self.full_thinking = "" + self.tool_calls_list = [] + + def start_step(self, step_type: str) -> str: + """Start a new step with unique ID.""" + self.current_step_idx = self.step_index + self.current_step_id = f"step-{self.step_index}" + self.current_step_type = step_type + self.step_index += 1 + return self.current_step_id + + def finalize_step(self): + """Save current step to all_steps.""" + if self.current_step_id is None: + return + + content = self.full_content if self.current_step_type == "text" else self.full_thinking + self.all_steps.append({ + "id": self.current_step_id, + "index": self.current_step_idx, + "type": self.current_step_type, + "content": content + }) + + def handle_thinking(self, delta: Dict) -> Optional[str]: + """Handle reasoning delta from LLM.""" + reasoning = delta.get("reasoning_content", "") + if not reasoning: + return None + + if not self.full_thinking: + self.start_step("thinking") + + self.full_thinking += reasoning + return _sse_event("process_step", { + "step": { + "id": self.current_step_id, + "index": self.current_step_idx, + "type": "thinking", + "content": self.full_thinking + } + }) + + def handle_text(self, delta: Dict) -> Optional[str]: + """Handle content delta from LLM.""" + content = delta.get("content", "") + if not content: + return None + + if not self.full_content: + self.start_step("text") + + self.full_content += content + return _sse_event("process_step", { + "step": { + "id": self.current_step_id, + "index": self.current_step_idx, + "type": "text", + "content": self.full_content + } + }) + + def accumulate_tool_call(self, tc_delta: Dict): + """Accumulate tool call delta.""" + idx = tc_delta.get("index", 0) + if idx >= len(self.tool_calls_list): + self.tool_calls_list.append({ + "id": tc_delta.get("id", ""), + "type": "function", + "function": {"name": "", "arguments": ""} + }) + + func = tc_delta.get("function", {}) + if func.get("name"): + self.tool_calls_list[idx]["function"]["name"] += func["name"] + if func.get("arguments"): + self.tool_calls_list[idx]["function"]["arguments"] += func["arguments"] + + def emit_tool_calls(self) -> List[str]: + """Emit tool call steps, return SSE events.""" + events = [] + for tc in self.tool_calls_list: + step_id = f"step-{self.step_index}" + self.step_index += 1 + + step = { + "id": step_id, + "index": self.step_index - 1, + "type": "tool_call", + "id_ref": tc.get("id", ""), + "name": tc["function"]["name"], + "arguments": tc["function"]["arguments"] + } + self.all_steps.append(step) + self.all_tool_calls.append(tc) + events.append(_sse_event("process_step", {"step": step})) + + return events + + def emit_tool_result(self, result: Dict, ref_step_id: str) -> tuple: + """Emit tool result step, return (step, event).""" + step_id = f"step-{self.step_index}" + self.step_index += 1 + + content = result.get("content", "") + success = True + try: + parsed = json.loads(content) + if isinstance(parsed, dict): + success = parsed.get("success", True) + except (json.JSONDecodeError, TypeError): + pass + + step = { + "id": step_id, + "index": self.step_index - 1, + "type": "tool_result", + "id_ref": ref_step_id, + "name": result.get("name", ""), + "content": content, + "success": success + } + self.all_steps.append(step) + self.all_tool_results.append({ + "role": "tool", + "tool_call_id": result.get("tool_call_id", ""), + "content": content + }) + + return step, _sse_event("process_step", {"step": step}) + + def set_completion(self, msg_id: str, token_count: int, usage: dict): + """Set completion info for saving.""" + self._last_message_id = msg_id + self._last_token_count = token_count + self._last_usage = usage + + def reset_completion(self): + """Reset completion info.""" + self._last_message_id = None + self._last_token_count = 0 + self._last_usage = None