MarketDataCollector/okx_client.py
Vasily.onl 14272ab6eb - uv for package management
- improve websocket a bit
- updated database for daily rotation and locking threads, errors handling
2025-06-09 12:51:10 +08:00

277 lines
11 KiB
Python

import os
import time
import hmac
import hashlib
import base64
import json
import pandas as pd
import threading
import requests
import websocket
import logging
class OKXClient:
    """Small OKX v5 client: websocket subscriptions plus a few REST calls.

    A public websocket is opened on construction. When ``authenticate`` is
    true, API credentials are read from ``../credentials/okx_creds.json``
    (relative to this file), a private websocket is opened, and a login
    request is sent on it.

    Attributes:
        authenticated: True once the private websocket login succeeded.
        ws: Public websocket connection, or None when not connected.
        ws_private: Private websocket connection, or None.
    """

    PUBLIC_WS_URL = "wss://ws.okx.com:8443/ws/v5/public"
    PRIVATE_WS_URL = "wss://ws.okx.com:8443/ws/v5/private"
    REST_URL = "https://www.okx.com"
    # Seconds to wait on REST calls so a stalled request cannot block forever.
    REST_TIMEOUT = 10

    def __init__(self, authenticate: bool = True):
        """Connect to OKX, optionally loading credentials and logging in.

        Args:
            authenticate: When true, load credentials from disk and log in
                on the private websocket.

        Raises:
            FileNotFoundError: The credentials file does not exist.
            ValueError: The credentials file is not valid JSON or lacks a key.
            Exception: Connecting or logging in failed.
        """
        self.authenticated = False
        self.api_key = None
        self.api_secret = None
        self.api_passphrase = None
        self.ws = None
        self.ws_private = None
        # These locks are not used by any method of this class; presumably
        # callers use them to serialize socket access -- verify against callers.
        self._lock = threading.Lock()
        self._private_lock = threading.Lock()
        if authenticate:
            self._load_credentials()
        # Sockets must exist before login can be sent, so connecting comes
        # first; _connect_ws performs the login on the private socket.
        self._connect_ws()

    def _load_credentials(self):
        """Read the API key, secret and passphrase from the credentials file."""
        config_path = os.path.join(os.path.dirname(__file__), '../credentials/okx_creds.json')
        try:
            with open(config_path, 'r') as f:
                config = json.load(f)
        except FileNotFoundError as err:
            raise FileNotFoundError(f"Credentials file not found at {config_path}. Please create it with the required keys.") from err
        except json.JSONDecodeError as err:
            raise ValueError(f"Credentials file at {config_path} is not valid JSON.") from err
        self.api_key = config.get("OKX_API_KEY")
        self.api_secret = config.get("OKX_API_SECRET")
        self.api_passphrase = config.get("OKX_API_PASSPHRASE")
        if not self.api_key or not self.api_secret or not self.api_passphrase:
            raise ValueError("API key, secret, and passphrase must be set in the credentials JSON file.")

    def _open_socket(self, url):
        """Open one websocket connection to ``url`` with a 10 second timeout."""
        return websocket.create_connection(
            url,
            timeout=10,
            header=["User-Agent: OKX-Market-Data-Collector"]
        )

    def _close_sockets(self):
        """Close both websockets (best effort) and reset them to None."""
        for attr in ("ws", "ws_private"):
            sock = getattr(self, attr)
            if sock is not None:
                try:
                    sock.close()
                except Exception as e:
                    # Cleanup must not mask the error that triggered it.
                    logging.debug(f"Ignoring error while closing {attr}: {e}")
            setattr(self, attr, None)

    def _connect_ws(self):
        """Open any missing websocket; log in on a newly opened private one.

        The private socket is only opened when credentials are loaded. On any
        failure both sockets are closed and the exception is re-raised.
        """
        logging.info("Attempting to connect to OKX websocket...")
        try:
            if self.ws is None:
                logging.debug("Creating public websocket connection...")
                self.ws = self._open_socket(self.PUBLIC_WS_URL)
                logging.info("Successfully connected to public websocket")
            has_credentials = bool(self.api_key and self.api_secret and self.api_passphrase)
            if has_credentials and self.ws_private is None:
                logging.debug("Creating private websocket connection...")
                self.ws_private = self._open_socket(self.PRIVATE_WS_URL)
                logging.info("Successfully connected to private websocket")
                # A fresh private connection carries no session until login.
                self.authenticated = False
                self._authenticate()
        except Exception as e:
            logging.error(f"Failed to connect to websocket: {e}")
            self._close_sockets()
            raise

    def _get_timestamp(self):
        """Return the current Unix time in seconds as a string (ms precision)."""
        return str(round(time.time(), 3))

    def _get_rest_timestamp(self):
        """Return the current UTC time as ISO-8601 with milliseconds.

        Example: ``2020-12-08T09:08:57.715Z``. This is the format the OKX v5
        REST API documents for the ``OK-ACCESS-TIMESTAMP`` header.
        """
        # Split an integer millisecond count so the seconds and millisecond
        # parts can never disagree because of float rounding.
        seconds, millis = divmod(int(time.time() * 1000), 1000)
        return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(seconds)) + f".{millis:03d}Z"

    def _sign(self, timestamp, method, request_path, body):
        """Return the base64 HMAC-SHA256 of timestamp+method+path+body.

        The key is ``self.api_secret``; a falsy ``body`` is treated as ''.
        """
        if not body:
            body = ''
        message = f'{timestamp}{method}{request_path}{body}'
        mac = hmac.new(self.api_secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256)
        return base64.b64encode(mac.digest()).decode()

    def _rest_headers(self, method, request_path, body):
        """Build the signed header dict for one REST request."""
        timestamp = self._get_rest_timestamp()
        return {
            "OK-ACCESS-KEY": self.api_key,
            "OK-ACCESS-SIGN": self._sign(timestamp, method, request_path, body),
            "OK-ACCESS-TIMESTAMP": timestamp,
            "OK-ACCESS-PASSPHRASE": self.api_passphrase,
            "Content-Type": "application/json"
        }

    def _authenticate(self):
        """Send a login request on the private websocket and await the reply.

        Sets ``self.authenticated`` to True on a login event with code "0".

        Raises:
            Exception: The private socket is missing, the login was rejected,
                or receiving failed (including the socket timeout).
        """
        if self.ws_private is None:
            raise Exception("WebSocket connection not established")
        timestamp = self._get_timestamp()
        sign = self._sign(timestamp, 'GET', '/users/self/verify', '')
        login_params = {
            "op": "login",
            "args": [{
                "apiKey": self.api_key,
                "passphrase": self.api_passphrase,
                "timestamp": timestamp,
                "sign": sign
            }]
        }
        self.ws_private.send(json.dumps(login_params))
        logging.info("Waiting for login response from OKX...")
        while True:
            try:
                resp = self.ws_private.recv()
                logging.debug(f"Received from OKX private WS: {resp}")
                if not resp:
                    continue
                try:
                    msg = json.loads(resp)
                except Exception:
                    logging.warning(f"Non-JSON message received: {resp}")
                    continue
                # Messages other than the login event are skipped.
                if msg.get("event") == "login":
                    if msg.get("code") == "0":
                        logging.info("OKX WebSocket login successful.")
                        self.authenticated = True
                        break
                    raise Exception(f"WebSocket authentication failed: {msg}")
            except websocket.WebSocketConnectionClosedException as e:
                logging.error(f"WebSocket connection closed during authentication: {e}")
                raise
            except Exception as e:
                logging.error(f"Exception during authentication: {e}")
                raise

    def _send_subscription(self, params, is_private=False):
        """Send a subscribe request and read the next message as its reply.

        Args:
            params: Request payload; ``params['args'][0]['channel']`` is
                logged on success.
            is_private: Use the private socket instead of the public one.

        Raises:
            Exception: The socket is missing, the reply is an error event,
                or sending/receiving/parsing failed.
        """
        ws = self.ws_private if is_private else self.ws
        if not ws:
            raise Exception("WebSocket connection not established")
        try:
            ws.send(json.dumps(params))
            # Only the next message is inspected; anything that is neither an
            # error nor a subscribe event is just logged at debug level.
            msg = json.loads(ws.recv())
            event = msg.get("event")
            if event == "error":
                raise Exception(f"Subscription error: {msg}")
            if event == "subscribe":
                logging.info(f"Successfully subscribed to {params['args'][0]['channel']}")
            else:
                logging.debug(f"Received subscription response: {msg}")
        except Exception as e:
            logging.error(f"Error during subscription: {e}")
            raise

    def _subscribe_public(self, channel, instrument, label):
        """Subscribe to ``channel`` for ``instrument`` on the public socket."""
        params = {
            "op": "subscribe",
            "args": [{"channel": channel, "instId": instrument}]
        }
        logging.info(f"Subscribing to {label} for {instrument}")
        self._send_subscription(params)

    def _subscribe_private(self, arg, label):
        """Subscribe on the private socket; warn and return if not logged in."""
        if not self.authenticated:
            logging.warning(f"Attempted to subscribe to user {label} channel without authentication.")
            return
        self._send_subscription({"op": "subscribe", "args": [arg]}, is_private=True)

    def subscribe_candlesticks(self, instrument="BTC-USDT", timeframe="1m"):
        """Subscribe to the candlestick channel for ``instrument``.

        Known timeframes ("1m", "5m", "15m", "1h") map to fixed channel
        names; any other value is appended to "candle" unchanged.
        """
        tf_map = {"1m": "candle1m", "5m": "candle5m", "15m": "candle15m", "1h": "candle1H"}
        channel = tf_map.get(timeframe, f"candle{timeframe}")
        self._subscribe_public(channel, instrument, "candlesticks")

    def subscribe_trades(self, instrument="BTC-USDT"):
        """Subscribe to the public "trades" channel for ``instrument``."""
        self._subscribe_public("trades", instrument, "trades")

    def subscribe_ticker(self, instrument="BTC-USDT"):
        """Subscribe to the public "tickers" channel for ``instrument``."""
        self._subscribe_public("tickers", instrument, "ticker")

    def subscribe_book(self, instrument="BTC-USDT", depth=5, channel="books"):
        """Subscribe to an order book channel for ``instrument``.

        ``depth`` is accepted for interface compatibility but is not used;
        the subscription is determined by ``channel`` alone.
        """
        self._subscribe_public(channel, instrument, "order book")

    def subscribe_user_order(self):
        """Subscribe to the private "orders" channel for SPOT instruments."""
        self._subscribe_private({"channel": "orders", "instType": "SPOT"}, "order")

    def subscribe_user_trade(self):
        """Subscribe to "trades" with instType SPOT on the private socket."""
        # NOTE(review): "trades" is also the public channel name used by
        # subscribe_trades; confirm this is the intended private channel.
        self._subscribe_private({"channel": "trades", "instType": "SPOT"}, "trade")

    def subscribe_user_balance(self):
        """Subscribe to the private "balance_and_position" channel."""
        self._subscribe_private({"channel": "balance_and_position"}, "balance")

    @staticmethod
    def _extract_balances(payload, currency=None):
        """Return the "details" list of the first account in ``payload``.

        Args:
            payload: Decoded JSON body of the balance response.
            currency: When truthy, keep only entries whose "ccy" equals it.

        Returns:
            A list of balance dicts; empty when "data" is missing or empty.
        """
        accounts = payload.get("data") or [{}]
        details = accounts[0].get("details") or []
        if currency:
            return [entry for entry in details if entry.get("ccy") == currency]
        return details

    def get_balance(self, currency=None):
        """Fetch account balances over REST.

        Returns:
            The list of balance entries (optionally filtered by ``currency``),
            or an empty list when the HTTP status is not 200.
        """
        request_path = "/api/v5/account/balance"
        headers = self._rest_headers("GET", request_path, '')
        resp = requests.get(
            f"{self.REST_URL}{request_path}",
            headers=headers,
            timeout=self.REST_TIMEOUT,
        )
        if resp.status_code != 200:
            return []
        return self._extract_balances(resp.json(), currency)

    def place_order(self, side, amount, instrument="BTC-USDT"):
        """Place a market order in cash mode and return the decoded response.

        Args:
            side: Order side; lower-cased before sending.
            amount: Order size; sent as a string in the "sz" field.
            instrument: Instrument id to trade.
        """
        request_path = "/api/v5/trade/order"
        # The exact serialized body is signed, so the same string is sent.
        body = json.dumps({
            "instId": instrument,
            "tdMode": "cash",
            "side": side.lower(),
            "ordType": "market",
            "sz": str(amount)
        })
        headers = self._rest_headers("POST", request_path, body)
        resp = requests.post(
            f"{self.REST_URL}{request_path}",
            headers=headers,
            data=body,
            timeout=self.REST_TIMEOUT,
        )
        return resp.json()

    def buy_btc(self, amount, instrument="BTC-USDT"):
        """Place a market buy order for ``amount`` of ``instrument``."""
        return self.place_order("buy", amount, instrument)

    def sell_btc(self, amount, instrument="BTC-USDT"):
        """Place a market sell order for ``amount`` of ``instrument``."""
        return self.place_order("sell", amount, instrument)

    def get_instruments(self):
        """Return the SPOT instrument list, or [] when the status is not 200."""
        url = f"{self.REST_URL}/api/v5/public/instruments?instType=SPOT"
        resp = requests.get(url, timeout=self.REST_TIMEOUT)
        if resp.status_code != 200:
            return []
        return resp.json().get("data", [])