diff --git a/data/exchanges/okx/collector.py b/data/exchanges/okx/collector.py index 24bdf00..b6689f3 100644 --- a/data/exchanges/okx/collector.py +++ b/data/exchanges/okx/collector.py @@ -21,7 +21,7 @@ from .websocket import ( ConnectionState, OKXWebSocketError ) from .data_processor import OKXDataProcessor -from database.connection import get_db_manager, get_raw_data_manager +from database.operations import get_database_operations, DatabaseOperationError from database.models import MarketData, RawTrade @@ -104,9 +104,8 @@ class OKXCollector(BaseDataCollector): self._data_processor.add_trade_callback(self._on_trade_processed) self._data_processor.add_candle_callback(self._on_candle_processed) - # Database managers - self._db_manager = None - self._raw_data_manager = None + # Database operations using new repository pattern + self._db_operations = None # Data processing counters self._message_count = 0 @@ -136,10 +135,8 @@ class OKXCollector(BaseDataCollector): if self.logger: self.logger.info(f"{self.component_name}: Connecting OKX collector for {self.symbol}") - # Initialize database managers - self._db_manager = get_db_manager() - if self.store_raw_data: - self._raw_data_manager = get_raw_data_manager() + # Initialize database operations using repository pattern + self._db_operations = get_database_operations(self.logger) # Create WebSocket client ws_component_name = f"okx_ws_{self.symbol.replace('-', '_').lower()}" @@ -370,22 +367,17 @@ class OKXCollector(BaseDataCollector): data_point: Raw market data point (trade, orderbook, ticker) """ try: - if not self._db_manager: + if not self._db_operations: return - # Store raw market data points in raw_trades table - with self._db_manager.get_session() as session: - raw_trade = RawTrade( - exchange="okx", - symbol=data_point.symbol, - timestamp=data_point.timestamp, - data_type=data_point.data_type.value, - raw_data=data_point.data - ) - session.add(raw_trade) - if self.logger: - 
self.logger.debug(f"{self.component_name}: Stored raw data: {data_point.data_type.value} for {data_point.symbol}") + # Store raw market data points in raw_trades table using repository + success = self._db_operations.raw_trades.insert_market_data_point(data_point) + if success and self.logger: + self.logger.debug(f"{self.component_name}: Stored raw data: {data_point.data_type.value} for {data_point.symbol}") + except DatabaseOperationError as e: + if self.logger: + self.logger.error(f"{self.component_name}: Database error storing raw market data: {e}") except Exception as e: if self.logger: self.logger.error(f"{self.component_name}: Error storing raw market data: {e}") @@ -402,70 +394,22 @@ class OKXCollector(BaseDataCollector): candle: Completed OHLCV candle """ try: - if not self._db_manager: + if not self._db_operations: return - # Use right-aligned timestamp (end_time) following industry standard - candle_timestamp = candle.end_time + # Store completed candles using repository pattern + success = self._db_operations.market_data.upsert_candle(candle, self.force_update_candles) - # Store completed candles in market_data table with configurable duplicate handling - with self._db_manager.get_session() as session: - if self.force_update_candles: - # Force update: Overwrite existing candles with new data - upsert_query = """ - INSERT INTO market_data ( - exchange, symbol, timeframe, timestamp, - open, high, low, close, volume, trades_count, - created_at, updated_at - ) VALUES ( - :exchange, :symbol, :timeframe, :timestamp, - :open, :high, :low, :close, :volume, :trades_count, - NOW(), NOW() - ) - ON CONFLICT (exchange, symbol, timeframe, timestamp) - DO UPDATE SET - open = EXCLUDED.open, - high = EXCLUDED.high, - low = EXCLUDED.low, - close = EXCLUDED.close, - volume = EXCLUDED.volume, - trades_count = EXCLUDED.trades_count, - updated_at = NOW() - """ - action_type = "Updated" - else: - # Keep existing: Ignore duplicates, preserve first candle - upsert_query = """ - 
INSERT INTO market_data ( - exchange, symbol, timeframe, timestamp, - open, high, low, close, volume, trades_count, - created_at, updated_at - ) VALUES ( - :exchange, :symbol, :timeframe, :timestamp, - :open, :high, :low, :close, :volume, :trades_count, - NOW(), NOW() - ) - ON CONFLICT (exchange, symbol, timeframe, timestamp) - DO NOTHING - """ - action_type = "Stored" - - session.execute(upsert_query, { - 'exchange': candle.exchange, - 'symbol': candle.symbol, - 'timeframe': candle.timeframe, - 'timestamp': candle_timestamp, - 'open': float(candle.open), - 'high': float(candle.high), - 'low': float(candle.low), - 'close': float(candle.close), - 'volume': float(candle.volume), - 'trades_count': candle.trade_count - }) - - if self.logger: - self.logger.info(f"{self.component_name}: {action_type} candle: {candle.symbol} {candle.timeframe} at {candle_timestamp} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}") + if success and self.logger: + action = "Updated" if self.force_update_candles else "Stored" + self.logger.info(f"{self.component_name}: {action} candle: {candle.symbol} {candle.timeframe} at {candle.end_time} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}") + except DatabaseOperationError as e: + if self.logger: + self.logger.error(f"{self.component_name}: Database error storing completed candle: {e}") + # Log candle details for debugging + self.logger.error(f"{self.component_name}: Failed candle details: {candle.symbol} {candle.timeframe} {candle.end_time} - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}") + self._error_count += 1 except Exception as e: if self.logger: self.logger.error(f"{self.component_name}: Error storing completed candle: {e}") @@ -482,19 +426,24 @@ class OKXCollector(BaseDataCollector): raw_message: 
Raw WebSocket message """ try: - if not self._raw_data_manager or 'data' not in raw_message: + if not self._db_operations or 'data' not in raw_message: return - # Store each data item as a separate raw data record + # Store each data item as a separate raw data record using repository for data_item in raw_message['data']: - self._raw_data_manager.store_raw_data( + success = self._db_operations.raw_trades.insert_raw_websocket_data( exchange="okx", symbol=self.symbol, data_type=f"raw_{channel}", # Prefix with 'raw_' to distinguish from processed data raw_data=data_item, timestamp=datetime.now(timezone.utc) ) + if not success and self.logger: + self.logger.warning(f"{self.component_name}: Failed to store raw WebSocket data for {channel}") + except DatabaseOperationError as e: + if self.logger: + self.logger.error(f"{self.component_name}: Database error storing raw WebSocket data: {e}") except Exception as e: if self.logger: self.logger.error(f"{self.component_name}: Error storing raw WebSocket data: {e}") diff --git a/database/operations.py b/database/operations.py new file mode 100644 index 0000000..c3dd10e --- /dev/null +++ b/database/operations.py @@ -0,0 +1,513 @@ +""" +Database Operations Module + +This module provides centralized database operations for all tables, +following the Repository pattern to abstract SQL complexity from business logic. 
+ +Benefits: +- Centralized database operations +- Clean API for different tables +- Easy to test and maintain +- Database implementation can change without affecting business logic +- Consistent error handling and logging +""" + +from datetime import datetime +from decimal import Decimal +from typing import List, Optional, Dict, Any, Union +from contextlib import contextmanager +import logging +import json +from sqlalchemy import text + +from .connection import get_db_manager +from .models import MarketData, RawTrade +from data.common.data_types import OHLCVCandle, StandardizedTrade +from data.base_collector import MarketDataPoint, DataType + + +class DatabaseOperationError(Exception): + """Custom exception for database operation errors.""" + pass + + +class BaseRepository: + """Base class for all repository classes.""" + + def __init__(self, logger: Optional[logging.Logger] = None): + """Initialize repository with optional logger.""" + self.logger = logger + self._db_manager = get_db_manager() + self._db_manager.initialize() + + def log_info(self, message: str) -> None: + """Log info message if logger is available.""" + if self.logger: + self.logger.info(message) + + def log_error(self, message: str) -> None: + """Log error message if logger is available.""" + if self.logger: + self.logger.error(message) + + @contextmanager + def get_session(self): + """Get database session with automatic cleanup.""" + if not self._db_manager: + raise DatabaseOperationError("Database manager not initialized") + + with self._db_manager.get_session() as session: + yield session + + +class MarketDataRepository(BaseRepository): + """Repository for market_data table operations.""" + + def upsert_candle(self, candle: OHLCVCandle, force_update: bool = False) -> bool: + """ + Insert or update a candle in the market_data table. 
+ + Args: + candle: OHLCV candle to store + force_update: If True, update existing records; if False, ignore duplicates + + Returns: + True if operation successful, False otherwise + """ + try: + # Use right-aligned timestamp (end_time) following industry standard + candle_timestamp = candle.end_time + + with self.get_session() as session: + if force_update: + # Update existing records with new data + query = text(""" + INSERT INTO market_data ( + exchange, symbol, timeframe, timestamp, + open, high, low, close, volume, trades_count, + created_at + ) VALUES ( + :exchange, :symbol, :timeframe, :timestamp, + :open, :high, :low, :close, :volume, :trades_count, + NOW() + ) + ON CONFLICT (exchange, symbol, timeframe, timestamp) + DO UPDATE SET + open = EXCLUDED.open, + high = EXCLUDED.high, + low = EXCLUDED.low, + close = EXCLUDED.close, + volume = EXCLUDED.volume, + trades_count = EXCLUDED.trades_count + """) + action = "Updated" + else: + # Ignore duplicates, keep existing records + query = text(""" + INSERT INTO market_data ( + exchange, symbol, timeframe, timestamp, + open, high, low, close, volume, trades_count, + created_at + ) VALUES ( + :exchange, :symbol, :timeframe, :timestamp, + :open, :high, :low, :close, :volume, :trades_count, + NOW() + ) + ON CONFLICT (exchange, symbol, timeframe, timestamp) + DO NOTHING + """) + action = "Stored" + + session.execute(query, { + 'exchange': candle.exchange, + 'symbol': candle.symbol, + 'timeframe': candle.timeframe, + 'timestamp': candle_timestamp, + 'open': float(candle.open), + 'high': float(candle.high), + 'low': float(candle.low), + 'close': float(candle.close), + 'volume': float(candle.volume), + 'trades_count': candle.trade_count + }) + + session.commit() + + self.log_info(f"{action} candle: {candle.symbol} {candle.timeframe} at {candle_timestamp} (force_update={force_update})") + return True + + except Exception as e: + self.log_error(f"Error storing candle {candle.symbol} {candle.timeframe}: {e}") + raise 
DatabaseOperationError(f"Failed to store candle: {e}") + + def get_candles(self, + symbol: str, + timeframe: str, + start_time: datetime, + end_time: datetime, + exchange: str = "okx") -> List[Dict[str, Any]]: + """ + Retrieve candles from the database. + + Args: + symbol: Trading symbol + timeframe: Candle timeframe + start_time: Start timestamp + end_time: End timestamp + exchange: Exchange name + + Returns: + List of candle dictionaries + """ + try: + with self.get_session() as session: + query = text(""" + SELECT exchange, symbol, timeframe, timestamp, + open, high, low, close, volume, trades_count, + created_at, updated_at + FROM market_data + WHERE exchange = :exchange + AND symbol = :symbol + AND timeframe = :timeframe + AND timestamp >= :start_time + AND timestamp <= :end_time + ORDER BY timestamp ASC + """) + + result = session.execute(query, { + 'exchange': exchange, + 'symbol': symbol, + 'timeframe': timeframe, + 'start_time': start_time, + 'end_time': end_time + }) + + candles = [] + for row in result: + candles.append({ + 'exchange': row.exchange, + 'symbol': row.symbol, + 'timeframe': row.timeframe, + 'timestamp': row.timestamp, + 'open': row.open, + 'high': row.high, + 'low': row.low, + 'close': row.close, + 'volume': row.volume, + 'trades_count': row.trades_count, + 'created_at': row.created_at, + 'updated_at': row.updated_at + }) + + self.log_info(f"Retrieved {len(candles)} candles for {symbol} {timeframe}") + return candles + + except Exception as e: + self.log_error(f"Error retrieving candles for {symbol} {timeframe}: {e}") + raise DatabaseOperationError(f"Failed to retrieve candles: {e}") + + def get_latest_candle(self, symbol: str, timeframe: str, exchange: str = "okx") -> Optional[Dict[str, Any]]: + """ + Get the latest candle for a symbol and timeframe. 
+ + Args: + symbol: Trading symbol + timeframe: Candle timeframe + exchange: Exchange name + + Returns: + Latest candle dictionary or None + """ + try: + with self.get_session() as session: + query = text(""" + SELECT exchange, symbol, timeframe, timestamp, + open, high, low, close, volume, trades_count, + created_at, updated_at + FROM market_data + WHERE exchange = :exchange + AND symbol = :symbol + AND timeframe = :timeframe + ORDER BY timestamp DESC + LIMIT 1 + """) + + result = session.execute(query, { + 'exchange': exchange, + 'symbol': symbol, + 'timeframe': timeframe + }) + + row = result.fetchone() + if row: + return { + 'exchange': row.exchange, + 'symbol': row.symbol, + 'timeframe': row.timeframe, + 'timestamp': row.timestamp, + 'open': row.open, + 'high': row.high, + 'low': row.low, + 'close': row.close, + 'volume': row.volume, + 'trades_count': row.trades_count, + 'created_at': row.created_at, + 'updated_at': row.updated_at + } + return None + + except Exception as e: + self.log_error(f"Error retrieving latest candle for {symbol} {timeframe}: {e}") + raise DatabaseOperationError(f"Failed to retrieve latest candle: {e}") + + +class RawTradeRepository(BaseRepository): + """Repository for raw_trades table operations.""" + + def insert_market_data_point(self, data_point: MarketDataPoint) -> bool: + """ + Insert a market data point into raw_trades table. 
+ + Args: + data_point: Market data point to store + + Returns: + True if operation successful, False otherwise + """ + try: + with self.get_session() as session: + query = text(""" + INSERT INTO raw_trades ( + exchange, symbol, timestamp, data_type, raw_data, created_at + ) VALUES ( + :exchange, :symbol, :timestamp, :data_type, :raw_data, NOW() + ) + """) + + session.execute(query, { + 'exchange': data_point.exchange, + 'symbol': data_point.symbol, + 'timestamp': data_point.timestamp, + 'data_type': data_point.data_type.value, + 'raw_data': json.dumps(data_point.data) + }) + + session.commit() + + self.log_info(f"Stored raw {data_point.data_type.value} data for {data_point.symbol}") + return True + + except Exception as e: + self.log_error(f"Error storing raw data for {data_point.symbol}: {e}") + raise DatabaseOperationError(f"Failed to store raw data: {e}") + + def insert_raw_websocket_data(self, + exchange: str, + symbol: str, + data_type: str, + raw_data: Dict[str, Any], + timestamp: Optional[datetime] = None) -> bool: + """ + Insert raw WebSocket data for debugging purposes. 
+ + Args: + exchange: Exchange name + symbol: Trading symbol + data_type: Type of data (e.g., 'raw_trades', 'raw_orderbook') + raw_data: Raw data dictionary + timestamp: Optional timestamp (defaults to now) + + Returns: + True if operation successful, False otherwise + """ + try: + if timestamp is None: + timestamp = datetime.now() + + with self.get_session() as session: + query = text(""" + INSERT INTO raw_trades ( + exchange, symbol, timestamp, data_type, raw_data, created_at + ) VALUES ( + :exchange, :symbol, :timestamp, :data_type, :raw_data, NOW() + ) + """) + + session.execute(query, { + 'exchange': exchange, + 'symbol': symbol, + 'timestamp': timestamp, + 'data_type': data_type, + 'raw_data': json.dumps(raw_data) + }) + + session.commit() + + self.log_info(f"Stored raw WebSocket data: {data_type} for {symbol}") + return True + + except Exception as e: + self.log_error(f"Error storing raw WebSocket data for {symbol}: {e}") + raise DatabaseOperationError(f"Failed to store raw WebSocket data: {e}") + + def get_raw_trades(self, + symbol: str, + data_type: str, + start_time: datetime, + end_time: datetime, + exchange: str = "okx", + limit: Optional[int] = None) -> List[Dict[str, Any]]: + """ + Retrieve raw trades from the database. 
+ + Args: + symbol: Trading symbol + data_type: Data type filter + start_time: Start timestamp + end_time: End timestamp + exchange: Exchange name + limit: Maximum number of records to return + + Returns: + List of raw trade dictionaries + """ + try: + with self.get_session() as session: + query = text(""" + SELECT id, exchange, symbol, timestamp, data_type, raw_data, created_at + FROM raw_trades + WHERE exchange = :exchange + AND symbol = :symbol + AND data_type = :data_type + AND timestamp >= :start_time + AND timestamp <= :end_time + ORDER BY timestamp ASC + """) + + if limit: + query += f" LIMIT {limit}" + + result = session.execute(query, { + 'exchange': exchange, + 'symbol': symbol, + 'data_type': data_type, + 'start_time': start_time, + 'end_time': end_time + }) + + trades = [] + for row in result: + trades.append({ + 'id': row.id, + 'exchange': row.exchange, + 'symbol': row.symbol, + 'timestamp': row.timestamp, + 'data_type': row.data_type, + 'raw_data': row.raw_data, + 'created_at': row.created_at + }) + + self.log_info(f"Retrieved {len(trades)} raw trades for {symbol} {data_type}") + return trades + + except Exception as e: + self.log_error(f"Error retrieving raw trades for {symbol}: {e}") + raise DatabaseOperationError(f"Failed to retrieve raw trades: {e}") + + +class DatabaseOperations: + """ + Main database operations manager that provides access to all repositories. + + This is the main entry point for database operations, providing a + centralized interface to all table-specific repositories. + """ + + def __init__(self, logger: Optional[logging.Logger] = None): + """Initialize database operations with optional logger.""" + self.logger = logger + + # Initialize repositories + self.market_data = MarketDataRepository(logger) + self.raw_trades = RawTradeRepository(logger) + + def health_check(self) -> bool: + """ + Perform a health check on database connections. 
+ + Returns: + True if database is healthy, False otherwise + """ + try: + with self.market_data.get_session() as session: + # Simple query to test connection + result = session.execute(text("SELECT 1")) + return result.fetchone() is not None + except Exception as e: + if self.logger: + self.logger.error(f"Database health check failed: {e}") + return False + + def get_stats(self) -> Dict[str, Any]: + """ + Get database statistics. + + Returns: + Dictionary containing database statistics + """ + try: + stats = { + 'healthy': self.health_check(), + 'repositories': { + 'market_data': 'MarketDataRepository', + 'raw_trades': 'RawTradeRepository' + } + } + + # Get table counts + with self.market_data.get_session() as session: + # Market data count + result = session.execute(text("SELECT COUNT(*) FROM market_data")) + stats['candle_count'] = result.fetchone()[0] + + # Raw trades count + result = session.execute(text("SELECT COUNT(*) FROM raw_trades")) + stats['raw_trade_count'] = result.fetchone()[0] + + return stats + + except Exception as e: + if self.logger: + self.logger.error(f"Error getting database stats: {e}") + return {'healthy': False, 'error': str(e)} + + +# Singleton instance for global access +_db_operations_instance: Optional[DatabaseOperations] = None + + +def get_database_operations(logger: Optional[logging.Logger] = None) -> DatabaseOperations: + """ + Get the global database operations instance. 
+ + Args: + logger: Optional logger for database operations + + Returns: + DatabaseOperations instance + """ + global _db_operations_instance + + if _db_operations_instance is None: + _db_operations_instance = DatabaseOperations(logger) + + return _db_operations_instance + + +__all__ = [ + 'DatabaseOperationError', + 'MarketDataRepository', + 'RawTradeRepository', + 'DatabaseOperations', + 'get_database_operations' +] \ No newline at end of file diff --git a/database/schema_clean.sql b/database/schema_clean.sql index 109b343..9e35e81 100644 --- a/database/schema_clean.sql +++ b/database/schema_clean.sql @@ -27,7 +27,7 @@ CREATE TABLE market_data ( volume DECIMAL(18,8) NOT NULL, trades_count INTEGER, -- number of trades in this candle created_at TIMESTAMPTZ DEFAULT NOW(), - updated_at TIMESTAMPTZ DEFAULT NOW(), + -- updated_at TIMESTAMPTZ DEFAULT NOW(), CONSTRAINT unique_market_data UNIQUE(exchange, symbol, timeframe, timestamp) ); diff --git a/docs/architecture/architecture.md b/docs/architecture/architecture.md index d7cc353..962c9ca 100644 --- a/docs/architecture/architecture.md +++ b/docs/architecture/architecture.md @@ -194,6 +194,70 @@ def calculate_performance_metrics(portfolio_values: List[float]) -> dict: 6. **Virtual Trading**: Simulation-first approach with fee modeling 7. **Simplified Architecture**: Monolithic design with clear component boundaries for future scaling +## Repository Pattern for Database Operations + +### Database Abstraction Layer +The system uses the **Repository Pattern** to abstract database operations from business logic, providing a clean, maintainable, and testable interface for all data access. 
+ +```python +# Centralized database operations +from database.operations import get_database_operations + +class DataCollector: + def __init__(self): + # Use repository pattern instead of direct SQL + self.db = get_database_operations() + + def store_candle(self, candle: OHLCVCandle): + """Store candle using repository pattern""" + success = self.db.market_data.upsert_candle(candle, force_update=False) + + def store_raw_trade(self, data_point: MarketDataPoint): + """Store raw trade data using repository pattern""" + success = self.db.raw_trades.insert_market_data_point(data_point) +``` + +### Repository Structure +```python +# Clean API for database operations +class DatabaseOperations: + def __init__(self): + self.market_data = MarketDataRepository() # Candle operations + self.raw_trades = RawTradeRepository() # Raw data operations + + def health_check(self) -> bool: + """Check database connection health""" + + def get_stats(self) -> dict: + """Get database statistics and metrics""" + +class MarketDataRepository: + def upsert_candle(self, candle: OHLCVCandle, force_update: bool = False) -> bool: + """Store or update candle with duplicate handling""" + + def get_candles(self, symbol: str, timeframe: str, start: datetime, end: datetime) -> List[dict]: + """Retrieve historical candle data""" + + def get_latest_candle(self, symbol: str, timeframe: str) -> Optional[dict]: + """Get most recent candle for symbol/timeframe""" + +class RawTradeRepository: + def insert_market_data_point(self, data_point: MarketDataPoint) -> bool: + """Store raw WebSocket data""" + + def get_raw_trades(self, symbol: str, data_type: str, start: datetime, end: datetime) -> List[dict]: + """Retrieve raw trade data for analysis""" +``` + +### Benefits of Repository Pattern +- **No Raw SQL**: Business logic never contains direct SQL queries +- **Centralized Operations**: All database interactions go through well-defined APIs +- **Easy Testing**: Repository methods can be easily mocked for unit 
tests +- **Database Agnostic**: Can change database implementations without affecting business logic +- **Automatic Transaction Management**: Sessions, commits, and rollbacks handled automatically +- **Consistent Error Handling**: Custom exceptions with proper context +- **Type Safety**: Full type hints for better IDE support and error detection + ## Database Architecture ### Core Tables diff --git a/docs/components/README.md b/docs/components/README.md index 6c004fe..a0971da 100644 --- a/docs/components/README.md +++ b/docs/components/README.md @@ -17,6 +17,18 @@ This section contains detailed technical documentation for all system components - Integration examples and patterns - Comprehensive troubleshooting guide +### Database Operations + +- **[Database Operations](database_operations.md)** - *Repository pattern for clean database interactions* + - **Repository Pattern** implementation for data access abstraction + - **MarketDataRepository** for candle/OHLCV operations + - **RawTradeRepository** for WebSocket data storage + - Automatic transaction management and session cleanup + - Configurable duplicate handling with force update options + - Custom error handling with DatabaseOperationError + - Database health monitoring and performance statistics + - Migration guide from direct SQL to repository pattern + ### Logging & Monitoring - **[Enhanced Logging System](logging.md)** - *Unified logging framework* diff --git a/docs/components/data_collectors.md b/docs/components/data_collectors.md index 1611ca1..c28931e 100644 --- a/docs/components/data_collectors.md +++ b/docs/components/data_collectors.md @@ -31,6 +31,17 @@ The Data Collector System provides a robust, scalable framework for collecting r - **Logging Integration**: Enhanced logging with configurable verbosity - **Multi-Timeframe Support**: Sub-second to daily candle aggregation (1s, 5s, 10s, 15s, 30s, 1m, 5m, 15m, 1h, 4h, 1d) +### πŸ›’οΈ **Database Integration** +- **Repository Pattern**: All database 
operations use the centralized `database/operations.py` module +- **No Raw SQL**: Clean API through `MarketDataRepository` and `RawTradeRepository` classes +- **Automatic Transaction Management**: Sessions, commits, and rollbacks handled automatically +- **Configurable Duplicate Handling**: `force_update_candles` parameter controls duplicate behavior +- **Real-time Storage**: Completed candles automatically saved to `market_data` table +- **Raw Data Storage**: Optional raw WebSocket data storage via `RawTradeRepository` +- **Custom Error Handling**: Proper exception handling with `DatabaseOperationError` +- **Health Monitoring**: Built-in database health checks and statistics +- **Connection Pooling**: Efficient database connection management through repositories + ## Architecture ``` @@ -233,26 +244,26 @@ The `get_status()` method returns comprehensive status information: { 'exchange': 'okx', 'status': 'running', # Current status - 'should_be_running': True, # Desired state - 'symbols': ['BTC-USDT', 'ETH-USDT'], # Configured symbols - 'data_types': ['ticker'], # Data types being collected - 'auto_restart': True, # Auto-restart enabled + 'should_be_running': True, # Desired state + 'symbols': ['BTC-USDT', 'ETH-USDT'], # Configured symbols + 'data_types': ['ticker'], # Data types being collected + 'auto_restart': True, # Auto-restart enabled 'health': { - 'time_since_heartbeat': 5.2, # Seconds since last heartbeat - 'time_since_data': 2.1, # Seconds since last data - 'max_silence_duration': 300.0 # Max allowed silence + 'time_since_heartbeat': 5.2, # Seconds since last heartbeat + 'time_since_data': 2.1, # Seconds since last data + 'max_silence_duration': 300.0 # Max allowed silence }, 'statistics': { - 'messages_received': 1250, # Total messages received - 'messages_processed': 1248, # Successfully processed - 'errors': 2, # Error count - 'restarts': 1, # Restart count - 'uptime_seconds': 3600.5, # Current uptime - 'reconnect_attempts': 0, # Current reconnect 
attempts - 'last_message_time': '2023-...', # ISO timestamp - 'connection_uptime': '2023-...', # Connection start time - 'last_error': 'Connection failed', # Last error message - 'last_restart_time': '2023-...' # Last restart time + 'messages_received': 1250, # Total messages received + 'messages_processed': 1248, # Successfully processed + 'errors': 2, # Error count + 'restarts': 1, # Restart count + 'uptime_seconds': 3600.5, # Current uptime + 'reconnect_attempts': 0, # Current reconnect attempts + 'last_message_time': '2023-...', # ISO timestamp + 'connection_uptime': '2023-...', # Connection start time + 'last_error': 'Connection failed', # Last error message + 'last_restart_time': '2023-...' # Last restart time } } ``` @@ -263,13 +274,13 @@ The `get_health_status()` method provides detailed health information: ```python { - 'is_healthy': True, # Overall health status - 'issues': [], # List of current issues - 'status': 'running', # Current collector status - 'last_heartbeat': '2023-...', # Last heartbeat timestamp - 'last_data_received': '2023-...', # Last data timestamp - 'should_be_running': True, # Expected state - 'is_running': True # Actual running state + 'is_healthy': True, # Overall health status + 'issues': [], # List of current issues + 'status': 'running', # Current collector status + 'last_heartbeat': '2023-...', # Last heartbeat timestamp + 'last_data_received': '2023-...', # Last data timestamp + 'should_be_running': True, # Expected state + 'is_running': True # Actual running state } ``` diff --git a/docs/components/database_operations.md b/docs/components/database_operations.md new file mode 100644 index 0000000..198be91 --- /dev/null +++ b/docs/components/database_operations.md @@ -0,0 +1,437 @@ +# Database Operations Documentation + +## Overview + +The Database Operations module (`database/operations.py`) provides a clean, centralized interface for all database interactions using the **Repository Pattern**. 
This approach abstracts SQL complexity from business logic, ensuring maintainable, testable, and consistent database operations across the entire application. + +## Key Benefits + +### πŸ—οΈ **Clean Architecture** +- **Repository Pattern**: Separates data access logic from business logic +- **Centralized Operations**: All database interactions go through well-defined APIs +- **No Raw SQL**: Business logic never contains direct SQL queries +- **Consistent Interface**: Standardized methods across all database operations + +### πŸ›‘οΈ **Reliability & Safety** +- **Automatic Transaction Management**: Sessions and commits handled automatically +- **Error Handling**: Custom exceptions with proper context +- **Connection Pooling**: Efficient database connection management +- **Session Cleanup**: Automatic session management and cleanup + +### πŸ”§ **Maintainability** +- **Easy Testing**: Repository methods can be easily mocked for testing +- **Database Agnostic**: Can change database implementations without affecting business logic +- **Type Safety**: Full type hints for better IDE support and error detection +- **Logging Integration**: Built-in logging for monitoring and debugging + +## Architecture + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ DatabaseOperations β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ Health Check & Stats β”‚ β”‚ +β”‚ β”‚ β€’ Connection health monitoring β”‚ β”‚ +β”‚ β”‚ β€’ Database statistics β”‚ β”‚ +β”‚ β”‚ β€’ Performance metrics β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β”‚ β”‚ β”‚ +β”‚ 
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚MarketDataRepo β”‚ β”‚RawTradeRepo β”‚ β”‚ Future β”‚ β”‚ +β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ Repositories β”‚ β”‚ +β”‚ β”‚ β€’ upsert_candle β”‚ β”‚ β€’ insert_data β”‚ β”‚ β€’ OrderBook β”‚ β”‚ +β”‚ β”‚ β€’ get_candles β”‚ β”‚ β€’ get_trades β”‚ β”‚ β€’ UserTrades β”‚ β”‚ +β”‚ β”‚ β€’ get_latest β”‚ β”‚ β€’ raw_websocket β”‚ β”‚ β€’ Positions β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ BaseRepository β”‚ + β”‚ β”‚ + β”‚ β€’ Session Mgmt β”‚ + β”‚ β€’ Error Logging β”‚ + β”‚ β€’ DB Connection β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +## Quick Start + +### Basic Usage + +```python +from database.operations import get_database_operations +from data.common.data_types import OHLCVCandle +from datetime import datetime, timezone + +# Get the database operations instance (singleton) +db = get_database_operations() + +# Check database health +if not db.health_check(): + print("Database connection issue!") + return + +# Store a candle +candle = OHLCVCandle( + exchange="okx", + symbol="BTC-USDT", + timeframe="5s", + open=50000.0, + high=50100.0, + low=49900.0, + close=50050.0, + volume=1.5, + trade_count=25, + start_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc) +) + +# Store candle (with duplicate handling) +success = db.market_data.upsert_candle(candle, force_update=False) +if success: + print("Candle stored 
successfully!") +``` + +### With Data Collectors + +```python +import asyncio +from data.exchanges.okx import OKXCollector +from data.base_collector import DataType +from database.operations import get_database_operations + +async def main(): + # Initialize database operations + db = get_database_operations() + + # The collector automatically uses the database operations module + collector = OKXCollector( + symbols=['BTC-USDT'], + data_types=[DataType.TRADE], + store_raw_data=True, # Stores raw WebSocket data + force_update_candles=False # Ignore duplicate candles + ) + + await collector.start() + await asyncio.sleep(60) # Collect for 1 minute + await collector.stop() + + # Check statistics + stats = db.get_stats() + print(f"Total candles: {stats['candle_count']}") + print(f"Total raw trades: {stats['raw_trade_count']}") + +asyncio.run(main()) +``` + +## API Reference + +### DatabaseOperations + +Main entry point for all database operations. + +#### Methods + +##### `health_check() -> bool` +Test database connection health. + +```python +db = get_database_operations() +if db.health_check(): + print("βœ… Database is healthy") +else: + print("❌ Database connection issues") +``` + +##### `get_stats() -> Dict[str, Any]` +Get comprehensive database statistics. + +```python +stats = db.get_stats() +print(f"Candles: {stats['candle_count']:,}") +print(f"Raw trades: {stats['raw_trade_count']:,}") +print(f"Health: {stats['healthy']}") +``` + +### MarketDataRepository + +Repository for `market_data` table operations (candles/OHLCV data). + +#### Methods + +##### `upsert_candle(candle: OHLCVCandle, force_update: bool = False) -> bool` + +Store or update candle data with configurable duplicate handling. 
+ +**Parameters:** +- `candle`: OHLCVCandle object to store +- `force_update`: If True, overwrites existing data; if False, ignores duplicates + +**Returns:** True if successful, False otherwise + +**Duplicate Handling:** +- `force_update=False`: Uses `ON CONFLICT DO NOTHING` (preserves existing candles) +- `force_update=True`: Uses `ON CONFLICT DO UPDATE SET` (overwrites existing candles) + +```python +# Store new candle, ignore if duplicate exists +db.market_data.upsert_candle(candle, force_update=False) + +# Store candle, overwrite if duplicate exists +db.market_data.upsert_candle(candle, force_update=True) +``` + +##### `get_candles(symbol: str, timeframe: str, start_time: datetime, end_time: datetime, exchange: str = "okx") -> List[Dict[str, Any]]` + +Retrieve historical candle data. + +```python +from datetime import datetime, timezone + +candles = db.market_data.get_candles( + symbol="BTC-USDT", + timeframe="5s", + start_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2024, 1, 1, 13, 0, 0, tzinfo=timezone.utc), + exchange="okx" +) + +for candle in candles: + print(f"{candle['timestamp']}: O={candle['open']} H={candle['high']} L={candle['low']} C={candle['close']}") +``` + +##### `get_latest_candle(symbol: str, timeframe: str, exchange: str = "okx") -> Optional[Dict[str, Any]]` + +Get the most recent candle for a symbol/timeframe combination. + +```python +latest = db.market_data.get_latest_candle("BTC-USDT", "5s") +if latest: + print(f"Latest 5s candle: {latest['close']} at {latest['timestamp']}") +else: + print("No candles found") +``` + +### RawTradeRepository + +Repository for `raw_trades` table operations (raw WebSocket data). + +#### Methods + +##### `insert_market_data_point(data_point: MarketDataPoint) -> bool` + +Store raw market data from WebSocket streams. 
+ +```python +from data.base_collector import MarketDataPoint, DataType +from datetime import datetime, timezone + +data_point = MarketDataPoint( + exchange="okx", + symbol="BTC-USDT", + timestamp=datetime.now(timezone.utc), + data_type=DataType.TRADE, + data={"price": 50000, "size": 0.1, "side": "buy"} +) + +success = db.raw_trades.insert_market_data_point(data_point) +``` + +##### `insert_raw_websocket_data(exchange: str, symbol: str, data_type: str, raw_data: Dict[str, Any], timestamp: Optional[datetime] = None) -> bool` + +Store raw WebSocket data for debugging purposes. + +```python +db.raw_trades.insert_raw_websocket_data( + exchange="okx", + symbol="BTC-USDT", + data_type="raw_trade", + raw_data={"instId": "BTC-USDT", "px": "50000", "sz": "0.1"}, + timestamp=datetime.now(timezone.utc) +) +``` + +##### `get_raw_trades(symbol: str, data_type: str, start_time: datetime, end_time: datetime, exchange: str = "okx", limit: Optional[int] = None) -> List[Dict[str, Any]]` + +Retrieve raw trade data for analysis. + +```python +trades = db.raw_trades.get_raw_trades( + symbol="BTC-USDT", + data_type="trade", + start_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2024, 1, 1, 13, 0, 0, tzinfo=timezone.utc), + limit=1000 +) +``` + +## Error Handling + +The database operations module includes comprehensive error handling with custom exceptions. + +### DatabaseOperationError + +Custom exception for database operation failures. + +```python +from database.operations import DatabaseOperationError + +try: + db.market_data.upsert_candle(candle) +except DatabaseOperationError as e: + logger.error(f"Database operation failed: {e}") + # Handle the error appropriately +``` + +### Best Practices + +1. **Always Handle Exceptions**: Wrap database operations in try-catch blocks +2. **Check Health First**: Use `health_check()` before critical operations +3. **Monitor Performance**: Use `get_stats()` to monitor database growth +4. 
**Use Appropriate Repositories**: Use `market_data` for candles, `raw_trades` for raw data +5. **Handle Duplicates Appropriately**: Choose the right `force_update` setting + +## Configuration + +### Force Update Behavior + +The `force_update_candles` parameter in collectors controls duplicate handling: + +```python +# In OKX collector configuration +collector = OKXCollector( + symbols=['BTC-USDT'], + force_update_candles=False # Default: ignore duplicates +) + +# Or enable force updates +collector = OKXCollector( + symbols=['BTC-USDT'], + force_update_candles=True # Overwrite existing candles +) +``` + +### Logging Integration + +Database operations automatically integrate with the application's logging system: + +```python +import logging +from database.operations import get_database_operations + +logger = logging.getLogger(__name__) +db = get_database_operations(logger) + +# All database operations will now log through your logger +db.market_data.upsert_candle(candle) # Logs: "Stored candle: BTC-USDT 5s at ..." +``` + +## Migration from Direct SQL + +If you have existing code using direct SQL, here's how to migrate: + +### Before (Direct SQL - ❌ Don't do this) + +```python +# OLD WAY - direct SQL queries +from database.connection import get_db_manager +from sqlalchemy import text + +db_manager = get_db_manager() +with db_manager.get_session() as session: + session.execute(text(""" + INSERT INTO market_data (exchange, symbol, timeframe, ...) + VALUES (:exchange, :symbol, :timeframe, ...) + """), {...}) + session.commit() +``` + +### After (Repository Pattern - βœ… Correct way) + +```python +# NEW WAY - using repository pattern +from database.operations import get_database_operations + +db = get_database_operations() +success = db.market_data.upsert_candle(candle) +``` + +## Performance Considerations + +### Connection Pooling + +The database operations module automatically manages connection pooling through the underlying `DatabaseManager`. 
+ +### Batch Operations + +For high-throughput scenarios, consider batching operations: + +```python +# Store multiple candles efficiently +candles = [candle1, candle2, candle3, ...] + +for candle in candles: + db.market_data.upsert_candle(candle) +``` + +### Monitoring + +Monitor database performance using the built-in statistics: + +```python +import time + +# Monitor database load +while True: + stats = db.get_stats() + print(f"Candles: {stats['candle_count']:,}, Health: {stats['healthy']}") + time.sleep(30) +``` + +## Troubleshooting + +### Common Issues + +#### 1. Connection Errors +```python +if not db.health_check(): + logger.error("Database connection failed - check connection settings") +``` + +#### 2. Duplicate Key Errors +```python +# Use force_update=False to ignore duplicates +db.market_data.upsert_candle(candle, force_update=False) +``` + +#### 3. Transaction Errors +The repository automatically handles session management, but if you encounter issues: +```python +try: + db.market_data.upsert_candle(candle) +except DatabaseOperationError as e: + logger.error(f"Transaction failed: {e}") +``` + +### Debug Mode + +Enable database query logging for debugging: + +```python +# Set environment variable +import os +os.environ['DEBUG'] = 'true' + +# This will log all SQL queries +db = get_database_operations() +``` + +## Related Documentation + +- **[Database Connection](../architecture/database.md)** - Connection pooling and configuration +- **[Data Collectors](data_collectors.md)** - How collectors use database operations +- **[Architecture Overview](../architecture/architecture.md)** - System design patterns + +--- + +*This documentation covers the repository pattern implementation in `database/operations.py`. 
For database schema details, see the [Architecture Documentation](../architecture/).* \ No newline at end of file diff --git a/example_complete_series_aggregation.py b/example_complete_series_aggregation.py new file mode 100644 index 0000000..e5b170d --- /dev/null +++ b/example_complete_series_aggregation.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python3 +""" +Example: Complete Time Series Aggregation + +This example shows how to modify the aggregation system to emit candles +for every time period, even when there are no trades. +""" + +import asyncio +from datetime import datetime, timezone, timedelta +from decimal import Decimal +from typing import Dict, List, Optional + +from data.common.data_types import StandardizedTrade, OHLCVCandle, CandleProcessingConfig +from data.common.aggregation import RealTimeCandleProcessor + + +class CompleteSeriesProcessor(RealTimeCandleProcessor): + """ + Extended processor that emits candles for every time period, + filling gaps with previous close prices when no trades occur. 
+ """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.last_prices = {} # Track last known price for each timeframe + self.timers = {} # Timer tasks for each timeframe + + async def start_time_based_emission(self): + """Start timers to emit candles on time boundaries regardless of trades.""" + for timeframe in self.config.timeframes: + self.timers[timeframe] = asyncio.create_task( + self._time_based_candle_emitter(timeframe) + ) + + async def stop_time_based_emission(self): + """Stop all timers.""" + for task in self.timers.values(): + task.cancel() + self.timers.clear() + + async def _time_based_candle_emitter(self, timeframe: str): + """Emit candles on time boundaries for a specific timeframe.""" + try: + while True: + # Calculate next boundary + now = datetime.now(timezone.utc) + next_boundary = self._get_next_time_boundary(now, timeframe) + + # Wait until next boundary + wait_seconds = (next_boundary - now).total_seconds() + if wait_seconds > 0: + await asyncio.sleep(wait_seconds) + + # Check if we have an active bucket with trades + current_bucket = self.current_buckets.get(timeframe) + + if current_bucket is None or current_bucket.trade_count == 0: + # No trades during this period - create empty candle + await self._emit_empty_candle(timeframe, next_boundary) + # If there are trades, they will be handled by normal trade processing + + except asyncio.CancelledError: + pass # Timer was cancelled + + async def _emit_empty_candle(self, timeframe: str, end_time: datetime): + """Emit an empty candle when no trades occurred during the period.""" + try: + # Calculate start time + start_time = self._get_bucket_start_time(end_time - timedelta(seconds=1), timeframe) + + # Use last known price or default + last_price = self.last_prices.get(timeframe, Decimal('0')) + + # Create empty candle with last known price as OHLC + empty_candle = OHLCVCandle( + symbol=self.symbol, + timeframe=timeframe, + start_time=start_time, + 
end_time=end_time,
+                open=last_price,
+                high=last_price,
+                low=last_price,
+                close=last_price,
+                volume=Decimal('0'),
+                trade_count=0,
+                exchange=self.exchange,
+                is_complete=True,
+                first_trade_time=None,
+                last_trade_time=None
+            )
+
+            # Emit the empty candle
+            self._emit_candle(empty_candle)
+
+            if self.logger:
+                self.logger.info(
+                    f"β­• {timeframe.upper()} EMPTY CANDLE at {end_time.strftime('%H:%M:%S')}: "
+                    f"No trades, using last price ${last_price}"
+                )
+
+        except Exception as e:
+            if self.logger:
+                self.logger.error(f"Error emitting empty candle: {e}")
+
+    def _emit_candle(self, candle: OHLCVCandle) -> None:
+        """Override to track last prices."""
+        # Update last known price
+        if candle.close > 0:
+            self.last_prices[candle.timeframe] = candle.close
+
+        # Call parent implementation
+        super()._emit_candle(candle)
+
+    def _get_next_time_boundary(self, current_time: datetime, timeframe: str) -> datetime:
+        """Calculate the next time boundary for a timeframe."""
+        if timeframe == '1s':
+            # Next second boundary
+            return (current_time + timedelta(seconds=1)).replace(microsecond=0)
+        elif timeframe == '5s':
+            # Next 5-second boundary
+            next_sec = (current_time.second // 5 + 1) * 5
+            if next_sec >= 60:
+                # Roll over via timedelta: replace(minute=minute + 1) raised
+                # ValueError when minute == 59 (and could not carry into hours/days)
+                return (current_time + timedelta(minutes=1)).replace(second=0, microsecond=0)
+            return current_time.replace(second=next_sec, microsecond=0)
+        elif timeframe == '10s':
+            # Next 10-second boundary
+            next_sec = (current_time.second // 10 + 1) * 10
+            if next_sec >= 60:
+                # Roll over via timedelta to avoid ValueError at minute 59
+                return (current_time + timedelta(minutes=1)).replace(second=0, microsecond=0)
+            return current_time.replace(second=next_sec, microsecond=0)
+        elif timeframe == '15s':
+            # Next 15-second boundary
+            next_sec = (current_time.second // 15 + 1) * 15
+            if next_sec >= 60:
+                # Roll over via timedelta to avoid ValueError at minute 59
+                return (current_time + timedelta(minutes=1)).replace(second=0, microsecond=0)
+            return current_time.replace(second=next_sec, microsecond=0)
+        elif timeframe == '30s':
+            # Next 30-second boundary
+            next_sec = 
(current_time.second // 30 + 1) * 30
+            if next_sec >= 60:
+                # Roll over via timedelta: replace(minute=minute + 1) raised
+                # ValueError when minute == 59
+                return (current_time + timedelta(minutes=1)).replace(second=0, microsecond=0)
+            return current_time.replace(second=next_sec, microsecond=0)
+        elif timeframe == '1m':
+            # Next minute boundary
+            return (current_time + timedelta(minutes=1)).replace(second=0, microsecond=0)
+        elif timeframe == '5m':
+            # Next 5-minute boundary
+            next_min = (current_time.minute // 5 + 1) * 5
+            if next_min >= 60:
+                # Roll over via timedelta: replace(hour=hour + 1) raised
+                # ValueError when hour == 23
+                return (current_time + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
+            return current_time.replace(minute=next_min, second=0, microsecond=0)
+        else:
+            # For other timeframes, add appropriate logic
+            return current_time + timedelta(minutes=1)
+
+
+# Example usage
+async def demo_complete_series():
+    """Demonstrate complete time series aggregation."""
+    print("πŸ• Complete Time Series Aggregation Demo")
+    print("This will emit candles even when no trades occur\n")
+
+    # Create processor with complete series capability
+    config = CandleProcessingConfig(timeframes=['1s', '5s', '30s'])
+    processor = CompleteSeriesProcessor(
+        symbol="BTC-USDT",
+        exchange="demo",
+        config=config,
+        component_name="complete_series_demo"
+    )
+
+    # Set initial price
+    processor.last_prices = {'1s': Decimal('50000'), '5s': Decimal('50000'), '30s': Decimal('50000')}
+
+    # Add callback to see emitted candles
+    def on_candle(candle: OHLCVCandle):
+        candle_type = "TRADE" if candle.trade_count > 0 else "EMPTY"
+        print(f"πŸ“Š {candle_type} {candle.timeframe.upper()} at {candle.end_time.strftime('%H:%M:%S')}: "
+              f"${candle.close} (T={candle.trade_count})")
+
+    processor.add_candle_callback(on_candle)
+
+    # Start time-based emission
+    await processor.start_time_based_emission()
+
+    try:
+        # Simulate some trades with gaps
+        print("Simulating trades with gaps...\n")
+
+        base_time = datetime.now(timezone.utc)
+
+        # Trade at T+0
+        trade1 = StandardizedTrade(
+            symbol="BTC-USDT",
+            trade_id="1",
+            price=Decimal('50100'),
+
size=Decimal('0.1'), + side="buy", + timestamp=base_time, + exchange="demo" + ) + processor.process_trade(trade1) + + # Wait 3 seconds (should see empty candles for missing periods) + await asyncio.sleep(3) + + # Trade at T+3 + trade2 = StandardizedTrade( + symbol="BTC-USDT", + trade_id="2", + price=Decimal('50200'), + size=Decimal('0.2'), + side="sell", + timestamp=base_time + timedelta(seconds=3), + exchange="demo" + ) + processor.process_trade(trade2) + + # Wait more to see more empty candles + await asyncio.sleep(5) + + print("\nβœ… Demo completed - You can see both trade candles and empty candles") + + finally: + await processor.stop_time_based_emission() + + +if __name__ == "__main__": + print("Complete Time Series Aggregation Example") + print("=" * 50) + print("This shows how to emit candles even when no trades occur.") + print("Uncomment the line below to run the demo:\n") + + # Uncomment to run the demo: + # asyncio.run(demo_complete_series()) \ No newline at end of file diff --git a/scripts/monitor_clean.py b/scripts/monitor_clean.py index fe10159..bd69f4c 100644 --- a/scripts/monitor_clean.py +++ b/scripts/monitor_clean.py @@ -76,32 +76,49 @@ class CleanMonitor: MarketData.created_at >= cutoff ).scalar() - # Timeframe breakdown + # Timeframe breakdown with improved sorting timeframes = session.query( MarketData.timeframe, func.count(MarketData.id) ).group_by(MarketData.timeframe).all() - # Latest prices + # Latest prices - prioritize shorter timeframes for more recent data latest_prices = {} for symbol in ['BTC-USDT', 'ETH-USDT']: - latest = session.query(MarketData).filter( - MarketData.symbol == symbol, - MarketData.timeframe == '1m' - ).order_by(desc(MarketData.created_at)).first() + # Try to get latest price from shortest available timeframe + price_timeframes = ['5s', '1s', '1m', '5m', '15m', '1h'] # Prefer shorter timeframes + latest = None + + for tf in price_timeframes: + latest = session.query(MarketData).filter( + MarketData.symbol == symbol, 
+ MarketData.timeframe == tf + ).order_by(desc(MarketData.created_at)).first() + + if latest: + break # Use first available timeframe if latest: latest_prices[symbol] = { 'price': float(latest.close), - 'time': latest.timestamp + 'time': latest.timestamp, + 'timeframe': latest.timeframe } + # Second-based activity monitoring (last 1 minute for high-frequency data) + recent_cutoff_1min = datetime.now(timezone.utc) - timedelta(minutes=1) + recent_second_candles = session.query(func.count(MarketData.id)).filter( + MarketData.created_at >= recent_cutoff_1min, + MarketData.timeframe.in_(['1s', '5s', '10s', '15s', '30s']) + ).scalar() + return { 'raw_count': raw_count, 'candle_count': candle_count, 'raw_timespan': (raw_newest - raw_oldest).total_seconds() / 3600 if raw_oldest and raw_newest else 0, 'recent_raw': recent_raw, 'recent_candles': recent_candles, + 'recent_second_candles': recent_second_candles, 'timeframes': dict(timeframes), 'latest_prices': latest_prices } @@ -110,6 +127,25 @@ class CleanMonitor: self.logger.error(f"Error getting stats: {e}") return {} + def _sort_timeframes(self, timeframes: dict) -> dict: + """Sort timeframes logically: seconds -> minutes -> hours -> days.""" + def timeframe_sort_key(tf): + """Generate sort key for timeframe.""" + import re + match = re.match(r'^(\d+)([smhd])$', tf.lower()) + if not match: + return (999, 999) # Unknown formats last + + number = int(match.group(1)) + unit = match.group(2) + + # Unit priority: s=0, m=1, h=2, d=3 + unit_priority = {'s': 0, 'm': 1, 'h': 2, 'd': 3}.get(unit, 999) + return (unit_priority, number) + + sorted_items = sorted(timeframes.items(), key=lambda x: timeframe_sort_key(x[0])) + return dict(sorted_items) + def print_status(self): """Print clean status summary.""" stats = self.get_summary_stats() @@ -128,27 +164,53 @@ class CleanMonitor: print(f"πŸ“ˆ Raw Data: {raw_count:,} entries ({timespan:.1f} hours)") - # Candle breakdown + # Candle breakdown with improved sorting and formatting 
timeframes = stats.get('timeframes', {}) if timeframes: - tf_summary = ", ".join([f"{tf}:{count}" for tf, count in timeframes.items()]) - print(f"πŸ“Š Candles: {candle_count:,} total ({tf_summary})") + sorted_timeframes = self._sort_timeframes(timeframes) + + # Group by type for better display + second_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('s')} + minute_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('m')} + hour_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('h')} + day_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('d')} + + # Build display string + tf_parts = [] + if second_tfs: + tf_parts.append(" ".join([f"{tf}:{count}" for tf, count in second_tfs.items()])) + if minute_tfs: + tf_parts.append(" ".join([f"{tf}:{count}" for tf, count in minute_tfs.items()])) + if hour_tfs: + tf_parts.append(" ".join([f"{tf}:{count}" for tf, count in hour_tfs.items()])) + if day_tfs: + tf_parts.append(" ".join([f"{tf}:{count}" for tf, count in day_tfs.items()])) + + tf_summary = " | ".join(tf_parts) + print(f"πŸ“Š Candles: {candle_count:,} total") + print(f" {tf_summary}") else: print(f"πŸ“Š Candles: {candle_count:,} total") - # Recent activity + # Enhanced recent activity with second-based monitoring recent_raw = stats.get('recent_raw', 0) recent_candles = stats.get('recent_candles', 0) - print(f"πŸ• Recent (5m): {recent_raw:,} raw, {recent_candles} candles") + recent_second_candles = stats.get('recent_second_candles', 0) - # Latest prices + print(f"πŸ• Recent Activity:") + print(f" 5m: {recent_raw:,} raw trades, {recent_candles} total candles") + if recent_second_candles > 0: + print(f" 1m: {recent_second_candles} second-based candles (1s-30s)") + + # Latest prices with timeframe information latest_prices = stats.get('latest_prices', {}) if latest_prices: print("πŸ’° Latest Prices:") for symbol, data in latest_prices.items(): price = data['price'] time_str = data['time'].strftime('%H:%M:%S') - 
print(f" {symbol}: ${price:,.2f} at {time_str}") + timeframe = data.get('timeframe', '1m') + print(f" {symbol}: ${price:,.2f} at {time_str} ({timeframe})") print("="*50) diff --git a/scripts/production_clean.py b/scripts/production_clean.py index a7ca821..d6a4389 100644 --- a/scripts/production_clean.py +++ b/scripts/production_clean.py @@ -100,12 +100,14 @@ class ProductionManager: symbol = pair_config['symbol'] data_types = [DataType(dt) for dt in pair_config.get('data_types', ['trade'])] - self.logger.info(f"πŸ“ˆ Creating collector for {symbol} with data types: {[dt.value for dt in data_types]}") + # Get timeframes from config file for this trading pair + config_timeframes = pair_config.get('timeframes', ['1m', '5m']) - # Create custom candle processing config for 1m and 5m timeframes - # Note: 1s timeframes are not supported by the aggregation framework + self.logger.info(f"πŸ“ˆ Creating collector for {symbol} with timeframes: {config_timeframes}") + + # Create custom candle processing config using timeframes from config candle_config = CandleProcessingConfig( - timeframes=['1m', '5m'], + timeframes=config_timeframes, emit_incomplete_candles=False, # Only complete candles auto_save_candles=True ) @@ -142,10 +144,14 @@ class ProductionManager: self.collectors.append(collector) self.statistics['collectors_created'] += 1 - self.logger.info(f"βœ… Collector created for {symbol} with 1m/5m timeframes and error-only logging") + self.logger.info(f"βœ… Collector created for {symbol} with {'/'.join(config_timeframes)} timeframes") - self.logger.info(f"πŸŽ‰ All {len(self.collectors)} collectors created successfully with error-only logging") - self.logger.info(f"πŸ“Š Collectors configured with 1m and 5m aggregation timeframes") + self.logger.info(f"πŸŽ‰ All {len(self.collectors)} collectors created successfully") + # Get unique timeframes across all collectors for summary + all_timeframes = set() + for pair in enabled_pairs: + all_timeframes.update(pair.get('timeframes', 
['1m', '5m']))
+        self.logger.info(f"πŸ“Š Collectors configured with timeframes: {', '.join(sorted(all_timeframes))}")
 
             return True
 
         except Exception as e:
@@ -210,6 +216,20 @@ async def run_clean_production(duration_hours: Optional[float] = None):
     signal.signal(signal.SIGTERM, signal_handler)
 
     try:
+        # Read config to show actual timeframes in banner
+        config_path = "config/okx_config.json"
+        try:
+            with open(config_path, 'r') as f:
+                config = json.load(f)
+            # Get unique timeframes from all enabled trading pairs
+            all_timeframes = set()
+            for pair in config.get('trading_pairs', []):
+                if pair.get('enabled', True):
+                    all_timeframes.update(pair.get('timeframes', ['1m', '5m']))
+            timeframes_str = ', '.join(sorted(all_timeframes))
+        except Exception:
+            # Best-effort banner only; narrowed from bare `except:` so
+            # KeyboardInterrupt/SystemExit still propagate during startup
+            timeframes_str = "configured timeframes"
+
         # Header
         print("πŸš€ OKX PRODUCTION DATA COLLECTOR")
         print("="*50)
@@ -217,7 +237,7 @@
             print(f"⏱️ Duration: {duration_hours} hours")
         else:
             print(f"⏱️ Duration: Indefinite (until stopped)")
-        print(f"πŸ“Š Timeframes: 1m and 5m candles")
+        print(f"πŸ“Š Timeframes: {timeframes_str}")
         print(f"πŸ’Ύ Database: Raw trades + aggregated candles")
         print(f"πŸ“ Logs: logs/ directory")
         print("="*50)
diff --git a/tests/test_real_storage.py b/tests/test_real_storage.py
index ce4313c..7f16706 100644
--- a/tests/test_real_storage.py
+++ b/tests/test_real_storage.py
@@ -14,7 +14,7 @@ from datetime import datetime, timezone
 from data.exchanges.okx import OKXCollector
 from data.base_collector import DataType
-from database.connection import DatabaseConnection
+from database.operations import get_database_operations
 from utils.logger import get_logger
 
 # Global test state
@@ -36,12 +36,15 @@ signal.signal(signal.SIGTERM, signal_handler)
 async def check_database_connection():
     """Check if database connection is available."""
     try:
-        db_manager = DatabaseConnection()
-        # Test connection
-        with db_manager.get_session() as session:
-
session.execute("SELECT 1") - print("βœ… Database connection successful") - return True + db_operations = get_database_operations() + # Test connection using the new repository pattern + is_healthy = db_operations.health_check() + if is_healthy: + print("βœ… Database connection successful") + return True + else: + print("❌ Database health check failed") + return False except Exception as e: print(f"❌ Database connection failed: {e}") print(" Make sure your database is running and configured correctly") @@ -49,18 +52,22 @@ async def check_database_connection(): async def count_stored_data(): - """Count raw trades and candles in database.""" + """Count raw trades and candles in database using repository pattern.""" try: - db_manager = DatabaseConnection() - with db_manager.get_session() as session: - # Count raw trades - raw_count = session.execute("SELECT COUNT(*) FROM raw_trades WHERE exchange = 'okx'").scalar() - - # Count market data candles - candle_count = session.execute("SELECT COUNT(*) FROM market_data WHERE exchange = 'okx'").scalar() - - print(f"πŸ“Š Database counts: Raw trades: {raw_count}, Candles: {candle_count}") - return raw_count, candle_count + db_operations = get_database_operations() + + # Get database statistics using the new operations module + stats = db_operations.get_stats() + + if 'error' in stats: + print(f"❌ Error getting database stats: {stats['error']}") + return 0, 0 + + raw_count = stats.get('raw_trade_count', 0) + candle_count = stats.get('candle_count', 0) + + print(f"πŸ“Š Database counts: Raw trades: {raw_count}, Candles: {candle_count}") + return raw_count, candle_count except Exception as e: print(f"❌ Error counting database records: {e}") return 0, 0