diff --git a/config/service_config.py b/config/service_config.py new file mode 100644 index 0000000..886d683 --- /dev/null +++ b/config/service_config.py @@ -0,0 +1,330 @@ +""" +Service Configuration Manager for data collection service. + +This module handles configuration loading, validation, schema management, +and default configuration creation with security enhancements. +""" + +import json +import os +import stat +from pathlib import Path +from typing import Dict, Any, Optional, List +from dataclasses import dataclass + + +@dataclass +class ServiceConfigSchema: + """Schema definition for service configuration.""" + exchange: str = "okx" + connection: Optional[Dict[str, Any]] = None + data_collection: Optional[Dict[str, Any]] = None + trading_pairs: Optional[List[Dict[str, Any]]] = None + logging: Optional[Dict[str, Any]] = None + database: Optional[Dict[str, Any]] = None + + +class ServiceConfig: + """Manages service configuration with validation and security.""" + + def __init__(self, config_path: str = "config/data_collection.json", logger=None): + """ + Initialize the service configuration manager. + + Args: + config_path: Path to the configuration file + logger: Logger instance for logging operations + """ + self.config_path = config_path + self.logger = logger + self._config: Optional[Dict[str, Any]] = None + + def load_config(self) -> Dict[str, Any]: + """ + Load and validate service configuration from JSON file. + + Returns: + Dictionary containing the configuration + + Raises: + Exception: If configuration loading or validation fails + """ + try: + config_file = Path(self.config_path) + + # Create default config if it doesn't exist + if not config_file.exists(): + self._create_default_config(config_file) + + # Validate file permissions for security + self._validate_file_permissions(config_file) + + # Load configuration + with open(config_file, 'r') as f: + config = json.load(f) + + # Validate configuration schema + validated_config = self._validate_config_schema(config) + + self._config = validated_config + + if self.logger: + self.logger.info(f"✅ Configuration loaded from {self.config_path}") + + return validated_config + + except Exception as e: + if self.logger: + self.logger.error(f"❌ Failed to load configuration: {e}", exc_info=True) + raise + + def _validate_file_permissions(self, config_file: Path) -> None: + """ + Validate configuration file permissions for security. + + Args: + config_file: Path to the configuration file + + Note: + Overly permissive modes are logged as warnings; no exception is raised. + """ + try: + file_stat = config_file.stat() + file_mode = file_stat.st_mode + + # Check if file is readable by others (security risk) + if file_mode & stat.S_IROTH: + if self.logger: + self.logger.warning(f"⚠️ Configuration file {config_file} is readable by others") + + # Check if file is writable by others (security risk) + if file_mode & stat.S_IWOTH: + if self.logger: + self.logger.warning(f"⚠️ Configuration file {config_file} is writable by others") + + except Exception as e: + if self.logger: + self.logger.warning(f"⚠️ Could not validate file permissions: {e}") + + def _validate_config_schema(self, config: Dict[str, Any]) -> Dict[str, Any]: + """ + Validate configuration against schema and apply defaults.
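+ + Example (illustrative; a minimal config containing only the required fields picks up every default section): + >>> svc = ServiceConfig() + >>> validated = svc._validate_config_schema( + ... {"exchange": "okx", "trading_pairs": [{"symbol": "BTC-USDT"}]}) + >>> sorted(validated) + ['connection', 'data_collection', 'database', 'exchange', 'logging', 'trading_pairs']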
+ + Args: + config: Raw configuration dictionary + + Returns: + Validated configuration with defaults applied + """ + validated = config.copy() + + # Validate required fields + required_fields = ['exchange', 'trading_pairs'] + for field in required_fields: + if field not in validated: + raise ValueError(f"Missing required configuration field: {field}") + + # Apply defaults for optional sections + if 'connection' not in validated: + validated['connection'] = self._get_default_connection_config() + + if 'data_collection' not in validated: + validated['data_collection'] = self._get_default_data_collection_config() + + if 'logging' not in validated: + validated['logging'] = self._get_default_logging_config() + + if 'database' not in validated: + validated['database'] = self._get_default_database_config() + + # Validate trading pairs + self._validate_trading_pairs(validated['trading_pairs']) + + return validated + + def _validate_trading_pairs(self, trading_pairs: List[Dict[str, Any]]) -> None: + """ + Validate trading pairs configuration. + + Args: + trading_pairs: List of trading pair configurations + + Raises: + ValueError: If trading pairs configuration is invalid + """ + if not trading_pairs: + raise ValueError("At least one trading pair must be configured") + + for i, pair in enumerate(trading_pairs): + if 'symbol' not in pair: + raise ValueError(f"Trading pair {i} missing required 'symbol' field") + + symbol = pair['symbol'] + if not isinstance(symbol, str) or '-' not in symbol: + raise ValueError(f"Invalid symbol format: {symbol}. Expected format: 'BASE-QUOTE'") + + # Validate data types + data_types = pair.get('data_types', ['trade']) + valid_data_types = ['trade', 'orderbook', 'ticker', 'candle'] + for dt in data_types: + if dt not in valid_data_types: + raise ValueError(f"Invalid data type '{dt}' for {symbol}. Valid types: {valid_data_types}") + + def _create_default_config(self, config_file: Path) -> None: + """ + Create a default configuration file. 
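+ + Example (illustrative; the path is hypothetical): + >>> svc = ServiceConfig("config/example.json") + >>> svc._create_default_config(Path("config/example.json")) # writes defaults, chmod 0o600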
+ + Args: + config_file: Path where the configuration file should be created + """ + default_config = { + "exchange": "okx", + "connection": self._get_default_connection_config(), + "data_collection": self._get_default_data_collection_config(), + "trading_pairs": self._get_default_trading_pairs(), + "logging": self._get_default_logging_config(), + "database": self._get_default_database_config() + } + + # Ensure directory exists + config_file.parent.mkdir(parents=True, exist_ok=True) + + # Write configuration file + with open(config_file, 'w') as f: + json.dump(default_config, f, indent=2) + + # Set secure file permissions (owner read/write only) + try: + os.chmod(config_file, stat.S_IRUSR | stat.S_IWUSR) + except Exception as e: + if self.logger: + self.logger.warning(f"⚠️ Could not set secure file permissions: {e}") + + if self.logger: + self.logger.info(f"📄 Created default configuration: {config_file}") + + def _get_default_connection_config(self) -> Dict[str, Any]: + """Get default connection configuration.""" + return { + "public_ws_url": "wss://ws.okx.com:8443/ws/v5/public", + "private_ws_url": "wss://ws.okx.com:8443/ws/v5/private", + "ping_interval": 25.0, + "pong_timeout": 10.0, + "max_reconnect_attempts": 5, + "reconnect_delay": 5.0 + } + + def _get_default_data_collection_config(self) -> Dict[str, Any]: + """Get default data collection configuration.""" + return { + "store_raw_data": True, + "health_check_interval": 120.0, + "auto_restart": True, + "buffer_size": 1000 + } + + def _get_default_trading_pairs(self) -> List[Dict[str, Any]]: + """Get default trading pairs configuration.""" + return [ + { + "symbol": "BTC-USDT", + "enabled": True, + "data_types": ["trade", "orderbook"], + "timeframes": ["1m", "5m", "15m", "1h"], + "channels": { + "trades": "trades", + "orderbook": "books5", + "ticker": "tickers" + } + }, + { + "symbol": "ETH-USDT", + "enabled": True, + "data_types": ["trade", "orderbook"], + "timeframes": ["1m", "5m", "15m", "1h"], + "channels": { + "trades": "trades", + "orderbook": "books5", + "ticker": "tickers" + } + } + ] + + def _get_default_logging_config(self) -> Dict[str, Any]: + """Get default logging configuration.""" + return { + "component_name_template": "okx_collector_{symbol}", + "log_level": "INFO", + "verbose": False + } + + def _get_default_database_config(self) -> Dict[str, Any]: + """Get default database configuration.""" + return { + "store_processed_data": True, + "store_raw_data": True, + "force_update_candles": False, + "batch_size": 100, + "flush_interval": 5.0 + } + + def get_config(self) -> Dict[str, Any]: + """ + Get the current configuration. + + Returns: + Current configuration dictionary + + Raises: + RuntimeError: If configuration has not been loaded + """ + if self._config is None: + raise RuntimeError("Configuration has not been loaded. 
Call load_config() first.") + return self._config.copy() + + def get_exchange_config(self) -> Dict[str, Any]: + """Get exchange-specific configuration.""" + config = self.get_config() + return { + 'exchange': config['exchange'], + 'connection': config['connection'] + } + + def get_enabled_trading_pairs(self) -> List[Dict[str, Any]]: + """Get list of enabled trading pairs.""" + config = self.get_config() + trading_pairs = config.get('trading_pairs', []) + return [pair for pair in trading_pairs if pair.get('enabled', True)] + + def get_data_collection_config(self) -> Dict[str, Any]: + """Get data collection configuration.""" + config = self.get_config() + return config.get('data_collection', {}) + + def update_config(self, updates: Dict[str, Any]) -> None: + """ + Update configuration with new values. + + Args: + updates: Dictionary of configuration updates + """ + if self._config is None: + raise RuntimeError("Configuration has not been loaded. Call load_config() first.") + + self._config.update(updates) + + # In-memory update only; call save_config() to persist changes + if self.logger: + self.logger.info("Configuration updated in memory") + + def save_config(self) -> None: + """Save current configuration to file.""" + if self._config is None: + raise RuntimeError("Configuration has not been loaded. Call load_config() first.") + + config_file = Path(self.config_path) + with open(config_file, 'w') as f: + json.dump(self._config, f, indent=2) + + if self.logger: + self.logger.info(f"Configuration saved to {self.config_path}") \ No newline at end of file diff --git a/data/__init__.py b/data/__init__.py index b3cb177..a3a5d63 100644 --- a/data/__init__.py +++ b/data/__init__.py @@ -11,7 +11,8 @@ from .base_collector import ( from .collector.collector_state_telemetry import CollectorStatus from .common.ohlcv_data import OHLCVData, DataValidationError from .common.data_types import DataType, MarketDataPoint -from .collector_manager import CollectorManager, ManagerStatus, CollectorConfig +from .collector_manager import CollectorManager +from .collector_types import ManagerStatus, CollectorConfig __all__ = [ 'BaseDataCollector', diff --git a/data/collection_service.py b/data/collection_service.py index 36ed575..7e74d19 100644 --- a/data/collection_service.py +++ b/data/collection_service.py @@ -4,16 +4,12 @@ Data Collection Service Production-ready service for cryptocurrency market data collection with clean logging and robust error handling. - -This service manages multiple data collectors for different trading pairs -and exchanges, with proper health monitoring and graceful shutdown.
""" import asyncio import signal import sys import time -import json from datetime import datetime from pathlib import Path from typing import List, Optional, Dict, Any @@ -27,52 +23,35 @@ sys.path.insert(0, str(project_root)) import os os.environ['DEBUG'] = 'false' -# Suppress verbose SQLAlchemy logging for production +# Suppress verbose SQLAlchemy logging logging.getLogger('sqlalchemy').setLevel(logging.WARNING) logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING) logging.getLogger('sqlalchemy.pool').setLevel(logging.WARNING) logging.getLogger('sqlalchemy.dialects').setLevel(logging.WARNING) logging.getLogger('sqlalchemy.orm').setLevel(logging.WARNING) -from data.exchanges.factory import ExchangeFactory from data.collector_manager import CollectorManager -from data.base_collector import DataType +from config.service_config import ServiceConfig +from data.collector_factory import CollectorFactory from database.connection import init_database from utils.logger import get_logger class DataCollectionService: - """ - Production data collection service. - - Manages multiple data collectors with clean logging focused on: - - Service lifecycle (start/stop/restart) - - Connection status (connect/disconnect/reconnect) - - Health status and errors - - Basic collection statistics - - Excludes verbose logging of individual trades/candles for production clarity. - """ + """Production data collection service with modular architecture.""" def __init__(self, config_path: str = "config/data_collection.json"): """Initialize the data collection service.""" self.config_path = config_path + self.logger = get_logger("data_collection_service", log_level="INFO", verbose=False) - # Initialize clean logging first - only essential information - self.logger = get_logger( - "data_collection_service", - log_level="INFO", - verbose=False # Clean console output - ) - - # Load configuration after logger is initialized - self.config = self._load_config() + # Initialize configuration and factory + self.service_config = ServiceConfig(config_path, logger=self.logger) + self.config = self.service_config.load_config() + self.collector_factory = CollectorFactory(logger=self.logger) # Core components - self.collector_manager = CollectorManager( - logger=self.logger, - log_errors_only=True # Only log errors and essential events - ) + self.collector_manager = CollectorManager(logger=self.logger, log_errors_only=True) self.collectors: List = [] # Service state @@ -92,171 +71,26 @@ class DataCollectionService: self.logger.info("🚀 Data Collection Service initialized") self.logger.info(f"📁 Configuration: {config_path}") - def _load_config(self) -> Dict[str, Any]: - """Load service configuration from JSON file.""" - try: - config_file = Path(self.config_path) - if not config_file.exists(): - # Create default config if it doesn't exist - self._create_default_config(config_file) - - with open(config_file, 'r') as f: - config = json.load(f) - - self.logger.info(f"✅ Configuration loaded from {self.config_path}") - return config - - except Exception as e: - self.logger.error(f"❌ Failed to load configuration: {e}") - raise - - def _create_default_config(self, config_file: Path) -> None: - """Create a default configuration file.""" - default_config = { - "exchange": "okx", - "connection": { - "public_ws_url": "wss://ws.okx.com:8443/ws/v5/public", - "private_ws_url": "wss://ws.okx.com:8443/ws/v5/private", - "ping_interval": 25.0, - "pong_timeout": 10.0, - "max_reconnect_attempts": 5, - "reconnect_delay": 5.0 - }, - 
"data_collection": { - "store_raw_data": True, - "health_check_interval": 120.0, - "auto_restart": True, - "buffer_size": 1000 - }, - "trading_pairs": [ - { - "symbol": "BTC-USDT", - "enabled": True, - "data_types": ["trade", "orderbook"], - "timeframes": ["1m", "5m", "15m", "1h"], - "channels": { - "trades": "trades", - "orderbook": "books5", - "ticker": "tickers" - } - }, - { - "symbol": "ETH-USDT", - "enabled": True, - "data_types": ["trade", "orderbook"], - "timeframes": ["1m", "5m", "15m", "1h"], - "channels": { - "trades": "trades", - "orderbook": "books5", - "ticker": "tickers" - } - } - ], - "logging": { - "component_name_template": "okx_collector_{symbol}", - "log_level": "INFO", - "verbose": False - }, - "database": { - "store_processed_data": True, - "store_raw_data": True, - "force_update_candles": False, - "batch_size": 100, - "flush_interval": 5.0 - } - } - - # Ensure directory exists - config_file.parent.mkdir(parents=True, exist_ok=True) - - with open(config_file, 'w') as f: - json.dump(default_config, f, indent=2) - - self.logger.info(f"📄 Created default configuration: {config_file}") - async def initialize_collectors(self) -> bool: """Initialize all data collectors based on configuration.""" try: - # Get exchange configuration (now using okx_config.json structure) - exchange_name = self.config.get('exchange', 'okx') - trading_pairs = self.config.get('trading_pairs', []) - data_collection_config = self.config.get('data_collection', {}) + collectors = await self.collector_factory.create_collectors_from_config(self.config) - enabled_pairs = [pair for pair in trading_pairs if pair.get('enabled', True)] - - if not enabled_pairs: - self.logger.warning(f"⚠️ No enabled trading pairs for {exchange_name}") + if not collectors: + self.logger.error("❌ No collectors were successfully created") return False - self.logger.info(f"🔧 Initializing {len(enabled_pairs)} collectors for {exchange_name.upper()}") - - total_collectors = 0 - - # Create collectors for each trading pair - for pair_config in enabled_pairs: - if await self._create_collector(exchange_name, pair_config, data_collection_config): - total_collectors += 1 - else: - self.logger.error(f"❌ Failed to create collector for {pair_config.get('symbol', 'unknown')}") - self.stats['errors_count'] += 1 - - self.stats['collectors_created'] = total_collectors - - if total_collectors > 0: - self.logger.info(f"✅ Successfully initialized {total_collectors} data collectors") - return True - else: - self.logger.error("❌ No collectors were successfully initialized") - return False - - except Exception as e: - self.logger.error(f"❌ Failed to initialize collectors: {e}") - self.stats['errors_count'] += 1 - return False - - async def _create_collector(self, exchange_name: str, pair_config: Dict[str, Any], data_collection_config: Dict[str, Any]) -> bool: - """Create a single data collector for a trading pair.""" - try: - from data.exchanges.factory import ExchangeCollectorConfig - - symbol = pair_config['symbol'] - data_types = [DataType(dt) for dt in pair_config.get('data_types', ['trade'])] - timeframes = pair_config.get('timeframes', ['1m', '5m']) - - # Create collector configuration using the proper structure - collector_config = ExchangeCollectorConfig( - exchange=exchange_name, - symbol=symbol, - data_types=data_types, - timeframes=timeframes, # Pass timeframes to config - auto_restart=data_collection_config.get('auto_restart', True), - health_check_interval=data_collection_config.get('health_check_interval', 120.0), - 
store_raw_data=data_collection_config.get('store_raw_data', True), - custom_params={ - 'component_name': f"{exchange_name}_collector_{symbol.replace('-', '_').lower()}", - 'logger': self.logger, - 'log_errors_only': True, # Clean logging - only errors and essential events - 'force_update_candles': self.config.get('database', {}).get('force_update_candles', False), - 'timeframes': timeframes # Pass timeframes to collector - } - ) - - # Create collector using factory with proper config - collector = ExchangeFactory.create_collector(collector_config) - - if collector: - # Add to manager + for collector in collectors: self.collector_manager.add_collector(collector) self.collectors.append(collector) - - self.logger.info(f"✅ Created collector: {symbol} [{'/'.join(timeframes)}]") - return True - else: - self.logger.error(f"❌ Failed to create collector for {symbol}") - return False + + self.stats['collectors_created'] = len(collectors) + self.logger.info(f"✅ Successfully initialized {len(collectors)} data collectors") + return True except Exception as e: - self.logger.error(f"❌ Error creating collector for {pair_config.get('symbol', 'unknown')}: {e}") + self.logger.error(f"❌ Failed to initialize collectors: {e}", exc_info=True) + self.stats['errors_count'] += 1 return False async def start(self) -> bool: @@ -267,7 +101,6 @@ class DataCollectionService: self.logger.info("🚀 Starting Data Collection Service...") - # Initialize database self.logger.info("📊 Initializing database connection...") init_database() self.logger.info("✅ Database connection established") @@ -289,7 +122,7 @@ class DataCollectionService: return False except Exception as e: - self.logger.error(f"❌ Failed to start service: {e}") + self.logger.error(f"❌ Failed to start service: {e}", exc_info=True) self.stats['errors_count'] += 1 return False @@ -312,7 +145,7 @@ class DataCollectionService: self.logger.info(f"📊 Total uptime: {self.stats['total_uptime_seconds']:.1f} seconds") except Exception as e: - self.logger.error(f"❌ Error during service shutdown: {e}") + self.logger.error(f"❌ Error during service shutdown: {e}", exc_info=True) self.stats['errors_count'] += 1 def get_status(self) -> Dict[str, Any]: @@ -386,7 +219,7 @@ class DataCollectionService: current_time = time.time() # Check duration limit - if duration_hours: + if duration_hours and self.start_time: elapsed_hours = (current_time - self.start_time) / 3600 if elapsed_hours >= duration_hours: self.logger.info(f"⏰ Completed {duration_hours} hour run") @@ -394,17 +227,17 @@ class DataCollectionService: # Periodic status update if current_time - last_update >= update_interval: - elapsed_hours = (current_time - self.start_time) / 3600 - self.logger.info(f"⏱️ Service uptime: {elapsed_hours:.1f} hours") + if self.start_time: + elapsed_hours = (current_time - self.start_time) / 3600 + self.logger.info(f"⏱️ Service uptime: {elapsed_hours:.1f} hours") last_update = current_time return True except Exception as e: - self.logger.error(f"❌ Service error: {e}") + self.logger.error(f"❌ Service error: {e}", exc_info=True) self.stats['errors_count'] += 1 return False - finally: await self.stop() @@ -414,38 +247,18 @@ async def run_data_collection_service( config_path: str = "config/data_collection.json", duration_hours: Optional[float] = None ) -> bool: - """ - Run the data collection service. 
- - Args: - config_path: Path to configuration file - duration_hours: Optional duration in hours (None = indefinite) - - Returns: - bool: True if successful, False otherwise - """ + """Run the data collection service.""" service = DataCollectionService(config_path) return await service.run(duration_hours) - if __name__ == "__main__": - # Simple CLI when run directly import argparse parser = argparse.ArgumentParser(description="Data Collection Service") - parser.add_argument('--config', default="config/data_collection.json", - help='Configuration file path') - parser.add_argument('--hours', type=float, - help='Run duration in hours (default: indefinite)') + parser.add_argument("--config", default="config/data_collection.json", help="Configuration file path") + parser.add_argument("--duration", type=float, help="Duration to run in hours (default: indefinite)") args = parser.parse_args() - try: - success = asyncio.run(run_data_collection_service(args.config, args.hours)) - sys.exit(0 if success else 1) - except KeyboardInterrupt: - print("\n👋 Service interrupted by user") - sys.exit(0) - except Exception as e: - print(f"❌ Fatal error: {e}") - sys.exit(1) \ No newline at end of file + # Run the service and surface failure through the exit code + success = asyncio.run(run_data_collection_service(args.config, args.duration)) + sys.exit(0 if success else 1) \ No newline at end of file diff --git a/data/collector_factory.py b/data/collector_factory.py new file mode 100644 index 0000000..f196a47 --- /dev/null +++ b/data/collector_factory.py @@ -0,0 +1,322 @@ +""" +Collector Factory for data collection service. + +This module handles the creation of data collectors with proper configuration +and error handling, separating collector creation logic from the main service. +""" + +from typing import Dict, Any, List, Optional +from data.exchanges.factory import ExchangeFactory, ExchangeCollectorConfig +from data.base_collector import DataType + + +class CollectorFactory: + """Factory for creating and configuring data collectors.""" + + def __init__(self, logger=None): + """ + Initialize the collector factory. + + Args: + logger: Logger instance for logging operations + """ + self.logger = logger + + async def create_collector( + self, + exchange_name: str, + pair_config: Dict[str, Any], + data_collection_config: Dict[str, Any], + database_config: Dict[str, Any] + ) -> Optional[Any]: + """ + Create a single data collector for a trading pair.
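+ + Example (illustrative; assumes an 'okx' backend is registered with ExchangeFactory): + >>> factory = CollectorFactory(logger=None) + >>> collector = await factory.create_collector( + ... "okx", {"symbol": "BTC-USDT", "data_types": ["trade"]}, + ... {"auto_restart": True}, {"force_update_candles": False})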
+ + Args: + exchange_name: Name of the exchange (e.g., 'okx') + pair_config: Configuration for the trading pair + data_collection_config: Data collection settings + database_config: Database configuration settings + + Returns: + Created collector instance or None if creation failed + """ + try: + symbol = pair_config['symbol'] + + # Validate and parse configuration + validated_config = self._validate_pair_config(pair_config) + if not validated_config: + return None + + data_types = [DataType(dt) for dt in validated_config['data_types']] + timeframes = validated_config['timeframes'] + + # Create collector configuration + collector_config = self._create_collector_config( + exchange_name=exchange_name, + symbol=symbol, + data_types=data_types, + timeframes=timeframes, + data_collection_config=data_collection_config, + database_config=database_config + ) + + # Create collector using exchange factory + collector = ExchangeFactory.create_collector(collector_config) + + if collector: + if self.logger: + self.logger.info(f"Created collector: {symbol} [{'/'.join(timeframes)}]") + return collector + else: + if self.logger: + self.logger.error(f"Failed to create collector for {symbol}") + return None + + except Exception as e: + symbol = pair_config.get('symbol', 'unknown') + if self.logger: + self.logger.error(f"Error creating collector for {symbol}: {e}", exc_info=True) + return None + + def _validate_pair_config(self, pair_config: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """ + Validate trading pair configuration. + + Args: + pair_config: Raw trading pair configuration + + Returns: + Validated configuration or None if invalid + """ + try: + # Validate required fields + if 'symbol' not in pair_config: + if self.logger: + self.logger.error("Trading pair missing required 'symbol' field") + return None + + symbol = pair_config['symbol'] + if not isinstance(symbol, str) or '-' not in symbol: + if self.logger: + self.logger.error(f"Invalid symbol format: {symbol}. 
Expected format: 'BASE-QUOTE'") + return None + + # Apply defaults and validate data types + data_types = pair_config.get('data_types', ['trade']) + valid_data_types = ['trade', 'orderbook', 'ticker', 'candle'] + + validated_data_types = [] + for dt in data_types: + if dt in valid_data_types: + validated_data_types.append(dt) + else: + if self.logger: + self.logger.warning(f"Invalid data type '{dt}' for {symbol}, skipping") + + if not validated_data_types: + validated_data_types = ['trade'] # Default fallback + + # Validate timeframes + timeframes = pair_config.get('timeframes', ['1m', '5m']) + # OKX supports second-level timeframes for real-time candle aggregation + valid_timeframes = ['1s', '5s', '1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '8h', '12h', '1d'] + + validated_timeframes = [] + for tf in timeframes: + if tf in valid_timeframes: + validated_timeframes.append(tf) + else: + if self.logger: + self.logger.warning(f"Invalid timeframe '{tf}' for {symbol}, skipping") + + if not validated_timeframes: + validated_timeframes = ['1m', '5m'] # Default fallback + + return { + 'symbol': symbol, + 'data_types': validated_data_types, + 'timeframes': validated_timeframes, + 'enabled': pair_config.get('enabled', True), + 'channels': pair_config.get('channels', {}) + } + + except Exception as e: + if self.logger: + self.logger.error(f"Error validating pair config: {e}", exc_info=True) + return None + + def _create_collector_config( + self, + exchange_name: str, + symbol: str, + data_types: List[DataType], + timeframes: List[str], + data_collection_config: Dict[str, Any], + database_config: Dict[str, Any] + ) -> ExchangeCollectorConfig: + """ + Create exchange collector configuration. + + Args: + exchange_name: Name of the exchange + symbol: Trading pair symbol + data_types: List of data types to collect + timeframes: List of timeframes for candle data + data_collection_config: Data collection settings + database_config: Database configuration + + Returns: + Configured ExchangeCollectorConfig instance + """ + # Generate component name + component_name = f"{exchange_name}_collector_{symbol.replace('-', '_').lower()}" + + # Build custom parameters - only include parameters the collector accepts + custom_params = { + 'component_name': component_name, + 'logger': self.logger, + 'log_errors_only': True, # Clean logging - only errors and essential events + 'force_update_candles': database_config.get('force_update_candles', False), + } + + # Create and return collector configuration + return ExchangeCollectorConfig( + exchange=exchange_name, + symbol=symbol, + data_types=data_types, + timeframes=timeframes, + auto_restart=data_collection_config.get('auto_restart', True), + health_check_interval=data_collection_config.get('health_check_interval', 120.0), + store_raw_data=data_collection_config.get('store_raw_data', True), + custom_params=custom_params + ) + + async def create_collectors_from_config( + self, + config: Dict[str, Any] + ) -> List[Any]: + """ + Create multiple collectors from service configuration. 
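+ + Example (illustrative; the dict mirrors the ServiceConfig defaults): + >>> collectors = await factory.create_collectors_from_config({ + ... "exchange": "okx", + ... "trading_pairs": [{"symbol": "BTC-USDT", "enabled": True}]}) + >>> len(collectors) # one per enabled pair that was created successfully + 1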
+ + Args: + config: Complete service configuration dictionary + + Returns: + List of created collector instances + """ + collectors = [] + + try: + exchange_name = config.get('exchange', 'okx') + trading_pairs = config.get('trading_pairs', []) + data_collection_config = config.get('data_collection', {}) + database_config = config.get('database', {}) + + # Filter enabled pairs + enabled_pairs = [pair for pair in trading_pairs if pair.get('enabled', True)] + + if not enabled_pairs: + if self.logger: + self.logger.warning(f"No enabled trading pairs for {exchange_name}") + return collectors + + if self.logger: + self.logger.info(f"Creating {len(enabled_pairs)} collectors for {exchange_name.upper()}") + + # Create collectors for each enabled pair + for pair_config in enabled_pairs: + collector = await self.create_collector( + exchange_name=exchange_name, + pair_config=pair_config, + data_collection_config=data_collection_config, + database_config=database_config + ) + + if collector: + collectors.append(collector) + else: + symbol = pair_config.get('symbol', 'unknown') + if self.logger: + self.logger.error(f"Failed to create collector for {symbol}") + + if self.logger: + self.logger.info(f"Successfully created {len(collectors)} collectors") + + return collectors + + except Exception as e: + if self.logger: + self.logger.error(f"Error creating collectors from config: {e}", exc_info=True) + return collectors + + def get_supported_exchanges(self) -> List[str]: + """ + Get list of supported exchanges. + + Returns: + List of supported exchange names + """ + # This could be enhanced to dynamically discover available exchanges + return ['okx', 'binance'] # Add more as they become available + + def get_supported_data_types(self) -> List[str]: + """ + Get list of supported data types. + + Returns: + List of supported data type names + """ + return ['trade', 'orderbook', 'ticker', 'candle'] + + def get_supported_timeframes(self) -> List[str]: + """ + Get list of supported timeframes. + + Returns: + List of supported timeframe strings + """ + return ['1s', '5s', '1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '8h', '12h', '1d'] + + def validate_collector_requirements( + self, + exchange: str, + data_types: List[str], + timeframes: List[str] + ) -> Dict[str, Any]: + """ + Validate collector requirements against supported features. + + Args: + exchange: Exchange name + data_types: Required data types + timeframes: Required timeframes + + Returns: + Validation result with details + """ + result = { + 'valid': True, + 'errors': [], + 'warnings': [] + } + + # Validate exchange + if exchange not in self.get_supported_exchanges(): + result['valid'] = False + result['errors'].append(f"Unsupported exchange: {exchange}") + + # Validate data types + supported_data_types = self.get_supported_data_types() + for dt in data_types: + if dt not in supported_data_types: + result['warnings'].append(f"Unsupported data type: {dt}") + + # Validate timeframes + supported_timeframes = self.get_supported_timeframes() + for tf in timeframes: + if tf not in supported_timeframes: + result['warnings'].append(f"Unsupported timeframe: {tf}") + + return result \ No newline at end of file diff --git a/data/collector_manager.py b/data/collector_manager.py index 07043ec..8dc1763 100644 --- a/data/collector_manager.py +++ b/data/collector_manager.py @@ -6,35 +6,17 @@ auto-recovery, and coordinated lifecycle management. 
""" import asyncio -import time -from datetime import datetime, timezone, timedelta from typing import Dict, List, Optional, Any, Set -from dataclasses import dataclass -from enum import Enum from utils.logger import get_logger from .base_collector import BaseDataCollector, CollectorStatus - - -class ManagerStatus(Enum): - """Status of the collector manager.""" - STOPPED = "stopped" - STARTING = "starting" - RUNNING = "running" - STOPPING = "stopping" - ERROR = "error" - - -@dataclass -class CollectorConfig: - """Configuration for a data collector.""" - name: str - exchange: str - symbols: List[str] - data_types: List[str] - auto_restart: bool = True - health_check_interval: float = 30.0 - enabled: bool = True +from .collector_types import ManagerStatus, CollectorConfig +from .manager_components import ( + CollectorLifecycleManager, + ManagerHealthMonitor, + ManagerStatsTracker, + ManagerLogger +) class CollectorManager: @@ -54,510 +36,148 @@ class CollectorManager: restart_delay: float = 5.0, logger = None, log_errors_only: bool = False): - """ - Initialize the collector manager. - - Args: - manager_name: Name for logging - global_health_check_interval: Seconds between global health checks - restart_delay: Delay between restart attempts - logger: Logger instance. If None, no logging will be performed. - log_errors_only: If True and logger is provided, only log error-level messages - """ + """Initialize the collector manager with component-based architecture.""" self.manager_name = manager_name - self.global_health_check_interval = global_health_check_interval self.restart_delay = restart_delay - self.log_errors_only = log_errors_only - # Initialize logger based on parameters - if logger is not None: - self.logger = logger - else: - self.logger = None + # Initialize components + self.logger_manager = ManagerLogger(logger, log_errors_only) + self.lifecycle_manager = CollectorLifecycleManager(self.logger_manager) + self.health_monitor = ManagerHealthMonitor( + global_health_check_interval, self.logger_manager, self.lifecycle_manager) + self.stats_tracker = ManagerStatsTracker( + 30.0, self.logger_manager, self.lifecycle_manager, self.health_monitor) # Manager state self.status = ManagerStatus.STOPPED self._running = False self._tasks: Set[asyncio.Task] = set() - # Collector management - self._collectors: Dict[str, BaseDataCollector] = {} - self._collector_configs: Dict[str, CollectorConfig] = {} - self._enabled_collectors: Set[str] = set() - - # Health monitoring - self._last_global_check = datetime.now(timezone.utc) - self._global_health_task = None - - # Statistics - self._stats = { - 'total_collectors': 0, - 'running_collectors': 0, - 'failed_collectors': 0, - 'restarts_performed': 0, - 'last_global_check': None, - 'uptime_start': None - } - - if self.logger and not self.log_errors_only: - self.logger.info(f"Initialized collector manager: {manager_name}") + if self.logger_manager.is_debug_enabled(): + self.logger_manager.log_info(f"Initialized collector manager: {manager_name}") - def _log_debug(self, message: str) -> None: - """Log debug message if logger is available and not in errors-only mode.""" - if self.logger and not self.log_errors_only: - self.logger.debug(message) - - def _log_info(self, message: str) -> None: - """Log info message if logger is available and not in errors-only mode.""" - if self.logger and not self.log_errors_only: - self.logger.info(message) - - def _log_warning(self, message: str) -> None: - """Log warning message if logger is available and not in errors-only 
mode.""" - if self.logger and not self.log_errors_only: - self.logger.warning(message) - - def _log_error(self, message: str, exc_info: bool = False) -> None: - """Log error message if logger is available (always logs errors regardless of log_errors_only).""" - if self.logger: - self.logger.error(message, exc_info=exc_info) - - def _log_critical(self, message: str, exc_info: bool = False) -> None: - """Log critical message if logger is available (always logs critical regardless of log_errors_only).""" - if self.logger: - self.logger.critical(message, exc_info=exc_info) - - def add_collector(self, - collector: BaseDataCollector, - config: Optional[CollectorConfig] = None) -> None: - """ - Add a collector to be managed. - - Args: - collector: Data collector instance - config: Optional configuration (will create default if not provided) - """ - # Use a more unique name to avoid duplicates - collector_name = f"{collector.exchange_name}_{int(time.time() * 1000000) % 1000000}" - - # Ensure unique name - counter = 1 - base_name = collector_name - while collector_name in self._collectors: - collector_name = f"{base_name}_{counter}" - counter += 1 - - if config is None: - config = CollectorConfig( - name=collector_name, - exchange=collector.exchange_name, - symbols=list(collector.symbols), - data_types=[dt.value for dt in collector.data_types], - auto_restart=collector.auto_restart, - health_check_interval=collector._state_telemetry.health_check_interval - ) - - self._collectors[collector_name] = collector - self._collector_configs[collector_name] = config - - if config.enabled: - self._enabled_collectors.add(collector_name) - - self._stats['total_collectors'] = len(self._collectors) - - self._log_info(f"Added collector: {collector_name} ({collector.exchange_name}) - " - f"Symbols: {', '.join(collector.symbols)} - Enabled: {config.enabled}") + def add_collector(self, collector: BaseDataCollector, config: Optional[CollectorConfig] = None) -> None: + """Add a collector to be managed.""" + self.lifecycle_manager.add_collector(collector, config) def remove_collector(self, collector_name: str) -> bool: - """ - Remove a collector from management. - - Args: - collector_name: Name of the collector to remove - - Returns: - True if removed successfully, False if not found - """ - if collector_name not in self._collectors: - self._log_warning(f"Collector not found: {collector_name}") - return False - - # Stop the collector first (only if event loop is running) - collector = self._collectors[collector_name] - if collector.status != CollectorStatus.STOPPED: - try: - # Try to create task only if event loop is running - asyncio.create_task(collector.stop(force=True)) - except RuntimeError: - # No event loop running, just log - self._log_info(f"Collector {collector_name} will be removed without stopping (no event loop)") - - # Remove from management - del self._collectors[collector_name] - del self._collector_configs[collector_name] - self._enabled_collectors.discard(collector_name) - - self._stats['total_collectors'] = len(self._collectors) - - self._log_info(f"Removed collector: {collector_name}") - return True + """Remove a collector from management.""" + return self.lifecycle_manager.remove_collector(collector_name) def enable_collector(self, collector_name: str) -> bool: - """ - Enable a collector (will be started if manager is running). 
- - Args: - collector_name: Name of the collector to enable - - Returns: - True if enabled successfully, False if not found - """ - if collector_name not in self._collectors: - self._log_warning(f"Collector not found: {collector_name}") - return False - - self._enabled_collectors.add(collector_name) - self._collector_configs[collector_name].enabled = True - - # Start the collector if manager is running (only if event loop is running) - if self._running: - try: - asyncio.create_task(self._start_collector(collector_name)) - except RuntimeError: - # No event loop running, will be started when manager starts - self._log_debug(f"Collector {collector_name} enabled but will start when manager starts") - - self._log_info(f"Enabled collector: {collector_name}") - return True + """Enable a collector (will be started if manager is running).""" + return self.lifecycle_manager.enable_collector(collector_name) def disable_collector(self, collector_name: str) -> bool: - """ - Disable a collector (will be stopped if running). - - Args: - collector_name: Name of the collector to disable - - Returns: - True if disabled successfully, False if not found - """ - if collector_name not in self._collectors: - self.logger.warning(f"Collector not found: {collector_name}") - return False - - self._enabled_collectors.discard(collector_name) - self._collector_configs[collector_name].enabled = False - - # Stop the collector (only if event loop is running) - collector = self._collectors[collector_name] - try: - asyncio.create_task(collector.stop(force=True)) - except RuntimeError: - # No event loop running, just log - self.logger.debug(f"Collector {collector_name} disabled but cannot stop (no event loop)") - - self.logger.info(f"Disabled collector: {collector_name}") - return True + """Disable a collector (will be stopped if running).""" + return self.lifecycle_manager.disable_collector(collector_name) async def start(self) -> bool: - """ - Start the collector manager and all enabled collectors. 
- - Returns: - True if started successfully, False otherwise - """ + """Start the collector manager and all enabled collectors.""" if self.status in [ManagerStatus.RUNNING, ManagerStatus.STARTING]: - self._log_warning("Collector manager is already running or starting") + self.logger_manager.log_warning("Collector manager is already running or starting") return True - self._log_info("Starting collector manager") + self.logger_manager.log_info("Starting collector manager") self.status = ManagerStatus.STARTING try: self._running = True - self._stats['uptime_start'] = datetime.now(timezone.utc) - # Start all enabled collectors - start_tasks = [] - for collector_name in self._enabled_collectors: - task = asyncio.create_task(self._start_collector(collector_name)) - start_tasks.append(task) + # Set running state for all components + self.lifecycle_manager.set_running_state(True) + self.health_monitor.set_running_state(True) + self.stats_tracker.set_running_state(True) - # Wait for all collectors to start (with timeout) - if start_tasks: - try: - await asyncio.wait_for(asyncio.gather(*start_tasks, return_exceptions=True), timeout=30.0) - except asyncio.TimeoutError: - self._log_warning("Some collectors took too long to start") + # Start collectors and monitoring + await self.lifecycle_manager.start_all_enabled_collectors() + await self.health_monitor.start_monitoring() - # Start global health monitoring - health_task = asyncio.create_task(self._global_health_monitor()) - self._tasks.add(health_task) - health_task.add_done_callback(self._tasks.discard) + # Track health monitoring task + health_task = self.health_monitor.get_health_task() + if health_task: + self._tasks.add(health_task) + health_task.add_done_callback(self._tasks.discard) + + # Start statistics cache updates + await self.stats_tracker.start_cache_updates() self.status = ManagerStatus.RUNNING - self._log_info(f"Collector manager started - Managing {len(self._enabled_collectors)} collectors") + enabled_count = len(self.lifecycle_manager.get_enabled_collectors()) + self.logger_manager.log_info(f"Collector manager started - Managing {enabled_count} collectors") return True except Exception as e: self.status = ManagerStatus.ERROR - self._log_error(f"Failed to start collector manager: {e}") + self.logger_manager.log_error(f"Failed to start collector manager: {e}", exc_info=True) return False async def stop(self) -> None: """Stop the collector manager and all collectors.""" if self.status == ManagerStatus.STOPPED: - self._log_warning("Collector manager is already stopped") + self.logger_manager.log_warning("Collector manager is already stopped") return - self._log_info("Stopping collector manager") + self.logger_manager.log_info("Stopping collector manager") self.status = ManagerStatus.STOPPING self._running = False try: + # Set running state for all components + self.lifecycle_manager.set_running_state(False) + self.health_monitor.set_running_state(False) + self.stats_tracker.set_running_state(False) + + # Stop monitoring and statistics + await self.health_monitor.stop_monitoring() + await self.stats_tracker.stop_cache_updates() + # Cancel manager tasks for task in list(self._tasks): task.cancel() - if self._tasks: await asyncio.gather(*self._tasks, return_exceptions=True) # Stop all collectors - stop_tasks = [] - for collector in self._collectors.values(): - task = asyncio.create_task(collector.stop(force=True)) - stop_tasks.append(task) - - # Wait for all collectors to stop (with timeout) - if stop_tasks: - try: - await 
asyncio.wait_for(asyncio.gather(*stop_tasks, return_exceptions=True), timeout=30.0) - except asyncio.TimeoutError: - self._log_warning("Some collectors took too long to stop") + await self.lifecycle_manager.stop_all_collectors() self.status = ManagerStatus.STOPPED - self._log_info("Collector manager stopped") + self.logger_manager.log_info("Collector manager stopped") except Exception as e: self.status = ManagerStatus.ERROR - self._log_error(f"Error stopping collector manager: {e}") + self.logger_manager.log_error(f"Error stopping collector manager: {e}", exc_info=True) async def restart_collector(self, collector_name: str) -> bool: - """ - Restart a specific collector. - - Args: - collector_name: Name of the collector to restart - - Returns: - True if restarted successfully, False otherwise - """ - if collector_name not in self._collectors: - self._log_warning(f"Collector not found: {collector_name}") - return False - - collector = self._collectors[collector_name] - self._log_info(f"Restarting collector: {collector_name}") - - try: - success = await collector.restart() - if success: - self._stats['restarts_performed'] += 1 - self._log_info(f"Successfully restarted collector: {collector_name}") - else: - self._log_error(f"Failed to restart collector: {collector_name}") - return success - - except Exception as e: - self._log_error(f"Error restarting collector {collector_name}: {e}") - return False - - async def _start_collector(self, collector_name: str) -> bool: - """ - Start a specific collector. - - Args: - collector_name: Name of the collector to start - - Returns: - True if started successfully, False otherwise - """ - if collector_name not in self._collectors: - self._log_warning(f"Collector not found: {collector_name}") - return False - - collector = self._collectors[collector_name] - - try: - success = await collector.start() - if success: - self._log_info(f"Started collector: {collector_name}") - else: - self._log_error(f"Failed to start collector: {collector_name}") - return success - - except Exception as e: - self._log_error(f"Error starting collector {collector_name}: {e}") - return False - - async def _global_health_monitor(self) -> None: - """Global health monitoring for all collectors.""" - self._log_debug("Starting global health monitor") - - while self._running: - try: - await asyncio.sleep(self.global_health_check_interval) - - self._last_global_check = datetime.now(timezone.utc) - self._stats['last_global_check'] = self._last_global_check - - # Check each enabled collector - running_count = 0 - failed_count = 0 - - for collector_name in self._enabled_collectors: - collector = self._collectors[collector_name] - health_status = collector.get_health_status() - - if health_status['is_healthy'] and collector.status == CollectorStatus.RUNNING: - running_count += 1 - elif not health_status['is_healthy']: - failed_count += 1 - self._log_warning(f"Collector {collector_name} is unhealthy: {health_status['issues']}") - - # Auto-restart if needed and not already restarting - if (collector.auto_restart and - collector.status not in [CollectorStatus.STARTING, CollectorStatus.STOPPING]): - self._log_info(f"Auto-restarting unhealthy collector: {collector_name}") - asyncio.create_task(self.restart_collector(collector_name)) - - # Update global statistics - self._stats['running_collectors'] = running_count - self._stats['failed_collectors'] = failed_count - - self._log_debug(f"Health check complete - Running: {running_count}, Failed: {failed_count}") - - except asyncio.CancelledError: - 
self._log_debug("Global health monitor cancelled") - break - except Exception as e: - self._log_error(f"Error in global health monitor: {e}") - await asyncio.sleep(self.global_health_check_interval) - - def get_status(self) -> Dict[str, Any]: - """ - Get manager status and statistics. - - Returns: - Dictionary containing status information - """ - uptime_seconds = None - if self._stats['uptime_start']: - uptime_seconds = (datetime.now(timezone.utc) - self._stats['uptime_start']).total_seconds() - - # Get individual collector statuses - collector_statuses = {} - for name, collector in self._collectors.items(): - collector_statuses[name] = { - 'status': collector.status.value, - 'enabled': name in self._enabled_collectors, - 'health': collector.get_health_status() - } - - return { - 'manager_status': self.status.value, - 'uptime_seconds': uptime_seconds, - 'statistics': self._stats, - 'collectors': collector_statuses, - 'enabled_collectors': list(self._enabled_collectors), - 'total_collectors': len(self._collectors) - } - - def get_collector_status(self, collector_name: str) -> Optional[Dict[str, Any]]: - """ - Get status for a specific collector. - - Args: - collector_name: Name of the collector - - Returns: - Collector status dict or None if not found - """ - if collector_name not in self._collectors: - return None - - collector = self._collectors[collector_name] - return { - 'name': collector_name, - 'config': self._collector_configs[collector_name].__dict__, - 'status': collector.get_status(), - 'health': collector.get_health_status() - } - - def list_collectors(self) -> List[str]: - """ - List all managed collector names. - - Returns: - List of collector names - """ - return list(self._collectors.keys()) - - def get_running_collectors(self) -> List[str]: - """ - Get names of currently running collectors. - - Returns: - List of running collector names - """ - running = [] - for name, collector in self._collectors.items(): - if collector.status == CollectorStatus.RUNNING: - running.append(name) - return running - - def get_failed_collectors(self) -> List[str]: - """ - Get names of failed or unhealthy collectors. - - Returns: - List of failed collector names - """ - failed = [] - for name, collector in self._collectors.items(): - health_status = collector.get_health_status() - if not health_status['is_healthy']: - failed.append(name) - return failed + """Restart a specific collector.""" + return await self.lifecycle_manager.restart_collector(collector_name) async def restart_all_collectors(self) -> Dict[str, bool]: - """ - Restart all enabled collectors. 
- - Returns: - Dictionary mapping collector names to restart success status - """ - self.logger.info("Restarting all enabled collectors") - - results = {} - restart_tasks = [] - - for collector_name in self._enabled_collectors: - task = asyncio.create_task(self.restart_collector(collector_name)) - restart_tasks.append((collector_name, task)) - - # Wait for all restarts to complete - for collector_name, task in restart_tasks: - try: - results[collector_name] = await task - except Exception as e: - self.logger.error(f"Error restarting {collector_name}: {e}") - results[collector_name] = False - - successful_restarts = sum(1 for success in results.values() if success) - self.logger.info(f"Restart complete - {successful_restarts}/{len(results)} collectors restarted successfully") - - return results + """Restart all enabled collectors.""" + return await self.lifecycle_manager.restart_all_collectors() + + def get_status(self, force_refresh: bool = False) -> Dict[str, Any]: + """Get manager status and statistics.""" + status_dict = self.stats_tracker.get_status(force_refresh) + status_dict['manager_status'] = self.status.value + return status_dict + + def get_collector_status(self, collector_name: str) -> Optional[Dict[str, Any]]: + """Get status for a specific collector.""" + return self.stats_tracker.get_collector_status(collector_name) + + def list_collectors(self) -> List[str]: + """List all managed collector names.""" + return self.stats_tracker.list_collectors() + + def get_running_collectors(self) -> List[str]: + """Get names of currently running collectors.""" + return self.stats_tracker.get_running_collectors() + + def get_failed_collectors(self) -> List[str]: + """Get names of failed or unhealthy collectors.""" + return self.stats_tracker.get_failed_collectors() def __repr__(self) -> str: """String representation of the manager.""" - return f"" \ No newline at end of file + return f"CollectorManager(name={self.manager_name}, status={self.status.value})" \ No newline at end of file diff --git a/data/collector_types.py b/data/collector_types.py new file mode 100644 index 0000000..62f4ab4 --- /dev/null +++ b/data/collector_types.py @@ -0,0 +1,30 @@ +""" +Data types and structures for collector management. + +This module contains shared data structures used across the collector management system. +""" + +from dataclasses import dataclass +from enum import Enum +from typing import List + + +class ManagerStatus(Enum): + """Status of the collector manager.""" + STOPPED = "stopped" + STARTING = "starting" + RUNNING = "running" + STOPPING = "stopping" + ERROR = "error" + + +@dataclass +class CollectorConfig: + """Configuration for a data collector.""" + name: str + exchange: str + symbols: List[str] + data_types: List[str] + auto_restart: bool = True + health_check_interval: float = 30.0 + enabled: bool = True \ No newline at end of file diff --git a/data/manager_components/__init__.py b/data/manager_components/__init__.py new file mode 100644 index 0000000..cef4dc3 --- /dev/null +++ b/data/manager_components/__init__.py @@ -0,0 +1,18 @@ +""" +Manager components package for collector management. + +This package contains specialized components that handle different aspects +of collector management following the Single Responsibility Principle. 
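+ +Typical wiring (illustrative; mirrors how CollectorManager.__init__ composes these components): + + logger_mgr = ManagerLogger(logger, log_errors_only=True) + lifecycle = CollectorLifecycleManager(logger_mgr) + health = ManagerHealthMonitor(60.0, logger_mgr, lifecycle) + stats = ManagerStatsTracker(30.0, logger_mgr, lifecycle, health)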
+""" + +from .collector_lifecycle_manager import CollectorLifecycleManager +from .manager_health_monitor import ManagerHealthMonitor +from .manager_stats_tracker import ManagerStatsTracker +from .manager_logger import ManagerLogger + +__all__ = [ + 'CollectorLifecycleManager', + 'ManagerHealthMonitor', + 'ManagerStatsTracker', + 'ManagerLogger' +] \ No newline at end of file diff --git a/data/manager_components/collector_lifecycle_manager.py b/data/manager_components/collector_lifecycle_manager.py new file mode 100644 index 0000000..148a7a1 --- /dev/null +++ b/data/manager_components/collector_lifecycle_manager.py @@ -0,0 +1,342 @@ +""" +Collector Lifecycle Manager for handling collector lifecycle operations. + +This module handles the lifecycle of data collectors including adding, removing, +enabling, disabling, starting, and restarting collectors. +""" + +import asyncio +import time +from typing import Dict, Set, Optional +from ..base_collector import BaseDataCollector, CollectorStatus +from ..collector_types import CollectorConfig + + +class CollectorLifecycleManager: + """Manages the lifecycle of data collectors.""" + + def __init__(self, logger_manager=None): + """ + Initialize the lifecycle manager. + + Args: + logger_manager: Logger manager instance for logging operations + """ + self.logger_manager = logger_manager + + # Collector storage + self._collectors: Dict[str, BaseDataCollector] = {} + self._collector_configs: Dict[str, CollectorConfig] = {} + self._enabled_collectors: Set[str] = set() + + # Manager state + self._running = False + self._stats = {'total_collectors': 0, 'restarts_performed': 0} + + def set_running_state(self, running: bool) -> None: + """Set the running state of the manager.""" + self._running = running + + def get_stats(self) -> Dict: + """Get lifecycle statistics.""" + return self._stats.copy() + + def add_collector(self, + collector: BaseDataCollector, + config: Optional[CollectorConfig] = None) -> None: + """ + Add a collector to be managed. + + Args: + collector: Data collector instance + config: Optional configuration (will create default if not provided) + """ + # Use a more unique name to avoid duplicates + collector_name = f"{collector.exchange_name}_{int(time.time() * 1000000) % 1000000}" + + # Ensure unique name + counter = 1 + base_name = collector_name + while collector_name in self._collectors: + collector_name = f"{base_name}_{counter}" + counter += 1 + + if config is None: + config = CollectorConfig( + name=collector_name, + exchange=collector.exchange_name, + symbols=list(collector.symbols), + data_types=[dt.value for dt in collector.data_types], + auto_restart=collector.auto_restart, + health_check_interval=collector._state_telemetry.health_check_interval + ) + + self._collectors[collector_name] = collector + self._collector_configs[collector_name] = config + + if config.enabled: + self._enabled_collectors.add(collector_name) + + self._stats['total_collectors'] = len(self._collectors) + + if self.logger_manager: + self.logger_manager.log_info( + f"Added collector: {collector_name} ({collector.exchange_name}) - " + f"Symbols: {', '.join(collector.symbols)} - Enabled: {config.enabled}" + ) + + def remove_collector(self, collector_name: str) -> bool: + """ + Remove a collector from management. 
+ + Args: + collector_name: Name of the collector to remove + + Returns: + True if removed successfully, False if not found + """ + if collector_name not in self._collectors: + if self.logger_manager: + self.logger_manager.log_warning(f"Collector not found: {collector_name}") + return False + + # Stop the collector first (only if event loop is running) + collector = self._collectors[collector_name] + if collector.status != CollectorStatus.STOPPED: + try: + asyncio.create_task(collector.stop(force=True)) + except RuntimeError: + # No event loop running, just log + if self.logger_manager: + self.logger_manager.log_info( + f"Collector {collector_name} will be removed without stopping (no event loop)" + ) + + # Remove from management + del self._collectors[collector_name] + del self._collector_configs[collector_name] + self._enabled_collectors.discard(collector_name) + + self._stats['total_collectors'] = len(self._collectors) + + if self.logger_manager: + self.logger_manager.log_info(f"Removed collector: {collector_name}") + return True + + def enable_collector(self, collector_name: str) -> bool: + """ + Enable a collector (will be started if manager is running). + + Args: + collector_name: Name of the collector to enable + + Returns: + True if enabled successfully, False if not found + """ + if collector_name not in self._collectors: + if self.logger_manager: + self.logger_manager.log_warning(f"Collector not found: {collector_name}") + return False + + self._enabled_collectors.add(collector_name) + self._collector_configs[collector_name].enabled = True + + # Start the collector if manager is running (only if event loop is running) + if self._running: + try: + asyncio.create_task(self._start_collector(collector_name)) + except RuntimeError: + # No event loop running, will be started when manager starts + if self.logger_manager: + self.logger_manager.log_debug( + f"Collector {collector_name} enabled but will start when manager starts" + ) + + if self.logger_manager: + self.logger_manager.log_info(f"Enabled collector: {collector_name}") + return True + + def disable_collector(self, collector_name: str) -> bool: + """ + Disable a collector (will be stopped if running). + + Args: + collector_name: Name of the collector to disable + + Returns: + True if disabled successfully, False if not found + """ + if collector_name not in self._collectors: + if self.logger_manager: + self.logger_manager.log_warning(f"Collector not found: {collector_name}") + return False + + self._enabled_collectors.discard(collector_name) + self._collector_configs[collector_name].enabled = False + + # Stop the collector (only if event loop is running) + collector = self._collectors[collector_name] + try: + asyncio.create_task(collector.stop(force=True)) + except RuntimeError: + # No event loop running, just log + if self.logger_manager: + self.logger_manager.log_debug( + f"Collector {collector_name} disabled but cannot stop (no event loop)" + ) + + if self.logger_manager: + self.logger_manager.log_info(f"Disabled collector: {collector_name}") + return True + + async def _start_collector(self, collector_name: str) -> bool: + """ + Start a specific collector. 
+ + Args: + collector_name: Name of the collector to start + + Returns: + True if started successfully, False otherwise + """ + if collector_name not in self._collectors: + if self.logger_manager: + self.logger_manager.log_warning(f"Collector not found: {collector_name}") + return False + + collector = self._collectors[collector_name] + + try: + success = await collector.start() + if success: + if self.logger_manager: + self.logger_manager.log_info(f"Started collector: {collector_name}") + else: + if self.logger_manager: + self.logger_manager.log_error(f"Failed to start collector: {collector_name}") + return success + + except Exception as e: + if self.logger_manager: + self.logger_manager.log_error(f"Error starting collector {collector_name}: {e}", exc_info=True) + return False + + async def restart_collector(self, collector_name: str) -> bool: + """ + Restart a specific collector. + + Args: + collector_name: Name of the collector to restart + + Returns: + True if restarted successfully, False otherwise + """ + if collector_name not in self._collectors: + if self.logger_manager: + self.logger_manager.log_warning(f"Collector not found: {collector_name}") + return False + + collector = self._collectors[collector_name] + if self.logger_manager: + self.logger_manager.log_info(f"Restarting collector: {collector_name}") + + try: + success = await collector.restart() + if success: + self._stats['restarts_performed'] += 1 + if self.logger_manager: + self.logger_manager.log_info(f"Successfully restarted collector: {collector_name}") + else: + if self.logger_manager: + self.logger_manager.log_error(f"Failed to restart collector: {collector_name}") + return success + + except Exception as e: + if self.logger_manager: + self.logger_manager.log_error(f"Error restarting collector {collector_name}: {e}", exc_info=True) + return False + + async def restart_all_collectors(self) -> Dict[str, bool]: + """ + Restart all enabled collectors. 
+ + Returns: + Dictionary mapping collector names to restart success status + """ + if self.logger_manager: + self.logger_manager.log_info("Restarting all enabled collectors") + + results = {} + restart_tasks = [] + + for collector_name in self._enabled_collectors: + task = asyncio.create_task(self.restart_collector(collector_name)) + restart_tasks.append((collector_name, task)) + + # Wait for all restarts to complete + for collector_name, task in restart_tasks: + try: + results[collector_name] = await task + except Exception as e: + if self.logger_manager: + self.logger_manager.log_error(f"Error restarting {collector_name}: {e}", exc_info=True) + results[collector_name] = False + + successful_restarts = sum(1 for success in results.values() if success) + if self.logger_manager: + self.logger_manager.log_info( + f"Restart complete - {successful_restarts}/{len(results)} collectors restarted successfully" + ) + + return results + + async def start_all_enabled_collectors(self) -> None: + """Start all enabled collectors.""" + start_tasks = [] + for collector_name in self._enabled_collectors: + task = asyncio.create_task(self._start_collector(collector_name)) + start_tasks.append(task) + + # Wait for all collectors to start (with timeout) + if start_tasks: + try: + await asyncio.wait_for(asyncio.gather(*start_tasks, return_exceptions=True), timeout=30.0) + except asyncio.TimeoutError: + if self.logger_manager: + self.logger_manager.log_warning("Some collectors took too long to start") + + async def stop_all_collectors(self) -> None: + """Stop all collectors.""" + stop_tasks = [] + for collector in self._collectors.values(): + task = asyncio.create_task(collector.stop(force=True)) + stop_tasks.append(task) + + # Wait for all collectors to stop (with timeout) + if stop_tasks: + try: + await asyncio.wait_for(asyncio.gather(*stop_tasks, return_exceptions=True), timeout=30.0) + except asyncio.TimeoutError: + if self.logger_manager: + self.logger_manager.log_warning("Some collectors took too long to stop") + + # Getters for data access + def get_collectors(self) -> Dict[str, BaseDataCollector]: + """Get all collectors.""" + return self._collectors + + def get_collector_configs(self) -> Dict[str, CollectorConfig]: + """Get all collector configurations.""" + return self._collector_configs + + def get_enabled_collectors(self) -> Set[str]: + """Get enabled collector names.""" + return self._enabled_collectors + + def get_collector(self, name: str) -> Optional[BaseDataCollector]: + """Get a specific collector by name.""" + return self._collectors.get(name) + + def get_collector_config(self, name: str) -> Optional[CollectorConfig]: + """Get a specific collector config by name.""" + return self._collector_configs.get(name) \ No newline at end of file diff --git a/data/manager_components/manager_health_monitor.py b/data/manager_components/manager_health_monitor.py new file mode 100644 index 0000000..4341aeb --- /dev/null +++ b/data/manager_components/manager_health_monitor.py @@ -0,0 +1,185 @@ +""" +Manager Health Monitor for monitoring collector health and auto-recovery. + +This module handles health monitoring of data collectors including periodic health checks, +auto-restart functionality, and health status tracking. 
+""" + +import asyncio +from datetime import datetime, timezone +from typing import Set, Dict, Optional +from ..base_collector import BaseDataCollector, CollectorStatus + + +class ManagerHealthMonitor: + """Monitors the health of data collectors and provides auto-recovery.""" + + def __init__(self, + global_health_check_interval: float = 60.0, + logger_manager=None, + lifecycle_manager=None): + """ + Initialize the health monitor. + + Args: + global_health_check_interval: Seconds between global health checks + logger_manager: Logger manager instance for logging operations + lifecycle_manager: Lifecycle manager for restart operations + """ + self.global_health_check_interval = global_health_check_interval + self.logger_manager = logger_manager + self.lifecycle_manager = lifecycle_manager + + # Health monitoring state + self._running = False + self._last_global_check = datetime.now(timezone.utc) + self._global_health_task: Optional[asyncio.Task] = None + + # Health statistics + self._health_stats = { + 'last_global_check': None, + 'running_collectors': 0, + 'failed_collectors': 0 + } + + def set_running_state(self, running: bool) -> None: + """Set the running state of the monitor.""" + self._running = running + + def get_health_stats(self) -> Dict: + """Get health monitoring statistics.""" + return self._health_stats.copy() + + def get_last_global_check(self) -> datetime: + """Get the timestamp of the last global health check.""" + return self._last_global_check + + async def start_monitoring(self) -> None: + """Start the global health monitoring task.""" + if self._global_health_task and not self._global_health_task.done(): + if self.logger_manager: + self.logger_manager.log_warning("Health monitoring is already running") + return + + if self.logger_manager: + self.logger_manager.log_debug("Starting health monitoring") + + self._global_health_task = asyncio.create_task(self._global_health_monitor()) + + async def stop_monitoring(self) -> None: + """Stop the global health monitoring task.""" + if self._global_health_task and not self._global_health_task.done(): + self._global_health_task.cancel() + try: + await self._global_health_task + except asyncio.CancelledError: + pass + + if self.logger_manager: + self.logger_manager.log_debug("Health monitoring stopped") + + async def _global_health_monitor(self) -> None: + """Global health monitoring for all collectors.""" + if self.logger_manager: + self.logger_manager.log_debug("Starting global health monitor") + + while self._running: + try: + await asyncio.sleep(self.global_health_check_interval) + + self._last_global_check = datetime.now(timezone.utc) + self._health_stats['last_global_check'] = self._last_global_check + + # Perform health check if lifecycle manager is available + if self.lifecycle_manager: + await self._perform_health_check() + + except asyncio.CancelledError: + if self.logger_manager: + self.logger_manager.log_debug("Global health monitor cancelled") + break + except Exception as e: + if self.logger_manager: + self.logger_manager.log_error(f"Error in global health monitor: {e}", exc_info=True) + await asyncio.sleep(self.global_health_check_interval) + + async def _perform_health_check(self) -> None: + """Perform health check on all enabled collectors.""" + if not self.lifecycle_manager: + return + + enabled_collectors = self.lifecycle_manager.get_enabled_collectors() + collectors = self.lifecycle_manager.get_collectors() + + running_count = 0 + failed_count = 0 + + for collector_name in enabled_collectors: + if collector_name 
not in collectors: + continue + + collector = collectors[collector_name] + health_status = collector.get_health_status() + + if health_status['is_healthy'] and collector.status == CollectorStatus.RUNNING: + running_count += 1 + elif not health_status['is_healthy']: + failed_count += 1 + if self.logger_manager: + self.logger_manager.log_warning( + f"Collector {collector_name} is unhealthy: {health_status['issues']}" + ) + + # Auto-restart if needed and not already restarting + config = self.lifecycle_manager.get_collector_config(collector_name) + if (config and config.auto_restart and + collector.status not in [CollectorStatus.STARTING, CollectorStatus.STOPPING]): + + if self.logger_manager: + self.logger_manager.log_info(f"Auto-restarting unhealthy collector: {collector_name}") + + # Create restart task without awaiting to avoid blocking + asyncio.create_task(self.lifecycle_manager.restart_collector(collector_name)) + + # Update health statistics + self._health_stats['running_collectors'] = running_count + self._health_stats['failed_collectors'] = failed_count + + if self.logger_manager: + self.logger_manager.log_debug( + f"Health check complete - Running: {running_count}, Failed: {failed_count}" + ) + + async def perform_immediate_health_check(self) -> Dict[str, Dict]: + """ + Perform an immediate health check on all collectors. + + Returns: + Dictionary mapping collector names to their health status + """ + if not self.lifecycle_manager: + return {} + + enabled_collectors = self.lifecycle_manager.get_enabled_collectors() + collectors = self.lifecycle_manager.get_collectors() + + health_results = {} + + for collector_name in enabled_collectors: + if collector_name not in collectors: + continue + + collector = collectors[collector_name] + health_status = collector.get_health_status() + + health_results[collector_name] = { + 'is_healthy': health_status['is_healthy'], + 'status': collector.status.value, + 'issues': health_status.get('issues', []) + } + + return health_results + + def get_health_task(self) -> Optional[asyncio.Task]: + """Get the current health monitoring task.""" + return self._global_health_task \ No newline at end of file diff --git a/data/manager_components/manager_logger.py b/data/manager_components/manager_logger.py new file mode 100644 index 0000000..70ae3e5 --- /dev/null +++ b/data/manager_components/manager_logger.py @@ -0,0 +1,136 @@ +""" +Manager Logger for centralized logging operations. + +This module provides a centralized logging interface for the collector management system +with configurable log levels and error sanitization. +""" + +from typing import Optional, Any + + +class ManagerLogger: + """Centralized logger wrapper for collector management operations.""" + + def __init__(self, logger=None, log_errors_only: bool = False): + """ + Initialize the manager logger. + + Args: + logger: Logger instance. If None, no logging will be performed. + log_errors_only: If True and logger is provided, only log error-level messages + """ + self.logger = logger + self.log_errors_only = log_errors_only + + def _sanitize_error(self, message: str) -> str: + """ + Sanitize error message to prevent leaking internal details. 
+
+        Args:
+            message: Original error message
+
+        Returns:
+            Sanitized error message
+        """
+        # Remove sensitive patterns that might leak internal information
+        sensitive_patterns = [
+            'password=',
+            'token=',
+            'key=',
+            'secret=',
+            'auth=',
+            'api_key=',
+            'api_secret=',
+            'access_token=',
+            'refresh_token='
+        ]
+
+        sanitized = message
+        for pattern in sensitive_patterns:
+            # Scan case-insensitively and redact every occurrence, not just the
+            # first (splitting on the exact-case pattern missed mixed-case
+            # matches and dropped any text after a second occurrence).
+            search_start = 0
+            while True:
+                idx = sanitized.lower().find(pattern, search_start)
+                if idx == -1:
+                    break
+                value_start = idx + len(pattern)
+                # Find the end of the value (space, comma, bracket, or end of string)
+                end_chars = [' ', ',', ')', ']', '}', '\n', '\t']
+                end_idx = len(sanitized)
+                for char in end_chars:
+                    char_idx = sanitized.find(char, value_start)
+                    if char_idx != -1 and char_idx < end_idx:
+                        end_idx = char_idx
+                # Replace the value with [REDACTED] and continue after it
+                sanitized = sanitized[:value_start] + '[REDACTED]' + sanitized[end_idx:]
+                search_start = value_start + len('[REDACTED]')
+
+        return sanitized
+
+    def log_debug(self, message: str) -> None:
+        """Log debug message if logger is available and not in errors-only mode."""
+        if self.logger and not self.log_errors_only:
+            self.logger.debug(message)
+
+    def log_info(self, message: str) -> None:
+        """Log info message if logger is available and not in errors-only mode."""
+        if self.logger and not self.log_errors_only:
+            self.logger.info(message)
+
+    def log_warning(self, message: str) -> None:
+        """Log warning message if logger is available and not in errors-only mode."""
+        if self.logger and not self.log_errors_only:
+            self.logger.warning(message)
+
+    def log_error(self, message: str, exc_info: bool = False) -> None:
+        """
+        Log error message if logger is available (always logs errors regardless of log_errors_only).
+
+        Args:
+            message: Error message to log
+            exc_info: Whether to include exception info
+        """
+        if self.logger:
+            sanitized_message = self._sanitize_error(message)
+            self.logger.error(sanitized_message, exc_info=exc_info)
+
+    def log_critical(self, message: str, exc_info: bool = False) -> None:
+        """
+        Log critical message if logger is available (always logs critical regardless of log_errors_only).
+
+        Args:
+            message: Critical message to log
+            exc_info: Whether to include exception info
+        """
+        if self.logger:
+            sanitized_message = self._sanitize_error(message)
+            self.logger.critical(sanitized_message, exc_info=exc_info)
+
+    def is_enabled(self) -> bool:
+        """Check if logging is enabled."""
+        return self.logger is not None
+
+    def is_debug_enabled(self) -> bool:
+        """Check if debug logging is enabled."""
+        return self.logger is not None and not self.log_errors_only
+
+    def set_logger(self, logger) -> None:
+        """Set or update the logger instance."""
+        self.logger = logger
+
+    def set_errors_only(self, errors_only: bool) -> None:
+        """Set or update the errors-only mode."""
+        self.log_errors_only = errors_only
+
+    def get_logger_info(self) -> dict:
+        """
+        Get information about the logger configuration.
+ + Returns: + Dictionary with logger configuration details + """ + return { + 'logger_available': self.logger is not None, + 'logger_name': getattr(self.logger, 'name', None) if self.logger else None, + 'log_errors_only': self.log_errors_only, + 'debug_enabled': self.is_debug_enabled() + } \ No newline at end of file diff --git a/data/manager_components/manager_stats_tracker.py b/data/manager_components/manager_stats_tracker.py new file mode 100644 index 0000000..80c3204 --- /dev/null +++ b/data/manager_components/manager_stats_tracker.py @@ -0,0 +1,275 @@ +""" +Manager Statistics Tracker for managing collector statistics and caching. + +This module handles statistics collection, caching, and periodic updates +to optimize performance by avoiding real-time calculations on every status request. +""" + +import asyncio +from datetime import datetime, timezone +from typing import Dict, Any, Optional, List +from ..base_collector import BaseDataCollector, CollectorStatus + + +class ManagerStatsTracker: + """Manages statistics tracking and caching for the collector manager.""" + + def __init__(self, + cache_update_interval: float = 30.0, + logger_manager=None, + lifecycle_manager=None, + health_monitor=None): + """ + Initialize the statistics tracker. + + Args: + cache_update_interval: Seconds between cache updates + logger_manager: Logger manager instance for logging operations + lifecycle_manager: Lifecycle manager for accessing collectors + health_monitor: Health monitor for accessing health stats + """ + self.cache_update_interval = cache_update_interval + self.logger_manager = logger_manager + self.lifecycle_manager = lifecycle_manager + self.health_monitor = health_monitor + + # Statistics storage + self._stats = { + 'total_collectors': 0, + 'running_collectors': 0, + 'failed_collectors': 0, + 'restarts_performed': 0, + 'last_global_check': None, + 'uptime_start': None + } + + # Cache management + self._cached_status: Optional[Dict[str, Any]] = None + self._cache_last_updated: Optional[datetime] = None + self._cache_update_task: Optional[asyncio.Task] = None + self._running = False + + def set_running_state(self, running: bool) -> None: + """Set the running state of the tracker.""" + self._running = running + if running: + self._stats['uptime_start'] = datetime.now(timezone.utc) + else: + self._stats['uptime_start'] = None + + def get_stats(self) -> Dict[str, Any]: + """Get current statistics.""" + return self._stats.copy() + + def update_stat(self, key: str, value: Any) -> None: + """Update a specific statistic.""" + self._stats[key] = value + + def increment_stat(self, key: str, amount: int = 1) -> None: + """Increment a numeric statistic.""" + if key in self._stats and isinstance(self._stats[key], (int, float)): + self._stats[key] += amount + else: + self._stats[key] = amount + + async def start_cache_updates(self) -> None: + """Start the background cache update task.""" + if self._cache_update_task and not self._cache_update_task.done(): + if self.logger_manager: + self.logger_manager.log_warning("Cache updates are already running") + return + + if self.logger_manager: + self.logger_manager.log_debug("Starting statistics cache updates") + + self._cache_update_task = asyncio.create_task(self._cache_update_loop()) + + async def stop_cache_updates(self) -> None: + """Stop the background cache update task.""" + if self._cache_update_task and not self._cache_update_task.done(): + self._cache_update_task.cancel() + try: + await self._cache_update_task + except asyncio.CancelledError: + pass + 
+ if self.logger_manager: + self.logger_manager.log_debug("Statistics cache updates stopped") + + async def _cache_update_loop(self) -> None: + """Background loop for updating cached statistics.""" + while self._running: + try: + await asyncio.sleep(self.cache_update_interval) + await self._update_cached_status() + + except asyncio.CancelledError: + if self.logger_manager: + self.logger_manager.log_debug("Statistics cache update loop cancelled") + break + except Exception as e: + if self.logger_manager: + self.logger_manager.log_error(f"Error in statistics cache update: {e}", exc_info=True) + await asyncio.sleep(self.cache_update_interval) + + async def _update_cached_status(self) -> None: + """Update the cached status information.""" + try: + # Update basic stats from lifecycle manager + if self.lifecycle_manager: + lifecycle_stats = self.lifecycle_manager.get_stats() + self._stats.update(lifecycle_stats) + + # Update health stats from health monitor + if self.health_monitor: + health_stats = self.health_monitor.get_health_stats() + self._stats.update(health_stats) + + # Calculate uptime + uptime_seconds = None + if self._stats['uptime_start']: + uptime_seconds = (datetime.now(timezone.utc) - self._stats['uptime_start']).total_seconds() + + # Build cached status + self._cached_status = self._build_status_dict(uptime_seconds) + self._cache_last_updated = datetime.now(timezone.utc) + + if self.logger_manager: + self.logger_manager.log_debug("Statistics cache updated") + + except Exception as e: + if self.logger_manager: + self.logger_manager.log_error(f"Failed to update statistics cache: {e}", exc_info=True) + + def _build_status_dict(self, uptime_seconds: Optional[float]) -> Dict[str, Any]: + """Build the complete status dictionary.""" + # Get individual collector statuses + collector_statuses = {} + if self.lifecycle_manager: + collectors = self.lifecycle_manager.get_collectors() + enabled_collectors = self.lifecycle_manager.get_enabled_collectors() + + for name, collector in collectors.items(): + collector_statuses[name] = { + 'status': collector.status.value, + 'enabled': name in enabled_collectors, + 'health': collector.get_health_status() + } + + return { + 'uptime_seconds': uptime_seconds, + 'statistics': self._stats.copy(), + 'collectors': collector_statuses, + 'enabled_collectors': list(self.lifecycle_manager.get_enabled_collectors()) if self.lifecycle_manager else [], + 'total_collectors': len(collector_statuses), + 'cache_last_updated': self._cache_last_updated.isoformat() if self._cache_last_updated else None + } + + def get_status(self, force_refresh: bool = False) -> Dict[str, Any]: + """ + Get manager status and statistics. 
+ + Args: + force_refresh: If True, bypass cache and calculate real-time + + Returns: + Dictionary containing status information + """ + # Return cached status if available and not forcing refresh + if not force_refresh and self._cached_status and self._cache_last_updated: + # Check if cache is recent enough (within 2x the update interval) + cache_age = (datetime.now(timezone.utc) - self._cache_last_updated).total_seconds() + if cache_age <= (self.cache_update_interval * 2): + return self._cached_status.copy() + + # Calculate real-time status + uptime_seconds = None + if self._stats['uptime_start']: + uptime_seconds = (datetime.now(timezone.utc) - self._stats['uptime_start']).total_seconds() + + return self._build_status_dict(uptime_seconds) + + def get_collector_status(self, collector_name: str) -> Optional[Dict[str, Any]]: + """ + Get status for a specific collector. + + Args: + collector_name: Name of the collector + + Returns: + Collector status dict or None if not found + """ + if not self.lifecycle_manager: + return None + + collector = self.lifecycle_manager.get_collector(collector_name) + if not collector: + return None + + config = self.lifecycle_manager.get_collector_config(collector_name) + + return { + 'name': collector_name, + 'config': config.__dict__ if config else {}, + 'status': collector.get_status(), + 'health': collector.get_health_status() + } + + def list_collectors(self) -> List[str]: + """ + List all managed collector names. + + Returns: + List of collector names + """ + if self.lifecycle_manager: + return list(self.lifecycle_manager.get_collectors().keys()) + return [] + + def get_running_collectors(self) -> List[str]: + """ + Get names of currently running collectors. + + Returns: + List of running collector names + """ + running = [] + if self.lifecycle_manager: + collectors = self.lifecycle_manager.get_collectors() + for name, collector in collectors.items(): + if collector.status == CollectorStatus.RUNNING: + running.append(name) + return running + + def get_failed_collectors(self) -> List[str]: + """ + Get names of failed or unhealthy collectors. + + Returns: + List of failed collector names + """ + failed = [] + if self.lifecycle_manager: + collectors = self.lifecycle_manager.get_collectors() + for name, collector in collectors.items(): + health_status = collector.get_health_status() + if not health_status['is_healthy']: + failed.append(name) + return failed + + def force_cache_refresh(self) -> None: + """Force an immediate cache refresh.""" + if self._running: + asyncio.create_task(self._update_cached_status()) + + def get_cache_info(self) -> Dict[str, Any]: + """Get information about the cache state.""" + return { + 'cache_enabled': True, + 'cache_update_interval': self.cache_update_interval, + 'cache_last_updated': self._cache_last_updated.isoformat() if self._cache_last_updated else None, + 'cache_age_seconds': ( + (datetime.now(timezone.utc) - self._cache_last_updated).total_seconds() + if self._cache_last_updated else None + ) + } \ No newline at end of file diff --git a/scripts/production_clean.py b/scripts/production_clean.py index d6a4389..8a8d03c 100644 --- a/scripts/production_clean.py +++ b/scripts/production_clean.py @@ -2,39 +2,27 @@ """ Clean Production OKX Data Collector -This script runs OKX data collection with minimal console output -and comprehensive file logging for production use. 
- -Usage: - python scripts/production_clean.py [--hours duration] - -Examples: - # Run for 8 hours - python scripts/production_clean.py --hours 8 - - # Run overnight (12 hours) - python scripts/production_clean.py --hours 12 +Simplified production script using the new DataCollectionService architecture. +Provides clean console output with minimal logging for production environments. """ import asyncio -import argparse import signal import sys import time import json -from datetime import datetime +from typing import Optional from pathlib import Path -from typing import List, Optional # Add project root to path project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) -# Set environment variable to disable SQLAlchemy echo for clean production +# Set environment for clean production logging import os os.environ['DEBUG'] = 'false' -# Suppress SQLAlchemy verbose logging globally for production +# Suppress verbose SQLAlchemy logging import logging logging.getLogger('sqlalchemy').setLevel(logging.CRITICAL) logging.getLogger('sqlalchemy.engine').setLevel(logging.CRITICAL) @@ -42,319 +30,88 @@ logging.getLogger('sqlalchemy.pool').setLevel(logging.CRITICAL) logging.getLogger('sqlalchemy.dialects').setLevel(logging.CRITICAL) logging.getLogger('sqlalchemy.orm').setLevel(logging.CRITICAL) -from data.exchanges.okx import OKXCollector -from data.exchanges.okx.data_processor import OKXDataProcessor -from data.collector_manager import CollectorManager -from data.base_collector import DataType -from data.common import CandleProcessingConfig -from database.connection import init_database -from utils.logger import get_logger +from data.collection_service import run_data_collection_service -class ProductionManager: - """Production manager for OKX data collection.""" - - def __init__(self, config_path: str = "config/okx_config.json"): - self.config_path = config_path - self.config = self._load_config() - - # Configure clean logging - minimal console output, error-only file logs - self.logger = get_logger("production_manager", verbose=False) - - # Core components with error-only logging - self.collector_manager = CollectorManager(logger=self.logger, log_errors_only=True) - self.collectors: List[OKXCollector] = [] - - # Runtime state - self.running = False - self.start_time = None - self.statistics = { - 'collectors_created': 0, - 'uptime_seconds': 0 - } - - self.logger.info(f"🚀 Production Manager initialized with error-only logging") - self.logger.info(f"📁 Config: {config_path}") - - def _load_config(self) -> dict: - """Load configuration from JSON file.""" - try: - with open(self.config_path, 'r') as f: - config = json.load(f) - return config - except Exception as e: - print(f"❌ Failed to load config from {self.config_path}: {e}") - sys.exit(1) - - async def create_collectors(self) -> bool: - """Create collectors for all enabled trading pairs.""" - try: - enabled_pairs = [ - pair for pair in self.config['trading_pairs'] - if pair.get('enabled', True) - ] - - self.logger.info(f"🎯 Creating collectors for {len(enabled_pairs)} trading pairs...") - - for pair_config in enabled_pairs: - symbol = pair_config['symbol'] - data_types = [DataType(dt) for dt in pair_config.get('data_types', ['trade'])] - - # Get timeframes from config file for this trading pair - config_timeframes = pair_config.get('timeframes', ['1m', '5m']) - - self.logger.info(f"📈 Creating collector for {symbol} with timeframes: {config_timeframes}") - - # Create custom candle processing config using timeframes from config - 
candle_config = CandleProcessingConfig( - timeframes=config_timeframes, - emit_incomplete_candles=False, # Only complete candles - auto_save_candles=True - ) - - # Create custom data processor with error-only logging - data_processor = OKXDataProcessor( - symbol=symbol, - config=candle_config, - component_name=f"okx_processor_{symbol.replace('-', '_').lower()}", - logger=self.logger - ) - - # Create OKX collector with error-only logging - collector = OKXCollector( - symbol=symbol, - data_types=data_types, - component_name=f"okx_collector_{symbol.replace('-', '_').lower()}", - auto_restart=False, # Disable auto-restart to prevent health check interference - health_check_interval=self.config.get('data_collection', {}).get('health_check_interval', 120.0), - store_raw_data=self.config.get('data_collection', {}).get('store_raw_data', True), - logger=self.logger, - log_errors_only=False # Enable full logging temporarily to debug WebSocket issues - ) - - # Replace the default data processor with our custom one - collector._data_processor = data_processor - - # Add callbacks for processed data - data_processor.add_trade_callback(collector._on_trade_processed) - data_processor.add_candle_callback(collector._on_candle_processed) - - # Add to manager - self.collector_manager.add_collector(collector) - self.collectors.append(collector) - self.statistics['collectors_created'] += 1 - - self.logger.info(f"✅ Collector created for {symbol} with {'/'.join(config_timeframes)} timeframes") - - self.logger.info(f"🎉 All {len(self.collectors)} collectors created successfully") - # Get unique timeframes across all collectors for summary - all_timeframes = set() - for pair in enabled_pairs: +async def get_config_timeframes(config_path: str) -> str: + """Get timeframes from configuration for display.""" + try: + with open(config_path, 'r') as f: + config = json.load(f) + # Get unique timeframes from all enabled trading pairs + all_timeframes = set() + for pair in config.get('trading_pairs', []): + if pair.get('enabled', True): all_timeframes.update(pair.get('timeframes', ['1m', '5m'])) - self.logger.info(f"📊 Collectors configured with timeframes: {', '.join(sorted(all_timeframes))}") - return True - - except Exception as e: - self.logger.error(f"❌ Failed to create collectors: {e}") - return False - - async def start(self) -> bool: - """Start all collectors and begin data collection.""" - try: - self.start_time = time.time() - self.running = True - - self.logger.info("🚀 Starting production data collection...") - - # Initialize global database managers - self.logger.info("📊 Initializing database...") - init_database() - self.logger.info("✅ Database initialized successfully") - - # Start collector manager - success = await self.collector_manager.start() - if not success: - self.logger.error("❌ Failed to start collector manager") - return False - - self.logger.info("✅ All collectors started successfully") - self.logger.info("📊 Data collection is now active with built-in processing") - return True - - except Exception as e: - self.logger.error(f"❌ Failed to start collectors: {e}") - return False - - async def stop(self) -> None: - """Stop all collectors gracefully.""" - try: - self.logger.info("🛑 Stopping production data collection...") - self.running = False - - # Stop collector manager - await self.collector_manager.stop() - - self.logger.info("✅ All collectors stopped gracefully") - - except Exception as e: - self.logger.error(f"❌ Error during shutdown: {e}") + return ', '.join(sorted(all_timeframes)) + except: + 
return "configured timeframes" -async def run_clean_production(duration_hours: Optional[float] = None): +async def run_clean_production(duration_hours: Optional[float] = None) -> bool: """Run production collector with clean output.""" - # Global state for signal handling - shutdown_event = asyncio.Event() - manager = None - - def signal_handler(signum, frame): - print(f"\n📡 Shutdown signal received, stopping gracefully...") - shutdown_event.set() - - # Set up signal handlers - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) + # Configuration path - use the new service config format + config_path = "config/data_collection.json" try: - # Read config to show actual timeframes in banner - config_path = "config/okx_config.json" - try: - with open(config_path, 'r') as f: - config = json.load(f) - # Get unique timeframes from all enabled trading pairs - all_timeframes = set() - for pair in config.get('trading_pairs', []): - if pair.get('enabled', True): - all_timeframes.update(pair.get('timeframes', ['1m', '5m'])) - timeframes_str = ', '.join(sorted(all_timeframes)) - except: - timeframes_str = "configured timeframes" + # Get timeframes for display + timeframes_str = await get_config_timeframes(config_path) # Header - print("🚀 OKX PRODUCTION DATA COLLECTOR") + print("OKX PRODUCTION DATA COLLECTOR") print("="*50) if duration_hours: - print(f"⏱️ Duration: {duration_hours} hours") + print(f"Duration: {duration_hours} hours") else: - print(f"⏱️ Duration: Indefinite (until stopped)") - print(f"📊 Timeframes: {timeframes_str}") - print(f"💾 Database: Raw trades + aggregated candles") - print(f"📝 Logs: logs/ directory") + print(f"Duration: Indefinite (until stopped)") + print(f"Timeframes: {timeframes_str}") + print(f"Database: Raw trades + aggregated candles") + print(f"Logs: logs/ directory") print("="*50) - # Create manager - print("🎯 Initializing collector...") - manager = ProductionManager("config/okx_config.json") + # Start data collection using the new service + print("Starting data collection service...") + success = await run_data_collection_service(config_path, duration_hours) - # Create collectors - if not await manager.create_collectors(): - print("❌ Failed to create collectors") + if success: + print("Data collection completed successfully") + return True + else: + print("Data collection failed") return False - # Start data collection - print("🚀 Starting data collection...") - if not await manager.start(): - print("❌ Failed to start data collection") - return False - - # Running status - start_time = time.time() - print("✅ Data collection active!") - print(f"📈 Collecting: {len(manager.collectors)} trading pairs") - print(f"📊 Monitor: python scripts/monitor_clean.py") - if not duration_hours: - print("⏹️ Stop: Ctrl+C") - print("-" * 50) - - # Main monitoring loop - last_update = time.time() - update_interval = 600 # Update every 10 minutes - - while not shutdown_event.is_set(): - # Wait for shutdown or timeout - try: - await asyncio.wait_for(shutdown_event.wait(), timeout=1.0) - break - except asyncio.TimeoutError: - pass - - # Check duration if specified - current_time = time.time() - if duration_hours: - duration_seconds = int(duration_hours * 3600) - if current_time - start_time >= duration_seconds: - print(f"⏰ Completed {duration_hours} hour run") - break - - # Periodic status update - if current_time - last_update >= update_interval: - elapsed_hours = (current_time - start_time) / 3600 - if duration_hours: - remaining_hours = duration_hours - 
elapsed_hours - print(f"⏱️ Runtime: {elapsed_hours:.1f}h | Remaining: {remaining_hours:.1f}h") - else: - print(f"⏱️ Runtime: {elapsed_hours:.1f}h | Mode: Continuous") - last_update = current_time - - # Final summary - total_runtime = (time.time() - start_time) / 3600 - print(f"\n📊 COLLECTION COMPLETE") - print(f"⏱️ Total runtime: {total_runtime:.2f} hours") - print(f"📈 Collectors: {len(manager.collectors)} active") - print(f"📋 View results: python scripts/monitor_clean.py") - - return True - except Exception as e: - print(f"❌ Error: {e}") + print(f"Error: {e}") return False - - finally: - if manager: - print("🛑 Stopping collectors...") - await manager.stop() - print("✅ Shutdown complete") def main(): """Main entry point.""" - parser = argparse.ArgumentParser( - description="Clean Production OKX Data Collector", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Run indefinitely (until stopped with Ctrl+C) - python scripts/production_clean.py - - # Run for 8 hours - python scripts/production_clean.py --hours 8 - - # Run overnight (12 hours) - python scripts/production_clean.py --hours 12 - """ - ) + import argparse + parser = argparse.ArgumentParser(description="Clean Production OKX Data Collector") parser.add_argument( - '--hours', + "--hours", type=float, - default=None, - help='Collection duration in hours (default: indefinite until stopped manually)' + help="Collection duration in hours (default: indefinite until stopped manually)" ) args = parser.parse_args() + # Validate arguments if args.hours is not None and args.hours <= 0: - print("❌ Duration must be positive") + print("Duration must be positive") sys.exit(1) try: success = asyncio.run(run_clean_production(args.hours)) sys.exit(0 if success else 1) except KeyboardInterrupt: - print("\n👋 Interrupted by user") + print("\nInterrupted by user") sys.exit(0) except Exception as e: - print(f"❌ Fatal error: {e}") + print(f"Fatal error: {e}") sys.exit(1) diff --git a/tasks/collector-service-tasks-optimization.md b/tasks/collector-service-tasks-optimization.md new file mode 100644 index 0000000..dc93d41 --- /dev/null +++ b/tasks/collector-service-tasks-optimization.md @@ -0,0 +1,129 @@ +## Relevant Files + +- `data/collector_manager.py` - Core manager for data collectors (refactored: 563→178 lines). +- `data/collection_service.py` - Main service for data collection. +- `data/collector_types.py` - Shared data types for collector management (new file). +- `data/manager_components/` - Component classes for modular manager architecture (new directory). +- `data/__init__.py` - Updated imports for new structure. +- `tests/test_collector_manager.py` - Unit tests for `collector_manager.py` (imports updated). +- `tests/test_data_collection_aggregation.py` - Integration tests (imports updated). +- `scripts/production_clean.py` - Production script (verified working). +- `scripts/start_data_collection.py` - Data collection script (verified working). + +## Code Review Analysis: `collection_service.py` & `collector_manager.py` + +### Overall Assessment +Both files show good foundational architecture but exceed the recommended file size limits and contain several areas for improvement. 
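+
+To make the target architecture concrete before the detailed findings, the sketch below illustrates the delegation pattern the refactor converges on: a slim facade that forwards work to single-responsibility components. The class name `SlimCollectorManager` and the exact wiring are illustrative only; the component constructor signatures are taken from the modules introduced in this diff.
+
+```python
+from data.collector_types import ManagerStatus
+from data.manager_components import (
+    CollectorLifecycleManager,
+    ManagerHealthMonitor,
+    ManagerLogger,
+    ManagerStatsTracker,
+)
+
+
+class SlimCollectorManager:
+    """Illustrative facade that delegates to single-responsibility components."""
+
+    def __init__(self, logger=None, log_errors_only: bool = False):
+        self.status = ManagerStatus.STOPPED
+        # Each component owns one concern; the facade only wires them together.
+        self.logger_manager = ManagerLogger(logger, log_errors_only)
+        self.lifecycle_manager = CollectorLifecycleManager(
+            logger_manager=self.logger_manager
+        )
+        self.health_monitor = ManagerHealthMonitor(
+            logger_manager=self.logger_manager,
+            lifecycle_manager=self.lifecycle_manager,
+        )
+        self.stats_tracker = ManagerStatsTracker(
+            logger_manager=self.logger_manager,
+            lifecycle_manager=self.lifecycle_manager,
+            health_monitor=self.health_monitor,
+        )
+
+    def get_status(self, force_refresh: bool = False) -> dict:
+        # Thin delegation keeps the facade well under the size limits below.
+        status = self.stats_tracker.get_status(force_refresh)
+        status['manager_status'] = self.status.value
+        return status
+```
+
+Public methods stay one-line delegations, which is what brings `collector_manager.py` from 563 to 178 lines.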
+ +### 📏 File Size Violations +- **`collector_manager.py`**: 563 lines (❌ Exceeds 250-line limit by 125%) +- **`collection_service.py`**: 451 lines (❌ Exceeds 250-line limit by 80%) + +### 🔍 Function Size Analysis +**Functions Exceeding 50-Line Limit:** +**`collector_manager.py`:** +- `__init__()` - 65 lines +- `_global_health_monitor()` - 71 lines +- `get_status()` - 53 lines + +**`collection_service.py`:** +- `_create_default_config()` - 89 lines +- `run()` - 98 lines + +### 🏗️ Architecture & Design Issues +1. **Tight Coupling in CollectorManager** + - **Issue**: The manager class handles too many responsibilities (collector lifecycle, health monitoring, statistics, logging). + - **Solution**: Apply Single Responsibility Principle by creating dedicated component classes. +2. **Configuration Management Complexity** + - **Issue**: Configuration logic scattered across multiple methods. + - **Solution**: Dedicated configuration manager for centralized handling. + +### 🔒 Security & Error Handling Review +**Strengths:** +- Proper exception handling with context +- No hardcoded credentials +- Graceful shutdown handling +- Input validation in configuration + +**Areas for Improvement:** +1. **Error Information Leakage** + - **Issue**: Could leak internal details. + - **Solution**: Sanitize error messages before logging. +2. **Configuration File Security** + - **Issue**: No file permission validation. + - **Solution**: Add validation to ensure appropriate file permissions. + +### 🚀 Performance Optimization Opportunities +1. **Async Task Management** + - **Issue**: Potential memory leaks with untracked tasks. + - **Solution**: Implement proper task lifecycle management with a `TaskManager`. +2. **Statistics Collection Optimization** + - **Issue**: Statistics calculated on every status request. + - **Solution**: Use cached statistics with background updates via a `CachedStatsManager`. + +### 🧪 Testing & Maintainability +**Missing Test Coverage Areas:** +1. Collector manager state transitions +2. Health monitoring edge cases +3. Configuration validation +4. Signal handling +5. Concurrent collector operations + +### 📝 Documentation Improvements +1. **Missing API Documentation** + - **Issue**: Public methods and classes lack comprehensive docstrings. + - **Solution**: Add examples, thread safety, and performance considerations. +2. **Configuration Schema Documentation** + - **Issue**: No formal schema validation. + - **Solution**: Implement JSON schema validation for configurations. + +### 📊 Quality Metrics Summary +| Metric | Current | Target | Status | +|--------|---------|--------|--------| +| File Size | 563/451 lines | <250 lines | ❌ | +| Function Size | 5 functions >50 lines | 0 functions >50 lines | ❌ | +| Cyclomatic Complexity | Medium-High | Low-Medium | ⚠️ | +| Test Coverage | ~30% estimated | >80% | ❌ | +| Documentation | Basic | Comprehensive | ⚠️ | +| Error Handling | Good | Excellent | ✅ | + +## Tasks + +- [x] 1.0 Refactor `collector_manager.py` for Modularity and Readability + - [x] 1.1 Extract `ManagerStatus` and `CollectorConfig` dataclasses to `data/collector_types.py`. + - [x] 1.2 Create `data/manager_components/collector_lifecycle_manager.py` to handle `add_collector`, `remove_collector`, `enable_collector`, `disable_collector`, `_start_collector`, `restart_collector`, `restart_all_collectors`. + - [x] 1.3 Create `data/manager_components/manager_health_monitor.py` to encapsulate `_global_health_monitor` logic. 
+  - [x] 1.4 Create `data/manager_components/manager_stats_tracker.py` to manage statistics in `get_status` and update `_stats`.
+  - [x] 1.5 Create `data/manager_components/manager_logger.py` to centralize logging methods (`_log_debug`, `_log_info`, `_log_warning`, `_log_error`, `_log_critical`).
+  - [x] 1.6 Update `CollectorManager` to use instances of these new component classes.
+  - [x] 1.7 Ensure the `CollectorManager` `__init__` method is under 50 lines by delegating initialization to helper methods within the class or component classes.
+
+- [x] 2.0 Refactor `collection_service.py` for Improved Structure
+  - [x] 2.1 Create `config/service_config.py` to handle `_load_config` and `_create_default_config` logic, including schema validation.
+  - [x] 2.2 Create `data/collector_factory.py` to encapsulate `_create_collector` logic.
+  - [x] 2.3 Update `DataCollectionService` to use instances of these new component classes.
+  - [x] 2.4 Refactor the `run()` method to be under 50 lines by extracting sub-steps into helpers (e.g., `_run_main_loop`).
+  - [x] 2.5 Test `./scripts/start_data_collection.py` and `./scripts/production_clean.py` to ensure they work as expected.
+
+- [ ] 3.0 Enhance Error Handling and Security
+  - [ ] 3.1 Implement a `_sanitize_error` method in `CollectorManager` and `DataCollectionService` to prevent leaking internal error details.
+  - [ ] 3.2 Add file permission validation for configuration files in `config/service_config.py`.
+  - [ ] 3.3 Review all `try-except` blocks to ensure specific exceptions are caught rather than broad `Exception`.
+  - [ ] 3.4 Ensure all logger calls include `exc_info=True` for error and critical logs.
+  - [ ] 3.5 Test `./scripts/start_data_collection.py` and `./scripts/production_clean.py` to ensure they work as expected.
+
+- [ ] 4.0 Optimize Performance and Resource Management
+  - [ ] 4.1 Implement a `TaskManager` class in `utils/async_task_manager.py` to manage and track `asyncio.Task` instances in `CollectorManager` and `DataCollectionService`.
+  - [ ] 4.2 Introduce a `CachedStatsManager` in `data/manager_components/manager_stats_tracker.py` for `CollectorManager` to cache statistics and update them periodically instead of on every `get_status` call.
+  - [ ] 4.3 Review all `asyncio.sleep` calls for optimal intervals.
+  - [ ] 4.4 Test `./scripts/start_data_collection.py` and `./scripts/production_clean.py` to ensure they work as expected.
+
+- [ ] 5.0 Improve Documentation and Test Coverage
+  - [ ] 5.1 Add comprehensive docstrings to all public methods and classes in `CollectorManager` and `DataCollectionService`, including examples, thread-safety notes, and performance considerations.
+  - [ ] 5.2 Create new unit test files: `tests/data/manager_components/test_collector_lifecycle_manager.py`, `tests/data/manager_components/test_manager_health_monitor.py`, `tests/data/manager_components/test_manager_stats_tracker.py`, `tests/config/test_service_config.py`, `tests/data/test_collector_factory.py`.
+  - [ ] 5.3 Write unit tests for all new components (lifecycle manager, health monitor, stats tracker, service config, collector factory).
+  - [ ] 5.4 Enhance existing tests or create new ones for `CollectorManager` to cover state transitions, health monitoring edge cases, and concurrent operations.
+  - [ ] 5.5 Enhance existing tests or create new ones for `DataCollectionService` to cover configuration validation, service lifecycle, and signal handling.
+  - [ ] 5.6 Run all tests with `uv run pytest` and verify they pass.
+
+ - [ ] 5.7 Test './scripts/start_data_collection.py' and './scripts/production_clean.py' to ensure they work as expected. \ No newline at end of file diff --git a/tests/data/collector/test_collector_manager.py b/tests/data/collector/test_collector_manager.py index afc76bc..d99e0a3 100644 --- a/tests/data/collector/test_collector_manager.py +++ b/tests/data/collector/test_collector_manager.py @@ -8,7 +8,8 @@ from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock from utils.logger import get_logger -from data.collector_manager import CollectorManager, ManagerStatus, CollectorConfig +from data.collector_manager import CollectorManager +from data.collector_types import ManagerStatus, CollectorConfig from data.base_collector import BaseDataCollector, DataType, CollectorStatus diff --git a/tests/test_data_collection_aggregation.py b/tests/test_data_collection_aggregation.py index 4f8ab7c..a158485 100644 --- a/tests/test_data_collection_aggregation.py +++ b/tests/test_data_collection_aggregation.py @@ -24,7 +24,8 @@ from collections import defaultdict # Import modules under test from data.base_collector import BaseDataCollector, DataType, MarketDataPoint, CollectorStatus -from data.collector_manager import CollectorManager, CollectorConfig +from data.collector_manager import CollectorManager +from data.collector_types import CollectorConfig from data.collection_service import DataCollectionService from data.exchanges.okx.collector import OKXCollector from data.exchanges.okx.data_processor import OKXDataProcessor, OKXDataValidator, OKXDataTransformer
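
Task 4.1 and the performance review above propose a `TaskManager` for tracking `asyncio` tasks. The class does not exist in this diff; the sketch below is one minimal shape it could take, and every name and behavior shown is an assumption, not part of the patch.

```python
# Hypothetical TaskManager sketch for task 4.1 (not part of this diff).
# It keeps strong references to created tasks so they are neither garbage
# collected mid-flight nor forgotten at shutdown.
import asyncio
from typing import Coroutine, Set


class TaskManager:
    """Tracks asyncio tasks and cancels any still pending on shutdown."""

    def __init__(self) -> None:
        self._tasks: Set[asyncio.Task] = set()

    def create_task(self, coro: Coroutine) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference automatically once the task finishes.
        task.add_done_callback(self._tasks.discard)
        return task

    async def cancel_all(self, timeout: float = 30.0) -> None:
        # Cancel outstanding tasks, then wait (bounded) for them to settle.
        pending = [task for task in self._tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.wait(pending, timeout=timeout)
```

Routing the bare `asyncio.create_task(...)` calls in `CollectorLifecycleManager` and in `ManagerStatsTracker.force_cache_refresh` through such a manager would address the untracked-task concern raised in the performance review.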