# lowkey_backtest/engine/cryptoquant.py
import os
import sys
import time
import requests
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenv
# Load env vars from .env file (e.g. CRYPTOQUANT_API_KEY)
load_dotenv()
# Fix path for direct execution: add the repo root to sys.path so that
# `python engine/cryptoquant.py` can resolve the `engine` package.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from engine.logging_config import get_logger
# Module-level logger shared by everything in this file.
logger = get_logger(__name__)
class CryptoQuantClient:
"""
Client for fetching data from CryptoQuant API.
"""
BASE_URL = "https://api.cryptoquant.com/v1"
def __init__(self, api_key: str | None = None):
self.api_key = api_key or os.getenv("CRYPTOQUANT_API_KEY")
if not self.api_key:
raise ValueError("CryptoQuant API Key not found. Set CRYPTOQUANT_API_KEY env var.")
self.headers = {
"Authorization": f"Bearer {self.api_key}"
}
def fetch_metric(
self,
metric_path: str,
symbol: str,
start_date: str,
end_date: str,
exchange: str | None = "all_exchange",
window: str = "day"
) -> pd.DataFrame:
"""
Fetch a specific metric from CryptoQuant.
"""
url = f"{self.BASE_URL}/{metric_path}"
params = {
"window": window,
"from": start_date,
"to": end_date,
"limit": 100000
}
if exchange:
params["exchange"] = exchange
logger.info(f"Fetching {metric_path} for {symbol} ({start_date}-{end_date})...")
try:
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status()
data = response.json()
if 'result' in data and 'data' in data['result']:
df = pd.DataFrame(data['result']['data'])
if not df.empty:
if 'date' in df.columns:
df['timestamp'] = pd.to_datetime(df['date'])
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)
return df
return pd.DataFrame()
except Exception as e:
logger.error(f"Error fetching CQ data {metric_path}: {e}")
if 'response' in locals() and hasattr(response, 'text'):
logger.error(f"Response: {response.text}")
return pd.DataFrame()
def fetch_multi_metrics(self, symbols: list[str], metrics: dict, start_date: str, end_date: str):
"""
Fetch multiple metrics for multiple symbols and combine them.
"""
combined_df = pd.DataFrame()
for symbol in symbols:
asset = symbol.lower()
for metric_name, api_path in metrics.items():
full_path = f"{asset}/{api_path}"
# Some metrics (like funding rates) might need specific exchange vs all_exchange
# Defaulting to all_exchange is usually safe for flows, but check specific logic if needed
exchange_param = "all_exchange"
if "funding-rates" in api_path:
# For funding rates, 'all_exchange' might not be valid or might be aggregated
# Let's try 'binance' as a proxy for market sentiment if all fails,
# or keep 'all_exchange' if supported.
# Based on testing, 'all_exchange' is standard for flows.
pass
df = self.fetch_metric(full_path, asset, start_date, end_date, exchange=exchange_param)
if not df.empty:
target_col = None
# Heuristic to find the value column
candidates = ['funding_rate', 'reserve', 'inflow_total', 'outflow_total', 'open_interest', 'ratio', 'value']
for col in df.columns:
if col in candidates:
target_col = col
break
if not target_col:
# Fallback: take first numeric col that isn't date
for col in df.columns:
if col not in ['date', 'datetime', 'timestamp_str', 'block_height']:
target_col = col
break
if target_col:
col_name = f"{asset}_{metric_name}"
subset = df[[target_col]].rename(columns={target_col: col_name})
if combined_df.empty:
combined_df = subset
else:
combined_df = combined_df.join(subset, how='outer')
time.sleep(0.2)
return combined_df
def fetch_history_chunked(
self,
symbols: list[str],
metrics: dict,
start_date: str,
end_date: str,
chunk_months: int = 3
) -> pd.DataFrame:
"""
Fetch historical data in chunks to avoid API limits.
"""
start_dt = datetime.strptime(start_date, "%Y%m%d")
end_dt = datetime.strptime(end_date, "%Y%m%d")
all_data = []
current = start_dt
while current < end_dt:
next_chunk = current + timedelta(days=chunk_months * 30)
if next_chunk > end_dt:
next_chunk = end_dt
s_str = current.strftime("%Y%m%d")
e_str = next_chunk.strftime("%Y%m%d")
logger.info(f"Processing chunk: {s_str} to {e_str}")
chunk_df = self.fetch_multi_metrics(symbols, metrics, s_str, e_str)
if not chunk_df.empty:
all_data.append(chunk_df)
current = next_chunk + timedelta(days=1)
time.sleep(1) # Be nice to API
if not all_data:
return pd.DataFrame()
# Combine all chunks
full_df = pd.concat(all_data)
# Remove duplicates if any overlap
full_df = full_df[~full_df.index.duplicated(keep='first')]
full_df.sort_index(inplace=True)
return full_df
if __name__ == "__main__":
cq = CryptoQuantClient()
# 12 Months Data (Jan 1 2025 - Jan 14 2026)
start = "20250101"
end = "20260114"
metrics = {
"reserves": "exchange-flows/exchange-reserve",
"inflow": "exchange-flows/inflow",
"funding": "market-data/funding-rates"
}
print(f"Fetching training data from {start} to {end}...")
df = cq.fetch_history_chunked(["btc", "eth"], metrics, start, end)
output_file = "data/cq_training_data.csv"
os.makedirs("data", exist_ok=True)
df.to_csv(output_file)
print(f"\nSaved {len(df)} rows to {output_file}")
print(df.head())