import os import time import threading import logging import pandas as pd import numpy as np import requests import json import hashlib import hmac import base64 import schedule import okx.Account as Account import okx.MarketData as MarketData import okx.Trade as Trade from datetime import datetime, timezone, timedelta from dotenv import load_dotenv from logging.handlers import RotatingFileHandler from database_manager import DatabaseManager import atexit import signal tdmode = 'cross' # Load environment variables load_dotenv() def setup_logging(): """Configure logging system""" # Set environment variable to ensure UTF-8 encoding os.environ['PYTHONIOENCODING'] = 'utf-8' # Create formatter formatter = logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # File handler - using UTF-8 encoding file_handler = RotatingFileHandler( 'trading_system.log', maxBytes=10 * 1024 * 1024, backupCount=5, encoding='utf-8' ) file_handler.setFormatter(formatter) file_handler.setLevel(logging.INFO) # Console handler - handle encoding issues class SafeStreamHandler(logging.StreamHandler): def emit(self, record): try: msg = self.format(record) stream = self.stream if hasattr(stream, 'buffer'): stream.buffer.write(msg.encode('utf-8') + b'\n') stream.buffer.flush() else: stream.write(msg + self.terminator) self.flush() except UnicodeEncodeError: safe_msg = msg.encode('ascii', 'ignore').decode('ascii') stream.write(safe_msg + self.terminator) self.flush() except Exception: self.handleError(record) console_handler = SafeStreamHandler() console_handler.setFormatter(formatter) console_handler.setLevel(logging.INFO) # Configure root logger logging.basicConfig( level=logging.INFO, handlers=[file_handler, console_handler] ) return logging.getLogger(__name__) logger = setup_logging() class APIRateLimiter: """API rate limiter""" def __init__(self, calls_per_second=2): self.min_interval = 1.0 / calls_per_second self.last_call = {} self.lock = threading.Lock() def wait(self, api_name): """Wait until API can be called""" with self.lock: current_time = time.time() if api_name in self.last_call: elapsed = current_time - self.last_call[api_name] if elapsed < self.min_interval: time.sleep(self.min_interval - elapsed) self.last_call[api_name] = current_time class OKXAPIClient: """OKX API Client (using official SDK)""" def __init__(self): self.api_key = os.getenv('OKX_API_KEY') self.secret_key = os.getenv('OKX_SECRET_KEY') self.password = os.getenv('OKX_PASSWORD') if not all([self.api_key, self.secret_key, self.password]): raise ValueError("Please set OKX API key and password") # Initialize OKX SDK client - using live trading environment Flag = "1" # Live trading environment self.account_api = Account.AccountAPI(self.api_key, self.secret_key, self.password, False, Flag) self.market_api = MarketData.MarketAPI(self.api_key, self.secret_key, self.password, False, Flag) self.trade_api = Trade.TradeAPI(self.api_key, self.secret_key, self.password, False, Flag) # API rate limiting self.rate_limiter = APIRateLimiter(2) self.log_http = False # Cache instrument info self.instrument_cache = {} def get_market_data(self, symbol, timeframe='1H', limit=200): """Get market data""" self.rate_limiter.wait("market_data") try: result = self.market_api.get_candlesticks( instId=symbol, bar=timeframe, limit=str(limit) ) if self.log_http: logger.debug(f"HTTP Request: GET {symbol} {timeframe} {result.get('code', 'Unknown')}") # Error checking if 'code' not in result or result['code'] != '0': error_msg = result.get('msg', 'Unknown error') error_code = result.get('code', 'Unknown code') logger.error(f"Failed to get {symbol} market data: {error_msg} (code: {error_code})") return None # Check if data exists if 'data' not in result or not result['data']: logger.warning(f"{symbol} market data is empty") return None # Create DataFrame columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCcyQuote', 'confirm'] df = pd.DataFrame(result['data'], columns=columns) # Convert data types df['timestamp'] = pd.to_datetime(df['timestamp'].astype(np.int64), unit='ms') numeric_cols = ['open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCcyQuote'] df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric) return df.sort_values('timestamp') except Exception as e: logger.error(f"Error getting market data: {e}") return None def get_account_balance(self): """Get account balance (USDT)""" self.rate_limiter.wait("balance") try: result = self.account_api.get_account_balance() if self.log_http: logger.debug(f"HTTP Request: GET balance {result['code']}") # Correction: code "0" indicates success if result['code'] != '0': logger.error(f"Failed to get balance: {result['msg']} (code: {result['code']})") return None # Extract USDT balance for currency in result['data'][0]['details']: if currency['ccy'] == 'USDT': return float(currency['availBal']) return 0.0 except Exception as e: logger.error(f"Error getting balance: {e}") return None def get_currency_balances(self): """Get all currency balances""" self.rate_limiter.wait("balances") try: result = self.account_api.get_account_balance() # Correction: code "0" indicates success if result['code'] != '0': logger.error(f"Failed to get balance: {result['msg']} (code: {result['code']})") return {} # Check response data structure if not result['data'] or len(result['data']) == 0: logger.error("No balance data in API response") return {} # Check if details field exists if 'details' not in result['data'][0]: logger.error("No details field in API response") return {} balances = {} for currency in result['data'][0]['details']: if float(currency.get('availBal', 0)) > 0: balances[currency['ccy']] = { 'amount': float(currency.get('availBal', 0)), 'frozen': float(currency.get('frozenBal', 0)) } return balances except Exception as e: logger.error(f"Error getting currency balances: {e}") return {} def get_positions(self): """Get exchange position information (based on currency balances)""" try: # Get all currency balances balances = self.get_currency_balances() if not balances: return {} # Filter out non-USDT currencies as positions positions = {} for currency, balance in balances.items(): if currency != 'USDT' and balance['amount'] > 0: # Construct trading pair symbol symbol = f"{currency}-USDT" # Get current price to calculate position value current_price = self.get_current_price(symbol) if current_price: positions[symbol] = { 'amount': balance['amount'], 'value': balance['amount'] * current_price, 'avg_price': 0.0 # Spot positions don't have average price concept } else: positions[symbol] = { 'amount': balance['amount'], 'value': 0.0, 'avg_price': 0.0 } return positions except Exception as e: logger.error(f"Error getting positions: {e}") return {} def get_current_price(self, symbol): """Get current price""" self.rate_limiter.wait("price") try: result = self.market_api.get_ticker(instId=symbol) # Correction: code "0" indicates success if result['code'] != '0': logger.error(f"Failed to get price: {result['msg']} (code: {result['code']})") return None return float(result['data'][0]['last']) except Exception as e: logger.error(f"Error getting price: {e}") return None def get_instrument_info(self, symbol): """Get trading pair information""" if symbol in self.instrument_cache: return self.instrument_cache[symbol] self.rate_limiter.wait("instrument") try: result = self.account_api.get_instruments(instType='SPOT') if result['code'] != '0': logger.error(f"Failed to get instrument: {result['msg']} (code: {result['code']})") return None, None # Find specified trading pair for inst in result['data']: if inst['instId'] == symbol: min_sz = float(inst['minSz']) lot_sz = float(inst['lotSz']) logger.debug(f"Got {symbol} precision: minSz={min_sz}, lotSz={lot_sz}") self.instrument_cache[symbol] = (min_sz, lot_sz) return min_sz, lot_sz logger.error(f"Trading pair not found: {symbol}") return None, None except Exception as e: logger.error(f"Error getting instrument info: {e}") return None, None def get_default_min_size(self, symbol): """Get default minimum order size""" # Set default minimum order size based on currency defaults = { 'BTC-USDT': 0.0001, 'ETH-USDT': 0.001, 'SOL-USDT': 0.01, 'XRP-USDT': 1.0 } return defaults.get(symbol, 0.01) # Default 0.01 def create_order(self, symbol, side, amount, retries=3): """Create order""" for attempt in range(retries): try: self.rate_limiter.wait("order") # Parse trading pair symbol parts = symbol.split('-') if len(parts) != 2: logger.error(f"Invalid trading pair format: {symbol}") return None base_currency, quote_currency = parts # Adjust parameters based on buy/sell direction if side == 'buy': # When buying, amount is quote currency amount (USDT amount) # Use amount-based order placement order_params = { 'instId': symbol, 'tdMode': tdmode, 'side': 'buy', 'ordType': 'market', 'sz': str(amount), # Quote currency amount 'tgtCcy': 'quote_ccy' # Specify sz as quote currency } logger.info(f"[{symbol}] Create buy order: amount={amount:.2f} {quote_currency}") else: # When selling, amount is base currency quantity # Get precision info and adjust quantity min_sz, lot_sz = self.get_instrument_info(symbol) if min_sz is None: min_sz = self.get_default_min_size(symbol) if lot_sz is None: lot_sz = min_sz # Adjust quantity to appropriate precision if lot_sz > 0: amount = (amount / lot_sz) * lot_sz amount_str = f"{amount:.10f}" order_params = { 'instId': symbol, 'tdMode': tdmode, 'side': 'sell', 'ordType': 'market', 'sz': amount_str # Base currency quantity } logger.info(f"[{symbol}] Create sell order: quantity={amount_str} {base_currency}") # Use SDK to create order result = self.trade_api.place_order(**order_params) if self.log_http: logger.debug(f"HTTP Request: POST create order {result['code']}") # Check API response if result['code'] != '0': logger.error(f"Failed to create order: {result['msg']} (code: {result['code']})") if 'data' in result and len(result['data']) > 0: for item in result['data']: logger.error(f"Detailed error: {item.get('sMsg', 'Unknown')} (sCode: {item.get('sCode', 'Unknown')})") # Specific error handling if result['code'] == '50113': # Insufficient permissions logger.error("API key may not have trading permissions, please check API key settings") elif result['code'] == '51020': # Minimum order amount logger.error("Order amount below exchange minimum requirement") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) continue # Check order status if len(result['data']) > 0: order_data = result['data'][0] if order_data.get('sCode') != '0': logger.error(f"Order creation failed: {order_data.get('sMsg', 'Unknown error')} (sCode: {order_data.get('sCode', 'Unknown')})") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) continue order_id = order_data.get('ordId') if order_id: logger.info(f"Order created successfully: {order_id}") return order_id else: logger.error("Order ID is empty") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) continue else: logger.error("No order data in API response") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) continue except Exception as e: logger.error(f"Error creating order (attempt {attempt+1}/{retries}): {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: return None return None def get_order_status(self, symbol, order_id): """Get order status""" self.rate_limiter.wait("order_status") try: result = self.trade_api.get_order(instId=symbol, ordId=order_id) # Correction: code "0" indicates success if result['code'] != '0': logger.error(f"Failed to get order status: {result['msg']} (code: {result['code']})") return None if len(result['data']) > 0: order_data = result['data'][0] return { 'state': order_data.get('state'), 'avgPx': float(order_data.get('avgPx', 0)), 'accFillSz': float(order_data.get('accFillSz', 0)), 'fillPx': float(order_data.get('fillPx', 0)), 'fillSz': float(order_data.get('fillSz', 0)), 'fillTime': order_data.get('fillTime') } else: logger.error("No order data in API response") return None except Exception as e: logger.error(f"Error getting order status: {e}") return None def wait_for_order_completion(self, symbol, order_id, max_attempts=10, interval=1): """Wait for order completion""" for attempt in range(max_attempts): order_status = self.get_order_status(symbol, order_id) if order_status is None: return None state = order_status['state'] if state == 'filled': logger.info(f"Order completed: {order_id}, fill price={order_status['avgPx']:.2f}, fill quantity={order_status['accFillSz']:.10f}") return order_status elif state == 'canceled': logger.warning(f"Order canceled: {order_id}") return None elif state == 'partially_filled': logger.info(f"Order partially filled: {order_id}, filled={order_status['accFillSz']:.10f}") time.sleep(interval) else: logger.info(f"Order status: {state}, waiting...") time.sleep(interval) logger.warning(f"Order not completed within specified time: {order_id}") return None class DeepSeekAnalyzer: """DeepSeek AI Analyzer (KEN2.0 optimized)""" def __init__(self): self.api_key = os.getenv('DEEPSEEK_API_KEY') self.api_url = 'https://api.deepseek.com/v1/chat/completions' def analyze_market(self, symbol, market_data, technical_indicators): """Use DeepSeek to analyze market (optimized version)""" if not self.api_key: logger.warning("DeepSeek API key not set, skipping AI analysis") return None try: # Prepare analysis data latest = market_data.iloc[-1] technical_summary = self._prepare_enhanced_technical_summary(technical_indicators, market_data) prompt = self._create_enhanced_analysis_prompt(symbol, latest, market_data, technical_summary) response = self._call_deepseek_api(prompt) return self._parse_deepseek_response(response) except Exception as e: logger.error(f"DeepSeek analysis error: {e}") return None def _prepare_enhanced_technical_summary(self, indicators, market_data): """Prepare enhanced technical indicator summary""" summary = [] try: # Price information current_price = indicators.get('current_price', 0) summary.append(f"Current price: ${current_price:.4f}") # Multi-period RSI rsi_7 = indicators.get('rsi_7', 50) rsi_14 = indicators.get('rsi', 50) rsi_21 = indicators.get('rsi_21', 50) if hasattr(rsi_7, '__len__'): rsi_7, rsi_14, rsi_21 = rsi_7.iloc[-1], rsi_14.iloc[-1], rsi_21.iloc[-1] summary.append(f"RSI(7/14/21): {rsi_7:.1f}/{rsi_14:.1f}/{rsi_21:.1f}") # RSI status analysis rsi_status = [] for rsi_val, period in [(rsi_7, 7), (rsi_14, 14), (rsi_21, 21)]: if rsi_val > 70: rsi_status.append(f"RSI{period} overbought") elif rsi_val < 30: rsi_status.append(f"RSI{period} oversold") if rsi_status: summary.append(f"RSI status: {', '.join(rsi_status)}") # KDJ indicator k, d, j = indicators.get('kdj', (50, 50, 50)) if hasattr(k, '__len__'): k, d, j = k.iloc[-1], d.iloc[-1], j.iloc[-1] summary.append(f"KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}") if j > 80: summary.append("KDJ status: Overbought area") elif j < 20: summary.append("KDJ status: Oversold area") # MACD indicator macd_line, signal_line, histogram = indicators.get('macd', (0, 0, 0)) if hasattr(macd_line, '__len__'): macd_line, signal_line, histogram = macd_line.iloc[-1], signal_line.iloc[-1], histogram.iloc[-1] summary.append(f"MACD: line={macd_line:.4f}, signal={signal_line:.4f}, histogram={histogram:.4f}") summary.append(f"MACD status: {'Golden cross' if macd_line > signal_line else 'Death cross'}") # Moving averages sma_20 = indicators.get('sma_20', 0) sma_50 = indicators.get('sma_50', 0) sma_100 = indicators.get('sma_100', 0) if hasattr(sma_20, '__len__'): sma_20, sma_50, sma_100 = sma_20.iloc[-1], sma_50.iloc[-1], sma_100.iloc[-1] price_vs_ma = [] if current_price > sma_20: price_vs_ma.append("Above 20-day MA") else: price_vs_ma.append("Below 20-day MA") if current_price > sma_50: price_vs_ma.append("Above 50-day MA") else: price_vs_ma.append("Below 50-day MA") summary.append(f"Moving averages: 20-day=${sma_20:.2f}, 50-day=${sma_50:.2f}, 100-day=${sma_100:.2f}") summary.append(f"Price position: {', '.join(price_vs_ma)}") # Bollinger Bands upper, middle, lower = indicators.get('bollinger_2std', (0, 0, 0)) if hasattr(upper, '__len__'): upper, middle, lower = upper.iloc[-1], middle.iloc[-1], lower.iloc[-1] bb_position = (current_price - lower) / (upper - lower) * 100 if (upper - lower) > 0 else 50 summary.append(f"Bollinger Band position: {bb_position:.1f}% (0%=lower band, 100%=upper band)") # Trend strength trend_strength = indicators.get('trend_strength', 0) summary.append(f"Trend strength: {trend_strength:.2f} (0-1, higher means stronger trend)") # Volatility volatility = indicators.get('volatility', 0) * 100 # Convert to percentage summary.append(f"Annualized volatility: {volatility:.1f}%") # ATR atr = indicators.get('atr', 0) if hasattr(atr, '__len__'): atr = atr.iloc[-1] atr_percent = (atr / current_price * 100) if current_price > 0 else 0 summary.append(f"ATR: {atr:.4f} ({atr_percent:.2f}%)") # Support resistance information sr_levels = TechnicalAnalyzer.calculate_support_resistance(market_data) if sr_levels: summary.append(f"Static support: ${sr_levels['static_support']:.4f}, resistance: ${sr_levels['static_resistance']:.4f}") summary.append(f"Dynamic support: ${sr_levels['dynamic_support']:.4f}, resistance: ${sr_levels['dynamic_resistance']:.4f}") summary.append(f"Relative resistance: {sr_levels['current_vs_resistance']:+.2f}%, relative support: {sr_levels['current_vs_support']:+.2f}%") return "\n".join(summary) except Exception as e: logger.error(f"Error preparing technical summary: {e}") return "Technical indicator calculation exception" def _create_enhanced_analysis_prompt(self, symbol, latest, market_data, technical_summary): """Create enhanced analysis prompt""" # Calculate price changes price_change_24h = 0 price_change_7d = 0 if len(market_data) >= 24: price_change_24h = ((latest['close'] - market_data.iloc[-24]['close']) / market_data.iloc[-24]['close'] * 100) if len(market_data) >= 168: # 7 days data (hourly) price_change_7d = ((latest['close'] - market_data.iloc[-168]['close']) / market_data.iloc[-168]['close'] * 100) # Calculate volume changes volume_trend = "Rising" if latest['volume'] > market_data['volume'].mean() else "Falling" prompt = f""" You are a professional cryptocurrency trading AI, conducting autonomous trading in the OKX digital currency market. Please comprehensively analyze the {symbol} trading pair and execute trades. # Core Objective **Maximize Sharpe Ratio** Sharpe Ratio = Average Return / Return Volatility, which means: Quality trades (high win rate, large profit/loss ratio) → Improve Sharpe Stable returns, control drawdowns → Improve Sharpe Frequent trading, small profits and losses → Increase volatility, severely reduce Sharpe 【Market Overview】 - Current price: ${latest['close']:.4f} - 24-hour change: {price_change_24h:+.2f}% - 7-day change: {price_change_7d:+.2f}% - Volume: {latest['volume']:.0f} ({volume_trend}) - Volume relative level: {'High' if latest['volume'] > market_data['volume'].quantile(0.7) else 'Normal' if latest['volume'] > market_data['volume'].quantile(0.3) else 'Low'} 【Multi-dimensional Technical Analysis】 {technical_summary} 【Market Environment Assessment】 Please consider comprehensively: 1. Trend direction and strength 2. Overbought/oversold status 3. Volume coordination 4. Volatility level 5. Support resistance positions 6. Multi-timeframe consistency 【Risk Management Suggestions】 - Suggested position: Adjust based on signal strength and volatility - Stop-loss setting: Reference ATR and support levels - Holding time: Based on trend strength 【Output Format Requirements】 Please reply strictly in the following JSON format: {{ "action": "BUY/SELL/HOLD", "confidence": "HIGH/MEDIUM/LOW", "position_size": "Suggested position ratio (0.1-0.5)", "stop_loss": "Suggested stop-loss price or percentage", "take_profit": "Suggested take-profit price or percentage", "timeframe": "Suggested holding time (SHORT/MEDIUM/LONG)", "reasoning": "Detailed analysis logic and risk explanation" }} Please provide objective suggestions based on professional technical analysis and risk management principles. """ return prompt def analyze_sell_proportion(self, symbol, position_info, market_analysis, current_price): """Analyze sell proportion""" if not self.api_key: logger.warning("DeepSeek API key not set, using default sell proportion") return self._get_fallback_sell_proportion(position_info, 'MEDIUM') try: prompt = self._create_sell_proportion_prompt(symbol, position_info, market_analysis, current_price) response = self._call_deepseek_api(prompt) return self._parse_sell_proportion_response(response) except Exception as e: logger.error(f"DeepSeek sell proportion analysis error: {e}") return self._get_fallback_sell_proportion(position_info, 'MEDIUM') def _create_sell_proportion_prompt(self, symbol, position_info, market_analysis, current_price): """Create sell proportion analysis prompt""" # Calculate profit situation entry_price = position_info['entry_price'] profit_ratio = (current_price - entry_price) / entry_price if entry_price > 0 else 0 profit_status = "Profit" if profit_ratio > 0 else "Loss" profit_amount = (current_price - entry_price) * position_info['base_amount'] # Calculate position ratio position_ratio = position_info['position_ratio'] * 100 # Convert to percentage # Prepare technical indicator summary technical_summary = self._prepare_enhanced_technical_summary( market_analysis['technical_indicators'], market_analysis['market_data'] ) prompt = f""" You are a professional cryptocurrency trading AI, conducting autonomous trading in the OKX digital currency market. Analyze the {symbol} position situation and consider and execute sell decisions. 【Current Position Situation】 - Position quantity: {position_info['base_amount']:.8f} - Average entry price: ${entry_price:.4f} - Current price: ${current_price:.4f} - Profit ratio: {profit_ratio:+.2%} ({profit_status}) - Profit amount: ${profit_amount:+.2f} - Position ratio: {position_ratio:.2f}% (percentage of total portfolio) - Position value: ${position_info['position_value']:.2f} 【Market Technical Analysis】 {technical_summary} 【Risk Situation】 - Current price distance to stop-loss: {position_info['distance_to_stop_loss']:.2%} - Current price distance to take-profit: {position_info['distance_to_take_profit']:.2%} - Market volatility: {position_info['market_volatility']:.2%} - Trend strength: {position_info.get('trend_strength', 0):.2f} 【Sell Strategy Considerations】 Please consider the following factors to determine sell proportion: 1. Profit ratio: Large profits can consider partial profit-taking, keep some position for higher gains 2. Position ratio: Overweight positions should reduce position to lower risk, light positions can consider holding 3. Technical signals: Strong bearish signals should increase sell proportion, bullish signals can reduce sell proportion 4. Market environment: High volatility markets should be more conservative, low volatility markets can be more aggressive 5. Risk control: Close to stop-loss should sell decisively, far from stop-loss can be more flexible 【Position Management Principles】 - Profit over 20%: Consider partial profit-taking (30-70%), lock in profits - Profit 10-20%: Decide based on signal strength (20-50%) - Small profit (0-10%): Mainly based on technical signals (0-30%) - Small loss (0-5%): Based on risk control (50-80%) - Large loss (>5%): Consider stop-loss or position reduction (80-100%) 【Output Format Requirements】 Please reply strictly in the following JSON format, only containing numeric ratio (0-1): {{ "sell_proportion": 0.75 }} Explanation: - 0.1 means sell 10% of position - 0.5 means sell 50% of position - 1.0 means sell all position - 0.0 means don't sell (only in extremely bullish situations) Please provide reasonable sell proportion suggestions based on professional analysis and risk management. """ return prompt def _parse_sell_proportion_response(self, response): """Parse sell proportion response""" try: content = response['choices'][0]['message']['content'] # Try to parse JSON format if '{' in content and '}' in content: json_start = content.find('{') json_end = content.rfind('}') + 1 json_str = content[json_start:json_end] parsed = json.loads(json_str) proportion = parsed.get('sell_proportion', 0.5) # Ensure proportion is within reasonable range proportion = max(0.0, min(1.0, proportion)) logger.info(f"DeepSeek suggested sell proportion: {proportion:.1%}") return proportion else: # Text analysis: try to extract proportion from text import re patterns = [ r'[\"\"]([\d.]+)%[\"\"]', # "50%" r'sell\s*([\d.]+)%', # sell 50% r'([\d.]+)%', # 50% r'[\"\"]([\d.]+)[\"\"]', # "0.5" ] for pattern in patterns: match = re.search(pattern, content) if match: value = float(match.group(1)) if value > 1: # If in percentage format value = value / 100 proportion = max(0.0, min(1.0, value)) logger.info(f"Parsed sell proportion from text: {proportion:.1%}") return proportion # Default value logger.warning("Unable to parse sell proportion, using default 50%") return 0.5 except Exception as e: logger.error(f"Error parsing sell proportion response: {e}") return 0.5 def _get_fallback_sell_proportion(self, position_info, decision_confidence): """Fallback sell proportion decision (when DeepSeek is unavailable)""" profit_ratio = position_info['profit_ratio'] position_ratio = position_info['position_ratio'] market_volatility = position_info.get('market_volatility', 0) trend_strength = position_info.get('trend_strength', 0) # Decision based on profit situation if profit_ratio >= 0.20: # Profit over 20% base_proportion = 0.6 if decision_confidence == 'HIGH' else 0.4 elif profit_ratio >= 0.10: # Profit 10%-20% base_proportion = 0.5 if decision_confidence == 'HIGH' else 0.3 elif profit_ratio >= 0: # Profit 0%-10% base_proportion = 0.4 if decision_confidence == 'HIGH' else 0.2 elif profit_ratio >= -0.05: # Loss 0%-5% base_proportion = 0.7 if decision_confidence == 'HIGH' else 0.5 else: # Loss over 5% base_proportion = 0.9 if decision_confidence == 'HIGH' else 0.7 # Adjustment based on position ratio if position_ratio > 0.15: # Overweight position base_proportion = min(1.0, base_proportion + 0.2) elif position_ratio < 0.05: # Very light position base_proportion = max(0.1, base_proportion - 0.1) # Adjustment based on market volatility if market_volatility > 0.05: # High volatility market base_proportion = min(1.0, base_proportion + 0.1) elif market_volatility < 0.02: # Low volatility market base_proportion = max(0.1, base_proportion - 0.1) # Adjustment based on trend strength if trend_strength > 0.7: # Strong trend base_proportion = max(0.1, base_proportion - 0.1) elif trend_strength < 0.3: # Weak trend base_proportion = min(1.0, base_proportion + 0.1) # Ensure proportion is within reasonable range base_proportion = max(0.1, min(1.0, base_proportion)) logger.info(f"Fallback sell proportion decision: profit={profit_ratio:+.2%}, position ratio={position_ratio:.2%}, " f"volatility={market_volatility:.2%}, trend strength={trend_strength:.2f}, final proportion={base_proportion:.1%}") return base_proportion def _call_deepseek_api(self, prompt, max_retries=3): """Call DeepSeek API (improved retry mechanism)""" headers = { 'Authorization': f'Bearer {self.api_key}', 'Content-Type': 'application/json' } data = { 'model': 'deepseek-chat', 'messages': [ { 'role': 'system', 'content': 'You are a professional cryptocurrency trading AI, conducting autonomous trading in the OKX digital currency market, pursuing stable and sustainable returns' }, { 'role': 'user', 'content': prompt } ], 'temperature': 0.3, 'max_tokens': 1000 } for attempt in range(max_retries): try: response = requests.post( self.api_url, headers=headers, json=data, timeout=30 # 30 second timeout ) if response.status_code == 200: return response.json() elif response.status_code == 429: # Rate limit wait_time = 2 ** attempt # Exponential backoff logger.warning(f"DeepSeek API rate limit, waiting {wait_time} seconds before retry") time.sleep(wait_time) continue else: logger.error(f"DeepSeek API call failed: {response.status_code} - {response.text}") if attempt < max_retries - 1: time.sleep(1) continue else: raise Exception(f"API call failed: {response.status_code}") except requests.exceptions.Timeout: logger.warning(f"DeepSeek API request timeout, attempt {attempt + 1}/{max_retries}") if attempt < max_retries - 1: time.sleep(1) continue else: raise Exception("DeepSeek API request timeout") except Exception as e: logger.error(f"DeepSeek API call exception: {e}") if attempt < max_retries - 1: time.sleep(1) continue else: raise def _parse_deepseek_response(self, response): """Parse DeepSeek response - enhanced version""" try: content = response['choices'][0]['message']['content'].strip() logger.debug(f"DeepSeek raw response: {content}") # Remove possible code block markers if content.startswith('```json'): content = content[7:] if content.endswith('```'): content = content[:-3] content = content.strip() # Try to parse JSON format if '{' in content and '}' in content: json_start = content.find('{') json_end = content.rfind('}') + 1 json_str = content[json_start:json_end] try: parsed = json.loads(json_str) result = { 'action': parsed.get('action', 'HOLD').upper(), 'confidence': parsed.get('confidence', 'MEDIUM').upper(), 'reasoning': parsed.get('reasoning', ''), 'raw_response': content } # Validate action and confidence legality if result['action'] not in ['BUY', 'SELL', 'HOLD']: logger.warning(f"AI returned illegal action: {result['action']}, using HOLD") result['action'] = 'HOLD' if result['confidence'] not in ['HIGH', 'MEDIUM', 'LOW']: logger.warning(f"AI returned illegal confidence: {result['confidence']}, using MEDIUM") result['confidence'] = 'MEDIUM' logger.info(f"AI parsing successful: {result['action']} (confidence: {result['confidence']})") return result except json.JSONDecodeError as e: logger.warning(f"JSON parsing failed: {e}, trying text analysis") return self._parse_text_response(content) else: # Text analysis return self._parse_text_response(content) except Exception as e: logger.error(f"Error parsing DeepSeek response: {e}") # Return default values to avoid system crash return { 'action': 'HOLD', 'confidence': 'MEDIUM', 'reasoning': f'Parsing error: {str(e)}', 'raw_response': str(response) } def _parse_text_response(self, text): """Parse text response - enhanced version""" text_lower = text.lower() # More precise action recognition action = 'HOLD' if any(word in text_lower for word in ['买入', '做多', 'buy', 'long', '上涨', '看涨']): action = 'BUY' elif any(word in text_lower for word in ['卖出', '做空', 'sell', 'short', '下跌', '看跌']): action = 'SELL' # More precise confidence level recognition confidence = 'MEDIUM' if any(word in text_lower for word in ['强烈', '高信心', 'high', '非常', '强烈建议']): confidence = 'HIGH' elif any(word in text_lower for word in ['低', '低信心', 'low', '轻微', '谨慎']): confidence = 'LOW' logger.info(f"Text parsing result: {action} (confidence: {confidence})") return { 'action': action, 'confidence': confidence, 'reasoning': text, 'raw_response': text } class TechnicalAnalyzer: """Technical Analyzer""" @staticmethod def calculate_indicators(df): """Calculate technical indicators (optimized version)""" if df is None or len(df) < 20: # Lower minimum data requirement logger.warning("Insufficient data, unable to calculate technical indicators") return {} try: indicators = {} # KDJ indicator k, d, j = TechnicalAnalyzer.calculate_kdj(df) if k is not None: indicators['kdj'] = (k, d, j) # RSI indicator (multiple time periods) rsi_14 = TechnicalAnalyzer.calculate_rsi(df['close'], 14) rsi_7 = TechnicalAnalyzer.calculate_rsi(df['close'], 7) rsi_21 = TechnicalAnalyzer.calculate_rsi(df['close'], 21) if rsi_14 is not None: indicators['rsi'] = rsi_14 indicators['rsi_7'] = rsi_7 indicators['rsi_21'] = rsi_21 # ATR indicator atr = TechnicalAnalyzer.calculate_atr(df) if atr is not None: indicators['atr'] = atr # MACD indicator macd_line, signal_line, histogram = TechnicalAnalyzer.calculate_macd(df['close']) if macd_line is not None: indicators['macd'] = (macd_line, signal_line, histogram) # Bollinger Bands (multiple standard deviations) upper1, middle1, lower1 = TechnicalAnalyzer.calculate_bollinger_bands(df['close'], 20, 1) upper2, middle2, lower2 = TechnicalAnalyzer.calculate_bollinger_bands(df['close'], 20, 2) if upper1 is not None: indicators['bollinger_1std'] = (upper1, middle1, lower1) indicators['bollinger_2std'] = (upper2, middle2, lower2) # Moving Averages (multiple periods) sma_20 = TechnicalAnalyzer.calculate_sma(df['close'], 20) sma_50 = TechnicalAnalyzer.calculate_sma(df['close'], 50) sma_100 = TechnicalAnalyzer.calculate_sma(df['close'], 100) ema_12 = TechnicalAnalyzer.calculate_ema(df['close'], 12) ema_26 = TechnicalAnalyzer.calculate_ema(df['close'], 26) if sma_20 is not None: indicators['sma_20'] = sma_20 indicators['sma_50'] = sma_50 indicators['sma_100'] = sma_100 indicators['ema_12'] = ema_12 indicators['ema_26'] = ema_26 # Stochastic Oscillator k, d = TechnicalAnalyzer.calculate_stochastic(df) if k is not None: indicators['stochastic'] = (k, d) # Trend strength indicators['trend_strength'] = TechnicalAnalyzer.calculate_trend_strength(df) # Volatility indicators['volatility'] = TechnicalAnalyzer.calculate_volatility(df) # Current price indicators['current_price'] = df['close'].iloc[-1] return indicators except Exception as e: logger.error(f"Error calculating technical indicators: {e}") return {} @staticmethod def calculate_ema(prices, period): """Calculate Exponential Moving Average""" try: return prices.ewm(span=period, adjust=False).mean() except Exception as e: logger.error(f"Error calculating EMA: {e}") return None @staticmethod def calculate_trend_strength(df, period=20): """Calculate trend strength""" try: if len(df) < period: return 0 # Use linear regression to calculate trend strength x = np.arange(len(df)) y = df['close'].values # Linear regression slope, intercept = np.polyfit(x[-period:], y[-period:], 1) # Calculate R² value as trend strength y_pred = slope * x[-period:] + intercept ss_res = np.sum((y[-period:] - y_pred) ** 2) ss_tot = np.sum((y[-period:] - np.mean(y[-period:])) ** 2) r_squared = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0 return abs(r_squared) # Take absolute value, trend strength doesn't distinguish positive/negative except Exception as e: logger.error(f"Error calculating trend strength: {e}") return 0 @staticmethod def detect_rsi_divergence(df, rsi_period=14): """Detect RSI bullish divergence""" try: if len(df) < 30: # Need sufficient data return False # Calculate RSI rsi = TechnicalAnalyzer.calculate_rsi(df['close'], rsi_period) if rsi is None: return False # Use pandas methods to simplify calculation # Find lowest points in recent 10 periods recent_lows = df['low'].tail(10) recent_rsi = rsi.tail(10) # Find price lowest point and RSI lowest point min_price_idx = recent_lows.idxmin() min_rsi_idx = recent_rsi.idxmin() # If price lowest point appears later than RSI lowest point, possible bullish divergence if min_price_idx > min_rsi_idx: # Check if price is making new lows while RSI is rising price_trend = (recent_lows.iloc[-1] < recent_lows.iloc[-5]) rsi_trend = (recent_rsi.iloc[-1] > recent_rsi.iloc[-5]) return price_trend and rsi_trend and recent_rsi.iloc[-1] < 40 return False except Exception as e: logger.error(f"RSI divergence detection error: {e}") return False @staticmethod def detect_macd_divergence(df): """Detect MACD bullish divergence""" try: if len(df) < 30: return False # Calculate MACD, especially focus on histogram _, _, histogram = TechnicalAnalyzer.calculate_macd(df['close']) if histogram is None: return False # Use recent data recent_lows = df['low'].tail(10) recent_hist = histogram.tail(10) # Find price lowest point and MACD histogram lowest point min_price_idx = recent_lows.idxmin() min_hist_idx = recent_hist.idxmin() # If price lowest point appears later than histogram lowest point, possible bullish divergence if min_price_idx > min_hist_idx: # Check price trend and histogram trend price_trend = (recent_lows.iloc[-1] < recent_lows.iloc[-5]) hist_trend = (recent_hist.iloc[-1] > recent_hist.iloc[-5]) # Histogram rising from negative area is stronger signal hist_improving = recent_hist.iloc[-1] > recent_hist.iloc[-3] return price_trend and hist_trend and hist_improving return False except Exception as e: logger.error(f"MACD divergence detection error: {e}") return False @staticmethod def volume_confirmation(df, period=5): """Bottom volume confirmation""" try: if len(df) < period + 5: return False current_volume = df['volume'].iloc[-1] avg_volume = df['volume'].tail(period).mean() # Current volume significantly higher than average volume_ratio = current_volume / avg_volume if avg_volume > 0 else 1 # Price falling but volume increasing (possible bottom accumulation) price_change = (df['close'].iloc[-1] - df['close'].iloc[-2]) / df['close'].iloc[-2] if volume_ratio > 1.5 and price_change < 0: return True return False except Exception as e: logger.error(f"Volume confirmation error: {e}") return False @staticmethod def detect_hammer_pattern(df, lookback=5): """Detect hammer reversal pattern""" try: if len(df) < lookback + 1: return False latest = df.iloc[-1] body_size = abs(latest['close'] - latest['open']) total_range = latest['high'] - latest['low'] # Avoid division by zero if total_range == 0: return False # Hammer characteristics: lower shadow at least 2x body, very short upper shadow lower_shadow = min(latest['open'], latest['close']) - latest['low'] upper_shadow = latest['high'] - max(latest['open'], latest['close']) is_hammer = (lower_shadow >= 2 * body_size and upper_shadow <= body_size * 0.5 and body_size > 0) # Needs to be in downtrend prev_trend = df['close'].iloc[-lookback] > df['close'].iloc[-1] return is_hammer and prev_trend except Exception as e: logger.error(f"Hammer pattern detection error: {e}") return False @staticmethod def detect_reversal_patterns(df): """Detect bottom reversal patterns""" reversal_signals = [] try: # RSI bullish divergence if TechnicalAnalyzer.detect_rsi_divergence(df): reversal_signals.append({ 'type': 'RSI_DIVERGENCE_BULLISH', 'action': 'BUY', 'strength': 'MEDIUM', 'description': 'RSI bullish divergence, momentum divergence bullish' }) # MACD bullish divergence if TechnicalAnalyzer.detect_macd_divergence(df): reversal_signals.append({ 'type': 'MACD_DIVERGENCE_BULLISH', 'action': 'BUY', 'strength': 'MEDIUM', 'description': 'MACD histogram bullish divergence, momentum strengthening' }) # Volume confirmation if TechnicalAnalyzer.volume_confirmation(df): reversal_signals.append({ 'type': 'VOLUME_CONFIRMATION', 'action': 'BUY', 'strength': 'LOW', 'description': 'Bottom volume increase, signs of capital entry' }) # Add hammer pattern detection if TechnicalAnalyzer.detect_hammer_pattern(df): reversal_signals.append({ 'type': 'HAMMER_PATTERN', 'action': 'BUY', 'strength': 'LOW', 'description': 'Hammer pattern, short-term reversal signal' }) except Exception as e: logger.error(f"Reversal pattern detection error: {e}") return reversal_signals @staticmethod def calculate_support_resistance(df, period=20): """Calculate support resistance levels (fixed version)""" try: # Recent high resistance resistance = df['high'].rolling(window=period).max().iloc[-1] # Recent low support support = df['low'].rolling(window=period).min().iloc[-1] # Dynamic support resistance (based on Bollinger Bands) # Fix: correctly receive three return values from Bollinger Bands upper_bb, middle_bb, lower_bb = TechnicalAnalyzer.calculate_bollinger_bands(df['close'], 20, 2) # Check if Bollinger Band calculation successful if upper_bb is not None and lower_bb is not None: resistance_bb = upper_bb.iloc[-1] if hasattr(upper_bb, 'iloc') else upper_bb support_bb = lower_bb.iloc[-1] if hasattr(lower_bb, 'iloc') else lower_bb else: # If Bollinger Band calculation fails, use static values as backup resistance_bb = resistance support_bb = support current_price = df['close'].iloc[-1] return { 'static_resistance': resistance, 'static_support': support, 'dynamic_resistance': resistance_bb, 'dynamic_support': support_bb, 'current_vs_resistance': (current_price - resistance) / resistance * 100 if resistance > 0 else 0, 'current_vs_support': (current_price - support) / support * 100 if support > 0 else 0 } except Exception as e: logger.error(f"Error calculating support resistance: {e}") # Return default values to avoid subsequent errors current_price = df['close'].iloc[-1] if len(df) > 0 else 0 return { 'static_resistance': current_price * 1.1 if current_price > 0 else 0, 'static_support': current_price * 0.9 if current_price > 0 else 0, 'dynamic_resistance': current_price * 1.05 if current_price > 0 else 0, 'dynamic_support': current_price * 0.95 if current_price > 0 else 0, 'current_vs_resistance': 0, 'current_vs_support': 0 } @staticmethod def calculate_volatility(df, period=20): """Calculate volatility""" try: returns = df['close'].pct_change().dropna() volatility = returns.rolling(window=period).std() * np.sqrt(365) # Annualized volatility return volatility.iloc[-1] if len(volatility) > 0 else 0 except Exception as e: logger.error(f"Error calculating volatility: {e}") return 0 @staticmethod def generate_weighted_signals(df): """Generate weighted technical signals""" signals = [] weights = { 'KDJ_GOLDEN_CROSS': 0.15, 'MACD_GOLDEN_CROSS': 0.20, 'MA_GOLDEN_CROSS': 0.25, 'RSI_OVERSOLD': 0.15, 'BOLLINGER_LOWER_TOUCH': 0.15, 'STOCH_GOLDEN_CROSS': 0.10 } # Calculate various indicator signals technical_signals = TechnicalAnalyzer.generate_signals(df) # Calculate weighted score total_score = 0 signal_count = 0 for signal in technical_signals: weight = weights.get(signal['type'], 0.05) strength_multiplier = 1.0 if signal['strength'] == 'STRONG' else 0.7 total_score += weight * strength_multiplier signal_count += 1 # Normalize score normalized_score = min(1.0, total_score) # Determine action if normalized_score > 0.6: action = 'BUY' elif normalized_score < 0.3: action = 'SELL' else: action = 'HOLD' return { 'score': normalized_score, 'action': action, 'signal_count': signal_count, 'signals': technical_signals, 'confidence': 'HIGH' if normalized_score > 0.7 or normalized_score < 0.2 else 'MEDIUM' } @staticmethod def calculate_kdj(df, n=9, m1=3, m2=3): """Calculate KDJ indicator""" try: if len(df) < n: return pd.Series([np.nan] * len(df)), pd.Series([np.nan] * len(df)), pd.Series([np.nan] * len(df)) low_list = df['low'].rolling(window=n, min_periods=1).min() high_list = df['high'].rolling(window=n, min_periods=1).max() # Avoid division by zero error denominator = high_list - low_list denominator = denominator.replace(0, np.nan) rsv = ((df['close'] - low_list) / denominator) * 100 rsv = rsv.fillna(50) k_series = rsv.ewm(span=m1-1, adjust=False).mean() d_series = k_series.ewm(span=m2-1, adjust=False).mean() j_series = 3 * k_series - 2 * d_series return k_series, d_series, j_series except Exception as e: logger.error(f"Error calculating KDJ: {e}") return None, None, None @staticmethod def calculate_rsi(prices, period=14): """Calculate RSI""" try: if len(prices) < period: return pd.Series([np.nan] * len(prices)) delta = prices.diff() gain = (delta.where(delta > 0, 0)).rolling(window=period, min_periods=1).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period, min_periods=1).mean() # Avoid division by zero error rs = gain / loss.replace(0, np.nan) rsi = 100 - (100 / (1 + rs)) return rsi.fillna(50) except Exception as e: logger.error(f"Error calculating RSI: {e}") return None @staticmethod def calculate_atr(df, period=14): """Calculate ATR""" try: if len(df) < period: return pd.Series([np.nan] * len(df)) high_low = df['high'] - df['low'] high_close = np.abs(df['high'] - df['close'].shift()) low_close = np.abs(df['low'] - df['close'].shift()) true_range = np.maximum(high_low, np.maximum(high_close, low_close)) atr = true_range.rolling(window=period, min_periods=1).mean() return atr except Exception as e: logger.error(f"Error calculating ATR: {e}") return None @staticmethod def calculate_macd(prices, fast_period=12, slow_period=26, signal_period=9): """Calculate MACD""" try: ema_fast = prices.ewm(span=fast_period, adjust=False).mean() ema_slow = prices.ewm(span=slow_period, adjust=False).mean() macd_line = ema_fast - ema_slow signal_line = macd_line.ewm(span=signal_period, adjust=False).mean() histogram = macd_line - signal_line return macd_line, signal_line, histogram except Exception as e: logger.error(f"Error calculating MACD: {e}") return None, None, None @staticmethod def calculate_bollinger_bands(prices, period=20, std_dev=2): """Calculate Bollinger Bands""" try: if len(prices) < period: logger.warning(f"Data length {len(prices)} insufficient, unable to calculate {period}-period Bollinger Bands") return None, None, None middle = prices.rolling(window=period).mean() std = prices.rolling(window=period).std() # Handle NaN standard deviation std = std.fillna(0) upper = middle + (std * std_dev) lower = middle - (std * std_dev) return upper, middle, lower except Exception as e: logger.error(f"Error calculating Bollinger Bands: {e}") return None, None, None @staticmethod def calculate_sma(prices, period): """Calculate Simple Moving Average""" try: return prices.rolling(window=period).mean() except Exception as e: logger.error(f"Error calculating SMA: {e}") return None @staticmethod def calculate_stochastic(df, k_period=14, d_period=3): """Calculate Stochastic Oscillator""" try: low_min = df['low'].rolling(window=k_period).min() high_max = df['high'].rolling(window=k_period).max() k = 100 * ((df['close'] - low_min) / (high_max - low_min)) d = k.rolling(window=d_period).mean() return k, d except Exception as e: logger.error(f"Error calculating Stochastic: {e}") return None, None @staticmethod def generate_signals(df): """Generate technical signals""" signals = [] try: # KDJ k, d, j = TechnicalAnalyzer.calculate_kdj(df) if k is not None and len(k) > 1: latest_k, latest_d, latest_j = k.iloc[-1], d.iloc[-1], j.iloc[-1] prev_k, prev_d, prev_j = k.iloc[-2], d.iloc[-2], j.iloc[-2] if prev_k <= prev_d and latest_k > latest_d: signals.append({'type': 'KDJ_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'}) elif prev_k >= prev_d and latest_k < latest_d: signals.append({'type': 'KDJ_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'}) if latest_j < 20: signals.append({'type': 'KDJ_OVERSOLD', 'action': 'BUY', 'strength': 'MEDIUM'}) elif latest_j > 80: signals.append({'type': 'KDJ_OVERBOUGHT', 'action': 'SELL', 'strength': 'MEDIUM'}) # RSI rsi = TechnicalAnalyzer.calculate_rsi(df['close']) if rsi is not None and len(rsi) > 0: latest_rsi = rsi.iloc[-1] if latest_rsi < 30: signals.append({'type': 'RSI_OVERSOLD', 'action': 'BUY', 'strength': 'MEDIUM'}) elif latest_rsi > 70: signals.append({'type': 'RSI_OVERBOUGHT', 'action': 'SELL', 'strength': 'MEDIUM'}) # MACD macd_line, signal_line, _ = TechnicalAnalyzer.calculate_macd(df['close']) if macd_line is not None and len(macd_line) > 1: latest_macd = macd_line.iloc[-1] latest_signal = signal_line.iloc[-1] prev_macd = macd_line.iloc[-2] prev_signal = signal_line.iloc[-2] if prev_macd <= prev_signal and latest_macd > latest_signal: signals.append({'type': 'MACD_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'}) elif prev_macd >= prev_signal and latest_macd < latest_signal: signals.append({'type': 'MACD_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'}) # Bollinger Bands upper, middle, lower = TechnicalAnalyzer.calculate_bollinger_bands(df['close']) if upper is not None: latest_close = df['close'].iloc[-1] if latest_close <= lower.iloc[-1]: signals.append({'type': 'BOLlinger_LOWER_TOUCH', 'action': 'BUY', 'strength': 'MEDIUM'}) elif latest_close >= upper.iloc[-1]: signals.append({'type': 'BOLlinger_UPPER_TOUCH', 'action': 'SELL', 'strength': 'MEDIUM'}) # Moving Averages sma_short = TechnicalAnalyzer.calculate_sma(df['close'], 50) sma_long = TechnicalAnalyzer.calculate_sma(df['close'], 200) if sma_short is not None and sma_long is not None and len(sma_short) > 1: latest_short = sma_short.iloc[-1] latest_long = sma_long.iloc[-1] prev_short = sma_short.iloc[-2] prev_long = sma_long.iloc[-2] if prev_short <= prev_long and latest_short > latest_long: signals.append({'type': 'MA_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'}) elif prev_short >= prev_long and latest_short < latest_long: signals.append({'type': 'MA_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'}) # Stochastic k, d = TechnicalAnalyzer.calculate_stochastic(df) if k is not None and len(k) > 1: latest_k = k.iloc[-1] latest_d = d.iloc[-1] prev_k = k.iloc[-2] prev_d = d.iloc[-2] if latest_k < 20: signals.append({'type': 'STOCH_OVERSOLD', 'action': 'BUY', 'strength': 'MEDIUM'}) elif latest_k > 80: signals.append({'type': 'STOCH_OVERBOUGHT', 'action': 'SELL', 'strength': 'MEDIUM'}) if prev_k <= prev_d and latest_k > latest_d and latest_k < 80: signals.append({'type': 'STOCH_GOLDEN_CROSS', 'action': 'BUY', 'strength': 'MEDIUM'}) elif prev_k >= prev_d and latest_k < latest_d and latest_k > 20: signals.append({'type': 'STOCH_DEATH_CROSS', 'action': 'SELL', 'strength': 'MEDIUM'}) except Exception as e: logger.error(f"Error generating technical signals: {e}") return signals class RiskManager: """Risk Control Manager (Dynamic Position Management Version)""" def __init__(self, api): self.api = api self.max_total_position = float(os.getenv('MAX_TOTAL_POSITION', '1')) # Maximum total position 100% self.max_single_position = float(os.getenv('MAX_SINGLE_POSITION', '0.3')) # Single currency maximum position 30% self.stop_loss = float(os.getenv('STOP_LOSS', '0.05')) # Stop-loss 5% self.take_profit = float(os.getenv('TAKE_PROFIT', '0.15')) # Take-profit 15% self.max_daily_trades = int(os.getenv('MAX_DAILY_TRADES', '10')) # Maximum daily trades self.dust_threshold_value = 1.0 # Dust position value threshold (USDT) self.low_position_ratio = 0.05 # Low position ratio threshold (5%) self.trailing_stop_percent = float(os.getenv('TRAILING_STOP_PERCENT', '0.03')) # Trailing stop percentage self.monitor_interval = int(os.getenv('MONITOR_INTERVAL', '300')) # Monitoring interval seconds # Dynamic position parameters self.volatility_adjustment = True self.trend_adjustment = True self.market_sentiment_adjustment = True # Trade records self.daily_trade_count = {} self.db = DatabaseManager() def calculate_total_assets(self): """Calculate total assets (in USDT)""" try: # Get all currency balances balances = self.api.get_currency_balances() if not balances: logger.error("Unable to get currency balances") return 0.0, 0.0, {} total_usdt_value = 0.0 position_details = {} # Calculate USDT balance (including available and frozen) usdt_balance = balances.get('USDT', {}) usdt_avail = usdt_balance.get('amount', 0.0) usdt_frozen = usdt_balance.get('frozen', 0.0) total_usdt = usdt_avail + usdt_frozen total_usdt_value += total_usdt # Calculate value of other currencies for currency, balance_info in balances.items(): if currency != 'USDT': avail = balance_info.get('amount', 0.0) frozen = balance_info.get('frozen', 0.0) total_amount = avail + frozen if total_amount > 0: symbol = f"{currency}-USDT" current_price = self.api.get_current_price(symbol) if current_price: value = total_amount * current_price total_usdt_value += value position_details[symbol] = { 'amount': total_amount, # Total quantity (available + frozen) 'value': value, 'current_price': current_price } else: logger.warning(f"Unable to get {currency} price, skipping value calculation") logger.debug(f"Total asset calculation: USDT={total_usdt:.2f}(available{usdt_avail:.2f}+frozen{usdt_frozen:.2f}), position value={total_usdt_value - total_usdt:.2f}, total={total_usdt_value:.2f}") return total_usdt_value, usdt_avail, position_details except Exception as e: logger.error(f"Error calculating total assets: {e}") return 0.0, 0.0, {} def get_position_ratio(self, symbol): """Get specified currency's position ratio""" try: total_assets, usdt_balance, positions = self.calculate_total_assets() if total_assets == 0: return 0.0 position_value = positions.get(symbol, {}).get('value', 0.0) return position_value / total_assets except Exception as e: logger.error(f"Error calculating position ratio: {e}") return 0.0 def get_available_usdt_ratio(self): """Get available USDT ratio""" try: total_assets, usdt_balance, _ = self.calculate_total_assets() if total_assets == 0: return 0.0 return usdt_balance / total_assets except Exception as e: logger.error(f"Error calculating USDT ratio: {e}") return 0.0 def get_position_size(self, symbol, confidence, current_price): """Calculate position size based on confidence level and USDT availability""" try: total_assets, usdt_balance, positions = self.calculate_total_assets() if total_assets == 0: logger.warning("Total assets is 0, unable to calculate position") return 0.0 # Get current position value current_pos_value = positions.get(symbol, {}).get('value', 0.0) # Calculate USDT available ratio usdt_ratio = usdt_balance / total_assets # Adjust maximum position based on USDT availability adjusted_max_single = self.max_single_position * min(1.0, usdt_ratio * 3) # More USDT allows larger positions # Calculate maximum additional position max_single_add = max(0, total_assets * adjusted_max_single - current_pos_value) max_total_add = max(0, total_assets * self.max_total_position - (total_assets - usdt_balance)) max_add = min(max_single_add, max_total_add, usdt_balance) # Adjust based on confidence level multiplier = { 'HIGH': 0.8, # High confidence uses 80% of available amount 'MEDIUM': 0.5, # Medium confidence uses 50% 'LOW': 0.2 # Low confidence uses 20% }.get(confidence, 0.2) position_size = max_add * multiplier # Ensure doesn't exceed available USDT balance position_size = min(position_size, usdt_balance) logger.info(f"Position calculation: {symbol}, total assets=${total_assets:.2f}, USDT=${usdt_balance:.2f}, " f"current position=${current_pos_value:.2f}, suggested position=${position_size:.2f}") return position_size except Exception as e: logger.error(f"Error calculating position size: {e}") return 0.0 def get_dynamic_position_size(self, symbol, confidence, technical_indicators, current_price): """Dynamic position calculation""" try: # Base position calculation base_size = self.get_position_size(symbol, confidence, current_price) if base_size == 0: return 0.0 # Get market state parameters volatility = technical_indicators.get('volatility', 0) trend_strength = technical_indicators.get('trend_strength', 0) market_state = self.assess_market_state(technical_indicators) # Volatility adjustment (high volatility reduces position) volatility_factor = self._calculate_volatility_factor(volatility) # Trend strength adjustment (strong trend increases position) trend_factor = self._calculate_trend_factor(trend_strength) # Market state adjustment market_factor = self._calculate_market_factor(market_state) # Confidence level adjustment confidence_factor = self._calculate_confidence_factor(confidence) # Calculate dynamic position dynamic_size = base_size * volatility_factor * trend_factor * market_factor * confidence_factor logger.info(f"Dynamic position calculation: {symbol}, base={base_size:.2f}, " f"volatility factor={volatility_factor:.2f}, trend factor={trend_factor:.2f}, " f"market factor={market_factor:.2f}, confidence factor={confidence_factor:.2f}, " f"final={dynamic_size:.2f}") return dynamic_size except Exception as e: logger.error(f"Dynamic position calculation error: {e}") return self.get_position_size(symbol, confidence, current_price) def _calculate_volatility_factor(self, volatility): """Calculate volatility adjustment factor""" if not self.volatility_adjustment: return 1.0 # Annualized volatility conversion and adjustment if volatility > 0.8: # 80%+ annualized volatility return 0.3 elif volatility > 0.6: # 60-80% return 0.5 elif volatility > 0.4: # 40-60% return 0.7 elif volatility > 0.2: # 20-40% return 0.9 else: # <20% return 1.0 def _calculate_trend_factor(self, trend_strength): """Calculate trend strength adjustment factor""" if not self.trend_adjustment: return 1.0 if trend_strength > 0.7: # Strong trend return 1.3 elif trend_strength > 0.4: # Medium trend return 1.1 elif trend_strength > 0.2: # Weak trend return 1.0 else: # No trend return 0.8 def _calculate_market_factor(self, market_state): """Calculate market state adjustment factor""" if not self.market_sentiment_adjustment: return 1.0 factors = { 'STRONG_BULL': 1.2, 'BULL': 1.1, 'NEUTRAL': 1.0, 'BEAR': 0.7, 'STRONG_BEAR': 0.5 } return factors.get(market_state, 1.0) def _calculate_confidence_factor(self, confidence): """Calculate confidence level adjustment factor""" factors = { 'HIGH': 1.0, 'MEDIUM': 0.7, 'LOW': 0.4 } return factors.get(confidence, 0.5) def assess_market_state(self, technical_indicators): """Assess market state""" try: # Get key indicators rsi = technical_indicators.get('rsi', 50) if hasattr(rsi, '__len__'): rsi = rsi.iloc[-1] macd_line, signal_line, _ = technical_indicators.get('macd', (0, 0, 0)) if hasattr(macd_line, '__len__'): macd_line, signal_line = macd_line.iloc[-1], signal_line.iloc[-1] sma_20 = technical_indicators.get('sma_20', 0) sma_50 = technical_indicators.get('sma_50', 0) current_price = technical_indicators.get('current_price', 0) if hasattr(sma_20, '__len__'): sma_20, sma_50 = sma_20.iloc[-1], sma_50.iloc[-1] # Calculate bullish signal score bull_signals = 0 total_signals = 0 # RSI signal if rsi > 50: bull_signals += 1 total_signals += 1 # MACD signal if macd_line > signal_line: bull_signals += 1 total_signals += 1 # Moving average signal if current_price > sma_20: bull_signals += 1 if current_price > sma_50: bull_signals += 1 total_signals += 2 # Determine market state bull_ratio = bull_signals / total_signals if bull_ratio >= 0.8: return 'STRONG_BULL' elif bull_ratio >= 0.6: return 'BULL' elif bull_ratio >= 0.4: return 'NEUTRAL' elif bull_ratio >= 0.2: return 'BEAR' else: return 'STRONG_BEAR' except Exception as e: logger.error(f"Error assessing market state: {e}") return 'NEUTRAL' def is_dust_position(self, symbol, current_price, base_amount): """Check if dust position""" hold_value = base_amount * current_price if current_price else 0 return hold_value < self.dust_threshold_value def is_low_position_ratio(self, symbol, total_assets, current_price, base_amount): """Check if position ratio is low""" hold_value = base_amount * current_price if current_price else 0 return (hold_value / total_assets) < self.low_position_ratio if total_assets > 0 else False def can_trade(self, symbol, amount, current_price): """Check if can trade""" try: # Check daily trade count today = datetime.now().date() if symbol not in self.daily_trade_count: self.daily_trade_count[symbol] = {'date': today, 'count': 0} if self.daily_trade_count[symbol]['date'] != today: self.daily_trade_count[symbol] = {'date': today, 'count': 0} if self.daily_trade_count[symbol]['count'] >= self.max_daily_trades: logger.warning(f"{symbol} daily trade count reached {self.max_daily_trades} limit") return False return True except Exception as e: logger.error(f"Error checking trade conditions: {e}") return False def increment_trade_count(self, symbol): """Increment trade count""" try: today = datetime.now().date() if symbol in self.daily_trade_count and self.daily_trade_count[symbol]['date'] == today: self.daily_trade_count[symbol]['count'] += 1 except Exception as e: logger.error(f"Error incrementing trade count: {e}") class TradingStrategy: """Trading Strategy (Comprehensive Optimized Version)""" def __init__(self, symbol, config, api, risk_manager, deepseek): self.symbol = symbol self.config = config self.api = api self.risk_manager = risk_manager self.deepseek = deepseek self.technical_analyzer = TechnicalAnalyzer() self.db = DatabaseManager() # Use default configuration # Position status self.base_amount = 0.0 self.entry_price = 0.0 # Dynamic stop-loss monitoring - using environment variable configured interval self.monitor_interval = 300 # Default 5 minutes if hasattr(risk_manager, 'monitor_interval'): self.monitor_interval = risk_manager.monitor_interval self.monitor_running = False self.monitor_thread = None # Running status control self.strategy_running = False # Load position status self.load_position() # Start dynamic stop-loss monitoring self.start_dynamic_stop_monitor() def load_position(self): """Load position status from database""" try: position_data = self.db.load_position(self.symbol) if position_data: self.base_amount = position_data['base_amount'] self.entry_price = position_data['entry_price'] logger.info(f"Loaded {self.symbol} position: currency quantity={self.base_amount:.10f}, entry price=${self.entry_price:.2f}") else: # If no database record, sync with exchange position self.sync_with_exchange() except Exception as e: logger.error(f"Error loading position status: {e}") self.sync_with_exchange() def save_position(self): """Save position status to database""" try: if self.base_amount > 0: self.db.save_position(self.symbol, self.base_amount, self.entry_price) else: # If position is 0, delete record self.db.delete_position(self.symbol) # Also delete dynamic stop record self.db.delete_dynamic_stop(self.symbol) except Exception as e: logger.error(f"Error saving position status: {e}") def sync_with_exchange(self): """Sync with exchange position""" try: logger.debug(f"Starting {self.symbol} position status sync") # Get all currency balances balances = self.api.get_currency_balances() if not balances: logger.warning("Unable to get currency balances") return False # Extract base currency base_currency = self.symbol.split('-')[0] # Update local balance if base_currency in balances: new_base_amount = balances[base_currency].get('amount', 0.0) # If position quantity changes, print log if abs(new_base_amount - self.base_amount) > 1e-10: logger.info(f"Synced {self.symbol} latest balance: {base_currency}={new_base_amount:.10f}") # If position decreases but entry price not updated, keep original entry price (for calculating profit/loss) if new_base_amount < self.base_amount and self.entry_price > 0: # Position decreased, reset entry price to conservative estimate (current price) if new_base_amount <= 0: self.entry_price = 0 logger.info(f"{self.symbol} position cleared, reset entry price to 0") else: current_price = self.api.get_current_price(self.symbol) if current_price: self.entry_price = current_price logger.info(f"{self.symbol} partial sell, reset entry price to current price: ${current_price:.2f}") else: logger.warning(f"{self.symbol} partial sell but unable to get current price, keep original entry price") # If from no position to having position, and no entry price record elif self.base_amount == 0 and new_base_amount > 0 and self.entry_price == 0: # Try to load historical entry price from database position_data = self.db.load_position(self.symbol) if position_data and position_data['entry_price'] > 0: self.entry_price = position_data['entry_price'] logger.info(f"Restored entry price from database: ${self.entry_price:.2f}") else: # Get current price as reference (but note this is reference price) current_price = self.api.get_current_price(self.symbol) if current_price: self.entry_price = current_price logger.info(f"Set reference entry price: ${current_price:.2f} (Note: This is reference price, actual entry price may differ)") self.base_amount = new_base_amount else: self.base_amount = 0.0 # Don't clear entry price, keep for historical record analysis logger.info(f"Synced {self.symbol}: No position") # Save position status self.save_position() return True except Exception as e: logger.error(f"Error syncing exchange position: {e}") return False def get_actual_position_from_exchange(self): """Get actual position data from exchange""" try: balances = self.api.get_currency_balances() if not balances: return 0.0 base_currency = self.symbol.split('-')[0] return balances.get(base_currency, {}).get('amount', 0.0) except Exception as e: logger.error(f"Error getting exchange position: {e}") return 0.0 def analyze_market(self): """Comprehensive market analysis (optimized version)""" try: # Get market data df = self.api.get_market_data(self.symbol, self.config['timeframe']) if df is None or len(df) < 20: logger.error(f"Failed to get {self.symbol} market data") return None # Calculate technical indicators technical_indicators = self.technical_analyzer.calculate_indicators(df) if not technical_indicators: logger.warning(f"Failed to calculate {self.symbol} technical indicators") return None # Weighted technical signals weighted_signals = self.technical_analyzer.generate_weighted_signals(df) # DeepSeek AI analysis deepseek_analysis = self.deepseek.analyze_market(self.symbol, df, technical_indicators) # Technical signals technical_signals = self.technical_analyzer.generate_signals(df) return { 'deepseek': deepseek_analysis, 'technical_signals': technical_signals, 'weighted_signals': weighted_signals, 'technical_indicators': technical_indicators, 'current_price': df['close'].iloc[-1], 'market_data': df } except Exception as e: logger.error(f"{self.symbol} market analysis error: {e}") return None def make_enhanced_decision(self, analysis): """Enhanced trading decision""" if analysis is None: return {'action': 'HOLD', 'confidence': 'LOW', 'reason': 'Insufficient analysis data'} deepseek_signal = analysis['deepseek'] weighted_signals = analysis['weighted_signals'] technical_indicators = analysis['technical_indicators'] market_data = analysis['market_data'] # Build decision matrix decision_matrix = self._build_decision_matrix(deepseek_signal, weighted_signals, technical_indicators, market_data) return self._finalize_decision(decision_matrix) def _build_decision_matrix(self, ai_signal, weighted_signal, technical_indicators, market_data): """Build decision matrix""" matrix = { 'ai_weight': 0.6, # AI analysis weight 'technical_weight': 0.2, # Traditional technical indicator weight 'reversal_weight': 0.05, # Reversal signal weight 'support_resistance_weight': 0.1, # Support resistance weight 'market_weight': 0.05 # Market state weight } # AI signal scoring ai_score = self._score_ai_signal(ai_signal) if ai_signal else 0.5 # Technical signal scoring tech_score = self._score_technical_signal(weighted_signal) # Market state scoring market_score = self._score_market_condition(technical_indicators) # Reversal signal scoring reversal_signals = TechnicalAnalyzer.detect_reversal_patterns(market_data) reversal_score = 0.5 if reversal_signals: # Score based on reversal signal quantity and strength strong_signals = [s for s in reversal_signals if s['strength'] == 'STRONG'] reversal_score = 0.5 + (len(strong_signals) * 0.1) + (len(reversal_signals) * 0.05) reversal_score = min(0.9, reversal_score) # Support resistance scoring sr_levels = TechnicalAnalyzer.calculate_support_resistance(market_data) sr_score = 0.5 if sr_levels: # Near support level adds points, near resistance level subtracts points if sr_levels['current_vs_support'] > -2: # Very close to support sr_score = 0.7 elif sr_levels['current_vs_resistance'] < 2: # Very close to resistance sr_score = 0.3 # Comprehensive scoring total_score = ( ai_score * matrix['ai_weight'] + tech_score * matrix['technical_weight'] + reversal_score * matrix['reversal_weight'] + sr_score * matrix['support_resistance_weight'] + market_score * matrix['market_weight'] ) logger.info(f"[{self.symbol}] Decision matrix details: " f"AI score={ai_score:.3f}, technical score={tech_score:.3f}, " f"market score={market_score:.3f}, reversal score={reversal_score:.3f}, " f"support resistance score={sr_score:.3f}, total score={total_score:.3f}") # Determine action and confidence if total_score > 0.6: action = 'BUY' confidence = 'HIGH' if total_score > 0.75 else 'MEDIUM' elif total_score < 0.4: action = 'SELL' confidence = 'HIGH' if total_score < 0.25 else 'MEDIUM' else: action = 'HOLD' confidence = 'MEDIUM' if abs(total_score - 0.5) > 0.1 else 'LOW' return { 'total_score': total_score, 'action': action, 'confidence': confidence, 'components': { 'ai_score': ai_score, 'tech_score': tech_score, 'market_score': market_score, 'reversal_score': reversal_score, 'sr_score': sr_score } } def _score_ai_signal(self, ai_signal): """AI signal scoring""" if not ai_signal: logger.warning("AI signal is empty, using neutral score") return 0.5 action = ai_signal.get('action', 'HOLD') confidence = ai_signal.get('confidence', 'MEDIUM') # Correction: Redesign scoring logic to ensure score has differentiation if action == 'BUY': base_score = 0.7 # Buy base score elif action == 'SELL': base_score = 0.3 # Sell base score else: # HOLD base_score = 0.5 # Hold neutral score # Confidence adjustment - ensure final score is within reasonable range confidence_adjustments = { 'HIGH': 0.2, # High confidence +0.2 'MEDIUM': 0.0, # Medium confidence unchanged 'LOW': -0.2 # Low confidence -0.2 } adjustment = confidence_adjustments.get(confidence, 0.0) final_score = base_score + adjustment # Ensure within 0-1 range final_score = max(0.1, min(0.9, final_score)) logger.debug(f"AI signal scoring: action={action}, confidence={confidence}, " f"base={base_score}, adjustment={adjustment}, final={final_score}") return final_score def _score_technical_signal(self, weighted_signal): """Technical signal scoring""" if not weighted_signal: logger.warning("Technical signal is empty, using neutral score") return 0.5 action = weighted_signal.get('action', 'HOLD') score = weighted_signal.get('score', 0.5) signal_count = weighted_signal.get('signal_count', 0) # Adjust based on signal strength and quantity if action == 'BUY': # Buy signal: 0.5-1.0 range if score > 0.7 and signal_count >= 3: final_score = 0.8 elif score > 0.6: final_score = 0.7 else: final_score = 0.6 elif action == 'SELL': # Sell signal: 0.0-0.5 range if score < 0.3 and signal_count >= 3: final_score = 0.2 elif score < 0.4: final_score = 0.3 else: final_score = 0.4 else: # HOLD final_score = 0.5 logger.debug(f"Technical signal scoring: action={action}, score={score}, " f"signal_count={signal_count}, final={final_score}") return final_score def _score_market_condition(self, technical_indicators): """Market state scoring""" try: # Assess market state based on multiple indicators rsi = technical_indicators.get('rsi', 50) if hasattr(rsi, '__len__'): rsi = rsi.iloc[-1] trend_strength = technical_indicators.get('trend_strength', 0) volatility = technical_indicators.get('volatility', 0) # RSI scoring (30-70 is neutral) rsi_score = 0.5 if rsi < 30: # Oversold rsi_score = 0.8 elif rsi > 70: # Overbought rsi_score = 0.2 # Trend strength scoring trend_score = 0.5 + (trend_strength * 0.3) # Strong trend favors buying # Volatility scoring (medium volatility is best) vol_score = 0.7 if 0.3 < volatility < 0.6 else 0.5 # Comprehensive scoring return (rsi_score + trend_score + vol_score) / 3 except Exception as e: logger.error(f"Market state scoring error: {e}") return 0.5 def _finalize_decision(self, decision_matrix): """Final decision""" action = decision_matrix['action'] confidence = decision_matrix['confidence'] total_score = decision_matrix['total_score'] reason = f"Comprehensive score: {total_score:.3f} (AI: {decision_matrix['components']['ai_score']:.3f}, " reason += f"Technical: {decision_matrix['components']['tech_score']:.3f}, " reason += f"Market: {decision_matrix['components']['market_score']:.3f}," reason += f"Reversal: {decision_matrix['components']['reversal_score']:.3f}," reason += f"Support Resistance: {decision_matrix['components']['sr_score']:.3f})," return { 'action': action, 'confidence': confidence, 'reason': reason, 'total_score': total_score, 'source': 'Enhanced Decision Matrix' } def execute_trade(self, decision, current_price): """Execute trade""" if decision['action'] == 'HOLD': logger.info(f"[{self.symbol}] Decision is HOLD, not executing trade") return # Sync exchange position self.sync_with_exchange() actual_position = self.base_amount # Use synced position logger.info(f"[{self.symbol}] Current position: {self.base_amount:.10f}") # Only get needed total asset information total_assets, available_usdt, _ = self.risk_manager.calculate_total_assets() if decision['action'] == 'BUY': # Get market analysis for dynamic position calculation market_analysis = self.analyze_market() if market_analysis is None: logger.warning(f"[{self.symbol}] Unable to get market analysis, using base position") buy_amount = self.risk_manager.get_position_size(self.symbol, decision['confidence'], current_price) else: # Use dynamic position calculation buy_amount = self.risk_manager.get_dynamic_position_size( self.symbol, decision['confidence'], market_analysis['technical_indicators'], current_price ) if buy_amount == 0: logger.warning(f"[{self.symbol}] Calculated position size is 0, skipping buy") return # Get exchange minimum trade quantity for buy check min_sz, _ = self.api.get_instrument_info(self.symbol) if min_sz is None: min_sz = self.api.get_default_min_size(self.symbol) # Calculate base currency quantity to buy estimated_base_amount = buy_amount / current_price if current_price > 0 else 0 logger.info(f"[{self.symbol}] Pre-buy check: estimated quantity={estimated_base_amount:.10f}, " f"minimum trade quantity={min_sz}, available USDT=${available_usdt:.2f}, " f"buy amount=${buy_amount:.2f}") # Check if buy quantity meets exchange minimum requirement if estimated_base_amount < min_sz: logger.warning(f"[{self.symbol}] Estimated buy quantity {estimated_base_amount:.10f} less than minimum trade quantity {min_sz}, canceling buy") return # Check if quote currency balance is sufficient if available_usdt < buy_amount: logger.warning(f"[{self.symbol}] Quote currency balance insufficient: need ${buy_amount:.2f}, current ${available_usdt:.2f}") return logger.info(f"[{self.symbol}] Creating buy order: amount=${buy_amount:.2f}, price=${current_price:.2f}") order_id = self.api.create_order( symbol=self.symbol, side='buy', amount=buy_amount ) if order_id: # Wait for order completion order_status = self.api.wait_for_order_completion(self.symbol, order_id) if order_status is None: logger.error(f"[{self.symbol}] Order not completed") return # Get actual fill price and quantity fill_price = order_status['avgPx'] fill_amount = order_status['accFillSz'] # Update position status self.base_amount += fill_amount # Update average entry price (weighted average) if self.entry_price > 0: # If adding position, calculate weighted average price self.entry_price = (self.entry_price * (self.base_amount - fill_amount) + fill_price * fill_amount) / self.base_amount logger.info(f"[{self.symbol}] Position addition operation, updated average entry price: ${self.entry_price:.2f}") else: self.entry_price = fill_price logger.info(f"[{self.symbol}] Buy successful: quantity={fill_amount:.10f}, price=${fill_price:.2f}") logger.info(f"[{self.symbol}] Updated position: {self.symbol.split('-')[0]}={self.base_amount:.10f}") # Sync exchange position self.sync_with_exchange() # Set dynamic stop-loss trailing_percent = self.risk_manager.trailing_stop_percent self.db.set_dynamic_stop(self.symbol, fill_price, trailing_percent=trailing_percent) logger.info(f"[{self.symbol}] Set dynamic stop-loss: initial price=${fill_price:.2f}, trailing_percent={trailing_percent:.1%}") # Note: sync_with_exchange already saves position, no need to save again here self.risk_manager.increment_trade_count(self.symbol) else: logger.error(f"[{self.symbol}] Buy order creation failed") elif decision['action'] == 'SELL': if actual_position <= 0: logger.info(f"[{self.symbol}] Exchange actual position is 0, ignoring sell signal") # Update local status self.base_amount = 0 self.entry_price = 0 self.save_position() return # Prepare position information for AI analysis position_info = self._prepare_position_info(actual_position, current_price, total_assets) # Get market analysis data (for technical indicators) market_analysis = self.analyze_market() # Use DeepSeek to analyze sell proportion try: sell_proportion = self.deepseek.analyze_sell_proportion( self.symbol, position_info, market_analysis, current_price ) except Exception as e: logger.warning(f"[{self.symbol}] DeepSeek sell proportion analysis failed: {e}, using fallback decision") sell_proportion = self.deepseek._get_fallback_sell_proportion(position_info, decision['confidence']) # Calculate sell quantity based on AI suggestion sell_amount = actual_position * sell_proportion # Ensure sell quantity doesn't exceed actual position sell_amount = min(sell_amount, actual_position) logger.info(f"[{self.symbol}] AI suggested sell proportion: {sell_proportion:.1%}, calculated sell quantity: {sell_amount:.10f}") # Get exchange minimum trade quantity for sell check min_sz, _ = self.api.get_instrument_info(self.symbol) if min_sz is None: min_sz = self.api.get_default_min_size(self.symbol) # Check if sell quantity is less than exchange minimum trade quantity if sell_amount < min_sz: if sell_proportion < 1.0: # Partial sell logger.info(f"[{self.symbol}] Sell quantity {sell_amount:.10f} less than minimum trade quantity {min_sz}, ignoring this partial sell") return else: # Full sell logger.info(f"[{self.symbol}] Position quantity {sell_amount:.10f} less than minimum trade quantity {min_sz}, dust position, ignoring sell") return # Record sell decision details logger.info(f"[{self.symbol}] Final sell decision: proportion={sell_proportion:.1%}, quantity={sell_amount:.10f}, " f"reason={decision.get('reason', 'AI decision')}") # Execute sell order order_id = self.api.create_order( symbol=self.symbol, side='sell', amount=sell_amount ) if order_id: # Wait for order completion order_status = self.api.wait_for_order_completion(self.symbol, order_id) if order_status is None: logger.error(f"[{self.symbol}] Order not completed") return # Get actual fill price and quantity fill_price = order_status['avgPx'] fill_amount = order_status['accFillSz'] # Calculate profit profit = (fill_price - self.entry_price) * fill_amount profit_percent = (fill_price / self.entry_price - 1) * 100 if self.entry_price > 0 else 0 # Sync exchange position self.sync_with_exchange() logger.info(f"[{self.symbol}] Sell successful: quantity={fill_amount:.10f}, price=${fill_price:.2f}") logger.info(f"[{self.symbol}] Profit: ${profit:.2f} ({profit_percent:+.2f}%)") logger.info(f"[{self.symbol}] Updated position: {self.symbol.split('-')[0]}={self.base_amount:.10f}") # Save trade record self.db.save_trade_record(self.symbol, 'sell', fill_amount, fill_price, order_id) # If fully sold, reset status if self.base_amount <= 0: self.entry_price = 0 self.db.delete_dynamic_stop(self.symbol) logger.info(f"[{self.symbol}] Position cleared, removed dynamic stop-loss") else: # After partial sell, update dynamic stop-loss self.db.set_dynamic_stop(self.symbol, self.entry_price, trailing_percent=0.03, multiplier=2) logger.info(f"[{self.symbol}] After partial sell, reset dynamic stop-loss") # Note: sync_with_exchange already saves position, no need to save again here self.risk_manager.increment_trade_count(self.symbol) else: logger.error(f"[{self.symbol}] Sell order creation failed") def _prepare_position_info(self, actual_position, current_price, total_assets): """Prepare position information for AI analysis""" try: # Calculate position value position_value = actual_position * current_price # Calculate position ratio position_ratio = position_value / total_assets if total_assets > 0 else 0 # Calculate profit ratio profit_ratio = (current_price - self.entry_price) / self.entry_price if self.entry_price > 0 else 0 # Calculate distance to stop-loss/take-profit stop_loss_price = self.entry_price * (1 - self.risk_manager.stop_loss) if self.entry_price > 0 else 0 take_profit_price = self.entry_price * (1 + self.risk_manager.take_profit) if self.entry_price > 0 else 0 distance_to_stop_loss = (current_price - stop_loss_price) / current_price if current_price > 0 else 0 distance_to_take_profit = (take_profit_price - current_price) / current_price if current_price > 0 else 0 # Get market volatility market_data = self.api.get_market_data(self.symbol, '1H', limit=24) if market_data is not None and len(market_data) > 0: market_volatility = market_data['close'].pct_change().std() * 100 else: market_volatility = 0 # Get trend strength market_analysis = self.analyze_market() trend_strength = 0 if market_analysis and 'technical_indicators' in market_analysis: trend_strength = market_analysis['technical_indicators'].get('trend_strength', 0) position_info = { 'base_amount': actual_position, 'entry_price': self.entry_price, 'current_price': current_price, 'position_value': position_value, 'position_ratio': position_ratio, 'profit_ratio': profit_ratio, 'distance_to_stop_loss': distance_to_stop_loss, 'distance_to_take_profit': distance_to_take_profit, 'market_volatility': market_volatility, 'trend_strength': trend_strength, 'total_assets': total_assets } logger.debug(f"[{self.symbol}] Position analysis information: ratio={position_ratio:.2%}, profit={profit_ratio:+.2%}, " f"distance to stop-loss={distance_to_stop_loss:.2%}, volatility={market_volatility:.2f}%") return position_info except Exception as e: logger.error(f"[{self.symbol}] Error preparing position information: {e}") # Return basic position information return { 'base_amount': actual_position, 'entry_price': self.entry_price, 'current_price': current_price, 'position_value': actual_position * current_price, 'position_ratio': 0, 'profit_ratio': 0, 'distance_to_stop_loss': 0, 'distance_to_take_profit': 0, 'market_volatility': 0, 'trend_strength': 0, 'total_assets': total_assets } def start_dynamic_stop_monitor(self): """Start dynamic stop-loss monitoring""" if self.monitor_running and self.monitor_thread and self.monitor_thread.is_alive(): logger.info(f"[{self.symbol}] Dynamic stop-loss monitoring already running") return self.monitor_running = True self.monitor_thread = threading.Thread(target=self._dynamic_stop_monitor_loop, name=f"StopMonitor-{self.symbol}") self.monitor_thread.daemon = True self.monitor_thread.start() logger.info(f"[{self.symbol}] Dynamic stop-loss monitoring started") def stop_dynamic_stop_monitor(self): """Stop dynamic stop-loss monitoring""" self.monitor_running = False if self.monitor_thread and self.monitor_thread.is_alive(): self.monitor_thread.join(timeout=5) if self.monitor_thread.is_alive(): logger.warning(f"{self.symbol} dynamic stop-loss monitoring thread didn't exit normally") else: logger.info(f"{self.symbol} dynamic stop-loss monitoring stopped") def _dynamic_stop_monitor_loop(self): """Dynamic stop-loss monitoring loop""" while self.monitor_running: try: # Only monitor when there is position if self.base_amount > 0: self.check_dynamic_stops() # Monitor every 5 minutes time.sleep(self.monitor_interval) except Exception as e: logger.error(f"[{self.symbol}] Dynamic stop-loss monitoring error: {e}") time.sleep(60) # Wait 1 minute before continuing when error occurs def check_dynamic_stops(self): """Check dynamic stop-loss conditions""" try: # Get current price current_price = self.api.get_current_price(self.symbol) if not current_price: logger.warning(f"[{self.symbol}] Unable to get current price, skipping stop-loss check") return # Get dynamic stop-loss information stop_info = self.db.get_dynamic_stop(self.symbol) if not stop_info: logger.debug(f"[{self.symbol}] No dynamic stop-loss settings") return stop_loss_price = stop_info['current_stop_loss'] take_profit_price = stop_info['current_take_profit'] trailing_percent = stop_info['trailing_percent'] logger.debug(f"[{self.symbol}] Dynamic stop-loss check: current price=${current_price:.2f}, stop-loss=${stop_loss_price:.2f}, take-profit=${take_profit_price:.2f}") # Check stop-loss condition if current_price <= stop_loss_price: logger.warning(f"⚠️ {self.symbol} triggered dynamic stop-loss! Current price=${current_price:.2f} <= stop-loss price=${stop_loss_price:.2f}") self.execute_trade({ 'action': 'SELL', 'confidence': 'HIGH', 'reason': f'Dynamic stop-loss triggered: {current_price:.2f} <= {stop_loss_price:.2f}' }, current_price) return # Check take-profit condition if current_price >= take_profit_price: logger.info(f"🎯 {self.symbol} triggered dynamic take-profit! Current price=${current_price:.2f} >= take-profit price=${take_profit_price:.2f}") self.execute_trade({ 'action': 'SELL', 'confidence': 'HIGH', 'reason': f'Dynamic take-profit triggered: {current_price:.2f} >= {take_profit_price:.2f}' }, current_price) return # Update trailing stop-loss (only moves upward) - fix logic if stop_info and current_price > stop_info.get('highest_price', 0): new_stop_loss = self.db.update_dynamic_stop(self.symbol, current_price, trailing_percent=trailing_percent, multiplier=2) if new_stop_loss: logger.info(f"[{self.symbol}] Trailing stop-loss updated: new stop-loss price=${new_stop_loss:.2f}") except Exception as e: logger.error(f"[{self.symbol}] Error checking dynamic stop-loss: {e}") def run(self): """Run strategy (using enhanced decision)""" self.strategy_running = True while self.strategy_running: try: logger.info(f"\n=== {self.symbol} Strategy Auto Execution ===") # Market analysis analysis = self.analyze_market() if analysis is None: logger.warning("Analysis failed, waiting for next execution") time.sleep(self.config['interval']) continue # Make trading decision decision = self.make_enhanced_decision(analysis) logger.info(f"{self.symbol} Decision: {decision['action']} (Confidence: {decision['confidence']}, Score: {decision['total_score']:.3f})") if decision['reason']: logger.info(f"Reason: {decision['reason']}") # Execute trade self.execute_trade(decision, analysis['current_price']) # Risk management check self.check_risk_management(analysis['current_price']) time.sleep(self.config['interval']) except Exception as e: logger.error(f"Strategy execution error: {e}") if "Network" in str(e) or "Connection" in str(e): wait_time = min(300, self.config['interval'] * 2) # Network error wait longer else: wait_time = self.config['interval'] if self.strategy_running: time.sleep(wait_time) def stop_strategy(self): """Stop strategy running""" with threading.Lock(): # Add thread lock self.strategy_running = False logger.info(f"{self.symbol} strategy stop signal sent") def check_risk_management(self, current_price): """Check risk management (fixed stop-loss/take-profit)""" if self.base_amount == 0: return # Calculate profit/loss percentage pnl_pct = (current_price - self.entry_price) / self.entry_price if self.entry_price > 0 else 0 # Fixed stop-loss/take-profit check if pnl_pct <= -self.risk_manager.stop_loss: logger.warning(f"⚠️ {self.symbol} triggered fixed stop-loss ({pnl_pct:.2%})") self.execute_trade({ 'action': 'SELL', 'confidence': 'HIGH', 'reason': 'Fixed stop-loss triggered' }, current_price) elif pnl_pct >= self.risk_manager.take_profit: logger.info(f"🎯 {self.symbol} triggered fixed take-profit ({pnl_pct:.2%})") self.execute_trade({ 'action': 'SELL', 'confidence': 'HIGH', 'reason': 'Fixed take-profit triggered' }, current_price) class MultiStrategyRunner: """Multi-Strategy Runner""" def __init__(self): self.running = False self.strategies = {} self.threads = {} # Configure each currency strategy self.symbol_configs = { 'ETH-USDT': {'name': 'Ethereum', 'interval': 1800, 'timeframe': '1H'}, 'BTC-USDT': {'name': 'Bitcoin', 'interval': 1800, 'timeframe': '1H'}, 'SOL-USDT': {'name': 'Solana', 'interval': 1800, 'timeframe': '1H'}, 'XRP-USDT': {'name': 'Ripple', 'interval': 1800, 'timeframe': '1H'}, 'BNB-USDT': {'name': 'Binance Coin', 'interval': 1800, 'timeframe': '1H'}, 'OKB-USDT': {'name': 'OKB', 'interval': 1800, 'timeframe': '1H'}, } # Initialize API client self.api = OKXAPIClient() self.deepseek = DeepSeekAnalyzer() self.risk_manager = RiskManager(self.api) # Initialize all strategies for symbol, config in self.symbol_configs.items(): self.strategies[symbol] = TradingStrategy(symbol, config, self.api, self.risk_manager, self.deepseek) # Register exit handlers atexit.register(self.shutdown) signal.signal(signal.SIGINT, lambda s, f: self.shutdown()) signal.signal(signal.SIGTERM, lambda s, f: self.shutdown()) def start_strategy(self, symbol): """Start single currency strategy""" if symbol not in self.strategies: logger.error(f"Unsupported {symbol} trading pair") return False # Check if already running if symbol in self.threads and self.threads[symbol].is_alive(): logger.info(f"{symbol} strategy already running") return True def strategy_worker(): """Strategy worker thread""" strategy = self.strategies[symbol] logger.info(f"Starting {symbol} strategy") try: # Directly run strategy, strategy has its own loop internally strategy.run() except Exception as e: logger.error(f"{symbol} strategy execution error: {e}") finally: logger.info(f"{symbol} strategy thread ended") # Remove from active thread list when thread ends if symbol in self.threads: del self.threads[symbol] # Create and start thread thread = threading.Thread(target=strategy_worker, name=f"Strategy-{symbol}") thread.daemon = True thread.start() self.threads[symbol] = thread logger.info(f"Started {symbol} strategy") return True def start_all_strategies(self): """Start all strategies""" self.running = True for symbol in self.strategies.keys(): self.start_strategy(symbol) time.sleep(60) # Each currency starts with 60 second interval logger.info("All strategies started") def stop_strategy(self, symbol): """Stop single currency strategy""" if symbol in self.strategies: # First stop strategy self.strategies[symbol].stop_strategy() if symbol in self.threads: thread = self.threads[symbol] thread.join(timeout=10) # Wait 10 seconds if thread.is_alive(): logger.warning(f"Unable to stop {symbol} strategy thread") else: del self.threads[symbol] logger.info(f"Stopped {symbol} strategy") # Stop dynamic stop-loss monitoring if symbol in self.strategies: self.strategies[symbol].stop_dynamic_stop_monitor() def stop_all_strategies(self): """Stop all strategies""" self.running = False # First send stop signal to all strategies for symbol in list(self.strategies.keys()): self.strategies[symbol].stop_strategy() # Then stop threads for symbol in list(self.threads.keys()): self.stop_strategy(symbol) logger.info("All strategies stopped") def get_status(self): """Get system status""" status = { 'running': self.running, 'active_strategies': list(self.threads.keys()), 'strategies_detail': {} } for symbol, strategy in self.strategies.items(): status['strategies_detail'][symbol] = { 'base_amount': strategy.base_amount, 'entry_price': strategy.entry_price, 'config': self.symbol_configs[symbol] } return status def shutdown(self): """System shutdown handling""" logger.info("System shutting down, stopping all strategies and monitoring...") self.stop_all_strategies() # Stop all dynamic stop-loss monitoring for symbol, strategy in self.strategies.items(): try: strategy.stop_dynamic_stop_monitor() logger.info(f"Stopped {symbol} dynamic stop-loss monitoring") except Exception as e: logger.error(f"Error stopping {symbol} dynamic stop-loss monitoring: {e}") # Wait for all threads to end for symbol, thread in list(self.threads.items()): if thread.is_alive(): thread.join(timeout=5) # Wait 5 seconds if thread.is_alive(): logger.warning(f"{symbol} strategy thread didn't exit normally") # Ensure all resources released time.sleep(1) logger.info("System shutdown completed") print("👋 System safely exited, thank you for using!") def main(): """Main function""" print("=" * 60) print(" " * 20 + "Multi-Currency Intelligent Trading System") print("=" * 60) print("Core Features:") print(" • DeepSeek AI Driven Decisions") print(" • KDJ + RSI + ATR Technical Indicators") print(" • Multi-level Risk Control") print(" • Position Status Persistence") print("=" * 60) # Check environment variables required_env_vars = ['OKX_API_KEY', 'OKX_SECRET_KEY', 'OKX_PASSWORD'] if not all(os.getenv(var) for var in required_env_vars): print("❌ Please set OKX API environment variables") return # Create strategy runner runner = MultiStrategyRunner() # Add debug mode switch debug_mode = False def toggle_debug_mode(): nonlocal debug_mode debug_mode = not debug_mode runner.api.log_http = debug_mode print(f"Debug mode {'enabled' if debug_mode else 'disabled'}") try: while True: print("\n" + "=" * 50) print(" " * 15 + "Trading System Control Panel") print("=" * 50) print("1. Start All Strategies") print("2. Start Specific Strategy") print("3. Stop All Strategies") print("4. Stop Specific Strategy") print("5. View System Status") print("6. Manual Position Sync") print("7. Toggle Debug Mode") print("8. Exit System") print("=" * 50) choice = input("Please enter choice (1-8): ").strip() if choice == '1': print("Starting all trading strategies...") runner.start_all_strategies() print("✅ All strategies started") elif choice == '2': print("\nSelect currency to start:") symbols = list(runner.symbol_configs.keys()) for i, symbol in enumerate(symbols, 1): config = runner.symbol_configs[symbol] print(f"{i}. {symbol} ({config['name']})") print(f"{len(symbols)+1}. Back") symbol_choice = input("Please enter choice: ").strip() if symbol_choice.isdigit(): index = int(symbol_choice) - 1 if 0 <= index < len(symbols): symbol = symbols[index] runner.start_strategy(symbol) print(f"✅ Started {symbol} strategy") elif index == len(symbols): continue else: print("❌ Invalid choice") else: print("❌ Invalid input") elif choice == '3': print("Stopping all trading strategies...") runner.stop_all_strategies() print("✅ All strategies stopped") elif choice == '4': print("\nSelect currency to stop:") symbols = list(runner.symbol_configs.keys()) for i, symbol in enumerate(symbols, 1): config = runner.symbol_configs[symbol] print(f"{i}. {symbol} ({config['name']})") print(f"{len(symbols)+1}. Back") symbol_choice = input("Please enter choice: ").strip() if symbol_choice.isdigit(): index = int(symbol_choice) - 1 if 0 <= index < len(symbols): symbol = symbols[index] runner.stop_strategy(symbol) print(f"✅ Stopped {symbol} strategy") elif index == len(symbols): continue else: print("❌ Invalid choice") else: print("❌ Invalid input") elif choice == '5': status = runner.get_status() print(f"\nSystem Status: {'Running' if status['running'] else 'Stopped'}") print(f"Active Strategies: {len(status['active_strategies'])}") for symbol in status['active_strategies']: print(f" • {symbol}") print("\nPosition Status:") for symbol, detail in status['strategies_detail'].items(): base_amount = detail['base_amount'] entry_price = detail['entry_price'] if base_amount > 0: print(f" {symbol}: Position {base_amount:.10f} @ ${entry_price:.2f}") elif choice == '6': print("Manual sync all currency positions...") for symbol, strategy in runner.strategies.items(): strategy.sync_with_exchange() print("✅ Sync completed") elif choice == '7': toggle_debug_mode() elif choice == '8': print("Exiting system, please wait for all strategies to stop...") break else: print("❌ Invalid choice, please re-enter") except KeyboardInterrupt: print("\nReceived interrupt signal, exiting...") if __name__ == "__main__": main()