Cycles/main.py
2025-05-28 02:50:40 +08:00

302 lines
11 KiB
Python

import pandas as pd
import numpy as np
import logging
import concurrent.futures
import os
import datetime
import argparse
import json
from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("backtest.log"),
logging.StreamHandler()
]
)
def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
"""Process the entire timeframe with all stop loss values (no monthly split)"""
df = df.copy().reset_index(drop=True)
results_rows = []
trade_rows = []
for stop_loss_pct in stop_loss_pcts:
results = Backtest.run(
min1_df,
df,
initial_usd=initial_usd,
stop_loss_pct=stop_loss_pct,
debug=debug
)
n_trades = results["n_trades"]
trades = results.get('trades', [])
wins = [1 for t in trades if t['exit'] is not None and t['exit'] > t['entry']]
n_winning_trades = len(wins)
total_profit = sum(trade['profit_pct'] for trade in trades)
total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0)
win_rate = n_winning_trades / n_trades if n_trades > 0 else 0
avg_trade = total_profit / n_trades if n_trades > 0 else 0
profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf')
cumulative_profit = 0
max_drawdown = 0
peak = 0
for trade in trades:
cumulative_profit += trade['profit_pct']
if cumulative_profit > peak:
peak = cumulative_profit
drawdown = peak - cumulative_profit
if drawdown > max_drawdown:
max_drawdown = drawdown
final_usd = initial_usd
for trade in trades:
final_usd *= (1 + trade['profit_pct'])
total_fees_usd = sum(trade['fee_usd'] for trade in trades)
row = {
"timeframe": rule_name,
"stop_loss_pct": stop_loss_pct,
"n_trades": n_trades,
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'),
"win_rate": win_rate,
"max_drawdown": max_drawdown,
"avg_trade": avg_trade,
"total_profit": total_profit,
"total_loss": total_loss,
"profit_ratio": profit_ratio,
"initial_usd": initial_usd,
"final_usd": final_usd,
"total_fees_usd": total_fees_usd,
}
results_rows.append(row)
for trade in trades:
trade_rows.append({
"timeframe": rule_name,
"stop_loss_pct": stop_loss_pct,
"entry_time": trade.get("entry_time"),
"exit_time": trade.get("exit_time"),
"entry_price": trade.get("entry"),
"exit_price": trade.get("exit"),
"profit_pct": trade.get("profit_pct"),
"type": trade.get("type"),
"fee_usd": trade.get("fee_usd"),
})
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
if debug:
for trade in trades:
if trade['type'] == 'STOP':
print(trade)
for trade in trades:
if trade['profit_pct'] < -0.09: # or whatever is close to -0.10
print("Large loss trade:", trade)
return results_rows, trade_rows
def process(timeframe_info, debug=False):
from cycles.utils.storage import Storage # import inside function for safety
storage = Storage(logging=None) # or pass a logger if you want, but None is safest for multiprocessing
rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
if rule == "1T" or rule == "1min":
df = data_1min.copy()
else:
df = data_1min.resample(rule).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}).dropna()
df = df.reset_index()
results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug)
if all_trade_rows:
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
# Prepare header
summary_fields = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "final_usd"]
summary_row = results_rows[0]
header_line = "\t".join(summary_fields) + "\n"
value_line = "\t".join(str(summary_row.get(f, "")) for f in summary_fields) + "\n"
# File name
tf = summary_row["timeframe"]
sl = summary_row["stop_loss_pct"]
sl_percent = int(round(sl * 100))
trades_filename = os.path.join(storage.results_dir, f"trades_{tf}_ST{sl_percent}pct.csv")
# Write header
with open(trades_filename, "w") as f:
f.write(header_line)
f.write(value_line)
# Now write trades (append mode, skip header)
with open(trades_filename, "a", newline="") as f:
import csv
writer = csv.DictWriter(f, fieldnames=trades_fieldnames)
writer.writeheader()
for trade in all_trade_rows:
writer.writerow({k: trade.get(k, "") for k in trades_fieldnames})
return results_rows, all_trade_rows
def aggregate_results(all_rows):
"""Aggregate results per stop_loss_pct and per rule (timeframe)"""
from collections import defaultdict
grouped = defaultdict(list)
for row in all_rows:
key = (row['timeframe'], row['stop_loss_pct'])
grouped[key].append(row)
summary_rows = []
for (rule, stop_loss_pct), rows in grouped.items():
total_trades = sum(r['n_trades'] for r in rows)
total_stop_loss = sum(r['n_stop_loss'] for r in rows)
avg_win_rate = np.mean([r['win_rate'] for r in rows])
avg_max_drawdown = np.mean([r['max_drawdown'] for r in rows])
avg_avg_trade = np.mean([r['avg_trade'] for r in rows])
avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows])
# Calculate final USD
final_usd = np.mean([r.get('final_usd', initial_usd) for r in rows])
total_fees_usd = np.mean([r.get('total_fees_usd') for r in rows])
summary_rows.append({
"timeframe": rule,
"stop_loss_pct": stop_loss_pct,
"n_trades": total_trades,
"n_stop_loss": total_stop_loss,
"win_rate": avg_win_rate,
"max_drawdown": avg_max_drawdown,
"avg_trade": avg_avg_trade,
"profit_ratio": avg_profit_ratio,
"initial_usd": initial_usd,
"final_usd": final_usd,
"total_fees_usd": total_fees_usd,
})
return summary_rows
def get_nearest_price(df, target_date):
if len(df) == 0:
return None, None
target_ts = pd.to_datetime(target_date)
nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0]
nearest_time = df.index[nearest_idx]
price = df.iloc[nearest_idx]['close']
return nearest_time, price
if __name__ == "__main__":
debug = False
parser = argparse.ArgumentParser(description="Run backtest with config file.")
parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
args = parser.parse_args()
# Default values (from config.json)
default_config = {
"start_date": "2025-05-01",
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
"initial_usd": 10000,
"timeframes": ["1D", "6h", "3h", "1h", "30m", "15m", "5m", "1m"],
"stop_loss_pcts": [0.01, 0.02, 0.03, 0.05],
}
if args.config:
with open(args.config, 'r') as f:
config = json.load(f)
else:
print("No config file provided. Please enter the following values (press Enter to use default):")
start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
stop_date = input(f"Stop date [{default_config['stop_date']}]: ") or default_config['stop_date']
initial_usd_str = input(f"Initial USD [{default_config['initial_usd']}]: ") or str(default_config['initial_usd'])
initial_usd = float(initial_usd_str)
timeframes_str = input(f"Timeframes (comma separated) [{', '.join(default_config['timeframes'])}]: ") or ','.join(default_config['timeframes'])
timeframes = [tf.strip() for tf in timeframes_str.split(',') if tf.strip()]
stop_loss_pcts_str = input(f"Stop loss pcts (comma separated) [{', '.join(str(x) for x in default_config['stop_loss_pcts'])}]: ") or ','.join(str(x) for x in default_config['stop_loss_pcts'])
stop_loss_pcts = [float(x.strip()) for x in stop_loss_pcts_str.split(',') if x.strip()]
config = {
'start_date': start_date,
'stop_date': stop_date,
'initial_usd': initial_usd,
'timeframes': timeframes,
'stop_loss_pcts': stop_loss_pcts,
}
# Use config values
start_date = config['start_date']
stop_date = config['stop_date']
initial_usd = config['initial_usd']
timeframes = config['timeframes']
stop_loss_pcts = config['stop_loss_pcts']
timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")
storage = Storage(logging=logging)
system_utils = SystemUtils(logging=logging)
data_1min = storage.load_data('btcusd_1-min_data.csv', start_date, stop_date)
nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)
metadata_lines = [
f"Start date\t{start_date}\tPrice\t{start_price}",
f"Stop date\t{stop_date}\tPrice\t{stop_price}",
f"Initial USD\t{initial_usd}"
]
tasks = [
(name, data_1min, stop_loss_pct, initial_usd)
for name in timeframes
for stop_loss_pct in stop_loss_pcts
]
workers = system_utils.get_optimal_workers()
if debug:
all_results_rows = []
all_trade_rows = []
for task in tasks:
results, trades = process(task, debug)
if results or trades:
all_results_rows.extend(results)
all_trade_rows.extend(trades)
else:
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(process, task, debug): task for task in tasks}
all_results_rows = []
all_trade_rows = []
for future in concurrent.futures.as_completed(futures):
results, trades = future.result()
if results or trades:
all_results_rows.extend(results)
all_trade_rows.extend(trades)
backtest_filename = os.path.join(f"{timestamp}_backtest.csv")
backtest_fieldnames = [
"timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
"max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd"
]
storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)