Add whale activity plotting and alert processing to ChartView

- Introduced a new subplot for whale activity in ChartView (WIP)
- Implemented alert queue processing to handle whale alerts.
- Updated WhalesWatcher to support alert queue integration.
This commit is contained in:
Simon Moisy 2025-03-25 17:00:53 +08:00
parent 389cd7c919
commit fcc0342fd8
3 changed files with 188 additions and 19 deletions

View File

@ -8,9 +8,11 @@ from matplotlib.dates import date2num
import threading import threading
import tkinter as tk import tkinter as tk
from tkinter import ttk from tkinter import ttk
import queue
from datetime import datetime
class ChartView: class ChartView:
def __init__(self, exchange_ids, symbol='BTC/USDT', timeframe='1m', limit=100): def __init__(self, exchange_ids, symbol='BTC/USDT', timeframe='1m', limit=100, alert_queue=None):
self.exchange_ids = exchange_ids self.exchange_ids = exchange_ids
self.symbol = symbol self.symbol = symbol
self.timeframe = timeframe self.timeframe = timeframe
@ -19,10 +21,16 @@ class ChartView:
self.fig = None self.fig = None
self.ax1 = None self.ax1 = None
self.ax2 = None self.ax2 = None
self.ax_whales = None # New axis for whale activity
self.animation = None self.animation = None
self.current_exchange_id = exchange_ids[0] self.current_exchange_id = exchange_ids[0]
self.root = None self.root = None
self.canvas = None self.canvas = None
self.alert_queue = alert_queue
self.whale_markers = []
# Maintain a dict of whale alerts by exchange
self.whale_alerts = {exchange_id: [] for exchange_id in exchange_ids}
# Create exchanges dictionary to avoid reconnecting each time # Create exchanges dictionary to avoid reconnecting each time
self.exchanges = {} self.exchanges = {}
@ -69,7 +77,12 @@ class ChartView:
if df is not None: if df is not None:
self.ax1.clear() self.ax1.clear()
self.ax2.clear() self.ax2.clear()
self.ax_whales.clear()
# Process any pending whale alerts from the queue
self.process_whale_queue()
# Plot the chart
mpf.plot(df, type='candle', ax=self.ax1, volume=self.ax2, mpf.plot(df, type='candle', ax=self.ax1, volume=self.ax2,
style='yahoo', xrotation=0, ylabel='Price', style='yahoo', xrotation=0, ylabel='Price',
ylabel_lower='Volume', show_nontrading=False) ylabel_lower='Volume', show_nontrading=False)
@ -77,9 +90,149 @@ class ChartView:
latest_price = df['close'].iloc[-1] latest_price = df['close'].iloc[-1]
self.ax1.set_title(f'{self.symbol} on {self.current_exchange_id} - Last: ${latest_price:.2f}') self.ax1.set_title(f'{self.symbol} on {self.current_exchange_id} - Last: ${latest_price:.2f}')
# Plot whale activity on the dedicated subplot
self.plot_whale_activity(df)
# Refresh the canvas # Refresh the canvas
self.canvas.draw() self.canvas.draw()
def process_whale_queue(self):
"""Process any new whale alerts from the queue"""
if self.alert_queue is None:
return
# Process all available alerts
while not self.alert_queue.empty():
try:
alert = self.alert_queue.get_nowait()
exchange_id = alert['exchange']
# Add timestamp if not present
if 'timestamp' not in alert:
alert['timestamp'] = datetime.now().isoformat()
# Store the alert with the appropriate exchange
if exchange_id in self.whale_alerts:
self.whale_alerts[exchange_id].append(alert)
print(f"New whale alert for {exchange_id}: {alert['type']} ${alert['value_usd']:,.2f}")
self.alert_queue.task_done()
except queue.Empty:
break
except Exception as e:
print(f"Error processing whale alert from queue: {str(e)}")
if self.alert_queue is not None:
self.alert_queue.task_done()
def plot_whale_activity(self, df):
"""Plot whale activity on the dedicated whale subplot"""
if self.current_exchange_id not in self.whale_alerts:
self.ax_whales.set_ylabel('Whale Activity')
return
# Get the whale alerts for current exchange
exchange_alerts = self.whale_alerts[self.current_exchange_id]
if not exchange_alerts:
self.ax_whales.set_ylabel('Whale Activity')
return
try:
# Create a dataframe from the whale alerts
alerts_df = pd.DataFrame(exchange_alerts)
# Handle the timestamp conversion carefully
alerts_df['timestamp'] = pd.to_datetime(alerts_df['timestamp'])
# Check if the dataframe timestamps have timezone info
has_tz = False
if not alerts_df.empty:
has_tz = alerts_df['timestamp'].iloc[0].tzinfo is not None
# Get the start and end time of the current chart
start_time = df.index[0]
end_time = df.index[-1]
# Convert all timestamps to naive (remove timezone info) for comparison
# First create a copy of the timestamp column
alerts_df['plot_timestamp'] = alerts_df['timestamp']
# If timestamps have timezone, convert them to naive by replacing with their UTC equivalent
if has_tz:
alerts_df['plot_timestamp'] = alerts_df['timestamp'].dt.tz_localize(None)
else:
# If timestamps are naive, assume they're in local time and adjust by GMT+8 offset
alerts_df['plot_timestamp'] = alerts_df['timestamp'] - pd.Timedelta(hours=8)
# Filter to only include alerts in the visible time range
visible_alerts = alerts_df[
(alerts_df['plot_timestamp'] >= start_time) &
(alerts_df['plot_timestamp'] <= end_time)
]
if visible_alerts.empty:
self.ax_whales.set_ylabel('Whale Activity')
return
# Create two separate series for buy and sell orders
buy_orders = visible_alerts[visible_alerts['type'] == 'bid'].copy()
sell_orders = visible_alerts[visible_alerts['type'] == 'ask'].copy()
# Draw the buy orders as green bars going up
if not buy_orders.empty:
buy_values = buy_orders['value_usd'] / 1_000_000 # Convert to millions
self.ax_whales.bar(buy_orders['plot_timestamp'], buy_values,
color='green', alpha=0.6, width=pd.Timedelta(minutes=1))
# Draw the sell orders as red bars going down
if not sell_orders.empty:
sell_values = -1 * sell_orders['value_usd'] / 1_000_000 # Convert to millions and make negative
self.ax_whales.bar(sell_orders['plot_timestamp'], sell_values,
color='red', alpha=0.6, width=pd.Timedelta(minutes=1))
# Format the whale activity subplot
self.ax_whales.set_ylabel('Whale Activity ($M)')
# Add zero line
self.ax_whales.axhline(y=0, color='black', linestyle='-', alpha=0.3)
# Set y-axis limits with some padding
all_values = visible_alerts['value_usd'] / 1_000_000
if len(all_values) > 0:
max_val = all_values.max() * 1.1
self.ax_whales.set_ylim(-max_val, max_val)
# Align the x-axis with the price chart
self.ax_whales.sharex(self.ax1)
# Add text labels for significant whale activity
for idx, row in visible_alerts.iterrows():
value_millions = row['value_usd'] / 1_000_000
sign = 1 if row['type'] == 'bid' else -1
position = sign * value_millions
if abs(value_millions) > max(all_values) * 0.3: # Only label significant activity
self.ax_whales.text(
row['plot_timestamp'], position * 1.05,
f"${abs(value_millions):.1f}M",
ha='center', va='bottom' if sign > 0 else 'top',
fontsize=8, color='green' if sign > 0 else 'red'
)
except Exception as e:
print(f"Error plotting whale activity: {str(e)}")
import traceback
traceback.print_exc()
self.ax_whales.set_ylabel('Whale Activity - Error plotting data')
def parse_timeframe_to_minutes(self, timeframe):
"""Convert timeframe string to minutes"""
if timeframe.endswith('m'):
return int(timeframe[:-1])
elif timeframe.endswith('h'):
return int(timeframe[:-1]) * 60
elif timeframe.endswith('d'):
return int(timeframe[:-1]) * 1440
return 1 # default to 1 minute
def on_exchange_change(self, event=None): def on_exchange_change(self, event=None):
selected_exchange = self.exchange_var.get() selected_exchange = self.exchange_var.get()
if selected_exchange != self.current_exchange_id: if selected_exchange != self.current_exchange_id:
@ -102,10 +255,14 @@ class ChartView:
exchange_dropdown.pack(side=tk.LEFT, padx=(0, 10)) exchange_dropdown.pack(side=tk.LEFT, padx=(0, 10))
exchange_dropdown.bind("<<ComboboxSelected>>", self.on_exchange_change) exchange_dropdown.bind("<<ComboboxSelected>>", self.on_exchange_change)
# Create the figure # Create the figure with three subplots
self.fig = plt.Figure(figsize=(12, 8), dpi=100) self.fig = plt.Figure(figsize=(12, 8), dpi=100)
self.ax1 = self.fig.add_subplot(6, 1, (1, 4))
self.ax2 = self.fig.add_subplot(6, 1, (5, 6), sharex=self.ax1) # Adjust the subplot grid to add whale activity panel
# Price chart (60%), Volume (20%), Whale Activity (20%)
self.ax1 = self.fig.add_subplot(5, 1, (1, 3)) # Price chart - 3/5 of the space
self.ax2 = self.fig.add_subplot(5, 1, 4, sharex=self.ax1) # Volume - 1/5 of the space
self.ax_whales = self.fig.add_subplot(5, 1, 5, sharex=self.ax1) # Whale activity - 1/5 of the space
# Create the canvas to display the figure # Create the canvas to display the figure
self.canvas = FigureCanvasTkAgg(self.fig, master=self.root) self.canvas = FigureCanvasTkAgg(self.fig, master=self.root)

View File

@ -1,6 +1,6 @@
import pandas as pd import pandas as pd
import time import time
from datetime import datetime from datetime import datetime, timezone
import logging import logging
import ccxt import ccxt
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
@ -21,12 +21,12 @@ class Alert(Base):
timestamp = Column(DateTime) timestamp = Column(DateTime)
class WhalesWatcher: class WhalesWatcher:
def __init__(self): def __init__(self, alert_queue=None):
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='logs/whales_watcher.log', filename='logs/whales_watcher.log',
filemode='a' # Append mode filemode='a'
) )
console_handler = logging.StreamHandler() console_handler = logging.StreamHandler()
@ -38,10 +38,11 @@ class WhalesWatcher:
self.exchanges = {} self.exchanges = {}
self.symbol = 'BTC/USDT' self.symbol = 'BTC/USDT'
self.threshold = 1000000 self.threshold = 100000
self.check_interval = 5 self.check_interval = 5
self.large_orders = [] self.large_orders = []
self.is_running = False self.is_running = False
self.alert_queue = alert_queue
self.logger.info("WhalesWatcher initialized with symbol: %s and threshold: $%d", self.symbol, self.threshold) self.logger.info("WhalesWatcher initialized with symbol: %s and threshold: $%d", self.symbol, self.threshold)
@ -88,6 +89,9 @@ class WhalesWatcher:
whale_orders = [] whale_orders = []
# Get current time in UTC
current_time_utc = datetime.now().astimezone().astimezone(timezone.utc)
for bid in order_book['bids']: for bid in order_book['bids']:
if (len(bid) == 3): if (len(bid) == 3):
price, amount, timestamp = bid price, amount, timestamp = bid
@ -100,7 +104,7 @@ class WhalesWatcher:
'price': price, 'price': price,
'amount': amount, 'amount': amount,
'value_usd': value, 'value_usd': value,
'timestamp': datetime.now().isoformat() 'timestamp': current_time_utc.isoformat()
}) })
for ask in order_book['asks']: for ask in order_book['asks']:
@ -115,7 +119,7 @@ class WhalesWatcher:
'price': price, 'price': price,
'amount': amount, 'amount': amount,
'value_usd': value, 'value_usd': value,
'timestamp': datetime.now().isoformat() 'timestamp': current_time_utc.isoformat()
}) })
return whale_orders return whale_orders
@ -154,7 +158,7 @@ class WhalesWatcher:
self.logger.info(f"Saved whale activity data to {filename}") self.logger.info(f"Saved whale activity data to {filename}")
def log_alert(self, order): def log_alert(self, order):
"""Log the alert to the database.""" """Log the alert to the database and alert queue."""
session = self.Session() session = self.Session()
alert = Alert( alert = Alert(
exchange=order['exchange'], exchange=order['exchange'],
@ -167,6 +171,10 @@ class WhalesWatcher:
session.add(alert) session.add(alert)
session.commit() session.commit()
session.close() session.close()
# Add to queue if available
if self.alert_queue is not None:
self.alert_queue.put(order)
def run(self): def run(self):
"""Run the whale watcher continuously.""" """Run the whale watcher continuously."""

View File

@ -1,23 +1,27 @@
from WhalesWatcher import WhalesWatcher from WhalesWatcher import WhalesWatcher
from ChartView import ChartView from ChartView import ChartView
import threading import threading
import queue
def start_whales_watcher(): def start_whales_watcher(alert_queue):
whales_watcher = WhalesWatcher() whales_watcher = WhalesWatcher(alert_queue)
whales_watcher.run() whales_watcher.run()
if __name__ == "__main__": if __name__ == "__main__":
# Create a queue for passing whale alerts between threads
whale_alert_queue = queue.Queue()
# Start WhalesWatcher in a separate thread # Start WhalesWatcher in a separate thread
# watcher_thread = threading.Thread(target=start_whales_watcher) watcher_thread = threading.Thread(target=start_whales_watcher, args=(whale_alert_queue,))
# watcher_thread.daemon = True watcher_thread.daemon = True
# Start the watcher thread # Start the watcher thread
# watcher_thread.start() watcher_thread.start()
# Define exchanges for the chart # Define exchanges for the chart
exchange_ids = ['binance', 'kraken', 'coinbase'] # Add more exchanges as needed exchange_ids = ['binance', 'kraken', 'coinbase', 'bitfinex', 'huobi', 'kucoin', 'bybit', 'okx', 'gateio', 'mexc']
# Start the chart viewer # Start the chart viewer with alert queue
chart_view = ChartView(exchange_ids) chart_view = ChartView(exchange_ids, alert_queue=whale_alert_queue)
# Run the GUI (this will block until the window is closed) # Run the GUI (this will block until the window is closed)
chart_view.start_gui() chart_view.start_gui()