# - Introduced a comprehensive data collection framework, including
#   `CollectorServiceConfig`, `BaseDataCollector`, and `CollectorManager`,
#   enhancing modularity and maintainability.
# - Developed `CollectorFactory` for streamlined collector creation,
#   promoting separation of concerns and improving configuration handling.
# - Enhanced `DataCollectionService` to use the new architecture, ensuring
#   robust error handling and logging practices.
# - Added `TaskManager` for efficient management of asynchronous tasks,
#   improving performance and resource management.
# - Implemented health monitoring and auto-recovery features in
#   `CollectorManager`, ensuring reliable operation of data collectors.
# - Updated imports across the codebase to reflect the new structure,
#   ensuring consistent access to components.
#
# These changes significantly improve the architecture and maintainability
# of the data collection service, aligning with project standards for
# modularity, performance, and error handling.
# 226 lines · 10 KiB · Python
"""
|
|
Data Collector Manager for supervising and managing multiple data collectors.
|
|
|
|
This module provides centralized management of data collectors with health monitoring,
|
|
auto-recovery, and coordinated lifecycle management.
|
|
"""

import asyncio
from typing import Any, Dict, List, Optional, Set

from utils.async_task_manager import TaskManager
from utils.logger import get_logger

from ..manager_components import (
    CollectorLifecycleManager,
    ManagerHealthMonitor,
    ManagerLogger,
    ManagerStatsTracker,
)
from .base_collector import BaseDataCollector, CollectorStatus
from .collector_types import CollectorConfig, ManagerStatus


class CollectorManager:
    """
    Manages multiple data collectors with health monitoring and auto-recovery.

    The manager is a facade over several components and delegates to them:

    - ``CollectorLifecycleManager``: adding/removing, enabling/disabling,
      starting, stopping and restarting collectors
    - ``ManagerHealthMonitor``: the background health-monitoring task
    - ``ManagerStatsTracker``: status and statistics reporting
    - ``TaskManager``: tracking of the health-monitoring task
    - ``ManagerLogger``: logging and error-message sanitization

    ``start()`` moves ``status`` from STOPPED to STARTING and then RUNNING, or
    to ERROR if startup fails. ``stop()`` moves it to STOPPING and then
    STOPPED, or to ERROR if any shutdown step fails.
    """

    def __init__(self,
                 manager_name: str = "collector_manager",
                 global_health_check_interval: float = 60.0,
                 restart_delay: float = 5.0,
                 logger=None,
                 log_errors_only: bool = False,
                 stats_cache_interval: float = 30.0):
        """
        Initialize the collector manager and its components.

        Args:
            manager_name: Name used in log messages, ``__repr__`` and as the
                prefix of the task manager's name.
            global_health_check_interval: Forwarded to ``ManagerHealthMonitor``
                (presumably seconds between health checks — confirm).
            restart_delay: Stored as ``self.restart_delay``.
                NOTE(review): not used by this class and not forwarded to any
                component here; confirm whether anything reads it.
            logger: Optional logger handed to ``ManagerLogger`` and
                ``TaskManager``.
            log_errors_only: Forwarded to ``ManagerLogger``.
            stats_cache_interval: First argument of ``ManagerStatsTracker``;
                previously hard-coded to 30.0 (presumably the cache refresh
                interval in seconds — confirm).
        """
        self.manager_name = manager_name
        self.restart_delay = restart_delay

        # Components; each later one is wired to the earlier ones it uses.
        self.logger_manager = ManagerLogger(logger, log_errors_only)
        self.task_manager = TaskManager(f"{manager_name}_tasks", logger=logger)
        self.lifecycle_manager = CollectorLifecycleManager(self.logger_manager)
        self.health_monitor = ManagerHealthMonitor(
            global_health_check_interval, self.logger_manager, self.lifecycle_manager)
        self.stats_tracker = ManagerStatsTracker(
            stats_cache_interval, self.logger_manager, self.lifecycle_manager,
            self.health_monitor)

        # Manager state
        self.status = ManagerStatus.STOPPED
        self._running = False

        if self.logger_manager.is_debug_enabled():
            self.logger_manager.log_info(f"Initialized collector manager: {manager_name}")

    def _sanitize_error(self, message: str) -> str:
        """
        Sanitize error message to prevent leaking internal details.

        Args:
            message: Original error message

        Returns:
            Sanitized error message, as produced by the logger manager.
        """
        # Delegate to the logger manager's sanitization method
        return self.logger_manager._sanitize_error(message)

    def add_collector(self, collector: BaseDataCollector, config: Optional[CollectorConfig] = None) -> None:
        """Add a collector to be managed (delegates to the lifecycle manager)."""
        self.lifecycle_manager.add_collector(collector, config)

    def remove_collector(self, collector_name: str) -> bool:
        """Remove a collector from management; returns the lifecycle manager's result."""
        return self.lifecycle_manager.remove_collector(collector_name)

    def enable_collector(self, collector_name: str) -> bool:
        """Enable a collector (will be started if manager is running)."""
        return self.lifecycle_manager.enable_collector(collector_name)

    def disable_collector(self, collector_name: str) -> bool:
        """Disable a collector (will be stopped if running)."""
        return self.lifecycle_manager.disable_collector(collector_name)

    async def start(self) -> bool:
        """
        Start the collector manager and all enabled collectors.

        On failure the manager is rolled back (running flags cleared,
        monitoring/statistics/task manager shut down, collectors stopped) so a
        later ``start()`` begins from a clean state.

        Returns:
            True if the manager is running, or was already running or
            starting. False if startup raised an ordinary exception; ``status``
            is then ERROR.

        Raises:
            asyncio.CancelledError, KeyboardInterrupt: re-raised after rollback
                so that task cancellation and interrupts reach the caller.
        """
        if self.status in (ManagerStatus.RUNNING, ManagerStatus.STARTING):
            self.logger_manager.log_warning("Collector manager is already running or starting")
            return True

        self.logger_manager.log_info("Starting collector manager")
        self.status = ManagerStatus.STARTING

        try:
            await self._start_components()

            self.status = ManagerStatus.RUNNING
            enabled_count = len(self.lifecycle_manager.get_enabled_collectors())
            self.logger_manager.log_info(f"Collector manager started - Managing {enabled_count} collectors")
            return True

        except (asyncio.CancelledError, KeyboardInterrupt):
            # Clean up, then propagate: swallowing cancellation would make
            # task.cancel() on the caller silently ineffective.
            self.status = ManagerStatus.ERROR
            self.logger_manager.log_warning("Collector manager startup was cancelled")
            await self._shutdown_components("rolling back")
            raise

        except OSError as e:
            # OSError also covers ConnectionError and its alias IOError.
            await self._abort_start(f"Connection/IO error starting collector manager: {e}")
            return False

        except (AttributeError, TypeError, ValueError) as e:
            # Handle configuration and data validation errors
            await self._abort_start(f"Configuration error starting collector manager: {e}")
            return False

        except Exception as e:
            # Catch any other unexpected errors
            await self._abort_start(f"Unexpected error starting collector manager: {e}")
            return False

    async def _start_components(self) -> None:
        """Mark components running, then start collectors, health monitoring and statistics."""
        self._running = True
        self._set_components_running(True)

        await self.lifecycle_manager.start_all_enabled_collectors()
        await self.health_monitor.start_monitoring()
        self._track_health_task()
        await self.stats_tracker.start_cache_updates()

    def _track_health_task(self) -> None:
        """Register the health monitor's task (if any) with the task manager."""
        health_task = self.health_monitor.get_health_task()
        if health_task is not None:
            # NOTE(review): this reaches into TaskManager private attributes;
            # prefer a public registration method on TaskManager if one exists.
            self.task_manager._tasks.add(health_task)
            self.task_manager._task_names[health_task] = "health_monitor"
            health_task.add_done_callback(self.task_manager._task_done_callback)

    async def _abort_start(self, message: str) -> None:
        """
        Mark startup as failed, log ``message`` and roll back partial startup.

        Must be called from inside an ``except`` block so that
        ``exc_info=True`` captures the active exception.
        """
        self.status = ManagerStatus.ERROR
        self.logger_manager.log_error(self._sanitize_error(message), exc_info=True)
        await self._shutdown_components("rolling back")

    def _set_components_running(self, running: bool) -> None:
        """Propagate the running flag to the lifecycle, health and stats components."""
        self.lifecycle_manager.set_running_state(running)
        self.health_monitor.set_running_state(running)
        self.stats_tracker.set_running_state(running)

    def _log_shutdown_failure(self, action: str, error: Exception) -> None:
        """Log a failed shutdown step; call from inside the ``except`` block."""
        if isinstance(error, OSError):
            kind = "Connection/IO error"
        else:
            kind = "Unexpected error"
        message = f"{kind} {action} collector manager: {error}"
        self.logger_manager.log_error(self._sanitize_error(message), exc_info=True)

    async def _shutdown_components(self, action: str) -> bool:
        """
        Clear running flags and stop monitoring, statistics, tasks and collectors.

        Each step is attempted even if an earlier one failed, so collectors
        are still stopped when e.g. the health monitor raises. Ordinary
        exceptions are logged and swallowed; ``asyncio.CancelledError`` and
        ``KeyboardInterrupt`` (not ``Exception`` subclasses) propagate.

        Args:
            action: Verb phrase used in failure logs, e.g. "stopping".

        Returns:
            True if every step completed without raising, otherwise False.
        """
        self._running = False
        clean = True

        try:
            self._set_components_running(False)
        except Exception as e:
            clean = False
            self._log_shutdown_failure(action, e)

        # Same order as before: monitoring, statistics, tracked tasks, collectors.
        steps = (
            self.health_monitor.stop_monitoring,
            self.stats_tracker.stop_cache_updates,
            lambda: self.task_manager.shutdown(graceful=True),
            self.lifecycle_manager.stop_all_collectors,
        )
        for step in steps:
            try:
                await step()
            except Exception as e:
                clean = False
                self._log_shutdown_failure(action, e)

        return clean

    async def stop(self) -> None:
        """
        Stop the collector manager and all collectors.

        All shutdown steps are attempted even if one fails; any failure is
        logged and leaves ``status`` as ERROR instead of STOPPED.

        Raises:
            asyncio.CancelledError, KeyboardInterrupt: re-raised after setting
                ``status`` to ERROR so cancellation propagates to the caller.
        """
        if self.status == ManagerStatus.STOPPED:
            self.logger_manager.log_warning("Collector manager is already stopped")
            return

        self.logger_manager.log_info("Stopping collector manager")
        self.status = ManagerStatus.STOPPING

        try:
            clean = await self._shutdown_components("stopping")
        except (asyncio.CancelledError, KeyboardInterrupt):
            self.status = ManagerStatus.ERROR
            self.logger_manager.log_warning("Collector manager shutdown was interrupted")
            raise

        if clean:
            self.status = ManagerStatus.STOPPED
            self.logger_manager.log_info("Collector manager stopped")
        else:
            self.status = ManagerStatus.ERROR

    async def restart_collector(self, collector_name: str) -> bool:
        """Restart a specific collector (delegates to the lifecycle manager)."""
        return await self.lifecycle_manager.restart_collector(collector_name)

    async def restart_all_collectors(self) -> Dict[str, bool]:
        """Restart all enabled collectors; returns the lifecycle manager's per-collector results."""
        return await self.lifecycle_manager.restart_all_collectors()

    def get_status(self, force_refresh: bool = False) -> Dict[str, Any]:
        """
        Get manager status and statistics.

        Args:
            force_refresh: Forwarded to ``ManagerStatsTracker.get_status``.

        Returns:
            A new dict with the tracker's status plus ``'manager_status'``
            set to this manager's ``status.value``.
        """
        # Copy before adding our key: the tracker may hand out a cached dict
        # (it exposes cache updates), which must not be mutated here.
        status_dict = dict(self.stats_tracker.get_status(force_refresh))
        status_dict['manager_status'] = self.status.value
        return status_dict

    def get_collector_status(self, collector_name: str) -> Optional[Dict[str, Any]]:
        """Get status for a specific collector (None-able, per the stats tracker)."""
        return self.stats_tracker.get_collector_status(collector_name)

    def list_collectors(self) -> List[str]:
        """List all managed collector names."""
        return self.stats_tracker.list_collectors()

    def get_running_collectors(self) -> List[str]:
        """Get names of currently running collectors."""
        return self.stats_tracker.get_running_collectors()

    def get_failed_collectors(self) -> List[str]:
        """Get names of failed or unhealthy collectors."""
        return self.stats_tracker.get_failed_collectors()

    def __repr__(self) -> str:
        """String representation of the manager."""
        return f"CollectorManager(name={self.manager_name}, status={self.status.value})"