orderflow_backtest/metrics_calculator.py

84 lines
3.0 KiB
Python
Raw Normal View History

2025-09-10 15:39:16 +08:00
import logging
from typing import Optional, List
2025-09-10 15:39:16 +08:00
class MetricsCalculator:
def __init__(self):
self.cvd_cumulative = 0.0
self.obi_value = 0.0
# --- per-bucket state ---
self._b_ts_start: Optional[int] = None
self._b_ts_end: Optional[int] = None
self._obi_o: Optional[float] = None
self._obi_h: Optional[float] = None
self._obi_l: Optional[float] = None
self._obi_c: Optional[float] = None
# final series rows: [ts_start, ts_end, obi_o, obi_h, obi_l, obi_c, cvd]
self._series: List[List[float]] = []
# ------------------------------
# CVD
# ------------------------------
2025-09-10 15:39:16 +08:00
def update_cvd_from_trade(self, side: str, size: float) -> None:
if side == "buy":
volume_delta = float(size)
elif side == "sell":
volume_delta = -float(size)
else:
logging.warning(f"Unknown trade side '{side}', treating as neutral")
volume_delta = 0.0
2025-09-10 15:39:16 +08:00
self.cvd_cumulative += volume_delta
# ------------------------------
# OBI
# ------------------------------
2025-09-10 15:39:16 +08:00
def update_obi_from_book(self, total_bids: float, total_asks: float) -> None:
self.obi_value = float(total_bids - total_asks)
# update H/L/C if a bucket is open
if self._b_ts_start is not None:
v = self.obi_value
if self._obi_o is None:
self._obi_o = self._obi_h = self._obi_l = self._obi_c = v
else:
self._obi_h = max(self._obi_h, v)
self._obi_l = min(self._obi_l, v)
self._obi_c = v
# ------------------------------
# Bucket lifecycle
# ------------------------------
def begin_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None:
self._b_ts_start = int(ts_start_ms)
self._b_ts_end = int(ts_end_ms)
v = float(self.obi_value)
self._obi_o = self._obi_h = self._obi_l = self._obi_c = v
def finalize_bucket(self) -> None:
if self._b_ts_start is None or self._b_ts_end is None:
return
o = float(self._obi_o if self._obi_o is not None else self.obi_value)
h = float(self._obi_h if self._obi_h is not None else self.obi_value)
l = float(self._obi_l if self._obi_l is not None else self.obi_value)
c = float(self._obi_c if self._obi_c is not None else self.obi_value)
self._series.append([
self._b_ts_start, self._b_ts_end, o, h, l, c, float(self.cvd_cumulative)
])
# reset
self._b_ts_start = self._b_ts_end = None
self._obi_o = self._obi_h = self._obi_l = self._obi_c = None
def add_flat_bucket(self, ts_start_ms: int, ts_end_ms: int) -> None:
v = float(self.obi_value)
self._series.append([
int(ts_start_ms), int(ts_end_ms),
v, v, v, v, float(self.cvd_cumulative)
])
# ------------------------------
# Output
# ------------------------------
def get_series(self):
return self._series