Luxx/luxx/routes/chat_rooms.py

477 lines
15 KiB
Python

"""Chat room routes for multi-agent conversations"""
from typing import Optional, List
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from datetime import datetime
from luxx.database import get_db, SessionLocal
from luxx.models import ChatRoom, RoomAgent, Agent, Message, LLMProvider, User
from luxx.routes.auth import get_current_user
from luxx.services.chat_room import orchestrator
from luxx.utils.helpers import generate_id, success_response, error_response, paginate
router = APIRouter(prefix="/chat-rooms", tags=["Chat Rooms"])
# ============ Request Models ============
class AgentConfig(BaseModel):
agent_id: Optional[int] = None # Link to existing Agent
name: str = ""
role: str = ""
provider_id: Optional[int] = None
model: str = ""
system_prompt: str = "You are a helpful AI assistant."
color: str = "#2563eb"
# Supervision fields
agent_type: str = "producer" # producer | reviewer | executor | observer
reviews_for: Optional[str] = None # JSON: [agent_id_1, agent_id_2]
reviewed_by: Optional[str] = None # JSON: [agent_id_1, agent_id_2]
review_strictness: int = 3
capability_tags: Optional[str] = None # JSON: ["security", "performance"]
class ChatRoomCreate(BaseModel):
title: str
task: str
max_rounds: int = 5
execution_mode: str = "sequential" # sequential | parallel | review_loop
agents: List[AgentConfig] = []
class ChatRoomUpdate(BaseModel):
title: Optional[str] = None
task: Optional[str] = None
max_rounds: Optional[int] = None
status: Optional[str] = None
execution_mode: Optional[str] = None
class AgentCreate(BaseModel):
agent_id: Optional[int] = None # Link to existing Agent
name: str = ""
role: str = ""
provider_id: Optional[int] = None
model: str = ""
system_prompt: str = "You are a helpful AI assistant."
color: str = "#2563eb"
class AgentUpdate(BaseModel):
name: Optional[str] = None
role: Optional[str] = None
provider_id: Optional[int] = None
model: Optional[str] = None
system_prompt: Optional[str] = None
color: Optional[str] = None
turn_order: Optional[int] = None
# ============ Room CRUD ============
@router.get("/", response_model=dict)
def list_rooms(
page: int = 1,
page_size: int = 20,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""List chat rooms"""
query = db.query(ChatRoom).filter(ChatRoom.user_id == current_user.id)
result = paginate(query.order_by(ChatRoom.updated_at.desc()), page, page_size)
return success_response(data={
"items": [r.to_dict() for r in result["items"]],
"total": result["total"],
"page": result["page"],
"page_size": result["page_size"]
})
@router.post("/", response_model=dict)
def create_room(
data: ChatRoomCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Create a chat room with agents"""
room = ChatRoom(
id=generate_id("room"),
user_id=current_user.id,
title=data.title,
task=data.task,
max_rounds=data.max_rounds,
execution_mode=data.execution_mode
)
db.add(room)
db.flush()
for i, agent_cfg in enumerate(data.agents):
# If agent_id provided, copy config from existing Agent
if agent_cfg.agent_id:
existing = db.query(Agent).filter(
Agent.id == agent_cfg.agent_id,
Agent.user_id == current_user.id
).first()
if existing:
name = agent_cfg.name or existing.name
role = agent_cfg.role or existing.role
provider_id = agent_cfg.provider_id or existing.provider_id
model = agent_cfg.model or existing.model
system_prompt = agent_cfg.system_prompt if agent_cfg.system_prompt != "You are a helpful AI assistant." else existing.system_prompt
color = agent_cfg.color if agent_cfg.color != "#2563eb" else existing.color
agent_id = existing.id
else:
return error_response(f"Agent {agent_cfg.agent_id} not found", 404)
else:
name = agent_cfg.name or f"Agent {i+1}"
role = agent_cfg.role
provider_id = agent_cfg.provider_id
model = agent_cfg.model
system_prompt = agent_cfg.system_prompt
color = agent_cfg.color
agent_id = None
# Resolve model from provider if not specified
if provider_id and not model:
provider = db.query(LLMProvider).filter(
LLMProvider.id == provider_id,
LLMProvider.user_id == current_user.id
).first()
if provider:
model = provider.default_model
if not model:
default_provider = db.query(LLMProvider).filter(
LLMProvider.user_id == current_user.id,
LLMProvider.is_default == True
).first()
if default_provider:
provider_id = default_provider.id
model = default_provider.default_model
if not model:
model = "gpt-4"
agent = RoomAgent(
room_id=room.id,
agent_id=agent_id,
name=name,
role=role,
provider_id=provider_id,
model=model,
system_prompt=system_prompt,
color=color,
turn_order=i,
agent_type=agent_cfg.agent_type,
reviews_for=agent_cfg.reviews_for,
reviewed_by=agent_cfg.reviewed_by,
review_strictness=agent_cfg.review_strictness,
capability_tags=agent_cfg.capability_tags
)
db.add(agent)
db.commit()
db.refresh(room)
return success_response(data=room.to_dict(include_messages=False), message="Room created")
@router.get("/{room_id}", response_model=dict)
def get_room(
room_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get room details with agents"""
room = db.query(ChatRoom).filter(
ChatRoom.id == room_id,
ChatRoom.user_id == current_user.id
).first()
if not room:
return error_response("Room not found", 404)
result = room.to_dict(include_messages=False)
# Also get message count
msg_count = db.query(Message).filter(Message.room_id == room_id).count()
result["message_count"] = msg_count
return success_response(data=result)
@router.put("/{room_id}", response_model=dict)
def update_room(
room_id: str,
data: ChatRoomUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Update room"""
room = db.query(ChatRoom).filter(
ChatRoom.id == room_id,
ChatRoom.user_id == current_user.id
).first()
if not room:
return error_response("Room not found", 404)
if room.status == "running":
return error_response("Cannot update a running room", 400)
update_data = data.dict(exclude_unset=True)
for key, value in update_data.items():
setattr(room, key, value)
db.commit()
db.refresh(room)
return success_response(data=room.to_dict(), message="Room updated")
@router.delete("/{room_id}", response_model=dict)
def delete_room(
room_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Delete room"""
room = db.query(ChatRoom).filter(
ChatRoom.id == room_id,
ChatRoom.user_id == current_user.id
).first()
if not room:
return error_response("Room not found", 404)
if room.status == "running":
return error_response("Cannot delete a running room. Stop it first.", 400)
db.delete(room)
db.commit()
return success_response(message="Room deleted")
# ============ Room Actions ============
@router.post("/{room_id}/start")
async def start_room(
room_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Start the multi-agent conversation as SSE stream"""
room = db.query(ChatRoom).filter(
ChatRoom.id == room_id,
ChatRoom.user_id == current_user.id
).first()
if not room:
return error_response("Room not found", 404)
if room.status == "running":
return error_response("Room is already running", 400)
async def event_generator():
async for sse_str in orchestrator.run_room(room_id):
yield sse_str
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
@router.post("/{room_id}/stop", response_model=dict)
def stop_room(
room_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Stop a running room"""
room = db.query(ChatRoom).filter(
ChatRoom.id == room_id,
ChatRoom.user_id == current_user.id
).first()
if not room:
return error_response("Room not found", 404)
orchestrator.cancel(room_id)
room.status = "paused"
db.commit()
return success_response(message="Room stopped")
@router.post("/{room_id}/reset", response_model=dict)
def reset_room(
room_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Reset room to initial state, clearing all messages"""
room = db.query(ChatRoom).filter(
ChatRoom.id == room_id,
ChatRoom.user_id == current_user.id
).first()
if not room:
return error_response("Room not found", 404)
if room.status == "running":
return error_response("Cannot reset a running room", 400)
# Delete all messages in this room
db.query(Message).filter(Message.room_id == room_id).delete()
room.status = "idle"
room.current_round = 0
db.commit()
return success_response(message="Room reset")
# ============ Messages ============
@router.get("/{room_id}/messages", response_model=dict)
def get_room_messages(
room_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get all messages in a room"""
room = db.query(ChatRoom).filter(
ChatRoom.id == room_id,
ChatRoom.user_id == current_user.id
).first()
if not room:
return error_response("Room not found", 404)
messages = db.query(Message).filter(
Message.room_id == room_id
).order_by(Message.created_at).all()
return success_response(data={
"messages": [m.to_dict() for m in messages],
"room": room.to_dict()
})
# ============ Agent CRUD ============
@router.post("/{room_id}/agents", response_model=dict)
def add_agent(
room_id: str,
data: AgentCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Add an agent to a room"""
room = db.query(ChatRoom).filter(
ChatRoom.id == room_id,
ChatRoom.user_id == current_user.id
).first()
if not room:
return error_response("Room not found", 404)
if room.status == "running":
return error_response("Cannot modify agents while room is running", 400)
# Get max turn_order
max_order = db.query(RoomAgent).filter(
RoomAgent.room_id == room_id
).count()
# If agent_id provided, copy from existing Agent
if data.agent_id:
existing = db.query(Agent).filter(
Agent.id == data.agent_id,
Agent.user_id == current_user.id
).first()
if not existing:
return error_response(f"Agent {data.agent_id} not found", 404)
name = data.name or existing.name
role = data.role or existing.role
provider_id = data.provider_id or existing.provider_id
model = data.model or existing.model
system_prompt = data.system_prompt if data.system_prompt != "You are a helpful AI assistant." else existing.system_prompt
color = data.color if data.color != "#2563eb" else existing.color
agent_id = existing.id
else:
name = data.name or f"Agent {max_order + 1}"
role = data.role
provider_id = data.provider_id
model = data.model
system_prompt = data.system_prompt
color = data.color
agent_id = None
model = model
if provider_id and not model:
provider = db.query(LLMProvider).filter(LLMProvider.id == provider_id).first()
if provider:
model = provider.default_model
if not model:
model = "gpt-4"
agent = RoomAgent(
room_id=room_id,
agent_id=agent_id,
name=name,
role=role,
provider_id=provider_id,
model=model,
system_prompt=system_prompt,
color=color,
turn_order=max_order
)
db.add(agent)
db.commit()
db.refresh(agent)
return success_response(data=agent.to_dict(), message="Agent added")
@router.put("/{room_id}/agents/{agent_id}", response_model=dict)
def update_agent(
room_id: str,
agent_id: int,
data: AgentUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Update an agent"""
agent = db.query(RoomAgent).filter(
RoomAgent.id == agent_id,
RoomAgent.room_id == room_id
).first()
if not agent:
return error_response("Agent not found", 404)
room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first()
if room and room.status == "running":
return error_response("Cannot modify agents while room is running", 400)
update_data = data.dict(exclude_unset=True)
for key, value in update_data.items():
setattr(agent, key, value)
db.commit()
return success_response(data=agent.to_dict(), message="Agent updated")
@router.delete("/{room_id}/agents/{agent_id}", response_model=dict)
def delete_agent(
room_id: str,
agent_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Remove an agent from a room"""
agent = db.query(RoomAgent).filter(
RoomAgent.id == agent_id,
RoomAgent.room_id == room_id
).first()
if not agent:
return error_response("Agent not found", 404)
room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first()
if room and room.status == "running":
return error_response("Cannot remove agents while room is running", 400)
db.delete(agent)
db.commit()
return success_response(message="Agent removed")