Cycles/test_metatrend_comparison.py
2025-05-26 14:55:03 +08:00

804 lines
35 KiB
Python

"""
MetaTrend Strategy Comparison Test
This test verifies that our incremental indicators produce identical results
to the original DefaultStrategy (metatrend strategy) implementation.
The test compares:
1. Individual Supertrend indicators (3 different parameter sets)
2. Meta-trend calculation (agreement between all 3 Supertrends)
3. Entry/exit signal generation
4. Overall strategy behavior
Test ensures our incremental implementation is mathematically equivalent
to the original batch calculation approach.
"""
import pandas as pd
import numpy as np
import logging
from typing import Dict, List, Tuple
import os
import sys
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cycles.strategies.default_strategy import DefaultStrategy
from cycles.IncStrategies.indicators.supertrend import SupertrendState, SupertrendCollection
from cycles.Analysis.supertrend import Supertrends
from cycles.backtest import Backtest
from cycles.utils.storage import Storage
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class MetaTrendComparisonTest:
"""
Comprehensive test suite for comparing original and incremental MetaTrend implementations.
"""
def __init__(self):
"""Initialize the test suite."""
self.test_data = None
self.original_results = None
self.incremental_results = None
self.storage = Storage(logging=logger)
# Supertrend parameters from original implementation
self.supertrend_params = [
{"period": 12, "multiplier": 3.0},
{"period": 10, "multiplier": 1.0},
{"period": 11, "multiplier": 2.0}
]
def load_test_data(self, symbol: str = "BTCUSD", start_date: str = "2022-01-01", end_date: str = "2023-01-01", limit: int = None) -> pd.DataFrame:
"""
Load test data for comparison using the Storage class.
Args:
symbol: Trading symbol to load (used for filename)
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
limit: Optional limit on number of data points (applied after date filtering)
Returns:
DataFrame with OHLCV data
"""
logger.info(f"Loading test data for {symbol} from {start_date} to {end_date}")
try:
# Use the Storage class to load data with date filtering
filename = "btcusd_1-min_data.csv"
# Convert date strings to pandas datetime
start_dt = pd.to_datetime(start_date)
end_dt = pd.to_datetime(end_date)
# Load data using Storage class
df = self.storage.load_data(filename, start_dt, end_dt)
if df.empty:
raise ValueError(f"No data found for the specified date range: {start_date} to {end_date}")
logger.info(f"Loaded {len(df)} data points from {start_date} to {end_date}")
logger.info(f"Date range in data: {df.index.min()} to {df.index.max()}")
# Apply limit if specified
if limit is not None and len(df) > limit:
df = df.tail(limit)
logger.info(f"Limited data to last {limit} points")
# Ensure required columns (Storage class should handle column name conversion)
required_cols = ['open', 'high', 'low', 'close', 'volume']
for col in required_cols:
if col not in df.columns:
if col == 'volume':
df['volume'] = 1000.0 # Default volume
else:
raise ValueError(f"Missing required column: {col}")
# Reset index to get timestamp as column for incremental processing
df_with_timestamp = df.reset_index()
self.test_data = df_with_timestamp
logger.info(f"Test data prepared: {len(df_with_timestamp)} rows")
logger.info(f"Columns: {list(df_with_timestamp.columns)}")
logger.info(f"Sample data:\n{df_with_timestamp.head()}")
return df_with_timestamp
except Exception as e:
logger.error(f"Failed to load test data: {e}")
import traceback
traceback.print_exc()
# Fallback to synthetic data if real data loading fails
logger.warning("Falling back to synthetic data generation")
df = self._generate_synthetic_data(limit or 1000)
df_with_timestamp = df.reset_index()
self.test_data = df_with_timestamp
return df_with_timestamp
def _generate_synthetic_data(self, length: int) -> pd.DataFrame:
"""Generate synthetic OHLCV data for testing."""
logger.info(f"Generating {length} synthetic data points")
np.random.seed(42) # For reproducible results
# Generate price series with trend and noise
base_price = 50000.0
trend = np.linspace(0, 0.1, length) # Slight upward trend
noise = np.random.normal(0, 0.02, length) # 2% volatility
close_prices = base_price * (1 + trend + noise.cumsum() * 0.1)
# Generate OHLC from close prices
data = []
timestamps = pd.date_range(start='2024-01-01', periods=length, freq='1min')
for i in range(length):
close = close_prices[i]
volatility = close * 0.01 # 1% intraday volatility
high = close + np.random.uniform(0, volatility)
low = close - np.random.uniform(0, volatility)
open_price = low + np.random.uniform(0, high - low)
# Ensure OHLC relationships
high = max(high, open_price, close)
low = min(low, open_price, close)
data.append({
'timestamp': timestamps[i],
'open': open_price,
'high': high,
'low': low,
'close': close,
'volume': np.random.uniform(100, 1000)
})
df = pd.DataFrame(data)
# Set timestamp as index for compatibility with original strategy
df.set_index('timestamp', inplace=True)
return df
def test_original_strategy(self) -> Dict:
"""
Test the original DefaultStrategy implementation.
Returns:
Dictionary with original strategy results
"""
logger.info("Testing original DefaultStrategy implementation...")
try:
# Create indexed DataFrame for original strategy (needs DatetimeIndex)
indexed_data = self.test_data.set_index('timestamp')
# The original strategy limits data to 200 points for performance
# We need to account for this in our comparison
if len(indexed_data) > 200:
original_data_used = indexed_data.tail(200)
logger.info(f"Original strategy will use last {len(original_data_used)} points of {len(indexed_data)} total points")
else:
original_data_used = indexed_data
# Create a minimal backtest instance for strategy initialization
class MockBacktester:
def __init__(self, df):
self.original_df = df
self.min1_df = df
self.strategies = {}
backtester = MockBacktester(original_data_used)
# Initialize original strategy
strategy = DefaultStrategy(weight=1.0, params={
"stop_loss_pct": 0.03,
"timeframe": "1min" # Use 1min since our test data is 1min
})
# Initialize strategy (this calculates meta-trend)
strategy.initialize(backtester)
# Extract results
if hasattr(strategy, 'meta_trend') and strategy.meta_trend is not None:
meta_trend = strategy.meta_trend
trends = None # Individual trends not directly available from strategy
else:
# Fallback: calculate manually using original Supertrends class
logger.info("Strategy meta_trend not available, calculating manually...")
supertrends = Supertrends(original_data_used, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
# Extract trend arrays
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
# Calculate meta-trend
meta_trend = np.where(
(trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0],
0
)
# Generate signals
entry_signals = []
exit_signals = []
for i in range(1, len(meta_trend)):
# Entry signal: meta-trend changes from != 1 to == 1
if meta_trend[i-1] != 1 and meta_trend[i] == 1:
entry_signals.append(i)
# Exit signal: meta-trend changes to -1
if meta_trend[i-1] != -1 and meta_trend[i] == -1:
exit_signals.append(i)
self.original_results = {
'meta_trend': meta_trend,
'entry_signals': entry_signals,
'exit_signals': exit_signals,
'individual_trends': trends,
'data_start_index': len(self.test_data) - len(original_data_used) # Track where original data starts
}
logger.info(f"Original strategy: {len(entry_signals)} entry signals, {len(exit_signals)} exit signals")
logger.info(f"Meta-trend length: {len(meta_trend)}, unique values: {np.unique(meta_trend)}")
return self.original_results
except Exception as e:
logger.error(f"Original strategy test failed: {e}")
import traceback
traceback.print_exc()
raise
def test_incremental_indicators(self) -> Dict:
"""
Test the incremental indicators implementation.
Returns:
Dictionary with incremental results
"""
logger.info("Testing incremental indicators implementation...")
try:
# Create SupertrendCollection with same parameters as original
supertrend_configs = [
(params["period"], params["multiplier"])
for params in self.supertrend_params
]
collection = SupertrendCollection(supertrend_configs)
# Determine data range to match original strategy
data_start_index = self.original_results.get('data_start_index', 0)
test_data_subset = self.test_data.iloc[data_start_index:]
logger.info(f"Processing incremental indicators on {len(test_data_subset)} points (starting from index {data_start_index})")
# Process data incrementally
meta_trends = []
individual_trends_list = []
for _, row in test_data_subset.iterrows():
ohlc = {
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close']
}
result = collection.update(ohlc)
meta_trends.append(result['meta_trend'])
individual_trends_list.append(result['trends'])
meta_trend = np.array(meta_trends)
individual_trends = np.array(individual_trends_list)
# Generate signals
entry_signals = []
exit_signals = []
for i in range(1, len(meta_trend)):
# Entry signal: meta-trend changes from != 1 to == 1
if meta_trend[i-1] != 1 and meta_trend[i] == 1:
entry_signals.append(i)
# Exit signal: meta-trend changes to -1
if meta_trend[i-1] != -1 and meta_trend[i] == -1:
exit_signals.append(i)
self.incremental_results = {
'meta_trend': meta_trend,
'entry_signals': entry_signals,
'exit_signals': exit_signals,
'individual_trends': individual_trends
}
logger.info(f"Incremental indicators: {len(entry_signals)} entry signals, {len(exit_signals)} exit signals")
return self.incremental_results
except Exception as e:
logger.error(f"Incremental indicators test failed: {e}")
raise
def compare_results(self) -> Dict[str, bool]:
"""
Compare original and incremental results.
Returns:
Dictionary with comparison results
"""
logger.info("Comparing original vs incremental results...")
if self.original_results is None or self.incremental_results is None:
raise ValueError("Must run both tests before comparison")
comparison = {}
# Compare meta-trend arrays
orig_meta = self.original_results['meta_trend']
inc_meta = self.incremental_results['meta_trend']
# Handle length differences (original might be shorter due to initialization)
min_length = min(len(orig_meta), len(inc_meta))
orig_meta_trimmed = orig_meta[-min_length:]
inc_meta_trimmed = inc_meta[-min_length:]
meta_trend_match = np.array_equal(orig_meta_trimmed, inc_meta_trimmed)
comparison['meta_trend_match'] = meta_trend_match
if not meta_trend_match:
# Find differences
diff_indices = np.where(orig_meta_trimmed != inc_meta_trimmed)[0]
logger.warning(f"Meta-trend differences at indices: {diff_indices[:10]}...") # Show first 10
# Show some examples
for i in diff_indices[:5]:
logger.warning(f"Index {i}: Original={orig_meta_trimmed[i]}, Incremental={inc_meta_trimmed[i]}")
# Compare individual trends if available
if (self.original_results['individual_trends'] is not None and
self.incremental_results['individual_trends'] is not None):
orig_trends = self.original_results['individual_trends']
inc_trends = self.incremental_results['individual_trends']
# Trim to same length
orig_trends_trimmed = orig_trends[-min_length:]
inc_trends_trimmed = inc_trends[-min_length:]
individual_trends_match = np.array_equal(orig_trends_trimmed, inc_trends_trimmed)
comparison['individual_trends_match'] = individual_trends_match
if not individual_trends_match:
logger.warning("Individual trends do not match")
# Check each Supertrend separately
for st_idx in range(3):
st_match = np.array_equal(orig_trends_trimmed[:, st_idx], inc_trends_trimmed[:, st_idx])
comparison[f'supertrend_{st_idx}_match'] = st_match
if not st_match:
diff_indices = np.where(orig_trends_trimmed[:, st_idx] != inc_trends_trimmed[:, st_idx])[0]
logger.warning(f"Supertrend {st_idx} differences at indices: {diff_indices[:5]}...")
# Compare signals
orig_entry = set(self.original_results['entry_signals'])
inc_entry = set(self.incremental_results['entry_signals'])
entry_signals_match = orig_entry == inc_entry
comparison['entry_signals_match'] = entry_signals_match
if not entry_signals_match:
logger.warning(f"Entry signals differ: Original={orig_entry}, Incremental={inc_entry}")
orig_exit = set(self.original_results['exit_signals'])
inc_exit = set(self.incremental_results['exit_signals'])
exit_signals_match = orig_exit == inc_exit
comparison['exit_signals_match'] = exit_signals_match
if not exit_signals_match:
logger.warning(f"Exit signals differ: Original={orig_exit}, Incremental={inc_exit}")
# Overall match
comparison['overall_match'] = all([
meta_trend_match,
entry_signals_match,
exit_signals_match
])
return comparison
def save_detailed_comparison(self, filename: str = "metatrend_comparison.csv"):
"""Save detailed comparison data to CSV for analysis."""
if self.original_results is None or self.incremental_results is None:
logger.warning("No results to save")
return
# Prepare comparison DataFrame
orig_meta = self.original_results['meta_trend']
inc_meta = self.incremental_results['meta_trend']
min_length = min(len(orig_meta), len(inc_meta))
# Get the correct data range for timestamps and prices
data_start_index = self.original_results.get('data_start_index', 0)
comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length]
comparison_df = pd.DataFrame({
'timestamp': comparison_data['timestamp'].values,
'close': comparison_data['close'].values,
'original_meta_trend': orig_meta[:min_length],
'incremental_meta_trend': inc_meta[:min_length],
'meta_trend_match': orig_meta[:min_length] == inc_meta[:min_length]
})
# Add individual trends if available
if (self.original_results['individual_trends'] is not None and
self.incremental_results['individual_trends'] is not None):
orig_trends = self.original_results['individual_trends'][:min_length]
inc_trends = self.incremental_results['individual_trends'][:min_length]
for i in range(3):
comparison_df[f'original_st{i}_trend'] = orig_trends[:, i]
comparison_df[f'incremental_st{i}_trend'] = inc_trends[:, i]
comparison_df[f'st{i}_trend_match'] = orig_trends[:, i] == inc_trends[:, i]
# Save to results directory
os.makedirs("results", exist_ok=True)
filepath = os.path.join("results", filename)
comparison_df.to_csv(filepath, index=False)
logger.info(f"Detailed comparison saved to {filepath}")
def save_trend_changes_analysis(self, filename_prefix: str = "trend_changes"):
"""Save detailed trend changes analysis for manual comparison."""
if self.original_results is None or self.incremental_results is None:
logger.warning("No results to save")
return
# Get the correct data range
data_start_index = self.original_results.get('data_start_index', 0)
orig_meta = self.original_results['meta_trend']
inc_meta = self.incremental_results['meta_trend']
min_length = min(len(orig_meta), len(inc_meta))
comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length]
# Analyze original trend changes
original_changes = []
for i in range(1, len(orig_meta)):
if orig_meta[i] != orig_meta[i-1]:
original_changes.append({
'index': i,
'timestamp': comparison_data.iloc[i]['timestamp'],
'close_price': comparison_data.iloc[i]['close'],
'prev_trend': orig_meta[i-1],
'new_trend': orig_meta[i],
'change_type': self._get_change_type(orig_meta[i-1], orig_meta[i])
})
# Analyze incremental trend changes
incremental_changes = []
for i in range(1, len(inc_meta)):
if inc_meta[i] != inc_meta[i-1]:
incremental_changes.append({
'index': i,
'timestamp': comparison_data.iloc[i]['timestamp'],
'close_price': comparison_data.iloc[i]['close'],
'prev_trend': inc_meta[i-1],
'new_trend': inc_meta[i],
'change_type': self._get_change_type(inc_meta[i-1], inc_meta[i])
})
# Save original trend changes
os.makedirs("results", exist_ok=True)
original_df = pd.DataFrame(original_changes)
original_file = os.path.join("results", f"{filename_prefix}_original.csv")
original_df.to_csv(original_file, index=False)
logger.info(f"Original trend changes saved to {original_file} ({len(original_changes)} changes)")
# Save incremental trend changes
incremental_df = pd.DataFrame(incremental_changes)
incremental_file = os.path.join("results", f"{filename_prefix}_incremental.csv")
incremental_df.to_csv(incremental_file, index=False)
logger.info(f"Incremental trend changes saved to {incremental_file} ({len(incremental_changes)} changes)")
# Create side-by-side comparison
comparison_changes = []
max_changes = max(len(original_changes), len(incremental_changes))
for i in range(max_changes):
orig_change = original_changes[i] if i < len(original_changes) else {}
inc_change = incremental_changes[i] if i < len(incremental_changes) else {}
comparison_changes.append({
'change_num': i + 1,
'orig_index': orig_change.get('index', ''),
'orig_timestamp': orig_change.get('timestamp', ''),
'orig_close': orig_change.get('close_price', ''),
'orig_prev_trend': orig_change.get('prev_trend', ''),
'orig_new_trend': orig_change.get('new_trend', ''),
'orig_change_type': orig_change.get('change_type', ''),
'inc_index': inc_change.get('index', ''),
'inc_timestamp': inc_change.get('timestamp', ''),
'inc_close': inc_change.get('close_price', ''),
'inc_prev_trend': inc_change.get('prev_trend', ''),
'inc_new_trend': inc_change.get('new_trend', ''),
'inc_change_type': inc_change.get('change_type', ''),
'match': (orig_change.get('index') == inc_change.get('index') and
orig_change.get('new_trend') == inc_change.get('new_trend')) if orig_change and inc_change else False
})
comparison_df = pd.DataFrame(comparison_changes)
comparison_file = os.path.join("results", f"{filename_prefix}_comparison.csv")
comparison_df.to_csv(comparison_file, index=False)
logger.info(f"Side-by-side comparison saved to {comparison_file}")
# Create summary statistics
summary = {
'original_total_changes': len(original_changes),
'incremental_total_changes': len(incremental_changes),
'original_entry_signals': len([c for c in original_changes if c['change_type'] == 'ENTRY']),
'incremental_entry_signals': len([c for c in incremental_changes if c['change_type'] == 'ENTRY']),
'original_exit_signals': len([c for c in original_changes if c['change_type'] == 'EXIT']),
'incremental_exit_signals': len([c for c in incremental_changes if c['change_type'] == 'EXIT']),
'original_to_neutral': len([c for c in original_changes if c['new_trend'] == 0]),
'incremental_to_neutral': len([c for c in incremental_changes if c['new_trend'] == 0]),
'matching_changes': len([c for c in comparison_changes if c['match']]),
'total_comparison_points': max_changes
}
summary_file = os.path.join("results", f"{filename_prefix}_summary.json")
import json
with open(summary_file, 'w') as f:
json.dump(summary, f, indent=2)
logger.info(f"Summary statistics saved to {summary_file}")
return {
'original_changes': original_changes,
'incremental_changes': incremental_changes,
'summary': summary
}
def _get_change_type(self, prev_trend: float, new_trend: float) -> str:
"""Classify the type of trend change."""
if prev_trend != 1 and new_trend == 1:
return 'ENTRY'
elif prev_trend != -1 and new_trend == -1:
return 'EXIT'
elif new_trend == 0:
return 'TO_NEUTRAL'
elif prev_trend == 0 and new_trend != 0:
return 'FROM_NEUTRAL'
else:
return 'OTHER'
def save_individual_supertrend_analysis(self, filename_prefix: str = "supertrend_individual"):
"""Save detailed analysis of individual Supertrend indicators."""
if (self.original_results is None or self.incremental_results is None or
self.original_results['individual_trends'] is None or
self.incremental_results['individual_trends'] is None):
logger.warning("Individual trends data not available")
return
data_start_index = self.original_results.get('data_start_index', 0)
orig_trends = self.original_results['individual_trends']
inc_trends = self.incremental_results['individual_trends']
min_length = min(len(orig_trends), len(inc_trends))
comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length]
# Analyze each Supertrend indicator separately
for st_idx in range(3):
st_params = self.supertrend_params[st_idx]
st_name = f"ST{st_idx}_P{st_params['period']}_M{st_params['multiplier']}"
# Original Supertrend changes
orig_st_changes = []
for i in range(1, len(orig_trends)):
if orig_trends[i, st_idx] != orig_trends[i-1, st_idx]:
orig_st_changes.append({
'index': i,
'timestamp': comparison_data.iloc[i]['timestamp'],
'close_price': comparison_data.iloc[i]['close'],
'prev_trend': orig_trends[i-1, st_idx],
'new_trend': orig_trends[i, st_idx],
'change_type': 'UP' if orig_trends[i, st_idx] == 1 else 'DOWN'
})
# Incremental Supertrend changes
inc_st_changes = []
for i in range(1, len(inc_trends)):
if inc_trends[i, st_idx] != inc_trends[i-1, st_idx]:
inc_st_changes.append({
'index': i,
'timestamp': comparison_data.iloc[i]['timestamp'],
'close_price': comparison_data.iloc[i]['close'],
'prev_trend': inc_trends[i-1, st_idx],
'new_trend': inc_trends[i, st_idx],
'change_type': 'UP' if inc_trends[i, st_idx] == 1 else 'DOWN'
})
# Save individual Supertrend analysis
os.makedirs("results", exist_ok=True)
# Original
orig_df = pd.DataFrame(orig_st_changes)
orig_file = os.path.join("results", f"{filename_prefix}_{st_name}_original.csv")
orig_df.to_csv(orig_file, index=False)
# Incremental
inc_df = pd.DataFrame(inc_st_changes)
inc_file = os.path.join("results", f"{filename_prefix}_{st_name}_incremental.csv")
inc_df.to_csv(inc_file, index=False)
logger.info(f"Supertrend {st_idx} analysis: Original={len(orig_st_changes)} changes, Incremental={len(inc_st_changes)} changes")
def save_full_timeline_data(self, filename: str = "full_timeline_comparison.csv"):
"""Save complete timeline data with all values for manual analysis."""
if self.original_results is None or self.incremental_results is None:
logger.warning("No results to save")
return
data_start_index = self.original_results.get('data_start_index', 0)
orig_meta = self.original_results['meta_trend']
inc_meta = self.incremental_results['meta_trend']
min_length = min(len(orig_meta), len(inc_meta))
comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length]
# Create comprehensive timeline
timeline_data = []
for i in range(min_length):
row_data = {
'index': i,
'timestamp': comparison_data.iloc[i]['timestamp'],
'open': comparison_data.iloc[i]['open'],
'high': comparison_data.iloc[i]['high'],
'low': comparison_data.iloc[i]['low'],
'close': comparison_data.iloc[i]['close'],
'original_meta_trend': orig_meta[i],
'incremental_meta_trend': inc_meta[i],
'meta_trend_match': orig_meta[i] == inc_meta[i],
'meta_trend_diff': abs(orig_meta[i] - inc_meta[i])
}
# Add individual Supertrend data if available
if (self.original_results['individual_trends'] is not None and
self.incremental_results['individual_trends'] is not None):
orig_trends = self.original_results['individual_trends']
inc_trends = self.incremental_results['individual_trends']
for st_idx in range(3):
st_params = self.supertrend_params[st_idx]
prefix = f"ST{st_idx}_P{st_params['period']}_M{st_params['multiplier']}"
row_data[f'{prefix}_orig'] = orig_trends[i, st_idx]
row_data[f'{prefix}_inc'] = inc_trends[i, st_idx]
row_data[f'{prefix}_match'] = orig_trends[i, st_idx] == inc_trends[i, st_idx]
# Mark trend changes
if i > 0:
row_data['orig_meta_changed'] = orig_meta[i] != orig_meta[i-1]
row_data['inc_meta_changed'] = inc_meta[i] != inc_meta[i-1]
row_data['orig_change_type'] = self._get_change_type(orig_meta[i-1], orig_meta[i]) if orig_meta[i] != orig_meta[i-1] else ''
row_data['inc_change_type'] = self._get_change_type(inc_meta[i-1], inc_meta[i]) if inc_meta[i] != inc_meta[i-1] else ''
else:
row_data['orig_meta_changed'] = False
row_data['inc_meta_changed'] = False
row_data['orig_change_type'] = ''
row_data['inc_change_type'] = ''
timeline_data.append(row_data)
# Save timeline data
os.makedirs("results", exist_ok=True)
timeline_df = pd.DataFrame(timeline_data)
filepath = os.path.join("results", filename)
timeline_df.to_csv(filepath, index=False)
logger.info(f"Full timeline comparison saved to {filepath} ({len(timeline_data)} rows)")
return timeline_df
def run_full_test(self, symbol: str = "BTCUSD", start_date: str = "2022-01-01", end_date: str = "2023-01-01", limit: int = None) -> bool:
"""
Run the complete comparison test.
Args:
symbol: Trading symbol to test
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
limit: Optional limit on number of data points (applied after date filtering)
Returns:
True if all tests pass, False otherwise
"""
logger.info("=" * 60)
logger.info("STARTING METATREND STRATEGY COMPARISON TEST")
logger.info("=" * 60)
try:
# Load test data
self.load_test_data(symbol, start_date, end_date, limit)
logger.info(f"Test data loaded: {len(self.test_data)} points")
# Test original strategy
logger.info("\n" + "-" * 40)
logger.info("TESTING ORIGINAL STRATEGY")
logger.info("-" * 40)
self.test_original_strategy()
# Test incremental indicators
logger.info("\n" + "-" * 40)
logger.info("TESTING INCREMENTAL INDICATORS")
logger.info("-" * 40)
self.test_incremental_indicators()
# Compare results
logger.info("\n" + "-" * 40)
logger.info("COMPARING RESULTS")
logger.info("-" * 40)
comparison = self.compare_results()
# Save detailed comparison
self.save_detailed_comparison()
# Save trend changes analysis
self.save_trend_changes_analysis()
# Save individual supertrend analysis
self.save_individual_supertrend_analysis()
# Save full timeline data
self.save_full_timeline_data()
# Print results
logger.info("\n" + "=" * 60)
logger.info("COMPARISON RESULTS")
logger.info("=" * 60)
for key, value in comparison.items():
status = "✅ PASS" if value else "❌ FAIL"
logger.info(f"{key}: {status}")
overall_pass = comparison.get('overall_match', False)
if overall_pass:
logger.info("\n🎉 ALL TESTS PASSED! Incremental indicators match original strategy.")
else:
logger.error("\n❌ TESTS FAILED! Incremental indicators do not match original strategy.")
return overall_pass
except Exception as e:
logger.error(f"Test failed with error: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Run the MetaTrend comparison test."""
test = MetaTrendComparisonTest()
# Run test with real BTCUSD data from 2022-01-01 to 2023-01-01
logger.info(f"\n{'='*80}")
logger.info(f"RUNNING METATREND COMPARISON TEST")
logger.info(f"Using real BTCUSD data from 2022-01-01 to 2023-01-01")
logger.info(f"{'='*80}")
# Test with the full year of data (no limit)
passed = test.run_full_test("BTCUSD", "2022-01-01", "2023-01-01", limit=None)
if passed:
logger.info("\n🎉 TEST PASSED! Incremental indicators match original strategy.")
else:
logger.error("\n❌ TEST FAILED! Incremental indicators do not match original strategy.")
return passed
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)