# Cycles/backtest_runner.py

import concurrent.futures
import logging
from typing import Dict, List, Optional, Tuple

import pandas as pd

from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
# Assumes ResultProcessor sits in the cycles package next to this module; the
# original bare `from result_processor import ...` only resolves when the
# package directory itself is on sys.path.
from cycles.result_processor import ResultProcessor


class BacktestRunner:
    """Handles the execution of backtests across multiple timeframes and parameters."""

    def __init__(
self,
storage: Storage,
system_utils: SystemUtils,
result_processor: ResultProcessor,
logging_instance: Optional[logging.Logger] = None
):
"""
Initialize backtest runner
Args:
storage: Storage instance for data operations
system_utils: System utilities for resource management
result_processor: Result processor for handling outputs
logging_instance: Optional logging instance
"""
self.storage = storage
self.system_utils = system_utils
self.result_processor = result_processor
self.logging = logging_instance

    def run_backtests(
self,
data_1min: pd.DataFrame,
timeframes: List[str],
stop_loss_pcts: List[float],
initial_usd: float,
debug: bool = False
) -> Tuple[List[Dict], List[Dict]]:
"""
Run backtests across all timeframe and stop loss combinations
Args:
data_1min: 1-minute data DataFrame
timeframes: List of timeframe strings (e.g., ['1D', '6h'])
stop_loss_pcts: List of stop loss percentages
initial_usd: Initial USD amount
debug: Whether to enable debug mode
Returns:
Tuple of (all_results, all_trades)
"""
# Create tasks for all combinations
tasks = self._create_tasks(timeframes, stop_loss_pcts, data_1min, initial_usd)
if debug:
return self._run_sequential(tasks, debug)
else:
return self._run_parallel(tasks, debug)
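
    # A minimal usage sketch (hypothetical wiring; constructing Storage,
    # SystemUtils, and ResultProcessor depends on the rest of the package,
    # and "btc_1min.csv" is a placeholder filename):
    #
    #     runner = BacktestRunner(storage, system_utils, result_processor)
    #     data = runner.load_data("btc_1min.csv", "2023-01-01", "2023-06-30")
    #     runner.validate_inputs(["1D", "6h"], [0.02, 0.05], initial_usd=10_000.0)
    #     results, trades = runner.run_backtests(
    #         data, ["1D", "6h"], [0.02, 0.05], initial_usd=10_000.0, debug=False
    #     )
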
def _create_tasks(
self,
timeframes: List[str],
stop_loss_pcts: List[float],
data_1min: pd.DataFrame,
initial_usd: float
) -> List[Tuple]:
"""Create task tuples for processing"""
tasks = []
for timeframe in timeframes:
for stop_loss_pct in stop_loss_pcts:
task = (timeframe, data_1min, stop_loss_pct, initial_usd)
tasks.append(task)
return tasks
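
    # Note: each task tuple produced above references the full 1-minute
    # DataFrame. Under the process pool, that DataFrame is pickled into a
    # worker once per submitted task, so serialization cost grows with
    # len(timeframes) * len(stop_loss_pcts).
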
def _run_sequential(self, tasks: List[Tuple], debug: bool) -> Tuple[List[Dict], List[Dict]]:
"""Run tasks sequentially (for debug mode)"""
all_results = []
all_trades = []
for task in tasks:
try:
results, trades = self._process_single_task(task, debug)
if results:
all_results.extend(results)
if trades:
all_trades.extend(trades)
except Exception as e:
error_msg = f"Error processing task {task[0]} with stop loss {task[2]}: {e}"
if self.logging:
self.logging.error(error_msg)
raise RuntimeError(error_msg) from e
return all_results, all_trades

    def _run_parallel(self, tasks: List[Tuple], debug: bool) -> Tuple[List[Dict], List[Dict]]:
"""Run tasks in parallel using ProcessPoolExecutor"""
workers = self.system_utils.get_optimal_workers()
if self.logging:
self.logging.info(f"Running {len(tasks)} tasks with {workers} workers")
all_results = []
all_trades = []
try:
            # NOTE: submitting a bound method pickles `self` (and its storage,
            # result processor, and logger) into each worker process, so those
            # dependencies must be picklable.
            with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
                # Submit one future per timeframe/stop-loss combination
future_to_task = {
executor.submit(self._process_single_task, task, debug): task
for task in tasks
}
# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
results, trades = future.result()
if results:
all_results.extend(results)
if trades:
all_trades.extend(trades)
except Exception as e:
error_msg = f"Task {task[0]} with stop loss {task[2]} failed: {e}"
if self.logging:
self.logging.error(error_msg)
raise RuntimeError(error_msg) from e
        except RuntimeError:
            # Task failures were already wrapped and logged in the loop above;
            # re-raise without double-wrapping.
            raise
        except Exception as e:
            error_msg = f"Parallel execution failed: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e
return all_results, all_trades
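
    # Note: a failing task raises out of the `with` block above, and the
    # executor's __exit__ calls shutdown(wait=True), so tasks that were
    # already submitted keep running to completion before the exception
    # reaches the caller.
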
def _process_single_task(
self,
task: Tuple[str, pd.DataFrame, float, float],
debug: bool = False
) -> Tuple[List[Dict], List[Dict]]:
"""
Process a single backtest task
Args:
task: Tuple of (timeframe, data_1min, stop_loss_pct, initial_usd)
debug: Whether to enable debug output
Returns:
Tuple of (results, trades)
"""
timeframe, data_1min, stop_loss_pct, initial_usd = task
try:
            # Resample unless the target timeframe is already 1 minute
            # ("1T" is the legacy pandas alias for "1min")
            if timeframe in ("1T", "1min"):
                df = data_1min.copy()
            else:
                df = self._resample_data(data_1min, timeframe)
# Process timeframe results
results, trades = self.result_processor.process_timeframe_results(
data_1min,
df,
[stop_loss_pct],
timeframe,
initial_usd,
debug
)
# Save individual trade files if trades exist
if trades:
self.result_processor.save_trade_file(trades, timeframe, stop_loss_pct)
return results, trades
except Exception as e:
error_msg = f"Failed to process {timeframe} with stop loss {stop_loss_pct}: {e}"
if self.logging:
self.logging.error(error_msg)
raise RuntimeError(error_msg) from e

    def _resample_data(self, data_1min: pd.DataFrame, timeframe: str) -> pd.DataFrame:
        """
        Resample 1-minute data to the specified timeframe.

        Args:
            data_1min: 1-minute data DataFrame (DatetimeIndex required)
            timeframe: Target timeframe string

        Returns:
            Resampled DataFrame with the timestamp index reset to a column
        """
        try:
            # Standard OHLCV aggregation: first/max/min/last for prices, sum
            # for volume; dropna() discards buckets with no source rows.
            resampled = data_1min.resample(timeframe).agg({
                'open': 'first',
                'high': 'max',
                'low': 'min',
                'close': 'last',
                'volume': 'sum'
            }).dropna()
            return resampled.reset_index()
except Exception as e:
error_msg = f"Failed to resample data to {timeframe}: {e}"
if self.logging:
self.logging.error(error_msg)
raise ValueError(error_msg) from e
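
    # For example (illustrative only): with a 1-minute DatetimeIndex,
    # _resample_data(df, "6h") buckets rows into six-hour bars where
    # open/high/low/close are the first/max/min/last 1-minute values and
    # volume is the bucket sum; empty buckets are dropped by dropna().
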
def load_data(self, filename: str, start_date: str, stop_date: str) -> pd.DataFrame:
"""
Load and validate data for backtesting
Args:
filename: Name of data file
start_date: Start date string
stop_date: Stop date string
Returns:
Loaded and validated DataFrame
Raises:
ValueError: If data is empty or invalid
"""
try:
data = self.storage.load_data(filename, start_date, stop_date)
if data.empty:
raise ValueError(f"No data loaded for period {start_date} to {stop_date}")
# Validate required columns
required_columns = ['open', 'high', 'low', 'close', 'volume']
missing_columns = [col for col in required_columns if col not in data.columns]
if missing_columns:
raise ValueError(f"Missing required columns: {missing_columns}")
if self.logging:
self.logging.info(f"Loaded {len(data)} rows of data from {filename}")
return data
        except ValueError:
            # Preserve the documented ValueError for empty or invalid data.
            raise
        except Exception as e:
            error_msg = f"Failed to load data from {filename}: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e

    def validate_inputs(
self,
timeframes: List[str],
stop_loss_pcts: List[float],
initial_usd: float
) -> None:
"""
Validate backtest input parameters
Args:
timeframes: List of timeframe strings
stop_loss_pcts: List of stop loss percentages
initial_usd: Initial USD amount
Raises:
ValueError: If any input is invalid
"""
# Validate timeframes
if not timeframes:
raise ValueError("At least one timeframe must be specified")
# Validate stop loss percentages
if not stop_loss_pcts:
raise ValueError("At least one stop loss percentage must be specified")
for pct in stop_loss_pcts:
if not 0 < pct < 1:
raise ValueError(f"Stop loss percentage must be between 0 and 1, got: {pct}")
# Validate initial USD
        if initial_usd <= 0:
            raise ValueError(f"Initial USD must be positive, got: {initial_usd}")
if self.logging:
self.logging.info("Input validation completed successfully")
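

if __name__ == "__main__":
    # Minimal smoke test of the resampling path only (a sketch, not part of
    # the original module). It sidesteps the Storage/SystemUtils/
    # ResultProcessor wiring by passing None, which _resample_data never
    # touches, and builds synthetic 1-minute OHLCV data.
    import numpy as np

    idx = pd.date_range("2023-01-01", periods=2 * 24 * 60, freq="1min")
    prices = 100 + np.cumsum(np.random.default_rng(0).normal(0, 0.1, len(idx)))
    data = pd.DataFrame(
        {
            "open": prices,
            "high": prices + 0.05,
            "low": prices - 0.05,
            "close": prices,
            "volume": 1.0,
        },
        index=idx,
    )

    runner = BacktestRunner(storage=None, system_utils=None, result_processor=None)  # type: ignore[arg-type]
    bars = runner._resample_data(data, "6h")
    print(bars.head())  # expect 8 six-hour bars spanning the two synthetic days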