"""Core data models for orderbook reconstruction and backtesting. This module defines lightweight data structures for orderbook levels, trades, book snapshots, and the in-memory `Book` container used by the `Storage` layer. """ from dataclasses import dataclass, field from typing import Dict, List @dataclass(slots=True) class OrderbookLevel: """Represents a single price level on one side of the orderbook. Attributes: price: Price level for the orderbook entry. size: Total size at this price level. liquidation_count: Number of liquidations at this level. order_count: Number of resting orders at this level. """ price: float size: float liquidation_count: int order_count: int @dataclass(slots=True) class Trade: """Represents a single trade event.""" id: int trade_id: float price: float size: float side: str timestamp: int @dataclass(slots=True) class Metric: """Represents calculated metrics for a snapshot.""" snapshot_id: int timestamp: int obi: float cvd: float best_bid: float | None = None best_ask: float | None = None @dataclass class BookSnapshot: """In-memory representation of an orderbook state at a specific timestamp.""" id: int = 0 timestamp: int = 0 bids: Dict[float, OrderbookLevel] = field(default_factory=dict) asks: Dict[float, OrderbookLevel] = field(default_factory=dict) trades: List[Trade] = field(default_factory=list) class Book: """Container for managing orderbook snapshots and their evolution over time.""" def __init__(self) -> None: """Initialize an empty book.""" self.snapshots: List[BookSnapshot] = [] self.first_timestamp = 0 self.last_timestamp = 0 def add_snapshot(self, snapshot: BookSnapshot) -> None: """Add a snapshot to the book's history and update time bounds.""" self.snapshots.append(snapshot) if self.first_timestamp == 0 or snapshot.timestamp < self.first_timestamp: self.first_timestamp = snapshot.timestamp if snapshot.timestamp > self.last_timestamp: self.last_timestamp = snapshot.timestamp def create_snapshot(self, id: int, timestamp: int) -> BookSnapshot: """Create a new snapshot, add it to history, and return it. Copies bids/asks/trades from the previous snapshot to maintain continuity. """ prev_snapshot = self.snapshots[-1] if self.snapshots else BookSnapshot() snapshot = BookSnapshot( id=id, timestamp=timestamp, bids={ k: OrderbookLevel( price=v.price, size=v.size, liquidation_count=v.liquidation_count, order_count=v.order_count, ) for k, v in prev_snapshot.bids.items() }, asks={ k: OrderbookLevel( price=v.price, size=v.size, liquidation_count=v.liquidation_count, order_count=v.order_count, ) for k, v in prev_snapshot.asks.items() }, trades=prev_snapshot.trades.copy() if prev_snapshot.trades else [], ) self.add_snapshot(snapshot) return snapshot class MetricCalculator: """Calculator for OBI and CVD metrics from orderbook snapshots and trades.""" @staticmethod def calculate_obi(snapshot: BookSnapshot) -> float: """Calculate Order Book Imbalance for a snapshot. Formula: OBI = (Vb - Va) / (Vb + Va) Where Vb = total bid volume, Va = total ask volume Args: snapshot: BookSnapshot containing bids and asks data. Returns: OBI value between -1 and 1, or 0.0 if no volume. """ # Calculate total bid volume vb = sum(level.size for level in snapshot.bids.values()) # Calculate total ask volume va = sum(level.size for level in snapshot.asks.values()) # Handle edge case where total volume is zero if vb + va == 0: return 0.0 # Calculate OBI using standard formula obi = (vb - va) / (vb + va) # Ensure result is within expected bounds [-1, 1] return max(-1.0, min(1.0, obi)) @staticmethod def get_best_bid_ask(snapshot: BookSnapshot) -> tuple[float | None, float | None]: """Extract best bid and ask prices from a snapshot. Args: snapshot: BookSnapshot containing bids and asks data. Returns: Tuple of (best_bid, best_ask) or (None, None) if no data. """ best_bid = max(snapshot.bids.keys()) if snapshot.bids else None best_ask = min(snapshot.asks.keys()) if snapshot.asks else None return best_bid, best_ask @staticmethod def calculate_volume_delta(trades: List[Trade]) -> float: """Calculate Volume Delta for a list of trades. Volume Delta = Buy Volume - Sell Volume Buy trades (side = "buy") contribute positive volume Sell trades (side = "sell") contribute negative volume Args: trades: List of Trade objects for a specific timestamp. Returns: Volume delta value (can be positive, negative, or zero). """ buy_volume = sum(trade.size for trade in trades if trade.side == "buy") sell_volume = sum(trade.size for trade in trades if trade.side == "sell") return buy_volume - sell_volume @staticmethod def calculate_cvd(previous_cvd: float, volume_delta: float) -> float: """Calculate Cumulative Volume Delta. CVD_t = CVD_{t-1} + Volume_Delta_t Args: previous_cvd: Previous CVD value (use 0.0 for reset or first calculation). volume_delta: Current volume delta to add. Returns: New cumulative volume delta value. """ return previous_cvd + volume_delta