diff --git a/desktop_app.py b/desktop_app.py index 81fbf85..3a2977c 100644 --- a/desktop_app.py +++ b/desktop_app.py @@ -1,698 +1,494 @@ """ Desktop visualization application using PySide6 and PyQtGraph. -This module provides a native desktop replacement for the Dash web application, -offering better performance, debugging capabilities, and real-time potential. +- OHLC candles (time-bucketed, gap-filled, width from timestamp span) +- Volume bars (width from timestamp span) +- OBI candles (same visuals as OHLC but blue) +- CVD line +- Depth chart (cumulative bids/asks) +- Crosshairs + data inspection """ import sys -import json import logging -from pathlib import Path -from typing import List, Optional, Dict, Any -from PySide6.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QHBoxLayout -from PySide6.QtCore import QTimer +from typing import List, Dict, Any, Optional + +import numpy as np import pyqtgraph as pg from pyqtgraph import QtCore, QtGui -import numpy as np +from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem + +from PySide6.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QHBoxLayout +from PySide6.QtCore import Qt + from ohlc_processor import OHLCProcessor -class OHLCItem(pg.GraphicsObject): - """Custom OHLC candlestick item for PyQtGraph.""" - - def __init__(self, data): - """ - Initialize OHLC item with data. - - Args: - data: List of tuples (timestamp, open, high, low, close, volume) - """ - pg.GraphicsObject.__init__(self) - self.data = data - self.generatePicture() - - def generatePicture(self): - """Generate the candlestick chart picture.""" - self.picture = QtGui.QPicture() - painter = QtGui.QPainter(self.picture) - - pen_up = pg.mkPen(color='#00ff00', width=1) # Green for up candles - pen_down = pg.mkPen(color='#ff0000', width=1) # Red for down candles - brush_up = pg.mkBrush(color='#00ff00') - brush_down = pg.mkBrush(color='#ff0000') - - # Dynamic candle width based on data density - if len(self.data) > 1: - time_diff = (self.data[1][0] - self.data[0][0]) / 1000 # Time between candles in seconds - width = time_diff * 0.8 # 80% of time interval - else: - width = 30 # Default width in seconds - - for timestamp, open_price, high, low, close, volume in self.data: - x = timestamp / 1000 # Convert ms to seconds - - # Determine candle color - is_up = close >= open_price - pen = pen_up if is_up else pen_down - brush = brush_up if is_up else brush_down - - # Draw wick (high-low line) - painter.setPen(pen) - painter.drawLine(QtCore.QPointF(x, low), QtCore.QPointF(x, high)) - - # Draw body (open-close rectangle) - body_height = abs(close - open_price) - body_bottom = min(open_price, close) - - painter.setPen(pen) - painter.setBrush(brush) - painter.drawRect(QtCore.QRectF(x - width/2, body_bottom, width, body_height)) - - painter.end() - - def paint(self, painter, option, widget): - """Paint the candlestick chart.""" - painter.drawPicture(0, 0, self.picture) - - def boundingRect(self): - """Return the bounding rectangle of the item.""" - return QtCore.QRectF(self.picture.boundingRect()) -class OBIItem(pg.GraphicsObject): - """Custom OBI candlestick item with blue styling.""" - - def __init__(self, data): - """Initialize OBI item with blue color scheme.""" - pg.GraphicsObject.__init__(self) +# --------------------------- +# Candlestick Graphics Items +# --------------------------- +class _BaseCandle(pg.GraphicsObject): + """ + Base class for time-based candles: + - x-axis is epoch seconds (float) + - candle width derived from (timestamp_end - timestamp_start) + """ + def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8, + color_up="#00ff00", color_down="#ff0000", pen_width: int = 1): + super().__init__() self.data = data + self.body_ratio = max(0.0, min(1.0, body_ratio)) + self.pen_up = pg.mkPen(color=color_up, width=pen_width) + self.pen_down = pg.mkPen(color=color_down, width=pen_width) + self.brush_up = pg.mkBrush(color=color_up) + self.brush_down = pg.mkBrush(color=color_down) + self._picture = None self.generatePicture() - - def generatePicture(self): - """Generate OBI candlestick chart with blue styling.""" - self.picture = QtGui.QPicture() - painter = QtGui.QPainter(self.picture) - - # Blue color scheme for OBI - pen_up = pg.mkPen(color='#4a9eff', width=1) # Light blue for up - pen_down = pg.mkPen(color='#1f5f99', width=1) # Dark blue for down - brush_up = pg.mkBrush(color='#4a9eff') - brush_down = pg.mkBrush(color='#1f5f99') - - # Dynamic width calculation - if len(self.data) > 1: - time_diff = (self.data[1][0] - self.data[0][0]) / 1000 - width = time_diff * 0.8 - else: - width = 30 - - for timestamp, open_price, high, low, close, _ in self.data: - x = timestamp / 1000 - - # Determine color - is_up = close >= open_price - pen = pen_up if is_up else pen_down - brush = brush_up if is_up else brush_down - - # Draw wick - painter.setPen(pen) - painter.drawLine(QtCore.QPointF(x, low), QtCore.QPointF(x, high)) - - # Draw body - body_height = abs(close - open_price) - body_bottom = min(open_price, close) - - painter.setPen(pen) - painter.setBrush(brush) - painter.drawRect(QtCore.QRectF(x - width/2, body_bottom, width, body_height)) - - painter.end() - - def paint(self, painter, option, widget): - """Paint the OBI candlestick chart.""" - painter.drawPicture(0, 0, self.picture) - - def boundingRect(self): - """Return the bounding rectangle.""" - return QtCore.QRectF(self.picture.boundingRect()) + def generatePicture(self): + pic = QtGui.QPicture() + p = QtGui.QPainter(pic) + + for entry in self.data: + ts0_ms = entry.get("timestamp_start") + ts1_ms = entry.get("timestamp_end", ts0_ms) + if ts0_ms is None: + continue + + x0 = ts0_ms / 1000.0 + x1 = ts1_ms / 1000.0 + x_center = 0.5 * (x0 + x1) + full_w = max(1e-9, x1 - x0) # seconds + body_w = full_w * self.body_ratio + + o = float(entry["open"]) + h = float(entry["high"]) + l = float(entry["low"]) + c = float(entry["close"]) + + is_up = c >= o + pen = self.pen_up if is_up else self.pen_down + brush = self.brush_up if is_up else self.brush_down + + # wick + p.setPen(pen) + p.drawLine(QtCore.QPointF(x_center, l), QtCore.QPointF(x_center, h)) + + # body (ensure visible dojis) + body_h = abs(c - o) + if body_h == 0: + body_h = 1e-9 + body_bottom = min(o, c) + p.setPen(pen) + p.setBrush(brush) + p.drawRect(QtCore.QRectF(x_center - body_w / 2.0, body_bottom, body_w, body_h)) + + p.end() + self._picture = pic + + def paint(self, painter, *args): + if self._picture: + painter.drawPicture(0, 0, self._picture) + + def boundingRect(self): + if self._picture: + return QtCore.QRectF(self._picture.boundingRect()) + return QtCore.QRectF() + + +class OHLCItem(_BaseCandle): + """Green/Red candlesticks.""" + def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8): + super().__init__(data, body_ratio=body_ratio, color_up="#00ff00", color_down="#ff0000") + + +class OBIItem(_BaseCandle): + """Blue-themed candlesticks for OBI.""" + def __init__(self, data: List[Dict[str, Any]], body_ratio: float = 0.8): + super().__init__(data, body_ratio=body_ratio, color_up="#4a9eff", color_down="#1f5f99") + + +# --------------------------- +# Main Window +# --------------------------- class MainWindow(QMainWindow): """Main application window for orderflow visualization.""" - def __init__(self): super().__init__() - self.ohlc_data = [] - self.metrics_data = [] - self.depth_data = {"bids": [], "asks": []} # Cache for depth data - self.last_data_size = 0 # Track OHLC data changes - self.last_metrics_size = 0 # Track metrics data changes - self.last_depth_data = None # Track depth data changes - - self.setup_ui() - - def setup_ui(self): - """Initialize the user interface.""" + self.ohlc_data: List[Dict[str, Any]] = [] + self.metrics_data: List[Any] = [] # adapt shape to your MetricsCalculator + self.depth_data: Dict[str, List[List[float]]] = {"bids": [], "asks": []} + + self._init_ui() + + # ----- UI setup ----- + def _init_ui(self): self.setWindowTitle("Orderflow Backtest Visualizer") - self.setGeometry(100, 100, 1200, 800) - - # Central widget and main horizontal layout - central_widget = QWidget() - self.setCentralWidget(central_widget) - main_layout = QHBoxLayout(central_widget) - - # Left side: Charts layout (OHLC, Volume, OBI, CVD) + self.resize(1400, 900) + + central = QWidget(self) + self.setCentralWidget(central) + main_layout = QHBoxLayout(central) + charts_widget = QWidget() charts_layout = QVBoxLayout(charts_widget) - - # Right side: Depth chart widget + depth_widget = QWidget() depth_layout = QVBoxLayout(depth_widget) - - # Configure PyQtGraph - pg.setConfigOptions(antialias=True, background='k', foreground='w') - - # Create multiple plot widgets for different charts - self.ohlc_plot = pg.PlotWidget(title="OHLC Candlestick Chart") - self.ohlc_plot.setLabel('left', 'Price', units='USD') + + # PG appearance + pg.setConfigOptions(antialias=True, background="k", foreground="w") + + # Date axis for time plots + date_axis = DateAxisItem(orientation="bottom") + + self.ohlc_plot = pg.PlotWidget(title="OHLC", axisItems={"bottom": date_axis}) + self.ohlc_plot.setLabel("left", "Price", units="USD") self.ohlc_plot.showGrid(x=True, y=True, alpha=0.3) - self.ohlc_plot.setMouseEnabled(x=True, y=True) - self.ohlc_plot.enableAutoRange(axis='xy', enable=False) # Disable auto-range for better control - - self.volume_plot = pg.PlotWidget(title="Volume") - self.volume_plot.setLabel('left', 'Volume') + + self.volume_plot = pg.PlotWidget(title="Volume", axisItems={"bottom": DateAxisItem(orientation="bottom")}) + self.volume_plot.setLabel("left", "Volume") self.volume_plot.showGrid(x=True, y=True, alpha=0.3) - self.volume_plot.setMouseEnabled(x=True, y=True) - self.volume_plot.enableAutoRange(axis='xy', enable=False) - - self.obi_plot = pg.PlotWidget(title="Order Book Imbalance (OBI)") - self.obi_plot.setLabel('left', 'OBI') + + self.obi_plot = pg.PlotWidget(title="Order Book Imbalance (OBI)", axisItems={"bottom": DateAxisItem(orientation="bottom")}) + self.obi_plot.setLabel("left", "OBI") self.obi_plot.showGrid(x=True, y=True, alpha=0.3) - self.obi_plot.setMouseEnabled(x=True, y=True) - self.obi_plot.enableAutoRange(axis='xy', enable=False) - - self.cvd_plot = pg.PlotWidget(title="Cumulative Volume Delta (CVD)") - self.cvd_plot.setLabel('left', 'CVD') - self.cvd_plot.setLabel('bottom', 'Time', units='s') + + self.cvd_plot = pg.PlotWidget(title="Cumulative Volume Delta (CVD)", axisItems={"bottom": DateAxisItem(orientation="bottom")}) + self.cvd_plot.setLabel("left", "CVD") self.cvd_plot.showGrid(x=True, y=True, alpha=0.3) - self.cvd_plot.setMouseEnabled(x=True, y=True) - self.cvd_plot.enableAutoRange(axis='xy', enable=False) - - # Create depth chart (right side) + + # Depth (not time x-axis) self.depth_plot = pg.PlotWidget(title="Order Book Depth") - self.depth_plot.setLabel('left', 'Price', units='USD') - self.depth_plot.setLabel('bottom', 'Cumulative Volume') + self.depth_plot.setLabel("left", "Price", units="USD") + self.depth_plot.setLabel("bottom", "Cumulative Volume") self.depth_plot.showGrid(x=True, y=True, alpha=0.3) - self.depth_plot.setMouseEnabled(x=True, y=True) - - # Link x-axes for synchronized zooming/panning (main charts only) + + # Link x-axes self.volume_plot.setXLink(self.ohlc_plot) self.obi_plot.setXLink(self.ohlc_plot) self.cvd_plot.setXLink(self.ohlc_plot) - - # Add crosshairs to time-series charts + + # Crosshairs and interactions self._setup_crosshairs() - - # Add charts to left layout - charts_layout.addWidget(self.ohlc_plot, 3) # Larger space for OHLC + self._setup_double_click_autorange() + + # Layout weights + charts_layout.addWidget(self.ohlc_plot, 3) charts_layout.addWidget(self.volume_plot, 1) - charts_layout.addWidget(self.obi_plot, 1) + charts_layout.addWidget(self.obi_plot, 1) charts_layout.addWidget(self.cvd_plot, 1) - - # Add depth chart to right layout + depth_layout.addWidget(self.depth_plot) - - # Add both sides to main layout (3:1 ratio similar to original Dash) + main_layout.addWidget(charts_widget, 3) main_layout.addWidget(depth_widget, 1) - + logging.info("UI setup completed") - + + def _setup_double_click_autorange(self): + def _auto_range(_event): + try: + self.ohlc_plot.autoRange() + self.volume_plot.autoRange() + self.obi_plot.autoRange() + self.cvd_plot.autoRange() + except Exception as e: + logging.debug(f"Auto-range error: {e}") + + # Attach to ViewBox mouse double-click + self.ohlc_plot.plotItem.vb.mouseDoubleClickEvent = _auto_range + def _setup_crosshairs(self): - """Setup crosshair functionality for time-series charts.""" - # Create crosshair lines with proper pen style - crosshair_pen = pg.mkPen(color='#888888', width=1, style=QtCore.Qt.DashLine) - self.vline = pg.InfiniteLine(angle=90, movable=False, pen=crosshair_pen) - self.hline_ohlc = pg.InfiniteLine(angle=0, movable=False, pen=crosshair_pen) - self.hline_volume = pg.InfiniteLine(angle=0, movable=False, pen=crosshair_pen) - self.hline_obi = pg.InfiniteLine(angle=0, movable=False, pen=crosshair_pen) - self.hline_cvd = pg.InfiniteLine(angle=0, movable=False, pen=crosshair_pen) - - # Add crosshairs to plots - self.ohlc_plot.addItem(self.vline, ignoreBounds=True) + # One vertical line per plot (a single item cannot be in multiple ViewBoxes) + pen = pg.mkPen(color="#888888", width=1, style=Qt.DashLine) + + self.vline_ohlc = pg.InfiniteLine(angle=90, movable=False, pen=pen) + self.vline_volume = pg.InfiniteLine(angle=90, movable=False, pen=pen) + self.vline_obi = pg.InfiniteLine(angle=90, movable=False, pen=pen) + self.vline_cvd = pg.InfiniteLine(angle=90, movable=False, pen=pen) + + self.hline_ohlc = pg.InfiniteLine(angle=0, movable=False, pen=pen) + self.hline_volume = pg.InfiniteLine(angle=0, movable=False, pen=pen) + self.hline_obi = pg.InfiniteLine(angle=0, movable=False, pen=pen) + self.hline_cvd = pg.InfiniteLine(angle=0, movable=False, pen=pen) + + # Attach to plots + self.ohlc_plot.addItem(self.vline_ohlc, ignoreBounds=True) + self.volume_plot.addItem(self.vline_volume, ignoreBounds=True) + self.obi_plot.addItem(self.vline_obi, ignoreBounds=True) + self.cvd_plot.addItem(self.vline_cvd, ignoreBounds=True) + self.ohlc_plot.addItem(self.hline_ohlc, ignoreBounds=True) - self.volume_plot.addItem(self.vline, ignoreBounds=True) self.volume_plot.addItem(self.hline_volume, ignoreBounds=True) - self.obi_plot.addItem(self.vline, ignoreBounds=True) self.obi_plot.addItem(self.hline_obi, ignoreBounds=True) - self.cvd_plot.addItem(self.vline, ignoreBounds=True) self.cvd_plot.addItem(self.hline_cvd, ignoreBounds=True) - - # Connect mouse move events - self.ohlc_plot.scene().sigMouseMoved.connect(self._on_mouse_moved) - self.volume_plot.scene().sigMouseMoved.connect(self._on_mouse_moved) - self.obi_plot.scene().sigMouseMoved.connect(self._on_mouse_moved) - self.cvd_plot.scene().sigMouseMoved.connect(self._on_mouse_moved) - - # Create data inspection label - self.data_label = pg.LabelItem(justify='left') + + # Mouse move + for plot in [self.ohlc_plot, self.volume_plot, self.obi_plot, self.cvd_plot]: + plot.scene().sigMouseMoved.connect(self._on_mouse_moved) + + # Data label + self.data_label = pg.LabelItem(justify="left") self.ohlc_plot.addItem(self.data_label) - - # Add rectangle selection for zoom functionality - self._setup_rectangle_selection() - - logging.debug("Crosshairs setup completed") - - def _setup_rectangle_selection(self): - """Setup rectangle selection for zoom functionality.""" - # Enable rectangle selection on OHLC plot (main chart) - self.ohlc_plot.setMenuEnabled(False) # Disable context menu for cleaner interaction - - # Add double-click to auto-range - self.ohlc_plot.plotItem.vb.mouseDoubleClickEvent = self._on_double_click - - # Rectangle selection is handled by PyQtGraph's built-in ViewBox behavior - # Users can drag to select area and right-click to zoom to selection - logging.debug("Rectangle selection setup completed") - - def _on_double_click(self, event): - """Handle double-click to auto-range all charts.""" - try: - # Auto-range all linked charts - self.ohlc_plot.autoRange() - self.volume_plot.autoRange() - self.obi_plot.autoRange() - self.cvd_plot.autoRange() - logging.debug("Auto-range applied to all charts") - except Exception as e: - logging.debug(f"Error in double-click handler: {e}") - + + # ----- Data ingestion from processor ----- def update_data(self, data_processor: OHLCProcessor): - """Update chart data from direct processor or JSON files.""" - self._get_data_from_processor(data_processor) - - def _load_ohlc_data(self): - """Load OHLC data from JSON file.""" - ohlc_file = Path("ohlc_data.json") - if not ohlc_file.exists(): - return - + """ + Pull latest bars/metrics from the processor and refresh plots. + Call this whenever you've added trade/book data + processor.flush(). + """ + self.ohlc_data = data_processor.bars or [] + # Optional: if your MetricsCalculator exposes a series try: - with open(ohlc_file, 'r') as f: - data = json.load(f) - - # Only update if data has changed - if len(data) != self.last_data_size: - self.ohlc_data = data - self.last_data_size = len(data) - logging.debug(f"Loaded {len(data)} OHLC bars") - - except (json.JSONDecodeError, FileNotFoundError) as e: - logging.warning(f"Failed to load OHLC data: {e}") - - def _load_metrics_data(self): - """Load metrics data (OBI, CVD) from JSON file.""" - metrics_file = Path("metrics_data.json") - if not metrics_file.exists(): - return - - try: - with open(metrics_file, 'r') as f: - data = json.load(f) - - # Only update if data has changed - if len(data) != self.last_metrics_size: - self.metrics_data = data - self.last_metrics_size = len(data) - logging.debug(f"Loaded {len(data)} metrics bars") - - except (json.JSONDecodeError, FileNotFoundError) as e: - logging.warning(f"Failed to load metrics data: {e}") - - def _load_depth_data(self): - """Load depth data (bids, asks) from JSON file.""" - depth_file = Path("depth_data.json") - if not depth_file.exists(): - return - - try: - with open(depth_file, 'r') as f: - data = json.load(f) - - # Only update if data has changed - if data != self.last_depth_data: - self.depth_data = data - self.last_depth_data = data.copy() if isinstance(data, dict) else data - logging.debug(f"Loaded depth data: {len(data.get('bids', []))} bids, {len(data.get('asks', []))} asks") - - except (json.JSONDecodeError, FileNotFoundError) as e: - logging.warning(f"Failed to load depth data: {e}") - + self.metrics_data = data_processor.get_metrics_series() or [] + except Exception: + self.metrics_data = [] + self._update_all_plots() + + # ----- Plot updates ----- def _update_all_plots(self): - """Update all chart plots with current data.""" self._update_ohlc_plot() self._update_volume_plot() self._update_obi_plot() self._update_cvd_plot() self._update_depth_plot() - + + def _clear_plot_items_but_crosshair(self, plot: pg.PlotWidget): + protected = (pg.InfiniteLine, pg.LabelItem) + items = [it for it in plot.items() if not isinstance(it, protected)] + for it in items: + plot.removeItem(it) + def _update_ohlc_plot(self): - """Update the OHLC plot with candlestick chart.""" if not self.ohlc_data: return - - # Clear existing plot items (but preserve crosshairs) - items = [item for item in self.ohlc_plot.items() if not isinstance(item, pg.InfiniteLine)] - for item in items: - self.ohlc_plot.removeItem(item) + self._clear_plot_items_but_crosshair(self.ohlc_plot) + self.ohlc_plot.addItem(OHLCItem(self.ohlc_data, body_ratio=0.8)) + + first_ts = self.ohlc_data[0]["timestamp_start"] / 1000.0 + last_ts = self.ohlc_data[-1].get("timestamp_end", self.ohlc_data[-1]["timestamp_start"]) / 1000.0 + self.ohlc_plot.setXRange(first_ts, last_ts) + + lows = [bar["low"] for bar in self.ohlc_data] + highs = [bar["high"] for bar in self.ohlc_data] + self.ohlc_plot.setYRange(min(lows), max(highs)) - # Create OHLC candlestick item - ohlc_item = OHLCItem(self.ohlc_data) - self.ohlc_plot.addItem(ohlc_item) - - logging.debug(f"Updated OHLC chart with {len(self.ohlc_data)} bars") - def _update_volume_plot(self): - """Update volume bar chart.""" if not self.ohlc_data: return - - # Clear existing plot items (but preserve crosshairs) - items = [item for item in self.volume_plot.items() if not isinstance(item, pg.InfiniteLine)] - for item in items: - self.volume_plot.removeItem(item) - - # Extract volume and price change data - timestamps = [bar[0] / 1000 for bar in self.ohlc_data] - volumes = [bar[5] for bar in self.ohlc_data] - - # Create volume bars with color coding - for i, (ts, vol) in enumerate(zip(timestamps, volumes)): - # Determine color based on price movement + self._clear_plot_items_but_crosshair(self.volume_plot) + + # Build centered volume bars sized to bucket width + for i, bar in enumerate(self.ohlc_data): + ts0 = bar["timestamp_start"] / 1000.0 + ts1 = bar.get("timestamp_end", bar["timestamp_start"]) / 1000.0 + full_w = max(1e-9, ts1 - ts0) + bar_w = full_w * 0.8 + x_center = 0.5 * (ts0 + ts1) + if i > 0: - prev_close = self.ohlc_data[i-1][4] # Previous close - curr_close = self.ohlc_data[i][4] # Current close - color = '#00ff00' if curr_close >= prev_close else '#ff0000' + prev_close = self.ohlc_data[i - 1]["close"] + color = "#00ff00" if bar["close"] >= prev_close else "#ff0000" else: - color = '#888888' # Neutral for first bar - - # Create bar - bar_item = pg.BarGraphItem(x=[ts], height=[vol], width=30, brush=color) - self.volume_plot.addItem(bar_item) - - logging.debug(f"Updated volume chart with {len(volumes)} bars") - + color = "#888888" + + vol = float(bar["volume"]) + self.volume_plot.addItem(pg.BarGraphItem(x=[x_center], height=[vol], width=bar_w, brush=color)) + def _update_obi_plot(self): - """Update OBI candlestick chart.""" - if not self.metrics_data: - return - - # Clear existing plot items (but preserve crosshairs) - items = [item for item in self.obi_plot.items() if not isinstance(item, pg.InfiniteLine)] - for item in items: - self.obi_plot.removeItem(item) - - # Extract OBI data and create candlestick format - obi_candlesticks = [] - for bar in self.metrics_data: - if len(bar) >= 5: # Ensure we have OBI data - timestamp, obi_open, obi_high, obi_low, obi_close = bar[:5] - obi_candlesticks.append([timestamp, obi_open, obi_high, obi_low, obi_close, 0]) - - if obi_candlesticks: - # Create OBI candlestick item with blue styling - obi_item = OBIItem(obi_candlesticks) # Will create this class - self.obi_plot.addItem(obi_item) - - logging.debug(f"Updated OBI chart with {len(obi_candlesticks)} bars") - - def _update_cvd_plot(self): - """Update CVD line chart.""" - if not self.metrics_data: - return - - # Clear existing plot items (but preserve crosshairs) - items = [item for item in self.cvd_plot.items() if not isinstance(item, pg.InfiniteLine)] - for item in items: - self.cvd_plot.removeItem(item) - - # Extract CVD data - timestamps = [] - cvd_values = [] - - for bar in self.metrics_data: - if len(bar) >= 6: # Check for CVD value - timestamps.append(bar[0] / 1000) # Convert to seconds - cvd_values.append(bar[5]) # CVD value - elif len(bar) >= 5: # Fallback for older format - timestamps.append(bar[0] / 1000) - cvd_values.append(0.0) # Default CVD - - if timestamps and cvd_values: - # Plot CVD as line chart - self.cvd_plot.plot(timestamps, cvd_values, pen=pg.mkPen(color='#ffff00', width=2), name='CVD') - - logging.debug(f"Updated CVD chart with {len(cvd_values)} points") - - def _cumulate_levels(self, levels, reverse=False, limit=50): """ - Convert individual price levels to cumulative volumes. - - Args: - levels: List of [price, size] pairs - reverse: If True, sort in descending order (for bids) - limit: Maximum number of levels to include - - Returns: - List of (price, cumulative_volume) tuples + Expecting metrics_data entries shaped like: + [ts_start_ms, ts_end_ms, obi_o, obi_h, obi_l, obi_c, ...] + Adapt if your MetricsCalculator differs. + """ + + def _update_obi_plot(self): + """ + Update OBI panel with candlesticks from metrics_data. + Each row expected: [ts_start_ms, ts_end_ms, obi_o, obi_h, obi_l, obi_c, cvd] + """ + if not self.metrics_data: + return + self._clear_plot_items_but_crosshair(self.obi_plot) + + # Convert metrics_data rows to candle dicts + candlesticks = [] + for row in self.metrics_data: + if len(row) >= 6: + ts0, ts1, o, h, l, c = row[:6] + candlesticks.append({ + "timestamp_start": int(ts0), + "timestamp_end": int(ts1), + "open": float(o), + "high": float(h), + "low": float(l), + "close": float(c), + "volume": 0.0, + }) + + if candlesticks: + self.obi_plot.addItem(OBIItem(candlesticks, body_ratio=0.8)) + + # Also set Y range explicitly + lows = [c["low"] for c in candlesticks] + highs = [c["high"] for c in candlesticks] + self.obi_plot.setYRange(min(lows), max(highs)) + + def _update_cvd_plot(self): + """ + Plot CVD as a line if present at index 6, else skip. + """ + if not self.metrics_data: + return + self._clear_plot_items_but_crosshair(self.cvd_plot) + + xs = [] + ys = [] + for row in self.metrics_data: + if len(row) >= 7: + ts0 = row[0] / 1000.0 + cvd_val = float(row[6]) + xs.append(ts0) + ys.append(cvd_val) + if xs: + self.cvd_plot.plot(xs, ys, pen=pg.mkPen(color="#ffff00", width=2), name="CVD") + + # ----- Depth chart ----- + @staticmethod + def _cumulate_levels(levels, reverse=False, limit=50): + """ + levels: list of [price, size] + returns list of (price, cum_volume) """ if not levels: return [] - try: - # Sort levels by price - sorted_levels = sorted(levels[:limit], key=lambda x: x[0], reverse=reverse) - - # Calculate cumulative volumes - cumulative = [] - total_volume = 0.0 - - for price, size in sorted_levels: - total_volume += size - cumulative.append((price, total_volume)) - - return cumulative - + sorted_lvls = sorted(levels[:limit], key=lambda x: x[0], reverse=reverse) + cum = [] + total = 0.0 + for price, size in sorted_lvls: + total += float(size) + cum.append((float(price), total)) + return cum except Exception as e: - logging.warning(f"Error in cumulate_levels: {e}") + logging.warning(f"cumulate_levels error: {e}") return [] - + def _update_depth_plot(self): - """Update depth chart with cumulative bid/ask visualization.""" if not self.depth_data or not isinstance(self.depth_data, dict): return - - # Clear all items for depth chart (no crosshairs here) self.depth_plot.clear() - - bids = self.depth_data.get('bids', []) - asks = self.depth_data.get('asks', []) - - # Calculate cumulative levels + + bids = self.depth_data.get("bids", []) + asks = self.depth_data.get("asks", []) + cum_bids = self._cumulate_levels(bids, reverse=True, limit=50) cum_asks = self._cumulate_levels(asks, reverse=False, limit=50) - - # Plot bids (green) + if cum_bids: - bid_volumes = [vol for _, vol in cum_bids] - bid_prices = [price for price, _ in cum_bids] - - # Create stepped line plot for bids - self.depth_plot.plot(bid_volumes, bid_prices, - pen=pg.mkPen(color='#00c800', width=2), - stepMode='left', name='Bids') - - # Add fill area - fill_curve = pg.PlotCurveItem(bid_volumes + [0], bid_prices + [bid_prices[-1]], - fillLevel=0, fillBrush=pg.mkBrush(color=(0, 200, 0, 50))) - self.depth_plot.addItem(fill_curve) - - # Plot asks (red) + bid_prices = [p for p, _ in cum_bids] + bid_vols = [v for _, v in cum_bids] + self.depth_plot.plot(bid_vols, bid_prices, pen=pg.mkPen(color="#00c800", width=2), + stepMode="left", name="Bids") + fill_b = pg.PlotCurveItem(bid_vols + [0], bid_prices + [bid_prices[-1]], fillLevel=0, + fillBrush=pg.mkBrush(color=(0, 200, 0, 50))) + self.depth_plot.addItem(fill_b) + if cum_asks: - ask_volumes = [vol for _, vol in cum_asks] - ask_prices = [price for price, _ in cum_asks] - - # Create stepped line plot for asks - self.depth_plot.plot(ask_volumes, ask_prices, - pen=pg.mkPen(color='#c80000', width=2), - stepMode='left', name='Asks') - - # Add fill area - fill_curve = pg.PlotCurveItem(ask_volumes + [0], ask_prices + [ask_prices[-1]], - fillLevel=0, fillBrush=pg.mkBrush(color=(200, 0, 0, 50))) - self.depth_plot.addItem(fill_curve) - - logging.debug(f"Updated depth chart: {len(cum_bids)} bid levels, {len(cum_asks)} ask levels") - + ask_prices = [p for p, _ in cum_asks] + ask_vols = [v for _, v in cum_asks] + self.depth_plot.plot(ask_vols, ask_prices, pen=pg.mkPen(color="#c80000", width=2), + stepMode="left", name="Asks") + fill_a = pg.PlotCurveItem(ask_vols + [0], ask_prices + [ask_prices[-1]], fillLevel=0, + fillBrush=pg.mkBrush(color=(200, 0, 0, 50))) + self.depth_plot.addItem(fill_a) + + # ----- Crosshair + inspection ----- def _on_mouse_moved(self, pos): - """Handle mouse movement for crosshair and data inspection.""" try: - # Determine which plot triggered the event sender = self.sender() if not sender: return - - # Find the plot widget from the scene - plot_widget = None + target = None for plot in [self.ohlc_plot, self.volume_plot, self.obi_plot, self.cvd_plot]: if plot.scene() == sender: - plot_widget = plot + target = plot break - - if not plot_widget: + if target is None: return - - # Convert scene coordinates to plot coordinates - if plot_widget.sceneBoundingRect().contains(pos): - mouse_point = plot_widget.plotItem.vb.mapSceneToView(pos) - x_pos = mouse_point.x() - y_pos = mouse_point.y() - - # Update crosshair positions - self.vline.setPos(x_pos) - self.hline_ohlc.setPos(y_pos if plot_widget == self.ohlc_plot else self.hline_ohlc.pos()[1]) - self.hline_volume.setPos(y_pos if plot_widget == self.volume_plot else self.hline_volume.pos()[1]) - self.hline_obi.setPos(y_pos if plot_widget == self.obi_plot else self.hline_obi.pos()[1]) - self.hline_cvd.setPos(y_pos if plot_widget == self.cvd_plot else self.hline_cvd.pos()[1]) - - # Update data inspection - self._update_data_inspection(x_pos, plot_widget) - + + if target.sceneBoundingRect().contains(pos): + mp = target.plotItem.vb.mapSceneToView(pos) + x = mp.x() + y = mp.y() + + # Move all vertical lines to the same x (keep panels aligned) + self.vline_ohlc.setPos(x) + self.vline_volume.setPos(x) + self.vline_obi.setPos(x) + self.vline_cvd.setPos(x) + + # Move only the active panel's horizontal line + if target == self.ohlc_plot: + self.hline_ohlc.setPos(y) + elif target == self.volume_plot: + self.hline_volume.setPos(y) + elif target == self.obi_plot: + self.hline_obi.setPos(y) + elif target == self.cvd_plot: + self.hline_cvd.setPos(y) + + self._update_data_label(x) except Exception as e: - logging.debug(f"Error in mouse move handler: {e}") - - def _update_data_inspection(self, x_pos, plot_widget): - """Update data inspection label with values at cursor position.""" + logging.debug(f"mouse move err: {e}") + + def _update_data_label(self, x_seconds: float): try: - info_parts = [] - - # Find closest data point for OHLC + parts = [] + if self.ohlc_data: - closest_ohlc = self._find_closest_data_point(x_pos, self.ohlc_data) - if closest_ohlc: - ts, open_p, high, low, close, volume = closest_ohlc - time_str = self._format_timestamp(ts) - info_parts.append(f"Time: {time_str}") - info_parts.append(f"OHLC: O:{open_p:.2f} H:{high:.2f} L:{low:.2f} C:{close:.2f}") - info_parts.append(f"Volume: {volume:.4f}") - - # Find closest data point for Metrics (OBI/CVD) + closest = self._closest_bar(self.ohlc_data, x_seconds) + if closest: + parts.append(f"Time: {self._fmt_ts(closest['timestamp_start'])}") + parts.append(f"OHLC O:{closest['open']:.2f} H:{closest['high']:.2f} " + f"L:{closest['low']:.2f} C:{closest['close']:.2f}") + parts.append(f"Vol: {float(closest['volume']):.6f}") + if self.metrics_data: - closest_metrics = self._find_closest_data_point(x_pos, self.metrics_data) - if closest_metrics and len(closest_metrics) >= 5: - ts, obi_o, obi_h, obi_l, obi_c = closest_metrics[:5] - cvd_val = closest_metrics[5] if len(closest_metrics) > 5 else 0.0 - info_parts.append(f"OBI: O:{obi_o:.2f} H:{obi_h:.2f} L:{obi_l:.2f} C:{obi_c:.2f}") - info_parts.append(f"CVD: {cvd_val:.2f}") - - # Update label - if info_parts: - self.data_label.setText("
".join(info_parts)) - else: - self.data_label.setText("No data") - + # Optional: display OBI/CVD values if present + row = self._closest_metrics_row(self.metrics_data, x_seconds) + if row and len(row) >= 6: + _, _, o, h, l, c = row[:6] + parts.append(f"OBI O:{o:.2f} H:{h:.2f} L:{l:.2f} C:{c:.2f}") + if row and len(row) >= 7: + parts.append(f"CVD: {float(row[6]):.2f}") + + self.data_label.setText("
".join(parts) if parts else "No data") except Exception as e: - logging.debug(f"Error updating data inspection: {e}") - - def _find_closest_data_point(self, x_pos, data): - """Find the closest data point to the given x position.""" - if not data: + logging.debug(f"update label err: {e}") + + @staticmethod + def _closest_bar(bars: List[Dict[str, Any]], x_seconds: float) -> Optional[Dict[str, Any]]: + if not bars: return None - - # Convert x_pos (seconds) back to milliseconds for comparison - x_ms = x_pos * 1000 - - # Find closest timestamp - closest_idx = 0 - min_diff = abs(data[0][0] - x_ms) - - for i, bar in enumerate(data): - diff = abs(bar[0] - x_ms) - if diff < min_diff: - min_diff = diff - closest_idx = i - - return data[closest_idx] - - def _format_timestamp(self, timestamp_ms): - """Format timestamp for display.""" + x_ms = x_seconds * 1000.0 + closest = min(bars, key=lambda b: abs(b["timestamp_start"] - x_ms)) + return closest + + @staticmethod + def _closest_metrics_row(rows: List[Any], x_seconds: float) -> Optional[Any]: + if not rows: + return None + x_ms = x_seconds * 1000.0 + try: + return min(rows, key=lambda r: abs(r[0] - x_ms)) + except Exception: + return None + + @staticmethod + def _fmt_ts(ts_ms: int) -> str: from datetime import datetime try: - dt = datetime.fromtimestamp(timestamp_ms / 1000) - return dt.strftime("%H:%M:%S") - except: - return str(int(timestamp_ms)) - - def setup_data_processor(self, processor): - """Setup direct data integration with OHLCProcessor.""" - self.data_processor = processor - # For now, use JSON mode as the direct mode implementation is incomplete - self.direct_mode = False - - # Setup callbacks for real-time data updates - self._setup_processor_callbacks() - - logging.info("Data processor reference set, using JSON file mode for visualization") - - def _setup_processor_callbacks(self): - """Setup callbacks to receive data directly from processor.""" - if not self.data_processor: - return - - # Replace JSON polling with direct data access - # Note: This is a simplified approach - in production, you'd want proper callbacks - # from the processor when new data is available - - logging.debug("Processor callbacks setup completed") - - def _get_data_from_processor(self, data_processor: OHLCProcessor): - """Get data directly from processor instead of JSON files.""" - try: - self.ohlc_data = data_processor.get_ohlc_data() - self.metrics_data = data_processor.get_metrics_data() - self.depth_data = data_processor.get_current_depth() - - # Get OHLC data from processor (placeholder - needs actual processor API) - # processor_ohlc = self.data_processor.get_ohlc_data() - # if processor_ohlc: - # self.ohlc_data = processor_ohlc - - # Get metrics data from processor - # processor_metrics = self.data_processor.get_metrics_data() - # if processor_metrics: - # self.metrics_data = processor_metrics - - # Get depth data from processor - # processor_depth = self.data_processor.get_current_depth() - # if processor_depth: - # self.depth_data = processor_depth - - logging.debug("Retrieved data directly from processor") - - except Exception as e: - logging.warning(f"Error getting data from processor: {e}") - # Fallback to JSON mode - self.direct_mode = False - -def main(): - """Application entry point.""" - logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") - - app = QApplication(sys.argv) - - window = MainWindow() - window.show() - - logging.info("Desktop application started") - - sys.exit(app.exec()) - -if __name__ == "__main__": - main() + return datetime.fromtimestamp(ts_ms / 1000.0).strftime("%Y-%m-%d %H:%M:%S") + except Exception: + return str(int(ts_ms)) \ No newline at end of file diff --git a/level_parser.py b/level_parser.py index fb1fbf5..41111db 100644 --- a/level_parser.py +++ b/level_parser.py @@ -1,85 +1,87 @@ -"""Level parsing utilities for orderbook data.""" +"""Ultra-fast level parsing for strings like: +"[['110173.4', '0.0000454', '0', '4'], ['110177.1', '0', '0', '0'], ...]" +""" -import json -import ast -import logging -from typing import List, Any, Tuple +from typing import List, Tuple, Any def normalize_levels(levels: Any) -> List[List[float]]: """ - Convert string-encoded levels into [[price, size], ...] floats. - - Filters out zero/negative sizes. Supports JSON and Python literal formats. + Return [[price, size], ...] with size > 0 only (floats). + Assumes 'levels' is a single-quoted list-of-lists string as above. """ - if not levels or levels == '[]': - return [] - - parsed = _parse_string_to_list(levels) - if not parsed: - return [] - - pairs: List[List[float]] = [] - for item in parsed: - price, size = _extract_price_size(item) - if price is None or size is None: - continue - try: - p, s = float(price), float(size) - if s > 0: - pairs.append([p, s]) - except Exception: - continue - - if not pairs: - logging.debug("normalize_levels: no valid pairs parsed from input") - return pairs + pairs = _fast_pairs(levels) + # filter strictly positive sizes + return [[p, s] for (p, s) in pairs if s > 0.0] def parse_levels_including_zeros(levels: Any) -> List[Tuple[float, float]]: """ - Parse levels into (price, size) tuples including zero sizes for deletions. - - Similar to normalize_levels but preserves zero sizes (for orderbook deletions). + Return [(price, size), ...] (floats), preserving zeros for deletions. + Assumes 'levels' is a single-quoted list-of-lists string as above. """ - if not levels or levels == '[]': + return _fast_pairs(levels) + + +# ----------------- internal: fast path ----------------- + +def _fast_pairs(levels: Any) -> List[Tuple[float, float]]: + """ + Extremely fast parser for inputs like: + "[['110173.4','0.0000454','0','4'],['110177.1','0','0','0'], ...]" + Keeps only the first two fields from each row and converts to float. + """ + if not levels: return [] - parsed = _parse_string_to_list(levels) - if not parsed: + # If already a list (rare in your pipeline), fall back to simple handling + if isinstance(levels, (list, tuple)): + out: List[Tuple[float, float]] = [] + for item in levels: + if isinstance(item, (list, tuple)) and len(item) >= 2: + try: + p = float(item[0]); s = float(item[1]) + out.append((p, s)) + except Exception: + continue + return out + + # Expect a string: strip outer brackets and single quotes fast + s = str(levels).strip() + if len(s) < 5: # too short to contain "[[...]]" return [] - results: List[Tuple[float, float]] = [] - for item in parsed: - price, size = _extract_price_size(item) - if price is None or size is None: + # Remove the outermost [ and ] quickly (tolerant) + if s[0] == '[': + s = s[1:] + if s and s[-1] == ']': + s = s[:-1] + + # Remove *all* single quotes (input uses single quotes, not JSON) + s = s.replace("'", "") + + # Now s looks like: [[110173.4, 0.0000454, 0, 4], [110177.1, 0, 0, 0], ...] + # Split into rows on "],", then strip brackets/spaces per row + rows = s.split("],") + out: List[Tuple[float, float]] = [] + + for row in rows: + row = row.strip() + # strip any leading/trailing brackets/spaces + if row.startswith('['): + row = row[1:] + if row.endswith(']'): + row = row[:-1] + + # fast split by commas and take first two fields + cols = row.split(',') + if len(cols) < 2: continue try: - p, s = float(price), float(size) - if s >= 0: - results.append((p, s)) + p = float(cols[0].strip()) + s_ = float(cols[1].strip()) + out.append((p, s_)) except Exception: continue - return results - - -def _parse_string_to_list(levels: Any) -> List[Any]: - """Parse string levels to list, trying JSON first then literal_eval.""" - try: - parsed = json.loads(levels) - except Exception: - try: - parsed = ast.literal_eval(levels) - except Exception: - return [] - return parsed if isinstance(parsed, list) else [] - - -def _extract_price_size(item: Any) -> Tuple[Any, Any]: - """Extract price and size from dict or list/tuple format.""" - if isinstance(item, dict): - return item.get("price", item.get("p")), item.get("size", item.get("s")) - elif isinstance(item, (list, tuple)) and len(item) >= 2: - return item[0], item[1] - return None, None + return out diff --git a/main.py b/main.py index 56d194f..c1f93cc 100644 --- a/main.py +++ b/main.py @@ -10,15 +10,12 @@ from ohlc_processor import OHLCProcessor from desktop_app import MainWindow import sys from PySide6.QtWidgets import QApplication +from PySide6.QtCore import Signal, QTimer def main(instrument: str = typer.Argument(..., help="Instrument to backtest, e.g. BTC-USDT"), start_date: str = typer.Argument(..., help="Start date, e.g. 2025-07-01"), - end_date: str = typer.Argument(..., help="End date, e.g. 2025-08-01"), - window_seconds: int = typer.Option(60, help="OHLC window size in seconds")): - """ - Process orderbook data and visualize OHLC charts in real-time. - """ + end_date: str = typer.Argument(..., help="End date, e.g. 2025-08-01")): logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") start_date = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc) @@ -39,10 +36,18 @@ def main(instrument: str = typer.Argument(..., help="Instrument to backtest, e.g logging.info(f"Found {len(db_paths)} database files: {[p.name for p in db_paths]}") - processor = OHLCProcessor(window_seconds=window_seconds) + processor = OHLCProcessor(aggregate_window_seconds=60 * 60) + + app = QApplication(sys.argv) + desktop_app = MainWindow() + desktop_app.show() + + timer = QTimer() + timer.timeout.connect(lambda: desktop_app.update_data(processor)) + timer.start(1000) + def process_data(): - """Process database data in a separate thread.""" try: for db_path in db_paths: db_name_parts = db_path.name.split(".")[0].split("-") @@ -64,30 +69,19 @@ def main(instrument: str = typer.Argument(..., help="Instrument to backtest, e.g for orderbook_update, trades in db_interpreter.stream(): batch_count += 1 - processor.process_trades(trades) processor.update_orderbook(orderbook_update) + processor.process_trades(trades) + # desktop_app.update_data(processor) + - processor.finalize() logging.info("Data processing completed") except Exception as e: logging.error(f"Error in data processing: {e}") - try: - app = QApplication(sys.argv) - desktop_app = MainWindow() - - desktop_app.setup_data_processor(processor) - desktop_app.show() - - logging.info("Desktop visualizer started") - - data_thread = threading.Thread(target=process_data, daemon=True) - data_thread.start() - - app.exec() - - except Exception as e: - logging.error(f"Failed to start desktop visualizer: {e}") + data_thread = threading.Thread(target=process_data, daemon=True) + data_thread.start() + + app.exec() if __name__ == "__main__": diff --git a/metrics_calculator.py b/metrics_calculator.py index b1d6507..c61957f 100644 --- a/metrics_calculator.py +++ b/metrics_calculator.py @@ -1,5 +1,5 @@ import logging -from typing import Optional, Tuple +from typing import Optional, List class MetricsCalculator: @@ -7,6 +7,20 @@ class MetricsCalculator: self.cvd_cumulative = 0.0 self.obi_value = 0.0 + # --- per-bucket state --- + self._b_ts_start: Optional[int] = None + self._b_ts_end: Optional[int] = None + self._obi_o: Optional[float] = None + self._obi_h: Optional[float] = None + self._obi_l: Optional[float] = None + self._obi_c: Optional[float] = None + + # final series rows: [ts_start, ts_end, obi_o, obi_h, obi_l, obi_c, cvd] + self._series: List[List[float]] = [] + + # ------------------------------ + # CVD + # ------------------------------ def update_cvd_from_trade(self, side: str, size: float) -> None: if side == "buy": volume_delta = float(size) @@ -14,8 +28,56 @@ class MetricsCalculator: volume_delta = -float(size) else: logging.warning(f"Unknown trade side '{side}', treating as neutral") - + volume_delta = 0.0 self.cvd_cumulative += volume_delta + # ------------------------------ + # OBI + # ------------------------------ def update_obi_from_book(self, total_bids: float, total_asks: float) -> None: self.obi_value = float(total_bids - total_asks) + # update H/L/C if a bucket is open + if self._b_ts_start is not None: + v = self.obi_value + if self._obi_o is None: + self._obi_o = self._obi_h = self._obi_l = self._obi_c = v + else: + self._obi_h = max(self._obi_h, v) + self._obi_l = min(self._obi_l, v) + self._obi_c = v + + # ------------------------------ + # Bucket lifecycle + # ------------------------------ + def begin_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None: + self._b_ts_start = int(ts_start_ms) + self._b_ts_end = int(ts_end_ms) + v = float(self.obi_value) + self._obi_o = self._obi_h = self._obi_l = self._obi_c = v + + def finalize_bucket(self) -> None: + if self._b_ts_start is None or self._b_ts_end is None: + return + o = float(self._obi_o if self._obi_o is not None else self.obi_value) + h = float(self._obi_h if self._obi_h is not None else self.obi_value) + l = float(self._obi_l if self._obi_l is not None else self.obi_value) + c = float(self._obi_c if self._obi_c is not None else self.obi_value) + self._series.append([ + self._b_ts_start, self._b_ts_end, o, h, l, c, float(self.cvd_cumulative) + ]) + # reset + self._b_ts_start = self._b_ts_end = None + self._obi_o = self._obi_h = self._obi_l = self._obi_c = None + + def add_flat_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None: + v = float(self.obi_value) + self._series.append([ + int(ts_start_ms), int(ts_end_ms), + v, v, v, v, float(self.cvd_cumulative) + ]) + + # ------------------------------ + # Output + # ------------------------------ + def get_series(self): + return self._series diff --git a/ohlc_processor.py b/ohlc_processor.py index 4950e69..ca1f5f4 100644 --- a/ohlc_processor.py +++ b/ohlc_processor.py @@ -1,70 +1,191 @@ import logging -from typing import List, Any, Dict, Tuple +from typing import List, Any, Dict, Tuple, Optional -from viz_io import add_ohlc_bar, upsert_ohlc_bar, _atomic_write_json, DEPTH_FILE from db_interpreter import OrderbookUpdate -from level_parser import normalize_levels, parse_levels_including_zeros +from level_parser import parse_levels_including_zeros from orderbook_manager import OrderbookManager from metrics_calculator import MetricsCalculator class OHLCProcessor: """ - Processes trade data and orderbook updates into OHLC bars and depth snapshots. - - This class aggregates individual trades into time-windowed OHLC (Open, High, Low, Close) - bars and maintains an in-memory orderbook state for depth visualization. It also - calculates Order Book Imbalance (OBI) and Cumulative Volume Delta (CVD) metrics. - - The processor uses throttled updates to balance visualization responsiveness with - I/O efficiency, emitting intermediate updates during active windows. - - Attributes: - window_seconds: Time window duration for OHLC aggregation - depth_levels_per_side: Number of top price levels to maintain per side - trades_processed: Total number of trades processed - bars_created: Total number of OHLC bars created - cvd_cumulative: Running cumulative volume delta (via metrics calculator) + Time-bucketed OHLC aggregator with gap-bar filling and metric hooks. + + - Bars are aligned to fixed buckets of length `aggregate_window_seconds`. + - If there is a gap (no trades for one or more buckets), synthetic zero-volume + candles are emitted with O=H=L=C=last_close AND a flat metrics bucket is added. + - By default, the next bar's OPEN is the previous bar's CLOSE (configurable via + `carry_forward_open`). """ - def __init__(self) -> None: - self.current_bar = None + def __init__(self, aggregate_window_seconds: int, carry_forward_open: bool = True) -> None: + self.aggregate_window_seconds = int(aggregate_window_seconds) + self._bucket_ms = self.aggregate_window_seconds * 1000 + + self.carry_forward_open = carry_forward_open + + self.current_bar: Optional[Dict[str, Any]] = None + self._current_bucket_index: Optional[int] = None + self._last_close: Optional[float] = None + self.trades_processed = 0 + self.bars: List[Dict[str, Any]] = [] self._orderbook = OrderbookManager() self._metrics = MetricsCalculator() - @property - def cvd_cumulative(self) -> float: - """Access cumulative CVD from metrics calculator.""" - return self._metrics.cvd_cumulative + # ----------------------- + # Internal helpers + # ----------------------- + def _new_bar(self, bucket_start_ms: int, open_price: float) -> Dict[str, Any]: + return { + "timestamp_start": bucket_start_ms, + "timestamp_end": bucket_start_ms + self._bucket_ms, + "open": float(open_price), + "high": float(open_price), + "low": float(open_price), + "close": float(open_price), + "volume": 0.0, + } + def _emit_gap_bars(self, from_index: int, to_index: int) -> None: + """ + Emit empty buckets strictly between from_index and to_index. + Each synthetic bar has zero volume and O=H=L=C=last_close. + Also emit a flat metrics bucket for each gap. + """ + if self._last_close is None: + return + + for bi in range(from_index + 1, to_index): + start_ms = bi * self._bucket_ms + gap_bar = self._new_bar(start_ms, self._last_close) + self.bars.append(gap_bar) + + # metrics: add a flat bucket to keep OBI/CVD time-continuous + try: + self._metrics.add_flat_bucket(start_ms, start_ms + self._bucket_ms) + except Exception as e: + logging.debug(f"metrics add_flat_bucket error (ignored): {e}") + + # ----------------------- + # Public API + # ----------------------- def process_trades(self, trades: List[Tuple[Any, ...]]) -> None: + """ + trades: iterables like (trade_id, trade_id_str, price, size, side, timestamp_ms, ...) + timestamp_ms expected in milliseconds. + """ + if not trades: + return + + # Ensure time-ascending order; if upstream guarantees it, you can skip. + trades = sorted(trades, key=lambda t: int(t[5])) + for trade in trades: trade_id, trade_id_str, price, size, side, timestamp_ms = trade[:6] + price = float(price) + size = float(size) timestamp_ms = int(timestamp_ms) + self.trades_processed += 1 - self._metrics.update_cvd_from_trade(side, size) + # Metrics driven by trades: update CVD + try: + self._metrics.update_cvd_from_trade(side, size) + except Exception as e: + logging.debug(f"CVD update error (ignored): {e}") - if not self.current_bar: - self.current_bar = { - 'open': float(price), - 'high': float(price), - 'low': float(price), - 'close': float(price) - } - self.current_bar['high'] = max(self.current_bar['high'], float(price)) - self.current_bar['low'] = min(self.current_bar['low'], float(price)) - self.current_bar['close'] = float(price) - self.current_bar['volume'] += float(size) + # Determine this trade's bucket + bucket_index = timestamp_ms // self._bucket_ms + bucket_start = bucket_index * self._bucket_ms + # New bucket? + if self._current_bucket_index is None or bucket_index != self._current_bucket_index: + # finalize prior bar + if self.current_bar is not None: + self.bars.append(self.current_bar) + self._last_close = self.current_bar["close"] + # finalize metrics for the prior bucket window + try: + self._metrics.finalize_bucket() + except Exception as e: + logging.debug(f"metrics finalize_bucket error (ignored): {e}") + + # handle gaps + if self._current_bucket_index is not None and bucket_index > self._current_bucket_index + 1: + self._emit_gap_bars(self._current_bucket_index, bucket_index) + + # pick open price policy + if self.carry_forward_open and self._last_close is not None: + open_for_new = self._last_close + else: + open_for_new = price + + self.current_bar = self._new_bar(bucket_start, open_for_new) + self._current_bucket_index = bucket_index + + # begin a new metrics bucket aligned to this bar window + try: + self._metrics.begin_bucket(bucket_start, bucket_start + self._bucket_ms) + except Exception as e: + logging.debug(f"metrics begin_bucket error (ignored): {e}") + + # Update current bucket with this trade + b = self.current_bar + if b is None: + # Should not happen, but guard anyway + b = self._new_bar(bucket_start, price) + self.current_bar = b + self._current_bucket_index = bucket_index + try: + self._metrics.begin_bucket(bucket_start, bucket_start + self._bucket_ms) + except Exception as e: + logging.debug(f"metrics begin_bucket (guard) error (ignored): {e}") + + b["high"] = max(b["high"], price) + b["low"] = min(b["low"], price) + b["close"] = price + b["volume"] += size + # keep timestamp_end snapped to bucket boundary + b["timestamp_end"] = bucket_start + self._bucket_ms + + def flush(self) -> None: + """Emit the in-progress bar (if any). Call at the end of a run/backtest.""" + if self.current_bar is not None: + self.bars.append(self.current_bar) + self._last_close = self.current_bar["close"] + self.current_bar = None + # finalize any open metrics bucket + try: + self._metrics.finalize_bucket() + except Exception as e: + logging.debug(f"metrics finalize_bucket on flush error (ignored): {e}") def update_orderbook(self, ob_update: OrderbookUpdate) -> None: + """ + Apply orderbook deltas and refresh OBI metrics. + Call this frequently (on each OB update) so intra-bucket OBI highs/lows track the book. + """ bids_updates = parse_levels_including_zeros(ob_update.bids) asks_updates = parse_levels_including_zeros(ob_update.asks) self._orderbook.apply_updates(bids_updates, asks_updates) total_bids, total_asks = self._orderbook.get_total_volume() - self._metrics.update_obi_from_book(total_bids, total_asks) - + try: + self._metrics.update_obi_from_book(total_bids, total_asks) + except Exception as e: + logging.debug(f"OBI update error (ignored): {e}") + + # ----------------------- + # UI-facing helpers + # ----------------------- + def get_metrics_series(self): + """ + Returns a list of rows: + [timestamp_start_ms, timestamp_end_ms, obi_open, obi_high, obi_low, obi_close, cvd_value] + """ + try: + return self._metrics.get_series() + except Exception: + return []