144 lines
5.1 KiB
Python
Raw Normal View History

"""
Time bucket implementation for building OHLCV candles.
This module provides the TimeframeBucket class which accumulates trades
within a specific time period and calculates OHLCV data incrementally.
"""
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Optional, List
from ..data_types import StandardizedTrade, OHLCVCandle
class TimeframeBucket:
"""
Time bucket for building OHLCV candles from trades.
This class accumulates trades within a specific time period
and calculates OHLCV data incrementally.
IMPORTANT: Uses RIGHT-ALIGNED timestamps
- start_time: Beginning of the interval (inclusive)
- end_time: End of the interval (exclusive) - this becomes the candle timestamp
- Example: 09:00:00 - 09:05:00 bucket -> candle timestamp = 09:05:00
"""
def __init__(self, symbol: str, timeframe: str, start_time: datetime, exchange: str = "unknown"):
"""
Initialize time bucket for candle aggregation.
Args:
symbol: Trading symbol (e.g., 'BTC-USDT')
timeframe: Time period (e.g., '1m', '5m', '1h')
start_time: Start time for this bucket (inclusive)
exchange: Exchange name
"""
self.symbol = symbol
self.timeframe = timeframe
self.start_time = start_time
self.end_time = self._calculate_end_time(start_time, timeframe)
self.exchange = exchange
# OHLCV data
self.open: Optional[Decimal] = None
self.high: Optional[Decimal] = None
self.low: Optional[Decimal] = None
self.close: Optional[Decimal] = None
self.volume: Decimal = Decimal('0')
self.trade_count: int = 0
# Tracking
self.first_trade_time: Optional[datetime] = None
self.last_trade_time: Optional[datetime] = None
self.trades: List[StandardizedTrade] = []
def add_trade(self, trade: StandardizedTrade) -> bool:
"""
Add trade to this bucket if it belongs to this time period.
Args:
trade: Standardized trade data
Returns:
True if trade was added, False if outside time range
"""
# Check if trade belongs in this bucket (start_time <= trade.timestamp < end_time)
if not (self.start_time <= trade.timestamp < self.end_time):
return False
# First trade sets open price
if self.open is None:
self.open = trade.price
self.high = trade.price
self.low = trade.price
self.first_trade_time = trade.timestamp
# Update OHLCV
self.high = max(self.high, trade.price)
self.low = min(self.low, trade.price)
self.close = trade.price # Last trade sets close
self.volume += trade.size
self.trade_count += 1
self.last_trade_time = trade.timestamp
# Store trade for detailed analysis if needed
self.trades.append(trade)
return True
def to_candle(self, is_complete: bool = True) -> OHLCVCandle:
"""
Convert bucket to OHLCV candle.
IMPORTANT: Candle timestamp = end_time (right-aligned, industry standard)
"""
return OHLCVCandle(
symbol=self.symbol,
timeframe=self.timeframe,
start_time=self.start_time,
end_time=self.end_time,
open=self.open or Decimal('0'),
high=self.high or Decimal('0'),
low=self.low or Decimal('0'),
close=self.close or Decimal('0'),
volume=self.volume,
trade_count=self.trade_count,
exchange=self.exchange,
is_complete=is_complete,
first_trade_time=self.first_trade_time,
last_trade_time=self.last_trade_time
)
def _calculate_end_time(self, start_time: datetime, timeframe: str) -> datetime:
"""Calculate end time for this timeframe (right-aligned timestamp)."""
if timeframe == '1s':
return start_time + timedelta(seconds=1)
elif timeframe == '5s':
return start_time + timedelta(seconds=5)
elif timeframe == '10s':
return start_time + timedelta(seconds=10)
elif timeframe == '15s':
return start_time + timedelta(seconds=15)
elif timeframe == '30s':
return start_time + timedelta(seconds=30)
elif timeframe == '1m':
return start_time + timedelta(minutes=1)
elif timeframe == '5m':
return start_time + timedelta(minutes=5)
elif timeframe == '15m':
return start_time + timedelta(minutes=15)
elif timeframe == '30m':
return start_time + timedelta(minutes=30)
elif timeframe == '1h':
return start_time + timedelta(hours=1)
elif timeframe == '4h':
return start_time + timedelta(hours=4)
elif timeframe == '1d':
return start_time + timedelta(days=1)
else:
raise ValueError(f"Unsupported timeframe: {timeframe}")
__all__ = ['TimeframeBucket']