Cycles/cycles/supertrend.py

216 lines
8.5 KiB
Python

import pandas as pd
import numpy as np
import logging
from functools import lru_cache
@lru_cache(maxsize=32)
def cached_supertrend_calculation(period, multiplier, data_tuple):
    """
    Compute the SuperTrend indicator, memoising results per argument set.
    All arguments must be hashable because they form the lru_cache key.
    Parameters:
    - period: int, period used for the ATR smoothing factor 2 / (period + 1)
    - multiplier: float, ATR multiplier that sets the band width
    - data_tuple: tuple of three equal-length tuples (high, low, close)
    Returns:
    - Dictionary with float64 arrays 'supertrend', 'trend' (-1 when the
      SuperTrend sits on the upper band, 1 when it sits on the lower band),
      'upper_band' and 'lower_band'. Empty input yields empty arrays.
    Note: the returned dict and arrays are the cached objects themselves, so
    callers must treat them as read-only or copy them before modifying.
    """
    # Force float64: integer inputs would otherwise make every work array
    # integer-typed (via zeros_like) and silently truncate the ATR and bands.
    high = np.asarray(data_tuple[0], dtype=float)
    low = np.asarray(data_tuple[1], dtype=float)
    close = np.asarray(data_tuple[2], dtype=float)

    n = len(close)
    if n == 0:
        # Nothing to compute; avoid indexing element 0 of an empty array.
        return {
            'supertrend': np.zeros(0, dtype=float),
            'trend': np.zeros(0, dtype=float),
            'upper_band': np.zeros(0, dtype=float),
            'lower_band': np.zeros(0, dtype=float)
        }

    # True range: the first bar has no previous close, so use high - low.
    tr = np.zeros_like(close)
    tr[0] = high[0] - low[0]
    hc_range = np.abs(high[1:] - close[:-1])
    lc_range = np.abs(low[1:] - close[:-1])
    hl_range = high[1:] - low[1:]
    tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range])

    # ATR as an exponential moving average of TR, seeded with the first TR.
    atr = np.zeros_like(tr)
    atr[0] = tr[0]
    multiplier_ema = 2.0 / (period + 1)
    for i in range(1, n):
        atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema))

    # Basic bands around the bar midpoint (element-wise, same arithmetic as
    # a per-bar loop).
    hl_avg = (high + low) / 2
    upper_band = hl_avg + (multiplier * atr)
    lower_band = hl_avg - (multiplier * atr)

    final_upper = np.zeros_like(close)
    final_lower = np.zeros_like(close)
    supertrend = np.zeros_like(close)
    trend = np.zeros_like(close)

    final_upper[0] = upper_band[0]
    final_lower[0] = lower_band[0]
    if close[0] <= upper_band[0]:
        supertrend[0] = upper_band[0]
        trend[0] = -1
    else:
        supertrend[0] = lower_band[0]
        trend[0] = 1

    for i in range(1, n):
        prev_upper = final_upper[i-1]
        prev_lower = final_lower[i-1]

        # The upper band may only move down unless the previous close broke
        # above it; the lower band may only move up unless the previous
        # close broke below it.
        if (upper_band[i] < prev_upper) or (close[i-1] > prev_upper):
            final_upper[i] = upper_band[i]
        else:
            final_upper[i] = prev_upper
        if (lower_band[i] > prev_lower) or (close[i-1] < prev_lower):
            final_lower[i] = lower_band[i]
        else:
            final_lower[i] = prev_lower

        # The previous state is recovered by checking which band the previous
        # SuperTrend value was copied from. If no branch matches (e.g. NaN
        # comparisons) the entries keep their initial value of 0.
        was_upper = supertrend[i-1] == prev_upper
        was_lower = supertrend[i-1] == prev_lower
        if was_upper and close[i] <= final_upper[i]:
            supertrend[i] = final_upper[i]
            trend[i] = -1
        elif was_upper and close[i] > final_upper[i]:
            supertrend[i] = final_lower[i]
            trend[i] = 1
        elif was_lower and close[i] >= final_lower[i]:
            supertrend[i] = final_lower[i]
            trend[i] = 1
        elif was_lower and close[i] < final_lower[i]:
            supertrend[i] = final_upper[i]
            trend[i] = -1

    return {
        'supertrend': supertrend,
        'trend': trend,
        'upper_band': final_upper,
        'lower_band': final_lower
    }
def calculate_supertrend_external(data, period, multiplier, close_column='close'):
    """
    Calculate SuperTrend for a data set through the cached module-level
    calculation, with a configurable close column.
    The 'high', 'low' and close columns are converted to tuples so they can be
    passed to cached_supertrend_calculation as hashable arguments.
    Parameters:
    - data: DataFrame with OHLC data
    - period: int, period for ATR calculation
    - multiplier: float, multiplier for ATR
    - close_column: str, name of the column to use as close price (default: 'close')
    Returns:
    - The dictionary returned by cached_supertrend_calculation
    """
    # Column order matters: the cached function expects (high, low, close).
    column_order = ('high', 'low', close_column)
    price_tuples = tuple(tuple(data[name]) for name in column_order)
    return cached_supertrend_calculation(period, multiplier, price_tuples)
class Supertrends:
    """
    SuperTrend indicator calculator over OHLC price data.
    The data is held as a pandas DataFrame with 'high', 'low' and a
    configurable close column; all calculations are done in float64.
    """

    # Parameter sets evaluated by calculate_supertrend_indicators() when the
    # caller does not supply its own.
    DEFAULT_SUPERTREND_PARAMS = (
        {"period": 12, "multiplier": 3.0},
        {"period": 10, "multiplier": 1.0},
        {"period": 11, "multiplier": 2.0},
    )

    def __init__(self, data, close_column='close', verbose=False, display=False):
        """
        Initialize Supertrends calculator
        Parameters:
        - data: pandas DataFrame with OHLC data or list of prices. A list is
          expanded to a DataFrame whose 'high', 'low' and close columns all
          hold the given prices.
        - close_column: str, name of the column to use as close price (default: 'close')
        - verbose: bool, enable verbose logging
        - display: bool, display mode (currently unused)
        Raises:
        - ValueError: if data is neither a DataFrame nor a list, or if a
          required column is missing
        """
        self.close_column = close_column
        self.data = data
        self.verbose = verbose
        # NOTE: basicConfig configures the root logger and only takes effect
        # if the root logger has no handlers yet.
        logging.basicConfig(level=logging.INFO if verbose else logging.WARNING,
                            format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger('TrendDetectorSimple')
        if not isinstance(self.data, pd.DataFrame):
            if isinstance(self.data, list):
                # A bare price list carries no separate high/low, so reuse the
                # prices for both; otherwise the column validation below would
                # reject every list input.
                prices = self.data
                self.data = pd.DataFrame({
                    'high': prices,
                    'low': prices,
                    self.close_column: prices,
                })
            else:
                raise ValueError("Data must be a pandas DataFrame or a list")
        # Validate that required columns exist
        required_columns = ['high', 'low', self.close_column]
        missing_columns = [col for col in required_columns if col not in self.data.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")

    def _price_arrays(self):
        """Return the (high, low, close) columns as float64 NumPy arrays."""
        # Converting to float prevents integer-typed columns from producing
        # integer work arrays that would truncate TR/ATR/band values.
        high = self.data['high'].to_numpy(dtype=float)
        low = self.data['low'].to_numpy(dtype=float)
        close = self.data[self.close_column].to_numpy(dtype=float)
        return high, low, close

    def calculate_tr(self):
        """
        Calculate True Range using the configured close column
        Returns:
        - float64 array; the first element is high - low, later elements are
          the maximum of high - low, |high - previous close| and
          |low - previous close|. Empty data yields an empty array.
        """
        high, low, close = self._price_arrays()
        tr = np.zeros_like(close)
        if len(close) == 0:
            return tr
        tr[0] = high[0] - low[0]
        for i in range(1, len(close)):
            prev_close = close[i-1]
            # Built-in max() is kept on purpose so results for NaN inputs
            # stay exactly what they were before.
            tr[i] = max(high[i] - low[i],
                        abs(high[i] - prev_close),
                        abs(low[i] - prev_close))
        return tr

    def calculate_atr(self, period=14):
        """
        Calculate Average True Range
        The ATR is an exponential moving average of the True Range with
        smoothing factor 2 / (period + 1), seeded with the first TR value.
        Parameters:
        - period: int, smoothing period (default: 14)
        Returns:
        - float64 array of ATR values (empty for empty data)
        """
        tr = self.calculate_tr()
        atr = np.zeros_like(tr)
        if len(tr) == 0:
            return atr
        atr[0] = tr[0]
        smoothing = 2.0 / (period + 1)
        for i in range(1, len(tr)):
            atr[i] = (tr[i] * smoothing) + (atr[i-1] * (1 - smoothing))
        return atr

    def calculate_supertrend(self, period=10, multiplier=3.0):
        """
        Calculate SuperTrend indicator for the price data using the configured close column.
        SuperTrend is a trend-following indicator that uses ATR to determine the trend direction.
        Parameters:
        - period: int, the period for the ATR calculation (default: 10)
        - multiplier: float, the multiplier for the ATR (default: 3.0)
        Returns:
        - Dictionary containing SuperTrend values ('supertrend'), trend
          direction ('trend': -1 on the upper band, 1 on the lower band), and
          the final 'upper_band' / 'lower_band' arrays. Empty data yields
          empty arrays.
        """
        high, low, close = self._price_arrays()
        atr = self.calculate_atr(period)

        final_upper = np.zeros_like(close)
        final_lower = np.zeros_like(close)
        supertrend = np.zeros_like(close)
        trend = np.zeros_like(close)
        supertrend_results = {
            'supertrend': supertrend,
            'trend': trend,
            'upper_band': final_upper,
            'lower_band': final_lower
        }
        if len(close) == 0:
            return supertrend_results

        # Basic bands around the bar midpoint.
        hl_avg = (high + low) / 2
        upper_band = hl_avg + (multiplier * atr)
        lower_band = hl_avg - (multiplier * atr)

        final_upper[0] = upper_band[0]
        final_lower[0] = lower_band[0]
        if close[0] <= upper_band[0]:
            supertrend[0] = upper_band[0]
            trend[0] = -1
        else:
            supertrend[0] = lower_band[0]
            trend[0] = 1

        for i in range(1, len(close)):
            prev_upper = final_upper[i-1]
            prev_lower = final_lower[i-1]

            # Upper band only ratchets down unless the previous close broke
            # above it; lower band only ratchets up unless the previous close
            # broke below it.
            if (upper_band[i] < prev_upper) or (close[i-1] > prev_upper):
                final_upper[i] = upper_band[i]
            else:
                final_upper[i] = prev_upper
            if (lower_band[i] > prev_lower) or (close[i-1] < prev_lower):
                final_lower[i] = lower_band[i]
            else:
                final_lower[i] = prev_lower

            # The previous state is recovered from which band the previous
            # SuperTrend value equals. If no branch matches (e.g. NaN
            # comparisons) the entries keep their initial value of 0.
            was_upper = supertrend[i-1] == prev_upper
            was_lower = supertrend[i-1] == prev_lower
            if was_upper and close[i] <= final_upper[i]:
                supertrend[i] = final_upper[i]
                trend[i] = -1
            elif was_upper and close[i] > final_upper[i]:
                supertrend[i] = final_lower[i]
                trend[i] = 1
            elif was_lower and close[i] >= final_lower[i]:
                supertrend[i] = final_lower[i]
                trend[i] = 1
            elif was_lower and close[i] < final_lower[i]:
                supertrend[i] = final_upper[i]
                trend[i] = -1

        return supertrend_results

    def calculate_supertrend_indicators(self, supertrend_params=None):
        """
        Calculate SuperTrend for several parameter sets.
        Parameters:
        - supertrend_params: optional iterable of dicts with 'period' and
          'multiplier' keys; defaults to DEFAULT_SUPERTREND_PARAMS
        Returns:
        - List of dicts, one per parameter set, each with 'results' (the
          calculate_supertrend output) and 'params' (a copy of the set used)
        """
        if supertrend_params is None:
            supertrend_params = self.DEFAULT_SUPERTREND_PARAMS
        results = []
        for p in supertrend_params:
            # Copy so callers mutating the returned params cannot alter the
            # shared defaults or the caller-supplied dicts.
            params = dict(p)
            result = self.calculate_supertrend(period=params["period"],
                                               multiplier=params["multiplier"])
            results.append({
                "results": result,
                "params": params
            })
        return results