CryptoMarketParser/WhalesWatcher.py
Simon Moisy fcc0342fd8 Add whale activity plotting and alert processing to ChartView
- Introduced a new subplot for whale activity in ChartView (WIP)
- Implemented alert queue processing to handle whale alerts.
- Updated WhalesWatcher to support alert queue integration.
2025-03-25 17:00:53 +08:00

227 lines
8.6 KiB
Python

import logging
import os
import time
from datetime import datetime, timezone

import ccxt
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Declarative base class; ORM models in this module (e.g. Alert) inherit from it,
# and WhalesWatcher uses Base.metadata to create the tables.
Base = declarative_base()
class Alert(Base):
    """ORM model for one row of the ``alerts`` table.

    WhalesWatcher.log_alert writes one row per detected large order.
    """

    __tablename__ = 'alerts'

    # Integer primary key.
    id = Column(Integer, primary_key=True)

    # Identifier of the exchange the order was seen on.
    exchange = Column(String)

    # Side of the order; log_alert stores 'BUY' for bids and 'SELL' for asks.
    order_type = Column(String)

    # Order size and price as read from the order book level.
    amount = Column(Float)
    price = Column(Float)

    # price * amount, as computed by the detector.
    value_usd = Column(Float)

    # Time the alert row was created.
    timestamp = Column(DateTime)
class WhalesWatcher:
    """Poll exchange order books through ccxt and record large ("whale") orders.

    Every order-book level whose ``price * amount`` is at least ``threshold``
    is collected in ``self.large_orders``, written to the ``alerts`` table and,
    when an ``alert_queue`` was supplied, pushed onto that queue.

    NOTE: detection is stateless — the whole book is re-scanned on every
    polling cycle, so an order that stays in the book is reported again each
    cycle and ``self.large_orders`` grows without bound.
    """

    LOG_FILE = 'logs/whales_watcher.log'
    LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    DB_FILE = 'databases/alerts.db'
    # Exchanges that run() connects to when none were configured beforehand.
    DEFAULT_EXCHANGE_IDS = (
        'binance', 'kraken', 'coinbase', 'bitfinex', 'huobi',
        'kucoin', 'bybit', 'okx', 'gateio', 'mexc',
    )

    def __init__(self, alert_queue=None, *, symbol='BTC/USDT', threshold=100000, check_interval=5):
        """Set up logging, watcher state and the SQLite alert store.

        Args:
            alert_queue: Optional object with a ``put(order)`` method; every
                alert is forwarded to it in addition to the database.
            symbol: Market symbol to watch on every exchange.
            threshold: Minimum ``price * amount`` for a level to count as a whale.
            check_interval: Seconds to sleep between polling cycles.
        """
        # basicConfig opens the log file right away, so its folder must exist.
        os.makedirs(os.path.dirname(self.LOG_FILE), exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format=self.LOG_FORMAT,
            filename=self.LOG_FILE,
            filemode='a'
        )

        self.logger = logging.getLogger('WhalesWatcher')
        # The named logger is process-global: attach the console handler only
        # once, otherwise each new instance duplicates every console line.
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_handler.setFormatter(logging.Formatter(self.LOG_FORMAT))
            self.logger.addHandler(console_handler)

        self.exchanges = {}
        self.symbol = symbol
        self.threshold = threshold
        self.check_interval = check_interval
        self.large_orders = []
        self.is_running = False
        self.alert_queue = alert_queue
        self.logger.info("WhalesWatcher initialized with symbol: %s and threshold: $%d", self.symbol, self.threshold)

        # SQLite cannot create missing parent directories for the database file.
        os.makedirs(os.path.dirname(self.DB_FILE), exist_ok=True)
        self.engine = create_engine(f'sqlite:///{self.DB_FILE}')
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def connect_to_exchanges(self, exchange_ids, api_key=None, secret=None):
        """Connect to multiple cryptocurrency exchanges.

        Args:
            exchange_ids: Iterable of ccxt exchange identifiers.
            api_key: Optional API key passed to every exchange.
            secret: Optional API secret passed to every exchange.

        Returns:
            Dict mapping exchange id to the connected ccxt exchange instance.
            Exchanges that fail to connect are logged and left out.
        """
        connected_exchanges = {}
        for exchange_id in exchange_ids:
            self.logger.info("Attempting to connect to exchange: %s", exchange_id)
            try:
                exchange_class = getattr(ccxt, exchange_id)
                exchange = exchange_class({
                    'apiKey': api_key,
                    'secret': secret,
                    'enableRateLimit': True,
                })
                exchange.load_markets()
            except Exception as e:
                # Best effort: one unreachable exchange must not block the others.
                self.logger.error("Failed to connect to %s: %s", exchange_id, e)
            else:
                connected_exchanges[exchange_id] = exchange
                self.logger.info("Connected to %s successfully", exchange_id)
        return connected_exchanges

    def fetch_order_book(self, exchange, symbol):
        """Fetch the current order book for ``symbol`` on ``exchange``.

        Returns:
            The order book returned by ccxt, or ``None`` if the request failed
            (the error is logged).
        """
        try:
            # Huobi is queried with a depth of 150 instead of 100
            # (presumably it does not accept 100 — TODO confirm).
            limit = 150 if exchange.id == 'huobi' else 100
            return exchange.fetch_order_book(symbol, limit=limit)
        except Exception as e:
            self.logger.error("Error fetching order book from %s for %s: %s", exchange.id, symbol, e)
            return None

    def detect_whales(self, order_book):
        """Detect whale orders in the order book.

        Args:
            order_book: Mapping with ``'bids'`` and ``'asks'`` lists whose
                entries start with ``price, amount``; any further fields of
                an entry are ignored. A falsy value yields an empty result.

        Returns:
            List of dicts with keys ``type`` ('bid' or 'ask'), ``price``,
            ``amount``, ``value_usd`` and ``timestamp`` (detection time as a
            UTC ISO-8601 string), bids first, in book order.
        """
        if not order_book:
            return []

        # One shared detection time for every order found in this snapshot.
        detected_at = datetime.now(timezone.utc).isoformat()

        whale_orders = []
        for side, key in (('bid', 'bids'), ('ask', 'asks')):
            for level in order_book.get(key) or ():
                # Index instead of unpacking: levels may carry extra fields
                # beyond price and amount.
                price, amount = level[0], level[1]
                if price is None or amount is None:
                    continue
                value = price * amount
                if value >= self.threshold:
                    whale_orders.append({
                        'type': side,
                        'price': price,
                        'amount': amount,
                        'value_usd': value,
                        'timestamp': detected_at
                    })
        return whale_orders

    def analyze_whale_activity(self):
        """Analyze the collected whale activity.

        Returns:
            The string ``"No whale activity detected yet."`` when nothing has
            been collected; otherwise a dict with ``total_whales``,
            ``buy_orders``, ``sell_orders``, ``buy_pressure``,
            ``sell_pressure`` and ``net_pressure`` (buy minus sell value).
        """
        self.logger.info("Analyzing whale activity")
        if not self.large_orders:
            return "No whale activity detected yet."

        df = pd.DataFrame(self.large_orders)
        # Split once per side instead of re-filtering for every statistic.
        bids = df[df['type'] == 'bid']
        asks = df[df['type'] == 'ask']
        total_bid_value = bids['value_usd'].sum()
        total_ask_value = asks['value_usd'].sum()

        return {
            'total_whales': len(df),
            'buy_orders': len(bids),
            'sell_orders': len(asks),
            'buy_pressure': total_bid_value,
            'sell_pressure': total_ask_value,
            'net_pressure': total_bid_value - total_ask_value
        }

    def save_data(self, filename='whale_activity.csv'):
        """Save the collected whale data to a CSV file.

        Nothing is written when no whale orders have been collected.
        """
        self.logger.info("Saving whale activity data to %s", filename)
        if self.large_orders:
            df = pd.DataFrame(self.large_orders)
            df.to_csv(filename, index=False)
            self.logger.info("Saved whale activity data to %s", filename)

    def log_alert(self, order):
        """Log the alert to the database and alert queue.

        Args:
            order: Whale-order dict as produced by ``detect_whales`` with an
                additional ``'exchange'`` key.

        Raises:
            Exception: Whatever the database layer raises; the transaction is
                rolled back and the order is NOT put on the queue in that case.
        """
        session = self.Session()
        try:
            session.add(Alert(
                exchange=order['exchange'],
                order_type='BUY' if order['type'] == 'bid' else 'SELL',
                amount=order['amount'],
                price=order['price'],
                value_usd=order['value_usd'],
                # NOTE(review): naive local time, whereas order['timestamp'] is
                # UTC. Kept as-is so existing rows stay comparable — confirm
                # with the consumers of alerts.db before switching to UTC.
                timestamp=datetime.now()
            ))
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            # Always hand the connection back, even when the commit failed.
            session.close()

        # Add to queue if available
        if self.alert_queue is not None:
            self.alert_queue.put(order)

    def _poll_exchange(self, exchange_id, exchange):
        """Fetch one order book, then record and announce the whales found in it."""
        if self.symbol not in (exchange.markets or {}):
            return
        order_book = self.fetch_order_book(exchange, self.symbol)
        if not order_book:
            return

        # Unit shown in the alert message, e.g. 'BTC' for 'BTC/USDT'.
        base_currency = self.symbol.split('/')[0]
        for order in self.detect_whales(order_book):
            order['exchange'] = exchange_id
            # Append each order exactly once.
            self.large_orders.append(order)
            self.log_alert(order)
            order_type = "BUY" if order['type'] == 'bid' else "SELL"
            self.logger.info(
                f"WHALE ALERT on {exchange_id}: {order_type} {order['amount']:.4f} {base_currency} "
                f"at ${order['price']:,.2f} (${order['value_usd']:,.2f})"
            )

    def run(self):
        """Run the whale watcher continuously.

        Connects to the default exchanges if none are configured, then polls
        every exchange once per cycle and sleeps ``check_interval`` seconds
        between cycles until ``stop()`` clears ``is_running``. Blocks the
        calling thread.
        """
        self.logger.info("Running whale watcher")
        if not self.exchanges:
            # Connect to multiple exchanges
            self.exchanges = self.connect_to_exchanges(self.DEFAULT_EXCHANGE_IDS)
        if not self.exchanges:
            self.logger.error("Cannot run whale watcher without connecting to any exchange")
            return

        self.is_running = True
        self.logger.info(f"Starting whale watcher with threshold ${self.threshold:,}")
        self.logger.info(f"Connected to {len(self.exchanges)} exchanges: {', '.join(self.exchanges.keys())}")
        try:
            while self.is_running:
                for exchange_id, exchange in self.exchanges.items():
                    try:
                        self._poll_exchange(exchange_id, exchange)
                    except Exception as e:
                        # A failure on one exchange (bad data, DB error, ...)
                        # must not end the whole watch loop.
                        self.logger.error("Error while processing %s: %s", exchange_id, e)
                time.sleep(self.check_interval)
        except Exception as e:
            self.logger.error(f"Error in run method: {str(e)}")
        finally:
            # Keep the flag truthful when the loop ends for any reason.
            self.is_running = False

    def stop(self):
        """Stop the whale watcher.

        Only clears the ``is_running`` flag; ``run()`` returns once its current
        polling cycle (including the sleep) has finished.
        """
        self.logger.info("Stopping whale watcher...")
        self.is_running = False