1059 lines
45 KiB
Python
1059 lines
45 KiB
Python
|
|
"""
|
||
|
|
Batch Processing for Strategy Backtesting
|
||
|
|
|
||
|
|
This module provides efficient batch processing capabilities for running
|
||
|
|
multiple strategies across large datasets with memory management and
|
||
|
|
performance optimization.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from typing import List, Dict, Any, Optional, Tuple, Iterator
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
import pandas as pd
|
||
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
|
|
import gc
|
||
|
|
import os
|
||
|
|
import psutil
|
||
|
|
|
||
|
|
from .data_integration import StrategyDataIntegrator, StrategyDataIntegrationConfig
|
||
|
|
from .data_types import StrategyResult, StrategySignal
|
||
|
|
from .validation import StrategySignalValidator, ValidationConfig
|
||
|
|
from utils.logger import get_logger
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class BatchProcessingConfig:
|
||
|
|
"""Configuration for batch processing operations."""
|
||
|
|
max_concurrent_strategies: int = 4 # Number of strategies to process concurrently
|
||
|
|
max_memory_usage_percent: float = 80.0 # Maximum memory usage threshold
|
||
|
|
chunk_size_days: int = 30 # Days to process per chunk for large datasets
|
||
|
|
enable_memory_monitoring: bool = True
|
||
|
|
enable_result_validation: bool = True
|
||
|
|
result_cache_size: int = 1000 # Maximum cached results
|
||
|
|
progress_reporting_interval: int = 10 # Report progress every N strategies
|
||
|
|
|
||
|
|
|
||
|
|
class BacktestingBatchProcessor:
|
||
|
|
"""
|
||
|
|
Efficient batch processing for strategy backtesting.
|
||
|
|
|
||
|
|
Provides memory-efficient processing of multiple strategies across
|
||
|
|
large datasets with parallel execution and performance monitoring.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(self, config: BatchProcessingConfig = None):
|
||
|
|
"""
|
||
|
|
Initialize batch processor.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
config: Batch processing configuration
|
||
|
|
"""
|
||
|
|
self.config = config or BatchProcessingConfig()
|
||
|
|
self.logger = get_logger()
|
||
|
|
|
||
|
|
# Initialize components
|
||
|
|
self.data_integrator = StrategyDataIntegrator()
|
||
|
|
self.signal_validator = StrategySignalValidator() if self.config.enable_result_validation else None
|
||
|
|
|
||
|
|
# Processing statistics
|
||
|
|
self._processing_stats = {
|
||
|
|
'strategies_processed': 0,
|
||
|
|
'total_signals_generated': 0,
|
||
|
|
'processing_time_seconds': 0.0,
|
||
|
|
'memory_peak_mb': 0.0,
|
||
|
|
'errors_count': 0,
|
||
|
|
'validation_failures': 0
|
||
|
|
}
|
||
|
|
|
||
|
|
# Result cache for performance
|
||
|
|
self._result_cache = {}
|
||
|
|
|
||
|
|
def process_strategies_batch(
|
||
|
|
self,
|
||
|
|
strategy_configs: List[Dict[str, Any]],
|
||
|
|
symbols: List[str],
|
||
|
|
timeframe: str,
|
||
|
|
days_back: int,
|
||
|
|
exchange: str = "okx"
|
||
|
|
) -> Dict[str, List[StrategyResult]]:
|
||
|
|
"""
|
||
|
|
Process multiple strategies across multiple symbols efficiently.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
strategy_configs: List of strategy configurations
|
||
|
|
symbols: List of trading symbols to process
|
||
|
|
timeframe: Timeframe for processing
|
||
|
|
days_back: Number of days to look back
|
||
|
|
exchange: Exchange name
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dictionary mapping strategy names to their results
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
start_time = datetime.now()
|
||
|
|
self.logger.info(f"BacktestingBatchProcessor: Starting batch processing of {len(strategy_configs)} strategies across {len(symbols)} symbols")
|
||
|
|
|
||
|
|
# Initialize results container
|
||
|
|
batch_results = {}
|
||
|
|
|
||
|
|
# Process strategies with memory monitoring
|
||
|
|
for i, strategy_config in enumerate(strategy_configs):
|
||
|
|
strategy_name = strategy_config.get('name', f'strategy_{i}')
|
||
|
|
|
||
|
|
# Memory check
|
||
|
|
if self.config.enable_memory_monitoring:
|
||
|
|
self._check_memory_usage()
|
||
|
|
|
||
|
|
# Process strategy across all symbols
|
||
|
|
strategy_results = self._process_single_strategy_batch(
|
||
|
|
strategy_config, symbols, timeframe, days_back, exchange
|
||
|
|
)
|
||
|
|
|
||
|
|
batch_results[strategy_name] = strategy_results
|
||
|
|
self._processing_stats['strategies_processed'] += 1
|
||
|
|
|
||
|
|
# Progress reporting
|
||
|
|
if (i + 1) % self.config.progress_reporting_interval == 0:
|
||
|
|
progress = ((i + 1) / len(strategy_configs)) * 100
|
||
|
|
self.logger.info(f"Batch processing progress: {progress:.1f}% ({i + 1}/{len(strategy_configs)} strategies)")
|
||
|
|
|
||
|
|
# Update final statistics
|
||
|
|
processing_time = (datetime.now() - start_time).total_seconds()
|
||
|
|
self._processing_stats['processing_time_seconds'] = processing_time
|
||
|
|
|
||
|
|
self.logger.info(f"BacktestingBatchProcessor: Completed batch processing in {processing_time:.2f} seconds")
|
||
|
|
return batch_results
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error in batch processing: {e}")
|
||
|
|
self._processing_stats['errors_count'] += 1
|
||
|
|
return {}
|
||
|
|
|
||
|
|
def _process_single_strategy_batch(
|
||
|
|
self,
|
||
|
|
strategy_config: Dict[str, Any],
|
||
|
|
symbols: List[str],
|
||
|
|
timeframe: str,
|
||
|
|
days_back: int,
|
||
|
|
exchange: str
|
||
|
|
) -> List[StrategyResult]:
|
||
|
|
"""Process a single strategy across multiple symbols."""
|
||
|
|
all_results = []
|
||
|
|
strategy_name = strategy_config.get('name', 'unknown')
|
||
|
|
|
||
|
|
for symbol in symbols:
|
||
|
|
try:
|
||
|
|
# Calculate strategy signals for this symbol
|
||
|
|
results = self.data_integrator.calculate_strategy_signals_orchestrated(
|
||
|
|
strategy_name=strategy_name,
|
||
|
|
strategy_config=strategy_config,
|
||
|
|
symbol=symbol,
|
||
|
|
timeframe=timeframe,
|
||
|
|
days_back=days_back,
|
||
|
|
exchange=exchange
|
||
|
|
)
|
||
|
|
|
||
|
|
# Validate results if enabled
|
||
|
|
if self.signal_validator and results:
|
||
|
|
validated_results = self._validate_strategy_results(results)
|
||
|
|
all_results.extend(validated_results)
|
||
|
|
else:
|
||
|
|
all_results.extend(results)
|
||
|
|
|
||
|
|
# Update signal count
|
||
|
|
signal_count = sum(len(result.signals) for result in results)
|
||
|
|
self._processing_stats['total_signals_generated'] += signal_count
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error processing {strategy_name} for {symbol}: {e}")
|
||
|
|
self._processing_stats['errors_count'] += 1
|
||
|
|
|
||
|
|
return all_results
|
||
|
|
|
||
|
|
def _validate_strategy_results(self, results: List[StrategyResult]) -> List[StrategyResult]:
|
||
|
|
"""Validate strategy results and filter invalid signals."""
|
||
|
|
validated_results = []
|
||
|
|
|
||
|
|
for result in results:
|
||
|
|
if result.signals:
|
||
|
|
valid_signals, invalid_signals = self.signal_validator.validate_signals_batch(result.signals)
|
||
|
|
|
||
|
|
if invalid_signals:
|
||
|
|
self._processing_stats['validation_failures'] += len(invalid_signals)
|
||
|
|
self.logger.debug(f"Filtered {len(invalid_signals)} invalid signals from {result.strategy_name}")
|
||
|
|
|
||
|
|
# Create new result with only valid signals
|
||
|
|
if valid_signals:
|
||
|
|
validated_result = StrategyResult(
|
||
|
|
timestamp=result.timestamp,
|
||
|
|
symbol=result.symbol,
|
||
|
|
timeframe=result.timeframe,
|
||
|
|
strategy_name=result.strategy_name,
|
||
|
|
signals=valid_signals,
|
||
|
|
indicators_used=result.indicators_used,
|
||
|
|
metadata=result.metadata
|
||
|
|
)
|
||
|
|
validated_results.append(validated_result)
|
||
|
|
else:
|
||
|
|
validated_results.append(result)
|
||
|
|
|
||
|
|
return validated_results
|
||
|
|
|
||
|
|
def _check_memory_usage(self) -> None:
|
||
|
|
"""Monitor memory usage and trigger cleanup if needed."""
|
||
|
|
process = psutil.Process(os.getpid())
|
||
|
|
memory_percent = process.memory_percent()
|
||
|
|
memory_mb = process.memory_info().rss / 1024 / 1024
|
||
|
|
|
||
|
|
# Update peak memory tracking
|
||
|
|
self._processing_stats['memory_peak_mb'] = max(
|
||
|
|
self._processing_stats['memory_peak_mb'],
|
||
|
|
memory_mb
|
||
|
|
)
|
||
|
|
|
||
|
|
if memory_percent > self.config.max_memory_usage_percent:
|
||
|
|
self.logger.warning(f"High memory usage detected: {memory_percent:.1f}% ({memory_mb:.1f} MB)")
|
||
|
|
self._cleanup_memory()
|
||
|
|
|
||
|
|
def _cleanup_memory(self) -> None:
|
||
|
|
"""Perform memory cleanup operations."""
|
||
|
|
# Clear old cached results
|
||
|
|
if len(self._result_cache) > self.config.result_cache_size:
|
||
|
|
cache_items = list(self._result_cache.items())
|
||
|
|
# Keep only the most recent half
|
||
|
|
keep_size = self.config.result_cache_size // 2
|
||
|
|
self._result_cache = dict(cache_items[-keep_size:])
|
||
|
|
|
||
|
|
# Clear data integrator caches
|
||
|
|
self.data_integrator.clear_cache()
|
||
|
|
|
||
|
|
# Force garbage collection
|
||
|
|
gc.collect()
|
||
|
|
|
||
|
|
self.logger.debug("BacktestingBatchProcessor: Performed memory cleanup")
|
||
|
|
|
||
|
|
def get_processing_statistics(self) -> Dict[str, Any]:
|
||
|
|
"""Get comprehensive processing statistics."""
|
||
|
|
stats = self._processing_stats.copy()
|
||
|
|
|
||
|
|
# Calculate derived metrics
|
||
|
|
if stats['strategies_processed'] > 0:
|
||
|
|
stats['average_signals_per_strategy'] = stats['total_signals_generated'] / stats['strategies_processed']
|
||
|
|
stats['average_processing_time_per_strategy'] = stats['processing_time_seconds'] / stats['strategies_processed']
|
||
|
|
else:
|
||
|
|
stats['average_signals_per_strategy'] = 0
|
||
|
|
stats['average_processing_time_per_strategy'] = 0
|
||
|
|
|
||
|
|
stats['error_rate'] = (stats['errors_count'] / max(stats['strategies_processed'], 1)) * 100
|
||
|
|
stats['validation_failure_rate'] = (stats['validation_failures'] / max(stats['total_signals_generated'], 1)) * 100
|
||
|
|
|
||
|
|
return stats
|
||
|
|
|
||
|
|
def process_strategies_parallel(
|
||
|
|
self,
|
||
|
|
strategy_configs: List[Dict[str, Any]],
|
||
|
|
symbols: List[str],
|
||
|
|
timeframe: str,
|
||
|
|
days_back: int,
|
||
|
|
exchange: str = "okx"
|
||
|
|
) -> Dict[str, List[StrategyResult]]:
|
||
|
|
"""
|
||
|
|
Process multiple strategies in parallel for improved performance.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
strategy_configs: List of strategy configurations
|
||
|
|
symbols: List of trading symbols to process
|
||
|
|
timeframe: Timeframe for processing
|
||
|
|
days_back: Number of days to look back
|
||
|
|
exchange: Exchange name
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dictionary mapping strategy names to their results
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
start_time = datetime.now()
|
||
|
|
self.logger.info(f"BacktestingBatchProcessor: Starting parallel processing of {len(strategy_configs)} strategies")
|
||
|
|
|
||
|
|
batch_results = {}
|
||
|
|
|
||
|
|
# Use ThreadPoolExecutor for parallel strategy processing
|
||
|
|
with ThreadPoolExecutor(max_workers=self.config.max_concurrent_strategies) as executor:
|
||
|
|
# Submit all strategy processing tasks
|
||
|
|
future_to_strategy = {}
|
||
|
|
|
||
|
|
for strategy_config in strategy_configs:
|
||
|
|
strategy_name = strategy_config.get('name', f'strategy_{len(future_to_strategy)}')
|
||
|
|
|
||
|
|
future = executor.submit(
|
||
|
|
self._process_single_strategy_batch,
|
||
|
|
strategy_config, symbols, timeframe, days_back, exchange
|
||
|
|
)
|
||
|
|
future_to_strategy[future] = strategy_name
|
||
|
|
|
||
|
|
# Collect results as they complete
|
||
|
|
completed_count = 0
|
||
|
|
for future in as_completed(future_to_strategy):
|
||
|
|
strategy_name = future_to_strategy[future]
|
||
|
|
|
||
|
|
try:
|
||
|
|
strategy_results = future.result()
|
||
|
|
batch_results[strategy_name] = strategy_results
|
||
|
|
self._processing_stats['strategies_processed'] += 1
|
||
|
|
|
||
|
|
completed_count += 1
|
||
|
|
|
||
|
|
# Progress reporting
|
||
|
|
if completed_count % self.config.progress_reporting_interval == 0:
|
||
|
|
progress = (completed_count / len(strategy_configs)) * 100
|
||
|
|
self.logger.info(f"Parallel processing progress: {progress:.1f}% ({completed_count}/{len(strategy_configs)} strategies)")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error processing strategy {strategy_name}: {e}")
|
||
|
|
self._processing_stats['errors_count'] += 1
|
||
|
|
batch_results[strategy_name] = []
|
||
|
|
|
||
|
|
# Memory check after each completed strategy
|
||
|
|
if self.config.enable_memory_monitoring:
|
||
|
|
self._check_memory_usage()
|
||
|
|
|
||
|
|
# Update final statistics
|
||
|
|
processing_time = (datetime.now() - start_time).total_seconds()
|
||
|
|
self._processing_stats['processing_time_seconds'] = processing_time
|
||
|
|
|
||
|
|
self.logger.info(f"BacktestingBatchProcessor: Completed parallel processing in {processing_time:.2f} seconds")
|
||
|
|
return batch_results
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error in parallel batch processing: {e}")
|
||
|
|
self._processing_stats['errors_count'] += 1
|
||
|
|
return {}
|
||
|
|
|
||
|
|
def process_symbols_parallel(
|
||
|
|
self,
|
||
|
|
strategy_config: Dict[str, Any],
|
||
|
|
symbols: List[str],
|
||
|
|
timeframe: str,
|
||
|
|
days_back: int,
|
||
|
|
exchange: str = "okx"
|
||
|
|
) -> List[StrategyResult]:
|
||
|
|
"""
|
||
|
|
Process a single strategy across multiple symbols in parallel.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
strategy_config: Strategy configuration
|
||
|
|
symbols: List of trading symbols to process
|
||
|
|
timeframe: Timeframe for processing
|
||
|
|
days_back: Number of days to look back
|
||
|
|
exchange: Exchange name
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
List of strategy results across all symbols
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
strategy_name = strategy_config.get('name', 'unknown')
|
||
|
|
self.logger.info(f"BacktestingBatchProcessor: Processing {strategy_name} across {len(symbols)} symbols in parallel")
|
||
|
|
|
||
|
|
all_results = []
|
||
|
|
|
||
|
|
# Use ThreadPoolExecutor for parallel symbol processing
|
||
|
|
with ThreadPoolExecutor(max_workers=min(len(symbols), self.config.max_concurrent_strategies)) as executor:
|
||
|
|
# Submit symbol processing tasks
|
||
|
|
future_to_symbol = {}
|
||
|
|
|
||
|
|
for symbol in symbols:
|
||
|
|
future = executor.submit(
|
||
|
|
self._process_strategy_for_symbol,
|
||
|
|
strategy_config, symbol, timeframe, days_back, exchange
|
||
|
|
)
|
||
|
|
future_to_symbol[future] = symbol
|
||
|
|
|
||
|
|
# Collect results as they complete
|
||
|
|
for future in as_completed(future_to_symbol):
|
||
|
|
symbol = future_to_symbol[future]
|
||
|
|
|
||
|
|
try:
|
||
|
|
symbol_results = future.result()
|
||
|
|
all_results.extend(symbol_results)
|
||
|
|
|
||
|
|
# Update signal count
|
||
|
|
signal_count = sum(len(result.signals) for result in symbol_results)
|
||
|
|
self._processing_stats['total_signals_generated'] += signal_count
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error processing {strategy_name} for {symbol}: {e}")
|
||
|
|
self._processing_stats['errors_count'] += 1
|
||
|
|
|
||
|
|
return all_results
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error in parallel symbol processing: {e}")
|
||
|
|
self._processing_stats['errors_count'] += 1
|
||
|
|
return []
|
||
|
|
|
||
|
|
def _process_strategy_for_symbol(
|
||
|
|
self,
|
||
|
|
strategy_config: Dict[str, Any],
|
||
|
|
symbol: str,
|
||
|
|
timeframe: str,
|
||
|
|
days_back: int,
|
||
|
|
exchange: str
|
||
|
|
) -> List[StrategyResult]:
|
||
|
|
"""Process a single strategy for a single symbol."""
|
||
|
|
try:
|
||
|
|
strategy_name = strategy_config.get('name', 'unknown')
|
||
|
|
|
||
|
|
# Calculate strategy signals for this symbol
|
||
|
|
results = self.data_integrator.calculate_strategy_signals_orchestrated(
|
||
|
|
strategy_name=strategy_name,
|
||
|
|
strategy_config=strategy_config,
|
||
|
|
symbol=symbol,
|
||
|
|
timeframe=timeframe,
|
||
|
|
days_back=days_back,
|
||
|
|
exchange=exchange
|
||
|
|
)
|
||
|
|
|
||
|
|
# Validate results if enabled
|
||
|
|
if self.signal_validator and results:
|
||
|
|
validated_results = self._validate_strategy_results(results)
|
||
|
|
return validated_results
|
||
|
|
else:
|
||
|
|
return results
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error processing {strategy_config.get('name', 'unknown')} for {symbol}: {e}")
|
||
|
|
return []
|
||
|
|
|
||
|
|
def process_large_dataset_streaming(
|
||
|
|
self,
|
||
|
|
strategy_configs: List[Dict[str, Any]],
|
||
|
|
symbols: List[str],
|
||
|
|
timeframe: str,
|
||
|
|
total_days_back: int,
|
||
|
|
exchange: str = "okx"
|
||
|
|
) -> Iterator[Dict[str, List[StrategyResult]]]:
|
||
|
|
"""
|
||
|
|
Process large datasets using streaming approach with memory-efficient chunking.
|
||
|
|
|
||
|
|
This method processes data in chunks to avoid memory overflow when dealing
|
||
|
|
with very large historical datasets.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
strategy_configs: List of strategy configurations
|
||
|
|
symbols: List of trading symbols to process
|
||
|
|
timeframe: Timeframe for processing
|
||
|
|
total_days_back: Total number of days to process
|
||
|
|
exchange: Exchange name
|
||
|
|
|
||
|
|
Yields:
|
||
|
|
Dictionary chunks mapping strategy names to their results
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
chunk_size = self.config.chunk_size_days
|
||
|
|
total_chunks = (total_days_back + chunk_size - 1) // chunk_size # Ceiling division
|
||
|
|
|
||
|
|
self.logger.info(f"BacktestingBatchProcessor: Starting streaming processing of {total_days_back} days in {total_chunks} chunks")
|
||
|
|
|
||
|
|
for chunk_index in range(total_chunks):
|
||
|
|
# Calculate date range for this chunk
|
||
|
|
chunk_start_days = chunk_index * chunk_size
|
||
|
|
chunk_end_days = min((chunk_index + 1) * chunk_size, total_days_back)
|
||
|
|
chunk_days = chunk_end_days - chunk_start_days
|
||
|
|
|
||
|
|
self.logger.info(f"Processing chunk {chunk_index + 1}/{total_chunks}: {chunk_days} days")
|
||
|
|
|
||
|
|
# Memory check before processing chunk
|
||
|
|
if self.config.enable_memory_monitoring:
|
||
|
|
self._check_memory_usage()
|
||
|
|
|
||
|
|
# Process chunk using parallel processing
|
||
|
|
chunk_results = self.process_strategies_parallel(
|
||
|
|
strategy_configs=strategy_configs,
|
||
|
|
symbols=symbols,
|
||
|
|
timeframe=timeframe,
|
||
|
|
days_back=chunk_days,
|
||
|
|
exchange=exchange
|
||
|
|
)
|
||
|
|
|
||
|
|
# Yield chunk results
|
||
|
|
yield chunk_results
|
||
|
|
|
||
|
|
# Force cleanup after each chunk to manage memory
|
||
|
|
self._cleanup_memory()
|
||
|
|
|
||
|
|
# Report progress
|
||
|
|
progress = ((chunk_index + 1) / total_chunks) * 100
|
||
|
|
self.logger.info(f"Streaming progress: {progress:.1f}% ({chunk_index + 1}/{total_chunks} chunks)")
|
||
|
|
|
||
|
|
self.logger.info("BacktestingBatchProcessor: Completed streaming processing")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error in streaming processing: {e}")
|
||
|
|
self._processing_stats['errors_count'] += 1
|
||
|
|
|
||
|
|
def aggregate_streaming_results(
|
||
|
|
self,
|
||
|
|
result_stream: Iterator[Dict[str, List[StrategyResult]]]
|
||
|
|
) -> Dict[str, List[StrategyResult]]:
|
||
|
|
"""
|
||
|
|
Aggregate results from streaming processing.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
result_stream: Iterator of result chunks
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Aggregated results across all chunks
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
aggregated_results = {}
|
||
|
|
chunk_count = 0
|
||
|
|
|
||
|
|
for chunk_results in result_stream:
|
||
|
|
chunk_count += 1
|
||
|
|
|
||
|
|
for strategy_name, strategy_results in chunk_results.items():
|
||
|
|
if strategy_name not in aggregated_results:
|
||
|
|
aggregated_results[strategy_name] = []
|
||
|
|
|
||
|
|
aggregated_results[strategy_name].extend(strategy_results)
|
||
|
|
|
||
|
|
# Periodic memory cleanup during aggregation
|
||
|
|
if chunk_count % 5 == 0: # Every 5 chunks
|
||
|
|
self._cleanup_memory()
|
||
|
|
|
||
|
|
# Final statistics
|
||
|
|
total_strategies = len(aggregated_results)
|
||
|
|
total_results = sum(len(results) for results in aggregated_results.values())
|
||
|
|
|
||
|
|
self.logger.info(f"Aggregated {total_results} results across {total_strategies} strategies from {chunk_count} chunks")
|
||
|
|
|
||
|
|
return aggregated_results
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error aggregating streaming results: {e}")
|
||
|
|
return {}
|
||
|
|
|
||
|
|
def process_with_memory_constraints(
|
||
|
|
self,
|
||
|
|
strategy_configs: List[Dict[str, Any]],
|
||
|
|
symbols: List[str],
|
||
|
|
timeframe: str,
|
||
|
|
days_back: int,
|
||
|
|
max_memory_mb: float,
|
||
|
|
exchange: str = "okx"
|
||
|
|
) -> Dict[str, List[StrategyResult]]:
|
||
|
|
"""
|
||
|
|
Process strategies with strict memory constraints.
|
||
|
|
|
||
|
|
Automatically adjusts processing approach based on available memory.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
strategy_configs: List of strategy configurations
|
||
|
|
symbols: List of trading symbols to process
|
||
|
|
timeframe: Timeframe for processing
|
||
|
|
days_back: Number of days to look back
|
||
|
|
max_memory_mb: Maximum memory usage in MB
|
||
|
|
exchange: Exchange name
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dictionary mapping strategy names to their results
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
# Check current memory usage
|
||
|
|
current_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
|
||
|
|
available_memory = max_memory_mb - current_memory
|
||
|
|
|
||
|
|
self.logger.info(f"Memory-constrained processing: {available_memory:.1f} MB available of {max_memory_mb:.1f} MB limit")
|
||
|
|
|
||
|
|
# Estimate memory requirements
|
||
|
|
estimated_memory_per_strategy = 50 # MB - rough estimate
|
||
|
|
estimated_total_memory = len(strategy_configs) * len(symbols) * estimated_memory_per_strategy
|
||
|
|
|
||
|
|
if estimated_total_memory <= available_memory:
|
||
|
|
# Sufficient memory for parallel processing
|
||
|
|
self.logger.info("Sufficient memory available - using parallel processing")
|
||
|
|
return self.process_strategies_parallel(
|
||
|
|
strategy_configs, symbols, timeframe, days_back, exchange
|
||
|
|
)
|
||
|
|
|
||
|
|
elif estimated_total_memory <= available_memory * 2:
|
||
|
|
# Moderate memory constraint - use sequential processing
|
||
|
|
self.logger.info("Moderate memory constraint - using sequential processing")
|
||
|
|
return self.process_strategies_batch(
|
||
|
|
strategy_configs, symbols, timeframe, days_back, exchange
|
||
|
|
)
|
||
|
|
|
||
|
|
else:
|
||
|
|
# Severe memory constraint - use streaming with warm-up
|
||
|
|
self.logger.info("Severe memory constraint - using streaming processing with warm-up")
|
||
|
|
stream = self.process_large_dataset_streaming_with_warmup(
|
||
|
|
strategy_configs, symbols, timeframe, days_back, exchange
|
||
|
|
)
|
||
|
|
return self.aggregate_streaming_results(stream)
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error in memory-constrained processing: {e}")
|
||
|
|
self._processing_stats['errors_count'] += 1
|
||
|
|
return {}
|
||
|
|
|
||
|
|
def get_performance_metrics(self) -> Dict[str, Any]:
|
||
|
|
"""
|
||
|
|
Get comprehensive performance metrics for batch processing.
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dictionary containing detailed performance metrics
|
||
|
|
"""
|
||
|
|
stats = self.get_processing_statistics()
|
||
|
|
|
||
|
|
# Enhanced metrics
|
||
|
|
enhanced_metrics = {
|
||
|
|
**stats,
|
||
|
|
'cache_hit_rate': self._calculate_cache_hit_rate(),
|
||
|
|
'memory_efficiency': self._calculate_memory_efficiency(),
|
||
|
|
'throughput_signals_per_second': self._calculate_throughput(),
|
||
|
|
'parallel_efficiency': self._calculate_parallel_efficiency(),
|
||
|
|
'optimization_recommendations': self._generate_optimization_recommendations()
|
||
|
|
}
|
||
|
|
|
||
|
|
return enhanced_metrics
|
||
|
|
|
||
|
|
def _calculate_cache_hit_rate(self) -> float:
|
||
|
|
"""Calculate cache hit rate from data integrator."""
|
||
|
|
try:
|
||
|
|
cache_stats = self.data_integrator.get_cache_stats()
|
||
|
|
total_requests = cache_stats.get('cache_hits', 0) + cache_stats.get('cache_misses', 0)
|
||
|
|
if total_requests > 0:
|
||
|
|
return (cache_stats.get('cache_hits', 0) / total_requests) * 100
|
||
|
|
return 0.0
|
||
|
|
except Exception:
|
||
|
|
return 0.0
|
||
|
|
|
||
|
|
def _calculate_memory_efficiency(self) -> float:
|
||
|
|
"""Calculate memory efficiency score."""
|
||
|
|
peak_memory = self._processing_stats.get('memory_peak_mb', 0)
|
||
|
|
strategies_processed = self._processing_stats.get('strategies_processed', 1)
|
||
|
|
|
||
|
|
if peak_memory > 0 and strategies_processed > 0:
|
||
|
|
# Memory per strategy in MB (lower is better)
|
||
|
|
memory_per_strategy = peak_memory / strategies_processed
|
||
|
|
|
||
|
|
# Efficiency score (0-100, higher is better)
|
||
|
|
# Assuming 100MB per strategy is baseline (50% efficiency)
|
||
|
|
baseline_memory = 100 # MB
|
||
|
|
efficiency = max(0, min(100, (baseline_memory / memory_per_strategy) * 50))
|
||
|
|
return efficiency
|
||
|
|
|
||
|
|
return 100.0 # Perfect efficiency if no memory tracked
|
||
|
|
|
||
|
|
def _calculate_throughput(self) -> float:
|
||
|
|
"""Calculate signals processed per second."""
|
||
|
|
total_signals = self._processing_stats.get('total_signals_generated', 0)
|
||
|
|
processing_time = self._processing_stats.get('processing_time_seconds', 1)
|
||
|
|
|
||
|
|
if processing_time > 0:
|
||
|
|
return total_signals / processing_time
|
||
|
|
return 0.0
|
||
|
|
|
||
|
|
def _calculate_parallel_efficiency(self) -> float:
|
||
|
|
"""Calculate parallel processing efficiency."""
|
||
|
|
strategies_processed = self._processing_stats.get('strategies_processed', 0)
|
||
|
|
processing_time = self._processing_stats.get('processing_time_seconds', 1)
|
||
|
|
max_workers = self.config.max_concurrent_strategies
|
||
|
|
|
||
|
|
if strategies_processed > 0 and processing_time > 0:
|
||
|
|
# Theoretical minimum time if perfectly parallel
|
||
|
|
avg_time_per_strategy = processing_time / strategies_processed
|
||
|
|
theoretical_min_time = avg_time_per_strategy * (strategies_processed / max_workers)
|
||
|
|
|
||
|
|
# Efficiency as percentage of theoretical optimum
|
||
|
|
efficiency = min(100, (theoretical_min_time / processing_time) * 100)
|
||
|
|
return efficiency
|
||
|
|
|
||
|
|
return 100.0
|
||
|
|
|
||
|
|
def _generate_optimization_recommendations(self) -> List[str]:
|
||
|
|
"""Generate optimization recommendations based on performance metrics."""
|
||
|
|
recommendations = []
|
||
|
|
|
||
|
|
# Memory recommendations
|
||
|
|
memory_efficiency = self._calculate_memory_efficiency()
|
||
|
|
if memory_efficiency < 50:
|
||
|
|
recommendations.append("Consider reducing chunk_size_days or max_concurrent_strategies to improve memory efficiency")
|
||
|
|
|
||
|
|
# Cache recommendations
|
||
|
|
cache_hit_rate = self._calculate_cache_hit_rate()
|
||
|
|
if cache_hit_rate < 30:
|
||
|
|
recommendations.append("Enable indicator caching or increase cache timeout to improve performance")
|
||
|
|
|
||
|
|
# Parallel efficiency recommendations
|
||
|
|
parallel_efficiency = self._calculate_parallel_efficiency()
|
||
|
|
if parallel_efficiency < 70:
|
||
|
|
recommendations.append("Consider adjusting max_concurrent_strategies based on system capabilities")
|
||
|
|
|
||
|
|
# Error rate recommendations
|
||
|
|
error_rate = self._processing_stats.get('error_rate', 0)
|
||
|
|
if error_rate > 10:
|
||
|
|
recommendations.append("High error rate detected - check data availability and strategy configurations")
|
||
|
|
|
||
|
|
# Throughput recommendations
|
||
|
|
throughput = self._calculate_throughput()
|
||
|
|
if throughput < 1.0: # Less than 1 signal per second
|
||
|
|
recommendations.append("Low throughput detected - consider optimizing strategy calculations or using simpler indicators")
|
||
|
|
|
||
|
|
if not recommendations:
|
||
|
|
recommendations.append("Performance metrics are within acceptable ranges")
|
||
|
|
|
||
|
|
return recommendations
|
||
|
|
|
||
|
|
def optimize_configuration(self) -> BatchProcessingConfig:
|
||
|
|
"""
|
||
|
|
Automatically optimize configuration based on current performance metrics.
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Optimized configuration
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
current_config = self.config
|
||
|
|
optimized_config = BatchProcessingConfig()
|
||
|
|
|
||
|
|
# Copy current values as baseline
|
||
|
|
optimized_config.max_concurrent_strategies = current_config.max_concurrent_strategies
|
||
|
|
optimized_config.chunk_size_days = current_config.chunk_size_days
|
||
|
|
optimized_config.max_memory_usage_percent = current_config.max_memory_usage_percent
|
||
|
|
optimized_config.result_cache_size = current_config.result_cache_size
|
||
|
|
|
||
|
|
# Get current metrics
|
||
|
|
memory_efficiency = self._calculate_memory_efficiency()
|
||
|
|
parallel_efficiency = self._calculate_parallel_efficiency()
|
||
|
|
error_rate = self._processing_stats.get('error_rate', 0)
|
||
|
|
|
||
|
|
# Optimize based on metrics
|
||
|
|
if memory_efficiency < 50:
|
||
|
|
# Reduce memory pressure
|
||
|
|
optimized_config.max_concurrent_strategies = max(1, current_config.max_concurrent_strategies - 1)
|
||
|
|
optimized_config.chunk_size_days = max(7, current_config.chunk_size_days - 7)
|
||
|
|
|
||
|
|
elif memory_efficiency > 80 and parallel_efficiency < 70:
|
||
|
|
# Increase parallelism if memory allows
|
||
|
|
optimized_config.max_concurrent_strategies = min(8, current_config.max_concurrent_strategies + 1)
|
||
|
|
|
||
|
|
if error_rate > 10:
|
||
|
|
# Reduce load to minimize errors
|
||
|
|
optimized_config.max_concurrent_strategies = max(1, current_config.max_concurrent_strategies - 1)
|
||
|
|
|
||
|
|
# Cache optimization
|
||
|
|
cache_hit_rate = self._calculate_cache_hit_rate()
|
||
|
|
if cache_hit_rate < 30:
|
||
|
|
optimized_config.result_cache_size = min(2000, current_config.result_cache_size * 2)
|
||
|
|
|
||
|
|
self.logger.info(f"Configuration optimized: workers {current_config.max_concurrent_strategies} -> {optimized_config.max_concurrent_strategies}, "
|
||
|
|
f"chunk_size {current_config.chunk_size_days} -> {optimized_config.chunk_size_days}")
|
||
|
|
|
||
|
|
return optimized_config
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error optimizing configuration: {e}")
|
||
|
|
return self.config
|
||
|
|
|
||
|
|
def benchmark_processing_methods(
|
||
|
|
self,
|
||
|
|
strategy_configs: List[Dict[str, Any]],
|
||
|
|
symbols: List[str],
|
||
|
|
timeframe: str,
|
||
|
|
days_back: int,
|
||
|
|
exchange: str = "okx"
|
||
|
|
) -> Dict[str, Dict[str, Any]]:
|
||
|
|
"""
|
||
|
|
Benchmark different processing methods to determine optimal approach.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
strategy_configs: List of strategy configurations
|
||
|
|
symbols: List of trading symbols to process
|
||
|
|
timeframe: Timeframe for processing
|
||
|
|
days_back: Number of days to look back
|
||
|
|
exchange: Exchange name
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dictionary containing benchmark results for each method
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
self.logger.info("Starting processing method benchmark")
|
||
|
|
benchmark_results = {}
|
||
|
|
|
||
|
|
# Test sequential processing
|
||
|
|
start_time = datetime.now()
|
||
|
|
self._reset_stats()
|
||
|
|
|
||
|
|
sequential_results = self.process_strategies_batch(
|
||
|
|
strategy_configs, symbols, timeframe, min(days_back, 7), exchange # Limit to 7 days for benchmark
|
||
|
|
)
|
||
|
|
|
||
|
|
sequential_time = (datetime.now() - start_time).total_seconds()
|
||
|
|
benchmark_results['sequential'] = {
|
||
|
|
'processing_time': sequential_time,
|
||
|
|
'results_count': sum(len(results) for results in sequential_results.values()),
|
||
|
|
'memory_peak_mb': self._processing_stats.get('memory_peak_mb', 0),
|
||
|
|
'throughput': self._calculate_throughput()
|
||
|
|
}
|
||
|
|
|
||
|
|
# Test parallel processing
|
||
|
|
start_time = datetime.now()
|
||
|
|
self._reset_stats()
|
||
|
|
|
||
|
|
parallel_results = self.process_strategies_parallel(
|
||
|
|
strategy_configs, symbols, timeframe, min(days_back, 7), exchange
|
||
|
|
)
|
||
|
|
|
||
|
|
parallel_time = (datetime.now() - start_time).total_seconds()
|
||
|
|
benchmark_results['parallel'] = {
|
||
|
|
'processing_time': parallel_time,
|
||
|
|
'results_count': sum(len(results) for results in parallel_results.values()),
|
||
|
|
'memory_peak_mb': self._processing_stats.get('memory_peak_mb', 0),
|
||
|
|
'throughput': self._calculate_throughput()
|
||
|
|
}
|
||
|
|
|
||
|
|
# Calculate speedup
|
||
|
|
if sequential_time > 0:
|
||
|
|
speedup = sequential_time / parallel_time
|
||
|
|
benchmark_results['parallel']['speedup'] = speedup
|
||
|
|
|
||
|
|
# Recommend best method
|
||
|
|
if parallel_time < sequential_time * 0.8: # 20% improvement threshold
|
||
|
|
benchmark_results['recommendation'] = 'parallel'
|
||
|
|
else:
|
||
|
|
benchmark_results['recommendation'] = 'sequential'
|
||
|
|
|
||
|
|
self.logger.info(f"Benchmark completed. Recommended method: {benchmark_results['recommendation']}")
|
||
|
|
return benchmark_results
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error in benchmark: {e}")
|
||
|
|
return {}
|
||
|
|
|
||
|
|
def _reset_stats(self) -> None:
|
||
|
|
"""Reset processing statistics for benchmarking."""
|
||
|
|
self._processing_stats = {
|
||
|
|
'strategies_processed': 0,
|
||
|
|
'total_signals_generated': 0,
|
||
|
|
'processing_time_seconds': 0.0,
|
||
|
|
'memory_peak_mb': 0.0,
|
||
|
|
'errors_count': 0,
|
||
|
|
'validation_failures': 0
|
||
|
|
}
|
||
|
|
|
||
|
|
# Clear result cache
|
||
|
|
self._result_cache.clear()
|
||
|
|
|
||
|
|
self.logger.debug("BacktestingBatchProcessor: Reset processing statistics and cleared result cache")
|
||
|
|
|
||
|
|
def _calculate_warmup_period(self, strategy_configs: List[Dict[str, Any]]) -> int:
|
||
|
|
"""
|
||
|
|
Calculate the maximum warm-up period needed across all strategies.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
strategy_configs: List of strategy configurations
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Maximum warm-up period in number of periods
|
||
|
|
"""
|
||
|
|
max_warmup = 0
|
||
|
|
|
||
|
|
for strategy_config in strategy_configs:
|
||
|
|
strategy_name = strategy_config.get('name', 'unknown')
|
||
|
|
|
||
|
|
# Common indicator warm-up requirements
|
||
|
|
indicator_warmups = {
|
||
|
|
'ema': strategy_config.get('slow_period', strategy_config.get('period', 26)),
|
||
|
|
'sma': strategy_config.get('period', 20),
|
||
|
|
'rsi': strategy_config.get('period', 14),
|
||
|
|
'macd': max(
|
||
|
|
strategy_config.get('slow_period', 26),
|
||
|
|
strategy_config.get('signal_period', 9)
|
||
|
|
),
|
||
|
|
'bollinger': strategy_config.get('period', 20),
|
||
|
|
'stochastic': strategy_config.get('k_period', 14)
|
||
|
|
}
|
||
|
|
|
||
|
|
# Determine strategy type and required warm-up
|
||
|
|
if 'ema' in strategy_name.lower():
|
||
|
|
warmup = max(
|
||
|
|
strategy_config.get('fast_period', 12),
|
||
|
|
strategy_config.get('slow_period', 26)
|
||
|
|
)
|
||
|
|
elif 'macd' in strategy_name.lower():
|
||
|
|
warmup = max(
|
||
|
|
strategy_config.get('slow_period', 26),
|
||
|
|
strategy_config.get('signal_period', 9)
|
||
|
|
) + 10 # Additional buffer for MACD convergence
|
||
|
|
elif 'rsi' in strategy_name.lower():
|
||
|
|
warmup = strategy_config.get('period', 14) + 5 # Additional buffer for RSI stabilization
|
||
|
|
else:
|
||
|
|
# Generic estimation based on common indicators
|
||
|
|
warmup = 30 # Conservative default
|
||
|
|
|
||
|
|
max_warmup = max(max_warmup, warmup)
|
||
|
|
|
||
|
|
# Add safety buffer
|
||
|
|
max_warmup += 10
|
||
|
|
|
||
|
|
self.logger.debug(f"Calculated warm-up period: {max_warmup} periods")
|
||
|
|
return max_warmup
|
||
|
|
|
||
|
|
def process_large_dataset_streaming_with_warmup(
|
||
|
|
self,
|
||
|
|
strategy_configs: List[Dict[str, Any]],
|
||
|
|
symbols: List[str],
|
||
|
|
timeframe: str,
|
||
|
|
total_days_back: int,
|
||
|
|
exchange: str = "okx"
|
||
|
|
) -> Iterator[Dict[str, List[StrategyResult]]]:
|
||
|
|
"""
|
||
|
|
Process large datasets using streaming approach with proper warm-up period handling.
|
||
|
|
|
||
|
|
This method ensures indicator continuity across chunks by including warm-up data
|
||
|
|
from previous chunks and trimming overlapping results.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
strategy_configs: List of strategy configurations
|
||
|
|
symbols: List of trading symbols to process
|
||
|
|
timeframe: Timeframe for processing
|
||
|
|
total_days_back: Total number of days to process
|
||
|
|
exchange: Exchange name
|
||
|
|
|
||
|
|
Yields:
|
||
|
|
Dictionary chunks mapping strategy names to their results (with overlaps removed)
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
chunk_size = self.config.chunk_size_days
|
||
|
|
warmup_days = self._calculate_warmup_period(strategy_configs)
|
||
|
|
|
||
|
|
# Adjust chunk size if it's too small relative to warm-up
|
||
|
|
if chunk_size <= warmup_days:
|
||
|
|
adjusted_chunk_size = warmup_days * 2
|
||
|
|
self.logger.warning(f"Chunk size ({chunk_size}) too small for warm-up ({warmup_days}). "
|
||
|
|
f"Adjusting to {adjusted_chunk_size} days")
|
||
|
|
chunk_size = adjusted_chunk_size
|
||
|
|
|
||
|
|
total_chunks = (total_days_back + chunk_size - 1) // chunk_size
|
||
|
|
|
||
|
|
self.logger.info(f"BacktestingBatchProcessor: Starting streaming with warm-up processing of "
|
||
|
|
f"{total_days_back} days in {total_chunks} chunks (warm-up: {warmup_days} days)")
|
||
|
|
|
||
|
|
for chunk_index in range(total_chunks):
|
||
|
|
# Calculate date range for this chunk
|
||
|
|
chunk_start_days = chunk_index * chunk_size
|
||
|
|
chunk_end_days = min((chunk_index + 1) * chunk_size, total_days_back)
|
||
|
|
|
||
|
|
# Include warm-up data for chunks after the first
|
||
|
|
if chunk_index == 0:
|
||
|
|
# First chunk: no warm-up available, process as-is
|
||
|
|
processing_start_days = chunk_start_days
|
||
|
|
processing_days = chunk_end_days - chunk_start_days
|
||
|
|
trim_warmup = False
|
||
|
|
else:
|
||
|
|
# Subsequent chunks: include warm-up from previous data
|
||
|
|
processing_start_days = max(0, chunk_start_days - warmup_days)
|
||
|
|
processing_days = chunk_end_days - processing_start_days
|
||
|
|
trim_warmup = True
|
||
|
|
|
||
|
|
self.logger.info(f"Processing chunk {chunk_index + 1}/{total_chunks}: "
|
||
|
|
f"target days {chunk_start_days}-{chunk_end_days}, "
|
||
|
|
f"processing days {processing_start_days}-{chunk_end_days} "
|
||
|
|
f"(warm-up: {warmup_days if trim_warmup else 0})")
|
||
|
|
|
||
|
|
# Memory check before processing chunk
|
||
|
|
if self.config.enable_memory_monitoring:
|
||
|
|
self._check_memory_usage()
|
||
|
|
|
||
|
|
# Process chunk with warm-up data
|
||
|
|
chunk_results = self.process_strategies_parallel(
|
||
|
|
strategy_configs=strategy_configs,
|
||
|
|
symbols=symbols,
|
||
|
|
timeframe=timeframe,
|
||
|
|
days_back=processing_days,
|
||
|
|
exchange=exchange
|
||
|
|
)
|
||
|
|
|
||
|
|
# Trim warm-up period from results to avoid overlaps
|
||
|
|
if trim_warmup:
|
||
|
|
chunk_results = self._trim_warmup_from_results(
|
||
|
|
chunk_results, warmup_days, chunk_start_days, chunk_end_days
|
||
|
|
)
|
||
|
|
|
||
|
|
# Yield processed chunk results
|
||
|
|
yield chunk_results
|
||
|
|
|
||
|
|
# Force cleanup after each chunk to manage memory
|
||
|
|
self._cleanup_memory()
|
||
|
|
|
||
|
|
# Report progress
|
||
|
|
progress = ((chunk_index + 1) / total_chunks) * 100
|
||
|
|
self.logger.info(f"Streaming progress: {progress:.1f}% ({chunk_index + 1}/{total_chunks} chunks)")
|
||
|
|
|
||
|
|
self.logger.info("BacktestingBatchProcessor: Completed streaming processing with warm-up")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error in streaming processing with warm-up: {e}")
|
||
|
|
self._processing_stats['errors_count'] += 1
|
||
|
|
|
||
|
|
def _trim_warmup_from_results(
|
||
|
|
self,
|
||
|
|
chunk_results: Dict[str, List[StrategyResult]],
|
||
|
|
warmup_days: int,
|
||
|
|
target_start_days: int,
|
||
|
|
target_end_days: int
|
||
|
|
) -> Dict[str, List[StrategyResult]]:
|
||
|
|
"""
|
||
|
|
Trim warm-up period from chunk results to avoid overlapping data.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
chunk_results: Results from chunk processing (including warm-up)
|
||
|
|
warmup_days: Number of warm-up days to trim
|
||
|
|
target_start_days: Target start day for this chunk
|
||
|
|
target_end_days: Target end day for this chunk
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Trimmed results containing only the target date range
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
from datetime import datetime, timedelta
|
||
|
|
|
||
|
|
trimmed_results = {}
|
||
|
|
|
||
|
|
# Calculate cutoff timestamp (approximate, since we don't have exact start date)
|
||
|
|
# This is a simplified approach - in production, you'd use actual timestamps
|
||
|
|
|
||
|
|
for strategy_name, strategy_results in chunk_results.items():
|
||
|
|
if not strategy_results:
|
||
|
|
trimmed_results[strategy_name] = []
|
||
|
|
continue
|
||
|
|
|
||
|
|
# Sort results by timestamp to identify warm-up period
|
||
|
|
sorted_results = sorted(strategy_results, key=lambda r: r.timestamp)
|
||
|
|
|
||
|
|
# Simple approach: remove first portion equivalent to warm-up days
|
||
|
|
# This assumes roughly uniform distribution of signals
|
||
|
|
total_results = len(sorted_results)
|
||
|
|
if total_results > 0:
|
||
|
|
# Estimate warm-up portion based on proportion
|
||
|
|
processing_days = target_end_days - max(0, target_start_days - warmup_days)
|
||
|
|
target_days = target_end_days - target_start_days
|
||
|
|
|
||
|
|
if processing_days > target_days:
|
||
|
|
warmup_proportion = warmup_days / processing_days
|
||
|
|
warmup_count = int(total_results * warmup_proportion)
|
||
|
|
|
||
|
|
# Keep results after warm-up period
|
||
|
|
trimmed_results[strategy_name] = sorted_results[warmup_count:]
|
||
|
|
|
||
|
|
self.logger.debug(f"Strategy {strategy_name}: trimmed {warmup_count}/{total_results} warm-up results")
|
||
|
|
else:
|
||
|
|
# No trimming needed
|
||
|
|
trimmed_results[strategy_name] = sorted_results
|
||
|
|
else:
|
||
|
|
trimmed_results[strategy_name] = []
|
||
|
|
|
||
|
|
return trimmed_results
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
self.logger.error(f"Error trimming warm-up from results: {e}")
|
||
|
|
return chunk_results # Return original results if trimming fails
|