90 lines
2.3 KiB
Python
90 lines
2.3 KiB
Python
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import Iterator, Optional, Tuple
|
|
|
|
|
|
# DDL for the aggregated order-book database.
#
# One row per (instrument, timestamp) snapshot; bids/asks/timestamp are stored
# as TEXT. The UNIQUE index is what makes ``INSERT OR IGNORE`` de-duplicate
# snapshots, and it also serves (instrument, timestamp) lookups, so no
# additional non-unique index on the same columns is created (it would only
# add write and storage overhead).
AGG_SCHEMA = """
CREATE TABLE IF NOT EXISTS book (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    instrument TEXT NOT NULL,
    bids TEXT NOT NULL,
    asks TEXT NOT NULL,
    timestamp TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS ux_book_inst_ts ON book(instrument, timestamp);
"""
|
|
|
|
|
|
def init_aggregated_db(db_path: Path) -> None:
    """Ensure the aggregated database at ``db_path`` has the ``book`` schema.

    Opens (creating if necessary) the SQLite file, runs the ``AGG_SCHEMA``
    script, commits, and always closes the connection afterwards. Every
    statement in the script uses ``IF NOT EXISTS``, so calling this on an
    already-initialised database is harmless.
    """
    connection = sqlite3.connect(str(db_path))
    try:
        # Connection.executescript runs the script on a temporary cursor.
        connection.executescript(AGG_SCHEMA)
        connection.commit()
    finally:
        connection.close()
|
|
|
|
|
|
def stream_source_book_rows(src_db: Path) -> Iterator[Tuple[str, str, str, str]]:
    """Lazily yield ``(instrument, bids, asks, timestamp)`` rows from ``src_db``.

    Rows come from the source database's ``book`` table, ordered ascending by
    the stored ``timestamp`` value (SQLite's own comparison for that column).
    The connection is opened when iteration starts and is closed once the
    generator is exhausted, closed, or fails.
    """
    query = "SELECT instrument, bids, asks, timestamp FROM book ORDER BY timestamp ASC"
    source = sqlite3.connect(str(src_db))
    try:
        # Delegate straight to the cursor so rows are fetched on demand.
        yield from source.execute(query)
    finally:
        source.close()
|
|
|
|
|
|
def ingest_source_db(agg_db: Path, src_db: Path) -> int:
    """Copy ``book`` rows from ``src_db`` into the aggregated database ``agg_db``.

    Rows are read via ``stream_source_book_rows`` and written with
    ``INSERT OR IGNORE``, so rows that conflict with a uniqueness constraint on
    the target table are dropped without error. A row whose insert raises
    ``sqlite3.Error`` is skipped (best-effort ingestion); the number of skipped
    rows and the first error are reported in a single warning log record.

    Args:
        agg_db: Path of the aggregated (target) SQLite database.
        src_db: Path of the source SQLite database to read from.

    Returns:
        The number of rows actually inserted (ignored and skipped rows are not
        counted).
    """
    import logging  # local import: keeps this block independent of file-level imports

    commit_every = 10000  # rows inserted per intermediate commit
    insert_sql = (
        "INSERT OR IGNORE INTO book (instrument, bids, asks, timestamp) VALUES (?, ?, ?, ?)"
    )

    conn = sqlite3.connect(str(agg_db))
    inserted = 0
    skipped = 0
    first_error = None
    try:
        cur = conn.cursor()
        # Bulk-load tuning; these must run before the first INSERT opens a transaction.
        cur.execute("PRAGMA journal_mode = WAL;")
        cur.execute("PRAGMA synchronous = NORMAL;")
        cur.execute("PRAGMA temp_store = MEMORY;")
        cur.execute("PRAGMA cache_size = -20000;")  # ~20MB

        for instrument, bids, asks, ts in stream_source_book_rows(src_db):
            try:
                cur.execute(insert_sql, (instrument, bids, asks, ts))
            except sqlite3.Error as exc:
                # Best-effort: skip the row, but remember that it happened.
                skipped += 1
                if first_error is None:
                    first_error = exc
                continue
            if cur.rowcount > 0:
                inserted += 1
                # Commit only when an insert has just completed a batch.
                # Checking the counter on every row would commit once per row
                # while it sits at 0 or on a batch boundary during a run of
                # ignored/failed rows.
                if inserted % commit_every == 0:
                    conn.commit()
        conn.commit()
    finally:
        conn.close()

    if skipped:
        logging.getLogger(__name__).warning(
            "Skipped %d row(s) while ingesting %s into %s; first error: %s",
            skipped,
            src_db,
            agg_db,
            first_error,
        )
    return inserted
|
|
|
|
|
|
def get_instrument_time_bounds(agg_db: Path, instrument: str) -> Optional[Tuple[int, int]]:
    """Look up the earliest and latest timestamps stored for ``instrument``.

    The ``timestamp`` column is cast to INTEGER inside SQLite before taking
    MIN/MAX, so the comparison is numeric rather than textual.

    Returns:
        ``(min_timestamp_ms, max_timestamp_ms)`` as ints, or ``None`` when the
        ``book`` table holds no rows for the instrument.
    """
    query = (
        "SELECT MIN(CAST(timestamp AS INTEGER)), MAX(CAST(timestamp AS INTEGER)) "
        "FROM book WHERE instrument = ?"
    )
    db = sqlite3.connect(str(agg_db))
    try:
        bounds = db.execute(query, (instrument,)).fetchone()
        if bounds is None:
            return None
        earliest, latest = bounds
        # Aggregates over zero rows come back as NULL -> None.
        if earliest is None or latest is None:
            return None
        return int(earliest), int(latest)
    finally:
        db.close()
|
|
|
|
|