# File: TCPDashboard/data/collector/collector_factory.py
"""
Collector Factory for data collection service.
This module handles the creation of data collectors with proper configuration
and error handling, separating collector creation logic from the main service.
"""
from typing import Dict, Any, List, Optional
from ..exchanges.factory import ExchangeFactory, ExchangeCollectorConfig
from .base_collector import DataType
class CollectorFactory:
    """Factory for creating and configuring data collectors.

    Builds an ``ExchangeCollectorConfig`` from per-pair configuration
    dictionaries and delegates construction to ``ExchangeFactory``. Failures
    are logged through the optional ``logger``. They are reported to the
    caller as ``None`` (single collector) or as a shorter result list (batch)
    rather than raised, so one bad trading pair does not prevent the others
    from being created.
    """

    # Single source of truth for supported features. Pair validation reads
    # these through the ``get_supported_*`` methods, so validation and the
    # public capability queries cannot drift apart.
    SUPPORTED_EXCHANGES = ('okx', 'binance')  # Add more as they become available
    SUPPORTED_DATA_TYPES = ('trade', 'orderbook', 'ticker', 'candle')
    # OKX supports second-level timeframes for real-time candle aggregation
    SUPPORTED_TIMEFRAMES = (
        '1s', '5s', '1m', '3m', '5m', '15m', '30m',
        '1h', '2h', '4h', '6h', '8h', '12h', '1d',
    )
    # Fallbacks used when a pair specifies no (valid) data types / timeframes.
    DEFAULT_DATA_TYPES = ('trade',)
    DEFAULT_TIMEFRAMES = ('1m', '5m')

    def __init__(self, logger=None):
        """
        Initialize the collector factory.

        Args:
            logger: Optional logger instance. When ``None`` the factory
                performs no logging at all.
        """
        self.logger = logger

    async def create_collector(
        self,
        exchange_name: str,
        pair_config: Dict[str, Any],
        data_collection_config: Dict[str, Any],
        database_config: Dict[str, Any]
    ) -> Optional[Any]:
        """
        Create a single data collector for a trading pair.

        The pair configuration is validated before anything else is read
        from it. A missing or malformed ``symbol`` is therefore reported by
        the validator instead of surfacing as a bare ``KeyError``. Each
        failure path logs its reason exactly once.

        Args:
            exchange_name: Name of the exchange (e.g., 'okx')
            pair_config: Configuration for the trading pair (requires 'symbol')
            data_collection_config: Data collection settings
            database_config: Database configuration settings

        Returns:
            Created collector instance or None if creation failed
        """
        symbol = 'unknown'
        try:
            validated_config = self._validate_pair_config(pair_config)
            if not validated_config:
                return None  # the validator has already logged the reason

            symbol = validated_config['symbol']
            timeframes = validated_config['timeframes']
            data_types = [DataType(dt) for dt in validated_config['data_types']]

            collector_config = self._create_collector_config(
                exchange_name=exchange_name,
                symbol=symbol,
                data_types=data_types,
                timeframes=timeframes,
                data_collection_config=data_collection_config,
                database_config=database_config
            )
            collector = ExchangeFactory.create_collector(collector_config)
        except Exception as e:
            if self.logger:
                self.logger.error(f"Error creating collector for {symbol}: {e}", exc_info=True)
            return None

        if not collector:
            if self.logger:
                # Not inside an ``except`` block: there is no traceback to attach.
                self.logger.error(f"Failed to create collector for {symbol}")
            return None

        if self.logger:
            self.logger.info(f"Created collector: {symbol} [{'/'.join(timeframes)}]")
        return collector

    def _filter_supported(
        self,
        values: Any,
        supported: List[str],
        default: Any,
        label: str,
        symbol: str
    ) -> List[str]:
        """
        Keep only the supported entries of ``values``.

        Order is preserved and duplicates are dropped. ``None`` means "use
        ``default``". A bare string is treated as a single entry rather than
        being iterated character by character. Each unsupported entry logs a
        warning. If nothing valid remains, a copy of ``default`` is returned.

        Args:
            values: Requested entries (iterable, single string, or None)
            supported: Allowed entries
            default: Fallback entries
            label: Human-readable kind used in warnings (e.g. 'timeframe')
            symbol: Trading pair symbol used in warnings

        Returns:
            List of accepted entries (never empty)
        """
        if values is None:
            values = default
        elif isinstance(values, str):
            values = [values]

        accepted: List[str] = []
        for value in values:
            if value in accepted:
                continue  # duplicate request; keep the first occurrence only
            if value in supported:
                accepted.append(value)
            elif self.logger:
                self.logger.warning(f"Invalid {label} '{value}' for {symbol}, skipping")
        return accepted or list(default)

    def _validate_pair_config(self, pair_config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Validate trading pair configuration.

        Args:
            pair_config: Raw trading pair configuration

        Returns:
            Dict with keys 'symbol', 'data_types', 'timeframes', 'enabled' and
            'channels', or None if the configuration is invalid (the reason
            is logged).
        """
        try:
            if 'symbol' not in pair_config:
                if self.logger:
                    self.logger.error("Trading pair missing required 'symbol' field")
                return None

            symbol = pair_config['symbol']
            # partition() yields (base, '-', quote); every part must be
            # non-empty, which rejects 'BTCUSDT', 'BTC-', '-USDT' and '-'.
            if not isinstance(symbol, str) or not all(symbol.partition('-')):
                if self.logger:
                    self.logger.error(f"Invalid symbol format: {symbol}. Expected format: 'BASE-QUOTE'")
                return None

            return {
                'symbol': symbol,
                'data_types': self._filter_supported(
                    pair_config.get('data_types'),
                    self.get_supported_data_types(),
                    self.DEFAULT_DATA_TYPES,
                    'data type',
                    symbol
                ),
                'timeframes': self._filter_supported(
                    pair_config.get('timeframes'),
                    self.get_supported_timeframes(),
                    self.DEFAULT_TIMEFRAMES,
                    'timeframe',
                    symbol
                ),
                'enabled': pair_config.get('enabled', True),
                'channels': pair_config.get('channels', {})
            }
        except Exception as e:
            if self.logger:
                self.logger.error(f"Error validating pair config: {e}", exc_info=True)
            return None

    def _create_collector_config(
        self,
        exchange_name: str,
        symbol: str,
        data_types: List['DataType'],
        timeframes: List[str],
        data_collection_config: Dict[str, Any],
        database_config: Dict[str, Any]
    ) -> 'ExchangeCollectorConfig':
        """
        Create exchange collector configuration.

        Args:
            exchange_name: Name of the exchange
            symbol: Trading pair symbol
            data_types: List of data types to collect
            timeframes: List of timeframes for candle data
            data_collection_config: Data collection settings (None -> defaults)
            database_config: Database configuration (None -> defaults)

        Returns:
            Configured ExchangeCollectorConfig instance
        """
        # Tolerate sections that are present but null in the config file.
        data_collection_config = data_collection_config or {}
        database_config = database_config or {}

        # e.g. 'okx' + 'BTC-USDT' -> 'okx_collector_btc_usdt'
        component_name = f"{exchange_name}_collector_{symbol.replace('-', '_').lower()}"

        # Build custom parameters - only include parameters the collector accepts
        custom_params = {
            'component_name': component_name,
            'logger': self.logger,
            'log_errors_only': True,  # Clean logging - only errors and essential events
            'force_update_candles': database_config.get('force_update_candles', False),
        }

        return ExchangeCollectorConfig(
            exchange=exchange_name,
            symbol=symbol,
            data_types=data_types,
            timeframes=timeframes,
            auto_restart=data_collection_config.get('auto_restart', True),
            health_check_interval=data_collection_config.get('health_check_interval', 120.0),
            store_raw_data=data_collection_config.get('store_raw_data', True),
            custom_params=custom_params
        )

    async def create_collectors_from_config(
        self,
        config: Dict[str, Any]
    ) -> List[Any]:
        """
        Create multiple collectors from service configuration.

        Pairs are created one after another. A pair that fails is skipped;
        the failure is logged once by ``create_collector``. Trading-pair
        entries that are not mappings are skipped with a warning instead of
        aborting the whole batch.

        Args:
            config: Complete service configuration dictionary

        Returns:
            List of created collector instances (possibly empty)
        """
        collectors: List[Any] = []
        try:
            exchange_name = config.get('exchange', 'okx')
            # ``or`` also covers keys that are present but explicitly null.
            trading_pairs = config.get('trading_pairs') or []
            data_collection_config = config.get('data_collection') or {}
            database_config = config.get('database') or {}

            enabled_pairs = []
            for pair in trading_pairs:
                try:
                    enabled = pair.get('enabled', True)
                except AttributeError:
                    if self.logger:
                        self.logger.warning(f"Skipping malformed trading pair entry: {pair!r}")
                    continue
                if enabled:
                    enabled_pairs.append(pair)

            if not enabled_pairs:
                if self.logger:
                    self.logger.warning(f"No enabled trading pairs for {exchange_name}")
                return collectors

            if self.logger:
                self.logger.info(f"Creating {len(enabled_pairs)} collectors for {exchange_name.upper()}")

            for pair_config in enabled_pairs:
                collector = await self.create_collector(
                    exchange_name=exchange_name,
                    pair_config=pair_config,
                    data_collection_config=data_collection_config,
                    database_config=database_config
                )
                if collector:
                    collectors.append(collector)

            if self.logger:
                self.logger.info(f"Successfully created {len(collectors)} collectors")
                failed = len(enabled_pairs) - len(collectors)
                if failed:
                    self.logger.warning(f"Failed to create {failed} of {len(enabled_pairs)} collectors")
            return collectors
        except Exception as e:
            if self.logger:
                self.logger.error(f"Error creating collectors from config: {e}", exc_info=True)
            return collectors

    def get_supported_exchanges(self) -> List[str]:
        """
        Get list of supported exchanges.

        Returns:
            New list of supported exchange names (safe for callers to mutate)
        """
        # This could be enhanced to dynamically discover available exchanges
        return list(self.SUPPORTED_EXCHANGES)

    def get_supported_data_types(self) -> List[str]:
        """
        Get list of supported data types.

        Returns:
            New list of supported data type names (safe for callers to mutate)
        """
        return list(self.SUPPORTED_DATA_TYPES)

    def get_supported_timeframes(self) -> List[str]:
        """
        Get list of supported timeframes.

        Returns:
            New list of supported timeframe strings (safe for callers to mutate)
        """
        return list(self.SUPPORTED_TIMEFRAMES)

    def validate_collector_requirements(
        self,
        exchange: str,
        data_types: List[str],
        timeframes: List[str]
    ) -> Dict[str, Any]:
        """
        Validate collector requirements against supported features.

        An unsupported exchange is an error and makes the result invalid.
        Unsupported data types and timeframes only produce warnings.

        Args:
            exchange: Exchange name
            data_types: Required data types
            timeframes: Required timeframes

        Returns:
            Dict with 'valid' (bool), 'errors' (list[str]) and 'warnings' (list[str])
        """
        result = {
            'valid': True,
            'errors': [],
            'warnings': []
        }

        if exchange not in self.get_supported_exchanges():
            result['valid'] = False
            result['errors'].append(f"Unsupported exchange: {exchange}")

        supported_data_types = self.get_supported_data_types()
        result['warnings'].extend(
            f"Unsupported data type: {dt}" for dt in data_types if dt not in supported_data_types
        )

        supported_timeframes = self.get_supported_timeframes()
        result['warnings'].extend(
            f"Unsupported timeframe: {tf}" for tf in timeframes if tf not in supported_timeframes
        )

        return result