from dataclasses import dataclass from math import inf from pathlib import Path from typing import Iterator, Tuple import sqlite3 @dataclass(slots=True) class OrderbookLevel: """ Represents a single price level in an orderbook. Attributes: price: Price level in quote currency size: Total size available at this price level liquidation_count: Number of liquidation orders at this level order_count: Total number of orders at this level """ price: float size: float liquidation_count: int order_count: int class OrderbookUpdate: """ Container for a windowed orderbook update with temporal boundaries. Represents an orderbook snapshot with a defined time window, used for aggregating trades that occur within the same time period. Attributes: id: Unique identifier for this orderbook row bids: List of bid price levels (highest price first) asks: List of ask price levels (lowest price first) timestamp: Window start time in milliseconds since epoch end_timestamp: Window end time in milliseconds since epoch """ id: int bids: str # JSON string representation of bid levels asks: str # JSON string representation of ask levels timestamp: str # String timestamp from database end_timestamp: str # String end timestamp from database def __init__(self, id: int, bids: str, asks: str, timestamp: str, end_timestamp: str) -> None: """ Initialize orderbook update with raw database values. Args: id: Database row identifier bids: JSON string of bid levels from database asks: JSON string of ask levels from database timestamp: Timestamp string from database end_timestamp: End timestamp string from database """ self.id = id self.bids = bids self.asks = asks self.timestamp = timestamp self.end_timestamp = end_timestamp @dataclass class Trade: """ Represents a single trade execution. Attributes: id: Unique database identifier for this trade trade_id: Exchange-specific trade identifier price: Execution price in quote currency size: Trade size in base currency side: Trade direction ("buy" or "sell") timestamp: Execution time in milliseconds since epoch """ id: int trade_id: str price: float size: float side: str timestamp: int class DBInterpreter: """ Provides efficient streaming access to SQLite orderbook and trade data. Handles batch reading from SQLite databases with optimized PRAGMA settings for read-only access. Uses temporal windowing to associate trades with orderbook snapshots based on timestamps. Attributes: db_path: Path to the SQLite database file """ def __init__(self, db_path: Path) -> None: """ Initialize database interpreter with path validation. Args: db_path: Path to SQLite database containing 'book' and 'trades' tables Raises: FileNotFoundError: If database file does not exist Example: >>> db_path = Path("data/BTC-USDT-2025-01-01.db") >>> interpreter = DBInterpreter(db_path) """ if not db_path.exists(): raise FileNotFoundError(f"Database file not found: {db_path}") self.db_path = db_path def stream(self) -> Iterator[Tuple[OrderbookUpdate, list[Trade]]]: """ Stream orderbook updates and associated trades efficiently. This performs two linear scans over the `book` and `trades` tables using separate cursors and batches, avoiding N+1 queries and large `fetchall()` calls. It preserves bids/asks for future use in visualizations while yielding trades that fall in [timestamp, next_book_timestamp). """ # Use read-only immutable mode for faster reads and protection conn = sqlite3.connect(f"file:{self.db_path}?mode=ro&immutable=1", uri=True) try: # Read-optimized PRAGMAs (safe in read-only mode) conn.execute("PRAGMA query_only=ON") conn.execute("PRAGMA synchronous=OFF") conn.execute("PRAGMA temp_store=MEMORY") # The following values can be tuned depending on available memory conn.execute("PRAGMA mmap_size=268435456") # 256MB conn.execute("PRAGMA cache_size=-200000") # ~200MB page cache book_cur = conn.cursor() trade_cur = conn.cursor() # Keep bids/asks for future visuals; cast timestamps to integer book_cur.execute( "SELECT id, bids, asks, CAST(timestamp AS INTEGER) AS timestamp " "FROM book ORDER BY timestamp ASC" ) trade_cur.execute( "SELECT id, trade_id, price, size, side, CAST(timestamp AS INTEGER) AS timestamp " "FROM trades ORDER BY timestamp ASC" ) BOOK_BATCH = 2048 TRADE_BATCH = 4096 # Helpers to stream book rows with one-row lookahead book_buffer = [] book_index = 0 def fetch_one_book(): nonlocal book_buffer, book_index if book_index >= len(book_buffer): book_buffer = book_cur.fetchmany(BOOK_BATCH) book_index = 0 if not book_buffer: return None row = book_buffer[book_index] book_index += 1 return row # (id, bids, asks, ts) # Helpers to stream trade rows trade_buffer = [] trade_index = 0 def peek_trade(): nonlocal trade_buffer, trade_index if trade_index >= len(trade_buffer): trade_buffer = trade_cur.fetchmany(TRADE_BATCH) trade_index = 0 if not trade_buffer: return None return trade_buffer[trade_index] def advance_trade(): nonlocal trade_index trade_index += 1 # Prime first two book rows to compute window end timestamps current_book = fetch_one_book() next_book = fetch_one_book() while current_book is not None: book_id, bids, asks, book_ts = current_book end_ts = next_book[3] if next_book is not None else inf # Collect trades in [book_ts, end_ts) trades_here = [] while True: t = peek_trade() if t is None: break # trade row: (id, trade_id, price, size, side, ts) trade_ts = t[5] if trade_ts < book_ts: # advance until we reach current window advance_trade() continue if trade_ts >= end_ts: # next book window starts; stop collecting break # within [book_ts, end_ts) trades_here.append(t) advance_trade() ob_update = OrderbookUpdate(book_id, bids, asks, book_ts, end_ts) yield ob_update, trades_here # Advance to next window current_book = next_book next_book = fetch_one_book() finally: conn.close()