Compare commits

...

4 Commits

Author SHA1 Message Date
ViperEkura 4de03866f4 feat: 初步实现task 2026-04-16 21:21:56 +08:00
ViperEkura 119d566e89 feat: 增加task 图构建 2026-04-16 20:27:34 +08:00
ViperEkura 2f9deb40ae feat: 增加基础模型设置 2026-04-16 20:19:41 +08:00
ViperEkura 9edf4dac9c fix: 修复流式聊天部分问题 2026-04-16 16:12:59 +08:00
9 changed files with 1337 additions and 101 deletions

View File

@ -55,6 +55,49 @@
</div> </div>
</div> </div>
<!-- Task Start Step -->
<div v-else-if="item.type === 'task_start'" :key="`task-start-${item.key}`" class="step-item task-start">
<div class="step-header">
<span v-html="targetIcon"></span>
<span class="step-label">开始任务</span>
<span class="step-brief">{{ item.taskName || '新任务' }}</span>
<span class="arrow" :class="{ open: expandedKeys.has(item.key) }" v-html="chevronDown"></span>
</div>
<div v-if="expandedKeys.has(item.key)" class="step-content">
<div class="task-detail">
<div class="task-goal"><strong>目标:</strong> {{ item.goal }}</div>
<div v-if="item.steps && item.steps.length > 0" class="task-steps">
<strong>步骤:</strong>
<ol>
<li v-for="(step, idx) in item.steps" :key="idx">{{ step.name }}</li>
</ol>
</div>
</div>
</div>
</div>
<!-- Task Complete Step -->
<div v-else-if="item.type === 'task_complete'" :key="`task-complete-${item.key}`" class="step-item task-complete">
<div class="step-header">
<span v-html="checkCircleIcon"></span>
<span class="step-label">任务完成</span>
<span class="step-brief">{{ item.taskName || '任务' }}</span>
<span class="step-status">
<span v-if="item.success !== false" class="step-badge success">成功</span>
<span v-else class="step-badge error">失败</span>
<span class="arrow" :class="{ open: expandedKeys.has(item.key) }" v-html="chevronDown"></span>
</span>
</div>
<div v-if="expandedKeys.has(item.key)" class="step-content">
<div v-if="item.result" class="task-result">
<strong>结果:</strong> {{ item.result }}
</div>
<div v-if="item.summary" class="task-summary">
<span>完成步骤: {{ item.summary.completed_steps || 0 }} / {{ item.summary.total_steps || 0 }}</span>
</div>
</div>
</div>
<!-- Text Step --> <!-- Text Step -->
<div v-else-if="item.type === 'text'" :key="`text-${item.key}`" class="text-content md-content" v-html="renderMarkdown(item.content)"></div> <div v-else-if="item.type === 'text'" :key="`text-${item.key}`" class="text-content md-content" v-html="renderMarkdown(item.content)"></div>
</template> </template>
@ -167,6 +210,25 @@ const allItems = computed(() => {
index: step.index, index: step.index,
content: step.content || '', content: step.content || '',
}) })
} else if (step.type === 'task_start') {
items.push({
key: step.id || `task-start-${step.index}`,
type: 'task_start',
index: step.index,
taskName: step.taskName || step.name || '',
goal: step.goal || '',
steps: step.steps || [],
})
} else if (step.type === 'task_complete') {
items.push({
key: step.id || `task-complete-${step.index}`,
type: 'task_complete',
index: step.index,
taskName: step.taskName || step.name || '',
success: step.success,
result: step.result || '',
summary: step.summary || {},
})
} }
} }
} else if (props.toolCalls && props.toolCalls.length > 0) { } else if (props.toolCalls && props.toolCalls.length > 0) {
@ -246,6 +308,10 @@ const chevronDown = `<svg viewBox="0 0 24 24" width="16" height="16" fill="none"
const sparkleIcon = `<svg viewBox="0 0 24 24" width="14" height="14" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="m12 3-1.912 5.813a2 2 0 0 1-1.275 1.275L3 12l5.813 1.912a2 2 0 0 1 1.275 1.275L12 21l1.912-5.813a2 2 0 0 1 1.275-1.275L21 12l-5.813-1.912a2 2 0 0 1-1.275-1.275L12 3Z"></path></svg>` const sparkleIcon = `<svg viewBox="0 0 24 24" width="14" height="14" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="m12 3-1.912 5.813a2 2 0 0 1-1.275 1.275L3 12l5.813 1.912a2 2 0 0 1 1.275 1.275L12 21l1.912-5.813a2 2 0 0 1 1.275-1.275L21 12l-5.813-1.912a2 2 0 0 1-1.275-1.275L12 3Z"></path></svg>`
const alertIcon = `<svg viewBox="0 0 24 24" width="16" height="16" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><circle cx="12" cy="12" r="10"></circle><line x1="12" y1="8" x2="12" y2="12"></line><line x1="12" y1="16" x2="12.01" y2="16"></line></svg>` const alertIcon = `<svg viewBox="0 0 24 24" width="16" height="16" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><circle cx="12" cy="12" r="10"></circle><line x1="12" y1="8" x2="12" y2="12"></line><line x1="12" y1="16" x2="12.01" y2="16"></line></svg>`
const targetIcon = `<svg viewBox="0 0 24 24" width="16" height="16" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><circle cx="12" cy="12" r="10"></circle><circle cx="12" cy="12" r="6"></circle><circle cx="12" cy="12" r="2"></circle></svg>`
const checkCircleIcon = `<svg viewBox="0 0 24 24" width="16" height="16" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M22 11.08V12a10 10 0 1 1-5.93-9.14"></path><polyline points="22 4 12 14.01 9 11.01"></polyline></svg>`
</script> </script>
<style scoped> <style scoped>
@ -390,4 +456,20 @@ const alertIcon = `<svg viewBox="0 0 24 24" width="16" height="16" fill="none" s
.streaming-indicator { display: flex; align-items: center; gap: 8px; font-size: 12px; color: var(--text-tertiary); } .streaming-indicator { display: flex; align-items: center; gap: 8px; font-size: 12px; color: var(--text-tertiary); }
.process-block:has(.step-item) .streaming-indicator { margin-top: 8px; padding: 8px 0 0; border-top: 1px solid var(--border-light); } .process-block:has(.step-item) .streaming-indicator { margin-top: 8px; padding: 8px 0 0; border-top: 1px solid var(--border-light); }
/* Task styles */
.task-start .step-header svg:first-child { color: #3b82f6; }
.task-complete .step-header svg:first-child { color: #22c55e; }
.task-detail { font-size: 13px; }
.task-goal { margin-bottom: 8px; color: var(--text-secondary); line-height: 1.5; }
.task-goal strong { color: var(--text-primary); }
.task-steps { color: var(--text-secondary); }
.task-steps strong { color: var(--text-primary); display: block; margin-bottom: 4px; }
.task-steps ol { margin: 4px 0 0 20px; padding: 0; }
.task-steps li { margin-bottom: 4px; }
.task-result { font-size: 13px; color: var(--text-secondary); margin-bottom: 8px; }
.task-result strong { color: var(--text-primary); }
.task-summary { font-size: 12px; color: var(--text-tertiary); }
</style> </style>

View File

@ -68,6 +68,9 @@
<span>加载中...</span> <span>加载中...</span>
</div> </div>
<div v-else-if="convMessages.length || currentStreamState"> <div v-else-if="convMessages.length || currentStreamState">
<!-- 任务进度栏 -->
<TaskProgressBar v-if="currentTask" :task="currentTask" />
<!-- 历史消息 --> <!-- 历史消息 -->
<MessageBubble <MessageBubble
v-for="msg in convMessages" v-for="msg in convMessages"
@ -83,8 +86,8 @@
<div class="avatar">Luxx</div> <div class="avatar">Luxx</div>
<div class="message-content"> <div class="message-content">
<ProcessBlock <ProcessBlock
:process-steps="currentStreamState.process_steps" :process-steps="currentStreamState?.process_steps"
:streaming="currentStreamState.status === 'streaming'" :streaming="currentStreamState?.status === 'streaming'"
/> />
</div> </div>
</div> </div>
@ -207,6 +210,28 @@ const observedElements = new Set()
const editConv = ref(null) const editConv = ref(null)
//
const currentTask = ref(null)
//
watch(() => currentStreamState?.process_steps, (steps) => {
if (!steps || !steps.length) {
currentTask.value = null
return
}
// task_start
const taskStart = [...steps].reverse().find(s => s.type === 'task_start')
if (taskStart) {
currentTask.value = {
name: taskStart.taskName || taskStart.name,
goal: taskStart.goal,
status: 'running',
steps: taskStart.steps || []
}
}
}, { deep: true })
// //
const handleSend = async () => { const handleSend = async () => {
if (!newMessage.value.trim()) return if (!newMessage.value.trim()) return
@ -293,7 +318,7 @@ watch(convMessages, () => {
scrollToBottom() scrollToBottom()
}, { deep: true }) }, { deep: true })
watch(() => currentStreamState.value?.process_steps?.length, () => { watch(() => currentStreamState?.process_steps?.length, () => {
scrollToBottom() scrollToBottom()
}) })

124
luxx/agent/agent.py Normal file
View File

@ -0,0 +1,124 @@
"""Agent module for autonomous task execution"""
from enum import Enum
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Any, TYPE_CHECKING
from luxx.utils.helpers import generate_id
if TYPE_CHECKING:
from luxx.agent.task import Task
logger = logging.getLogger(__name__)
class AgentStatus(Enum):
"""Agent status"""
READY = "ready"
RUNNING = "running"
BLOCK = "block"
TERMINATED = "terminated"
@dataclass
class Agent:
"""Agent"""
id: str
name: str
description: str = ""
instructions: str = ""
tools: List[str] = field(default_factory=list)
current_task: Optional["Task"] = None
status: AgentStatus = AgentStatus.READY
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"instructions": self.instructions,
"tools": self.tools,
"current_task": self.current_task.to_dict() if self.current_task else None,
"status": self.status,
"result": self.result,
"error": self.error,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None
}
class AgentService:
"""Agent service for managing agent instances"""
def __init__(self):
self._agents: Dict[str, Agent] = {}
def create_agent(
self,
name: str,
description: str = "",
instructions: str = "",
tools: List[str] = None
) -> Agent:
"""Create a new agent instance"""
agent_id = generate_id("agent")
agent = Agent(
id=agent_id,
name=name,
description=description,
instructions=instructions,
tools=tools or []
)
self._agents[agent_id] = agent
logger.info(f"Created agent: {agent_id} - {name}")
return agent
def get_agent(self, agent_id: str) -> Optional[Agent]:
"""Get agent by ID"""
return self._agents.get(agent_id)
def list_agents(self) -> List[Agent]:
"""List all agents"""
return list(self._agents.values())
def delete_agent(self, agent_id: str) -> bool:
"""Delete agent by ID"""
if agent_id in self._agents:
del self._agents[agent_id]
logger.info(f"Deleted agent: {agent_id}")
return True
return False
def update_agent(
self,
agent_id: str,
name: str = None,
description: str = None,
instructions: str = None,
tools: List[str] = None
) -> Optional[Agent]:
"""Update agent configuration"""
agent = self._agents.get(agent_id)
if not agent:
return None
if name is not None:
agent.name = name
if description is not None:
agent.description = description
if instructions is not None:
agent.instructions = instructions
if tools is not None:
agent.tools = tools
agent.updated_at = datetime.now()
return agent
# Global service instances
agent_service = AgentService()

288
luxx/agent/task.py Normal file
View File

@ -0,0 +1,288 @@
"""Task module for autonomous task execution"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import logging
from typing import List, Optional, Dict, Any
from luxx.utils.helpers import generate_id
logger = logging.getLogger(__name__)
class TaskStatus(Enum):
"""Task status enum"""
PENDING = "pending"
READY = "ready"
RUNNING = "running"
BLOCK = "block"
TERMINATED = "terminated"
class StepStatus(Enum):
"""Step status enum"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class Step:
"""Task step"""
id: str
name: str
description: str = ""
depends_on: List[str] = field(default_factory=list)
status: StepStatus = StepStatus.PENDING
result: Optional[Dict[str, Any]] = None
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"depends_on": self.depends_on,
"status": self.status.value,
"result": self.result,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None
}
@dataclass
class Task:
"""Task entity"""
id: str
name: str
description: str = ""
goal: str = ""
status: TaskStatus = TaskStatus.PENDING
steps: List[Step] = field(default_factory=list)
subtasks: List["Task"] = field(default_factory=list)
result: Optional[Dict[str, Any]] = None
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"goal": self.goal,
"status": self.status.value,
"steps": [s.to_dict() for s in self.steps],
"subtasks": [t.to_dict() for t in self.subtasks],
"result": self.result,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None
}
class TaskGraph:
"""Task graph for managing step dependencies"""
def __init__(self, task: Task):
self.task = task
self._adjacency: Dict[str, List[str]] = {}
self._reverse_adjacency: Dict[str, List[str]] = {}
self._in_degree: Dict[str, int] = {}
self._build_graph()
def _build_graph(self) -> None:
"""Build graph from task steps"""
for step in self.task.steps:
self._adjacency[step.id] = []
self._reverse_adjacency[step.id] = []
self._in_degree[step.id] = 0
for step in self.task.steps:
for dep_id in step.depends_on:
if dep_id in self._adjacency:
self._adjacency[dep_id].append(step.id)
self._reverse_adjacency[step.id].append(dep_id)
self._in_degree[step.id] += 1
def topological_sort(self) -> List[Step]:
"""Get steps in topological order"""
in_degree = self._in_degree.copy()
queue = [step_id for step_id, degree in in_degree.items() if degree == 0]
result = []
step_map = {step.id: step for step in self.task.steps}
while queue:
queue.sort()
current = queue.pop(0)
result.append(step_map[current])
for dependent_id in self._adjacency[current]:
in_degree[dependent_id] -= 1
if in_degree[dependent_id] == 0:
queue.append(dependent_id)
return result
def get_ready_steps(self, completed_step_ids: List[str]) -> List[Step]:
"""Get steps that are ready to execute"""
step_map = {step.id: step for step in self.task.steps}
ready = []
for step in self.task.steps:
if step.id in completed_step_ids:
continue
if step.status != StepStatus.PENDING:
continue
deps_completed = all(dep_id in completed_step_ids for dep_id in step.depends_on)
if deps_completed:
ready.append(step)
return ready
def detect_cycles(self) -> List[List[str]]:
"""Detect cycles in the graph"""
WHITE, GRAY, BLACK = 0, 1, 2
color = {step.id: WHITE for step in self.task.steps}
cycles = []
def dfs(node: str, path: List[str]) -> bool:
color[node] = GRAY
path.append(node)
for neighbor in self._adjacency.get(node, []):
if color[neighbor] == GRAY:
cycle_start = path.index(neighbor)
cycles.append(path[cycle_start:] + [neighbor])
return True
elif color[neighbor] == WHITE:
if dfs(neighbor, path):
return True
path.pop()
color[node] = BLACK
return False
for step in self.task.steps:
if color[step.id] == WHITE:
dfs(step.id, [])
return cycles
def validate(self) -> tuple[bool, Optional[str]]:
"""Validate the graph structure"""
cycles = self.detect_cycles()
if cycles:
return False, f"Circular dependency detected: {cycles[0]}"
step_ids = {step.id for step in self.task.steps}
for step in self.task.steps:
for dep_id in step.depends_on:
if dep_id not in step_ids:
return False, f"Step '{step.name}' depends on non-existent step '{dep_id}'"
return True, None
class TaskService:
"""Task service for managing tasks"""
def __init__(self):
self._tasks: Dict[str, Task] = {}
def create_task(
self,
name: str,
goal: str,
description: str = "",
steps: List[Dict[str, Any]] = None
) -> Task:
"""Create a new task"""
task_id = generate_id("task")
task = Task(
id=task_id,
name=name,
description=description,
goal=goal
)
if steps:
for step_data in steps:
step = Step(
id=generate_id("step"),
name=step_data.get("name", ""),
description=step_data.get("description", "")
)
task.steps.append(step)
self._tasks[task_id] = task
logger.info(f"Created task: {task_id}")
return task
def get_task(self, task_id: str) -> Optional[Task]:
"""Get task by ID"""
return self._tasks.get(task_id)
def list_tasks(self) -> List[Task]:
"""List all tasks"""
return list(self._tasks.values())
def update_task_status(
self,
task_id: str,
status: TaskStatus,
result: Any = None
) -> Optional[Task]:
"""Update task status"""
task = self._tasks.get(task_id)
if not task:
return None
task.status = status
task.result = result
task.updated_at = datetime.now()
return task
def add_steps(
self,
task_id: str,
steps: List[Dict[str, Any]]
) -> Optional[List[Step]]:
"""Add steps to task"""
task = self._tasks.get(task_id)
if not task:
return None
result = []
for step_data in steps:
step = Step(
id=generate_id("step"),
name=step_data.get("name", ""),
description=step_data.get("description", ""),
depends_on=step_data.get("depends_on", [])
)
task.steps.append(step)
result.append(step)
task.updated_at = datetime.now()
return result
def delete_task(self, task_id: str) -> bool:
"""Delete task"""
if task_id not in self._tasks:
return False
del self._tasks[task_id]
return True
def build_graph(self, task_id: str) -> Optional[TaskGraph]:
"""Build task graph for a task"""
task = self._tasks.get(task_id)
if not task:
return None
return TaskGraph(task)
task_service = TaskService()

308
luxx/routes/agent.py Normal file
View File

@ -0,0 +1,308 @@
"""Agent routes module"""
from typing import List, Optional
from fastapi import APIRouter
from pydantic import BaseModel
from luxx.agent.agent import agent_service, task_service
from luxx.agent.task import TaskStatus, StepStatus
from luxx.utils.helpers import success_response, error_response
router = APIRouter(prefix="/agent", tags=["Agent"])
# ==================== Request Models ====================
class CreateAgentRequest(BaseModel):
"""Create agent request"""
name: str
description: str = ""
instructions: str = ""
tools: List[str] = []
class UpdateAgentRequest(BaseModel):
"""Update agent request"""
name: Optional[str] = None
description: Optional[str] = None
instructions: Optional[str] = None
tools: Optional[List[str]] = None
class TaskStep(BaseModel):
"""Task step"""
name: str
description: str = ""
class CreateTaskRequest(BaseModel):
"""Create task request"""
name: str
goal: str
description: str = ""
steps: List[TaskStep] = []
class CreateSubtaskRequest(BaseModel):
"""Create subtask request"""
name: str
goal: str
description: str = ""
class UpdateTaskStatusRequest(BaseModel):
"""Update task status request"""
status: str
# ==================== Agent Endpoints ====================
@router.post("/agents", response_model=dict)
def create_agent(request: CreateAgentRequest):
"""Create a new agent instance"""
agent = agent_service.create_agent(
name=request.name,
description=request.description,
instructions=request.instructions,
tools=request.tools
)
return success_response(
data=agent.to_dict(),
message="Agent created successfully"
)
@router.get("/agents", response_model=dict)
def list_agents():
"""List all agents"""
agents = agent_service.list_agents()
return success_response(
data={
"agents": [a.to_dict() for a in agents],
"total": len(agents)
}
)
@router.get("/agents/{agent_id}", response_model=dict)
def get_agent(agent_id: str):
"""Get agent details"""
agent = agent_service.get_agent(agent_id)
if not agent:
return error_response("Agent not found", 404)
return success_response(data=agent.to_dict())
@router.put("/agents/{agent_id}", response_model=dict)
def update_agent(agent_id: str, request: UpdateAgentRequest):
"""Update agent configuration"""
agent = agent_service.update_agent(
agent_id=agent_id,
name=request.name,
description=request.description,
instructions=request.instructions,
tools=request.tools
)
if not agent:
return error_response("Agent not found", 404)
return success_response(
data=agent.to_dict(),
message="Agent updated successfully"
)
@router.delete("/agents/{agent_id}", response_model=dict)
def delete_agent(agent_id: str):
"""Delete agent instance"""
if agent_service.delete_agent(agent_id):
return success_response(message="Agent deleted successfully")
return error_response("Agent not found", 404)
# ==================== Task Endpoints ====================
@router.post("/tasks", response_model=dict)
def create_task(request: CreateTaskRequest):
"""Create a new task"""
task = task_service.create_task(
name=request.name,
goal=request.goal,
description=request.description,
steps=[s.dict() for s in request.steps] if request.steps else None
)
return success_response(
data=task.to_dict(),
message="Task created successfully"
)
@router.get("/agents/{agent_id}/tasks", response_model=dict)
def get_current_task(agent_id: str):
"""Get agent's current task"""
tasks = task_service.list_tasks(agent_id=agent_id)
if not tasks:
return success_response(
data={"task": None},
message="No task found"
)
return success_response(data={"task": tasks[0].to_dict()})
@router.get("/tasks", response_model=dict)
def list_tasks(agent_id: str = None):
"""List all tasks, optionally filtered by agent"""
tasks = task_service.list_tasks(agent_id=agent_id)
return success_response(
data={
"tasks": [t.to_dict() for t in tasks],
"total": len(tasks)
}
)
@router.get("/tasks/{task_id}", response_model=dict)
def get_task(task_id: str):
"""Get task details"""
task = task_service.get_task(task_id)
if not task:
return error_response("Task not found", 404)
return success_response(data=task.to_dict())
@router.put("/tasks/{task_id}/status", response_model=dict)
def update_task_status(task_id: str, request: UpdateTaskStatusRequest):
"""Update task status"""
try:
task_status = TaskStatus(request.status)
except ValueError:
valid_statuses = [s.value for s in TaskStatus]
return error_response(f"Invalid status: {request.status}. Valid: {valid_statuses}", 400)
task = task_service.update_task_status(task_id, task_status)
if not task:
return error_response("Task not found", 404)
return success_response(
data=task.to_dict(),
message="Task status updated"
)
@router.delete("/tasks/{task_id}", response_model=dict)
def delete_task(task_id: str):
"""Delete task"""
if task_service.delete_task(task_id):
return success_response(message="Task deleted successfully")
return error_response("Task not found", 404)
# ==================== Step Endpoints ====================
@router.post("/tasks/{task_id}/steps", response_model=dict)
def add_step(task_id: str, name: str, description: str = ""):
"""Add step to task"""
step = task_service.add_step(task_id, name, description)
if not step:
return error_response("Task not found", 404)
return success_response(
data=step.to_dict(),
message="Step added successfully"
)
@router.put("/tasks/{task_id}/steps/{step_id}/status", response_model=dict)
def update_step_status(task_id: str, step_id: str, status: str):
"""Update step status"""
try:
step_status = StepStatus(status)
except ValueError:
valid_statuses = [s.value for s in StepStatus]
return error_response(f"Invalid status: {status}. Valid: {valid_statuses}", 400)
task = task_service.get_task(task_id)
if not task:
return error_response("Task not found", 404)
for step in task.steps:
if step.id == step_id:
step.status = step_status
step.updated_at = __import__("datetime").datetime.now()
return success_response(
data=step.to_dict(),
message="Step status updated"
)
return error_response("Step not found", 404)
# ==================== Subtask Endpoints ====================
@router.post("/tasks/{task_id}/subtasks", response_model=dict)
def create_subtask(task_id: str, request: CreateSubtaskRequest):
"""Add subtask to parent task"""
subtask = task_service.add_subtask(
parent_task_id=task_id,
name=request.name,
goal=request.goal,
description=request.description
)
if not subtask:
return error_response("Parent task not found", 404)
return success_response(
data=subtask.to_dict(),
message="Subtask created successfully"
)
@router.get("/tasks/{task_id}/subtasks", response_model=dict)
def list_subtasks(task_id: str):
"""List subtasks of a task"""
task = task_service.get_task(task_id)
if not task:
return error_response("Task not found", 404)
return success_response(
data={
"subtasks": [t.to_dict() for t in task.subtasks],
"total": len(task.subtasks)
}
)
# ==================== Execution Endpoints ====================
@router.post("/agents/{agent_id}/execute", response_model=dict)
def execute_agent(agent_id: str, goal: str):
"""Trigger agent to execute a goal"""
agent = agent_service.get_agent(agent_id)
if not agent:
return error_response("Agent not found", 404)
if agent.status == "executing":
return error_response("Agent is already executing", 400)
# Update agent status
agent.status = "executing"
agent.updated_at = __import__("datetime").datetime.now()
# Create task
task = task_service.create_task(
agent_id=agent_id,
name=f"Task: {goal[:50]}...",
goal=goal,
description=f"Auto-generated task for goal: {goal}"
)
if not task:
return error_response("Failed to create task", 500)
return success_response(
data={
"agent_id": agent_id,
"task_id": task.id,
"status": "executing",
"message": "Agent execution started"
},
message="Execution started"
)

View File

@ -2,17 +2,17 @@
import json import json
import uuid import uuid
import logging import logging
from typing import List, Dict, Any, AsyncGenerator, Optional
from luxx.models import Conversation, Message from typing import List, Dict,AsyncGenerator
from luxx.models import Conversation, Message, LLMProvider
from luxx.tools.executor import ToolExecutor from luxx.tools.executor import ToolExecutor
from luxx.tools.core import registry from luxx.tools.core import registry
from luxx.services.llm_client import LLMClient from luxx.services.llm_client import LLMClient
from luxx.config import config from luxx.database import SessionLocal
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Maximum iterations to prevent infinite loops # Maximum iterations to prevent infinite loops
MAX_ITERATIONS = 10 MAX_ITERATIONS = 20
def _sse_event(event: str, data: dict) -> str: def _sse_event(event: str, data: dict) -> str:
@ -24,8 +24,6 @@ def get_llm_client(conversation: Conversation = None):
"""Get LLM client, optionally using conversation's provider. Returns (client, max_tokens)""" """Get LLM client, optionally using conversation's provider. Returns (client, max_tokens)"""
max_tokens = None max_tokens = None
if conversation and conversation.provider_id: if conversation and conversation.provider_id:
from luxx.models import LLMProvider
from luxx.database import SessionLocal
db = SessionLocal() db = SessionLocal()
try: try:
provider = db.query(LLMProvider).filter(LLMProvider.id == conversation.provider_id).first() provider = db.query(LLMProvider).filter(LLMProvider.id == conversation.provider_id).first()
@ -57,8 +55,6 @@ class ChatService:
include_system: bool = True include_system: bool = True
) -> List[Dict[str, str]]: ) -> List[Dict[str, str]]:
"""Build message list""" """Build message list"""
from luxx.database import SessionLocal
from luxx.models import Message
messages = [] messages = []
@ -148,7 +144,7 @@ class ChatService:
text_step_id = None text_step_id = None
text_step_idx = None text_step_idx = None
for iteration in range(MAX_ITERATIONS): for _ in range(MAX_ITERATIONS):
# Stream from LLM # Stream from LLM
full_content = "" full_content = ""
full_thinking = "" full_thinking = ""
@ -213,30 +209,23 @@ class ChatService:
# Get delta # Get delta
choices = chunk.get("choices", []) choices = chunk.get("choices", [])
if not choices: delta = None
if choices:
delta = choices[0].get("delta", {})
# If no delta but has message (non-streaming response)
if not delta:
message = choices[0].get("message", {})
if message.get("content"):
delta = {"content": message["content"]}
if not delta:
# Check if there's any content in the response (for non-standard LLM responses) # Check if there's any content in the response (for non-standard LLM responses)
if chunk.get("content") or chunk.get("message"):
content = chunk.get("content") or chunk.get("message", {}).get("content", "") content = chunk.get("content") or chunk.get("message", {}).get("content", "")
if content: if content:
# BUG FIX: Update full_content so it gets saved to database delta = {"content": content}
prev_content_len = len(full_content)
full_content += content
if prev_content_len == 0: # New text stream started
text_step_idx = step_index
text_step_id = f"step-{step_index}"
step_index += 1
yield _sse_event("process_step", {
"step": {
"id": text_step_id if prev_content_len == 0 else f"step-{step_index - 1}",
"index": text_step_idx if prev_content_len == 0 else step_index - 1,
"type": "text",
"content": full_content # Always send accumulated content
}
})
continue
delta = choices[0].get("delta", {})
if delta:
# Handle reasoning (thinking) # Handle reasoning (thinking)
reasoning = delta.get("reasoning_content", "") reasoning = delta.get("reasoning_content", "")
if reasoning: if reasoning:
@ -347,7 +336,6 @@ class ChatService:
result_step_id = f"step-{step_index}" result_step_id = f"step-{step_index}"
step_index += 1 step_index += 1
# 解析 content 中的 success 状态
content = tr.get("content", "") content = tr.get("content", "")
success = True success = True
try: try:
@ -388,9 +376,8 @@ class ChatService:
# No tool calls - final iteration, save message # No tool calls - final iteration, save message
msg_id = str(uuid.uuid4()) msg_id = str(uuid.uuid4())
# 使用 API 返回的真实 completion_tokens如果 API 没返回则降级使用估算值 actual_token_count = total_usage.get("completion_tokens", 0)
actual_token_count = total_usage.get("completion_tokens", 0) or len(full_content) // 4 logger.info(f"total_usage: {total_usage}")
logger.info(f"[TOKEN] total_usage: {total_usage}, actual_token_count: {actual_token_count}")
self._save_message( self._save_message(
conversation.id, conversation.id,
@ -440,8 +427,7 @@ class ChatService:
usage: dict = None usage: dict = None
): ):
"""Save the assistant message to database.""" """Save the assistant message to database."""
from luxx.database import SessionLocal
from luxx.models import Message
content_json = { content_json = {
"text": full_content, "text": full_content,

View File

@ -143,30 +143,23 @@ class LLMClient:
tools: Optional[List[Dict]] = None, tools: Optional[List[Dict]] = None,
**kwargs **kwargs
) -> AsyncGenerator[str, None]: ) -> AsyncGenerator[str, None]:
"""Stream call LLM API - yields raw SSE event lines """Stream call LLM API - yields raw SSE event lines"""
Yields:
str: Raw SSE event lines for direct forwarding
"""
body = self._build_body(model, messages, tools, stream=True, **kwargs) body = self._build_body(model, messages, tools, stream=True, **kwargs)
logger.info(f"Starting stream_call for model: {model}, messages count: {len(messages)}")
try: try:
async with httpx.AsyncClient(timeout=120.0) as client: async with httpx.AsyncClient(timeout=120.0) as client:
logger.info(f"Sending request to {self.api_url}")
async with client.stream( async with client.stream(
"POST", "POST",
self.api_url, self.api_url,
headers=self._build_headers(), headers=self._build_headers(),
json=body json=body
) as response: ) as response:
logger.info(f"Response status: {response.status_code}")
response.raise_for_status() response.raise_for_status()
async for line in response.aiter_lines(): async for line in response.aiter_lines():
if line.strip(): if line.strip():
yield line + "\n" yield line + "\n"
logger.info(f"response finish with : {response.status_code}")
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
status_code = e.response.status_code if e.response else "?" status_code = e.response.status_code if e.response else "?"
logger.error(f"HTTP error: {status_code}") logger.error(f"HTTP error: {status_code}")

View File

@ -5,5 +5,6 @@ from luxx.tools.builtin import code
from luxx.tools.builtin import data from luxx.tools.builtin import data
from luxx.tools.builtin import file from luxx.tools.builtin import file
from luxx.tools.builtin import shell from luxx.tools.builtin import shell
from luxx.tools.builtin import task
__all__ = ["crawler", "code", "data", "file", "shell"] __all__ = ["crawler", "code", "data", "file", "shell", "task"]

429
luxx/tools/builtin/task.py Normal file
View File

@ -0,0 +1,429 @@
"""Task management tools for LLM agent"""
from typing import Dict, Any, Optional
from luxx.agent.task import task_service, TaskStatus, StepStatus
from luxx.tools.factory import tool
from luxx.tools.core import ToolContext
# Current active task ID (session level)
_current_task_id: Optional[str] = None
def get_current_task_id() -> Optional[str]:
"""Get current active task ID"""
return _current_task_id
def set_current_task_id(task_id: Optional[str]):
"""Set current active task ID"""
global _current_task_id
_current_task_id = task_id
@tool(
name="set_task",
description=(
"Set up or create a task. "
"Use this tool when user requests a goal to accomplish. "
"Parameters: "
"- name: Task name (short description) "
"- goal: Task goal (detailed description of what to accomplish) "
"- description: Optional detailed description "
"- steps: Optional list of task steps, each with name and description"
),
parameters={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Task name (short description)"
},
"goal": {
"type": "string",
"description": "Task goal (detailed description)"
},
"description": {
"type": "string",
"description": "Optional detailed description"
},
"steps": {
"type": "array",
"description": "Optional list of task steps",
"items": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "Step name"},
"description": {"type": "string", "description": "Step description"},
"depends_on": {
"type": "array",
"description": "List of step IDs this step depends on",
"items": {"type": "string"}
}
},
"required": ["name"]
}
}
},
"required": ["name", "goal"]
},
category="task",
required_params=["name", "goal"]
)
def set_task(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
"""Set up or create a task"""
name = arguments.get("name", "")
goal = arguments.get("goal", "")
description = arguments.get("description", "")
steps_data = arguments.get("steps", [])
# Convert steps data
steps = None
if steps_data:
steps = []
for step_data in steps_data:
step = {
"name": step_data.get("name", ""),
"description": step_data.get("description", "")
}
if "depends_on" in step_data:
step["depends_on"] = step_data["depends_on"]
steps.append(step)
# Create task
task = task_service.create_task(
name=name,
goal=goal,
description=description,
steps=steps
)
if not task:
return {"error": "Failed to create task."}
# Add extra steps if needed
if steps_data and len(steps_data) > len(task.steps):
additional_steps = steps_data[len(task.steps):]
task_service.add_steps(task.id, additional_steps)
task = task_service.get_task(task.id)
# Update task status to READY
task_service.update_task_status(task.id, TaskStatus.READY)
# Set current active task
set_current_task_id(task.id)
# Build task info
task_info = {
"task_id": task.id,
"name": task.name,
"goal": task.goal,
"status": "ready",
"steps": [
{
"id": s.id,
"name": s.name,
"description": s.description,
"status": s.status.value
}
for s in task.steps
]
}
return {
"message": (f"Task '{name}' has been set up successfully."),
"task": task_info
}
@tool(
name="add_task_steps",
description=(
"Add steps to an existing task. "
"Use when you need to add more steps to an already created task. "
"Parameters: "
"- steps: List of steps, each with name and description"
),
parameters={
"type": "object",
"properties": {
"steps": {
"type": "array",
"description": "List of steps to add",
"items": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "Step name"},
"description": {"type": "string", "description": "Step description"},
"depends_on": {
"type": "array",
"description": "List of step IDs this step depends on",
"items": {"type": "string"}
}
},
"required": ["name"]
}
}
},
"required": ["steps"]
},
category="task",
required_params=["steps"]
)
def add_task_steps(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
"""Add steps to a task"""
task_id = get_current_task_id()
if not task_id:
return {"error": "No active task found. Use set_task first."}
task = task_service.get_task(task_id)
if not task:
return {"error": (f"Task not found: {task_id}")}
steps_data = arguments.get("steps", [])
# Convert steps data
steps = []
for step_data in steps_data:
step = {
"name": step_data.get("name", ""),
"description": step_data.get("description", "")
}
if "depends_on" in step_data:
step["depends_on"] = step_data["depends_on"]
steps.append(step)
# Add steps
added_steps = task_service.add_steps(task_id, steps)
if not added_steps:
return {"error": "Failed to add steps."}
return {
"message": (f"Added {len(added_steps)} step(s) to task '{task.name}'."),
"steps": [
{
"id": s.id,
"name": s.name,
"description": s.description,
"status": s.status.value
}
for s in added_steps
]
}
@tool(
name="update_step_status",
description=(
"Update task step status. "
"Use when completing a step or when a step fails. "
"Parameters: "
"- step_id: Step ID "
"- status: New status (pending/running/completed/failed/skipped)"
),
parameters={
"type": "object",
"properties": {
"step_id": {
"type": "string",
"description": "Step ID"
},
"status": {
"type": "string",
"description": "New status: pending, running, completed, failed, skipped"
},
"result": {
"type": "object",
"description": "Optional step execution result"
}
},
"required": ["step_id", "status"]
},
category="task",
required_params=["step_id", "status"]
)
def update_step_status(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
"""Update step status"""
task_id = get_current_task_id()
if not task_id:
return {"error": "No active task."}
task = task_service.get_task(task_id)
if not task:
return {"error": "Task not found."}
step_id = arguments.get("step_id")
status_str = arguments.get("status", "").lower()
result = arguments.get("result")
# Find step
step = None
for s in task.steps:
if s.id == step_id:
step = s
break
if not step:
return {"error": (f"Step '{step_id}' not found.")}
# Update status
from datetime import datetime
try:
step.status = StepStatus(status_str)
step.updated_at = datetime.now()
if result:
step.result = result
except ValueError:
valid_statuses = [s.value for s in StepStatus]
return {"error": (f"Invalid status '{status_str}'. Valid values: {valid_statuses}")}
task.updated_at = datetime.now()
return {
"message": (f"Step '{step.name}' status updated to '{status_str}'."),
"step": {
"id": step.id,
"name": step.name,
"status": step.status.value,
"result": step.result
}
}
@tool(
name="complete_task",
description=(
"Complete a task. "
"Use this tool to mark a task as completed when all steps are done. "
"Parameters: "
"- result: Optional final result summary "
"- success: Whether task completed successfully (default true)"
),
parameters={
"type": "object",
"properties": {
"result": {
"type": "string",
"description": "Optional final result summary"
},
"success": {
"type": "boolean",
"description": "Whether task completed successfully (default true)"
}
},
"required": []
},
category="task",
required_params=[]
)
def complete_task(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
"""Complete a task"""
task_id = get_current_task_id()
if not task_id:
return {"error": "No active task."}
task = task_service.get_task(task_id)
if not task:
return {"error": "Task not found."}
success = arguments.get("success", True)
result_summary = arguments.get("result", "")
# Check if all steps are completed
incomplete_steps = [s for s in task.steps if s.status not in [StepStatus.COMPLETED, StepStatus.SKIPPED]]
if incomplete_steps and success:
return {
"warning": (f"Task has {len(incomplete_steps)} incomplete step(s)."),
"incomplete_steps": [{"id": s.id, "name": s.name, "status": s.status.value} for s in incomplete_steps]
}
# Update task status
from datetime import datetime
final_status = TaskStatus.TERMINATED if success else TaskStatus.BLOCK
task_service.update_task_status(
task_id,
final_status,
result={"summary": result_summary, "success": success}
)
# Build completion summary
completed_steps = [s for s in task.steps if s.status == StepStatus.COMPLETED]
summary = {
"task_id": task.id,
"name": task.name,
"status": "completed" if success else "failed",
"completed_steps": len(completed_steps),
"total_steps": len(task.steps),
"result": result_summary
}
# Clear current active task
set_current_task_id(None)
return {
"message": (f"Task '{task.name}' has been {'completed successfully' if success else 'marked as failed'}."),
"summary": summary
}
@tool(
name="get_task_status",
description=(
"Get current task status. "
"View details of the current task including all step statuses."
),
parameters={
"type": "object",
"properties": {},
"required": []
},
category="task",
required_params=[]
)
def get_task_status(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
"""Get task status"""
task_id = get_current_task_id()
if not task_id:
return {"active": False, "message": "No active task."}
task = task_service.get_task(task_id)
if not task:
return {"active": False, "message": "Task not found."}
return {
"active": True,
"task_id": task.id,
"name": task.name,
"goal": task.goal,
"status": task.status.value,
"steps": [
{
"id": s.id,
"name": s.name,
"description": s.description,
"status": s.status.value,
"result": s.result
}
for s in task.steps
],
"pending_steps": len([s for s in task.steps if s.status == StepStatus.PENDING]),
"completed_steps": len([s for s in task.steps if s.status == StepStatus.COMPLETED])
}
# Export all task tools
__all__ = [
"set_task",
"add_task_steps",
"update_step_status",
"complete_task",
"get_task_status",
"get_current_task_id",
"set_current_task_id"
]