""" Collector Lifecycle Manager for handling collector lifecycle operations. This module handles the lifecycle of data collectors including adding, removing, enabling, disabling, starting, and restarting collectors. """ import asyncio import time from typing import Dict, Set, Optional from ..base_collector import BaseDataCollector, CollectorStatus from ..collector_types import CollectorConfig class CollectorLifecycleManager: """Manages the lifecycle of data collectors.""" def __init__(self, logger_manager=None): """ Initialize the lifecycle manager. Args: logger_manager: Logger manager instance for logging operations """ self.logger_manager = logger_manager # Collector storage self._collectors: Dict[str, BaseDataCollector] = {} self._collector_configs: Dict[str, CollectorConfig] = {} self._enabled_collectors: Set[str] = set() # Manager state self._running = False self._stats = {'total_collectors': 0, 'restarts_performed': 0} def set_running_state(self, running: bool) -> None: """Set the running state of the manager.""" self._running = running def get_stats(self) -> Dict: """Get lifecycle statistics.""" return self._stats.copy() def add_collector(self, collector: BaseDataCollector, config: Optional[CollectorConfig] = None) -> None: """ Add a collector to be managed. Args: collector: Data collector instance config: Optional configuration (will create default if not provided) """ # Use a more unique name to avoid duplicates collector_name = f"{collector.exchange_name}_{int(time.time() * 1000000) % 1000000}" # Ensure unique name counter = 1 base_name = collector_name while collector_name in self._collectors: collector_name = f"{base_name}_{counter}" counter += 1 if config is None: config = CollectorConfig( name=collector_name, exchange=collector.exchange_name, symbols=list(collector.symbols), data_types=[dt.value for dt in collector.data_types], auto_restart=collector.auto_restart, health_check_interval=collector._state_telemetry.health_check_interval ) self._collectors[collector_name] = collector self._collector_configs[collector_name] = config if config.enabled: self._enabled_collectors.add(collector_name) self._stats['total_collectors'] = len(self._collectors) if self.logger_manager: self.logger_manager.log_info( f"Added collector: {collector_name} ({collector.exchange_name}) - " f"Symbols: {', '.join(collector.symbols)} - Enabled: {config.enabled}" ) def remove_collector(self, collector_name: str) -> bool: """ Remove a collector from management. Args: collector_name: Name of the collector to remove Returns: True if removed successfully, False if not found """ if collector_name not in self._collectors: if self.logger_manager: self.logger_manager.log_warning(f"Collector not found: {collector_name}") return False # Stop the collector first (only if event loop is running) collector = self._collectors[collector_name] if collector.status != CollectorStatus.STOPPED: try: asyncio.create_task(collector.stop(force=True)) except RuntimeError: # No event loop running, just log if self.logger_manager: self.logger_manager.log_info( f"Collector {collector_name} will be removed without stopping (no event loop)" ) # Remove from management del self._collectors[collector_name] del self._collector_configs[collector_name] self._enabled_collectors.discard(collector_name) self._stats['total_collectors'] = len(self._collectors) if self.logger_manager: self.logger_manager.log_info(f"Removed collector: {collector_name}") return True def enable_collector(self, collector_name: str) -> bool: """ Enable a collector (will be started if manager is running). Args: collector_name: Name of the collector to enable Returns: True if enabled successfully, False if not found """ if collector_name not in self._collectors: if self.logger_manager: self.logger_manager.log_warning(f"Collector not found: {collector_name}") return False self._enabled_collectors.add(collector_name) self._collector_configs[collector_name].enabled = True # Start the collector if manager is running (only if event loop is running) if self._running: try: asyncio.create_task(self._start_collector(collector_name)) except RuntimeError: # No event loop running, will be started when manager starts if self.logger_manager: self.logger_manager.log_debug( f"Collector {collector_name} enabled but will start when manager starts" ) if self.logger_manager: self.logger_manager.log_info(f"Enabled collector: {collector_name}") return True def disable_collector(self, collector_name: str) -> bool: """ Disable a collector (will be stopped if running). Args: collector_name: Name of the collector to disable Returns: True if disabled successfully, False if not found """ if collector_name not in self._collectors: if self.logger_manager: self.logger_manager.log_warning(f"Collector not found: {collector_name}") return False self._enabled_collectors.discard(collector_name) self._collector_configs[collector_name].enabled = False # Stop the collector (only if event loop is running) collector = self._collectors[collector_name] try: asyncio.create_task(collector.stop(force=True)) except RuntimeError: # No event loop running, just log if self.logger_manager: self.logger_manager.log_debug( f"Collector {collector_name} disabled but cannot stop (no event loop)" ) if self.logger_manager: self.logger_manager.log_info(f"Disabled collector: {collector_name}") return True async def _start_collector(self, collector_name: str) -> bool: """ Start a specific collector. Args: collector_name: Name of the collector to start Returns: True if started successfully, False otherwise """ if collector_name not in self._collectors: if self.logger_manager: self.logger_manager.log_warning(f"Collector not found: {collector_name}") return False collector = self._collectors[collector_name] try: success = await collector.start() if success: if self.logger_manager: self.logger_manager.log_info(f"Started collector: {collector_name}") else: if self.logger_manager: self.logger_manager.log_error(f"Failed to start collector: {collector_name}") return success except Exception as e: if self.logger_manager: self.logger_manager.log_error(f"Error starting collector {collector_name}: {e}", exc_info=True) return False async def restart_collector(self, collector_name: str) -> bool: """ Restart a specific collector. Args: collector_name: Name of the collector to restart Returns: True if restarted successfully, False otherwise """ if collector_name not in self._collectors: if self.logger_manager: self.logger_manager.log_warning(f"Collector not found: {collector_name}") return False collector = self._collectors[collector_name] if self.logger_manager: self.logger_manager.log_info(f"Restarting collector: {collector_name}") try: success = await collector.restart() if success: self._stats['restarts_performed'] += 1 if self.logger_manager: self.logger_manager.log_info(f"Successfully restarted collector: {collector_name}") else: if self.logger_manager: self.logger_manager.log_error(f"Failed to restart collector: {collector_name}") return success except Exception as e: if self.logger_manager: self.logger_manager.log_error(f"Error restarting collector {collector_name}: {e}", exc_info=True) return False async def restart_all_collectors(self) -> Dict[str, bool]: """ Restart all enabled collectors. Returns: Dictionary mapping collector names to restart success status """ if self.logger_manager: self.logger_manager.log_info("Restarting all enabled collectors") results = {} restart_tasks = [] for collector_name in self._enabled_collectors: task = asyncio.create_task(self.restart_collector(collector_name)) restart_tasks.append((collector_name, task)) # Wait for all restarts to complete for collector_name, task in restart_tasks: try: results[collector_name] = await task except Exception as e: if self.logger_manager: self.logger_manager.log_error(f"Error restarting {collector_name}: {e}", exc_info=True) results[collector_name] = False successful_restarts = sum(1 for success in results.values() if success) if self.logger_manager: self.logger_manager.log_info( f"Restart complete - {successful_restarts}/{len(results)} collectors restarted successfully" ) return results async def start_all_enabled_collectors(self) -> None: """Start all enabled collectors.""" start_tasks = [] for collector_name in self._enabled_collectors: task = asyncio.create_task(self._start_collector(collector_name)) start_tasks.append(task) # Wait for all collectors to start (with timeout) if start_tasks: try: await asyncio.wait_for(asyncio.gather(*start_tasks, return_exceptions=True), timeout=30.0) except asyncio.TimeoutError: if self.logger_manager: self.logger_manager.log_warning("Some collectors took too long to start") async def stop_all_collectors(self) -> None: """Stop all collectors.""" stop_tasks = [] for collector in self._collectors.values(): task = asyncio.create_task(collector.stop(force=True)) stop_tasks.append(task) # Wait for all collectors to stop (with timeout) if stop_tasks: try: await asyncio.wait_for(asyncio.gather(*stop_tasks, return_exceptions=True), timeout=30.0) except asyncio.TimeoutError: if self.logger_manager: self.logger_manager.log_warning("Some collectors took too long to stop") # Getters for data access def get_collectors(self) -> Dict[str, BaseDataCollector]: """Get all collectors.""" return self._collectors def get_collector_configs(self) -> Dict[str, CollectorConfig]: """Get all collector configurations.""" return self._collector_configs def get_enabled_collectors(self) -> Set[str]: """Get enabled collector names.""" return self._enabled_collectors def get_collector(self, name: str) -> Optional[BaseDataCollector]: """Get a specific collector by name.""" return self._collectors.get(name) def get_collector_config(self, name: str) -> Optional[CollectorConfig]: """Get a specific collector config by name.""" return self._collector_configs.get(name)