from okx_client import OKXClient from market_db import MarketDB import json import logging import threading from collections import deque import time import signal latest_book = {'bids': [], 'asks': [], 'timestamp': None} book_history = deque() trade_history = deque() TRADE_HISTORY_SECONDS = 60 BOOK_HISTORY_SECONDS = 5 shutdown_flag = threading.Event() def connect(instrument, max_retries=5): logging.info(f"Connecting to OKX for instrument: {instrument}") retries = 0 backoff = 1 while not shutdown_flag.is_set(): try: client = OKXClient(authenticate=False) client.subscribe_trades(instrument) client.subscribe_book(instrument, depth=5, channel="books") logging.info(f"Subscribed to trades and book for {instrument}") return client except Exception as e: retries += 1 logging.error(f"Failed to connect to OKX: {e}. Retry {retries}/{max_retries} in {backoff}s.") if retries >= max_retries: logging.critical("Max retries reached. Exiting connect loop.") raise time.sleep(backoff) backoff = min(backoff * 2, 60) # exponential backoff, max 60s return None def cleanup(client, db): if client and hasattr(client, 'ws') and client.ws: try: client.ws.close() except Exception as e: logging.warning(f"Error closing websocket: {e}") if db: db.close() def signal_handler(signum, frame): logging.info(f"Received signal {signum}, shutting down...") shutdown_flag.set() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) def main(): instruments = [ "ETH-USDT", "BTC-USDT", "SOL-USDT", "DOGE-USDT", "TON-USDT", "ETH-USDC", "SOPH-USDT", "PEPE-USDT", "BTC-USDC", "UNI-USDT" ] logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') dbs = {} clients = {} try: for instrument in instruments: dbs[instrument] = MarketDB(market=instrument.replace("-", "_"), db_dir="./data/db") logging.info(f"Database initialized for {instrument}") clients[instrument] = connect(instrument) while not shutdown_flag.is_set(): for instrument in instruments: client = clients[instrument] db = dbs[instrument] try: data = client.ws.recv() except Exception as e: logging.warning(f"WebSocket disconnected or error for {instrument}: {e}. Reconnecting...") cleanup(client, None) try: clients[instrument] = connect(instrument) except Exception as e: logging.critical(f"Could not reconnect {instrument}: {e}. Skipping.") continue continue if shutdown_flag.is_set(): break if data == '': continue try: msg = json.loads(data) except Exception as e: logging.warning(f"Failed to parse JSON for {instrument}: {e}, data: {data}") continue if 'arg' in msg and msg['arg'].get('channel') == 'trades': for trade in msg.get('data', []): db.insert_trade({ 'instrument': instrument, 'trade_id': trade.get('tradeId'), 'price': float(trade.get('px')), 'size': float(trade.get('sz')), 'side': trade.get('side'), 'timestamp': trade.get('ts') }) ts = float(trade.get('ts', time.time() * 1000)) trade_history.append({ 'price': trade.get('px'), 'size': trade.get('sz'), 'side': trade.get('side'), 'timestamp': ts }) elif 'arg' in msg and msg['arg'].get('channel', '').startswith('books'): for book in msg.get('data', []): db.insert_book({ 'instrument': instrument, 'bids': book.get('bids'), 'asks': book.get('asks'), 'timestamp': book.get('ts') }) latest_book['bids'] = book.get('bids', []) latest_book['asks'] = book.get('asks', []) latest_book['timestamp'] = book.get('ts') ts = float(book.get('ts', time.time() * 1000)) book_history.append({ 'bids': book.get('bids', []), 'asks': book.get('asks', []), 'timestamp': ts }) else: logging.info(f"Unknown message for {instrument}: {msg}") except Exception as e: logging.critical(f"Fatal error in main: {e}") finally: for client in clients.values(): cleanup(client, None) for db in dbs.values(): cleanup(None, db) logging.info('Shutdown complete.') if __name__ == '__main__': main()