# File-viewer header (not part of the module): 196 lines, 6.3 KiB, Python; Raw / Normal View / History

"""
Exchange Factory for creating data collectors.
This module provides a factory pattern for creating data collectors
from different exchanges based on configuration.
"""
import importlib
import logging
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Type

from ..base_collector import BaseDataCollector, DataType
from .registry import EXCHANGE_REGISTRY, get_supported_exchanges, get_exchange_info
@dataclass
class ExchangeCollectorConfig:
    """
    Settings consumed by ExchangeFactory when it builds a collector.

    Attributes:
        exchange: Name of the exchange; the factory lower-cases it before
            looking it up in the exchange registry.
        symbol: Trading pair symbol (e.g., 'BTC-USDT').
        data_types: Data types the collector should gather.
        auto_restart: Forwarded unchanged to the collector constructor.
        health_check_interval: Forwarded unchanged to the collector
            constructor (presumably seconds; confirm against the collector).
        store_raw_data: Forwarded unchanged to the collector constructor.
        custom_params: Optional extra keyword arguments for the collector
            constructor. The factory merges them over the standard arguments,
            so an entry with the same key replaces the standard value.
    """

    # Required settings (no defaults).
    exchange: str
    symbol: str
    data_types: List[DataType]

    # Optional settings, passed straight through to the collector.
    auto_restart: bool = True
    health_check_interval: float = 30.0
    store_raw_data: bool = True

    # None (rather than an empty dict) means "no extra parameters".
    custom_params: Optional[Dict[str, Any]] = None
class ExchangeFactory:
    """
    Factory for creating exchange-specific data collectors.

    Every method is static; the class only groups the factory helpers.
    Exchange names are lower-cased before any registry lookup, so all
    methods treat names case-insensitively.
    """

    @staticmethod
    def create_collector(config: ExchangeCollectorConfig) -> BaseDataCollector:
        """
        Create a data collector for the specified exchange.

        Args:
            config: Configuration for the collector

        Returns:
            Instance of the appropriate collector class

        Raises:
            ValueError: If exchange is not supported
            ImportError: If collector class cannot be imported
            RuntimeError: If the collector class cannot be resolved on its
                module or cannot be instantiated
        """
        exchange_name = config.exchange.lower()
        if exchange_name not in EXCHANGE_REGISTRY:
            supported = get_supported_exchanges()
            raise ValueError(f"Exchange '{config.exchange}' not supported. "
                             f"Supported exchanges: {supported}")

        exchange_info = get_exchange_info(exchange_name)
        collector_class_path = exchange_info['collector']

        # The registry stores a dotted path: everything before the last dot
        # is the module, the final component is the class name.
        module_path, class_name = collector_class_path.rsplit('.', 1)

        try:
            module = importlib.import_module(module_path)
            collector_class = getattr(module, class_name)

            collector_args = {
                'symbol': config.symbol,
                'data_types': config.data_types,
                'auto_restart': config.auto_restart,
                'health_check_interval': config.health_check_interval,
                'store_raw_data': config.store_raw_data
            }

            # Custom parameters are applied last, so they may override the
            # standard arguments above.
            if config.custom_params:
                collector_args.update(config.custom_params)

            return collector_class(**collector_args)
        except ImportError as e:
            # Chain the original error so the underlying cause stays visible.
            raise ImportError(
                f"Failed to import collector class '{collector_class_path}': {e}"
            ) from e
        except Exception as e:
            raise RuntimeError(
                f"Failed to create collector for '{config.exchange}': {e}"
            ) from e

    @staticmethod
    def create_multiple_collectors(configs: List[ExchangeCollectorConfig]) -> List[BaseDataCollector]:
        """
        Create multiple collectors from a list of configurations.

        Creation is best-effort: a configuration that fails is reported
        through the module logger and skipped, so the returned list may be
        shorter than ``configs``.

        Args:
            configs: List of collector configurations

        Returns:
            List of collector instances that were created successfully
        """
        logger = logging.getLogger(__name__)
        collectors: List[BaseDataCollector] = []
        for config in configs:
            try:
                collectors.append(ExchangeFactory.create_collector(config))
            except Exception as e:
                # Deliberately broad: one bad configuration must not prevent
                # the remaining collectors from being created.
                logger.error("Failed to create collector for %s %s: %s",
                             config.exchange, config.symbol, e)
        return collectors

    @staticmethod
    def get_supported_pairs(exchange: str) -> List[str]:
        """
        Get supported trading pairs for an exchange.

        Args:
            exchange: Exchange name (case-insensitive)

        Returns:
            List of supported trading pairs; empty if the exchange has no
            registry entry or the entry lists no pairs
        """
        # Normalize the name the same way create_collector does.
        exchange_info = get_exchange_info(exchange.lower())
        if exchange_info:
            return exchange_info.get('supported_pairs', [])
        return []

    @staticmethod
    def get_supported_data_types(exchange: str) -> List[str]:
        """
        Get supported data types for an exchange.

        Args:
            exchange: Exchange name (case-insensitive)

        Returns:
            List of supported data types; empty if the exchange has no
            registry entry or the entry lists no data types
        """
        # Normalize the name the same way create_collector does.
        exchange_info = get_exchange_info(exchange.lower())
        if exchange_info:
            return exchange_info.get('supported_data_types', [])
        return []

    @staticmethod
    def validate_config(config: ExchangeCollectorConfig) -> bool:
        """
        Validate collector configuration.

        An empty list of supported pairs or data types in the registry is
        treated as "no restriction" for that check.

        Args:
            config: Configuration to validate

        Returns:
            True if valid, False otherwise
        """
        exchange_name = config.exchange.lower()

        # Check if exchange is supported
        if exchange_name not in EXCHANGE_REGISTRY:
            return False

        # Check if symbol is supported
        supported_pairs = ExchangeFactory.get_supported_pairs(exchange_name)
        if supported_pairs and config.symbol not in supported_pairs:
            return False

        # Check if data types are supported (compared by enum value)
        supported_data_types = ExchangeFactory.get_supported_data_types(exchange_name)
        if not supported_data_types:
            return True
        return all(data_type.value in supported_data_types
                   for data_type in config.data_types)
def create_okx_collector(symbol: str,
                         data_types: Optional[List[DataType]] = None,
                         **kwargs) -> BaseDataCollector:
    """
    Build an OKX collector through ExchangeFactory.

    Args:
        symbol: Trading pair symbol (e.g., 'BTC-USDT')
        data_types: Data types to collect; when omitted (None), trades and
            the order book are collected
        **kwargs: Extra keyword arguments passed to the
            ExchangeCollectorConfig constructor

    Returns:
        OKX collector instance
    """
    # Only None selects the defaults; an explicitly passed list (even an
    # empty one) is used as given.
    selected_types = (
        [DataType.TRADE, DataType.ORDERBOOK] if data_types is None else data_types
    )
    okx_config = ExchangeCollectorConfig(
        exchange='okx',
        symbol=symbol,
        data_types=selected_types,
        **kwargs
    )
    return ExchangeFactory.create_collector(okx_config)