168 lines
6.0 KiB
Python
168 lines
6.0 KiB
Python
import atexit
|
|
import logging
|
|
import signal
|
|
import time
|
|
import threading
|
|
|
|
from okxapiclient import OKXAPIClient
|
|
from deepseekanalyzer import DeepSeekAnalyzer
|
|
from riskmanager import RiskManager
|
|
from tradingstrategy import TradingStrategy
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MultiStrategyRunner:
    """Run one ``TradingStrategy`` per trading pair, each in its own daemon thread.

    The runner builds the shared API client, analyzer and risk manager, creates
    a strategy object for every entry in ``symbol_configs``, and keeps a
    registry (``self.threads``) mapping symbol -> worker thread.

    Timing knobs are class attributes so they can be tuned (or overridden on an
    instance) without editing method bodies.
    """

    # Pause between launching consecutive strategies in start_all_strategies().
    STARTUP_STAGGER_SECONDS = 60
    # How long stop_strategy() waits for a worker thread to finish.
    STOP_JOIN_TIMEOUT_SECONDS = 10
    # How long shutdown() waits for each worker thread that is still alive.
    SHUTDOWN_JOIN_TIMEOUT_SECONDS = 5
    # Final pause at the end of shutdown().
    SHUTDOWN_GRACE_SECONDS = 1

    def __init__(self):
        """Create clients and strategies, then register exit/signal handlers."""
        self.running = False
        self.strategies = {}
        self.threads = {}
        # Guards shutdown() against running twice (signal handler + atexit).
        self._shutdown_started = False

        # Configure each currency strategy.
        # NOTE(review): 'interval' is presumably seconds between cycles and
        # 'timeframe' the candle size - confirm against TradingStrategy.
        self.symbol_configs = {
            'ETH-USDT': {'name': 'Ethereum', 'interval': 1800, 'timeframe': '1H'},
            'BTC-USDT': {'name': 'Bitcoin', 'interval': 1800, 'timeframe': '1H'},
            'SOL-USDT': {'name': 'Solana', 'interval': 1800, 'timeframe': '1H'},
            'XRP-USDT': {'name': 'Ripple', 'interval': 1800, 'timeframe': '1H'},
            'BNB-USDT': {'name': 'Binance Coin', 'interval': 1800, 'timeframe': '1H'},
            'OKB-USDT': {'name': 'OKB', 'interval': 1800, 'timeframe': '1H'},
        }

        # Initialize API client and the collaborators shared by all strategies
        self.api = OKXAPIClient()
        self.deepseek = DeepSeekAnalyzer()
        self.risk_manager = RiskManager(self.api)

        # Initialize all strategies
        for symbol, config in self.symbol_configs.items():
            self.strategies[symbol] = TradingStrategy(
                symbol, config, self.api, self.risk_manager, self.deepseek
            )

        # Register exit handlers
        atexit.register(self.shutdown)
        self._install_signal_handlers()

    def _install_signal_handlers(self):
        """Route SIGINT/SIGTERM to shutdown() where the interpreter allows it."""
        for signum in (signal.SIGINT, signal.SIGTERM):
            try:
                signal.signal(signum, lambda s, f: self.shutdown())
            except ValueError:
                # signal.signal() may only be called from the main thread;
                # the atexit hook still covers normal interpreter exit.
                logger.warning(
                    f"Could not install handler for signal {signum}; "
                    "runner was not created in the main thread"
                )

    def _forget_thread(self, symbol, thread):
        """Drop the registry entry for *symbol* only if it still is *thread*."""
        if self.threads.get(symbol) is thread:
            self.threads.pop(symbol, None)

    def start_strategy(self, symbol):
        """Start single currency strategy.

        Returns:
            False when *symbol* has no configured strategy, True when a worker
            thread was started or one is already alive for that symbol.

        Raises:
            RuntimeError: propagated from ``Thread.start()`` if the thread
                cannot be started; the registry entry is rolled back first.
        """
        if symbol not in self.strategies:
            logger.error(f"Unsupported {symbol} trading pair")
            return False

        # Check if already running
        existing = self.threads.get(symbol)
        if existing is not None and existing.is_alive():
            logger.info(f"{symbol} strategy already running")
            return True

        def strategy_worker():
            """Strategy worker thread"""
            strategy = self.strategies[symbol]
            logger.info(f"Starting {symbol} strategy")

            try:
                # Directly run strategy, strategy has its own loop internally
                strategy.run()
            except Exception as e:
                logger.exception(f"{symbol} strategy execution error: {e}")
            finally:
                logger.info(f"{symbol} strategy thread ended")
                # Remove only this thread's own entry: a newer thread may have
                # been registered for the symbol, or a stopper may already
                # have removed the entry.
                self._forget_thread(symbol, threading.current_thread())

        # Create and start thread
        thread = threading.Thread(
            target=strategy_worker, name=f"Strategy-{symbol}", daemon=True
        )
        # Register BEFORE starting so a worker that finishes immediately can
        # clean up its own entry instead of leaving a stale dead thread behind.
        self.threads[symbol] = thread
        self._shutdown_started = False
        try:
            thread.start()
        except RuntimeError:
            self._forget_thread(symbol, thread)
            raise

        logger.info(f"Started {symbol} strategy")
        return True

    def start_all_strategies(self):
        """Start all strategies, pausing STARTUP_STAGGER_SECONDS between launches.

        The pause is skipped after the last strategy, and the launch sequence
        is abandoned if ``self.running`` is cleared (e.g. by a stop/shutdown
        request) while it is in progress.
        """
        self.running = True

        symbols = list(self.strategies)
        last_index = len(symbols) - 1
        for index, symbol in enumerate(symbols):
            if not self.running:
                logger.warning(
                    "Startup interrupted by a stop request; "
                    "remaining strategies were not started"
                )
                return

            self.start_strategy(symbol)
            if index < last_index:
                # Each currency starts with a staggered interval
                time.sleep(self.STARTUP_STAGGER_SECONDS)

        logger.info("All strategies started")

    def stop_strategy(self, symbol):
        """Stop single currency strategy.

        Signals the strategy to stop, waits up to STOP_JOIN_TIMEOUT_SECONDS for
        its worker thread, then stops its dynamic stop-loss monitoring.
        Unknown symbols are ignored.
        """
        strategy = self.strategies.get(symbol)

        # First stop strategy
        if strategy is not None:
            strategy.stop_strategy()

        thread = self.threads.get(symbol)
        if thread is not None:
            if thread.is_alive():
                thread.join(timeout=self.STOP_JOIN_TIMEOUT_SECONDS)

            if thread.is_alive():
                logger.warning(f"Unable to stop {symbol} strategy thread")
            else:
                # The worker normally removes its own entry on exit, so this
                # must tolerate the key already being gone.
                self._forget_thread(symbol, thread)
                logger.info(f"Stopped {symbol} strategy")

        # Stop dynamic stop-loss monitoring
        if strategy is not None:
            strategy.stop_dynamic_stop_monitor()

    def stop_all_strategies(self):
        """Stop all strategies.

        Best-effort: a failure while stopping one strategy is logged and does
        not prevent the remaining strategies from being stopped.
        """
        self.running = False

        # First send stop signal to all strategies
        for symbol, strategy in list(self.strategies.items()):
            try:
                strategy.stop_strategy()
            except Exception as e:
                logger.exception(f"Error signalling {symbol} strategy to stop: {e}")

        # Then stop threads
        for symbol in list(self.threads):
            try:
                self.stop_strategy(symbol)
            except Exception as e:
                logger.exception(f"Error stopping {symbol} strategy: {e}")

        logger.info("All strategies stopped")

    def get_status(self):
        """Get system status.

        Returns:
            dict with ``running`` (bool), ``active_strategies`` (symbols whose
            worker thread is currently alive) and ``strategies_detail``
            (per-symbol ``base_amount``, ``entry_price`` and ``config``).
        """
        # Snapshot the registry first: worker threads mutate it as they exit.
        active = [
            symbol
            for symbol, thread in list(self.threads.items())
            if thread.is_alive()
        ]

        return {
            'running': self.running,
            'active_strategies': active,
            'strategies_detail': {
                symbol: {
                    'base_amount': strategy.base_amount,
                    'entry_price': strategy.entry_price,
                    'config': self.symbol_configs[symbol],
                }
                for symbol, strategy in self.strategies.items()
            },
        }

    def shutdown(self):
        """System shutdown handling.

        Idempotent: it is wired to atexit and to SIGINT/SIGTERM, so it can be
        invoked more than once; repeat calls are no-ops until a strategy is
        started again.
        """
        if getattr(self, "_shutdown_started", False):
            return
        self._shutdown_started = True

        logger.info("System shutting down, stopping all strategies and monitoring...")
        self.stop_all_strategies()

        # Stop all dynamic stop-loss monitoring (also covers strategies that
        # never had a worker thread)
        for symbol, strategy in self.strategies.items():
            try:
                strategy.stop_dynamic_stop_monitor()
                logger.info(f"Stopped {symbol} dynamic stop-loss monitoring")
            except Exception as e:
                logger.exception(f"Error stopping {symbol} dynamic stop-loss monitoring: {e}")

        # Wait for all threads to end
        for symbol, thread in list(self.threads.items()):
            if thread.is_alive():
                thread.join(timeout=self.SHUTDOWN_JOIN_TIMEOUT_SECONDS)
                if thread.is_alive():
                    logger.warning(f"{symbol} strategy thread didn't exit normally")

        # Ensure all resources released
        time.sleep(self.SHUTDOWN_GRACE_SECONDS)
        logger.info("System shutdown completed")
        print("👋 System safely exited, thank you for using!")
|