From 60434afd5dc7199e6ee72e80429ac45a5f31b0c3 Mon Sep 17 00:00:00 2001 From: "Vasily.onl" Date: Mon, 9 Jun 2025 17:27:29 +0800 Subject: [PATCH] Refactor BaseDataCollector to utilize CollectorStateAndTelemetry for improved state management - Introduced a new `CollectorStateAndTelemetry` class to encapsulate the status, health checks, and statistics of the data collector, promoting modularity and separation of concerns. - Updated `BaseDataCollector` to replace direct status management with calls to the new telemetry class, enhancing maintainability and readability. - Refactored logging methods to utilize the telemetry class, ensuring consistent logging practices. - Modified the `OKXCollector` to integrate with the new telemetry system for improved status reporting and error handling. - Added comprehensive tests for the `CollectorStateAndTelemetry` class to ensure functionality and reliability. These changes streamline the data collector's architecture, aligning with project standards for maintainability and performance. --- data/base_collector.py | 244 +++++------------- data/collector/collector_state_telemetry.py | 223 ++++++++++++++++ data/collector_manager.py | 2 +- data/exchanges/okx/collector.py | 189 +++++--------- tasks/tasks-base-collector-refactoring.md | 66 +++++ .../test_collector_state_telemetry.py | 185 +++++++++++++ 6 files changed, 615 insertions(+), 294 deletions(-) create mode 100644 data/collector/collector_state_telemetry.py create mode 100644 tasks/tasks-base-collector-refactoring.md create mode 100644 tests/data/collector/test_collector_state_telemetry.py diff --git a/data/base_collector.py b/data/base_collector.py index 02a7ea7..4d7aaed 100644 --- a/data/base_collector.py +++ b/data/base_collector.py @@ -14,6 +14,7 @@ from dataclasses import dataclass from enum import Enum from utils.logger import get_logger +from .collector.collector_state_telemetry import CollectorStatus, CollectorStateAndTelemetry class DataType(Enum): @@ -25,17 +26,6 @@ class DataType(Enum): BALANCE = "balance" -class CollectorStatus(Enum): - """Status of the data collector.""" - STOPPED = "stopped" - STARTING = "starting" - RUNNING = "running" - STOPPING = "stopping" - ERROR = "error" - RECONNECTING = "reconnecting" - UNHEALTHY = "unhealthy" # Added for health monitoring - - @dataclass class MarketDataPoint: """Standardized market data structure.""" @@ -141,19 +131,25 @@ class BaseDataCollector(ABC): self.data_types = data_types or [DataType.CANDLE] self.timeframes = timeframes or ['1m', '5m'] # Default timeframes if none provided self.auto_restart = auto_restart - self.health_check_interval = health_check_interval - self.log_errors_only = log_errors_only # Initialize logger based on parameters if logger is not None: self.logger = logger else: - self.logger = None + self.logger = get_logger(self.exchange_name) # Ensure a logger is always available - # Collector state - self.status = CollectorStatus.STOPPED - self._running = False - self._should_be_running = False # Track desired state + # Initialize state and telemetry manager + component = component_name or f"{self.exchange_name}_collector" + self._state_telemetry = CollectorStateAndTelemetry( + exchange_name=self.exchange_name, + component_name=component, + health_check_interval=health_check_interval, + logger=self.logger, # Pass the actual logger instance + log_errors_only=log_errors_only + ) + self.component_name = component # Keep for external access + + # Collector state (now managed by _state_telemetry) self._tasks: Set[asyncio.Task] = set() 
# Data callbacks @@ -167,58 +163,30 @@ class BaseDataCollector(ABC): self._max_reconnect_attempts = 5 self._reconnect_delay = 5.0 # seconds - # Health monitoring - self._last_heartbeat = datetime.now(timezone.utc) - self._last_data_received = None - self._health_check_task = None - self._max_silence_duration = timedelta(minutes=5) # Max time without data before unhealthy - - # Statistics - self._stats = { - 'messages_received': 0, - 'messages_processed': 0, - 'errors': 0, - 'restarts': 0, - 'last_message_time': None, - 'connection_uptime': None, - 'last_error': None, - 'last_restart_time': None - } - # Log initialization if logger is available - if self.logger: - component = component_name or f"{self.exchange_name}_collector" - self.component_name = component - if not self.log_errors_only: - self.logger.info(f"{self.component_name}: Initialized {self.exchange_name} data collector for symbols: {', '.join(symbols)}") - self.logger.info(f"{self.component_name}: Using timeframes: {', '.join(self.timeframes)}") - else: - self.component_name = component_name or f"{self.exchange_name}_collector" + if self._state_telemetry.logger: + if not self._state_telemetry.log_errors_only: + self._state_telemetry._log_info(f"{self.component_name}: Initialized {self.exchange_name} data collector for symbols: {', '.join(symbols)}") + self._state_telemetry._log_info(f"{self.component_name}: Using timeframes: {', '.join(self.timeframes)}") + @property + def status(self) -> CollectorStatus: + return self._state_telemetry.status + def _log_debug(self, message: str) -> None: - """Log debug message if logger is available and not in errors-only mode.""" - if self.logger and not self.log_errors_only: - self.logger.debug(message) + self._state_telemetry._log_debug(message) def _log_info(self, message: str) -> None: - """Log info message if logger is available and not in errors-only mode.""" - if self.logger and not self.log_errors_only: - self.logger.info(message) + self._state_telemetry._log_info(message) def _log_warning(self, message: str) -> None: - """Log warning message if logger is available and not in errors-only mode.""" - if self.logger and not self.log_errors_only: - self.logger.warning(message) + self._state_telemetry._log_warning(message) def _log_error(self, message: str, exc_info: bool = False) -> None: - """Log error message if logger is available (always logs errors regardless of log_errors_only).""" - if self.logger: - self.logger.error(message, exc_info=exc_info) + self._state_telemetry._log_error(message, exc_info=exc_info) def _log_critical(self, message: str, exc_info: bool = False) -> None: - """Log critical message if logger is available (always logs critical regardless of log_errors_only).""" - if self.logger: - self.logger.critical(message, exc_info=exc_info) + self._state_telemetry._log_critical(message, exc_info=exc_info) @abstractmethod async def connect(self) -> bool: @@ -284,31 +252,32 @@ class BaseDataCollector(ABC): True if started successfully, False otherwise """ # Check if already running or starting - if self.status in [CollectorStatus.RUNNING, CollectorStatus.STARTING]: + if self._state_telemetry.status in [CollectorStatus.RUNNING, CollectorStatus.STARTING]: self._log_warning("Data collector is already running or starting") return True self._log_info(f"Starting {self.exchange_name} data collector") - self.status = CollectorStatus.STARTING - self._should_be_running = True + self._state_telemetry.update_status(CollectorStatus.STARTING) + 
self._state_telemetry.set_should_be_running(True) try: # Connect to data source if not await self.connect(): self._log_error("Failed to connect to data source") - self.status = CollectorStatus.ERROR + self._state_telemetry.update_status(CollectorStatus.ERROR) return False # Subscribe to data streams if not await self.subscribe_to_data(list(self.symbols), self.data_types): self._log_error("Failed to subscribe to data streams") - self.status = CollectorStatus.ERROR + self._state_telemetry.update_status(CollectorStatus.ERROR) await self.disconnect() return False # Start background tasks - self._running = True - self.status = CollectorStatus.RUNNING + self._state_telemetry.set_running_state(True) + self._state_telemetry.update_status(CollectorStatus.RUNNING) + self._state_telemetry.set_connection_uptime_start() # Record connection uptime start # Start message processing task message_task = asyncio.create_task(self._message_loop()) @@ -316,7 +285,7 @@ class BaseDataCollector(ABC): message_task.add_done_callback(self._tasks.discard) # Start health monitoring task - if self.health_check_interval > 0: + if self._state_telemetry.health_check_interval > 0: health_task = asyncio.create_task(self._health_monitor()) self._tasks.add(health_task) health_task.add_done_callback(self._tasks.discard) @@ -326,8 +295,8 @@ class BaseDataCollector(ABC): except Exception as e: self._log_error(f"Failed to start data collector: {e}") - self.status = CollectorStatus.ERROR - self._should_be_running = False + self._state_telemetry.update_status(CollectorStatus.ERROR) + self._state_telemetry.set_should_be_running(False) return False async def stop(self, force: bool = False) -> None: @@ -337,17 +306,17 @@ class BaseDataCollector(ABC): Args: force: Force stop even if not graceful """ - if self.status == CollectorStatus.STOPPED: + if self._state_telemetry.status == CollectorStatus.STOPPED: self._log_warning("Data collector is already stopped") return self._log_info(f"Stopping {self.exchange_name} data collector") - self.status = CollectorStatus.STOPPING - self._should_be_running = False + self._state_telemetry.update_status(CollectorStatus.STOPPING) + self._state_telemetry.set_should_be_running(False) try: # Stop background tasks - self._running = False + self._state_telemetry.set_running_state(False) # Cancel all tasks for task in list(self._tasks): @@ -365,12 +334,12 @@ class BaseDataCollector(ABC): await self.unsubscribe_from_data(list(self.symbols), self.data_types) await self.disconnect() - self.status = CollectorStatus.STOPPED + self._state_telemetry.update_status(CollectorStatus.STOPPED) self._log_info(f"{self.exchange_name} data collector stopped") except Exception as e: self._log_error(f"Error stopping data collector: {e}") - self.status = CollectorStatus.ERROR + self._state_telemetry.update_status(CollectorStatus.ERROR) async def restart(self) -> bool: """ @@ -380,8 +349,7 @@ class BaseDataCollector(ABC): True if restarted successfully, False otherwise """ self._log_info(f"Restarting {self.exchange_name} data collector") - self._stats['restarts'] += 1 - self._stats['last_restart_time'] = datetime.now(timezone.utc) + self._state_telemetry.increment_restarts() # Stop first await self.stop() @@ -397,14 +365,13 @@ class BaseDataCollector(ABC): try: self._log_debug("Starting message processing loop") - while self._running: + while self._state_telemetry._running: try: await self._handle_messages() except asyncio.CancelledError: break except Exception as e: - self._stats['errors'] += 1 - self._stats['last_error'] = 
str(e) + self._state_telemetry.increment_errors(str(e)) self._log_error(f"Error processing messages: {e}") # Small delay to prevent tight error loops @@ -415,46 +382,44 @@ class BaseDataCollector(ABC): raise except Exception as e: self._log_error(f"Error in message loop: {e}") - self.status = CollectorStatus.ERROR + self._state_telemetry.update_status(CollectorStatus.ERROR) async def _health_monitor(self) -> None: """Monitor collector health and restart if needed.""" try: self._log_debug("Starting health monitor") - while self._running: + while self._state_telemetry._running: try: - await asyncio.sleep(self.health_check_interval) - - current_time = datetime.now(timezone.utc) + await asyncio.sleep(self._state_telemetry.health_check_interval) # Check if collector should be running but isn't - if self._should_be_running and self.status != CollectorStatus.RUNNING: + if self._state_telemetry._should_be_running and self._state_telemetry.status != CollectorStatus.RUNNING: self._log_warning("Collector should be running but isn't - restarting") if self.auto_restart: asyncio.create_task(self.restart()) continue # Check heartbeat - time_since_heartbeat = current_time - self._last_heartbeat - if time_since_heartbeat > timedelta(seconds=self.health_check_interval * 2): + time_since_heartbeat = datetime.now(timezone.utc) - self._state_telemetry._last_heartbeat + if time_since_heartbeat > timedelta(seconds=self._state_telemetry.health_check_interval * 2): self._log_warning(f"No heartbeat for {time_since_heartbeat.total_seconds():.1f}s - restarting") if self.auto_restart: asyncio.create_task(self.restart()) continue # Check data reception - if self._last_data_received: - time_since_data = current_time - self._last_data_received - if time_since_data > self._max_silence_duration: + if self._state_telemetry._last_data_received: + time_since_data = datetime.now(timezone.utc) - self._state_telemetry._last_data_received + if time_since_data > self._state_telemetry._max_silence_duration: self._log_warning(f"No data received for {time_since_data.total_seconds():.1f}s - restarting") if self.auto_restart: asyncio.create_task(self.restart()) continue # Check for error status - if self.status == CollectorStatus.ERROR: - self._log_warning(f"Collector in {self.status.value} status - restarting") + if self._state_telemetry.status == CollectorStatus.ERROR: + self._log_warning(f"Collector in {self._state_telemetry.status.value} status - restarting") if self.auto_restart: asyncio.create_task(self.restart()) @@ -486,11 +451,11 @@ class BaseDataCollector(ABC): if self._reconnect_attempts > self._max_reconnect_attempts: self._log_error(f"Max reconnection attempts ({self._max_reconnect_attempts}) exceeded") - self.status = CollectorStatus.ERROR - self._should_be_running = False + self._state_telemetry.update_status(CollectorStatus.ERROR) + self._state_telemetry.set_should_be_running(False) return False - self.status = CollectorStatus.RECONNECTING + self._state_telemetry.update_status(CollectorStatus.RECONNECTING) self._log_warning(f"Connection lost. 
Attempting reconnection {self._reconnect_attempts}/{self._max_reconnect_attempts}") # Disconnect and wait before retrying @@ -502,7 +467,7 @@ class BaseDataCollector(ABC): if await self.connect(): if await self.subscribe_to_data(list(self.symbols), self.data_types): self._log_info("Reconnection successful") - self.status = CollectorStatus.RUNNING + self._state_telemetry.update_status(CollectorStatus.RUNNING) self._reconnect_attempts = 0 return True @@ -556,10 +521,10 @@ class BaseDataCollector(ABC): self._log_error(f"Error in data callback: {e}") # Update statistics - self._stats['messages_processed'] += 1 - self._stats['last_message_time'] = data_point.timestamp - self._last_data_received = datetime.now(timezone.utc) - self._last_heartbeat = datetime.now(timezone.utc) + self._state_telemetry.increment_messages_processed() + self._state_telemetry._stats['last_message_time'] = data_point.timestamp # Direct update for now, will refactor + self._state_telemetry.update_data_received_timestamp() + self._state_telemetry.update_heartbeat() def get_status(self) -> Dict[str, Any]: """ @@ -568,37 +533,7 @@ class BaseDataCollector(ABC): Returns: Dictionary containing status information """ - uptime_seconds = None - if self._stats['connection_uptime']: - uptime_seconds = (datetime.now(timezone.utc) - self._stats['connection_uptime']).total_seconds() - - time_since_heartbeat = None - if self._last_heartbeat: - time_since_heartbeat = (datetime.now(timezone.utc) - self._last_heartbeat).total_seconds() - - time_since_data = None - if self._last_data_received: - time_since_data = (datetime.now(timezone.utc) - self._last_data_received).total_seconds() - - return { - 'exchange': self.exchange_name, - 'status': self.status.value, - 'should_be_running': self._should_be_running, - 'symbols': list(self.symbols), - 'data_types': [dt.value for dt in self.data_types], - 'timeframes': self.timeframes, - 'auto_restart': self.auto_restart, - 'health': { - 'time_since_heartbeat': time_since_heartbeat, - 'time_since_data': time_since_data, - 'max_silence_duration': self._max_silence_duration.total_seconds() - }, - 'statistics': { - **self._stats, - 'uptime_seconds': uptime_seconds, - 'reconnect_attempts': self._reconnect_attempts - } - } + return self._state_telemetry.get_status() def get_health_status(self) -> Dict[str, Any]: """ @@ -607,46 +542,7 @@ class BaseDataCollector(ABC): Returns: Dictionary containing health information """ - now = datetime.now(timezone.utc) - - is_healthy = True - health_issues = [] - - # Check if should be running but isn't - if self._should_be_running and not self._running: - is_healthy = False - health_issues.append("Should be running but is stopped") - - # Check heartbeat - if self._last_heartbeat: - time_since_heartbeat = now - self._last_heartbeat - if time_since_heartbeat > timedelta(seconds=self.health_check_interval * 2): - is_healthy = False - health_issues.append(f"No heartbeat for {time_since_heartbeat.total_seconds():.1f}s") - - # Check data freshness - if self._last_data_received: - time_since_data = now - self._last_data_received - if time_since_data > self._max_silence_duration: - is_healthy = False - health_issues.append(f"No data for {time_since_data.total_seconds():.1f}s") - - # Check status - if self.status in [CollectorStatus.ERROR, CollectorStatus.UNHEALTHY]: - is_healthy = False - health_issues.append(f"Status: {self.status.value}") - - return { - 'is_healthy': is_healthy, - 'issues': health_issues, - 'status': self.status.value, - 'last_heartbeat': 
self._last_heartbeat.isoformat() if self._last_heartbeat else None, - 'last_data_received': self._last_data_received.isoformat() if self._last_data_received else None, - 'should_be_running': self._should_be_running, - 'is_running': self._running, - 'timeframes': self.timeframes, - 'data_types': [dt.value for dt in self.data_types] - } + return self._state_telemetry.get_health_status() def add_symbol(self, symbol: str) -> None: """ @@ -660,7 +556,7 @@ class BaseDataCollector(ABC): self._log_info(f"Added symbol: {symbol}") # If collector is running, subscribe to new symbol - if self.status == CollectorStatus.RUNNING: + if self._state_telemetry.status == CollectorStatus.RUNNING: # Note: This needs to be called from an async context # Users should handle this appropriately pass @@ -677,7 +573,7 @@ class BaseDataCollector(ABC): self._log_info(f"Removed symbol: {symbol}") # If collector is running, unsubscribe from symbol - if self.status == CollectorStatus.RUNNING: + if self._state_telemetry.status == CollectorStatus.RUNNING: # Note: This needs to be called from an async context # Users should handle this appropriately pass diff --git a/data/collector/collector_state_telemetry.py b/data/collector/collector_state_telemetry.py new file mode 100644 index 0000000..b485ffa --- /dev/null +++ b/data/collector/collector_state_telemetry.py @@ -0,0 +1,223 @@ +""" +Module for managing collector status, health, and telemetry. + +This module provides a dedicated class to encapsulate the status, +health checks, and statistical data for a data collector, promoting +modularity and separation of concerns. +""" + +from datetime import datetime, timezone, timedelta +from enum import Enum +from typing import Any, Dict, Optional + + +class CollectorStatus(Enum): + """Status of the data collector.""" + STOPPED = "stopped" + STARTING = "starting" + RUNNING = "running" + STOPPING = "stopping" + ERROR = "error" + RECONNECTING = "reconnecting" + UNHEALTHY = "unhealthy" # Added for health monitoring + + +class CollectorStateAndTelemetry: + """ + Manages the operational state, health, and performance statistics + of a data collector. 
+ """ + + def __init__(self, + exchange_name: str, + component_name: str, + health_check_interval: float = 30.0, + max_silence_duration: timedelta = timedelta(minutes=5), + logger=None, + log_errors_only: bool = False): + self.exchange_name = exchange_name + self.component_name = component_name + self.health_check_interval = health_check_interval + self._max_silence_duration = max_silence_duration + self.logger = logger + self.log_errors_only = log_errors_only + + # Collector state + self.status = CollectorStatus.STOPPED + self._running = False + self._should_be_running = False # Track desired state + + # Health monitoring + self._last_heartbeat = datetime.now(timezone.utc) + self._last_data_received = None + + # Statistics + self._stats = { + 'messages_received': 0, + 'messages_processed': 0, + 'errors': 0, + 'restarts': 0, + 'last_message_time': None, + 'connection_uptime': None, + 'last_error': None, + 'last_restart_time': None + } + + def _log_debug(self, message: str) -> None: + """Log debug message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.debug(message) + + def _log_info(self, message: str) -> None: + """Log info message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.info(message) + + def _log_warning(self, message: str) -> None: + """Log warning message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.warning(message) + + def _log_error(self, message: str, exc_info: bool = False) -> None: + """Log error message if logger is available (always logs errors regardless of log_errors_only).""" + if self.logger: + self.logger.error(message, exc_info=exc_info) + + def _log_critical(self, message: str, exc_info: bool = False) -> None: + """Log critical message if logger is available (always logs critical regardless of log_errors_only).""" + if self.logger: + self.logger.critical(message, exc_info=exc_info) + + def update_status(self, new_status: CollectorStatus) -> None: + """Update the collector's operational status.""" + self.status = new_status + self._log_debug(f"Collector status updated to: {new_status.value}") + + def set_running_state(self, is_running: bool) -> None: + """Set the internal running state.""" + self._running = is_running + self._log_debug(f"Collector internal running state set to: {is_running}") + + def set_should_be_running(self, should_run: bool) -> None: + """Set the desired running state.""" + self._should_be_running = should_run + self._log_debug(f"Collector desired running state set to: {should_run}") + + def update_heartbeat(self) -> None: + """Update the last heartbeat timestamp.""" + self._last_heartbeat = datetime.now(timezone.utc) + self._log_debug("Heartbeat updated") + + def update_data_received_timestamp(self) -> None: + """Update the last data received timestamp.""" + self._last_data_received = datetime.now(timezone.utc) + self._log_debug("Last data received timestamp updated") + + def increment_messages_received(self) -> None: + """Increment the count of messages received.""" + self._stats['messages_received'] += 1 + self._log_debug(f"Messages received: {self._stats['messages_received']}") + + def increment_messages_processed(self) -> None: + """Increment the count of messages processed.""" + self._stats['messages_processed'] += 1 + self._log_debug(f"Messages processed: {self._stats['messages_processed']}") + + def increment_errors(self, 
error_message: str) -> None: + """Increment error count and store the last error message.""" + self._stats['errors'] += 1 + self._stats['last_error'] = error_message + self._log_error(f"Error count: {self._stats['errors']}, Last error: {error_message}") + + def increment_restarts(self) -> None: + """Increment restart count and update last restart time.""" + self._stats['restarts'] += 1 + self._stats['last_restart_time'] = datetime.now(timezone.utc) + self._log_info(f"Collector restarts: {self._stats['restarts']}") + + def set_connection_uptime_start(self) -> None: + """Set the connection uptime start time.""" + self._stats['connection_uptime'] = datetime.now(timezone.utc) + self._log_debug("Connection uptime start set") + + def get_status(self) -> Dict[str, Any]: + """ + Get current collector status and statistics. + + Returns: + Dictionary containing status information + """ + uptime_seconds = None + if self._stats['connection_uptime']: + uptime_seconds = (datetime.now(timezone.utc) - self._stats['connection_uptime']).total_seconds() + + time_since_heartbeat = None + if self._last_heartbeat: + time_since_heartbeat = (datetime.now(timezone.utc) - self._last_heartbeat).total_seconds() + + time_since_data = None + if self._last_data_received: + time_since_data = (datetime.now(timezone.utc) - self._last_data_received).total_seconds() + + return { + 'exchange': self.exchange_name, + 'status': self.status.value, + 'should_be_running': self._should_be_running, + 'auto_restart': True, # This will be managed by the base collector, not state + 'health': { + 'time_since_heartbeat': time_since_heartbeat, + 'time_since_data': time_since_data, + 'max_silence_duration': self._max_silence_duration.total_seconds() + }, + 'statistics': { + **self._stats, + 'uptime_seconds': uptime_seconds, + 'reconnect_attempts': 0 # This will be managed by connection manager + } + } + + def get_health_status(self) -> Dict[str, Any]: + """ + Get detailed health status for monitoring. 
+ + Returns: + Dictionary containing health information + """ + now = datetime.now(timezone.utc) + + is_healthy = True + health_issues = [] + + # Check if should be running but isn't + if self._should_be_running and not self._running: + is_healthy = False + health_issues.append("Should be running but is stopped") + + # Check heartbeat + if self._last_heartbeat: + time_since_heartbeat = now - self._last_heartbeat + if time_since_heartbeat > timedelta(seconds=self.health_check_interval * 2): + is_healthy = False + health_issues.append(f"No heartbeat for {time_since_heartbeat.total_seconds():.1f}s") + + # Check data freshness + if self._last_data_received: + time_since_data = now - self._last_data_received + if time_since_data > self._max_silence_duration: + is_healthy = False + health_issues.append(f"No data for {time_since_data.total_seconds():.1f}s") + + # Check for error status + if self.status in [CollectorStatus.ERROR, CollectorStatus.UNHEALTHY]: + is_healthy = False + health_issues.append(f"Status: {self.status.value}") + + return { + 'is_healthy': is_healthy, + 'issues': health_issues, + 'status': self.status.value, + 'last_heartbeat': self._last_heartbeat.isoformat() if self._last_heartbeat else None, + 'last_data_received': self._last_data_received.isoformat() if self._last_data_received else None, + 'should_be_running': self._should_be_running, + 'is_running': self._running + } \ No newline at end of file diff --git a/data/collector_manager.py b/data/collector_manager.py index c5e2e4e..07043ec 100644 --- a/data/collector_manager.py +++ b/data/collector_manager.py @@ -154,7 +154,7 @@ class CollectorManager: symbols=list(collector.symbols), data_types=[dt.value for dt in collector.data_types], auto_restart=collector.auto_restart, - health_check_interval=collector.health_check_interval + health_check_interval=collector._state_telemetry.health_check_interval ) self._collectors[collector_name] = collector diff --git a/data/exchanges/okx/collector.py b/data/exchanges/okx/collector.py index 14df8d1..ad61f3a 100644 --- a/data/exchanges/okx/collector.py +++ b/data/exchanges/okx/collector.py @@ -132,10 +132,9 @@ class OKXCollector(BaseDataCollector): DataType.TICKER: OKXChannelType.TICKERS.value } - if logger: - logger.info(f"{component_name}: Initialized OKX collector for {symbol} with data types: {[dt.value for dt in data_types]}") - logger.info(f"{component_name}: Using timeframes: {self.timeframes}") - logger.info(f"{component_name}: Using common data processing framework") + self._log_info(f"{component_name}: Initialized OKX collector for {symbol} with data types: {[dt.value for dt in data_types]}") + self._log_info(f"{component_name}: Using timeframes: {self.timeframes}") + self._log_info(f"{component_name}: Using common data processing framework") async def connect(self) -> bool: """ @@ -145,11 +144,10 @@ class OKXCollector(BaseDataCollector): True if connection successful, False otherwise """ try: - if self.logger: - self.logger.info(f"{self.component_name}: Connecting OKX collector for {self.symbol}") + self._log_info(f"Connecting OKX collector for {self.symbol}") # Initialize database operations using repository pattern - self._db_operations = get_database_operations(self.logger) + self._db_operations = get_database_operations(self.logger) # self.logger needs to be accessible for database operations # Create WebSocket client ws_component_name = f"okx_ws_{self.symbol.replace('-', '_').lower()}" @@ -167,35 +165,31 @@ class OKXCollector(BaseDataCollector): # Connect to WebSocket if 
not await self._ws_client.connect(use_public=True): - if self.logger: - self.logger.error(f"{self.component_name}: Failed to connect to OKX WebSocket") + self._log_error(f"Failed to connect to OKX WebSocket") return False - if self.logger: - self.logger.info(f"{self.component_name}: Successfully connected OKX collector for {self.symbol}") + self._log_info(f"Successfully connected OKX collector for {self.symbol}") return True except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error connecting OKX collector for {self.symbol}: {e}") + self._log_error(f"Error connecting OKX collector for {self.symbol}: {e}") return False async def disconnect(self) -> None: - """Disconnect from OKX WebSocket API.""" + """ + Disconnect from OKX WebSocket API. + """ try: - if self.logger: - self.logger.info(f"{self.component_name}: Disconnecting OKX collector for {self.symbol}") + self._log_info(f"Disconnecting OKX collector for {self.symbol}") if self._ws_client: await self._ws_client.disconnect() self._ws_client = None - if self.logger: - self.logger.info(f"{self.component_name}: Disconnected OKX collector for {self.symbol}") + self._log_info(f"Disconnected OKX collector for {self.symbol}") except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error disconnecting OKX collector for {self.symbol}: {e}") + self._log_error(f"Error disconnecting OKX collector for {self.symbol}: {e}") async def subscribe_to_data(self, symbols: List[str], data_types: List[DataType]) -> bool: """ @@ -209,14 +203,12 @@ class OKXCollector(BaseDataCollector): True if subscription successful, False otherwise """ if not self._ws_client or not self._ws_client.is_connected: - if self.logger: - self.logger.error(f"{self.component_name}: WebSocket client not connected") + self._log_error(f"WebSocket client not connected") return False # Validate symbol if self.symbol not in symbols: - if self.logger: - self.logger.warning(f"{self.component_name}: Symbol {self.symbol} not in subscription list: {symbols}") + self._log_warning(f"Symbol {self.symbol} not in subscription list: {symbols}") return False try: @@ -231,31 +223,25 @@ class OKXCollector(BaseDataCollector): enabled=True ) subscriptions.append(subscription) - if self.logger: - self.logger.debug(f"{self.component_name}: Added subscription: {channel} for {self.symbol}") + self._log_debug(f"Added subscription: {channel} for {self.symbol}") else: - if self.logger: - self.logger.warning(f"{self.component_name}: Unsupported data type: {data_type}") + self._log_warning(f"Unsupported data type: {data_type}") if not subscriptions: - if self.logger: - self.logger.warning(f"{self.component_name}: No valid subscriptions to create") + self._log_warning(f"No valid subscriptions to create") return False # Subscribe to channels success = await self._ws_client.subscribe(subscriptions) if success: - if self.logger: - self.logger.info(f"{self.component_name}: Successfully subscribed to {len(subscriptions)} channels for {self.symbol}") + self._log_info(f"Successfully subscribed to {len(subscriptions)} channels for {self.symbol}") return True else: - if self.logger: - self.logger.error(f"{self.component_name}: Failed to subscribe to channels for {self.symbol}") + self._log_error(f"Failed to subscribe to channels for {self.symbol}") return False except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error subscribing to data for {self.symbol}: {e}") + self._log_error(f"Error subscribing to data for {self.symbol}: 
{e}") return False async def unsubscribe_from_data(self, symbols: List[str], data_types: List[DataType]) -> bool: @@ -270,8 +256,7 @@ class OKXCollector(BaseDataCollector): True if unsubscription successful, False otherwise """ if not self._ws_client or not self._ws_client.is_connected: - if self.logger: - self.logger.warning(f"{self.component_name}: WebSocket client not connected") + self._log_warning(f"WebSocket client not connected") return True # Consider it successful if not connected try: @@ -293,17 +278,14 @@ class OKXCollector(BaseDataCollector): # Unsubscribe from channels success = await self._ws_client.unsubscribe(subscriptions) if success: - if self.logger: - self.logger.info(f"{self.component_name}: Successfully unsubscribed from {len(subscriptions)} channels for {self.symbol}") + self._log_info(f"Successfully unsubscribed from {len(subscriptions)} channels for {self.symbol}") return True else: - if self.logger: - self.logger.error(f"{self.component_name}: Failed to unsubscribe from channels for {self.symbol}") + self._log_error(f"Failed to unsubscribe from channels for {self.symbol}") return False except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error unsubscribing from data for {self.symbol}: {e}") + self._log_error(f"Error unsubscribing from data for {self.symbol}: {e}") return False async def _process_message(self, message: Any) -> Optional[MarketDataPoint]: @@ -317,8 +299,7 @@ class OKXCollector(BaseDataCollector): MarketDataPoint if processing successful, None otherwise """ if not isinstance(message, dict): - if self.logger: - self.logger.warning(f"{self.component_name}: Received non-dict message: {type(message)}") + self._log_warning(f"Received non-dict message: {type(message)}") return None try: @@ -331,13 +312,11 @@ class OKXCollector(BaseDataCollector): if not success: self._error_count += 1 - if self.logger: - self.logger.error(f"{self.component_name}: Message processing failed: {errors}") + self._log_error(f"Message processing failed: {errors}") return None if errors: - if self.logger: - self.logger.warning(f"{self.component_name}: Message processing warnings: {errors}") + self._log_warning(f"Message processing warnings: {errors}") # Store raw data if enabled (for debugging/compliance) if self.store_raw_data: @@ -353,22 +332,23 @@ class OKXCollector(BaseDataCollector): except Exception as e: self._error_count += 1 - if self.logger: - self.logger.error(f"{self.component_name}: Error processing message: {e}") + self._log_error(f"Error processing message: {e}") return None async def _handle_messages(self) -> None: - """Handle message processing in the background.""" + """ + Handle message processing in the background. 
+ This method exists for compatibility with BaseDataCollector + """ # The new data processor handles messages through callbacks - # This method exists for compatibility with BaseDataCollector # Update heartbeat to indicate the message loop is active - self._last_heartbeat = datetime.now(timezone.utc) + self._state_telemetry.update_heartbeat() # Check if we're receiving WebSocket messages if self._ws_client and self._ws_client.is_connected: # Update last data received timestamp if WebSocket is connected and active - self._last_data_received = datetime.now(timezone.utc) + self._state_telemetry.update_data_received_timestamp() # Short sleep to prevent busy loop while maintaining heartbeat await asyncio.sleep(0.1) @@ -386,15 +366,13 @@ class OKXCollector(BaseDataCollector): # Store raw market data points in raw_trades table using repository success = self._db_operations.raw_trades.insert_market_data_point(data_point) - if success and self.logger: - self.logger.debug(f"{self.component_name}: Stored raw data: {data_point.data_type.value} for {data_point.symbol}") + if success: + self._log_debug(f"Stored raw data: {data_point.data_type.value} for {data_point.symbol}") except DatabaseOperationError as e: - if self.logger: - self.logger.error(f"{self.component_name}: Database error storing raw market data: {e}") + self._log_error(f"Database error storing raw market data: {e}") except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error storing raw market data: {e}") + self._log_error(f"Error storing raw market data: {e}") async def _store_completed_candle(self, candle: OHLCVCandle) -> None: """ @@ -414,21 +392,19 @@ class OKXCollector(BaseDataCollector): # Store completed candles using repository pattern success = self._db_operations.market_data.upsert_candle(candle, self.force_update_candles) - if success and self.logger: + if success: action = "Updated" if self.force_update_candles else "Stored" - self.logger.debug(f"{self.component_name}: {action} candle: {candle.symbol} {candle.timeframe} at {candle.end_time} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}") + self._log_debug(f"{action} candle: {candle.symbol} {candle.timeframe} at {candle.end_time} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}") except DatabaseOperationError as e: - if self.logger: - self.logger.error(f"{self.component_name}: Database error storing completed candle: {e}") - # Log candle details for debugging - self.logger.error(f"{self.component_name}: Failed candle details: {candle.symbol} {candle.timeframe} {candle.end_time} - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}") + self._log_error(f"Database error storing completed candle: {e}") + # Log candle details for debugging + self._log_error(f"Failed candle details: {candle.symbol} {candle.timeframe} {candle.end_time} - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}") self._error_count += 1 except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error storing completed candle: {e}") - # Log candle details for debugging - self.logger.error(f"{self.component_name}: Failed candle details: {candle.symbol} {candle.timeframe} {candle.end_time} - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}") + self._log_error(f"Error storing completed candle: 
{e}") + # Log candle details for debugging + self._log_error(f"Failed candle details: {candle.symbol} {candle.timeframe} {candle.end_time} - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}") self._error_count += 1 async def _store_raw_data(self, channel: str, raw_message: Dict[str, Any]) -> None: @@ -452,15 +428,13 @@ class OKXCollector(BaseDataCollector): raw_data=data_item, timestamp=datetime.now(timezone.utc) ) - if not success and self.logger: - self.logger.warning(f"{self.component_name}: Failed to store raw WebSocket data for {channel}") + if not success: + self._log_warning(f"Failed to store raw WebSocket data for {channel}") except DatabaseOperationError as e: - if self.logger: - self.logger.error(f"{self.component_name}: Database error storing raw WebSocket data: {e}") + self._log_error(f"Database error storing raw WebSocket data: {e}") except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error storing raw WebSocket data: {e}") + self._log_error(f"Error storing raw WebSocket data: {e}") def _on_message(self, message: Dict[str, Any]) -> None: """ @@ -471,16 +445,14 @@ class OKXCollector(BaseDataCollector): """ try: # Update heartbeat and data received timestamps - current_time = datetime.now(timezone.utc) - self._last_heartbeat = current_time - self._last_data_received = current_time + self._state_telemetry.update_heartbeat() + self._state_telemetry.update_data_received_timestamp() self._message_count += 1 # Process message asynchronously asyncio.create_task(self._process_message(message)) except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error handling WebSocket message: {e}") + self._log_error(f"Error handling WebSocket message: {e}") def _on_trade_processed(self, trade: StandardizedTrade) -> None: """ @@ -490,8 +462,7 @@ class OKXCollector(BaseDataCollector): trade: Processed standardized trade """ self._processed_trades += 1 - if self.logger: - self.logger.debug(f"{self.component_name}: Processed trade: {trade.symbol} {trade.side} {trade.size}@{trade.price}") + self._log_debug(f"Processed trade: {trade.symbol} {trade.side} {trade.size}@{trade.price}") def _on_candle_processed(self, candle: OHLCVCandle) -> None: """ @@ -501,8 +472,7 @@ class OKXCollector(BaseDataCollector): candle: Completed OHLCV candle """ self._processed_candles += 1 - if self.logger: - self.logger.debug(f"{self.component_name}: Completed candle: {candle.symbol} {candle.timeframe} O:{candle.open} H:{candle.high} L:{candle.low} C:{candle.close} V:{candle.volume}") + self._log_debug(f"Processed candle: {candle.symbol} {candle.timeframe}") # Store completed candle in market_data table if candle.is_complete: @@ -510,41 +480,22 @@ class OKXCollector(BaseDataCollector): def get_status(self) -> Dict[str, Any]: """ - Get current collector status including processing statistics. + Get current collector status and statistics. 
Returns: - Dictionary containing collector status information + Dictionary containing status information """ - base_status = super().get_status() - - # Add OKX-specific status - okx_status = { - "symbol": self.symbol, - "websocket_connected": self._ws_client.is_connected if self._ws_client else False, - "websocket_state": self._ws_client.connection_state.value if self._ws_client else "disconnected", - "store_raw_data": self.store_raw_data, - "force_update_candles": self.force_update_candles, - "timeframes": self.timeframes, - "processing_stats": { - "messages_received": self._message_count, - "trades_processed": self._processed_trades, - "candles_processed": self._processed_candles, - "errors": self._error_count - } - } - - # Add data processor statistics - if self._data_processor: - okx_status["data_processor_stats"] = self._data_processor.get_processing_stats() - - # Add WebSocket statistics - if self._ws_client: - okx_status["websocket_stats"] = self._ws_client.get_stats() - - # Merge with base status - base_status.update(okx_status) - return base_status + return self._state_telemetry.get_status() + def get_health_status(self) -> Dict[str, Any]: + """ + Get detailed health status for monitoring. + + Returns: + Dictionary containing health information + """ + return self._state_telemetry.get_health_status() + def __repr__(self) -> str: """String representation of the collector.""" - return f"OKXCollector(symbol='{self.symbol}', status='{self.status.value}', data_types={[dt.value for dt in self.data_types]})" \ No newline at end of file + return f"<{self.__class__.__name__}({self.exchange_name}, {len(self.symbols)} symbols, {self._state_telemetry.status.value})>" \ No newline at end of file diff --git a/tasks/tasks-base-collector-refactoring.md b/tasks/tasks-base-collector-refactoring.md new file mode 100644 index 0000000..bf32e71 --- /dev/null +++ b/tasks/tasks-base-collector-refactoring.md @@ -0,0 +1,66 @@ +## Relevant Files + +- `data/base_collector.py` - The main file to be refactored, where `BaseDataCollector` is defined. +- `data/collector/collector_state_telemetry.py` - New file for managing collector status, health, and statistics. +- `data/collector/collector_connection_manager.py` - New file for handling connection, disconnection, and reconnection logic. +- `data/collector/collector_callback_dispatcher.py` - New file for managing data callbacks and notifications. +- `data/ohlcv_data.py` - Potential new file for `OHLCVData` and related validation if deemed beneficial. +- `tests/data/test_base_collector.py` - Existing test file for `BaseDataCollector`. +- `tests/data/collector/test_collector_state_telemetry.py` - New test file for `CollectorStateAndTelemetry` class. +- `tests/data/collector/test_collector_connection_manager.py` - New test file for `ConnectionManager` class. +- `tests/data/collector/test_collector_callback_dispatcher.py` - New test file for `CallbackDispatcher` class. +- `tests/data/test_ohlcv_data.py` - New test file for `OHLCVData` and validation. + +### Notes + +- Unit tests should typically be placed alongside the code files they are testing (e.g., `MyComponent.tsx` and `MyComponent.test.tsx` in the same directory). +- Each refactoring step will be small and verified with existing tests, and new tests will be created for extracted components. + +## Tasks + +- [ ] 0.0 Create `data/collector` directory +- [x] 1.0 Extract `CollectorStateAndTelemetry` Class + - [x] 1.1 Create `data/collector/collector_state_telemetry.py`. 
+ - [x] 1.2 Move `CollectorStatus` enum to `data/collector/collector_state_telemetry.py`. + - [x] 1.3 Move `_stats` initialization and related helper methods (`_log_debug`, `_log_info`, `_log_warning`, `_log_error`, `_log_critical`) to `CollectorStateAndTelemetry`. + - [x] 1.4 Move `get_status` and `get_health_status` methods to `CollectorStateAndTelemetry`. + - [x] 1.5 Implement a constructor for `CollectorStateAndTelemetry` to receive logger and initial parameters. + - [x] 1.6 Add necessary imports to both `data/base_collector.py` and `data/collector/collector_state_telemetry.py`. + - [x] 1.7 Create `tests/data/collector/test_collector_state_telemetry.py` and add initial tests for the new class. + +- [ ] 2.0 Extract `ConnectionManager` Class + - [ ] 2.1 Create `data/collector/collector_connection_manager.py`. + - [ ] 2.2 Move connection-related attributes (`_connection`, `_reconnect_attempts`, `_max_reconnect_attempts`, `_reconnect_delay`) to `ConnectionManager`. + - [ ] 2.3 Move `connect`, `disconnect`, `_handle_connection_error` methods to `ConnectionManager`. + - [ ] 2.4 Implement a constructor for `ConnectionManager` to receive logger and other necessary parameters. + - [ ] 2.5 Add necessary imports to both `data/base_collector.py` and `data/collector/collector_connection_manager.py`. + - [ ] 2.6 Create `tests/data/collector/test_collector_connection_manager.py` and add initial tests for the new class. + +- [ ] 3.0 Extract `CallbackDispatcher` Class + - [ ] 3.1 Create `data/collector/collector_callback_dispatcher.py`. + - [ ] 3.2 Move `_data_callbacks` attribute to `CallbackDispatcher`. + - [ ] 3.3 Move `add_data_callback`, `remove_data_callback`, `_notify_callbacks` methods to `CallbackDispatcher`. + - [ ] 3.4 Implement a constructor for `CallbackDispatcher` to receive logger. + - [ ] 3.5 Add necessary imports to both `data/base_collector.py` and `data/collector/collector_callback_dispatcher.py`. + - [ ] 3.6 Create `tests/data/collector/test_collector_callback_dispatcher.py` and add initial tests for the new class. + +- [ ] 4.0 Refactor `BaseDataCollector` to use new components + - [ ] 4.1 Update `BaseDataCollector.__init__` to instantiate and use `CollectorStateAndTelemetry`, `ConnectionManager`, and `CallbackDispatcher` instances. + - [ ] 4.2 Replace direct access to moved attributes/methods with calls to the new component instances (e.g., `self.logger.info` becomes `self._state_telemetry.log_info`). + - [ ] 4.3 Modify `start`, `stop`, `restart`, `_message_loop`, `_health_monitor` to interact with the new components, delegating responsibilities appropriately. + - [ ] 4.4 Update `get_status` and `get_health_status` in `BaseDataCollector` to delegate to `CollectorStateAndTelemetry`. + - [ ] 4.5 Review and update abstract methods and their calls as needed, ensuring they interact correctly with the new components. + - [ ] 4.6 Ensure all existing tests for `BaseDataCollector` still pass after refactoring. + - [ ] 4.7 Update `data/exchanges/okx/collector.py` to use the new `CollectorStateAndTelemetry` and `ConnectionManager` classes for logging, status updates, and connection handling. + - [ ] 4.8 Update `data/collector_manager.py` to interact with the new `CollectorStateAndTelemetry` class for health checks and status retrieval from `BaseDataCollector` instances. + +- [ ] 5.0 Review and potentially extract `OHLCVData` and related validation + - [ ] 5.1 Analyze if `OHLCVData` and `validate_ohlcv_data` are frequently used outside of `data/base_collector.py`. 
+ - [ ] 5.2 If analysis indicates external usage or clear separation benefits, move `OHLCVData` class and `DataValidationError` to a new `data/ohlcv_data.py` file. + - [ ] 5.3 Update imports in `data/base_collector.py` and any other affected files. + - [ ] 5.4 If `OHLCVData` is extracted, create `tests/data/test_ohlcv_data.py` with tests for its structure and validation logic. + +- [ ] 6.0 Update Module Imports + - [ ] 6.1 Update imports in `data/__init__.py` to reflect the new locations of `CollectorStatus`, `DataCollectorError`, `DataValidationError`, `DataType`, `MarketDataPoint`, and `OHLCVData` (if moved). + - [ ] 6.2 Update imports in `data/common/data_types.py` for `DataType` and `MarketDataPoint`. + - [ ] 6.3 Review and update imports in all test files (`tests/test_refactored_okx.py`, `tests/test_real_storage.py`, `tests/test_okx_collector.py`, `tests/test_exchange_factory.py`, `tests/test_data_collection_aggregation.py`, `tests/test_collector_manager.py`, `tests/test_base_collector.py`, `tests/database/test_database_operations.py`) and scripts (`scripts/production_clean.py`) that import directly from `data.base_collector`. \ No newline at end of file diff --git a/tests/data/collector/test_collector_state_telemetry.py b/tests/data/collector/test_collector_state_telemetry.py new file mode 100644 index 0000000..6d6dc24 --- /dev/null +++ b/tests/data/collector/test_collector_state_telemetry.py @@ -0,0 +1,185 @@ +import pytest +import asyncio +from datetime import datetime, timedelta, timezone +from unittest.mock import Mock + +from data.collector.collector_state_telemetry import CollectorStatus, CollectorStateAndTelemetry + +class TestCollectorStateAndTelemetry: + @pytest.fixture + def mock_logger(self): + return Mock() + + @pytest.fixture + def telemetry(self, mock_logger): + return CollectorStateAndTelemetry( + exchange_name="test_exchange", + component_name="test_collector", + health_check_interval=30.0, + max_silence_duration=timedelta(minutes=5), + logger=mock_logger, + log_errors_only=False + ) + + def test_initial_state(self, telemetry): + assert telemetry.status == CollectorStatus.STOPPED + assert telemetry._running is False + assert telemetry._should_be_running is False + assert telemetry._stats['messages_received'] == 0 + assert telemetry._stats['errors'] == 0 + assert telemetry._last_heartbeat is not None + assert telemetry._last_data_received is None + + def test_update_status(self, telemetry): + telemetry.update_status(CollectorStatus.RUNNING) + assert telemetry.status == CollectorStatus.RUNNING + telemetry.logger.debug.assert_called_with("Collector status updated to: running") + + def test_set_running_state(self, telemetry): + telemetry.set_running_state(True) + assert telemetry._running is True + telemetry.logger.debug.assert_called_with("Collector internal running state set to: True") + + def test_set_should_be_running(self, telemetry): + telemetry.set_should_be_running(True) + assert telemetry._should_be_running is True + telemetry.logger.debug.assert_called_with("Collector desired running state set to: True") + + def test_update_heartbeat(self, telemetry): + old_heartbeat = telemetry._last_heartbeat + telemetry.update_heartbeat() + assert telemetry._last_heartbeat >= old_heartbeat + assert telemetry._last_heartbeat.date() == datetime.now(timezone.utc).date() + telemetry.logger.debug.assert_called_with("Heartbeat updated") + + def test_update_data_received_timestamp(self, telemetry): + old_timestamp = telemetry._last_data_received + 
telemetry.update_data_received_timestamp() + assert telemetry._last_data_received is not None + assert telemetry._last_data_received >= (old_timestamp if old_timestamp else datetime.min.replace(tzinfo=timezone.utc)) + telemetry.logger.debug.assert_called_with("Last data received timestamp updated") + + def test_increment_messages_received(self, telemetry): + telemetry.increment_messages_received() + assert telemetry._stats['messages_received'] == 1 + telemetry.logger.debug.assert_called_with("Messages received: 1") + + def test_increment_messages_processed(self, telemetry): + telemetry.increment_messages_processed() + assert telemetry._stats['messages_processed'] == 1 + telemetry.logger.debug.assert_called_with("Messages processed: 1") + + def test_increment_errors(self, telemetry): + error_msg = "Test Error" + telemetry.increment_errors(error_msg) + assert telemetry._stats['errors'] == 1 + assert telemetry._stats['last_error'] == error_msg + telemetry.logger.error.assert_called_with(f"Error count: 1, Last error: {error_msg}", exc_info=False) + + def test_increment_restarts(self, telemetry): + telemetry.increment_restarts() + assert telemetry._stats['restarts'] == 1 + assert telemetry._stats['last_restart_time'] is not None + telemetry.logger.info.assert_called_with("Collector restarts: 1") + + def test_set_connection_uptime_start(self, telemetry): + telemetry.set_connection_uptime_start() + assert telemetry._stats['connection_uptime'] is not None + telemetry.logger.debug.assert_called_with("Connection uptime start set") + + def test_get_status(self, telemetry): + telemetry.set_connection_uptime_start() + telemetry.update_heartbeat() + telemetry.update_data_received_timestamp() + telemetry.update_status(CollectorStatus.RUNNING) + telemetry.set_should_be_running(True) + telemetry.set_running_state(True) + + status = telemetry.get_status() + assert status['exchange'] == "test_exchange" + assert status['status'] == "running" + assert status['should_be_running'] is True + assert status['auto_restart'] is True + assert status['health']['time_since_heartbeat'] is not None + assert status['health']['time_since_data'] is not None + assert status['statistics']['uptime_seconds'] is not None + assert status['statistics']['reconnect_attempts'] == 0 + + def test_get_health_status_healthy(self, telemetry): + telemetry.set_running_state(True) + telemetry.set_should_be_running(True) + telemetry.update_status(CollectorStatus.RUNNING) + telemetry.update_heartbeat() + telemetry.update_data_received_timestamp() + + health = telemetry.get_health_status() + assert health['is_healthy'] is True + assert len(health['issues']) == 0 + assert health['status'] == "running" + + @pytest.mark.asyncio + async def test_get_health_status_unhealthy_no_heartbeat(self, telemetry): + telemetry.set_running_state(True) + telemetry.set_should_be_running(True) + telemetry.update_status(CollectorStatus.RUNNING) + + # Simulate no heartbeat for a long time + telemetry._last_heartbeat = datetime.now(timezone.utc) - timedelta(seconds=telemetry.health_check_interval * 3) + + health = telemetry.get_health_status() + assert health['is_healthy'] is False + assert "No heartbeat for" in health['issues'][0] + + @pytest.mark.asyncio + async def test_get_health_status_unhealthy_no_data(self, telemetry): + telemetry.set_running_state(True) + telemetry.set_should_be_running(True) + telemetry.update_status(CollectorStatus.RUNNING) + telemetry.update_heartbeat() + + # Simulate no data for a long time + telemetry._last_data_received = 
datetime.now(timezone.utc) - (telemetry._max_silence_duration + timedelta(minutes=1)) + + health = telemetry.get_health_status() + assert health['is_healthy'] is False + assert "No data for" in health['issues'][0] + + def test_get_health_status_error_status(self, telemetry): + telemetry.update_status(CollectorStatus.ERROR) + health = telemetry.get_health_status() + assert health['is_healthy'] is False + assert health['issues'][0] == "Status: error" + + def test_logging_methods_no_logger(self): + telemetry_no_logger = CollectorStateAndTelemetry( + exchange_name="test", + component_name="test", + logger=None + ) + # Should not raise an error, calls should just be no-ops + telemetry_no_logger._log_debug("test") + telemetry_no_logger._log_info("test") + telemetry_no_logger._log_warning("test") + telemetry_no_logger._log_error("test") + telemetry_no_logger._log_critical("test") + + def test_logging_methods_log_errors_only(self, mock_logger): + telemetry_errors_only = CollectorStateAndTelemetry( + exchange_name="test", + component_name="test", + logger=mock_logger, + log_errors_only=True + ) + telemetry_errors_only._log_debug("debug msg") + telemetry_errors_only._log_info("info msg") + telemetry_errors_only._log_warning("warning msg") + + mock_logger.debug.assert_not_called() + mock_logger.info.assert_not_called() + mock_logger.warning.assert_not_called() + + telemetry_errors_only._log_error("error msg") + telemetry_errors_only._log_critical("critical msg") + + mock_logger.error.assert_called_once_with("error msg", exc_info=False) + mock_logger.critical.assert_called_once_with("critical msg", exc_info=False) \ No newline at end of file
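Reviewer note (not part of the patch): a minimal usage sketch of the extracted `CollectorStateAndTelemetry` class, exercising only the API introduced in `data/collector/collector_state_telemetry.py` above. The import path matches the new tests; using Python's standard `logging` module here is an assumption, since any object exposing `debug`/`info`/`warning`/`error`/`critical` works, and passing `logger=None` makes the log helpers no-ops.

    from datetime import timedelta
    import logging

    from data.collector.collector_state_telemetry import (
        CollectorStateAndTelemetry,
        CollectorStatus,
    )

    # Stand-in logger (assumption: stdlib logging; the unit tests use a Mock instead).
    logger = logging.getLogger("collector_telemetry_demo")

    telemetry = CollectorStateAndTelemetry(
        exchange_name="okx",
        component_name="okx_collector",
        health_check_interval=30.0,
        max_silence_duration=timedelta(minutes=5),
        logger=logger,
        log_errors_only=False,
    )

    # Lifecycle transitions, mirroring BaseDataCollector.start().
    telemetry.set_should_be_running(True)
    telemetry.update_status(CollectorStatus.STARTING)
    telemetry.set_running_state(True)
    telemetry.update_status(CollectorStatus.RUNNING)
    telemetry.set_connection_uptime_start()

    # Per-message bookkeeping, mirroring what the base collector does after data callbacks.
    telemetry.increment_messages_processed()
    telemetry.update_data_received_timestamp()
    telemetry.update_heartbeat()

    # Aggregated views that BaseDataCollector.get_status()/get_health_status() now delegate to.
    print(telemetry.get_status()["status"])               # "running"
    print(telemetry.get_health_status()["is_healthy"])    # True while heartbeat and data are fresh

The collectors keep their public surface (`collector.status`, `get_status()`, `get_health_status()`) and forward to this object, which owns the mutable state and statistics.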