orderflow_backtest/strategy.py

387 lines
16 KiB
Python

# strategy.py
import logging
from typing import List, Dict, Optional
from statistics import median
from datetime import datetime, timezone
class Strategy:
"""
Long-only CVD Divergence with ATR-based execution, fee-aware PnL, cooldown,
adaptive CVD strength, optional confirmation entry, and debug logging.
Configure logging in main.py, for example:
logging.basicConfig(
filename="strategy.log",
level=logging.DEBUG,
format="%(asctime)s %(levelname)s %(message)s"
)
"""
def __init__(
self,
# Core signal windows
lookback: int = 30,
min_volume_factor: float = 1.0,
# ATR & execution
atr_period: int = 14,
atr_mult_init: float = 2.0,
atr_mult_trail: float = 3.0,
breakeven_after_rr: float = 1.5,
min_bars_before_be: int = 2,
atr_min_rel_to_med: float = 1.0,
cooldown_bars: int = 3,
# Divergence strength thresholds
price_ll_min_atr: float = 0.05,
cvd_min_gap: float = 0.0, # if 0 → adaptive
cvd_gap_pct_of_range: float = 0.10, # 10% of rolling cumCVD range
# Entry confirmation
confirm_break_signal_high: bool = True,
# Fees
fee_rate: float = 0.002, # taker 0.20% per side
fee_rate_maker: float = 0.0008, # maker 0.08% per side
maker_entry: bool = False,
maker_exit: bool = False,
# Debug
debug: bool = False,
debug_level: int = 1, # 0=quiet, 1=key, 2=detail
debug_every_n_bars: int = 200,
):
# Params
self.lookback = lookback
self.min_volume_factor = min_volume_factor
self.atr_period = atr_period
self.atr_mult_init = atr_mult_init
self.atr_mult_trail = atr_mult_trail
self.breakeven_after_rr = breakeven_after_rr
self.min_bars_before_be = min_bars_before_be
self.atr_min_rel_to_med = atr_min_rel_to_med
self.cooldown_bars = cooldown_bars
self.price_ll_min_atr = price_ll_min_atr
self.cvd_min_gap = cvd_min_gap
self.cvd_gap_pct_of_range = cvd_gap_pct_of_range
self.confirm_break_signal_high = confirm_break_signal_high
self.fee_rate = fee_rate
self.fee_rate_maker = fee_rate_maker
self.maker_entry = maker_entry
self.maker_exit = maker_exit
self.debug = debug
self.debug_level = debug_level
self.debug_every_n_bars = debug_every_n_bars
# Runtime state
self._last_bar_i: int = 0
self._cum_cvd: List[float] = []
self._atr_vals: List[float] = []
self._in_position: bool = False
self._entry_price: float = 0.0
self._entry_i: int = -1
self._atr_at_entry: float = 0.0
self._stop: float = 0.0
self._pending_entry_i: Optional[int] = None
self._pending_from_signal_i: Optional[int] = None
self._signal_high: Optional[float] = None
self._cooldown_until_i: int = -1
self.trades: List[Dict] = []
# Debug counters
self._dbg = {
"bars_seen": 0,
"checked": 0,
"fail_div_price": 0,
"fail_div_cvd": 0,
"fail_vol": 0,
"fail_atr_regime": 0,
"fail_price_strength": 0,
"fail_cvd_strength": 0,
"pass_all": 0,
"signals_created": 0,
"signals_canceled_no_break": 0,
"skipped_cooldown": 0,
"entries": 0,
"trail_raises": 0,
"to_be": 0,
"exits_sl": 0,
"exits_be": 0,
"exits_eor": 0,
}
# ============ helpers ============
@staticmethod
def _fmt_ts(ms: int) -> str:
try:
return datetime.fromtimestamp(int(ms) / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return str(ms)
def _ensure_cum_cvd(self, cvd_rows: List[List[float]], upto_len: int) -> None:
while len(self._cum_cvd) < upto_len:
i = len(self._cum_cvd)
bucket_net = float(cvd_rows[i][6]) if i < len(cvd_rows) else 0.0
prev = self._cum_cvd[-1] if self._cum_cvd else 0.0
self._cum_cvd.append(prev + bucket_net)
def _ensure_atr(self, bars: List[Dict], upto_len: int, metrics_atr: Optional[List[float]]) -> None:
if metrics_atr and len(metrics_atr) >= upto_len:
self._atr_vals = [float(x) for x in metrics_atr[:upto_len]]
return
while len(self._atr_vals) < upto_len:
i = len(self._atr_vals)
if i == 0:
self._atr_vals.append(0.0)
continue
h = float(bars[i]["high"])
l = float(bars[i]["low"])
pc = float(bars[i-1]["close"])
tr = max(h - l, abs(h - pc), abs(l - pc))
if i < self.atr_period:
prev_sum = (self._atr_vals[-1] * (i - 1)) if i > 1 else 0.0
atr = (prev_sum + tr) / float(i)
else:
prev_atr = self._atr_vals[-1]
atr = (prev_atr * (self.atr_period - 1) + tr) / float(self.atr_period)
self._atr_vals.append(atr)
# ============ filters ============
def _volume_ok(self, bars, i):
if self.min_volume_factor <= 0:
return True
start = max(0, i - self.lookback)
past = [b["volume"] for b in bars[start:i]] or [0.0]
med_v = median(past)
return (med_v == 0) or (bars[i]["volume"] >= self.min_volume_factor * med_v)
def _atr_ok(self, i):
if i <= 0 or i >= len(self._atr_vals):
return False
start = max(0, i - self.lookback)
window = self._atr_vals[start:i] or [0.0]
med_atr = median(window)
return (med_atr == 0.0) or (self._atr_vals[i] >= self.atr_min_rel_to_med * med_atr)
def _adaptive_cvd_gap(self, i):
if self.cvd_min_gap > 0.0:
return self.cvd_min_gap
start = max(0, i - self.lookback)
window = self._cum_cvd[start:i] or [0.0]
rng = (max(window) - min(window)) if window else 0.0
return rng * self.cvd_gap_pct_of_range
def _is_bullish_divergence(self, bars, i):
if i < self.lookback:
return False
self._dbg["checked"] += 1
start = i - self.lookback
wl = [b["low"] for b in bars[start:i]]
win_low = min(wl) if wl else bars[i]["low"]
pr_ll = bars[i]["low"] < win_low
if not pr_ll:
self._dbg["fail_div_price"] += 1
return False
wcvd = self._cum_cvd[start:i] or [self._cum_cvd[i]]
win_cvd_min = min(wcvd) if wcvd else self._cum_cvd[i]
cvd_hl = self._cum_cvd[i] > win_cvd_min
if not cvd_hl:
self._dbg["fail_div_cvd"] += 1
return False
if not self._volume_ok(bars, i):
self._dbg["fail_vol"] += 1
return False
if not self._atr_ok(i):
self._dbg["fail_atr_regime"] += 1
return False
atr_i = self._atr_vals[i]
price_gap = (win_low - bars[i]["low"])
if price_gap < self.price_ll_min_atr * atr_i:
self._dbg["fail_price_strength"] += 1
return False
required_gap = self._adaptive_cvd_gap(i)
cvd_gap = (self._cum_cvd[i] - win_cvd_min)
if cvd_gap < required_gap:
self._dbg["fail_cvd_strength"] += 1
return False
self._dbg["pass_all"] += 1
return True
# ============ execution ============
def _net_breakeven_price(self):
f_entry = self.fee_rate_maker if self.maker_entry else self.fee_rate
f_exit = self.fee_rate_maker if self.maker_exit else self.fee_rate
return self._entry_price * ((1.0 + f_entry) / max(1e-12, (1.0 - f_exit)))
def _do_enter(self, bars, i):
b = bars[i]
atr = float(self._atr_vals[i]) if i < len(self._atr_vals) else 0.0
self._in_position = True
self._entry_price = float(b["open"])
self._entry_i = i
self._atr_at_entry = atr
self._stop = self._entry_price - self.atr_mult_init * atr
self._pending_entry_i = None
self._signal_high = None
self._pending_from_signal_i = None
self._dbg["entries"] += 1
logging.info(f"[ENTRY] ts={b['timestamp_start']} ({self._fmt_ts(b['timestamp_start'])}) "
f"price={self._entry_price:.2f} stop={self._stop:.2f} (ATR={atr:.2f})")
def _exit_with_fees(self, bars, i, exit_price, reason):
entry = self.trades[-1] if self.trades and self.trades[-1].get("exit_i") is None else None
if not entry:
entry = {"entry_i": self._entry_i, "entry_ts": bars[self._entry_i]["timestamp_start"] if self._entry_i >= 0 else None,
"entry_price": self._entry_price}
self.trades.append(entry)
entry_price = float(entry["entry_price"])
exit_price = float(exit_price)
fr_entry = self.fee_rate_maker if self.maker_entry else self.fee_rate
fr_exit = self.fee_rate_maker if self.maker_exit else self.fee_rate
pnl_gross = (exit_price / entry_price) - 1.0
net_factor = (exit_price * (1.0 - fr_exit)) / (entry_price * (1.0 + fr_entry))
pnl_net = net_factor - 1.0
if reason == "SL": self._dbg["exits_sl"] += 1
elif reason == "BE": self._dbg["exits_be"] += 1
elif reason == "EoR": self._dbg["exits_eor"] += 1
entry.update({
"exit_i": i, "exit_ts": bars[i]["timestamp_start"], "exit_price": exit_price,
"pnl_gross_pct": pnl_gross * 100.0, "pnl_net_pct": pnl_net * 100.0,
"fees_pct": (fr_entry + fr_exit) * 100.0, "reason": reason,
"fee_rate_entry": fr_entry, "fee_rate_exit": fr_exit,
})
logging.info(f"[EXIT {reason}] ts={bars[i]['timestamp_start']} ({self._fmt_ts(bars[i]['timestamp_start'])}) "
f"pnl_net={pnl_net*100:.2f}% (gross={pnl_gross*100:.2f}%, fee={(fr_entry+fr_exit)*100:.2f}%)")
# ============ main ============
def process(self, processor):
bars = processor.bars
series = processor.get_metrics_series()
cvd_rows = series.get("cvd", [])
metrics_atr = series.get("atr")
n = min(len(bars), len(cvd_rows))
if n <= self._last_bar_i:
return
self._ensure_cum_cvd(cvd_rows, n)
self._ensure_atr(bars, n, metrics_atr)
for i in range(self._last_bar_i, n):
b = bars[i]
self._dbg["bars_seen"] += 1
# periodic snapshot
if self.debug and self.debug_level >= 1 and (i % max(1, self.debug_every_n_bars) == 0):
atr_i = self._atr_vals[i] if i < len(self._atr_vals) else 0.0
logging.debug(f"[BAR] i={i} ts={b['timestamp_start']} "
f"O={b['open']:.2f} H={b['high']:.2f} L={b['low']:.2f} C={b['close']:.2f} "
f"V={b['volume']:.4f} ATR={atr_i:.2f} CUMCVD={self._cum_cvd[i]:.2f}")
# pending entry
if self._pending_entry_i is not None and i == self._pending_entry_i and not self._in_position:
if self.confirm_break_signal_high and self._signal_high is not None:
if b["high"] > self._signal_high:
logging.debug(f"[CONFIRM] i={i} broke signal_high={self._signal_high:.2f} with H={b['high']:.2f} → ENTER")
self._do_enter(bars, i)
else:
self._dbg["signals_canceled_no_break"] += 1
logging.debug(f"[CANCEL] i={i} no break of signal_high={self._signal_high:.2f} (H={b['high']:.2f})")
self._pending_entry_i = None
self._signal_high = None
self._pending_from_signal_i = None
else:
self._do_enter(bars, i)
# manage position
if self._in_position:
if b["low"] <= self._stop:
be_price = self._net_breakeven_price()
reason = "BE" if self._stop >= be_price else "SL"
self._exit_with_fees(bars, i, max(self._stop, b["low"]), reason)
self._in_position = False
self._cooldown_until_i = i + self.cooldown_bars
logging.debug(f"[COOLDN] start i={i} until={self._cooldown_until_i}")
continue
atr_i = self._atr_vals[i]
new_trail = b["close"] - self.atr_mult_trail * atr_i
if new_trail > self._stop:
self._dbg["trail_raises"] += 1
logging.debug(f"[TRAIL] i={i} stop {self._stop:.2f}{new_trail:.2f} (ATR={atr_i:.2f})")
self._stop = new_trail
if (i - self._entry_i) >= self.min_bars_before_be and self._atr_at_entry > 0.0:
if b["close"] >= self._entry_price + self.breakeven_after_rr * self._atr_at_entry:
be_price = self._net_breakeven_price()
self._stop = max(self._stop, be_price, b["close"] - self.atr_mult_trail * atr_i)
self._dbg["to_be"] += 1
logging.debug(f"[BE] i={i} set stop ≥ netBE={be_price:.2f} now stop={self._stop:.2f}")
# new signal
if not self._in_position:
if i < self._cooldown_until_i:
self._dbg["skipped_cooldown"] += 1
else:
if self._is_bullish_divergence(bars, i):
self._signal_high = b["high"]
self._pending_from_signal_i = i
self._pending_entry_i = i + 1
self._dbg["signals_created"] += 1
logging.debug(f"[SIGNAL] i={i} ts={b['timestamp_start']} signal_high={self._signal_high:.2f}")
self._last_bar_i = n
def on_finish(self, processor):
bars = processor.bars
if self._in_position and bars:
last_i = len(bars) - 1
last_close = float(bars[last_i]["close"])
self._exit_with_fees(bars, last_i, last_close, "EoR")
self._in_position = False
d = self._dbg
logging.info(
"[SUMMARY] "
f"bars={d['bars_seen']} checked={d['checked']} pass={d['pass_all']} "
f"fail_price_div={d['fail_div_price']} fail_cvd_div={d['fail_div_cvd']} "
f"fail_vol={d['fail_vol']} fail_atr_regime={d['fail_atr_regime']} "
f"fail_price_str={d['fail_price_strength']} fail_cvd_str={d['fail_cvd_strength']} "
f"signals={d['signals_created']} canceled_no_break={d['signals_canceled_no_break']} "
f"cooldown_skips={d['skipped_cooldown']} entries={d['entries']} "
f"trail_raises={d['trail_raises']} to_be={d['to_be']} "
f"exits_sl={d['exits_sl']} exits_be={d['exits_be']} exits_eor={d['exits_eor']}"
)
def get_stats(self) -> Dict:
done = [t for t in self.trades if "pnl_net_pct" in t]
total = len(done)
wins = [t for t in done if t["pnl_net_pct"] > 0]
avg_net = (sum(t["pnl_net_pct"] for t in done) / total) if total else 0.0
sum_net = sum(t["pnl_net_pct"] for t in done)
equity = 1.0
for t in done:
equity *= (1.0 + t["pnl_net_pct"] / 100.0)
compounded_net = (equity - 1.0) * 100.0
avg_gross = (sum(t.get("pnl_gross_pct", 0.0) for t in done) / total) if total else 0.0
total_fees = sum((t.get("fees_pct") or 0.0) for t in done)
return {
"trades": total,
"win_rate": (len(wins) / total if total else 0.0),
"avg_pnl_pct": avg_net,
"sum_return_pct": sum_net,
"compounded_return_pct": compounded_net,
"avg_pnl_gross_pct": avg_gross,
"total_fees_pct": total_fees
}