"""Chat service module with Agentic Loop pattern. This module follows SOLID principles: - Single Responsibility: Each class has one job - Dependency Inversion: Depend on abstractions (repositories) not concretions Components: - MessageBuilder: Constructs message lists - ChatService: Core chat service (orchestration only) - ProviderFactory: Creates LLM clients (Dependency Injection) """ import json import logging import traceback import httpx from typing import List, Dict, Any, AsyncGenerator from luxx.tools.executor import ToolExecutor from luxx.tools.core import registry from luxx.services.llm_client import LLMClient from luxx.services.stream_context import StreamState from luxx.services.agentic_loop import AgenticLoop from luxx.services.events import sse_event from luxx.repositories import UnitOfWork logger = logging.getLogger(__name__) # ============== MessageBuilder ============== class MessageBuilder: """Builds message lists for LLM requests.""" def __init__(self): self.messages = [] def add_system(self, content: str) -> 'MessageBuilder': """Add system message.""" self.messages.append({"role": "system", "content": content}) return self def add_user(self, content: str, attachments: list = None) -> 'MessageBuilder': """Add user message in JSON format.""" msg_content = json.dumps({ "text": content, "attachments": attachments or [] }, ensure_ascii=False) self.messages.append({"role": "user", "content": msg_content}) return self def add_assistant(self, content: str, tool_calls: list = None) -> 'MessageBuilder': """Add assistant message.""" msg = {"role": "assistant", "content": content} if tool_calls: msg["tool_calls"] = tool_calls self.messages.append(msg) return self def add_tool_result(self, tool_call_id: str, content: str) -> 'MessageBuilder': """Add tool result message.""" self.messages.append({ "role": "tool", "tool_call_id": tool_call_id, "content": content }) return self def build(self) -> List[Dict]: """Build and return message list.""" return self.messages.copy() @staticmethod def extract_text(content: str) -> str: """Extract text from message content (supports JSON format).""" if not content: return "" try: parsed = json.loads(content) if isinstance(parsed, dict): return parsed.get("text", content) except (json.JSONDecodeError, TypeError): pass return content # ============== LLM Provider Factory ============== class LLMProviderFactory: """Factory for creating LLM clients - follows Dependency Injection This separates the creation of dependencies from their usage, following the Dependency Inversion Principle. """ @staticmethod def create_client( provider=None, api_key: str = None, api_url: str = None, model: str = None ) -> tuple: """Create LLM client from provider or direct parameters Args: provider: LLMProvider model instance (optional) api_key: Direct API key (used if no provider) api_url: Direct API URL (used if no provider) model: Direct model name (used if no provider) Returns: tuple: (LLMClient, max_tokens) """ if provider is not None: client = LLMClient( api_key=provider.api_key, api_url=provider.base_url, model=provider.default_model, provider_type=provider.provider_type ) return client, provider.max_tokens # Fallback to direct parameters client = LLMClient( api_key=api_key, api_url=api_url, model=model ) return client, None # ============== Chat Service ============== class ChatService: """Core chat service with Agentic Loop support. This class follows Single Responsibility - it orchestrates the chat flow but delegates data access to repositories and tool execution to executors. Dependencies are injected via constructor for better testability. """ def __init__( self, tool_executor: ToolExecutor = None, agentic_loop: AgenticLoop = None, provider_factory: LLMProviderFactory = None ): """Initialize ChatService with injected dependencies Args: tool_executor: Tool executor instance (creates default if None) agentic_loop: Agentic loop instance (creates default if None) provider_factory: LLM provider factory (uses default if None) """ self._tool_executor = tool_executor or ToolExecutor() self._agentic_loop = agentic_loop or AgenticLoop(self._tool_executor) self._provider_factory = provider_factory or LLMProviderFactory() def build_messages(self, conversation, include_system: bool = True) -> List[Dict]: """Build message list from conversation history using Repository""" messages = [] if include_system and conversation.system_prompt: messages.append({"role": "system", "content": conversation.system_prompt}) with UnitOfWork() as uow: db_messages = uow.messages.get_by_conversation(conversation.id) for msg in db_messages: content = MessageBuilder.extract_text(msg.content) messages.append({"role": msg.role, "content": content}) return messages def _get_tools(self, enabled_tools: list) -> list: """Filter tools based on enabled_tools list.""" if not enabled_tools: return [] return [t for t in registry.list_all() if t.get("function", {}).get("name") in enabled_tools] async def stream_response( self, conversation, user_message: str, thinking_enabled: bool = False, enabled_tools: list = None, user_id: int = None, username: str = None, workspace: str = None, user_permission_level: int = 1, skip_user_message: bool = False # Skip adding user message if already in DB ) -> AsyncGenerator[str, None]: """Streaming response with Agentic Loop.""" try: # Build initial messages messages = self.build_messages(conversation) # Only add user message if not already in database if not skip_user_message: messages.append({ "role": "user", "content": json.dumps({"text": user_message, "attachments": []}) }) # Get tools and LLM client via factory tools = self._get_tools(enabled_tools) llm, provider_max_tokens = self._provider_factory.create_client( provider=conversation.provider ) model = conversation.model or llm.default_model or "gpt-4" max_tokens = provider_max_tokens or 8192 # Tool execution context tool_context = { "workspace": workspace, "user_id": user_id, "username": username, "user_permission_level": user_permission_level } # Stream context ctx = StreamState() # Execute agentic loop async for event in self._agentic_loop.execute( llm=llm, model=model, messages=messages, tools=tools, temperature=conversation.temperature, max_tokens=max_tokens, thinking_enabled=thinking_enabled or conversation.thinking_enabled, context=ctx, tool_context=tool_context ): yield event # Save message after successful completion if ctx._last_message_id and ctx.all_steps: self._save_message( conversation.id, ctx._last_message_id, ctx.get_steps_for_save(), ctx._last_token_count, ctx._last_usage ) except Exception as e: logger.error(f"Stream error: {e}\n{traceback.format_exc()}") yield sse_event("error", {"content": str(e)}) async def non_stream_response( self, conversation, user_message: str, tools_enabled: bool = True, thinking_enabled: bool = False ) -> Dict[str, Any]: """Non-streaming response for simple requests.""" try: messages = self.build_messages(conversation) messages.append({ "role": "user", "content": json.dumps({"text": user_message, "attachments": []}) }) tools = [] if not tools_enabled else None llm, max_tokens = self._provider_factory.create_client( provider=conversation.provider ) model = conversation.model or llm.default_model or "gpt-4" response = await llm.sync_call( model=model, messages=messages, tools=tools, temperature=conversation.temperature, max_tokens=max_tokens or 8192, thinking_enabled=thinking_enabled or conversation.thinking_enabled ) return { "success": True, "content": response.get("content", ""), "tool_calls": response.get("tool_calls", []), "usage": response.get("usage", {}) } except httpx.HTTPStatusError as e: error_msg = f"HTTP {e.response.status_code}: {e.response.text[:200] if e.response else 'No response body'}" logger.error(f"Non-stream HTTP error: {error_msg}") return {"success": False, "error": error_msg} except httpx.TimeoutException as e: logger.error(f"Non-stream timeout: {e}") return {"success": False, "error": "Request timeout"} except Exception as e: logger.error(f"Non-stream error: {type(e).__name__}: {e}\n{traceback.format_exc()}") return {"success": False, "error": f"{type(e).__name__}: {str(e)}"} def _save_message( self, conversation_id: str, msg_id: str, all_steps: list, token_count: int = 0, usage: dict = None ): """Save assistant message to database using Repository""" content_json = {"steps": all_steps} with UnitOfWork() as uow: uow.messages.create( msg_id=msg_id, conversation_id=conversation_id, role="assistant", content=content_json, token_count=token_count, usage=usage ) uow.commit() # ============== Factory Function ============== def create_chat_service( tool_executor: ToolExecutor = None, agentic_loop: AgenticLoop = None ) -> ChatService: """Factory function to create ChatService instances""" return ChatService( tool_executor=tool_executor, agentic_loop=agentic_loop )