Vasily.onl cffc54b648 Add complete time series aggregation example and refactor OKXCollector for repository pattern
- Introduced `example_complete_series_aggregation.py` to demonstrate time series aggregation, emitting candles even when no trades occur.
- Implemented `CompleteSeriesProcessor` extending `RealTimeCandleProcessor` to handle time-based candle emission and empty candle creation.
- Refactored `OKXCollector` to utilize the new repository pattern for database operations, enhancing modularity and maintainability.
- Updated database operations to centralize data handling through `DatabaseOperations`, improving error handling and logging.
- Enhanced documentation to include details on the new aggregation example and repository pattern implementation, ensuring clarity for users.
2025-06-02 13:27:01 +08:00


Data Collector System Documentation

Overview

The Data Collector System provides a robust, scalable framework for collecting real-time market data from cryptocurrency exchanges. It features comprehensive health monitoring, automatic recovery, centralized management, and a modular exchange-based architecture designed for production trading environments.

Key Features

🏗️ Modular Exchange Architecture

  • Exchange-Based Organization: Each exchange has its own implementation folder
  • Factory Pattern: Easy creation of collectors from any supported exchange
  • Standardized Interface: Consistent API across all exchange implementations
  • Scalable Design: Easy addition of new exchanges (Binance, Coinbase, etc.)

🔄 Auto-Recovery & Health Monitoring

  • Heartbeat System: Continuous health monitoring with configurable intervals
  • Auto-Restart: Automatic restart on failures with exponential backoff
  • Connection Recovery: Robust reconnection logic for network interruptions
  • Data Freshness Monitoring: Detects stale data and triggers recovery

🎛️ Centralized Management

  • CollectorManager: Supervises multiple collectors with coordinated lifecycle
  • Dynamic Control: Enable/disable collectors at runtime without system restart
  • Global Health Checks: System-wide monitoring and alerting
  • Graceful Shutdown: Proper cleanup and resource management

📊 Comprehensive Monitoring

  • Real-time Status: Detailed status reporting for all collectors
  • Performance Metrics: Message counts, uptime, error rates, restart counts
  • Health Analytics: Connection state, data freshness, error tracking
  • Logging Integration: Enhanced logging with configurable verbosity
  • Multi-Timeframe Support: Sub-second to daily candle aggregation (1s, 5s, 10s, 15s, 30s, 1m, 5m, 15m, 1h, 4h, 1d)
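Aggregating trades into candles for these timeframes comes down to flooring each trade timestamp to the start of its bucket. The sketch below assumes epoch-aligned UTC buckets, so 4h candles start at 00:00, 04:00 and so on. The helpers `timeframe_to_seconds` and `bucket_start` are hypothetical, and the project's aggregator may label or align candles differently.

```python
from datetime import datetime, timezone

# Seconds per unit for the suffixes used above (s, m, h, d)
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def timeframe_to_seconds(timeframe: str) -> int:
    """Convert a timeframe such as '5s', '15m' or '4h' into seconds."""
    value, unit = timeframe[:-1], timeframe[-1:]
    if unit not in _UNIT_SECONDS or not value.isdigit():
        raise ValueError(f"Unsupported timeframe: {timeframe!r}")
    return int(value) * _UNIT_SECONDS[unit]


def bucket_start(ts: datetime, timeframe: str) -> datetime:
    """Floor a timezone-aware timestamp to the start of its epoch-aligned UTC bucket."""
    step = timeframe_to_seconds(timeframe)
    epoch_seconds = int(ts.timestamp())
    return datetime.fromtimestamp(epoch_seconds - epoch_seconds % step, tz=timezone.utc)


trade_time = datetime(2025, 6, 2, 5, 27, 41, tzinfo=timezone.utc)
for tf in ("1s", "15s", "1m", "5m", "1h", "4h", "1d"):
    print(tf, bucket_start(trade_time, tf).isoformat())
```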

🛢️ Database Integration

  • Repository Pattern: All database operations use the centralized database/operations.py module
  • No Raw SQL: Clean API through MarketDataRepository and RawTradeRepository classes
  • Automatic Transaction Management: Sessions, commits, and rollbacks handled automatically
  • Configurable Duplicate Handling: force_update_candles parameter controls duplicate behavior
  • Real-time Storage: Completed candles automatically saved to market_data table
  • Raw Data Storage: Optional raw WebSocket data storage via RawTradeRepository
  • Custom Error Handling: Proper exception handling with DatabaseOperationError
  • Health Monitoring: Built-in database health checks and statistics
  • Connection Pooling: Efficient database connection management through repositories
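The effect of `force_update_candles` can be illustrated with an in-memory stand-in for candle storage. This sketch rests on one assumption: `False` keeps the first stored candle and skips duplicates, while `True` overwrites the existing row for the same (symbol, timeframe, timestamp) key. `Candle` and `InMemoryCandleStore` are hypothetical names, not the `MarketDataRepository` API.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class Candle:
    """Hypothetical, trimmed-down candle record."""
    symbol: str
    timeframe: str
    timestamp: datetime
    close: Decimal


class InMemoryCandleStore:
    """Hypothetical stand-in for candle storage; not the MarketDataRepository API."""

    def __init__(self, force_update_candles: bool = False):
        self.force_update_candles = force_update_candles
        self._rows: Dict[Tuple[str, str, datetime], Candle] = {}

    def store_candle(self, candle: Candle) -> bool:
        """Return True if the candle was written, False if a duplicate was skipped."""
        key = (candle.symbol, candle.timeframe, candle.timestamp)
        if key in self._rows and not self.force_update_candles:
            return False  # assumed behavior: keep the first stored candle
        self._rows[key] = candle  # insert, or overwrite when forced
        return True

    def get(self, symbol: str, timeframe: str, timestamp: datetime) -> Optional[Candle]:
        return self._rows.get((symbol, timeframe, timestamp))


ts = datetime(2025, 6, 2, 5, 27, tzinfo=timezone.utc)
for force in (False, True):
    store = InMemoryCandleStore(force_update_candles=force)
    store.store_candle(Candle("BTC-USDT", "1m", ts, Decimal("100")))
    written = store.store_candle(Candle("BTC-USDT", "1m", ts, Decimal("101")))
    print(force, written, store.get("BTC-USDT", "1m", ts).close)
```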

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                        CollectorManager                         │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                   Global Health Monitor                   │  │
│  │  • System-wide health checks                              │  │
│  │  • Auto-restart coordination                              │  │
│  │  • Performance analytics                                  │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │  OKX Collector  │  │Binance Collector│  │ Custom Collector│  │
│  │                 │  │                 │  │                 │  │
│  │ • Health Monitor│  │ • Health Monitor│  │ • Health Monitor│  │
│  │ • Auto-restart  │  │ • Auto-restart  │  │ • Auto-restart  │  │
│  │ • Data Validate │  │ • Data Validate │  │ • Data Validate │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
                                 │
                        ┌─────────────────┐
                        │   Data Output   │
                        │                 │
                        │ • Callbacks     │
                        │ • Redis Pub/Sub │
                        │ • Database      │
                        └─────────────────┘

Exchange Module Structure

The new modular architecture organizes exchange implementations:

data/
├── base_collector.py          # Abstract base classes
├── collector_manager.py       # Cross-platform collector manager
├── aggregator.py              # Cross-exchange data aggregation
└── exchanges/                 # Exchange-specific implementations
    ├── __init__.py            # Main exports and factory
    ├── registry.py            # Exchange registry and capabilities
    ├── factory.py             # Factory pattern for collectors
    ├── okx/                   # OKX implementation
    │   ├── __init__.py        # OKX exports
    │   ├── collector.py       # OKXCollector class
    │   └── websocket.py       # OKXWebSocketClient class
    └── binance/               # Future: Binance implementation
        ├── __init__.py
        ├── collector.py
        └── websocket.py
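The registry/factory split in `registry.py` and `factory.py` can be pictured as a name-to-class lookup, where adding an exchange means registering one more class. The sketch below is illustrative only. `CollectorSpec`, `SketchCollector`, `register_exchange`, `build_collector` and `OKXLikeCollector` are hypothetical names, not the actual contents of those modules.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Type


@dataclass
class CollectorSpec:
    """Hypothetical config, loosely modeled on ExchangeCollectorConfig."""
    exchange: str
    symbol: str
    data_types: List[str] = field(default_factory=lambda: ["trade"])


class SketchCollector:
    """Hypothetical common base for exchange-specific collectors."""

    def __init__(self, spec: CollectorSpec):
        self.spec = spec


class OKXLikeCollector(SketchCollector):
    """Placeholder for an exchange-specific implementation such as okx/collector.py."""


# Registry: lower-cased exchange name -> collector class
_REGISTRY: Dict[str, Type[SketchCollector]] = {}


def register_exchange(name: str, collector_cls: Type[SketchCollector]) -> None:
    _REGISTRY[name.lower()] = collector_cls


def build_collector(spec: CollectorSpec) -> SketchCollector:
    """Factory: look up the class registered for spec.exchange and instantiate it."""
    collector_cls = _REGISTRY.get(spec.exchange.lower())
    if collector_cls is None:
        supported = ", ".join(sorted(_REGISTRY)) or "none"
        raise ValueError(f"Unsupported exchange {spec.exchange!r} (supported: {supported})")
    return collector_cls(spec)


register_exchange("okx", OKXLikeCollector)
collector = build_collector(CollectorSpec("okx", "BTC-USDT"))
print(type(collector).__name__, collector.spec.symbol)
```

Callers only pass an exchange name, so supporting Binance later would take one `register_exchange("binance", ...)` call rather than changes at every call site.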

Quick Start

1. Creating a Collector

import asyncio
from data.exchanges import ExchangeFactory, ExchangeCollectorConfig, create_okx_collector
from data.base_collector import DataType

async def main():
    # Method 1: Using factory with configuration
    config = ExchangeCollectorConfig(
        exchange='okx',
        symbol='BTC-USDT',
        data_types=[DataType.TRADE, DataType.ORDERBOOK],
        auto_restart=True,
        health_check_interval=30.0,
        store_raw_data=True
    )
    
    collector = ExchangeFactory.create_collector(config)
    
    # Method 2: Using convenience function
    # (shown for comparison; only `collector` is started below)
    okx_collector = create_okx_collector(
        symbol='ETH-USDT',
        data_types=[DataType.TRADE, DataType.ORDERBOOK]
    )
    
    # Add data callback
    def on_trade_data(data_point):
        print(f"Trade: {data_point.symbol} - {data_point.data}")
    
    collector.add_data_callback(DataType.TRADE, on_trade_data)
    
    # Start collector
    await collector.start()
    
    # Let it run
    await asyncio.sleep(60)
    
    # Stop collector
    await collector.stop()

asyncio.run(main())

2. Creating Multiple Collectors

import asyncio
from data.exchanges import ExchangeFactory, ExchangeCollectorConfig
from data.base_collector import DataType

async def main():
    # Create multiple collectors using factory
    configs = [
        ExchangeCollectorConfig('okx', 'BTC-USDT', [DataType.TRADE, DataType.ORDERBOOK]),
        ExchangeCollectorConfig('okx', 'ETH-USDT', [DataType.TRADE]),
        ExchangeCollectorConfig('okx', 'SOL-USDT', [DataType.ORDERBOOK])
    ]
    
    collectors = ExchangeFactory.create_multiple_collectors(configs)
    
    print(f"Created {len(collectors)} collectors")
    
    # Start all collectors
    for collector in collectors:
        await collector.start()
    
    # Monitor
    await asyncio.sleep(60)
    
    # Stop all
    for collector in collectors:
        await collector.stop()

asyncio.run(main())
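The loop above starts collectors one at a time, so a slow connection delays the rest. A minimal alternative starts them concurrently with `asyncio.gather` and reports which ones failed. The sketch uses a hypothetical `DummyCollector` with the same `start()`/`stop()` shape. Passing `return_exceptions=True` keeps one exception from aborting the others.

```python
import asyncio
from typing import List, Sequence


class DummyCollector:
    """Hypothetical stand-in exposing the same start()/stop() coroutines."""

    def __init__(self, name: str, fail: bool = False):
        self.name = name
        self.fail = fail
        self.running = False

    async def start(self) -> bool:
        await asyncio.sleep(0.01)  # simulate connecting and subscribing
        self.running = not self.fail
        return self.running

    async def stop(self) -> None:
        self.running = False


async def start_all(collectors: Sequence[DummyCollector]) -> List[str]:
    """Start all collectors concurrently; return the names of those that failed."""
    results = await asyncio.gather(
        *(c.start() for c in collectors), return_exceptions=True
    )
    # Success only if start() returned True (not False and not an exception)
    return [c.name for c, result in zip(collectors, results) if result is not True]


async def main() -> List[str]:
    collectors = [
        DummyCollector("okx_btc"),
        DummyCollector("okx_eth"),
        DummyCollector("okx_sol", fail=True),
    ]
    failed = await start_all(collectors)
    print("failed to start:", failed)
    await asyncio.gather(*(c.stop() for c in collectors))
    return failed


failed = asyncio.run(main())
```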

API Reference

BaseDataCollector

The abstract base class that all data collectors must inherit from.

Constructor

def __init__(self, 
             exchange_name: str,
             symbols: List[str],
             data_types: Optional[List[DataType]] = None,
             component_name: Optional[str] = None,
             auto_restart: bool = True,
             health_check_interval: float = 30.0)

Parameters:

  • exchange_name: Name of the exchange (e.g., 'okx', 'binance')
  • symbols: List of trading symbols to collect data for
  • data_types: Types of data to collect (default: [DataType.CANDLE])
  • component_name: Name for logging (default: based on exchange_name)
  • auto_restart: Enable automatic restart on failures (default: True)
  • health_check_interval: Seconds between health checks (default: 30.0)

Abstract Methods

Must be implemented by subclasses:

async def connect(self) -> bool
async def disconnect(self) -> None
async def subscribe_to_data(self, symbols: List[str], data_types: List[DataType]) -> bool
async def unsubscribe_from_data(self, symbols: List[str], data_types: List[DataType]) -> bool
async def _process_message(self, message: Any) -> Optional[MarketDataPoint]
async def _handle_messages(self) -> None

Public Methods

async def start(self) -> bool                      # Start the collector
async def stop(self, force: bool = False) -> None  # Stop the collector
async def restart(self) -> bool                    # Restart the collector

# Callback management
def add_data_callback(self, data_type: DataType, callback: Callable) -> None
def remove_data_callback(self, data_type: DataType, callback: Callable) -> None

# Symbol management
def add_symbol(self, symbol: str) -> None
def remove_symbol(self, symbol: str) -> None

# Status and monitoring
def get_status(self) -> Dict[str, Any]
def get_health_status(self) -> Dict[str, Any]

# Data validation
def validate_ohlcv_data(self, data: Dict[str, Any], symbol: str, timeframe: str) -> OHLCVData
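A per-`DataType` callback registry is one straightforward way to back `add_data_callback` / `remove_data_callback`. This sketch assumes synchronous callbacks and isolates failures, so one broken callback does not starve the others. `CallbackRegistry` is a hypothetical name. The real `BaseDataCollector` internals may differ; for example, it may also accept coroutine callbacks.

```python
from collections import defaultdict
from enum import Enum
from typing import Any, Callable, DefaultDict, List


class DataType(Enum):
    """Subset of the DataType enum documented below."""
    TRADE = "trade"
    TICKER = "ticker"


Callback = Callable[[Any], None]


class CallbackRegistry:
    """Hypothetical per-DataType callback lists behind add/remove_data_callback."""

    def __init__(self) -> None:
        self._callbacks: DefaultDict[DataType, List[Callback]] = defaultdict(list)

    def add(self, data_type: DataType, callback: Callback) -> None:
        if callback not in self._callbacks[data_type]:  # ignore duplicate registration
            self._callbacks[data_type].append(callback)

    def remove(self, data_type: DataType, callback: Callback) -> None:
        if callback in self._callbacks[data_type]:
            self._callbacks[data_type].remove(callback)

    def notify(self, data_type: DataType, payload: Any) -> int:
        """Call each callback for data_type; return how many succeeded."""
        delivered = 0
        for callback in list(self._callbacks[data_type]):  # copy: callbacks may unregister
            try:
                callback(payload)
                delivered += 1
            except Exception as exc:  # one failing callback must not block the others
                print(f"callback {callback.__name__} failed: {exc}")
        return delivered


received = []


def on_trade(payload: Any) -> None:
    received.append(payload)


def broken(payload: Any) -> None:
    raise RuntimeError("boom")


registry = CallbackRegistry()
registry.add(DataType.TRADE, on_trade)
registry.add(DataType.TRADE, broken)
delivered = registry.notify(DataType.TRADE, {"price": "100"})
```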

Status Information

The get_status() method returns comprehensive status information:

{
    'exchange': 'okx',
    'status': 'running',                    # Current status
    'should_be_running': True,               # Desired state
    'symbols': ['BTC-USDT', 'ETH-USDT'],      # Configured symbols
    'data_types': ['ticker'],                  # Data types being collected
    'auto_restart': True,                      # Auto-restart enabled
    'health': {
        'time_since_heartbeat': 5.2,            # Seconds since last heartbeat
        'time_since_data': 2.1,                 # Seconds since last data
        'max_silence_duration': 300.0           # Max allowed silence
    },
    'statistics': {
        'messages_received': 1250,                # Total messages received
        'messages_processed': 1248,                # Successfully processed
        'errors': 2,                              # Error count
        'restarts': 1,                            # Restart count
        'uptime_seconds': 3600.5,                 # Current uptime
        'reconnect_attempts': 0,                  # Current reconnect attempts
        'last_message_time': '2023-...',           # ISO timestamp
        'connection_uptime': '2023-...',           # Connection start time
        'last_error': 'Connection failed',          # Last error message
        'last_restart_time': '2023-...'            # Last restart time
    }
}

Health Status

The get_health_status() method provides detailed health information:

{
    'is_healthy': True,                          # Overall health status
    'issues': [],                               # List of current issues
    'status': 'running',                         # Current collector status
    'last_heartbeat': '2023-...',                 # Last heartbeat timestamp
    'last_data_received': '2023-...',             # Last data timestamp
    'should_be_running': True,                    # Expected state
    'is_running': True                           # Actual running state
}

CollectorManager

Manages multiple data collectors with coordinated lifecycle and health monitoring.

Constructor

def __init__(self,
             manager_name: str = "collector_manager",
             global_health_check_interval: float = 60.0,
             restart_delay: float = 5.0)

Public Methods

# Collector management
def add_collector(self, collector: BaseDataCollector, config: Optional[CollectorConfig] = None) -> None
def remove_collector(self, collector_name: str) -> bool
def enable_collector(self, collector_name: str) -> bool
def disable_collector(self, collector_name: str) -> bool

# Lifecycle management
async def start(self) -> bool
async def stop(self) -> None
async def restart_collector(self, collector_name: str) -> bool
async def restart_all_collectors(self) -> Dict[str, bool]

# Status and monitoring
def get_status(self) -> Dict[str, Any]
def get_collector_status(self, collector_name: str) -> Optional[Dict[str, Any]]
def list_collectors(self) -> List[str]
def get_running_collectors(self) -> List[str]
def get_failed_collectors(self) -> List[str]

CollectorConfig

Configuration dataclass for collectors:

@dataclass
class CollectorConfig:
    name: str                               # Unique collector name
    exchange: str                           # Exchange name
    symbols: List[str]                      # Trading symbols
    data_types: List[str]                   # Data types to collect
    auto_restart: bool = True               # Enable auto-restart
    health_check_interval: float = 30.0    # Health check interval
    enabled: bool = True                    # Initially enabled

Data Types

DataType Enum

class DataType(Enum):
    TICKER = "ticker"        # Price and volume updates
    TRADE = "trade"          # Individual trade executions
    ORDERBOOK = "orderbook"  # Order book snapshots
    CANDLE = "candle"        # OHLCV candle data
    BALANCE = "balance"      # Account balance updates
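`CollectorConfig.data_types` holds plain strings (`"ticker"`, `"trade"`, ...), while collectors take `DataType` members. Because the enum values are exactly those strings, `DataType(value)` performs the conversion. The hypothetical helper below additionally assumes case-insensitive input and turns unknown names into a readable error.

```python
from enum import Enum
from typing import Iterable, List


class DataType(Enum):
    """Same members and values as the enum above."""
    TICKER = "ticker"
    TRADE = "trade"
    ORDERBOOK = "orderbook"
    CANDLE = "candle"
    BALANCE = "balance"


def parse_data_types(names: Iterable[str]) -> List[DataType]:
    """Map CollectorConfig-style strings to DataType members (case-insensitive)."""
    parsed = []
    for name in names:
        try:
            parsed.append(DataType(name.strip().lower()))  # lookup by enum value
        except ValueError:
            valid = ", ".join(member.value for member in DataType)
            raise ValueError(f"Unknown data type {name!r}; expected one of: {valid}") from None
    return parsed


print(parse_data_types(["ticker", "trade", "orderbook"]))
```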

MarketDataPoint

Standardized market data structure:

@dataclass
class MarketDataPoint:
    exchange: str            # Exchange name
    symbol: str             # Trading symbol
    timestamp: datetime     # Data timestamp (UTC)
    data_type: DataType     # Type of data
    data: Dict[str, Any]    # Raw data payload

OHLCVData

OHLCV (candlestick) data structure with validation:

@dataclass  
class OHLCVData:
    symbol: str                          # Trading symbol
    timeframe: str                       # Timeframe (1m, 5m, 1h, etc.)
    timestamp: datetime                  # Candle timestamp
    open: Decimal                        # Opening price
    high: Decimal                        # Highest price
    low: Decimal                         # Lowest price
    close: Decimal                       # Closing price
    volume: Decimal                      # Trading volume
    trades_count: Optional[int] = None   # Number of trades
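The exact checks `validate_ohlcv_data` performs are not spelled out here. The sketch below applies common OHLCV sanity rules as an assumption:

- high must be at least open, close and low;
- low must be at most open, close and high;
- volume and trade count must be non-negative;
- timestamps must be timezone-aware, since the fields are UTC.

`ohlcv_problems` is a hypothetical helper that returns every violation instead of stopping at the first.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import List, Optional


@dataclass
class OHLCVData:
    symbol: str
    timeframe: str
    timestamp: datetime
    open: Decimal
    high: Decimal
    low: Decimal
    close: Decimal
    volume: Decimal
    trades_count: Optional[int] = None


def ohlcv_problems(c: OHLCVData) -> List[str]:
    """Return all consistency problems found; an empty list means the candle looks valid."""
    problems = []
    if c.high < max(c.open, c.close, c.low):
        problems.append("high is below open, close or low")
    if c.low > min(c.open, c.close, c.high):
        problems.append("low is above open, close or high")
    if c.volume < 0:
        problems.append("negative volume")
    if c.trades_count is not None and c.trades_count < 0:
        problems.append("negative trades_count")
    if c.timestamp.tzinfo is None:
        problems.append("timestamp is not timezone-aware")
    return problems


ts = datetime(2025, 6, 2, 5, 27, tzinfo=timezone.utc)
good = OHLCVData("BTC-USDT", "1m", ts, Decimal("100"), Decimal("105"),
                 Decimal("99"), Decimal("104"), Decimal("2.5"), 12)
bad = OHLCVData("BTC-USDT", "1m", ts, Decimal("100"), Decimal("98"),
                Decimal("99"), Decimal("104"), Decimal("2.5"))
print(ohlcv_problems(good))
print(ohlcv_problems(bad))
```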

Health Monitoring

Monitoring Levels

The system provides multi-level health monitoring:

  1. Individual Collector Health

    • Heartbeat monitoring (message loop activity)
    • Data freshness (time since last data received)
    • Connection state monitoring
    • Error rate tracking
  2. Manager-Level Health

    • Global health checks across all collectors
    • Coordinated restart management
    • System-wide performance metrics
    • Resource utilization monitoring

Health Check Intervals

  • Individual Collector: Configurable per collector (default: 30s)
  • Global Manager: Configurable for manager (default: 60s)
  • Heartbeat Updates: Updated with each message loop iteration
  • Data Freshness: Updated when data is received
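Taken together, heartbeat age and data age are enough to derive the `is_healthy`/`issues` pair reported by `get_health_status()`. The minimal sketch below assumes a monotonic clock and a hypothetical `heartbeat_timeout` of 60 s. `max_silence_duration` uses the 300 s value from the status example above. `HealthState` and `evaluate_health` are hypothetical names.

```python
import time
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class HealthState:
    """Timestamps come from time.monotonic(); field names are hypothetical."""
    last_heartbeat: float        # last message-loop iteration
    last_data_received: float    # last data point delivered
    should_be_running: bool = True
    is_running: bool = True


def evaluate_health(
    state: HealthState,
    now: float,
    heartbeat_timeout: float = 60.0,      # assumed threshold
    max_silence_duration: float = 300.0,  # matches the status example
) -> Dict[str, Any]:
    issues: List[str] = []
    if state.should_be_running and not state.is_running:
        issues.append("collector should be running but is stopped")
    heartbeat_age = now - state.last_heartbeat
    if heartbeat_age > heartbeat_timeout:
        issues.append(f"no heartbeat for {heartbeat_age:.0f}s")
    data_age = now - state.last_data_received
    if data_age > max_silence_duration:
        issues.append(f"no data for {data_age:.0f}s")
    return {"is_healthy": not issues, "issues": issues}


now = time.monotonic()
fresh = HealthState(last_heartbeat=now - 5.2, last_data_received=now - 2.1)
stale = HealthState(last_heartbeat=now - 5.2, last_data_received=now - 420.0)
print(evaluate_health(fresh, now))  # {'is_healthy': True, 'issues': []}
print(evaluate_health(stale, now))  # {'is_healthy': False, 'issues': ['no data for 420s']}
```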

Auto-Restart Triggers

Collectors are automatically restarted when:

  1. No Heartbeat: Message loop becomes unresponsive
  2. Stale Data: No data received within configured timeout
  3. Connection Failures: WebSocket or API connection lost
  4. Error Status: Collector enters ERROR or UNHEALTHY state
  5. Manual Trigger: Explicit restart request

Failure Handling

# Configure failure handling
collector = MyCollector(
    symbols=["BTC-USDT"],
    auto_restart=True,                    # Enable auto-restart
    health_check_interval=30.0            # Check every 30 seconds
)

# The collector will automatically:
# 1. Detect failures within 30 seconds
# 2. Attempt reconnection with exponential backoff
# 3. Restart up to 5 times (configurable)
# 4. Log all recovery attempts
# 5. Report status to manager
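The retry policy described in these comments (exponential backoff, capped number of attempts) can be sketched as follows. The defaults mirror `MAX_RECONNECT_ATTEMPTS=5` and `RECONNECT_DELAY=5` from the configuration section. The doubling factor, the 300 s cap and the 10% jitter are assumptions. `restart_with_backoff` is a hypothetical helper, not the collector's own method.

```python
import asyncio
import random
from typing import Awaitable, Callable


async def restart_with_backoff(
    restart: Callable[[], Awaitable[bool]],
    max_attempts: int = 5,     # cf. MAX_RECONNECT_ATTEMPTS
    base_delay: float = 5.0,   # cf. RECONNECT_DELAY (seconds)
    max_delay: float = 300.0,  # assumed cap
) -> bool:
    """Call restart() until it returns True, doubling the wait between attempts."""
    for attempt in range(max_attempts):
        if await restart():
            return True
        if attempt < max_attempts - 1:  # no pointless wait after the final attempt
            # 5s, 10s, 20s, ... capped at max_delay, plus up to 10% random jitter
            delay = min(base_delay * 2 ** attempt, max_delay)
            await asyncio.sleep(delay * (1 + random.uniform(0, 0.1)))
    return False


attempts = 0


async def flaky_restart() -> bool:
    """Hypothetical restart that succeeds on the third try."""
    global attempts
    attempts += 1
    return attempts >= 3


ok = asyncio.run(restart_with_backoff(flaky_restart, base_delay=0.01))
print(ok, attempts)  # True 3
```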

Configuration

Environment Variables

The system respects these environment variables:

# Logging configuration
LOG_LEVEL=INFO                    # Logging level (DEBUG, INFO, WARN, ERROR)
LOG_CLEANUP=true                  # Enable automatic log cleanup
LOG_MAX_FILES=30                  # Maximum log files to retain

# Health monitoring
DEFAULT_HEALTH_CHECK_INTERVAL=30  # Default health check interval (seconds)
MAX_SILENCE_DURATION=300          # Max time without data (seconds)
MAX_RECONNECT_ATTEMPTS=5          # Maximum reconnection attempts
RECONNECT_DELAY=5                 # Delay between reconnect attempts (seconds)
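How the project reads these variables is not shown here. The minimal sketch below assumes a frozen dataclass, `HealthSettings` (a hypothetical name), whose defaults match the values above. Passing the environment mapping in explicitly makes the loader easy to test without touching `os.environ`.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class HealthSettings:
    health_check_interval: float = 30.0
    max_silence_duration: float = 300.0
    max_reconnect_attempts: int = 5
    reconnect_delay: float = 5.0


def load_health_settings(env: Mapping[str, str] = os.environ) -> HealthSettings:
    """Read the health-monitoring variables above, falling back to the defaults."""
    d = HealthSettings()
    return HealthSettings(
        health_check_interval=float(env.get("DEFAULT_HEALTH_CHECK_INTERVAL", d.health_check_interval)),
        max_silence_duration=float(env.get("MAX_SILENCE_DURATION", d.max_silence_duration)),
        max_reconnect_attempts=int(env.get("MAX_RECONNECT_ATTEMPTS", d.max_reconnect_attempts)),
        reconnect_delay=float(env.get("RECONNECT_DELAY", d.reconnect_delay)),
    )


settings = load_health_settings({"MAX_SILENCE_DURATION": "120"})
print(settings)
```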

Programmatic Configuration

# Configure individual collector
collector = MyCollector(
    exchange_name="custom_exchange",
    symbols=["BTC-USDT", "ETH-USDT"],
    data_types=[DataType.TICKER, DataType.TRADE],
    auto_restart=True,
    health_check_interval=15.0        # Check every 15 seconds
)

# Configure manager
manager = CollectorManager(
    manager_name="production_manager",
    global_health_check_interval=30.0,   # Global checks every 30s
    restart_delay=10.0                    # 10s delay between restarts
)

# Configure specific collector in manager
config = CollectorConfig(
    name="primary_okx",
    exchange="okx",
    symbols=["BTC-USDT", "ETH-USDT", "SOL-USDT"],
    data_types=["ticker", "trade", "orderbook"],
    auto_restart=True,
    health_check_interval=20.0,
    enabled=True
)

manager.add_collector(collector, config)

Best Practices

1. Collector Implementation

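# RateLimiter, DataValidator, MetricsCollector and create_connection_pool are
# placeholders for your own utilities; disconnect(), subscribe_to_data(),
# unsubscribe_from_data() and _handle_messages() are omitted for brevity.
class ProductionCollector(BaseDataCollector):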
    def __init__(self, exchange_name: str, symbols: list):
        super().__init__(
            exchange_name=exchange_name,
            symbols=symbols,
            data_types=[DataType.TICKER, DataType.TRADE],
            auto_restart=True,               # Always enable auto-restart
            health_check_interval=30.0       # Reasonable interval
        )
        
        # Connection management
        self.connection_pool = None
        self.rate_limiter = RateLimiter(100, 60)  # 100 requests per minute
        
        # Data validation
        self.data_validator = DataValidator()
        
        # Performance monitoring
        self.metrics = MetricsCollector()
    
    async def connect(self) -> bool:
        """Implement robust connection logic."""
        try:
            # Use connection pooling for reliability
            self.connection_pool = await create_connection_pool(
                self.exchange_name,
                max_connections=5,
                retry_attempts=3
            )
            
            # Test connection
            await self.connection_pool.ping()
            return True
            
        except Exception as e:
            self.logger.error(f"Connection failed: {e}")
            return False
    
    async def _process_message(self, message) -> Optional[MarketDataPoint]:
        """Implement thorough data processing."""
        try:
            # Rate limiting
            await self.rate_limiter.acquire()
            
            # Data validation
            if not self.data_validator.validate(message):
                self.logger.warning(f"Invalid message: {message}")
                return None
            
            # Metrics collection
            self.metrics.increment('messages_processed')
            
            # Create standardized data point
            return MarketDataPoint(
                exchange=self.exchange_name,
                symbol=message['symbol'],
                timestamp=self._parse_timestamp(message['timestamp']),
                data_type=DataType.TICKER,
                data=self._normalize_data(message)
            )
            
        except Exception as e:
            self.metrics.increment('processing_errors')
            self.logger.error(f"Message processing failed: {e}")
            raise  # Let health monitor handle it

2. Error Handling

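# Implement proper error handling
# (WebSocketError and ValidationError stand for your WebSocket client's and
# validator's exception types; _check_connection_health is your own helper)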
class RobustCollector(BaseDataCollector):
    async def _handle_messages(self) -> None:
        """Handle messages with proper error management."""
        try:
            # Check connection health
            if not await self._check_connection_health():
                raise ConnectionError("Connection health check failed")
            
            # Receive message with timeout
            message = await asyncio.wait_for(
                self.websocket.receive(),
                timeout=30.0  # 30 second timeout
            )
            
            # Process message
            data_point = await self._process_message(message)
            if data_point:
                await self._notify_callbacks(data_point)
                
        except asyncio.TimeoutError as e:
            # No data received - let health monitor handle
            raise ConnectionError("Message receive timeout") from e
            
        except WebSocketError as e:
            # WebSocket specific errors
            self.logger.error(f"WebSocket error: {e}")
            raise ConnectionError(f"WebSocket failed: {e}") from e
            
        except ValidationError as e:
            # Data validation errors - don't restart for these
            self.logger.warning(f"Data validation failed: {e}")
            # Continue without raising - these are data issues, not connection issues
            
        except Exception as e:
            # Unexpected errors - trigger restart
            self.logger.error(f"Unexpected error: {e}")
            raise

3. Manager Setup

async def setup_production_system():
    """Setup production collector system."""
    
    # Create manager with appropriate settings
    manager = CollectorManager(
        manager_name="crypto_trading_system",
        global_health_check_interval=60.0,    # Check every minute
        restart_delay=30.0                     # 30s between restarts
    )
    
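    # Add primary data sources (only 'okx' has a collector implementation today;
    # 'binance' and 'coinbase' need their own modules under data/exchanges/)
    exchanges = ['okx', 'binance', 'coinbase']
    symbols = ['BTC-USDT', 'ETH-USDT', 'SOL-USDT', 'AVAX-USDT']
    
    for exchange in exchanges:
        # create_collector, process_ticker_data, process_trade_data and
        # send_alert (below) are placeholders for your own helpers
        collector = create_collector(exchange, symbols)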
        
        # Configure for production
        config = CollectorConfig(
            name=f"{exchange}_primary",
            exchange=exchange,
            symbols=symbols,
            data_types=["ticker", "trade"],
            auto_restart=True,
            health_check_interval=30.0,
            enabled=True
        )
        
        # Add callbacks for data processing
        collector.add_data_callback(DataType.TICKER, process_ticker_data)
        collector.add_data_callback(DataType.TRADE, process_trade_data)
        
        manager.add_collector(collector, config)
    
    # Start system
    success = await manager.start()
    if not success:
        raise RuntimeError("Failed to start collector system")
    
    return manager

# Usage
async def main():
    manager = await setup_production_system()
    
    # Monitor system health
    while True:
        status = manager.get_status()
        
        if status['statistics']['failed_collectors'] > 0:
            # Alert on failures
            await send_alert(f"Collectors failed: {manager.get_failed_collectors()}")
        
        # Re-check every 5 minutes
        await asyncio.sleep(300)

4. Monitoring Integration

# Integrate with monitoring systems
import prometheus_client

# Define metrics once at module level: prometheus_client registers every metric
# in a global registry, so creating them inside __init__ would raise a
# "Duplicated timeseries" ValueError as soon as a second collector is created.
MESSAGES_COUNTER = prometheus_client.Counter(
    'collector_messages_total',
    'Total messages processed',
    ['exchange', 'symbol', 'type']
)

ERRORS_COUNTER = prometheus_client.Counter(
    'collector_errors_total',
    'Total errors',
    ['exchange', 'error_type']
)

UPTIME_GAUGE = prometheus_client.Gauge(
    'collector_uptime_seconds',
    'Collector uptime',
    ['exchange']
)

class MonitoredCollector(BaseDataCollector):
    async def _notify_callbacks(self, data_point: MarketDataPoint):
        """Override to add metrics."""
        # Count the message
        MESSAGES_COUNTER.labels(
            exchange=data_point.exchange,
            symbol=data_point.symbol,
            type=data_point.data_type.value
        ).inc()
        
        # Update uptime
        status = self.get_status()
        if status['statistics']['uptime_seconds']:
            UPTIME_GAUGE.labels(
                exchange=self.exchange_name
            ).set(status['statistics']['uptime_seconds'])
        
        # Call parent
        await super()._notify_callbacks(data_point)
    
    async def _handle_connection_error(self) -> bool:
        """Override to add error metrics."""
        ERRORS_COUNTER.labels(
            exchange=self.exchange_name,
            error_type='connection'
        ).inc()
        
        return await super()._handle_connection_error()

Troubleshooting

Common Issues

1. Collector Won't Start

Symptoms: start() returns False, status shows ERROR

Solutions:

# Check connection details
collector = MyCollector(symbols=["BTC-USDT"])
success = await collector.start()
if not success:
    status = collector.get_status()
    print(f"Error: {status['statistics']['last_error']}")
    
# Common fixes:
# - Verify API credentials
# - Check network connectivity  
# - Validate symbol names
# - Review exchange-specific requirements

2. Frequent Restarts

Symptoms: High restart count, intermittent data

Solutions:

# Adjust health check intervals
collector = MyCollector(
    symbols=["BTC-USDT"],
    health_check_interval=60.0,  # Check less often
    auto_restart=True
)

# Check for:
# - Network instability
# - Exchange rate limiting
# - Invalid message formats
# - Resource constraints
# - A MAX_SILENCE_DURATION too short for low-activity symbols (stale-data restarts)

3. No Data Received

Symptoms: Collector running but no callbacks triggered

Solutions:

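# Check data flow
# The collector must collect the data type you register a callback for
# (BaseDataCollector defaults to [DataType.CANDLE])
collector = MyCollector(symbols=["BTC-USDT"], data_types=[DataType.TICKER])

def debug_callback(data_point):
    print(f"Received: {data_point}")

collector.add_data_callback(DataType.TICKER, debug_callback)

# Verify:
# - Callback registration matches a configured data type
# - Symbol subscription
# - Message parsing logic
# - Exchange data availability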

4. Memory Leaks

Symptoms: Increasing memory usage over time

Solutions:

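# Implement proper cleanup
class CleanCollector(BaseDataCollector):
    async def disconnect(self):
        """Release connection resources without losing configuration."""
        # Clear buffers (message_buffer is a buffer your subclass maintains)
        self.message_buffer.clear()
        
        # Close connections
        if self.websocket:
            await self.websocket.close()
            self.websocket = None
        
        # Do not clear data callbacks here: disconnect() also runs when the
        # collector restarts, and clearing them would silently stop data
        # delivery after the first restart. Use remove_data_callback() when
        # you discard the collector for good.
        
        await super().disconnect()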

Performance Optimization

1. Batch Processing

import time

class BatchingCollector(BaseDataCollector):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.message_batch = []
        self.batch_size = 100
        self.batch_timeout = 1.0
        self.last_batch_time = time.monotonic()  # must exist before the first timeout check
    
    async def _handle_messages(self):
        """Batch process messages for efficiency."""
        message = await self.websocket.receive()
        self.message_batch.append(message)
        
        # Process batch when full or when the timeout has elapsed
        # (the timeout is only checked when a message arrives)
        if (len(self.message_batch) >= self.batch_size or 
            time.monotonic() - self.last_batch_time > self.batch_timeout):
            await self._process_batch()
    
    async def _process_batch(self):
        """Process messages in batch."""
        batch = self.message_batch.copy()
        self.message_batch.clear()
        self.last_batch_time = time.monotonic()
        
        for message in batch:
            data_point = await self._process_message(message)
            if data_point:
                await self._notify_callbacks(data_point)
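One limitation of the batching loop above is that the timeout is only checked when a new message arrives, so a partial batch can sit unprocessed during a quiet period. A queue-based variant flushes on size or on a deadline, even when no new messages arrive. It is offered as an alternative technique, not the project's code. `batch_consumer` is a hypothetical name, and the sketch assumes the receive loop `put`s raw messages onto the queue.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def batch_consumer(
    queue: "asyncio.Queue[Any]",
    handle_batch: Callable[[List[Any]], Awaitable[None]],
    batch_size: int = 100,
    batch_timeout: float = 1.0,
) -> None:
    """Flush when batch_size items are buffered or batch_timeout elapses, whichever comes first."""
    loop = asyncio.get_running_loop()
    while True:
        batch = [await queue.get()]  # block until the first item of a new batch
        deadline = loop.time() + batch_timeout
        while len(batch) < batch_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                batch.append(await asyncio.wait_for(queue.get(), remaining))
            except asyncio.TimeoutError:
                break  # deadline reached: flush the partial batch
        await handle_batch(batch)


async def demo() -> List[List[Any]]:
    queue: "asyncio.Queue[Any]" = asyncio.Queue()
    batches: List[List[Any]] = []

    async def handle(batch: List[Any]) -> None:
        batches.append(batch)

    consumer = asyncio.create_task(batch_consumer(queue, handle, batch_size=3, batch_timeout=0.05))
    for i in range(4):
        queue.put_nowait(i)
    await asyncio.sleep(0.2)  # [0, 1, 2] flushes by size, [3] by timeout
    consumer.cancel()
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return batches


batches = asyncio.run(demo())
print(batches)
```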

2. Connection Pooling

import aiohttp

class PooledCollector(BaseDataCollector):
    async def connect(self) -> bool:
        """Use connection pooling for better performance."""
        try:
            # ClientSession is constructed directly (it is not awaitable);
            # close it in disconnect() with `await self.connection_pool.close()`
            self.connection_pool = aiohttp.ClientSession(
                connector=aiohttp.TCPConnector(
                    limit=10,              # Pool size
                    limit_per_host=5,      # Per-host limit
                    keepalive_timeout=300, # Keep connections alive
                    enable_cleanup_closed=True
                )
            )
            return True
        except Exception as e:
            self.logger.error(f"Failed to create connection pool: {e}")
            return False

Logging and Debugging

Enable Debug Logging

import os
os.environ['LOG_LEVEL'] = 'DEBUG'

# Collector will now log detailed information
collector = MyCollector(symbols=["BTC-USDT"])
await collector.start()

# Check logs in ./logs/ directory
# - collector_debug.log: Debug information
# - collector_info.log: General information  
# - collector_error.log: Error messages

Custom Logging

import time

from utils.logger import get_logger

# _process_message is abstract in BaseDataCollector; in practice, subclass a
# concrete collector so the super() call below has an implementation.
class CustomCollector(BaseDataCollector):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        # Add custom logger
        self.performance_logger = get_logger(
            f"{self.exchange_name}_performance",
            verbose=False
        )
    
    async def _process_message(self, message):
        start_time = time.perf_counter()  # monotonic, high-resolution timer
        
        try:
            result = await super()._process_message(message)
            
            # Log performance
            processing_time = time.perf_counter() - start_time
            self.performance_logger.info(
                f"Message processed in {processing_time:.3f}s"
            )
            
            return result
        except Exception as e:
            self.performance_logger.error(
                f"Processing failed after {time.perf_counter() - start_time:.3f}s: {e}"
            )
            raise

Integration Examples

Django Integration

# Django management command
from django.core.management.base import BaseCommand
from data import CollectorManager
import asyncio

class Command(BaseCommand):
    help = 'Start crypto data collectors'
    
    def handle(self, *args, **options):
        async def run_collectors():
            manager = CollectorManager("django_collectors")
            
            # Add collectors
            from myapp.collectors import OKXCollector, BinanceCollector
            manager.add_collector(OKXCollector(['BTC-USDT']))
            manager.add_collector(BinanceCollector(['ETH-USDT']))
            
            # Start system
            await manager.start()
            
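            # Keep running; `finally` guarantees cleanup on Ctrl+C (asyncio.run
            # cancels this coroutine rather than raising KeyboardInterrupt in it)
            try:
                while True:
                    await asyncio.sleep(60)
                    status = manager.get_status()
                    self.stdout.write(f"Status: {status['statistics']}")
            finally:
                await manager.stop()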
        
        asyncio.run(run_collectors())

FastAPI Integration

# FastAPI application
from contextlib import asynccontextmanager

from fastapi import FastAPI

from data import CollectorManager
from data.base_collector import DataType
from data.exchanges.okx import OKXCollector

manager = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start collectors with the application and stop them on shutdown."""
    global manager
    manager = CollectorManager("fastapi_collectors")

    # One OKX collector per trading pair
    for symbol in ['BTC-USDT', 'ETH-USDT']:
        manager.add_collector(
            OKXCollector(symbol=symbol, data_types=[DataType.TRADE])
        )

    # Start in background
    await manager.start()
    try:
        yield
    finally:
        await manager.stop()


# lifespan replaces the deprecated @app.on_event("startup") / ("shutdown") hooks
app = FastAPI(lifespan=lifespan)


@app.get("/collector/status")
async def get_collector_status():
    return manager.get_status()


@app.post("/collector/{name}/restart")
async def restart_collector(name: str):
    success = await manager.restart_collector(name)
    return {"success": success}

Celery Integration

# Celery task
import asyncio

from celery import Celery

from data import CollectorManager
from data.base_collector import DataType
from data.exchanges.okx import OKXCollector

app = Celery('crypto_collectors')


@app.task
def start_data_collection():
    """Start data collection as a Celery task.

    The task does not return on its own, so it occupies one worker slot
    for as long as collection runs.
    """

    async def run():
        manager = CollectorManager("celery_collectors")

        # Setup collectors (one OKX collector per trading pair)
        for symbol in ['BTC-USDT', 'ETH-USDT']:
            manager.add_collector(
                OKXCollector(symbol=symbol, data_types=[DataType.TRADE])
            )

        # Start and monitor
        await manager.start()

        # Run until stopped
        try:
            while True:
                await asyncio.sleep(300)  # 5 minute intervals

                # Check health and restart if needed
                failed = manager.get_failed_collectors()
                if failed:
                    print(f"Restarting failed collectors: {failed}")
                    await manager.restart_all_collectors()

        except Exception as e:
            print(f"Collection error: {e}")
        finally:
            await manager.stop()

    # Run async task
    asyncio.run(run())

Migration Guide

From Manual Connection Management

Before (manual management):

import asyncio

class OldCollector:
    def __init__(self):
        self.websocket = None
        self.running = False

    async def start(self):
        self.running = True
        while self.running:
            try:
                self.websocket = await connect()  # exchange-specific connect helper
                await self.listen()
            except Exception as e:
                print(f"Error: {e}")
                await asyncio.sleep(5)  # Manual retry at a fixed interval

After (with BaseDataCollector):

from data.base_collector import BaseDataCollector

class NewCollector(BaseDataCollector):
    def __init__(self):
        super().__init__("exchange", ["BTC-USDT"])
        # Auto-restart and health monitoring included

    async def connect(self) -> bool:
        # connect() here is the module-level exchange client helper, not this method
        self.websocket = await connect()
        return True

    async def _handle_messages(self):
        message = await self.websocket.receive()
        # Error handling and restart logic automatic
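The retry loop that BaseDataCollector takes off your hands can be pictured roughly as follows. This is a minimal sketch assuming a capped exponential backoff policy (the overview lists exponential backoff for auto-restart); reconnect_with_backoff, backoff_delays and their parameters are hypothetical names, not the base class's actual API.

```python
import asyncio
from typing import Awaitable, Callable, List


def backoff_delays(base_delay: float, max_delay: float, retries: int) -> List[float]:
    """Wait times between attempts: base, 2*base, 4*base, ... capped at max_delay."""
    return [min(base_delay * 2 ** i, max_delay) for i in range(retries)]


async def reconnect_with_backoff(
    connect: Callable[[], Awaitable[bool]],
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    max_attempts: int = 5,
) -> bool:
    """Call connect() until it returns True, waiting longer after each failure.

    Hypothetical helper that illustrates the policy; it is not BaseDataCollector code.
    """
    delays = backoff_delays(base_delay, max_delay, max_attempts - 1)
    for attempt in range(max_attempts):
        try:
            if await connect():
                return True
        except Exception as exc:  # a real collector would log this
            print(f"attempt {attempt + 1} failed: {exc!r}")
        if attempt < len(delays):  # no wait after the final attempt
            await asyncio.sleep(delays[attempt])
    return False


if __name__ == "__main__":
    calls = 0

    async def flaky_connect() -> bool:
        # Fails twice, then succeeds
        global calls
        calls += 1
        if calls < 3:
            raise ConnectionError("socket closed")
        return True

    ok = asyncio.run(reconnect_with_backoff(flaky_connect, base_delay=0.01))
    print(ok, calls)  # True 3
```

Capping the delay keeps a long outage from pushing retries out indefinitely, while the doubling avoids hammering an exchange that is rejecting connections.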

From Basic Monitoring

Before (basic monitoring):

import time

# Manual status tracking
status = {
    'connected': False,
    'last_message': None,  # Unix timestamp of the last message
    'error_count': 0
}

# Manual health checks
async def health_check():
    last = status['last_message']
    if last is not None and time.time() - last > 300:
        print("No data for 5 minutes!")

After (comprehensive monitoring):

# Automatic health monitoring
collector = MyCollector(["BTC-USDT"])

# Rich status information
status = collector.get_status()
health = collector.get_health_status()

# Automatic alerts and recovery
if not health['is_healthy']:
    print(f"Issues: {health['issues']}")
    # Auto-restart already triggered
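The kind of freshness check behind get_health_status() can be sketched as below. It assumes only the two keys shown above (is_healthy and issues); the HealthTracker class, the 300-second stale_after threshold (taken from the manual check) and max_errors are hypothetical.

```python
import time
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class HealthTracker:
    """Hypothetical tracker replacing the manual status dict above."""
    stale_after: float = 300.0   # seconds without data before the feed counts as stale
    max_errors: int = 10         # errors tolerated before the collector is unhealthy
    connected: bool = False
    last_message: Optional[float] = None  # Unix timestamp of the last message
    error_count: int = 0

    def record_message(self, now: Optional[float] = None) -> None:
        self.last_message = time.time() if now is None else now

    def get_health_status(self, now: Optional[float] = None) -> dict:
        """Collect every problem found; healthy means the issue list is empty."""
        now = time.time() if now is None else now
        issues: List[str] = []
        if not self.connected:
            issues.append("not connected")
        if self.last_message is None:
            issues.append("no data received yet")
        elif now - self.last_message > self.stale_after:
            issues.append(f"no data for {now - self.last_message:.0f}s")
        if self.error_count > self.max_errors:
            issues.append(f"{self.error_count} errors")
        return {"is_healthy": not issues, "issues": issues}


tracker = HealthTracker(connected=True)
tracker.record_message(now=1000.0)
print(tracker.get_health_status(now=1100.0))  # {'is_healthy': True, 'issues': []}
print(tracker.get_health_status(now=1400.0))  # {'is_healthy': False, 'issues': ['no data for 400s']}
```

Passing now explicitly keeps the check deterministic in tests; in production it defaults to time.time().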

Support and Contributing

Getting Help

  1. Check Logs: Review the logs in the ./logs/ directory
  2. Status Information: Use get_status() and get_health_status() methods
  3. Debug Mode: Set LOG_LEVEL=DEBUG for detailed logging
  4. Test with Demo: Run examples/collector_demo.py to verify setup

Contributing

The data collector system is designed to be extensible. Contributions are welcome for:

  • New exchange implementations
  • Enhanced monitoring features
  • Performance optimizations
  • Additional data types
  • Integration examples

License

This documentation and the associated code are part of the Crypto Trading Bot Platform project.


For more information, see the main project documentation in /docs/.

Exchange Factory System

Overview

The Exchange Factory system provides a standardized way to create data collectors for different exchanges. It uses the factory pattern to hide exchange-specific construction logic behind a single, consistent interface.

Exchange Registry

The system maintains a registry of supported exchanges and their capabilities:

from data.exchanges import get_supported_exchanges, get_exchange_info

# Get all supported exchanges
exchanges = get_supported_exchanges()
print(f"Supported exchanges: {exchanges}")  # ['okx']

# Get exchange information
okx_info = get_exchange_info('okx')
print(f"OKX pairs: {okx_info['supported_pairs']}")
print(f"OKX data types: {okx_info['supported_data_types']}")
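Conceptually the registry is a lookup table keyed by exchange name. The sketch below is a hypothetical stand-in for data.exchanges: the EXCHANGE_REGISTRY dict and its contents are assumptions, and only the two function names and the supported_pairs / supported_data_types keys come from the example above.

```python
from typing import Dict, List

# Hypothetical registry contents; the real module defines its own entries.
EXCHANGE_REGISTRY: Dict[str, dict] = {
    "okx": {
        "supported_pairs": ["BTC-USDT", "ETH-USDT"],
        "supported_data_types": ["trade", "orderbook", "ticker"],
    },
}


def get_supported_exchanges() -> List[str]:
    """Names of all registered exchanges."""
    return list(EXCHANGE_REGISTRY)


def get_exchange_info(exchange: str) -> dict:
    """Capabilities of one exchange; raises ValueError for unknown names."""
    key = exchange.lower()
    if key not in EXCHANGE_REGISTRY:
        raise ValueError(f"Unsupported exchange: {exchange!r}")
    return EXCHANGE_REGISTRY[key]


print(get_supported_exchanges())                    # ['okx']
print(get_exchange_info("OKX")["supported_pairs"])  # ['BTC-USDT', 'ETH-USDT']
```

With this layout, registering a new exchange means adding one dictionary entry rather than touching callers.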

Factory Configuration

from data.exchanges import ExchangeCollectorConfig, ExchangeFactory
from data.base_collector import DataType

# Create configuration
config = ExchangeCollectorConfig(
    exchange='okx',                                    # Exchange name
    symbol='BTC-USDT',                                # Trading pair
    data_types=[DataType.TRADE, DataType.ORDERBOOK],  # Data types
    auto_restart=True,                                # Auto-restart on failures
    health_check_interval=30.0,                       # Health check interval
    store_raw_data=True,                              # Store raw data for debugging
    custom_params={                                   # Exchange-specific parameters
        'ping_interval': 25.0,
        'max_reconnect_attempts': 5
    }
)

# Validate configuration
is_valid = ExchangeFactory.validate_config(config)
if is_valid:
    collector = ExchangeFactory.create_collector(config)
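The validate-then-create flow can be sketched with a dataclass config and a map from exchange name to collector class. Everything here is a hypothetical simplification: CollectorConfig, StubOKXCollector, SUPPORTED and COLLECTOR_CLASSES stand in for ExchangeCollectorConfig, OKXCollector and the real registry, and the validation rules are assumptions about what validate_config checks.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class CollectorConfig:
    """Hypothetical stand-in for ExchangeCollectorConfig."""
    exchange: str
    symbol: str
    data_types: List[str]
    auto_restart: bool = True
    health_check_interval: float = 30.0
    custom_params: Dict[str, Any] = field(default_factory=dict)


class StubOKXCollector:
    """Hypothetical collector; the real one opens a WebSocket."""
    def __init__(self, symbol: str, data_types: List[str], **options: Any) -> None:
        self.symbol = symbol
        self.data_types = data_types
        self.options = options


# Assumed capabilities and exchange-name -> collector-class mapping.
SUPPORTED = {"okx": {"pairs": {"BTC-USDT", "ETH-USDT"},
                     "data_types": {"trade", "orderbook", "ticker"}}}
COLLECTOR_CLASSES = {"okx": StubOKXCollector}


def validate_config(config: CollectorConfig) -> bool:
    """Reject unknown exchanges, unsupported pairs or data types, bad intervals."""
    caps = SUPPORTED.get(config.exchange)
    if caps is None or config.symbol not in caps["pairs"]:
        return False
    if not config.data_types or not set(config.data_types) <= caps["data_types"]:
        return False
    return config.health_check_interval > 0


def create_collector(config: CollectorConfig) -> StubOKXCollector:
    if not validate_config(config):
        raise ValueError(f"Invalid collector config: {config}")
    cls = COLLECTOR_CLASSES[config.exchange]
    # Shared options plus exchange-specific custom_params go to the constructor
    return cls(
        config.symbol,
        config.data_types,
        auto_restart=config.auto_restart,
        health_check_interval=config.health_check_interval,
        **config.custom_params,
    )


collector = create_collector(
    CollectorConfig("okx", "BTC-USDT", ["trade"], custom_params={"ping_interval": 25.0})
)
print(type(collector).__name__, collector.options["ping_interval"])  # StubOKXCollector 25.0
```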

Exchange Capabilities

Query what each exchange supports:

from data.exchanges import ExchangeFactory

# Get supported trading pairs
okx_pairs = ExchangeFactory.get_supported_pairs('okx')
print(f"OKX supports: {okx_pairs}")

# Get supported data types  
okx_data_types = ExchangeFactory.get_supported_data_types('okx')
print(f"OKX data types: {okx_data_types}")

Convenience Functions

Each exchange provides convenience functions for easy collector creation:

from data.base_collector import DataType
from data.exchanges import create_okx_collector

# Quick OKX collector creation
collector = create_okx_collector(
    symbol='BTC-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
    auto_restart=True
)

OKX Implementation

OKX Collector Features

The OKX collector provides:

  • Real-time Data: Live trades, orderbook, and ticker data
  • Single Pair Focus: Each collector handles one trading pair for better isolation
  • Ping/Pong Management: OKX-specific keepalive using the plain-text ping/pong messages OKX expects
  • Raw Data Storage: Optional storage of raw OKX messages for debugging
  • Connection Resilience: Robust reconnection logic for OKX WebSocket
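OKX drops a public WebSocket connection when nothing has been pushed for 30 seconds; its documented remedy is to send the plain-text string ping after N idle seconds (N < 30), expect pong back, and reconnect if no reply arrives. A minimal sketch of that timing logic, with socket I/O left out, is shown below. KeepAlive, pong_timeout and the method names are hypothetical; the 25-second default mirrors the ping_interval used in the factory configuration example.

```python
from typing import Optional


class KeepAlive:
    """Hypothetical ping/pong timer for an OKX public WebSocket connection."""

    def __init__(self, ping_interval: float = 25.0, pong_timeout: float = 10.0) -> None:
        self.ping_interval = ping_interval  # idle seconds before sending "ping" (keep < 30)
        self.pong_timeout = pong_timeout    # seconds to wait for a reply before reconnecting
        self.last_received = 0.0
        self.ping_sent_at: Optional[float] = None

    def on_message(self, raw: str, now: float) -> bool:
        """Record an inbound frame; any frame proves the link is alive.

        Returns True if the frame was the "pong" keepalive reply, which the
        caller should not pass on to the JSON parser.
        """
        self.last_received = now
        self.ping_sent_at = None
        return raw == "pong"

    def next_action(self, now: float) -> str:
        """Return 'ping', 'reconnect' or 'wait' for the connection loop."""
        if self.ping_sent_at is not None:
            return "reconnect" if now - self.ping_sent_at > self.pong_timeout else "wait"
        if now - self.last_received >= self.ping_interval:
            self.ping_sent_at = now
            return "ping"
        return "wait"


ka = KeepAlive()
ka.on_message('{"arg": {"channel": "trades"}}', now=0.0)
for t in (10.0, 26.0, 30.0, 37.0):
    print(t, ka.next_action(now=t))
# 10.0 wait
# 26.0 ping       (caller sends the text frame "ping")
# 30.0 wait       (reply still pending)
# 37.0 reconnect  (no "pong" within pong_timeout)
```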

OKX Usage Examples

# Direct OKX collector usage
from data.exchanges.okx import OKXCollector
from data.base_collector import DataType

collector = OKXCollector(
    symbol='BTC-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
    auto_restart=True,
    health_check_interval=30.0,
    store_raw_data=True
)

# Factory pattern usage
from data.exchanges import create_okx_collector

collector = create_okx_collector(
    symbol='BTC-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK]
)

# Multiple collectors
from data.exchanges import ExchangeFactory, ExchangeCollectorConfig

configs = [
    ExchangeCollectorConfig('okx', 'BTC-USDT', [DataType.TRADE]),
    ExchangeCollectorConfig('okx', 'ETH-USDT', [DataType.ORDERBOOK])
]

collectors = ExchangeFactory.create_multiple_collectors(configs)

OKX Data Processing

The OKX collector processes three main data types:

Trade Data

# OKX trade message format
{
    "arg": {"channel": "trades", "instId": "BTC-USDT"},
    "data": [{
        "tradeId": "12345678",
        "px": "50000.5",      # Price
        "sz": "0.001",        # Size
        "side": "buy",        # Side (buy/sell)
        "ts": "1697123456789" # Timestamp (ms)
    }]
}
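OKX sends prices and sizes as strings and timestamps as millisecond strings. One way to normalize each entry, using Decimal to avoid float rounding, is sketched below; the Trade dataclass and parse_trades function are hypothetical and may differ from the collector's own standardized trade type.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import List

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


@dataclass(frozen=True)
class Trade:
    """Hypothetical normalized trade record."""
    symbol: str
    trade_id: str
    price: Decimal
    size: Decimal
    side: str
    timestamp: datetime


def parse_trades(message: dict) -> List[Trade]:
    """Convert one OKX trades push (which may hold several trades) into Trade records."""
    symbol = message["arg"]["instId"]
    return [
        Trade(
            symbol=symbol,
            trade_id=item["tradeId"],
            price=Decimal(item["px"]),  # strings go straight to Decimal, no float rounding
            size=Decimal(item["sz"]),
            side=item["side"],
            # ts is milliseconds since the Unix epoch; integer math keeps it exact
            timestamp=EPOCH + timedelta(milliseconds=int(item["ts"])),
        )
        for item in message["data"]
    ]


trade_msg = {
    "arg": {"channel": "trades", "instId": "BTC-USDT"},
    "data": [{"tradeId": "12345678", "px": "50000.5", "sz": "0.001",
              "side": "buy", "ts": "1697123456789"}],
}
trade = parse_trades(trade_msg)[0]
print(trade.price * trade.size)  # 50.0005 (quote-currency value of the trade)
print(trade.timestamp.isoformat())
```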

Orderbook Data

# OKX orderbook message format (books5)
{
    "arg": {"channel": "books5", "instId": "BTC-USDT"},
    "data": [{
        "asks": [["50001.0", "0.5", "0", "3"]],  # [price, size, deprecated (always "0"), number of orders]
        "bids": [["50000.0", "0.8", "0", "2"]],
        "ts": "1697123456789"
    }]
}
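Usually only the first two fields of each level (price and size) matter. A sketch that extracts the top of book, assuming OKX's ordering of bids from highest price down and asks from lowest price up; top_of_book is a hypothetical helper:

```python
from decimal import Decimal
from typing import Dict


def top_of_book(message: dict) -> Dict[str, Decimal]:
    """Best bid/ask, spread and mid price from an OKX books5 push."""
    book = message["data"][0]
    # Levels are [price, size, deprecated "0", order count]; keep price and size
    bid_px, bid_sz = (Decimal(v) for v in book["bids"][0][:2])
    ask_px, ask_sz = (Decimal(v) for v in book["asks"][0][:2])
    return {
        "bid": bid_px,
        "bid_size": bid_sz,
        "ask": ask_px,
        "ask_size": ask_sz,
        "spread": ask_px - bid_px,
        "mid": (ask_px + bid_px) / 2,
    }


book_msg = {
    "arg": {"channel": "books5", "instId": "BTC-USDT"},
    "data": [{
        "asks": [["50001.0", "0.5", "0", "3"]],
        "bids": [["50000.0", "0.8", "0", "2"]],
        "ts": "1697123456789",
    }],
}
book = top_of_book(book_msg)
print(book["spread"], book["mid"])  # 1.0 50000.5
```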

Ticker Data

# OKX ticker message format
{
    "arg": {"channel": "tickers", "instId": "BTC-USDT"},
    "data": [{
        "last": "50000.5",      # Last price
        "askPx": "50001.0",     # Best ask price
        "bidPx": "50000.0",     # Best bid price
        "open24h": "49500.0",   # 24h open
        "high24h": "50500.0",   # 24h high
        "low24h": "49000.0",    # 24h low
        "vol24h": "1234.567",   # 24h volume
        "ts": "1697123456789"
    }]
}
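The 24-hour fields are enough to derive a daily change without another request. A minimal sketch, where ticker_summary and its output keys are hypothetical and the change is measured against open24h:

```python
from decimal import Decimal, ROUND_HALF_UP


def ticker_summary(message: dict) -> dict:
    """Derive spread, 24h change and 24h range from an OKX tickers push."""
    t = message["data"][0]
    last = Decimal(t["last"])
    open_24h = Decimal(t["open24h"])
    change = last - open_24h
    # Percentage change rounded to two decimal places
    pct = (change / open_24h * 100).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    return {
        "last": last,
        "spread": Decimal(t["askPx"]) - Decimal(t["bidPx"]),
        "change_24h": change,
        "change_24h_pct": pct,
        "range_24h": Decimal(t["high24h"]) - Decimal(t["low24h"]),
    }


ticker_msg = {
    "arg": {"channel": "tickers", "instId": "BTC-USDT"},
    "data": [{
        "last": "50000.5", "askPx": "50001.0", "bidPx": "50000.0",
        "open24h": "49500.0", "high24h": "50500.0", "low24h": "49000.0",
        "vol24h": "1234.567", "ts": "1697123456789",
    }],
}
summary = ticker_summary(ticker_msg)
print(summary["change_24h"], summary["change_24h_pct"])  # 500.5 1.01
```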

For comprehensive OKX documentation, see OKX Collector Documentation.