Add daily model training scripts and terminal UI for live trading

- Introduced `train_daily.sh` for automating daily model retraining, including data download and model training steps.
- Added `install_cron.sh` for setting up a cron job to run the daily training script.
- Created `setup_schedule.sh` for configuring Systemd timers for daily training tasks.
- Implemented a terminal UI using Rich for real-time monitoring of trading performance, including metrics display and log handling.
- Updated `pyproject.toml` to include the `rich` dependency for UI functionality.
- Enhanced `.gitignore` to exclude model and log files.
- Added database support for trade persistence and metrics calculation.
- Updated README with installation and usage instructions for the new features.
This commit is contained in:
2026-01-18 11:08:57 +08:00
parent 35992ee374
commit b5550f4ff4
27 changed files with 3582 additions and 113 deletions

View File

@@ -0,0 +1,10 @@
"""Terminal UI module for live trading dashboard."""
from .dashboard import TradingDashboard
from .state import SharedState
from .log_handler import UILogHandler
__all__ = [
"TradingDashboard",
"SharedState",
"UILogHandler",
]

View File

@@ -0,0 +1,240 @@
"""Main trading dashboard UI orchestration."""
import logging
import queue
import threading
import time
from typing import Optional, Callable
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.text import Text
from .state import SharedState
from .log_handler import LogBuffer, UILogHandler
from .keyboard import KeyboardHandler
from .panels import (
HeaderPanel,
TabBar,
LogPanel,
HelpBar,
build_summary_panel,
)
from ..db.database import TradingDatabase
from ..db.metrics import MetricsCalculator, PeriodMetrics
logger = logging.getLogger(__name__)
class TradingDashboard:
"""
Main trading dashboard orchestrator.
Runs in a separate thread and provides real-time UI updates
while the trading loop runs in the main thread.
"""
def __init__(
self,
state: SharedState,
db: TradingDatabase,
log_queue: queue.Queue,
on_quit: Optional[Callable] = None,
):
self.state = state
self.db = db
self.log_queue = log_queue
self.on_quit = on_quit
self.console = Console()
self.log_buffer = LogBuffer(max_entries=1000)
self.keyboard = KeyboardHandler()
self.metrics_calculator = MetricsCalculator(db)
self._running = False
self._thread: Optional[threading.Thread] = None
self._active_tab = 0
self._cached_metrics: dict[int, PeriodMetrics] = {}
self._last_metrics_refresh = 0.0
def start(self) -> None:
"""Start the dashboard in a separate thread."""
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
logger.debug("Dashboard thread started")
def stop(self) -> None:
"""Stop the dashboard."""
self._running = False
if self._thread:
self._thread.join(timeout=2.0)
logger.debug("Dashboard thread stopped")
def _run(self) -> None:
"""Main dashboard loop."""
try:
with self.keyboard:
with Live(
self._build_layout(),
console=self.console,
refresh_per_second=1,
screen=True,
) as live:
while self._running and self.state.is_running():
# Process keyboard input
action = self.keyboard.get_action(timeout=0.1)
if action:
self._handle_action(action)
# Drain log queue
self.log_buffer.drain_queue(self.log_queue)
# Refresh metrics periodically (every 5 seconds)
now = time.time()
if now - self._last_metrics_refresh > 5.0:
self._refresh_metrics()
self._last_metrics_refresh = now
# Update display
live.update(self._build_layout())
# Small sleep to prevent CPU spinning
time.sleep(0.1)
except Exception as e:
logger.error(f"Dashboard error: {e}", exc_info=True)
finally:
self._running = False
def _handle_action(self, action: str) -> None:
"""Handle keyboard action."""
if action == "quit":
logger.info("Quit requested from UI")
self.state.stop()
if self.on_quit:
self.on_quit()
elif action == "refresh":
self._refresh_metrics()
logger.debug("Manual refresh triggered")
elif action == "filter":
new_filter = self.log_buffer.cycle_filter()
logger.debug(f"Log filter changed to: {new_filter}")
elif action == "filter_trades":
self.log_buffer.set_filter(LogBuffer.FILTER_TRADES)
logger.debug("Log filter set to: trades")
elif action == "filter_all":
self.log_buffer.set_filter(LogBuffer.FILTER_ALL)
logger.debug("Log filter set to: all")
elif action == "filter_errors":
self.log_buffer.set_filter(LogBuffer.FILTER_ERRORS)
logger.debug("Log filter set to: errors")
elif action == "tab_general":
self._active_tab = 0
elif action == "tab_monthly":
if self._has_monthly_data():
self._active_tab = 1
elif action == "tab_weekly":
if self._has_weekly_data():
self._active_tab = 2
elif action == "tab_daily":
self._active_tab = 3
def _refresh_metrics(self) -> None:
"""Refresh metrics from database."""
try:
self._cached_metrics[0] = self.metrics_calculator.get_all_time_metrics()
self._cached_metrics[1] = self.metrics_calculator.get_monthly_metrics()
self._cached_metrics[2] = self.metrics_calculator.get_weekly_metrics()
self._cached_metrics[3] = self.metrics_calculator.get_daily_metrics()
except Exception as e:
logger.warning(f"Failed to refresh metrics: {e}")
def _has_monthly_data(self) -> bool:
"""Check if monthly tab should be shown."""
try:
return self.metrics_calculator.has_monthly_data()
except Exception:
return False
def _has_weekly_data(self) -> bool:
"""Check if weekly tab should be shown."""
try:
return self.metrics_calculator.has_weekly_data()
except Exception:
return False
def _build_layout(self) -> Layout:
"""Build the complete dashboard layout."""
layout = Layout()
# Calculate available height
term_height = self.console.height or 40
# Header takes 3 lines
# Help bar takes 1 line
# Summary panel takes about 12-14 lines
# Rest goes to logs
log_height = max(8, term_height - 20)
layout.split_column(
Layout(name="header", size=3),
Layout(name="summary", size=14),
Layout(name="logs", size=log_height),
Layout(name="help", size=1),
)
# Header
layout["header"].update(HeaderPanel(self.state).render())
# Summary panel with tabs
current_metrics = self._cached_metrics.get(self._active_tab)
tab_bar = TabBar(active_tab=self._active_tab)
layout["summary"].update(
build_summary_panel(
state=self.state,
metrics=current_metrics,
tab_bar=tab_bar,
has_monthly=self._has_monthly_data(),
has_weekly=self._has_weekly_data(),
)
)
# Log panel
layout["logs"].update(LogPanel(self.log_buffer).render(height=log_height))
# Help bar
layout["help"].update(HelpBar().render())
return layout
def setup_ui_logging(log_queue: queue.Queue) -> UILogHandler:
"""
Set up logging to capture messages for UI.
Args:
log_queue: Queue to send log messages to
Returns:
UILogHandler instance
"""
handler = UILogHandler(log_queue)
handler.setLevel(logging.INFO)
# Add handler to root logger
root_logger = logging.getLogger()
root_logger.addHandler(handler)
return handler

128
live_trading/ui/keyboard.py Normal file
View File

@@ -0,0 +1,128 @@
"""Keyboard input handling for terminal UI."""
import sys
import select
import termios
import tty
from typing import Optional, Callable
from dataclasses import dataclass
@dataclass
class KeyAction:
"""Represents a keyboard action."""
key: str
action: str
description: str
class KeyboardHandler:
"""
Non-blocking keyboard input handler.
Uses terminal raw mode to capture single keypresses
without waiting for Enter.
"""
# Key mappings
ACTIONS = {
"q": "quit",
"Q": "quit",
"\x03": "quit", # Ctrl+C
"r": "refresh",
"R": "refresh",
"f": "filter",
"F": "filter",
"t": "filter_trades",
"T": "filter_trades",
"l": "filter_all",
"L": "filter_all",
"e": "filter_errors",
"E": "filter_errors",
"1": "tab_general",
"2": "tab_monthly",
"3": "tab_weekly",
"4": "tab_daily",
}
def __init__(self):
self._old_settings = None
self._enabled = False
def enable(self) -> bool:
"""
Enable raw keyboard input mode.
Returns:
True if enabled successfully
"""
try:
if not sys.stdin.isatty():
return False
self._old_settings = termios.tcgetattr(sys.stdin)
tty.setcbreak(sys.stdin.fileno())
self._enabled = True
return True
except Exception:
return False
def disable(self) -> None:
"""Restore normal terminal mode."""
if self._enabled and self._old_settings:
try:
termios.tcsetattr(
sys.stdin,
termios.TCSADRAIN,
self._old_settings,
)
except Exception:
pass
self._enabled = False
def get_key(self, timeout: float = 0.1) -> Optional[str]:
"""
Get a keypress if available (non-blocking).
Args:
timeout: Seconds to wait for input
Returns:
Key character or None if no input
"""
if not self._enabled:
return None
try:
readable, _, _ = select.select([sys.stdin], [], [], timeout)
if readable:
return sys.stdin.read(1)
except Exception:
pass
return None
def get_action(self, timeout: float = 0.1) -> Optional[str]:
"""
Get action name for pressed key.
Args:
timeout: Seconds to wait for input
Returns:
Action name or None
"""
key = self.get_key(timeout)
if key:
return self.ACTIONS.get(key)
return None
def __enter__(self):
"""Context manager entry."""
self.enable()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.disable()
return False

View File

@@ -0,0 +1,178 @@
"""Custom logging handler for UI integration."""
import logging
import queue
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from collections import deque
@dataclass
class LogEntry:
"""A single log entry."""
timestamp: str
level: str
message: str
logger_name: str
@property
def level_color(self) -> str:
"""Get Rich color for log level."""
colors = {
"DEBUG": "dim",
"INFO": "white",
"WARNING": "yellow",
"ERROR": "red",
"CRITICAL": "bold red",
}
return colors.get(self.level, "white")
class UILogHandler(logging.Handler):
"""
Custom logging handler that sends logs to UI.
Uses a thread-safe queue to pass log entries from the trading
thread to the UI thread.
"""
def __init__(
self,
log_queue: queue.Queue,
max_entries: int = 1000,
):
super().__init__()
self.log_queue = log_queue
self.max_entries = max_entries
self.setFormatter(
logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
)
def emit(self, record: logging.LogRecord) -> None:
"""Emit a log record to the queue."""
try:
entry = LogEntry(
timestamp=datetime.fromtimestamp(record.created).strftime(
"%H:%M:%S"
),
level=record.levelname,
message=self.format_message(record),
logger_name=record.name,
)
# Non-blocking put, drop if queue is full
try:
self.log_queue.put_nowait(entry)
except queue.Full:
pass
except Exception:
self.handleError(record)
def format_message(self, record: logging.LogRecord) -> str:
"""Format the log message."""
return record.getMessage()
class LogBuffer:
"""
Thread-safe buffer for log entries with filtering support.
Maintains a fixed-size buffer of log entries and supports
filtering by log type.
"""
FILTER_ALL = "all"
FILTER_ERRORS = "errors"
FILTER_TRADES = "trades"
FILTER_SIGNALS = "signals"
FILTERS = [FILTER_ALL, FILTER_ERRORS, FILTER_TRADES, FILTER_SIGNALS]
def __init__(self, max_entries: int = 1000):
self.max_entries = max_entries
self._entries: deque[LogEntry] = deque(maxlen=max_entries)
self._current_filter = self.FILTER_ALL
def add(self, entry: LogEntry) -> None:
"""Add a log entry to the buffer."""
self._entries.append(entry)
def get_filtered(self, limit: int = 50) -> list[LogEntry]:
"""
Get filtered log entries.
Args:
limit: Maximum number of entries to return
Returns:
List of filtered LogEntry objects (most recent first)
"""
entries = list(self._entries)
if self._current_filter == self.FILTER_ERRORS:
entries = [e for e in entries if e.level in ("ERROR", "CRITICAL")]
elif self._current_filter == self.FILTER_TRADES:
# Key terms indicating actual trading activity
include_keywords = [
"order", "entry", "exit", "executed", "filled",
"opening", "closing", "position opened", "position closed"
]
# Terms to exclude (noise)
exclude_keywords = [
"sync complete", "0 positions", "portfolio: 0 positions"
]
entries = [
e for e in entries
if any(kw in e.message.lower() for kw in include_keywords)
and not any(ex in e.message.lower() for ex in exclude_keywords)
]
elif self._current_filter == self.FILTER_SIGNALS:
signal_keywords = ["signal", "z_score", "prob", "z="]
entries = [
e for e in entries
if any(kw in e.message.lower() for kw in signal_keywords)
]
# Return most recent entries
return list(reversed(entries[-limit:]))
def set_filter(self, filter_name: str) -> None:
"""Set a specific filter."""
if filter_name in self.FILTERS:
self._current_filter = filter_name
def cycle_filter(self) -> str:
"""Cycle to next filter and return its name."""
current_idx = self.FILTERS.index(self._current_filter)
next_idx = (current_idx + 1) % len(self.FILTERS)
self._current_filter = self.FILTERS[next_idx]
return self._current_filter
def get_current_filter(self) -> str:
"""Get current filter name."""
return self._current_filter
def clear(self) -> None:
"""Clear all log entries."""
self._entries.clear()
def drain_queue(self, log_queue: queue.Queue) -> int:
"""
Drain log entries from queue into buffer.
Args:
log_queue: Queue to drain from
Returns:
Number of entries drained
"""
count = 0
while True:
try:
entry = log_queue.get_nowait()
self.add(entry)
count += 1
except queue.Empty:
break
return count

399
live_trading/ui/panels.py Normal file
View File

@@ -0,0 +1,399 @@
"""UI panel components using Rich."""
from typing import Optional
from rich.console import Console, Group
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from rich.layout import Layout
from .state import SharedState, PositionState, StrategyState, AccountState
from .log_handler import LogBuffer, LogEntry
from ..db.metrics import PeriodMetrics
def format_pnl(value: float, include_sign: bool = True) -> Text:
"""Format PnL value with color."""
if value > 0:
sign = "+" if include_sign else ""
return Text(f"{sign}${value:.2f}", style="green")
elif value < 0:
return Text(f"${value:.2f}", style="red")
else:
return Text(f"${value:.2f}", style="white")
def format_pct(value: float, include_sign: bool = True) -> Text:
"""Format percentage value with color."""
if value > 0:
sign = "+" if include_sign else ""
return Text(f"{sign}{value:.2f}%", style="green")
elif value < 0:
return Text(f"{value:.2f}%", style="red")
else:
return Text(f"{value:.2f}%", style="white")
def format_side(side: str) -> Text:
"""Format position side with color."""
if side.lower() == "long":
return Text("LONG", style="bold green")
else:
return Text("SHORT", style="bold red")
class HeaderPanel:
"""Header panel with title and mode indicator."""
def __init__(self, state: SharedState):
self.state = state
def render(self) -> Panel:
"""Render the header panel."""
mode = self.state.get_mode()
eth_symbol, _ = self.state.get_symbols()
mode_style = "yellow" if mode == "DEMO" else "bold red"
mode_text = Text(f"[{mode}]", style=mode_style)
title = Text()
title.append("REGIME REVERSION STRATEGY - LIVE TRADING", style="bold white")
title.append(" ")
title.append(mode_text)
title.append(" ")
title.append(eth_symbol, style="cyan")
return Panel(title, style="blue", height=3)
class TabBar:
"""Tab bar for period selection."""
TABS = ["1:General", "2:Monthly", "3:Weekly", "4:Daily"]
def __init__(self, active_tab: int = 0):
self.active_tab = active_tab
def render(
self,
has_monthly: bool = True,
has_weekly: bool = True,
) -> Text:
"""Render the tab bar."""
text = Text()
text.append(" ")
for i, tab in enumerate(self.TABS):
# Check if tab should be shown
if i == 1 and not has_monthly:
continue
if i == 2 and not has_weekly:
continue
if i == self.active_tab:
text.append(f"[{tab}]", style="bold white on blue")
else:
text.append(f"[{tab}]", style="dim")
text.append(" ")
return text
class MetricsPanel:
"""Panel showing trading metrics."""
def __init__(self, metrics: Optional[PeriodMetrics] = None):
self.metrics = metrics
def render(self) -> Table:
"""Render metrics as a table."""
table = Table(
show_header=False,
show_edge=False,
box=None,
padding=(0, 1),
)
table.add_column("Label", style="dim")
table.add_column("Value")
if self.metrics is None or self.metrics.total_trades == 0:
table.add_row("Status", Text("No trade data", style="dim"))
return table
m = self.metrics
table.add_row("Total PnL:", format_pnl(m.total_pnl))
table.add_row("Win Rate:", Text(f"{m.win_rate:.1f}%", style="white"))
table.add_row("Total Trades:", Text(str(m.total_trades), style="white"))
table.add_row(
"Win/Loss:",
Text(f"{m.winning_trades}/{m.losing_trades}", style="white"),
)
table.add_row(
"Avg Duration:",
Text(f"{m.avg_trade_duration_hours:.1f}h", style="white"),
)
table.add_row("Max Drawdown:", format_pnl(-m.max_drawdown))
table.add_row("Best Trade:", format_pnl(m.best_trade))
table.add_row("Worst Trade:", format_pnl(m.worst_trade))
return table
class PositionPanel:
"""Panel showing current position."""
def __init__(self, position: Optional[PositionState] = None):
self.position = position
def render(self) -> Table:
"""Render position as a table."""
table = Table(
show_header=False,
show_edge=False,
box=None,
padding=(0, 1),
)
table.add_column("Label", style="dim")
table.add_column("Value")
if self.position is None:
table.add_row("Status", Text("No open position", style="dim"))
return table
p = self.position
table.add_row("Side:", format_side(p.side))
table.add_row("Entry:", Text(f"${p.entry_price:.2f}", style="white"))
table.add_row("Current:", Text(f"${p.current_price:.2f}", style="white"))
# Unrealized PnL
pnl_text = Text()
pnl_text.append_text(format_pnl(p.unrealized_pnl))
pnl_text.append(" (")
pnl_text.append_text(format_pct(p.unrealized_pnl_pct))
pnl_text.append(")")
table.add_row("Unrealized:", pnl_text)
table.add_row("Size:", Text(f"${p.size_usdt:.2f}", style="white"))
# SL/TP
if p.side == "long":
sl_dist = (p.stop_loss_price / p.entry_price - 1) * 100
tp_dist = (p.take_profit_price / p.entry_price - 1) * 100
else:
sl_dist = (1 - p.stop_loss_price / p.entry_price) * 100
tp_dist = (1 - p.take_profit_price / p.entry_price) * 100
sl_text = Text(f"${p.stop_loss_price:.2f} ({sl_dist:+.1f}%)", style="red")
tp_text = Text(f"${p.take_profit_price:.2f} ({tp_dist:+.1f}%)", style="green")
table.add_row("Stop Loss:", sl_text)
table.add_row("Take Profit:", tp_text)
return table
class AccountPanel:
"""Panel showing account information."""
def __init__(self, account: Optional[AccountState] = None):
self.account = account
def render(self) -> Table:
"""Render account info as a table."""
table = Table(
show_header=False,
show_edge=False,
box=None,
padding=(0, 1),
)
table.add_column("Label", style="dim")
table.add_column("Value")
if self.account is None:
table.add_row("Status", Text("Loading...", style="dim"))
return table
a = self.account
table.add_row("Balance:", Text(f"${a.balance:.2f}", style="white"))
table.add_row("Available:", Text(f"${a.available:.2f}", style="white"))
table.add_row("Leverage:", Text(f"{a.leverage}x", style="cyan"))
return table
class StrategyPanel:
"""Panel showing strategy state."""
def __init__(self, strategy: Optional[StrategyState] = None):
self.strategy = strategy
def render(self) -> Table:
"""Render strategy state as a table."""
table = Table(
show_header=False,
show_edge=False,
box=None,
padding=(0, 1),
)
table.add_column("Label", style="dim")
table.add_column("Value")
if self.strategy is None:
table.add_row("Status", Text("Waiting...", style="dim"))
return table
s = self.strategy
# Z-score with color based on threshold
z_style = "white"
if abs(s.z_score) > 1.0:
z_style = "yellow"
if abs(s.z_score) > 1.5:
z_style = "bold yellow"
table.add_row("Z-Score:", Text(f"{s.z_score:.2f}", style=z_style))
# Probability with color
prob_style = "white"
if s.probability > 0.5:
prob_style = "green"
if s.probability > 0.7:
prob_style = "bold green"
table.add_row("Probability:", Text(f"{s.probability:.2f}", style=prob_style))
# Funding rate
funding_style = "green" if s.funding_rate >= 0 else "red"
table.add_row(
"Funding:",
Text(f"{s.funding_rate:.4f}", style=funding_style),
)
# Last action
action_style = "white"
if s.last_action == "entry":
action_style = "bold cyan"
elif s.last_action == "check_exit":
action_style = "yellow"
table.add_row("Last Action:", Text(s.last_action, style=action_style))
return table
class LogPanel:
"""Panel showing log entries."""
def __init__(self, log_buffer: LogBuffer):
self.log_buffer = log_buffer
def render(self, height: int = 10) -> Panel:
"""Render log panel."""
filter_name = self.log_buffer.get_current_filter().title()
entries = self.log_buffer.get_filtered(limit=height - 2)
lines = []
for entry in entries:
line = Text()
line.append(f"{entry.timestamp} ", style="dim")
line.append(f"[{entry.level}] ", style=entry.level_color)
line.append(entry.message)
lines.append(line)
if not lines:
lines.append(Text("No logs to display", style="dim"))
content = Group(*lines)
# Build "tabbed" title
tabs = []
# All Logs tab
if filter_name == "All":
tabs.append("[bold white on blue] [L]ogs [/]")
else:
tabs.append("[dim] [L]ogs [/]")
# Trades tab
if filter_name == "Trades":
tabs.append("[bold white on blue] [T]rades [/]")
else:
tabs.append("[dim] [T]rades [/]")
# Errors tab
if filter_name == "Errors":
tabs.append("[bold white on blue] [E]rrors [/]")
else:
tabs.append("[dim] [E]rrors [/]")
title = " ".join(tabs)
subtitle = "Press 'l', 't', 'e' to switch tabs"
return Panel(
content,
title=title,
subtitle=subtitle,
title_align="left",
subtitle_align="right",
border_style="blue",
)
class HelpBar:
"""Bottom help bar with keyboard shortcuts."""
def render(self) -> Text:
"""Render help bar."""
text = Text()
text.append(" [q]", style="bold")
text.append("Quit ", style="dim")
text.append("[r]", style="bold")
text.append("Refresh ", style="dim")
text.append("[1-4]", style="bold")
text.append("Tabs ", style="dim")
text.append("[l/t/e]", style="bold")
text.append("LogView", style="dim")
return text
def build_summary_panel(
state: SharedState,
metrics: Optional[PeriodMetrics],
tab_bar: TabBar,
has_monthly: bool,
has_weekly: bool,
) -> Panel:
"""Build the complete summary panel with all sections."""
# Create layout for summary content
layout = Layout()
# Tab bar at top
tabs = tab_bar.render(has_monthly, has_weekly)
# Create tables for each section
metrics_table = MetricsPanel(metrics).render()
position_table = PositionPanel(state.get_position()).render()
account_table = AccountPanel(state.get_account()).render()
strategy_table = StrategyPanel(state.get_strategy()).render()
# Build two-column layout
left_col = Table(show_header=True, show_edge=False, box=None, padding=(0, 2))
left_col.add_column("PERFORMANCE", style="bold cyan")
left_col.add_column("ACCOUNT", style="bold cyan")
left_col.add_row(metrics_table, account_table)
right_col = Table(show_header=True, show_edge=False, box=None, padding=(0, 2))
right_col.add_column("CURRENT POSITION", style="bold cyan")
right_col.add_column("STRATEGY STATE", style="bold cyan")
right_col.add_row(position_table, strategy_table)
# Combine into main table
main_table = Table(show_header=False, show_edge=False, box=None, expand=True)
main_table.add_column(ratio=1)
main_table.add_column(ratio=1)
main_table.add_row(left_col, right_col)
content = Group(tabs, Text(""), main_table)
return Panel(content, border_style="blue")

195
live_trading/ui/state.py Normal file
View File

@@ -0,0 +1,195 @@
"""Thread-safe shared state for UI and trading loop."""
import threading
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime, timezone
@dataclass
class PositionState:
"""Current position information."""
trade_id: str = ""
symbol: str = ""
side: str = ""
entry_price: float = 0.0
current_price: float = 0.0
size: float = 0.0
size_usdt: float = 0.0
unrealized_pnl: float = 0.0
unrealized_pnl_pct: float = 0.0
stop_loss_price: float = 0.0
take_profit_price: float = 0.0
@dataclass
class StrategyState:
"""Current strategy signal state."""
z_score: float = 0.0
probability: float = 0.0
funding_rate: float = 0.0
last_action: str = "hold"
last_reason: str = ""
last_signal_time: str = ""
@dataclass
class AccountState:
"""Account balance information."""
balance: float = 0.0
available: float = 0.0
leverage: int = 1
class SharedState:
"""
Thread-safe shared state between trading loop and UI.
All access to state fields should go through the getter/setter methods
which use a lock for thread safety.
"""
def __init__(self):
self._lock = threading.Lock()
self._position: Optional[PositionState] = None
self._strategy = StrategyState()
self._account = AccountState()
self._is_running = True
self._last_cycle_time: Optional[str] = None
self._mode = "DEMO"
self._eth_symbol = "ETH/USDT:USDT"
self._btc_symbol = "BTC/USDT:USDT"
# Position methods
def get_position(self) -> Optional[PositionState]:
"""Get current position state."""
with self._lock:
return self._position
def set_position(self, position: Optional[PositionState]) -> None:
"""Set current position state."""
with self._lock:
self._position = position
def update_position_price(self, current_price: float) -> None:
"""Update current price and recalculate PnL."""
with self._lock:
if self._position is None:
return
self._position.current_price = current_price
if self._position.side == "long":
pnl = (current_price - self._position.entry_price)
self._position.unrealized_pnl = pnl * self._position.size
pnl_pct = (current_price / self._position.entry_price - 1) * 100
else:
pnl = (self._position.entry_price - current_price)
self._position.unrealized_pnl = pnl * self._position.size
pnl_pct = (1 - current_price / self._position.entry_price) * 100
self._position.unrealized_pnl_pct = pnl_pct
def clear_position(self) -> None:
"""Clear current position."""
with self._lock:
self._position = None
# Strategy methods
def get_strategy(self) -> StrategyState:
"""Get current strategy state."""
with self._lock:
return StrategyState(
z_score=self._strategy.z_score,
probability=self._strategy.probability,
funding_rate=self._strategy.funding_rate,
last_action=self._strategy.last_action,
last_reason=self._strategy.last_reason,
last_signal_time=self._strategy.last_signal_time,
)
def update_strategy(
self,
z_score: float,
probability: float,
funding_rate: float,
action: str,
reason: str,
) -> None:
"""Update strategy state."""
with self._lock:
self._strategy.z_score = z_score
self._strategy.probability = probability
self._strategy.funding_rate = funding_rate
self._strategy.last_action = action
self._strategy.last_reason = reason
self._strategy.last_signal_time = datetime.now(
timezone.utc
).isoformat()
# Account methods
def get_account(self) -> AccountState:
"""Get current account state."""
with self._lock:
return AccountState(
balance=self._account.balance,
available=self._account.available,
leverage=self._account.leverage,
)
def update_account(
self,
balance: float,
available: float,
leverage: int,
) -> None:
"""Update account state."""
with self._lock:
self._account.balance = balance
self._account.available = available
self._account.leverage = leverage
# Control methods
def is_running(self) -> bool:
"""Check if trading loop is running."""
with self._lock:
return self._is_running
def stop(self) -> None:
"""Signal to stop trading loop."""
with self._lock:
self._is_running = False
def get_last_cycle_time(self) -> Optional[str]:
"""Get last trading cycle time."""
with self._lock:
return self._last_cycle_time
def set_last_cycle_time(self, time_str: str) -> None:
"""Set last trading cycle time."""
with self._lock:
self._last_cycle_time = time_str
# Config methods
def get_mode(self) -> str:
"""Get trading mode (DEMO/LIVE)."""
with self._lock:
return self._mode
def set_mode(self, mode: str) -> None:
"""Set trading mode."""
with self._lock:
self._mode = mode
def get_symbols(self) -> tuple[str, str]:
"""Get trading symbols (eth, btc)."""
with self._lock:
return self._eth_symbol, self._btc_symbol
def set_symbols(self, eth_symbol: str, btc_symbol: str) -> None:
"""Set trading symbols."""
with self._lock:
self._eth_symbol = eth_symbol
self._btc_symbol = btc_symbol