import logging from typing import List, Any, Dict, Tuple, Optional from db_interpreter import OrderbookUpdate from level_parser import parse_levels_including_zeros from orderbook_manager import OrderbookManager from metrics_calculator import MetricsCalculator class OHLCProcessor: """ Time-bucketed OHLC aggregator with gap-bar filling and metric hooks. - Bars are aligned to fixed buckets of length `aggregate_window_seconds`. - If there is a gap (no trades for one or more buckets), synthetic zero-volume candles are emitted with O=H=L=C=last_close AND a flat metrics bucket is added. - By default, the next bar's OPEN is the previous bar's CLOSE (configurable via `carry_forward_open`). """ def __init__(self, aggregate_window_seconds: int) -> None: self.aggregate_window_seconds = int(aggregate_window_seconds) self._bucket_ms = self.aggregate_window_seconds * 1000 self.current_bar: Optional[Dict[str, Any]] = None self._current_bucket_index: Optional[int] = None self._last_close: Optional[float] = None self.trades_processed = 0 self.bars: List[Dict[str, Any]] = [] self.orderbook = OrderbookManager() self.metrics = MetricsCalculator() # ----------------------- # Internal helpers # ----------------------- def _new_bar(self, bucket_start_ms: int, open_price: float) -> Dict[str, Any]: return { "timestamp_start": bucket_start_ms, "timestamp_end": bucket_start_ms + self._bucket_ms, "open": float(open_price), "high": float(open_price), "low": float(open_price), "close": float(open_price), "volume": 0.0, } def _emit_gap_bars(self, from_index: int, to_index: int) -> None: """ Emit empty buckets strictly between from_index and to_index. Each synthetic bar has zero volume and O=H=L=C=last_close. Also emit a flat metrics bucket for each gap. """ if self._last_close is None: return for bi in range(from_index + 1, to_index): start_ms = bi * self._bucket_ms gap_bar = self._new_bar(start_ms, self._last_close) self.bars.append(gap_bar) # metrics: add a flat bucket to keep OBI/CVD/ATR time-continuous try: self.metrics.add_flat_bucket(start_ms, start_ms + self._bucket_ms) except Exception as e: logging.debug(f"metrics add_flat_bucket error (ignored): {e}") # ----------------------- # Public API # ----------------------- def process_trades(self, trades: List[Tuple[Any, ...]]) -> None: """ trades: iterables like (trade_id, trade_id_str, price, size, side, timestamp_ms, ...) timestamp_ms expected in milliseconds. """ if not trades: return # Ensure time-ascending order; if upstream guarantees it, you can skip. trades = sorted(trades, key=lambda t: int(t[5])) for trade in trades: trade_id, trade_id_str, price, size, side, timestamp_ms = trade[:6] price = float(price) size = float(size) timestamp_ms = int(timestamp_ms) self.trades_processed += 1 # Determine this trade's bucket bucket_index = timestamp_ms // self._bucket_ms bucket_start = bucket_index * self._bucket_ms # New bucket? if self._current_bucket_index is None or bucket_index != self._current_bucket_index: # finalize prior bar (also finalizes metrics incl. ATR) if self.current_bar is not None: self.bars.append(self.current_bar) self._last_close = self.current_bar["close"] self.metrics.finalize_bucket(self.current_bar) # <— pass bar for ATR # handle gaps if self._current_bucket_index is not None and bucket_index > self._current_bucket_index + 1: self._emit_gap_bars(self._current_bucket_index, bucket_index) open_for_new = self._last_close if self._last_close is not None else price self.current_bar = self._new_bar(bucket_start, open_for_new) self._current_bucket_index = bucket_index self.metrics.begin_bucket(bucket_start, bucket_start + self._bucket_ms) # Metrics driven by trades: update CVD self.metrics.update_cvd_from_trade(side, float(size)) # Update current bucket with this trade b = self.current_bar b["high"] = max(b["high"], price) b["low"] = min(b["low"], price) b["close"] = price b["volume"] += size # keep timestamp_end snapped to bucket boundary b["timestamp_end"] = bucket_start + self._bucket_ms def flush(self) -> None: """Emit the in-progress bar (if any). Call at the end of a run/backtest.""" if self.current_bar is not None: self.bars.append(self.current_bar) self._last_close = self.current_bar["close"] try: self.metrics.finalize_bucket(self.current_bar) # <— pass bar for ATR except Exception as e: logging.debug(f"metrics finalize_bucket on flush error (ignored): {e}") self.current_bar = None else: try: self.metrics.finalize_bucket(None) except Exception as e: logging.debug(f"metrics finalize_bucket on flush error (ignored): {e}") def update_orderbook(self, ob_update: OrderbookUpdate) -> None: """ Apply orderbook deltas and refresh OBI metrics. Call this frequently (on each OB update) so intra-bucket OBI highs/lows track the book. """ bids_updates = parse_levels_including_zeros(ob_update.bids) asks_updates = parse_levels_including_zeros(ob_update.asks) self.orderbook.apply_updates(bids_updates, asks_updates) total_bids, total_asks = self.orderbook.get_total_volume() try: self.metrics.update_obi_from_book(total_bids, total_asks) except Exception as e: logging.debug(f"OBI update error (ignored): {e}") # ----------------------- # UI-facing helpers # ----------------------- def get_metrics_series(self): """ Returns: { 'cvd': [[ts_start, ts_end, o, h, l, c, value_at_close], ...], 'obi': [[...], ...], 'atr': [atr_value_per_bar, ...] } """ try: return self.metrics.get_series() except Exception: return {}