- Introduced a new `CollectorStateAndTelemetry` class to encapsulate the status, health checks, and statistics of the data collector, promoting modularity and separation of concerns. - Updated `BaseDataCollector` to replace direct status management with calls to the new telemetry class, enhancing maintainability and readability. - Refactored logging methods to utilize the telemetry class, ensuring consistent logging practices. - Modified the `OKXCollector` to integrate with the new telemetry system for improved status reporting and error handling. - Added comprehensive tests for the `CollectorStateAndTelemetry` class to ensure functionality and reliability. These changes streamline the data collector's architecture, aligning with project standards for maintainability and performance.
563 lines
21 KiB
Python
563 lines
21 KiB
Python
"""
|
|
Data Collector Manager for supervising and managing multiple data collectors.
|
|
|
|
This module provides centralized management of data collectors with health monitoring,
|
|
auto-recovery, and coordinated lifecycle management.
|
|
"""
|
|
|
|
import asyncio
|
|
import time
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Dict, List, Optional, Any, Set
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
|
|
from utils.logger import get_logger
|
|
from .base_collector import BaseDataCollector, CollectorStatus
|
|
|
|
|
|
class ManagerStatus(Enum):
    """Lifecycle states of a collector manager.

    The string value of the current member is what the manager reports
    under the ``'manager_status'`` key of its status dictionary.
    """

    # Initial state, and the state after a clean shutdown.
    STOPPED = "stopped"
    # Start-up is in progress.
    STARTING = "starting"
    # Collectors launched and health monitoring active.
    RUNNING = "running"
    # Shutdown is in progress.
    STOPPING = "stopping"
    # Start-up or shutdown raised an unexpected exception.
    ERROR = "error"
|
|
|
|
|
|
@dataclass
class CollectorConfig:
    """Settings recorded for one managed data collector.

    Field order is part of the interface: it defines the positional
    constructor signature.
    """

    # Name recorded for the collector.
    name: str
    # Exchange name the collector belongs to.
    exchange: str
    # Symbols handled by the collector.
    symbols: List[str]
    # Data-type identifiers (string values) handled by the collector.
    data_types: List[str]
    # Auto-restart preference for this collector.
    auto_restart: bool = True
    # Health-check interval in seconds.
    health_check_interval: float = 30.0
    # Whether the collector is part of the enabled set started with the manager.
    enabled: bool = True
|
|
|
|
|
|
class CollectorManager:
    """
    Manages multiple data collectors with health monitoring and auto-recovery.

    The manager is responsible for:
    - Starting and stopping collectors
    - Health monitoring and auto-restart
    - Coordinated lifecycle management
    - Status reporting and metrics

    All logging goes through the ``_log_*`` helpers, so a manager created
    with ``logger=None`` never dereferences ``None``.
    """

    def __init__(self,
                 manager_name: str = "collector_manager",
                 global_health_check_interval: float = 60.0,
                 restart_delay: float = 5.0,
                 logger = None,
                 log_errors_only: bool = False):
        """
        Initialize the collector manager.

        Args:
            manager_name: Name for logging
            global_health_check_interval: Seconds between global health checks
            restart_delay: Delay between restart attempts
            logger: Logger instance. If None, no logging will be performed.
            log_errors_only: If True and logger is provided, only log error-level messages
        """
        self.manager_name = manager_name
        self.global_health_check_interval = global_health_check_interval
        # NOTE(review): stored for callers but not read anywhere in this class.
        self.restart_delay = restart_delay
        self.log_errors_only = log_errors_only

        # None disables logging entirely; every _log_* helper checks for it.
        self.logger = logger

        # Manager state
        self.status = ManagerStatus.STOPPED
        self._running = False
        # Tasks tied to the manager's run (health monitor, auto-restarts,
        # starts triggered by enable_collector). stop() cancels these.
        self._tasks: Set[asyncio.Task] = set()
        # Fire-and-forget collector stops issued by remove/disable. Held only
        # so the tasks are not garbage-collected mid-flight; stop() leaves
        # them running.
        self._background_tasks: Set[asyncio.Task] = set()

        # Collector management
        self._collectors: Dict[str, BaseDataCollector] = {}
        self._collector_configs: Dict[str, CollectorConfig] = {}
        self._enabled_collectors: Set[str] = set()

        # Health monitoring
        self._last_global_check = datetime.now(timezone.utc)
        self._global_health_task: Optional[asyncio.Task] = None

        # Statistics
        self._stats = {
            'total_collectors': 0,
            'running_collectors': 0,
            'failed_collectors': 0,
            'restarts_performed': 0,
            'last_global_check': None,
            'uptime_start': None
        }

        self._log_info(f"Initialized collector manager: {manager_name}")

    def _log_debug(self, message: str) -> None:
        """Log debug message if logger is available and not in errors-only mode."""
        if self.logger and not self.log_errors_only:
            self.logger.debug(message)

    def _log_info(self, message: str) -> None:
        """Log info message if logger is available and not in errors-only mode."""
        if self.logger and not self.log_errors_only:
            self.logger.info(message)

    def _log_warning(self, message: str) -> None:
        """Log warning message if logger is available and not in errors-only mode."""
        if self.logger and not self.log_errors_only:
            self.logger.warning(message)

    def _log_error(self, message: str, exc_info: bool = False) -> None:
        """Log error message if logger is available (always logs errors regardless of log_errors_only)."""
        if self.logger:
            self.logger.error(message, exc_info=exc_info)

    def _log_critical(self, message: str, exc_info: bool = False) -> None:
        """Log critical message if logger is available (always logs critical regardless of log_errors_only)."""
        if self.logger:
            self.logger.critical(message, exc_info=exc_info)

    def _spawn(self, coro: Any, task_set: Set[asyncio.Task]) -> Optional[asyncio.Task]:
        """
        Schedule a coroutine on the running event loop and keep a strong reference to it.

        The event loop only keeps weak references to tasks. An unreferenced
        fire-and-forget task can therefore be garbage-collected before it
        finishes.

        Args:
            coro: Coroutine object to schedule.
            task_set: Set that holds the task until it completes. The task
                removes itself from the set via a done-callback.

        Returns:
            The created task, or None when no event loop is running. In that
            case the coroutine is closed so it does not trigger a
            "coroutine was never awaited" warning.
        """
        try:
            task = asyncio.get_running_loop().create_task(coro)
        except RuntimeError:
            coro.close()
            return None
        task_set.add(task)
        task.add_done_callback(task_set.discard)
        return task

    def add_collector(self,
                      collector: BaseDataCollector,
                      config: Optional[CollectorConfig] = None) -> str:
        """
        Add a collector to be managed.

        Args:
            collector: Data collector instance
            config: Optional configuration (will create default if not provided)

        Returns:
            The name under which the collector is registered. Use this name
            for the other per-collector methods.
        """
        # Use a more unique name to avoid duplicates. The suffix is the
        # microsecond component of the current time.
        # NOTE(review): a caller-supplied config.name is not used as the key.
        collector_name = f"{collector.exchange_name}_{int(time.time() * 1000000) % 1000000}"

        # Ensure unique name
        counter = 1
        base_name = collector_name
        while collector_name in self._collectors:
            collector_name = f"{base_name}_{counter}"
            counter += 1

        if config is None:
            config = CollectorConfig(
                name=collector_name,
                exchange=collector.exchange_name,
                symbols=list(collector.symbols),
                data_types=[dt.value for dt in collector.data_types],
                auto_restart=collector.auto_restart,
                health_check_interval=collector._state_telemetry.health_check_interval
            )

        self._collectors[collector_name] = collector
        self._collector_configs[collector_name] = config

        if config.enabled:
            self._enabled_collectors.add(collector_name)

        self._stats['total_collectors'] = len(self._collectors)

        self._log_info(f"Added collector: {collector_name} ({collector.exchange_name}) - "
                       f"Symbols: {', '.join(collector.symbols)} - Enabled: {config.enabled}")
        return collector_name

    def remove_collector(self, collector_name: str) -> bool:
        """
        Remove a collector from management.

        If the collector is not stopped and an event loop is running, a
        forced stop is scheduled in the background. Otherwise the collector
        is removed without being stopped.

        Args:
            collector_name: Name of the collector to remove

        Returns:
            True if removed successfully, False if not found
        """
        if collector_name not in self._collectors:
            self._log_warning(f"Collector not found: {collector_name}")
            return False

        # Stop the collector first (only if event loop is running)
        collector = self._collectors[collector_name]
        if collector.status != CollectorStatus.STOPPED:
            if self._spawn(collector.stop(force=True), self._background_tasks) is None:
                self._log_info(f"Collector {collector_name} will be removed without stopping (no event loop)")

        # Remove from management
        del self._collectors[collector_name]
        del self._collector_configs[collector_name]
        self._enabled_collectors.discard(collector_name)

        self._stats['total_collectors'] = len(self._collectors)

        self._log_info(f"Removed collector: {collector_name}")
        return True

    def enable_collector(self, collector_name: str) -> bool:
        """
        Enable a collector (will be started if manager is running).

        Args:
            collector_name: Name of the collector to enable

        Returns:
            True if enabled successfully, False if not found
        """
        if collector_name not in self._collectors:
            self._log_warning(f"Collector not found: {collector_name}")
            return False

        self._enabled_collectors.add(collector_name)
        self._collector_configs[collector_name].enabled = True

        # Start the collector if manager is running (only if event loop is running).
        # Tracked in _tasks so that stop() cancels a start that is still in flight.
        if self._running:
            if self._spawn(self._start_collector(collector_name), self._tasks) is None:
                # No event loop running, will be started when manager starts
                self._log_debug(f"Collector {collector_name} enabled but will start when manager starts")

        self._log_info(f"Enabled collector: {collector_name}")
        return True

    def disable_collector(self, collector_name: str) -> bool:
        """
        Disable a collector (will be stopped if running).

        Args:
            collector_name: Name of the collector to disable

        Returns:
            True if disabled successfully, False if not found
        """
        if collector_name not in self._collectors:
            self._log_warning(f"Collector not found: {collector_name}")
            return False

        self._enabled_collectors.discard(collector_name)
        self._collector_configs[collector_name].enabled = False

        # Stop the collector (only if event loop is running)
        collector = self._collectors[collector_name]
        if self._spawn(collector.stop(force=True), self._background_tasks) is None:
            self._log_debug(f"Collector {collector_name} disabled but cannot stop (no event loop)")

        self._log_info(f"Disabled collector: {collector_name}")
        return True

    async def start(self) -> bool:
        """
        Start the collector manager and all enabled collectors.

        Returns:
            True if started successfully, False otherwise
        """
        if self.status in (ManagerStatus.RUNNING, ManagerStatus.STARTING):
            self._log_warning("Collector manager is already running or starting")
            return True

        self._log_info("Starting collector manager")
        self.status = ManagerStatus.STARTING

        try:
            self._running = True
            self._stats['uptime_start'] = datetime.now(timezone.utc)

            # Start all enabled collectors
            start_tasks = [
                asyncio.create_task(self._start_collector(collector_name))
                for collector_name in self._enabled_collectors
            ]

            # Wait for all collectors to start (with timeout)
            if start_tasks:
                try:
                    # On timeout, wait_for cancels the gather, which also
                    # cancels any collector start that has not finished.
                    await asyncio.wait_for(asyncio.gather(*start_tasks, return_exceptions=True), timeout=30.0)
                except asyncio.TimeoutError:
                    self._log_warning("Some collectors took too long to start")

            # Start global health monitoring
            health_task = asyncio.create_task(self._global_health_monitor())
            self._global_health_task = health_task
            self._tasks.add(health_task)
            health_task.add_done_callback(self._tasks.discard)

            self.status = ManagerStatus.RUNNING
            self._log_info(f"Collector manager started - Managing {len(self._enabled_collectors)} collectors")
            return True

        except Exception as e:
            # Do not leave the manager flagged as running after a failed start.
            self._running = False
            self.status = ManagerStatus.ERROR
            self._log_error(f"Failed to start collector manager: {e}")
            return False

    async def stop(self) -> None:
        """
        Stop the collector manager and all collectors.

        Cancels manager-owned tasks (health monitor, pending auto-restarts and
        enable-triggered starts), then force-stops every managed collector,
        waiting up to 30 seconds.
        """
        if self.status == ManagerStatus.STOPPED:
            self._log_warning("Collector manager is already stopped")
            return

        self._log_info("Stopping collector manager")
        self.status = ManagerStatus.STOPPING
        self._running = False

        try:
            # Cancel manager tasks
            for task in list(self._tasks):
                task.cancel()

            if self._tasks:
                await asyncio.gather(*self._tasks, return_exceptions=True)
            self._global_health_task = None

            # Stop all collectors
            stop_tasks = [
                asyncio.create_task(collector.stop(force=True))
                for collector in self._collectors.values()
            ]

            # Wait for all collectors to stop (with timeout)
            if stop_tasks:
                try:
                    await asyncio.wait_for(asyncio.gather(*stop_tasks, return_exceptions=True), timeout=30.0)
                except asyncio.TimeoutError:
                    self._log_warning("Some collectors took too long to stop")

            self.status = ManagerStatus.STOPPED
            self._log_info("Collector manager stopped")

        except Exception as e:
            self.status = ManagerStatus.ERROR
            self._log_error(f"Error stopping collector manager: {e}")

    async def restart_collector(self, collector_name: str) -> bool:
        """
        Restart a specific collector.

        Args:
            collector_name: Name of the collector to restart

        Returns:
            True if restarted successfully, False otherwise
        """
        if collector_name not in self._collectors:
            self._log_warning(f"Collector not found: {collector_name}")
            return False

        collector = self._collectors[collector_name]
        self._log_info(f"Restarting collector: {collector_name}")

        try:
            success = await collector.restart()
            if success:
                self._stats['restarts_performed'] += 1
                self._log_info(f"Successfully restarted collector: {collector_name}")
            else:
                self._log_error(f"Failed to restart collector: {collector_name}")
            return success

        except Exception as e:
            self._log_error(f"Error restarting collector {collector_name}: {e}")
            return False

    async def _start_collector(self, collector_name: str) -> bool:
        """
        Start a specific collector.

        Args:
            collector_name: Name of the collector to start

        Returns:
            True if started successfully, False otherwise
        """
        if collector_name not in self._collectors:
            self._log_warning(f"Collector not found: {collector_name}")
            return False

        collector = self._collectors[collector_name]

        try:
            success = await collector.start()
            if success:
                self._log_info(f"Started collector: {collector_name}")
            else:
                self._log_error(f"Failed to start collector: {collector_name}")
            return success

        except Exception as e:
            self._log_error(f"Error starting collector {collector_name}: {e}")
            return False

    async def _global_health_monitor(self) -> None:
        """
        Global health monitoring for all collectors.

        Every ``global_health_check_interval`` seconds, counts healthy running
        and unhealthy enabled collectors. It schedules a restart for each
        unhealthy collector whose collector-level and config-level
        ``auto_restart`` are both set and which is not already starting or
        stopping.
        """
        self._log_debug("Starting global health monitor")

        while self._running:
            try:
                await asyncio.sleep(self.global_health_check_interval)

                self._last_global_check = datetime.now(timezone.utc)
                self._stats['last_global_check'] = self._last_global_check

                # Check each enabled collector
                running_count = 0
                failed_count = 0

                for collector_name in self._enabled_collectors:
                    collector = self._collectors[collector_name]
                    config = self._collector_configs[collector_name]
                    health_status = collector.get_health_status()
                    is_healthy = health_status['is_healthy']

                    if is_healthy and collector.status == CollectorStatus.RUNNING:
                        running_count += 1
                    elif not is_healthy:
                        failed_count += 1
                        self._log_warning(f"Collector {collector_name} is unhealthy: {health_status['issues']}")

                        # Auto-restart if needed and not already restarting.
                        # A caller-supplied config can opt out via auto_restart=False.
                        if (collector.auto_restart and config.auto_restart and
                                collector.status not in (CollectorStatus.STARTING, CollectorStatus.STOPPING)):
                            self._log_info(f"Auto-restarting unhealthy collector: {collector_name}")
                            # Tracked in _tasks so stop() cancels an in-flight restart
                            # instead of letting it revive a stopped collector.
                            self._spawn(self.restart_collector(collector_name), self._tasks)

                # Update global statistics
                self._stats['running_collectors'] = running_count
                self._stats['failed_collectors'] = failed_count

                self._log_debug(f"Health check complete - Running: {running_count}, Failed: {failed_count}")

            except asyncio.CancelledError:
                self._log_debug("Global health monitor cancelled")
                break
            except Exception as e:
                self._log_error(f"Error in global health monitor: {e}")
                await asyncio.sleep(self.global_health_check_interval)

    def get_status(self) -> Dict[str, Any]:
        """
        Get manager status and statistics.

        Returns:
            Dictionary containing status information. ``'statistics'`` is a
            snapshot copy, so mutating it does not affect the manager.
        """
        uptime_seconds = None
        if self._stats['uptime_start']:
            uptime_seconds = (datetime.now(timezone.utc) - self._stats['uptime_start']).total_seconds()

        # Get individual collector statuses
        collector_statuses = {
            name: {
                'status': collector.status.value,
                'enabled': name in self._enabled_collectors,
                'health': collector.get_health_status()
            }
            for name, collector in self._collectors.items()
        }

        return {
            'manager_status': self.status.value,
            'uptime_seconds': uptime_seconds,
            'statistics': dict(self._stats),
            'collectors': collector_statuses,
            'enabled_collectors': list(self._enabled_collectors),
            'total_collectors': len(self._collectors)
        }

    def get_collector_status(self, collector_name: str) -> Optional[Dict[str, Any]]:
        """
        Get status for a specific collector.

        Args:
            collector_name: Name of the collector

        Returns:
            Collector status dict or None if not found. ``'config'`` is a
            shallow copy of the stored configuration's fields.
        """
        if collector_name not in self._collectors:
            return None

        collector = self._collectors[collector_name]
        return {
            'name': collector_name,
            # Copy so callers cannot flip e.g. 'enabled' behind the manager's back.
            'config': dict(vars(self._collector_configs[collector_name])),
            'status': collector.get_status(),
            'health': collector.get_health_status()
        }

    def list_collectors(self) -> List[str]:
        """
        List all managed collector names.

        Returns:
            List of collector names
        """
        return list(self._collectors.keys())

    def get_running_collectors(self) -> List[str]:
        """
        Get names of currently running collectors.

        Returns:
            List of running collector names
        """
        return [
            name for name, collector in self._collectors.items()
            if collector.status == CollectorStatus.RUNNING
        ]

    def get_failed_collectors(self) -> List[str]:
        """
        Get names of failed or unhealthy collectors.

        Returns:
            List of failed collector names
        """
        return [
            name for name, collector in self._collectors.items()
            if not collector.get_health_status()['is_healthy']
        ]

    async def restart_all_collectors(self) -> Dict[str, bool]:
        """
        Restart all enabled collectors.

        Returns:
            Dictionary mapping collector names to restart success status
        """
        self._log_info("Restarting all enabled collectors")

        restart_tasks = [
            (collector_name, asyncio.create_task(self.restart_collector(collector_name)))
            for collector_name in self._enabled_collectors
        ]

        # Wait for all restarts to complete
        results: Dict[str, bool] = {}
        for collector_name, task in restart_tasks:
            try:
                results[collector_name] = await task
            except Exception as e:
                self._log_error(f"Error restarting {collector_name}: {e}")
                results[collector_name] = False

        successful_restarts = sum(1 for success in results.values() if success)
        self._log_info(f"Restart complete - {successful_restarts}/{len(results)} collectors restarted successfully")

        return results

    def __repr__(self) -> str:
        """String representation of the manager."""
        return f"<CollectorManager({self.manager_name}, {len(self._collectors)} collectors, {self.status.value})>"