random strategy
This commit is contained in:
parent
d985830ecd
commit
9376e13888
360
cycles/IncStrategies/random_strategy.py
Normal file
360
cycles/IncStrategies/random_strategy.py
Normal file
@ -0,0 +1,360 @@
|
||||
"""
|
||||
Incremental Random Strategy for Testing
|
||||
|
||||
This strategy generates random entry and exit signals for testing the incremental strategy system.
|
||||
It's useful for verifying that the incremental strategy framework is working correctly.
|
||||
"""
|
||||
|
||||
import random
|
||||
import logging
|
||||
import time
|
||||
from typing import Dict, Optional
|
||||
import pandas as pd
|
||||
|
||||
from .base import IncStrategyBase, IncStrategySignal
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class IncRandomStrategy(IncStrategyBase):
|
||||
"""
|
||||
Incremental random signal generator strategy for testing.
|
||||
|
||||
This strategy generates random entry and exit signals with configurable
|
||||
probability and confidence levels. It's designed to test the incremental
|
||||
strategy framework and signal processing system.
|
||||
|
||||
The incremental version maintains minimal state and processes each new
|
||||
data point independently, making it ideal for testing real-time performance.
|
||||
|
||||
Parameters:
|
||||
entry_probability: Probability of generating an entry signal (0.0-1.0)
|
||||
exit_probability: Probability of generating an exit signal (0.0-1.0)
|
||||
min_confidence: Minimum confidence level for signals
|
||||
max_confidence: Maximum confidence level for signals
|
||||
timeframe: Timeframe to operate on (default: "1min")
|
||||
signal_frequency: How often to generate signals (every N bars)
|
||||
random_seed: Optional seed for reproducible random signals
|
||||
|
||||
Example:
|
||||
strategy = IncRandomStrategy(
|
||||
weight=1.0,
|
||||
params={
|
||||
"entry_probability": 0.1,
|
||||
"exit_probability": 0.15,
|
||||
"min_confidence": 0.7,
|
||||
"max_confidence": 0.9,
|
||||
"signal_frequency": 5,
|
||||
"random_seed": 42 # For reproducible testing
|
||||
}
|
||||
)
|
||||
"""
|
||||
|
||||
def __init__(self, weight: float = 1.0, params: Optional[Dict] = None):
|
||||
"""Initialize the incremental random strategy."""
|
||||
super().__init__("inc_random", weight, params)
|
||||
|
||||
# Strategy parameters with defaults
|
||||
self.entry_probability = self.params.get("entry_probability", 0.05) # 5% chance per bar
|
||||
self.exit_probability = self.params.get("exit_probability", 0.1) # 10% chance per bar
|
||||
self.min_confidence = self.params.get("min_confidence", 0.6)
|
||||
self.max_confidence = self.params.get("max_confidence", 0.9)
|
||||
self.timeframe = self.params.get("timeframe", "1min")
|
||||
self.signal_frequency = self.params.get("signal_frequency", 1) # Every bar
|
||||
|
||||
# Create separate random instance for this strategy
|
||||
self._random = random.Random()
|
||||
random_seed = self.params.get("random_seed")
|
||||
if random_seed is not None:
|
||||
self._random.seed(random_seed)
|
||||
logger.info(f"IncRandomStrategy: Set random seed to {random_seed}")
|
||||
|
||||
# Internal state (minimal for random strategy)
|
||||
self._bar_count = 0
|
||||
self._last_signal_bar = -1
|
||||
self._current_price = None
|
||||
self._last_timestamp = None
|
||||
|
||||
logger.info(f"IncRandomStrategy initialized with entry_prob={self.entry_probability}, "
|
||||
f"exit_prob={self.exit_probability}, timeframe={self.timeframe}")
|
||||
|
||||
def get_minimum_buffer_size(self) -> Dict[str, int]:
|
||||
"""
|
||||
Return minimum data points needed for each timeframe.
|
||||
|
||||
Random strategy doesn't need any historical data for calculations,
|
||||
so we only need 1 data point to start generating signals.
|
||||
|
||||
Returns:
|
||||
Dict[str, int]: Minimal buffer requirements
|
||||
"""
|
||||
return {"1min": 1} # Only need current data point
|
||||
|
||||
def supports_incremental_calculation(self) -> bool:
|
||||
"""
|
||||
Whether strategy supports incremental calculation.
|
||||
|
||||
Random strategy is ideal for incremental mode since it doesn't
|
||||
depend on historical calculations.
|
||||
|
||||
Returns:
|
||||
bool: Always True for random strategy
|
||||
"""
|
||||
return True
|
||||
|
||||
def calculate_on_data(self, new_data_point: Dict[str, float], timestamp: pd.Timestamp) -> None:
|
||||
"""
|
||||
Process a single new data point incrementally.
|
||||
|
||||
For random strategy, we just update our internal state with the
|
||||
current price and increment the bar counter.
|
||||
|
||||
Args:
|
||||
new_data_point: OHLCV data point {open, high, low, close, volume}
|
||||
timestamp: Timestamp of the data point
|
||||
"""
|
||||
start_time = time.perf_counter()
|
||||
|
||||
try:
|
||||
# Update timeframe buffers (handled by base class)
|
||||
self._update_timeframe_buffers(new_data_point, timestamp)
|
||||
|
||||
# Update internal state
|
||||
self._current_price = new_data_point['close']
|
||||
self._last_timestamp = timestamp
|
||||
self._data_points_received += 1
|
||||
|
||||
# Check if we should update bar count based on timeframe
|
||||
if self._should_update_bar_count(timestamp):
|
||||
self._bar_count += 1
|
||||
|
||||
# Debug logging every 10 bars
|
||||
if self._bar_count % 10 == 0:
|
||||
logger.debug(f"IncRandomStrategy: Processing bar {self._bar_count}, "
|
||||
f"price=${self._current_price:.2f}, timestamp={timestamp}")
|
||||
|
||||
# Update warm-up status
|
||||
if not self._is_warmed_up and self._data_points_received >= 1:
|
||||
self._is_warmed_up = True
|
||||
self._calculation_mode = "incremental"
|
||||
logger.info(f"IncRandomStrategy: Warmed up after {self._data_points_received} data points")
|
||||
|
||||
# Record performance metrics
|
||||
update_time = time.perf_counter() - start_time
|
||||
self._performance_metrics['update_times'].append(update_time)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"IncRandomStrategy: Error in calculate_on_data: {e}")
|
||||
self._performance_metrics['state_validation_failures'] += 1
|
||||
raise
|
||||
|
||||
def _should_update_bar_count(self, timestamp: pd.Timestamp) -> bool:
|
||||
"""
|
||||
Check if we should increment bar count based on timeframe.
|
||||
|
||||
For 1min timeframe, increment every data point.
|
||||
For other timeframes, increment when timeframe period has passed.
|
||||
|
||||
Args:
|
||||
timestamp: Current timestamp
|
||||
|
||||
Returns:
|
||||
bool: Whether to increment bar count
|
||||
"""
|
||||
if self.timeframe == "1min":
|
||||
return True # Every data point is a new bar
|
||||
|
||||
if self._last_timestamp is None:
|
||||
return True # First data point
|
||||
|
||||
# Calculate timeframe interval
|
||||
if self.timeframe.endswith("min"):
|
||||
minutes = int(self.timeframe[:-3])
|
||||
interval = pd.Timedelta(minutes=minutes)
|
||||
elif self.timeframe.endswith("h"):
|
||||
hours = int(self.timeframe[:-1])
|
||||
interval = pd.Timedelta(hours=hours)
|
||||
else:
|
||||
return True # Unknown timeframe, update anyway
|
||||
|
||||
# Check if enough time has passed
|
||||
return timestamp >= self._last_timestamp + interval
|
||||
|
||||
def get_entry_signal(self) -> IncStrategySignal:
|
||||
"""
|
||||
Generate random entry signals based on current state.
|
||||
|
||||
Returns:
|
||||
IncStrategySignal: Entry signal with confidence level
|
||||
"""
|
||||
if not self._is_warmed_up:
|
||||
return IncStrategySignal("HOLD", 0.0)
|
||||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
try:
|
||||
# Check if we should generate a signal based on frequency
|
||||
if (self._bar_count - self._last_signal_bar) < self.signal_frequency:
|
||||
return IncStrategySignal("HOLD", 0.0)
|
||||
|
||||
# Generate random entry signal using strategy's random instance
|
||||
random_value = self._random.random()
|
||||
if random_value < self.entry_probability:
|
||||
confidence = self._random.uniform(self.min_confidence, self.max_confidence)
|
||||
self._last_signal_bar = self._bar_count
|
||||
|
||||
logger.info(f"IncRandomStrategy: Generated ENTRY signal at bar {self._bar_count}, "
|
||||
f"price=${self._current_price:.2f}, confidence={confidence:.2f}, "
|
||||
f"random_value={random_value:.3f}")
|
||||
|
||||
signal = IncStrategySignal(
|
||||
"ENTRY",
|
||||
confidence=confidence,
|
||||
price=self._current_price,
|
||||
metadata={
|
||||
"strategy": "inc_random",
|
||||
"bar_count": self._bar_count,
|
||||
"timeframe": self.timeframe,
|
||||
"random_value": random_value,
|
||||
"timestamp": self._last_timestamp
|
||||
}
|
||||
)
|
||||
|
||||
# Record performance metrics
|
||||
signal_time = time.perf_counter() - start_time
|
||||
self._performance_metrics['signal_generation_times'].append(signal_time)
|
||||
|
||||
return signal
|
||||
|
||||
return IncStrategySignal("HOLD", 0.0)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"IncRandomStrategy: Error in get_entry_signal: {e}")
|
||||
return IncStrategySignal("HOLD", 0.0)
|
||||
|
||||
def get_exit_signal(self) -> IncStrategySignal:
|
||||
"""
|
||||
Generate random exit signals based on current state.
|
||||
|
||||
Returns:
|
||||
IncStrategySignal: Exit signal with confidence level
|
||||
"""
|
||||
if not self._is_warmed_up:
|
||||
return IncStrategySignal("HOLD", 0.0)
|
||||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
try:
|
||||
# Generate random exit signal using strategy's random instance
|
||||
random_value = self._random.random()
|
||||
if random_value < self.exit_probability:
|
||||
confidence = self._random.uniform(self.min_confidence, self.max_confidence)
|
||||
|
||||
# Randomly choose exit type
|
||||
exit_types = ["SELL_SIGNAL", "TAKE_PROFIT", "STOP_LOSS"]
|
||||
exit_type = self._random.choice(exit_types)
|
||||
|
||||
logger.info(f"IncRandomStrategy: Generated EXIT signal at bar {self._bar_count}, "
|
||||
f"price=${self._current_price:.2f}, confidence={confidence:.2f}, "
|
||||
f"type={exit_type}, random_value={random_value:.3f}")
|
||||
|
||||
signal = IncStrategySignal(
|
||||
"EXIT",
|
||||
confidence=confidence,
|
||||
price=self._current_price,
|
||||
metadata={
|
||||
"type": exit_type,
|
||||
"strategy": "inc_random",
|
||||
"bar_count": self._bar_count,
|
||||
"timeframe": self.timeframe,
|
||||
"random_value": random_value,
|
||||
"timestamp": self._last_timestamp
|
||||
}
|
||||
)
|
||||
|
||||
# Record performance metrics
|
||||
signal_time = time.perf_counter() - start_time
|
||||
self._performance_metrics['signal_generation_times'].append(signal_time)
|
||||
|
||||
return signal
|
||||
|
||||
return IncStrategySignal("HOLD", 0.0)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"IncRandomStrategy: Error in get_exit_signal: {e}")
|
||||
return IncStrategySignal("HOLD", 0.0)
|
||||
|
||||
def get_confidence(self) -> float:
|
||||
"""
|
||||
Return random confidence level for current market state.
|
||||
|
||||
Returns:
|
||||
float: Random confidence level between min and max confidence
|
||||
"""
|
||||
if not self._is_warmed_up:
|
||||
return 0.0
|
||||
|
||||
return self._random.uniform(self.min_confidence, self.max_confidence)
|
||||
|
||||
def reset_calculation_state(self) -> None:
|
||||
"""Reset internal calculation state for reinitialization."""
|
||||
super().reset_calculation_state()
|
||||
|
||||
# Reset random strategy specific state
|
||||
self._bar_count = 0
|
||||
self._last_signal_bar = -1
|
||||
self._current_price = None
|
||||
self._last_timestamp = None
|
||||
|
||||
# Reset random state if seed was provided
|
||||
random_seed = self.params.get("random_seed")
|
||||
if random_seed is not None:
|
||||
self._random.seed(random_seed)
|
||||
|
||||
logger.info("IncRandomStrategy: Calculation state reset")
|
||||
|
||||
def _reinitialize_from_buffers(self) -> None:
|
||||
"""
|
||||
Reinitialize indicators from available buffer data.
|
||||
|
||||
For random strategy, we just need to restore the current price
|
||||
from the latest data point in the buffer.
|
||||
"""
|
||||
try:
|
||||
# Get the latest data point from 1min buffer
|
||||
buffer_1min = self._timeframe_buffers.get("1min")
|
||||
if buffer_1min and len(buffer_1min) > 0:
|
||||
latest_data = buffer_1min[-1]
|
||||
self._current_price = latest_data['close']
|
||||
self._last_timestamp = latest_data.get('timestamp')
|
||||
self._bar_count = len(buffer_1min)
|
||||
|
||||
logger.info(f"IncRandomStrategy: Reinitialized from buffer with {self._bar_count} bars")
|
||||
else:
|
||||
logger.warning("IncRandomStrategy: No buffer data available for reinitialization")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"IncRandomStrategy: Error reinitializing from buffers: {e}")
|
||||
raise
|
||||
|
||||
def get_current_state_summary(self) -> Dict[str, any]:
|
||||
"""Get summary of current calculation state for debugging."""
|
||||
base_summary = super().get_current_state_summary()
|
||||
base_summary.update({
|
||||
'entry_probability': self.entry_probability,
|
||||
'exit_probability': self.exit_probability,
|
||||
'bar_count': self._bar_count,
|
||||
'last_signal_bar': self._last_signal_bar,
|
||||
'current_price': self._current_price,
|
||||
'last_timestamp': self._last_timestamp,
|
||||
'signal_frequency': self.signal_frequency,
|
||||
'timeframe': self.timeframe
|
||||
})
|
||||
return base_summary
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""String representation of the strategy."""
|
||||
return (f"IncRandomStrategy(entry_prob={self.entry_probability}, "
|
||||
f"exit_prob={self.exit_probability}, timeframe={self.timeframe}, "
|
||||
f"mode={self._calculation_mode}, warmed_up={self._is_warmed_up}, "
|
||||
f"bars={self._bar_count})")
|
||||
249
cycles/IncStrategies/test_random_strategy.py
Normal file
249
cycles/IncStrategies/test_random_strategy.py
Normal file
@ -0,0 +1,249 @@
|
||||
"""
|
||||
Test script for IncRandomStrategy
|
||||
|
||||
This script tests the incremental random strategy to verify it works correctly
|
||||
and can generate signals incrementally with proper performance characteristics.
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import time
|
||||
import logging
|
||||
from typing import List, Dict
|
||||
|
||||
from .random_strategy import IncRandomStrategy
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def generate_test_data(num_points: int = 100) -> List[Dict[str, float]]:
|
||||
"""
|
||||
Generate synthetic OHLCV data for testing.
|
||||
|
||||
Args:
|
||||
num_points: Number of data points to generate
|
||||
|
||||
Returns:
|
||||
List of OHLCV data dictionaries
|
||||
"""
|
||||
np.random.seed(42) # For reproducible test data
|
||||
|
||||
data_points = []
|
||||
base_price = 50000.0
|
||||
|
||||
for i in range(num_points):
|
||||
# Generate realistic OHLCV data with some volatility
|
||||
price_change = np.random.normal(0, 100) # Random walk with volatility
|
||||
base_price += price_change
|
||||
|
||||
# Ensure realistic OHLC relationships
|
||||
open_price = base_price
|
||||
high_price = open_price + abs(np.random.normal(0, 50))
|
||||
low_price = open_price - abs(np.random.normal(0, 50))
|
||||
close_price = open_price + np.random.normal(0, 30)
|
||||
|
||||
# Ensure OHLC constraints
|
||||
high_price = max(high_price, open_price, close_price)
|
||||
low_price = min(low_price, open_price, close_price)
|
||||
|
||||
volume = np.random.uniform(1000, 10000)
|
||||
|
||||
data_points.append({
|
||||
'open': open_price,
|
||||
'high': high_price,
|
||||
'low': low_price,
|
||||
'close': close_price,
|
||||
'volume': volume
|
||||
})
|
||||
|
||||
return data_points
|
||||
|
||||
|
||||
def test_inc_random_strategy():
|
||||
"""Test the IncRandomStrategy with synthetic data."""
|
||||
logger.info("Starting IncRandomStrategy test...")
|
||||
|
||||
# Create strategy with test parameters
|
||||
strategy_params = {
|
||||
"entry_probability": 0.2, # Higher probability for testing
|
||||
"exit_probability": 0.3,
|
||||
"min_confidence": 0.7,
|
||||
"max_confidence": 0.9,
|
||||
"signal_frequency": 3, # Generate signal every 3 bars
|
||||
"random_seed": 42 # For reproducible results
|
||||
}
|
||||
|
||||
strategy = IncRandomStrategy(weight=1.0, params=strategy_params)
|
||||
|
||||
# Generate test data
|
||||
test_data = generate_test_data(50)
|
||||
timestamps = pd.date_range(start='2024-01-01 09:00:00', periods=len(test_data), freq='1min')
|
||||
|
||||
logger.info(f"Generated {len(test_data)} test data points")
|
||||
logger.info(f"Strategy minimum buffer size: {strategy.get_minimum_buffer_size()}")
|
||||
logger.info(f"Strategy supports incremental: {strategy.supports_incremental_calculation()}")
|
||||
|
||||
# Track signals and performance
|
||||
entry_signals = []
|
||||
exit_signals = []
|
||||
update_times = []
|
||||
signal_times = []
|
||||
|
||||
# Process data incrementally
|
||||
for i, (data_point, timestamp) in enumerate(zip(test_data, timestamps)):
|
||||
# Measure update time
|
||||
start_time = time.perf_counter()
|
||||
strategy.calculate_on_data(data_point, timestamp)
|
||||
update_time = time.perf_counter() - start_time
|
||||
update_times.append(update_time)
|
||||
|
||||
# Generate signals
|
||||
start_time = time.perf_counter()
|
||||
entry_signal = strategy.get_entry_signal()
|
||||
exit_signal = strategy.get_exit_signal()
|
||||
signal_time = time.perf_counter() - start_time
|
||||
signal_times.append(signal_time)
|
||||
|
||||
# Track signals
|
||||
if entry_signal.signal_type == "ENTRY":
|
||||
entry_signals.append((i, entry_signal))
|
||||
logger.info(f"Entry signal at index {i}: confidence={entry_signal.confidence:.2f}, "
|
||||
f"price=${entry_signal.price:.2f}")
|
||||
|
||||
if exit_signal.signal_type == "EXIT":
|
||||
exit_signals.append((i, exit_signal))
|
||||
logger.info(f"Exit signal at index {i}: confidence={exit_signal.confidence:.2f}, "
|
||||
f"price=${exit_signal.price:.2f}, type={exit_signal.metadata.get('type')}")
|
||||
|
||||
# Log progress every 10 points
|
||||
if (i + 1) % 10 == 0:
|
||||
logger.info(f"Processed {i + 1}/{len(test_data)} data points, "
|
||||
f"warmed_up={strategy.is_warmed_up}")
|
||||
|
||||
# Performance analysis
|
||||
avg_update_time = np.mean(update_times) * 1000 # Convert to milliseconds
|
||||
max_update_time = np.max(update_times) * 1000
|
||||
avg_signal_time = np.mean(signal_times) * 1000
|
||||
max_signal_time = np.max(signal_times) * 1000
|
||||
|
||||
logger.info("\n" + "="*50)
|
||||
logger.info("TEST RESULTS")
|
||||
logger.info("="*50)
|
||||
logger.info(f"Total data points processed: {len(test_data)}")
|
||||
logger.info(f"Entry signals generated: {len(entry_signals)}")
|
||||
logger.info(f"Exit signals generated: {len(exit_signals)}")
|
||||
logger.info(f"Strategy warmed up: {strategy.is_warmed_up}")
|
||||
logger.info(f"Final calculation mode: {strategy.calculation_mode}")
|
||||
|
||||
logger.info("\nPERFORMANCE METRICS:")
|
||||
logger.info(f"Average update time: {avg_update_time:.3f} ms")
|
||||
logger.info(f"Maximum update time: {max_update_time:.3f} ms")
|
||||
logger.info(f"Average signal time: {avg_signal_time:.3f} ms")
|
||||
logger.info(f"Maximum signal time: {max_signal_time:.3f} ms")
|
||||
|
||||
# Performance targets check
|
||||
target_update_time = 1.0 # 1ms target
|
||||
target_signal_time = 10.0 # 10ms target
|
||||
|
||||
logger.info("\nPERFORMANCE TARGET CHECK:")
|
||||
logger.info(f"Update time target (<{target_update_time}ms): {'✅ PASS' if avg_update_time < target_update_time else '❌ FAIL'}")
|
||||
logger.info(f"Signal time target (<{target_signal_time}ms): {'✅ PASS' if avg_signal_time < target_signal_time else '❌ FAIL'}")
|
||||
|
||||
# State summary
|
||||
state_summary = strategy.get_current_state_summary()
|
||||
logger.info(f"\nFINAL STATE SUMMARY:")
|
||||
for key, value in state_summary.items():
|
||||
if key != 'performance_metrics': # Skip detailed performance metrics
|
||||
logger.info(f" {key}: {value}")
|
||||
|
||||
# Test state reset
|
||||
logger.info("\nTesting state reset...")
|
||||
strategy.reset_calculation_state()
|
||||
logger.info(f"After reset - warmed_up: {strategy.is_warmed_up}, mode: {strategy.calculation_mode}")
|
||||
|
||||
logger.info("\n✅ IncRandomStrategy test completed successfully!")
|
||||
|
||||
return {
|
||||
'entry_signals': len(entry_signals),
|
||||
'exit_signals': len(exit_signals),
|
||||
'avg_update_time_ms': avg_update_time,
|
||||
'avg_signal_time_ms': avg_signal_time,
|
||||
'performance_targets_met': avg_update_time < target_update_time and avg_signal_time < target_signal_time
|
||||
}
|
||||
|
||||
|
||||
def test_strategy_comparison():
|
||||
"""Test that incremental strategy produces consistent results with same random seed."""
|
||||
logger.info("\nTesting strategy consistency with same random seed...")
|
||||
|
||||
# Create two strategies with same parameters and seed
|
||||
params = {
|
||||
"entry_probability": 0.15,
|
||||
"exit_probability": 0.2,
|
||||
"random_seed": 123
|
||||
}
|
||||
|
||||
strategy1 = IncRandomStrategy(weight=1.0, params=params)
|
||||
strategy2 = IncRandomStrategy(weight=1.0, params=params)
|
||||
|
||||
# Generate test data
|
||||
test_data = generate_test_data(20)
|
||||
timestamps = pd.date_range(start='2024-01-01 10:00:00', periods=len(test_data), freq='1min')
|
||||
|
||||
signals1 = []
|
||||
signals2 = []
|
||||
|
||||
# Process same data with both strategies
|
||||
for data_point, timestamp in zip(test_data, timestamps):
|
||||
strategy1.calculate_on_data(data_point, timestamp)
|
||||
strategy2.calculate_on_data(data_point, timestamp)
|
||||
|
||||
entry1 = strategy1.get_entry_signal()
|
||||
entry2 = strategy2.get_entry_signal()
|
||||
|
||||
signals1.append(entry1.signal_type)
|
||||
signals2.append(entry2.signal_type)
|
||||
|
||||
# Check if signals are identical
|
||||
signals_match = signals1 == signals2
|
||||
logger.info(f"Signals consistency test: {'✅ PASS' if signals_match else '❌ FAIL'}")
|
||||
|
||||
if not signals_match:
|
||||
logger.warning("Signal mismatch detected:")
|
||||
for i, (s1, s2) in enumerate(zip(signals1, signals2)):
|
||||
if s1 != s2:
|
||||
logger.warning(f" Index {i}: Strategy1={s1}, Strategy2={s2}")
|
||||
|
||||
return signals_match
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
# Run main test
|
||||
test_results = test_inc_random_strategy()
|
||||
|
||||
# Run consistency test
|
||||
consistency_result = test_strategy_comparison()
|
||||
|
||||
# Summary
|
||||
logger.info("\n" + "="*60)
|
||||
logger.info("OVERALL TEST SUMMARY")
|
||||
logger.info("="*60)
|
||||
logger.info(f"Main test completed: ✅")
|
||||
logger.info(f"Performance targets met: {'✅' if test_results['performance_targets_met'] else '❌'}")
|
||||
logger.info(f"Consistency test passed: {'✅' if consistency_result else '❌'}")
|
||||
logger.info(f"Entry signals generated: {test_results['entry_signals']}")
|
||||
logger.info(f"Exit signals generated: {test_results['exit_signals']}")
|
||||
logger.info(f"Average update time: {test_results['avg_update_time_ms']:.3f} ms")
|
||||
logger.info(f"Average signal time: {test_results['avg_signal_time_ms']:.3f} ms")
|
||||
|
||||
if test_results['performance_targets_met'] and consistency_result:
|
||||
logger.info("\n🎉 ALL TESTS PASSED! IncRandomStrategy is ready for use.")
|
||||
else:
|
||||
logger.warning("\n⚠️ Some tests failed. Review the results above.")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Test failed with error: {e}")
|
||||
raise
|
||||
Loading…
x
Reference in New Issue
Block a user