Luxx/luxx/routes/tasks.py

352 lines
9.6 KiB
Python

"""Task API routes"""
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
from datetime import datetime
from luxx.services.task import (
task_service,
TaskService,
TaskStatus,
StepStatus
)
from luxx.utils.helpers import success_response, error_response
router = APIRouter(prefix="/tasks", tags=["tasks"])
class StepCreate(BaseModel):
    """Request payload describing one step to add to a task.

    Attributes:
        name: Step name (required).
        description: Free-text description; defaults to an empty string.
        goal: Goal text for the step; defaults to an empty string.
        depends_on: References to the steps this step depends on; defaults
            to an empty list. NOTE(review): whether these are step ids or
            step names is decided by the task service -- confirm there.
    """

    name: str
    description: str = ""
    goal: str = ""
    depends_on: List[str] = []

    # Pydantic v2 configuration. The nested ``class Config`` form is
    # deprecated in v2 (this module already relies on v2's ``model_dump``),
    # so the schema example is supplied through ``model_config`` instead.
    model_config = {
        "json_schema_extra": {
            "example": {
                "name": "Read file",
                "description": "Read the input file",
                "goal": "Use file_read tool to read data.txt",
                "depends_on": [],
            }
        }
    }
class TaskCreate(BaseModel):
    """Request payload for creating a task.

    Attributes:
        name: Task name (required).
        goal: Goal text for the task (required).
        description: Free-text description; defaults to an empty string.
        steps: Initial steps to create with the task; defaults to an
            empty list.
    """

    name: str
    goal: str
    description: str = ""
    steps: List[StepCreate] = []

    # Pydantic v2 configuration. The nested ``class Config`` form is
    # deprecated in v2 (this module already relies on v2's ``model_dump``),
    # so the schema example is supplied through ``model_config`` instead.
    model_config = {
        "json_schema_extra": {
            "example": {
                "name": "Data Processing",
                "goal": "Process and analyze the input data",
                "description": "A complex data processing pipeline",
                "steps": [],
            }
        }
    }
class TaskUpdate(BaseModel):
    """Partial-update payload for a task.

    Every field is optional and defaults to ``None``; the update route
    only applies fields that are not ``None``.
    """

    # New task name, or None to leave the name untouched.
    name: Optional[str] = None
    # New goal text, or None to leave the goal untouched.
    goal: Optional[str] = None
    # New description, or None to leave the description untouched.
    description: Optional[str] = None
    # New status as a raw string; the route converts it with TaskStatus(...)
    # and reports "Invalid status" when that conversion fails.
    status: Optional[str] = None
class StepUpdate(BaseModel):
    """Partial-update payload for a single step.

    Every field is optional and defaults to ``None``; the update route
    only applies fields that are not ``None``.
    """

    # New step name, or None to leave the name untouched.
    name: Optional[str] = None
    # New description, or None to leave the description untouched.
    description: Optional[str] = None
    # New goal text, or None to leave the goal untouched.
    goal: Optional[str] = None
    # New status as a raw string; the route converts it with StepStatus(...)
    # and reports "Invalid status" when that conversion fails.
    status: Optional[str] = None
class PlanGenerate(BaseModel):
    """Request payload for plan generation.

    Attributes:
        goal: Goal text the plan should achieve (required).
        context: Optional free-form mapping with extra information for the
            planner; defaults to ``None``. The schema example shows
            ``available_tools`` and ``constraints`` keys, but no key is
            enforced by this model.
    """

    goal: str
    context: Optional[Dict[str, Any]] = None

    # Pydantic v2 configuration. The nested ``class Config`` form is
    # deprecated in v2 (this module already relies on v2's ``model_dump``),
    # so the schema example is supplied through ``model_config`` instead.
    model_config = {
        "json_schema_extra": {
            "example": {
                "goal": "Analyze sales data and generate a report",
                "context": {
                    "available_tools": ["file_read", "python_execute"],
                    "constraints": "Must use Python for analysis",
                },
            }
        }
    }
def get_task_service() -> TaskService:
    """Provide the task service to route handlers via ``Depends``.

    Returns:
        The module-level ``task_service`` object imported from
        ``luxx.services.task``.
    """
    shared_service: TaskService = task_service
    return shared_service
@router.get("")
async def list_tasks(service: TaskService = Depends(get_task_service)):
    """Return every task the service knows about.

    Args:
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` wrapping a list with one ``to_dict()`` result
        per task returned by ``service.list_tasks()``.
    """
    serialized = []
    for task in service.list_tasks():
        serialized.append(task.to_dict())
    return success_response(serialized)
@router.post("")
async def create_task(
    task_data: TaskCreate,
    service: TaskService = Depends(get_task_service)
):
    """Create a task from the request payload.

    Args:
        task_data: Validated creation payload.
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` wrapping the created task's ``to_dict()``
        output with the message "Task created successfully".
    """
    step_payloads = [step.model_dump() for step in task_data.steps]
    created = service.create_task(
        name=task_data.name,
        goal=task_data.goal,
        description=task_data.description,
        # The service receives None rather than an empty list when the
        # request carries no steps.
        steps=step_payloads or None,
    )
    return success_response(created.to_dict(), "Task created successfully")
@router.get("/{task_id}")
async def get_task(
    task_id: str,
    service: TaskService = Depends(get_task_service)
):
    """Look up a single task by its identifier.

    Args:
        task_id: Identifier taken from the URL path.
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` wrapping the task's ``to_dict()`` output, or
        ``error_response("Task not found", code=404)`` when the service
        returns a falsy result.
    """
    found = service.get_task(task_id)
    if found:
        return success_response(found.to_dict())
    return error_response("Task not found", code=404)
@router.put("/{task_id}")
async def update_task(
    task_id: str,
    update_data: TaskUpdate,
    service: TaskService = Depends(get_task_service)
):
    """Apply a partial update to a task.

    Only fields of ``update_data`` that are not ``None`` are applied. The
    requested status is validated before any field is written, so a request
    with an invalid status leaves the task completely unchanged.

    Args:
        task_id: Identifier taken from the URL path.
        update_data: Fields to change; ``None`` means "leave as is".
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` wrapping the updated task's ``to_dict()``
        output; ``error_response("Task not found", code=404)`` when the
        task does not exist; or an ``error_response`` naming the invalid
        status when ``TaskStatus(...)`` rejects it.
    """
    task = service.get_task(task_id)
    if not task:
        return error_response("Task not found", code=404)

    # Validate first: rejecting the request after some fields were already
    # written would leave the task half-updated.
    new_status = None
    if update_data.status is not None:
        try:
            new_status = TaskStatus(update_data.status)
        except ValueError:
            return error_response(f"Invalid status: {update_data.status}")

    if update_data.name is not None:
        task.name = update_data.name
    if update_data.goal is not None:
        task.goal = update_data.goal
    if update_data.description is not None:
        task.description = update_data.description
    if new_status is not None:
        task.status = new_status

    task.updated_at = datetime.now()
    return success_response(task.to_dict())
@router.delete("/{task_id}")
async def delete_task(
    task_id: str,
    service: TaskService = Depends(get_task_service)
):
    """Delete a task by its identifier.

    Args:
        task_id: Identifier taken from the URL path.
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` with the message "Task deleted successfully",
        or ``error_response("Task not found", code=404)`` when
        ``service.delete_task`` returns a falsy result.
    """
    was_deleted = service.delete_task(task_id)
    if was_deleted:
        return success_response(message="Task deleted successfully")
    return error_response("Task not found", code=404)
# Step endpoints
@router.post("/{task_id}/steps")
async def add_steps(
    task_id: str,
    steps: List[StepCreate],
    service: TaskService = Depends(get_task_service)
):
    """Append one or more steps to an existing task.

    Args:
        task_id: Identifier taken from the URL path.
        steps: Step payloads to add.
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` wrapping the ``to_dict()`` output of each
        added step with the message "Steps added successfully", or
        ``error_response("Task not found", code=404)`` when
        ``service.add_steps`` returns ``None``.
    """
    payloads = [item.model_dump() for item in steps]
    created = service.add_steps(task_id, payloads)
    # Only None signals a missing task; an empty list is a valid result.
    if created is not None:
        serialized = [step.to_dict() for step in created]
        return success_response(serialized, "Steps added successfully")
    return error_response("Task not found", code=404)
@router.get("/{task_id}/graph")
async def get_task_graph(
    task_id: str,
    service: TaskService = Depends(get_task_service)
):
    """Describe a task's step dependency graph.

    Args:
        task_id: Identifier taken from the URL path.
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` wrapping a dict with the task id, the
        serialized steps, the step ids in topological order, the step ids
        grouped by parallel level, and ``is_valid`` set to True. An
        ``error_response`` is returned instead when the task is missing
        (code=404), the graph cannot be built, or validation fails.
    """
    task = service.get_task(task_id)
    if not task:
        return error_response("Task not found", code=404)

    graph = service.build_graph(task_id)
    if not graph:
        return error_response("Failed to build graph")

    # validate() yields an (ok, reason) pair; bail out with the reason.
    ok, reason = graph.validate()
    if not ok:
        return error_response(f"Invalid graph: {reason}")

    ordered = graph.topological_sort()
    levels = graph.get_parallel_levels()

    payload = {
        "task_id": task_id,
        "steps": [step.to_dict() for step in task.steps],
        "execution_order": [step.id for step in ordered],
        "parallel_levels": [[step.id for step in group] for group in levels],
        "is_valid": True,
    }
    return success_response(payload)
@router.get("/{task_id}/ready-steps")
async def get_ready_steps(
    task_id: str,
    service: TaskService = Depends(get_task_service)
):
    """List the steps of a task that the graph reports as ready.

    Args:
        task_id: Identifier taken from the URL path.
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` wrapping the ``to_dict()`` output of each
        step returned by ``graph.get_ready_steps()``, or an
        ``error_response`` when the task is missing (code=404) or the graph
        cannot be built.
    """
    if not service.get_task(task_id):
        return error_response("Task not found", code=404)

    graph = service.build_graph(task_id)
    if not graph:
        return error_response("Failed to build graph")

    serialized = []
    for step in graph.get_ready_steps():
        serialized.append(step.to_dict())
    return success_response(serialized)
@router.put("/{task_id}/steps/{step_id}")
async def update_step(
    task_id: str,
    step_id: str,
    update_data: StepUpdate,
    service: TaskService = Depends(get_task_service)
):
    """Apply a partial update to one step of a task.

    Only fields of ``update_data`` that are not ``None`` are applied. The
    requested status is validated before any field is written, so a request
    with an invalid status leaves the step completely unchanged.

    Args:
        task_id: Task identifier taken from the URL path.
        step_id: Step identifier taken from the URL path.
        update_data: Fields to change; ``None`` means "leave as is".
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` wrapping the updated step's ``to_dict()``
        output; an ``error_response`` with code=404 when the task or the
        step does not exist; or an ``error_response`` naming the invalid
        status when ``StepStatus(...)`` rejects it.
    """
    task = service.get_task(task_id)
    if not task:
        return error_response("Task not found", code=404)

    step = task.get_step(step_id)
    if not step:
        return error_response("Step not found", code=404)

    # Validate first: rejecting the request after some fields were already
    # written would leave the step half-updated.
    new_status = None
    if update_data.status is not None:
        try:
            new_status = StepStatus(update_data.status)
        except ValueError:
            return error_response(f"Invalid status: {update_data.status}")

    if update_data.name is not None:
        step.name = update_data.name
    if update_data.description is not None:
        step.description = update_data.description
    if update_data.goal is not None:
        step.goal = update_data.goal
    if new_status is not None:
        step.status = new_status

    # Touch both the step and its parent task with the same timestamp.
    now = datetime.now()
    step.updated_at = now
    task.updated_at = now
    return success_response(step.to_dict())
@router.post("/{task_id}/execute")
async def execute_task(
    task_id: str,
    service: TaskService = Depends(get_task_service)
):
    """Start a task and return the steps that can run first.

    The task is marked RUNNING only when the graph reports at least one
    ready step. When nothing is ready and the task is already complete it
    is marked COMPLETED; when nothing is ready and it is not complete the
    status is left untouched and an error is returned.

    Args:
        task_id: Identifier taken from the URL path.
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` with ``status`` "running" and the serialized
        ready steps, or with ``status`` "completed" when every step is
        already done. An ``error_response`` is returned when the task is
        missing (code=404), is already running, has a graph that cannot be
        built or is invalid, or has no step ready to execute.
    """
    task = service.get_task(task_id)
    if not task:
        return error_response("Task not found", code=404)
    if task.status == TaskStatus.RUNNING:
        return error_response("Task is already running")

    # Validate the dependency graph before touching the task status.
    graph = service.build_graph(task_id)
    if not graph:
        return error_response("Failed to build graph")
    is_valid, error = graph.validate()
    if not is_valid:
        return error_response(f"Invalid task graph: {error}")

    ready = graph.get_ready_steps()
    if ready:
        # Only flip to RUNNING once there is work to hand out; doing it
        # earlier would leave a task with no ready steps stuck rejecting
        # every later call as "already running".
        service.update_task_status(task_id, TaskStatus.RUNNING)
        return success_response({
            "status": "running",
            "ready_steps": [s.to_dict() for s in ready]
        })

    if task.is_complete():
        service.update_task_status(task_id, TaskStatus.COMPLETED)
        return success_response({
            "status": "completed",
            "message": "All steps completed"
        })

    return error_response("No steps ready to execute")
@router.post("/{task_id}/complete")
async def complete_task(
    task_id: str,
    service: TaskService = Depends(get_task_service)
):
    """Mark a task as completed, provided it reports itself complete.

    Args:
        task_id: Identifier taken from the URL path.
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` with ``status`` "completed" after the status
        is updated; ``error_response("Task not found", code=404)`` when the
        task does not exist; or ``error_response("Task is not yet
        complete")`` when ``task.is_complete()`` is falsy.
    """
    task = service.get_task(task_id)
    if not task:
        return error_response("Task not found", code=404)

    # Guard clause: refuse to complete a task that still has work left.
    if not task.is_complete():
        return error_response("Task is not yet complete")

    service.update_task_status(task_id, TaskStatus.COMPLETED)
    result = {
        "status": "completed",
        "message": "Task completed successfully",
    }
    return success_response(result)
# Export/Import endpoints
@router.get("/{task_id}/export")
async def export_task(
    task_id: str,
    service: TaskService = Depends(get_task_service)
):
    """Export a task in the service's JSON representation.

    Args:
        task_id: Identifier taken from the URL path.
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` wrapping ``{"json": <exported value>}``, or
        ``error_response("Task not found", code=404)`` when
        ``service.export_task`` returns a falsy result.
    """
    exported = service.export_task(task_id)
    if exported:
        return success_response({"json": exported})
    return error_response("Task not found", code=404)
@router.post("/import")
async def import_task(
    task_json: str,
    service: TaskService = Depends(get_task_service)
):
    """Create a task from a previously exported JSON string.

    Args:
        task_json: Serialized task handed to ``service.import_task``.
            NOTE(review): as a bare ``str`` parameter FastAPI reads this
            from the query string, not the request body -- confirm that is
            intended for potentially large JSON documents.
        service: Task service injected through ``get_task_service``.

    Returns:
        A ``success_response`` wrapping the imported task's ``to_dict()``
        output with the message "Task imported successfully", or
        ``error_response("Failed to import task")`` when the service
        returns a falsy result.
    """
    imported = service.import_task(task_json)
    if imported:
        return success_response(imported.to_dict(), "Task imported successfully")
    return error_response("Failed to import task")