# cycles/backtest_runner.py

import concurrent.futures
import logging
from typing import List, Tuple, Dict, Any, Optional

import pandas as pd

from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.utils.progress_manager import ProgressManager
from result_processor import ResultProcessor
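

# Top-level rather than a BacktestRunner method so ProcessPoolExecutor can
# pickle it when dispatching tasks to worker processes.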
def _process_single_task_static(
    task: Tuple[str, str, pd.DataFrame, float, float],
    progress_callback=None
) -> Tuple[List[Dict], List[Dict]]:
    """
    Static version of _process_single_task for use with ProcessPoolExecutor.

    Args:
        task: Tuple of (task_id, timeframe, data_1min, stop_loss_pct, initial_usd)
        progress_callback: Optional progress callback function

    Returns:
        Tuple of (results, trades)
    """
    task_id, timeframe, data_1min, stop_loss_pct, initial_usd = task
    try:
        if timeframe in ("1T", "1min"):
            df = data_1min.copy()
        else:
            df = _resample_data_static(data_1min, timeframe)

        # Create required components for processing; imports are repeated here
        # because this function runs in a fresh subprocess.
        from cycles.utils.storage import Storage
        from result_processor import ResultProcessor

        # Create storage with default paths (for the subprocess)
        storage = Storage()
        result_processor = ResultProcessor(storage)

        results, trades = result_processor.process_timeframe_results(
            data_1min,
            df,
            [stop_loss_pct],
            timeframe,
            initial_usd,
            progress_callback=progress_callback
        )
        return results, trades
    except Exception as e:
        error_msg = f"Failed to process {timeframe} with stop loss {stop_loss_pct}: {e}"
        raise RuntimeError(error_msg) from e


def _resample_data_static(data_1min: pd.DataFrame, timeframe: str) -> pd.DataFrame:
    """
    Static function to resample 1-minute data to the specified timeframe.

    Args:
        data_1min: 1-minute data DataFrame
        timeframe: Target timeframe string

    Returns:
        Resampled DataFrame
    """
    try:
        agg_dict = {
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        }
        if 'predicted_close_price' in data_1min.columns:
            agg_dict['predicted_close_price'] = 'last'
        resampled = data_1min.resample(timeframe).agg(agg_dict).dropna()
        return resampled.reset_index()
    except Exception as e:
        error_msg = f"Failed to resample data to {timeframe}: {e}"
        raise ValueError(error_msg) from e


class BacktestRunner:
    """Handles the execution of backtests across multiple timeframes and parameters."""

    def __init__(
        self,
        storage: Storage,
        system_utils: SystemUtils,
        result_processor: ResultProcessor,
        logging_instance: Optional[logging.Logger] = None,
        show_progress: bool = True
    ):
        """
        Initialize the backtest runner.

        Args:
            storage: Storage instance for data operations
            system_utils: System utilities for resource management
            result_processor: Result processor for handling outputs
            logging_instance: Optional logging instance
            show_progress: Whether to show visual progress bars
        """
        self.storage = storage
        self.system_utils = system_utils
        self.result_processor = result_processor
        self.logging = logging_instance
        self.show_progress = show_progress
        self.progress_manager = ProgressManager() if show_progress else None

    def run_backtests(
        self,
        data_1min: pd.DataFrame,
        timeframes: List[str],
        stop_loss_pcts: List[float],
        initial_usd: float,
        debug: bool = False
    ) -> Tuple[List[Dict], List[Dict]]:
        """
        Run backtests across all timeframe and stop-loss combinations.

        Args:
            data_1min: 1-minute data DataFrame
            timeframes: List of timeframe strings (e.g., ['1D', '6h'])
            stop_loss_pcts: List of stop loss percentages
            initial_usd: Initial USD amount
            debug: Whether to enable debug mode (runs tasks sequentially)

        Returns:
            Tuple of (all_results, all_trades)
        """
        # Create tasks for all combinations
        tasks = self._create_tasks(timeframes, stop_loss_pcts, data_1min, initial_usd)
        if self.logging:
            self.logging.info(f"Starting {len(tasks)} backtest tasks")
        if debug:
            return self._run_sequential(tasks)
        return self._run_parallel(tasks)

    def _create_tasks(
        self,
        timeframes: List[str],
        stop_loss_pcts: List[float],
        data_1min: pd.DataFrame,
        initial_usd: float
    ) -> List[Tuple]:
        """Create one task tuple per (timeframe, stop-loss) combination."""
        tasks = []
        for timeframe in timeframes:
            for stop_loss_pct in stop_loss_pcts:
                task_id = f"{timeframe}_{stop_loss_pct}"
                task = (task_id, timeframe, data_1min, stop_loss_pct, initial_usd)
                tasks.append(task)
        return tasks

    def _run_sequential(self, tasks: List[Tuple]) -> Tuple[List[Dict], List[Dict]]:
        """Run tasks sequentially (for debug mode)."""
        # Initialize progress tracking if enabled
        if self.progress_manager:
            for task in tasks:
                task_id, timeframe, data_1min, stop_loss_pct, initial_usd = task
                # Calculate the actual DataFrame size that will be processed
                if timeframe in ("1T", "1min"):
                    actual_df_size = len(data_1min)
                else:
                    # Resample once up front to get the true row count
                    temp_df = self._resample_data(data_1min, timeframe)
                    actual_df_size = len(temp_df)
                task_name = f"{timeframe} SL:{stop_loss_pct:.0%}"
                self.progress_manager.start_task(task_id, task_name, actual_df_size)
            self.progress_manager.start_display()

        all_results = []
        all_trades = []
        try:
            for task in tasks:
                try:
                    # Get the progress callback for this task if available
                    progress_callback = None
                    if self.progress_manager:
                        progress_callback = self.progress_manager.get_task_progress_callback(task[0])
                    results, trades = self._process_single_task(task, progress_callback)
                    if results:
                        all_results.extend(results)
                    if trades:
                        all_trades.extend(trades)
                    # Mark task as completed
                    if self.progress_manager:
                        self.progress_manager.complete_task(task[0])
                except Exception as e:
                    error_msg = f"Error processing task {task[1]} with stop loss {task[3]}: {e}"
                    if self.logging:
                        self.logging.error(error_msg)
                    raise RuntimeError(error_msg) from e
        finally:
            # Stop the progress display
            if self.progress_manager:
                self.progress_manager.stop_display()
        return all_results, all_trades

    def _run_parallel(self, tasks: List[Tuple]) -> Tuple[List[Dict], List[Dict]]:
        """Run tasks in parallel using ProcessPoolExecutor."""
        workers = self.system_utils.get_optimal_workers()
        if self.logging:
            self.logging.info(f"Running {len(tasks)} tasks with {workers} workers")

        # OPTIMIZATION: progress tracking is disabled during parallel execution
        # because it adds significant overhead under multiprocessing.
        if self.progress_manager and self.logging:
            self.logging.info("Progress tracking disabled for parallel execution (performance optimization)")

        all_results = []
        all_trades = []
        completed_tasks = 0
        try:
            with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
                future_to_task = {
                    executor.submit(_process_single_task_static, task): task
                    for task in tasks
                }
                for future in concurrent.futures.as_completed(future_to_task):
                    task = future_to_task[future]
                    try:
                        results, trades = future.result()
                        if results:
                            all_results.extend(results)
                        if trades:
                            all_trades.extend(trades)
                        completed_tasks += 1
                        if self.logging:
                            self.logging.info(f"Completed task {task[0]} ({completed_tasks}/{len(tasks)})")
                    except Exception as e:
                        # Fail fast: the first failed task aborts the whole run
                        error_msg = f"Task {task[1]} with stop loss {task[3]} failed: {e}"
                        if self.logging:
                            self.logging.error(error_msg)
                        raise RuntimeError(error_msg) from e
        except RuntimeError:
            # Task failures are already wrapped above; avoid double-wrapping
            raise
        except Exception as e:
            error_msg = f"Parallel execution failed: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e
        finally:
            # Stop the progress display if one was started
            if self.progress_manager:
                self.progress_manager.stop_display()

        if self.logging:
            self.logging.info(f"All {len(tasks)} tasks completed successfully")
        return all_results, all_trades

    def _process_single_task(
        self,
        task: Tuple[str, str, pd.DataFrame, float, float],
        progress_callback=None
    ) -> Tuple[List[Dict], List[Dict]]:
        """
        Process a single backtest task.

        Args:
            task: Tuple of (task_id, timeframe, data_1min, stop_loss_pct, initial_usd)
            progress_callback: Optional progress callback function

        Returns:
            Tuple of (results, trades)
        """
        task_id, timeframe, data_1min, stop_loss_pct, initial_usd = task
        try:
            if timeframe in ("1T", "1min"):
                df = data_1min.copy()
            else:
                df = self._resample_data(data_1min, timeframe)
            results, trades = self.result_processor.process_timeframe_results(
                data_1min,
                df,
                [stop_loss_pct],
                timeframe,
                initial_usd,
                progress_callback=progress_callback
            )
            # OPTIMIZATION: Skip individual trade file saving during parallel
            # execution; trade files will be saved in batch at the end.
            # if trades:
            #     self.result_processor.save_trade_file(trades, timeframe, stop_loss_pct)
            if self.logging:
                self.logging.info(f"Completed task {task_id}: {len(results)} results, {len(trades)} trades")
            return results, trades
        except Exception as e:
            error_msg = f"Failed to process {timeframe} with stop loss {stop_loss_pct}: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e

    def _resample_data(self, data_1min: pd.DataFrame, timeframe: str) -> pd.DataFrame:
        """
        Resample 1-minute data to the specified timeframe.

        Mirrors _resample_data_static, but logs failures through the instance
        logger.

        Args:
            data_1min: 1-minute data DataFrame
            timeframe: Target timeframe string

        Returns:
            Resampled DataFrame
        """
        try:
            agg_dict = {
                'open': 'first',
                'high': 'max',
                'low': 'min',
                'close': 'last',
                'volume': 'sum'
            }
            if 'predicted_close_price' in data_1min.columns:
                agg_dict['predicted_close_price'] = 'last'
            resampled = data_1min.resample(timeframe).agg(agg_dict).dropna()
            return resampled.reset_index()
        except Exception as e:
            error_msg = f"Failed to resample data to {timeframe}: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise ValueError(error_msg) from e

    def _get_timeframe_factor(self, timeframe: str) -> int:
        """
        Get the factor by which the row count shrinks when resampling to timeframe.

        Args:
            timeframe: Target timeframe string (e.g., '1h', '4h', '1D')

        Returns:
            Factor for estimating data size after resampling
        """
        timeframe_factors = {
            '1T': 1, '1min': 1,
            '5T': 5, '5min': 5,
            '15T': 15, '15min': 15,
            '30T': 30, '30min': 30,
            '1h': 60, '1H': 60,
            '2h': 120, '2H': 120,
            '4h': 240, '4H': 240,
            '6h': 360, '6H': 360,
            '8h': 480, '8H': 480,
            '12h': 720, '12H': 720,
            '1D': 1440, '1d': 1440,
            '2D': 2880, '2d': 2880,
            '3D': 4320, '3d': 4320,
            '1W': 10080, '1w': 10080
        }
        return timeframe_factors.get(timeframe, 60)  # Default to 1 hour if unknown

    def load_data(self, filename: str, start_date: str, stop_date: str) -> pd.DataFrame:
        """
        Load and validate data for backtesting.

        Args:
            filename: Name of the data file
            start_date: Start date string
            stop_date: Stop date string

        Returns:
            Loaded and validated DataFrame

        Raises:
            RuntimeError: If loading fails or the data is empty or invalid
        """
        try:
            data = self.storage.load_data(filename, start_date, stop_date)
            if data.empty:
                raise ValueError(f"No data loaded for period {start_date} to {stop_date}")
            required_columns = ['open', 'high', 'low', 'close', 'volume']
            if 'predicted_close_price' in data.columns:
                required_columns.append('predicted_close_price')
            missing_columns = [col for col in required_columns if col not in data.columns]
            if missing_columns:
                raise ValueError(f"Missing required columns: {missing_columns}")
            if self.logging:
                self.logging.info(f"Loaded {len(data)} rows of data from {filename}")
            return data
        except Exception as e:
            error_msg = f"Failed to load data from {filename}: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e

    def validate_inputs(
        self,
        timeframes: List[str],
        stop_loss_pcts: List[float],
        initial_usd: float
    ) -> None:
        """
        Validate backtest input parameters.

        Args:
            timeframes: List of timeframe strings
            stop_loss_pcts: List of stop loss percentages
            initial_usd: Initial USD amount

        Raises:
            ValueError: If any input is invalid
        """
        if not timeframes:
            raise ValueError("At least one timeframe must be specified")
        if not stop_loss_pcts:
            raise ValueError("At least one stop loss percentage must be specified")
        for pct in stop_loss_pcts:
            if not 0 < pct < 1:
                raise ValueError(f"Stop loss percentage must be between 0 and 1, got: {pct}")
        if initial_usd <= 0:
            raise ValueError("Initial USD must be positive")
        if self.logging:
            self.logging.info("Input validation completed successfully")