"""
OKX Data Collector implementation.

This module provides the main OKX data collector class that extends
BaseDataCollector, handling real-time market data collection for a single
trading pair with robust error handling, health monitoring, and database
integration.
"""

import asyncio
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any, Set
from dataclasses import dataclass

from ...base_collector import (
    BaseDataCollector, DataType, CollectorStatus, MarketDataPoint,
    OHLCVData, DataValidationError, ConnectionError
)
from ...common import (
    StandardizedTrade, OHLCVCandle, CandleProcessingConfig
)
from .websocket import (
    OKXWebSocketClient, OKXSubscription, OKXChannelType,
    ConnectionState, OKXWebSocketError
)
from .data_processor import OKXDataProcessor
from database.operations import get_database_operations, DatabaseOperationError
from database.models import MarketData, RawTrade


@dataclass
class OKXMarketData:
    """OKX-specific market data structure."""
    symbol: str
    timestamp: datetime
    data_type: str
    channel: str
    raw_data: Dict[str, Any]


class OKXCollector(BaseDataCollector):
    """
    OKX data collector for real-time market data.

    This collector handles a single trading pair and collects real-time data
    including trades, orderbook, and ticker information from OKX exchange.
    Uses the new common data processing framework for validation,
    transformation, and aggregation.
    """

    def __init__(self,
                 symbol: str,
                 data_types: Optional[List[DataType]] = None,
                 component_name: Optional[str] = None,
                 auto_restart: bool = True,
                 health_check_interval: float = 30.0,
                 store_raw_data: bool = True,
                 force_update_candles: bool = False,
                 timeframes: Optional[List[str]] = None,
                 candle_config: Optional[CandleProcessingConfig] = None,
                 logger=None,
                 log_errors_only: bool = False):
        """
        Initialize OKX collector for a single trading pair.

        Args:
            symbol: Trading symbol (e.g., 'BTC-USDT')
            data_types: Types of data to collect (default: [DataType.TRADE, DataType.ORDERBOOK])
            component_name: Name for logging (default: f'okx_collector_{symbol}')
            auto_restart: Enable automatic restart on failures
            health_check_interval: Seconds between health checks
            store_raw_data: Whether to store raw data for debugging
            force_update_candles: If True, update existing candles; if False, keep existing candles unchanged
            timeframes: List of timeframes to collect (e.g., ['1s', '5s', '1m'])
            candle_config: Optional CandleProcessingConfig instance (will create one if not provided)
            logger: Logger instance for conditional logging (None for no logging)
            log_errors_only: If True and logger provided, only log error-level messages
        """
        # Default data types if not specified
        if data_types is None:
            data_types = [DataType.TRADE, DataType.ORDERBOOK]

        # Component name for logging
        if component_name is None:
            component_name = f"okx_collector_{symbol.replace('-', '_').lower()}"

        # Initialize base collector
        super().__init__(
            exchange_name="okx",
            symbols=[symbol],
            data_types=data_types,
            timeframes=timeframes,  # Pass timeframes to base collector
            component_name=component_name,
            auto_restart=auto_restart,
            health_check_interval=health_check_interval,
            logger=logger,
            log_errors_only=log_errors_only
        )

        # OKX-specific settings
        self.symbol = symbol
        self.store_raw_data = store_raw_data
        self.force_update_candles = force_update_candles

        # WebSocket client (created in connect())
        self._ws_client: Optional[OKXWebSocketClient] = None

        # Strong references to fire-and-forget tasks spawned from synchronous
        # callbacks. The event loop only keeps weak references to tasks, so
        # without this set a pending task could be garbage-collected before
        # it finishes (dropping a message or a candle write).
        self._background_tasks: Set[asyncio.Task] = set()

        # Data processor using new common framework
        self._data_processor = OKXDataProcessor(
            symbol,
            config=candle_config or CandleProcessingConfig(timeframes=self.timeframes),  # Use provided config or create new one
            component_name=f"{component_name}_processor",
            logger=logger
        )

        # Add callbacks for processed data
        self._data_processor.add_trade_callback(self._on_trade_processed)
        self._data_processor.add_candle_callback(self._on_candle_processed)

        # Database operations using new repository pattern (initialized in connect())
        self._db_operations = None

        # Data processing counters
        self._message_count = 0
        self._processed_trades = 0
        self._processed_candles = 0
        self._error_count = 0

        # OKX channel mapping
        self._channel_mapping = {
            DataType.TRADE: OKXChannelType.TRADES.value,
            DataType.ORDERBOOK: OKXChannelType.BOOKS5.value,
            DataType.TICKER: OKXChannelType.TICKERS.value
        }

        if logger:
            logger.info(f"{component_name}: Initialized OKX collector for {symbol} with data types: {[dt.value for dt in data_types]}")
            logger.info(f"{component_name}: Using timeframes: {self.timeframes}")
            logger.info(f"{component_name}: Using common data processing framework")

    def _schedule_background_task(self, coro) -> asyncio.Task:
        """
        Schedule a coroutine on the running event loop and keep it alive.

        The created task is held in ``self._background_tasks`` until it
        completes, then removed via a done-callback.

        Args:
            coro: Coroutine object to run in the background

        Returns:
            The created asyncio.Task

        Raises:
            RuntimeError: If called with no running event loop (the coroutine
                is closed first so it does not trigger a "never awaited" warning)
        """
        try:
            task = asyncio.create_task(coro)
        except RuntimeError:
            coro.close()
            raise
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def connect(self) -> bool:
        """
        Establish connection to OKX WebSocket API.

        Also initializes the database operations object and registers
        ``_on_message`` as the WebSocket message callback.

        Returns:
            True if connection successful, False otherwise
        """
        try:
            if self.logger:
                self.logger.info(f"{self.component_name}: Connecting OKX collector for {self.symbol}")

            # Initialize database operations using repository pattern
            self._db_operations = get_database_operations(self.logger)

            # Create WebSocket client
            ws_component_name = f"okx_ws_{self.symbol.replace('-', '_').lower()}"
            self._ws_client = OKXWebSocketClient(
                component_name=ws_component_name,
                ping_interval=25.0,
                pong_timeout=10.0,
                max_reconnect_attempts=5,
                reconnect_delay=5.0,
                logger=self.logger  # Pass the logger to enable ping/pong logging
            )

            # Add message callback
            self._ws_client.add_message_callback(self._on_message)

            # Connect to WebSocket
            if not await self._ws_client.connect(use_public=True):
                if self.logger:
                    self.logger.error(f"{self.component_name}: Failed to connect to OKX WebSocket")
                return False

            if self.logger:
                self.logger.info(f"{self.component_name}: Successfully connected OKX collector for {self.symbol}")
            return True

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error connecting OKX collector for {self.symbol}: {e}")
            return False

    async def disconnect(self) -> None:
        """Disconnect from OKX WebSocket API. Errors are logged, not raised."""
        try:
            if self.logger:
                self.logger.info(f"{self.component_name}: Disconnecting OKX collector for {self.symbol}")

            if self._ws_client:
                await self._ws_client.disconnect()
                self._ws_client = None

            if self.logger:
                self.logger.info(f"{self.component_name}: Disconnected OKX collector for {self.symbol}")

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error disconnecting OKX collector for {self.symbol}: {e}")

    async def subscribe_to_data(self, symbols: List[str], data_types: List[DataType]) -> bool:
        """
        Subscribe to data streams for specified symbols and data types.

        Args:
            symbols: Trading symbols to subscribe to (should contain self.symbol)
            data_types: Types of data to subscribe to

        Returns:
            True if subscription successful, False otherwise
        """
        if not self._ws_client or not self._ws_client.is_connected:
            if self.logger:
                self.logger.error(f"{self.component_name}: WebSocket client not connected")
            return False

        # Validate symbol
        if self.symbol not in symbols:
            if self.logger:
                self.logger.warning(f"{self.component_name}: Symbol {self.symbol} not in subscription list: {symbols}")
            return False

        try:
            # Build subscriptions
            subscriptions = []
            for data_type in data_types:
                if data_type in self._channel_mapping:
                    channel = self._channel_mapping[data_type]
                    subscription = OKXSubscription(
                        channel=channel,
                        inst_id=self.symbol,
                        enabled=True
                    )
                    subscriptions.append(subscription)
                    if self.logger:
                        self.logger.debug(f"{self.component_name}: Added subscription: {channel} for {self.symbol}")
                else:
                    if self.logger:
                        self.logger.warning(f"{self.component_name}: Unsupported data type: {data_type}")

            if not subscriptions:
                if self.logger:
                    self.logger.warning(f"{self.component_name}: No valid subscriptions to create")
                return False

            # Subscribe to channels
            success = await self._ws_client.subscribe(subscriptions)
            if success:
                if self.logger:
                    self.logger.info(f"{self.component_name}: Successfully subscribed to {len(subscriptions)} channels for {self.symbol}")
                return True
            else:
                if self.logger:
                    self.logger.error(f"{self.component_name}: Failed to subscribe to channels for {self.symbol}")
                return False

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error subscribing to data for {self.symbol}: {e}")
            return False

    async def unsubscribe_from_data(self, symbols: List[str], data_types: List[DataType]) -> bool:
        """
        Unsubscribe from data streams for specified symbols and data types.

        Args:
            symbols: Trading symbols to unsubscribe from
            data_types: Types of data to unsubscribe from

        Returns:
            True if unsubscription successful (or nothing to do), False otherwise
        """
        if not self._ws_client or not self._ws_client.is_connected:
            if self.logger:
                self.logger.warning(f"{self.component_name}: WebSocket client not connected")
            return True  # Consider it successful if not connected

        try:
            # Build unsubscription list
            subscriptions = []
            for data_type in data_types:
                if data_type in self._channel_mapping:
                    channel = self._channel_mapping[data_type]
                    subscription = OKXSubscription(
                        channel=channel,
                        inst_id=self.symbol,
                        enabled=False  # False for unsubscribe
                    )
                    subscriptions.append(subscription)

            if not subscriptions:
                return True

            # Unsubscribe from channels
            success = await self._ws_client.unsubscribe(subscriptions)
            if success:
                if self.logger:
                    self.logger.info(f"{self.component_name}: Successfully unsubscribed from {len(subscriptions)} channels for {self.symbol}")
                return True
            else:
                if self.logger:
                    self.logger.error(f"{self.component_name}: Failed to unsubscribe from channels for {self.symbol}")
                return False

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error unsubscribing from data for {self.symbol}: {e}")
            return False

    async def _process_message(self, message: Any) -> Optional[MarketDataPoint]:
        """
        Process received message using the new data processor.

        This is the single place where ``_message_count`` is incremented, so
        every message handed to processing is counted exactly once regardless
        of how it arrived (previously ``_on_message`` also incremented it,
        double-counting every WebSocket message).

        Args:
            message: Raw message from WebSocket

        Returns:
            MarketDataPoint if processing successful, None otherwise
        """
        self._message_count += 1

        if not isinstance(message, dict):
            if self.logger:
                self.logger.warning(f"{self.component_name}: Received non-dict message: {type(message)}")
            return None

        try:
            # Use the new data processor for validation and processing
            success, market_data_points, errors = self._data_processor.validate_and_process_message(
                message,
                expected_symbol=self.symbol
            )

            if not success:
                self._error_count += 1
                if self.logger:
                    self.logger.error(f"{self.component_name}: Message processing failed: {errors}")
                return None

            if errors:
                if self.logger:
                    self.logger.warning(f"{self.component_name}: Message processing warnings: {errors}")

            # Store raw data if enabled (for debugging/compliance)
            if self.store_raw_data:
                if 'data' in message and 'arg' in message:
                    await self._store_raw_data(message['arg'].get('channel', 'unknown'), message)

            # Store processed market data points in raw_trades table
            for data_point in market_data_points:
                await self._store_processed_data(data_point)

            # Return the first data point for compatibility (most use cases have single data point per message)
            return market_data_points[0] if market_data_points else None

        except Exception as e:
            self._error_count += 1
            if self.logger:
                self.logger.error(f"{self.component_name}: Error processing message: {e}")
            return None

    async def _handle_messages(self) -> None:
        """Handle message processing in the background."""
        # The new data processor handles messages through callbacks
        # This method exists for compatibility with BaseDataCollector

        # Update heartbeat to indicate the message loop is active
        self._last_heartbeat = datetime.now(timezone.utc)

        # Check if we're receiving WebSocket messages
        if self._ws_client and self._ws_client.is_connected:
            # Update last data received timestamp if WebSocket is connected and active
            # NOTE(review): this refreshes _last_data_received merely because the
            # socket reports connected, not because data arrived (_on_message
            # already does that). It may mask a stalled feed from any staleness
            # check in the base collector — confirm this is intended.
            self._last_data_received = datetime.now(timezone.utc)

        # Short sleep to prevent busy loop while maintaining heartbeat
        await asyncio.sleep(0.1)

    async def _store_processed_data(self, data_point: MarketDataPoint) -> None:
        """
        Store raw market data in the raw_trades table.

        No-op if database operations have not been initialized (before
        connect()). Errors are logged, not raised.

        Args:
            data_point: Raw market data point (trade, orderbook, ticker)
        """
        try:
            if not self._db_operations:
                return

            # Store raw market data points in raw_trades table using repository
            success = self._db_operations.raw_trades.insert_market_data_point(data_point)
            if success and self.logger:
                self.logger.debug(f"{self.component_name}: Stored raw data: {data_point.data_type.value} for {data_point.symbol}")

        except DatabaseOperationError as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Database error storing raw market data: {e}")
        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error storing raw market data: {e}")

    async def _store_completed_candle(self, candle: OHLCVCandle) -> None:
        """
        Store completed OHLCV candle in the market_data table.

        Handles duplicate candles based on force_update_candles setting:
        - If force_update_candles=True: UPDATE existing records with latest values
        - If force_update_candles=False: IGNORE duplicates, keep existing records unchanged

        Failures are logged (when a logger is configured) and always counted
        in ``_error_count``; they are not raised.

        Args:
            candle: Completed OHLCV candle
        """
        try:
            if not self._db_operations:
                return

            # Store completed candles using repository pattern
            success = self._db_operations.market_data.upsert_candle(candle, self.force_update_candles)

            if success and self.logger:
                action = "Updated" if self.force_update_candles else "Stored"
                self.logger.debug(f"{self.component_name}: {action} candle: {candle.symbol} {candle.timeframe} at {candle.end_time} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}")

        except DatabaseOperationError as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Database error storing completed candle: {e}")
                # Log candle details for debugging
                self.logger.error(f"{self.component_name}: Failed candle details: {candle.symbol} {candle.timeframe} {candle.end_time} - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}")
            # Count the failure even when no logger is configured.
            self._error_count += 1
        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error storing completed candle: {e}")
                # Log candle details for debugging
                self.logger.error(f"{self.component_name}: Failed candle details: {candle.symbol} {candle.timeframe} {candle.end_time} - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}")
            # Count the failure even when no logger is configured.
            self._error_count += 1

    async def _store_raw_data(self, channel: str, raw_message: Dict[str, Any]) -> None:
        """
        Store raw WebSocket data for debugging in raw_trades table.

        Each item of ``raw_message['data']`` is stored as its own record with
        data_type ``raw_<channel>``. Errors are logged, not raised.

        Args:
            channel: Channel name
            raw_message: Raw WebSocket message
        """
        try:
            if not self._db_operations or 'data' not in raw_message:
                return

            # Store each data item as a separate raw data record using repository
            for data_item in raw_message['data']:
                success = self._db_operations.raw_trades.insert_raw_websocket_data(
                    exchange="okx",
                    symbol=self.symbol,
                    data_type=f"raw_{channel}",  # Prefix with 'raw_' to distinguish from processed data
                    raw_data=data_item,
                    timestamp=datetime.now(timezone.utc)
                )
                if not success and self.logger:
                    self.logger.warning(f"{self.component_name}: Failed to store raw WebSocket data for {channel}")

        except DatabaseOperationError as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Database error storing raw WebSocket data: {e}")
        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error storing raw WebSocket data: {e}")

    def _on_message(self, message: Dict[str, Any]) -> None:
        """
        Handle incoming WebSocket message.

        Refreshes the heartbeat/data-received timestamps and schedules
        ``_process_message`` as a tracked background task (which also
        increments the message counter).

        Args:
            message: WebSocket message from OKX
        """
        try:
            # Update heartbeat and data received timestamps
            current_time = datetime.now(timezone.utc)
            self._last_heartbeat = current_time
            self._last_data_received = current_time

            # Process message asynchronously; keep a reference so the task
            # cannot be garbage-collected before it completes.
            self._schedule_background_task(self._process_message(message))

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error handling WebSocket message: {e}")

    def _on_trade_processed(self, trade: StandardizedTrade) -> None:
        """
        Callback for processed trades from data processor.

        Args:
            trade: Processed standardized trade
        """
        self._processed_trades += 1
        if self.logger:
            self.logger.debug(f"{self.component_name}: Processed trade: {trade.symbol} {trade.side} {trade.size}@{trade.price}")

    def _on_candle_processed(self, candle: OHLCVCandle) -> None:
        """
        Callback for completed candles from data processor.

        Candles with ``is_complete`` set are persisted via a tracked
        background task.

        Args:
            candle: Completed OHLCV candle
        """
        self._processed_candles += 1
        if self.logger:
            self.logger.debug(f"{self.component_name}: Completed candle: {candle.symbol} {candle.timeframe} O:{candle.open} H:{candle.high} L:{candle.low} C:{candle.close} V:{candle.volume}")

        # Store completed candle in market_data table
        if candle.is_complete:
            self._schedule_background_task(self._store_completed_candle(candle))

    def get_status(self) -> Dict[str, Any]:
        """
        Get current collector status including processing statistics.

        Returns:
            Dictionary containing collector status information
        """
        base_status = super().get_status()

        # Add OKX-specific status
        okx_status = {
            "symbol": self.symbol,
            "websocket_connected": self._ws_client.is_connected if self._ws_client else False,
            "websocket_state": self._ws_client.connection_state.value if self._ws_client else "disconnected",
            "store_raw_data": self.store_raw_data,
            "force_update_candles": self.force_update_candles,
            "timeframes": self.timeframes,
            "processing_stats": {
                "messages_received": self._message_count,
                "trades_processed": self._processed_trades,
                "candles_processed": self._processed_candles,
                "errors": self._error_count
            }
        }

        # Add data processor statistics
        if self._data_processor:
            okx_status["data_processor_stats"] = self._data_processor.get_processing_stats()

        # Add WebSocket statistics
        if self._ws_client:
            okx_status["websocket_stats"] = self._ws_client.get_stats()

        # Merge with base status
        base_status.update(okx_status)
        return base_status

    def __repr__(self) -> str:
        """String representation of the collector."""
        return f"OKXCollector(symbol='{self.symbol}', status='{self.status.value}', data_types={[dt.value for dt in self.data_types]})"