# Data Aggregation Strategy

## Overview

This document describes the data aggregation strategy the TCP Trading Platform uses to convert real-time trade data into OHLCV (Open, High, Low, Close, Volume) candles across multiple timeframes.

## Core Principles

### 1. Right-Aligned Timestamps (Industry Standard)

The system follows the **RIGHT-ALIGNED timestamp** convention:

- **Candle timestamp = end time of the interval (close time)**
- The timestamp marks when the candle period **closes**, not when it opens
- Intended to match candle data from Binance, OKX, Coinbase, and other major exchanges (verify each API's convention, since some kline endpoints report the open time)
- Keeps real-time candles consistent with historical data APIs

**Examples:**

```
5-minute candle with timestamp 09:05:00:
├─ Represents data from 09:00:00 up to, but not including, 09:05:00
├─ Includes all trades in the interval [09:00:00, 09:05:00)
└─ Candle "closes" at 09:05:00

1-hour candle with timestamp 14:00:00:
├─ Represents data from 13:00:00 up to, but not including, 14:00:00
├─ Includes all trades in the interval [13:00:00, 14:00:00)
└─ Candle "closes" at 14:00:00
```

A trade stamped exactly on a boundary (for example 09:05:00) opens the next candle, which matches the half-open validation used throughout this document.

### 2. Future Leakage Prevention

**CRITICAL**: The system implements strict safeguards to prevent future leakage:

- **Only emit completed candles** once the time boundary has definitively been crossed
- **Never emit incomplete candles** during real-time processing
- **No timer-based completion**: completion is driven only by trade timestamps
- **Strict time validation** for all trade additions

## Aggregation Process

### Real-Time Processing Flow

```mermaid
graph TD
    A[Trade Arrives from WebSocket] --> B[Extract Timestamp T]
    B --> C[For Each Timeframe]
    C --> D[Calculate Bucket Start Time]
    D --> E{Bucket Exists?}
    E -->|No| F[Create New Bucket]
    E -->|Yes| G{Same Time Period?}
    G -->|Yes| H[Add Trade to Current Bucket]
    G -->|No| I[Complete Previous Bucket]
    I --> J[Emit Completed Candle]
    J --> K[Store in market_data Table]
    K --> F
    F --> H
    H --> L[Update OHLCV Values]
    L --> M[Continue Processing]
```

### Time Bucket Calculation

The system calculates which time bucket a trade belongs to based on its timestamp:

```python
from datetime import datetime


def get_bucket_start_time(timestamp: datetime, timeframe: str) -> datetime:
    """
    Calculate the start time of the bucket for a given trade timestamp.

    This determines the LEFT boundary of the time interval.
    The RIGHT boundary (end_time) becomes the candle timestamp.
    """
    # Normalize to remove seconds/microseconds
    dt = timestamp.replace(second=0, microsecond=0)

    if timeframe == '1m':
        # 1-minute: align to minute boundaries
        return dt
    elif timeframe == '5m':
        # 5-minute: 00:00, 00:05, 00:10, 00:15, etc.
        return dt.replace(minute=(dt.minute // 5) * 5)
    elif timeframe == '15m':
        # 15-minute: 00:00, 00:15, 00:30, 00:45
        return dt.replace(minute=(dt.minute // 15) * 15)
    elif timeframe == '1h':
        # 1-hour: align to hour boundaries
        return dt.replace(minute=0)
    elif timeframe == '4h':
        # 4-hour: 00:00, 04:00, 08:00, 12:00, 16:00, 20:00
        return dt.replace(minute=0, hour=(dt.hour // 4) * 4)
    elif timeframe == '1d':
        # 1-day: align to midnight (timestamps are expected to be in UTC)
        return dt.replace(minute=0, hour=0)
    else:
        # Fail loudly instead of silently returning None
        raise ValueError(f"Unsupported timeframe: {timeframe}")
```

### Detailed Examples

#### 5-Minute Timeframe Processing

```
Current time: 09:03:45
Trade arrives at: 09:03:45

Step 1: Calculate bucket start time
├─ timeframe = '5m'
├─ minute = 3
├─ bucket_minute = (3 // 5) * 5 = 0
└─ bucket_start = 09:00:00

Step 2: Bucket boundaries
├─ start_time = 09:00:00 (inclusive)
├─ end_time = 09:05:00 (exclusive)
└─ candle_timestamp = 09:05:00 (right-aligned)

Step 3: Trade validation
├─ 09:00:00 <= 09:03:45 < 09:05:00 ✓
└─ Trade belongs to this bucket

Step 4: OHLCV update
├─ If first trade: set open price
├─ Update high/low prices
├─ Set close price (latest trade)
├─ Add to volume
└─ Increment trade count
```

#### Boundary Crossing Example

```
Scenario: 5-minute timeframe, transition from 09:04:59 to 09:05:00

Trade 1: timestamp = 09:04:59
├─ bucket_start = 09:00:00
├─ Belongs to current bucket [09:00:00 - 09:05:00)
└─ Add to current bucket

Trade 2: timestamp = 09:05:00
├─ bucket_start = 09:05:00
├─ Different from current bucket (09:00:00)
├─ TIME BOUNDARY CROSSED!
├─ Complete previous bucket → candle with timestamp 09:05:00
├─ Store completed candle in market_data table
├─ Create new bucket [09:05:00 - 09:10:00)
└─ Add Trade 2 to new bucket
```

## Data Storage Strategy

### Storage Tables

#### 1. `raw_trades` Table

**Purpose**: Store every individual piece of data as received
**Data**: Trades, orderbook updates, tickers
**Usage**: Debugging, compliance, detailed analysis

```sql
CREATE TABLE raw_trades (
    id SERIAL PRIMARY KEY,
    exchange VARCHAR(50) NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    data_type VARCHAR(20) NOT NULL,  -- 'trade', 'orderbook', 'ticker'
    raw_data JSONB NOT NULL
);
```

#### 2. `market_data` Table

**Purpose**: Store completed OHLCV candles for trading decisions
**Data**: Only completed candles with right-aligned timestamps
**Usage**: Bot strategies, backtesting, analysis

```sql
CREATE TABLE market_data (
    id SERIAL PRIMARY KEY,
    exchange VARCHAR(50) NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    timeframe VARCHAR(5) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,  -- RIGHT-ALIGNED (candle close time)
    open DECIMAL(18,8) NOT NULL,
    high DECIMAL(18,8) NOT NULL,
    low DECIMAL(18,8) NOT NULL,
    close DECIMAL(18,8) NOT NULL,
    volume DECIMAL(18,8) NOT NULL,
    trades_count INTEGER
);
```

### Storage Flow

```
WebSocket Message
├─ Contains multiple trades
├─ Each trade stored in raw_trades table
└─ Each trade processed through aggregation

Aggregation Engine
├─ Groups trades by timeframe buckets
├─ Updates OHLCV values incrementally
├─ Detects time boundary crossings
└─ Emits completed candles only

Completed Candles
├─ Stored in market_data table
├─ Timestamp = bucket end time (right-aligned)
├─ is_complete = true
└─ Available for trading strategies
```

## Future Leakage Prevention

### Critical Safeguards

#### 1. Boundary Crossing Detection

```python
# CORRECT: Only complete when boundary definitively crossed
if current_bucket.start_time != trade_bucket_start:
    # Time boundary crossed - safe to complete previous bucket
    if current_bucket.trade_count > 0:
        completed_candle = current_bucket.to_candle(is_complete=True)
        emit_candle(completed_candle)
```

#### 2. No Premature Completion

```python
# WRONG: Never complete based on timers or external events
if datetime.now(timezone.utc) > bucket.end_time:
    completed_candle = bucket.to_candle(is_complete=True)  # FUTURE LEAKAGE!

# WRONG: Never complete incomplete buckets during real-time
if some_condition:
    completed_candle = current_bucket.to_candle(is_complete=True)  # WRONG!
```

#### 3. Strict Time Validation

```python
def add_trade(self, trade: StandardizedTrade) -> bool:
    # Only accept trades within bucket boundaries (start inclusive, end exclusive)
    if not (self.start_time <= trade.timestamp < self.end_time):
        return False  # Reject trades outside time range

    # Safe to add trade
    self.update_ohlcv(trade)
    return True
```

#### 4. Historical Consistency

```python
# Same logic for real-time and historical processing
def process_trade(self, trade: StandardizedTrade) -> list:
    """Used for both real-time WebSocket and historical API data"""
    completed = []
    for timeframe in self.timeframes:
        # Assumes the per-timeframe handler returns a list of completed candles
        completed.extend(self._process_trade_for_timeframe(trade, timeframe))
    return completed
```

## Testing Strategy

### Validation Tests

1. **Timestamp Alignment Tests**
   - Verify candle timestamps are right-aligned
   - Check bucket boundary calculations
   - Validate timeframe-specific alignment

2. **Future Leakage Tests**
   - Ensure no incomplete candles are emitted
   - Verify boundary crossing detection
   - Test with edge case timestamps

3. **Data Integrity Tests**
   - OHLCV calculation accuracy
   - Volume aggregation correctness
   - Trade count validation

### Test Examples

```python
from datetime import time


def test_right_aligned_timestamps():
    """Test that candle timestamps are right-aligned"""
    trades = [
        create_trade("09:01:30", price=100),
        create_trade("09:03:45", price=101),
        create_trade("09:05:00", price=102),  # Boundary crossing
    ]

    candles = process_trades(trades, timeframe='5m')

    # First candle should have timestamp 09:05:00 (right-aligned).
    # Compare time-of-day only, so the test does not depend on the trade date.
    assert candles[0].timestamp.time() == time(9, 5)
    assert candles[0].start_time.time() == time(9, 0)
    assert candles[0].end_time.time() == time(9, 5)


def test_no_future_leakage():
    """Test that incomplete candles are never emitted"""
    processor = RealTimeCandleProcessor(symbol='BTC-USDT', timeframes=['5m'])

    # Add trades within same bucket
    trade1 = create_trade("09:01:00", price=100)
    trade2 = create_trade("09:03:00", price=101)

    # Should return empty list (no completed candles)
    completed = processor.process_trade(trade1)
    assert len(completed) == 0

    completed = processor.process_trade(trade2)
    assert len(completed) == 0

    # Only when boundary crossed should candle be emitted
    trade3 = create_trade("09:05:00", price=102)
    completed = processor.process_trade(trade3)
    assert len(completed) == 1  # Previous bucket completed
    assert completed[0].is_complete is True
```

## Performance Considerations

### Memory Management

- Keep only current buckets in memory
- Clear completed buckets immediately after emission
- Limit maximum number of active timeframes

### Database Optimization

- Batch insert completed candles
- Use prepared statements for frequent inserts
- Index on (symbol, timeframe, timestamp) for queries

### Processing Efficiency

- Process all timeframes in single trade iteration
- Use efficient bucket start time calculations
- Minimize object creation in hot paths

## Conclusion

This aggregation strategy ensures:

- ✅ **Industry Standard Compliance**: Right-aligned timestamps intended to match major exchanges
- ✅ **Future Leakage Prevention**: Strict boundary detection and validation
- ✅ **Data Integrity**: Accurate OHLCV calculations and storage
- ✅ **Performance**: Efficient real-time and batch processing
- ✅ **Consistency**: Same logic for real-time and historical data

The implementation provides a robust foundation for building trading strategies with confidence in data accuracy and timing.
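
## Appendix: Minimal Aggregator Sketch

The pieces described in this document (bucket calculation, right-aligned timestamps, boundary-driven completion, rejection of out-of-range trades) can be combined into one small runnable sketch. This is not the platform's implementation. `Candle`, `Aggregator`, `bucket_start`, and `TIMEFRAME_SECONDS` are hypothetical names, the bucket start is computed with epoch arithmetic instead of `datetime.replace`, and silently dropping late trades is an assumed policy. Timestamps are assumed to be timezone-aware UTC values.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Dict, List

# Hypothetical lookup table; only the timeframe labels come from this document.
TIMEFRAME_SECONDS = {"1m": 60, "5m": 300, "15m": 900,
                     "1h": 3600, "4h": 14400, "1d": 86400}


@dataclass
class Candle:
    timeframe: str
    timestamp: datetime  # right-aligned: bucket end (close) time
    open: Decimal
    high: Decimal
    low: Decimal
    close: Decimal
    volume: Decimal
    trades_count: int


def bucket_start(ts: datetime, timeframe: str) -> datetime:
    """Floor a timezone-aware timestamp to the start of its UTC bucket."""
    size = TIMEFRAME_SECONDS[timeframe]
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % size, tz=timezone.utc)


class Aggregator:
    """Keeps one in-progress bucket per timeframe; emits only on boundary crossing."""

    def __init__(self, timeframes: List[str]) -> None:
        self.timeframes = timeframes
        self._open: Dict[str, Candle] = {}

    def process_trade(self, ts: datetime, price: Decimal, size: Decimal) -> List[Candle]:
        completed: List[Candle] = []
        for tf in self.timeframes:
            end = bucket_start(ts, tf) + timedelta(seconds=TIMEFRAME_SECONDS[tf])
            current = self._open.get(tf)
            if current is not None and end < current.timestamp:
                continue  # assumed policy: drop trades for already-closed buckets
            if current is not None and end > current.timestamp:
                completed.append(current)  # boundary crossed: previous bucket is final
                current = None
            if current is None:
                # First trade of a new bucket sets open/high/low/close at once
                self._open[tf] = Candle(tf, end, price, price, price, price, size, 1)
            else:
                current.high = max(current.high, price)
                current.low = min(current.low, price)
                current.close = price
                current.volume += size
                current.trades_count += 1
        return completed
```

Because completion is triggered only by a later trade's timestamp, the last bucket of a quiet market stays open until the next trade arrives. That is the trade-off the "no timer-based completion" rule accepts in exchange for never emitting a partial candle.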