diff --git a/cycles/__init__.py b/cycles/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/charts.py b/cycles/charts.py similarity index 100% rename from charts.py rename to cycles/charts.py diff --git a/main_debug.py b/cycles/main_debug.py similarity index 100% rename from main_debug.py rename to cycles/main_debug.py diff --git a/taxes.py b/cycles/taxes.py similarity index 100% rename from taxes.py rename to cycles/taxes.py diff --git a/trend_detector_simple.py b/cycles/trend_detector_simple.py similarity index 97% rename from trend_detector_simple.py rename to cycles/trend_detector_simple.py index 95a1bb5..d885959 100644 --- a/trend_detector_simple.py +++ b/cycles/trend_detector_simple.py @@ -1,849 +1,849 @@ -import pandas as pd -import numpy as np -import logging -from scipy.signal import find_peaks -from matplotlib.patches import Rectangle -from scipy import stats -import concurrent.futures -from functools import partial -from functools import lru_cache -import matplotlib.pyplot as plt - -# Color configuration -# Plot colors -DARK_BG_COLOR = '#181C27' -LEGEND_BG_COLOR = '#333333' -TITLE_COLOR = 'white' -AXIS_LABEL_COLOR = 'white' - -# Candlestick colors -CANDLE_UP_COLOR = '#089981' # Green -CANDLE_DOWN_COLOR = '#F23645' # Red - -# Marker colors -MIN_COLOR = 'red' -MAX_COLOR = 'green' - -# Line style colors -MIN_LINE_STYLE = 'g--' # Green dashed -MAX_LINE_STYLE = 'r--' # Red dashed -SMA7_LINE_STYLE = 'y-' # Yellow solid -SMA15_LINE_STYLE = 'm-' # Magenta solid - -# SuperTrend colors -ST_COLOR_UP = 'g-' -ST_COLOR_DOWN = 'r-' - -# Cache the calculation results by function parameters -@lru_cache(maxsize=32) -def cached_supertrend_calculation(period, multiplier, data_tuple): - # Convert tuple back to numpy arrays - high = np.array(data_tuple[0]) - low = np.array(data_tuple[1]) - close = np.array(data_tuple[2]) - - # Calculate TR and ATR using vectorized operations - tr = np.zeros_like(close) - tr[0] = high[0] - low[0] - hc_range = np.abs(high[1:] - close[:-1]) - lc_range = np.abs(low[1:] - close[:-1]) - hl_range = high[1:] - low[1:] - tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range]) - - # Use numpy's exponential moving average - atr = np.zeros_like(tr) - atr[0] = tr[0] - multiplier_ema = 2.0 / (period + 1) - for i in range(1, len(tr)): - atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema)) - - # Calculate bands - upper_band = np.zeros_like(close) - lower_band = np.zeros_like(close) - for i in range(len(close)): - hl_avg = (high[i] + low[i]) / 2 - upper_band[i] = hl_avg + (multiplier * atr[i]) - lower_band[i] = hl_avg - (multiplier * atr[i]) - - final_upper = np.zeros_like(close) - final_lower = np.zeros_like(close) - supertrend = np.zeros_like(close) - trend = np.zeros_like(close) - final_upper[0] = upper_band[0] - final_lower[0] = lower_band[0] - if close[0] <= upper_band[0]: - supertrend[0] = upper_band[0] - trend[0] = -1 - else: - supertrend[0] = lower_band[0] - trend[0] = 1 - for i in range(1, len(close)): - if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]): - final_upper[i] = upper_band[i] - else: - final_upper[i] = final_upper[i-1] - if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]): - final_lower[i] = lower_band[i] - else: - final_lower[i] = final_lower[i-1] - if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]: - supertrend[i] = final_upper[i] - trend[i] = -1 - elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]: - supertrend[i] = final_lower[i] - trend[i] 
= 1 - elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]: - supertrend[i] = final_lower[i] - trend[i] = 1 - elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]: - supertrend[i] = final_upper[i] - trend[i] = -1 - return { - 'supertrend': supertrend, - 'trend': trend, - 'upper_band': final_upper, - 'lower_band': final_lower - } - -def calculate_supertrend_external(data, period, multiplier): - # Convert DataFrame columns to hashable tuples - high_tuple = tuple(data['high']) - low_tuple = tuple(data['low']) - close_tuple = tuple(data['close']) - - # Call the cached function - return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple)) - -class TrendDetectorSimple: - def __init__(self, data, verbose=False, display=False): - """ - Initialize the TrendDetectorSimple class. - - Parameters: - - data: pandas DataFrame containing price data - - verbose: boolean, whether to display detailed logging information - - display: boolean, whether to enable display/plotting features - """ - - self.data = data - self.verbose = verbose - self.display = display - - # Only define display-related variables if display is True - if self.display: - # Plot style configuration - self.plot_style = 'dark_background' - self.bg_color = DARK_BG_COLOR - self.plot_size = (12, 8) - - # Candlestick configuration - self.candle_width = 0.6 - self.candle_up_color = CANDLE_UP_COLOR - self.candle_down_color = CANDLE_DOWN_COLOR - self.candle_alpha = 0.8 - self.wick_width = 1 - - # Marker configuration - self.min_marker = '^' - self.min_color = MIN_COLOR - self.min_size = 100 - self.max_marker = 'v' - self.max_color = MAX_COLOR - self.max_size = 100 - self.marker_zorder = 100 - - # Line configuration - self.line_width = 1 - self.min_line_style = MIN_LINE_STYLE - self.max_line_style = MAX_LINE_STYLE - self.sma7_line_style = SMA7_LINE_STYLE - self.sma15_line_style = SMA15_LINE_STYLE - - # Text configuration - self.title_size = 14 - self.title_color = TITLE_COLOR - self.axis_label_size = 12 - self.axis_label_color = AXIS_LABEL_COLOR - - # Legend configuration - self.legend_loc = 'best' - self.legend_bg_color = LEGEND_BG_COLOR - - # Configure logging - logging.basicConfig(level=logging.INFO if verbose else logging.WARNING, - format='%(asctime)s - %(levelname)s - %(message)s') - self.logger = logging.getLogger('TrendDetectorSimple') - - # Convert data to pandas DataFrame if it's not already - if not isinstance(self.data, pd.DataFrame): - if isinstance(self.data, list): - self.data = pd.DataFrame({'close': self.data}) - else: - raise ValueError("Data must be a pandas DataFrame or a list") - - def calculate_tr(self): - """ - Calculate True Range (TR) for the price data. - - True Range is the greatest of: - 1. Current high - current low - 2. |Current high - previous close| - 3. 
|Current low - previous close| - - Returns: - - Numpy array of TR values - """ - df = self.data.copy() - high = df['high'].values - low = df['low'].values - close = df['close'].values - - tr = np.zeros_like(close) - tr[0] = high[0] - low[0] # First TR is just the first day's range - - for i in range(1, len(close)): - # Current high - current low - hl_range = high[i] - low[i] - # |Current high - previous close| - hc_range = abs(high[i] - close[i-1]) - # |Current low - previous close| - lc_range = abs(low[i] - close[i-1]) - - # TR is the maximum of these three values - tr[i] = max(hl_range, hc_range, lc_range) - - return tr - - def calculate_atr(self, period=14): - """ - Calculate Average True Range (ATR) for the price data. - - ATR is the exponential moving average of the True Range over a specified period. - - Parameters: - - period: int, the period for the ATR calculation (default: 14) - - Returns: - - Numpy array of ATR values - """ - - tr = self.calculate_tr() - atr = np.zeros_like(tr) - - # First ATR value is just the first TR - atr[0] = tr[0] - - # Calculate exponential moving average (EMA) of TR - multiplier = 2.0 / (period + 1) - - for i in range(1, len(tr)): - atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier)) - - return atr - - def detect_trends(self): - """ - Detect trends by identifying local minima and maxima in the price data - using scipy.signal.find_peaks. - - Parameters: - - prominence: float, required prominence of peaks (relative to the price range) - - width: int, required width of peaks in data points - - Returns: - - DataFrame with columns for timestamps, prices, and trend indicators - - Dictionary containing analysis results including linear regression, SMAs, and SuperTrend indicators - """ - df = self.data - # close_prices = df['close'].values - - # max_peaks, _ = find_peaks(close_prices) - # min_peaks, _ = find_peaks(-close_prices) - - # df['is_min'] = False - # df['is_max'] = False - - # for peak in max_peaks: - # df.at[peak, 'is_max'] = True - # for peak in min_peaks: - # df.at[peak, 'is_min'] = True - - # result = df[['timestamp', 'close', 'is_min', 'is_max']].copy() - - # Perform linear regression on min_peaks and max_peaks - # min_prices = df['close'].iloc[min_peaks].values - # max_prices = df['close'].iloc[max_peaks].values - - # Linear regression for min peaks if we have at least 2 points - # min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices) - # Linear regression for max peaks if we have at least 2 points - # max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices) - - # Calculate Simple Moving Averages (SMA) for 7 and 15 periods - # sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values - # sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values - - analysis_results = {} - # analysis_results['linear_regression'] = { - # 'min': { - # 'slope': min_slope, - # 'intercept': min_intercept, - # 'r_squared': min_r_value ** 2 - # }, - # 'max': { - # 'slope': max_slope, - # 'intercept': max_intercept, - # 'r_squared': max_r_value ** 2 - # } - # } - # analysis_results['sma'] = { - # '7': sma_7, - # '15': sma_15 - # } - - # Calculate SuperTrend indicators - supertrend_results_list = self._calculate_supertrend_indicators() - analysis_results['supertrend'] = supertrend_results_list - - return analysis_results - - def _calculate_supertrend_indicators(self): - """ - Calculate SuperTrend indicators with different parameter sets in parallel. 
- Returns: - - list, the SuperTrend results - """ - supertrend_params = [ - {"period": 12, "multiplier": 3.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}, - {"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}, - {"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN} - ] - data = self.data.copy() - - # For just 3 calculations, direct calculation might be faster than process pool - results = [] - for p in supertrend_params: - result = calculate_supertrend_external(data, p["period"], p["multiplier"]) - results.append(result) - - supertrend_results_list = [] - for params, result in zip(supertrend_params, results): - supertrend_results_list.append({ - "results": result, - "params": params - }) - return supertrend_results_list - - def plot_trends(self, trend_data, analysis_results, view="both"): - """ - Plot the price data with detected trends using a candlestick chart. - Also plots SuperTrend indicators with three different parameter sets. - - Parameters: - - trend_data: DataFrame, the output from detect_trends() - - analysis_results: Dictionary containing analysis results from detect_trends() - - view: str, one of 'both', 'trend', 'supertrend'; determines which plot(s) to display - - Returns: - - None (displays the plot) - """ - if not self.display: - return # Do nothing if display is False - - plt.style.use(self.plot_style) - - if view == "both": - fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(self.plot_size[0]*2, self.plot_size[1])) - else: - fig, ax = plt.subplots(figsize=self.plot_size) - ax1 = ax2 = None - if view == "trend": - ax1 = ax - elif view == "supertrend": - ax2 = ax - - fig.patch.set_facecolor(self.bg_color) - if ax1: ax1.set_facecolor(self.bg_color) - if ax2: ax2.set_facecolor(self.bg_color) - - df = self.data.copy() - - if ax1: - self._plot_trend_analysis(ax1, df, trend_data, analysis_results) - - if ax2: - self._plot_supertrend_analysis(ax2, df, analysis_results['supertrend']) - - plt.tight_layout() - plt.show() - - def _plot_candlesticks(self, ax, df): - """ - Plot candlesticks on the given axis. - - Parameters: - - ax: matplotlib.axes.Axes, the axis to plot on - - df: pandas.DataFrame, the data to plot - """ - from matplotlib.patches import Rectangle - - for i in range(len(df)): - # Get OHLC values for this candle - open_val = df['open'].iloc[i] - close_val = df['close'].iloc[i] - high_val = df['high'].iloc[i] - low_val = df['low'].iloc[i] - - # Determine candle color - color = self.candle_up_color if close_val >= open_val else self.candle_down_color - - # Plot candle body - body_height = abs(close_val - open_val) - bottom = min(open_val, close_val) - rect = Rectangle((i - self.candle_width/2, bottom), self.candle_width, body_height, - color=color, alpha=self.candle_alpha) - ax.add_patch(rect) - - # Plot candle wicks - ax.plot([i, i], [low_val, high_val], color=color, linewidth=self.wick_width) - - def _plot_trend_analysis(self, ax, df, trend_data, analysis_results): - """ - Plot trend analysis on the given axis. 
- - Parameters: - - ax: matplotlib.axes.Axes, the axis to plot on - - df: pandas.DataFrame, the data to plot - - trend_data: pandas.DataFrame, the trend data - - analysis_results: dict, the analysis results - """ - # Draw candlesticks - self._plot_candlesticks(ax, df) - - # Plot minima and maxima points - self._plot_min_max_points(ax, df, trend_data) - - # Plot trend lines and moving averages - if analysis_results: - self._plot_trend_lines(ax, df, analysis_results) - - # Configure the subplot - self._configure_subplot(ax, 'Price Chart with Trend Analysis', len(df)) - - def _plot_min_max_points(self, ax, df, trend_data): - """ - Plot minimum and maximum points on the given axis. - - Parameters: - - ax: matplotlib.axes.Axes, the axis to plot on - - df: pandas.DataFrame, the data to plot - - trend_data: pandas.DataFrame, the trend data - """ - min_indices = trend_data.index[trend_data['is_min'] == True].tolist() - if min_indices: - min_y = [df['close'].iloc[i] for i in min_indices] - ax.scatter(min_indices, min_y, color=self.min_color, s=self.min_size, - marker=self.min_marker, label='Local Minima', zorder=self.marker_zorder) - - max_indices = trend_data.index[trend_data['is_max'] == True].tolist() - if max_indices: - max_y = [df['close'].iloc[i] for i in max_indices] - ax.scatter(max_indices, max_y, color=self.max_color, s=self.max_size, - marker=self.max_marker, label='Local Maxima', zorder=self.marker_zorder) - - def _plot_trend_lines(self, ax, df, analysis_results): - """ - Plot trend lines on the given axis. - - Parameters: - - ax: matplotlib.axes.Axes, the axis to plot on - - df: pandas.DataFrame, the data to plot - - analysis_results: dict, the analysis results - """ - x_vals = np.arange(len(df)) - - # Minima regression line (support) - min_slope = analysis_results['linear_regression']['min']['slope'] - min_intercept = analysis_results['linear_regression']['min']['intercept'] - min_line = min_slope * x_vals + min_intercept - ax.plot(x_vals, min_line, self.min_line_style, linewidth=self.line_width, - label='Minima Regression') - - # Maxima regression line (resistance) - max_slope = analysis_results['linear_regression']['max']['slope'] - max_intercept = analysis_results['linear_regression']['max']['intercept'] - max_line = max_slope * x_vals + max_intercept - ax.plot(x_vals, max_line, self.max_line_style, linewidth=self.line_width, - label='Maxima Regression') - - # SMA-7 line - sma_7 = analysis_results['sma']['7'] - ax.plot(x_vals, sma_7, self.sma7_line_style, linewidth=self.line_width, - label='SMA-7') - - # SMA-15 line - sma_15 = analysis_results['sma']['15'] - valid_idx_15 = ~np.isnan(sma_15) - ax.plot(x_vals[valid_idx_15], sma_15[valid_idx_15], self.sma15_line_style, - linewidth=self.line_width, label='SMA-15') - - def _configure_subplot(self, ax, title, data_length): - """ - Configure the subplot with title, labels, limits, and legend. 
- - Parameters: - - ax: matplotlib.axes.Axes, the axis to configure - - title: str, the title of the subplot - - data_length: int, the length of the data - """ - # Set title and labels - ax.set_title(title, fontsize=self.title_size, color=self.title_color) - ax.set_xlabel('Date', fontsize=self.axis_label_size, color=self.axis_label_color) - ax.set_ylabel('Price', fontsize=self.axis_label_size, color=self.axis_label_color) - - # Set appropriate x-axis limits - ax.set_xlim(-0.5, data_length - 0.5) - - # Add a legend - ax.legend(loc=self.legend_loc, facecolor=self.legend_bg_color) - - def _plot_supertrend_analysis(self, ax, df, supertrend_results_list=None): - """ - Plot SuperTrend analysis on the given axis. - - Parameters: - - ax: matplotlib.axes.Axes, the axis to plot on - - df: pandas.DataFrame, the data to plot - - supertrend_results_list: list, the SuperTrend results (optional) - """ - self._plot_candlesticks(ax, df) - self._plot_supertrend_lines(ax, df, supertrend_results_list, style='Both') - self._configure_subplot(ax, 'Multiple SuperTrend Indicators', len(df)) - - def _plot_supertrend_lines(self, ax, df, supertrend_results_list, style="Horizontal"): - """ - Plot SuperTrend lines on the given axis. - - Parameters: - - ax: matplotlib.axes.Axes, the axis to plot on - - df: pandas.DataFrame, the data to plot - - supertrend_results_list: list, the SuperTrend results - """ - x_vals = np.arange(len(df)) - - if style == 'Horizontal' or style == 'Both': - if len(supertrend_results_list) != 3: - raise ValueError("Expected exactly 3 SuperTrend results for meta calculation") - - trends = [st["results"]["trend"] for st in supertrend_results_list] - - band_height = 0.02 * (df["high"].max() - df["low"].min()) - y_base = df["low"].min() - band_height * 1.5 - - prev_color = None - for i in range(1, len(x_vals)): - t_vals = [t[i] for t in trends] - up_count = t_vals.count(1) - down_count = t_vals.count(-1) - - if down_count == 3: - color = "red" - elif down_count == 2 and up_count == 1: - color = "orange" - elif down_count == 1 and up_count == 2: - color = "yellow" - elif up_count == 3: - color = "green" - else: - continue # skip if unknown or inconsistent values - - ax.add_patch(Rectangle( - (x_vals[i-1], y_base), - 1, - band_height, - color=color, - linewidth=0, - alpha=0.6 - )) - # Draw a vertical line at the change of color - if prev_color and prev_color != color: - ax.axvline(x_vals[i-1], color="grey", alpha=0.3, linewidth=1) - prev_color = color - - ax.set_ylim(bottom=y_base - band_height * 0.5) - if style == 'Curves' or style == 'Both': - for st in supertrend_results_list: - params = st["params"] - results = st["results"] - supertrend = results["supertrend"] - trend = results["trend"] - - # Plot SuperTrend line with color based on trend - for i in range(1, len(x_vals)): - if trend[i] == 1: # Uptrend - ax.plot(x_vals[i-1:i+1], supertrend[i-1:i+1], params["color_up"], linewidth=self.line_width) - else: # Downtrend - ax.plot(x_vals[i-1:i+1], supertrend[i-1:i+1], params["color_down"], linewidth=self.line_width) - self._plot_metasupertrend_lines(ax, df, supertrend_results_list) - self._add_supertrend_legend(ax, supertrend_results_list) - - def _plot_metasupertrend_lines(self, ax, df, supertrend_results_list): - """ - Plot a Meta SuperTrend line where all individual SuperTrends agree on trend. 
- - Parameters: - - ax: matplotlib.axes.Axes, the axis to plot on - - df: pandas.DataFrame, the data to plot - - supertrend_results_list: list, each item contains SuperTrend 'results' and 'params' - """ - x_vals = np.arange(len(df)) - - if len(supertrend_results_list) != 3: - raise ValueError("Expected exactly 3 SuperTrend results for meta calculation") - - trends = [st["results"]["trend"] for st in supertrend_results_list] - supertrends = [st["results"]["supertrend"] for st in supertrend_results_list] - params = supertrend_results_list[0]["params"] # Use first config for styling - - trends_arr = np.stack(trends, axis=1) - meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), trends_arr[:,0], 0) - - for i in range(1, len(x_vals)): - t1, t2, t3 = trends[0][i], trends[1][i], trends[2][i] - if t1 == t2 == t3: - meta_trend = t1 - # Average the 3 supertrend values - st_avg_prev = np.mean([s[i-1] for s in supertrends]) - st_avg_curr = np.mean([s[i] for s in supertrends]) - color = params["color_up"] if meta_trend == 1 else params["color_down"] - ax.plot(x_vals[i-1:i+1], [st_avg_prev, st_avg_curr], color, linewidth=self.line_width) - - def _add_supertrend_legend(self, ax, supertrend_results_list): - """ - Add SuperTrend legend entries to the given axis. - - Parameters: - - ax: matplotlib.axes.Axes, the axis to add legend entries to - - supertrend_results_list: list, the SuperTrend results - """ - for st in supertrend_results_list: - params = st["params"] - period = params["period"] - multiplier = params["multiplier"] - color_up = params["color_up"] - color_down = params["color_down"] - - ax.plot([], [], color_up, linewidth=self.line_width, - label=f'ST (P:{period}, M:{multiplier}) Up') - ax.plot([], [], color_down, linewidth=self.line_width, - label=f'ST (P:{period}, M:{multiplier}) Down') - - def backtest_meta_supertrend(self, min1_df, initial_usd=10000, stop_loss_pct=0.05, transaction_cost=0.001, debug=False): - """ - Backtest a simple strategy using the meta supertrend (all three supertrends agree). - Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss. - - Parameters: - - min1_df: pandas DataFrame, 1-minute timeframe data for more accurate stop loss checking (optional) - - initial_usd: float, starting USD amount - - stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%) - - transaction_cost: float, transaction cost as a fraction (e.g. 
0.001 for 0.1%) - - debug: bool, whether to print debug info - """ - df = self.data.copy().reset_index(drop=True) - df['timestamp'] = pd.to_datetime(df['timestamp']) - - # Get meta supertrend (all three agree) - supertrend_results_list = self._calculate_supertrend_indicators() - trends = [st['results']['trend'] for st in supertrend_results_list] - trends_arr = np.stack(trends, axis=1) - meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), - trends_arr[:,0], 0) - - position = 0 # 0 = no position, 1 = long - entry_price = 0 - usd = initial_usd - coin = 0 - trade_log = [] - max_balance = initial_usd - drawdowns = [] - trades = [] - entry_time = None - current_trade_min1_start_idx = None - - min1_df['timestamp'] = pd.to_datetime(min1_df.index) - - for i in range(1, len(df)): - if i % 100 == 0 and debug: - self.logger.debug(f"Progress: {i}/{len(df)} rows processed.") - - price_open = df['open'].iloc[i] - price_high = df['high'].iloc[i] - price_low = df['low'].iloc[i] - price_close = df['close'].iloc[i] - date = df['timestamp'].iloc[i] - prev_mt = meta_trend[i-1] - curr_mt = meta_trend[i] - - # Check stop loss if in position - if position == 1: - stop_price = entry_price * (1 - stop_loss_pct) - - if current_trade_min1_start_idx is None: - # First check after entry, find the entry point in 1-min data - current_trade_min1_start_idx = min1_df.index[min1_df.index >= entry_time][0] - - # Get the end index for current check - current_min1_end_idx = min1_df.index[min1_df.index <= date][-1] - - # Check all 1-minute candles in between for stop loss - min1_slice = min1_df.loc[current_trade_min1_start_idx:current_min1_end_idx] - if (min1_slice['low'] <= stop_price).any(): - # Stop loss triggered, find the exact candle - stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0] - # More realistic fill: if open < stop, fill at open, else at stop - if stop_candle['open'] < stop_price: - sell_price = stop_candle['open'] - else: - sell_price = stop_price - if debug: - print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}") - btc_to_sell = coin - fee_btc = btc_to_sell * transaction_cost - btc_after_fee = btc_to_sell - fee_btc - usd = btc_after_fee * sell_price - trade_log.append({ - 'type': 'STOP', - 'entry': entry_price, - 'exit': sell_price, - 'entry_time': entry_time, - 'exit_time': stop_candle.name, # Use index name instead of timestamp column - 'fee_btc': fee_btc - }) - coin = 0 - position = 0 - entry_price = 0 - current_trade_min1_start_idx = None - continue - - # Update the start index for next check - current_trade_min1_start_idx = current_min1_end_idx - - # Entry: only if not in position and signal changes to 1 - if position == 0 and prev_mt != 1 and curr_mt == 1: - # Buy at open, fee is charged in BTC (base currency) - gross_btc = usd / price_open - fee_btc = gross_btc * transaction_cost - coin = gross_btc - fee_btc - entry_price = price_open - entry_time = date - usd = 0 - position = 1 - current_trade_min1_start_idx = None # Will be set on first stop loss check - trade_log.append({ - 'type': 'BUY', - 'entry': entry_price, - 'exit': None, - 'entry_time': entry_time, - 'exit_time': None, - 'fee_btc': fee_btc - }) - - # Exit: only if in position and signal changes from 1 to -1 - elif position == 1 and prev_mt == 1 and curr_mt == -1: - # Sell at open, fee is charged in BTC (base currency) - btc_to_sell = coin - fee_btc = btc_to_sell * transaction_cost - 
btc_after_fee = btc_to_sell - fee_btc - usd = btc_after_fee * price_open - trade_log.append({ - 'type': 'SELL', - 'entry': entry_price, - 'exit': price_open, - 'entry_time': entry_time, - 'exit_time': date, - 'fee_btc': fee_btc - }) - coin = 0 - position = 0 - entry_price = 0 - current_trade_min1_start_idx = None - - # Track drawdown - balance = usd if position == 0 else coin * price_close - if balance > max_balance: - max_balance = balance - drawdown = (max_balance - balance) / max_balance - drawdowns.append(drawdown) - - # If still in position at end, sell at last close - if position == 1: - btc_to_sell = coin - fee_btc = btc_to_sell * transaction_cost - btc_after_fee = btc_to_sell - fee_btc - usd = btc_after_fee * df['close'].iloc[-1] - trade_log.append({ - 'type': 'EOD', - 'entry': entry_price, - 'exit': df['close'].iloc[-1], - 'entry_time': entry_time, - 'exit_time': df['timestamp'].iloc[-1], - 'fee_btc': fee_btc - }) - coin = 0 - position = 0 - entry_price = 0 - - # Calculate statistics - final_balance = usd - n_trades = len(trade_log) - wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']] - win_rate = len(wins) / n_trades if n_trades > 0 else 0 - max_drawdown = max(drawdowns) if drawdowns else 0 - avg_trade = np.mean([t['exit']/t['entry']-1 for t in trade_log if t['exit'] is not None]) if trade_log else 0 - - trades = [] - total_fees_btc = 0.0 - total_fees_usd = 0.0 - for trade in trade_log: - if trade['exit'] is not None: - profit_pct = (trade['exit'] - trade['entry']) / trade['entry'] - else: - profit_pct = 0.0 - trades.append({ - 'entry_time': trade['entry_time'], - 'exit_time': trade['exit_time'], - 'entry': trade['entry'], - 'exit': trade['exit'], - 'profit_pct': profit_pct, - 'type': trade.get('type', 'SELL') - }) - # Sum up BTC fees and their USD equivalent (use exit price if available) - fee_btc = trade.get('fee_btc', 0.0) - total_fees_btc += fee_btc - if fee_btc and trade.get('exit') is not None: - total_fees_usd += fee_btc * trade['exit'] - - results = { - "initial_usd": initial_usd, - "final_usd": final_balance, - "n_trades": n_trades, - "win_rate": win_rate, - "max_drawdown": max_drawdown, - "avg_trade": avg_trade, - "trade_log": trade_log, - "trades": trades, - "total_fees_btc": total_fees_btc, - "total_fees_usd": total_fees_usd, - } - if n_trades > 0: - results["first_trade"] = { - "entry_time": trade_log[0]['entry_time'], - "entry": trade_log[0]['entry'] - } - results["last_trade"] = { - "exit_time": trade_log[-1]['exit_time'], - "exit": trade_log[-1]['exit'] - } - return results +import pandas as pd +import numpy as np +import logging +from scipy.signal import find_peaks +from matplotlib.patches import Rectangle +from scipy import stats +import concurrent.futures +from functools import partial +from functools import lru_cache +import matplotlib.pyplot as plt + +# Color configuration +# Plot colors +DARK_BG_COLOR = '#181C27' +LEGEND_BG_COLOR = '#333333' +TITLE_COLOR = 'white' +AXIS_LABEL_COLOR = 'white' + +# Candlestick colors +CANDLE_UP_COLOR = '#089981' # Green +CANDLE_DOWN_COLOR = '#F23645' # Red + +# Marker colors +MIN_COLOR = 'red' +MAX_COLOR = 'green' + +# Line style colors +MIN_LINE_STYLE = 'g--' # Green dashed +MAX_LINE_STYLE = 'r--' # Red dashed +SMA7_LINE_STYLE = 'y-' # Yellow solid +SMA15_LINE_STYLE = 'm-' # Magenta solid + +# SuperTrend colors +ST_COLOR_UP = 'g-' +ST_COLOR_DOWN = 'r-' + +# Cache the calculation results by function parameters +@lru_cache(maxsize=32) +def cached_supertrend_calculation(period, multiplier, 
data_tuple): + # Convert tuple back to numpy arrays + high = np.array(data_tuple[0]) + low = np.array(data_tuple[1]) + close = np.array(data_tuple[2]) + + # Calculate TR and ATR using vectorized operations + tr = np.zeros_like(close) + tr[0] = high[0] - low[0] + hc_range = np.abs(high[1:] - close[:-1]) + lc_range = np.abs(low[1:] - close[:-1]) + hl_range = high[1:] - low[1:] + tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range]) + + # Use numpy's exponential moving average + atr = np.zeros_like(tr) + atr[0] = tr[0] + multiplier_ema = 2.0 / (period + 1) + for i in range(1, len(tr)): + atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema)) + + # Calculate bands + upper_band = np.zeros_like(close) + lower_band = np.zeros_like(close) + for i in range(len(close)): + hl_avg = (high[i] + low[i]) / 2 + upper_band[i] = hl_avg + (multiplier * atr[i]) + lower_band[i] = hl_avg - (multiplier * atr[i]) + + final_upper = np.zeros_like(close) + final_lower = np.zeros_like(close) + supertrend = np.zeros_like(close) + trend = np.zeros_like(close) + final_upper[0] = upper_band[0] + final_lower[0] = lower_band[0] + if close[0] <= upper_band[0]: + supertrend[0] = upper_band[0] + trend[0] = -1 + else: + supertrend[0] = lower_band[0] + trend[0] = 1 + for i in range(1, len(close)): + if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]): + final_upper[i] = upper_band[i] + else: + final_upper[i] = final_upper[i-1] + if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]): + final_lower[i] = lower_band[i] + else: + final_lower[i] = final_lower[i-1] + if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]: + supertrend[i] = final_upper[i] + trend[i] = -1 + elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]: + supertrend[i] = final_lower[i] + trend[i] = 1 + elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]: + supertrend[i] = final_lower[i] + trend[i] = 1 + elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]: + supertrend[i] = final_upper[i] + trend[i] = -1 + return { + 'supertrend': supertrend, + 'trend': trend, + 'upper_band': final_upper, + 'lower_band': final_lower + } + +def calculate_supertrend_external(data, period, multiplier): + # Convert DataFrame columns to hashable tuples + high_tuple = tuple(data['high']) + low_tuple = tuple(data['low']) + close_tuple = tuple(data['close']) + + # Call the cached function + return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple)) + +class TrendDetectorSimple: + def __init__(self, data, verbose=False, display=False): + """ + Initialize the TrendDetectorSimple class. 
+ + Parameters: + - data: pandas DataFrame containing price data + - verbose: boolean, whether to display detailed logging information + - display: boolean, whether to enable display/plotting features + """ + + self.data = data + self.verbose = verbose + self.display = display + + # Only define display-related variables if display is True + if self.display: + # Plot style configuration + self.plot_style = 'dark_background' + self.bg_color = DARK_BG_COLOR + self.plot_size = (12, 8) + + # Candlestick configuration + self.candle_width = 0.6 + self.candle_up_color = CANDLE_UP_COLOR + self.candle_down_color = CANDLE_DOWN_COLOR + self.candle_alpha = 0.8 + self.wick_width = 1 + + # Marker configuration + self.min_marker = '^' + self.min_color = MIN_COLOR + self.min_size = 100 + self.max_marker = 'v' + self.max_color = MAX_COLOR + self.max_size = 100 + self.marker_zorder = 100 + + # Line configuration + self.line_width = 1 + self.min_line_style = MIN_LINE_STYLE + self.max_line_style = MAX_LINE_STYLE + self.sma7_line_style = SMA7_LINE_STYLE + self.sma15_line_style = SMA15_LINE_STYLE + + # Text configuration + self.title_size = 14 + self.title_color = TITLE_COLOR + self.axis_label_size = 12 + self.axis_label_color = AXIS_LABEL_COLOR + + # Legend configuration + self.legend_loc = 'best' + self.legend_bg_color = LEGEND_BG_COLOR + + # Configure logging + logging.basicConfig(level=logging.INFO if verbose else logging.WARNING, + format='%(asctime)s - %(levelname)s - %(message)s') + self.logger = logging.getLogger('TrendDetectorSimple') + + # Convert data to pandas DataFrame if it's not already + if not isinstance(self.data, pd.DataFrame): + if isinstance(self.data, list): + self.data = pd.DataFrame({'close': self.data}) + else: + raise ValueError("Data must be a pandas DataFrame or a list") + + def calculate_tr(self): + """ + Calculate True Range (TR) for the price data. + + True Range is the greatest of: + 1. Current high - current low + 2. |Current high - previous close| + 3. |Current low - previous close| + + Returns: + - Numpy array of TR values + """ + df = self.data.copy() + high = df['high'].values + low = df['low'].values + close = df['close'].values + + tr = np.zeros_like(close) + tr[0] = high[0] - low[0] # First TR is just the first day's range + + for i in range(1, len(close)): + # Current high - current low + hl_range = high[i] - low[i] + # |Current high - previous close| + hc_range = abs(high[i] - close[i-1]) + # |Current low - previous close| + lc_range = abs(low[i] - close[i-1]) + + # TR is the maximum of these three values + tr[i] = max(hl_range, hc_range, lc_range) + + return tr + + def calculate_atr(self, period=14): + """ + Calculate Average True Range (ATR) for the price data. + + ATR is the exponential moving average of the True Range over a specified period. + + Parameters: + - period: int, the period for the ATR calculation (default: 14) + + Returns: + - Numpy array of ATR values + """ + + tr = self.calculate_tr() + atr = np.zeros_like(tr) + + # First ATR value is just the first TR + atr[0] = tr[0] + + # Calculate exponential moving average (EMA) of TR + multiplier = 2.0 / (period + 1) + + for i in range(1, len(tr)): + atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier)) + + return atr + + def detect_trends(self): + """ + Detect trends by identifying local minima and maxima in the price data + using scipy.signal.find_peaks. 
+
+        Returns:
+        - Dictionary containing analysis results (currently only the SuperTrend
+          indicators; the peak-detection, linear-regression, and SMA steps below
+          are commented out)
+        """
+        df = self.data
+        # close_prices = df['close'].values
+
+        # max_peaks, _ = find_peaks(close_prices)
+        # min_peaks, _ = find_peaks(-close_prices)
+
+        # df['is_min'] = False
+        # df['is_max'] = False
+
+        # for peak in max_peaks:
+        #     df.at[peak, 'is_max'] = True
+        # for peak in min_peaks:
+        #     df.at[peak, 'is_min'] = True
+
+        # result = df[['timestamp', 'close', 'is_min', 'is_max']].copy()
+
+        # Perform linear regression on min_peaks and max_peaks
+        # min_prices = df['close'].iloc[min_peaks].values
+        # max_prices = df['close'].iloc[max_peaks].values
+
+        # Linear regression for min peaks if we have at least 2 points
+        # min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices)
+        # Linear regression for max peaks if we have at least 2 points
+        # max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices)
+
+        # Calculate Simple Moving Averages (SMA) for 7 and 15 periods
+        # sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values
+        # sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values
+
+        analysis_results = {}
+        # analysis_results['linear_regression'] = {
+        #     'min': {
+        #         'slope': min_slope,
+        #         'intercept': min_intercept,
+        #         'r_squared': min_r_value ** 2
+        #     },
+        #     'max': {
+        #         'slope': max_slope,
+        #         'intercept': max_intercept,
+        #         'r_squared': max_r_value ** 2
+        #     }
+        # }
+        # analysis_results['sma'] = {
+        #     '7': sma_7,
+        #     '15': sma_15
+        # }
+
+        # Calculate SuperTrend indicators
+        supertrend_results_list = self._calculate_supertrend_indicators()
+        analysis_results['supertrend'] = supertrend_results_list
+
+        return analysis_results
+
+    def _calculate_supertrend_indicators(self):
+        """
+        Calculate SuperTrend indicators with three different parameter sets.
+        The calculations run sequentially; for only three parameter sets a
+        process pool would cost more in overhead than it saves.
+
+        Returns:
+        - list, the SuperTrend results
+        """
+        supertrend_params = [
+            {"period": 12, "multiplier": 3.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
+            {"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
+            {"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}
+        ]
+        data = self.data.copy()
+
+        # For just 3 calculations, direct calculation might be faster than process pool
+        results = []
+        for p in supertrend_params:
+            result = calculate_supertrend_external(data, p["period"], p["multiplier"])
+            results.append(result)
+
+        supertrend_results_list = []
+        for params, result in zip(supertrend_params, results):
+            supertrend_results_list.append({
+                "results": result,
+                "params": params
+            })
+        return supertrend_results_list
+
+    def plot_trends(self, trend_data, analysis_results, view="both"):
+        """
+        Plot the price data with detected trends using a candlestick chart.
+        Also plots SuperTrend indicators with three different parameter sets.
+ + Parameters: + - trend_data: DataFrame, the output from detect_trends() + - analysis_results: Dictionary containing analysis results from detect_trends() + - view: str, one of 'both', 'trend', 'supertrend'; determines which plot(s) to display + + Returns: + - None (displays the plot) + """ + if not self.display: + return # Do nothing if display is False + + plt.style.use(self.plot_style) + + if view == "both": + fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(self.plot_size[0]*2, self.plot_size[1])) + else: + fig, ax = plt.subplots(figsize=self.plot_size) + ax1 = ax2 = None + if view == "trend": + ax1 = ax + elif view == "supertrend": + ax2 = ax + + fig.patch.set_facecolor(self.bg_color) + if ax1: ax1.set_facecolor(self.bg_color) + if ax2: ax2.set_facecolor(self.bg_color) + + df = self.data.copy() + + if ax1: + self._plot_trend_analysis(ax1, df, trend_data, analysis_results) + + if ax2: + self._plot_supertrend_analysis(ax2, df, analysis_results['supertrend']) + + plt.tight_layout() + plt.show() + + def _plot_candlesticks(self, ax, df): + """ + Plot candlesticks on the given axis. + + Parameters: + - ax: matplotlib.axes.Axes, the axis to plot on + - df: pandas.DataFrame, the data to plot + """ + from matplotlib.patches import Rectangle + + for i in range(len(df)): + # Get OHLC values for this candle + open_val = df['open'].iloc[i] + close_val = df['close'].iloc[i] + high_val = df['high'].iloc[i] + low_val = df['low'].iloc[i] + + # Determine candle color + color = self.candle_up_color if close_val >= open_val else self.candle_down_color + + # Plot candle body + body_height = abs(close_val - open_val) + bottom = min(open_val, close_val) + rect = Rectangle((i - self.candle_width/2, bottom), self.candle_width, body_height, + color=color, alpha=self.candle_alpha) + ax.add_patch(rect) + + # Plot candle wicks + ax.plot([i, i], [low_val, high_val], color=color, linewidth=self.wick_width) + + def _plot_trend_analysis(self, ax, df, trend_data, analysis_results): + """ + Plot trend analysis on the given axis. + + Parameters: + - ax: matplotlib.axes.Axes, the axis to plot on + - df: pandas.DataFrame, the data to plot + - trend_data: pandas.DataFrame, the trend data + - analysis_results: dict, the analysis results + """ + # Draw candlesticks + self._plot_candlesticks(ax, df) + + # Plot minima and maxima points + self._plot_min_max_points(ax, df, trend_data) + + # Plot trend lines and moving averages + if analysis_results: + self._plot_trend_lines(ax, df, analysis_results) + + # Configure the subplot + self._configure_subplot(ax, 'Price Chart with Trend Analysis', len(df)) + + def _plot_min_max_points(self, ax, df, trend_data): + """ + Plot minimum and maximum points on the given axis. 
+ + Parameters: + - ax: matplotlib.axes.Axes, the axis to plot on + - df: pandas.DataFrame, the data to plot + - trend_data: pandas.DataFrame, the trend data + """ + min_indices = trend_data.index[trend_data['is_min'] == True].tolist() + if min_indices: + min_y = [df['close'].iloc[i] for i in min_indices] + ax.scatter(min_indices, min_y, color=self.min_color, s=self.min_size, + marker=self.min_marker, label='Local Minima', zorder=self.marker_zorder) + + max_indices = trend_data.index[trend_data['is_max'] == True].tolist() + if max_indices: + max_y = [df['close'].iloc[i] for i in max_indices] + ax.scatter(max_indices, max_y, color=self.max_color, s=self.max_size, + marker=self.max_marker, label='Local Maxima', zorder=self.marker_zorder) + + def _plot_trend_lines(self, ax, df, analysis_results): + """ + Plot trend lines on the given axis. + + Parameters: + - ax: matplotlib.axes.Axes, the axis to plot on + - df: pandas.DataFrame, the data to plot + - analysis_results: dict, the analysis results + """ + x_vals = np.arange(len(df)) + + # Minima regression line (support) + min_slope = analysis_results['linear_regression']['min']['slope'] + min_intercept = analysis_results['linear_regression']['min']['intercept'] + min_line = min_slope * x_vals + min_intercept + ax.plot(x_vals, min_line, self.min_line_style, linewidth=self.line_width, + label='Minima Regression') + + # Maxima regression line (resistance) + max_slope = analysis_results['linear_regression']['max']['slope'] + max_intercept = analysis_results['linear_regression']['max']['intercept'] + max_line = max_slope * x_vals + max_intercept + ax.plot(x_vals, max_line, self.max_line_style, linewidth=self.line_width, + label='Maxima Regression') + + # SMA-7 line + sma_7 = analysis_results['sma']['7'] + ax.plot(x_vals, sma_7, self.sma7_line_style, linewidth=self.line_width, + label='SMA-7') + + # SMA-15 line + sma_15 = analysis_results['sma']['15'] + valid_idx_15 = ~np.isnan(sma_15) + ax.plot(x_vals[valid_idx_15], sma_15[valid_idx_15], self.sma15_line_style, + linewidth=self.line_width, label='SMA-15') + + def _configure_subplot(self, ax, title, data_length): + """ + Configure the subplot with title, labels, limits, and legend. + + Parameters: + - ax: matplotlib.axes.Axes, the axis to configure + - title: str, the title of the subplot + - data_length: int, the length of the data + """ + # Set title and labels + ax.set_title(title, fontsize=self.title_size, color=self.title_color) + ax.set_xlabel('Date', fontsize=self.axis_label_size, color=self.axis_label_color) + ax.set_ylabel('Price', fontsize=self.axis_label_size, color=self.axis_label_color) + + # Set appropriate x-axis limits + ax.set_xlim(-0.5, data_length - 0.5) + + # Add a legend + ax.legend(loc=self.legend_loc, facecolor=self.legend_bg_color) + + def _plot_supertrend_analysis(self, ax, df, supertrend_results_list=None): + """ + Plot SuperTrend analysis on the given axis. + + Parameters: + - ax: matplotlib.axes.Axes, the axis to plot on + - df: pandas.DataFrame, the data to plot + - supertrend_results_list: list, the SuperTrend results (optional) + """ + self._plot_candlesticks(ax, df) + self._plot_supertrend_lines(ax, df, supertrend_results_list, style='Both') + self._configure_subplot(ax, 'Multiple SuperTrend Indicators', len(df)) + + def _plot_supertrend_lines(self, ax, df, supertrend_results_list, style="Horizontal"): + """ + Plot SuperTrend lines on the given axis. 
+
+        Parameters:
+        - ax: matplotlib.axes.Axes, the axis to plot on
+        - df: pandas.DataFrame, the data to plot
+        - supertrend_results_list: list, the SuperTrend results
+        - style: str, one of 'Horizontal', 'Curves', or 'Both' (default: 'Horizontal')
+        """
+        x_vals = np.arange(len(df))
+
+        if style == 'Horizontal' or style == 'Both':
+            if len(supertrend_results_list) != 3:
+                raise ValueError("Expected exactly 3 SuperTrend results for meta calculation")
+
+            trends = [st["results"]["trend"] for st in supertrend_results_list]
+
+            band_height = 0.02 * (df["high"].max() - df["low"].min())
+            y_base = df["low"].min() - band_height * 1.5
+
+            prev_color = None
+            for i in range(1, len(x_vals)):
+                t_vals = [t[i] for t in trends]
+                up_count = t_vals.count(1)
+                down_count = t_vals.count(-1)
+
+                if down_count == 3:
+                    color = "red"
+                elif down_count == 2 and up_count == 1:
+                    color = "orange"
+                elif down_count == 1 and up_count == 2:
+                    color = "yellow"
+                elif up_count == 3:
+                    color = "green"
+                else:
+                    continue  # skip if unknown or inconsistent values
+
+                ax.add_patch(Rectangle(
+                    (x_vals[i-1], y_base),
+                    1,
+                    band_height,
+                    color=color,
+                    linewidth=0,
+                    alpha=0.6
+                ))
+                # Draw a vertical line at the change of color
+                if prev_color and prev_color != color:
+                    ax.axvline(x_vals[i-1], color="grey", alpha=0.3, linewidth=1)
+                prev_color = color
+
+            ax.set_ylim(bottom=y_base - band_height * 0.5)
+        if style == 'Curves' or style == 'Both':
+            for st in supertrend_results_list:
+                params = st["params"]
+                results = st["results"]
+                supertrend = results["supertrend"]
+                trend = results["trend"]
+
+                # Plot SuperTrend line with color based on trend
+                for i in range(1, len(x_vals)):
+                    if trend[i] == 1:  # Uptrend
+                        ax.plot(x_vals[i-1:i+1], supertrend[i-1:i+1], params["color_up"], linewidth=self.line_width)
+                    else:  # Downtrend
+                        ax.plot(x_vals[i-1:i+1], supertrend[i-1:i+1], params["color_down"], linewidth=self.line_width)
+            self._plot_metasupertrend_lines(ax, df, supertrend_results_list)
+        self._add_supertrend_legend(ax, supertrend_results_list)
+
+    def _plot_metasupertrend_lines(self, ax, df, supertrend_results_list):
+        """
+        Plot a Meta SuperTrend line where all individual SuperTrends agree on trend.
+
+        Parameters:
+        - ax: matplotlib.axes.Axes, the axis to plot on
+        - df: pandas.DataFrame, the data to plot
+        - supertrend_results_list: list, each item contains SuperTrend 'results' and 'params'
+        """
+        x_vals = np.arange(len(df))
+
+        if len(supertrend_results_list) != 3:
+            raise ValueError("Expected exactly 3 SuperTrend results for meta calculation")
+
+        trends = [st["results"]["trend"] for st in supertrend_results_list]
+        supertrends = [st["results"]["supertrend"] for st in supertrend_results_list]
+        params = supertrend_results_list[0]["params"]  # Use first config for styling
+
+        # Meta trend is the shared value where all three trends agree, else 0
+        trends_arr = np.stack(trends, axis=1)
+        meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), trends_arr[:,0], 0)
+
+        for i in range(1, len(x_vals)):
+            if meta_trend[i] != 0:  # all three SuperTrends agree at this bar
+                # Average the 3 supertrend values
+                st_avg_prev = np.mean([s[i-1] for s in supertrends])
+                st_avg_curr = np.mean([s[i] for s in supertrends])
+                color = params["color_up"] if meta_trend[i] == 1 else params["color_down"]
+                ax.plot(x_vals[i-1:i+1], [st_avg_prev, st_avg_curr], color, linewidth=self.line_width)
+
+    def _add_supertrend_legend(self, ax, supertrend_results_list):
+        """
+        Add SuperTrend legend entries to the given axis.
+
+        Parameters:
+        - ax: matplotlib.axes.Axes, the axis to add legend entries to
+        - supertrend_results_list: list, the SuperTrend results
+        """
+        for st in supertrend_results_list:
+            params = st["params"]
+            period = params["period"]
+            multiplier = params["multiplier"]
+            color_up = params["color_up"]
+            color_down = params["color_down"]
+
+            ax.plot([], [], color_up, linewidth=self.line_width,
+                    label=f'ST (P:{period}, M:{multiplier}) Up')
+            ax.plot([], [], color_down, linewidth=self.line_width,
+                    label=f'ST (P:{period}, M:{multiplier}) Down')
+
+    def backtest_meta_supertrend(self, min1_df, initial_usd=10000, stop_loss_pct=0.05, transaction_cost=0.001, debug=False):
+        """
+        Backtest a simple strategy using the meta supertrend (all three supertrends agree).
+        Buys when meta supertrend turns positive, sells when it turns negative, and
+        applies a percentage stop loss.
+
+        Parameters:
+        - min1_df: pandas DataFrame, 1-minute timeframe data with a datetime index,
+          required for intra-candle stop loss checks
+        - initial_usd: float, starting USD amount
+        - stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%)
+        - transaction_cost: float, transaction cost as a fraction (e.g. 0.001 for 0.1%)
+        - debug: bool, whether to print debug info
+
+        Returns:
+        - dict with backtest statistics, the trade logs, and fee totals
+        """
+        df = self.data.copy().reset_index(drop=True)
+        df['timestamp'] = pd.to_datetime(df['timestamp'])
+
+        # Get meta supertrend (all three agree)
+        supertrend_results_list = self._calculate_supertrend_indicators()
+        trends = [st['results']['trend'] for st in supertrend_results_list]
+        trends_arr = np.stack(trends, axis=1)
+        meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
+                              trends_arr[:,0], 0)
+
+        position = 0  # 0 = no position, 1 = long
+        entry_price = 0
+        usd = initial_usd
+        coin = 0
+        trade_log = []
+        max_balance = initial_usd
+        drawdowns = []
+        trades = []
+        entry_time = None
+        current_trade_min1_start_idx = None
+
+        # Ensure a datetime index so the stop-loss lookups below can compare
+        # against entry_time and the current candle's timestamp
+        min1_df.index = pd.to_datetime(min1_df.index)
+
+        for i in range(1, len(df)):
+            if i % 100 == 0 and debug:
+                self.logger.debug(f"Progress: {i}/{len(df)} rows processed.")
+
+            price_open = df['open'].iloc[i]
+            price_high = df['high'].iloc[i]
+            price_low = df['low'].iloc[i]
+            price_close = df['close'].iloc[i]
+            date = df['timestamp'].iloc[i]
+            prev_mt = meta_trend[i-1]
+            curr_mt = meta_trend[i]
+
+            # Check stop loss if in position
+            if position == 1:
+                stop_price = entry_price * (1 - stop_loss_pct)
+
+                if current_trade_min1_start_idx is None:
+                    # First check after entry, find the entry point in 1-min data
+                    current_trade_min1_start_idx = min1_df.index[min1_df.index >= entry_time][0]
+
+                # Get the end index for current check
+                current_min1_end_idx = min1_df.index[min1_df.index <= date][-1]
+
+                # Check all 1-minute candles in between for stop loss
+                min1_slice = min1_df.loc[current_trade_min1_start_idx:current_min1_end_idx]
+                if (min1_slice['low'] <= stop_price).any():
+                    # Stop loss triggered, find the exact candle
+                    stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
+                    # More realistic fill: if open < stop, fill at open, else at stop
+                    if stop_candle['open'] < stop_price:
+                        sell_price = stop_candle['open']
+                    else:
+                        sell_price = stop_price
+                    if debug:
+                        print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}")
+                    btc_to_sell = coin
+                    fee_btc = btc_to_sell * transaction_cost
+                    btc_after_fee = btc_to_sell - fee_btc
+                    usd = btc_after_fee * sell_price
+                    trade_log.append({
+                        'type': 'STOP',
+                        'entry': entry_price,
+                        'exit': sell_price,
+                        'entry_time': entry_time,
+                        'exit_time': stop_candle.name,  # Use index name instead of timestamp column
+                        'fee_btc': fee_btc
+                    })
+                    coin = 0
+                    position = 0
+                    entry_price = 0
+                    current_trade_min1_start_idx = None
+                    continue
+
+                # Update the start index for next check
+                current_trade_min1_start_idx = current_min1_end_idx
+
+            # Entry: only if not in position and signal changes to 1
+            if position == 0 and prev_mt != 1 and curr_mt == 1:
+                # Buy at open, fee is charged in BTC (base currency)
+                gross_btc = usd / price_open
+                fee_btc = gross_btc * transaction_cost
+                coin = gross_btc - fee_btc
+                entry_price = price_open
+                entry_time = date
+                usd = 0
+                position = 1
+                current_trade_min1_start_idx = None  # Will be set on first stop loss check
+                trade_log.append({
+                    'type': 'BUY',
+                    'entry': entry_price,
+                    'exit': None,
+                    'entry_time': entry_time,
+                    'exit_time': None,
+                    'fee_btc': fee_btc
+                })
+
+            # Exit: only if in position and signal changes from 1 to -1
+            elif position == 1 and prev_mt == 1 and curr_mt == -1:
+                # Sell at open, fee is charged in BTC (base currency)
+                btc_to_sell = coin
+                fee_btc = btc_to_sell * transaction_cost
+                btc_after_fee = btc_to_sell - fee_btc
+                usd = btc_after_fee * price_open
+                trade_log.append({
+                    'type': 'SELL',
+                    'entry': entry_price,
+                    'exit': price_open,
+                    'entry_time': entry_time,
+                    'exit_time': date,
+                    'fee_btc': fee_btc
+                })
+                coin = 0
+                position = 0
+                entry_price = 0
+                current_trade_min1_start_idx = None
+
+            # Track drawdown
+            balance = usd if position == 0 else coin * price_close
+            if balance > max_balance:
+                max_balance = balance
+            drawdown = (max_balance - balance) / max_balance
+            drawdowns.append(drawdown)
+
+        # If still in position at end, sell at last close
+        if position == 1:
+            btc_to_sell = coin
+            fee_btc = btc_to_sell * transaction_cost
+            btc_after_fee = btc_to_sell - fee_btc
+            usd = btc_after_fee * df['close'].iloc[-1]
+            trade_log.append({
+                'type': 'EOD',
+                'entry': entry_price,
+                'exit': df['close'].iloc[-1],
+                'entry_time': entry_time,
+                'exit_time': df['timestamp'].iloc[-1],
+                'fee_btc': fee_btc
+            })
+            coin = 0
+            position = 0
+            entry_price = 0
+
+        # Calculate statistics
+        final_balance = usd
+        n_trades = len(trade_log)
+        # Win rate and average return are computed over closed trades only;
+        # BUY records have no exit yet and would otherwise dilute the stats
+        closed_trades = [t for t in trade_log if t['exit'] is not None]
+        wins = [t for t in closed_trades if t['exit'] > t['entry']]
+        win_rate = len(wins) / len(closed_trades) if closed_trades else 0
+        max_drawdown = max(drawdowns) if drawdowns else 0
+        avg_trade = np.mean([t['exit']/t['entry']-1 for t in closed_trades]) if closed_trades else 0
+
+        trades = []
+        total_fees_btc = 0.0
+        total_fees_usd = 0.0
+        for trade in trade_log:
+            if trade['exit'] is not None:
+                profit_pct = (trade['exit'] - trade['entry']) / trade['entry']
+            else:
+                profit_pct = 0.0
+            trades.append({
+                'entry_time': trade['entry_time'],
+                'exit_time': trade['exit_time'],
+                'entry': trade['entry'],
+                'exit': trade['exit'],
+                'profit_pct': profit_pct,
+                'type': trade.get('type', 'SELL')
+            })
+            # Sum up BTC fees and their USD equivalent (use exit price if available)
+            fee_btc = trade.get('fee_btc', 0.0)
+            total_fees_btc += fee_btc
+            if fee_btc and trade.get('exit') is not None:
+                total_fees_usd += fee_btc * trade['exit']
+
+        results = {
+            "initial_usd": initial_usd,
+            "final_usd": final_balance,
+            "n_trades": n_trades,
+            "win_rate": win_rate,
+            "max_drawdown": max_drawdown,
+            "avg_trade": avg_trade,
+            "trade_log": trade_log,
+            "trades": trades,
+            "total_fees_btc": total_fees_btc,
+            "total_fees_usd": total_fees_usd,
+        }
+        if n_trades > 0:
+            results["first_trade"] = {
+                "entry_time":
trade_log[0]['entry_time'], + "entry": trade_log[0]['entry'] + } + results["last_trade"] = { + "exit_time": trade_log[-1]['exit_time'], + "exit": trade_log[-1]['exit'] + } + return results \ No newline at end of file diff --git a/cycles/utils/__init__.py b/cycles/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/apply_taxes_to_file.py b/cycles/utils/apply_taxes_to_file.py similarity index 100% rename from utils/apply_taxes_to_file.py rename to cycles/utils/apply_taxes_to_file.py diff --git a/cycles/utils/gsheets.py b/cycles/utils/gsheets.py new file mode 100644 index 0000000..853f002 --- /dev/null +++ b/cycles/utils/gsheets.py @@ -0,0 +1,128 @@ +import threading +import time +import queue +from google.oauth2.service_account import Credentials +import gspread +import math +import numpy as np +from collections import defaultdict + + +class GSheetBatchPusher(threading.Thread): + + def __init__(self, queue, timestamp, spreadsheet_name, interval=60, logging=None): + super().__init__(daemon=True) + self.queue = queue + self.timestamp = timestamp + self.spreadsheet_name = spreadsheet_name + self.interval = interval + self._stop_event = threading.Event() + self.logging = logging + + def run(self): + while not self._stop_event.is_set(): + self.push_all() + time.sleep(self.interval) + # Final push on stop + self.push_all() + + def stop(self): + self._stop_event.set() + + def push_all(self): + batch_results = [] + batch_trades = [] + while True: + try: + results, trades = self.queue.get_nowait() + batch_results.extend(results) + batch_trades.extend(trades) + except queue.Empty: + break + + if batch_results or batch_trades: + self.write_results_per_combination_gsheet(batch_results, batch_trades, self.timestamp, self.spreadsheet_name) + + + def write_results_per_combination_gsheet(self, results_rows, trade_rows, timestamp, spreadsheet_name="GlimBit Backtest Results"): + scopes = [ + "https://www.googleapis.com/auth/spreadsheets", + "https://www.googleapis.com/auth/drive" + ] + creds = Credentials.from_service_account_file('credentials/service_account.json', scopes=scopes) + gc = gspread.authorize(creds) + sh = gc.open(spreadsheet_name) + + try: + worksheet = sh.worksheet("Results") + except gspread.exceptions.WorksheetNotFound: + worksheet = sh.add_worksheet(title="Results", rows="1000", cols="20") + + # Clear the worksheet before writing new results + worksheet.clear() + + # Updated fieldnames to match your data rows + fieldnames = [ + "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", + "max_drawdown", "avg_trade", "profit_ratio", "initial_usd", "final_usd" + ] + + def to_native(val): + if isinstance(val, (np.generic, np.ndarray)): + val = val.item() + if hasattr(val, 'isoformat'): + return val.isoformat() + # Handle inf, -inf, nan + if isinstance(val, float): + if math.isinf(val): + return "∞" if val > 0 else "-∞" + if math.isnan(val): + return "" + return val + + # Write header if sheet is empty + if len(worksheet.get_all_values()) == 0: + worksheet.append_row(fieldnames) + + for row in results_rows: + values = [to_native(row.get(field, "")) for field in fieldnames] + worksheet.append_row(values) + + trades_fieldnames = [ + "entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type" + ] + trades_by_combo = defaultdict(list) + + for trade in trade_rows: + tf = trade.get("timeframe") + sl = trade.get("stop_loss_pct") + trades_by_combo[(tf, sl)].append(trade) + + for (tf, sl), trades in trades_by_combo.items(): + sl_percent = 
diff --git a/cycles/utils/storage.py b/cycles/utils/storage.py
new file mode 100644
index 0000000..300d8cc
--- /dev/null
+++ b/cycles/utils/storage.py
@@ -0,0 +1,154 @@
+import os
+import json
+import pandas as pd
+import csv
+from collections import defaultdict
+
+RESULTS_DIR = "results"
+DATA_DIR = "data"
+
+class Storage:
+    """Storage class for storing and loading results and data"""
+
+    def __init__(self, logging=None, results_dir=RESULTS_DIR, data_dir=DATA_DIR):
+
+        self.results_dir = results_dir
+        self.data_dir = data_dir
+        self.logging = logging
+
+        # Create directories if they don't exist
+        os.makedirs(self.results_dir, exist_ok=True)
+        os.makedirs(self.data_dir, exist_ok=True)
+
+    def load_data(self, file_path, start_date, stop_date):
+        """Load data with optimized dtypes and filtering, supporting CSV and JSON input
+        Args:
+            file_path: path to the data file, relative to data_dir
+            start_date: start date
+            stop_date: stop date
+        Returns:
+            pandas DataFrame indexed by timestamp, or None on error
+        """
+        # Determine file type
+        _, ext = os.path.splitext(file_path)
+        ext = ext.lower()
+        try:
+            if ext == ".json":
+                with open(os.path.join(self.data_dir, file_path), 'r') as f:
+                    raw = json.load(f)
+                data = pd.DataFrame(raw["Data"])
+                # Convert columns to lowercase
+                data.columns = data.columns.str.lower()
+                # Convert timestamp to datetime
+                data["timestamp"] = pd.to_datetime(data["timestamp"], unit="s")
+                # Filter by date range
+                data = data[(data["timestamp"] >= start_date) & (data["timestamp"] <= stop_date)]
+                if self.logging is not None:
+                    self.logging.info(f"Data loaded from {file_path} for date range {start_date} to {stop_date}")
+                return data.set_index("timestamp")
+            else:
+                # Define optimized dtypes
+                dtypes = {
+                    'Open': 'float32',
+                    'High': 'float32',
+                    'Low': 'float32',
+                    'Close': 'float32',
+                    'Volume': 'float32'
+                }
+                # Read data with original capitalized column names
+                data = pd.read_csv(os.path.join(self.data_dir, file_path), dtype=dtypes)
+                # Convert timestamp to datetime
+                data['Timestamp'] = pd.to_datetime(data['Timestamp'], unit='s')
+                # Filter by date range
+                data = data[(data['Timestamp'] >= start_date) & (data['Timestamp'] <= stop_date)]
+                # Now convert column names to lowercase
+                data.columns = data.columns.str.lower()
+                if self.logging is not None:
+                    self.logging.info(f"Data loaded from {file_path} for date range {start_date} to {stop_date}")
+                return data.set_index('timestamp')
+        except Exception as e:
+            if self.logging is not None:
+                self.logging.error(f"Error loading data from {file_path}: {e}")
+            return None
+
+    def format_row(self, row):
+        """Format a row for the combined results CSV file
+        Args:
+            row: row to format
+        Returns:
+            formatted row
+        """
+
+        return {
+            "timeframe": row["timeframe"],
+            "stop_loss_pct": f"{row['stop_loss_pct']*100:.2f}%",
+            "n_trades": row["n_trades"],
+            "n_stop_loss": row["n_stop_loss"],
+            "win_rate": f"{row['win_rate']*100:.2f}%",
+            "max_drawdown": f"{row['max_drawdown']*100:.2f}%",
+            "avg_trade": f"{row['avg_trade']*100:.2f}%",
+            "profit_ratio": f"{row['profit_ratio']*100:.2f}%",
+            "final_usd": f"{row['final_usd']:.2f}",
+        }
+
+    def write_results_chunk(self, filename, fieldnames, rows, write_header=False, initial_usd=None):
+        """Write a chunk of results to a CSV file
+        Args:
+            filename: filename to write to
+            fieldnames: list of fieldnames
+            rows: list of rows
+            write_header: whether to write the header
+            initial_usd: initial USD
+        """
+        mode = 'w' if write_header else 'a'
+
+        with open(filename, mode, newline="") as csvfile:
+            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+            if write_header:
+                csvfile.write(f"# initial_usd: {initial_usd}\n")
+                writer.writeheader()
+
+            for row in rows:
+                # Only keep keys that are in fieldnames
+                filtered_row = {k: v for k, v in row.items() if k in fieldnames}
+                writer.writerow(filtered_row)
+
+    def write_results_combined(self, filename, fieldnames, rows):
+        """Write combined results to a CSV file
+        Args:
+            filename: filename to write to
+            fieldnames: list of fieldnames
+            rows: list of rows
+        """
+        fname = os.path.join(self.results_dir, filename)
+        with open(fname, "w", newline="") as csvfile:
+            writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter='\t')
+            writer.writeheader()
+            for row in rows:
+                writer.writerow(self.format_row(row))
+        if self.logging is not None:
+            self.logging.info(f"Combined results written to {fname}")
+
+    def write_trades(self, all_trade_rows, trades_fieldnames):
+        """Write trades to per-combination CSV files
+        Args:
+            all_trade_rows: list of trade rows
+            trades_fieldnames: list of trade fieldnames
+        """
+
+        trades_by_combo = defaultdict(list)
+        for trade in all_trade_rows:
+            tf = trade.get("timeframe")
+            sl = trade.get("stop_loss_pct")
+            trades_by_combo[(tf, sl)].append(trade)
+
+        for (tf, sl), trades in trades_by_combo.items():
+            sl_percent = int(round(sl * 100))
+            trades_filename = os.path.join(self.results_dir, f"trades_{tf}_ST{sl_percent}pct.csv")
+            with open(trades_filename, "w", newline="") as csvfile:
+                writer = csv.DictWriter(csvfile, fieldnames=trades_fieldnames)
+                writer.writeheader()
+                for trade in trades:
+                    writer.writerow({k: trade.get(k, "") for k in trades_fieldnames})
+            if self.logging is not None:
+                self.logging.info(f"Trades written to {trades_filename}")
\ No newline at end of file
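A quick usage sketch for Storage.load_data: the path is resolved against data_dir, so callers pass only the file name; the date bounds are applied before the timestamp column becomes the index, and None is returned on any failure. Dates here are hypothetical:

# Editor's sketch (not part of the patch): loading a date-bounded OHLCV frame.
import logging
from cycles.utils.storage import Storage

storage = Storage(logging=logging)
df = storage.load_data('btcusd_1-min_data.csv', '2024-01-01', '2024-03-31')
if df is not None:                 # load_data returns None and logs on error
    print(len(df), df.index.min(), df.index.max())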
diff --git a/cycles/utils/system.py b/cycles/utils/system.py
new file mode 100644
index 0000000..8f8a07c
--- /dev/null
+++ b/cycles/utils/system.py
@@ -0,0 +1,19 @@
+import os
+import psutil
+
+class SystemUtils:
+    """Helpers for sizing the worker pool from system resources"""
+    def __init__(self, logging=None):
+        self.logging = logging
+
+    def get_optimal_workers(self):
+        """Determine optimal number of worker processes based on system resources"""
+        cpu_count = os.cpu_count() or 4
+        memory_gb = psutil.virtual_memory().total / (1024**3)
+        # Heuristic: Use 75% of cores, but cap based on available memory
+        # Assume each worker needs ~2GB for large datasets
+        workers_by_memory = max(1, int(memory_gb / 2))
+        workers_by_cpu = max(1, int(cpu_count * 0.75))
+        if self.logging is not None:
+            self.logging.info(f"Using {min(workers_by_cpu, workers_by_memory)} workers for processing")
+        return min(workers_by_cpu, workers_by_memory)
\ No newline at end of file
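The heuristic in get_optimal_workers takes the smaller of a CPU cap (75% of cores) and a memory cap (about 2 GB per worker), so on a hypothetical 16-core, 16 GB machine memory is the binding constraint:

# Editor's sketch (not part of the patch): the sizing arithmetic by hand.
cpu_count, memory_gb = 16, 16.0                  # hypothetical machine
workers_by_cpu = max(1, int(cpu_count * 0.75))   # 12
workers_by_memory = max(1, int(memory_gb / 2))   # 8, assuming ~2 GB per worker
print(min(workers_by_cpu, workers_by_memory))    # 8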
diff --git a/main.py b/main.py
index 51343d8..43dea81 100644
--- a/main.py
+++ b/main.py
@@ -1,21 +1,16 @@
 import pandas as pd
 import numpy as np
-from trend_detector_simple import TrendDetectorSimple
-import csv
 import logging
 import concurrent.futures
 import os
-import psutil
 import datetime
-import gspread
-from google.oauth2.service_account import Credentials
-from collections import defaultdict
-import threading
 import queue
-import time
-import math
-import json
-from taxes import Taxes
+
+from cycles.trend_detector_simple import TrendDetectorSimple
+from cycles.taxes import Taxes
+from cycles.utils.storage import Storage
+from cycles.utils.gsheets import GSheetBatchPusher
+from cycles.utils.system import SystemUtils
 
 # Set up logging
 logging.basicConfig(
@@ -30,85 +25,6 @@ logging.basicConfig(
 # Global queue for batching Google Sheets updates
 results_queue = queue.Queue()
 
-# Background thread function to push updates every minute
-class GSheetBatchPusher(threading.Thread):
-    def __init__(self, queue, timestamp, spreadsheet_name, interval=60):
-        super().__init__(daemon=True)
-        self.queue = queue
-        self.timestamp = timestamp
-        self.spreadsheet_name = spreadsheet_name
-        self.interval = interval
-        self._stop_event = threading.Event()
-
-    def run(self):
-        while not self._stop_event.is_set():
-            self.push_all()
-            time.sleep(self.interval)
-        # Final push on stop
-        self.push_all()
-
-    def stop(self):
-        self._stop_event.set()
-
-    def push_all(self):
-        batch_results = []
-        batch_trades = []
-        while True:
-            try:
-                results, trades = self.queue.get_nowait()
-                batch_results.extend(results)
-                batch_trades.extend(trades)
-            except queue.Empty:
-                break
-
-        if batch_results or batch_trades:
-            write_results_per_combination_gsheet(batch_results, batch_trades, self.timestamp, self.spreadsheet_name)
-
-def get_optimal_workers():
-    """Determine optimal number of worker processes based on system resources"""
-    cpu_count = os.cpu_count() or 4
-    memory_gb = psutil.virtual_memory().total / (1024**3)
-    # Heuristic: Use 75% of cores, but cap based on available memory
-    # Assume each worker needs ~2GB for large datasets
-    workers_by_memory = max(1, int(memory_gb / 2))
-    workers_by_cpu = max(1, int(cpu_count * 0.75))
-    return min(workers_by_cpu, workers_by_memory)
-
-def load_data(file_path, start_date, stop_date):
-    """Load data with optimized dtypes and filtering, supporting CSV and JSON input"""
-    # Determine file type
-    _, ext = os.path.splitext(file_path)
-    ext = ext.lower()
-    if ext == ".json":
-        with open(file_path, 'r') as f:
-            raw = json.load(f)
-        data = pd.DataFrame(raw["Data"])
-        # Convert columns to lowercase
-        data.columns = data.columns.str.lower()
-        # Convert timestamp to datetime
-        data["timestamp"] = pd.to_datetime(data["timestamp"], unit="s")
-        # Filter by date range
-        data = data[(data["timestamp"] >= start_date) & (data["timestamp"] <= stop_date)]
-        return data.set_index("timestamp")
-    else:
-        # Define optimized dtypes
-        dtypes = {
-            'Open': 'float32',
-            'High': 'float32',
-            'Low': 'float32',
-            'Close': 'float32',
-            'Volume': 'float32'
-        }
-        # Read data with original capitalized column names
-        data = pd.read_csv(file_path, dtype=dtypes)
-        # Convert timestamp to datetime
-        data['Timestamp'] = pd.to_datetime(data['Timestamp'], unit='s')
-        # Filter by date range
-        data = data[(data['Timestamp'] >= start_date) & (data['Timestamp'] <= stop_date)]
-        # Now convert column names to lowercase
-        data.columns = data.columns.str.lower()
-        return data.set_index('timestamp')
-
 def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
     """Process the entire timeframe with all stop loss values (no monthly split)"""
     df = df.copy().reset_index(drop=True)
@@ -199,21 +115,6 @@ def process_timeframe(timeframe_info, debug=False):
         results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug)
     return results_rows, all_trade_rows
 
-def write_results_chunk(filename, fieldnames, rows, write_header=False):
-    """Write a chunk of results to a CSV file"""
-    mode = 'w' if write_header else 'a'
-
-    with open(filename, mode, newline="") as csvfile:
-        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
-        if write_header:
-            csvfile.write(f"# initial_usd: {initial_usd}\n")
-            writer.writeheader()
-
-        for row in rows:
-            # Only keep keys that are in fieldnames
-            filtered_row = {k: v for k, v in row.items() if k in fieldnames}
-            writer.writerow(filtered_row)
-
 def aggregate_results(all_rows):
     """Aggregate results per stop_loss_pct and per rule (timeframe)"""
     from collections import defaultdict
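Both the sheet writer and the trade CSV writer key their per-combination output on the same (timeframe, stop_loss_pct) pairing that aggregate_results summarizes over; a minimal sketch of that grouping pattern, with hypothetical rows:

# Editor's sketch (not part of the patch): grouping rows per (timeframe, stop_loss_pct).
from collections import defaultdict

rows = [{"timeframe": "1D", "stop_loss_pct": 0.02, "profit_pct": 0.013},
        {"timeframe": "1D", "stop_loss_pct": 0.01, "profit_pct": -0.004}]
by_combo = defaultdict(list)
for row in rows:
    by_combo[(row.get("timeframe"), row.get("stop_loss_pct"))].append(row)
for (tf, sl), combo_rows in by_combo.items():
    print(tf, f"ST{int(round(sl * 100))}%", len(combo_rows))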
@@ -250,87 +151,14 @@
         })
     return summary_rows
-def write_results_per_combination_gsheet(results_rows, trade_rows, timestamp, spreadsheet_name="GlimBit Backtest Results"):
-    scopes = [
-        "https://www.googleapis.com/auth/spreadsheets",
-        "https://www.googleapis.com/auth/drive"
-    ]
-    creds = Credentials.from_service_account_file('credentials/service_account.json', scopes=scopes)
-    gc = gspread.authorize(creds)
-    sh = gc.open(spreadsheet_name)
-
-    try:
-        worksheet = sh.worksheet("Results")
-    except gspread.exceptions.WorksheetNotFound:
-        worksheet = sh.add_worksheet(title="Results", rows="1000", cols="20")
-
-    # Clear the worksheet before writing new results
-    worksheet.clear()
-
-    # Updated fieldnames to match your data rows
-    fieldnames = [
-        "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
-        "max_drawdown", "avg_trade", "profit_ratio", "initial_usd", "final_usd"
-    ]
-
-    def to_native(val):
-        if isinstance(val, (np.generic, np.ndarray)):
-            val = val.item()
-        if hasattr(val, 'isoformat'):
-            return val.isoformat()
-        # Handle inf, -inf, nan
-        if isinstance(val, float):
-            if math.isinf(val):
-                return "∞" if val > 0 else "-∞"
-            if math.isnan(val):
-                return ""
-        return val
-
-    # Write header if sheet is empty
-    if len(worksheet.get_all_values()) == 0:
-        worksheet.append_row(fieldnames)
-
-    for row in results_rows:
-        values = [to_native(row.get(field, "")) for field in fieldnames]
-        worksheet.append_row(values)
-
-    trades_fieldnames = [
-        "entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type"
-    ]
-    trades_by_combo = defaultdict(list)
-
-    for trade in trade_rows:
-        tf = trade.get("timeframe")
-        sl = trade.get("stop_loss_pct")
-        trades_by_combo[(tf, sl)].append(trade)
-
-    for (tf, sl), trades in trades_by_combo.items():
-        sl_percent = int(round(sl * 100))
-        sheet_name = f"Trades_{tf}_ST{sl_percent}%"
-
-        try:
-            trades_ws = sh.worksheet(sheet_name)
-        except gspread.exceptions.WorksheetNotFound:
-            trades_ws = sh.add_worksheet(title=sheet_name, rows="1000", cols="20")
-
-        # Clear the trades worksheet before writing new trades
-        trades_ws.clear()
-
-        if len(trades_ws.get_all_values()) == 0:
-            trades_ws.append_row(trades_fieldnames)
-
-        for trade in trades:
-            trade_row = [to_native(trade.get(field, "")) for field in trades_fieldnames]
-            try:
-                trades_ws.append_row(trade_row)
-            except gspread.exceptions.APIError as e:
-                if '429' in str(e):
-                    logging.warning(f"Google Sheets API quota exceeded (429). Please wait one minute. Will retry on next batch push. Sheet: {sheet_name}")
-                    # Re-queue the failed batch for retry
-                    results_queue.put((results_rows, trade_rows))
-                    return  # Stop pushing for this batch, will retry next interval
-                else:
-                    raise
+def get_nearest_price(df, target_date):
+    if len(df) == 0:
+        return None, None
+    target_ts = pd.to_datetime(target_date)
+    nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0]
+    nearest_time = df.index[nearest_idx]
+    price = df.iloc[nearest_idx]['close']
+    return nearest_time, price
 
 if __name__ == "__main__":
     # Configuration
@@ -341,24 +169,16 @@
     initial_usd = 10000
     debug = False
 
-    results_dir = "results"
-    os.makedirs(results_dir, exist_ok=True)
     timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
 
-    timeframes = ["1min", "5min"]
+    storage = Storage(logging=logging)
+    system_utils = SystemUtils(logging=logging)
+
+    timeframes = ["1D"]
     stop_loss_pcts = [0.01, 0.02, 0.03]
 
     # Load data once
-    data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date)
-
-    def get_nearest_price(df, target_date):
-        if len(df) == 0:
-            return None, None
-        target_ts = pd.to_datetime(target_date)
-        nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0]
-        nearest_time = df.index[nearest_idx]
-        price = df.iloc[nearest_idx]['close']
-        return nearest_time, price
+    data_1min = storage.load_data('btcusd_1-min_data.csv', start_date, stop_date)
 
     nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
     nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)
@@ -372,8 +192,7 @@ if __name__ == "__main__":
         for stop_loss_pct in stop_loss_pcts
     ]
 
-    workers = get_optimal_workers()
-    logging.info(f"Using {workers} workers for processing")
+    workers = system_utils.get_optimal_workers()
 
     # Start the background batch pusher
     # spreadsheet_name = "GlimBit Backtest Results"
@@ -403,33 +222,12 @@
     # batch_pusher.push_all()
 
     # Write all results to a single CSV file
-    combined_filename = os.path.join(results_dir, f"{timestamp}_backtest_combined.csv")
+    combined_filename = f"{timestamp}_backtest_combined.csv"  # Storage prepends results_dir
     combined_fieldnames = [
         "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
         "max_drawdown", "avg_trade", "profit_ratio", "final_usd"
     ]
-
-    def format_row(row):
-        # Format percentages and floats as in your example
-        return {
-            "timeframe": row["timeframe"],
-            "stop_loss_pct": f"{row['stop_loss_pct']*100:.2f}%",
-            "n_trades": row["n_trades"],
-            "n_stop_loss": row["n_stop_loss"],
-            "win_rate": f"{row['win_rate']*100:.2f}%",
-            "max_drawdown": f"{row['max_drawdown']*100:.2f}%",
-            "avg_trade": f"{row['avg_trade']*100:.2f}%",
-            "profit_ratio": f"{row['profit_ratio']*100:.2f}%",
-            "final_usd": f"{row['final_usd']:.2f}",
-        }
-
-    with open(combined_filename, "w", newline="") as csvfile:
-        writer = csv.DictWriter(csvfile, fieldnames=combined_fieldnames, delimiter='\t')
-        writer.writeheader()
-        for row in all_results_rows:
-            writer.writerow(format_row(row))
-
-    logging.info(f"Combined results written to {combined_filename}")
+    storage.write_results_combined(combined_filename, combined_fieldnames, all_results_rows)
 
     # --- Add taxes to combined results CSV ---
     # taxes = Taxes()  # Default 20% tax rate
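The new module-level get_nearest_price leans on the DatetimeIndex produced by Storage.load_data: get_indexer([ts], method='nearest') returns the position of the closest timestamp, so the lookup still succeeds when the requested date falls in a data gap. A self-contained sketch with synthetic closes:

# Editor's sketch (not part of the patch): nearest-timestamp lookup on a DatetimeIndex.
import pandas as pd

idx = pd.date_range("2024-01-01", periods=3, freq="D")
prices = pd.DataFrame({"close": [100.0, 101.5, 99.8]}, index=idx)
target_ts = pd.to_datetime("2024-01-02 11:00")
pos = prices.index.get_indexer([target_ts], method="nearest")[0]
print(prices.index[pos], prices.iloc[pos]["close"])  # 2024-01-02 00:00:00 101.5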
@@ -457,25 +255,11 @@
     # --- END: Collect all trades from each task ---
 
     # Now, group all_trade_rows by (timeframe, stop_loss_pct)
-    from collections import defaultdict
-    trades_by_combo = defaultdict(list)
-    for trade in all_trade_rows:
-        tf = trade.get("timeframe")
-        sl = trade.get("stop_loss_pct")
-        trades_by_combo[(tf, sl)].append(trade)
+
     trades_fieldnames = [
         "entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type"
     ]
-
-    for (tf, sl), trades in trades_by_combo.items():
-        sl_percent = int(round(sl * 100))
-        trades_filename = os.path.join(results_dir, f"trades_{tf}_ST{sl_percent}pct.csv")
-        with open(trades_filename, "w", newline="") as csvfile:
-            writer = csv.DictWriter(csvfile, fieldnames=trades_fieldnames)
-            writer.writeheader()
-            for trade in trades:
-                writer.writerow({k: trade.get(k, "") for k in trades_fieldnames})
-        logging.info(f"Trades written to {trades_filename}")
+    storage.write_trades(all_trade_rows, trades_fieldnames)
\ No newline at end of file
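With the helpers extracted, the output stage of main.py reduces to two Storage calls plus the aggregation that stays behind; a condensed sketch of that flow (all_results_rows and all_trade_rows are assumed to have been collected from the worker futures, as in the loop above):

# Editor's sketch (not part of the patch): the refactored output flow of main.py.
import datetime
import logging
from cycles.utils.storage import Storage

storage = Storage(logging=logging)
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
combined_fieldnames = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss",
                       "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "final_usd"]
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price",
                     "profit_pct", "type"]

all_results_rows, all_trade_rows = [], []  # placeholders for rows gathered from the futures
storage.write_results_combined(f"{timestamp}_backtest_combined.csv",
                               combined_fieldnames, all_results_rows)
storage.write_trades(all_trade_rows, trades_fieldnames)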