# Source: Luxx/luxx/services/chat.py (repository viewer metadata: 333 lines, 11 KiB, Python)

"""Chat service module with Agentic Loop pattern.
This module follows SOLID principles:
- Single Responsibility: Each class has one job
- Dependency Inversion: Depend on abstractions (repositories), not concretions

Components:
- MessageBuilder: Constructs message lists
- ChatService: Core chat service (orchestration only)
- LLMProviderFactory: Creates LLM clients (Dependency Injection)
"""
import json
import logging
import traceback
import httpx
from typing import List, Dict, Any, AsyncGenerator
from luxx.tools.executor import ToolExecutor
from luxx.tools.core import registry
from luxx.services.llm_client import LLMClient
from luxx.services.stream_context import StreamState
from luxx.services.agentic_loop import AgenticLoop
from luxx.services.events import sse_event
from luxx.repositories import UnitOfWork
logger = logging.getLogger(__name__)
# ============== MessageBuilder ==============
class MessageBuilder:
    """Fluent builder for the message list sent with an LLM request.

    Each ``add_*`` method appends one message dict to ``self.messages`` and
    returns the builder itself so calls can be chained. ``build`` returns a
    copy of the accumulated list.
    """

    def __init__(self):
        # Messages in insertion order; every entry is a dict with a "role" key.
        self.messages = []

    def add_system(self, content: str) -> 'MessageBuilder':
        """Append a system message and return the builder."""
        self.messages.append({"role": "system", "content": content})
        return self

    def add_user(self, content: str, attachments: list = None) -> 'MessageBuilder':
        """Append a user message whose content is a JSON envelope.

        Args:
            content: The user's text, stored under the ``"text"`` key.
            attachments: Stored under ``"attachments"``; a falsy value
                (including ``None``) is stored as an empty list.

        Returns:
            The builder, for chaining.
        """
        # ensure_ascii=False keeps non-ASCII text readable instead of \uXXXX.
        msg_content = json.dumps(
            {"text": content, "attachments": attachments or []},
            ensure_ascii=False,
        )
        self.messages.append({"role": "user", "content": msg_content})
        return self

    def add_assistant(self, content: str, tool_calls: list = None) -> 'MessageBuilder':
        """Append an assistant message and return the builder.

        The ``"tool_calls"`` key is only added when ``tool_calls`` is truthy.
        """
        msg = {"role": "assistant", "content": content}
        if tool_calls:
            msg["tool_calls"] = tool_calls
        self.messages.append(msg)
        return self

    def add_tool_result(self, tool_call_id: str, content: str) -> 'MessageBuilder':
        """Append a tool-result message tied to ``tool_call_id``."""
        self.messages.append({
            "role": "tool",
            "tool_call_id": tool_call_id,
            "content": content,
        })
        return self

    def build(self) -> List[Dict]:
        """Return a shallow copy of the message list.

        The list is new, but the message dicts inside it are the same objects
        held by the builder.
        """
        return self.messages.copy()

    @staticmethod
    def extract_text(content: str) -> str:
        """Return the text of a stored message, unwrapping the JSON envelope.

        Args:
            content: Raw message content. May be plain text or a JSON object
                string carrying a ``"text"`` field.

        Returns:
            ``""`` for falsy input; the ``"text"`` field when ``content``
            parses to a JSON object whose ``"text"`` is a string; otherwise
            ``content`` unchanged.
        """
        if not content:
            return ""
        try:
            parsed = json.loads(content)
        except (ValueError, TypeError):
            # Not JSON (JSONDecodeError is a ValueError) or not a str/bytes
            # value at all: treat the content as already being plain text.
            return content
        if isinstance(parsed, dict):
            text = parsed.get("text")
            # Only unwrap a real string; a null or non-string "text" would
            # otherwise leak a non-str value to callers expecting str.
            if isinstance(text, str):
                return text
        return content
# ============== LLM Provider Factory ==============
class LLMProviderFactory:
    """Builds ``LLMClient`` instances for the chat service.

    Keeping client construction here lets callers receive a ready client
    instead of assembling one themselves (Dependency Inversion).
    """

    @staticmethod
    def create_client(
        provider=None,
        api_key: str = None,
        api_url: str = None,
        model: str = None
    ) -> tuple:
        """Create an LLM client from a provider record or from raw parameters.

        Args:
            provider: LLMProvider model instance (optional). When given, its
                ``api_key``, ``base_url``, ``default_model`` and
                ``provider_type`` configure the client and the direct
                parameters below are ignored.
            api_key: Direct API key (used only if no provider).
            api_url: Direct API URL (used only if no provider).
            model: Direct model name (used only if no provider).

        Returns:
            tuple: ``(LLMClient, max_tokens)``. ``max_tokens`` is
            ``provider.max_tokens`` on the provider path and ``None`` on the
            direct-parameter path.
        """
        if provider is None:
            # No provider record: fall back to the explicitly passed settings.
            direct_client = LLMClient(api_key=api_key, api_url=api_url, model=model)
            return direct_client, None

        provider_client = LLMClient(
            api_key=provider.api_key,
            api_url=provider.base_url,
            model=provider.default_model,
            provider_type=provider.provider_type,
        )
        return provider_client, provider.max_tokens
# ============== Chat Service ==============
class ChatService:
    """Core chat service with Agentic Loop support.

    Orchestrates one chat turn: builds the message list, obtains an LLM
    client from the provider factory, runs the agentic loop (streaming) or a
    single LLM call (non-streaming), and stores the assistant message through
    ``UnitOfWork``. Collaborators are injected via the constructor so they
    can be replaced in tests.
    """

    # Fallbacks applied when neither the conversation nor the provider/client
    # supplies a value.
    DEFAULT_MODEL = "gpt-4"
    DEFAULT_MAX_TOKENS = 8192

    def __init__(
        self,
        tool_executor: ToolExecutor = None,
        agentic_loop: AgenticLoop = None,
        provider_factory: LLMProviderFactory = None
    ):
        """Initialize ChatService with injected dependencies.

        Args:
            tool_executor: Tool executor instance (creates default if None).
            agentic_loop: Agentic loop instance (creates default if None,
                wired to the tool executor chosen above).
            provider_factory: LLM provider factory (uses default if None).
        """
        self._tool_executor = tool_executor or ToolExecutor()
        self._agentic_loop = agentic_loop or AgenticLoop(self._tool_executor)
        self._provider_factory = provider_factory or LLMProviderFactory()

    @staticmethod
    def _user_message(text: str) -> Dict[str, str]:
        """Wrap the current user input in the JSON envelope used for user messages.

        ``ensure_ascii=False`` matches ``MessageBuilder.add_user`` so that
        non-ASCII text is sent as-is rather than as ``\\uXXXX`` escapes.
        """
        payload = json.dumps({"text": text, "attachments": []}, ensure_ascii=False)
        return {"role": "user", "content": payload}

    def build_messages(self, conversation, include_system: bool = True) -> List[Dict]:
        """Build the message list from stored conversation history.

        Args:
            conversation: Object exposing ``id`` and ``system_prompt``.
            include_system: When true and the conversation has a non-empty
                system prompt, that prompt becomes the first message.

        Returns:
            A list of ``{"role", "content"}`` dicts; stored content is passed
            through ``MessageBuilder.extract_text``.
        """
        messages = []
        if include_system and conversation.system_prompt:
            messages.append({"role": "system", "content": conversation.system_prompt})
        with UnitOfWork() as uow:
            # Read the rows while the unit of work is still open.
            for msg in uow.messages.get_by_conversation(conversation.id):
                messages.append({
                    "role": msg.role,
                    "content": MessageBuilder.extract_text(msg.content),
                })
        return messages

    def _get_tools(self, enabled_tools: list) -> list:
        """Return the registry entries whose function name is in ``enabled_tools``.

        A falsy ``enabled_tools`` (``None`` or empty) yields an empty list
        without consulting the registry.
        """
        if not enabled_tools:
            return []
        return [
            tool for tool in registry.list_all()
            if tool.get("function", {}).get("name") in enabled_tools
        ]

    async def stream_response(
        self,
        conversation,
        user_message: str,
        thinking_enabled: bool = False,
        enabled_tools: list = None,
        user_id: int = None,
        username: str = None,
        workspace: str = None,
        user_permission_level: int = 1
    ) -> AsyncGenerator[str, None]:
        """Stream a response produced by the agentic loop.

        Yields every event emitted by the agentic loop. If any exception is
        raised along the way, it is logged and a single ``error`` event
        carrying ``str(e)`` is yielded instead of propagating.

        Args:
            conversation: Object exposing ``id``, ``provider``, ``model``,
                ``temperature``, ``thinking_enabled`` and ``system_prompt``.
            user_message: Text of the current user turn.
            thinking_enabled: OR-ed with ``conversation.thinking_enabled``.
            enabled_tools: Tool names to expose; falsy means no tools.
            user_id: Forwarded to tools via the tool context.
            username: Forwarded to tools via the tool context.
            workspace: Forwarded to tools via the tool context.
            user_permission_level: Forwarded to tools via the tool context.
        """
        try:
            # Build initial messages: history plus the current user turn.
            messages = self.build_messages(conversation)
            messages.append(self._user_message(user_message))

            # Get tools and LLM client via factory.
            tools = self._get_tools(enabled_tools)
            llm, provider_max_tokens = self._provider_factory.create_client(
                provider=conversation.provider
            )
            model = conversation.model or llm.default_model or self.DEFAULT_MODEL
            max_tokens = provider_max_tokens or self.DEFAULT_MAX_TOKENS

            # Tool execution context.
            tool_context = {
                "workspace": workspace,
                "user_id": user_id,
                "username": username,
                "user_permission_level": user_permission_level
            }

            # Stream context: filled in by the loop, read back below.
            ctx = StreamState()

            # Execute agentic loop.
            async for event in self._agentic_loop.execute(
                llm=llm,
                model=model,
                messages=messages,
                tools=tools,
                temperature=conversation.temperature,
                max_tokens=max_tokens,
                thinking_enabled=thinking_enabled or conversation.thinking_enabled,
                context=ctx,
                tool_context=tool_context
            ):
                yield event

            # Save only when the loop finished without raising and the
            # context holds both a message id and at least one step.
            if ctx._last_message_id and ctx.all_steps:
                self._save_message(
                    conversation.id,
                    ctx._last_message_id,
                    ctx.get_steps_for_save(),
                    ctx._last_token_count,
                    ctx._last_usage
                )
        except Exception as e:
            # Boundary handler: the failure is reported to the client as an
            # SSE event, and the traceback goes to the log.
            logger.exception("Stream error: %s", e)
            yield sse_event("error", {"content": str(e)})

    async def non_stream_response(
        self,
        conversation,
        user_message: str,
        tools_enabled: bool = True,
        thinking_enabled: bool = False
    ) -> Dict[str, Any]:
        """Return a single, non-streamed response for simple requests.

        Args:
            conversation: Object exposing ``id``, ``provider``, ``model``,
                ``temperature``, ``thinking_enabled`` and ``system_prompt``.
            user_message: Text of the current user turn.
            tools_enabled: Selects the ``tools`` argument passed to the LLM
                call (see note in the body).
            thinking_enabled: OR-ed with ``conversation.thinking_enabled``.

        Returns:
            On success: ``{"success": True, "content", "tool_calls", "usage"}``.
            On failure: ``{"success": False, "error": <message>}``; exceptions
            are logged and never propagate to the caller.
        """
        try:
            messages = self.build_messages(conversation)
            messages.append(self._user_message(user_message))

            # NOTE(review): with tools enabled this passes None, with tools
            # disabled an empty list - no tool definitions are ever sent from
            # here. Presumably the client treats None as "unspecified";
            # confirm against LLMClient.sync_call.
            tools = [] if not tools_enabled else None

            llm, max_tokens = self._provider_factory.create_client(
                provider=conversation.provider
            )
            model = conversation.model or llm.default_model or self.DEFAULT_MODEL
            response = await llm.sync_call(
                model=model,
                messages=messages,
                tools=tools,
                temperature=conversation.temperature,
                max_tokens=max_tokens or self.DEFAULT_MAX_TOKENS,
                thinking_enabled=thinking_enabled or conversation.thinking_enabled
            )
            return {
                "success": True,
                "content": response.get("content", ""),
                "tool_calls": response.get("tool_calls", []),
                "usage": response.get("usage", {})
            }
        except httpx.HTTPStatusError as e:
            # Check for a missing response before touching any of its
            # attributes, so the handler itself cannot raise.
            response = e.response
            if response is not None:
                error_msg = f"HTTP {response.status_code}: {response.text[:200]}"
            else:
                error_msg = "HTTP error: No response body"
            logger.error(f"Non-stream HTTP error: {error_msg}")
            return {"success": False, "error": error_msg}
        except httpx.TimeoutException as e:
            logger.error(f"Non-stream timeout: {e}")
            return {"success": False, "error": "Request timeout"}
        except Exception as e:
            logger.exception("Non-stream error: %s: %s", type(e).__name__, e)
            return {"success": False, "error": f"{type(e).__name__}: {str(e)}"}

    def _save_message(
        self,
        conversation_id: str,
        msg_id: str,
        all_steps: list,
        token_count: int = 0,
        usage: dict = None
    ):
        """Save the assistant message through the message repository.

        The content is stored as ``{"steps": all_steps}`` with role
        ``"assistant"``, and the unit of work is committed before returning.
        """
        content_json = {"steps": all_steps}
        with UnitOfWork() as uow:
            uow.messages.create(
                msg_id=msg_id,
                conversation_id=conversation_id,
                role="assistant",
                content=content_json,
                token_count=token_count,
                usage=usage
            )
            uow.commit()
# ============== Factory Function ==============
def create_chat_service(
    tool_executor: ToolExecutor = None,
    agentic_loop: AgenticLoop = None,
    provider_factory: LLMProviderFactory = None
) -> ChatService:
    """Factory function to create ChatService instances.

    Args:
        tool_executor: Forwarded to ``ChatService``; ``None`` lets the
            service create its default.
        agentic_loop: Forwarded to ``ChatService``; ``None`` lets the
            service create its default.
        provider_factory: Forwarded to ``ChatService``; ``None`` lets the
            service create its default. Optional, so existing callers that
            pass only the first two arguments are unaffected.

    Returns:
        A new ``ChatService`` built from the given collaborators.
    """
    return ChatService(
        tool_executor=tool_executor,
        agentic_loop=agentic_loop,
        provider_factory=provider_factory
    )