""" Module for managing collector status, health, and telemetry. This module provides a dedicated class to encapsulate the status, health checks, and statistical data for a data collector, promoting modularity and separation of concerns. """ from datetime import datetime, timezone, timedelta from enum import Enum from typing import Any, Dict, Optional class CollectorStatus(Enum): """Status of the data collector.""" STOPPED = "stopped" STARTING = "starting" RUNNING = "running" STOPPING = "stopping" ERROR = "error" RECONNECTING = "reconnecting" UNHEALTHY = "unhealthy" # Added for health monitoring class CollectorStateAndTelemetry: """ Manages the operational state, health, and performance statistics of a data collector. """ def __init__(self, exchange_name: str, component_name: str, health_check_interval: float = 30.0, max_silence_duration: timedelta = timedelta(minutes=5), logger=None, log_errors_only: bool = False): self.exchange_name = exchange_name self.component_name = component_name self.health_check_interval = health_check_interval self._max_silence_duration = max_silence_duration self.logger = logger self.log_errors_only = log_errors_only # Collector state self.status = CollectorStatus.STOPPED self._running = False self._should_be_running = False # Track desired state # Health monitoring self._last_heartbeat = datetime.now(timezone.utc) self._last_data_received = None # Statistics self._stats = { 'messages_received': 0, 'messages_processed': 0, 'errors': 0, 'restarts': 0, 'last_message_time': None, 'connection_uptime': None, 'last_error': None, 'last_restart_time': None } def _log_debug(self, message: str) -> None: """Log debug message if logger is available and not in errors-only mode.""" if self.logger and not self.log_errors_only: self.logger.debug(message) def _log_info(self, message: str) -> None: """Log info message if logger is available and not in errors-only mode.""" if self.logger and not self.log_errors_only: self.logger.info(message) def _log_warning(self, message: str) -> None: """Log warning message if logger is available and not in errors-only mode.""" if self.logger and not self.log_errors_only: self.logger.warning(message) def _log_error(self, message: str, exc_info: bool = False) -> None: """Log error message if logger is available (always logs errors regardless of log_errors_only).""" if self.logger: self.logger.error(message, exc_info=exc_info) def _log_critical(self, message: str, exc_info: bool = False) -> None: """Log critical message if logger is available (always logs critical regardless of log_errors_only).""" if self.logger: self.logger.critical(message, exc_info=exc_info) def update_status(self, new_status: CollectorStatus) -> None: """Update the collector's operational status.""" self.status = new_status self._log_debug(f"Collector status updated to: {new_status.value}") def set_running_state(self, is_running: bool) -> None: """Set the internal running state.""" self._running = is_running self._log_debug(f"Collector internal running state set to: {is_running}") def set_should_be_running(self, should_run: bool) -> None: """Set the desired running state.""" self._should_be_running = should_run self._log_debug(f"Collector desired running state set to: {should_run}") def update_heartbeat(self) -> None: """Update the last heartbeat timestamp.""" self._last_heartbeat = datetime.now(timezone.utc) self._log_debug("Heartbeat updated") def update_data_received_timestamp(self) -> None: """Update the last data received timestamp.""" self._last_data_received = datetime.now(timezone.utc) self._log_debug("Last data received timestamp updated") def increment_messages_received(self) -> None: """Increment the count of messages received.""" self._stats['messages_received'] += 1 self._log_debug(f"Messages received: {self._stats['messages_received']}") def increment_messages_processed(self) -> None: """Increment the count of messages processed.""" self._stats['messages_processed'] += 1 self._log_debug(f"Messages processed: {self._stats['messages_processed']}") def increment_errors(self, error_message: str) -> None: """Increment error count and store the last error message.""" self._stats['errors'] += 1 self._stats['last_error'] = error_message self._log_error(f"Error count: {self._stats['errors']}, Last error: {error_message}") def increment_restarts(self) -> None: """Increment restart count and update last restart time.""" self._stats['restarts'] += 1 self._stats['last_restart_time'] = datetime.now(timezone.utc) self._log_info(f"Collector restarts: {self._stats['restarts']}") def set_connection_uptime_start(self) -> None: """Set the connection uptime start time.""" self._stats['connection_uptime'] = datetime.now(timezone.utc) self._log_debug("Connection uptime start set") def get_status(self) -> Dict[str, Any]: """ Get current collector status and statistics. Returns: Dictionary containing status information """ uptime_seconds = None if self._stats['connection_uptime']: uptime_seconds = (datetime.now(timezone.utc) - self._stats['connection_uptime']).total_seconds() time_since_heartbeat = None if self._last_heartbeat: time_since_heartbeat = (datetime.now(timezone.utc) - self._last_heartbeat).total_seconds() time_since_data = None if self._last_data_received: time_since_data = (datetime.now(timezone.utc) - self._last_data_received).total_seconds() return { 'exchange': self.exchange_name, 'status': self.status.value, 'should_be_running': self._should_be_running, 'auto_restart': True, # This will be managed by the base collector, not state 'health': { 'time_since_heartbeat': time_since_heartbeat, 'time_since_data': time_since_data, 'max_silence_duration': self._max_silence_duration.total_seconds() }, 'statistics': { **self._stats, 'uptime_seconds': uptime_seconds, 'reconnect_attempts': 0 # This will be managed by connection manager } } def get_health_status(self) -> Dict[str, Any]: """ Get detailed health status for monitoring. Returns: Dictionary containing health information """ now = datetime.now(timezone.utc) is_healthy = True health_issues = [] # Check if should be running but isn't if self._should_be_running and not self._running: is_healthy = False health_issues.append("Should be running but is stopped") # Check heartbeat if self._last_heartbeat: time_since_heartbeat = now - self._last_heartbeat if time_since_heartbeat > timedelta(seconds=self.health_check_interval * 2): is_healthy = False health_issues.append(f"No heartbeat for {time_since_heartbeat.total_seconds():.1f}s") # Check data freshness if self._last_data_received: time_since_data = now - self._last_data_received if time_since_data > self._max_silence_duration: is_healthy = False health_issues.append(f"No data for {time_since_data.total_seconds():.1f}s") # Check for error status if self.status in [CollectorStatus.ERROR, CollectorStatus.UNHEALTHY]: is_healthy = False health_issues.append(f"Status: {self.status.value}") return { 'is_healthy': is_healthy, 'issues': health_issues, 'status': self.status.value, 'last_heartbeat': self._last_heartbeat.isoformat() if self._last_heartbeat else None, 'last_data_received': self._last_data_received.isoformat() if self._last_data_received else None, 'should_be_running': self._should_be_running, 'is_running': self._running }