"""Helpers for building and querying an aggregated SQLite ``book`` table."""

from __future__ import annotations

import logging
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Iterator, Optional, Tuple

_logger = logging.getLogger(__name__)

# NOTE: idx_book_instrument_ts covers the same columns as the unique index
# ux_book_inst_ts; it is kept so the schema stays identical to what existing
# databases were created with.
AGG_SCHEMA = """
CREATE TABLE IF NOT EXISTS book (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    instrument TEXT NOT NULL,
    bids TEXT NOT NULL,
    asks TEXT NOT NULL,
    timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_book_instrument_ts ON book (instrument, timestamp);
CREATE UNIQUE INDEX IF NOT EXISTS ux_book_inst_ts ON book(instrument, timestamp);
"""

# Number of newly inserted rows between intermediate commits during ingestion.
_COMMIT_BATCH_SIZE = 10000

_INSERT_BOOK_SQL = (
    "INSERT OR IGNORE INTO book (instrument, bids, asks, timestamp) "
    "VALUES (?, ?, ?, ?)"
)


def init_aggregated_db(db_path: Path) -> None:
    """Create the ``book`` table and its indexes in *db_path* if missing.

    The schema uses ``IF NOT EXISTS`` throughout, so calling this on an
    already-initialised database is a no-op.
    """
    with closing(sqlite3.connect(str(db_path))) as conn:
        conn.executescript(AGG_SCHEMA)
        conn.commit()


def stream_source_book_rows(src_db: Path) -> Iterator[Tuple[str, str, str, str]]:
    """Yield ``(instrument, bids, asks, timestamp)`` rows from *src_db*.

    Rows come back ordered by the ``timestamp`` column ascending. If that
    column is stored as TEXT (as in ``AGG_SCHEMA``) the ordering is lexical,
    not numeric. The connection is closed once the generator is exhausted
    or closed.
    """
    with closing(sqlite3.connect(str(src_db))) as conn:
        cur = conn.cursor()
        cur.execute(
            "SELECT instrument, bids, asks, timestamp FROM book ORDER BY timestamp ASC"
        )
        yield from cur


def ingest_source_db(agg_db: Path, src_db: Path) -> int:
    """Copy book rows from *src_db* into *agg_db* and return the insert count.

    Rows whose ``(instrument, timestamp)`` pair already exists in the
    aggregate are ignored (``INSERT OR IGNORE``) and not counted. Rows that
    raise ``sqlite3.Error`` on insert are skipped (best effort); each skip is
    logged at DEBUG level and a summary is logged at WARNING level.

    Returns:
        Number of rows actually inserted into the aggregate database.
    """
    inserted = 0
    skipped = 0
    with closing(sqlite3.connect(str(agg_db))) as conn:
        cur = conn.cursor()
        cur.execute("PRAGMA journal_mode = WAL;")
        cur.execute("PRAGMA synchronous = NORMAL;")
        cur.execute("PRAGMA temp_store = MEMORY;")
        cur.execute("PRAGMA cache_size = -20000;")  # ~20MB

        for instrument, bids, asks, ts in stream_source_book_rows(src_db):
            try:
                cur.execute(_INSERT_BOOK_SQL, (instrument, bids, asks, ts))
            except sqlite3.Error as exc:
                # Best effort: one bad row must not abort the whole ingestion.
                skipped += 1
                _logger.debug(
                    "Skipping row instrument=%r timestamp=%r: %s",
                    instrument,
                    ts,
                    exc,
                )
                continue

            if cur.rowcount > 0:
                inserted += 1
                # Commit only when a new insert crosses a batch boundary.
                # Checking the counter for every source row would commit on
                # each ignored/skipped row while the count sits at 0 or at a
                # multiple of the batch size.
                if inserted % _COMMIT_BATCH_SIZE == 0:
                    conn.commit()

        conn.commit()

    if skipped:
        _logger.warning(
            "Skipped %d row(s) from %s that failed to insert", skipped, src_db
        )
    return inserted


def get_instrument_time_bounds(agg_db: Path, instrument: str) -> Optional[Tuple[int, int]]:
    """Return (min_timestamp_ms, max_timestamp_ms) for an instrument, or None if empty.

    Timestamps are cast to INTEGER inside SQLite before taking MIN/MAX, so
    the bounds are numeric even though the column is declared TEXT.
    """
    with closing(sqlite3.connect(str(agg_db))) as conn:
        row = conn.execute(
            "SELECT MIN(CAST(timestamp AS INTEGER)), MAX(CAST(timestamp AS INTEGER)) "
            "FROM book WHERE instrument = ?",
            (instrument,),
        ).fetchone()

    # Aggregates over zero matching rows yield a single (NULL, NULL) row.
    if not row or row[0] is None or row[1] is None:
        return None
    return int(row[0]), int(row[1])