diff --git a/data/common/__init__.py b/data/common/__init__.py index d41ce5c..759fe06 100644 --- a/data/common/__init__.py +++ b/data/common/__init__.py @@ -9,15 +9,14 @@ from .data_types import ( StandardizedTrade, OHLCVCandle, MarketDataPoint, - DataValidationResult -) - -from .aggregation import ( - TimeframeBucket, - RealTimeCandleProcessor, + DataValidationResult, CandleProcessingConfig ) +from .aggregation import TimeframeBucket +# Temporarily import from old location until we move these classes +from .aggregation import RealTimeCandleProcessor + from .transformation import ( BaseDataTransformer, UnifiedDataTransformer, @@ -42,11 +41,11 @@ __all__ = [ 'OHLCVCandle', 'MarketDataPoint', 'DataValidationResult', + 'CandleProcessingConfig', # Aggregation 'TimeframeBucket', 'RealTimeCandleProcessor', - 'CandleProcessingConfig', # Transformation 'BaseDataTransformer', diff --git a/data/common/aggregation.py b/data/common/aggregation.py deleted file mode 100644 index 0e44ce7..0000000 --- a/data/common/aggregation.py +++ /dev/null @@ -1,598 +0,0 @@ -""" -Common aggregation utilities for all exchanges. - -This module provides shared functionality for building OHLCV candles -from trade data, regardless of the source exchange. - -AGGREGATION STRATEGY: -- Uses RIGHT-ALIGNED timestamps (industry standard) -- Candle timestamp = end time of the interval (close time) -- 5-minute candle with timestamp 09:05:00 represents data from 09:00:01 to 09:05:00 -- Prevents future leakage by only completing candles when time boundary is crossed -- Aligns with major exchanges (Binance, OKX, Coinbase) - -PROCESS FLOW: -1. Trade arrives with timestamp T -2. Calculate which time bucket this trade belongs to -3. If bucket doesn't exist or time boundary crossed, complete previous bucket -4. Add trade to current bucket -5. Only emit completed candles (never future data) -""" - -from datetime import datetime, timezone, timedelta -from decimal import Decimal -from typing import Dict, List, Optional, Any, Iterator, Callable -from collections import defaultdict - -from .data_types import ( - StandardizedTrade, - OHLCVCandle, - CandleProcessingConfig, - ProcessingStats -) - - -class TimeframeBucket: - """ - Time bucket for building OHLCV candles from trades. - - This class accumulates trades within a specific time period - and calculates OHLCV data incrementally. - - IMPORTANT: Uses RIGHT-ALIGNED timestamps - - start_time: Beginning of the interval (inclusive) - - end_time: End of the interval (exclusive) - this becomes the candle timestamp - - Example: 09:00:00 - 09:05:00 bucket -> candle timestamp = 09:05:00 - """ - - def __init__(self, symbol: str, timeframe: str, start_time: datetime, exchange: str = "unknown"): - """ - Initialize time bucket for candle aggregation. - - Args: - symbol: Trading symbol (e.g., 'BTC-USDT') - timeframe: Time period (e.g., '1m', '5m', '1h') - start_time: Start time for this bucket (inclusive) - exchange: Exchange name - """ - self.symbol = symbol - self.timeframe = timeframe - self.start_time = start_time - self.end_time = self._calculate_end_time(start_time, timeframe) - self.exchange = exchange - - # OHLCV data - self.open: Optional[Decimal] = None - self.high: Optional[Decimal] = None - self.low: Optional[Decimal] = None - self.close: Optional[Decimal] = None - self.volume: Decimal = Decimal('0') - self.trade_count: int = 0 - - # Tracking - self.first_trade_time: Optional[datetime] = None - self.last_trade_time: Optional[datetime] = None - self.trades: List[StandardizedTrade] = [] - - def add_trade(self, trade: StandardizedTrade) -> bool: - """ - Add trade to this bucket if it belongs to this time period. - - Args: - trade: Standardized trade data - - Returns: - True if trade was added, False if outside time range - """ - # Check if trade belongs in this bucket (start_time <= trade.timestamp < end_time) - if not (self.start_time <= trade.timestamp < self.end_time): - return False - - # First trade sets open price - if self.open is None: - self.open = trade.price - self.high = trade.price - self.low = trade.price - self.first_trade_time = trade.timestamp - - # Update OHLCV - self.high = max(self.high, trade.price) - self.low = min(self.low, trade.price) - self.close = trade.price # Last trade sets close - self.volume += trade.size - self.trade_count += 1 - self.last_trade_time = trade.timestamp - - # Store trade for detailed analysis if needed - self.trades.append(trade) - - return True - - def to_candle(self, is_complete: bool = True) -> OHLCVCandle: - """ - Convert bucket to OHLCV candle. - - IMPORTANT: Candle timestamp = end_time (right-aligned, industry standard) - """ - return OHLCVCandle( - symbol=self.symbol, - timeframe=self.timeframe, - start_time=self.start_time, - end_time=self.end_time, - open=self.open or Decimal('0'), - high=self.high or Decimal('0'), - low=self.low or Decimal('0'), - close=self.close or Decimal('0'), - volume=self.volume, - trade_count=self.trade_count, - exchange=self.exchange, - is_complete=is_complete, - first_trade_time=self.first_trade_time, - last_trade_time=self.last_trade_time - ) - - def _calculate_end_time(self, start_time: datetime, timeframe: str) -> datetime: - """Calculate end time for this timeframe (right-aligned timestamp).""" - if timeframe == '1s': - return start_time + timedelta(seconds=1) - elif timeframe == '5s': - return start_time + timedelta(seconds=5) - elif timeframe == '10s': - return start_time + timedelta(seconds=10) - elif timeframe == '15s': - return start_time + timedelta(seconds=15) - elif timeframe == '30s': - return start_time + timedelta(seconds=30) - elif timeframe == '1m': - return start_time + timedelta(minutes=1) - elif timeframe == '5m': - return start_time + timedelta(minutes=5) - elif timeframe == '15m': - return start_time + timedelta(minutes=15) - elif timeframe == '30m': - return start_time + timedelta(minutes=30) - elif timeframe == '1h': - return start_time + timedelta(hours=1) - elif timeframe == '4h': - return start_time + timedelta(hours=4) - elif timeframe == '1d': - return start_time + timedelta(days=1) - else: - raise ValueError(f"Unsupported timeframe: {timeframe}") - - -class RealTimeCandleProcessor: - """ - Real-time candle processor for live trade data. - - This class processes trades immediately as they arrive from WebSocket, - building candles incrementally and emitting completed candles when - time boundaries are crossed. - - AGGREGATION PROCESS (NO FUTURE LEAKAGE): - - 1. Trade arrives from WebSocket/API with timestamp T - 2. For each configured timeframe (1m, 5m, etc.): - a. Calculate which time bucket this trade belongs to - b. Get current bucket for this timeframe - c. Check if trade timestamp crosses time boundary - d. If boundary crossed: complete and emit previous bucket, create new bucket - e. Add trade to current bucket (updates OHLCV) - 3. Only emit candles when time boundary is definitively crossed - 4. Never emit incomplete/future candles during real-time processing - - TIMESTAMP ALIGNMENT: - - Uses RIGHT-ALIGNED timestamps (industry standard) - - 1-minute candle covering 09:00:00-09:01:00 gets timestamp 09:01:00 - - 5-minute candle covering 09:00:00-09:05:00 gets timestamp 09:05:00 - - Candle represents PAST data, never future - """ - - def __init__(self, - symbol: str, - exchange: str, - config: Optional[CandleProcessingConfig] = None, - component_name: str = "realtime_candle_processor", - logger = None): - """ - Initialize real-time candle processor. - - Args: - symbol: Trading symbol (e.g., 'BTC-USDT') - exchange: Exchange name (e.g., 'okx', 'binance') - config: Processing configuration - component_name: Name for logging - """ - self.symbol = symbol - self.exchange = exchange - self.config = config or CandleProcessingConfig() - self.component_name = component_name - self.logger = logger - - # Current buckets for each timeframe - self.current_buckets: Dict[str, TimeframeBucket] = {} - - # Callback functions for completed candles - self.candle_callbacks: List[Callable[[OHLCVCandle], None]] = [] - - # Statistics - self.stats = ProcessingStats(active_timeframes=len(self.config.timeframes)) - - if self.logger: - self.logger.info(f"{self.component_name}: Initialized real-time candle processor for {symbol} on {exchange} with timeframes: {self.config.timeframes}") - - def add_candle_callback(self, callback: Callable[[OHLCVCandle], None]) -> None: - """Add callback function to receive completed candles.""" - self.candle_callbacks.append(callback) - if self.logger: - self.logger.debug(f"{self.component_name}: Added candle callback: {callback.__name__ if hasattr(callback, '__name__') else str(callback)}") - - def process_trade(self, trade: StandardizedTrade) -> List[OHLCVCandle]: - """ - Process single trade - main entry point for real-time processing. - - This is called for each trade as it arrives from WebSocket. - - CRITICAL: Only returns completed candles (time boundary crossed) - Never returns incomplete/future candles to prevent leakage. - - Args: - trade: Standardized trade data - - Returns: - List of completed candles (if any time boundaries were crossed) - """ - try: - completed_candles = [] - - # Process trade for each timeframe - for timeframe in self.config.timeframes: - candle = self._process_trade_for_timeframe(trade, timeframe) - if candle: - completed_candles.append(candle) - - # Update statistics - self.stats.trades_processed += 1 - self.stats.last_trade_time = trade.timestamp - - # Emit completed candles to callbacks - for candle in completed_candles: - self._emit_candle(candle) - - return completed_candles - - except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error processing trade for {self.symbol}: {e}") - self.stats.errors_count += 1 - return [] - - def _process_trade_for_timeframe(self, trade: StandardizedTrade, timeframe: str) -> Optional[OHLCVCandle]: - """ - Process trade for specific timeframe. - - CRITICAL LOGIC FOR PREVENTING FUTURE LEAKAGE: - 1. Calculate which bucket this trade belongs to - 2. Check if current bucket exists and matches - 3. If bucket mismatch (time boundary crossed), complete current bucket first - 4. Create new bucket and add trade - 5. Only return completed candles, never incomplete ones - """ - try: - # Calculate which bucket this trade belongs to - trade_bucket_start = self._get_bucket_start_time(trade.timestamp, timeframe) - - # Check if we have a current bucket for this timeframe - current_bucket = self.current_buckets.get(timeframe) - completed_candle = None - - # If no bucket exists or time boundary crossed, handle transition - if current_bucket is None: - # First bucket for this timeframe - current_bucket = TimeframeBucket(self.symbol, timeframe, trade_bucket_start, self.exchange) - self.current_buckets[timeframe] = current_bucket - elif current_bucket.start_time != trade_bucket_start: - # Time boundary crossed - complete previous bucket - if current_bucket.trade_count > 0: # Only complete if it has trades - completed_candle = current_bucket.to_candle(is_complete=True) - self.stats.candles_emitted += 1 - self.stats.last_candle_time = completed_candle.end_time - - # Create new bucket for current time period - current_bucket = TimeframeBucket(self.symbol, timeframe, trade_bucket_start, self.exchange) - self.current_buckets[timeframe] = current_bucket - - # Add trade to current bucket - if not current_bucket.add_trade(trade): - # This should never happen if logic is correct - if self.logger: - self.logger.warning(f"{self.component_name}: Trade {trade.timestamp} could not be added to bucket {current_bucket.start_time}-{current_bucket.end_time}") - - return completed_candle - - except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error processing trade for timeframe {timeframe}: {e}") - self.stats.errors_count += 1 - return None - - def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime: - """ - Calculate bucket start time for given timestamp and timeframe. - - This function determines which time bucket a trade belongs to. - The start time is the LEFT boundary of the interval. - - EXAMPLES: - - Trade at 09:03:45.123 for 1s timeframe -> bucket start = 09:03:45.000 - - Trade at 09:03:47.456 for 5s timeframe -> bucket start = 09:03:45.000 (45-50s bucket) - - Trade at 09:03:52.789 for 10s timeframe -> bucket start = 09:03:50.000 (50-60s bucket) - - Trade at 09:03:23.456 for 15s timeframe -> bucket start = 09:03:15.000 (15-30s bucket) - - Trade at 09:03:45 for 5m timeframe -> bucket start = 09:00:00 - - Trade at 09:07:23 for 5m timeframe -> bucket start = 09:05:00 - - Trade at 14:00:00 for 1h timeframe -> bucket start = 14:00:00 - - Args: - timestamp: Trade timestamp - timeframe: Target timeframe - - Returns: - Bucket start time (left boundary) - """ - if timeframe == '1s': - # 1-second buckets align to second boundaries (remove microseconds) - return timestamp.replace(microsecond=0) - elif timeframe == '5s': - # 5-second buckets: 00:00, 00:05, 00:10, 00:15, etc. - dt = timestamp.replace(microsecond=0) - return dt.replace(second=(dt.second // 5) * 5) - elif timeframe == '10s': - # 10-second buckets: 00:00, 00:10, 00:20, 00:30, 00:40, 00:50 - dt = timestamp.replace(microsecond=0) - return dt.replace(second=(dt.second // 10) * 10) - elif timeframe == '15s': - # 15-second buckets: 00:00, 00:15, 00:30, 00:45 - dt = timestamp.replace(microsecond=0) - return dt.replace(second=(dt.second // 15) * 15) - elif timeframe == '30s': - # 30-second buckets: 00:00, 00:30 - dt = timestamp.replace(microsecond=0) - return dt.replace(second=(dt.second // 30) * 30) - - # Normalize to UTC and remove microseconds for clean boundaries - dt = timestamp.replace(second=0, microsecond=0) - - if timeframe == '1m': - # 1-minute buckets align to minute boundaries - return dt - elif timeframe == '5m': - # 5-minute buckets: 00:00, 00:05, 00:10, etc. - return dt.replace(minute=(dt.minute // 5) * 5) - elif timeframe == '15m': - # 15-minute buckets: 00:00, 00:15, 00:30, 00:45 - return dt.replace(minute=(dt.minute // 15) * 15) - elif timeframe == '30m': - # 30-minute buckets: 00:00, 00:30 - return dt.replace(minute=(dt.minute // 30) * 30) - elif timeframe == '1h': - # 1-hour buckets align to hour boundaries - return dt.replace(minute=0) - elif timeframe == '4h': - # 4-hour buckets: 00:00, 04:00, 08:00, 12:00, 16:00, 20:00 - return dt.replace(minute=0, hour=(dt.hour // 4) * 4) - elif timeframe == '1d': - # 1-day buckets align to day boundaries (midnight UTC) - return dt.replace(minute=0, hour=0) - else: - raise ValueError(f"Unsupported timeframe: {timeframe}") - - def _emit_candle(self, candle: OHLCVCandle) -> None: - """Emit completed candle to all callbacks.""" - try: - for callback in self.candle_callbacks: - callback(candle) - except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error in candle callback: {e}") - self.stats.errors_count += 1 - - def get_current_candles(self, incomplete: bool = True) -> List[OHLCVCandle]: - """ - Get current incomplete candles for all timeframes. - - WARNING: These are incomplete candles and should NOT be used for trading decisions. - They are useful for monitoring/debugging only. - """ - candles = [] - for bucket in self.current_buckets.values(): - if bucket.trade_count > 0: # Only return buckets with trades - candles.append(bucket.to_candle(is_complete=False)) - return candles - - def force_complete_all_candles(self) -> List[OHLCVCandle]: - """ - Force completion of all current candles (useful for shutdown/batch processing). - - WARNING: This should only be used during shutdown or batch processing, - not during live trading as it forces incomplete candles to be marked complete. - """ - completed_candles = [] - for bucket in self.current_buckets.values(): - if bucket.trade_count > 0: - candle = bucket.to_candle(is_complete=True) - completed_candles.append(candle) - self._emit_candle(candle) - - # Clear buckets - self.current_buckets.clear() - return completed_candles - - def get_stats(self) -> Dict[str, Any]: - """Get processing statistics.""" - stats_dict = self.stats.to_dict() - stats_dict['current_buckets'] = { - tf: bucket.trade_count for tf, bucket in self.current_buckets.items() - } - return stats_dict - - -class BatchCandleProcessor: - """ - Batch candle processor for historical data processing. - - This class processes large batches of historical trades efficiently, - building candles for multiple timeframes simultaneously. - """ - - def __init__(self, - symbol: str, - exchange: str, - timeframes: List[str], - component_name: str = "batch_candle_processor", - logger = None): - """ - Initialize batch candle processor. - - Args: - symbol: Trading symbol - exchange: Exchange name - timeframes: List of timeframes to process - component_name: Name for logging - """ - self.symbol = symbol - self.exchange = exchange - self.timeframes = timeframes - self.component_name = component_name - self.logger = logger - - # Statistics - self.stats = ProcessingStats(active_timeframes=len(timeframes)) - - if self.logger: - self.logger.info(f"{self.component_name}: Initialized batch candle processor for {symbol} on {exchange}") - - def process_trades_to_candles(self, trades: Iterator[StandardizedTrade]) -> List[OHLCVCandle]: - """ - Process trade iterator to candles - optimized for batch processing. - - This function handles ALL scenarios: - - Historical: Batch trade iterators - - Backfill: API trade iterators - - Real-time batch: Multiple trades at once - - Args: - trades: Iterator of standardized trades - - Returns: - List of completed candles - """ - try: - # Create temporary processor for this batch - config = CandleProcessingConfig(timeframes=self.timeframes, auto_save_candles=False) - processor = RealTimeCandleProcessor( - self.symbol, self.exchange, config, - f"batch_processor_{self.symbol}_{self.exchange}" - ) - - all_candles = [] - - # Process all trades - for trade in trades: - completed_candles = processor.process_trade(trade) - all_candles.extend(completed_candles) - self.stats.trades_processed += 1 - - # Force complete any remaining candles - remaining_candles = processor.force_complete_all_candles() - all_candles.extend(remaining_candles) - - # Update stats - self.stats.candles_emitted = len(all_candles) - if all_candles: - self.stats.last_candle_time = max(candle.end_time for candle in all_candles) - - if self.logger: - self.logger.info(f"{self.component_name}: Batch processed {self.stats.trades_processed} trades to {len(all_candles)} candles") - return all_candles - - except Exception as e: - if self.logger: - self.logger.error(f"{self.component_name}: Error in batch processing trades to candles: {e}") - self.stats.errors_count += 1 - return [] - - def get_stats(self) -> Dict[str, Any]: - """Get processing statistics.""" - return self.stats.to_dict() - - -# Utility functions for common aggregation operations - -def aggregate_trades_to_candles(trades: List[StandardizedTrade], - timeframes: List[str], - symbol: str, - exchange: str) -> List[OHLCVCandle]: - """ - Simple utility function to aggregate a list of trades to candles. - - Args: - trades: List of standardized trades - timeframes: List of timeframes to generate - symbol: Trading symbol - exchange: Exchange name - - Returns: - List of completed candles - """ - processor = BatchCandleProcessor(symbol, exchange, timeframes) - return processor.process_trades_to_candles(iter(trades)) - - -def validate_timeframe(timeframe: str) -> bool: - """ - Validate if timeframe is supported. - - Args: - timeframe: Timeframe string (e.g., '1s', '5s', '10s', '1m', '5m', '1h') - - Returns: - True if supported, False otherwise - """ - supported = ['1s', '5s', '10s', '15s', '30s', '1m', '5m', '15m', '30m', '1h', '4h', '1d'] - return timeframe in supported - - -def parse_timeframe(timeframe: str) -> tuple[int, str]: - """ - Parse timeframe string into number and unit. - - Args: - timeframe: Timeframe string (e.g., '1s', '5m', '1h') - - Returns: - Tuple of (number, unit) - - Examples: - '1s' -> (1, 's') - '5m' -> (5, 'm') - '1h' -> (1, 'h') - '1d' -> (1, 'd') - """ - import re - match = re.match(r'^(\d+)([smhd])$', timeframe.lower()) - if not match: - raise ValueError(f"Invalid timeframe format: {timeframe}") - - number = int(match.group(1)) - unit = match.group(2) - return number, unit - - -__all__ = [ - 'TimeframeBucket', - 'RealTimeCandleProcessor', - 'BatchCandleProcessor', - 'aggregate_trades_to_candles', - 'validate_timeframe', - 'parse_timeframe' -] \ No newline at end of file diff --git a/data/common/aggregation/__init__.py b/data/common/aggregation/__init__.py new file mode 100644 index 0000000..86a9be4 --- /dev/null +++ b/data/common/aggregation/__init__.py @@ -0,0 +1,34 @@ +""" +Aggregation package for market data processing. + +This package provides functionality for building OHLCV candles from trade data, +with support for both real-time and batch processing. It handles: + +- Time-based bucketing of trades +- Real-time candle construction +- Batch processing for historical data +- Multiple timeframe support + +Note: The actual class exports will be added here once the refactoring is complete. +""" + +from .bucket import TimeframeBucket +from .realtime import RealTimeCandleProcessor +from .batch import BatchCandleProcessor +from .utils import ( + aggregate_trades_to_candles, + validate_timeframe, + parse_timeframe +) + +__all__ = [ + 'TimeframeBucket', + 'RealTimeCandleProcessor', + 'BatchCandleProcessor', + 'aggregate_trades_to_candles', + 'validate_timeframe', + 'parse_timeframe' +] + +# Placeholder for future imports and exports +# These will be added as we move the classes into their respective modules \ No newline at end of file diff --git a/data/common/aggregation/batch.py b/data/common/aggregation/batch.py new file mode 100644 index 0000000..4d0e0ee --- /dev/null +++ b/data/common/aggregation/batch.py @@ -0,0 +1,153 @@ +""" +Batch candle processor for historical trade data. + +This module provides the BatchCandleProcessor class for building OHLCV candles +from historical trade data in batch mode. +""" + +from datetime import datetime +from typing import Dict, List, Any, Iterator +from collections import defaultdict + +from ..data_types import StandardizedTrade, OHLCVCandle, ProcessingStats +from .bucket import TimeframeBucket + + +class BatchCandleProcessor: + """ + Batch candle processor for historical trade data. + + This class processes trades in batch mode, building candles for multiple + timeframes simultaneously. It's optimized for processing large amounts + of historical trade data efficiently. + """ + + def __init__(self, + symbol: str, + exchange: str, + timeframes: List[str], + component_name: str = "batch_candle_processor", + logger = None): + """ + Initialize batch candle processor. + + Args: + symbol: Trading symbol (e.g., 'BTC-USDT') + exchange: Exchange name + timeframes: List of timeframes to process (e.g., ['1m', '5m']) + component_name: Name for logging/stats + logger: Optional logger instance + """ + self.symbol = symbol + self.exchange = exchange + self.timeframes = timeframes + self.component_name = component_name + self.logger = logger + + # Stats tracking + self.stats = ProcessingStats() + + def process_trades_to_candles(self, trades: Iterator[StandardizedTrade]) -> List[OHLCVCandle]: + """ + Process trades in batch and return completed candles. + + Args: + trades: Iterator of trades to process + + Returns: + List of completed candles for all timeframes + """ + # Track buckets for each timeframe + buckets: Dict[str, Dict[datetime, TimeframeBucket]] = defaultdict(dict) + + # Process all trades + for trade in trades: + self.stats.trades_processed += 1 + + # Process trade for each timeframe + for timeframe in self.timeframes: + # Get bucket for this trade's timestamp + bucket_start = self._get_bucket_start_time(trade.timestamp, timeframe) + + # Create bucket if it doesn't exist + if bucket_start not in buckets[timeframe]: + buckets[timeframe][bucket_start] = TimeframeBucket( + symbol=self.symbol, + timeframe=timeframe, + start_time=bucket_start, + exchange=self.exchange + ) + + # Add trade to bucket + buckets[timeframe][bucket_start].add_trade(trade) + + # Convert all buckets to candles + candles = [] + for timeframe_buckets in buckets.values(): + for bucket in timeframe_buckets.values(): + candle = bucket.to_candle(is_complete=True) + candles.append(candle) + self.stats.candles_emitted += 1 + + return sorted(candles, key=lambda x: (x.timeframe, x.end_time)) + + def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime: + """ + Calculate the start time for the bucket that this timestamp belongs to. + + IMPORTANT: Uses RIGHT-ALIGNED timestamps + - For 5m timeframe, buckets start at 00:00, 00:05, 00:10, etc. + - Trade at 09:03:45 belongs to 09:00-09:05 bucket + - Trade at 09:07:30 belongs to 09:05-09:10 bucket + + Args: + timestamp: Trade timestamp + timeframe: Time period (e.g., '1m', '5m', '1h') + + Returns: + Start time for the appropriate bucket + """ + if timeframe == '1s': + return timestamp.replace(microsecond=0) + elif timeframe == '5s': + seconds = (timestamp.second // 5) * 5 + return timestamp.replace(second=seconds, microsecond=0) + elif timeframe == '10s': + seconds = (timestamp.second // 10) * 10 + return timestamp.replace(second=seconds, microsecond=0) + elif timeframe == '15s': + seconds = (timestamp.second // 15) * 15 + return timestamp.replace(second=seconds, microsecond=0) + elif timeframe == '30s': + seconds = (timestamp.second // 30) * 30 + return timestamp.replace(second=seconds, microsecond=0) + elif timeframe == '1m': + return timestamp.replace(second=0, microsecond=0) + elif timeframe == '5m': + minutes = (timestamp.minute // 5) * 5 + return timestamp.replace(minute=minutes, second=0, microsecond=0) + elif timeframe == '15m': + minutes = (timestamp.minute // 15) * 15 + return timestamp.replace(minute=minutes, second=0, microsecond=0) + elif timeframe == '30m': + minutes = (timestamp.minute // 30) * 30 + return timestamp.replace(minute=minutes, second=0, microsecond=0) + elif timeframe == '1h': + return timestamp.replace(minute=0, second=0, microsecond=0) + elif timeframe == '4h': + hours = (timestamp.hour // 4) * 4 + return timestamp.replace(hour=hours, minute=0, second=0, microsecond=0) + elif timeframe == '1d': + return timestamp.replace(hour=0, minute=0, second=0, microsecond=0) + else: + raise ValueError(f"Unsupported timeframe: {timeframe}") + + def get_stats(self) -> Dict[str, Any]: + """Get processing statistics.""" + return { + "component": self.component_name, + "stats": self.stats.to_dict() + } + + +__all__ = ['BatchCandleProcessor'] \ No newline at end of file diff --git a/data/common/aggregation/bucket.py b/data/common/aggregation/bucket.py new file mode 100644 index 0000000..ac5d182 --- /dev/null +++ b/data/common/aggregation/bucket.py @@ -0,0 +1,144 @@ +""" +Time bucket implementation for building OHLCV candles. + +This module provides the TimeframeBucket class which accumulates trades +within a specific time period and calculates OHLCV data incrementally. +""" + +from datetime import datetime, timezone, timedelta +from decimal import Decimal +from typing import Optional, List + +from ..data_types import StandardizedTrade, OHLCVCandle + + +class TimeframeBucket: + """ + Time bucket for building OHLCV candles from trades. + + This class accumulates trades within a specific time period + and calculates OHLCV data incrementally. + + IMPORTANT: Uses RIGHT-ALIGNED timestamps + - start_time: Beginning of the interval (inclusive) + - end_time: End of the interval (exclusive) - this becomes the candle timestamp + - Example: 09:00:00 - 09:05:00 bucket -> candle timestamp = 09:05:00 + """ + + def __init__(self, symbol: str, timeframe: str, start_time: datetime, exchange: str = "unknown"): + """ + Initialize time bucket for candle aggregation. + + Args: + symbol: Trading symbol (e.g., 'BTC-USDT') + timeframe: Time period (e.g., '1m', '5m', '1h') + start_time: Start time for this bucket (inclusive) + exchange: Exchange name + """ + self.symbol = symbol + self.timeframe = timeframe + self.start_time = start_time + self.end_time = self._calculate_end_time(start_time, timeframe) + self.exchange = exchange + + # OHLCV data + self.open: Optional[Decimal] = None + self.high: Optional[Decimal] = None + self.low: Optional[Decimal] = None + self.close: Optional[Decimal] = None + self.volume: Decimal = Decimal('0') + self.trade_count: int = 0 + + # Tracking + self.first_trade_time: Optional[datetime] = None + self.last_trade_time: Optional[datetime] = None + self.trades: List[StandardizedTrade] = [] + + def add_trade(self, trade: StandardizedTrade) -> bool: + """ + Add trade to this bucket if it belongs to this time period. + + Args: + trade: Standardized trade data + + Returns: + True if trade was added, False if outside time range + """ + # Check if trade belongs in this bucket (start_time <= trade.timestamp < end_time) + if not (self.start_time <= trade.timestamp < self.end_time): + return False + + # First trade sets open price + if self.open is None: + self.open = trade.price + self.high = trade.price + self.low = trade.price + self.first_trade_time = trade.timestamp + + # Update OHLCV + self.high = max(self.high, trade.price) + self.low = min(self.low, trade.price) + self.close = trade.price # Last trade sets close + self.volume += trade.size + self.trade_count += 1 + self.last_trade_time = trade.timestamp + + # Store trade for detailed analysis if needed + self.trades.append(trade) + + return True + + def to_candle(self, is_complete: bool = True) -> OHLCVCandle: + """ + Convert bucket to OHLCV candle. + + IMPORTANT: Candle timestamp = end_time (right-aligned, industry standard) + """ + return OHLCVCandle( + symbol=self.symbol, + timeframe=self.timeframe, + start_time=self.start_time, + end_time=self.end_time, + open=self.open or Decimal('0'), + high=self.high or Decimal('0'), + low=self.low or Decimal('0'), + close=self.close or Decimal('0'), + volume=self.volume, + trade_count=self.trade_count, + exchange=self.exchange, + is_complete=is_complete, + first_trade_time=self.first_trade_time, + last_trade_time=self.last_trade_time + ) + + def _calculate_end_time(self, start_time: datetime, timeframe: str) -> datetime: + """Calculate end time for this timeframe (right-aligned timestamp).""" + if timeframe == '1s': + return start_time + timedelta(seconds=1) + elif timeframe == '5s': + return start_time + timedelta(seconds=5) + elif timeframe == '10s': + return start_time + timedelta(seconds=10) + elif timeframe == '15s': + return start_time + timedelta(seconds=15) + elif timeframe == '30s': + return start_time + timedelta(seconds=30) + elif timeframe == '1m': + return start_time + timedelta(minutes=1) + elif timeframe == '5m': + return start_time + timedelta(minutes=5) + elif timeframe == '15m': + return start_time + timedelta(minutes=15) + elif timeframe == '30m': + return start_time + timedelta(minutes=30) + elif timeframe == '1h': + return start_time + timedelta(hours=1) + elif timeframe == '4h': + return start_time + timedelta(hours=4) + elif timeframe == '1d': + return start_time + timedelta(days=1) + else: + raise ValueError(f"Unsupported timeframe: {timeframe}") + + +__all__ = ['TimeframeBucket'] \ No newline at end of file diff --git a/data/common/aggregation/realtime.py b/data/common/aggregation/realtime.py new file mode 100644 index 0000000..c5eb15e --- /dev/null +++ b/data/common/aggregation/realtime.py @@ -0,0 +1,235 @@ +""" +Real-time candle processor for live trade data. + +This module provides the RealTimeCandleProcessor class for building OHLCV candles +from live trade data in real-time. +""" + +from datetime import datetime, timezone, timedelta +from decimal import Decimal +from typing import Dict, List, Optional, Any, Callable +from collections import defaultdict + +from ..data_types import StandardizedTrade, OHLCVCandle, CandleProcessingConfig, ProcessingStats +from .bucket import TimeframeBucket + + +class RealTimeCandleProcessor: + """ + Real-time candle processor for live trade data. + + This class processes trades immediately as they arrive from WebSocket, + building candles incrementally and emitting completed candles when + time boundaries are crossed. + + AGGREGATION PROCESS (NO FUTURE LEAKAGE): + + 1. Trade arrives from WebSocket/API with timestamp T + 2. For each configured timeframe (1m, 5m, etc.): + a. Calculate which time bucket this trade belongs to + b. Get current bucket for this timeframe + c. Check if trade timestamp crosses time boundary + d. If boundary crossed: complete and emit previous bucket, create new bucket + e. Add trade to current bucket (updates OHLCV) + 3. Only emit candles when time boundary is definitively crossed + 4. Never emit incomplete/future candles during real-time processing + + TIMESTAMP ALIGNMENT: + - Uses RIGHT-ALIGNED timestamps (industry standard) + - 1-minute candle covering 09:00:00-09:01:00 gets timestamp 09:01:00 + - 5-minute candle covering 09:00:00-09:05:00 gets timestamp 09:05:00 + - Candle represents PAST data, never future + """ + + def __init__(self, + symbol: str, + exchange: str, + config: Optional[CandleProcessingConfig] = None, + component_name: str = "realtime_candle_processor", + logger = None): + """ + Initialize real-time candle processor. + + Args: + symbol: Trading symbol (e.g., 'BTC-USDT') + exchange: Exchange name + config: Candle processing configuration + component_name: Name for logging/stats + logger: Optional logger instance + """ + self.symbol = symbol + self.exchange = exchange + self.config = config or CandleProcessingConfig() + self.component_name = component_name + self.logger = logger + + # Current buckets for each timeframe + self.current_buckets: Dict[str, TimeframeBucket] = {} + + # Callbacks for completed candles + self.candle_callbacks: List[Callable[[OHLCVCandle], None]] = [] + + # Stats tracking + self.stats = ProcessingStats() + + def add_candle_callback(self, callback: Callable[[OHLCVCandle], None]) -> None: + """Add callback to be called when candle is completed.""" + self.candle_callbacks.append(callback) + + def process_trade(self, trade: StandardizedTrade) -> List[OHLCVCandle]: + """ + Process a single trade and return any completed candles. + + Args: + trade: Standardized trade data + + Returns: + List of completed candles (if any time boundaries were crossed) + """ + self.stats.trades_processed += 1 + + completed_candles = [] + for timeframe in self.config.timeframes: + completed = self._process_trade_for_timeframe(trade, timeframe) + if completed: + completed_candles.append(completed) + self.stats.candles_emitted += 1 + + return completed_candles + + def _process_trade_for_timeframe(self, trade: StandardizedTrade, timeframe: str) -> Optional[OHLCVCandle]: + """ + Process trade for a specific timeframe and return completed candle if boundary crossed. + + Args: + trade: Trade to process + timeframe: Timeframe to process for (e.g., '1m', '5m') + + Returns: + Completed candle if time boundary crossed, None otherwise + """ + # Calculate which bucket this trade belongs to + bucket_start = self._get_bucket_start_time(trade.timestamp, timeframe) + + # Get current bucket for this timeframe + current_bucket = self.current_buckets.get(timeframe) + completed_candle = None + + # If we have a current bucket and trade belongs in a new bucket, + # complete current bucket and create new one + if current_bucket and bucket_start >= current_bucket.end_time: + completed_candle = current_bucket.to_candle(is_complete=True) + self._emit_candle(completed_candle) + current_bucket = None + + # Create new bucket if needed + if not current_bucket: + current_bucket = TimeframeBucket( + symbol=self.symbol, + timeframe=timeframe, + start_time=bucket_start, + exchange=self.exchange + ) + self.current_buckets[timeframe] = current_bucket + + # Add trade to current bucket + current_bucket.add_trade(trade) + + return completed_candle + + def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime: + """ + Calculate the start time for the bucket that this timestamp belongs to. + + IMPORTANT: Uses RIGHT-ALIGNED timestamps + - For 5m timeframe, buckets start at 00:00, 00:05, 00:10, etc. + - Trade at 09:03:45 belongs to 09:00-09:05 bucket + - Trade at 09:07:30 belongs to 09:05-09:10 bucket + + Args: + timestamp: Trade timestamp + timeframe: Time period (e.g., '1m', '5m', '1h') + + Returns: + Start time for the appropriate bucket + """ + if timeframe == '1s': + return timestamp.replace(microsecond=0) + elif timeframe == '5s': + seconds = (timestamp.second // 5) * 5 + return timestamp.replace(second=seconds, microsecond=0) + elif timeframe == '10s': + seconds = (timestamp.second // 10) * 10 + return timestamp.replace(second=seconds, microsecond=0) + elif timeframe == '15s': + seconds = (timestamp.second // 15) * 15 + return timestamp.replace(second=seconds, microsecond=0) + elif timeframe == '30s': + seconds = (timestamp.second // 30) * 30 + return timestamp.replace(second=seconds, microsecond=0) + elif timeframe == '1m': + return timestamp.replace(second=0, microsecond=0) + elif timeframe == '5m': + minutes = (timestamp.minute // 5) * 5 + return timestamp.replace(minute=minutes, second=0, microsecond=0) + elif timeframe == '15m': + minutes = (timestamp.minute // 15) * 15 + return timestamp.replace(minute=minutes, second=0, microsecond=0) + elif timeframe == '30m': + minutes = (timestamp.minute // 30) * 30 + return timestamp.replace(minute=minutes, second=0, microsecond=0) + elif timeframe == '1h': + return timestamp.replace(minute=0, second=0, microsecond=0) + elif timeframe == '4h': + hours = (timestamp.hour // 4) * 4 + return timestamp.replace(hour=hours, minute=0, second=0, microsecond=0) + elif timeframe == '1d': + return timestamp.replace(hour=0, minute=0, second=0, microsecond=0) + else: + raise ValueError(f"Unsupported timeframe: {timeframe}") + + def _emit_candle(self, candle: OHLCVCandle) -> None: + """Emit completed candle to all registered callbacks.""" + for callback in self.candle_callbacks: + try: + callback(candle) + except Exception as e: + if self.logger: + self.logger.error(f"Error in candle callback: {e}") + + def get_current_candles(self, incomplete: bool = True) -> List[OHLCVCandle]: + """ + Get current (incomplete) candles for all timeframes. + + Args: + incomplete: Whether to mark candles as incomplete (default True) + """ + return [ + bucket.to_candle(is_complete=not incomplete) + for bucket in self.current_buckets.values() + ] + + def force_complete_all_candles(self) -> List[OHLCVCandle]: + """ + Force completion of all current candles (e.g., on connection close). + + Returns: + List of completed candles + """ + completed = [] + for timeframe, bucket in self.current_buckets.items(): + candle = bucket.to_candle(is_complete=True) + completed.append(candle) + self._emit_candle(candle) + self.current_buckets.clear() + return completed + + def get_stats(self) -> Dict[str, Any]: + """Get processing statistics.""" + return { + "component": self.component_name, + "stats": self.stats.to_dict() + } + + +__all__ = ['RealTimeCandleProcessor'] \ No newline at end of file diff --git a/data/common/aggregation/utils.py b/data/common/aggregation/utils.py new file mode 100644 index 0000000..85962e0 --- /dev/null +++ b/data/common/aggregation/utils.py @@ -0,0 +1,78 @@ +""" +Utility functions for market data aggregation. + +This module provides common utility functions for working with OHLCV candles +and trade data aggregation. +""" + +import re +from typing import List, Tuple + +from ..data_types import StandardizedTrade, OHLCVCandle +from .batch import BatchCandleProcessor + + +def aggregate_trades_to_candles(trades: List[StandardizedTrade], + timeframes: List[str], + symbol: str, + exchange: str) -> List[OHLCVCandle]: + """ + Simple utility function to aggregate a list of trades to candles. + + Args: + trades: List of standardized trades + timeframes: List of timeframes to generate + symbol: Trading symbol + exchange: Exchange name + + Returns: + List of completed candles + """ + processor = BatchCandleProcessor(symbol, exchange, timeframes) + return processor.process_trades_to_candles(iter(trades)) + + +def validate_timeframe(timeframe: str) -> bool: + """ + Validate if timeframe is supported. + + Args: + timeframe: Timeframe string (e.g., '1s', '5s', '10s', '1m', '5m', '1h') + + Returns: + True if supported, False otherwise + """ + supported = ['1s', '5s', '10s', '15s', '30s', '1m', '5m', '15m', '30m', '1h', '4h', '1d'] + return timeframe in supported + + +def parse_timeframe(timeframe: str) -> Tuple[int, str]: + """ + Parse timeframe string into number and unit. + + Args: + timeframe: Timeframe string (e.g., '1s', '5m', '1h') + + Returns: + Tuple of (number, unit) + + Examples: + '1s' -> (1, 's') + '5m' -> (5, 'm') + '1h' -> (1, 'h') + '1d' -> (1, 'd') + """ + match = re.match(r'^(\d+)([smhd])$', timeframe.lower()) + if not match: + raise ValueError(f"Invalid timeframe format: {timeframe}") + + number = int(match.group(1)) + unit = match.group(2) + return number, unit + + +__all__ = [ + 'aggregate_trades_to_candles', + 'validate_timeframe', + 'parse_timeframe' +] \ No newline at end of file diff --git a/data/common/transformation.py b/data/common/transformation.py index 25b412b..6e9a44c 100644 --- a/data/common/transformation.py +++ b/data/common/transformation.py @@ -11,7 +11,7 @@ from typing import Dict, List, Optional, Any, Iterator from abc import ABC, abstractmethod from .data_types import StandardizedTrade, OHLCVCandle, DataValidationResult -from .aggregation import BatchCandleProcessor +from .aggregation.batch import BatchCandleProcessor class BaseDataTransformer(ABC): diff --git a/data/exchanges/okx/data_processor.py b/data/exchanges/okx/data_processor.py index dc7c0b0..4adf6f1 100644 --- a/data/exchanges/okx/data_processor.py +++ b/data/exchanges/okx/data_processor.py @@ -17,13 +17,13 @@ from ...common import ( StandardizedTrade, OHLCVCandle, CandleProcessingConfig, - RealTimeCandleProcessor, BaseDataValidator, ValidationResult, BaseDataTransformer, UnifiedDataTransformer, create_standardized_trade ) +from ...common.aggregation.realtime import RealTimeCandleProcessor class OKXMessageType(Enum): diff --git a/docs/decisions/ADR-001-data-processing-refactor.md b/docs/decisions/ADR-001-data-processing-refactor.md index cc435a8..fa76108 100644 --- a/docs/decisions/ADR-001-data-processing-refactor.md +++ b/docs/decisions/ADR-001-data-processing-refactor.md @@ -398,12 +398,21 @@ candles = transformer.process_trades_to_candles( **Real-time candle processing:** ```python -# Same code works for any exchange -candle_processor = RealTimeCandleProcessor(symbol, exchange, config) -candle_processor.add_candle_callback(my_candle_handler) +# Example usage +from data.common.aggregation.realtime import RealTimeCandleProcessor -for trade in real_time_trades: - completed_candles = candle_processor.process_trade(trade) +processor = RealTimeCandleProcessor(symbol, "binance", config) +processor.add_candle_callback(on_candle_completed) +processor.process_trade(trade) +``` + +```python +# Example usage +from data.common.aggregation.realtime import RealTimeCandleProcessor + +candle_processor = RealTimeCandleProcessor(symbol, exchange, config) +candle_processor.add_candle_callback(on_candle_completed) +candle_processor.process_trade(trade) ``` ## Testing diff --git a/docs/modules/technical-indicators.md b/docs/modules/technical-indicators.md index 65e0665..81a1ea5 100644 --- a/docs/modules/technical-indicators.md +++ b/docs/modules/technical-indicators.md @@ -164,7 +164,7 @@ The indicators module is designed to work seamlessly with the TCP platform's agg ### Real-Time Processing ```python -from data.common.aggregation import RealTimeCandleProcessor +from data.common.aggregation.realtime import RealTimeCandleProcessor from data.common.indicators import TechnicalIndicators # Set up real-time processing diff --git a/tasks/refactor-common-package.md b/tasks/refactor-common-package.md new file mode 100644 index 0000000..d063729 --- /dev/null +++ b/tasks/refactor-common-package.md @@ -0,0 +1,63 @@ +## Relevant Files + +- `data/common/aggregation.py` - To be broken into a sub-package. +- `data/common/indicators.py` - To be broken into a sub-package and have a bug fixed. +- `data/common/validation.py` - To be refactored for better modularity. +- `data/common/transformation.py` - To be refactored for better modularity. +- `data/common/data_types.py` - To be updated with new types from other modules. +- `data/common/__init__.py` - To be updated to reflect the new package structure. +- `tests/` - Existing tests will need to be run after each step to ensure no regressions. + +### Notes + +- This refactoring focuses on improving modularity by splitting large files into smaller, more focused modules, as outlined in the `refactoring.mdc` guide. +- Each major step will be followed by a verification phase to ensure the application remains stable. + +## Tasks + +- [x] 1.0 Refactor `aggregation.py` into a dedicated sub-package. + - [x] 1.1 Create safety net tests to ensure the aggregation logic still works as expected. + - [x] 1.2 Create a new directory `data/common/aggregation`. + - [x] 1.3 Create `data/common/aggregation/__init__.py` to mark it as a package. + - [x] 1.4 Move the `TimeframeBucket` class to `data/common/aggregation/bucket.py`. + - [x] 1.5 Move the `RealTimeCandleProcessor` class to `data/common/aggregation/realtime.py`. + - [x] 1.6 Move the `BatchCandleProcessor` class to `data/common/aggregation/batch.py`. + - [x] 1.7 Move the utility functions to `data/common/aggregation/utils.py`. + - [x] 1.8 Update `data/common/aggregation/__init__.py` to expose all public classes and functions. + - [x] 1.9 Delete the original `data/common/aggregation.py` file. + - [x] 1.10 Run tests to verify the aggregation logic still works as expected. + +- [ ] 2.0 Refactor `indicators.py` into a dedicated sub-package. + - [ ] 2.1 Create safety net tests for indicators module. + - [ ] 2.2 Create a new directory `data/common/indicators`. + - [ ] 2.3 Create `data/common/indicators/__init__.py` to mark it as a package. + - [ ] 2.4 Move the `TechnicalIndicators` class to `data/common/indicators/technical.py`. + - [ ] 2.5 Move the `IndicatorResult` class to `data/common/indicators/result.py`. + - [ ] 2.6 Move the utility functions to `data/common/indicators/utils.py`. + - [ ] 2.7 Update `data/common/indicators/__init__.py` to expose all public classes and functions. + - [ ] 2.8 Delete the original `data/common/indicators.py` file. + - [ ] 2.9 Run tests to verify the indicators logic still works as expected. + +- [ ] 3.0 Refactor `validation.py` for better modularity. + - [ ] 3.1 Create safety net tests for validation module. + - [ ] 3.2 Extract common validation logic into separate functions. + - [ ] 3.3 Improve error handling and validation messages. + - [ ] 3.4 Run tests to verify validation still works as expected. + +- [ ] 4.0 Refactor `transformation.py` for better modularity. + - [ ] 4.1 Create safety net tests for transformation module. + - [ ] 4.2 Extract common transformation logic into separate functions. + - [ ] 4.3 Improve error handling and transformation messages. + - [ ] 4.4 Run tests to verify transformation still works as expected. + +- [ ] 5.0 Update `data_types.py` with new types. + - [ ] 5.1 Review and document all data types. + - [ ] 5.2 Add any missing type hints. + - [ ] 5.3 Add validation for data types. + - [ ] 5.4 Run tests to verify data types still work as expected. + +- [ ] 6.0 Final verification and cleanup. + - [ ] 6.1 Run all tests to ensure no regressions. + - [ ] 6.2 Update documentation to reflect new structure. + - [ ] 6.3 Review and clean up any remaining TODOs. + - [ ] 6.4 Create PR with changes. \ No newline at end of file diff --git a/tests/data/common/test_aggregation_safety.py b/tests/data/common/test_aggregation_safety.py new file mode 100644 index 0000000..82499dc --- /dev/null +++ b/tests/data/common/test_aggregation_safety.py @@ -0,0 +1,231 @@ +""" +Safety net tests for the aggregation package. + +These tests verify the core functionality of the aggregation module +before and during refactoring to ensure no regressions are introduced. +""" + +import unittest +from datetime import datetime, timezone, timedelta +from decimal import Decimal +from typing import Dict, List + +from data.common.aggregation.bucket import TimeframeBucket +from data.common.aggregation.realtime import RealTimeCandleProcessor +from data.common.aggregation.batch import BatchCandleProcessor +from data.common.aggregation.utils import ( + validate_timeframe, + parse_timeframe, + aggregate_trades_to_candles +) +from data.common import ( + StandardizedTrade, + OHLCVCandle, + CandleProcessingConfig +) + +class TestTimeframeBucketSafety(unittest.TestCase): + """Safety net tests for TimeframeBucket class.""" + + def setUp(self): + self.symbol = "BTC-USDT" + self.timeframe = "5m" + self.start_time = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc) + self.bucket = TimeframeBucket(self.symbol, self.timeframe, self.start_time) + + def test_bucket_initialization(self): + """Test bucket initialization and time boundaries.""" + self.assertEqual(self.bucket.symbol, self.symbol) + self.assertEqual(self.bucket.timeframe, self.timeframe) + self.assertEqual(self.bucket.start_time, self.start_time) + self.assertEqual(self.bucket.end_time, self.start_time + timedelta(minutes=5)) + + def test_add_trade_updates_ohlcv(self): + """Test that adding trades correctly updates OHLCV data.""" + trade1 = StandardizedTrade( + symbol=self.symbol, + trade_id="1", + price=Decimal("50000"), + size=Decimal("1"), + side="buy", + timestamp=self.start_time + timedelta(minutes=1), + exchange="test" + ) + + trade2 = StandardizedTrade( + symbol=self.symbol, + trade_id="2", + price=Decimal("51000"), + size=Decimal("0.5"), + side="sell", + timestamp=self.start_time + timedelta(minutes=2), + exchange="test" + ) + + # Add first trade + self.bucket.add_trade(trade1) + self.assertEqual(self.bucket.open, Decimal("50000")) + self.assertEqual(self.bucket.high, Decimal("50000")) + self.assertEqual(self.bucket.low, Decimal("50000")) + self.assertEqual(self.bucket.close, Decimal("50000")) + self.assertEqual(self.bucket.volume, Decimal("1")) + self.assertEqual(self.bucket.trade_count, 1) + + # Add second trade + self.bucket.add_trade(trade2) + self.assertEqual(self.bucket.open, Decimal("50000")) + self.assertEqual(self.bucket.high, Decimal("51000")) + self.assertEqual(self.bucket.low, Decimal("50000")) + self.assertEqual(self.bucket.close, Decimal("51000")) + self.assertEqual(self.bucket.volume, Decimal("1.5")) + self.assertEqual(self.bucket.trade_count, 2) + + def test_bucket_time_boundaries(self): + """Test that trades are only added within correct time boundaries.""" + valid_trade = StandardizedTrade( + symbol=self.symbol, + trade_id="1", + price=Decimal("50000"), + size=Decimal("1"), + side="buy", + timestamp=self.start_time + timedelta(minutes=1), + exchange="test" + ) + + invalid_trade = StandardizedTrade( + symbol=self.symbol, + trade_id="2", + price=Decimal("51000"), + size=Decimal("1"), + side="buy", + timestamp=self.start_time + timedelta(minutes=6), + exchange="test" + ) + + self.assertTrue(self.bucket.add_trade(valid_trade)) + self.assertFalse(self.bucket.add_trade(invalid_trade)) + +class TestRealTimeCandleProcessorSafety(unittest.TestCase): + """Safety net tests for RealTimeCandleProcessor class.""" + + def setUp(self): + self.symbol = "BTC-USDT" + self.exchange = "test" + self.config = CandleProcessingConfig(timeframes=["1m", "5m"]) + self.processor = RealTimeCandleProcessor(self.symbol, self.exchange, self.config) + + def test_process_single_trade(self): + """Test processing a single trade.""" + trade = StandardizedTrade( + symbol=self.symbol, + trade_id="1", + price=Decimal("50000"), + size=Decimal("1"), + side="buy", + timestamp=datetime(2024, 1, 1, 10, 0, 30, tzinfo=timezone.utc), + exchange=self.exchange + ) + + completed_candles = self.processor.process_trade(trade) + self.assertEqual(len(completed_candles), 0) # No completed candles yet + + current_candles = self.processor.get_current_candles() + self.assertEqual(len(current_candles), 2) # One for each timeframe + + def test_candle_completion(self): + """Test that candles are completed at correct time boundaries.""" + # First trade in first minute + trade1 = StandardizedTrade( + symbol=self.symbol, + trade_id="1", + price=Decimal("50000"), + size=Decimal("1"), + side="buy", + timestamp=datetime(2024, 1, 1, 10, 0, 30, tzinfo=timezone.utc), + exchange=self.exchange + ) + + # Second trade in next minute - should complete 1m candle + trade2 = StandardizedTrade( + symbol=self.symbol, + trade_id="2", + price=Decimal("51000"), + size=Decimal("1"), + side="sell", + timestamp=datetime(2024, 1, 1, 10, 1, 15, tzinfo=timezone.utc), + exchange=self.exchange + ) + + completed1 = self.processor.process_trade(trade1) + self.assertEqual(len(completed1), 0) + + completed2 = self.processor.process_trade(trade2) + self.assertEqual(len(completed2), 1) # 1m candle completed + self.assertEqual(completed2[0].timeframe, "1m") + +class TestBatchCandleProcessorSafety(unittest.TestCase): + """Safety net tests for BatchCandleProcessor class.""" + + def setUp(self): + self.symbol = "BTC-USDT" + self.exchange = "test" + self.timeframes = ["1m", "5m"] + self.processor = BatchCandleProcessor(self.symbol, self.exchange, self.timeframes) + + def test_batch_processing(self): + """Test processing multiple trades in batch.""" + trades = [ + StandardizedTrade( + symbol=self.symbol, + trade_id=str(i), + price=Decimal(str(50000 + i)), + size=Decimal("1"), + side="buy" if i % 2 == 0 else "sell", + timestamp=datetime(2024, 1, 1, 10, 0, i, tzinfo=timezone.utc), + exchange=self.exchange + ) + for i in range(10) + ] + + candles = self.processor.process_trades_to_candles(iter(trades)) + self.assertTrue(len(candles) > 0) + + # Verify candle integrity + for candle in candles: + self.assertEqual(candle.symbol, self.symbol) + self.assertTrue(candle.timeframe in self.timeframes) + self.assertTrue(candle.is_complete) + self.assertTrue(candle.volume > 0) + self.assertTrue(candle.trade_count > 0) + +class TestAggregationUtilsSafety(unittest.TestCase): + """Safety net tests for aggregation utility functions.""" + + def test_validate_timeframe(self): + """Test timeframe validation.""" + valid_timeframes = ['1s', '5s', '10s', '15s', '30s', '1m', '5m', '15m', '30m', '1h', '4h', '1d'] + invalid_timeframes = ['2m', '2h', '1w', 'invalid'] + + for tf in valid_timeframes: + self.assertTrue(validate_timeframe(tf)) + + for tf in invalid_timeframes: + self.assertFalse(validate_timeframe(tf)) + + def test_parse_timeframe(self): + """Test timeframe parsing.""" + test_cases = [ + ('1s', (1, 's')), + ('5m', (5, 'm')), + ('1h', (1, 'h')), + ('1d', (1, 'd')) + ] + + for tf, expected in test_cases: + self.assertEqual(parse_timeframe(tf), expected) + + with self.assertRaises(ValueError): + parse_timeframe('invalid') + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/quick_aggregation_test.py b/tests/quick_aggregation_test.py index fa23689..f31dc37 100644 --- a/tests/quick_aggregation_test.py +++ b/tests/quick_aggregation_test.py @@ -14,7 +14,7 @@ from typing import Dict, List, Any # Import our modules from data.common.data_types import StandardizedTrade, CandleProcessingConfig, OHLCVCandle -from data.common.aggregation import RealTimeCandleProcessor +from data.common.aggregation.realtime import RealTimeCandleProcessor from data.exchanges.okx.websocket import OKXWebSocketClient, OKXSubscription, OKXChannelType # Set up minimal logging diff --git a/tests/test_data_collection_aggregation.py b/tests/test_data_collection_aggregation.py index f05530e..0e98b13 100644 --- a/tests/test_data_collection_aggregation.py +++ b/tests/test_data_collection_aggregation.py @@ -33,7 +33,7 @@ from data.common.data_types import ( StandardizedTrade, OHLCVCandle, CandleProcessingConfig, DataValidationResult ) -from data.common.aggregation import RealTimeCandleProcessor +from data.common.aggregation.realtime import RealTimeCandleProcessor from data.common.validation import BaseDataValidator, ValidationResult from data.common.transformation import BaseDataTransformer from utils.logger import get_logger diff --git a/tests/test_real_okx_aggregation.py b/tests/test_real_okx_aggregation.py index 647b449..2e58370 100644 --- a/tests/test_real_okx_aggregation.py +++ b/tests/test_real_okx_aggregation.py @@ -19,7 +19,7 @@ from collections import defaultdict # Import our modules from data.common.data_types import StandardizedTrade, CandleProcessingConfig, OHLCVCandle -from data.common.aggregation import RealTimeCandleProcessor +from data.common.aggregation.realtime import RealTimeCandleProcessor from data.exchanges.okx.websocket import OKXWebSocketClient, OKXSubscription, OKXChannelType from data.exchanges.okx.data_processor import OKXDataProcessor diff --git a/tests/test_refactored_okx.py b/tests/test_refactored_okx.py index 8c2941f..27aefa0 100644 --- a/tests/test_refactored_okx.py +++ b/tests/test_refactored_okx.py @@ -25,6 +25,7 @@ from data.common import ( RealTimeCandleProcessor, CandleProcessingConfig ) +from data.common.aggregation.realtime import RealTimeCandleProcessor from data.base_collector import DataType from utils.logger import get_logger