"""Data containers and a streaming reader for SQLite orderbook and trade data."""
from dataclasses import dataclass
|
|
from math import inf
|
|
from pathlib import Path
|
|
from typing import Iterator, Tuple
|
|
import sqlite3
|
|
|
|
@dataclass(slots=True)
|
|
class OrderbookLevel:
|
|
"""
|
|
Represents a single price level in an orderbook.
|
|
|
|
Attributes:
|
|
price: Price level in quote currency
|
|
size: Total size available at this price level
|
|
liquidation_count: Number of liquidation orders at this level
|
|
order_count: Total number of orders at this level
|
|
"""
|
|
price: float
|
|
size: float
|
|
liquidation_count: int
|
|
order_count: int
|
|
|
|
class OrderbookUpdate:
|
|
"""
|
|
Container for a windowed orderbook update with temporal boundaries.
|
|
|
|
Represents an orderbook snapshot with a defined time window, used for
|
|
aggregating trades that occur within the same time period.
|
|
|
|
Attributes:
|
|
id: Unique identifier for this orderbook row
|
|
bids: List of bid price levels (highest price first)
|
|
asks: List of ask price levels (lowest price first)
|
|
timestamp: Window start time in milliseconds since epoch
|
|
end_timestamp: Window end time in milliseconds since epoch
|
|
"""
|
|
id: int
|
|
bids: str # JSON string representation of bid levels
|
|
asks: str # JSON string representation of ask levels
|
|
timestamp: str # String timestamp from database
|
|
end_timestamp: str # String end timestamp from database
|
|
|
|
def __init__(self, id: int, bids: str, asks: str, timestamp: str, end_timestamp: str) -> None:
|
|
"""
|
|
Initialize orderbook update with raw database values.
|
|
|
|
Args:
|
|
id: Database row identifier
|
|
bids: JSON string of bid levels from database
|
|
asks: JSON string of ask levels from database
|
|
timestamp: Timestamp string from database
|
|
end_timestamp: End timestamp string from database
|
|
"""
|
|
self.id = id
|
|
self.bids = bids
|
|
self.asks = asks
|
|
self.timestamp = timestamp
|
|
self.end_timestamp = end_timestamp
|
|
|
|
@dataclass
class Trade:
    """
    A single executed trade.

    Attributes:
        id: Database identifier of the trade row
        trade_id: Identifier assigned to the trade by the exchange
        price: Price the trade executed at, in quote currency
        size: Quantity traded, in base currency
        side: Direction of the trade ("buy" or "sell")
        timestamp: When the trade executed, in milliseconds since epoch
    """

    # Identifiers
    id: int
    trade_id: str

    # Execution details
    price: float
    size: float
    side: str
    timestamp: int
|
|
|
|
class DBInterpreter:
    """
    Provides efficient streaming access to SQLite orderbook and trade data.

    Reads the ``book`` and ``trades`` tables in batches over a read-only
    connection and pairs every orderbook row with the trades whose timestamp
    falls inside that row's time window.

    Attributes:
        db_path: Path to the SQLite database file
    """

    # Rows pulled per ``fetchmany`` call; tune to trade memory for round trips.
    _BOOK_BATCH = 2048
    _TRADE_BATCH = 4096

    def __init__(self, db_path: Path) -> None:
        """
        Initialize database interpreter with path validation.

        Args:
            db_path: Path to SQLite database containing 'book' and 'trades'
                tables. A plain string is accepted and converted to ``Path``.

        Raises:
            FileNotFoundError: If database file does not exist

        Example:
            >>> db_path = Path("data/BTC-USDT-2025-01-01.db")
            >>> interpreter = DBInterpreter(db_path)
        """
        # Coerce so that callers passing a str do not crash on `.exists()`.
        db_path = Path(db_path)
        if not db_path.exists():
            raise FileNotFoundError(f"Database file not found: {db_path}")
        self.db_path = db_path

    @staticmethod
    def _iter_rows(cursor: sqlite3.Cursor, batch_size: int) -> Iterator[tuple]:
        """Yield the cursor's rows one at a time, fetching `batch_size` rows per call."""
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                return
            yield from rows

    def stream(self) -> Iterator[Tuple[OrderbookUpdate, list[Trade]]]:
        """
        Stream orderbook updates and associated trades efficiently.

        Performs one linear scan over ``book`` and one over ``trades``, each
        with its own cursor and batched fetches, so there are no per-row
        queries and no ``fetchall()``. Both scans are ordered by the
        integer-cast timestamp, with ``id`` as a tie-break so that rows
        sharing a timestamp come back in a deterministic order.

        Each orderbook row covers the half-open window
        ``[timestamp, next_book_timestamp)``; the last row's window is
        open-ended (``end_timestamp`` is ``math.inf``). Trades older than the
        first orderbook row belong to no window and are skipped.

        Yields:
            ``(update, trades)`` pairs, where ``update`` is an
            ``OrderbookUpdate`` carrying the raw bids/asks strings and the
            window bounds, and ``trades`` lists the trades inside the window.

        NOTE(review): the trade entries are the raw row tuples
        ``(id, trade_id, price, size, side, timestamp)`` returned by sqlite3,
        not ``Trade`` instances as the annotation suggests. Runtime behavior
        is kept as-is so existing callers are not broken; use ``Trade(*row)``
        to convert.

        The connection is closed when the generator is exhausted or closed.
        """
        # Build the URI from a percent-encoded absolute path: interpolating the
        # raw path breaks on '#', '?' or '%' in file names and on Windows paths.
        # immutable=1 tells SQLite the file will not change while it is open.
        uri = f"{self.db_path.resolve().as_uri()}?mode=ro&immutable=1"
        conn = sqlite3.connect(uri, uri=True)
        try:
            # Read-optimized PRAGMAs (safe in read-only mode)
            conn.execute("PRAGMA query_only=ON")
            conn.execute("PRAGMA synchronous=OFF")
            conn.execute("PRAGMA temp_store=MEMORY")
            # The following values can be tuned depending on available memory
            conn.execute("PRAGMA mmap_size=268435456")  # 256MB
            conn.execute("PRAGMA cache_size=-200000")  # ~200MB page cache

            book_cur = conn.cursor()
            trade_cur = conn.cursor()

            # Keep bids/asks for future visuals; cast timestamps to integer.
            # ORDER BY resolves `timestamp` to the casted output column.
            book_cur.execute(
                "SELECT id, bids, asks, CAST(timestamp AS INTEGER) AS timestamp "
                "FROM book ORDER BY timestamp ASC, id ASC"
            )
            trade_cur.execute(
                "SELECT id, trade_id, price, size, side, CAST(timestamp AS INTEGER) AS timestamp "
                "FROM trades ORDER BY timestamp ASC, id ASC"
            )

            books = self._iter_rows(book_cur, self._BOOK_BATCH)
            trades = self._iter_rows(trade_cur, self._TRADE_BATCH)

            # One-row lookahead on books gives each window its end timestamp.
            current_book = next(books, None)
            next_book = next(books, None)
            # The trade that has been read but not yet assigned to a window.
            pending_trade = next(trades, None) if current_book is not None else None

            while current_book is not None:
                book_id, bids, asks, book_ts = current_book
                end_ts = next_book[3] if next_book is not None else inf

                # Collect trades in [book_ts, end_ts); trade row layout is
                # (id, trade_id, price, size, side, ts).
                trades_here = []
                while pending_trade is not None and pending_trade[5] < end_ts:
                    if pending_trade[5] >= book_ts:
                        trades_here.append(pending_trade)
                    # else: the trade predates this window and is dropped
                    pending_trade = next(trades, None)

                yield OrderbookUpdate(book_id, bids, asks, book_ts, end_ts), trades_here

                # Advance to next window
                current_book = next_book
                next_book = next(books, None)
        finally:
            conn.close()