153 lines
5.7 KiB
Python
153 lines
5.7 KiB
Python
from okx_client import OKXClient
|
|
from market_db import MarketDB
|
|
import json
|
|
import logging
|
|
import threading
|
|
from collections import deque
|
|
import time
|
|
import signal
|
|
|
|
# History window lengths, in seconds (units per the constant names).
BOOK_HISTORY_SECONDS = 5
TRADE_HISTORY_SECONDS = 60

# Event used to request shutdown; set by the signal handler and polled by
# the connect and main loops.
shutdown_flag = threading.Event()

# Most recent order-book snapshot written by main().
latest_book = dict(bids=[], asks=[], timestamp=None)

# In-memory records of the trades and book snapshots appended by main().
trade_history = deque()
book_history = deque()
|
|
|
|
def connect(instrument, max_retries=5):
    """Open an OKX client subscribed to trades and the book of *instrument*.

    Attempts are retried with exponential backoff (starting at 1s, doubling,
    capped at 60s) until one succeeds, ``max_retries`` attempts have failed,
    or shutdown has been requested through ``shutdown_flag``.

    Args:
        instrument: Instrument identifier passed to the subscribe calls.
        max_retries: Number of failed attempts after which the error of the
            last attempt is re-raised.

    Returns:
        The subscribed ``OKXClient``, or ``None`` when ``shutdown_flag`` is
        set before a connection could be established.

    Raises:
        Exception: Whatever the final attempt raised, once ``max_retries``
            attempts have failed.
    """
    logging.info(f"Connecting to OKX for instrument: {instrument}")
    retries = 0
    backoff = 1
    while not shutdown_flag.is_set():
        client = None
        try:
            client = OKXClient(authenticate=False)
            client.subscribe_trades(instrument)
            client.subscribe_book(instrument, depth=5, channel="books")
        except Exception as e:
            retries += 1
            logging.error(f"Failed to connect to OKX: {e}. Retry {retries}/{max_retries} in {backoff}s.")
            # A client whose subscription failed is discarded; close its
            # websocket (if it got that far) so retries do not leak sockets.
            ws = getattr(client, 'ws', None)
            if ws is not None:
                try:
                    ws.close()
                except Exception as close_error:
                    logging.warning(f"Error closing websocket: {close_error}")
            if retries >= max_retries:
                logging.critical("Max retries reached. Exiting connect loop.")
                raise
            # Event.wait() returns as soon as shutdown is requested, so a
            # pending backoff (up to 60s) cannot delay process exit.
            shutdown_flag.wait(backoff)
            backoff = min(backoff * 2, 60)  # exponential backoff, max 60s
        else:
            logging.info(f"Subscribed to trades and book for {instrument}")
            return client
    return None
|
|
|
|
def cleanup(client, db):
    """Close a client's websocket and/or a database handle, best-effort.

    Either argument may be ``None`` to skip that resource. Errors raised by
    either ``close()`` call are logged as warnings instead of propagating,
    so a caller tearing down several resources in a loop is not interrupted
    by one failing handle.

    Args:
        client: Object exposing a ``ws`` attribute with a ``close()``
            method, or ``None``.
        db: Object with a ``close()`` method, or ``None``.
    """
    # Compare against None rather than relying on truthiness: a handle
    # whose type defines __len__/__bool__ must still be closed.
    ws = getattr(client, 'ws', None)
    if ws is not None:
        try:
            ws.close()
        except Exception as e:
            logging.warning(f"Error closing websocket: {e}")
    if db is not None:
        try:
            db.close()
        except Exception as e:
            logging.warning(f"Error closing database: {e}")
|
|
|
|
def signal_handler(signum, frame):
    """Request shutdown in response to a signal.

    Logs the received signal number and sets ``shutdown_flag``. The
    ``frame`` argument is part of the handler signature and is not used.
    """
    message = f"Received signal {signum}, shutting down..."
    logging.info(message)
    shutdown_flag.set()
|
|
|
|
# Route both SIGINT and SIGTERM to signal_handler.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
del _signum  # keep the loop variable out of the module namespace
|
|
|
|
def _prune_history(history, window_seconds, now_ms):
    """Drop entries at the left of *history* older than the window.

    Entry timestamps are in milliseconds (the fallback used when a record
    has no ``ts`` is ``time.time() * 1000``), so the window is scaled by 1000.
    """
    cutoff = now_ms - window_seconds * 1000
    while history and history[0]['timestamp'] < cutoff:
        history.popleft()


def _record_trades(instrument, db, trades):
    """Store each trade in *db* and append it to ``trade_history``."""
    for trade in trades:
        # Convert every numeric field before writing anything, so a
        # malformed record is skipped as a whole instead of being half-stored.
        try:
            price = float(trade.get('px'))
            size = float(trade.get('sz'))
            ts = float(trade.get('ts', time.time() * 1000))
        except (AttributeError, TypeError, ValueError) as e:
            logging.warning(f"Skipping malformed trade for {instrument}: {e}, data: {trade}")
            continue
        db.insert_trade({
            'instrument': instrument,
            'trade_id': trade.get('tradeId'),
            'price': price,
            'size': size,
            'side': trade.get('side'),
            'timestamp': trade.get('ts'),
        })
        trade_history.append({
            'price': trade.get('px'),
            'size': trade.get('sz'),
            'side': trade.get('side'),
            'timestamp': ts,
        })
        _prune_history(trade_history, TRADE_HISTORY_SECONDS, ts)


def _record_books(instrument, db, books):
    """Store each book snapshot in *db* and update the in-memory book state."""
    for book in books:
        try:
            ts = float(book.get('ts', time.time() * 1000))
        except (AttributeError, TypeError, ValueError) as e:
            logging.warning(f"Skipping malformed book for {instrument}: {e}, data: {book}")
            continue
        db.insert_book({
            'instrument': instrument,
            'bids': book.get('bids'),
            'asks': book.get('asks'),
            'timestamp': book.get('ts'),
        })
        # NOTE(review): latest_book and both history deques are shared by
        # every instrument, so they interleave data from all streams.
        latest_book['bids'] = book.get('bids', [])
        latest_book['asks'] = book.get('asks', [])
        latest_book['timestamp'] = book.get('ts')
        book_history.append({
            'bids': book.get('bids', []),
            'asks': book.get('asks', []),
            'timestamp': ts,
        })
        _prune_history(book_history, BOOK_HISTORY_SECONDS, ts)


def _handle_message(instrument, db, msg):
    """Dispatch one decoded websocket message according to its channel."""
    # Valid JSON is not necessarily an object; anything without a usable
    # 'arg' mapping falls through to the "unknown message" branch.
    arg = msg.get('arg') if isinstance(msg, dict) else None
    channel = arg.get('channel') if isinstance(arg, dict) else None
    if channel == 'trades':
        _record_trades(instrument, db, msg.get('data') or [])
    elif isinstance(channel, str) and channel.startswith('books'):
        _record_books(instrument, db, msg.get('data') or [])
    else:
        logging.info(f"Unknown message for {instrument}: {msg}")


def main():
    """Collect trades and order books for a fixed set of instruments.

    Opens one ``MarketDB`` and one OKX client per instrument, then reads one
    websocket message per instrument in turn until ``shutdown_flag`` is set.
    Each message is stored in that instrument's database and reflected in
    the module-level ``latest_book``, ``trade_history`` and ``book_history``;
    the two histories are trimmed to ``TRADE_HISTORY_SECONDS`` and
    ``BOOK_HISTORY_SECONDS``. A receive error triggers a reconnect for that
    instrument. Any other error is logged as fatal and ends the loop. All
    clients and databases are closed on the way out.
    """
    instruments = [
        "ETH-USDT",
        "BTC-USDT",
        "SOL-USDT",
        "DOGE-USDT",
        "TON-USDT",
        "ETH-USDC",
        "SOPH-USDT",
        "PEPE-USDT",
        "BTC-USDC",
        "UNI-USDT",
    ]

    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
    dbs = {}
    clients = {}
    try:
        for instrument in instruments:
            dbs[instrument] = MarketDB(market=instrument.replace("-", "_"), db_dir="./data/db")
            logging.info(f"Database initialized for {instrument}")
            clients[instrument] = connect(instrument)

        # NOTE(review): recv() is called for one instrument at a time; if it
        # blocks until a message arrives, a quiet instrument delays the
        # others. Confirm the client's timeout behaviour.
        while not shutdown_flag.is_set():
            for instrument in instruments:
                client = clients[instrument]
                db = dbs[instrument]
                try:
                    data = client.ws.recv()
                except Exception as e:
                    if shutdown_flag.is_set():
                        # Do not start a reconnect while shutting down.
                        break
                    logging.warning(f"WebSocket disconnected or error for {instrument}: {e}. Reconnecting...")
                    cleanup(client, None)
                    try:
                        clients[instrument] = connect(instrument)
                    except Exception as reconnect_error:
                        # The stale client stays registered, so the next
                        # pass fails on recv() and retries the reconnect.
                        logging.critical(f"Could not reconnect {instrument}: {reconnect_error}. Skipping.")
                    continue

                if shutdown_flag.is_set():
                    break
                if not data:
                    continue

                try:
                    msg = json.loads(data)
                except (TypeError, ValueError) as e:
                    logging.warning(f"Failed to parse JSON for {instrument}: {e}, data: {data}")
                    continue

                _handle_message(instrument, db, msg)
    except Exception as e:
        # exc_info keeps the traceback; the message alone rarely locates the fault.
        logging.critical(f"Fatal error in main: {e}", exc_info=True)
    finally:
        # Attempt to close every handle even if closing one of them fails.
        for client in clients.values():
            try:
                cleanup(client, None)
            except Exception as e:
                logging.warning(f"Error during client cleanup: {e}")
        for db in dbs.values():
            try:
                cleanup(None, db)
            except Exception as e:
                logging.warning(f"Error during database cleanup: {e}")
        logging.info('Shutdown complete.')
|
|
|
|
# Start the collector only when this file is executed as a script.
if __name__ == '__main__':
    main()
|