610 lines
25 KiB
Python
610 lines
25 KiB
Python
import logging

import numpy as np
import pandas as pd
|
|
|
|
# Module-level logger, named after this module, used by the analyzer methods below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TechnicalAnalyzer:
    """Stateless technical-analysis helpers for OHLCV price data.

    Every public method is a ``@staticmethod``. Methods taking ``df`` read
    some of the columns ``open``, ``high``, ``low``, ``close`` and ``volume``
    from a pandas ``DataFrame``; methods taking ``prices`` work on a single
    pandas ``Series``. Most methods catch unexpected errors, log them, and
    return a neutral fallback (``None``, ``{}``, ``[]``, ``False`` or ``0``)
    instead of raising.
    """

    @staticmethod
    def _nan_series(like):
        """Return an all-NaN float Series with the same length/index as ``like``."""
        if isinstance(like, (pd.Series, pd.DataFrame)):
            index = like.index
        else:
            index = range(len(like))
        return pd.Series(np.nan, index=index, dtype=float)

    @staticmethod
    def calculate_indicators(df):
        """Calculate the full indicator set for a price frame.

        Args:
            df: DataFrame with ``high``, ``low`` and ``close`` columns.

        Returns:
            dict: Indicator name -> value. Keys that may be present:
            ``kdj``, ``rsi``, ``rsi_7``, ``rsi_21``, ``atr``, ``macd``,
            ``bollinger_1std``, ``bollinger_2std``, ``sma_20``, ``sma_50``,
            ``sma_100``, ``ema_12``, ``ema_26``, ``stochastic``,
            ``trend_strength``, ``volatility`` and ``current_price``.
            An empty dict is returned when ``df`` is ``None``, has fewer than
            20 rows, or an unexpected error occurs.
        """
        if df is None or len(df) < 20:  # lowered minimum data requirement
            logger.warning("Insufficient data, unable to calculate technical indicators")
            return {}

        try:
            indicators = {}
            close = df['close']

            # KDJ indicator
            k, d, j = TechnicalAnalyzer.calculate_kdj(df)
            if k is not None:
                indicators['kdj'] = (k, d, j)

            # RSI indicator (multiple time periods); all three are stored
            # only when the 14-period calculation succeeded.
            rsi_14 = TechnicalAnalyzer.calculate_rsi(close, 14)
            rsi_7 = TechnicalAnalyzer.calculate_rsi(close, 7)
            rsi_21 = TechnicalAnalyzer.calculate_rsi(close, 21)
            if rsi_14 is not None:
                indicators['rsi'] = rsi_14
                indicators['rsi_7'] = rsi_7
                indicators['rsi_21'] = rsi_21

            # ATR indicator
            atr = TechnicalAnalyzer.calculate_atr(df)
            if atr is not None:
                indicators['atr'] = atr

            # MACD indicator
            macd_line, signal_line, histogram = TechnicalAnalyzer.calculate_macd(close)
            if macd_line is not None:
                indicators['macd'] = (macd_line, signal_line, histogram)

            # Bollinger Bands (1 and 2 standard deviations)
            upper1, middle1, lower1 = TechnicalAnalyzer.calculate_bollinger_bands(close, 20, 1)
            upper2, middle2, lower2 = TechnicalAnalyzer.calculate_bollinger_bands(close, 20, 2)
            if upper1 is not None:
                indicators['bollinger_1std'] = (upper1, middle1, lower1)
                indicators['bollinger_2std'] = (upper2, middle2, lower2)

            # Moving averages (multiple periods); stored as a group when the
            # 20-period SMA succeeded.
            sma_20 = TechnicalAnalyzer.calculate_sma(close, 20)
            sma_50 = TechnicalAnalyzer.calculate_sma(close, 50)
            sma_100 = TechnicalAnalyzer.calculate_sma(close, 100)
            ema_12 = TechnicalAnalyzer.calculate_ema(close, 12)
            ema_26 = TechnicalAnalyzer.calculate_ema(close, 26)
            if sma_20 is not None:
                indicators['sma_20'] = sma_20
                indicators['sma_50'] = sma_50
                indicators['sma_100'] = sma_100
                indicators['ema_12'] = ema_12
                indicators['ema_26'] = ema_26

            # Stochastic oscillator
            stoch_k, stoch_d = TechnicalAnalyzer.calculate_stochastic(df)
            if stoch_k is not None:
                indicators['stochastic'] = (stoch_k, stoch_d)

            # Scalar summaries
            indicators['trend_strength'] = TechnicalAnalyzer.calculate_trend_strength(df)
            indicators['volatility'] = TechnicalAnalyzer.calculate_volatility(df)
            indicators['current_price'] = close.iloc[-1]

            return indicators

        except Exception as e:
            logger.error(f"Error calculating technical indicators: {e}")
            return {}

    @staticmethod
    def calculate_ema(prices, period):
        """Calculate the exponential moving average of ``prices``.

        Args:
            prices: pandas Series of prices.
            period: EMA span passed to ``Series.ewm``.

        Returns:
            Series of the same length as ``prices``, or ``None`` on error.
        """
        try:
            return prices.ewm(span=period, adjust=False).mean()
        except Exception as e:
            logger.error(f"Error calculating EMA: {e}")
            return None

    @staticmethod
    def calculate_trend_strength(df, period=20):
        """Measure how linear the last ``period`` closes are.

        A straight line is fitted to the last ``period`` values of
        ``df['close']`` and the absolute R-squared of that fit is returned,
        so the result does not distinguish rising from falling trends.

        Returns:
            The absolute R-squared value; ``0`` when ``df`` has fewer than
            ``period`` rows, when the closes are constant, or on error.
        """
        try:
            if len(df) < period:
                return 0

            x = np.arange(len(df))[-period:]
            y = df['close'].values[-period:]

            # Least-squares line through the window
            slope, intercept = np.polyfit(x, y, 1)

            # R-squared of the fit is used as the strength measure
            y_pred = slope * x + intercept
            ss_res = np.sum((y - y_pred) ** 2)
            ss_tot = np.sum((y - np.mean(y)) ** 2)
            r_squared = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0

            return abs(r_squared)

        except Exception as e:
            logger.error(f"Error calculating trend strength: {e}")
            return 0

    @staticmethod
    def detect_rsi_divergence(df, rsi_period=14):
        """Detect a bullish RSI divergence over the last 10 bars.

        The signal requires all of the following: the lowest ``low`` of the
        last 10 bars occurs after the lowest RSI of those bars, the latest
        low is below the low 4 bars earlier, the latest RSI is above the RSI
        4 bars earlier, and the latest RSI is below 40.

        Returns:
            bool: ``True`` when the pattern is present; ``False`` otherwise,
            when ``df`` has fewer than 30 rows, or on error.
        """
        try:
            if len(df) < 30:  # need sufficient history
                return False

            rsi = TechnicalAnalyzer.calculate_rsi(df['close'], rsi_period)
            if rsi is None:
                return False

            recent_lows = df['low'].tail(10)
            recent_rsi = rsi.tail(10)

            # Index labels of the price low and of the RSI low
            min_price_idx = recent_lows.idxmin()
            min_rsi_idx = recent_rsi.idxmin()

            # Price bottoming later than RSI hints at a bullish divergence
            if min_price_idx > min_rsi_idx:
                price_trend = recent_lows.iloc[-1] < recent_lows.iloc[-5]
                rsi_trend = recent_rsi.iloc[-1] > recent_rsi.iloc[-5]
                return bool(price_trend and rsi_trend and recent_rsi.iloc[-1] < 40)

            return False

        except Exception as e:
            logger.error(f"RSI divergence detection error: {e}")
            return False

    @staticmethod
    def detect_macd_divergence(df):
        """Detect a bullish MACD-histogram divergence over the last 10 bars.

        The signal requires all of the following: the lowest ``low`` of the
        last 10 bars occurs after the lowest histogram value of those bars,
        the latest low is below the low 4 bars earlier, and the latest
        histogram value is above both the value 4 bars earlier and the value
        2 bars earlier.

        Returns:
            bool: ``True`` when the pattern is present; ``False`` otherwise,
            when ``df`` has fewer than 30 rows, or on error.
        """
        try:
            if len(df) < 30:
                return False

            _, _, histogram = TechnicalAnalyzer.calculate_macd(df['close'])
            if histogram is None:
                return False

            recent_lows = df['low'].tail(10)
            recent_hist = histogram.tail(10)

            min_price_idx = recent_lows.idxmin()
            min_hist_idx = recent_hist.idxmin()

            # Price bottoming later than the histogram hints at divergence
            if min_price_idx > min_hist_idx:
                price_trend = recent_lows.iloc[-1] < recent_lows.iloc[-5]
                hist_trend = recent_hist.iloc[-1] > recent_hist.iloc[-5]
                # Histogram must also still be rising over the last 2 bars
                hist_improving = recent_hist.iloc[-1] > recent_hist.iloc[-3]
                return bool(price_trend and hist_trend and hist_improving)

            return False

        except Exception as e:
            logger.error(f"MACD divergence detection error: {e}")
            return False

    @staticmethod
    def volume_confirmation(df, period=5):
        """Check for a falling close on unusually high volume.

        The latest volume is compared with the mean volume of the last
        ``period`` bars (the latest bar is part of that mean). The signal
        fires when the ratio exceeds 1.5 and the latest close is below the
        previous close.

        Returns:
            bool: ``True`` when the condition holds; ``False`` otherwise,
            when ``df`` has fewer than ``period + 5`` rows, or on error.
        """
        try:
            if len(df) < period + 5:
                return False

            current_volume = df['volume'].iloc[-1]
            avg_volume = df['volume'].tail(period).mean()

            # Treat a non-positive average as "no information" (ratio 1)
            volume_ratio = current_volume / avg_volume if avg_volume > 0 else 1

            # Price falling while volume expands (possible bottom accumulation)
            prev_close = df['close'].iloc[-2]
            price_change = (df['close'].iloc[-1] - prev_close) / prev_close

            return bool(volume_ratio > 1.5 and price_change < 0)

        except Exception as e:
            logger.error(f"Volume confirmation error: {e}")
            return False

    @staticmethod
    def detect_hammer_pattern(df, lookback=5):
        """Detect a hammer candle on the latest bar after a decline.

        A hammer here means: non-zero body, lower shadow at least twice the
        body, and upper shadow at most half the body. The decline condition
        is that the close ``lookback`` bars from the end is above the latest
        close.

        Returns:
            bool: ``True`` when both conditions hold; ``False`` otherwise,
            when ``df`` has fewer than ``lookback + 1`` rows, when the latest
            bar has zero range, or on error.
        """
        try:
            if len(df) < lookback + 1:
                return False

            latest = df.iloc[-1]
            body_size = abs(latest['close'] - latest['open'])
            total_range = latest['high'] - latest['low']

            # A zero-range bar cannot be a hammer
            if total_range == 0:
                return False

            lower_shadow = min(latest['open'], latest['close']) - latest['low']
            upper_shadow = latest['high'] - max(latest['open'], latest['close'])

            is_hammer = (
                lower_shadow >= 2 * body_size
                and upper_shadow <= body_size * 0.5
                and body_size > 0
            )

            # Needs to follow a decline
            prev_trend = df['close'].iloc[-lookback] > df['close'].iloc[-1]

            return bool(is_hammer and prev_trend)

        except Exception as e:
            logger.error(f"Hammer pattern detection error: {e}")
            return False

    @staticmethod
    def detect_reversal_patterns(df):
        """Run all bottom-reversal detectors and collect the ones that fire.

        Returns:
            list[dict]: One dict per detected pattern with the keys ``type``,
            ``action``, ``strength`` and ``description``, in the order RSI
            divergence, MACD divergence, volume confirmation, hammer.
            Empty when nothing is detected.
        """
        reversal_signals = []

        checks = (
            (TechnicalAnalyzer.detect_rsi_divergence, {
                'type': 'RSI_DIVERGENCE_BULLISH',
                'action': 'BUY',
                'strength': 'MEDIUM',
                'description': 'RSI bullish divergence, momentum divergence bullish',
            }),
            (TechnicalAnalyzer.detect_macd_divergence, {
                'type': 'MACD_DIVERGENCE_BULLISH',
                'action': 'BUY',
                'strength': 'MEDIUM',
                'description': 'MACD histogram bullish divergence, momentum strengthening',
            }),
            (TechnicalAnalyzer.volume_confirmation, {
                'type': 'VOLUME_CONFIRMATION',
                'action': 'BUY',
                'strength': 'LOW',
                'description': 'Bottom volume increase, signs of capital entry',
            }),
            (TechnicalAnalyzer.detect_hammer_pattern, {
                'type': 'HAMMER_PATTERN',
                'action': 'BUY',
                'strength': 'LOW',
                'description': 'Hammer pattern, short-term reversal signal',
            }),
        )

        try:
            for detector, signal in checks:
                if detector(df):
                    reversal_signals.append(signal)
        except Exception as e:
            logger.error(f"Reversal pattern detection error: {e}")

        return reversal_signals

    @staticmethod
    def calculate_support_resistance(df, period=20):
        """Calculate static and Bollinger-based support/resistance levels.

        Static levels are the highest ``high`` and lowest ``low`` of the last
        ``period`` bars. Dynamic levels are the latest upper/lower 20-period,
        2-sigma Bollinger Bands of ``close``; when those cannot be computed
        the static levels are used instead.

        Returns:
            dict with the keys ``static_resistance``, ``static_support``,
            ``dynamic_resistance``, ``dynamic_support``,
            ``current_vs_resistance`` and ``current_vs_support`` (the last
            two are percentage distances of the latest close, or ``0`` when
            the level is not positive). On error the levels fall back to
            fixed percentage bands around the latest close (or ``0`` when no
            close is available) and both distances are ``0``.
        """
        try:
            # Recent high as resistance, recent low as support
            resistance = df['high'].rolling(window=period).max().iloc[-1]
            support = df['low'].rolling(window=period).min().iloc[-1]

            # Dynamic levels from Bollinger Bands
            upper_bb, _middle_bb, lower_bb = TechnicalAnalyzer.calculate_bollinger_bands(
                df['close'], 20, 2
            )

            if upper_bb is not None and lower_bb is not None:
                resistance_bb = upper_bb.iloc[-1] if hasattr(upper_bb, 'iloc') else upper_bb
                support_bb = lower_bb.iloc[-1] if hasattr(lower_bb, 'iloc') else lower_bb
            else:
                # Bollinger calculation failed: reuse the static levels
                resistance_bb = resistance
                support_bb = support

            current_price = df['close'].iloc[-1]

            return {
                'static_resistance': resistance,
                'static_support': support,
                'dynamic_resistance': resistance_bb,
                'dynamic_support': support_bb,
                'current_vs_resistance': (
                    (current_price - resistance) / resistance * 100 if resistance > 0 else 0
                ),
                'current_vs_support': (
                    (current_price - support) / support * 100 if support > 0 else 0
                ),
            }

        except Exception as e:
            logger.error(f"Error calculating support resistance: {e}")

            # Reading the close may fail for the same reason the main path
            # did (e.g. df is None or lacks the column), so guard it too;
            # otherwise the fallback itself would raise.
            try:
                current_price = df['close'].iloc[-1] if len(df) > 0 else 0
            except Exception:
                current_price = 0

            has_price = current_price > 0
            return {
                'static_resistance': current_price * 1.1 if has_price else 0,
                'static_support': current_price * 0.9 if has_price else 0,
                'dynamic_resistance': current_price * 1.05 if has_price else 0,
                'dynamic_support': current_price * 0.95 if has_price else 0,
                'current_vs_resistance': 0,
                'current_vs_support': 0,
            }

    @staticmethod
    def calculate_volatility(df, period=20):
        """Calculate the latest rolling volatility of close-to-close returns.

        The rolling standard deviation of percentage returns over ``period``
        observations is scaled by ``sqrt(365)``.
        NOTE(review): the 365 factor presumably assumes one bar per calendar
        day -- confirm against the data feed.

        Returns:
            The latest value (NaN when fewer than ``period`` returns are
            available), or ``0`` when there are no returns or on error.
        """
        try:
            returns = df['close'].pct_change().dropna()
            volatility = returns.rolling(window=period).std() * np.sqrt(365)
            return volatility.iloc[-1] if len(volatility) > 0 else 0
        except Exception as e:
            logger.error(f"Error calculating volatility: {e}")
            return 0

    @staticmethod
    def generate_weighted_signals(df):
        """Combine the signals from ``generate_signals`` into one score.

        Each signal contributes ``weight * strength_multiplier`` where the
        weight comes from a fixed table (``0.05`` for types not listed) and
        the multiplier is ``1.0`` for ``STRONG`` signals and ``0.7``
        otherwise. ``SELL`` signals subtract their contribution, all other
        signals add it; the total is clamped to ``[0, 1]``.

        NOTE(review): the action thresholds map a low score to ``SELL``, so
        an input that produces no signals (score 0) is still reported as
        ``SELL`` with ``HIGH`` confidence. Confirm that this is intended.

        Returns:
            dict with the keys ``score``, ``action`` (``BUY`` above 0.6,
            ``SELL`` below 0.3, else ``HOLD``), ``signal_count``, ``signals``
            and ``confidence`` (``HIGH`` above 0.7 or below 0.2, else
            ``MEDIUM``).
        """
        weights = {
            'KDJ_GOLDEN_CROSS': 0.15,
            'MACD_GOLDEN_CROSS': 0.20,
            'MA_GOLDEN_CROSS': 0.25,
            'RSI_OVERSOLD': 0.15,
            'BOLLINGER_LOWER_TOUCH': 0.15,
            'STOCH_GOLDEN_CROSS': 0.10,
        }

        technical_signals = TechnicalAnalyzer.generate_signals(df)

        total_score = 0
        signal_count = 0
        for signal in technical_signals:
            weight = weights.get(signal['type'], 0.05)
            strength_multiplier = 1.0 if signal['strength'] == 'STRONG' else 0.7
            contribution = weight * strength_multiplier
            # Bearish evidence must lower the bullish score, not raise it
            if signal.get('action') == 'SELL':
                total_score -= contribution
            else:
                total_score += contribution
            signal_count += 1

        # Clamp to the documented [0, 1] range
        normalized_score = max(0.0, min(1.0, total_score))

        if normalized_score > 0.6:
            action = 'BUY'
        elif normalized_score < 0.3:
            action = 'SELL'
        else:
            action = 'HOLD'

        is_decisive = normalized_score > 0.7 or normalized_score < 0.2
        return {
            'score': normalized_score,
            'action': action,
            'signal_count': signal_count,
            'signals': technical_signals,
            'confidence': 'HIGH' if is_decisive else 'MEDIUM',
        }

    @staticmethod
    def calculate_kdj(df, n=9, m1=3, m2=3):
        """Calculate the KDJ indicator.

        RSV is the position of ``close`` inside the rolling ``n``-bar
        high/low range (0-100, 50 when the range is zero). K is RSV smoothed
        with factor ``1/m1``, D is K smoothed with factor ``1/m2`` and
        ``J = 3K - 2D``.

        Returns:
            tuple: ``(k, d, j)`` Series aligned with ``df``. All-NaN series
            are returned when ``df`` has fewer than ``n`` rows, and
            ``(None, None, None)`` on error.
        """
        try:
            if len(df) < n:
                return (
                    TechnicalAnalyzer._nan_series(df),
                    TechnicalAnalyzer._nan_series(df),
                    TechnicalAnalyzer._nan_series(df),
                )

            low_list = df['low'].rolling(window=n, min_periods=1).min()
            high_list = df['high'].rolling(window=n, min_periods=1).max()

            # Avoid division by zero on flat ranges
            denominator = (high_list - low_list).replace(0, np.nan)

            rsv = ((df['close'] - low_list) / denominator) * 100
            rsv = rsv.fillna(50)

            # KDJ smoothing is K_t = (m-1)/m * K_{t-1} + 1/m * RSV_t, i.e.
            # alpha = 1/m, which in pandas is com = m - 1 (span = m - 1 would
            # give alpha = 2/m instead).
            k_series = rsv.ewm(com=m1 - 1, adjust=False).mean()
            d_series = k_series.ewm(com=m2 - 1, adjust=False).mean()
            j_series = 3 * k_series - 2 * d_series

            return k_series, d_series, j_series

        except Exception as e:
            logger.error(f"Error calculating KDJ: {e}")
            return None, None, None

    @staticmethod
    def calculate_rsi(prices, period=14):
        """Calculate RSI from simple rolling means of gains and losses.

        Windows without any loss yield 100 when there were gains and 50
        (neutral) when prices were flat.

        Returns:
            Series aligned with ``prices``; an all-NaN series when fewer than
            ``period`` prices are given; ``None`` on error.
        """
        try:
            if len(prices) < period:
                return TechnicalAnalyzer._nan_series(prices)

            delta = prices.diff()
            gain = delta.where(delta > 0, 0).rolling(window=period, min_periods=1).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=period, min_periods=1).mean()

            # Avoid division by zero; those rows are resolved below
            rs = gain / loss.replace(0, np.nan)
            rsi = 100 - (100 / (1 + rs))

            # Only gains in the window means maximum strength, not "neutral"
            rsi = rsi.mask((loss == 0) & (gain > 0), 100.0)
            return rsi.fillna(50)

        except Exception as e:
            logger.error(f"Error calculating RSI: {e}")
            return None

    @staticmethod
    def calculate_atr(df, period=14):
        """Calculate the Average True Range as a rolling mean of true range.

        Returns:
            Series aligned with ``df``; an all-NaN series when ``df`` has
            fewer than ``period`` rows; ``None`` on error.
        """
        try:
            if len(df) < period:
                return TechnicalAnalyzer._nan_series(df)

            prev_close = df['close'].shift()
            high_low = df['high'] - df['low']
            high_close = np.abs(df['high'] - prev_close)
            low_close = np.abs(df['low'] - prev_close)

            true_range = np.maximum(high_low, np.maximum(high_close, low_close))
            return true_range.rolling(window=period, min_periods=1).mean()

        except Exception as e:
            logger.error(f"Error calculating ATR: {e}")
            return None

    @staticmethod
    def calculate_macd(prices, fast_period=12, slow_period=26, signal_period=9):
        """Calculate MACD line, signal line and histogram.

        Returns:
            tuple: ``(macd_line, signal_line, histogram)`` Series, or
            ``(None, None, None)`` on error.
        """
        try:
            ema_fast = prices.ewm(span=fast_period, adjust=False).mean()
            ema_slow = prices.ewm(span=slow_period, adjust=False).mean()
            macd_line = ema_fast - ema_slow
            signal_line = macd_line.ewm(span=signal_period, adjust=False).mean()
            histogram = macd_line - signal_line
            return macd_line, signal_line, histogram
        except Exception as e:
            logger.error(f"Error calculating MACD: {e}")
            return None, None, None

    @staticmethod
    def calculate_bollinger_bands(prices, period=20, std_dev=2):
        """Calculate Bollinger Bands.

        Args:
            prices: pandas Series of prices.
            period: Rolling window length.
            std_dev: Band width in standard deviations.

        Returns:
            tuple: ``(upper, middle, lower)`` Series, or
            ``(None, None, None)`` when fewer than ``period`` prices are
            given or on error.
        """
        try:
            if len(prices) < period:
                logger.warning(f"Data length {len(prices)} insufficient, unable to calculate {period}-period Bollinger Bands")
                return None, None, None

            window = prices.rolling(window=period)
            middle = window.mean()
            std = window.std().fillna(0)  # NaN deviation is treated as zero

            band = std * std_dev
            return middle + band, middle, middle - band

        except Exception as e:
            logger.error(f"Error calculating Bollinger Bands: {e}")
            return None, None, None

    @staticmethod
    def calculate_sma(prices, period):
        """Calculate the simple moving average of ``prices``.

        Returns:
            Series (NaN until ``period`` values are available), or ``None``
            on error.
        """
        try:
            return prices.rolling(window=period).mean()
        except Exception as e:
            logger.error(f"Error calculating SMA: {e}")
            return None

    @staticmethod
    def calculate_stochastic(df, k_period=14, d_period=3):
        """Calculate the stochastic oscillator.

        %K is the position of ``close`` inside the rolling ``k_period``
        high/low range (0-100); %D is the ``d_period`` rolling mean of %K.
        Bars with a zero range yield NaN rather than infinity.

        Returns:
            tuple: ``(k, d)`` Series, or ``(None, None)`` on error.
        """
        try:
            low_min = df['low'].rolling(window=k_period).min()
            high_max = df['high'].rolling(window=k_period).max()

            # Avoid division by zero on flat ranges (same guard as KDJ)
            price_range = (high_max - low_min).replace(0, np.nan)

            k = 100 * ((df['close'] - low_min) / price_range)
            d = k.rolling(window=d_period).mean()
            return k, d
        except Exception as e:
            logger.error(f"Error calculating Stochastic: {e}")
            return None, None

    @staticmethod
    def generate_signals(df):
        """Generate discrete BUY/SELL signals from the latest indicator values.

        Checks, in order: KDJ crosses and J extremes (20/80), RSI extremes
        (30/70), MACD crosses, Bollinger Band touches, 50/200 SMA crosses and
        stochastic extremes (20/80) and crosses.

        Returns:
            list[dict]: Signals with the keys ``type``, ``action`` and
            ``strength``. If an unexpected error occurs it is logged and the
            signals collected so far are returned.
        """
        signals = []

        def add(signal_type, action):
            signals.append({'type': signal_type, 'action': action, 'strength': 'MEDIUM'})

        try:
            # KDJ
            k, d, j = TechnicalAnalyzer.calculate_kdj(df)
            if k is not None and len(k) > 1:
                latest_k, latest_d, latest_j = k.iloc[-1], d.iloc[-1], j.iloc[-1]
                prev_k, prev_d = k.iloc[-2], d.iloc[-2]

                if prev_k <= prev_d and latest_k > latest_d:
                    add('KDJ_GOLDEN_CROSS', 'BUY')
                elif prev_k >= prev_d and latest_k < latest_d:
                    add('KDJ_DEATH_CROSS', 'SELL')

                if latest_j < 20:
                    add('KDJ_OVERSOLD', 'BUY')
                elif latest_j > 80:
                    add('KDJ_OVERBOUGHT', 'SELL')

            # RSI
            rsi = TechnicalAnalyzer.calculate_rsi(df['close'])
            if rsi is not None and len(rsi) > 0:
                latest_rsi = rsi.iloc[-1]

                if latest_rsi < 30:
                    add('RSI_OVERSOLD', 'BUY')
                elif latest_rsi > 70:
                    add('RSI_OVERBOUGHT', 'SELL')

            # MACD
            macd_line, signal_line, _ = TechnicalAnalyzer.calculate_macd(df['close'])
            if macd_line is not None and len(macd_line) > 1:
                latest_macd, prev_macd = macd_line.iloc[-1], macd_line.iloc[-2]
                latest_signal, prev_signal = signal_line.iloc[-1], signal_line.iloc[-2]

                if prev_macd <= prev_signal and latest_macd > latest_signal:
                    add('MACD_GOLDEN_CROSS', 'BUY')
                elif prev_macd >= prev_signal and latest_macd < latest_signal:
                    add('MACD_DEATH_CROSS', 'SELL')

            # Bollinger Bands (type names must match the weight table used
            # by generate_weighted_signals)
            upper, _middle, lower = TechnicalAnalyzer.calculate_bollinger_bands(df['close'])
            if upper is not None:
                latest_close = df['close'].iloc[-1]
                if latest_close <= lower.iloc[-1]:
                    add('BOLLINGER_LOWER_TOUCH', 'BUY')
                elif latest_close >= upper.iloc[-1]:
                    add('BOLLINGER_UPPER_TOUCH', 'SELL')

            # Moving averages (50 vs 200); NaN values on short history make
            # every comparison false, so no signal is emitted then.
            sma_short = TechnicalAnalyzer.calculate_sma(df['close'], 50)
            sma_long = TechnicalAnalyzer.calculate_sma(df['close'], 200)
            if sma_short is not None and sma_long is not None and len(sma_short) > 1:
                latest_short, prev_short = sma_short.iloc[-1], sma_short.iloc[-2]
                latest_long, prev_long = sma_long.iloc[-1], sma_long.iloc[-2]

                if prev_short <= prev_long and latest_short > latest_long:
                    add('MA_GOLDEN_CROSS', 'BUY')
                elif prev_short >= prev_long and latest_short < latest_long:
                    add('MA_DEATH_CROSS', 'SELL')

            # Stochastic
            k, d = TechnicalAnalyzer.calculate_stochastic(df)
            if k is not None and len(k) > 1:
                latest_k, prev_k = k.iloc[-1], k.iloc[-2]
                latest_d, prev_d = d.iloc[-1], d.iloc[-2]

                if latest_k < 20:
                    add('STOCH_OVERSOLD', 'BUY')
                elif latest_k > 80:
                    add('STOCH_OVERBOUGHT', 'SELL')

                if prev_k <= prev_d and latest_k > latest_d and latest_k < 80:
                    add('STOCH_GOLDEN_CROSS', 'BUY')
                elif prev_k >= prev_d and latest_k < latest_d and latest_k > 20:
                    add('STOCH_DEATH_CROSS', 'SELL')

        except Exception as e:
            logger.error(f"Error generating technical signals: {e}")

        return signals
|