TCPDashboard/docs/exchanges/okx_collector.md
Vasily.onl 02a51521a0 Update OKX configuration and aggregation logic for enhanced multi-timeframe support
- Increased health check interval from 30s to 120s in `okx_config.json`.
- Added support for additional timeframes (1s, 5s, 10s, 15s, 30s) in the aggregation logic across multiple components.
- Updated `CandleProcessingConfig` and `RealTimeCandleProcessor` to handle new timeframes.
- Enhanced validation and parsing functions to include new second-based timeframes.
- Updated database schema to support new timeframes in `schema_clean.sql`.
- Improved documentation to reflect changes in multi-timeframe aggregation capabilities.
2025-06-02 12:35:19 +08:00

25 KiB

OKX Data Collector Documentation

Overview

The OKX Data Collector provides real-time market data collection from OKX exchange using WebSocket API. It's built on the modular exchange architecture and provides robust connection management, automatic reconnection, health monitoring, and comprehensive data processing.

Features

🎯 OKX-Specific Features

  • Real-time Data: Live trades, orderbook, and ticker data
  • Single Pair Focus: Each collector handles one trading pair for better isolation
  • Ping/Pong Management: OKX-specific keepalive mechanism with proper format
  • Raw Data Storage: Optional storage of raw OKX messages for debugging
  • Connection Resilience: Robust reconnection logic for OKX WebSocket

📊 Supported Data Types

  • Trades: Real-time trade executions (trades channel)
  • Orderbook: 5-level order book depth (books5 channel)
  • Ticker: 24h ticker statistics (tickers channel)
  • Candles: Real-time OHLCV aggregation (1s, 5s, 10s, 15s, 30s, 1m, 5m, 15m, 1h, 4h, 1d)

🔧 Configuration Options

  • Auto-restart on failures
  • Health check intervals
  • Raw data storage toggle
  • Custom ping/pong timing
  • Reconnection attempts configuration
  • Multi-timeframe candle aggregation

Quick Start

import asyncio
from data.exchanges import create_okx_collector
from data.base_collector import DataType

async def main():
    # Create OKX collector using convenience function
    collector = create_okx_collector(
        symbol='BTC-USDT',
        data_types=[DataType.TRADE, DataType.ORDERBOOK],
        auto_restart=True,
        health_check_interval=30.0,
        store_raw_data=True
    )
    
    # Add data callbacks
    def on_trade(data_point):
        trade = data_point.data
        print(f"Trade: {trade['side']} {trade['sz']} @ {trade['px']} (ID: {trade['tradeId']})")
    
    def on_orderbook(data_point):
        book = data_point.data
        if book.get('bids') and book.get('asks'):
            best_bid = book['bids'][0]
            best_ask = book['asks'][0]
            print(f"Orderbook: Bid {best_bid[0]}@{best_bid[1]} Ask {best_ask[0]}@{best_ask[1]}")
    
    collector.add_data_callback(DataType.TRADE, on_trade)
    collector.add_data_callback(DataType.ORDERBOOK, on_orderbook)
    
    # Start collector
    await collector.start()
    
    # Run for 60 seconds
    await asyncio.sleep(60)
    
    # Stop gracefully
    await collector.stop()

asyncio.run(main())

2. Direct OKX Collector Usage

import asyncio
from data.exchanges.okx import OKXCollector
from data.base_collector import DataType

async def main():
    # Create collector directly
    collector = OKXCollector(
        symbol='ETH-USDT',
        data_types=[DataType.TRADE, DataType.ORDERBOOK],
        component_name='eth_collector',
        auto_restart=True,
        health_check_interval=30.0,
        store_raw_data=True
    )
    
    # Add callbacks
    def on_data(data_point):
        print(f"{data_point.data_type.value}: {data_point.symbol} - {data_point.timestamp}")
    
    collector.add_data_callback(DataType.TRADE, on_data)
    collector.add_data_callback(DataType.ORDERBOOK, on_data)
    
    # Start and monitor
    await collector.start()
    
    # Monitor status
    for i in range(12):  # 60 seconds total
        await asyncio.sleep(5)
        status = collector.get_status()
        print(f"Status: {status['status']} - Messages: {status.get('messages_processed', 0)}")
    
    await collector.stop()

asyncio.run(main())

3. Multiple OKX Collectors with Manager

import asyncio
from data.collector_manager import CollectorManager
from data.exchanges import create_okx_collector
from data.base_collector import DataType

async def main():
    # Create manager
    manager = CollectorManager(
        manager_name="okx_trading_system",
        global_health_check_interval=30.0
    )
    
    # Create multiple OKX collectors
    symbols = ['BTC-USDT', 'ETH-USDT', 'SOL-USDT']
    
    for symbol in symbols:
        collector = create_okx_collector(
            symbol=symbol,
            data_types=[DataType.TRADE, DataType.ORDERBOOK],
            auto_restart=True
        )
        manager.add_collector(collector)
    
    # Start manager
    await manager.start()
    
    # Monitor all collectors
    while True:
        status = manager.get_status()
        stats = status.get('statistics', {})
        
        print(f"=== OKX Collectors Status ===")
        print(f"Running: {stats.get('running_collectors', 0)}")
        print(f"Failed: {stats.get('failed_collectors', 0)}")
        print(f"Total messages: {stats.get('total_messages', 0)}")
        
        # Individual collector status
        for collector_name in manager.list_collectors():
            collector_status = manager.get_collector_status(collector_name)
            if collector_status:
                info = collector_status.get('status', {})
                print(f"  {collector_name}: {info.get('status')} - "
                      f"Messages: {info.get('messages_processed', 0)}")
        
        await asyncio.sleep(15)

asyncio.run(main())

3. Multi-Timeframe Candle Processing

import asyncio
from data.exchanges.okx import OKXCollector
from data.base_collector import DataType
from data.common import CandleProcessingConfig

async def main():
    # Configure multi-timeframe candle processing
    candle_config = CandleProcessingConfig(
        timeframes=['1s', '5s', '10s', '15s', '30s', '1m', '5m', '15m', '1h'],
        auto_save_candles=True,
        emit_incomplete_candles=False
    )
    
    # Create collector with candle processing
    collector = OKXCollector(
        symbol='BTC-USDT',
        data_types=[DataType.TRADE],  # Trades needed for candle aggregation
        candle_config=candle_config,
        auto_restart=True,
        store_raw_data=False  # Disable raw storage for production
    )
    
    # Add candle callback
    def on_candle_completed(candle):
        print(f"Completed {candle.timeframe} candle: "
              f"OHLCV=({candle.open},{candle.high},{candle.low},{candle.close},{candle.volume}) "
              f"at {candle.end_time}")
    
    collector.add_candle_callback(on_candle_completed)
    
    # Start collector
    await collector.start()
    
    # Monitor real-time candle generation
    await asyncio.sleep(300)  # 5 minutes
    
    await collector.stop()

asyncio.run(main())

Configuration

1. JSON Configuration File

The system uses config/okx_config.json for configuration:

{
  "exchange": "okx",
  "connection": {
    "public_ws_url": "wss://ws.okx.com:8443/ws/v5/public",
    "private_ws_url": "wss://ws.okx.com:8443/ws/v5/private",
    "ping_interval": 25.0,
    "pong_timeout": 10.0,
    "max_reconnect_attempts": 5,
    "reconnect_delay": 5.0
  },
  "data_collection": {
    "store_raw_data": true,
    "health_check_interval": 30.0,
    "auto_restart": true,
    "buffer_size": 1000
  },
  "factory": {
    "use_factory_pattern": true,
    "default_data_types": ["trade", "orderbook"],
    "batch_create": true
  },
  "trading_pairs": [
    {
      "symbol": "BTC-USDT",
      "enabled": true,
      "data_types": ["trade", "orderbook"],
      "channels": {
        "trades": "trades",
        "orderbook": "books5",
        "ticker": "tickers"
      }
    },
    {
      "symbol": "ETH-USDT",
      "enabled": true,
      "data_types": ["trade", "orderbook"],
      "channels": {
        "trades": "trades",
        "orderbook": "books5",
        "ticker": "tickers"
      }
    }
  ],
  "logging": {
    "component_name_template": "okx_collector_{symbol}",
    "log_level": "INFO",
    "verbose": false
  },
  "database": {
    "store_processed_data": true,
    "store_raw_data": true,
    "batch_size": 100,
    "flush_interval": 5.0
  },
  "monitoring": {
    "enable_health_checks": true,
    "health_check_interval": 30.0,
    "alert_on_connection_loss": true,
    "max_consecutive_errors": 5
  }
}

2. Programmatic Configuration

from data.exchanges.okx import OKXCollector
from data.base_collector import DataType

# Custom configuration
collector = OKXCollector(
    symbol='BTC-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
    component_name='custom_btc_collector',
    auto_restart=True,
    health_check_interval=15.0,  # Check every 15 seconds
    store_raw_data=True         # Store raw OKX messages
)

3. Factory Configuration

from data.exchanges import ExchangeFactory, ExchangeCollectorConfig
from data.base_collector import DataType

config = ExchangeCollectorConfig(
    exchange='okx',
    symbol='ETH-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
    auto_restart=True,
    health_check_interval=30.0,
    store_raw_data=True,
    custom_params={
        'ping_interval': 20.0,              # Custom ping interval
        'max_reconnect_attempts': 10,       # More reconnection attempts
        'pong_timeout': 15.0               # Longer pong timeout
    }
)

collector = ExchangeFactory.create_collector(config)

Data Processing

OKX Message Formats

Trade Data

# Raw OKX trade message
{
    "arg": {
        "channel": "trades",
        "instId": "BTC-USDT"
    },
    "data": [
        {
            "instId": "BTC-USDT",
            "tradeId": "12345678",
            "px": "50000.5",      # Price
            "sz": "0.001",        # Size
            "side": "buy",        # Side (buy/sell)
            "ts": "1697123456789" # Timestamp (ms)
        }
    ]
}

# Processed MarketDataPoint
MarketDataPoint(
    exchange="okx",
    symbol="BTC-USDT",
    timestamp=datetime(2023, 10, 12, 15, 30, 56, tzinfo=timezone.utc),
    data_type=DataType.TRADE,
    data={
        "instId": "BTC-USDT",
        "tradeId": "12345678",
        "px": "50000.5",
        "sz": "0.001",
        "side": "buy",
        "ts": "1697123456789"
    }
)

Orderbook Data

# Raw OKX orderbook message (books5)
{
    "arg": {
        "channel": "books5",
        "instId": "BTC-USDT"
    },
    "data": [
        {
            "asks": [
                ["50001.0", "0.5", "0", "3"],  # [price, size, liquidated, orders]
                ["50002.0", "1.0", "0", "5"]
            ],
            "bids": [
                ["50000.0", "0.8", "0", "2"],
                ["49999.0", "1.2", "0", "4"]
            ],
            "ts": "1697123456789",
            "checksum": "123456789"
        }
    ]
}

# Usage in callback
def on_orderbook(data_point):
    book = data_point.data
    
    if book.get('bids') and book.get('asks'):
        best_bid = book['bids'][0]
        best_ask = book['asks'][0]
        
        spread = float(best_ask[0]) - float(best_bid[0])
        print(f"Spread: ${spread:.2f}")

Ticker Data

# Raw OKX ticker message
{
    "arg": {
        "channel": "tickers",
        "instId": "BTC-USDT"
    },
    "data": [
        {
            "instType": "SPOT",
            "instId": "BTC-USDT",
            "last": "50000.5",      # Last price
            "lastSz": "0.001",      # Last size
            "askPx": "50001.0",     # Best ask price
            "askSz": "0.5",         # Best ask size
            "bidPx": "50000.0",     # Best bid price
            "bidSz": "0.8",         # Best bid size
            "open24h": "49500.0",   # 24h open
            "high24h": "50500.0",   # 24h high
            "low24h": "49000.0",    # 24h low
            "vol24h": "1234.567",   # 24h volume
            "ts": "1697123456789"
        }
    ]
}

Data Validation

The OKX collector includes comprehensive data validation:

# Automatic validation in collector
class OKXCollector(BaseDataCollector):
    async def _process_data_item(self, channel: str, data_item: Dict[str, Any]):
        # Validate message structure
        if not isinstance(data_item, dict):
            self.logger.warning("Invalid data item type")
            return None
        
        # Validate required fields based on channel
        if channel == "trades":
            required_fields = ['tradeId', 'px', 'sz', 'side', 'ts']
        elif channel == "books5":
            required_fields = ['bids', 'asks', 'ts']
        elif channel == "tickers":
            required_fields = ['last', 'ts']
        else:
            self.logger.warning(f"Unknown channel: {channel}")
            return None
        
        # Check required fields
        for field in required_fields:
            if field not in data_item:
                self.logger.warning(f"Missing required field '{field}' in {channel} data")
                return None
        
        # Process and return validated data
        return await self._create_market_data_point(channel, data_item)

Monitoring and Status

Status Information

# Get comprehensive status
status = collector.get_status()

print(f"Exchange: {status['exchange']}")                          # 'okx'
print(f"Symbol: {status['symbol']}")                              # 'BTC-USDT'
print(f"Status: {status['status']}")                              # 'running'
print(f"WebSocket Connected: {status['websocket_connected']}")     # True/False
print(f"WebSocket State: {status['websocket_state']}")            # 'connected'
print(f"Messages Processed: {status['messages_processed']}")       # Integer
print(f"Errors: {status['errors']}")                              # Integer
print(f"Last Trade ID: {status['last_trade_id']}")               # String or None

# WebSocket statistics
if 'websocket_stats' in status:
    ws_stats = status['websocket_stats']
    print(f"Messages Received: {ws_stats['messages_received']}")
    print(f"Messages Sent: {ws_stats['messages_sent']}")
    print(f"Pings Sent: {ws_stats['pings_sent']}")
    print(f"Pongs Received: {ws_stats['pongs_received']}")
    print(f"Reconnections: {ws_stats['reconnections']}")

Health Monitoring

# Get health status
health = collector.get_health_status()

print(f"Is Healthy: {health['is_healthy']}")                     # True/False
print(f"Issues: {health['issues']}")                             # List of issues
print(f"Last Heartbeat: {health['last_heartbeat']}")            # ISO timestamp
print(f"Last Data: {health['last_data_received']}")             # ISO timestamp
print(f"Should Be Running: {health['should_be_running']}")       # True/False
print(f"Is Running: {health['is_running']}")                     # True/False

# Auto-restart status
if not health['is_healthy']:
    print("Collector is unhealthy - auto-restart will trigger")
    for issue in health['issues']:
        print(f"  Issue: {issue}")

Performance Monitoring

import time

async def monitor_performance():
    collector = create_okx_collector('BTC-USDT', [DataType.TRADE])
    await collector.start()
    
    start_time = time.time()
    last_message_count = 0
    
    while True:
        await asyncio.sleep(10)  # Check every 10 seconds
        
        status = collector.get_status()
        current_messages = status.get('messages_processed', 0)
        
        # Calculate message rate
        elapsed = time.time() - start_time
        messages_per_second = current_messages / elapsed if elapsed > 0 else 0
        
        # Calculate recent rate
        recent_messages = current_messages - last_message_count
        recent_rate = recent_messages / 10  # per second over last 10 seconds
        
        print(f"=== Performance Stats ===")
        print(f"Total Messages: {current_messages}")
        print(f"Average Rate: {messages_per_second:.2f} msg/sec")
        print(f"Recent Rate: {recent_rate:.2f} msg/sec")
        print(f"Errors: {status.get('errors', 0)}")
        print(f"WebSocket State: {status.get('websocket_state', 'unknown')}")
        
        last_message_count = current_messages

# Run performance monitoring
asyncio.run(monitor_performance())

WebSocket Connection Details

OKX WebSocket Client

The OKX implementation includes a specialized WebSocket client:

from data.exchanges.okx import OKXWebSocketClient, OKXSubscription, OKXChannelType

# Create WebSocket client directly (usually handled by collector)
ws_client = OKXWebSocketClient(
    component_name='okx_ws_btc',
    ping_interval=25.0,        # Must be < 30 seconds for OKX
    pong_timeout=10.0,
    max_reconnect_attempts=5,
    reconnect_delay=5.0
)

# Connect to OKX
await ws_client.connect(use_public=True)

# Create subscriptions
subscriptions = [
    OKXSubscription(
        channel=OKXChannelType.TRADES.value,
        inst_id='BTC-USDT',
        enabled=True
    ),
    OKXSubscription(
        channel=OKXChannelType.BOOKS5.value,
        inst_id='BTC-USDT',
        enabled=True
    )
]

# Subscribe to channels
await ws_client.subscribe(subscriptions)

# Add message callback
def on_message(message):
    print(f"Received: {message}")

ws_client.add_message_callback(on_message)

# WebSocket will handle messages automatically
await asyncio.sleep(60)

# Disconnect
await ws_client.disconnect()

Connection States

The WebSocket client tracks connection states:

from data.exchanges.okx.websocket import ConnectionState

# Check connection state
state = ws_client.connection_state

if state == ConnectionState.CONNECTED:
    print("WebSocket is connected and ready")
elif state == ConnectionState.CONNECTING:
    print("WebSocket is connecting...")
elif state == ConnectionState.RECONNECTING:
    print("WebSocket is reconnecting...")
elif state == ConnectionState.DISCONNECTED:
    print("WebSocket is disconnected")
elif state == ConnectionState.ERROR:
    print("WebSocket has error")

Ping/Pong Mechanism

OKX requires specific ping/pong format:

# OKX expects simple "ping" string (not JSON)
# The WebSocket client handles this automatically:

# Send: "ping"
# Receive: "pong"

# This is handled automatically by OKXWebSocketClient
# Ping interval must be < 30 seconds to avoid disconnection

Error Handling and Troubleshooting

Common Issues and Solutions

1. Connection Failures

# Check connection status
status = collector.get_status()
if not status['websocket_connected']:
    print("WebSocket not connected")
    
    # Check WebSocket state
    ws_state = status.get('websocket_state', 'unknown')
    
    if ws_state == 'error':
        print("WebSocket in error state - will auto-restart")
    elif ws_state == 'reconnecting':
        print("WebSocket is reconnecting...")
    
    # Manual restart if needed
    await collector.restart()

2. Ping/Pong Issues

# Monitor ping/pong status
if 'websocket_stats' in status:
    ws_stats = status['websocket_stats']
    pings_sent = ws_stats.get('pings_sent', 0)
    pongs_received = ws_stats.get('pongs_received', 0)
    
    if pings_sent > pongs_received + 3:  # Allow some tolerance
        print("Ping/pong issue detected - connection may be stale")
        # Auto-restart will handle this

3. Data Validation Errors

# Monitor for validation errors
errors = status.get('errors', 0)
if errors > 0:
    print(f"Data validation errors detected: {errors}")
    
    # Check logs for details:
    # - Malformed messages
    # - Missing required fields
    # - Invalid data types

4. Performance Issues

# Monitor message processing rate
messages = status.get('messages_processed', 0)
uptime = status.get('uptime_seconds', 1)
rate = messages / uptime

if rate < 1.0:  # Less than 1 message per second
    print("Low message rate - check:")
    print("- Network connectivity")
    print("- OKX API status")
    print("- Symbol activity")

Debug Mode

Enable debug logging for detailed information:

import os
os.environ['LOG_LEVEL'] = 'DEBUG'

# Create collector with verbose logging
collector = create_okx_collector(
    symbol='BTC-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK]
)

await collector.start()

# Check logs in ./logs/ directory:
# - okx_collector_btc_usdt_debug.log
# - okx_collector_btc_usdt_info.log
# - okx_collector_btc_usdt_error.log

Testing

Unit Tests

Run the existing test scripts:

# Test single collector
python scripts/test_okx_collector.py single

# Test collector manager
python scripts/test_okx_collector.py manager

# Test factory pattern
python scripts/test_exchange_factory.py

Custom Testing

import asyncio
from data.exchanges import create_okx_collector
from data.base_collector import DataType

async def test_okx_collector():
    """Test OKX collector functionality."""
    
    # Test data collection
    message_count = 0
    error_count = 0
    
    def on_trade(data_point):
        nonlocal message_count
        message_count += 1
        print(f"Trade #{message_count}: {data_point.data.get('tradeId')}")
    
    def on_error(error):
        nonlocal error_count
        error_count += 1
        print(f"Error #{error_count}: {error}")
    
    # Create and configure collector
    collector = create_okx_collector(
        symbol='BTC-USDT',
        data_types=[DataType.TRADE],
        auto_restart=True
    )
    
    collector.add_data_callback(DataType.TRADE, on_trade)
    
    # Test lifecycle
    print("Starting collector...")
    await collector.start()
    
    print("Collecting data for 30 seconds...")
    await asyncio.sleep(30)
    
    print("Stopping collector...")
    await collector.stop()
    
    # Check results
    status = collector.get_status()
    print(f"Final status: {status['status']}")
    print(f"Messages processed: {status.get('messages_processed', 0)}")
    print(f"Errors: {status.get('errors', 0)}")
    
    assert message_count > 0, "No messages received"
    assert error_count == 0, f"Unexpected errors: {error_count}"
    
    print("Test passed!")

# Run test
asyncio.run(test_okx_collector())

Production Deployment

# Production-ready OKX collector setup
import asyncio
from data.collector_manager import CollectorManager
from data.exchanges import create_okx_collector
from data.base_collector import DataType

async def deploy_okx_production():
    """Production deployment configuration."""
    
    # Create manager with appropriate settings
    manager = CollectorManager(
        manager_name="okx_production",
        global_health_check_interval=30.0,  # Check every 30 seconds
        restart_delay=10.0                   # Wait 10 seconds between restarts
    )
    
    # Production trading pairs
    trading_pairs = [
        'BTC-USDT', 'ETH-USDT', 'SOL-USDT',
        'DOGE-USDT', 'TON-USDT', 'UNI-USDT'
    ]
    
    # Create collectors with production settings
    for symbol in trading_pairs:
        collector = create_okx_collector(
            symbol=symbol,
            data_types=[DataType.TRADE, DataType.ORDERBOOK],
            auto_restart=True,
            health_check_interval=15.0,      # More frequent health checks
            store_raw_data=False             # Disable raw data storage in production
        )
        
        manager.add_collector(collector)
    
    # Start system
    await manager.start()
    
    # Production monitoring loop
    try:
        while True:
            await asyncio.sleep(60)  # Check every minute
            
            status = manager.get_status()
            stats = status.get('statistics', {})
            
            # Log production metrics
            print(f"=== Production Status ===")
            print(f"Running: {stats.get('running_collectors', 0)}/{len(trading_pairs)}")
            print(f"Failed: {stats.get('failed_collectors', 0)}")
            print(f"Total restarts: {stats.get('restarts_performed', 0)}")
            
            # Alert on failures
            failed_count = stats.get('failed_collectors', 0)
            if failed_count > 0:
                print(f"ALERT: {failed_count} collectors failed!")
                # Implement alerting system here
            
    except KeyboardInterrupt:
        print("Shutting down production system...")
        await manager.stop()
        print("Production system stopped")

# Deploy to production
asyncio.run(deploy_okx_production())

Docker Deployment

# Dockerfile for OKX collector
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

# Production command
CMD ["python", "-m", "scripts.deploy_okx_production"]

Environment Variables

# Production environment variables
export LOG_LEVEL=INFO
export OKX_ENV=production
export HEALTH_CHECK_INTERVAL=30
export AUTO_RESTART=true
export STORE_RAW_DATA=false
export DATABASE_URL=postgresql://user:pass@host:5432/db

API Reference

OKXCollector Class

class OKXCollector(BaseDataCollector):
    def __init__(self,
                 symbol: str,
                 data_types: Optional[List[DataType]] = None,
                 component_name: Optional[str] = None,
                 auto_restart: bool = True,
                 health_check_interval: float = 30.0,
                 store_raw_data: bool = True):
        """
        Initialize OKX collector.
        
        Args:
            symbol: Trading symbol (e.g., 'BTC-USDT')
            data_types: Data types to collect (default: [TRADE, ORDERBOOK])
            component_name: Name for logging (default: auto-generated)
            auto_restart: Enable automatic restart on failures
            health_check_interval: Seconds between health checks
            store_raw_data: Whether to store raw OKX data
        """