135 lines
5.0 KiB
Python
135 lines
5.0 KiB
Python
"""OpenAI Adapter - OpenAI/DeepSeek/GLM/MiniMax compatible API adapter"""
|
|
|
|
import json
|
|
import logging
|
|
from typing import Dict, List, Any, AsyncGenerator
|
|
|
|
from .base import ProviderAdapter
|
|
from ..llm_response import ParsedDelta
|
|
|
|
# Module-level logger; the adapter uses it to warn about stream chunks it
# cannot parse and to report token usage received from the stream.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OpenAIAdapter(ProviderAdapter):
    """Adapter for OpenAI-compatible ``/chat/completions`` APIs.

    Builds request bodies/headers and converts streamed (SSE) or
    non-streamed responses into ``ParsedDelta`` objects / plain dicts.
    The module targets OpenAI-style providers such as DeepSeek, GLM and
    MiniMax. The adapter holds no per-request state.
    """

    def __init__(self):
        # Stateless adapter: nothing to initialise.
        pass

    @property
    def provider_type(self) -> str:
        """Provider family identifier for this adapter."""
        return "openai"

    @property
    def api_path(self) -> str:
        """Path of the chat-completions endpoint."""
        return "/chat/completions"

    def build_request(self, model: str, messages: List[Dict], tools=None, **kwargs) -> tuple:
        """Build the JSON body and HTTP headers for a chat-completions call.

        Args:
            model: Model identifier placed in the request body.
            messages: Chat messages, passed through unchanged.
            tools: Optional tool definitions. When non-empty they are sent
                together with ``tool_choice="auto"``.
            **kwargs: Recognised keys are ``api_key`` (Bearer token,
                default ``""``), ``stream`` (default ``True``),
                ``temperature``, ``max_tokens``, ``reasoning_effort``,
                ``thinking_enabled`` (truthy adds
                ``"thinking": {"type": "enabled"}``) and ``response_format``.
                Any other keys are ignored.

        Returns:
            A ``(body, headers)`` tuple of dicts.
        """
        api_key = kwargs.get("api_key", "")
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        }
        body: Dict[str, Any] = {
            "model": model,
            "messages": messages,
            "stream": kwargs.get("stream", True),
        }

        # Optional settings are copied verbatim, and only when the caller
        # supplied the key; otherwise the field is omitted from the body.
        if "temperature" in kwargs:
            body["temperature"] = kwargs["temperature"]
        if "max_tokens" in kwargs:
            body["max_tokens"] = kwargs["max_tokens"]
        if tools:
            body["tools"] = tools
            body["tool_choice"] = "auto"

        # Reasoning / thinking controls (DeepSeek-style reasoning models).
        if "reasoning_effort" in kwargs:
            body["reasoning_effort"] = kwargs["reasoning_effort"]
        if kwargs.get("thinking_enabled"):
            body["thinking"] = {"type": "enabled"}
        if "response_format" in kwargs:
            body["response_format"] = kwargs["response_format"]

        return body, headers

    def reset(self):
        """No-op: this adapter keeps no state between streams."""

    @staticmethod
    def _normalize_usage(raw: Any) -> Dict[str, int]:
        """Return prompt/completion/total token counts, or ``{}`` if absent.

        ``raw`` is the ``usage`` value of a response or chunk. Anything other
        than a non-empty dict (``None``, ``{}``, unexpected types) yields
        ``{}``. Missing or ``null`` counters are reported as 0.
        """
        if not raw or not isinstance(raw, dict):
            return {}
        return {
            "prompt_tokens": raw.get("prompt_tokens") or 0,
            "completion_tokens": raw.get("completion_tokens") or 0,
            "total_tokens": raw.get("total_tokens") or 0,
        }

    async def parse_stream_chunk(self, raw_chunk: str) -> AsyncGenerator[ParsedDelta, None]:
        """Parse one SSE line of an OpenAI-compatible stream into deltas.

        Deltas carry raw fragments for the caller to accumulate. For a single
        chunk they are yielded in this order: ``thinking`` (from
        ``reasoning_content``), ``content``, one ``tool_call`` per entry in
        ``tool_calls``, ``usage``, and finally ``is_complete`` when
        ``finish_reason == "stop"``. The ``[DONE]`` sentinel yields
        ``is_complete`` only.

        Blank lines and SSE comment lines (starting with ``:``) yield nothing.
        Malformed or non-object JSON is logged and skipped, never raised.
        JSON ``null`` values for ``delta``/``content``/``tool_calls`` are
        treated as absent.

        Args:
            raw_chunk: A single line/event from the response stream.
        """
        if not raw_chunk or not raw_chunk.strip():
            return

        payload = raw_chunk.strip()
        if payload.startswith(":"):
            # SSE comment / keep-alive line: carries no data.
            return
        if payload.startswith("data:"):
            # Covers both "data: {...}" and "data:{...}".
            payload = payload[5:].strip()

        if payload == "[DONE]":
            yield ParsedDelta(is_complete=True)
            return

        try:
            chunk = json.loads(payload)
        except json.JSONDecodeError:
            logger.warning("Failed to parse chunk: %s", payload[:100])
            return
        if not isinstance(chunk, dict):
            logger.warning("Ignoring non-object chunk: %s", payload[:100])
            return

        finish_reason = None
        choices = chunk.get("choices") or []
        if choices:
            choice = choices[0]
            delta = choice.get("delta") or {}
            finish_reason = choice.get("finish_reason")

            # DeepSeek thinking mode streams `reasoning_content` alongside
            # normal content. It is yielded as a separate thinking-only delta
            # so the agentic loop can render it as a thinking step and echo
            # it back on later calls. Without that echo DeepSeek returns 400:
            # "The `reasoning_content` in the thinking mode must be passed
            # back to the API."
            reasoning_content = delta.get("reasoning_content") or ""
            if reasoning_content:
                yield ParsedDelta(thinking=reasoning_content)

            content = delta.get("content") or ""
            if content:
                yield ParsedDelta(content=content)

            # Tool calls arrive as an array in the delta; `or []` guards
            # against an explicit JSON null.
            for tool_call in delta.get("tool_calls") or []:
                yield ParsedDelta(tool_call=tool_call)

        # Usage may arrive in a dedicated chunk with empty `choices` (OpenAI
        # `include_usage`) or attached to the final choice chunk (some
        # compatible providers). Both are reported here.
        # NOTE(review): if a provider sent usage in both places, the consumer
        # would see it twice; confirm the consumer keeps the latest value
        # rather than summing.
        usage = self._normalize_usage(chunk.get("usage"))
        if usage:
            logger.info("[TOKEN] Received usage from stream: %s", chunk.get("usage"))
            yield ParsedDelta(usage=usage)

        # Only "stop" is reported as completion. "length" means max_tokens
        # was hit and "content_filter" means output was withheld by the
        # provider. Neither these nor "tool_calls" set is_complete here; the
        # calling loop is left to handle them.
        if finish_reason == "stop":
            yield ParsedDelta(is_complete=True)

    def parse_response(self, data: Dict) -> Dict:
        """Parse a non-streaming chat-completions response.

        Only the first choice is read.

        Returns:
            ``{"content": str, "tool_calls": list, "usage": dict}``. JSON
            ``null`` values (e.g. ``"content": null`` on tool-call replies)
            are normalised to ``""`` / ``[]`` / ``{}`` so the declared types
            always hold.
        """
        usage = data.get("usage") or {}
        choices = data.get("choices") or []
        if not choices:
            return {"content": "", "tool_calls": [], "usage": usage}

        message = choices[0].get("message") or {}
        return {
            "content": message.get("content") or "",
            "tool_calls": message.get("tool_calls") or [],
            "usage": usage,
        }

    def supports_tools(self) -> bool:
        """This adapter supports tool/function calling."""
        return True
|