""" Database Connection Utility for Crypto Trading Bot Platform Provides connection pooling, session management, and database utilities """ import os import json import logging from contextlib import contextmanager from typing import Optional, Generator, Any, Dict, List, Union from pathlib import Path # Load environment variables from .env file if it exists try: from dotenv import load_dotenv env_file = Path(__file__).parent.parent / '.env' if env_file.exists(): load_dotenv(env_file) except ImportError: # dotenv not available, proceed without it pass from sqlalchemy import create_engine, Engine, text, event from sqlalchemy.orm import sessionmaker, Session, scoped_session from sqlalchemy.pool import QueuePool from sqlalchemy.exc import SQLAlchemyError, OperationalError, DisconnectionError from sqlalchemy.engine import make_url import time from functools import wraps from datetime import datetime, timedelta from .models import Base, create_all_tables, drop_all_tables, RawTrade # Configure logging logger = logging.getLogger(__name__) class DatabaseConfig: """Database configuration class""" def __init__(self): self.database_url = os.getenv( 'DATABASE_URL', 'postgresql://dashboard@localhost:5434/dashboard' ) # Connection pool settings self.pool_size = int(os.getenv('DB_POOL_SIZE', '5')) self.max_overflow = int(os.getenv('DB_MAX_OVERFLOW', '10')) self.pool_pre_ping = os.getenv('DB_POOL_PRE_PING', 'true').lower() == 'true' self.pool_recycle = int(os.getenv('DB_POOL_RECYCLE', '3600')) # 1 hour # Connection timeout settings self.connect_timeout = int(os.getenv('DB_CONNECT_TIMEOUT', '30')) self.statement_timeout = int(os.getenv('DB_STATEMENT_TIMEOUT', '30000')) # 30 seconds in ms # Retry settings self.max_retries = int(os.getenv('DB_MAX_RETRIES', '3')) self.retry_delay = float(os.getenv('DB_RETRY_DELAY', '1.0')) # SSL settings self.ssl_mode = os.getenv('DB_SSL_MODE', 'prefer') logger.info(f"Database configuration initialized for: {self._safe_url()}") def _safe_url(self) -> str: """Return database URL with password masked for logging""" url = make_url(self.database_url) return str(url.set(password="***")) def get_engine_kwargs(self) -> Dict[str, Any]: """Get SQLAlchemy engine configuration""" return { 'poolclass': QueuePool, 'pool_size': self.pool_size, 'max_overflow': self.max_overflow, 'pool_pre_ping': self.pool_pre_ping, 'pool_recycle': self.pool_recycle, 'connect_args': { 'connect_timeout': self.connect_timeout, 'options': f'-c statement_timeout={self.statement_timeout}', 'sslmode': self.ssl_mode, }, 'echo': False, # Disable SQL logging to reduce verbosity 'future': True, # Use SQLAlchemy 2.0 style } class DatabaseManager: """ Database manager with connection pooling and session management """ def __init__(self, config: Optional[DatabaseConfig] = None): self.config = config or DatabaseConfig() self._engine: Optional[Engine] = None self._session_factory: Optional[sessionmaker] = None self._scoped_session: Optional[scoped_session] = None def initialize(self) -> None: """Initialize database engine and session factory""" try: logger.info("Initializing database connection...") # Create engine with retry logic self._engine = self._create_engine_with_retry() # Setup session factory self._session_factory = sessionmaker( bind=self._engine, autocommit=False, autoflush=False, expire_on_commit=False ) # Setup scoped session for thread safety self._scoped_session = scoped_session(self._session_factory) # Add connection event listeners self._setup_connection_events() logger.info("Database connection initialized successfully") except Exception as e: logger.error(f"Failed to initialize database: {e}") raise def _create_engine_with_retry(self) -> Engine: """Create database engine with retry logic""" for attempt in range(self.config.max_retries): try: engine = create_engine( self.config.database_url, **self.config.get_engine_kwargs() ) # Test connection with engine.connect() as conn: conn.execute(text("SELECT 1")) logger.info("Database connection test successful") return engine except (OperationalError, DisconnectionError) as e: attempt_num = attempt + 1 if attempt_num == self.config.max_retries: logger.error(f"Failed to connect to database after {self.config.max_retries} attempts: {e}") raise logger.warning(f"Database connection attempt {attempt_num} failed: {e}. Retrying in {self.config.retry_delay}s...") time.sleep(self.config.retry_delay) def _setup_connection_events(self) -> None: """Setup SQLAlchemy connection event listeners""" if not self._engine: return @event.listens_for(self._engine, "connect") def set_sqlite_pragma(dbapi_connection, connection_record): """Set connection-level settings""" if 'postgresql' in str(self._engine.url): with dbapi_connection.cursor() as cursor: # Set timezone to UTC cursor.execute("SET timezone TO 'UTC'") # Set application name for monitoring cursor.execute("SET application_name TO 'crypto_trading_bot'") @event.listens_for(self._engine, "checkout") def checkout_listener(dbapi_connection, connection_record, connection_proxy): """Log connection checkout""" logger.debug("Database connection checked out from pool") @event.listens_for(self._engine, "checkin") def checkin_listener(dbapi_connection, connection_record): """Log connection checkin""" logger.debug("Database connection returned to pool") @property def engine(self) -> Engine: """Get database engine""" if not self._engine: raise RuntimeError("Database not initialized. Call initialize() first.") return self._engine @property def session_factory(self) -> sessionmaker: """Get session factory""" if not self._session_factory: raise RuntimeError("Database not initialized. Call initialize() first.") return self._session_factory def create_session(self) -> Session: """Create a new database session""" if not self._session_factory: raise RuntimeError("Database not initialized. Call initialize() first.") return self._session_factory() @contextmanager def get_session(self) -> Generator[Session, None, None]: """ Context manager for database sessions with automatic cleanup Usage: with db_manager.get_session() as session: # Use session here pass """ session = self.create_session() try: yield session session.commit() except Exception: session.rollback() raise finally: session.close() @contextmanager def get_scoped_session(self) -> Generator[Session, None, None]: """ Context manager for scoped sessions (thread-safe) Usage: with db_manager.get_scoped_session() as session: # Use session here pass """ if not self._scoped_session: raise RuntimeError("Database not initialized. Call initialize() first.") session = self._scoped_session() try: yield session session.commit() except Exception: session.rollback() raise finally: self._scoped_session.remove() def test_connection(self) -> bool: """Test database connection""" try: with self.get_session() as session: session.execute(text("SELECT 1")) logger.info("Database connection test successful") return True except Exception as e: logger.error(f"Database connection test failed: {e}") return False def get_pool_status(self) -> Dict[str, Any]: """Get connection pool status""" if not self._engine or not hasattr(self._engine.pool, 'size'): return {"status": "Pool not available"} pool = self._engine.pool return { "pool_size": pool.size(), "checked_in": pool.checkedin(), "checked_out": pool.checkedout(), "overflow": pool.overflow(), } def create_tables(self) -> None: """Create all database tables""" try: create_all_tables(self.engine) logger.info("Database tables created successfully") except Exception as e: logger.error(f"Failed to create database tables: {e}") raise def drop_tables(self) -> None: """Drop all database tables""" try: drop_all_tables(self.engine) logger.info("Database tables dropped successfully") except Exception as e: logger.error(f"Failed to drop database tables: {e}") raise def execute_schema_file(self, schema_file_path: str) -> None: """Execute SQL schema file""" try: with open(schema_file_path, 'r') as file: schema_sql = file.read() with self.get_session() as session: # Split and execute each statement statements = [stmt.strip() for stmt in schema_sql.split(';') if stmt.strip()] for statement in statements: if statement: session.execute(text(statement)) logger.info(f"Schema file executed successfully: {schema_file_path}") except Exception as e: logger.error(f"Failed to execute schema file {schema_file_path}: {e}") raise def close(self) -> None: """Close database connections and cleanup""" try: if self._scoped_session: self._scoped_session.remove() if self._engine: self._engine.dispose() logger.info("Database connections closed") except Exception as e: logger.error(f"Error closing database connections: {e}") def retry_on_database_error(max_retries: int = 3, delay: float = 1.0): """ Decorator to retry database operations on transient errors Args: max_retries: Maximum number of retry attempts delay: Delay between retries in seconds """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except (OperationalError, DisconnectionError) as e: if attempt == max_retries - 1: logger.error(f"Database operation failed after {max_retries} attempts: {e}") raise logger.warning(f"Database operation failed (attempt {attempt + 1}): {e}. Retrying in {delay}s...") time.sleep(delay) return None return wrapper return decorator # Global database manager instance db_manager = DatabaseManager() def get_db_manager() -> DatabaseManager: """Get global database manager instance""" return db_manager def init_database(config: Optional[DatabaseConfig] = None) -> DatabaseManager: """ Initialize global database manager Args: config: Optional database configuration Returns: DatabaseManager instance """ global db_manager if config: db_manager = DatabaseManager(config) db_manager.initialize() return db_manager # Convenience functions for common operations def get_session() -> Generator[Session, None, None]: """Get database session (convenience function)""" return db_manager.get_session() def get_scoped_session() -> Generator[Session, None, None]: """Get scoped database session (convenience function)""" return db_manager.get_scoped_session() def test_connection() -> bool: """Test database connection (convenience function)""" return db_manager.test_connection() def get_pool_status() -> Dict[str, Any]: """Get connection pool status (convenience function)""" return db_manager.get_pool_status()