
# Exchange Module Technical Documentation
## Implementation Guide
### Core Components
1. **Base Collector**
   - Inherit from `BaseDataCollector`
   - Implement required abstract methods
   - Handle connection lifecycle
2. **WebSocket Client**
   - Implement exchange-specific WebSocket handling
   - Manage subscriptions and message parsing
   - Handle reconnection logic
3. **Configuration**
   - Define exchange-specific parameters
   - Implement validation rules
   - Set up default values
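
The first component above can be sketched as follows. The real `BaseDataCollector` interface is not shown in this document, so `SketchBaseCollector`, its `connect`/`disconnect` methods, and `DemoCollector` are hypothetical stand-ins that only illustrate the "inherit, implement the abstract methods, manage the lifecycle" shape.

```python
import asyncio
from abc import ABC, abstractmethod


class SketchBaseCollector(ABC):
    """Hypothetical stand-in for BaseDataCollector; the real interface may differ."""

    def __init__(self, symbol: str) -> None:
        self.symbol = symbol
        self.running = False

    @abstractmethod
    async def connect(self) -> None:
        """Open the exchange connection."""

    @abstractmethod
    async def disconnect(self) -> None:
        """Close the exchange connection."""

    async def start(self) -> None:
        # Lifecycle: connect first, then mark the collector as running.
        await self.connect()
        self.running = True

    async def stop(self) -> None:
        # Lifecycle: disconnect first, then clear the running flag.
        await self.disconnect()
        self.running = False


class DemoCollector(SketchBaseCollector):
    """Toy subclass that records lifecycle events instead of opening a socket."""

    def __init__(self, symbol: str) -> None:
        super().__init__(symbol)
        self.events = []

    async def connect(self) -> None:
        self.events.append("connected")

    async def disconnect(self) -> None:
        self.events.append("disconnected")
```

A real collector would replace the two event-recording methods with the exchange's WebSocket connect and close calls.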
### Factory Implementation
The `ExchangeFactory` uses a registry pattern for dynamic collector creation. Each collector is described by an `ExchangeCollectorConfig`:
```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

# DataType and InvalidConfigurationError are assumed to be imported
# from elsewhere in the project.


@dataclass
class ExchangeCollectorConfig:
    """Configuration for creating an exchange collector."""

    exchange: str
    symbol: str
    data_types: List[DataType]
    auto_restart: bool = True
    health_check_interval: float = 30.0
    store_raw_data: bool = True
    custom_params: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Validate configuration after initialization."""
        if not self.exchange:
            raise InvalidConfigurationError("Exchange name cannot be empty")
        if not self.symbol:
            raise InvalidConfigurationError("Symbol cannot be empty")
        if not self.data_types:
            raise InvalidConfigurationError("At least one data type must be specified")
```
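
Dynamic creation in a registry pattern usually means resolving a class from a dotted import path at runtime. The factory's actual code is not shown here, so the following is only a minimal sketch of that step using the standard `importlib` module. The helper name `load_class` is hypothetical.

```python
import importlib
from typing import Any


def load_class(dotted_path: str) -> Any:
    """Import and return the object named by 'package.module.ClassName'."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected a dotted path, got {dotted_path!r}")
    module = importlib.import_module(module_path)
    # getattr raises AttributeError if the module has no such attribute.
    return getattr(module, class_name)
```

For example, `load_class("collections.OrderedDict")` returns the `OrderedDict` class. A factory could apply the same call to a registry entry's `'collector'` value and then instantiate the result.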
### Registry Configuration
Exchange capabilities are defined in the registry:
```python
EXCHANGE_REGISTRY = {
    'okx': {
        'collector': 'data.exchanges.okx.collector.OKXCollector',
        'websocket': 'data.exchanges.okx.websocket.OKXWebSocketClient',
        'name': 'OKX',
        'supported_pairs': ['BTC-USDT', 'ETH-USDT', 'SOL-USDT', 'DOGE-USDT', 'TON-USDT'],
        'supported_data_types': ['trade', 'orderbook', 'ticker', 'candles']
    }
}
```
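
The unit tests later in this document call `ExchangeFactory.validate_config` and expect an `(is_valid, errors)` pair, but the method body is not shown. One plausible way to check a request against registry capabilities is sketched below. `SKETCH_REGISTRY` and `check_against_registry` are hypothetical names, and the real validation rules may differ.

```python
from typing import Any, Dict, List, Tuple

# Trimmed, local copy of a registry entry so the sketch runs on its own.
SKETCH_REGISTRY: Dict[str, Dict[str, Any]] = {
    "okx": {
        "supported_pairs": ["BTC-USDT", "ETH-USDT"],
        "supported_data_types": ["trade", "orderbook", "ticker", "candles"],
    }
}


def check_against_registry(
    exchange: str, symbol: str, data_types: List[str]
) -> Tuple[bool, List[str]]:
    """Return (is_valid, errors) for an exchange/symbol/data-type request."""
    entry = SKETCH_REGISTRY.get(exchange)
    if entry is None:
        # Nothing else can be checked without a registry entry.
        return False, [f"Exchange not supported: {exchange}"]

    errors: List[str] = []
    if symbol not in entry["supported_pairs"]:
        errors.append(f"Unsupported pair for {exchange}: {symbol}")
    for data_type in data_types:
        if data_type not in entry["supported_data_types"]:
            errors.append(f"Unsupported data type for {exchange}: {data_type}")
    return not errors, errors
```

Collecting every problem into a list, instead of raising on the first one, lets a caller report all configuration issues in a single pass.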
### Error Handling
A custom exception hierarchy allows precise error handling:
```python
class ExchangeError(Exception):
    """Base exception for all exchange-related errors."""


class ExchangeNotSupportedError(ExchangeError):
    """Exchange not supported/found in registry."""


class InvalidConfigurationError(ExchangeError):
    """Invalid exchange configuration."""


# Usage example:
try:
    collector = ExchangeFactory.create_collector(config)
except ExchangeNotSupportedError as e:
    logger.error(f"Exchange not supported: {e}")
except InvalidConfigurationError as e:
    logger.error(f"Invalid configuration: {e}")
```
### Logging Integration
The module uses the project's unified logging system:
```python
from utils.logger import get_logger

logger = get_logger('exchanges')


class ExchangeFactory:
    @staticmethod
    def create_collector(config: ExchangeCollectorConfig) -> BaseDataCollector:
        logger.info(f"Creating collector for {config.exchange} {config.symbol}")
        try:
            # Implementation
            logger.debug("Collector created successfully")
        except Exception as e:
            logger.error(f"Failed to create collector: {e}")
            raise
```
## Testing Guidelines
### Unit Tests
```python
import pytest


def test_exchange_factory_validation():
    """Test configuration validation."""
    config = ExchangeCollectorConfig(
        exchange="okx",
        symbol="BTC-USDT",
        data_types=[DataType.TRADE]
    )
    is_valid, errors = ExchangeFactory.validate_config(config)
    assert is_valid
    assert not errors


def test_invalid_exchange():
    """Test handling of invalid exchange."""
    with pytest.raises(ExchangeNotSupportedError):
        ExchangeFactory.create_collector(
            ExchangeCollectorConfig(
                exchange="invalid",
                symbol="BTC-USDT",
                data_types=[DataType.TRADE]
            )
        )
```
### Integration Tests
```python
import asyncio


# Assumes an async test plugin (for example pytest-asyncio) is configured.
async def test_collector_lifecycle():
    """Test collector startup and shutdown."""
    collector = create_okx_collector("BTC-USDT")
    await collector.start()
    assert collector.is_running()
    await asyncio.sleep(5)  # Allow time for connection
    status = collector.get_status()
    assert status['status'] == 'running'
    await collector.stop()
    assert not collector.is_running()
```
## Performance Considerations
1. **Memory Management**
   - Implement proper cleanup in collector shutdown
   - Monitor message queue sizes
   - Clear unused subscriptions
2. **Connection Management**
   - Implement exponential backoff for reconnections
   - Monitor connection health
   - Handle rate limits properly
3. **Data Processing**
   - Process messages asynchronously
   - Batch updates when possible
   - Use efficient data structures
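
The exponential backoff recommended under Connection Management can be sketched as a delay function. This is an illustration, not the module's reconnection code: `backoff_delay`, its parameters, and the default values are hypothetical, and the optional jitter is an added assumption.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    factor: float = 2.0,
    max_delay: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before reconnect attempt `attempt` (0-based)."""
    try:
        delay = min(max_delay, base * factor ** attempt)
    except OverflowError:
        # Very large attempt counts overflow float exponentiation; use the cap.
        delay = max_delay
    if rng is not None:
        # Jitter: pick uniformly in [0, delay] so clients do not retry in lockstep.
        delay = rng.uniform(0.0, delay)
    return delay
```

Without an `rng`, the delays for attempts 0 through 3 are 1, 2, 4, and 8 seconds, and the cap keeps long outages from producing unbounded waits. A reconnect loop would typically reset `attempt` to zero after a successful connection.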
## Future Improvements
1. **Rate Limiting**
```python
import time


class ExchangeRateLimit:
    def __init__(self, requests_per_second: int):
        self.rate = requests_per_second
        self.tokens = requests_per_second
        self.last_update = time.time()
```
2. **Automatic Retries**
```python
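import asyncio


# ExchangeError is the base exception defined in the Error Handling section.
async def with_retry(func, max_retries=3, backoff_factor=1.5):
    for attempt in range(max_retries):
        try:
            return await func()
        except ExchangeError:
            # Re-raise once the final attempt has failed.
            if attempt == max_retries - 1:
                raise
            wait_time = backoff_factor ** attempt
            await asyncio.sleep(wait_time)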
```
3. **Exchange-Specific Validation**
```python
import re


class ExchangeValidator:
    def __init__(self, exchange_info: dict):
        self.rules = exchange_info.get('validation_rules', {})

    def validate_symbol(self, symbol: str) -> bool:
        pattern = self.rules.get('symbol_pattern')
        if pattern is None:
            # Assumption: with no pattern configured, any symbol is accepted.
            return True
        return bool(re.match(pattern, symbol))
```
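
The `ExchangeRateLimit` stub in item 1 stores `rate`, `tokens`, and `last_update`, which suggests a token bucket. How the planned class will actually behave is not stated, so the following is only a sketch of that technique. `TokenBucket`, `try_acquire`, and the injectable `clock` parameter are hypothetical.

```python
import time
from typing import Callable


class TokenBucket:
    """Token bucket: refills at `rate` tokens per second, up to `capacity`."""

    def __init__(
        self,
        rate: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # start with a full bucket
        self.clock = clock
        self.last_update = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Consume `cost` tokens if available; return False without blocking otherwise."""
        now = self.clock()
        elapsed = now - self.last_update
        self.last_update = now
        # Refill in proportion to elapsed time, never beyond capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

The sketch uses `time.monotonic` instead of `time.time` so that wall-clock adjustments cannot produce negative elapsed intervals. Passing the clock in also makes the limiter easy to test with a fake time source.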