Refactor database configuration and schema for Crypto Trading Bot Platform
- Updated `docker-compose.yml` to remove hardcoded passwords, relying on environment variables for PostgreSQL and Redis configurations.
- Modified `env.template` to reflect new password settings and ensure secure handling of sensitive information.
- Introduced a new `database/connection.py` file for improved database connection management, including connection pooling and session handling.
- Updated `database/models.py` to align with the new schema in `schema_clean.sql`, utilizing JSONB for optimized data storage.
- Enhanced `setup.md` documentation to clarify the initialization process and emphasize the importance of the `.env` file for configuration.
- Added a new `scripts/init_database.py` script for automated database initialization and verification, ensuring all tables are created as expected.
This commit is contained in:
487
database/connection.py
Normal file
487
database/connection.py
Normal file
@@ -0,0 +1,487 @@
|
||||
"""
|
||||
Database Connection Utility for Crypto Trading Bot Platform
|
||||
Provides connection pooling, session management, and database utilities
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from contextlib import contextmanager
|
||||
from typing import Optional, Generator, Any, Dict, List, Union
|
||||
from pathlib import Path
|
||||
|
||||
# Load environment variables from .env file if it exists
|
||||
try:
|
||||
from dotenv import load_dotenv
|
||||
env_file = Path(__file__).parent.parent / '.env'
|
||||
if env_file.exists():
|
||||
load_dotenv(env_file)
|
||||
except ImportError:
|
||||
# dotenv not available, proceed without it
|
||||
pass
|
||||
|
||||
from sqlalchemy import create_engine, Engine, text, event
|
||||
from sqlalchemy.orm import sessionmaker, Session, scoped_session
|
||||
from sqlalchemy.pool import QueuePool
|
||||
from sqlalchemy.exc import SQLAlchemyError, OperationalError, DisconnectionError
|
||||
from sqlalchemy.engine import make_url
|
||||
import time
|
||||
from functools import wraps
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from .models import Base, create_all_tables, drop_all_tables, RawTrade
|
||||
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DatabaseConfig:
    """Database configuration class.

    Reads connection, pooling, timeout, retry, and SSL settings from
    environment variables, falling back to development-friendly defaults.
    """

    def __init__(self):
        env = os.getenv

        # Connection string; default targets the local docker-compose instance.
        self.database_url = env(
            'DATABASE_URL',
            'postgresql://dashboard@localhost:5434/dashboard'
        )

        # Connection pool settings
        self.pool_size = int(env('DB_POOL_SIZE', '5'))
        self.max_overflow = int(env('DB_MAX_OVERFLOW', '10'))
        self.pool_pre_ping = env('DB_POOL_PRE_PING', 'true').lower() == 'true'
        self.pool_recycle = int(env('DB_POOL_RECYCLE', '3600'))  # recycle connections hourly

        # Connection timeout settings
        self.connect_timeout = int(env('DB_CONNECT_TIMEOUT', '30'))
        self.statement_timeout = int(env('DB_STATEMENT_TIMEOUT', '30000'))  # milliseconds

        # Retry behaviour for the initial connection attempts
        self.max_retries = int(env('DB_MAX_RETRIES', '3'))
        self.retry_delay = float(env('DB_RETRY_DELAY', '1.0'))

        # TLS negotiation mode, passed straight through to libpq
        self.ssl_mode = env('DB_SSL_MODE', 'prefer')

        logger.info(f"Database configuration initialized for: {self._safe_url()}")

    def _safe_url(self) -> str:
        """Return database URL with password masked for logging"""
        return str(make_url(self.database_url).set(password="***"))

    def get_engine_kwargs(self) -> Dict[str, Any]:
        """Get SQLAlchemy engine configuration"""
        kwargs: Dict[str, Any] = {
            'poolclass': QueuePool,
            'pool_size': self.pool_size,
            'max_overflow': self.max_overflow,
            'pool_pre_ping': self.pool_pre_ping,
            'pool_recycle': self.pool_recycle,
            'connect_args': {
                'connect_timeout': self.connect_timeout,
                # statement_timeout is applied server-side, in milliseconds
                'options': f'-c statement_timeout={self.statement_timeout}',
                'sslmode': self.ssl_mode,
            },
            'echo': os.getenv('DEBUG', 'false').lower() == 'true',
            'future': True,  # Use SQLAlchemy 2.0 style
        }
        return kwargs
|
||||
|
||||
|
||||
class DatabaseManager:
    """
    Database manager with connection pooling and session management.

    Owns the SQLAlchemy engine, a plain session factory, and a thread-local
    scoped session. Call initialize() before using any other method.
    """

    def __init__(self, config: Optional[DatabaseConfig] = None):
        self.config = config or DatabaseConfig()
        self._engine: Optional[Engine] = None
        self._session_factory: Optional[sessionmaker] = None
        self._scoped_session: Optional[scoped_session] = None

    def initialize(self) -> None:
        """Initialize database engine and session factory.

        Raises:
            Exception: re-raises any failure from engine creation or the
                initial connectivity check, after logging it.
        """
        try:
            logger.info("Initializing database connection...")

            # Create engine with retry logic
            self._engine = self._create_engine_with_retry()

            # Plain session factory; expire_on_commit=False keeps returned
            # ORM objects usable after the session commits.
            self._session_factory = sessionmaker(
                bind=self._engine,
                autocommit=False,
                autoflush=False,
                expire_on_commit=False
            )

            # Scoped session registry for thread safety
            self._scoped_session = scoped_session(self._session_factory)

            # Attach connection event listeners
            self._setup_connection_events()

            logger.info("Database connection initialized successfully")

        except Exception as e:
            logger.error(f"Failed to initialize database: {e}")
            raise

    def _create_engine_with_retry(self) -> Engine:
        """Create database engine, retrying transient connection failures.

        Returns:
            A connectivity-verified Engine.

        Raises:
            OperationalError / DisconnectionError: after exhausting retries.
            RuntimeError: if max_retries is configured below 1.
        """
        for attempt in range(self.config.max_retries):
            try:
                engine = create_engine(
                    self.config.database_url,
                    **self.config.get_engine_kwargs()
                )

                # Verify connectivity before handing the engine out.
                with engine.connect() as conn:
                    conn.execute(text("SELECT 1"))
                    logger.info("Database connection test successful")

                return engine

            except (OperationalError, DisconnectionError) as e:
                attempt_num = attempt + 1
                if attempt_num == self.config.max_retries:
                    logger.error(f"Failed to connect to database after {self.config.max_retries} attempts: {e}")
                    raise

                logger.warning(f"Database connection attempt {attempt_num} failed: {e}. Retrying in {self.config.retry_delay}s...")
                time.sleep(self.config.retry_delay)

        # Fix: previously fell through and implicitly returned None when
        # max_retries < 1, leaving self._engine silently unset. Fail loudly.
        raise RuntimeError("DB_MAX_RETRIES must be >= 1")

    def _setup_connection_events(self) -> None:
        """Setup SQLAlchemy connection event listeners"""
        if not self._engine:
            return

        @event.listens_for(self._engine, "connect")
        def configure_connection(dbapi_connection, connection_record):
            """Apply per-connection server settings (PostgreSQL only).

            Renamed from the misleading 'set_sqlite_pragma': this configures
            PostgreSQL session settings, not SQLite pragmas.
            """
            if 'postgresql' in str(self._engine.url):
                with dbapi_connection.cursor() as cursor:
                    # Keep all stored/returned timestamps in UTC
                    cursor.execute("SET timezone TO 'UTC'")
                    # Label the connection for pg_stat_activity monitoring
                    cursor.execute("SET application_name TO 'crypto_trading_bot'")

        @event.listens_for(self._engine, "checkout")
        def checkout_listener(dbapi_connection, connection_record, connection_proxy):
            """Log connection checkout"""
            logger.debug("Database connection checked out from pool")

        @event.listens_for(self._engine, "checkin")
        def checkin_listener(dbapi_connection, connection_record):
            """Log connection checkin"""
            logger.debug("Database connection returned to pool")

    @property
    def engine(self) -> Engine:
        """Get database engine (raises if initialize() was not called)."""
        if not self._engine:
            raise RuntimeError("Database not initialized. Call initialize() first.")
        return self._engine

    @property
    def session_factory(self) -> sessionmaker:
        """Get session factory (raises if initialize() was not called)."""
        if not self._session_factory:
            raise RuntimeError("Database not initialized. Call initialize() first.")
        return self._session_factory

    def create_session(self) -> Session:
        """Create a new database session; caller is responsible for closing it."""
        if not self._session_factory:
            raise RuntimeError("Database not initialized. Call initialize() first.")
        return self._session_factory()

    @contextmanager
    def get_session(self) -> Generator[Session, None, None]:
        """
        Context manager for database sessions with automatic cleanup.

        Commits on normal exit, rolls back on exception, always closes.

        Usage:
            with db_manager.get_session() as session:
                # Use session here
                pass
        """
        session = self.create_session()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    @contextmanager
    def get_scoped_session(self) -> Generator[Session, None, None]:
        """
        Context manager for scoped sessions (thread-safe).

        Commits on normal exit, rolls back on exception; the thread-local
        session is removed from the registry on the way out.

        Usage:
            with db_manager.get_scoped_session() as session:
                # Use session here
                pass
        """
        if not self._scoped_session:
            raise RuntimeError("Database not initialized. Call initialize() first.")

        session = self._scoped_session()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            self._scoped_session.remove()

    def test_connection(self) -> bool:
        """Test database connection; returns True on success, False on failure."""
        try:
            with self.get_session() as session:
                session.execute(text("SELECT 1"))
            logger.info("Database connection test successful")
            return True
        except Exception as e:
            logger.error(f"Database connection test failed: {e}")
            return False

    def get_pool_status(self) -> Dict[str, Any]:
        """Get connection pool status as a plain dict (for monitoring endpoints)."""
        if not self._engine or not hasattr(self._engine.pool, 'size'):
            return {"status": "Pool not available"}

        pool = self._engine.pool
        return {
            "pool_size": pool.size(),
            "checked_in": pool.checkedin(),
            "checked_out": pool.checkedout(),
            "overflow": pool.overflow(),
        }

    def create_tables(self) -> None:
        """Create all database tables defined on the ORM metadata."""
        try:
            create_all_tables(self.engine)
            logger.info("Database tables created successfully")
        except Exception as e:
            logger.error(f"Failed to create database tables: {e}")
            raise

    def drop_tables(self) -> None:
        """Drop all database tables defined on the ORM metadata."""
        try:
            drop_all_tables(self.engine)
            logger.info("Database tables dropped successfully")
        except Exception as e:
            logger.error(f"Failed to drop database tables: {e}")
            raise

    def execute_schema_file(self, schema_file_path: str) -> None:
        """Execute SQL schema file.

        Fix: the file is sent to the server as one script instead of being
        split on ';' — naive splitting corrupts dollar-quoted blocks (e.g.
        PL/pgSQL `DO $$ ... $$;`), which the project's schema files contain.
        psycopg2 executes multi-statement scripts in a single call.

        Args:
            schema_file_path: Path to the .sql file to execute.
        """
        try:
            with open(schema_file_path, 'r') as file:
                schema_sql = file.read()

            with self.get_session() as session:
                session.execute(text(schema_sql))

            logger.info(f"Schema file executed successfully: {schema_file_path}")
        except Exception as e:
            logger.error(f"Failed to execute schema file {schema_file_path}: {e}")
            raise

    def close(self) -> None:
        """Close database connections and cleanup pooled resources."""
        try:
            if self._scoped_session:
                self._scoped_session.remove()

            if self._engine:
                self._engine.dispose()
                logger.info("Database connections closed")
        except Exception as e:
            logger.error(f"Error closing database connections: {e}")
|
||||
|
||||
|
||||
def retry_on_database_error(max_retries: int = 3, delay: float = 1.0):
    """
    Decorator to retry database operations on transient errors.

    Retries only on OperationalError / DisconnectionError; any other
    exception propagates immediately. The last error is re-raised once the
    attempt budget is exhausted.

    Args:
        max_retries: Maximum number of attempts (must be >= 1)
        delay: Delay between retries in seconds
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except (OperationalError, DisconnectionError) as e:
                    if attempt == max_retries - 1:
                        logger.error(f"Database operation failed after {max_retries} attempts: {e}")
                        raise

                    logger.warning(f"Database operation failed (attempt {attempt + 1}): {e}. Retrying in {delay}s...")
                    time.sleep(delay)
            # Fix: the original ended with a dead `return None`, which
            # silently returned None when max_retries < 1. Fail loudly.
            raise ValueError("max_retries must be >= 1")
        return wrapper
    return decorator
|
||||
|
||||
|
||||
# Process-wide singleton used by the module-level convenience helpers.
db_manager = DatabaseManager()


def get_db_manager() -> DatabaseManager:
    """Return the global DatabaseManager singleton."""
    return db_manager
|
||||
|
||||
|
||||
def init_database(config: Optional[DatabaseConfig] = None) -> DatabaseManager:
    """
    Initialize global database manager.

    Args:
        config: Optional database configuration; when provided, the global
            manager is replaced with one built from this config.

    Returns:
        DatabaseManager instance (the global one), already initialized.
    """
    global db_manager
    if config:
        # Fix: dispose the previous manager's pool before replacing it so a
        # re-initialization does not leak open connections. close() is a
        # no-op on a manager that was never initialized.
        db_manager.close()
        db_manager = DatabaseManager(config)
    db_manager.initialize()
    return db_manager
|
||||
|
||||
|
||||
# Convenience functions for common operations
def get_session() -> Generator[Session, None, None]:
    """Shortcut for db_manager.get_session().

    NOTE(review): the underlying method is a @contextmanager, so the value
    returned here is a context manager rather than a bare generator; the
    annotation is kept as-is for compatibility — confirm before tightening.
    """
    return db_manager.get_session()
|
||||
|
||||
|
||||
def get_scoped_session() -> Generator[Session, None, None]:
    """Shortcut for db_manager.get_scoped_session() (thread-safe sessions).

    NOTE(review): returns the @contextmanager object produced by the method;
    annotation kept as-is for compatibility — confirm before tightening.
    """
    return db_manager.get_scoped_session()
|
||||
|
||||
|
||||
def test_connection() -> bool:
    """Shortcut for db_manager.test_connection(); True when a round trip succeeds."""
    return db_manager.test_connection()
|
||||
|
||||
|
||||
def get_pool_status() -> Dict[str, Any]:
    """Shortcut for db_manager.get_pool_status(); pool metrics as a plain dict."""
    return db_manager.get_pool_status()
|
||||
|
||||
|
||||
class RawDataManager:
    """
    Utility class for managing raw data storage and retention.

    Wraps a DatabaseManager to persist raw exchange API payloads into the
    raw_trades table, prune old rows, and report storage statistics.
    """

    def __init__(self, db_manager: DatabaseManager):
        self.db_manager = db_manager

    def store_raw_data(self, exchange: str, symbol: str, data_type: str,
                       raw_data: Dict[str, Any], timestamp: Optional[datetime] = None) -> None:
        """
        Store raw API data

        Args:
            exchange: Exchange name (e.g., 'okx')
            symbol: Trading symbol (e.g., 'BTC-USDT')
            data_type: Type of data (ticker, trade, orderbook, candle, balance)
            raw_data: Complete API response
            timestamp: Data timestamp (defaults to now)
        """
        # Fix: removed the redundant local `from .models import RawTrade` —
        # RawTrade is already imported at module level.
        if timestamp is None:
            # NOTE(review): utcnow() is naive; the engine's connect event sets
            # the session timezone to UTC so this appears to round-trip into
            # TIMESTAMPTZ correctly, but an aware datetime.now(timezone.utc)
            # would be safer — confirm before changing.
            timestamp = datetime.utcnow()

        try:
            with self.db_manager.get_session() as session:
                raw_trade = RawTrade(
                    exchange=exchange,
                    symbol=symbol,
                    timestamp=timestamp,
                    data_type=data_type,
                    raw_data=raw_data
                )
                session.add(raw_trade)
                logger.debug(f"Stored raw data: {exchange} {symbol} {data_type}")
        except Exception as e:
            logger.error(f"Failed to store raw data: {e}")
            raise

    def cleanup_old_raw_data(self, days_to_keep: int = 7) -> int:
        """
        Clean up old raw data to prevent table bloat

        Args:
            days_to_keep: Number of days to retain data

        Returns:
            Number of records deleted
        """
        try:
            cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep)

            with self.db_manager.get_session() as session:
                # Bulk SQL delete keyed on created_at (ingest time, not the
                # exchange timestamp) — intentionally prunes by row age.
                deleted_count = session.execute(
                    text("DELETE FROM raw_trades WHERE created_at < :cutoff_date"),
                    {"cutoff_date": cutoff_date}
                ).rowcount

            logger.info(f"Cleaned up {deleted_count} old raw data records")
            return deleted_count
        except Exception as e:
            logger.error(f"Failed to cleanup raw data: {e}")
            raise

    def get_raw_data_stats(self) -> Dict[str, Any]:
        """Get statistics about raw data storage.

        Returns:
            Dict with record counts, time range, and on-disk table size
            (PostgreSQL-specific via pg_size_pretty), or an error/status dict.
        """
        try:
            with self.db_manager.get_session() as session:
                result = session.execute(text("""
                    SELECT
                        COUNT(*) as total_records,
                        COUNT(DISTINCT symbol) as unique_symbols,
                        COUNT(DISTINCT data_type) as data_types,
                        MIN(created_at) as oldest_record,
                        MAX(created_at) as newest_record,
                        pg_size_pretty(pg_total_relation_size('raw_trades')) as table_size
                    FROM raw_trades
                """)).fetchone()

                if result:
                    return {
                        "total_records": result.total_records,
                        "unique_symbols": result.unique_symbols,
                        "data_types": result.data_types,
                        "oldest_record": result.oldest_record,
                        "newest_record": result.newest_record,
                        "table_size": result.table_size
                    }
                else:
                    return {"status": "No data available"}
        except Exception as e:
            logger.error(f"Failed to get raw data stats: {e}")
            return {"error": str(e)}
|
||||
|
||||
|
||||
# Factory wiring the raw-data helper to the global manager
def get_raw_data_manager() -> RawDataManager:
    """Build a RawDataManager bound to the global db_manager."""
    return RawDataManager(db_manager)
|
||||
@@ -7,21 +7,11 @@ CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
|
||||
-- Set timezone
|
||||
SET timezone = 'UTC';
|
||||
|
||||
-- Create initial user with appropriate permissions (if not exists)
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'dashboard') THEN
|
||||
CREATE ROLE dashboard WITH LOGIN PASSWORD 'dashboard123';
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
|
||||
-- Grant permissions
|
||||
GRANT ALL PRIVILEGES ON DATABASE dashboard TO dashboard;
|
||||
GRANT ALL ON SCHEMA public TO dashboard;
|
||||
-- The database user is automatically created by Docker with the POSTGRES_USER and POSTGRES_PASSWORD
|
||||
-- environment variables, so we don't need to create it here
|
||||
|
||||
-- Create initial comment
|
||||
COMMENT ON DATABASE dashboard IS 'Crypto Trading Bot Dashboard Database';
|
||||
|
||||
-- Execute the main schema file
|
||||
\i /docker-entrypoint-initdb.d/schema.sql
|
||||
-- Execute the clean schema file (without TimescaleDB hypertables for simpler setup)
|
||||
\i /docker-entrypoint-initdb.d/schema_clean.sql
|
||||
@@ -1,11 +1,10 @@
|
||||
-- Database Schema for Crypto Trading Bot Platform
|
||||
-- Following PRD specifications with optimized schema for time-series data
|
||||
-- Version: 1.0
|
||||
-- Database Schema for Crypto Trading Bot Platform (Clean Version)
|
||||
-- Following PRD specifications - optimized for rapid development
|
||||
-- Version: 1.0 (without hypertables for now)
|
||||
-- Author: Generated following PRD requirements
|
||||
|
||||
-- Create extensions
|
||||
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
|
||||
CREATE EXTENSION IF NOT EXISTS "timescaledb" CASCADE;
|
||||
|
||||
-- Set timezone to UTC for consistency
|
||||
SET timezone = 'UTC';
|
||||
@@ -15,7 +14,6 @@ SET timezone = 'UTC';
|
||||
-- ========================================
|
||||
|
||||
-- OHLCV Market Data (primary table for bot operations)
|
||||
-- This is the main table that bots will use for trading decisions
|
||||
CREATE TABLE market_data (
|
||||
id SERIAL PRIMARY KEY,
|
||||
exchange VARCHAR(50) NOT NULL DEFAULT 'okx',
|
||||
@@ -32,34 +30,28 @@ CREATE TABLE market_data (
|
||||
CONSTRAINT unique_market_data UNIQUE(exchange, symbol, timeframe, timestamp)
|
||||
);
|
||||
|
||||
-- Convert to hypertable for TimescaleDB optimization
|
||||
SELECT create_hypertable('market_data', 'timestamp', if_not_exists => TRUE);
|
||||
|
||||
-- Create optimized indexes for market data
|
||||
CREATE INDEX idx_market_data_lookup ON market_data(symbol, timeframe, timestamp);
|
||||
CREATE INDEX idx_market_data_recent ON market_data(timestamp DESC) WHERE timestamp > NOW() - INTERVAL '7 days';
|
||||
CREATE INDEX idx_market_data_symbol ON market_data(symbol);
|
||||
CREATE INDEX idx_market_data_timeframe ON market_data(timeframe);
|
||||
CREATE INDEX idx_market_data_recent ON market_data(timestamp DESC);
|
||||
|
||||
-- Raw Trade Data (optional, for detailed backtesting only)
|
||||
-- This table is partitioned by timestamp for better performance
|
||||
-- Raw Trade Data (for debugging, compliance, and detailed backtesting)
|
||||
CREATE TABLE raw_trades (
|
||||
id SERIAL PRIMARY KEY,
|
||||
exchange VARCHAR(50) NOT NULL DEFAULT 'okx',
|
||||
symbol VARCHAR(20) NOT NULL,
|
||||
timestamp TIMESTAMPTZ NOT NULL,
|
||||
type VARCHAR(10) NOT NULL, -- trade, order, balance, tick, books
|
||||
data JSONB NOT NULL, -- response from the exchange
|
||||
data_type VARCHAR(20) NOT NULL, -- ticker, trade, orderbook, candle, balance
|
||||
raw_data JSONB NOT NULL, -- Complete API response
|
||||
created_at TIMESTAMPTZ DEFAULT NOW()
|
||||
) PARTITION BY RANGE (timestamp);
|
||||
);
|
||||
|
||||
-- Create initial partition for current month
|
||||
CREATE TABLE raw_trades_current PARTITION OF raw_trades
|
||||
FOR VALUES FROM (date_trunc('month', NOW())) TO (date_trunc('month', NOW()) + INTERVAL '1 month');
|
||||
|
||||
-- Index for raw trades
|
||||
-- Create indexes for raw trades (optimized for time-series queries)
|
||||
CREATE INDEX idx_raw_trades_symbol_time ON raw_trades(symbol, timestamp);
|
||||
CREATE INDEX idx_raw_trades_type ON raw_trades(type);
|
||||
CREATE INDEX idx_raw_trades_type ON raw_trades(data_type);
|
||||
CREATE INDEX idx_raw_trades_timestamp ON raw_trades(timestamp DESC);
|
||||
CREATE INDEX idx_raw_trades_recent ON raw_trades(created_at DESC);
|
||||
|
||||
-- ========================================
|
||||
-- BOT MANAGEMENT TABLES
|
||||
@@ -106,9 +98,6 @@ CREATE TABLE signals (
|
||||
CONSTRAINT chk_confidence CHECK (confidence >= 0 AND confidence <= 1)
|
||||
);
|
||||
|
||||
-- Convert signals to hypertable for TimescaleDB optimization
|
||||
SELECT create_hypertable('signals', 'timestamp', if_not_exists => TRUE);
|
||||
|
||||
-- Indexes for signals
|
||||
CREATE INDEX idx_signals_bot_time ON signals(bot_id, timestamp);
|
||||
CREATE INDEX idx_signals_type ON signals(signal_type);
|
||||
@@ -137,9 +126,6 @@ CREATE TABLE trades (
|
||||
CONSTRAINT chk_non_negative_fees CHECK (fees >= 0)
|
||||
);
|
||||
|
||||
-- Convert trades to hypertable for TimescaleDB optimization
|
||||
SELECT create_hypertable('trades', 'timestamp', if_not_exists => TRUE);
|
||||
|
||||
-- Indexes for trades
|
||||
CREATE INDEX idx_trades_bot_time ON trades(bot_id, timestamp);
|
||||
CREATE INDEX idx_trades_side ON trades(side);
|
||||
@@ -172,9 +158,6 @@ CREATE TABLE bot_performance (
|
||||
CONSTRAINT chk_winning_trades_logic CHECK (winning_trades <= total_trades)
|
||||
);
|
||||
|
||||
-- Convert bot_performance to hypertable for TimescaleDB optimization
|
||||
SELECT create_hypertable('bot_performance', 'timestamp', if_not_exists => TRUE);
|
||||
|
||||
-- Indexes for bot performance
|
||||
CREATE INDEX idx_bot_performance_bot_time ON bot_performance(bot_id, timestamp);
|
||||
CREATE INDEX idx_bot_performance_timestamp ON bot_performance(timestamp);
|
||||
@@ -198,24 +181,6 @@ CREATE TRIGGER trigger_update_bot_timestamp
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION update_bot_timestamp();
|
||||
|
||||
-- Function to create monthly partition for raw_trades
|
||||
CREATE OR REPLACE FUNCTION create_monthly_partition_for_raw_trades(partition_date DATE)
|
||||
RETURNS VOID AS $$
|
||||
DECLARE
|
||||
partition_name TEXT;
|
||||
start_date DATE;
|
||||
end_date DATE;
|
||||
BEGIN
|
||||
start_date := date_trunc('month', partition_date);
|
||||
end_date := start_date + INTERVAL '1 month';
|
||||
partition_name := 'raw_trades_' || to_char(start_date, 'YYYY_MM');
|
||||
|
||||
EXECUTE format('CREATE TABLE IF NOT EXISTS %I PARTITION OF raw_trades
|
||||
FOR VALUES FROM (%L) TO (%L)',
|
||||
partition_name, start_date, end_date);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- ========================================
|
||||
-- VIEWS FOR COMMON QUERIES
|
||||
-- ========================================
|
||||
@@ -311,13 +276,15 @@ ON CONFLICT (exchange) DO NOTHING;
|
||||
-- ========================================
|
||||
|
||||
COMMENT ON TABLE market_data IS 'Primary OHLCV market data table optimized for bot operations and backtesting';
|
||||
COMMENT ON TABLE raw_trades IS 'Optional raw trade data for detailed backtesting (partitioned by month)';
|
||||
COMMENT ON TABLE raw_trades IS 'Raw API responses and trade data for debugging, compliance, and detailed analysis';
|
||||
COMMENT ON TABLE bots IS 'Bot instance management with JSON configuration references';
|
||||
COMMENT ON TABLE signals IS 'Trading signals generated by strategies with confidence scores';
|
||||
COMMENT ON TABLE trades IS 'Virtual trade execution records with P&L tracking';
|
||||
COMMENT ON TABLE bot_performance IS 'Portfolio performance snapshots for visualization';
|
||||
|
||||
COMMENT ON COLUMN market_data.timestamp IS 'Right-aligned timestamp (candle close time) following exchange standards';
|
||||
COMMENT ON COLUMN raw_trades.data_type IS 'Type of raw data: ticker, trade, orderbook, candle, balance';
|
||||
COMMENT ON COLUMN raw_trades.raw_data IS 'Complete unprocessed API response in JSONB format';
|
||||
COMMENT ON COLUMN bots.config_file IS 'Path to JSON configuration file for strategy parameters';
|
||||
COMMENT ON COLUMN signals.confidence IS 'Signal confidence score from 0.0000 to 1.0000';
|
||||
COMMENT ON COLUMN trades.pnl IS 'Profit/Loss for this specific trade in base currency';
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
Database Models for Crypto Trading Bot Platform
|
||||
SQLAlchemy models corresponding to the database schema
|
||||
SQLAlchemy models corresponding to the database schema_clean.sql
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
@@ -9,9 +9,10 @@ from typing import Optional, Dict, Any
|
||||
|
||||
from sqlalchemy import (
|
||||
Boolean, Column, DateTime, ForeignKey, Integer,
|
||||
String, Text, DECIMAL, JSON, Index, CheckConstraint,
|
||||
String, Text, DECIMAL, Index, CheckConstraint,
|
||||
UniqueConstraint, text
|
||||
)
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.sql import func
|
||||
@@ -51,24 +52,26 @@ class MarketData(Base):
|
||||
|
||||
|
||||
class RawTrade(Base):
|
||||
"""Raw Trade Data - Optional table for detailed backtesting"""
|
||||
"""Raw Trade Data - For debugging, compliance, and detailed backtesting"""
|
||||
__tablename__ = 'raw_trades'
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
exchange = Column(String(50), nullable=False, default='okx')
|
||||
symbol = Column(String(20), nullable=False)
|
||||
timestamp = Column(DateTime(timezone=True), nullable=False)
|
||||
type = Column(String(10), nullable=False) # trade, order, balance, tick, books
|
||||
data = Column(JSON, nullable=False) # Response from exchange
|
||||
data_type = Column(String(20), nullable=False) # ticker, trade, orderbook, candle, balance
|
||||
raw_data = Column(JSONB, nullable=False) # Complete API response
|
||||
created_at = Column(DateTime(timezone=True), default=func.now())
|
||||
|
||||
__table_args__ = (
|
||||
Index('idx_raw_trades_symbol_time', 'symbol', 'timestamp'),
|
||||
Index('idx_raw_trades_type', 'type'),
|
||||
Index('idx_raw_trades_type', 'data_type'),
|
||||
Index('idx_raw_trades_timestamp', 'timestamp'),
|
||||
Index('idx_raw_trades_recent', 'created_at'),
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<RawTrade({self.symbol} {self.type} {self.timestamp})>"
|
||||
return f"<RawTrade({self.symbol} {self.data_type} {self.timestamp})>"
|
||||
|
||||
|
||||
class Bot(Base):
|
||||
@@ -125,7 +128,7 @@ class Signal(Base):
|
||||
signal_type = Column(String(10), nullable=False) # buy, sell, hold
|
||||
price = Column(DECIMAL(18, 8))
|
||||
confidence = Column(DECIMAL(5, 4)) # 0.0000 to 1.0000
|
||||
indicators = Column(JSON) # Technical indicator values
|
||||
indicators = Column(JSONB) # Technical indicator values (using JSONB to match schema_clean.sql)
|
||||
created_at = Column(DateTime(timezone=True), default=func.now())
|
||||
|
||||
# Relationships
|
||||
|
||||
@@ -36,6 +36,23 @@ CREATE INDEX idx_market_data_symbol ON market_data(symbol);
|
||||
CREATE INDEX idx_market_data_timeframe ON market_data(timeframe);
|
||||
CREATE INDEX idx_market_data_recent ON market_data(timestamp DESC);
|
||||
|
||||
-- Raw Trade Data (for debugging, compliance, and detailed backtesting)
|
||||
CREATE TABLE raw_trades (
|
||||
id SERIAL PRIMARY KEY,
|
||||
exchange VARCHAR(50) NOT NULL DEFAULT 'okx',
|
||||
symbol VARCHAR(20) NOT NULL,
|
||||
timestamp TIMESTAMPTZ NOT NULL,
|
||||
data_type VARCHAR(20) NOT NULL, -- ticker, trade, orderbook, candle, balance
|
||||
raw_data JSONB NOT NULL, -- Complete API response
|
||||
created_at TIMESTAMPTZ DEFAULT NOW()
|
||||
);
|
||||
|
||||
-- Create indexes for raw trades (optimized for time-series queries)
|
||||
CREATE INDEX idx_raw_trades_symbol_time ON raw_trades(symbol, timestamp);
|
||||
CREATE INDEX idx_raw_trades_type ON raw_trades(data_type);
|
||||
CREATE INDEX idx_raw_trades_timestamp ON raw_trades(timestamp DESC);
|
||||
CREATE INDEX idx_raw_trades_recent ON raw_trades(created_at DESC);
|
||||
|
||||
-- ========================================
|
||||
-- BOT MANAGEMENT TABLES
|
||||
-- ========================================
|
||||
@@ -259,12 +276,15 @@ ON CONFLICT (exchange) DO NOTHING;
|
||||
-- ========================================
|
||||
|
||||
COMMENT ON TABLE market_data IS 'Primary OHLCV market data table optimized for bot operations and backtesting';
|
||||
COMMENT ON TABLE raw_trades IS 'Raw API responses and trade data for debugging, compliance, and detailed analysis';
|
||||
COMMENT ON TABLE bots IS 'Bot instance management with JSON configuration references';
|
||||
COMMENT ON TABLE signals IS 'Trading signals generated by strategies with confidence scores';
|
||||
COMMENT ON TABLE trades IS 'Virtual trade execution records with P&L tracking';
|
||||
COMMENT ON TABLE bot_performance IS 'Portfolio performance snapshots for visualization';
|
||||
|
||||
COMMENT ON COLUMN market_data.timestamp IS 'Right-aligned timestamp (candle close time) following exchange standards';
|
||||
COMMENT ON COLUMN raw_trades.data_type IS 'Type of raw data: ticker, trade, orderbook, candle, balance';
|
||||
COMMENT ON COLUMN raw_trades.raw_data IS 'Complete unprocessed API response in JSONB format';
|
||||
COMMENT ON COLUMN bots.config_file IS 'Path to JSON configuration file for strategy parameters';
|
||||
COMMENT ON COLUMN signals.confidence IS 'Signal confidence score from 0.0000 to 1.0000';
|
||||
COMMENT ON COLUMN trades.pnl IS 'Profit/Loss for this specific trade in base currency';
|
||||
|
||||
Reference in New Issue
Block a user