import logging
import os
import sqlite3
import threading
from datetime import datetime
from typing import Any, Dict, List, Optional


class MarketDB:
    """SQLite store for one market, using a separate database file per day.

    The file is named ``<market>-<yy-mm-dd>.db`` and lives in ``db_dir`` (or
    the working directory when ``db_dir`` is empty).  Two tables are kept:
    ``trades`` and ``book``.  Every insert first checks whether the calendar
    day has changed and, if so, switches to a new file.

    All connection use is serialised through a re-entrant lock, and the
    connection is opened with ``check_same_thread=False`` so that any thread
    holding the lock may use it.
    """

    # Placeholder reported in the debug log when a book side has no usable level.
    _NO_LEVEL = ['-', '-']

    def __init__(self, market: str, db_dir: str = "", date: Optional[datetime] = None):
        """Open (creating if necessary) the database for ``market``.

        Args:
            market: Market name; used as the file-name prefix.
            db_dir: Directory for the database files; created on demand.
                An empty string means the current working directory.
            date: Day whose file is opened first.  Defaults to now.

        Raises:
            Exception: whatever ``_connect_db`` raises when the database
                cannot be opened or initialised.
        """
        logging.debug("MarketDB.__init__ start for %s", market)
        self.market = market
        self.db_dir = db_dir
        self.current_date = date or datetime.now()
        self.conn: Optional[sqlite3.Connection] = None
        # Re-entrant: locked methods call other locked methods
        # (insert -> check_and_rotate_db -> _connect_db -> _create_tables).
        self._lock = threading.RLock()
        logging.debug("MarketDB.__init__ before _connect_db for %s", market)
        self._connect_db()
        logging.debug("MarketDB.__init__ end for %s", market)

    def __enter__(self) -> "MarketDB":
        """Return ``self`` so the instance can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection on leaving a ``with`` block; never suppresses."""
        self.close()

    def _get_db_path(self, date: datetime) -> str:
        """Return the database file path for ``date`` (two-digit year)."""
        db_name = f"{self.market}-{date.strftime('%y-%m-%d')}.db"
        if not self.db_dir:
            return db_name
        return f"{self.db_dir.rstrip('/')}/{db_name}"

    def _connect_db(self) -> None:
        """(Re)open the connection for ``self.current_date``.

        Closes any existing connection first, makes sure ``db_dir`` exists,
        switches the journal to WAL and ensures both tables exist.

        Raises:
            Exception: any failure is logged and re-raised unchanged.
        """
        logging.debug("MarketDB._connect_db start for %s", self.market)
        with self._lock:
            try:
                if self.conn is not None:
                    # Best-effort close: a failure here must not prevent reconnecting.
                    try:
                        self.conn.close()
                        logging.debug("Closed previous connection for %s", self.market)
                    except Exception as e:
                        logging.warning("Error closing previous connection: %s", e)

                if self.db_dir:
                    os.makedirs(self.db_dir, exist_ok=True)
                    logging.debug("Ensured db_dir exists for %s", self.market)

                db_path = self._get_db_path(self.current_date)
                logging.debug("Connecting to sqlite3 at %s for %s", db_path, self.market)
                # The default check_same_thread=True rejects use from any thread
                # other than the creator; access is already serialised by
                # self._lock, so sharing the connection across threads is safe.
                self.conn = sqlite3.connect(db_path, check_same_thread=False)
                self.conn.execute('PRAGMA journal_mode=WAL')
                logging.info("Connected to database at %s", db_path)
                self._create_tables()
                logging.debug("MarketDB._connect_db end for %s", self.market)
            except Exception as e:
                logging.error("Failed to connect to database: %s", e)
                raise

    def check_and_rotate_db(self) -> None:
        """Switch to today's database file if the calendar day has changed.

        If opening the new file fails, the previous day's date is restored
        and its file is reopened; the rotation error is then only logged, so
        the next call will attempt the rotation again.

        Raises:
            Exception: only when reopening the previous database fails too.
        """
        logging.debug("MarketDB.check_and_rotate_db start for %s", self.market)
        # Check and switch under the lock so concurrent callers cannot both
        # rotate, or observe a half-switched date/connection pair.
        with self._lock:
            now = datetime.now()
            previous_date = self.current_date
            if now.date() != previous_date.date():
                logging.info(
                    "Date changed from %s to %s, rotating database",
                    previous_date.date(), now.date(),
                )
                try:
                    self.current_date = now
                    self._connect_db()
                    logging.info("Successfully rotated to new database")
                except Exception as e:
                    logging.error("Failed to rotate database: %s", e)
                    try:
                        # Go back to the day that was working before the attempt.
                        self.current_date = previous_date
                        self._connect_db()
                    except Exception as fallback_error:
                        logging.critical(
                            "Failed to reconnect to previous database: %s",
                            fallback_error,
                        )
                        raise
        logging.debug("MarketDB.check_and_rotate_db end for %s", self.market)

    def _create_tables(self) -> None:
        """Create the ``trades`` and ``book`` tables if they do not exist."""
        logging.debug("MarketDB._create_tables start for %s", self.market)
        with self._lock:
            cursor = self.conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS trades (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    instrument TEXT,
                    trade_id TEXT,
                    price REAL,
                    size REAL,
                    side TEXT,
                    timestamp TEXT
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS book (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    instrument TEXT,
                    bids TEXT,
                    asks TEXT,
                    timestamp TEXT
                )
            ''')
            self.conn.commit()
            logging.info("Database tables ensured.")
        logging.debug("MarketDB._create_tables end for %s", self.market)

    def insert_trade(self, trade: Dict[str, Any]) -> None:
        """Insert one trade row and commit.

        Args:
            trade: Mapping read with ``.get`` for the keys ``instrument``,
                ``trade_id``, ``price``, ``size``, ``side`` and ``timestamp``;
                missing keys are stored as NULL.

        Raises:
            sqlite3.Error: on a database failure, after a reconnect attempt.
        """
        with self._lock:
            # Inside the lock so the rotation check and the write use the same
            # connection.
            self.check_and_rotate_db()
            try:
                cursor = self.conn.cursor()
                cursor.execute('''
                    INSERT INTO trades (instrument, trade_id, price, size, side, timestamp)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (
                    trade.get('instrument'),
                    trade.get('trade_id'),
                    trade.get('price'),
                    trade.get('size'),
                    trade.get('side'),
                    trade.get('timestamp'),
                ))
                self.conn.commit()
                logging.debug("Inserted trade: %s", trade)
            except sqlite3.Error as e:
                logging.error("Database error during trade insert: %s", e)
                self._handle_db_error()
                raise

    @classmethod
    def _first_positive_level(cls, levels: Any) -> Any:
        """Return the first level whose second field is > 0, for logging only.

        Levels are presumably ``[price, size]`` pairs - confirm with the feed.
        Malformed input yields the placeholder instead of raising, because
        this value only feeds a debug message.
        """
        try:
            return next(
                (level for level in levels if float(level[1]) > 0),
                cls._NO_LEVEL,
            )
        except (TypeError, ValueError, IndexError, KeyError):
            return cls._NO_LEVEL

    def insert_book(self, book: Dict[str, Any]) -> None:
        """Insert one order-book snapshot row and commit.

        ``bids`` and ``asks`` are stored as their ``str()`` representation.

        Args:
            book: Mapping read with ``.get`` for the keys ``instrument``,
                ``bids``, ``asks`` and ``timestamp``; ``bids``/``asks``
                default to an empty list.

        Raises:
            sqlite3.Error: on a database failure, after a reconnect attempt.
        """
        with self._lock:
            self.check_and_rotate_db()
            try:
                cursor = self.conn.cursor()
                bids = book.get('bids', [])
                asks = book.get('asks', [])
                cursor.execute('''
                    INSERT INTO book (instrument, bids, asks, timestamp)
                    VALUES (?, ?, ?, ?)
                ''', (
                    book.get('instrument'),
                    str(bids),
                    str(asks),
                    book.get('timestamp'),
                ))
                self.conn.commit()
                # The summary exists only for the debug log: build it after the
                # commit, and only when it will actually be emitted.
                if logging.getLogger().isEnabledFor(logging.DEBUG):
                    logging.debug(
                        "Inserted book: %s ts:%s bid:%s ask:%s",
                        book.get('instrument', 'N/A'),
                        book.get('timestamp', 'N/A'),
                        self._first_positive_level(bids),
                        self._first_positive_level(asks),
                    )
            except sqlite3.Error as e:
                logging.error("Database error during book insert: %s", e)
                self._handle_db_error()
                raise

    def _handle_db_error(self) -> None:
        """Try to recover from a database error by reconnecting.

        Raises:
            Exception: if the reconnect fails (logged as critical first).
        """
        try:
            self._connect_db()
        except Exception as e:
            logging.critical("Failed to reconnect to database after error: %s", e)
            raise

    def close(self) -> None:
        """Close the connection if one is open; close errors are only logged."""
        with self._lock:
            if self.conn is not None:
                try:
                    self.conn.close()
                    logging.info("Database connection closed.")
                except Exception as e:
                    logging.error("Error closing database connection: %s", e)
                finally:
                    self.conn = None