diff --git a/data/exchanges/exceptions.py b/data/exchanges/exceptions.py new file mode 100644 index 0000000..82311a6 --- /dev/null +++ b/data/exchanges/exceptions.py @@ -0,0 +1,30 @@ +""" +Custom exceptions for the exchanges module. + +This module contains all custom exceptions used in the exchanges package +to provide more specific error handling and better error messages. +""" + +class ExchangeError(Exception): + """Base exception for all exchange-related errors.""" + pass + +class ExchangeNotSupportedError(ExchangeError): + """Raised when an exchange is not supported or not found in registry.""" + pass + +class InvalidConfigurationError(ExchangeError): + """Raised when exchange configuration is invalid.""" + pass + +class CollectorCreationError(ExchangeError): + """Raised when there's an error creating a collector instance.""" + pass + +class ExchangeConnectionError(ExchangeError): + """Raised when there's an error connecting to an exchange.""" + pass + +class ValidationError(ExchangeError): + """Raised when validation fails for exchange parameters.""" + pass \ No newline at end of file diff --git a/data/exchanges/factory.py b/data/exchanges/factory.py index 666d6aa..7e7b673 100644 --- a/data/exchanges/factory.py +++ b/data/exchanges/factory.py @@ -6,12 +6,22 @@ from different exchanges based on configuration. """ import importlib -from typing import Dict, List, Optional, Any, Type +from typing import Dict, List, Optional, Any, Type, Tuple from dataclasses import dataclass +from utils.logger import get_logger from ..base_collector import BaseDataCollector, DataType from .registry import EXCHANGE_REGISTRY, get_supported_exchanges, get_exchange_info +from .exceptions import ( + ExchangeError, + ExchangeNotSupportedError, + InvalidConfigurationError, + CollectorCreationError, + ValidationError +) +# Initialize logger +logger = get_logger('exchanges') @dataclass class ExchangeCollectorConfig: @@ -24,6 +34,16 @@ class ExchangeCollectorConfig: store_raw_data: bool = True custom_params: Optional[Dict[str, Any]] = None + def __post_init__(self): + """Validate configuration after initialization.""" + if not self.exchange: + raise InvalidConfigurationError("Exchange name cannot be empty") + if not self.symbol: + raise InvalidConfigurationError("Symbol cannot be empty") + if not self.data_types: + raise InvalidConfigurationError("At least one data type must be specified") + logger.debug(f"Created collector config for {self.exchange} {self.symbol}") + class ExchangeFactory: """Factory for creating exchange-specific data collectors.""" @@ -40,15 +60,17 @@ class ExchangeFactory: Instance of the appropriate collector class Raises: - ValueError: If exchange is not supported - ImportError: If collector class cannot be imported + ExchangeNotSupportedError: If exchange is not supported + CollectorCreationError: If collector creation fails """ exchange_name = config.exchange.lower() + logger.info(f"Creating collector for {exchange_name} {config.symbol}") if exchange_name not in EXCHANGE_REGISTRY: supported = get_supported_exchanges() - raise ValueError(f"Exchange '{config.exchange}' not supported. " - f"Supported exchanges: {supported}") + error_msg = f"Exchange '{config.exchange}' not supported. Supported exchanges: {supported}" + logger.error(error_msg) + raise ExchangeNotSupportedError(error_msg) exchange_info = get_exchange_info(exchange_name) collector_class_path = exchange_info['collector'] @@ -58,6 +80,7 @@ class ExchangeFactory: try: # Import the module + logger.debug(f"Importing collector module {module_path}") module = importlib.import_module(module_path) # Get the collector class @@ -77,12 +100,17 @@ class ExchangeFactory: collector_args.update(config.custom_params) # Create and return the collector instance + logger.info(f"Successfully created collector for {exchange_name} {config.symbol}") return collector_class(**collector_args) except ImportError as e: - raise ImportError(f"Failed to import collector class '{collector_class_path}': {e}") + error_msg = f"Failed to import collector class '{collector_class_path}': {e}" + logger.error(error_msg) + raise CollectorCreationError(error_msg) from e except Exception as e: - raise RuntimeError(f"Failed to create collector for '{config.exchange}': {e}") + error_msg = f"Failed to create collector for '{config.exchange}': {e}" + logger.error(error_msg) + raise CollectorCreationError(error_msg) from e @staticmethod def create_multiple_collectors(configs: List[ExchangeCollectorConfig]) -> List[BaseDataCollector]: @@ -96,15 +124,17 @@ class ExchangeFactory: List of collector instances """ collectors = [] + logger.info(f"Creating {len(configs)} collectors") for config in configs: try: collector = ExchangeFactory.create_collector(config) collectors.append(collector) - except Exception as e: - # Log error but continue with other collectors - print(f"Failed to create collector for {config.exchange} {config.symbol}: {e}") + logger.debug(f"Successfully created collector for {config.exchange} {config.symbol}") + except ExchangeError as e: + logger.error(f"Failed to create collector for {config.exchange} {config.symbol}: {e}") + logger.info(f"Successfully created {len(collectors)} out of {len(configs)} collectors") return collectors @staticmethod @@ -140,7 +170,7 @@ class ExchangeFactory: return [] @staticmethod - def validate_config(config: ExchangeCollectorConfig) -> bool: + def validate_config(config: ExchangeCollectorConfig) -> Tuple[bool, List[str]]: """ Validate collector configuration. @@ -148,25 +178,34 @@ class ExchangeFactory: config: Configuration to validate Returns: - True if valid, False otherwise + Tuple of (is_valid, list_of_errors) """ + logger.debug(f"Validating configuration for {config.exchange} {config.symbol}") + errors = [] + # Check if exchange is supported if config.exchange.lower() not in EXCHANGE_REGISTRY: - return False + errors.append(f"Exchange '{config.exchange}' not supported") # Check if symbol is supported supported_pairs = ExchangeFactory.get_supported_pairs(config.exchange) if supported_pairs and config.symbol not in supported_pairs: - return False + errors.append(f"Symbol '{config.symbol}' not supported for {config.exchange}") # Check if data types are supported supported_data_types = ExchangeFactory.get_supported_data_types(config.exchange) if supported_data_types: for data_type in config.data_types: if data_type.value not in supported_data_types: - return False + errors.append(f"Data type '{data_type.value}' not supported for {config.exchange}") - return True + is_valid = len(errors) == 0 + if not is_valid: + logger.warning(f"Configuration validation failed for {config.exchange}: {errors}") + else: + logger.debug(f"Configuration validation passed for {config.exchange}") + + return is_valid, errors def create_okx_collector(symbol: str, @@ -186,6 +225,8 @@ def create_okx_collector(symbol: str, if data_types is None: data_types = [DataType.TRADE, DataType.ORDERBOOK] + logger.debug(f"Creating OKX collector for {symbol}") + config = ExchangeCollectorConfig( exchange='okx', symbol=symbol, diff --git a/data/exchanges/registry.py b/data/exchanges/registry.py index ae6775e..934e23d 100644 --- a/data/exchanges/registry.py +++ b/data/exchanges/registry.py @@ -5,6 +5,12 @@ This module contains the registry of supported exchanges and their capabilities, separated to avoid circular import issues. """ +from utils.logger import get_logger +from .exceptions import ExchangeNotSupportedError + +# Initialize logger +logger = get_logger('exchanges') + # Exchange registry for factory pattern EXCHANGE_REGISTRY = { 'okx': { @@ -17,11 +23,33 @@ EXCHANGE_REGISTRY = { } -def get_supported_exchanges(): +def get_supported_exchanges() -> list: """Get list of supported exchange names.""" - return list(EXCHANGE_REGISTRY.keys()) + exchanges = list(EXCHANGE_REGISTRY.keys()) + logger.debug(f"Available exchanges: {exchanges}") + return exchanges -def get_exchange_info(exchange_name: str): - """Get information about a specific exchange.""" - return EXCHANGE_REGISTRY.get(exchange_name.lower()) \ No newline at end of file +def get_exchange_info(exchange_name: str) -> dict: + """ + Get information about a specific exchange. + + Args: + exchange_name: Name of the exchange + + Returns: + Dictionary containing exchange information + + Raises: + ExchangeNotSupportedError: If exchange is not found in registry + """ + exchange_name = exchange_name.lower() + exchange_info = EXCHANGE_REGISTRY.get(exchange_name) + + if not exchange_info: + error_msg = f"Exchange '{exchange_name}' not found in registry" + logger.error(error_msg) + raise ExchangeNotSupportedError(error_msg) + + logger.debug(f"Retrieved info for exchange: {exchange_name}") + return exchange_info \ No newline at end of file diff --git a/docs/modules/exchanges/README.md b/docs/modules/exchanges/README.md index 9199c33..c686a34 100644 --- a/docs/modules/exchanges/README.md +++ b/docs/modules/exchanges/README.md @@ -1,43 +1,44 @@ # Exchange Integrations -This section provides documentation for integrating with different cryptocurrency exchanges. +## Overview +This module provides a standardized interface for collecting real-time data from various cryptocurrency exchanges. It uses a modular architecture that allows easy addition of new exchanges while maintaining consistent behavior and error handling. -## Architecture +## Documentation Structure -The platform uses a modular architecture for exchange integration, allowing for easy addition of new exchanges without modifying core application logic. +- **[Technical Documentation](exchanges.md)**: Detailed technical documentation of the exchange module architecture, including factory pattern, configuration, and error handling. +- **Exchange-Specific Implementations**: + - **[OKX](okx_collector.md)**: Complete guide for OKX exchange integration -### Core Components +## Quick Links -- **`BaseDataCollector`**: An abstract base class defining the standard interface for all exchange collectors. -- **`ExchangeFactory`**: A factory for creating exchange-specific collector instances. -- **Exchange-Specific Modules**: Each exchange has its own module containing the collector implementation and any specific data processing logic. +- [Data Collection Architecture](../data_collectors.md) +- [Error Handling Guide](../error_handling.md) +- [Logging Configuration](../logging.md) -For a high-level overview of the data collection system, see the [Data Collectors Documentation (`../data_collectors.md`)](../data_collectors.md). +## Exchange Status -## Supported Exchanges +| Exchange | Status | Features | Documentation | +|----------|---------|-----------|---------------| +| OKX | ✅ Production | Trades, Order Book, Ticker, Candles | [Guide](okx_collector.md) | +| Binance | 🔄 Planned | TBD | - | +| Coinbase | 🔄 Planned | TBD | - | -### OKX -- **Status**: Production Ready -- **Features**: Real-time trades, order book, and ticker data. -- **Documentation**: [OKX Collector Guide (`okx.md`)] +## Adding New Exchanges -### Binance -- **Status**: Planned -- **Features**: To be determined. +See [Technical Documentation](exchanges.md) for detailed implementation guide. -### Coinbase -- **Status**: Planned -- **Features**: To be determined. +Key Steps: +1. Create exchange module in `data/exchanges/` +2. Implement collector class extending `BaseDataCollector` +3. Add WebSocket/REST implementations +4. Register in `ExchangeFactory` +5. Add documentation -## Adding a New Exchange +## Support -To add support for a new exchange, you need to: - -1. Create a new module in the `data/exchanges/` directory. -2. Implement a new collector class that inherits from `BaseDataCollector`. -3. Implement the exchange-specific WebSocket connection and data parsing logic. -4. Register the new collector in the `ExchangeFactory`. -5. Add a new documentation file in this directory explaining the implementation details. +- Report issues in the project issue tracker +- See [Contributing Guide](../../CONTRIBUTING.md) for development guidelines +- Check [Known Issues](exchanges.md#known-issues) for current limitations --- -*Back to [Modules Documentation (`../README.md`)]* \ No newline at end of file +*Back to [Main Documentation](../../README.md)* \ No newline at end of file diff --git a/docs/modules/exchanges/exchanges.md b/docs/modules/exchanges/exchanges.md new file mode 100644 index 0000000..a572180 --- /dev/null +++ b/docs/modules/exchanges/exchanges.md @@ -0,0 +1,207 @@ +# Exchange Module Technical Documentation + +## Implementation Guide + +### Core Components + +1. **Base Collector** + - Inherit from `BaseDataCollector` + - Implement required abstract methods + - Handle connection lifecycle + +2. **WebSocket Client** + - Implement exchange-specific WebSocket handling + - Manage subscriptions and message parsing + - Handle reconnection logic + +3. **Configuration** + - Define exchange-specific parameters + - Implement validation rules + - Set up default values + +### Factory Implementation + +The `ExchangeFactory` uses a registry pattern for dynamic collector creation: + +```python +@dataclass +class ExchangeCollectorConfig: + """Configuration for creating an exchange collector.""" + exchange: str + symbol: str + data_types: List[DataType] + auto_restart: bool = True + health_check_interval: float = 30.0 + store_raw_data: bool = True + custom_params: Optional[Dict[str, Any]] = None + + def __post_init__(self): + """Validate configuration after initialization.""" + if not self.exchange: + raise InvalidConfigurationError("Exchange name cannot be empty") + if not self.symbol: + raise InvalidConfigurationError("Symbol cannot be empty") + if not self.data_types: + raise InvalidConfigurationError("At least one data type must be specified") +``` + +### Registry Configuration + +Exchange capabilities are defined in the registry: + +```python +EXCHANGE_REGISTRY = { + 'okx': { + 'collector': 'data.exchanges.okx.collector.OKXCollector', + 'websocket': 'data.exchanges.okx.websocket.OKXWebSocketClient', + 'name': 'OKX', + 'supported_pairs': ['BTC-USDT', 'ETH-USDT', 'SOL-USDT', 'DOGE-USDT', 'TON-USDT'], + 'supported_data_types': ['trade', 'orderbook', 'ticker', 'candles'] + } +} +``` + +### Error Handling + +Custom exceptions hierarchy for precise error handling: + +```python +class ExchangeError(Exception): + """Base exception for all exchange-related errors.""" + pass + +class ExchangeNotSupportedError(ExchangeError): + """Exchange not supported/found in registry.""" + pass + +class InvalidConfigurationError(ExchangeError): + """Invalid exchange configuration.""" + pass + +# Usage example: +try: + collector = ExchangeFactory.create_collector(config) +except ExchangeNotSupportedError as e: + logger.error(f"Exchange not supported: {e}") +except InvalidConfigurationError as e: + logger.error(f"Invalid configuration: {e}") +``` + +### Logging Integration + +The module uses the project's unified logging system: + +```python +from utils.logger import get_logger + +logger = get_logger('exchanges') + +class ExchangeFactory: + @staticmethod + def create_collector(config: ExchangeCollectorConfig) -> BaseDataCollector: + logger.info(f"Creating collector for {config.exchange} {config.symbol}") + try: + # Implementation + logger.debug("Collector created successfully") + except Exception as e: + logger.error(f"Failed to create collector: {e}") + raise +``` + +## Testing Guidelines + +### Unit Tests + +```python +def test_exchange_factory_validation(): + """Test configuration validation.""" + config = ExchangeCollectorConfig( + exchange="okx", + symbol="BTC-USDT", + data_types=[DataType.TRADE] + ) + is_valid, errors = ExchangeFactory.validate_config(config) + assert is_valid + assert not errors + +def test_invalid_exchange(): + """Test handling of invalid exchange.""" + with pytest.raises(ExchangeNotSupportedError): + ExchangeFactory.create_collector( + ExchangeCollectorConfig( + exchange="invalid", + symbol="BTC-USDT", + data_types=[DataType.TRADE] + ) + ) +``` + +### Integration Tests + +```python +async def test_collector_lifecycle(): + """Test collector startup and shutdown.""" + collector = create_okx_collector("BTC-USDT") + + await collector.start() + assert collector.is_running() + + await asyncio.sleep(5) # Allow time for connection + status = collector.get_status() + assert status['status'] == 'running' + + await collector.stop() + assert not collector.is_running() +``` + +## Performance Considerations + +1. **Memory Management** + - Implement proper cleanup in collector shutdown + - Monitor message queue sizes + - Clear unused subscriptions + +2. **Connection Management** + - Implement exponential backoff for reconnections + - Monitor connection health + - Handle rate limits properly + +3. **Data Processing** + - Process messages asynchronously + - Batch updates when possible + - Use efficient data structures + +## Future Improvements + +1. **Rate Limiting** + ```python + class ExchangeRateLimit: + def __init__(self, requests_per_second: int): + self.rate = requests_per_second + self.tokens = requests_per_second + self.last_update = time.time() + ``` + +2. **Automatic Retries** + ```python + async def with_retry(func, max_retries=3, backoff_factor=1.5): + for attempt in range(max_retries): + try: + return await func() + except ExchangeError as e: + if attempt == max_retries - 1: + raise + wait_time = backoff_factor ** attempt + await asyncio.sleep(wait_time) + ``` + +3. **Exchange-Specific Validation** + ```python + class ExchangeValidator: + def __init__(self, exchange_info: dict): + self.rules = exchange_info.get('validation_rules', {}) + + def validate_symbol(self, symbol: str) -> bool: + pattern = self.rules.get('symbol_pattern') + return bool(re.match(pattern, symbol)) + ``` \ No newline at end of file