# metrics_calculator.py
import logging
from collections import deque
from typing import Deque, Dict, List, Optional

logger = logging.getLogger(__name__)


class MetricsCalculator:
    """Accumulate per-bucket OBI, CVD and ATR series.

    A bucket is opened with :meth:`begin_bucket`, fed through
    :meth:`update_obi_from_book` / :meth:`update_cvd_from_trade`, and closed
    with :meth:`finalize_bucket`.  Buckets without any activity can be
    recorded directly with :meth:`add_flat_bucket`.

    Every finalized or flat bucket contributes exactly one entry to each of
    the three series, so the series returned by :meth:`get_series` stay
    index-aligned.

    OBI and CVD rows have the layout
    ``[ts_start, ts_end, open, high, low, close, value_at_close]``.
    """

    def __init__(self, atr_period: int = 14) -> None:
        """Create an empty calculator.

        Args:
            atr_period: Number of most recent true-range values averaged
                into each ATR value.  Must be at least 1.

        Raises:
            ValueError: If ``atr_period`` is smaller than 1.
        """
        period = int(atr_period)
        if period < 1:
            raise ValueError(f"atr_period must be >= 1, got {atr_period!r}")

        # ----- OBI (value at close) -----
        self._obi_value = 0.0

        # ----- CVD (bucket-local delta, TradingView-style) -----
        self._bucket_buy = 0.0
        self._bucket_sell = 0.0
        self._bucket_net = 0.0  # buy - sell within current bucket

        # --- per-bucket lifecycle state ---
        self._b_ts_start: Optional[int] = None
        self._b_ts_end: Optional[int] = None
        self._obi_o = self._obi_h = self._obi_l = self._obi_c = None
        self._cvd_o = self._cvd_h = self._cvd_l = self._cvd_c = None

        # final series rows:
        # [ts_start, ts_end, o, h, l, c, value_at_close]
        self._series_obi: List[List[float]] = []
        self._series_cvd: List[List[float]] = []

        # ----- ATR -----
        self._atr_period: int = period
        self._prev_close: Optional[float] = None
        # Last N true-range values; the deque evicts the oldest automatically.
        self._tr_window: Deque[float] = deque(maxlen=period)
        self._series_atr: List[float] = []  # one value per finalized bucket

    # ------------------------------
    # CVD (bucket-local delta)
    # ------------------------------
    def update_cvd_from_trade(self, side: str, size: float) -> None:
        """Accumulate buy/sell within the *current* bucket (TV-style volume delta).

        Trades arriving while no bucket is open are ignored; the processor is
        expected to call :meth:`begin_bucket` first.  A side other than
        ``"buy"`` or ``"sell"`` is logged and ignored.

        Args:
            side: ``"buy"`` or ``"sell"``.
            size: Traded size; converted with ``float()``.
        """
        if self._b_ts_start is None:
            # bucket not open yet; processor should call begin_bucket first
            return

        amount = float(size)
        if side == "buy":
            self._bucket_buy += amount
        elif side == "sell":
            self._bucket_sell += amount
        else:
            logger.warning("Unknown trade side '%s', ignoring", side)
            return

        self._bucket_net = self._bucket_buy - self._bucket_sell
        net = self._bucket_net

        if self._cvd_o is None:
            # Defensive: begin_bucket normally seeds the OHLC at 0.0.
            self._cvd_o = 0.0
            self._cvd_h = self._cvd_l = net
        else:
            self._cvd_h = max(self._cvd_h, net)
            self._cvd_l = min(self._cvd_l, net)
        self._cvd_c = net

    # ------------------------------
    # OBI
    # ------------------------------
    def update_obi_from_book(self, total_bids: float, total_asks: float) -> None:
        """Record the current order-book imbalance (``total_bids - total_asks``).

        The latest value is always stored; the bucket's OBI open/high/low/close
        is only updated while a bucket is open.
        """
        self._obi_value = float(total_bids - total_asks)
        if self._b_ts_start is None:
            return

        value = self._obi_value
        if self._obi_o is None:
            self._obi_o = self._obi_h = self._obi_l = self._obi_c = value
        else:
            self._obi_h = max(self._obi_h, value)
            self._obi_l = min(self._obi_l, value)
            self._obi_c = value

    # ------------------------------
    # ATR helpers
    # ------------------------------
    def _update_atr_from_bar(self, high: float, low: float, close: float) -> None:
        """Append a new ATR value computed from one finalized bar.

        The true range is ``high - low`` for the first bar, and afterwards the
        maximum of ``high - low``, ``|high - prev_close|`` and
        ``|low - prev_close|``.  The ATR is the arithmetic mean of the true
        ranges currently held in the window.

        Raises:
            TypeError, ValueError: If an input cannot be converted to float.
                No state is modified in that case.
        """
        # Convert everything up front so a bad input cannot leave the window
        # and the series half-updated.
        hi, lo, cl = float(high), float(low), float(close)

        true_range = hi - lo
        if self._prev_close is not None:
            prev = float(self._prev_close)
            true_range = max(true_range, abs(hi - prev), abs(lo - prev))

        self._tr_window.append(true_range)
        self._series_atr.append(sum(self._tr_window) / len(self._tr_window))
        self._prev_close = cl

    def _carry_atr_forward(self) -> None:
        """Repeat the last ATR value (0.0 if none yet) to keep series aligned."""
        last_atr = self._series_atr[-1] if self._series_atr else 0.0
        self._series_atr.append(float(last_atr))

    # ------------------------------
    # Bucket lifecycle
    # ------------------------------
    def begin_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None:
        """Open a new bucket spanning ``[ts_start_ms, ts_end_ms]``.

        OBI open/high/low/close start at the latest known OBI value; the CVD
        accumulators and CVD open/high/low/close are reset to zero.
        """
        self._b_ts_start = int(ts_start_ms)
        self._b_ts_end = int(ts_end_ms)

        # OBI opens at current value
        self._obi_o = self._obi_h = self._obi_l = self._obi_c = self._obi_value

        # CVD resets each bucket
        self._bucket_buy = self._bucket_sell = self._bucket_net = 0.0
        self._cvd_o = self._cvd_h = self._cvd_l = self._cvd_c = 0.0

    def finalize_bucket(self, bar: Optional[dict] = None) -> None:
        """Close the open bucket and append one row to every series.

        Does nothing when no bucket is open.

        Args:
            bar: Optional mapping with ``"high"``, ``"low"`` and ``"close"``
                entries used to update the ATR.  When it is ``None`` or
                malformed, the previous ATR value is carried forward so the
                ATR series keeps one entry per bucket.
        """
        if self._b_ts_start is None or self._b_ts_end is None:
            return

        ts_start, ts_end = self._b_ts_start, self._b_ts_end

        # OBI row: missing OHLC components fall back to the latest OBI value.
        obi_now = self._obi_value
        obi_ohlc = [
            float(obi_now if part is None else part)
            for part in (self._obi_o, self._obi_h, self._obi_l, self._obi_c)
        ]
        self._series_obi.append([ts_start, ts_end, *obi_ohlc, float(obi_now)])

        # CVD row (bucket-local delta): missing components fall back to 0.0.
        cvd_ohlc = [
            float(0.0 if part is None else part)
            for part in (self._cvd_o, self._cvd_h, self._cvd_l, self._cvd_c)
        ]
        self._series_cvd.append(
            [ts_start, ts_end, *cvd_ohlc, float(self._bucket_net)]
        )

        # ATR from the finalized OHLC bar; best-effort, but always one entry.
        if bar is None:
            self._carry_atr_forward()
        else:
            try:
                self._update_atr_from_bar(bar["high"], bar["low"], bar["close"])
            except (LookupError, TypeError, ValueError, OverflowError) as exc:
                logger.debug("ATR update error (ignored): %s", exc)
                self._carry_atr_forward()

        # reset state
        self._b_ts_start = self._b_ts_end = None
        self._obi_o = self._obi_h = self._obi_l = self._obi_c = None
        self._cvd_o = self._cvd_h = self._cvd_l = self._cvd_c = None

    def add_flat_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None:
        """Append a bucket with no activity to every series.

        OBI stays flat at the latest known value, CVD is flat at zero (no
        trades in this bucket), and the last ATR is carried forward (0.0 when
        no ATR has been produced yet).
        """
        ts_start, ts_end = int(ts_start_ms), int(ts_end_ms)

        # OBI flat
        obi = float(self._obi_value)
        self._series_obi.append([ts_start, ts_end, obi, obi, obi, obi, obi])

        # CVD flat at zero (no trades in this bucket)
        self._series_cvd.append([ts_start, ts_end, 0.0, 0.0, 0.0, 0.0, 0.0])

        # ATR: carry last ATR forward if any, else 0.0
        self._carry_atr_forward()

    # ------------------------------
    # Output
    # ------------------------------
    def get_series(self) -> Dict[str, list]:
        """Return the accumulated series keyed by ``'cvd'``, ``'obi'``, ``'atr'``.

        The returned lists are the calculator's own lists, not copies.
        """
        return {
            'cvd': self._series_cvd,
            'obi': self._series_obi,
            'atr': self._series_atr,
        }