#!/usr/bin/env python3 """ Example: Complete Time Series Aggregation This example shows how to modify the aggregation system to emit candles for every time period, even when there are no trades. """ import asyncio from datetime import datetime, timezone, timedelta from decimal import Decimal from typing import Dict, List, Optional from data.common.data_types import StandardizedTrade, OHLCVCandle, CandleProcessingConfig from data.common.aggregation import RealTimeCandleProcessor class CompleteSeriesProcessor(RealTimeCandleProcessor): """ Extended processor that emits candles for every time period, filling gaps with previous close prices when no trades occur. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.last_prices = {} # Track last known price for each timeframe self.timers = {} # Timer tasks for each timeframe async def start_time_based_emission(self): """Start timers to emit candles on time boundaries regardless of trades.""" for timeframe in self.config.timeframes: self.timers[timeframe] = asyncio.create_task( self._time_based_candle_emitter(timeframe) ) async def stop_time_based_emission(self): """Stop all timers.""" for task in self.timers.values(): task.cancel() self.timers.clear() async def _time_based_candle_emitter(self, timeframe: str): """Emit candles on time boundaries for a specific timeframe.""" try: while True: # Calculate next boundary now = datetime.now(timezone.utc) next_boundary = self._get_next_time_boundary(now, timeframe) # Wait until next boundary wait_seconds = (next_boundary - now).total_seconds() if wait_seconds > 0: await asyncio.sleep(wait_seconds) # Check if we have an active bucket with trades current_bucket = self.current_buckets.get(timeframe) if current_bucket is None or current_bucket.trade_count == 0: # No trades during this period - create empty candle await self._emit_empty_candle(timeframe, next_boundary) # If there are trades, they will be handled by normal trade processing except asyncio.CancelledError: pass # Timer was cancelled async def _emit_empty_candle(self, timeframe: str, end_time: datetime): """Emit an empty candle when no trades occurred during the period.""" try: # Calculate start time start_time = self._get_bucket_start_time(end_time - timedelta(seconds=1), timeframe) # Use last known price or default last_price = self.last_prices.get(timeframe, Decimal('0')) # Create empty candle with last known price as OHLC empty_candle = OHLCVCandle( symbol=self.symbol, timeframe=timeframe, start_time=start_time, end_time=end_time, open=last_price, high=last_price, low=last_price, close=last_price, volume=Decimal('0'), trade_count=0, exchange=self.exchange, is_complete=True, first_trade_time=None, last_trade_time=None ) # Emit the empty candle self._emit_candle(empty_candle) if self.logger: self.logger.info( f"ā­• {timeframe.upper()} EMPTY CANDLE at {end_time.strftime('%H:%M:%S')}: " f"No trades, using last price ${last_price}" ) except Exception as e: if self.logger: self.logger.error(f"Error emitting empty candle: {e}") def _emit_candle(self, candle: OHLCVCandle) -> None: """Override to track last prices.""" # Update last known price if candle.close > 0: self.last_prices[candle.timeframe] = candle.close # Call parent implementation super()._emit_candle(candle) def _get_next_time_boundary(self, current_time: datetime, timeframe: str) -> datetime: """Calculate the next time boundary for a timeframe.""" if timeframe == '1s': # Next second boundary return (current_time + timedelta(seconds=1)).replace(microsecond=0) elif timeframe == '5s': # Next 5-second boundary next_sec = (current_time.second // 5 + 1) * 5 if next_sec >= 60: return current_time.replace(second=0, microsecond=0, minute=current_time.minute + 1) return current_time.replace(second=next_sec, microsecond=0) elif timeframe == '10s': # Next 10-second boundary next_sec = (current_time.second // 10 + 1) * 10 if next_sec >= 60: return current_time.replace(second=0, microsecond=0, minute=current_time.minute + 1) return current_time.replace(second=next_sec, microsecond=0) elif timeframe == '15s': # Next 15-second boundary next_sec = (current_time.second // 15 + 1) * 15 if next_sec >= 60: return current_time.replace(second=0, microsecond=0, minute=current_time.minute + 1) return current_time.replace(second=next_sec, microsecond=0) elif timeframe == '30s': # Next 30-second boundary next_sec = (current_time.second // 30 + 1) * 30 if next_sec >= 60: return current_time.replace(second=0, microsecond=0, minute=current_time.minute + 1) return current_time.replace(second=next_sec, microsecond=0) elif timeframe == '1m': # Next minute boundary return (current_time + timedelta(minutes=1)).replace(second=0, microsecond=0) elif timeframe == '5m': # Next 5-minute boundary next_min = (current_time.minute // 5 + 1) * 5 if next_min >= 60: return current_time.replace(minute=0, second=0, microsecond=0, hour=current_time.hour + 1) return current_time.replace(minute=next_min, second=0, microsecond=0) else: # For other timeframes, add appropriate logic return current_time + timedelta(minutes=1) # Example usage async def demo_complete_series(): """Demonstrate complete time series aggregation.""" print("šŸ• Complete Time Series Aggregation Demo") print("This will emit candles even when no trades occur\n") # Create processor with complete series capability config = CandleProcessingConfig(timeframes=['1s', '5s', '30s']) processor = CompleteSeriesProcessor( symbol="BTC-USDT", exchange="demo", config=config, component_name="complete_series_demo" ) # Set initial price processor.last_prices = {'1s': Decimal('50000'), '5s': Decimal('50000'), '30s': Decimal('50000')} # Add callback to see emitted candles def on_candle(candle: OHLCVCandle): candle_type = "TRADE" if candle.trade_count > 0 else "EMPTY" print(f"šŸ“Š {candle_type} {candle.timeframe.upper()} at {candle.end_time.strftime('%H:%M:%S')}: " f"${candle.close} (T={candle.trade_count})") processor.add_candle_callback(on_candle) # Start time-based emission await processor.start_time_based_emission() try: # Simulate some trades with gaps print("Simulating trades with gaps...\n") base_time = datetime.now(timezone.utc) # Trade at T+0 trade1 = StandardizedTrade( symbol="BTC-USDT", trade_id="1", price=Decimal('50100'), size=Decimal('0.1'), side="buy", timestamp=base_time, exchange="demo" ) processor.process_trade(trade1) # Wait 3 seconds (should see empty candles for missing periods) await asyncio.sleep(3) # Trade at T+3 trade2 = StandardizedTrade( symbol="BTC-USDT", trade_id="2", price=Decimal('50200'), size=Decimal('0.2'), side="sell", timestamp=base_time + timedelta(seconds=3), exchange="demo" ) processor.process_trade(trade2) # Wait more to see more empty candles await asyncio.sleep(5) print("\nāœ… Demo completed - You can see both trade candles and empty candles") finally: await processor.stop_time_based_emission() if __name__ == "__main__": print("Complete Time Series Aggregation Example") print("=" * 50) print("This shows how to emit candles even when no trades occur.") print("Uncomment the line below to run the demo:\n") # Uncomment to run the demo: # asyncio.run(demo_complete_series())