16 KiB

API Documentation

Overview

This document provides comprehensive API documentation for the Orderflow Backtest System, including public interfaces, data models, and usage examples.

Core Data Models

OrderbookLevel

Represents a single price level in the orderbook.

@dataclass(slots=True)
class OrderbookLevel:
    price: float                # Price level
    size: float                 # Total size at this price
    liquidation_count: int      # Number of liquidations
    order_count: int           # Number of resting orders

Example:

level = OrderbookLevel(
    price=50000.0,
    size=10.5,
    liquidation_count=0,
    order_count=3
)

Trade

Represents a single trade execution.

@dataclass(slots=True)
class Trade:
    id: int                    # Unique trade identifier
    trade_id: float           # Exchange trade ID
    price: float              # Execution price
    size: float               # Trade size
    side: str                 # "buy" or "sell"
    timestamp: int            # Unix timestamp

Example:

trade = Trade(
    id=1,
    trade_id=123456.0,
    price=50000.0,
    size=0.5,
    side="buy",
    timestamp=1640995200
)

BookSnapshot

Complete orderbook state at a specific timestamp.

@dataclass
class BookSnapshot:
    id: int                                              # Snapshot identifier
    timestamp: int                                       # Unix timestamp
    bids: Dict[float, OrderbookLevel]                   # Bid side levels
    asks: Dict[float, OrderbookLevel]                   # Ask side levels
    trades: List[Trade]                                 # Associated trades

Example:

snapshot = BookSnapshot(
    id=1,
    timestamp=1640995200,
    bids={
        50000.0: OrderbookLevel(50000.0, 10.0, 0, 1),
        49999.0: OrderbookLevel(49999.0, 5.0, 0, 1)
    },
    asks={
        50001.0: OrderbookLevel(50001.0, 3.0, 0, 1),
        50002.0: OrderbookLevel(50002.0, 2.0, 0, 1)
    },
    trades=[]
)

Metric

Calculated financial metrics for a snapshot.

@dataclass(slots=True)
class Metric:
    snapshot_id: int           # Reference to source snapshot
    timestamp: int             # Unix timestamp
    obi: float                # Order Book Imbalance [-1, 1]
    cvd: float                # Cumulative Volume Delta
    best_bid: float | None    # Best bid price
    best_ask: float | None    # Best ask price

Example:

metric = Metric(
    snapshot_id=1,
    timestamp=1640995200,
    obi=0.333,
    cvd=150.5,
    best_bid=50000.0,
    best_ask=50001.0
)

MetricCalculator API

Static class providing financial metric calculations.

calculate_obi()

@staticmethod
def calculate_obi(snapshot: BookSnapshot) -> float:
    """
    Calculate Order Book Imbalance.
    
    Formula: OBI = (Vb - Va) / (Vb + Va)
    
    Args:
        snapshot: BookSnapshot with bids and asks
        
    Returns:
        float: OBI value between -1 and 1
        
    Example:
        >>> obi = MetricCalculator.calculate_obi(snapshot)
        >>> print(f"OBI: {obi:.3f}")
        OBI: 0.333
    """

calculate_volume_delta()

@staticmethod
def calculate_volume_delta(trades: List[Trade]) -> float:
    """
    Calculate Volume Delta for trades.
    
    Formula: VD = Buy Volume - Sell Volume
    
    Args:
        trades: List of Trade objects
        
    Returns:
        float: Net volume delta
        
    Example:
        >>> vd = MetricCalculator.calculate_volume_delta(trades)
        >>> print(f"Volume Delta: {vd}")
        Volume Delta: 7.5
    """

calculate_cvd()

@staticmethod
def calculate_cvd(previous_cvd: float, volume_delta: float) -> float:
    """
    Calculate Cumulative Volume Delta.
    
    Formula: CVD_t = CVD_{t-1} + VD_t
    
    Args:
        previous_cvd: Previous CVD value
        volume_delta: Current volume delta
        
    Returns:
        float: New CVD value
        
    Example:
        >>> cvd = MetricCalculator.calculate_cvd(100.0, 7.5)
        >>> print(f"CVD: {cvd}")
        CVD: 107.5
    """

get_best_bid_ask()

@staticmethod
def get_best_bid_ask(snapshot: BookSnapshot) -> tuple[float | None, float | None]:
    """
    Extract best bid and ask prices.
    
    Args:
        snapshot: BookSnapshot with bids and asks
        
    Returns:
        tuple: (best_bid, best_ask) or (None, None)
        
    Example:
        >>> best_bid, best_ask = MetricCalculator.get_best_bid_ask(snapshot)
        >>> print(f"Spread: {best_ask - best_bid}")
        Spread: 1.0
    """

Repository APIs

SQLiteOrderflowRepository

Read-only repository for orderbook and trades data.

connect()

def connect(self) -> sqlite3.Connection:
    """
    Create optimized SQLite connection.
    
    Returns:
        sqlite3.Connection: Configured database connection
        
    Example:
        >>> repo = SQLiteOrderflowRepository(db_path)
        >>> with repo.connect() as conn:
        ...     # Use connection
    """

load_trades_by_timestamp()

def load_trades_by_timestamp(self, conn: sqlite3.Connection) -> Dict[int, List[Trade]]:
    """
    Load all trades grouped by timestamp.
    
    Args:
        conn: Active database connection
        
    Returns:
        Dict[int, List[Trade]]: Trades grouped by timestamp
        
    Example:
        >>> trades_by_ts = repo.load_trades_by_timestamp(conn)
        >>> trades_at_1000 = trades_by_ts.get(1000, [])
    """

iterate_book_rows()

def iterate_book_rows(self, conn: sqlite3.Connection) -> Iterator[Tuple[int, str, str, int]]:
    """
    Memory-efficient iteration over orderbook rows.
    
    Args:
        conn: Active database connection
        
    Yields:
        Tuple[int, str, str, int]: (id, bids_text, asks_text, timestamp)
        
    Example:
        >>> for row_id, bids, asks, ts in repo.iterate_book_rows(conn):
        ...     # Process row
    """

SQLiteMetricsRepository

Write-enabled repository for metrics storage and retrieval.

create_metrics_table()

def create_metrics_table(self, conn: sqlite3.Connection) -> None:
    """
    Create metrics table with indexes.
    
    Args:
        conn: Active database connection
        
    Raises:
        sqlite3.Error: If table creation fails
        
    Example:
        >>> repo.create_metrics_table(conn)
        >>> # Metrics table now available
    """

insert_metrics_batch()

def insert_metrics_batch(self, conn: sqlite3.Connection, metrics: List[Metric]) -> None:
    """
    Insert metrics in batch for performance.
    
    Args:
        conn: Active database connection
        metrics: List of Metric objects to insert
        
    Example:
        >>> metrics = [Metric(...), Metric(...)]
        >>> repo.insert_metrics_batch(conn, metrics)
        >>> conn.commit()
    """

load_metrics_by_timerange()

def load_metrics_by_timerange(
    self, 
    conn: sqlite3.Connection, 
    start_timestamp: int, 
    end_timestamp: int
) -> List[Metric]:
    """
    Load metrics within time range.
    
    Args:
        conn: Active database connection
        start_timestamp: Start time (inclusive)
        end_timestamp: End time (inclusive)
        
    Returns:
        List[Metric]: Metrics ordered by timestamp
        
    Example:
        >>> metrics = repo.load_metrics_by_timerange(conn, 1000, 2000)
        >>> print(f"Loaded {len(metrics)} metrics")
    """

Storage API

Storage

High-level data processing orchestrator.

init()

def __init__(self, instrument: str) -> None:
    """
    Initialize storage for specific instrument.
    
    Args:
        instrument: Trading pair identifier (e.g., "BTC-USDT")
        
    Example:
        >>> storage = Storage("BTC-USDT")
    """

build_booktick_from_db()

def build_booktick_from_db(self, db_path: Path, db_date: datetime) -> None:
    """
    Process database and calculate metrics.
    
    This is the main processing pipeline that:
    1. Loads orderbook and trades data
    2. Calculates OBI and CVD metrics per snapshot
    3. Stores metrics in database
    4. Populates book with snapshots
    
    Args:
        db_path: Path to SQLite database file
        db_date: Date for this database (informational)
        
    Example:
        >>> storage.build_booktick_from_db(Path("data.db"), datetime.now())
        >>> print(f"Processed {len(storage.book.snapshots)} snapshots")
    """

Strategy API

DefaultStrategy

Trading strategy with metrics analysis capabilities.

init()

def __init__(self, instrument: str) -> None:
    """
    Initialize strategy for instrument.
    
    Args:
        instrument: Trading pair identifier
        
    Example:
        >>> strategy = DefaultStrategy("BTC-USDT")
    """

set_db_path()

def set_db_path(self, db_path: Path) -> None:
    """
    Configure database path for metrics access.
    
    Args:
        db_path: Path to database with metrics
        
    Example:
        >>> strategy.set_db_path(Path("data.db"))
    """

load_stored_metrics()

def load_stored_metrics(self, start_timestamp: int, end_timestamp: int) -> List[Metric]:
    """
    Load stored metrics for analysis.
    
    Args:
        start_timestamp: Start of time range
        end_timestamp: End of time range
        
    Returns:
        List[Metric]: Metrics for specified range
        
    Example:
        >>> metrics = strategy.load_stored_metrics(1000, 2000)
        >>> latest_obi = metrics[-1].obi
    """

get_metrics_summary()

def get_metrics_summary(self, metrics: List[Metric]) -> dict:
    """
    Generate statistical summary of metrics.
    
    Args:
        metrics: List of metrics to analyze
        
    Returns:
        dict: Statistical summary with keys:
            - obi_min, obi_max, obi_avg
            - cvd_start, cvd_end, cvd_change
            - total_snapshots
            
    Example:
        >>> summary = strategy.get_metrics_summary(metrics)
        >>> print(f"OBI range: {summary['obi_min']:.3f} to {summary['obi_max']:.3f}")
    """

Visualizer API

Visualizer

Multi-chart visualization system.

init()

def __init__(self, window_seconds: int = 60, max_bars: int = 200) -> None:
    """
    Initialize visualizer with chart parameters.
    
    Args:
        window_seconds: OHLC aggregation window
        max_bars: Maximum bars to display
        
    Example:
        >>> visualizer = Visualizer(window_seconds=300, max_bars=1000)
    """

set_db_path()

def set_db_path(self, db_path: Path) -> None:
    """
    Configure database path for metrics loading.
    
    Args:
        db_path: Path to database with metrics
        
    Example:
        >>> visualizer.set_db_path(Path("data.db"))
    """

update_from_book()

def update_from_book(self, book: Book) -> None:
    """
    Update charts with book data and stored metrics.
    
    Creates 4-subplot layout:
    1. OHLC candlesticks
    2. Volume bars  
    3. OBI line chart
    4. CVD line chart
    
    Args:
        book: Book with snapshots for OHLC calculation
        
    Example:
        >>> visualizer.update_from_book(storage.book)
        >>> # Charts updated with latest data
    """

show()

def show() -> None:
    """
    Display interactive chart window.
    
    Example:
        >>> visualizer.show()
        >>> # Interactive Qt5 window opens
    """

Database Schema

Input Tables (Required)

These tables must exist in the SQLite database files:

book table

CREATE TABLE book (
    id INTEGER PRIMARY KEY,
    instrument TEXT,
    bids TEXT NOT NULL,        -- JSON array: [[price, size, liq_count, order_count], ...]
    asks TEXT NOT NULL,        -- JSON array: [[price, size, liq_count, order_count], ...]
    timestamp TEXT NOT NULL
);

trades table

CREATE TABLE trades (
    id INTEGER PRIMARY KEY,
    instrument TEXT,
    trade_id TEXT,
    price REAL NOT NULL,
    size REAL NOT NULL,
    side TEXT NOT NULL,        -- "buy" or "sell"
    timestamp TEXT NOT NULL
);

Output Table (Auto-created)

This table is automatically created by the system:

metrics table

CREATE TABLE metrics (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    snapshot_id INTEGER NOT NULL,
    timestamp TEXT NOT NULL,
    obi REAL NOT NULL,         -- Order Book Imbalance [-1, 1]
    cvd REAL NOT NULL,         -- Cumulative Volume Delta
    best_bid REAL,             -- Best bid price
    best_ask REAL,             -- Best ask price
    FOREIGN KEY (snapshot_id) REFERENCES book(id)
);

-- Performance indexes
CREATE INDEX idx_metrics_timestamp ON metrics(timestamp);
CREATE INDEX idx_metrics_snapshot_id ON metrics(snapshot_id);

Usage Examples

Complete Processing Workflow

from pathlib import Path
from datetime import datetime
from storage import Storage
from strategies import DefaultStrategy
from visualizer import Visualizer

# Initialize components
storage = Storage("BTC-USDT")
strategy = DefaultStrategy("BTC-USDT")
visualizer = Visualizer(window_seconds=60, max_bars=500)

# Process database
db_path = Path("data/BTC-USDT-25-06-09.db")
strategy.set_db_path(db_path)
visualizer.set_db_path(db_path)

# Build book and calculate metrics
storage.build_booktick_from_db(db_path, datetime.now())

# Analyze metrics
strategy.on_booktick(storage.book)

# Update visualization
visualizer.update_from_book(storage.book)
visualizer.show()

Metrics Analysis

# Load and analyze stored metrics
strategy = DefaultStrategy("BTC-USDT")
strategy.set_db_path(Path("data.db"))

# Get metrics for specific time range
metrics = strategy.load_stored_metrics(1640995200, 1640998800)

# Analyze metrics
summary = strategy.get_metrics_summary(metrics)
print(f"OBI Range: {summary['obi_min']:.3f} to {summary['obi_max']:.3f}")
print(f"CVD Change: {summary['cvd_change']:.1f}")

# Find significant imbalances
significant_obi = [m for m in metrics if abs(m.obi) > 0.2]
print(f"Found {len(significant_obi)} snapshots with >20% imbalance")

Custom Metric Calculations

from models import MetricCalculator

# Calculate metrics for single snapshot
obi = MetricCalculator.calculate_obi(snapshot)
best_bid, best_ask = MetricCalculator.get_best_bid_ask(snapshot)

# Calculate CVD over time
cvd = 0.0
for trades in trades_by_timestamp.values():
    volume_delta = MetricCalculator.calculate_volume_delta(trades)
    cvd = MetricCalculator.calculate_cvd(cvd, volume_delta)
    print(f"CVD: {cvd:.1f}")

Error Handling

Common Error Scenarios

Database Connection Issues

try:
    repo = SQLiteMetricsRepository(db_path)
    with repo.connect() as conn:
        metrics = repo.load_metrics_by_timerange(conn, start, end)
except sqlite3.Error as e:
    logging.error(f"Database error: {e}")
    metrics = []  # Fallback to empty list

Missing Metrics Table

repo = SQLiteMetricsRepository(db_path)
with repo.connect() as conn:
    if not repo.table_exists(conn, "metrics"):
        repo.create_metrics_table(conn)
        logging.info("Created metrics table")

Empty Data Handling

# All methods handle empty data gracefully
obi = MetricCalculator.calculate_obi(empty_snapshot)  # Returns 0.0
vd = MetricCalculator.calculate_volume_delta([])      # Returns 0.0
summary = strategy.get_metrics_summary([])           # Returns {}

This API documentation provides complete coverage of the public interfaces for the Orderflow Backtest System. For implementation details and architecture information, see the additional documentation in the docs/ directory.