""" Test script for IncRandomStrategy This script tests the incremental random strategy to verify it works correctly and can generate signals incrementally with proper performance characteristics. """ import pandas as pd import numpy as np import time import logging from typing import List, Dict from .random_strategy import IncRandomStrategy # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def generate_test_data(num_points: int = 100) -> List[Dict[str, float]]: """ Generate synthetic OHLCV data for testing. Args: num_points: Number of data points to generate Returns: List of OHLCV data dictionaries """ np.random.seed(42) # For reproducible test data data_points = [] base_price = 50000.0 for i in range(num_points): # Generate realistic OHLCV data with some volatility price_change = np.random.normal(0, 100) # Random walk with volatility base_price += price_change # Ensure realistic OHLC relationships open_price = base_price high_price = open_price + abs(np.random.normal(0, 50)) low_price = open_price - abs(np.random.normal(0, 50)) close_price = open_price + np.random.normal(0, 30) # Ensure OHLC constraints high_price = max(high_price, open_price, close_price) low_price = min(low_price, open_price, close_price) volume = np.random.uniform(1000, 10000) data_points.append({ 'open': open_price, 'high': high_price, 'low': low_price, 'close': close_price, 'volume': volume }) return data_points def test_inc_random_strategy(): """Test the IncRandomStrategy with synthetic data.""" logger.info("Starting IncRandomStrategy test...") # Create strategy with test parameters strategy_params = { "entry_probability": 0.2, # Higher probability for testing "exit_probability": 0.3, "min_confidence": 0.7, "max_confidence": 0.9, "signal_frequency": 3, # Generate signal every 3 bars "random_seed": 42 # For reproducible results } strategy = IncRandomStrategy(weight=1.0, params=strategy_params) # Generate test data test_data = generate_test_data(50) timestamps = pd.date_range(start='2024-01-01 09:00:00', periods=len(test_data), freq='1min') logger.info(f"Generated {len(test_data)} test data points") logger.info(f"Strategy minimum buffer size: {strategy.get_minimum_buffer_size()}") logger.info(f"Strategy supports incremental: {strategy.supports_incremental_calculation()}") # Track signals and performance entry_signals = [] exit_signals = [] update_times = [] signal_times = [] # Process data incrementally for i, (data_point, timestamp) in enumerate(zip(test_data, timestamps)): # Measure update time start_time = time.perf_counter() strategy.calculate_on_data(data_point, timestamp) update_time = time.perf_counter() - start_time update_times.append(update_time) # Generate signals start_time = time.perf_counter() entry_signal = strategy.get_entry_signal() exit_signal = strategy.get_exit_signal() signal_time = time.perf_counter() - start_time signal_times.append(signal_time) # Track signals if entry_signal.signal_type == "ENTRY": entry_signals.append((i, entry_signal)) logger.info(f"Entry signal at index {i}: confidence={entry_signal.confidence:.2f}, " f"price=${entry_signal.price:.2f}") if exit_signal.signal_type == "EXIT": exit_signals.append((i, exit_signal)) logger.info(f"Exit signal at index {i}: confidence={exit_signal.confidence:.2f}, " f"price=${exit_signal.price:.2f}, type={exit_signal.metadata.get('type')}") # Log progress every 10 points if (i + 1) % 10 == 0: logger.info(f"Processed {i + 1}/{len(test_data)} data points, " f"warmed_up={strategy.is_warmed_up}") # Performance analysis avg_update_time = np.mean(update_times) * 1000 # Convert to milliseconds max_update_time = np.max(update_times) * 1000 avg_signal_time = np.mean(signal_times) * 1000 max_signal_time = np.max(signal_times) * 1000 logger.info("\n" + "="*50) logger.info("TEST RESULTS") logger.info("="*50) logger.info(f"Total data points processed: {len(test_data)}") logger.info(f"Entry signals generated: {len(entry_signals)}") logger.info(f"Exit signals generated: {len(exit_signals)}") logger.info(f"Strategy warmed up: {strategy.is_warmed_up}") logger.info(f"Final calculation mode: {strategy.calculation_mode}") logger.info("\nPERFORMANCE METRICS:") logger.info(f"Average update time: {avg_update_time:.3f} ms") logger.info(f"Maximum update time: {max_update_time:.3f} ms") logger.info(f"Average signal time: {avg_signal_time:.3f} ms") logger.info(f"Maximum signal time: {max_signal_time:.3f} ms") # Performance targets check target_update_time = 1.0 # 1ms target target_signal_time = 10.0 # 10ms target logger.info("\nPERFORMANCE TARGET CHECK:") logger.info(f"Update time target (<{target_update_time}ms): {'✅ PASS' if avg_update_time < target_update_time else '❌ FAIL'}") logger.info(f"Signal time target (<{target_signal_time}ms): {'✅ PASS' if avg_signal_time < target_signal_time else '❌ FAIL'}") # State summary state_summary = strategy.get_current_state_summary() logger.info(f"\nFINAL STATE SUMMARY:") for key, value in state_summary.items(): if key != 'performance_metrics': # Skip detailed performance metrics logger.info(f" {key}: {value}") # Test state reset logger.info("\nTesting state reset...") strategy.reset_calculation_state() logger.info(f"After reset - warmed_up: {strategy.is_warmed_up}, mode: {strategy.calculation_mode}") logger.info("\n✅ IncRandomStrategy test completed successfully!") return { 'entry_signals': len(entry_signals), 'exit_signals': len(exit_signals), 'avg_update_time_ms': avg_update_time, 'avg_signal_time_ms': avg_signal_time, 'performance_targets_met': avg_update_time < target_update_time and avg_signal_time < target_signal_time } def test_strategy_comparison(): """Test that incremental strategy produces consistent results with same random seed.""" logger.info("\nTesting strategy consistency with same random seed...") # Create two strategies with same parameters and seed params = { "entry_probability": 0.15, "exit_probability": 0.2, "random_seed": 123 } strategy1 = IncRandomStrategy(weight=1.0, params=params) strategy2 = IncRandomStrategy(weight=1.0, params=params) # Generate test data test_data = generate_test_data(20) timestamps = pd.date_range(start='2024-01-01 10:00:00', periods=len(test_data), freq='1min') signals1 = [] signals2 = [] # Process same data with both strategies for data_point, timestamp in zip(test_data, timestamps): strategy1.calculate_on_data(data_point, timestamp) strategy2.calculate_on_data(data_point, timestamp) entry1 = strategy1.get_entry_signal() entry2 = strategy2.get_entry_signal() signals1.append(entry1.signal_type) signals2.append(entry2.signal_type) # Check if signals are identical signals_match = signals1 == signals2 logger.info(f"Signals consistency test: {'✅ PASS' if signals_match else '❌ FAIL'}") if not signals_match: logger.warning("Signal mismatch detected:") for i, (s1, s2) in enumerate(zip(signals1, signals2)): if s1 != s2: logger.warning(f" Index {i}: Strategy1={s1}, Strategy2={s2}") return signals_match if __name__ == "__main__": try: # Run main test test_results = test_inc_random_strategy() # Run consistency test consistency_result = test_strategy_comparison() # Summary logger.info("\n" + "="*60) logger.info("OVERALL TEST SUMMARY") logger.info("="*60) logger.info(f"Main test completed: ✅") logger.info(f"Performance targets met: {'✅' if test_results['performance_targets_met'] else '❌'}") logger.info(f"Consistency test passed: {'✅' if consistency_result else '❌'}") logger.info(f"Entry signals generated: {test_results['entry_signals']}") logger.info(f"Exit signals generated: {test_results['exit_signals']}") logger.info(f"Average update time: {test_results['avg_update_time_ms']:.3f} ms") logger.info(f"Average signal time: {test_results['avg_signal_time_ms']:.3f} ms") if test_results['performance_targets_met'] and consistency_result: logger.info("\n🎉 ALL TESTS PASSED! IncRandomStrategy is ready for use.") else: logger.warning("\n⚠️ Some tests failed. Review the results above.") except Exception as e: logger.error(f"Test failed with error: {e}") raise