import pandas as pd
import concurrent.futures
import logging
from typing import List, Tuple, Dict, Any, Optional

from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.utils.progress_manager import ProgressManager
from result_processor import ResultProcessor


def _process_single_task_static(
    task: Tuple[str, str, pd.DataFrame, float, float],
    progress_callback=None
) -> Tuple[List[Dict], List[Dict]]:
    """
    Static version of _process_single_task for use with ProcessPoolExecutor.

    Args:
        task: Tuple of (task_id, timeframe, data_1min, stop_loss_pct, initial_usd)
        progress_callback: Optional progress callback function

    Returns:
        Tuple of (results, trades)
    """
    task_id, timeframe, data_1min, stop_loss_pct, initial_usd = task

    try:
        if timeframe in ("1T", "1min"):
            df = data_1min.copy()
        else:
            df = _resample_data_static(data_1min, timeframe)

        # Imported here so the task is self-contained when it runs in a
        # worker process; only the task arguments are pickled across.
        from cycles.utils.storage import Storage
        from result_processor import ResultProcessor

        # Create storage with default paths (for the subprocess)
        storage = Storage()
        result_processor = ResultProcessor(storage)

        results, trades = result_processor.process_timeframe_results(
            data_1min,
            df,
            [stop_loss_pct],
            timeframe,
            initial_usd,
            progress_callback=progress_callback
        )

        return results, trades

    except Exception as e:
        error_msg = f"Failed to process {timeframe} with stop loss {stop_loss_pct}: {e}"
        raise RuntimeError(error_msg) from e
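
# Note: ProcessPoolExecutor can only run picklable, module-level callables, which
# is why this helper and _resample_data_static live at module scope rather than on
# BacktestRunner. An illustrative direct call (task_id, timeframe, 1-minute
# DataFrame, stop-loss fraction, starting capital):
#
#     results, trades = _process_single_task_static(
#         ("1D_0.05", "1D", data_1min, 0.05, 10_000.0)
#     )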


def _resample_data_static(data_1min: pd.DataFrame, timeframe: str) -> pd.DataFrame:
    """
    Static function to resample 1-minute data to the specified timeframe.

    Args:
        data_1min: 1-minute data DataFrame
        timeframe: Target timeframe string

    Returns:
        Resampled DataFrame
    """
    try:
        agg_dict = {
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        }

        if 'predicted_close_price' in data_1min.columns:
            agg_dict['predicted_close_price'] = 'last'

        resampled = data_1min.resample(timeframe).agg(agg_dict).dropna()

        return resampled.reset_index()

    except Exception as e:
        error_msg = f"Failed to resample data to {timeframe}: {e}"
        raise ValueError(error_msg) from e
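
# Example (illustrative): resampling three 1-minute bars to "3min" keeps the
# first open, the max high, the min low, the last close, and the summed volume:
#
#     idx = pd.date_range("2024-01-01", periods=3, freq="1min")
#     bars = pd.DataFrame({"open": [1, 2, 3], "high": [2, 3, 4], "low": [0, 1, 2],
#                          "close": [2, 3, 4], "volume": [10, 10, 10]}, index=idx)
#     _resample_data_static(bars, "3min")
#     # -> one row: open=1, high=4, low=0, close=4, volume=30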


class BacktestRunner:
    """Handles the execution of backtests across multiple timeframes and parameters"""

    def __init__(
        self,
        storage: Storage,
        system_utils: SystemUtils,
        result_processor: ResultProcessor,
        logging_instance: Optional[logging.Logger] = None,
        show_progress: bool = True
    ):
        """
        Initialize the backtest runner.

        Args:
            storage: Storage instance for data operations
            system_utils: System utilities for resource management
            result_processor: Result processor for handling outputs
            logging_instance: Optional logging instance
            show_progress: Whether to show visual progress bars
        """
        self.storage = storage
        self.system_utils = system_utils
        self.result_processor = result_processor
        self.logging = logging_instance
        self.show_progress = show_progress
        self.progress_manager = ProgressManager() if show_progress else None

    def run_backtests(
        self,
        data_1min: pd.DataFrame,
        timeframes: List[str],
        stop_loss_pcts: List[float],
        initial_usd: float,
        debug: bool = False
    ) -> Tuple[List[Dict], List[Dict]]:
        """
        Run backtests across all timeframe and stop-loss combinations.

        Args:
            data_1min: 1-minute data DataFrame
            timeframes: List of timeframe strings (e.g., ['1D', '6h'])
            stop_loss_pcts: List of stop loss percentages
            initial_usd: Initial USD amount
            debug: If True, run tasks sequentially for easier debugging

        Returns:
            Tuple of (all_results, all_trades)
        """
        # Create tasks for all combinations
        tasks = self._create_tasks(timeframes, stop_loss_pcts, data_1min, initial_usd)

        if self.logging:
            self.logging.info(f"Starting {len(tasks)} backtest tasks")

        if debug:
            return self._run_sequential(tasks)
        else:
            return self._run_parallel(tasks)

    def _create_tasks(
        self,
        timeframes: List[str],
        stop_loss_pcts: List[float],
        data_1min: pd.DataFrame,
        initial_usd: float
    ) -> List[Tuple]:
        """Create one task tuple per (timeframe, stop loss) combination"""
        tasks = []
        for timeframe in timeframes:
            for stop_loss_pct in stop_loss_pcts:
                task_id = f"{timeframe}_{stop_loss_pct}"
                task = (task_id, timeframe, data_1min, stop_loss_pct, initial_usd)
                tasks.append(task)
        return tasks
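
    # Example (illustrative): timeframes=["1D", "6h"] with stop_loss_pcts=[0.02, 0.05]
    # produce four tasks whose IDs are "1D_0.02", "1D_0.05", "6h_0.02", and "6h_0.05".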

    def _run_sequential(self, tasks: List[Tuple]) -> Tuple[List[Dict], List[Dict]]:
        """Run tasks sequentially (for debug mode)"""
        # Initialize progress tracking if enabled
        if self.progress_manager:
            for task in tasks:
                task_id, timeframe, data_1min, stop_loss_pct, initial_usd = task

                # Calculate the actual DataFrame size that will be processed
                if timeframe in ("1T", "1min"):
                    actual_df_size = len(data_1min)
                else:
                    # Resample once up front so the progress bar is sized accurately
                    temp_df = self._resample_data(data_1min, timeframe)
                    actual_df_size = len(temp_df)

                task_name = f"{timeframe} SL:{stop_loss_pct:.0%}"
                self.progress_manager.start_task(task_id, task_name, actual_df_size)

            self.progress_manager.start_display()

        all_results = []
        all_trades = []

        try:
            for task in tasks:
                try:
                    # Get the progress callback for this task if available
                    progress_callback = None
                    if self.progress_manager:
                        progress_callback = self.progress_manager.get_task_progress_callback(task[0])

                    results, trades = self._process_single_task(task, progress_callback)

                    if results:
                        all_results.extend(results)
                    if trades:
                        all_trades.extend(trades)

                    # Mark the task as completed
                    if self.progress_manager:
                        self.progress_manager.complete_task(task[0])

                except Exception as e:
                    error_msg = f"Error processing task {task[1]} with stop loss {task[3]}: {e}"

                    if self.logging:
                        self.logging.error(error_msg)

                    raise RuntimeError(error_msg) from e
        finally:
            # Stop the progress display
            if self.progress_manager:
                self.progress_manager.stop_display()

        return all_results, all_trades

    def _run_parallel(self, tasks: List[Tuple]) -> Tuple[List[Dict], List[Dict]]:
        """Run tasks in parallel using ProcessPoolExecutor"""
        workers = self.system_utils.get_optimal_workers()

        if self.logging:
            self.logging.info(f"Running {len(tasks)} tasks with {workers} workers")

        # OPTIMIZATION: Disable progress tracking for parallel execution;
        # it adds significant overhead in multiprocessing
        if self.progress_manager and self.logging:
            self.logging.info("Progress tracking disabled for parallel execution (performance optimization)")

        all_results = []
        all_trades = []
        completed_tasks = 0

        try:
            with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
                future_to_task = {
                    executor.submit(_process_single_task_static, task): task
                    for task in tasks
                }

                for future in concurrent.futures.as_completed(future_to_task):
                    task = future_to_task[future]
                    try:
                        results, trades = future.result()
                        if results:
                            all_results.extend(results)
                        if trades:
                            all_trades.extend(trades)

                        completed_tasks += 1

                        if self.logging:
                            self.logging.info(f"Completed task {task[0]} ({completed_tasks}/{len(tasks)})")

                    except Exception as e:
                        error_msg = f"Task {task[1]} with stop loss {task[3]} failed: {e}"
                        if self.logging:
                            self.logging.error(error_msg)
                        raise RuntimeError(error_msg) from e

        except Exception as e:
            error_msg = f"Parallel execution failed: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e

        finally:
            # Stop the progress display
            if self.progress_manager:
                self.progress_manager.stop_display()

        if self.logging:
            self.logging.info(f"All {len(tasks)} tasks completed successfully")

        return all_results, all_trades
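
    # Note: as_completed() yields futures in completion order, not submission
    # order, so aggregated results are not guaranteed to follow the task order.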

    def _process_single_task(
        self,
        task: Tuple[str, str, pd.DataFrame, float, float],
        progress_callback=None
    ) -> Tuple[List[Dict], List[Dict]]:
        """
        Process a single backtest task.

        Args:
            task: Tuple of (task_id, timeframe, data_1min, stop_loss_pct, initial_usd)
            progress_callback: Optional progress callback function

        Returns:
            Tuple of (results, trades)
        """
        task_id, timeframe, data_1min, stop_loss_pct, initial_usd = task

        try:
            if timeframe in ("1T", "1min"):
                df = data_1min.copy()
            else:
                df = self._resample_data(data_1min, timeframe)

            results, trades = self.result_processor.process_timeframe_results(
                data_1min,
                df,
                [stop_loss_pct],
                timeframe,
                initial_usd,
                progress_callback=progress_callback
            )

            # OPTIMIZATION: Skip per-task trade file saving here;
            # trade files are saved in batch at the end of the run
            # if trades:
            #     self.result_processor.save_trade_file(trades, timeframe, stop_loss_pct)

            if self.logging:
                self.logging.info(f"Completed task {task_id}: {len(results)} results, {len(trades)} trades")

            return results, trades

        except Exception as e:
            error_msg = f"Failed to process {timeframe} with stop loss {stop_loss_pct}: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e

    def _resample_data(self, data_1min: pd.DataFrame, timeframe: str) -> pd.DataFrame:
        """
        Resample 1-minute data to the specified timeframe.

        Args:
            data_1min: 1-minute data DataFrame
            timeframe: Target timeframe string

        Returns:
            Resampled DataFrame
        """
        try:
            agg_dict = {
                'open': 'first',
                'high': 'max',
                'low': 'min',
                'close': 'last',
                'volume': 'sum'
            }

            if 'predicted_close_price' in data_1min.columns:
                agg_dict['predicted_close_price'] = 'last'

            resampled = data_1min.resample(timeframe).agg(agg_dict).dropna()

            return resampled.reset_index()

        except Exception as e:
            error_msg = f"Failed to resample data to {timeframe}: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise ValueError(error_msg) from e

    def _get_timeframe_factor(self, timeframe: str) -> int:
        """
        Get the factor by which data is reduced when resampling to a timeframe.

        Args:
            timeframe: Target timeframe string (e.g., '1h', '4h', '1D')

        Returns:
            Factor for estimating data size after resampling
        """
        timeframe_factors = {
            '1T': 1, '1min': 1,
            '5T': 5, '5min': 5,
            '15T': 15, '15min': 15,
            '30T': 30, '30min': 30,
            '1h': 60, '1H': 60,
            '2h': 120, '2H': 120,
            '4h': 240, '4H': 240,
            '6h': 360, '6H': 360,
            '8h': 480, '8H': 480,
            '12h': 720, '12H': 720,
            '1D': 1440, '1d': 1440,
            '2D': 2880, '2d': 2880,
            '3D': 4320, '3d': 4320,
            '1W': 10080, '1w': 10080
        }
        return timeframe_factors.get(timeframe, 60)  # Default to 1 hour if unknown
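
    # Example (illustrative): a year of 1-minute bars (~525,600 rows) resampled
    # to '4h' (factor 240) is estimated at 525_600 / 240 = 2_190 rows.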

    def load_data(self, filename: str, start_date: str, stop_date: str) -> pd.DataFrame:
        """
        Load and validate data for backtesting.

        Args:
            filename: Name of data file
            start_date: Start date string
            stop_date: Stop date string

        Returns:
            Loaded and validated DataFrame

        Raises:
            RuntimeError: If the data cannot be loaded, is empty, or lacks
                required columns (validation errors are wrapped)
        """
        try:
            data = self.storage.load_data(filename, start_date, stop_date)

            if data.empty:
                raise ValueError(f"No data loaded for period {start_date} to {stop_date}")

            required_columns = ['open', 'high', 'low', 'close', 'volume']

            # 'predicted_close_price' is optional; include it in the check
            # only when the loaded data carries it
            if 'predicted_close_price' in data.columns:
                required_columns.append('predicted_close_price')

            missing_columns = [col for col in required_columns if col not in data.columns]

            if missing_columns:
                raise ValueError(f"Missing required columns: {missing_columns}")

            if self.logging:
                self.logging.info(f"Loaded {len(data)} rows of data from {filename}")

            return data

        except Exception as e:
            error_msg = f"Failed to load data from {filename}: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e

    def validate_inputs(
        self,
        timeframes: List[str],
        stop_loss_pcts: List[float],
        initial_usd: float
    ) -> None:
        """
        Validate backtest input parameters.

        Args:
            timeframes: List of timeframe strings
            stop_loss_pcts: List of stop loss percentages
            initial_usd: Initial USD amount

        Raises:
            ValueError: If any input is invalid
        """
        if not timeframes:
            raise ValueError("At least one timeframe must be specified")

        if not stop_loss_pcts:
            raise ValueError("At least one stop loss percentage must be specified")

        for pct in stop_loss_pcts:
            if not 0 < pct < 1:
                raise ValueError(f"Stop loss percentage must be between 0 and 1, got: {pct}")

        if initial_usd <= 0:
            raise ValueError("Initial USD must be positive")

        if self.logging:
            self.logging.info("Input validation completed successfully")