# Timeframe Aggregation Usage Guide

## Overview

This guide covers how to use the new timeframe aggregation utilities in the IncrementalTrader framework. The system provides mathematically correct aggregation with proper timestamp handling to prevent future data leakage.

## Key Features

### ✅ **Fixed Critical Issues**

- **No Future Data Leakage**: Bar timestamps represent the END of the period
- **Mathematical Correctness**: Results match pandas resampling exactly
- **Trading Industry Standard**: Uses standard bar grouping conventions
- **Proper OHLCV Aggregation**: Correct first/max/min/last/sum rules

### 🚀 **New Capabilities**

- **MinuteDataBuffer**: Efficient real-time data management
- **Flexible Timestamp Modes**: Support for both bar start and end timestamps
- **Memory Bounded**: Automatic buffer size management
- **Performance Optimized**: Fast aggregation for real-time use

## Quick Start

### Basic Usage

```python
import pandas as pd

from IncrementalTrader.utils.timeframe_utils import aggregate_minute_data_to_timeframe

# Sample minute data
minute_data = [
    {
        'timestamp': pd.Timestamp('2024-01-01 09:00:00'),
        'open': 50000.0, 'high': 50050.0, 'low': 49950.0, 'close': 50025.0, 'volume': 1000
    },
    {
        'timestamp': pd.Timestamp('2024-01-01 09:01:00'),
        'open': 50025.0, 'high': 50075.0, 'low': 50000.0, 'close': 50050.0, 'volume': 1200
    },
    # ... more minute data
]

# Aggregate to 15-minute bars
bars_15m = aggregate_minute_data_to_timeframe(minute_data, "15min")

# Result: bars with END timestamps (no future data leakage)
for bar in bars_15m:
    print(f"Bar ending at {bar['timestamp']}: OHLCV = {bar['open']}, {bar['high']}, {bar['low']}, {bar['close']}, {bar['volume']}")
```

### Using MinuteDataBuffer for Real-Time Strategies

```python
from typing import Dict, Optional

import pandas as pd

from IncrementalTrader.utils.timeframe_utils import MinuteDataBuffer

class MyStrategy(IncStrategyBase):
    def __init__(self, name: str = "my_strategy", weight: float = 1.0, params: Optional[Dict] = None):
        super().__init__(name, weight, params)
        self.timeframe = self.params.get("timeframe", "15min")
        self.minute_buffer = MinuteDataBuffer(max_size=1440)  # 24 hours
        self.last_processed_bar_timestamp = None

    def calculate_on_data(self, new_data_point: Dict[str, float], timestamp: pd.Timestamp) -> None:
        # Add to buffer
        self.minute_buffer.add(timestamp, new_data_point)

        # Get latest complete bar
        latest_bar = self.minute_buffer.get_latest_complete_bar(self.timeframe)

        if latest_bar and latest_bar['timestamp'] != self.last_processed_bar_timestamp:
            # Process new complete bar
            self.last_processed_bar_timestamp = latest_bar['timestamp']
            self._process_complete_bar(latest_bar)

    def _process_complete_bar(self, bar: Dict[str, float]) -> None:
        # Your strategy logic here
        # bar['timestamp'] is the END of the bar period (no future data)
        pass
```

## Core Functions

### aggregate_minute_data_to_timeframe()

**Purpose**: Aggregate minute-level OHLCV data to higher timeframes

**Signature**:
```python
def aggregate_minute_data_to_timeframe(
    minute_data: List[Dict[str, Union[float, pd.Timestamp]]],
    timeframe: str,
    timestamp_mode: str = "end"
) -> List[Dict[str, Union[float, pd.Timestamp]]]
```

**Parameters**:
- `minute_data`: List of minute OHLCV dictionaries with a 'timestamp' field
- `timeframe`: Target timeframe ("1min", "5min", "15min", "1h", "4h", "1d")
- `timestamp_mode`: "end" (default) for bar end timestamps, "start" for bar start timestamps

**Returns**: List of aggregated OHLCV dictionaries with proper timestamps

**Example**:
```python
# Aggregate to 5-minute bars with end timestamps
bars_5m = aggregate_minute_data_to_timeframe(minute_data, "5min", "end")

# Aggregate to 1-hour bars with start timestamps
bars_1h = aggregate_minute_data_to_timeframe(minute_data, "1h", "start")
```

### get_latest_complete_bar()

**Purpose**: Get the latest complete bar for real-time processing

**Signature**:
```python
def get_latest_complete_bar(
    minute_data: List[Dict[str, Union[float, pd.Timestamp]]],
    timeframe: str,
    timestamp_mode: str = "end"
) -> Optional[Dict[str, Union[float, pd.Timestamp]]]
```

**Example**:
```python
# Get latest complete 15-minute bar
latest_15m = get_latest_complete_bar(minute_data, "15min")
if latest_15m:
    print(f"Latest complete bar: {latest_15m['timestamp']}")
```
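The idea of a "complete" bar can be stated independently of the library: with end-mode timestamps, a bar stamped T is complete once the last minute before T has arrived. A conceptual sketch (the helper name and the assumption that input minutes are stamped with their START are illustrative, not part of the API):

```python
import pandas as pd

# Conceptual check: a bar whose END timestamp is bar_end is complete
# once data covering everything up to bar_end has been observed.
# Assumes each 1-minute input bar is stamped with the START of its
# minute, so it covers [latest_minute, latest_minute + 1min).
def is_bar_complete(bar_end: pd.Timestamp, latest_minute: pd.Timestamp) -> bool:
    return latest_minute + pd.Timedelta(minutes=1) >= bar_end

# The 15-minute bar ending 09:15 is complete once the 09:14 minute arrives
print(is_bar_complete(pd.Timestamp("2024-01-01 09:15"),
                      pd.Timestamp("2024-01-01 09:14")))
```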

### parse_timeframe_to_minutes()

**Purpose**: Parse timeframe strings to minutes

**Signature**:
```python
def parse_timeframe_to_minutes(timeframe: str) -> int
```

**Supported Formats**:
- Minutes: "1min", "5min", "15min", "30min"
- Hours: "1h", "2h", "4h", "6h", "12h"
- Days: "1d", "7d"
- Weeks: "1w", "2w"
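The mapping from these formats to minutes is mechanical; a minimal sketch of equivalent parsing logic (the function and table names are illustrative, not the library's implementation, which may differ in error handling):

```python
import re

# Illustrative parser for the documented timeframe formats.
_MINUTES_PER_UNIT = {"min": 1, "h": 60, "d": 1440, "w": 10080}

def parse_timeframe_sketch(timeframe: str) -> int:
    match = re.fullmatch(r"(\d+)(min|h|d|w)", timeframe)
    if match is None:
        raise ValueError(f"Unsupported timeframe: {timeframe!r}")
    count, unit = int(match.group(1)), match.group(2)
    return count * _MINUTES_PER_UNIT[unit]

print(parse_timeframe_sketch("15min"), parse_timeframe_sketch("4h"))
```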

**Example**:
```python
minutes = parse_timeframe_to_minutes("15min")  # Returns 15
minutes = parse_timeframe_to_minutes("1h")     # Returns 60
minutes = parse_timeframe_to_minutes("1d")     # Returns 1440
```

## MinuteDataBuffer Class

### Overview

The `MinuteDataBuffer` class provides efficient buffer management for minute-level data with automatic aggregation capabilities.

### Key Features

- **Memory Bounded**: Configurable maximum size (default: 1440 minutes = 24 hours)
- **Automatic Cleanup**: Old data is automatically removed when the buffer is full
- **Thread Safe**: Safe for use in multi-threaded environments
- **Efficient Access**: Fast data retrieval and aggregation methods
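The memory-bounded, auto-evicting behavior is the same pattern as a fixed-length deque. A minimal standalone sketch of that idea (class name illustrative, not the real implementation):

```python
from collections import deque

# Illustrative bounded buffer: deque(maxlen=...) silently drops the
# oldest entry on overflow, which is the "automatic cleanup" behavior.
class BoundedMinuteBuffer:
    def __init__(self, max_size: int = 1440):
        self._data = deque(maxlen=max_size)

    def add(self, timestamp, ohlcv: dict) -> None:
        self._data.append({"timestamp": timestamp, **ohlcv})

    def size(self) -> int:
        return len(self._data)

buf = BoundedMinuteBuffer(max_size=3)
for minute in range(5):
    buf.add(minute, {"close": 100.0 + minute})
print(buf.size())  # capped at 3; minutes 0 and 1 were evicted
```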

### Basic Usage

```python
from IncrementalTrader.utils.timeframe_utils import MinuteDataBuffer

# Create buffer for 24 hours of data
buffer = MinuteDataBuffer(max_size=1440)

# Add minute data
buffer.add(timestamp, {
    'open': 50000.0,
    'high': 50050.0,
    'low': 49950.0,
    'close': 50025.0,
    'volume': 1000
})

# Get aggregated data
bars_15m = buffer.aggregate_to_timeframe("15min", lookback_bars=4)
latest_bar = buffer.get_latest_complete_bar("15min")

# Buffer management
print(f"Buffer size: {buffer.size()}")
print(f"Is full: {buffer.is_full()}")
print(f"Time range: {buffer.get_time_range()}")
```

### Methods

#### add(timestamp, ohlcv_data)

Add a new minute data point to the buffer.

```python
buffer.add(pd.Timestamp('2024-01-01 09:00:00'), {
    'open': 50000.0, 'high': 50050.0, 'low': 49950.0, 'close': 50025.0, 'volume': 1000
})
```

#### get_data(lookback_minutes=None)

Get data from the buffer.

```python
# Get all data
all_data = buffer.get_data()

# Get the last 60 minutes
recent_data = buffer.get_data(lookback_minutes=60)
```

#### aggregate_to_timeframe(timeframe, lookback_bars=None, timestamp_mode="end")

Aggregate buffer data to the specified timeframe.

```python
# Get the last 4 bars of 15-minute data
bars = buffer.aggregate_to_timeframe("15min", lookback_bars=4)

# Get all available 1-hour bars
bars = buffer.aggregate_to_timeframe("1h")
```

#### get_latest_complete_bar(timeframe, timestamp_mode="end")

Get the latest complete bar for the specified timeframe.

```python
latest_bar = buffer.get_latest_complete_bar("15min")
if latest_bar:
    print(f"Latest complete bar ends at: {latest_bar['timestamp']}")
```

## Timestamp Modes

### "end" Mode (Default - Recommended)

- **Bar timestamps represent the END of the bar period**
- **Prevents future data leakage**
- **Safe for real-time trading**

```python
# A 5-minute bar covering 09:00-09:04 is timestamped 09:05
bars = aggregate_minute_data_to_timeframe(data, "5min", "end")
```

### "start" Mode

- **Bar timestamps represent the START of the bar period**
- **Matches some external data sources**
- **Use with caution in real-time systems**

```python
# A 5-minute bar covering 09:00-09:04 is timestamped 09:00
bars = aggregate_minute_data_to_timeframe(data, "5min", "start")
```
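The two modes map directly onto pandas resampling labels: pandas stamps each bar with its window START by default, and end timestamps are that label shifted forward by one bar period. A plain-pandas sketch of the relationship (independent of the library):

```python
import pandas as pd

# pandas labels each resampled bar with the START of its window by default.
idx = pd.date_range("2024-01-01 09:00", periods=10, freq="1min")
closes = pd.Series(range(10), index=idx)
bars = closes.resample("5min").last()

start_labels = bars.index                          # "start" mode: 09:00, 09:05
end_labels = bars.index + pd.Timedelta(minutes=5)  # "end" mode:   09:05, 09:10
print(list(start_labels), list(end_labels))
```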

## Best Practices

### 1. Always Use "end" Mode for Real-Time Trading

```python
# ✅ GOOD: Prevents future data leakage
bars = aggregate_minute_data_to_timeframe(data, "15min", "end")

# ❌ RISKY: Could lead to future data leakage
bars = aggregate_minute_data_to_timeframe(data, "15min", "start")
```

### 2. Use MinuteDataBuffer for Strategies

```python
# ✅ GOOD: Efficient memory management
class MyStrategy(IncStrategyBase):
    def __init__(self, ...):
        self.buffer = MinuteDataBuffer(max_size=1440)  # 24 hours

    def calculate_on_data(self, data, timestamp):
        self.buffer.add(timestamp, data)
        latest_bar = self.buffer.get_latest_complete_bar(self.timeframe)
        # Process latest_bar...

# ❌ INEFFICIENT: Keeping all data in memory
class BadStrategy(IncStrategyBase):
    def __init__(self, ...):
        self.all_data = []  # Grows indefinitely
```

### 3. Check for Complete Bars

```python
# ✅ GOOD: Only process complete bars
latest_bar = buffer.get_latest_complete_bar("15min")
if latest_bar and latest_bar['timestamp'] != self.last_processed:
    self.process_bar(latest_bar)
    self.last_processed = latest_bar['timestamp']

# ❌ BAD: Processing incomplete bars
bars = buffer.aggregate_to_timeframe("15min")
if bars:
    self.process_bar(bars[-1])  # Might be incomplete!
```

### 4. Handle Edge Cases

```python
# ✅ GOOD: Robust error handling
try:
    bars = aggregate_minute_data_to_timeframe(data, timeframe)
    if bars:
        process_bars(bars)  # your processing logic
    else:
        logger.warning("No complete bars available")
except TimeframeError as e:
    logger.error(f"Invalid timeframe: {e}")
except ValueError as e:
    logger.error(f"Invalid data: {e}")

# ❌ BAD: No error handling
bars = aggregate_minute_data_to_timeframe(data, timeframe)
latest_bar = bars[-1]  # Could crash if bars is empty!
```

### 5. Optimize Buffer Size

```python
# ✅ GOOD: Size buffer based on strategy needs
# For a 15min strategy needing 20 bars of lookback: 20 * 15 = 300 minutes
buffer = MinuteDataBuffer(max_size=300)

# For a daily strategy: 24 * 60 = 1440 minutes
buffer = MinuteDataBuffer(max_size=1440)

# ❌ WASTEFUL: Oversized buffer
buffer = MinuteDataBuffer(max_size=10080)  # 1 week for a 15min strategy
```

## Performance Considerations

### Memory Usage

- **MinuteDataBuffer**: ~1KB per minute of data
- **1440 minutes (24h)**: ~1.4MB memory usage
- **Automatic cleanup**: Old data removed when the buffer is full

### Processing Speed

- **Small datasets (< 500 minutes)**: < 5ms aggregation time
- **Large datasets (2000+ minutes)**: < 15ms aggregation time
- **Real-time processing**: < 2ms per minute update

### Optimization Tips

1. **Use appropriate buffer sizes** - don't keep more data than needed
2. **Process complete bars only** - avoid reprocessing incomplete bars
3. **Cache aggregated results** - don't re-aggregate the same data
4. **Use the lookback_bars parameter** - limit returned data to what you need

```python
# ✅ OPTIMIZED: Only get what you need
recent_bars = buffer.aggregate_to_timeframe("15min", lookback_bars=20)

# ❌ INEFFICIENT: Getting all data every time
all_bars = buffer.aggregate_to_timeframe("15min")
recent_bars = all_bars[-20:]  # Wasteful
```
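Tip 3 (caching) can be as simple as keying the cached result by the latest complete bar's timestamp, so re-aggregation only happens when a new bar closes. A sketch under that assumption (the cache class and attribute names are illustrative):

```python
# Illustrative cache: re-aggregate only when a new complete bar appears.
class AggregationCache:
    def __init__(self):
        self._key = None   # timestamp of the bar the cache was built for
        self._bars = None

    def get_bars(self, buffer, timeframe: str, lookback_bars: int):
        latest = buffer.get_latest_complete_bar(timeframe)
        if latest is None:
            return []
        if latest['timestamp'] != self._key:
            # A new bar closed; refresh the cache once.
            self._bars = buffer.aggregate_to_timeframe(
                timeframe, lookback_bars=lookback_bars
            )
            self._key = latest['timestamp']
        return self._bars
```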

## Common Patterns

### Pattern 1: Simple Strategy with Buffer

```python
class TrendStrategy(IncStrategyBase):
    def __init__(self, name: str = "trend", weight: float = 1.0, params: Optional[Dict] = None):
        super().__init__(name, weight, params)
        self.timeframe = self.params.get("timeframe", "15min")
        self.lookback_period = self.params.get("lookback_period", 20)

        # Calculate buffer size: lookback_period * timeframe_minutes
        timeframe_minutes = parse_timeframe_to_minutes(self.timeframe)
        buffer_size = self.lookback_period * timeframe_minutes
        self.buffer = MinuteDataBuffer(max_size=buffer_size)

        self.last_processed_timestamp = None

    def calculate_on_data(self, new_data_point: Dict[str, float], timestamp: pd.Timestamp) -> None:
        # Add to buffer
        self.buffer.add(timestamp, new_data_point)

        # Get latest complete bar
        latest_bar = self.buffer.get_latest_complete_bar(self.timeframe)

        if latest_bar and latest_bar['timestamp'] != self.last_processed_timestamp:
            # Get historical bars for analysis
            historical_bars = self.buffer.aggregate_to_timeframe(
                self.timeframe,
                lookback_bars=self.lookback_period
            )

            if len(historical_bars) >= self.lookback_period:
                signal = self._analyze_trend(historical_bars)
                if signal:
                    self._generate_signal(signal, latest_bar['timestamp'])

            self.last_processed_timestamp = latest_bar['timestamp']

    def _analyze_trend(self, bars: List[Dict]) -> Optional[str]:
        # Your trend analysis logic here (placeholder comparison shown)
        closes = [bar['close'] for bar in bars]
        trend_up = closes[-1] > closes[0]
        trend_down = closes[-1] < closes[0]
        return "BUY" if trend_up else "SELL" if trend_down else None
```

### Pattern 2: Multi-Timeframe Strategy

```python
class MultiTimeframeStrategy(IncStrategyBase):
    def __init__(self, name: str = "multi_tf", weight: float = 1.0, params: Optional[Dict] = None):
        super().__init__(name, weight, params)
        self.primary_timeframe = self.params.get("primary_timeframe", "15min")
        self.secondary_timeframe = self.params.get("secondary_timeframe", "1h")

        # Buffer size for the largest timeframe needed
        max_timeframe_minutes = max(
            parse_timeframe_to_minutes(self.primary_timeframe),
            parse_timeframe_to_minutes(self.secondary_timeframe)
        )
        buffer_size = 50 * max_timeframe_minutes  # 50 bars of the largest timeframe
        self.buffer = MinuteDataBuffer(max_size=buffer_size)

        self.last_processed = {
            self.primary_timeframe: None,
            self.secondary_timeframe: None
        }

    def calculate_on_data(self, new_data_point: Dict[str, float], timestamp: pd.Timestamp) -> None:
        self.buffer.add(timestamp, new_data_point)

        # Check both timeframes
        for timeframe in [self.primary_timeframe, self.secondary_timeframe]:
            latest_bar = self.buffer.get_latest_complete_bar(timeframe)

            if latest_bar and latest_bar['timestamp'] != self.last_processed[timeframe]:
                self._process_timeframe(timeframe, latest_bar)
                self.last_processed[timeframe] = latest_bar['timestamp']

    def _process_timeframe(self, timeframe: str, latest_bar: Dict) -> None:
        if timeframe == self.primary_timeframe:
            # Primary timeframe logic
            pass
        elif timeframe == self.secondary_timeframe:
            # Secondary timeframe logic
            pass
```

### Pattern 3: Backtesting with Historical Data

```python
def backtest_strategy(strategy_class, historical_data: List[Dict], params: Dict):
    """Run a backtest with historical minute data."""
    strategy = strategy_class("backtest", params=params)

    signals = []

    # Process data chronologically
    for data_point in historical_data:
        timestamp = data_point['timestamp']
        ohlcv = {k: v for k, v in data_point.items() if k != 'timestamp'}

        # Process data point
        signal = strategy.process_data_point(timestamp, ohlcv)

        if signal and signal.signal_type != "HOLD":
            signals.append({
                'timestamp': timestamp,
                'signal_type': signal.signal_type,
                'confidence': signal.confidence
            })

    return signals

# Usage
historical_data = load_historical_data("BTCUSD", "2024-01-01", "2024-01-31")
signals = backtest_strategy(TrendStrategy, historical_data, {"timeframe": "15min"})
```

## Error Handling

### Common Errors and Solutions

#### TimeframeError
```python
try:
    bars = aggregate_minute_data_to_timeframe(data, "invalid_timeframe")
except TimeframeError as e:
    logger.error(f"Invalid timeframe: {e}")
    # Fall back to the default timeframe
    bars = aggregate_minute_data_to_timeframe(data, "15min")
```

#### ValueError (Invalid Data)
```python
for timestamp, ohlcv_data in minute_stream:
    try:
        buffer.add(timestamp, ohlcv_data)
    except ValueError as e:
        logger.error(f"Invalid data: {e}")
        continue  # Skip this data point
```

#### Empty Data
```python
bars = aggregate_minute_data_to_timeframe(minute_data, "15min")
if not bars:
    logger.warning("No complete bars available")
    return

latest_bar = get_latest_complete_bar(minute_data, "15min")
if latest_bar is None:
    logger.warning("No complete bar available")
    return
```

## Migration from Old System

### Before (Old TimeframeAggregator)
```python
# Old approach - potential future data leakage
class OldStrategy(IncStrategyBase):
    def __init__(self, ...):
        self.aggregator = TimeframeAggregator(timeframe="15min")

    def calculate_on_data(self, data, timestamp):
        # Potential issues:
        # - Bar timestamps might represent the start (future data leakage)
        # - Inconsistent aggregation logic
        # - Memory not bounded
        pass
```

### After (New Utilities)
```python
# New approach - safe and efficient
class NewStrategy(IncStrategyBase):
    def __init__(self, ...):
        self.buffer = MinuteDataBuffer(max_size=1440)
        self.timeframe = "15min"
        self.last_processed = None

    def calculate_on_data(self, data, timestamp):
        self.buffer.add(timestamp, data)
        latest_bar = self.buffer.get_latest_complete_bar(self.timeframe)

        if latest_bar and latest_bar['timestamp'] != self.last_processed:
            # Safe: bar timestamp is the END of the period (no future data)
            # Efficient: bounded memory usage
            # Correct: matches pandas resampling
            self.process_bar(latest_bar)
            self.last_processed = latest_bar['timestamp']
```

### Migration Checklist

- [ ] Replace `TimeframeAggregator` with `MinuteDataBuffer`
- [ ] Update timestamp handling to use "end" mode
- [ ] Add checks so only complete bars are processed
- [ ] Set appropriate buffer sizes
- [ ] Update error handling
- [ ] Test with historical data
- [ ] Verify no future data leakage

## Troubleshooting

### Issue: No bars returned
**Cause**: Not enough data for complete bars
**Solution**: Check data length against timeframe requirements

```python
timeframe = "15min"
timeframe_minutes = parse_timeframe_to_minutes(timeframe)  # 15
if len(minute_data) < timeframe_minutes:
    logger.warning(f"Need at least {timeframe_minutes} minutes for {timeframe} bars")
```

### Issue: Memory usage growing
**Cause**: Buffer size too large, or buffer not used at all
**Solution**: Optimize the buffer size

```python
# Calculate optimal buffer size
lookback_bars = 20
timeframe_minutes = parse_timeframe_to_minutes("15min")
optimal_size = lookback_bars * timeframe_minutes  # 300 minutes
buffer = MinuteDataBuffer(max_size=optimal_size)
```

### Issue: Signals generated too frequently
**Cause**: Processing incomplete bars
**Solution**: Only process complete bars

```python
# ✅ CORRECT: Only process new complete bars
if latest_bar and latest_bar['timestamp'] != self.last_processed:
    self.process_bar(latest_bar)
    self.last_processed = latest_bar['timestamp']

# ❌ WRONG: Processing every minute
self.process_bar(latest_bar)  # Processes the same bar multiple times
```

### Issue: Inconsistent results
**Cause**: Using "start" mode or a mismatched pandas comparison
**Solution**: Use "end" mode and the trading-standard comparison

```python
# ✅ CORRECT: Trading standard with end timestamps
bars = aggregate_minute_data_to_timeframe(data, "15min", "end")

# ❌ INCONSISTENT: Start mode can cause confusion
bars = aggregate_minute_data_to_timeframe(data, "15min", "start")
```

---

## Summary

The new timeframe aggregation system provides:

- **✅ Mathematical Correctness**: Matches pandas resampling exactly
- **✅ No Future Data Leakage**: Bar end timestamps prevent future data usage
- **✅ Trading Industry Standard**: Compatible with major trading platforms
- **✅ Memory Efficient**: Bounded buffer management
- **✅ Performance Optimized**: Fast real-time processing
- **✅ Easy to Use**: Simple, intuitive API

Use this guide to implement robust, efficient timeframe aggregation in your trading strategies!