# metrics_calculator.py
|
|
import logging
|
|
from typing import List, Optional
|
|
|
|
|
|
class MetricsCalculator:
    """Build per-bucket OBI, CVD and ATR series from streaming updates.

    Lifecycle expected by the methods below: ``begin_bucket`` opens a time
    bucket, ``update_obi_from_book`` / ``update_cvd_from_trade`` feed it, and
    ``finalize_bucket`` emits one row per series and closes it.
    ``add_flat_bucket`` emits rows for a bucket that saw no activity.

    OBI and CVD rows have the layout
    ``[ts_start, ts_end, open, high, low, close, value_at_close]``.
    The ATR series holds one float per emitted bucket so that all three
    series returned by ``get_series`` stay index-aligned.

    Args:
        atr_period: Number of most recent true-range values averaged into
            each ATR value. Must be >= 1. Defaults to 14.

    Raises:
        ValueError: If ``atr_period`` is smaller than 1.
    """

    def __init__(self, atr_period: int = 14):
        atr_period = int(atr_period)
        if atr_period < 1:
            raise ValueError("atr_period must be >= 1")

        # ----- OBI (value at close) -----
        # Latest total_bids - total_asks seen, kept across buckets.
        self._obi_value = 0.0

        # ----- CVD (bucket-local delta, TradingView-style) -----
        self._bucket_buy = 0.0
        self._bucket_sell = 0.0
        self._bucket_net = 0.0  # buy - sell within current bucket

        # --- per-bucket lifecycle state ---
        # Both timestamps are None whenever no bucket is open.
        self._b_ts_start: Optional[int] = None
        self._b_ts_end: Optional[int] = None

        self._obi_o = self._obi_h = self._obi_l = self._obi_c = None
        self._cvd_o = self._cvd_h = self._cvd_l = self._cvd_c = None

        # final series rows:
        # [ts_start, ts_end, o, h, l, c, value_at_close]
        self._series_obi: List[List[float]] = []
        self._series_cvd: List[List[float]] = []

        # ----- ATR -----
        self._atr_period: int = atr_period
        self._prev_close: Optional[float] = None
        self._tr_window: List[float] = []  # last N TR values
        self._series_atr: List[float] = []  # one value per emitted bucket

    # ------------------------------
    # CVD (bucket-local delta)
    # ------------------------------
    def update_cvd_from_trade(self, side: str, size: float) -> None:
        """Accumulate buy/sell within the *current* bucket (TV-style volume delta).

        Args:
            side: ``"buy"`` or ``"sell"``; any other value is logged and ignored.
            size: Trade size; converted with ``float()``.

        Trades arriving while no bucket is open are dropped silently.
        """
        if self._b_ts_start is None:
            # bucket not open yet; processor should call begin_bucket first
            return

        amount = float(size)
        if side == "buy":
            self._bucket_buy += amount
        elif side == "sell":
            self._bucket_sell += amount
        else:
            logging.warning("Unknown trade side '%s', ignoring", side)
            return

        self._bucket_net = self._bucket_buy - self._bucket_sell
        net = self._bucket_net

        if self._cvd_o is None:
            # Defensive: begin_bucket normally seeds these. Start from the
            # same 0.0 baseline so low <= open <= high always holds.
            self._cvd_o = self._cvd_h = self._cvd_l = 0.0

        self._cvd_h = max(self._cvd_h, net)
        self._cvd_l = min(self._cvd_l, net)
        self._cvd_c = net

    # ------------------------------
    # OBI
    # ------------------------------
    def update_obi_from_book(self, total_bids: float, total_asks: float) -> None:
        """Record the latest OBI value (``total_bids - total_asks``).

        The value is always remembered; the open bucket's high/low/close are
        only updated when a bucket is currently open.
        """
        self._obi_value = float(total_bids - total_asks)
        if self._b_ts_start is None:
            return

        value = self._obi_value
        if self._obi_o is None:
            self._obi_o = self._obi_h = self._obi_l = self._obi_c = value
            return

        self._obi_h = max(self._obi_h, value)
        self._obi_l = min(self._obi_l, value)
        self._obi_c = value

    # ------------------------------
    # ATR helpers
    # ------------------------------
    def _update_atr_from_bar(self, high: float, low: float, close: float) -> None:
        """Append one ATR value computed from a finished price bar.

        True range is ``high - low`` for the first bar, otherwise the largest
        of ``high - low``, ``|high - prev_close|`` and ``|low - prev_close|``.
        ATR is the plain mean of the last ``atr_period`` true ranges.

        Raises:
            TypeError, ValueError, OverflowError: If an input cannot be
                converted with ``float()``. No state is modified in that case.
        """
        # Convert everything up front so a bad value cannot leave the window
        # and series updated while _prev_close stays stale.
        hi = float(high)
        lo = float(low)
        cl = float(close)

        bar_range = hi - lo
        if self._prev_close is None:
            true_range = bar_range
        else:
            prev = float(self._prev_close)
            true_range = max(bar_range, abs(hi - prev), abs(lo - prev))

        window = self._tr_window
        window.append(true_range)
        if len(window) > self._atr_period:
            window.pop(0)

        atr = (sum(window) / len(window)) if window else 0.0
        self._series_atr.append(atr)
        self._prev_close = cl

    def _carry_atr_forward(self) -> None:
        """Repeat the last ATR value (0.0 if none yet) to keep series aligned."""
        last_atr = self._series_atr[-1] if self._series_atr else 0.0
        self._series_atr.append(float(last_atr))

    # ------------------------------
    # Bucket lifecycle
    # ------------------------------
    def begin_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None:
        """Open a bucket spanning ``ts_start_ms``..``ts_end_ms``.

        OBI opens at the last known OBI value; CVD accumulators and its OHLC
        restart at zero. State of a bucket that was opened but never finalized
        is overwritten without emitting rows.
        """
        self._b_ts_start = int(ts_start_ms)
        self._b_ts_end = int(ts_end_ms)

        # OBI opens at current value
        self._obi_o = self._obi_h = self._obi_l = self._obi_c = self._obi_value

        # CVD resets each bucket
        self._bucket_buy = self._bucket_sell = self._bucket_net = 0.0
        self._cvd_o = self._cvd_h = self._cvd_l = self._cvd_c = 0.0

    def finalize_bucket(self, bar: Optional[dict] = None) -> None:
        """Emit one OBI row, one CVD row and one ATR value, then close the bucket.

        Args:
            bar: Optional mapping with ``"high"``, ``"low"`` and ``"close"``
                entries used for the ATR update. When it is ``None`` or
                unusable, the previous ATR value is carried forward so the
                ATR series keeps one entry per bucket.

        Does nothing when no bucket is open.
        """
        if self._b_ts_start is None or self._b_ts_end is None:
            return

        ts_start, ts_end = self._b_ts_start, self._b_ts_end

        # OBI row
        obi_now = float(self._obi_value)
        obi_ohlc = [
            obi_now if field is None else float(field)
            for field in (self._obi_o, self._obi_h, self._obi_l, self._obi_c)
        ]
        self._series_obi.append([ts_start, ts_end, *obi_ohlc, obi_now])

        # CVD row (bucket-local delta)
        cvd_ohlc = [
            0.0 if field is None else float(field)
            for field in (self._cvd_o, self._cvd_h, self._cvd_l, self._cvd_c)
        ]
        self._series_cvd.append([ts_start, ts_end, *cvd_ohlc, float(self._bucket_net)])

        # ATR from the finalized OHLC bar
        atr_updated = False
        if bar is not None:
            try:
                self._update_atr_from_bar(bar["high"], bar["low"], bar["close"])
            except (LookupError, TypeError, ValueError, OverflowError) as e:
                # Best effort: a malformed bar must not break the bucket.
                logging.debug("ATR update error (carrying last ATR forward): %s", e)
            else:
                atr_updated = True
        if not atr_updated:
            self._carry_atr_forward()

        # reset state
        self._b_ts_start = self._b_ts_end = None
        self._obi_o = self._obi_h = self._obi_l = self._obi_c = None
        self._cvd_o = self._cvd_h = self._cvd_l = self._cvd_c = None

    def add_flat_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None:
        """Emit rows for a bucket with no activity.

        OBI is flat at the last known value, CVD is flat at zero, and the
        last ATR value (or 0.0) is repeated.
        """
        start, end = int(ts_start_ms), int(ts_end_ms)

        # OBI flat
        obi = float(self._obi_value)
        self._series_obi.append([start, end, obi, obi, obi, obi, obi])

        # CVD flat at zero (no trades in this bucket)
        self._series_cvd.append([start, end, 0.0, 0.0, 0.0, 0.0, 0.0])

        # ATR: carry last ATR forward if any, else 0.0
        self._carry_atr_forward()

    # ------------------------------
    # Output
    # ------------------------------
    def get_series(self):
        """Return the accumulated series as ``{'cvd': ..., 'obi': ..., 'atr': ...}``.

        The returned lists are the internal ones, not copies.
        """
        return {'cvd': self._series_cvd, 'obi': self._series_obi, 'atr': self._series_atr}
|