""" Desktop visualization application using PySide6 and PyQtGraph. - OHLC candles (time-bucketed, gap-filled, width from timestamp span) - Volume bars (width from timestamp span) - OBI candles (same visuals as OHLC but blue) - CVD line - Depth chart (cumulative bids/asks) - Crosshairs + data inspection """ import sys import logging from typing import List, Dict, Any, Optional import numpy as np import pyqtgraph as pg from pyqtgraph import QtCore, QtGui from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem from PySide6.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QHBoxLayout from PySide6.QtCore import Qt from ohlc_processor import OHLCProcessor # --------------------------- # Candlestick Graphics Items # --------------------------- class _BaseCandle(pg.GraphicsObject): """ Base class for time-based candles: - x-axis is epoch seconds (float) - candle width derived from (timestamp_end - timestamp_start) """ def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8, color_up="#00ff00", color_down="#ff0000", pen_width: int = 1): super().__init__() self.data = data self.body_ratio = max(0.0, min(1.0, body_ratio)) self.pen_up = pg.mkPen(color=color_up, width=pen_width) self.pen_down = pg.mkPen(color=color_down, width=pen_width) self.brush_up = pg.mkBrush(color=color_up) self.brush_down = pg.mkBrush(color=color_down) self._picture = None self.generatePicture() def generatePicture(self): pic = QtGui.QPicture() p = QtGui.QPainter(pic) for entry in self.data: ts0_ms = entry.get("timestamp_start") ts1_ms = entry.get("timestamp_end", ts0_ms) if ts0_ms is None: continue x0 = ts0_ms / 1000.0 x1 = ts1_ms / 1000.0 x_center = 0.5 * (x0 + x1) full_w = max(1e-9, x1 - x0) # seconds body_w = full_w * self.body_ratio o = float(entry["open"]) h = float(entry["high"]) l = float(entry["low"]) c = float(entry["close"]) is_up = c >= o pen = self.pen_up if is_up else self.pen_down brush = self.brush_up if is_up else self.brush_down # wick p.setPen(pen) p.drawLine(QtCore.QPointF(x_center, l), QtCore.QPointF(x_center, h)) # body (ensure visible dojis) body_h = abs(c - o) if body_h == 0: body_h = 1e-9 body_bottom = min(o, c) p.setPen(pen) p.setBrush(brush) p.drawRect(QtCore.QRectF(x_center - body_w / 2.0, body_bottom, body_w, body_h)) p.end() self._picture = pic def paint(self, painter, *args): if self._picture: painter.drawPicture(0, 0, self._picture) def boundingRect(self): if self._picture: return QtCore.QRectF(self._picture.boundingRect()) return QtCore.QRectF() class OHLCItem(_BaseCandle): """Green/Red candlesticks.""" def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8): super().__init__(data, body_ratio=body_ratio, color_up="#00ff00", color_down="#ff0000") class OBIItem(_BaseCandle): """Blue-themed candlesticks for OBI.""" def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8): super().__init__(data, body_ratio=body_ratio, color_up="#4a9eff", color_down="#1f5f99") # --------------------------- # Main Window # --------------------------- class MainWindow(QMainWindow): """Main application window for orderflow visualization.""" def __init__(self): super().__init__() self.ohlc_data: List[Dict[str, Any]] = [] self.metrics_data: List[Any] = [] # adapt shape to your MetricsCalculator # self.depth_data: Dict[str, List[List[float]]] = {"bids": [], "asks": []} self._init_ui() # ----- UI setup ----- def _init_ui(self): self.setWindowTitle("Orderflow Backtest Visualizer") self.resize(1400, 900) central = QWidget(self) self.setCentralWidget(central) main_layout = QHBoxLayout(central) charts_widget = QWidget() charts_layout = QVBoxLayout(charts_widget) # depth_widget = QWidget() # depth_layout = QVBoxLayout(depth_widget) # PG appearance pg.setConfigOptions(antialias=True, background="k", foreground="w") # Date axis for time plots date_axis = DateAxisItem(orientation="bottom") self.ohlc_plot = pg.PlotWidget(title="OHLC", axisItems={"bottom": date_axis}) self.ohlc_plot.setLabel("left", "Price", units="USD") self.ohlc_plot.showGrid(x=True, y=True, alpha=0.3) self.volume_plot = pg.PlotWidget(title="Volume", axisItems={"bottom": DateAxisItem(orientation="bottom")}) self.volume_plot.setLabel("left", "Volume") self.volume_plot.showGrid(x=True, y=True, alpha=0.3) self.obi_plot = pg.PlotWidget(title="Order Book Imbalance (OBI)", axisItems={"bottom": DateAxisItem(orientation="bottom")}) self.obi_plot.setLabel("left", "OBI") self.obi_plot.showGrid(x=True, y=True, alpha=0.3) self.cvd_plot = pg.PlotWidget(title="Cumulative Volume Delta (CVD)", axisItems={"bottom": DateAxisItem(orientation="bottom")}) self.cvd_plot.setLabel("left", "CVD") self.cvd_plot.showGrid(x=True, y=True, alpha=0.3) # Depth (not time x-axis) # self.depth_plot = pg.PlotWidget(title="Order Book Depth") # self.depth_plot.setLabel("left", "Price", units="USD") # self.depth_plot.setLabel("bottom", "Cumulative Volume") # self.depth_plot.showGrid(x=True, y=True, alpha=0.3) # Link x-axes self.volume_plot.setXLink(self.ohlc_plot) self.obi_plot.setXLink(self.ohlc_plot) self.cvd_plot.setXLink(self.ohlc_plot) # Crosshairs and interactions self._setup_crosshairs() self._setup_double_click_autorange() # Layout weights charts_layout.addWidget(self.ohlc_plot, 1) charts_layout.addWidget(self.volume_plot, 1) charts_layout.addWidget(self.obi_plot, 1) charts_layout.addWidget(self.cvd_plot, 1) # depth_layout.addWidget(self.depth_plot) main_layout.addWidget(charts_widget, 3) # main_layout.addWidget(depth_widget, 1) logging.info("UI setup completed") def _setup_double_click_autorange(self): def _auto_range(_event): try: self.ohlc_plot.autoRange() self.volume_plot.autoRange() self.obi_plot.autoRange() self.cvd_plot.autoRange() except Exception as e: logging.debug(f"Auto-range error: {e}") # Attach to ViewBox mouse double-click self.ohlc_plot.plotItem.vb.mouseDoubleClickEvent = _auto_range def _setup_crosshairs(self): # One vertical line per plot (a single item cannot be in multiple ViewBoxes) pen = pg.mkPen(color="#888888", width=1, style=Qt.DashLine) self.vline_ohlc = pg.InfiniteLine(angle=90, movable=False, pen=pen) self.vline_volume = pg.InfiniteLine(angle=90, movable=False, pen=pen) self.vline_obi = pg.InfiniteLine(angle=90, movable=False, pen=pen) self.vline_cvd = pg.InfiniteLine(angle=90, movable=False, pen=pen) self.hline_ohlc = pg.InfiniteLine(angle=0, movable=False, pen=pen) self.hline_volume = pg.InfiniteLine(angle=0, movable=False, pen=pen) self.hline_obi = pg.InfiniteLine(angle=0, movable=False, pen=pen) self.hline_cvd = pg.InfiniteLine(angle=0, movable=False, pen=pen) # Attach to plots self.ohlc_plot.addItem(self.vline_ohlc, ignoreBounds=True) self.volume_plot.addItem(self.vline_volume, ignoreBounds=True) self.obi_plot.addItem(self.vline_obi, ignoreBounds=True) self.cvd_plot.addItem(self.vline_cvd, ignoreBounds=True) self.ohlc_plot.addItem(self.hline_ohlc, ignoreBounds=True) self.volume_plot.addItem(self.hline_volume, ignoreBounds=True) self.obi_plot.addItem(self.hline_obi, ignoreBounds=True) self.cvd_plot.addItem(self.hline_cvd, ignoreBounds=True) # Mouse move for plot in [self.ohlc_plot, self.volume_plot, self.obi_plot, self.cvd_plot]: plot.scene().sigMouseMoved.connect(self._on_mouse_moved) # Data label self.data_label = pg.LabelItem(justify="left") self.ohlc_plot.addItem(self.data_label) # ----- Data ingestion from processor ----- def update_data(self, data_processor: OHLCProcessor, nb_bars: int): """ Pull latest bars/metrics from the processor and refresh plots. Call this whenever you've added trade/book data + processor.flush(). """ self.ohlc_data = data_processor.bars[-nb_bars:] or [] self.metrics_data = data_processor.get_metrics_series() self.metrics_data = { "cvd": self.metrics_data["cvd"][-nb_bars:], "obi": self.metrics_data["obi"][-nb_bars:] } self._update_all_plots() # ----- Plot updates ----- def _update_all_plots(self): self._update_ohlc_plot() self._update_volume_plot() self._update_obi_plot() self._update_cvd_plot() # self._update_depth_plot() def _clear_plot_items_but_crosshair(self, plot: pg.PlotWidget): protected = (pg.InfiniteLine, pg.LabelItem) items = [it for it in plot.items() if not isinstance(it, protected)] for it in items: plot.removeItem(it) def _update_ohlc_plot(self): if not self.ohlc_data: return self._clear_plot_items_but_crosshair(self.ohlc_plot) self.ohlc_plot.addItem(OHLCItem(self.ohlc_data, body_ratio=0.8)) first_ts = self.ohlc_data[0]["timestamp_start"] / 1000.0 last_ts = self.ohlc_data[-1].get("timestamp_end", self.ohlc_data[-1]["timestamp_start"]) / 1000.0 self.ohlc_plot.setXRange(first_ts, last_ts) lows = [bar["low"] for bar in self.ohlc_data] highs = [bar["high"] for bar in self.ohlc_data] self.ohlc_plot.setYRange(min(lows), max(highs)) def _update_volume_plot(self): if not self.ohlc_data: return self._clear_plot_items_but_crosshair(self.volume_plot) # Build centered volume bars sized to bucket width for i, bar in enumerate(self.ohlc_data): ts0 = bar["timestamp_start"] / 1000.0 ts1 = bar.get("timestamp_end", bar["timestamp_start"]) / 1000.0 full_w = max(1e-9, ts1 - ts0) bar_w = full_w * 0.8 x_center = 0.5 * (ts0 + ts1) if i > 0: prev_close = self.ohlc_data[i - 1]["close"] color = "#00ff00" if bar["close"] >= prev_close else "#ff0000" else: color = "#888888" vol = float(bar["volume"]) self.volume_plot.addItem(pg.BarGraphItem(x=[x_center], height=[vol], width=bar_w, brush=color)) def _update_obi_plot(self): """ Update OBI panel with candlesticks from metrics_data. Each row expected: [ts_start_ms, ts_end_ms, obi_o, obi_h, obi_l, obi_c, cvd] """ if not self.metrics_data: return self._clear_plot_items_but_crosshair(self.obi_plot) # Convert metrics_data rows to candle dicts candlesticks = [] for row in self.metrics_data['obi']: if len(row) >= 6: ts0, ts1, o, h, l, c = row[:6] candlesticks.append({ "timestamp_start": int(ts0), "timestamp_end": int(ts1), "open": float(o), "high": float(h), "low": float(l), "close": float(c), "volume": 0.0, }) if candlesticks: self.obi_plot.addItem(OBIItem(candlesticks, body_ratio=0.8)) # Also set Y range explicitly lows = [c["low"] for c in candlesticks] highs = [c["high"] for c in candlesticks] self.obi_plot.setYRange(min(lows), max(highs)) def _update_cvd_plot(self): """ Plot CVD as a line if present at index 6, else skip. """ # if not self.metrics_data: # return # self._clear_plot_items_but_crosshair(self.cvd_plot) # xs = [] # ys = [] # for row in self.metrics_data['cvd']: # if len(row) >= 7: # ts0 = row[0] / 1000.0 # cvd_val = float(row[6]) # xs.append(ts0) # ys.append(cvd_val) # if xs: # self.cvd_plot.plot(xs, ys, pen=pg.mkPen(color="#ffff00", width=2), name="CVD") if not self.metrics_data: return self._clear_plot_items_but_crosshair(self.cvd_plot) # Convert metrics_data rows to candle dicts candlesticks = [] for row in self.metrics_data['cvd']: if len(row) >= 6: ts0, ts1, o, h, l, c = row[:6] candlesticks.append({ "timestamp_start": int(ts0), "timestamp_end": int(ts1), "open": float(o), "high": float(h), "low": float(l), "close": float(c), "volume": 0.0, }) if candlesticks: self.cvd_plot.addItem(OBIItem(candlesticks, body_ratio=0.8)) # Also set Y range explicitly lows = [c["low"] for c in candlesticks] highs = [c["high"] for c in candlesticks] self.cvd_plot.setYRange(min(lows), max(highs)) # ----- Depth chart ----- @staticmethod def _cumulate_levels(levels, reverse=False, limit=50): """ levels: list of [price, size] returns list of (price, cum_volume) """ if not levels: return [] try: sorted_lvls = sorted(levels[:limit], key=lambda x: x[0], reverse=reverse) cum = [] total = 0.0 for price, size in sorted_lvls: total += float(size) cum.append((float(price), total)) return cum except Exception as e: logging.warning(f"cumulate_levels error: {e}") return [] def _update_depth_plot(self): if not self.depth_data or not isinstance(self.depth_data, dict): return self.depth_plot.clear() bids = self.depth_data.get("bids", []) asks = self.depth_data.get("asks", []) cum_bids = self._cumulate_levels(bids, reverse=True, limit=50) cum_asks = self._cumulate_levels(asks, reverse=False, limit=50) if cum_bids: bid_prices = [p for p, _ in cum_bids] bid_vols = [v for _, v in cum_bids] self.depth_plot.plot(bid_vols, bid_prices, pen=pg.mkPen(color="#00c800", width=2), stepMode="left", name="Bids") fill_b = pg.PlotCurveItem(bid_vols + [0], bid_prices + [bid_prices[-1]], fillLevel=0, fillBrush=pg.mkBrush(color=(0, 200, 0, 50))) self.depth_plot.addItem(fill_b) if cum_asks: ask_prices = [p for p, _ in cum_asks] ask_vols = [v for _, v in cum_asks] self.depth_plot.plot(ask_vols, ask_prices, pen=pg.mkPen(color="#c80000", width=2), stepMode="left", name="Asks") fill_a = pg.PlotCurveItem(ask_vols + [0], ask_prices + [ask_prices[-1]], fillLevel=0, fillBrush=pg.mkBrush(color=(200, 0, 0, 50))) self.depth_plot.addItem(fill_a) # ----- Crosshair + inspection ----- def _on_mouse_moved(self, pos): try: sender = self.sender() if not sender: return target = None for plot in [self.ohlc_plot, self.volume_plot, self.obi_plot, self.cvd_plot]: if plot.scene() == sender: target = plot break if target is None: return if target.sceneBoundingRect().contains(pos): mp = target.plotItem.vb.mapSceneToView(pos) x = mp.x() y = mp.y() # Move all vertical lines to the same x (keep panels aligned) self.vline_ohlc.setPos(x) self.vline_volume.setPos(x) self.vline_obi.setPos(x) self.vline_cvd.setPos(x) # Move only the active panel's horizontal line if target == self.ohlc_plot: self.hline_ohlc.setPos(y) elif target == self.volume_plot: self.hline_volume.setPos(y) elif target == self.obi_plot: self.hline_obi.setPos(y) elif target == self.cvd_plot: self.hline_cvd.setPos(y) self._update_data_label(x) except Exception as e: logging.debug(f"mouse move err: {e}") def _update_data_label(self, x_seconds: float): try: parts = [] if self.ohlc_data: closest = self._closest_bar(self.ohlc_data, x_seconds) if closest: parts.append(f"Time: {self._fmt_ts(closest['timestamp_start'])}") parts.append(f"OHLC O:{closest['open']:.2f} H:{closest['high']:.2f} " f"L:{closest['low']:.2f} C:{closest['close']:.2f}") parts.append(f"Vol: {float(closest['volume']):.6f}") if self.metrics_data: # Optional: display OBI/CVD values if present return #fix access to dict row = self._closest_metrics_row(self.metrics_data, x_seconds) if row and len(row) >= 6: _, _, o, h, l, c = row[:6] parts.append(f"OBI O:{o:.2f} H:{h:.2f} L:{l:.2f} C:{c:.2f}") if row and len(row) >= 7: parts.append(f"CVD: {float(row[6]):.2f}") self.data_label.setText("
".join(parts) if parts else "No data") except Exception as e: logging.debug(f"update label err: {e}") @staticmethod def _closest_bar(bars: List[Dict[str, Any]], x_seconds: float) -> Optional[Dict[str, Any]]: if not bars: return None x_ms = x_seconds * 1000.0 closest = min(bars, key=lambda b: abs(b["timestamp_start"] - x_ms)) return closest @staticmethod def _closest_metrics_row(rows: List[Any], x_seconds: float) -> Optional[Any]: if not rows: return None x_ms = x_seconds * 1000.0 try: return min(rows, key=lambda r: abs(r[0] - x_ms)) except Exception: return None @staticmethod def _fmt_ts(ts_ms: int) -> str: from datetime import datetime try: return datetime.fromtimestamp(ts_ms / 1000.0).strftime("%Y-%m-%d %H:%M:%S") except Exception: return str(int(ts_ms))