153 lines
5.4 KiB
Python
153 lines
5.4 KiB
Python
"""
|
|
Time bucket implementation for building OHLCV candles.
|
|
|
|
This module provides the TimeframeBucket class which accumulates trades
|
|
within a specific time period and calculates OHLCV data incrementally.
|
|
"""
|
|
|
|
from datetime import datetime, timedelta
|
|
from decimal import Decimal
|
|
from typing import Optional, List
|
|
|
|
from ..data_types import StandardizedTrade, OHLCVCandle
|
|
from .utils import parse_timeframe
|
|
|
|
|
|
class TimeframeBucket:
    """
    Time bucket for building OHLCV candles from trades.

    This class accumulates trades within a specific time period
    and calculates OHLCV data incrementally.

    IMPORTANT: Uses RIGHT-ALIGNED timestamps
    - start_time: Beginning of the interval (inclusive)
    - end_time: End of the interval (exclusive) - this becomes the candle timestamp
    - Example: 09:00:00 - 09:05:00 bucket -> candle timestamp = 09:05:00

    Open/close are derived from trade timestamps rather than arrival order,
    so a trade that is delivered late but still falls inside the interval
    cannot corrupt the candle.
    """

    def __init__(self, symbol: str, timeframe: str, start_time: datetime, exchange: str = "unknown"):
        """
        Initialize time bucket for candle aggregation.

        Args:
            symbol: Trading symbol (e.g., 'BTC-USDT')
            timeframe: Time period (e.g., '1m', '5m', '1h')
            start_time: Start time for this bucket (inclusive)
            exchange: Exchange name

        Raises:
            ValueError: If the timeframe unit is unsupported
        """
        self.symbol = symbol
        self.timeframe = timeframe
        self.start_time = start_time
        self.end_time = self._calculate_end_time(start_time, timeframe)
        self.exchange = exchange

        # OHLCV data (prices stay None until the first trade is accepted)
        self.open: Optional[Decimal] = None
        self.high: Optional[Decimal] = None
        self.low: Optional[Decimal] = None
        self.close: Optional[Decimal] = None
        self.volume: Decimal = Decimal('0')
        self.trade_count: int = 0

        # Tracking
        self.first_trade_time: Optional[datetime] = None
        self.last_trade_time: Optional[datetime] = None
        self.trades: List[StandardizedTrade] = []

    def add_trade(self, trade: StandardizedTrade) -> bool:
        """
        Add trade to this bucket if it belongs to this time period.

        Trades may arrive out of chronological order. The open price always
        tracks the trade with the earliest timestamp and the close price the
        trade with the latest timestamp; on equal timestamps the first
        arrival keeps the open and the last arrival takes the close, which
        matches plain arrival-order behaviour for an in-order stream.

        Args:
            trade: Standardized trade data (its timestamp, price and size
                attributes are read)

        Returns:
            True if trade was added, False if outside time range
        """
        timestamp = trade.timestamp

        # Check if trade belongs in this bucket (start_time <= timestamp < end_time)
        if not (self.start_time <= timestamp < self.end_time):
            return False

        price = trade.price

        if self.open is None:
            # First accepted trade seeds open and the high/low extremes
            self.open = price
            self.high = price
            self.low = price
            self.first_trade_time = timestamp
        elif self.first_trade_time is not None and timestamp < self.first_trade_time:
            # Late-arriving trade that predates the current open
            self.open = price
            self.first_trade_time = timestamp

        # Update extremes and totals
        self.high = max(self.high, price)
        self.low = min(self.low, price)
        self.volume += trade.size
        self.trade_count += 1

        # Only the chronologically latest trade may set the close, and
        # last_trade_time must never move backwards
        if self.last_trade_time is None or timestamp >= self.last_trade_time:
            self.close = price
            self.last_trade_time = timestamp

        # Store trade for detailed analysis if needed (kept in arrival order)
        self.trades.append(trade)

        return True

    def to_candle(self, is_complete: bool = True) -> OHLCVCandle:
        """
        Convert bucket to OHLCV candle.

        IMPORTANT: Candle timestamp = end_time (right-aligned, industry standard)

        Args:
            is_complete: Forwarded unchanged to the candle's is_complete field

        Returns:
            OHLCVCandle built from the current bucket state; prices that are
            still unset (no trades accepted) are reported as Decimal('0')
        """
        zero = Decimal('0')
        return OHLCVCandle(
            symbol=self.symbol,
            timeframe=self.timeframe,
            start_time=self.start_time,
            end_time=self.end_time,
            open=self.open or zero,
            high=self.high or zero,
            low=self.low or zero,
            close=self.close or zero,
            volume=self.volume,
            trade_count=self.trade_count,
            exchange=self.exchange,
            is_complete=is_complete,
            first_trade_time=self.first_trade_time,
            last_trade_time=self.last_trade_time,
        )

    @staticmethod
    def _parse_timeframe_to_timedelta(timeframe: str) -> timedelta:
        """
        Parse a timeframe string (e.g., '5m', '1h', '10s', '1d') into a timedelta object.

        Supported units: 's' (seconds), 'm' (minutes), 'h' (hours), 'd' (days).

        Args:
            timeframe: Timeframe string (e.g., '5m', '1h')

        Returns:
            timedelta: Corresponding timedelta object

        Raises:
            ValueError: If the timeframe format or unit is unsupported
        """
        # parse_timeframe splits the string into its numeric part and unit
        number, unit = parse_timeframe(timeframe)

        if unit == 's':
            return timedelta(seconds=number)
        if unit == 'm':
            return timedelta(minutes=number)
        if unit == 'h':
            return timedelta(hours=number)
        if unit == 'd':
            return timedelta(days=number)

        raise ValueError(f"Unsupported timeframe unit: {unit}")

    def _calculate_end_time(self, start_time: datetime, timeframe: str) -> datetime:
        """
        Calculate end time for this timeframe (right-aligned timestamp) using parsing-based logic.

        Args:
            start_time: The start datetime of the bucket
            timeframe: Timeframe string (e.g., '5m', '1h')

        Returns:
            datetime: The end time of the bucket (start_time plus one timeframe)

        Raises:
            ValueError: If the timeframe is malformed or unsupported
        """
        return start_time + self._parse_timeframe_to_timedelta(timeframe)
|
|
|
|
|
|
# Public API of this module.
__all__ = ['TimeframeBucket']