90 lines
2.3 KiB
Python

from __future__ import annotations

import logging
import sqlite3
from pathlib import Path
from typing import Iterator, Optional, Tuple
# SQL script that sets up the aggregated database; init_aggregated_db() runs it
# with executescript(). Every statement uses IF NOT EXISTS, so running the
# script against an already-initialised database is a no-op.
#
# The `book` table stores one row per (instrument, timestamp) snapshot, with
# bids, asks and timestamp all declared as TEXT. The unique index on
# (instrument, timestamp) is what lets ingest_source_db()'s INSERT OR IGNORE
# drop duplicate snapshots instead of raising.
#
# NOTE(review): idx_book_instrument_ts covers exactly the same columns as the
# unique index ux_book_inst_ts, so it looks redundant (extra storage and write
# cost for no additional lookup capability) -- confirm nothing references it by
# name before removing it.
AGG_SCHEMA = """
CREATE TABLE IF NOT EXISTS book (
id INTEGER PRIMARY KEY AUTOINCREMENT,
instrument TEXT NOT NULL,
bids TEXT NOT NULL,
asks TEXT NOT NULL,
timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_book_instrument_ts ON book (instrument, timestamp);
CREATE UNIQUE INDEX IF NOT EXISTS ux_book_inst_ts ON book(instrument, timestamp);
"""
def init_aggregated_db(db_path: Path) -> None:
    """Apply ``AGG_SCHEMA`` to the SQLite database at *db_path*.

    The database file is opened with ``sqlite3.connect`` (which creates it if
    it does not exist yet), the schema script is executed, the result is
    committed, and the connection is closed even if the script fails.

    Args:
        db_path: Location of the aggregated database file.

    Raises:
        sqlite3.Error: If the database cannot be opened or the script fails.
    """
    connection = sqlite3.connect(str(db_path))
    try:
        # Connection.executescript creates a temporary cursor internally.
        connection.executescript(AGG_SCHEMA)
        connection.commit()
    finally:
        connection.close()
def stream_source_book_rows(src_db: Path) -> Iterator[Tuple[str, str, str, str]]:
    """Lazily yield the rows of the ``book`` table in a source database.

    The connection is opened when iteration starts and closed when the
    generator is exhausted, closed, or fails.

    Args:
        src_db: Path of the SQLite database to read from.

    Yields:
        ``(instrument, bids, asks, timestamp)`` tuples in ascending order of
        the ``timestamp`` column, as sorted by SQLite.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    query = "SELECT instrument, bids, asks, timestamp FROM book ORDER BY timestamp ASC"
    source = sqlite3.connect(str(src_db))
    try:
        # The cursor returned by execute() is itself an iterator over rows.
        yield from source.execute(query)
    finally:
        source.close()
def ingest_source_db(agg_db: Path, src_db: Path) -> int:
    """Copy the ``book`` rows of *src_db* into the aggregated database.

    Rows are read with ``stream_source_book_rows`` and written with
    ``INSERT OR IGNORE``, so a row that conflicts with a uniqueness constraint
    on the target table is dropped without error. A row whose insert raises
    ``sqlite3.Error`` is skipped (best effort); skipped rows are counted and
    reported through the module logger instead of being discarded silently.

    Args:
        agg_db: Path of the aggregated (target) database.
        src_db: Path of the source database to read from.

    Returns:
        The number of rows actually inserted into the target table.

    Raises:
        sqlite3.Error: If the target cannot be opened or configured, or if
            reading from the source database fails.
    """
    commit_every = 10_000  # rows inserted between intermediate commits
    logger = logging.getLogger(__name__)

    conn = sqlite3.connect(str(agg_db))
    inserted = 0
    skipped = 0
    try:
        cur = conn.cursor()
        # Bulk-load tuning for this connection.
        cur.execute("PRAGMA journal_mode = WAL;")
        cur.execute("PRAGMA synchronous = NORMAL;")
        cur.execute("PRAGMA temp_store = MEMORY;")
        cur.execute("PRAGMA cache_size = -20000;")  # ~20MB

        for instrument, bids, asks, ts in stream_source_book_rows(src_db):
            try:
                cur.execute(
                    "INSERT OR IGNORE INTO book (instrument, bids, asks, timestamp) VALUES (?, ?, ?, ?)",
                    (instrument, bids, asks, ts),
                )
            except sqlite3.Error as exc:
                # Best effort: keep going, but leave a trace of what was lost.
                skipped += 1
                logger.debug(
                    "Skipping row (instrument=%r, timestamp=%r): %s",
                    instrument,
                    ts,
                    exc,
                )
                continue

            if cur.rowcount > 0:
                inserted += 1
                # Check the batch boundary only right after a real insert.
                # Checking on every row would commit once per row whenever
                # `inserted` sits at 0 or a multiple of the batch size (for
                # example while re-ingesting rows that are all duplicates).
                if inserted % commit_every == 0:
                    conn.commit()

        conn.commit()
        if skipped:
            logger.warning(
                "Skipped %d row(s) from %s that could not be inserted",
                skipped,
                src_db,
            )
    finally:
        conn.close()
    return inserted
def get_instrument_time_bounds(agg_db: Path, instrument: str) -> Optional[Tuple[int, int]]:
    """Return (min_timestamp_ms, max_timestamp_ms) for an instrument, or None if empty.

    The ``timestamp`` column is cast to INTEGER inside SQLite before the
    minimum and maximum are taken, so the comparison is numeric rather than
    textual.

    Args:
        agg_db: Path of the aggregated database.
        instrument: Instrument name to look up in the ``book`` table.

    Returns:
        A ``(lowest, highest)`` pair of integers, or ``None`` when the
        instrument has no rows.
    """
    query = "SELECT MIN(CAST(timestamp AS INTEGER)), MAX(CAST(timestamp AS INTEGER)) FROM book WHERE instrument = ?"
    connection = sqlite3.connect(str(agg_db))
    try:
        bounds = connection.execute(query, (instrument,)).fetchone()
        if bounds is None:
            return None
        lowest, highest = bounds
        # Aggregates over zero matching rows come back as NULL (None).
        if lowest is None or highest is None:
            return None
        return int(lowest), int(highest)
    finally:
        connection.close()