Refactor aggregation module and enhance structure

- Split the `aggregation.py` file into a dedicated sub-package, improving modularity and maintainability.
- Moved `TimeframeBucket`, `RealTimeCandleProcessor`, and `BatchCandleProcessor` classes into their respective files within the new `aggregation` sub-package.
- Introduced utility functions for trade aggregation and validation, enhancing code organization.
- Updated import paths throughout the codebase to reflect the new structure, ensuring compatibility.
- Added safety net tests for the aggregation package to verify core functionality and prevent regressions during refactoring.

These changes enhance the overall architecture of the aggregation module, making it more scalable and easier to manage.
This commit is contained in:
Vasily.onl
2025-06-07 01:17:22 +08:00
parent fe9d8e75ed
commit e7ede7f329
17 changed files with 965 additions and 616 deletions

View File

@@ -9,15 +9,14 @@ from .data_types import (
StandardizedTrade,
OHLCVCandle,
MarketDataPoint,
DataValidationResult
)
from .aggregation import (
TimeframeBucket,
RealTimeCandleProcessor,
DataValidationResult,
CandleProcessingConfig
)
from .aggregation import TimeframeBucket
# Temporarily import from old location until we move these classes
from .aggregation import RealTimeCandleProcessor
from .transformation import (
BaseDataTransformer,
UnifiedDataTransformer,
@@ -42,11 +41,11 @@ __all__ = [
'OHLCVCandle',
'MarketDataPoint',
'DataValidationResult',
'CandleProcessingConfig',
# Aggregation
'TimeframeBucket',
'RealTimeCandleProcessor',
'CandleProcessingConfig',
# Transformation
'BaseDataTransformer',

View File

@@ -1,598 +0,0 @@
"""
Common aggregation utilities for all exchanges.
This module provides shared functionality for building OHLCV candles
from trade data, regardless of the source exchange.
AGGREGATION STRATEGY:
- Uses RIGHT-ALIGNED timestamps (industry standard)
- Candle timestamp = end time of the interval (close time)
- 5-minute candle with timestamp 09:05:00 represents data from 09:00:01 to 09:05:00
- Prevents future leakage by only completing candles when time boundary is crossed
- Aligns with major exchanges (Binance, OKX, Coinbase)
PROCESS FLOW:
1. Trade arrives with timestamp T
2. Calculate which time bucket this trade belongs to
3. If bucket doesn't exist or time boundary crossed, complete previous bucket
4. Add trade to current bucket
5. Only emit completed candles (never future data)
"""
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Dict, List, Optional, Any, Iterator, Callable
from collections import defaultdict
from .data_types import (
StandardizedTrade,
OHLCVCandle,
CandleProcessingConfig,
ProcessingStats
)
class TimeframeBucket:
"""
Time bucket for building OHLCV candles from trades.
This class accumulates trades within a specific time period
and calculates OHLCV data incrementally.
IMPORTANT: Uses RIGHT-ALIGNED timestamps
- start_time: Beginning of the interval (inclusive)
- end_time: End of the interval (exclusive) - this becomes the candle timestamp
- Example: 09:00:00 - 09:05:00 bucket -> candle timestamp = 09:05:00
"""
def __init__(self, symbol: str, timeframe: str, start_time: datetime, exchange: str = "unknown"):
"""
Initialize time bucket for candle aggregation.
Args:
symbol: Trading symbol (e.g., 'BTC-USDT')
timeframe: Time period (e.g., '1m', '5m', '1h')
start_time: Start time for this bucket (inclusive)
exchange: Exchange name
"""
self.symbol = symbol
self.timeframe = timeframe
self.start_time = start_time
self.end_time = self._calculate_end_time(start_time, timeframe)
self.exchange = exchange
# OHLCV data
self.open: Optional[Decimal] = None
self.high: Optional[Decimal] = None
self.low: Optional[Decimal] = None
self.close: Optional[Decimal] = None
self.volume: Decimal = Decimal('0')
self.trade_count: int = 0
# Tracking
self.first_trade_time: Optional[datetime] = None
self.last_trade_time: Optional[datetime] = None
self.trades: List[StandardizedTrade] = []
def add_trade(self, trade: StandardizedTrade) -> bool:
"""
Add trade to this bucket if it belongs to this time period.
Args:
trade: Standardized trade data
Returns:
True if trade was added, False if outside time range
"""
# Check if trade belongs in this bucket (start_time <= trade.timestamp < end_time)
if not (self.start_time <= trade.timestamp < self.end_time):
return False
# First trade sets open price
if self.open is None:
self.open = trade.price
self.high = trade.price
self.low = trade.price
self.first_trade_time = trade.timestamp
# Update OHLCV
self.high = max(self.high, trade.price)
self.low = min(self.low, trade.price)
self.close = trade.price # Last trade sets close
self.volume += trade.size
self.trade_count += 1
self.last_trade_time = trade.timestamp
# Store trade for detailed analysis if needed
self.trades.append(trade)
return True
def to_candle(self, is_complete: bool = True) -> OHLCVCandle:
"""
Convert bucket to OHLCV candle.
IMPORTANT: Candle timestamp = end_time (right-aligned, industry standard)
"""
return OHLCVCandle(
symbol=self.symbol,
timeframe=self.timeframe,
start_time=self.start_time,
end_time=self.end_time,
open=self.open or Decimal('0'),
high=self.high or Decimal('0'),
low=self.low or Decimal('0'),
close=self.close or Decimal('0'),
volume=self.volume,
trade_count=self.trade_count,
exchange=self.exchange,
is_complete=is_complete,
first_trade_time=self.first_trade_time,
last_trade_time=self.last_trade_time
)
def _calculate_end_time(self, start_time: datetime, timeframe: str) -> datetime:
"""Calculate end time for this timeframe (right-aligned timestamp)."""
if timeframe == '1s':
return start_time + timedelta(seconds=1)
elif timeframe == '5s':
return start_time + timedelta(seconds=5)
elif timeframe == '10s':
return start_time + timedelta(seconds=10)
elif timeframe == '15s':
return start_time + timedelta(seconds=15)
elif timeframe == '30s':
return start_time + timedelta(seconds=30)
elif timeframe == '1m':
return start_time + timedelta(minutes=1)
elif timeframe == '5m':
return start_time + timedelta(minutes=5)
elif timeframe == '15m':
return start_time + timedelta(minutes=15)
elif timeframe == '30m':
return start_time + timedelta(minutes=30)
elif timeframe == '1h':
return start_time + timedelta(hours=1)
elif timeframe == '4h':
return start_time + timedelta(hours=4)
elif timeframe == '1d':
return start_time + timedelta(days=1)
else:
raise ValueError(f"Unsupported timeframe: {timeframe}")
class RealTimeCandleProcessor:
"""
Real-time candle processor for live trade data.
This class processes trades immediately as they arrive from WebSocket,
building candles incrementally and emitting completed candles when
time boundaries are crossed.
AGGREGATION PROCESS (NO FUTURE LEAKAGE):
1. Trade arrives from WebSocket/API with timestamp T
2. For each configured timeframe (1m, 5m, etc.):
a. Calculate which time bucket this trade belongs to
b. Get current bucket for this timeframe
c. Check if trade timestamp crosses time boundary
d. If boundary crossed: complete and emit previous bucket, create new bucket
e. Add trade to current bucket (updates OHLCV)
3. Only emit candles when time boundary is definitively crossed
4. Never emit incomplete/future candles during real-time processing
TIMESTAMP ALIGNMENT:
- Uses RIGHT-ALIGNED timestamps (industry standard)
- 1-minute candle covering 09:00:00-09:01:00 gets timestamp 09:01:00
- 5-minute candle covering 09:00:00-09:05:00 gets timestamp 09:05:00
- Candle represents PAST data, never future
"""
def __init__(self,
symbol: str,
exchange: str,
config: Optional[CandleProcessingConfig] = None,
component_name: str = "realtime_candle_processor",
logger = None):
"""
Initialize real-time candle processor.
Args:
symbol: Trading symbol (e.g., 'BTC-USDT')
exchange: Exchange name (e.g., 'okx', 'binance')
config: Processing configuration
component_name: Name for logging
"""
self.symbol = symbol
self.exchange = exchange
self.config = config or CandleProcessingConfig()
self.component_name = component_name
self.logger = logger
# Current buckets for each timeframe
self.current_buckets: Dict[str, TimeframeBucket] = {}
# Callback functions for completed candles
self.candle_callbacks: List[Callable[[OHLCVCandle], None]] = []
# Statistics
self.stats = ProcessingStats(active_timeframes=len(self.config.timeframes))
if self.logger:
self.logger.info(f"{self.component_name}: Initialized real-time candle processor for {symbol} on {exchange} with timeframes: {self.config.timeframes}")
def add_candle_callback(self, callback: Callable[[OHLCVCandle], None]) -> None:
"""Add callback function to receive completed candles."""
self.candle_callbacks.append(callback)
if self.logger:
self.logger.debug(f"{self.component_name}: Added candle callback: {callback.__name__ if hasattr(callback, '__name__') else str(callback)}")
def process_trade(self, trade: StandardizedTrade) -> List[OHLCVCandle]:
"""
Process single trade - main entry point for real-time processing.
This is called for each trade as it arrives from WebSocket.
CRITICAL: Only returns completed candles (time boundary crossed)
Never returns incomplete/future candles to prevent leakage.
Args:
trade: Standardized trade data
Returns:
List of completed candles (if any time boundaries were crossed)
"""
try:
completed_candles = []
# Process trade for each timeframe
for timeframe in self.config.timeframes:
candle = self._process_trade_for_timeframe(trade, timeframe)
if candle:
completed_candles.append(candle)
# Update statistics
self.stats.trades_processed += 1
self.stats.last_trade_time = trade.timestamp
# Emit completed candles to callbacks
for candle in completed_candles:
self._emit_candle(candle)
return completed_candles
except Exception as e:
if self.logger:
self.logger.error(f"{self.component_name}: Error processing trade for {self.symbol}: {e}")
self.stats.errors_count += 1
return []
def _process_trade_for_timeframe(self, trade: StandardizedTrade, timeframe: str) -> Optional[OHLCVCandle]:
"""
Process trade for specific timeframe.
CRITICAL LOGIC FOR PREVENTING FUTURE LEAKAGE:
1. Calculate which bucket this trade belongs to
2. Check if current bucket exists and matches
3. If bucket mismatch (time boundary crossed), complete current bucket first
4. Create new bucket and add trade
5. Only return completed candles, never incomplete ones
"""
try:
# Calculate which bucket this trade belongs to
trade_bucket_start = self._get_bucket_start_time(trade.timestamp, timeframe)
# Check if we have a current bucket for this timeframe
current_bucket = self.current_buckets.get(timeframe)
completed_candle = None
# If no bucket exists or time boundary crossed, handle transition
if current_bucket is None:
# First bucket for this timeframe
current_bucket = TimeframeBucket(self.symbol, timeframe, trade_bucket_start, self.exchange)
self.current_buckets[timeframe] = current_bucket
elif current_bucket.start_time != trade_bucket_start:
# Time boundary crossed - complete previous bucket
if current_bucket.trade_count > 0: # Only complete if it has trades
completed_candle = current_bucket.to_candle(is_complete=True)
self.stats.candles_emitted += 1
self.stats.last_candle_time = completed_candle.end_time
# Create new bucket for current time period
current_bucket = TimeframeBucket(self.symbol, timeframe, trade_bucket_start, self.exchange)
self.current_buckets[timeframe] = current_bucket
# Add trade to current bucket
if not current_bucket.add_trade(trade):
# This should never happen if logic is correct
if self.logger:
self.logger.warning(f"{self.component_name}: Trade {trade.timestamp} could not be added to bucket {current_bucket.start_time}-{current_bucket.end_time}")
return completed_candle
except Exception as e:
if self.logger:
self.logger.error(f"{self.component_name}: Error processing trade for timeframe {timeframe}: {e}")
self.stats.errors_count += 1
return None
def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime:
"""
Calculate bucket start time for given timestamp and timeframe.
This function determines which time bucket a trade belongs to.
The start time is the LEFT boundary of the interval.
EXAMPLES:
- Trade at 09:03:45.123 for 1s timeframe -> bucket start = 09:03:45.000
- Trade at 09:03:47.456 for 5s timeframe -> bucket start = 09:03:45.000 (45-50s bucket)
- Trade at 09:03:52.789 for 10s timeframe -> bucket start = 09:03:50.000 (50-60s bucket)
- Trade at 09:03:23.456 for 15s timeframe -> bucket start = 09:03:15.000 (15-30s bucket)
- Trade at 09:03:45 for 5m timeframe -> bucket start = 09:00:00
- Trade at 09:07:23 for 5m timeframe -> bucket start = 09:05:00
- Trade at 14:00:00 for 1h timeframe -> bucket start = 14:00:00
Args:
timestamp: Trade timestamp
timeframe: Target timeframe
Returns:
Bucket start time (left boundary)
"""
if timeframe == '1s':
# 1-second buckets align to second boundaries (remove microseconds)
return timestamp.replace(microsecond=0)
elif timeframe == '5s':
# 5-second buckets: 00:00, 00:05, 00:10, 00:15, etc.
dt = timestamp.replace(microsecond=0)
return dt.replace(second=(dt.second // 5) * 5)
elif timeframe == '10s':
# 10-second buckets: 00:00, 00:10, 00:20, 00:30, 00:40, 00:50
dt = timestamp.replace(microsecond=0)
return dt.replace(second=(dt.second // 10) * 10)
elif timeframe == '15s':
# 15-second buckets: 00:00, 00:15, 00:30, 00:45
dt = timestamp.replace(microsecond=0)
return dt.replace(second=(dt.second // 15) * 15)
elif timeframe == '30s':
# 30-second buckets: 00:00, 00:30
dt = timestamp.replace(microsecond=0)
return dt.replace(second=(dt.second // 30) * 30)
# Normalize to UTC and remove microseconds for clean boundaries
dt = timestamp.replace(second=0, microsecond=0)
if timeframe == '1m':
# 1-minute buckets align to minute boundaries
return dt
elif timeframe == '5m':
# 5-minute buckets: 00:00, 00:05, 00:10, etc.
return dt.replace(minute=(dt.minute // 5) * 5)
elif timeframe == '15m':
# 15-minute buckets: 00:00, 00:15, 00:30, 00:45
return dt.replace(minute=(dt.minute // 15) * 15)
elif timeframe == '30m':
# 30-minute buckets: 00:00, 00:30
return dt.replace(minute=(dt.minute // 30) * 30)
elif timeframe == '1h':
# 1-hour buckets align to hour boundaries
return dt.replace(minute=0)
elif timeframe == '4h':
# 4-hour buckets: 00:00, 04:00, 08:00, 12:00, 16:00, 20:00
return dt.replace(minute=0, hour=(dt.hour // 4) * 4)
elif timeframe == '1d':
# 1-day buckets align to day boundaries (midnight UTC)
return dt.replace(minute=0, hour=0)
else:
raise ValueError(f"Unsupported timeframe: {timeframe}")
def _emit_candle(self, candle: OHLCVCandle) -> None:
"""Emit completed candle to all callbacks."""
try:
for callback in self.candle_callbacks:
callback(candle)
except Exception as e:
if self.logger:
self.logger.error(f"{self.component_name}: Error in candle callback: {e}")
self.stats.errors_count += 1
def get_current_candles(self, incomplete: bool = True) -> List[OHLCVCandle]:
"""
Get current incomplete candles for all timeframes.
WARNING: These are incomplete candles and should NOT be used for trading decisions.
They are useful for monitoring/debugging only.
"""
candles = []
for bucket in self.current_buckets.values():
if bucket.trade_count > 0: # Only return buckets with trades
candles.append(bucket.to_candle(is_complete=False))
return candles
def force_complete_all_candles(self) -> List[OHLCVCandle]:
"""
Force completion of all current candles (useful for shutdown/batch processing).
WARNING: This should only be used during shutdown or batch processing,
not during live trading as it forces incomplete candles to be marked complete.
"""
completed_candles = []
for bucket in self.current_buckets.values():
if bucket.trade_count > 0:
candle = bucket.to_candle(is_complete=True)
completed_candles.append(candle)
self._emit_candle(candle)
# Clear buckets
self.current_buckets.clear()
return completed_candles
def get_stats(self) -> Dict[str, Any]:
"""Get processing statistics."""
stats_dict = self.stats.to_dict()
stats_dict['current_buckets'] = {
tf: bucket.trade_count for tf, bucket in self.current_buckets.items()
}
return stats_dict
class BatchCandleProcessor:
"""
Batch candle processor for historical data processing.
This class processes large batches of historical trades efficiently,
building candles for multiple timeframes simultaneously.
"""
def __init__(self,
symbol: str,
exchange: str,
timeframes: List[str],
component_name: str = "batch_candle_processor",
logger = None):
"""
Initialize batch candle processor.
Args:
symbol: Trading symbol
exchange: Exchange name
timeframes: List of timeframes to process
component_name: Name for logging
"""
self.symbol = symbol
self.exchange = exchange
self.timeframes = timeframes
self.component_name = component_name
self.logger = logger
# Statistics
self.stats = ProcessingStats(active_timeframes=len(timeframes))
if self.logger:
self.logger.info(f"{self.component_name}: Initialized batch candle processor for {symbol} on {exchange}")
def process_trades_to_candles(self, trades: Iterator[StandardizedTrade]) -> List[OHLCVCandle]:
"""
Process trade iterator to candles - optimized for batch processing.
This function handles ALL scenarios:
- Historical: Batch trade iterators
- Backfill: API trade iterators
- Real-time batch: Multiple trades at once
Args:
trades: Iterator of standardized trades
Returns:
List of completed candles
"""
try:
# Create temporary processor for this batch
config = CandleProcessingConfig(timeframes=self.timeframes, auto_save_candles=False)
processor = RealTimeCandleProcessor(
self.symbol, self.exchange, config,
f"batch_processor_{self.symbol}_{self.exchange}"
)
all_candles = []
# Process all trades
for trade in trades:
completed_candles = processor.process_trade(trade)
all_candles.extend(completed_candles)
self.stats.trades_processed += 1
# Force complete any remaining candles
remaining_candles = processor.force_complete_all_candles()
all_candles.extend(remaining_candles)
# Update stats
self.stats.candles_emitted = len(all_candles)
if all_candles:
self.stats.last_candle_time = max(candle.end_time for candle in all_candles)
if self.logger:
self.logger.info(f"{self.component_name}: Batch processed {self.stats.trades_processed} trades to {len(all_candles)} candles")
return all_candles
except Exception as e:
if self.logger:
self.logger.error(f"{self.component_name}: Error in batch processing trades to candles: {e}")
self.stats.errors_count += 1
return []
def get_stats(self) -> Dict[str, Any]:
"""Get processing statistics."""
return self.stats.to_dict()
# Utility functions for common aggregation operations
def aggregate_trades_to_candles(trades: List[StandardizedTrade],
timeframes: List[str],
symbol: str,
exchange: str) -> List[OHLCVCandle]:
"""
Simple utility function to aggregate a list of trades to candles.
Args:
trades: List of standardized trades
timeframes: List of timeframes to generate
symbol: Trading symbol
exchange: Exchange name
Returns:
List of completed candles
"""
processor = BatchCandleProcessor(symbol, exchange, timeframes)
return processor.process_trades_to_candles(iter(trades))
def validate_timeframe(timeframe: str) -> bool:
"""
Validate if timeframe is supported.
Args:
timeframe: Timeframe string (e.g., '1s', '5s', '10s', '1m', '5m', '1h')
Returns:
True if supported, False otherwise
"""
supported = ['1s', '5s', '10s', '15s', '30s', '1m', '5m', '15m', '30m', '1h', '4h', '1d']
return timeframe in supported
def parse_timeframe(timeframe: str) -> tuple[int, str]:
"""
Parse timeframe string into number and unit.
Args:
timeframe: Timeframe string (e.g., '1s', '5m', '1h')
Returns:
Tuple of (number, unit)
Examples:
'1s' -> (1, 's')
'5m' -> (5, 'm')
'1h' -> (1, 'h')
'1d' -> (1, 'd')
"""
import re
match = re.match(r'^(\d+)([smhd])$', timeframe.lower())
if not match:
raise ValueError(f"Invalid timeframe format: {timeframe}")
number = int(match.group(1))
unit = match.group(2)
return number, unit
__all__ = [
'TimeframeBucket',
'RealTimeCandleProcessor',
'BatchCandleProcessor',
'aggregate_trades_to_candles',
'validate_timeframe',
'parse_timeframe'
]

View File

@@ -0,0 +1,34 @@
"""
Aggregation package for market data processing.
This package provides functionality for building OHLCV candles from trade data,
with support for both real-time and batch processing. It handles:
- Time-based bucketing of trades
- Real-time candle construction
- Batch processing for historical data
- Multiple timeframe support
Note: The actual class exports will be added here once the refactoring is complete.
"""
from .bucket import TimeframeBucket
from .realtime import RealTimeCandleProcessor
from .batch import BatchCandleProcessor
from .utils import (
aggregate_trades_to_candles,
validate_timeframe,
parse_timeframe
)
__all__ = [
'TimeframeBucket',
'RealTimeCandleProcessor',
'BatchCandleProcessor',
'aggregate_trades_to_candles',
'validate_timeframe',
'parse_timeframe'
]
# Placeholder for future imports and exports
# These will be added as we move the classes into their respective modules

View File

@@ -0,0 +1,153 @@
"""
Batch candle processor for historical trade data.
This module provides the BatchCandleProcessor class for building OHLCV candles
from historical trade data in batch mode.
"""
from datetime import datetime
from typing import Dict, List, Any, Iterator
from collections import defaultdict
from ..data_types import StandardizedTrade, OHLCVCandle, ProcessingStats
from .bucket import TimeframeBucket
class BatchCandleProcessor:
"""
Batch candle processor for historical trade data.
This class processes trades in batch mode, building candles for multiple
timeframes simultaneously. It's optimized for processing large amounts
of historical trade data efficiently.
"""
def __init__(self,
symbol: str,
exchange: str,
timeframes: List[str],
component_name: str = "batch_candle_processor",
logger = None):
"""
Initialize batch candle processor.
Args:
symbol: Trading symbol (e.g., 'BTC-USDT')
exchange: Exchange name
timeframes: List of timeframes to process (e.g., ['1m', '5m'])
component_name: Name for logging/stats
logger: Optional logger instance
"""
self.symbol = symbol
self.exchange = exchange
self.timeframes = timeframes
self.component_name = component_name
self.logger = logger
# Stats tracking
self.stats = ProcessingStats()
def process_trades_to_candles(self, trades: Iterator[StandardizedTrade]) -> List[OHLCVCandle]:
"""
Process trades in batch and return completed candles.
Args:
trades: Iterator of trades to process
Returns:
List of completed candles for all timeframes
"""
# Track buckets for each timeframe
buckets: Dict[str, Dict[datetime, TimeframeBucket]] = defaultdict(dict)
# Process all trades
for trade in trades:
self.stats.trades_processed += 1
# Process trade for each timeframe
for timeframe in self.timeframes:
# Get bucket for this trade's timestamp
bucket_start = self._get_bucket_start_time(trade.timestamp, timeframe)
# Create bucket if it doesn't exist
if bucket_start not in buckets[timeframe]:
buckets[timeframe][bucket_start] = TimeframeBucket(
symbol=self.symbol,
timeframe=timeframe,
start_time=bucket_start,
exchange=self.exchange
)
# Add trade to bucket
buckets[timeframe][bucket_start].add_trade(trade)
# Convert all buckets to candles
candles = []
for timeframe_buckets in buckets.values():
for bucket in timeframe_buckets.values():
candle = bucket.to_candle(is_complete=True)
candles.append(candle)
self.stats.candles_emitted += 1
return sorted(candles, key=lambda x: (x.timeframe, x.end_time))
def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime:
"""
Calculate the start time for the bucket that this timestamp belongs to.
IMPORTANT: Uses RIGHT-ALIGNED timestamps
- For 5m timeframe, buckets start at 00:00, 00:05, 00:10, etc.
- Trade at 09:03:45 belongs to 09:00-09:05 bucket
- Trade at 09:07:30 belongs to 09:05-09:10 bucket
Args:
timestamp: Trade timestamp
timeframe: Time period (e.g., '1m', '5m', '1h')
Returns:
Start time for the appropriate bucket
"""
if timeframe == '1s':
return timestamp.replace(microsecond=0)
elif timeframe == '5s':
seconds = (timestamp.second // 5) * 5
return timestamp.replace(second=seconds, microsecond=0)
elif timeframe == '10s':
seconds = (timestamp.second // 10) * 10
return timestamp.replace(second=seconds, microsecond=0)
elif timeframe == '15s':
seconds = (timestamp.second // 15) * 15
return timestamp.replace(second=seconds, microsecond=0)
elif timeframe == '30s':
seconds = (timestamp.second // 30) * 30
return timestamp.replace(second=seconds, microsecond=0)
elif timeframe == '1m':
return timestamp.replace(second=0, microsecond=0)
elif timeframe == '5m':
minutes = (timestamp.minute // 5) * 5
return timestamp.replace(minute=minutes, second=0, microsecond=0)
elif timeframe == '15m':
minutes = (timestamp.minute // 15) * 15
return timestamp.replace(minute=minutes, second=0, microsecond=0)
elif timeframe == '30m':
minutes = (timestamp.minute // 30) * 30
return timestamp.replace(minute=minutes, second=0, microsecond=0)
elif timeframe == '1h':
return timestamp.replace(minute=0, second=0, microsecond=0)
elif timeframe == '4h':
hours = (timestamp.hour // 4) * 4
return timestamp.replace(hour=hours, minute=0, second=0, microsecond=0)
elif timeframe == '1d':
return timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
else:
raise ValueError(f"Unsupported timeframe: {timeframe}")
def get_stats(self) -> Dict[str, Any]:
"""Get processing statistics."""
return {
"component": self.component_name,
"stats": self.stats.to_dict()
}
__all__ = ['BatchCandleProcessor']

View File

@@ -0,0 +1,144 @@
"""
Time bucket implementation for building OHLCV candles.
This module provides the TimeframeBucket class which accumulates trades
within a specific time period and calculates OHLCV data incrementally.
"""
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Optional, List
from ..data_types import StandardizedTrade, OHLCVCandle
class TimeframeBucket:
"""
Time bucket for building OHLCV candles from trades.
This class accumulates trades within a specific time period
and calculates OHLCV data incrementally.
IMPORTANT: Uses RIGHT-ALIGNED timestamps
- start_time: Beginning of the interval (inclusive)
- end_time: End of the interval (exclusive) - this becomes the candle timestamp
- Example: 09:00:00 - 09:05:00 bucket -> candle timestamp = 09:05:00
"""
def __init__(self, symbol: str, timeframe: str, start_time: datetime, exchange: str = "unknown"):
"""
Initialize time bucket for candle aggregation.
Args:
symbol: Trading symbol (e.g., 'BTC-USDT')
timeframe: Time period (e.g., '1m', '5m', '1h')
start_time: Start time for this bucket (inclusive)
exchange: Exchange name
"""
self.symbol = symbol
self.timeframe = timeframe
self.start_time = start_time
self.end_time = self._calculate_end_time(start_time, timeframe)
self.exchange = exchange
# OHLCV data
self.open: Optional[Decimal] = None
self.high: Optional[Decimal] = None
self.low: Optional[Decimal] = None
self.close: Optional[Decimal] = None
self.volume: Decimal = Decimal('0')
self.trade_count: int = 0
# Tracking
self.first_trade_time: Optional[datetime] = None
self.last_trade_time: Optional[datetime] = None
self.trades: List[StandardizedTrade] = []
def add_trade(self, trade: StandardizedTrade) -> bool:
"""
Add trade to this bucket if it belongs to this time period.
Args:
trade: Standardized trade data
Returns:
True if trade was added, False if outside time range
"""
# Check if trade belongs in this bucket (start_time <= trade.timestamp < end_time)
if not (self.start_time <= trade.timestamp < self.end_time):
return False
# First trade sets open price
if self.open is None:
self.open = trade.price
self.high = trade.price
self.low = trade.price
self.first_trade_time = trade.timestamp
# Update OHLCV
self.high = max(self.high, trade.price)
self.low = min(self.low, trade.price)
self.close = trade.price # Last trade sets close
self.volume += trade.size
self.trade_count += 1
self.last_trade_time = trade.timestamp
# Store trade for detailed analysis if needed
self.trades.append(trade)
return True
def to_candle(self, is_complete: bool = True) -> OHLCVCandle:
"""
Convert bucket to OHLCV candle.
IMPORTANT: Candle timestamp = end_time (right-aligned, industry standard)
"""
return OHLCVCandle(
symbol=self.symbol,
timeframe=self.timeframe,
start_time=self.start_time,
end_time=self.end_time,
open=self.open or Decimal('0'),
high=self.high or Decimal('0'),
low=self.low or Decimal('0'),
close=self.close or Decimal('0'),
volume=self.volume,
trade_count=self.trade_count,
exchange=self.exchange,
is_complete=is_complete,
first_trade_time=self.first_trade_time,
last_trade_time=self.last_trade_time
)
def _calculate_end_time(self, start_time: datetime, timeframe: str) -> datetime:
"""Calculate end time for this timeframe (right-aligned timestamp)."""
if timeframe == '1s':
return start_time + timedelta(seconds=1)
elif timeframe == '5s':
return start_time + timedelta(seconds=5)
elif timeframe == '10s':
return start_time + timedelta(seconds=10)
elif timeframe == '15s':
return start_time + timedelta(seconds=15)
elif timeframe == '30s':
return start_time + timedelta(seconds=30)
elif timeframe == '1m':
return start_time + timedelta(minutes=1)
elif timeframe == '5m':
return start_time + timedelta(minutes=5)
elif timeframe == '15m':
return start_time + timedelta(minutes=15)
elif timeframe == '30m':
return start_time + timedelta(minutes=30)
elif timeframe == '1h':
return start_time + timedelta(hours=1)
elif timeframe == '4h':
return start_time + timedelta(hours=4)
elif timeframe == '1d':
return start_time + timedelta(days=1)
else:
raise ValueError(f"Unsupported timeframe: {timeframe}")
__all__ = ['TimeframeBucket']

View File

@@ -0,0 +1,235 @@
"""
Real-time candle processor for live trade data.
This module provides the RealTimeCandleProcessor class for building OHLCV candles
from live trade data in real-time.
"""
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Dict, List, Optional, Any, Callable
from collections import defaultdict
from ..data_types import StandardizedTrade, OHLCVCandle, CandleProcessingConfig, ProcessingStats
from .bucket import TimeframeBucket
class RealTimeCandleProcessor:
"""
Real-time candle processor for live trade data.
This class processes trades immediately as they arrive from WebSocket,
building candles incrementally and emitting completed candles when
time boundaries are crossed.
AGGREGATION PROCESS (NO FUTURE LEAKAGE):
1. Trade arrives from WebSocket/API with timestamp T
2. For each configured timeframe (1m, 5m, etc.):
a. Calculate which time bucket this trade belongs to
b. Get current bucket for this timeframe
c. Check if trade timestamp crosses time boundary
d. If boundary crossed: complete and emit previous bucket, create new bucket
e. Add trade to current bucket (updates OHLCV)
3. Only emit candles when time boundary is definitively crossed
4. Never emit incomplete/future candles during real-time processing
TIMESTAMP ALIGNMENT:
- Uses RIGHT-ALIGNED timestamps (industry standard)
- 1-minute candle covering 09:00:00-09:01:00 gets timestamp 09:01:00
- 5-minute candle covering 09:00:00-09:05:00 gets timestamp 09:05:00
- Candle represents PAST data, never future
"""
def __init__(self,
symbol: str,
exchange: str,
config: Optional[CandleProcessingConfig] = None,
component_name: str = "realtime_candle_processor",
logger = None):
"""
Initialize real-time candle processor.
Args:
symbol: Trading symbol (e.g., 'BTC-USDT')
exchange: Exchange name
config: Candle processing configuration
component_name: Name for logging/stats
logger: Optional logger instance
"""
self.symbol = symbol
self.exchange = exchange
self.config = config or CandleProcessingConfig()
self.component_name = component_name
self.logger = logger
# Current buckets for each timeframe
self.current_buckets: Dict[str, TimeframeBucket] = {}
# Callbacks for completed candles
self.candle_callbacks: List[Callable[[OHLCVCandle], None]] = []
# Stats tracking
self.stats = ProcessingStats()
def add_candle_callback(self, callback: Callable[[OHLCVCandle], None]) -> None:
"""Add callback to be called when candle is completed."""
self.candle_callbacks.append(callback)
def process_trade(self, trade: StandardizedTrade) -> List[OHLCVCandle]:
"""
Process a single trade and return any completed candles.
Args:
trade: Standardized trade data
Returns:
List of completed candles (if any time boundaries were crossed)
"""
self.stats.trades_processed += 1
completed_candles = []
for timeframe in self.config.timeframes:
completed = self._process_trade_for_timeframe(trade, timeframe)
if completed:
completed_candles.append(completed)
self.stats.candles_emitted += 1
return completed_candles
def _process_trade_for_timeframe(self, trade: StandardizedTrade, timeframe: str) -> Optional[OHLCVCandle]:
"""
Process trade for a specific timeframe and return completed candle if boundary crossed.
Args:
trade: Trade to process
timeframe: Timeframe to process for (e.g., '1m', '5m')
Returns:
Completed candle if time boundary crossed, None otherwise
"""
# Calculate which bucket this trade belongs to
bucket_start = self._get_bucket_start_time(trade.timestamp, timeframe)
# Get current bucket for this timeframe
current_bucket = self.current_buckets.get(timeframe)
completed_candle = None
# If we have a current bucket and trade belongs in a new bucket,
# complete current bucket and create new one
if current_bucket and bucket_start >= current_bucket.end_time:
completed_candle = current_bucket.to_candle(is_complete=True)
self._emit_candle(completed_candle)
current_bucket = None
# Create new bucket if needed
if not current_bucket:
current_bucket = TimeframeBucket(
symbol=self.symbol,
timeframe=timeframe,
start_time=bucket_start,
exchange=self.exchange
)
self.current_buckets[timeframe] = current_bucket
# Add trade to current bucket
current_bucket.add_trade(trade)
return completed_candle
def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime:
"""
Calculate the start time for the bucket that this timestamp belongs to.
IMPORTANT: Uses RIGHT-ALIGNED timestamps
- For 5m timeframe, buckets start at 00:00, 00:05, 00:10, etc.
- Trade at 09:03:45 belongs to 09:00-09:05 bucket
- Trade at 09:07:30 belongs to 09:05-09:10 bucket
Args:
timestamp: Trade timestamp
timeframe: Time period (e.g., '1m', '5m', '1h')
Returns:
Start time for the appropriate bucket
"""
if timeframe == '1s':
return timestamp.replace(microsecond=0)
elif timeframe == '5s':
seconds = (timestamp.second // 5) * 5
return timestamp.replace(second=seconds, microsecond=0)
elif timeframe == '10s':
seconds = (timestamp.second // 10) * 10
return timestamp.replace(second=seconds, microsecond=0)
elif timeframe == '15s':
seconds = (timestamp.second // 15) * 15
return timestamp.replace(second=seconds, microsecond=0)
elif timeframe == '30s':
seconds = (timestamp.second // 30) * 30
return timestamp.replace(second=seconds, microsecond=0)
elif timeframe == '1m':
return timestamp.replace(second=0, microsecond=0)
elif timeframe == '5m':
minutes = (timestamp.minute // 5) * 5
return timestamp.replace(minute=minutes, second=0, microsecond=0)
elif timeframe == '15m':
minutes = (timestamp.minute // 15) * 15
return timestamp.replace(minute=minutes, second=0, microsecond=0)
elif timeframe == '30m':
minutes = (timestamp.minute // 30) * 30
return timestamp.replace(minute=minutes, second=0, microsecond=0)
elif timeframe == '1h':
return timestamp.replace(minute=0, second=0, microsecond=0)
elif timeframe == '4h':
hours = (timestamp.hour // 4) * 4
return timestamp.replace(hour=hours, minute=0, second=0, microsecond=0)
elif timeframe == '1d':
return timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
else:
raise ValueError(f"Unsupported timeframe: {timeframe}")
def _emit_candle(self, candle: OHLCVCandle) -> None:
"""Emit completed candle to all registered callbacks."""
for callback in self.candle_callbacks:
try:
callback(candle)
except Exception as e:
if self.logger:
self.logger.error(f"Error in candle callback: {e}")
def get_current_candles(self, incomplete: bool = True) -> List[OHLCVCandle]:
"""
Get current (incomplete) candles for all timeframes.
Args:
incomplete: Whether to mark candles as incomplete (default True)
"""
return [
bucket.to_candle(is_complete=not incomplete)
for bucket in self.current_buckets.values()
]
def force_complete_all_candles(self) -> List[OHLCVCandle]:
"""
Force completion of all current candles (e.g., on connection close).
Returns:
List of completed candles
"""
completed = []
for timeframe, bucket in self.current_buckets.items():
candle = bucket.to_candle(is_complete=True)
completed.append(candle)
self._emit_candle(candle)
self.current_buckets.clear()
return completed
def get_stats(self) -> Dict[str, Any]:
"""Get processing statistics."""
return {
"component": self.component_name,
"stats": self.stats.to_dict()
}
__all__ = ['RealTimeCandleProcessor']

View File

@@ -0,0 +1,78 @@
"""
Utility functions for market data aggregation.
This module provides common utility functions for working with OHLCV candles
and trade data aggregation.
"""
import re
from typing import List, Tuple
from ..data_types import StandardizedTrade, OHLCVCandle
from .batch import BatchCandleProcessor
def aggregate_trades_to_candles(trades: List[StandardizedTrade],
timeframes: List[str],
symbol: str,
exchange: str) -> List[OHLCVCandle]:
"""
Simple utility function to aggregate a list of trades to candles.
Args:
trades: List of standardized trades
timeframes: List of timeframes to generate
symbol: Trading symbol
exchange: Exchange name
Returns:
List of completed candles
"""
processor = BatchCandleProcessor(symbol, exchange, timeframes)
return processor.process_trades_to_candles(iter(trades))
def validate_timeframe(timeframe: str) -> bool:
"""
Validate if timeframe is supported.
Args:
timeframe: Timeframe string (e.g., '1s', '5s', '10s', '1m', '5m', '1h')
Returns:
True if supported, False otherwise
"""
supported = ['1s', '5s', '10s', '15s', '30s', '1m', '5m', '15m', '30m', '1h', '4h', '1d']
return timeframe in supported
def parse_timeframe(timeframe: str) -> Tuple[int, str]:
"""
Parse timeframe string into number and unit.
Args:
timeframe: Timeframe string (e.g., '1s', '5m', '1h')
Returns:
Tuple of (number, unit)
Examples:
'1s' -> (1, 's')
'5m' -> (5, 'm')
'1h' -> (1, 'h')
'1d' -> (1, 'd')
"""
match = re.match(r'^(\d+)([smhd])$', timeframe.lower())
if not match:
raise ValueError(f"Invalid timeframe format: {timeframe}")
number = int(match.group(1))
unit = match.group(2)
return number, unit
__all__ = [
'aggregate_trades_to_candles',
'validate_timeframe',
'parse_timeframe'
]

View File

@@ -11,7 +11,7 @@ from typing import Dict, List, Optional, Any, Iterator
from abc import ABC, abstractmethod
from .data_types import StandardizedTrade, OHLCVCandle, DataValidationResult
from .aggregation import BatchCandleProcessor
from .aggregation.batch import BatchCandleProcessor
class BaseDataTransformer(ABC):

View File

@@ -17,13 +17,13 @@ from ...common import (
StandardizedTrade,
OHLCVCandle,
CandleProcessingConfig,
RealTimeCandleProcessor,
BaseDataValidator,
ValidationResult,
BaseDataTransformer,
UnifiedDataTransformer,
create_standardized_trade
)
from ...common.aggregation.realtime import RealTimeCandleProcessor
class OKXMessageType(Enum):