From 74c8048ed5a5bae07a8fb693748338dd3be270a3 Mon Sep 17 00:00:00 2001 From: Simon Moisy Date: Tue, 27 May 2025 17:49:55 +0800 Subject: [PATCH] shifted one day back on the metatrend to avoid lookahead bias, reverted metatrend calculus to use no cpu optimization for readability --- cycles/backtest.py | 7 +- cycles/supertrend.py | 315 +++++++++++-------------------------------- main.py | 55 ++++++-- 3 files changed, 131 insertions(+), 246 deletions(-) diff --git a/cycles/backtest.py b/cycles/backtest.py index f90800b..2fddd08 100644 --- a/cycles/backtest.py +++ b/cycles/backtest.py @@ -27,6 +27,9 @@ class Backtest: trends_arr = np.stack(trends, axis=1) meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), trends_arr[:,0], 0) + # Shift meta_trend by one to avoid lookahead bias + meta_trend_signal = np.roll(meta_trend, 1) + meta_trend_signal[0] = 0 # or np.nan, but 0 means 'no signal' for first bar position = 0 # 0 = no position, 1 = long entry_price = 0 @@ -45,8 +48,8 @@ class Backtest: price_open = _df['open'].iloc[i] price_close = _df['close'].iloc[i] date = _df['timestamp'].iloc[i] - prev_mt = meta_trend[i-1] - curr_mt = meta_trend[i] + prev_mt = meta_trend_signal[i-1] + curr_mt = meta_trend_signal[i] # Check stop loss if in position if position == 1: diff --git a/cycles/supertrend.py b/cycles/supertrend.py index bd6e845..82fefda 100644 --- a/cycles/supertrend.py +++ b/cycles/supertrend.py @@ -1,70 +1,30 @@ import pandas as pd import numpy as np import logging -from scipy.signal import find_peaks -from matplotlib.patches import Rectangle -from scipy import stats -import concurrent.futures -from functools import partial from functools import lru_cache -import matplotlib.pyplot as plt -# Color configuration -# Plot colors -DARK_BG_COLOR = '#181C27' -LEGEND_BG_COLOR = '#333333' -TITLE_COLOR = 'white' -AXIS_LABEL_COLOR = 'white' - -# Candlestick colors -CANDLE_UP_COLOR = '#089981' # Green -CANDLE_DOWN_COLOR = '#F23645' # Red - -# Marker colors -MIN_COLOR = 'red' -MAX_COLOR = 'green' - -# Line style colors -MIN_LINE_STYLE = 'g--' # Green dashed -MAX_LINE_STYLE = 'r--' # Red dashed -SMA7_LINE_STYLE = 'y-' # Yellow solid -SMA15_LINE_STYLE = 'm-' # Magenta solid - -# SuperTrend colors -ST_COLOR_UP = 'g-' -ST_COLOR_DOWN = 'r-' - -# Cache the calculation results by function parameters @lru_cache(maxsize=32) def cached_supertrend_calculation(period, multiplier, data_tuple): - # Convert tuple back to numpy arrays high = np.array(data_tuple[0]) low = np.array(data_tuple[1]) close = np.array(data_tuple[2]) - - # Calculate TR and ATR using vectorized operations tr = np.zeros_like(close) tr[0] = high[0] - low[0] hc_range = np.abs(high[1:] - close[:-1]) lc_range = np.abs(low[1:] - close[:-1]) hl_range = high[1:] - low[1:] tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range]) - - # Use numpy's exponential moving average atr = np.zeros_like(tr) atr[0] = tr[0] multiplier_ema = 2.0 / (period + 1) for i in range(1, len(tr)): atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema)) - - # Calculate bands upper_band = np.zeros_like(close) lower_band = np.zeros_like(close) for i in range(len(close)): hl_avg = (high[i] + low[i]) / 2 upper_band[i] = hl_avg + (multiplier * atr[i]) lower_band[i] = hl_avg - (multiplier * atr[i]) - final_upper = np.zeros_like(close) final_lower = np.zeros_like(close) supertrend = np.zeros_like(close) @@ -106,76 +66,18 @@ def cached_supertrend_calculation(period, multiplier, data_tuple): } def calculate_supertrend_external(data, period, multiplier): - # Convert DataFrame columns to hashable tuples high_tuple = tuple(data['high']) low_tuple = tuple(data['low']) close_tuple = tuple(data['close']) - - # Call the cached function return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple)) - class Supertrends: def __init__(self, data, verbose=False, display=False): - """ - Initialize the TrendDetectorSimple class. - - Parameters: - - data: pandas DataFrame containing price data - - verbose: boolean, whether to display detailed logging information - - display: boolean, whether to enable display/plotting features - """ - self.data = data self.verbose = verbose - self.display = display - - # Only define display-related variables if display is True - if self.display: - # Plot style configuration - self.plot_style = 'dark_background' - self.bg_color = DARK_BG_COLOR - self.plot_size = (12, 8) - - # Candlestick configuration - self.candle_width = 0.6 - self.candle_up_color = CANDLE_UP_COLOR - self.candle_down_color = CANDLE_DOWN_COLOR - self.candle_alpha = 0.8 - self.wick_width = 1 - - # Marker configuration - self.min_marker = '^' - self.min_color = MIN_COLOR - self.min_size = 100 - self.max_marker = 'v' - self.max_color = MAX_COLOR - self.max_size = 100 - self.marker_zorder = 100 - - # Line configuration - self.line_width = 1 - self.min_line_style = MIN_LINE_STYLE - self.max_line_style = MAX_LINE_STYLE - self.sma7_line_style = SMA7_LINE_STYLE - self.sma15_line_style = SMA15_LINE_STYLE - - # Text configuration - self.title_size = 14 - self.title_color = TITLE_COLOR - self.axis_label_size = 12 - self.axis_label_color = AXIS_LABEL_COLOR - - # Legend configuration - self.legend_loc = 'best' - self.legend_bg_color = LEGEND_BG_COLOR - - # Configure logging logging.basicConfig(level=logging.INFO if verbose else logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s') self.logger = logging.getLogger('TrendDetectorSimple') - - # Convert data to pandas DataFrame if it's not already if not isinstance(self.data, pd.DataFrame): if isinstance(self.data, list): self.data = pd.DataFrame({'close': self.data}) @@ -183,154 +85,101 @@ class Supertrends: raise ValueError("Data must be a pandas DataFrame or a list") def calculate_tr(self): + df = self.data.copy() + high = df['high'].values + low = df['low'].values + close = df['close'].values + tr = np.zeros_like(close) + tr[0] = high[0] - low[0] + for i in range(1, len(close)): + hl_range = high[i] - low[i] + hc_range = abs(high[i] - close[i-1]) + lc_range = abs(low[i] - close[i-1]) + tr[i] = max(hl_range, hc_range, lc_range) + return tr + + def calculate_atr(self, period=14): + tr = self.calculate_tr() + atr = np.zeros_like(tr) + atr[0] = tr[0] + multiplier = 2.0 / (period + 1) + for i in range(1, len(tr)): + atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier)) + return atr + + def calculate_supertrend(self, period=10, multiplier=3.0): """ - Calculate True Range (TR) for the price data. - - True Range is the greatest of: - 1. Current high - current low - 2. |Current high - previous close| - 3. |Current low - previous close| - + Calculate SuperTrend indicator for the price data. + SuperTrend is a trend-following indicator that uses ATR to determine the trend direction. + Parameters: + - period: int, the period for the ATR calculation (default: 10) + - multiplier: float, the multiplier for the ATR (default: 3.0) Returns: - - Numpy array of TR values + - Dictionary containing SuperTrend values, trend direction, and upper/lower bands """ df = self.data.copy() high = df['high'].values low = df['low'].values close = df['close'].values - - tr = np.zeros_like(close) - tr[0] = high[0] - low[0] # First TR is just the first day's range - + atr = self.calculate_atr(period) + upper_band = np.zeros_like(close) + lower_band = np.zeros_like(close) + for i in range(len(close)): + hl_avg = (high[i] + low[i]) / 2 + upper_band[i] = hl_avg + (multiplier * atr[i]) + lower_band[i] = hl_avg - (multiplier * atr[i]) + final_upper = np.zeros_like(close) + final_lower = np.zeros_like(close) + supertrend = np.zeros_like(close) + trend = np.zeros_like(close) + final_upper[0] = upper_band[0] + final_lower[0] = lower_band[0] + if close[0] <= upper_band[0]: + supertrend[0] = upper_band[0] + trend[0] = -1 + else: + supertrend[0] = lower_band[0] + trend[0] = 1 for i in range(1, len(close)): - # Current high - current low - hl_range = high[i] - low[i] - # |Current high - previous close| - hc_range = abs(high[i] - close[i-1]) - # |Current low - previous close| - lc_range = abs(low[i] - close[i-1]) - - # TR is the maximum of these three values - tr[i] = max(hl_range, hc_range, lc_range) - - return tr - - def calculate_atr(self, period=14): - """ - Calculate Average True Range (ATR) for the price data. - - ATR is the exponential moving average of the True Range over a specified period. - - Parameters: - - period: int, the period for the ATR calculation (default: 14) - - Returns: - - Numpy array of ATR values - """ - - tr = self.calculate_tr() - atr = np.zeros_like(tr) - - # First ATR value is just the first TR - atr[0] = tr[0] - - # Calculate exponential moving average (EMA) of TR - multiplier = 2.0 / (period + 1) - - for i in range(1, len(tr)): - atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier)) - - return atr - - def detect_trends(self): - """ - Detect trends by identifying local minima and maxima in the price data - using scipy.signal.find_peaks. - - Parameters: - - prominence: float, required prominence of peaks (relative to the price range) - - width: int, required width of peaks in data points - - Returns: - - DataFrame with columns for timestamps, prices, and trend indicators - - Dictionary containing analysis results including linear regression, SMAs, and SuperTrend indicators - """ - df = self.data - # close_prices = df['close'].values - - # max_peaks, _ = find_peaks(close_prices) - # min_peaks, _ = find_peaks(-close_prices) - - # df['is_min'] = False - # df['is_max'] = False - - # for peak in max_peaks: - # df.at[peak, 'is_max'] = True - # for peak in min_peaks: - # df.at[peak, 'is_min'] = True - - # result = df[['timestamp', 'close', 'is_min', 'is_max']].copy() - - # Perform linear regression on min_peaks and max_peaks - # min_prices = df['close'].iloc[min_peaks].values - # max_prices = df['close'].iloc[max_peaks].values - - # Linear regression for min peaks if we have at least 2 points - # min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices) - # Linear regression for max peaks if we have at least 2 points - # max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices) + if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]): + final_upper[i] = upper_band[i] + else: + final_upper[i] = final_upper[i-1] + if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]): + final_lower[i] = lower_band[i] + else: + final_lower[i] = final_lower[i-1] + if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]: + supertrend[i] = final_upper[i] + trend[i] = -1 + elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]: + supertrend[i] = final_lower[i] + trend[i] = 1 + elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]: + supertrend[i] = final_lower[i] + trend[i] = 1 + elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]: + supertrend[i] = final_upper[i] + trend[i] = -1 + supertrend_results = { + 'supertrend': supertrend, + 'trend': trend, + 'upper_band': final_upper, + 'lower_band': final_lower + } + return supertrend_results - # Calculate Simple Moving Averages (SMA) for 7 and 15 periods - # sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values - # sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values - - analysis_results = {} - # analysis_results['linear_regression'] = { - # 'min': { - # 'slope': min_slope, - # 'intercept': min_intercept, - # 'r_squared': min_r_value ** 2 - # }, - # 'max': { - # 'slope': max_slope, - # 'intercept': max_intercept, - # 'r_squared': max_r_value ** 2 - # } - # } - # analysis_results['sma'] = { - # '7': sma_7, - # '15': sma_15 - # } - - # Calculate SuperTrend indicators - supertrend_results_list = self._calculate_supertrend_indicators() - analysis_results['supertrend'] = supertrend_results_list - - return analysis_results - def calculate_supertrend_indicators(self): - """ - Calculate SuperTrend indicators with different parameter sets in parallel. - Returns: - - list, the SuperTrend results - """ supertrend_params = [ - {"period": 12, "multiplier": 3.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}, - {"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}, - {"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN} + {"period": 12, "multiplier": 3.0}, + {"period": 10, "multiplier": 1.0}, + {"period": 11, "multiplier": 2.0} ] - data = self.data.copy() - - # For just 3 calculations, direct calculation might be faster than process pool results = [] for p in supertrend_params: - result = calculate_supertrend_external(data, p["period"], p["multiplier"]) - results.append(result) - - supertrend_results_list = [] - for params, result in zip(supertrend_params, results): - supertrend_results_list.append({ + result = self.calculate_supertrend(period=p["period"], multiplier=p["multiplier"]) + results.append({ "results": result, - "params": params + "params": p }) - return supertrend_results_list + return results diff --git a/main.py b/main.py index f0ab812..b7d8a7b 100644 --- a/main.py +++ b/main.py @@ -6,7 +6,6 @@ import os import datetime import argparse import json -import ast from cycles.utils.storage import Storage from cycles.utils.system import SystemUtils @@ -48,6 +47,7 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, cumulative_profit = 0 max_drawdown = 0 peak = 0 + for trade in trades: cumulative_profit += trade['profit_pct'] if cumulative_profit > peak: @@ -55,10 +55,14 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, drawdown = peak - cumulative_profit if drawdown > max_drawdown: max_drawdown = drawdown + final_usd = initial_usd + for trade in trades: final_usd *= (1 + trade['profit_pct']) + total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades) + row = { "timeframe": rule_name, "stop_loss_pct": stop_loss_pct, @@ -75,6 +79,7 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, "total_fees_usd": total_fees_usd, } results_rows.append(row) + for trade in trades: trade_rows.append({ "timeframe": rule_name, @@ -87,7 +92,9 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, "type": trade.get("type"), "fee_usd": trade.get("fee_usd"), }) + logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}") + if debug: for trade in trades: if trade['type'] == 'STOP': @@ -95,13 +102,16 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, for trade in trades: if trade['profit_pct'] < -0.09: # or whatever is close to -0.10 print("Large loss trade:", trade) + return results_rows, trade_rows def process(timeframe_info, debug=False): - """Process a single (timeframe, stop_loss_pct) combination (no monthly split)""" + from cycles.utils.storage import Storage # import inside function for safety + storage = Storage(logging=None) # or pass a logger if you want, but None is safest for multiprocessing + rule, data_1min, stop_loss_pct, initial_usd = timeframe_info - if rule == "1T": + if rule == "1T" or rule == "1min": df = data_1min.copy() else: df = data_1min.resample(rule).agg({ @@ -112,7 +122,33 @@ def process(timeframe_info, debug=False): 'volume': 'sum' }).dropna() df = df.reset_index() + results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug) + + if all_trade_rows: + trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"] + # Prepare header + summary_fields = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "final_usd"] + summary_row = results_rows[0] + header_line = "\t".join(summary_fields) + "\n" + value_line = "\t".join(str(summary_row.get(f, "")) for f in summary_fields) + "\n" + # File name + tf = summary_row["timeframe"] + sl = summary_row["stop_loss_pct"] + sl_percent = int(round(sl * 100)) + trades_filename = os.path.join(storage.results_dir, f"trades_{tf}_ST{sl_percent}pct.csv") + # Write header + with open(trades_filename, "w") as f: + f.write(header_line) + f.write(value_line) + # Now write trades (append mode, skip header) + with open(trades_filename, "a", newline="") as f: + import csv + writer = csv.DictWriter(f, fieldnames=trades_fieldnames) + writer.writeheader() + for trade in all_trade_rows: + writer.writerow({k: trade.get(k, "") for k in trades_fieldnames}) + return results_rows, all_trade_rows def aggregate_results(all_rows): @@ -126,7 +162,6 @@ def aggregate_results(all_rows): summary_rows = [] for (rule, stop_loss_pct), rows in grouped.items(): - n_months = len(rows) total_trades = sum(r['n_trades'] for r in rows) total_stop_loss = sum(r['n_stop_loss'] for r in rows) avg_win_rate = np.mean([r['win_rate'] for r in rows]) @@ -163,7 +198,7 @@ def get_nearest_price(df, target_date): return nearest_time, price if __name__ == "__main__": - debug = True + debug = False parser = argparse.ArgumentParser(description="Run backtest with config file.") parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.") @@ -171,11 +206,11 @@ if __name__ == "__main__": # Default values (from config.json) default_config = { - "start_date": "2024-05-15", + "start_date": "2025-05-01", "stop_date": datetime.datetime.today().strftime('%Y-%m-%d'), "initial_usd": 10000, - "timeframes": ["1D"], - "stop_loss_pcts": [0.01, 0.02, 0.03], + "timeframes": ["1D", "6h", "3h", "1h", "30m", "15m", "5m", "1m"], + "stop_loss_pcts": [0.01, 0.02, 0.03, 0.05], } if args.config: @@ -238,6 +273,7 @@ if __name__ == "__main__": if debug: all_results_rows = [] all_trade_rows = [] + for task in tasks: results, trades = process(task, debug) if results or trades: @@ -263,7 +299,4 @@ if __name__ == "__main__": ] storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines) - trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"] - storage.write_trades(all_trade_rows, trades_fieldnames) - \ No newline at end of file