# Source: OKXTrading/multistrategyrunner.py (168 lines, 6.0 KiB, Python)
# Repository view timestamp: 2025-11-05 19:39:02 +08:00

import atexit
import logging
import signal
import time
import threading
from okxapiclient import OKXAPIClient
from deepseekanalyzer import DeepSeekAnalyzer
from riskmanager import RiskManager
from tradingstrategy import TradingStrategy
logger = logging.getLogger(__name__)


class MultiStrategyRunner:
    """Run one ``TradingStrategy`` per trading pair, each on its own daemon thread.

    The runner owns the shared API client, analyzer and risk manager, builds a
    strategy object for every entry in ``symbol_configs``, and keeps a
    ``symbol -> Thread`` map (``self.threads``) of the workers it has started.
    ``shutdown`` is registered with ``atexit`` and as the SIGINT/SIGTERM
    handler, so it may be invoked several times; it is idempotent.
    """

    # Class-level default so the guard in ``shutdown`` also works on instances
    # that were created without running ``__init__``.
    _shutdown_started = False

    def __init__(self):
        """Build the shared services, one strategy per symbol, and exit hooks."""
        self.running = False
        self.strategies = {}
        self.threads = {}
        # Configure each currency strategy. The per-symbol dict is passed
        # through to TradingStrategy unchanged.
        # NOTE(review): 'interval' is presumably in seconds -- confirm against
        # TradingStrategy.
        self.symbol_configs = {
            'ETH-USDT': {'name': 'Ethereum', 'interval': 1800, 'timeframe': '1H'},
            'BTC-USDT': {'name': 'Bitcoin', 'interval': 1800, 'timeframe': '1H'},
            'SOL-USDT': {'name': 'Solana', 'interval': 1800, 'timeframe': '1H'},
            'XRP-USDT': {'name': 'Ripple', 'interval': 1800, 'timeframe': '1H'},
            'BNB-USDT': {'name': 'Binance Coin', 'interval': 1800, 'timeframe': '1H'},
            'OKB-USDT': {'name': 'OKB', 'interval': 1800, 'timeframe': '1H'},
        }
        # Initialize API client and the services shared by every strategy
        self.api = OKXAPIClient()
        self.deepseek = DeepSeekAnalyzer()
        self.risk_manager = RiskManager(self.api)
        # Initialize all strategies
        for symbol, config in self.symbol_configs.items():
            self.strategies[symbol] = TradingStrategy(
                symbol, config, self.api, self.risk_manager, self.deepseek
            )
        # Register exit handlers
        atexit.register(self.shutdown)
        self._install_signal_handlers()

    def _install_signal_handlers(self) -> None:
        """Route SIGINT/SIGTERM to ``shutdown`` when that is possible."""
        for signum in (signal.SIGINT, signal.SIGTERM):
            try:
                signal.signal(signum, lambda s, f: self.shutdown())
            except ValueError:
                # signal.signal() raises ValueError when called outside the
                # main thread; the atexit hook still covers normal exit.
                logger.warning(
                    f"Could not install handler for signal {signum}; "
                    "relying on atexit for shutdown"
                )

    def start_strategy(self, symbol: str) -> bool:
        """Start the worker thread for ``symbol``.

        Returns:
            ``False`` if ``symbol`` has no configured strategy, ``True`` if a
            worker was started or one is already alive.
        """
        if symbol not in self.strategies:
            logger.error(f"Unsupported {symbol} trading pair")
            return False

        # Check if already running
        existing = self.threads.get(symbol)
        if existing is not None and existing.is_alive():
            logger.info(f"{symbol} strategy already running")
            return True

        def strategy_worker():
            """Run the strategy until it returns or raises, then deregister."""
            strategy = self.strategies[symbol]
            logger.info(f"Starting {symbol} strategy")
            try:
                # Directly run strategy, strategy has its own loop internally
                strategy.run()
            except Exception as e:
                # logger.exception keeps the traceback alongside the message.
                logger.exception(f"{symbol} strategy execution error: {e}")
            finally:
                logger.info(f"{symbol} strategy thread ended")
                # Only remove our own registration: after a restart the slot
                # may already belong to a newer thread. pop() tolerates the
                # entry having been removed by stop_strategy().
                if self.threads.get(symbol) is threading.current_thread():
                    self.threads.pop(symbol, None)

        # Create the thread and register it *before* starting it, so a worker
        # that finishes immediately cannot leave a stale entry behind.
        thread = threading.Thread(
            target=strategy_worker, name=f"Strategy-{symbol}", daemon=True
        )
        self.threads[symbol] = thread
        try:
            thread.start()
        except RuntimeError:
            # Never started: drop the registration so nobody tries to join it.
            self.threads.pop(symbol, None)
            raise
        logger.info(f"Started {symbol} strategy")
        return True

    def start_all_strategies(self, stagger_seconds: float = 60) -> None:
        """Start every configured strategy, pausing between consecutive starts.

        Args:
            stagger_seconds: Delay between two consecutive starts (default 60,
                the previously hard-coded value). No delay follows the last
                start.

        If ``self.running`` is cleared while waiting (for example by
        ``shutdown`` running from a signal handler), the remaining strategies
        are not started.
        """
        self.running = True
        symbols = list(self.strategies)
        last_index = len(symbols) - 1
        for index, symbol in enumerate(symbols):
            if not self.running:
                logger.warning("Startup interrupted, remaining strategies not started")
                return
            self.start_strategy(symbol)
            if index < last_index and stagger_seconds > 0:
                time.sleep(stagger_seconds)
        logger.info("All strategies started")

    def stop_strategy(self, symbol: str) -> None:
        """Signal the strategy for ``symbol`` to stop and wait for its thread.

        The thread is joined for at most 10 seconds; if it is still alive a
        warning is logged and its registration is kept. The strategy's dynamic
        stop-loss monitor is stopped afterwards in either case.
        """
        strategy = self.strategies.get(symbol)
        if strategy is not None:
            # First stop strategy
            strategy.stop_strategy()

        thread = self.threads.get(symbol)
        if thread is not None:
            thread.join(timeout=10)  # Wait 10 seconds
            if thread.is_alive():
                logger.warning(f"Unable to stop {symbol} strategy thread")
            else:
                # The worker normally deregisters itself before join() returns,
                # so the entry may already be gone -- never `del` here.
                self.threads.pop(symbol, None)
                logger.info(f"Stopped {symbol} strategy")

        # Stop dynamic stop-loss monitoring
        if strategy is not None:
            strategy.stop_dynamic_stop_monitor()

    def stop_all_strategies(self) -> None:
        """Stop every strategy: signal all of them first, then join the threads.

        A failure while stopping one strategy is logged and does not prevent
        the remaining strategies from being stopped.
        """
        self.running = False
        # First send stop signal to all strategies
        for symbol, strategy in list(self.strategies.items()):
            try:
                strategy.stop_strategy()
            except Exception as e:
                logger.exception(f"Error signalling {symbol} strategy to stop: {e}")
        # Then stop threads (snapshot: workers remove themselves concurrently)
        for symbol in list(self.threads):
            try:
                self.stop_strategy(symbol)
            except Exception as e:
                logger.exception(f"Error stopping {symbol} strategy: {e}")
        logger.info("All strategies stopped")

    def get_status(self) -> dict:
        """Return a snapshot of the runner state.

        Returns:
            A dict with ``running`` (bool), ``active_strategies`` (symbols that
            currently have a registered thread) and ``strategies_detail``
            (per-symbol ``base_amount``, ``entry_price`` and ``config``).
        """
        return {
            'running': self.running,
            'active_strategies': list(self.threads),
            'strategies_detail': {
                symbol: {
                    'base_amount': strategy.base_amount,
                    'entry_price': strategy.entry_price,
                    'config': self.symbol_configs[symbol],
                }
                for symbol, strategy in self.strategies.items()
            },
        }

    def shutdown(self) -> None:
        """Stop all strategies and monitors; safe to call more than once.

        This method is registered with ``atexit`` and as the SIGINT/SIGTERM
        handler, so a signal followed by interpreter exit would otherwise run
        the whole sequence twice. Only the first call does any work.
        """
        if self._shutdown_started:
            return
        self._shutdown_started = True

        logger.info("System shutting down, stopping all strategies and monitoring...")
        self.stop_all_strategies()
        # Stop all dynamic stop-loss monitoring
        for symbol, strategy in self.strategies.items():
            try:
                strategy.stop_dynamic_stop_monitor()
                logger.info(f"Stopped {symbol} dynamic stop-loss monitoring")
            except Exception as e:
                logger.error(f"Error stopping {symbol} dynamic stop-loss monitoring: {e}")
        # Wait for all threads to end (only those that did not stop in time
        # are still registered at this point)
        for symbol, thread in list(self.threads.items()):
            if thread.is_alive():
                thread.join(timeout=5)  # Wait 5 seconds
                if thread.is_alive():
                    logger.warning(f"{symbol} strategy thread didn't exit normally")
        # Ensure all resources released
        time.sleep(1)
        logger.info("System shutdown completed")
        print("👋 System safely exited, thank you for using!")