From 65dab174249c2d574314f11e5150e9785ef7040b Mon Sep 17 00:00:00 2001 From: Simon Moisy Date: Fri, 12 Sep 2025 15:28:15 +0800 Subject: [PATCH] Refactor desktop application to enhance OHLC and OBI visualizations. Introduced dynamic candle width based on timestamps, improved data handling for metrics, and added crosshair functionality for better data inspection. Updated UI layout for improved user experience and integrated real-time data updates from the processor. --- desktop_app.py | 1002 ++++++++++++++++------------------------- level_parser.py | 132 +++--- main.py | 44 +- metrics_calculator.py | 66 ++- ohlc_processor.py | 197 ++++++-- 5 files changed, 708 insertions(+), 733 deletions(-) diff --git a/desktop_app.py b/desktop_app.py index 81fbf85..3a2977c 100644 --- a/desktop_app.py +++ b/desktop_app.py @@ -1,698 +1,494 @@ """ Desktop visualization application using PySide6 and PyQtGraph. -This module provides a native desktop replacement for the Dash web application, -offering better performance, debugging capabilities, and real-time potential. +- OHLC candles (time-bucketed, gap-filled, width from timestamp span) +- Volume bars (width from timestamp span) +- OBI candles (same visuals as OHLC but blue) +- CVD line +- Depth chart (cumulative bids/asks) +- Crosshairs + data inspection """ import sys -import json import logging -from pathlib import Path -from typing import List, Optional, Dict, Any -from PySide6.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QHBoxLayout -from PySide6.QtCore import QTimer +from typing import List, Dict, Any, Optional + +import numpy as np import pyqtgraph as pg from pyqtgraph import QtCore, QtGui -import numpy as np +from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem + +from PySide6.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QHBoxLayout +from PySide6.QtCore import Qt + from ohlc_processor import OHLCProcessor -class OHLCItem(pg.GraphicsObject): - """Custom OHLC candlestick item for PyQtGraph.""" - - def __init__(self, data): - """ - Initialize OHLC item with data. - - Args: - data: List of tuples (timestamp, open, high, low, close, volume) - """ - pg.GraphicsObject.__init__(self) - self.data = data - self.generatePicture() - - def generatePicture(self): - """Generate the candlestick chart picture.""" - self.picture = QtGui.QPicture() - painter = QtGui.QPainter(self.picture) - - pen_up = pg.mkPen(color='#00ff00', width=1) # Green for up candles - pen_down = pg.mkPen(color='#ff0000', width=1) # Red for down candles - brush_up = pg.mkBrush(color='#00ff00') - brush_down = pg.mkBrush(color='#ff0000') - - # Dynamic candle width based on data density - if len(self.data) > 1: - time_diff = (self.data[1][0] - self.data[0][0]) / 1000 # Time between candles in seconds - width = time_diff * 0.8 # 80% of time interval - else: - width = 30 # Default width in seconds - - for timestamp, open_price, high, low, close, volume in self.data: - x = timestamp / 1000 # Convert ms to seconds - - # Determine candle color - is_up = close >= open_price - pen = pen_up if is_up else pen_down - brush = brush_up if is_up else brush_down - - # Draw wick (high-low line) - painter.setPen(pen) - painter.drawLine(QtCore.QPointF(x, low), QtCore.QPointF(x, high)) - - # Draw body (open-close rectangle) - body_height = abs(close - open_price) - body_bottom = min(open_price, close) - - painter.setPen(pen) - painter.setBrush(brush) - painter.drawRect(QtCore.QRectF(x - width/2, body_bottom, width, body_height)) - - painter.end() - - def paint(self, painter, option, widget): - """Paint the candlestick chart.""" - painter.drawPicture(0, 0, self.picture) - - def boundingRect(self): - """Return the bounding rectangle of the item.""" - return QtCore.QRectF(self.picture.boundingRect()) -class OBIItem(pg.GraphicsObject): - """Custom OBI candlestick item with blue styling.""" - - def __init__(self, data): - """Initialize OBI item with blue color scheme.""" - pg.GraphicsObject.__init__(self) +# --------------------------- +# Candlestick Graphics Items +# --------------------------- +class _BaseCandle(pg.GraphicsObject): + """ + Base class for time-based candles: + - x-axis is epoch seconds (float) + - candle width derived from (timestamp_end - timestamp_start) + """ + def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8, + color_up="#00ff00", color_down="#ff0000", pen_width: int = 1): + super().__init__() self.data = data + self.body_ratio = max(0.0, min(1.0, body_ratio)) + self.pen_up = pg.mkPen(color=color_up, width=pen_width) + self.pen_down = pg.mkPen(color=color_down, width=pen_width) + self.brush_up = pg.mkBrush(color=color_up) + self.brush_down = pg.mkBrush(color=color_down) + self._picture = None self.generatePicture() - - def generatePicture(self): - """Generate OBI candlestick chart with blue styling.""" - self.picture = QtGui.QPicture() - painter = QtGui.QPainter(self.picture) - - # Blue color scheme for OBI - pen_up = pg.mkPen(color='#4a9eff', width=1) # Light blue for up - pen_down = pg.mkPen(color='#1f5f99', width=1) # Dark blue for down - brush_up = pg.mkBrush(color='#4a9eff') - brush_down = pg.mkBrush(color='#1f5f99') - - # Dynamic width calculation - if len(self.data) > 1: - time_diff = (self.data[1][0] - self.data[0][0]) / 1000 - width = time_diff * 0.8 - else: - width = 30 - - for timestamp, open_price, high, low, close, _ in self.data: - x = timestamp / 1000 - - # Determine color - is_up = close >= open_price - pen = pen_up if is_up else pen_down - brush = brush_up if is_up else brush_down - - # Draw wick - painter.setPen(pen) - painter.drawLine(QtCore.QPointF(x, low), QtCore.QPointF(x, high)) - - # Draw body - body_height = abs(close - open_price) - body_bottom = min(open_price, close) - - painter.setPen(pen) - painter.setBrush(brush) - painter.drawRect(QtCore.QRectF(x - width/2, body_bottom, width, body_height)) - - painter.end() - - def paint(self, painter, option, widget): - """Paint the OBI candlestick chart.""" - painter.drawPicture(0, 0, self.picture) - - def boundingRect(self): - """Return the bounding rectangle.""" - return QtCore.QRectF(self.picture.boundingRect()) + def generatePicture(self): + pic = QtGui.QPicture() + p = QtGui.QPainter(pic) + + for entry in self.data: + ts0_ms = entry.get("timestamp_start") + ts1_ms = entry.get("timestamp_end", ts0_ms) + if ts0_ms is None: + continue + + x0 = ts0_ms / 1000.0 + x1 = ts1_ms / 1000.0 + x_center = 0.5 * (x0 + x1) + full_w = max(1e-9, x1 - x0) # seconds + body_w = full_w * self.body_ratio + + o = float(entry["open"]) + h = float(entry["high"]) + l = float(entry["low"]) + c = float(entry["close"]) + + is_up = c >= o + pen = self.pen_up if is_up else self.pen_down + brush = self.brush_up if is_up else self.brush_down + + # wick + p.setPen(pen) + p.drawLine(QtCore.QPointF(x_center, l), QtCore.QPointF(x_center, h)) + + # body (ensure visible dojis) + body_h = abs(c - o) + if body_h == 0: + body_h = 1e-9 + body_bottom = min(o, c) + p.setPen(pen) + p.setBrush(brush) + p.drawRect(QtCore.QRectF(x_center - body_w / 2.0, body_bottom, body_w, body_h)) + + p.end() + self._picture = pic + + def paint(self, painter, *args): + if self._picture: + painter.drawPicture(0, 0, self._picture) + + def boundingRect(self): + if self._picture: + return QtCore.QRectF(self._picture.boundingRect()) + return QtCore.QRectF() + + +class OHLCItem(_BaseCandle): + """Green/Red candlesticks.""" + def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8): + super().__init__(data, body_ratio=body_ratio, color_up="#00ff00", color_down="#ff0000") + + +class OBIItem(_BaseCandle): + """Blue-themed candlesticks for OBI.""" + def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8): + super().__init__(data, body_ratio=body_ratio, color_up="#4a9eff", color_down="#1f5f99") + + +# --------------------------- +# Main Window +# --------------------------- class MainWindow(QMainWindow): """Main application window for orderflow visualization.""" - def __init__(self): super().__init__() - self.ohlc_data = [] - self.metrics_data = [] - self.depth_data = {"bids": [], "asks": []} # Cache for depth data - self.last_data_size = 0 # Track OHLC data changes - self.last_metrics_size = 0 # Track metrics data changes - self.last_depth_data = None # Track depth data changes - - self.setup_ui() - - def setup_ui(self): - """Initialize the user interface.""" + self.ohlc_data: List[Dict[str, Any]] = [] + self.metrics_data: List[Any] = [] # adapt shape to your MetricsCalculator + self.depth_data: Dict[str, List[List[float]]] = {"bids": [], "asks": []} + + self._init_ui() + + # ----- UI setup ----- + def _init_ui(self): self.setWindowTitle("Orderflow Backtest Visualizer") - self.setGeometry(100, 100, 1200, 800) - - # Central widget and main horizontal layout - central_widget = QWidget() - self.setCentralWidget(central_widget) - main_layout = QHBoxLayout(central_widget) - - # Left side: Charts layout (OHLC, Volume, OBI, CVD) + self.resize(1400, 900) + + central = QWidget(self) + self.setCentralWidget(central) + main_layout = QHBoxLayout(central) + charts_widget = QWidget() charts_layout = QVBoxLayout(charts_widget) - - # Right side: Depth chart widget + depth_widget = QWidget() depth_layout = QVBoxLayout(depth_widget) - - # Configure PyQtGraph - pg.setConfigOptions(antialias=True, background='k', foreground='w') - - # Create multiple plot widgets for different charts - self.ohlc_plot = pg.PlotWidget(title="OHLC Candlestick Chart") - self.ohlc_plot.setLabel('left', 'Price', units='USD') + + # PG appearance + pg.setConfigOptions(antialias=True, background="k", foreground="w") + + # Date axis for time plots + date_axis = DateAxisItem(orientation="bottom") + + self.ohlc_plot = pg.PlotWidget(title="OHLC", axisItems={"bottom": date_axis}) + self.ohlc_plot.setLabel("left", "Price", units="USD") self.ohlc_plot.showGrid(x=True, y=True, alpha=0.3) - self.ohlc_plot.setMouseEnabled(x=True, y=True) - self.ohlc_plot.enableAutoRange(axis='xy', enable=False) # Disable auto-range for better control - - self.volume_plot = pg.PlotWidget(title="Volume") - self.volume_plot.setLabel('left', 'Volume') + + self.volume_plot = pg.PlotWidget(title="Volume", axisItems={"bottom": DateAxisItem(orientation="bottom")}) + self.volume_plot.setLabel("left", "Volume") self.volume_plot.showGrid(x=True, y=True, alpha=0.3) - self.volume_plot.setMouseEnabled(x=True, y=True) - self.volume_plot.enableAutoRange(axis='xy', enable=False) - - self.obi_plot = pg.PlotWidget(title="Order Book Imbalance (OBI)") - self.obi_plot.setLabel('left', 'OBI') + + self.obi_plot = pg.PlotWidget(title="Order Book Imbalance (OBI)", axisItems={"bottom": DateAxisItem(orientation="bottom")}) + self.obi_plot.setLabel("left", "OBI") self.obi_plot.showGrid(x=True, y=True, alpha=0.3) - self.obi_plot.setMouseEnabled(x=True, y=True) - self.obi_plot.enableAutoRange(axis='xy', enable=False) - - self.cvd_plot = pg.PlotWidget(title="Cumulative Volume Delta (CVD)") - self.cvd_plot.setLabel('left', 'CVD') - self.cvd_plot.setLabel('bottom', 'Time', units='s') + + self.cvd_plot = pg.PlotWidget(title="Cumulative Volume Delta (CVD)", axisItems={"bottom": DateAxisItem(orientation="bottom")}) + self.cvd_plot.setLabel("left", "CVD") self.cvd_plot.showGrid(x=True, y=True, alpha=0.3) - self.cvd_plot.setMouseEnabled(x=True, y=True) - self.cvd_plot.enableAutoRange(axis='xy', enable=False) - - # Create depth chart (right side) + + # Depth (not time x-axis) self.depth_plot = pg.PlotWidget(title="Order Book Depth") - self.depth_plot.setLabel('left', 'Price', units='USD') - self.depth_plot.setLabel('bottom', 'Cumulative Volume') + self.depth_plot.setLabel("left", "Price", units="USD") + self.depth_plot.setLabel("bottom", "Cumulative Volume") self.depth_plot.showGrid(x=True, y=True, alpha=0.3) - self.depth_plot.setMouseEnabled(x=True, y=True) - - # Link x-axes for synchronized zooming/panning (main charts only) + + # Link x-axes self.volume_plot.setXLink(self.ohlc_plot) self.obi_plot.setXLink(self.ohlc_plot) self.cvd_plot.setXLink(self.ohlc_plot) - - # Add crosshairs to time-series charts + + # Crosshairs and interactions self._setup_crosshairs() - - # Add charts to left layout - charts_layout.addWidget(self.ohlc_plot, 3) # Larger space for OHLC + self._setup_double_click_autorange() + + # Layout weights + charts_layout.addWidget(self.ohlc_plot, 3) charts_layout.addWidget(self.volume_plot, 1) - charts_layout.addWidget(self.obi_plot, 1) + charts_layout.addWidget(self.obi_plot, 1) charts_layout.addWidget(self.cvd_plot, 1) - - # Add depth chart to right layout + depth_layout.addWidget(self.depth_plot) - - # Add both sides to main layout (3:1 ratio similar to original Dash) + main_layout.addWidget(charts_widget, 3) main_layout.addWidget(depth_widget, 1) - + logging.info("UI setup completed") - + + def _setup_double_click_autorange(self): + def _auto_range(_event): + try: + self.ohlc_plot.autoRange() + self.volume_plot.autoRange() + self.obi_plot.autoRange() + self.cvd_plot.autoRange() + except Exception as e: + logging.debug(f"Auto-range error: {e}") + + # Attach to ViewBox mouse double-click + self.ohlc_plot.plotItem.vb.mouseDoubleClickEvent = _auto_range + def _setup_crosshairs(self): - """Setup crosshair functionality for time-series charts.""" - # Create crosshair lines with proper pen style - crosshair_pen = pg.mkPen(color='#888888', width=1, style=QtCore.Qt.DashLine) - self.vline = pg.InfiniteLine(angle=90, movable=False, pen=crosshair_pen) - self.hline_ohlc = pg.InfiniteLine(angle=0, movable=False, pen=crosshair_pen) - self.hline_volume = pg.InfiniteLine(angle=0, movable=False, pen=crosshair_pen) - self.hline_obi = pg.InfiniteLine(angle=0, movable=False, pen=crosshair_pen) - self.hline_cvd = pg.InfiniteLine(angle=0, movable=False, pen=crosshair_pen) - - # Add crosshairs to plots - self.ohlc_plot.addItem(self.vline, ignoreBounds=True) + # One vertical line per plot (a single item cannot be in multiple ViewBoxes) + pen = pg.mkPen(color="#888888", width=1, style=Qt.DashLine) + + self.vline_ohlc = pg.InfiniteLine(angle=90, movable=False, pen=pen) + self.vline_volume = pg.InfiniteLine(angle=90, movable=False, pen=pen) + self.vline_obi = pg.InfiniteLine(angle=90, movable=False, pen=pen) + self.vline_cvd = pg.InfiniteLine(angle=90, movable=False, pen=pen) + + self.hline_ohlc = pg.InfiniteLine(angle=0, movable=False, pen=pen) + self.hline_volume = pg.InfiniteLine(angle=0, movable=False, pen=pen) + self.hline_obi = pg.InfiniteLine(angle=0, movable=False, pen=pen) + self.hline_cvd = pg.InfiniteLine(angle=0, movable=False, pen=pen) + + # Attach to plots + self.ohlc_plot.addItem(self.vline_ohlc, ignoreBounds=True) + self.volume_plot.addItem(self.vline_volume, ignoreBounds=True) + self.obi_plot.addItem(self.vline_obi, ignoreBounds=True) + self.cvd_plot.addItem(self.vline_cvd, ignoreBounds=True) + self.ohlc_plot.addItem(self.hline_ohlc, ignoreBounds=True) - self.volume_plot.addItem(self.vline, ignoreBounds=True) self.volume_plot.addItem(self.hline_volume, ignoreBounds=True) - self.obi_plot.addItem(self.vline, ignoreBounds=True) self.obi_plot.addItem(self.hline_obi, ignoreBounds=True) - self.cvd_plot.addItem(self.vline, ignoreBounds=True) self.cvd_plot.addItem(self.hline_cvd, ignoreBounds=True) - - # Connect mouse move events - self.ohlc_plot.scene().sigMouseMoved.connect(self._on_mouse_moved) - self.volume_plot.scene().sigMouseMoved.connect(self._on_mouse_moved) - self.obi_plot.scene().sigMouseMoved.connect(self._on_mouse_moved) - self.cvd_plot.scene().sigMouseMoved.connect(self._on_mouse_moved) - - # Create data inspection label - self.data_label = pg.LabelItem(justify='left') + + # Mouse move + for plot in [self.ohlc_plot, self.volume_plot, self.obi_plot, self.cvd_plot]: + plot.scene().sigMouseMoved.connect(self._on_mouse_moved) + + # Data label + self.data_label = pg.LabelItem(justify="left") self.ohlc_plot.addItem(self.data_label) - - # Add rectangle selection for zoom functionality - self._setup_rectangle_selection() - - logging.debug("Crosshairs setup completed") - - def _setup_rectangle_selection(self): - """Setup rectangle selection for zoom functionality.""" - # Enable rectangle selection on OHLC plot (main chart) - self.ohlc_plot.setMenuEnabled(False) # Disable context menu for cleaner interaction - - # Add double-click to auto-range - self.ohlc_plot.plotItem.vb.mouseDoubleClickEvent = self._on_double_click - - # Rectangle selection is handled by PyQtGraph's built-in ViewBox behavior - # Users can drag to select area and right-click to zoom to selection - logging.debug("Rectangle selection setup completed") - - def _on_double_click(self, event): - """Handle double-click to auto-range all charts.""" - try: - # Auto-range all linked charts - self.ohlc_plot.autoRange() - self.volume_plot.autoRange() - self.obi_plot.autoRange() - self.cvd_plot.autoRange() - logging.debug("Auto-range applied to all charts") - except Exception as e: - logging.debug(f"Error in double-click handler: {e}") - + + # ----- Data ingestion from processor ----- def update_data(self, data_processor: OHLCProcessor): - """Update chart data from direct processor or JSON files.""" - self._get_data_from_processor(data_processor) - - def _load_ohlc_data(self): - """Load OHLC data from JSON file.""" - ohlc_file = Path("ohlc_data.json") - if not ohlc_file.exists(): - return - + """ + Pull latest bars/metrics from the processor and refresh plots. + Call this whenever you've added trade/book data + processor.flush(). + """ + self.ohlc_data = data_processor.bars or [] + # Optional: if your MetricsCalculator exposes a series try: - with open(ohlc_file, 'r') as f: - data = json.load(f) - - # Only update if data has changed - if len(data) != self.last_data_size: - self.ohlc_data = data - self.last_data_size = len(data) - logging.debug(f"Loaded {len(data)} OHLC bars") - - except (json.JSONDecodeError, FileNotFoundError) as e: - logging.warning(f"Failed to load OHLC data: {e}") - - def _load_metrics_data(self): - """Load metrics data (OBI, CVD) from JSON file.""" - metrics_file = Path("metrics_data.json") - if not metrics_file.exists(): - return - - try: - with open(metrics_file, 'r') as f: - data = json.load(f) - - # Only update if data has changed - if len(data) != self.last_metrics_size: - self.metrics_data = data - self.last_metrics_size = len(data) - logging.debug(f"Loaded {len(data)} metrics bars") - - except (json.JSONDecodeError, FileNotFoundError) as e: - logging.warning(f"Failed to load metrics data: {e}") - - def _load_depth_data(self): - """Load depth data (bids, asks) from JSON file.""" - depth_file = Path("depth_data.json") - if not depth_file.exists(): - return - - try: - with open(depth_file, 'r') as f: - data = json.load(f) - - # Only update if data has changed - if data != self.last_depth_data: - self.depth_data = data - self.last_depth_data = data.copy() if isinstance(data, dict) else data - logging.debug(f"Loaded depth data: {len(data.get('bids', []))} bids, {len(data.get('asks', []))} asks") - - except (json.JSONDecodeError, FileNotFoundError) as e: - logging.warning(f"Failed to load depth data: {e}") - + self.metrics_data = data_processor.get_metrics_series() or [] + except Exception: + self.metrics_data = [] + self._update_all_plots() + + # ----- Plot updates ----- def _update_all_plots(self): - """Update all chart plots with current data.""" self._update_ohlc_plot() self._update_volume_plot() self._update_obi_plot() self._update_cvd_plot() self._update_depth_plot() - + + def _clear_plot_items_but_crosshair(self, plot: pg.PlotWidget): + protected = (pg.InfiniteLine, pg.LabelItem) + items = [it for it in plot.items() if not isinstance(it, protected)] + for it in items: + plot.removeItem(it) + def _update_ohlc_plot(self): - """Update the OHLC plot with candlestick chart.""" if not self.ohlc_data: return - - # Clear existing plot items (but preserve crosshairs) - items = [item for item in self.ohlc_plot.items() if not isinstance(item, pg.InfiniteLine)] - for item in items: - self.ohlc_plot.removeItem(item) + self._clear_plot_items_but_crosshair(self.ohlc_plot) + self.ohlc_plot.addItem(OHLCItem(self.ohlc_data, body_ratio=0.8)) + + first_ts = self.ohlc_data[0]["timestamp_start"] / 1000.0 + last_ts = self.ohlc_data[-1].get("timestamp_end", self.ohlc_data[-1]["timestamp_start"]) / 1000.0 + self.ohlc_plot.setXRange(first_ts, last_ts) + + lows = [bar["low"] for bar in self.ohlc_data] + highs = [bar["high"] for bar in self.ohlc_data] + self.ohlc_plot.setYRange(min(lows), max(highs)) - # Create OHLC candlestick item - ohlc_item = OHLCItem(self.ohlc_data) - self.ohlc_plot.addItem(ohlc_item) - - logging.debug(f"Updated OHLC chart with {len(self.ohlc_data)} bars") - def _update_volume_plot(self): - """Update volume bar chart.""" if not self.ohlc_data: return - - # Clear existing plot items (but preserve crosshairs) - items = [item for item in self.volume_plot.items() if not isinstance(item, pg.InfiniteLine)] - for item in items: - self.volume_plot.removeItem(item) - - # Extract volume and price change data - timestamps = [bar[0] / 1000 for bar in self.ohlc_data] - volumes = [bar[5] for bar in self.ohlc_data] - - # Create volume bars with color coding - for i, (ts, vol) in enumerate(zip(timestamps, volumes)): - # Determine color based on price movement + self._clear_plot_items_but_crosshair(self.volume_plot) + + # Build centered volume bars sized to bucket width + for i, bar in enumerate(self.ohlc_data): + ts0 = bar["timestamp_start"] / 1000.0 + ts1 = bar.get("timestamp_end", bar["timestamp_start"]) / 1000.0 + full_w = max(1e-9, ts1 - ts0) + bar_w = full_w * 0.8 + x_center = 0.5 * (ts0 + ts1) + if i > 0: - prev_close = self.ohlc_data[i-1][4] # Previous close - curr_close = self.ohlc_data[i][4] # Current close - color = '#00ff00' if curr_close >= prev_close else '#ff0000' + prev_close = self.ohlc_data[i - 1]["close"] + color = "#00ff00" if bar["close"] >= prev_close else "#ff0000" else: - color = '#888888' # Neutral for first bar - - # Create bar - bar_item = pg.BarGraphItem(x=[ts], height=[vol], width=30, brush=color) - self.volume_plot.addItem(bar_item) - - logging.debug(f"Updated volume chart with {len(volumes)} bars") - + color = "#888888" + + vol = float(bar["volume"]) + self.volume_plot.addItem(pg.BarGraphItem(x=[x_center], height=[vol], width=bar_w, brush=color)) + def _update_obi_plot(self): - """Update OBI candlestick chart.""" - if not self.metrics_data: - return - - # Clear existing plot items (but preserve crosshairs) - items = [item for item in self.obi_plot.items() if not isinstance(item, pg.InfiniteLine)] - for item in items: - self.obi_plot.removeItem(item) - - # Extract OBI data and create candlestick format - obi_candlesticks = [] - for bar in self.metrics_data: - if len(bar) >= 5: # Ensure we have OBI data - timestamp, obi_open, obi_high, obi_low, obi_close = bar[:5] - obi_candlesticks.append([timestamp, obi_open, obi_high, obi_low, obi_close, 0]) - - if obi_candlesticks: - # Create OBI candlestick item with blue styling - obi_item = OBIItem(obi_candlesticks) # Will create this class - self.obi_plot.addItem(obi_item) - - logging.debug(f"Updated OBI chart with {len(obi_candlesticks)} bars") - - def _update_cvd_plot(self): - """Update CVD line chart.""" - if not self.metrics_data: - return - - # Clear existing plot items (but preserve crosshairs) - items = [item for item in self.cvd_plot.items() if not isinstance(item, pg.InfiniteLine)] - for item in items: - self.cvd_plot.removeItem(item) - - # Extract CVD data - timestamps = [] - cvd_values = [] - - for bar in self.metrics_data: - if len(bar) >= 6: # Check for CVD value - timestamps.append(bar[0] / 1000) # Convert to seconds - cvd_values.append(bar[5]) # CVD value - elif len(bar) >= 5: # Fallback for older format - timestamps.append(bar[0] / 1000) - cvd_values.append(0.0) # Default CVD - - if timestamps and cvd_values: - # Plot CVD as line chart - self.cvd_plot.plot(timestamps, cvd_values, pen=pg.mkPen(color='#ffff00', width=2), name='CVD') - - logging.debug(f"Updated CVD chart with {len(cvd_values)} points") - - def _cumulate_levels(self, levels, reverse=False, limit=50): """ - Convert individual price levels to cumulative volumes. - - Args: - levels: List of [price, size] pairs - reverse: If True, sort in descending order (for bids) - limit: Maximum number of levels to include - - Returns: - List of (price, cumulative_volume) tuples + Expecting metrics_data entries shaped like: + [ts_start_ms, ts_end_ms, obi_o, obi_h, obi_l, obi_c, ...] + Adapt if your MetricsCalculator differs. + """ + + def _update_obi_plot(self): + """ + Update OBI panel with candlesticks from metrics_data. + Each row expected: [ts_start_ms, ts_end_ms, obi_o, obi_h, obi_l, obi_c, cvd] + """ + if not self.metrics_data: + return + self._clear_plot_items_but_crosshair(self.obi_plot) + + # Convert metrics_data rows to candle dicts + candlesticks = [] + for row in self.metrics_data: + if len(row) >= 6: + ts0, ts1, o, h, l, c = row[:6] + candlesticks.append({ + "timestamp_start": int(ts0), + "timestamp_end": int(ts1), + "open": float(o), + "high": float(h), + "low": float(l), + "close": float(c), + "volume": 0.0, + }) + + if candlesticks: + self.obi_plot.addItem(OBIItem(candlesticks, body_ratio=0.8)) + + # Also set Y range explicitly + lows = [c["low"] for c in candlesticks] + highs = [c["high"] for c in candlesticks] + self.obi_plot.setYRange(min(lows), max(highs)) + + def _update_cvd_plot(self): + """ + Plot CVD as a line if present at index 6, else skip. + """ + if not self.metrics_data: + return + self._clear_plot_items_but_crosshair(self.cvd_plot) + + xs = [] + ys = [] + for row in self.metrics_data: + if len(row) >= 7: + ts0 = row[0] / 1000.0 + cvd_val = float(row[6]) + xs.append(ts0) + ys.append(cvd_val) + if xs: + self.cvd_plot.plot(xs, ys, pen=pg.mkPen(color="#ffff00", width=2), name="CVD") + + # ----- Depth chart ----- + @staticmethod + def _cumulate_levels(levels, reverse=False, limit=50): + """ + levels: list of [price, size] + returns list of (price, cum_volume) """ if not levels: return [] - try: - # Sort levels by price - sorted_levels = sorted(levels[:limit], key=lambda x: x[0], reverse=reverse) - - # Calculate cumulative volumes - cumulative = [] - total_volume = 0.0 - - for price, size in sorted_levels: - total_volume += size - cumulative.append((price, total_volume)) - - return cumulative - + sorted_lvls = sorted(levels[:limit], key=lambda x: x[0], reverse=reverse) + cum = [] + total = 0.0 + for price, size in sorted_lvls: + total += float(size) + cum.append((float(price), total)) + return cum except Exception as e: - logging.warning(f"Error in cumulate_levels: {e}") + logging.warning(f"cumulate_levels error: {e}") return [] - + def _update_depth_plot(self): - """Update depth chart with cumulative bid/ask visualization.""" if not self.depth_data or not isinstance(self.depth_data, dict): return - - # Clear all items for depth chart (no crosshairs here) self.depth_plot.clear() - - bids = self.depth_data.get('bids', []) - asks = self.depth_data.get('asks', []) - - # Calculate cumulative levels + + bids = self.depth_data.get("bids", []) + asks = self.depth_data.get("asks", []) + cum_bids = self._cumulate_levels(bids, reverse=True, limit=50) cum_asks = self._cumulate_levels(asks, reverse=False, limit=50) - - # Plot bids (green) + if cum_bids: - bid_volumes = [vol for _, vol in cum_bids] - bid_prices = [price for price, _ in cum_bids] - - # Create stepped line plot for bids - self.depth_plot.plot(bid_volumes, bid_prices, - pen=pg.mkPen(color='#00c800', width=2), - stepMode='left', name='Bids') - - # Add fill area - fill_curve = pg.PlotCurveItem(bid_volumes + [0], bid_prices + [bid_prices[-1]], - fillLevel=0, fillBrush=pg.mkBrush(color=(0, 200, 0, 50))) - self.depth_plot.addItem(fill_curve) - - # Plot asks (red) + bid_prices = [p for p, _ in cum_bids] + bid_vols = [v for _, v in cum_bids] + self.depth_plot.plot(bid_vols, bid_prices, pen=pg.mkPen(color="#00c800", width=2), + stepMode="left", name="Bids") + fill_b = pg.PlotCurveItem(bid_vols + [0], bid_prices + [bid_prices[-1]], fillLevel=0, + fillBrush=pg.mkBrush(color=(0, 200, 0, 50))) + self.depth_plot.addItem(fill_b) + if cum_asks: - ask_volumes = [vol for _, vol in cum_asks] - ask_prices = [price for price, _ in cum_asks] - - # Create stepped line plot for asks - self.depth_plot.plot(ask_volumes, ask_prices, - pen=pg.mkPen(color='#c80000', width=2), - stepMode='left', name='Asks') - - # Add fill area - fill_curve = pg.PlotCurveItem(ask_volumes + [0], ask_prices + [ask_prices[-1]], - fillLevel=0, fillBrush=pg.mkBrush(color=(200, 0, 0, 50))) - self.depth_plot.addItem(fill_curve) - - logging.debug(f"Updated depth chart: {len(cum_bids)} bid levels, {len(cum_asks)} ask levels") - + ask_prices = [p for p, _ in cum_asks] + ask_vols = [v for _, v in cum_asks] + self.depth_plot.plot(ask_vols, ask_prices, pen=pg.mkPen(color="#c80000", width=2), + stepMode="left", name="Asks") + fill_a = pg.PlotCurveItem(ask_vols + [0], ask_prices + [ask_prices[-1]], fillLevel=0, + fillBrush=pg.mkBrush(color=(200, 0, 0, 50))) + self.depth_plot.addItem(fill_a) + + # ----- Crosshair + inspection ----- def _on_mouse_moved(self, pos): - """Handle mouse movement for crosshair and data inspection.""" try: - # Determine which plot triggered the event sender = self.sender() if not sender: return - - # Find the plot widget from the scene - plot_widget = None + target = None for plot in [self.ohlc_plot, self.volume_plot, self.obi_plot, self.cvd_plot]: if plot.scene() == sender: - plot_widget = plot + target = plot break - - if not plot_widget: + if target is None: return - - # Convert scene coordinates to plot coordinates - if plot_widget.sceneBoundingRect().contains(pos): - mouse_point = plot_widget.plotItem.vb.mapSceneToView(pos) - x_pos = mouse_point.x() - y_pos = mouse_point.y() - - # Update crosshair positions - self.vline.setPos(x_pos) - self.hline_ohlc.setPos(y_pos if plot_widget == self.ohlc_plot else self.hline_ohlc.pos()[1]) - self.hline_volume.setPos(y_pos if plot_widget == self.volume_plot else self.hline_volume.pos()[1]) - self.hline_obi.setPos(y_pos if plot_widget == self.obi_plot else self.hline_obi.pos()[1]) - self.hline_cvd.setPos(y_pos if plot_widget == self.cvd_plot else self.hline_cvd.pos()[1]) - - # Update data inspection - self._update_data_inspection(x_pos, plot_widget) - + + if target.sceneBoundingRect().contains(pos): + mp = target.plotItem.vb.mapSceneToView(pos) + x = mp.x() + y = mp.y() + + # Move all vertical lines to the same x (keep panels aligned) + self.vline_ohlc.setPos(x) + self.vline_volume.setPos(x) + self.vline_obi.setPos(x) + self.vline_cvd.setPos(x) + + # Move only the active panel's horizontal line + if target == self.ohlc_plot: + self.hline_ohlc.setPos(y) + elif target == self.volume_plot: + self.hline_volume.setPos(y) + elif target == self.obi_plot: + self.hline_obi.setPos(y) + elif target == self.cvd_plot: + self.hline_cvd.setPos(y) + + self._update_data_label(x) except Exception as e: - logging.debug(f"Error in mouse move handler: {e}") - - def _update_data_inspection(self, x_pos, plot_widget): - """Update data inspection label with values at cursor position.""" + logging.debug(f"mouse move err: {e}") + + def _update_data_label(self, x_seconds: float): try: - info_parts = [] - - # Find closest data point for OHLC + parts = [] + if self.ohlc_data: - closest_ohlc = self._find_closest_data_point(x_pos, self.ohlc_data) - if closest_ohlc: - ts, open_p, high, low, close, volume = closest_ohlc - time_str = self._format_timestamp(ts) - info_parts.append(f"Time: {time_str}") - info_parts.append(f"OHLC: O:{open_p:.2f} H:{high:.2f} L:{low:.2f} C:{close:.2f}") - info_parts.append(f"Volume: {volume:.4f}") - - # Find closest data point for Metrics (OBI/CVD) + closest = self._closest_bar(self.ohlc_data, x_seconds) + if closest: + parts.append(f"Time: {self._fmt_ts(closest['timestamp_start'])}") + parts.append(f"OHLC O:{closest['open']:.2f} H:{closest['high']:.2f} " + f"L:{closest['low']:.2f} C:{closest['close']:.2f}") + parts.append(f"Vol: {float(closest['volume']):.6f}") + if self.metrics_data: - closest_metrics = self._find_closest_data_point(x_pos, self.metrics_data) - if closest_metrics and len(closest_metrics) >= 5: - ts, obi_o, obi_h, obi_l, obi_c = closest_metrics[:5] - cvd_val = closest_metrics[5] if len(closest_metrics) > 5 else 0.0 - info_parts.append(f"OBI: O:{obi_o:.2f} H:{obi_h:.2f} L:{obi_l:.2f} C:{obi_c:.2f}") - info_parts.append(f"CVD: {cvd_val:.2f}") - - # Update label - if info_parts: - self.data_label.setText("
".join(info_parts)) - else: - self.data_label.setText("No data") - + # Optional: display OBI/CVD values if present + row = self._closest_metrics_row(self.metrics_data, x_seconds) + if row and len(row) >= 6: + _, _, o, h, l, c = row[:6] + parts.append(f"OBI O:{o:.2f} H:{h:.2f} L:{l:.2f} C:{c:.2f}") + if row and len(row) >= 7: + parts.append(f"CVD: {float(row[6]):.2f}") + + self.data_label.setText("
".join(parts) if parts else "No data") except Exception as e: - logging.debug(f"Error updating data inspection: {e}") - - def _find_closest_data_point(self, x_pos, data): - """Find the closest data point to the given x position.""" - if not data: + logging.debug(f"update label err: {e}") + + @staticmethod + def _closest_bar(bars: List[Dict[str, Any]], x_seconds: float) -> Optional[Dict[str, Any]]: + if not bars: return None - - # Convert x_pos (seconds) back to milliseconds for comparison - x_ms = x_pos * 1000 - - # Find closest timestamp - closest_idx = 0 - min_diff = abs(data[0][0] - x_ms) - - for i, bar in enumerate(data): - diff = abs(bar[0] - x_ms) - if diff < min_diff: - min_diff = diff - closest_idx = i - - return data[closest_idx] - - def _format_timestamp(self, timestamp_ms): - """Format timestamp for display.""" + x_ms = x_seconds * 1000.0 + closest = min(bars, key=lambda b: abs(b["timestamp_start"] - x_ms)) + return closest + + @staticmethod + def _closest_metrics_row(rows: List[Any], x_seconds: float) -> Optional[Any]: + if not rows: + return None + x_ms = x_seconds * 1000.0 + try: + return min(rows, key=lambda r: abs(r[0] - x_ms)) + except Exception: + return None + + @staticmethod + def _fmt_ts(ts_ms: int) -> str: from datetime import datetime try: - dt = datetime.fromtimestamp(timestamp_ms / 1000) - return dt.strftime("%H:%M:%S") - except: - return str(int(timestamp_ms)) - - def setup_data_processor(self, processor): - """Setup direct data integration with OHLCProcessor.""" - self.data_processor = processor - # For now, use JSON mode as the direct mode implementation is incomplete - self.direct_mode = False - - # Setup callbacks for real-time data updates - self._setup_processor_callbacks() - - logging.info("Data processor reference set, using JSON file mode for visualization") - - def _setup_processor_callbacks(self): - """Setup callbacks to receive data directly from processor.""" - if not self.data_processor: - return - - # Replace JSON polling with direct data access - # Note: This is a simplified approach - in production, you'd want proper callbacks - # from the processor when new data is available - - logging.debug("Processor callbacks setup completed") - - def _get_data_from_processor(self, data_processor: OHLCProcessor): - """Get data directly from processor instead of JSON files.""" - try: - self.ohlc_data = data_processor.get_ohlc_data() - self.metrics_data = data_processor.get_metrics_data() - self.depth_data = data_processor.get_current_depth() - - # Get OHLC data from processor (placeholder - needs actual processor API) - # processor_ohlc = self.data_processor.get_ohlc_data() - # if processor_ohlc: - # self.ohlc_data = processor_ohlc - - # Get metrics data from processor - # processor_metrics = self.data_processor.get_metrics_data() - # if processor_metrics: - # self.metrics_data = processor_metrics - - # Get depth data from processor - # processor_depth = self.data_processor.get_current_depth() - # if processor_depth: - # self.depth_data = processor_depth - - logging.debug("Retrieved data directly from processor") - - except Exception as e: - logging.warning(f"Error getting data from processor: {e}") - # Fallback to JSON mode - self.direct_mode = False - -def main(): - """Application entry point.""" - logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") - - app = QApplication(sys.argv) - - window = MainWindow() - window.show() - - logging.info("Desktop application started") - - sys.exit(app.exec()) - -if __name__ == "__main__": - main() + return datetime.fromtimestamp(ts_ms / 1000.0).strftime("%Y-%m-%d %H:%M:%S") + except Exception: + return str(int(ts_ms)) \ No newline at end of file diff --git a/level_parser.py b/level_parser.py index fb1fbf5..41111db 100644 --- a/level_parser.py +++ b/level_parser.py @@ -1,85 +1,87 @@ -"""Level parsing utilities for orderbook data.""" +"""Ultra-fast level parsing for strings like: +"[['110173.4', '0.0000454', '0', '4'], ['110177.1', '0', '0', '0'], ...]" +""" -import json -import ast -import logging -from typing import List, Any, Tuple +from typing import List, Tuple, Any def normalize_levels(levels: Any) -> List[List[float]]: """ - Convert string-encoded levels into [[price, size], ...] floats. - - Filters out zero/negative sizes. Supports JSON and Python literal formats. + Return [[price, size], ...] with size > 0 only (floats). + Assumes 'levels' is a single-quoted list-of-lists string as above. """ - if not levels or levels == '[]': - return [] - - parsed = _parse_string_to_list(levels) - if not parsed: - return [] - - pairs: List[List[float]] = [] - for item in parsed: - price, size = _extract_price_size(item) - if price is None or size is None: - continue - try: - p, s = float(price), float(size) - if s > 0: - pairs.append([p, s]) - except Exception: - continue - - if not pairs: - logging.debug("normalize_levels: no valid pairs parsed from input") - return pairs + pairs = _fast_pairs(levels) + # filter strictly positive sizes + return [[p, s] for (p, s) in pairs if s > 0.0] def parse_levels_including_zeros(levels: Any) -> List[Tuple[float, float]]: """ - Parse levels into (price, size) tuples including zero sizes for deletions. - - Similar to normalize_levels but preserves zero sizes (for orderbook deletions). + Return [(price, size), ...] (floats), preserving zeros for deletions. + Assumes 'levels' is a single-quoted list-of-lists string as above. """ - if not levels or levels == '[]': + return _fast_pairs(levels) + + +# ----------------- internal: fast path ----------------- + +def _fast_pairs(levels: Any) -> List[Tuple[float, float]]: + """ + Extremely fast parser for inputs like: + "[['110173.4','0.0000454','0','4'],['110177.1','0','0','0'], ...]" + Keeps only the first two fields from each row and converts to float. + """ + if not levels: return [] - parsed = _parse_string_to_list(levels) - if not parsed: + # If already a list (rare in your pipeline), fall back to simple handling + if isinstance(levels, (list, tuple)): + out: List[Tuple[float, float]] = [] + for item in levels: + if isinstance(item, (list, tuple)) and len(item) >= 2: + try: + p = float(item[0]); s = float(item[1]) + out.append((p, s)) + except Exception: + continue + return out + + # Expect a string: strip outer brackets and single quotes fast + s = str(levels).strip() + if len(s) < 5: # too short to contain "[[...]]" return [] - results: List[Tuple[float, float]] = [] - for item in parsed: - price, size = _extract_price_size(item) - if price is None or size is None: + # Remove the outermost [ and ] quickly (tolerant) + if s[0] == '[': + s = s[1:] + if s and s[-1] == ']': + s = s[:-1] + + # Remove *all* single quotes (input uses single quotes, not JSON) + s = s.replace("'", "") + + # Now s looks like: [[110173.4, 0.0000454, 0, 4], [110177.1, 0, 0, 0], ...] + # Split into rows on "],", then strip brackets/spaces per row + rows = s.split("],") + out: List[Tuple[float, float]] = [] + + for row in rows: + row = row.strip() + # strip any leading/trailing brackets/spaces + if row.startswith('['): + row = row[1:] + if row.endswith(']'): + row = row[:-1] + + # fast split by commas and take first two fields + cols = row.split(',') + if len(cols) < 2: continue try: - p, s = float(price), float(size) - if s >= 0: - results.append((p, s)) + p = float(cols[0].strip()) + s_ = float(cols[1].strip()) + out.append((p, s_)) except Exception: continue - return results - - -def _parse_string_to_list(levels: Any) -> List[Any]: - """Parse string levels to list, trying JSON first then literal_eval.""" - try: - parsed = json.loads(levels) - except Exception: - try: - parsed = ast.literal_eval(levels) - except Exception: - return [] - return parsed if isinstance(parsed, list) else [] - - -def _extract_price_size(item: Any) -> Tuple[Any, Any]: - """Extract price and size from dict or list/tuple format.""" - if isinstance(item, dict): - return item.get("price", item.get("p")), item.get("size", item.get("s")) - elif isinstance(item, (list, tuple)) and len(item) >= 2: - return item[0], item[1] - return None, None + return out diff --git a/main.py b/main.py index 56d194f..c1f93cc 100644 --- a/main.py +++ b/main.py @@ -10,15 +10,12 @@ from ohlc_processor import OHLCProcessor from desktop_app import MainWindow import sys from PySide6.QtWidgets import QApplication +from PySide6.QtCore import Signal, QTimer def main(instrument: str = typer.Argument(..., help="Instrument to backtest, e.g. BTC-USDT"), start_date: str = typer.Argument(..., help="Start date, e.g. 2025-07-01"), - end_date: str = typer.Argument(..., help="End date, e.g. 2025-08-01"), - window_seconds: int = typer.Option(60, help="OHLC window size in seconds")): - """ - Process orderbook data and visualize OHLC charts in real-time. - """ + end_date: str = typer.Argument(..., help="End date, e.g. 2025-08-01")): logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") start_date = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc) @@ -39,10 +36,18 @@ def main(instrument: str = typer.Argument(..., help="Instrument to backtest, e.g logging.info(f"Found {len(db_paths)} database files: {[p.name for p in db_paths]}") - processor = OHLCProcessor(window_seconds=window_seconds) + processor = OHLCProcessor(aggregate_window_seconds=60 * 60) + + app = QApplication(sys.argv) + desktop_app = MainWindow() + desktop_app.show() + + timer = QTimer() + timer.timeout.connect(lambda: desktop_app.update_data(processor)) + timer.start(1000) + def process_data(): - """Process database data in a separate thread.""" try: for db_path in db_paths: db_name_parts = db_path.name.split(".")[0].split("-") @@ -64,30 +69,19 @@ def main(instrument: str = typer.Argument(..., help="Instrument to backtest, e.g for orderbook_update, trades in db_interpreter.stream(): batch_count += 1 - processor.process_trades(trades) processor.update_orderbook(orderbook_update) + processor.process_trades(trades) + # desktop_app.update_data(processor) + - processor.finalize() logging.info("Data processing completed") except Exception as e: logging.error(f"Error in data processing: {e}") - try: - app = QApplication(sys.argv) - desktop_app = MainWindow() - - desktop_app.setup_data_processor(processor) - desktop_app.show() - - logging.info("Desktop visualizer started") - - data_thread = threading.Thread(target=process_data, daemon=True) - data_thread.start() - - app.exec() - - except Exception as e: - logging.error(f"Failed to start desktop visualizer: {e}") + data_thread = threading.Thread(target=process_data, daemon=True) + data_thread.start() + + app.exec() if __name__ == "__main__": diff --git a/metrics_calculator.py b/metrics_calculator.py index b1d6507..c61957f 100644 --- a/metrics_calculator.py +++ b/metrics_calculator.py @@ -1,5 +1,5 @@ import logging -from typing import Optional, Tuple +from typing import Optional, List class MetricsCalculator: @@ -7,6 +7,20 @@ class MetricsCalculator: self.cvd_cumulative = 0.0 self.obi_value = 0.0 + # --- per-bucket state --- + self._b_ts_start: Optional[int] = None + self._b_ts_end: Optional[int] = None + self._obi_o: Optional[float] = None + self._obi_h: Optional[float] = None + self._obi_l: Optional[float] = None + self._obi_c: Optional[float] = None + + # final series rows: [ts_start, ts_end, obi_o, obi_h, obi_l, obi_c, cvd] + self._series: List[List[float]] = [] + + # ------------------------------ + # CVD + # ------------------------------ def update_cvd_from_trade(self, side: str, size: float) -> None: if side == "buy": volume_delta = float(size) @@ -14,8 +28,56 @@ class MetricsCalculator: volume_delta = -float(size) else: logging.warning(f"Unknown trade side '{side}', treating as neutral") - + volume_delta = 0.0 self.cvd_cumulative += volume_delta + # ------------------------------ + # OBI + # ------------------------------ def update_obi_from_book(self, total_bids: float, total_asks: float) -> None: self.obi_value = float(total_bids - total_asks) + # update H/L/C if a bucket is open + if self._b_ts_start is not None: + v = self.obi_value + if self._obi_o is None: + self._obi_o = self._obi_h = self._obi_l = self._obi_c = v + else: + self._obi_h = max(self._obi_h, v) + self._obi_l = min(self._obi_l, v) + self._obi_c = v + + # ------------------------------ + # Bucket lifecycle + # ------------------------------ + def begin_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None: + self._b_ts_start = int(ts_start_ms) + self._b_ts_end = int(ts_end_ms) + v = float(self.obi_value) + self._obi_o = self._obi_h = self._obi_l = self._obi_c = v + + def finalize_bucket(self) -> None: + if self._b_ts_start is None or self._b_ts_end is None: + return + o = float(self._obi_o if self._obi_o is not None else self.obi_value) + h = float(self._obi_h if self._obi_h is not None else self.obi_value) + l = float(self._obi_l if self._obi_l is not None else self.obi_value) + c = float(self._obi_c if self._obi_c is not None else self.obi_value) + self._series.append([ + self._b_ts_start, self._b_ts_end, o, h, l, c, float(self.cvd_cumulative) + ]) + # reset + self._b_ts_start = self._b_ts_end = None + self._obi_o = self._obi_h = self._obi_l = self._obi_c = None + + def add_flat_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None: + v = float(self.obi_value) + self._series.append([ + int(ts_start_ms), int(ts_end_ms), + v, v, v, v, float(self.cvd_cumulative) + ]) + + # ------------------------------ + # Output + # ------------------------------ + def get_series(self): + return self._series diff --git a/ohlc_processor.py b/ohlc_processor.py index 4950e69..ca1f5f4 100644 --- a/ohlc_processor.py +++ b/ohlc_processor.py @@ -1,70 +1,191 @@ import logging -from typing import List, Any, Dict, Tuple +from typing import List, Any, Dict, Tuple, Optional -from viz_io import add_ohlc_bar, upsert_ohlc_bar, _atomic_write_json, DEPTH_FILE from db_interpreter import OrderbookUpdate -from level_parser import normalize_levels, parse_levels_including_zeros +from level_parser import parse_levels_including_zeros from orderbook_manager import OrderbookManager from metrics_calculator import MetricsCalculator class OHLCProcessor: """ - Processes trade data and orderbook updates into OHLC bars and depth snapshots. - - This class aggregates individual trades into time-windowed OHLC (Open, High, Low, Close) - bars and maintains an in-memory orderbook state for depth visualization. It also - calculates Order Book Imbalance (OBI) and Cumulative Volume Delta (CVD) metrics. - - The processor uses throttled updates to balance visualization responsiveness with - I/O efficiency, emitting intermediate updates during active windows. - - Attributes: - window_seconds: Time window duration for OHLC aggregation - depth_levels_per_side: Number of top price levels to maintain per side - trades_processed: Total number of trades processed - bars_created: Total number of OHLC bars created - cvd_cumulative: Running cumulative volume delta (via metrics calculator) + Time-bucketed OHLC aggregator with gap-bar filling and metric hooks. + + - Bars are aligned to fixed buckets of length `aggregate_window_seconds`. + - If there is a gap (no trades for one or more buckets), synthetic zero-volume + candles are emitted with O=H=L=C=last_close AND a flat metrics bucket is added. + - By default, the next bar's OPEN is the previous bar's CLOSE (configurable via + `carry_forward_open`). """ - def __init__(self) -> None: - self.current_bar = None + def __init__(self, aggregate_window_seconds: int, carry_forward_open: bool = True) -> None: + self.aggregate_window_seconds = int(aggregate_window_seconds) + self._bucket_ms = self.aggregate_window_seconds * 1000 + + self.carry_forward_open = carry_forward_open + + self.current_bar: Optional[Dict[str, Any]] = None + self._current_bucket_index: Optional[int] = None + self._last_close: Optional[float] = None + self.trades_processed = 0 + self.bars: List[Dict[str, Any]] = [] self._orderbook = OrderbookManager() self._metrics = MetricsCalculator() - @property - def cvd_cumulative(self) -> float: - """Access cumulative CVD from metrics calculator.""" - return self._metrics.cvd_cumulative + # ----------------------- + # Internal helpers + # ----------------------- + def _new_bar(self, bucket_start_ms: int, open_price: float) -> Dict[str, Any]: + return { + "timestamp_start": bucket_start_ms, + "timestamp_end": bucket_start_ms + self._bucket_ms, + "open": float(open_price), + "high": float(open_price), + "low": float(open_price), + "close": float(open_price), + "volume": 0.0, + } + def _emit_gap_bars(self, from_index: int, to_index: int) -> None: + """ + Emit empty buckets strictly between from_index and to_index. + Each synthetic bar has zero volume and O=H=L=C=last_close. + Also emit a flat metrics bucket for each gap. + """ + if self._last_close is None: + return + + for bi in range(from_index + 1, to_index): + start_ms = bi * self._bucket_ms + gap_bar = self._new_bar(start_ms, self._last_close) + self.bars.append(gap_bar) + + # metrics: add a flat bucket to keep OBI/CVD time-continuous + try: + self._metrics.add_flat_bucket(start_ms, start_ms + self._bucket_ms) + except Exception as e: + logging.debug(f"metrics add_flat_bucket error (ignored): {e}") + + # ----------------------- + # Public API + # ----------------------- def process_trades(self, trades: List[Tuple[Any, ...]]) -> None: + """ + trades: iterables like (trade_id, trade_id_str, price, size, side, timestamp_ms, ...) + timestamp_ms expected in milliseconds. + """ + if not trades: + return + + # Ensure time-ascending order; if upstream guarantees it, you can skip. + trades = sorted(trades, key=lambda t: int(t[5])) + for trade in trades: trade_id, trade_id_str, price, size, side, timestamp_ms = trade[:6] + price = float(price) + size = float(size) timestamp_ms = int(timestamp_ms) + self.trades_processed += 1 - self._metrics.update_cvd_from_trade(side, size) + # Metrics driven by trades: update CVD + try: + self._metrics.update_cvd_from_trade(side, size) + except Exception as e: + logging.debug(f"CVD update error (ignored): {e}") - if not self.current_bar: - self.current_bar = { - 'open': float(price), - 'high': float(price), - 'low': float(price), - 'close': float(price) - } - self.current_bar['high'] = max(self.current_bar['high'], float(price)) - self.current_bar['low'] = min(self.current_bar['low'], float(price)) - self.current_bar['close'] = float(price) - self.current_bar['volume'] += float(size) + # Determine this trade's bucket + bucket_index = timestamp_ms // self._bucket_ms + bucket_start = bucket_index * self._bucket_ms + # New bucket? + if self._current_bucket_index is None or bucket_index != self._current_bucket_index: + # finalize prior bar + if self.current_bar is not None: + self.bars.append(self.current_bar) + self._last_close = self.current_bar["close"] + # finalize metrics for the prior bucket window + try: + self._metrics.finalize_bucket() + except Exception as e: + logging.debug(f"metrics finalize_bucket error (ignored): {e}") + + # handle gaps + if self._current_bucket_index is not None and bucket_index > self._current_bucket_index + 1: + self._emit_gap_bars(self._current_bucket_index, bucket_index) + + # pick open price policy + if self.carry_forward_open and self._last_close is not None: + open_for_new = self._last_close + else: + open_for_new = price + + self.current_bar = self._new_bar(bucket_start, open_for_new) + self._current_bucket_index = bucket_index + + # begin a new metrics bucket aligned to this bar window + try: + self._metrics.begin_bucket(bucket_start, bucket_start + self._bucket_ms) + except Exception as e: + logging.debug(f"metrics begin_bucket error (ignored): {e}") + + # Update current bucket with this trade + b = self.current_bar + if b is None: + # Should not happen, but guard anyway + b = self._new_bar(bucket_start, price) + self.current_bar = b + self._current_bucket_index = bucket_index + try: + self._metrics.begin_bucket(bucket_start, bucket_start + self._bucket_ms) + except Exception as e: + logging.debug(f"metrics begin_bucket (guard) error (ignored): {e}") + + b["high"] = max(b["high"], price) + b["low"] = min(b["low"], price) + b["close"] = price + b["volume"] += size + # keep timestamp_end snapped to bucket boundary + b["timestamp_end"] = bucket_start + self._bucket_ms + + def flush(self) -> None: + """Emit the in-progress bar (if any). Call at the end of a run/backtest.""" + if self.current_bar is not None: + self.bars.append(self.current_bar) + self._last_close = self.current_bar["close"] + self.current_bar = None + # finalize any open metrics bucket + try: + self._metrics.finalize_bucket() + except Exception as e: + logging.debug(f"metrics finalize_bucket on flush error (ignored): {e}") def update_orderbook(self, ob_update: OrderbookUpdate) -> None: + """ + Apply orderbook deltas and refresh OBI metrics. + Call this frequently (on each OB update) so intra-bucket OBI highs/lows track the book. + """ bids_updates = parse_levels_including_zeros(ob_update.bids) asks_updates = parse_levels_including_zeros(ob_update.asks) self._orderbook.apply_updates(bids_updates, asks_updates) total_bids, total_asks = self._orderbook.get_total_volume() - self._metrics.update_obi_from_book(total_bids, total_asks) - + try: + self._metrics.update_obi_from_book(total_bids, total_asks) + except Exception as e: + logging.debug(f"OBI update error (ignored): {e}") + + # ----------------------- + # UI-facing helpers + # ----------------------- + def get_metrics_series(self): + """ + Returns a list of rows: + [timestamp_start_ms, timestamp_end_ms, obi_open, obi_high, obi_low, obi_close, cvd_value] + """ + try: + return self._metrics.get_series() + except Exception: + return []