Compare commits

..

4 Commits

Author SHA1 Message Date
ViperEkura 4de03866f4 feat: 初步实现task 2026-04-16 21:21:56 +08:00
ViperEkura 119d566e89 feat: 增加task 图构建 2026-04-16 20:27:34 +08:00
ViperEkura 2f9deb40ae feat: 增加基础模型设置 2026-04-16 20:19:41 +08:00
ViperEkura 9edf4dac9c fix: 修复流式聊天部分问题 2026-04-16 16:12:59 +08:00
9 changed files with 1146 additions and 254 deletions

View File

@ -55,6 +55,49 @@
</div>
</div>
<!-- Task Start Step -->
<div v-else-if="item.type === 'task_start'" :key="`task-start-${item.key}`" class="step-item task-start">
<div class="step-header">
<span v-html="targetIcon"></span>
<span class="step-label">开始任务</span>
<span class="step-brief">{{ item.taskName || '新任务' }}</span>
<span class="arrow" :class="{ open: expandedKeys.has(item.key) }" v-html="chevronDown"></span>
</div>
<div v-if="expandedKeys.has(item.key)" class="step-content">
<div class="task-detail">
<div class="task-goal"><strong>目标:</strong> {{ item.goal }}</div>
<div v-if="item.steps && item.steps.length > 0" class="task-steps">
<strong>步骤:</strong>
<ol>
<li v-for="(step, idx) in item.steps" :key="idx">{{ step.name }}</li>
</ol>
</div>
</div>
</div>
</div>
<!-- Task Complete Step -->
<div v-else-if="item.type === 'task_complete'" :key="`task-complete-${item.key}`" class="step-item task-complete">
<div class="step-header">
<span v-html="checkCircleIcon"></span>
<span class="step-label">任务完成</span>
<span class="step-brief">{{ item.taskName || '任务' }}</span>
<span class="step-status">
<span v-if="item.success !== false" class="step-badge success">成功</span>
<span v-else class="step-badge error">失败</span>
<span class="arrow" :class="{ open: expandedKeys.has(item.key) }" v-html="chevronDown"></span>
</span>
</div>
<div v-if="expandedKeys.has(item.key)" class="step-content">
<div v-if="item.result" class="task-result">
<strong>结果:</strong> {{ item.result }}
</div>
<div v-if="item.summary" class="task-summary">
<span>完成步骤: {{ item.summary.completed_steps || 0 }} / {{ item.summary.total_steps || 0 }}</span>
</div>
</div>
</div>
<!-- Text Step -->
<div v-else-if="item.type === 'text'" :key="`text-${item.key}`" class="text-content md-content" v-html="renderMarkdown(item.content)"></div>
</template>
@ -167,6 +210,25 @@ const allItems = computed(() => {
index: step.index,
content: step.content || '',
})
} else if (step.type === 'task_start') {
items.push({
key: step.id || `task-start-${step.index}`,
type: 'task_start',
index: step.index,
taskName: step.taskName || step.name || '',
goal: step.goal || '',
steps: step.steps || [],
})
} else if (step.type === 'task_complete') {
items.push({
key: step.id || `task-complete-${step.index}`,
type: 'task_complete',
index: step.index,
taskName: step.taskName || step.name || '',
success: step.success,
result: step.result || '',
summary: step.summary || {},
})
}
}
} else if (props.toolCalls && props.toolCalls.length > 0) {
@ -246,6 +308,10 @@ const chevronDown = `<svg viewBox="0 0 24 24" width="16" height="16" fill="none"
const sparkleIcon = `<svg viewBox="0 0 24 24" width="14" height="14" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="m12 3-1.912 5.813a2 2 0 0 1-1.275 1.275L3 12l5.813 1.912a2 2 0 0 1 1.275 1.275L12 21l1.912-5.813a2 2 0 0 1 1.275-1.275L21 12l-5.813-1.912a2 2 0 0 1-1.275-1.275L12 3Z"></path></svg>`
const alertIcon = `<svg viewBox="0 0 24 24" width="16" height="16" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><circle cx="12" cy="12" r="10"></circle><line x1="12" y1="8" x2="12" y2="12"></line><line x1="12" y1="16" x2="12.01" y2="16"></line></svg>`
const targetIcon = `<svg viewBox="0 0 24 24" width="16" height="16" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><circle cx="12" cy="12" r="10"></circle><circle cx="12" cy="12" r="6"></circle><circle cx="12" cy="12" r="2"></circle></svg>`
const checkCircleIcon = `<svg viewBox="0 0 24 24" width="16" height="16" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M22 11.08V12a10 10 0 1 1-5.93-9.14"></path><polyline points="22 4 12 14.01 9 11.01"></polyline></svg>`
</script>
<style scoped>
@ -390,4 +456,20 @@ const alertIcon = `<svg viewBox="0 0 24 24" width="16" height="16" fill="none" s
.streaming-indicator { display: flex; align-items: center; gap: 8px; font-size: 12px; color: var(--text-tertiary); }
.process-block:has(.step-item) .streaming-indicator { margin-top: 8px; padding: 8px 0 0; border-top: 1px solid var(--border-light); }
/* Task styles */
.task-start .step-header svg:first-child { color: #3b82f6; }
.task-complete .step-header svg:first-child { color: #22c55e; }
.task-detail { font-size: 13px; }
.task-goal { margin-bottom: 8px; color: var(--text-secondary); line-height: 1.5; }
.task-goal strong { color: var(--text-primary); }
.task-steps { color: var(--text-secondary); }
.task-steps strong { color: var(--text-primary); display: block; margin-bottom: 4px; }
.task-steps ol { margin: 4px 0 0 20px; padding: 0; }
.task-steps li { margin-bottom: 4px; }
.task-result { font-size: 13px; color: var(--text-secondary); margin-bottom: 8px; }
.task-result strong { color: var(--text-primary); }
.task-summary { font-size: 12px; color: var(--text-tertiary); }
</style>

View File

@ -68,6 +68,9 @@
<span>加载中...</span>
</div>
<div v-else-if="convMessages.length || currentStreamState">
<!-- 任务进度栏 -->
<TaskProgressBar v-if="currentTask" :task="currentTask" />
<!-- 历史消息 -->
<MessageBubble
v-for="msg in convMessages"
@ -83,8 +86,8 @@
<div class="avatar">Luxx</div>
<div class="message-content">
<ProcessBlock
:process-steps="currentStreamState.process_steps"
:streaming="currentStreamState.status === 'streaming'"
:process-steps="currentStreamState?.process_steps"
:streaming="currentStreamState?.status === 'streaming'"
/>
</div>
</div>
@ -207,6 +210,28 @@ const observedElements = new Set()
const editConv = ref(null)
//
const currentTask = ref(null)
//
watch(() => currentStreamState?.process_steps, (steps) => {
if (!steps || !steps.length) {
currentTask.value = null
return
}
// task_start
const taskStart = [...steps].reverse().find(s => s.type === 'task_start')
if (taskStart) {
currentTask.value = {
name: taskStart.taskName || taskStart.name,
goal: taskStart.goal,
status: 'running',
steps: taskStart.steps || []
}
}
}, { deep: true })
//
const handleSend = async () => {
if (!newMessage.value.trim()) return
@ -293,7 +318,7 @@ watch(convMessages, () => {
scrollToBottom()
}, { deep: true })
watch(() => currentStreamState.value?.process_steps?.length, () => {
watch(() => currentStreamState?.process_steps?.length, () => {
scrollToBottom()
})

124
luxx/agent/agent.py Normal file
View File

@ -0,0 +1,124 @@
"""Agent module for autonomous task execution"""
from enum import Enum
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Any, TYPE_CHECKING
from luxx.utils.helpers import generate_id
if TYPE_CHECKING:
from luxx.agent.task import Task
logger = logging.getLogger(__name__)
class AgentStatus(Enum):
"""Agent status"""
READY = "ready"
RUNNING = "running"
BLOCK = "block"
TERMINATED = "terminated"
@dataclass
class Agent:
"""Agent"""
id: str
name: str
description: str = ""
instructions: str = ""
tools: List[str] = field(default_factory=list)
current_task: Optional["Task"] = None
status: AgentStatus = AgentStatus.READY
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"instructions": self.instructions,
"tools": self.tools,
"current_task": self.current_task.to_dict() if self.current_task else None,
"status": self.status,
"result": self.result,
"error": self.error,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None
}
class AgentService:
"""Agent service for managing agent instances"""
def __init__(self):
self._agents: Dict[str, Agent] = {}
def create_agent(
self,
name: str,
description: str = "",
instructions: str = "",
tools: List[str] = None
) -> Agent:
"""Create a new agent instance"""
agent_id = generate_id("agent")
agent = Agent(
id=agent_id,
name=name,
description=description,
instructions=instructions,
tools=tools or []
)
self._agents[agent_id] = agent
logger.info(f"Created agent: {agent_id} - {name}")
return agent
def get_agent(self, agent_id: str) -> Optional[Agent]:
"""Get agent by ID"""
return self._agents.get(agent_id)
def list_agents(self) -> List[Agent]:
"""List all agents"""
return list(self._agents.values())
def delete_agent(self, agent_id: str) -> bool:
"""Delete agent by ID"""
if agent_id in self._agents:
del self._agents[agent_id]
logger.info(f"Deleted agent: {agent_id}")
return True
return False
def update_agent(
self,
agent_id: str,
name: str = None,
description: str = None,
instructions: str = None,
tools: List[str] = None
) -> Optional[Agent]:
"""Update agent configuration"""
agent = self._agents.get(agent_id)
if not agent:
return None
if name is not None:
agent.name = name
if description is not None:
agent.description = description
if instructions is not None:
agent.instructions = instructions
if tools is not None:
agent.tools = tools
agent.updated_at = datetime.now()
return agent
# Global service instances
agent_service = AgentService()

308
luxx/routes/agent.py Normal file
View File

@ -0,0 +1,308 @@
"""Agent routes module"""
from typing import List, Optional
from fastapi import APIRouter
from pydantic import BaseModel
from luxx.agent.agent import agent_service, task_service
from luxx.agent.task import TaskStatus, StepStatus
from luxx.utils.helpers import success_response, error_response
router = APIRouter(prefix="/agent", tags=["Agent"])
# ==================== Request Models ====================
class CreateAgentRequest(BaseModel):
"""Create agent request"""
name: str
description: str = ""
instructions: str = ""
tools: List[str] = []
class UpdateAgentRequest(BaseModel):
"""Update agent request"""
name: Optional[str] = None
description: Optional[str] = None
instructions: Optional[str] = None
tools: Optional[List[str]] = None
class TaskStep(BaseModel):
"""Task step"""
name: str
description: str = ""
class CreateTaskRequest(BaseModel):
"""Create task request"""
name: str
goal: str
description: str = ""
steps: List[TaskStep] = []
class CreateSubtaskRequest(BaseModel):
"""Create subtask request"""
name: str
goal: str
description: str = ""
class UpdateTaskStatusRequest(BaseModel):
"""Update task status request"""
status: str
# ==================== Agent Endpoints ====================
@router.post("/agents", response_model=dict)
def create_agent(request: CreateAgentRequest):
"""Create a new agent instance"""
agent = agent_service.create_agent(
name=request.name,
description=request.description,
instructions=request.instructions,
tools=request.tools
)
return success_response(
data=agent.to_dict(),
message="Agent created successfully"
)
@router.get("/agents", response_model=dict)
def list_agents():
"""List all agents"""
agents = agent_service.list_agents()
return success_response(
data={
"agents": [a.to_dict() for a in agents],
"total": len(agents)
}
)
@router.get("/agents/{agent_id}", response_model=dict)
def get_agent(agent_id: str):
"""Get agent details"""
agent = agent_service.get_agent(agent_id)
if not agent:
return error_response("Agent not found", 404)
return success_response(data=agent.to_dict())
@router.put("/agents/{agent_id}", response_model=dict)
def update_agent(agent_id: str, request: UpdateAgentRequest):
"""Update agent configuration"""
agent = agent_service.update_agent(
agent_id=agent_id,
name=request.name,
description=request.description,
instructions=request.instructions,
tools=request.tools
)
if not agent:
return error_response("Agent not found", 404)
return success_response(
data=agent.to_dict(),
message="Agent updated successfully"
)
@router.delete("/agents/{agent_id}", response_model=dict)
def delete_agent(agent_id: str):
"""Delete agent instance"""
if agent_service.delete_agent(agent_id):
return success_response(message="Agent deleted successfully")
return error_response("Agent not found", 404)
# ==================== Task Endpoints ====================
@router.post("/tasks", response_model=dict)
def create_task(request: CreateTaskRequest):
"""Create a new task"""
task = task_service.create_task(
name=request.name,
goal=request.goal,
description=request.description,
steps=[s.dict() for s in request.steps] if request.steps else None
)
return success_response(
data=task.to_dict(),
message="Task created successfully"
)
@router.get("/agents/{agent_id}/tasks", response_model=dict)
def get_current_task(agent_id: str):
"""Get agent's current task"""
tasks = task_service.list_tasks(agent_id=agent_id)
if not tasks:
return success_response(
data={"task": None},
message="No task found"
)
return success_response(data={"task": tasks[0].to_dict()})
@router.get("/tasks", response_model=dict)
def list_tasks(agent_id: str = None):
"""List all tasks, optionally filtered by agent"""
tasks = task_service.list_tasks(agent_id=agent_id)
return success_response(
data={
"tasks": [t.to_dict() for t in tasks],
"total": len(tasks)
}
)
@router.get("/tasks/{task_id}", response_model=dict)
def get_task(task_id: str):
"""Get task details"""
task = task_service.get_task(task_id)
if not task:
return error_response("Task not found", 404)
return success_response(data=task.to_dict())
@router.put("/tasks/{task_id}/status", response_model=dict)
def update_task_status(task_id: str, request: UpdateTaskStatusRequest):
"""Update task status"""
try:
task_status = TaskStatus(request.status)
except ValueError:
valid_statuses = [s.value for s in TaskStatus]
return error_response(f"Invalid status: {request.status}. Valid: {valid_statuses}", 400)
task = task_service.update_task_status(task_id, task_status)
if not task:
return error_response("Task not found", 404)
return success_response(
data=task.to_dict(),
message="Task status updated"
)
@router.delete("/tasks/{task_id}", response_model=dict)
def delete_task(task_id: str):
"""Delete task"""
if task_service.delete_task(task_id):
return success_response(message="Task deleted successfully")
return error_response("Task not found", 404)
# ==================== Step Endpoints ====================
@router.post("/tasks/{task_id}/steps", response_model=dict)
def add_step(task_id: str, name: str, description: str = ""):
"""Add step to task"""
step = task_service.add_step(task_id, name, description)
if not step:
return error_response("Task not found", 404)
return success_response(
data=step.to_dict(),
message="Step added successfully"
)
@router.put("/tasks/{task_id}/steps/{step_id}/status", response_model=dict)
def update_step_status(task_id: str, step_id: str, status: str):
"""Update step status"""
try:
step_status = StepStatus(status)
except ValueError:
valid_statuses = [s.value for s in StepStatus]
return error_response(f"Invalid status: {status}. Valid: {valid_statuses}", 400)
task = task_service.get_task(task_id)
if not task:
return error_response("Task not found", 404)
for step in task.steps:
if step.id == step_id:
step.status = step_status
step.updated_at = __import__("datetime").datetime.now()
return success_response(
data=step.to_dict(),
message="Step status updated"
)
return error_response("Step not found", 404)
# ==================== Subtask Endpoints ====================
@router.post("/tasks/{task_id}/subtasks", response_model=dict)
def create_subtask(task_id: str, request: CreateSubtaskRequest):
"""Add subtask to parent task"""
subtask = task_service.add_subtask(
parent_task_id=task_id,
name=request.name,
goal=request.goal,
description=request.description
)
if not subtask:
return error_response("Parent task not found", 404)
return success_response(
data=subtask.to_dict(),
message="Subtask created successfully"
)
@router.get("/tasks/{task_id}/subtasks", response_model=dict)
def list_subtasks(task_id: str):
"""List subtasks of a task"""
task = task_service.get_task(task_id)
if not task:
return error_response("Task not found", 404)
return success_response(
data={
"subtasks": [t.to_dict() for t in task.subtasks],
"total": len(task.subtasks)
}
)
# ==================== Execution Endpoints ====================
@router.post("/agents/{agent_id}/execute", response_model=dict)
def execute_agent(agent_id: str, goal: str):
"""Trigger agent to execute a goal"""
agent = agent_service.get_agent(agent_id)
if not agent:
return error_response("Agent not found", 404)
if agent.status == "executing":
return error_response("Agent is already executing", 400)
# Update agent status
agent.status = "executing"
agent.updated_at = __import__("datetime").datetime.now()
# Create task
task = task_service.create_task(
agent_id=agent_id,
name=f"Task: {goal[:50]}...",
goal=goal,
description=f"Auto-generated task for goal: {goal}"
)
if not task:
return error_response("Failed to create task", 500)
return success_response(
data={
"agent_id": agent_id,
"task_id": task.id,
"status": "executing",
"message": "Agent execution started"
},
message="Execution started"
)

View File

@ -2,17 +2,17 @@
import json
import uuid
import logging
from typing import List, Dict, Any, AsyncGenerator, Optional
from luxx.models import Conversation, Message
from typing import List, Dict,AsyncGenerator
from luxx.models import Conversation, Message, LLMProvider
from luxx.tools.executor import ToolExecutor
from luxx.tools.core import registry
from luxx.services.llm_client import LLMClient
from luxx.config import config
from luxx.database import SessionLocal
logger = logging.getLogger(__name__)
# Maximum iterations to prevent infinite loops
MAX_ITERATIONS = 10
MAX_ITERATIONS = 20
def _sse_event(event: str, data: dict) -> str:
@ -24,8 +24,6 @@ def get_llm_client(conversation: Conversation = None):
"""Get LLM client, optionally using conversation's provider. Returns (client, max_tokens)"""
max_tokens = None
if conversation and conversation.provider_id:
from luxx.models import LLMProvider
from luxx.database import SessionLocal
db = SessionLocal()
try:
provider = db.query(LLMProvider).filter(LLMProvider.id == conversation.provider_id).first()
@ -45,157 +43,6 @@ def get_llm_client(conversation: Conversation = None):
return client, max_tokens
class StreamContext:
"""Context for streaming response state management."""
def __init__(
self,
step_index: int = 0,
current_step_id: str = None,
current_step_idx: int = None,
current_stream_type: str = None,
full_content: str = "",
full_thinking: str = ""
):
self.step_index = step_index
self.current_step_id = current_step_id
self.current_step_idx = current_step_idx
self.current_stream_type = current_stream_type
self.full_content = full_content
self.full_thinking = full_thinking
self.all_steps = []
self.all_tool_calls = []
self.all_tool_results = []
self.tool_calls_list = []
def reset_iteration(self):
"""Reset streaming step tracker for new iteration."""
self.current_step_id = None
self.current_step_idx = None
self.current_stream_type = None
self.full_content = ""
self.full_thinking = ""
self.tool_calls_list = []
def start_stream_step(self, step_type: str) -> str:
"""Start a new streaming step. Returns the step_id."""
self.current_step_idx = self.step_index
self.current_step_id = f"step-{self.step_index}"
self.current_stream_type = step_type
self.step_index += 1
return self.current_step_id
def yield_stream_step(self, step_type: str, content: str) -> Dict[str, Any]:
"""Yield a streaming step event."""
return _sse_event("process_step", {
"step": {
"id": self.current_step_id,
"index": self.current_step_idx,
"type": step_type,
"content": content
}
})
def save_streaming_step(self):
"""Save the current streaming step to all_steps."""
if self.current_step_id is None:
return
if self.current_stream_type == "thinking":
self.all_steps.append({
"id": self.current_step_id,
"index": self.current_step_idx,
"type": "thinking",
"content": self.full_thinking
})
elif self.current_stream_type == "text":
self.all_steps.append({
"id": self.current_step_id,
"index": self.current_step_idx,
"type": "text",
"content": self.full_content
})
def handle_thinking_stream(self, delta: Dict) -> Optional[Dict]:
"""Handle reasoning/thinking delta. Returns yield_obj if step was yielded."""
reasoning = delta.get("reasoning_content", "")
if not reasoning:
return None
prev_len = len(self.full_thinking)
self.full_thinking += reasoning
if prev_len == 0: # New thinking stream started
self.start_stream_step("thinking")
return self.yield_stream_step("thinking", self.full_thinking)
def handle_text_stream(self, delta: Dict) -> Optional[Dict]:
"""Handle content delta. Returns yield_obj if step was yielded."""
content = delta.get("content", "")
if not content:
return None
prev_len = len(self.full_content)
self.full_content += content
if prev_len == 0: # New text stream started
self.start_stream_step("text")
return self.yield_stream_step("text", self.full_content)
def handle_tool_call(self) -> tuple:
"""Handle tool calls. Returns (tool_call_step_ids, tool_call_steps, yield_objs)."""
tool_call_step_ids = []
tool_call_steps = []
yield_objs = []
for tc in self.tool_calls_list:
call_step_idx = self.step_index
call_step_id = f"step-{self.step_index}"
tool_call_step_ids.append(call_step_id)
self.step_index += 1
call_step = {
"id": call_step_id,
"index": call_step_idx,
"type": "tool_call",
"id_ref": tc.get("id", ""),
"name": tc["function"]["name"],
"arguments": tc["function"]["arguments"]
}
tool_call_steps.append(call_step)
yield_objs.append(_sse_event("process_step", {"step": call_step}))
return tool_call_step_ids, tool_call_steps, yield_objs
def handle_tool_result(self, tool_result: Dict, tool_call_step_id: str) -> tuple:
"""Handle single tool result. Returns (result_step, yield_obj)."""
result_step_idx = self.step_index
result_step_id = f"step-{self.step_index}"
self.step_index += 1
content = tool_result.get("content", "")
success = True
try:
content_obj = json.loads(content)
if isinstance(content_obj, dict):
success = content_obj.get("success", True)
except:
pass
result_step = {
"id": result_step_id,
"index": result_step_idx,
"type": "tool_result",
"id_ref": tool_call_step_id,
"name": tool_result.get("name", ""),
"content": content,
"success": success
}
return result_step, _sse_event("process_step", {"step": result_step})
class ChatService:
"""Chat service with tool support"""
@ -208,8 +55,6 @@ class ChatService:
include_system: bool = True
) -> List[Dict[str, str]]:
"""Build message list"""
from luxx.database import SessionLocal
from luxx.models import Message
messages = []
@ -280,6 +125,12 @@ class ChatService:
# 直接使用 provider 的 max_tokens
max_tokens = provider_max_tokens
# State tracking
all_steps = []
all_tool_calls = []
all_tool_results = []
step_index = 0
# Token usage tracking
total_usage = {
"prompt_tokens": 0,
@ -287,12 +138,23 @@ class ChatService:
"total_tokens": 0
}
# Streaming context for state management
ctx = StreamContext()
# Global step IDs for thinking and text (persist across iterations)
thinking_step_id = None
thinking_step_idx = None
text_step_id = None
text_step_idx = None
for iteration in range(MAX_ITERATIONS):
# Reset streaming context for this iteration
ctx.reset_iteration()
for _ in range(MAX_ITERATIONS):
# Stream from LLM
full_content = ""
full_thinking = ""
tool_calls_list = []
# Step tracking - use unified step-{index} format
thinking_step_id = None
thinking_step_idx = None
text_step_id = None
text_step_idx = None
async for sse_line in llm.stream_call(
model=model,
@ -347,65 +209,114 @@ class ChatService:
# Get delta
choices = chunk.get("choices", [])
if not choices:
delta = None
if choices:
delta = choices[0].get("delta", {})
# If no delta but has message (non-streaming response)
if not delta:
message = choices[0].get("message", {})
if message.get("content"):
delta = {"content": message["content"]}
if not delta:
# Check if there's any content in the response (for non-standard LLM responses)
if chunk.get("content") or chunk.get("message"):
content = chunk.get("content") or chunk.get("message", {}).get("content", "")
if content:
prev_len = len(ctx.full_content)
ctx.full_content += content
if prev_len == 0: # New text stream started
ctx.start_stream_step("text")
yield _sse_event("process_step", {
"step": {
"id": ctx.current_step_id if prev_len == 0 else f"step-{ctx.step_index - 1}",
"index": ctx.current_step_idx if prev_len == 0 else ctx.step_index - 1,
"type": "text",
"content": ctx.full_content
}
})
continue
content = chunk.get("content") or chunk.get("message", {}).get("content", "")
if content:
delta = {"content": content}
delta = choices[0].get("delta", {})
# Handle reasoning (thinking)
yield_obj = ctx.handle_thinking_stream(delta)
if yield_obj:
yield yield_obj
# Handle content
yield_obj = ctx.handle_text_stream(delta)
if yield_obj:
yield yield_obj
# Accumulate tool calls
tool_calls_delta = delta.get("tool_calls", [])
for tc in tool_calls_delta:
idx = tc.get("index", 0)
if idx >= len(ctx.tool_calls_list):
ctx.tool_calls_list.append({
"id": tc.get("id", ""),
"type": "function",
"function": {"name": "", "arguments": ""}
if delta:
# Handle reasoning (thinking)
reasoning = delta.get("reasoning_content", "")
if reasoning:
prev_thinking_len = len(full_thinking)
full_thinking += reasoning
if prev_thinking_len == 0: # New thinking stream started
thinking_step_idx = step_index
thinking_step_id = f"step-{step_index}"
step_index += 1
yield _sse_event("process_step", {
"step": {
"id": thinking_step_id,
"index": thinking_step_idx,
"type": "thinking",
"content": full_thinking
}
})
func = tc.get("function", {})
if func.get("name"):
ctx.tool_calls_list[idx]["function"]["name"] += func["name"]
if func.get("arguments"):
ctx.tool_calls_list[idx]["function"]["arguments"] += func["arguments"]
# Handle content
content = delta.get("content", "")
if content:
prev_content_len = len(full_content)
full_content += content
if prev_content_len == 0: # New text stream started
text_step_idx = step_index
text_step_id = f"step-{step_index}"
step_index += 1
yield _sse_event("process_step", {
"step": {
"id": text_step_id,
"index": text_step_idx,
"type": "text",
"content": full_content
}
})
# Accumulate tool calls
tool_calls_delta = delta.get("tool_calls", [])
for tc in tool_calls_delta:
idx = tc.get("index", 0)
if idx >= len(tool_calls_list):
tool_calls_list.append({
"id": tc.get("id", ""),
"type": "function",
"function": {"name": "", "arguments": ""}
})
func = tc.get("function", {})
if func.get("name"):
tool_calls_list[idx]["function"]["name"] += func["name"]
if func.get("arguments"):
tool_calls_list[idx]["function"]["arguments"] += func["arguments"]
# Save streaming step (thinking or text)
ctx.save_streaming_step()
# Save thinking step
if thinking_step_id is not None:
all_steps.append({
"id": thinking_step_id,
"index": thinking_step_idx,
"type": "thinking",
"content": full_thinking
})
# Save text step
if text_step_id is not None:
all_steps.append({
"id": text_step_id,
"index": text_step_idx,
"type": "text",
"content": full_content
})
# Handle tool calls
if ctx.tool_calls_list:
ctx.all_tool_calls.extend(ctx.tool_calls_list)
if tool_calls_list:
all_tool_calls.extend(tool_calls_list)
# Handle tool_call steps
tool_call_step_ids, tool_call_steps, yield_objs = ctx.handle_tool_call()
ctx.all_steps.extend(tool_call_steps)
for yield_obj in yield_objs:
yield yield_obj
# Yield tool_call steps - use unified step-{index} format
tool_call_step_ids = [] # Track step IDs for tool calls
for tc in tool_calls_list:
call_step_idx = step_index
call_step_id = f"step-{step_index}"
tool_call_step_ids.append(call_step_id)
step_index += 1
call_step = {
"id": call_step_id,
"index": call_step_idx,
"type": "tool_call",
"id_ref": tc.get("id", ""),
"name": tc["function"]["name"],
"arguments": tc["function"]["arguments"]
}
all_steps.append(call_step)
yield _sse_event("process_step", {"step": call_step})
# Execute tools
tool_context = {
@ -415,17 +326,38 @@ class ChatService:
"user_permission_level": user_permission_level
}
tool_results = self.tool_executor.process_tool_calls_parallel(
ctx.tool_calls_list, tool_context
tool_calls_list, tool_context
)
# Handle tool_result steps
# Yield tool_result steps - use unified step-{index} format
for i, tr in enumerate(tool_results):
tool_call_step_id = tool_call_step_ids[i] if i < len(tool_call_step_ids) else f"step-{i}"
result_step, yield_obj = ctx.handle_tool_result(tr, tool_call_step_id)
ctx.all_steps.append(result_step)
yield yield_obj
result_step_idx = step_index
result_step_id = f"step-{step_index}"
step_index += 1
ctx.all_tool_results.append({
content = tr.get("content", "")
success = True
try:
content_obj = json.loads(content)
if isinstance(content_obj, dict):
success = content_obj.get("success", True)
except:
pass
result_step = {
"id": result_step_id,
"index": result_step_idx,
"type": "tool_result",
"id_ref": tool_call_step_id, # Reference to the tool_call step
"name": tr.get("name", ""),
"content": content,
"success": success
}
all_steps.append(result_step)
yield _sse_event("process_step", {"step": result_step})
all_tool_results.append({
"role": "tool",
"tool_call_id": tr.get("tool_call_id", ""),
"content": tr.get("content", "")
@ -434,27 +366,26 @@ class ChatService:
# Add assistant message with tool calls for next iteration
messages.append({
"role": "assistant",
"content": ctx.full_content or "",
"tool_calls": ctx.tool_calls_list
"content": full_content or "",
"tool_calls": tool_calls_list
})
messages.extend(ctx.all_tool_results[-len(tool_results):])
ctx.all_tool_results = []
messages.extend(all_tool_results[-len(tool_results):])
all_tool_results = []
continue
# No tool calls - final iteration, save message
msg_id = str(uuid.uuid4())
# 使用 API 返回的真实 completion_tokens如果 API 没返回则降级使用估算值
actual_token_count = total_usage.get("completion_tokens", 0) or len(ctx.full_content) // 4
logger.info(f"[TOKEN] total_usage: {total_usage}, actual_token_count: {actual_token_count}")
actual_token_count = total_usage.get("completion_tokens", 0)
logger.info(f"total_usage: {total_usage}")
self._save_message(
conversation.id,
msg_id,
ctx.full_content,
ctx.all_tool_calls,
ctx.all_tool_results,
ctx.all_steps,
full_content,
all_tool_calls,
all_tool_results,
all_steps,
actual_token_count,
total_usage
)
@ -467,15 +398,15 @@ class ChatService:
return
# Max iterations exceeded - save message before error
if ctx.full_content or ctx.all_tool_calls:
if full_content or all_tool_calls:
msg_id = str(uuid.uuid4())
self._save_message(
conversation.id,
msg_id,
ctx.full_content,
ctx.all_tool_calls,
ctx.all_tool_results,
ctx.all_steps,
full_content,
all_tool_calls,
all_tool_results,
all_steps,
actual_token_count,
total_usage
)
@ -496,8 +427,7 @@ class ChatService:
usage: dict = None
):
"""Save the assistant message to database."""
from luxx.database import SessionLocal
from luxx.models import Message
content_json = {
"text": full_content,

View File

@ -143,30 +143,23 @@ class LLMClient:
tools: Optional[List[Dict]] = None,
**kwargs
) -> AsyncGenerator[str, None]:
"""Stream call LLM API - yields raw SSE event lines
Yields:
str: Raw SSE event lines for direct forwarding
"""
"""Stream call LLM API - yields raw SSE event lines"""
body = self._build_body(model, messages, tools, stream=True, **kwargs)
logger.info(f"Starting stream_call for model: {model}, messages count: {len(messages)}")
try:
async with httpx.AsyncClient(timeout=120.0) as client:
logger.info(f"Sending request to {self.api_url}")
async with client.stream(
"POST",
self.api_url,
headers=self._build_headers(),
json=body
) as response:
logger.info(f"Response status: {response.status_code}")
response.raise_for_status()
async for line in response.aiter_lines():
if line.strip():
yield line + "\n"
logger.info(f"response finish with : {response.status_code}")
except httpx.HTTPStatusError as e:
status_code = e.response.status_code if e.response else "?"
logger.error(f"HTTP error: {status_code}")

View File

@ -5,5 +5,6 @@ from luxx.tools.builtin import code
from luxx.tools.builtin import data
from luxx.tools.builtin import file
from luxx.tools.builtin import shell
from luxx.tools.builtin import task
__all__ = ["crawler", "code", "data", "file", "shell"]
__all__ = ["crawler", "code", "data", "file", "shell", "task"]

429
luxx/tools/builtin/task.py Normal file
View File

@ -0,0 +1,429 @@
"""Task management tools for LLM agent"""
from typing import Dict, Any, Optional
from luxx.agent.task import task_service, TaskStatus, StepStatus
from luxx.tools.factory import tool
from luxx.tools.core import ToolContext
# Current active task ID (session level)
_current_task_id: Optional[str] = None
def get_current_task_id() -> Optional[str]:
"""Get current active task ID"""
return _current_task_id
def set_current_task_id(task_id: Optional[str]):
"""Set current active task ID"""
global _current_task_id
_current_task_id = task_id
@tool(
name="set_task",
description=(
"Set up or create a task. "
"Use this tool when user requests a goal to accomplish. "
"Parameters: "
"- name: Task name (short description) "
"- goal: Task goal (detailed description of what to accomplish) "
"- description: Optional detailed description "
"- steps: Optional list of task steps, each with name and description"
),
parameters={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Task name (short description)"
},
"goal": {
"type": "string",
"description": "Task goal (detailed description)"
},
"description": {
"type": "string",
"description": "Optional detailed description"
},
"steps": {
"type": "array",
"description": "Optional list of task steps",
"items": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "Step name"},
"description": {"type": "string", "description": "Step description"},
"depends_on": {
"type": "array",
"description": "List of step IDs this step depends on",
"items": {"type": "string"}
}
},
"required": ["name"]
}
}
},
"required": ["name", "goal"]
},
category="task",
required_params=["name", "goal"]
)
def set_task(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
"""Set up or create a task"""
name = arguments.get("name", "")
goal = arguments.get("goal", "")
description = arguments.get("description", "")
steps_data = arguments.get("steps", [])
# Convert steps data
steps = None
if steps_data:
steps = []
for step_data in steps_data:
step = {
"name": step_data.get("name", ""),
"description": step_data.get("description", "")
}
if "depends_on" in step_data:
step["depends_on"] = step_data["depends_on"]
steps.append(step)
# Create task
task = task_service.create_task(
name=name,
goal=goal,
description=description,
steps=steps
)
if not task:
return {"error": "Failed to create task."}
# Add extra steps if needed
if steps_data and len(steps_data) > len(task.steps):
additional_steps = steps_data[len(task.steps):]
task_service.add_steps(task.id, additional_steps)
task = task_service.get_task(task.id)
# Update task status to READY
task_service.update_task_status(task.id, TaskStatus.READY)
# Set current active task
set_current_task_id(task.id)
# Build task info
task_info = {
"task_id": task.id,
"name": task.name,
"goal": task.goal,
"status": "ready",
"steps": [
{
"id": s.id,
"name": s.name,
"description": s.description,
"status": s.status.value
}
for s in task.steps
]
}
return {
"message": (f"Task '{name}' has been set up successfully."),
"task": task_info
}
@tool(
name="add_task_steps",
description=(
"Add steps to an existing task. "
"Use when you need to add more steps to an already created task. "
"Parameters: "
"- steps: List of steps, each with name and description"
),
parameters={
"type": "object",
"properties": {
"steps": {
"type": "array",
"description": "List of steps to add",
"items": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "Step name"},
"description": {"type": "string", "description": "Step description"},
"depends_on": {
"type": "array",
"description": "List of step IDs this step depends on",
"items": {"type": "string"}
}
},
"required": ["name"]
}
}
},
"required": ["steps"]
},
category="task",
required_params=["steps"]
)
def add_task_steps(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
"""Add steps to a task"""
task_id = get_current_task_id()
if not task_id:
return {"error": "No active task found. Use set_task first."}
task = task_service.get_task(task_id)
if not task:
return {"error": (f"Task not found: {task_id}")}
steps_data = arguments.get("steps", [])
# Convert steps data
steps = []
for step_data in steps_data:
step = {
"name": step_data.get("name", ""),
"description": step_data.get("description", "")
}
if "depends_on" in step_data:
step["depends_on"] = step_data["depends_on"]
steps.append(step)
# Add steps
added_steps = task_service.add_steps(task_id, steps)
if not added_steps:
return {"error": "Failed to add steps."}
return {
"message": (f"Added {len(added_steps)} step(s) to task '{task.name}'."),
"steps": [
{
"id": s.id,
"name": s.name,
"description": s.description,
"status": s.status.value
}
for s in added_steps
]
}
@tool(
name="update_step_status",
description=(
"Update task step status. "
"Use when completing a step or when a step fails. "
"Parameters: "
"- step_id: Step ID "
"- status: New status (pending/running/completed/failed/skipped)"
),
parameters={
"type": "object",
"properties": {
"step_id": {
"type": "string",
"description": "Step ID"
},
"status": {
"type": "string",
"description": "New status: pending, running, completed, failed, skipped"
},
"result": {
"type": "object",
"description": "Optional step execution result"
}
},
"required": ["step_id", "status"]
},
category="task",
required_params=["step_id", "status"]
)
def update_step_status(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
"""Update step status"""
task_id = get_current_task_id()
if not task_id:
return {"error": "No active task."}
task = task_service.get_task(task_id)
if not task:
return {"error": "Task not found."}
step_id = arguments.get("step_id")
status_str = arguments.get("status", "").lower()
result = arguments.get("result")
# Find step
step = None
for s in task.steps:
if s.id == step_id:
step = s
break
if not step:
return {"error": (f"Step '{step_id}' not found.")}
# Update status
from datetime import datetime
try:
step.status = StepStatus(status_str)
step.updated_at = datetime.now()
if result:
step.result = result
except ValueError:
valid_statuses = [s.value for s in StepStatus]
return {"error": (f"Invalid status '{status_str}'. Valid values: {valid_statuses}")}
task.updated_at = datetime.now()
return {
"message": (f"Step '{step.name}' status updated to '{status_str}'."),
"step": {
"id": step.id,
"name": step.name,
"status": step.status.value,
"result": step.result
}
}
@tool(
name="complete_task",
description=(
"Complete a task. "
"Use this tool to mark a task as completed when all steps are done. "
"Parameters: "
"- result: Optional final result summary "
"- success: Whether task completed successfully (default true)"
),
parameters={
"type": "object",
"properties": {
"result": {
"type": "string",
"description": "Optional final result summary"
},
"success": {
"type": "boolean",
"description": "Whether task completed successfully (default true)"
}
},
"required": []
},
category="task",
required_params=[]
)
def complete_task(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
"""Complete a task"""
task_id = get_current_task_id()
if not task_id:
return {"error": "No active task."}
task = task_service.get_task(task_id)
if not task:
return {"error": "Task not found."}
success = arguments.get("success", True)
result_summary = arguments.get("result", "")
# Check if all steps are completed
incomplete_steps = [s for s in task.steps if s.status not in [StepStatus.COMPLETED, StepStatus.SKIPPED]]
if incomplete_steps and success:
return {
"warning": (f"Task has {len(incomplete_steps)} incomplete step(s)."),
"incomplete_steps": [{"id": s.id, "name": s.name, "status": s.status.value} for s in incomplete_steps]
}
# Update task status
from datetime import datetime
final_status = TaskStatus.TERMINATED if success else TaskStatus.BLOCK
task_service.update_task_status(
task_id,
final_status,
result={"summary": result_summary, "success": success}
)
# Build completion summary
completed_steps = [s for s in task.steps if s.status == StepStatus.COMPLETED]
summary = {
"task_id": task.id,
"name": task.name,
"status": "completed" if success else "failed",
"completed_steps": len(completed_steps),
"total_steps": len(task.steps),
"result": result_summary
}
# Clear current active task
set_current_task_id(None)
return {
"message": (f"Task '{task.name}' has been {'completed successfully' if success else 'marked as failed'}."),
"summary": summary
}
@tool(
name="get_task_status",
description=(
"Get current task status. "
"View details of the current task including all step statuses."
),
parameters={
"type": "object",
"properties": {},
"required": []
},
category="task",
required_params=[]
)
def get_task_status(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
"""Get task status"""
task_id = get_current_task_id()
if not task_id:
return {"active": False, "message": "No active task."}
task = task_service.get_task(task_id)
if not task:
return {"active": False, "message": "Task not found."}
return {
"active": True,
"task_id": task.id,
"name": task.name,
"goal": task.goal,
"status": task.status.value,
"steps": [
{
"id": s.id,
"name": s.name,
"description": s.description,
"status": s.status.value,
"result": s.result
}
for s in task.steps
],
"pending_steps": len([s for s in task.steps if s.status == StepStatus.PENDING]),
"completed_steps": len([s for s in task.steps if s.status == StepStatus.COMPLETED])
}
# Export all task tools
__all__ = [
"set_task",
"add_task_steps",
"update_step_status",
"complete_task",
"get_task_status",
"get_current_task_id",
"set_current_task_id"
]