# Database Operations Documentation

## Overview

The Database Operations module (`database/operations.py`) provides a clean, centralized interface for all database interactions using the **Repository Pattern**. This approach keeps SQL complexity out of business logic, so database operations stay maintainable, testable, and consistent across the entire application.

## Key Benefits

### 🏗️ **Clean Architecture**

- **Repository Pattern**: Separates data access logic from business logic
- **Centralized Operations**: All database interactions go through well-defined APIs
- **No Raw SQL**: Business logic never contains direct SQL queries
- **Consistent Interface**: Standardized methods across all database operations

### 🛡️ **Reliability & Safety**

- **Automatic Transaction Management**: Sessions and commits are handled automatically
- **Error Handling**: Custom exceptions with proper context
- **Connection Pooling**: Efficient database connection management
- **Session Cleanup**: Automatic session management and cleanup

### 🔧 **Maintainability**

- **Easy Testing**: Repository methods can be easily mocked for testing
- **Database Agnostic**: Database implementations can be changed without affecting business logic
- **Type Safety**: Full type hints for better IDE support and error detection
- **Logging Integration**: Built-in logging for monitoring and debugging

## Architecture

```
┌───────────────────────────────────────────────────────────────┐
│                      DatabaseOperations                       │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │                  Health Check & Stats                   │  │
│  │  • Connection health monitoring                         │  │
│  │  • Database statistics                                  │  │
│  │  • Performance metrics                                  │  │
│  └─────────────────────────────────────────────────────────┘  │
│                                                               │
│  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐  │
│  │ MarketDataRepo  │ │ RawTradeRepo    │ │ BotRepo         │  │
│  │                 │ │                 │ │                 │  │
│  │ • upsert_candle │ │ • insert_data   │ │ • add           │  │
│  │ • get_candles   │ │ • get_trades    │ │ • get_by_id     │  │
│  │ • get_latest    │ │ • raw_websocket │ │ • update/delete │  │
│  └─────────────────┘ └─────────────────┘ └─────────────────┘  │
└───────────────────────────────────────────────────────────────┘
                                │
                       ┌─────────────────┐
                       │ BaseRepository  │
                       │                 │
                       │ • Session Mgmt  │
                       │ • Error Logging │
                       │ • DB Connection │
                       └─────────────────┘
```

## Quick Start

### Basic Usage

```python
from database.operations import get_database_operations
from data.common.data_types import OHLCVCandle
from datetime import datetime, timezone

# Get the database operations instance (singleton)
db = get_database_operations()

# Check database health; stop early if the connection is down
if not db.health_check():
    raise SystemExit("Database connection issue!")

# Build a candle
candle = OHLCVCandle(
    exchange="okx",
    symbol="BTC-USDT",
    timeframe="5s",
    open=50000.0,
    high=50100.0,
    low=49900.0,
    close=50050.0,
    volume=1.5,
    trade_count=25,
    start_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
    end_time=datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc)
)

# Store candle (with duplicate handling)
success = db.market_data.upsert_candle(candle, force_update=False)
if success:
    print("Candle stored successfully!")
```

### With Data Collectors

```python
import asyncio
from data.exchanges.okx import OKXCollector
from data.base_collector import DataType
from database.operations import get_database_operations

async def main():
    # Initialize database operations
    db = get_database_operations()

    # The collector automatically uses the database operations module
    collector = OKXCollector(
        symbols=['BTC-USDT'],
        data_types=[DataType.TRADE],
        store_raw_data=True,         # Stores raw WebSocket data
        force_update_candles=False   # Ignore duplicate candles
    )

    await collector.start()
    await asyncio.sleep(60)  # Collect for 1 minute
    await collector.stop()

    # Check statistics
    stats = db.get_stats()
    print(f"Total bots: {stats['bot_count']}")
    print(f"Total candles: {stats['candle_count']}")
    print(f"Total raw trades: {stats['raw_trade_count']}")

asyncio.run(main())
```

## API Reference

### DatabaseOperations

Main entry point for all database operations.

#### Methods

##### `health_check() -> bool`

Test database connection health.

```python
db = get_database_operations()
if db.health_check():
    print("✅ Database is healthy")
else:
    print("❌ Database connection issues")
```

##### `get_stats() -> Dict[str, Any]`

Get database statistics: row counts for bots, candles, and raw trades, plus a health flag.

```python
stats = db.get_stats()
print(f"Bots: {stats['bot_count']:,}")
print(f"Candles: {stats['candle_count']:,}")
print(f"Raw trades: {stats['raw_trade_count']:,}")
print(f"Health: {stats['healthy']}")
```

### MarketDataRepository

Repository for `market_data` table operations (candles/OHLCV data).

#### Methods

##### `upsert_candle(candle: OHLCVCandle, force_update: bool = False) -> bool`

Store or update candle data with configurable duplicate handling.
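Duplicate handling of this kind is typically built on SQL's `INSERT ... ON CONFLICT` clause. The sketch below illustrates the two `force_update` modes using Python's built-in `sqlite3` module instead of the project's actual database and SQLAlchemy layer. The simplified `market_data` schema, its unique key, and the `upsert_candle_sketch` helper are hypothetical and are not part of `database/operations.py`.

```python
import sqlite3

# Hypothetical, simplified schema; the real market_data table and its unique key may differ.
SCHEMA = """
CREATE TABLE IF NOT EXISTS market_data (
    exchange  TEXT NOT NULL,
    symbol    TEXT NOT NULL,
    timeframe TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    close     REAL NOT NULL,
    UNIQUE (exchange, symbol, timeframe, timestamp)
)
"""

INSERT_SQL = """
INSERT INTO market_data (exchange, symbol, timeframe, timestamp, close)
VALUES (:exchange, :symbol, :timeframe, :timestamp, :close)
"""

CONFLICT_KEY = "ON CONFLICT (exchange, symbol, timeframe, timestamp)"


def upsert_candle_sketch(conn: sqlite3.Connection, candle: dict, force_update: bool = False) -> bool:
    """Hypothetical helper: insert a candle, handling duplicates according to force_update."""
    if force_update:
        # Duplicate key: overwrite the stored values with the incoming ones
        sql = INSERT_SQL + CONFLICT_KEY + " DO UPDATE SET close = excluded.close"
    else:
        # Duplicate key: keep the existing row and silently skip the insert
        sql = INSERT_SQL + CONFLICT_KEY + " DO NOTHING"
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(sql, candle)
        return True
    except sqlite3.Error:
        return False


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    candle = {"exchange": "okx", "symbol": "BTC-USDT", "timeframe": "5s",
              "timestamp": "2024-01-01T12:00:00+00:00", "close": 50050.0}
    upsert_candle_sketch(conn, candle)
    upsert_candle_sketch(conn, {**candle, "close": 1.0})  # duplicate ignored
    print(conn.execute("SELECT close FROM market_data").fetchone()[0])  # → 50050.0
    upsert_candle_sketch(conn, {**candle, "close": 1.0}, force_update=True)  # overwritten
    print(conn.execute("SELECT close FROM market_data").fetchone()[0])  # → 1.0
```

In this sketch a skipped duplicate still counts as a successful call and returns `True`; whether the real repository reports it the same way is not stated here.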
**Parameters:**
- `candle`: OHLCVCandle object to store
- `force_update`: If True, overwrites existing data; if False, ignores duplicates

**Returns:** True if successful, False otherwise

**Duplicate Handling:**
- `force_update=False`: Uses `ON CONFLICT DO NOTHING` (preserves existing candles)
- `force_update=True`: Uses `ON CONFLICT DO UPDATE SET` (overwrites existing candles)

```python
# Store new candle, ignore if duplicate exists
db.market_data.upsert_candle(candle, force_update=False)

# Store candle, overwrite if duplicate exists
db.market_data.upsert_candle(candle, force_update=True)
```

##### `get_candles(symbol: str, timeframe: str, start_time: datetime, end_time: datetime, exchange: str = "okx") -> List[Dict[str, Any]]`

Retrieve historical candle data.

```python
from datetime import datetime, timezone

candles = db.market_data.get_candles(
    symbol="BTC-USDT",
    timeframe="5s",
    start_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
    end_time=datetime(2024, 1, 1, 13, 0, 0, tzinfo=timezone.utc),
    exchange="okx"
)

for candle in candles:
    print(f"{candle['timestamp']}: O={candle['open']} H={candle['high']} L={candle['low']} C={candle['close']}")
```

##### `get_latest_candle(symbol: str, timeframe: str, exchange: str = "okx") -> Optional[Dict[str, Any]]`

Get the most recent candle for a symbol/timeframe combination.

```python
latest = db.market_data.get_latest_candle("BTC-USDT", "5s")
if latest:
    print(f"Latest 5s candle: {latest['close']} at {latest['timestamp']}")
else:
    print("No candles found")
```

### BotRepository

Repository for `bots` table operations.

#### Methods

##### `add(bot_data: Dict[str, Any]) -> Bot`

Adds a new bot to the database.

**Parameters:**
- `bot_data`: Dictionary containing the bot's attributes (`name`, `strategy_name`, etc.)

**Returns:** The newly created `Bot` object.
```python
from decimal import Decimal

bot_data = {
    "name": "MyTestBot",
    "strategy_name": "SimpleMACD",
    "symbol": "BTC-USDT",
    "timeframe": "1h",
    "status": "inactive",
    "virtual_balance": Decimal("10000"),
}
new_bot = db.bots.add(bot_data)
print(f"Added bot with ID: {new_bot.id}")
```

##### `get_by_id(bot_id: int) -> Optional[Bot]`

Retrieves a bot by its unique ID.

```python
bot = db.bots.get_by_id(1)
if bot:
    print(f"Found bot: {bot.name}")
```

##### `get_by_name(name: str) -> Optional[Bot]`

Retrieves a bot by its unique name.

```python
bot = db.bots.get_by_name("MyTestBot")
if bot:
    print(f"Found bot with ID: {bot.id}")
```

##### `update(bot_id: int, update_data: Dict[str, Any]) -> Optional[Bot]`

Updates an existing bot's attributes.

```python
from datetime import datetime, timezone

update_payload = {"status": "active", "last_heartbeat": datetime.now(timezone.utc)}
updated_bot = db.bots.update(1, update_payload)
if updated_bot:
    print(f"Bot status updated to: {updated_bot.status}")
```

##### `delete(bot_id: int) -> bool`

Deletes a bot from the database.

**Returns:** `True` if deletion was successful, `False` otherwise.

```python
success = db.bots.delete(1)
if success:
    print("Bot deleted successfully.")
```

### RawTradeRepository

Repository for `raw_trades` table operations (raw WebSocket data).

#### Methods

##### `insert_market_data_point(data_point: MarketDataPoint) -> bool`

Store raw market data from WebSocket streams.

```python
from data.base_collector import MarketDataPoint, DataType
from datetime import datetime, timezone

data_point = MarketDataPoint(
    exchange="okx",
    symbol="BTC-USDT",
    timestamp=datetime.now(timezone.utc),
    data_type=DataType.TRADE,
    data={"price": 50000, "size": 0.1, "side": "buy"}
)
success = db.raw_trades.insert_market_data_point(data_point)
```

##### `insert_raw_websocket_data(exchange: str, symbol: str, data_type: str, raw_data: Dict[str, Any], timestamp: Optional[datetime] = None) -> bool`

Store raw WebSocket data for debugging purposes.
```python
db.raw_trades.insert_raw_websocket_data(
    exchange="okx",
    symbol="BTC-USDT",
    data_type="raw_trade",
    raw_data={"instId": "BTC-USDT", "px": "50000", "sz": "0.1"},
    timestamp=datetime.now(timezone.utc)
)
```

##### `get_raw_trades(symbol: str, data_type: str, start_time: datetime, end_time: datetime, exchange: str = "okx", limit: Optional[int] = None) -> List[Dict[str, Any]]`

Retrieve raw trade data for analysis.

```python
trades = db.raw_trades.get_raw_trades(
    symbol="BTC-USDT",
    data_type="trade",
    start_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
    end_time=datetime(2024, 1, 1, 13, 0, 0, tzinfo=timezone.utc),
    limit=1000
)
```

##### `cleanup_old_raw_data(days_to_keep: int = 7) -> int`

Clean up old raw data to prevent table bloat.

**Parameters:**
- `days_to_keep`: Number of days to retain raw data records.

**Returns:** The number of records deleted.

```python
# Clean up raw data older than 14 days
deleted_count = db.raw_trades.cleanup_old_raw_data(days_to_keep=14)
print(f"Deleted {deleted_count} old raw data records.")
```

##### `get_raw_data_stats() -> Dict[str, Any]`

Get statistics about raw data storage.

**Returns:** A dictionary with statistics like total records, table size, etc.

```python
raw_stats = db.raw_trades.get_raw_data_stats()
print(f"Raw Trades Table Size: {raw_stats.get('table_size')}")
print(f"Total Raw Records: {raw_stats.get('total_records')}")
```

## Error Handling

The database operations module reports failures through custom exceptions.

### DatabaseOperationError

Custom exception for database operation failures.

```python
import logging

from database.operations import DatabaseOperationError, get_database_operations

logger = logging.getLogger(__name__)
db = get_database_operations()

try:
    db.market_data.upsert_candle(candle)  # `candle` built as in Quick Start
except DatabaseOperationError as e:
    logger.error(f"Database operation failed: {e}")
    # Handle the error appropriately
```

### Best Practices

1. **Always Handle Exceptions**: Wrap database operations in `try`/`except` blocks
2. **Check Health First**: Use `health_check()` before critical operations
3. **Monitor Performance**: Use `get_stats()` to monitor database growth
4. **Use Appropriate Repositories**: Use `market_data` for candles, `raw_trades` for raw data
5. **Handle Duplicates Appropriately**: Choose the right `force_update` setting

## Configuration

### Force Update Behavior

The `force_update_candles` parameter in collectors controls how duplicate candles are handled:

```python
# In OKX collector configuration
collector = OKXCollector(
    symbols=['BTC-USDT'],
    force_update_candles=False  # Default: ignore duplicates
)

# Or enable force updates
collector = OKXCollector(
    symbols=['BTC-USDT'],
    force_update_candles=True  # Overwrite existing candles
)
```

### Logging Integration

Pass a logger to `get_database_operations()` to route the module's log messages through the application's logging system:

```python
import logging
from database.operations import get_database_operations

logger = logging.getLogger(__name__)
db = get_database_operations(logger)

# All database operations will now log through your logger
db.market_data.upsert_candle(candle)
# Logs: "Stored candle: BTC-USDT 5s at ..."
```

## Migration from Direct SQL

If you have existing code using direct SQL, here's how to migrate:

### Before (Direct SQL - ❌ Don't do this)

```python
# OLD WAY - direct SQL queries
from database.connection import get_db_manager
from sqlalchemy import text

db_manager = get_db_manager()
with db_manager.get_session() as session:
    session.execute(text("""
        INSERT INTO market_data (exchange, symbol, timeframe, ...)
        VALUES (:exchange, :symbol, :timeframe, ...)
    """), {'exchange': 'okx', 'symbol': 'BTC-USDT', ...})
    session.commit()
```

### After (Repository Pattern - ✅ Correct way)

```python
# NEW WAY - using repository pattern
from database.operations import get_database_operations
from data.common.data_types import OHLCVCandle

db = get_database_operations()
candle = OHLCVCandle(...)  # Create candle object
success = db.market_data.upsert_candle(candle)
```

The entire repository layer has been standardized to use the SQLAlchemy ORM internally, ensuring a consistent, maintainable, and database-agnostic approach. Raw SQL is avoided in favor of type-safe ORM queries.

## Performance Considerations

### Connection Pooling

The database operations module automatically manages connection pooling through the underlying `DatabaseManager`.

### Batch Operations

To store many candles, loop over them. Note that this is not a single bulk statement: each candle is written by a separate `upsert_candle()` call.

```python
# Store multiple candles, one upsert call per candle
candles = [candle1, candle2, candle3, ...]
for candle in candles:
    db.market_data.upsert_candle(candle)
```

### Monitoring

Monitor database growth and health using the built-in statistics:

```python
import time

from database.operations import get_database_operations

db = get_database_operations()

# Poll database statistics every 30 seconds (stop with Ctrl+C)
while True:
    stats = db.get_stats()
    print(f"Candles: {stats['candle_count']:,}, Health: {stats['healthy']}")
    time.sleep(30)
```

## Troubleshooting

### Common Issues

#### 1. Connection Errors

```python
if not db.health_check():
    logger.error("Database connection failed - check connection settings")
```

#### 2. Duplicate Key Errors

```python
# Use force_update=False to ignore duplicates
db.market_data.upsert_candle(candle, force_update=False)
```

#### 3. Transaction Errors

The repository handles session management automatically. If a transaction still fails, catch `DatabaseOperationError` and log it:

```python
try:
    db.market_data.upsert_candle(candle)
except DatabaseOperationError as e:
    logger.error(f"Transaction failed: {e}")
```

### Debug Mode

Enable database query logging for debugging:

```python
import os

from database.operations import get_database_operations

# Set the environment variable before the first get_database_operations() call;
# the instance is a singleton, so setting it later may have no effect
os.environ['DEBUG'] = 'true'

# This will log all SQL queries
db = get_database_operations()
```

## Related Documentation

- **[Database Connection](../architecture/database.md)** - Connection pooling and configuration
- **[Data Collectors](data_collectors.md)** - How collectors use database operations
- **[Architecture Overview](../architecture/architecture.md)** - System design patterns

---

*This documentation covers the repository pattern implementation in `database/operations.py`. For database schema details, see the [Architecture Documentation](../architecture/).*