518 lines
19 KiB
Python
518 lines
19 KiB
Python
"""
|
|
Desktop visualization application using PySide6 and PyQtGraph.
|
|
|
|
- OHLC candles (time-bucketed, gap-filled, width from timestamp span)
|
|
- Volume bars (width from timestamp span)
|
|
- OBI candles (same visuals as OHLC but blue)
|
|
- CVD line
|
|
- Depth chart (cumulative bids/asks)
|
|
- Crosshairs + data inspection
|
|
"""
|
|
import sys
|
|
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
import numpy as np
|
|
import pyqtgraph as pg
|
|
from pyqtgraph import QtCore, QtGui
|
|
from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem
|
|
|
|
from PySide6.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QHBoxLayout
|
|
from PySide6.QtCore import Qt
|
|
|
|
from ohlc_processor import OHLCProcessor
|
|
|
|
|
|
# ---------------------------
|
|
# Candlestick Graphics Items
|
|
# ---------------------------
|
|
class _BaseCandle(pg.GraphicsObject):
    """
    Base class for time-based candles, pre-rendered into a cached QPicture.

    - x-axis is epoch seconds (float); entry timestamps are divided by 1000
    - candle width derived from (timestamp_end - timestamp_start)
    - the body uses ``body_ratio`` of that width, centred on the bucket

    Each entry of ``data`` is a dict read with the keys ``timestamp_start``,
    ``timestamp_end`` (optional), ``open``, ``high``, ``low`` and ``close``.
    """

    def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8,
                 color_up="#00ff00", color_down="#ff0000", pen_width: int = 1):
        """
        Store the candle data and styling, then render the picture once.

        Args:
            data: candle dicts (see the class docstring for the keys read).
            body_ratio: fraction of the bucket width used for the body;
                clamped to the range [0, 1].
            color_up: colour used when close >= open.
            color_down: colour used when close < open.
            pen_width: width passed to ``pg.mkPen`` for wicks and outlines.
        """
        super().__init__()
        self.data = data
        self.body_ratio = max(0.0, min(1.0, body_ratio))
        self.pen_up = pg.mkPen(color=color_up, width=pen_width)
        self.pen_down = pg.mkPen(color=color_down, width=pen_width)
        self.brush_up = pg.mkBrush(color=color_up)
        self.brush_down = pg.mkBrush(color=color_down)
        self._picture = None
        self.generatePicture()

    def generatePicture(self):
        """
        Draw every candle of ``self.data`` into a new QPicture.

        Entries without ``timestamp_start`` are skipped. The picture is only
        stored on ``self._picture`` when all entries were drawn; if an entry
        raises (e.g. a missing price key) the exception propagates, but the
        painter is still closed first.
        """
        pic = QtGui.QPicture()
        p = QtGui.QPainter(pic)
        try:
            for entry in self.data:
                ts0_ms = entry.get("timestamp_start")
                if ts0_ms is None:
                    continue
                # A missing *or* None end timestamp falls back to the start,
                # which yields the minimum-width candle below.
                ts1_ms = entry.get("timestamp_end")
                if ts1_ms is None:
                    ts1_ms = ts0_ms

                x0 = ts0_ms / 1000.0
                x1 = ts1_ms / 1000.0
                x_center = 0.5 * (x0 + x1)
                full_w = max(1e-9, x1 - x0)  # seconds; never zero or negative
                body_w = full_w * self.body_ratio

                o = float(entry["open"])
                h = float(entry["high"])
                l = float(entry["low"])
                c = float(entry["close"])

                is_up = c >= o
                pen = self.pen_up if is_up else self.pen_down
                brush = self.brush_up if is_up else self.brush_down

                # wick
                p.setPen(pen)
                p.drawLine(QtCore.QPointF(x_center, l), QtCore.QPointF(x_center, h))

                # body (ensure visible dojis: a zero-height rect gets a tiny height)
                body_h = abs(c - o)
                if body_h == 0:
                    body_h = 1e-9
                body_bottom = min(o, c)
                p.setBrush(brush)
                p.drawRect(QtCore.QRectF(x_center - body_w / 2.0, body_bottom, body_w, body_h))
        finally:
            # Always close the painter, even when an entry is malformed, so
            # the QPicture is never left with an active painter attached.
            p.end()
        self._picture = pic

    def paint(self, painter, *args):
        """Replay the cached picture, if one has been generated."""
        if self._picture is not None:
            painter.drawPicture(0, 0, self._picture)

    def boundingRect(self):
        """Return the cached picture's bounding rect, or an empty rect if there is none."""
        if self._picture is not None:
            return QtCore.QRectF(self._picture.boundingRect())
        return QtCore.QRectF()
|
|
|
|
|
|
class OHLCItem(_BaseCandle):
    """Price candlesticks using the green (up) / red (down) palette."""

    def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8):
        """Forward ``data`` and ``body_ratio`` to the base candle with fixed colours."""
        palette = {"color_up": "#00ff00", "color_down": "#ff0000"}
        super().__init__(data, body_ratio=body_ratio, **palette)
|
|
|
|
|
|
class OBIItem(_BaseCandle):
    """Candlesticks for OBI, drawn with a two-tone blue palette."""

    def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8):
        """Forward ``data`` and ``body_ratio`` to the base candle with fixed colours."""
        palette = {"color_up": "#4a9eff", "color_down": "#1f5f99"}
        super().__init__(data, body_ratio=body_ratio, **palette)
|
|
|
|
|
|
# ---------------------------
|
|
# Main Window
|
|
# ---------------------------
|
|
class MainWindow(QMainWindow):
    """
    Main application window for orderflow visualization.

    Hosts four stacked, x-linked time panels (OHLC, volume, OBI, CVD) with
    crosshairs and a data-inspection label. The order book depth panel is
    currently disabled: its widgets are not created in ``_init_ui``.
    """

    def __init__(self):
        """Create empty data holders and build the UI."""
        super().__init__()
        self.ohlc_data: List[Dict[str, Any]] = []
        # Filled by update_data() as {"cvd": [...], "obi": [...]}; each row is
        # read as [ts_start_ms, ts_end_ms, open, high, low, close, ...].
        self.metrics_data: Dict[str, List[Any]] = {}
        # self.depth_data: Dict[str, List[List[float]]] = {"bids": [], "asks": []}

        self._init_ui()

    # ----- UI setup -----
    def _init_ui(self):
        """Build the layout, the four time plots, their x-links and the crosshairs."""
        self.setWindowTitle("Orderflow Backtest Visualizer")
        self.resize(1400, 900)

        central = QWidget(self)
        self.setCentralWidget(central)
        main_layout = QHBoxLayout(central)

        charts_widget = QWidget()
        charts_layout = QVBoxLayout(charts_widget)

        # depth_widget = QWidget()
        # depth_layout = QVBoxLayout(depth_widget)

        # PG appearance
        pg.setConfigOptions(antialias=True, background="k", foreground="w")

        # Every time plot gets its own DateAxisItem on the bottom axis.
        self.ohlc_plot = pg.PlotWidget(
            title="OHLC", axisItems={"bottom": DateAxisItem(orientation="bottom")})
        self.ohlc_plot.setLabel("left", "Price", units="USD")
        self.ohlc_plot.showGrid(x=True, y=True, alpha=0.3)

        self.volume_plot = pg.PlotWidget(
            title="Volume", axisItems={"bottom": DateAxisItem(orientation="bottom")})
        self.volume_plot.setLabel("left", "Volume")
        self.volume_plot.showGrid(x=True, y=True, alpha=0.3)

        self.obi_plot = pg.PlotWidget(
            title="Order Book Imbalance (OBI)",
            axisItems={"bottom": DateAxisItem(orientation="bottom")})
        self.obi_plot.setLabel("left", "OBI")
        self.obi_plot.showGrid(x=True, y=True, alpha=0.3)

        self.cvd_plot = pg.PlotWidget(
            title="Cumulative Volume Delta (CVD)",
            axisItems={"bottom": DateAxisItem(orientation="bottom")})
        self.cvd_plot.setLabel("left", "CVD")
        self.cvd_plot.showGrid(x=True, y=True, alpha=0.3)

        # Depth (not time x-axis)
        # self.depth_plot = pg.PlotWidget(title="Order Book Depth")
        # self.depth_plot.setLabel("left", "Price", units="USD")
        # self.depth_plot.setLabel("bottom", "Cumulative Volume")
        # self.depth_plot.showGrid(x=True, y=True, alpha=0.3)

        # Link x-axes: the OHLC plot drives the other three panels.
        for follower in (self.volume_plot, self.obi_plot, self.cvd_plot):
            follower.setXLink(self.ohlc_plot)

        # Crosshairs and interactions
        self._setup_crosshairs()
        self._setup_double_click_autorange()

        # Layout weights (all four panels share the height equally)
        for plot in self._time_plots():
            charts_layout.addWidget(plot, 1)

        # depth_layout.addWidget(self.depth_plot)

        main_layout.addWidget(charts_widget, 3)
        # main_layout.addWidget(depth_widget, 1)

        logging.info("UI setup completed")

    def _time_plots(self):
        """Return the four time-axis plot widgets in the order they are stacked."""
        return [self.ohlc_plot, self.volume_plot, self.obi_plot, self.cvd_plot]

    def _setup_double_click_autorange(self):
        """Make a double-click on the OHLC view box auto-range all four plots."""
        def _auto_range(_event):
            try:
                for plot in self._time_plots():
                    plot.autoRange()
            except Exception as e:
                # Best effort: a failed auto-range must not break mouse handling.
                logging.debug(f"Auto-range error: {e}")

        # Attach to ViewBox mouse double-click
        self.ohlc_plot.plotItem.vb.mouseDoubleClickEvent = _auto_range

    def _setup_crosshairs(self):
        """Create one vertical and one horizontal dashed line per plot, plus the data label."""
        # One line object per plot (a single item cannot be in multiple ViewBoxes)
        pen = pg.mkPen(color="#888888", width=1, style=Qt.DashLine)

        def _line(angle):
            return pg.InfiniteLine(angle=angle, movable=False, pen=pen)

        self.vline_ohlc = _line(90)
        self.vline_volume = _line(90)
        self.vline_obi = _line(90)
        self.vline_cvd = _line(90)

        self.hline_ohlc = _line(0)
        self.hline_volume = _line(0)
        self.hline_obi = _line(0)
        self.hline_cvd = _line(0)

        # Attach to plots: vertical lines first, then horizontal ones.
        plots = self._time_plots()
        vlines = [self.vline_ohlc, self.vline_volume, self.vline_obi, self.vline_cvd]
        hlines = [self.hline_ohlc, self.hline_volume, self.hline_obi, self.hline_cvd]
        for plot, line in zip(plots, vlines):
            plot.addItem(line, ignoreBounds=True)
        for plot, line in zip(plots, hlines):
            plot.addItem(line, ignoreBounds=True)

        # Mouse move: every plot scene reports to the same slot.
        for plot in plots:
            plot.scene().sigMouseMoved.connect(self._on_mouse_moved)

        # Data label
        self.data_label = pg.LabelItem(justify="left")
        self.ohlc_plot.addItem(self.data_label)

    # ----- Data ingestion from processor -----
    def update_data(self, data_processor: OHLCProcessor, nb_bars: int):
        """
        Pull latest bars/metrics from the processor and refresh plots.

        Call this whenever you've added trade/book data + processor.flush().

        Args:
            data_processor: source of ``bars`` and ``get_metrics_series()``;
                the latter is indexed with the keys ``"cvd"`` and ``"obi"``.
            nb_bars: number of most recent bars/rows to keep. Zero or a
                negative value keeps nothing.
        """
        # Guard the slice: seq[-0:] is the *whole* sequence, not an empty one.
        keep = nb_bars > 0

        self.ohlc_data = (data_processor.bars[-nb_bars:] or []) if keep else []

        series = data_processor.get_metrics_series()
        self.metrics_data = {
            "cvd": series["cvd"][-nb_bars:] if keep else [],
            "obi": series["obi"][-nb_bars:] if keep else [],
        }

        self._update_all_plots()

    # ----- Plot updates -----
    def _update_all_plots(self):
        """Redraw the OHLC, volume, OBI and CVD panels from the stored data."""
        self._update_ohlc_plot()
        self._update_volume_plot()
        self._update_obi_plot()
        self._update_cvd_plot()
        # self._update_depth_plot()

    def _clear_plot_items_but_crosshair(self, plot: pg.PlotWidget):
        """Remove every item from ``plot`` except InfiniteLine and LabelItem instances."""
        protected = (pg.InfiniteLine, pg.LabelItem)
        # Snapshot the list first so removal cannot disturb the iteration.
        removable = [it for it in plot.items() if not isinstance(it, protected)]
        for it in removable:
            plot.removeItem(it)

    def _update_ohlc_plot(self):
        """Redraw the price candles and fit both axes to the stored bars."""
        if not self.ohlc_data:
            return
        self._clear_plot_items_but_crosshair(self.ohlc_plot)
        self.ohlc_plot.addItem(OHLCItem(self.ohlc_data, body_ratio=0.8))

        first_bar, last_bar = self.ohlc_data[0], self.ohlc_data[-1]
        first_ts = first_bar["timestamp_start"] / 1000.0
        last_ts = last_bar.get("timestamp_end", last_bar["timestamp_start"]) / 1000.0
        self.ohlc_plot.setXRange(first_ts, last_ts)

        lows = [bar["low"] for bar in self.ohlc_data]
        highs = [bar["high"] for bar in self.ohlc_data]
        self.ohlc_plot.setYRange(min(lows), max(highs))

    def _update_volume_plot(self):
        """
        Redraw the volume bars, centred on each bucket and sized to 80% of its width.

        A bar is green when its close is >= the previous bar's close and red
        otherwise; the first bar, which has no predecessor, is grey.
        """
        if not self.ohlc_data:
            return
        self._clear_plot_items_but_crosshair(self.volume_plot)

        centers, widths, heights, brushes = [], [], [], []
        for i, bar in enumerate(self.ohlc_data):
            ts0 = bar["timestamp_start"] / 1000.0
            ts1 = bar.get("timestamp_end", bar["timestamp_start"]) / 1000.0
            centers.append(0.5 * (ts0 + ts1))
            widths.append(max(1e-9, ts1 - ts0) * 0.8)
            heights.append(float(bar["volume"]))

            if i > 0:
                prev_close = self.ohlc_data[i - 1]["close"]
                color = "#00ff00" if bar["close"] >= prev_close else "#ff0000"
            else:
                color = "#888888"
            brushes.append(pg.mkBrush(color))

        # One graphics item for all bars instead of one item per bar.
        self.volume_plot.addItem(pg.BarGraphItem(
            x=np.asarray(centers, dtype=float),
            height=np.asarray(heights, dtype=float),
            width=np.asarray(widths, dtype=float),
            brushes=brushes,
        ))

    def _metric_rows(self, key: str) -> List[Any]:
        """Return the stored metric rows for ``key``, or ``[]`` when there are none."""
        if not isinstance(self.metrics_data, dict):
            return []
        rows = self.metrics_data.get(key)
        return [] if rows is None else rows

    @staticmethod
    def _rows_to_candles(rows) -> List[Dict[str, Any]]:
        """Convert rows of at least six values into candle dicts; shorter rows are skipped."""
        candles = []
        for row in rows:
            if len(row) < 6:
                continue
            ts0, ts1, o, h, l, c = row[:6]
            candles.append({
                "timestamp_start": int(ts0),
                "timestamp_end": int(ts1),
                "open": float(o),
                "high": float(h),
                "low": float(l),
                "close": float(c),
                "volume": 0.0,
            })
        return candles

    def _draw_metric_candles(self, plot: pg.PlotWidget, key: str):
        """Clear ``plot`` and draw the ``key`` metric series on it as blue candles."""
        if not self.metrics_data:
            return
        self._clear_plot_items_but_crosshair(plot)

        candlesticks = self._rows_to_candles(self._metric_rows(key))
        if not candlesticks:
            return
        plot.addItem(OBIItem(candlesticks, body_ratio=0.8))

        # Also set Y range explicitly
        lows = [candle["low"] for candle in candlesticks]
        highs = [candle["high"] for candle in candlesticks]
        plot.setYRange(min(lows), max(highs))

    def _update_obi_plot(self):
        """
        Update OBI panel with candlesticks from metrics_data["obi"].

        Each row expected: [ts_start_ms, ts_end_ms, obi_o, obi_h, obi_l, obi_c, ...]
        """
        self._draw_metric_candles(self.obi_plot, "obi")

    def _update_cvd_plot(self):
        """
        Update CVD panel with candlesticks from metrics_data["cvd"].

        Rows are read exactly like the OBI rows (first six values: start,
        end, open, high, low, close) and drawn with the same blue candle item.
        """
        self._draw_metric_candles(self.cvd_plot, "cvd")

    # ----- Depth chart -----
    @staticmethod
    def _cumulate_levels(levels, reverse=False, limit=50):
        """
        Accumulate order book sizes over the first ``limit`` sorted levels.

        levels: list of [price, size]
        reverse: sort prices descending when True, ascending otherwise
        limit: maximum number of levels kept *after* sorting
        returns list of (price, cum_volume); [] for empty or malformed input
        """
        if not levels:
            return []
        try:
            # Sort first, then truncate, so the kept levels do not depend on
            # the order in which the caller supplied them.
            sorted_lvls = sorted(levels, key=lambda lvl: lvl[0], reverse=reverse)[:limit]
            cum = []
            total = 0.0
            for price, size in sorted_lvls:
                total += float(size)
                cum.append((float(price), total))
            return cum
        except Exception as e:
            # Best effort: malformed levels produce an empty curve, not a crash.
            logging.warning(f"cumulate_levels error: {e}")
            return []

    def _update_depth_plot(self):
        """
        Redraw the cumulative bid/ask depth curves.

        Does nothing while the depth panel is disabled, i.e. while
        ``self.depth_plot`` / ``self.depth_data`` have not been created.
        """
        depth_plot = getattr(self, "depth_plot", None)
        depth_data = getattr(self, "depth_data", None)
        if depth_plot is None or not depth_data or not isinstance(depth_data, dict):
            return
        depth_plot.clear()

        sides = (
            # (levels, sort descending?, line colour, fill colour, legend name)
            (depth_data.get("bids", []), True, "#00c800", (0, 200, 0, 50), "Bids"),
            (depth_data.get("asks", []), False, "#c80000", (200, 0, 0, 50), "Asks"),
        )
        for levels, descending, line_color, fill_color, name in sides:
            cumulative = self._cumulate_levels(levels, reverse=descending, limit=50)
            if not cumulative:
                continue
            prices = [price for price, _ in cumulative]
            vols = [vol for _, vol in cumulative]
            depth_plot.plot(vols, prices, pen=pg.mkPen(color=line_color, width=2),
                            stepMode="left", name=name)
            fill = pg.PlotCurveItem(vols + [0], prices + [prices[-1]], fillLevel=0,
                                    fillBrush=pg.mkBrush(color=fill_color))
            depth_plot.addItem(fill)

    # ----- Crosshair + inspection -----
    def _on_mouse_moved(self, pos):
        """
        Move the crosshairs to the mouse position and refresh the data label.

        The plot whose scene emitted the signal is the active panel: all
        vertical lines follow x, only the active panel's horizontal line
        follows y. Errors are logged at debug level and otherwise ignored.
        """
        try:
            sender = self.sender()
            if not sender:
                return

            panels = [
                (self.ohlc_plot, self.hline_ohlc),
                (self.volume_plot, self.hline_volume),
                (self.obi_plot, self.hline_obi),
                (self.cvd_plot, self.hline_cvd),
            ]
            target, hline = next(
                ((plot, line) for plot, line in panels if plot.scene() == sender),
                (None, None),
            )
            if target is None:
                return
            if not target.sceneBoundingRect().contains(pos):
                return

            mp = target.plotItem.vb.mapSceneToView(pos)
            x = mp.x()
            y = mp.y()

            # Move all vertical lines to the same x (keep panels aligned)
            for vline in (self.vline_ohlc, self.vline_volume, self.vline_obi, self.vline_cvd):
                vline.setPos(x)

            # Move only the active panel's horizontal line
            hline.setPos(y)

            self._update_data_label(x)
        except Exception as e:
            logging.debug(f"mouse move err: {e}")

    def _update_data_label(self, x_seconds: float):
        """
        Show the bar and metric values closest to ``x_seconds`` in the data label.

        The label lists time/OHLC/volume of the nearest bar, the nearest OBI
        candle, and the close of the nearest CVD candle; "No data" is shown
        when nothing is available. Errors are logged at debug level.
        """
        try:
            parts = []

            if self.ohlc_data:
                closest = self._closest_bar(self.ohlc_data, x_seconds)
                if closest:
                    parts.append(f"Time: {self._fmt_ts(closest['timestamp_start'])}")
                    parts.append(f"OHLC O:{closest['open']:.2f} H:{closest['high']:.2f} "
                                 f"L:{closest['low']:.2f} C:{closest['close']:.2f}")
                    parts.append(f"Vol: {float(closest['volume']):.6f}")

            # metrics_data is a dict of series, so each series is searched on its own.
            obi_row = self._closest_metrics_row(self._metric_rows("obi"), x_seconds)
            if obi_row is not None and len(obi_row) >= 6:
                o, h, l, c = (float(v) for v in obi_row[2:6])
                parts.append(f"OBI O:{o:.2f} H:{h:.2f} L:{l:.2f} C:{c:.2f}")

            cvd_row = self._closest_metrics_row(self._metric_rows("cvd"), x_seconds)
            if cvd_row is not None and len(cvd_row) >= 6:
                # Index 5 is the value the CVD panel draws as the candle close.
                parts.append(f"CVD: {float(cvd_row[5]):.2f}")

            self.data_label.setText("<br>".join(parts) if parts else "No data")
        except Exception as e:
            logging.debug(f"update label err: {e}")

    @staticmethod
    def _closest_bar(bars: List[Dict[str, Any]], x_seconds: float) -> Optional[Dict[str, Any]]:
        """Return the bar whose ``timestamp_start`` (ms) is nearest to ``x_seconds``, or None."""
        if not bars:
            return None
        x_ms = x_seconds * 1000.0
        return min(bars, key=lambda b: abs(b["timestamp_start"] - x_ms))

    @staticmethod
    def _closest_metrics_row(rows: List[Any], x_seconds: float) -> Optional[Any]:
        """Return the row whose first value (ms) is nearest to ``x_seconds``; None if empty or malformed."""
        if rows is None or len(rows) == 0:
            return None
        x_ms = x_seconds * 1000.0
        try:
            return min(rows, key=lambda r: abs(r[0] - x_ms))
        except Exception:
            return None

    @staticmethod
    def _fmt_ts(ts_ms: int) -> str:
        """Format a millisecond timestamp as local "YYYY-MM-DD HH:MM:SS"; fall back to the integer as text."""
        from datetime import datetime
        try:
            return datetime.fromtimestamp(ts_ms / 1000.0).strftime("%Y-%m-%d %H:%M:%S")
        except Exception:
            return str(int(ts_ms))