# Refactored Data Processing Architecture
## Overview

The data processing system has been significantly refactored to improve reusability, maintainability, and scalability across different exchanges. The key improvement is the extraction of common utilities into a shared framework while keeping exchange-specific components focused and minimal.
## Architecture Changes

### Before (Monolithic)

```
data/exchanges/okx/
├── data_processor.py    # 1343 lines - everything in one file
├── collector.py
└── websocket.py
```
### After (Modular)

```
data/
├── common/                    # Shared utilities for all exchanges
│   ├── __init__.py
│   ├── data_types.py          # StandardizedTrade, OHLCVCandle, etc.
│   ├── aggregation.py         # TimeframeBucket, RealTimeCandleProcessor
│   ├── transformation.py      # BaseDataTransformer, UnifiedDataTransformer
│   └── validation.py          # BaseDataValidator, common validation
└── exchanges/
    └── okx/
        ├── data_processor.py  # ~600 lines - OKX-specific only
        ├── collector.py       # Updated to use common utilities
        └── websocket.py
```
## Key Benefits

### 1. **Reusability Across Exchanges**

- Candle aggregation logic works for any exchange
- Standardized data formats enable uniform processing
- Base classes provide common patterns for new exchanges

### 2. **Maintainability**

- Smaller, focused files are easier to understand and modify
- Common utilities are tested once and reused everywhere
- Clear separation of concerns

### 3. **Extensibility**

- Adding new exchanges requires minimal code
- New data types and timeframes are automatically supported
- Validation and transformation patterns are consistent

### 4. **Performance**

- Optimized aggregation algorithms and memory usage
- Efficient candle bucketing algorithms
- Lazy evaluation where possible

### 5. **Testing**

- Modular components are easier to test independently
## Time Aggregation Strategy

### Right-Aligned Timestamps (Industry Standard)

The system uses **RIGHT-ALIGNED timestamps** following industry standards from major exchanges (Binance, OKX, Coinbase):

- **Candle timestamp = end time of the interval (close time)**
- A 5-minute candle with timestamp `09:05:00` covers trades from `09:00:00` (inclusive) to `09:05:00` (exclusive)
- A 1-minute candle with timestamp `14:32:00` covers trades from `14:31:00` (inclusive) to `14:32:00` (exclusive)
- This aligns with how exchanges report historical data
### Aggregation Process (No Future Leakage)

```python
def process_trade_realtime(trade: StandardizedTrade, timeframe: str) -> List[OHLCVCandle]:
    """
    Real-time aggregation with strict future-leakage prevention.

    CRITICAL: Only emit completed candles, never incomplete ones.
    """
    completed_candles: List[OHLCVCandle] = []

    # 1. Calculate which time bucket this trade belongs to
    trade_bucket_start = get_bucket_start_time(trade.timestamp, timeframe)

    # 2. Check if a current bucket exists and matches
    current_bucket = current_buckets.get(timeframe)

    # 3. Handle time-boundary crossing
    if current_bucket is None:
        # First bucket for this timeframe
        current_bucket = create_bucket(trade_bucket_start, timeframe)
    elif current_bucket.start_time != trade_bucket_start:
        # Time boundary crossed - complete the previous bucket FIRST
        if current_bucket.has_trades():
            completed_candle = current_bucket.to_candle(is_complete=True)
            emit_candle(completed_candle)  # Store in market_data table
            completed_candles.append(completed_candle)

        # Create a new bucket for the current time period
        current_bucket = create_bucket(trade_bucket_start, timeframe)

    # 4. Add the trade to the current bucket and remember the bucket
    current_bucket.add_trade(trade)
    current_buckets[timeframe] = current_bucket

    # 5. Return only completed candles (never incomplete/future data)
    return completed_candles  # Empty list unless a boundary was crossed
```
### Time Bucket Calculation Examples

```python
# 5-minute timeframes (boundaries at 00:00, 00:05, 00:10, 00:15, ...)
# trade_time "09:03:45" -> bucket_start "09:00:00", bucket_end "09:05:00"
# trade_time "09:07:23" -> bucket_start "09:05:00", bucket_end "09:10:00"
# trade_time "09:05:00" -> bucket_start "09:05:00", bucket_end "09:10:00"  (a boundary trade opens the next bucket)

# 1-hour timeframes (aligned to hour boundaries)
# trade_time "14:35:22" -> bucket_start "14:00:00", bucket_end "15:00:00"
# trade_time "15:00:00" -> bucket_start "15:00:00", bucket_end "16:00:00"

# 4-hour timeframes (00:00, 04:00, 08:00, 12:00, 16:00, 20:00)
# trade_time "13:45:12" -> bucket_start "12:00:00", bucket_end "16:00:00"
# trade_time "16:00:01" -> bucket_start "16:00:00", bucket_end "20:00:00"
```
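The mappings above can be reproduced with simple floor arithmetic on epoch seconds. The sketch below is illustrative only; the helper name `get_bucket_start_time` follows the pseudocode in this document, but the actual signature in `data/common/aggregation.py` may differ:

```python
from datetime import datetime, timedelta, timezone

# Assumed timeframe table; extend as needed.
TIMEFRAME_SECONDS = {"1m": 60, "5m": 300, "1h": 3600, "4h": 14400}

def get_bucket_start_time(ts: datetime, timeframe: str) -> datetime:
    """Floor a timestamp to the left boundary of its time bucket."""
    step = TIMEFRAME_SECONDS[timeframe]
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % step, tz=timezone.utc)

def get_bucket_end_time(ts: datetime, timeframe: str) -> datetime:
    """Right-aligned candle timestamp: bucket start plus one interval."""
    start = get_bucket_start_time(ts, timeframe)
    return start + timedelta(seconds=TIMEFRAME_SECONDS[timeframe])
```

A trade at `09:03:45` floors to bucket start `09:00:00` with right-aligned end `09:05:00`, matching the table above; a trade at exactly `09:05:00` floors to `09:05:00` and so opens the next bucket.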
### Future Leakage Prevention

**CRITICAL SAFEGUARDS:**

1. **Boundary Crossing Detection**: Complete a candle only when a trade timestamp definitively crosses the time boundary
2. **No Premature Completion**: Never emit incomplete candles during real-time processing
3. **Strict Time Validation**: A trade is only added to a bucket if `start_time <= trade.timestamp < end_time`
4. **Historical Consistency**: The same logic is used for real-time and historical processing

```python
# CORRECT: only complete a candle when the boundary is crossed
if current_bucket.start_time != trade_bucket_start:
    # Time boundary definitely crossed - safe to complete
    completed_candle = current_bucket.to_candle(is_complete=True)
    emit_to_storage(completed_candle)

# INCORRECT: would cause future leakage
if some_timer_expires():
    # Never complete based on timers or external events
    completed_candle = current_bucket.to_candle(is_complete=True)  # WRONG!
```
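The no-leakage invariant can be checked mechanically: because a candle is only completed once a later trade proves its interval is over, every emitted candle's end time is at most the timestamp of the trade that triggered it. The self-contained sketch below (a toy 5-minute processor, not the real `RealTimeCandleProcessor`) asserts exactly that:

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

STEP = timedelta(minutes=5)  # fixed 5-minute timeframe for this sketch

def floor_5m(ts: datetime) -> datetime:
    """Floor a timestamp to its 5-minute bucket start."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % 300, tz=timezone.utc)

def process(trades: List[Tuple[datetime, float]]) -> List[datetime]:
    """Return right-aligned end times of completed 5m candles, asserting
    the no-future-leakage invariant on every emission."""
    emitted: List[datetime] = []
    bucket_start = None
    for ts, _price in trades:
        start = floor_5m(ts)
        if bucket_start is not None and start != bucket_start:
            end = bucket_start + STEP
            # Invariant: the completed candle never extends past "now",
            # i.e. past the trade that proved the boundary was crossed.
            assert end <= ts
            emitted.append(end)
        bucket_start = start
    return emitted
```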
### Data Storage Flow

```
WebSocket Trade Data → Validation → Transformation → Aggregation → Storage
          |                               |                  |
          ↓                               ↓                  ↓
Raw individual trades             Completed OHLCV     Incomplete OHLCV
          |                       candles (storage)   (monitoring only)
          ↓                               |
  raw_trades table                        ↓
(debugging/compliance)            market_data table
                                 (trading decisions)
```

**Storage Rules:**

- **Raw trades** → `raw_trades` table (every individual trade/orderbook/ticker)
- **Completed candles** → `market_data` table (only when a timeframe boundary is crossed)
- **Incomplete candles** → Memory only (never stored, used for monitoring)
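The routing rules above amount to two small callbacks. This sketch uses in-memory lists as hypothetical stand-ins for the `raw_trades` and `market_data` tables; a real deployment would call the database layer instead:

```python
from typing import Any, Dict, List

# Hypothetical in-memory sinks standing in for the two tables.
raw_trades: List[Dict[str, Any]] = []
market_data: List[Dict[str, Any]] = []

def on_trade(trade: Dict[str, Any]) -> None:
    """Every raw trade is persisted for debugging/compliance."""
    raw_trades.append(trade)

def on_candle(candle: Dict[str, Any]) -> None:
    """Only completed candles reach market_data; incomplete candles
    stay in memory for monitoring and are never stored."""
    if candle.get("is_complete"):
        market_data.append(candle)
```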
### Aggregation Logic Implementation

```python
def aggregate_to_timeframe(trades: List[StandardizedTrade], timeframe: str) -> List[OHLCVCandle]:
    """
    Aggregate trades into the specified timeframe with right-aligned timestamps.
    """
    # Group trades by time interval
    buckets = {}
    completed_candles = []

    for trade in sorted(trades, key=lambda t: t.timestamp):
        # Calculate the bucket start time (left boundary)
        bucket_start = get_bucket_start_time(trade.timestamp, timeframe)

        # Get or create the bucket
        if bucket_start not in buckets:
            buckets[bucket_start] = TimeframeBucket(timeframe, bucket_start)

        # Add the trade to its bucket
        buckets[bucket_start].add_trade(trade)

    # Convert all buckets to candles with right-aligned timestamps
    for bucket in buckets.values():
        candle = bucket.to_candle(is_complete=True)
        # candle.timestamp == bucket.end_time (right-aligned)
        completed_candles.append(candle)

    return completed_candles
```
## Common Components

### Data Types (`data/common/data_types.py`)

**StandardizedTrade**: Universal trade format

```python
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import Any, Dict, Optional

@dataclass
class StandardizedTrade:
    symbol: str
    trade_id: str
    price: Decimal
    size: Decimal
    side: str  # 'buy' or 'sell'
    timestamp: datetime
    exchange: str = "okx"
    raw_data: Optional[Dict[str, Any]] = None
```
**OHLCVCandle**: Universal candle format

```python
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal

@dataclass
class OHLCVCandle:
    symbol: str
    timeframe: str
    start_time: datetime
    end_time: datetime
    open: Decimal
    high: Decimal
    low: Decimal
    close: Decimal
    volume: Decimal
    trade_count: int
    is_complete: bool = False
```
### Aggregation (`data/common/aggregation.py`)

**RealTimeCandleProcessor**: Handles real-time candle building for any exchange

- Processes trades immediately as they arrive
- Supports multiple timeframes simultaneously
- Emits completed candles when time boundaries are crossed
- Thread-safe and memory-efficient

**BatchCandleProcessor**: Handles historical data processing

- Processes large batches of trades efficiently
- Memory-optimized for backfill scenarios
- Produces the same candle output format as the real-time processor
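At the core of both processors is the bucket idea: accumulate OHLCV state from trades, then snapshot it into a right-aligned candle. The minimal sketch below illustrates that mechanic; the class and field names are assumptions for illustration, not the real `TimeframeBucket` API from `data/common/aggregation.py`:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Optional

@dataclass
class MiniBucket:
    """Toy version of a timeframe bucket: fold trades into OHLCV state."""
    start_time: datetime
    step: timedelta
    open: Optional[Decimal] = None
    high: Optional[Decimal] = None
    low: Optional[Decimal] = None
    close: Optional[Decimal] = None
    volume: Decimal = Decimal(0)
    trade_count: int = 0

    def add_trade(self, price: Decimal, size: Decimal) -> None:
        if self.open is None:
            self.open = self.high = self.low = price
        self.high = max(self.high, price)
        self.low = min(self.low, price)
        self.close = price
        self.volume += size
        self.trade_count += 1

    def to_candle(self, is_complete: bool) -> dict:
        return {
            "end_time": self.start_time + self.step,  # right-aligned timestamp
            "open": self.open, "high": self.high,
            "low": self.low, "close": self.close,
            "volume": self.volume, "trade_count": self.trade_count,
            "is_complete": is_complete,
        }
```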
### Transformation (`data/common/transformation.py`)

**BaseDataTransformer**: Abstract base class for exchange transformers

- Common transformation utilities (timestamp conversion, decimal handling)
- Abstract methods for exchange-specific transformations
- Consistent error-handling patterns

**UnifiedDataTransformer**: Unified interface for all transformation scenarios

- Works with real-time, historical, and backfill data
- Handles batch processing efficiently
- Integrates with the aggregation components

### Validation (`data/common/validation.py`)

**BaseDataValidator**: Common validation patterns

- Price, size, and volume validation
- Timestamp validation
- Orderbook validation
- Generic symbol validation
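The common checks centralized here are simple to express. The sketch below shows the flavor of the shared price/size/timestamp validation; the function name and the list-of-errors return shape are assumptions, not the real `BaseDataValidator` interface:

```python
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
from typing import List

def validate_trade_fields(price: str, size: str, ts_ms: int,
                          max_skew_s: int = 300) -> List[str]:
    """Return a list of validation errors (empty means the trade passes)."""
    errors: List[str] = []
    try:
        if Decimal(price) <= 0:
            errors.append("price must be positive")
    except InvalidOperation:
        errors.append("price is not a valid decimal")
    try:
        if Decimal(size) <= 0:
            errors.append("size must be positive")
    except InvalidOperation:
        errors.append("size is not a valid decimal")
    # Reject zero/negative timestamps and timestamps too far in the future.
    now = datetime.now(timezone.utc).timestamp()
    if not (0 < ts_ms / 1000 <= now + max_skew_s):
        errors.append("timestamp out of range")
    return errors
```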
## Exchange-Specific Components

### OKX Data Processor (`data/exchanges/okx/data_processor.py`)

Now focused only on OKX-specific functionality:

**OKXDataValidator**: Extends BaseDataValidator

- OKX-specific symbol patterns (BTC-USDT format)
- OKX message-structure validation
- OKX field mappings and requirements

**OKXDataTransformer**: Extends BaseDataTransformer

- OKX WebSocket format transformation
- OKX-specific field extraction
- Integration with common utilities

**OKXDataProcessor**: Main processor built on the common framework

- Uses common validation and transformation utilities
- Significantly simplified (~600 lines vs 1343 lines)
- Better separation of concerns

### Updated OKX Collector (`data/exchanges/okx/collector.py`)

**Key improvements:**

- Uses OKXDataProcessor with common utilities
- Automatic candle generation for trades
- Simplified message processing
- Better error handling and statistics
- Callback system for real-time data
## Usage Examples

### Creating a New Exchange

To add support for a new exchange (e.g., Binance):

1. **Create an exchange-specific validator:**
```python
class BinanceDataValidator(BaseDataValidator):
    def __init__(self, component_name="binance_validator"):
        super().__init__("binance", component_name)
        self._symbol_pattern = re.compile(r'^[A-Z]{2,}$')  # BTCUSDT format

    def validate_symbol_format(self, symbol: str) -> ValidationResult:
        # Binance-specific symbol validation
        ...
```

2. **Create an exchange-specific transformer:**
```python
class BinanceDataTransformer(BaseDataTransformer):
    def transform_trade_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[StandardizedTrade]:
        return create_standardized_trade(
            symbol=raw_data['s'],        # Binance field mapping
            trade_id=raw_data['t'],
            price=raw_data['p'],
            size=raw_data['q'],
            # Binance's 'm' flag means the buyer is the maker,
            # so the taker (aggressor) side is 'sell'
            side='sell' if raw_data['m'] else 'buy',
            timestamp=raw_data['T'],     # epoch milliseconds
            exchange="binance",
            raw_data=raw_data
        )
```

3. **Automatic candle support:**
```python
# Real-time candles work automatically
processor = RealTimeCandleProcessor(symbol, "binance", config)
for trade in trades:
    completed_candles = processor.process_trade(trade)
```
### Using Common Utilities

**Data transformation:**

```python
# Works with any exchange
transformer = UnifiedDataTransformer(exchange_transformer)
standardized_trade = transformer.transform_trade_data(raw_trade, symbol)

# Batch processing
candles = transformer.process_trades_to_candles(
    trades_iterator,
    ['1m', '5m', '1h'],
    symbol
)
```

**Real-time candle processing:**

```python
# The same code works for any exchange
candle_processor = RealTimeCandleProcessor(symbol, exchange, config)
candle_processor.add_candle_callback(my_candle_handler)

for trade in real_time_trades:
    completed_candles = candle_processor.process_trade(trade)
```
## Testing

The refactored architecture includes comprehensive testing:

**Test script:** `scripts/test_refactored_okx.py`

- Tests common utilities
- Tests OKX-specific components
- Tests integration between components
- Performance and memory testing

**Run tests:**

```bash
python scripts/test_refactored_okx.py
```
## Migration Guide

### For Existing OKX Code

1. **Update imports:**
```python
# Old
from data.exchanges.okx.data_processor import StandardizedTrade, OHLCVCandle

# New
from data.common import StandardizedTrade, OHLCVCandle
```

2. **Use the new processor:**
```python
# Old
from data.exchanges.okx.data_processor import OKXDataProcessor, UnifiedDataTransformer

# New
from data.exchanges.okx.data_processor import OKXDataProcessor  # Uses common utilities internally
```

3. **Existing functionality preserved:**
- All existing APIs remain the same
- Performance is improved due to the optimizations
- More features are available (better candle processing, validation)

### For New Exchange Development

1. **Start with the common base classes**
2. **Implement only exchange-specific validation and transformation**
3. **Get candle processing, batch processing, and validation for free**
4. **Focus on exchange API integration rather than data-processing logic**
## Performance Improvements

**Memory Usage:**

- Streaming processing reduces the memory footprint
- Efficient candle bucketing algorithms
- Lazy evaluation where possible

**Processing Speed:**

- Optimized validation with early returns
- Batch processing capabilities
- Parallel processing support

**Maintainability:**

- Smaller, focused components
- Better test coverage
- Clear error handling and logging

## Future Enhancements

**Planned Features:**

1. **Exchange Factory Pattern** - Automatically create collectors for any exchange
2. **Plugin System** - Load exchange implementations dynamically
3. **Configuration-Driven Development** - Define new exchanges via config files
4. **Enhanced Analytics** - Built-in technical indicators and statistics
5. **Multi-Exchange Arbitrage** - Cross-exchange data synchronization

This refactored architecture provides a solid foundation for scalable, maintainable cryptocurrency data processing across any number of exchanges while keeping exchange-specific code minimal and focused.