"""
Walk-Forward Analysis optimizer for strategy parameter optimization.

Implements expanding window walk-forward analysis with train/test splits.
"""
from collections.abc import Iterator

import numpy as np
import pandas as pd
import vectorbt as vbt

from engine.logging_config import get_logger

logger = get_logger(__name__)


def create_rolling_windows(
    index: pd.Index, n_windows: int, train_split: float = 0.7
) -> Iterator[tuple[np.ndarray, np.ndarray]]:
    """
    Create expanding-window train/test splits over ``index``.

    The index is cut into ``n_windows + 1`` contiguous, nearly equal chunks,
    using the same chunk sizes as ``np.array_split``: the first
    ``len(index) % (n_windows + 1)`` chunks get one extra element. Window
    ``i`` trains on chunks ``0..i`` and tests on chunk ``i + 1``.

    Args:
        index: DataFrame index to split
        n_windows: Number of walk-forward windows (must be >= 0)
        train_split: Unused, kept for API compatibility

    Yields:
        Tuples of (train_idx, test_idx) numpy arrays of index labels. Both are
        read-only views into a single copy of ``index``. If ``index`` has fewer
        than ``n_windows + 1`` labels, some slices are empty.

    Raises:
        ValueError: On first iteration, if ``n_windows`` is negative.
    """
    n_chunks = n_windows + 1
    if n_chunks < 1:
        raise ValueError(f"n_windows must be >= 0, got {n_windows}")

    # One copy of the labels. Every window is a slice (view) of it, which
    # avoids re-concatenating the growing training prefix on each step. The
    # copy is marked read-only so a caller cannot corrupt later windows.
    labels = np.array(index)
    labels.setflags(write=False)

    base, extra = divmod(len(labels), n_chunks)
    sizes = [base + 1] * extra + [base] * (n_chunks - extra)
    bounds = np.cumsum([0, *sizes])

    for i in range(n_windows):
        train_end, test_end = bounds[i + 1], bounds[i + 2]
        yield labels[:train_end], labels[train_end:test_end]


class WalkForwardOptimizer:
    """
    Walk-Forward Analysis optimizer for strategy backtesting.

    Optimizes strategy parameters on training windows and validates on
    out-of-sample test windows.
    """

    # Display names accepted for ``metric``, mapped to the vbt.Portfolio
    # method that computes them. Any other callable Portfolio method name
    # (e.g. 'sharpe_ratio') is also accepted as-is.
    _METRIC_METHODS = {
        'Sharpe Ratio': 'sharpe_ratio',
        'Sortino Ratio': 'sortino_ratio',
        'Calmar Ratio': 'calmar_ratio',
        'Omega Ratio': 'omega_ratio',
        # total_return() is a fraction rather than a percentage. That does
        # not change the ranking, only the scale of the reported score.
        'Total Return [%]': 'total_return',
    }

    # Score given to NaN/inf metric values so that such combinations rank
    # below any realistic valid score in the grid search.
    _INVALID_SCORE = -999.0

    def __init__(
        self,
        backtester,
        strategy,
        param_grid: dict,
        metric: str = 'Sharpe Ratio',
        fees: float = 0.001,
        freq: str = '1m'
    ):
        """
        Initialize the optimizer.

        Args:
            backtester: Backtester instance (stored only; not used by this class)
            strategy: Strategy instance to optimize. ``strategy.run(close,
                high=..., low=..., **params)`` must return ``(entries, exits)``.
            param_grid: Parameter grid for optimization. List/ndarray values are
                searched over; any other value is passed through unchanged.
            metric: Performance metric to maximize. Either a key of
                ``_METRIC_METHODS`` or a vbt.Portfolio method name. Unknown
                names fall back to Sharpe ratio, with a warning.
            fees: Transaction fees for simulation
            freq: Data frequency for portfolio simulation
        """
        self.bt = backtester
        self.strategy = strategy
        self.param_grid = param_grid
        self.metric = metric
        self.fees = fees
        self.freq = freq
        self._metric_method = self._resolve_metric(metric)

        # Separate grid params (lists) from fixed params (scalars)
        self.grid_keys = []
        self.fixed_params = {}
        for k, v in param_grid.items():
            if isinstance(v, (list, np.ndarray)):
                self.grid_keys.append(k)
            else:
                self.fixed_params[k] = v

    def _resolve_metric(self, metric: str) -> str:
        """Return the name of the vbt.Portfolio method computing ``metric``."""
        name = self._METRIC_METHODS.get(metric, metric)
        if name == 'sharpe_ratio' or callable(getattr(vbt.Portfolio, name, None)):
            return name
        logger.warning(
            "Unknown metric %r; falling back to 'Sharpe Ratio'", metric
        )
        return 'sharpe_ratio'

    def run(
        self,
        close_price: pd.Series,
        high: pd.Series | None = None,
        low: pd.Series | None = None,
        n_windows: int = 10
    ) -> tuple[pd.DataFrame, pd.Series | None]:
        """
        Execute walk-forward analysis.

        Args:
            close_price: Close price series
            high: High price series (optional)
            low: Low price series (optional)
            n_windows: Number of walk-forward windows

        Returns:
            Tuple of (results DataFrame, stitched equity curve). There is one
            results row per successfully processed window. The equity curve
            is None when no window succeeded.
        """
        results = []
        equity_curves = []

        logger.info(
            "Starting Walk-Forward Analysis with %d windows (Expanding Train)...",
            n_windows
        )

        splitter = create_rolling_windows(close_price.index, n_windows)
        for i, (train_idx, test_idx) in enumerate(splitter):
            logger.info("Processing Window %d/%d...", i + 1, n_windows)

            window_result = self._process_window(
                i, train_idx, test_idx, close_price, high, low
            )
            if window_result is not None:
                result_dict, eq_curve = window_result
                results.append(result_dict)
                equity_curves.append(eq_curve)

        stitched_series = self._stitch_equity_curves(equity_curves)
        return pd.DataFrame(results), stitched_series

    @staticmethod
    def _slice(
        idx: np.ndarray,
        close: pd.Series,
        high: pd.Series | None,
        low: pd.Series | None
    ) -> tuple[pd.Series, pd.Series | None, pd.Series | None]:
        """Select the ``idx`` labels from close and, when given, high/low."""
        return (
            close.loc[idx],
            high.loc[idx] if high is not None else None,
            low.loc[idx] if low is not None else None,
        )

    def _process_window(
        self,
        window_idx: int,
        train_idx: np.ndarray,
        test_idx: np.ndarray,
        close_price: pd.Series,
        high: pd.Series | None,
        low: pd.Series | None
    ) -> tuple[dict, pd.Series] | None:
        """
        Optimize on one training slice and evaluate on its test slice.

        Returns:
            ``(result row, test equity curve)``. Returns None if either slice
            is empty or if any step raises. Errors are logged, not propagated,
            so the remaining windows still run.
        """
        if len(train_idx) == 0 or len(test_idx) == 0:
            logger.warning(
                "Skipping window %d: empty train or test slice "
                "(too few bars for the requested number of windows)",
                window_idx + 1
            )
            return None

        try:
            # Train phase: find best parameters
            best_params, best_score = self._optimize_train(
                *self._slice(train_idx, close_price, high, low)
            )

            # Test phase: validate with best params
            test_params = {**self.fixed_params, **best_params}
            test_score, test_return, eq_curve = self._run_test(
                *self._slice(test_idx, close_price, high, low), test_params
            )
        except Exception as e:
            logger.error("Error in window %d: %s", window_idx + 1, e, exc_info=True)
            return None

        return {
            'window': window_idx + 1,
            'train_start': train_idx[0],
            'train_end': train_idx[-1],
            'test_start': test_idx[0],
            'test_end': test_idx[-1],
            'best_params': best_params,
            'train_score': best_score,
            'test_score': test_score,
            'test_return': test_return
        }, eq_curve

    def _simulate(self, close: pd.Series, entries, exits):
        """Simulate a vbt.Portfolio from entry/exit signals with configured fees/freq."""
        return vbt.Portfolio.from_signals(
            close, entries, exits, fees=self.fees, freq=self.freq
        )

    def _score(self, portfolio):
        """
        Evaluate the configured metric on ``portfolio``.

        The result is a Series indexed by column for a multi-column (grid)
        portfolio. For a single-column portfolio it is presumably a scalar,
        per vectorbt's reduction rules.
        """
        return getattr(portfolio, self._metric_method)()

    def _label_to_params(self, label, level_names: list) -> dict:
        """Map a winning column label back to ``{grid_key: value}``."""
        if not self.grid_keys:
            return {}
        values = label if isinstance(label, tuple) else (label,)
        # Prefer matching by level name. That stays correct even if the
        # strategy adds extra levels or orders them differently from the grid.
        if all(k in level_names for k in self.grid_keys):
            return {k: values[level_names.index(k)] for k in self.grid_keys}
        # Fallback: positional match between grid keys and label parts.
        if len(self.grid_keys) == 1:
            return {self.grid_keys[0]: label}
        return dict(zip(self.grid_keys, values))

    def _optimize_train(
        self,
        close: pd.Series,
        high: pd.Series | None,
        low: pd.Series | None
    ) -> tuple[dict, float]:
        """
        Run grid search on training data to find the best parameters.

        Returns:
            ``(best grid params, best score)``. The score is
            ``_INVALID_SCORE`` when no combination had a finite metric.
        """
        entries, exits = self.strategy.run(
            close, high=high, low=low, **self.param_grid
        )
        scores = self._score(self._simulate(close, entries, exits))

        if not isinstance(scores, pd.Series):
            # Scalar score: the portfolio has one column, i.e. one parameter
            # combination (assumes strategy.run emits one column per combo).
            score = float(scores)
            best_score = score if np.isfinite(score) else self._INVALID_SCORE
            best_params = {k: list(self.param_grid[k])[0] for k in self.grid_keys}
            return best_params, best_score

        # Infinite ratios (e.g. zero-volatility returns) are degenerate and
        # would otherwise beat every real candidate, so treat them like NaN.
        scores = scores.replace([np.inf, -np.inf], np.nan).fillna(self._INVALID_SCORE)
        best_label = scores.idxmax()
        best_score = scores.max()

        best_params = self._label_to_params(best_label, list(scores.index.names))
        return best_params, best_score

    def _run_test(
        self,
        close: pd.Series,
        high: pd.Series | None,
        low: pd.Series | None,
        params: dict
    ) -> tuple[float, float, pd.Series]:
        """Run test phase with given parameters: (metric score, total return, equity curve)."""
        entries, exits = self.strategy.run(
            close, high=high, low=low, **params
        )
        pf_test = self._simulate(close, entries, exits)
        return self._score(pf_test), pf_test.total_return(), pf_test.value()

    def _stitch_equity_curves(
        self,
        equity_curves: list[pd.Series]
    ) -> pd.Series | None:
        """
        Stitch per-window equity curves into one continuous series.

        Each curve after the first is rescaled so that its first value equals
        the previous (already stitched) curve's last value. NOTE: if a
        window's first value already differs from its starting cash (e.g.
        fees on a first-bar entry), that difference is discarded by the
        rebasing.
        """
        if not equity_curves:
            return None

        stitched = [equity_curves[0]]
        for curve in equity_curves[1:]:
            prev_end_val = stitched[-1].iloc[-1]
            stitched.append((curve / curve.iloc[0]) * prev_end_val)

        return pd.concat(stitched)