153 lines
5.4 KiB
Python
Raw Permalink Normal View History

"""
Time bucket implementation for building OHLCV candles.
This module provides the TimeframeBucket class which accumulates trades
within a specific time period and calculates OHLCV data incrementally.
"""
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Optional, List
from ..data_types import StandardizedTrade, OHLCVCandle
from .utils import parse_timeframe
class TimeframeBucket:
    """
    Time bucket for building OHLCV candles from trades.

    The bucket covers the half-open interval ``[start_time, end_time)`` and
    updates open/high/low/close, volume and trade count each time a trade
    is added.

    IMPORTANT: Uses RIGHT-ALIGNED timestamps
    - start_time: Beginning of the interval (inclusive)
    - end_time: End of the interval (exclusive) - this becomes the candle timestamp
    - Example: 09:00:00 - 09:05:00 bucket -> candle timestamp = 09:05:00
    """

    # Maps a timeframe unit suffix to the matching ``timedelta`` keyword.
    _UNIT_TO_TIMEDELTA_KWARG = {
        's': 'seconds',
        'm': 'minutes',
        'h': 'hours',
        'd': 'days',
    }

    def __init__(self, symbol: str, timeframe: str, start_time: datetime, exchange: str = "unknown"):
        """
        Initialize time bucket for candle aggregation.

        Args:
            symbol: Trading symbol (e.g., 'BTC-USDT')
            timeframe: Time period (e.g., '1m', '5m', '1h')
            start_time: Start time for this bucket (inclusive)
            exchange: Exchange name

        Raises:
            ValueError: If the timeframe unit is unsupported or the
                timeframe duration is not positive
        """
        self.symbol = symbol
        self.timeframe = timeframe
        self.start_time = start_time
        self.end_time = self._calculate_end_time(start_time, timeframe)
        self.exchange = exchange

        # OHLCV data (prices stay None until the first trade is accepted)
        self.open: Optional[Decimal] = None
        self.high: Optional[Decimal] = None
        self.low: Optional[Decimal] = None
        self.close: Optional[Decimal] = None
        self.volume: Decimal = Decimal('0')
        self.trade_count: int = 0

        # Tracking
        self.first_trade_time: Optional[datetime] = None
        self.last_trade_time: Optional[datetime] = None
        self.trades: List[StandardizedTrade] = []

    def add_trade(self, trade: StandardizedTrade) -> bool:
        """
        Add trade to this bucket if it belongs to this time period.

        Open and close follow trade timestamps rather than arrival order:
        a trade that arrives late but carries an earlier timestamp than any
        trade seen so far becomes the open, and only a trade whose
        timestamp is at or after the latest one seen updates the close.
        Trades with equal timestamps keep arrival order.

        Args:
            trade: Standardized trade data; its ``timestamp``, ``price``
                and ``size`` attributes are read

        Returns:
            True if trade was added, False if outside time range
        """
        trade_time = trade.timestamp

        # Check if trade belongs in this bucket (start_time <= timestamp < end_time)
        if not (self.start_time <= trade_time < self.end_time):
            return False

        price = trade.price
        if self.open is None:
            # First accepted trade seeds every price field and both timestamps.
            self.open = self.high = self.low = self.close = price
            self.first_trade_time = self.last_trade_time = trade_time
        else:
            self.high = max(self.high, price)
            self.low = min(self.low, price)
            if trade_time < self.first_trade_time:
                # Late arrival that precedes everything seen so far.
                self.open = price
                self.first_trade_time = trade_time
            if trade_time >= self.last_trade_time:
                # ">=" keeps arrival order for trades sharing a timestamp.
                self.close = price
                self.last_trade_time = trade_time

        self.volume += trade.size
        self.trade_count += 1

        # Store trade for detailed analysis if needed
        self.trades.append(trade)
        return True

    def to_candle(self, is_complete: bool = True) -> OHLCVCandle:
        """
        Convert bucket to OHLCV candle.

        IMPORTANT: Candle timestamp = end_time (right-aligned)

        Price fields that were never set (no trade accepted) are reported
        as ``Decimal('0')``.

        Args:
            is_complete: Passed through to the candle's ``is_complete`` field

        Returns:
            OHLCVCandle built from the bucket's current state
        """
        zero = Decimal('0')
        return OHLCVCandle(
            symbol=self.symbol,
            timeframe=self.timeframe,
            start_time=self.start_time,
            end_time=self.end_time,
            open=zero if self.open is None else self.open,
            high=zero if self.high is None else self.high,
            low=zero if self.low is None else self.low,
            close=zero if self.close is None else self.close,
            volume=self.volume,
            trade_count=self.trade_count,
            exchange=self.exchange,
            is_complete=is_complete,
            first_trade_time=self.first_trade_time,
            last_trade_time=self.last_trade_time
        )

    @staticmethod
    def _parse_timeframe_to_timedelta(timeframe: str) -> timedelta:
        """
        Parse a timeframe string (e.g., '5m', '1h', '10s', '1d') into a timedelta object.

        Supported units: 's' (seconds), 'm' (minutes), 'h' (hours), 'd' (days).

        Args:
            timeframe: Timeframe string (e.g., '5m', '1h')

        Returns:
            timedelta: Corresponding timedelta object

        Raises:
            ValueError: If the timeframe unit is unsupported or the
                duration is not positive
        """
        number, unit = parse_timeframe(timeframe)
        kwarg = TimeframeBucket._UNIT_TO_TIMEDELTA_KWARG.get(unit)
        if kwarg is None:
            raise ValueError(f"Unsupported timeframe unit: {unit}")
        if number <= 0:
            # A zero-length bucket has end_time == start_time and could
            # never accept a trade, so fail loudly instead.
            raise ValueError(f"Timeframe duration must be positive: {timeframe}")
        return timedelta(**{kwarg: number})

    def _calculate_end_time(self, start_time: datetime, timeframe: str) -> datetime:
        """
        Calculate end time for this timeframe (right-aligned timestamp).

        Args:
            start_time: The start datetime of the bucket
            timeframe: Timeframe string (e.g., '5m', '1h')

        Returns:
            datetime: The end time of the bucket (start_time + timeframe duration)

        Raises:
            ValueError: If the timeframe is malformed or unsupported
        """
        return start_time + self._parse_timeframe_to_timedelta(timeframe)
# Public API of this module: only TimeframeBucket is exported by `from ... import *`.
__all__ = ['TimeframeBucket']