import logging import numpy as np logger = logging.getLogger(__name__) class TechnicalAnalyzer: """Technical Analyzer""" @staticmethod def calculate_indicators(df): """Calculate technical indicators (optimized version)""" if df is None or len(df) < 20: # Lower minimum data requirement logger.warning("Insufficient data, unable to calculate technical indicators") return {} try: indicators = {} # KDJ indicator k, d, j = TechnicalAnalyzer.calculate_kdj(df) if k is not None: indicators['kdj'] = (k, d, j) # RSI indicator (multiple time periods) rsi_14 = TechnicalAnalyzer.calculate_rsi(df['close'], 14) rsi_7 = TechnicalAnalyzer.calculate_rsi(df['close'], 7) rsi_21 = TechnicalAnalyzer.calculate_rsi(df['close'], 21) if rsi_14 is not None: indicators['rsi'] = rsi_14 indicators['rsi_7'] = rsi_7 indicators['rsi_21'] = rsi_21 # ATR indicator atr = TechnicalAnalyzer.calculate_atr(df) if atr is not None: indicators['atr'] = atr # MACD indicator macd_line, signal_line, histogram = TechnicalAnalyzer.calculate_macd(df['close']) if macd_line is not None: indicators['macd'] = (macd_line, signal_line, histogram) # Bollinger Bands (multiple standard deviations) upper1, middle1, lower1 = TechnicalAnalyzer.calculate_bollinger_bands(df['close'], 20, 1) upper2, middle2, lower2 = TechnicalAnalyzer.calculate_bollinger_bands(df['close'], 20, 2) if upper1 is not None: indicators['bollinger_1std'] = (upper1, middle1, lower1) indicators['bollinger_2std'] = (upper2, middle2, lower2) # Moving Averages (multiple periods) sma_20 = TechnicalAnalyzer.calculate_sma(df['close'], 20) sma_50 = TechnicalAnalyzer.calculate_sma(df['close'], 50) sma_100 = TechnicalAnalyzer.calculate_sma(df['close'], 100) ema_12 = TechnicalAnalyzer.calculate_ema(df['close'], 12) ema_26 = TechnicalAnalyzer.calculate_ema(df['close'], 26) if sma_20 is not None: indicators['sma_20'] = sma_20 indicators['sma_50'] = sma_50 indicators['sma_100'] = sma_100 indicators['ema_12'] = ema_12 indicators['ema_26'] = ema_26 # Stochastic Oscillator k, d = TechnicalAnalyzer.calculate_stochastic(df) if k is not None: indicators['stochastic'] = (k, d) # Trend strength indicators['trend_strength'] = TechnicalAnalyzer.calculate_trend_strength(df) # Volatility indicators['volatility'] = TechnicalAnalyzer.calculate_volatility(df) # Current price indicators['current_price'] = df['close'].iloc[-1] return indicators except Exception as e: logger.error(f"Error calculating technical indicators: {e}") return {} @staticmethod def calculate_ema(prices, period): """Calculate Exponential Moving Average""" try: return prices.ewm(span=period, adjust=False).mean() except Exception as e: logger.error(f"Error calculating EMA: {e}") return None @staticmethod def calculate_trend_strength(df, period=20): """Calculate trend strength""" try: if len(df) < period: return 0 # Use linear regression to calculate trend strength x = np.arange(len(df)) y = df['close'].values # Linear regression slope, intercept = np.polyfit(x[-period:], y[-period:], 1) # Calculate R² value as trend strength y_pred = slope * x[-period:] + intercept ss_res = np.sum((y[-period:] - y_pred) ** 2) ss_tot = np.sum((y[-period:] - np.mean(y[-period:])) ** 2) r_squared = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0 return abs(r_squared) # Take absolute value, trend strength doesn't distinguish positive/negative except Exception as e: logger.error(f"Error calculating trend strength: {e}") return 0 @staticmethod def detect_rsi_divergence(df, rsi_period=14): """Detect RSI bullish divergence""" try: if len(df) < 30: # Need sufficient data return False # Calculate RSI rsi = TechnicalAnalyzer.calculate_rsi(df['close'], rsi_period) if rsi is None: return False # Use pandas methods to simplify calculation # Find lowest points in recent 10 periods recent_lows = df['low'].tail(10) recent_rsi = rsi.tail(10) # Find price lowest point and RSI lowest point min_price_idx = recent_lows.idxmin() min_rsi_idx = recent_rsi.idxmin() # If price lowest point appears later than RSI lowest point, possible bullish divergence if min_price_idx > min_rsi_idx: # Check if price is making new lows while RSI is rising price_trend = (recent_lows.iloc[-1] < recent_lows.iloc[-5]) rsi_trend = (recent_rsi.iloc[-1] > recent_rsi.iloc[-5]) return price_trend and rsi_trend and recent_rsi.iloc[-1] < 40 return False except Exception as e: logger.error(f"RSI divergence detection error: {e}") return False @staticmethod def detect_macd_divergence(df): """Detect MACD bullish divergence""" try: if len(df) < 30: return False # Calculate MACD, especially focus on histogram _, _, histogram = TechnicalAnalyzer.calculate_macd(df['close']) if histogram is None: return False # Use recent data recent_lows = df['low'].tail(10) recent_hist = histogram.tail(10) # Find price lowest point and MACD histogram lowest point min_price_idx = recent_lows.idxmin() min_hist_idx = recent_hist.idxmin() # If price lowest point appears later than histogram lowest point, possible bullish divergence if min_price_idx > min_hist_idx: # Check price trend and histogram trend price_trend = (recent_lows.iloc[-1] < recent_lows.iloc[-5]) hist_trend = (recent_hist.iloc[-1] > recent_hist.iloc[-5]) # Histogram rising from negative area is stronger signal hist_improving = recent_hist.iloc[-1] > recent_hist.iloc[-3] return price_trend and hist_trend and hist_improving return False except Exception as e: logger.error(f"MACD divergence detection error: {e}") return False @staticmethod def volume_confirmation(df, period=5): """Bottom volume confirmation""" try: if len(df) < period + 5: return False current_volume = df['volume'].iloc[-1] avg_volume = df['volume'].tail(period).mean() # Current volume significantly higher than average volume_ratio = current_volume / avg_volume if avg_volume > 0 else 1 # Price falling but volume increasing (possible bottom accumulation) price_change = (df['close'].iloc[-1] - df['close'].iloc[-2]) / df['close'].iloc[-2] if volume_ratio > 1.5 and price_change < 0: return True return False except Exception as e: logger.error(f"Volume confirmation error: {e}") return False @staticmethod def detect_hammer_pattern(df, lookback=5): """Detect hammer reversal pattern""" try: if len(df) < lookback + 1: return False latest = df.iloc[-1] body_size = abs(latest['close'] - latest['open']) total_range = latest['high'] - latest['low'] # Avoid division by zero if total_range == 0: return False # Hammer characteristics: lower shadow at least 2x body, very short upper shadow lower_shadow = min(latest['open'], latest['close']) - latest['low'] upper_shadow = latest['high'] - max(latest['open'], latest['close']) is_hammer = (lower_shadow >= 2 * body_size and upper_shadow <= body_size * 0.5 and body_size > 0) # Needs to be in downtrend prev_trend = df['close'].iloc[-lookback] > df['close'].iloc[-1] return is_hammer and prev_trend except Exception as e: logger.error(f"Hammer pattern detection error: {e}") return False @staticmethod def detect_reversal_patterns(df): """Detect bottom reversal patterns""" reversal_signals = [] try: # RSI bullish divergence if TechnicalAnalyzer.detect_rsi_divergence(df): reversal_signals.append({ 'type': 'RSI_DIVERGENCE_BULLISH', 'action': 'BUY', 'strength': 'MEDIUM', 'description': 'RSI bullish divergence, momentum divergence bullish' }) # MACD bullish divergence if TechnicalAnalyzer.detect_macd_divergence(df): reversal_signals.append({ 'type': 'MACD_DIVERGENCE_BULLISH', 'action': 'BUY', 'strength': 'MEDIUM', 'description': 'MACD histogram bullish divergence, momentum strengthening' }) # Volume confirmation if TechnicalAnalyzer.volume_confirmation(df): reversal_signals.append({ 'type': 'VOLUME_CONFIRMATION', 'action': 'BUY', 'strength': 'LOW', 'description': 'Bottom volume increase, signs of capital entry' }) # Add hammer pattern detection if TechnicalAnalyzer.detect_hammer_pattern(df): reversal_signals.append({ 'type': 'HAMMER_PATTERN', 'action': 'BUY', 'strength': 'LOW', 'description': 'Hammer pattern, short-term reversal signal' }) except Exception as e: logger.error(f"Reversal pattern detection error: {e}") return reversal_signals @staticmethod def calculate_support_resistance(df, period=20): """Calculate support resistance levels (fixed version)""" try: # Recent high resistance resistance = df['high'].rolling(window=period).max().iloc[-1] # Recent low support support = df['low'].rolling(window=period).min().iloc[-1] # Dynamic support resistance (based on Bollinger Bands) # Fix: correctly receive three return values from Bollinger Bands upper_bb, middle_bb, lower_bb = TechnicalAnalyzer.calculate_bollinger_bands(df['close'], 20, 2) # Check if Bollinger Band calculation successful if upper_bb is not None and lower_bb is not None: resistance_bb = upper_bb.iloc[-1] if hasattr(upper_bb, 'iloc') else upper_bb support_bb = lower_bb.iloc[-1] if hasattr(lower_bb, 'iloc') else lower_bb else: # If Bollinger Band calculation fails, use static values as backup resistance_bb = resistance support_bb = support current_price = df['close'].iloc[-1] return { 'static_resistance': resistance, 'static_support': support, 'dynamic_resistance': resistance_bb, 'dynamic_support': support_bb, 'current_vs_resistance': (current_price - resistance) / resistance * 100 if resistance > 0 else 0, 'current_vs_support': (current_price - support) / support * 100 if support > 0 else 0 } except Exception as e: logger.error(f"Error calculating support resistance: {e}") # Return default values to avoid subsequent errors current_price = df['close'].iloc[-1] if len(df) > 0 else 0 return { 'static_resistance': current_price * 1.1 if current_price > 0 else 0, 'static_support': current_price * 0.9 if current_price > 0 else 0, 'dynamic_resistance': current_price * 1.05 if current_price > 0 else 0, 'dynamic_support': current_price * 0.95 if current_price > 0 else 0, 'current_vs_resistance': 0, 'current_vs_support': 0 } @staticmethod def calculate_volatility(df, period=20): """Calculate volatility""" try: returns = df['close'].pct_change().dropna() volatility = returns.rolling(window=period).std() * np.sqrt(365) # Annualized volatility return volatility.iloc[-1] if len(volatility) > 0 else 0 except Exception as e: logger.error(f"Error calculating volatility: {e}") return 0 @staticmethod def generate_weighted_signals(df): """Generate weighted technical signals""" signals = [] weights = { 'KDJ_GOLDEN_CROSS': 0.15, 'MACD_GOLDEN_CROSS': 0.20, 'MA_GOLDEN_CROSS': 0.25, 'RSI_OVERSOLD': 0.15, 'BOLLINGER_LOWER_TOUCH': 0.15, 'STOCH_GOLDEN_CROSS': 0.10 } # Calculate various indicator signals technical_signals = TechnicalAnalyzer.generate_signals(df) # Calculate weighted score total_score = 0 signal_count = 0 for signal in technical_signals: weight = weights.get(signal['type'], 0.05) strength_multiplier = 1.0 if signal['strength'] == 'STRONG' else 0.7 total_score += weight * strength_multiplier signal_count += 1 # Normalize score normalized_score = min(1.0, total_score) # Determine action if normalized_score > 0.6: action = 'BUY' elif normalized_score < 0.3: action = 'SELL' else: action = 'HOLD' return { 'score': normalized_score, 'action': action, 'signal_count': signal_count, 'signals': technical_signals, 'confidence': 'HIGH' if normalized_score > 0.7 or normalized_score < 0.2 else 'MEDIUM' } @staticmethod def calculate_kdj(df, n=9, m1=3, m2=3): """Calculate KDJ indicator""" try: if len(df) < n: return pd.Series([np.nan] * len(df)), pd.Series([np.nan] * len(df)), pd.Series([np.nan] * len(df)) low_list = df['low'].rolling(window=n, min_periods=1).min() high_list = df['high'].rolling(window=n, min_periods=1).max() # Avoid division by zero error denominator = high_list - low_list denominator = denominator.replace(0, np.nan) rsv = ((df['close'] - low_list) / denominator) * 100 rsv = rsv.fillna(50) k_series = rsv.ewm(span=m1-1, adjust=False).mean() d_series = k_series.ewm(span=m2-1, adjust=False).mean() j_series = 3 * k_series - 2 * d_series return k_series, d_series, j_series except Exception as e: logger.error(f"Error calculating KDJ: {e}") return None, None, None @staticmethod def calculate_rsi(prices, period=14): """Calculate RSI""" try: if len(prices) < period: return pd.Series([np.nan] * len(prices)) delta = prices.diff() gain = (delta.where(delta > 0, 0)).rolling(window=period, min_periods=1).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period, min_periods=1).mean() # Avoid division by zero error rs = gain / loss.replace(0, np.nan) rsi = 100 - (100 / (1 + rs)) return rsi.fillna(50) except Exception as e: logger.error(f"Error calculating RSI: {e}") return None @staticmethod def calculate_atr(df, period=14): """Calculate ATR""" try: if len(df) < period: return pd.Series([np.nan] * len(df)) high_low = df['high'] - df['low'] high_close = np.abs(df['high'] - df['close'].shift()) low_close = np.abs(df['low'] - df['close'].shift()) true_range = np.maximum(high_low, np.maximum(high_close, low_close)) atr = true_range.rolling(window=period, min_periods=1).mean() return atr except Exception as e: logger.error(f"Error calculating ATR: {e}") return None @staticmethod def calculate_macd(prices, fast_period=12, slow_period=26, signal_period=9): """Calculate MACD""" try: ema_fast = prices.ewm(span=fast_period, adjust=False).mean() ema_slow = prices.ewm(span=slow_period, adjust=False).mean() macd_line = ema_fast - ema_slow signal_line = macd_line.ewm(span=signal_period, adjust=False).mean() histogram = macd_line - signal_line return macd_line, signal_line, histogram except Exception as e: logger.error(f"Error calculating MACD: {e}") return None, None, None @staticmethod def calculate_bollinger_bands(prices, period=20, std_dev=2): """Calculate Bollinger Bands""" try: if len(prices) < period: logger.warning(f"Data length {len(prices)} insufficient, unable to calculate {period}-period Bollinger Bands") return None, None, None middle = prices.rolling(window=period).mean() std = prices.rolling(window=period).std() # Handle NaN standard deviation std = std.fillna(0) upper = middle + (std * std_dev) lower = middle - (std * std_dev) return upper, middle, lower except Exception as e: logger.error(f"Error calculating Bollinger Bands: {e}") return None, None, None @staticmethod def calculate_sma(prices, period): """Calculate Simple Moving Average""" try: return prices.rolling(window=period).mean() except Exception as e: logger.error(f"Error calculating SMA: {e}") return None @staticmethod def calculate_stochastic(df, k_period=14, d_period=3): """Calculate Stochastic Oscillator""" try: low_min = df['low'].rolling(window=k_period).min() high_max = df['high'].rolling(window=k_period).max() k = 100 * ((df['close'] - low_min) / (high_max - low_min)) d = k.rolling(window=d_period).mean() return k, d except Exception as e: logger.error(f"Error calculating Stochastic: {e}") return None, None @staticmethod def generate_signals(df): """Generate technical signals""" signals = [] try: # KDJ k, d, j = TechnicalAnalyzer.calculate_kdj(df) if k is not None and len(k) > 1: latest_k, latest_d, latest_j = k.iloc[-1], d.iloc[-1], j.iloc[-1] prev_k, prev_d, prev_j = k.iloc[-2], d.iloc[-2], j.iloc[-2] if prev_k <= prev_d and latest_k > latest_d: signals.append({'type': 'KDJ_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'}) elif prev_k >= prev_d and latest_k < latest_d: signals.append({'type': 'KDJ_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'}) if latest_j < 20: signals.append({'type': 'KDJ_OVERSOLD', 'action': 'BUY', 'strength': 'MEDIUM'}) elif latest_j > 80: signals.append({'type': 'KDJ_OVERBOUGHT', 'action': 'SELL', 'strength': 'MEDIUM'}) # RSI rsi = TechnicalAnalyzer.calculate_rsi(df['close']) if rsi is not None and len(rsi) > 0: latest_rsi = rsi.iloc[-1] if latest_rsi < 30: signals.append({'type': 'RSI_OVERSOLD', 'action': 'BUY', 'strength': 'MEDIUM'}) elif latest_rsi > 70: signals.append({'type': 'RSI_OVERBOUGHT', 'action': 'SELL', 'strength': 'MEDIUM'}) # MACD macd_line, signal_line, _ = TechnicalAnalyzer.calculate_macd(df['close']) if macd_line is not None and len(macd_line) > 1: latest_macd = macd_line.iloc[-1] latest_signal = signal_line.iloc[-1] prev_macd = macd_line.iloc[-2] prev_signal = signal_line.iloc[-2] if prev_macd <= prev_signal and latest_macd > latest_signal: signals.append({'type': 'MACD_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'}) elif prev_macd >= prev_signal and latest_macd < latest_signal: signals.append({'type': 'MACD_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'}) # Bollinger Bands upper, middle, lower = TechnicalAnalyzer.calculate_bollinger_bands(df['close']) if upper is not None: latest_close = df['close'].iloc[-1] if latest_close <= lower.iloc[-1]: signals.append({'type': 'BOLlinger_LOWER_TOUCH', 'action': 'BUY', 'strength': 'MEDIUM'}) elif latest_close >= upper.iloc[-1]: signals.append({'type': 'BOLlinger_UPPER_TOUCH', 'action': 'SELL', 'strength': 'MEDIUM'}) # Moving Averages sma_short = TechnicalAnalyzer.calculate_sma(df['close'], 50) sma_long = TechnicalAnalyzer.calculate_sma(df['close'], 200) if sma_short is not None and sma_long is not None and len(sma_short) > 1: latest_short = sma_short.iloc[-1] latest_long = sma_long.iloc[-1] prev_short = sma_short.iloc[-2] prev_long = sma_long.iloc[-2] if prev_short <= prev_long and latest_short > latest_long: signals.append({'type': 'MA_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'}) elif prev_short >= prev_long and latest_short < latest_long: signals.append({'type': 'MA_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'}) # Stochastic k, d = TechnicalAnalyzer.calculate_stochastic(df) if k is not None and len(k) > 1: latest_k = k.iloc[-1] latest_d = d.iloc[-1] prev_k = k.iloc[-2] prev_d = d.iloc[-2] if latest_k < 20: signals.append({'type': 'STOCH_OVERSOLD', 'action': 'BUY', 'strength': 'MEDIUM'}) elif latest_k > 80: signals.append({'type': 'STOCH_OVERBOUGHT', 'action': 'SELL', 'strength': 'MEDIUM'}) if prev_k <= prev_d and latest_k > latest_d and latest_k < 80: signals.append({'type': 'STOCH_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'}) elif prev_k >= prev_d and latest_k < latest_d and latest_k > 20: signals.append({'type': 'STOCH_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'}) except Exception as e: logger.error(f"Error generating technical signals: {e}") return signals