diff --git a/trader/cryptocom_trader.py b/trader/cryptocom_trader.py deleted file mode 100644 index 928dc7f..0000000 --- a/trader/cryptocom_trader.py +++ /dev/null @@ -1,229 +0,0 @@ -import os -import time -import hmac -import hashlib -import base64 -import json -import pandas as pd -import threading -from websocket import create_connection, WebSocketTimeoutException - -class CryptoComTrader: - ENV_URLS = { - "production": { - "WS_URL": "wss://deriv-stream.crypto.com/v1/market", - "WS_PRIVATE_URL": "wss://deriv-stream.crypto.com/v1/user" - }, - "uat": { - "WS_URL": "wss://uat-deriv-stream.3ona.co/v1/market", - "WS_PRIVATE_URL": "wss://uat-deriv-stream.3ona.co/v1/user" - } - } - - def __init__(self): - self.env = os.getenv("CRYPTOCOM_ENV", "UAT").lower() - urls = self.ENV_URLS.get(self.env, self.ENV_URLS["production"]) - self.WS_URL = urls["WS_URL"] - self.WS_PRIVATE_URL = urls["WS_PRIVATE_URL"] - self.api_key = os.getenv("CRYPTOCOM_API_KEY") - self.api_secret = os.getenv("CRYPTOCOM_API_SECRET") - self.ws = None - self.ws_private = None - self._lock = threading.Lock() - self._private_lock = threading.Lock() - self._connect_ws() - - def _connect_ws(self): - if self.ws is None: - self.ws = create_connection(self.WS_URL, timeout=10) - if self.api_key and self.api_secret and self.ws_private is None: - self.ws_private = create_connection(self.WS_PRIVATE_URL, timeout=10) - - def _send_ws(self, payload, private=False): - ws = self.ws_private if private else self.ws - lock = self._private_lock if private else self._lock - with lock: - ws.send(json.dumps(payload)) - try: - resp = ws.recv() - return json.loads(resp) - except WebSocketTimeoutException: - return None - - def _sign(self, params): - t = str(int(time.time() * 1000)) - params['id'] = t - params['nonce'] = t - params['api_key'] = self.api_key - param_str = json.dumps(params, separators=(',', ':'), sort_keys=True) - sig = hmac.new( - bytes(self.api_secret, 'utf-8'), - msg=bytes(param_str, 'utf-8'), - digestmod=hashlib.sha256 - ).hexdigest() - params['sig'] = sig - return params - - def get_price(self): - """ - Get the latest ask price for BTC_USDC using WebSocket ticker subscription (one-shot). - """ - payload = { - "id": int(time.time() * 1000), - "method": "subscribe", - "params": {"channels": ["ticker.BTC_USDC"]} - } - resp = self._send_ws(payload) - # Wait for ticker update - while True: - data = self.ws.recv() - msg = json.loads(data) - if msg.get("method") == "ticker.update": - # 'a' is ask price - return msg["params"]["data"][0].get("a") - - def get_order_book(self, depth=10): - """ - Fetch the order book for BTC_USDC with the specified depth using WebSocket (one-shot). - Returns a dict with 'bids' and 'asks'. - """ - payload = { - "id": int(time.time() * 1000), - "method": "subscribe", - "params": {"channels": [f"book.BTC_USDC.{depth}"]} - } - resp = self._send_ws(payload) - # Wait for book update - while True: - data = self.ws.recv() - msg = json.loads(data) - if msg.get("method") == "book.update": - book = msg["params"]["data"][0] - return { - "bids": book.get("bids", []), - "asks": book.get("asks", []) - } - - def _authenticate(self): - """ - Authenticate the private WebSocket connection. Only needs to be done once per session. - """ - if not self.api_key or not self.api_secret: - raise ValueError("API key and secret must be set in environment variables.") - payload = { - "id": int(time.time() * 1000), - "method": "public/auth", - "api_key": self.api_key, - "nonce": int(time.time() * 1000), - } - # For auth, sig is HMAC_SHA256(method + id + api_key + nonce) - sig_payload = ( - payload["method"] + str(payload["id"]) + self.api_key + str(payload["nonce"]) - ) - payload["sig"] = hmac.new( - bytes(self.api_secret, "utf-8"), - msg=bytes(sig_payload, "utf-8"), - digestmod=hashlib.sha256, - ).hexdigest() - resp = self._send_ws(payload, private=True) - if not resp or resp.get("code") != 0: - raise Exception(f"WebSocket authentication failed: {resp}") - - def _ensure_private_auth(self): - if self.ws_private is None: - self._connect_ws() - time.sleep(1) # recommended by docs - self._authenticate() - - def get_balance(self, currency="USDC"): - """ - Fetch user balance using WebSocket private API. - """ - self._ensure_private_auth() - payload = { - "id": int(time.time() * 1000), - "method": "private/user-balance", - "params": {}, - "nonce": int(time.time() * 1000), - } - resp = self._send_ws(payload, private=True) - if resp and resp.get("code") == 0: - balances = resp.get("result", {}).get("data", []) - if currency: - return [b for b in balances if b.get("instrument_name") == currency] - return balances - return [] - - def place_order(self, side, amount): - """ - Place a market order using WebSocket private API. - side: 'BUY' or 'SELL', amount: in BTC - """ - self._ensure_private_auth() - params = { - "instrument_name": "BTC_USDC", - "side": side, - "type": "MARKET", - "quantity": str(amount), - } - payload = { - "id": int(time.time() * 1000), - "method": "private/create-order", - "params": params, - "nonce": int(time.time() * 1000), - } - resp = self._send_ws(payload, private=True) - return resp - - def buy_btc(self, amount): - return self.place_order("BUY", amount) - - def sell_btc(self, amount): - return self.place_order("SELL", amount) - - def get_candlesticks(self, timeframe='1m', count=100): - """ - Fetch candlestick (OHLCV) data for BTC_USDC using WebSocket. - Args: - timeframe (str): Timeframe for each candle (e.g., '1m', '5m', '15m', '1h', '4h', '1d'). - count (int): Number of candles to fetch (max 1000 per API docs). - Returns: - pd.DataFrame: DataFrame with columns ['timestamp', 'open', 'high', 'low', 'close', 'volume'] - """ - payload = { - "id": int(time.time() * 1000), - "method": "public/get-candlestick", - "params": { - "instrument_name": "BTC_USDC", - "timeframe": timeframe, - "count": count - } - } - resp = self._send_ws(payload) - candles = resp.get("result", {}).get("data", []) if resp else [] - if not candles: - return pd.DataFrame(columns=["timestamp", "open", "high", "low", "close", "volume"]) - df = pd.DataFrame(candles) - df['timestamp'] = pd.to_datetime(df['t'], unit='ms') - df = df.rename(columns={ - 'o': 'open', - 'h': 'high', - 'l': 'low', - 'c': 'close', - 'v': 'volume' - }) - return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].sort_values('timestamp') - - def get_instruments(self): - """ - Fetch the list of available trading instruments from Crypto.com using WebSocket. - Returns: - list: List of instrument dicts. - """ - payload = { - "id": int(time.time() * 1000), - "method": "public/get-instruments", - "params": {} - } - resp = self._send_ws(payload) - return resp.get("result", {}).get("data", []) if resp else [] diff --git a/trader/main.py b/trader/main.py index 8c1234d..cf5ef52 100644 --- a/trader/main.py +++ b/trader/main.py @@ -1,84 +1,42 @@ import time -import plotly.graph_objs as go -import plotly.io as pio -from cryptocom_trader import CryptoComTrader - - -def plot_candlesticks(df): - if df.empty: - print("No data to plot.") - return None - - # Convert columns to float - for col in ['open', 'high', 'low', 'close', 'volume']: - df[col] = df[col].astype(float) - - # Plotly expects datetime for x-axis - fig = go.Figure(data=[go.Candlestick( - x=df['timestamp'], - open=df['open'], - high=df['high'], - low=df['low'], - close=df['close'], - increasing_line_color='#089981', - decreasing_line_color='#F23645' - )]) - - fig.update_layout( - title='BTC/USDC Realtime Candlestick (1m)', - yaxis_title='Price (USDC)', - xaxis_title='Time', - xaxis_rangeslider_visible=False, - template='plotly_dark' - ) - return fig +from okx_trader import OKXTrader +import json def main(): - trader = CryptoComTrader() - pio.renderers.default = "browser" # Open in browser + trader = OKXTrader() + instrument = "BTC-USDT" - # Fetch and print BTC/USDC-related instruments - instruments = trader.get_instruments() - btc_usdc_instruments = [ - inst for inst in instruments - if ( - ('BTC' in inst.get('base_ccy', '') or 'BTC' in inst.get('base_currency', '')) and - ('USDC' in inst.get('quote_ccy', '') or 'USDC' in inst.get('quote_currency', '')) - ) - ] - print("BTC/USDC-related instruments:") - for inst in btc_usdc_instruments: - print(inst) + trader.subscribe_candlesticks(instrument, timeframe="5m") + # trader.subscribe_trades(instrument) + trader.subscribe_ticker(instrument) + # trader.subscribe_book(instrument, depth=5) + # trader.subscribe_user_order() + # trader.subscribe_user_trade() + # trader.subscribe_user_balance() + + # print(trader.get_balance()) - # Optionally, show balance (private API) try: - balance = trader.get_balance("USDC") - print("USDC Balance:", balance) - except Exception as e: - print("[WARN] Could not fetch balance (private API):", e) + while True: + try: + data = trader.ws.recv() + except Exception as e: + print(f"[WARN] WebSocket disconnected or error: {e}.") + + trader = OKXTrader() + trader.subscribe_ticker(instrument) + + continue - all_instruments = trader.get_instruments() - for inst in all_instruments: - print(inst) + if data == '': continue - while True: - try: - df = trader.get_candlesticks(timeframe='1m', count=60) - # fig = plot_candlesticks(df) - # if fig: - # fig.show() - if not df.empty: - print(df[['high', 'low', 'open', 'close', 'volume']]) - else: - print("No data to print.") - time.sleep(10) - except KeyboardInterrupt: - print('Exiting...') - break - except Exception as e: - print(f'Error: {e}') - time.sleep(10) + msg = json.loads(data) + print(json.dumps(msg, indent=4)) + + time.sleep(1) + except KeyboardInterrupt: + print('Exiting...') if __name__ == '__main__': main() diff --git a/trader/okx_trader.py b/trader/okx_trader.py new file mode 100644 index 0000000..8bf4671 --- /dev/null +++ b/trader/okx_trader.py @@ -0,0 +1,211 @@ +import os +import time +import hmac +import hashlib +import base64 +import json +import pandas as pd +import threading +import requests +import websocket +import datetime + +class OKXTrader: + PUBLIC_WS_URL = "wss://ws.okx.com:8443/ws/v5/public" + PRIVATE_WS_URL = "wss://ws.okx.com:8443/ws/v5/private" + REST_URL = "https://www.okx.com" + + def __init__(self): + # Load credentials from JSON config file + config_path = os.path.join(os.path.dirname(__file__), '../credentials/okx_creds.json') + try: + with open(config_path, 'r') as f: + config = json.load(f) + except FileNotFoundError: + raise FileNotFoundError(f"Credentials file not found at {config_path}. Please create it with the required keys.") + except json.JSONDecodeError: + raise ValueError(f"Credentials file at {config_path} is not valid JSON.") + + self.api_key = config.get("OKX_API_KEY") + self.api_secret = config.get("OKX_API_SECRET") + self.api_passphrase = config.get("OKX_API_PASSPHRASE") + if not self.api_key or not self.api_secret or not self.api_passphrase: + raise ValueError("API key, secret, and passphrase must be set in the credentials JSON file.") + self.ws = None + self.ws_private = None + self._lock = threading.Lock() + self._private_lock = threading.Lock() + print(f"[DEBUG] Connecting to public WebSocket: {self.PUBLIC_WS_URL}") + self._connect_ws() + self._authenticate() + + def _connect_ws(self): + if self.ws is None: + self.ws = websocket.create_connection(self.PUBLIC_WS_URL, timeout=10) + if self.api_key and self.api_secret and self.api_passphrase and self.ws_private is None: + self.ws_private = websocket.create_connection(self.PRIVATE_WS_URL, timeout=10) + + def _get_timestamp(self): + return str(round(time.time(), 3)) + + def _sign(self, timestamp, method, request_path, body): + if not body: + body = '' + message = f'{timestamp}{method}{request_path}{body}' + mac = hmac.new(self.api_secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256) + return base64.b64encode(mac.digest()).decode() + + def _authenticate(self): + import websocket + timestamp = self._get_timestamp() + sign = self._sign(timestamp, 'GET', '/users/self/verify', '') + login_params = { + "op": "login", + "args": [{ + "apiKey": self.api_key, + "passphrase": self.api_passphrase, + "timestamp": timestamp, + "sign": sign + }] + } + self.ws_private.send(json.dumps(login_params)) + print("Waiting for login response from OKX...") + while True: + try: + resp = self.ws_private.recv() + print(f"[DEBUG] Received from OKX private WS: {resp}") + if not resp: + continue + try: + msg = json.loads(resp) + except Exception: + print(f"[WARN] Non-JSON message received: {resp}") + continue + if msg.get("event") == "login": + if msg.get("code") == "0": + print("[INFO] OKX WebSocket login successful.") + break + else: + raise Exception(f"WebSocket authentication failed: {msg}") + except websocket._exceptions.WebSocketConnectionClosedException as e: + print(f"[ERROR] WebSocket connection closed during authentication: {e}") + raise + except Exception as e: + print(f"[ERROR] Exception during authentication: {e}") + raise + + def subscribe_candlesticks(self, instrument="BTC-USDT", timeframe="1m"): + # OKX uses candle1m, candle5m, etc. + tf_map = {"1m": "candle1m", "5m": "candle5m", "15m": "candle15m", "1h": "candle1H"} + channel = tf_map.get(timeframe, f"candle{timeframe}") + params = { + "op": "subscribe", + "args": [{"channel": channel, "instId": instrument}] + } + print(f"[DEBUG] Subscribing to candlesticks: {json.dumps(params)}") + self.ws.send(json.dumps(params)) + + def subscribe_trades(self, instrument="BTC-USDT"): + params = { + "op": "subscribe", + "args": [{"channel": "trades", "instId": instrument}] + } + self.ws.send(json.dumps(params)) + + def subscribe_ticker(self, instrument="BTC-USDT"): + params = { + "op": "subscribe", + "args": [{"channel": "tickers", "instId": instrument}] + } + self.ws.send(json.dumps(params)) + + def subscribe_book(self, instrument="BTC-USDT", depth=5): + # OKX supports books5, books50, books-l2-tbt + channel = "books5" if depth <= 5 else "books50" + params = { + "op": "subscribe", + "args": [{"channel": channel, "instId": instrument}] + } + self.ws.send(json.dumps(params)) + + def subscribe_user_order(self): + params = { + "op": "subscribe", + "args": [{"channel": "orders", "instType": "SPOT"}] + } + self.ws_private.send(json.dumps(params)) + + def subscribe_user_trade(self): + params = { + "op": "subscribe", + "args": [{"channel": "trades", "instType": "SPOT"}] + } + self.ws_private.send(json.dumps(params)) + + def subscribe_user_balance(self): + params = { + "op": "subscribe", + "args": [{"channel": "balance_and_position"}] + } + self.ws_private.send(json.dumps(params)) + + def get_balance(self, currency=None): + url = f"{self.REST_URL}/api/v5/account/balance" + timestamp = self._get_timestamp() + method = "GET" + request_path = "/api/v5/account/balance" + body = '' + sign = self._sign(timestamp, method, request_path, body) + headers = { + "OK-ACCESS-KEY": self.api_key, + "OK-ACCESS-SIGN": sign, + "OK-ACCESS-TIMESTAMP": timestamp, + "OK-ACCESS-PASSPHRASE": self.api_passphrase, + "Content-Type": "application/json" + } + resp = requests.get(url, headers=headers) + if resp.status_code == 200: + data = resp.json() + balances = data.get("data", [{}])[0].get("details", []) + if currency: + return [b for b in balances if b.get("ccy") == currency] + return balances + return [] + + def place_order(self, side, amount, instrument="BTC-USDT"): + url = f"{self.REST_URL}/api/v5/trade/order" + timestamp = self._get_timestamp() + method = "POST" + request_path = "/api/v5/trade/order" + body_dict = { + "instId": instrument, + "tdMode": "cash", + "side": side.lower(), + "ordType": "market", + "sz": str(amount) + } + body = json.dumps(body_dict) + sign = self._sign(timestamp, method, request_path, body) + headers = { + "OK-ACCESS-KEY": self.api_key, + "OK-ACCESS-SIGN": sign, + "OK-ACCESS-TIMESTAMP": timestamp, + "OK-ACCESS-PASSPHRASE": self.api_passphrase, + "Content-Type": "application/json" + } + resp = requests.post(url, headers=headers, data=body) + return resp.json() + + def buy_btc(self, amount, instrument="BTC-USDT"): + return self.place_order("buy", amount, instrument) + + def sell_btc(self, amount, instrument="BTC-USDT"): + return self.place_order("sell", amount, instrument) + + def get_instruments(self): + url = f"{self.REST_URL}/api/v5/public/instruments?instType=SPOT" + resp = requests.get(url) + if resp.status_code == 200: + data = resp.json() + return data.get("data", []) + return []