Add incremental MetaTrend strategy implementation

- Introduced `IncMetaTrendStrategy` for real-time processing of the MetaTrend trading strategy, utilizing three Supertrend indicators.
- Added comprehensive documentation in `METATREND_IMPLEMENTATION.md` detailing architecture, key components, and usage examples.
- Updated `__init__.py` to include the new strategy in the strategy registry.
- Created tests to compare the incremental strategy's signals against the original implementation, ensuring mathematical equivalence.
- Developed visual comparison scripts to analyze performance and signal accuracy between original and incremental strategies.
This commit is contained in:
Vasily.onl
2025-05-26 16:09:32 +08:00
parent b1f80099fe
commit ba78539cbb
10 changed files with 2975 additions and 98 deletions

View File

@@ -0,0 +1,493 @@
"""
Original vs Incremental Strategy Comparison Plot
This script creates plots comparing:
1. Original DefaultStrategy (with bug)
2. Incremental IncMetaTrendStrategy
Using full year data from 2022-01-01 to 2023-01-01
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
import logging
from typing import Dict, List, Tuple
import os
import sys
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cycles.strategies.default_strategy import DefaultStrategy
from cycles.IncStrategies.metatrend_strategy import IncMetaTrendStrategy
from cycles.utils.storage import Storage
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Set style for better plots
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
class OriginalVsIncrementalPlotter:
"""Class to create comparison plots between original and incremental strategies."""
def __init__(self):
"""Initialize the plotter."""
self.storage = Storage(logging=logger)
self.test_data = None
self.original_signals = []
self.incremental_signals = []
self.original_meta_trend = None
self.incremental_meta_trend = []
self.individual_trends = []
def load_and_prepare_data(self, start_date: str = "2023-01-01", end_date: str = "2024-01-01") -> pd.DataFrame:
"""Load test data for the specified date range."""
logger.info(f"Loading data from {start_date} to {end_date}")
try:
# Load data for the full year
filename = "btcusd_1-min_data.csv"
start_dt = pd.to_datetime(start_date)
end_dt = pd.to_datetime(end_date)
df = self.storage.load_data(filename, start_dt, end_dt)
# Reset index to get timestamp as column
df_with_timestamp = df.reset_index()
self.test_data = df_with_timestamp
logger.info(f"Loaded {len(df_with_timestamp)} data points")
logger.info(f"Date range: {df_with_timestamp['timestamp'].min()} to {df_with_timestamp['timestamp'].max()}")
return df_with_timestamp
except Exception as e:
logger.error(f"Failed to load test data: {e}")
raise
def run_original_strategy(self) -> Tuple[List[Dict], np.ndarray]:
"""Run original strategy and extract signals and meta-trend."""
logger.info("Running Original DefaultStrategy...")
# Create indexed DataFrame for original strategy
indexed_data = self.test_data.set_index('timestamp')
# Limit to 200 points like original strategy does
if len(indexed_data) > 200:
original_data_used = indexed_data.tail(200)
data_start_index = len(self.test_data) - 200
logger.info(f"Original strategy using last 200 points out of {len(indexed_data)} total")
else:
original_data_used = indexed_data
data_start_index = 0
# Create mock backtester
class MockBacktester:
def __init__(self, df):
self.original_df = df
self.min1_df = df
self.strategies = {}
backtester = MockBacktester(original_data_used)
# Initialize original strategy
strategy = DefaultStrategy(weight=1.0, params={
"stop_loss_pct": 0.03,
"timeframe": "1min"
})
strategy.initialize(backtester)
# Extract signals and meta-trend
signals = []
meta_trend = strategy.meta_trend
for i in range(len(original_data_used)):
# Get entry signal
entry_signal = strategy.get_entry_signal(backtester, i)
if entry_signal.signal_type == "ENTRY":
signals.append({
'index': i,
'global_index': data_start_index + i,
'timestamp': original_data_used.index[i],
'close': original_data_used.iloc[i]['close'],
'signal_type': 'ENTRY',
'confidence': entry_signal.confidence,
'source': 'original'
})
# Get exit signal
exit_signal = strategy.get_exit_signal(backtester, i)
if exit_signal.signal_type == "EXIT":
signals.append({
'index': i,
'global_index': data_start_index + i,
'timestamp': original_data_used.index[i],
'close': original_data_used.iloc[i]['close'],
'signal_type': 'EXIT',
'confidence': exit_signal.confidence,
'source': 'original'
})
logger.info(f"Original strategy generated {len(signals)} signals")
# Count signal types
entry_count = len([s for s in signals if s['signal_type'] == 'ENTRY'])
exit_count = len([s for s in signals if s['signal_type'] == 'EXIT'])
logger.info(f"Original: {entry_count} entries, {exit_count} exits")
return signals, meta_trend, data_start_index
def run_incremental_strategy(self, data_start_index: int = 0) -> Tuple[List[Dict], List[int], List[List[int]]]:
"""Run incremental strategy and extract signals, meta-trend, and individual trends."""
logger.info("Running Incremental IncMetaTrendStrategy...")
# Create strategy instance
strategy = IncMetaTrendStrategy("metatrend", weight=1.0, params={
"timeframe": "1min",
"enable_logging": False
})
# Determine data range to match original strategy
if len(self.test_data) > 200:
test_data_subset = self.test_data.tail(200)
logger.info(f"Incremental strategy using last 200 points out of {len(self.test_data)} total")
else:
test_data_subset = self.test_data
# Process data incrementally and collect signals
signals = []
meta_trends = []
individual_trends_list = []
for idx, (_, row) in enumerate(test_data_subset.iterrows()):
ohlc = {
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close']
}
# Update strategy with new data point
strategy.calculate_on_data(ohlc, row['timestamp'])
# Get current meta-trend and individual trends
current_meta_trend = strategy.get_current_meta_trend()
meta_trends.append(current_meta_trend)
# Get individual Supertrend states
individual_states = strategy.get_individual_supertrend_states()
if individual_states and len(individual_states) >= 3:
individual_trends = [state.get('current_trend', 0) for state in individual_states]
else:
individual_trends = [0, 0, 0] # Default if not available
individual_trends_list.append(individual_trends)
# Check for entry signal
entry_signal = strategy.get_entry_signal()
if entry_signal.signal_type == "ENTRY":
signals.append({
'index': idx,
'global_index': data_start_index + idx,
'timestamp': row['timestamp'],
'close': row['close'],
'signal_type': 'ENTRY',
'confidence': entry_signal.confidence,
'source': 'incremental'
})
# Check for exit signal
exit_signal = strategy.get_exit_signal()
if exit_signal.signal_type == "EXIT":
signals.append({
'index': idx,
'global_index': data_start_index + idx,
'timestamp': row['timestamp'],
'close': row['close'],
'signal_type': 'EXIT',
'confidence': exit_signal.confidence,
'source': 'incremental'
})
logger.info(f"Incremental strategy generated {len(signals)} signals")
# Count signal types
entry_count = len([s for s in signals if s['signal_type'] == 'ENTRY'])
exit_count = len([s for s in signals if s['signal_type'] == 'EXIT'])
logger.info(f"Incremental: {entry_count} entries, {exit_count} exits")
return signals, meta_trends, individual_trends_list
def create_comparison_plot(self, save_path: str = "results/original_vs_incremental_plot.png"):
"""Create comparison plot between original and incremental strategies."""
logger.info("Creating original vs incremental comparison plot...")
# Load and prepare data
self.load_and_prepare_data(start_date="2023-01-01", end_date="2024-01-01")
# Run both strategies
self.original_signals, self.original_meta_trend, data_start_index = self.run_original_strategy()
self.incremental_signals, self.incremental_meta_trend, self.individual_trends = self.run_incremental_strategy(data_start_index)
# Prepare data for plotting (last 200 points to match strategies)
if len(self.test_data) > 200:
plot_data = self.test_data.tail(200).copy()
else:
plot_data = self.test_data.copy()
plot_data['timestamp'] = pd.to_datetime(plot_data['timestamp'])
# Create figure with subplots
fig, axes = plt.subplots(3, 1, figsize=(16, 15))
fig.suptitle('Original vs Incremental MetaTrend Strategy Comparison\n(Data: 2022-01-01 to 2023-01-01)',
fontsize=16, fontweight='bold')
# Plot 1: Price with signals
self._plot_price_with_signals(axes[0], plot_data)
# Plot 2: Meta-trend comparison
self._plot_meta_trends(axes[1], plot_data)
# Plot 3: Signal timing comparison
self._plot_signal_timing(axes[2], plot_data)
# Adjust layout and save
plt.tight_layout()
os.makedirs("results", exist_ok=True)
plt.savefig(save_path, dpi=300, bbox_inches='tight')
logger.info(f"Plot saved to {save_path}")
plt.show()
def _plot_price_with_signals(self, ax, plot_data):
"""Plot price data with signals overlaid."""
ax.set_title('BTC Price with Trading Signals', fontsize=14, fontweight='bold')
# Plot price
ax.plot(plot_data['timestamp'], plot_data['close'],
color='black', linewidth=1.5, label='BTC Price', alpha=0.9, zorder=1)
# Calculate price range for offset calculation
price_range = plot_data['close'].max() - plot_data['close'].min()
offset_amount = price_range * 0.02 # 2% of price range for offset
# Plot signals with enhanced styling and offsets
signal_colors = {
'original': {'ENTRY': '#FF4444', 'EXIT': '#CC0000'}, # Bright red tones
'incremental': {'ENTRY': '#00AA00', 'EXIT': '#006600'} # Bright green tones
}
signal_markers = {'ENTRY': '^', 'EXIT': 'v'}
signal_sizes = {'ENTRY': 150, 'EXIT': 120}
# Plot original signals (offset downward)
original_entry_plotted = False
original_exit_plotted = False
for signal in self.original_signals:
if signal['index'] < len(plot_data):
timestamp = plot_data.iloc[signal['index']]['timestamp']
# Offset original signals downward
price = signal['close'] - offset_amount
label = None
if signal['signal_type'] == 'ENTRY' and not original_entry_plotted:
label = "Original Entry (buggy)"
original_entry_plotted = True
elif signal['signal_type'] == 'EXIT' and not original_exit_plotted:
label = "Original Exit (buggy)"
original_exit_plotted = True
ax.scatter(timestamp, price,
c=signal_colors['original'][signal['signal_type']],
marker=signal_markers[signal['signal_type']],
s=signal_sizes[signal['signal_type']],
alpha=0.8, edgecolors='white', linewidth=2,
label=label, zorder=3)
# Plot incremental signals (offset upward)
inc_entry_plotted = False
inc_exit_plotted = False
for signal in self.incremental_signals:
if signal['index'] < len(plot_data):
timestamp = plot_data.iloc[signal['index']]['timestamp']
# Offset incremental signals upward
price = signal['close'] + offset_amount
label = None
if signal['signal_type'] == 'ENTRY' and not inc_entry_plotted:
label = "Incremental Entry (correct)"
inc_entry_plotted = True
elif signal['signal_type'] == 'EXIT' and not inc_exit_plotted:
label = "Incremental Exit (correct)"
inc_exit_plotted = True
ax.scatter(timestamp, price,
c=signal_colors['incremental'][signal['signal_type']],
marker=signal_markers[signal['signal_type']],
s=signal_sizes[signal['signal_type']],
alpha=0.9, edgecolors='black', linewidth=1.5,
label=label, zorder=4)
# Add connecting lines to show actual price for offset signals
for signal in self.original_signals:
if signal['index'] < len(plot_data):
timestamp = plot_data.iloc[signal['index']]['timestamp']
actual_price = signal['close']
offset_price = actual_price - offset_amount
ax.plot([timestamp, timestamp], [actual_price, offset_price],
color=signal_colors['original'][signal['signal_type']],
alpha=0.3, linewidth=1, zorder=2)
for signal in self.incremental_signals:
if signal['index'] < len(plot_data):
timestamp = plot_data.iloc[signal['index']]['timestamp']
actual_price = signal['close']
offset_price = actual_price + offset_amount
ax.plot([timestamp, timestamp], [actual_price, offset_price],
color=signal_colors['incremental'][signal['signal_type']],
alpha=0.3, linewidth=1, zorder=2)
ax.set_ylabel('Price (USD)')
ax.legend(loc='upper left', fontsize=10, framealpha=0.9)
ax.grid(True, alpha=0.3)
# Format x-axis
ax.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M'))
ax.xaxis.set_major_locator(mdates.DayLocator(interval=1))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
# Add text annotation explaining the offset
ax.text(0.02, 0.02, 'Note: Original signals offset down, Incremental signals offset up for clarity',
transform=ax.transAxes, fontsize=9, style='italic',
bbox=dict(boxstyle='round,pad=0.3', facecolor='lightgray', alpha=0.7))
def _plot_meta_trends(self, ax, plot_data):
"""Plot meta-trend comparison."""
ax.set_title('Meta-Trend Comparison', fontsize=14, fontweight='bold')
timestamps = plot_data['timestamp']
# Plot original meta-trend
if self.original_meta_trend is not None:
ax.plot(timestamps, self.original_meta_trend,
color='red', linewidth=2, alpha=0.7,
label='Original (with bug)', marker='o', markersize=2)
# Plot incremental meta-trend
if self.incremental_meta_trend:
ax.plot(timestamps, self.incremental_meta_trend,
color='green', linewidth=2, alpha=0.8,
label='Incremental (correct)', marker='s', markersize=2)
# Add horizontal lines for trend levels
ax.axhline(y=1, color='lightgreen', linestyle='--', alpha=0.5, label='Uptrend (+1)')
ax.axhline(y=0, color='gray', linestyle='-', alpha=0.5, label='Neutral (0)')
ax.axhline(y=-1, color='lightcoral', linestyle='--', alpha=0.5, label='Downtrend (-1)')
ax.set_ylabel('Meta-Trend Value')
ax.set_ylim(-1.5, 1.5)
ax.legend(loc='upper left', fontsize=10)
ax.grid(True, alpha=0.3)
# Format x-axis
ax.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M'))
ax.xaxis.set_major_locator(mdates.DayLocator(interval=1))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
def _plot_signal_timing(self, ax, plot_data):
"""Plot signal timing comparison."""
ax.set_title('Signal Timing Comparison', fontsize=14, fontweight='bold')
timestamps = plot_data['timestamp']
# Create signal arrays
original_entry = np.zeros(len(timestamps))
original_exit = np.zeros(len(timestamps))
inc_entry = np.zeros(len(timestamps))
inc_exit = np.zeros(len(timestamps))
# Fill signal arrays
for signal in self.original_signals:
if signal['index'] < len(timestamps):
if signal['signal_type'] == 'ENTRY':
original_entry[signal['index']] = 1
else:
original_exit[signal['index']] = -1
for signal in self.incremental_signals:
if signal['index'] < len(timestamps):
if signal['signal_type'] == 'ENTRY':
inc_entry[signal['index']] = 1
else:
inc_exit[signal['index']] = -1
# Plot signals as vertical lines and markers
y_positions = [2, 1]
labels = ['Original (with bug)', 'Incremental (correct)']
colors = ['red', 'green']
for i, (entry_signals, exit_signals, label, color) in enumerate(zip(
[original_entry, inc_entry],
[original_exit, inc_exit],
labels, colors
)):
y_pos = y_positions[i]
# Plot entry signals
entry_indices = np.where(entry_signals == 1)[0]
for idx in entry_indices:
ax.axvline(x=timestamps.iloc[idx], ymin=(y_pos-0.3)/3, ymax=(y_pos+0.3)/3,
color=color, linewidth=2, alpha=0.8)
ax.scatter(timestamps.iloc[idx], y_pos, marker='^', s=60, color=color, alpha=0.8)
# Plot exit signals
exit_indices = np.where(exit_signals == -1)[0]
for idx in exit_indices:
ax.axvline(x=timestamps.iloc[idx], ymin=(y_pos-0.3)/3, ymax=(y_pos+0.3)/3,
color=color, linewidth=2, alpha=0.8)
ax.scatter(timestamps.iloc[idx], y_pos, marker='v', s=60, color=color, alpha=0.8)
ax.set_yticks(y_positions)
ax.set_yticklabels(labels)
ax.set_ylabel('Strategy')
ax.set_ylim(0.5, 2.5)
ax.grid(True, alpha=0.3)
# Format x-axis
ax.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M'))
ax.xaxis.set_major_locator(mdates.DayLocator(interval=1))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
# Add legend
from matplotlib.lines import Line2D
legend_elements = [
Line2D([0], [0], marker='^', color='gray', linestyle='None', markersize=8, label='Entry Signal'),
Line2D([0], [0], marker='v', color='gray', linestyle='None', markersize=8, label='Exit Signal')
]
ax.legend(handles=legend_elements, loc='upper right', fontsize=10)
# Add signal count text
orig_entries = len([s for s in self.original_signals if s['signal_type'] == 'ENTRY'])
orig_exits = len([s for s in self.original_signals if s['signal_type'] == 'EXIT'])
inc_entries = len([s for s in self.incremental_signals if s['signal_type'] == 'ENTRY'])
inc_exits = len([s for s in self.incremental_signals if s['signal_type'] == 'EXIT'])
ax.text(0.02, 0.98, f'Original: {orig_entries} entries, {orig_exits} exits\nIncremental: {inc_entries} entries, {inc_exits} exits',
transform=ax.transAxes, fontsize=10, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
def main():
"""Create and display the original vs incremental comparison plot."""
plotter = OriginalVsIncrementalPlotter()
plotter.create_comparison_plot()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,534 @@
"""
Visual Signal Comparison Plot
This script creates comprehensive plots comparing:
1. Price data with signals overlaid
2. Meta-trend values over time
3. Individual Supertrend indicators
4. Signal timing comparison
Shows both original (buggy and fixed) and incremental strategies.
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.patches import Rectangle
import seaborn as sns
import logging
from typing import Dict, List, Tuple
import os
import sys
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cycles.strategies.default_strategy import DefaultStrategy
from cycles.IncStrategies.metatrend_strategy import IncMetaTrendStrategy
from cycles.IncStrategies.indicators.supertrend import SupertrendCollection
from cycles.utils.storage import Storage
from cycles.strategies.base import StrategySignal
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Set style for better plots
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
class FixedDefaultStrategy(DefaultStrategy):
"""DefaultStrategy with the exit condition bug fixed."""
def get_exit_signal(self, backtester, df_index: int) -> StrategySignal:
"""Generate exit signal with CORRECTED logic."""
if not self.initialized:
return StrategySignal("HOLD", 0.0)
if df_index < 1:
return StrategySignal("HOLD", 0.0)
# Check bounds
if not hasattr(self, 'meta_trend') or df_index >= len(self.meta_trend):
return StrategySignal("HOLD", 0.0)
# Check for meta-trend exit signal (CORRECTED LOGIC)
prev_trend = self.meta_trend[df_index - 1]
curr_trend = self.meta_trend[df_index]
# FIXED: Check if prev_trend != -1 (not prev_trend != 1)
if prev_trend != -1 and curr_trend == -1:
return StrategySignal("EXIT", confidence=1.0,
metadata={"type": "META_TREND_EXIT_SIGNAL"})
return StrategySignal("HOLD", confidence=0.0)
class SignalPlotter:
"""Class to create comprehensive signal comparison plots."""
def __init__(self):
"""Initialize the plotter."""
self.storage = Storage(logging=logger)
self.test_data = None
self.original_signals = []
self.fixed_original_signals = []
self.incremental_signals = []
self.original_meta_trend = None
self.fixed_original_meta_trend = None
self.incremental_meta_trend = []
self.individual_trends = []
def load_and_prepare_data(self, limit: int = 1000) -> pd.DataFrame:
"""Load test data and prepare all strategy results."""
logger.info(f"Loading and preparing data (limit: {limit} points)")
try:
# Load recent data
filename = "btcusd_1-min_data.csv"
start_date = pd.to_datetime("2024-12-31")
end_date = pd.to_datetime("2025-01-01")
df = self.storage.load_data(filename, start_date, end_date)
if len(df) > limit:
df = df.tail(limit)
logger.info(f"Limited data to last {limit} points")
# Reset index to get timestamp as column
df_with_timestamp = df.reset_index()
self.test_data = df_with_timestamp
logger.info(f"Loaded {len(df_with_timestamp)} data points")
logger.info(f"Date range: {df_with_timestamp['timestamp'].min()} to {df_with_timestamp['timestamp'].max()}")
return df_with_timestamp
except Exception as e:
logger.error(f"Failed to load test data: {e}")
raise
def run_original_strategy(self, use_fixed: bool = False) -> Tuple[List[Dict], np.ndarray]:
"""Run original strategy and extract signals and meta-trend."""
strategy_name = "FIXED Original" if use_fixed else "Original (Buggy)"
logger.info(f"Running {strategy_name} DefaultStrategy...")
# Create indexed DataFrame for original strategy
indexed_data = self.test_data.set_index('timestamp')
# Limit to 200 points like original strategy does
if len(indexed_data) > 200:
original_data_used = indexed_data.tail(200)
data_start_index = len(self.test_data) - 200
else:
original_data_used = indexed_data
data_start_index = 0
# Create mock backtester
class MockBacktester:
def __init__(self, df):
self.original_df = df
self.min1_df = df
self.strategies = {}
backtester = MockBacktester(original_data_used)
# Initialize strategy (fixed or original)
if use_fixed:
strategy = FixedDefaultStrategy(weight=1.0, params={
"stop_loss_pct": 0.03,
"timeframe": "1min"
})
else:
strategy = DefaultStrategy(weight=1.0, params={
"stop_loss_pct": 0.03,
"timeframe": "1min"
})
strategy.initialize(backtester)
# Extract signals and meta-trend
signals = []
meta_trend = strategy.meta_trend
for i in range(len(original_data_used)):
# Get entry signal
entry_signal = strategy.get_entry_signal(backtester, i)
if entry_signal.signal_type == "ENTRY":
signals.append({
'index': i,
'global_index': data_start_index + i,
'timestamp': original_data_used.index[i],
'close': original_data_used.iloc[i]['close'],
'signal_type': 'ENTRY',
'confidence': entry_signal.confidence,
'source': 'fixed_original' if use_fixed else 'original'
})
# Get exit signal
exit_signal = strategy.get_exit_signal(backtester, i)
if exit_signal.signal_type == "EXIT":
signals.append({
'index': i,
'global_index': data_start_index + i,
'timestamp': original_data_used.index[i],
'close': original_data_used.iloc[i]['close'],
'signal_type': 'EXIT',
'confidence': exit_signal.confidence,
'source': 'fixed_original' if use_fixed else 'original'
})
logger.info(f"{strategy_name} generated {len(signals)} signals")
return signals, meta_trend, data_start_index
def run_incremental_strategy(self, data_start_index: int = 0) -> Tuple[List[Dict], List[int], List[List[int]]]:
"""Run incremental strategy and extract signals, meta-trend, and individual trends."""
logger.info("Running Incremental IncMetaTrendStrategy...")
# Create strategy instance
strategy = IncMetaTrendStrategy("metatrend", weight=1.0, params={
"timeframe": "1min",
"enable_logging": False
})
# Determine data range to match original strategy
if len(self.test_data) > 200:
test_data_subset = self.test_data.tail(200)
else:
test_data_subset = self.test_data
# Process data incrementally and collect signals
signals = []
meta_trends = []
individual_trends_list = []
for idx, (_, row) in enumerate(test_data_subset.iterrows()):
ohlc = {
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close']
}
# Update strategy with new data point
strategy.calculate_on_data(ohlc, row['timestamp'])
# Get current meta-trend and individual trends
current_meta_trend = strategy.get_current_meta_trend()
meta_trends.append(current_meta_trend)
# Get individual Supertrend states
individual_states = strategy.get_individual_supertrend_states()
if individual_states and len(individual_states) >= 3:
individual_trends = [state.get('current_trend', 0) for state in individual_states]
else:
individual_trends = [0, 0, 0] # Default if not available
individual_trends_list.append(individual_trends)
# Check for entry signal
entry_signal = strategy.get_entry_signal()
if entry_signal.signal_type == "ENTRY":
signals.append({
'index': idx,
'global_index': data_start_index + idx,
'timestamp': row['timestamp'],
'close': row['close'],
'signal_type': 'ENTRY',
'confidence': entry_signal.confidence,
'source': 'incremental'
})
# Check for exit signal
exit_signal = strategy.get_exit_signal()
if exit_signal.signal_type == "EXIT":
signals.append({
'index': idx,
'global_index': data_start_index + idx,
'timestamp': row['timestamp'],
'close': row['close'],
'signal_type': 'EXIT',
'confidence': exit_signal.confidence,
'source': 'incremental'
})
logger.info(f"Incremental strategy generated {len(signals)} signals")
return signals, meta_trends, individual_trends_list
def create_comprehensive_plot(self, save_path: str = "results/signal_comparison_plot.png"):
"""Create comprehensive comparison plot."""
logger.info("Creating comprehensive comparison plot...")
# Load and prepare data
self.load_and_prepare_data(limit=2000)
# Run all strategies
self.original_signals, self.original_meta_trend, data_start_index = self.run_original_strategy(use_fixed=False)
self.fixed_original_signals, self.fixed_original_meta_trend, _ = self.run_original_strategy(use_fixed=True)
self.incremental_signals, self.incremental_meta_trend, self.individual_trends = self.run_incremental_strategy(data_start_index)
# Prepare data for plotting
if len(self.test_data) > 200:
plot_data = self.test_data.tail(200).copy()
else:
plot_data = self.test_data.copy()
plot_data['timestamp'] = pd.to_datetime(plot_data['timestamp'])
# Create figure with subplots
fig, axes = plt.subplots(4, 1, figsize=(16, 20))
fig.suptitle('MetaTrend Strategy Signal Comparison', fontsize=16, fontweight='bold')
# Plot 1: Price with signals
self._plot_price_with_signals(axes[0], plot_data)
# Plot 2: Meta-trend comparison
self._plot_meta_trends(axes[1], plot_data)
# Plot 3: Individual Supertrend indicators
self._plot_individual_supertrends(axes[2], plot_data)
# Plot 4: Signal timing comparison
self._plot_signal_timing(axes[3], plot_data)
# Adjust layout and save
plt.tight_layout()
os.makedirs("results", exist_ok=True)
plt.savefig(save_path, dpi=300, bbox_inches='tight')
logger.info(f"Plot saved to {save_path}")
plt.show()
def _plot_price_with_signals(self, ax, plot_data):
"""Plot price data with signals overlaid."""
ax.set_title('Price Chart with Trading Signals', fontsize=14, fontweight='bold')
# Plot price
ax.plot(plot_data['timestamp'], plot_data['close'],
color='black', linewidth=1, label='BTC Price', alpha=0.8)
# Plot signals
signal_colors = {
'original': {'ENTRY': 'red', 'EXIT': 'darkred'},
'fixed_original': {'ENTRY': 'blue', 'EXIT': 'darkblue'},
'incremental': {'ENTRY': 'green', 'EXIT': 'darkgreen'}
}
signal_markers = {'ENTRY': '^', 'EXIT': 'v'}
signal_sizes = {'ENTRY': 100, 'EXIT': 80}
# Plot original signals
for signal in self.original_signals:
if signal['index'] < len(plot_data):
timestamp = plot_data.iloc[signal['index']]['timestamp']
price = signal['close']
ax.scatter(timestamp, price,
c=signal_colors['original'][signal['signal_type']],
marker=signal_markers[signal['signal_type']],
s=signal_sizes[signal['signal_type']],
alpha=0.7,
label=f"Original {signal['signal_type']}" if signal == self.original_signals[0] else "")
# Plot fixed original signals
for signal in self.fixed_original_signals:
if signal['index'] < len(plot_data):
timestamp = plot_data.iloc[signal['index']]['timestamp']
price = signal['close']
ax.scatter(timestamp, price,
c=signal_colors['fixed_original'][signal['signal_type']],
marker=signal_markers[signal['signal_type']],
s=signal_sizes[signal['signal_type']],
alpha=0.7, edgecolors='white', linewidth=1,
label=f"Fixed {signal['signal_type']}" if signal == self.fixed_original_signals[0] else "")
# Plot incremental signals
for signal in self.incremental_signals:
if signal['index'] < len(plot_data):
timestamp = plot_data.iloc[signal['index']]['timestamp']
price = signal['close']
ax.scatter(timestamp, price,
c=signal_colors['incremental'][signal['signal_type']],
marker=signal_markers[signal['signal_type']],
s=signal_sizes[signal['signal_type']],
alpha=0.8, edgecolors='black', linewidth=0.5,
label=f"Incremental {signal['signal_type']}" if signal == self.incremental_signals[0] else "")
ax.set_ylabel('Price (USD)')
ax.legend(loc='upper left', fontsize=10)
ax.grid(True, alpha=0.3)
# Format x-axis
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.xaxis.set_major_locator(mdates.HourLocator(interval=2))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
def _plot_meta_trends(self, ax, plot_data):
"""Plot meta-trend comparison."""
ax.set_title('Meta-Trend Comparison', fontsize=14, fontweight='bold')
timestamps = plot_data['timestamp']
# Plot original meta-trend
if self.original_meta_trend is not None:
ax.plot(timestamps, self.original_meta_trend,
color='red', linewidth=2, alpha=0.7,
label='Original (Buggy)', marker='o', markersize=3)
# Plot fixed original meta-trend
if self.fixed_original_meta_trend is not None:
ax.plot(timestamps, self.fixed_original_meta_trend,
color='blue', linewidth=2, alpha=0.7,
label='Fixed Original', marker='s', markersize=3)
# Plot incremental meta-trend
if self.incremental_meta_trend:
ax.plot(timestamps, self.incremental_meta_trend,
color='green', linewidth=2, alpha=0.8,
label='Incremental', marker='D', markersize=3)
# Add horizontal lines for trend levels
ax.axhline(y=1, color='lightgreen', linestyle='--', alpha=0.5, label='Uptrend')
ax.axhline(y=0, color='gray', linestyle='-', alpha=0.5, label='Neutral')
ax.axhline(y=-1, color='lightcoral', linestyle='--', alpha=0.5, label='Downtrend')
ax.set_ylabel('Meta-Trend Value')
ax.set_ylim(-1.5, 1.5)
ax.legend(loc='upper left', fontsize=10)
ax.grid(True, alpha=0.3)
# Format x-axis
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.xaxis.set_major_locator(mdates.HourLocator(interval=2))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
def _plot_individual_supertrends(self, ax, plot_data):
"""Plot individual Supertrend indicators."""
ax.set_title('Individual Supertrend Indicators (Incremental)', fontsize=14, fontweight='bold')
if not self.individual_trends:
ax.text(0.5, 0.5, 'No individual trend data available',
transform=ax.transAxes, ha='center', va='center')
return
timestamps = plot_data['timestamp']
individual_trends_array = np.array(self.individual_trends)
# Plot each Supertrend
supertrend_configs = [(12, 3.0), (10, 1.0), (11, 2.0)]
colors = ['purple', 'orange', 'brown']
for i, (period, multiplier) in enumerate(supertrend_configs):
if i < individual_trends_array.shape[1]:
ax.plot(timestamps, individual_trends_array[:, i],
color=colors[i], linewidth=1.5, alpha=0.8,
label=f'ST{i+1} (P={period}, M={multiplier})',
marker='o', markersize=2)
# Add horizontal lines for trend levels
ax.axhline(y=1, color='lightgreen', linestyle='--', alpha=0.5)
ax.axhline(y=0, color='gray', linestyle='-', alpha=0.5)
ax.axhline(y=-1, color='lightcoral', linestyle='--', alpha=0.5)
ax.set_ylabel('Supertrend Value')
ax.set_ylim(-1.5, 1.5)
ax.legend(loc='upper left', fontsize=10)
ax.grid(True, alpha=0.3)
# Format x-axis
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.xaxis.set_major_locator(mdates.HourLocator(interval=2))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
def _plot_signal_timing(self, ax, plot_data):
"""Plot signal timing comparison."""
ax.set_title('Signal Timing Comparison', fontsize=14, fontweight='bold')
timestamps = plot_data['timestamp']
# Create signal arrays
original_entry = np.zeros(len(timestamps))
original_exit = np.zeros(len(timestamps))
fixed_entry = np.zeros(len(timestamps))
fixed_exit = np.zeros(len(timestamps))
inc_entry = np.zeros(len(timestamps))
inc_exit = np.zeros(len(timestamps))
# Fill signal arrays
for signal in self.original_signals:
if signal['index'] < len(timestamps):
if signal['signal_type'] == 'ENTRY':
original_entry[signal['index']] = 1
else:
original_exit[signal['index']] = -1
for signal in self.fixed_original_signals:
if signal['index'] < len(timestamps):
if signal['signal_type'] == 'ENTRY':
fixed_entry[signal['index']] = 1
else:
fixed_exit[signal['index']] = -1
for signal in self.incremental_signals:
if signal['index'] < len(timestamps):
if signal['signal_type'] == 'ENTRY':
inc_entry[signal['index']] = 1
else:
inc_exit[signal['index']] = -1
# Plot signals as vertical lines
y_positions = [3, 2, 1]
labels = ['Original (Buggy)', 'Fixed Original', 'Incremental']
colors = ['red', 'blue', 'green']
for i, (entry_signals, exit_signals, label, color) in enumerate(zip(
[original_entry, fixed_entry, inc_entry],
[original_exit, fixed_exit, inc_exit],
labels, colors
)):
y_pos = y_positions[i]
# Plot entry signals
entry_indices = np.where(entry_signals == 1)[0]
for idx in entry_indices:
ax.axvline(x=timestamps.iloc[idx], ymin=(y_pos-0.4)/4, ymax=(y_pos+0.4)/4,
color=color, linewidth=3, alpha=0.8)
ax.scatter(timestamps.iloc[idx], y_pos, marker='^', s=50, color=color, alpha=0.8)
# Plot exit signals
exit_indices = np.where(exit_signals == -1)[0]
for idx in exit_indices:
ax.axvline(x=timestamps.iloc[idx], ymin=(y_pos-0.4)/4, ymax=(y_pos+0.4)/4,
color=color, linewidth=3, alpha=0.8)
ax.scatter(timestamps.iloc[idx], y_pos, marker='v', s=50, color=color, alpha=0.8)
ax.set_yticks(y_positions)
ax.set_yticklabels(labels)
ax.set_ylabel('Strategy')
ax.set_ylim(0.5, 3.5)
ax.grid(True, alpha=0.3)
# Format x-axis
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.xaxis.set_major_locator(mdates.HourLocator(interval=2))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
# Add legend
from matplotlib.lines import Line2D
legend_elements = [
Line2D([0], [0], marker='^', color='gray', linestyle='None', markersize=8, label='Entry Signal'),
Line2D([0], [0], marker='v', color='gray', linestyle='None', markersize=8, label='Exit Signal')
]
ax.legend(handles=legend_elements, loc='upper right', fontsize=10)
def main():
"""Create and display the comprehensive signal comparison plot."""
plotter = SignalPlotter()
plotter.create_comprehensive_plot()
if __name__ == "__main__":
main()

161
test/test_bbrsi.py Normal file
View File

@@ -0,0 +1,161 @@
import logging
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import datetime
from cycles.utils.storage import Storage
from cycles.Analysis.strategies import Strategy
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("backtest.log"),
logging.StreamHandler()
]
)
config = {
"start_date": "2025-03-01",
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
"data_file": "btcusd_1-min_data.csv"
}
config_strategy = {
"bb_width": 0.05,
"bb_period": 20,
"rsi_period": 14,
"trending": {
"rsi_threshold": [30, 70],
"bb_std_dev_multiplier": 2.5,
},
"sideways": {
"rsi_threshold": [40, 60],
"bb_std_dev_multiplier": 1.8,
},
"strategy_name": "MarketRegimeStrategy", # CryptoTradingStrategy
"SqueezeStrategy": True
}
IS_DAY = False
if __name__ == "__main__":
# Load data
storage = Storage(logging=logging)
data = storage.load_data(config["data_file"], config["start_date"], config["stop_date"])
# Run strategy
strategy = Strategy(config=config_strategy, logging=logging)
processed_data = strategy.run(data.copy(), config_strategy["strategy_name"])
# Get buy and sell signals
buy_condition = processed_data.get('BuySignal', pd.Series(False, index=processed_data.index)).astype(bool)
sell_condition = processed_data.get('SellSignal', pd.Series(False, index=processed_data.index)).astype(bool)
buy_signals = processed_data[buy_condition]
sell_signals = processed_data[sell_condition]
# Plot the data with seaborn library
if processed_data is not None and not processed_data.empty:
# Create a figure with two subplots, sharing the x-axis
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(16, 8), sharex=True)
strategy_name = config_strategy["strategy_name"]
# Plot 1: Close Price and Strategy-Specific Bands/Levels
sns.lineplot(x=processed_data.index, y='close', data=processed_data, label='Close Price', ax=ax1)
# Use standardized column names for bands
if 'UpperBand' in processed_data.columns and 'LowerBand' in processed_data.columns:
# Instead of lines, shade the area between upper and lower bands
ax1.fill_between(processed_data.index,
processed_data['LowerBand'],
processed_data['UpperBand'],
alpha=0.1, color='blue', label='Bollinger Bands')
else:
logging.warning(f"{strategy_name}: UpperBand or LowerBand not found for plotting.")
# Add strategy-specific extra indicators if available
if strategy_name == "CryptoTradingStrategy":
if 'StopLoss' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='StopLoss', data=processed_data, label='Stop Loss', ax=ax1, linestyle='--', color='orange')
if 'TakeProfit' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='TakeProfit', data=processed_data, label='Take Profit', ax=ax1, linestyle='--', color='purple')
# Plot Buy/Sell signals on Price chart
if not buy_signals.empty:
ax1.scatter(buy_signals.index, buy_signals['close'], color='green', marker='o', s=20, label='Buy Signal', zorder=5)
if not sell_signals.empty:
ax1.scatter(sell_signals.index, sell_signals['close'], color='red', marker='o', s=20, label='Sell Signal', zorder=5)
ax1.set_title(f'Price and Signals ({strategy_name})')
ax1.set_ylabel('Price')
ax1.legend()
ax1.grid(True)
# Plot 2: RSI and Strategy-Specific Thresholds
if 'RSI' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='RSI', data=processed_data, label=f'RSI (' + str(config_strategy.get("rsi_period", 14)) + ')', ax=ax2, color='purple')
if strategy_name == "MarketRegimeStrategy":
# Get threshold values
upper_threshold = config_strategy.get("trending", {}).get("rsi_threshold", [30,70])[1]
lower_threshold = config_strategy.get("trending", {}).get("rsi_threshold", [30,70])[0]
# Shade overbought area (upper)
ax2.fill_between(processed_data.index, upper_threshold, 100,
alpha=0.1, color='red', label=f'Overbought (>{upper_threshold})')
# Shade oversold area (lower)
ax2.fill_between(processed_data.index, 0, lower_threshold,
alpha=0.1, color='green', label=f'Oversold (<{lower_threshold})')
elif strategy_name == "CryptoTradingStrategy":
# Shade overbought area (upper)
ax2.fill_between(processed_data.index, 65, 100,
alpha=0.1, color='red', label='Overbought (>65)')
# Shade oversold area (lower)
ax2.fill_between(processed_data.index, 0, 35,
alpha=0.1, color='green', label='Oversold (<35)')
# Plot Buy/Sell signals on RSI chart
if not buy_signals.empty and 'RSI' in buy_signals.columns:
ax2.scatter(buy_signals.index, buy_signals['RSI'], color='green', marker='o', s=20, label='Buy Signal (RSI)', zorder=5)
if not sell_signals.empty and 'RSI' in sell_signals.columns:
ax2.scatter(sell_signals.index, sell_signals['RSI'], color='red', marker='o', s=20, label='Sell Signal (RSI)', zorder=5)
ax2.set_title('Relative Strength Index (RSI) with Signals')
ax2.set_ylabel('RSI Value')
ax2.set_ylim(0, 100)
ax2.legend()
ax2.grid(True)
else:
logging.info("RSI data not available for plotting.")
# Plot 3: Strategy-Specific Indicators
ax3.clear() # Clear previous plot content if any
if 'BBWidth' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='BBWidth', data=processed_data, label='BB Width', ax=ax3)
if strategy_name == "MarketRegimeStrategy":
if 'MarketRegime' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='MarketRegime', data=processed_data, label='Market Regime (Sideways: 1, Trending: 0)', ax=ax3)
ax3.set_title('Bollinger Bands Width & Market Regime')
ax3.set_ylabel('Value')
elif strategy_name == "CryptoTradingStrategy":
if 'VolumeMA' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='VolumeMA', data=processed_data, label='Volume MA', ax=ax3)
if 'volume' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='volume', data=processed_data, label='Volume', ax=ax3, alpha=0.5)
ax3.set_title('Volume Analysis')
ax3.set_ylabel('Volume')
ax3.legend()
ax3.grid(True)
plt.xlabel('Date')
fig.tight_layout()
plt.show()
else:
logging.info("No data to plot.")

View File

@@ -0,0 +1,960 @@
"""
MetaTrend Strategy Comparison Test
This test verifies that our incremental indicators produce identical results
to the original DefaultStrategy (metatrend strategy) implementation.
The test compares:
1. Individual Supertrend indicators (3 different parameter sets)
2. Meta-trend calculation (agreement between all 3 Supertrends)
3. Entry/exit signal generation
4. Overall strategy behavior
Test ensures our incremental implementation is mathematically equivalent
to the original batch calculation approach.
"""
import pandas as pd
import numpy as np
import logging
from typing import Dict, List, Tuple
import os
import sys
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cycles.strategies.default_strategy import DefaultStrategy
from cycles.IncStrategies.indicators.supertrend import SupertrendState, SupertrendCollection
from cycles.Analysis.supertrend import Supertrends
from cycles.backtest import Backtest
from cycles.utils.storage import Storage
from cycles.IncStrategies.metatrend_strategy import IncMetaTrendStrategy
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class MetaTrendComparisonTest:
"""
Comprehensive test suite for comparing original and incremental MetaTrend implementations.
"""
def __init__(self):
"""Initialize the test suite."""
self.test_data = None
self.original_results = None
self.incremental_results = None
self.incremental_strategy_results = None
self.storage = Storage(logging=logger)
# Supertrend parameters from original implementation
self.supertrend_params = [
{"period": 12, "multiplier": 3.0},
{"period": 10, "multiplier": 1.0},
{"period": 11, "multiplier": 2.0}
]
def load_test_data(self, symbol: str = "BTCUSD", start_date: str = "2022-01-01", end_date: str = "2023-01-01", limit: int = None) -> pd.DataFrame:
"""
Load test data for comparison using the Storage class.
Args:
symbol: Trading symbol to load (used for filename)
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
limit: Optional limit on number of data points (applied after date filtering)
Returns:
DataFrame with OHLCV data
"""
logger.info(f"Loading test data for {symbol} from {start_date} to {end_date}")
try:
# Use the Storage class to load data with date filtering
filename = "btcusd_1-min_data.csv"
# Convert date strings to pandas datetime
start_dt = pd.to_datetime(start_date)
end_dt = pd.to_datetime(end_date)
# Load data using Storage class
df = self.storage.load_data(filename, start_dt, end_dt)
if df.empty:
raise ValueError(f"No data found for the specified date range: {start_date} to {end_date}")
logger.info(f"Loaded {len(df)} data points from {start_date} to {end_date}")
logger.info(f"Date range in data: {df.index.min()} to {df.index.max()}")
# Apply limit if specified
if limit is not None and len(df) > limit:
df = df.tail(limit)
logger.info(f"Limited data to last {limit} points")
# Ensure required columns (Storage class should handle column name conversion)
required_cols = ['open', 'high', 'low', 'close', 'volume']
for col in required_cols:
if col not in df.columns:
if col == 'volume':
df['volume'] = 1000.0 # Default volume
else:
raise ValueError(f"Missing required column: {col}")
# Reset index to get timestamp as column for incremental processing
df_with_timestamp = df.reset_index()
self.test_data = df_with_timestamp
logger.info(f"Test data prepared: {len(df_with_timestamp)} rows")
logger.info(f"Columns: {list(df_with_timestamp.columns)}")
logger.info(f"Sample data:\n{df_with_timestamp.head()}")
return df_with_timestamp
except Exception as e:
logger.error(f"Failed to load test data: {e}")
import traceback
traceback.print_exc()
# Fallback to synthetic data if real data loading fails
logger.warning("Falling back to synthetic data generation")
df = self._generate_synthetic_data(limit or 1000)
df_with_timestamp = df.reset_index()
self.test_data = df_with_timestamp
return df_with_timestamp
def _generate_synthetic_data(self, length: int) -> pd.DataFrame:
"""Generate synthetic OHLCV data for testing."""
logger.info(f"Generating {length} synthetic data points")
np.random.seed(42) # For reproducible results
# Generate price series with trend and noise
base_price = 50000.0
trend = np.linspace(0, 0.1, length) # Slight upward trend
noise = np.random.normal(0, 0.02, length) # 2% volatility
close_prices = base_price * (1 + trend + noise.cumsum() * 0.1)
# Generate OHLC from close prices
data = []
timestamps = pd.date_range(start='2024-01-01', periods=length, freq='1min')
for i in range(length):
close = close_prices[i]
volatility = close * 0.01 # 1% intraday volatility
high = close + np.random.uniform(0, volatility)
low = close - np.random.uniform(0, volatility)
open_price = low + np.random.uniform(0, high - low)
# Ensure OHLC relationships
high = max(high, open_price, close)
low = min(low, open_price, close)
data.append({
'timestamp': timestamps[i],
'open': open_price,
'high': high,
'low': low,
'close': close,
'volume': np.random.uniform(100, 1000)
})
df = pd.DataFrame(data)
# Set timestamp as index for compatibility with original strategy
df.set_index('timestamp', inplace=True)
return df
def test_original_strategy(self) -> Dict:
"""
Test the original DefaultStrategy implementation.
Returns:
Dictionary with original strategy results
"""
logger.info("Testing original DefaultStrategy implementation...")
try:
# Create indexed DataFrame for original strategy (needs DatetimeIndex)
indexed_data = self.test_data.set_index('timestamp')
# The original strategy limits data to 200 points for performance
# We need to account for this in our comparison
if len(indexed_data) > 200:
original_data_used = indexed_data.tail(200)
logger.info(f"Original strategy will use last {len(original_data_used)} points of {len(indexed_data)} total points")
else:
original_data_used = indexed_data
# Create a minimal backtest instance for strategy initialization
class MockBacktester:
def __init__(self, df):
self.original_df = df
self.min1_df = df
self.strategies = {}
backtester = MockBacktester(original_data_used)
# Initialize original strategy
strategy = DefaultStrategy(weight=1.0, params={
"stop_loss_pct": 0.03,
"timeframe": "1min" # Use 1min since our test data is 1min
})
# Initialize strategy (this calculates meta-trend)
strategy.initialize(backtester)
# Extract results
if hasattr(strategy, 'meta_trend') and strategy.meta_trend is not None:
meta_trend = strategy.meta_trend
trends = None # Individual trends not directly available from strategy
else:
# Fallback: calculate manually using original Supertrends class
logger.info("Strategy meta_trend not available, calculating manually...")
supertrends = Supertrends(original_data_used, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
# Extract trend arrays
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
# Calculate meta-trend
meta_trend = np.where(
(trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0],
0
)
# Generate signals
entry_signals = []
exit_signals = []
for i in range(1, len(meta_trend)):
# Entry signal: meta-trend changes from != 1 to == 1
if meta_trend[i-1] != 1 and meta_trend[i] == 1:
entry_signals.append(i)
# Exit signal: meta-trend changes to -1
if meta_trend[i-1] != -1 and meta_trend[i] == -1:
exit_signals.append(i)
self.original_results = {
'meta_trend': meta_trend,
'entry_signals': entry_signals,
'exit_signals': exit_signals,
'individual_trends': trends,
'data_start_index': len(self.test_data) - len(original_data_used) # Track where original data starts
}
logger.info(f"Original strategy: {len(entry_signals)} entry signals, {len(exit_signals)} exit signals")
logger.info(f"Meta-trend length: {len(meta_trend)}, unique values: {np.unique(meta_trend)}")
return self.original_results
except Exception as e:
logger.error(f"Original strategy test failed: {e}")
import traceback
traceback.print_exc()
raise
def test_incremental_indicators(self) -> Dict:
"""
Test the incremental indicators implementation.
Returns:
Dictionary with incremental results
"""
logger.info("Testing incremental indicators implementation...")
try:
# Create SupertrendCollection with same parameters as original
supertrend_configs = [
(params["period"], params["multiplier"])
for params in self.supertrend_params
]
collection = SupertrendCollection(supertrend_configs)
# Determine data range to match original strategy
data_start_index = self.original_results.get('data_start_index', 0)
test_data_subset = self.test_data.iloc[data_start_index:]
logger.info(f"Processing incremental indicators on {len(test_data_subset)} points (starting from index {data_start_index})")
# Process data incrementally
meta_trends = []
individual_trends_list = []
for _, row in test_data_subset.iterrows():
ohlc = {
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close']
}
result = collection.update(ohlc)
meta_trends.append(result['meta_trend'])
individual_trends_list.append(result['trends'])
meta_trend = np.array(meta_trends)
individual_trends = np.array(individual_trends_list)
# Generate signals
entry_signals = []
exit_signals = []
for i in range(1, len(meta_trend)):
# Entry signal: meta-trend changes from != 1 to == 1
if meta_trend[i-1] != 1 and meta_trend[i] == 1:
entry_signals.append(i)
# Exit signal: meta-trend changes to -1
if meta_trend[i-1] != -1 and meta_trend[i] == -1:
exit_signals.append(i)
self.incremental_results = {
'meta_trend': meta_trend,
'entry_signals': entry_signals,
'exit_signals': exit_signals,
'individual_trends': individual_trends
}
logger.info(f"Incremental indicators: {len(entry_signals)} entry signals, {len(exit_signals)} exit signals")
return self.incremental_results
except Exception as e:
logger.error(f"Incremental indicators test failed: {e}")
raise
def test_incremental_strategy(self) -> Dict:
"""
Test the new IncMetaTrendStrategy implementation.
Returns:
Dictionary with incremental strategy results
"""
logger.info("Testing IncMetaTrendStrategy implementation...")
try:
# Create strategy instance
strategy = IncMetaTrendStrategy("metatrend", weight=1.0, params={
"timeframe": "1min", # Use 1min since our test data is 1min
"enable_logging": False # Disable logging for cleaner test output
})
# Determine data range to match original strategy
data_start_index = self.original_results.get('data_start_index', 0)
test_data_subset = self.test_data.iloc[data_start_index:]
logger.info(f"Processing IncMetaTrendStrategy on {len(test_data_subset)} points (starting from index {data_start_index})")
# Process data incrementally
meta_trends = []
individual_trends_list = []
entry_signals = []
exit_signals = []
for idx, row in test_data_subset.iterrows():
ohlc = {
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close']
}
# Update strategy with new data point
strategy.calculate_on_data(ohlc, row['timestamp'])
# Get current meta-trend and individual trends
current_meta_trend = strategy.get_current_meta_trend()
meta_trends.append(current_meta_trend)
# Get individual Supertrend states
individual_states = strategy.get_individual_supertrend_states()
if individual_states and len(individual_states) >= 3:
individual_trends = [state.get('current_trend', 0) for state in individual_states]
else:
# Fallback: extract from collection state
collection_state = strategy.supertrend_collection.get_state_summary()
if 'supertrends' in collection_state:
individual_trends = [st.get('current_trend', 0) for st in collection_state['supertrends']]
else:
individual_trends = [0, 0, 0] # Default if not available
individual_trends_list.append(individual_trends)
# Check for signals
entry_signal = strategy.get_entry_signal()
exit_signal = strategy.get_exit_signal()
if entry_signal.signal_type == "ENTRY":
entry_signals.append(len(meta_trends) - 1) # Current index
if exit_signal.signal_type == "EXIT":
exit_signals.append(len(meta_trends) - 1) # Current index
meta_trend = np.array(meta_trends)
individual_trends = np.array(individual_trends_list)
self.incremental_strategy_results = {
'meta_trend': meta_trend,
'entry_signals': entry_signals,
'exit_signals': exit_signals,
'individual_trends': individual_trends,
'strategy_state': strategy.get_current_state_summary()
}
logger.info(f"IncMetaTrendStrategy: {len(entry_signals)} entry signals, {len(exit_signals)} exit signals")
logger.info(f"Strategy state: warmed_up={strategy.is_warmed_up}, updates={strategy._update_count}")
return self.incremental_strategy_results
except Exception as e:
logger.error(f"IncMetaTrendStrategy test failed: {e}")
import traceback
traceback.print_exc()
raise
def compare_results(self) -> Dict[str, bool]:
"""
Compare original, incremental indicators, and incremental strategy results.
Returns:
Dictionary with comparison results
"""
logger.info("Comparing original vs incremental results...")
if self.original_results is None or self.incremental_results is None:
raise ValueError("Must run both tests before comparison")
comparison = {}
# Compare meta-trend arrays (Original vs SupertrendCollection)
orig_meta = self.original_results['meta_trend']
inc_meta = self.incremental_results['meta_trend']
# Handle length differences (original might be shorter due to initialization)
min_length = min(len(orig_meta), len(inc_meta))
orig_meta_trimmed = orig_meta[-min_length:]
inc_meta_trimmed = inc_meta[-min_length:]
meta_trend_match = np.array_equal(orig_meta_trimmed, inc_meta_trimmed)
comparison['meta_trend_match'] = meta_trend_match
if not meta_trend_match:
# Find differences
diff_indices = np.where(orig_meta_trimmed != inc_meta_trimmed)[0]
logger.warning(f"Meta-trend differences at indices: {diff_indices[:10]}...") # Show first 10
# Show some examples
for i in diff_indices[:5]:
logger.warning(f"Index {i}: Original={orig_meta_trimmed[i]}, Incremental={inc_meta_trimmed[i]}")
# Compare with IncMetaTrendStrategy if available
if self.incremental_strategy_results is not None:
strategy_meta = self.incremental_strategy_results['meta_trend']
# Compare Original vs IncMetaTrendStrategy
strategy_min_length = min(len(orig_meta), len(strategy_meta))
orig_strategy_trimmed = orig_meta[-strategy_min_length:]
strategy_meta_trimmed = strategy_meta[-strategy_min_length:]
strategy_meta_trend_match = np.array_equal(orig_strategy_trimmed, strategy_meta_trimmed)
comparison['strategy_meta_trend_match'] = strategy_meta_trend_match
if not strategy_meta_trend_match:
diff_indices = np.where(orig_strategy_trimmed != strategy_meta_trimmed)[0]
logger.warning(f"Strategy meta-trend differences at indices: {diff_indices[:10]}...")
for i in diff_indices[:5]:
logger.warning(f"Index {i}: Original={orig_strategy_trimmed[i]}, Strategy={strategy_meta_trimmed[i]}")
# Compare SupertrendCollection vs IncMetaTrendStrategy
collection_strategy_min_length = min(len(inc_meta), len(strategy_meta))
inc_collection_trimmed = inc_meta[-collection_strategy_min_length:]
strategy_collection_trimmed = strategy_meta[-collection_strategy_min_length:]
collection_strategy_match = np.array_equal(inc_collection_trimmed, strategy_collection_trimmed)
comparison['collection_strategy_match'] = collection_strategy_match
if not collection_strategy_match:
diff_indices = np.where(inc_collection_trimmed != strategy_collection_trimmed)[0]
logger.warning(f"Collection vs Strategy differences at indices: {diff_indices[:10]}...")
# Compare individual trends if available
if (self.original_results['individual_trends'] is not None and
self.incremental_results['individual_trends'] is not None):
orig_trends = self.original_results['individual_trends']
inc_trends = self.incremental_results['individual_trends']
# Trim to same length
orig_trends_trimmed = orig_trends[-min_length:]
inc_trends_trimmed = inc_trends[-min_length:]
individual_trends_match = np.array_equal(orig_trends_trimmed, inc_trends_trimmed)
comparison['individual_trends_match'] = individual_trends_match
if not individual_trends_match:
logger.warning("Individual trends do not match")
# Check each Supertrend separately
for st_idx in range(3):
st_match = np.array_equal(orig_trends_trimmed[:, st_idx], inc_trends_trimmed[:, st_idx])
comparison[f'supertrend_{st_idx}_match'] = st_match
if not st_match:
diff_indices = np.where(orig_trends_trimmed[:, st_idx] != inc_trends_trimmed[:, st_idx])[0]
logger.warning(f"Supertrend {st_idx} differences at indices: {diff_indices[:5]}...")
# Compare signals (Original vs SupertrendCollection)
orig_entry = set(self.original_results['entry_signals'])
inc_entry = set(self.incremental_results['entry_signals'])
entry_signals_match = orig_entry == inc_entry
comparison['entry_signals_match'] = entry_signals_match
if not entry_signals_match:
logger.warning(f"Entry signals differ: Original={orig_entry}, Incremental={inc_entry}")
orig_exit = set(self.original_results['exit_signals'])
inc_exit = set(self.incremental_results['exit_signals'])
exit_signals_match = orig_exit == inc_exit
comparison['exit_signals_match'] = exit_signals_match
if not exit_signals_match:
logger.warning(f"Exit signals differ: Original={orig_exit}, Incremental={inc_exit}")
# Compare signals with IncMetaTrendStrategy if available
if self.incremental_strategy_results is not None:
strategy_entry = set(self.incremental_strategy_results['entry_signals'])
strategy_exit = set(self.incremental_strategy_results['exit_signals'])
# Original vs Strategy signals
strategy_entry_signals_match = orig_entry == strategy_entry
strategy_exit_signals_match = orig_exit == strategy_exit
comparison['strategy_entry_signals_match'] = strategy_entry_signals_match
comparison['strategy_exit_signals_match'] = strategy_exit_signals_match
if not strategy_entry_signals_match:
logger.warning(f"Strategy entry signals differ: Original={orig_entry}, Strategy={strategy_entry}")
if not strategy_exit_signals_match:
logger.warning(f"Strategy exit signals differ: Original={orig_exit}, Strategy={strategy_exit}")
# Collection vs Strategy signals
collection_strategy_entry_match = inc_entry == strategy_entry
collection_strategy_exit_match = inc_exit == strategy_exit
comparison['collection_strategy_entry_match'] = collection_strategy_entry_match
comparison['collection_strategy_exit_match'] = collection_strategy_exit_match
# Overall match (Original vs SupertrendCollection)
comparison['overall_match'] = all([
meta_trend_match,
entry_signals_match,
exit_signals_match
])
# Overall strategy match (Original vs IncMetaTrendStrategy)
if self.incremental_strategy_results is not None:
comparison['strategy_overall_match'] = all([
comparison.get('strategy_meta_trend_match', False),
comparison.get('strategy_entry_signals_match', False),
comparison.get('strategy_exit_signals_match', False)
])
return comparison
def save_detailed_comparison(self, filename: str = "metatrend_comparison.csv"):
"""Save detailed comparison data to CSV for analysis."""
if self.original_results is None or self.incremental_results is None:
logger.warning("No results to save")
return
# Prepare comparison DataFrame
orig_meta = self.original_results['meta_trend']
inc_meta = self.incremental_results['meta_trend']
min_length = min(len(orig_meta), len(inc_meta))
# Get the correct data range for timestamps and prices
data_start_index = self.original_results.get('data_start_index', 0)
comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length]
comparison_df = pd.DataFrame({
'timestamp': comparison_data['timestamp'].values,
'close': comparison_data['close'].values,
'original_meta_trend': orig_meta[:min_length],
'incremental_meta_trend': inc_meta[:min_length],
'meta_trend_match': orig_meta[:min_length] == inc_meta[:min_length]
})
# Add individual trends if available
if (self.original_results['individual_trends'] is not None and
self.incremental_results['individual_trends'] is not None):
orig_trends = self.original_results['individual_trends'][:min_length]
inc_trends = self.incremental_results['individual_trends'][:min_length]
for i in range(3):
comparison_df[f'original_st{i}_trend'] = orig_trends[:, i]
comparison_df[f'incremental_st{i}_trend'] = inc_trends[:, i]
comparison_df[f'st{i}_trend_match'] = orig_trends[:, i] == inc_trends[:, i]
# Save to results directory
os.makedirs("results", exist_ok=True)
filepath = os.path.join("results", filename)
comparison_df.to_csv(filepath, index=False)
logger.info(f"Detailed comparison saved to {filepath}")
def save_trend_changes_analysis(self, filename_prefix: str = "trend_changes"):
"""Save detailed trend changes analysis for manual comparison."""
if self.original_results is None or self.incremental_results is None:
logger.warning("No results to save")
return
# Get the correct data range
data_start_index = self.original_results.get('data_start_index', 0)
orig_meta = self.original_results['meta_trend']
inc_meta = self.incremental_results['meta_trend']
min_length = min(len(orig_meta), len(inc_meta))
comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length]
# Analyze original trend changes
original_changes = []
for i in range(1, len(orig_meta)):
if orig_meta[i] != orig_meta[i-1]:
original_changes.append({
'index': i,
'timestamp': comparison_data.iloc[i]['timestamp'],
'close_price': comparison_data.iloc[i]['close'],
'prev_trend': orig_meta[i-1],
'new_trend': orig_meta[i],
'change_type': self._get_change_type(orig_meta[i-1], orig_meta[i])
})
# Analyze incremental trend changes
incremental_changes = []
for i in range(1, len(inc_meta)):
if inc_meta[i] != inc_meta[i-1]:
incremental_changes.append({
'index': i,
'timestamp': comparison_data.iloc[i]['timestamp'],
'close_price': comparison_data.iloc[i]['close'],
'prev_trend': inc_meta[i-1],
'new_trend': inc_meta[i],
'change_type': self._get_change_type(inc_meta[i-1], inc_meta[i])
})
# Save original trend changes
os.makedirs("results", exist_ok=True)
original_df = pd.DataFrame(original_changes)
original_file = os.path.join("results", f"{filename_prefix}_original.csv")
original_df.to_csv(original_file, index=False)
logger.info(f"Original trend changes saved to {original_file} ({len(original_changes)} changes)")
# Save incremental trend changes
incremental_df = pd.DataFrame(incremental_changes)
incremental_file = os.path.join("results", f"{filename_prefix}_incremental.csv")
incremental_df.to_csv(incremental_file, index=False)
logger.info(f"Incremental trend changes saved to {incremental_file} ({len(incremental_changes)} changes)")
# Create side-by-side comparison
comparison_changes = []
max_changes = max(len(original_changes), len(incremental_changes))
for i in range(max_changes):
orig_change = original_changes[i] if i < len(original_changes) else {}
inc_change = incremental_changes[i] if i < len(incremental_changes) else {}
comparison_changes.append({
'change_num': i + 1,
'orig_index': orig_change.get('index', ''),
'orig_timestamp': orig_change.get('timestamp', ''),
'orig_close': orig_change.get('close_price', ''),
'orig_prev_trend': orig_change.get('prev_trend', ''),
'orig_new_trend': orig_change.get('new_trend', ''),
'orig_change_type': orig_change.get('change_type', ''),
'inc_index': inc_change.get('index', ''),
'inc_timestamp': inc_change.get('timestamp', ''),
'inc_close': inc_change.get('close_price', ''),
'inc_prev_trend': inc_change.get('prev_trend', ''),
'inc_new_trend': inc_change.get('new_trend', ''),
'inc_change_type': inc_change.get('change_type', ''),
'match': (orig_change.get('index') == inc_change.get('index') and
orig_change.get('new_trend') == inc_change.get('new_trend')) if orig_change and inc_change else False
})
comparison_df = pd.DataFrame(comparison_changes)
comparison_file = os.path.join("results", f"{filename_prefix}_comparison.csv")
comparison_df.to_csv(comparison_file, index=False)
logger.info(f"Side-by-side comparison saved to {comparison_file}")
# Create summary statistics
summary = {
'original_total_changes': len(original_changes),
'incremental_total_changes': len(incremental_changes),
'original_entry_signals': len([c for c in original_changes if c['change_type'] == 'ENTRY']),
'incremental_entry_signals': len([c for c in incremental_changes if c['change_type'] == 'ENTRY']),
'original_exit_signals': len([c for c in original_changes if c['change_type'] == 'EXIT']),
'incremental_exit_signals': len([c for c in incremental_changes if c['change_type'] == 'EXIT']),
'original_to_neutral': len([c for c in original_changes if c['new_trend'] == 0]),
'incremental_to_neutral': len([c for c in incremental_changes if c['new_trend'] == 0]),
'matching_changes': len([c for c in comparison_changes if c['match']]),
'total_comparison_points': max_changes
}
summary_file = os.path.join("results", f"{filename_prefix}_summary.json")
import json
with open(summary_file, 'w') as f:
json.dump(summary, f, indent=2)
logger.info(f"Summary statistics saved to {summary_file}")
return {
'original_changes': original_changes,
'incremental_changes': incremental_changes,
'summary': summary
}
def _get_change_type(self, prev_trend: float, new_trend: float) -> str:
"""Classify the type of trend change."""
if prev_trend != 1 and new_trend == 1:
return 'ENTRY'
elif prev_trend != -1 and new_trend == -1:
return 'EXIT'
elif new_trend == 0:
return 'TO_NEUTRAL'
elif prev_trend == 0 and new_trend != 0:
return 'FROM_NEUTRAL'
else:
return 'OTHER'
def save_individual_supertrend_analysis(self, filename_prefix: str = "supertrend_individual"):
"""Save detailed analysis of individual Supertrend indicators."""
if (self.original_results is None or self.incremental_results is None or
self.original_results['individual_trends'] is None or
self.incremental_results['individual_trends'] is None):
logger.warning("Individual trends data not available")
return
data_start_index = self.original_results.get('data_start_index', 0)
orig_trends = self.original_results['individual_trends']
inc_trends = self.incremental_results['individual_trends']
min_length = min(len(orig_trends), len(inc_trends))
comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length]
# Analyze each Supertrend indicator separately
for st_idx in range(3):
st_params = self.supertrend_params[st_idx]
st_name = f"ST{st_idx}_P{st_params['period']}_M{st_params['multiplier']}"
# Original Supertrend changes
orig_st_changes = []
for i in range(1, len(orig_trends)):
if orig_trends[i, st_idx] != orig_trends[i-1, st_idx]:
orig_st_changes.append({
'index': i,
'timestamp': comparison_data.iloc[i]['timestamp'],
'close_price': comparison_data.iloc[i]['close'],
'prev_trend': orig_trends[i-1, st_idx],
'new_trend': orig_trends[i, st_idx],
'change_type': 'UP' if orig_trends[i, st_idx] == 1 else 'DOWN'
})
# Incremental Supertrend changes
inc_st_changes = []
for i in range(1, len(inc_trends)):
if inc_trends[i, st_idx] != inc_trends[i-1, st_idx]:
inc_st_changes.append({
'index': i,
'timestamp': comparison_data.iloc[i]['timestamp'],
'close_price': comparison_data.iloc[i]['close'],
'prev_trend': inc_trends[i-1, st_idx],
'new_trend': inc_trends[i, st_idx],
'change_type': 'UP' if inc_trends[i, st_idx] == 1 else 'DOWN'
})
# Save individual Supertrend analysis
os.makedirs("results", exist_ok=True)
# Original
orig_df = pd.DataFrame(orig_st_changes)
orig_file = os.path.join("results", f"{filename_prefix}_{st_name}_original.csv")
orig_df.to_csv(orig_file, index=False)
# Incremental
inc_df = pd.DataFrame(inc_st_changes)
inc_file = os.path.join("results", f"{filename_prefix}_{st_name}_incremental.csv")
inc_df.to_csv(inc_file, index=False)
logger.info(f"Supertrend {st_idx} analysis: Original={len(orig_st_changes)} changes, Incremental={len(inc_st_changes)} changes")
def save_full_timeline_data(self, filename: str = "full_timeline_comparison.csv"):
"""Save complete timeline data with all values for manual analysis."""
if self.original_results is None or self.incremental_results is None:
logger.warning("No results to save")
return
data_start_index = self.original_results.get('data_start_index', 0)
orig_meta = self.original_results['meta_trend']
inc_meta = self.incremental_results['meta_trend']
min_length = min(len(orig_meta), len(inc_meta))
comparison_data = self.test_data.iloc[data_start_index:data_start_index + min_length]
# Create comprehensive timeline
timeline_data = []
for i in range(min_length):
row_data = {
'index': i,
'timestamp': comparison_data.iloc[i]['timestamp'],
'open': comparison_data.iloc[i]['open'],
'high': comparison_data.iloc[i]['high'],
'low': comparison_data.iloc[i]['low'],
'close': comparison_data.iloc[i]['close'],
'original_meta_trend': orig_meta[i],
'incremental_meta_trend': inc_meta[i],
'meta_trend_match': orig_meta[i] == inc_meta[i],
'meta_trend_diff': abs(orig_meta[i] - inc_meta[i])
}
# Add individual Supertrend data if available
if (self.original_results['individual_trends'] is not None and
self.incremental_results['individual_trends'] is not None):
orig_trends = self.original_results['individual_trends']
inc_trends = self.incremental_results['individual_trends']
for st_idx in range(3):
st_params = self.supertrend_params[st_idx]
prefix = f"ST{st_idx}_P{st_params['period']}_M{st_params['multiplier']}"
row_data[f'{prefix}_orig'] = orig_trends[i, st_idx]
row_data[f'{prefix}_inc'] = inc_trends[i, st_idx]
row_data[f'{prefix}_match'] = orig_trends[i, st_idx] == inc_trends[i, st_idx]
# Mark trend changes
if i > 0:
row_data['orig_meta_changed'] = orig_meta[i] != orig_meta[i-1]
row_data['inc_meta_changed'] = inc_meta[i] != inc_meta[i-1]
row_data['orig_change_type'] = self._get_change_type(orig_meta[i-1], orig_meta[i]) if orig_meta[i] != orig_meta[i-1] else ''
row_data['inc_change_type'] = self._get_change_type(inc_meta[i-1], inc_meta[i]) if inc_meta[i] != inc_meta[i-1] else ''
else:
row_data['orig_meta_changed'] = False
row_data['inc_meta_changed'] = False
row_data['orig_change_type'] = ''
row_data['inc_change_type'] = ''
timeline_data.append(row_data)
# Save timeline data
os.makedirs("results", exist_ok=True)
timeline_df = pd.DataFrame(timeline_data)
filepath = os.path.join("results", filename)
timeline_df.to_csv(filepath, index=False)
logger.info(f"Full timeline comparison saved to {filepath} ({len(timeline_data)} rows)")
return timeline_df
def run_full_test(self, symbol: str = "BTCUSD", start_date: str = "2022-01-01", end_date: str = "2023-01-01", limit: int = None) -> bool:
"""
Run the complete comparison test.
Args:
symbol: Trading symbol to test
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
limit: Optional limit on number of data points (applied after date filtering)
Returns:
True if all tests pass, False otherwise
"""
logger.info("=" * 60)
logger.info("STARTING METATREND STRATEGY COMPARISON TEST")
logger.info("=" * 60)
try:
# Load test data
self.load_test_data(symbol, start_date, end_date, limit)
logger.info(f"Test data loaded: {len(self.test_data)} points")
# Test original strategy
logger.info("\n" + "-" * 40)
logger.info("TESTING ORIGINAL STRATEGY")
logger.info("-" * 40)
self.test_original_strategy()
# Test incremental indicators
logger.info("\n" + "-" * 40)
logger.info("TESTING INCREMENTAL INDICATORS")
logger.info("-" * 40)
self.test_incremental_indicators()
# Test incremental strategy
logger.info("\n" + "-" * 40)
logger.info("TESTING INCREMENTAL STRATEGY")
logger.info("-" * 40)
self.test_incremental_strategy()
# Compare results
logger.info("\n" + "-" * 40)
logger.info("COMPARING RESULTS")
logger.info("-" * 40)
comparison = self.compare_results()
# Save detailed comparison
self.save_detailed_comparison()
# Save trend changes analysis
self.save_trend_changes_analysis()
# Save individual supertrend analysis
self.save_individual_supertrend_analysis()
# Save full timeline data
self.save_full_timeline_data()
# Print results
logger.info("\n" + "=" * 60)
logger.info("COMPARISON RESULTS")
logger.info("=" * 60)
for key, value in comparison.items():
status = "✅ PASS" if value else "❌ FAIL"
logger.info(f"{key}: {status}")
overall_pass = comparison.get('overall_match', False)
if overall_pass:
logger.info("\n🎉 ALL TESTS PASSED! Incremental indicators match original strategy.")
else:
logger.error("\n❌ TESTS FAILED! Incremental indicators do not match original strategy.")
return overall_pass
except Exception as e:
logger.error(f"Test failed with error: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Run the MetaTrend comparison test."""
test = MetaTrendComparisonTest()
# Run test with real BTCUSD data from 2022-01-01 to 2023-01-01
logger.info(f"\n{'='*80}")
logger.info(f"RUNNING METATREND COMPARISON TEST")
logger.info(f"Using real BTCUSD data from 2022-01-01 to 2023-01-01")
logger.info(f"{'='*80}")
# Test with the full year of data (no limit)
passed = test.run_full_test("BTCUSD", "2022-01-01", "2023-01-01", limit=None)
if passed:
logger.info("\n🎉 TEST PASSED! Incremental indicators match original strategy.")
else:
logger.error("\n❌ TEST FAILED! Incremental indicators do not match original strategy.")
return passed
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)

View File

@@ -0,0 +1,406 @@
"""
Signal Comparison Test
This test compares the exact signals generated by:
1. Original DefaultStrategy
2. Incremental IncMetaTrendStrategy
Focus is on signal timing, type, and accuracy.
"""
import pandas as pd
import numpy as np
import logging
from typing import Dict, List, Tuple
import os
import sys
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cycles.strategies.default_strategy import DefaultStrategy
from cycles.IncStrategies.metatrend_strategy import IncMetaTrendStrategy
from cycles.utils.storage import Storage
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class SignalComparisonTest:
"""Test to compare signals between original and incremental strategies."""
def __init__(self):
"""Initialize the signal comparison test."""
self.storage = Storage(logging=logger)
self.test_data = None
self.original_signals = []
self.incremental_signals = []
def load_test_data(self, limit: int = 500) -> pd.DataFrame:
"""Load a small dataset for signal testing."""
logger.info(f"Loading test data (limit: {limit} points)")
try:
# Load recent data
filename = "btcusd_1-min_data.csv"
start_date = pd.to_datetime("2022-12-31")
end_date = pd.to_datetime("2023-01-01")
df = self.storage.load_data(filename, start_date, end_date)
if len(df) > limit:
df = df.tail(limit)
logger.info(f"Limited data to last {limit} points")
# Reset index to get timestamp as column
df_with_timestamp = df.reset_index()
self.test_data = df_with_timestamp
logger.info(f"Loaded {len(df_with_timestamp)} data points")
logger.info(f"Date range: {df_with_timestamp['timestamp'].min()} to {df_with_timestamp['timestamp'].max()}")
return df_with_timestamp
except Exception as e:
logger.error(f"Failed to load test data: {e}")
raise
def test_original_strategy_signals(self) -> List[Dict]:
"""Test original DefaultStrategy and extract all signals."""
logger.info("Testing Original DefaultStrategy signals...")
# Create indexed DataFrame for original strategy
indexed_data = self.test_data.set_index('timestamp')
# Limit to 200 points like original strategy does
if len(indexed_data) > 200:
original_data_used = indexed_data.tail(200)
data_start_index = len(self.test_data) - 200
else:
original_data_used = indexed_data
data_start_index = 0
# Create mock backtester
class MockBacktester:
def __init__(self, df):
self.original_df = df
self.min1_df = df
self.strategies = {}
backtester = MockBacktester(original_data_used)
# Initialize original strategy
strategy = DefaultStrategy(weight=1.0, params={
"stop_loss_pct": 0.03,
"timeframe": "1min"
})
strategy.initialize(backtester)
# Extract signals by simulating the strategy step by step
signals = []
for i in range(len(original_data_used)):
# Get entry signal
entry_signal = strategy.get_entry_signal(backtester, i)
if entry_signal.signal_type == "ENTRY":
signals.append({
'index': i,
'global_index': data_start_index + i,
'timestamp': original_data_used.index[i],
'close': original_data_used.iloc[i]['close'],
'signal_type': 'ENTRY',
'confidence': entry_signal.confidence,
'metadata': entry_signal.metadata,
'source': 'original'
})
# Get exit signal
exit_signal = strategy.get_exit_signal(backtester, i)
if exit_signal.signal_type == "EXIT":
signals.append({
'index': i,
'global_index': data_start_index + i,
'timestamp': original_data_used.index[i],
'close': original_data_used.iloc[i]['close'],
'signal_type': 'EXIT',
'confidence': exit_signal.confidence,
'metadata': exit_signal.metadata,
'source': 'original'
})
self.original_signals = signals
logger.info(f"Original strategy generated {len(signals)} signals")
return signals
def test_incremental_strategy_signals(self) -> List[Dict]:
"""Test incremental IncMetaTrendStrategy and extract all signals."""
logger.info("Testing Incremental IncMetaTrendStrategy signals...")
# Create strategy instance
strategy = IncMetaTrendStrategy("metatrend", weight=1.0, params={
"timeframe": "1min",
"enable_logging": False
})
# Determine data range to match original strategy
if len(self.test_data) > 200:
test_data_subset = self.test_data.tail(200)
data_start_index = len(self.test_data) - 200
else:
test_data_subset = self.test_data
data_start_index = 0
# Process data incrementally and collect signals
signals = []
for idx, (_, row) in enumerate(test_data_subset.iterrows()):
ohlc = {
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close']
}
# Update strategy with new data point
strategy.calculate_on_data(ohlc, row['timestamp'])
# Check for entry signal
entry_signal = strategy.get_entry_signal()
if entry_signal.signal_type == "ENTRY":
signals.append({
'index': idx,
'global_index': data_start_index + idx,
'timestamp': row['timestamp'],
'close': row['close'],
'signal_type': 'ENTRY',
'confidence': entry_signal.confidence,
'metadata': entry_signal.metadata,
'source': 'incremental'
})
# Check for exit signal
exit_signal = strategy.get_exit_signal()
if exit_signal.signal_type == "EXIT":
signals.append({
'index': idx,
'global_index': data_start_index + idx,
'timestamp': row['timestamp'],
'close': row['close'],
'signal_type': 'EXIT',
'confidence': exit_signal.confidence,
'metadata': exit_signal.metadata,
'source': 'incremental'
})
self.incremental_signals = signals
logger.info(f"Incremental strategy generated {len(signals)} signals")
return signals
def compare_signals(self) -> Dict:
"""Compare signals between original and incremental strategies."""
logger.info("Comparing signals between strategies...")
if not self.original_signals or not self.incremental_signals:
raise ValueError("Must run both signal tests before comparison")
# Separate by signal type
orig_entry = [s for s in self.original_signals if s['signal_type'] == 'ENTRY']
orig_exit = [s for s in self.original_signals if s['signal_type'] == 'EXIT']
inc_entry = [s for s in self.incremental_signals if s['signal_type'] == 'ENTRY']
inc_exit = [s for s in self.incremental_signals if s['signal_type'] == 'EXIT']
# Compare counts
comparison = {
'original_total': len(self.original_signals),
'incremental_total': len(self.incremental_signals),
'original_entry_count': len(orig_entry),
'original_exit_count': len(orig_exit),
'incremental_entry_count': len(inc_entry),
'incremental_exit_count': len(inc_exit),
'entry_count_match': len(orig_entry) == len(inc_entry),
'exit_count_match': len(orig_exit) == len(inc_exit),
'total_count_match': len(self.original_signals) == len(self.incremental_signals)
}
# Compare signal timing (by index)
orig_entry_indices = set(s['index'] for s in orig_entry)
orig_exit_indices = set(s['index'] for s in orig_exit)
inc_entry_indices = set(s['index'] for s in inc_entry)
inc_exit_indices = set(s['index'] for s in inc_exit)
comparison.update({
'entry_indices_match': orig_entry_indices == inc_entry_indices,
'exit_indices_match': orig_exit_indices == inc_exit_indices,
'entry_index_diff': orig_entry_indices.symmetric_difference(inc_entry_indices),
'exit_index_diff': orig_exit_indices.symmetric_difference(inc_exit_indices)
})
return comparison
def print_signal_details(self):
"""Print detailed signal information for analysis."""
print("\n" + "="*80)
print("DETAILED SIGNAL COMPARISON")
print("="*80)
# Original signals
print(f"\n📊 ORIGINAL STRATEGY SIGNALS ({len(self.original_signals)} total)")
print("-" * 60)
for signal in self.original_signals:
print(f"Index {signal['index']:3d} | {signal['timestamp']} | "
f"{signal['signal_type']:5s} | Price: {signal['close']:8.2f} | "
f"Conf: {signal['confidence']:.2f}")
# Incremental signals
print(f"\n📊 INCREMENTAL STRATEGY SIGNALS ({len(self.incremental_signals)} total)")
print("-" * 60)
for signal in self.incremental_signals:
print(f"Index {signal['index']:3d} | {signal['timestamp']} | "
f"{signal['signal_type']:5s} | Price: {signal['close']:8.2f} | "
f"Conf: {signal['confidence']:.2f}")
# Side-by-side comparison
print(f"\n🔄 SIDE-BY-SIDE COMPARISON")
print("-" * 80)
print(f"{'Index':<6} {'Original':<20} {'Incremental':<20} {'Match':<8}")
print("-" * 80)
# Get all unique indices
all_indices = set()
for signal in self.original_signals + self.incremental_signals:
all_indices.add(signal['index'])
for idx in sorted(all_indices):
orig_signal = next((s for s in self.original_signals if s['index'] == idx), None)
inc_signal = next((s for s in self.incremental_signals if s['index'] == idx), None)
orig_str = f"{orig_signal['signal_type']}" if orig_signal else "---"
inc_str = f"{inc_signal['signal_type']}" if inc_signal else "---"
match_str = "" if orig_str == inc_str else ""
print(f"{idx:<6} {orig_str:<20} {inc_str:<20} {match_str:<8}")
def save_signal_comparison(self, filename: str = "signal_comparison.csv"):
"""Save detailed signal comparison to CSV."""
all_signals = []
# Add original signals
for signal in self.original_signals:
all_signals.append({
'index': signal['index'],
'timestamp': signal['timestamp'],
'close': signal['close'],
'original_signal': signal['signal_type'],
'original_confidence': signal['confidence'],
'incremental_signal': '',
'incremental_confidence': '',
'match': False
})
# Add incremental signals
for signal in self.incremental_signals:
# Find if there's already a row for this index
existing = next((s for s in all_signals if s['index'] == signal['index']), None)
if existing:
existing['incremental_signal'] = signal['signal_type']
existing['incremental_confidence'] = signal['confidence']
existing['match'] = existing['original_signal'] == signal['signal_type']
else:
all_signals.append({
'index': signal['index'],
'timestamp': signal['timestamp'],
'close': signal['close'],
'original_signal': '',
'original_confidence': '',
'incremental_signal': signal['signal_type'],
'incremental_confidence': signal['confidence'],
'match': False
})
# Sort by index
all_signals.sort(key=lambda x: x['index'])
# Save to CSV
os.makedirs("results", exist_ok=True)
df = pd.DataFrame(all_signals)
filepath = os.path.join("results", filename)
df.to_csv(filepath, index=False)
logger.info(f"Signal comparison saved to {filepath}")
def run_signal_test(self, limit: int = 500) -> bool:
"""Run the complete signal comparison test."""
logger.info("="*80)
logger.info("STARTING SIGNAL COMPARISON TEST")
logger.info("="*80)
try:
# Load test data
self.load_test_data(limit)
# Test both strategies
self.test_original_strategy_signals()
self.test_incremental_strategy_signals()
# Compare results
comparison = self.compare_signals()
# Print results
print("\n" + "="*80)
print("SIGNAL COMPARISON RESULTS")
print("="*80)
print(f"\n📊 SIGNAL COUNTS:")
print(f"Original Strategy: {comparison['original_entry_count']} entries, {comparison['original_exit_count']} exits")
print(f"Incremental Strategy: {comparison['incremental_entry_count']} entries, {comparison['incremental_exit_count']} exits")
print(f"\n✅ MATCHES:")
print(f"Entry count match: {'✅ YES' if comparison['entry_count_match'] else '❌ NO'}")
print(f"Exit count match: {'✅ YES' if comparison['exit_count_match'] else '❌ NO'}")
print(f"Entry timing match: {'✅ YES' if comparison['entry_indices_match'] else '❌ NO'}")
print(f"Exit timing match: {'✅ YES' if comparison['exit_indices_match'] else '❌ NO'}")
if comparison['entry_index_diff']:
print(f"\n❌ Entry signal differences at indices: {sorted(comparison['entry_index_diff'])}")
if comparison['exit_index_diff']:
print(f"❌ Exit signal differences at indices: {sorted(comparison['exit_index_diff'])}")
# Print detailed signals
self.print_signal_details()
# Save comparison
self.save_signal_comparison()
# Overall result
overall_match = (comparison['entry_count_match'] and
comparison['exit_count_match'] and
comparison['entry_indices_match'] and
comparison['exit_indices_match'])
print(f"\n🏆 OVERALL RESULT: {'✅ SIGNALS MATCH PERFECTLY' if overall_match else '❌ SIGNALS DIFFER'}")
return overall_match
except Exception as e:
logger.error(f"Signal test failed: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Run the signal comparison test."""
test = SignalComparisonTest()
# Run test with 500 data points
success = test.run_signal_test(limit=500)
return success
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)

View File

@@ -0,0 +1,394 @@
"""
Signal Comparison Test (Fixed Original Strategy)
This test compares signals between:
1. Original DefaultStrategy (with exit condition bug FIXED)
2. Incremental IncMetaTrendStrategy
The original strategy has a bug in get_exit_signal where it checks:
if prev_trend != 1 and curr_trend == -1:
But it should check:
if prev_trend != -1 and curr_trend == -1:
This test fixes that bug to see if the strategies match when both are correct.
"""
import pandas as pd
import numpy as np
import logging
from typing import Dict, List, Tuple
import os
import sys
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cycles.strategies.default_strategy import DefaultStrategy
from cycles.IncStrategies.metatrend_strategy import IncMetaTrendStrategy
from cycles.utils.storage import Storage
from cycles.strategies.base import StrategySignal
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class FixedDefaultStrategy(DefaultStrategy):
"""DefaultStrategy with the exit condition bug fixed."""
def get_exit_signal(self, backtester, df_index: int) -> StrategySignal:
"""
Generate exit signal with CORRECTED logic.
Exit occurs when meta-trend changes from != -1 to == -1 (FIXED)
"""
if not self.initialized:
return StrategySignal("HOLD", 0.0)
if df_index < 1:
return StrategySignal("HOLD", 0.0)
# Check bounds
if not hasattr(self, 'meta_trend') or df_index >= len(self.meta_trend):
return StrategySignal("HOLD", 0.0)
# Check for meta-trend exit signal (CORRECTED LOGIC)
prev_trend = self.meta_trend[df_index - 1]
curr_trend = self.meta_trend[df_index]
# FIXED: Check if prev_trend != -1 (not prev_trend != 1)
if prev_trend != -1 and curr_trend == -1:
return StrategySignal("EXIT", confidence=1.0,
metadata={"type": "META_TREND_EXIT_SIGNAL"})
return StrategySignal("HOLD", confidence=0.0)
class SignalComparisonTestFixed:
"""Test to compare signals between fixed original and incremental strategies."""
def __init__(self):
"""Initialize the signal comparison test."""
self.storage = Storage(logging=logger)
self.test_data = None
self.original_signals = []
self.incremental_signals = []
def load_test_data(self, limit: int = 500) -> pd.DataFrame:
"""Load a small dataset for signal testing."""
logger.info(f"Loading test data (limit: {limit} points)")
try:
# Load recent data
filename = "btcusd_1-min_data.csv"
start_date = pd.to_datetime("2022-12-31")
end_date = pd.to_datetime("2023-01-01")
df = self.storage.load_data(filename, start_date, end_date)
if len(df) > limit:
df = df.tail(limit)
logger.info(f"Limited data to last {limit} points")
# Reset index to get timestamp as column
df_with_timestamp = df.reset_index()
self.test_data = df_with_timestamp
logger.info(f"Loaded {len(df_with_timestamp)} data points")
logger.info(f"Date range: {df_with_timestamp['timestamp'].min()} to {df_with_timestamp['timestamp'].max()}")
return df_with_timestamp
except Exception as e:
logger.error(f"Failed to load test data: {e}")
raise
def test_fixed_original_strategy_signals(self) -> List[Dict]:
"""Test FIXED original DefaultStrategy and extract all signals."""
logger.info("Testing FIXED Original DefaultStrategy signals...")
# Create indexed DataFrame for original strategy
indexed_data = self.test_data.set_index('timestamp')
# Limit to 200 points like original strategy does
if len(indexed_data) > 200:
original_data_used = indexed_data.tail(200)
data_start_index = len(self.test_data) - 200
else:
original_data_used = indexed_data
data_start_index = 0
# Create mock backtester
class MockBacktester:
def __init__(self, df):
self.original_df = df
self.min1_df = df
self.strategies = {}
backtester = MockBacktester(original_data_used)
# Initialize FIXED original strategy
strategy = FixedDefaultStrategy(weight=1.0, params={
"stop_loss_pct": 0.03,
"timeframe": "1min"
})
strategy.initialize(backtester)
# Extract signals by simulating the strategy step by step
signals = []
for i in range(len(original_data_used)):
# Get entry signal
entry_signal = strategy.get_entry_signal(backtester, i)
if entry_signal.signal_type == "ENTRY":
signals.append({
'index': i,
'global_index': data_start_index + i,
'timestamp': original_data_used.index[i],
'close': original_data_used.iloc[i]['close'],
'signal_type': 'ENTRY',
'confidence': entry_signal.confidence,
'metadata': entry_signal.metadata,
'source': 'fixed_original'
})
# Get exit signal
exit_signal = strategy.get_exit_signal(backtester, i)
if exit_signal.signal_type == "EXIT":
signals.append({
'index': i,
'global_index': data_start_index + i,
'timestamp': original_data_used.index[i],
'close': original_data_used.iloc[i]['close'],
'signal_type': 'EXIT',
'confidence': exit_signal.confidence,
'metadata': exit_signal.metadata,
'source': 'fixed_original'
})
self.original_signals = signals
logger.info(f"Fixed original strategy generated {len(signals)} signals")
return signals
def test_incremental_strategy_signals(self) -> List[Dict]:
"""Test incremental IncMetaTrendStrategy and extract all signals."""
logger.info("Testing Incremental IncMetaTrendStrategy signals...")
# Create strategy instance
strategy = IncMetaTrendStrategy("metatrend", weight=1.0, params={
"timeframe": "1min",
"enable_logging": False
})
# Determine data range to match original strategy
if len(self.test_data) > 200:
test_data_subset = self.test_data.tail(200)
data_start_index = len(self.test_data) - 200
else:
test_data_subset = self.test_data
data_start_index = 0
# Process data incrementally and collect signals
signals = []
for idx, (_, row) in enumerate(test_data_subset.iterrows()):
ohlc = {
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close']
}
# Update strategy with new data point
strategy.calculate_on_data(ohlc, row['timestamp'])
# Check for entry signal
entry_signal = strategy.get_entry_signal()
if entry_signal.signal_type == "ENTRY":
signals.append({
'index': idx,
'global_index': data_start_index + idx,
'timestamp': row['timestamp'],
'close': row['close'],
'signal_type': 'ENTRY',
'confidence': entry_signal.confidence,
'metadata': entry_signal.metadata,
'source': 'incremental'
})
# Check for exit signal
exit_signal = strategy.get_exit_signal()
if exit_signal.signal_type == "EXIT":
signals.append({
'index': idx,
'global_index': data_start_index + idx,
'timestamp': row['timestamp'],
'close': row['close'],
'signal_type': 'EXIT',
'confidence': exit_signal.confidence,
'metadata': exit_signal.metadata,
'source': 'incremental'
})
self.incremental_signals = signals
logger.info(f"Incremental strategy generated {len(signals)} signals")
return signals
def compare_signals(self) -> Dict:
"""Compare signals between fixed original and incremental strategies."""
logger.info("Comparing signals between strategies...")
if not self.original_signals or not self.incremental_signals:
raise ValueError("Must run both signal tests before comparison")
# Separate by signal type
orig_entry = [s for s in self.original_signals if s['signal_type'] == 'ENTRY']
orig_exit = [s for s in self.original_signals if s['signal_type'] == 'EXIT']
inc_entry = [s for s in self.incremental_signals if s['signal_type'] == 'ENTRY']
inc_exit = [s for s in self.incremental_signals if s['signal_type'] == 'EXIT']
# Compare counts
comparison = {
'original_total': len(self.original_signals),
'incremental_total': len(self.incremental_signals),
'original_entry_count': len(orig_entry),
'original_exit_count': len(orig_exit),
'incremental_entry_count': len(inc_entry),
'incremental_exit_count': len(inc_exit),
'entry_count_match': len(orig_entry) == len(inc_entry),
'exit_count_match': len(orig_exit) == len(inc_exit),
'total_count_match': len(self.original_signals) == len(self.incremental_signals)
}
# Compare signal timing (by index)
orig_entry_indices = set(s['index'] for s in orig_entry)
orig_exit_indices = set(s['index'] for s in orig_exit)
inc_entry_indices = set(s['index'] for s in inc_entry)
inc_exit_indices = set(s['index'] for s in inc_exit)
comparison.update({
'entry_indices_match': orig_entry_indices == inc_entry_indices,
'exit_indices_match': orig_exit_indices == inc_exit_indices,
'entry_index_diff': orig_entry_indices.symmetric_difference(inc_entry_indices),
'exit_index_diff': orig_exit_indices.symmetric_difference(inc_exit_indices)
})
return comparison
def print_signal_details(self):
"""Print detailed signal information for analysis."""
print("\n" + "="*80)
print("DETAILED SIGNAL COMPARISON (FIXED ORIGINAL)")
print("="*80)
# Original signals
print(f"\n📊 FIXED ORIGINAL STRATEGY SIGNALS ({len(self.original_signals)} total)")
print("-" * 60)
for signal in self.original_signals:
print(f"Index {signal['index']:3d} | {signal['timestamp']} | "
f"{signal['signal_type']:5s} | Price: {signal['close']:8.2f} | "
f"Conf: {signal['confidence']:.2f}")
# Incremental signals
print(f"\n📊 INCREMENTAL STRATEGY SIGNALS ({len(self.incremental_signals)} total)")
print("-" * 60)
for signal in self.incremental_signals:
print(f"Index {signal['index']:3d} | {signal['timestamp']} | "
f"{signal['signal_type']:5s} | Price: {signal['close']:8.2f} | "
f"Conf: {signal['confidence']:.2f}")
# Side-by-side comparison
print(f"\n🔄 SIDE-BY-SIDE COMPARISON")
print("-" * 80)
print(f"{'Index':<6} {'Fixed Original':<20} {'Incremental':<20} {'Match':<8}")
print("-" * 80)
# Get all unique indices
all_indices = set()
for signal in self.original_signals + self.incremental_signals:
all_indices.add(signal['index'])
for idx in sorted(all_indices):
orig_signal = next((s for s in self.original_signals if s['index'] == idx), None)
inc_signal = next((s for s in self.incremental_signals if s['index'] == idx), None)
orig_str = f"{orig_signal['signal_type']}" if orig_signal else "---"
inc_str = f"{inc_signal['signal_type']}" if inc_signal else "---"
match_str = "" if orig_str == inc_str else ""
print(f"{idx:<6} {orig_str:<20} {inc_str:<20} {match_str:<8}")
def run_signal_test(self, limit: int = 500) -> bool:
"""Run the complete signal comparison test."""
logger.info("="*80)
logger.info("STARTING FIXED SIGNAL COMPARISON TEST")
logger.info("="*80)
try:
# Load test data
self.load_test_data(limit)
# Test both strategies
self.test_fixed_original_strategy_signals()
self.test_incremental_strategy_signals()
# Compare results
comparison = self.compare_signals()
# Print results
print("\n" + "="*80)
print("FIXED SIGNAL COMPARISON RESULTS")
print("="*80)
print(f"\n📊 SIGNAL COUNTS:")
print(f"Fixed Original Strategy: {comparison['original_entry_count']} entries, {comparison['original_exit_count']} exits")
print(f"Incremental Strategy: {comparison['incremental_entry_count']} entries, {comparison['incremental_exit_count']} exits")
print(f"\n✅ MATCHES:")
print(f"Entry count match: {'✅ YES' if comparison['entry_count_match'] else '❌ NO'}")
print(f"Exit count match: {'✅ YES' if comparison['exit_count_match'] else '❌ NO'}")
print(f"Entry timing match: {'✅ YES' if comparison['entry_indices_match'] else '❌ NO'}")
print(f"Exit timing match: {'✅ YES' if comparison['exit_indices_match'] else '❌ NO'}")
if comparison['entry_index_diff']:
print(f"\n❌ Entry signal differences at indices: {sorted(comparison['entry_index_diff'])}")
if comparison['exit_index_diff']:
print(f"❌ Exit signal differences at indices: {sorted(comparison['exit_index_diff'])}")
# Print detailed signals
self.print_signal_details()
# Overall result
overall_match = (comparison['entry_count_match'] and
comparison['exit_count_match'] and
comparison['entry_indices_match'] and
comparison['exit_indices_match'])
print(f"\n🏆 OVERALL RESULT: {'✅ SIGNALS MATCH PERFECTLY' if overall_match else '❌ SIGNALS DIFFER'}")
return overall_match
except Exception as e:
logger.error(f"Signal test failed: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Run the fixed signal comparison test."""
test = SignalComparisonTestFixed()
# Run test with 500 data points
success = test.run_signal_test(limit=500)
return success
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)