78 lines
2.6 KiB
Python
78 lines
2.6 KiB
Python
import sqlite3
|
|
import os
|
|
from typing import Any, Dict, List, Optional
|
|
import logging
|
|
|
|
class MarketDB:
|
|
def __init__(self, market: str, db_dir: str = ""):
|
|
db_name = f"{market}.db"
|
|
db_path = db_name if not db_dir else f"{db_dir.rstrip('/')}/{db_name}"
|
|
if db_dir:
|
|
os.makedirs(db_dir, exist_ok=True)
|
|
self.conn = sqlite3.connect(db_path)
|
|
logging.info(f"Connected to database at {db_path}")
|
|
self._create_tables()
|
|
|
|
def _create_tables(self):
|
|
cursor = self.conn.cursor()
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS trades (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
instrument TEXT,
|
|
trade_id TEXT,
|
|
price REAL,
|
|
size REAL,
|
|
side TEXT,
|
|
timestamp TEXT
|
|
)
|
|
''')
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS book (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
instrument TEXT,
|
|
bids TEXT,
|
|
asks TEXT,
|
|
timestamp TEXT
|
|
)
|
|
''')
|
|
self.conn.commit()
|
|
logging.info("Database tables ensured.")
|
|
|
|
def insert_trade(self, trade: Dict[str, Any]):
|
|
cursor = self.conn.cursor()
|
|
cursor.execute('''
|
|
INSERT INTO trades (instrument, trade_id, price, size, side, timestamp)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
''', (
|
|
trade.get('instrument'),
|
|
trade.get('trade_id'),
|
|
trade.get('price'),
|
|
trade.get('size'),
|
|
trade.get('side'),
|
|
trade.get('timestamp')
|
|
))
|
|
self.conn.commit()
|
|
logging.debug(f"Inserted trade: {trade}")
|
|
|
|
def insert_book(self, book: Dict[str, Any]):
|
|
cursor = self.conn.cursor()
|
|
bids = book.get('bids', [])
|
|
asks = book.get('asks', [])
|
|
best_bid = next((b for b in bids if float(b[1]) > 0), ['-', '-'])
|
|
best_ask = next((a for a in asks if float(a[1]) > 0), ['-', '-'])
|
|
cursor.execute('''
|
|
INSERT INTO book (instrument, bids, asks, timestamp)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (
|
|
book.get('instrument'),
|
|
str(bids),
|
|
str(asks),
|
|
book.get('timestamp')
|
|
))
|
|
self.conn.commit()
|
|
logging.debug(f"Inserted book: {book.get('instrument', 'N/A')} ts:{book.get('timestamp', 'N/A')} bid:{best_bid} ask:{best_ask}")
|
|
|
|
def close(self):
|
|
self.conn.close()
|
|
logging.info("Database connection closed.")
|