# orderflow_backtest/desktop_app.py
# (scraped page header: 518 lines, 19 KiB, Python; last change 2025-09-10 15:39:16 +08:00)
"""
Desktop visualization application using PySide6 and PyQtGraph.
- OHLC candles (time-bucketed, gap-filled, width from timestamp span)
- Volume bars (width from timestamp span)
- OBI candles (same visuals as OHLC but blue)
- CVD candles (drawn with the same blue candle item as OBI)
- Depth chart (cumulative bids/asks; currently not wired into the UI)
- Crosshairs + data inspection
"""
import sys
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional

import numpy as np
import pyqtgraph as pg
from pyqtgraph import QtCore, QtGui
from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem
from PySide6.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QHBoxLayout
from PySide6.QtCore import Qt

from ohlc_processor import OHLCProcessor
# ---------------------------
# Candlestick Graphics Items
# ---------------------------
class _BaseCandle(pg.GraphicsObject):
    """
    Base class for time-based candles:
    - x-axis is epoch seconds (float); entry timestamps are milliseconds
    - candle width derived from (timestamp_end - timestamp_start)

    All candles are rendered once into a QPicture when the item is built and
    that picture is replayed on every paint.

    Args:
        data: Candle dicts. Each needs "timestamp_start" (entries where it is
            missing/None are skipped), optionally "timestamp_end", and
            "open"/"high"/"low"/"close" values convertible to float.
        body_ratio: Fraction of the time span used for the body width;
            clamped to [0.0, 1.0].
        color_up: Pen/brush colour for candles with close >= open.
        color_down: Pen/brush colour for candles with close < open.
        pen_width: Width passed to pg.mkPen for both pens.
    """

    def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8,
                 color_up="#00ff00", color_down="#ff0000", pen_width: int = 1):
        super().__init__()
        self.data = data
        self.body_ratio = max(0.0, min(1.0, body_ratio))
        self.pen_up = pg.mkPen(color=color_up, width=pen_width)
        self.pen_down = pg.mkPen(color=color_down, width=pen_width)
        self.brush_up = pg.mkBrush(color=color_up)
        self.brush_down = pg.mkBrush(color=color_down)
        self._picture = None
        self.generatePicture()

    def generatePicture(self):
        """Render every candle in self.data into a fresh QPicture."""
        pic = QtGui.QPicture()
        p = QtGui.QPainter(pic)
        try:
            for entry in self.data or ():
                ts0_ms = entry.get("timestamp_start")
                if ts0_ms is None:
                    continue
                # A missing *or* None end timestamp collapses to the start.
                ts1_ms = entry.get("timestamp_end")
                if ts1_ms is None:
                    ts1_ms = ts0_ms

                x0 = ts0_ms / 1000.0
                x1 = ts1_ms / 1000.0
                x_center = 0.5 * (x0 + x1)
                full_w = max(1e-9, x1 - x0)  # seconds
                body_w = full_w * self.body_ratio

                o = float(entry["open"])
                h = float(entry["high"])
                l = float(entry["low"])
                c = float(entry["close"])

                is_up = c >= o
                p.setPen(self.pen_up if is_up else self.pen_down)
                p.setBrush(self.brush_up if is_up else self.brush_down)

                # wick
                p.drawLine(QtCore.QPointF(x_center, l), QtCore.QPointF(x_center, h))

                # body (ensure visible dojis)
                body_h = abs(c - o) or 1e-9
                p.drawRect(QtCore.QRectF(x_center - body_w / 2.0, min(o, c), body_w, body_h))
        finally:
            # Always close the painter, even if an entry was malformed, so the
            # QPicture is not left with an active painter attached.
            p.end()
        self._picture = pic

    def paint(self, painter, *args):
        """Replay the cached picture."""
        if self._picture is not None:
            painter.drawPicture(0, 0, self._picture)

    def boundingRect(self):
        """Return the cached picture's bounds, or an empty rect if none exists."""
        if self._picture is not None:
            return QtCore.QRectF(self._picture.boundingRect())
        return QtCore.QRectF()
class OHLCItem(_BaseCandle):
    """Price candlesticks: green when close >= open, red otherwise."""

    def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8):
        palette = {"color_up": "#00ff00", "color_down": "#ff0000"}
        super().__init__(data, body_ratio=body_ratio, **palette)
class OBIItem(_BaseCandle):
    """Candlesticks in two shades of blue, used for the OBI panel."""

    def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8):
        palette = {"color_up": "#4a9eff", "color_down": "#1f5f99"}
        super().__init__(data, body_ratio=body_ratio, **palette)
# ---------------------------
# Main Window
# ---------------------------
class MainWindow(QMainWindow):
    """Main application window for orderflow visualization.

    Shows four vertically stacked time panels (OHLC, volume, OBI, CVD) whose
    x-axes are linked to the OHLC panel, plus crosshairs and a data-inspection
    label. The depth panel is currently disabled: ``depth_plot`` stays ``None``
    and ``_update_depth_plot`` is a no-op until a plot widget is assigned.
    """

    # Fraction of a bar's time span used for candle bodies and volume bars.
    _BODY_RATIO = 0.8

    def __init__(self):
        super().__init__()
        self.ohlc_data: List[Dict[str, Any]] = []
        # Metric series keyed by name ("obi", "cvd"); the plotting code reads
        # each row as [ts_start_ms, ts_end_ms, open, high, low, close, ...].
        self.metrics_data: Dict[str, List[Any]] = {"obi": [], "cvd": []}
        self.depth_data: Dict[str, List[List[float]]] = {"bids": [], "asks": []}
        self.depth_plot: Optional[pg.PlotWidget] = None
        self._init_ui()

    # ----- UI setup -----
    @staticmethod
    def _make_time_plot(title: str, left_label: str, units: Optional[str] = None) -> pg.PlotWidget:
        """Create a gridded plot widget with a date axis at the bottom."""
        plot = pg.PlotWidget(title=title, axisItems={"bottom": DateAxisItem(orientation="bottom")})
        plot.setLabel("left", left_label, units=units)
        plot.showGrid(x=True, y=True, alpha=0.3)
        return plot

    def _time_plots(self) -> List[pg.PlotWidget]:
        """Return the four time-based panels in display order."""
        return [self.ohlc_plot, self.volume_plot, self.obi_plot, self.cvd_plot]

    def _init_ui(self):
        """Build the widgets, link the x-axes and install the interactions."""
        self.setWindowTitle("Orderflow Backtest Visualizer")
        self.resize(1400, 900)

        central = QWidget(self)
        self.setCentralWidget(central)
        main_layout = QHBoxLayout(central)

        charts_widget = QWidget()
        charts_layout = QVBoxLayout(charts_widget)

        # PG appearance (must be configured before the plots are created)
        pg.setConfigOptions(antialias=True, background="k", foreground="w")

        self.ohlc_plot = self._make_time_plot("OHLC", "Price", units="USD")
        self.volume_plot = self._make_time_plot("Volume", "Volume")
        self.obi_plot = self._make_time_plot("Order Book Imbalance (OBI)", "OBI")
        self.cvd_plot = self._make_time_plot("Cumulative Volume Delta (CVD)", "CVD")

        # Link x-axes
        self.volume_plot.setXLink(self.ohlc_plot)
        self.obi_plot.setXLink(self.ohlc_plot)
        self.cvd_plot.setXLink(self.ohlc_plot)

        # Crosshairs and interactions
        self._setup_crosshairs()
        self._setup_double_click_autorange()

        # Layout weights
        for plot in self._time_plots():
            charts_layout.addWidget(plot, 1)
        main_layout.addWidget(charts_widget, 3)

        logging.info("UI setup completed")

    def _setup_double_click_autorange(self):
        """Auto-range every panel when the OHLC view box is double-clicked."""
        def _auto_range(_event):
            try:
                for plot in self._time_plots():
                    plot.autoRange()
            except Exception as e:  # best effort inside a UI event handler
                logging.debug("Auto-range error: %s", e)

        # Attach to ViewBox mouse double-click
        self.ohlc_plot.plotItem.vb.mouseDoubleClickEvent = _auto_range

    def _setup_crosshairs(self):
        """Create crosshair lines for every panel and the inspection label."""
        # One line per plot (a single item cannot be in multiple ViewBoxes)
        pen = pg.mkPen(color="#888888", width=1, style=Qt.DashLine)

        self.vline_ohlc = pg.InfiniteLine(angle=90, movable=False, pen=pen)
        self.vline_volume = pg.InfiniteLine(angle=90, movable=False, pen=pen)
        self.vline_obi = pg.InfiniteLine(angle=90, movable=False, pen=pen)
        self.vline_cvd = pg.InfiniteLine(angle=90, movable=False, pen=pen)

        self.hline_ohlc = pg.InfiniteLine(angle=0, movable=False, pen=pen)
        self.hline_volume = pg.InfiniteLine(angle=0, movable=False, pen=pen)
        self.hline_obi = pg.InfiniteLine(angle=0, movable=False, pen=pen)
        self.hline_cvd = pg.InfiniteLine(angle=0, movable=False, pen=pen)

        # Attach to plots
        vlines = (self.vline_ohlc, self.vline_volume, self.vline_obi, self.vline_cvd)
        hlines = (self.hline_ohlc, self.hline_volume, self.hline_obi, self.hline_cvd)
        for plot, vline in zip(self._time_plots(), vlines):
            plot.addItem(vline, ignoreBounds=True)
        for plot, hline in zip(self._time_plots(), hlines):
            plot.addItem(hline, ignoreBounds=True)

        # Mouse move
        for plot in self._time_plots():
            plot.scene().sigMouseMoved.connect(self._on_mouse_moved)

        # Data label: parented to the view box and anchored in pixel
        # coordinates (top-left corner) instead of being added as a data item,
        # which would position it at data coordinates (0, 0).
        self.data_label = pg.LabelItem(justify="left")
        self.data_label.setParentItem(self.ohlc_plot.plotItem.vb)
        self.data_label.anchor(itemPos=(0, 0), parentPos=(0, 0), offset=(8, 8))

    # ----- Data ingestion from processor -----
    def update_data(self, data_processor: OHLCProcessor, nb_bars: int):
        """
        Pull latest bars/metrics from the processor and refresh plots.
        Call this whenever you've added trade/book data + processor.flush().

        Args:
            data_processor: Source of ``bars`` and ``get_metrics_series()``.
            nb_bars: Number of most recent bars / metric rows to keep.
        """
        self.ohlc_data = data_processor.bars[-nb_bars:] or []
        series = data_processor.get_metrics_series()
        self.metrics_data = {
            "cvd": series["cvd"][-nb_bars:],
            "obi": series["obi"][-nb_bars:],
        }
        self._update_all_plots()

    def _metric_rows(self, name: str) -> List[Any]:
        """Return the rows of one metric series, or [] when unavailable."""
        data = self.metrics_data
        if not isinstance(data, dict):
            return []
        rows = data.get(name)
        return rows if rows is not None else []

    # ----- Plot updates -----
    def _update_all_plots(self):
        """Redraw every time panel from the current data."""
        self._update_ohlc_plot()
        self._update_volume_plot()
        self._update_obi_plot()
        self._update_cvd_plot()
        self._update_depth_plot()  # no-op while the depth panel is disabled

    def _clear_plot_items_but_crosshair(self, plot: pg.PlotWidget):
        """Remove all plotted items except crosshair lines and labels."""
        protected = (pg.InfiniteLine, pg.LabelItem)
        # Iterate over a copy of the plot's own item list: removeItem mutates
        # it, and PlotWidget.items() would return every scene item instead.
        for item in list(plot.plotItem.items):
            if not isinstance(item, protected):
                plot.removeItem(item)

    def _update_ohlc_plot(self):
        """Redraw the price candles and fit both axes to the data."""
        if not self.ohlc_data:
            return
        self._clear_plot_items_but_crosshair(self.ohlc_plot)
        self.ohlc_plot.addItem(OHLCItem(self.ohlc_data, body_ratio=self._BODY_RATIO))

        first_bar, last_bar = self.ohlc_data[0], self.ohlc_data[-1]
        first_ts = first_bar["timestamp_start"] / 1000.0
        last_ts = last_bar.get("timestamp_end", last_bar["timestamp_start"]) / 1000.0
        self.ohlc_plot.setXRange(first_ts, last_ts)

        lows = [bar["low"] for bar in self.ohlc_data]
        highs = [bar["high"] for bar in self.ohlc_data]
        self.ohlc_plot.setYRange(min(lows), max(highs))

    def _update_volume_plot(self):
        """Redraw volume bars, centred on and sized to each time bucket."""
        if not self.ohlc_data:
            return
        self._clear_plot_items_but_crosshair(self.volume_plot)

        centers, widths, heights, brushes = [], [], [], []
        for i, bar in enumerate(self.ohlc_data):
            ts0 = bar["timestamp_start"] / 1000.0
            ts1 = bar.get("timestamp_end", bar["timestamp_start"]) / 1000.0
            centers.append(0.5 * (ts0 + ts1))
            widths.append(max(1e-9, ts1 - ts0) * self._BODY_RATIO)
            heights.append(float(bar["volume"]))
            if i == 0:
                # No previous close to compare against.
                brushes.append("#888888")
            else:
                prev_close = self.ohlc_data[i - 1]["close"]
                brushes.append("#00ff00" if bar["close"] >= prev_close else "#ff0000")

        # One batched item instead of one graphics item per bar.
        self.volume_plot.addItem(pg.BarGraphItem(
            x=np.asarray(centers, dtype=float),
            height=np.asarray(heights, dtype=float),
            width=np.asarray(widths, dtype=float),
            brushes=brushes,
        ))

    @staticmethod
    def _rows_to_candles(rows) -> List[Dict[str, Any]]:
        """Convert [ts_start_ms, ts_end_ms, o, h, l, c, ...] rows to candle dicts.

        Rows with fewer than six fields are skipped.
        """
        candles = []
        for row in rows:
            if len(row) < 6:
                continue
            ts0, ts1, o, h, l, c = row[:6]
            candles.append({
                "timestamp_start": int(ts0),
                "timestamp_end": int(ts1),
                "open": float(o),
                "high": float(h),
                "low": float(l),
                "close": float(c),
                "volume": 0.0,
            })
        return candles

    def _update_metric_candles(self, plot: pg.PlotWidget, series_name: str):
        """Redraw one metric panel as blue candles and fit its y-range."""
        if not self.metrics_data:
            return
        self._clear_plot_items_but_crosshair(plot)
        candles = self._rows_to_candles(self._metric_rows(series_name))
        if not candles:
            return
        plot.addItem(OBIItem(candles, body_ratio=self._BODY_RATIO))
        # Also set Y range explicitly
        plot.setYRange(min(c["low"] for c in candles), max(c["high"] for c in candles))

    def _update_obi_plot(self):
        """
        Update OBI panel with candlesticks from metrics_data["obi"].
        Each row expected: [ts_start_ms, ts_end_ms, obi_o, obi_h, obi_l, obi_c]
        """
        self._update_metric_candles(self.obi_plot, "obi")

    def _update_cvd_plot(self):
        """
        Update CVD panel with candlesticks from metrics_data["cvd"].
        Rows are read with the same six-field layout as the OBI rows.
        """
        self._update_metric_candles(self.cvd_plot, "cvd")

    # ----- Depth chart -----
    @staticmethod
    def _cumulate_levels(levels, reverse=False, limit=50):
        """
        levels: list of [price, size]
        returns list of (price, cum_volume)

        Levels are sorted by price (descending when ``reverse``) and only the
        first ``limit`` levels of that ordering are accumulated, so the result
        does not depend on the input order. Malformed input yields [].
        """
        if not levels:
            return []
        try:
            ordered = sorted(levels, key=lambda lvl: lvl[0], reverse=reverse)[:limit]
            cum = []
            total = 0.0
            for price, size in ordered:
                total += float(size)
                cum.append((float(price), total))
            return cum
        except (TypeError, ValueError, IndexError, KeyError) as e:
            logging.warning("cumulate_levels error: %s", e)
            return []

    def _plot_depth_side(self, cum_levels, line_color: str, fill_rgba, name: str):
        """Draw one side of the book as a stepped curve plus translucent fill."""
        prices = [p for p, _ in cum_levels]
        vols = [v for _, v in cum_levels]
        self.depth_plot.plot(vols, prices, pen=pg.mkPen(color=line_color, width=2),
                             stepMode="left", name=name)
        fill = pg.PlotCurveItem(vols + [0], prices + [prices[-1]], fillLevel=0,
                                fillBrush=pg.mkBrush(color=fill_rgba))
        self.depth_plot.addItem(fill)

    def _update_depth_plot(self):
        """Redraw cumulative bid/ask depth; does nothing if the panel is absent."""
        depth_plot = getattr(self, "depth_plot", None)
        depth_data = getattr(self, "depth_data", None)
        if depth_plot is None or not depth_data or not isinstance(depth_data, dict):
            return
        depth_plot.clear()

        cum_bids = self._cumulate_levels(depth_data.get("bids", []), reverse=True, limit=50)
        cum_asks = self._cumulate_levels(depth_data.get("asks", []), reverse=False, limit=50)

        if cum_bids:
            self._plot_depth_side(cum_bids, "#00c800", (0, 200, 0, 50), "Bids")
        if cum_asks:
            self._plot_depth_side(cum_asks, "#c80000", (200, 0, 0, 50), "Asks")

    # ----- Crosshair + inspection -----
    def _on_mouse_moved(self, pos):
        """Move the crosshairs to the cursor and refresh the data label."""
        try:
            sender = self.sender()
            if not sender:
                return

            panels = (
                (self.ohlc_plot, self.hline_ohlc),
                (self.volume_plot, self.hline_volume),
                (self.obi_plot, self.hline_obi),
                (self.cvd_plot, self.hline_cvd),
            )
            # Each panel has its own scene; find the one that emitted the signal.
            match = next((p for p in panels if p[0].scene() == sender), None)
            if match is None:
                return
            target, hline = match
            if not target.sceneBoundingRect().contains(pos):
                return

            mp = target.plotItem.vb.mapSceneToView(pos)
            x = mp.x()
            y = mp.y()

            # Move all vertical lines to the same x (keep panels aligned)
            for vline in (self.vline_ohlc, self.vline_volume, self.vline_obi, self.vline_cvd):
                vline.setPos(x)
            # Move only the active panel's horizontal line
            hline.setPos(y)

            self._update_data_label(x)
        except Exception as e:  # best effort inside a UI event handler
            logging.debug("mouse move err: %s", e)

    def _update_data_label(self, x_seconds: float):
        """Show the bar and metric values closest to ``x_seconds``."""
        try:
            parts = []

            closest = self._closest_bar(self.ohlc_data, x_seconds)
            if closest:
                parts.append(f"Time: {self._fmt_ts(closest['timestamp_start'])}")
                parts.append(f"OHLC O:{closest['open']:.2f} H:{closest['high']:.2f} "
                             f"L:{closest['low']:.2f} C:{closest['close']:.2f}")
                parts.append(f"Vol: {float(closest['volume']):.6f}")

            obi_row = self._closest_metrics_row(self._metric_rows("obi"), x_seconds)
            if obi_row is not None and len(obi_row) >= 6:
                _, _, o, h, l, c = obi_row[:6]
                parts.append(f"OBI O:{float(o):.2f} H:{float(h):.2f} "
                             f"L:{float(l):.2f} C:{float(c):.2f}")

            # CVD rows are plotted as candles; show the close (index 5).
            cvd_row = self._closest_metrics_row(self._metric_rows("cvd"), x_seconds)
            if cvd_row is not None and len(cvd_row) >= 6:
                parts.append(f"CVD: {float(cvd_row[5]):.2f}")

            self.data_label.setText("<br>".join(parts) if parts else "No data")
        except Exception as e:  # best effort inside a UI event handler
            logging.debug("update label err: %s", e)

    @staticmethod
    def _closest_bar(bars: List[Dict[str, Any]], x_seconds: float) -> Optional[Dict[str, Any]]:
        """Return the bar whose start timestamp is nearest to ``x_seconds``."""
        if not bars:
            return None
        x_ms = x_seconds * 1000.0
        return min(bars, key=lambda b: abs(b["timestamp_start"] - x_ms))

    @staticmethod
    def _closest_metrics_row(rows: List[Any], x_seconds: float) -> Optional[Any]:
        """Return the row whose first field (ms) is nearest to ``x_seconds``.

        Returns None for empty or malformed input.
        """
        if rows is None or len(rows) == 0:
            return None
        x_ms = x_seconds * 1000.0
        try:
            return min(rows, key=lambda r: abs(r[0] - x_ms))
        except (TypeError, IndexError, KeyError):
            return None

    @staticmethod
    def _fmt_ts(ts_ms: int) -> str:
        """Format epoch milliseconds as local 'YYYY-MM-DD HH:MM:SS'.

        Falls back to the plain integer (or, failing that, ``str(ts_ms)``)
        when the value cannot be converted to a datetime; never raises.
        """
        try:
            return datetime.fromtimestamp(ts_ms / 1000.0).strftime("%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError, OverflowError, OSError):
            pass
        try:
            return str(int(ts_ms))
        except (TypeError, ValueError, OverflowError):
            return str(ts_ms)