- Introduced `example_complete_series_aggregation.py` to demonstrate time-series aggregation that emits candles even when no trades occur (see the sketch below).
- Implemented `CompleteSeriesProcessor`, extending `RealTimeCandleProcessor`, to handle time-based candle emission and empty-candle creation.
- Refactored `OKXCollector` to use the new repository pattern for database operations, improving modularity and maintainability.
- Centralized database access in `DatabaseOperations`, improving error handling and logging.
- Updated the documentation to cover the new aggregation example and the repository pattern.
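The empty-candle behavior called out above can be illustrated with a minimal sketch. This is not the actual `CompleteSeriesProcessor` code: the `Candle` dataclass, the `fill_gaps` helper, and the gap-filling rule (a flat, zero-volume candle carried forward from the previous close) are assumptions for the example, and the input is assumed to be sorted and aligned to the interval.

```python
from dataclasses import dataclass


@dataclass
class Candle:
    """Hypothetical stand-in for the project's candle model."""
    start: int      # bucket start time (epoch seconds)
    open: float
    high: float
    low: float
    close: float
    volume: float


def fill_gaps(candles: list[Candle], interval: int) -> list[Candle]:
    """Return a complete series: emit a flat, zero-volume candle for
    every interval in which no trades occurred."""
    if not candles:
        return []
    out = [candles[0]]
    for candle in candles[1:]:
        t = out[-1].start + interval
        while t < candle.start:
            c = out[-1].close  # carry the previous close forward
            out.append(Candle(t, c, c, c, c, 0.0))
            t += interval
        out.append(candle)
    return out
```

A streaming processor along the lines of `CompleteSeriesProcessor` would presumably apply the same rule on a timer, emitting the empty candle as soon as an interval closes rather than backfilling after the fact.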
288 lines · 10 KiB · Python
#!/usr/bin/env python3
"""
Clean Database Monitor

Provides clean status updates for production data collection
with detailed logging to files.

Usage:
    python scripts/monitor_clean.py [--interval seconds]

Examples:
    # Check status once
    python scripts/monitor_clean.py

    # Monitor every 60 seconds
    python scripts/monitor_clean.py --interval 60
"""

import argparse
import asyncio
import os
import re
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path

# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

# Set environment for clean output before importing project modules
os.environ['DEBUG'] = 'false'

from database.connection import DatabaseManager
from database.models import MarketData, RawTrade
from sqlalchemy import func, desc
from utils.logger import get_logger


class CleanMonitor:
    """Clean database monitor for production use."""

    def __init__(self):
        self.logger = get_logger("clean_monitor", verbose=False)
        self.db_manager = None

    def connect(self) -> bool:
        """Connect to the database quietly."""
        try:
            self.db_manager = DatabaseManager()
            self.db_manager.initialize()
            return True
        except Exception as e:
            print(f"❌ Database connection failed: {e}")
            return False

    def get_summary_stats(self) -> dict:
        """Get essential statistics for console display."""
        try:
            with self.db_manager.get_session() as session:
                # Raw data count
                raw_count = session.query(func.count(RawTrade.id)).scalar()

                # Candle count
                candle_count = session.query(func.count(MarketData.id)).scalar()

                # Time range for raw data
                raw_oldest = session.query(func.min(RawTrade.timestamp)).scalar()
                raw_newest = session.query(func.max(RawTrade.timestamp)).scalar()

                # Recent activity (last 5 minutes)
                cutoff = datetime.now(timezone.utc) - timedelta(minutes=5)
                recent_raw = session.query(func.count(RawTrade.id)).filter(
                    RawTrade.created_at >= cutoff
                ).scalar()
                recent_candles = session.query(func.count(MarketData.id)).filter(
                    MarketData.created_at >= cutoff
                ).scalar()

                # Timeframe breakdown
                timeframes = session.query(
                    MarketData.timeframe,
                    func.count(MarketData.id)
                ).group_by(MarketData.timeframe).all()

                # Latest prices - prefer shorter timeframes for fresher data
                latest_prices = {}
                for symbol in ['BTC-USDT', 'ETH-USDT']:
                    # Shortest timeframes first
                    price_timeframes = ['1s', '5s', '1m', '5m', '15m', '1h']
                    latest = None

                    for tf in price_timeframes:
                        latest = session.query(MarketData).filter(
                            MarketData.symbol == symbol,
                            MarketData.timeframe == tf
                        ).order_by(desc(MarketData.created_at)).first()

                        if latest:
                            break  # Use the first timeframe with data

                    if latest:
                        latest_prices[symbol] = {
                            'price': float(latest.close),
                            'time': latest.timestamp,
                            'timeframe': latest.timeframe
                        }

                # Second-based activity (last minute, high-frequency data)
                recent_cutoff_1min = datetime.now(timezone.utc) - timedelta(minutes=1)
                recent_second_candles = session.query(func.count(MarketData.id)).filter(
                    MarketData.created_at >= recent_cutoff_1min,
                    MarketData.timeframe.in_(['1s', '5s', '10s', '15s', '30s'])
                ).scalar()

                return {
                    'raw_count': raw_count,
                    'candle_count': candle_count,
                    'raw_timespan': (raw_newest - raw_oldest).total_seconds() / 3600 if raw_oldest and raw_newest else 0,
                    'recent_raw': recent_raw,
                    'recent_candles': recent_candles,
                    'recent_second_candles': recent_second_candles,
                    'timeframes': dict(timeframes),
                    'latest_prices': latest_prices
                }

        except Exception as e:
            self.logger.error(f"Error getting stats: {e}")
            return {}

    def _sort_timeframes(self, timeframes: dict) -> dict:
        """Sort timeframes logically: seconds -> minutes -> hours -> days."""
        def timeframe_sort_key(tf):
            """Generate a sort key for a timeframe string like '5s' or '15m'."""
            match = re.match(r'^(\d+)([smhd])$', tf.lower())
            if not match:
                return (999, 999)  # Unknown formats sort last

            number = int(match.group(1))
            unit = match.group(2)

            # Unit priority: s=0, m=1, h=2, d=3
            unit_priority = {'s': 0, 'm': 1, 'h': 2, 'd': 3}.get(unit, 999)
            return (unit_priority, number)

        sorted_items = sorted(timeframes.items(), key=lambda x: timeframe_sort_key(x[0]))
        return dict(sorted_items)

    def print_status(self):
        """Print a clean status summary."""
        stats = self.get_summary_stats()
        if not stats:
            print("❌ Unable to get database statistics")
            return

        print("\n" + "=" * 50)
        print(f"📊 DATA COLLECTION STATUS - {datetime.now().strftime('%H:%M:%S')}")
        print("=" * 50)

        # Main metrics
        raw_count = stats.get('raw_count', 0)
        candle_count = stats.get('candle_count', 0)
        timespan = stats.get('raw_timespan', 0)

        print(f"📈 Raw Data: {raw_count:,} entries ({timespan:.1f} hours)")

        # Candle breakdown, sorted and grouped by unit for readability
        timeframes = stats.get('timeframes', {})
        if timeframes:
            sorted_timeframes = self._sort_timeframes(timeframes)

            second_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('s')}
            minute_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('m')}
            hour_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('h')}
            day_tfs = {k: v for k, v in sorted_timeframes.items() if k.endswith('d')}

            # Build the display string, one segment per unit group
            tf_parts = []
            for group in (second_tfs, minute_tfs, hour_tfs, day_tfs):
                if group:
                    tf_parts.append(" ".join(f"{tf}:{count}" for tf, count in group.items()))

            tf_summary = " | ".join(tf_parts)
            print(f"📊 Candles: {candle_count:,} total")
            print(f"   {tf_summary}")
        else:
            print(f"📊 Candles: {candle_count:,} total")

        # Recent activity, including second-based candles
        recent_raw = stats.get('recent_raw', 0)
        recent_candles = stats.get('recent_candles', 0)
        recent_second_candles = stats.get('recent_second_candles', 0)

        print("🕐 Recent Activity:")
        print(f"   Last 5m: {recent_raw:,} raw trades, {recent_candles} candles")
        if recent_second_candles > 0:
            print(f"   Last 1m: {recent_second_candles} second-based candles (1s-30s)")

        # Latest prices with timeframe information
        latest_prices = stats.get('latest_prices', {})
        if latest_prices:
            print("💰 Latest Prices:")
            for symbol, data in latest_prices.items():
                price = data['price']
                time_str = data['time'].strftime('%H:%M:%S')
                timeframe = data.get('timeframe', '1m')
                print(f"   {symbol}: ${price:,.2f} at {time_str} ({timeframe})")

        print("=" * 50)

    def disconnect(self):
        """Disconnect from the database."""
        if self.db_manager:
            self.db_manager.close()


async def monitor_clean(interval: int = 0):
    """Run clean monitoring; an interval of 0 means a single check."""
    monitor = CleanMonitor()

    try:
        if not monitor.connect():
            return False

        if interval <= 0:
            # Single check
            monitor.print_status()
            return True

        # Continuous monitoring
        print(f"📊 Monitoring every {interval} seconds (Ctrl+C to stop)")

        while True:
            monitor.print_status()
            print(f"\n⏰ Next update in {interval} seconds...\n")
            await asyncio.sleep(interval)

    except KeyboardInterrupt:
        print("\n👋 Monitoring stopped")
        return True
    except Exception as e:
        print(f"❌ Monitor error: {e}")
        return False
    finally:
        monitor.disconnect()


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Clean Database Monitor",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Single status check
  python scripts/monitor_clean.py

  # Monitor every minute
  python scripts/monitor_clean.py --interval 60
"""
    )

    parser.add_argument(
        '--interval',
        type=int,
        default=0,
        help='Monitor interval in seconds (0 = single check, default: 0)'
    )

    args = parser.parse_args()

    try:
        success = asyncio.run(monitor_clean(args.interval))
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        print("\n👋 Exiting...")
        sys.exit(0)
    except Exception as e:
        print(f"❌ Fatal error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()