Cycles/test/test_bbrs_incremental.py
Vasily.onl bd6a0f05d7 Implement Incremental BBRS Strategy for Real-time Data Processing
- Introduced `BBRSIncrementalState` for real-time processing of the Bollinger Bands + RSI strategy, allowing minute-level data input and internal timeframe aggregation.
- Added `TimeframeAggregator` class to handle real-time data aggregation to higher timeframes (15min, 1h, etc.).
- Updated `README_BBRS.md` to document the new incremental strategy, including key features and usage examples.
- Created comprehensive tests to validate the incremental strategy against the original implementation, ensuring signal accuracy and performance consistency.
- Enhanced error handling and logging for better monitoring during real-time processing.
- Updated `TODO.md` to reflect the completion of the incremental BBRS strategy implementation.
2025-05-26 16:46:04 +08:00

289 lines
12 KiB
Python

"""
Test Incremental BBRS Strategy vs Original Implementation
This script validates that the incremental BBRS strategy produces
equivalent results to the original batch implementation.
"""
import pandas as pd
import numpy as np
import logging
from datetime import datetime
import matplotlib.pyplot as plt
# Import original implementation
from cycles.Analysis.bb_rsi import BollingerBandsStrategy
# Import incremental implementation
from cycles.IncStrategies.bbrs_incremental import BBRSIncrementalState
# Import storage utility
from cycles.utils.storage import Storage
# Import aggregation function to match original behavior
from cycles.utils.data_utils import aggregate_to_minutes
# Setup logging: INFO-and-above records are written both to a log file in the
# working directory and to a stream handler (standard error by default).
logging.basicConfig(
    handlers=[
        logging.FileHandler("test_bbrs_incremental.log"),
        logging.StreamHandler(),
    ],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
def load_test_data(start_date="2023-01-01", end_date="2023-01-07",
                   filename="btcusd_1-min_data.csv"):
    """Load the price data used by the comparison test.

    The defaults reproduce the previously hard-coded window: one week of
    January 2023, kept short so the test runs quickly.

    Args:
        start_date: Start of the window, forwarded to ``Storage.load_data``.
        end_date: End of the window, forwarded to ``Storage.load_data``.
        filename: Data file name forwarded to ``Storage.load_data``
            (the default name suggests 1-minute BTC/USD bars).

    Returns:
        The object returned by ``Storage.load_data`` (used by callers as a
        DataFrame), or ``None`` when nothing was loaded.
    """
    storage = Storage(logging=logging)
    data = storage.load_data(filename, start_date, end_date)

    # Treat a missing result like an empty one so callers only ever have to
    # check for None.
    if data is None or data.empty:
        logging.error("No data loaded for testing period")
        return None

    logging.info(f"Loaded {len(data)} rows of data from {data.index[0]} to {data.index[-1]}")
    return data
def _log_signal_agreement(label, original_signals, incremental_signals):
    """Log the match rate and per-side totals of two aligned signal columns.

    Args:
        label: Prefix for the log lines (e.g. ``"Buy"`` or ``"Sell"``).
        original_signals: Signal column from the original strategy output.
        incremental_signals: Signal column from the incremental output,
            sharing the same index as ``original_signals``.
    """
    match_rate = (original_signals == incremental_signals).mean()
    logging.info(f"{label} signal match rate: {match_rate:.4f} ({match_rate*100:.2f}%)")
    logging.info(
        f"{label} signals - Original: {original_signals.sum()}, "
        f"Incremental: {incremental_signals.sum()}"
    )


def _log_indicator_difference(label, original_values, incremental_values):
    """Log max/mean absolute difference of two aligned indicator columns.

    Rows where either side is NaN are ignored; nothing is logged when no
    row has a value on both sides.

    Args:
        label: Prefix for the log line (the original column name).
        original_values: Indicator column from the original strategy output.
        incremental_values: Indicator column from the incremental output,
            sharing the same index as ``original_values``.
    """
    valid = ~(original_values.isna() | incremental_values.isna())
    if not valid.any():
        return
    diff = np.abs(original_values[valid] - incremental_values[valid])
    logging.info(f"{label} comparison - Max diff: {diff.max():.6f}, Mean diff: {diff.mean():.6f}")


def test_bbrs_strategy_comparison():
    """Compare the incremental BBRS strategy with the original implementation.

    Runs ``BollingerBandsStrategy`` on the raw loaded data and feeds
    ``BBRSIncrementalState`` one pre-aggregated bar at a time. For the
    timestamps both outputs share after a warm-up period it logs signal
    match rates and indicator differences, then plots the aligned results.

    Nothing is asserted; the outcome is reported through logging and a plot.
    The function returns early (after logging) when no data could be loaded,
    when either output is not longer than the warm-up period, or when the
    two outputs share no timestamps after warm-up.
    """
    data = load_test_data()
    if data is None:
        return

    # The original strategy gets its own copy of the full raw data set.
    test_data = data.copy()
    logging.info(f"Using {len(test_data)} rows for testing")

    # Pre-aggregate the bars fed to the incremental strategy. The interval is
    # 15 minutes (not hourly, as earlier naming suggested).
    # NOTE(review): presumably this matches the timeframe the original
    # strategy aggregates to internally - confirm.
    aggregation_minutes = 15
    aggregated_data = aggregate_to_minutes(data, aggregation_minutes)
    logging.info(
        f"Aggregated to {len(aggregated_data)} {aggregation_minutes}-minute data points"
    )

    # Configuration
    config = {
        "bb_width": 0.05,
        "bb_period": 20,
        "rsi_period": 14,
        "trending": {
            "rsi_threshold": [30, 70],
            "bb_std_dev_multiplier": 2.5,
        },
        "sideways": {
            "rsi_threshold": [40, 60],
            "bb_std_dev_multiplier": 1.8,
        },
        "strategy_name": "MarketRegimeStrategy",
        "SqueezeStrategy": True
    }

    logging.info("Testing original BBRS implementation...")
    # Original implementation (per the original note, it aggregates internally)
    original_strategy = BollingerBandsStrategy(config=config, logging=logging)
    original_result = original_strategy.run(test_data.copy(), "MarketRegimeStrategy")

    logging.info("Testing incremental BBRS implementation...")
    # Incremental implementation (fed the pre-aggregated bars one at a time)
    incremental_strategy = BBRSIncrementalState(config)
    incremental_results = []
    total_bars = len(aggregated_data)
    for i, (timestamp, row) in enumerate(aggregated_data.iterrows()):
        ohlcv_data = {
            'open': row['open'],
            'high': row['high'],
            'low': row['low'],
            'close': row['close'],
            'volume': row['volume']
        }
        result = incremental_strategy.update(ohlcv_data)
        result['timestamp'] = timestamp
        incremental_results.append(result)
        if i % 50 == 0:  # Progress log every 50 bars
            logging.info(f"Processed {i+1}/{total_bars} aggregated data points")

    # Convert incremental results to a DataFrame indexed by timestamp
    incremental_df = pd.DataFrame(incremental_results).set_index('timestamp')

    logging.info("Comparing results...")
    # Skip the warm-up period; the extra 20 is the volume MA period
    # (per the original comment).
    warmup_period = max(config["bb_period"], config["rsi_period"]) + 20
    if len(original_result) <= warmup_period or len(incremental_df) <= warmup_period:
        logging.warning("Not enough data after warm-up period for comparison")
        return

    orig_warmed = original_result.iloc[warmup_period:]
    inc_warmed = incremental_df.iloc[warmup_period:]

    # Align both outputs on the timestamps they have in common
    common_index = orig_warmed.index.intersection(inc_warmed.index)
    if len(common_index) == 0:
        # Without this guard the match rates would be NaN and the plot empty.
        logging.warning(
            "No overlapping timestamps between original and incremental results after warm-up"
        )
        return
    orig_aligned = orig_warmed.loc[common_index]
    inc_aligned = inc_warmed.loc[common_index]
    logging.info(f"Comparing {len(common_index)} aligned data points after warm-up")

    # Compare signals (only for column pairs present on both sides)
    signal_columns = [
        ('Buy', 'BuySignal', 'buy_signal'),
        ('Sell', 'SellSignal', 'sell_signal'),
    ]
    for label, orig_col, inc_col in signal_columns:
        if orig_col in orig_aligned.columns and inc_col in inc_aligned.columns:
            _log_signal_agreement(label, orig_aligned[orig_col], inc_aligned[inc_col])

    # Compare RSI and Bollinger Band values
    indicator_columns = [
        ('RSI', 'rsi'),
        ('UpperBand', 'upper_band'),
        ('LowerBand', 'lower_band'),
        ('SMA', 'middle_band'),
    ]
    for orig_col, inc_col in indicator_columns:
        if orig_col in orig_aligned.columns and inc_col in inc_aligned.columns:
            _log_indicator_difference(orig_col, orig_aligned[orig_col], inc_aligned[inc_col])

    # Plot comparison for visual inspection
    plot_comparison(orig_aligned, inc_aligned)
def plot_comparison(original_df, incremental_df, save_path="bbrs_strategy_comparison.png"):
    """Render a four-panel figure contrasting original and incremental output.

    Panels, top to bottom: price with Bollinger Bands, RSI, buy/sell signal
    markers, and the incremental strategy's market regime. Each series is
    drawn only when the columns it needs are present. The figure is saved to
    ``save_path`` and then shown.

    Args:
        original_df: Output of the original strategy (columns such as
            ``close``, ``UpperBand``, ``LowerBand``, ``SMA``, ``RSI``,
            ``BuySignal``, ``SellSignal`` are used when present).
        incremental_df: Output of the incremental strategy (columns such as
            ``upper_band``, ``lower_band``, ``middle_band``, ``rsi``,
            ``buy_signal``, ``sell_signal``, ``market_regime`` are used when
            present).
        save_path: File path the figure is written to.
    """
    # Limit the plot to the first 1000 points for visibility
    n_points = min(1000, len(original_df), len(incremental_df))
    orig_view = original_df.iloc[:n_points]
    inc_view = incremental_df.iloc[:n_points]
    xs = range(n_points)

    fig, (bands_ax, rsi_ax, signals_ax, regime_ax) = plt.subplots(4, 1, figsize=(15, 12))

    def marker_positions(flags):
        """Return the positional indices at which a signal value is truthy."""
        return [pos for pos, flag in enumerate(flags) if flag]

    # Panel 1: price and Bollinger Bands
    original_band_lines = [
        ('close', 'k-', 'Price'),
        ('UpperBand', 'b-', 'Original Upper BB'),
        ('SMA', 'g-', 'Original SMA'),
        ('LowerBand', 'r-', 'Original Lower BB'),
    ]
    if {'close', 'UpperBand', 'LowerBand', 'SMA'}.issubset(original_df.columns):
        for column, fmt, label in original_band_lines:
            bands_ax.plot(xs, orig_view[column], fmt, label=label, alpha=0.7)

    incremental_band_lines = [
        ('upper_band', 'b--', 'Incremental Upper BB'),
        ('middle_band', 'g--', 'Incremental SMA'),
        ('lower_band', 'r--', 'Incremental Lower BB'),
    ]
    if {'upper_band', 'lower_band', 'middle_band'}.issubset(incremental_df.columns):
        for column, fmt, label in incremental_band_lines:
            bands_ax.plot(xs, inc_view[column], fmt, label=label, alpha=0.7)

    bands_ax.set_title('Bollinger Bands Comparison')
    bands_ax.legend()
    bands_ax.grid(True)

    # Panel 2: RSI, with reference lines at 70 and 30
    if 'RSI' in original_df.columns and 'rsi' in incremental_df.columns:
        rsi_ax.plot(xs, orig_view['RSI'], 'b-', label='Original RSI', alpha=0.7)
        rsi_ax.plot(xs, inc_view['rsi'], 'r--', label='Incremental RSI', alpha=0.7)
        for level in (70, 30):
            rsi_ax.axhline(y=level, color='gray', linestyle=':', alpha=0.5)

    rsi_ax.set_title('RSI Comparison')
    rsi_ax.legend()
    rsi_ax.grid(True)

    # Panel 3: buy/sell signals as scatter markers; each series sits on its
    # own horizontal level so overlapping signals stay distinguishable.
    signal_groups = [
        ('BuySignal', 'buy_signal', '^',
         (1, 'green', 'Original Buy'), (0.8, 'blue', 'Incremental Buy')),
        ('SellSignal', 'sell_signal', 'v',
         (0.6, 'red', 'Original Sell'), (0.4, 'orange', 'Incremental Sell')),
    ]
    for orig_col, inc_col, marker, orig_style, inc_style in signal_groups:
        if orig_col not in original_df.columns or inc_col not in incremental_df.columns:
            continue
        for flags, (level, color, label) in (
            (orig_view[orig_col], orig_style),
            (inc_view[inc_col], inc_style),
        ):
            hits = marker_positions(flags)
            signals_ax.scatter(hits, [level] * len(hits), color=color, marker=marker,
                               label=label, alpha=0.7, s=30)

    signals_ax.set_title('Trading Signals Comparison')
    signals_ax.legend()
    signals_ax.grid(True)
    signals_ax.set_ylim(0, 1.2)

    # Panel 4: market regime, encoded as 1 for 'sideways' and 0 otherwise
    if 'market_regime' in incremental_df.columns:
        regime_codes = [int(regime == 'sideways') for regime in inc_view['market_regime']]
        regime_ax.plot(xs, regime_codes, 'purple',
                       label='Market Regime (1=Sideways, 0=Trending)', alpha=0.7)

    regime_ax.set_title('Market Regime Detection')
    regime_ax.legend()
    regime_ax.grid(True)
    regime_ax.set_xlabel('Time Index')

    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    logging.info(f"Comparison plot saved to {save_path}")
    plt.show()
def main():
    """Run the BBRS comparison and log its outcome.

    Raises:
        Exception: Any error raised by the comparison is logged (with its
            traceback) and then re-raised unchanged.
    """
    logging.info("Starting BBRS incremental strategy validation test")
    try:
        test_bbrs_strategy_comparison()
    except Exception as e:
        # logging.exception records the traceback as well, so the log file
        # keeps the failure context and not just the message.
        logging.exception(f"Test failed with error: {e}")
        raise
    else:
        logging.info("BBRS incremental strategy test completed successfully!")


# Run the validation only when executed as a script, not on import.
if __name__ == "__main__":
    main()