From 9376e138889f8f13fef016dc11c73d167c6489b8 Mon Sep 17 00:00:00 2001 From: "Vasily.onl" Date: Mon, 26 May 2025 13:26:16 +0800 Subject: [PATCH] random strategy --- cycles/IncStrategies/random_strategy.py | 360 +++++++++++++++++++ cycles/IncStrategies/test_random_strategy.py | 249 +++++++++++++ 2 files changed, 609 insertions(+) create mode 100644 cycles/IncStrategies/random_strategy.py create mode 100644 cycles/IncStrategies/test_random_strategy.py diff --git a/cycles/IncStrategies/random_strategy.py b/cycles/IncStrategies/random_strategy.py new file mode 100644 index 0000000..698c28a --- /dev/null +++ b/cycles/IncStrategies/random_strategy.py @@ -0,0 +1,360 @@ +""" +Incremental Random Strategy for Testing + +This strategy generates random entry and exit signals for testing the incremental strategy system. +It's useful for verifying that the incremental strategy framework is working correctly. +""" + +import random +import logging +import time +from typing import Dict, Optional +import pandas as pd + +from .base import IncStrategyBase, IncStrategySignal + +logger = logging.getLogger(__name__) + + +class IncRandomStrategy(IncStrategyBase): + """ + Incremental random signal generator strategy for testing. + + This strategy generates random entry and exit signals with configurable + probability and confidence levels. It's designed to test the incremental + strategy framework and signal processing system. + + The incremental version maintains minimal state and processes each new + data point independently, making it ideal for testing real-time performance. + + Parameters: + entry_probability: Probability of generating an entry signal (0.0-1.0) + exit_probability: Probability of generating an exit signal (0.0-1.0) + min_confidence: Minimum confidence level for signals + max_confidence: Maximum confidence level for signals + timeframe: Timeframe to operate on (default: "1min") + signal_frequency: How often to generate signals (every N bars) + random_seed: Optional seed for reproducible random signals + + Example: + strategy = IncRandomStrategy( + weight=1.0, + params={ + "entry_probability": 0.1, + "exit_probability": 0.15, + "min_confidence": 0.7, + "max_confidence": 0.9, + "signal_frequency": 5, + "random_seed": 42 # For reproducible testing + } + ) + """ + + def __init__(self, weight: float = 1.0, params: Optional[Dict] = None): + """Initialize the incremental random strategy.""" + super().__init__("inc_random", weight, params) + + # Strategy parameters with defaults + self.entry_probability = self.params.get("entry_probability", 0.05) # 5% chance per bar + self.exit_probability = self.params.get("exit_probability", 0.1) # 10% chance per bar + self.min_confidence = self.params.get("min_confidence", 0.6) + self.max_confidence = self.params.get("max_confidence", 0.9) + self.timeframe = self.params.get("timeframe", "1min") + self.signal_frequency = self.params.get("signal_frequency", 1) # Every bar + + # Create separate random instance for this strategy + self._random = random.Random() + random_seed = self.params.get("random_seed") + if random_seed is not None: + self._random.seed(random_seed) + logger.info(f"IncRandomStrategy: Set random seed to {random_seed}") + + # Internal state (minimal for random strategy) + self._bar_count = 0 + self._last_signal_bar = -1 + self._current_price = None + self._last_timestamp = None + + logger.info(f"IncRandomStrategy initialized with entry_prob={self.entry_probability}, " + f"exit_prob={self.exit_probability}, timeframe={self.timeframe}") + + def get_minimum_buffer_size(self) -> Dict[str, int]: + """ + Return minimum data points needed for each timeframe. + + Random strategy doesn't need any historical data for calculations, + so we only need 1 data point to start generating signals. + + Returns: + Dict[str, int]: Minimal buffer requirements + """ + return {"1min": 1} # Only need current data point + + def supports_incremental_calculation(self) -> bool: + """ + Whether strategy supports incremental calculation. + + Random strategy is ideal for incremental mode since it doesn't + depend on historical calculations. + + Returns: + bool: Always True for random strategy + """ + return True + + def calculate_on_data(self, new_data_point: Dict[str, float], timestamp: pd.Timestamp) -> None: + """ + Process a single new data point incrementally. + + For random strategy, we just update our internal state with the + current price and increment the bar counter. + + Args: + new_data_point: OHLCV data point {open, high, low, close, volume} + timestamp: Timestamp of the data point + """ + start_time = time.perf_counter() + + try: + # Update timeframe buffers (handled by base class) + self._update_timeframe_buffers(new_data_point, timestamp) + + # Update internal state + self._current_price = new_data_point['close'] + self._last_timestamp = timestamp + self._data_points_received += 1 + + # Check if we should update bar count based on timeframe + if self._should_update_bar_count(timestamp): + self._bar_count += 1 + + # Debug logging every 10 bars + if self._bar_count % 10 == 0: + logger.debug(f"IncRandomStrategy: Processing bar {self._bar_count}, " + f"price=${self._current_price:.2f}, timestamp={timestamp}") + + # Update warm-up status + if not self._is_warmed_up and self._data_points_received >= 1: + self._is_warmed_up = True + self._calculation_mode = "incremental" + logger.info(f"IncRandomStrategy: Warmed up after {self._data_points_received} data points") + + # Record performance metrics + update_time = time.perf_counter() - start_time + self._performance_metrics['update_times'].append(update_time) + + except Exception as e: + logger.error(f"IncRandomStrategy: Error in calculate_on_data: {e}") + self._performance_metrics['state_validation_failures'] += 1 + raise + + def _should_update_bar_count(self, timestamp: pd.Timestamp) -> bool: + """ + Check if we should increment bar count based on timeframe. + + For 1min timeframe, increment every data point. + For other timeframes, increment when timeframe period has passed. + + Args: + timestamp: Current timestamp + + Returns: + bool: Whether to increment bar count + """ + if self.timeframe == "1min": + return True # Every data point is a new bar + + if self._last_timestamp is None: + return True # First data point + + # Calculate timeframe interval + if self.timeframe.endswith("min"): + minutes = int(self.timeframe[:-3]) + interval = pd.Timedelta(minutes=minutes) + elif self.timeframe.endswith("h"): + hours = int(self.timeframe[:-1]) + interval = pd.Timedelta(hours=hours) + else: + return True # Unknown timeframe, update anyway + + # Check if enough time has passed + return timestamp >= self._last_timestamp + interval + + def get_entry_signal(self) -> IncStrategySignal: + """ + Generate random entry signals based on current state. + + Returns: + IncStrategySignal: Entry signal with confidence level + """ + if not self._is_warmed_up: + return IncStrategySignal("HOLD", 0.0) + + start_time = time.perf_counter() + + try: + # Check if we should generate a signal based on frequency + if (self._bar_count - self._last_signal_bar) < self.signal_frequency: + return IncStrategySignal("HOLD", 0.0) + + # Generate random entry signal using strategy's random instance + random_value = self._random.random() + if random_value < self.entry_probability: + confidence = self._random.uniform(self.min_confidence, self.max_confidence) + self._last_signal_bar = self._bar_count + + logger.info(f"IncRandomStrategy: Generated ENTRY signal at bar {self._bar_count}, " + f"price=${self._current_price:.2f}, confidence={confidence:.2f}, " + f"random_value={random_value:.3f}") + + signal = IncStrategySignal( + "ENTRY", + confidence=confidence, + price=self._current_price, + metadata={ + "strategy": "inc_random", + "bar_count": self._bar_count, + "timeframe": self.timeframe, + "random_value": random_value, + "timestamp": self._last_timestamp + } + ) + + # Record performance metrics + signal_time = time.perf_counter() - start_time + self._performance_metrics['signal_generation_times'].append(signal_time) + + return signal + + return IncStrategySignal("HOLD", 0.0) + + except Exception as e: + logger.error(f"IncRandomStrategy: Error in get_entry_signal: {e}") + return IncStrategySignal("HOLD", 0.0) + + def get_exit_signal(self) -> IncStrategySignal: + """ + Generate random exit signals based on current state. + + Returns: + IncStrategySignal: Exit signal with confidence level + """ + if not self._is_warmed_up: + return IncStrategySignal("HOLD", 0.0) + + start_time = time.perf_counter() + + try: + # Generate random exit signal using strategy's random instance + random_value = self._random.random() + if random_value < self.exit_probability: + confidence = self._random.uniform(self.min_confidence, self.max_confidence) + + # Randomly choose exit type + exit_types = ["SELL_SIGNAL", "TAKE_PROFIT", "STOP_LOSS"] + exit_type = self._random.choice(exit_types) + + logger.info(f"IncRandomStrategy: Generated EXIT signal at bar {self._bar_count}, " + f"price=${self._current_price:.2f}, confidence={confidence:.2f}, " + f"type={exit_type}, random_value={random_value:.3f}") + + signal = IncStrategySignal( + "EXIT", + confidence=confidence, + price=self._current_price, + metadata={ + "type": exit_type, + "strategy": "inc_random", + "bar_count": self._bar_count, + "timeframe": self.timeframe, + "random_value": random_value, + "timestamp": self._last_timestamp + } + ) + + # Record performance metrics + signal_time = time.perf_counter() - start_time + self._performance_metrics['signal_generation_times'].append(signal_time) + + return signal + + return IncStrategySignal("HOLD", 0.0) + + except Exception as e: + logger.error(f"IncRandomStrategy: Error in get_exit_signal: {e}") + return IncStrategySignal("HOLD", 0.0) + + def get_confidence(self) -> float: + """ + Return random confidence level for current market state. + + Returns: + float: Random confidence level between min and max confidence + """ + if not self._is_warmed_up: + return 0.0 + + return self._random.uniform(self.min_confidence, self.max_confidence) + + def reset_calculation_state(self) -> None: + """Reset internal calculation state for reinitialization.""" + super().reset_calculation_state() + + # Reset random strategy specific state + self._bar_count = 0 + self._last_signal_bar = -1 + self._current_price = None + self._last_timestamp = None + + # Reset random state if seed was provided + random_seed = self.params.get("random_seed") + if random_seed is not None: + self._random.seed(random_seed) + + logger.info("IncRandomStrategy: Calculation state reset") + + def _reinitialize_from_buffers(self) -> None: + """ + Reinitialize indicators from available buffer data. + + For random strategy, we just need to restore the current price + from the latest data point in the buffer. + """ + try: + # Get the latest data point from 1min buffer + buffer_1min = self._timeframe_buffers.get("1min") + if buffer_1min and len(buffer_1min) > 0: + latest_data = buffer_1min[-1] + self._current_price = latest_data['close'] + self._last_timestamp = latest_data.get('timestamp') + self._bar_count = len(buffer_1min) + + logger.info(f"IncRandomStrategy: Reinitialized from buffer with {self._bar_count} bars") + else: + logger.warning("IncRandomStrategy: No buffer data available for reinitialization") + + except Exception as e: + logger.error(f"IncRandomStrategy: Error reinitializing from buffers: {e}") + raise + + def get_current_state_summary(self) -> Dict[str, any]: + """Get summary of current calculation state for debugging.""" + base_summary = super().get_current_state_summary() + base_summary.update({ + 'entry_probability': self.entry_probability, + 'exit_probability': self.exit_probability, + 'bar_count': self._bar_count, + 'last_signal_bar': self._last_signal_bar, + 'current_price': self._current_price, + 'last_timestamp': self._last_timestamp, + 'signal_frequency': self.signal_frequency, + 'timeframe': self.timeframe + }) + return base_summary + + def __repr__(self) -> str: + """String representation of the strategy.""" + return (f"IncRandomStrategy(entry_prob={self.entry_probability}, " + f"exit_prob={self.exit_probability}, timeframe={self.timeframe}, " + f"mode={self._calculation_mode}, warmed_up={self._is_warmed_up}, " + f"bars={self._bar_count})") \ No newline at end of file diff --git a/cycles/IncStrategies/test_random_strategy.py b/cycles/IncStrategies/test_random_strategy.py new file mode 100644 index 0000000..5236c18 --- /dev/null +++ b/cycles/IncStrategies/test_random_strategy.py @@ -0,0 +1,249 @@ +""" +Test script for IncRandomStrategy + +This script tests the incremental random strategy to verify it works correctly +and can generate signals incrementally with proper performance characteristics. +""" + +import pandas as pd +import numpy as np +import time +import logging +from typing import List, Dict + +from .random_strategy import IncRandomStrategy + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +def generate_test_data(num_points: int = 100) -> List[Dict[str, float]]: + """ + Generate synthetic OHLCV data for testing. + + Args: + num_points: Number of data points to generate + + Returns: + List of OHLCV data dictionaries + """ + np.random.seed(42) # For reproducible test data + + data_points = [] + base_price = 50000.0 + + for i in range(num_points): + # Generate realistic OHLCV data with some volatility + price_change = np.random.normal(0, 100) # Random walk with volatility + base_price += price_change + + # Ensure realistic OHLC relationships + open_price = base_price + high_price = open_price + abs(np.random.normal(0, 50)) + low_price = open_price - abs(np.random.normal(0, 50)) + close_price = open_price + np.random.normal(0, 30) + + # Ensure OHLC constraints + high_price = max(high_price, open_price, close_price) + low_price = min(low_price, open_price, close_price) + + volume = np.random.uniform(1000, 10000) + + data_points.append({ + 'open': open_price, + 'high': high_price, + 'low': low_price, + 'close': close_price, + 'volume': volume + }) + + return data_points + + +def test_inc_random_strategy(): + """Test the IncRandomStrategy with synthetic data.""" + logger.info("Starting IncRandomStrategy test...") + + # Create strategy with test parameters + strategy_params = { + "entry_probability": 0.2, # Higher probability for testing + "exit_probability": 0.3, + "min_confidence": 0.7, + "max_confidence": 0.9, + "signal_frequency": 3, # Generate signal every 3 bars + "random_seed": 42 # For reproducible results + } + + strategy = IncRandomStrategy(weight=1.0, params=strategy_params) + + # Generate test data + test_data = generate_test_data(50) + timestamps = pd.date_range(start='2024-01-01 09:00:00', periods=len(test_data), freq='1min') + + logger.info(f"Generated {len(test_data)} test data points") + logger.info(f"Strategy minimum buffer size: {strategy.get_minimum_buffer_size()}") + logger.info(f"Strategy supports incremental: {strategy.supports_incremental_calculation()}") + + # Track signals and performance + entry_signals = [] + exit_signals = [] + update_times = [] + signal_times = [] + + # Process data incrementally + for i, (data_point, timestamp) in enumerate(zip(test_data, timestamps)): + # Measure update time + start_time = time.perf_counter() + strategy.calculate_on_data(data_point, timestamp) + update_time = time.perf_counter() - start_time + update_times.append(update_time) + + # Generate signals + start_time = time.perf_counter() + entry_signal = strategy.get_entry_signal() + exit_signal = strategy.get_exit_signal() + signal_time = time.perf_counter() - start_time + signal_times.append(signal_time) + + # Track signals + if entry_signal.signal_type == "ENTRY": + entry_signals.append((i, entry_signal)) + logger.info(f"Entry signal at index {i}: confidence={entry_signal.confidence:.2f}, " + f"price=${entry_signal.price:.2f}") + + if exit_signal.signal_type == "EXIT": + exit_signals.append((i, exit_signal)) + logger.info(f"Exit signal at index {i}: confidence={exit_signal.confidence:.2f}, " + f"price=${exit_signal.price:.2f}, type={exit_signal.metadata.get('type')}") + + # Log progress every 10 points + if (i + 1) % 10 == 0: + logger.info(f"Processed {i + 1}/{len(test_data)} data points, " + f"warmed_up={strategy.is_warmed_up}") + + # Performance analysis + avg_update_time = np.mean(update_times) * 1000 # Convert to milliseconds + max_update_time = np.max(update_times) * 1000 + avg_signal_time = np.mean(signal_times) * 1000 + max_signal_time = np.max(signal_times) * 1000 + + logger.info("\n" + "="*50) + logger.info("TEST RESULTS") + logger.info("="*50) + logger.info(f"Total data points processed: {len(test_data)}") + logger.info(f"Entry signals generated: {len(entry_signals)}") + logger.info(f"Exit signals generated: {len(exit_signals)}") + logger.info(f"Strategy warmed up: {strategy.is_warmed_up}") + logger.info(f"Final calculation mode: {strategy.calculation_mode}") + + logger.info("\nPERFORMANCE METRICS:") + logger.info(f"Average update time: {avg_update_time:.3f} ms") + logger.info(f"Maximum update time: {max_update_time:.3f} ms") + logger.info(f"Average signal time: {avg_signal_time:.3f} ms") + logger.info(f"Maximum signal time: {max_signal_time:.3f} ms") + + # Performance targets check + target_update_time = 1.0 # 1ms target + target_signal_time = 10.0 # 10ms target + + logger.info("\nPERFORMANCE TARGET CHECK:") + logger.info(f"Update time target (<{target_update_time}ms): {'✅ PASS' if avg_update_time < target_update_time else '❌ FAIL'}") + logger.info(f"Signal time target (<{target_signal_time}ms): {'✅ PASS' if avg_signal_time < target_signal_time else '❌ FAIL'}") + + # State summary + state_summary = strategy.get_current_state_summary() + logger.info(f"\nFINAL STATE SUMMARY:") + for key, value in state_summary.items(): + if key != 'performance_metrics': # Skip detailed performance metrics + logger.info(f" {key}: {value}") + + # Test state reset + logger.info("\nTesting state reset...") + strategy.reset_calculation_state() + logger.info(f"After reset - warmed_up: {strategy.is_warmed_up}, mode: {strategy.calculation_mode}") + + logger.info("\n✅ IncRandomStrategy test completed successfully!") + + return { + 'entry_signals': len(entry_signals), + 'exit_signals': len(exit_signals), + 'avg_update_time_ms': avg_update_time, + 'avg_signal_time_ms': avg_signal_time, + 'performance_targets_met': avg_update_time < target_update_time and avg_signal_time < target_signal_time + } + + +def test_strategy_comparison(): + """Test that incremental strategy produces consistent results with same random seed.""" + logger.info("\nTesting strategy consistency with same random seed...") + + # Create two strategies with same parameters and seed + params = { + "entry_probability": 0.15, + "exit_probability": 0.2, + "random_seed": 123 + } + + strategy1 = IncRandomStrategy(weight=1.0, params=params) + strategy2 = IncRandomStrategy(weight=1.0, params=params) + + # Generate test data + test_data = generate_test_data(20) + timestamps = pd.date_range(start='2024-01-01 10:00:00', periods=len(test_data), freq='1min') + + signals1 = [] + signals2 = [] + + # Process same data with both strategies + for data_point, timestamp in zip(test_data, timestamps): + strategy1.calculate_on_data(data_point, timestamp) + strategy2.calculate_on_data(data_point, timestamp) + + entry1 = strategy1.get_entry_signal() + entry2 = strategy2.get_entry_signal() + + signals1.append(entry1.signal_type) + signals2.append(entry2.signal_type) + + # Check if signals are identical + signals_match = signals1 == signals2 + logger.info(f"Signals consistency test: {'✅ PASS' if signals_match else '❌ FAIL'}") + + if not signals_match: + logger.warning("Signal mismatch detected:") + for i, (s1, s2) in enumerate(zip(signals1, signals2)): + if s1 != s2: + logger.warning(f" Index {i}: Strategy1={s1}, Strategy2={s2}") + + return signals_match + + +if __name__ == "__main__": + try: + # Run main test + test_results = test_inc_random_strategy() + + # Run consistency test + consistency_result = test_strategy_comparison() + + # Summary + logger.info("\n" + "="*60) + logger.info("OVERALL TEST SUMMARY") + logger.info("="*60) + logger.info(f"Main test completed: ✅") + logger.info(f"Performance targets met: {'✅' if test_results['performance_targets_met'] else '❌'}") + logger.info(f"Consistency test passed: {'✅' if consistency_result else '❌'}") + logger.info(f"Entry signals generated: {test_results['entry_signals']}") + logger.info(f"Exit signals generated: {test_results['exit_signals']}") + logger.info(f"Average update time: {test_results['avg_update_time_ms']:.3f} ms") + logger.info(f"Average signal time: {test_results['avg_signal_time_ms']:.3f} ms") + + if test_results['performance_targets_met'] and consistency_result: + logger.info("\n🎉 ALL TESTS PASSED! IncRandomStrategy is ready for use.") + else: + logger.warning("\n⚠️ Some tests failed. Review the results above.") + + except Exception as e: + logger.error(f"Test failed with error: {e}") + raise \ No newline at end of file