#!/usr/bin/env python3 """ Multi-Pair Divergence Live Trading Bot. Trades the top 10 cryptocurrency pairs based on spread divergence using a universal ML model for signal generation. Usage: # Run with demo account (default) uv run python -m live_trading.multi_pair.main # Run with specific settings uv run python -m live_trading.multi_pair.main --max-position 500 --leverage 2 """ import argparse import logging import signal import sys import time from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from live_trading.okx_client import OKXClient from live_trading.position_manager import PositionManager from live_trading.multi_pair.config import ( OKXConfig, MultiPairLiveConfig, PathConfig, get_multi_pair_config ) from live_trading.multi_pair.data_feed import MultiPairDataFeed, TradingPair from live_trading.multi_pair.strategy import LiveMultiPairStrategy def setup_logging(log_dir: Path) -> logging.Logger: """Configure logging for the trading bot.""" log_file = log_dir / "multi_pair_live.log" logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler(sys.stdout), ], force=True ) return logging.getLogger(__name__) @dataclass class PositionState: """Track current position state for multi-pair.""" pair: TradingPair | None = None pair_id: str | None = None direction: str | None = None entry_price: float = 0.0 size: float = 0.0 stop_loss: float = 0.0 take_profit: float = 0.0 entry_time: datetime | None = None class MultiPairLiveTradingBot: """ Main trading bot for multi-pair divergence strategy. Coordinates data fetching, pair scoring, and order execution. """ def __init__( self, okx_config: OKXConfig, trading_config: MultiPairLiveConfig, path_config: PathConfig ): self.okx_config = okx_config self.trading_config = trading_config self.path_config = path_config self.logger = logging.getLogger(__name__) self.running = True # Initialize components self.logger.info("Initializing multi-pair trading bot...") # Create OKX client with adapted config self._adapted_trading_config = self._adapt_config_for_okx_client() self.okx_client = OKXClient(okx_config, self._adapted_trading_config) # Initialize data feed self.data_feed = MultiPairDataFeed( self.okx_client, trading_config, path_config ) # Initialize position manager (reuse from single-pair) self.position_manager = PositionManager( self.okx_client, self._adapted_trading_config, path_config ) # Initialize strategy self.strategy = LiveMultiPairStrategy(trading_config, path_config) # Current position state self.position = PositionState() # Register signal handlers signal.signal(signal.SIGINT, self._handle_shutdown) signal.signal(signal.SIGTERM, self._handle_shutdown) self._print_startup_banner() # Sync with exchange positions on startup self._sync_position_from_exchange() def _adapt_config_for_okx_client(self): """Create config compatible with OKXClient.""" # OKXClient expects specific attributes @dataclass class AdaptedConfig: eth_symbol: str = "ETH/USDT:USDT" btc_symbol: str = "BTC/USDT:USDT" timeframe: str = "1h" candles_to_fetch: int = 500 max_position_usdt: float = -1.0 min_position_usdt: float = 10.0 leverage: int = 1 margin_mode: str = "cross" stop_loss_pct: float = 0.06 take_profit_pct: float = 0.05 max_concurrent_positions: int = 1 z_entry_threshold: float = 1.0 z_window: int = 24 model_prob_threshold: float = 0.5 funding_threshold: float = 0.0005 sleep_seconds: int = 3600 slippage_pct: float = 0.001 adapted = AdaptedConfig() adapted.timeframe = self.trading_config.timeframe adapted.candles_to_fetch = self.trading_config.candles_to_fetch adapted.max_position_usdt = self.trading_config.max_position_usdt adapted.min_position_usdt = self.trading_config.min_position_usdt adapted.leverage = self.trading_config.leverage adapted.margin_mode = self.trading_config.margin_mode adapted.max_concurrent_positions = self.trading_config.max_concurrent_positions adapted.sleep_seconds = self.trading_config.sleep_seconds adapted.slippage_pct = self.trading_config.slippage_pct return adapted def _print_startup_banner(self) -> None: """Print startup information.""" mode = "DEMO/SANDBOX" if self.okx_config.demo_mode else "LIVE" print("=" * 60) print(" Multi-Pair Divergence Strategy - Live Trading Bot") print("=" * 60) print(f" Mode: {mode}") print(f" Assets: {len(self.trading_config.assets)} assets") print(f" Pairs: {self.trading_config.get_pair_count()} pairs") print(f" Timeframe: {self.trading_config.timeframe}") print(f" Max Position: ${self.trading_config.max_position_usdt if self.trading_config.max_position_usdt > 0 else 'All available'}") print(f" Leverage: {self.trading_config.leverage}x") print(f" Z-Entry: > {self.trading_config.z_entry_threshold}") print(f" Prob Threshold: > {self.trading_config.prob_threshold}") print(f" Cycle Interval: {self.trading_config.sleep_seconds // 60} minutes") print("=" * 60) print(f" Assets: {', '.join([a.split('/')[0] for a in self.trading_config.assets])}") print("=" * 60) if not self.okx_config.demo_mode: print("\n *** WARNING: LIVE TRADING MODE - REAL FUNDS AT RISK ***\n") def _handle_shutdown(self, signum, frame) -> None: """Handle shutdown signals gracefully.""" self.logger.info("Shutdown signal received, stopping...") self.running = False def _sync_position_from_exchange(self) -> bool: """ Sync internal position state with exchange positions. Checks for existing open positions on the exchange and updates internal state to match. This prevents stacking positions when the bot is restarted. Returns: True if a position was synced, False otherwise """ try: positions = self.okx_client.get_positions() if not positions: if self.position.pair is not None: # Position was closed externally (e.g., SL/TP hit) self.logger.info( "Position %s was closed externally, resetting state", self.position.pair.name if self.position.pair else "unknown" ) self.position = PositionState() return False # Check each position against our tradeable assets our_assets = set(self.trading_config.assets) for pos in positions: pos_symbol = pos.get('symbol', '') contracts = abs(float(pos.get('contracts', 0))) if contracts == 0: continue # Check if this position is for one of our assets if pos_symbol not in our_assets: continue # Found a position for one of our assets side = pos.get('side', 'long') entry_price = float(pos.get('entryPrice', 0)) unrealized_pnl = float(pos.get('unrealizedPnl', 0)) # If we already track this position, just update if (self.position.pair is not None and self.position.pair.base_asset == pos_symbol): self.logger.debug( "Position already tracked: %s %s %.2f contracts", side, pos_symbol, contracts ) return True # New position found - sync it # Find or create a TradingPair for this position matched_pair = None for pair in self.data_feed.pairs: if pair.base_asset == pos_symbol: matched_pair = pair break if matched_pair is None: # Create a placeholder pair (we don't know the quote asset) matched_pair = TradingPair( base_asset=pos_symbol, quote_asset="UNKNOWN" ) # Calculate approximate SL/TP based on config defaults sl_pct = self.trading_config.base_sl_pct tp_pct = self.trading_config.base_tp_pct if side == 'long': stop_loss = entry_price * (1 - sl_pct) take_profit = entry_price * (1 + tp_pct) else: stop_loss = entry_price * (1 + sl_pct) take_profit = entry_price * (1 - tp_pct) self.position = PositionState( pair=matched_pair, pair_id=matched_pair.pair_id, direction=side, entry_price=entry_price, size=contracts, stop_loss=stop_loss, take_profit=take_profit, entry_time=None # Unknown for synced positions ) self.logger.info( "Synced existing position from exchange: %s %s %.4f @ %.4f (PnL: %.2f)", side.upper(), pos_symbol, contracts, entry_price, unrealized_pnl ) return True # No matching positions found if self.position.pair is not None: self.logger.info( "Position %s no longer exists on exchange, resetting state", self.position.pair.name ) self.position = PositionState() return False except Exception as e: self.logger.error("Failed to sync position from exchange: %s", e) return False def run_trading_cycle(self) -> None: """ Execute one trading cycle. 1. Sync position state with exchange 2. Fetch latest market data for all assets 3. Calculate features for all pairs 4. Score pairs and find best opportunity 5. Check exit conditions for current position 6. Execute trades if needed """ cycle_start = datetime.now(timezone.utc) self.logger.info("--- Trading Cycle Start: %s ---", cycle_start.isoformat()) try: # 1. Sync position state with exchange (detect SL/TP closures) self._sync_position_from_exchange() # 2. Fetch all market data pair_features = self.data_feed.get_latest_data() if pair_features is None: self.logger.warning("No market data available, skipping cycle") return # 2. Check exit conditions for current position if self.position.pair is not None: exit_signal = self.strategy.check_exit_signal( pair_features, self.position.pair_id ) if exit_signal['action'] == 'exit': self._execute_exit(exit_signal) else: # Check SL/TP current_price = self.data_feed.get_current_price( self.position.pair.base_asset ) if current_price: sl_tp_exit = self._check_sl_tp(current_price) if sl_tp_exit: self._execute_exit({'reason': sl_tp_exit}) # 3. Generate entry signal if no position if self.position.pair is None: entry_signal = self.strategy.generate_signal( pair_features, self.data_feed.pairs ) if entry_signal['action'] == 'entry': self._execute_entry(entry_signal) # 4. Log status if self.position.pair: self.logger.info( "Position: %s %s, entry=%.4f, current PnL check pending", self.position.direction, self.position.pair.name, self.position.entry_price ) else: self.logger.info("No open position") except Exception as e: self.logger.error("Trading cycle error: %s", e, exc_info=True) cycle_duration = (datetime.now(timezone.utc) - cycle_start).total_seconds() self.logger.info("--- Cycle completed in %.1fs ---", cycle_duration) def _check_sl_tp(self, current_price: float) -> str | None: """Check stop-loss and take-profit levels.""" if self.position.direction == 'long': if current_price <= self.position.stop_loss: return f"stop_loss ({current_price:.4f} <= {self.position.stop_loss:.4f})" if current_price >= self.position.take_profit: return f"take_profit ({current_price:.4f} >= {self.position.take_profit:.4f})" else: # short if current_price >= self.position.stop_loss: return f"stop_loss ({current_price:.4f} >= {self.position.stop_loss:.4f})" if current_price <= self.position.take_profit: return f"take_profit ({current_price:.4f} <= {self.position.take_profit:.4f})" return None def _execute_entry(self, signal: dict) -> None: """Execute entry trade.""" pair = signal['pair'] symbol = pair.base_asset # Trade the base asset direction = signal['direction'] self.logger.info( "Entry signal: %s %s (z=%.2f, p=%.2f, score=%.3f)", direction.upper(), pair.name, signal['z_score'], signal['probability'], signal['divergence_score'] ) # Get account balance try: balance = self.okx_client.get_balance() available_usdt = balance['free'] except Exception as e: self.logger.error("Could not get balance: %s", e) return # Calculate position size size_usdt = self.strategy.calculate_position_size( signal['divergence_score'], available_usdt ) if size_usdt <= 0: self.logger.info("Position size too small, skipping entry") return current_price = signal['base_price'] size_asset = size_usdt / current_price # Calculate SL/TP stop_loss, take_profit = self.strategy.calculate_sl_tp( current_price, direction, signal['atr'], signal['atr_pct'] ) self.logger.info( "Executing %s entry: %.6f %s @ %.4f ($%.2f), SL=%.4f, TP=%.4f", direction.upper(), size_asset, symbol.split('/')[0], current_price, size_usdt, stop_loss, take_profit ) try: # Place market order order_side = "buy" if direction == "long" else "sell" order = self.okx_client.place_market_order(symbol, order_side, size_asset) filled_price = order.get('average') or order.get('price') or current_price filled_amount = order.get('filled') or order.get('amount') or size_asset if filled_price is None or filled_price == 0: filled_price = current_price if filled_amount is None or filled_amount == 0: filled_amount = size_asset # Recalculate SL/TP with filled price stop_loss, take_profit = self.strategy.calculate_sl_tp( filled_price, direction, signal['atr'], signal['atr_pct'] ) # Update position state self.position = PositionState( pair=pair, pair_id=pair.pair_id, direction=direction, entry_price=filled_price, size=filled_amount, stop_loss=stop_loss, take_profit=take_profit, entry_time=datetime.now(timezone.utc) ) self.logger.info( "Position opened: %s %s %.6f @ %.4f", direction.upper(), pair.name, filled_amount, filled_price ) # Try to set SL/TP on exchange try: self.okx_client.set_stop_loss_take_profit( symbol, direction, filled_amount, stop_loss, take_profit ) except Exception as e: self.logger.warning("Could not set SL/TP on exchange: %s", e) except Exception as e: self.logger.error("Order execution failed: %s", e, exc_info=True) def _execute_exit(self, signal: dict) -> None: """Execute exit trade.""" if self.position.pair is None: return symbol = self.position.pair.base_asset reason = signal.get('reason', 'unknown') self.logger.info( "Exit signal: %s %s, reason: %s", self.position.direction, self.position.pair.name, reason ) try: # Close position on exchange self.okx_client.close_position(symbol) self.logger.info( "Position closed: %s %s", self.position.direction, self.position.pair.name ) # Reset position state self.position = PositionState() except Exception as e: self.logger.error("Exit execution failed: %s", e, exc_info=True) def run(self) -> None: """Main trading loop.""" self.logger.info("Starting multi-pair trading loop...") while self.running: try: self.run_trading_cycle() if self.running: sleep_seconds = self.trading_config.sleep_seconds minutes = sleep_seconds // 60 self.logger.info("Sleeping for %d minutes...", minutes) for _ in range(sleep_seconds): if not self.running: break time.sleep(1) except KeyboardInterrupt: self.logger.info("Keyboard interrupt received") break except Exception as e: self.logger.error("Unexpected error in main loop: %s", e, exc_info=True) time.sleep(60) self.logger.info("Shutting down...") self.logger.info("Shutdown complete") def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser( description="Multi-Pair Divergence Live Trading Bot" ) parser.add_argument( "--max-position", type=float, default=None, help="Maximum position size in USDT" ) parser.add_argument( "--leverage", type=int, default=None, help="Trading leverage (1-125)" ) parser.add_argument( "--interval", type=int, default=None, help="Trading cycle interval in seconds" ) parser.add_argument( "--live", action="store_true", help="Use live trading mode (requires OKX_DEMO_MODE=false)" ) return parser.parse_args() def main(): """Main entry point.""" args = parse_args() # Load configuration okx_config, trading_config, path_config = get_multi_pair_config() # Apply command line overrides if args.max_position is not None: trading_config.max_position_usdt = args.max_position if args.leverage is not None: trading_config.leverage = args.leverage if args.interval is not None: trading_config.sleep_seconds = args.interval if args.live: okx_config.demo_mode = False # Setup logging logger = setup_logging(path_config.logs_dir) try: # Validate config okx_config.validate() # Create and run bot bot = MultiPairLiveTradingBot(okx_config, trading_config, path_config) bot.run() except ValueError as e: logger.error("Configuration error: %s", e) sys.exit(1) except Exception as e: logger.error("Fatal error: %s", e, exc_info=True) sys.exit(1) if __name__ == "__main__": main()