Cycles/trader/cryptocom_trader.py
Simon Moisy 3483aaf6d7 Add CryptoComTrader class and main execution script for trading operations
- Introduced the CryptoComTrader class to handle WebSocket connections for real-time trading data and operations.
- Implemented methods for fetching price, order book, user balance, and placing orders.
- Added functionality to retrieve candlestick data and available trading instruments.
- Created a main script to initialize the trader, fetch instruments, and display candlestick data in a loop.
- Integrated Plotly for visualizing candlestick data, enhancing user interaction and data representation.
2025-05-23 17:14:26 +08:00

230 lines
7.9 KiB
Python

import os
import time
import hmac
import hashlib
import base64
import json
import pandas as pd
import threading
from websocket import create_connection, WebSocketTimeoutException
class CryptoComTrader:
    """Synchronous WebSocket client for Crypto.com market data and BTC_USDC orders.

    Two sockets are used: a public one (``self.ws``) for market data and a
    private one (``self.ws_private``) that is opened only when
    ``CRYPTOCOM_API_KEY`` and ``CRYPTOCOM_API_SECRET`` are set.  Each socket is
    guarded by its own lock so that one request/response exchange completes
    before the next one starts.

    Environment variables read by ``__init__``:
        CRYPTOCOM_ENV: ``"production"`` or ``"uat"`` (case-insensitive,
            defaults to ``"UAT"``).
        CRYPTOCOM_API_KEY / CRYPTOCOM_API_SECRET: credentials for the private
            socket.
    """

    ENV_URLS = {
        "production": {
            "WS_URL": "wss://deriv-stream.crypto.com/v1/market",
            "WS_PRIVATE_URL": "wss://deriv-stream.crypto.com/v1/user",
        },
        "uat": {
            "WS_URL": "wss://uat-deriv-stream.3ona.co/v1/market",
            "WS_PRIVATE_URL": "wss://uat-deriv-stream.3ona.co/v1/user",
        },
    }

    # Upper bound, in seconds, on how long one request waits for its reply.
    RESPONSE_TIMEOUT = 10

    def __init__(self):
        """Read configuration from the environment and open the socket(s)."""
        self.env = os.getenv("CRYPTOCOM_ENV", "UAT").lower()
        # NOTE(review): an unrecognised CRYPTOCOM_ENV silently selects the
        # production endpoints; confirm this is the intended fallback.
        urls = self.ENV_URLS.get(self.env, self.ENV_URLS["production"])
        self.WS_URL = urls["WS_URL"]
        self.WS_PRIVATE_URL = urls["WS_PRIVATE_URL"]
        self.api_key = os.getenv("CRYPTOCOM_API_KEY")
        self.api_secret = os.getenv("CRYPTOCOM_API_SECRET")
        self.ws = None
        self.ws_private = None
        self._lock = threading.Lock()
        self._private_lock = threading.Lock()
        self._private_authenticated = False
        self._last_id = 0
        self._connect_ws()

    def _connect_ws(self):
        """Open whichever sockets are not open yet.

        The private socket is only created when both API credentials are set.
        """
        if self.ws is None:
            self.ws = create_connection(self.WS_URL, timeout=10)
        if self.api_key and self.api_secret and self.ws_private is None:
            self.ws_private = create_connection(self.WS_PRIVATE_URL, timeout=10)

    def _next_id(self):
        """Return a millisecond-timestamp request id that is strictly increasing.

        Replies are matched to requests by id, so two requests issued within
        the same millisecond must not share one.
        """
        candidate = int(time.time() * 1000)
        previous = getattr(self, "_last_id", 0)
        if candidate <= previous:
            candidate = previous + 1
        self._last_id = candidate
        return candidate

    def _recv_matching(self, ws, accept, timeout=None):
        """Read frames from ``ws`` until ``accept`` picks one or time runs out.

        Args:
            ws: Socket object exposing ``recv()`` and ``send()``.
            accept: Callable taking a decoded message dict and returning the
                value to hand back, or ``None`` to skip the message.
            timeout: Seconds to keep reading; defaults to ``RESPONSE_TIMEOUT``.

        Returns:
            Whatever ``accept`` returned for the first accepted message, or
            ``None`` if the socket timed out or the deadline passed.
        """
        budget = self.RESPONSE_TIMEOUT if timeout is None else timeout
        deadline = time.monotonic() + budget
        while time.monotonic() < deadline:
            try:
                raw = ws.recv()
            except WebSocketTimeoutException:
                return None
            msg = json.loads(raw)
            if not isinstance(msg, dict):
                continue
            if msg.get("method") == "public/heartbeat":
                # Per the Crypto.com Exchange API docs the server expects the
                # heartbeat id to be echoed back, otherwise it disconnects.
                ws.send(json.dumps({
                    "id": msg.get("id"),
                    "method": "public/respond-heartbeat",
                }))
                continue
            picked = accept(msg)
            if picked is not None:
                return picked
        return None

    def _send_ws(self, payload, private=False):
        """Send ``payload`` and return the reply carrying the same ``id``.

        Unrelated frames (heartbeats, subscription pushes, acks of earlier
        requests) are skipped instead of being mistaken for the reply.

        Args:
            payload: JSON-serialisable request dict; its ``"id"`` is used to
                recognise the reply.
            private: Use the private socket and its lock when true.

        Returns:
            The decoded reply dict, or ``None`` if none arrived in time.
        """
        ws = self.ws_private if private else self.ws
        lock = self._private_lock if private else self._lock
        request_id = payload.get("id")

        def is_reply(msg):
            return msg if msg.get("id") == request_id else None

        with lock:
            ws.send(json.dumps(payload))
            return self._recv_matching(ws, is_reply)

    def _sign(self, params):
        """Add ``id``, ``nonce``, ``api_key`` and an HMAC-SHA256 ``sig`` to ``params``.

        The signature is computed over the compact, key-sorted JSON encoding of
        ``params`` (after id/nonce/api_key were added) using ``self.api_secret``.
        ``params`` is modified in place and also returned.  No other method of
        this class calls this helper.
        """
        t = str(int(time.time() * 1000))
        params['id'] = t
        params['nonce'] = t
        params['api_key'] = self.api_key
        param_str = json.dumps(params, separators=(',', ':'), sort_keys=True)
        sig = hmac.new(
            bytes(self.api_secret, 'utf-8'),
            msg=bytes(param_str, 'utf-8'),
            digestmod=hashlib.sha256
        ).hexdigest()
        params['sig'] = sig
        return params

    @staticmethod
    def _extract_channel_data(msg, channel_name):
        """Return the first data entry of a push for ``channel_name``, else ``None``.

        Two envelopes are recognised: ``{"method": "<channel>.update",
        "params": {"data": [...]}}`` and ``{"result": {"channel": "<channel>",
        "data": [...]}}``.
        NOTE(review): the second shape follows the Crypto.com Exchange v1
        documentation; confirm against live traffic.
        """
        if msg.get("method") == f"{channel_name}.update":
            envelope = msg.get("params")
        else:
            envelope = msg.get("result")
            if not isinstance(envelope, dict) or envelope.get("channel") != channel_name:
                return None
        if not isinstance(envelope, dict):
            return None
        data = envelope.get("data")
        if isinstance(data, list) and data and isinstance(data[0], dict):
            return data[0]
        return None

    def _fetch_channel_snapshot(self, subscription, channel_name):
        """Subscribe to ``subscription``, take one update, then unsubscribe.

        The public lock is held for the whole exchange so no other request
        interleaves with it.  Unsubscribing keeps the socket from accumulating
        pushes that later requests would have to wade through.

        Returns:
            The first data entry (a dict) of the first update received, or
            ``None`` if none arrived in time.
        """
        def build(method):
            return {
                "id": self._next_id(),
                "method": method,
                "params": {"channels": [subscription]},
            }

        def pick(msg):
            return self._extract_channel_data(msg, channel_name)

        with self._lock:
            self.ws.send(json.dumps(build("subscribe")))
            snapshot = self._recv_matching(self.ws, pick)
            # The ack is not awaited: id matching in later requests skips it.
            self.ws.send(json.dumps(build("unsubscribe")))
        return snapshot

    def get_price(self):
        """
        Get the ``'a'`` field of the next BTC_USDC ticker update (one-shot).

        NOTE(review): the original comment calls 'a' the ask price; the
        Exchange v1 docs appear to describe 'a' as the latest trade price and
        'k' as the best ask. Confirm which one is wanted.

        Returns:
            The value of ``'a'`` as sent by the server, or ``None`` if no
            ticker update arrived in time.
        """
        ticker = self._fetch_channel_snapshot("ticker.BTC_USDC", "ticker")
        return ticker.get("a") if ticker else None

    def get_order_book(self, depth=10):
        """
        Fetch the order book for BTC_USDC with the specified depth (one-shot).

        Args:
            depth (int): Depth suffix of the ``book.BTC_USDC.<depth>`` channel.
        Returns:
            dict: ``{'bids': [...], 'asks': [...]}``; both lists are empty if
            no book update arrived in time.
        """
        book = self._fetch_channel_snapshot(f"book.BTC_USDC.{depth}", "book")
        if book is None:
            return {"bids": [], "asks": []}
        return {
            "bids": book.get("bids", []),
            "asks": book.get("asks", []),
        }

    def _authenticate(self):
        """
        Authenticate the private WebSocket connection.

        Raises:
            ValueError: If the API key or secret is missing.
            RuntimeError: If the server does not answer with ``code == 0``.
        """
        if not self.api_key or not self.api_secret:
            raise ValueError("API key and secret must be set in environment variables.")
        payload = {
            "id": self._next_id(),
            "method": "public/auth",
            "api_key": self.api_key,
            "nonce": int(time.time() * 1000),
        }
        # For auth, sig is HMAC_SHA256(method + id + api_key + nonce)
        sig_payload = (
            payload["method"] + str(payload["id"]) + self.api_key + str(payload["nonce"])
        )
        payload["sig"] = hmac.new(
            bytes(self.api_secret, "utf-8"),
            msg=bytes(sig_payload, "utf-8"),
            digestmod=hashlib.sha256,
        ).hexdigest()
        resp = self._send_ws(payload, private=True)
        if not resp or resp.get("code") != 0:
            raise RuntimeError(f"WebSocket authentication failed: {resp}")
        self._private_authenticated = True

    def _ensure_private_auth(self):
        """Make sure the private socket exists and has been authenticated once.

        Authentication (and the one-second pause before it) is performed only
        the first time, or again after the private socket had to be reopened.

        Raises:
            ValueError: If the API key or secret is missing.
            RuntimeError: If authentication is rejected.
        """
        if not self.api_key or not self.api_secret:
            raise ValueError("API key and secret must be set in environment variables.")
        if self.ws_private is None:
            # A fresh socket has no session, so any earlier auth is void.
            self._private_authenticated = False
            self._connect_ws()
        if getattr(self, "_private_authenticated", False):
            return
        time.sleep(1)  # recommended by docs
        self._authenticate()

    def get_balance(self, currency="USDC"):
        """
        Fetch user balance using WebSocket private API.

        Args:
            currency (str): Keep only entries whose ``instrument_name`` equals
                this value; a falsy value returns every entry.
        Returns:
            list: Matching balance entries, or ``[]`` when the request failed.
        """
        self._ensure_private_auth()
        payload = {
            "id": self._next_id(),
            "method": "private/user-balance",
            "params": {},
            "nonce": int(time.time() * 1000),
        }
        resp = self._send_ws(payload, private=True)
        if not resp or resp.get("code") != 0:
            return []
        balances = (resp.get("result") or {}).get("data", [])
        if currency:
            return [b for b in balances if b.get("instrument_name") == currency]
        return balances

    @staticmethod
    def _format_quantity(amount):
        """Render ``amount`` as a plain decimal string without an exponent.

        ``str(0.00001)`` yields ``'1e-05'``; this returns ``'0.00001'``.

        Raises:
            ValueError: If ``amount`` is not a finite number.
        """
        from decimal import Decimal, InvalidOperation

        try:
            quantity = Decimal(str(amount))
        except InvalidOperation as err:
            raise ValueError(f"Invalid order quantity: {amount!r}") from err
        if not quantity.is_finite():
            raise ValueError(f"Invalid order quantity: {amount!r}")
        return format(quantity, "f")

    def place_order(self, side, amount):
        """
        Place a market order using WebSocket private API.

        Args:
            side (str): 'BUY' or 'SELL'.
            amount: Quantity in BTC; sent as a plain decimal string.
        Returns:
            dict | None: The server reply, or ``None`` if none arrived in time.
        Raises:
            ValueError: If ``amount`` is not a finite number or credentials
                are missing.
        """
        quantity = self._format_quantity(amount)
        self._ensure_private_auth()
        params = {
            "instrument_name": "BTC_USDC",
            "side": side,
            "type": "MARKET",
            "quantity": quantity,
        }
        payload = {
            "id": self._next_id(),
            "method": "private/create-order",
            "params": params,
            "nonce": int(time.time() * 1000),
        }
        return self._send_ws(payload, private=True)

    def buy_btc(self, amount):
        """Place a market BUY order for ``amount`` BTC."""
        return self.place_order("BUY", amount)

    def sell_btc(self, amount):
        """Place a market SELL order for ``amount`` BTC."""
        return self.place_order("SELL", amount)

    def get_candlesticks(self, timeframe='1m', count=100):
        """
        Fetch candlestick (OHLCV) data for BTC_USDC using WebSocket.

        Args:
            timeframe (str): Timeframe for each candle (e.g., '1m', '5m', '15m', '1h', '4h', '1d').
            count (int): Number of candles to fetch (max 1000 per API docs).
        Returns:
            pd.DataFrame: DataFrame with columns ['timestamp', 'open', 'high', 'low', 'close', 'volume'],
            sorted by timestamp; empty when no candles were returned.
        """
        columns = ["timestamp", "open", "high", "low", "close", "volume"]
        payload = {
            "id": self._next_id(),
            "method": "public/get-candlestick",
            "params": {
                "instrument_name": "BTC_USDC",
                "timeframe": timeframe,
                "count": count,
            },
        }
        resp = self._send_ws(payload)
        candles = (resp.get("result") or {}).get("data", []) if resp else []
        if not candles:
            return pd.DataFrame(columns=columns)
        df = pd.DataFrame(candles)
        # 't' is interpreted as epoch milliseconds.
        df['timestamp'] = pd.to_datetime(df['t'], unit='ms')
        df = df.rename(columns={
            'o': 'open',
            'h': 'high',
            'l': 'low',
            'c': 'close',
            'v': 'volume',
        })
        return df[columns].sort_values('timestamp')

    def get_instruments(self):
        """
        Fetch the list of available trading instruments from Crypto.com using WebSocket.

        Returns:
            list: List of instrument dicts, or ``[]`` when no reply arrived.
        """
        payload = {
            "id": self._next_id(),
            "method": "public/get-instruments",
            "params": {},
        }
        resp = self._send_ws(payload)
        return (resp.get("result") or {}).get("data", []) if resp else []