"""
Redis Manager for Crypto Trading Bot Platform.

Provides Redis connection, pub/sub messaging, and caching utilities.
"""

import asyncio
import json
import logging
from contextlib import asynccontextmanager
from typing import Any, Callable, Dict, List, Optional, Union, Type

from pydantic_settings import BaseSettings
import redis
import redis.asyncio as redis_async
from redis.exceptions import ConnectionError, RedisError, TimeoutError

# Configure logging
logger = logging.getLogger(__name__)


class RedisConfig(BaseSettings):
    """Redis configuration class using Pydantic for validation.

    Field values fall back to the defaults below; ``model_config`` points the
    settings loader at a ``.env`` file, matches names case-sensitively and
    ignores unknown entries.
    """

    REDIS_HOST: str = 'localhost'
    REDIS_PORT: int = 6379
    REDIS_PASSWORD: str = ''
    REDIS_DB: int = 0

    # Connection settings
    REDIS_SOCKET_TIMEOUT: int = 5
    REDIS_CONNECT_TIMEOUT: int = 5
    REDIS_KEEPALIVE: bool = True

    # Pool settings
    REDIS_MAX_CONNECTIONS: int = 20
    REDIS_RETRY_ON_TIMEOUT: bool = True

    # Channel prefixes for organization
    REDIS_CHANNEL_PREFIX: str = 'crypto_bot'

    model_config = {
        "env_file": ".env",
        "env_file_encoding": "utf-8",
        "case_sensitive": True,
        "extra": "ignore"
    }

    def get_connection_kwargs(self) -> Dict[str, Any]:
        """Get Redis connection configuration.

        Returns:
            Keyword arguments for a Redis connection. ``password`` is only
            included when ``REDIS_PASSWORD`` is non-empty, and
            ``decode_responses`` is always enabled.
        """
        kwargs = {
            'host': self.REDIS_HOST,
            'port': self.REDIS_PORT,
            'db': self.REDIS_DB,
            'socket_timeout': self.REDIS_SOCKET_TIMEOUT,
            'socket_connect_timeout': self.REDIS_CONNECT_TIMEOUT,
            'socket_keepalive': self.REDIS_KEEPALIVE,
            'socket_keepalive_options': {},
            'retry_on_timeout': self.REDIS_RETRY_ON_TIMEOUT,
            'decode_responses': True,
        }
        if self.REDIS_PASSWORD:
            kwargs['password'] = self.REDIS_PASSWORD
        return kwargs

    def get_pool_kwargs(self) -> Dict[str, Any]:
        """Get Redis connection pool configuration.

        Returns:
            The connection keyword arguments plus ``max_connections``.
        """
        kwargs = self.get_connection_kwargs()
        kwargs['max_connections'] = self.REDIS_MAX_CONNECTIONS
        return kwargs


class RedisChannels:
    """Redis channel definitions for organized messaging.

    Every channel name is built as ``"<prefix>:<topic>"``.
    """

    def __init__(self, prefix: str = 'crypto_bot'):
        self.prefix = prefix

        self.market_data = f"{prefix}:market_data"
        self.market_data_raw = f"{prefix}:market_data:raw"
        self.market_data_ohlcv = f"{prefix}:market_data:ohlcv"

        self.bot_signals = f"{prefix}:bot:signals"
        self.bot_trades = f"{prefix}:bot:trades"
        self.bot_status = f"{prefix}:bot:status"
        self.bot_performance = f"{prefix}:bot:performance"

        self.system_health = f"{prefix}:system:health"
        self.system_alerts = f"{prefix}:system:alerts"

        self.dashboard_updates = f"{prefix}:dashboard:updates"
        self.dashboard_commands = f"{prefix}:dashboard:commands"

    def get_symbol_channel(self, base_channel: str, symbol: str) -> str:
        """Get symbol-specific channel (``"<base_channel>:<symbol>"``)."""
        return f"{base_channel}:{symbol}"

    def get_bot_channel(self, base_channel: str, bot_id: int) -> str:
        """Get bot-specific channel (``"<base_channel>:<bot_id>"``)."""
        return f"{base_channel}:{bot_id}"


class BaseRedisManager:
    """Base class for Redis managers, handling config and channels."""

    def __init__(self, config: Optional[RedisConfig] = None):
        # A default RedisConfig is built when the caller supplies none.
        self.config = config or RedisConfig()
        self.channels = RedisChannels(self.config.REDIS_CHANNEL_PREFIX)


class SyncRedisManager(BaseRedisManager):
    """Synchronous Redis manager for standard operations."""

    def __init__(self, config: Optional[RedisConfig] = None):
        super().__init__(config)
        self._connection_pool: Optional[redis.ConnectionPool] = None
        self._redis_client: Optional[redis.Redis] = None
        self._pubsub_client: Optional[redis.client.PubSub] = None
        self._message_handlers: Dict[str, List[Callable]] = {}

    def initialize(self) -> None:
        """Initialize synchronous Redis connection.

        The pool and client are created locally and verified with ``PING``
        before being stored on the manager, so a failed attempt never leaves
        a half-initialized client behind. If the manager was already
        initialized, the previous pool is disconnected once the new one is in
        place.

        Raises:
            ConnectionError: If the Redis server cannot be reached.
            TimeoutError: If the connection attempt times out.
        """
        logger.info("Initializing sync Redis connection...")
        pool = redis.ConnectionPool(**self.config.get_pool_kwargs())
        client = redis.Redis(connection_pool=pool)
        try:
            client.ping()
        except (ConnectionError, TimeoutError) as exc:
            logger.error("Failed to initialize sync Redis: %s", exc)
            # Release whatever the failed attempt may have opened.
            pool.disconnect()
            raise

        previous_pool = self._connection_pool
        self._connection_pool = pool
        self._redis_client = client
        if previous_pool is not None:
            # Re-initialization: do not leak the connections of the old pool.
            previous_pool.disconnect()
        logger.info("Sync Redis connection initialized successfully.")

    @property
    def client(self) -> redis.Redis:
        """Get synchronous Redis client.

        Raises:
            RuntimeError: If ``initialize()`` has not completed successfully.
        """
        if self._redis_client is None:
            raise RuntimeError("Sync Redis not initialized. Call initialize() first.")
        return self._redis_client

    def publish(self, channel: str, message: Union[str, Dict[str, Any]]) -> int:
        """Publish message to a channel.

        Dict messages are JSON-encoded (objects JSON cannot encode are
        stringified via ``default=str``); strings are sent unchanged.

        Returns:
            The value returned by the client's ``publish`` call.
        """
        if isinstance(message, dict):
            message = json.dumps(message, default=str)
        return self.client.publish(channel, message)

    def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
        """Set a key-value pair with an optional expiration.

        The value is stored JSON-encoded; ``ex`` is passed through to the
        client's ``set`` call.
        """
        self.client.set(key, json.dumps(value, default=str), ex=ex)

    def get(self, key: str) -> Optional[Any]:
        """Get a value by key.

        Returns:
            The JSON-decoded value, or ``None`` when the stored value is
            missing or empty.
        """
        value = self.client.get(key)
        return json.loads(value) if value else None

    def delete(self, *keys: str) -> int:
        """Delete one or more keys."""
        return self.client.delete(*keys)

    def close(self) -> None:
        """Close Redis connections.

        Does nothing when the manager was never initialized.
        """
        if self._connection_pool:
            self._connection_pool.disconnect()
            logger.info("Sync Redis connections closed.")


class AsyncRedisManager(BaseRedisManager):
    """Asynchronous Redis manager for asyncio operations."""

    def __init__(self, config: Optional[RedisConfig] = None):
        super().__init__(config)
        self._async_connection_pool: Optional[redis_async.ConnectionPool] = None
        self._async_redis_client: Optional[redis_async.Redis] = None
        self._async_pubsub_client: Optional[redis_async.client.PubSub] = None
        self._async_message_handlers: Dict[str, List[Callable]] = {}

    async def initialize(self) -> None:
        """Initialize asynchronous Redis connection.

        The pool and client are created locally and verified with ``PING``
        before being stored on the manager, so a failed attempt never leaves
        a half-initialized client behind. If the manager was already
        initialized, the previous pool is disconnected once the new one is in
        place.

        Raises:
            ConnectionError: If the Redis server cannot be reached.
            TimeoutError: If the connection attempt times out.
        """
        logger.info("Initializing async Redis connection...")
        pool = redis_async.ConnectionPool(**self.config.get_pool_kwargs())
        client = redis_async.Redis(connection_pool=pool)
        try:
            await client.ping()
        except (ConnectionError, TimeoutError) as exc:
            logger.error("Failed to initialize async Redis: %s", exc)
            # Release whatever the failed attempt may have opened.
            await pool.disconnect()
            raise

        previous_pool = self._async_connection_pool
        self._async_connection_pool = pool
        self._async_redis_client = client
        if previous_pool is not None:
            # Re-initialization: do not leak the connections of the old pool.
            await previous_pool.disconnect()
        logger.info("Async Redis connection initialized successfully.")

    @property
    def async_client(self) -> redis_async.Redis:
        """Get asynchronous Redis client.

        Raises:
            RuntimeError: If ``initialize()`` has not completed successfully.
        """
        if self._async_redis_client is None:
            raise RuntimeError("Async Redis not initialized. Call initialize() first.")
        return self._async_redis_client

    async def publish(self, channel: str, message: Union[str, Dict[str, Any]]) -> int:
        """Publish message to a channel asynchronously.

        Dict messages are JSON-encoded (objects JSON cannot encode are
        stringified via ``default=str``); strings are sent unchanged.
        """
        if isinstance(message, dict):
            message = json.dumps(message, default=str)
        return await self.async_client.publish(channel, message)

    async def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
        """Set a key-value pair asynchronously.

        The value is stored JSON-encoded; ``ex`` is passed through to the
        client's ``set`` call.
        """
        await self.async_client.set(key, json.dumps(value, default=str), ex=ex)

    async def get(self, key: str) -> Optional[Any]:
        """Get a value by key asynchronously.

        Returns:
            The JSON-decoded value, or ``None`` when the stored value is
            missing or empty.
        """
        value = await self.async_client.get(key)
        return json.loads(value) if value else None

    async def delete(self, *keys: str) -> int:
        """Delete one or more keys asynchronously."""
        return await self.async_client.delete(*keys)

    async def close(self) -> None:
        """Close async Redis connections.

        Does nothing when the manager was never initialized.
        """
        if self._async_connection_pool:
            await self._async_connection_pool.disconnect()
            logger.info("Async Redis connections closed.")


# Global instances (to be managed carefully, e.g., via a factory or DI)
sync_redis_manager = SyncRedisManager()
async_redis_manager = AsyncRedisManager()


def get_sync_redis_manager() -> SyncRedisManager:
    """Get the global synchronous Redis manager instance."""
    return sync_redis_manager


def get_async_redis_manager() -> AsyncRedisManager:
    """Get the global asynchronous Redis manager instance."""
    return async_redis_manager


def init_redis(config: Optional[RedisConfig] = None) -> SyncRedisManager:
    """
    Initialize global sync Redis manager.

    When a config is given, the global manager is replaced by a new one built
    from it and the previous manager is closed so its pool is not leaked.
    Names imported with ``from ... import sync_redis_manager`` keep pointing
    at the old instance; use ``get_sync_redis_manager()`` to observe the
    replacement.

    Args:
        config: Optional Redis configuration.

    Returns:
        SyncRedisManager instance.
    """
    global sync_redis_manager
    if config is not None:
        previous_manager = sync_redis_manager
        sync_redis_manager = SyncRedisManager(config)
        previous_manager.close()
    sync_redis_manager.initialize()
    return sync_redis_manager


async def init_redis_async(config: Optional[RedisConfig] = None) -> AsyncRedisManager:
    """
    Initialize global async Redis manager.

    When a config is given, the global manager is replaced by a new one built
    from it and the previous manager is closed so its pool is not leaked.
    Names imported with ``from ... import async_redis_manager`` keep pointing
    at the old instance; use ``get_async_redis_manager()`` to observe the
    replacement.

    Args:
        config: Optional Redis configuration.

    Returns:
        AsyncRedisManager instance.
    """
    global async_redis_manager
    if config is not None:
        previous_manager = async_redis_manager
        async_redis_manager = AsyncRedisManager(config)
        await previous_manager.close()
    await async_redis_manager.initialize()
    return async_redis_manager


# Convenience functions for common operations.
# They all publish through the global sync manager, so it must have been
# initialized first; otherwise its ``client`` property raises RuntimeError.

def publish_market_data(symbol: str, data: Dict[str, Any]) -> int:
    """Publish market data to symbol-specific channel."""
    channels = sync_redis_manager.channels
    channel = channels.get_symbol_channel(channels.market_data_ohlcv, symbol)
    return sync_redis_manager.publish(channel, data)


def publish_bot_signal(bot_id: int, signal_data: Dict[str, Any]) -> int:
    """Publish bot signal to bot-specific channel."""
    channels = sync_redis_manager.channels
    channel = channels.get_bot_channel(channels.bot_signals, bot_id)
    return sync_redis_manager.publish(channel, signal_data)


def publish_bot_trade(bot_id: int, trade_data: Dict[str, Any]) -> int:
    """Publish bot trade to bot-specific channel."""
    channels = sync_redis_manager.channels
    channel = channels.get_bot_channel(channels.bot_trades, bot_id)
    return sync_redis_manager.publish(channel, trade_data)


def publish_system_health(health_data: Dict[str, Any]) -> int:
    """Publish system health status."""
    return sync_redis_manager.publish(sync_redis_manager.channels.system_health, health_data)


def publish_dashboard_update(update_data: Dict[str, Any]) -> int:
    """Publish dashboard update."""
    return sync_redis_manager.publish(sync_redis_manager.channels.dashboard_updates, update_data)