Compare commits

..

No commits in common. "5f03524d6ac28178fbcee9e240a4e52b58d8cffb" and "2fd73085b81ca3f657e1ff363e39b39459b74db1" have entirely different histories.

3 changed files with 232 additions and 117 deletions

View File

@ -27,9 +27,6 @@ class Backtest:
trends_arr = np.stack(trends, axis=1) trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0) trends_arr[:,0], 0)
# Shift meta_trend by one to avoid lookahead bias
meta_trend_signal = np.roll(meta_trend, 1)
meta_trend_signal[0] = 0 # or np.nan, but 0 means 'no signal' for first bar
position = 0 # 0 = no position, 1 = long position = 0 # 0 = no position, 1 = long
entry_price = 0 entry_price = 0
@ -48,8 +45,8 @@ class Backtest:
price_open = _df['open'].iloc[i] price_open = _df['open'].iloc[i]
price_close = _df['close'].iloc[i] price_close = _df['close'].iloc[i]
date = _df['timestamp'].iloc[i] date = _df['timestamp'].iloc[i]
prev_mt = meta_trend_signal[i-1] prev_mt = meta_trend[i-1]
curr_mt = meta_trend_signal[i] curr_mt = meta_trend[i]
# Check stop loss if in position # Check stop loss if in position
if position == 1: if position == 1:

View File

@ -1,30 +1,70 @@
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import logging import logging
from scipy.signal import find_peaks
from matplotlib.patches import Rectangle
from scipy import stats
import concurrent.futures
from functools import partial
from functools import lru_cache from functools import lru_cache
import matplotlib.pyplot as plt
# Color configuration
# Plot colors
DARK_BG_COLOR = '#181C27'
LEGEND_BG_COLOR = '#333333'
TITLE_COLOR = 'white'
AXIS_LABEL_COLOR = 'white'
# Candlestick colors
CANDLE_UP_COLOR = '#089981' # Green
CANDLE_DOWN_COLOR = '#F23645' # Red
# Marker colors
MIN_COLOR = 'red'
MAX_COLOR = 'green'
# Line style colors
MIN_LINE_STYLE = 'g--' # Green dashed
MAX_LINE_STYLE = 'r--' # Red dashed
SMA7_LINE_STYLE = 'y-' # Yellow solid
SMA15_LINE_STYLE = 'm-' # Magenta solid
# SuperTrend colors
ST_COLOR_UP = 'g-'
ST_COLOR_DOWN = 'r-'
# Cache the calculation results by function parameters
@lru_cache(maxsize=32) @lru_cache(maxsize=32)
def cached_supertrend_calculation(period, multiplier, data_tuple): def cached_supertrend_calculation(period, multiplier, data_tuple):
# Convert tuple back to numpy arrays
high = np.array(data_tuple[0]) high = np.array(data_tuple[0])
low = np.array(data_tuple[1]) low = np.array(data_tuple[1])
close = np.array(data_tuple[2]) close = np.array(data_tuple[2])
# Calculate TR and ATR using vectorized operations
tr = np.zeros_like(close) tr = np.zeros_like(close)
tr[0] = high[0] - low[0] tr[0] = high[0] - low[0]
hc_range = np.abs(high[1:] - close[:-1]) hc_range = np.abs(high[1:] - close[:-1])
lc_range = np.abs(low[1:] - close[:-1]) lc_range = np.abs(low[1:] - close[:-1])
hl_range = high[1:] - low[1:] hl_range = high[1:] - low[1:]
tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range]) tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range])
# Use numpy's exponential moving average
atr = np.zeros_like(tr) atr = np.zeros_like(tr)
atr[0] = tr[0] atr[0] = tr[0]
multiplier_ema = 2.0 / (period + 1) multiplier_ema = 2.0 / (period + 1)
for i in range(1, len(tr)): for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema)) atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema))
# Calculate bands
upper_band = np.zeros_like(close) upper_band = np.zeros_like(close)
lower_band = np.zeros_like(close) lower_band = np.zeros_like(close)
for i in range(len(close)): for i in range(len(close)):
hl_avg = (high[i] + low[i]) / 2 hl_avg = (high[i] + low[i]) / 2
upper_band[i] = hl_avg + (multiplier * atr[i]) upper_band[i] = hl_avg + (multiplier * atr[i])
lower_band[i] = hl_avg - (multiplier * atr[i]) lower_band[i] = hl_avg - (multiplier * atr[i])
final_upper = np.zeros_like(close) final_upper = np.zeros_like(close)
final_lower = np.zeros_like(close) final_lower = np.zeros_like(close)
supertrend = np.zeros_like(close) supertrend = np.zeros_like(close)
@ -66,18 +106,76 @@ def cached_supertrend_calculation(period, multiplier, data_tuple):
} }
def calculate_supertrend_external(data, period, multiplier): def calculate_supertrend_external(data, period, multiplier):
# Convert DataFrame columns to hashable tuples
high_tuple = tuple(data['high']) high_tuple = tuple(data['high'])
low_tuple = tuple(data['low']) low_tuple = tuple(data['low'])
close_tuple = tuple(data['close']) close_tuple = tuple(data['close'])
# Call the cached function
return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple)) return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple))
class Supertrends: class Supertrends:
def __init__(self, data, verbose=False, display=False): def __init__(self, data, verbose=False, display=False):
"""
Initialize the TrendDetectorSimple class.
Parameters:
- data: pandas DataFrame containing price data
- verbose: boolean, whether to display detailed logging information
- display: boolean, whether to enable display/plotting features
"""
self.data = data self.data = data
self.verbose = verbose self.verbose = verbose
self.display = display
# Only define display-related variables if display is True
if self.display:
# Plot style configuration
self.plot_style = 'dark_background'
self.bg_color = DARK_BG_COLOR
self.plot_size = (12, 8)
# Candlestick configuration
self.candle_width = 0.6
self.candle_up_color = CANDLE_UP_COLOR
self.candle_down_color = CANDLE_DOWN_COLOR
self.candle_alpha = 0.8
self.wick_width = 1
# Marker configuration
self.min_marker = '^'
self.min_color = MIN_COLOR
self.min_size = 100
self.max_marker = 'v'
self.max_color = MAX_COLOR
self.max_size = 100
self.marker_zorder = 100
# Line configuration
self.line_width = 1
self.min_line_style = MIN_LINE_STYLE
self.max_line_style = MAX_LINE_STYLE
self.sma7_line_style = SMA7_LINE_STYLE
self.sma15_line_style = SMA15_LINE_STYLE
# Text configuration
self.title_size = 14
self.title_color = TITLE_COLOR
self.axis_label_size = 12
self.axis_label_color = AXIS_LABEL_COLOR
# Legend configuration
self.legend_loc = 'best'
self.legend_bg_color = LEGEND_BG_COLOR
# Configure logging
logging.basicConfig(level=logging.INFO if verbose else logging.WARNING, logging.basicConfig(level=logging.INFO if verbose else logging.WARNING,
format='%(asctime)s - %(levelname)s - %(message)s') format='%(asctime)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger('TrendDetectorSimple') self.logger = logging.getLogger('TrendDetectorSimple')
# Convert data to pandas DataFrame if it's not already
if not isinstance(self.data, pd.DataFrame): if not isinstance(self.data, pd.DataFrame):
if isinstance(self.data, list): if isinstance(self.data, list):
self.data = pd.DataFrame({'close': self.data}) self.data = pd.DataFrame({'close': self.data})
@ -85,101 +183,154 @@ class Supertrends:
raise ValueError("Data must be a pandas DataFrame or a list") raise ValueError("Data must be a pandas DataFrame or a list")
def calculate_tr(self): def calculate_tr(self):
"""
Calculate True Range (TR) for the price data.
True Range is the greatest of:
1. Current high - current low
2. |Current high - previous close|
3. |Current low - previous close|
Returns:
- Numpy array of TR values
"""
df = self.data.copy() df = self.data.copy()
high = df['high'].values high = df['high'].values
low = df['low'].values low = df['low'].values
close = df['close'].values close = df['close'].values
tr = np.zeros_like(close) tr = np.zeros_like(close)
tr[0] = high[0] - low[0] tr[0] = high[0] - low[0] # First TR is just the first day's range
for i in range(1, len(close)): for i in range(1, len(close)):
# Current high - current low
hl_range = high[i] - low[i] hl_range = high[i] - low[i]
# |Current high - previous close|
hc_range = abs(high[i] - close[i-1]) hc_range = abs(high[i] - close[i-1])
# |Current low - previous close|
lc_range = abs(low[i] - close[i-1]) lc_range = abs(low[i] - close[i-1])
# TR is the maximum of these three values
tr[i] = max(hl_range, hc_range, lc_range) tr[i] = max(hl_range, hc_range, lc_range)
return tr return tr
def calculate_atr(self, period=14): def calculate_atr(self, period=14):
"""
Calculate Average True Range (ATR) for the price data.
ATR is the exponential moving average of the True Range over a specified period.
Parameters:
- period: int, the period for the ATR calculation (default: 14)
Returns:
- Numpy array of ATR values
"""
tr = self.calculate_tr() tr = self.calculate_tr()
atr = np.zeros_like(tr) atr = np.zeros_like(tr)
# First ATR value is just the first TR
atr[0] = tr[0] atr[0] = tr[0]
# Calculate exponential moving average (EMA) of TR
multiplier = 2.0 / (period + 1) multiplier = 2.0 / (period + 1)
for i in range(1, len(tr)): for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier)) atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier))
return atr return atr
def calculate_supertrend(self, period=10, multiplier=3.0): def detect_trends(self):
""" """
Calculate SuperTrend indicator for the price data. Detect trends by identifying local minima and maxima in the price data
SuperTrend is a trend-following indicator that uses ATR to determine the trend direction. using scipy.signal.find_peaks.
Parameters: Parameters:
- period: int, the period for the ATR calculation (default: 10) - prominence: float, required prominence of peaks (relative to the price range)
- multiplier: float, the multiplier for the ATR (default: 3.0) - width: int, required width of peaks in data points
Returns: Returns:
- Dictionary containing SuperTrend values, trend direction, and upper/lower bands - DataFrame with columns for timestamps, prices, and trend indicators
- Dictionary containing analysis results including linear regression, SMAs, and SuperTrend indicators
""" """
df = self.data.copy() df = self.data
high = df['high'].values # close_prices = df['close'].values
low = df['low'].values
close = df['close'].values # max_peaks, _ = find_peaks(close_prices)
atr = self.calculate_atr(period) # min_peaks, _ = find_peaks(-close_prices)
upper_band = np.zeros_like(close)
lower_band = np.zeros_like(close) # df['is_min'] = False
for i in range(len(close)): # df['is_max'] = False
hl_avg = (high[i] + low[i]) / 2
upper_band[i] = hl_avg + (multiplier * atr[i]) # for peak in max_peaks:
lower_band[i] = hl_avg - (multiplier * atr[i]) # df.at[peak, 'is_max'] = True
final_upper = np.zeros_like(close) # for peak in min_peaks:
final_lower = np.zeros_like(close) # df.at[peak, 'is_min'] = True
supertrend = np.zeros_like(close)
trend = np.zeros_like(close) # result = df[['timestamp', 'close', 'is_min', 'is_max']].copy()
final_upper[0] = upper_band[0]
final_lower[0] = lower_band[0] # Perform linear regression on min_peaks and max_peaks
if close[0] <= upper_band[0]: # min_prices = df['close'].iloc[min_peaks].values
supertrend[0] = upper_band[0] # max_prices = df['close'].iloc[max_peaks].values
trend[0] = -1
else: # Linear regression for min peaks if we have at least 2 points
supertrend[0] = lower_band[0] # min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices)
trend[0] = 1 # Linear regression for max peaks if we have at least 2 points
for i in range(1, len(close)): # max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices)
if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]):
final_upper[i] = upper_band[i]
else:
final_upper[i] = final_upper[i-1]
if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]):
final_lower[i] = lower_band[i]
else:
final_lower[i] = final_lower[i-1]
if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
supertrend_results = {
'supertrend': supertrend,
'trend': trend,
'upper_band': final_upper,
'lower_band': final_lower
}
return supertrend_results
# Calculate Simple Moving Averages (SMA) for 7 and 15 periods
# sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values
# sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values
analysis_results = {}
# analysis_results['linear_regression'] = {
# 'min': {
# 'slope': min_slope,
# 'intercept': min_intercept,
# 'r_squared': min_r_value ** 2
# },
# 'max': {
# 'slope': max_slope,
# 'intercept': max_intercept,
# 'r_squared': max_r_value ** 2
# }
# }
# analysis_results['sma'] = {
# '7': sma_7,
# '15': sma_15
# }
# Calculate SuperTrend indicators
supertrend_results_list = self._calculate_supertrend_indicators()
analysis_results['supertrend'] = supertrend_results_list
return analysis_results
def calculate_supertrend_indicators(self): def calculate_supertrend_indicators(self):
"""
Calculate SuperTrend indicators with different parameter sets in parallel.
Returns:
- list, the SuperTrend results
"""
supertrend_params = [ supertrend_params = [
{"period": 12, "multiplier": 3.0}, {"period": 12, "multiplier": 3.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
{"period": 10, "multiplier": 1.0}, {"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
{"period": 11, "multiplier": 2.0} {"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}
] ]
data = self.data.copy()
# For just 3 calculations, direct calculation might be faster than process pool
results = [] results = []
for p in supertrend_params: for p in supertrend_params:
result = self.calculate_supertrend(period=p["period"], multiplier=p["multiplier"]) result = calculate_supertrend_external(data, p["period"], p["multiplier"])
results.append({ results.append(result)
supertrend_results_list = []
for params, result in zip(supertrend_params, results):
supertrend_results_list.append({
"results": result, "results": result,
"params": p "params": params
}) })
return results return supertrend_results_list

57
main.py
View File

@ -6,6 +6,7 @@ import os
import datetime import datetime
import argparse import argparse
import json import json
import ast
from cycles.utils.storage import Storage from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils from cycles.utils.system import SystemUtils
@ -47,7 +48,6 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
cumulative_profit = 0 cumulative_profit = 0
max_drawdown = 0 max_drawdown = 0
peak = 0 peak = 0
for trade in trades: for trade in trades:
cumulative_profit += trade['profit_pct'] cumulative_profit += trade['profit_pct']
if cumulative_profit > peak: if cumulative_profit > peak:
@ -55,14 +55,10 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
drawdown = peak - cumulative_profit drawdown = peak - cumulative_profit
if drawdown > max_drawdown: if drawdown > max_drawdown:
max_drawdown = drawdown max_drawdown = drawdown
final_usd = initial_usd final_usd = initial_usd
for trade in trades: for trade in trades:
final_usd *= (1 + trade['profit_pct']) final_usd *= (1 + trade['profit_pct'])
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
total_fees_usd = sum(trade['fee_usd'] for trade in trades)
row = { row = {
"timeframe": rule_name, "timeframe": rule_name,
"stop_loss_pct": stop_loss_pct, "stop_loss_pct": stop_loss_pct,
@ -79,7 +75,6 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"total_fees_usd": total_fees_usd, "total_fees_usd": total_fees_usd,
} }
results_rows.append(row) results_rows.append(row)
for trade in trades: for trade in trades:
trade_rows.append({ trade_rows.append({
"timeframe": rule_name, "timeframe": rule_name,
@ -92,9 +87,7 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"type": trade.get("type"), "type": trade.get("type"),
"fee_usd": trade.get("fee_usd"), "fee_usd": trade.get("fee_usd"),
}) })
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}") logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
if debug: if debug:
for trade in trades: for trade in trades:
if trade['type'] == 'STOP': if trade['type'] == 'STOP':
@ -102,16 +95,13 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
for trade in trades: for trade in trades:
if trade['profit_pct'] < -0.09: # or whatever is close to -0.10 if trade['profit_pct'] < -0.09: # or whatever is close to -0.10
print("Large loss trade:", trade) print("Large loss trade:", trade)
return results_rows, trade_rows return results_rows, trade_rows
def process(timeframe_info, debug=False): def process(timeframe_info, debug=False):
from cycles.utils.storage import Storage # import inside function for safety """Process a single (timeframe, stop_loss_pct) combination (no monthly split)"""
storage = Storage(logging=None) # or pass a logger if you want, but None is safest for multiprocessing
rule, data_1min, stop_loss_pct, initial_usd = timeframe_info rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
if rule == "1T" or rule == "1min": if rule == "1T":
df = data_1min.copy() df = data_1min.copy()
else: else:
df = data_1min.resample(rule).agg({ df = data_1min.resample(rule).agg({
@ -122,33 +112,7 @@ def process(timeframe_info, debug=False):
'volume': 'sum' 'volume': 'sum'
}).dropna() }).dropna()
df = df.reset_index() df = df.reset_index()
results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug) results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug)
if all_trade_rows:
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
# Prepare header
summary_fields = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "final_usd"]
summary_row = results_rows[0]
header_line = "\t".join(summary_fields) + "\n"
value_line = "\t".join(str(summary_row.get(f, "")) for f in summary_fields) + "\n"
# File name
tf = summary_row["timeframe"]
sl = summary_row["stop_loss_pct"]
sl_percent = int(round(sl * 100))
trades_filename = os.path.join(storage.results_dir, f"trades_{tf}_ST{sl_percent}pct.csv")
# Write header
with open(trades_filename, "w") as f:
f.write(header_line)
f.write(value_line)
# Now write trades (append mode, skip header)
with open(trades_filename, "a", newline="") as f:
import csv
writer = csv.DictWriter(f, fieldnames=trades_fieldnames)
writer.writeheader()
for trade in all_trade_rows:
writer.writerow({k: trade.get(k, "") for k in trades_fieldnames})
return results_rows, all_trade_rows return results_rows, all_trade_rows
def aggregate_results(all_rows): def aggregate_results(all_rows):
@ -162,6 +126,7 @@ def aggregate_results(all_rows):
summary_rows = [] summary_rows = []
for (rule, stop_loss_pct), rows in grouped.items(): for (rule, stop_loss_pct), rows in grouped.items():
n_months = len(rows)
total_trades = sum(r['n_trades'] for r in rows) total_trades = sum(r['n_trades'] for r in rows)
total_stop_loss = sum(r['n_stop_loss'] for r in rows) total_stop_loss = sum(r['n_stop_loss'] for r in rows)
avg_win_rate = np.mean([r['win_rate'] for r in rows]) avg_win_rate = np.mean([r['win_rate'] for r in rows])
@ -198,7 +163,7 @@ def get_nearest_price(df, target_date):
return nearest_time, price return nearest_time, price
if __name__ == "__main__": if __name__ == "__main__":
debug = False debug = True
parser = argparse.ArgumentParser(description="Run backtest with config file.") parser = argparse.ArgumentParser(description="Run backtest with config file.")
parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.") parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
@ -206,11 +171,11 @@ if __name__ == "__main__":
# Default values (from config.json) # Default values (from config.json)
default_config = { default_config = {
"start_date": "2025-05-01", "start_date": "2024-05-15",
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'), "stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
"initial_usd": 10000, "initial_usd": 10000,
"timeframes": ["1D", "6h", "3h", "1h", "30m", "15m", "5m", "1m"], "timeframes": ["1D"],
"stop_loss_pcts": [0.01, 0.02, 0.03, 0.05], "stop_loss_pcts": [0.01, 0.02, 0.03],
} }
if args.config: if args.config:
@ -273,7 +238,6 @@ if __name__ == "__main__":
if debug: if debug:
all_results_rows = [] all_results_rows = []
all_trade_rows = [] all_trade_rows = []
for task in tasks: for task in tasks:
results, trades = process(task, debug) results, trades = process(task, debug)
if results or trades: if results or trades:
@ -299,4 +263,7 @@ if __name__ == "__main__":
] ]
storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines) storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
storage.write_trades(all_trade_rows, trades_fieldnames)