- Introduced a new subplot for whale activity in ChartView (WIP) - Implemented alert queue processing to handle whale alerts. - Updated WhalesWatcher to support alert queue integration.
227 lines
8.6 KiB
Python
227 lines
8.6 KiB
Python
import logging
import os
import time
from datetime import datetime, timezone

import ccxt
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
|
|
|
|
# Declarative base that every ORM model in this module derives from.
Base = declarative_base()


class Alert(Base):
    """ORM row describing one whale order that triggered an alert.

    Rows are written by ``WhalesWatcher.log_alert`` into the ``alerts`` table.
    """

    __tablename__ = 'alerts'

    # Surrogate primary key.
    id = Column(Integer, primary_key=True)
    # Exchange identifier the order was seen on (e.g. the ccxt exchange id).
    exchange = Column(String)
    # 'BUY' for bids, 'SELL' for asks (as written by WhalesWatcher.log_alert).
    order_type = Column(String)
    # Order size, as reported in the order book entry.
    amount = Column(Float)
    # Price level of the order book entry.
    price = Column(Float)
    # price * amount, as computed by WhalesWatcher.detect_whales.
    value_usd = Column(Float)
    # Time the alert row was created.
    timestamp = Column(DateTime)
|
|
|
|
class WhalesWatcher:
    """Poll exchange order books and record unusually large ("whale") orders.

    Each polling cycle fetches the order book for ``self.symbol`` from every
    connected exchange, keeps the entries whose notional value
    (``price * amount``) is at least ``self.threshold``, stores them in
    ``self.large_orders``, persists them as ``Alert`` rows and, when an alert
    queue was supplied, pushes them onto that queue.

    Note that no de-duplication is performed: an order that rests in the book
    across several polling cycles is reported once per cycle.
    """

    def __init__(self, alert_queue=None):
        """Configure logging, default settings and the alerts database.

        Args:
            alert_queue: Optional queue-like object exposing ``put(item)``.
                Every detected whale order (a dict) is pushed onto it.
        """
        # The log file and the SQLite database live in relative directories;
        # create them up front so a fresh checkout does not fail on startup.
        os.makedirs('logs', exist_ok=True)
        os.makedirs('databases', exist_ok=True)

        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            filename='logs/whales_watcher.log',
            filemode='a',
        )

        self.logger = logging.getLogger('WhalesWatcher')
        # getLogger() returns the same logger object for every instance, so
        # attach the console handler only once; otherwise each additional
        # WhalesWatcher would duplicate every console line.
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_handler.setFormatter(logging.Formatter(log_format))
            self.logger.addHandler(console_handler)

        self.exchanges = {}          # exchange id -> connected ccxt exchange
        self.symbol = 'BTC/USDT'     # market polled on every exchange
        self.threshold = 100000      # minimum price * amount to count as a whale
        self.check_interval = 5      # seconds to sleep between polling cycles
        self.large_orders = []       # every whale order detected so far
        self.is_running = False
        self.alert_queue = alert_queue

        self.logger.info("WhalesWatcher initialized with symbol: %s and threshold: $%d", self.symbol, self.threshold)

        self.engine = create_engine('sqlite:///databases/alerts.db')
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def connect_to_exchanges(self, exchange_ids, api_key=None, secret=None):
        """Connect to multiple cryptocurrency exchanges.

        Args:
            exchange_ids: Iterable of ccxt exchange ids (attribute names on
                the ``ccxt`` module).
            api_key: Optional API key passed to every exchange.
            secret: Optional API secret passed to every exchange.

        Returns:
            dict mapping exchange id to the connected exchange object.
            Exchanges that fail to connect are logged and left out.
        """
        connected_exchanges = {}
        for exchange_id in exchange_ids:
            self.logger.info("Attempting to connect to exchange: %s", exchange_id)
            try:
                exchange_class = getattr(ccxt, exchange_id)
                exchange = exchange_class({
                    'apiKey': api_key,
                    'secret': secret,
                    'enableRateLimit': True,
                })
                exchange.load_markets()
            except Exception as e:
                # Best effort: one unreachable or unknown exchange must not
                # prevent the others from being used.
                self.logger.error(f"Failed to connect to {exchange_id}: {str(e)}")
                continue

            connected_exchanges[exchange_id] = exchange
            self.logger.info(f"Connected to {exchange_id} successfully")

        return connected_exchanges

    def fetch_order_book(self, exchange, symbol):
        """Fetch the current order book for the given symbol on the specified exchange.

        Returns:
            The order book returned by the exchange, or ``None`` if the
            request failed (the error is logged).
        """
        # huobi is queried with a depth of 150; every other exchange with 100.
        limit = 150 if exchange.id == 'huobi' else 100
        try:
            return exchange.fetch_order_book(symbol, limit=limit)
        except Exception as e:
            self.logger.error(f"Error fetching order book from {exchange.id} for {symbol}: {str(e)}")
            return None

    def detect_whales(self, order_book):
        """Detect whale orders in the order book.

        Args:
            order_book: Mapping with ``'bids'`` and ``'asks'`` sequences whose
                entries start with ``price, amount``; any further fields in an
                entry are ignored.

        Returns:
            List of dicts with keys ``type`` (``'bid'``/``'ask'``), ``price``,
            ``amount``, ``value_usd`` and ``timestamp`` (ISO-8601, UTC), bids
            first, then asks. Empty when ``order_book`` is falsy.
        """
        if not order_book:
            return []

        # One detection timestamp (UTC) shared by every order of this snapshot.
        detected_at = datetime.now(timezone.utc).isoformat()

        whale_orders = []
        for side, key in (('bid', 'bids'), ('ask', 'asks')):
            for entry in order_book.get(key) or []:
                if len(entry) < 2:
                    # Malformed level without both price and amount.
                    continue
                # Only the first two fields matter; exchanges may append
                # extra fields (e.g. a timestamp or order count).
                price, amount = entry[0], entry[1]
                value = price * amount
                if value >= self.threshold:
                    whale_orders.append({
                        'type': side,
                        'price': price,
                        'amount': amount,
                        'value_usd': value,
                        'timestamp': detected_at,
                    })

        return whale_orders

    def analyze_whale_activity(self):
        """Analyze the collected whale activity.

        Returns:
            A message string when nothing has been collected yet, otherwise a
            dict with ``total_whales``, ``buy_orders``, ``sell_orders``,
            ``buy_pressure``, ``sell_pressure`` and ``net_pressure``
            (buy minus sell value).
        """
        self.logger.info("Analyzing whale activity")
        if not self.large_orders:
            return "No whale activity detected yet."

        df = pd.DataFrame(self.large_orders)
        bids = df[df['type'] == 'bid']
        asks = df[df['type'] == 'ask']
        total_bid_value = bids['value_usd'].sum()
        total_ask_value = asks['value_usd'].sum()

        return {
            'total_whales': len(df),
            'buy_orders': len(bids),
            'sell_orders': len(asks),
            'buy_pressure': total_bid_value,
            'sell_pressure': total_ask_value,
            'net_pressure': total_bid_value - total_ask_value,
        }

    def save_data(self, filename='whale_activity.csv'):
        """Save the collected whale data to a CSV file.

        Nothing is written when no whale orders have been collected.
        """
        self.logger.info("Saving whale activity data to %s", filename)
        if self.large_orders:
            df = pd.DataFrame(self.large_orders)
            df.to_csv(filename, index=False)
            self.logger.info(f"Saved whale activity data to {filename}")

    def log_alert(self, order):
        """Log the alert to the database and alert queue.

        Args:
            order: Whale order dict as produced by ``detect_whales`` with an
                additional ``'exchange'`` key.

        Raises:
            Exception: Whatever the database layer raises; the transaction is
                rolled back and the session closed before it propagates.
        """
        session = self.Session()
        try:
            alert = Alert(
                exchange=order['exchange'],
                order_type='BUY' if order['type'] == 'bid' else 'SELL',
                amount=order['amount'],
                price=order['price'],
                value_usd=order['value_usd'],
                # NOTE(review): naive local time, whereas order['timestamp']
                # is UTC - kept as-is so existing rows stay comparable.
                timestamp=datetime.now()
            )
            session.add(alert)
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            # Always release the connection, even when the commit failed.
            session.close()

        # Add to queue if available
        if self.alert_queue is not None:
            self.alert_queue.put(order)

    def run(self):
        """Run the whale watcher continuously.

        Connects to the default exchange list when no exchanges are set,
        then polls until ``stop()`` is called or an unexpected error occurs.
        ``is_running`` is always reset to ``False`` on exit.
        """
        self.logger.info("Running whale watcher")
        if not self.exchanges:
            # Connect to multiple exchanges
            exchange_ids = ['binance', 'kraken', 'coinbase', 'bitfinex', 'huobi', 'kucoin', 'bybit', 'okx', 'gateio', 'mexc']
            self.exchanges = self.connect_to_exchanges(exchange_ids)
            if not self.exchanges:
                self.logger.error("Cannot run whale watcher without connecting to any exchange")
                return

        self.is_running = True
        self.logger.info(f"Starting whale watcher with threshold ${self.threshold:,}")
        self.logger.info(f"Connected to {len(self.exchanges)} exchanges: {', '.join(self.exchanges.keys())}")

        # Unit shown in alert messages: the base currency of the watched pair.
        base_currency = self.symbol.split('/')[0]

        try:
            while self.is_running:
                for exchange_id, exchange in self.exchanges.items():
                    if self.symbol not in exchange.markets:
                        continue

                    order_book = self.fetch_order_book(exchange, self.symbol)
                    if not order_book:
                        continue

                    # Record and alert on every whale order exactly once.
                    for order in self.detect_whales(order_book):
                        order['exchange'] = exchange_id
                        self.large_orders.append(order)
                        self.log_alert(order)

                        order_type = "BUY" if order['type'] == 'bid' else "SELL"
                        self.logger.info(
                            f"WHALE ALERT on {exchange_id}: {order_type} {order['amount']:.4f} {base_currency} at ${order['price']:,.2f} " +
                            f"(${order['value_usd']:,.2f})"
                        )

                # One pause per full pass over all exchanges.
                time.sleep(self.check_interval)
        except Exception as e:
            self.logger.error(f"Error in run method: {str(e)}", exc_info=True)
        finally:
            # Never report "running" after the loop has exited.
            self.is_running = False

    def stop(self):
        """Stop the whale watcher.

        Clears ``is_running`` so the loop in ``run`` exits after the current
        polling cycle.
        """
        self.logger.info("Stopping whale watcher...")
        self.is_running = False
        self.logger.info("Whale watcher stopped")
|