"""Agent Registry - manages agent instances in memory""" from typing import Dict, List, Optional import logging from luxx.agents.base import BaseAgent logger = logging.getLogger(__name__) class AgentRegistry: """Registry for managing agent instances""" def __init__(self): self._agents: Dict[str, BaseAgent] = {} def register(self, agent: BaseAgent) -> None: """Register an agent instance""" self._agents[agent.agent_id] = agent logger.info(f"Registered agent: {agent.name} ({agent.role})") def unregister(self, agent_id: str) -> bool: """Unregister an agent""" if agent_id in self._agents: agent = self._agents.pop(agent_id) logger.info(f"Unregistered agent: {agent.name}") return True return False def get(self, agent_id: str) -> Optional[BaseAgent]: """Get an agent by ID""" return self._agents.get(agent_id) def get_by_role(self, role: str) -> Optional[BaseAgent]: """Get an agent by role (returns first match)""" for agent in self._agents.values(): if agent.role == role: return agent return None def list_all(self) -> List[BaseAgent]: """List all registered agents""" return list(self._agents.values()) def list_by_role(self, role: str) -> List[BaseAgent]: """List all agents with a specific role""" return [a for a in self._agents.values() if a.role == role] def list_auto_response(self) -> List[BaseAgent]: """List all agents that auto-respond""" return [a for a in self._agents.values() if a.auto_response and not a.mention_trigger] def list_mention_trigger(self) -> List[BaseAgent]: """List all agents that respond on mention""" return [a for a in self._agents.values() if a.mention_trigger] def clear(self) -> None: """Clear all registered agents""" self._agents.clear() logger.info("Cleared all registered agents") # Global registry instance agent_registry = AgentRegistry()