336 lines
13 KiB
Python
336 lines
13 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
import logging
|
|
import concurrent.futures
|
|
import os
|
|
import datetime
|
|
import argparse
|
|
import json
|
|
|
|
from cycles.utils.storage import Storage
|
|
from cycles.utils.system import SystemUtils
|
|
from cycles.backtest import Backtest
|
|
from cycles.Analysis.supertrend import Supertrends
|
|
from cycles.charts import BacktestCharts
|
|
from cycles.Analysis.strategies import Strategy
|
|
from cycles.strategies import StrategyManager, create_strategy_manager
|
|
|
|
# Root logger setup, executed at import time: INFO-level records are written
# both to "backtest.log" (file handler listed first) and to the console stream.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    handlers=[
        logging.FileHandler("backtest.log"),
        logging.StreamHandler(),
    ],
)
|
|
|
|
def strategy_manager_init(backtester: Backtest):
    """No-op initialization hook passed to the ``Backtest`` constructor.

    The hook intentionally does nothing: the strategy manager is set up
    separately through ``strategy_manager.initialize()`` once it has been
    attached to the backtester.
    """
    return None
|
|
|
|
def strategy_manager_entry(backtester: Backtest, df_index: int):
    """Entry callback: forward the entry-signal query to the strategy manager.

    Calls ``get_entry_signal(backtester, df_index)`` on the manager stored in
    ``backtester.strategy_manager`` and returns whatever it returns.
    """
    manager = backtester.strategy_manager
    entry_signal = manager.get_entry_signal(backtester, df_index)
    return entry_signal
|
|
|
|
def strategy_manager_exit(backtester: Backtest, df_index: int):
    """Exit callback: forward the exit-signal query to the strategy manager.

    Calls ``get_exit_signal(backtester, df_index)`` on the manager stored in
    ``backtester.strategy_manager`` and returns whatever it returns.
    """
    manager = backtester.strategy_manager
    exit_signal = manager.get_exit_signal(backtester, df_index)
    return exit_signal
|
|
|
|
def _compute_trade_metrics(trades, n_trades, initial_usd):
    """Derive the per-run summary statistics from the executed trades.

    Args:
        trades: list of trade dicts; each must carry 'entry', 'exit' and
            'profit_pct', and may carry 'fee_usd' and 'type'.
        n_trades: trade count reported by the backtester (used as the
            denominator for win rate and average trade).
        initial_usd: starting balance that the per-trade returns compound on.

    Returns:
        dict with win_rate, max_drawdown, avg_trade, total_profit,
        total_loss, profit_ratio, final_usd, total_fees_usd and n_stop_loss.
    """
    n_winning_trades = sum(
        1 for t in trades if t['exit'] is not None and t['exit'] > t['entry']
    )
    # total_profit is the net sum of all returns; total_loss is the magnitude
    # of the losing trades only.
    total_profit = sum(t['profit_pct'] for t in trades)
    total_loss = sum(-t['profit_pct'] for t in trades if t['profit_pct'] < 0)

    cumulative_profit = 0
    peak = 0
    max_drawdown = 0
    final_usd = initial_usd
    for t in trades:
        # Drawdown is measured on the additive cumulative profit_pct curve ...
        cumulative_profit += t['profit_pct']
        if cumulative_profit > peak:
            peak = cumulative_profit
        drawdown = peak - cumulative_profit
        if drawdown > max_drawdown:
            max_drawdown = drawdown
        # ... while the final balance compounds each trade's return.
        final_usd *= (1 + t['profit_pct'])

    return {
        "win_rate": n_winning_trades / n_trades if n_trades > 0 else 0,
        "max_drawdown": max_drawdown,
        "avg_trade": total_profit / n_trades if n_trades > 0 else 0,
        "total_profit": total_profit,
        "total_loss": total_loss,
        # No losing trades -> ratio is reported as infinity.
        "profit_ratio": total_profit / total_loss if total_loss > 0 else float('inf'),
        "final_usd": final_usd,
        "total_fees_usd": sum(t.get('fee_usd', 0.0) for t in trades),
        "n_stop_loss": sum(1 for t in trades if t.get('type') == 'STOP_LOSS'),
    }


def _plot_backtest_debug(backtester, working_df, results):
    """Best-effort debug plot of one backtest run; never raises."""
    try:
        processed_data = getattr(backtester, 'processed_data', None)

        if processed_data is not None and not processed_data.empty:
            # Format strategy data with actual executed trades, then plot it
            formatted_data = BacktestCharts.format_strategy_data_with_trades(processed_data, results)
            BacktestCharts.plot_data(formatted_data)
        elif "meta_trend" in backtester.strategies:
            # Fallback: plot the meta trend against the working dataframe
            BacktestCharts.plot(working_df, backtester.strategies["meta_trend"])
        else:
            print("No plotting data available")
    except Exception as e:
        # Plotting is purely diagnostic; a failure must not abort the run.
        print(f"Plotting failed: {e}")


def process_timeframe_data(data_1min, timeframe, config, debug=False):
    """Run one backtest through the strategy manager and summarize it.

    Args:
        data_1min: 1-minute price dataframe; handed to the primary strategy
            for resampling and stored on the backtester as ``original_df``.
        timeframe: timeframe label from the config, used in the output rows.
        config: dict providing 'initial_usd', 'strategies' and
            'combination_rules'.
        debug: forwarded to ``Backtest.run``; when true a plot is attempted
            after the run.

    Returns:
        ``(results_rows, trade_rows)``: one summary row for the run and one
        row per executed trade. Both lists are empty when no strategies are
        configured.
    """
    results_rows = []
    trade_rows = []

    # Extract values from config
    initial_usd = config['initial_usd']

    # Guard on the strategies list itself: a wrapper dict holding the two
    # config keys is never empty, so testing that dict would never trigger.
    strategies_cfg = config.get('strategies')
    if not strategies_cfg:
        logging.error("No strategy configuration provided")
        return results_rows, trade_rows

    strategy_config = {
        "strategies": strategies_cfg,
        "combination_rules": config['combination_rules'],
    }

    # Create and initialize strategy manager
    strategy_manager = create_strategy_manager(strategy_config)
    if not strategy_manager.strategies:
        logging.error("Strategy manager has no strategies configured")
        return results_rows, trade_rows

    # Get the primary timeframe from the first strategy for backtester setup
    primary_strategy = strategy_manager.strategies[0]
    primary_timeframe = primary_strategy.get_timeframes()[0]

    if primary_strategy.name == "bbrs":
        # BBRS works on 1-minute data directly and resamples internally
        working_df = data_1min.copy()
    else:
        # Other strategies resample the 1-minute data to their own timeframe
        primary_strategy._resample_data(data_1min)
        working_df = primary_strategy.get_primary_timeframe_data()

    # Prepare working dataframe for backtester (ensure timestamp column)
    working_df_for_backtest = working_df.copy().reset_index()
    if 'index' in working_df_for_backtest.columns:
        working_df_for_backtest = working_df_for_backtest.rename(columns={'index': 'timestamp'})

    # The same frame is passed for both dataframe arguments of Backtest
    backtester = Backtest(initial_usd, working_df_for_backtest, working_df_for_backtest, strategy_manager_init)

    # Store original 1-minute data for strategy processing
    backtester.original_df = data_1min

    # Attach strategy manager to backtester and initialize
    backtester.strategy_manager = strategy_manager
    strategy_manager.initialize(backtester)

    # Run backtest with strategy manager functions
    results = backtester.run(
        strategy_manager_entry,
        strategy_manager_exit,
        debug,
    )

    n_trades = results["n_trades"]
    trades = results.get('trades', [])
    metrics = _compute_trade_metrics(trades, n_trades, initial_usd)

    # stop_loss_pct is reported from the first strategy only; in
    # multi-strategy setups the strategies can have different values.
    stop_loss_pct = primary_strategy.params.get("stop_loss_pct", "N/A")

    # Label shows the configured timeframe plus the one actually used
    timeframe_label = f"{timeframe}({primary_timeframe})"

    results_rows.append({
        "timeframe": timeframe_label,
        "stop_loss_pct": stop_loss_pct,
        "n_trades": n_trades,
        "n_stop_loss": metrics["n_stop_loss"],
        "win_rate": metrics["win_rate"],
        "max_drawdown": metrics["max_drawdown"],
        "avg_trade": metrics["avg_trade"],
        "total_profit": metrics["total_profit"],
        "total_loss": metrics["total_loss"],
        "profit_ratio": metrics["profit_ratio"],
        "initial_usd": initial_usd,
        "final_usd": metrics["final_usd"],
        "total_fees_usd": metrics["total_fees_usd"],
    })

    trade_rows.extend(
        {
            "timeframe": timeframe_label,
            "stop_loss_pct": stop_loss_pct,
            "entry_time": trade.get("entry_time"),
            "exit_time": trade.get("exit_time"),
            "entry_price": trade.get("entry"),
            "exit_price": trade.get("exit"),
            "profit_pct": trade.get("profit_pct"),
            "type": trade.get("type"),
            "fee_usd": trade.get("fee_usd"),
        }
        for trade in trades
    )

    # Log strategy summary
    strategy_summary = strategy_manager.get_strategy_summary()
    logging.info(f"Timeframe: {timeframe}({primary_timeframe}), Stop Loss: {stop_loss_pct}, "
                 f"Trades: {n_trades}, Strategies: {[s['name'] for s in strategy_summary['strategies']]}")

    if debug:
        # Plot after each backtest run
        _plot_backtest_debug(backtester, working_df, results)

    return results_rows, trade_rows
|
|
|
|
def process(timeframe_info, debug=False):
    """Run one task tuple through ``process_timeframe_data``.

    ``timeframe_info`` is a ``(timeframe, data_1min, config)`` tuple; the
    function unpacks it and returns the resulting
    ``(results_rows, trade_rows)`` pair.
    """
    tf_name, minute_data, run_config = timeframe_info
    summary_rows, trade_rows = process_timeframe_data(
        minute_data, tf_name, run_config, debug=debug
    )
    return summary_rows, trade_rows
|
|
|
|
def aggregate_results(all_rows):
    """Aggregate result rows per (timeframe, stop_loss_pct) group.

    Args:
        all_rows: iterable of result-row dicts carrying 'timeframe',
            'stop_loss_pct', 'n_trades', 'n_stop_loss', 'win_rate',
            'max_drawdown', 'avg_trade' and 'profit_ratio'; 'initial_usd',
            'final_usd' and 'total_fees_usd' are optional.

    Returns:
        One summary dict per group, in first-seen group order. Trade and
        stop-loss counts are summed; every other metric is the mean over
        the group's rows.
    """
    from collections import defaultdict

    grouped = defaultdict(list)
    for row in all_rows:
        grouped[(row['timeframe'], row['stop_loss_pct'])].append(row)

    summary_rows = []
    for (rule, stop_loss_pct), rows in grouped.items():
        # Take the starting balance from the rows themselves. The function
        # used to read a module-level `initial_usd` that only exists when the
        # file runs as a script, which raised NameError on import; that global
        # is kept purely as a legacy fallback.
        group_initial_usd = rows[0].get('initial_usd')
        if group_initial_usd is None:
            group_initial_usd = globals().get('initial_usd')

        summary_rows.append({
            "timeframe": rule,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": sum(r['n_trades'] for r in rows),
            "n_stop_loss": sum(r['n_stop_loss'] for r in rows),
            "win_rate": np.mean([r['win_rate'] for r in rows]),
            "max_drawdown": np.mean([r['max_drawdown'] for r in rows]),
            "avg_trade": np.mean([r['avg_trade'] for r in rows]),
            "profit_ratio": np.mean([r['profit_ratio'] for r in rows]),
            "initial_usd": group_initial_usd,
            # A row without a final balance counts as "balance unchanged".
            "final_usd": np.mean([
                r.get('final_usd', r.get('initial_usd', group_initial_usd))
                for r in rows
            ]),
            # Missing/None fees count as zero instead of breaking np.mean.
            "total_fees_usd": np.mean([r.get('total_fees_usd') or 0.0 for r in rows]),
        })
    return summary_rows
|
|
|
|
def get_nearest_price(df, target_date):
    """Find the row whose index label lies closest to ``target_date``.

    Returns ``(nearest_time, price)`` where ``price`` is that row's 'close'
    value, or ``(None, None)`` when ``df`` has no rows.
    """
    if not len(df):
        return None, None

    wanted = pd.to_datetime(target_date)
    # Positional lookup of the closest index label
    position = df.index.get_indexer([wanted], method='nearest')[0]
    return df.index[position], df.iloc[position]['close']
|
|
|
|
if __name__ == "__main__":
    # Script entry point: load a JSON config, run one backtest per configured
    # timeframe, then write the summary rows and the individual trades.

    # NOTE: debug is hard-coded, so the sequential branch below always runs
    # and the process-pool branch is only reached when this flag is flipped.
    debug = True

    parser = argparse.ArgumentParser(description="Run backtest with config file.")
    parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
    args = parser.parse_args()

    # Use config_default.json as fallback if no config provided
    config_file = args.config or "configs/config_default.json"

    try:
        with open(config_file, 'r', encoding="utf-8") as f:
            config = json.load(f)
    except FileNotFoundError:
        print(f"Error: Config file '{config_file}' not found.")
        print("Available configs: configs/config_default.json, configs/config_bbrs.json, configs/config_combined.json")
        # SystemExit instead of the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist in programs.
        raise SystemExit(1) from None
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in config file '{config_file}': {e}")
        raise SystemExit(1) from None
    else:
        print(f"Using config: {config_file}")

    start_date = config['start_date']
    stop_date = config['stop_date']
    initial_usd = config['initial_usd']
    timeframes = config['timeframes']

    timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")

    storage = Storage(logging=logging)
    system_utils = SystemUtils(logging=logging)

    data_1min = storage.load_data('btcusd_1-min_data.csv', start_date, stop_date)

    # Only the prices are reported; the matched timestamps are not used.
    _, start_price = get_nearest_price(data_1min, start_date)
    _, stop_price = get_nearest_price(data_1min, stop_date)

    metadata_lines = [
        f"Start date\t{start_date}\tPrice\t{start_price}",
        f"Stop date\t{stop_date}\tPrice\t{stop_price}",
        f"Initial USD\t{initial_usd}",
    ]

    # One task per timeframe; every task shares the same data and config
    tasks = [(name, data_1min, config) for name in timeframes]

    all_results_rows = []
    all_trade_rows = []

    if debug:
        # Sequential execution in the current process
        for task in tasks:
            results, trades = process(task, debug)
            all_results_rows.extend(results)
            all_trade_rows.extend(trades)
    else:
        workers = system_utils.get_optimal_workers()

        with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(process, task, debug) for task in tasks]

            # Rows are collected in completion order, not submission order
            for future in concurrent.futures.as_completed(futures):
                results, trades = future.result()
                all_results_rows.extend(results)
                all_trade_rows.extend(trades)

    backtest_filename = f"{timestamp}_backtest.csv"
    backtest_fieldnames = [
        "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
        "max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd",
    ]
    storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)

    trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
    storage.write_trades(all_trade_rows, trades_fieldnames)
|
|
|
|
|