@@ -27,18 +27,18 @@
@@ -55,12 +55,21 @@ import ProcessBlock from './ProcessBlock.vue'
const props = defineProps({
message: { type: Object, required: true },
deletable: { type: Boolean, default: false },
+ senderName: { type: String, default: 'Luxx' }, // 可配置的发送者名称
})
defineEmits(['delete', 'regenerate'])
const messageRef = ref(null)
+// 支持 sender_type (Room 场景) 和 role (Conversation 场景)
+const messageType = computed(() => {
+ return props.message.sender_type || props.message.role || 'assistant'
+})
+
+const isUser = computed(() => messageType.value === 'user')
+const isAssistant = computed(() => messageType.value === 'assistant' || messageType.value === 'agent')
+
const renderedContent = computed(() => {
const text = props.message.content || props.message.text || ''
if (!text) return ''
diff --git a/dashboard/src/views/RoomView.vue b/dashboard/src/views/RoomView.vue
index 5c83e78..ed5558d 100644
--- a/dashboard/src/views/RoomView.vue
+++ b/dashboard/src/views/RoomView.vue
@@ -136,9 +136,10 @@
-
@@ -149,10 +150,15 @@
@@ -304,7 +310,8 @@ import { roomsAPI, createRoomWS, agentsAPI, providersAPI } from '@/api'
import { marked } from 'marked'
import ListItem from '@/components/ListItem.vue'
import InlineForm from '@/components/InlineForm.vue'
-import RoomMessageBubble from '@/components/RoomMessageBubble.vue'
+import MessageBubble from '@/components/MessageBubble.vue'
+import ProcessBlock from '@/components/ProcessBlock.vue'
const rooms = ref([])
const currentRoom = ref(null)
@@ -321,11 +328,15 @@ const editingRoom = ref(null)
const editingAgent = ref(null)
const showAddMembers = ref(false)
+const showCreateRoom = ref(false)
+const showRoomManage = ref(false)
+const roomTab = ref('list')
const inputMessage = ref('')
const messagesContainer = ref(null)
let ws = null
const roomForm = reactive({ name: '', description: '' })
+const newRoom = reactive({ name: '', description: '' })
const agentForm = reactive({
name: '',
@@ -398,6 +409,43 @@ function closeRoomForm() {
Object.assign(roomForm, { name: '', description: '' })
}
+async function createRoom() {
+ try {
+ const res = await roomsAPI.create(newRoom)
+ rooms.value.push(res.room)
+ showCreateRoom.value = false
+ Object.assign(newRoom, { name: '', description: '' })
+ joinRoom(res.room)
+ } catch (e) {
+ console.error('Failed to create room:', e)
+ alert('创建失败')
+ }
+}
+
+function editRoom(room) {
+ editingRoom.value = room
+ Object.assign(roomForm, { name: room.name, description: room.description || '' })
+ roomTab.value = 'edit'
+}
+
+async function deleteRoom(roomId) {
+ if (!confirm('确定删除此聊天室?')) return
+ try {
+ await roomsAPI.delete(roomId)
+ rooms.value = rooms.value.filter(r => r.id !== roomId)
+ if (currentRoom.value?.id === roomId) {
+ currentRoom.value = null
+ messages.value = []
+ }
+ if (roomTab.value === 'list') {
+ // Refresh room list in modal
+ }
+ } catch (e) {
+ console.error('Failed to delete room:', e)
+ alert('删除失败')
+ }
+}
+
async function saveRoom() {
try {
if (editingRoom.value) {
@@ -445,39 +493,113 @@ async function joinRoom(room) {
streamingMessages.value = {}
typingAgents.value = new Set()
- // 加载房间成员
- try {
- const res = await roomsAPI.listAgents(room.id)
- roomAgents.value = res.agents || []
- } catch (e) {
- console.error('Failed to load room agents:', e)
- roomAgents.value = []
- }
-
- // WebSocket 连接
+ // WebSocket 连接 - 使用新的事件格式
ws = createRoomWS(room.id, {
- onConnected: () => console.log('Connected to room'),
- onHistory: (msgs) => {
+ onConnect: () => console.log('Connecting to room...'),
+
+ onConnected: (data) => {
+ console.log('Connected to room:', data)
+ },
+
+ onRoomInfo: (roomData) => {
+ currentRoom.value = roomData
+ },
+
+ onHistory: (msgs, hasMore) => {
+ // 新格式: messages 直接是数组
messages.value = msgs || []
scrollToBottom()
},
- onAgentsUpdate: (agents) => {
+
+ onAgentsUpdate: (agents, count) => {
+ // 新格式: agents 是对象数组
roomAgents.value = agents || []
},
+
onMessage: (msg) => {
- messages.value.push(msg)
+ // 新格式: msg 是 { message: {...} } 结构,需要提取 message
+ const message = msg.message || msg
+ messages.value.push(message)
scrollToBottom()
},
+
onTyping: (data) => {
+ // 新格式: {sender_id, sender_type, agent_name, is_typing}
if (data.is_typing) {
- typingAgents.value.add(data.agent_id)
+ typingAgents.value.add(data.sender_id)
} else {
- typingAgents.value.delete(data.agent_id)
+ typingAgents.value.delete(data.sender_id)
}
},
+
+ // 新的流式响应处理
+ onStreamStart: (data) => {
+ // {stream_id, message_id, agent, parent_message_id}
+ const agentId = data.agent?.id
+ const agentName = data.agent?.name || 'Agent'
+ streamingMessages.value[agentId] = {
+ streamId: data.stream_id,
+ agentName,
+ content: '',
+ steps: []
+ }
+ typingAgents.value.add(agentId)
+ },
+
+ onStreamStep: (data) => {
+ // {stream_id, step: {id, type, delta, full, done}}
+ const streamData = streamingMessages.value[data.stream_id] || streamingMessages.value[Object.keys(streamingMessages.value)[0]]
+ if (!streamData) return
+
+ const step = data.step
+ if (step.type === 'text' || step.type === 'thinking') {
+ streamData.content = step.full
+ }
+ if (step.arguments !== undefined) {
+ // Tool call step
+ if (!streamData.toolCalls) streamData.toolCalls = []
+ streamData.toolCalls.push(step)
+ }
+ },
+
+ onStreamEnd: (data) => {
+ // {stream_id, content, token_count, usage}
+ const agentId = Object.keys(streamingMessages.value).find(
+ key => streamingMessages.value[key].streamId === data.stream_id
+ )
+
+ if (agentId && streamingMessages.value[agentId]) {
+ const streamData = streamingMessages.value[agentId]
+ messages.value.push({
+ id: `msg-${Date.now()}`,
+ sender_type: 'agent',
+ sender_id: agentId,
+ sender_name: streamData.agentName,
+ content: data.content || streamData.content,
+ token_count: data.token_count,
+ created_at: new Date().toISOString()
+ })
+ delete streamingMessages.value[agentId]
+ }
+ typingAgents.value.delete(agentId)
+ scrollToBottom()
+ },
+
+ onStreamError: (data) => {
+ // {stream_id, error}
+ console.error('Stream error:', data.error)
+ const agentId = Object.keys(streamingMessages.value).find(
+ key => streamingMessages.value[key].streamId === data.stream_id
+ )
+ if (agentId) {
+ delete streamingMessages.value[agentId]
+ typingAgents.value.delete(agentId)
+ }
+ },
+
+ // 兼容旧的流式格式
onStream: (event, data, agentId, agentName) => {
if (event === 'process_step') {
- // data.step contains {id, index, type, content}
const step = data.step || data
if (!streamingMessages.value[agentId]) {
streamingMessages.value[agentId] = { agentName, content: '' }
@@ -486,7 +608,6 @@ async function joinRoom(room) {
streamingMessages.value[agentId].content = step.content
}
} else if (event === 'done') {
- // Save streaming message to messages list before removing
if (streamingMessages.value[agentId]) {
messages.value.push({
id: `msg-${Date.now()}`,
@@ -501,8 +622,16 @@ async function joinRoom(room) {
typingAgents.value.delete(agentId)
}
},
+
onSystem: (data) => {
- console.log('System:', data)
+ // {type, sender, content, message}
+ console.log('System event:', data)
+ if (data.message) {
+ // 提取 message 对象
+ const message = data.message.message || data.message
+ messages.value.push(message)
+ scrollToBottom()
+ }
}
})
}
diff --git a/luxx/api/rooms.py b/luxx/api/rooms.py
index 062a68a..fb9cf42 100644
--- a/luxx/api/rooms.py
+++ b/luxx/api/rooms.py
@@ -22,6 +22,8 @@ class UpdateChatRoomRequest(BaseModel):
class SendMessageRequest(BaseModel):
content: str
+ reply_to: Optional[str] = None # Message ID to reply to
+ mentions: Optional[List[str]] = None # Mentioned agent IDs
class AddAgentRequest(BaseModel):
@@ -56,7 +58,7 @@ async def create_chat_room(request: CreateChatRoomRequest):
@router.get("/{room_id}")
async def get_chat_room(room_id: str):
- """Get a chat room by ID"""
+ """Get a chat room by ID with agents"""
room = chat_room_service.get_room(room_id)
if not room:
raise HTTPException(status_code=404, detail="Chat room not found")
@@ -79,7 +81,7 @@ async def update_chat_room(room_id: str, request: UpdateChatRoomRequest):
@router.delete("/{room_id}")
async def delete_chat_room(room_id: str):
- """Delete a chat room"""
+ """Delete a chat room and all related data"""
success = chat_room_service.delete_room(room_id)
if not success:
raise HTTPException(status_code=404, detail="Chat room not found")
@@ -88,9 +90,23 @@ async def delete_chat_room(room_id: str):
@router.get("/{room_id}/agents")
async def get_room_agents(room_id: str):
- """Get all agents in a chat room"""
+ """Get all agents in a chat room (from stable RoomAgent table)"""
+ # Return both BaseAgent objects and info from RoomAgent table
agents = chat_room_service.get_room_agents(room_id)
- return {"agents": [a.to_dict() for a in agents]}
+ agents_info = chat_room_service.get_room_agents_info(room_id)
+
+ # Merge agent data
+ agent_data = []
+ for agent in agents:
+ agent_dict = agent.to_dict()
+ # Find matching info
+ for info in agents_info:
+ if info.get("id") == agent.agent_id:
+ agent_dict.update(info)
+ break
+ agent_data.append(agent_dict)
+
+ return {"agents": agent_data, "count": len(agent_data)}
@router.post("/{room_id}/agents")
@@ -99,7 +115,9 @@ async def add_agent_to_room(room_id: str, request: AddAgentRequest):
success = chat_room_service.add_agent_to_room(room_id, request.agent_id)
if not success:
raise HTTPException(status_code=400, detail="Failed to add agent")
- return {"success": True}
+ # Return updated agents list
+ agents = chat_room_service.get_room_agents_info(room_id)
+ return {"success": True, "agents": agents}
@router.delete("/{room_id}/agents/{agent_id}")
@@ -115,12 +133,15 @@ async def remove_agent_from_room(room_id: str, agent_id: str):
async def get_room_messages(room_id: str, limit: int = 50, before_id: str = None):
"""Get messages from a chat room"""
messages = chat_room_service.get_messages(room_id, limit=limit, before_id=before_id)
- return {"messages": messages}
+ return {"messages": messages, "count": len(messages)}
@router.post("/{room_id}/messages")
async def send_message(room_id: str, request: SendMessageRequest):
- """Send a message to a chat room. Returns a streaming response via SSE."""
+ """Send a message to a chat room. Returns a streaming response via SSE.
+
+ This endpoint is for HTTP-based messaging. WebSocket is preferred for real-time chat.
+ """
from fastapi.responses import StreamingResponse
import json
@@ -128,21 +149,27 @@ async def send_message(room_id: str, request: SendMessageRequest):
user_name = "User"
async def generate():
+ # Save user message first
+ user_msg = chat_room_service.save_message(
+ room_id=room_id,
+ sender_type="user",
+ sender_name=user_name,
+ content=request.content,
+ sender_id=user_id,
+ mentions=request.mentions,
+ parent_id=request.reply_to
+ )
+
+ # Yield saved message event
+ yield f"data: {json.dumps({'event': 'message', 'data': {'message': user_msg}}, ensure_ascii=False)}\n\n"
+
+ # Process and stream agent responses
async for event in chat_room_service.process_message(
room_id=room_id,
user_message=request.content,
- user_id=user_id,
- user_name=user_name
+ sender_id=user_id,
+ sender_name=user_name
):
- if event.get("event") in ["process_step", "done"]:
- chat_room_service.save_message(
- room_id=room_id,
- sender_type="user",
- sender_id=user_id,
- sender_name=user_name,
- content=request.content
- )
-
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
return StreamingResponse(
diff --git a/luxx/models/__init__.py b/luxx/models/__init__.py
index 838d782..8bd7bb3 100644
--- a/luxx/models/__init__.py
+++ b/luxx/models/__init__.py
@@ -1,12 +1,12 @@
"""Models package"""
from luxx.models.user import User, LLMProvider, Project
from luxx.models.chat import Conversation, Message
-from luxx.models.room import ChatRoom, Agent
+from luxx.models.room import ChatRoom, Agent, RoomAgent
from luxx.models.participant import Participant, ParticipantType
__all__ = [
"User", "LLMProvider", "Project",
"Conversation", "Message",
- "ChatRoom", "Agent",
+ "ChatRoom", "Agent", "RoomAgent",
"Participant", "ParticipantType",
]
diff --git a/luxx/models/chat.py b/luxx/models/chat.py
index e006ee4..261dedb 100644
--- a/luxx/models/chat.py
+++ b/luxx/models/chat.py
@@ -57,18 +57,35 @@ class Conversation(Base):
class Message(Base):
"""Unified Message model for Conversation and ChatRoom.
-
- role: user/assistant/system/tool
- content: JSON format with text, attachments, tool_calls, steps
+
+ 统一消息模型,支持:
+ - Conversation: 单人会话
+ - ChatRoom: 聊天室(多 Agent)
+
+ sender_type: user | agent | system
+ content: JSON 格式 {"text": "...", "steps": [...]}
"""
__tablename__ = "messages"
id: Mapped[str] = mapped_column(String(64), primary_key=True)
conversation_id: Mapped[Optional[str]] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=True)
room_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=True)
- role: Mapped[str] = mapped_column(String(16), nullable=False)
- content: Mapped[str] = mapped_column(Text, nullable=False, default="")
+
+ # 发送者信息
+ sender_id: Mapped[str] = mapped_column(String(64), nullable=False, default="") # 用户ID 或 AgentID
+ sender_type: Mapped[str] = mapped_column(String(16), nullable=False, default="user") # "user" | "agent" | "system"
sender_name: Mapped[str] = mapped_column(String(50), nullable=False, default="")
+
+ # 消息内容(兼容旧格式,同时保留 role 字段用于兼容 Conversation)
+ role: Mapped[str] = mapped_column(String(16), nullable=False, default="user") # 保留,兼容 Conversation
+ content: Mapped[str] = mapped_column(Text, nullable=False, default="")
+
+ # 流式响应元数据
+ is_streaming: Mapped[bool] = mapped_column(Boolean, default=False)
+ stream_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=True)
+ parent_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=True) # 回复的消息ID
+
+ # 元数据
mentions: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
token_count: Mapped[int] = mapped_column(Integer, default=0)
usage: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
@@ -84,18 +101,36 @@ class Message(Base):
def target_id(self) -> str:
return self.conversation_id or self.room_id or ""
- def to_dict(self):
+ def to_dict(self, include_stream_data: bool = False):
+ """转换消息为字典
+
+ 统一使用 sender 格式,content 保留 JSON 和纯文本两种格式
+ """
result = {
"id": self.id,
"conversation_id": self.conversation_id,
"room_id": self.room_id,
"target_type": self.target_type,
"target_id": self.target_id,
- "role": self.role,
+ "sender": {
+ "id": self.sender_id,
+ "type": self.sender_type,
+ "name": self.sender_name
+ },
+ # 兼容字段
+ "sender_id": self.sender_id,
+ "sender_type": self.sender_type,
"sender_name": self.sender_name,
+ "role": self.role, # 保留,兼容 Conversation
"token_count": self.token_count,
+ "is_streaming": self.is_streaming,
"created_at": self.created_at.isoformat() if self.created_at else None
}
+
+ # 流式数据
+ if include_stream_data:
+ result["stream_id"] = self.stream_id
+ result["parent_id"] = self.parent_id
# Parse usage JSON
if self.usage:
@@ -113,22 +148,20 @@ class Message(Base):
else:
result["mentions"] = []
- # Parse content JSON
+ # Parse content JSON - 提取 text 和 steps
try:
content_obj = json.loads(self.content) if self.content else {}
+ result["content"] = content_obj.get("text", content_obj.get("content", self.content))
+ result["text"] = result["content"]
+ result["process_steps"] = content_obj.get("steps", content_obj.get("process_steps", []))
+ result["attachments"] = content_obj.get("attachments", [])
+ result["tool_calls"] = content_obj.get("tool_calls", [])
except json.JSONDecodeError:
+ # 纯文本内容
result["text"] = self.content
+ result["content"] = self.content
+ result["process_steps"] = []
result["attachments"] = []
result["tool_calls"] = []
- result["process_steps"] = []
- return result
-
- result["text"] = content_obj.get("text", "")
- result["attachments"] = content_obj.get("attachments", [])
- result["tool_calls"] = content_obj.get("tool_calls", [])
- result["process_steps"] = content_obj.get("steps", [])
-
- if "content" not in content_obj:
- result["content"] = result["text"]
return result
diff --git a/luxx/models/room.py b/luxx/models/room.py
index e07b28e..6105cff 100644
--- a/luxx/models/room.py
+++ b/luxx/models/room.py
@@ -1,7 +1,7 @@
"""ChatRoom models - unified participant architecture"""
from datetime import datetime
from typing import Optional, List, TYPE_CHECKING
-from sqlalchemy import String, Integer, Boolean, Text, DateTime, ForeignKey
+from sqlalchemy import String, Integer, Boolean, Text, DateTime, ForeignKey, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from luxx.core.database import Base
@@ -27,14 +27,44 @@ class ChatRoom(Base):
updated_at: Mapped[datetime] = mapped_column(DateTime, default=local_now, onupdate=local_now)
owner: Mapped["User"] = relationship("User", backref="chat_rooms")
+ room_agents: Mapped[List["RoomAgent"]] = relationship("RoomAgent", back_populates="room", cascade="all, delete-orphan")
- def to_dict(self):
- return {
+ def to_dict(self, include_agents: bool = False):
+ result = {
"id": self.id, "name": self.name, "description": self.description,
"owner_id": self.owner_id, "is_active": self.is_active,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None
}
+ if include_agents:
+ result["agents"] = [ra.to_dict() for ra in self.room_agents]
+ return result
+
+
+class RoomAgent(Base):
+ """ChatRoom 与 Agent 的关联表(替代依赖 Message 表的不稳定方案)"""
+ __tablename__ = "room_agents"
+ __table_args__ = (
+ UniqueConstraint('room_id', 'agent_id', name='uq_room_agent'),
+ )
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+ room_id: Mapped[str] = mapped_column(String(64), ForeignKey("chat_rooms.id", ondelete="CASCADE"), nullable=False)
+ agent_id: Mapped[str] = mapped_column(String(64), ForeignKey("agents.id", ondelete="CASCADE"), nullable=False)
+ joined_at: Mapped[datetime] = mapped_column(DateTime, default=local_now)
+ is_active: Mapped[bool] = mapped_column(Boolean, default=True)
+
+ room: Mapped["ChatRoom"] = relationship("ChatRoom", back_populates="room_agents")
+ agent: Mapped["Agent"] = relationship("Agent")
+
+ def to_dict(self):
+ return {
+ "id": self.agent_id,
+ "room_agent_id": self.id,
+ "joined_at": self.joined_at.isoformat() if self.joined_at else None,
+ "is_active": self.is_active,
+ "agent": self.agent.to_dict() if self.agent else None
+ }
class Agent(Base):
diff --git a/luxx/services/participant.py b/luxx/services/participant.py
index d5a526a..e26733a 100644
--- a/luxx/services/participant.py
+++ b/luxx/services/participant.py
@@ -1,6 +1,6 @@
"""Participant Service - unified service for users and agents in chat rooms."""
import logging
-from typing import Dict, Any, Optional, AsyncGenerator
+from typing import Dict, Any, Optional, AsyncGenerator, List
from luxx.agents.base import BaseAgent
from luxx.agents.registry import agent_registry
@@ -25,11 +25,11 @@ class ParticipantService:
# ==================== Agent ====================
def register_agent(self, agent: BaseAgent) -> Participant:
+ """Register an active agent in the participant service"""
self._active_agents[agent.agent_id] = agent
agent_registry.register(agent)
return Participant.from_agent(
- agent.agent_id, agent.name, agent.role, agent.avatar,
- agent.auto_response, agent.mention_trigger, agent.priority
+ agent.agent_id, agent.name, agent.role, agent.avatar
)
def unregister_agent(self, agent_id: str) -> bool:
@@ -40,13 +40,13 @@ class ParticipantService:
return False
def get_agent_participant(self, agent_id: str) -> Optional[Participant]:
+ """Get agent participant info"""
agent = self._active_agents.get(agent_id) or chat_room_service.get_agent(agent_id)
if agent:
if agent_id not in self._active_agents:
self._active_agents[agent_id] = agent
return Participant.from_agent(
- agent.agent_id, agent.name, agent.role, agent.avatar,
- agent.auto_response, agent.mention_trigger, agent.priority
+ agent.agent_id, agent.name, agent.role, agent.avatar
)
return None
@@ -86,42 +86,76 @@ class ParticipantService:
self, room_id: str, content: str, sender_id: str,
sender_name: str, sender_type: str = "user", context: Dict = None
) -> AsyncGenerator[Dict[str, Any], None]:
+ """Process a message in a chat room
+
+ This is a wrapper around chat_room_service.process_message
+ that handles broadcasting and typing indicators.
+ """
cm = self._cm()
- msg = chat_room_service.save_message(room_id, sender_type, sender_id, sender_name, content)
- await cm.broadcast_to_room(room_id, {"event": "message", "data": msg})
+ # Save and broadcast message
+ msg = chat_room_service.save_message(
+ room_id=room_id,
+ sender_type=sender_type,
+ sender_name=sender_name,
+ content=content,
+ sender_id=str(sender_id)
+ )
+ await cm.broadcast_to_room(room_id, {"event": "message", "data": {"message": msg}})
+ # Get room agents
room_agents = chat_room_service.get_room_agents(room_id)
if sender_type == "agent":
room_agents = [a for a in room_agents if a.agent_id != sender_id]
+ # Broadcast typing indicators
for agent in room_agents:
await cm.broadcast_to_room(room_id, {
"event": "typing",
- "data": {"agent_id": agent.agent_id, "agent_name": agent.name, "is_typing": True}
+ "data": {
+ "sender_id": agent.agent_id,
+ "sender_type": "agent",
+ "agent_name": agent.name,
+ "is_typing": True
+ }
})
+ # Process and yield events
ctx = (context or {})
ctx.update({"sender_type": sender_type, "sender_id": sender_id, "username": sender_name})
async for event in chat_room_service.process_message(room_id, content, sender_id, sender_name, ctx):
yield event
+ # Clear typing indicators
for agent in room_agents:
await cm.broadcast_to_room(room_id, {
"event": "typing",
- "data": {"agent_id": agent.agent_id, "agent_name": agent.name, "is_typing": False}
+ "data": {
+ "sender_id": agent.agent_id,
+ "sender_type": "agent",
+ "agent_name": agent.name,
+ "is_typing": False
+ }
})
async def send_message(
self, room_id: str, participant_id: str,
- participant_type: str, participant_name: str, content: str
+ participant_type: str, participant_name: str, content: str,
+ mentions: List[str] = None, parent_id: str = None
):
+ """Send a message as a participant"""
cm = self._cm()
msg = chat_room_service.save_message(
- room_id, participant_type, participant_id, participant_name, content
+ room_id=room_id,
+ sender_type=participant_type,
+ sender_name=participant_name,
+ content=content,
+ sender_id=str(participant_id),
+ mentions=mentions,
+ parent_id=parent_id
)
- await cm.broadcast_to_room(room_id, {"event": "message", "data": msg})
+ await cm.broadcast_to_room(room_id, {"event": "message", "data": {"message": msg}})
return msg
diff --git a/luxx/services/room.py b/luxx/services/room.py
index e6fb445..9826153 100644
--- a/luxx/services/room.py
+++ b/luxx/services/room.py
@@ -2,10 +2,13 @@
import json
import uuid
import logging
-from typing import List, Dict, Any, Optional, AsyncGenerator
+from typing import List, Dict, Optional
+from datetime import datetime
+
+from sqlalchemy.orm import joinedload
from luxx.core.database import SessionLocal
-from luxx.models.room import ChatRoom, Agent
+from luxx.models.room import ChatRoom, Agent, RoomAgent
from luxx.models.chat import Message
from luxx.agents.base import BaseAgent
@@ -21,41 +24,42 @@ class ChatRoomService:
db.close()
def get_room_agents(self, room_id: str) -> List[BaseAgent]:
- """Get active agents in a room from Message table"""
+ """Get active agents in a room from RoomAgent association table"""
db = SessionLocal()
try:
- # Query distinct agent records from messages
- messages = db.query(Message).filter(
- Message.room_id == room_id,
- Message.role == "agent"
- ).distinct().all()
+ # Query from RoomAgent table (stable approach) with eager loading
+ room_agents = db.query(RoomAgent).options(
+ joinedload(RoomAgent.agent)
+ ).filter(
+ RoomAgent.room_id == room_id,
+ RoomAgent.is_active == True
+ ).all()
- agent_ids = []
- seen = set()
- for msg in messages:
- # Extract agent_id from content JSON
- try:
- content = json.loads(msg.content) if msg.content else {}
- agent_id = content.get("agent_id")
- if agent_id and agent_id not in seen:
- seen.add(agent_id)
- agent_ids.append(agent_id)
- except json.JSONDecodeError:
- pass
-
agents = []
- for agent_id in agent_ids:
- agent_db = db.query(Agent).filter(
- Agent.id == agent_id,
- Agent.is_active == True
- ).first()
- if agent_db:
- agents.append(BaseAgent.from_model(agent_db))
+ for ra in room_agents:
+ if ra.agent and ra.agent.is_active:
+ agents.append(BaseAgent.from_model(ra.agent))
return sorted(agents, key=lambda a: a.priority)
finally:
db.close()
+ def get_room_agents_info(self, room_id: str) -> List[Dict]:
+ """Get room agents info with join metadata (using eager loading)"""
+ db = SessionLocal()
+ try:
+ # Use joinedload to eager load agent relationship
+ room_agents = db.query(RoomAgent).options(
+ joinedload(RoomAgent.agent)
+ ).filter(
+ RoomAgent.room_id == room_id,
+ RoomAgent.is_active == True
+ ).all()
+
+ return [ra.to_dict() for ra in room_agents]
+ finally:
+ db.close()
+
def get_agent(self, agent_id: str) -> Optional[BaseAgent]:
db = SessionLocal()
try:
@@ -75,6 +79,7 @@ class ChatRoomService:
db.close()
def create_room(self, name: str, owner_id: int, description: str = None, agent_ids: List[str] = None) -> Dict:
+ """Create a new chat room with optional initial agents"""
db = SessionLocal()
try:
room = ChatRoom(
@@ -85,19 +90,31 @@ class ChatRoomService:
)
db.add(room)
- # Record agents as join messages
+ # Add agents using RoomAgent association table
for agent_id in (agent_ids or []):
- msg = Message(
- id=str(uuid.uuid4()),
- room_id=room.id,
- role="agent",
- content=json.dumps({"type": "join", "agent_id": agent_id}),
- sender_name=agent_id
- )
- db.add(msg)
+ # Check if agent exists
+ agent = db.query(Agent).filter(Agent.id == agent_id).first()
+ if agent:
+ room_agent = RoomAgent(
+ room_id=room.id,
+ agent_id=agent_id
+ )
+ db.add(room_agent)
+
+ # Record system message
+ msg = Message(
+ id=str(uuid.uuid4()),
+ room_id=room.id,
+ sender_id=agent_id,
+ sender_type="system",
+ sender_name="System",
+ role="system",
+ content=json.dumps({"type": "agent_join", "agent_id": agent_id, "agent_name": agent.name})
+ )
+ db.add(msg)
db.commit()
- return room.to_dict()
+ return room.to_dict(include_agents=True)
finally:
db.close()
@@ -116,10 +133,14 @@ class ChatRoomService:
db.close()
def delete_room(self, room_id: str) -> bool:
+ """Delete a chat room and all related data"""
db = SessionLocal()
try:
room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first()
if room:
+ # Delete related messages
+ db.query(Message).filter(Message.room_id == room_id).delete()
+ # RoomAgent will be cascade deleted due to relationship config
db.delete(room)
db.commit()
return True
@@ -160,6 +181,94 @@ class ChatRoomService:
finally:
db.close()
+ def add_agent_to_room(self, room_id: str, agent_id: str) -> bool:
+ """Add an agent to a chat room using RoomAgent association table"""
+ db = SessionLocal()
+ try:
+ room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first()
+ if not room:
+ return False
+
+ # Check if agent exists
+ agent = db.query(Agent).filter(Agent.id == agent_id).first()
+ if not agent:
+ return False
+
+ # Check if already in room
+ existing = db.query(RoomAgent).filter(
+ RoomAgent.room_id == room_id,
+ RoomAgent.agent_id == agent_id
+ ).first()
+
+ if existing:
+ # Reactivate if was removed
+ if not existing.is_active:
+ existing.is_active = True
+ existing.joined_at = datetime.now()
+ db.commit()
+ return True
+
+ # Add new association
+ room_agent = RoomAgent(
+ room_id=room_id,
+ agent_id=agent_id
+ )
+ db.add(room_agent)
+
+ # Record system message
+ msg = Message(
+ id=str(uuid.uuid4()),
+ room_id=room_id,
+ sender_id=agent_id,
+ sender_type="system",
+ sender_name="System",
+ role="system",
+ content=json.dumps({"type": "agent_join", "agent_id": agent_id, "agent_name": agent.name})
+ )
+ db.add(msg)
+
+ db.commit()
+ return True
+ except Exception as e:
+ logger.error(f"Failed to add agent to room: {e}")
+ db.rollback()
+ return False
+ finally:
+ db.close()
+
+ def remove_agent_from_room(self, room_id: str, agent_id: str) -> bool:
+ """Remove an agent from a chat room"""
+ db = SessionLocal()
+ try:
+ # Soft delete: mark as inactive
+ result = db.query(RoomAgent).filter(
+ RoomAgent.room_id == room_id,
+ RoomAgent.agent_id == agent_id
+ ).update({"is_active": False})
+
+ if result > 0:
+ # Record system message
+ agent = db.query(Agent).filter(Agent.id == agent_id).first()
+ msg = Message(
+ id=str(uuid.uuid4()),
+ room_id=room_id,
+ sender_id=agent_id,
+ sender_type="system",
+ sender_name="System",
+ role="system",
+ content=json.dumps({"type": "agent_leave", "agent_id": agent_id, "agent_name": agent.name if agent else agent_id})
+ )
+ db.add(msg)
+ db.commit()
+ return True
+ return False
+ except Exception as e:
+ logger.error(f"Failed to remove agent from room: {e}")
+ db.rollback()
+ return False
+ finally:
+ db.close()
+
def get_messages(self, room_id: str, limit: int = 50, before_id: str = None) -> List[Dict]:
db = SessionLocal()
try:
@@ -175,29 +284,59 @@ class ChatRoomService:
def save_message(
self,
room_id: str,
- role: str,
+ sender_type: str,
sender_name: str,
content: str,
+ sender_id: str = None,
mentions: List[str] = None,
- token_count: int = 0
+ token_count: int = 0,
+ is_streaming: bool = False,
+ stream_id: str = None,
+ parent_id: str = None
) -> Dict:
+ """Save a message to the room
+
+ Args:
+ room_id: Room ID
+ sender_type: "user" | "agent" | "system"
+ sender_name: Display name of sender
+ content: Message content (can be plain text or JSON string)
+ sender_id: Sender ID (user_id or agent_id)
+ mentions: List of mentioned agent IDs
+ token_count: Token usage count
+ is_streaming: Whether this is a streaming message
+ stream_id: Streaming session ID
+ parent_id: Parent message ID (for replies)
+ """
db = SessionLocal()
try:
+ # Resolve sender_id from sender_name if not provided
+ if not sender_id:
+ sender_id = sender_name
+
+ # Wrap plain text content in JSON format
+ if not content.startswith('{'):
+ content = json.dumps({"text": content})
+
msg = Message(
id=str(uuid.uuid4()),
room_id=room_id,
- role=role,
+ sender_id=str(sender_id),
+ sender_type=sender_type,
sender_name=sender_name,
+ role=sender_type, # Keep role in sync
content=content,
mentions=json.dumps(mentions) if mentions else None,
- token_count=token_count
+ token_count=token_count,
+ is_streaming=is_streaming,
+ stream_id=stream_id,
+ parent_id=parent_id
)
db.add(msg)
# Update room updated_at
room = db.query(ChatRoom).filter(ChatRoom.id == room_id).first()
if room:
- from datetime import datetime
room.updated_at = datetime.now()
db.commit()
@@ -206,8 +345,19 @@ class ChatRoomService:
db.close()
async def process_message(
- self, room_id: str, user_message: str, sender_id: str, sender_name: str = None
+ self, room_id: str, user_message: str, sender_id: str, sender_name: str = None,
+ context: dict = None, skip_save_user_message: bool = False
):
+ """Process a message and trigger agent responses
+
+ Args:
+ room_id: Room ID
+ user_message: The user's message content
+ sender_id: Sender ID (user_id or agent_id)
+ sender_name: Sender display name
+ context: Additional context
+ skip_save_user_message: If True, skip saving user message (already saved by caller)
+ """
room = self.get_room(room_id)
if not room:
yield {"event": "error", "data": {"content": "Chat room not found"}}
@@ -218,9 +368,10 @@ class ChatRoomService:
yield {"event": "error", "data": {"content": "No agents available"}}
return
- # Check if sender is agent
+ # Determine sender type
from luxx.agents.registry import agent_registry
sender_is_agent = agent_registry.get(sender_id) is not None
+ sender_type = "agent" if sender_is_agent else "user"
# Filter out sender if agent
if sender_is_agent:
@@ -243,15 +394,108 @@ class ChatRoomService:
yield {"event": "no_response", "data": {"message": "No agents triggered"}}
return
- # Get history
+ # Save user message (or use existing one if already saved)
+ if skip_save_user_message:
+ # Get the message that was already saved
+ from luxx.core.database import SessionLocal
+ from luxx.models.chat import Message
+ db = SessionLocal()
+ try:
+ recent_msg = db.query(Message).filter(
+ Message.room_id == room_id
+ ).order_by(Message.created_at.desc()).first()
+ user_msg = recent_msg.to_dict() if recent_msg else {"id": None}
+ finally:
+ db.close()
+ else:
+ user_msg = self.save_message(
+ room_id=room_id,
+ sender_type="user",
+ sender_name=sender_name or "User",
+ content=user_message,
+ sender_id=str(sender_id),
+ mentions=[a.agent_id for a in triggered] if mentions else None
+ )
+
+ # Get history for context
messages = self.get_messages(room_id, limit=20)
- # Stream responses
+ # Stream responses with new event format
for agent in triggered:
- async for event in agent.stream_response(user_message, messages):
- yield event
+ stream_id = f"stream_{uuid.uuid4().hex[:8]}"
+
+ # Emit stream_start
+ yield {
+ "event": "stream_start",
+ "data": {
+ "stream_id": stream_id,
+ "message_id": None, # Will be set when complete
+ "agent": {"id": agent.agent_id, "name": agent.name},
+ "parent_message_id": user_msg["id"]
+ }
+ }
+
+ full_content = ""
+
+ # Parse SSE string and transform to new format
+ async for sse_str in agent.stream_response(user_message, messages):
+ # SSE format: "event: xxx\ndata: {...}\n\n"
+ try:
+ event_type = "process_step" # default
+ data_str = ""
+
+ for line in sse_str.strip().split('\n'):
+ line = line.strip()
+ if line.startswith('event: '):
+ event_type = line[7:].strip()
+ elif line.startswith('data: '):
+ data_str = line[6:].strip()
+
+ if not data_str:
+ continue
+
+ import json
+ data = json.loads(data_str)
+
+ if event_type == "process_step":
+ step = data.get("step", {})
+ full_content = step.get("content", full_content)
+ yield {
+ "event": "stream_step",
+ "data": {
+ "stream_id": stream_id,
+ "step": {
+ "id": step.get("id", "step_0"),
+ "type": step.get("type", "text"),
+ "delta": step.get("content", ""),
+ "full": full_content,
+ "done": False
+ }
+ }
+ }
+ elif event_type == "done":
+ yield {
+ "event": "stream_end",
+ "data": {
+ "stream_id": stream_id,
+ "content": full_content,
+ "token_count": data.get("token_count", 0),
+ "usage": data.get("usage", {})
+ }
+ }
+ elif event_type == "error":
+ yield {
+ "event": "stream_error",
+ "data": {
+ "stream_id": stream_id,
+ "error": data.get("content", "Unknown error")
+ }
+ }
+ except Exception as e:
+ logger.error(f"Error parsing SSE string: {e}, raw: {sse_str[:100]}")
+ continue
- self.save_message(room_id, "user", sender_id, sender_name, user_message)
+ yield {"event": "message_sent", "data": {"message": user_msg}}
# Global instance
diff --git a/luxx/services/room_ws.py b/luxx/services/room_ws.py
index 0302d7c..58696dd 100644
--- a/luxx/services/room_ws.py
+++ b/luxx/services/room_ws.py
@@ -1,4 +1,5 @@
"""WebSocket handler for Chat Rooms - unified user and agent participants."""
+import json
import logging
from typing import Dict, Set
from fastapi import WebSocket, WebSocketDisconnect
@@ -9,6 +10,11 @@ from luxx.services.participant import participant_service
logger = logging.getLogger(__name__)
+def _ws_message(event: str, data: dict) -> dict:
+ """Create a standardized WebSocket message"""
+ return {"event": event, "data": data}
+
+
class ConnectionManager:
def __init__(self):
self._rooms: Dict[str, Set[WebSocket]] = {}
@@ -18,25 +24,44 @@ class ConnectionManager:
await ws.accept()
self._rooms.setdefault(room_id, set()).add(ws)
self._info[ws] = {"type": ptype, "id": pid, "name": pname}
- await ws.send_json({"event": "connected", "data": {"room_id": room_id, "type": ptype}})
+ await ws.send_json(_ws_message("connected", {
+ "room_id": room_id,
+ "participant_type": ptype,
+ "participant_id": pid,
+ "joined_at": None # Will be set by caller
+ }))
def disconnect(self, ws: WebSocket):
info = self._info.pop(ws, {})
- room = self._rooms.get(self._info.get(ws, {}).get("id"))
- if room:
- room.discard(ws)
- if not room:
- del self._rooms[room]
+ for room_id, room_ws in list(self._rooms.items()):
+ if ws in room_ws:
+ room_ws.discard(ws)
+ if not room_ws:
+ del self._rooms[room_id]
+ break
return info
async def broadcast(self, room_id: str, msg: dict, exclude: WebSocket = None):
- for ws in self._rooms.get(room_id, set()):
+ """Broadcast message to all clients in a room"""
+ ws_list = list(self._rooms.get(room_id, set()))
+ for ws in ws_list:
if ws != exclude:
try:
await ws.send_json(msg)
except:
self.disconnect(ws)
+ async def send_to(self, ws: WebSocket, msg: dict):
+ """Send message to a specific client"""
+ try:
+ await ws.send_json(msg)
+ except:
+ self.disconnect(ws)
+
+ async def broadcast_to_room(self, room_id: str, msg: dict, exclude: WebSocket = None):
+ """Alias for broadcast - for compatibility with participant_service"""
+ await self.broadcast(room_id, msg, exclude)
+
def size(self, room_id: str) -> int:
return len(self._rooms.get(room_id, set()))
@@ -45,6 +70,7 @@ cm = ConnectionManager()
async def websocket_handler(ws: WebSocket, room_id: str):
+ """Main WebSocket handler for chat rooms"""
params = dict(ws.query_params)
ptype = params.get("participant_type", "user")
pid = params.get("participant_id", "")
@@ -54,27 +80,61 @@ async def websocket_handler(ws: WebSocket, room_id: str):
room = chat_room_service.get_room(room_id)
if not room:
- await ws.send_json({"event": "error", "data": {"content": "Room not found"}})
+ await ws.send_json(_ws_message("error", {"content": "Room not found"}))
await ws.close()
return
+ # Register agent if applicable
if ptype == "agent" and pid:
agent = chat_room_service.get_agent(pid)
if agent:
participant_service.register_agent(agent)
try:
+ # Get room agents info (only once)
+ agents = chat_room_service.get_room_agents_info(room_id)
+
+ # Send room info
+ room_dict = room.to_dict()
+ room_dict["agents"] = agents
+ await ws.send_json(_ws_message("room_info", {
+ "room": room_dict
+ }))
+
# Send history
- await ws.send_json({"event": "history", "data": {"messages": chat_room_service.get_messages(room_id)}})
- await ws.send_json({"event": "agents", "data": {
- "agents": [a.to_dict() for a in chat_room_service.get_room_agents(room_id)]
- }})
+ messages = chat_room_service.get_messages(room_id)
+ await ws.send_json(_ws_message("history", {
+ "messages": messages,
+ "has_more": False
+ }))
- await cm.broadcast(room_id, {
- "event": "system",
- "data": {"content": f"{pname} joined", "type": f"{ptype}_join"}
- }, exclude=ws)
+ # Send agents list (from RoomAgent table - stable source)
+ await ws.send_json(_ws_message("agents", {
+ "agents": agents,
+ "count": len(agents)
+ }))
+ # Broadcast join event
+ join_msg = chat_room_service.save_message(
+ room_id=room_id,
+ sender_type="system",
+ sender_name="System",
+ content=json.dumps({
+ "type": "participant_join",
+ "participant_type": ptype,
+ "participant_id": pid,
+ "participant_name": pname
+ }),
+ sender_id=pid
+ )
+ await cm.broadcast(room_id, _ws_message("system", {
+ "type": "participant_join",
+ "sender": {"id": pid, "type": ptype, "name": pname},
+ "content": f"{pname} joined the room",
+ "message": join_msg
+ }), exclude=ws)
+
+ # Main message loop
while True:
data = await ws.receive_json()
action = data.get("action")
@@ -84,27 +144,94 @@ async def websocket_handler(ws: WebSocket, room_id: str):
if not content:
continue
- sid = pid if ptype == "agent" else str(data.get("user_id", pid or "anonymous"))
- sname = pname if ptype == "agent" else data.get("user_name", pname or "Anonymous")
+ reply_to = data.get("reply_to") # Optional: reply to a message
+ mentions = data.get("mentions", []) # Optional: mentioned agents
- async for event in participant_service.process_message(
- room_id, content, sid, sname, ptype
+ # Save user message first
+ user_msg = chat_room_service.save_message(
+ room_id=room_id,
+ sender_type=ptype,
+ sender_name=pname,
+ content=content,
+ sender_id=pid,
+ mentions=mentions,
+ parent_id=reply_to
+ )
+
+ # Broadcast user message
+ await cm.broadcast(room_id, _ws_message("message", {
+ "message": user_msg
+ }))
+
+ # Process and broadcast agent responses
+ sender_id = pid if ptype == "agent" else str(data.get("user_id", pid or "anonymous"))
+ sender_name = pname if ptype == "agent" else data.get("user_name", pname or "Anonymous")
+
+ async for event in chat_room_service.process_message(
+ room_id, content, sender_id, sender_name, skip_save_user_message=True
):
- if event.get("event") in ["process_step", "done", "error"]:
- await cm.broadcast(room_id, {
- "event": event["event"],
- "data": event.get("data", {}),
- "agent_id": event.get("agent_id")
- })
+ # Broadcast stream events to all clients
+ await cm.broadcast(room_id, event)
+
+ # Also send the final message to message list
+ if event.get("event") == "stream_end":
+ stream_data = event.get("data", {})
+ agent_info = event.get("data", {}).get("agent")
+
+ # Save agent response as final message
+ agent_msg = chat_room_service.save_message(
+ room_id=room_id,
+ sender_type="agent",
+ sender_name=agent_info.get("name", "Agent") if agent_info else "Agent",
+ content=stream_data.get("content", ""),
+ sender_id=agent_info.get("id") if agent_info else None,
+ token_count=stream_data.get("token_count", 0),
+ parent_id=user_msg.get("id")
+ )
+
+ # Broadcast saved message
+ await cm.broadcast(room_id, _ws_message("message", {
+ "message": agent_msg
+ }))
+
+ elif action == "join":
+ # Handle re-join with updated info
+ ptype = data.get("participant_type", ptype)
+ pid = data.get("participant_id", pid)
+ pname = data.get("participant_name", pname)
+ cm._info[ws] = {"type": ptype, "id": pid, "name": pname}
+ await ws.send_json(_ws_message("joined", {
+ "participant_type": ptype,
+ "participant_id": pid,
+ "participant_name": pname
+ }))
elif action == "ping":
- await ws.send_json({"event": "pong", "data": {}})
+ await ws.send_json(_ws_message("pong", {}))
except WebSocketDisconnect:
- await cm.broadcast(room_id, {"event": "system", "data": {"content": f"{pname} left", "type": "leave"}})
+ # Broadcast leave event
+ leave_msg = chat_room_service.save_message(
+ room_id=room_id,
+ sender_type="system",
+ sender_name="System",
+ content=json.dumps({
+ "type": "participant_leave",
+ "participant_type": ptype,
+ "participant_id": pid,
+ "participant_name": pname
+ }),
+ sender_id=pid
+ )
+ await cm.broadcast(room_id, _ws_message("system", {
+ "type": "participant_leave",
+ "sender": {"id": pid, "type": ptype, "name": pname},
+ "content": f"{pname} left the room",
+ "message": leave_msg
+ }))
except Exception as e:
logger.error(f"WebSocket error: {e}")
- await cm.broadcast(room_id, {"event": "error", "data": {"content": str(e)}})
+ await cm.broadcast(room_id, _ws_message("error", {"content": str(e)}))
finally:
cm.disconnect(ws)