delete test file
This commit is contained in:
parent
bff3413eed
commit
ed6d668a8a
@ -1,249 +0,0 @@
|
||||
"""
|
||||
Test script for IncRandomStrategy
|
||||
|
||||
This script tests the incremental random strategy to verify it works correctly
|
||||
and can generate signals incrementally with proper performance characteristics.
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import time
|
||||
import logging
|
||||
from typing import List, Dict
|
||||
|
||||
from .random_strategy import IncRandomStrategy
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def generate_test_data(num_points: int = 100) -> List[Dict[str, float]]:
|
||||
"""
|
||||
Generate synthetic OHLCV data for testing.
|
||||
|
||||
Args:
|
||||
num_points: Number of data points to generate
|
||||
|
||||
Returns:
|
||||
List of OHLCV data dictionaries
|
||||
"""
|
||||
np.random.seed(42) # For reproducible test data
|
||||
|
||||
data_points = []
|
||||
base_price = 50000.0
|
||||
|
||||
for i in range(num_points):
|
||||
# Generate realistic OHLCV data with some volatility
|
||||
price_change = np.random.normal(0, 100) # Random walk with volatility
|
||||
base_price += price_change
|
||||
|
||||
# Ensure realistic OHLC relationships
|
||||
open_price = base_price
|
||||
high_price = open_price + abs(np.random.normal(0, 50))
|
||||
low_price = open_price - abs(np.random.normal(0, 50))
|
||||
close_price = open_price + np.random.normal(0, 30)
|
||||
|
||||
# Ensure OHLC constraints
|
||||
high_price = max(high_price, open_price, close_price)
|
||||
low_price = min(low_price, open_price, close_price)
|
||||
|
||||
volume = np.random.uniform(1000, 10000)
|
||||
|
||||
data_points.append({
|
||||
'open': open_price,
|
||||
'high': high_price,
|
||||
'low': low_price,
|
||||
'close': close_price,
|
||||
'volume': volume
|
||||
})
|
||||
|
||||
return data_points
|
||||
|
||||
|
||||
def test_inc_random_strategy():
|
||||
"""Test the IncRandomStrategy with synthetic data."""
|
||||
logger.info("Starting IncRandomStrategy test...")
|
||||
|
||||
# Create strategy with test parameters
|
||||
strategy_params = {
|
||||
"entry_probability": 0.2, # Higher probability for testing
|
||||
"exit_probability": 0.3,
|
||||
"min_confidence": 0.7,
|
||||
"max_confidence": 0.9,
|
||||
"signal_frequency": 3, # Generate signal every 3 bars
|
||||
"random_seed": 42 # For reproducible results
|
||||
}
|
||||
|
||||
strategy = IncRandomStrategy(weight=1.0, params=strategy_params)
|
||||
|
||||
# Generate test data
|
||||
test_data = generate_test_data(50)
|
||||
timestamps = pd.date_range(start='2024-01-01 09:00:00', periods=len(test_data), freq='1min')
|
||||
|
||||
logger.info(f"Generated {len(test_data)} test data points")
|
||||
logger.info(f"Strategy minimum buffer size: {strategy.get_minimum_buffer_size()}")
|
||||
logger.info(f"Strategy supports incremental: {strategy.supports_incremental_calculation()}")
|
||||
|
||||
# Track signals and performance
|
||||
entry_signals = []
|
||||
exit_signals = []
|
||||
update_times = []
|
||||
signal_times = []
|
||||
|
||||
# Process data incrementally
|
||||
for i, (data_point, timestamp) in enumerate(zip(test_data, timestamps)):
|
||||
# Measure update time
|
||||
start_time = time.perf_counter()
|
||||
strategy.calculate_on_data(data_point, timestamp)
|
||||
update_time = time.perf_counter() - start_time
|
||||
update_times.append(update_time)
|
||||
|
||||
# Generate signals
|
||||
start_time = time.perf_counter()
|
||||
entry_signal = strategy.get_entry_signal()
|
||||
exit_signal = strategy.get_exit_signal()
|
||||
signal_time = time.perf_counter() - start_time
|
||||
signal_times.append(signal_time)
|
||||
|
||||
# Track signals
|
||||
if entry_signal.signal_type == "ENTRY":
|
||||
entry_signals.append((i, entry_signal))
|
||||
logger.info(f"Entry signal at index {i}: confidence={entry_signal.confidence:.2f}, "
|
||||
f"price=${entry_signal.price:.2f}")
|
||||
|
||||
if exit_signal.signal_type == "EXIT":
|
||||
exit_signals.append((i, exit_signal))
|
||||
logger.info(f"Exit signal at index {i}: confidence={exit_signal.confidence:.2f}, "
|
||||
f"price=${exit_signal.price:.2f}, type={exit_signal.metadata.get('type')}")
|
||||
|
||||
# Log progress every 10 points
|
||||
if (i + 1) % 10 == 0:
|
||||
logger.info(f"Processed {i + 1}/{len(test_data)} data points, "
|
||||
f"warmed_up={strategy.is_warmed_up}")
|
||||
|
||||
# Performance analysis
|
||||
avg_update_time = np.mean(update_times) * 1000 # Convert to milliseconds
|
||||
max_update_time = np.max(update_times) * 1000
|
||||
avg_signal_time = np.mean(signal_times) * 1000
|
||||
max_signal_time = np.max(signal_times) * 1000
|
||||
|
||||
logger.info("\n" + "="*50)
|
||||
logger.info("TEST RESULTS")
|
||||
logger.info("="*50)
|
||||
logger.info(f"Total data points processed: {len(test_data)}")
|
||||
logger.info(f"Entry signals generated: {len(entry_signals)}")
|
||||
logger.info(f"Exit signals generated: {len(exit_signals)}")
|
||||
logger.info(f"Strategy warmed up: {strategy.is_warmed_up}")
|
||||
logger.info(f"Final calculation mode: {strategy.calculation_mode}")
|
||||
|
||||
logger.info("\nPERFORMANCE METRICS:")
|
||||
logger.info(f"Average update time: {avg_update_time:.3f} ms")
|
||||
logger.info(f"Maximum update time: {max_update_time:.3f} ms")
|
||||
logger.info(f"Average signal time: {avg_signal_time:.3f} ms")
|
||||
logger.info(f"Maximum signal time: {max_signal_time:.3f} ms")
|
||||
|
||||
# Performance targets check
|
||||
target_update_time = 1.0 # 1ms target
|
||||
target_signal_time = 10.0 # 10ms target
|
||||
|
||||
logger.info("\nPERFORMANCE TARGET CHECK:")
|
||||
logger.info(f"Update time target (<{target_update_time}ms): {'✅ PASS' if avg_update_time < target_update_time else '❌ FAIL'}")
|
||||
logger.info(f"Signal time target (<{target_signal_time}ms): {'✅ PASS' if avg_signal_time < target_signal_time else '❌ FAIL'}")
|
||||
|
||||
# State summary
|
||||
state_summary = strategy.get_current_state_summary()
|
||||
logger.info(f"\nFINAL STATE SUMMARY:")
|
||||
for key, value in state_summary.items():
|
||||
if key != 'performance_metrics': # Skip detailed performance metrics
|
||||
logger.info(f" {key}: {value}")
|
||||
|
||||
# Test state reset
|
||||
logger.info("\nTesting state reset...")
|
||||
strategy.reset_calculation_state()
|
||||
logger.info(f"After reset - warmed_up: {strategy.is_warmed_up}, mode: {strategy.calculation_mode}")
|
||||
|
||||
logger.info("\n✅ IncRandomStrategy test completed successfully!")
|
||||
|
||||
return {
|
||||
'entry_signals': len(entry_signals),
|
||||
'exit_signals': len(exit_signals),
|
||||
'avg_update_time_ms': avg_update_time,
|
||||
'avg_signal_time_ms': avg_signal_time,
|
||||
'performance_targets_met': avg_update_time < target_update_time and avg_signal_time < target_signal_time
|
||||
}
|
||||
|
||||
|
||||
def test_strategy_comparison():
|
||||
"""Test that incremental strategy produces consistent results with same random seed."""
|
||||
logger.info("\nTesting strategy consistency with same random seed...")
|
||||
|
||||
# Create two strategies with same parameters and seed
|
||||
params = {
|
||||
"entry_probability": 0.15,
|
||||
"exit_probability": 0.2,
|
||||
"random_seed": 123
|
||||
}
|
||||
|
||||
strategy1 = IncRandomStrategy(weight=1.0, params=params)
|
||||
strategy2 = IncRandomStrategy(weight=1.0, params=params)
|
||||
|
||||
# Generate test data
|
||||
test_data = generate_test_data(20)
|
||||
timestamps = pd.date_range(start='2024-01-01 10:00:00', periods=len(test_data), freq='1min')
|
||||
|
||||
signals1 = []
|
||||
signals2 = []
|
||||
|
||||
# Process same data with both strategies
|
||||
for data_point, timestamp in zip(test_data, timestamps):
|
||||
strategy1.calculate_on_data(data_point, timestamp)
|
||||
strategy2.calculate_on_data(data_point, timestamp)
|
||||
|
||||
entry1 = strategy1.get_entry_signal()
|
||||
entry2 = strategy2.get_entry_signal()
|
||||
|
||||
signals1.append(entry1.signal_type)
|
||||
signals2.append(entry2.signal_type)
|
||||
|
||||
# Check if signals are identical
|
||||
signals_match = signals1 == signals2
|
||||
logger.info(f"Signals consistency test: {'✅ PASS' if signals_match else '❌ FAIL'}")
|
||||
|
||||
if not signals_match:
|
||||
logger.warning("Signal mismatch detected:")
|
||||
for i, (s1, s2) in enumerate(zip(signals1, signals2)):
|
||||
if s1 != s2:
|
||||
logger.warning(f" Index {i}: Strategy1={s1}, Strategy2={s2}")
|
||||
|
||||
return signals_match
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
# Run main test
|
||||
test_results = test_inc_random_strategy()
|
||||
|
||||
# Run consistency test
|
||||
consistency_result = test_strategy_comparison()
|
||||
|
||||
# Summary
|
||||
logger.info("\n" + "="*60)
|
||||
logger.info("OVERALL TEST SUMMARY")
|
||||
logger.info("="*60)
|
||||
logger.info(f"Main test completed: ✅")
|
||||
logger.info(f"Performance targets met: {'✅' if test_results['performance_targets_met'] else '❌'}")
|
||||
logger.info(f"Consistency test passed: {'✅' if consistency_result else '❌'}")
|
||||
logger.info(f"Entry signals generated: {test_results['entry_signals']}")
|
||||
logger.info(f"Exit signals generated: {test_results['exit_signals']}")
|
||||
logger.info(f"Average update time: {test_results['avg_update_time_ms']:.3f} ms")
|
||||
logger.info(f"Average signal time: {test_results['avg_signal_time_ms']:.3f} ms")
|
||||
|
||||
if test_results['performance_targets_met'] and consistency_result:
|
||||
logger.info("\n🎉 ALL TESTS PASSED! IncRandomStrategy is ready for use.")
|
||||
else:
|
||||
logger.warning("\n⚠️ Some tests failed. Review the results above.")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Test failed with error: {e}")
|
||||
raise
|
||||
Loading…
x
Reference in New Issue
Block a user