#!/usr/bin/env python3
"""
Clean Database Monitor

Provides clean status updates for production data collection with detailed
logging to files.

Usage: python scripts/monitor_clean.py [--interval seconds]

Examples:
    # Check status once
    python scripts/monitor_clean.py

    # Monitor every 60 seconds
    python scripts/monitor_clean.py --interval 60
"""

import argparse
import asyncio
import os
import re
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path

# Add project root to path so the `database` / `utils` imports below resolve
# when this script is executed directly from the repository checkout.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

# Set environment for clean output before project modules configure logging.
os.environ['DEBUG'] = 'false'

from database.connection import DatabaseManager
from database.models import MarketData, RawTrade
from sqlalchemy import func, desc
from utils.logger import get_logger

# Pre-compiled "<number><unit>" timeframe pattern (e.g. "5s", "15m", "1h").
# Compiled once at module level instead of importing/compiling inside the
# per-key sort function.
_TIMEFRAME_RE = re.compile(r'^(\d+)([smhd])$')


class CleanMonitor:
    """Clean database monitor for production use."""

    def __init__(self):
        # Quiet logger: status goes to stdout, errors to the log file.
        self.logger = get_logger("clean_monitor", verbose=False)
        self.db_manager = None  # set by connect()

    def connect(self) -> bool:
        """Connect to database quietly.

        Returns:
            True on success, False if initialization raised (error printed).
        """
        try:
            self.db_manager = DatabaseManager()
            self.db_manager.initialize()
            return True
        except Exception as e:
            print(f"āŒ Database connection failed: {e}")
            return False

    def get_summary_stats(self) -> dict:
        """Get essential statistics for console display.

        Returns:
            Dict with keys: raw_count, candle_count, raw_timespan (hours),
            recent_raw, recent_candles, recent_second_candles, timeframes,
            latest_prices. Empty dict if any query fails (error is logged).
        """
        try:
            with self.db_manager.get_session() as session:
                # Raw data count
                raw_count = session.query(func.count(RawTrade.id)).scalar()

                # Candle count
                candle_count = session.query(func.count(MarketData.id)).scalar()

                # Time range for raw data
                raw_oldest = session.query(func.min(RawTrade.timestamp)).scalar()
                raw_newest = session.query(func.max(RawTrade.timestamp)).scalar()

                # Recent activity (last 5 minutes)
                cutoff = datetime.now(timezone.utc) - timedelta(minutes=5)
                recent_raw = session.query(func.count(RawTrade.id)).filter(
                    RawTrade.created_at >= cutoff
                ).scalar()
                recent_candles = session.query(func.count(MarketData.id)).filter(
                    MarketData.created_at >= cutoff
                ).scalar()

                # Timeframe breakdown (per-timeframe candle counts)
                timeframes = session.query(
                    MarketData.timeframe,
                    func.count(MarketData.id)
                ).group_by(MarketData.timeframe).all()

                # Latest prices - prioritize shorter timeframes for more
                # recent data.
                latest_prices = {}
                for symbol in ['BTC-USDT', 'ETH-USDT']:
                    # Try shortest available timeframe first.
                    price_timeframes = ['5s', '1s', '1m', '5m', '15m', '1h']
                    latest = None
                    for tf in price_timeframes:
                        latest = session.query(MarketData).filter(
                            MarketData.symbol == symbol,
                            MarketData.timeframe == tf
                        ).order_by(desc(MarketData.created_at)).first()
                        if latest:
                            break  # Use first available timeframe

                    if latest:
                        latest_prices[symbol] = {
                            'price': float(latest.close),
                            'time': latest.timestamp,
                            'timeframe': latest.timeframe
                        }

                # Second-based activity monitoring (last 1 minute, for
                # high-frequency candles only).
                recent_cutoff_1min = datetime.now(timezone.utc) - timedelta(minutes=1)
                recent_second_candles = session.query(func.count(MarketData.id)).filter(
                    MarketData.created_at >= recent_cutoff_1min,
                    MarketData.timeframe.in_(['1s', '5s', '10s', '15s', '30s'])
                ).scalar()

                return {
                    'raw_count': raw_count,
                    'candle_count': candle_count,
                    'raw_timespan': (
                        (raw_newest - raw_oldest).total_seconds() / 3600
                        if raw_oldest and raw_newest else 0
                    ),
                    'recent_raw': recent_raw,
                    'recent_candles': recent_candles,
                    'recent_second_candles': recent_second_candles,
                    'timeframes': dict(timeframes),
                    'latest_prices': latest_prices
                }
        except Exception as e:
            # Best-effort: callers treat an empty dict as "no stats".
            self.logger.error(f"Error getting stats: {e}")
            return {}

    def _sort_timeframes(self, timeframes: dict) -> dict:
        """Sort timeframes logically: seconds -> minutes -> hours -> days."""

        def timeframe_sort_key(tf):
            """Generate (unit_priority, magnitude) sort key for a timeframe."""
            match = _TIMEFRAME_RE.match(tf.lower())
            if not match:
                return (999, 999)  # Unknown formats last
            number = int(match.group(1))
            unit = match.group(2)
            # Unit priority: s=0, m=1, h=2, d=3
            unit_priority = {'s': 0, 'm': 1, 'h': 2, 'd': 3}.get(unit, 999)
            return (unit_priority, number)

        sorted_items = sorted(timeframes.items(),
                              key=lambda x: timeframe_sort_key(x[0]))
        return dict(sorted_items)

    def print_status(self):
        """Print clean status summary to stdout."""
        stats = self.get_summary_stats()
        if not stats:
            print("āŒ Unable to get database statistics")
            return

        print("\n" + "=" * 50)
        print(f"šŸ“Š DATA COLLECTION STATUS - {datetime.now().strftime('%H:%M:%S')}")
        print("=" * 50)

        # Main metrics
        raw_count = stats.get('raw_count', 0)
        candle_count = stats.get('candle_count', 0)
        timespan = stats.get('raw_timespan', 0)

        print(f"šŸ“ˆ Raw Data: {raw_count:,} entries ({timespan:.1f} hours)")

        # Candle breakdown with improved sorting and formatting
        timeframes = stats.get('timeframes', {})
        if timeframes:
            sorted_timeframes = self._sort_timeframes(timeframes)

            # Group by unit for better display
            second_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('s')}
            minute_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('m')}
            hour_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('h')}
            day_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('d')}

            # Build display string: groups separated by " | "
            tf_parts = []
            if second_tfs:
                tf_parts.append(" ".join([f"{tf}:{count}" for tf, count in second_tfs.items()]))
            if minute_tfs:
                tf_parts.append(" ".join([f"{tf}:{count}" for tf, count in minute_tfs.items()]))
            if hour_tfs:
                tf_parts.append(" ".join([f"{tf}:{count}" for tf, count in hour_tfs.items()]))
            if day_tfs:
                tf_parts.append(" ".join([f"{tf}:{count}" for tf, count in day_tfs.items()]))

            tf_summary = " | ".join(tf_parts)
            print(f"šŸ“Š Candles: {candle_count:,} total")
            print(f"   {tf_summary}")
        else:
            print(f"šŸ“Š Candles: {candle_count:,} total")

        # Enhanced recent activity with second-based monitoring
        recent_raw = stats.get('recent_raw', 0)
        recent_candles = stats.get('recent_candles', 0)
        recent_second_candles = stats.get('recent_second_candles', 0)

        print(f"šŸ• Recent Activity:")
        print(f"   5m: {recent_raw:,} raw trades, {recent_candles} total candles")
        if recent_second_candles > 0:
            print(f"   1m: {recent_second_candles} second-based candles (1s-30s)")

        # Latest prices with timeframe information
        latest_prices = stats.get('latest_prices', {})
        if latest_prices:
            print("šŸ’° Latest Prices:")
            for symbol, data in latest_prices.items():
                price = data['price']
                time_str = data['time'].strftime('%H:%M:%S')
                timeframe = data.get('timeframe', '1m')
                print(f"   {symbol}: ${price:,.2f} at {time_str} ({timeframe})")

        print("=" * 50)

    def disconnect(self):
        """Disconnect from database."""
        if self.db_manager:
            self.db_manager.close()


async def monitor_clean(interval: int = 0):
    """Run clean monitoring.

    Args:
        interval: Seconds between updates; <= 0 means a single status check.

    Returns:
        True on clean exit, False on connection failure or unexpected error.
    """
    monitor = CleanMonitor()
    try:
        if not monitor.connect():
            return False

        if interval <= 0:
            # Single check
            monitor.print_status()
            return True

        # Continuous monitoring
        print(f"šŸ“Š Monitoring every {interval} seconds (Ctrl+C to stop)")
        while True:
            monitor.print_status()
            print(f"\nā° Next update in {interval} seconds...\n")
            await asyncio.sleep(interval)

    except KeyboardInterrupt:
        print("\nšŸ‘‹ Monitoring stopped")
        return True
    except Exception as e:
        print(f"āŒ Monitor error: {e}")
        return False
    finally:
        monitor.disconnect()


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Clean Database Monitor",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Single status check
  python scripts/monitor_clean.py

  # Monitor every minute
  python scripts/monitor_clean.py --interval 60
        """
    )

    parser.add_argument(
        '--interval',
        type=int,
        default=0,
        help='Monitor interval in seconds (0 = single check, default: 0)'
    )

    args = parser.parse_args()

    try:
        success = asyncio.run(monitor_clean(args.interval))
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        print("\nšŸ‘‹ Exiting...")
        sys.exit(0)
    except Exception as e:
        print(f"āŒ Fatal error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()