186 lines
7.2 KiB
Python
186 lines
7.2 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
import logging
|
|
from functools import lru_cache
|
|
|
|
@lru_cache(maxsize=32)
|
|
def cached_supertrend_calculation(period, multiplier, data_tuple):
|
|
high = np.array(data_tuple[0])
|
|
low = np.array(data_tuple[1])
|
|
close = np.array(data_tuple[2])
|
|
tr = np.zeros_like(close)
|
|
tr[0] = high[0] - low[0]
|
|
hc_range = np.abs(high[1:] - close[:-1])
|
|
lc_range = np.abs(low[1:] - close[:-1])
|
|
hl_range = high[1:] - low[1:]
|
|
tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range])
|
|
atr = np.zeros_like(tr)
|
|
atr[0] = tr[0]
|
|
multiplier_ema = 2.0 / (period + 1)
|
|
for i in range(1, len(tr)):
|
|
atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema))
|
|
upper_band = np.zeros_like(close)
|
|
lower_band = np.zeros_like(close)
|
|
for i in range(len(close)):
|
|
hl_avg = (high[i] + low[i]) / 2
|
|
upper_band[i] = hl_avg + (multiplier * atr[i])
|
|
lower_band[i] = hl_avg - (multiplier * atr[i])
|
|
final_upper = np.zeros_like(close)
|
|
final_lower = np.zeros_like(close)
|
|
supertrend = np.zeros_like(close)
|
|
trend = np.zeros_like(close)
|
|
final_upper[0] = upper_band[0]
|
|
final_lower[0] = lower_band[0]
|
|
if close[0] <= upper_band[0]:
|
|
supertrend[0] = upper_band[0]
|
|
trend[0] = -1
|
|
else:
|
|
supertrend[0] = lower_band[0]
|
|
trend[0] = 1
|
|
for i in range(1, len(close)):
|
|
if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]):
|
|
final_upper[i] = upper_band[i]
|
|
else:
|
|
final_upper[i] = final_upper[i-1]
|
|
if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]):
|
|
final_lower[i] = lower_band[i]
|
|
else:
|
|
final_lower[i] = final_lower[i-1]
|
|
if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]:
|
|
supertrend[i] = final_upper[i]
|
|
trend[i] = -1
|
|
elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]:
|
|
supertrend[i] = final_lower[i]
|
|
trend[i] = 1
|
|
elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]:
|
|
supertrend[i] = final_lower[i]
|
|
trend[i] = 1
|
|
elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]:
|
|
supertrend[i] = final_upper[i]
|
|
trend[i] = -1
|
|
return {
|
|
'supertrend': supertrend,
|
|
'trend': trend,
|
|
'upper_band': final_upper,
|
|
'lower_band': final_lower
|
|
}
|
|
|
|
def calculate_supertrend_external(data, period, multiplier):
|
|
high_tuple = tuple(data['high'])
|
|
low_tuple = tuple(data['low'])
|
|
close_tuple = tuple(data['close'])
|
|
return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple))
|
|
|
|
class Supertrends:
|
|
def __init__(self, data, verbose=False, display=False):
|
|
self.data = data
|
|
self.verbose = verbose
|
|
logging.basicConfig(level=logging.INFO if verbose else logging.WARNING,
|
|
format='%(asctime)s - %(levelname)s - %(message)s')
|
|
self.logger = logging.getLogger('TrendDetectorSimple')
|
|
if not isinstance(self.data, pd.DataFrame):
|
|
if isinstance(self.data, list):
|
|
self.data = pd.DataFrame({'close': self.data})
|
|
else:
|
|
raise ValueError("Data must be a pandas DataFrame or a list")
|
|
|
|
def calculate_tr(self):
|
|
df = self.data.copy()
|
|
high = df['high'].values
|
|
low = df['low'].values
|
|
close = df['close'].values
|
|
tr = np.zeros_like(close)
|
|
tr[0] = high[0] - low[0]
|
|
for i in range(1, len(close)):
|
|
hl_range = high[i] - low[i]
|
|
hc_range = abs(high[i] - close[i-1])
|
|
lc_range = abs(low[i] - close[i-1])
|
|
tr[i] = max(hl_range, hc_range, lc_range)
|
|
return tr
|
|
|
|
def calculate_atr(self, period=14):
|
|
tr = self.calculate_tr()
|
|
atr = np.zeros_like(tr)
|
|
atr[0] = tr[0]
|
|
multiplier = 2.0 / (period + 1)
|
|
for i in range(1, len(tr)):
|
|
atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier))
|
|
return atr
|
|
|
|
def calculate_supertrend(self, period=10, multiplier=3.0):
|
|
"""
|
|
Calculate SuperTrend indicator for the price data.
|
|
SuperTrend is a trend-following indicator that uses ATR to determine the trend direction.
|
|
Parameters:
|
|
- period: int, the period for the ATR calculation (default: 10)
|
|
- multiplier: float, the multiplier for the ATR (default: 3.0)
|
|
Returns:
|
|
- Dictionary containing SuperTrend values, trend direction, and upper/lower bands
|
|
"""
|
|
df = self.data.copy()
|
|
high = df['high'].values
|
|
low = df['low'].values
|
|
close = df['close'].values
|
|
atr = self.calculate_atr(period)
|
|
upper_band = np.zeros_like(close)
|
|
lower_band = np.zeros_like(close)
|
|
for i in range(len(close)):
|
|
hl_avg = (high[i] + low[i]) / 2
|
|
upper_band[i] = hl_avg + (multiplier * atr[i])
|
|
lower_band[i] = hl_avg - (multiplier * atr[i])
|
|
final_upper = np.zeros_like(close)
|
|
final_lower = np.zeros_like(close)
|
|
supertrend = np.zeros_like(close)
|
|
trend = np.zeros_like(close)
|
|
final_upper[0] = upper_band[0]
|
|
final_lower[0] = lower_band[0]
|
|
if close[0] <= upper_band[0]:
|
|
supertrend[0] = upper_band[0]
|
|
trend[0] = -1
|
|
else:
|
|
supertrend[0] = lower_band[0]
|
|
trend[0] = 1
|
|
for i in range(1, len(close)):
|
|
if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]):
|
|
final_upper[i] = upper_band[i]
|
|
else:
|
|
final_upper[i] = final_upper[i-1]
|
|
if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]):
|
|
final_lower[i] = lower_band[i]
|
|
else:
|
|
final_lower[i] = final_lower[i-1]
|
|
if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]:
|
|
supertrend[i] = final_upper[i]
|
|
trend[i] = -1
|
|
elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]:
|
|
supertrend[i] = final_lower[i]
|
|
trend[i] = 1
|
|
elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]:
|
|
supertrend[i] = final_lower[i]
|
|
trend[i] = 1
|
|
elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]:
|
|
supertrend[i] = final_upper[i]
|
|
trend[i] = -1
|
|
supertrend_results = {
|
|
'supertrend': supertrend,
|
|
'trend': trend,
|
|
'upper_band': final_upper,
|
|
'lower_band': final_lower
|
|
}
|
|
return supertrend_results
|
|
|
|
def calculate_supertrend_indicators(self):
|
|
supertrend_params = [
|
|
{"period": 12, "multiplier": 3.0},
|
|
{"period": 10, "multiplier": 1.0},
|
|
{"period": 11, "multiplier": 2.0}
|
|
]
|
|
results = []
|
|
for p in supertrend_params:
|
|
result = self.calculate_supertrend(period=p["period"], multiplier=p["multiplier"])
|
|
results.append({
|
|
"results": result,
|
|
"params": p
|
|
})
|
|
return results
|