import json
import logging
import math
import os
import re
import time

import requests

from technicalanalyzer import TechnicalAnalyzer

logger = logging.getLogger(__name__)


class DeepSeekAnalyzer:
    """DeepSeek AI Analyzer (KEN2.0 optimized).

    Builds text prompts from market data and technical indicators, sends them
    to the DeepSeek chat-completions endpoint and parses the replies into a
    trading decision (``analyze_market``) or a sell proportion
    (``analyze_sell_proportion``).
    """

    # Regexes used to pull a sell proportion out of free text, tried in order.
    # Each entry is (compiled pattern, whether the captured number is a percentage).
    _SELL_PROPORTION_PATTERNS = (
        (re.compile(r'["\u201c\u201d]([\d.]+)%["\u201c\u201d]'), True),   # "50%"
        (re.compile(r'sell\s*([\d.]+)%', re.IGNORECASE), True),           # sell 50%
        (re.compile(r'([\d.]+)%'), True),                                 # 50%
        (re.compile(r'["\u201c\u201d]([\d.]+)["\u201c\u201d]'), False),   # "0.5"
    )

    def __init__(self):
        """Read the API key from the ``DEEPSEEK_API_KEY`` environment variable."""
        self.api_key = os.getenv('DEEPSEEK_API_KEY')
        self.api_url = 'https://api.deepseek.com/v1/chat/completions'

    def analyze_market(self, symbol, market_data, technical_indicators):
        """Use DeepSeek to analyze market (optimized version).

        Args:
            symbol: Trading pair name, interpolated into the prompt.
            market_data: Object supporting ``.iloc``, ``len()`` and
                ``['close']`` / ``['volume']`` column access
                (presumably a pandas DataFrame of candles - confirm with caller).
            technical_indicators: Mapping of indicator name to value/series.

        Returns:
            A dict with ``action``, ``confidence``, ``reasoning`` and
            ``raw_response`` keys, or ``None`` when no API key is configured
            or any step raises.
        """
        if not self.api_key:
            logger.warning("DeepSeek API key not set, skipping AI analysis")
            return None

        try:
            # Prepare analysis data
            latest = market_data.iloc[-1]
            technical_summary = self._prepare_enhanced_technical_summary(
                technical_indicators, market_data
            )
            prompt = self._create_enhanced_analysis_prompt(
                symbol, latest, market_data, technical_summary
            )
            response = self._call_deepseek_api(prompt)
            return self._parse_deepseek_response(response)
        except Exception as e:
            logger.error(f"DeepSeek analysis error: {e}")
            return None

    @staticmethod
    def _latest_value(value):
        """Return the most recent element of a series-like value, or the value itself.

        Each indicator is reduced independently so that a mix of scalars
        (defaults) and series within one indicator group does not fail.
        """
        if hasattr(value, 'iloc'):
            return value.iloc[-1]
        if hasattr(value, '__len__') and not isinstance(value, (str, bytes, dict)):
            return value[-1]
        return value

    def _prepare_enhanced_technical_summary(self, indicators, market_data):
        """Prepare enhanced technical indicator summary.

        Args:
            indicators: Mapping of indicator name to a scalar, a series, or
                (for ``kdj``, ``macd``, ``bollinger_2std``) a 3-tuple of those.
                Missing indicators fall back to neutral defaults.
            market_data: Passed through to
                ``TechnicalAnalyzer.calculate_support_resistance``.

        Returns:
            A newline-joined summary string, or the fixed text
            ``"Technical indicator calculation exception"`` if anything raises.
        """
        summary = []
        latest_of = self._latest_value

        try:
            # Price information
            current_price = indicators.get('current_price', 0)
            summary.append(f"Current price: ${current_price:.4f}")

            # Multi-period RSI
            rsi_7 = latest_of(indicators.get('rsi_7', 50))
            rsi_14 = latest_of(indicators.get('rsi', 50))
            rsi_21 = latest_of(indicators.get('rsi_21', 50))
            summary.append(f"RSI(7/14/21): {rsi_7:.1f}/{rsi_14:.1f}/{rsi_21:.1f}")

            # RSI status analysis
            rsi_status = []
            for rsi_val, period in ((rsi_7, 7), (rsi_14, 14), (rsi_21, 21)):
                if rsi_val > 70:
                    rsi_status.append(f"RSI{period} overbought")
                elif rsi_val < 30:
                    rsi_status.append(f"RSI{period} oversold")
            if rsi_status:
                summary.append(f"RSI status: {', '.join(rsi_status)}")

            # KDJ indicator
            k, d, j = (latest_of(v) for v in indicators.get('kdj', (50, 50, 50)))
            summary.append(f"KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}")
            if j > 80:
                summary.append("KDJ status: Overbought area")
            elif j < 20:
                summary.append("KDJ status: Oversold area")

            # MACD indicator
            macd_line, signal_line, histogram = (
                latest_of(v) for v in indicators.get('macd', (0, 0, 0))
            )
            summary.append(
                f"MACD: line={macd_line:.4f}, signal={signal_line:.4f}, histogram={histogram:.4f}"
            )
            summary.append(
                f"MACD status: {'Golden cross' if macd_line > signal_line else 'Death cross'}"
            )

            # Moving averages
            sma_20 = latest_of(indicators.get('sma_20', 0))
            sma_50 = latest_of(indicators.get('sma_50', 0))
            sma_100 = latest_of(indicators.get('sma_100', 0))
            price_vs_ma = [
                "Above 20-day MA" if current_price > sma_20 else "Below 20-day MA",
                "Above 50-day MA" if current_price > sma_50 else "Below 50-day MA",
            ]
            summary.append(
                f"Moving averages: 20-day=${sma_20:.2f}, 50-day=${sma_50:.2f}, 100-day=${sma_100:.2f}"
            )
            summary.append(f"Price position: {', '.join(price_vs_ma)}")

            # Bollinger Bands
            upper, middle, lower = (
                latest_of(v) for v in indicators.get('bollinger_2std', (0, 0, 0))
            )
            band_width = upper - lower
            # A zero/negative band width would divide by zero; report the midpoint instead.
            bb_position = (current_price - lower) / band_width * 100 if band_width > 0 else 50
            summary.append(
                f"Bollinger Band position: {bb_position:.1f}% (0%=lower band, 100%=upper band)"
            )

            # Trend strength
            trend_strength = indicators.get('trend_strength', 0)
            summary.append(
                f"Trend strength: {trend_strength:.2f} (0-1, higher means stronger trend)"
            )

            # Volatility
            volatility = indicators.get('volatility', 0) * 100  # Convert to percentage
            summary.append(f"Annualized volatility: {volatility:.1f}%")

            # ATR
            atr = latest_of(indicators.get('atr', 0))
            atr_percent = (atr / current_price * 100) if current_price > 0 else 0
            summary.append(f"ATR: {atr:.4f} ({atr_percent:.2f}%)")

            # Support resistance information
            sr_levels = TechnicalAnalyzer.calculate_support_resistance(market_data)
            if sr_levels:
                summary.append(
                    f"Static support: ${sr_levels['static_support']:.4f}, "
                    f"resistance: ${sr_levels['static_resistance']:.4f}"
                )
                summary.append(
                    f"Dynamic support: ${sr_levels['dynamic_support']:.4f}, "
                    f"resistance: ${sr_levels['dynamic_resistance']:.4f}"
                )
                summary.append(
                    f"Relative resistance: {sr_levels['current_vs_resistance']:+.2f}%, "
                    f"relative support: {sr_levels['current_vs_support']:+.2f}%"
                )

            return "\n".join(summary)

        except Exception as e:
            logger.error(f"Error preparing technical summary: {e}")
            return "Technical indicator calculation exception"

    def _create_enhanced_analysis_prompt(self, symbol, latest, market_data, technical_summary):
        """Create enhanced analysis prompt.

        Args:
            symbol: Trading pair name.
            latest: Most recent row of ``market_data`` (needs ``close`` and ``volume``).
            market_data: Candle data; row offsets 24 and 168 are used for the
                24-hour and 7-day changes (assumes hourly rows - confirm with caller).
            technical_summary: Text produced by
                ``_prepare_enhanced_technical_summary``.

        Returns:
            The prompt string sent as the user message.
        """
        # Calculate price changes
        price_change_24h = 0
        price_change_7d = 0

        if len(market_data) >= 24:
            close_24h_ago = market_data.iloc[-24]['close']
            price_change_24h = (latest['close'] - close_24h_ago) / close_24h_ago * 100

        if len(market_data) >= 168:  # 7 days data (hourly)
            close_7d_ago = market_data.iloc[-168]['close']
            price_change_7d = (latest['close'] - close_7d_ago) / close_7d_ago * 100

        # Calculate volume changes
        volume = market_data['volume']
        volume_trend = "Rising" if latest['volume'] > volume.mean() else "Falling"
        if latest['volume'] > volume.quantile(0.7):
            volume_level = 'High'
        elif latest['volume'] > volume.quantile(0.3):
            volume_level = 'Normal'
        else:
            volume_level = 'Low'

        prompt = f"""
You are a professional cryptocurrency trading AI, conducting autonomous trading in the OKX digital currency market.
Please comprehensively analyze the {symbol} trading pair and execute trades.

# Core Objective
**Maximize Sharpe Ratio**
Sharpe Ratio = Average Return / Return Volatility, which means:
Quality trades (high win rate, large profit/loss ratio) → Improve Sharpe
Stable returns, control drawdowns → Improve Sharpe
Frequent trading, small profits and losses → Increase volatility, severely reduce Sharpe

【Market Overview】
- Current price: ${latest['close']:.4f}
- 24-hour change: {price_change_24h:+.2f}%
- 7-day change: {price_change_7d:+.2f}%
- Volume: {latest['volume']:.0f} ({volume_trend})
- Volume relative level: {volume_level}

【Multi-dimensional Technical Analysis】
{technical_summary}

【Market Environment Assessment】
Please consider comprehensively:
1. Trend direction and strength
2. Overbought/oversold status
3. Volume coordination
4. Volatility level
5. Support resistance positions
6. Multi-timeframe consistency

【Risk Management Suggestions】
- Suggested position: Adjust based on signal strength and volatility
- Stop-loss setting: Reference ATR and support levels
- Holding time: Based on trend strength

【Output Format Requirements】
Please reply strictly in the following JSON format:
{{
    "action": "BUY/SELL/HOLD",
    "confidence": "HIGH/MEDIUM/LOW",
    "position_size": "Suggested position ratio (0.1-0.5)",
    "stop_loss": "Suggested stop-loss price or percentage",
    "take_profit": "Suggested take-profit price or percentage",
    "timeframe": "Suggested holding time (SHORT/MEDIUM/LONG)",
    "reasoning": "Detailed analysis logic and risk explanation"
}}

Please provide objective suggestions based on professional technical analysis and risk management principles.
"""
        return prompt

    def analyze_sell_proportion(self, symbol, position_info, market_analysis, current_price):
        """Analyze sell proportion.

        Args:
            symbol: Trading pair name.
            position_info: Mapping describing the open position (see
                ``_create_sell_proportion_prompt`` for the keys read).
            market_analysis: Mapping with ``technical_indicators`` and
                ``market_data`` entries.
            current_price: Current market price of the asset.

        Returns:
            A float in ``[0.0, 1.0]``. When the API key is missing or the
            request fails, the rule-based fallback is used with ``'MEDIUM'``
            confidence.
        """
        if not self.api_key:
            logger.warning("DeepSeek API key not set, using default sell proportion")
            return self._get_fallback_sell_proportion(
                position_info, 'MEDIUM', current_price=current_price
            )

        try:
            prompt = self._create_sell_proportion_prompt(
                symbol, position_info, market_analysis, current_price
            )
            response = self._call_deepseek_api(prompt)
            return self._parse_sell_proportion_response(response)
        except Exception as e:
            logger.error(f"DeepSeek sell proportion analysis error: {e}")
            return self._get_fallback_sell_proportion(
                position_info, 'MEDIUM', current_price=current_price
            )

    def _create_sell_proportion_prompt(self, symbol, position_info, market_analysis, current_price):
        """Create sell proportion analysis prompt.

        Reads ``entry_price``, ``base_amount``, ``position_ratio``,
        ``position_value``, ``distance_to_stop_loss``,
        ``distance_to_take_profit`` and ``market_volatility`` from
        ``position_info`` (``trend_strength`` is optional), and
        ``technical_indicators`` / ``market_data`` from ``market_analysis``.

        Raises:
            KeyError: If a required key is missing.
        """
        # Calculate profit situation
        entry_price = position_info['entry_price']
        profit_ratio = (current_price - entry_price) / entry_price if entry_price > 0 else 0
        profit_status = "Profit" if profit_ratio > 0 else "Loss"
        profit_amount = (current_price - entry_price) * position_info['base_amount']

        # Calculate position ratio
        position_ratio = position_info['position_ratio'] * 100  # Convert to percentage

        # Prepare technical indicator summary
        technical_summary = self._prepare_enhanced_technical_summary(
            market_analysis['technical_indicators'],
            market_analysis['market_data'],
        )

        prompt = f"""
You are a professional cryptocurrency trading AI, conducting autonomous trading in the OKX digital currency market.
Analyze the {symbol} position situation and consider and execute sell decisions.

【Current Position Situation】
- Position quantity: {position_info['base_amount']:.8f}
- Average entry price: ${entry_price:.4f}
- Current price: ${current_price:.4f}
- Profit ratio: {profit_ratio:+.2%} ({profit_status})
- Profit amount: ${profit_amount:+.2f}
- Position ratio: {position_ratio:.2f}% (percentage of total portfolio)
- Position value: ${position_info['position_value']:.2f}

【Market Technical Analysis】
{technical_summary}

【Risk Situation】
- Current price distance to stop-loss: {position_info['distance_to_stop_loss']:.2%}
- Current price distance to take-profit: {position_info['distance_to_take_profit']:.2%}
- Market volatility: {position_info['market_volatility']:.2%}
- Trend strength: {position_info.get('trend_strength', 0):.2f}

【Sell Strategy Considerations】
Please consider the following factors to determine sell proportion:
1. Profit ratio: Large profits can consider partial profit-taking, keep some position for higher gains
2. Position ratio: Overweight positions should reduce position to lower risk, light positions can consider holding
3. Technical signals: Strong bearish signals should increase sell proportion, bullish signals can reduce sell proportion
4. Market environment: High volatility markets should be more conservative, low volatility markets can be more aggressive
5. Risk control: Close to stop-loss should sell decisively, far from stop-loss can be more flexible

【Position Management Principles】
- Profit over 20%: Consider partial profit-taking (30-70%), lock in profits
- Profit 10-20%: Decide based on signal strength (20-50%)
- Small profit (0-10%): Mainly based on technical signals (0-30%)
- Small loss (0-5%): Based on risk control (50-80%)
- Large loss (>5%): Consider stop-loss or position reduction (80-100%)

【Output Format Requirements】
Please reply strictly in the following JSON format, only containing numeric ratio (0-1):
{{
    "sell_proportion": 0.75
}}

Explanation:
- 0.1 means sell 10% of position
- 0.5 means sell 50% of position
- 1.0 means sell all position
- 0.0 means don't sell (only in extremely bullish situations)

Please provide reasonable sell proportion suggestions based on professional analysis and risk management.
"""
        return prompt

    @staticmethod
    def _coerce_proportion(value, is_percent=False):
        """Convert a model-supplied value to a proportion in ``[0.0, 1.0]``.

        Accepts numbers and numeric strings (optionally ending in ``%``).
        A value is treated as a percentage when ``is_percent`` is set, when
        it carries a ``%`` suffix, or when it is greater than 1 (the prompt
        asks for 0-1, so e.g. ``75`` is read as 75%).

        Returns:
            The clamped proportion, or ``None`` if the value is not a finite number.
        """
        if isinstance(value, bool):
            return None
        if isinstance(value, str):
            text = value.strip()
            if text.endswith('%'):
                text = text[:-1].strip()
                is_percent = True
            try:
                number = float(text)
            except ValueError:
                return None
        elif isinstance(value, (int, float)):
            number = float(value)
        else:
            return None

        if not math.isfinite(number):
            return None
        if is_percent or number > 1:
            number /= 100
        return max(0.0, min(1.0, number))

    def _extract_json_sell_proportion(self, content):
        """Return the proportion from an embedded JSON object, or ``None`` if unusable."""
        json_start = content.find('{')
        json_end = content.rfind('}') + 1
        if json_start < 0 or json_end <= json_start:
            return None
        try:
            parsed = json.loads(content[json_start:json_end])
        except json.JSONDecodeError:
            return None
        if not isinstance(parsed, dict):
            return None
        # A JSON reply without the key keeps the neutral 50% default.
        return self._coerce_proportion(parsed.get('sell_proportion', 0.5))

    def _extract_text_sell_proportion(self, content):
        """Return the first proportion found in free text, or ``None``."""
        for pattern, is_percent in self._SELL_PROPORTION_PATTERNS:
            match = pattern.search(content)
            if not match:
                continue
            proportion = self._coerce_proportion(match.group(1), is_percent=is_percent)
            if proportion is not None:
                return proportion
        return None

    def _parse_sell_proportion_response(self, response):
        """Parse sell proportion response.

        Tries the JSON object embedded in the reply first, then falls back to
        scanning the text for a percentage or quoted number.

        Returns:
            A float in ``[0.0, 1.0]``; ``0.5`` when nothing can be parsed or
            the response does not have the expected structure.
        """
        try:
            content = response['choices'][0]['message']['content']

            proportion = self._extract_json_sell_proportion(content)
            if proportion is not None:
                logger.info(f"DeepSeek suggested sell proportion: {proportion:.1%}")
                return proportion

            # Text analysis: try to extract proportion from text
            proportion = self._extract_text_sell_proportion(content)
            if proportion is not None:
                logger.info(f"Parsed sell proportion from text: {proportion:.1%}")
                return proportion

            # Default value
            logger.warning("Unable to parse sell proportion, using default 50%")
            return 0.5

        except Exception as e:
            logger.error(f"Error parsing sell proportion response: {e}")
            return 0.5

    def _get_fallback_sell_proportion(self, position_info, decision_confidence, current_price=None):
        """Fallback sell proportion decision (when DeepSeek is unavailable).

        Args:
            position_info: Mapping with ``position_ratio`` (required) and
                optionally ``profit_ratio``, ``entry_price``,
                ``market_volatility`` and ``trend_strength``.
            decision_confidence: ``'HIGH'`` selects the larger base
                proportion in each profit band; any other value the smaller.
            current_price: Optional current price, used only to derive the
                profit ratio from ``entry_price`` when ``profit_ratio`` is
                not present in ``position_info``.

        Returns:
            A float in ``[0.1, 1.0]``.
        """
        profit_ratio = position_info.get('profit_ratio')
        if profit_ratio is None:
            # Derive the ratio the same way the prompt builder does; if that
            # is impossible, treat the position as break-even.
            entry_price = position_info.get('entry_price') or 0
            if current_price is not None and entry_price > 0:
                profit_ratio = (current_price - entry_price) / entry_price
            else:
                profit_ratio = 0.0

        position_ratio = position_info['position_ratio']
        market_volatility = position_info.get('market_volatility', 0)
        trend_strength = position_info.get('trend_strength', 0)
        high_confidence = decision_confidence == 'HIGH'

        # Decision based on profit situation
        if profit_ratio >= 0.20:  # Profit over 20%
            base_proportion = 0.6 if high_confidence else 0.4
        elif profit_ratio >= 0.10:  # Profit 10%-20%
            base_proportion = 0.5 if high_confidence else 0.3
        elif profit_ratio >= 0:  # Profit 0%-10%
            base_proportion = 0.4 if high_confidence else 0.2
        elif profit_ratio >= -0.05:  # Loss 0%-5%
            base_proportion = 0.7 if high_confidence else 0.5
        else:  # Loss over 5%
            base_proportion = 0.9 if high_confidence else 0.7

        # Adjustment based on position ratio
        if position_ratio > 0.15:  # Overweight position
            base_proportion = min(1.0, base_proportion + 0.2)
        elif position_ratio < 0.05:  # Very light position
            base_proportion = max(0.1, base_proportion - 0.1)

        # Adjustment based on market volatility
        if market_volatility > 0.05:  # High volatility market
            base_proportion = min(1.0, base_proportion + 0.1)
        elif market_volatility < 0.02:  # Low volatility market
            base_proportion = max(0.1, base_proportion - 0.1)

        # Adjustment based on trend strength
        if trend_strength > 0.7:  # Strong trend
            base_proportion = max(0.1, base_proportion - 0.1)
        elif trend_strength < 0.3:  # Weak trend
            base_proportion = min(1.0, base_proportion + 0.1)

        # Ensure proportion is within reasonable range
        base_proportion = max(0.1, min(1.0, base_proportion))

        logger.info(
            f"Fallback sell proportion decision: profit={profit_ratio:+.2%}, position ratio={position_ratio:.2%}, "
            f"volatility={market_volatility:.2%}, trend strength={trend_strength:.2f}, final proportion={base_proportion:.1%}"
        )

        return base_proportion

    def _call_deepseek_api(self, prompt, max_retries=3):
        """Call DeepSeek API (improved retry mechanism).

        Args:
            prompt: User message content.
            max_retries: Maximum number of attempts.

        Returns:
            The decoded JSON body of the first HTTP 200 response.

        Raises:
            Exception: When every attempt fails (timeout, non-200 status
                including repeated 429 rate limiting, or a request/decoding
                error on the final attempt). The function never returns
                ``None``.
        """
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json',
        }

        data = {
            'model': 'deepseek-chat',
            'messages': [
                {
                    'role': 'system',
                    'content': (
                        'You are a professional cryptocurrency trading AI, '
                        'conducting autonomous trading in the OKX digital currency market, '
                        'pursuing stable and sustainable returns'
                    ),
                },
                {
                    'role': 'user',
                    'content': prompt,
                },
            ],
            'temperature': 0.3,
            'max_tokens': 1000,
        }

        failure = "API call failed: no attempts were made"

        for attempt in range(max_retries):
            is_last_attempt = attempt >= max_retries - 1

            try:
                response = requests.post(
                    self.api_url,
                    headers=headers,
                    json=data,
                    timeout=30,  # 30 second timeout
                )
                if response.status_code == 200:
                    return response.json()
            except requests.exceptions.Timeout as e:
                logger.warning(
                    f"DeepSeek API request timeout, attempt {attempt + 1}/{max_retries}"
                )
                if is_last_attempt:
                    raise Exception("DeepSeek API request timeout") from e
                time.sleep(1)
                continue
            except Exception as e:
                logger.error(f"DeepSeek API call exception: {e}")
                if is_last_attempt:
                    raise
                time.sleep(1)
                continue

            if response.status_code == 429:  # Rate limit
                failure = "API call failed: 429 (rate limit, retries exhausted)"
                if is_last_attempt:
                    logger.warning("DeepSeek API rate limit, no retries left")
                else:
                    wait_time = 2 ** attempt  # Exponential backoff
                    logger.warning(
                        f"DeepSeek API rate limit, waiting {wait_time} seconds before retry"
                    )
                    time.sleep(wait_time)
            else:
                logger.error(
                    f"DeepSeek API call failed: {response.status_code} - {response.text}"
                )
                failure = f"API call failed: {response.status_code}"
                if not is_last_attempt:
                    time.sleep(1)

        # Every attempt ended in a non-200 status: surface it instead of
        # silently returning None to the parsers.
        raise Exception(failure)

    def _parse_deepseek_response(self, response):
        """Parse DeepSeek response - enhanced version.

        Returns:
            A dict with ``action`` (BUY/SELL/HOLD), ``confidence``
            (HIGH/MEDIUM/LOW), ``reasoning`` and ``raw_response``. Invalid
            values are replaced by HOLD / MEDIUM; a malformed response yields
            a HOLD / MEDIUM result describing the parsing error.
        """
        try:
            content = response['choices'][0]['message']['content'].strip()
            logger.debug(f"DeepSeek raw response: {content}")

            # Remove possible code block markers
            if content.startswith('```json'):
                content = content[7:]
            if content.endswith('```'):
                content = content[:-3]
            content = content.strip()

            # Without a JSON object in the reply, fall back to text analysis
            if '{' not in content or '}' not in content:
                return self._parse_text_response(content)

            json_str = content[content.find('{'):content.rfind('}') + 1]
            try:
                parsed = json.loads(json_str)
            except json.JSONDecodeError as e:
                logger.warning(f"JSON parsing failed: {e}, trying text analysis")
                return self._parse_text_response(content)

            # str()/or guard against null or non-string values from the model
            result = {
                'action': str(parsed.get('action') or 'HOLD').strip().upper(),
                'confidence': str(parsed.get('confidence') or 'MEDIUM').strip().upper(),
                'reasoning': parsed.get('reasoning', ''),
                'raw_response': content,
            }

            # Validate action and confidence legality
            if result['action'] not in ('BUY', 'SELL', 'HOLD'):
                logger.warning(f"AI returned illegal action: {result['action']}, using HOLD")
                result['action'] = 'HOLD'

            if result['confidence'] not in ('HIGH', 'MEDIUM', 'LOW'):
                logger.warning(
                    f"AI returned illegal confidence: {result['confidence']}, using MEDIUM"
                )
                result['confidence'] = 'MEDIUM'

            logger.info(
                f"AI parsing successful: {result['action']} (confidence: {result['confidence']})"
            )
            return result

        except Exception as e:
            logger.error(f"Error parsing DeepSeek response: {e}")
            # Return default values to avoid system crash
            return {
                'action': 'HOLD',
                'confidence': 'MEDIUM',
                'reasoning': f'Parsing error: {str(e)}',
                'raw_response': str(response),
            }

    def _parse_text_response(self, text):
        """Parse text response - enhanced version.

        Keyword heuristic over the lower-cased text: buy-side keywords win
        over sell-side ones, and high-confidence keywords win over
        low-confidence ones. Matching is by substring.

        NOTE(review): substring matching means e.g. "below" contains "low"
        and "long-term" contains "long"; tighten if this path is relied upon.
        """
        text_lower = text.lower()

        # More precise action recognition
        action = 'HOLD'
        if any(word in text_lower for word in ['买入', '做多', 'buy', 'long', '上涨', '看涨']):
            action = 'BUY'
        elif any(word in text_lower for word in ['卖出', '做空', 'sell', 'short', '下跌', '看跌']):
            action = 'SELL'

        # More precise confidence level recognition
        confidence = 'MEDIUM'
        if any(word in text_lower for word in ['强烈', '高信心', 'high', '非常', '强烈建议']):
            confidence = 'HIGH'
        elif any(word in text_lower for word in ['低', '低信心', 'low', '轻微', '谨慎']):
            confidence = 'LOW'

        logger.info(f"Text parsing result: {action} (confidence: {confidence})")

        return {
            'action': action,
            'confidence': confidence,
            'reasoning': text,
            'raw_response': text,
        }