Enhance main.py with optimized data loading, logging setup, and concurrent processing for backtesting. Introduce new functions for data handling and results aggregation. Update TrendDetectorSimple to support meta supertrend backtesting and improve SuperTrend calculations with caching and parallel execution. Refactor TrendDetectorMACD for better performance in trend detection.

Simon Moisy 2025-05-16 02:44:22 +08:00
parent 7c4db08b1b
commit ec8b1a7cf2
4 changed files with 597 additions and 188 deletions

main.py

@@ -1,20 +1,253 @@
import pandas as pd
import numpy as np
from trend_detector_macd import TrendDetectorMACD
from trend_detector_simple import TrendDetectorSimple
from cycle_detector import CycleDetector
import csv
import logging
import concurrent.futures
import os
import psutil
# Load data from CSV file instead of database
data = pd.read_csv('data/btcusd_1-day_data.csv')
# Set up logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("backtest.log"),
logging.StreamHandler()
]
)
def get_optimal_workers():
"""Determine optimal number of worker processes based on system resources"""
cpu_count = os.cpu_count() or 4
memory_gb = psutil.virtual_memory().total / (1024**3)
# Heuristic: Use 75% of cores, but cap based on available memory
# Assume each worker needs ~2GB for large datasets
workers_by_memory = max(1, int(memory_gb / 2))
workers_by_cpu = max(1, int(cpu_count * 0.75))
return min(workers_by_cpu, workers_by_memory)
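# Illustrative sizing (hypothetical machine): with 16 cores and 32 GB RAM,
# workers_by_cpu = int(16 * 0.75) = 12 and workers_by_memory = int(32 / 2) = 16,
# so get_optimal_workers() returns min(12, 16) = 12.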
# Convert datetime column to datetime type
start_date = pd.to_datetime('2024-04-06')
stop_date = pd.to_datetime('2025-05-06')
def load_data(file_path, start_date, stop_date):
"""Load data with optimized dtypes and filtering"""
# Define optimized dtypes
dtypes = {
'Open': 'float32',
'High': 'float32',
'Low': 'float32',
'Close': 'float32',
'Volume': 'float32'
}
# Read data with original capitalized column names
data = pd.read_csv(file_path, dtype=dtypes)
# Convert timestamp to datetime
data['Timestamp'] = pd.to_datetime(data['Timestamp'], unit='s')
# Filter by date range
data = data[(data['Timestamp'] >= start_date) & (data['Timestamp'] <= stop_date)]
# Now convert column names to lowercase
data.columns = data.columns.str.lower()
return data.set_index('timestamp')
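# Usage sketch (hypothetical dates; the real ones are set under __main__ below):
#   df = load_data('./data/btcusd_1-min_data.csv', '2024-01-01', '2025-01-01')
#   df.index   -> DatetimeIndex named 'timestamp'
#   df.columns -> ['open', 'high', 'low', 'close', 'volume'], all float32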
daily_data = data[(pd.to_datetime(data['datetime']) >= start_date) &
(pd.to_datetime(data['datetime']) < stop_date)]
print(f"Number of data points: {len(daily_data)}")
def process_month_timeframe(month_df, stop_loss_pcts, rule_name, initial_usd):
"""Process a single month for a given timeframe with all stop loss values"""
month_df = month_df.copy().reset_index(drop=True)
# Calculate trends once per month-timeframe combination; detect_trends also
# warms the cached SuperTrend results that each backtest below reuses
trend_detector = TrendDetectorSimple(month_df, verbose=False)
trend_detector.detect_trends()
# Calculate backtest for each stop_loss_pct
results_rows = []
for stop_loss_pct in stop_loss_pcts:
results = trend_detector.backtest_meta_supertrend(
initial_usd=initial_usd,
stop_loss_pct=stop_loss_pct
)
# Process results
n_trades = results["n_trades"]
trades = results.get('trades', [])
n_winning_trades = sum(1 for trade in trades if trade['profit_pct'] > 0)
total_profit = sum(trade['profit_pct'] for trade in trades)
total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0)
win_rate = n_winning_trades / n_trades if n_trades > 0 else 0
avg_trade = total_profit / n_trades if n_trades > 0 else 0
profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf')
# Calculate max drawdown
cumulative_profit = 0
max_drawdown = 0
peak = 0
for trade in trades:
cumulative_profit += trade['profit_pct']
if cumulative_profit > peak:
peak = cumulative_profit
drawdown = peak - cumulative_profit
if drawdown > max_drawdown:
max_drawdown = drawdown
# Create row
row = {
"timeframe": rule_name,
"month": str(month_df['timestamp'].iloc[0].to_period('M')),
"stop_loss_pct": stop_loss_pct,
"n_trades": n_trades,
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'),
"win_rate": win_rate,
"max_drawdown": max_drawdown,
"avg_trade": avg_trade,
"profit_ratio": profit_ratio
}
results_rows.append(row)
return results_rows
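# Worked example of the drawdown loop above (illustrative profit_pct values):
#   trades:     +0.04, -0.02, +0.01, -0.05
#   cumulative:  0.04,  0.02,  0.03, -0.02   (peak stays at 0.04)
#   drawdown:    0.00,  0.02,  0.01,  0.06   -> max_drawdown = 0.06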
trend_detector = TrendDetectorSimple(daily_data, verbose=True)
trends, analysis_results = trend_detector.detect_trends()
trend_detector.plot_trends(trends, analysis_results, "supertrend")
def process_timeframe(timeframe_info):
"""Process an entire timeframe"""
rule, rule_name, data_1min, stop_loss_pcts, initial_usd = timeframe_info
# Resample data if needed
if rule == "1T":
df = data_1min.copy()
else:
df = data_1min.resample(rule).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}).dropna()
df = df.reset_index()
df['month'] = df['timestamp'].dt.to_period('M')
results_rows = []
# Process each month
for month, month_df in df.groupby('month'):
if len(month_df) < 10: # Skip very small months
continue
logging.info(f"Processing: timeframe={rule_name}, month={month}")
try:
month_results = process_month_timeframe(month_df, stop_loss_pcts, rule_name, initial_usd)
results_rows.extend(month_results)
# Keep accumulating: returning early here would silently drop the
# remaining months; rows are aggregated per timeframe by the caller
except Exception as e:
logging.error(f"Error processing {rule_name}, month={month}: {str(e)}")
return results_rows
def write_results_chunk(filename, fieldnames, rows, write_header=False, initial_usd=None):
"""Write a chunk of results to a CSV file, optionally recording initial_usd in a comment header"""
mode = 'w' if write_header else 'a'
with open(filename, mode, newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
if write_header:
if initial_usd is not None:
csvfile.write(f"# initial_usd: {initial_usd}\n")
writer.writeheader()
for row in rows:
writer.writerow(row)
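# The '# initial_usd: ...' line makes this a commented CSV; skip such lines
# when reading it back (sketch): pd.read_csv(filename, comment='#')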
def aggregate_results(all_rows):
"""Aggregate results per stop_loss_pct and per rule (timeframe)"""
from collections import defaultdict
grouped = defaultdict(list)
for row in all_rows:
key = (row['timeframe'], row['stop_loss_pct'])
grouped[key].append(row)
summary_rows = []
for (rule, stop_loss_pct), rows in grouped.items():
n_months = len(rows)
total_trades = sum(r['n_trades'] for r in rows)
total_stop_loss = sum(r['n_stop_loss'] for r in rows)
avg_win_rate = np.mean([r['win_rate'] for r in rows])
avg_max_drawdown = np.mean([r['max_drawdown'] for r in rows])
avg_avg_trade = np.mean([r['avg_trade'] for r in rows])
avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows])
summary_rows.append({
"timeframe": rule,
"stop_loss_pct": stop_loss_pct,
"n_trades": total_trades,
"n_stop_loss": total_stop_loss,
"win_rate": avg_win_rate,
"max_drawdown": avg_max_drawdown,
"avg_trade": avg_avg_trade,
"profit_ratio": avg_profit_ratio,
})
return summary_rows
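# Example: twelve monthly rows for ('1H', 0.02) collapse into one summary row
# whose win_rate/max_drawdown/avg_trade are simple per-month means rather than
# trade-weighted ones; note that a month with no losing trades contributes
# float('inf') to the profit_ratio mean.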
if __name__ == "__main__":
# Configuration
start_date = '2020-01-01'
stop_date = '2025-05-15'
initial_usd = 10000
timeframes = {
# "1T": "1min",
"15T": "15min",
"1H": "1h",
"6H": "6h",
"1D": "1D",
}
stop_loss_pcts = [0.01, 0.02, 0.03, 0.05, 0.07, 0.10]
# Load data once
data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date)
logging.info(f"1min rows: {len(data_1min)}")
# Set up result file
filename = f"backtest_results_{start_date}_{stop_date}_multi_timeframe_stoploss.csv"
fieldnames = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio"]
# Initialize output file with header
write_results_chunk(filename, fieldnames, [], write_header=True, initial_usd=initial_usd)
# Prepare tasks
tasks = [
(rule, name, data_1min, stop_loss_pcts, initial_usd)
for rule, name in timeframes.items()
]
# Determine optimal worker count
workers = get_optimal_workers()
logging.info(f"Using {workers} workers for processing")
# Process tasks with optimized concurrency
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(process_timeframe, task): task[1] for task in tasks}
# Collect all results
all_results = []
for future in concurrent.futures.as_completed(futures):
timeframe_name = futures[future]
try:
results = future.result()
if results:
# Per-month rows are collected here; only the aggregated
# summary is written once all futures complete
all_results.extend(results)
except Exception as exc:
logging.error(f"{timeframe_name} generated an exception: {exc}")
# Write summary rows
summary_rows = aggregate_results(all_results)
write_results_chunk(filename, fieldnames, summary_rows, write_header=True, initial_usd=initial_usd)  # Only the aggregated summary is written
logging.info(f"Results written to {filename}")

Binary file not shown.

trend_detector_macd.py

@@ -6,6 +6,8 @@ import matplotlib.dates as mdates
import logging
import mplfinance as mpf
from matplotlib.patches import Rectangle
from concurrent.futures import ProcessPoolExecutor, as_completed
import concurrent.futures
class TrendDetectorMACD:
def __init__(self, data, verbose=False):
@@ -24,8 +26,6 @@ class TrendDetectorMACD:
else:
self.logger.error("Invalid data format provided")
raise ValueError("Data must be a pandas DataFrame or a list")
self.logger.info(f"Initialized TrendDetector with {len(self.data)} data points")
def detect_trends_MACD_signal(self):
self.logger.info("Starting trend detection")
@@ -257,3 +257,31 @@ class TrendDetectorMACD:
plt.show()
return plt
def _calculate_supertrend_indicators(self):
"""
Calculate SuperTrend indicators with different parameter sets in parallel.
Returns:
- list, the SuperTrend results
"""
supertrend_params = [
{"period": 12, "multiplier": 3.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
{"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
{"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}
]
def run_supertrend(params):
# Each call only reads self.data and builds its own result dict, so worker threads do not share mutable state
return {
"results": self.calculate_supertrend(
period=params["period"],
multiplier=params["multiplier"]
),
"params": params
}
with concurrent.futures.ThreadPoolExecutor() as executor:
results = list(executor.map(run_supertrend, supertrend_params))
return results
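# Note: executor.map yields results in the order of supertrend_params, not in
# completion order, so each result stays aligned with the params that produced it.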

trend_detector_simple.py

@@ -5,6 +5,9 @@ from scipy.signal import find_peaks
from matplotlib.patches import Rectangle
from scipy import stats
import concurrent.futures
from functools import partial
from functools import lru_cache
# Color configuration
# Plot colors
@@ -31,56 +34,140 @@ SMA15_LINE_STYLE = 'm-' # Magenta solid
ST_COLOR_UP = 'g-'
ST_COLOR_DOWN = 'r-'
# Cache the calculation results by function parameters
@lru_cache(maxsize=32)
def cached_supertrend_calculation(period, multiplier, data_tuple):
# Convert tuple back to numpy arrays
high = np.array(data_tuple[0])
low = np.array(data_tuple[1])
close = np.array(data_tuple[2])
# Calculate TR and ATR using vectorized operations
tr = np.zeros_like(close)
tr[0] = high[0] - low[0]
hc_range = np.abs(high[1:] - close[:-1])
lc_range = np.abs(low[1:] - close[:-1])
hl_range = high[1:] - low[1:]
tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range])
# Smooth TR into ATR with an exponential moving average (computed iteratively)
atr = np.zeros_like(tr)
atr[0] = tr[0]
multiplier_ema = 2.0 / (period + 1)
for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema))
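# Note: this smooths TR with alpha = 2 / (period + 1), e.g. period=12 gives
# alpha = 2/13 ~= 0.154, rather than Wilder's classic 1/period ATR smoothing.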
# Calculate bands
upper_band = np.zeros_like(close)
lower_band = np.zeros_like(close)
for i in range(len(close)):
hl_avg = (high[i] + low[i]) / 2
upper_band[i] = hl_avg + (multiplier * atr[i])
lower_band[i] = hl_avg - (multiplier * atr[i])
final_upper = np.zeros_like(close)
final_lower = np.zeros_like(close)
supertrend = np.zeros_like(close)
trend = np.zeros_like(close)
final_upper[0] = upper_band[0]
final_lower[0] = lower_band[0]
if close[0] <= upper_band[0]:
supertrend[0] = upper_band[0]
trend[0] = -1
else:
supertrend[0] = lower_band[0]
trend[0] = 1
for i in range(1, len(close)):
if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]):
final_upper[i] = upper_band[i]
else:
final_upper[i] = final_upper[i-1]
if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]):
final_lower[i] = lower_band[i]
else:
final_lower[i] = final_lower[i-1]
if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
return {
'supertrend': supertrend,
'trend': trend,
'upper_band': final_upper,
'lower_band': final_lower
}
def calculate_supertrend_external(data, period, multiplier):
# Convert DataFrame columns to hashable tuples
high_tuple = tuple(data['high'].values)
low_tuple = tuple(data['low'].values)
close_tuple = tuple(data['close'].values)
# Call the cached function
return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple))
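# Repeated calls with identical (period, multiplier, data) are then served from
# the LRU cache. Quick check (sketch):
#   calculate_supertrend_external(df, 12, 3.0)
#   calculate_supertrend_external(df, 12, 3.0)  # second call is a cache hit
#   cached_supertrend_calculation.cache_info()  # -> hits=1, misses=1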
class TrendDetectorSimple:
def __init__(self, data, verbose=False):
def __init__(self, data, verbose=False, display=False):
"""
Initialize the TrendDetectorSimple class.
Parameters:
- data: pandas DataFrame containing price data
- verbose: boolean, whether to display detailed logging information
- display: boolean, whether to enable display/plotting features
"""
self.data = data
self.verbose = verbose
self.display = display
# Plot style configuration
self.plot_style = 'dark_background'
self.bg_color = DARK_BG_COLOR
self.plot_size = (12, 8)
# Candlestick configuration
self.candle_width = 0.6
self.candle_up_color = CANDLE_UP_COLOR
self.candle_down_color = CANDLE_DOWN_COLOR
self.candle_alpha = 0.8
self.wick_width = 1
# Marker configuration
self.min_marker = '^'
self.min_color = MIN_COLOR
self.min_size = 100
self.max_marker = 'v'
self.max_color = MAX_COLOR
self.max_size = 100
self.marker_zorder = 100
# Line configuration
self.line_width = 1
self.min_line_style = MIN_LINE_STYLE
self.max_line_style = MAX_LINE_STYLE
self.sma7_line_style = SMA7_LINE_STYLE
self.sma15_line_style = SMA15_LINE_STYLE
# Text configuration
self.title_size = 14
self.title_color = TITLE_COLOR
self.axis_label_size = 12
self.axis_label_color = AXIS_LABEL_COLOR
# Legend configuration
self.legend_loc = 'best'
self.legend_bg_color = LEGEND_BG_COLOR
# Only define display-related variables if display is True
if self.display:
# Plot style configuration
self.plot_style = 'dark_background'
self.bg_color = DARK_BG_COLOR
self.plot_size = (12, 8)
# Candlestick configuration
self.candle_width = 0.6
self.candle_up_color = CANDLE_UP_COLOR
self.candle_down_color = CANDLE_DOWN_COLOR
self.candle_alpha = 0.8
self.wick_width = 1
# Marker configuration
self.min_marker = '^'
self.min_color = MIN_COLOR
self.min_size = 100
self.max_marker = 'v'
self.max_color = MAX_COLOR
self.max_size = 100
self.marker_zorder = 100
# Line configuration
self.line_width = 1
self.min_line_style = MIN_LINE_STYLE
self.max_line_style = MAX_LINE_STYLE
self.sma7_line_style = SMA7_LINE_STYLE
self.sma15_line_style = SMA15_LINE_STYLE
# Text configuration
self.title_size = 14
self.title_color = TITLE_COLOR
self.axis_label_size = 12
self.axis_label_color = AXIS_LABEL_COLOR
# Legend configuration
self.legend_loc = 'best'
self.legend_bg_color = LEGEND_BG_COLOR
# Configure logging
logging.basicConfig(level=logging.INFO if verbose else logging.WARNING,
@@ -93,8 +180,6 @@ class TrendDetectorSimple:
self.data = pd.DataFrame({'close': self.data})
else:
raise ValueError("Data must be a pandas DataFrame or a list")
self.logger.info(f"Initialized TrendDetectorSimple with {len(self.data)} data points")
def calculate_tr(self):
"""
@@ -169,63 +254,62 @@
- DataFrame with columns for timestamps, prices, and trend indicators
- Dictionary containing analysis results including linear regression, SMAs, and SuperTrend indicators
"""
df = self.data.copy()
close_prices = df['close'].values
df = self.data
# close_prices = df['close'].values
max_peaks, _ = find_peaks(close_prices)
min_peaks, _ = find_peaks(-close_prices)
# max_peaks, _ = find_peaks(close_prices)
# min_peaks, _ = find_peaks(-close_prices)
df['is_min'] = False
df['is_max'] = False
# df['is_min'] = False
# df['is_max'] = False
for peak in max_peaks:
df.at[peak, 'is_max'] = True
for peak in min_peaks:
df.at[peak, 'is_min'] = True
# for peak in max_peaks:
# df.at[peak, 'is_max'] = True
# for peak in min_peaks:
# df.at[peak, 'is_min'] = True
result = df[['datetime', 'close', 'is_min', 'is_max']].copy()
# result = df[['timestamp', 'close', 'is_min', 'is_max']].copy()
# Perform linear regression on min_peaks and max_peaks
min_prices = df['close'].iloc[min_peaks].values
max_prices = df['close'].iloc[max_peaks].values
# min_prices = df['close'].iloc[min_peaks].values
# max_prices = df['close'].iloc[max_peaks].values
# Linear regression for min peaks if we have at least 2 points
min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices)
# min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices)
# Linear regression for max peaks if we have at least 2 points
max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices)
# max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices)
# Calculate Simple Moving Averages (SMA) for 7 and 15 periods
sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values
sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values
# sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values
# sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values
analysis_results = {}
analysis_results['linear_regression'] = {
'min': {
'slope': min_slope,
'intercept': min_intercept,
'r_squared': min_r_value ** 2
},
'max': {
'slope': max_slope,
'intercept': max_intercept,
'r_squared': max_r_value ** 2
}
}
analysis_results['sma'] = {
'7': sma_7,
'15': sma_15
}
# analysis_results['linear_regression'] = {
# 'min': {
# 'slope': min_slope,
# 'intercept': min_intercept,
# 'r_squared': min_r_value ** 2
# },
# 'max': {
# 'slope': max_slope,
# 'intercept': max_intercept,
# 'r_squared': max_r_value ** 2
# }
# }
# analysis_results['sma'] = {
# '7': sma_7,
# '15': sma_15
# }
# Calculate SuperTrend indicators
supertrend_results_list = self._calculate_supertrend_indicators()
analysis_results['supertrend'] = supertrend_results_list
return result, analysis_results
return analysis_results
def _calculate_supertrend_indicators(self):
"""
Calculate SuperTrend indicators with different parameter sets.
Calculate SuperTrend indicators with different parameter sets, reusing the cached SuperTrend computation.
Returns:
- list, the SuperTrend results
"""
@@ -234,112 +318,22 @@
{"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
{"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}
]
data = self.data.copy()
# For just 3 calculations, direct calculation might be faster than process pool
results = []
for p in supertrend_params:
result = calculate_supertrend_external(data, p["period"], p["multiplier"])
results.append(result)
supertrend_results_list = []
for params in supertrend_params:
supertrend_results = self.calculate_supertrend(
period=params["period"],
multiplier=params["multiplier"]
)
for params, result in zip(supertrend_params, results):
supertrend_results_list.append({
"results": supertrend_results,
"results": result,
"params": params
})
return supertrend_results_list
def calculate_supertrend(self, period, multiplier):
"""
Calculate SuperTrend indicator for the price data.
SuperTrend is a trend-following indicator that uses ATR to determine the trend direction.
Parameters:
- period: int, the period for the ATR calculation (default: 10)
- multiplier: float, the multiplier for the ATR (default: 3.0)
Returns:
- Dictionary containing SuperTrend values, trend direction, and upper/lower bands
"""
df = self.data.copy()
high = df['high'].values
low = df['low'].values
close = df['close'].values
# Calculate ATR
atr = self.calculate_atr(period)
# Calculate basic upper and lower bands
upper_band = np.zeros_like(close)
lower_band = np.zeros_like(close)
for i in range(len(close)):
# Calculate the basic bands
hl_avg = (high[i] + low[i]) / 2
upper_band[i] = hl_avg + (multiplier * atr[i])
lower_band[i] = hl_avg - (multiplier * atr[i])
# Calculate final upper and lower bands with trend logic
final_upper = np.zeros_like(close)
final_lower = np.zeros_like(close)
supertrend = np.zeros_like(close)
trend = np.zeros_like(close) # 1 for uptrend, -1 for downtrend
# Initialize first values
final_upper[0] = upper_band[0]
final_lower[0] = lower_band[0]
# If close price is above upper band, we're in a downtrend (ST = upper band)
# If close price is below lower band, we're in an uptrend (ST = lower band)
if close[0] <= upper_band[0]:
supertrend[0] = upper_band[0]
trend[0] = -1 # Downtrend
else:
supertrend[0] = lower_band[0]
trend[0] = 1 # Uptrend
# Calculate SuperTrend for the rest of the data
for i in range(1, len(close)):
# Calculate final upper band
if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]):
final_upper[i] = upper_band[i]
else:
final_upper[i] = final_upper[i-1]
# Calculate final lower band
if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]):
final_lower[i] = lower_band[i]
else:
final_lower[i] = final_lower[i-1]
# Determine trend and SuperTrend value
if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]:
# Continuing downtrend
supertrend[i] = final_upper[i]
trend[i] = -1
elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]:
# Switching to uptrend
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]:
# Continuing uptrend
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]:
# Switching to downtrend
supertrend[i] = final_upper[i]
trend[i] = -1
# Prepare result
supertrend_results = {
'supertrend': supertrend,
'trend': trend,
'upper_band': final_upper,
'lower_band': final_lower
}
return supertrend_results
def plot_trends(self, trend_data, analysis_results, view="both"):
"""
Plot the price data with detected trends using a candlestick chart.
@@ -353,6 +347,9 @@ class TrendDetectorSimple:
Returns:
- None (displays the plot)
"""
if not self.display:
return # Do nothing if display is False
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
@@ -611,6 +608,9 @@ class TrendDetectorSimple:
supertrends = [st["results"]["supertrend"] for st in supertrend_results_list]
params = supertrend_results_list[0]["params"] # Use first config for styling
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), trends_arr[:,0], 0)
for i in range(1, len(x_vals)):
t1, t2, t3 = trends[0][i], trends[1][i], trends[2][i]
if t1 == t2 == t3:
@@ -640,4 +640,152 @@
label=f'ST (P:{period}, M:{multiplier}) Up')
ax.plot([], [], color_down, linewidth=self.line_width,
label=f'ST (P:{period}, M:{multiplier}) Down')
def backtest_meta_supertrend(self, initial_usd=10000, stop_loss_pct=0.05):
"""
Backtest a simple strategy using the meta supertrend (all three supertrends agree).
Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss.
Parameters:
- initial_usd: float, starting USD amount
- stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%)
"""
import pandas as pd
df = self.data.copy().reset_index(drop=True)
df['timestamp'] = pd.to_datetime(df['timestamp'])
if len(df) == 0:
self.logger.warning("No data available for backtest.")
return {
"initial_usd": initial_usd,
"final_usd": initial_usd,
"n_trades": 0,
"win_rate": 0,
"max_drawdown": 0,
"avg_trade": 0,
"trade_log": [],
"first_trade": {},
"last_trade": {},
"trades": [],
}
# Get meta supertrend (all three agree)
supertrend_results_list = self._calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
# Precompute trend-transition signals (informational for now; the trading loop below acts on meta_trend directly)
buy_signals = (meta_trend == 1) & (np.roll(meta_trend, 1) != 1)
sell_signals = (meta_trend == -1) & (np.roll(meta_trend, 1) != -1)
buy_signals[0] = False # Ignore first element due to np.roll
sell_signals[0] = False
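# Illustrative meta_trend values per bar for the three SuperTrend outputs:
#   ( 1,  1,  1) -> 1     (-1, -1, -1) -> -1     ( 1, -1,  1) -> 0
# i.e. the meta trend is directional only when all three detectors agree.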
position = 0 # 0 = no position, 1 = long
entry_price = 0
usd = initial_usd
coin = 0
trade_log = []
max_balance = initial_usd
drawdowns = []
trades = []
for i in range(1, len(df)):
price_open = df['open'].iloc[i]
price_high = df['high'].iloc[i]
price_low = df['low'].iloc[i]
price_close = df['close'].iloc[i]
date = df['timestamp'].iloc[i]
mt = meta_trend[i]
# Check stop loss if in position
if position == 1:
stop_price = entry_price * (1 - stop_loss_pct)
if price_low <= stop_price:
# Stop loss triggered
sell_price = stop_price
usd = coin * sell_price
trade_log.append({'type': 'STOP', 'entry': entry_price, 'exit': sell_price, 'entry_time': entry_time, 'exit_time': date})
coin = 0
position = 0
entry_price = 0
continue
# Entry logic
if position == 0 and mt == 1:
# Buy at open
coin = usd / price_open
entry_price = price_open
entry_time = date
usd = 0
position = 1
# Exit logic
elif position == 1 and mt == -1:
# Sell at open
usd = coin * price_open
trade_log.append({'type': 'SELL', 'entry': entry_price, 'exit': price_open, 'entry_time': entry_time, 'exit_time': date})
coin = 0
position = 0
entry_price = 0
# Track drawdown
balance = usd if position == 0 else coin * price_close
if balance > max_balance:
max_balance = balance
drawdown = (max_balance - balance) / max_balance
drawdowns.append(drawdown)
if i % 1000 == 0 or i == len(df) - 1:
self.logger.debug(f"Progress: {i}/{len(df)} rows processed.")
# If still in position at end, sell at last close
if position == 1:
usd = coin * df['close'].iloc[-1]
trade_log.append({'type': 'EOD', 'entry': entry_price, 'exit': df['close'].iloc[-1], 'entry_time': entry_time, 'exit_time': df['timestamp'].iloc[-1]})
coin = 0
position = 0
entry_price = 0
final_balance = usd
n_trades = len(trade_log)
wins = [1 for t in trade_log if t['exit'] > t['entry']]
win_rate = len(wins) / n_trades if n_trades > 0 else 0
max_drawdown = max(drawdowns) if drawdowns else 0
avg_trade = np.mean([t['exit']/t['entry']-1 for t in trade_log]) if trade_log else 0
trades = []
for trade in trade_log:
profit_pct = (trade['exit'] - trade['entry']) / trade['entry']
trades.append({
'entry_time': trade['entry_time'],
'exit_time': trade['exit_time'],
'entry': trade['entry'],
'exit': trade['exit'],
'profit_pct': profit_pct,
'type': trade.get('type', 'SELL')
})
results = {
"initial_usd": initial_usd,
"final_usd": final_balance,
"n_trades": n_trades,
"win_rate": win_rate,
"max_drawdown": max_drawdown,
"avg_trade": avg_trade,
"trade_log": trade_log,
"trades": trades,
}
if n_trades > 0:
results["first_trade"] = {
"entry_time": trade_log[0]['entry_time'],
"entry": trade_log[0]['entry']
}
results["last_trade"] = {
"exit_time": trade_log[-1]['exit_time'],
"exit": trade_log[-1]['exit']
}
return results
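# Minimal usage sketch (assumes a DataFrame with a 'timestamp' column plus
# open/high/low/close, as prepared in main.py):
#   detector = TrendDetectorSimple(month_df, verbose=False)
#   stats = detector.backtest_meta_supertrend(initial_usd=10000, stop_loss_pct=0.05)
#   print(stats['final_usd'], stats['win_rate'], stats['max_drawdown'])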