OKXTrading/technicalanalyzer.py
2025-11-05 19:39:02 +08:00

610 lines
25 KiB
Python

import logging
import numpy as np
logger = logging.getLogger(__name__)
class TechnicalAnalyzer:
"""Technical Analyzer"""
@staticmethod
def calculate_indicators(df):
"""Calculate technical indicators (optimized version)"""
if df is None or len(df) < 20: # Lower minimum data requirement
logger.warning("Insufficient data, unable to calculate technical indicators")
return {}
try:
indicators = {}
# KDJ indicator
k, d, j = TechnicalAnalyzer.calculate_kdj(df)
if k is not None:
indicators['kdj'] = (k, d, j)
# RSI indicator (multiple time periods)
rsi_14 = TechnicalAnalyzer.calculate_rsi(df['close'], 14)
rsi_7 = TechnicalAnalyzer.calculate_rsi(df['close'], 7)
rsi_21 = TechnicalAnalyzer.calculate_rsi(df['close'], 21)
if rsi_14 is not None:
indicators['rsi'] = rsi_14
indicators['rsi_7'] = rsi_7
indicators['rsi_21'] = rsi_21
# ATR indicator
atr = TechnicalAnalyzer.calculate_atr(df)
if atr is not None:
indicators['atr'] = atr
# MACD indicator
macd_line, signal_line, histogram = TechnicalAnalyzer.calculate_macd(df['close'])
if macd_line is not None:
indicators['macd'] = (macd_line, signal_line, histogram)
# Bollinger Bands (multiple standard deviations)
upper1, middle1, lower1 = TechnicalAnalyzer.calculate_bollinger_bands(df['close'], 20, 1)
upper2, middle2, lower2 = TechnicalAnalyzer.calculate_bollinger_bands(df['close'], 20, 2)
if upper1 is not None:
indicators['bollinger_1std'] = (upper1, middle1, lower1)
indicators['bollinger_2std'] = (upper2, middle2, lower2)
# Moving Averages (multiple periods)
sma_20 = TechnicalAnalyzer.calculate_sma(df['close'], 20)
sma_50 = TechnicalAnalyzer.calculate_sma(df['close'], 50)
sma_100 = TechnicalAnalyzer.calculate_sma(df['close'], 100)
ema_12 = TechnicalAnalyzer.calculate_ema(df['close'], 12)
ema_26 = TechnicalAnalyzer.calculate_ema(df['close'], 26)
if sma_20 is not None:
indicators['sma_20'] = sma_20
indicators['sma_50'] = sma_50
indicators['sma_100'] = sma_100
indicators['ema_12'] = ema_12
indicators['ema_26'] = ema_26
# Stochastic Oscillator
k, d = TechnicalAnalyzer.calculate_stochastic(df)
if k is not None:
indicators['stochastic'] = (k, d)
# Trend strength
indicators['trend_strength'] = TechnicalAnalyzer.calculate_trend_strength(df)
# Volatility
indicators['volatility'] = TechnicalAnalyzer.calculate_volatility(df)
# Current price
indicators['current_price'] = df['close'].iloc[-1]
return indicators
except Exception as e:
logger.error(f"Error calculating technical indicators: {e}")
return {}
@staticmethod
def calculate_ema(prices, period):
"""Calculate Exponential Moving Average"""
try:
return prices.ewm(span=period, adjust=False).mean()
except Exception as e:
logger.error(f"Error calculating EMA: {e}")
return None
@staticmethod
def calculate_trend_strength(df, period=20):
"""Calculate trend strength"""
try:
if len(df) < period:
return 0
# Use linear regression to calculate trend strength
x = np.arange(len(df))
y = df['close'].values
# Linear regression
slope, intercept = np.polyfit(x[-period:], y[-period:], 1)
# Calculate R² value as trend strength
y_pred = slope * x[-period:] + intercept
ss_res = np.sum((y[-period:] - y_pred) ** 2)
ss_tot = np.sum((y[-period:] - np.mean(y[-period:])) ** 2)
r_squared = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0
return abs(r_squared) # Take absolute value, trend strength doesn't distinguish positive/negative
except Exception as e:
logger.error(f"Error calculating trend strength: {e}")
return 0
@staticmethod
def detect_rsi_divergence(df, rsi_period=14):
"""Detect RSI bullish divergence"""
try:
if len(df) < 30: # Need sufficient data
return False
# Calculate RSI
rsi = TechnicalAnalyzer.calculate_rsi(df['close'], rsi_period)
if rsi is None:
return False
# Use pandas methods to simplify calculation
# Find lowest points in recent 10 periods
recent_lows = df['low'].tail(10)
recent_rsi = rsi.tail(10)
# Find price lowest point and RSI lowest point
min_price_idx = recent_lows.idxmin()
min_rsi_idx = recent_rsi.idxmin()
# If price lowest point appears later than RSI lowest point, possible bullish divergence
if min_price_idx > min_rsi_idx:
# Check if price is making new lows while RSI is rising
price_trend = (recent_lows.iloc[-1] < recent_lows.iloc[-5])
rsi_trend = (recent_rsi.iloc[-1] > recent_rsi.iloc[-5])
return price_trend and rsi_trend and recent_rsi.iloc[-1] < 40
return False
except Exception as e:
logger.error(f"RSI divergence detection error: {e}")
return False
@staticmethod
def detect_macd_divergence(df):
"""Detect MACD bullish divergence"""
try:
if len(df) < 30:
return False
# Calculate MACD, especially focus on histogram
_, _, histogram = TechnicalAnalyzer.calculate_macd(df['close'])
if histogram is None:
return False
# Use recent data
recent_lows = df['low'].tail(10)
recent_hist = histogram.tail(10)
# Find price lowest point and MACD histogram lowest point
min_price_idx = recent_lows.idxmin()
min_hist_idx = recent_hist.idxmin()
# If price lowest point appears later than histogram lowest point, possible bullish divergence
if min_price_idx > min_hist_idx:
# Check price trend and histogram trend
price_trend = (recent_lows.iloc[-1] < recent_lows.iloc[-5])
hist_trend = (recent_hist.iloc[-1] > recent_hist.iloc[-5])
# Histogram rising from negative area is stronger signal
hist_improving = recent_hist.iloc[-1] > recent_hist.iloc[-3]
return price_trend and hist_trend and hist_improving
return False
except Exception as e:
logger.error(f"MACD divergence detection error: {e}")
return False
@staticmethod
def volume_confirmation(df, period=5):
"""Bottom volume confirmation"""
try:
if len(df) < period + 5:
return False
current_volume = df['volume'].iloc[-1]
avg_volume = df['volume'].tail(period).mean()
# Current volume significantly higher than average
volume_ratio = current_volume / avg_volume if avg_volume > 0 else 1
# Price falling but volume increasing (possible bottom accumulation)
price_change = (df['close'].iloc[-1] - df['close'].iloc[-2]) / df['close'].iloc[-2]
if volume_ratio > 1.5 and price_change < 0:
return True
return False
except Exception as e:
logger.error(f"Volume confirmation error: {e}")
return False
@staticmethod
def detect_hammer_pattern(df, lookback=5):
"""Detect hammer reversal pattern"""
try:
if len(df) < lookback + 1:
return False
latest = df.iloc[-1]
body_size = abs(latest['close'] - latest['open'])
total_range = latest['high'] - latest['low']
# Avoid division by zero
if total_range == 0:
return False
# Hammer characteristics: lower shadow at least 2x body, very short upper shadow
lower_shadow = min(latest['open'], latest['close']) - latest['low']
upper_shadow = latest['high'] - max(latest['open'], latest['close'])
is_hammer = (lower_shadow >= 2 * body_size and
upper_shadow <= body_size * 0.5 and
body_size > 0)
# Needs to be in downtrend
prev_trend = df['close'].iloc[-lookback] > df['close'].iloc[-1]
return is_hammer and prev_trend
except Exception as e:
logger.error(f"Hammer pattern detection error: {e}")
return False
@staticmethod
def detect_reversal_patterns(df):
"""Detect bottom reversal patterns"""
reversal_signals = []
try:
# RSI bullish divergence
if TechnicalAnalyzer.detect_rsi_divergence(df):
reversal_signals.append({
'type': 'RSI_DIVERGENCE_BULLISH',
'action': 'BUY',
'strength': 'MEDIUM',
'description': 'RSI bullish divergence, momentum divergence bullish'
})
# MACD bullish divergence
if TechnicalAnalyzer.detect_macd_divergence(df):
reversal_signals.append({
'type': 'MACD_DIVERGENCE_BULLISH',
'action': 'BUY',
'strength': 'MEDIUM',
'description': 'MACD histogram bullish divergence, momentum strengthening'
})
# Volume confirmation
if TechnicalAnalyzer.volume_confirmation(df):
reversal_signals.append({
'type': 'VOLUME_CONFIRMATION',
'action': 'BUY',
'strength': 'LOW',
'description': 'Bottom volume increase, signs of capital entry'
})
# Add hammer pattern detection
if TechnicalAnalyzer.detect_hammer_pattern(df):
reversal_signals.append({
'type': 'HAMMER_PATTERN',
'action': 'BUY',
'strength': 'LOW',
'description': 'Hammer pattern, short-term reversal signal'
})
except Exception as e:
logger.error(f"Reversal pattern detection error: {e}")
return reversal_signals
@staticmethod
def calculate_support_resistance(df, period=20):
"""Calculate support resistance levels (fixed version)"""
try:
# Recent high resistance
resistance = df['high'].rolling(window=period).max().iloc[-1]
# Recent low support
support = df['low'].rolling(window=period).min().iloc[-1]
# Dynamic support resistance (based on Bollinger Bands)
# Fix: correctly receive three return values from Bollinger Bands
upper_bb, middle_bb, lower_bb = TechnicalAnalyzer.calculate_bollinger_bands(df['close'], 20, 2)
# Check if Bollinger Band calculation successful
if upper_bb is not None and lower_bb is not None:
resistance_bb = upper_bb.iloc[-1] if hasattr(upper_bb, 'iloc') else upper_bb
support_bb = lower_bb.iloc[-1] if hasattr(lower_bb, 'iloc') else lower_bb
else:
# If Bollinger Band calculation fails, use static values as backup
resistance_bb = resistance
support_bb = support
current_price = df['close'].iloc[-1]
return {
'static_resistance': resistance,
'static_support': support,
'dynamic_resistance': resistance_bb,
'dynamic_support': support_bb,
'current_vs_resistance': (current_price - resistance) / resistance * 100 if resistance > 0 else 0,
'current_vs_support': (current_price - support) / support * 100 if support > 0 else 0
}
except Exception as e:
logger.error(f"Error calculating support resistance: {e}")
# Return default values to avoid subsequent errors
current_price = df['close'].iloc[-1] if len(df) > 0 else 0
return {
'static_resistance': current_price * 1.1 if current_price > 0 else 0,
'static_support': current_price * 0.9 if current_price > 0 else 0,
'dynamic_resistance': current_price * 1.05 if current_price > 0 else 0,
'dynamic_support': current_price * 0.95 if current_price > 0 else 0,
'current_vs_resistance': 0,
'current_vs_support': 0
}
@staticmethod
def calculate_volatility(df, period=20):
"""Calculate volatility"""
try:
returns = df['close'].pct_change().dropna()
volatility = returns.rolling(window=period).std() * np.sqrt(365) # Annualized volatility
return volatility.iloc[-1] if len(volatility) > 0 else 0
except Exception as e:
logger.error(f"Error calculating volatility: {e}")
return 0
@staticmethod
def generate_weighted_signals(df):
"""Generate weighted technical signals"""
signals = []
weights = {
'KDJ_GOLDEN_CROSS': 0.15,
'MACD_GOLDEN_CROSS': 0.20,
'MA_GOLDEN_CROSS': 0.25,
'RSI_OVERSOLD': 0.15,
'BOLLINGER_LOWER_TOUCH': 0.15,
'STOCH_GOLDEN_CROSS': 0.10
}
# Calculate various indicator signals
technical_signals = TechnicalAnalyzer.generate_signals(df)
# Calculate weighted score
total_score = 0
signal_count = 0
for signal in technical_signals:
weight = weights.get(signal['type'], 0.05)
strength_multiplier = 1.0 if signal['strength'] == 'STRONG' else 0.7
total_score += weight * strength_multiplier
signal_count += 1
# Normalize score
normalized_score = min(1.0, total_score)
# Determine action
if normalized_score > 0.6:
action = 'BUY'
elif normalized_score < 0.3:
action = 'SELL'
else:
action = 'HOLD'
return {
'score': normalized_score,
'action': action,
'signal_count': signal_count,
'signals': technical_signals,
'confidence': 'HIGH' if normalized_score > 0.7 or normalized_score < 0.2 else 'MEDIUM'
}
@staticmethod
def calculate_kdj(df, n=9, m1=3, m2=3):
"""Calculate KDJ indicator"""
try:
if len(df) < n:
return pd.Series([np.nan] * len(df)), pd.Series([np.nan] * len(df)), pd.Series([np.nan] * len(df))
low_list = df['low'].rolling(window=n, min_periods=1).min()
high_list = df['high'].rolling(window=n, min_periods=1).max()
# Avoid division by zero error
denominator = high_list - low_list
denominator = denominator.replace(0, np.nan)
rsv = ((df['close'] - low_list) / denominator) * 100
rsv = rsv.fillna(50)
k_series = rsv.ewm(span=m1-1, adjust=False).mean()
d_series = k_series.ewm(span=m2-1, adjust=False).mean()
j_series = 3 * k_series - 2 * d_series
return k_series, d_series, j_series
except Exception as e:
logger.error(f"Error calculating KDJ: {e}")
return None, None, None
@staticmethod
def calculate_rsi(prices, period=14):
"""Calculate RSI"""
try:
if len(prices) < period:
return pd.Series([np.nan] * len(prices))
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period, min_periods=1).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period, min_periods=1).mean()
# Avoid division by zero error
rs = gain / loss.replace(0, np.nan)
rsi = 100 - (100 / (1 + rs))
return rsi.fillna(50)
except Exception as e:
logger.error(f"Error calculating RSI: {e}")
return None
@staticmethod
def calculate_atr(df, period=14):
"""Calculate ATR"""
try:
if len(df) < period:
return pd.Series([np.nan] * len(df))
high_low = df['high'] - df['low']
high_close = np.abs(df['high'] - df['close'].shift())
low_close = np.abs(df['low'] - df['close'].shift())
true_range = np.maximum(high_low, np.maximum(high_close, low_close))
atr = true_range.rolling(window=period, min_periods=1).mean()
return atr
except Exception as e:
logger.error(f"Error calculating ATR: {e}")
return None
@staticmethod
def calculate_macd(prices, fast_period=12, slow_period=26, signal_period=9):
"""Calculate MACD"""
try:
ema_fast = prices.ewm(span=fast_period, adjust=False).mean()
ema_slow = prices.ewm(span=slow_period, adjust=False).mean()
macd_line = ema_fast - ema_slow
signal_line = macd_line.ewm(span=signal_period, adjust=False).mean()
histogram = macd_line - signal_line
return macd_line, signal_line, histogram
except Exception as e:
logger.error(f"Error calculating MACD: {e}")
return None, None, None
@staticmethod
def calculate_bollinger_bands(prices, period=20, std_dev=2):
"""Calculate Bollinger Bands"""
try:
if len(prices) < period:
logger.warning(f"Data length {len(prices)} insufficient, unable to calculate {period}-period Bollinger Bands")
return None, None, None
middle = prices.rolling(window=period).mean()
std = prices.rolling(window=period).std()
# Handle NaN standard deviation
std = std.fillna(0)
upper = middle + (std * std_dev)
lower = middle - (std * std_dev)
return upper, middle, lower
except Exception as e:
logger.error(f"Error calculating Bollinger Bands: {e}")
return None, None, None
@staticmethod
def calculate_sma(prices, period):
"""Calculate Simple Moving Average"""
try:
return prices.rolling(window=period).mean()
except Exception as e:
logger.error(f"Error calculating SMA: {e}")
return None
@staticmethod
def calculate_stochastic(df, k_period=14, d_period=3):
"""Calculate Stochastic Oscillator"""
try:
low_min = df['low'].rolling(window=k_period).min()
high_max = df['high'].rolling(window=k_period).max()
k = 100 * ((df['close'] - low_min) / (high_max - low_min))
d = k.rolling(window=d_period).mean()
return k, d
except Exception as e:
logger.error(f"Error calculating Stochastic: {e}")
return None, None
@staticmethod
def generate_signals(df):
"""Generate technical signals"""
signals = []
try:
# KDJ
k, d, j = TechnicalAnalyzer.calculate_kdj(df)
if k is not None and len(k) > 1:
latest_k, latest_d, latest_j = k.iloc[-1], d.iloc[-1], j.iloc[-1]
prev_k, prev_d, prev_j = k.iloc[-2], d.iloc[-2], j.iloc[-2]
if prev_k <= prev_d and latest_k > latest_d:
signals.append({'type': 'KDJ_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'})
elif prev_k >= prev_d and latest_k < latest_d:
signals.append({'type': 'KDJ_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'})
if latest_j < 20:
signals.append({'type': 'KDJ_OVERSOLD', 'action': 'BUY', 'strength': 'MEDIUM'})
elif latest_j > 80:
signals.append({'type': 'KDJ_OVERBOUGHT', 'action': 'SELL', 'strength': 'MEDIUM'})
# RSI
rsi = TechnicalAnalyzer.calculate_rsi(df['close'])
if rsi is not None and len(rsi) > 0:
latest_rsi = rsi.iloc[-1]
if latest_rsi < 30:
signals.append({'type': 'RSI_OVERSOLD', 'action': 'BUY', 'strength': 'MEDIUM'})
elif latest_rsi > 70:
signals.append({'type': 'RSI_OVERBOUGHT', 'action': 'SELL', 'strength': 'MEDIUM'})
# MACD
macd_line, signal_line, _ = TechnicalAnalyzer.calculate_macd(df['close'])
if macd_line is not None and len(macd_line) > 1:
latest_macd = macd_line.iloc[-1]
latest_signal = signal_line.iloc[-1]
prev_macd = macd_line.iloc[-2]
prev_signal = signal_line.iloc[-2]
if prev_macd <= prev_signal and latest_macd > latest_signal:
signals.append({'type': 'MACD_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'})
elif prev_macd >= prev_signal and latest_macd < latest_signal:
signals.append({'type': 'MACD_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'})
# Bollinger Bands
upper, middle, lower = TechnicalAnalyzer.calculate_bollinger_bands(df['close'])
if upper is not None:
latest_close = df['close'].iloc[-1]
if latest_close <= lower.iloc[-1]:
signals.append({'type': 'BOLlinger_LOWER_TOUCH', 'action': 'BUY', 'strength': 'MEDIUM'})
elif latest_close >= upper.iloc[-1]:
signals.append({'type': 'BOLlinger_UPPER_TOUCH', 'action': 'SELL', 'strength': 'MEDIUM'})
# Moving Averages
sma_short = TechnicalAnalyzer.calculate_sma(df['close'], 50)
sma_long = TechnicalAnalyzer.calculate_sma(df['close'], 200)
if sma_short is not None and sma_long is not None and len(sma_short) > 1:
latest_short = sma_short.iloc[-1]
latest_long = sma_long.iloc[-1]
prev_short = sma_short.iloc[-2]
prev_long = sma_long.iloc[-2]
if prev_short <= prev_long and latest_short > latest_long:
signals.append({'type': 'MA_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'})
elif prev_short >= prev_long and latest_short < latest_long:
signals.append({'type': 'MA_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'})
# Stochastic
k, d = TechnicalAnalyzer.calculate_stochastic(df)
if k is not None and len(k) > 1:
latest_k = k.iloc[-1]
latest_d = d.iloc[-1]
prev_k = k.iloc[-2]
prev_d = d.iloc[-2]
if latest_k < 20:
signals.append({'type': 'STOCH_OVERSOLD', 'action': 'BUY', 'strength': 'MEDIUM'})
elif latest_k > 80:
signals.append({'type': 'STOCH_OVERBOUGHT', 'action': 'SELL', 'strength': 'MEDIUM'})
if prev_k <= prev_d and latest_k > latest_d and latest_k < 80:
signals.append({'type': 'STOCH_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'})
elif prev_k >= prev_d and latest_k < latest_d and latest_k > 20:
signals.append({'type': 'STOCH_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'})
except Exception as e:
logger.error(f"Error generating technical signals: {e}")
return signals