550 lines
17 KiB
Python
550 lines
17 KiB
Python
"""Task module for autonomous task execution and planning"""
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
import logging
|
|
from typing import List, Optional, Dict, Any, AsyncGenerator, Callable
|
|
import json
|
|
|
|
from luxx.utils.helpers import generate_id
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TaskStatus(Enum):
|
|
"""Task status - 5-state model"""
|
|
PENDING = "pending" # 创建/就绪
|
|
RUNNING = "running" # 运行中
|
|
BLOCK = "block" # 阻塞
|
|
TERMINATED = "terminated" # 终止
|
|
COMPLETED = "completed" # 完成
|
|
|
|
|
|
class StepStatus(Enum):
|
|
"""Step status"""
|
|
PENDING = "pending"
|
|
RUNNING = "running"
|
|
COMPLETED = "completed"
|
|
FAILED = "failed"
|
|
SKIPPED = "skipped"
|
|
|
|
|
|
@dataclass
|
|
class Step:
|
|
"""Task step"""
|
|
id: str
|
|
name: str
|
|
description: str = ""
|
|
goal: str = "" # 步骤目标(用于 LLM 执行)
|
|
depends_on: List[str] = field(default_factory=list)
|
|
status: StepStatus = StepStatus.PENDING
|
|
result: Optional[Dict[str, Any]] = None
|
|
tool_calls: List[Dict[str, Any]] = field(default_factory=list)
|
|
context: Dict[str, Any] = field(default_factory=dict)
|
|
created_at: datetime = field(default_factory=datetime.now)
|
|
updated_at: datetime = field(default_factory=datetime.now)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"id": self.id,
|
|
"name": self.name,
|
|
"description": self.description,
|
|
"goal": self.goal,
|
|
"depends_on": self.depends_on,
|
|
"status": self.status.value,
|
|
"result": self.result,
|
|
"tool_calls": self.tool_calls,
|
|
"context": self.context,
|
|
"created_at": self.created_at.isoformat() if self.created_at else None,
|
|
"updated_at": self.updated_at.isoformat() if self.updated_at else None
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> "Step":
|
|
return cls(
|
|
id=data.get("id", generate_id("step")),
|
|
name=data.get("name", ""),
|
|
description=data.get("description", ""),
|
|
goal=data.get("goal", ""),
|
|
depends_on=data.get("depends_on", []),
|
|
status=StepStatus(data.get("status", "pending")),
|
|
result=data.get("result"),
|
|
tool_calls=data.get("tool_calls", []),
|
|
context=data.get("context", {})
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class Task:
|
|
"""Task entity with built-in planning capability"""
|
|
id: str
|
|
name: str
|
|
description: str = ""
|
|
goal: str = ""
|
|
status: TaskStatus = TaskStatus.PENDING
|
|
steps: List[Step] = field(default_factory=list)
|
|
subtasks: List["Task"] = field(default_factory=list)
|
|
result: Optional[Dict[str, Any]] = None
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
created_at: datetime = field(default_factory=datetime.now)
|
|
updated_at: datetime = field(default_factory=datetime.now)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"id": self.id,
|
|
"name": self.name,
|
|
"description": self.description,
|
|
"goal": self.goal,
|
|
"status": self.status.value,
|
|
"steps": [s.to_dict() for s in self.steps],
|
|
"subtasks": [t.to_dict() for t in self.subtasks],
|
|
"result": self.result,
|
|
"metadata": self.metadata,
|
|
"created_at": self.created_at.isoformat() if self.created_at else None,
|
|
"updated_at": self.updated_at.isoformat() if self.updated_at else None
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> "Task":
|
|
steps = [Step.from_dict(s) if isinstance(s, dict) else s for s in data.get("steps", [])]
|
|
subtasks = [cls.from_dict(t) if isinstance(t, dict) else t for t in data.get("subtasks", [])]
|
|
|
|
return cls(
|
|
id=data.get("id", generate_id("task")),
|
|
name=data.get("name", ""),
|
|
description=data.get("description", ""),
|
|
goal=data.get("goal", ""),
|
|
status=TaskStatus(data.get("status", "pending")),
|
|
steps=steps,
|
|
subtasks=subtasks,
|
|
result=data.get("result"),
|
|
metadata=data.get("metadata", {})
|
|
)
|
|
|
|
def get_step(self, step_id: str) -> Optional[Step]:
|
|
for step in self.steps:
|
|
if step.id == step_id:
|
|
return step
|
|
return None
|
|
|
|
def get_completed_step_ids(self) -> List[str]:
|
|
return [s.id for s in self.steps if s.status == StepStatus.COMPLETED]
|
|
|
|
def is_complete(self) -> bool:
|
|
return all(s.status in (StepStatus.COMPLETED, StepStatus.SKIPPED) for s in self.steps)
|
|
|
|
|
|
class TaskGraph:
|
|
"""Task graph for managing step dependencies"""
|
|
|
|
def __init__(self, task: Task):
|
|
self.task = task
|
|
self._adjacency: Dict[str, List[str]] = {}
|
|
self._reverse_adjacency: Dict[str, List[str]] = {}
|
|
self._in_degree: Dict[str, int] = {}
|
|
self._build_graph()
|
|
|
|
def _build_graph(self) -> None:
|
|
for step in self.task.steps:
|
|
self._adjacency[step.id] = []
|
|
self._reverse_adjacency[step.id] = []
|
|
self._in_degree[step.id] = 0
|
|
|
|
for step in self.task.steps:
|
|
for dep_id in step.depends_on:
|
|
if dep_id in self._adjacency:
|
|
self._adjacency[dep_id].append(step.id)
|
|
self._reverse_adjacency[step.id].append(dep_id)
|
|
self._in_degree[step.id] += 1
|
|
|
|
def topological_sort(self) -> List[Step]:
|
|
in_degree = self._in_degree.copy()
|
|
queue = [step_id for step_id, degree in in_degree.items() if degree == 0]
|
|
result = []
|
|
step_map = {step.id: step for step in self.task.steps}
|
|
|
|
while queue:
|
|
queue.sort()
|
|
current = queue.pop(0)
|
|
result.append(step_map[current])
|
|
|
|
for dependent_id in self._adjacency[current]:
|
|
in_degree[dependent_id] -= 1
|
|
if in_degree[dependent_id] == 0:
|
|
queue.append(dependent_id)
|
|
|
|
return result
|
|
|
|
def get_ready_steps(self, completed_step_ids: List[str] = None) -> List[Step]:
|
|
if completed_step_ids is None:
|
|
completed_step_ids = self.task.get_completed_step_ids()
|
|
|
|
step_map = {step.id: step for step in self.task.steps}
|
|
ready = []
|
|
|
|
for step in self.task.steps:
|
|
if step.status != StepStatus.PENDING:
|
|
continue
|
|
deps_completed = all(dep_id in completed_step_ids for dep_id in step.depends_on)
|
|
if deps_completed:
|
|
ready.append(step)
|
|
|
|
return ready
|
|
|
|
def get_parallel_levels(self) -> List[List[Step]]:
|
|
"""Get steps grouped by dependency level for parallel execution"""
|
|
completed = set(self.task.get_completed_step_ids())
|
|
levels: List[List[Step]] = []
|
|
remaining = {s.id for s in self.task.steps if s.status == StepStatus.PENDING}
|
|
|
|
while remaining:
|
|
current_level = []
|
|
for step_id in list(remaining):
|
|
step = self.task.get_step(step_id)
|
|
if step and all(dep_id in completed for dep_id in step.depends_on):
|
|
current_level.append(step)
|
|
|
|
if not current_level:
|
|
break
|
|
|
|
levels.append(current_level)
|
|
for step in current_level:
|
|
remaining.remove(step.id)
|
|
completed.add(step.id)
|
|
|
|
return levels
|
|
|
|
def detect_cycles(self) -> List[List[str]]:
|
|
WHITE, GRAY, BLACK = 0, 1, 2
|
|
color = {step.id: WHITE for step in self.task.steps}
|
|
cycles = []
|
|
|
|
def dfs(node: str, path: List[str]) -> bool:
|
|
color[node] = GRAY
|
|
path.append(node)
|
|
|
|
for neighbor in self._adjacency.get(node, []):
|
|
if color[neighbor] == GRAY:
|
|
cycle_start = path.index(neighbor)
|
|
cycles.append(path[cycle_start:] + [neighbor])
|
|
return True
|
|
elif color[neighbor] == WHITE:
|
|
if dfs(neighbor, path):
|
|
return True
|
|
|
|
path.pop()
|
|
color[node] = BLACK
|
|
return False
|
|
|
|
for step in self.task.steps:
|
|
if color[step.id] == WHITE:
|
|
dfs(step.id, [])
|
|
|
|
return cycles
|
|
|
|
def validate(self) -> tuple[bool, Optional[str]]:
|
|
cycles = self.detect_cycles()
|
|
if cycles:
|
|
return False, f"Circular dependency detected: {cycles[0]}"
|
|
|
|
step_ids = {step.id for step in self.task.steps}
|
|
for step in self.task.steps:
|
|
for dep_id in step.depends_on:
|
|
if dep_id not in step_ids:
|
|
return False, f"Step '{step.name}' depends on non-existent step '{dep_id}'"
|
|
|
|
return True, None
|
|
|
|
|
|
class TaskService:
|
|
"""Task service for managing tasks"""
|
|
|
|
def __init__(self):
|
|
self._tasks: Dict[str, Task] = {}
|
|
|
|
def create_task(
|
|
self,
|
|
name: str,
|
|
goal: str,
|
|
description: str = "",
|
|
steps: List[Dict[str, Any]] = None
|
|
) -> Task:
|
|
task_id = generate_id("task")
|
|
task = Task(
|
|
id=task_id,
|
|
name=name,
|
|
description=description,
|
|
goal=goal
|
|
)
|
|
|
|
if steps:
|
|
for step_data in steps:
|
|
step = Step(
|
|
id=generate_id("step"),
|
|
name=step_data.get("name", ""),
|
|
description=step_data.get("description", ""),
|
|
goal=step_data.get("goal", "")
|
|
)
|
|
task.steps.append(step)
|
|
|
|
self._tasks[task_id] = task
|
|
logger.info(f"Created task: {task_id}")
|
|
return task
|
|
|
|
def get_task(self, task_id: str) -> Optional[Task]:
|
|
return self._tasks.get(task_id)
|
|
|
|
def list_tasks(self) -> List[Task]:
|
|
return list(self._tasks.values())
|
|
|
|
def update_task_status(
|
|
self,
|
|
task_id: str,
|
|
status: TaskStatus,
|
|
result: Any = None
|
|
) -> Optional[Task]:
|
|
task = self._tasks.get(task_id)
|
|
if not task:
|
|
return None
|
|
|
|
task.status = status
|
|
task.result = result
|
|
task.updated_at = datetime.now()
|
|
return task
|
|
|
|
def update_step_status(
|
|
self,
|
|
task_id: str,
|
|
step_id: str,
|
|
status: StepStatus,
|
|
result: Any = None
|
|
) -> Optional[Step]:
|
|
task = self._tasks.get(task_id)
|
|
if not task:
|
|
return None
|
|
|
|
step = task.get_step(step_id)
|
|
if not step:
|
|
return None
|
|
|
|
step.status = status
|
|
step.result = result
|
|
step.updated_at = datetime.now()
|
|
task.updated_at = datetime.now()
|
|
return step
|
|
|
|
def add_steps(
|
|
self,
|
|
task_id: str,
|
|
steps: List[Dict[str, Any]]
|
|
) -> Optional[List[Step]]:
|
|
task = self._tasks.get(task_id)
|
|
if not task:
|
|
return None
|
|
|
|
result = []
|
|
for step_data in steps:
|
|
step = Step(
|
|
id=generate_id("step"),
|
|
name=step_data.get("name", ""),
|
|
description=step_data.get("description", ""),
|
|
goal=step_data.get("goal", ""),
|
|
depends_on=step_data.get("depends_on", [])
|
|
)
|
|
task.steps.append(step)
|
|
result.append(step)
|
|
|
|
task.updated_at = datetime.now()
|
|
return result
|
|
|
|
def set_steps_from_plan(
|
|
self,
|
|
task_id: str,
|
|
plan_steps: List[Dict[str, Any]]
|
|
) -> Optional[List[Step]]:
|
|
"""Set steps from LLM-generated plan"""
|
|
task = self._tasks.get(task_id)
|
|
if not task:
|
|
return None
|
|
|
|
task.steps = []
|
|
for step_data in plan_steps:
|
|
step = Step(
|
|
id=generate_id("step"),
|
|
name=step_data.get("name", ""),
|
|
description=step_data.get("description", ""),
|
|
goal=step_data.get("goal", step_data.get("name", "")),
|
|
depends_on=step_data.get("depends_on", [])
|
|
)
|
|
task.steps.append(step)
|
|
|
|
task.metadata["raw_plan"] = plan_steps
|
|
task.updated_at = datetime.now()
|
|
logger.info(f"Set {len(task.steps)} steps for task: {task_id}")
|
|
return task.steps
|
|
|
|
def delete_task(self, task_id: str) -> bool:
|
|
if task_id not in self._tasks:
|
|
return False
|
|
|
|
del self._tasks[task_id]
|
|
return True
|
|
|
|
def build_graph(self, task_id: str) -> Optional[TaskGraph]:
|
|
task = self._tasks.get(task_id)
|
|
if not task:
|
|
return None
|
|
return TaskGraph(task)
|
|
|
|
def export_task(self, task_id: str) -> Optional[str]:
|
|
task = self._tasks.get(task_id)
|
|
if not task:
|
|
return None
|
|
return json.dumps(task.to_dict(), ensure_ascii=False, indent=2)
|
|
|
|
def import_task(self, task_json: str) -> Optional[Task]:
|
|
try:
|
|
data = json.loads(task_json)
|
|
task = Task.from_dict(data)
|
|
self._tasks[task.id] = task
|
|
return task
|
|
except Exception as e:
|
|
logger.error(f"Failed to import task: {e}")
|
|
return None
|
|
|
|
|
|
class Planner:
|
|
"""LLM-based task planner - generates steps from goal"""
|
|
|
|
PLANNER_SYSTEM_PROMPT = """You are a task planning assistant. Your job is to break down complex goals into clear, actionable steps.
|
|
|
|
For a given goal, you should output a JSON array of steps. Each step should have:
|
|
- name: Short, descriptive name
|
|
- description: What this step does
|
|
- depends_on: (optional) List of step indices this depends on
|
|
|
|
Rules:
|
|
1. Steps should be atomic and focused
|
|
2. Consider dependencies between steps
|
|
3. Order steps logically
|
|
4. Use simple dependencies when needed
|
|
|
|
Output ONLY valid JSON array, no other text."""
|
|
|
|
def __init__(self, llm_client):
|
|
self.llm = llm_client
|
|
|
|
async def plan(
|
|
self,
|
|
goal: str,
|
|
context: Dict[str, Any] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Generate a plan (list of steps) for the given goal"""
|
|
user_message = f"Goal: {goal}\n\n"
|
|
|
|
if context:
|
|
if context.get("available_tools"):
|
|
user_message += f"Available tools: {', '.join(context['available_tools'])}\n"
|
|
if context.get("constraints"):
|
|
user_message += f"Constraints: {context['constraints']}\n"
|
|
|
|
messages = [
|
|
{"role": "system", "content": self.PLANNER_SYSTEM_PROMPT},
|
|
{"role": "user", "content": user_message}
|
|
]
|
|
|
|
try:
|
|
response = await self.llm.call(messages=messages)
|
|
|
|
# Parse JSON response
|
|
plan_text = response.get("content", "")
|
|
# Extract JSON from response
|
|
start = plan_text.find("[")
|
|
end = plan_text.rfind("]") + 1
|
|
if start != -1 and end > start:
|
|
plan_json = plan_text[start:end]
|
|
steps = json.loads(plan_json)
|
|
|
|
# Normalize steps - ensure depends_on uses indices
|
|
normalized_steps = []
|
|
for i, step in enumerate(steps):
|
|
normalized = {
|
|
"name": step.get("name", f"Step {i+1}"),
|
|
"description": step.get("description", ""),
|
|
"goal": step.get("goal", step.get("description", "")),
|
|
"depends_on": []
|
|
}
|
|
|
|
# Convert index dependencies to step IDs (placeholder, will be fixed later)
|
|
deps = step.get("depends_on", [])
|
|
if isinstance(deps, list):
|
|
normalized["depends_on"] = deps
|
|
|
|
normalized_steps.append(normalized)
|
|
|
|
return normalized_steps
|
|
|
|
return []
|
|
|
|
except Exception as e:
|
|
logger.error(f"Planning failed: {e}")
|
|
return []
|
|
|
|
def plan_sync(
|
|
self,
|
|
goal: str,
|
|
context: Dict[str, Any] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Synchronous version of plan"""
|
|
raise NotImplementedError("Use async plan() method")
|
|
|
|
|
|
class TaskRunner:
|
|
"""Execute tasks using TaskGraph"""
|
|
|
|
def __init__(self, task_service: TaskService):
|
|
self.task_service = task_service
|
|
|
|
def run_step(
|
|
self,
|
|
task_id: str,
|
|
step_id: str
|
|
) -> Optional[Step]:
|
|
"""Mark step as running"""
|
|
return self.task_service.update_step_status(
|
|
task_id, step_id, StepStatus.RUNNING
|
|
)
|
|
|
|
def complete_step(
|
|
self,
|
|
task_id: str,
|
|
step_id: str,
|
|
result: Dict[str, Any] = None
|
|
) -> Optional[Step]:
|
|
"""Mark step as completed"""
|
|
return self.task_service.update_step_status(
|
|
task_id, step_id, StepStatus.COMPLETED, result
|
|
)
|
|
|
|
def fail_step(
|
|
self,
|
|
task_id: str,
|
|
step_id: str,
|
|
error: str = ""
|
|
) -> Optional[Step]:
|
|
"""Mark step as failed"""
|
|
return self.task_service.update_step_status(
|
|
task_id, step_id, StepStatus.FAILED, {"error": error}
|
|
)
|
|
|
|
def get_next_steps(self, task_id: str) -> List[Step]:
|
|
"""Get steps ready to execute"""
|
|
graph = self.task_service.build_graph(task_id)
|
|
if not graph:
|
|
return []
|
|
return graph.get_ready_steps()
|
|
|
|
|
|
# Global task service instance
|
|
task_service = TaskService()
|