# Cycles/main.py
import pandas as pd
import numpy as np
import logging
import concurrent.futures
import os
import datetime
import argparse
import json
2025-05-20 16:59:17 +08:00
from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest
from cycles.Analysis.supertrend import Supertrends
from cycles.charts import BacktestCharts
from cycles.Analysis.strategies import Strategy
from cycles.strategies import StrategyManager, create_strategy_manager
2025-05-06 15:24:36 +08:00
# Root logger setup: INFO and above is written both to "backtest.log"
# and to the console stream.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    handlers=[
        logging.FileHandler("backtest.log"),
        logging.StreamHandler(),
    ],
)
def strategy_manager_init(backtester: Backtest):
    """No-op initialization hook handed to the Backtest constructor.

    The strategy manager is initialized separately through its own
    ``initialize()`` call, so this hook intentionally does nothing.
    """
    return None
def strategy_manager_entry(backtester: Backtest, df_index: int):
    """Return the entry signal from the backtester's attached strategy manager.

    Delegates to ``backtester.strategy_manager.get_entry_signal`` for the
    row position ``df_index``.
    """
    manager = backtester.strategy_manager
    return manager.get_entry_signal(backtester, df_index)
def strategy_manager_exit(backtester: Backtest, df_index: int):
    """Return the exit signal from the backtester's attached strategy manager.

    Delegates to ``backtester.strategy_manager.get_exit_signal`` for the
    row position ``df_index``.
    """
    manager = backtester.strategy_manager
    return manager.get_exit_signal(backtester, df_index)
def _summarize_trades(trades, n_trades, initial_usd):
    """Compute the aggregate metrics reported for one backtest run.

    Args:
        trades: List of trade dicts; each must carry 'entry', 'exit' and
            'profit_pct', and may carry 'fee_usd' and 'type'.
        n_trades: Trade count as reported by the backtester (used as the
            denominator for win rate and average trade).
        initial_usd: Starting capital used to compound ``final_usd``.

    Returns:
        Dict with n_stop_loss, win_rate, max_drawdown, avg_trade,
        total_profit, total_loss, profit_ratio, final_usd, total_fees_usd.
    """
    n_winning_trades = sum(
        1 for t in trades if t['exit'] is not None and t['exit'] > t['entry']
    )
    # total_profit is the net sum of all profit_pct values (losses included);
    # total_loss is the positive magnitude of the losing trades only.
    total_profit = sum(t['profit_pct'] for t in trades)
    total_loss = sum(-t['profit_pct'] for t in trades if t['profit_pct'] < 0)

    # Single pass: track the running (additive) profit curve for drawdown
    # and compound the capital for final_usd.
    cumulative_profit = 0
    peak = 0
    max_drawdown = 0
    final_usd = initial_usd
    for t in trades:
        cumulative_profit += t['profit_pct']
        peak = max(peak, cumulative_profit)
        max_drawdown = max(max_drawdown, peak - cumulative_profit)
        final_usd *= (1 + t['profit_pct'])

    return {
        "n_stop_loss": sum(1 for t in trades if t.get('type') == 'STOP_LOSS'),
        "win_rate": n_winning_trades / n_trades if n_trades > 0 else 0,
        "max_drawdown": max_drawdown,
        "avg_trade": total_profit / n_trades if n_trades > 0 else 0,
        "total_profit": total_profit,
        "total_loss": total_loss,
        # No losing trades -> ratio is reported as infinity.
        "profit_ratio": total_profit / total_loss if total_loss > 0 else float('inf'),
        "final_usd": final_usd,
        "total_fees_usd": sum(t.get('fee_usd', 0.0) for t in trades),
    }


def _plot_debug_charts(backtester, working_df, results):
    """Best-effort plotting after a debug run; failures are reported, not raised."""
    try:
        processed_data = getattr(backtester, 'processed_data', None)
        if processed_data is not None and not processed_data.empty:
            # Format strategy data with actual executed trades, then plot.
            formatted_data = BacktestCharts.format_strategy_data_with_trades(
                processed_data, results
            )
            BacktestCharts.plot_data(formatted_data)
        elif "meta_trend" in backtester.strategies:
            # Fallback: plot the meta_trend data against the working dataframe.
            BacktestCharts.plot(working_df, backtester.strategies["meta_trend"])
        else:
            print("No plotting data available")
    except Exception as e:
        # Plotting is a debugging aid only; never let it abort the backtest.
        print(f"Plotting failed: {e}")


def process_timeframe_data(data_1min, timeframe, config, debug=False):
    """Run one backtest for ``timeframe`` using the Strategy Manager.

    Args:
        data_1min: 1-minute source data passed to the strategies.
        timeframe: Timeframe name used to label the output rows.
        config: Dict providing 'initial_usd', 'strategies' and
            'combination_rules'.
        debug: Forwarded to ``Backtest.run``; when true, charts are also
            plotted after the run.

    Returns:
        Tuple ``(results_rows, trade_rows)``: a one-element list with the
        summary row and a list with one row per trade. Both lists are empty
        when no strategy is configured.
    """
    results_rows = []
    trade_rows = []

    initial_usd = config['initial_usd']

    # Guard on the actual strategy list: a wrapper dict with two keys is
    # always truthy, so checking the wrapper can never detect missing config.
    strategies_config = config.get('strategies')
    if not strategies_config:
        logging.error("No strategy configuration provided")
        return results_rows, trade_rows

    strategy_config = {
        "strategies": strategies_config,
        "combination_rules": config['combination_rules'],
    }
    strategy_manager = create_strategy_manager(strategy_config)
    if not strategy_manager.strategies:
        logging.error("Strategy manager was created without any strategies")
        return results_rows, trade_rows

    # The first strategy determines the backtester's working timeframe.
    primary_strategy = strategy_manager.strategies[0]
    primary_timeframe = primary_strategy.get_timeframes()[0]

    if primary_strategy.name == "bbrs":
        # BBRS works on the 1-minute data directly, so the backtester
        # iterates over a copy of the 1-minute frame.
        working_df = data_1min.copy()
    else:
        # Other strategies resample the 1-minute data to their own timeframe.
        primary_strategy._resample_data(data_1min)
        working_df = primary_strategy.get_primary_timeframe_data()

    # The backtester expects the time axis as a 'timestamp' column;
    # reset_index() names an unnamed index 'index', so rename it.
    working_df_for_backtest = working_df.reset_index()
    if 'index' in working_df_for_backtest.columns:
        working_df_for_backtest = working_df_for_backtest.rename(
            columns={'index': 'timestamp'}
        )

    backtester = Backtest(
        initial_usd, working_df_for_backtest, working_df_for_backtest,
        strategy_manager_init,
    )
    # Keep the original 1-minute data available for strategy processing.
    backtester.original_df = data_1min
    backtester.strategy_manager = strategy_manager
    strategy_manager.initialize(backtester)

    results = backtester.run(
        strategy_manager_entry,
        strategy_manager_exit,
        debug
    )

    n_trades = results["n_trades"]
    trades = results.get('trades', [])
    metrics = _summarize_trades(trades, n_trades, initial_usd)

    # Only the first strategy's stop loss is reported; in multi-strategy
    # setups other strategies may use different values.
    stop_loss_pct = primary_strategy.params.get("stop_loss_pct", "N/A")
    # Label shows the requested timeframe and the one actually used.
    timeframe_label = f"{timeframe}({primary_timeframe})"

    results_rows.append({
        "timeframe": timeframe_label,
        "stop_loss_pct": stop_loss_pct,
        "n_trades": n_trades,
        "n_stop_loss": metrics["n_stop_loss"],
        "win_rate": metrics["win_rate"],
        "max_drawdown": metrics["max_drawdown"],
        "avg_trade": metrics["avg_trade"],
        "total_profit": metrics["total_profit"],
        "total_loss": metrics["total_loss"],
        "profit_ratio": metrics["profit_ratio"],
        "initial_usd": initial_usd,
        "final_usd": metrics["final_usd"],
        "total_fees_usd": metrics["total_fees_usd"],
    })

    trade_rows.extend(
        {
            "timeframe": timeframe_label,
            "stop_loss_pct": stop_loss_pct,
            "entry_time": trade.get("entry_time"),
            "exit_time": trade.get("exit_time"),
            "entry_price": trade.get("entry"),
            "exit_price": trade.get("exit"),
            "profit_pct": trade.get("profit_pct"),
            "type": trade.get("type"),
            "fee_usd": trade.get("fee_usd"),
        }
        for trade in trades
    )

    strategy_summary = strategy_manager.get_strategy_summary()
    logging.info(
        "Timeframe: %s, Stop Loss: %s, Trades: %s, Strategies: %s",
        timeframe_label,
        stop_loss_pct,
        n_trades,
        [s['name'] for s in strategy_summary['strategies']],
    )

    if debug:
        _plot_debug_charts(backtester, working_df, results)

    return results_rows, trade_rows
2025-05-06 15:24:36 +08:00
def process(timeframe_info, debug=False):
    """Unpack one ``(timeframe, data_1min, config)`` task and backtest it.

    Returns the ``(results_rows, trade_rows)`` pair produced by
    ``process_timeframe_data``.
    """
    tf_name, minute_data, run_config = timeframe_info
    summary_rows, per_trade_rows = process_timeframe_data(
        minute_data, tf_name, run_config, debug=debug
    )
    return summary_rows, per_trade_rows
def aggregate_results(all_rows, initial_usd=None):
    """Aggregate result rows per (timeframe, stop_loss_pct) pair.

    Trade and stop-loss counts are summed; win rate, max drawdown, average
    trade, profit ratio, final USD and fees are averaged over the group.

    Args:
        all_rows: Iterable of result dicts with the keys 'timeframe',
            'stop_loss_pct', 'n_trades', 'n_stop_loss', 'win_rate',
            'max_drawdown', 'avg_trade' and 'profit_ratio'; 'initial_usd',
            'final_usd' and 'total_fees_usd' are optional.
        initial_usd: Fallback starting capital for rows that do not carry
            their own 'initial_usd'. The rows' own values take precedence,
            so the function no longer depends on a module-level global.

    Returns:
        List of summary dicts, one per group, in first-seen group order.
    """
    from collections import defaultdict

    grouped = defaultdict(list)
    for row in all_rows:
        grouped[(row['timeframe'], row['stop_loss_pct'])].append(row)

    summary_rows = []
    for (rule, stop_loss_pct), rows in grouped.items():
        group_initial_usd = rows[0].get('initial_usd', initial_usd)
        summary_rows.append({
            "timeframe": rule,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": sum(r['n_trades'] for r in rows),
            "n_stop_loss": sum(r['n_stop_loss'] for r in rows),
            "win_rate": np.mean([r['win_rate'] for r in rows]),
            "max_drawdown": np.mean([r['max_drawdown'] for r in rows]),
            "avg_trade": np.mean([r['avg_trade'] for r in rows]),
            "profit_ratio": np.mean([r['profit_ratio'] for r in rows]),
            "initial_usd": group_initial_usd,
            # A row without 'final_usd' counts as unchanged capital.
            "final_usd": np.mean([
                r.get('final_usd', r.get('initial_usd', initial_usd))
                for r in rows
            ]),
            # Missing fees count as zero instead of feeding None to np.mean.
            "total_fees_usd": np.mean([r.get('total_fees_usd', 0.0) for r in rows]),
        })
    return summary_rows
2025-05-20 16:59:17 +08:00
def get_nearest_price(df, target_date):
    """Look up the 'close' value of the row whose index is nearest to a date.

    Args:
        df: DataFrame indexed by timestamps, with a 'close' column.
        target_date: Anything ``pd.to_datetime`` accepts.

    Returns:
        ``(nearest_time, price)``, or ``(None, None)`` when ``df`` has no rows.
    """
    if not len(df):
        return None, None

    wanted = pd.to_datetime(target_date)
    # get_indexer returns one position per requested label; we ask for one.
    positions = df.index.get_indexer([wanted], method='nearest')
    pos = positions[0]
    matched_row = df.iloc[pos]
    return df.index[pos], matched_row['close']
if __name__ == "__main__":
    # Script entry point: load the JSON config, run one backtest per
    # configured timeframe, then write the summary and per-trade CSV files.
    parser = argparse.ArgumentParser(description="Run backtest with config file.")
    parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
    parser.add_argument(
        "--no-debug",
        action="store_true",
        help="Disable debug mode (sequential runs with plotting) and run "
             "the timeframes in parallel worker processes instead.",
    )
    args = parser.parse_args()

    # Debug mode stays the default, as it was when the flag was hard-coded.
    debug = not args.no_debug

    # Use config_default.json as fallback if no config provided
    config_file = args.config or "configs/config_default.json"

    try:
        with open(config_file, 'r', encoding="utf-8") as f:
            config = json.load(f)
        print(f"Using config: {config_file}")
    except FileNotFoundError:
        print(f"Error: Config file '{config_file}' not found.")
        print("Available configs: configs/config_default.json, configs/config_bbrs.json, configs/config_combined.json")
        # SystemExit is used directly: the exit() helper is injected by the
        # site module and is not guaranteed to exist in every runtime.
        raise SystemExit(1)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in config file '{config_file}': {e}")
        raise SystemExit(1)

    start_date = config['start_date']
    stop_date = config['stop_date']
    initial_usd = config['initial_usd']
    timeframes = config['timeframes']

    timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")

    storage = Storage(logging=logging)
    system_utils = SystemUtils(logging=logging)

    data_1min = storage.load_data('btcusd_1-min_data.csv', start_date, stop_date)

    # Prices nearest to the requested range bounds, recorded in the metadata.
    nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
    nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)

    metadata_lines = [
        f"Start date\t{start_date}\tPrice\t{start_price}",
        f"Stop date\t{stop_date}\tPrice\t{stop_price}",
        f"Initial USD\t{initial_usd}"
    ]

    # Create tasks for each timeframe
    tasks = [
        (name, data_1min, config)
        for name in timeframes
    ]

    all_results_rows = []
    all_trade_rows = []

    if debug:
        # Sequential execution in this process, one task at a time.
        for task in tasks:
            results, trades = process(task, debug)
            all_results_rows.extend(results)
            all_trade_rows.extend(trades)
    else:
        workers = system_utils.get_optimal_workers()
        with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(process, task, debug) for task in tasks]
            # Rows are collected in completion order, not submission order.
            for future in concurrent.futures.as_completed(futures):
                results, trades = future.result()
                all_results_rows.extend(results)
                all_trade_rows.extend(trades)

    backtest_filename = f"{timestamp}_backtest.csv"
    # NOTE(review): result rows also carry 'initial_usd', 'total_profit' and
    # 'total_loss', which are not listed here -- confirm that
    # Storage.write_backtest_results tolerates the extra keys.
    backtest_fieldnames = [
        "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
        "max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd"
    ]
    storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)

    # NOTE(review): trade rows also carry 'timeframe' and 'stop_loss_pct' --
    # confirm that Storage.write_trades handles them as intended.
    trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
    storage.write_trades(all_trade_rows, trades_fieldnames)