From fc3cac24bd762ed898706e489faf6800991f37dd Mon Sep 17 00:00:00 2001 From: "Vasily.onl" Date: Mon, 9 Jun 2025 14:18:32 +0800 Subject: [PATCH] update agregation to remove hardcoded timeframes and use unified timeframe function in all methods --- data/common/aggregation/batch.py | 49 +++++++--------------- data/common/aggregation/bucket.py | 65 ++++++++++++++++------------- data/common/aggregation/realtime.py | 54 +++++++----------------- data/common/aggregation/utils.py | 6 ++- tests/data/common/test_bucket.py | 64 ++++++++++++++++++++++++++++ 5 files changed, 134 insertions(+), 104 deletions(-) create mode 100644 tests/data/common/test_bucket.py diff --git a/data/common/aggregation/batch.py b/data/common/aggregation/batch.py index 4d0e0ee..cfce027 100644 --- a/data/common/aggregation/batch.py +++ b/data/common/aggregation/batch.py @@ -11,6 +11,7 @@ from collections import defaultdict from ..data_types import StandardizedTrade, OHLCVCandle, ProcessingStats from .bucket import TimeframeBucket +from .utils import parse_timeframe class BatchCandleProcessor: @@ -93,54 +94,32 @@ class BatchCandleProcessor: def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime: """ - Calculate the start time for the bucket that this timestamp belongs to. IMPORTANT: Uses RIGHT-ALIGNED timestamps - - For 5m timeframe, buckets start at 00:00, 00:05, 00:10, etc. - - Trade at 09:03:45 belongs to 09:00-09:05 bucket - - Trade at 09:07:30 belongs to 09:05-09:10 bucket - + Calculate the start time for the bucket that this timestamp belongs to using parsing-based logic. Args: timestamp: Trade timestamp timeframe: Time period (e.g., '1m', '5m', '1h') - Returns: Start time for the appropriate bucket + Raises: + ValueError: If the timeframe is malformed or unsupported """ - if timeframe == '1s': - return timestamp.replace(microsecond=0) - elif timeframe == '5s': - seconds = (timestamp.second // 5) * 5 + number, unit = parse_timeframe(timeframe) + if unit == 's': + seconds = (timestamp.second // number) * number return timestamp.replace(second=seconds, microsecond=0) - elif timeframe == '10s': - seconds = (timestamp.second // 10) * 10 - return timestamp.replace(second=seconds, microsecond=0) - elif timeframe == '15s': - seconds = (timestamp.second // 15) * 15 - return timestamp.replace(second=seconds, microsecond=0) - elif timeframe == '30s': - seconds = (timestamp.second // 30) * 30 - return timestamp.replace(second=seconds, microsecond=0) - elif timeframe == '1m': - return timestamp.replace(second=0, microsecond=0) - elif timeframe == '5m': - minutes = (timestamp.minute // 5) * 5 + elif unit == 'm': + minutes = (timestamp.minute // number) * number return timestamp.replace(minute=minutes, second=0, microsecond=0) - elif timeframe == '15m': - minutes = (timestamp.minute // 15) * 15 - return timestamp.replace(minute=minutes, second=0, microsecond=0) - elif timeframe == '30m': - minutes = (timestamp.minute // 30) * 30 - return timestamp.replace(minute=minutes, second=0, microsecond=0) - elif timeframe == '1h': - return timestamp.replace(minute=0, second=0, microsecond=0) - elif timeframe == '4h': - hours = (timestamp.hour // 4) * 4 + elif unit == 'h': + hours = (timestamp.hour // number) * number return timestamp.replace(hour=hours, minute=0, second=0, microsecond=0) - elif timeframe == '1d': + elif unit == 'd': + # For days, always floor to midnight return timestamp.replace(hour=0, minute=0, second=0, microsecond=0) else: - raise ValueError(f"Unsupported timeframe: {timeframe}") + raise ValueError(f"Unsupported timeframe unit: {unit}") def get_stats(self) -> Dict[str, Any]: """Get processing statistics.""" diff --git a/data/common/aggregation/bucket.py b/data/common/aggregation/bucket.py index ac5d182..3e9ec8a 100644 --- a/data/common/aggregation/bucket.py +++ b/data/common/aggregation/bucket.py @@ -5,11 +5,12 @@ This module provides the TimeframeBucket class which accumulates trades within a specific time period and calculates OHLCV data incrementally. """ -from datetime import datetime, timezone, timedelta +from datetime import datetime, timedelta from decimal import Decimal from typing import Optional, List from ..data_types import StandardizedTrade, OHLCVCandle +from .utils import parse_timeframe class TimeframeBucket: @@ -111,34 +112,42 @@ class TimeframeBucket: last_trade_time=self.last_trade_time ) - def _calculate_end_time(self, start_time: datetime, timeframe: str) -> datetime: - """Calculate end time for this timeframe (right-aligned timestamp).""" - if timeframe == '1s': - return start_time + timedelta(seconds=1) - elif timeframe == '5s': - return start_time + timedelta(seconds=5) - elif timeframe == '10s': - return start_time + timedelta(seconds=10) - elif timeframe == '15s': - return start_time + timedelta(seconds=15) - elif timeframe == '30s': - return start_time + timedelta(seconds=30) - elif timeframe == '1m': - return start_time + timedelta(minutes=1) - elif timeframe == '5m': - return start_time + timedelta(minutes=5) - elif timeframe == '15m': - return start_time + timedelta(minutes=15) - elif timeframe == '30m': - return start_time + timedelta(minutes=30) - elif timeframe == '1h': - return start_time + timedelta(hours=1) - elif timeframe == '4h': - return start_time + timedelta(hours=4) - elif timeframe == '1d': - return start_time + timedelta(days=1) + @staticmethod + def _parse_timeframe_to_timedelta(timeframe: str) -> timedelta: + """ + Parse a timeframe string (e.g., '5m', '1h', '10s', '1d') into a timedelta object. + Supported units: 's' (seconds), 'm' (minutes), 'h' (hours), 'd' (days). + Args: + timeframe: Timeframe string (e.g., '5m', '1h') + Returns: + timedelta: Corresponding timedelta object + Raises: + ValueError: If the timeframe format or unit is unsupported + """ + number, unit = parse_timeframe(timeframe) + if unit == 's': + return timedelta(seconds=number) + elif unit == 'm': + return timedelta(minutes=number) + elif unit == 'h': + return timedelta(hours=number) + elif unit == 'd': + return timedelta(days=number) else: - raise ValueError(f"Unsupported timeframe: {timeframe}") + raise ValueError(f"Unsupported timeframe unit: {unit}") + + def _calculate_end_time(self, start_time: datetime, timeframe: str) -> datetime: + """ + Calculate end time for this timeframe (right-aligned timestamp) using parsing-based logic. + Args: + start_time: The start datetime of the bucket + timeframe: Timeframe string (e.g., '5m', '1h') + Returns: + datetime: The end time of the bucket + Raises: + ValueError: If the timeframe is malformed or unsupported + """ + return start_time + self._parse_timeframe_to_timedelta(timeframe) __all__ = ['TimeframeBucket'] \ No newline at end of file diff --git a/data/common/aggregation/realtime.py b/data/common/aggregation/realtime.py index 0b49377..96168d1 100644 --- a/data/common/aggregation/realtime.py +++ b/data/common/aggregation/realtime.py @@ -5,10 +5,8 @@ This module provides the RealTimeCandleProcessor class for building OHLCV candle from live trade data in real-time. """ -from datetime import datetime, timezone, timedelta -from decimal import Decimal +from datetime import datetime from typing import Dict, List, Optional, Any, Callable -from collections import defaultdict from ..data_types import ( StandardizedTrade, @@ -17,6 +15,7 @@ from ..data_types import ( ProcessingStats ) from .bucket import TimeframeBucket +from .utils import parse_timeframe class RealTimeCandleProcessor: @@ -147,54 +146,31 @@ class RealTimeCandleProcessor: def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime: """ - Calculate the start time for the bucket that this timestamp belongs to. - IMPORTANT: Uses RIGHT-ALIGNED timestamps - - For 5m timeframe, buckets start at 00:00, 00:05, 00:10, etc. - - Trade at 09:03:45 belongs to 09:00-09:05 bucket - - Trade at 09:07:30 belongs to 09:05-09:10 bucket - + Calculate the start time for the bucket that this timestamp belongs to using parsing-based logic. Args: timestamp: Trade timestamp timeframe: Time period (e.g., '1m', '5m', '1h') - Returns: Start time for the appropriate bucket + Raises: + ValueError: If the timeframe is malformed or unsupported """ - if timeframe == '1s': - return timestamp.replace(microsecond=0) - elif timeframe == '5s': - seconds = (timestamp.second // 5) * 5 + number, unit = parse_timeframe(timeframe) + if unit == 's': + seconds = (timestamp.second // number) * number return timestamp.replace(second=seconds, microsecond=0) - elif timeframe == '10s': - seconds = (timestamp.second // 10) * 10 - return timestamp.replace(second=seconds, microsecond=0) - elif timeframe == '15s': - seconds = (timestamp.second // 15) * 15 - return timestamp.replace(second=seconds, microsecond=0) - elif timeframe == '30s': - seconds = (timestamp.second // 30) * 30 - return timestamp.replace(second=seconds, microsecond=0) - elif timeframe == '1m': - return timestamp.replace(second=0, microsecond=0) - elif timeframe == '5m': - minutes = (timestamp.minute // 5) * 5 + elif unit == 'm': + minutes = (timestamp.minute // number) * number return timestamp.replace(minute=minutes, second=0, microsecond=0) - elif timeframe == '15m': - minutes = (timestamp.minute // 15) * 15 - return timestamp.replace(minute=minutes, second=0, microsecond=0) - elif timeframe == '30m': - minutes = (timestamp.minute // 30) * 30 - return timestamp.replace(minute=minutes, second=0, microsecond=0) - elif timeframe == '1h': - return timestamp.replace(minute=0, second=0, microsecond=0) - elif timeframe == '4h': - hours = (timestamp.hour // 4) * 4 + elif unit == 'h': + hours = (timestamp.hour // number) * number return timestamp.replace(hour=hours, minute=0, second=0, microsecond=0) - elif timeframe == '1d': + elif unit == 'd': + # For days, always floor to midnight return timestamp.replace(hour=0, minute=0, second=0, microsecond=0) else: - raise ValueError(f"Unsupported timeframe: {timeframe}") + raise ValueError(f"Unsupported timeframe unit: {unit}") def _emit_candle(self, candle: OHLCVCandle) -> None: """Emit completed candle to all registered callbacks.""" diff --git a/data/common/aggregation/utils.py b/data/common/aggregation/utils.py index 85962e0..441fd24 100644 --- a/data/common/aggregation/utils.py +++ b/data/common/aggregation/utils.py @@ -9,7 +9,6 @@ import re from typing import List, Tuple from ..data_types import StandardizedTrade, OHLCVCandle -from .batch import BatchCandleProcessor def aggregate_trades_to_candles(trades: List[StandardizedTrade], @@ -28,6 +27,7 @@ def aggregate_trades_to_candles(trades: List[StandardizedTrade], Returns: List of completed candles """ + from .batch import BatchCandleProcessor processor = BatchCandleProcessor(symbol, exchange, timeframes) return processor.process_trades_to_candles(iter(trades)) @@ -65,9 +65,11 @@ def parse_timeframe(timeframe: str) -> Tuple[int, str]: match = re.match(r'^(\d+)([smhd])$', timeframe.lower()) if not match: raise ValueError(f"Invalid timeframe format: {timeframe}") - number = int(match.group(1)) unit = match.group(2) + # Disallow zero or negative timeframes, as they are not meaningful for bucket intervals + if number <= 0: + raise ValueError(f"Timeframe value must be positive: {timeframe}") return number, unit diff --git a/tests/data/common/test_bucket.py b/tests/data/common/test_bucket.py new file mode 100644 index 0000000..5f0983b --- /dev/null +++ b/tests/data/common/test_bucket.py @@ -0,0 +1,64 @@ +import pytest +from datetime import datetime, timedelta +from data.common.aggregation.bucket import TimeframeBucket +from data.common.aggregation.batch import BatchCandleProcessor + +class TestTimeframeBucket: + @pytest.mark.parametrize("timeframe, expected", [ + ("1s", timedelta(seconds=1)), + ("5s", timedelta(seconds=5)), + ("10s", timedelta(seconds=10)), + ("15s", timedelta(seconds=15)), + ("30s", timedelta(seconds=30)), + ("1m", timedelta(minutes=1)), + ("5m", timedelta(minutes=5)), + ("15m", timedelta(minutes=15)), + ("30m", timedelta(minutes=30)), + ("1h", timedelta(hours=1)), + ("4h", timedelta(hours=4)), + ("1d", timedelta(days=1)), + ]) + def test_parse_timeframe_to_timedelta_valid(self, timeframe, expected): + assert TimeframeBucket._parse_timeframe_to_timedelta(timeframe) == expected + + @pytest.mark.parametrize("timeframe", [ + "1w", "abc", "60x", "", "m5", "-1m", "0h" + ]) + def test_parse_timeframe_to_timedelta_invalid(self, timeframe): + with pytest.raises(ValueError): + TimeframeBucket._parse_timeframe_to_timedelta(timeframe) + + @pytest.mark.parametrize("timeframe", [ + "-1m", "-5h", "-10s", "-2d" + ]) + def test_parse_timeframe_to_timedelta_negative(self, timeframe): + with pytest.raises(ValueError): + TimeframeBucket._parse_timeframe_to_timedelta(timeframe) + + def test_calculate_end_time(self): + start = datetime(2024, 1, 1, 0, 0, 0) + bucket = TimeframeBucket(symbol="BTC-USDT", timeframe="5m", start_time=start) + expected_end = start + timedelta(minutes=5) + assert bucket.end_time == expected_end + +@pytest.mark.parametrize("timeframe, timestamp, expected_start", [ + ("1s", datetime(2024, 1, 1, 12, 0, 0, 123456), datetime(2024, 1, 1, 12, 0, 0)), + ("5s", datetime(2024, 1, 1, 12, 0, 7), datetime(2024, 1, 1, 12, 0, 5)), + ("10s", datetime(2024, 1, 1, 12, 0, 17), datetime(2024, 1, 1, 12, 0, 10)), + ("1m", datetime(2024, 1, 1, 12, 7, 45), datetime(2024, 1, 1, 12, 7, 0)), + ("5m", datetime(2024, 1, 1, 9, 7, 30), datetime(2024, 1, 1, 9, 5, 0)), + ("15m", datetime(2024, 1, 1, 9, 23, 0), datetime(2024, 1, 1, 9, 15, 0)), + ("1h", datetime(2024, 1, 1, 13, 45, 0), datetime(2024, 1, 1, 13, 0, 0)), + ("4h", datetime(2024, 1, 1, 11, 15, 0), datetime(2024, 1, 1, 8, 0, 0)), + ("1d", datetime(2024, 1, 1, 18, 30, 0), datetime(2024, 1, 1, 0, 0, 0)), +]) +def test_get_bucket_start_time_valid(timeframe, timestamp, expected_start): + processor = BatchCandleProcessor(symbol="BTC-USDT", exchange="okx", timeframes=[timeframe]) + start = processor._get_bucket_start_time(timestamp, timeframe) + assert start == expected_start + +@pytest.mark.parametrize("timeframe", ["0h", "-5m", "abc", "1w", "", "m5"]) +def test_get_bucket_start_time_invalid(timeframe): + processor = BatchCandleProcessor(symbol="BTC-USDT", exchange="okx", timeframes=[timeframe]) + with pytest.raises(ValueError): + processor._get_bucket_start_time(datetime(2024, 1, 1, 12, 0, 0), timeframe) \ No newline at end of file