Luxx/luxx/services/llm_adapters/openai_adapter.py

104 lines
3.4 KiB
Python

"""OpenAI Adapter - OpenAI/DeepSeek/GLM/MiniMax compatible API adapter"""
import json
import logging
from typing import Dict, List, Any, AsyncGenerator
from .base import ProviderAdapter
from ..llm_response import ParsedDelta
logger = logging.getLogger(__name__)
class OpenAIAdapter(ProviderAdapter):
"""OpenAI-compatible API adapter"""
def __init__(self):
pass
@property
def provider_type(self) -> str:
return "openai"
def build_request(self, model: str, messages: List[Dict], tools=None, **kwargs) -> tuple:
api_key = kwargs.get("api_key", "")
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
body = {"model": model, "messages": messages, "stream": kwargs.get("stream", True)}
if "temperature" in kwargs:
body["temperature"] = kwargs["temperature"]
if "max_tokens" in kwargs:
body["max_tokens"] = kwargs["max_tokens"]
if tools:
body["tools"] = tools
body["tool_choice"] = "auto"
return body, headers
def reset(self):
pass
async def parse_stream_chunk(self, raw_chunk: str) -> AsyncGenerator[ParsedDelta, None]:
"""Parse OpenAI/MiniMax format. Returns raw content for accumulation."""
if not raw_chunk or not raw_chunk.strip():
return
chunk_str = raw_chunk.strip()
if chunk_str.startswith("data: "):
chunk_str = chunk_str[6:]
elif chunk_str.startswith("data:"):
chunk_str = chunk_str[5:]
if chunk_str.strip() == "[DONE]":
yield ParsedDelta(is_complete=True)
return
try:
chunk = json.loads(chunk_str)
except json.JSONDecodeError:
logger.warning(f"Failed to parse chunk: {chunk_str[:100]}")
return
choices = chunk.get("choices", [])
if not choices:
usage = chunk.get("usage")
if usage:
logger.debug(f"Usage chunk: {usage}")
return
choice = choices[0]
delta = choice.get("delta", {})
finish_reason = choice.get("finish_reason")
content = delta.get("content", "")
# MiniMax may send tool_calls as array in delta
tool_calls = delta.get("tool_calls", [])
# Log for debugging
if finish_reason:
logger.debug(f"finish_reason={finish_reason}, content={content[:30] if content else 'empty'}, tool_calls={len(tool_calls)}")
# Yield content if present
if content:
yield ParsedDelta(content=content)
# Yield each tool_call from tool_calls array (MiniMax format)
for tc in tool_calls:
yield ParsedDelta(tool_call=tc)
# Set is_complete for final chunks
if finish_reason in ("stop", "tool_calls"):
yield ParsedDelta(is_complete=True)
def parse_response(self, data: Dict) -> Dict:
"""Parse non-streaming response."""
choices = data.get("choices", [])
if not choices:
return {"content": "", "tool_calls": [], "usage": {}}
message = choices[0].get("message", {})
content = message.get("content", "")
tool_calls = message.get("tool_calls", [])
usage = data.get("usage", {})
return {"content": content, "tool_calls": tool_calls, "usage": usage}
def supports_tools(self) -> bool:
return True