# Source: Cycles/test/indicators/test_moving_averages.py
# (repository viewer metadata: 335 lines, 13 KiB, Python)

"""
Moving Average Indicators Comparison Test
Focused testing for Moving Average and Exponential Moving Average implementations.
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime
import sys
from pathlib import Path
# Add project root to path.
# This file sits at <project_root>/test/indicators/, so the project root is
# three levels above the file itself: indicators/ -> test/ -> project root.
# Going up only two levels stops at test/, from which the `cycles` and
# `IncrementalTrader` packages imported below cannot be found.
# `resolve()` makes the path absolute even when `__file__` is relative.
# NOTE(review): the directory depth is taken from the file's repository path;
# adjust the number of `.parent` hops if the file is moved.
project_root = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(project_root))
# Import original indicators
from cycles.IncStrategies.indicators import (
MovingAverageState as OriginalMA,
ExponentialMovingAverageState as OriginalEMA
)
# Import new indicators
from IncrementalTrader.strategies.indicators import (
MovingAverageState as NewMA,
ExponentialMovingAverageState as NewEMA
)
class MovingAverageComparisonTest:
    """Test framework for comparing moving average implementations.

    The same ``Close`` prices are fed, one value at a time, to the original
    and the new indicator classes.  Both output series are stored in
    ``self.results`` and their element-wise differences are printed, plotted
    and written to a Markdown report.

    Attributes:
        data_file: Path of the CSV file read by :meth:`load_data`.
        sample_size: Maximum number of trailing rows to keep; a falsy value
            keeps every row.
        data: DataFrame produced by :meth:`load_data` (``None`` until then).
        results: Maps ``"SMA_<period>"`` / ``"EMA_<period>"`` to a dict with
            the keys ``original``, ``new``, ``prices``, ``dates``, ``period``.
        results_dir: Directory that receives the plots and the report.
    """

    # Maximum absolute difference (exclusive) still reported as PASSED.
    PASS_TOLERANCE = 1e-10
    # Maximum absolute difference (exclusive) still reported as WARNING;
    # anything at or above this is FAILED.
    WARN_TOLERANCE = 1e-6
    # Periods tested when the caller does not pass any.  A tuple, so the
    # default cannot be mutated and leak between calls.
    DEFAULT_PERIODS = (5, 10, 20, 50, 100)

    # Console and report wording for each status level.
    _CONSOLE_STATUS = {
        'passed': " ✅ PASSED: Mathematically equivalent",
        'warning': " ⚠️ WARNING: Small differences (floating point precision)",
        'failed': " ❌ FAILED: Significant differences detected",
    }
    _REPORT_STATUS = {
        'passed': "✅ PASSED",
        'warning': "⚠️ WARNING",
        'failed': "❌ FAILED",
    }

    def __init__(self, data_file: str = "data/btcusd_1-min_data.csv", sample_size: int = 5000):
        """Store the configuration and create the results directory.

        Args:
            data_file: CSV file to load; it must provide ``Timestamp`` and
                ``Close`` columns.
            sample_size: Number of most recent rows to keep when the file
                holds more than that.
        """
        self.data_file = data_file
        self.sample_size = sample_size
        self.data = None
        self.results = {}
        # Create results directory (relative to the current working directory).
        self.results_dir = Path("test/results/moving_averages")
        self.results_dir.mkdir(parents=True, exist_ok=True)

    def load_data(self):
        """Load the CSV into ``self.data`` and add a ``datetime`` column.

        ``Timestamp`` is interpreted as seconds since the Unix epoch.  When
        ``sample_size`` is set and the file is longer, only the last
        ``sample_size`` rows are kept and the index is reset.
        """
        print(f"Loading data from {self.data_file}...")
        df = pd.read_csv(self.data_file)
        df['datetime'] = pd.to_datetime(df['Timestamp'], unit='s')
        if self.sample_size and len(df) > self.sample_size:
            df = df.tail(self.sample_size).reset_index(drop=True)
        self.data = df
        if df.empty:
            # There is no first/last row to report; indexing with iloc would
            # raise IndexError here.
            print(f"Loaded {len(df)} data points")
        else:
            print(f"Loaded {len(df)} data points from {df['datetime'].iloc[0]} to {df['datetime'].iloc[-1]}")

    @staticmethod
    def _differences(result):
        """Return ``(diff, valid_diff)`` for one entry of ``self.results``.

        ``diff`` is ``new - original`` element-wise (NaN while either
        indicator is still warming up); ``valid_diff`` is ``diff`` with the
        NaN entries removed.
        """
        diff = np.array(result['new']) - np.array(result['original'])
        return diff, diff[~np.isnan(diff)]

    @classmethod
    def _status_level(cls, max_diff: float) -> str:
        """Classify a maximum absolute difference as passed/warning/failed."""
        if max_diff < cls.PASS_TOLERANCE:
            return 'passed'
        if max_diff < cls.WARN_TOLERANCE:
            return 'warning'
        return 'failed'

    def _run_comparison(self, title, prefix, original_cls, new_cls, periods):
        """Run both implementations for every period and record the outcome.

        Args:
            title: Human-readable indicator name used in the section banner.
            prefix: Short name (``"SMA"`` / ``"EMA"``) used in log lines and
                as the ``self.results`` key prefix.
            original_cls: Original indicator class, constructed with a period.
            new_cls: New indicator class, constructed with a period.
            periods: Iterable of window lengths to test.
        """
        print(f"\n=== Testing {title} ===")
        # Read the price column once instead of materialising every row with
        # DataFrame.iterrows() for each period.
        close_prices = self.data['Close'].tolist()
        for period in periods:
            print(f"Testing {prefix}({period})...")
            # Initialize indicators
            original_indicator = original_cls(period)
            new_indicator = new_cls(period)
            original_values = []
            new_values = []
            # Process data: feed both indicators in lockstep and record NaN
            # until the respective indicator reports it is warmed up.
            for price in close_prices:
                original_indicator.update(price)
                new_indicator.update(price)
                original_values.append(
                    original_indicator.get_current_value() if original_indicator.is_warmed_up() else np.nan
                )
                new_values.append(
                    new_indicator.get_current_value() if new_indicator.is_warmed_up() else np.nan
                )
            # Store results (fresh list objects per period so entries never
            # share mutable state).
            result = {
                'original': original_values,
                'new': new_values,
                'prices': list(close_prices),
                'dates': self.data['datetime'].tolist(),
                'period': period,
            }
            self.results[f'{prefix}_{period}'] = result
            self._print_difference_summary(result)

    def _print_difference_summary(self, result):
        """Print max/mean/std of the differences and the resulting status."""
        _, valid_diff = self._differences(result)
        if len(valid_diff) == 0:
            print(" ❌ ERROR: No valid data points")
            return
        max_diff = np.max(np.abs(valid_diff))
        mean_diff = np.mean(np.abs(valid_diff))
        std_diff = np.std(valid_diff)
        print(f" Max difference: {max_diff:.12f}")
        print(f" Mean difference: {mean_diff:.12f}")
        print(f" Std difference: {std_diff:.12f}")
        # Status check
        print(self._CONSOLE_STATUS[self._status_level(max_diff)])

    def test_simple_moving_average(self, periods=DEFAULT_PERIODS):
        """Test Simple Moving Average implementations.

        Args:
            periods: Window lengths to compare; results are stored under
                ``"SMA_<period>"``.
        """
        self._run_comparison("Simple Moving Average", "SMA", OriginalMA, NewMA, periods)

    def test_exponential_moving_average(self, periods=DEFAULT_PERIODS):
        """Test Exponential Moving Average implementations.

        Args:
            periods: Window lengths to compare; results are stored under
                ``"EMA_<period>"``.
        """
        self._run_comparison("Exponential Moving Average", "EMA", OriginalEMA, NewEMA, periods)

    def plot_comparison(self, indicator_name: str):
        """Plot detailed comparison for a specific indicator.

        Draws three stacked panels (price with both indicators, the two
        indicators alone, and their difference), saves the figure as a PNG in
        ``self.results_dir`` and shows it.

        Args:
            indicator_name: Key in ``self.results``, e.g. ``"SMA_20"``.  A
                message is printed and nothing is plotted if it is unknown.
        """
        if indicator_name not in self.results:
            print(f"No results found for {indicator_name}")
            return
        result = self.results[indicator_name]
        dates = pd.to_datetime(result['dates'])
        # Create figure with subplots
        fig, axes = plt.subplots(3, 1, figsize=(15, 12))
        fig.suptitle(f'{indicator_name} - Detailed Comparison Analysis', fontsize=16)
        # Plot 1: Price and indicators
        ax1 = axes[0]
        ax1.plot(dates, result['prices'], label='Price', alpha=0.6, color='gray')
        ax1.plot(dates, result['original'], label='Original', alpha=0.8, linewidth=2)
        ax1.plot(dates, result['new'], label='New', alpha=0.8, linewidth=2, linestyle='--')
        ax1.set_title(f'{indicator_name} vs Price')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        # Plot 2: Overlay comparison (zoomed)
        ax2 = axes[1]
        ax2.plot(dates, result['original'], label='Original', alpha=0.8, linewidth=2)
        ax2.plot(dates, result['new'], label='New', alpha=0.8, linewidth=2, linestyle='--')
        ax2.set_title(f'{indicator_name} Values Comparison (Detailed)')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        # Plot 3: Difference analysis
        ax3 = axes[2]
        diff, valid_diff = self._differences(result)
        ax3.plot(dates, diff, color='red', alpha=0.7, linewidth=1)
        ax3.set_title(f'{indicator_name} Difference (New - Original)')
        ax3.axhline(y=0, color='black', linestyle='-', alpha=0.5)
        ax3.grid(True, alpha=0.3)
        # Add statistics text
        if len(valid_diff) > 0:
            stats_text = f'Max: {np.max(np.abs(valid_diff)):.2e}\n'
            stats_text += f'Mean: {np.mean(np.abs(valid_diff)):.2e}\n'
            stats_text += f'Std: {np.std(valid_diff):.2e}'
            ax3.text(0.02, 0.98, stats_text, transform=ax3.transAxes,
                     verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
        # Format x-axis.  The tick spacing must follow the plotted time span,
        # not the number of samples: a day interval of len(dates)//10 is
        # hundreds of days for a few thousand one-minute bars and leaves the
        # axis practically unlabelled.  AutoDateLocator picks a spacing that
        # fits the range; the time of day is included because ticks may then
        # fall within a single day.
        for ax in axes:
            ax.xaxis.set_major_locator(mdates.AutoDateLocator())
            ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H:%M'))
            plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
        plt.tight_layout()
        # Save plot
        plot_path = self.results_dir / f"{indicator_name}_detailed_comparison.png"
        plt.savefig(plot_path, dpi=300, bbox_inches='tight')
        print(f"Plot saved to {plot_path}")
        plt.show()

    def plot_all_comparisons(self):
        """Plot comparisons for all tested indicators.

        Every figure is closed after it has been saved and shown so open
        figures do not accumulate across indicators.
        """
        print("\n=== Generating Detailed Comparison Plots ===")
        for indicator_name in self.results:
            print(f"Plotting {indicator_name}...")
            self.plot_comparison(indicator_name)
            plt.close('all')

    def generate_report(self):
        """Generate detailed report for moving averages.

        Writes ``moving_averages_report.md`` (UTF-8) into
        ``self.results_dir``.  The report holds a summary table with one row
        per tested indicator followed by per-indicator difference statistics
        and percentiles of the absolute difference.
        """
        print("\n=== Generating Moving Average Report ===")
        report_lines = [
            "# Moving Average Indicators Comparison Report",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"Data file: {self.data_file}",
            f"Sample size: {len(self.data)} data points",
            "",
            # Summary table
            "## Summary Table",
            "| Indicator | Period | Max Diff | Mean Diff | Status |",
            "|-----------|--------|----------|-----------|--------|",
        ]
        for indicator_name, result in self.results.items():
            _, valid_diff = self._differences(result)
            if len(valid_diff) > 0:
                max_diff = np.max(np.abs(valid_diff))
                mean_diff = np.mean(np.abs(valid_diff))
                status = self._REPORT_STATUS[self._status_level(max_diff)]
                report_lines.append(
                    f"| {indicator_name} | {result['period']} | {max_diff:.2e} | {mean_diff:.2e} | {status} |"
                )
            else:
                # Neither series produced a comparable (non-NaN) value.
                report_lines.append(f"| {indicator_name} | {result['period']} | N/A | N/A | ❌ ERROR |")
        report_lines.append("")
        # Detailed analysis
        report_lines.append("## Detailed Analysis")
        percentiles = [1, 5, 25, 50, 75, 95, 99]
        for indicator_name, result in self.results.items():
            report_lines.append(f"### {indicator_name}")
            _, valid_diff = self._differences(result)
            if len(valid_diff) > 0:
                abs_diff = np.abs(valid_diff)
                report_lines.append(f"- **Period**: {result['period']}")
                report_lines.append(f"- **Valid data points**: {len(valid_diff)}")
                report_lines.append(f"- **Max absolute difference**: {np.max(abs_diff):.12f}")
                report_lines.append(f"- **Mean absolute difference**: {np.mean(abs_diff):.12f}")
                report_lines.append(f"- **Standard deviation**: {np.std(valid_diff):.12f}")
                report_lines.append(f"- **Min difference**: {np.min(valid_diff):.12f}")
                report_lines.append(f"- **Max difference**: {np.max(valid_diff):.12f}")
                # Percentile analysis
                perc_values = np.percentile(abs_diff, percentiles)
                perc_str = ", ".join([f"P{p}: {v:.2e}" for p, v in zip(percentiles, perc_values)])
                report_lines.append(f"- **Percentiles**: {perc_str}")
            report_lines.append("")
        # Save report
        report_path = self.results_dir / "moving_averages_report.md"
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(report_lines))
        print(f"Report saved to {report_path}")

    def run_tests(self):
        """Run all moving average tests.

        Loads the data, runs the SMA and EMA comparisons with the default
        periods, then produces the plots and the Markdown report.
        """
        print("Starting Moving Average Comparison Tests...")
        # Load data
        self.load_data()
        # Run tests
        self.test_simple_moving_average()
        self.test_exponential_moving_average()
        # Generate outputs
        self.plot_all_comparisons()
        self.generate_report()
        print("\n✅ Moving Average tests completed!")
if __name__ == "__main__":
    # Script entry point: run the whole comparison pipeline with the sample
    # size capped at 3000 rows.
    MovingAverageComparisonTest(sample_size=3000).run_tests()