# Data Collector System Documentation
## Overview
The Data Collector System provides a robust, scalable framework for collecting real-time market data from cryptocurrency exchanges. It features comprehensive health monitoring, automatic recovery, centralized management, and a modular exchange-based architecture designed for production trading environments.
This documentation covers the **core collector components**. For the high-level service layer that orchestrates these collectors, see [Data Collection Service](../services/data_collection_service.md).
## Key Features
### 🏗️ **Modular Exchange Architecture**
- **Exchange-Based Organization**: Each exchange has its own implementation folder
- **Factory Pattern**: Easy creation of collectors from any supported exchange
- **Standardized Interface**: Consistent API across all exchange implementations
- **Scalable Design**: Easy addition of new exchanges (Binance, Coinbase, etc.)
### 🔄 **Auto-Recovery & Health Monitoring**
- **Heartbeat System**: Continuous health monitoring with configurable intervals
- **Auto-Restart**: Automatic restart on failures with exponential backoff
- **Connection Recovery**: Robust reconnection logic for network interruptions
- **Data Freshness Monitoring**: Detects stale data and triggers recovery
### 🎛️ **Centralized Management**
- **CollectorManager**: Supervises multiple collectors with coordinated lifecycle
- **Dynamic Control**: Enable/disable collectors at runtime without system restart
- **Global Health Checks**: System-wide monitoring and alerting
- **Graceful Shutdown**: Proper cleanup and resource management
### 📊 **Comprehensive Monitoring**
- **Real-time Status**: Detailed status reporting for all collectors
- **Performance Metrics**: Message counts, uptime, error rates, restart counts
- **Health Analytics**: Connection state, data freshness, error tracking
- **Conditional Logging**: Enhanced logging with configurable verbosity (see [Logging System](logging.md))
- **Multi-Timeframe Support**: Sub-second to daily candle aggregation (1s, 5s, 10s, 15s, 30s, 1m, 5m, 15m, 1h, 4h, 1d)
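Multi-timeframe aggregation comes down to assigning each trade to the candle whose time window contains it. The sketch below shows one way to do that. It assumes timeframes are written as an integer plus a unit suffix (`s`, `m`, `h`, `d`) and that candles are aligned to the Unix epoch in UTC. `timeframe_to_seconds` and `candle_start` are hypothetical helpers, not the aggregator's API.

```python
from datetime import datetime, timezone

# Seconds per unit suffix (assumed format "<int><unit>", e.g. "15s", "4h", "1d")
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def timeframe_to_seconds(timeframe: str) -> int:
    """Convert a timeframe string such as '5m' into seconds."""
    value, unit = timeframe[:-1], timeframe[-1:]
    if unit not in _UNIT_SECONDS or not value.isdigit():
        raise ValueError(f"Unsupported timeframe: {timeframe!r}")
    return int(value) * _UNIT_SECONDS[unit]


def candle_start(ts_ms: int, timeframe: str) -> datetime:
    """UTC start of the epoch-aligned candle that contains ts_ms (epoch milliseconds)."""
    bucket_ms = timeframe_to_seconds(timeframe) * 1000
    start_ms = ts_ms - ts_ms % bucket_ms  # floor to the bucket boundary
    return datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc)
```

For example, a trade stamped `1697123456789` ms (2023-10-12 15:10:56.789 UTC) falls into the `5m` candle starting at 15:10:00 and the `1h` candle starting at 15:00:00.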
### 🛢️ **Database Integration**
- **Repository Pattern**: All database operations use the centralized `database/operations.py` module
- **No Raw SQL**: Clean API through `MarketDataRepository` and `RawTradeRepository` classes
- **Automatic Transaction Management**: Sessions, commits, and rollbacks handled automatically
- **Configurable Duplicate Handling**: `force_update_candles` parameter controls duplicate behavior
- **Real-time Storage**: Completed candles automatically saved to `market_data` table
- **Raw Data Storage**: Optional raw WebSocket data storage via `RawTradeRepository`
- **Custom Error Handling**: Proper exception handling with `DatabaseOperationError`
- **Health Monitoring**: Built-in database health checks and statistics
- **Connection Pooling**: Efficient database connection management through repositories
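The `force_update_candles` switch effectively chooses between "keep the first write" and "last write wins" when the same candle is saved twice. The minimal illustration below uses an in-memory SQLite table. The table layout and `save_candle` are hypothetical and only mirror the idea. They are not the actual `market_data` schema or the `MarketDataRepository` code, whose job is to keep SQL like this away from callers. The `ON CONFLICT ... DO UPDATE` upsert syntax requires SQLite 3.24 or newer.

```python
import sqlite3

# Hypothetical, simplified candle table keyed by (exchange, symbol, timeframe, timestamp)
SCHEMA = """
CREATE TABLE market_data (
    exchange TEXT, symbol TEXT, timeframe TEXT, timestamp TEXT,
    open TEXT, high TEXT, low TEXT, close TEXT, volume TEXT,
    PRIMARY KEY (exchange, symbol, timeframe, timestamp)
)
"""


def save_candle(conn: sqlite3.Connection, candle: dict, force_update_candles: bool = False) -> None:
    """Insert a candle; on a duplicate key either overwrite it or keep the existing row."""
    if force_update_candles:
        conflict = ("ON CONFLICT(exchange, symbol, timeframe, timestamp) DO UPDATE SET "
                    "open=excluded.open, high=excluded.high, low=excluded.low, "
                    "close=excluded.close, volume=excluded.volume")
    else:
        conflict = "ON CONFLICT(exchange, symbol, timeframe, timestamp) DO NOTHING"
    with conn:  # commits on success, rolls back on exception
        conn.execute(
            "INSERT INTO market_data VALUES (:exchange, :symbol, :timeframe, :timestamp, "
            ":open, :high, :low, :close, :volume) " + conflict,
            candle,
        )
```

Prices are stored as text here only to keep decimal strings exact in the illustration.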
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│                      CollectorManager                       │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                 Global Health Monitor                 │  │
│  │  • System-wide health checks                          │  │
│  │  • Auto-restart coordination                          │  │
│  │  • Performance analytics                              │  │
│  └───────────────────────────────────────────────────────┘  │
│                              │                              │
│  ┌─────────────────┐ ┌─────────────────┐ ┌───────────────┐  │
│  │  OKX Collector  │ │Binance Collector│ │    Custom     │  │
│  │                 │ │                 │ │   Collector   │  │
│  │ • Health Monitor│ │ • Health Monitor│ │ • Health Mon  │  │
│  │ • Auto-restart  │ │ • Auto-restart  │ │ • Auto-restart│  │
│  │ • Data Validate │ │ • Data Validate │ │ • Data Valid  │  │
│  └─────────────────┘ └─────────────────┘ └───────────────┘  │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
                      ┌─────────────────┐
                      │   Data Output   │
                      │                 │
                      │ • Callbacks     │
                      │ • Redis Pub/Sub │
                      │ • Database      │
                      └─────────────────┘
```
### Exchange Module Structure
The modular architecture organizes exchange implementations:
```
data/
├── base_collector.py          # Abstract base classes
├── collector_manager.py       # Cross-platform collector manager
├── aggregator.py              # Cross-exchange data aggregation
├── exchanges/                 # Exchange-specific implementations
│   ├── __init__.py            # Main exports and factory
│   ├── registry.py            # Exchange registry and capabilities
│   ├── factory.py             # Factory pattern for collectors
│   ├── okx/                   # OKX implementation
│   │   ├── __init__.py        # OKX exports
│   │   ├── collector.py       # OKXCollector class
│   │   └── websocket.py       # OKXWebSocketClient class
│   └── binance/               # Future: Binance implementation
│       ├── __init__.py
│       ├── collector.py
│       └── websocket.py
```
## Quick Start
### 1. Using Exchange Factory (Recommended)
```python
import asyncio

from data.exchanges import ExchangeFactory, ExchangeCollectorConfig, create_okx_collector
from data.base_collector import DataType
from utils.logger import get_logger


async def main():
    # Create logger for the collector
    logger = get_logger('okx_collector', verbose=True)

    # Method 1: Using factory with configuration
    config = ExchangeCollectorConfig(
        exchange='okx',
        symbol='BTC-USDT',
        data_types=[DataType.TRADE, DataType.ORDERBOOK],
        auto_restart=True,
        health_check_interval=30.0,
        store_raw_data=True
    )
    collector = ExchangeFactory.create_collector(config, logger=logger)

    # Method 2: Using convenience function
    okx_collector = create_okx_collector(
        symbol='ETH-USDT',
        data_types=[DataType.TRADE, DataType.ORDERBOOK],
        logger=logger
    )

    # Add data callback
    def on_trade_data(data_point):
        print(f"Trade: {data_point.symbol} - {data_point.data}")

    collector.add_data_callback(DataType.TRADE, on_trade_data)

    # Start collector
    await collector.start()

    # Let it run
    await asyncio.sleep(60)

    # Stop collector
    await collector.stop()


asyncio.run(main())
```
### 2. Creating Multiple Collectors with Manager
```python
import asyncio

from data.exchanges import ExchangeFactory, ExchangeCollectorConfig
from data.base_collector import DataType
from data.collector_manager import CollectorManager
from utils.logger import get_logger


async def main():
    # Create manager with logging
    manager_logger = get_logger('collector_manager', verbose=True)
    manager = CollectorManager(logger=manager_logger)

    # Create multiple collectors using factory
    configs = [
        ExchangeCollectorConfig('okx', 'BTC-USDT', [DataType.TRADE, DataType.ORDERBOOK]),
        ExchangeCollectorConfig('okx', 'ETH-USDT', [DataType.TRADE]),
        ExchangeCollectorConfig('okx', 'SOL-USDT', [DataType.ORDERBOOK])
    ]

    # Create collectors with individual loggers
    for config in configs:
        collector_logger = get_logger(f'okx_{config.symbol.lower().replace("-", "_")}')
        collector = ExchangeFactory.create_collector(config, logger=collector_logger)
        manager.add_collector(collector)

    print(f"Created {len(manager.list_collectors())} collectors")

    # Start all collectors
    await manager.start()

    # Monitor
    await asyncio.sleep(60)

    # Stop all
    await manager.stop()


asyncio.run(main())
```
## API Reference
### BaseDataCollector
The abstract base class that all data collectors must inherit from.
#### Constructor
```python
def __init__(self,
             exchange_name: str,
             symbols: List[str],
             data_types: Optional[List[DataType]] = None,
             component_name: Optional[str] = None,
             auto_restart: bool = True,
             health_check_interval: float = 30.0,
             logger: Optional[logging.Logger] = None,
             log_errors_only: bool = False)
```
**Parameters:**
- `exchange_name`: Name of the exchange (e.g., 'okx', 'binance')
- `symbols`: List of trading symbols to collect data for
- `data_types`: Types of data to collect (default: [DataType.CANDLE])
- `component_name`: Name for logging (default: based on exchange_name)
- `auto_restart`: Enable automatic restart on failures (default: True)
- `health_check_interval`: Seconds between health checks (default: 30.0)
- `logger`: Logger instance for conditional logging (default: None)
- `log_errors_only`: Only log error-level messages (default: False)
#### Abstract Methods
Must be implemented by subclasses:
```python
async def connect(self) -> bool
async def disconnect(self) -> None
async def subscribe_to_data(self, symbols: List[str], data_types: List[DataType]) -> bool
async def unsubscribe_from_data(self, symbols: List[str], data_types: List[DataType]) -> bool
async def _process_message(self, message: Any) -> Optional[MarketDataPoint]
async def _handle_messages(self) -> None
```
#### Public Methods
```python
async def start(self) -> bool                      # Start the collector
async def stop(self, force: bool = False) -> None  # Stop the collector
async def restart(self) -> bool                    # Restart the collector
# Callback management
def add_data_callback(self, data_type: DataType, callback: Callable) -> None
def remove_data_callback(self, data_type: DataType, callback: Callable) -> None
# Symbol management
def add_symbol(self, symbol: str) -> None
def remove_symbol(self, symbol: str) -> None
# Status and monitoring
def get_status(self) -> Dict[str, Any]
def get_health_status(self) -> Dict[str, Any]
# Data validation
def validate_ohlcv_data(self, data: Dict[str, Any], symbol: str, timeframe: str) -> OHLCVData
```
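Callbacks are registered per `DataType`, so a callback only fires for data points of the type it was registered under. The self-contained sketch below shows that dispatch. `CallbackRegistry` is hypothetical. The trimmed `DataType` and `MarketDataPoint` are local stand-ins for the definitions under Data Types. Accepting both sync and async callbacks is an assumption, not documented `BaseDataCollector` behaviour.

```python
import asyncio
import inspect
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Dict, List


class DataType(Enum):  # trimmed stand-in for the documented enum
    TICKER = "ticker"
    TRADE = "trade"


@dataclass
class MarketDataPoint:  # stand-in mirroring the documented structure
    exchange: str
    symbol: str
    timestamp: datetime
    data_type: DataType
    data: Dict[str, Any]


class CallbackRegistry:
    """Hypothetical sketch: callbacks stored per DataType, fired only for matching data."""

    def __init__(self) -> None:
        self._callbacks: Dict[DataType, List[Callable]] = defaultdict(list)

    def add_data_callback(self, data_type: DataType, callback: Callable) -> None:
        if callback not in self._callbacks[data_type]:  # avoid double registration
            self._callbacks[data_type].append(callback)

    def remove_data_callback(self, data_type: DataType, callback: Callable) -> None:
        if callback in self._callbacks[data_type]:
            self._callbacks[data_type].remove(callback)

    async def notify(self, point: MarketDataPoint) -> None:
        # Iterate over a copy so a callback may unregister itself safely
        for callback in list(self._callbacks[point.data_type]):
            result = callback(point)
            if inspect.isawaitable(result):  # accept both sync and async callbacks
                await result


if __name__ == "__main__":
    registry = CallbackRegistry()
    registry.add_data_callback(DataType.TRADE, lambda p: print(f"Trade: {p.symbol} {p.data}"))
    trade = MarketDataPoint("okx", "BTC-USDT", datetime.now(timezone.utc), DataType.TRADE, {"px": "50000.5"})
    asyncio.run(registry.notify(trade))   # TRADE callback runs
    ticker = MarketDataPoint("okx", "BTC-USDT", datetime.now(timezone.utc), DataType.TICKER, {})
    asyncio.run(registry.notify(ticker))  # no TICKER callback registered, nothing happens
```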
#### Conditional Logging Methods
All collectors support conditional logging (see [Logging System](logging.md) for details):
```python
def _log_debug(self, message: str) -> None # Debug messages (if not errors-only)
def _log_info(self, message: str) -> None # Info messages (if not errors-only)
def _log_warning(self, message: str) -> None # Warning messages (if not errors-only)
def _log_error(self, message: str, exc_info: bool = False) -> None # Always logged
def _log_critical(self, message: str, exc_info: bool = False) -> None # Always logged
```
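The gating rule above can be sketched as a small mixin. Debug, info and warning messages are suppressed in errors-only mode. Error and critical messages always reach the supplied logger. `ConditionalLoggingMixin` is a hypothetical name, and treating a missing logger as "log nothing" is an assumption; the real base class may differ in detail.

```python
import logging
from typing import Optional


class ConditionalLoggingMixin:
    """Hypothetical sketch of the conditional logging helpers."""

    def __init__(self, logger: Optional[logging.Logger] = None, log_errors_only: bool = False):
        self.logger = logger
        self.log_errors_only = log_errors_only

    def _verbose_enabled(self) -> bool:
        # Debug/info/warning need a logger AND must not be in errors-only mode
        return self.logger is not None and not self.log_errors_only

    def _log_debug(self, message: str) -> None:
        if self._verbose_enabled():
            self.logger.debug(message)

    def _log_info(self, message: str) -> None:
        if self._verbose_enabled():
            self.logger.info(message)

    def _log_warning(self, message: str) -> None:
        if self._verbose_enabled():
            self.logger.warning(message)

    def _log_error(self, message: str, exc_info: bool = False) -> None:
        if self.logger is not None:  # logged regardless of log_errors_only
            self.logger.error(message, exc_info=exc_info)

    def _log_critical(self, message: str, exc_info: bool = False) -> None:
        if self.logger is not None:  # logged regardless of log_errors_only
            self.logger.critical(message, exc_info=exc_info)
```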
#### Status Information
The `get_status()` method returns comprehensive status information:
```python
{
    'exchange': 'okx',
    'status': 'running',                      # Current status
    'should_be_running': True,                # Desired state
    'symbols': ['BTC-USDT', 'ETH-USDT'],      # Configured symbols
    'data_types': ['ticker'],                 # Data types being collected
    'auto_restart': True,                     # Auto-restart enabled
    'health': {
        'time_since_heartbeat': 5.2,          # Seconds since last heartbeat
        'time_since_data': 2.1,               # Seconds since last data
        'max_silence_duration': 300.0         # Max allowed silence
    },
    'statistics': {
        'messages_received': 1250,            # Total messages received
        'messages_processed': 1248,           # Successfully processed
        'errors': 2,                          # Error count
        'restarts': 1,                        # Restart count
        'uptime_seconds': 3600.5,             # Current uptime
        'reconnect_attempts': 0,              # Current reconnect attempts
        'last_message_time': '2023-...',      # ISO timestamp
        'connection_uptime': '2023-...',      # Connection start time
        'last_error': 'Connection failed',    # Last error message
        'last_restart_time': '2023-...'       # Last restart time
    }
}
```
#### Health Status
The `get_health_status()` method provides detailed health information:
```python
{
    'is_healthy': True,                 # Overall health status
    'issues': [],                       # List of current issues
    'status': 'running',                # Current collector status
    'last_heartbeat': '2023-...',       # Last heartbeat timestamp
    'last_data_received': '2023-...',   # Last data timestamp
    'should_be_running': True,          # Expected state
    'is_running': True                  # Actual running state
}
```
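The `is_healthy` flag and `issues` list can be derived from the timestamps shown above. The minimal sketch below assumes a collector is unhealthy when it should be running but is not. It also treats a running collector as unhealthy when its heartbeat is older than a threshold, or when no data has arrived within `max_silence_duration` (300 s in the status example). `evaluate_health` and the 60 s `heartbeat_timeout` default are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def evaluate_health(
    now: datetime,
    last_heartbeat: Optional[datetime],
    last_data_received: Optional[datetime],
    should_be_running: bool,
    is_running: bool,
    heartbeat_timeout: float = 60.0,      # assumed threshold, seconds
    max_silence_duration: float = 300.0,  # matches the status example
) -> Dict[str, Any]:
    """Return an is_healthy flag plus human-readable issues (hypothetical helper)."""
    issues = []
    if should_be_running and not is_running:
        issues.append("collector should be running but is stopped")
    if is_running:
        if last_heartbeat is None or (now - last_heartbeat).total_seconds() > heartbeat_timeout:
            issues.append("no recent heartbeat (message loop may be stuck)")
        if last_data_received is None or (now - last_data_received).total_seconds() > max_silence_duration:
            issues.append("stale data: nothing received within max_silence_duration")
    return {"is_healthy": not issues, "issues": issues}


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    print(evaluate_health(now, now - timedelta(seconds=5), now - timedelta(seconds=2), True, True))
```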
### CollectorManager
Manages multiple data collectors with coordinated lifecycle and health monitoring.
#### Constructor
```python
def __init__(self,
             manager_name: str = "collector_manager",
             global_health_check_interval: float = 60.0,
             restart_delay: float = 5.0,
             logger: Optional[logging.Logger] = None,
             log_errors_only: bool = False)
```
**Parameters:**
- `manager_name`: Name for the manager (used in logging)
- `global_health_check_interval`: Seconds between global health checks
- `restart_delay`: Delay between restart attempts, in seconds
- `logger`: Logger instance for conditional logging (default: None)
- `log_errors_only`: Only log error-level messages (default: False)
#### Public Methods
```python
# Collector management
def add_collector(self, collector: BaseDataCollector, config: Optional[CollectorConfig] = None) -> None
def remove_collector(self, collector_name: str) -> bool
def enable_collector(self, collector_name: str) -> bool
def disable_collector(self, collector_name: str) -> bool
# Lifecycle management
async def start(self) -> bool
async def stop(self) -> None
async def restart_collector(self, collector_name: str) -> bool
async def restart_all_collectors(self) -> Dict[str, bool]
# Status and monitoring
def get_status(self) -> Dict[str, Any]
def get_collector_status(self, collector_name: str) -> Optional[Dict[str, Any]]
def list_collectors(self) -> List[str]
def get_running_collectors(self) -> List[str]
def get_failed_collectors(self) -> List[str]
```
### CollectorConfig
Configuration dataclass for collectors:
```python
@dataclass
class CollectorConfig:
    name: str                              # Unique collector name
    exchange: str                          # Exchange name
    symbols: List[str]                     # Trading symbols
    data_types: List[str]                  # Data types to collect
    auto_restart: bool = True              # Enable auto-restart
    health_check_interval: float = 30.0    # Health check interval
    enabled: bool = True                   # Initially enabled
```
### Data Types
#### DataType Enum
```python
class DataType(Enum):
    TICKER = "ticker"        # Price and volume updates
    TRADE = "trade"          # Individual trade executions
    ORDERBOOK = "orderbook"  # Order book snapshots
    CANDLE = "candle"        # OHLCV candle data
    BALANCE = "balance"      # Account balance updates
```
#### MarketDataPoint
Standardized market data structure:
```python
@dataclass
class MarketDataPoint:
    exchange: str            # Exchange name
    symbol: str              # Trading symbol
    timestamp: datetime      # Data timestamp (UTC)
    data_type: DataType      # Type of data
    data: Dict[str, Any]     # Raw data payload
```
#### OHLCVData
OHLCV (candlestick) data structure with validation:
```python
@dataclass
class OHLCVData:
    symbol: str                         # Trading symbol
    timeframe: str                      # Timeframe (1m, 5m, 1h, etc.)
    timestamp: datetime                 # Candle timestamp
    open: Decimal                       # Opening price
    high: Decimal                       # Highest price
    low: Decimal                        # Lowest price
    close: Decimal                      # Closing price
    volume: Decimal                     # Trading volume
    trades_count: Optional[int] = None  # Number of trades
```
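The validation that `validate_ohlcv_data()` applies is not spelled out here. The sketch below assumes a typical rule set: all values finite, prices positive, `high` no lower than `open`, `close` and `low`, `low` no higher than `open` and `close`, and volume non-negative. `check_ohlcv` is a hypothetical helper. It parses string fields into `Decimal`, matching the `OHLCVData` field types, to avoid float rounding.

```python
from decimal import Decimal, InvalidOperation
from typing import Mapping, Tuple


def check_ohlcv(raw: Mapping[str, str]) -> Tuple[Decimal, Decimal, Decimal, Decimal, Decimal]:
    """Parse OHLCV fields as Decimal and reject internally inconsistent candles."""
    try:
        values = [Decimal(str(raw[key])) for key in ("open", "high", "low", "close", "volume")]
    except (KeyError, InvalidOperation) as exc:
        raise ValueError(f"Missing or non-numeric OHLCV field: {exc!r}") from exc
    if not all(v.is_finite() for v in values):
        raise ValueError("OHLCV fields must be finite numbers")
    open_, high, low, close, volume = values
    if min(open_, high, low, close) <= 0:
        raise ValueError("Prices must be positive")
    if high < max(open_, close, low):
        raise ValueError("High must be >= open, close and low")
    if low > min(open_, close):
        raise ValueError("Low must be <= open and close")
    if volume < 0:
        raise ValueError("Volume must be non-negative")
    return open_, high, low, close, volume
```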
## Health Monitoring
### Monitoring Levels
The system provides multi-level health monitoring:
1. **Individual Collector Health**
- Heartbeat monitoring (message loop activity)
- Data freshness (time since last data received)
- Connection state monitoring
- Error rate tracking
2. **Manager-Level Health**
- Global health checks across all collectors
- Coordinated restart management
- System-wide performance metrics
- Resource utilization monitoring
### Health Check Intervals
- **Individual Collector**: Configurable per collector (default: 30s)
- **Global Manager**: Configurable for manager (default: 60s)
- **Heartbeat Updates**: Updated with each message loop iteration
- **Data Freshness**: Updated when data is received
### Auto-Restart Triggers
Collectors are automatically restarted when:
1. **No Heartbeat**: Message loop becomes unresponsive
2. **Stale Data**: No data received within configured timeout
3. **Connection Failures**: WebSocket or API connection lost
4. **Error Status**: Collector enters ERROR or UNHEALTHY state
5. **Manual Trigger**: Explicit restart request
### Failure Handling
```python
# Configure failure handling with conditional logging
from utils.logger import get_logger

logger = get_logger('my_collector', verbose=True)

collector = MyCollector(
    symbols=["BTC-USDT"],
    auto_restart=True,           # Enable auto-restart
    health_check_interval=30.0,  # Check every 30 seconds
    logger=logger,               # Enable logging
    log_errors_only=False        # Log all levels
)

# The collector will automatically:
# 1. Detect failures within 30 seconds
# 2. Attempt reconnection with exponential backoff
# 3. Restart up to 5 times (configurable)
# 4. Log all recovery attempts (if logger provided)
# 5. Report status to manager
```
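The recovery sequence above retries with exponential backoff. The minimal sketch below assumes the 5 s base delay and 5-attempt limit from the `RECONNECT_DELAY` and `MAX_RECONNECT_ATTEMPTS` defaults under Configuration. The doubling factor and the 300 s cap are further assumptions. `reconnect_with_backoff` is hypothetical and accepts any async `connect` callable returning `bool`, such as `BaseDataCollector.connect()`.

```python
import asyncio
from typing import Awaitable, Callable, List


def backoff_delays(base: float = 5.0, factor: float = 2.0,
                   max_attempts: int = 5, cap: float = 300.0) -> List[float]:
    """Waits between consecutive attempts: base, base*factor, ... capped at `cap`."""
    return [min(cap, base * factor ** i) for i in range(max_attempts - 1)]


async def reconnect_with_backoff(connect: Callable[[], Awaitable[bool]],
                                 base: float = 5.0, factor: float = 2.0,
                                 max_attempts: int = 5, cap: float = 300.0) -> bool:
    """Call `connect` up to max_attempts times, sleeping between failed attempts."""
    delays = backoff_delays(base, factor, max_attempts, cap)
    for attempt in range(max_attempts):
        if await connect():
            return True
        if attempt < len(delays):  # no sleep after the final failed attempt
            await asyncio.sleep(delays[attempt])
    return False
```

With the defaults, a collector that never reconnects waits 5, 10, 20 and 40 seconds between its five attempts before giving up.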
## Configuration
### Environment Variables
The system respects these environment variables:
```bash
# Logging configuration (see logging.md for details)
VERBOSE_LOGGING=true # Enable console logging
LOG_TO_CONSOLE=true # Alternative verbose setting
# Health monitoring
DEFAULT_HEALTH_CHECK_INTERVAL=30 # Default health check interval (seconds)
MAX_SILENCE_DURATION=300 # Max time without data (seconds)
MAX_RECONNECT_ATTEMPTS=5 # Maximum reconnection attempts
RECONNECT_DELAY=5 # Delay between reconnect attempts (seconds)
```
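Reading these variables with fallbacks could look like the sketch below. The numeric defaults mirror the values listed above. `HealthSettings` and `load_health_settings` are hypothetical helpers. The boolean parsing rule (`true`, `1`, `yes`) and the "verbose off unless set" default are assumptions, not documented behaviour of the logger.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class HealthSettings:
    verbose_logging: bool
    health_check_interval: float
    max_silence_duration: float
    max_reconnect_attempts: int
    reconnect_delay: float


def _env_bool(env: Mapping[str, str], name: str, default: bool = False) -> bool:
    value = env.get(name)
    return default if value is None else value.strip().lower() in ("true", "1", "yes")


def load_health_settings(env: Optional[Mapping[str, str]] = None) -> HealthSettings:
    """Build settings from environment variables, falling back to the listed defaults."""
    env = os.environ if env is None else env
    return HealthSettings(
        verbose_logging=_env_bool(env, "VERBOSE_LOGGING") or _env_bool(env, "LOG_TO_CONSOLE"),
        health_check_interval=float(env.get("DEFAULT_HEALTH_CHECK_INTERVAL", "30")),
        max_silence_duration=float(env.get("MAX_SILENCE_DURATION", "300")),
        max_reconnect_attempts=int(env.get("MAX_RECONNECT_ATTEMPTS", "5")),
        reconnect_delay=float(env.get("RECONNECT_DELAY", "5")),
    )
```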
### Programmatic Configuration
```python
from utils.logger import get_logger
from data.base_collector import DataType
from data.collector_manager import CollectorManager

# Configure individual collector with conditional logging
# (MyCollector stands for your own BaseDataCollector subclass)
logger = get_logger('custom_collector', verbose=True)
collector = MyCollector(
    exchange_name="custom_exchange",
    symbols=["BTC-USDT", "ETH-USDT"],
    data_types=[DataType.TICKER, DataType.TRADE],
    auto_restart=True,
    health_check_interval=15.0,  # Check every 15 seconds
    logger=logger,               # Enable logging
    log_errors_only=False        # Log all message types
)

# Configure manager with conditional logging
manager_logger = get_logger('production_manager', verbose=False)
manager = CollectorManager(
    manager_name="production_manager",
    global_health_check_interval=30.0,  # Global checks every 30s
    restart_delay=10.0,                 # 10s delay between restarts
    logger=manager_logger,              # Manager logging
    log_errors_only=True                # Only log errors for manager
)

# Configure specific collector in manager
config = CollectorConfig(
    name="primary_okx",
    exchange="okx",
    symbols=["BTC-USDT", "ETH-USDT", "SOL-USDT"],
    data_types=["ticker", "trade", "orderbook"],
    auto_restart=True,
    health_check_interval=20.0,
    enabled=True
)
manager.add_collector(collector, config)
```
## Best Practices
### 1. Collector Implementation with Conditional Logging
```python
from typing import Optional

from data.base_collector import BaseDataCollector, DataType, MarketDataPoint

# RateLimiter, DataValidator, MetricsCollector, create_connection_pool,
# _parse_timestamp and _normalize_data are placeholders for your own helpers.


class ProductionCollector(BaseDataCollector):
    def __init__(self, exchange_name: str, symbols: list, logger=None):
        super().__init__(
            exchange_name=exchange_name,
            symbols=symbols,
            data_types=[DataType.TICKER, DataType.TRADE],
            auto_restart=True,           # Always enable auto-restart
            health_check_interval=30.0,  # Reasonable interval
            logger=logger,               # Pass logger for conditional logging
            log_errors_only=False        # Log all levels
        )

        # Connection management
        self.connection_pool = None
        self.rate_limiter = RateLimiter(100, 60)  # 100 requests per minute

        # Data validation
        self.data_validator = DataValidator()

        # Performance monitoring
        self.metrics = MetricsCollector()

    async def connect(self) -> bool:
        """Implement robust connection logic."""
        try:
            self._log_info("Establishing connection to exchange")

            # Use connection pooling for reliability
            self.connection_pool = await create_connection_pool(
                self.exchange_name,
                max_connections=5,
                retry_attempts=3
            )

            # Test connection
            await self.connection_pool.ping()

            self._log_info("Connection established successfully")
            return True

        except Exception as e:
            self._log_error(f"Connection failed: {e}", exc_info=True)
            return False

    async def _process_message(self, message) -> Optional[MarketDataPoint]:
        """Implement thorough data processing."""
        try:
            # Rate limiting
            await self.rate_limiter.acquire()

            # Data validation
            if not self.data_validator.validate(message):
                self._log_warning("Invalid message format received")
                return None

            # Metrics collection
            self.metrics.increment('messages_processed')

            # Log detailed processing (only if not errors-only)
            self._log_debug(f"Processing message for {message.get('symbol', 'unknown')}")

            # Create standardized data point
            data_point = MarketDataPoint(
                exchange=self.exchange_name,
                symbol=message['symbol'],
                timestamp=self._parse_timestamp(message['timestamp']),
                data_type=DataType.TICKER,
                data=self._normalize_data(message)
            )

            self._log_debug(f"Successfully processed data point for {data_point.symbol}")
            return data_point

        except Exception as e:
            self.metrics.increment('processing_errors')
            self._log_error(f"Message processing failed: {e}", exc_info=True)
            raise  # Let health monitor handle it
```
### 2. Error Handling
```python
# Implement proper error handling with conditional logging
import asyncio

from data.base_collector import BaseDataCollector

# WebSocketError and ValidationError stand in for the exception types raised by
# your WebSocket client and validation layer; self.websocket and
# _check_connection_health() may likewise need to be provided by your subclass.


class RobustCollector(BaseDataCollector):
    async def _handle_messages(self) -> None:
        """Handle messages with proper error management."""
        try:
            # Check connection health
            if not await self._check_connection_health():
                raise ConnectionError("Connection health check failed")

            # Receive message with timeout
            message = await asyncio.wait_for(
                self.websocket.receive(),
                timeout=30.0  # 30 second timeout
            )

            # Process message
            data_point = await self._process_message(message)
            if data_point:
                await self._notify_callbacks(data_point)

        except asyncio.TimeoutError:
            # No data received - let health monitor handle
            self._log_warning("Message receive timeout")
            raise ConnectionError("Message receive timeout")

        except WebSocketError as e:
            # WebSocket specific errors
            self._log_error(f"WebSocket error: {e}")
            raise ConnectionError(f"WebSocket failed: {e}") from e

        except ValidationError as e:
            # Data validation errors - don't restart for these
            self._log_warning(f"Data validation failed: {e}")
            # Continue without raising - these are data issues, not connection issues

        except Exception as e:
            # Unexpected errors - trigger restart
            self._log_error(f"Unexpected error in message handling: {e}", exc_info=True)
            raise
```
### 3. Manager Setup with Hierarchical Logging
```python
import asyncio

from data.base_collector import DataType
from data.collector_manager import CollectorManager
from utils.logger import get_logger

# create_collector, process_ticker_data, process_trade_data and send_alert are
# placeholders for your own factory wrapper, data handlers and alerting hook.
# CollectorConfig is the configuration dataclass described in the API Reference.


async def setup_production_system():
    """Set up the production collector system with conditional logging."""
    # Create manager with its own logger
    manager_logger = get_logger('crypto_trading_system', verbose=True)
    manager = CollectorManager(
        manager_name="crypto_trading_system",
        global_health_check_interval=60.0,  # Check every minute
        restart_delay=30.0,                 # 30s between restarts
        logger=manager_logger,              # Manager logging
        log_errors_only=False               # Log all levels for manager
    )

    # Add primary data sources with individual loggers
    # (only 'okx' is registered today; the others need their own implementations first)
    exchanges = ['okx', 'binance', 'coinbase']
    symbols = ['BTC-USDT', 'ETH-USDT', 'SOL-USDT', 'AVAX-USDT']

    for exchange in exchanges:
        # Create individual logger for each exchange
        exchange_logger = get_logger(f'{exchange}_collector', verbose=True)
        collector = create_collector(
            exchange,
            symbols,
            logger=exchange_logger  # Individual collector logging
        )

        # Configure for production
        config = CollectorConfig(
            name=f"{exchange}_primary",
            exchange=exchange,
            symbols=symbols,
            data_types=["ticker", "trade"],
            auto_restart=True,
            health_check_interval=30.0,
            enabled=True
        )

        # Add callbacks for data processing
        collector.add_data_callback(DataType.TICKER, process_ticker_data)
        collector.add_data_callback(DataType.TRADE, process_trade_data)

        manager.add_collector(collector, config)

    # Start system
    success = await manager.start()
    if not success:
        raise RuntimeError("Failed to start collector system")

    return manager


# Usage
async def main():
    manager = await setup_production_system()

    # Monitor system health
    while True:
        status = manager.get_status()
        if status['statistics']['failed_collectors'] > 0:
            # Alert on failures
            await send_alert(f"Collectors failed: {manager.get_failed_collectors()}")

        # Check again in 5 minutes
        await asyncio.sleep(300)
```
### 4. Monitoring Integration
```python
# Integrate with monitoring systems and conditional logging
import prometheus_client

from data.base_collector import BaseDataCollector, MarketDataPoint

# Define metrics once at module level: prometheus_client registers each metric
# name in its default registry, so creating them inside __init__ would raise
# ValueError ("Duplicated timeseries in CollectorRegistry") for a second collector.
MESSAGES_COUNTER = prometheus_client.Counter(
    'collector_messages_total',
    'Total messages processed',
    ['exchange', 'symbol', 'type']
)
ERRORS_COUNTER = prometheus_client.Counter(
    'collector_errors_total',
    'Total errors',
    ['exchange', 'error_type']
)
UPTIME_GAUGE = prometheus_client.Gauge(
    'collector_uptime_seconds',
    'Collector uptime',
    ['exchange']
)


class MonitoredCollector(BaseDataCollector):
    async def _notify_callbacks(self, data_point: MarketDataPoint):
        """Override to add metrics."""
        # Update message counter
        MESSAGES_COUNTER.labels(
            exchange=data_point.exchange,
            symbol=data_point.symbol,
            type=data_point.data_type.value
        ).inc()

        # Update uptime
        status = self.get_status()
        if status['statistics']['uptime_seconds']:
            UPTIME_GAUGE.labels(
                exchange=self.exchange_name
            ).set(status['statistics']['uptime_seconds'])

        # Log metrics update (only if debug logging enabled)
        self._log_debug(f"Updated metrics for {data_point.symbol}")

        # Call parent
        await super()._notify_callbacks(data_point)

    async def _handle_connection_error(self) -> bool:
        """Override to add error metrics."""
        ERRORS_COUNTER.labels(
            exchange=self.exchange_name,
            error_type='connection'
        ).inc()

        # Always log connection errors
        self._log_error("Connection error occurred")
        return await super()._handle_connection_error()
```
## Troubleshooting
### Common Issues
#### 1. Collector Won't Start
**Symptoms**: `start()` returns `False`, status shows `ERROR`
**Solutions**:
```python
# Check connection details with debugging
from utils.logger import get_logger

debug_logger = get_logger('debug_collector', verbose=True)
collector = MyCollector(symbols=["BTC-USDT"], logger=debug_logger)

success = await collector.start()
if not success:
    status = collector.get_status()
    print(f"Error: {status['statistics']['last_error']}")
    # Common fixes:
    # - Verify API credentials
    # - Check network connectivity
    # - Validate symbol names
    # - Review exchange-specific requirements
```
#### 2. Frequent Restarts
**Symptoms**: High restart count, intermittent data
**Solutions**:
```python
# Adjust health check intervals and enable detailed logging
logger = get_logger('troubleshoot_collector', verbose=True)
collector = MyCollector(
    symbols=["BTC-USDT"],
    health_check_interval=60.0,  # Increase interval
    auto_restart=True,
    logger=logger,               # Enable detailed logging
    log_errors_only=False        # Log all message types
)

# Check for:
# - Network instability
# - Exchange rate limiting
# - Invalid message formats
# - Resource constraints
```
#### 3. No Data Received
**Symptoms**: Collector running but no callbacks triggered
**Solutions**:
```python
# Check data flow with debug logging
logger = get_logger('data_debug', verbose=True)
collector = MyCollector(symbols=["BTC-USDT"], logger=logger)


def debug_callback(data_point):
    print(f"Received: {data_point}")


collector.add_data_callback(DataType.TICKER, debug_callback)

# Verify:
# - Callback registration
# - Symbol subscription
# - Message parsing logic
# - Exchange data availability
```
#### 4. Memory Leaks
**Symptoms**: Increasing memory usage over time
**Solutions**:
```python
# Implement proper cleanup with logging
from data.base_collector import BaseDataCollector


class CleanCollector(BaseDataCollector):
    async def disconnect(self):
        """Ensure proper cleanup."""
        self._log_info("Starting cleanup process")

        # Clear buffers
        if hasattr(self, 'message_buffer'):
            self.message_buffer.clear()
            self._log_debug("Cleared message buffer")

        # Close connections
        if getattr(self, 'websocket', None):
            await self.websocket.close()
            self.websocket = None
            self._log_debug("Closed WebSocket connection")

        # Clear callbacks. Caution: if disconnect() also runs during an
        # auto-restart, this drops registered subscribers; in that case
        # clear them only on final shutdown.
        for callback_list in self._data_callbacks.values():
            callback_list.clear()
        self._log_debug("Cleared callbacks")

        await super().disconnect()
        self._log_info("Cleanup completed")
```
## Exchange Factory System
### Overview
The Exchange Factory system provides a standardized way to create data collectors for different exchanges. It implements the factory pattern to abstract the creation logic and provides a consistent interface across all exchanges.
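At its core, the pattern is a mapping from exchange name to collector class, so callers can create collectors without importing exchange-specific modules. The self-contained sketch below shows the idea. `register_exchange`, `create_collector`, `supported_exchanges` and `DummyCollector` are hypothetical names. They are not the `ExchangeFactory` or registry API in `data/exchanges/`.

```python
from typing import Any, Callable, Dict, List

_REGISTRY: Dict[str, type] = {}


def register_exchange(name: str) -> Callable[[type], type]:
    """Class decorator that records a collector class under an exchange name."""
    def decorator(cls: type) -> type:
        if name in _REGISTRY:
            raise ValueError(f"Exchange already registered: {name}")
        _REGISTRY[name] = cls
        return cls
    return decorator


def supported_exchanges() -> List[str]:
    return sorted(_REGISTRY)


def create_collector(exchange: str, **kwargs: Any) -> Any:
    """Instantiate the collector class registered for `exchange` (case-insensitive)."""
    try:
        collector_cls = _REGISTRY[exchange.lower()]
    except KeyError:
        raise ValueError(
            f"Unsupported exchange {exchange!r}; supported: {supported_exchanges()}"
        ) from None
    return collector_cls(**kwargs)


@register_exchange("okx")
class DummyCollector:
    """Stand-in for a real exchange collector."""

    def __init__(self, symbol: str, data_types: List[str]):
        self.symbol = symbol
        self.data_types = data_types
```

With this design, adding a new exchange means registering one class; the creation code that callers use does not change.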
### Exchange Registry
The system maintains a registry of supported exchanges and their capabilities:
```python
from data.exchanges import get_supported_exchanges, get_exchange_info

# Get all supported exchanges
exchanges = get_supported_exchanges()
print(f"Supported exchanges: {exchanges}")  # ['okx']

# Get exchange information
okx_info = get_exchange_info('okx')
print(f"OKX pairs: {okx_info['supported_pairs']}")
print(f"OKX data types: {okx_info['supported_data_types']}")
```
### Factory Configuration
```python
from data.exchanges import ExchangeCollectorConfig, ExchangeFactory
from data.base_collector import DataType
from utils.logger import get_logger

# Create configuration with conditional logging
logger = get_logger('factory_collector', verbose=True)
config = ExchangeCollectorConfig(
    exchange='okx',                                   # Exchange name
    symbol='BTC-USDT',                                # Trading pair
    data_types=[DataType.TRADE, DataType.ORDERBOOK],  # Data types
    auto_restart=True,                                # Auto-restart on failures
    health_check_interval=30.0,                       # Health check interval
    store_raw_data=True,                              # Store raw data for debugging
    custom_params={                                   # Exchange-specific parameters
        'ping_interval': 25.0,
        'max_reconnect_attempts': 5
    }
)

# Validate configuration
is_valid = ExchangeFactory.validate_config(config)
if is_valid:
    collector = ExchangeFactory.create_collector(config, logger=logger)
```
### Exchange Capabilities
Query what each exchange supports:
```python
from data.exchanges import ExchangeFactory
# Get supported trading pairs
okx_pairs = ExchangeFactory.get_supported_pairs('okx')
print(f"OKX supports: {okx_pairs}")
# Get supported data types
okx_data_types = ExchangeFactory.get_supported_data_types('okx')
print(f"OKX data types: {okx_data_types}")
```
### Convenience Functions
Each exchange provides convenience functions for easy collector creation:
```python
from data.exchanges import create_okx_collector
from data.base_collector import DataType
from utils.logger import get_logger

# Quick OKX collector creation with logging
logger = get_logger('okx_btc_usdt', verbose=True)
collector = create_okx_collector(
    symbol='BTC-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
    auto_restart=True,
    logger=logger
)
```
## OKX Implementation
### OKX Collector Features
The OKX collector provides:
- **Real-time Data**: Live trades, orderbook, and ticker data
- **Single Pair Focus**: Each collector handles one trading pair for better isolation
- **Ping/Pong Management**: OKX-specific keepalive mechanism with proper format
- **Raw Data Storage**: Optional storage of raw OKX messages for debugging
- **Connection Resilience**: Robust reconnection logic for OKX WebSocket
- **Conditional Logging**: Full integration with the logging system
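OKX's WebSocket documentation says a connection is dropped when no data has been pushed for 30 seconds. It recommends sending the plain string `ping` after N < 30 seconds of silence, expecting `pong` in reply, and reconnecting if that reply does not arrive within N seconds. The sketch below implements that rule. It assumes a 25 s interval (the `ping_interval` used in the factory configuration example) and an injectable async `send` function. `OkxKeepalive` is hypothetical, not the code in `websocket.py`.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


class OkxKeepalive:
    """Hypothetical helper implementing OKX's text 'ping'/'pong' keepalive rule."""

    def __init__(self, send: Callable[[str], Awaitable[None]],
                 ping_interval: float = 25.0,
                 clock: Callable[[], float] = time.monotonic):
        self._send = send                   # async function that writes a text frame
        self._clock = clock                 # injectable for testing
        self.ping_interval = ping_interval  # keep below OKX's 30 s idle limit
        self.last_message = clock()
        self.ping_sent_at: Optional[float] = None

    def on_message(self, raw: str) -> bool:
        """Record an inbound frame; return True if it was the 'pong' reply."""
        self.last_message = self._clock()
        self.ping_sent_at = None  # any traffic proves the connection is alive
        return raw == "pong"

    async def tick(self) -> None:
        """Send 'ping' after a quiet period; raise if the pong does not arrive in time."""
        now = self._clock()
        if self.ping_sent_at is not None:
            if now - self.ping_sent_at > self.ping_interval:
                raise ConnectionError("No pong received from OKX; reconnect required")
            return
        if now - self.last_message >= self.ping_interval:
            await self._send("ping")
            self.ping_sent_at = now

    async def run(self, stop: asyncio.Event, check_every: float = 1.0) -> None:
        """Call tick() periodically until `stop` is set."""
        while not stop.is_set():
            await self.tick()
            await asyncio.sleep(check_every)
```

In a collector, `on_message()` would be called for every received frame, and a `pong` return value tells the caller to skip market-data parsing for that frame.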
### OKX Usage Examples
```python
from data.base_collector import DataType
from data.exchanges import ExchangeFactory, ExchangeCollectorConfig, create_okx_collector
from data.exchanges.okx import OKXCollector
from utils.logger import get_logger

# Direct OKX collector usage with conditional logging
logger = get_logger('okx_collector', verbose=True)
collector = OKXCollector(
    symbol='BTC-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
    auto_restart=True,
    health_check_interval=30.0,
    store_raw_data=True,
    logger=logger,          # Enable logging
    log_errors_only=False   # Log all levels
)

# Factory pattern usage with error-only logging
error_logger = get_logger('okx_critical', verbose=False)
collector = create_okx_collector(
    symbol='BTC-USDT',
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
    logger=error_logger,
    log_errors_only=True    # Only log errors
)

# Multiple collectors with different logging strategies
configs = [
    ExchangeCollectorConfig('okx', 'BTC-USDT', [DataType.TRADE]),
    ExchangeCollectorConfig('okx', 'ETH-USDT', [DataType.ORDERBOOK])
]

collectors = []
for config in configs:
    # Different logging for each collector
    if config.symbol == 'BTC-USDT':
        logger = get_logger('okx_btc', verbose=True)  # Full logging
    else:
        logger = get_logger('okx_eth', verbose=False, log_errors_only=True)  # Errors only

    collector = ExchangeFactory.create_collector(config, logger=logger)
    collectors.append(collector)
```
2025-06-03 12:08:43 +08:00
### OKX Data Processing
The OKX collector processes three main data types:
#### Trade Data
```python
# OKX trade message format
{
    "arg": {"channel": "trades", "instId": "BTC-USDT"},
    "data": [{
        "tradeId": "12345678",
        "px": "50000.5",       # Price
        "sz": "0.001",         # Size
        "side": "buy",         # Side (buy/sell)
        "ts": "1697123456789"  # Timestamp (ms)
    }]
}
```
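OKX sends prices and sizes as strings and timestamps as millisecond epoch strings. A minimal sketch of turning the trade payload above into typed records, using `Decimal` so prices keep their exact string value instead of binary floating-point approximations; the `Trade` dataclass and `parse_okx_trades` function are illustrative names, not the collector's actual internals.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Any, Dict, List

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


@dataclass(frozen=True)
class Trade:
    """Hypothetical normalized trade record."""
    symbol: str
    trade_id: str
    price: Decimal
    size: Decimal
    side: str
    timestamp: datetime


def parse_okx_trades(message: Dict[str, Any]) -> List[Trade]:
    """Convert one OKX 'trades' push message into Trade records."""
    symbol = message["arg"]["instId"]
    trades = []
    for item in message.get("data", []):
        trades.append(Trade(
            symbol=symbol,
            trade_id=item["tradeId"],
            price=Decimal(item["px"]),  # exact decimal, no float rounding
            size=Decimal(item["sz"]),
            side=item["side"],
            # 'ts' is milliseconds since the Unix epoch; timedelta keeps it exact
            timestamp=EPOCH + timedelta(milliseconds=int(item["ts"])),
        ))
    return trades
```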
#### Orderbook Data
```python
# OKX orderbook message format (books5)
{
    "arg": {"channel": "books5", "instId": "BTC-USDT"},
    "data": [{
        "asks": [["50001.0", "0.5", "0", "3"]],  # [price, size, deprecated (always "0"), number of orders]
        "bids": [["50000.0", "0.8", "0", "2"]],
        "ts": "1697123456789"
    }]
}
```
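Each `books5` push carries up to five levels per side as four-element string arrays. As an illustration, best bid, best ask, spread and mid price can be derived from one push like this; `top_of_book` is a hypothetical helper, and it takes the min/max price rather than assuming the levels arrive sorted.

```python
from decimal import Decimal
from typing import Any, Dict, List, Optional


def top_of_book(message: Dict[str, Any]) -> Optional[Dict[str, Decimal]]:
    """Best bid/ask, spread and mid price from one OKX books5 push (hypothetical helper)."""
    data = message.get("data") or []
    if not data:
        return None
    book = data[0]
    # Each level is [price, size, deprecated "0", order_count]; only price is used here
    asks: List[Decimal] = [Decimal(level[0]) for level in book.get("asks", [])]
    bids: List[Decimal] = [Decimal(level[0]) for level in book.get("bids", [])]
    if not asks or not bids:
        return None  # one side is empty, so there is no meaningful spread
    best_ask, best_bid = min(asks), max(bids)
    return {
        "best_bid": best_bid,
        "best_ask": best_ask,
        "spread": best_ask - best_bid,
        "mid": (best_ask + best_bid) / 2,
    }
```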
#### Ticker Data
```python
# OKX ticker message format
{
    "arg": {"channel": "tickers", "instId": "BTC-USDT"},
    "data": [{
        "last": "50000.5",     # Last price
        "askPx": "50001.0",    # Best ask price
        "bidPx": "50000.0",    # Best bid price
        "open24h": "49500.0",  # 24h open
        "high24h": "50500.0",  # 24h high
        "low24h": "49000.0",   # 24h low
        "vol24h": "1234.567",  # 24h volume
        "ts": "1697123456789"
    }]
}
```
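All three channels share the same `arg` / `data` envelope, so a collector can route incoming frames on `arg.channel`. OKX also sends frames that carry no market data: subscription acknowledgements and errors with an `event` field, and the plain-text `pong` keepalive reply. A minimal routing sketch, assuming JSON text frames; `route_okx_message` and the returned labels are hypothetical and are not the project's `DataType` values.

```python
import json
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical mapping from OKX channel name to an internal label
CHANNEL_LABELS = {"trades": "trade", "books5": "orderbook", "tickers": "ticker"}


def route_okx_message(raw: str) -> Optional[Tuple[str, str, List[Dict[str, Any]]]]:
    """Return (label, instId, data) for market-data frames, or None for control frames."""
    if raw == "pong":
        return None  # keepalive reply, not market data
    message = json.loads(raw)
    if "event" in message:
        # e.g. {"event": "subscribe", ...} or {"event": "error", "code": ..., "msg": ...}
        if message["event"] == "error":
            raise RuntimeError(f"OKX error {message.get('code')}: {message.get('msg')}")
        return None
    arg = message.get("arg", {})
    label = CHANNEL_LABELS.get(arg.get("channel"))
    if label is None or "data" not in message:
        return None  # a channel this sketch does not handle
    return label, arg.get("instId", ""), message["data"]
```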
For comprehensive OKX documentation, see [OKX Collector Documentation](okx_collector.md).
## Integration Examples
### Django Integration
```python
# Django management command with conditional logging
import asyncio

from django.core.management.base import BaseCommand

from data import CollectorManager
from utils.logger import get_logger


class Command(BaseCommand):
    help = 'Start crypto data collectors'

    def handle(self, *args, **options):
        async def run_collectors():
            # Create manager with logging
            manager_logger = get_logger('django_collectors', verbose=True)
            manager = CollectorManager("django_collectors", logger=manager_logger)

            # Add collectors with individual loggers
            from myapp.collectors import OKXCollector, BinanceCollector

            okx_logger = get_logger('django_okx', verbose=True)
            binance_logger = get_logger('django_binance', verbose=True, log_errors_only=True)
            # OKXCollector handles a single trading pair per instance
            manager.add_collector(OKXCollector(symbol='BTC-USDT', logger=okx_logger))
            manager.add_collector(BinanceCollector(['ETH-USDT'], logger=binance_logger))

            # Start system
            await manager.start()

            # Keep running; on Ctrl+C asyncio.run() cancels this task,
            # so the finally block still stops the manager cleanly
            try:
                while True:
                    await asyncio.sleep(60)
                    status = manager.get_status()
                    self.stdout.write(f"Status: {status['statistics']}")
            finally:
                await manager.stop()

        asyncio.run(run_collectors())
```
### FastAPI Integration
```python
# FastAPI application with conditional logging
from fastapi import FastAPI

from data import CollectorManager
from utils.logger import get_logger

app = FastAPI()
manager = None


@app.on_event("startup")
async def startup_event():
    global manager

    # Create manager with logging
    manager_logger = get_logger('fastapi_collectors', verbose=True)
    manager = CollectorManager("fastapi_collectors", logger=manager_logger)

    # Add one collector per trading pair, with error-only logging for production
    from collectors import OKXCollector

    collector_logger = get_logger('fastapi_okx', verbose=False, log_errors_only=True)
    for symbol in ['BTC-USDT', 'ETH-USDT']:
        manager.add_collector(OKXCollector(symbol=symbol, logger=collector_logger))

    # Start in background
    await manager.start()


@app.on_event("shutdown")
async def shutdown_event():
    global manager
    if manager:
        await manager.stop()


@app.get("/collector/status")
async def get_collector_status():
    return manager.get_status()


@app.post("/collector/{name}/restart")
async def restart_collector(name: str):
    success = await manager.restart_collector(name)
    return {"success": success}
```
## Migration Guide
### From Manual Connection Management
**Before** (manual management):
```python
import asyncio


class OldCollector:
    def __init__(self):
        self.websocket = None
        self.running = False

    async def start(self):
        self.running = True
        while self.running:
            try:
                self.websocket = await connect()
                await self.listen()
            except Exception as e:
                print(f"Error: {e}")
                await asyncio.sleep(5)  # Manual retry
```
**After** (with BaseDataCollector and conditional logging):
```python
from utils.logger import get_logger


class NewCollector(BaseDataCollector):
    def __init__(self):
        logger = get_logger('new_collector', verbose=True)
        super().__init__(
            "exchange",
            ["BTC-USDT"],
            logger=logger,
            log_errors_only=False
        )
        # Auto-restart and health monitoring included

    async def connect(self) -> bool:
        self._log_info("Connecting to exchange")
        self.websocket = await connect()
        self._log_info("Connection established")
        return True

    async def _handle_messages(self):
        message = await self.websocket.receive()
        self._log_debug(f"Received message: {message}")
        # Error handling and restart logic automatic
```
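The old collector above retries every 5 seconds indefinitely. The overview describes `BaseDataCollector` restarts as using exponential backoff, but this section does not document its parameters, so the following is only a sketch of such a schedule, assuming a base delay, a cap, and an attempt limit in the spirit of the `max_reconnect_attempts` custom parameter shown earlier. `backoff_delays` and `reconnect_with_backoff` are hypothetical names.

```python
import asyncio
import random
from typing import Awaitable, Callable, Iterator


def backoff_delays(base: float = 1.0, cap: float = 60.0, max_attempts: int = 5,
                   jitter: bool = False) -> Iterator[float]:
    """Yield up to max_attempts delays: base, 2*base, 4*base, ... capped at `cap`."""
    for attempt in range(max_attempts):
        delay = min(cap, base * (2 ** attempt))
        # Optional "full jitter": pick uniformly in [0, delay] to spread out reconnects
        yield random.uniform(0, delay) if jitter else delay


async def reconnect_with_backoff(connect: Callable[[], Awaitable[bool]],
                                 **backoff_kwargs) -> bool:
    """Try `connect` once, then retry after each backoff delay until it returns True."""
    if await connect():
        return True
    for delay in backoff_delays(**backoff_kwargs):
        await asyncio.sleep(delay)
        if await connect():
            return True
    return False  # schedule exhausted; caller decides whether to give up or alert
```

Doubling the delay keeps a flapping exchange endpoint from being hammered, while the cap bounds how long a collector stays offline between attempts.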
### From Basic Monitoring
**Before** (basic monitoring):
```python
import time

# Manual status tracking
status = {
    'connected': False,
    'last_message': None,  # time.time() of the last message; None until the first one
    'error_count': 0
}

# Manual health checks
async def health_check():
    last = status['last_message']
    if last is not None and time.time() - last > 300:
        print("No data for 5 minutes!")
```
**After** (comprehensive monitoring with conditional logging):
```python
from utils.logger import get_logger

# Automatic health monitoring with logging
logger = get_logger('monitored_collector', verbose=True)
collector = MyCollector(["BTC-USDT"], logger=logger)

# Rich status information
status = collector.get_status()
health = collector.get_health_status()

# Automatic alerts and recovery with logging
if not health['is_healthy']:
    print(f"Issues: {health['issues']}")
    # Auto-restart already triggered and logged
```
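For comparison with the manual 5-minute check above, here is one way a data-freshness check could produce a result shaped like the `is_healthy` / `issues` dictionary read above. It is a sketch only: the thresholds, the `check_freshness` function, and the issue strings are assumptions, not what `get_health_status()` actually returns.

```python
import time
from typing import Any, Dict, Optional


def check_freshness(last_message_time: Optional[float],
                    max_silence: float = 300.0,
                    is_connected: bool = True,
                    now: Optional[float] = None) -> Dict[str, Any]:
    """Hypothetical health check; timestamps are time.monotonic() seconds."""
    now = time.monotonic() if now is None else now
    issues = []
    if not is_connected:
        issues.append("not connected")
    if last_message_time is None:
        issues.append("no data received yet")
    elif now - last_message_time > max_silence:
        issues.append(f"no data for {now - last_message_time:.0f}s (limit {max_silence:.0f}s)")
    return {"is_healthy": not issues, "issues": issues}
```

A production check would likely add a start-up grace period so that a freshly started collector is not reported unhealthy before its first message arrives.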
## Related Documentation
- [Data Collection Service](../services/data_collection_service.md) - High-level service orchestration
- [Logging System](logging.md) - Conditional logging implementation
- [Database Operations](../database/operations.md) - Database integration patterns
- [Monitoring Guide](../monitoring/README.md) - System monitoring and alerting
---
## Support and Contributing
### Getting Help
1. **Check Logs**: Review logs in `./logs/` directory (see [Logging System](logging.md))
2. **Status Information**: Use `get_status()` and `get_health_status()` methods
3. **Debug Mode**: Enable debug-level output through the conditional logging system
4. **Test with Demo**: Run `examples/collector_demo.py` to verify setup
### Contributing
The data collector system is designed to be extensible. Contributions are welcome for:
- New exchange implementations
- Enhanced monitoring features
- Performance optimizations
- Additional data types
- Integration examples
- Logging system improvements
### License
This documentation and the associated code are part of the Crypto Trading Bot Platform project.

---

*For more information, see the main project documentation in `/docs/`.*