"""Backtest driver: runs strategy-manager based backtests over 1-minute data.

The module contains legacy function-style strategies (``default_*`` and
``bbrs_*``), thin adapters that delegate to a ``StrategyManager``, the
per-timeframe processing loop, and the command-line entry point.
"""

import argparse
import concurrent.futures
import datetime
import json
import logging
import os
from collections import defaultdict

import numpy as np
import pandas as pd

from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest
from cycles.Analysis.supertrend import Supertrends
from cycles.charts import BacktestCharts
from cycles.Analysis.strategies import Strategy
from cycles.strategies import StrategyManager, create_strategy_manager

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("backtest.log"),
        logging.StreamHandler()
    ]
)


def default_init_strategy(backtester: Backtest):
    """Compute the meta trend and store it in ``backtester.strategies``.

    The trend arrays returned by ``Supertrends`` are stacked column-wise.
    Where the first three trends agree, the meta trend takes that shared
    value; everywhere else it is 0.
    """
    supertrends = Supertrends(backtester.df, verbose=False)

    supertrend_results_list = supertrends.calculate_supertrend_indicators()
    trends = [st['results']['trend'] for st in supertrend_results_list]
    trends_arr = np.stack(trends, axis=1)

    all_agree = (trends_arr[:, 0] == trends_arr[:, 1]) & (trends_arr[:, 1] == trends_arr[:, 2])
    meta_trend = np.where(all_agree, trends_arr[:, 0], 0)

    backtester.strategies["meta_trend"] = meta_trend


def bbrs_init_strategy(backtester: Backtest):
    """Initialise all-False buy/sell signal series, one entry per backtest row.

    Returns the same ``backtester`` object that was passed in.
    """
    n_rows = len(backtester.df)
    backtester.strategies["buy_signals"] = pd.Series(False, index=range(n_rows))
    backtester.strategies["sell_signals"] = pd.Series(False, index=range(n_rows))
    return backtester


def run_bbrs_strategy_processing(backtester: Backtest, original_df):
    """Run the BBRs strategy and store positional buy/sell signals.

    The strategy is run on ``original_df``. Its ``BuySignal`` / ``SellSignal``
    columns are forward-filled onto the index of ``original_df``, converted to
    a 0..n-1 integer index, and truncated or padded with ``False`` so that
    their length equals ``len(backtester.df)``. The processed frame is kept on
    ``backtester.processed_data`` for plotting.
    """
    config_strategy = {
        "bb_width": 0.05,
        "bb_period": 20,
        "rsi_period": 14,
        "trending": {
            "rsi_threshold": [30, 70],
            "bb_std_dev_multiplier": 2.5,
        },
        "sideways": {
            "rsi_threshold": [40, 60],
            "bb_std_dev_multiplier": 1.8,
        },
        "strategy_name": "MarketRegimeStrategy",  # "MarketRegimeStrategy", # CryptoTradingStrategy
        "SqueezeStrategy": True
    }

    strategy = Strategy(config=config_strategy, logging=logging)
    processed_data = strategy.run(original_df, config_strategy["strategy_name"])
    print(f"processed_data: {processed_data.head()}")

    # Store processed data for plotting
    backtester.processed_data = processed_data

    target_length = len(backtester.df)

    if processed_data.empty:
        # Strategy produced nothing: fall back to all-False signals.
        buy_condition = pd.Series(False, index=range(target_length))
        sell_condition = pd.Series(False, index=range(target_length))
    else:
        no_signal = pd.Series(False, index=processed_data.index)
        buy_signals_raw = processed_data.get('BuySignal', no_signal).astype(bool)
        sell_signals_raw = processed_data.get('SellSignal', no_signal).astype(bool)

        # Map each strategy-timeframe signal onto the timestamps of the
        # original data by forward-filling; leading gaps become False.
        original_datetime_index = original_df.index
        buy_signals_1min = buy_signals_raw.reindex(original_datetime_index, method='ffill').fillna(False)
        sell_signals_1min = sell_signals_raw.reindex(original_datetime_index, method='ffill').fillna(False)

        # The backtest addresses rows by position, so drop the datetime index.
        buy_condition = pd.Series(buy_signals_1min.values, index=range(len(buy_signals_1min)))
        sell_condition = pd.Series(sell_signals_1min.values, index=range(len(sell_signals_1min)))

    # Safety net: force the signals to the backtest frame's length.
    if len(buy_condition) != target_length:
        if len(buy_condition) > target_length:
            buy_condition = buy_condition[:target_length]
            sell_condition = sell_condition[:target_length]
        else:
            buy_values = np.pad(buy_condition.values,
                                (0, target_length - len(buy_condition)),
                                constant_values=False)
            sell_values = np.pad(sell_condition.values,
                                 (0, target_length - len(sell_condition)),
                                 constant_values=False)
            buy_condition = pd.Series(buy_values, index=range(target_length))
            sell_condition = pd.Series(sell_values, index=range(target_length))

    backtester.strategies["buy_signals"] = buy_condition
    backtester.strategies["sell_signals"] = sell_condition

    print(f"buy_signals length: {len(backtester.strategies['buy_signals'])}, backtest df length: {len(backtester.df)}")


def bbrs_entry_strategy(backtester: Backtest, df_index):
    """Return the buy signal at position ``df_index``."""
    return backtester.strategies["buy_signals"].iloc[df_index]


def bbrs_exit_strategy(backtester: Backtest, df_index):
    """Return ``(reason, price)`` for a BBRs exit, or ``(None, None)``.

    A sell signal at ``df_index`` wins and exits at that row's close;
    otherwise the 5% stop loss is checked.
    """
    if backtester.strategies["sell_signals"].iloc[df_index]:
        return "SELL_SIGNAL", backtester.df.iloc[df_index]['close']

    stop_loss_result, sell_price = bbrs_stop_loss_strategy(backtester)
    if stop_loss_result:
        backtester.strategies["current_trade_min1_start_idx"] = \
            backtester.current_trade_min1_start_idx
        return "STOP_LOSS", sell_price

    return None, None


def _find_stop_loss_exit(backtester, min1_df, stop_price):
    """Scan candles between trade entry and the current date for a stop hit.

    Sets ``backtester.current_trade_min1_start_idx`` and
    ``backtester.current_min1_end_idx`` as a side effect.

    Returns ``(True, sell_price)`` for the first candle whose low is at or
    below ``stop_price``, else ``(False, None)``. If that candle opened below
    the stop price (a gap down), the open is used as the sell price;
    otherwise the stop price itself is used.
    """
    min1_index = min1_df.index

    start_candidates = min1_index[min1_index >= backtester.entry_time]
    if len(start_candidates) == 0:
        # No candle at or after the entry time: nothing to scan.
        return False, None
    backtester.current_trade_min1_start_idx = start_candidates[0]

    end_candidates = min1_index[min1_index <= backtester.current_date]
    if len(end_candidates) == 0:
        print("Warning: no end candidate here. Need to be checked")
        return False, None
    backtester.current_min1_end_idx = end_candidates[-1]

    min1_slice = min1_df.loc[backtester.current_trade_min1_start_idx:backtester.current_min1_end_idx]

    hit_mask = min1_slice['low'] <= stop_price
    if not hit_mask.any():
        return False, None

    stop_candle = min1_slice[hit_mask].iloc[0]
    if stop_candle['open'] < stop_price:
        sell_price = stop_candle['open']
    else:
        sell_price = stop_price
    return True, sell_price


def bbrs_stop_loss_strategy(backtester: Backtest):
    """Check the fixed 5% stop loss for the BBRs strategy.

    Uses ``backtester.original_df`` when the attribute exists, otherwise
    ``backtester.min1_df``. Returns ``(hit, sell_price)``; see
    ``_find_stop_loss_exit`` for the pricing rule.
    """
    stop_loss_pct = 0.05
    stop_price = backtester.entry_price * (1 - stop_loss_pct)

    min1_df = backtester.original_df if hasattr(backtester, 'original_df') else backtester.min1_df
    return _find_stop_loss_exit(backtester, min1_df, stop_price)


def default_entry_strategy(backtester: Backtest, df_index):
    """Signal entry when the meta trend switches to 1 at ``df_index``.

    Position 0 has no previous bar, so it never signals (this also avoids
    index ``-1`` silently wrapping to the last element).
    """
    if df_index < 1:
        return False
    meta_trend = backtester.strategies["meta_trend"]
    return meta_trend[df_index - 1] != 1 and meta_trend[df_index] == 1


def stop_loss_strategy(backtester: Backtest):
    """Check the configured stop loss against ``backtester.min1_df``.

    The stop price is ``entry_price * (1 - strategies["stop_loss_pct"])``.
    Returns ``(hit, sell_price)``; see ``_find_stop_loss_exit`` for the
    pricing rule.
    """
    stop_price = backtester.entry_price * (1 - backtester.strategies["stop_loss_pct"])
    return _find_stop_loss_exit(backtester, backtester.min1_df, stop_price)


def default_exit_strategy(backtester: Backtest, df_index):
    """Return ``(reason, price)`` for a default-strategy exit, or ``(None, None)``.

    The meta-trend exit is checked first, then the stop loss.
    """
    meta_trend = backtester.strategies["meta_trend"]
    # NOTE(review): the previous-bar test is `!= 1` (not `!= -1`), so a direct
    # 1 -> -1 flip does not exit on the flip bar - confirm this is intended.
    if df_index > 0 and meta_trend[df_index - 1] != 1 and meta_trend[df_index] == -1:
        return "META_TREND_EXIT_SIGNAL", None

    stop_loss_result, sell_price = stop_loss_strategy(backtester)
    if stop_loss_result:
        backtester.strategies["current_trade_min1_start_idx"] = \
            backtester.min1_df.index[backtester.min1_df.index <= backtester.current_date][-1]
        return "STOP_LOSS", sell_price

    return None, None


def strategy_manager_init(backtester: Backtest):
    """No-op init hook passed to ``Backtest``.

    The real initialization happens in ``strategy_manager.initialize()``.
    """
    pass


def strategy_manager_entry(backtester: Backtest, df_index: int):
    """Delegate the entry decision to the attached strategy manager."""
    return backtester.strategy_manager.get_entry_signal(backtester, df_index)


def strategy_manager_exit(backtester: Backtest, df_index: int):
    """Delegate the exit decision to the attached strategy manager."""
    return backtester.strategy_manager.get_exit_signal(backtester, df_index)


def _summarize_trades(trades, n_trades, initial_usd):
    """Compute the per-run statistics dict from a list of trade dicts."""
    n_winning_trades = sum(
        1 for t in trades if t['exit'] is not None and t['exit'] > t['entry']
    )
    total_profit = sum(trade['profit_pct'] for trade in trades)
    total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0)

    # Drawdown is measured on the running sum of profit_pct, while final_usd
    # compounds the same percentages on the starting balance.
    cumulative_profit = 0
    peak = 0
    max_drawdown = 0
    final_usd = initial_usd
    for trade in trades:
        cumulative_profit += trade['profit_pct']
        peak = max(peak, cumulative_profit)
        max_drawdown = max(max_drawdown, peak - cumulative_profit)
        final_usd *= (1 + trade['profit_pct'])

    return {
        "n_trades": n_trades,
        "n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'),
        "win_rate": n_winning_trades / n_trades if n_trades > 0 else 0,
        "max_drawdown": max_drawdown,
        "avg_trade": total_profit / n_trades if n_trades > 0 else 0,
        "total_profit": total_profit,
        "total_loss": total_loss,
        "profit_ratio": total_profit / total_loss if total_loss > 0 else float('inf'),
        "initial_usd": initial_usd,
        "final_usd": final_usd,
        "total_fees_usd": sum(trade.get('fee_usd', 0.0) for trade in trades),
    }


def _plot_debug(backtester, working_df, results):
    """Best-effort plot of one backtest run; failures are printed, not raised."""
    try:
        processed_data = getattr(backtester, 'processed_data', None)

        if processed_data is not None and not processed_data.empty:
            # Format strategy data with actual executed trades for universal plotting
            formatted_data = BacktestCharts.format_strategy_data_with_trades(processed_data, results)
            BacktestCharts.plot_data(formatted_data)
        elif "meta_trend" in backtester.strategies:
            # Fallback to the meta_trend plot on the working dataframe
            BacktestCharts.plot(working_df, backtester.strategies["meta_trend"])
        else:
            print("No plotting data available")
    except Exception as e:
        print(f"Plotting failed: {e}")


def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, strategy_config=None, debug=False):
    """Run one backtest per stop-loss value using a Strategy Manager.

    Args:
        min1_df: 1-minute data handed to the strategies and the backtester.
        df: Unused; kept so existing callers keep working.
        stop_loss_pcts: Iterable of stop-loss fractions, one backtest each.
        rule_name: Label used in the ``timeframe`` column of the output.
        initial_usd: Starting balance for every run.
        strategy_config: Optional config for ``create_strategy_manager``;
            when falsy, a single "default" strategy is used.
        debug: Passed to ``Backtest.run``; also plots after each run.

    Returns:
        ``(results_rows, trade_rows)``: one summary dict per stop-loss value
        and one dict per executed trade.
    """
    results_rows = []
    trade_rows = []

    for stop_loss_pct in stop_loss_pcts:
        if strategy_config:
            strategy_manager = create_strategy_manager(strategy_config)
        else:
            # Default to single default strategy for backward compatibility
            default_strategy_config = {
                "strategies": [
                    {
                        "name": "default",
                        "weight": 1.0,
                        "params": {"stop_loss_pct": stop_loss_pct}
                    }
                ],
                "combination_rules": {
                    "entry": "any",
                    "exit": "any",
                    "min_confidence": 0.5
                }
            }
            strategy_manager = create_strategy_manager(default_strategy_config)

        # Inject stop_loss_pct into all strategy params if not present
        for strategy in strategy_manager.strategies:
            if "stop_loss_pct" not in strategy.params:
                strategy.params["stop_loss_pct"] = stop_loss_pct

        # The first strategy decides which dataframe the backtester walks.
        primary_strategy = strategy_manager.strategies[0]
        primary_timeframe = primary_strategy.get_timeframes()[0]

        if primary_strategy.name == "bbrs":
            # BBRS works on 1-minute data and resamples internally.
            working_df = min1_df.copy()
        else:
            # Let the primary strategy resample to its own primary timeframe.
            primary_strategy._resample_data(min1_df)
            working_df = primary_strategy.get_primary_timeframe_data()

        # Prepare working dataframe for backtester (ensure timestamp column)
        working_df_for_backtest = working_df.copy().reset_index()
        if 'index' in working_df_for_backtest.columns:
            working_df_for_backtest = working_df_for_backtest.rename(columns={'index': 'timestamp'})

        backtester = Backtest(initial_usd, working_df_for_backtest, working_df_for_backtest,
                              strategy_manager_init)

        # Store original min1_df for strategy processing
        backtester.original_df = min1_df

        backtester.strategy_manager = strategy_manager
        strategy_manager.initialize(backtester)

        results = backtester.run(
            strategy_manager_entry,
            strategy_manager_exit,
            debug
        )

        n_trades = results["n_trades"]
        trades = results.get('trades', [])
        timeframe_label = f"{rule_name}({primary_timeframe})"  # Show actual timeframe used

        results_rows.append({
            "timeframe": timeframe_label,
            "stop_loss_pct": stop_loss_pct,
            **_summarize_trades(trades, n_trades, initial_usd),
        })

        trade_rows.extend(
            {
                "timeframe": timeframe_label,
                "stop_loss_pct": stop_loss_pct,
                "entry_time": trade.get("entry_time"),
                "exit_time": trade.get("exit_time"),
                "entry_price": trade.get("entry"),
                "exit_price": trade.get("exit"),
                "profit_pct": trade.get("profit_pct"),
                "type": trade.get("type"),
                "fee_usd": trade.get("fee_usd"),
            }
            for trade in trades
        )

        strategy_summary = strategy_manager.get_strategy_summary()
        logging.info(f"Timeframe: {rule_name}({primary_timeframe}), Stop Loss: {stop_loss_pct}, "
                     f"Trades: {n_trades}, Strategies: {[s['name'] for s in strategy_summary['strategies']]}")

        if debug:
            _plot_debug(backtester, working_df, results)

    return results_rows, trade_rows


def process(timeframe_info, debug=False):
    """Process a single (timeframe, stop_loss_pct) task tuple.

    ``timeframe_info`` is ``(rule, data_1min, stop_loss_pct, initial_usd,
    strategy_config)``. The 1-minute data is passed through unchanged.
    """
    rule, data_1min, stop_loss_pct, initial_usd, strategy_config = timeframe_info

    results_rows, all_trade_rows = process_timeframe_data(
        data_1min, data_1min, [stop_loss_pct], rule, initial_usd, strategy_config, debug=debug
    )
    return results_rows, all_trade_rows


def aggregate_results(all_rows, initial_usd=None):
    """Aggregate result rows per (timeframe, stop_loss_pct) pair.

    Trade and stop-loss counts are summed; win rate, max drawdown, average
    trade, profit ratio, final USD and fees are averaged over the group.

    Args:
        all_rows: Result dicts as produced by ``process_timeframe_data``.
        initial_usd: Starting balance to report. When ``None``, the value is
            taken from the ``initial_usd`` key of the group's first row.

    Returns:
        A list with one summary dict per group.
    """
    grouped = defaultdict(list)
    for row in all_rows:
        grouped[(row['timeframe'], row['stop_loss_pct'])].append(row)

    summary_rows = []
    for (rule, stop_loss_pct), rows in grouped.items():
        group_initial_usd = initial_usd if initial_usd is not None else rows[0].get('initial_usd')

        summary_rows.append({
            "timeframe": rule,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": sum(r['n_trades'] for r in rows),
            "n_stop_loss": sum(r['n_stop_loss'] for r in rows),
            "win_rate": np.mean([r['win_rate'] for r in rows]),
            "max_drawdown": np.mean([r['max_drawdown'] for r in rows]),
            "avg_trade": np.mean([r['avg_trade'] for r in rows]),
            "profit_ratio": np.mean([r['profit_ratio'] for r in rows]),
            "initial_usd": group_initial_usd,
            "final_usd": np.mean([r.get('final_usd', group_initial_usd) for r in rows]),
            "total_fees_usd": np.mean([r.get('total_fees_usd', 0.0) for r in rows]),
        })
    return summary_rows


def get_nearest_price(df, target_date):
    """Return ``(timestamp, close)`` of the row nearest to ``target_date``.

    Returns ``(None, None)`` when ``df`` is empty.
    """
    if len(df) == 0:
        return None, None
    target_ts = pd.to_datetime(target_date)
    nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0]
    nearest_time = df.index[nearest_idx]
    price = df.iloc[nearest_idx]['close']
    return nearest_time, price


def _prompt_for_config(default_config):
    """Interactively ask for the run parameters, using defaults on empty input."""
    print("No config file provided. Please enter the following values (press Enter to use default):")

    start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
    stop_date = input(f"Stop date [{default_config['stop_date']}]: ") or default_config['stop_date']

    initial_usd_str = input(f"Initial USD [{default_config['initial_usd']}]: ") or str(default_config['initial_usd'])
    initial_usd = float(initial_usd_str)

    timeframes_shown = ', '.join(default_config['timeframes'])
    timeframes_str = (input(f"Timeframes (comma separated) [{timeframes_shown}]: ")
                      or ','.join(default_config['timeframes']))
    timeframes = [tf.strip() for tf in timeframes_str.split(',') if tf.strip()]

    stop_loss_shown = ', '.join(str(x) for x in default_config['stop_loss_pcts'])
    stop_loss_pcts_str = (input(f"Stop loss pcts (comma separated) [{stop_loss_shown}]: ")
                          or ','.join(str(x) for x in default_config['stop_loss_pcts']))
    stop_loss_pcts = [float(x.strip()) for x in stop_loss_pcts_str.split(',') if x.strip()]

    return {
        'start_date': start_date,
        'stop_date': stop_date,
        'initial_usd': initial_usd,
        'timeframes': timeframes,
        'stop_loss_pcts': stop_loss_pcts,
        'strategies': default_config['strategies'],
        'combination_rules': default_config['combination_rules']
    }


def main():
    """Command-line entry point: load config, run all tasks, write CSV output."""
    debug = True

    parser = argparse.ArgumentParser(description="Run backtest with config file.")
    parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
    args = parser.parse_args()

    # Default values (from config.json)
    default_config = {
        "start_date": "2025-03-01",
        "stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
        "initial_usd": 10000,
        "timeframes": ["15min"],
        "stop_loss_pcts": [0.03],
        "strategies": [
            {
                "name": "default",
                "weight": 1.0,
                "params": {}
            }
        ],
        "combination_rules": {
            "entry": "any",
            "exit": "any",
            "min_confidence": 0.5
        }
    }

    if args.config:
        with open(args.config, 'r', encoding="utf-8") as f:
            config = json.load(f)
    elif not debug:
        config = _prompt_for_config(default_config)
    else:
        config = default_config

    # Any key missing from the loaded config falls back to its default.
    config = {**default_config, **config}

    start_date = config['start_date']
    stop_date = config['stop_date']
    initial_usd = config['initial_usd']
    timeframes = config['timeframes']
    stop_loss_pcts = config['stop_loss_pcts']

    strategy_config = {
        "strategies": config['strategies'],
        "combination_rules": config['combination_rules']
    }

    timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")

    storage = Storage(logging=logging)
    system_utils = SystemUtils(logging=logging)

    data_1min = storage.load_data('btcusd_1-min_data.csv', start_date, stop_date)

    nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
    nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)

    metadata_lines = [
        f"Start date\t{start_date}\tPrice\t{start_price}",
        f"Stop date\t{stop_date}\tPrice\t{stop_price}",
        f"Initial USD\t{initial_usd}"
    ]

    # One task per (timeframe, stop_loss_pct) combination
    tasks = [
        (name, data_1min, stop_loss_pct, initial_usd, strategy_config)
        for name in timeframes
        for stop_loss_pct in stop_loss_pcts
    ]

    all_results_rows = []
    all_trade_rows = []

    if debug:
        # Sequential so that plots and breakpoints work in-process.
        for task in tasks:
            results, trades = process(task, debug)
            all_results_rows.extend(results)
            all_trade_rows.extend(trades)
    else:
        workers = system_utils.get_optimal_workers()

        with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
            futures = {executor.submit(process, task, debug): task for task in tasks}
            for future in concurrent.futures.as_completed(futures):
                results, trades = future.result()
                all_results_rows.extend(results)
                all_trade_rows.extend(trades)

    backtest_filename = f"{timestamp}_backtest.csv"
    backtest_fieldnames = [
        "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
        "max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd"
    ]
    storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)

    trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
    storage.write_trades(all_trade_rows, trades_fieldnames)


if __name__ == "__main__":
    main()