OKXTrading/deepseekanalyzer.py

524 lines
23 KiB
Python

import os
import logging
import json
import requests
from technicalanalyzer import TechnicalAnalyzer
logger = logging.getLogger(__name__)
class DeepSeekAnalyzer:
"""DeepSeek AI Analyzer (KEN2.0 optimized)"""
def __init__(self):
self.api_key = os.getenv('DEEPSEEK_API_KEY')
self.api_url = 'https://api.deepseek.com/v1/chat/completions'
def analyze_market(self, symbol, market_data, technical_indicators):
"""Use DeepSeek to analyze market (optimized version)"""
if not self.api_key:
logger.warning("DeepSeek API key not set, skipping AI analysis")
return None
try:
# Prepare analysis data
latest = market_data.iloc[-1]
technical_summary = self._prepare_enhanced_technical_summary(technical_indicators, market_data)
prompt = self._create_enhanced_analysis_prompt(symbol, latest, market_data, technical_summary)
response = self._call_deepseek_api(prompt)
return self._parse_deepseek_response(response)
except Exception as e:
logger.error(f"DeepSeek analysis error: {e}")
return None
def _prepare_enhanced_technical_summary(self, indicators, market_data):
"""Prepare enhanced technical indicator summary"""
summary = []
try:
# Price information
current_price = indicators.get('current_price', 0)
summary.append(f"Current price: ${current_price:.4f}")
# Multi-period RSI
rsi_7 = indicators.get('rsi_7', 50)
rsi_14 = indicators.get('rsi', 50)
rsi_21 = indicators.get('rsi_21', 50)
if hasattr(rsi_7, '__len__'):
rsi_7, rsi_14, rsi_21 = rsi_7.iloc[-1], rsi_14.iloc[-1], rsi_21.iloc[-1]
summary.append(f"RSI(7/14/21): {rsi_7:.1f}/{rsi_14:.1f}/{rsi_21:.1f}")
# RSI status analysis
rsi_status = []
for rsi_val, period in [(rsi_7, 7), (rsi_14, 14), (rsi_21, 21)]:
if rsi_val > 70:
rsi_status.append(f"RSI{period} overbought")
elif rsi_val < 30:
rsi_status.append(f"RSI{period} oversold")
if rsi_status:
summary.append(f"RSI status: {', '.join(rsi_status)}")
# KDJ indicator
k, d, j = indicators.get('kdj', (50, 50, 50))
if hasattr(k, '__len__'):
k, d, j = k.iloc[-1], d.iloc[-1], j.iloc[-1]
summary.append(f"KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}")
if j > 80:
summary.append("KDJ status: Overbought area")
elif j < 20:
summary.append("KDJ status: Oversold area")
# MACD indicator
macd_line, signal_line, histogram = indicators.get('macd', (0, 0, 0))
if hasattr(macd_line, '__len__'):
macd_line, signal_line, histogram = macd_line.iloc[-1], signal_line.iloc[-1], histogram.iloc[-1]
summary.append(f"MACD: line={macd_line:.4f}, signal={signal_line:.4f}, histogram={histogram:.4f}")
summary.append(f"MACD status: {'Golden cross' if macd_line > signal_line else 'Death cross'}")
# Moving averages
sma_20 = indicators.get('sma_20', 0)
sma_50 = indicators.get('sma_50', 0)
sma_100 = indicators.get('sma_100', 0)
if hasattr(sma_20, '__len__'):
sma_20, sma_50, sma_100 = sma_20.iloc[-1], sma_50.iloc[-1], sma_100.iloc[-1]
price_vs_ma = []
if current_price > sma_20: price_vs_ma.append("Above 20-day MA")
else: price_vs_ma.append("Below 20-day MA")
if current_price > sma_50: price_vs_ma.append("Above 50-day MA")
else: price_vs_ma.append("Below 50-day MA")
summary.append(f"Moving averages: 20-day=${sma_20:.2f}, 50-day=${sma_50:.2f}, 100-day=${sma_100:.2f}")
summary.append(f"Price position: {', '.join(price_vs_ma)}")
# Bollinger Bands
upper, middle, lower = indicators.get('bollinger_2std', (0, 0, 0))
if hasattr(upper, '__len__'):
upper, middle, lower = upper.iloc[-1], middle.iloc[-1], lower.iloc[-1]
bb_position = (current_price - lower) / (upper - lower) * 100 if (upper - lower) > 0 else 50
summary.append(f"Bollinger Band position: {bb_position:.1f}% (0%=lower band, 100%=upper band)")
# Trend strength
trend_strength = indicators.get('trend_strength', 0)
summary.append(f"Trend strength: {trend_strength:.2f} (0-1, higher means stronger trend)")
# Volatility
volatility = indicators.get('volatility', 0) * 100 # Convert to percentage
summary.append(f"Annualized volatility: {volatility:.1f}%")
# ATR
atr = indicators.get('atr', 0)
if hasattr(atr, '__len__'):
atr = atr.iloc[-1]
atr_percent = (atr / current_price * 100) if current_price > 0 else 0
summary.append(f"ATR: {atr:.4f} ({atr_percent:.2f}%)")
# Support resistance information
sr_levels = TechnicalAnalyzer.calculate_support_resistance(market_data)
if sr_levels:
summary.append(f"Static support: ${sr_levels['static_support']:.4f}, resistance: ${sr_levels['static_resistance']:.4f}")
summary.append(f"Dynamic support: ${sr_levels['dynamic_support']:.4f}, resistance: ${sr_levels['dynamic_resistance']:.4f}")
summary.append(f"Relative resistance: {sr_levels['current_vs_resistance']:+.2f}%, relative support: {sr_levels['current_vs_support']:+.2f}%")
return "\n".join(summary)
except Exception as e:
logger.error(f"Error preparing technical summary: {e}")
return "Technical indicator calculation exception"
def _create_enhanced_analysis_prompt(self, symbol, latest, market_data, technical_summary):
"""Create enhanced analysis prompt"""
# Calculate price changes
price_change_24h = 0
price_change_7d = 0
if len(market_data) >= 24:
price_change_24h = ((latest['close'] - market_data.iloc[-24]['close']) / market_data.iloc[-24]['close'] * 100)
if len(market_data) >= 168: # 7 days data (hourly)
price_change_7d = ((latest['close'] - market_data.iloc[-168]['close']) / market_data.iloc[-168]['close'] * 100)
# Calculate volume changes
volume_trend = "Rising" if latest['volume'] > market_data['volume'].mean() else "Falling"
prompt = f"""
You are a professional cryptocurrency trading AI, conducting autonomous trading in the OKX digital currency market. Please comprehensively analyze the {symbol} trading pair and execute trades.
# Core Objective
**Maximize Sharpe Ratio**
Sharpe Ratio = Average Return / Return Volatility, which means:
Quality trades (high win rate, large profit/loss ratio) → Improve Sharpe
Stable returns, control drawdowns → Improve Sharpe
Frequent trading, small profits and losses → Increase volatility, severely reduce Sharpe
【Market Overview】
- Current price: ${latest['close']:.4f}
- 24-hour change: {price_change_24h:+.2f}%
- 7-day change: {price_change_7d:+.2f}%
- Volume: {latest['volume']:.0f} ({volume_trend})
- Volume relative level: {'High' if latest['volume'] > market_data['volume'].quantile(0.7) else 'Normal' if latest['volume'] > market_data['volume'].quantile(0.3) else 'Low'}
【Multi-dimensional Technical Analysis】
{technical_summary}
【Market Environment Assessment】
Please consider comprehensively:
1. Trend direction and strength
2. Overbought/oversold status
3. Volume coordination
4. Volatility level
5. Support resistance positions
6. Multi-timeframe consistency
【Risk Management Suggestions】
- Suggested position: Adjust based on signal strength and volatility
- Stop-loss setting: Reference ATR and support levels
- Holding time: Based on trend strength
【Output Format Requirements】
Please reply strictly in the following JSON format:
{{
"action": "BUY/SELL/HOLD",
"confidence": "HIGH/MEDIUM/LOW",
"position_size": "Suggested position ratio (0.1-0.5)",
"stop_loss": "Suggested stop-loss price or percentage",
"take_profit": "Suggested take-profit price or percentage",
"timeframe": "Suggested holding time (SHORT/MEDIUM/LONG)",
"reasoning": "Detailed analysis logic and risk explanation"
}}
Please provide objective suggestions based on professional technical analysis and risk management principles.
"""
return prompt
def analyze_sell_proportion(self, symbol, position_info, market_analysis, current_price):
"""Analyze sell proportion"""
if not self.api_key:
logger.warning("DeepSeek API key not set, using default sell proportion")
return self._get_fallback_sell_proportion(position_info, 'MEDIUM')
try:
prompt = self._create_sell_proportion_prompt(symbol, position_info, market_analysis, current_price)
response = self._call_deepseek_api(prompt)
return self._parse_sell_proportion_response(response)
except Exception as e:
logger.error(f"DeepSeek sell proportion analysis error: {e}")
return self._get_fallback_sell_proportion(position_info, 'MEDIUM')
def _create_sell_proportion_prompt(self, symbol, position_info, market_analysis, current_price):
"""Create sell proportion analysis prompt"""
# Calculate profit situation
entry_price = position_info['entry_price']
profit_ratio = (current_price - entry_price) / entry_price if entry_price > 0 else 0
profit_status = "Profit" if profit_ratio > 0 else "Loss"
profit_amount = (current_price - entry_price) * position_info['base_amount']
# Calculate position ratio
position_ratio = position_info['position_ratio'] * 100 # Convert to percentage
# Prepare technical indicator summary
technical_summary = self._prepare_enhanced_technical_summary(
market_analysis['technical_indicators'],
market_analysis['market_data']
)
prompt = f"""
You are a professional cryptocurrency trading AI, conducting autonomous trading in the OKX digital currency market. Analyze the {symbol} position situation and consider and execute sell decisions.
【Current Position Situation】
- Position quantity: {position_info['base_amount']:.8f}
- Average entry price: ${entry_price:.4f}
- Current price: ${current_price:.4f}
- Profit ratio: {profit_ratio:+.2%} ({profit_status})
- Profit amount: ${profit_amount:+.2f}
- Position ratio: {position_ratio:.2f}% (percentage of total portfolio)
- Position value: ${position_info['position_value']:.2f}
【Market Technical Analysis】
{technical_summary}
【Risk Situation】
- Current price distance to stop-loss: {position_info['distance_to_stop_loss']:.2%}
- Current price distance to take-profit: {position_info['distance_to_take_profit']:.2%}
- Market volatility: {position_info['market_volatility']:.2%}
- Trend strength: {position_info.get('trend_strength', 0):.2f}
【Sell Strategy Considerations】
Please consider the following factors to determine sell proportion:
1. Profit ratio: Large profits can consider partial profit-taking, keep some position for higher gains
2. Position ratio: Overweight positions should reduce position to lower risk, light positions can consider holding
3. Technical signals: Strong bearish signals should increase sell proportion, bullish signals can reduce sell proportion
4. Market environment: High volatility markets should be more conservative, low volatility markets can be more aggressive
5. Risk control: Close to stop-loss should sell decisively, far from stop-loss can be more flexible
【Position Management Principles】
- Profit over 20%: Consider partial profit-taking (30-70%), lock in profits
- Profit 10-20%: Decide based on signal strength (20-50%)
- Small profit (0-10%): Mainly based on technical signals (0-30%)
- Small loss (0-5%): Based on risk control (50-80%)
- Large loss (>5%): Consider stop-loss or position reduction (80-100%)
【Output Format Requirements】
Please reply strictly in the following JSON format, only containing numeric ratio (0-1):
{{
"sell_proportion": 0.75
}}
Explanation:
- 0.1 means sell 10% of position
- 0.5 means sell 50% of position
- 1.0 means sell all position
- 0.0 means don't sell (only in extremely bullish situations)
Please provide reasonable sell proportion suggestions based on professional analysis and risk management.
"""
return prompt
def _parse_sell_proportion_response(self, response):
"""Parse sell proportion response"""
try:
content = response['choices'][0]['message']['content']
# Try to parse JSON format
if '{' in content and '}' in content:
json_start = content.find('{')
json_end = content.rfind('}') + 1
json_str = content[json_start:json_end]
parsed = json.loads(json_str)
proportion = parsed.get('sell_proportion', 0.5)
# Ensure proportion is within reasonable range
proportion = max(0.0, min(1.0, proportion))
logger.info(f"DeepSeek suggested sell proportion: {proportion:.1%}")
return proportion
else:
# Text analysis: try to extract proportion from text
import re
patterns = [
r'[\"\"]([\d.]+)%[\"\"]', # "50%"
r'sell\s*([\d.]+)%', # sell 50%
r'([\d.]+)%', # 50%
r'[\"\"]([\d.]+)[\"\"]', # "0.5"
]
for pattern in patterns:
match = re.search(pattern, content)
if match:
value = float(match.group(1))
if value > 1: # If in percentage format
value = value / 100
proportion = max(0.0, min(1.0, value))
logger.info(f"Parsed sell proportion from text: {proportion:.1%}")
return proportion
# Default value
logger.warning("Unable to parse sell proportion, using default 50%")
return 0.5
except Exception as e:
logger.error(f"Error parsing sell proportion response: {e}")
return 0.5
def _get_fallback_sell_proportion(self, position_info, decision_confidence):
"""Fallback sell proportion decision (when DeepSeek is unavailable)"""
profit_ratio = position_info['profit_ratio']
position_ratio = position_info['position_ratio']
market_volatility = position_info.get('market_volatility', 0)
trend_strength = position_info.get('trend_strength', 0)
# Decision based on profit situation
if profit_ratio >= 0.20: # Profit over 20%
base_proportion = 0.6 if decision_confidence == 'HIGH' else 0.4
elif profit_ratio >= 0.10: # Profit 10%-20%
base_proportion = 0.5 if decision_confidence == 'HIGH' else 0.3
elif profit_ratio >= 0: # Profit 0%-10%
base_proportion = 0.4 if decision_confidence == 'HIGH' else 0.2
elif profit_ratio >= -0.05: # Loss 0%-5%
base_proportion = 0.7 if decision_confidence == 'HIGH' else 0.5
else: # Loss over 5%
base_proportion = 0.9 if decision_confidence == 'HIGH' else 0.7
# Adjustment based on position ratio
if position_ratio > 0.15: # Overweight position
base_proportion = min(1.0, base_proportion + 0.2)
elif position_ratio < 0.05: # Very light position
base_proportion = max(0.1, base_proportion - 0.1)
# Adjustment based on market volatility
if market_volatility > 0.05: # High volatility market
base_proportion = min(1.0, base_proportion + 0.1)
elif market_volatility < 0.02: # Low volatility market
base_proportion = max(0.1, base_proportion - 0.1)
# Adjustment based on trend strength
if trend_strength > 0.7: # Strong trend
base_proportion = max(0.1, base_proportion - 0.1)
elif trend_strength < 0.3: # Weak trend
base_proportion = min(1.0, base_proportion + 0.1)
# Ensure proportion is within reasonable range
base_proportion = max(0.1, min(1.0, base_proportion))
logger.info(f"Fallback sell proportion decision: profit={profit_ratio:+.2%}, position ratio={position_ratio:.2%}, "
f"volatility={market_volatility:.2%}, trend strength={trend_strength:.2f}, final proportion={base_proportion:.1%}")
return base_proportion
def _call_deepseek_api(self, prompt, max_retries=3):
"""Call DeepSeek API (improved retry mechanism)"""
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
data = {
'model': 'deepseek-chat',
'messages': [
{
'role': 'system',
'content': 'You are a professional cryptocurrency trading AI, conducting autonomous trading in the OKX digital currency market, pursuing stable and sustainable returns'
},
{
'role': 'user',
'content': prompt
}
],
'temperature': 0.3,
'max_tokens': 1000
}
for attempt in range(max_retries):
try:
response = requests.post(
self.api_url,
headers=headers,
json=data,
timeout=30 # 30 second timeout
)
if response.status_code == 200:
return response.json()
elif response.status_code == 429: # Rate limit
wait_time = 2 ** attempt # Exponential backoff
logger.warning(f"DeepSeek API rate limit, waiting {wait_time} seconds before retry")
time.sleep(wait_time)
continue
else:
logger.error(f"DeepSeek API call failed: {response.status_code} - {response.text}")
if attempt < max_retries - 1:
time.sleep(1)
continue
else:
raise Exception(f"API call failed: {response.status_code}")
except requests.exceptions.Timeout:
logger.warning(f"DeepSeek API request timeout, attempt {attempt + 1}/{max_retries}")
if attempt < max_retries - 1:
time.sleep(1)
continue
else:
raise Exception("DeepSeek API request timeout")
except Exception as e:
logger.error(f"DeepSeek API call exception: {e}")
if attempt < max_retries - 1:
time.sleep(1)
continue
else:
raise
def _parse_deepseek_response(self, response):
"""Parse DeepSeek response - enhanced version"""
try:
content = response['choices'][0]['message']['content'].strip()
logger.debug(f"DeepSeek raw response: {content}")
# Remove possible code block markers
if content.startswith('```json'):
content = content[7:]
if content.endswith('```'):
content = content[:-3]
content = content.strip()
# Try to parse JSON format
if '{' in content and '}' in content:
json_start = content.find('{')
json_end = content.rfind('}') + 1
json_str = content[json_start:json_end]
try:
parsed = json.loads(json_str)
result = {
'action': parsed.get('action', 'HOLD').upper(),
'confidence': parsed.get('confidence', 'MEDIUM').upper(),
'reasoning': parsed.get('reasoning', ''),
'raw_response': content
}
# Validate action and confidence legality
if result['action'] not in ['BUY', 'SELL', 'HOLD']:
logger.warning(f"AI returned illegal action: {result['action']}, using HOLD")
result['action'] = 'HOLD'
if result['confidence'] not in ['HIGH', 'MEDIUM', 'LOW']:
logger.warning(f"AI returned illegal confidence: {result['confidence']}, using MEDIUM")
result['confidence'] = 'MEDIUM'
logger.info(f"AI parsing successful: {result['action']} (confidence: {result['confidence']})")
return result
except json.JSONDecodeError as e:
logger.warning(f"JSON parsing failed: {e}, trying text analysis")
return self._parse_text_response(content)
else:
# Text analysis
return self._parse_text_response(content)
except Exception as e:
logger.error(f"Error parsing DeepSeek response: {e}")
# Return default values to avoid system crash
return {
'action': 'HOLD',
'confidence': 'MEDIUM',
'reasoning': f'Parsing error: {str(e)}',
'raw_response': str(response)
}
def _parse_text_response(self, text):
"""Parse text response - enhanced version"""
text_lower = text.lower()
# More precise action recognition
action = 'HOLD'
if any(word in text_lower for word in ['买入', '做多', 'buy', 'long', '上涨', '看涨']):
action = 'BUY'
elif any(word in text_lower for word in ['卖出', '做空', 'sell', 'short', '下跌', '看跌']):
action = 'SELL'
# More precise confidence level recognition
confidence = 'MEDIUM'
if any(word in text_lower for word in ['强烈', '高信心', 'high', '非常', '强烈建议']):
confidence = 'HIGH'
elif any(word in text_lower for word in ['', '低信心', 'low', '轻微', '谨慎']):
confidence = 'LOW'
logger.info(f"Text parsing result: {action} (confidence: {confidence})")
return {
'action': action,
'confidence': confidence,
'reasoning': text,
'raw_response': text
}