"""Custom logging handler for UI integration.""" import logging import queue from dataclasses import dataclass from datetime import datetime from typing import Optional from collections import deque @dataclass class LogEntry: """A single log entry.""" timestamp: str level: str message: str logger_name: str @property def level_color(self) -> str: """Get Rich color for log level.""" colors = { "DEBUG": "dim", "INFO": "white", "WARNING": "yellow", "ERROR": "red", "CRITICAL": "bold red", } return colors.get(self.level, "white") class UILogHandler(logging.Handler): """ Custom logging handler that sends logs to UI. Uses a thread-safe queue to pass log entries from the trading thread to the UI thread. """ def __init__( self, log_queue: queue.Queue, max_entries: int = 1000, ): super().__init__() self.log_queue = log_queue self.max_entries = max_entries self.setFormatter( logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s") ) def emit(self, record: logging.LogRecord) -> None: """Emit a log record to the queue.""" try: entry = LogEntry( timestamp=datetime.fromtimestamp(record.created).strftime( "%H:%M:%S" ), level=record.levelname, message=self.format_message(record), logger_name=record.name, ) # Non-blocking put, drop if queue is full try: self.log_queue.put_nowait(entry) except queue.Full: pass except Exception: self.handleError(record) def format_message(self, record: logging.LogRecord) -> str: """Format the log message.""" return record.getMessage() class LogBuffer: """ Thread-safe buffer for log entries with filtering support. Maintains a fixed-size buffer of log entries and supports filtering by log type. """ FILTER_ALL = "all" FILTER_ERRORS = "errors" FILTER_TRADES = "trades" FILTER_SIGNALS = "signals" FILTERS = [FILTER_ALL, FILTER_ERRORS, FILTER_TRADES, FILTER_SIGNALS] def __init__(self, max_entries: int = 1000): self.max_entries = max_entries self._entries: deque[LogEntry] = deque(maxlen=max_entries) self._current_filter = self.FILTER_ALL def add(self, entry: LogEntry) -> None: """Add a log entry to the buffer.""" self._entries.append(entry) def get_filtered(self, limit: int = 50) -> list[LogEntry]: """ Get filtered log entries. Args: limit: Maximum number of entries to return Returns: List of filtered LogEntry objects (most recent first) """ entries = list(self._entries) if self._current_filter == self.FILTER_ERRORS: entries = [e for e in entries if e.level in ("ERROR", "CRITICAL")] elif self._current_filter == self.FILTER_TRADES: # Key terms indicating actual trading activity include_keywords = [ "order", "entry", "exit", "executed", "filled", "opening", "closing", "position opened", "position closed" ] # Terms to exclude (noise) exclude_keywords = [ "sync complete", "0 positions", "portfolio: 0 positions" ] entries = [ e for e in entries if any(kw in e.message.lower() for kw in include_keywords) and not any(ex in e.message.lower() for ex in exclude_keywords) ] elif self._current_filter == self.FILTER_SIGNALS: signal_keywords = ["signal", "z_score", "prob", "z="] entries = [ e for e in entries if any(kw in e.message.lower() for kw in signal_keywords) ] # Return most recent entries return list(reversed(entries[-limit:])) def set_filter(self, filter_name: str) -> None: """Set a specific filter.""" if filter_name in self.FILTERS: self._current_filter = filter_name def cycle_filter(self) -> str: """Cycle to next filter and return its name.""" current_idx = self.FILTERS.index(self._current_filter) next_idx = (current_idx + 1) % len(self.FILTERS) self._current_filter = self.FILTERS[next_idx] return self._current_filter def get_current_filter(self) -> str: """Get current filter name.""" return self._current_filter def clear(self) -> None: """Clear all log entries.""" self._entries.clear() def drain_queue(self, log_queue: queue.Queue) -> int: """ Drain log entries from queue into buffer. Args: log_queue: Queue to drain from Returns: Number of entries drained """ count = 0 while True: try: entry = log_queue.get_nowait() self.add(entry) count += 1 except queue.Empty: break return count