"""
Batch Processing for Strategy Backtesting

This module provides efficient batch processing capabilities for running multiple
strategies across large datasets with memory management and performance optimization.
"""

from typing import List, Dict, Any, Optional, Tuple, Iterator
from dataclasses import dataclass, replace
from datetime import datetime, timezone
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import gc
import os
import threading
import psutil

from .data_integration import StrategyDataIntegrator, StrategyDataIntegrationConfig
from .data_types import StrategyResult, StrategySignal
from .validation import StrategySignalValidator, ValidationConfig
from utils.logger import get_logger


@dataclass
class BatchProcessingConfig:
    """Configuration for batch processing operations."""

    max_concurrent_strategies: int = 4  # Number of strategies to process concurrently
    max_memory_usage_percent: float = 80.0  # Maximum memory usage threshold
    chunk_size_days: int = 30  # Days to process per chunk for large datasets
    enable_memory_monitoring: bool = True
    enable_result_validation: bool = True
    result_cache_size: int = 1000  # Maximum cached results
    progress_reporting_interval: int = 10  # Report progress every N strategies


class BacktestingBatchProcessor:
    """
    Efficient batch processing for strategy backtesting.

    Provides memory-efficient processing of multiple strategies across large datasets
    with parallel execution and performance monitoring.

    Several methods run work on a ThreadPoolExecutor, so every counter update on
    ``_processing_stats`` goes through ``_increment_stat`` (guarded by ``_stats_lock``).
    """

    def __init__(self, config: Optional[BatchProcessingConfig] = None):
        """
        Initialize batch processor.

        Args:
            config: Batch processing configuration; defaults to BatchProcessingConfig()
        """
        self.config = config or BatchProcessingConfig()
        self.logger = get_logger()

        # Initialize components
        self.data_integrator = StrategyDataIntegrator()
        self.signal_validator = StrategySignalValidator() if self.config.enable_result_validation else None

        # Processing statistics (counters are mutated from worker threads)
        self._stats_lock = threading.Lock()
        self._processing_stats = self._new_stats()

        # Result cache for performance
        self._result_cache = {}

    @staticmethod
    def _new_stats() -> Dict[str, Any]:
        """Return a zeroed statistics dictionary."""
        return {
            'strategies_processed': 0,
            'total_signals_generated': 0,
            'processing_time_seconds': 0.0,
            'memory_peak_mb': 0.0,
            'errors_count': 0,
            'validation_failures': 0
        }

    def _increment_stat(self, key: str, amount: int = 1) -> None:
        """Add ``amount`` to a statistics counter under the stats lock."""
        # ``dict[key] += n`` is a read-modify-write and is not atomic across threads.
        with self._stats_lock:
            self._processing_stats[key] += amount

    def _should_report_progress(self, completed: int) -> bool:
        """Return True when ``completed`` falls on a progress-reporting boundary."""
        interval = self.config.progress_reporting_interval
        # A non-positive interval disables reporting instead of raising ZeroDivisionError.
        return interval > 0 and completed % interval == 0

    def process_strategies_batch(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        exchange: str = "okx"
    ) -> Dict[str, List[StrategyResult]]:
        """
        Process multiple strategies across multiple symbols efficiently.

        Strategies are processed one after another. Any exception escaping the loop
        is logged, counted in ``errors_count`` and results in an empty dictionary.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            days_back: Number of days to look back
            exchange: Exchange name

        Returns:
            Dictionary mapping strategy names to their results
        """
        try:
            start_time = datetime.now()
            self.logger.info(
                f"BacktestingBatchProcessor: Starting batch processing of {len(strategy_configs)} "
                f"strategies across {len(symbols)} symbols"
            )

            # Initialize results container
            batch_results = {}

            # Process strategies with memory monitoring
            for i, strategy_config in enumerate(strategy_configs):
                strategy_name = strategy_config.get('name', f'strategy_{i}')

                # Memory check
                if self.config.enable_memory_monitoring:
                    self._check_memory_usage()

                # Process strategy across all symbols
                batch_results[strategy_name] = self._process_single_strategy_batch(
                    strategy_config, symbols, timeframe, days_back, exchange
                )
                self._increment_stat('strategies_processed')

                # Progress reporting
                if self._should_report_progress(i + 1):
                    progress = ((i + 1) / len(strategy_configs)) * 100
                    self.logger.info(
                        f"Batch processing progress: {progress:.1f}% "
                        f"({i + 1}/{len(strategy_configs)} strategies)"
                    )

            # Update final statistics
            processing_time = (datetime.now() - start_time).total_seconds()
            self._processing_stats['processing_time_seconds'] = processing_time

            self.logger.info(f"BacktestingBatchProcessor: Completed batch processing in {processing_time:.2f} seconds")
            return batch_results

        except Exception as e:
            self.logger.error(f"Error in batch processing: {e}")
            self._increment_stat('errors_count')
            return {}

    def _process_single_strategy_batch(
        self,
        strategy_config: Dict[str, Any],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        exchange: str
    ) -> List[StrategyResult]:
        """
        Process a single strategy across multiple symbols.

        A failure on one symbol is logged and counted; the remaining symbols are
        still processed.
        """
        all_results = []
        strategy_name = strategy_config.get('name', 'unknown')

        for symbol in symbols:
            try:
                # Calculate strategy signals for this symbol
                results = self.data_integrator.calculate_strategy_signals_orchestrated(
                    strategy_name=strategy_name,
                    strategy_config=strategy_config,
                    symbol=symbol,
                    timeframe=timeframe,
                    days_back=days_back,
                    exchange=exchange
                )

                # Validate results if enabled
                if self.signal_validator and results:
                    all_results.extend(self._validate_strategy_results(results))
                else:
                    all_results.extend(results)

                # Update signal count (counts signals as generated, before validation filtering)
                signal_count = sum(len(result.signals) for result in results)
                self._increment_stat('total_signals_generated', signal_count)

            except Exception as e:
                self.logger.error(f"Error processing {strategy_name} for {symbol}: {e}")
                self._increment_stat('errors_count')

        return all_results

    def _validate_strategy_results(self, results: List[StrategyResult]) -> List[StrategyResult]:
        """
        Validate strategy results and filter invalid signals.

        Results without signals pass through unchanged. Results whose signals are
        all invalid are dropped; otherwise a new StrategyResult carrying only the
        valid signals is emitted.
        """
        validated_results = []

        for result in results:
            if not result.signals:
                validated_results.append(result)
                continue

            valid_signals, invalid_signals = self.signal_validator.validate_signals_batch(result.signals)

            if invalid_signals:
                self._increment_stat('validation_failures', len(invalid_signals))
                self.logger.debug(f"Filtered {len(invalid_signals)} invalid signals from {result.strategy_name}")

            # Create new result with only valid signals
            if valid_signals:
                validated_results.append(
                    StrategyResult(
                        timestamp=result.timestamp,
                        symbol=result.symbol,
                        timeframe=result.timeframe,
                        strategy_name=result.strategy_name,
                        signals=valid_signals,
                        indicators_used=result.indicators_used,
                        metadata=result.metadata
                    )
                )

        return validated_results

    def _check_memory_usage(self) -> None:
        """Monitor memory usage and trigger cleanup if needed."""
        process = psutil.Process(os.getpid())
        memory_percent = process.memory_percent()
        memory_mb = process.memory_info().rss / 1024 / 1024

        # Update peak memory tracking
        with self._stats_lock:
            self._processing_stats['memory_peak_mb'] = max(
                self._processing_stats['memory_peak_mb'], memory_mb
            )

        if memory_percent > self.config.max_memory_usage_percent:
            self.logger.warning(f"High memory usage detected: {memory_percent:.1f}% ({memory_mb:.1f} MB)")
            self._cleanup_memory()

    def _cleanup_memory(self) -> None:
        """Perform memory cleanup operations."""
        # Clear old cached results
        if len(self._result_cache) > self.config.result_cache_size:
            cache_items = list(self._result_cache.items())
            # Keep only the most recent half. Slice from an explicit start index:
            # ``cache_items[-0:]`` would keep *everything* when keep_size is 0.
            keep_size = max(0, self.config.result_cache_size // 2)
            self._result_cache = dict(cache_items[len(cache_items) - keep_size:])

        # Clear data integrator caches
        self.data_integrator.clear_cache()

        # Force garbage collection
        gc.collect()

        self.logger.debug("BacktestingBatchProcessor: Performed memory cleanup")

    def get_processing_statistics(self) -> Dict[str, Any]:
        """
        Get comprehensive processing statistics.

        Returns:
            A copy of the raw counters plus derived averages, ``error_rate`` and
            ``validation_failure_rate`` (both percentages).
        """
        with self._stats_lock:
            stats = self._processing_stats.copy()

        # Calculate derived metrics
        processed = stats['strategies_processed']
        if processed > 0:
            stats['average_signals_per_strategy'] = stats['total_signals_generated'] / processed
            stats['average_processing_time_per_strategy'] = stats['processing_time_seconds'] / processed
        else:
            stats['average_signals_per_strategy'] = 0
            stats['average_processing_time_per_strategy'] = 0

        stats['error_rate'] = (stats['errors_count'] / max(processed, 1)) * 100
        stats['validation_failure_rate'] = (
            stats['validation_failures'] / max(stats['total_signals_generated'], 1)
        ) * 100

        return stats

    def process_strategies_parallel(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        exchange: str = "okx"
    ) -> Dict[str, List[StrategyResult]]:
        """
        Process multiple strategies in parallel for improved performance.

        Each strategy is submitted to a thread pool; a strategy whose task raises is
        logged, counted as an error and mapped to an empty list.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            days_back: Number of days to look back
            exchange: Exchange name

        Returns:
            Dictionary mapping strategy names to their results
        """
        try:
            start_time = datetime.now()
            self.logger.info(f"BacktestingBatchProcessor: Starting parallel processing of {len(strategy_configs)} strategies")

            batch_results = {}

            # ThreadPoolExecutor rejects max_workers <= 0, so clamp to at least one worker.
            max_workers = max(1, self.config.max_concurrent_strategies)

            # Use ThreadPoolExecutor for parallel strategy processing
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all strategy processing tasks
                future_to_strategy = {}
                for index, strategy_config in enumerate(strategy_configs):
                    strategy_name = strategy_config.get('name', f'strategy_{index}')
                    future = executor.submit(
                        self._process_single_strategy_batch,
                        strategy_config, symbols, timeframe, days_back, exchange
                    )
                    future_to_strategy[future] = strategy_name

                # Collect results as they complete
                completed_count = 0
                for future in as_completed(future_to_strategy):
                    strategy_name = future_to_strategy[future]

                    try:
                        batch_results[strategy_name] = future.result()
                        self._increment_stat('strategies_processed')
                        completed_count += 1

                        # Progress reporting
                        if self._should_report_progress(completed_count):
                            progress = (completed_count / len(strategy_configs)) * 100
                            self.logger.info(
                                f"Parallel processing progress: {progress:.1f}% "
                                f"({completed_count}/{len(strategy_configs)} strategies)"
                            )

                    except Exception as e:
                        self.logger.error(f"Error processing strategy {strategy_name}: {e}")
                        self._increment_stat('errors_count')
                        batch_results[strategy_name] = []

                    # Memory check after each completed strategy
                    if self.config.enable_memory_monitoring:
                        self._check_memory_usage()

            # Update final statistics
            processing_time = (datetime.now() - start_time).total_seconds()
            self._processing_stats['processing_time_seconds'] = processing_time

            self.logger.info(f"BacktestingBatchProcessor: Completed parallel processing in {processing_time:.2f} seconds")
            return batch_results

        except Exception as e:
            self.logger.error(f"Error in parallel batch processing: {e}")
            self._increment_stat('errors_count')
            return {}

    def process_symbols_parallel(
        self,
        strategy_config: Dict[str, Any],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        exchange: str = "okx"
    ) -> List[StrategyResult]:
        """
        Process a single strategy across multiple symbols in parallel.

        Args:
            strategy_config: Strategy configuration
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            days_back: Number of days to look back
            exchange: Exchange name

        Returns:
            List of strategy results across all symbols (empty when ``symbols`` is empty)
        """
        try:
            strategy_name = strategy_config.get('name', 'unknown')
            self.logger.info(f"BacktestingBatchProcessor: Processing {strategy_name} across {len(symbols)} symbols in parallel")

            all_results = []

            # Nothing to do; also avoids ThreadPoolExecutor(max_workers=0) raising ValueError.
            if not symbols:
                return all_results

            max_workers = max(1, min(len(symbols), self.config.max_concurrent_strategies))

            # Use ThreadPoolExecutor for parallel symbol processing
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit symbol processing tasks
                future_to_symbol = {
                    executor.submit(
                        self._process_strategy_for_symbol,
                        strategy_config, symbol, timeframe, days_back, exchange
                    ): symbol
                    for symbol in symbols
                }

                # Collect results as they complete
                for future in as_completed(future_to_symbol):
                    symbol = future_to_symbol[future]

                    try:
                        symbol_results = future.result()
                        all_results.extend(symbol_results)

                        # Update signal count
                        signal_count = sum(len(result.signals) for result in symbol_results)
                        self._increment_stat('total_signals_generated', signal_count)

                    except Exception as e:
                        self.logger.error(f"Error processing {strategy_name} for {symbol}: {e}")
                        self._increment_stat('errors_count')

            return all_results

        except Exception as e:
            self.logger.error(f"Error in parallel symbol processing: {e}")
            self._increment_stat('errors_count')
            return []

    def _process_strategy_for_symbol(
        self,
        strategy_config: Dict[str, Any],
        symbol: str,
        timeframe: str,
        days_back: int,
        exchange: str
    ) -> List[StrategyResult]:
        """
        Process a single strategy for a single symbol.

        Returns an empty list (and counts an error) if the calculation raises.
        """
        strategy_name = strategy_config.get('name', 'unknown')
        try:
            # Calculate strategy signals for this symbol
            results = self.data_integrator.calculate_strategy_signals_orchestrated(
                strategy_name=strategy_name,
                strategy_config=strategy_config,
                symbol=symbol,
                timeframe=timeframe,
                days_back=days_back,
                exchange=exchange
            )

            # Validate results if enabled
            if self.signal_validator and results:
                return self._validate_strategy_results(results)
            return results

        except Exception as e:
            self.logger.error(f"Error processing {strategy_name} for {symbol}: {e}")
            # Count the failure, consistent with _process_single_strategy_batch.
            self._increment_stat('errors_count')
            return []

    def process_large_dataset_streaming(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        total_days_back: int,
        exchange: str = "okx"
    ) -> Iterator[Dict[str, List[StrategyResult]]]:
        """
        Process large datasets using streaming approach with memory-efficient chunking.

        This method processes data in chunks to avoid memory overflow when dealing
        with very large historical datasets.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            total_days_back: Total number of days to process
            exchange: Exchange name

        Yields:
            Dictionary chunks mapping strategy names to their results
        """
        try:
            # Guard against a non-positive chunk size (would divide by zero below).
            chunk_size = max(1, self.config.chunk_size_days)
            total_chunks = (total_days_back + chunk_size - 1) // chunk_size  # Ceiling division

            self.logger.info(f"BacktestingBatchProcessor: Starting streaming processing of {total_days_back} days in {total_chunks} chunks")

            for chunk_index in range(total_chunks):
                # Calculate date range for this chunk
                chunk_start_days = chunk_index * chunk_size
                chunk_end_days = min((chunk_index + 1) * chunk_size, total_days_back)
                chunk_days = chunk_end_days - chunk_start_days

                self.logger.info(f"Processing chunk {chunk_index + 1}/{total_chunks}: {chunk_days} days")

                # Memory check before processing chunk
                if self.config.enable_memory_monitoring:
                    self._check_memory_usage()

                # Process chunk using parallel processing.
                # NOTE(review): only the chunk *length* is passed as days_back; the chunk
                # offset is not. Presumably every chunk therefore requests the same
                # most-recent window from the integrator - confirm against
                # calculate_strategy_signals_orchestrated.
                chunk_results = self.process_strategies_parallel(
                    strategy_configs=strategy_configs,
                    symbols=symbols,
                    timeframe=timeframe,
                    days_back=chunk_days,
                    exchange=exchange
                )

                # Yield chunk results
                yield chunk_results

                # Force cleanup after each chunk to manage memory
                self._cleanup_memory()

                # Report progress
                progress = ((chunk_index + 1) / total_chunks) * 100
                self.logger.info(f"Streaming progress: {progress:.1f}% ({chunk_index + 1}/{total_chunks} chunks)")

            self.logger.info("BacktestingBatchProcessor: Completed streaming processing")

        except Exception as e:
            self.logger.error(f"Error in streaming processing: {e}")
            self._increment_stat('errors_count')

    def aggregate_streaming_results(
        self,
        result_stream: Iterator[Dict[str, List[StrategyResult]]]
    ) -> Dict[str, List[StrategyResult]]:
        """
        Aggregate results from streaming processing.

        Args:
            result_stream: Iterator of result chunks

        Returns:
            Aggregated results across all chunks ({} if aggregation raises)
        """
        try:
            aggregated_results = {}
            chunk_count = 0

            for chunk_results in result_stream:
                chunk_count += 1

                for strategy_name, strategy_results in chunk_results.items():
                    aggregated_results.setdefault(strategy_name, []).extend(strategy_results)

                # Periodic memory cleanup during aggregation
                if chunk_count % 5 == 0:  # Every 5 chunks
                    self._cleanup_memory()

            # Final statistics
            total_strategies = len(aggregated_results)
            total_results = sum(len(results) for results in aggregated_results.values())

            self.logger.info(f"Aggregated {total_results} results across {total_strategies} strategies from {chunk_count} chunks")
            return aggregated_results

        except Exception as e:
            self.logger.error(f"Error aggregating streaming results: {e}")
            return {}

    def process_with_memory_constraints(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        max_memory_mb: float,
        exchange: str = "okx"
    ) -> Dict[str, List[StrategyResult]]:
        """
        Process strategies with strict memory constraints.

        Automatically adjusts processing approach based on available memory:
        parallel when the rough estimate fits, sequential when it fits within twice
        the available memory, and streaming with warm-up otherwise.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            days_back: Number of days to look back
            max_memory_mb: Maximum memory usage in MB
            exchange: Exchange name

        Returns:
            Dictionary mapping strategy names to their results
        """
        try:
            # Check current memory usage
            current_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
            available_memory = max_memory_mb - current_memory

            self.logger.info(f"Memory-constrained processing: {available_memory:.1f} MB available of {max_memory_mb:.1f} MB limit")

            # Estimate memory requirements
            estimated_memory_per_strategy = 50  # MB - rough estimate
            estimated_total_memory = len(strategy_configs) * len(symbols) * estimated_memory_per_strategy

            if estimated_total_memory <= available_memory:
                # Sufficient memory for parallel processing
                self.logger.info("Sufficient memory available - using parallel processing")
                return self.process_strategies_parallel(
                    strategy_configs, symbols, timeframe, days_back, exchange
                )

            if estimated_total_memory <= available_memory * 2:
                # Moderate memory constraint - use sequential processing
                self.logger.info("Moderate memory constraint - using sequential processing")
                return self.process_strategies_batch(
                    strategy_configs, symbols, timeframe, days_back, exchange
                )

            # Severe memory constraint - use streaming with warm-up
            self.logger.info("Severe memory constraint - using streaming processing with warm-up")
            stream = self.process_large_dataset_streaming_with_warmup(
                strategy_configs, symbols, timeframe, days_back, exchange
            )
            return self.aggregate_streaming_results(stream)

        except Exception as e:
            self.logger.error(f"Error in memory-constrained processing: {e}")
            self._increment_stat('errors_count')
            return {}

    def get_performance_metrics(self) -> Dict[str, Any]:
        """
        Get comprehensive performance metrics for batch processing.

        Returns:
            Dictionary containing detailed performance metrics
        """
        stats = self.get_processing_statistics()

        # Enhanced metrics
        return {
            **stats,
            'cache_hit_rate': self._calculate_cache_hit_rate(),
            'memory_efficiency': self._calculate_memory_efficiency(),
            'throughput_signals_per_second': self._calculate_throughput(),
            'parallel_efficiency': self._calculate_parallel_efficiency(),
            'optimization_recommendations': self._generate_optimization_recommendations()
        }

    def _calculate_cache_hit_rate(self) -> float:
        """Calculate cache hit rate (percentage) from data integrator; 0.0 if unavailable."""
        try:
            cache_stats = self.data_integrator.get_cache_stats()
            hits = cache_stats.get('cache_hits', 0)
            total_requests = hits + cache_stats.get('cache_misses', 0)

            if total_requests > 0:
                return (hits / total_requests) * 100
            return 0.0
        except Exception:
            # Best effort: cache statistics are optional for the metrics report.
            return 0.0

    def _calculate_memory_efficiency(self) -> float:
        """Calculate memory efficiency score (0-100, higher is better)."""
        peak_memory = self._processing_stats.get('memory_peak_mb', 0)
        strategies_processed = self._processing_stats.get('strategies_processed', 1)

        if peak_memory > 0 and strategies_processed > 0:
            # Memory per strategy in MB (lower is better)
            memory_per_strategy = peak_memory / strategies_processed

            # Assuming 100MB per strategy is baseline (50% efficiency)
            baseline_memory = 100  # MB
            return max(0, min(100, (baseline_memory / memory_per_strategy) * 50))

        return 100.0  # Perfect efficiency if no memory tracked

    def _calculate_throughput(self) -> float:
        """Calculate signals processed per second."""
        total_signals = self._processing_stats.get('total_signals_generated', 0)
        processing_time = self._processing_stats.get('processing_time_seconds', 1)

        if processing_time > 0:
            return total_signals / processing_time

        return 0.0

    def _calculate_parallel_efficiency(self) -> float:
        """
        Calculate parallel processing efficiency.

        NOTE(review): as written the formula simplifies to ``100 / max_workers``
        (the wall-clock time cancels out), so it does not reflect measured
        parallelism. A real metric needs per-strategy busy time - confirm intent.
        """
        strategies_processed = self._processing_stats.get('strategies_processed', 0)
        processing_time = self._processing_stats.get('processing_time_seconds', 1)
        # Clamp so a misconfigured worker count cannot cause a ZeroDivisionError.
        max_workers = max(1, self.config.max_concurrent_strategies)

        if strategies_processed > 0 and processing_time > 0:
            # Theoretical minimum time if perfectly parallel
            avg_time_per_strategy = processing_time / strategies_processed
            theoretical_min_time = avg_time_per_strategy * (strategies_processed / max_workers)

            # Efficiency as percentage of theoretical optimum
            return min(100, (theoretical_min_time / processing_time) * 100)

        return 100.0

    def _generate_optimization_recommendations(self) -> List[str]:
        """Generate optimization recommendations based on performance metrics."""
        recommendations = []

        # Memory recommendations
        if self._calculate_memory_efficiency() < 50:
            recommendations.append("Consider reducing chunk_size_days or max_concurrent_strategies to improve memory efficiency")

        # Cache recommendations
        if self._calculate_cache_hit_rate() < 30:
            recommendations.append("Enable indicator caching or increase cache timeout to improve performance")

        # Parallel efficiency recommendations
        if self._calculate_parallel_efficiency() < 70:
            recommendations.append("Consider adjusting max_concurrent_strategies based on system capabilities")

        # Error rate recommendations. 'error_rate' is a derived metric that is never
        # stored in _processing_stats, so it must come from get_processing_statistics().
        error_rate = self.get_processing_statistics()['error_rate']
        if error_rate > 10:
            recommendations.append("High error rate detected - check data availability and strategy configurations")

        # Throughput recommendations
        if self._calculate_throughput() < 1.0:  # Less than 1 signal per second
            recommendations.append("Low throughput detected - consider optimizing strategy calculations or using simpler indicators")

        if not recommendations:
            recommendations.append("Performance metrics are within acceptable ranges")

        return recommendations

    def optimize_configuration(self) -> BatchProcessingConfig:
        """
        Automatically optimize configuration based on current performance metrics.

        Returns:
            A new configuration derived from the current one (the current
            configuration itself if optimization raises)
        """
        try:
            current_config = self.config

            # Copy *all* current values as baseline so unrelated flags
            # (monitoring, validation, reporting interval) are preserved.
            optimized_config = replace(current_config)

            # Get current metrics
            memory_efficiency = self._calculate_memory_efficiency()
            parallel_efficiency = self._calculate_parallel_efficiency()
            # Derived metric; it is not stored in _processing_stats.
            error_rate = self.get_processing_statistics()['error_rate']

            # Optimize based on metrics
            if memory_efficiency < 50:
                # Reduce memory pressure
                optimized_config.max_concurrent_strategies = max(1, current_config.max_concurrent_strategies - 1)
                optimized_config.chunk_size_days = max(7, current_config.chunk_size_days - 7)
            elif memory_efficiency > 80 and parallel_efficiency < 70:
                # Increase parallelism if memory allows
                optimized_config.max_concurrent_strategies = min(8, current_config.max_concurrent_strategies + 1)

            if error_rate > 10:
                # Reduce load to minimize errors
                optimized_config.max_concurrent_strategies = max(1, current_config.max_concurrent_strategies - 1)

            # Cache optimization
            if self._calculate_cache_hit_rate() < 30:
                optimized_config.result_cache_size = min(2000, current_config.result_cache_size * 2)

            self.logger.info(f"Configuration optimized: workers {current_config.max_concurrent_strategies} -> {optimized_config.max_concurrent_strategies}, "
                             f"chunk_size {current_config.chunk_size_days} -> {optimized_config.chunk_size_days}")

            return optimized_config

        except Exception as e:
            self.logger.error(f"Error optimizing configuration: {e}")
            return self.config

    def benchmark_processing_methods(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        exchange: str = "okx"
    ) -> Dict[str, Dict[str, Any]]:
        """
        Benchmark different processing methods to determine optimal approach.

        Note that the benchmark resets the processing statistics and clears the
        result cache before each run.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            days_back: Number of days to look back (capped at 7 for the benchmark)
            exchange: Exchange name

        Returns:
            Dictionary containing benchmark results for each method
        """
        try:
            self.logger.info("Starting processing method benchmark")

            benchmark_results = {}
            benchmark_days = min(days_back, 7)  # Limit to 7 days for benchmark

            # Test sequential processing
            start_time = datetime.now()
            self._reset_stats()

            sequential_results = self.process_strategies_batch(
                strategy_configs, symbols, timeframe, benchmark_days, exchange
            )

            sequential_time = (datetime.now() - start_time).total_seconds()
            benchmark_results['sequential'] = {
                'processing_time': sequential_time,
                'results_count': sum(len(results) for results in sequential_results.values()),
                'memory_peak_mb': self._processing_stats.get('memory_peak_mb', 0),
                'throughput': self._calculate_throughput()
            }

            # Test parallel processing
            start_time = datetime.now()
            self._reset_stats()

            parallel_results = self.process_strategies_parallel(
                strategy_configs, symbols, timeframe, benchmark_days, exchange
            )

            parallel_time = (datetime.now() - start_time).total_seconds()
            benchmark_results['parallel'] = {
                'processing_time': parallel_time,
                'results_count': sum(len(results) for results in parallel_results.values()),
                'memory_peak_mb': self._processing_stats.get('memory_peak_mb', 0),
                'throughput': self._calculate_throughput()
            }

            # Calculate speedup; parallel_time is the divisor, so it must be non-zero.
            if sequential_time > 0 and parallel_time > 0:
                benchmark_results['parallel']['speedup'] = sequential_time / parallel_time

            # Recommend best method
            if parallel_time < sequential_time * 0.8:  # 20% improvement threshold
                benchmark_results['recommendation'] = 'parallel'
            else:
                benchmark_results['recommendation'] = 'sequential'

            self.logger.info(f"Benchmark completed. Recommended method: {benchmark_results['recommendation']}")
            return benchmark_results

        except Exception as e:
            self.logger.error(f"Error in benchmark: {e}")
            return {}

    def _reset_stats(self) -> None:
        """Reset processing statistics for benchmarking."""
        with self._stats_lock:
            self._processing_stats = self._new_stats()

        # Clear result cache
        self._result_cache.clear()

        self.logger.debug("BacktestingBatchProcessor: Reset processing statistics and cleared result cache")

    def _calculate_warmup_period(self, strategy_configs: List[Dict[str, Any]]) -> int:
        """
        Calculate the maximum warm-up period needed across all strategies.

        The strategy type is inferred from substrings of its name ('ema', 'macd',
        'rsi'); anything else falls back to a conservative default of 30.

        Args:
            strategy_configs: List of strategy configurations

        Returns:
            Maximum warm-up period in number of periods (including a 10-period buffer)
        """
        max_warmup = 0

        for strategy_config in strategy_configs:
            lowered_name = strategy_config.get('name', 'unknown').lower()

            # Determine strategy type and required warm-up
            if 'ema' in lowered_name:
                warmup = max(
                    strategy_config.get('fast_period', 12),
                    strategy_config.get('slow_period', 26)
                )
            elif 'macd' in lowered_name:
                warmup = max(
                    strategy_config.get('slow_period', 26),
                    strategy_config.get('signal_period', 9)
                ) + 10  # Additional buffer for MACD convergence
            elif 'rsi' in lowered_name:
                warmup = strategy_config.get('period', 14) + 5  # Additional buffer for RSI stabilization
            else:
                # Generic estimation based on common indicators
                warmup = 30  # Conservative default

            max_warmup = max(max_warmup, warmup)

        # Add safety buffer
        max_warmup += 10

        self.logger.debug(f"Calculated warm-up period: {max_warmup} periods")
        return max_warmup

    def process_large_dataset_streaming_with_warmup(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        total_days_back: int,
        exchange: str = "okx"
    ) -> Iterator[Dict[str, List[StrategyResult]]]:
        """
        Process large datasets using streaming approach with proper warm-up period handling.

        This method ensures indicator continuity across chunks by including warm-up data
        from previous chunks and trimming overlapping results.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            total_days_back: Total number of days to process
            exchange: Exchange name

        Yields:
            Dictionary chunks mapping strategy names to their results (with overlaps removed)
        """
        try:
            chunk_size = self.config.chunk_size_days
            # NOTE(review): _calculate_warmup_period returns a count of *periods*, which
            # is used here as a number of days - confirm this is intended for
            # non-daily timeframes.
            warmup_days = self._calculate_warmup_period(strategy_configs)

            # Adjust chunk size if it's too small relative to warm-up
            if chunk_size <= warmup_days:
                adjusted_chunk_size = warmup_days * 2
                self.logger.warning(f"Chunk size ({chunk_size}) too small for warm-up ({warmup_days}). "
                                    f"Adjusting to {adjusted_chunk_size} days")
                chunk_size = adjusted_chunk_size

            total_chunks = (total_days_back + chunk_size - 1) // chunk_size

            self.logger.info(f"BacktestingBatchProcessor: Starting streaming with warm-up processing of "
                             f"{total_days_back} days in {total_chunks} chunks (warm-up: {warmup_days} days)")

            for chunk_index in range(total_chunks):
                # Calculate date range for this chunk
                chunk_start_days = chunk_index * chunk_size
                chunk_end_days = min((chunk_index + 1) * chunk_size, total_days_back)

                # Include warm-up data for chunks after the first
                if chunk_index == 0:
                    # First chunk: no warm-up available, process as-is
                    processing_start_days = chunk_start_days
                    trim_warmup = False
                else:
                    # Subsequent chunks: include warm-up from previous data
                    processing_start_days = max(0, chunk_start_days - warmup_days)
                    trim_warmup = True
                processing_days = chunk_end_days - processing_start_days

                self.logger.info(f"Processing chunk {chunk_index + 1}/{total_chunks}: "
                                 f"target days {chunk_start_days}-{chunk_end_days}, "
                                 f"processing days {processing_start_days}-{chunk_end_days} "
                                 f"(warm-up: {warmup_days if trim_warmup else 0})")

                # Memory check before processing chunk
                if self.config.enable_memory_monitoring:
                    self._check_memory_usage()

                # Process chunk with warm-up data.
                # NOTE(review): only the window length is forwarded as days_back; the
                # chunk offset is not - confirm the integrator can address older windows.
                chunk_results = self.process_strategies_parallel(
                    strategy_configs=strategy_configs,
                    symbols=symbols,
                    timeframe=timeframe,
                    days_back=processing_days,
                    exchange=exchange
                )

                # Trim warm-up period from results to avoid overlaps
                if trim_warmup:
                    chunk_results = self._trim_warmup_from_results(
                        chunk_results, warmup_days, chunk_start_days, chunk_end_days
                    )

                # Yield processed chunk results
                yield chunk_results

                # Force cleanup after each chunk to manage memory
                self._cleanup_memory()

                # Report progress
                progress = ((chunk_index + 1) / total_chunks) * 100
                self.logger.info(f"Streaming progress: {progress:.1f}% ({chunk_index + 1}/{total_chunks} chunks)")

            self.logger.info("BacktestingBatchProcessor: Completed streaming processing with warm-up")

        except Exception as e:
            self.logger.error(f"Error in streaming processing with warm-up: {e}")
            self._increment_stat('errors_count')

    def _trim_warmup_from_results(
        self,
        chunk_results: Dict[str, List[StrategyResult]],
        warmup_days: int,
        target_start_days: int,
        target_end_days: int
    ) -> Dict[str, List[StrategyResult]]:
        """
        Trim warm-up period from chunk results to avoid overlapping data.

        This is an approximation: results are sorted by timestamp and the leading
        fraction corresponding to the warm-up share of the processed window is
        dropped, which assumes results are spread roughly uniformly over time.

        Args:
            chunk_results: Results from chunk processing (including warm-up)
            warmup_days: Number of warm-up days to trim
            target_start_days: Target start day for this chunk
            target_end_days: Target end day for this chunk

        Returns:
            Trimmed results containing only the (estimated) target date range;
            the untrimmed input if trimming raises
        """
        try:
            # The window actually processed starts up to warmup_days before the target,
            # but never before day 0, so the prepended warm-up may be shorter than
            # warmup_days.
            processing_days = target_end_days - max(0, target_start_days - warmup_days)
            target_days = target_end_days - target_start_days
            needs_trim = processing_days > target_days

            trimmed_results = {}

            for strategy_name, strategy_results in chunk_results.items():
                if not strategy_results:
                    trimmed_results[strategy_name] = []
                    continue

                # Sort results by timestamp to identify warm-up period
                sorted_results = sorted(strategy_results, key=lambda r: r.timestamp)

                if not needs_trim:
                    # No trimming needed
                    trimmed_results[strategy_name] = sorted_results
                    continue

                # Estimate warm-up portion from the days actually prepended
                total_results = len(sorted_results)
                warmup_proportion = (processing_days - target_days) / processing_days
                warmup_count = int(total_results * warmup_proportion)

                # Keep results after warm-up period
                trimmed_results[strategy_name] = sorted_results[warmup_count:]

                self.logger.debug(f"Strategy {strategy_name}: trimmed {warmup_count}/{total_results} warm-up results")

            return trimmed_results

        except Exception as e:
            self.logger.error(f"Error trimming warm-up from results: {e}")
            return chunk_results  # Return original results if trimming fails