# Data Collector System Documentation

## Overview

The Data Collector System provides a robust, scalable framework for collecting real-time market data from cryptocurrency exchanges. It features comprehensive health monitoring, automatic recovery, centralized management, and a modular exchange-based architecture designed for production trading environments.

This documentation covers the **core collector components**. For the high-level service layer that orchestrates these collectors, see [Data Collection Service](../services/data_collection_service.md).

## Key Features

### πŸ—οΈ **Modular Exchange Architecture**

- **Exchange-Based Organization**: Each exchange has its own implementation folder
- **Factory Pattern**: Easy creation of collectors from any supported exchange
- **Standardized Interface**: Consistent API across all exchange implementations
- **Scalable Design**: Easy addition of new exchanges (Binance, Coinbase, etc.)

### πŸ”„ **Auto-Recovery & Health Monitoring**

- **Heartbeat System**: Continuous health monitoring with configurable intervals
- **Auto-Restart**: Automatic restart on failures with exponential backoff
- **Connection Recovery**: Robust reconnection logic for network interruptions
- **Data Freshness Monitoring**: Detects stale data and triggers recovery

### πŸŽ›οΈ **Centralized Management**

- **CollectorManager**: Supervises multiple collectors with coordinated lifecycle
- **Dynamic Control**: Enable/disable collectors at runtime without system restart
- **Global Health Checks**: System-wide monitoring and alerting
- **Graceful Shutdown**: Proper cleanup and resource management

### πŸ“Š **Comprehensive Monitoring**

- **Real-time Status**: Detailed status reporting for all collectors
- **Performance Metrics**: Message counts, uptime, error rates, restart counts
- **Health Analytics**: Connection state, data freshness, error tracking
- **Conditional Logging**: Enhanced logging with configurable verbosity (see [Logging System](logging.md))
- **Multi-Timeframe Support**: Sub-second to daily candle aggregation (1s, 5s, 10s, 15s, 30s, 1m, 5m, 15m, 1h, 4h, 1d)

### πŸ›’οΈ **Database Integration**

- **Repository Pattern**: All database operations use the centralized `database/operations.py` module (see the sketch below)
- **No Raw SQL**: Clean API through `MarketDataRepository` and `RawTradeRepository` classes
- **Automatic Transaction Management**: Sessions, commits, and rollbacks handled automatically
- **Configurable Duplicate Handling**: `force_update_candles` parameter controls duplicate behavior
- **Real-time Storage**: Completed candles automatically saved to the `market_data` table
- **Raw Data Storage**: Optional raw WebSocket data storage via `RawTradeRepository`
- **Custom Error Handling**: Proper exception handling with `DatabaseOperationError`
- **Health Monitoring**: Built-in database health checks and statistics
- **Connection Pooling**: Efficient database connection management through repositories
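The bullets above describe how collectors persist data through the repository layer rather than raw SQL. As a rough illustration only, the sketch below shows the shape of that interaction; the method names `save_candle` and `save_raw_trade` and the `force_update` wiring are assumptions based on the feature list, not the actual `database/operations.py` API.

```python
# Hypothetical sketch of the repository pattern described above.
# Method names (save_candle, save_raw_trade) are illustrative assumptions;
# consult database/operations.py for the real MarketDataRepository API.
from database.operations import MarketDataRepository, RawTradeRepository  # assumed import path


def store_completed_candle(candle: dict, force_update_candles: bool = False) -> None:
    """Persist a completed candle via the repository layer (no raw SQL)."""
    repo = MarketDataRepository()  # sessions/commits/rollbacks handled internally
    repo.save_candle(candle, force_update=force_update_candles)  # hypothetical signature


def store_raw_message(exchange: str, message: dict) -> None:
    """Optionally archive the raw WebSocket payload for debugging."""
    raw_repo = RawTradeRepository()
    raw_repo.save_raw_trade(exchange=exchange, payload=message)  # hypothetical signature
```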
## Architecture

```
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                          CollectorManager                           β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚                   Global Health Monitor                       β”‚  β”‚
β”‚  β”‚ β€’ System-wide health checks                                   β”‚  β”‚
β”‚  β”‚ β€’ Auto-restart coordination                                   β”‚  β”‚
β”‚  β”‚ β€’ Performance analytics                                       β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚                                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚   OKX Collector   β”‚ β”‚ Binance Collector β”‚ β”‚ Custom Collector  β”‚  β”‚
β”‚  β”‚ β€’ Health Monitor  β”‚ β”‚ β€’ Health Monitor  β”‚ β”‚ β€’ Health Monitor  β”‚  β”‚
β”‚  β”‚ β€’ Auto-restart    β”‚ β”‚ β€’ Auto-restart    β”‚ β”‚ β€’ Auto-restart    β”‚  β”‚
β”‚  β”‚ β€’ Data Validation β”‚ β”‚ β€’ Data Validation β”‚ β”‚ β€’ Data Validation β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                   β”‚
                        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                        β”‚   Data Output   β”‚
                        β”‚ β€’ Callbacks     β”‚
                        β”‚ β€’ Redis Pub/Sub β”‚
                        β”‚ β€’ Database      β”‚
                        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
```

### Exchange Module Structure

The modular architecture organizes exchange implementations:

```
data/
β”œβ”€β”€ base_collector.py          # Abstract base classes
β”œβ”€β”€ collector_manager.py       # Cross-platform collector manager
β”œβ”€β”€ aggregator.py              # Cross-exchange data aggregation
β”œβ”€β”€ exchanges/                 # Exchange-specific implementations
β”‚   β”œβ”€β”€ __init__.py            # Main exports and factory
β”‚   β”œβ”€β”€ registry.py            # Exchange registry and capabilities
β”‚   β”œβ”€β”€ factory.py             # Factory pattern for collectors
β”‚   β”œβ”€β”€ okx/                   # OKX implementation
β”‚   β”‚   β”œβ”€β”€ __init__.py        # OKX exports
β”‚   β”‚   β”œβ”€β”€ collector.py       # OKXCollector class
β”‚   β”‚   └── websocket.py       # OKXWebSocketClient class
β”‚   └── binance/               # Future: Binance implementation
β”‚       β”œβ”€β”€ __init__.py
β”‚       β”œβ”€β”€ collector.py
β”‚       └── websocket.py
```

## Quick Start

### 1. Using Exchange Factory (Recommended)

```python
import asyncio

from data.exchanges import ExchangeFactory, ExchangeCollectorConfig, create_okx_collector
from data.base_collector import DataType
from utils.logger import get_logger


async def main():
    # Create logger for the collector
    logger = get_logger('okx_collector', verbose=True)

    # Method 1: Using factory with configuration
    config = ExchangeCollectorConfig(
        exchange='okx',
        symbol='BTC-USDT',
        data_types=[DataType.TRADE, DataType.ORDERBOOK],
        auto_restart=True,
        health_check_interval=30.0,
        store_raw_data=True
    )
    collector = ExchangeFactory.create_collector(config, logger=logger)

    # Method 2: Using convenience function
    okx_collector = create_okx_collector(
        symbol='ETH-USDT',
        data_types=[DataType.TRADE, DataType.ORDERBOOK],
        logger=logger
    )

    # Add data callback
    def on_trade_data(data_point):
        print(f"Trade: {data_point.symbol} - {data_point.data}")

    collector.add_data_callback(DataType.TRADE, on_trade_data)

    # Start collector
    await collector.start()

    # Let it run
    await asyncio.sleep(60)

    # Stop collector
    await collector.stop()


asyncio.run(main())
```
### 2. Creating Multiple Collectors with Manager

```python
import asyncio

from data.exchanges import ExchangeFactory, ExchangeCollectorConfig
from data.base_collector import DataType
from data.collector_manager import CollectorManager
from utils.logger import get_logger


async def main():
    # Create manager with logging
    manager_logger = get_logger('collector_manager', verbose=True)
    manager = CollectorManager(logger=manager_logger)

    # Create multiple collectors using factory
    configs = [
        ExchangeCollectorConfig('okx', 'BTC-USDT', [DataType.TRADE, DataType.ORDERBOOK]),
        ExchangeCollectorConfig('okx', 'ETH-USDT', [DataType.TRADE]),
        ExchangeCollectorConfig('okx', 'SOL-USDT', [DataType.ORDERBOOK])
    ]

    # Create collectors with individual loggers
    for config in configs:
        collector_logger = get_logger(f'okx_{config.symbol.lower().replace("-", "_")}')
        collector = ExchangeFactory.create_collector(config, logger=collector_logger)
        manager.add_collector(collector)

    print(f"Created {len(manager.list_collectors())} collectors")

    # Start all collectors
    await manager.start()

    # Monitor
    await asyncio.sleep(60)

    # Stop all
    await manager.stop()


asyncio.run(main())
```

## API Reference

### BaseDataCollector

The abstract base class that all data collectors must inherit from.

#### Constructor

```python
def __init__(self,
             exchange_name: str,
             symbols: List[str],
             data_types: Optional[List[DataType]] = None,
             component_name: Optional[str] = None,
             auto_restart: bool = True,
             health_check_interval: float = 30.0,
             logger: Optional[logging.Logger] = None,
             log_errors_only: bool = False)
```

**Parameters:**

- `exchange_name`: Name of the exchange (e.g., 'okx', 'binance')
- `symbols`: List of trading symbols to collect data for
- `data_types`: Types of data to collect (default: `[DataType.CANDLE]`)
- `component_name`: Name for logging (default: based on `exchange_name`)
- `auto_restart`: Enable automatic restart on failures (default: True)
- `health_check_interval`: Seconds between health checks (default: 30.0)
- `logger`: Logger instance for conditional logging (default: None)
- `log_errors_only`: Only log error-level messages (default: False)

#### Abstract Methods

Must be implemented by subclasses:

```python
async def connect(self) -> bool
async def disconnect(self) -> None
async def subscribe_to_data(self, symbols: List[str], data_types: List[DataType]) -> bool
async def unsubscribe_from_data(self, symbols: List[str], data_types: List[DataType]) -> bool
async def _process_message(self, message: Any) -> Optional[MarketDataPoint]
async def _handle_messages(self) -> None
```
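A minimal subclass that wires these abstract methods together might look like the sketch below. It is illustrative only: the `websockets`-based transport, placeholder URL, and message fields are assumptions, and real implementations such as `OKXCollector` under `data/exchanges/` will differ.

```python
# Minimal skeleton of a BaseDataCollector subclass (illustrative only).
# The websockets-based transport and message fields are assumptions;
# see data/exchanges/okx/collector.py for a real implementation.
import json
from datetime import datetime, timezone
from typing import Any, List, Optional

import websockets  # assumed transport library for this sketch

from data.base_collector import BaseDataCollector, DataType, MarketDataPoint


class MinimalCollector(BaseDataCollector):
    WS_URL = "wss://example.invalid/ws"  # placeholder endpoint

    def __init__(self, symbols: List[str], **kwargs):
        super().__init__(exchange_name="example", symbols=symbols,
                         data_types=[DataType.TRADE], **kwargs)
        self.websocket = None

    async def connect(self) -> bool:
        self.websocket = await websockets.connect(self.WS_URL)
        return True

    async def disconnect(self) -> None:
        if self.websocket:
            await self.websocket.close()
            self.websocket = None

    async def subscribe_to_data(self, symbols: List[str], data_types: List[DataType]) -> bool:
        await self.websocket.send(json.dumps({"op": "subscribe", "symbols": symbols}))
        return True

    async def unsubscribe_from_data(self, symbols: List[str], data_types: List[DataType]) -> bool:
        await self.websocket.send(json.dumps({"op": "unsubscribe", "symbols": symbols}))
        return True

    async def _process_message(self, message: Any) -> Optional[MarketDataPoint]:
        payload = json.loads(message)
        return MarketDataPoint(
            exchange=self.exchange_name,
            symbol=payload["symbol"],          # assumed message field
            timestamp=datetime.now(timezone.utc),
            data_type=DataType.TRADE,
            data=payload,
        )

    async def _handle_messages(self) -> None:
        message = await self.websocket.recv()  # websockets-library receive
        data_point = await self._process_message(message)
        if data_point:
            await self._notify_callbacks(data_point)
```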
#### Public Methods

```python
async def start() -> bool                     # Start the collector
async def stop(force: bool = False) -> None   # Stop the collector
async def restart() -> bool                   # Restart the collector

# Callback management
def add_data_callback(self, data_type: DataType, callback: Callable) -> None
def remove_data_callback(self, data_type: DataType, callback: Callable) -> None

# Symbol management
def add_symbol(self, symbol: str) -> None
def remove_symbol(self, symbol: str) -> None

# Status and monitoring
def get_status(self) -> Dict[str, Any]
def get_health_status(self) -> Dict[str, Any]

# Data validation
def validate_ohlcv_data(self, data: Dict[str, Any], symbol: str, timeframe: str) -> OHLCVData
```

#### Conditional Logging Methods

All collectors support conditional logging (see [Logging System](logging.md) for details):

```python
def _log_debug(self, message: str) -> None                             # Debug messages (if not errors-only)
def _log_info(self, message: str) -> None                              # Info messages (if not errors-only)
def _log_warning(self, message: str) -> None                           # Warning messages (if not errors-only)
def _log_error(self, message: str, exc_info: bool = False) -> None     # Always logged
def _log_critical(self, message: str, exc_info: bool = False) -> None  # Always logged
```

#### Status Information

The `get_status()` method returns comprehensive status information:

```python
{
    'exchange': 'okx',
    'status': 'running',                    # Current status
    'should_be_running': True,              # Desired state
    'symbols': ['BTC-USDT', 'ETH-USDT'],    # Configured symbols
    'data_types': ['ticker'],               # Data types being collected
    'auto_restart': True,                   # Auto-restart enabled
    'health': {
        'time_since_heartbeat': 5.2,        # Seconds since last heartbeat
        'time_since_data': 2.1,             # Seconds since last data
        'max_silence_duration': 300.0       # Max allowed silence
    },
    'statistics': {
        'messages_received': 1250,          # Total messages received
        'messages_processed': 1248,         # Successfully processed
        'errors': 2,                        # Error count
        'restarts': 1,                      # Restart count
        'uptime_seconds': 3600.5,           # Current uptime
        'reconnect_attempts': 0,            # Current reconnect attempts
        'last_message_time': '2023-...',    # ISO timestamp
        'connection_uptime': '2023-...',    # Connection start time
        'last_error': 'Connection failed',  # Last error message
        'last_restart_time': '2023-...'     # Last restart time
    }
}
```

#### Health Status

The `get_health_status()` method provides detailed health information:

```python
{
    'is_healthy': True,                # Overall health status
    'issues': [],                      # List of current issues
    'status': 'running',               # Current collector status
    'last_heartbeat': '2023-...',      # Last heartbeat timestamp
    'last_data_received': '2023-...',  # Last data timestamp
    'should_be_running': True,         # Expected state
    'is_running': True                 # Actual running state
}
```
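Taken together, these two snapshots support a simple supervision loop. The sketch below is a minimal illustration that polls a running collector and reacts to the documented fields; `notify_ops` is a hypothetical placeholder for whatever alerting channel you use.

```python
# Minimal health-polling sketch built on the documented status fields.
# `notify_ops` is a hypothetical placeholder, not part of the collector API.
import asyncio


def notify_ops(message: str) -> None:
    # Placeholder: wire this to your alerting channel of choice.
    print(f"[ALERT] {message}")


async def watch_collector(collector, interval: float = 30.0) -> None:
    """Periodically inspect a collector and surface problems."""
    while True:
        health = collector.get_health_status()
        status = collector.get_status()

        if not health['is_healthy']:
            # Auto-restart is handled by the collector itself; just report.
            notify_ops(f"{status['exchange']} unhealthy: {health['issues']}")

        stats = status['statistics']
        if stats['errors'] or stats['restarts']:
            print(f"{status['exchange']}: errors={stats['errors']} "
                  f"restarts={stats['restarts']} uptime={stats['uptime_seconds']:.0f}s")

        await asyncio.sleep(interval)
```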
### CollectorManager

Manages multiple data collectors with coordinated lifecycle and health monitoring.

#### Constructor

```python
def __init__(self,
             manager_name: str = "collector_manager",
             global_health_check_interval: float = 60.0,
             restart_delay: float = 5.0,
             logger: Optional[logging.Logger] = None,
             log_errors_only: bool = False)
```

**Parameters:**

- `manager_name`: Name for the manager (used in logging)
- `global_health_check_interval`: Seconds between global health checks
- `restart_delay`: Delay between restart attempts
- `logger`: Logger instance for conditional logging (default: None)
- `log_errors_only`: Only log error-level messages (default: False)

#### Public Methods

```python
# Collector management
def add_collector(self, collector: BaseDataCollector, config: Optional[CollectorConfig] = None) -> None
def remove_collector(self, collector_name: str) -> bool
def enable_collector(self, collector_name: str) -> bool
def disable_collector(self, collector_name: str) -> bool

# Lifecycle management
async def start() -> bool
async def stop() -> None
async def restart_collector(self, collector_name: str) -> bool
async def restart_all_collectors(self) -> Dict[str, bool]

# Status and monitoring
def get_status(self) -> Dict[str, Any]
def get_collector_status(self, collector_name: str) -> Optional[Dict[str, Any]]
def list_collectors(self) -> List[str]
def get_running_collectors(self) -> List[str]
def get_failed_collectors(self) -> List[str]
```

### CollectorConfig

Configuration dataclass for collectors:

```python
@dataclass
class CollectorConfig:
    name: str                            # Unique collector name
    exchange: str                        # Exchange name
    symbols: List[str]                   # Trading symbols
    data_types: List[str]                # Data types to collect
    auto_restart: bool = True            # Enable auto-restart
    health_check_interval: float = 30.0  # Health check interval
    enabled: bool = True                 # Initially enabled
```

### Data Types

#### DataType Enum

```python
class DataType(Enum):
    TICKER = "ticker"        # Price and volume updates
    TRADE = "trade"          # Individual trade executions
    ORDERBOOK = "orderbook"  # Order book snapshots
    CANDLE = "candle"        # OHLCV candle data
    BALANCE = "balance"      # Account balance updates
```

#### MarketDataPoint

Standardized market data structure:

```python
@dataclass
class MarketDataPoint:
    exchange: str         # Exchange name
    symbol: str           # Trading symbol
    timestamp: datetime   # Data timestamp (UTC)
    data_type: DataType   # Type of data
    data: Dict[str, Any]  # Raw data payload
```

#### OHLCVData

OHLCV (candlestick) data structure with validation:

```python
@dataclass
class OHLCVData:
    symbol: str                         # Trading symbol
    timeframe: str                      # Timeframe (1m, 5m, 1h, etc.)
    timestamp: datetime                 # Candle timestamp
    open: Decimal                       # Opening price
    high: Decimal                       # Highest price
    low: Decimal                        # Lowest price
    close: Decimal                      # Closing price
    volume: Decimal                     # Trading volume
    trades_count: Optional[int] = None  # Number of trades
```
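For a concrete sense of these structures, the snippet below builds one of each by hand. The values are arbitrary, and the import path assumes the dataclasses live in `data.base_collector` alongside `DataType`.

```python
# Building the standardized structures by hand (illustrative values only).
# Import path is an assumption; adjust to wherever these dataclasses live.
from datetime import datetime, timezone
from decimal import Decimal

from data.base_collector import DataType, MarketDataPoint, OHLCVData

trade_point = MarketDataPoint(
    exchange='okx',
    symbol='BTC-USDT',
    timestamp=datetime.now(timezone.utc),
    data_type=DataType.TRADE,
    data={'px': '50000.5', 'sz': '0.001', 'side': 'buy'},
)

candle = OHLCVData(
    symbol='BTC-USDT',
    timeframe='1m',
    timestamp=datetime(2023, 10, 12, 12, 0, tzinfo=timezone.utc),
    open=Decimal('49990.0'),
    high=Decimal('50010.5'),
    low=Decimal('49985.0'),
    close=Decimal('50000.5'),
    volume=Decimal('12.345'),
    trades_count=87,
)
```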
## Health Monitoring

### Monitoring Levels

The system provides multi-level health monitoring:

1. **Individual Collector Health**
   - Heartbeat monitoring (message loop activity)
   - Data freshness (time since last data received)
   - Connection state monitoring
   - Error rate tracking

2. **Manager-Level Health**
   - Global health checks across all collectors
   - Coordinated restart management
   - System-wide performance metrics
   - Resource utilization monitoring

### Health Check Intervals

- **Individual Collector**: Configurable per collector (default: 30s)
- **Global Manager**: Configurable for manager (default: 60s)
- **Heartbeat Updates**: Updated with each message loop iteration
- **Data Freshness**: Updated when data is received

### Auto-Restart Triggers

Collectors are automatically restarted when:

1. **No Heartbeat**: Message loop becomes unresponsive
2. **Stale Data**: No data received within configured timeout
3. **Connection Failures**: WebSocket or API connection lost
4. **Error Status**: Collector enters ERROR or UNHEALTHY state
5. **Manual Trigger**: Explicit restart request

### Failure Handling

```python
# Configure failure handling with conditional logging
from utils.logger import get_logger

logger = get_logger('my_collector', verbose=True)

collector = MyCollector(
    symbols=["BTC-USDT"],
    auto_restart=True,           # Enable auto-restart
    health_check_interval=30.0,  # Check every 30 seconds
    logger=logger,               # Enable logging
    log_errors_only=False        # Log all levels
)

# The collector will automatically:
# 1. Detect failures within 30 seconds
# 2. Attempt reconnection with exponential backoff
# 3. Restart up to 5 times (configurable)
# 4. Log all recovery attempts (if logger provided)
# 5. Report status to manager
```

## Configuration

### Environment Variables

The system respects these environment variables:

```bash
# Logging configuration (see logging.md for details)
VERBOSE_LOGGING=true              # Enable console logging
LOG_TO_CONSOLE=true               # Alternative verbose setting

# Health monitoring
DEFAULT_HEALTH_CHECK_INTERVAL=30  # Default health check interval (seconds)
MAX_SILENCE_DURATION=300          # Max time without data (seconds)
MAX_RECONNECT_ATTEMPTS=5          # Maximum reconnection attempts
RECONNECT_DELAY=5                 # Delay between reconnect attempts (seconds)
```
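How these variables reach the collectors is project-specific. As one possible pattern, the sketch below reads them with `os.getenv` and maps them onto constructor arguments; the variable names come from the list above, but the wiring itself is an assumption.

```python
# One possible way to map the environment variables above onto a collector.
# The wiring is illustrative; the real system may read these elsewhere.
import os

from utils.logger import get_logger


def collector_kwargs_from_env() -> dict:
    """Translate documented environment variables into constructor arguments."""
    return {
        'health_check_interval': float(os.getenv('DEFAULT_HEALTH_CHECK_INTERVAL', '30')),
        'logger': get_logger(
            'env_configured_collector',
            verbose=os.getenv('VERBOSE_LOGGING', 'false').lower() == 'true',
        ),
    }


# Usage (MyCollector stands in for any BaseDataCollector subclass):
# collector = MyCollector(symbols=['BTC-USDT'], auto_restart=True, **collector_kwargs_from_env())
```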
### Programmatic Configuration

```python
from utils.logger import get_logger

# Configure individual collector with conditional logging
logger = get_logger('custom_collector', verbose=True)

collector = MyCollector(
    exchange_name="custom_exchange",
    symbols=["BTC-USDT", "ETH-USDT"],
    data_types=[DataType.TICKER, DataType.TRADE],
    auto_restart=True,
    health_check_interval=15.0,  # Check every 15 seconds
    logger=logger,               # Enable logging
    log_errors_only=False        # Log all message types
)

# Configure manager with conditional logging
manager_logger = get_logger('production_manager', verbose=False)

manager = CollectorManager(
    manager_name="production_manager",
    global_health_check_interval=30.0,  # Global checks every 30s
    restart_delay=10.0,                 # 10s delay between restarts
    logger=manager_logger,              # Manager logging
    log_errors_only=True                # Only log errors for manager
)

# Configure specific collector in manager
config = CollectorConfig(
    name="primary_okx",
    exchange="okx",
    symbols=["BTC-USDT", "ETH-USDT", "SOL-USDT"],
    data_types=["ticker", "trade", "orderbook"],
    auto_restart=True,
    health_check_interval=20.0,
    enabled=True
)

manager.add_collector(collector, config)
```

## Best Practices

### 1. Collector Implementation with Conditional Logging

```python
from utils.logger import get_logger
from data.base_collector import BaseDataCollector, DataType


class ProductionCollector(BaseDataCollector):

    def __init__(self, exchange_name: str, symbols: list, logger=None):
        super().__init__(
            exchange_name=exchange_name,
            symbols=symbols,
            data_types=[DataType.TICKER, DataType.TRADE],
            auto_restart=True,           # Always enable auto-restart
            health_check_interval=30.0,  # Reasonable interval
            logger=logger,               # Pass logger for conditional logging
            log_errors_only=False        # Log all levels
        )

        # Connection management
        self.connection_pool = None
        self.rate_limiter = RateLimiter(100, 60)  # 100 requests per minute

        # Data validation
        self.data_validator = DataValidator()

        # Performance monitoring
        self.metrics = MetricsCollector()

    async def connect(self) -> bool:
        """Implement robust connection logic."""
        try:
            self._log_info("Establishing connection to exchange")

            # Use connection pooling for reliability
            self.connection_pool = await create_connection_pool(
                self.exchange_name,
                max_connections=5,
                retry_attempts=3
            )

            # Test connection
            await self.connection_pool.ping()

            self._log_info("Connection established successfully")
            return True

        except Exception as e:
            self._log_error(f"Connection failed: {e}", exc_info=True)
            return False

    async def _process_message(self, message) -> Optional[MarketDataPoint]:
        """Implement thorough data processing."""
        try:
            # Rate limiting
            await self.rate_limiter.acquire()

            # Data validation
            if not self.data_validator.validate(message):
                self._log_warning("Invalid message format received")
                return None

            # Metrics collection
            self.metrics.increment('messages_processed')

            # Log detailed processing (only if not errors-only)
            self._log_debug(f"Processing message for {message.get('symbol', 'unknown')}")

            # Create standardized data point
            data_point = MarketDataPoint(
                exchange=self.exchange_name,
                symbol=message['symbol'],
                timestamp=self._parse_timestamp(message['timestamp']),
                data_type=DataType.TICKER,
                data=self._normalize_data(message)
            )

            self._log_debug(f"Successfully processed data point for {data_point.symbol}")
            return data_point

        except Exception as e:
            self.metrics.increment('processing_errors')
            self._log_error(f"Message processing failed: {e}", exc_info=True)
            raise  # Let health monitor handle it
```
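The example above leans on `RateLimiter`, `DataValidator`, `MetricsCollector`, and `create_connection_pool`, which are stand-ins rather than part of the collector API. For completeness, here is a minimal asyncio rate limiter matching the `RateLimiter(100, 60)` / `await acquire()` usage shown; it is a sketch, not the project's implementation.

```python
# Minimal sliding-window rate limiter matching RateLimiter(max_calls, period)
# and `await limiter.acquire()` as used above. Illustrative sketch only.
import asyncio
import time
from collections import deque


class RateLimiter:
    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self._calls = deque()        # timestamps of recent acquisitions
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Wait until another call is allowed within the sliding window."""
        async with self._lock:
            now = time.monotonic()
            # Drop timestamps that have left the window
            while self._calls and now - self._calls[0] >= self.period:
                self._calls.popleft()
            if len(self._calls) >= self.max_calls:
                # Sleep until the oldest call exits the window
                oldest = self._calls[0]
                await asyncio.sleep(self.period - (now - oldest))
                self._calls.popleft()
            self._calls.append(time.monotonic())
```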
### 2. Error Handling

```python
# Implement proper error handling with conditional logging
class RobustCollector(BaseDataCollector):

    async def _handle_messages(self) -> None:
        """Handle messages with proper error management."""
        try:
            # Check connection health
            if not await self._check_connection_health():
                raise ConnectionError("Connection health check failed")

            # Receive message with timeout
            message = await asyncio.wait_for(
                self.websocket.receive(),
                timeout=30.0  # 30 second timeout
            )

            # Process message
            data_point = await self._process_message(message)
            if data_point:
                await self._notify_callbacks(data_point)

        except asyncio.TimeoutError:
            # No data received - let health monitor handle
            self._log_warning("Message receive timeout")
            raise ConnectionError("Message receive timeout")

        except WebSocketError as e:
            # WebSocket specific errors
            self._log_error(f"WebSocket error: {e}")
            raise ConnectionError(f"WebSocket failed: {e}")

        except ValidationError as e:
            # Data validation errors - don't restart for these
            self._log_warning(f"Data validation failed: {e}")
            # Continue without raising - these are data issues, not connection issues

        except Exception as e:
            # Unexpected errors - trigger restart
            self._log_error(f"Unexpected error in message handling: {e}", exc_info=True)
            raise
```

### 3. Manager Setup with Hierarchical Logging

```python
from utils.logger import get_logger


async def setup_production_system():
    """Setup production collector system with conditional logging."""

    # Create manager with its own logger
    manager_logger = get_logger('crypto_trading_system', verbose=True)

    manager = CollectorManager(
        manager_name="crypto_trading_system",
        global_health_check_interval=60.0,  # Check every minute
        restart_delay=30.0,                 # 30s between restarts
        logger=manager_logger,              # Manager logging
        log_errors_only=False               # Log all levels for manager
    )

    # Add primary data sources with individual loggers
    exchanges = ['okx', 'binance', 'coinbase']
    symbols = ['BTC-USDT', 'ETH-USDT', 'SOL-USDT', 'AVAX-USDT']

    for exchange in exchanges:
        # Create individual logger for each exchange
        exchange_logger = get_logger(f'{exchange}_collector', verbose=True)

        collector = create_collector(
            exchange,
            symbols,
            logger=exchange_logger  # Individual collector logging
        )

        # Configure for production
        config = CollectorConfig(
            name=f"{exchange}_primary",
            exchange=exchange,
            symbols=symbols,
            data_types=["ticker", "trade"],
            auto_restart=True,
            health_check_interval=30.0,
            enabled=True
        )

        # Add callbacks for data processing
        collector.add_data_callback(DataType.TICKER, process_ticker_data)
        collector.add_data_callback(DataType.TRADE, process_trade_data)

        manager.add_collector(collector, config)

    # Start system
    success = await manager.start()
    if not success:
        raise RuntimeError("Failed to start collector system")

    return manager


# Usage
async def main():
    manager = await setup_production_system()

    # Monitor system health
    while True:
        status = manager.get_status()

        if status['statistics']['failed_collectors'] > 0:
            # Alert on failures
            await send_alert(f"Collectors failed: {manager.get_failed_collectors()}")

        # Log status every 5 minutes (if manager has logging enabled)
        await asyncio.sleep(300)
```
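The setup above references `process_ticker_data`, `process_trade_data`, and `send_alert` without defining them; they are application hooks, not part of the collector API. A minimal, assumed shape for them might look like this:

```python
# Hypothetical application hooks referenced by the setup example above.
from data.base_collector import MarketDataPoint


def process_ticker_data(data_point: MarketDataPoint) -> None:
    # Forward ticker updates to downstream consumers (placeholder).
    print(f"ticker {data_point.symbol}: {data_point.data.get('last')}")


def process_trade_data(data_point: MarketDataPoint) -> None:
    # Forward trades to downstream consumers (placeholder).
    print(f"trade {data_point.symbol}: {data_point.data}")


async def send_alert(message: str) -> None:
    # Placeholder alert channel; replace with email/Slack/PagerDuty integration.
    print(f"[ALERT] {message}")
```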
### 4. Monitoring Integration

```python
# Integrate with monitoring systems and conditional logging
import prometheus_client
from utils.logger import get_logger


class MonitoredCollector(BaseDataCollector):

    def __init__(self, *args, **kwargs):
        # Extract logger before passing to parent
        logger = kwargs.get('logger')
        super().__init__(*args, **kwargs)

        # Prometheus metrics
        self.messages_counter = prometheus_client.Counter(
            'collector_messages_total',
            'Total messages processed',
            ['exchange', 'symbol', 'type']
        )
        self.errors_counter = prometheus_client.Counter(
            'collector_errors_total',
            'Total errors',
            ['exchange', 'error_type']
        )
        self.uptime_gauge = prometheus_client.Gauge(
            'collector_uptime_seconds',
            'Collector uptime',
            ['exchange']
        )

    async def _notify_callbacks(self, data_point: MarketDataPoint):
        """Override to add metrics."""
        # Update metrics
        self.messages_counter.labels(
            exchange=data_point.exchange,
            symbol=data_point.symbol,
            type=data_point.data_type.value
        ).inc()

        # Update uptime
        status = self.get_status()
        if status['statistics']['uptime_seconds']:
            self.uptime_gauge.labels(
                exchange=self.exchange_name
            ).set(status['statistics']['uptime_seconds'])

        # Log metrics update (only if debug logging enabled)
        self._log_debug(f"Updated metrics for {data_point.symbol}")

        # Call parent
        await super()._notify_callbacks(data_point)

    async def _handle_connection_error(self) -> bool:
        """Override to add error metrics."""
        self.errors_counter.labels(
            exchange=self.exchange_name,
            error_type='connection'
        ).inc()

        # Always log connection errors
        self._log_error("Connection error occurred")

        return await super()._handle_connection_error()
```

## Troubleshooting

### Common Issues

#### 1. Collector Won't Start

**Symptoms**: `start()` returns `False`, status shows `ERROR`

**Solutions**:

```python
# Check connection details with debugging
from utils.logger import get_logger

debug_logger = get_logger('debug_collector', verbose=True)
collector = MyCollector(symbols=["BTC-USDT"], logger=debug_logger)

success = await collector.start()
if not success:
    status = collector.get_status()
    print(f"Error: {status['statistics']['last_error']}")

# Common fixes:
# - Verify API credentials
# - Check network connectivity
# - Validate symbol names
# - Review exchange-specific requirements
```

#### 2. Frequent Restarts

**Symptoms**: High restart count, intermittent data

**Solutions**:

```python
# Adjust health check intervals and enable detailed logging
logger = get_logger('troubleshoot_collector', verbose=True)

collector = MyCollector(
    symbols=["BTC-USDT"],
    health_check_interval=60.0,  # Increase interval
    auto_restart=True,
    logger=logger,               # Enable detailed logging
    log_errors_only=False        # Log all message types
)

# Check for:
# - Network instability
# - Exchange rate limiting
# - Invalid message formats
# - Resource constraints
```

#### 3. No Data Received

**Symptoms**: Collector running but no callbacks triggered

**Solutions**:

```python
# Check data flow with debug logging
logger = get_logger('data_debug', verbose=True)
collector = MyCollector(symbols=["BTC-USDT"], logger=logger)

def debug_callback(data_point):
    print(f"Received: {data_point}")

collector.add_data_callback(DataType.TICKER, debug_callback)

# Verify:
# - Callback registration
# - Symbol subscription
# - Message parsing logic
# - Exchange data availability
```
#### 4. Memory Leaks

**Symptoms**: Increasing memory usage over time

**Solutions**:

```python
# Implement proper cleanup with logging
class CleanCollector(BaseDataCollector):

    async def disconnect(self):
        """Ensure proper cleanup."""
        self._log_info("Starting cleanup process")

        # Clear buffers
        if hasattr(self, 'message_buffer'):
            self.message_buffer.clear()
            self._log_debug("Cleared message buffer")

        # Close connections
        if self.websocket:
            await self.websocket.close()
            self.websocket = None
            self._log_debug("Closed WebSocket connection")

        # Clear callbacks
        for callback_list in self._data_callbacks.values():
            callback_list.clear()
        self._log_debug("Cleared callbacks")

        await super().disconnect()
        self._log_info("Cleanup completed")
```

## Exchange Factory System

### Overview

The Exchange Factory system provides a standardized way to create data collectors for different exchanges. It implements the factory pattern to abstract the creation logic and provides a consistent interface across all exchanges.

### Exchange Registry

The system maintains a registry of supported exchanges and their capabilities:

```python
from data.exchanges import get_supported_exchanges, get_exchange_info

# Get all supported exchanges
exchanges = get_supported_exchanges()
print(f"Supported exchanges: {exchanges}")  # ['okx']

# Get exchange information
okx_info = get_exchange_info('okx')
print(f"OKX pairs: {okx_info['supported_pairs']}")
print(f"OKX data types: {okx_info['supported_data_types']}")
```

### Factory Configuration

```python
from data.exchanges import ExchangeCollectorConfig, ExchangeFactory
from data.base_collector import DataType
from utils.logger import get_logger

# Create configuration with conditional logging
logger = get_logger('factory_collector', verbose=True)

config = ExchangeCollectorConfig(
    exchange='okx',                                   # Exchange name
    symbol='BTC-USDT',                                # Trading pair
    data_types=[DataType.TRADE, DataType.ORDERBOOK],  # Data types
    auto_restart=True,                                # Auto-restart on failures
    health_check_interval=30.0,                       # Health check interval
    store_raw_data=True,                              # Store raw data for debugging
    custom_params={                                   # Exchange-specific parameters
        'ping_interval': 25.0,
        'max_reconnect_attempts': 5
    }
)

# Validate configuration
is_valid = ExchangeFactory.validate_config(config)
if is_valid:
    collector = ExchangeFactory.create_collector(config, logger=logger)
```

### Exchange Capabilities

Query what each exchange supports:

```python
from data.exchanges import ExchangeFactory

# Get supported trading pairs
okx_pairs = ExchangeFactory.get_supported_pairs('okx')
print(f"OKX supports: {okx_pairs}")

# Get supported data types
okx_data_types = ExchangeFactory.get_supported_data_types('okx')
print(f"OKX data types: {okx_data_types}")
```

### Convenience Functions

Each exchange provides convenience functions for easy collector creation:

```python
from data.exchanges import create_okx_collector
from utils.logger import get_logger

# Quick OKX collector creation with logging
logger = get_logger('okx_btc_usdt', verbose=True)

collector = create_okx_collector(
    symbol='BTC-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
    auto_restart=True,
    logger=logger
)
```
## OKX Implementation

### OKX Collector Features

The OKX collector provides:

- **Real-time Data**: Live trades, orderbook, and ticker data
- **Single Pair Focus**: Each collector handles one trading pair for better isolation
- **Ping/Pong Management**: OKX-specific keepalive mechanism with proper format
- **Raw Data Storage**: Optional storage of raw OKX messages for debugging
- **Connection Resilience**: Robust reconnection logic for the OKX WebSocket
- **Conditional Logging**: Full integration with the logging system

### OKX Usage Examples

```python
from data.base_collector import DataType
from data.exchanges import ExchangeCollectorConfig, ExchangeFactory, create_okx_collector
from data.exchanges.okx import OKXCollector
from utils.logger import get_logger

# Direct OKX collector usage with conditional logging
logger = get_logger('okx_collector', verbose=True)

collector = OKXCollector(
    symbol='BTC-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
    auto_restart=True,
    health_check_interval=30.0,
    store_raw_data=True,
    logger=logger,         # Enable logging
    log_errors_only=False  # Log all levels
)

# Factory pattern usage with error-only logging
error_logger = get_logger('okx_critical', verbose=False)

collector = create_okx_collector(
    symbol='BTC-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
    logger=error_logger,
    log_errors_only=True  # Only log errors
)

# Multiple collectors with different logging strategies
configs = [
    ExchangeCollectorConfig('okx', 'BTC-USDT', [DataType.TRADE]),
    ExchangeCollectorConfig('okx', 'ETH-USDT', [DataType.ORDERBOOK])
]

collectors = []
for config in configs:
    # Different logging for each collector
    if config.symbol == 'BTC-USDT':
        logger = get_logger('okx_btc', verbose=True)  # Full logging
    else:
        logger = get_logger('okx_eth', verbose=False, log_errors_only=True)  # Errors only

    collector = ExchangeFactory.create_collector(config, logger=logger)
    collectors.append(collector)
```

### OKX Data Processing

The OKX collector processes three main data types:

#### Trade Data

```python
# OKX trade message format
{
    "arg": {"channel": "trades", "instId": "BTC-USDT"},
    "data": [{
        "tradeId": "12345678",
        "px": "50000.5",       # Price
        "sz": "0.001",         # Size
        "side": "buy",         # Side (buy/sell)
        "ts": "1697123456789"  # Timestamp (ms)
    }]
}
```

#### Orderbook Data

```python
# OKX orderbook message format (books5)
{
    "arg": {"channel": "books5", "instId": "BTC-USDT"},
    "data": [{
        "asks": [["50001.0", "0.5", "0", "3"]],  # [price, size, liquidated, orders]
        "bids": [["50000.0", "0.8", "0", "2"]],
        "ts": "1697123456789"
    }]
}
```

#### Ticker Data

```python
# OKX ticker message format
{
    "arg": {"channel": "tickers", "instId": "BTC-USDT"},
    "data": [{
        "last": "50000.5",     # Last price
        "askPx": "50001.0",    # Best ask price
        "bidPx": "50000.0",    # Best bid price
        "open24h": "49500.0",  # 24h open
        "high24h": "50500.0",  # 24h high
        "low24h": "49000.0",   # 24h low
        "vol24h": "1234.567",  # 24h volume
        "ts": "1697123456789"
    }]
}
```
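As an illustration of how a payload like the trade message above maps onto the standardized `MarketDataPoint`, the sketch below converts one OKX trade entry. It only mirrors the documented message fields; the actual normalization inside `OKXCollector` may differ, and the import path is an assumption.

```python
# Illustrative conversion of one OKX trade entry into a MarketDataPoint.
# OKXCollector performs its own normalization; this sketch only mirrors the
# documented fields (px, sz, side, ts). Import path is an assumption.
from datetime import datetime, timezone

from data.base_collector import DataType, MarketDataPoint


def okx_trade_to_data_point(message: dict) -> MarketDataPoint:
    symbol = message["arg"]["instId"]
    trade = message["data"][0]
    return MarketDataPoint(
        exchange="okx",
        symbol=symbol,
        timestamp=datetime.fromtimestamp(int(trade["ts"]) / 1000, tz=timezone.utc),
        data_type=DataType.TRADE,
        data={
            "price": trade["px"],
            "size": trade["sz"],
            "side": trade["side"],
            "trade_id": trade["tradeId"],
        },
    )
```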
For comprehensive OKX documentation, see [OKX Collector Documentation](okx_collector.md).

## Integration Examples

### Django Integration

```python
# Django management command with conditional logging
import asyncio

from django.core.management.base import BaseCommand

from data import CollectorManager
from utils.logger import get_logger


class Command(BaseCommand):
    help = 'Start crypto data collectors'

    def handle(self, *args, **options):
        async def run_collectors():
            # Create manager with logging
            manager_logger = get_logger('django_collectors', verbose=True)
            manager = CollectorManager("django_collectors", logger=manager_logger)

            # Add collectors with individual loggers
            from myapp.collectors import OKXCollector, BinanceCollector

            okx_logger = get_logger('django_okx', verbose=True)
            binance_logger = get_logger('django_binance', verbose=True, log_errors_only=True)

            manager.add_collector(OKXCollector(['BTC-USDT'], logger=okx_logger))
            manager.add_collector(BinanceCollector(['ETH-USDT'], logger=binance_logger))

            # Start system
            await manager.start()

            # Keep running
            try:
                while True:
                    await asyncio.sleep(60)
                    status = manager.get_status()
                    self.stdout.write(f"Status: {status['statistics']}")
            except KeyboardInterrupt:
                await manager.stop()

        asyncio.run(run_collectors())
```

### FastAPI Integration

```python
# FastAPI application with conditional logging
import asyncio

from fastapi import FastAPI

from data import CollectorManager
from utils.logger import get_logger

app = FastAPI()
manager = None


@app.on_event("startup")
async def startup_event():
    global manager

    # Create manager with logging
    manager_logger = get_logger('fastapi_collectors', verbose=True)
    manager = CollectorManager("fastapi_collectors", logger=manager_logger)

    # Add collectors with error-only logging for production
    from collectors import OKXCollector

    collector_logger = get_logger('fastapi_okx', verbose=False, log_errors_only=True)
    collector = OKXCollector(['BTC-USDT', 'ETH-USDT'], logger=collector_logger)
    manager.add_collector(collector)

    # Start in background
    await manager.start()


@app.on_event("shutdown")
async def shutdown_event():
    global manager
    if manager:
        await manager.stop()


@app.get("/collector/status")
async def get_collector_status():
    return manager.get_status()


@app.post("/collector/{name}/restart")
async def restart_collector(name: str):
    success = await manager.restart_collector(name)
    return {"success": success}
```

## Migration Guide

### From Manual Connection Management

**Before** (manual management):

```python
class OldCollector:
    def __init__(self):
        self.websocket = None
        self.running = False

    async def start(self):
        while self.running:
            try:
                self.websocket = await connect()
                await self.listen()
            except Exception as e:
                print(f"Error: {e}")
                await asyncio.sleep(5)  # Manual retry
```

**After** (with BaseDataCollector and conditional logging):

```python
from utils.logger import get_logger


class NewCollector(BaseDataCollector):
    def __init__(self):
        logger = get_logger('new_collector', verbose=True)
        super().__init__(
            "exchange",
            ["BTC-USDT"],
            logger=logger,
            log_errors_only=False
        )
        # Auto-restart and health monitoring included

    async def connect(self) -> bool:
        self._log_info("Connecting to exchange")
        self.websocket = await connect()
        self._log_info("Connection established")
        return True

    async def _handle_messages(self):
        message = await self.websocket.receive()
        self._log_debug(f"Received message: {message}")
        # Error handling and restart logic automatic
```
### From Basic Monitoring

**Before** (basic monitoring):

```python
# Manual status tracking
status = {
    'connected': False,
    'last_message': None,
    'error_count': 0
}

# Manual health checks
async def health_check():
    if time.time() - status['last_message'] > 300:
        print("No data for 5 minutes!")
```

**After** (comprehensive monitoring with conditional logging):

```python
# Automatic health monitoring with logging
logger = get_logger('monitored_collector', verbose=True)
collector = MyCollector(["BTC-USDT"], logger=logger)

# Rich status information
status = collector.get_status()
health = collector.get_health_status()

# Automatic alerts and recovery with logging
if not health['is_healthy']:
    print(f"Issues: {health['issues']}")
    # Auto-restart already triggered and logged
```

## Related Documentation

- [Data Collection Service](../services/data_collection_service.md) - High-level service orchestration
- [Logging System](logging.md) - Conditional logging implementation
- [Database Operations](../database/operations.md) - Database integration patterns
- [Monitoring Guide](../monitoring/README.md) - System monitoring and alerting

---

## Support and Contributing

### Getting Help

1. **Check Logs**: Review logs in the `./logs/` directory (see [Logging System](logging.md))
2. **Status Information**: Use the `get_status()` and `get_health_status()` methods
3. **Debug Mode**: Enable debug logging with the conditional logging system
4. **Test with Demo**: Run `examples/collector_demo.py` to verify setup

### Contributing

The data collector system is designed to be extensible. Contributions are welcome for:

- New exchange implementations
- Enhanced monitoring features
- Performance optimizations
- Additional data types
- Integration examples
- Logging system improvements

### License

This documentation and the associated code are part of the Crypto Trading Bot Platform project.

---

*For more information, see the main project documentation in `/docs/`.*