536 lines
22 KiB
Python
536 lines
22 KiB
Python
"""
|
|
OKX Data Collector implementation.
|
|
|
|
This module provides the main OKX data collector class that extends BaseDataCollector,
|
|
handling real-time market data collection for a single trading pair with robust
|
|
error handling, health monitoring, and database integration.
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass
|
|
|
|
from ...base_collector import (
|
|
BaseDataCollector, DataType, CollectorStatus, MarketDataPoint,
|
|
OHLCVData, DataValidationError, ConnectionError
|
|
)
|
|
from ...common import StandardizedTrade, OHLCVCandle
|
|
from .websocket import (
|
|
OKXWebSocketClient, OKXSubscription, OKXChannelType,
|
|
ConnectionState, OKXWebSocketError
|
|
)
|
|
from .data_processor import OKXDataProcessor
|
|
from database.operations import get_database_operations, DatabaseOperationError
|
|
from database.models import MarketData, RawTrade
|
|
|
|
|
|
@dataclass
class OKXMarketData:
    """
    Container for one piece of OKX-specific market data.

    Attributes are plain values; the class adds no behaviour beyond what
    ``@dataclass`` generates (``__init__``, ``__repr__``, ``__eq__``).
    """

    # Trading symbol the data belongs to.
    symbol: str
    # Timestamp associated with the data.
    timestamp: datetime
    # Kind of data carried, as a string.
    data_type: str
    # Name of the OKX channel the data came from.
    channel: str
    # Payload as a dictionary.
    raw_data: Dict[str, Any]
|
|
|
|
|
|
class OKXCollector(BaseDataCollector):
    """
    OKX data collector for real-time market data.

    This collector handles a single trading pair. Raw WebSocket messages
    arrive in ``_on_message``, are validated and transformed by an
    ``OKXDataProcessor``, and the resulting data points and completed candles
    are persisted through the object returned by ``get_database_operations``.
    """

    def __init__(self,
                 symbol: str,
                 data_types: Optional[List[DataType]] = None,
                 component_name: Optional[str] = None,
                 auto_restart: bool = True,
                 health_check_interval: float = 30.0,
                 store_raw_data: bool = True,
                 force_update_candles: bool = False,
                 logger=None,
                 log_errors_only: bool = False):
        """
        Initialize OKX collector for a single trading pair.

        Args:
            symbol: Trading symbol (e.g., 'BTC-USDT')
            data_types: Types of data to collect (default: [DataType.TRADE, DataType.ORDERBOOK])
            component_name: Name for logging (default: f'okx_collector_{symbol}',
                with '-' replaced by '_' and lower-cased)
            auto_restart: Enable automatic restart on failures
            health_check_interval: Seconds between health checks
            store_raw_data: Whether to store raw data for debugging
            force_update_candles: If True, update existing candles; if False, keep existing candles unchanged
            logger: Logger instance for conditional logging (None for no logging)
            log_errors_only: If True and logger provided, only log error-level messages
        """
        # Default data types if not specified
        if data_types is None:
            data_types = [DataType.TRADE, DataType.ORDERBOOK]

        # Component name for logging
        if component_name is None:
            component_name = f"okx_collector_{symbol.replace('-', '_').lower()}"

        # Initialize base collector
        super().__init__(
            exchange_name="okx",
            symbols=[symbol],
            data_types=data_types,
            component_name=component_name,
            auto_restart=auto_restart,
            health_check_interval=health_check_interval,
            logger=logger,
            log_errors_only=log_errors_only
        )

        # OKX-specific settings
        self.symbol = symbol
        self.store_raw_data = store_raw_data
        self.force_update_candles = force_update_candles

        # WebSocket client (created in connect())
        self._ws_client: Optional[OKXWebSocketClient] = None

        # Data processor using new common framework
        self._data_processor = OKXDataProcessor(symbol, component_name=f"{component_name}_processor", logger=logger)

        # Add callbacks for processed data
        self._data_processor.add_trade_callback(self._on_trade_processed)
        self._data_processor.add_candle_callback(self._on_candle_processed)

        # Database operations using new repository pattern (created in connect())
        self._db_operations = None

        # Data processing counters
        self._message_count = 0
        self._processed_trades = 0
        self._processed_candles = 0
        self._error_count = 0

        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so an untracked task may be garbage-collected
        # before it finishes.
        self._okx_background_tasks: set = set()

        # OKX channel mapping
        self._channel_mapping = {
            DataType.TRADE: OKXChannelType.TRADES.value,
            DataType.ORDERBOOK: OKXChannelType.BOOKS5.value,
            DataType.TICKER: OKXChannelType.TICKERS.value
        }

        if logger:
            logger.info(f"{component_name}: Initialized OKX collector for {symbol} with data types: {[dt.value for dt in data_types]}")
            logger.info(f"{component_name}: Using common data processing framework")

    def _spawn_background_task(self, coro):
        """
        Schedule ``coro`` as a task and keep it referenced until it completes.

        Args:
            coro: Coroutine object to schedule on the running event loop

        Returns:
            The created asyncio task

        Raises:
            RuntimeError: If there is no running event loop
        """
        try:
            task = asyncio.create_task(coro)
        except RuntimeError:
            # The coroutine will never run; close it so it does not emit a
            # "coroutine was never awaited" warning.
            coro.close()
            raise

        self._okx_background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)
        return task

    def _on_background_task_done(self, task) -> None:
        """
        Release a finished background task and report an unexpected failure.

        Args:
            task: The task that has just completed
        """
        self._okx_background_tasks.discard(task)
        if task.cancelled():
            return

        # Retrieving the exception marks it as observed by asyncio.
        error = task.exception()
        if error is not None and self.logger:
            self.logger.error(f"{self.component_name}: Background task failed: {error}")

    def _build_subscriptions(self, data_types: List[DataType], enabled: bool, log_details: bool = False) -> list:
        """
        Translate data types into OKX subscriptions for this collector's symbol.

        Args:
            data_types: Data types to translate
            enabled: Value passed as ``enabled`` to each ``OKXSubscription``
            log_details: If True, log each added subscription and each
                data type that has no channel mapping

        Returns:
            List of ``OKXSubscription`` objects; unmapped data types are skipped
        """
        subscriptions = []
        for data_type in data_types:
            if data_type not in self._channel_mapping:
                if log_details and self.logger:
                    self.logger.warning(f"{self.component_name}: Unsupported data type: {data_type}")
                continue

            channel = self._channel_mapping[data_type]
            subscriptions.append(OKXSubscription(
                channel=channel,
                inst_id=self.symbol,
                enabled=enabled
            ))
            if log_details and self.logger:
                self.logger.debug(f"{self.component_name}: Added subscription: {channel} for {self.symbol}")

        return subscriptions

    async def connect(self) -> bool:
        """
        Establish connection to OKX WebSocket API.

        Also obtains the database operations object and registers
        ``_on_message`` as the WebSocket message callback.

        Returns:
            True if connection successful, False otherwise
        """
        try:
            if self.logger:
                self.logger.info(f"{self.component_name}: Connecting OKX collector for {self.symbol}")

            # Initialize database operations using repository pattern
            self._db_operations = get_database_operations(self.logger)

            # Create WebSocket client
            ws_component_name = f"okx_ws_{self.symbol.replace('-', '_').lower()}"
            self._ws_client = OKXWebSocketClient(
                component_name=ws_component_name,
                ping_interval=25.0,
                pong_timeout=10.0,
                max_reconnect_attempts=5,
                reconnect_delay=5.0,
                logger=self.logger  # Pass the logger to enable ping/pong logging
            )

            # Add message callback
            self._ws_client.add_message_callback(self._on_message)

            # Connect to WebSocket
            if not await self._ws_client.connect(use_public=True):
                if self.logger:
                    self.logger.error(f"{self.component_name}: Failed to connect to OKX WebSocket")
                return False

            if self.logger:
                self.logger.info(f"{self.component_name}: Successfully connected OKX collector for {self.symbol}")
            return True

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error connecting OKX collector for {self.symbol}: {e}")
            return False

    async def disconnect(self) -> None:
        """
        Disconnect from OKX WebSocket API.

        Errors are logged and not re-raised. The client reference is cleared
        only after a successful disconnect.
        """
        try:
            if self.logger:
                self.logger.info(f"{self.component_name}: Disconnecting OKX collector for {self.symbol}")

            if self._ws_client:
                await self._ws_client.disconnect()
                self._ws_client = None

            if self.logger:
                self.logger.info(f"{self.component_name}: Disconnected OKX collector for {self.symbol}")

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error disconnecting OKX collector for {self.symbol}: {e}")

    async def subscribe_to_data(self, symbols: List[str], data_types: List[DataType]) -> bool:
        """
        Subscribe to data streams for specified symbols and data types.

        Args:
            symbols: Trading symbols to subscribe to (should contain self.symbol)
            data_types: Types of data to subscribe to

        Returns:
            True if subscription successful, False otherwise (not connected,
            symbol not in ``symbols``, no supported data type, or failure)
        """
        if not self._ws_client or not self._ws_client.is_connected:
            if self.logger:
                self.logger.error(f"{self.component_name}: WebSocket client not connected")
            return False

        # Validate symbol
        if self.symbol not in symbols:
            if self.logger:
                self.logger.warning(f"{self.component_name}: Symbol {self.symbol} not in subscription list: {symbols}")
            return False

        try:
            # Build subscriptions
            subscriptions = self._build_subscriptions(data_types, enabled=True, log_details=True)

            if not subscriptions:
                if self.logger:
                    self.logger.warning(f"{self.component_name}: No valid subscriptions to create")
                return False

            # Subscribe to channels
            success = await self._ws_client.subscribe(subscriptions)
            if success:
                if self.logger:
                    self.logger.info(f"{self.component_name}: Successfully subscribed to {len(subscriptions)} channels for {self.symbol}")
                return True

            if self.logger:
                self.logger.error(f"{self.component_name}: Failed to subscribe to channels for {self.symbol}")
            return False

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error subscribing to data for {self.symbol}: {e}")
            return False

    async def unsubscribe_from_data(self, symbols: List[str], data_types: List[DataType]) -> bool:
        """
        Unsubscribe from data streams for specified symbols and data types.

        Args:
            symbols: Trading symbols to unsubscribe from (not inspected; the
                collector always uses its own symbol)
            data_types: Types of data to unsubscribe from

        Returns:
            True if unsubscription successful, or if there was nothing to do
            (not connected, or no supported data type); False otherwise
        """
        if not self._ws_client or not self._ws_client.is_connected:
            if self.logger:
                self.logger.warning(f"{self.component_name}: WebSocket client not connected")
            return True  # Consider it successful if not connected

        try:
            # Build unsubscription list (enabled=False for unsubscribe)
            subscriptions = self._build_subscriptions(data_types, enabled=False)

            if not subscriptions:
                return True

            # Unsubscribe from channels
            success = await self._ws_client.unsubscribe(subscriptions)
            if success:
                if self.logger:
                    self.logger.info(f"{self.component_name}: Successfully unsubscribed from {len(subscriptions)} channels for {self.symbol}")
                return True

            if self.logger:
                self.logger.error(f"{self.component_name}: Failed to unsubscribe from channels for {self.symbol}")
            return False

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error unsubscribing from data for {self.symbol}: {e}")
            return False

    async def _process_message(self, message: Any) -> Optional[MarketDataPoint]:
        """
        Process received message using the new data processor.

        The received-message counter is maintained by ``_on_message`` only, so
        each message is counted once.

        Args:
            message: Raw message from WebSocket

        Returns:
            The first MarketDataPoint produced from the message, or None when
            the message is not a dict, processing fails, or nothing is produced
        """
        if not isinstance(message, dict):
            if self.logger:
                self.logger.warning(f"{self.component_name}: Received non-dict message: {type(message)}")
            return None

        try:
            # Use the new data processor for validation and processing
            success, market_data_points, errors = self._data_processor.validate_and_process_message(
                message, expected_symbol=self.symbol
            )

            if not success:
                self._error_count += 1
                if self.logger:
                    self.logger.error(f"{self.component_name}: Message processing failed: {errors}")
                return None

            if errors and self.logger:
                self.logger.warning(f"{self.component_name}: Message processing warnings: {errors}")

            # Store raw data if enabled (for debugging/compliance)
            if self.store_raw_data and 'data' in message and 'arg' in message:
                await self._store_raw_data(message['arg'].get('channel', 'unknown'), message)

            # Store processed market data points in raw_trades table
            for data_point in market_data_points:
                await self._store_processed_data(data_point)

            # Return the first data point for compatibility (most use cases have single data point per message)
            return market_data_points[0] if market_data_points else None

        except Exception as e:
            self._error_count += 1
            if self.logger:
                self.logger.error(f"{self.component_name}: Error processing message: {e}")
            return None

    async def _handle_messages(self) -> None:
        """
        Handle message processing in the background.

        Messages are processed through callbacks, so this method only updates
        the heartbeat and sleeps briefly; it exists for compatibility with
        BaseDataCollector.
        """
        # Update heartbeat to indicate the message loop is active
        self._last_heartbeat = datetime.now(timezone.utc)

        # NOTE(review): this refreshes the "last data received" timestamp
        # whenever the socket reports connected, even if no message arrived.
        # Confirm this is intended, as it can hide a silent feed.
        if self._ws_client and self._ws_client.is_connected:
            self._last_data_received = datetime.now(timezone.utc)

        # Short sleep to prevent busy loop while maintaining heartbeat
        await asyncio.sleep(0.1)

    async def _store_processed_data(self, data_point: MarketDataPoint) -> None:
        """
        Store raw market data in the raw_trades table.

        Does nothing when database operations are not initialized. Errors are
        logged and not re-raised.

        Args:
            data_point: Raw market data point (trade, orderbook, ticker)
        """
        try:
            if not self._db_operations:
                return

            # Store raw market data points in raw_trades table using repository
            success = self._db_operations.raw_trades.insert_market_data_point(data_point)
            if success and self.logger:
                self.logger.debug(f"{self.component_name}: Stored raw data: {data_point.data_type.value} for {data_point.symbol}")

        except DatabaseOperationError as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Database error storing raw market data: {e}")
        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error storing raw market data: {e}")

    async def _store_completed_candle(self, candle: OHLCVCandle) -> None:
        """
        Store completed OHLCV candle in the market_data table.

        Handles duplicate candles based on force_update_candles setting:
        - If force_update_candles=True: UPDATE existing records with latest values
        - If force_update_candles=False: IGNORE duplicates, keep existing records unchanged

        Does nothing when database operations are not initialized. Errors are
        logged, counted in the error counter, and not re-raised.

        Args:
            candle: Completed OHLCV candle
        """
        try:
            if not self._db_operations:
                return

            # Store completed candles using repository pattern
            success = self._db_operations.market_data.upsert_candle(candle, self.force_update_candles)

            if success and self.logger:
                action = "Updated" if self.force_update_candles else "Stored"
                self.logger.debug(f"{self.component_name}: {action} candle: {candle.symbol} {candle.timeframe} at {candle.end_time} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}")

        except DatabaseOperationError as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Database error storing completed candle: {e}")
                # Log candle details for debugging
                self.logger.error(f"{self.component_name}: Failed candle details: {candle.symbol} {candle.timeframe} {candle.end_time} - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}")
            self._error_count += 1
        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error storing completed candle: {e}")
                # Log candle details for debugging
                self.logger.error(f"{self.component_name}: Failed candle details: {candle.symbol} {candle.timeframe} {candle.end_time} - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}")
            self._error_count += 1

    async def _store_raw_data(self, channel: str, raw_message: Dict[str, Any]) -> None:
        """
        Store raw WebSocket data for debugging in raw_trades table.

        Each item of ``raw_message['data']`` is stored as its own record,
        timestamped with the current UTC time. Does nothing when database
        operations are not initialized or the message has no 'data' key.

        Args:
            channel: Channel name
            raw_message: Raw WebSocket message
        """
        try:
            if not self._db_operations or 'data' not in raw_message:
                return

            # Store each data item as a separate raw data record using repository
            for data_item in raw_message['data']:
                success = self._db_operations.raw_trades.insert_raw_websocket_data(
                    exchange="okx",
                    symbol=self.symbol,
                    data_type=f"raw_{channel}",  # Prefix with 'raw_' to distinguish from processed data
                    raw_data=data_item,
                    timestamp=datetime.now(timezone.utc)
                )
                if not success and self.logger:
                    self.logger.warning(f"{self.component_name}: Failed to store raw WebSocket data for {channel}")

        except DatabaseOperationError as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Database error storing raw WebSocket data: {e}")
        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error storing raw WebSocket data: {e}")

    def _on_message(self, message: Dict[str, Any]) -> None:
        """
        Handle incoming WebSocket message.

        Updates the heartbeat and data-received timestamps, counts the
        message, and schedules ``_process_message`` as a tracked background
        task. Errors are logged and not re-raised.

        Args:
            message: WebSocket message from OKX
        """
        try:
            # Update heartbeat and data received timestamps
            current_time = datetime.now(timezone.utc)
            self._last_heartbeat = current_time
            self._last_data_received = current_time

            # Single place where received messages are counted.
            self._message_count += 1

            # Process message asynchronously
            self._spawn_background_task(self._process_message(message))
        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error handling WebSocket message: {e}")

    def _on_trade_processed(self, trade: StandardizedTrade) -> None:
        """
        Callback for processed trades from data processor.

        Args:
            trade: Processed standardized trade
        """
        self._processed_trades += 1
        if self.logger:
            self.logger.debug(f"{self.component_name}: Processed trade: {trade.symbol} {trade.side} {trade.size}@{trade.price}")

    def _on_candle_processed(self, candle: OHLCVCandle) -> None:
        """
        Callback for completed candles from data processor.

        Counts the candle and, when ``candle.is_complete`` is true, schedules
        its storage as a tracked background task.

        Args:
            candle: Completed OHLCV candle

        Raises:
            RuntimeError: If a complete candle arrives with no running event loop
        """
        self._processed_candles += 1
        if self.logger:
            self.logger.debug(f"{self.component_name}: Completed candle: {candle.symbol} {candle.timeframe} O:{candle.open} H:{candle.high} L:{candle.low} C:{candle.close} V:{candle.volume}")

        # Store completed candle in market_data table
        if candle.is_complete:
            self._spawn_background_task(self._store_completed_candle(candle))

    def get_status(self) -> Dict[str, Any]:
        """
        Get current collector status including processing statistics.

        Returns:
            The base collector status dictionary updated with OKX-specific
            keys (symbol, WebSocket state, settings, processing counters and,
            when available, data processor and WebSocket statistics)
        """
        base_status = super().get_status()

        # Add OKX-specific status
        okx_status = {
            "symbol": self.symbol,
            "websocket_connected": self._ws_client.is_connected if self._ws_client else False,
            "websocket_state": self._ws_client.connection_state.value if self._ws_client else "disconnected",
            "store_raw_data": self.store_raw_data,
            "force_update_candles": self.force_update_candles,
            "processing_stats": {
                "messages_received": self._message_count,
                "trades_processed": self._processed_trades,
                "candles_processed": self._processed_candles,
                "errors": self._error_count
            }
        }

        # Add data processor statistics
        if self._data_processor:
            okx_status["data_processor_stats"] = self._data_processor.get_processing_stats()

        # Add WebSocket statistics
        if self._ws_client:
            okx_status["websocket_stats"] = self._ws_client.get_stats()

        # Merge with base status
        base_status.update(okx_status)
        return base_status

    def __repr__(self) -> str:
        """String representation of the collector."""
        return f"OKXCollector(symbol='{self.symbol}', status='{self.status.value}', data_types={[dt.value for dt in self.data_types]})"