# orderflow_backtest/metrics_calculator.py
#
# 164 lines
# 6.2 KiB
# Python

# metrics_calculator.py
import logging
from typing import List, Optional
class MetricsCalculator:
    """Build per-bucket OBI, CVD and ATR series from book, trade and bar updates.

    Usage per time bucket: call ``begin_bucket``, feed ``update_obi_from_book``
    and ``update_cvd_from_trade`` while the bucket is open, then call
    ``finalize_bucket``. Buckets without any activity can be recorded with
    ``add_flat_bucket``. Every finalized or flat bucket contributes exactly one
    entry to each of the three series returned by ``get_series``.

    * OBI is ``total_bids - total_asks`` (not normalized).
    * CVD is bucket-local: buy volume minus sell volume, reset at every
      ``begin_bucket`` (TV-style volume delta).
    * ATR is the arithmetic mean of the last ``atr_period`` true-range values.
    """

    def __init__(self, atr_period: int = 14):
        """Create an empty calculator.

        Args:
            atr_period: Number of true-range values averaged into each ATR
                point. Defaults to 14.

        Raises:
            ValueError: If ``atr_period`` is smaller than 1.
        """
        period = int(atr_period)
        if period < 1:
            raise ValueError(f"atr_period must be >= 1, got {atr_period!r}")

        # ----- OBI (value at close) -----
        self._obi_value = 0.0

        # ----- CVD (bucket-local delta, TradingView-style) -----
        self._bucket_buy = 0.0
        self._bucket_sell = 0.0
        self._bucket_net = 0.0  # buy - sell within current bucket

        # --- per-bucket lifecycle state (None while no bucket is open) ---
        self._b_ts_start: Optional[int] = None
        self._b_ts_end: Optional[int] = None
        self._obi_o = self._obi_h = self._obi_l = self._obi_c = None
        self._cvd_o = self._cvd_h = self._cvd_l = self._cvd_c = None

        # final series rows:
        # [ts_start, ts_end, o, h, l, c, value_at_close]
        self._series_obi: List[List[float]] = []
        self._series_cvd: List[List[float]] = []

        # ----- ATR -----
        self._atr_period: int = period
        self._prev_close: Optional[float] = None
        self._tr_window: List[float] = []  # last N TR values
        self._series_atr: List[float] = []  # one value per finalized bucket

    # ------------------------------
    # CVD (bucket-local delta)
    # ------------------------------
    def update_cvd_from_trade(self, side: str, size: float) -> None:
        """Accumulate buy/sell within the *current* bucket (TV-style volume delta).

        Args:
            side: ``"buy"`` or ``"sell"`` (case-sensitive). Any other value is
                logged as a warning and ignored.
            size: Trade size; converted with ``float``.

        Trades received while no bucket is open are dropped silently; the
        caller is expected to call ``begin_bucket`` first.
        """
        if self._b_ts_start is None:
            # bucket not open yet; processor should call begin_bucket first
            return

        amount = float(size)
        if side == "buy":
            self._bucket_buy += amount
        elif side == "sell":
            self._bucket_sell += amount
        else:
            logging.warning("Unknown trade side '%s', ignoring", side)
            return

        self._bucket_net = self._bucket_buy - self._bucket_sell
        delta = self._bucket_net

        if self._cvd_o is None:
            # Defensive path (begin_bucket normally seeds these): the delta
            # opens at zero, so high/low must include zero to stay consistent
            # with the open.
            self._cvd_o = self._cvd_h = self._cvd_l = 0.0

        self._cvd_h = max(self._cvd_h, delta)
        self._cvd_l = min(self._cvd_l, delta)
        self._cvd_c = delta

    # ------------------------------
    # OBI
    # ------------------------------
    def update_obi_from_book(self, total_bids: float, total_asks: float) -> None:
        """Record the current order-book imbalance ``total_bids - total_asks``.

        The latest value is always stored, even when no bucket is open, so the
        next ``begin_bucket`` opens at it. While a bucket is open its OBI
        high/low/close are updated as well.
        """
        self._obi_value = float(total_bids - total_asks)
        if self._b_ts_start is None:
            return

        value = self._obi_value
        if self._obi_o is None:
            self._obi_o = self._obi_h = self._obi_l = self._obi_c = value
        else:
            self._obi_h = max(self._obi_h, value)
            self._obi_l = min(self._obi_l, value)
            self._obi_c = value

    # ------------------------------
    # ATR helpers
    # ------------------------------
    def _update_atr_from_bar(self, high: float, low: float, close: float) -> None:
        """Append one ATR value computed from a finished price bar.

        True range is ``high - low`` for the first bar; afterwards it is the
        maximum of ``high - low``, ``|high - prev_close|`` and
        ``|low - prev_close|``. The ATR value is the mean of the true ranges
        currently held in the window (at most ``atr_period`` of them).

        Raises:
            TypeError, ValueError: If any input cannot be converted to float.
                Nothing is mutated in that case.
        """
        # Convert everything up front so a bad value cannot leave the window
        # and the series updated while the previous close is not.
        hi, lo, cl = float(high), float(low), float(close)

        true_range = hi - lo
        if self._prev_close is not None:
            prev = float(self._prev_close)
            true_range = max(true_range, abs(hi - prev), abs(lo - prev))

        self._tr_window.append(true_range)
        if len(self._tr_window) > self._atr_period:
            del self._tr_window[0]

        self._series_atr.append(sum(self._tr_window) / len(self._tr_window))
        self._prev_close = cl

    def _carry_atr_forward(self) -> None:
        """Repeat the last ATR value (0.0 if none yet) to keep series aligned."""
        previous = self._series_atr[-1] if self._series_atr else 0.0
        self._series_atr.append(float(previous))

    # ------------------------------
    # Bucket lifecycle
    # ------------------------------
    def begin_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None:
        """Open a bucket spanning ``ts_start_ms``..``ts_end_ms``.

        OBI open/high/low/close start at the most recent OBI value; all CVD
        accumulators and the CVD open/high/low/close restart at zero.
        """
        self._b_ts_start = int(ts_start_ms)
        self._b_ts_end = int(ts_end_ms)

        # OBI opens at current value
        self._obi_o = self._obi_h = self._obi_l = self._obi_c = self._obi_value

        # CVD resets each bucket
        self._bucket_buy = self._bucket_sell = self._bucket_net = 0.0
        self._cvd_o = self._cvd_h = self._cvd_l = self._cvd_c = 0.0

    def finalize_bucket(self, bar: Optional[dict] = None) -> None:
        """Close the open bucket and append one row to every series.

        Args:
            bar: Optional mapping with ``"high"``, ``"low"`` and ``"close"``
                entries used for the ATR update. When it is omitted, or the
                values are missing/unusable, the previous ATR value is carried
                forward so the ATR series keeps one entry per bucket.

        Does nothing when no bucket is open.
        """
        if self._b_ts_start is None or self._b_ts_end is None:
            return

        # OBI row; unset fields fall back to the latest OBI value
        obi_now = float(self._obi_value)
        obi_ohlc = [
            obi_now if field is None else float(field)
            for field in (self._obi_o, self._obi_h, self._obi_l, self._obi_c)
        ]
        self._series_obi.append(
            [self._b_ts_start, self._b_ts_end, *obi_ohlc, obi_now]
        )

        # CVD row (bucket-local delta); unset fields fall back to zero
        cvd_ohlc = [
            0.0 if field is None else float(field)
            for field in (self._cvd_o, self._cvd_h, self._cvd_l, self._cvd_c)
        ]
        self._series_cvd.append(
            [self._b_ts_start, self._b_ts_end, *cvd_ohlc, float(self._bucket_net)]
        )

        # ATR from the finalized OHLC bar (best effort, never raises here)
        if bar is None:
            self._carry_atr_forward()
        else:
            try:
                self._update_atr_from_bar(bar["high"], bar["low"], bar["close"])
            except (LookupError, TypeError, ValueError) as exc:
                logging.warning(
                    "ATR update failed, carrying previous value forward: %s", exc
                )
                self._carry_atr_forward()

        # reset state
        self._b_ts_start = self._b_ts_end = None
        self._obi_o = self._obi_h = self._obi_l = self._obi_c = None
        self._cvd_o = self._cvd_h = self._cvd_l = self._cvd_c = None

    def add_flat_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None:
        """Append rows for a bucket that saw no activity.

        OBI is flat at the latest OBI value, CVD is flat at zero, and the last
        ATR value (0.0 if none) is repeated. Open-bucket state is not touched.
        """
        start, end = int(ts_start_ms), int(ts_end_ms)

        # OBI flat
        obi_now = float(self._obi_value)
        self._series_obi.append([start, end, obi_now, obi_now, obi_now, obi_now, obi_now])

        # CVD flat at zero (no trades in this bucket)
        self._series_cvd.append([start, end, 0.0, 0.0, 0.0, 0.0, 0.0])

        # ATR: carry last ATR forward if any, else 0.0
        self._carry_atr_forward()

    # ------------------------------
    # Output
    # ------------------------------
    def get_series(self):
        """Return the accumulated series keyed by ``'cvd'``, ``'obi'``, ``'atr'``.

        The returned lists are the calculator's own lists, not copies.
        """
        return {'cvd': self._series_cvd, 'obi': self._series_obi, 'atr': self._series_atr}