- Implemented `_sanitize_error` method in `DataCollectionService` and `CollectorManager` to prevent leaking internal error details. - Improved error handling across various methods by catching specific exceptions and logging sanitized messages with `exc_info=True`. - Added file permission validation in `ServiceConfig` to ensure secure configuration file handling, including detailed logging for permission issues. - Refactored logging practices to enhance clarity and maintainability, ensuring consistent error reporting. These changes significantly bolster the security and robustness of the data collection services, aligning with project standards for error handling and maintainability.
226 lines
9.8 KiB
Python
226 lines
9.8 KiB
Python
"""
|
|
Data Collector Manager for supervising and managing multiple data collectors.
|
|
|
|
This module provides centralized management of data collectors with health monitoring,
|
|
auto-recovery, and coordinated lifecycle management.
|
|
"""
|
|
|
|
import asyncio
|
|
from typing import Dict, List, Optional, Any, Set
|
|
|
|
from utils.logger import get_logger
|
|
from .base_collector import BaseDataCollector, CollectorStatus
|
|
from .collector_types import ManagerStatus, CollectorConfig
|
|
from .manager_components import (
|
|
CollectorLifecycleManager,
|
|
ManagerHealthMonitor,
|
|
ManagerStatsTracker,
|
|
ManagerLogger
|
|
)
|
|
|
|
|
|
class CollectorManager:
|
|
"""
|
|
Manages multiple data collectors with health monitoring and auto-recovery.
|
|
|
|
The manager is responsible for:
|
|
- Starting and stopping collectors
|
|
- Health monitoring and auto-restart
|
|
- Coordinated lifecycle management
|
|
- Status reporting and metrics
|
|
"""
|
|
|
|
def __init__(self,
|
|
manager_name: str = "collector_manager",
|
|
global_health_check_interval: float = 60.0,
|
|
restart_delay: float = 5.0,
|
|
logger = None,
|
|
log_errors_only: bool = False):
|
|
"""Initialize the collector manager with component-based architecture."""
|
|
self.manager_name = manager_name
|
|
self.restart_delay = restart_delay
|
|
|
|
# Initialize components
|
|
self.logger_manager = ManagerLogger(logger, log_errors_only)
|
|
self.lifecycle_manager = CollectorLifecycleManager(self.logger_manager)
|
|
self.health_monitor = ManagerHealthMonitor(
|
|
global_health_check_interval, self.logger_manager, self.lifecycle_manager)
|
|
self.stats_tracker = ManagerStatsTracker(
|
|
30.0, self.logger_manager, self.lifecycle_manager, self.health_monitor)
|
|
|
|
# Manager state
|
|
self.status = ManagerStatus.STOPPED
|
|
self._running = False
|
|
self._tasks: Set[asyncio.Task] = set()
|
|
|
|
if self.logger_manager.is_debug_enabled():
|
|
self.logger_manager.log_info(f"Initialized collector manager: {manager_name}")
|
|
|
|
def _sanitize_error(self, message: str) -> str:
|
|
"""
|
|
Sanitize error message to prevent leaking internal details.
|
|
|
|
Args:
|
|
message: Original error message
|
|
|
|
Returns:
|
|
Sanitized error message
|
|
"""
|
|
# Delegate to the logger manager's sanitization method
|
|
return self.logger_manager._sanitize_error(message)
|
|
|
|
def add_collector(self, collector: BaseDataCollector, config: Optional[CollectorConfig] = None) -> None:
|
|
"""Add a collector to be managed."""
|
|
self.lifecycle_manager.add_collector(collector, config)
|
|
|
|
def remove_collector(self, collector_name: str) -> bool:
|
|
"""Remove a collector from management."""
|
|
return self.lifecycle_manager.remove_collector(collector_name)
|
|
|
|
def enable_collector(self, collector_name: str) -> bool:
|
|
"""Enable a collector (will be started if manager is running)."""
|
|
return self.lifecycle_manager.enable_collector(collector_name)
|
|
|
|
def disable_collector(self, collector_name: str) -> bool:
|
|
"""Disable a collector (will be stopped if running)."""
|
|
return self.lifecycle_manager.disable_collector(collector_name)
|
|
|
|
async def start(self) -> bool:
|
|
"""Start the collector manager and all enabled collectors."""
|
|
if self.status in [ManagerStatus.RUNNING, ManagerStatus.STARTING]:
|
|
self.logger_manager.log_warning("Collector manager is already running or starting")
|
|
return True
|
|
|
|
self.logger_manager.log_info("Starting collector manager")
|
|
self.status = ManagerStatus.STARTING
|
|
|
|
try:
|
|
self._running = True
|
|
|
|
# Set running state for all components
|
|
self.lifecycle_manager.set_running_state(True)
|
|
self.health_monitor.set_running_state(True)
|
|
self.stats_tracker.set_running_state(True)
|
|
|
|
# Start collectors and monitoring
|
|
await self.lifecycle_manager.start_all_enabled_collectors()
|
|
await self.health_monitor.start_monitoring()
|
|
|
|
# Track health monitoring task
|
|
health_task = self.health_monitor.get_health_task()
|
|
if health_task:
|
|
self._tasks.add(health_task)
|
|
health_task.add_done_callback(self._tasks.discard)
|
|
|
|
# Start statistics cache updates
|
|
await self.stats_tracker.start_cache_updates()
|
|
|
|
self.status = ManagerStatus.RUNNING
|
|
enabled_count = len(self.lifecycle_manager.get_enabled_collectors())
|
|
self.logger_manager.log_info(f"Collector manager started - Managing {enabled_count} collectors")
|
|
return True
|
|
|
|
except (asyncio.CancelledError, KeyboardInterrupt):
|
|
# Handle graceful shutdown scenarios
|
|
self.status = ManagerStatus.ERROR
|
|
self.logger_manager.log_warning("Collector manager startup was cancelled")
|
|
return False
|
|
except (ConnectionError, OSError, IOError) as e:
|
|
# Handle connection and I/O related errors
|
|
self.status = ManagerStatus.ERROR
|
|
sanitized_message = self._sanitize_error(f"Connection/IO error starting collector manager: {e}")
|
|
self.logger_manager.log_error(sanitized_message, exc_info=True)
|
|
return False
|
|
except (AttributeError, TypeError, ValueError) as e:
|
|
# Handle configuration and data validation errors
|
|
self.status = ManagerStatus.ERROR
|
|
sanitized_message = self._sanitize_error(f"Configuration error starting collector manager: {e}")
|
|
self.logger_manager.log_error(sanitized_message, exc_info=True)
|
|
return False
|
|
except Exception as e:
|
|
# Catch any other unexpected errors
|
|
self.status = ManagerStatus.ERROR
|
|
sanitized_message = self._sanitize_error(f"Unexpected error starting collector manager: {e}")
|
|
self.logger_manager.log_error(sanitized_message, exc_info=True)
|
|
return False
|
|
|
|
async def stop(self) -> None:
|
|
"""Stop the collector manager and all collectors."""
|
|
if self.status == ManagerStatus.STOPPED:
|
|
self.logger_manager.log_warning("Collector manager is already stopped")
|
|
return
|
|
|
|
self.logger_manager.log_info("Stopping collector manager")
|
|
self.status = ManagerStatus.STOPPING
|
|
self._running = False
|
|
|
|
try:
|
|
# Set running state for all components
|
|
self.lifecycle_manager.set_running_state(False)
|
|
self.health_monitor.set_running_state(False)
|
|
self.stats_tracker.set_running_state(False)
|
|
|
|
# Stop monitoring and statistics
|
|
await self.health_monitor.stop_monitoring()
|
|
await self.stats_tracker.stop_cache_updates()
|
|
|
|
# Cancel manager tasks
|
|
for task in list(self._tasks):
|
|
task.cancel()
|
|
if self._tasks:
|
|
await asyncio.gather(*self._tasks, return_exceptions=True)
|
|
|
|
# Stop all collectors
|
|
await self.lifecycle_manager.stop_all_collectors()
|
|
|
|
self.status = ManagerStatus.STOPPED
|
|
self.logger_manager.log_info("Collector manager stopped")
|
|
|
|
except (asyncio.CancelledError, KeyboardInterrupt):
|
|
# Handle graceful shutdown scenarios
|
|
self.status = ManagerStatus.ERROR
|
|
self.logger_manager.log_warning("Collector manager shutdown was interrupted")
|
|
except (ConnectionError, OSError, IOError) as e:
|
|
# Handle connection and I/O related errors during shutdown
|
|
self.status = ManagerStatus.ERROR
|
|
sanitized_message = self._sanitize_error(f"Connection/IO error stopping collector manager: {e}")
|
|
self.logger_manager.log_error(sanitized_message, exc_info=True)
|
|
except Exception as e:
|
|
# Catch any other unexpected errors during shutdown
|
|
self.status = ManagerStatus.ERROR
|
|
sanitized_message = self._sanitize_error(f"Unexpected error stopping collector manager: {e}")
|
|
self.logger_manager.log_error(sanitized_message, exc_info=True)
|
|
|
|
async def restart_collector(self, collector_name: str) -> bool:
|
|
"""Restart a specific collector."""
|
|
return await self.lifecycle_manager.restart_collector(collector_name)
|
|
|
|
async def restart_all_collectors(self) -> Dict[str, bool]:
|
|
"""Restart all enabled collectors."""
|
|
return await self.lifecycle_manager.restart_all_collectors()
|
|
|
|
def get_status(self, force_refresh: bool = False) -> Dict[str, Any]:
|
|
"""Get manager status and statistics."""
|
|
status_dict = self.stats_tracker.get_status(force_refresh)
|
|
status_dict['manager_status'] = self.status.value
|
|
return status_dict
|
|
|
|
def get_collector_status(self, collector_name: str) -> Optional[Dict[str, Any]]:
|
|
"""Get status for a specific collector."""
|
|
return self.stats_tracker.get_collector_status(collector_name)
|
|
|
|
def list_collectors(self) -> List[str]:
|
|
"""List all managed collector names."""
|
|
return self.stats_tracker.list_collectors()
|
|
|
|
def get_running_collectors(self) -> List[str]:
|
|
"""Get names of currently running collectors."""
|
|
return self.stats_tracker.get_running_collectors()
|
|
|
|
def get_failed_collectors(self) -> List[str]:
|
|
"""Get names of failed or unhealthy collectors."""
|
|
return self.stats_tracker.get_failed_collectors()
|
|
|
|
def __repr__(self) -> str:
|
|
"""String representation of the manager."""
|
|
return f"CollectorManager(name={self.manager_name}, status={self.status.value})" |