Luxx/luxx/agents/registry.py

64 lines
2.0 KiB
Python

"""Agent Registry - manages agent instances in memory"""
from typing import Dict, List, Optional
import logging
from luxx.agents.base import BaseAgent
# Module-level logger named after this module's import path (__name__).
logger = logging.getLogger(__name__)
class AgentRegistry:
    """In-memory store that maps agent IDs to agent instances."""

    def __init__(self):
        # Keyed by ``agent.agent_id``; values are the agent objects themselves.
        self._agents: Dict[str, BaseAgent] = dict()

    def register(self, agent: BaseAgent) -> None:
        """Store *agent* under its ``agent_id`` and log the registration.

        An agent already stored under the same ID is replaced.
        """
        key = agent.agent_id
        self._agents[key] = agent
        logger.info(f"Registered agent: {agent.name} ({agent.role})")

    def unregister(self, agent_id: str) -> bool:
        """Remove the agent stored under *agent_id*.

        Returns ``True`` when an agent was removed and ``False`` when no
        agent is stored under that ID.
        """
        if agent_id not in self._agents:
            return False

        agent = self._agents.pop(agent_id)
        logger.info(f"Unregistered agent: {agent.name}")
        return True

    def get(self, agent_id: str) -> Optional[BaseAgent]:
        """Return the agent stored under *agent_id*, or ``None`` if absent."""
        return self._agents.get(agent_id, None)

    def get_by_role(self, role: str) -> Optional[BaseAgent]:
        """Return the first agent whose ``role`` equals *role*.

        Agents are examined in the registry's iteration order; ``None`` is
        returned when no agent has that role.
        """
        matches = (
            candidate
            for candidate in self._agents.values()
            if candidate.role == role
        )
        return next(matches, None)

    def list_all(self) -> List[BaseAgent]:
        """Return a new list containing every registered agent."""
        return [*self._agents.values()]

    def list_by_role(self, role: str) -> List[BaseAgent]:
        """Return every registered agent whose ``role`` equals *role*."""
        return list(
            filter(
                lambda candidate: candidate.role == role,
                self._agents.values(),
            )
        )

    def list_auto_response(self) -> List[BaseAgent]:
        """Return agents that auto-respond and are not mention-triggered.

        An agent with a truthy ``mention_trigger`` is left out even when its
        ``auto_response`` is truthy.
        """
        selected: List[BaseAgent] = []
        for candidate in self._agents.values():
            if not candidate.auto_response or candidate.mention_trigger:
                continue
            selected.append(candidate)
        return selected

    def list_mention_trigger(self) -> List[BaseAgent]:
        """Return every registered agent with a truthy ``mention_trigger``."""
        return list(
            filter(
                lambda candidate: candidate.mention_trigger,
                self._agents.values(),
            )
        )

    def clear(self) -> None:
        """Remove every agent from the registry and log the reset."""
        # Clear in place so the same dict object stays bound to the registry.
        self._agents.clear()
        logger.info("Cleared all registered agents")
# Module-level AgentRegistry instance, constructed once when this module is
# first imported and starting out empty.
# NOTE(review): presumably the shared registry the rest of the application
# imports — confirm against callers.
agent_registry = AgentRegistry()