# MarketDataCollector/market_db.py
# Vasily.onl 14272ab6eb - uv for package management
# - improve websocket a bit
# - updated database for daily rotation and thread locking, error handling
# 2025-06-09 12:51:10 +08:00
#
# 159 lines
# 6.8 KiB
# Python

import sqlite3
import os
from typing import Any, Dict, List, Optional
import logging
from datetime import datetime
import threading
class MarketDB:
    """SQLite store for one market's trades and order-book snapshots.

    One database file is used per calendar day, named
    ``<market>-<yy-mm-dd>.db`` and placed in ``db_dir`` (or the current
    working directory when ``db_dir`` is empty). Every insert first checks
    whether the local date has changed and, if so, switches to a new file.

    All access to the connection is serialized through a re-entrant lock, and
    the connection is opened with ``check_same_thread=False`` so that threads
    other than the one that created the instance can use it.
    """

    def __init__(self, market: str, db_dir: str = "", date: Optional[datetime] = None):
        """Open (creating if needed) the database for ``market``.

        Args:
            market: Market name; used as the database file name prefix.
            db_dir: Directory for database files; created if missing. An
                empty string means the current working directory.
            date: Date selecting the initial database file. Defaults to
                ``datetime.now()``.

        Raises:
            Exception: Whatever ``_connect_db`` raises when the database
                cannot be opened or its tables cannot be created.
        """
        logging.debug("MarketDB.__init__ start for %s", market)
        self.market = market
        self.db_dir = db_dir
        self.current_date = date or datetime.now()
        self.conn: Optional[sqlite3.Connection] = None
        # Re-entrant: public methods hold the lock while calling helpers
        # (_connect_db, _create_tables) that acquire it again.
        self._lock = threading.RLock()
        logging.debug("MarketDB.__init__ before _connect_db for %s", market)
        self._connect_db()
        logging.debug("MarketDB.__init__ end for %s", market)

    def _get_db_path(self, date: datetime) -> str:
        """Return the database file path for ``date``."""
        db_name = f"{self.market}-{date.strftime('%y-%m-%d')}.db"
        if not self.db_dir:
            return db_name
        return os.path.join(self.db_dir, db_name)

    def _connect_db(self) -> None:
        """(Re)open the connection for ``self.current_date``.

        Closes any existing connection first, ensures ``db_dir`` exists,
        switches the journal to WAL mode and creates the tables.

        Raises:
            Exception: Any error from directory creation, connecting, or
                table creation is logged and re-raised. After a failure to
                open the new database ``self.conn`` is ``None``.
        """
        logging.debug("MarketDB._connect_db start for %s", self.market)
        with self._lock:
            try:
                if self.conn:
                    try:
                        self.conn.close()
                        logging.debug("Closed previous connection for %s", self.market)
                    except Exception as e:
                        logging.warning(f"Error closing previous connection: {e}")
                    finally:
                        # Never keep a reference to a closed connection.
                        self.conn = None

                if self.db_dir:
                    os.makedirs(self.db_dir, exist_ok=True)
                    logging.debug("Ensured db_dir exists for %s", self.market)

                db_path = self._get_db_path(self.current_date)
                logging.debug("Connecting to sqlite3 at %s for %s", db_path, self.market)
                # check_same_thread=False: the connection is shared between
                # threads, with every use serialized by self._lock.
                conn = sqlite3.connect(db_path, check_same_thread=False)
                try:
                    conn.execute('PRAGMA journal_mode=WAL')
                except Exception:
                    conn.close()
                    raise
                self.conn = conn
                logging.info(f"Connected to database at {db_path}")
                self._create_tables()
                logging.debug("MarketDB._connect_db end for %s", self.market)
            except Exception as e:
                logging.error(f"Failed to connect to database: {e}")
                raise

    def check_and_rotate_db(self) -> None:
        """Switch to a new database file if the local date has changed.

        If opening the new day's database fails, the previous date is
        restored and its database is reopened, so rotation is attempted again
        on the next call.

        Raises:
            Exception: If the new database cannot be opened and reconnecting
                to the previous one fails as well.
        """
        logging.debug("MarketDB.check_and_rotate_db start for %s", self.market)
        with self._lock:
            now = datetime.now()
            previous_date = self.current_date
            if now.date() != previous_date.date():
                logging.info(
                    f"Date changed from {previous_date.date()} to {now.date()}, rotating database"
                )
                try:
                    self.current_date = now
                    self._connect_db()
                    logging.info("Successfully rotated to new database")
                except Exception as e:
                    logging.error(f"Failed to rotate database: {e}")
                    # Fall back to the database that was in use before the
                    # rotation attempt.
                    self.current_date = previous_date
                    try:
                        self._connect_db()
                    except Exception as fallback_error:
                        logging.critical(
                            f"Failed to reconnect to previous database: {fallback_error}"
                        )
                        raise
        logging.debug("MarketDB.check_and_rotate_db end for %s", self.market)

    def _create_tables(self) -> None:
        """Create the ``trades`` and ``book`` tables if they do not exist."""
        logging.debug("MarketDB._create_tables start for %s", self.market)
        with self._lock:
            cursor = self.conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS trades (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    instrument TEXT,
                    trade_id TEXT,
                    price REAL,
                    size REAL,
                    side TEXT,
                    timestamp TEXT
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS book (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    instrument TEXT,
                    bids TEXT,
                    asks TEXT,
                    timestamp TEXT
                )
            ''')
            self.conn.commit()
            logging.info("Database tables ensured.")
        logging.debug("MarketDB._create_tables end for %s", self.market)

    def insert_trade(self, trade: Dict[str, Any]) -> None:
        """Insert one trade row and commit.

        Args:
            trade: Mapping read with ``.get`` for the keys ``instrument``,
                ``trade_id``, ``price``, ``size``, ``side`` and
                ``timestamp``; missing keys are stored as NULL.

        Raises:
            sqlite3.Error: On a database error, after a reconnect attempt.
        """
        with self._lock:
            # Rotate inside the lock so the date check and the insert see the
            # same connection.
            self.check_and_rotate_db()
            try:
                cursor = self.conn.cursor()
                cursor.execute('''
                    INSERT INTO trades (instrument, trade_id, price, size, side, timestamp)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (
                    trade.get('instrument'),
                    trade.get('trade_id'),
                    trade.get('price'),
                    trade.get('size'),
                    trade.get('side'),
                    trade.get('timestamp'),
                ))
                self.conn.commit()
                logging.debug("Inserted trade: %s", trade)
            except sqlite3.Error as e:
                logging.error(f"Database error during trade insert: {e}")
                self._handle_db_error()
                raise

    @staticmethod
    def _best_level(levels: Any) -> Any:
        """Return the first level whose second element is > 0, for logging.

        Returns ``['-', '-']`` when there is no such level or when the levels
        are malformed; this helper must never make an insert fail.
        """
        placeholder = ['-', '-']
        try:
            return next((lvl for lvl in levels if float(lvl[1]) > 0), placeholder)
        except (TypeError, ValueError, IndexError):
            return placeholder

    def insert_book(self, book: Dict[str, Any]) -> None:
        """Insert one order-book snapshot row and commit.

        ``bids`` and ``asks`` are stored as their ``str()`` representation.

        Args:
            book: Mapping read with ``.get`` for the keys ``instrument``,
                ``bids``, ``asks`` and ``timestamp``.

        Raises:
            sqlite3.Error: On a database error, after a reconnect attempt.
        """
        with self._lock:
            self.check_and_rotate_db()
            try:
                cursor = self.conn.cursor()
                bids = book.get('bids', [])
                asks = book.get('asks', [])
                cursor.execute('''
                    INSERT INTO book (instrument, bids, asks, timestamp)
                    VALUES (?, ?, ?, ?)
                ''', (
                    book.get('instrument'),
                    str(bids),
                    str(asks),
                    book.get('timestamp'),
                ))
                self.conn.commit()
                # The best bid/ask are only needed for the debug message, so
                # skip the scan entirely when debug logging is off.
                if logging.getLogger().isEnabledFor(logging.DEBUG):
                    logging.debug(
                        "Inserted book: %s ts:%s bid:%s ask:%s",
                        book.get('instrument', 'N/A'),
                        book.get('timestamp', 'N/A'),
                        self._best_level(bids),
                        self._best_level(asks),
                    )
            except sqlite3.Error as e:
                logging.error(f"Database error during book insert: {e}")
                self._handle_db_error()
                raise

    def _handle_db_error(self) -> None:
        """Try to recover from a database error by reconnecting.

        Raises:
            Exception: If the reconnect fails (logged as critical first).
        """
        try:
            self._connect_db()
        except Exception as e:
            logging.critical(f"Failed to reconnect to database after error: {e}")
            raise

    def close(self) -> None:
        """Close the connection if open; close errors are logged, not raised."""
        with self._lock:
            if self.conn:
                try:
                    self.conn.close()
                    logging.info("Database connection closed.")
                except Exception as e:
                    logging.error(f"Error closing database connection: {e}")
                finally:
                    self.conn = None