TCPDashboard/strategies/batch_processing.py
Commit 8c23489ff0: Implement real-time strategy execution and data integration features
- Added `realtime_execution.py` for real-time strategy execution, enabling live signal generation and integration with the dashboard's chart refresh cycle.
- Introduced `data_integration.py` to manage market data orchestration, caching, and technical indicator calculations for strategy signal generation.
- Implemented `validation.py` for comprehensive validation and quality assessment of strategy-generated signals, ensuring reliability and consistency.
- Developed `batch_processing.py` to facilitate efficient backtesting of multiple strategies across large datasets with memory management and performance optimization.
- Updated `__init__.py` files to include new modules and ensure proper exports, enhancing modularity and maintainability.
- Enhanced unit tests for the new features, ensuring robust functionality and adherence to project standards.

These changes establish a solid foundation for real-time strategy execution and data integration, aligning with project goals for modularity, performance, and maintainability.


"""
Batch Processing for Strategy Backtesting
This module provides efficient batch processing capabilities for running
multiple strategies across large datasets with memory management and
performance optimization.
"""
from typing import List, Dict, Any, Optional, Tuple, Iterator
from dataclasses import dataclass
from datetime import datetime, timezone
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import gc
import os
import psutil
from .data_integration import StrategyDataIntegrator, StrategyDataIntegrationConfig
from .data_types import StrategyResult, StrategySignal
from .validation import StrategySignalValidator, ValidationConfig
from utils.logger import get_logger


@dataclass
class BatchProcessingConfig:
    """Configuration for batch processing operations."""

    max_concurrent_strategies: int = 4      # Number of strategies to process concurrently
    max_memory_usage_percent: float = 80.0  # Maximum memory usage threshold
    chunk_size_days: int = 30               # Days to process per chunk for large datasets
    enable_memory_monitoring: bool = True
    enable_result_validation: bool = True
    result_cache_size: int = 1000           # Maximum cached results
    progress_reporting_interval: int = 10   # Report progress every N strategies
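
# Example (illustrative sketch): a config tuned for a memory-constrained host.
# The values below are assumptions, not project defaults.
#
#     low_memory_config = BatchProcessingConfig(
#         max_concurrent_strategies=2,
#         max_memory_usage_percent=60.0,
#         chunk_size_days=14,
#     )
#     processor = BacktestingBatchProcessor(low_memory_config)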


class BacktestingBatchProcessor:
    """
    Efficient batch processing for strategy backtesting.

    Provides memory-efficient processing of multiple strategies across
    large datasets with parallel execution and performance monitoring.
    """

    def __init__(self, config: Optional[BatchProcessingConfig] = None):
        """
        Initialize batch processor.

        Args:
            config: Batch processing configuration
        """
        self.config = config or BatchProcessingConfig()
        self.logger = get_logger()

        # Initialize components
        self.data_integrator = StrategyDataIntegrator()
        self.signal_validator = StrategySignalValidator() if self.config.enable_result_validation else None

        # Processing statistics
        self._processing_stats = {
            'strategies_processed': 0,
            'total_signals_generated': 0,
            'processing_time_seconds': 0.0,
            'memory_peak_mb': 0.0,
            'errors_count': 0,
            'validation_failures': 0
        }

        # Result cache for performance
        self._result_cache = {}

    def process_strategies_batch(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        exchange: str = "okx"
    ) -> Dict[str, List[StrategyResult]]:
        """
        Process multiple strategies across multiple symbols efficiently.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            days_back: Number of days to look back
            exchange: Exchange name

        Returns:
            Dictionary mapping strategy names to their results
        """
        try:
            start_time = datetime.now()
            self.logger.info(f"BacktestingBatchProcessor: Starting batch processing of {len(strategy_configs)} strategies across {len(symbols)} symbols")

            # Initialize results container
            batch_results = {}

            # Process strategies with memory monitoring
            for i, strategy_config in enumerate(strategy_configs):
                strategy_name = strategy_config.get('name', f'strategy_{i}')

                # Memory check
                if self.config.enable_memory_monitoring:
                    self._check_memory_usage()

                # Process strategy across all symbols
                strategy_results = self._process_single_strategy_batch(
                    strategy_config, symbols, timeframe, days_back, exchange
                )
                batch_results[strategy_name] = strategy_results
                self._processing_stats['strategies_processed'] += 1

                # Progress reporting
                if (i + 1) % self.config.progress_reporting_interval == 0:
                    progress = ((i + 1) / len(strategy_configs)) * 100
                    self.logger.info(f"Batch processing progress: {progress:.1f}% ({i + 1}/{len(strategy_configs)} strategies)")

            # Update final statistics
            processing_time = (datetime.now() - start_time).total_seconds()
            self._processing_stats['processing_time_seconds'] = processing_time

            self.logger.info(f"BacktestingBatchProcessor: Completed batch processing in {processing_time:.2f} seconds")
            return batch_results
        except Exception as e:
            self.logger.error(f"Error in batch processing: {e}")
            self._processing_stats['errors_count'] += 1
            return {}
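
    # Usage sketch (illustrative; the strategy config keys shown are assumptions):
    #
    #     processor = BacktestingBatchProcessor()
    #     results = processor.process_strategies_batch(
    #         strategy_configs=[{'name': 'ema_cross', 'fast_period': 12, 'slow_period': 26}],
    #         symbols=['BTC-USDT', 'ETH-USDT'],
    #         timeframe='1h',
    #         days_back=90,
    #     )
    #     for name, strategy_results in results.items():
    #         print(name, sum(len(r.signals) for r in strategy_results), 'signals')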

    def _process_single_strategy_batch(
        self,
        strategy_config: Dict[str, Any],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        exchange: str
    ) -> List[StrategyResult]:
        """Process a single strategy across multiple symbols."""
        all_results = []
        strategy_name = strategy_config.get('name', 'unknown')

        for symbol in symbols:
            try:
                # Calculate strategy signals for this symbol
                results = self.data_integrator.calculate_strategy_signals_orchestrated(
                    strategy_name=strategy_name,
                    strategy_config=strategy_config,
                    symbol=symbol,
                    timeframe=timeframe,
                    days_back=days_back,
                    exchange=exchange
                )

                # Validate results if enabled
                if self.signal_validator and results:
                    validated_results = self._validate_strategy_results(results)
                    all_results.extend(validated_results)
                else:
                    all_results.extend(results)

                # Update signal count (pre-validation count of generated signals).
                # NOTE: when called from worker threads, these counters are updated
                # without a lock, so the statistics are best-effort.
                signal_count = sum(len(result.signals) for result in results)
                self._processing_stats['total_signals_generated'] += signal_count
            except Exception as e:
                self.logger.error(f"Error processing {strategy_name} for {symbol}: {e}")
                self._processing_stats['errors_count'] += 1

        return all_results

    def _validate_strategy_results(self, results: List[StrategyResult]) -> List[StrategyResult]:
        """Validate strategy results and filter invalid signals."""
        validated_results = []

        for result in results:
            if result.signals:
                valid_signals, invalid_signals = self.signal_validator.validate_signals_batch(result.signals)

                if invalid_signals:
                    self._processing_stats['validation_failures'] += len(invalid_signals)
                    self.logger.debug(f"Filtered {len(invalid_signals)} invalid signals from {result.strategy_name}")

                # Create new result with only valid signals
                if valid_signals:
                    validated_result = StrategyResult(
                        timestamp=result.timestamp,
                        symbol=result.symbol,
                        timeframe=result.timeframe,
                        strategy_name=result.strategy_name,
                        signals=valid_signals,
                        indicators_used=result.indicators_used,
                        metadata=result.metadata
                    )
                    validated_results.append(validated_result)
            else:
                validated_results.append(result)

        return validated_results

    def _check_memory_usage(self) -> None:
        """Monitor memory usage and trigger cleanup if needed."""
        process = psutil.Process(os.getpid())
        memory_percent = process.memory_percent()
        memory_mb = process.memory_info().rss / 1024 / 1024

        # Update peak memory tracking
        self._processing_stats['memory_peak_mb'] = max(
            self._processing_stats['memory_peak_mb'],
            memory_mb
        )

        if memory_percent > self.config.max_memory_usage_percent:
            self.logger.warning(f"High memory usage detected: {memory_percent:.1f}% ({memory_mb:.1f} MB)")
            self._cleanup_memory()

    def _cleanup_memory(self) -> None:
        """Perform memory cleanup operations."""
        # Clear old cached results
        if len(self._result_cache) > self.config.result_cache_size:
            cache_items = list(self._result_cache.items())
            # Keep only the most recent half
            keep_size = self.config.result_cache_size // 2
            self._result_cache = dict(cache_items[-keep_size:])

        # Clear data integrator caches
        self.data_integrator.clear_cache()

        # Force garbage collection
        gc.collect()
        self.logger.debug("BacktestingBatchProcessor: Performed memory cleanup")

    def get_processing_statistics(self) -> Dict[str, Any]:
        """Get comprehensive processing statistics."""
        stats = self._processing_stats.copy()

        # Calculate derived metrics
        if stats['strategies_processed'] > 0:
            stats['average_signals_per_strategy'] = stats['total_signals_generated'] / stats['strategies_processed']
            stats['average_processing_time_per_strategy'] = stats['processing_time_seconds'] / stats['strategies_processed']
        else:
            stats['average_signals_per_strategy'] = 0
            stats['average_processing_time_per_strategy'] = 0

        stats['error_rate'] = (stats['errors_count'] / max(stats['strategies_processed'], 1)) * 100
        stats['validation_failure_rate'] = (stats['validation_failures'] / max(stats['total_signals_generated'], 1)) * 100

        return stats
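
    # Worked example (hypothetical numbers): after a run with 8 strategies,
    # 400 signals, 20.0 s of processing, 2 errors and 10 validation failures,
    # the derived metrics come out as:
    #
    #     average_signals_per_strategy         = 400 / 8          = 50.0
    #     average_processing_time_per_strategy = 20.0 / 8         = 2.5 s
    #     error_rate                           = (2 / 8) * 100    = 25.0 %
    #     validation_failure_rate              = (10 / 400) * 100 = 2.5 %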

    def process_strategies_parallel(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        exchange: str = "okx"
    ) -> Dict[str, List[StrategyResult]]:
        """
        Process multiple strategies in parallel for improved performance.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            days_back: Number of days to look back
            exchange: Exchange name

        Returns:
            Dictionary mapping strategy names to their results
        """
        try:
            start_time = datetime.now()
            self.logger.info(f"BacktestingBatchProcessor: Starting parallel processing of {len(strategy_configs)} strategies")

            batch_results = {}

            # Use ThreadPoolExecutor for parallel strategy processing
            with ThreadPoolExecutor(max_workers=self.config.max_concurrent_strategies) as executor:
                # Submit all strategy processing tasks
                future_to_strategy = {}
                for strategy_config in strategy_configs:
                    strategy_name = strategy_config.get('name', f'strategy_{len(future_to_strategy)}')
                    future = executor.submit(
                        self._process_single_strategy_batch,
                        strategy_config, symbols, timeframe, days_back, exchange
                    )
                    future_to_strategy[future] = strategy_name

                # Collect results as they complete
                completed_count = 0
                for future in as_completed(future_to_strategy):
                    strategy_name = future_to_strategy[future]
                    try:
                        strategy_results = future.result()
                        batch_results[strategy_name] = strategy_results
                        self._processing_stats['strategies_processed'] += 1
                        completed_count += 1

                        # Progress reporting
                        if completed_count % self.config.progress_reporting_interval == 0:
                            progress = (completed_count / len(strategy_configs)) * 100
                            self.logger.info(f"Parallel processing progress: {progress:.1f}% ({completed_count}/{len(strategy_configs)} strategies)")
                    except Exception as e:
                        self.logger.error(f"Error processing strategy {strategy_name}: {e}")
                        self._processing_stats['errors_count'] += 1
                        batch_results[strategy_name] = []

                    # Memory check after each completed strategy
                    if self.config.enable_memory_monitoring:
                        self._check_memory_usage()

            # Update final statistics
            processing_time = (datetime.now() - start_time).total_seconds()
            self._processing_stats['processing_time_seconds'] = processing_time

            self.logger.info(f"BacktestingBatchProcessor: Completed parallel processing in {processing_time:.2f} seconds")
            return batch_results
        except Exception as e:
            self.logger.error(f"Error in parallel batch processing: {e}")
            self._processing_stats['errors_count'] += 1
            return {}
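
    # Design note: ThreadPoolExecutor is appropriate here as long as the work is
    # dominated by I/O (market-data fetches); CPU-bound indicator math would be
    # serialized by the GIL. A minimal call sketch, assuming hypothetical configs:
    #
    #     results = processor.process_strategies_parallel(
    #         strategy_configs=[{'name': 'rsi_reversal', 'period': 14},
    #                           {'name': 'macd_trend', 'slow_period': 26}],
    #         symbols=['BTC-USDT'], timeframe='4h', days_back=60,
    #     )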

    def process_symbols_parallel(
        self,
        strategy_config: Dict[str, Any],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        exchange: str = "okx"
    ) -> List[StrategyResult]:
        """
        Process a single strategy across multiple symbols in parallel.

        Args:
            strategy_config: Strategy configuration
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            days_back: Number of days to look back
            exchange: Exchange name

        Returns:
            List of strategy results across all symbols
        """
        try:
            strategy_name = strategy_config.get('name', 'unknown')
            self.logger.info(f"BacktestingBatchProcessor: Processing {strategy_name} across {len(symbols)} symbols in parallel")

            all_results = []

            # Use ThreadPoolExecutor for parallel symbol processing
            with ThreadPoolExecutor(max_workers=min(len(symbols), self.config.max_concurrent_strategies)) as executor:
                # Submit symbol processing tasks
                future_to_symbol = {}
                for symbol in symbols:
                    future = executor.submit(
                        self._process_strategy_for_symbol,
                        strategy_config, symbol, timeframe, days_back, exchange
                    )
                    future_to_symbol[future] = symbol

                # Collect results as they complete
                for future in as_completed(future_to_symbol):
                    symbol = future_to_symbol[future]
                    try:
                        symbol_results = future.result()
                        all_results.extend(symbol_results)

                        # Update signal count
                        signal_count = sum(len(result.signals) for result in symbol_results)
                        self._processing_stats['total_signals_generated'] += signal_count
                    except Exception as e:
                        self.logger.error(f"Error processing {strategy_name} for {symbol}: {e}")
                        self._processing_stats['errors_count'] += 1

            return all_results
        except Exception as e:
            self.logger.error(f"Error in parallel symbol processing: {e}")
            self._processing_stats['errors_count'] += 1
            return []

    def _process_strategy_for_symbol(
        self,
        strategy_config: Dict[str, Any],
        symbol: str,
        timeframe: str,
        days_back: int,
        exchange: str
    ) -> List[StrategyResult]:
        """Process a single strategy for a single symbol."""
        try:
            strategy_name = strategy_config.get('name', 'unknown')

            # Calculate strategy signals for this symbol
            results = self.data_integrator.calculate_strategy_signals_orchestrated(
                strategy_name=strategy_name,
                strategy_config=strategy_config,
                symbol=symbol,
                timeframe=timeframe,
                days_back=days_back,
                exchange=exchange
            )

            # Validate results if enabled
            if self.signal_validator and results:
                return self._validate_strategy_results(results)
            return results
        except Exception as e:
            self.logger.error(f"Error processing {strategy_config.get('name', 'unknown')} for {symbol}: {e}")
            # Count the failure here: exceptions are swallowed in this worker, so
            # the caller's error handling never sees them.
            self._processing_stats['errors_count'] += 1
            return []

    def process_large_dataset_streaming(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        total_days_back: int,
        exchange: str = "okx"
    ) -> Iterator[Dict[str, List[StrategyResult]]]:
        """
        Process large datasets using a streaming approach with memory-efficient chunking.

        This method processes data in chunks to avoid memory overflow when dealing
        with very large historical datasets.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            total_days_back: Total number of days to process
            exchange: Exchange name

        Yields:
            Dictionary chunks mapping strategy names to their results
        """
        try:
            chunk_size = self.config.chunk_size_days
            total_chunks = (total_days_back + chunk_size - 1) // chunk_size  # Ceiling division

            self.logger.info(f"BacktestingBatchProcessor: Starting streaming processing of {total_days_back} days in {total_chunks} chunks")

            for chunk_index in range(total_chunks):
                # Calculate date range for this chunk
                chunk_start_days = chunk_index * chunk_size
                chunk_end_days = min((chunk_index + 1) * chunk_size, total_days_back)
                chunk_days = chunk_end_days - chunk_start_days

                self.logger.info(f"Processing chunk {chunk_index + 1}/{total_chunks}: {chunk_days} days")

                # Memory check before processing chunk
                if self.config.enable_memory_monitoring:
                    self._check_memory_usage()

                # Process chunk using parallel processing.
                # NOTE: days_back only controls the window length; unless the data
                # integrator supports offset windows, each chunk is anchored to the
                # most recent data rather than to this chunk's date range.
                chunk_results = self.process_strategies_parallel(
                    strategy_configs=strategy_configs,
                    symbols=symbols,
                    timeframe=timeframe,
                    days_back=chunk_days,
                    exchange=exchange
                )

                # Yield chunk results
                yield chunk_results

                # Force cleanup after each chunk to manage memory
                self._cleanup_memory()

                # Report progress
                progress = ((chunk_index + 1) / total_chunks) * 100
                self.logger.info(f"Streaming progress: {progress:.1f}% ({chunk_index + 1}/{total_chunks} chunks)")

            self.logger.info("BacktestingBatchProcessor: Completed streaming processing")
        except Exception as e:
            self.logger.error(f"Error in streaming processing: {e}")
            self._processing_stats['errors_count'] += 1

    def aggregate_streaming_results(
        self,
        result_stream: Iterator[Dict[str, List[StrategyResult]]]
    ) -> Dict[str, List[StrategyResult]]:
        """
        Aggregate results from streaming processing.

        Args:
            result_stream: Iterator of result chunks

        Returns:
            Aggregated results across all chunks
        """
        try:
            aggregated_results = {}
            chunk_count = 0

            for chunk_results in result_stream:
                chunk_count += 1
                for strategy_name, strategy_results in chunk_results.items():
                    if strategy_name not in aggregated_results:
                        aggregated_results[strategy_name] = []
                    aggregated_results[strategy_name].extend(strategy_results)

                # Periodic memory cleanup during aggregation
                if chunk_count % 5 == 0:  # Every 5 chunks
                    self._cleanup_memory()

            # Final statistics
            total_strategies = len(aggregated_results)
            total_results = sum(len(results) for results in aggregated_results.values())
            self.logger.info(f"Aggregated {total_results} results across {total_strategies} strategies from {chunk_count} chunks")

            return aggregated_results
        except Exception as e:
            self.logger.error(f"Error aggregating streaming results: {e}")
            return {}
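
    # Usage sketch: consume the stream chunk-by-chunk when downstream work is
    # incremental, or fold it back into one dictionary (illustrative only):
    #
    #     stream = processor.process_large_dataset_streaming(
    #         strategy_configs, symbols, timeframe='1h', total_days_back=365,
    #     )
    #     all_results = processor.aggregate_streaming_results(stream)
    #
    # Note that aggregation buffers every chunk in memory again, so it partly
    # gives back the memory savings that streaming buys.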

    def process_with_memory_constraints(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        max_memory_mb: float,
        exchange: str = "okx"
    ) -> Dict[str, List[StrategyResult]]:
        """
        Process strategies with strict memory constraints.

        Automatically adjusts the processing approach based on available memory.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            days_back: Number of days to look back
            max_memory_mb: Maximum memory usage in MB
            exchange: Exchange name

        Returns:
            Dictionary mapping strategy names to their results
        """
        try:
            # Check current memory usage
            current_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
            available_memory = max_memory_mb - current_memory

            self.logger.info(f"Memory-constrained processing: {available_memory:.1f} MB available of {max_memory_mb:.1f} MB limit")

            # Estimate memory requirements
            estimated_memory_per_strategy = 50  # MB - rough estimate
            estimated_total_memory = len(strategy_configs) * len(symbols) * estimated_memory_per_strategy

            if estimated_total_memory <= available_memory:
                # Sufficient memory for parallel processing
                self.logger.info("Sufficient memory available - using parallel processing")
                return self.process_strategies_parallel(
                    strategy_configs, symbols, timeframe, days_back, exchange
                )
            elif estimated_total_memory <= available_memory * 2:
                # Moderate memory constraint - use sequential processing
                self.logger.info("Moderate memory constraint - using sequential processing")
                return self.process_strategies_batch(
                    strategy_configs, symbols, timeframe, days_back, exchange
                )
            else:
                # Severe memory constraint - use streaming with warm-up
                self.logger.info("Severe memory constraint - using streaming processing with warm-up")
                stream = self.process_large_dataset_streaming_with_warmup(
                    strategy_configs, symbols, timeframe, days_back, exchange
                )
                return self.aggregate_streaming_results(stream)
        except Exception as e:
            self.logger.error(f"Error in memory-constrained processing: {e}")
            self._processing_stats['errors_count'] += 1
            return {}
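
    # Sketch for deriving a sensible max_memory_mb from the host (psutil's
    # virtual_memory() is a real API; the 75% budget is an assumption):
    #
    #     import psutil
    #     budget_mb = psutil.virtual_memory().total / 1024 / 1024 * 0.75
    #     results = processor.process_with_memory_constraints(
    #         strategy_configs, symbols, '1h', days_back=180, max_memory_mb=budget_mb,
    #     )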

    def get_performance_metrics(self) -> Dict[str, Any]:
        """
        Get comprehensive performance metrics for batch processing.

        Returns:
            Dictionary containing detailed performance metrics
        """
        stats = self.get_processing_statistics()

        # Enhanced metrics
        enhanced_metrics = {
            **stats,
            'cache_hit_rate': self._calculate_cache_hit_rate(),
            'memory_efficiency': self._calculate_memory_efficiency(),
            'throughput_signals_per_second': self._calculate_throughput(),
            'parallel_efficiency': self._calculate_parallel_efficiency(),
            'optimization_recommendations': self._generate_optimization_recommendations()
        }

        return enhanced_metrics

    def _calculate_cache_hit_rate(self) -> float:
        """Calculate cache hit rate from data integrator."""
        try:
            cache_stats = self.data_integrator.get_cache_stats()
            total_requests = cache_stats.get('cache_hits', 0) + cache_stats.get('cache_misses', 0)
            if total_requests > 0:
                return (cache_stats.get('cache_hits', 0) / total_requests) * 100
            return 0.0
        except Exception:
            return 0.0

    def _calculate_memory_efficiency(self) -> float:
        """Calculate memory efficiency score."""
        peak_memory = self._processing_stats.get('memory_peak_mb', 0)
        strategies_processed = self._processing_stats.get('strategies_processed', 1)

        if peak_memory > 0 and strategies_processed > 0:
            # Memory per strategy in MB (lower is better)
            memory_per_strategy = peak_memory / strategies_processed
            # Efficiency score (0-100, higher is better).
            # A baseline of 100 MB per strategy maps to a 50% score.
            baseline_memory = 100  # MB
            efficiency = max(0, min(100, (baseline_memory / memory_per_strategy) * 50))
            return efficiency
        return 100.0  # Perfect efficiency if no memory tracked

    def _calculate_throughput(self) -> float:
        """Calculate signals processed per second."""
        total_signals = self._processing_stats.get('total_signals_generated', 0)
        processing_time = self._processing_stats.get('processing_time_seconds', 1)

        if processing_time > 0:
            return total_signals / processing_time
        return 0.0

    def _calculate_parallel_efficiency(self) -> float:
        """Calculate parallel processing efficiency."""
        strategies_processed = self._processing_stats.get('strategies_processed', 0)
        processing_time = self._processing_stats.get('processing_time_seconds', 1)
        max_workers = self.config.max_concurrent_strategies

        if strategies_processed > 0 and processing_time > 0:
            # Theoretical minimum time if perfectly parallel.
            # NOTE: processing_time is wall-clock, so avg_time_per_strategy already
            # reflects any parallelism; without per-strategy serial timings this
            # expression reduces to 100 / max_workers and is only a rough proxy.
            avg_time_per_strategy = processing_time / strategies_processed
            theoretical_min_time = avg_time_per_strategy * (strategies_processed / max_workers)

            # Efficiency as percentage of theoretical optimum
            efficiency = min(100, (theoretical_min_time / processing_time) * 100)
            return efficiency
        return 100.0

    def _generate_optimization_recommendations(self) -> List[str]:
        """Generate optimization recommendations based on performance metrics."""
        recommendations = []

        # Memory recommendations
        memory_efficiency = self._calculate_memory_efficiency()
        if memory_efficiency < 50:
            recommendations.append("Consider reducing chunk_size_days or max_concurrent_strategies to improve memory efficiency")

        # Cache recommendations
        cache_hit_rate = self._calculate_cache_hit_rate()
        if cache_hit_rate < 30:
            recommendations.append("Enable indicator caching or increase cache timeout to improve performance")

        # Parallel efficiency recommendations
        parallel_efficiency = self._calculate_parallel_efficiency()
        if parallel_efficiency < 70:
            recommendations.append("Consider adjusting max_concurrent_strategies based on system capabilities")

        # Error rate recommendations. error_rate is a derived stat, so read it
        # from get_processing_statistics() rather than the raw counters, which
        # never contain it.
        error_rate = self.get_processing_statistics().get('error_rate', 0)
        if error_rate > 10:
            recommendations.append("High error rate detected - check data availability and strategy configurations")

        # Throughput recommendations
        throughput = self._calculate_throughput()
        if throughput < 1.0:  # Less than 1 signal per second
            recommendations.append("Low throughput detected - consider optimizing strategy calculations or using simpler indicators")

        if not recommendations:
            recommendations.append("Performance metrics are within acceptable ranges")

        return recommendations

    def optimize_configuration(self) -> BatchProcessingConfig:
        """
        Automatically optimize configuration based on current performance metrics.

        Returns:
            Optimized configuration
        """
        try:
            current_config = self.config
            optimized_config = BatchProcessingConfig()

            # Copy current values as baseline
            optimized_config.max_concurrent_strategies = current_config.max_concurrent_strategies
            optimized_config.chunk_size_days = current_config.chunk_size_days
            optimized_config.max_memory_usage_percent = current_config.max_memory_usage_percent
            optimized_config.result_cache_size = current_config.result_cache_size

            # Get current metrics (error_rate is derived, so read it from
            # get_processing_statistics() rather than the raw counters)
            memory_efficiency = self._calculate_memory_efficiency()
            parallel_efficiency = self._calculate_parallel_efficiency()
            error_rate = self.get_processing_statistics().get('error_rate', 0)

            # Optimize based on metrics
            if memory_efficiency < 50:
                # Reduce memory pressure
                optimized_config.max_concurrent_strategies = max(1, current_config.max_concurrent_strategies - 1)
                optimized_config.chunk_size_days = max(7, current_config.chunk_size_days - 7)
            elif memory_efficiency > 80 and parallel_efficiency < 70:
                # Increase parallelism if memory allows
                optimized_config.max_concurrent_strategies = min(8, current_config.max_concurrent_strategies + 1)

            if error_rate > 10:
                # Reduce load to minimize errors
                optimized_config.max_concurrent_strategies = max(1, current_config.max_concurrent_strategies - 1)

            # Cache optimization
            cache_hit_rate = self._calculate_cache_hit_rate()
            if cache_hit_rate < 30:
                optimized_config.result_cache_size = min(2000, current_config.result_cache_size * 2)

            self.logger.info(f"Configuration optimized: workers {current_config.max_concurrent_strategies} -> {optimized_config.max_concurrent_strategies}, "
                             f"chunk_size {current_config.chunk_size_days} -> {optimized_config.chunk_size_days}")
            return optimized_config
        except Exception as e:
            self.logger.error(f"Error optimizing configuration: {e}")
            return self.config
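
    # Sketch of a tuning loop (illustrative): run a batch, let the processor
    # propose a new config, and rebuild the processor with it.
    #
    #     processor.process_strategies_batch(strategy_configs, symbols, '1h', 30)
    #     tuned = processor.optimize_configuration()
    #     processor = BacktestingBatchProcessor(tuned)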

    def benchmark_processing_methods(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        days_back: int,
        exchange: str = "okx"
    ) -> Dict[str, Dict[str, Any]]:
        """
        Benchmark different processing methods to determine the optimal approach.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            days_back: Number of days to look back
            exchange: Exchange name

        Returns:
            Dictionary containing benchmark results for each method
        """
        try:
            self.logger.info("Starting processing method benchmark")
            benchmark_results = {}

            # Test sequential processing (limit to 7 days for the benchmark)
            start_time = datetime.now()
            self._reset_stats()
            sequential_results = self.process_strategies_batch(
                strategy_configs, symbols, timeframe, min(days_back, 7), exchange
            )
            sequential_time = (datetime.now() - start_time).total_seconds()

            benchmark_results['sequential'] = {
                'processing_time': sequential_time,
                'results_count': sum(len(results) for results in sequential_results.values()),
                'memory_peak_mb': self._processing_stats.get('memory_peak_mb', 0),
                'throughput': self._calculate_throughput()
            }

            # Test parallel processing
            start_time = datetime.now()
            self._reset_stats()
            parallel_results = self.process_strategies_parallel(
                strategy_configs, symbols, timeframe, min(days_back, 7), exchange
            )
            parallel_time = (datetime.now() - start_time).total_seconds()

            benchmark_results['parallel'] = {
                'processing_time': parallel_time,
                'results_count': sum(len(results) for results in parallel_results.values()),
                'memory_peak_mb': self._processing_stats.get('memory_peak_mb', 0),
                'throughput': self._calculate_throughput()
            }

            # Calculate speedup (guard both times to avoid division by zero)
            if sequential_time > 0 and parallel_time > 0:
                speedup = sequential_time / parallel_time
                benchmark_results['parallel']['speedup'] = speedup

            # Recommend best method
            if parallel_time < sequential_time * 0.8:  # 20% improvement threshold
                benchmark_results['recommendation'] = 'parallel'
            else:
                benchmark_results['recommendation'] = 'sequential'

            self.logger.info(f"Benchmark completed. Recommended method: {benchmark_results['recommendation']}")
            return benchmark_results
        except Exception as e:
            self.logger.error(f"Error in benchmark: {e}")
            return {}

    def _reset_stats(self) -> None:
        """Reset processing statistics for benchmarking."""
        self._processing_stats = {
            'strategies_processed': 0,
            'total_signals_generated': 0,
            'processing_time_seconds': 0.0,
            'memory_peak_mb': 0.0,
            'errors_count': 0,
            'validation_failures': 0
        }
        # Clear result cache
        self._result_cache.clear()
        self.logger.debug("BacktestingBatchProcessor: Reset processing statistics and cleared result cache")

    def _calculate_warmup_period(self, strategy_configs: List[Dict[str, Any]]) -> int:
        """
        Calculate the maximum warm-up period needed across all strategies.

        Args:
            strategy_configs: List of strategy configurations

        Returns:
            Maximum warm-up period in number of periods
        """
        max_warmup = 0

        for strategy_config in strategy_configs:
            strategy_name = strategy_config.get('name', 'unknown')

            # Reference table of common indicator warm-up requirements.
            # (Currently informational; the explicit checks below inline these values.)
            indicator_warmups = {
                'ema': strategy_config.get('slow_period', strategy_config.get('period', 26)),
                'sma': strategy_config.get('period', 20),
                'rsi': strategy_config.get('period', 14),
                'macd': max(
                    strategy_config.get('slow_period', 26),
                    strategy_config.get('signal_period', 9)
                ),
                'bollinger': strategy_config.get('period', 20),
                'stochastic': strategy_config.get('k_period', 14)
            }

            # Determine strategy type and required warm-up
            if 'ema' in strategy_name.lower():
                warmup = max(
                    strategy_config.get('fast_period', 12),
                    strategy_config.get('slow_period', 26)
                )
            elif 'macd' in strategy_name.lower():
                warmup = max(
                    strategy_config.get('slow_period', 26),
                    strategy_config.get('signal_period', 9)
                ) + 10  # Additional buffer for MACD convergence
            elif 'rsi' in strategy_name.lower():
                warmup = strategy_config.get('period', 14) + 5  # Additional buffer for RSI stabilization
            else:
                # Generic estimation based on common indicators
                warmup = 30  # Conservative default

            max_warmup = max(max_warmup, warmup)

        # Add safety buffer
        max_warmup += 10

        self.logger.debug(f"Calculated warm-up period: {max_warmup} periods")
        return max_warmup
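
    # Worked example: for a config {'name': 'macd_trend', 'slow_period': 26,
    # 'signal_period': 9} the 'macd' branch gives max(26, 9) + 10 = 36 periods,
    # and the final safety buffer raises the returned warm-up to 36 + 10 = 46.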

    def process_large_dataset_streaming_with_warmup(
        self,
        strategy_configs: List[Dict[str, Any]],
        symbols: List[str],
        timeframe: str,
        total_days_back: int,
        exchange: str = "okx"
    ) -> Iterator[Dict[str, List[StrategyResult]]]:
        """
        Process large datasets using a streaming approach with proper warm-up period handling.

        This method ensures indicator continuity across chunks by including warm-up data
        from previous chunks and trimming overlapping results.

        Args:
            strategy_configs: List of strategy configurations
            symbols: List of trading symbols to process
            timeframe: Timeframe for processing
            total_days_back: Total number of days to process
            exchange: Exchange name

        Yields:
            Dictionary chunks mapping strategy names to their results (with overlaps removed)
        """
        try:
            chunk_size = self.config.chunk_size_days
            warmup_days = self._calculate_warmup_period(strategy_configs)

            # Adjust chunk size if it's too small relative to warm-up
            if chunk_size <= warmup_days:
                adjusted_chunk_size = warmup_days * 2
                self.logger.warning(f"Chunk size ({chunk_size}) too small for warm-up ({warmup_days}). "
                                    f"Adjusting to {adjusted_chunk_size} days")
                chunk_size = adjusted_chunk_size

            total_chunks = (total_days_back + chunk_size - 1) // chunk_size

            self.logger.info(f"BacktestingBatchProcessor: Starting streaming with warm-up processing of "
                             f"{total_days_back} days in {total_chunks} chunks (warm-up: {warmup_days} days)")

            for chunk_index in range(total_chunks):
                # Calculate date range for this chunk
                chunk_start_days = chunk_index * chunk_size
                chunk_end_days = min((chunk_index + 1) * chunk_size, total_days_back)

                # Include warm-up data for chunks after the first
                if chunk_index == 0:
                    # First chunk: no warm-up available, process as-is
                    processing_start_days = chunk_start_days
                    processing_days = chunk_end_days - chunk_start_days
                    trim_warmup = False
                else:
                    # Subsequent chunks: include warm-up from previous data
                    processing_start_days = max(0, chunk_start_days - warmup_days)
                    processing_days = chunk_end_days - processing_start_days
                    trim_warmup = True

                self.logger.info(f"Processing chunk {chunk_index + 1}/{total_chunks}: "
                                 f"target days {chunk_start_days}-{chunk_end_days}, "
                                 f"processing days {processing_start_days}-{chunk_end_days} "
                                 f"(warm-up: {warmup_days if trim_warmup else 0})")

                # Memory check before processing chunk
                if self.config.enable_memory_monitoring:
                    self._check_memory_usage()

                # Process chunk with warm-up data
                chunk_results = self.process_strategies_parallel(
                    strategy_configs=strategy_configs,
                    symbols=symbols,
                    timeframe=timeframe,
                    days_back=processing_days,
                    exchange=exchange
                )

                # Trim warm-up period from results to avoid overlaps
                if trim_warmup:
                    chunk_results = self._trim_warmup_from_results(
                        chunk_results, warmup_days, chunk_start_days, chunk_end_days
                    )

                # Yield processed chunk results
                yield chunk_results

                # Force cleanup after each chunk to manage memory
                self._cleanup_memory()

                # Report progress
                progress = ((chunk_index + 1) / total_chunks) * 100
                self.logger.info(f"Streaming progress: {progress:.1f}% ({chunk_index + 1}/{total_chunks} chunks)")

            self.logger.info("BacktestingBatchProcessor: Completed streaming processing with warm-up")
        except Exception as e:
            self.logger.error(f"Error in streaming processing with warm-up: {e}")
            self._processing_stats['errors_count'] += 1

    def _trim_warmup_from_results(
        self,
        chunk_results: Dict[str, List[StrategyResult]],
        warmup_days: int,
        target_start_days: int,
        target_end_days: int
    ) -> Dict[str, List[StrategyResult]]:
        """
        Trim warm-up period from chunk results to avoid overlapping data.

        Args:
            chunk_results: Results from chunk processing (including warm-up)
            warmup_days: Number of warm-up days to trim
            target_start_days: Target start day for this chunk
            target_end_days: Target end day for this chunk

        Returns:
            Trimmed results containing only the target date range
        """
        try:
            trimmed_results = {}

            # Calculate the cutoff by proportion rather than by timestamp: only
            # relative day offsets are known here, not exact chunk start dates.
            # This is a simplified approach - in production, you'd use actual timestamps.
            for strategy_name, strategy_results in chunk_results.items():
                if not strategy_results:
                    trimmed_results[strategy_name] = []
                    continue

                # Sort results by timestamp to identify the warm-up period
                sorted_results = sorted(strategy_results, key=lambda r: r.timestamp)

                # Simple approach: remove the first portion equivalent to the
                # warm-up days. This assumes a roughly uniform distribution of signals.
                total_results = len(sorted_results)
                if total_results > 0:
                    # Estimate warm-up portion based on proportion
                    processing_days = target_end_days - max(0, target_start_days - warmup_days)
                    target_days = target_end_days - target_start_days

                    if processing_days > target_days:
                        warmup_proportion = warmup_days / processing_days
                        warmup_count = int(total_results * warmup_proportion)

                        # Keep results after the warm-up period
                        trimmed_results[strategy_name] = sorted_results[warmup_count:]
                        self.logger.debug(f"Strategy {strategy_name}: trimmed {warmup_count}/{total_results} warm-up results")
                    else:
                        # No trimming needed
                        trimmed_results[strategy_name] = sorted_results
                else:
                    trimmed_results[strategy_name] = []

            return trimmed_results
        except Exception as e:
            self.logger.error(f"Error trimming warm-up from results: {e}")
            return chunk_results  # Return original results if trimming fails
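

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): runs the
# streaming-with-warm-up path end to end. The strategy configs and symbols
# below are assumptions for demonstration only, and the relative imports mean
# this must be run as a module (python -m ...), not as a loose script.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo_configs = [
        {'name': 'ema_cross', 'fast_period': 12, 'slow_period': 26},  # hypothetical
        {'name': 'rsi_reversal', 'period': 14},                       # hypothetical
    ]
    processor = BacktestingBatchProcessor(BatchProcessingConfig(chunk_size_days=60))

    stream = processor.process_large_dataset_streaming_with_warmup(
        strategy_configs=demo_configs,
        symbols=['BTC-USDT'],
        timeframe='1h',
        total_days_back=180,
    )
    combined = processor.aggregate_streaming_results(stream)

    for name, results in combined.items():
        print(f"{name}: {sum(len(r.signals) for r in results)} signals")
    print(processor.get_performance_metrics())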