"""
Enhanced system health callbacks for the dashboard.

The module exposes a single public entry point,
``register_system_health_callbacks(app)``, which wires the Dash callbacks that
populate the system-health page.  Everything else is a private helper that
builds one piece of UI (a badge, an alert or a small layout fragment).
"""

import asyncio
import json
import subprocess
import psutil
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, Optional, List, Tuple

from dash import Output, Input, State, html, callback_context, no_update
import dash_bootstrap_components as dbc

from utils.logger import get_logger
from database.connection import DatabaseManager
from database.redis_manager import RedisManager

logger = get_logger("system_health_callbacks")


def register_system_health_callbacks(app):
    """Register enhanced system health callbacks with comprehensive monitoring.

    Args:
        app: The Dash application on which the callbacks are registered via
            ``app.callback``.

    Every periodic callback catches ``Exception`` so that a failing probe is
    rendered as an error badge/alert instead of breaking the page update.
    """

    # Quick Status Updates (Top Cards)
    @app.callback(
        [Output('data-collection-quick-status', 'children'),
         Output('database-quick-status', 'children'),
         Output('redis-quick-status', 'children'),
         Output('performance-quick-status', 'children')],
        Input('interval-component', 'n_intervals')
    )
    def update_quick_status(n_intervals):
        """Update quick status indicators."""
        try:
            # Data Collection Status
            dc_status = _get_data_collection_quick_status()

            # Database Status
            db_status = _get_database_quick_status()

            # Redis Status
            redis_status = _get_redis_quick_status()

            # Performance Status
            perf_status = _get_performance_quick_status()

            return dc_status, db_status, redis_status, perf_status

        except Exception as e:
            logger.error(f"Error updating quick status: {e}")
            error_status = dbc.Badge("🔴 Error", color="danger", className="me-1")
            return error_status, error_status, error_status, error_status

    # Detailed Data Collection Service Status
    @app.callback(
        [Output('data-collection-service-status', 'children'),
         Output('data-collection-metrics', 'children')],
        [Input('interval-component', 'n_intervals'),
         Input('refresh-data-status-btn', 'n_clicks')]
    )
    def update_data_collection_status(n_intervals, refresh_clicks):
        """Update detailed data collection service status and metrics."""
        try:
            service_status = _get_data_collection_service_status()
            metrics = _get_data_collection_metrics()

            return service_status, metrics

        except Exception as e:
            logger.error(f"Error updating data collection status: {e}")
            error_div = dbc.Alert(
                f"Error: {str(e)}",
                color="danger",
                dismissable=True
            )
            return error_div, error_div

    # Individual Collectors Status
    @app.callback(
        Output('individual-collectors-status', 'children'),
        [Input('interval-component', 'n_intervals'),
         Input('refresh-data-status-btn', 'n_clicks')]
    )
    def update_individual_collectors_status(n_intervals, refresh_clicks):
        """Update individual data collector health status."""
        try:
            return _get_individual_collectors_status()
        except Exception as e:
            logger.error(f"Error updating individual collectors status: {e}")
            return dbc.Alert(
                f"Error: {str(e)}",
                color="danger",
                dismissable=True
            )

    # Database Status and Statistics
    @app.callback(
        [Output('database-status', 'children'),
         Output('database-stats', 'children')],
        Input('interval-component', 'n_intervals')
    )
    def update_database_status(n_intervals):
        """Update database connection status and statistics."""
        try:
            db_status = _get_database_status()
            db_stats = _get_database_statistics()

            return db_status, db_stats

        except Exception as e:
            logger.error(f"Error updating database status: {e}")
            error_alert = dbc.Alert(
                f"Error: {str(e)}",
                color="danger",
                dismissable=True
            )
            return error_alert, error_alert

    # Redis Status and Statistics
    @app.callback(
        [Output('redis-status', 'children'),
         Output('redis-stats', 'children')],
        Input('interval-component', 'n_intervals')
    )
    def update_redis_status(n_intervals):
        """Update Redis connection status and statistics."""
        try:
            redis_status = _get_redis_status()
            redis_stats = _get_redis_statistics()

            return redis_status, redis_stats

        except Exception as e:
            logger.error(f"Error updating Redis status: {e}")
            error_alert = dbc.Alert(
                f"Error: {str(e)}",
                color="danger",
                dismissable=True
            )
            return error_alert, error_alert

    # System Performance Metrics
    @app.callback(
        Output('system-performance-metrics', 'children'),
        Input('interval-component', 'n_intervals')
    )
    def update_system_performance(n_intervals):
        """Update system performance metrics."""
        try:
            return _get_system_performance_metrics()
        except Exception as e:
            logger.error(f"Error updating system performance: {e}")
            return dbc.Alert(
                f"Error: {str(e)}",
                color="danger",
                dismissable=True
            )

    # Data Collection Details Modal
    @app.callback(
        [Output("collection-details-modal", "is_open"),
         Output("collection-details-content", "children")],
        [Input("view-collection-details-btn", "n_clicks")],
        [State("collection-details-modal", "is_open")]
    )
    def toggle_collection_details_modal(n_clicks, is_open):
        """Toggle and populate the collection details modal."""
        if n_clicks:
            details_content = _get_collection_details_content()
            return not is_open, details_content
        # No click yet (initial call): leave the modal and its content alone.
        return is_open, no_update

    # Collection Logs Modal
    @app.callback(
        [Output("collection-logs-modal", "is_open"),
         Output("collection-logs-content", "children")],
        [Input("view-collection-logs-btn", "n_clicks"),
         Input("refresh-logs-btn", "n_clicks")],
        [State("collection-logs-modal", "is_open")],
        prevent_initial_call=True
    )
    def toggle_collection_logs_modal(logs_clicks, refresh_clicks, is_open):
        """Toggle and populate the collection logs modal."""
        ctx = callback_context
        if not ctx.triggered:
            return is_open, no_update

        triggered_id = ctx.triggered_id
        if triggered_id in ["view-collection-logs-btn", "refresh-logs-btn"]:
            # Both buttons (re)load the log text and make sure the modal is open.
            logs_content = _get_collection_logs_content()
            return True, logs_content

        return is_open, no_update

    @app.callback(
        Output("collection-logs-modal", "is_open", allow_duplicate=True),
        Input("close-logs-modal", "n_clicks"),
        State("collection-logs-modal", "is_open"),
        prevent_initial_call=True
    )
    def close_logs_modal(n_clicks, is_open):
        """Flip the logs modal's open state when the close button is clicked."""
        if n_clicks:
            return not is_open
        return is_open

    logger.info("Enhanced system health callbacks registered successfully")


# Helper Functions

def _to_naive_utc(ts: datetime) -> datetime:
    """Return *ts* as a naive datetime expressed in UTC.

    Timezone-aware values are converted to UTC before the tzinfo is dropped.
    Naive values are returned unchanged and are therefore treated as already
    being in UTC.
    """
    if ts.tzinfo is not None:
        return ts.astimezone(timezone.utc).replace(tzinfo=None)
    return ts


def _describe_data_freshness(age: timedelta) -> Tuple[str, str]:
    """Classify the age of the newest record.

    Args:
        age: Time elapsed since the newest record was written.

    Returns:
        A ``(label, color)`` pair for a badge: "Fresh"/``success`` under five
        minutes, "Recent"/``warning`` under one hour, "Stale"/``danger``
        otherwise.
    """
    # A timestamp slightly in the future (clock skew) must not produce a
    # nonsensical label: ``timedelta.seconds`` of a negative delta is ~86399.
    age = max(age, timedelta(0))
    minutes = int(age.total_seconds() // 60)

    if age < timedelta(minutes=5):
        return f"Fresh ({minutes}m ago)", "success"
    if age < timedelta(hours=1):
        return f"Recent ({minutes}m ago)", "warning"
    # True division: floor division would make the ".1f" decimal always ".0".
    return f"Stale ({age.total_seconds() / 3600:.1f}h ago)", "danger"


def _get_data_collection_quick_status() -> dbc.Badge:
    """Get quick data collection status."""
    try:
        is_running = _check_data_collection_service_running()
        if is_running:
            return dbc.Badge("Active", color="success", className="me-1")
        return dbc.Badge("Stopped", color="danger", className="me-1")
    except Exception as e:
        logger.warning(f"Data collection quick status check failed: {e}")
        return dbc.Badge("Unknown", color="warning", className="me-1")


def _get_database_quick_status() -> dbc.Badge:
    """Get quick database status."""
    try:
        # NOTE(review): a new DatabaseManager is created and initialized on
        # every call; confirm this does not leak connections.
        db_manager = DatabaseManager()
        db_manager.initialize()
        if db_manager.test_connection():
            return dbc.Badge("Connected", color="success", className="me-1")
        return dbc.Badge("Error", color="danger", className="me-1")
    except Exception as e:
        logger.warning(f"Database quick status check failed: {e}")
        return dbc.Badge("Error", color="danger", className="me-1")


def _get_redis_quick_status() -> dbc.Badge:
    """Get quick Redis status."""
    try:
        redis_manager = RedisManager()
        redis_manager.initialize()
        if redis_manager.test_connection():
            return dbc.Badge("Connected", color="success", className="me-1")
        return dbc.Badge("Error", color="danger", className="me-1")
    except Exception as e:
        logger.warning(f"Redis quick status check failed: {e}")
        return dbc.Badge("Error", color="danger", className="me-1")


def _get_performance_quick_status() -> dbc.Badge:
    """Get quick performance status.

    "Good" requires both CPU and memory below 80 %, "Warning" both below
    90 %, anything else is "High".
    """
    try:
        cpu_percent = psutil.cpu_percent(interval=0.1)
        memory = psutil.virtual_memory()

        if cpu_percent < 80 and memory.percent < 80:
            return dbc.Badge("Good", color="success", className="me-1")
        if cpu_percent < 90 and memory.percent < 90:
            return dbc.Badge("Warning", color="warning", className="me-1")
        return dbc.Badge("High", color="danger", className="me-1")
    except Exception as e:
        logger.warning(f"Performance quick status check failed: {e}")
        return dbc.Badge("Unknown", color="secondary", className="me-1")


def _get_data_collection_service_status() -> html.Div:
    """Get detailed data collection service status.

    Returns a layout with a running/stopped badge, the local time of the
    check and, when the service is stopped, the command to start it.  On any
    error a danger alert is returned instead.
    """
    try:
        is_running = _check_data_collection_service_running()
        current_time = datetime.now().strftime('%H:%M:%S')

        if is_running:
            status_badge = dbc.Badge("Service Running", color="success", className="me-2")
            status_text = html.P(
                "Data collection service is actively collecting market data.",
                className="mb-0"
            )
            details = html.Div()
        else:
            status_badge = dbc.Badge("Service Stopped", color="danger", className="me-2")
            status_text = html.P(
                "Data collection service is not running.",
                className="text-danger"
            )
            details = html.Div([
                html.P("To start the service, run:", className="mt-2 mb-1"),
                html.Code("python scripts/start_data_collection.py")
            ])

        return html.Div([
            dbc.Row([
                dbc.Col(status_badge, width="auto"),
                dbc.Col(
                    html.P(f"Checked: {current_time}", className="text-muted mb-0"),
                    width="auto"
                )
            ], align="center", className="mb-2"),
            status_text,
            details
        ])
    except Exception as e:
        return dbc.Alert(f"Error checking status: {e}", color="danger")


def _get_data_collection_metrics() -> html.Div:
    """Get data collection metrics.

    Queries the ``market_data`` and ``raw_trades`` tables for row counts and
    the newest timestamp, and renders them together with a freshness badge.
    On any error a danger alert is returned instead.
    """
    try:
        db_manager = DatabaseManager()
        db_manager.initialize()

        with db_manager.get_session() as session:
            from sqlalchemy import text

            candles_count = session.execute(
                text("SELECT COUNT(*) FROM market_data")
            ).scalar() or 0
            tickers_count = session.execute(
                text("SELECT COUNT(*) FROM raw_trades WHERE data_type = 'ticker'")
            ).scalar() or 0
            latest_market_data = session.execute(
                text("SELECT MAX(timestamp) FROM market_data")
            ).scalar()
            latest_raw_data = session.execute(
                text("SELECT MAX(timestamp) FROM raw_trades")
            ).scalar()

        # Normalise before comparing so that a mix of aware and naive values
        # cannot make max() raise.
        timestamps = [
            _to_naive_utc(ts)
            for ts in (latest_market_data, latest_raw_data)
            if ts is not None
        ]

        if timestamps:
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            label, color = _describe_data_freshness(now_utc - max(timestamps))
            freshness_badge = dbc.Badge(label, color=color)
        else:
            freshness_badge = dbc.Badge("No data", color="secondary")

        return html.Div([
            dbc.Row([
                dbc.Col(html.Strong("Candles:")),
                dbc.Col(f"{candles_count:,}", className="text-end")
            ]),
            dbc.Row([
                dbc.Col(html.Strong("Tickers:")),
                dbc.Col(f"{tickers_count:,}", className="text-end")
            ]),
            dbc.Row([
                dbc.Col(html.Strong("Data Freshness:")),
                dbc.Col(freshness_badge, className="text-end")
            ])
        ])

    except Exception as e:
        return dbc.Alert(f"Error loading metrics: {e}", color="danger")


def _get_individual_collectors_status() -> html.Div:
    """Get individual data collector status.

    Currently returns a static informational alert (placeholder content).
    """
    try:
        return dbc.Alert([
            html.P(
                "Individual collector health data will be displayed here when the data collection service is running.",
                className="mb-2"
            ),
            html.Hr(),
            html.P("To start monitoring, run the following command:", className="mb-1"),
            html.Code("python scripts/start_data_collection.py")
        ], color="info")
    except Exception as e:
        return dbc.Alert(f"Error checking collector status: {e}", color="danger")


def _get_database_status() -> html.Div:
    """Get detailed database status.

    Runs ``SELECT version()`` and counts rows in ``pg_stat_activity``; on any
    error a danger alert is returned instead.
    """
    try:
        db_manager = DatabaseManager()
        db_manager.initialize()

        with db_manager.get_session() as session:
            from sqlalchemy import text

            result = session.execute(text("SELECT version()")).fetchone()
            version = result[0] if result else "Unknown"
            connections = session.execute(
                text("SELECT count(*) FROM pg_stat_activity")
            ).scalar() or 0

        # The version string starts with "PostgreSQL <number> ..."; show only
        # the number when that prefix is present.
        short_version = version.split()[1] if 'PostgreSQL' in version else 'Unknown'

        return html.Div([
            dbc.Row([
                dbc.Col(dbc.Badge("Database Connected", color="success"), width="auto"),
                dbc.Col(
                    f"Checked: {datetime.now().strftime('%H:%M:%S')}",
                    className="text-muted"
                )
            ], align="center", className="mb-2"),
            html.P(f"Version: PostgreSQL {short_version}", className="mb-1"),
            html.P(f"Active connections: {connections}", className="mb-0")
        ])

    except Exception as e:
        return dbc.Alert(f"Error connecting to database: {e}", color="danger")


def _get_database_statistics() -> html.Div:
    """Get database statistics.

    Shows the number of ``market_data`` and ``raw_trades`` rows written in the
    last hour and the five largest tables of the ``public`` schema.  On any
    error a danger alert is returned instead.
    """
    try:
        db_manager = DatabaseManager()
        db_manager.initialize()

        with db_manager.get_session() as session:
            from sqlalchemy import text

            table_stats_query = """
                SELECT tablename,
                       pg_size_pretty(pg_total_relation_size('public.'||tablename)) as size
                FROM pg_tables
                WHERE schemaname = 'public'
                ORDER BY pg_total_relation_size('public.'||tablename) DESC
                LIMIT 5
            """
            table_stats = session.execute(text(table_stats_query)).fetchall()

            market_data_activity = session.execute(
                text("SELECT COUNT(*) FROM market_data WHERE timestamp > NOW() - INTERVAL '1 hour'")
            ).scalar() or 0
            raw_data_activity = session.execute(
                text("SELECT COUNT(*) FROM raw_trades WHERE timestamp > NOW() - INTERVAL '1 hour'")
            ).scalar() or 0

        total_recent_activity = market_data_activity + raw_data_activity

        components = [
            dbc.Row([
                dbc.Col(html.Strong("Recent Activity (1h):")),
                dbc.Col(f"{total_recent_activity:,} records", className="text-end")
            ]),
            html.Hr(className="my-2"),
            html.Strong("Largest Tables:"),
        ]

        if table_stats:
            components.extend(
                dbc.Row([
                    dbc.Col(f"• {table}"),
                    dbc.Col(size, className="text-end text-muted")
                ])
                for table, size in table_stats
            )
        else:
            components.append(
                html.P("No table statistics available.", className="text-muted")
            )

        return html.Div(components)

    except Exception as e:
        return dbc.Alert(f"Error loading database stats: {e}", color="danger")


def _get_redis_status() -> html.Div:
    """Get Redis status.

    Returns a "connected" layout showing the configured host and port, or a
    danger alert if initialising or querying Redis raises.
    """
    try:
        redis_manager = RedisManager()
        redis_manager.initialize()
        # The result is not displayed; the call is kept so that an
        # unreachable server surfaces as an exception here.
        redis_manager.get_info()

        return html.Div([
            dbc.Row([
                dbc.Col(dbc.Badge("Redis Connected", color="success"), width="auto"),
                dbc.Col(
                    f"Checked: {datetime.now().strftime('%H:%M:%S')}",
                    className="text-muted"
                )
            ], align="center", className="mb-2"),
            html.P(
                f"Host: {redis_manager.config.host}:{redis_manager.config.port}",
                className="mb-0"
            )
        ])
    except Exception as e:
        return dbc.Alert(f"Error connecting to Redis: {e}", color="danger")


def _get_redis_statistics() -> html.Div:
    """Get Redis statistics.

    Renders memory usage, connected clients and uptime (whole hours) from the
    mapping returned by ``RedisManager.get_info()``.
    """
    try:
        redis_manager = RedisManager()
        redis_manager.initialize()
        info = redis_manager.get_info()

        return html.Div([
            dbc.Row([
                dbc.Col("Memory Used:"),
                dbc.Col(info.get('used_memory_human', 'N/A'), className="text-end")
            ]),
            dbc.Row([
                dbc.Col("Connected Clients:"),
                dbc.Col(info.get('connected_clients', 'N/A'), className="text-end")
            ]),
            dbc.Row([
                dbc.Col("Uptime (hours):"),
                dbc.Col(f"{info.get('uptime_in_seconds', 0) // 3600}", className="text-end")
            ])
        ])
    except Exception as e:
        return dbc.Alert(f"Error loading Redis stats: {e}", color="danger")


def _get_system_performance_metrics() -> html.Div:
    """Get system performance metrics.

    Renders CPU, memory and root-filesystem disk usage as a badge plus a
    progress bar each.  On any error a danger alert is returned instead.
    """
    try:
        cpu_percent = psutil.cpu_percent(interval=0.1)
        cpu_count = psutil.cpu_count()
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        def get_color(percent):
            """Map a usage percentage to a Bootstrap color name."""
            if percent < 70:
                return "success"
            if percent < 85:
                return "warning"
            return "danger"

        gib = 1024 ** 3

        return html.Div([
            html.Div([
                html.Strong("CPU Usage: "),
                dbc.Badge(f"{cpu_percent:.1f}%", color=get_color(cpu_percent)),
                html.Span(f" ({cpu_count} cores)", className="text-muted ms-1")
            ], className="mb-2"),
            dbc.Progress(
                value=cpu_percent,
                color=get_color(cpu_percent),
                style={"height": "10px"},
                className="mb-3"
            ),

            html.Div([
                html.Strong("Memory Usage: "),
                dbc.Badge(f"{memory.percent:.1f}%", color=get_color(memory.percent)),
                html.Span(
                    f" ({memory.used / gib:.1f} / {memory.total / gib:.1f} GB)",
                    className="text-muted ms-1"
                )
            ], className="mb-2"),
            dbc.Progress(
                value=memory.percent,
                color=get_color(memory.percent),
                style={"height": "10px"},
                className="mb-3"
            ),

            html.Div([
                html.Strong("Disk Usage: "),
                dbc.Badge(f"{disk.percent:.1f}%", color=get_color(disk.percent)),
                html.Span(
                    f" ({disk.used / gib:.1f} / {disk.total / gib:.1f} GB)",
                    className="text-muted ms-1"
                )
            ], className="mb-2"),
            dbc.Progress(
                value=disk.percent,
                color=get_color(disk.percent),
                style={"height": "10px"}
            )
        ])

    except Exception as e:
        return dbc.Alert(f"Error loading performance metrics: {e}", color="danger")


def _get_collection_details_content() -> html.Div:
    """Get detailed collection information for modal.

    Currently returns static placeholder content.
    """
    try:
        return html.Div([
            html.H5("Data Collection Service Details"),
            html.P("Comprehensive data collection service information would be displayed here."),
            html.Hr(),
            html.H6("Configuration"),
            html.P("Service configuration details..."),
            html.H6("Performance Metrics"),
            html.P("Detailed performance analytics..."),
            html.H6("Health Status"),
            html.P("Individual collector health information...")
        ])
    except Exception as e:
        return dbc.Alert(f"Error loading details: {e}", color="danger")


def _get_collection_logs_content() -> str:
    """Get recent collection service logs.

    Returns a placeholder text stamped with the current local time; no log
    file is read.
    """
    try:
        # This would read from actual log files
        # For now, return a placeholder
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return f"""[{current_time}] INFO - Data Collection Service Logs

Recent log entries would be displayed here from the data collection service.

This would include:
- Service startup/shutdown events
- Collector connection status changes
- Data collection statistics
- Error messages and warnings
- Performance metrics

To view real logs, check the logs/ directory or configure log file monitoring.
"""
    except Exception as e:
        return f"Error loading logs: {str(e)}"


def _check_data_collection_service_running() -> bool:
    """Check if data collection service is running.

    Scans the command line of every visible process for
    ``start_data_collection.py`` or ``collection_service``.

    Returns:
        True if a matching process is found; False if none is found or the
        process scan itself fails.
    """
    try:
        # Check for running processes (simplified)
        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            try:
                cmdline_parts = proc.info['cmdline']
                if not cmdline_parts:
                    continue
                cmdline = ' '.join(cmdline_parts)
                if 'start_data_collection.py' in cmdline or 'collection_service' in cmdline:
                    return True
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process vanished or is not inspectable; skip it.
                continue
        return False
    except Exception as e:
        logger.warning(f"Process scan for data collection service failed: {e}")
        return False