Update aggregation to remove hardcoded timeframes and use the unified timeframe-parsing function in all methods

This commit is contained in:
Vasily.onl 2025-06-09 14:18:32 +08:00
parent 0c08d0a8fe
commit fc3cac24bd
5 changed files with 134 additions and 104 deletions

View File

@ -11,6 +11,7 @@ from collections import defaultdict
from ..data_types import StandardizedTrade, OHLCVCandle, ProcessingStats from ..data_types import StandardizedTrade, OHLCVCandle, ProcessingStats
from .bucket import TimeframeBucket from .bucket import TimeframeBucket
from .utils import parse_timeframe
class BatchCandleProcessor: class BatchCandleProcessor:
@ -93,54 +94,32 @@ class BatchCandleProcessor:
def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime:
    """
    Calculate the start time of the bucket that ``timestamp`` belongs to.

    The timestamp is floored to a whole multiple of the timeframe's
    duration, counted from 1970-01-01 00:00:00 on the timestamp's own
    wall clock (its tzinfo, if any, is carried over unchanged).

    For timeframes that divide their parent unit evenly this is the same
    as truncating the matching clock field:
    - For 5m timeframe, buckets start at 00:00, 00:05, 00:10, etc.
    - Trade at 09:03:45 belongs to 09:00-09:05 bucket
    - Trade at 09:07:30 belongs to 09:05-09:10 bucket

    Flooring the whole timestamp (instead of a single field) also keeps
    sizes such as '90s', '45m' or '2d' consistent: consecutive buckets
    are exactly one interval apart, so ``start + interval`` never reaches
    into the next bucket.

    NOTE: this method returns the bucket START. The "right-aligned"
    (end-of-bucket) timestamp mentioned in the aggregation code is
    derived from this value elsewhere.

    Args:
        timestamp: Trade timestamp
        timeframe: Time period (e.g., '1m', '5m', '1h')

    Returns:
        Start time for the appropriate bucket

    Raises:
        ValueError: If the timeframe is malformed or unsupported
    """
    # Imported locally: the module-level datetime import is not guaranteed
    # to expose timedelta.
    from datetime import timedelta

    number, unit = parse_timeframe(timeframe)

    unit_keywords = {'s': 'seconds', 'm': 'minutes', 'h': 'hours', 'd': 'days'}
    if unit not in unit_keywords:
        raise ValueError(f"Unsupported timeframe unit: {unit}")
    interval = timedelta(**{unit_keywords[unit]: number})

    # Sharing the timestamp's tzinfo makes the subtraction below plain
    # wall-clock arithmetic, matching the former field-truncation results.
    anchor = datetime(1970, 1, 1, tzinfo=timestamp.tzinfo)

    # timedelta // timedelta is an exact integer floor division, so
    # timestamps before the anchor are floored (not rounded up) as well.
    bucket_index = (timestamp - anchor) // interval
    bucket_start = anchor + bucket_index * interval

    # Field truncation used to keep the fold flag; keep doing so.
    return bucket_start.replace(fold=timestamp.fold)
def get_stats(self) -> Dict[str, Any]: def get_stats(self) -> Dict[str, Any]:
"""Get processing statistics.""" """Get processing statistics."""

View File

@ -5,11 +5,12 @@ This module provides the TimeframeBucket class which accumulates trades
within a specific time period and calculates OHLCV data incrementally. within a specific time period and calculates OHLCV data incrementally.
""" """
from datetime import datetime, timezone, timedelta from datetime import datetime, timedelta
from decimal import Decimal from decimal import Decimal
from typing import Optional, List from typing import Optional, List
from ..data_types import StandardizedTrade, OHLCVCandle from ..data_types import StandardizedTrade, OHLCVCandle
from .utils import parse_timeframe
class TimeframeBucket: class TimeframeBucket:
@ -111,34 +112,42 @@ class TimeframeBucket:
last_trade_time=self.last_trade_time last_trade_time=self.last_trade_time
) )
@staticmethod
def _parse_timeframe_to_timedelta(timeframe: str) -> timedelta:
    """
    Convert a timeframe string such as '5m', '1h', '10s' or '1d' to a timedelta.

    The string is split by ``parse_timeframe`` into a count and a unit
    letter; the unit letter selects the timedelta field to fill.

    Supported units: 's' (seconds), 'm' (minutes), 'h' (hours), 'd' (days).

    Args:
        timeframe: Timeframe string (e.g., '5m', '1h')

    Returns:
        timedelta: Duration covered by one bucket of this timeframe

    Raises:
        ValueError: If the timeframe format or unit is unsupported
    """
    count, unit = parse_timeframe(timeframe)

    # Unit letter -> name of the timedelta keyword argument it stands for.
    keyword_for_unit = {
        's': 'seconds',
        'm': 'minutes',
        'h': 'hours',
        'd': 'days',
    }
    keyword = keyword_for_unit.get(unit)
    if keyword is None:
        raise ValueError(f"Unsupported timeframe unit: {unit}")

    return timedelta(**{keyword: count})
def _calculate_end_time(self, start_time: datetime, timeframe: str) -> datetime:
    """
    Return the end of the bucket (right-aligned timestamp) for a timeframe.

    The end is the bucket start shifted forward by the duration that
    ``_parse_timeframe_to_timedelta`` derives from the timeframe string.

    Args:
        start_time: The start datetime of the bucket
        timeframe: Timeframe string (e.g., '5m', '1h')

    Returns:
        datetime: The end time of the bucket

    Raises:
        ValueError: If the timeframe is malformed or unsupported
    """
    bucket_span = self._parse_timeframe_to_timedelta(timeframe)
    return start_time + bucket_span
__all__ = ['TimeframeBucket'] __all__ = ['TimeframeBucket']

View File

@ -5,10 +5,8 @@ This module provides the RealTimeCandleProcessor class for building OHLCV candle
from live trade data in real-time. from live trade data in real-time.
""" """
from datetime import datetime, timezone, timedelta from datetime import datetime
from decimal import Decimal
from typing import Dict, List, Optional, Any, Callable from typing import Dict, List, Optional, Any, Callable
from collections import defaultdict
from ..data_types import ( from ..data_types import (
StandardizedTrade, StandardizedTrade,
@ -17,6 +15,7 @@ from ..data_types import (
ProcessingStats ProcessingStats
) )
from .bucket import TimeframeBucket from .bucket import TimeframeBucket
from .utils import parse_timeframe
class RealTimeCandleProcessor: class RealTimeCandleProcessor:
@ -147,54 +146,31 @@ class RealTimeCandleProcessor:
def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime:
    """
    Calculate the start time of the bucket that ``timestamp`` belongs to.

    The timestamp is floored to a whole multiple of the timeframe's
    duration, counted from 1970-01-01 00:00:00 on the timestamp's own
    wall clock (its tzinfo, if any, is carried over unchanged).

    For timeframes that divide their parent unit evenly this is the same
    as truncating the matching clock field:
    - For 5m timeframe, buckets start at 00:00, 00:05, 00:10, etc.
    - Trade at 09:03:45 belongs to 09:00-09:05 bucket
    - Trade at 09:07:30 belongs to 09:05-09:10 bucket

    Flooring the whole timestamp (instead of a single field) also keeps
    sizes such as '90s', '45m' or '2d' consistent: consecutive buckets
    are exactly one interval apart, so ``start + interval`` never reaches
    into the next bucket.

    NOTE: this method returns the bucket START. The "right-aligned"
    (end-of-bucket) timestamp mentioned in the aggregation code is
    derived from this value elsewhere.

    Args:
        timestamp: Trade timestamp
        timeframe: Time period (e.g., '1m', '5m', '1h')

    Returns:
        Start time for the appropriate bucket

    Raises:
        ValueError: If the timeframe is malformed or unsupported
    """
    # Imported locally: this module's top-level import only brings in
    # `datetime`, not `timedelta`.
    from datetime import timedelta

    number, unit = parse_timeframe(timeframe)

    unit_keywords = {'s': 'seconds', 'm': 'minutes', 'h': 'hours', 'd': 'days'}
    if unit not in unit_keywords:
        raise ValueError(f"Unsupported timeframe unit: {unit}")
    interval = timedelta(**{unit_keywords[unit]: number})

    # Sharing the timestamp's tzinfo makes the subtraction below plain
    # wall-clock arithmetic, matching the former field-truncation results.
    anchor = datetime(1970, 1, 1, tzinfo=timestamp.tzinfo)

    # timedelta // timedelta is an exact integer floor division, so
    # timestamps before the anchor are floored (not rounded up) as well.
    bucket_index = (timestamp - anchor) // interval
    bucket_start = anchor + bucket_index * interval

    # Field truncation used to keep the fold flag; keep doing so.
    return bucket_start.replace(fold=timestamp.fold)
def _emit_candle(self, candle: OHLCVCandle) -> None: def _emit_candle(self, candle: OHLCVCandle) -> None:
"""Emit completed candle to all registered callbacks.""" """Emit completed candle to all registered callbacks."""

View File

@ -9,7 +9,6 @@ import re
from typing import List, Tuple from typing import List, Tuple
from ..data_types import StandardizedTrade, OHLCVCandle from ..data_types import StandardizedTrade, OHLCVCandle
from .batch import BatchCandleProcessor
def aggregate_trades_to_candles(trades: List[StandardizedTrade], def aggregate_trades_to_candles(trades: List[StandardizedTrade],
@ -28,6 +27,7 @@ def aggregate_trades_to_candles(trades: List[StandardizedTrade],
Returns: Returns:
List of completed candles List of completed candles
""" """
from .batch import BatchCandleProcessor
processor = BatchCandleProcessor(symbol, exchange, timeframes) processor = BatchCandleProcessor(symbol, exchange, timeframes)
return processor.process_trades_to_candles(iter(trades)) return processor.process_trades_to_candles(iter(trades))
@ -65,9 +65,11 @@ def parse_timeframe(timeframe: str) -> Tuple[int, str]:
match = re.match(r'^(\d+)([smhd])$', timeframe.lower()) match = re.match(r'^(\d+)([smhd])$', timeframe.lower())
if not match: if not match:
raise ValueError(f"Invalid timeframe format: {timeframe}") raise ValueError(f"Invalid timeframe format: {timeframe}")
number = int(match.group(1)) number = int(match.group(1))
unit = match.group(2) unit = match.group(2)
# Disallow zero or negative timeframes, as they are not meaningful for bucket intervals
if number <= 0:
raise ValueError(f"Timeframe value must be positive: {timeframe}")
return number, unit return number, unit

View File

@ -0,0 +1,64 @@
import pytest
from datetime import datetime, timedelta
from data.common.aggregation.bucket import TimeframeBucket
from data.common.aggregation.batch import BatchCandleProcessor
class TestTimeframeBucket:
    """Tests for TimeframeBucket's timeframe-to-duration helpers."""

    @pytest.mark.parametrize(
        ("timeframe", "expected"),
        [
            ("1s", timedelta(seconds=1)),
            ("5s", timedelta(seconds=5)),
            ("10s", timedelta(seconds=10)),
            ("15s", timedelta(seconds=15)),
            ("30s", timedelta(seconds=30)),
            ("1m", timedelta(minutes=1)),
            ("5m", timedelta(minutes=5)),
            ("15m", timedelta(minutes=15)),
            ("30m", timedelta(minutes=30)),
            ("1h", timedelta(hours=1)),
            ("4h", timedelta(hours=4)),
            ("1d", timedelta(days=1)),
        ],
    )
    def test_parse_timeframe_to_timedelta_valid(self, timeframe, expected):
        """Each supported timeframe string maps to the matching duration."""
        actual = TimeframeBucket._parse_timeframe_to_timedelta(timeframe)
        assert actual == expected

    @pytest.mark.parametrize(
        "timeframe",
        ["1w", "abc", "60x", "", "m5", "-1m", "0h"],
    )
    def test_parse_timeframe_to_timedelta_invalid(self, timeframe):
        """Unknown units, malformed strings and non-positive counts are rejected."""
        with pytest.raises(ValueError):
            TimeframeBucket._parse_timeframe_to_timedelta(timeframe)

    @pytest.mark.parametrize(
        "timeframe",
        ["-1m", "-5h", "-10s", "-2d"],
    )
    def test_parse_timeframe_to_timedelta_negative(self, timeframe):
        """A leading minus sign is rejected for every unit."""
        with pytest.raises(ValueError):
            TimeframeBucket._parse_timeframe_to_timedelta(timeframe)

    def test_calculate_end_time(self):
        """A 5m bucket ends exactly five minutes after its start."""
        bucket_start = datetime(2024, 1, 1, 0, 0, 0)
        bucket = TimeframeBucket(
            symbol="BTC-USDT",
            timeframe="5m",
            start_time=bucket_start,
        )
        assert bucket.end_time == bucket_start + timedelta(minutes=5)
@pytest.mark.parametrize(
    ("timeframe", "timestamp", "expected_start"),
    [
        ("1s", datetime(2024, 1, 1, 12, 0, 0, 123456), datetime(2024, 1, 1, 12, 0, 0)),
        ("5s", datetime(2024, 1, 1, 12, 0, 7), datetime(2024, 1, 1, 12, 0, 5)),
        ("10s", datetime(2024, 1, 1, 12, 0, 17), datetime(2024, 1, 1, 12, 0, 10)),
        ("1m", datetime(2024, 1, 1, 12, 7, 45), datetime(2024, 1, 1, 12, 7, 0)),
        ("5m", datetime(2024, 1, 1, 9, 7, 30), datetime(2024, 1, 1, 9, 5, 0)),
        ("15m", datetime(2024, 1, 1, 9, 23, 0), datetime(2024, 1, 1, 9, 15, 0)),
        ("1h", datetime(2024, 1, 1, 13, 45, 0), datetime(2024, 1, 1, 13, 0, 0)),
        ("4h", datetime(2024, 1, 1, 11, 15, 0), datetime(2024, 1, 1, 8, 0, 0)),
        ("1d", datetime(2024, 1, 1, 18, 30, 0), datetime(2024, 1, 1, 0, 0, 0)),
    ],
)
def test_get_bucket_start_time_valid(timeframe, timestamp, expected_start):
    """A timestamp is floored to the start of its bucket for each timeframe."""
    batch_processor = BatchCandleProcessor(
        symbol="BTC-USDT",
        exchange="okx",
        timeframes=[timeframe],
    )
    assert batch_processor._get_bucket_start_time(timestamp, timeframe) == expected_start
@pytest.mark.parametrize(
    "timeframe",
    ["0h", "-5m", "abc", "1w", "", "m5"],
)
def test_get_bucket_start_time_invalid(timeframe):
    """Zero, negative, malformed and unknown-unit timeframes raise ValueError."""
    batch_processor = BatchCandleProcessor(
        symbol="BTC-USDT",
        exchange="okx",
        timeframes=[timeframe],
    )
    probe_timestamp = datetime(2024, 1, 1, 12, 0, 0)
    with pytest.raises(ValueError):
        batch_processor._get_bucket_start_time(probe_timestamp, timeframe)