# orderflow_backtest/desktop_app.py
#
# 518 lines
# 19 KiB
# Python

"""
Desktop visualization application using PySide6 and PyQtGraph.
- OHLC candles (time-bucketed, gap-filled, width from timestamp span)
- Volume bars (width from timestamp span)
- OBI candles (same visuals as OHLC but blue)
- CVD candles (drawn with the same blue candle item as OBI)
- Depth chart (cumulative bids/asks; currently disabled in the UI)
- Crosshairs + data inspection
"""
import sys
import logging
from typing import List, Dict, Any, Optional
import numpy as np
import pyqtgraph as pg
from pyqtgraph import QtCore, QtGui
from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem
from PySide6.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QHBoxLayout
from PySide6.QtCore import Qt
from ohlc_processor import OHLCProcessor
# ---------------------------
# Candlestick Graphics Items
# ---------------------------
class _BaseCandle(pg.GraphicsObject):
    """
    Base class for time-based candles.

    - x-axis is epoch seconds (float); entry timestamps are divided by 1000
      before plotting.
    - candle width derived from (timestamp_end - timestamp_start)

    Each entry of ``data`` is a dict with ``timestamp_start``, an optional
    ``timestamp_end`` and ``open`` / ``high`` / ``low`` / ``close`` values.
    Entries without ``timestamp_start`` are skipped; a missing OHLC key
    raises ``KeyError``.
    """

    def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8,
                 color_up="#00ff00", color_down="#ff0000", pen_width: int = 1):
        """
        Args:
            data: Candle dicts to draw (see class docstring).
            body_ratio: Fraction of the time span used for the body width,
                clamped to [0, 1].
            color_up: Colour for candles whose close >= open.
            color_down: Colour for candles whose close < open.
            pen_width: Outline/wick pen width.
        """
        super().__init__()
        self.data = data
        self.body_ratio = max(0.0, min(1.0, body_ratio))
        self.pen_up = pg.mkPen(color=color_up, width=pen_width)
        self.pen_down = pg.mkPen(color=color_down, width=pen_width)
        self.brush_up = pg.mkBrush(color=color_up)
        self.brush_down = pg.mkBrush(color=color_down)
        self._picture = None
        # Float-precision extent of everything drawn, in data coordinates.
        self._bounds = QtCore.QRectF()
        self.generatePicture()

    def generatePicture(self):
        """Pre-render all candles into a QPicture and record their extent."""
        pic = QtGui.QPicture()
        x_min = y_min = float("inf")
        x_max = y_max = float("-inf")
        painter = QtGui.QPainter(pic)
        try:
            for entry in self.data:
                ts0_ms = entry.get("timestamp_start")
                if ts0_ms is None:
                    continue
                # Treat a missing *or* explicit-None end as a zero-length span.
                ts1_ms = entry.get("timestamp_end")
                if ts1_ms is None:
                    ts1_ms = ts0_ms
                x0 = ts0_ms / 1000.0
                x1 = ts1_ms / 1000.0
                x_center = 0.5 * (x0 + x1)
                full_w = max(1e-9, x1 - x0)  # seconds
                body_w = full_w * self.body_ratio

                open_ = float(entry["open"])
                high = float(entry["high"])
                low = float(entry["low"])
                close = float(entry["close"])

                is_up = close >= open_
                painter.setPen(self.pen_up if is_up else self.pen_down)
                painter.setBrush(self.brush_up if is_up else self.brush_down)

                # wick
                painter.drawLine(QtCore.QPointF(x_center, low),
                                 QtCore.QPointF(x_center, high))

                # body (ensure visible dojis)
                body_h = abs(close - open_) or 1e-9
                body_bottom = min(open_, close)
                body_left = x_center - body_w / 2.0
                painter.drawRect(QtCore.QRectF(body_left, body_bottom, body_w, body_h))

                x_min = min(x_min, body_left)
                x_max = max(x_max, body_left + body_w)
                y_min = min(y_min, low, body_bottom)
                y_max = max(y_max, high, body_bottom + body_h)
        finally:
            # Always close the painter, even if an entry was malformed.
            painter.end()

        self.prepareGeometryChange()
        self._picture = pic
        if x_min <= x_max and y_min <= y_max:
            self._bounds = QtCore.QRectF(x_min, y_min, x_max - x_min, y_max - y_min)
        else:
            # Nothing was drawn.
            self._bounds = QtCore.QRectF()

    def paint(self, painter, *args):
        """Replay the pre-rendered picture."""
        if self._picture is not None:
            painter.drawPicture(0, 0, self._picture)

    def boundingRect(self):
        """Return the drawn extent in data coordinates.

        Computed from the candle values rather than from
        ``QPicture.boundingRect()``, which is an integer rectangle and is
        therefore too coarse for small-valued series.
        """
        return QtCore.QRectF(self._bounds)
class OHLCItem(_BaseCandle):
    """Candlesticks using a green (up) / red (down) palette."""

    def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8):
        # Fixed palette for price candles.
        palette = {"color_up": "#00ff00", "color_down": "#ff0000"}
        super().__init__(data, body_ratio=body_ratio, **palette)
class OBIItem(_BaseCandle):
    """Candlesticks for OBI using a light-blue (up) / dark-blue (down) palette."""

    def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8):
        # Fixed blue palette so these candles are distinguishable from price candles.
        palette = {"color_up": "#4a9eff", "color_down": "#1f5f99"}
        super().__init__(data, body_ratio=body_ratio, **palette)
# ---------------------------
# Main Window
# ---------------------------
class MainWindow(QMainWindow):
    """Main application window for orderflow visualization.

    Shows four stacked, x-linked time panels (OHLC, volume, OBI, CVD) with
    crosshairs and a data-inspection label. The depth panel is currently
    disabled: ``depth_plot`` stays ``None`` and ``_update_depth_plot`` is a
    no-op until a plot widget is assigned to it.
    """

    _COLOR_UP = "#00ff00"
    _COLOR_DOWN = "#ff0000"
    _COLOR_NEUTRAL = "#888888"

    def __init__(self):
        super().__init__()
        self.ohlc_data: List[Dict[str, Any]] = []
        # Rows per series; each row is read as
        # [ts_start_ms, ts_end_ms, open, high, low, close, ...].
        self.metrics_data: Dict[str, Any] = {"cvd": [], "obi": []}
        self.depth_data: Dict[str, List[List[float]]] = {"bids": [], "asks": []}
        self.depth_plot: Optional[pg.PlotWidget] = None  # depth panel disabled
        self._init_ui()

    # ----- UI setup -----
    def _init_ui(self):
        """Create the plot widgets, link their x-axes and lay them out."""
        self.setWindowTitle("Orderflow Backtest Visualizer")
        self.resize(1400, 900)

        central = QWidget(self)
        self.setCentralWidget(central)
        main_layout = QHBoxLayout(central)
        charts_widget = QWidget()
        charts_layout = QVBoxLayout(charts_widget)

        # PG appearance (set before the plot widgets below are created)
        pg.setConfigOptions(antialias=True, background="k", foreground="w")

        self.ohlc_plot = self._make_time_plot("OHLC", "Price", units="USD")
        self.volume_plot = self._make_time_plot("Volume", "Volume")
        self.obi_plot = self._make_time_plot("Order Book Imbalance (OBI)", "OBI")
        self.cvd_plot = self._make_time_plot("Cumulative Volume Delta (CVD)", "CVD")

        # Link x-axes so panning/zooming one panel moves all of them.
        for plot in (self.volume_plot, self.obi_plot, self.cvd_plot):
            plot.setXLink(self.ohlc_plot)

        # Crosshairs and interactions
        self._setup_crosshairs()
        self._setup_double_click_autorange()

        # Layout weights
        for plot in self._time_plots():
            charts_layout.addWidget(plot, 1)
        main_layout.addWidget(charts_widget, 3)

        logging.info("UI setup completed")

    @staticmethod
    def _make_time_plot(title: str, left_label: str, units: Optional[str] = None):
        """Create a gridded PlotWidget with its own date axis on the bottom."""
        plot = pg.PlotWidget(title=title,
                             axisItems={"bottom": DateAxisItem(orientation="bottom")})
        if units is None:
            plot.setLabel("left", left_label)
        else:
            plot.setLabel("left", left_label, units=units)
        plot.showGrid(x=True, y=True, alpha=0.3)
        return plot

    def _time_plots(self):
        """Return the four time-based panels in top-to-bottom order."""
        return [self.ohlc_plot, self.volume_plot, self.obi_plot, self.cvd_plot]

    def _setup_double_click_autorange(self):
        """Make a double-click inside any panel auto-range all panels."""
        def _auto_range(_event):
            try:
                for plot in self._time_plots():
                    plot.autoRange()
            except Exception as e:  # best-effort UI handler; never crash the app
                logging.debug("Auto-range error: %s", e)

        # Attach to each ViewBox's mouse double-click handler.
        for plot in self._time_plots():
            plot.plotItem.vb.mouseDoubleClickEvent = _auto_range

    def _setup_crosshairs(self):
        """Add one vertical and one horizontal crosshair line to every panel."""
        # One line per plot (a single item cannot be in multiple ViewBoxes)
        pen = pg.mkPen(color="#888888", width=1, style=Qt.DashLine)

        def _line(angle):
            return pg.InfiniteLine(angle=angle, movable=False, pen=pen)

        self.vline_ohlc, self.hline_ohlc = _line(90), _line(0)
        self.vline_volume, self.hline_volume = _line(90), _line(0)
        self.vline_obi, self.hline_obi = _line(90), _line(0)
        self.vline_cvd, self.hline_cvd = _line(90), _line(0)

        attachments = (
            (self.ohlc_plot, self.vline_ohlc, self.hline_ohlc),
            (self.volume_plot, self.vline_volume, self.hline_volume),
            (self.obi_plot, self.vline_obi, self.hline_obi),
            (self.cvd_plot, self.vline_cvd, self.hline_cvd),
        )
        for plot, vline, hline in attachments:
            # ignoreBounds keeps the crosshair from influencing auto-range.
            plot.addItem(vline, ignoreBounds=True)
            plot.addItem(hline, ignoreBounds=True)
            # Each PlotWidget has its own scene, hence one connection per plot.
            plot.scene().sigMouseMoved.connect(self._on_mouse_moved)

        # Data label: anchored to the plot item (pixel coordinates) instead of
        # being added to the ViewBox, where it would live in data coordinates.
        self.data_label = pg.LabelItem(justify="left")
        self.data_label.setParentItem(self.ohlc_plot.getPlotItem())
        self.data_label.anchor(itemPos=(0, 0), parentPos=(0, 0), offset=(70, 30))

    # ----- Data ingestion from processor -----
    @staticmethod
    def _tail(seq, count: int):
        """Return the last ``count`` items of ``seq`` (empty for count <= 0).

        A plain ``seq[-count:]`` would return *everything* for ``count == 0``.
        """
        return seq[-count:] if count > 0 else seq[:0]

    def update_data(self, data_processor: OHLCProcessor, nb_bars: int):
        """
        Pull latest bars/metrics from the processor and refresh plots.
        Call this whenever you've added trade/book data + processor.flush().

        Args:
            data_processor: Source of ``bars`` and ``get_metrics_series()``;
                the latter is indexed with the keys ``"cvd"`` and ``"obi"``.
            nb_bars: Number of most recent bars/rows to display; values <= 0
                display nothing.
        """
        self.ohlc_data = self._tail(data_processor.bars, nb_bars) or []
        series = data_processor.get_metrics_series()
        self.metrics_data = {
            "cvd": self._tail(series["cvd"], nb_bars),
            "obi": self._tail(series["obi"], nb_bars),
        }
        self._update_all_plots()

    def _metric_rows(self, key: str):
        """Return the rows stored for metric ``key`` (empty list if absent)."""
        if not isinstance(self.metrics_data, dict):
            return []
        rows = self.metrics_data.get(key)
        return [] if rows is None else rows

    # ----- Plot updates -----
    def _update_all_plots(self):
        """Redraw every enabled panel from the current data."""
        self._update_ohlc_plot()
        self._update_volume_plot()
        self._update_obi_plot()
        self._update_cvd_plot()
        self._update_depth_plot()  # no-op while the depth panel is disabled

    def _clear_plot_items_but_crosshair(self, plot: pg.PlotWidget):
        """Remove all data items from ``plot``, keeping crosshairs and labels."""
        protected = (pg.InfiniteLine, pg.LabelItem)
        # Iterate over a copy: removeItem mutates the PlotItem's item list.
        for item in list(plot.getPlotItem().items):
            if not isinstance(item, protected):
                plot.removeItem(item)

    def _update_ohlc_plot(self):
        """Redraw the price candles and fit both axes to the data."""
        # Clear first so stale candles never outlive the data they came from.
        self._clear_plot_items_but_crosshair(self.ohlc_plot)
        if not self.ohlc_data:
            return
        self.ohlc_plot.addItem(OHLCItem(self.ohlc_data, body_ratio=0.8))

        first_bar, last_bar = self.ohlc_data[0], self.ohlc_data[-1]
        first_ts = first_bar["timestamp_start"] / 1000.0
        last_ts = last_bar.get("timestamp_end", last_bar["timestamp_start"]) / 1000.0
        self.ohlc_plot.setXRange(first_ts, last_ts)

        lows = [bar["low"] for bar in self.ohlc_data]
        highs = [bar["high"] for bar in self.ohlc_data]
        self.ohlc_plot.setYRange(min(lows), max(highs))

    def _update_volume_plot(self):
        """Redraw volume bars, centred on and sized to each bar's time span.

        A bar is green when its close is >= the previous bar's close, red
        otherwise; the first bar has no predecessor and is grey.
        """
        self._clear_plot_items_but_crosshair(self.volume_plot)
        if not self.ohlc_data:
            return

        centers, widths, heights, brushes = [], [], [], []
        prev_close = None
        for bar in self.ohlc_data:
            ts0 = bar["timestamp_start"] / 1000.0
            ts1 = bar.get("timestamp_end", bar["timestamp_start"]) / 1000.0
            centers.append(0.5 * (ts0 + ts1))
            widths.append(max(1e-9, ts1 - ts0) * 0.8)
            heights.append(float(bar["volume"]))
            if prev_close is None:
                brushes.append(self._COLOR_NEUTRAL)
            elif bar["close"] >= prev_close:
                brushes.append(self._COLOR_UP)
            else:
                brushes.append(self._COLOR_DOWN)
            prev_close = bar["close"]

        # One graphics item for all bars instead of one item per bar.
        self.volume_plot.addItem(pg.BarGraphItem(
            x=np.asarray(centers, dtype=float),
            height=np.asarray(heights, dtype=float),
            width=np.asarray(widths, dtype=float),
            brushes=brushes,
        ))

    @staticmethod
    def _rows_to_candles(rows) -> List[Dict[str, Any]]:
        """Convert metric rows to candle dicts; rows with < 6 fields are skipped.

        Each row is read as [ts_start_ms, ts_end_ms, open, high, low, close, ...].
        """
        candles = []
        for row in rows:
            if len(row) < 6:
                continue
            ts0, ts1, open_, high, low, close = row[:6]
            candles.append({
                "timestamp_start": int(ts0),
                "timestamp_end": int(ts1),
                "open": float(open_),
                "high": float(high),
                "low": float(low),
                "close": float(close),
                "volume": 0.0,
            })
        return candles

    def _plot_metric_candles(self, plot: pg.PlotWidget, rows):
        """Redraw ``plot`` with blue candles built from ``rows`` and fit the y-axis."""
        self._clear_plot_items_but_crosshair(plot)
        candles = self._rows_to_candles(rows)
        if not candles:
            return
        plot.addItem(OBIItem(candles, body_ratio=0.8))
        # Also set Y range explicitly
        plot.setYRange(min(c["low"] for c in candles),
                       max(c["high"] for c in candles))

    def _update_obi_plot(self):
        """
        Update OBI panel with candlesticks from ``metrics_data["obi"]``.
        Each row expected: [ts_start_ms, ts_end_ms, obi_o, obi_h, obi_l, obi_c]
        """
        self._plot_metric_candles(self.obi_plot, self._metric_rows("obi"))

    def _update_cvd_plot(self):
        """
        Update CVD panel with candlesticks from ``metrics_data["cvd"]``.
        Rows use the same six-field candle layout as the OBI rows.
        """
        self._plot_metric_candles(self.cvd_plot, self._metric_rows("cvd"))

    # ----- Depth chart -----
    @staticmethod
    def _cumulate_levels(levels, reverse=False, limit=50):
        """
        Accumulate order-book level sizes in price order.

        Args:
            levels: list of [price, size]
            reverse: sort prices descending when True, ascending otherwise.
            limit: maximum number of levels kept *after* sorting, so the
                first ``limit`` prices in sort order are always the ones used.

        Returns:
            list of (price, cum_volume); empty on empty or malformed input.
        """
        if not levels:
            return []
        try:
            ordered = sorted(levels, key=lambda lvl: lvl[0], reverse=reverse)[:limit]
            cumulative = []
            running = 0.0
            for price, size in ordered:
                running += float(size)
                cumulative.append((float(price), running))
            return cumulative
        except (TypeError, ValueError, IndexError, KeyError) as e:
            logging.warning("cumulate_levels error: %s", e)
            return []

    def _update_depth_plot(self):
        """Redraw cumulative bid/ask depth; does nothing while the panel is disabled."""
        if self.depth_plot is None:
            return
        if not self.depth_data or not isinstance(self.depth_data, dict):
            return
        self.depth_plot.clear()

        sides = (
            # key, sort descending, line colour, fill RGBA, legend name
            ("bids", True, "#00c800", (0, 200, 0, 50), "Bids"),
            ("asks", False, "#c80000", (200, 0, 0, 50), "Asks"),
        )
        for key, descending, line_color, fill_rgba, name in sides:
            cumulative = self._cumulate_levels(self.depth_data.get(key, []),
                                               reverse=descending, limit=50)
            if not cumulative:
                continue
            prices = [price for price, _ in cumulative]
            vols = [vol for _, vol in cumulative]
            self.depth_plot.plot(vols, prices, pen=pg.mkPen(color=line_color, width=2),
                                 stepMode="left", name=name)
            # PlotCurveItem takes its fill brush via ``brush`` (not ``fillBrush``).
            fill = pg.PlotCurveItem(vols + [0], prices + [prices[-1]], fillLevel=0,
                                    brush=pg.mkBrush(color=fill_rgba))
            self.depth_plot.addItem(fill)

    # ----- Crosshair + inspection -----
    def _on_mouse_moved(self, pos):
        """Move the crosshairs to the cursor and refresh the data label.

        ``pos`` is a scene position emitted by the ``sigMouseMoved`` signal of
        one panel's scene; the emitting scene identifies the active panel.
        """
        try:
            scene = self.sender()
            if scene is None:
                return

            panels = (
                (self.ohlc_plot, self.hline_ohlc),
                (self.volume_plot, self.hline_volume),
                (self.obi_plot, self.hline_obi),
                (self.cvd_plot, self.hline_cvd),
            )
            active = next(((plot, hline) for plot, hline in panels
                           if plot.scene() == scene), None)
            if active is None:
                return
            target, hline = active

            if not target.plotItem.sceneBoundingRect().contains(pos):
                return
            view_pos = target.plotItem.vb.mapSceneToView(pos)
            x = view_pos.x()

            # Move all vertical lines to the same x (keep panels aligned)
            for vline in (self.vline_ohlc, self.vline_volume,
                          self.vline_obi, self.vline_cvd):
                vline.setPos(x)
            # Move only the active panel's horizontal line
            hline.setPos(view_pos.y())

            self._update_data_label(x)
        except Exception as e:  # best-effort UI handler; never crash the app
            logging.debug("mouse move err: %s", e)

    def _update_data_label(self, x_seconds: float):
        """Show the bar and metric values closest to ``x_seconds`` (epoch seconds)."""
        try:
            parts = []
            closest = self._closest_bar(self.ohlc_data, x_seconds)
            if closest:
                parts.append(f"Time: {self._fmt_ts(closest['timestamp_start'])}")
                parts.append(f"OHLC O:{closest['open']:.2f} H:{closest['high']:.2f} "
                             f"L:{closest['low']:.2f} C:{closest['close']:.2f}")
                parts.append(f"Vol: {float(closest['volume']):.6f}")

            obi_row = self._closest_metrics_row(self._metric_rows("obi"), x_seconds)
            if obi_row is not None and len(obi_row) >= 6:
                o, h, lo, c = (float(v) for v in obi_row[2:6])
                parts.append(f"OBI O:{o:.2f} H:{h:.2f} L:{lo:.2f} C:{c:.2f}")

            # CVD rows share the candle layout; index 5 is the close value.
            cvd_row = self._closest_metrics_row(self._metric_rows("cvd"), x_seconds)
            if cvd_row is not None and len(cvd_row) >= 6:
                parts.append(f"CVD: {float(cvd_row[5]):.2f}")

            self.data_label.setText("<br>".join(parts) if parts else "No data")
        except Exception as e:  # best-effort UI handler; never crash the app
            logging.debug("update label err: %s", e)

    @staticmethod
    def _closest_bar(bars: List[Dict[str, Any]], x_seconds: float) -> Optional[Dict[str, Any]]:
        """Return the bar whose ``timestamp_start`` (ms) is nearest to ``x_seconds``."""
        if not bars:
            return None
        x_ms = x_seconds * 1000.0
        return min(bars, key=lambda bar: abs(bar["timestamp_start"] - x_ms))

    @staticmethod
    def _closest_metrics_row(rows: List[Any], x_seconds: float) -> Optional[Any]:
        """Return the row whose first field (ms) is nearest to ``x_seconds``.

        Returns ``None`` for empty input or rows that cannot be compared.
        """
        if rows is None or len(rows) == 0:
            return None
        x_ms = x_seconds * 1000.0
        try:
            return min(rows, key=lambda row: abs(row[0] - x_ms))
        except (TypeError, IndexError, KeyError, ValueError):
            return None

    @staticmethod
    def _fmt_ts(ts_ms: int) -> str:
        """Format a millisecond timestamp as local ``YYYY-MM-DD HH:MM:SS``.

        Falls back to the raw value as text when it cannot be converted.
        """
        from datetime import datetime
        try:
            return datetime.fromtimestamp(ts_ms / 1000.0).strftime("%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError, OverflowError, OSError):
            try:
                return str(int(ts_ms))
            except (TypeError, ValueError, OverflowError):
                return str(ts_ms)