TCPDashboard/database/redis_manager.py

291 lines
11 KiB
Python
Raw Normal View History

"""
Redis Manager for Crypto Trading Bot Platform.
Provides Redis connection, pub/sub messaging, and caching utilities.
"""
import asyncio
import json
import logging
from contextlib import asynccontextmanager
from typing import Any, Callable, Dict, List, Optional, Union, Type
from pydantic_settings import BaseSettings
import redis
import redis.asyncio as redis_async
from redis.exceptions import ConnectionError, RedisError, TimeoutError
# Configure logging
logger = logging.getLogger(__name__)
class RedisConfig(BaseSettings):
"""Redis configuration class using Pydantic for validation."""
REDIS_HOST: str = 'localhost'
REDIS_PORT: int = 6379
REDIS_PASSWORD: str = ''
REDIS_DB: int = 0
# Connection settings
REDIS_SOCKET_TIMEOUT: int = 5
REDIS_CONNECT_TIMEOUT: int = 5
REDIS_KEEPALIVE: bool = True
# Pool settings
REDIS_MAX_CONNECTIONS: int = 20
REDIS_RETRY_ON_TIMEOUT: bool = True
# Channel prefixes for organization
REDIS_CHANNEL_PREFIX: str = 'crypto_bot'
model_config = {
"env_file": ".env",
"env_file_encoding": "utf-8",
"case_sensitive": True,
"extra": "ignore"
}
def get_connection_kwargs(self) -> Dict[str, Any]:
"""Get Redis connection configuration."""
kwargs = {
'host': self.REDIS_HOST,
'port': self.REDIS_PORT,
'db': self.REDIS_DB,
'socket_timeout': self.REDIS_SOCKET_TIMEOUT,
'socket_connect_timeout': self.REDIS_CONNECT_TIMEOUT,
'socket_keepalive': self.REDIS_KEEPALIVE,
'socket_keepalive_options': {},
'retry_on_timeout': self.REDIS_RETRY_ON_TIMEOUT,
'decode_responses': True,
}
if self.REDIS_PASSWORD:
kwargs['password'] = self.REDIS_PASSWORD
return kwargs
def get_pool_kwargs(self) -> Dict[str, Any]:
"""Get Redis connection pool configuration."""
kwargs = self.get_connection_kwargs()
kwargs['max_connections'] = self.REDIS_MAX_CONNECTIONS
return kwargs
class RedisChannels:
"""Redis channel definitions for organized messaging."""
def __init__(self, prefix: str = 'crypto_bot'):
self.prefix = prefix
self.market_data = f"{prefix}:market_data"
self.market_data_raw = f"{prefix}:market_data:raw"
self.market_data_ohlcv = f"{prefix}:market_data:ohlcv"
self.bot_signals = f"{prefix}:bot:signals"
self.bot_trades = f"{prefix}:bot:trades"
self.bot_status = f"{prefix}:bot:status"
self.bot_performance = f"{prefix}:bot:performance"
self.system_health = f"{prefix}:system:health"
self.system_alerts = f"{prefix}:system:alerts"
self.dashboard_updates = f"{prefix}:dashboard:updates"
self.dashboard_commands = f"{prefix}:dashboard:commands"
def get_symbol_channel(self, base_channel: str, symbol: str) -> str:
"""Get symbol-specific channel."""
return f"{base_channel}:{symbol}"
def get_bot_channel(self, base_channel: str, bot_id: int) -> str:
"""Get bot-specific channel."""
return f"{base_channel}:{bot_id}"
class BaseRedisManager:
"""Base class for Redis managers, handling config and channels."""
def __init__(self, config: Optional[RedisConfig] = None):
self.config = config or RedisConfig()
self.channels = RedisChannels(self.config.REDIS_CHANNEL_PREFIX)
class SyncRedisManager(BaseRedisManager):
"""Synchronous Redis manager for standard operations."""
def __init__(self, config: Optional[RedisConfig] = None):
super().__init__(config)
self._connection_pool: Optional[redis.ConnectionPool] = None
self._redis_client: Optional[redis.Redis] = None
self._pubsub_client: Optional[redis.client.PubSub] = None
self._message_handlers: Dict[str, List[Callable]] = {}
def initialize(self) -> None:
"""Initialize synchronous Redis connection."""
try:
logger.info("Initializing sync Redis connection...")
self._connection_pool = redis.ConnectionPool(**self.config.get_pool_kwargs())
self._redis_client = redis.Redis(connection_pool=self._connection_pool)
self._redis_client.ping()
logger.info("Sync Redis connection initialized successfully.")
except (ConnectionError, TimeoutError) as e:
logger.error(f"Failed to initialize sync Redis: {e}")
raise
@property
def client(self) -> redis.Redis:
"""Get synchronous Redis client."""
if not self._redis_client:
raise RuntimeError("Sync Redis not initialized. Call initialize() first.")
return self._redis_client
def publish(self, channel: str, message: Union[str, Dict[str, Any]]) -> int:
"""Publish message to a channel."""
if isinstance(message, dict):
message = json.dumps(message, default=str)
return self.client.publish(channel, message)
def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
"""Set a key-value pair with an optional expiration."""
self.client.set(key, json.dumps(value, default=str), ex=ex)
def get(self, key: str) -> Optional[Any]:
"""Get a value by key."""
value = self.client.get(key)
return json.loads(value) if value else None
def delete(self, *keys: str) -> int:
"""Delete one or more keys."""
return self.client.delete(*keys)
def close(self) -> None:
"""Close Redis connections."""
if self._connection_pool:
self._connection_pool.disconnect()
logger.info("Sync Redis connections closed.")
class AsyncRedisManager(BaseRedisManager):
"""Asynchronous Redis manager for asyncio operations."""
def __init__(self, config: Optional[RedisConfig] = None):
super().__init__(config)
self._async_connection_pool: Optional[redis_async.ConnectionPool] = None
self._async_redis_client: Optional[redis_async.Redis] = None
self._async_pubsub_client: Optional[redis_async.client.PubSub] = None
self._async_message_handlers: Dict[str, List[Callable]] = {}
async def initialize(self) -> None:
"""Initialize asynchronous Redis connection."""
try:
logger.info("Initializing async Redis connection...")
self._async_connection_pool = redis_async.ConnectionPool(**self.config.get_pool_kwargs())
self._async_redis_client = redis_async.Redis(connection_pool=self._async_connection_pool)
await self._async_redis_client.ping()
logger.info("Async Redis connection initialized successfully.")
except (ConnectionError, TimeoutError) as e:
logger.error(f"Failed to initialize async Redis: {e}")
raise
@property
def async_client(self) -> redis_async.Redis:
"""Get asynchronous Redis client."""
if not self._async_redis_client:
raise RuntimeError("Async Redis not initialized. Call initialize() first.")
return self._async_redis_client
async def publish(self, channel: str, message: Union[str, Dict[str, Any]]) -> int:
"""Publish message to a channel asynchronously."""
if isinstance(message, dict):
message = json.dumps(message, default=str)
return await self.async_client.publish(channel, message)
async def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
"""Set a key-value pair asynchronously."""
await self.async_client.set(key, json.dumps(value, default=str), ex=ex)
async def get(self, key: str) -> Optional[Any]:
"""Get a value by key asynchronously."""
value = await self.async_client.get(key)
return json.loads(value) if value else None
async def delete(self, *keys: str) -> int:
"""Delete one or more keys asynchronously."""
return await self.async_client.delete(*keys)
async def close(self) -> None:
"""Close async Redis connections."""
if self._async_connection_pool:
await self._async_connection_pool.disconnect()
logger.info("Async Redis connections closed.")
# Global instances (to be managed carefully, e.g., via a factory or DI)
sync_redis_manager = SyncRedisManager()
async_redis_manager = AsyncRedisManager()
def get_sync_redis_manager() -> SyncRedisManager:
"""Get the global synchronous Redis manager instance."""
return sync_redis_manager
def get_async_redis_manager() -> AsyncRedisManager:
"""Get the global asynchronous Redis manager instance."""
return async_redis_manager
def init_redis(config: Optional[RedisConfig] = None) -> SyncRedisManager:
"""
Initialize global sync Redis manager.
Args:
config: Optional Redis configuration.
Returns:
SyncRedisManager instance.
"""
global sync_redis_manager
if config:
sync_redis_manager = SyncRedisManager(config)
sync_redis_manager.initialize()
return sync_redis_manager
async def init_redis_async(config: Optional[RedisConfig] = None) -> AsyncRedisManager:
"""
Initialize global async Redis manager.
Args:
config: Optional Redis configuration.
Returns:
AsyncRedisManager instance.
"""
global async_redis_manager
if config:
async_redis_manager = AsyncRedisManager(config)
await async_redis_manager.initialize()
return async_redis_manager
# Convenience functions for common operations
def publish_market_data(symbol: str, data: Dict[str, Any]) -> int:
"""Publish market data to symbol-specific channel."""
channel = sync_redis_manager.channels.get_symbol_channel(sync_redis_manager.channels.market_data_ohlcv, symbol)
return sync_redis_manager.publish(channel, data)
def publish_bot_signal(bot_id: int, signal_data: Dict[str, Any]) -> int:
"""Publish bot signal to bot-specific channel."""
channel = sync_redis_manager.channels.get_bot_channel(sync_redis_manager.channels.bot_signals, bot_id)
return sync_redis_manager.publish(channel, signal_data)
def publish_bot_trade(bot_id: int, trade_data: Dict[str, Any]) -> int:
"""Publish bot trade to bot-specific channel."""
channel = sync_redis_manager.channels.get_bot_channel(sync_redis_manager.channels.bot_trades, bot_id)
return sync_redis_manager.publish(channel, trade_data)
def publish_system_health(health_data: Dict[str, Any]) -> int:
"""Publish system health status."""
return sync_redis_manager.publish(sync_redis_manager.channels.system_health, health_data)
def publish_dashboard_update(update_data: Dict[str, Any]) -> int:
"""Publish dashboard update."""
return sync_redis_manager.publish(sync_redis_manager.channels.dashboard_updates, update_data)