276 lines
11 KiB
Python
276 lines
11 KiB
Python
import os
|
|
import time
|
|
import hmac
|
|
import hashlib
|
|
import base64
|
|
import json
|
|
import threading
|
|
import requests
|
|
import websocket
|
|
import logging
|
|
|
|
class OKXClient:
|
|
PUBLIC_WS_URL = "wss://ws.okx.com:8443/ws/v5/public"
|
|
PRIVATE_WS_URL = "wss://ws.okx.com:8443/ws/v5/private"
|
|
REST_URL = "https://www.okx.com"
|
|
|
|
def __init__(self, authenticate: bool = True):
|
|
self.authenticated = False
|
|
self.api_key = None
|
|
self.api_secret = None
|
|
self.api_passphrase = None
|
|
self.ws = None
|
|
self.ws_private = None
|
|
self._lock = threading.Lock()
|
|
self._private_lock = threading.Lock()
|
|
|
|
if authenticate:
|
|
config_path = os.path.join(os.path.dirname(__file__), '../credentials/okx_creds.json')
|
|
try:
|
|
with open(config_path, 'r') as f:
|
|
config = json.load(f)
|
|
except FileNotFoundError:
|
|
raise FileNotFoundError(f"Credentials file not found at {config_path}. Please create it with the required keys.")
|
|
except json.JSONDecodeError:
|
|
raise ValueError(f"Credentials file at {config_path} is not valid JSON.")
|
|
|
|
self.api_key = config.get("OKX_API_KEY")
|
|
self.api_secret = config.get("OKX_API_SECRET")
|
|
self.api_passphrase = config.get("OKX_API_PASSPHRASE")
|
|
|
|
if not self.api_key or not self.api_secret or not self.api_passphrase:
|
|
raise ValueError("API key, secret, and passphrase must be set in the credentials JSON file.")
|
|
|
|
self._authenticate()
|
|
self._connect_ws()
|
|
|
|
def _connect_ws(self):
|
|
logging.info("Attempting to connect to OKX websocket...")
|
|
try:
|
|
if self.ws is None:
|
|
logging.debug("Creating public websocket connection...")
|
|
self.ws = websocket.create_connection(
|
|
self.PUBLIC_WS_URL,
|
|
timeout=10,
|
|
header=["User-Agent: OKX-Market-Data-Collector"]
|
|
)
|
|
logging.info("Successfully connected to public websocket")
|
|
|
|
if self.authenticated and self.api_key and self.api_secret and self.api_passphrase and self.ws_private is None:
|
|
logging.debug("Creating private websocket connection...")
|
|
self.ws_private = websocket.create_connection(
|
|
self.PRIVATE_WS_URL,
|
|
timeout=10,
|
|
header=["User-Agent: OKX-Market-Data-Collector"]
|
|
)
|
|
logging.info("Successfully connected to private websocket")
|
|
except Exception as e:
|
|
logging.error(f"Failed to connect to websocket: {e}")
|
|
if self.ws:
|
|
try:
|
|
self.ws.close()
|
|
except:
|
|
pass
|
|
self.ws = None
|
|
if self.ws_private:
|
|
try:
|
|
self.ws_private.close()
|
|
except:
|
|
pass
|
|
self.ws_private = None
|
|
raise
|
|
|
|
def _get_timestamp(self):
|
|
return str(round(time.time(), 3))
|
|
|
|
def _sign(self, timestamp, method, request_path, body):
|
|
if not body:
|
|
body = ''
|
|
message = f'{timestamp}{method}{request_path}{body}'
|
|
mac = hmac.new(self.api_secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256)
|
|
return base64.b64encode(mac.digest()).decode()
|
|
|
|
def _authenticate(self):
|
|
timestamp = self._get_timestamp()
|
|
sign = self._sign(timestamp, 'GET', '/users/self/verify', '')
|
|
login_params = {
|
|
"op": "login",
|
|
"args": [{
|
|
"apiKey": self.api_key,
|
|
"passphrase": self.api_passphrase,
|
|
"timestamp": timestamp,
|
|
"sign": sign
|
|
}]
|
|
}
|
|
self.ws_private.send(json.dumps(login_params))
|
|
logging.info("Waiting for login response from OKX...")
|
|
while True:
|
|
try:
|
|
resp = self.ws_private.recv()
|
|
logging.debug(f"Received from OKX private WS: {resp}")
|
|
if not resp:
|
|
continue
|
|
try:
|
|
msg = json.loads(resp)
|
|
except Exception:
|
|
logging.warning(f"Non-JSON message received: {resp}")
|
|
continue
|
|
if msg.get("event") == "login":
|
|
if msg.get("code") == "0":
|
|
logging.info("OKX WebSocket login successful.")
|
|
self.authenticated = True
|
|
break
|
|
else:
|
|
raise Exception(f"WebSocket authentication failed: {msg}")
|
|
except websocket._exceptions.WebSocketConnectionClosedException as e:
|
|
logging.error(f"WebSocket connection closed during authentication: {e}")
|
|
raise
|
|
except Exception as e:
|
|
logging.error(f"Exception during authentication: {e}")
|
|
raise
|
|
|
|
def _send_subscription(self, params, is_private=False):
|
|
ws = self.ws_private if is_private else self.ws
|
|
if not ws:
|
|
raise Exception("WebSocket connection not established")
|
|
|
|
try:
|
|
ws.send(json.dumps(params))
|
|
# Wait for subscription confirmation
|
|
resp = ws.recv()
|
|
msg = json.loads(resp)
|
|
if msg.get("event") == "error":
|
|
raise Exception(f"Subscription error: {msg}")
|
|
elif msg.get("event") == "subscribe":
|
|
logging.info(f"Successfully subscribed to {params['args'][0]['channel']}")
|
|
else:
|
|
logging.debug(f"Received subscription response: {msg}")
|
|
except Exception as e:
|
|
logging.error(f"Error during subscription: {e}")
|
|
raise
|
|
|
|
def subscribe_candlesticks(self, instrument="BTC-USDT", timeframe="1m"):
|
|
tf_map = {"1m": "candle1m", "5m": "candle5m", "15m": "candle15m", "1h": "candle1H"}
|
|
channel = tf_map.get(timeframe, f"candle{timeframe}")
|
|
params = {
|
|
"op": "subscribe",
|
|
"args": [{"channel": channel, "instId": instrument}]
|
|
}
|
|
logging.info(f"Subscribing to candlesticks for {instrument}")
|
|
self._send_subscription(params)
|
|
|
|
def subscribe_trades(self, instrument="BTC-USDT"):
|
|
params = {
|
|
"op": "subscribe",
|
|
"args": [{"channel": "trades", "instId": instrument}]
|
|
}
|
|
logging.info(f"Subscribing to trades for {instrument}")
|
|
self._send_subscription(params)
|
|
|
|
def subscribe_ticker(self, instrument="BTC-USDT"):
|
|
params = {
|
|
"op": "subscribe",
|
|
"args": [{"channel": "tickers", "instId": instrument}]
|
|
}
|
|
logging.info(f"Subscribing to ticker for {instrument}")
|
|
self._send_subscription(params)
|
|
|
|
def subscribe_book(self, instrument="BTC-USDT", depth=5, channel="books"):
|
|
params = {
|
|
"op": "subscribe",
|
|
"args": [{"channel": channel, "instId": instrument}]
|
|
}
|
|
logging.info(f"Subscribing to order book for {instrument}")
|
|
self._send_subscription(params)
|
|
|
|
def subscribe_user_order(self):
|
|
if not self.authenticated:
|
|
logging.warning("Attempted to subscribe to user order channel without authentication.")
|
|
return
|
|
params = {
|
|
"op": "subscribe",
|
|
"args": [{"channel": "orders", "instType": "SPOT"}]
|
|
}
|
|
self._send_subscription(params, is_private=True)
|
|
|
|
def subscribe_user_trade(self):
|
|
if not self.authenticated:
|
|
logging.warning("Attempted to subscribe to user trade channel without authentication.")
|
|
return
|
|
params = {
|
|
"op": "subscribe",
|
|
"args": [{"channel": "trades", "instType": "SPOT"}]
|
|
}
|
|
self._send_subscription(params, is_private=True)
|
|
|
|
def subscribe_user_balance(self):
|
|
if not self.authenticated:
|
|
logging.warning("Attempted to subscribe to user balance channel without authentication.")
|
|
return
|
|
params = {
|
|
"op": "subscribe",
|
|
"args": [{"channel": "balance_and_position"}]
|
|
}
|
|
self._send_subscription(params, is_private=True)
|
|
|
|
def get_balance(self, currency=None):
|
|
url = f"{self.REST_URL}/api/v5/account/balance"
|
|
timestamp = self._get_timestamp()
|
|
method = "GET"
|
|
request_path = "/api/v5/account/balance"
|
|
body = ''
|
|
sign = self._sign(timestamp, method, request_path, body)
|
|
headers = {
|
|
"OK-ACCESS-KEY": self.api_key,
|
|
"OK-ACCESS-SIGN": sign,
|
|
"OK-ACCESS-TIMESTAMP": timestamp,
|
|
"OK-ACCESS-PASSPHRASE": self.api_passphrase,
|
|
"Content-Type": "application/json"
|
|
}
|
|
resp = requests.get(url, headers=headers)
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
balances = data.get("data", [{}])[0].get("details", [])
|
|
if currency:
|
|
return [b for b in balances if b.get("ccy") == currency]
|
|
return balances
|
|
return []
|
|
|
|
def place_order(self, side, amount, instrument="BTC-USDT"):
|
|
url = f"{self.REST_URL}/api/v5/trade/order"
|
|
timestamp = self._get_timestamp()
|
|
method = "POST"
|
|
request_path = "/api/v5/trade/order"
|
|
body_dict = {
|
|
"instId": instrument,
|
|
"tdMode": "cash",
|
|
"side": side.lower(),
|
|
"ordType": "market",
|
|
"sz": str(amount)
|
|
}
|
|
body = json.dumps(body_dict)
|
|
sign = self._sign(timestamp, method, request_path, body)
|
|
headers = {
|
|
"OK-ACCESS-KEY": self.api_key,
|
|
"OK-ACCESS-SIGN": sign,
|
|
"OK-ACCESS-TIMESTAMP": timestamp,
|
|
"OK-ACCESS-PASSPHRASE": self.api_passphrase,
|
|
"Content-Type": "application/json"
|
|
}
|
|
resp = requests.post(url, headers=headers, data=body)
|
|
return resp.json()
|
|
|
|
def buy_btc(self, amount, instrument="BTC-USDT"):
|
|
return self.place_order("buy", amount, instrument)
|
|
|
|
def sell_btc(self, amount, instrument="BTC-USDT"):
|
|
return self.place_order("sell", amount, instrument)
|
|
|
|
def get_instruments(self):
|
|
url = f"{self.REST_URL}/api/v5/public/instruments?instType=SPOT"
|
|
resp = requests.get(url)
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
return data.get("data", [])
|
|
return []
|