- improve websocket a bit - updated database for daily rotation and locking threads, errors handling
159 lines
6.8 KiB
Python
159 lines
6.8 KiB
Python
import sqlite3
|
|
import os
|
|
from typing import Any, Dict, List, Optional
|
|
import logging
|
|
from datetime import datetime
|
|
import threading
|
|
|
|
class MarketDB:
|
|
def __init__(self, market: str, db_dir: str = "", date: Optional[datetime] = None):
|
|
logging.debug(f"MarketDB.__init__ start for {market}")
|
|
self.market = market
|
|
self.db_dir = db_dir
|
|
self.current_date = date or datetime.now()
|
|
self.conn = None
|
|
self._lock = threading.RLock() # Use re-entrant lock to avoid deadlock
|
|
logging.debug(f"MarketDB.__init__ before _connect_db for {market}")
|
|
self._connect_db()
|
|
logging.debug(f"MarketDB.__init__ end for {market}")
|
|
|
|
def _get_db_path(self, date: datetime) -> str:
|
|
db_name = f"{self.market}-{date.strftime('%y-%m-%d')}.db"
|
|
return db_name if not self.db_dir else f"{self.db_dir.rstrip('/')}/{db_name}"
|
|
|
|
def _connect_db(self):
|
|
logging.debug(f"MarketDB._connect_db start for {self.market}")
|
|
with self._lock: # Thread-safe connection management
|
|
try:
|
|
if self.conn:
|
|
try:
|
|
self.conn.close()
|
|
logging.debug(f"Closed previous connection for {self.market}")
|
|
except Exception as e:
|
|
logging.warning(f"Error closing previous connection: {e}")
|
|
if self.db_dir:
|
|
os.makedirs(self.db_dir, exist_ok=True)
|
|
logging.debug(f"Ensured db_dir exists for {self.market}")
|
|
db_path = self._get_db_path(self.current_date)
|
|
logging.debug(f"Connecting to sqlite3 at {db_path} for {self.market}")
|
|
self.conn = sqlite3.connect(db_path)
|
|
self.conn.execute('PRAGMA journal_mode=WAL')
|
|
logging.info(f"Connected to database at {db_path}")
|
|
self._create_tables()
|
|
logging.debug(f"MarketDB._connect_db end for {self.market}")
|
|
except Exception as e:
|
|
logging.error(f"Failed to connect to database: {e}")
|
|
raise
|
|
|
|
def check_and_rotate_db(self):
|
|
logging.debug(f"MarketDB.check_and_rotate_db start for {self.market}")
|
|
current_date = datetime.now()
|
|
if current_date.date() != self.current_date.date():
|
|
logging.info(f"Date changed from {self.current_date.date()} to {current_date.date()}, rotating database")
|
|
try:
|
|
self.current_date = current_date
|
|
self._connect_db()
|
|
logging.info("Successfully rotated to new database")
|
|
except Exception as e:
|
|
logging.error(f"Failed to rotate database: {e}")
|
|
try:
|
|
self.current_date = current_date.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
self._connect_db()
|
|
except Exception as fallback_error:
|
|
logging.critical(f"Failed to reconnect to previous database: {fallback_error}")
|
|
raise
|
|
logging.debug(f"MarketDB.check_and_rotate_db end for {self.market}")
|
|
|
|
def _create_tables(self):
|
|
logging.debug(f"MarketDB._create_tables start for {self.market}")
|
|
with self._lock: # Thread-safe table creation
|
|
cursor = self.conn.cursor()
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS trades (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
instrument TEXT,
|
|
trade_id TEXT,
|
|
price REAL,
|
|
size REAL,
|
|
side TEXT,
|
|
timestamp TEXT
|
|
)
|
|
''')
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS book (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
instrument TEXT,
|
|
bids TEXT,
|
|
asks TEXT,
|
|
timestamp TEXT
|
|
)
|
|
''')
|
|
self.conn.commit()
|
|
logging.info("Database tables ensured.")
|
|
logging.debug(f"MarketDB._create_tables end for {self.market}")
|
|
|
|
def insert_trade(self, trade: Dict[str, Any]):
|
|
self.check_and_rotate_db()
|
|
with self._lock: # Thread-safe insert
|
|
try:
|
|
cursor = self.conn.cursor()
|
|
cursor.execute('''
|
|
INSERT INTO trades (instrument, trade_id, price, size, side, timestamp)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
''', (
|
|
trade.get('instrument'),
|
|
trade.get('trade_id'),
|
|
trade.get('price'),
|
|
trade.get('size'),
|
|
trade.get('side'),
|
|
trade.get('timestamp')
|
|
))
|
|
self.conn.commit()
|
|
logging.debug(f"Inserted trade: {trade}")
|
|
except sqlite3.Error as e:
|
|
logging.error(f"Database error during trade insert: {e}")
|
|
self._handle_db_error()
|
|
raise
|
|
|
|
def insert_book(self, book: Dict[str, Any]):
|
|
self.check_and_rotate_db()
|
|
with self._lock: # Thread-safe insert
|
|
try:
|
|
cursor = self.conn.cursor()
|
|
bids = book.get('bids', [])
|
|
asks = book.get('asks', [])
|
|
best_bid = next((b for b in bids if float(b[1]) > 0), ['-', '-'])
|
|
best_ask = next((a for a in asks if float(a[1]) > 0), ['-', '-'])
|
|
cursor.execute('''
|
|
INSERT INTO book (instrument, bids, asks, timestamp)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (
|
|
book.get('instrument'),
|
|
str(bids),
|
|
str(asks),
|
|
book.get('timestamp')
|
|
))
|
|
self.conn.commit()
|
|
logging.debug(f"Inserted book: {book.get('instrument', 'N/A')} ts:{book.get('timestamp', 'N/A')} bid:{best_bid} ask:{best_ask}")
|
|
except sqlite3.Error as e:
|
|
logging.error(f"Database error during book insert: {e}")
|
|
self._handle_db_error()
|
|
raise
|
|
|
|
def _handle_db_error(self):
|
|
try:
|
|
self._connect_db()
|
|
except Exception as e:
|
|
logging.critical(f"Failed to reconnect to database after error: {e}")
|
|
raise
|
|
|
|
def close(self):
|
|
with self._lock: # Thread-safe close
|
|
if self.conn:
|
|
try:
|
|
self.conn.close()
|
|
logging.info("Database connection closed.")
|
|
except Exception as e:
|
|
logging.error(f"Error closing database connection: {e}")
|
|
finally:
|
|
self.conn = None |