"""OpenAI Adapter - OpenAI/DeepSeek/GLM/MiniMax compatible API adapter""" import json import logging from typing import Dict, List, Any, AsyncGenerator from .base import ProviderAdapter from ..llm_response import ParsedDelta logger = logging.getLogger(__name__) class OpenAIAdapter(ProviderAdapter): """OpenAI-compatible API adapter""" def __init__(self): pass @property def provider_type(self) -> str: return "openai" def build_request(self, model: str, messages: List[Dict], tools=None, **kwargs) -> tuple: api_key = kwargs.get("api_key", "") headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"} body = {"model": model, "messages": messages, "stream": kwargs.get("stream", True)} if "temperature" in kwargs: body["temperature"] = kwargs["temperature"] if "max_tokens" in kwargs: body["max_tokens"] = kwargs["max_tokens"] if tools: body["tools"] = tools body["tool_choice"] = "auto" return body, headers def reset(self): pass async def parse_stream_chunk(self, raw_chunk: str) -> AsyncGenerator[ParsedDelta, None]: """Parse OpenAI/MiniMax format. Returns raw content for accumulation.""" if not raw_chunk or not raw_chunk.strip(): return chunk_str = raw_chunk.strip() if chunk_str.startswith("data: "): chunk_str = chunk_str[6:] elif chunk_str.startswith("data:"): chunk_str = chunk_str[5:] if chunk_str.strip() == "[DONE]": yield ParsedDelta(is_complete=True) return try: chunk = json.loads(chunk_str) except json.JSONDecodeError: logger.warning(f"Failed to parse chunk: {chunk_str[:100]}") return choices = chunk.get("choices", []) if not choices: usage = chunk.get("usage") if usage: logger.debug(f"Usage chunk: {usage}") return choice = choices[0] delta = choice.get("delta", {}) finish_reason = choice.get("finish_reason") content = delta.get("content", "") # MiniMax may send tool_calls as array in delta tool_calls = delta.get("tool_calls", []) # Yield content if present if content: yield ParsedDelta(content=content) # Yield each tool_call from tool_calls array (MiniMax format) for tc in tool_calls: yield ParsedDelta(tool_call=tc) # Set is_complete for final chunks if finish_reason in ("stop", "tool_calls"): yield ParsedDelta(is_complete=True) def parse_response(self, data: Dict) -> Dict: """Parse non-streaming response.""" choices = data.get("choices", []) if not choices: return {"content": "", "tool_calls": [], "usage": {}} message = choices[0].get("message", {}) content = message.get("content", "") tool_calls = message.get("tool_calls", []) usage = data.get("usage", {}) return {"content": content, "tool_calls": tool_calls, "usage": usage} def supports_tools(self) -> bool: return True