diff --git a/database/redis_manager.py b/database/redis_manager.py new file mode 100644 index 0000000..6e9d5e0 --- /dev/null +++ b/database/redis_manager.py @@ -0,0 +1,476 @@ +""" +Redis Manager for Crypto Trading Bot Platform +Provides Redis connection, pub/sub messaging, and caching utilities +""" + +import os +import json +import logging +import asyncio +from typing import Optional, Dict, Any, List, Callable, Union +from pathlib import Path +from contextlib import asynccontextmanager + +# Load environment variables from .env file if it exists +try: + from dotenv import load_dotenv + env_file = Path(__file__).parent.parent / '.env' + if env_file.exists(): + load_dotenv(env_file) +except ImportError: + # dotenv not available, proceed without it + pass + +import redis +import redis.asyncio as redis_async +from redis.exceptions import ConnectionError, TimeoutError, RedisError + +# Configure logging +logger = logging.getLogger(__name__) + + +class RedisConfig: + """Redis configuration class""" + + def __init__(self): + self.host = os.getenv('REDIS_HOST', 'localhost') + self.port = int(os.getenv('REDIS_PORT', '6379')) + self.password = os.getenv('REDIS_PASSWORD', '') + self.db = int(os.getenv('REDIS_DB', '0')) + + # Connection settings + self.socket_timeout = int(os.getenv('REDIS_SOCKET_TIMEOUT', '5')) + self.socket_connect_timeout = int(os.getenv('REDIS_CONNECT_TIMEOUT', '5')) + self.socket_keepalive = os.getenv('REDIS_KEEPALIVE', 'true').lower() == 'true' + self.socket_keepalive_options = {} + + # Pool settings + self.max_connections = int(os.getenv('REDIS_MAX_CONNECTIONS', '20')) + self.retry_on_timeout = os.getenv('REDIS_RETRY_ON_TIMEOUT', 'true').lower() == 'true' + + # Channel prefixes for organization + self.channel_prefix = os.getenv('REDIS_CHANNEL_PREFIX', 'crypto_bot') + + logger.info(f"Redis configuration initialized for: {self.host}:{self.port}") + + def get_connection_kwargs(self) -> Dict[str, Any]: + """Get Redis connection configuration""" + kwargs = { + 'host': self.host, + 'port': self.port, + 'db': self.db, + 'socket_timeout': self.socket_timeout, + 'socket_connect_timeout': self.socket_connect_timeout, + 'socket_keepalive': self.socket_keepalive, + 'socket_keepalive_options': self.socket_keepalive_options, + 'retry_on_timeout': self.retry_on_timeout, + 'decode_responses': True, # Automatically decode responses to strings + } + + if self.password: + kwargs['password'] = self.password + + return kwargs + + def get_pool_kwargs(self) -> Dict[str, Any]: + """Get Redis connection pool configuration""" + kwargs = self.get_connection_kwargs() + kwargs['max_connections'] = self.max_connections + return kwargs + + +class RedisChannels: + """Redis channel definitions for organized messaging""" + + def __init__(self, prefix: str = 'crypto_bot'): + self.prefix = prefix + + # Market data channels + self.market_data = f"{prefix}:market_data" + self.market_data_raw = f"{prefix}:market_data:raw" + self.market_data_ohlcv = f"{prefix}:market_data:ohlcv" + + # Bot channels + self.bot_signals = f"{prefix}:bot:signals" + self.bot_trades = f"{prefix}:bot:trades" + self.bot_status = f"{prefix}:bot:status" + self.bot_performance = f"{prefix}:bot:performance" + + # System channels + self.system_health = f"{prefix}:system:health" + self.system_alerts = f"{prefix}:system:alerts" + + # Dashboard channels + self.dashboard_updates = f"{prefix}:dashboard:updates" + self.dashboard_commands = f"{prefix}:dashboard:commands" + + def get_symbol_channel(self, base_channel: str, symbol: str) -> str: + """Get symbol-specific channel""" + return f"{base_channel}:{symbol}" + + def get_bot_channel(self, base_channel: str, bot_id: int) -> str: + """Get bot-specific channel""" + return f"{base_channel}:{bot_id}" + + +class RedisManager: + """ + Redis manager with connection pooling and pub/sub messaging + """ + + def __init__(self, config: Optional[RedisConfig] = None): + self.config = config or RedisConfig() + self.channels = RedisChannels(self.config.channel_prefix) + + # Synchronous Redis client + self._redis_client: Optional[redis.Redis] = None + self._connection_pool: Optional[redis.ConnectionPool] = None + + # Asynchronous Redis client + self._async_redis_client: Optional[redis_async.Redis] = None + self._async_connection_pool: Optional[redis_async.ConnectionPool] = None + + # Pub/sub clients + self._pubsub_client: Optional[redis.client.PubSub] = None + self._async_pubsub_client: Optional[redis_async.client.PubSub] = None + + # Subscription handlers + self._message_handlers: Dict[str, List[Callable]] = {} + self._async_message_handlers: Dict[str, List[Callable]] = {} + + def initialize(self) -> None: + """Initialize Redis connections""" + try: + logger.info("Initializing Redis connection...") + + # Create connection pool + self._connection_pool = redis.ConnectionPool(**self.config.get_pool_kwargs()) + self._redis_client = redis.Redis(connection_pool=self._connection_pool) + + # Test connection + self._redis_client.ping() + logger.info("Redis connection initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize Redis: {e}") + raise + + async def initialize_async(self) -> None: + """Initialize async Redis connections""" + try: + logger.info("Initializing async Redis connection...") + + # Create async connection pool + self._async_connection_pool = redis_async.ConnectionPool(**self.config.get_pool_kwargs()) + self._async_redis_client = redis_async.Redis(connection_pool=self._async_connection_pool) + + # Test connection + await self._async_redis_client.ping() + logger.info("Async Redis connection initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize async Redis: {e}") + raise + + @property + def client(self) -> redis.Redis: + """Get synchronous Redis client""" + if not self._redis_client: + raise RuntimeError("Redis not initialized. Call initialize() first.") + return self._redis_client + + @property + def async_client(self) -> redis_async.Redis: + """Get asynchronous Redis client""" + if not self._async_redis_client: + raise RuntimeError("Async Redis not initialized. Call initialize_async() first.") + return self._async_redis_client + + def test_connection(self) -> bool: + """Test Redis connection""" + try: + self.client.ping() + logger.info("Redis connection test successful") + return True + except Exception as e: + logger.error(f"Redis connection test failed: {e}") + return False + + async def test_connection_async(self) -> bool: + """Test async Redis connection""" + try: + await self.async_client.ping() + logger.info("Async Redis connection test successful") + return True + except Exception as e: + logger.error(f"Async Redis connection test failed: {e}") + return False + + def publish(self, channel: str, message: Union[str, Dict[str, Any]]) -> int: + """ + Publish message to channel + + Args: + channel: Redis channel name + message: Message to publish (string or dict that will be JSON serialized) + + Returns: + Number of clients that received the message + """ + try: + if isinstance(message, dict): + message = json.dumps(message, default=str) + + result = self.client.publish(channel, message) + logger.debug(f"Published message to {channel}: {result} clients received") + return result + + except Exception as e: + logger.error(f"Failed to publish message to {channel}: {e}") + raise + + async def publish_async(self, channel: str, message: Union[str, Dict[str, Any]]) -> int: + """ + Publish message to channel (async) + + Args: + channel: Redis channel name + message: Message to publish (string or dict that will be JSON serialized) + + Returns: + Number of clients that received the message + """ + try: + if isinstance(message, dict): + message = json.dumps(message, default=str) + + result = await self.async_client.publish(channel, message) + logger.debug(f"Published message to {channel}: {result} clients received") + return result + + except Exception as e: + logger.error(f"Failed to publish message to {channel}: {e}") + raise + + def subscribe(self, channels: Union[str, List[str]], handler: Callable[[str, str], None]) -> None: + """ + Subscribe to Redis channels with message handler + + Args: + channels: Channel name or list of channel names + handler: Function to handle received messages (channel, message) + """ + if isinstance(channels, str): + channels = [channels] + + for channel in channels: + if channel not in self._message_handlers: + self._message_handlers[channel] = [] + self._message_handlers[channel].append(handler) + + logger.info(f"Registered handler for channels: {channels}") + + async def subscribe_async(self, channels: Union[str, List[str]], handler: Callable[[str, str], None]) -> None: + """ + Subscribe to Redis channels with message handler (async) + + Args: + channels: Channel name or list of channel names + handler: Function to handle received messages (channel, message) + """ + if isinstance(channels, str): + channels = [channels] + + for channel in channels: + if channel not in self._async_message_handlers: + self._async_message_handlers[channel] = [] + self._async_message_handlers[channel].append(handler) + + logger.info(f"Registered async handler for channels: {channels}") + + def start_subscriber(self) -> None: + """Start synchronous message subscriber""" + if not self._message_handlers: + logger.warning("No message handlers registered") + return + + try: + self._pubsub_client = self.client.pubsub() + + # Subscribe to all channels with handlers + for channel in self._message_handlers.keys(): + self._pubsub_client.subscribe(channel) + + logger.info(f"Started subscriber for channels: {list(self._message_handlers.keys())}") + + # Message processing loop + for message in self._pubsub_client.listen(): + if message['type'] == 'message': + channel = message['channel'] + data = message['data'] + + # Call all handlers for this channel + if channel in self._message_handlers: + for handler in self._message_handlers[channel]: + try: + handler(channel, data) + except Exception as e: + logger.error(f"Error in message handler for {channel}: {e}") + + except Exception as e: + logger.error(f"Error in message subscriber: {e}") + raise + + async def start_subscriber_async(self) -> None: + """Start asynchronous message subscriber""" + if not self._async_message_handlers: + logger.warning("No async message handlers registered") + return + + try: + self._async_pubsub_client = self.async_client.pubsub() + + # Subscribe to all channels with handlers + for channel in self._async_message_handlers.keys(): + await self._async_pubsub_client.subscribe(channel) + + logger.info(f"Started async subscriber for channels: {list(self._async_message_handlers.keys())}") + + # Message processing loop + async for message in self._async_pubsub_client.listen(): + if message['type'] == 'message': + channel = message['channel'] + data = message['data'] + + # Call all handlers for this channel + if channel in self._async_message_handlers: + for handler in self._async_message_handlers[channel]: + try: + if asyncio.iscoroutinefunction(handler): + await handler(channel, data) + else: + handler(channel, data) + except Exception as e: + logger.error(f"Error in async message handler for {channel}: {e}") + + except Exception as e: + logger.error(f"Error in async message subscriber: {e}") + raise + + def stop_subscriber(self) -> None: + """Stop synchronous message subscriber""" + if self._pubsub_client: + self._pubsub_client.close() + self._pubsub_client = None + logger.info("Stopped message subscriber") + + async def stop_subscriber_async(self) -> None: + """Stop asynchronous message subscriber""" + if self._async_pubsub_client: + await self._async_pubsub_client.close() + self._async_pubsub_client = None + logger.info("Stopped async message subscriber") + + def get_info(self) -> Dict[str, Any]: + """Get Redis server information""" + try: + return self.client.info() + except Exception as e: + logger.error(f"Failed to get Redis info: {e}") + return {} + + def close(self) -> None: + """Close Redis connections""" + try: + self.stop_subscriber() + + if self._connection_pool: + self._connection_pool.disconnect() + + logger.info("Redis connections closed") + except Exception as e: + logger.error(f"Error closing Redis connections: {e}") + + async def close_async(self) -> None: + """Close async Redis connections""" + try: + await self.stop_subscriber_async() + + if self._async_connection_pool: + await self._async_connection_pool.disconnect() + + logger.info("Async Redis connections closed") + except Exception as e: + logger.error(f"Error closing async Redis connections: {e}") + + +# Global Redis manager instance +redis_manager = RedisManager() + + +def get_redis_manager() -> RedisManager: + """Get global Redis manager instance""" + return redis_manager + + +def init_redis(config: Optional[RedisConfig] = None) -> RedisManager: + """ + Initialize global Redis manager + + Args: + config: Optional Redis configuration + + Returns: + RedisManager instance + """ + global redis_manager + if config: + redis_manager = RedisManager(config) + redis_manager.initialize() + return redis_manager + + +async def init_redis_async(config: Optional[RedisConfig] = None) -> RedisManager: + """ + Initialize global Redis manager (async) + + Args: + config: Optional Redis configuration + + Returns: + RedisManager instance + """ + global redis_manager + if config: + redis_manager = RedisManager(config) + await redis_manager.initialize_async() + return redis_manager + + +# Convenience functions for common operations +def publish_market_data(symbol: str, data: Dict[str, Any]) -> int: + """Publish market data to symbol-specific channel""" + channel = redis_manager.channels.get_symbol_channel(redis_manager.channels.market_data_ohlcv, symbol) + return redis_manager.publish(channel, data) + + +def publish_bot_signal(bot_id: int, signal_data: Dict[str, Any]) -> int: + """Publish bot signal to bot-specific channel""" + channel = redis_manager.channels.get_bot_channel(redis_manager.channels.bot_signals, bot_id) + return redis_manager.publish(channel, signal_data) + + +def publish_bot_trade(bot_id: int, trade_data: Dict[str, Any]) -> int: + """Publish bot trade to bot-specific channel""" + channel = redis_manager.channels.get_bot_channel(redis_manager.channels.bot_trades, bot_id) + return redis_manager.publish(channel, trade_data) + + +def publish_system_health(health_data: Dict[str, Any]) -> int: + """Publish system health status""" + return redis_manager.publish(redis_manager.channels.system_health, health_data) + + +def publish_dashboard_update(update_data: Dict[str, Any]) -> int: + """Publish dashboard update""" + return redis_manager.publish(redis_manager.channels.dashboard_updates, update_data) \ No newline at end of file diff --git a/tasks/tasks-crypto-bot-prd.md b/tasks/tasks-crypto-bot-prd.md index bf30463..c2c065c 100644 --- a/tasks/tasks-crypto-bot-prd.md +++ b/tasks/tasks-crypto-bot-prd.md @@ -6,6 +6,7 @@ - `database/schema_clean.sql` - Clean database schema without hypertables (actively used, includes raw_trades table) - `database/schema.sql` - Complete database schema with TimescaleDB hypertables (for future optimization) - `database/connection.py` - Database connection utility with connection pooling, session management, and raw data utilities +- `database/redis_manager.py` - Redis connection utility with pub/sub messaging for real-time data distribution - `database/init/init.sql` - Docker initialization script for automatic database setup - `database/init/schema_clean.sql` - Copy of clean schema for Docker initialization - `data/okx_collector.py` - OKX API integration for real-time market data collection @@ -21,6 +22,7 @@ - `config/settings.py` - Centralized configuration settings using Pydantic - `scripts/dev.py` - Development setup and management script - `scripts/init_database.py` - Database initialization and verification script +- `scripts/test_models.py` - Test script for SQLAlchemy models integration verification - `requirements.txt` - Python dependencies managed by UV - `docker-compose.yml` - Docker services configuration with TimescaleDB support - `tests/test_strategies.py` - Unit tests for strategy implementations @@ -34,9 +36,9 @@ - [x] 1.1 Install and configure PostgreSQL with Docker - [x] 1.2 Create database schema following the PRD specifications (market_data, bots, signals, trades, bot_performance tables) - [x] 1.3 Implement database connection utility with connection pooling - - [ ] 1.4 Create database models using SQLAlchemy or similar ORM + - [x] 1.4 Create database models using SQLAlchemy or similar ORM - [x] 1.5 Add proper indexes for time-series data optimization - - [ ] 1.6 Setup Redis for pub/sub messaging + - [x] 1.6 Setup Redis for pub/sub messaging - [ ] 1.7 Create database migration scripts and initial data seeding - [ ] 1.8 Unit test database models and connection utilities