""" Original vs Incremental Strategy Comparison Plot This script creates plots comparing: 1. Original DefaultStrategy (with bug) 2. Incremental IncMetaTrendStrategy Using full year data from 2022-01-01 to 2023-01-01 """ import pandas as pd import numpy as np import matplotlib.pyplot as plt import matplotlib.dates as mdates import seaborn as sns import logging from typing import Dict, List, Tuple import os import sys # Add project root to path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from cycles.strategies.default_strategy import DefaultStrategy from cycles.IncStrategies.metatrend_strategy import IncMetaTrendStrategy from cycles.utils.storage import Storage # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Set style for better plots plt.style.use('seaborn-v0_8') sns.set_palette("husl") class OriginalVsIncrementalPlotter: """Class to create comparison plots between original and incremental strategies.""" def __init__(self): """Initialize the plotter.""" self.storage = Storage(logging=logger) self.test_data = None self.original_signals = [] self.incremental_signals = [] self.original_meta_trend = None self.incremental_meta_trend = [] self.individual_trends = [] def load_and_prepare_data(self, start_date: str = "2023-01-01", end_date: str = "2024-01-01") -> pd.DataFrame: """Load test data for the specified date range.""" logger.info(f"Loading data from {start_date} to {end_date}") try: # Load data for the full year filename = "btcusd_1-min_data.csv" start_dt = pd.to_datetime(start_date) end_dt = pd.to_datetime(end_date) df = self.storage.load_data(filename, start_dt, end_dt) # Reset index to get timestamp as column df_with_timestamp = df.reset_index() self.test_data = df_with_timestamp logger.info(f"Loaded {len(df_with_timestamp)} data points") logger.info(f"Date range: {df_with_timestamp['timestamp'].min()} to {df_with_timestamp['timestamp'].max()}") return df_with_timestamp except Exception as e: logger.error(f"Failed to load test data: {e}") raise def run_original_strategy(self) -> Tuple[List[Dict], np.ndarray]: """Run original strategy and extract signals and meta-trend.""" logger.info("Running Original DefaultStrategy...") # Create indexed DataFrame for original strategy indexed_data = self.test_data.set_index('timestamp') # Limit to 200 points like original strategy does if len(indexed_data) > 200: original_data_used = indexed_data.tail(200) data_start_index = len(self.test_data) - 200 logger.info(f"Original strategy using last 200 points out of {len(indexed_data)} total") else: original_data_used = indexed_data data_start_index = 0 # Create mock backtester class MockBacktester: def __init__(self, df): self.original_df = df self.min1_df = df self.strategies = {} backtester = MockBacktester(original_data_used) # Initialize original strategy strategy = DefaultStrategy(weight=1.0, params={ "stop_loss_pct": 0.03, "timeframe": "1min" }) strategy.initialize(backtester) # Extract signals and meta-trend signals = [] meta_trend = strategy.meta_trend for i in range(len(original_data_used)): # Get entry signal entry_signal = strategy.get_entry_signal(backtester, i) if entry_signal.signal_type == "ENTRY": signals.append({ 'index': i, 'global_index': data_start_index + i, 'timestamp': original_data_used.index[i], 'close': original_data_used.iloc[i]['close'], 'signal_type': 'ENTRY', 'confidence': entry_signal.confidence, 'source': 'original' }) # Get exit signal exit_signal = strategy.get_exit_signal(backtester, i) if exit_signal.signal_type == "EXIT": signals.append({ 'index': i, 'global_index': data_start_index + i, 'timestamp': original_data_used.index[i], 'close': original_data_used.iloc[i]['close'], 'signal_type': 'EXIT', 'confidence': exit_signal.confidence, 'source': 'original' }) logger.info(f"Original strategy generated {len(signals)} signals") # Count signal types entry_count = len([s for s in signals if s['signal_type'] == 'ENTRY']) exit_count = len([s for s in signals if s['signal_type'] == 'EXIT']) logger.info(f"Original: {entry_count} entries, {exit_count} exits") return signals, meta_trend, data_start_index def run_incremental_strategy(self, data_start_index: int = 0) -> Tuple[List[Dict], List[int], List[List[int]]]: """Run incremental strategy and extract signals, meta-trend, and individual trends.""" logger.info("Running Incremental IncMetaTrendStrategy...") # Create strategy instance strategy = IncMetaTrendStrategy("metatrend", weight=1.0, params={ "timeframe": "1min", "enable_logging": False }) # Determine data range to match original strategy if len(self.test_data) > 200: test_data_subset = self.test_data.tail(200) logger.info(f"Incremental strategy using last 200 points out of {len(self.test_data)} total") else: test_data_subset = self.test_data # Process data incrementally and collect signals signals = [] meta_trends = [] individual_trends_list = [] for idx, (_, row) in enumerate(test_data_subset.iterrows()): ohlc = { 'open': row['open'], 'high': row['high'], 'low': row['low'], 'close': row['close'] } # Update strategy with new data point strategy.calculate_on_data(ohlc, row['timestamp']) # Get current meta-trend and individual trends current_meta_trend = strategy.get_current_meta_trend() meta_trends.append(current_meta_trend) # Get individual Supertrend states individual_states = strategy.get_individual_supertrend_states() if individual_states and len(individual_states) >= 3: individual_trends = [state.get('current_trend', 0) for state in individual_states] else: individual_trends = [0, 0, 0] # Default if not available individual_trends_list.append(individual_trends) # Check for entry signal entry_signal = strategy.get_entry_signal() if entry_signal.signal_type == "ENTRY": signals.append({ 'index': idx, 'global_index': data_start_index + idx, 'timestamp': row['timestamp'], 'close': row['close'], 'signal_type': 'ENTRY', 'confidence': entry_signal.confidence, 'source': 'incremental' }) # Check for exit signal exit_signal = strategy.get_exit_signal() if exit_signal.signal_type == "EXIT": signals.append({ 'index': idx, 'global_index': data_start_index + idx, 'timestamp': row['timestamp'], 'close': row['close'], 'signal_type': 'EXIT', 'confidence': exit_signal.confidence, 'source': 'incremental' }) logger.info(f"Incremental strategy generated {len(signals)} signals") # Count signal types entry_count = len([s for s in signals if s['signal_type'] == 'ENTRY']) exit_count = len([s for s in signals if s['signal_type'] == 'EXIT']) logger.info(f"Incremental: {entry_count} entries, {exit_count} exits") return signals, meta_trends, individual_trends_list def create_comparison_plot(self, save_path: str = "results/original_vs_incremental_plot.png"): """Create comparison plot between original and incremental strategies.""" logger.info("Creating original vs incremental comparison plot...") # Load and prepare data self.load_and_prepare_data(start_date="2023-01-01", end_date="2024-01-01") # Run both strategies self.original_signals, self.original_meta_trend, data_start_index = self.run_original_strategy() self.incremental_signals, self.incremental_meta_trend, self.individual_trends = self.run_incremental_strategy(data_start_index) # Prepare data for plotting (last 200 points to match strategies) if len(self.test_data) > 200: plot_data = self.test_data.tail(200).copy() else: plot_data = self.test_data.copy() plot_data['timestamp'] = pd.to_datetime(plot_data['timestamp']) # Create figure with subplots fig, axes = plt.subplots(3, 1, figsize=(16, 15)) fig.suptitle('Original vs Incremental MetaTrend Strategy Comparison\n(Data: 2022-01-01 to 2023-01-01)', fontsize=16, fontweight='bold') # Plot 1: Price with signals self._plot_price_with_signals(axes[0], plot_data) # Plot 2: Meta-trend comparison self._plot_meta_trends(axes[1], plot_data) # Plot 3: Signal timing comparison self._plot_signal_timing(axes[2], plot_data) # Adjust layout and save plt.tight_layout() os.makedirs("results", exist_ok=True) plt.savefig(save_path, dpi=300, bbox_inches='tight') logger.info(f"Plot saved to {save_path}") plt.show() def _plot_price_with_signals(self, ax, plot_data): """Plot price data with signals overlaid.""" ax.set_title('BTC Price with Trading Signals', fontsize=14, fontweight='bold') # Plot price ax.plot(plot_data['timestamp'], plot_data['close'], color='black', linewidth=1.5, label='BTC Price', alpha=0.9, zorder=1) # Calculate price range for offset calculation price_range = plot_data['close'].max() - plot_data['close'].min() offset_amount = price_range * 0.02 # 2% of price range for offset # Plot signals with enhanced styling and offsets signal_colors = { 'original': {'ENTRY': '#FF4444', 'EXIT': '#CC0000'}, # Bright red tones 'incremental': {'ENTRY': '#00AA00', 'EXIT': '#006600'} # Bright green tones } signal_markers = {'ENTRY': '^', 'EXIT': 'v'} signal_sizes = {'ENTRY': 150, 'EXIT': 120} # Plot original signals (offset downward) original_entry_plotted = False original_exit_plotted = False for signal in self.original_signals: if signal['index'] < len(plot_data): timestamp = plot_data.iloc[signal['index']]['timestamp'] # Offset original signals downward price = signal['close'] - offset_amount label = None if signal['signal_type'] == 'ENTRY' and not original_entry_plotted: label = "Original Entry (buggy)" original_entry_plotted = True elif signal['signal_type'] == 'EXIT' and not original_exit_plotted: label = "Original Exit (buggy)" original_exit_plotted = True ax.scatter(timestamp, price, c=signal_colors['original'][signal['signal_type']], marker=signal_markers[signal['signal_type']], s=signal_sizes[signal['signal_type']], alpha=0.8, edgecolors='white', linewidth=2, label=label, zorder=3) # Plot incremental signals (offset upward) inc_entry_plotted = False inc_exit_plotted = False for signal in self.incremental_signals: if signal['index'] < len(plot_data): timestamp = plot_data.iloc[signal['index']]['timestamp'] # Offset incremental signals upward price = signal['close'] + offset_amount label = None if signal['signal_type'] == 'ENTRY' and not inc_entry_plotted: label = "Incremental Entry (correct)" inc_entry_plotted = True elif signal['signal_type'] == 'EXIT' and not inc_exit_plotted: label = "Incremental Exit (correct)" inc_exit_plotted = True ax.scatter(timestamp, price, c=signal_colors['incremental'][signal['signal_type']], marker=signal_markers[signal['signal_type']], s=signal_sizes[signal['signal_type']], alpha=0.9, edgecolors='black', linewidth=1.5, label=label, zorder=4) # Add connecting lines to show actual price for offset signals for signal in self.original_signals: if signal['index'] < len(plot_data): timestamp = plot_data.iloc[signal['index']]['timestamp'] actual_price = signal['close'] offset_price = actual_price - offset_amount ax.plot([timestamp, timestamp], [actual_price, offset_price], color=signal_colors['original'][signal['signal_type']], alpha=0.3, linewidth=1, zorder=2) for signal in self.incremental_signals: if signal['index'] < len(plot_data): timestamp = plot_data.iloc[signal['index']]['timestamp'] actual_price = signal['close'] offset_price = actual_price + offset_amount ax.plot([timestamp, timestamp], [actual_price, offset_price], color=signal_colors['incremental'][signal['signal_type']], alpha=0.3, linewidth=1, zorder=2) ax.set_ylabel('Price (USD)') ax.legend(loc='upper left', fontsize=10, framealpha=0.9) ax.grid(True, alpha=0.3) # Format x-axis ax.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M')) ax.xaxis.set_major_locator(mdates.DayLocator(interval=1)) plt.setp(ax.xaxis.get_majorticklabels(), rotation=45) # Add text annotation explaining the offset ax.text(0.02, 0.02, 'Note: Original signals offset down, Incremental signals offset up for clarity', transform=ax.transAxes, fontsize=9, style='italic', bbox=dict(boxstyle='round,pad=0.3', facecolor='lightgray', alpha=0.7)) def _plot_meta_trends(self, ax, plot_data): """Plot meta-trend comparison.""" ax.set_title('Meta-Trend Comparison', fontsize=14, fontweight='bold') timestamps = plot_data['timestamp'] # Plot original meta-trend if self.original_meta_trend is not None: ax.plot(timestamps, self.original_meta_trend, color='red', linewidth=2, alpha=0.7, label='Original (with bug)', marker='o', markersize=2) # Plot incremental meta-trend if self.incremental_meta_trend: ax.plot(timestamps, self.incremental_meta_trend, color='green', linewidth=2, alpha=0.8, label='Incremental (correct)', marker='s', markersize=2) # Add horizontal lines for trend levels ax.axhline(y=1, color='lightgreen', linestyle='--', alpha=0.5, label='Uptrend (+1)') ax.axhline(y=0, color='gray', linestyle='-', alpha=0.5, label='Neutral (0)') ax.axhline(y=-1, color='lightcoral', linestyle='--', alpha=0.5, label='Downtrend (-1)') ax.set_ylabel('Meta-Trend Value') ax.set_ylim(-1.5, 1.5) ax.legend(loc='upper left', fontsize=10) ax.grid(True, alpha=0.3) # Format x-axis ax.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M')) ax.xaxis.set_major_locator(mdates.DayLocator(interval=1)) plt.setp(ax.xaxis.get_majorticklabels(), rotation=45) def _plot_signal_timing(self, ax, plot_data): """Plot signal timing comparison.""" ax.set_title('Signal Timing Comparison', fontsize=14, fontweight='bold') timestamps = plot_data['timestamp'] # Create signal arrays original_entry = np.zeros(len(timestamps)) original_exit = np.zeros(len(timestamps)) inc_entry = np.zeros(len(timestamps)) inc_exit = np.zeros(len(timestamps)) # Fill signal arrays for signal in self.original_signals: if signal['index'] < len(timestamps): if signal['signal_type'] == 'ENTRY': original_entry[signal['index']] = 1 else: original_exit[signal['index']] = -1 for signal in self.incremental_signals: if signal['index'] < len(timestamps): if signal['signal_type'] == 'ENTRY': inc_entry[signal['index']] = 1 else: inc_exit[signal['index']] = -1 # Plot signals as vertical lines and markers y_positions = [2, 1] labels = ['Original (with bug)', 'Incremental (correct)'] colors = ['red', 'green'] for i, (entry_signals, exit_signals, label, color) in enumerate(zip( [original_entry, inc_entry], [original_exit, inc_exit], labels, colors )): y_pos = y_positions[i] # Plot entry signals entry_indices = np.where(entry_signals == 1)[0] for idx in entry_indices: ax.axvline(x=timestamps.iloc[idx], ymin=(y_pos-0.3)/3, ymax=(y_pos+0.3)/3, color=color, linewidth=2, alpha=0.8) ax.scatter(timestamps.iloc[idx], y_pos, marker='^', s=60, color=color, alpha=0.8) # Plot exit signals exit_indices = np.where(exit_signals == -1)[0] for idx in exit_indices: ax.axvline(x=timestamps.iloc[idx], ymin=(y_pos-0.3)/3, ymax=(y_pos+0.3)/3, color=color, linewidth=2, alpha=0.8) ax.scatter(timestamps.iloc[idx], y_pos, marker='v', s=60, color=color, alpha=0.8) ax.set_yticks(y_positions) ax.set_yticklabels(labels) ax.set_ylabel('Strategy') ax.set_ylim(0.5, 2.5) ax.grid(True, alpha=0.3) # Format x-axis ax.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M')) ax.xaxis.set_major_locator(mdates.DayLocator(interval=1)) plt.setp(ax.xaxis.get_majorticklabels(), rotation=45) # Add legend from matplotlib.lines import Line2D legend_elements = [ Line2D([0], [0], marker='^', color='gray', linestyle='None', markersize=8, label='Entry Signal'), Line2D([0], [0], marker='v', color='gray', linestyle='None', markersize=8, label='Exit Signal') ] ax.legend(handles=legend_elements, loc='upper right', fontsize=10) # Add signal count text orig_entries = len([s for s in self.original_signals if s['signal_type'] == 'ENTRY']) orig_exits = len([s for s in self.original_signals if s['signal_type'] == 'EXIT']) inc_entries = len([s for s in self.incremental_signals if s['signal_type'] == 'ENTRY']) inc_exits = len([s for s in self.incremental_signals if s['signal_type'] == 'EXIT']) ax.text(0.02, 0.98, f'Original: {orig_entries} entries, {orig_exits} exits\nIncremental: {inc_entries} entries, {inc_exits} exits', transform=ax.transAxes, fontsize=10, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8)) def main(): """Create and display the original vs incremental comparison plot.""" plotter = OriginalVsIncrementalPlotter() plotter.create_comparison_plot() if __name__ == "__main__": main()