diff --git a/database/connection.py b/database/connection.py index e88f248..5f4edcf 100644 --- a/database/connection.py +++ b/database/connection.py @@ -382,106 +382,4 @@ def test_connection() -> bool: def get_pool_status() -> Dict[str, Any]: """Get connection pool status (convenience function)""" - return db_manager.get_pool_status() - - -class RawDataManager: - """ - Utility class for managing raw data storage and retention - """ - - def __init__(self, db_manager: DatabaseManager): - self.db_manager = db_manager - - def store_raw_data(self, exchange: str, symbol: str, data_type: str, - raw_data: Dict[str, Any], timestamp: Optional[datetime] = None) -> None: - """ - Store raw API data - - Args: - exchange: Exchange name (e.g., 'okx') - symbol: Trading symbol (e.g., 'BTC-USDT') - data_type: Type of data (ticker, trade, orderbook, candle, balance) - raw_data: Complete API response - timestamp: Data timestamp (defaults to now) - """ - from .models import RawTrade - - if timestamp is None: - timestamp = datetime.utcnow() - - try: - with self.db_manager.get_session() as session: - raw_trade = RawTrade( - exchange=exchange, - symbol=symbol, - timestamp=timestamp, - data_type=data_type, - raw_data=raw_data - ) - session.add(raw_trade) - logger.debug(f"Stored raw data: {exchange} {symbol} {data_type}") - except Exception as e: - logger.error(f"Failed to store raw data: {e}") - raise - - def cleanup_old_raw_data(self, days_to_keep: int = 7) -> int: - """ - Clean up old raw data to prevent table bloat - - Args: - days_to_keep: Number of days to retain data - - Returns: - Number of records deleted - """ - try: - cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep) - - with self.db_manager.get_session() as session: - deleted_count = session.execute( - text("DELETE FROM raw_trades WHERE created_at < :cutoff_date"), - {"cutoff_date": cutoff_date} - ).rowcount - - logger.info(f"Cleaned up {deleted_count} old raw data records") - return deleted_count - except Exception 
as e:
-            logger.error(f"Failed to cleanup raw data: {e}")
-            raise
-
-    def get_raw_data_stats(self) -> Dict[str, Any]:
-        """Get statistics about raw data storage"""
-        try:
-            with self.db_manager.get_session() as session:
-                result = session.execute(text("""
-                    SELECT
-                        COUNT(*) as total_records,
-                        COUNT(DISTINCT symbol) as unique_symbols,
-                        COUNT(DISTINCT data_type) as data_types,
-                        MIN(created_at) as oldest_record,
-                        MAX(created_at) as newest_record,
-                        pg_size_pretty(pg_total_relation_size('raw_trades')) as table_size
-                    FROM raw_trades
-                """)).fetchone()
-
-                if result:
-                    return {
-                        "total_records": result.total_records,
-                        "unique_symbols": result.unique_symbols,
-                        "data_types": result.data_types,
-                        "oldest_record": result.oldest_record,
-                        "newest_record": result.newest_record,
-                        "table_size": result.table_size
-                    }
-                else:
-                    return {"status": "No data available"}
-        except Exception as e:
-            logger.error(f"Failed to get raw data stats: {e}")
-            return {"error": str(e)}
-
-
-# Add raw data manager to the global manager
-def get_raw_data_manager() -> RawDataManager:
-    """Get raw data manager instance"""
-    return RawDataManager(db_manager)
\ No newline at end of file
+    return db_manager.get_pool_status()
\ No newline at end of file
diff --git a/database/repositories/raw_trade_repository.py b/database/repositories/raw_trade_repository.py
index cdeaa85..4ec3347 100644
--- a/database/repositories/raw_trade_repository.py
+++ b/database/repositories/raw_trade_repository.py
@@ -1,9 +1,9 @@
 """Repository for raw_trades table operations."""
 
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
 from typing import Dict, Any, Optional, List
 
-from sqlalchemy import desc
+from sqlalchemy import desc, text
 
 from ..models import RawTrade
 from data.base_collector import MarketDataPoint
@@ -106,4 +106,68 @@ class RawTradeRepository(BaseRepository):
 
         except Exception as e:
             self.log_error(f"Error retrieving raw trades for {symbol}: {e}")
-            raise DatabaseOperationError(f"Failed to retrieve raw trades: {e}")
\ No newline at end of file
+            raise DatabaseOperationError(f"Failed to retrieve raw trades: {e}")
+
+    def cleanup_old_raw_data(self, days_to_keep: int = 7) -> int:
+        """
+        Clean up old raw data to prevent table bloat.
+
+        Args:
+            days_to_keep: Number of days to retain data.
+
+        Returns:
+            Number of records deleted.
+
+        Raises:
+            DatabaseOperationError: If any step of the cleanup fails.
+        """
+        try:
+            # Timezone-aware UTC cutoff; `timezone` is imported from the datetime module.
+            cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_to_keep)
+
+            with self.get_session() as session:
+                result = session.execute(
+                    text("DELETE FROM raw_trades WHERE created_at < :cutoff_date"),
+                    {"cutoff_date": cutoff_date}
+                )
+                deleted_count = result.rowcount
+                session.commit()
+
+            self.log_info(f"Cleaned up {deleted_count} old raw data records")
+            return deleted_count
+        except Exception as e:
+            self.log_error(f"Failed to cleanup raw data: {e}")
+            raise DatabaseOperationError(f"Failed to cleanup raw data: {e}")
+
+    def get_raw_data_stats(self) -> Dict[str, Any]:
+        """
+        Get statistics about raw data storage.
+
+        Returns:
+            Mapping of the query's column aliases to their values, or
+            {"status": "No data available"} when fetchone() returns no row.
+
+        Raises:
+            DatabaseOperationError: If the statistics query fails.
+        """
+        try:
+            with self.get_session() as session:
+                result = session.execute(text("""
+                    SELECT
+                        COUNT(*) as total_records,
+                        COUNT(DISTINCT symbol) as unique_symbols,
+                        COUNT(DISTINCT data_type) as data_types,
+                        MIN(created_at) as oldest_record,
+                        MAX(created_at) as newest_record,
+                        pg_size_pretty(pg_total_relation_size('raw_trades')) as table_size
+                    FROM raw_trades
+                """)).fetchone()
+
+                if result:
+                    return dict(result._mapping)
+
+                return {"status": "No data available"}
+
+        except Exception as e:
+            self.log_error(f"Failed to get raw data stats: {e}")
+            raise DatabaseOperationError(f"Failed to get raw data stats: {e}")
\ No newline at end of file
diff --git a/database/schema.sql b/database/schema.sql
deleted file mode 100644
index 88deda2..0000000
--- a/database/schema.sql
+++ /dev/null
@@ -1,329 +0,0 @@
--- Database Schema for Crypto Trading Bot Platform
--- Following PRD specifications with optimized schema for time-series data
--- Version: 1.0
--- Author: Generated following PRD
requirements - --- Create extensions -CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -CREATE EXTENSION IF NOT EXISTS "timescaledb" CASCADE; - --- Set timezone to UTC for consistency -SET timezone = 'UTC'; - --- ======================================== --- MARKET DATA TABLES --- ======================================== - --- OHLCV Market Data (primary table for bot operations) --- This is the main table that bots will use for trading decisions -CREATE TABLE market_data ( - id SERIAL PRIMARY KEY, - exchange VARCHAR(50) NOT NULL DEFAULT 'okx', - symbol VARCHAR(20) NOT NULL, - timeframe VARCHAR(5) NOT NULL, -- 1m, 5m, 15m, 1h, 4h, 1d - timestamp TIMESTAMPTZ NOT NULL, - open DECIMAL(18,8) NOT NULL, - high DECIMAL(18,8) NOT NULL, - low DECIMAL(18,8) NOT NULL, - close DECIMAL(18,8) NOT NULL, - volume DECIMAL(18,8) NOT NULL, - trades_count INTEGER, -- number of trades in this candle - created_at TIMESTAMPTZ DEFAULT NOW(), - CONSTRAINT unique_market_data UNIQUE(exchange, symbol, timeframe, timestamp) -); - --- Convert to hypertable for TimescaleDB optimization -SELECT create_hypertable('market_data', 'timestamp', if_not_exists => TRUE); - --- Create optimized indexes for market data -CREATE INDEX idx_market_data_lookup ON market_data(symbol, timeframe, timestamp); -CREATE INDEX idx_market_data_recent ON market_data(timestamp DESC) WHERE timestamp > NOW() - INTERVAL '7 days'; -CREATE INDEX idx_market_data_symbol ON market_data(symbol); -CREATE INDEX idx_market_data_timeframe ON market_data(timeframe); - --- Raw Trade Data (optional, for detailed backtesting only) --- This table is partitioned by timestamp for better performance -CREATE TABLE raw_trades ( - id SERIAL PRIMARY KEY, - exchange VARCHAR(50) NOT NULL DEFAULT 'okx', - symbol VARCHAR(20) NOT NULL, - timestamp TIMESTAMPTZ NOT NULL, - type VARCHAR(10) NOT NULL, -- trade, order, balance, tick, books - data JSONB NOT NULL, -- response from the exchange - created_at TIMESTAMPTZ DEFAULT NOW() -) PARTITION BY RANGE 
(timestamp); - --- Create initial partition for current month -CREATE TABLE raw_trades_current PARTITION OF raw_trades -FOR VALUES FROM (date_trunc('month', NOW())) TO (date_trunc('month', NOW()) + INTERVAL '1 month'); - --- Index for raw trades -CREATE INDEX idx_raw_trades_symbol_time ON raw_trades(symbol, timestamp); -CREATE INDEX idx_raw_trades_type ON raw_trades(type); - --- ======================================== --- BOT MANAGEMENT TABLES --- ======================================== - --- Bot Management (simplified) -CREATE TABLE bots ( - id SERIAL PRIMARY KEY, - name VARCHAR(100) NOT NULL, - strategy_name VARCHAR(50) NOT NULL, - symbol VARCHAR(20) NOT NULL, - timeframe VARCHAR(5) NOT NULL, - status VARCHAR(20) NOT NULL DEFAULT 'inactive', -- active, inactive, error, paused - config_file VARCHAR(200), -- path to JSON config - virtual_balance DECIMAL(18,8) DEFAULT 10000, - current_balance DECIMAL(18,8) DEFAULT 10000, - last_heartbeat TIMESTAMPTZ, - created_at TIMESTAMPTZ DEFAULT NOW(), - updated_at TIMESTAMPTZ DEFAULT NOW(), - CONSTRAINT chk_bot_status CHECK (status IN ('active', 'inactive', 'error', 'paused')) -); - --- Indexes for bot management -CREATE INDEX idx_bots_status ON bots(status); -CREATE INDEX idx_bots_symbol ON bots(symbol); -CREATE INDEX idx_bots_strategy ON bots(strategy_name); -CREATE INDEX idx_bots_last_heartbeat ON bots(last_heartbeat); - --- ======================================== --- TRADING SIGNAL TABLES --- ======================================== - --- Trading Signals (for analysis and debugging) -CREATE TABLE signals ( - id SERIAL PRIMARY KEY, - bot_id INTEGER REFERENCES bots(id) ON DELETE CASCADE, - timestamp TIMESTAMPTZ NOT NULL, - signal_type VARCHAR(10) NOT NULL, -- buy, sell, hold - price DECIMAL(18,8), - confidence DECIMAL(5,4), -- signal confidence score (0.0000 to 1.0000) - indicators JSONB, -- technical indicator values - created_at TIMESTAMPTZ DEFAULT NOW(), - CONSTRAINT chk_signal_type CHECK (signal_type IN ('buy', 'sell', 
'hold')), - CONSTRAINT chk_confidence CHECK (confidence >= 0 AND confidence <= 1) -); - --- Convert signals to hypertable for TimescaleDB optimization -SELECT create_hypertable('signals', 'timestamp', if_not_exists => TRUE); - --- Indexes for signals -CREATE INDEX idx_signals_bot_time ON signals(bot_id, timestamp); -CREATE INDEX idx_signals_type ON signals(signal_type); -CREATE INDEX idx_signals_timestamp ON signals(timestamp); - --- ======================================== --- TRADE EXECUTION TABLES --- ======================================== - --- Trade Execution Records -CREATE TABLE trades ( - id SERIAL PRIMARY KEY, - bot_id INTEGER REFERENCES bots(id) ON DELETE CASCADE, - signal_id INTEGER REFERENCES signals(id) ON DELETE SET NULL, - timestamp TIMESTAMPTZ NOT NULL, - side VARCHAR(5) NOT NULL, -- buy, sell - price DECIMAL(18,8) NOT NULL, - quantity DECIMAL(18,8) NOT NULL, - fees DECIMAL(18,8) DEFAULT 0, - pnl DECIMAL(18,8), -- profit/loss for this trade - balance_after DECIMAL(18,8), -- portfolio balance after trade - created_at TIMESTAMPTZ DEFAULT NOW(), - CONSTRAINT chk_trade_side CHECK (side IN ('buy', 'sell')), - CONSTRAINT chk_positive_price CHECK (price > 0), - CONSTRAINT chk_positive_quantity CHECK (quantity > 0), - CONSTRAINT chk_non_negative_fees CHECK (fees >= 0) -); - --- Convert trades to hypertable for TimescaleDB optimization -SELECT create_hypertable('trades', 'timestamp', if_not_exists => TRUE); - --- Indexes for trades -CREATE INDEX idx_trades_bot_time ON trades(bot_id, timestamp); -CREATE INDEX idx_trades_side ON trades(side); -CREATE INDEX idx_trades_timestamp ON trades(timestamp); - --- ======================================== --- PERFORMANCE TRACKING TABLES --- ======================================== - --- Performance Snapshots (for plotting portfolio over time) -CREATE TABLE bot_performance ( - id SERIAL PRIMARY KEY, - bot_id INTEGER REFERENCES bots(id) ON DELETE CASCADE, - timestamp TIMESTAMPTZ NOT NULL, - total_value DECIMAL(18,8) NOT 
NULL, -- current portfolio value - cash_balance DECIMAL(18,8) NOT NULL, - crypto_balance DECIMAL(18,8) NOT NULL, - total_trades INTEGER DEFAULT 0, - winning_trades INTEGER DEFAULT 0, - total_fees DECIMAL(18,8) DEFAULT 0, - created_at TIMESTAMPTZ DEFAULT NOW(), - CONSTRAINT chk_non_negative_values CHECK ( - total_value >= 0 AND - cash_balance >= 0 AND - crypto_balance >= 0 AND - total_trades >= 0 AND - winning_trades >= 0 AND - total_fees >= 0 - ), - CONSTRAINT chk_winning_trades_logic CHECK (winning_trades <= total_trades) -); - --- Convert bot_performance to hypertable for TimescaleDB optimization -SELECT create_hypertable('bot_performance', 'timestamp', if_not_exists => TRUE); - --- Indexes for bot performance -CREATE INDEX idx_bot_performance_bot_time ON bot_performance(bot_id, timestamp); -CREATE INDEX idx_bot_performance_timestamp ON bot_performance(timestamp); - --- ======================================== --- FUNCTIONS AND TRIGGERS --- ======================================== - --- Function to update bot updated_at timestamp -CREATE OR REPLACE FUNCTION update_bot_timestamp() -RETURNS TRIGGER AS $$ -BEGIN - NEW.updated_at = NOW(); - RETURN NEW; -END; -$$ LANGUAGE plpgsql; - --- Trigger to automatically update bot updated_at -CREATE TRIGGER trigger_update_bot_timestamp - BEFORE UPDATE ON bots - FOR EACH ROW - EXECUTE FUNCTION update_bot_timestamp(); - --- Function to create monthly partition for raw_trades -CREATE OR REPLACE FUNCTION create_monthly_partition_for_raw_trades(partition_date DATE) -RETURNS VOID AS $$ -DECLARE - partition_name TEXT; - start_date DATE; - end_date DATE; -BEGIN - start_date := date_trunc('month', partition_date); - end_date := start_date + INTERVAL '1 month'; - partition_name := 'raw_trades_' || to_char(start_date, 'YYYY_MM'); - - EXECUTE format('CREATE TABLE IF NOT EXISTS %I PARTITION OF raw_trades - FOR VALUES FROM (%L) TO (%L)', - partition_name, start_date, end_date); -END; -$$ LANGUAGE plpgsql; - --- 
======================================== --- VIEWS FOR COMMON QUERIES --- ======================================== - --- View for bot status overview -CREATE VIEW bot_status_overview AS -SELECT - b.id, - b.name, - b.strategy_name, - b.symbol, - b.status, - b.current_balance, - b.virtual_balance, - (b.current_balance - b.virtual_balance) as pnl, - b.last_heartbeat, - COUNT(t.id) as total_trades, - COALESCE(SUM(CASE WHEN t.pnl > 0 THEN 1 ELSE 0 END), 0) as winning_trades -FROM bots b -LEFT JOIN trades t ON b.id = t.bot_id -GROUP BY b.id, b.name, b.strategy_name, b.symbol, b.status, - b.current_balance, b.virtual_balance, b.last_heartbeat; - --- View for recent market data -CREATE VIEW recent_market_data AS -SELECT - symbol, - timeframe, - timestamp, - open, - high, - low, - close, - volume, - trades_count -FROM market_data -WHERE timestamp > NOW() - INTERVAL '24 hours' -ORDER BY symbol, timeframe, timestamp DESC; - --- View for trading performance summary -CREATE VIEW trading_performance_summary AS -SELECT - t.bot_id, - b.name as bot_name, - COUNT(t.id) as total_trades, - SUM(CASE WHEN t.pnl > 0 THEN 1 ELSE 0 END) as winning_trades, - ROUND((SUM(CASE WHEN t.pnl > 0 THEN 1 ELSE 0 END)::DECIMAL / COUNT(t.id)) * 100, 2) as win_rate_percent, - ROUND(SUM(t.pnl), 4) as total_pnl, - ROUND(SUM(t.fees), 4) as total_fees, - MIN(t.timestamp) as first_trade, - MAX(t.timestamp) as last_trade -FROM trades t -JOIN bots b ON t.bot_id = b.id -GROUP BY t.bot_id, b.name -ORDER BY total_pnl DESC; - --- ======================================== --- INITIAL DATA SEEDING --- ======================================== - --- Insert sample timeframes that the system supports -CREATE TABLE IF NOT EXISTS supported_timeframes ( - timeframe VARCHAR(5) PRIMARY KEY, - description VARCHAR(50), - minutes INTEGER -); - -INSERT INTO supported_timeframes (timeframe, description, minutes) VALUES -('1m', '1 Minute', 1), -('5m', '5 Minutes', 5), -('15m', '15 Minutes', 15), -('1h', '1 Hour', 60), -('4h', '4 
Hours', 240), -('1d', '1 Day', 1440) -ON CONFLICT (timeframe) DO NOTHING; - --- Insert sample exchanges -CREATE TABLE IF NOT EXISTS supported_exchanges ( - exchange VARCHAR(50) PRIMARY KEY, - name VARCHAR(100), - api_url VARCHAR(200), - enabled BOOLEAN DEFAULT true -); - -INSERT INTO supported_exchanges (exchange, name, api_url, enabled) VALUES -('okx', 'OKX Exchange', 'https://www.okx.com/api/v5', true), -('binance', 'Binance Exchange', 'https://api.binance.com/api/v3', false), -('coinbase', 'Coinbase Pro', 'https://api.exchange.coinbase.com', false) -ON CONFLICT (exchange) DO NOTHING; - --- ======================================== --- COMMENTS FOR DOCUMENTATION --- ======================================== - -COMMENT ON TABLE market_data IS 'Primary OHLCV market data table optimized for bot operations and backtesting'; -COMMENT ON TABLE raw_trades IS 'Optional raw trade data for detailed backtesting (partitioned by month)'; -COMMENT ON TABLE bots IS 'Bot instance management with JSON configuration references'; -COMMENT ON TABLE signals IS 'Trading signals generated by strategies with confidence scores'; -COMMENT ON TABLE trades IS 'Virtual trade execution records with P&L tracking'; -COMMENT ON TABLE bot_performance IS 'Portfolio performance snapshots for visualization'; - -COMMENT ON COLUMN market_data.timestamp IS 'Right-aligned timestamp (candle close time) following exchange standards'; -COMMENT ON COLUMN bots.config_file IS 'Path to JSON configuration file for strategy parameters'; -COMMENT ON COLUMN signals.confidence IS 'Signal confidence score from 0.0000 to 1.0000'; -COMMENT ON COLUMN trades.pnl IS 'Profit/Loss for this specific trade in base currency'; -COMMENT ON COLUMN bot_performance.total_value IS 'Current total portfolio value (cash + crypto)'; - --- Grant permissions to dashboard user -GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO dashboard; -GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO dashboard; -GRANT EXECUTE ON ALL 
FUNCTIONS IN SCHEMA public TO dashboard; \ No newline at end of file diff --git a/docs/modules/database_operations.md b/docs/modules/database_operations.md index fd52f0b..8f7f30d 100644 --- a/docs/modules/database_operations.md +++ b/docs/modules/database_operations.md @@ -342,6 +342,33 @@ trades = db.raw_trades.get_raw_trades( ) ``` +##### `cleanup_old_raw_data(days_to_keep: int = 7) -> int` + +Clean up old raw data to prevent table bloat. + +**Parameters:** +- `days_to_keep`: Number of days to retain raw data records. + +**Returns:** The number of records deleted. + +```python +# Clean up raw data older than 14 days +deleted_count = db.raw_trades.cleanup_old_raw_data(days_to_keep=14) +print(f"Deleted {deleted_count} old raw data records.") +``` + +##### `get_raw_data_stats() -> Dict[str, Any]` + +Get statistics about raw data storage. + +**Returns:** A dictionary with statistics like total records, table size, etc. + +```python +raw_stats = db.raw_trades.get_raw_data_stats() +print(f"Raw Trades Table Size: {raw_stats.get('table_size')}") +print(f"Total Raw Records: {raw_stats.get('total_records')}") +``` + ## Error Handling The database operations module includes comprehensive error handling with custom exceptions.