294 lines
11 KiB
Python
294 lines
11 KiB
Python
from okx_client import OKXClient
|
|
from market_db import MarketDB
|
|
import json
|
|
import logging
|
|
import threading
|
|
from collections import deque
|
|
import time
|
|
import signal
|
|
import queue
|
|
|
|
# Memory tuning flags (set to True only if you need in-memory histories or latest snapshot)
ENABLE_HISTORY = False
STORE_LATEST_BOOK = False

# In-memory mirrors that handle_message() fills when the flags above are on.
# They are shared by every instrument (not kept per instrument).
latest_book = {'bids': [], 'asks': [], 'timestamp': None}
book_history = deque(maxlen=200)  # Keep the last 200 book updates
trade_history = deque(maxlen=500)  # Keep the last 500 trades

# NOTE(review): neither constant is read anywhere in this module; confirm no
# external consumer relies on them before removing.
TRADE_HISTORY_SECONDS = 60
BOOK_HISTORY_SECONDS = 5

# Set by the signal handler to ask every polling/retry loop to stop.
shutdown_flag = threading.Event()

# Send a keep-alive ping after this many seconds without traffic. OKX drops
# connections that stay silent for 30 seconds, so this must be < 30.
PING_INTERVAL = 25
# How long to wait for the pong reply after a ping before reconnecting.
PONG_TIMEOUT = 25

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
    handlers=[
        logging.FileHandler('market_data_collector.log', mode='a'),
        logging.StreamHandler()
    ]
)
|
|
|
|
def connect(instrument, max_retries=5):
    """Open a public OKX client and subscribe it to trades and the order book.

    Failed attempts are retried with exponential backoff (1s, 2s, 4s, ...,
    capped at 60s). The backoff wait ends early if ``shutdown_flag`` is set.

    Args:
        instrument: Instrument id to subscribe to, e.g. ``"BTC-USDT"``.
        max_retries: Number of failed attempts tolerated before giving up.

    Returns:
        The subscribed ``OKXClient``, or ``None`` if ``shutdown_flag`` was set
        before a connection succeeded.

    Raises:
        Exception: the error from the final attempt, once ``max_retries``
        attempts have failed.
    """
    logging.info(f"Attempting to connect to OKX for instrument: {instrument}")
    retries = 0
    backoff = 1
    while not shutdown_flag.is_set():
        client = None
        try:
            logging.info(f"Creating OKXClient for {instrument}")
            client = OKXClient(authenticate=False)
            logging.info(f"Subscribing to trades for {instrument}")
            client.subscribe_trades(instrument)
            logging.info(f"Subscribing to book for {instrument}")
            client.subscribe_book(instrument, depth=5, channel="books")
            logging.info(f"Successfully connected and subscribed for {instrument}")
            return client
        except Exception as e:
            retries += 1
            # A subscribe failure can leave the client's websocket open; close it
            # so repeated retries do not leak connections.
            cleanup(client, None)
            if retries >= max_retries:
                logging.error(f"Failed to connect to OKX for {instrument}: {e}. Attempt {retries}/{max_retries}.")
                logging.critical(f"Max retries reached for {instrument}. Exiting connect loop.")
                raise
            logging.error(f"Failed to connect to OKX for {instrument}: {e}. Retry {retries}/{max_retries} in {backoff}s.")
            # Event.wait returns as soon as a shutdown is requested, unlike time.sleep.
            shutdown_flag.wait(backoff)
            backoff = min(backoff * 2, 60)  # exponential backoff, max 60s
    return None
|
|
|
|
def cleanup(client, db):
    """Release a client's websocket and/or a database handle.

    Either argument may be ``None``, and a client without a ``ws`` attribute is
    ignored. Errors raised while closing are logged as warnings instead of
    propagated. This lets callers that release many resources in a row keep
    going after one of them fails.
    """
    ws = getattr(client, 'ws', None) if client else None
    if ws:
        try:
            ws.close()
        except Exception as e:
            logging.warning(f"Error closing websocket: {e}")
    if db:
        try:
            db.close()
        except Exception as e:
            logging.warning(f"Error closing database: {e}")
|
|
|
|
def signal_handler(signum, frame):
    """Handle SIGINT/SIGTERM by requesting a graceful shutdown.

    This only sets ``shutdown_flag``. The loops that poll that flag end their
    work and perform the actual teardown.
    """
    logging.info(f"Received signal {signum}, shutting down...")
    shutdown_flag.set()


# Ctrl-C and a service manager's TERM both go through the same shutdown path.
for _sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_sig, signal_handler)
del _sig
|
|
|
|
def _release_instrument(instrument, dbs, clients):
    """Close and remove whatever database and client ``instrument`` has in ``dbs``/``clients``."""
    db = dbs.pop(instrument, None)
    if db is not None:
        try:
            db.close()
        except Exception as cleanup_e:
            logging.error(f"Error during cleanup for {instrument}: {cleanup_e}")
    if instrument in clients:
        client = clients.pop(instrument)
        try:
            cleanup(client, None)
        except Exception as cleanup_e:
            logging.error(f"Error during client cleanup for {instrument}: {cleanup_e}")


def initialize_instruments(instruments):
    """Open a ``MarketDB`` and a subscribed OKX client for each instrument.

    Args:
        instruments: Iterable of instrument ids.

    Returns:
        ``(dbs, clients, last_msg_time, ping_sent, pong_queue)``: dicts keyed by
        instrument. An instrument whose connection was abandoned (``connect``
        returned ``None``) is left out of both ``dbs`` and ``clients``.

    Raises:
        Exception: re-raises the first initialization error. Before raising,
        every database and client opened by this call is closed, because the
        caller never receives the dicts it would need to clean them up.
    """
    dbs = {}
    clients = {}
    last_msg_time = {}
    ping_sent = {}
    pong_queue = {}

    for instrument in instruments:
        try:
            logging.info(f"Initializing database for {instrument}")
            dbs[instrument] = MarketDB(market=instrument, db_dir="./data/db")
            logging.info(f"Successfully initialized database for {instrument}")

            logging.info(f"Establishing websocket connection for {instrument}")
            client = connect(instrument)
            if not client:
                logging.error(f"Failed to establish connection for {instrument}")
                # Do not leave an open DB or a None client for the main loop to trip over.
                _release_instrument(instrument, dbs, clients)
                continue
            clients[instrument] = client

            last_msg_time[instrument] = time.time()
            ping_sent[instrument] = False
            pong_queue[instrument] = queue.Queue()
            logging.info(f"Completed initialization for {instrument}")
        except Exception as e:
            logging.error(f"Failed to initialize {instrument}: {e}")
            # Release this instrument and every earlier one (order-preserving, de-duplicated).
            for opened in list(dict.fromkeys([*dbs, *clients])):
                _release_instrument(opened, dbs, clients)
            raise  # Re-raise the exception to handle it in main()

    return dbs, clients, last_msg_time, ping_sent, pong_queue
|
|
|
|
def _to_epoch_ms(raw_ts, now):
    """Convert an exchange ``ts`` field to float milliseconds.

    If the field is missing or not numeric, fall back to ``now`` (a
    ``time.time()`` value in seconds) scaled to milliseconds.
    """
    try:
        return float(raw_ts)
    except (TypeError, ValueError):
        return now * 1000


def handle_message(data, instrument, db, trade_history, book_history, latest_book, pong_queue):
    """Decode one websocket frame for ``instrument`` and persist any market data in it.

    Trades are written with ``db.insert_trade`` and book snapshots with
    ``db.insert_book``. When the module flags ``ENABLE_HISTORY`` or
    ``STORE_LATEST_BOOK`` are on, the data is also copied into
    ``trade_history``/``book_history``/``latest_book``. The caller may share
    those containers across instruments, so each entry carries an
    ``'instrument'`` key.

    Args:
        data: Raw frame as returned by ``ws.recv()``.
        instrument: Instrument the frame was received for.
        db: Object providing ``insert_trade(dict)`` and ``insert_book(dict)``.
        trade_history: Deque-like sink for trade entries (``append``).
        book_history: Deque-like sink for book entries (``append``).
        latest_book: Dict overwritten with the most recent book snapshot.
        pong_queue: Mapping instrument -> ``queue.Queue``. ``True`` is queued
            for every pong reply.

    Returns:
        ``(processed, now)``. ``processed`` is False for frames with no market
        data (pong replies, unparseable or non-object JSON) and True otherwise.
        ``now`` is the ``time.time()`` at which the frame was handled.
        Malformed trade/book records are logged and skipped, not raised.
    """
    now = time.time()

    # OKX answers the plain-text "ping" keep-alive with a plain-text "pong",
    # which is not JSON, so detect it before parsing.
    if data in ('pong', b'pong'):
        pong_queue[instrument].put(True)
        return False, now

    try:
        msg = json.loads(data)
    except Exception as e:
        logging.warning(f"Failed to parse JSON for {instrument}: {e}, data: {data}")
        return False, now

    if not isinstance(msg, dict):
        logging.warning(f"Unexpected non-object message for {instrument}: {msg}")
        return False, now

    # JSON-form pong event, still honoured alongside the plain-text form.
    if msg.get('event') == 'pong':
        pong_queue[instrument].put(True)
        return False, now

    arg = msg.get('arg')
    channel = (arg.get('channel') or '') if isinstance(arg, dict) else ''
    records = msg.get('data') or []  # tolerate a missing or null "data"

    if channel == 'trades':
        for trade in records:
            if not isinstance(trade, dict):
                logging.warning(f"Skipping malformed trade for {instrument}: {trade}")
                continue
            try:
                price = float(trade.get('px'))
                size = float(trade.get('sz'))
            except (TypeError, ValueError) as e:
                # Skip just this record. Raising would be treated as a websocket
                # error by the caller and force a reconnect.
                logging.warning(f"Skipping malformed trade for {instrument}: {trade} ({e})")
                continue
            db.insert_trade({
                'instrument': instrument,
                'trade_id': trade.get('tradeId'),
                'price': price,
                'size': size,
                'side': trade.get('side'),
                'timestamp': trade.get('ts')
            })
            if ENABLE_HISTORY:
                trade_history.append({
                    'instrument': instrument,
                    'price': trade.get('px'),
                    'size': trade.get('sz'),
                    'side': trade.get('side'),
                    'timestamp': _to_epoch_ms(trade.get('ts'), now)
                })
    elif channel.startswith('books'):
        for book in records:
            if not isinstance(book, dict):
                logging.warning(f"Skipping malformed book update for {instrument}: {book}")
                continue
            db.insert_book({
                'instrument': instrument,
                'bids': book.get('bids'),
                'asks': book.get('asks'),
                'timestamp': book.get('ts')
            })
            if STORE_LATEST_BOOK:
                latest_book['bids'] = book.get('bids', [])
                latest_book['asks'] = book.get('asks', [])
                latest_book['timestamp'] = book.get('ts')
                latest_book['instrument'] = instrument
            if ENABLE_HISTORY:
                book_history.append({
                    'instrument': instrument,
                    'bids': book.get('bids', []),
                    'asks': book.get('asks', []),
                    'timestamp': _to_epoch_ms(book.get('ts'), now)
                })
    elif msg.get('event') == 'error':
        logging.warning(f"Error event for {instrument}: {msg}")
    else:
        logging.info(f"Unknown message for {instrument}: {msg}")
    return True, now
|
|
|
|
def handle_ping_pong(ws, instrument, last_msg_time, ping_sent, pong_queue):
    """Advance the websocket keep-alive for ``instrument`` by one step, without blocking.

    Call this on every pass of the main loop. That loop is also the only reader
    of ``ws``, so pong replies can only arrive through handle_message(), which
    queues them on ``pong_queue[instrument]``. Blocking here would stop that
    from ever happening. Each call does one of the following:

    * consume any queued pong and mark the connection alive;
    * send a plain-text ``'ping'`` once the instrument has been silent for more
      than ``PING_INTERVAL`` seconds;
    * report failure once a sent ping has gone unanswered for roughly
      ``PONG_TIMEOUT`` more seconds.

    Args:
        ws: Open websocket for the instrument (must provide ``send``).
        instrument: Key into the bookkeeping dicts.
        last_msg_time: instrument -> ``time.time()`` of the last sign of life.
        ping_sent: instrument -> whether a ping is awaiting its pong.
        pong_queue: instrument -> ``queue.Queue`` fed by handle_message().

    Returns:
        False if the caller should reconnect (the ping could not be sent, or no
        pong arrived in time). True otherwise.
    """
    now = time.time()

    # Drain every pong queued since the previous call; one proves liveness.
    got_pong = False
    while True:
        try:
            pong_queue[instrument].get_nowait()
        except queue.Empty:
            break
        got_pong = True
    if got_pong:
        if ping_sent[instrument]:
            logging.info(f"Pong received from {instrument}")
        last_msg_time[instrument] = now
        ping_sent[instrument] = False
        return True

    idle = now - last_msg_time[instrument]

    if ping_sent[instrument]:
        # last_msg_time is not updated when the ping goes out, so the ping was
        # sent at about last_msg_time + PING_INTERVAL. Allow PONG_TIMEOUT past that.
        if idle > PING_INTERVAL + PONG_TIMEOUT:
            logging.warning(f"No pong from {instrument}: Pong not received in time. Reconnecting...")
            return False
        return True

    if idle > PING_INTERVAL:
        try:
            # OKX's keep-alive is the bare string "ping" (not JSON); it replies "pong".
            ws.send('ping')
        except Exception as e:
            logging.warning(f"Failed to send ping to {instrument}: {e}. Reconnecting...")
            return False
        ping_sent[instrument] = True
        logging.info(f"Sent ping to {instrument}")

    return True
|
|
|
|
def reconnect_instrument(instrument, clients, last_msg_time, ping_sent, pong_queue):
    """Close ``instrument``'s current websocket and try to open a fresh one.

    On success the new client replaces the old one in ``clients`` and the
    keep-alive bookkeeping (``last_msg_time``, ``ping_sent``, ``pong_queue``)
    is reset. On failure the old entry is left in ``clients``, so a later
    main-loop pass will try again.

    Returns:
        True if a new client is in place, False otherwise. This includes the
        case where ``connect`` returns ``None`` because a shutdown was requested.
    """
    cleanup(clients.get(instrument), None)
    try:
        client = connect(instrument)
    except Exception as e:
        logging.critical(f"Could not reconnect {instrument}: {e}. Skipping.")
        return False
    if client is None:
        # Storing None as the client would crash the next pass on `.ws`.
        logging.info(f"Reconnect for {instrument} abandoned: shutdown requested.")
        return False
    clients[instrument] = client
    last_msg_time[instrument] = time.time()
    ping_sent[instrument] = False
    pong_queue[instrument] = queue.Queue()
    return True
|
|
|
|
def shutdown_cleanup(clients, dbs):
    """Close every websocket client and database at shutdown.

    Each resource is released on its own: a failure is logged, and the
    remaining clients and databases are still closed. ``None`` is accepted
    for either mapping.
    """
    for instrument, client in (clients or {}).items():
        try:
            cleanup(client, None)
        except Exception as e:
            logging.error(f"Error closing client for {instrument}: {e}")
    for instrument, db in (dbs or {}).items():
        try:
            cleanup(None, db)
        except Exception as e:
            logging.error(f"Error closing database for {instrument}: {e}")
    logging.info('Shutdown complete.')
|
|
|
|
def _recv_or_none(ws):
    """Read one frame from ``ws``. Return None if nothing arrives within 1 second."""
    ws.settimeout(1)
    try:
        return ws.recv()
    except Exception as e:
        # The websocket library may raise its own timeout type instead of
        # TimeoutError, so also recognise a timeout by its "timed out" message.
        if isinstance(e, TimeoutError) or 'timed out' in str(e):
            return None
        raise


def _poll_instrument(instrument, clients, db, last_msg_time, ping_sent, pong_queue):
    """Service one instrument for a single main-loop pass.

    Reads at most one frame, passes it to handle_message(), runs the keep-alive
    step, and reconnects on any websocket error other than a read timeout.
    """
    ws = getattr(clients.get(instrument), 'ws', None)
    if not ws:
        logging.error(f"No websocket connection for {instrument}. Attempting to reconnect.")
        if not reconnect_instrument(instrument, clients, last_msg_time, ping_sent, pong_queue):
            return
        ws = getattr(clients.get(instrument), 'ws', None)
        if not ws:
            # Still nothing usable; try again on the next pass.
            return

    try:
        data = _recv_or_none(ws)
        if shutdown_flag.is_set():
            return

        if data:
            processed, msg_time = handle_message(
                data, instrument, db, trade_history, book_history, latest_book, pong_queue
            )
            if processed:
                # Use the time the frame was handled, not the start of the pass
                # (each recv in the pass may block up to 1s).
                last_msg_time[instrument] = msg_time
                ping_sent[instrument] = False

        # --- Ping/Pong Keepalive Logic ---
        if not handle_ping_pong(ws, instrument, last_msg_time, ping_sent, pong_queue):
            reconnect_instrument(instrument, clients, last_msg_time, ping_sent, pong_queue)
    except Exception as e:
        logging.warning(f"WebSocket error for {instrument}: {e}. Attempting to reconnect...")
        reconnect_instrument(instrument, clients, last_msg_time, ping_sent, pong_queue)


def main():
    """Run the collector until ``shutdown_flag`` is set.

    Initializes a database and an OKX client for each configured instrument,
    then polls them in turn, one frame per instrument per pass. The flag is set
    by the SIGINT/SIGTERM handler. All clients and databases are closed on exit,
    including when initialization fails.
    """
    instruments = [
        "ETH-USDT",
        "BTC-USDT",
        "BTC-USDT-SWAP"
    ]

    # Bound before the try so the finally clause always has something to clean,
    # even when initialize_instruments() raises before returning.
    dbs, clients = {}, {}

    try:
        logging.info("Starting Market Data Collector")
        logging.info(f"Initializing instruments: {instruments}")
        dbs, clients, last_msg_time, ping_sent, pong_queue = initialize_instruments(instruments)

        if not dbs or not clients:
            logging.critical("Failed to initialize all required components. Exiting.")
            return

        # Instruments missing from either dict never get added later, so decide
        # once rather than warning on every pass.
        active = [i for i in instruments if i in clients and i in dbs]
        for instrument in instruments:
            if instrument not in active:
                logging.warning(f"Skipping {instrument} - not properly initialized")

        logging.info("Successfully initialized all components. Starting main loop.")

        while not shutdown_flag.is_set():
            for instrument in active:
                if shutdown_flag.is_set():
                    break
                _poll_instrument(instrument, clients, dbs[instrument], last_msg_time, ping_sent, pong_queue)

    except Exception as e:
        logging.critical(f"Fatal error in main: {e}", exc_info=True)
    finally:
        logging.info("Shutting down Market Data Collector")
        shutdown_cleanup(clients, dbs)


if __name__ == '__main__':
    main()
|