# Refactored Data Processing Architecture

## Overview

The data processing system has been significantly refactored to improve reusability, maintainability, and scalability across different exchanges. The key improvement is the extraction of common utilities into a shared framework while keeping exchange-specific components focused and minimal.

## Architecture Changes

### Before (Monolithic)

```
data/exchanges/okx/
├── data_processor.py    # 1343 lines - everything in one file
├── collector.py
└── websocket.py
```

### After (Modular)

```
data/
├── common/                   # Shared utilities for all exchanges
│   ├── __init__.py
│   ├── data_types.py         # StandardizedTrade, OHLCVCandle, etc.
│   ├── aggregation.py        # TimeframeBucket, RealTimeCandleProcessor
│   ├── transformation.py     # BaseDataTransformer, UnifiedDataTransformer
│   └── validation.py         # BaseDataValidator, common validation
└── exchanges/
    └── okx/
        ├── data_processor.py # ~600 lines - OKX-specific only
        ├── collector.py      # Updated to use common utilities
        └── websocket.py
```

## Key Benefits

### 1. **Reusability Across Exchanges**
- Candle aggregation logic works for any exchange
- Standardized data formats enable uniform processing
- Base classes provide common patterns for new exchanges

### 2. **Maintainability**
- Smaller, focused files are easier to understand and modify
- Common utilities are tested once and reused everywhere
- Clear separation of concerns

### 3. **Extensibility**
- Adding new exchanges requires minimal code
- New data types and timeframes are automatically supported
- Validation and transformation patterns are consistent

### 4. **Performance**
- Optimized aggregation algorithms and memory usage
- Efficient candle bucketing algorithms
- Lazy evaluation where possible

### 5. **Testing**
- Modular components are easier to test independently

## Time Aggregation Strategy

### Right-Aligned Timestamps (Industry Standard)

The system uses **RIGHT-ALIGNED timestamps** following industry standards from major exchanges (Binance, OKX, Coinbase):

- **Candle timestamp = end time of the interval (close time)**
- A 5-minute candle with timestamp `09:05:00` represents data from `09:00:00` (inclusive) to `09:05:00` (exclusive)
- A 1-minute candle with timestamp `14:32:00` represents data from `14:31:00` (inclusive) to `14:32:00` (exclusive)
- This aligns with how exchanges report historical data

### Aggregation Process (No Future Leakage)

```python
def process_trade_realtime(trade: StandardizedTrade, timeframe: str) -> List[OHLCVCandle]:
    """
    Real-time aggregation with strict future-leakage prevention.

    CRITICAL: Only emit completed candles, never incomplete ones.
    """
    completed_candles = []

    # 1. Calculate which time bucket this trade belongs to
    trade_bucket_start = get_bucket_start_time(trade.timestamp, timeframe)

    # 2. Check if the current bucket exists and matches
    current_bucket = current_buckets.get(timeframe)

    # 3. Handle time boundary crossing
    if current_bucket is None:
        # First bucket for this timeframe
        current_bucket = create_bucket(trade_bucket_start, timeframe)
    elif current_bucket.start_time != trade_bucket_start:
        # Time boundary crossed - complete the previous bucket FIRST
        if current_bucket.has_trades():
            completed_candle = current_bucket.to_candle(is_complete=True)
            emit_candle(completed_candle)  # Store in market_data table
            completed_candles.append(completed_candle)

        # Create a new bucket for the current time period
        current_bucket = create_bucket(trade_bucket_start, timeframe)

    current_buckets[timeframe] = current_bucket

    # 4. Add the trade to the current bucket
    current_bucket.add_trade(trade)

    # 5. Return only completed candles (never incomplete/future data)
    return completed_candles  # Empty list unless a boundary was crossed
```
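The only time arithmetic this scheme depends on is the `get_bucket_start_time` helper referenced above. A minimal sketch is shown below; it assumes UTC timestamps, and the `TIMEFRAME_SECONDS` table is illustrative rather than part of the actual module. The bucket-calculation examples in the next section fall out of this floor-to-grid rule.

```python
from datetime import datetime, timezone

# Illustrative subset of supported timeframes (seconds per bucket)
TIMEFRAME_SECONDS = {"1m": 60, "5m": 300, "1h": 3600, "4h": 14400}

def get_bucket_start_time(timestamp: datetime, timeframe: str) -> datetime:
    """Floor a trade timestamp to the left boundary of its bucket."""
    interval = TIMEFRAME_SECONDS[timeframe]
    epoch = int(timestamp.timestamp())
    bucket_epoch = epoch - (epoch % interval)  # floor to the interval grid
    return datetime.fromtimestamp(bucket_epoch, tz=timezone.utc)

# A trade at 09:03:45 lands in the 09:00:00 bucket; a trade at exactly
# 09:05:00 starts the next one, matching the half-open interval rule.
assert get_bucket_start_time(
    datetime(2024, 1, 1, 9, 3, 45, tzinfo=timezone.utc), "5m"
) == datetime(2024, 1, 1, 9, 0, 0, tzinfo=timezone.utc)
```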
### Time Bucket Calculation Examples

```python
# 5-minute timeframes (00:00, 00:05, 00:10, 00:15, etc.)
trade_time = "09:03:45" -> bucket_start = "09:00:00", bucket_end = "09:05:00"
trade_time = "09:07:23" -> bucket_start = "09:05:00", bucket_end = "09:10:00"
trade_time = "09:05:00" -> bucket_start = "09:05:00", bucket_end = "09:10:00"

# 1-hour timeframes (align to hour boundaries)
trade_time = "14:35:22" -> bucket_start = "14:00:00", bucket_end = "15:00:00"
trade_time = "15:00:00" -> bucket_start = "15:00:00", bucket_end = "16:00:00"

# 4-hour timeframes (00:00, 04:00, 08:00, 12:00, 16:00, 20:00)
trade_time = "13:45:12" -> bucket_start = "12:00:00", bucket_end = "16:00:00"
trade_time = "16:00:01" -> bucket_start = "16:00:00", bucket_end = "20:00:00"
```

### Future Leakage Prevention

**CRITICAL SAFEGUARDS:**

1. **Boundary Crossing Detection**: Only complete candles when a trade timestamp definitively crosses the time boundary
2. **No Premature Completion**: Never emit incomplete candles during real-time processing
3. **Strict Time Validation**: Trades are only added to buckets if `start_time <= trade.timestamp < end_time` (see the sketch after this code block)
4. **Historical Consistency**: Same logic for real-time and historical processing

```python
# CORRECT: Only complete a candle when the boundary is crossed
if current_bucket.start_time != trade_bucket_start:
    # Time boundary definitely crossed - safe to complete
    completed_candle = current_bucket.to_candle(is_complete=True)
    emit_to_storage(completed_candle)

# INCORRECT: Would cause future leakage
if some_timer_expires():
    # Never complete based on timers or external events
    completed_candle = current_bucket.to_candle(is_complete=True)  # WRONG!
```
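Safeguard 3 is enforced inside the bucket itself. The following is a minimal sketch of that guard, assuming the `TimeframeBucket(timeframe, start_time)` constructor used by the batch aggregator below; the real class in `data/common/aggregation.py` also tracks OHLCV state and implements `to_candle`, which this sketch omits.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List

# Illustrative timeframe table; the real module supports more intervals
_TIMEFRAME_DELTAS = {
    "1m": timedelta(minutes=1),
    "5m": timedelta(minutes=5),
    "1h": timedelta(hours=1),
    "4h": timedelta(hours=4),
}

@dataclass
class TimeframeBucket:
    timeframe: str
    start_time: datetime
    trades: List = field(default_factory=list)

    @property
    def end_time(self) -> datetime:
        return self.start_time + _TIMEFRAME_DELTAS[self.timeframe]

    def add_trade(self, trade) -> None:
        # Enforce the half-open interval [start_time, end_time)
        if not (self.start_time <= trade.timestamp < self.end_time):
            raise ValueError(
                f"trade at {trade.timestamp} outside "
                f"[{self.start_time}, {self.end_time})"
            )
        self.trades.append(trade)

    def has_trades(self) -> bool:
        return bool(self.trades)
```

Rejecting out-of-range trades loudly, rather than silently re-bucketing them, keeps a mis-ordered feed from corrupting a candle that may already have been emitted.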
### Data Storage Flow

```
WebSocket Trade Data → Validation → Transformation → Aggregation → Storage
        |                                   |                   |
        ↓                                   ↓                   ↓
Raw individual trades             Completed OHLCV      Incomplete OHLCV
        |                         candles (storage)    (monitoring only)
        ↓                                   |
  raw_trades table                 market_data table
(debugging/compliance)            (trading decisions)
```

**Storage Rules:**
- **Raw trades** → `raw_trades` table (every individual trade/orderbook/ticker)
- **Completed candles** → `market_data` table (only when a timeframe boundary is crossed)
- **Incomplete candles** → Memory only (never stored, used for monitoring)

### Aggregation Logic Implementation

```python
def aggregate_to_timeframe(trades: List[StandardizedTrade],
                           timeframe: str) -> List[OHLCVCandle]:
    """
    Aggregate trades to the specified timeframe with right-aligned timestamps
    """
    # Group trades by time interval
    buckets = {}
    completed_candles = []

    for trade in sorted(trades, key=lambda t: t.timestamp):
        # Calculate the bucket start time (left boundary)
        bucket_start = get_bucket_start_time(trade.timestamp, timeframe)

        # Get or create the bucket
        if bucket_start not in buckets:
            buckets[bucket_start] = TimeframeBucket(timeframe, bucket_start)

        # Add the trade to its bucket
        buckets[bucket_start].add_trade(trade)

    # Convert all buckets to candles with right-aligned timestamps
    for bucket in buckets.values():
        candle = bucket.to_candle(is_complete=True)
        # candle.timestamp = bucket.end_time (right-aligned)
        completed_candles.append(candle)

    return completed_candles
```

## Common Components

### Data Types (`data/common/data_types.py`)

**StandardizedTrade**: Universal trade format
```python
@dataclass
class StandardizedTrade:
    symbol: str
    trade_id: str
    price: Decimal
    size: Decimal
    side: str          # 'buy' or 'sell'
    timestamp: datetime
    exchange: str = "okx"
    raw_data: Optional[Dict[str, Any]] = None
```

**OHLCVCandle**: Universal candle format
```python
@dataclass
class OHLCVCandle:
    symbol: str
    timeframe: str
    start_time: datetime
    end_time: datetime
    open: Decimal
    high: Decimal
    low: Decimal
    close: Decimal
    volume: Decimal
    trade_count: int
    is_complete: bool = False
```

### Aggregation (`data/common/aggregation.py`)

**RealTimeCandleProcessor**: Handles real-time candle building for any exchange
- Processes trades immediately as they arrive
- Supports multiple timeframes simultaneously
- Emits completed candles when time boundaries cross
- Thread-safe and memory efficient

**BatchCandleProcessor**: Handles historical data processing
- Processes large batches of trades efficiently
- Memory-optimized for backfill scenarios
- Same candle output format as the real-time processor

### Transformation (`data/common/transformation.py`)

**BaseDataTransformer**: Abstract base class for exchange transformers
- Common transformation utilities (timestamp conversion, decimal handling)
- Abstract methods for exchange-specific transformations
- Consistent error handling patterns

**UnifiedDataTransformer**: Unified interface for all transformation scenarios
- Works with real-time, historical, and backfill data
- Handles batch processing efficiently
- Integrates with aggregation components

### Validation (`data/common/validation.py`)

**BaseDataValidator**: Common validation patterns (see the sketch below)
- Price, size, volume validation
- Timestamp validation
- Orderbook validation
- Generic symbol validation
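For orientation, here is a minimal sketch of the base validator's shape. The constructor signature mirrors the Binance example in the next section; the fields of `ValidationResult` and the method shown are assumptions for illustration, not the actual `data/common/validation.py` API.

```python
from dataclasses import dataclass, field
from decimal import Decimal
from typing import List

@dataclass
class ValidationResult:
    is_valid: bool
    errors: List[str] = field(default_factory=list)

class BaseDataValidator:
    def __init__(self, exchange: str, component_name: str):
        self.exchange = exchange
        self.component_name = component_name

    def validate_price_size(self, price: Decimal, size: Decimal) -> ValidationResult:
        """Generic price/size checks shared by every exchange."""
        errors = []
        if price <= 0:
            errors.append(f"non-positive price: {price}")
        if size <= 0:
            errors.append(f"non-positive size: {size}")
        return ValidationResult(is_valid=not errors, errors=errors)
```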
## Exchange-Specific Components

### OKX Data Processor (`data/exchanges/okx/data_processor.py`)

Now focused only on OKX-specific functionality:

**OKXDataValidator**: Extends BaseDataValidator
- OKX-specific symbol patterns (BTC-USDT format)
- OKX message structure validation
- OKX field mappings and requirements

**OKXDataTransformer**: Extends BaseDataTransformer
- OKX WebSocket format transformation
- OKX-specific field extraction
- Integration with common utilities

**OKXDataProcessor**: Main processor using the common framework
- Uses common validation and transformation utilities
- Significantly simplified (~600 lines vs 1343 lines)
- Better separation of concerns

### Updated OKX Collector (`data/exchanges/okx/collector.py`)

**Key improvements:**
- Uses OKXDataProcessor with common utilities
- Automatic candle generation for trades
- Simplified message processing
- Better error handling and statistics
- Callback system for real-time data

## Usage Examples

### Creating a New Exchange

To add support for a new exchange (e.g., Binance):

1. **Create an exchange-specific validator:**
```python
class BinanceDataValidator(BaseDataValidator):
    def __init__(self, component_name="binance_validator"):
        super().__init__("binance", component_name)
        self._symbol_pattern = re.compile(r'^[A-Z]{2,}$')  # BTCUSDT format (no separator)

    def validate_symbol_format(self, symbol: str) -> ValidationResult:
        # Binance-specific symbol validation
        pass
```

2. **Create an exchange-specific transformer:**
```python
class BinanceDataTransformer(BaseDataTransformer):
    def transform_trade_data(self, raw_data: Dict[str, Any],
                             symbol: str) -> Optional[StandardizedTrade]:
        return create_standardized_trade(
            symbol=raw_data['s'],        # Binance field mapping
            trade_id=raw_data['t'],
            price=raw_data['p'],
            size=raw_data['q'],
            # 'm' means the buyer was the maker, so the taker side is 'sell'
            side='sell' if raw_data['m'] else 'buy',
            timestamp=raw_data['T'],     # epoch ms; converted by common utilities
            exchange="binance",
            raw_data=raw_data
        )
```

3. **Automatic candle support:**
```python
# Real-time candles work automatically
processor = RealTimeCandleProcessor(symbol, "binance", config)
for trade in trades:
    completed_candles = processor.process_trade(trade)
```

### Using Common Utilities

**Data transformation:**
```python
# Works with any exchange
transformer = UnifiedDataTransformer(exchange_transformer)
standardized_trade = transformer.transform_trade_data(raw_trade, symbol)

# Batch processing
candles = transformer.process_trades_to_candles(
    trades_iterator, ['1m', '5m', '1h'], symbol
)
```

**Real-time candle processing:**
```python
# The same code works for any exchange
candle_processor = RealTimeCandleProcessor(symbol, exchange, config)
candle_processor.add_candle_callback(my_candle_handler)

for trade in real_time_trades:
    completed_candles = candle_processor.process_trade(trade)
```
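`my_candle_handler` above is whatever callable you register for completed candles. A minimal sketch, assuming a hypothetical `store_candle` persistence helper (e.g., an INSERT into the `market_data` table):

```python
def my_candle_handler(candle: OHLCVCandle) -> None:
    """Route completed candles to storage as they are emitted."""
    # The processor only emits completed candles, but this guard is
    # cheap insurance against incomplete/future data reaching storage.
    if candle.is_complete:
        store_candle(candle)  # hypothetical persistence helper
```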
## Testing

The refactored architecture includes comprehensive testing:

**Test script:** `scripts/test_refactored_okx.py`
- Tests common utilities
- Tests OKX-specific components
- Tests integration between components
- Performance and memory testing

**Run tests:**
```bash
python scripts/test_refactored_okx.py
```

## Migration Guide

### For Existing OKX Code

1. **Update imports:**
```python
# Old
from data.exchanges.okx.data_processor import StandardizedTrade, OHLCVCandle

# New
from data.common import StandardizedTrade, OHLCVCandle
```

2. **Use the new processor:**
```python
# Old
from data.exchanges.okx.data_processor import OKXDataProcessor, UnifiedDataTransformer

# New
from data.exchanges.okx.data_processor import OKXDataProcessor
# Uses common utilities internally
```

3. **Existing functionality preserved:**
- All existing APIs remain the same
- Performance improved due to optimizations
- More features available (better candle processing, validation)

### For New Exchange Development

1. **Start with the common base classes**
2. **Implement only exchange-specific validation and transformation**
3. **Get candle processing, batch processing, and validation for free**
4. **Focus on exchange API integration rather than data processing logic**

## Performance Improvements

**Memory Usage:**
- Streaming processing reduces the memory footprint
- Efficient candle bucketing algorithms
- Lazy evaluation where possible

**Processing Speed:**
- Optimized validation with early returns
- Batch processing capabilities
- Parallel processing support

**Maintainability:**
- Smaller, focused components
- Better test coverage
- Clear error handling and logging

## Future Enhancements

**Planned Features:**

1. **Exchange Factory Pattern** - Automatically create collectors for any exchange
2. **Plugin System** - Load exchange implementations dynamically
3. **Configuration-Driven Development** - Define new exchanges via config files
4. **Enhanced Analytics** - Built-in technical indicators and statistics
5. **Multi-Exchange Arbitrage** - Cross-exchange data synchronization

This refactored architecture provides a solid foundation for scalable, maintainable cryptocurrency data processing across any number of exchanges while keeping exchange-specific code minimal and focused.