"""Run stop-loss backtests over one or more resampled timeframes.

The script loads 1-minute price data through ``Storage``, resamples it to
every configured timeframe, runs ``Backtest.run`` for each
(timeframe, stop-loss) combination and writes a summary file plus a
per-trade file through ``Storage``.
"""

import argparse
import ast
import concurrent.futures
import datetime
import json
import logging
import os
from collections import defaultdict

import numpy as np
import pandas as pd

from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("backtest.log"),
        logging.StreamHandler(),
    ],
)


def _max_drawdown(trades):
    """Return the largest peak-to-trough drop of the running sum of profit_pct."""
    cumulative_profit = 0
    peak = 0
    max_drawdown = 0
    for trade in trades:
        cumulative_profit += trade['profit_pct']
        peak = max(peak, cumulative_profit)
        max_drawdown = max(max_drawdown, peak - cumulative_profit)
    return max_drawdown


def _summarize_trades(trades, n_trades, rule_name, stop_loss_pct, initial_usd):
    """Build the summary row for one backtest run from its list of trade dicts."""
    n_winning_trades = sum(
        1 for t in trades if t['exit'] is not None and t['exit'] > t['entry']
    )
    # total_profit is the net sum over all trades (losses included);
    # total_loss is the magnitude of the losing trades only.
    total_profit = sum(t['profit_pct'] for t in trades)
    total_loss = sum(-t['profit_pct'] for t in trades if t['profit_pct'] < 0)

    # Compound every trade's return on top of the starting balance.
    final_usd = initial_usd
    for t in trades:
        final_usd *= (1 + t['profit_pct'])

    return {
        "timeframe": rule_name,
        "stop_loss_pct": stop_loss_pct,
        "n_trades": n_trades,
        "n_stop_loss": sum(1 for t in trades if t.get('type') == 'STOP'),
        "win_rate": n_winning_trades / n_trades if n_trades > 0 else 0,
        "max_drawdown": _max_drawdown(trades),
        "avg_trade": total_profit / n_trades if n_trades > 0 else 0,
        "total_profit": total_profit,
        "total_loss": total_loss,
        # No losing trades -> ratio is reported as infinity.
        "profit_ratio": total_profit / total_loss if total_loss > 0 else float('inf'),
        "initial_usd": initial_usd,
        "final_usd": final_usd,
        "total_fees_usd": sum(t.get('fee_usd', 0.0) for t in trades),
    }


def _trade_to_row(trade, rule_name, stop_loss_pct):
    """Flatten one trade dict into the row layout used for the trades output."""
    return {
        "timeframe": rule_name,
        "stop_loss_pct": stop_loss_pct,
        "entry_time": trade.get("entry_time"),
        "exit_time": trade.get("exit_time"),
        "entry_price": trade.get("entry"),
        "exit_price": trade.get("exit"),
        "profit_pct": trade.get("profit_pct"),
        "type": trade.get("type"),
        "fee_usd": trade.get("fee_usd"),
    }


def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
    """Process the entire timeframe with all stop loss values (no monthly split).

    Args:
        min1_df: 1-minute data, passed through unchanged to ``Backtest.run``.
        df: Data for the timeframe under test; it is copied and re-indexed
            with a fresh positional index before use.
        stop_loss_pcts: Iterable of stop-loss values; one backtest is run per value.
        rule_name: Timeframe label stored in every output row.
        initial_usd: Starting balance handed to the backtest and used as the
            base for the compounded ``final_usd``.
        debug: Forwarded to ``Backtest.run``; when true, stop-loss trades and
            trades losing more than 9% are also printed.

    Returns:
        ``(results_rows, trade_rows)``: one summary dict per stop-loss value
        and one dict per individual trade.
    """
    df = df.copy().reset_index(drop=True)
    results_rows = []
    trade_rows = []

    for stop_loss_pct in stop_loss_pcts:
        results = Backtest.run(
            min1_df,
            df,
            initial_usd=initial_usd,
            stop_loss_pct=stop_loss_pct,
            debug=debug,
        )
        n_trades = results["n_trades"]
        trades = results.get('trades', [])

        results_rows.append(
            _summarize_trades(trades, n_trades, rule_name, stop_loss_pct, initial_usd)
        )
        trade_rows.extend(_trade_to_row(t, rule_name, stop_loss_pct) for t in trades)

        logging.info(
            "Timeframe: %s, Stop Loss: %s, Trades: %s", rule_name, stop_loss_pct, n_trades
        )

        if debug:
            for trade in trades:
                # 'type' is optional everywhere else in this function, so do
                # not let its absence crash the debug output.
                if trade.get('type') == 'STOP':
                    print(trade)
            for trade in trades:
                if trade['profit_pct'] < -0.09:  # losses worse than 9%
                    print("Large loss trade:", trade)

    return results_rows, trade_rows


def process(timeframe_info, debug=False):
    """Process a single (timeframe, stop_loss_pct) combination (no monthly split).

    Args:
        timeframe_info: Tuple ``(rule, data_1min, stop_loss_pct, initial_usd)``.
            ``rule`` is a pandas resample rule; ``"1T"`` means the 1-minute
            data is used without resampling.
        debug: Forwarded to ``process_timeframe_data``.

    Returns:
        ``(results_rows, trade_rows)`` as produced by ``process_timeframe_data``.
    """
    rule, data_1min, stop_loss_pct, initial_usd = timeframe_info

    if rule == "1T":
        df = data_1min.copy()
    else:
        df = data_1min.resample(rule).agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum',
        }).dropna()

    # process_timeframe_data discards the index, so move it into a column for
    # every rule (including "1T") to keep the timestamps available.
    df = df.reset_index()

    return process_timeframe_data(
        data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug
    )


def aggregate_results(all_rows, initial_usd=None):
    """Aggregate results per stop_loss_pct and per rule (timeframe).

    Trade counts are summed across the rows of a group; rates, drawdown,
    final balance and fees are averaged.

    Args:
        all_rows: Summary rows as produced by ``process_timeframe_data``.
        initial_usd: Starting balance reported in the output and used as the
            fallback for rows without ``final_usd``. When omitted, the value
            stored in the group's first row is used.

    Returns:
        One summary dict per distinct ``(timeframe, stop_loss_pct)`` pair.
    """
    grouped = defaultdict(list)
    for row in all_rows:
        grouped[(row['timeframe'], row['stop_loss_pct'])].append(row)

    summary_rows = []
    for (rule, stop_loss_pct), rows in grouped.items():
        group_initial_usd = (
            initial_usd if initial_usd is not None else rows[0].get('initial_usd')
        )
        summary_rows.append({
            "timeframe": rule,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": sum(r['n_trades'] for r in rows),
            "n_stop_loss": sum(r['n_stop_loss'] for r in rows),
            "win_rate": np.mean([r['win_rate'] for r in rows]),
            "max_drawdown": np.mean([r['max_drawdown'] for r in rows]),
            "avg_trade": np.mean([r['avg_trade'] for r in rows]),
            "profit_ratio": np.mean([r['profit_ratio'] for r in rows]),
            "initial_usd": group_initial_usd,
            "final_usd": np.mean([r.get('final_usd', group_initial_usd) for r in rows]),
            # A row without fee information contributes zero rather than None.
            "total_fees_usd": np.mean([r.get('total_fees_usd', 0.0) for r in rows]),
        })
    return summary_rows


def get_nearest_price(df, target_date):
    """Return ``(timestamp, close)`` of the row whose index is nearest to target_date.

    Returns ``(None, None)`` when ``df`` has no rows.
    """
    if len(df) == 0:
        return None, None
    target_ts = pd.to_datetime(target_date)
    nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0]
    nearest_time = df.index[nearest_idx]
    price = df.iloc[nearest_idx]['close']
    return nearest_time, price


def _default_config():
    """Return the built-in configuration used when a value is not supplied."""
    return {
        "start_date": "2024-05-15",
        "stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
        "initial_usd": 10000,
        "timeframes": ["1D"],
        "stop_loss_pcts": [0.01, 0.02, 0.03],
    }


def _load_config(path, defaults):
    """Read a JSON config file; keys it does not define fall back to defaults."""
    with open(path, 'r', encoding='utf-8') as f:
        loaded = json.load(f)
    return {**defaults, **loaded}


def _prompt_for_config(defaults):
    """Ask for every config value on stdin; an empty answer keeps the default."""
    print("No config file provided. Please enter the following values (press Enter to use default):")
    start_date = input(f"Start date [{defaults['start_date']}]: ") or defaults['start_date']
    stop_date = input(f"Stop date [{defaults['stop_date']}]: ") or defaults['stop_date']
    initial_usd_str = input(f"Initial USD [{defaults['initial_usd']}]: ") or str(defaults['initial_usd'])
    timeframes_str = (
        input(f"Timeframes (comma separated) [{', '.join(defaults['timeframes'])}]: ")
        or ','.join(defaults['timeframes'])
    )
    stop_loss_pcts_str = (
        input(
            "Stop loss pcts (comma separated) "
            f"[{', '.join(str(x) for x in defaults['stop_loss_pcts'])}]: "
        )
        or ','.join(str(x) for x in defaults['stop_loss_pcts'])
    )
    return {
        'start_date': start_date,
        'stop_date': stop_date,
        'initial_usd': float(initial_usd_str),
        'timeframes': [tf.strip() for tf in timeframes_str.split(',') if tf.strip()],
        'stop_loss_pcts': [float(x.strip()) for x in stop_loss_pcts_str.split(',') if x.strip()],
    }


def main():
    """Parse the command line, run every backtest and write the result files."""
    # Flip to True to run the tasks sequentially in this process with debug output.
    debug = False

    parser = argparse.ArgumentParser(description="Run backtest with config file.")
    parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
    args = parser.parse_args()

    defaults = _default_config()
    if args.config:
        config = _load_config(args.config, defaults)
    else:
        config = _prompt_for_config(defaults)

    start_date = config['start_date']
    stop_date = config['stop_date']
    initial_usd = config['initial_usd']
    timeframes = config['timeframes']
    stop_loss_pcts = config['stop_loss_pcts']

    timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")

    storage = Storage(logging=logging)
    system_utils = SystemUtils(logging=logging)

    data_1min = storage.load_data('btcusd_1-min_data.csv', start_date, stop_date)

    _, start_price = get_nearest_price(data_1min, start_date)
    _, stop_price = get_nearest_price(data_1min, stop_date)

    metadata_lines = [
        f"Start date\t{start_date}\tPrice\t{start_price}",
        f"Stop date\t{stop_date}\tPrice\t{stop_price}",
        f"Initial USD\t{initial_usd}",
    ]

    tasks = [
        (name, data_1min, stop_loss_pct, initial_usd)
        for name in timeframes
        for stop_loss_pct in stop_loss_pcts
    ]

    workers = system_utils.get_optimal_workers()

    all_results_rows = []
    all_trade_rows = []
    if debug:
        for task in tasks:
            results, trades = process(task, debug)
            all_results_rows.extend(results)
            all_trade_rows.extend(trades)
    else:
        with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(process, task, debug) for task in tasks]
            # Collect in submission order so the output row order is the same
            # on every run, independent of which worker finishes first.
            for future in futures:
                results, trades = future.result()
                all_results_rows.extend(results)
                all_trade_rows.extend(trades)

    backtest_filename = f"{timestamp}_backtest.csv"
    backtest_fieldnames = [
        "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
        "max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd",
    ]
    storage.write_backtest_results(
        backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines
    )

    trades_fieldnames = [
        "entry_time", "exit_time", "entry_price", "exit_price",
        "profit_pct", "type", "fee_usd",
    ]
    storage.write_trades(all_trade_rows, trades_fieldnames)


if __name__ == "__main__":
    main()