Vasily.onl f09864d61b 4.0 - 3.0 Implement strategy analysis tables and repository for backtesting
- Added `StrategyRun` and `StrategySignal` models to track strategy execution sessions and generated signals, respectively, ensuring a clear separation from live trading data.
- Introduced `StrategyRepository` for managing database operations related to strategy runs and signals, including methods for creating, updating, and retrieving strategy data.
- Updated `DatabaseOperations` to integrate the new repository, enhancing the overall architecture and maintaining consistency with existing database access patterns.
- Enhanced documentation to reflect the new database schema and repository functionalities, ensuring clarity for future development and usage.

These changes establish a robust foundation for strategy analysis and backtesting, aligning with project goals for modularity, performance, and maintainability.
2025-06-12 15:29:14 +08:00

361 lines
15 KiB
Python

"""
Database Models for Crypto Trading Bot Platform
SQLAlchemy models corresponding to the database schema_clean.sql
"""
from datetime import datetime
from decimal import Decimal
from typing import Optional, Dict, Any
from sqlalchemy import (
Boolean, Column, DateTime, ForeignKey, Integer,
String, Text, DECIMAL, Index, CheckConstraint,
UniqueConstraint, text
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.sql import func
# Create base class for all models
Base = declarative_base()
class MarketData(Base):
"""OHLCV Market Data - Primary table for bot operations"""
__tablename__ = 'market_data'
id = Column(Integer, primary_key=True)
exchange = Column(String(50), nullable=False, default='okx')
symbol = Column(String(20), nullable=False)
timeframe = Column(String(5), nullable=False) # 1m, 5m, 15m, 1h, 4h, 1d
timestamp = Column(DateTime(timezone=True), nullable=False)
open = Column(DECIMAL(18, 8), nullable=False)
high = Column(DECIMAL(18, 8), nullable=False)
low = Column(DECIMAL(18, 8), nullable=False)
close = Column(DECIMAL(18, 8), nullable=False)
volume = Column(DECIMAL(18, 8), nullable=False)
trades_count = Column(Integer)
created_at = Column(DateTime(timezone=True), default=func.now())
# Constraints
__table_args__ = (
UniqueConstraint('exchange', 'symbol', 'timeframe', 'timestamp', name='unique_market_data'),
Index('idx_market_data_lookup', 'symbol', 'timeframe', 'timestamp'),
Index('idx_market_data_recent', 'timestamp'),
Index('idx_market_data_symbol', 'symbol'),
Index('idx_market_data_timeframe', 'timeframe'),
)
def __repr__(self):
return f"<MarketData({self.symbol} {self.timeframe} {self.timestamp} O:{self.open} C:{self.close})>"
class RawTrade(Base):
"""Raw Trade Data - For debugging, compliance, and detailed backtesting"""
__tablename__ = 'raw_trades'
id = Column(Integer, primary_key=True)
exchange = Column(String(50), nullable=False, default='okx')
symbol = Column(String(20), nullable=False)
timestamp = Column(DateTime(timezone=True), nullable=False)
data_type = Column(String(20), nullable=False) # ticker, trade, orderbook, candle, balance
raw_data = Column(JSONB, nullable=False) # Complete API response
created_at = Column(DateTime(timezone=True), default=func.now())
__table_args__ = (
Index('idx_raw_trades_symbol_time', 'symbol', 'timestamp'),
Index('idx_raw_trades_type', 'data_type'),
Index('idx_raw_trades_timestamp', 'timestamp'),
Index('idx_raw_trades_recent', 'created_at'),
)
def __repr__(self):
return f"<RawTrade({self.symbol} {self.data_type} {self.timestamp})>"
class Bot(Base):
"""Bot Management - Bot instances with configuration"""
__tablename__ = 'bots'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
strategy_name = Column(String(50), nullable=False)
symbol = Column(String(20), nullable=False)
timeframe = Column(String(5), nullable=False)
status = Column(String(20), nullable=False, default='inactive')
config_file = Column(String(200)) # Path to JSON config
virtual_balance = Column(DECIMAL(18, 8), default=Decimal('10000'))
current_balance = Column(DECIMAL(18, 8), default=Decimal('10000'))
last_heartbeat = Column(DateTime(timezone=True))
created_at = Column(DateTime(timezone=True), default=func.now())
updated_at = Column(DateTime(timezone=True), default=func.now(), onupdate=func.now())
# Relationships
signals = relationship("Signal", back_populates="bot", cascade="all, delete-orphan")
trades = relationship("Trade", back_populates="bot", cascade="all, delete-orphan")
performance = relationship("BotPerformance", back_populates="bot", cascade="all, delete-orphan")
__table_args__ = (
CheckConstraint("status IN ('active', 'inactive', 'error', 'paused')", name='chk_bot_status'),
Index('idx_bots_status', 'status'),
Index('idx_bots_symbol', 'symbol'),
Index('idx_bots_strategy', 'strategy_name'),
Index('idx_bots_last_heartbeat', 'last_heartbeat'),
)
@property
def pnl(self) -> Decimal:
"""Calculate current profit/loss"""
return self.current_balance - self.virtual_balance
@property
def is_active(self) -> bool:
"""Check if bot is currently active"""
return self.status == 'active'
def __repr__(self):
return f"<Bot({self.name} - {self.strategy_name} - {self.status})>"
class Signal(Base):
"""Trading Signals - Generated by strategies for analysis"""
__tablename__ = 'signals'
id = Column(Integer, primary_key=True)
bot_id = Column(Integer, ForeignKey('bots.id', ondelete='CASCADE'), nullable=False)
timestamp = Column(DateTime(timezone=True), nullable=False)
signal_type = Column(String(10), nullable=False) # buy, sell, hold
price = Column(DECIMAL(18, 8))
confidence = Column(DECIMAL(5, 4)) # 0.0000 to 1.0000
indicators = Column(JSONB) # Technical indicator values (using JSONB to match schema_clean.sql)
created_at = Column(DateTime(timezone=True), default=func.now())
# Relationships
bot = relationship("Bot", back_populates="signals")
trades = relationship("Trade", back_populates="signal")
__table_args__ = (
CheckConstraint("signal_type IN ('buy', 'sell', 'hold')", name='chk_signal_type'),
CheckConstraint("confidence >= 0 AND confidence <= 1", name='chk_confidence'),
Index('idx_signals_bot_time', 'bot_id', 'timestamp'),
Index('idx_signals_type', 'signal_type'),
Index('idx_signals_timestamp', 'timestamp'),
)
def __repr__(self):
return f"<Signal({self.signal_type} - {self.price} - {self.confidence})>"
class Trade(Base):
"""Trade Execution Records - Virtual trading results"""
__tablename__ = 'trades'
id = Column(Integer, primary_key=True)
bot_id = Column(Integer, ForeignKey('bots.id', ondelete='CASCADE'), nullable=False)
signal_id = Column(Integer, ForeignKey('signals.id', ondelete='SET NULL'))
timestamp = Column(DateTime(timezone=True), nullable=False)
side = Column(String(5), nullable=False) # buy, sell
price = Column(DECIMAL(18, 8), nullable=False)
quantity = Column(DECIMAL(18, 8), nullable=False)
fees = Column(DECIMAL(18, 8), default=Decimal('0'))
pnl = Column(DECIMAL(18, 8)) # Profit/loss for this trade
balance_after = Column(DECIMAL(18, 8)) # Portfolio balance after trade
created_at = Column(DateTime(timezone=True), default=func.now())
# Relationships
bot = relationship("Bot", back_populates="trades")
signal = relationship("Signal", back_populates="trades")
__table_args__ = (
CheckConstraint("side IN ('buy', 'sell')", name='chk_trade_side'),
CheckConstraint("price > 0", name='chk_positive_price'),
CheckConstraint("quantity > 0", name='chk_positive_quantity'),
CheckConstraint("fees >= 0", name='chk_non_negative_fees'),
Index('idx_trades_bot_time', 'bot_id', 'timestamp'),
Index('idx_trades_side', 'side'),
Index('idx_trades_timestamp', 'timestamp'),
)
@property
def trade_value(self) -> Decimal:
"""Calculate the total value of this trade"""
return self.price * self.quantity
@property
def net_pnl(self) -> Decimal:
"""Calculate net PnL after fees"""
return (self.pnl or Decimal('0')) - self.fees
def __repr__(self):
return f"<Trade({self.side} {self.quantity} @ {self.price} - PnL: {self.pnl})>"
class BotPerformance(Base):
"""Bot Performance Snapshots - For portfolio visualization"""
__tablename__ = 'bot_performance'
id = Column(Integer, primary_key=True)
bot_id = Column(Integer, ForeignKey('bots.id', ondelete='CASCADE'), nullable=False)
timestamp = Column(DateTime(timezone=True), nullable=False)
total_value = Column(DECIMAL(18, 8), nullable=False)
cash_balance = Column(DECIMAL(18, 8), nullable=False)
crypto_balance = Column(DECIMAL(18, 8), nullable=False)
total_trades = Column(Integer, default=0)
winning_trades = Column(Integer, default=0)
total_fees = Column(DECIMAL(18, 8), default=Decimal('0'))
created_at = Column(DateTime(timezone=True), default=func.now())
# Relationships
bot = relationship("Bot", back_populates="performance")
__table_args__ = (
CheckConstraint(
"total_value >= 0 AND cash_balance >= 0 AND crypto_balance >= 0 AND "
"total_trades >= 0 AND winning_trades >= 0 AND total_fees >= 0",
name='chk_non_negative_values'
),
CheckConstraint("winning_trades <= total_trades", name='chk_winning_trades_logic'),
Index('idx_bot_performance_bot_time', 'bot_id', 'timestamp'),
Index('idx_bot_performance_timestamp', 'timestamp'),
)
@property
def win_rate(self) -> float:
"""Calculate win rate percentage"""
if self.total_trades == 0:
return 0.0
return (self.winning_trades / self.total_trades) * 100
@property
def portfolio_allocation(self) -> Dict[str, float]:
"""Calculate portfolio allocation percentages"""
if self.total_value == 0:
return {"cash": 0.0, "crypto": 0.0}
cash_pct = float(self.cash_balance / self.total_value * 100)
crypto_pct = float(self.crypto_balance / self.total_value * 100)
return {"cash": cash_pct, "crypto": crypto_pct}
def __repr__(self):
return f"<BotPerformance(Bot {self.bot_id} - Value: {self.total_value} - Win Rate: {self.win_rate:.2f}%)>"
class StrategyRun(Base):
"""Strategy Execution Sessions - For tracking strategy backtesting and analysis runs"""
__tablename__ = 'strategy_runs'
id = Column(Integer, primary_key=True)
strategy_name = Column(String(100), nullable=False)
symbol = Column(String(20), nullable=False)
timeframe = Column(String(5), nullable=False)
start_time = Column(DateTime(timezone=True), nullable=False)
end_time = Column(DateTime(timezone=True))
status = Column(String(20), nullable=False, default='running') # running, completed, failed
config = Column(JSONB) # Strategy configuration parameters
run_metadata = Column(JSONB) # Run metadata (backtesting params, etc.)
total_signals = Column(Integer, default=0)
created_at = Column(DateTime(timezone=True), default=func.now())
updated_at = Column(DateTime(timezone=True), default=func.now(), onupdate=func.now())
# Relationships
strategy_signals = relationship("StrategySignal", back_populates="strategy_run", cascade="all, delete-orphan")
__table_args__ = (
CheckConstraint("status IN ('running', 'completed', 'failed')", name='chk_strategy_run_status'),
Index('idx_strategy_runs_strategy_time', 'strategy_name', 'start_time'),
Index('idx_strategy_runs_symbol', 'symbol'),
Index('idx_strategy_runs_status', 'status'),
Index('idx_strategy_runs_timeframe', 'timeframe'),
)
def __repr__(self):
return f"<StrategyRun({self.strategy_name} - {self.symbol} - {self.status})>"
class StrategySignal(Base):
"""Strategy Analysis Signals - Generated by strategies for analysis and backtesting (separate from bot signals)"""
__tablename__ = 'strategy_signals'
id = Column(Integer, primary_key=True)
run_id = Column(Integer, ForeignKey('strategy_runs.id', ondelete='CASCADE'), nullable=False)
strategy_name = Column(String(100), nullable=False)
strategy_config = Column(JSONB) # Strategy configuration used for this signal
symbol = Column(String(20), nullable=False)
timeframe = Column(String(5), nullable=False)
timestamp = Column(DateTime(timezone=True), nullable=False)
signal_type = Column(String(20), nullable=False) # buy, sell, hold, entry_long, exit_long, etc.
price = Column(DECIMAL(18, 8))
confidence = Column(DECIMAL(5, 4)) # 0.0000 to 1.0000
signal_metadata = Column(JSONB) # Additional signal metadata (indicator values, etc.)
created_at = Column(DateTime(timezone=True), default=func.now())
# Relationships
strategy_run = relationship("StrategyRun", back_populates="strategy_signals")
__table_args__ = (
CheckConstraint("signal_type IN ('buy', 'sell', 'hold', 'entry_long', 'exit_long', 'entry_short', 'exit_short', 'stop_loss', 'take_profit')", name='chk_strategy_signal_type'),
CheckConstraint("confidence >= 0 AND confidence <= 1", name='chk_strategy_confidence'),
Index('idx_strategy_signals_strategy_time', 'strategy_name', 'timestamp'),
Index('idx_strategy_signals_run_time', 'run_id', 'timestamp'),
Index('idx_strategy_signals_symbol_timeframe', 'symbol', 'timeframe'),
Index('idx_strategy_signals_type', 'signal_type'),
Index('idx_strategy_signals_timestamp', 'timestamp'),
)
def __repr__(self):
return f"<StrategySignal({self.strategy_name} - {self.signal_type} - {self.price} - {self.confidence})>"
# Reference tables for system configuration
class SupportedTimeframe(Base):
"""Supported timeframes configuration"""
__tablename__ = 'supported_timeframes'
timeframe = Column(String(5), primary_key=True)
description = Column(String(50))
minutes = Column(Integer)
def __repr__(self):
return f"<SupportedTimeframe({self.timeframe} - {self.description})>"
class SupportedExchange(Base):
"""Supported exchanges configuration"""
__tablename__ = 'supported_exchanges'
exchange = Column(String(50), primary_key=True)
name = Column(String(100))
api_url = Column(String(200))
enabled = Column(Boolean, default=True)
def __repr__(self):
return f"<SupportedExchange({self.exchange} - {self.name} - Enabled: {self.enabled})>"
# Helper functions for model operations
def get_model_by_table_name(table_name: str):
"""Get model class by table name"""
table_to_model = {
'market_data': MarketData,
'raw_trades': RawTrade,
'bots': Bot,
'signals': Signal,
'trades': Trade,
'bot_performance': BotPerformance,
'strategy_runs': StrategyRun,
'strategy_signals': StrategySignal,
'supported_timeframes': SupportedTimeframe,
'supported_exchanges': SupportedExchange,
}
return table_to_model.get(table_name)
def create_all_tables(engine):
"""Create all tables in the database"""
Base.metadata.create_all(engine)
def drop_all_tables(engine):
"""Drop all tables from the database"""
Base.metadata.drop_all(engine)