# TCPDashboard/scripts/monitor_clean.py
#!/usr/bin/env python3
"""
Clean Database Monitor
Provides clean status updates for production data collection
with detailed logging to files.
Usage:
python scripts/monitor_clean.py [--interval seconds]
Examples:
# Check status once
python scripts/monitor_clean.py
# Monitor every 60 seconds
python scripts/monitor_clean.py --interval 60
"""
# Standard-library imports.
import asyncio
import argparse
import sys
from datetime import datetime
from pathlib import Path

# Add project root to path so `database.*` / `utils.*` resolve when the
# script is run directly (python scripts/monitor_clean.py).
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

# Set environment for clean output — done BEFORE the project imports,
# presumably because the project logger reads DEBUG at import time
# (NOTE(review): confirm against utils.logger).
import os
os.environ['DEBUG'] = 'false'

# Project-local imports (depend on the sys.path tweak above).
from database.connection import DatabaseManager
from database.models import MarketData, RawTrade
from sqlalchemy import func, desc
from utils.logger import get_logger
class CleanMonitor:
    """Clean database monitor for production use.

    Prints compact console summaries of collection progress — raw trade
    counts, candle counts broken down by timeframe, recent activity and
    latest prices — while routing detailed logging to files via the
    project logger.
    """

    def __init__(self):
        # verbose=False keeps console output clean; details go to log files.
        self.logger = get_logger("clean_monitor", verbose=False)
        self.db_manager = None  # set by connect()

    def connect(self) -> bool:
        """Connect to the database quietly; return True on success."""
        try:
            self.db_manager = DatabaseManager()
            self.db_manager.initialize()
            return True
        except Exception as e:
            print(f"❌ Database connection failed: {e}")
            return False

    def get_summary_stats(self) -> dict:
        """Collect essential statistics for console display.

        Returns:
            dict with keys raw_count, candle_count, raw_timespan (hours),
            recent_raw, recent_candles, recent_second_candles, timeframes,
            latest_prices — or an empty dict on any database error
            (logged, not raised).
        """
        try:
            with self.db_manager.get_session() as session:
                from datetime import timedelta, timezone

                # Totals.
                raw_count = session.query(func.count(RawTrade.id)).scalar()
                candle_count = session.query(func.count(MarketData.id)).scalar()

                # Overall time range covered by raw data.
                raw_oldest = session.query(func.min(RawTrade.timestamp)).scalar()
                raw_newest = session.query(func.max(RawTrade.timestamp)).scalar()

                # Recent activity (last 5 minutes, by insertion time).
                cutoff = datetime.now(timezone.utc) - timedelta(minutes=5)
                recent_raw = session.query(func.count(RawTrade.id)).filter(
                    RawTrade.created_at >= cutoff
                ).scalar()
                recent_candles = session.query(func.count(MarketData.id)).filter(
                    MarketData.created_at >= cutoff
                ).scalar()

                # Candle counts grouped by timeframe.
                timeframes = session.query(
                    MarketData.timeframe,
                    func.count(MarketData.id)
                ).group_by(MarketData.timeframe).all()

                # Latest price per symbol, probing timeframes in priority order.
                # NOTE(review): '5s' is probed before '1s' although the original
                # comment said "shortest first" — confirm the intended priority
                # (possibly 5s candles are more likely to be complete).
                price_timeframes = ['5s', '1s', '1m', '5m', '15m', '1h']
                latest_prices = {}
                for symbol in ['BTC-USDT', 'ETH-USDT']:
                    latest = None
                    for tf in price_timeframes:
                        latest = session.query(MarketData).filter(
                            MarketData.symbol == symbol,
                            MarketData.timeframe == tf
                        ).order_by(desc(MarketData.created_at)).first()
                        if latest:
                            break  # use the first timeframe that has data
                    if latest:
                        latest_prices[symbol] = {
                            'price': float(latest.close),
                            'time': latest.timestamp,
                            'timeframe': latest.timeframe
                        }

                # High-frequency (second-based) candle activity, last minute.
                recent_cutoff_1min = datetime.now(timezone.utc) - timedelta(minutes=1)
                recent_second_candles = session.query(func.count(MarketData.id)).filter(
                    MarketData.created_at >= recent_cutoff_1min,
                    MarketData.timeframe.in_(['1s', '5s', '10s', '15s', '30s'])
                ).scalar()

                return {
                    'raw_count': raw_count,
                    'candle_count': candle_count,
                    # Span of raw data in hours; 0 when no raw rows exist.
                    'raw_timespan': (
                        (raw_newest - raw_oldest).total_seconds() / 3600
                        if raw_oldest and raw_newest else 0
                    ),
                    'recent_raw': recent_raw,
                    'recent_candles': recent_candles,
                    'recent_second_candles': recent_second_candles,
                    'timeframes': dict(timeframes),
                    'latest_prices': latest_prices
                }
        except Exception as e:
            self.logger.error(f"Error getting stats: {e}")
            return {}

    def _sort_timeframes(self, timeframes: dict) -> dict:
        """Sort timeframes logically: seconds -> minutes -> hours -> days.

        Unknown formats sort last, in their incidental order.
        """
        import re
        # FIX: the original imported `re` and re-parsed the pattern inside
        # the key function, i.e. on every single key evaluation. Compile
        # once up front instead.
        pattern = re.compile(r'^(\d+)([smhd])$')
        unit_priority = {'s': 0, 'm': 1, 'h': 2, 'd': 3}

        def timeframe_sort_key(tf):
            """(unit rank, magnitude) sort key; (999, 999) for unknowns."""
            match = pattern.match(tf.lower())
            if not match:
                return (999, 999)
            return (unit_priority.get(match.group(2), 999), int(match.group(1)))

        return dict(sorted(timeframes.items(), key=lambda kv: timeframe_sort_key(kv[0])))

    def print_status(self):
        """Print a clean status summary to the console."""
        stats = self.get_summary_stats()
        if not stats:
            print("❌ Unable to get database statistics")
            return

        print("\n" + "=" * 50)
        print(f"📊 DATA COLLECTION STATUS - {datetime.now().strftime('%H:%M:%S')}")
        print("=" * 50)

        # Main metrics.
        raw_count = stats.get('raw_count', 0)
        candle_count = stats.get('candle_count', 0)
        timespan = stats.get('raw_timespan', 0)
        print(f"📈 Raw Data: {raw_count:,} entries ({timespan:.1f} hours)")

        # Candle breakdown: one group per unit so the line reads s | m | h | d.
        timeframes = stats.get('timeframes', {})
        print(f"📊 Candles: {candle_count:,} total")
        if timeframes:
            sorted_timeframes = self._sort_timeframes(timeframes)
            tf_parts = []
            for unit in ('s', 'm', 'h', 'd'):
                group = [
                    f"{tf}:{count}"
                    for tf, count in sorted_timeframes.items()
                    if tf.endswith(unit)
                ]
                if group:
                    tf_parts.append(" ".join(group))
            tf_summary = " | ".join(tf_parts)
            print(f" {tf_summary}")

        # Recent activity, including second-based (high-frequency) candles.
        recent_raw = stats.get('recent_raw', 0)
        recent_candles = stats.get('recent_candles', 0)
        recent_second_candles = stats.get('recent_second_candles', 0)
        print(f"🕐 Recent Activity:")
        print(f" 5m: {recent_raw:,} raw trades, {recent_candles} total candles")
        if recent_second_candles > 0:
            print(f" 1m: {recent_second_candles} second-based candles (1s-30s)")

        # Latest prices with the timeframe each price came from.
        latest_prices = stats.get('latest_prices', {})
        if latest_prices:
            print("💰 Latest Prices:")
            for symbol, data in latest_prices.items():
                price = data['price']
                time_str = data['time'].strftime('%H:%M:%S')
                timeframe = data.get('timeframe', '1m')
                print(f" {symbol}: ${price:,.2f} at {time_str} ({timeframe})")
        print("=" * 50)

    def disconnect(self):
        """Close the database connection if one was opened."""
        if self.db_manager:
            self.db_manager.close()
async def monitor_clean(interval: int = 0):
    """Run clean monitoring.

    A non-positive ``interval`` performs a single status check; a positive
    one loops forever, printing a summary every ``interval`` seconds until
    interrupted. Returns True on clean completion, False on failure.
    """
    mon = CleanMonitor()
    try:
        if not mon.connect():
            return False

        if interval <= 0:
            # One-shot status check.
            mon.print_status()
            return True

        # Periodic monitoring loop — runs until Ctrl+C.
        print(f"📊 Monitoring every {interval} seconds (Ctrl+C to stop)")
        while True:
            mon.print_status()
            print(f"\n⏰ Next update in {interval} seconds...\n")
            await asyncio.sleep(interval)
    except KeyboardInterrupt:
        # A user-initiated stop is a clean exit.
        print("\n👋 Monitoring stopped")
        return True
    except Exception as e:
        print(f"❌ Monitor error: {e}")
        return False
    finally:
        # Always release the DB connection, whatever path we took.
        mon.disconnect()
def main():
    """CLI entry point: parse arguments, run the monitor, set exit status."""
    arg_parser = argparse.ArgumentParser(
        description="Clean Database Monitor",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Single status check
python scripts/monitor_clean.py
# Monitor every minute
python scripts/monitor_clean.py --interval 60
""",
    )
    arg_parser.add_argument(
        '--interval',
        type=int,
        default=0,
        help='Monitor interval in seconds (0 = single check, default: 0)',
    )
    opts = arg_parser.parse_args()

    try:
        ok = asyncio.run(monitor_clean(opts.interval))
    except KeyboardInterrupt:
        print("\n👋 Exiting...")
        sys.exit(0)
    except Exception as e:
        print(f"❌ Fatal error: {e}")
        sys.exit(1)
    # SystemExit is a BaseException, so raising it here (rather than inside
    # the try) is behaviorally identical to the original.
    sys.exit(0 if ok else 1)
# Script entry point.
if __name__ == "__main__":
    main()