import pandas as pd
import numpy as np
from trend_detector_simple import TrendDetectorSimple
import csv
import logging
import concurrent.futures
import os
import psutil
import datetime
import gspread
from google.oauth2.service_account import Credentials
from collections import defaultdict
import threading
import queue
import time
import math
import json
from taxes import Taxes

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("backtest.log"),
        logging.StreamHandler()
    ]
)


def get_optimal_workers():
    """Determine optimal number of worker processes based on system resources."""
    cpu_count = os.cpu_count() or 4
    memory_gb = psutil.virtual_memory().total / (1024 ** 3)
    # Heuristic: use 75% of cores, but cap based on available memory
    # (assume each worker needs ~2 GB for large datasets).
    workers_by_memory = max(1, int(memory_gb / 2))
    workers_by_cpu = max(1, int(cpu_count * 0.75))
    return min(workers_by_cpu, workers_by_memory)


def load_data(file_path, start_date, stop_date):
    """Load data with optimized dtypes and date filtering, supporting CSV and JSON input."""
    # Determine file type
    _, ext = os.path.splitext(file_path)
    ext = ext.lower()

    if ext == ".json":
        with open(file_path, "r") as f:
            raw = json.load(f)
        data = pd.DataFrame(raw["Data"])
        # Convert columns to lowercase
        data.columns = data.columns.str.lower()
        # Convert timestamp to datetime
        data["timestamp"] = pd.to_datetime(data["timestamp"], unit="s")
        # Filter by date range
        data = data[(data["timestamp"] >= start_date) & (data["timestamp"] <= stop_date)]
        return data.set_index("timestamp")
    else:
        # Define optimized dtypes
        dtypes = {
            "Open": "float32",
            "High": "float32",
            "Low": "float32",
            "Close": "float32",
            "Volume": "float32",
        }
        # Read data with original capitalized column names
        data = pd.read_csv(file_path, dtype=dtypes)
        # Convert timestamp to datetime
        data["Timestamp"] = pd.to_datetime(data["Timestamp"], unit="s")
        # Filter by date range
        data = data[(data["Timestamp"] >= start_date) & (data["Timestamp"] <= stop_date)]
        # Now convert column names to lowercase
        data.columns = data.columns.str.lower()
        return data.set_index("timestamp")


def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
    """Process the entire timeframe with all stop loss values (no monthly split)."""
    df = df.copy().reset_index(drop=True)
    trend_detector = TrendDetectorSimple(df, verbose=False)
    results_rows = []
    trade_rows = []

    for stop_loss_pct in stop_loss_pcts:
        results = trend_detector.backtest_meta_supertrend(
            min1_df,
            initial_usd=initial_usd,
            stop_loss_pct=stop_loss_pct,
            debug=debug
        )
        n_trades = results["n_trades"]
        trades = results.get("trades", [])

        wins = [1 for t in trades if t["exit"] is not None and t["exit"] > t["entry"]]
        n_winning_trades = len(wins)
        total_profit = sum(trade["profit_pct"] for trade in trades)
        total_loss = sum(-trade["profit_pct"] for trade in trades if trade["profit_pct"] < 0)
        win_rate = n_winning_trades / n_trades if n_trades > 0 else 0
        avg_trade = total_profit / n_trades if n_trades > 0 else 0
        profit_ratio = total_profit / total_loss if total_loss > 0 else float("inf")

        cumulative_profit = 0
        max_drawdown = 0
        peak = 0
        for trade in trades:
            cumulative_profit += trade["profit_pct"]
            if cumulative_profit > peak:
                peak = cumulative_profit
            drawdown = peak - cumulative_profit
            if drawdown > max_drawdown:
                max_drawdown = drawdown

        final_usd = initial_usd
        for trade in trades:
            final_usd *= (1 + trade["profit_pct"])

        total_fees_usd = sum(trade.get("fee_usd", 0.0) for trade in trades)
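        # Note: max_drawdown is computed on the simple running sum of per-trade
        # returns, whereas final_usd compounds them, so the two figures can
        # diverge for long backtests.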
        row = {
            "timeframe": rule_name,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": n_trades,
            "n_stop_loss": sum(1 for trade in trades if trade.get("type") == "STOP"),
            "win_rate": win_rate,
            "max_drawdown": max_drawdown,
            "avg_trade": avg_trade,
            "total_profit": total_profit,
            "total_loss": total_loss,
            "profit_ratio": profit_ratio,
            "initial_usd": initial_usd,
            "final_usd": final_usd,
            "total_fees_usd": total_fees_usd,
        }
        results_rows.append(row)

        for trade in trades:
            trade_rows.append({
                "timeframe": rule_name,
                "stop_loss_pct": stop_loss_pct,
                "entry_time": trade.get("entry_time"),
                "exit_time": trade.get("exit_time"),
                "entry_price": trade.get("entry"),
                "exit_price": trade.get("exit"),
                "profit_pct": trade.get("profit_pct"),
                "type": trade.get("type"),
                "fee_usd": trade.get("fee_usd"),
            })

        logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")

        if debug:
            for trade in trades:
                if trade.get("type") == "STOP":
                    print(trade)
            for trade in trades:
                if trade["profit_pct"] < -0.09:  # roughly the -0.10 level or worse
                    print("Large loss trade:", trade)

    return results_rows, trade_rows


def process_timeframe(timeframe_info, debug=False):
    """Process a single (timeframe, stop_loss_pct) combination (no monthly split)."""
    rule, data_1min, stop_loss_pct, initial_usd = timeframe_info

    if rule == "1min":
        df = data_1min.copy()
    else:
        df = data_1min.resample(rule).agg({
            "open": "first",
            "high": "max",
            "low": "min",
            "close": "last",
            "volume": "sum",
        }).dropna()
    # Make the timestamp a regular column in both branches
    df = df.reset_index()

    # Only process one stop loss per task
    results_rows, all_trade_rows = process_timeframe_data(
        data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug
    )
    return results_rows, all_trade_rows


def write_results_chunk(filename, fieldnames, rows, write_header=False, initial_usd=None):
    """Write a chunk of results to a CSV file."""
    mode = "w" if write_header else "a"
    with open(filename, mode, newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if write_header:
            if initial_usd is not None:
                csvfile.write(f"# initial_usd: {initial_usd}\n")
            writer.writeheader()
        for row in rows:
            # Only keep keys that are in fieldnames
            filtered_row = {k: v for k, v in row.items() if k in fieldnames}
            writer.writerow(filtered_row)


def aggregate_results(all_rows, initial_usd):
    """Aggregate results per stop_loss_pct and per rule (timeframe)."""
    grouped = defaultdict(list)
    for row in all_rows:
        key = (row["timeframe"], row["stop_loss_pct"])
        grouped[key].append(row)

    summary_rows = []
    for (rule, stop_loss_pct), rows in grouped.items():
        total_trades = sum(r["n_trades"] for r in rows)
        total_stop_loss = sum(r["n_stop_loss"] for r in rows)
        avg_win_rate = np.mean([r["win_rate"] for r in rows])
        avg_max_drawdown = np.mean([r["max_drawdown"] for r in rows])
        avg_avg_trade = np.mean([r["avg_trade"] for r in rows])
        avg_profit_ratio = np.mean([r["profit_ratio"] for r in rows])
        # Average final USD and fees across the grouped rows
        final_usd = np.mean([r.get("final_usd", initial_usd) for r in rows])
        total_fees_usd = np.mean([r.get("total_fees_usd", 0.0) for r in rows])
        summary_rows.append({
            "timeframe": rule,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": total_trades,
            "n_stop_loss": total_stop_loss,
            "win_rate": avg_win_rate,
            "max_drawdown": avg_max_drawdown,
            "avg_trade": avg_avg_trade,
            "profit_ratio": avg_profit_ratio,
            "initial_usd": initial_usd,
            "final_usd": final_usd,
            "total_fees_usd": total_fees_usd,
        })
    return summary_rows

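# The aggregation and trade-export logic above assumes that each trade dict
# returned by TrendDetectorSimple.backtest_meta_supertrend() carries roughly
# the fields below. This is a sketch inferred from how the fields are read in
# this file; the backtester may return additional keys, and the sample values
# are illustrative only.
#
#   {
#       "entry_time": ...,       # timestamp of entry
#       "exit_time": ...,        # timestamp of exit
#       "entry": 64210.5,        # entry price
#       "exit": 65975.0,         # exit price (None while a trade is open)
#       "profit_pct": 0.0275,    # fractional return per trade
#       "type": "STOP",          # "STOP" marks stop-loss exits
#       "fee_usd": 12.34,        # round-trip fees in USD
#   }
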
if __name__ == "__main__":
    # Configuration
    # start_date = '2022-01-01'
    # stop_date = '2023-01-01'
    start_date = "2024-05-15"
    stop_date = "2025-05-15"
    initial_usd = 10000
    debug = False

    results_dir = "results"
    os.makedirs(results_dir, exist_ok=True)
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")

    timeframes = ["1min", "5min", "15min"]
    stop_loss_pcts = [0.01, 0.02, 0.03, 0.04, 0.05]

    data_1min = load_data("./data/btcusd_1-min_data.csv", start_date, stop_date)

    def get_nearest_price(df, target_date):
        """Return the timestamp and close price nearest to target_date."""
        if len(df) == 0:
            return None, None
        target_ts = pd.to_datetime(target_date)
        nearest_idx = df.index.get_indexer([target_ts], method="nearest")[0]
        nearest_time = df.index[nearest_idx]
        price = df.iloc[nearest_idx]["close"]
        return nearest_time, price

    nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
    nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)
    logging.info(f"Price at start_date ({start_date}) [nearest timestamp: {nearest_start_time}]: {start_price}")
    logging.info(f"Price at stop_date ({stop_date}) [nearest timestamp: {nearest_stop_time}]: {stop_price}")

    tasks = [
        (name, data_1min, stop_loss_pct, initial_usd)
        for name in timeframes
        for stop_loss_pct in stop_loss_pcts
    ]

    workers = get_optimal_workers()
    logging.info(f"Using {workers} workers for processing")

    # Process tasks with optimized concurrency
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(process_timeframe, task, debug): task for task in tasks}
        all_results_rows = []
        all_trade_rows = []
        for future in concurrent.futures.as_completed(futures):
            results, trades = future.result()
            if results or trades:
                all_results_rows.extend(results)
                all_trade_rows.extend(trades)

    # Write all results to a single (tab-delimited) file
    combined_filename = os.path.join(results_dir, f"{timestamp}_backtest_combined.csv")
    combined_fieldnames = [
        "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
        "max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd"
    ]

    def format_row(row):
        """Format a results row for human-readable output."""
        return {
            "timeframe": row["timeframe"],
            "stop_loss_pct": f"{row['stop_loss_pct'] * 100:.2f}%",
            "n_trades": row["n_trades"],
            "n_stop_loss": row["n_stop_loss"],
            "win_rate": f"{row['win_rate'] * 100:.2f}%",
            "max_drawdown": f"{row['max_drawdown'] * 100:.2f}%",
            "avg_trade": f"{row['avg_trade'] * 100:.2f}%",
            "profit_ratio": f"{row['profit_ratio'] * 100:.2f}%",
            "final_usd": f"{row['final_usd']:.2f}",
            "total_fees_usd": f"{row.get('total_fees_usd', 0.0):.2f}",
        }

    with open(combined_filename, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=combined_fieldnames, delimiter="\t")
        writer.writeheader()
        for row in all_results_rows:
            writer.writerow(format_row(row))
    logging.info(f"Combined results written to {combined_filename}")

    # Group all trades by (timeframe, stop_loss_pct) and write one file per combination
    trades_by_combo = defaultdict(list)
    for trade in all_trade_rows:
        tf = trade.get("timeframe")
        sl = trade.get("stop_loss_pct")
        trades_by_combo[(tf, sl)].append(trade)

    trades_fieldnames = [
        "entry_time", "exit_time", "entry_price", "exit_price",
        "profit_pct", "type", "fee_usd"
    ]
    for (tf, sl), trades in trades_by_combo.items():
        sl_percent = int(round(sl * 100))
        trades_filename = os.path.join(results_dir, f"trades_{tf}_ST{sl_percent}pct.csv")
        with open(trades_filename, "w", newline="") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=trades_fieldnames)
            writer.writeheader()
            for trade in trades:
                row = {k: trade.get(k, "") for k in trades_fieldnames}
                fee = trade.get("fee_usd")
                row["fee_usd"] = f"{float(fee):.2f}" if fee is not None else ""
                writer.writerow(row)
        logging.info(f"Trades written to {trades_filename}")
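
# ---------------------------------------------------------------------------
# Example (sketch): re-loading one of the per-trade CSVs written above to
# sanity-check the compounded equity curve. The file name below is
# illustrative; substitute the timeframe/stop-loss suffix of an actual run.
# ---------------------------------------------------------------------------
#
#   import pandas as pd
#   trades = pd.read_csv("results/trades_5min_ST2pct.csv")
#   equity = 10000 * (1 + trades["profit_pct"]).cumprod()
#   print(equity.tail())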