import concurrent.futures
import logging
from functools import lru_cache
from functools import partial

import numpy as np
import pandas as pd
from scipy import stats
from scipy.signal import find_peaks

# Plotting is optional: the indicator and backtest code paths never touch
# matplotlib, so a missing installation only disables plot_trends().
try:
    import matplotlib.pyplot as plt
    from matplotlib.patches import Rectangle
except ImportError:
    plt = None
    Rectangle = None

# Color configuration
# Plot colors
DARK_BG_COLOR = '#181C27'
LEGEND_BG_COLOR = '#333333'
TITLE_COLOR = 'white'
AXIS_LABEL_COLOR = 'white'

# Candlestick colors
CANDLE_UP_COLOR = '#089981'  # Green
CANDLE_DOWN_COLOR = '#F23645'  # Red

# Marker colors
MIN_COLOR = 'red'
MAX_COLOR = 'green'

# Line style colors
MIN_LINE_STYLE = 'g--'  # Green dashed
MAX_LINE_STYLE = 'r--'  # Red dashed
SMA7_LINE_STYLE = 'y-'  # Yellow solid
SMA15_LINE_STYLE = 'm-'  # Magenta solid

# SuperTrend colors
ST_COLOR_UP = 'g-'
ST_COLOR_DOWN = 'r-'


def _true_range(high, low, close):
    """
    Compute the True Range series.

    The first value is the first bar's high-low range; every later value is
    the maximum of (high - low), |high - previous close| and
    |low - previous close|.

    Parameters:
    - high, low, close: array-likes of equal length

    Returns:
    - Numpy float array of TR values (empty for empty input)
    """
    # Force float: an integer price series would otherwise make the result
    # (and everything derived from it) an integer array that truncates.
    high = np.asarray(high, dtype=float)
    low = np.asarray(low, dtype=float)
    close = np.asarray(close, dtype=float)

    tr = np.zeros_like(close)
    if close.size == 0:
        return tr

    tr[0] = high[0] - low[0]
    prev_close = close[:-1]
    tr[1:] = np.maximum.reduce([
        high[1:] - low[1:],
        np.abs(high[1:] - prev_close),
        np.abs(low[1:] - prev_close),
    ])
    return tr


def _ema_of_true_range(tr, period):
    """
    Exponential moving average of a True Range series (the ATR used here).

    The smoothing factor is 2 / (period + 1) and the series is seeded with
    the first TR value.

    Parameters:
    - tr: numpy float array of True Range values
    - period: int, the EMA period

    Returns:
    - Numpy float array of ATR values, same length as tr
    """
    atr = np.zeros_like(tr, dtype=float)
    if len(tr) == 0:
        return atr

    atr[0] = tr[0]
    smoothing = 2.0 / (period + 1)
    # Each value depends on the previous one, so this stays a loop.
    for i in range(1, len(tr)):
        atr[i] = (tr[i] * smoothing) + (atr[i - 1] * (1 - smoothing))
    return atr


# Cache the calculation results by function parameters
@lru_cache(maxsize=32)
def cached_supertrend_calculation(period, multiplier, data_tuple):
    """
    Calculate the SuperTrend indicator; results are memoized per argument set.

    Parameters:
    - period: int, ATR period
    - multiplier: float, band width as a multiple of the ATR
    - data_tuple: tuple (high, low, close) of equally long tuples of prices;
      tuples are required because lru_cache needs hashable arguments

    Returns:
    - dict with float numpy arrays 'supertrend', 'trend' (-1 = down, 1 = up),
      'upper_band' and 'lower_band'. The dict and its arrays are the cached
      objects themselves: callers of this function must not modify them
      (calculate_supertrend_external hands out copies).
    """
    # Convert tuples back to float arrays (float even for integer prices,
    # otherwise ATR and band values would be truncated).
    high = np.asarray(data_tuple[0], dtype=float)
    low = np.asarray(data_tuple[1], dtype=float)
    close = np.asarray(data_tuple[2], dtype=float)
    n = len(close)

    final_upper = np.zeros(n)
    final_lower = np.zeros(n)
    supertrend = np.zeros(n)
    trend = np.zeros(n)

    if n > 0:
        atr = _ema_of_true_range(_true_range(high, low, close), period)

        # Basic bands around the bar midpoint
        hl_avg = (high + low) / 2
        upper_band = hl_avg + (multiplier * atr)
        lower_band = hl_avg - (multiplier * atr)

        final_upper[0] = upper_band[0]
        final_lower[0] = lower_band[0]
        if close[0] <= upper_band[0]:
            supertrend[0] = upper_band[0]
            trend[0] = -1
        else:
            supertrend[0] = lower_band[0]
            trend[0] = 1

        for i in range(1, n):
            # The upper band may only tighten (move down) unless the previous
            # close broke above it; the lower band mirrors that rule.
            if (upper_band[i] < final_upper[i - 1]) or (close[i - 1] > final_upper[i - 1]):
                final_upper[i] = upper_band[i]
            else:
                final_upper[i] = final_upper[i - 1]

            if (lower_band[i] > final_lower[i - 1]) or (close[i - 1] < final_lower[i - 1]):
                final_lower[i] = lower_band[i]
            else:
                final_lower[i] = final_lower[i - 1]

            # The previous SuperTrend value tells which band was active.
            # If neither branch matches (e.g. NaN input) the entry stays 0.
            if supertrend[i - 1] == final_upper[i - 1] and close[i] <= final_upper[i]:
                supertrend[i] = final_upper[i]
                trend[i] = -1
            elif supertrend[i - 1] == final_upper[i - 1] and close[i] > final_upper[i]:
                supertrend[i] = final_lower[i]
                trend[i] = 1
            elif supertrend[i - 1] == final_lower[i - 1] and close[i] >= final_lower[i]:
                supertrend[i] = final_lower[i]
                trend[i] = 1
            elif supertrend[i - 1] == final_lower[i - 1] and close[i] < final_lower[i]:
                supertrend[i] = final_upper[i]
                trend[i] = -1

    return {
        'supertrend': supertrend,
        'trend': trend,
        'upper_band': final_upper,
        'lower_band': final_lower
    }


def calculate_supertrend_external(data, period, multiplier):
    """
    Calculate SuperTrend for a DataFrame via the memoized implementation.

    Parameters:
    - data: pandas DataFrame with 'high', 'low' and 'close' columns
    - period: int, ATR period
    - multiplier: float, band width as a multiple of the ATR

    Returns:
    - dict with numpy arrays 'supertrend', 'trend', 'upper_band', 'lower_band'
    """
    # Convert DataFrame columns to hashable tuples
    high_tuple = tuple(data['high'])
    low_tuple = tuple(data['low'])
    close_tuple = tuple(data['close'])

    cached = cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple))
    # Hand out copies so that a caller modifying the arrays cannot corrupt
    # the cache entry shared with every later call.
    return {key: values.copy() for key, values in cached.items()}


class TrendDetectorSimple:
    def __init__(self, data, verbose=False, display=False):
        """
        Initialize the TrendDetectorSimple class.

        Parameters:
        - data: pandas DataFrame containing price data, or a list of close prices
        - verbose: boolean, whether to display detailed logging information
        - display: boolean, whether to enable display/plotting features

        Raises:
        - ValueError: if data is neither a DataFrame nor a list
        """
        self.data = data
        self.verbose = verbose
        self.display = display

        # Only define display-related variables if display is True
        if self.display:
            # Plot style configuration
            self.plot_style = 'dark_background'
            self.bg_color = DARK_BG_COLOR
            self.plot_size = (12, 8)

            # Candlestick configuration
            self.candle_width = 0.6
            self.candle_up_color = CANDLE_UP_COLOR
            self.candle_down_color = CANDLE_DOWN_COLOR
            self.candle_alpha = 0.8
            self.wick_width = 1

            # Marker configuration
            self.min_marker = '^'
            self.min_color = MIN_COLOR
            self.min_size = 100
            self.max_marker = 'v'
            self.max_color = MAX_COLOR
            self.max_size = 100
            self.marker_zorder = 100

            # Line configuration
            self.line_width = 1
            self.min_line_style = MIN_LINE_STYLE
            self.max_line_style = MAX_LINE_STYLE
            self.sma7_line_style = SMA7_LINE_STYLE
            self.sma15_line_style = SMA15_LINE_STYLE

            # Text configuration
            self.title_size = 14
            self.title_color = TITLE_COLOR
            self.axis_label_size = 12
            self.axis_label_color = AXIS_LABEL_COLOR

            # Legend configuration
            self.legend_loc = 'best'
            self.legend_bg_color = LEGEND_BG_COLOR

        # Configure logging
        logging.basicConfig(level=logging.INFO if verbose else logging.WARNING,
                            format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger('TrendDetectorSimple')

        # Convert data to pandas DataFrame if it's not already
        if not isinstance(self.data, pd.DataFrame):
            if isinstance(self.data, list):
                self.data = pd.DataFrame({'close': self.data})
            else:
                raise ValueError("Data must be a pandas DataFrame or a list")

    def calculate_tr(self):
        """
        Calculate True Range (TR) for the price data.

        True Range is the greatest of:
        1. Current high - current low
        2. |Current high - previous close|
        3. |Current low - previous close|

        The first TR value is just the first bar's high-low range.

        Returns:
        - Numpy float array of TR values
        """
        df = self.data
        return _true_range(df['high'].values, df['low'].values, df['close'].values)

    def calculate_atr(self, period=14):
        """
        Calculate Average True Range (ATR) for the price data.

        ATR is the exponential moving average of the True Range, seeded with
        the first TR value and smoothed with factor 2 / (period + 1).

        Parameters:
        - period: int, the period for the ATR calculation (default: 14)

        Returns:
        - Numpy float array of ATR values
        """
        return _ema_of_true_range(self.calculate_tr(), period)

    def detect_trends(self):
        """
        Run the trend analysis on the price data.

        Only the SuperTrend indicators are computed. The plotting helpers
        also understand 'linear_regression' and 'sma' entries, but this
        method does not produce them.

        Returns:
        - Dictionary with a single key 'supertrend' holding the list returned
          by _calculate_supertrend_indicators()
        """
        analysis_results = {}

        # Calculate SuperTrend indicators
        analysis_results['supertrend'] = self._calculate_supertrend_indicators()

        return analysis_results

    def _calculate_supertrend_indicators(self):
        """
        Calculate SuperTrend indicators for three fixed parameter sets.

        The calculations run sequentially; for just three of them that is
        expected to be cheaper than a process pool.

        Returns:
        - list of dicts, each with 'results' (the SuperTrend arrays) and
          'params' (period, multiplier and plot colors)
        """
        supertrend_params = [
            {"period": 12, "multiplier": 3.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
            {"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
            {"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}
        ]
        data = self.data

        return [
            {
                "results": calculate_supertrend_external(data, params["period"], params["multiplier"]),
                "params": params
            }
            for params in supertrend_params
        ]

    def plot_trends(self, trend_data, analysis_results, view="both"):
        """
        Plot the price data with detected trends using a candlestick chart.
        Also plots SuperTrend indicators with three different parameter sets.

        Parameters:
        - trend_data: DataFrame with boolean 'is_min' / 'is_max' columns, or
          None; markers are skipped when it is missing
        - analysis_results: Dictionary containing analysis results from detect_trends()
        - view: str, one of 'both', 'trend', 'supertrend'; determines which plot(s) to display

        Returns:
        - None (displays the plot)

        Raises:
        - ValueError: if view is not one of the supported values
        - ImportError: if display is enabled but matplotlib is not installed
        """
        if not self.display:
            return  # Do nothing if display is False

        if view not in ("both", "trend", "supertrend"):
            raise ValueError(f"Unknown view {view!r}; expected 'both', 'trend' or 'supertrend'")
        if plt is None:
            raise ImportError("matplotlib is required for plotting")

        plt.style.use(self.plot_style)

        ax1 = ax2 = None
        if view == "both":
            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(self.plot_size[0] * 2, self.plot_size[1]))
        else:
            fig, ax = plt.subplots(figsize=self.plot_size)
            if view == "trend":
                ax1 = ax
            else:
                ax2 = ax

        fig.patch.set_facecolor(self.bg_color)
        df = self.data.copy()

        if ax1 is not None:
            ax1.set_facecolor(self.bg_color)
            self._plot_trend_analysis(ax1, df, trend_data, analysis_results)

        if ax2 is not None:
            ax2.set_facecolor(self.bg_color)
            self._plot_supertrend_analysis(ax2, df, analysis_results['supertrend'])

        plt.tight_layout()
        plt.show()

    def _plot_candlesticks(self, ax, df):
        """
        Plot candlesticks on the given axis.

        Parameters:
        - ax: matplotlib.axes.Axes, the axis to plot on
        - df: pandas.DataFrame with 'open', 'close', 'high', 'low' columns
        """
        ohlc = df[['open', 'close', 'high', 'low']].to_numpy()
        for i, (open_val, close_val, high_val, low_val) in enumerate(ohlc):
            # Determine candle color
            color = self.candle_up_color if close_val >= open_val else self.candle_down_color

            # Plot candle body
            body_height = abs(close_val - open_val)
            bottom = min(open_val, close_val)
            rect = Rectangle((i - self.candle_width / 2, bottom), self.candle_width, body_height,
                             color=color, alpha=self.candle_alpha)
            ax.add_patch(rect)

            # Plot candle wicks
            ax.plot([i, i], [low_val, high_val], color=color, linewidth=self.wick_width)

    def _plot_trend_analysis(self, ax, df, trend_data, analysis_results):
        """
        Plot trend analysis on the given axis.

        Parameters:
        - ax: matplotlib.axes.Axes, the axis to plot on
        - df: pandas.DataFrame, the data to plot
        - trend_data: pandas.DataFrame, the trend data (may be None)
        - analysis_results: dict, the analysis results
        """
        # Draw candlesticks
        self._plot_candlesticks(ax, df)

        # Plot minima and maxima points
        self._plot_min_max_points(ax, df, trend_data)

        # Plot trend lines and moving averages
        if analysis_results:
            self._plot_trend_lines(ax, df, analysis_results)

        # Configure the subplot
        self._configure_subplot(ax, 'Price Chart with Trend Analysis', len(df))

    def _plot_min_max_points(self, ax, df, trend_data):
        """
        Plot minimum and maximum points on the given axis.

        A marker set is skipped when trend_data is None or lacks the
        corresponding 'is_min' / 'is_max' column.

        Parameters:
        - ax: matplotlib.axes.Axes, the axis to plot on
        - df: pandas.DataFrame, the data to plot
        - trend_data: pandas.DataFrame, the trend data
        """
        if trend_data is None:
            return

        marker_sets = (
            ('is_min', self.min_color, self.min_size, self.min_marker, 'Local Minima'),
            ('is_max', self.max_color, self.max_size, self.max_marker, 'Local Maxima'),
        )
        for column, color, size, marker, label in marker_sets:
            if column not in trend_data:
                continue
            # The index labels are used as x positions and as positional
            # lookups into df, i.e. a 0..n-1 index is assumed.
            indices = trend_data.index[trend_data[column] == True].tolist()
            if indices:
                y_vals = [df['close'].iloc[i] for i in indices]
                ax.scatter(indices, y_vals, color=color, s=size, marker=marker,
                           label=label, zorder=self.marker_zorder)

    def _plot_trend_lines(self, ax, df, analysis_results):
        """
        Plot regression lines and moving averages on the given axis.

        Only the entries present in analysis_results are drawn, so a result
        dict without 'linear_regression' or 'sma' is accepted.

        Parameters:
        - ax: matplotlib.axes.Axes, the axis to plot on
        - df: pandas.DataFrame, the data to plot
        - analysis_results: dict, the analysis results
        """
        x_vals = np.arange(len(df))

        regression = analysis_results.get('linear_regression')
        if regression:
            # Minima regression line (support)
            min_line = regression['min']['slope'] * x_vals + regression['min']['intercept']
            ax.plot(x_vals, min_line, self.min_line_style, linewidth=self.line_width,
                    label='Minima Regression')

            # Maxima regression line (resistance)
            max_line = regression['max']['slope'] * x_vals + regression['max']['intercept']
            ax.plot(x_vals, max_line, self.max_line_style, linewidth=self.line_width,
                    label='Maxima Regression')

        sma = analysis_results.get('sma')
        if sma:
            # SMA-7 line
            ax.plot(x_vals, sma['7'], self.sma7_line_style, linewidth=self.line_width, label='SMA-7')

            # SMA-15 line (NaN entries are left out)
            sma_15 = np.asarray(sma['15'])
            valid_idx_15 = ~np.isnan(sma_15)
            ax.plot(x_vals[valid_idx_15], sma_15[valid_idx_15], self.sma15_line_style,
                    linewidth=self.line_width, label='SMA-15')

    def _configure_subplot(self, ax, title, data_length):
        """
        Configure the subplot with title, labels, limits, and legend.

        Parameters:
        - ax: matplotlib.axes.Axes, the axis to configure
        - title: str, the title of the subplot
        - data_length: int, the length of the data
        """
        # Set title and labels
        ax.set_title(title, fontsize=self.title_size, color=self.title_color)
        ax.set_xlabel('Date', fontsize=self.axis_label_size, color=self.axis_label_color)
        ax.set_ylabel('Price', fontsize=self.axis_label_size, color=self.axis_label_color)

        # Set appropriate x-axis limits
        ax.set_xlim(-0.5, data_length - 0.5)

        # Add a legend
        ax.legend(loc=self.legend_loc, facecolor=self.legend_bg_color)

    def _plot_supertrend_analysis(self, ax, df, supertrend_results_list=None):
        """
        Plot SuperTrend analysis on the given axis.

        Parameters:
        - ax: matplotlib.axes.Axes, the axis to plot on
        - df: pandas.DataFrame, the data to plot
        - supertrend_results_list: list, the SuperTrend results
        """
        self._plot_candlesticks(ax, df)
        self._plot_supertrend_lines(ax, df, supertrend_results_list, style='Both')
        self._configure_subplot(ax, 'Multiple SuperTrend Indicators', len(df))

    def _plot_supertrend_lines(self, ax, df, supertrend_results_list, style="Horizontal"):
        """
        Plot SuperTrend lines on the given axis.

        Parameters:
        - ax: matplotlib.axes.Axes, the axis to plot on
        - df: pandas.DataFrame, the data to plot
        - supertrend_results_list: list, the SuperTrend results
        - style: str, 'Horizontal' (agreement band below the candles),
          'Curves' (one line per SuperTrend) or 'Both'

        Raises:
        - ValueError: if the list does not hold exactly 3 SuperTrend results
        """
        x_vals = np.arange(len(df))

        if style == 'Horizontal' or style == 'Both':
            if len(supertrend_results_list) != 3:
                raise ValueError("Expected exactly 3 SuperTrend results for meta calculation")

            trends = [st["results"]["trend"] for st in supertrend_results_list]

            band_height = 0.02 * (df["high"].max() - df["low"].min())
            y_base = df["low"].min() - band_height * 1.5

            # (number of downtrends, number of uptrends) -> band color
            band_colors = {
                (3, 0): "red",
                (2, 1): "orange",
                (1, 2): "yellow",
                (0, 3): "green",
            }

            prev_color = None
            for i in range(1, len(x_vals)):
                t_vals = [t[i] for t in trends]
                color = band_colors.get((t_vals.count(-1), t_vals.count(1)))
                if color is None:
                    continue  # skip if unknown or inconsistent values

                ax.add_patch(Rectangle(
                    (x_vals[i - 1], y_base),
                    1,
                    band_height,
                    color=color,
                    linewidth=0,
                    alpha=0.6
                ))
                # Draw a vertical line at the change of color
                if prev_color and prev_color != color:
                    ax.axvline(x_vals[i - 1], color="grey", alpha=0.3, linewidth=1)
                prev_color = color

            ax.set_ylim(bottom=y_base - band_height * 0.5)

        if style == 'Curves' or style == 'Both':
            for st in supertrend_results_list:
                params = st["params"]
                supertrend = st["results"]["supertrend"]
                trend = st["results"]["trend"]

                # Plot SuperTrend line segment by segment, colored by trend
                for i in range(1, len(x_vals)):
                    line_style = params["color_up"] if trend[i] == 1 else params["color_down"]
                    ax.plot(x_vals[i - 1:i + 1], supertrend[i - 1:i + 1], line_style,
                            linewidth=self.line_width)

        self._plot_metasupertrend_lines(ax, df, supertrend_results_list)
        self._add_supertrend_legend(ax, supertrend_results_list)

    def _plot_metasupertrend_lines(self, ax, df, supertrend_results_list):
        """
        Plot a Meta SuperTrend line where all individual SuperTrends agree on trend.

        The line is the average of the three SuperTrend values; a segment is
        drawn only for bars on which all three trends are equal.

        Parameters:
        - ax: matplotlib.axes.Axes, the axis to plot on
        - df: pandas.DataFrame, the data to plot
        - supertrend_results_list: list, each item contains SuperTrend 'results' and 'params'

        Raises:
        - ValueError: if the list does not hold exactly 3 SuperTrend results
        """
        x_vals = np.arange(len(df))

        if len(supertrend_results_list) != 3:
            raise ValueError("Expected exactly 3 SuperTrend results for meta calculation")

        trends = [st["results"]["trend"] for st in supertrend_results_list]
        supertrends = [st["results"]["supertrend"] for st in supertrend_results_list]
        params = supertrend_results_list[0]["params"]  # Use first config for styling

        trends_arr = np.stack(trends, axis=1)
        agree = (trends_arr[:, 0] == trends_arr[:, 1]) & (trends_arr[:, 1] == trends_arr[:, 2])
        meta_trend = np.where(agree, trends_arr[:, 0], 0)

        # Average the 3 supertrend values per bar
        st_avg = np.mean(np.stack(supertrends, axis=1), axis=1)

        for i in range(1, len(x_vals)):
            if not agree[i]:
                continue
            color = params["color_up"] if meta_trend[i] == 1 else params["color_down"]
            ax.plot(x_vals[i - 1:i + 1], st_avg[i - 1:i + 1], color, linewidth=self.line_width)

    def _add_supertrend_legend(self, ax, supertrend_results_list):
        """
        Add SuperTrend legend entries to the given axis.

        Empty lines are plotted purely to register one 'Up' and one 'Down'
        legend label per SuperTrend parameter set.

        Parameters:
        - ax: matplotlib.axes.Axes, the axis to add legend entries to
        - supertrend_results_list: list, the SuperTrend results
        """
        for st in supertrend_results_list:
            params = st["params"]
            period = params["period"]
            multiplier = params["multiplier"]

            ax.plot([], [], params["color_up"], linewidth=self.line_width,
                    label=f'ST (P:{period}, M:{multiplier}) Up')
            ax.plot([], [], params["color_down"], linewidth=self.line_width,
                    label=f'ST (P:{period}, M:{multiplier}) Down')

    def backtest_meta_supertrend(self, min1_df, initial_usd=10000, stop_loss_pct=0.05, transaction_cost=0.001, debug=False):
        """
        Backtest a simple strategy using the meta supertrend (all three supertrends agree).
        Buys when the meta supertrend turns positive, sells when it flips from
        positive to negative, and applies a percentage stop loss that is
        checked against finer-grained candles.

        Parameters:
        - min1_df: pandas DataFrame with a datetime index and 'open' / 'low'
          columns (1-minute data) used for stop loss checking; it is not modified
        - initial_usd: float, starting USD amount
        - stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%)
        - transaction_cost: float, transaction cost as a fraction (e.g. 0.001 for 0.1%)
        - debug: bool, whether to print debug info

        Returns:
        - dict with 'initial_usd', 'final_usd', 'n_trades' (number of trade
          log records, BUY records included), 'win_rate' (winning share of
          the closed trades), 'max_drawdown', 'avg_trade', 'trade_log',
          'trades', 'total_fees_btc', 'total_fees_usd' and, when at least
          one record exists, 'first_trade' and 'last_trade'

        Notes:
        - When min1_df has no candle between the entry and the current bar,
          the stop loss check is skipped for that bar.
        - NOTE(review): the entry fills at the open of the bar whose close
          produced the signal, which looks like look-ahead; confirm intent.
        """
        df = self.data.copy().reset_index(drop=True)
        df['timestamp'] = pd.to_datetime(df['timestamp'])

        # Get meta supertrend (all three agree)
        supertrend_results_list = self._calculate_supertrend_indicators()
        trends = [st['results']['trend'] for st in supertrend_results_list]
        trends_arr = np.stack(trends, axis=1)
        meta_trend = np.where((trends_arr[:, 0] == trends_arr[:, 1]) & (trends_arr[:, 1] == trends_arr[:, 2]),
                              trends_arr[:, 0], 0)

        # Shallow copy with a datetime index: the caller's frame stays untouched.
        min1 = min1_df.copy(deep=False)
        min1.index = pd.to_datetime(min1.index)

        position = 0  # 0 = no position, 1 = long
        entry_price = 0
        entry_time = None
        usd = initial_usd
        coin = 0
        trade_log = []
        max_balance = initial_usd
        drawdowns = []
        current_trade_min1_start_idx = None

        for i in range(1, len(df)):
            if debug and i % 100 == 0:
                self.logger.debug("Progress: %d/%d rows processed.", i, len(df))

            price_open = df['open'].iloc[i]
            price_close = df['close'].iloc[i]
            date = df['timestamp'].iloc[i]
            prev_mt = meta_trend[i - 1]
            curr_mt = meta_trend[i]
            stopped_out = False

            # Check stop loss if in position
            if position == 1:
                stop_price = entry_price * (1 - stop_loss_pct)

                if current_trade_min1_start_idx is None:
                    # First check after entry, find the entry point in 1-min data
                    after_entry = min1.index[min1.index >= entry_time]
                    if len(after_entry) > 0:
                        current_trade_min1_start_idx = after_entry[0]

                # Get the end index for current check
                up_to_now = min1.index[min1.index <= date]

                if current_trade_min1_start_idx is not None and len(up_to_now) > 0:
                    current_min1_end_idx = up_to_now[-1]

                    # Check all 1-minute candles in between for stop loss
                    min1_slice = min1.loc[current_trade_min1_start_idx:current_min1_end_idx]
                    breached = min1_slice[min1_slice['low'] <= stop_price]

                    if not breached.empty:
                        # Stop loss triggered, find the exact candle
                        stop_candle = breached.iloc[0]
                        # More realistic fill: if open < stop, fill at open, else at stop
                        if stop_candle['open'] < stop_price:
                            sell_price = stop_candle['open']
                        else:
                            sell_price = stop_price
                        if debug:
                            print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}")
                        btc_to_sell = coin
                        fee_btc = btc_to_sell * transaction_cost
                        btc_after_fee = btc_to_sell - fee_btc
                        usd = btc_after_fee * sell_price
                        trade_log.append({
                            'type': 'STOP',
                            'entry': entry_price,
                            'exit': sell_price,
                            'entry_time': entry_time,
                            'exit_time': stop_candle.name,  # Use index name instead of timestamp column
                            'fee_btc': fee_btc
                        })
                        coin = 0
                        position = 0
                        entry_price = 0
                        current_trade_min1_start_idx = None
                        stopped_out = True
                    else:
                        # Update the start index for next check
                        current_trade_min1_start_idx = current_min1_end_idx

            # No signal handling on the bar that was stopped out (no same-bar re-entry).
            if not stopped_out:
                # Entry: only if not in position and signal changes to 1
                if position == 0 and prev_mt != 1 and curr_mt == 1:
                    # Buy at open, fee is charged in BTC (base currency)
                    gross_btc = usd / price_open
                    fee_btc = gross_btc * transaction_cost
                    coin = gross_btc - fee_btc
                    entry_price = price_open
                    entry_time = date
                    usd = 0
                    position = 1
                    current_trade_min1_start_idx = None  # Will be set on first stop loss check
                    trade_log.append({
                        'type': 'BUY',
                        'entry': entry_price,
                        'exit': None,
                        'entry_time': entry_time,
                        'exit_time': None,
                        'fee_btc': fee_btc
                    })

                # Exit: only if in position and signal changes from 1 to -1
                # NOTE(review): a 1 -> 0 -> -1 sequence never satisfies this
                # condition, so such a position is only closed by the stop
                # loss or at the end of the data; confirm this is intended.
                elif position == 1 and prev_mt == 1 and curr_mt == -1:
                    # Sell at open, fee is charged in BTC (base currency)
                    btc_to_sell = coin
                    fee_btc = btc_to_sell * transaction_cost
                    btc_after_fee = btc_to_sell - fee_btc
                    usd = btc_after_fee * price_open
                    trade_log.append({
                        'type': 'SELL',
                        'entry': entry_price,
                        'exit': price_open,
                        'entry_time': entry_time,
                        'exit_time': date,
                        'fee_btc': fee_btc
                    })
                    coin = 0
                    position = 0
                    entry_price = 0
                    current_trade_min1_start_idx = None

            # Track drawdown on every bar, including the one that was stopped
            # out: that is where a realised loss first shows up.
            balance = usd if position == 0 else coin * price_close
            if balance > max_balance:
                max_balance = balance
            drawdown = (max_balance - balance) / max_balance if max_balance > 0 else 0.0
            drawdowns.append(drawdown)

        # If still in position at end, sell at last close
        if position == 1:
            btc_to_sell = coin
            fee_btc = btc_to_sell * transaction_cost
            btc_after_fee = btc_to_sell - fee_btc
            usd = btc_after_fee * df['close'].iloc[-1]
            trade_log.append({
                'type': 'EOD',
                'entry': entry_price,
                'exit': df['close'].iloc[-1],
                'entry_time': entry_time,
                'exit_time': df['timestamp'].iloc[-1],
                'fee_btc': fee_btc
            })
            coin = 0
            position = 0
            entry_price = 0

        # Calculate statistics
        final_balance = usd
        n_trades = len(trade_log)
        # BUY records have no exit yet; rates are computed over closed trades only.
        closed_trades = [t for t in trade_log if t['exit'] is not None]
        n_wins = sum(1 for t in closed_trades if t['exit'] > t['entry'])
        win_rate = n_wins / len(closed_trades) if closed_trades else 0
        max_drawdown = max(drawdowns) if drawdowns else 0
        avg_trade = np.mean([t['exit'] / t['entry'] - 1 for t in closed_trades]) if closed_trades else 0

        trades = []
        total_fees_btc = 0.0
        total_fees_usd = 0.0
        for trade in trade_log:
            if trade['exit'] is not None:
                profit_pct = (trade['exit'] - trade['entry']) / trade['entry']
            else:
                profit_pct = 0.0
            trades.append({
                'entry_time': trade['entry_time'],
                'exit_time': trade['exit_time'],
                'entry': trade['entry'],
                'exit': trade['exit'],
                'profit_pct': profit_pct,
                'type': trade.get('type', 'SELL')
            })
            # Sum up BTC fees and their USD equivalent: valued at the exit
            # price if available, otherwise (BUY records) at the entry price.
            fee_btc = trade.get('fee_btc', 0.0)
            total_fees_btc += fee_btc
            if fee_btc:
                fee_price = trade['exit'] if trade.get('exit') is not None else trade['entry']
                total_fees_usd += fee_btc * fee_price

        results = {
            "initial_usd": initial_usd,
            "final_usd": final_balance,
            "n_trades": n_trades,
            "win_rate": win_rate,
            "max_drawdown": max_drawdown,
            "avg_trade": avg_trade,
            "trade_log": trade_log,
            "trades": trades,
            "total_fees_btc": total_fees_btc,
            "total_fees_usd": total_fees_usd,
        }
        if n_trades > 0:
            results["first_trade"] = {
                "entry_time": trade_log[0]['entry_time'],
                "entry": trade_log[0]['entry']
            }
            results["last_trade"] = {
                "exit_time": trade_log[-1]['exit_time'],
                "exit": trade_log[-1]['exit']
            }
        return results