From 3e94387dcb72003823b5ac79e951c5095edca405 Mon Sep 17 00:00:00 2001 From: "Vasily.onl" Date: Mon, 26 May 2025 14:45:44 +0800 Subject: [PATCH] tested and updated supertrand indicators to give us the same result as in original strategy --- cycles/IncStrategies/indicators/supertrend.py | 19 +- test_metatrend_comparison.py | 796 ++++++++++++++++++ 2 files changed, 806 insertions(+), 9 deletions(-) create mode 100644 test_metatrend_comparison.py diff --git a/cycles/IncStrategies/indicators/supertrend.py b/cycles/IncStrategies/indicators/supertrend.py index 068b0d6..2ad8861 100644 --- a/cycles/IncStrategies/indicators/supertrend.py +++ b/cycles/IncStrategies/indicators/supertrend.py @@ -65,12 +65,12 @@ class SupertrendState(OHLCIndicatorState): # State variables self.previous_close = None - self.previous_trend = 1 # Start with uptrend assumption + self.previous_trend = None # Don't assume initial trend, let first calculation determine it self.final_upper_band = None self.final_lower_band = None # Current values - self.current_trend = 1 + self.current_trend = None self.current_supertrend = None self.is_initialized = True @@ -123,10 +123,11 @@ class SupertrendState(OHLCIndicatorState): # Determine trend if self.previous_close is None: - # First calculation - trend = 1 if close > final_lower_band else -1 + # First calculation - match original logic + # If close <= upper_band, trend is -1 (downtrend), else trend is 1 (uptrend) + trend = -1 if close <= basic_upper_band else 1 else: - # Trend logic + # Trend logic for subsequent calculations if self.previous_trend == 1 and close <= final_lower_band: trend = -1 elif self.previous_trend == -1 and close >= final_upper_band: @@ -174,10 +175,10 @@ class SupertrendState(OHLCIndicatorState): """Reset Supertrend state to initial conditions.""" self.atr_state.reset() self.previous_close = None - self.previous_trend = 1 + self.previous_trend = None self.final_upper_band = None self.final_lower_band = None - self.current_trend = 1 + self.current_trend = None self.current_supertrend = None self.values_received = 0 self._current_values = {} @@ -198,9 +199,9 @@ class SupertrendState(OHLCIndicatorState): Get current trend direction. Returns: - Current trend: +1 for uptrend, -1 for downtrend + Current trend: +1 for uptrend, -1 for downtrend, 0 if not initialized """ - return self.current_trend + return self.current_trend if self.current_trend is not None else 0 def get_current_supertrend_value(self) -> Optional[float]: """ diff --git a/test_metatrend_comparison.py b/test_metatrend_comparison.py new file mode 100644 index 0000000..ce87080 --- /dev/null +++ b/test_metatrend_comparison.py @@ -0,0 +1,796 @@ +""" +MetaTrend Strategy Comparison Test + +This test verifies that our incremental indicators produce identical results +to the original DefaultStrategy (metatrend strategy) implementation. + +The test compares: +1. Individual Supertrend indicators (3 different parameter sets) +2. Meta-trend calculation (agreement between all 3 Supertrends) +3. Entry/exit signal generation +4. Overall strategy behavior + +Test ensures our incremental implementation is mathematically equivalent +to the original batch calculation approach. +""" + +import pandas as pd +import numpy as np +import logging +from typing import Dict, List, Tuple +import os +import sys + +# Add project root to path +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from cycles.strategies.default_strategy import DefaultStrategy +from cycles.IncStrategies.indicators.supertrend import SupertrendState, SupertrendCollection +from cycles.Analysis.supertrend import Supertrends +from cycles.backtest import Backtest + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +class MetaTrendComparisonTest: + """ + Comprehensive test suite for comparing original and incremental MetaTrend implementations. + """ + + def __init__(self): + """Initialize the test suite.""" + self.test_data = None + self.original_results = None + self.incremental_results = None + + # Supertrend parameters from original implementation + self.supertrend_params = [ + {"period": 12, "multiplier": 3.0}, + {"period": 10, "multiplier": 1.0}, + {"period": 11, "multiplier": 2.0} + ] + + def load_test_data(self, symbol: str = "BTCUSDT", limit: int = 500) -> pd.DataFrame: + """ + Load test data for comparison. + + Args: + symbol: Trading symbol to load + limit: Number of data points to load + + Returns: + DataFrame with OHLCV data + """ + logger.info(f"Loading test data for {symbol} (limit: {limit})") + + try: + # Try to load from existing data files + data_file = f"data/{symbol}_1m.csv" + if os.path.exists(data_file): + df = pd.read_csv(data_file) + df['timestamp'] = pd.to_datetime(df['timestamp']) + df.set_index('timestamp', inplace=True) + df = df.tail(limit) + logger.info(f"Loaded {len(df)} data points from {data_file}") + else: + # Generate synthetic data for testing if no real data available + logger.warning(f"No data file found at {data_file}, generating synthetic data") + df = self._generate_synthetic_data(limit) + + # Ensure required columns + required_cols = ['open', 'high', 'low', 'close', 'volume'] + for col in required_cols: + if col not in df.columns: + if col == 'volume': + df['volume'] = 1000.0 # Default volume + else: + raise ValueError(f"Missing required column: {col}") + + # Reset index to get timestamp as column for incremental processing + df_with_timestamp = df.reset_index() + + self.test_data = df_with_timestamp + return df_with_timestamp + + except Exception as e: + logger.error(f"Failed to load test data: {e}") + # Fallback to synthetic data + df = self._generate_synthetic_data(limit) + df_with_timestamp = df.reset_index() + self.test_data = df_with_timestamp + return df_with_timestamp + + def _generate_synthetic_data(self, length: int) -> pd.DataFrame: + """Generate synthetic OHLCV data for testing.""" + logger.info(f"Generating {length} synthetic data points") + + np.random.seed(42) # For reproducible results + + # Generate price series with trend and noise + base_price = 50000.0 + trend = np.linspace(0, 0.1, length) # Slight upward trend + noise = np.random.normal(0, 0.02, length) # 2% volatility + + close_prices = base_price * (1 + trend + noise.cumsum() * 0.1) + + # Generate OHLC from close prices + data = [] + timestamps = pd.date_range(start='2024-01-01', periods=length, freq='1min') + + for i in range(length): + close = close_prices[i] + volatility = close * 0.01 # 1% intraday volatility + + high = close + np.random.uniform(0, volatility) + low = close - np.random.uniform(0, volatility) + open_price = low + np.random.uniform(0, high - low) + + # Ensure OHLC relationships + high = max(high, open_price, close) + low = min(low, open_price, close) + + data.append({ + 'timestamp': timestamps[i], + 'open': open_price, + 'high': high, + 'low': low, + 'close': close, + 'volume': np.random.uniform(100, 1000) + }) + + df = pd.DataFrame(data) + # Set timestamp as index for compatibility with original strategy + df.set_index('timestamp', inplace=True) + return df + + def test_original_strategy(self) -> Dict: + """ + Test the original DefaultStrategy implementation. + + Returns: + Dictionary with original strategy results + """ + logger.info("Testing original DefaultStrategy implementation...") + + try: + # Create indexed DataFrame for original strategy (needs DatetimeIndex) + indexed_data = self.test_data.set_index('timestamp') + + # The original strategy limits data to 200 points for performance + # We need to account for this in our comparison + if len(indexed_data) > 200: + original_data_used = indexed_data.tail(200) + logger.info(f"Original strategy will use last {len(original_data_used)} points of {len(indexed_data)} total points") + else: + original_data_used = indexed_data + + # Create a minimal backtest instance for strategy initialization + class MockBacktester: + def __init__(self, df): + self.original_df = df + self.min1_df = df + self.strategies = {} + + backtester = MockBacktester(original_data_used) + + # Initialize original strategy + strategy = DefaultStrategy(weight=1.0, params={ + "stop_loss_pct": 0.03, + "timeframe": "1min" # Use 1min since our test data is 1min + }) + + # Initialize strategy (this calculates meta-trend) + strategy.initialize(backtester) + + # Extract results + if hasattr(strategy, 'meta_trend') and strategy.meta_trend is not None: + meta_trend = strategy.meta_trend + trends = None # Individual trends not directly available from strategy + else: + # Fallback: calculate manually using original Supertrends class + logger.info("Strategy meta_trend not available, calculating manually...") + supertrends = Supertrends(original_data_used, verbose=False) + supertrend_results_list = supertrends.calculate_supertrend_indicators() + + # Extract trend arrays + trends = [st['results']['trend'] for st in supertrend_results_list] + trends_arr = np.stack(trends, axis=1) + + # Calculate meta-trend + meta_trend = np.where( + (trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), + trends_arr[:,0], + 0 + ) + + # Generate signals + entry_signals = [] + exit_signals = [] + + for i in range(1, len(meta_trend)): + # Entry signal: meta-trend changes from != 1 to == 1 + if meta_trend[i-1] != 1 and meta_trend[i] == 1: + entry_signals.append(i) + + # Exit signal: meta-trend changes to -1 + if meta_trend[i-1] != -1 and meta_trend[i] == -1: + exit_signals.append(i) + + self.original_results = { + 'meta_trend': meta_trend, + 'entry_signals': entry_signals, + 'exit_signals': exit_signals, + 'individual_trends': trends, + 'data_start_index': len(self.test_data) - len(original_data_used) # Track where original data starts + } + + logger.info(f"Original strategy: {len(entry_signals)} entry signals, {len(exit_signals)} exit signals") + logger.info(f"Meta-trend length: {len(meta_trend)}, unique values: {np.unique(meta_trend)}") + return self.original_results + + except Exception as e: + logger.error(f"Original strategy test failed: {e}") + import traceback + traceback.print_exc() + raise + + def test_incremental_indicators(self) -> Dict: + """ + Test the incremental indicators implementation. + + Returns: + Dictionary with incremental results + """ + logger.info("Testing incremental indicators implementation...") + + try: + # Create SupertrendCollection with same parameters as original + supertrend_configs = [ + (params["period"], params["multiplier"]) + for params in self.supertrend_params + ] + + collection = SupertrendCollection(supertrend_configs) + + # Determine data range to match original strategy + data_start_index = self.original_results.get('data_start_index', 0) + test_data_subset = self.test_data.iloc[data_start_index:] + + logger.info(f"Processing incremental indicators on {len(test_data_subset)} points (starting from index {data_start_index})") + + # Process data incrementally + meta_trends = [] + individual_trends_list = [] + + for _, row in test_data_subset.iterrows(): + ohlc = { + 'open': row['open'], + 'high': row['high'], + 'low': row['low'], + 'close': row['close'] + } + + result = collection.update(ohlc) + meta_trends.append(result['meta_trend']) + individual_trends_list.append(result['trends']) + + meta_trend = np.array(meta_trends) + individual_trends = np.array(individual_trends_list) + + # Generate signals + entry_signals = [] + exit_signals = [] + + for i in range(1, len(meta_trend)): + # Entry signal: meta-trend changes from != 1 to == 1 + if meta_trend[i-1] != 1 and meta_trend[i] == 1: + entry_signals.append(i) + + # Exit signal: meta-trend changes to -1 + if meta_trend[i-1] != -1 and meta_trend[i] == -1: + exit_signals.append(i) + + self.incremental_results = { + 'meta_trend': meta_trend, + 'entry_signals': entry_signals, + 'exit_signals': exit_signals, + 'individual_trends': individual_trends + } + + logger.info(f"Incremental indicators: {len(entry_signals)} entry signals, {len(exit_signals)} exit signals") + return self.incremental_results + + except Exception as e: + logger.error(f"Incremental indicators test failed: {e}") + raise + + def compare_results(self) -> Dict[str, bool]: + """ + Compare original and incremental results. + + Returns: + Dictionary with comparison results + """ + logger.info("Comparing original vs incremental results...") + + if self.original_results is None or self.incremental_results is None: + raise ValueError("Must run both tests before comparison") + + comparison = {} + + # Compare meta-trend arrays + orig_meta = self.original_results['meta_trend'] + inc_meta = self.incremental_results['meta_trend'] + + # Handle length differences (original might be shorter due to initialization) + min_length = min(len(orig_meta), len(inc_meta)) + orig_meta_trimmed = orig_meta[-min_length:] + inc_meta_trimmed = inc_meta[-min_length:] + + meta_trend_match = np.array_equal(orig_meta_trimmed, inc_meta_trimmed) + comparison['meta_trend_match'] = meta_trend_match + + if not meta_trend_match: + # Find differences + diff_indices = np.where(orig_meta_trimmed != inc_meta_trimmed)[0] + logger.warning(f"Meta-trend differences at indices: {diff_indices[:10]}...") # Show first 10 + + # Show some examples + for i in diff_indices[:5]: + logger.warning(f"Index {i}: Original={orig_meta_trimmed[i]}, Incremental={inc_meta_trimmed[i]}") + + # Compare individual trends if available + if (self.original_results['individual_trends'] is not None and + self.incremental_results['individual_trends'] is not None): + + orig_trends = self.original_results['individual_trends'] + inc_trends = self.incremental_results['individual_trends'] + + # Trim to same length + orig_trends_trimmed = orig_trends[-min_length:] + inc_trends_trimmed = inc_trends[-min_length:] + + individual_trends_match = np.array_equal(orig_trends_trimmed, inc_trends_trimmed) + comparison['individual_trends_match'] = individual_trends_match + + if not individual_trends_match: + logger.warning("Individual trends do not match") + # Check each Supertrend separately + for st_idx in range(3): + st_match = np.array_equal(orig_trends_trimmed[:, st_idx], inc_trends_trimmed[:, st_idx]) + comparison[f'supertrend_{st_idx}_match'] = st_match + if not st_match: + diff_indices = np.where(orig_trends_trimmed[:, st_idx] != inc_trends_trimmed[:, st_idx])[0] + logger.warning(f"Supertrend {st_idx} differences at indices: {diff_indices[:5]}...") + + # Compare signals + orig_entry = set(self.original_results['entry_signals']) + inc_entry = set(self.incremental_results['entry_signals']) + entry_signals_match = orig_entry == inc_entry + comparison['entry_signals_match'] = entry_signals_match + + if not entry_signals_match: + logger.warning(f"Entry signals differ: Original={orig_entry}, Incremental={inc_entry}") + + orig_exit = set(self.original_results['exit_signals']) + inc_exit = set(self.incremental_results['exit_signals']) + exit_signals_match = orig_exit == inc_exit + comparison['exit_signals_match'] = exit_signals_match + + if not exit_signals_match: + logger.warning(f"Exit signals differ: Original={orig_exit}, Incremental={inc_exit}") + + # Overall match + comparison['overall_match'] = all([ + meta_trend_match, + entry_signals_match, + exit_signals_match + ]) + + return comparison + + def save_detailed_comparison(self, filename: str = "metatrend_comparison.csv"): + """Save detailed comparison data to CSV for analysis.""" + if self.original_results is None or self.incremental_results is None: + logger.warning("No results to save") + return + + # Prepare comparison DataFrame + orig_meta = self.original_results['meta_trend'] + inc_meta = self.incremental_results['meta_trend'] + + min_length = min(len(orig_meta), len(inc_meta)) + + # Get the correct data range for timestamps and prices + data_start_index = self.original_results.get('data_start_index', 0) + comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length] + + comparison_df = pd.DataFrame({ + 'timestamp': comparison_data['timestamp'].values, + 'close': comparison_data['close'].values, + 'original_meta_trend': orig_meta[:min_length], + 'incremental_meta_trend': inc_meta[:min_length], + 'meta_trend_match': orig_meta[:min_length] == inc_meta[:min_length] + }) + + # Add individual trends if available + if (self.original_results['individual_trends'] is not None and + self.incremental_results['individual_trends'] is not None): + + orig_trends = self.original_results['individual_trends'][:min_length] + inc_trends = self.incremental_results['individual_trends'][:min_length] + + for i in range(3): + comparison_df[f'original_st{i}_trend'] = orig_trends[:, i] + comparison_df[f'incremental_st{i}_trend'] = inc_trends[:, i] + comparison_df[f'st{i}_trend_match'] = orig_trends[:, i] == inc_trends[:, i] + + # Save to results directory + os.makedirs("results", exist_ok=True) + filepath = os.path.join("results", filename) + comparison_df.to_csv(filepath, index=False) + logger.info(f"Detailed comparison saved to {filepath}") + + def save_trend_changes_analysis(self, filename_prefix: str = "trend_changes"): + """Save detailed trend changes analysis for manual comparison.""" + if self.original_results is None or self.incremental_results is None: + logger.warning("No results to save") + return + + # Get the correct data range + data_start_index = self.original_results.get('data_start_index', 0) + orig_meta = self.original_results['meta_trend'] + inc_meta = self.incremental_results['meta_trend'] + min_length = min(len(orig_meta), len(inc_meta)) + comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length] + + # Analyze original trend changes + original_changes = [] + for i in range(1, len(orig_meta)): + if orig_meta[i] != orig_meta[i-1]: + original_changes.append({ + 'index': i, + 'timestamp': comparison_data.iloc[i]['timestamp'], + 'close_price': comparison_data.iloc[i]['close'], + 'prev_trend': orig_meta[i-1], + 'new_trend': orig_meta[i], + 'change_type': self._get_change_type(orig_meta[i-1], orig_meta[i]) + }) + + # Analyze incremental trend changes + incremental_changes = [] + for i in range(1, len(inc_meta)): + if inc_meta[i] != inc_meta[i-1]: + incremental_changes.append({ + 'index': i, + 'timestamp': comparison_data.iloc[i]['timestamp'], + 'close_price': comparison_data.iloc[i]['close'], + 'prev_trend': inc_meta[i-1], + 'new_trend': inc_meta[i], + 'change_type': self._get_change_type(inc_meta[i-1], inc_meta[i]) + }) + + # Save original trend changes + os.makedirs("results", exist_ok=True) + original_df = pd.DataFrame(original_changes) + original_file = os.path.join("results", f"{filename_prefix}_original.csv") + original_df.to_csv(original_file, index=False) + logger.info(f"Original trend changes saved to {original_file} ({len(original_changes)} changes)") + + # Save incremental trend changes + incremental_df = pd.DataFrame(incremental_changes) + incremental_file = os.path.join("results", f"{filename_prefix}_incremental.csv") + incremental_df.to_csv(incremental_file, index=False) + logger.info(f"Incremental trend changes saved to {incremental_file} ({len(incremental_changes)} changes)") + + # Create side-by-side comparison + comparison_changes = [] + max_changes = max(len(original_changes), len(incremental_changes)) + + for i in range(max_changes): + orig_change = original_changes[i] if i < len(original_changes) else {} + inc_change = incremental_changes[i] if i < len(incremental_changes) else {} + + comparison_changes.append({ + 'change_num': i + 1, + 'orig_index': orig_change.get('index', ''), + 'orig_timestamp': orig_change.get('timestamp', ''), + 'orig_close': orig_change.get('close_price', ''), + 'orig_prev_trend': orig_change.get('prev_trend', ''), + 'orig_new_trend': orig_change.get('new_trend', ''), + 'orig_change_type': orig_change.get('change_type', ''), + 'inc_index': inc_change.get('index', ''), + 'inc_timestamp': inc_change.get('timestamp', ''), + 'inc_close': inc_change.get('close_price', ''), + 'inc_prev_trend': inc_change.get('prev_trend', ''), + 'inc_new_trend': inc_change.get('new_trend', ''), + 'inc_change_type': inc_change.get('change_type', ''), + 'match': (orig_change.get('index') == inc_change.get('index') and + orig_change.get('new_trend') == inc_change.get('new_trend')) if orig_change and inc_change else False + }) + + comparison_df = pd.DataFrame(comparison_changes) + comparison_file = os.path.join("results", f"{filename_prefix}_comparison.csv") + comparison_df.to_csv(comparison_file, index=False) + logger.info(f"Side-by-side comparison saved to {comparison_file}") + + # Create summary statistics + summary = { + 'original_total_changes': len(original_changes), + 'incremental_total_changes': len(incremental_changes), + 'original_entry_signals': len([c for c in original_changes if c['change_type'] == 'ENTRY']), + 'incremental_entry_signals': len([c for c in incremental_changes if c['change_type'] == 'ENTRY']), + 'original_exit_signals': len([c for c in original_changes if c['change_type'] == 'EXIT']), + 'incremental_exit_signals': len([c for c in incremental_changes if c['change_type'] == 'EXIT']), + 'original_to_neutral': len([c for c in original_changes if c['new_trend'] == 0]), + 'incremental_to_neutral': len([c for c in incremental_changes if c['new_trend'] == 0]), + 'matching_changes': len([c for c in comparison_changes if c['match']]), + 'total_comparison_points': max_changes + } + + summary_file = os.path.join("results", f"{filename_prefix}_summary.json") + import json + with open(summary_file, 'w') as f: + json.dump(summary, f, indent=2) + logger.info(f"Summary statistics saved to {summary_file}") + + return { + 'original_changes': original_changes, + 'incremental_changes': incremental_changes, + 'summary': summary + } + + def _get_change_type(self, prev_trend: float, new_trend: float) -> str: + """Classify the type of trend change.""" + if prev_trend != 1 and new_trend == 1: + return 'ENTRY' + elif prev_trend != -1 and new_trend == -1: + return 'EXIT' + elif new_trend == 0: + return 'TO_NEUTRAL' + elif prev_trend == 0 and new_trend != 0: + return 'FROM_NEUTRAL' + else: + return 'OTHER' + + def save_individual_supertrend_analysis(self, filename_prefix: str = "supertrend_individual"): + """Save detailed analysis of individual Supertrend indicators.""" + if (self.original_results is None or self.incremental_results is None or + self.original_results['individual_trends'] is None or + self.incremental_results['individual_trends'] is None): + logger.warning("Individual trends data not available") + return + + data_start_index = self.original_results.get('data_start_index', 0) + orig_trends = self.original_results['individual_trends'] + inc_trends = self.incremental_results['individual_trends'] + min_length = min(len(orig_trends), len(inc_trends)) + comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length] + + # Analyze each Supertrend indicator separately + for st_idx in range(3): + st_params = self.supertrend_params[st_idx] + st_name = f"ST{st_idx}_P{st_params['period']}_M{st_params['multiplier']}" + + # Original Supertrend changes + orig_st_changes = [] + for i in range(1, len(orig_trends)): + if orig_trends[i, st_idx] != orig_trends[i-1, st_idx]: + orig_st_changes.append({ + 'index': i, + 'timestamp': comparison_data.iloc[i]['timestamp'], + 'close_price': comparison_data.iloc[i]['close'], + 'prev_trend': orig_trends[i-1, st_idx], + 'new_trend': orig_trends[i, st_idx], + 'change_type': 'UP' if orig_trends[i, st_idx] == 1 else 'DOWN' + }) + + # Incremental Supertrend changes + inc_st_changes = [] + for i in range(1, len(inc_trends)): + if inc_trends[i, st_idx] != inc_trends[i-1, st_idx]: + inc_st_changes.append({ + 'index': i, + 'timestamp': comparison_data.iloc[i]['timestamp'], + 'close_price': comparison_data.iloc[i]['close'], + 'prev_trend': inc_trends[i-1, st_idx], + 'new_trend': inc_trends[i, st_idx], + 'change_type': 'UP' if inc_trends[i, st_idx] == 1 else 'DOWN' + }) + + # Save individual Supertrend analysis + os.makedirs("results", exist_ok=True) + + # Original + orig_df = pd.DataFrame(orig_st_changes) + orig_file = os.path.join("results", f"{filename_prefix}_{st_name}_original.csv") + orig_df.to_csv(orig_file, index=False) + + # Incremental + inc_df = pd.DataFrame(inc_st_changes) + inc_file = os.path.join("results", f"{filename_prefix}_{st_name}_incremental.csv") + inc_df.to_csv(inc_file, index=False) + + logger.info(f"Supertrend {st_idx} analysis: Original={len(orig_st_changes)} changes, Incremental={len(inc_st_changes)} changes") + + def save_full_timeline_data(self, filename: str = "full_timeline_comparison.csv"): + """Save complete timeline data with all values for manual analysis.""" + if self.original_results is None or self.incremental_results is None: + logger.warning("No results to save") + return + + data_start_index = self.original_results.get('data_start_index', 0) + orig_meta = self.original_results['meta_trend'] + inc_meta = self.incremental_results['meta_trend'] + min_length = min(len(orig_meta), len(inc_meta)) + comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length] + + # Create comprehensive timeline + timeline_data = [] + for i in range(min_length): + row_data = { + 'index': i, + 'timestamp': comparison_data.iloc[i]['timestamp'], + 'open': comparison_data.iloc[i]['open'], + 'high': comparison_data.iloc[i]['high'], + 'low': comparison_data.iloc[i]['low'], + 'close': comparison_data.iloc[i]['close'], + 'original_meta_trend': orig_meta[i], + 'incremental_meta_trend': inc_meta[i], + 'meta_trend_match': orig_meta[i] == inc_meta[i], + 'meta_trend_diff': abs(orig_meta[i] - inc_meta[i]) + } + + # Add individual Supertrend data if available + if (self.original_results['individual_trends'] is not None and + self.incremental_results['individual_trends'] is not None): + + orig_trends = self.original_results['individual_trends'] + inc_trends = self.incremental_results['individual_trends'] + + for st_idx in range(3): + st_params = self.supertrend_params[st_idx] + prefix = f"ST{st_idx}_P{st_params['period']}_M{st_params['multiplier']}" + + row_data[f'{prefix}_orig'] = orig_trends[i, st_idx] + row_data[f'{prefix}_inc'] = inc_trends[i, st_idx] + row_data[f'{prefix}_match'] = orig_trends[i, st_idx] == inc_trends[i, st_idx] + + # Mark trend changes + if i > 0: + row_data['orig_meta_changed'] = orig_meta[i] != orig_meta[i-1] + row_data['inc_meta_changed'] = inc_meta[i] != inc_meta[i-1] + row_data['orig_change_type'] = self._get_change_type(orig_meta[i-1], orig_meta[i]) if orig_meta[i] != orig_meta[i-1] else '' + row_data['inc_change_type'] = self._get_change_type(inc_meta[i-1], inc_meta[i]) if inc_meta[i] != inc_meta[i-1] else '' + else: + row_data['orig_meta_changed'] = False + row_data['inc_meta_changed'] = False + row_data['orig_change_type'] = '' + row_data['inc_change_type'] = '' + + timeline_data.append(row_data) + + # Save timeline data + os.makedirs("results", exist_ok=True) + timeline_df = pd.DataFrame(timeline_data) + filepath = os.path.join("results", filename) + timeline_df.to_csv(filepath, index=False) + logger.info(f"Full timeline comparison saved to {filepath} ({len(timeline_data)} rows)") + + return timeline_df + + def run_full_test(self, symbol: str = "BTCUSDT", limit: int = 500) -> bool: + """ + Run the complete comparison test. + + Args: + symbol: Trading symbol to test + limit: Number of data points to test + + Returns: + True if all tests pass, False otherwise + """ + logger.info("=" * 60) + logger.info("STARTING METATREND STRATEGY COMPARISON TEST") + logger.info("=" * 60) + + try: + # Load test data + self.load_test_data(symbol, limit) + logger.info(f"Test data loaded: {len(self.test_data)} points") + + # Test original strategy + logger.info("\n" + "-" * 40) + logger.info("TESTING ORIGINAL STRATEGY") + logger.info("-" * 40) + self.test_original_strategy() + + # Test incremental indicators + logger.info("\n" + "-" * 40) + logger.info("TESTING INCREMENTAL INDICATORS") + logger.info("-" * 40) + self.test_incremental_indicators() + + # Compare results + logger.info("\n" + "-" * 40) + logger.info("COMPARING RESULTS") + logger.info("-" * 40) + comparison = self.compare_results() + + # Save detailed comparison + self.save_detailed_comparison() + + # Save trend changes analysis + self.save_trend_changes_analysis() + + # Save individual supertrend analysis + self.save_individual_supertrend_analysis() + + # Save full timeline data + self.save_full_timeline_data() + + # Print results + logger.info("\n" + "=" * 60) + logger.info("COMPARISON RESULTS") + logger.info("=" * 60) + + for key, value in comparison.items(): + status = "āœ… PASS" if value else "āŒ FAIL" + logger.info(f"{key}: {status}") + + overall_pass = comparison.get('overall_match', False) + + if overall_pass: + logger.info("\nšŸŽ‰ ALL TESTS PASSED! Incremental indicators match original strategy.") + else: + logger.error("\nāŒ TESTS FAILED! Incremental indicators do not match original strategy.") + + return overall_pass + + except Exception as e: + logger.error(f"Test failed with error: {e}") + import traceback + traceback.print_exc() + return False + + +def main(): + """Run the MetaTrend comparison test.""" + test = MetaTrendComparisonTest() + + # Run test with different data sizes + test_cases = [ + ("BTCUSDT", 200), # Small test + ("BTCUSDT", 500), # Medium test + ("BTCUSDT", 1000), # Large test + ] + + all_passed = True + + for symbol, limit in test_cases: + logger.info(f"\n{'='*80}") + logger.info(f"RUNNING TEST CASE: {symbol} with {limit} data points") + logger.info(f"{'='*80}") + + passed = test.run_full_test(symbol, limit) + all_passed = all_passed and passed + + if not passed: + logger.error(f"Test case {symbol}:{limit} FAILED") + break + else: + logger.info(f"Test case {symbol}:{limit} PASSED") + + if all_passed: + logger.info("\nšŸŽ‰ ALL TEST CASES PASSED!") + else: + logger.error("\nāŒ SOME TEST CASES FAILED!") + + return all_passed + + +if __name__ == "__main__": + success = main() + sys.exit(0 if success else 1) \ No newline at end of file