Luxx/luxx/services/task.py

550 lines
17 KiB
Python

"""Task module for autonomous task execution and planning"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import heapq
import json
import logging
from typing import List, Optional, Dict, Any, AsyncGenerator, Callable

from luxx.utils.helpers import generate_id
logger = logging.getLogger(__name__)
class TaskStatus(Enum):
    """Lifecycle states of a task (five-state model)."""

    # Created / ready
    PENDING = "pending"
    # Running
    RUNNING = "running"
    # Blocked
    BLOCK = "block"
    # Terminated
    TERMINATED = "terminated"
    # Completed
    COMPLETED = "completed"
class StepStatus(Enum):
    """Execution states of a single task step."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"
@dataclass
class Step:
    """A single step of a task.

    ``depends_on`` holds the ids of other steps; ``TaskGraph`` compares those
    ids against ``Step.id`` to decide ordering and readiness.
    """

    id: str
    name: str
    description: str = ""
    goal: str = ""  # Step goal (used for LLM execution)
    depends_on: List[str] = field(default_factory=list)
    status: StepStatus = StepStatus.PENDING
    result: Optional[Dict[str, Any]] = None
    tool_calls: List[Dict[str, Any]] = field(default_factory=list)
    context: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict representation of the step.

        The status is emitted as its string value and timestamps as ISO-8601
        strings (``None`` when a timestamp is unset). Collections are returned
        by reference, not copied.
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "goal": self.goal,
            "depends_on": self.depends_on,
            "status": self.status.value,
            "result": self.result,
            "tool_calls": self.tool_calls,
            "context": self.context,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None
        }

    @staticmethod
    def _parse_timestamp(value: Any) -> datetime:
        """Convert a serialized timestamp back into a ``datetime``.

        Accepts a ``datetime`` (returned unchanged) or an ISO-8601 string as
        produced by ``to_dict``. Missing or unparseable values fall back to the
        current time so that loading older or partial payloads keeps working.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value)
            except ValueError:
                pass
        return datetime.now()

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Step":
        """Build a step from a dict such as the one produced by ``to_dict``.

        Missing keys fall back to the field defaults; a missing ``id`` gets a
        freshly generated one. ``created_at`` / ``updated_at`` are restored when
        present so that a ``to_dict`` / ``from_dict`` round trip keeps them.

        Raises:
            ValueError: if ``status`` is not a valid ``StepStatus`` value.
        """
        # Only generate a new id when the payload really has none.
        step_id = data["id"] if "id" in data else generate_id("step")
        return cls(
            id=step_id,
            name=data.get("name", ""),
            description=data.get("description", ""),
            goal=data.get("goal", ""),
            # ``or`` also covers an explicit null in the payload.
            depends_on=data.get("depends_on") or [],
            status=StepStatus(data.get("status", "pending")),
            result=data.get("result"),
            tool_calls=data.get("tool_calls") or [],
            context=data.get("context") or {},
            created_at=cls._parse_timestamp(data.get("created_at")),
            updated_at=cls._parse_timestamp(data.get("updated_at")),
        )
@dataclass
class Task:
    """Task entity with built-in planning capability.

    A task owns an ordered list of steps and an optional list of nested
    subtasks (themselves ``Task`` instances).
    """

    id: str
    name: str
    description: str = ""
    goal: str = ""
    status: TaskStatus = TaskStatus.PENDING
    steps: List[Step] = field(default_factory=list)
    subtasks: List["Task"] = field(default_factory=list)
    result: Optional[Dict[str, Any]] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict representation, recursing into steps and subtasks.

        The status is emitted as its string value and timestamps as ISO-8601
        strings (``None`` when a timestamp is unset).
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "goal": self.goal,
            "status": self.status.value,
            "steps": [s.to_dict() for s in self.steps],
            "subtasks": [t.to_dict() for t in self.subtasks],
            "result": self.result,
            "metadata": self.metadata,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None
        }

    @staticmethod
    def _parse_timestamp(value: Any) -> datetime:
        """Convert a serialized timestamp back into a ``datetime``.

        Accepts a ``datetime`` (returned unchanged) or an ISO-8601 string as
        produced by ``to_dict``. Missing or unparseable values fall back to the
        current time so that loading older or partial payloads keeps working.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value)
            except ValueError:
                pass
        return datetime.now()

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Task":
        """Build a task from a dict such as the one produced by ``to_dict``.

        Entries of ``steps`` / ``subtasks`` that are dicts are converted
        recursively; entries of any other type are kept as they are. A missing
        ``id`` gets a freshly generated one, and ``created_at`` / ``updated_at``
        are restored when present so a round trip keeps them.

        Raises:
            ValueError: if ``status`` is not a valid ``TaskStatus`` value.
        """
        # ``or []`` also covers an explicit null in the payload.
        steps = [
            Step.from_dict(s) if isinstance(s, dict) else s
            for s in (data.get("steps") or [])
        ]
        subtasks = [
            cls.from_dict(t) if isinstance(t, dict) else t
            for t in (data.get("subtasks") or [])
        ]
        # Only generate a new id when the payload really has none.
        task_id = data["id"] if "id" in data else generate_id("task")
        return cls(
            id=task_id,
            name=data.get("name", ""),
            description=data.get("description", ""),
            goal=data.get("goal", ""),
            status=TaskStatus(data.get("status", "pending")),
            steps=steps,
            subtasks=subtasks,
            result=data.get("result"),
            metadata=data.get("metadata") or {},
            created_at=cls._parse_timestamp(data.get("created_at")),
            updated_at=cls._parse_timestamp(data.get("updated_at")),
        )

    def get_step(self, step_id: str) -> Optional[Step]:
        """Return the first step whose id equals ``step_id``, or ``None``."""
        return next((step for step in self.steps if step.id == step_id), None)

    def get_completed_step_ids(self) -> List[str]:
        """Return the ids of all steps whose status is COMPLETED."""
        return [s.id for s in self.steps if s.status == StepStatus.COMPLETED]

    def is_complete(self) -> bool:
        """Return True when every step is COMPLETED or SKIPPED.

        Note: a task with no steps is reported as complete, because ``all()``
        of an empty sequence is True.
        """
        finished = (StepStatus.COMPLETED, StepStatus.SKIPPED)
        return all(s.status in finished for s in self.steps)
class TaskGraph:
    """Dependency graph over the steps of a single task.

    Edges point from a dependency to the steps that depend on it. Dependency
    ids that do not match any step of the task are left out of the graph
    (``validate`` reports them).
    """

    def __init__(self, task: "Task"):
        self.task = task
        # dependency id -> ids of the steps that depend on it
        self._adjacency: Dict[str, List[str]] = {}
        # step id -> ids of the steps it depends on
        self._reverse_adjacency: Dict[str, List[str]] = {}
        # step id -> number of (known) dependencies
        self._in_degree: Dict[str, int] = {}
        self._build_graph()

    def _build_graph(self) -> None:
        """Populate the adjacency maps and in-degrees from ``task.steps``."""
        for step in self.task.steps:
            self._adjacency[step.id] = []
            self._reverse_adjacency[step.id] = []
            self._in_degree[step.id] = 0
        for step in self.task.steps:
            for dep_id in step.depends_on:
                # Unknown dependency ids are skipped here on purpose.
                if dep_id in self._adjacency:
                    self._adjacency[dep_id].append(step.id)
                    self._reverse_adjacency[step.id].append(dep_id)
                    self._in_degree[step.id] += 1

    def topological_sort(self) -> List["Step"]:
        """Return the steps in dependency order (Kahn's algorithm).

        Among the steps that are ready at the same time, the one with the
        smallest id comes first, which makes the order deterministic. Steps
        that are part of a dependency cycle never reach in-degree zero and are
        therefore missing from the result.
        """
        in_degree = dict(self._in_degree)
        step_map = {step.id: step for step in self.task.steps}
        # Min-heap keeps "smallest ready id first" without re-sorting each turn.
        ready = [step_id for step_id, degree in in_degree.items() if degree == 0]
        heapq.heapify(ready)
        ordered = []
        while ready:
            current = heapq.heappop(ready)
            ordered.append(step_map[current])
            for dependent_id in self._adjacency[current]:
                in_degree[dependent_id] -= 1
                if in_degree[dependent_id] == 0:
                    heapq.heappush(ready, dependent_id)
        return ordered

    def get_ready_steps(self, completed_step_ids: Optional[List[str]] = None) -> List["Step"]:
        """Return the PENDING steps whose dependencies are all completed.

        Args:
            completed_step_ids: ids to treat as completed; defaults to the
                task's currently COMPLETED steps.

        A step that depends on an id outside the completed set (for example a
        failed or skipped step, or an unknown id) is not considered ready.
        """
        if completed_step_ids is None:
            completed_step_ids = self.task.get_completed_step_ids()
        completed = set(completed_step_ids)
        return [
            step
            for step in self.task.steps
            if step.status == StepStatus.PENDING
            and all(dep_id in completed for dep_id in step.depends_on)
        ]

    def get_parallel_levels(self) -> List[List["Step"]]:
        """Get steps grouped by dependency level for parallel execution.

        Level 0 contains the PENDING steps whose dependencies are already
        completed; each following level contains the steps unlocked by the
        levels before it. Within a level, steps keep their order in
        ``task.steps``. Steps that can never be unlocked (cycles, dependencies
        on non-completed or unknown steps) are left out.
        """
        completed = set(self.task.get_completed_step_ids())
        pending = [s for s in self.task.steps if s.status == StepStatus.PENDING]
        levels: List[List["Step"]] = []
        while pending:
            current_level = [
                step
                for step in pending
                if all(dep_id in completed for dep_id in step.depends_on)
            ]
            if not current_level:
                # Nothing else can be unlocked.
                break
            levels.append(current_level)
            level_ids = {step.id for step in current_level}
            # Mark the level as done only after it has been fully collected, so
            # steps of the same level never unlock each other.
            completed |= level_ids
            pending = [step for step in pending if step.id not in level_ids]
        return levels

    def detect_cycles(self) -> List[List[str]]:
        """Return the dependency cycles found by a depth-first search.

        Each cycle is a list of step ids following the dependency -> dependent
        direction, with the first id repeated at the end (``["a", "b", "a"]``).
        One cycle is reported per back edge; an empty list means the graph is
        acyclic.
        """
        WHITE, GRAY, BLACK = 0, 1, 2
        color = {step.id: WHITE for step in self.task.steps}
        cycles: List[List[str]] = []
        path: List[str] = []

        def dfs(node: str) -> None:
            color[node] = GRAY
            path.append(node)
            for neighbor in self._adjacency.get(node, []):
                if color[neighbor] == GRAY:
                    # GRAY nodes are exactly the nodes on the current path.
                    cycles.append(path[path.index(neighbor):] + [neighbor])
                elif color[neighbor] == WHITE:
                    dfs(neighbor)
            # Always unwind, even after a cycle was found; otherwise nodes stay
            # GRAY while no longer on the path, and a later search reaching
            # them would fail to locate them in its own path.
            path.pop()
            color[node] = BLACK

        for step in self.task.steps:
            if color[step.id] == WHITE:
                dfs(step.id)
        return cycles

    def validate(self) -> tuple[bool, Optional[str]]:
        """Check the graph for cycles and dangling dependencies.

        Returns:
            ``(True, None)`` when the graph is valid, otherwise ``(False,
            message)`` describing the first problem found. Cycles are checked
            before unknown dependency ids.
        """
        cycles = self.detect_cycles()
        if cycles:
            return False, f"Circular dependency detected: {cycles[0]}"
        step_ids = {step.id for step in self.task.steps}
        for step in self.task.steps:
            for dep_id in step.depends_on:
                if dep_id not in step_ids:
                    return False, f"Step '{step.name}' depends on non-existent step '{dep_id}'"
        return True, None
class TaskService:
    """Task service for managing tasks.

    Tasks are kept in an in-process dict keyed by task id.
    """

    def __init__(self):
        self._tasks: Dict[str, Task] = {}

    @staticmethod
    def _new_step(
        step_data: Dict[str, Any],
        default_goal: str = "",
        step_id: Optional[str] = None,
        depends_on: Optional[List[Any]] = None
    ) -> Step:
        """Build a ``Step`` from a step description dict.

        ``step_id`` and ``depends_on`` override the generated id and the
        dependencies found in ``step_data``. The dependency list is copied so
        the step never shares it with the caller's input.
        """
        if depends_on is None:
            depends_on = list(step_data.get("depends_on") or [])
        return Step(
            id=step_id if step_id is not None else generate_id("step"),
            name=step_data.get("name", ""),
            description=step_data.get("description", ""),
            goal=step_data.get("goal", default_goal),
            depends_on=depends_on
        )

    @staticmethod
    def _resolve_plan_dependency(dep: Any, step_ids: List[str]) -> Any:
        """Map a positional plan dependency to the id of the step it refers to.

        Integer dependencies are read as 0-based positions in the plan
        (NOTE(review): the planner prompt only says "step indices" - confirm
        the base). Anything else, including out-of-range integers, is returned
        unchanged so ``TaskGraph.validate`` can still report it.
        """
        if isinstance(dep, int) and not isinstance(dep, bool) and 0 <= dep < len(step_ids):
            return step_ids[dep]
        return dep

    def create_task(
        self,
        name: str,
        goal: str,
        description: str = "",
        steps: Optional[List[Dict[str, Any]]] = None
    ) -> Task:
        """Create a task, register it and return it.

        Args:
            name: task name.
            goal: task goal.
            description: optional description.
            steps: optional step dicts with ``name``, ``description``, ``goal``
                and ``depends_on`` keys; each gets a freshly generated id.
        """
        task_id = generate_id("task")
        task = Task(
            id=task_id,
            name=name,
            description=description,
            goal=goal
        )
        # ``depends_on`` is honoured here exactly as in ``add_steps``.
        task.steps.extend(self._new_step(step_data) for step_data in steps or [])
        self._tasks[task_id] = task
        logger.info("Created task: %s", task_id)
        return task

    def get_task(self, task_id: str) -> Optional[Task]:
        """Return the task with the given id, or ``None``."""
        return self._tasks.get(task_id)

    def list_tasks(self) -> List[Task]:
        """Return all registered tasks as a new list."""
        return list(self._tasks.values())

    def update_task_status(
        self,
        task_id: str,
        status: TaskStatus,
        result: Any = None
    ) -> Optional[Task]:
        """Set status and result of a task and refresh its ``updated_at``.

        The stored result is always replaced, also by ``None`` when ``result``
        is omitted. Returns the task, or ``None`` when the id is unknown.
        """
        task = self._tasks.get(task_id)
        if not task:
            return None
        task.status = status
        task.result = result
        task.updated_at = datetime.now()
        return task

    def update_step_status(
        self,
        task_id: str,
        step_id: str,
        status: StepStatus,
        result: Any = None
    ) -> Optional[Step]:
        """Set status and result of a step and refresh the timestamps.

        The stored result is always replaced, also by ``None`` when ``result``
        is omitted. Returns the step, or ``None`` when the task or step id is
        unknown.
        """
        task = self._tasks.get(task_id)
        if not task:
            return None
        step = task.get_step(step_id)
        if not step:
            return None
        step.status = status
        step.result = result
        step.updated_at = datetime.now()
        task.updated_at = datetime.now()
        return step

    def add_steps(
        self,
        task_id: str,
        steps: List[Dict[str, Any]]
    ) -> Optional[List[Step]]:
        """Append new steps to a task.

        Returns the newly created steps, or ``None`` when the task id is
        unknown. ``depends_on`` values are stored as given.
        """
        task = self._tasks.get(task_id)
        if not task:
            return None
        created = [self._new_step(step_data) for step_data in steps]
        task.steps.extend(created)
        task.updated_at = datetime.now()
        return created

    def set_steps_from_plan(
        self,
        task_id: str,
        plan_steps: List[Dict[str, Any]]
    ) -> Optional[List[Step]]:
        """Set steps from LLM-generated plan.

        Replaces the task's steps with one step per plan entry and stores the
        untouched plan under ``metadata["raw_plan"]``. A step without a
        ``goal`` uses its ``name`` as goal. Positional ``depends_on`` entries
        are translated to the generated step ids (see
        ``_resolve_plan_dependency``); without that they could never match a
        step id and the dependent steps would never become ready.

        Returns the task's step list, or ``None`` when the task id is unknown.
        """
        task = self._tasks.get(task_id)
        if not task:
            return None
        # Ids are generated up front so a step may refer to any other position.
        step_ids = [generate_id("step") for _ in plan_steps]
        task.steps = []
        for step_id, step_data in zip(step_ids, plan_steps):
            dependencies = [
                self._resolve_plan_dependency(dep, step_ids)
                for dep in (step_data.get("depends_on") or [])
            ]
            task.steps.append(
                self._new_step(
                    step_data,
                    default_goal=step_data.get("name", ""),
                    step_id=step_id,
                    depends_on=dependencies
                )
            )
        task.metadata["raw_plan"] = plan_steps
        task.updated_at = datetime.now()
        logger.info("Set %s steps for task: %s", len(task.steps), task_id)
        return task.steps

    def delete_task(self, task_id: str) -> bool:
        """Remove a task; return False when the id is unknown."""
        if task_id not in self._tasks:
            return False
        del self._tasks[task_id]
        return True

    def build_graph(self, task_id: str) -> Optional[TaskGraph]:
        """Return a new ``TaskGraph`` for the task, or ``None`` if unknown."""
        task = self._tasks.get(task_id)
        if not task:
            return None
        return TaskGraph(task)

    def export_task(self, task_id: str) -> Optional[str]:
        """Return the task as pretty-printed JSON, or ``None`` if unknown."""
        task = self._tasks.get(task_id)
        if not task:
            return None
        return json.dumps(task.to_dict(), ensure_ascii=False, indent=2)

    def import_task(self, task_json: str) -> Optional[Task]:
        """Parse a JSON task, register it under its id and return it.

        An existing task with the same id is replaced. Any failure while
        parsing or converting is logged and reported as ``None``.
        """
        try:
            data = json.loads(task_json)
            task = Task.from_dict(data)
        except Exception as e:
            # Deliberate catch-all: import is best-effort and must not raise.
            logger.error("Failed to import task: %s", e)
            return None
        self._tasks[task.id] = task
        return task
class Planner:
    """LLM-based task planner - generates steps from goal"""

    PLANNER_SYSTEM_PROMPT = """You are a task planning assistant. Your job is to break down complex goals into clear, actionable steps.
For a given goal, you should output a JSON array of steps. Each step should have:
- name: Short, descriptive name
- description: What this step does
- depends_on: (optional) List of step indices this depends on
Rules:
1. Steps should be atomic and focused
2. Consider dependencies between steps
3. Order steps logically
4. Use simple dependencies when needed
Output ONLY valid JSON array, no other text."""

    def __init__(self, llm_client):
        # The client must expose an awaitable ``call(messages=...)`` whose
        # result offers ``.get("content")``; that is all this class uses.
        self.llm = llm_client

    @staticmethod
    def _extract_steps(plan_text: Any) -> List[Any]:
        """Pull the JSON array out of the model reply.

        The text between the first ``[`` and the last ``]`` is parsed, so text
        around the array is tolerated. Returns ``[]`` when the reply is not a
        string or contains no bracketed span.

        Raises:
            json.JSONDecodeError: if the bracketed span is not valid JSON.
        """
        if not isinstance(plan_text, str):
            return []
        start = plan_text.find("[")
        end = plan_text.rfind("]") + 1
        if start == -1 or end <= start:
            return []
        steps = json.loads(plan_text[start:end])
        return steps if isinstance(steps, list) else []

    @staticmethod
    def _normalize_step(index: int, step: Any) -> Dict[str, Any]:
        """Turn one raw plan entry into a step dict.

        A bare string becomes the step name, and any other non-dict entry
        becomes an empty placeholder. Entries are never dropped, so positional
        ``depends_on`` indices of the other steps stay valid.
        """
        if isinstance(step, str):
            step = {"name": step}
        elif not isinstance(step, dict):
            step = {}
        deps = step.get("depends_on", [])
        return {
            "name": step.get("name", f"Step {index+1}"),
            "description": step.get("description", ""),
            "goal": step.get("goal", step.get("description", "")),
            # Dependencies are passed through as given (still positional);
            # anything that is not a list is discarded.
            "depends_on": deps if isinstance(deps, list) else []
        }

    async def plan(
        self,
        goal: str,
        context: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """Generate a plan (list of steps) for the given goal

        Args:
            goal: the goal to break down.
            context: optional hints; ``available_tools`` (iterable) and
                ``constraints`` are appended to the user message when set.

        Returns:
            Step dicts with ``name``, ``description``, ``goal`` and
            ``depends_on`` keys. An empty list is returned when the model call
            fails or the reply contains no parseable JSON array.
        """
        user_message = f"Goal: {goal}\n\n"
        if context:
            tools = context.get("available_tools")
            if tools:
                # str() so non-string tool identifiers cannot break the join.
                user_message += f"Available tools: {', '.join(map(str, tools))}\n"
            constraints = context.get("constraints")
            if constraints:
                user_message += f"Constraints: {constraints}\n"
        messages = [
            {"role": "system", "content": self.PLANNER_SYSTEM_PROMPT},
            {"role": "user", "content": user_message}
        ]
        try:
            response = await self.llm.call(messages=messages)
            raw_steps = self._extract_steps(response.get("content"))
        except Exception as e:
            # Deliberate catch-all: the client's error types are unknown here
            # and planning failures are reported as an empty plan.
            logger.error("Planning failed: %s", e)
            return []
        return [self._normalize_step(i, step) for i, step in enumerate(raw_steps)]

    def plan_sync(
        self,
        goal: str,
        context: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """Synchronous version of plan (not implemented; always raises).

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("Use async plan() method")
class TaskRunner:
    """Drive step state transitions of tasks held by a ``TaskService``."""

    def __init__(self, task_service: TaskService):
        self.task_service = task_service

    def run_step(
        self,
        task_id: str,
        step_id: str
    ) -> Optional[Step]:
        """Flag the step as RUNNING; returns whatever the service returns."""
        service = self.task_service
        return service.update_step_status(task_id, step_id, StepStatus.RUNNING)

    def complete_step(
        self,
        task_id: str,
        step_id: str,
        result: Dict[str, Any] = None
    ) -> Optional[Step]:
        """Flag the step as COMPLETED and store ``result`` on it."""
        service = self.task_service
        return service.update_step_status(task_id, step_id, StepStatus.COMPLETED, result)

    def fail_step(
        self,
        task_id: str,
        step_id: str,
        error: str = ""
    ) -> Optional[Step]:
        """Flag the step as FAILED, storing the error text as its result."""
        failure = {"error": error}
        service = self.task_service
        return service.update_step_status(task_id, step_id, StepStatus.FAILED, failure)

    def get_next_steps(self, task_id: str) -> List[Step]:
        """Return the steps that can be executed now (empty if no graph)."""
        graph = self.task_service.build_graph(task_id)
        return graph.get_ready_steps() if graph else []
# Global task service instance, created once when this module is imported.
task_service = TaskService()