430 lines
18 KiB
Python
430 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Compare Strategy Signals Only (No Backtesting)
|
|
==============================================
|
|
|
|
This script extracts entry and exit signals from both the original and incremental
|
|
strategies on the same data and plots them for visual comparison.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import pandas as pd
|
|
import numpy as np
|
|
from datetime import datetime
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.dates as mdates
|
|
|
|
# Add the parent directory to the path to import cycles modules
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from cycles.utils.storage import Storage
|
|
from cycles.IncStrategies.metatrend_strategy import IncMetaTrendStrategy
|
|
from cycles.utils.data_utils import aggregate_to_minutes
|
|
from cycles.strategies.default_strategy import DefaultStrategy
|
|
|
|
|
|
def extract_original_signals(data_1min: pd.DataFrame, timeframe: str = "15min"):
    """Collect the entry and exit signals emitted by the original strategy.

    A ``DefaultStrategy`` is initialized against a minimal stand-in for the
    backtester and is then asked for an entry and an exit signal at every
    position of its primary-timeframe data.

    Args:
        data_1min: Minute-level data exposed to the strategy through the
            stand-in backtester's ``original_df`` attribute.
        timeframe: Value passed to the strategy as its ``timeframe`` param.

    Returns:
        A list of dicts with the keys ``timestamp``, ``type`` (``'ENTRY'`` or
        ``'EXIT'``), ``price``, ``strategy``, ``confidence`` and ``metadata``.
        The list is empty when the strategy fails to initialize or exposes no
        primary-timeframe data.
    """
    print("\n🔄 Extracting Original Strategy Signals...")

    class MockBacktester:
        """Stand-in carrying only the attributes prepared for the strategy."""

        def __init__(self, data):
            self.original_df = data
            self.strategies = {}
            self.current_position = None
            self.entry_price = None

    strategy_params = {"timeframe": timeframe, "stop_loss_pct": 0.03}
    strategy = DefaultStrategy(weight=1.0, params=strategy_params)

    backtester = MockBacktester(data_1min)
    strategy.initialize(backtester)
    if not strategy.initialized:
        print(" ❌ Strategy initialization failed")
        return []

    bars = strategy.get_primary_timeframe_data()
    if bars is None or len(bars) == 0:
        print(" ❌ No primary timeframe data available")
        return []

    def as_record(signal, kind, bar_time, bar):
        """Convert a strategy signal into the plain dict used downstream."""
        return {
            'timestamp': bar_time,
            'type': kind,
            # A falsy signal price falls back to the bar's close.
            'price': signal.price if signal.price else bar['close'],
            'strategy': 'Original',
            'confidence': signal.confidence,
            'metadata': signal.metadata,
        }

    signals = []
    for position in range(len(bars)):
        bar_time = bars.index[position]
        bar = bars.iloc[position]

        # The entry query always precedes the exit query for the same bar.
        entry = strategy.get_entry_signal(backtester, position)
        if entry and entry.signal_type == "ENTRY":
            signals.append(as_record(entry, 'ENTRY', bar_time, bar))

        closing = strategy.get_exit_signal(backtester, position)
        if closing and closing.signal_type == "EXIT":
            signals.append(as_record(closing, 'EXIT', bar_time, bar))

    entry_total = sum(1 for s in signals if s['type'] == 'ENTRY')
    exit_total = sum(1 for s in signals if s['type'] == 'EXIT')
    print(f" Found {entry_total} entry signals")
    print(f" Found {exit_total} exit signals")

    return signals
|
|
|
|
|
|
def extract_incremental_signals(data_1min: pd.DataFrame, timeframe: str = "15min"):
    """Collect the buy and sell signals emitted by the incremental strategy.

    Every row of ``data_1min`` is fed to
    ``IncMetaTrendStrategy.update_minute_data``. Whenever that call returns
    something other than ``None`` (treated here as the sign that a timeframe
    bar completed), the strategy is polled for an entry and an exit signal.

    Args:
        data_1min: Minute-level frame with ``open``, ``high``, ``low``,
            ``close`` and ``volume`` columns; its index values are used as
            the timestamps.
        timeframe: Value passed to the strategy as its ``timeframe`` param.

    Returns:
        A list of dicts with the keys ``timestamp``, ``type`` (``'BUY'`` or
        ``'SELL'``), ``price``, ``strategy``, ``confidence`` and ``reason``.
    """
    print("\n🔄 Extracting Incremental Strategy Signals...")

    strategy = IncMetaTrendStrategy(
        name="metatrend",
        weight=1.0,
        params={"timeframe": timeframe, "enable_logging": False},
    )

    ohlcv_fields = ('open', 'high', 'low', 'close', 'volume')

    def as_record(signal, kind, minute_time, minute_row, fallback_reason):
        """Convert a strategy signal into the plain dict used downstream."""
        return {
            'timestamp': minute_time,
            'type': kind,
            # A falsy signal price falls back to the minute's close.
            'price': signal.price if signal.price else minute_row['close'],
            'strategy': 'Incremental',
            'confidence': signal.confidence,
            'reason': (signal.metadata.get('type', fallback_reason)
                       if signal.metadata else fallback_reason),
        }

    signals = []
    for timestamp, row in data_1min.iterrows():
        ohlcv_data = {field: row[field] for field in ohlcv_fields}

        result = strategy.update_minute_data(timestamp, ohlcv_data)
        if result is None:
            # Nothing to poll until the update yields a result.
            continue

        entry = strategy.get_entry_signal()
        if entry and entry.signal_type.upper() in ('BUY', 'ENTRY'):
            signals.append(as_record(entry, 'BUY', timestamp, row, 'ENTRY'))

        closing = strategy.get_exit_signal()
        if closing and closing.signal_type.upper() in ('SELL', 'EXIT'):
            signals.append(as_record(closing, 'SELL', timestamp, row, 'EXIT'))

    buy_total = sum(1 for s in signals if s['type'] == 'BUY')
    sell_total = sum(1 for s in signals if s['type'] == 'SELL')
    print(f" Found {buy_total} buy signals")
    print(f" Found {sell_total} sell signals")

    return signals
|
|
|
|
|
|
def create_signals_comparison_plot(data_1min: pd.DataFrame, original_signals: list,
                                   incremental_signals: list, start_date: str, end_date: str,
                                   output_dir: str):
    """Render a three-panel comparison of both strategies' signals to a PNG.

    Panel 1 draws the 15-minute close with a marker per signal, panel 2 shows
    stacked per-day signal counts, and panel 3 shows total entry/exit counts.
    The figure is written to ``<output_dir>/signals_comparison.png``.

    Args:
        data_1min: Minute-level data, aggregated to 15 minutes for the price line.
        original_signals: Signal dicts typed ``'ENTRY'`` / ``'EXIT'``.
        incremental_signals: Signal dicts typed ``'BUY'`` / ``'SELL'``.
        start_date: Start of the period, used only in the plot title.
        end_date: End of the period, used only in the plot title.
        output_dir: Directory for the image; created if missing.
    """
    print("\n📊 Creating signals comparison plot...")

    # 15-minute bars keep the price line readable.
    price_bars = aggregate_to_minutes(data_1min, 15)

    fig, (price_ax, daily_ax, totals_ax) = plt.subplots(3, 1, figsize=(20, 16))

    # ---- Panel 1: price line plus signal markers -------------------------
    price_ax.plot(price_bars.index, price_bars['close'], 'k-', alpha=0.7,
                  linewidth=1.5, label='BTC Price (15min)')

    def of_type(signals, wanted):
        """Return the signals whose ``type`` equals ``wanted``."""
        return [sig for sig in signals if sig['type'] == wanted]

    original_entries = of_type(original_signals, 'ENTRY')
    original_exits = of_type(original_signals, 'EXIT')
    incremental_entries = of_type(incremental_signals, 'BUY')
    incremental_exits = of_type(incremental_signals, 'SELL')

    # (signals, price factor, colour, marker, size, legend caption).
    # Original markers sit 3% above the price and incremental ones 3% below,
    # so coinciding signals do not hide each other.
    marker_specs = (
        (original_entries, 1.03, 'green', '^', 100, 'Original Entry'),
        (original_exits, 1.03, 'red', 'v', 100, 'Original Exit'),
        (incremental_entries, 0.97, 'lightgreen', '^', 80, 'Incremental Entry'),
        (incremental_exits, 0.97, 'orange', 'v', 80, 'Incremental Exit'),
    )
    for group, factor, colour, shape, size, caption in marker_specs:
        if not group:
            continue
        price_ax.scatter(
            [sig['timestamp'] for sig in group],
            [sig['price'] * factor for sig in group],
            color=colour, marker=shape, s=size,
            alpha=0.8, label=f'{caption} ({len(group)})', zorder=5,
        )

    price_ax.set_title(f'Strategy Signals Comparison: {start_date} to {end_date}',
                       fontsize=16, fontweight='bold')
    price_ax.set_ylabel('Price (USD)', fontsize=12)
    price_ax.legend(loc='upper left', fontsize=10)
    price_ax.grid(True, alpha=0.3)

    price_ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    price_ax.xaxis.set_major_locator(mdates.WeekdayLocator(interval=2))
    plt.setp(price_ax.xaxis.get_majorticklabels(), rotation=45)

    # ---- Panel 2: per-day signal counts ----------------------------------
    tallies = {}

    def tally(signals, entry_type, entry_key, exit_key):
        """Add each signal to its calendar day's entry or exit counter."""
        for sig in signals:
            day = sig['timestamp'].date()
            bucket = tallies.setdefault(
                day,
                {'original_entry': 0, 'original_exit': 0, 'inc_entry': 0, 'inc_exit': 0},
            )
            # Anything that is not the entry type is counted as an exit.
            bucket[entry_key if sig['type'] == entry_type else exit_key] += 1

    tally(original_signals, 'ENTRY', 'original_entry', 'original_exit')
    tally(incremental_signals, 'BUY', 'inc_entry', 'inc_exit')

    if tallies:
        days = sorted(tallies)
        orig_entries = [tallies[day]['original_entry'] for day in days]
        orig_exits = [tallies[day]['original_exit'] for day in days]
        inc_entries = [tallies[day]['inc_entry'] for day in days]
        inc_exits = [tallies[day]['inc_exit'] for day in days]

        bar_width = 0.35
        positions = np.arange(len(days))
        left = positions - bar_width/2
        right = positions + bar_width/2

        # Exits are stacked on top of entries within each strategy's bar.
        daily_ax.bar(left, orig_entries, bar_width, label='Original Entries', color='green', alpha=0.7)
        daily_ax.bar(left, orig_exits, bar_width, bottom=orig_entries, label='Original Exits', color='red', alpha=0.7)
        daily_ax.bar(right, inc_entries, bar_width, label='Incremental Entries', color='lightgreen', alpha=0.7)
        daily_ax.bar(right, inc_exits, bar_width, bottom=inc_entries, label='Incremental Exits', color='orange', alpha=0.7)

        daily_ax.set_title('Daily Signal Frequency', fontsize=14, fontweight='bold')
        daily_ax.set_ylabel('Number of Signals', fontsize=12)
        # Label every 7th day only.
        daily_ax.set_xticks(positions[::7])
        daily_ax.set_xticklabels([day.strftime('%m-%d') for day in days[::7]], rotation=45)
        daily_ax.legend(fontsize=10)
        daily_ax.grid(True, alpha=0.3, axis='y')

    # ---- Panel 3: total counts per strategy ------------------------------
    strategies = ['Original', 'Incremental']
    entry_counts = [len(original_entries), len(incremental_entries)]
    exit_counts = [len(original_exits), len(incremental_exits)]

    slots = np.arange(len(strategies))
    slot_width = 0.35

    entry_bars = totals_ax.bar(slots - slot_width/2, entry_counts, slot_width,
                               label='Entry Signals', color='green', alpha=0.7)
    exit_bars = totals_ax.bar(slots + slot_width/2, exit_counts, slot_width,
                              label='Exit Signals', color='red', alpha=0.7)

    totals_ax.set_title('Total Signal Counts', fontsize=14, fontweight='bold')
    totals_ax.set_ylabel('Number of Signals', fontsize=12)
    totals_ax.set_xticks(slots)
    totals_ax.set_xticklabels(strategies)
    totals_ax.legend(fontsize=10)
    totals_ax.grid(True, alpha=0.3, axis='y')

    # Print each bar's value just above it.
    for bar in [*entry_bars, *exit_bars]:
        bar_height = bar.get_height()
        totals_ax.text(bar.get_x() + bar.get_width()/2., bar_height + 0.5,
                       f'{int(bar_height)}', ha='center', va='bottom', fontweight='bold')

    plt.tight_layout()

    # ---- Persist the figure ----------------------------------------------
    os.makedirs(output_dir, exist_ok=True)
    plot_file = os.path.join(output_dir, "signals_comparison.png")
    plt.savefig(plot_file, dpi=300, bbox_inches='tight')
    plt.close()
    print(f"Saved signals comparison plot to: {plot_file}")
|
|
|
|
|
|
def save_signals_data(original_signals: list, incremental_signals: list, output_dir: str):
    """Write both signal lists and a count summary into ``output_dir``.

    Each non-empty signal list is saved as a CSV (``original_signals.csv`` /
    ``incremental_signals.csv``); an empty list produces no CSV. A
    ``signals_summary.json`` with the generation time and per-strategy
    total/entry/exit counts is always written.

    Args:
        original_signals: Signal dicts typed ``'ENTRY'`` / ``'EXIT'``.
        incremental_signals: Signal dicts typed ``'BUY'`` / ``'SELL'``.
        output_dir: Destination directory; created if missing.
    """
    import json

    os.makedirs(output_dir, exist_ok=True)

    csv_jobs = (
        ("original", original_signals, "original_signals.csv"),
        ("incremental", incremental_signals, "incremental_signals.csv"),
    )
    for label, signals, filename in csv_jobs:
        if not signals:
            # Nothing to tabulate for this strategy.
            continue
        csv_path = os.path.join(output_dir, filename)
        pd.DataFrame(signals).to_csv(csv_path, index=False)
        print(f"Saved {label} signals to: {csv_path}")

    def counts(signals, entry_type, exit_type):
        """Return the total/entry/exit counts for one strategy."""
        return {
            'total_signals': len(signals),
            'entry_signals': sum(1 for s in signals if s['type'] == entry_type),
            'exit_signals': sum(1 for s in signals if s['type'] == exit_type),
        }

    summary = {
        'test_date': datetime.now().isoformat(),
        'original_strategy': counts(original_signals, 'ENTRY', 'EXIT'),
        'incremental_strategy': counts(incremental_signals, 'BUY', 'SELL'),
    }

    summary_file = os.path.join(output_dir, "signals_summary.json")
    with open(summary_file, 'w') as f:
        json.dump(summary, f, indent=2)
    print(f"Saved signals summary to: {summary_file}")
|
|
|
|
|
|
def print_signals_summary(original_signals: list, incremental_signals: list):
    """Print a side-by-side summary of both strategies' signals.

    Two tables are written to stdout: signal counts (entry, exit, total, and
    the incremental-minus-original difference) and, when at least one
    strategy produced signals, the first and last signal time per strategy.
    A strategy without signals is shown as ``N/A`` in the timing table, so a
    one-sided result is still visible.

    Args:
        original_signals: Signal dicts typed ``'ENTRY'`` / ``'EXIT'``; each
            carries a ``timestamp`` supporting ``strftime``.
        incremental_signals: Signal dicts typed ``'BUY'`` / ``'SELL'``; each
            carries a ``timestamp`` supporting ``strftime``.
    """
    time_format = '%Y-%m-%d %H:%M'
    # A formatted timestamp is 16 characters long; the timing columns must be
    # wider than that or the values drift out from under their headers.
    time_width = 18

    def count(signals, signal_type):
        """Return how many signals carry the given ``type``."""
        return sum(1 for s in signals if s['type'] == signal_type)

    def first_and_last(signals):
        """Return the formatted earliest and latest timestamps, or N/A."""
        if not signals:
            return 'N/A', 'N/A'
        times = [s['timestamp'] for s in signals]
        return min(times).strftime(time_format), max(times).strftime(time_format)

    banner = "=" * 80
    print("\n" + banner)
    print("SIGNALS COMPARISON SUMMARY")
    print(banner)

    count_rows = (
        ('Entry Signals', count(original_signals, 'ENTRY'), count(incremental_signals, 'BUY')),
        ('Exit Signals', count(original_signals, 'EXIT'), count(incremental_signals, 'SELL')),
        ('Total Signals', len(original_signals), len(incremental_signals)),
    )

    print("\n📊 SIGNAL COUNTS:")
    print(f"{'Signal Type':<20} {'Original':<15} {'Incremental':<15} {'Difference':<15}")
    print("-" * 65)
    for label, orig_n, inc_n in count_rows:
        print(f"{label:<20} {orig_n:<15} {inc_n:<15} {inc_n - orig_n:<15}")

    # Show timing as soon as either side has signals; a missing side is the
    # most interesting discrepancy and should not hide the table.
    if original_signals or incremental_signals:
        orig_first, orig_last = first_and_last(original_signals)
        inc_first, inc_last = first_and_last(incremental_signals)

        print("\n📅 TIMING ANALYSIS:")
        print(f"{'Metric':<20} {'Original':<{time_width}} {'Incremental':<{time_width}}")
        print("-" * (20 + 2 * (time_width + 1)))
        print(f"{'First Signal':<20} {orig_first:<{time_width}} {inc_first:<{time_width}}")
        print(f"{'Last Signal':<20} {orig_last:<{time_width}} {inc_last:<{time_width}}")

    print("\n" + banner)
|
|
|
|
|
|
def main():
    """Run the full signal comparison for the configured test period.

    Loads the minute data, extracts signals from both strategies, prints the
    summary, then saves the CSV/JSON files and the comparison plot under
    ``results/signals_comparison``.

    Returns:
        ``True`` when every step completed, ``False`` when no data was loaded
        or any exception was raised (the traceback is printed).
    """
    print("🚀 Comparing Strategy Signals (No Backtesting)")
    print("=" * 80)

    # Run configuration
    start_date = "2025-01-01"
    end_date = "2025-01-10"
    timeframe = "15min"

    print(f"📅 Test Period: {start_date} to {end_date}")
    print(f"⏱️ Timeframe: {timeframe}")
    print("📊 Data Source: btcusd_1-min_data.csv")

    try:
        storage = Storage()

        # The data directory sits next to this script's parent directory.
        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        data_file = os.path.join(project_root, "data", "btcusd_1-min_data.csv")

        print(f"\n📂 Loading data from: {data_file}")
        data_1min = storage.load_data(data_file, start_date, end_date)
        print(f" Loaded {len(data_1min)} minute-level data points")

        if not len(data_1min):
            print(f"❌ No data loaded for period {start_date} to {end_date}")
            return False

        original_signals = extract_original_signals(data_1min, timeframe)
        incremental_signals = extract_incremental_signals(data_1min, timeframe)

        print_signals_summary(original_signals, incremental_signals)

        output_dir = "results/signals_comparison"
        save_signals_data(original_signals, incremental_signals, output_dir)

        create_signals_comparison_plot(
            data_1min, original_signals, incremental_signals,
            start_date, end_date, output_dir,
        )

        print(f"\n📁 Results saved to: {output_dir}/")
        for artifact in (
            "signals_comparison.png",
            "original_signals.csv",
            "incremental_signals.csv",
            "signals_summary.json",
        ):
            print(f" - {artifact}")

        return True

    except Exception as e:
        # Top-level boundary: report the failure and signal it via the return value.
        print(f"\n❌ Error during signals comparison: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
|
|
if __name__ == "__main__":
    # Report the comparison outcome to the shell: 0 on success, 1 on failure.
    success = main()
    if success:
        sys.exit(0)
    sys.exit(1)