"""Task module for autonomous task execution and planning""" from dataclasses import dataclass, field from datetime import datetime from enum import Enum import logging from typing import List, Optional, Dict, Any, AsyncGenerator, Callable import json from luxx.utils.helpers import generate_id logger = logging.getLogger(__name__) class TaskStatus(Enum): """Task status - 5-state model""" PENDING = "pending" # 创建/就绪 RUNNING = "running" # 运行中 BLOCK = "block" # 阻塞 TERMINATED = "terminated" # 终止 COMPLETED = "completed" # 完成 class StepStatus(Enum): """Step status""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" SKIPPED = "skipped" @dataclass class Step: """Task step""" id: str name: str description: str = "" goal: str = "" # 步骤目标(用于 LLM 执行) depends_on: List[str] = field(default_factory=list) status: StepStatus = StepStatus.PENDING result: Optional[Dict[str, Any]] = None tool_calls: List[Dict[str, Any]] = field(default_factory=list) context: Dict[str, Any] = field(default_factory=dict) created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "name": self.name, "description": self.description, "goal": self.goal, "depends_on": self.depends_on, "status": self.status.value, "result": self.result, "tool_calls": self.tool_calls, "context": self.context, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "Step": return cls( id=data.get("id", generate_id("step")), name=data.get("name", ""), description=data.get("description", ""), goal=data.get("goal", ""), depends_on=data.get("depends_on", []), status=StepStatus(data.get("status", "pending")), result=data.get("result"), tool_calls=data.get("tool_calls", []), context=data.get("context", {}) ) @dataclass class Task: """Task entity with built-in planning capability""" id: str name: str description: str = "" goal: str = "" status: TaskStatus = TaskStatus.PENDING steps: List[Step] = field(default_factory=list) subtasks: List["Task"] = field(default_factory=list) result: Optional[Dict[str, Any]] = None metadata: Dict[str, Any] = field(default_factory=dict) created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "name": self.name, "description": self.description, "goal": self.goal, "status": self.status.value, "steps": [s.to_dict() for s in self.steps], "subtasks": [t.to_dict() for t in self.subtasks], "result": self.result, "metadata": self.metadata, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "Task": steps = [Step.from_dict(s) if isinstance(s, dict) else s for s in data.get("steps", [])] subtasks = [cls.from_dict(t) if isinstance(t, dict) else t for t in data.get("subtasks", [])] return cls( id=data.get("id", generate_id("task")), name=data.get("name", ""), description=data.get("description", ""), goal=data.get("goal", ""), status=TaskStatus(data.get("status", "pending")), steps=steps, subtasks=subtasks, result=data.get("result"), metadata=data.get("metadata", {}) ) def get_step(self, step_id: str) -> Optional[Step]: for step in self.steps: if step.id == step_id: return step return None def get_completed_step_ids(self) -> List[str]: return [s.id for s in self.steps if s.status == StepStatus.COMPLETED] def is_complete(self) -> bool: return all(s.status in (StepStatus.COMPLETED, StepStatus.SKIPPED) for s in self.steps) class TaskGraph: """Task graph for managing step dependencies""" def __init__(self, task: Task): self.task = task self._adjacency: Dict[str, List[str]] = {} self._reverse_adjacency: Dict[str, List[str]] = {} self._in_degree: Dict[str, int] = {} self._build_graph() def _build_graph(self) -> None: for step in self.task.steps: self._adjacency[step.id] = [] self._reverse_adjacency[step.id] = [] self._in_degree[step.id] = 0 for step in self.task.steps: for dep_id in step.depends_on: if dep_id in self._adjacency: self._adjacency[dep_id].append(step.id) self._reverse_adjacency[step.id].append(dep_id) self._in_degree[step.id] += 1 def topological_sort(self) -> List[Step]: in_degree = self._in_degree.copy() queue = [step_id for step_id, degree in in_degree.items() if degree == 0] result = [] step_map = {step.id: step for step in self.task.steps} while queue: queue.sort() current = queue.pop(0) result.append(step_map[current]) for dependent_id in self._adjacency[current]: in_degree[dependent_id] -= 1 if in_degree[dependent_id] == 0: queue.append(dependent_id) return result def get_ready_steps(self, completed_step_ids: List[str] = None) -> List[Step]: if completed_step_ids is None: completed_step_ids = self.task.get_completed_step_ids() step_map = {step.id: step for step in self.task.steps} ready = [] for step in self.task.steps: if step.status != StepStatus.PENDING: continue deps_completed = all(dep_id in completed_step_ids for dep_id in step.depends_on) if deps_completed: ready.append(step) return ready def get_parallel_levels(self) -> List[List[Step]]: """Get steps grouped by dependency level for parallel execution""" completed = set(self.task.get_completed_step_ids()) levels: List[List[Step]] = [] remaining = {s.id for s in self.task.steps if s.status == StepStatus.PENDING} while remaining: current_level = [] for step_id in list(remaining): step = self.task.get_step(step_id) if step and all(dep_id in completed for dep_id in step.depends_on): current_level.append(step) if not current_level: break levels.append(current_level) for step in current_level: remaining.remove(step.id) completed.add(step.id) return levels def detect_cycles(self) -> List[List[str]]: WHITE, GRAY, BLACK = 0, 1, 2 color = {step.id: WHITE for step in self.task.steps} cycles = [] def dfs(node: str, path: List[str]) -> bool: color[node] = GRAY path.append(node) for neighbor in self._adjacency.get(node, []): if color[neighbor] == GRAY: cycle_start = path.index(neighbor) cycles.append(path[cycle_start:] + [neighbor]) return True elif color[neighbor] == WHITE: if dfs(neighbor, path): return True path.pop() color[node] = BLACK return False for step in self.task.steps: if color[step.id] == WHITE: dfs(step.id, []) return cycles def validate(self) -> tuple[bool, Optional[str]]: cycles = self.detect_cycles() if cycles: return False, f"Circular dependency detected: {cycles[0]}" step_ids = {step.id for step in self.task.steps} for step in self.task.steps: for dep_id in step.depends_on: if dep_id not in step_ids: return False, f"Step '{step.name}' depends on non-existent step '{dep_id}'" return True, None class TaskService: """Task service for managing tasks""" def __init__(self): self._tasks: Dict[str, Task] = {} def create_task( self, name: str, goal: str, description: str = "", steps: List[Dict[str, Any]] = None ) -> Task: task_id = generate_id("task") task = Task( id=task_id, name=name, description=description, goal=goal ) if steps: for step_data in steps: step = Step( id=generate_id("step"), name=step_data.get("name", ""), description=step_data.get("description", ""), goal=step_data.get("goal", "") ) task.steps.append(step) self._tasks[task_id] = task logger.info(f"Created task: {task_id}") return task def get_task(self, task_id: str) -> Optional[Task]: return self._tasks.get(task_id) def list_tasks(self) -> List[Task]: return list(self._tasks.values()) def update_task_status( self, task_id: str, status: TaskStatus, result: Any = None ) -> Optional[Task]: task = self._tasks.get(task_id) if not task: return None task.status = status task.result = result task.updated_at = datetime.now() return task def update_step_status( self, task_id: str, step_id: str, status: StepStatus, result: Any = None ) -> Optional[Step]: task = self._tasks.get(task_id) if not task: return None step = task.get_step(step_id) if not step: return None step.status = status step.result = result step.updated_at = datetime.now() task.updated_at = datetime.now() return step def add_steps( self, task_id: str, steps: List[Dict[str, Any]] ) -> Optional[List[Step]]: task = self._tasks.get(task_id) if not task: return None result = [] for step_data in steps: step = Step( id=generate_id("step"), name=step_data.get("name", ""), description=step_data.get("description", ""), goal=step_data.get("goal", ""), depends_on=step_data.get("depends_on", []) ) task.steps.append(step) result.append(step) task.updated_at = datetime.now() return result def set_steps_from_plan( self, task_id: str, plan_steps: List[Dict[str, Any]] ) -> Optional[List[Step]]: """Set steps from LLM-generated plan""" task = self._tasks.get(task_id) if not task: return None task.steps = [] for step_data in plan_steps: step = Step( id=generate_id("step"), name=step_data.get("name", ""), description=step_data.get("description", ""), goal=step_data.get("goal", step_data.get("name", "")), depends_on=step_data.get("depends_on", []) ) task.steps.append(step) task.metadata["raw_plan"] = plan_steps task.updated_at = datetime.now() logger.info(f"Set {len(task.steps)} steps for task: {task_id}") return task.steps def delete_task(self, task_id: str) -> bool: if task_id not in self._tasks: return False del self._tasks[task_id] return True def build_graph(self, task_id: str) -> Optional[TaskGraph]: task = self._tasks.get(task_id) if not task: return None return TaskGraph(task) def export_task(self, task_id: str) -> Optional[str]: task = self._tasks.get(task_id) if not task: return None return json.dumps(task.to_dict(), ensure_ascii=False, indent=2) def import_task(self, task_json: str) -> Optional[Task]: try: data = json.loads(task_json) task = Task.from_dict(data) self._tasks[task.id] = task return task except Exception as e: logger.error(f"Failed to import task: {e}") return None class Planner: """LLM-based task planner - generates steps from goal""" PLANNER_SYSTEM_PROMPT = """You are a task planning assistant. Your job is to break down complex goals into clear, actionable steps. For a given goal, you should output a JSON array of steps. Each step should have: - name: Short, descriptive name - description: What this step does - depends_on: (optional) List of step indices this depends on Rules: 1. Steps should be atomic and focused 2. Consider dependencies between steps 3. Order steps logically 4. Use simple dependencies when needed Output ONLY valid JSON array, no other text.""" def __init__(self, llm_client): self.llm = llm_client async def plan( self, goal: str, context: Dict[str, Any] = None ) -> List[Dict[str, Any]]: """Generate a plan (list of steps) for the given goal""" user_message = f"Goal: {goal}\n\n" if context: if context.get("available_tools"): user_message += f"Available tools: {', '.join(context['available_tools'])}\n" if context.get("constraints"): user_message += f"Constraints: {context['constraints']}\n" messages = [ {"role": "system", "content": self.PLANNER_SYSTEM_PROMPT}, {"role": "user", "content": user_message} ] try: response = await self.llm.call(messages=messages) # Parse JSON response plan_text = response.get("content", "") # Extract JSON from response start = plan_text.find("[") end = plan_text.rfind("]") + 1 if start != -1 and end > start: plan_json = plan_text[start:end] steps = json.loads(plan_json) # Normalize steps - ensure depends_on uses indices normalized_steps = [] for i, step in enumerate(steps): normalized = { "name": step.get("name", f"Step {i+1}"), "description": step.get("description", ""), "goal": step.get("goal", step.get("description", "")), "depends_on": [] } # Convert index dependencies to step IDs (placeholder, will be fixed later) deps = step.get("depends_on", []) if isinstance(deps, list): normalized["depends_on"] = deps normalized_steps.append(normalized) return normalized_steps return [] except Exception as e: logger.error(f"Planning failed: {e}") return [] def plan_sync( self, goal: str, context: Dict[str, Any] = None ) -> List[Dict[str, Any]]: """Synchronous version of plan""" raise NotImplementedError("Use async plan() method") class TaskRunner: """Execute tasks using TaskGraph""" def __init__(self, task_service: TaskService): self.task_service = task_service def run_step( self, task_id: str, step_id: str ) -> Optional[Step]: """Mark step as running""" return self.task_service.update_step_status( task_id, step_id, StepStatus.RUNNING ) def complete_step( self, task_id: str, step_id: str, result: Dict[str, Any] = None ) -> Optional[Step]: """Mark step as completed""" return self.task_service.update_step_status( task_id, step_id, StepStatus.COMPLETED, result ) def fail_step( self, task_id: str, step_id: str, error: str = "" ) -> Optional[Step]: """Mark step as failed""" return self.task_service.update_step_status( task_id, step_id, StepStatus.FAILED, {"error": error} ) def get_next_steps(self, task_id: str) -> List[Step]: """Get steps ready to execute""" graph = self.task_service.build_graph(task_id) if not graph: return [] return graph.get_ready_steps() # Global task service instance task_service = TaskService()