# orderflow_backtest/db_interpreter.py
#
# (Code-viewer metadata from the original paste: 212 lines, 7.6 KiB, Python.)

from dataclasses import dataclass
from math import inf
from pathlib import Path
from typing import Iterator, Tuple
import sqlite3
@dataclass(slots=True)
class OrderbookLevel:
"""
Represents a single price level in an orderbook.
Attributes:
price: Price level in quote currency
size: Total size available at this price level
liquidation_count: Number of liquidation orders at this level
order_count: Total number of orders at this level
"""
price: float
size: float
liquidation_count: int
order_count: int
class OrderbookUpdate:
"""
Container for a windowed orderbook update with temporal boundaries.
Represents an orderbook snapshot with a defined time window, used for
aggregating trades that occur within the same time period.
Attributes:
id: Unique identifier for this orderbook row
bids: List of bid price levels (highest price first)
asks: List of ask price levels (lowest price first)
timestamp: Window start time in milliseconds since epoch
end_timestamp: Window end time in milliseconds since epoch
"""
id: int
bids: str # JSON string representation of bid levels
asks: str # JSON string representation of ask levels
timestamp: str # String timestamp from database
end_timestamp: str # String end timestamp from database
def __init__(self, id: int, bids: str, asks: str, timestamp: str, end_timestamp: str) -> None:
"""
Initialize orderbook update with raw database values.
Args:
id: Database row identifier
bids: JSON string of bid levels from database
asks: JSON string of ask levels from database
timestamp: Timestamp string from database
end_timestamp: End timestamp string from database
"""
self.id = id
self.bids = bids
self.asks = asks
self.timestamp = timestamp
self.end_timestamp = end_timestamp
@dataclass(init=True, repr=True, eq=True)
class Trade:
    """
    A single executed trade.

    Attributes:
        id: Database identifier of the trade row
        trade_id: Identifier assigned by the exchange
        price: Execution price, in quote currency
        size: Executed quantity, in base currency
        side: Direction of the trade, described as "buy" or "sell" (not validated here)
        timestamp: Execution time, described as milliseconds since epoch
    """

    id: int  # database row id
    trade_id: str  # exchange-side identifier
    price: float  # execution price
    size: float  # executed quantity
    side: str  # trade direction
    timestamp: int  # execution time
class DBInterpreter:
"""
Provides efficient streaming access to SQLite orderbook and trade data.
Handles batch reading from SQLite databases with optimized PRAGMA settings
for read-only access. Uses temporal windowing to associate trades with
orderbook snapshots based on timestamps.
Attributes:
db_path: Path to the SQLite database file
"""
def __init__(self, db_path: Path) -> None:
"""
Initialize database interpreter with path validation.
Args:
db_path: Path to SQLite database containing 'book' and 'trades' tables
Raises:
FileNotFoundError: If database file does not exist
Example:
>>> db_path = Path("data/BTC-USDT-2025-01-01.db")
>>> interpreter = DBInterpreter(db_path)
"""
if not db_path.exists():
raise FileNotFoundError(f"Database file not found: {db_path}")
self.db_path = db_path
def stream(self) -> Iterator[Tuple[OrderbookUpdate, list[Trade]]]:
"""
Stream orderbook updates and associated trades efficiently.
This performs two linear scans over the `book` and `trades` tables using
separate cursors and batches, avoiding N+1 queries and large `fetchall()`
calls. It preserves bids/asks for future use in visualizations while
yielding trades that fall in [timestamp, next_book_timestamp).
"""
# Use read-only immutable mode for faster reads and protection
conn = sqlite3.connect(f"file:{self.db_path}?mode=ro&immutable=1", uri=True)
try:
# Read-optimized PRAGMAs (safe in read-only mode)
conn.execute("PRAGMA query_only=ON")
conn.execute("PRAGMA synchronous=OFF")
conn.execute("PRAGMA temp_store=MEMORY")
# The following values can be tuned depending on available memory
conn.execute("PRAGMA mmap_size=268435456") # 256MB
conn.execute("PRAGMA cache_size=-200000") # ~200MB page cache
book_cur = conn.cursor()
trade_cur = conn.cursor()
# Keep bids/asks for future visuals; cast timestamps to integer
book_cur.execute(
"SELECT id, bids, asks, CAST(timestamp AS INTEGER) AS timestamp "
"FROM book ORDER BY timestamp ASC"
)
trade_cur.execute(
"SELECT id, trade_id, price, size, side, CAST(timestamp AS INTEGER) AS timestamp "
"FROM trades ORDER BY timestamp ASC"
)
BOOK_BATCH = 2048
TRADE_BATCH = 4096
# Helpers to stream book rows with one-row lookahead
book_buffer = []
book_index = 0
def fetch_one_book():
nonlocal book_buffer, book_index
if book_index >= len(book_buffer):
book_buffer = book_cur.fetchmany(BOOK_BATCH)
book_index = 0
if not book_buffer:
return None
row = book_buffer[book_index]
book_index += 1
return row # (id, bids, asks, ts)
# Helpers to stream trade rows
trade_buffer = []
trade_index = 0
def peek_trade():
nonlocal trade_buffer, trade_index
if trade_index >= len(trade_buffer):
trade_buffer = trade_cur.fetchmany(TRADE_BATCH)
trade_index = 0
if not trade_buffer:
return None
return trade_buffer[trade_index]
def advance_trade():
nonlocal trade_index
trade_index += 1
# Prime first two book rows to compute window end timestamps
current_book = fetch_one_book()
next_book = fetch_one_book()
while current_book is not None:
book_id, bids, asks, book_ts = current_book
end_ts = next_book[3] if next_book is not None else inf
# Collect trades in [book_ts, end_ts)
trades_here = []
while True:
t = peek_trade()
if t is None:
break
# trade row: (id, trade_id, price, size, side, ts)
trade_ts = t[5]
if trade_ts < book_ts:
# advance until we reach current window
advance_trade()
continue
if trade_ts >= end_ts:
# next book window starts; stop collecting
break
# within [book_ts, end_ts)
trades_here.append(t)
advance_trade()
ob_update = OrderbookUpdate(book_id, bids, asks, book_ts, end_ts)
yield ob_update, trades_here
# Advance to next window
current_book = next_book
next_book = fetch_one_book()
finally:
conn.close()