""" Database Operations Module This module provides a centralized `DatabaseOperations` class that serves as the main entry point for all database interactions. It follows the Repository pattern by composing individual repository classes, each responsible for a specific table. """ import logging from typing import Optional, Dict, Any from sqlalchemy import text from .repositories import ( BotRepository, MarketDataRepository, RawTradeRepository, StrategyRepository, DatabaseOperationError, ) class DatabaseOperations: """ Main database operations manager that provides access to all repositories. This is the main entry point for database operations, providing a centralized interface to all table-specific repositories. """ def __init__(self, logger: Optional[logging.Logger] = None): """Initialize database operations with optional logger.""" self.logger = logger # Initialize repositories self.bots = BotRepository(logger) self.market_data = MarketDataRepository(logger) self.raw_trades = RawTradeRepository(logger) self.strategies = StrategyRepository(logger) def health_check(self) -> bool: """ Perform a health check on database connections. Returns: True if database is healthy, False otherwise """ try: # We use one of the repositories to get a session with self.market_data.get_session() as session: result = session.execute(text("SELECT 1")) return result.fetchone() is not None except Exception as e: if self.logger: self.logger.error(f"Database health check failed: {e}") return False def get_stats(self) -> Dict[str, Any]: """ Get database statistics. Returns: Dictionary containing database statistics """ try: stats = { 'healthy': self.health_check(), 'repositories': { 'bots': 'BotRepository', 'market_data': 'MarketDataRepository', 'raw_trades': 'RawTradeRepository', 'strategies': 'StrategyRepository' } } # Use a single session for all counts for efficiency with self.market_data.get_session() as session: stats['bot_count'] = session.execute(text("SELECT COUNT(*) FROM bots")).scalar_one() stats['candle_count'] = session.execute(text("SELECT COUNT(*) FROM market_data")).scalar_one() stats['raw_trade_count'] = session.execute(text("SELECT COUNT(*) FROM raw_trades")).scalar_one() stats['strategy_runs_count'] = session.execute(text("SELECT COUNT(*) FROM strategy_runs")).scalar_one() stats['strategy_signals_count'] = session.execute(text("SELECT COUNT(*) FROM strategy_signals")).scalar_one() return stats except Exception as e: if self.logger: self.logger.error(f"Error getting database stats: {e}") return {'healthy': False, 'error': str(e)} # Singleton instance for global access _db_operations_instance: Optional[DatabaseOperations] = None def get_database_operations(logger: Optional[logging.Logger] = None) -> DatabaseOperations: """ Get the global database operations instance. Args: logger: Optional logger for database operations Returns: DatabaseOperations instance """ global _db_operations_instance if _db_operations_instance is None: _db_operations_instance = DatabaseOperations(logger) return _db_operations_instance