# TCPDashboard/data/manager_components/manager_health_monitor.py

"""
Manager Health Monitor for monitoring collector health and auto-recovery.
This module handles health monitoring of data collectors including periodic health checks,
auto-restart functionality, and health status tracking.
"""
import asyncio
from datetime import datetime, timezone
from typing import Set, Dict, Optional
from ..collector.base_collector import BaseDataCollector, CollectorStatus
class ManagerHealthMonitor:
    """Monitors the health of data collectors and provides auto-recovery.

    A background task wakes up every ``global_health_check_interval`` seconds,
    asks each enabled collector for its health status, records aggregate
    counters and, when a collector's config has ``auto_restart`` set,
    schedules a restart through the lifecycle manager.

    The background loop only iterates while the running flag is true, so call
    ``set_running_state(True)`` before ``start_monitoring()``.
    """

    def __init__(self,
                 global_health_check_interval: float = 60.0,
                 logger_manager=None,
                 lifecycle_manager=None):
        """
        Initialize the health monitor.

        Args:
            global_health_check_interval: Seconds between global health checks
            logger_manager: Logger manager instance for logging operations
            lifecycle_manager: Lifecycle manager for restart operations
        """
        self.global_health_check_interval = global_health_check_interval
        self.logger_manager = logger_manager
        self.lifecycle_manager = lifecycle_manager

        # Health monitoring state
        self._running = False
        self._last_global_check = datetime.now(timezone.utc)
        self._global_health_task: Optional[asyncio.Task] = None

        # In-flight auto-restart tasks keyed by collector name. Holding a
        # strong reference keeps the event loop (which only keeps weak
        # references) from garbage-collecting a restart mid-execution, and
        # lets us avoid stacking a second restart on top of a pending one.
        self._restart_tasks: Dict[str, asyncio.Task] = {}

        # Health statistics
        self._health_stats = {
            'last_global_check': None,
            'running_collectors': 0,
            'failed_collectors': 0
        }

    def set_running_state(self, running: bool) -> None:
        """Set the running state of the monitor."""
        self._running = running

    def get_health_stats(self) -> Dict:
        """Get a shallow copy of the health monitoring statistics."""
        return self._health_stats.copy()

    def get_last_global_check(self) -> datetime:
        """Get the timestamp of the last global health check."""
        return self._last_global_check

    async def start_monitoring(self) -> None:
        """Start the global health monitoring task.

        Does nothing (apart from logging a warning) when a monitoring task
        is already active.
        """
        if self._global_health_task and not self._global_health_task.done():
            if self.logger_manager:
                self.logger_manager.log_warning("Health monitoring is already running")
            return

        if self.logger_manager:
            self.logger_manager.log_debug("Starting health monitoring")

        self._global_health_task = asyncio.create_task(self._global_health_monitor())

    async def stop_monitoring(self) -> None:
        """Stop the global health monitoring task, if one is active."""
        if self._global_health_task and not self._global_health_task.done():
            self._global_health_task.cancel()
            try:
                await self._global_health_task
            except asyncio.CancelledError:
                pass

            if self.logger_manager:
                self.logger_manager.log_debug("Health monitoring stopped")

    async def _global_health_monitor(self) -> None:
        """Global health monitoring loop for all collectors.

        Runs until the running flag is cleared or the task is cancelled.
        Errors raised by a health check are logged and followed by an extra
        back-off sleep before the loop continues.
        """
        if self.logger_manager:
            self.logger_manager.log_debug("Starting global health monitor")

        try:
            while self._running:
                try:
                    await asyncio.sleep(self.global_health_check_interval)

                    self._last_global_check = datetime.now(timezone.utc)
                    self._health_stats['last_global_check'] = self._last_global_check

                    # Perform health check if lifecycle manager is available
                    if self.lifecycle_manager:
                        await self._perform_health_check()
                except asyncio.CancelledError:
                    # Listed before ``Exception`` so cancellation is never
                    # mistaken for a health-check error on interpreters
                    # where CancelledError derives from Exception.
                    raise
                except Exception as e:
                    if self.logger_manager:
                        self.logger_manager.log_error(
                            f"Error in global health monitor: {e}", exc_info=True
                        )
                    await asyncio.sleep(self.global_health_check_interval)
        except asyncio.CancelledError:
            # One handler for every await in the loop, including the
            # back-off sleep inside the error handler above.
            if self.logger_manager:
                self.logger_manager.log_debug("Global health monitor cancelled")

    def _enabled_collectors(self) -> Dict[str, "BaseDataCollector"]:
        """Map each enabled collector name to its registered collector object.

        Names reported as enabled but missing from the lifecycle manager's
        collector registry are skipped. Returns an empty dict when no
        lifecycle manager is configured.
        """
        if not self.lifecycle_manager:
            return {}

        enabled_names = self.lifecycle_manager.get_enabled_collectors()
        registry = self.lifecycle_manager.get_collectors()
        return {name: registry[name] for name in enabled_names if name in registry}

    async def _perform_health_check(self) -> None:
        """Perform health check on all enabled collectors.

        Updates the ``running_collectors`` / ``failed_collectors`` counters
        and schedules an auto-restart for unhealthy collectors whose config
        allows it.
        """
        if not self.lifecycle_manager:
            return

        running_count = 0
        failed_count = 0

        for collector_name, collector in self._enabled_collectors().items():
            health_status = collector.get_health_status()

            if health_status['is_healthy']:
                # Healthy collectors only count as running when their status
                # says so; healthy-but-not-running is counted in neither bucket.
                if collector.status == CollectorStatus.RUNNING:
                    running_count += 1
                continue

            failed_count += 1
            if self.logger_manager:
                # ``issues`` is read defensively, matching
                # perform_immediate_health_check, so a payload without the
                # key cannot abort the whole check with a KeyError.
                issues = health_status.get('issues', [])
                self.logger_manager.log_warning(
                    f"Collector {collector_name} is unhealthy: {issues}"
                )

            self._schedule_restart(collector_name, collector)

        # Update health statistics
        self._health_stats['running_collectors'] = running_count
        self._health_stats['failed_collectors'] = failed_count

        if self.logger_manager:
            self.logger_manager.log_debug(
                f"Health check complete - Running: {running_count}, Failed: {failed_count}"
            )

    def _schedule_restart(self, collector_name: str, collector) -> None:
        """Schedule a background restart for an unhealthy collector if allowed.

        A restart is scheduled only when the collector's config enables
        ``auto_restart``, the collector is not currently starting or
        stopping, and no earlier restart task for it is still pending.
        """
        config = self.lifecycle_manager.get_collector_config(collector_name)
        if not (config and config.auto_restart):
            return
        if collector.status in (CollectorStatus.STARTING, CollectorStatus.STOPPING):
            return

        pending = self._restart_tasks.get(collector_name)
        if pending is not None and not pending.done():
            if self.logger_manager:
                self.logger_manager.log_debug(
                    f"Restart already in progress for collector: {collector_name}"
                )
            return

        if self.logger_manager:
            self.logger_manager.log_info(f"Auto-restarting unhealthy collector: {collector_name}")

        # Run the restart in the background so the health check is not
        # blocked, but keep a reference to the task until it completes.
        task = asyncio.create_task(self.lifecycle_manager.restart_collector(collector_name))
        self._restart_tasks[collector_name] = task
        task.add_done_callback(
            lambda finished, name=collector_name: self._on_restart_done(name, finished)
        )

    def _on_restart_done(self, collector_name: str, task: asyncio.Task) -> None:
        """Forget a finished restart task and log its failure, if any."""
        if self._restart_tasks.get(collector_name) is task:
            del self._restart_tasks[collector_name]

        # Without a logger the exception is left unretrieved on purpose, so
        # asyncio's own "exception was never retrieved" reporting still fires.
        if task.cancelled() or not self.logger_manager:
            return

        error = task.exception()
        if error is not None:
            self.logger_manager.log_error(
                f"Auto-restart of collector {collector_name} failed: {error}"
            )

    async def perform_immediate_health_check(self) -> Dict[str, Dict]:
        """
        Perform an immediate health check on all collectors.

        Unlike the periodic check, this neither updates the health
        statistics nor triggers auto-restarts.

        Returns:
            Dictionary mapping collector names to their health status
            (``is_healthy``, ``status`` and ``issues`` keys)
        """
        health_results = {}

        for collector_name, collector in self._enabled_collectors().items():
            health_status = collector.get_health_status()
            health_results[collector_name] = {
                'is_healthy': health_status['is_healthy'],
                'status': collector.status.value,
                'issues': health_status.get('issues', [])
            }

        return health_results

    def get_health_task(self) -> Optional[asyncio.Task]:
        """Get the current health monitoring task."""
        return self._global_health_task