# 2025-10-14 12:41:58 +08:00

# 294 lines
# 11 KiB
# Python

from okx_client import OKXClient
from market_db import MarketDB
import json
import logging
import threading
from collections import deque
import time
import signal
import queue
# Memory tuning flags (set to True only if you need in-memory histories or the latest snapshot).
ENABLE_HISTORY = False
STORE_LATEST_BOOK = False

# Capacities of the in-memory ring buffers (only filled when ENABLE_HISTORY is True).
BOOK_HISTORY_MAXLEN = 200
TRADE_HISTORY_MAXLEN = 500

# Most recent order-book snapshot (only updated when STORE_LATEST_BOOK is True).
latest_book = {'bids': [], 'asks': [], 'timestamp': None}
book_history = deque(maxlen=BOOK_HISTORY_MAXLEN)  # last 200 book updates; oldest dropped first
trade_history = deque(maxlen=TRADE_HISTORY_MAXLEN)  # last 500 trades; oldest dropped first

# NOTE(review): these time windows look unused by the collector loop — confirm before relying on them.
TRADE_HISTORY_SECONDS = 60
BOOK_HISTORY_SECONDS = 5

# Set by the SIGINT/SIGTERM handler; the connect retry loop and the main loop poll it.
shutdown_flag = threading.Event()

# Websocket keepalive: ping after PING_INTERVAL idle seconds, reconnect if no pong
# arrives within PONG_TIMEOUT more seconds. The < 30 bound presumably reflects
# OKX's 30 s idle disconnect.
PING_INTERVAL = 25  # seconds, must be < 30
PONG_TIMEOUT = 25  # seconds to wait for a pong after a ping

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
    handlers=[
        logging.FileHandler('market_data_collector.log', mode='a'),
        logging.StreamHandler()
    ]
)
def connect(instrument, max_retries=5):
    """Create an unauthenticated OKX client subscribed to trades and books for one instrument.

    Failed attempts are retried with exponential backoff (1 s, doubling, capped
    at 60 s). The backoff wait ends immediately when ``shutdown_flag`` is set.

    Args:
        instrument: OKX instrument id, e.g. ``"BTC-USDT"``.
        max_retries: Number of failed attempts after which the last error is re-raised.

    Returns:
        The subscribed client, or ``None`` if shutdown was requested before a
        connection succeeded.

    Raises:
        Exception: The last error from client creation/subscription once
        ``max_retries`` attempts have failed.
    """
    logging.info(f"Attempting to connect to OKX for instrument: {instrument}")
    retries = 0
    backoff = 1
    while not shutdown_flag.is_set():
        client = None
        try:
            logging.info(f"Creating OKXClient for {instrument}")
            client = OKXClient(authenticate=False)
            logging.info(f"Subscribing to trades for {instrument}")
            client.subscribe_trades(instrument)
            logging.info(f"Subscribing to book for {instrument}")
            client.subscribe_book(instrument, depth=5, channel="books")
            logging.info(f"Successfully connected and subscribed for {instrument}")
            return client
        except Exception as e:
            # If construction succeeded but a subscription failed, the client may
            # already hold an open websocket; close it so retries don't leak sockets.
            if client is not None:
                cleanup(client, None)
            retries += 1
            logging.error(f"Failed to connect to OKX for {instrument}: {e}. Retry {retries}/{max_retries} in {backoff}s.")
            if retries >= max_retries:
                logging.critical(f"Max retries reached for {instrument}. Exiting connect loop.")
                raise
            # Event.wait returns True as soon as shutdown is requested, whereas
            # time.sleep would hold up shutdown for the whole backoff period.
            if shutdown_flag.wait(backoff):
                break
            backoff = min(backoff * 2, 60)  # exponential backoff, max 60s
    return None
def cleanup(client, db):
    """Best-effort release of a websocket client and/or a database handle.

    Either argument may be ``None``. Errors while closing are logged as
    warnings rather than raised, so one failing resource cannot stop a caller
    (such as a shutdown sweep) from releasing the others.

    Args:
        client: Object whose ``ws`` attribute (if present and truthy) has ``close()``, or None.
        db: Object with ``close()``, or None.
    """
    ws = getattr(client, 'ws', None)
    if ws:
        try:
            ws.close()
        except Exception as e:
            logging.warning(f"Error closing websocket: {e}")
    if db:
        try:
            db.close()
        except Exception as e:
            logging.warning(f"Error closing database: {e}")
def signal_handler(signum, frame):
    """Turn an OS termination signal into a graceful-shutdown request.

    Logs the signal number and sets ``shutdown_flag``; the loops that poll the
    flag are responsible for actually winding down.
    """
    logging.info(f"Received signal {signum}, shutting down...")
    shutdown_flag.set()


# Ctrl-C (SIGINT) and kill/service-stop (SIGTERM) share the same handler.
for _sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_sig, signal_handler)
del _sig
def _release_instrument(instrument, dbs, clients):
    """Close and remove ``instrument``'s database and client, logging (not raising) errors."""
    db = dbs.pop(instrument, None)
    if db is not None:
        try:
            db.close()
        except Exception as cleanup_e:
            logging.error(f"Error during cleanup for {instrument}: {cleanup_e}")
    client = clients.pop(instrument, None)
    if client is not None:
        try:
            cleanup(client, None)
        except Exception as cleanup_e:
            logging.error(f"Error during client cleanup for {instrument}: {cleanup_e}")


def initialize_instruments(instruments):
    """Open a MarketDB and a subscribed websocket client for each instrument.

    Args:
        instruments: Iterable of OKX instrument ids (e.g. ``"BTC-USDT"``).

    Returns:
        ``(dbs, clients, last_msg_time, ping_sent, pong_queue)``: dicts keyed by
        instrument. Only instruments whose DB *and* client were both set up
        appear in them. An instrument whose ``connect()`` returned no client is
        dropped entirely, and its DB is closed.

    Raises:
        Exception: Whatever ``MarketDB``/``connect()`` raised. Before re-raising,
        every DB and client opened so far (for all instruments) is closed,
        because the caller never receives the dicts and could not release them.
    """
    dbs = {}
    clients = {}
    last_msg_time = {}
    ping_sent = {}
    pong_queue = {}
    for instrument in instruments:
        try:
            logging.info(f"Initializing database for {instrument}")
            dbs[instrument] = MarketDB(market=instrument, db_dir="./data/db")
            logging.info(f"Successfully initialized database for {instrument}")
            logging.info(f"Establishing websocket connection for {instrument}")
            client = connect(instrument)
            if not client:
                logging.error(f"Failed to establish connection for {instrument}")
                # Drop the instrument entirely so callers only ever see fully
                # initialized instruments (never a None client).
                _release_instrument(instrument, dbs, clients)
                continue
            clients[instrument] = client
            last_msg_time[instrument] = time.time()
            ping_sent[instrument] = False
            pong_queue[instrument] = queue.Queue()
            logging.info(f"Completed initialization for {instrument}")
        except Exception as e:
            logging.error(f"Failed to initialize {instrument}: {e}")
            # The caller never receives these dicts when we raise, so release every
            # DB/client opened so far, earlier instruments included.
            for opened in list(dict.fromkeys([*dbs, *clients])):
                _release_instrument(opened, dbs, clients)
            raise  # Re-raise the exception to handle it in main()
    return dbs, clients, last_msg_time, ping_sent, pong_queue
def _store_trades(msg, instrument, db, trade_history):
    """Insert each trade in ``msg['data']`` into ``db``; mirror it into ``trade_history`` if ENABLE_HISTORY."""
    for trade in msg.get('data') or []:
        db.insert_trade({
            'instrument': instrument,
            'trade_id': trade.get('tradeId'),
            'price': float(trade.get('px')),
            'size': float(trade.get('sz')),
            'side': trade.get('side'),
            'timestamp': trade.get('ts')
        })
        if ENABLE_HISTORY:
            ts = float(trade.get('ts', time.time() * 1000))
            trade_history.append({
                'price': trade.get('px'),
                'size': trade.get('sz'),
                'side': trade.get('side'),
                'timestamp': ts
            })


def _store_books(msg, instrument, db, book_history, latest_book):
    """Insert each book snapshot in ``msg['data']`` into ``db``; optionally cache/mirror it in memory."""
    for book in msg.get('data') or []:
        db.insert_book({
            'instrument': instrument,
            'bids': book.get('bids'),
            'asks': book.get('asks'),
            'timestamp': book.get('ts')
        })
        if STORE_LATEST_BOOK:
            latest_book['bids'] = book.get('bids', [])
            latest_book['asks'] = book.get('asks', [])
            latest_book['timestamp'] = book.get('ts')
        if ENABLE_HISTORY:
            ts = float(book.get('ts', time.time() * 1000))
            book_history.append({
                'bids': book.get('bids', []),
                'asks': book.get('asks', []),
                'timestamp': ts
            })


def handle_message(data, instrument, db, trade_history, book_history, latest_book, pong_queue):
    """Parse one websocket frame for ``instrument`` and persist any market data in it.

    Args:
        data: Raw frame as returned by ``ws.recv()``.
        instrument: Instrument id the frame was received for.
        db: Database exposing ``insert_trade`` / ``insert_book``.
        trade_history: Deque appended to for trades when ENABLE_HISTORY is True.
        book_history: Deque appended to for books when ENABLE_HISTORY is True.
        latest_book: Dict overwritten with the newest book when STORE_LATEST_BOOK is True.
        pong_queue: Dict of per-instrument ``queue.Queue``; a pong puts ``True`` on it.

    Returns:
        ``(processed, now)``: ``processed`` is False for pongs and for frames that
        are not a JSON object, True otherwise. ``now`` is ``time.time()`` taken on entry.
    """
    now = time.time()
    # OKX answers a plain-text "ping" with a plain-text "pong", which is not JSON.
    if data in ('pong', b'pong'):
        pong_queue[instrument].put(True)
        return False, now
    try:
        msg = json.loads(data)
    except Exception as e:
        logging.warning(f"Failed to parse JSON for {instrument}: {e}, data: {data}")
        return False, now
    if not isinstance(msg, dict):
        logging.warning(f"Unexpected non-object message for {instrument}: {msg}")
        return False, now
    # A JSON pong event is accepted as well.
    if msg.get('event') == 'pong':
        pong_queue[instrument].put(True)
        return False, now
    # Surface server-side error events at warning level instead of as "unknown".
    if msg.get('event') == 'error':
        logging.warning(f"Error event for {instrument}: {msg}")
        return True, now
    arg = msg.get('arg')
    channel = arg.get('channel') if isinstance(arg, dict) else None
    if channel == 'trades':
        _store_trades(msg, instrument, db, trade_history)
    elif isinstance(channel, str) and channel.startswith('books'):
        _store_books(msg, instrument, db, book_history, latest_book)
    else:
        logging.info(f"Unknown message for {instrument}: {msg}")
    return True, now
def handle_ping_pong(ws, instrument, last_msg_time, ping_sent, pong_queue):
    """Run one non-blocking step of the websocket keepalive for ``instrument``.

    Meant to be called on every main-loop visit:

    - If a pong has been queued (by ``handle_message``), mark the connection
      alive: refresh ``last_msg_time`` and clear ``ping_sent``.
    - Otherwise, if no ping is outstanding and the connection has been idle for
      more than PING_INTERVAL seconds, send OKX's plain-text ``"ping"``.
    - If a ping is outstanding and the idle time exceeds
      PING_INTERVAL + PONG_TIMEOUT, report the connection as dead.

    This must never block waiting for the pong: pongs are queued by
    ``handle_message``, which runs on this same thread only after this function
    returns.

    Returns:
        True if the connection is alive or a pong is still awaited; False if the
        ping could not be sent or the pong is overdue (the caller should reconnect).
    """
    pending = pong_queue[instrument]
    got_pong = False
    while True:
        try:
            pending.get_nowait()
        except queue.Empty:
            break
        got_pong = True
    now = time.time()
    if got_pong:
        logging.info(f"Pong received from {instrument}")
        last_msg_time[instrument] = now
        ping_sent[instrument] = False
        return True
    idle = now - last_msg_time[instrument]
    if not ping_sent[instrument]:
        if idle > PING_INTERVAL:
            try:
                ws.send('ping')
            except Exception as e:
                logging.warning(f"Failed to send ping to {instrument}: {e}. Reconnecting...")
                return False
            ping_sent[instrument] = True
            logging.info(f"Sent ping to {instrument}")
        return True
    # A ping is outstanding; it was sent on the first visit after idle > PING_INTERVAL.
    if idle > PING_INTERVAL + PONG_TIMEOUT:
        logging.warning(f"No pong from {instrument}: Pong not received in time. Reconnecting...")
        return False
    return True
def reconnect_instrument(instrument, clients, last_msg_time, ping_sent, pong_queue):
    """Close ``instrument``'s current websocket and replace it with a fresh subscribed client.

    On success, the keepalive state is reset: ``last_msg_time`` becomes now,
    ``ping_sent`` becomes False, and ``pong_queue`` gets a new empty queue.

    Returns:
        True on success. False if ``connect()`` raised after exhausting its
        retries, or returned no client (e.g. shutdown was requested). On failure
        the previous (already closed) client entry is left in ``clients``.
    """
    cleanup(clients.get(instrument), None)
    try:
        new_client = connect(instrument)
    except Exception as e:
        logging.critical(f"Could not reconnect {instrument}: {e}. Skipping.")
        return False
    if not new_client:
        # Never store a None client: callers dereference clients[instrument].ws.
        logging.info(f"Reconnect for {instrument} aborted: no client returned.")
        return False
    clients[instrument] = new_client
    last_msg_time[instrument] = time.time()
    ping_sent[instrument] = False
    pong_queue[instrument] = queue.Queue()
    return True
def shutdown_cleanup(clients, dbs):
    """Close every websocket client, then every database, and log completion.

    Each resource is closed independently: an error on one is logged and the
    sweep continues, so the remaining connections and databases still get closed.

    Args:
        clients: Mapping of instrument -> client (values may be None).
        dbs: Mapping of instrument -> database handle.
    """
    for instrument, client in clients.items():
        try:
            cleanup(client, None)
        except Exception as e:
            logging.error(f"Error closing client for {instrument}: {e}")
    for instrument, db in dbs.items():
        try:
            cleanup(None, db)
        except Exception as e:
            logging.error(f"Error closing database for {instrument}: {e}")
    logging.info('Shutdown complete.')
def _recv_or_none(ws):
    """Return the next frame from ``ws``, or None if the read timed out.

    Any other receive error is re-raised for the caller to handle.
    """
    try:
        return ws.recv()
    except Exception as e:
        # The websocket library may report a read timeout with its own exception
        # type instead of TimeoutError, so also match on the message text.
        if isinstance(e, TimeoutError) or 'timed out' in str(e):
            return None
        raise


def main():
    """Collect OKX trades and order-book updates into per-instrument MarketDBs.

    Round-robins over the instruments on a single thread. On each pass it reads
    at most one frame per instrument (1 s receive timeout), runs the keepalive
    step, and reconnects on websocket errors. Runs until ``shutdown_flag`` is
    set, then closes every client and database.
    """
    instruments = [
        "ETH-USDT",
        "BTC-USDT",
        "BTC-USDT-SWAP"
    ]
    # Bound before the try so the finally clause can always run, even when
    # initialize_instruments raises before returning anything.
    dbs, clients = {}, {}
    try:
        logging.info("Starting Market Data Collector")
        logging.info(f"Initializing instruments: {instruments}")
        dbs, clients, last_msg_time, ping_sent, pong_queue = initialize_instruments(instruments)
        if not dbs or not clients:
            logging.critical("Failed to initialize all required components. Exiting.")
            return
        logging.info("Successfully initialized all components. Starting main loop.")
        while not shutdown_flag.is_set():
            for instrument in instruments:
                if instrument not in clients or instrument not in dbs:
                    logging.warning(f"Skipping {instrument} - not properly initialized")
                    continue
                db = dbs[instrument]
                # getattr: tolerate a missing/None client instead of crashing outside the try.
                ws = getattr(clients[instrument], 'ws', None)
                if not ws:
                    logging.error(f"No websocket connection for {instrument}. Attempting to reconnect.")
                    if not reconnect_instrument(instrument, clients, last_msg_time, ping_sent, pong_queue):
                        continue
                    ws = clients[instrument].ws
                try:
                    ws.settimeout(1)
                    data = _recv_or_none(ws)
                    if shutdown_flag.is_set():
                        break
                    if data:
                        processed, msg_time = handle_message(
                            data, instrument, db, trade_history, book_history, latest_book, pong_queue
                        )
                        if processed:
                            # Use the time this frame was handled; a timestamp taken at
                            # the start of the pass can be seconds stale after earlier
                            # instruments' receive timeouts.
                            last_msg_time[instrument] = msg_time
                            ping_sent[instrument] = False
                    # --- Ping/Pong Keepalive Logic ---
                    if not handle_ping_pong(ws, instrument, last_msg_time, ping_sent, pong_queue):
                        reconnect_instrument(instrument, clients, last_msg_time, ping_sent, pong_queue)
                except Exception as e:
                    logging.warning(f"WebSocket error for {instrument}: {e}. Attempting to reconnect...")
                    reconnect_instrument(instrument, clients, last_msg_time, ping_sent, pong_queue)
    except Exception as e:
        logging.critical(f"Fatal error in main: {e}", exc_info=True)
    finally:
        logging.info("Shutting down Market Data Collector")
        shutdown_cleanup(clients, dbs)


if __name__ == '__main__':
    main()