import base64
import hashlib
import hmac
import json
import os
import threading
import time
from decimal import Decimal

import pandas as pd
from websocket import (
    WebSocketException,
    WebSocketTimeoutException,
    create_connection,
)


class CryptoComTrader:
    """WebSocket client for the BTC_USDC instrument on Crypto.com.

    Configuration is read from the environment:

    * ``CRYPTOCOM_ENV`` selects a key of ``ENV_URLS`` (defaults to ``"UAT"``,
      compared case-insensitively).
    * ``CRYPTOCOM_API_KEY`` / ``CRYPTOCOM_API_SECRET`` enable the private
      (user) connection when both are set.

    The market connection is opened in ``__init__``. Each connection is
    guarded by its own lock so a request and the read of its reply are not
    interleaved with another thread's traffic on the same socket.

    The instance can be used as a context manager; ``close()`` releases both
    sockets.
    """

    ENV_URLS = {
        "production": {
            "WS_URL": "wss://deriv-stream.crypto.com/v1/market",
            "WS_PRIVATE_URL": "wss://deriv-stream.crypto.com/v1/user",
        },
        "uat": {
            "WS_URL": "wss://uat-deriv-stream.3ona.co/v1/market",
            "WS_PRIVATE_URL": "wss://uat-deriv-stream.3ona.co/v1/user",
        },
    }

    # Socket-level timeout (seconds) applied to connect and to each recv().
    SOCKET_TIMEOUT = 10
    # Upper bound (seconds) on how long one request waits for its own reply,
    # even if unrelated frames keep arriving on the socket.
    RESPONSE_TIMEOUT = 10

    def __init__(self):
        self.env = os.getenv("CRYPTOCOM_ENV", "UAT").lower()
        # NOTE(review): an unrecognised CRYPTOCOM_ENV value silently falls
        # back to the *production* endpoints. Kept for compatibility; confirm
        # this is intended before relying on it.
        urls = self.ENV_URLS.get(self.env, self.ENV_URLS["production"])
        self.WS_URL = urls["WS_URL"]
        self.WS_PRIVATE_URL = urls["WS_PRIVATE_URL"]
        self.api_key = os.getenv("CRYPTOCOM_API_KEY")
        self.api_secret = os.getenv("CRYPTOCOM_API_SECRET")
        self.ws = None
        self.ws_private = None
        self._lock = threading.Lock()
        self._private_lock = threading.Lock()
        # Serialises the one-time authentication of the private connection.
        self._auth_lock = threading.Lock()
        self._authenticated = False
        self._connect_ws()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close both WebSocket connections, if open.

        Errors raised while closing are ignored on purpose: the sockets are
        being discarded and there is nothing useful a caller could do.
        """
        for attr in ("ws", "ws_private"):
            sock = getattr(self, attr, None)
            if sock is None:
                continue
            try:
                sock.close()
            except (OSError, WebSocketException):
                pass
            setattr(self, attr, None)
        # A new private connection must authenticate again.
        self._authenticated = False

    def _connect_ws(self):
        """Open the market connection and, when credentials exist, the user one.

        Connections that are already open are left untouched.
        """
        if self.ws is None:
            self.ws = create_connection(self.WS_URL, timeout=self.SOCKET_TIMEOUT)
        if self.api_key and self.api_secret and self.ws_private is None:
            self.ws_private = create_connection(
                self.WS_PRIVATE_URL, timeout=self.SOCKET_TIMEOUT
            )

    def _await_reply(self, ws, request_id):
        """Read frames from *ws* until the reply to *request_id* arrives.

        The caller must already hold the lock for *ws*.

        Frames that carry a different ``id`` (for example pushes left over
        from a subscription) are skipped. A frame without an ``id`` cannot be
        attributed to another request, so it is returned as-is; the same
        applies when *request_id* is ``None``.

        Returns:
            The decoded reply, or ``None`` if nothing suitable arrived before
            the socket timeout or ``RESPONSE_TIMEOUT`` elapsed.
        """
        deadline = time.monotonic() + self.RESPONSE_TIMEOUT
        while time.monotonic() < deadline:
            try:
                frame = json.loads(ws.recv())
            except WebSocketTimeoutException:
                return None
            if request_id is None or not isinstance(frame, dict):
                return frame
            if "id" not in frame or frame["id"] == request_id:
                return frame
        return None

    def _send_ws(self, payload, private=False):
        """Send *payload* as JSON and return the decoded reply.

        Args:
            payload: JSON-serialisable request; its ``"id"`` (if any) is used
                to pick the matching reply.
            private: Use the user connection instead of the market one.

        Returns:
            The decoded reply, or ``None`` on timeout.

        Raises:
            RuntimeError: If the selected connection has not been opened.
        """
        ws = self.ws_private if private else self.ws
        lock = self._private_lock if private else self._lock
        if ws is None:
            raise RuntimeError("WebSocket connection is not established.")
        request_id = payload.get("id") if isinstance(payload, dict) else None
        with lock:
            ws.send(json.dumps(payload))
            return self._await_reply(ws, request_id)

    def _sign(self, params):
        """Add ``id``, ``nonce``, ``api_key`` and ``sig`` to *params* in place.

        The signature is the hex HMAC-SHA256, keyed with the API secret, of
        the compact, key-sorted JSON encoding of *params* (taken after
        ``id``, ``nonce`` and ``api_key`` have been added, before ``sig``).

        Returns:
            The same *params* dict that was passed in.
        """
        stamp = str(int(time.time() * 1000))
        params["id"] = stamp
        params["nonce"] = stamp
        params["api_key"] = self.api_key
        param_str = json.dumps(params, separators=(",", ":"), sort_keys=True)
        params["sig"] = hmac.new(
            self.api_secret.encode("utf-8"),
            msg=param_str.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).hexdigest()
        return params

    def _unsubscribe(self, channel):
        """Best-effort unsubscribe from *channel* on the market connection.

        The caller must hold ``self._lock``. The acknowledgement is not
        awaited; later reads skip it because it does not match what they are
        waiting for. Send failures are ignored so that cleanup never masks
        the error that ended the subscription.
        """
        payload = {
            "id": int(time.time() * 1000),
            "method": "unsubscribe",
            "params": {"channels": [channel]},
        }
        try:
            self.ws.send(json.dumps(payload))
        except (OSError, WebSocketException):
            pass

    def _first_update(self, channel, update_method):
        """Subscribe to *channel* and return the first matching data item.

        The lock is held for the whole exchange so no other thread reads from
        the market socket in between, and the subscription is cancelled
        afterwards so pushes do not keep arriving on the shared connection.

        NOTE(review): the expected frame shape (``method == update_method``
        with the item under ``params.data[0]``) is kept exactly as the
        original code had it - confirm against the current API reference.

        Raises:
            WebSocketTimeoutException: If no matching frame arrives within
                ``RESPONSE_TIMEOUT`` seconds, or the socket read times out.
        """
        subscribe = {
            "id": int(time.time() * 1000),
            "method": "subscribe",
            "params": {"channels": [channel]},
        }
        with self._lock:
            self.ws.send(json.dumps(subscribe))
            try:
                deadline = time.monotonic() + self.RESPONSE_TIMEOUT
                while time.monotonic() < deadline:
                    msg = json.loads(self.ws.recv())
                    if msg.get("method") == update_method:
                        return msg["params"]["data"][0]
                raise WebSocketTimeoutException(
                    f"No {update_method} message received within "
                    f"{self.RESPONSE_TIMEOUT} seconds."
                )
            finally:
                self._unsubscribe(channel)

    def get_price(self):
        """Get the latest ask price for BTC_USDC via a one-shot ticker subscription.

        Returns:
            The ``"a"`` field of the first ticker update, or ``None`` if the
            update has no such field.

        Raises:
            WebSocketTimeoutException: If no ticker update arrives in time.
        """
        ticker = self._first_update("ticker.BTC_USDC", "ticker.update")
        # 'a' is treated as the ask price.
        # NOTE(review): confirm the meaning of 'a' against the API reference.
        return ticker.get("a")

    def get_order_book(self, depth=10):
        """Fetch the BTC_USDC order book via a one-shot book subscription.

        Args:
            depth: Depth requested in the channel name.

        Returns:
            A dict with ``"bids"`` and ``"asks"`` lists (empty when the
            update omits them).

        Raises:
            WebSocketTimeoutException: If no book update arrives in time.
        """
        book = self._first_update(f"book.BTC_USDC.{depth}", "book.update")
        return {
            "bids": book.get("bids", []),
            "asks": book.get("asks", []),
        }

    def _authenticate(self):
        """Authenticate the private WebSocket connection.

        Only needs to be done once per connection; on success the instance is
        marked as authenticated.

        Raises:
            ValueError: If the API key or secret is missing.
            RuntimeError: If the server does not answer with ``code == 0``.
        """
        if not self.api_key or not self.api_secret:
            raise ValueError("API key and secret must be set in environment variables.")
        payload = {
            "id": int(time.time() * 1000),
            "method": "public/auth",
            "api_key": self.api_key,
            "nonce": int(time.time() * 1000),
        }
        # For auth, sig is HMAC_SHA256(method + id + api_key + nonce).
        sig_payload = (
            payload["method"]
            + str(payload["id"])
            + self.api_key
            + str(payload["nonce"])
        )
        payload["sig"] = hmac.new(
            self.api_secret.encode("utf-8"),
            msg=sig_payload.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).hexdigest()
        resp = self._send_ws(payload, private=True)
        if not resp or resp.get("code") != 0:
            raise RuntimeError(f"WebSocket authentication failed: {resp}")
        self._authenticated = True

    def _ensure_private_auth(self):
        """Connect and authenticate the private connection if not done yet.

        Authentication (and the pause before it) happens once per connection
        rather than on every private call.
        """
        with self._auth_lock:
            if self._authenticated:
                return
            if self.ws_private is None:
                self._connect_ws()
            time.sleep(1)  # recommended by docs
            self._authenticate()

    def get_balance(self, currency="USDC"):
        """Fetch user balance using the WebSocket private API.

        Args:
            currency: Keep only entries whose ``instrument_name`` equals this
                value; a falsy value returns every entry.

        Returns:
            A list of balance entries, or ``[]`` when the request times out
            or the reply's ``code`` is not 0.
        """
        self._ensure_private_auth()
        payload = {
            "id": int(time.time() * 1000),
            "method": "private/user-balance",
            "params": {},
            "nonce": int(time.time() * 1000),
        }
        resp = self._send_ws(payload, private=True)
        if not resp or resp.get("code") != 0:
            return []
        balances = resp.get("result", {}).get("data", [])
        if currency:
            return [b for b in balances if b.get("instrument_name") == currency]
        return balances

    @staticmethod
    def _format_quantity(amount):
        """Render *amount* as a plain decimal string.

        ``str()`` alone turns small floats into scientific notation
        (``1e-05``); going through ``Decimal`` yields ``0.00001`` instead.
        """
        return format(Decimal(str(amount)), "f")

    def place_order(self, side, amount):
        """Place a BTC_USDC market order using the WebSocket private API.

        Args:
            side: ``'BUY'`` or ``'SELL'``.
            amount: Quantity in BTC.

        Returns:
            The decoded reply, or ``None`` on timeout.
        """
        self._ensure_private_auth()
        params = {
            "instrument_name": "BTC_USDC",
            "side": side,
            "type": "MARKET",
            "quantity": self._format_quantity(amount),
        }
        payload = {
            "id": int(time.time() * 1000),
            "method": "private/create-order",
            "params": params,
            "nonce": int(time.time() * 1000),
        }
        return self._send_ws(payload, private=True)

    def buy_btc(self, amount):
        """Place a market BUY order for *amount* BTC."""
        return self.place_order("BUY", amount)

    def sell_btc(self, amount):
        """Place a market SELL order for *amount* BTC."""
        return self.place_order("SELL", amount)

    def get_candlesticks(self, timeframe='1m', count=100):
        """Fetch candlestick (OHLCV) data for BTC_USDC using WebSocket.

        Args:
            timeframe (str): Timeframe for each candle
                (e.g., '1m', '5m', '15m', '1h', '4h', '1d').
            count (int): Number of candles to fetch (max 1000 per API docs).

        Returns:
            pd.DataFrame: DataFrame with columns
                ['timestamp', 'open', 'high', 'low', 'close', 'volume'],
                sorted by timestamp; empty when no candles are returned.
        """
        columns = ["timestamp", "open", "high", "low", "close", "volume"]
        payload = {
            "id": int(time.time() * 1000),
            "method": "public/get-candlestick",
            "params": {
                "instrument_name": "BTC_USDC",
                "timeframe": timeframe,
                "count": count,
            },
        }
        resp = self._send_ws(payload)
        candles = resp.get("result", {}).get("data", []) if resp else []
        if not candles:
            return pd.DataFrame(columns=columns)
        df = pd.DataFrame(candles)
        # 't' is converted as a millisecond epoch timestamp.
        df["timestamp"] = pd.to_datetime(df["t"], unit="ms")
        df = df.rename(columns={
            "o": "open",
            "h": "high",
            "l": "low",
            "c": "close",
            "v": "volume",
        })
        return df[columns].sort_values("timestamp")

    def get_instruments(self):
        """Fetch the list of available trading instruments using WebSocket.

        Returns:
            list: List of instrument dicts, or ``[]`` on timeout.
        """
        payload = {
            "id": int(time.time() * 1000),
            "method": "public/get-instruments",
            "params": {},
        }
        resp = self._send_ws(payload)
        return resp.get("result", {}).get("data", []) if resp else []