Cycles/main.py
Vasily.onl 5d0b707bc6 Implement BBRS strategy with multi-timeframe support and enhance strategy manager
- Added BBRS strategy implementation, incorporating Bollinger Bands and RSI for trading signals.
- Introduced multi-timeframe analysis support, allowing strategies to handle internal resampling.
- Enhanced StrategyManager to log strategy initialization and unique timeframes in use.
- Updated DefaultStrategy to support flexible timeframe configurations and improved stop-loss execution.
- Improved plotting logic in BacktestCharts for better visualization of strategy outputs and trades.
- Refactored strategy base class to facilitate resampling and data handling across different timeframes.
2025-05-23 16:56:53 +08:00

600 lines
25 KiB
Python

import pandas as pd
import numpy as np
import logging
import concurrent.futures
import os
import datetime
import argparse
import json
from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest
from cycles.Analysis.supertrend import Supertrends
from cycles.charts import BacktestCharts
from cycles.Analysis.strategies import Strategy
from cycles.strategies import StrategyManager, create_strategy_manager
# Module-wide logging setup: every record at INFO level or above is written
# both to the file "backtest.log" and to the console stream.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("backtest.log"),
        logging.StreamHandler(),
    ],
)
def default_init_strategy(backtester: Backtest):
    """Compute the meta trend and store it on the backtester.

    Runs the Supertrend indicators over ``backtester.df`` and compares the
    first three resulting trend arrays element by element.  Where all three
    agree, the shared value is kept; everywhere else the meta trend is 0.
    The result is written to ``backtester.strategies["meta_trend"]``.
    """
    indicator_results = Supertrends(backtester.df, verbose=False).calculate_supertrend_indicators()

    # One column per Supertrend configuration, one row per candle.
    trend_matrix = np.stack(
        [item['results']['trend'] for item in indicator_results],
        axis=1,
    )
    first = trend_matrix[:, 0]
    second = trend_matrix[:, 1]
    third = trend_matrix[:, 2]

    unanimous = (first == second) & (second == third)
    backtester.strategies["meta_trend"] = np.where(unanimous, first, 0)
def bbrs_init_strategy(backtester: Backtest):
    """Seed the BBRS signal slots with all-False placeholders.

    Creates one boolean Series per signal kind, indexed 0..len(df)-1, stores
    them under ``"buy_signals"`` and ``"sell_signals"`` in
    ``backtester.strategies`` and returns the same backtester.
    """
    row_count = len(backtester.df)
    for signal_key in ("buy_signals", "sell_signals"):
        backtester.strategies[signal_key] = pd.Series(False, index=range(row_count))
    return backtester
def run_bbrs_strategy_processing(backtester: Backtest, original_df):
    """Run the BBRS strategy and publish its signals on the backtester.

    The strategy is executed on ``original_df`` with a fixed configuration.
    Its output frame is kept on ``backtester.processed_data`` (used later for
    plotting).  The ``BuySignal`` / ``SellSignal`` columns are forward-filled
    onto the index of ``original_df``, converted to a positional 0..n-1 index
    and stored as ``backtester.strategies["buy_signals"]`` and
    ``backtester.strategies["sell_signals"]``, truncated or padded with False
    so that they have exactly ``len(backtester.df)`` entries.
    """
    config_strategy = {
        "bb_width": 0.05,
        "bb_period": 20,
        "rsi_period": 14,
        "trending": {
            "rsi_threshold": [30, 70],
            "bb_std_dev_multiplier": 2.5,
        },
        "sideways": {
            "rsi_threshold": [40, 60],
            "bb_std_dev_multiplier": 1.8,
        },
        # Alternative strategy name seen in the original comment: "CryptoTradingStrategy"
        "strategy_name": "MarketRegimeStrategy",
        "SqueezeStrategy": True
    }

    strategy = Strategy(config=config_strategy, logging=logging)
    processed_data = strategy.run(original_df, config_strategy["strategy_name"])
    print(f"processed_data: {processed_data.head()}")

    # Kept on the backtester so the plotting code can pick it up later.
    backtester.processed_data = processed_data

    target_length = len(backtester.df)

    if processed_data.empty:
        # Strategy produced nothing: fall back to all-False signals.
        buy_condition = pd.Series(False, index=range(target_length))
        sell_condition = pd.Series(False, index=range(target_length))
    else:
        minute_index = original_df.index

        # Forward-fill each strategy-timeframe signal onto the timestamps of
        # the original frame; timestamps before the first signal become False.
        aligned = []
        for column in ('BuySignal', 'SellSignal'):
            fallback = pd.Series(False, index=processed_data.index)
            raw_signal = processed_data.get(column, fallback).astype(bool)
            aligned.append(raw_signal.reindex(minute_index, method='ffill').fillna(False))
        buy_aligned, sell_aligned = aligned

        # The backtest frame is addressed by position, so drop the timestamps.
        buy_condition = pd.Series(buy_aligned.values, index=range(len(buy_aligned)))
        sell_condition = pd.Series(sell_aligned.values, index=range(len(sell_aligned)))

        # Safety net: force both series to the backtest frame's length.
        if len(buy_condition) != target_length:
            if len(buy_condition) > target_length:
                buy_condition = buy_condition[:target_length]
                sell_condition = sell_condition[:target_length]
            else:
                padded_buy = np.pad(
                    buy_condition.values,
                    (0, target_length - len(buy_condition)),
                    constant_values=False,
                )
                padded_sell = np.pad(
                    sell_condition.values,
                    (0, target_length - len(sell_condition)),
                    constant_values=False,
                )
                buy_condition = pd.Series(padded_buy, index=range(target_length))
                sell_condition = pd.Series(padded_sell, index=range(target_length))

    backtester.strategies["buy_signals"] = buy_condition
    backtester.strategies["sell_signals"] = sell_condition
    print(f"buy_signals length: {len(backtester.strategies['buy_signals'])}, backtest df length: {len(backtester.df)}")
def bbrs_entry_strategy(backtester: Backtest, df_index):
    """BBRS entry rule.

    Returns the value of the stored buy-signal series at position
    ``df_index``; a truthy value means "enter".
    """
    buy_signals = backtester.strategies["buy_signals"]
    return buy_signals.iloc[df_index]
def bbrs_exit_strategy(backtester: Backtest, df_index):
    """BBRS exit rule.

    Returns a ``(reason, price)`` tuple:

    * ``("SELL_SIGNAL", close)`` when the sell signal at ``df_index`` is set,
      using the close of that row of ``backtester.df``;
    * ``("STOP_LOSS", price)`` when the BBRS stop-loss check triggers; the
      start index recorded by that check is also copied into
      ``backtester.strategies["current_trade_min1_start_idx"]``;
    * ``(None, None)`` when neither condition holds.
    """
    strategies = backtester.strategies

    if strategies["sell_signals"].iloc[df_index]:
        close_price = backtester.df.iloc[df_index]['close']
        return "SELL_SIGNAL", close_price

    # No sell signal: fall back to the BBRS-specific stop-loss check.
    triggered, sell_price = bbrs_stop_loss_strategy(backtester)
    if not triggered:
        return None, None

    strategies["current_trade_min1_start_idx"] = backtester.current_trade_min1_start_idx
    return "STOP_LOSS", sell_price
def bbrs_stop_loss_strategy(backtester: Backtest):
    """BBRS stop-loss check with a fixed 5% loss threshold.

    Looks at the 1-minute candles between the trade's entry time and the
    backtester's current date and finds the first one whose low is at or
    below ``entry_price * 0.95``.  If that candle already opened below the
    stop price the open is used as the sell price, otherwise the stop price.

    Side effects: records the first and last candle timestamps of the
    inspected window on ``backtester.current_trade_min1_start_idx`` and
    ``backtester.current_min1_end_idx``.

    Returns ``(True, sell_price)`` when the stop was hit and
    ``(False, None)`` otherwise.
    """
    fixed_stop_pct = 0.05
    stop_price = backtester.entry_price * (1 - fixed_stop_pct)

    # Prefer the untouched datetime-indexed frame when the backtester has one.
    if hasattr(backtester, 'original_df'):
        candles = backtester.original_df
    else:
        candles = backtester.min1_df
    timestamps = candles.index

    at_or_after_entry = timestamps[timestamps >= backtester.entry_time]
    if len(at_or_after_entry) == 0:
        return False, None
    backtester.current_trade_min1_start_idx = at_or_after_entry[0]

    up_to_now = timestamps[timestamps <= backtester.current_date]
    if len(up_to_now) == 0:
        print("Warning: no end candidate here. Need to be checked")
        return False, None
    backtester.current_min1_end_idx = up_to_now[-1]

    window = candles.loc[backtester.current_trade_min1_start_idx:backtester.current_min1_end_idx]
    breaches = window[window['low'] <= stop_price]
    if breaches.empty:
        return False, None

    first_breach = breaches.iloc[0]
    # A candle that opened below the stop cannot be sold at the stop price.
    sell_price = first_breach['open'] if first_breach['open'] < stop_price else stop_price
    return True, sell_price
def default_entry_strategy(backtester: Backtest, df_index):
    """Default entry rule.

    Signals an entry when the meta trend switches to 1 at ``df_index``,
    i.e. the previous value was not 1 and the current value is 1.
    """
    meta_trend = backtester.strategies["meta_trend"]
    previous_not_up = meta_trend[df_index - 1] != 1
    # `and` keeps the short-circuit: the current value is only read when the
    # previous one was not already 1.
    return previous_not_up and meta_trend[df_index] == 1
def stop_loss_strategy(backtester: Backtest):
    """Check whether the open trade hit its stop loss on the 1-minute candles.

    The stop price is ``entry_price * (1 - stop_loss_pct)`` where the
    percentage is read from ``backtester.strategies["stop_loss_pct"]``.  The
    1-minute candles of ``backtester.min1_df`` between the entry time and
    the backtester's current date are scanned for the first candle whose low
    is at or below the stop price.  If that candle opened below the stop
    price (price gapped through the stop), the open is used as the sell
    price; otherwise the stop price itself is used.

    Side effects: stores the first and last candle timestamps of the scanned
    window on ``backtester.current_trade_min1_start_idx`` and
    ``backtester.current_min1_end_idx``.

    Returns:
        ``(True, sell_price)`` when the stop loss was hit, ``(False, None)``
        when it was not or when no candle exists for the requested window.
    """
    stop_price = backtester.entry_price * (1 - backtester.strategies["stop_loss_pct"])
    min1_df = backtester.min1_df
    min1_index = min1_df.index

    start_candidates = min1_index[min1_index >= backtester.entry_time]
    if len(start_candidates) == 0:
        # No candle at or after the entry time: nothing to check.  Without
        # this guard the indexing below raises IndexError.
        return False, None
    backtester.current_trade_min1_start_idx = start_candidates[0]

    end_candidates = min1_index[min1_index <= backtester.current_date]
    if len(end_candidates) == 0:
        print("Warning: no end candidate here. Need to be checked")
        return False, None
    backtester.current_min1_end_idx = end_candidates[-1]

    min1_slice = min1_df.loc[backtester.current_trade_min1_start_idx:backtester.current_min1_end_idx]

    # Evaluate the condition once and reuse the filtered frame.
    breached = min1_slice[min1_slice['low'] <= stop_price]
    if breached.empty:
        return False, None

    stop_candle = breached.iloc[0]
    if stop_candle['open'] < stop_price:
        # The candle gapped below the stop, so the stop price was never tradable.
        return True, stop_candle['open']
    return True, stop_price
def default_exit_strategy(backtester: Backtest, df_index):
    """Default exit rule.

    Returns a ``(reason, price)`` tuple:

    * ``("META_TREND_EXIT_SIGNAL", None)`` when the previous meta-trend value
      is not 1 and the current one is -1;
    * ``("STOP_LOSS", price)`` when the stop-loss check triggers; in that
      case the timestamp of the last 1-minute candle at or before the
      current date is stored in
      ``backtester.strategies["current_trade_min1_start_idx"]``;
    * ``(None, None)`` otherwise.

    NOTE(review): the meta-trend condition tests ``previous != 1`` rather
    than ``previous != -1`` -- confirm this is the intended exit condition.
    """
    strategies = backtester.strategies
    meta_trend = strategies["meta_trend"]

    if meta_trend[df_index - 1] != 1 and meta_trend[df_index] == -1:
        return "META_TREND_EXIT_SIGNAL", None

    triggered, sell_price = stop_loss_strategy(backtester)
    if not triggered:
        return None, None

    candle_times = backtester.min1_df.index
    strategies["current_trade_min1_start_idx"] = candle_times[candle_times <= backtester.current_date][-1]
    return "STOP_LOSS", sell_price
def strategy_manager_init(backtester: Backtest):
    """Placeholder init hook handed to ``Backtest``.

    Intentionally does nothing: the real setup is performed by calling
    ``strategy_manager.initialize()`` after the backtester is constructed.
    """
    return None
def strategy_manager_entry(backtester: Backtest, df_index: int):
    """Entry hook: forwards to the attached strategy manager's ``get_entry_signal``."""
    manager = backtester.strategy_manager
    return manager.get_entry_signal(backtester, df_index)
def strategy_manager_exit(backtester: Backtest, df_index: int):
    """Exit hook: forwards to the attached strategy manager's ``get_exit_signal``."""
    manager = backtester.strategy_manager
    return manager.get_exit_signal(backtester, df_index)
def _build_strategy_manager(strategy_config, stop_loss_pct):
    """Create a strategy manager and make sure every strategy has a stop loss."""
    import copy

    if strategy_config:
        # Work on a private copy: the stop-loss injection below writes into
        # strategy.params, and if the manager keeps references to the
        # caller's dicts a shared config would carry the first run's value
        # into every later run.
        strategy_manager = create_strategy_manager(copy.deepcopy(strategy_config))
    else:
        # Default to a single default strategy for backward compatibility.
        strategy_manager = create_strategy_manager({
            "strategies": [
                {
                    "name": "default",
                    "weight": 1.0,
                    "params": {"stop_loss_pct": stop_loss_pct}
                }
            ],
            "combination_rules": {
                "entry": "any",
                "exit": "any",
                "min_confidence": 0.5
            }
        })

    # Inject stop_loss_pct into every strategy that does not define its own.
    for strategy in strategy_manager.strategies:
        if "stop_loss_pct" not in strategy.params:
            strategy.params["stop_loss_pct"] = stop_loss_pct
    return strategy_manager


def _summarize_trades(trades, n_trades, initial_usd):
    """Derive the aggregate performance figures of one backtest run."""
    n_winning_trades = sum(
        1 for t in trades if t['exit'] is not None and t['exit'] > t['entry']
    )
    total_profit = sum(trade['profit_pct'] for trade in trades)
    total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0)

    # Maximum drawdown of the cumulative (summed, not compounded) profit curve.
    cumulative_profit = 0
    peak = 0
    max_drawdown = 0
    for trade in trades:
        cumulative_profit += trade['profit_pct']
        peak = max(peak, cumulative_profit)
        max_drawdown = max(max_drawdown, peak - cumulative_profit)

    # Final balance compounds every trade's percentage result.
    final_usd = initial_usd
    for trade in trades:
        final_usd *= (1 + trade['profit_pct'])

    return {
        "n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'),
        "win_rate": n_winning_trades / n_trades if n_trades > 0 else 0,
        "max_drawdown": max_drawdown,
        "avg_trade": total_profit / n_trades if n_trades > 0 else 0,
        "total_profit": total_profit,
        "total_loss": total_loss,
        "profit_ratio": total_profit / total_loss if total_loss > 0 else float('inf'),
        "final_usd": final_usd,
        "total_fees_usd": sum(trade.get('fee_usd', 0.0) for trade in trades),
    }


def _plot_backtest(backtester, working_df, results):
    """Best-effort plot of one backtest run; failures are reported, not raised."""
    try:
        processed_data = getattr(backtester, 'processed_data', None)
        if processed_data is not None and not processed_data.empty:
            # Format strategy data with the executed trades for universal plotting.
            formatted_data = BacktestCharts.format_strategy_data_with_trades(processed_data, results)
            BacktestCharts.plot_data(formatted_data)
        elif "meta_trend" in backtester.strategies:
            # Fallback: plot the meta trend against the working dataframe.
            BacktestCharts.plot(working_df, backtester.strategies["meta_trend"])
        else:
            print("No plotting data available")
    except Exception as e:
        # Plotting is a debugging aid; it must never abort the backtest.
        print(f"Plotting failed: {e}")


def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, strategy_config=None, debug=False):
    """Run one backtest per stop-loss value using the Strategy Manager.

    Args:
        min1_df: 1-minute source data handed to the strategies and stored on
            the backtester as ``original_df``.
        df: Unused; kept so existing callers keep working.
        stop_loss_pcts: Iterable of stop-loss fractions, one backtest each.
        rule_name: Label used in the ``timeframe`` column of the output.
        initial_usd: Starting balance for every backtest.
        strategy_config: Optional strategy-manager configuration; when falsy
            a single "default" strategy is used.
        debug: Passed to ``Backtest.run``; when true each run is also plotted.

    Returns:
        ``(results_rows, trade_rows)``: one summary dict per stop-loss value
        and one dict per executed trade.
    """
    results_rows = []
    trade_rows = []

    for stop_loss_pct in stop_loss_pcts:
        strategy_manager = _build_strategy_manager(strategy_config, stop_loss_pct)

        # The first strategy decides which timeframe the backtester runs on.
        primary_strategy = strategy_manager.strategies[0]
        primary_timeframe = primary_strategy.get_timeframes()[0]

        if primary_strategy.name == "bbrs":
            # BBRS consumes 1-minute data directly and resamples internally.
            working_df = min1_df.copy()
        else:
            # Other strategies resample to their own primary timeframe.
            primary_strategy._resample_data(min1_df)
            working_df = primary_strategy.get_primary_timeframe_data()

        # The backtester expects the time axis as a 'timestamp' column.
        working_df_for_backtest = working_df.copy().reset_index()
        if 'index' in working_df_for_backtest.columns:
            working_df_for_backtest = working_df_for_backtest.rename(columns={'index': 'timestamp'})

        backtester = Backtest(initial_usd, working_df_for_backtest, working_df_for_backtest, strategy_manager_init)
        backtester.original_df = min1_df
        backtester.strategy_manager = strategy_manager
        strategy_manager.initialize(backtester)

        results = backtester.run(
            strategy_manager_entry,
            strategy_manager_exit,
            debug
        )

        n_trades = results["n_trades"]
        trades = results.get('trades', [])
        metrics = _summarize_trades(trades, n_trades, initial_usd)
        timeframe_label = f"{rule_name}({primary_timeframe})"  # shows the timeframe actually used

        results_rows.append({
            "timeframe": timeframe_label,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": n_trades,
            "n_stop_loss": metrics["n_stop_loss"],
            "win_rate": metrics["win_rate"],
            "max_drawdown": metrics["max_drawdown"],
            "avg_trade": metrics["avg_trade"],
            "total_profit": metrics["total_profit"],
            "total_loss": metrics["total_loss"],
            "profit_ratio": metrics["profit_ratio"],
            "initial_usd": initial_usd,
            "final_usd": metrics["final_usd"],
            "total_fees_usd": metrics["total_fees_usd"],
        })

        trade_rows.extend(
            {
                "timeframe": timeframe_label,
                "stop_loss_pct": stop_loss_pct,
                "entry_time": trade.get("entry_time"),
                "exit_time": trade.get("exit_time"),
                "entry_price": trade.get("entry"),
                "exit_price": trade.get("exit"),
                "profit_pct": trade.get("profit_pct"),
                "type": trade.get("type"),
                "fee_usd": trade.get("fee_usd"),
            }
            for trade in trades
        )

        strategy_summary = strategy_manager.get_strategy_summary()
        logging.info(f"Timeframe: {rule_name}({primary_timeframe}), Stop Loss: {stop_loss_pct}, "
                     f"Trades: {n_trades}, Strategies: {[s['name'] for s in strategy_summary['strategies']]}")

        if debug:
            # Plot after each backtest run.
            _plot_backtest(backtester, working_df, results)

    return results_rows, trade_rows
def process(timeframe_info, debug=False):
    """Run the backtest for one task tuple.

    ``timeframe_info`` is ``(rule, data_1min, stop_loss_pct, initial_usd,
    strategy_config)``.  The 1-minute data is passed through unchanged
    (strategies resample it themselves) and the single stop-loss value is
    wrapped in a list.  Returns ``(results_rows, trade_rows)``.
    """
    rule, data_1min, stop_loss_pct, initial_usd, strategy_config = timeframe_info
    outcome = process_timeframe_data(
        data_1min,
        data_1min,
        [stop_loss_pct],
        rule,
        initial_usd,
        strategy_config,
        debug=debug,
    )
    summary_rows, executed_trades = outcome
    return summary_rows, executed_trades
def aggregate_results(all_rows, initial_usd=None):
    """Aggregate result rows per (timeframe, stop_loss_pct) pair.

    Counts (``n_trades``, ``n_stop_loss``) are summed; all other metrics are
    averaged over the rows of the group.

    Args:
        all_rows: Iterable of result dicts as produced by
            ``process_timeframe_data``.
        initial_usd: Optional starting balance to report.  When omitted, the
            ``initial_usd`` stored in the group's first row is used, so the
            function does not depend on a module-level global.

    Returns:
        A list with one summary dict per group, in first-seen order.
    """
    from collections import defaultdict

    grouped = defaultdict(list)
    for row in all_rows:
        grouped[(row['timeframe'], row['stop_loss_pct'])].append(row)

    summary_rows = []
    for (rule, stop_loss_pct), rows in grouped.items():
        if initial_usd is not None:
            group_initial_usd = initial_usd
        else:
            group_initial_usd = rows[0].get('initial_usd')

        summary_rows.append({
            "timeframe": rule,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": sum(r['n_trades'] for r in rows),
            "n_stop_loss": sum(r['n_stop_loss'] for r in rows),
            "win_rate": np.mean([r['win_rate'] for r in rows]),
            "max_drawdown": np.mean([r['max_drawdown'] for r in rows]),
            "avg_trade": np.mean([r['avg_trade'] for r in rows]),
            "profit_ratio": np.mean([r['profit_ratio'] for r in rows]),
            "initial_usd": group_initial_usd,
            # A row without a final balance counts as "nothing changed".
            "final_usd": np.mean([
                r.get('final_usd', r.get('initial_usd', group_initial_usd)) for r in rows
            ]),
            # Missing fee information is treated as zero fees.
            "total_fees_usd": np.mean([r.get('total_fees_usd', 0.0) for r in rows]),
        })
    return summary_rows
def get_nearest_price(df, target_date):
    """Return the row closest in time to ``target_date``.

    ``target_date`` is parsed with ``pd.to_datetime`` and matched against the
    frame's index using nearest-neighbour lookup.

    Returns ``(timestamp, close)`` of the matched row, or ``(None, None)``
    when the frame has no rows.
    """
    if len(df) == 0:
        return None, None

    wanted = pd.to_datetime(target_date)
    position = df.index.get_indexer([wanted], method='nearest')[0]
    return df.index[position], df.iloc[position]['close']
if __name__ == "__main__":
    # Script entry point: load the configuration, run one backtest per
    # (timeframe, stop_loss_pct) pair and write the result and trade files.

    # With debug on, tasks run sequentially in this process and the flag is
    # forwarded to process(); with debug off a process pool is used and a
    # missing config file triggers the interactive prompts below.
    debug = True

    parser = argparse.ArgumentParser(description="Run backtest with config file.")
    parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
    args = parser.parse_args()

    # Default values (from config.json)
    default_config = {
        "start_date": "2025-03-01",
        "stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
        "initial_usd": 10000,
        "timeframes": ["15min"],
        "stop_loss_pcts": [0.03],
        "strategies": [
            {
                "name": "default",
                "weight": 1.0,
                "params": {}
            }
        ],
        "combination_rules": {
            "entry": "any",
            "exit": "any",
            "min_confidence": 0.5
        }
    }

    if args.config:
        with open(args.config, 'r', encoding='utf-8') as f:
            # Keys missing from the file fall back to the defaults instead of
            # raising KeyError further down.
            config = {**default_config, **json.load(f)}
    elif not debug:
        print("No config file provided. Please enter the following values (press Enter to use default):")

        start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
        stop_date = input(f"Stop date [{default_config['stop_date']}]: ") or default_config['stop_date']

        initial_usd_str = input(f"Initial USD [{default_config['initial_usd']}]: ") or str(default_config['initial_usd'])
        initial_usd = float(initial_usd_str)

        timeframes_str = input(f"Timeframes (comma separated) [{', '.join(default_config['timeframes'])}]: ") or ','.join(default_config['timeframes'])
        timeframes = [tf.strip() for tf in timeframes_str.split(',') if tf.strip()]

        stop_loss_pcts_str = input(f"Stop loss pcts (comma separated) [{', '.join(str(x) for x in default_config['stop_loss_pcts'])}]: ") or ','.join(str(x) for x in default_config['stop_loss_pcts'])
        stop_loss_pcts = [float(x.strip()) for x in stop_loss_pcts_str.split(',') if x.strip()]

        config = {
            'start_date': start_date,
            'stop_date': stop_date,
            'initial_usd': initial_usd,
            'timeframes': timeframes,
            'stop_loss_pcts': stop_loss_pcts,
            'strategies': default_config['strategies'],
            'combination_rules': default_config['combination_rules']
        }
    else:
        config = default_config

    start_date = config['start_date']
    stop_date = config['stop_date']
    initial_usd = config['initial_usd']
    timeframes = config['timeframes']
    stop_loss_pcts = config['stop_loss_pcts']

    # Extract strategy configuration
    strategy_config = {
        "strategies": config.get('strategies', default_config['strategies']),
        "combination_rules": config.get('combination_rules', default_config['combination_rules'])
    }

    timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")

    storage = Storage(logging=logging)
    system_utils = SystemUtils(logging=logging)

    data_1min = storage.load_data('btcusd_1-min_data.csv', start_date, stop_date)

    nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
    nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)

    metadata_lines = [
        f"Start date\t{start_date}\tPrice\t{start_price}",
        f"Stop date\t{stop_date}\tPrice\t{stop_price}",
        f"Initial USD\t{initial_usd}"
    ]

    # Create tasks for each (timeframe, stop_loss_pct) combination
    tasks = [
        (name, data_1min, stop_loss_pct, initial_usd, strategy_config)
        for name in timeframes
        for stop_loss_pct in stop_loss_pcts
    ]

    all_results_rows = []
    all_trade_rows = []

    if debug:
        # Sequential execution in this process.
        for task in tasks:
            results, trades = process(task, debug)
            all_results_rows.extend(results)
            all_trade_rows.extend(trades)
    else:
        workers = system_utils.get_optimal_workers()
        with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(process, task, debug) for task in tasks]
            # Rows are collected in completion order, not submission order.
            for future in concurrent.futures.as_completed(futures):
                results, trades = future.result()
                all_results_rows.extend(results)
                all_trade_rows.extend(trades)

    backtest_filename = f"{timestamp}_backtest.csv"
    backtest_fieldnames = [
        "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
        "max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd"
    ]
    storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)

    trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
    storage.write_trades(all_trade_rows, trades_fieldnames)