# OKXTrading/multistrategyrunner.py
# 2025-11-05 19:39:02 +08:00
#
# 168 lines
# 6.0 KiB
# Python

import atexit
import logging
import signal
import time
import threading
from okxapiclient import OKXAPIClient
from deepseekanalyzer import DeepSeekAnalyzer
from riskmanager import RiskManager
from tradingstrategy import TradingStrategy
logger = logging.getLogger(__name__)


class MultiStrategyRunner:
    """Multi-Strategy Runner.

    Builds one ``TradingStrategy`` per configured trading pair and runs each
    of them in its own daemon thread. All strategies share the same API
    client, risk manager and DeepSeek analyzer instances.

    The runner registers itself for process exit (``atexit``) and for
    SIGINT/SIGTERM so that strategies are asked to stop before the process
    goes away.
    """

    def __init__(self):
        # True between start_all_strategies() and stop_all_strategies().
        self.running = False
        # symbol -> TradingStrategy instance.
        self.strategies = {}
        # symbol -> worker thread currently registered for that symbol.
        self.threads = {}
        # Set once shutdown() has run; makes repeated shutdown calls no-ops.
        self._shutdown_done = False

        # Configure each currency strategy. The dicts are handed to
        # TradingStrategy unchanged; their interpretation lives there.
        self.symbol_configs = {
            'ETH-USDT': {'name': 'Ethereum', 'interval': 1800, 'timeframe': '1H'},
            'BTC-USDT': {'name': 'Bitcoin', 'interval': 1800, 'timeframe': '1H'},
            'SOL-USDT': {'name': 'Solana', 'interval': 1800, 'timeframe': '1H'},
            'XRP-USDT': {'name': 'Ripple', 'interval': 1800, 'timeframe': '1H'},
            'BNB-USDT': {'name': 'Binance Coin', 'interval': 1800, 'timeframe': '1H'},
            'OKB-USDT': {'name': 'OKB', 'interval': 1800, 'timeframe': '1H'},
        }

        # Initialize shared collaborators
        self.api = OKXAPIClient()
        self.deepseek = DeepSeekAnalyzer()
        self.risk_manager = RiskManager(self.api)

        # Initialize all strategies
        for symbol, config in self.symbol_configs.items():
            self.strategies[symbol] = TradingStrategy(
                symbol, config, self.api, self.risk_manager, self.deepseek
            )

        # Register exit handlers
        atexit.register(self.shutdown)
        # signal.signal() raises ValueError outside the main thread, so only
        # install handlers when it is legal to do so.
        if threading.current_thread() is threading.main_thread():
            signal.signal(signal.SIGINT, lambda s, f: self.shutdown())
            signal.signal(signal.SIGTERM, lambda s, f: self.shutdown())
        else:
            logger.warning(
                "Not on the main thread; SIGINT/SIGTERM handlers were not installed"
            )

    def start_strategy(self, symbol):
        """Start single currency strategy.

        Args:
            symbol: Trading pair key, e.g. ``'BTC-USDT'``.

        Returns:
            ``False`` if ``symbol`` has no configured strategy, ``True`` if a
            worker thread was started or one is already alive.
        """
        if symbol not in self.strategies:
            logger.error(f"Unsupported {symbol} trading pair")
            return False

        # Check if already running
        existing = self.threads.get(symbol)
        if existing is not None and existing.is_alive():
            logger.info(f"{symbol} strategy already running")
            return True

        strategy = self.strategies[symbol]

        def strategy_worker():
            """Strategy worker thread"""
            logger.info(f"Starting {symbol} strategy")
            try:
                # Directly run strategy, strategy has its own loop internally
                strategy.run()
            except Exception as e:
                # logger.exception keeps the traceback for post-mortem analysis.
                logger.exception(f"{symbol} strategy execution error: {e}")
            finally:
                logger.info(f"{symbol} strategy thread ended")
                # Deregister only our own entry: a newer thread may already
                # have been registered for this symbol, and stop_strategy()
                # may already have removed us.
                if self.threads.get(symbol) is threading.current_thread():
                    self.threads.pop(symbol, None)

        # Create and start thread
        thread = threading.Thread(
            target=strategy_worker, name=f"Strategy-{symbol}", daemon=True
        )
        # Register BEFORE starting so that a worker which exits immediately
        # still finds (and removes) its own entry instead of leaving a dead
        # thread behind in self.threads.
        self.threads[symbol] = thread
        # A strategy is running again, so a later shutdown() must do real work.
        self._shutdown_done = False
        thread.start()
        logger.info(f"Started {symbol} strategy")
        return True

    def start_all_strategies(self, stagger_seconds=60):
        """Start all strategies, pausing between consecutive starts.

        Args:
            stagger_seconds: Delay between two consecutive strategy starts.
                No delay is applied after the last strategy. Values ``<= 0``
                disable the delay.

        If the runner is stopped while the staggered start is still in
        progress (for example by a signal-triggered ``shutdown``), the
        remaining strategies are not started.
        """
        self.running = True
        symbols = list(self.strategies)
        last_index = len(symbols) - 1
        for index, symbol in enumerate(symbols):
            if not self.running:
                logger.warning("Strategy startup aborted: stop requested")
                return
            self.start_strategy(symbol)
            if index < last_index and stagger_seconds > 0:
                time.sleep(stagger_seconds)
        logger.info("All strategies started")

    def stop_strategy(self, symbol):
        """Stop single currency strategy.

        Signals the strategy to stop, waits up to 10 seconds for its worker
        thread, then stops the strategy's dynamic stop-loss monitoring.
        Unknown symbols are ignored.
        """
        strategy = self.strategies.get(symbol)
        if strategy is None:
            return

        # First stop strategy
        strategy.stop_strategy()

        thread = self.threads.get(symbol)
        if thread is not None:
            thread.join(timeout=10)  # Wait 10 seconds
            if thread.is_alive():
                logger.warning(f"Unable to stop {symbol} strategy thread")
            else:
                # The worker normally removes its own entry when it ends, so
                # the key may already be gone; never raise KeyError here.
                if self.threads.get(symbol) is thread:
                    self.threads.pop(symbol, None)
                logger.info(f"Stopped {symbol} strategy")

        # Stop dynamic stop-loss monitoring
        strategy.stop_dynamic_stop_monitor()

    def stop_all_strategies(self):
        """Stop all strategies.

        All strategies are signalled first so they wind down concurrently;
        the worker threads are then joined one by one.
        """
        self.running = False
        # First send stop signal to all strategies
        for strategy in list(self.strategies.values()):
            strategy.stop_strategy()
        # Then stop threads (snapshot: workers remove themselves as they end)
        for symbol in list(self.threads):
            self.stop_strategy(symbol)
        logger.info("All strategies stopped")

    def get_status(self):
        """Get system status.

        Returns:
            A dict with ``'running'`` (bool), ``'active_strategies'`` (symbols
            with a registered worker thread) and ``'strategies_detail'``
            (per-symbol ``base_amount``, ``entry_price`` and ``config``).
        """
        status = {
            'running': self.running,
            'active_strategies': list(self.threads),
            'strategies_detail': {},
        }
        for symbol, strategy in self.strategies.items():
            status['strategies_detail'][symbol] = {
                'base_amount': strategy.base_amount,
                'entry_price': strategy.entry_price,
                'config': self.symbol_configs[symbol],
            }
        return status

    def shutdown(self):
        """System shutdown handling.

        Stops every strategy and its dynamic stop-loss monitoring and waits
        briefly for worker threads. Safe to call more than once: it is
        invoked both by the signal handlers and by ``atexit``, and only the
        first call after strategies were started does any work.

        NOTE(review): the signal handlers only call this method; whether the
        process exits afterwards depends on the caller's main loop, which is
        not part of this class.
        """
        if self._shutdown_done:
            return
        self._shutdown_done = True

        logger.info("System shutting down, stopping all strategies and monitoring...")
        try:
            self.stop_all_strategies()
        except Exception as e:
            # Keep going: the monitors below must still be stopped.
            logger.exception(f"Error stopping strategies during shutdown: {e}")

        # Stop all dynamic stop-loss monitoring
        for symbol, strategy in self.strategies.items():
            try:
                strategy.stop_dynamic_stop_monitor()
                logger.info(f"Stopped {symbol} dynamic stop-loss monitoring")
            except Exception as e:
                logger.error(f"Error stopping {symbol} dynamic stop-loss monitoring: {e}")

        # Wait for all threads to end
        for symbol, thread in list(self.threads.items()):
            if thread.is_alive():
                thread.join(timeout=5)  # Wait 5 seconds
                if thread.is_alive():
                    logger.warning(f"{symbol} strategy thread didn't exit normally")

        # Ensure all resources released
        time.sleep(1)
        logger.info("System shutdown completed")
        print("👋 System safely exited, thank you for using!")