diff --git a/main.py b/main.py index 7b2d4fb..58d625e 100644 --- a/main.py +++ b/main.py @@ -1,20 +1,253 @@ import pandas as pd +import numpy as np from trend_detector_macd import TrendDetectorMACD from trend_detector_simple import TrendDetectorSimple from cycle_detector import CycleDetector +import csv +import logging +import concurrent.futures +import os +import psutil -# Load data from CSV file instead of database -data = pd.read_csv('data/btcusd_1-day_data.csv') +# Set up logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[ + logging.FileHandler("backtest.log"), + logging.StreamHandler() + ] +) +def get_optimal_workers(): + """Determine optimal number of worker processes based on system resources""" + cpu_count = os.cpu_count() or 4 + memory_gb = psutil.virtual_memory().total / (1024**3) + # Heuristic: Use 75% of cores, but cap based on available memory + # Assume each worker needs ~2GB for large datasets + workers_by_memory = max(1, int(memory_gb / 2)) + workers_by_cpu = max(1, int(cpu_count * 0.75)) + return min(workers_by_cpu, workers_by_memory) -# Convert datetime column to datetime type -start_date = pd.to_datetime('2024-04-06') -stop_date = pd.to_datetime('2025-05-06') +def load_data(file_path, start_date, stop_date): + """Load data with optimized dtypes and filtering""" + # Define optimized dtypes + dtypes = { + 'Open': 'float32', + 'High': 'float32', + 'Low': 'float32', + 'Close': 'float32', + 'Volume': 'float32' + } + + # Read data with original capitalized column names + data = pd.read_csv(file_path, dtype=dtypes) + + # Convert timestamp to datetime + data['Timestamp'] = pd.to_datetime(data['Timestamp'], unit='s') + + # Filter by date range + data = data[(data['Timestamp'] >= start_date) & (data['Timestamp'] <= stop_date)] + + # Now convert column names to lowercase + data.columns = data.columns.str.lower() + + return data.set_index('timestamp') -daily_data = data[(pd.to_datetime(data['datetime']) >= start_date) & - (pd.to_datetime(data['datetime']) < stop_date)] -print(f"Number of data points: {len(daily_data)}") +def process_month_timeframe(month_df, stop_loss_pcts, rule_name, initial_usd): + """Process a single month for a given timeframe with all stop loss values""" + month_df = month_df.copy().reset_index(drop=True) + + # Only calculate trends once per month-timeframe combination + trend_detector = TrendDetectorSimple(month_df, verbose=False) + analysis_results = trend_detector.detect_trends() + + # Calculate backtest for each stop_loss_pct + results_rows = [] + for stop_loss_pct in stop_loss_pcts: + results = trend_detector.backtest_meta_supertrend( + initial_usd=initial_usd, + stop_loss_pct=stop_loss_pct + ) + + # Process results + n_trades = results["n_trades"] + trades = results.get('trades', []) + n_winning_trades = sum(1 for trade in trades if trade['profit_pct'] > 0) + total_profit = sum(trade['profit_pct'] for trade in trades) + total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0) + + win_rate = n_winning_trades / n_trades if n_trades > 0 else 0 + avg_trade = total_profit / n_trades if n_trades > 0 else 0 + profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf') + + # Calculate max drawdown + cumulative_profit = 0 + max_drawdown = 0 + peak = 0 + for trade in trades: + cumulative_profit += trade['profit_pct'] + if cumulative_profit > peak: + peak = cumulative_profit + drawdown = peak - cumulative_profit + if drawdown > max_drawdown: + max_drawdown = drawdown + + # Create row + row = { + "timeframe": rule_name, + "month": str(month_df['timestamp'].iloc[0].to_period('M')), + "stop_loss_pct": stop_loss_pct, + "n_trades": n_trades, + "n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'), + "win_rate": win_rate, + "max_drawdown": max_drawdown, + "avg_trade": avg_trade, + "profit_ratio": profit_ratio + } + results_rows.append(row) + + return results_rows -trend_detector = TrendDetectorSimple(daily_data, verbose=True) -trends, analysis_results = trend_detector.detect_trends() -trend_detector.plot_trends(trends, analysis_results, "supertrend") \ No newline at end of file +def process_timeframe(timeframe_info): + """Process an entire timeframe""" + rule, rule_name, data_1min, stop_loss_pcts, initial_usd = timeframe_info + + # Resample data if needed + if rule == "1T": + df = data_1min.copy() + else: + df = data_1min.resample(rule).agg({ + 'open': 'first', + 'high': 'max', + 'low': 'min', + 'close': 'last', + 'volume': 'sum' + }).dropna() + + df = df.reset_index() + df['month'] = df['timestamp'].dt.to_period('M') + results_rows = [] + + # Process each month + for month, month_df in df.groupby('month'): + if len(month_df) < 10: # Skip very small months + continue + + logging.info(f"Processing: timeframe={rule_name}, month={month}") + + try: + month_results = process_month_timeframe(month_df, stop_loss_pcts, rule_name, initial_usd) + results_rows.extend(month_results) + + # Write intermediate results to avoid memory buildup + if len(results_rows) > 100: + return results_rows + except Exception as e: + logging.error(f"Error processing {rule_name}, month={month}: {str(e)}") + + return results_rows + +def write_results_chunk(filename, fieldnames, rows, write_header=False): + """Write a chunk of results to a CSV file""" + mode = 'w' if write_header else 'a' + + with open(filename, mode, newline="") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + if write_header: + csvfile.write(f"# initial_usd: {initial_usd}\n") + writer.writeheader() + + for row in rows: + writer.writerow(row) + +def aggregate_results(all_rows): + """Aggregate results per stop_loss_pct and per rule (timeframe)""" + from collections import defaultdict + + grouped = defaultdict(list) + for row in all_rows: + key = (row['timeframe'], row['stop_loss_pct']) + grouped[key].append(row) + + summary_rows = [] + for (rule, stop_loss_pct), rows in grouped.items(): + n_months = len(rows) + total_trades = sum(r['n_trades'] for r in rows) + total_stop_loss = sum(r['n_stop_loss'] for r in rows) + avg_win_rate = np.mean([r['win_rate'] for r in rows]) + avg_max_drawdown = np.mean([r['max_drawdown'] for r in rows]) + avg_avg_trade = np.mean([r['avg_trade'] for r in rows]) + avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows]) + + summary_rows.append({ + "timeframe": rule, + "stop_loss_pct": stop_loss_pct, + "n_trades": total_trades, + "n_stop_loss": total_stop_loss, + "win_rate": avg_win_rate, + "max_drawdown": avg_max_drawdown, + "avg_trade": avg_avg_trade, + "profit_ratio": avg_profit_ratio, + }) + return summary_rows + +if __name__ == "__main__": + # Configuration + start_date = '2020-01-01' + stop_date = '2025-05-15' + initial_usd = 10000 + + timeframes = { + # "1T": "1min", + "15T": "15min", + "1H": "1h", + "6H": "6h", + "1D": "1D", + } + + stop_loss_pcts = [0.01, 0.02, 0.03, 0.05, 0.07, 0.10] + + # Load data once + data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date) + logging.info(f"1min rows: {len(data_1min)}") + + # Set up result file + filename = f"backtest_results_{start_date}_{stop_date}_multi_timeframe_stoploss.csv" + fieldnames = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio"] + + # Initialize output file with header + write_results_chunk(filename, fieldnames, [], write_header=True) + + # Prepare tasks + tasks = [ + (rule, name, data_1min, stop_loss_pcts, initial_usd) + for rule, name in timeframes.items() + ] + + # Determine optimal worker count + workers = get_optimal_workers() + logging.info(f"Using {workers} workers for processing") + + # Process tasks with optimized concurrency + with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor: + futures = {executor.submit(process_timeframe, task): task[1] for task in tasks} + + # Collect all results + all_results = [] + for future in concurrent.futures.as_completed(futures): + timeframe_name = futures[future] + try: + results = future.result() + if results: + # logging.info(f"Writing {len(results)} results for {timeframe_name}") + # write_results_chunk(filename, fieldnames, results) # <-- REMOVE or COMMENT THIS OUT + all_results.extend(results) + except Exception as exc: + logging.error(f"{timeframe_name} generated an exception: {exc}") + + # Write summary rows + summary_rows = aggregate_results(all_results) + write_results_chunk(filename, fieldnames, summary_rows, write_header=True) # Only write summary + + logging.info(f"Results written to {filename}") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index a08bc15..5b6dd4d 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/trend_detector_macd.py b/trend_detector_macd.py index 045f604..e5caf85 100644 --- a/trend_detector_macd.py +++ b/trend_detector_macd.py @@ -6,6 +6,8 @@ import matplotlib.dates as mdates import logging import mplfinance as mpf from matplotlib.patches import Rectangle +from concurrent.futures import ProcessPoolExecutor, as_completed +import concurrent.futures class TrendDetectorMACD: def __init__(self, data, verbose=False): @@ -24,8 +26,6 @@ class TrendDetectorMACD: else: self.logger.error("Invalid data format provided") raise ValueError("Data must be a pandas DataFrame or a list") - - self.logger.info(f"Initialized TrendDetector with {len(self.data)} data points") def detect_trends_MACD_signal(self): self.logger.info("Starting trend detection") @@ -257,3 +257,31 @@ class TrendDetectorMACD: plt.show() return plt + + def _calculate_supertrend_indicators(self): + """ + Calculate SuperTrend indicators with different parameter sets in parallel. + + Returns: + - list, the SuperTrend results + """ + supertrend_params = [ + {"period": 12, "multiplier": 3.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}, + {"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}, + {"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN} + ] + + def run_supertrend(params): + # Each thread gets its own copy of the data to avoid race conditions + return { + "results": self.calculate_supertrend( + period=params["period"], + multiplier=params["multiplier"] + ), + "params": params + } + + with concurrent.futures.ThreadPoolExecutor() as executor: + results = list(executor.map(run_supertrend, supertrend_params)) + + return results diff --git a/trend_detector_simple.py b/trend_detector_simple.py index 9563bcb..61ae291 100644 --- a/trend_detector_simple.py +++ b/trend_detector_simple.py @@ -5,6 +5,9 @@ from scipy.signal import find_peaks from matplotlib.patches import Rectangle from scipy import stats from scipy import stats +import concurrent.futures +from functools import partial +from functools import lru_cache # Color configuration # Plot colors @@ -31,56 +34,140 @@ SMA15_LINE_STYLE = 'm-' # Magenta solid ST_COLOR_UP = 'g-' ST_COLOR_DOWN = 'r-' +# Cache the calculation results by function parameters +@lru_cache(maxsize=32) +def cached_supertrend_calculation(period, multiplier, data_tuple): + # Convert tuple back to numpy arrays + high = np.array(data_tuple[0]) + low = np.array(data_tuple[1]) + close = np.array(data_tuple[2]) + + # Calculate TR and ATR using vectorized operations + tr = np.zeros_like(close) + tr[0] = high[0] - low[0] + hc_range = np.abs(high[1:] - close[:-1]) + lc_range = np.abs(low[1:] - close[:-1]) + hl_range = high[1:] - low[1:] + tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range]) + + # Use numpy's exponential moving average + atr = np.zeros_like(tr) + atr[0] = tr[0] + multiplier_ema = 2.0 / (period + 1) + for i in range(1, len(tr)): + atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema)) + + # Calculate bands + upper_band = np.zeros_like(close) + lower_band = np.zeros_like(close) + for i in range(len(close)): + hl_avg = (high[i] + low[i]) / 2 + upper_band[i] = hl_avg + (multiplier * atr[i]) + lower_band[i] = hl_avg - (multiplier * atr[i]) + + final_upper = np.zeros_like(close) + final_lower = np.zeros_like(close) + supertrend = np.zeros_like(close) + trend = np.zeros_like(close) + final_upper[0] = upper_band[0] + final_lower[0] = lower_band[0] + if close[0] <= upper_band[0]: + supertrend[0] = upper_band[0] + trend[0] = -1 + else: + supertrend[0] = lower_band[0] + trend[0] = 1 + for i in range(1, len(close)): + if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]): + final_upper[i] = upper_band[i] + else: + final_upper[i] = final_upper[i-1] + if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]): + final_lower[i] = lower_band[i] + else: + final_lower[i] = final_lower[i-1] + if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]: + supertrend[i] = final_upper[i] + trend[i] = -1 + elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]: + supertrend[i] = final_lower[i] + trend[i] = 1 + elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]: + supertrend[i] = final_lower[i] + trend[i] = 1 + elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]: + supertrend[i] = final_upper[i] + trend[i] = -1 + return { + 'supertrend': supertrend, + 'trend': trend, + 'upper_band': final_upper, + 'lower_band': final_lower + } + +def calculate_supertrend_external(data, period, multiplier): + # Convert DataFrame columns to hashable tuples + high_tuple = tuple(data['high'].values) + low_tuple = tuple(data['low'].values) + close_tuple = tuple(data['close'].values) + + # Call the cached function + return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple)) + class TrendDetectorSimple: - def __init__(self, data, verbose=False): + def __init__(self, data, verbose=False, display=False): """ Initialize the TrendDetectorSimple class. Parameters: - data: pandas DataFrame containing price data - verbose: boolean, whether to display detailed logging information + - display: boolean, whether to enable display/plotting features """ self.data = data self.verbose = verbose + self.display = display - # Plot style configuration - self.plot_style = 'dark_background' - self.bg_color = DARK_BG_COLOR - self.plot_size = (12, 8) - - # Candlestick configuration - self.candle_width = 0.6 - self.candle_up_color = CANDLE_UP_COLOR - self.candle_down_color = CANDLE_DOWN_COLOR - self.candle_alpha = 0.8 - self.wick_width = 1 - - # Marker configuration - self.min_marker = '^' - self.min_color = MIN_COLOR - self.min_size = 100 - self.max_marker = 'v' - self.max_color = MAX_COLOR - self.max_size = 100 - self.marker_zorder = 100 - - # Line configuration - self.line_width = 1 - self.min_line_style = MIN_LINE_STYLE - self.max_line_style = MAX_LINE_STYLE - self.sma7_line_style = SMA7_LINE_STYLE - self.sma15_line_style = SMA15_LINE_STYLE - - # Text configuration - self.title_size = 14 - self.title_color = TITLE_COLOR - self.axis_label_size = 12 - self.axis_label_color = AXIS_LABEL_COLOR - - # Legend configuration - self.legend_loc = 'best' - self.legend_bg_color = LEGEND_BG_COLOR + # Only define display-related variables if display is True + if self.display: + # Plot style configuration + self.plot_style = 'dark_background' + self.bg_color = DARK_BG_COLOR + self.plot_size = (12, 8) + + # Candlestick configuration + self.candle_width = 0.6 + self.candle_up_color = CANDLE_UP_COLOR + self.candle_down_color = CANDLE_DOWN_COLOR + self.candle_alpha = 0.8 + self.wick_width = 1 + + # Marker configuration + self.min_marker = '^' + self.min_color = MIN_COLOR + self.min_size = 100 + self.max_marker = 'v' + self.max_color = MAX_COLOR + self.max_size = 100 + self.marker_zorder = 100 + + # Line configuration + self.line_width = 1 + self.min_line_style = MIN_LINE_STYLE + self.max_line_style = MAX_LINE_STYLE + self.sma7_line_style = SMA7_LINE_STYLE + self.sma15_line_style = SMA15_LINE_STYLE + + # Text configuration + self.title_size = 14 + self.title_color = TITLE_COLOR + self.axis_label_size = 12 + self.axis_label_color = AXIS_LABEL_COLOR + + # Legend configuration + self.legend_loc = 'best' + self.legend_bg_color = LEGEND_BG_COLOR # Configure logging logging.basicConfig(level=logging.INFO if verbose else logging.WARNING, @@ -93,8 +180,6 @@ class TrendDetectorSimple: self.data = pd.DataFrame({'close': self.data}) else: raise ValueError("Data must be a pandas DataFrame or a list") - - self.logger.info(f"Initialized TrendDetectorSimple with {len(self.data)} data points") def calculate_tr(self): """ @@ -169,63 +254,62 @@ class TrendDetectorSimple: - DataFrame with columns for timestamps, prices, and trend indicators - Dictionary containing analysis results including linear regression, SMAs, and SuperTrend indicators """ - df = self.data.copy() - close_prices = df['close'].values + df = self.data + # close_prices = df['close'].values - max_peaks, _ = find_peaks(close_prices) - min_peaks, _ = find_peaks(-close_prices) + # max_peaks, _ = find_peaks(close_prices) + # min_peaks, _ = find_peaks(-close_prices) - df['is_min'] = False - df['is_max'] = False + # df['is_min'] = False + # df['is_max'] = False - for peak in max_peaks: - df.at[peak, 'is_max'] = True - for peak in min_peaks: - df.at[peak, 'is_min'] = True + # for peak in max_peaks: + # df.at[peak, 'is_max'] = True + # for peak in min_peaks: + # df.at[peak, 'is_min'] = True - result = df[['datetime', 'close', 'is_min', 'is_max']].copy() + # result = df[['timestamp', 'close', 'is_min', 'is_max']].copy() # Perform linear regression on min_peaks and max_peaks - min_prices = df['close'].iloc[min_peaks].values - max_prices = df['close'].iloc[max_peaks].values + # min_prices = df['close'].iloc[min_peaks].values + # max_prices = df['close'].iloc[max_peaks].values # Linear regression for min peaks if we have at least 2 points - min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices) + # min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices) # Linear regression for max peaks if we have at least 2 points - max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices) + # max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices) # Calculate Simple Moving Averages (SMA) for 7 and 15 periods - sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values - sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values + # sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values + # sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values analysis_results = {} - analysis_results['linear_regression'] = { - 'min': { - 'slope': min_slope, - 'intercept': min_intercept, - 'r_squared': min_r_value ** 2 - }, - 'max': { - 'slope': max_slope, - 'intercept': max_intercept, - 'r_squared': max_r_value ** 2 - } - } - analysis_results['sma'] = { - '7': sma_7, - '15': sma_15 - } + # analysis_results['linear_regression'] = { + # 'min': { + # 'slope': min_slope, + # 'intercept': min_intercept, + # 'r_squared': min_r_value ** 2 + # }, + # 'max': { + # 'slope': max_slope, + # 'intercept': max_intercept, + # 'r_squared': max_r_value ** 2 + # } + # } + # analysis_results['sma'] = { + # '7': sma_7, + # '15': sma_15 + # } # Calculate SuperTrend indicators supertrend_results_list = self._calculate_supertrend_indicators() analysis_results['supertrend'] = supertrend_results_list - return result, analysis_results + return analysis_results def _calculate_supertrend_indicators(self): """ - Calculate SuperTrend indicators with different parameter sets. - + Calculate SuperTrend indicators with different parameter sets in parallel. Returns: - list, the SuperTrend results """ @@ -234,112 +318,22 @@ class TrendDetectorSimple: {"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}, {"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN} ] + data = self.data.copy() + + # For just 3 calculations, direct calculation might be faster than process pool + results = [] + for p in supertrend_params: + result = calculate_supertrend_external(data, p["period"], p["multiplier"]) + results.append(result) supertrend_results_list = [] - for params in supertrend_params: - supertrend_results = self.calculate_supertrend( - period=params["period"], - multiplier=params["multiplier"] - ) + for params, result in zip(supertrend_params, results): supertrend_results_list.append({ - "results": supertrend_results, + "results": result, "params": params }) - return supertrend_results_list - def calculate_supertrend(self, period, multiplier): - """ - Calculate SuperTrend indicator for the price data. - - SuperTrend is a trend-following indicator that uses ATR to determine the trend direction. - - Parameters: - - period: int, the period for the ATR calculation (default: 10) - - multiplier: float, the multiplier for the ATR (default: 3.0) - - Returns: - - Dictionary containing SuperTrend values, trend direction, and upper/lower bands - """ - df = self.data.copy() - high = df['high'].values - low = df['low'].values - close = df['close'].values - - # Calculate ATR - atr = self.calculate_atr(period) - - # Calculate basic upper and lower bands - upper_band = np.zeros_like(close) - lower_band = np.zeros_like(close) - - for i in range(len(close)): - # Calculate the basic bands - hl_avg = (high[i] + low[i]) / 2 - upper_band[i] = hl_avg + (multiplier * atr[i]) - lower_band[i] = hl_avg - (multiplier * atr[i]) - - # Calculate final upper and lower bands with trend logic - final_upper = np.zeros_like(close) - final_lower = np.zeros_like(close) - supertrend = np.zeros_like(close) - trend = np.zeros_like(close) # 1 for uptrend, -1 for downtrend - - # Initialize first values - final_upper[0] = upper_band[0] - final_lower[0] = lower_band[0] - - # If close price is above upper band, we're in a downtrend (ST = upper band) - # If close price is below lower band, we're in an uptrend (ST = lower band) - if close[0] <= upper_band[0]: - supertrend[0] = upper_band[0] - trend[0] = -1 # Downtrend - else: - supertrend[0] = lower_band[0] - trend[0] = 1 # Uptrend - - # Calculate SuperTrend for the rest of the data - for i in range(1, len(close)): - # Calculate final upper band - if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]): - final_upper[i] = upper_band[i] - else: - final_upper[i] = final_upper[i-1] - - # Calculate final lower band - if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]): - final_lower[i] = lower_band[i] - else: - final_lower[i] = final_lower[i-1] - - # Determine trend and SuperTrend value - if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]: - # Continuing downtrend - supertrend[i] = final_upper[i] - trend[i] = -1 - elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]: - # Switching to uptrend - supertrend[i] = final_lower[i] - trend[i] = 1 - elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]: - # Continuing uptrend - supertrend[i] = final_lower[i] - trend[i] = 1 - elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]: - # Switching to downtrend - supertrend[i] = final_upper[i] - trend[i] = -1 - - # Prepare result - supertrend_results = { - 'supertrend': supertrend, - 'trend': trend, - 'upper_band': final_upper, - 'lower_band': final_lower - } - - return supertrend_results - def plot_trends(self, trend_data, analysis_results, view="both"): """ Plot the price data with detected trends using a candlestick chart. @@ -353,6 +347,9 @@ class TrendDetectorSimple: Returns: - None (displays the plot) """ + if not self.display: + return # Do nothing if display is False + import matplotlib.pyplot as plt from matplotlib.patches import Rectangle @@ -611,6 +608,9 @@ class TrendDetectorSimple: supertrends = [st["results"]["supertrend"] for st in supertrend_results_list] params = supertrend_results_list[0]["params"] # Use first config for styling + trends_arr = np.stack(trends, axis=1) + meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), trends_arr[:,0], 0) + for i in range(1, len(x_vals)): t1, t2, t3 = trends[0][i], trends[1][i], trends[2][i] if t1 == t2 == t3: @@ -640,4 +640,152 @@ class TrendDetectorSimple: label=f'ST (P:{period}, M:{multiplier}) Up') ax.plot([], [], color_down, linewidth=self.line_width, label=f'ST (P:{period}, M:{multiplier}) Down') + + def backtest_meta_supertrend(self, initial_usd=10000, stop_loss_pct=0.05): + """ + Backtest a simple strategy using the meta supertrend (all three supertrends agree). + Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss. + + Parameters: + - initial_usd: float, starting USD amount + - stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%) + """ + import pandas as pd + df = self.data.copy().reset_index(drop=True) + df['timestamp'] = pd.to_datetime(df['timestamp']) + + if len(df) == 0: + self.logger.warning("No data available for backtest.") + return { + "initial_usd": initial_usd, + "final_usd": initial_usd, + "n_trades": 0, + "win_rate": 0, + "max_drawdown": 0, + "avg_trade": 0, + "trade_log": [], + "first_trade": {}, + "last_trade": {}, + "trades": [], + } + + # Get meta supertrend (all three agree) + supertrend_results_list = self._calculate_supertrend_indicators() + trends = [st['results']['trend'] for st in supertrend_results_list] + trends_arr = np.stack(trends, axis=1) + meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), + trends_arr[:,0], 0) + + # Precompute buy/sell signals + buy_signals = (meta_trend == 1) & (np.roll(meta_trend, 1) != 1) + sell_signals = (meta_trend == -1) & (np.roll(meta_trend, 1) != -1) + buy_signals[0] = False # Ignore first element due to np.roll + sell_signals[0] = False + + position = 0 # 0 = no position, 1 = long + entry_price = 0 + usd = initial_usd + coin = 0 + trade_log = [] + max_balance = initial_usd + drawdowns = [] + trades = [] + + for i in range(1, len(df)): + if i % 100 == 0: + self.logger.debug(f"Progress: {i}/{len(df)} rows processed.") + + price_open = df['open'].iloc[i] + price_high = df['high'].iloc[i] + price_low = df['low'].iloc[i] + price_close = df['close'].iloc[i] + date = df['timestamp'].iloc[i] + mt = meta_trend[i] + + # Check stop loss if in position + if position == 1: + stop_price = entry_price * (1 - stop_loss_pct) + if price_low <= stop_price: + # Stop loss triggered + sell_price = stop_price + usd = coin * sell_price + trade_log.append({'type': 'STOP', 'entry': entry_price, 'exit': sell_price, 'entry_time': entry_time, 'exit_time': date}) + coin = 0 + position = 0 + entry_price = 0 + continue + + # Entry logic + if position == 0 and mt == 1: + # Buy at open + coin = usd / price_open + entry_price = price_open + entry_time = date + usd = 0 + position = 1 + # Exit logic + elif position == 1 and mt == -1: + # Sell at open + usd = coin * price_open + trade_log.append({'type': 'SELL', 'entry': entry_price, 'exit': price_open, 'entry_time': entry_time, 'exit_time': date}) + coin = 0 + position = 0 + entry_price = 0 + + # Track drawdown + balance = usd if position == 0 else coin * price_close + if balance > max_balance: + max_balance = balance + drawdown = (max_balance - balance) / max_balance + drawdowns.append(drawdown) + if i % 1000 == 0 or i == len(df) - 1: + self.logger.debug(f"Progress: {i}/{len(df)} rows processed.") + + # If still in position at end, sell at last close + if position == 1: + usd = coin * df['close'].iloc[-1] + trade_log.append({'type': 'EOD', 'entry': entry_price, 'exit': df['close'].iloc[-1], 'entry_time': entry_time, 'exit_time': df['timestamp'].iloc[-1]}) + coin = 0 + position = 0 + entry_price = 0 + + final_balance = usd + n_trades = len(trade_log) + wins = [1 for t in trade_log if t['exit'] > t['entry']] + win_rate = len(wins) / n_trades if n_trades > 0 else 0 + max_drawdown = max(drawdowns) if drawdowns else 0 + avg_trade = np.mean([t['exit']/t['entry']-1 for t in trade_log]) if trade_log else 0 + + trades = [] + for trade in trade_log: + profit_pct = (trade['exit'] - trade['entry']) / trade['entry'] + trades.append({ + 'entry_time': trade['entry_time'], + 'exit_time': trade['exit_time'], + 'entry': trade['entry'], + 'exit': trade['exit'], + 'profit_pct': profit_pct, + 'type': trade.get('type', 'SELL') + }) + + results = { + "initial_usd": initial_usd, + "final_usd": final_balance, + "n_trades": n_trades, + "win_rate": win_rate, + "max_drawdown": max_drawdown, + "avg_trade": avg_trade, + "trade_log": trade_log, + "trades": trades, + } + if n_trades > 0: + results["first_trade"] = { + "entry_time": trade_log[0]['entry_time'], + "entry": trade_log[0]['entry'] + } + results["last_trade"] = { + "exit_time": trade_log[-1]['exit_time'], + "exit": trade_log[-1]['exit'] + } + return results \ No newline at end of file