Add Redis connection utility for pub/sub messaging
- Introduced `database/redis_manager.py` to manage Redis connections, including synchronous and asynchronous clients. - Implemented pub/sub messaging capabilities for real-time data distribution, with structured channel definitions for market data, bot signals, and system health. - Added configuration options for Redis connection pooling and error handling, ensuring robust integration with the Crypto Trading Bot Platform.
This commit is contained in:
parent
73b7e8bb9d
commit
dd75546508
476
database/redis_manager.py
Normal file
476
database/redis_manager.py
Normal file
@ -0,0 +1,476 @@
|
||||
"""
|
||||
Redis Manager for Crypto Trading Bot Platform
|
||||
Provides Redis connection, pub/sub messaging, and caching utilities
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
import asyncio
|
||||
from typing import Optional, Dict, Any, List, Callable, Union
|
||||
from pathlib import Path
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
# Load environment variables from .env file if it exists
|
||||
try:
|
||||
from dotenv import load_dotenv
|
||||
env_file = Path(__file__).parent.parent / '.env'
|
||||
if env_file.exists():
|
||||
load_dotenv(env_file)
|
||||
except ImportError:
|
||||
# dotenv not available, proceed without it
|
||||
pass
|
||||
|
||||
import redis
|
||||
import redis.asyncio as redis_async
|
||||
from redis.exceptions import ConnectionError, TimeoutError, RedisError
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RedisConfig:
|
||||
"""Redis configuration class"""
|
||||
|
||||
def __init__(self):
|
||||
self.host = os.getenv('REDIS_HOST', 'localhost')
|
||||
self.port = int(os.getenv('REDIS_PORT', '6379'))
|
||||
self.password = os.getenv('REDIS_PASSWORD', '')
|
||||
self.db = int(os.getenv('REDIS_DB', '0'))
|
||||
|
||||
# Connection settings
|
||||
self.socket_timeout = int(os.getenv('REDIS_SOCKET_TIMEOUT', '5'))
|
||||
self.socket_connect_timeout = int(os.getenv('REDIS_CONNECT_TIMEOUT', '5'))
|
||||
self.socket_keepalive = os.getenv('REDIS_KEEPALIVE', 'true').lower() == 'true'
|
||||
self.socket_keepalive_options = {}
|
||||
|
||||
# Pool settings
|
||||
self.max_connections = int(os.getenv('REDIS_MAX_CONNECTIONS', '20'))
|
||||
self.retry_on_timeout = os.getenv('REDIS_RETRY_ON_TIMEOUT', 'true').lower() == 'true'
|
||||
|
||||
# Channel prefixes for organization
|
||||
self.channel_prefix = os.getenv('REDIS_CHANNEL_PREFIX', 'crypto_bot')
|
||||
|
||||
logger.info(f"Redis configuration initialized for: {self.host}:{self.port}")
|
||||
|
||||
def get_connection_kwargs(self) -> Dict[str, Any]:
|
||||
"""Get Redis connection configuration"""
|
||||
kwargs = {
|
||||
'host': self.host,
|
||||
'port': self.port,
|
||||
'db': self.db,
|
||||
'socket_timeout': self.socket_timeout,
|
||||
'socket_connect_timeout': self.socket_connect_timeout,
|
||||
'socket_keepalive': self.socket_keepalive,
|
||||
'socket_keepalive_options': self.socket_keepalive_options,
|
||||
'retry_on_timeout': self.retry_on_timeout,
|
||||
'decode_responses': True, # Automatically decode responses to strings
|
||||
}
|
||||
|
||||
if self.password:
|
||||
kwargs['password'] = self.password
|
||||
|
||||
return kwargs
|
||||
|
||||
def get_pool_kwargs(self) -> Dict[str, Any]:
|
||||
"""Get Redis connection pool configuration"""
|
||||
kwargs = self.get_connection_kwargs()
|
||||
kwargs['max_connections'] = self.max_connections
|
||||
return kwargs
|
||||
|
||||
|
||||
class RedisChannels:
|
||||
"""Redis channel definitions for organized messaging"""
|
||||
|
||||
def __init__(self, prefix: str = 'crypto_bot'):
|
||||
self.prefix = prefix
|
||||
|
||||
# Market data channels
|
||||
self.market_data = f"{prefix}:market_data"
|
||||
self.market_data_raw = f"{prefix}:market_data:raw"
|
||||
self.market_data_ohlcv = f"{prefix}:market_data:ohlcv"
|
||||
|
||||
# Bot channels
|
||||
self.bot_signals = f"{prefix}:bot:signals"
|
||||
self.bot_trades = f"{prefix}:bot:trades"
|
||||
self.bot_status = f"{prefix}:bot:status"
|
||||
self.bot_performance = f"{prefix}:bot:performance"
|
||||
|
||||
# System channels
|
||||
self.system_health = f"{prefix}:system:health"
|
||||
self.system_alerts = f"{prefix}:system:alerts"
|
||||
|
||||
# Dashboard channels
|
||||
self.dashboard_updates = f"{prefix}:dashboard:updates"
|
||||
self.dashboard_commands = f"{prefix}:dashboard:commands"
|
||||
|
||||
def get_symbol_channel(self, base_channel: str, symbol: str) -> str:
|
||||
"""Get symbol-specific channel"""
|
||||
return f"{base_channel}:{symbol}"
|
||||
|
||||
def get_bot_channel(self, base_channel: str, bot_id: int) -> str:
|
||||
"""Get bot-specific channel"""
|
||||
return f"{base_channel}:{bot_id}"
|
||||
|
||||
|
||||
class RedisManager:
|
||||
"""
|
||||
Redis manager with connection pooling and pub/sub messaging
|
||||
"""
|
||||
|
||||
def __init__(self, config: Optional[RedisConfig] = None):
|
||||
self.config = config or RedisConfig()
|
||||
self.channels = RedisChannels(self.config.channel_prefix)
|
||||
|
||||
# Synchronous Redis client
|
||||
self._redis_client: Optional[redis.Redis] = None
|
||||
self._connection_pool: Optional[redis.ConnectionPool] = None
|
||||
|
||||
# Asynchronous Redis client
|
||||
self._async_redis_client: Optional[redis_async.Redis] = None
|
||||
self._async_connection_pool: Optional[redis_async.ConnectionPool] = None
|
||||
|
||||
# Pub/sub clients
|
||||
self._pubsub_client: Optional[redis.client.PubSub] = None
|
||||
self._async_pubsub_client: Optional[redis_async.client.PubSub] = None
|
||||
|
||||
# Subscription handlers
|
||||
self._message_handlers: Dict[str, List[Callable]] = {}
|
||||
self._async_message_handlers: Dict[str, List[Callable]] = {}
|
||||
|
||||
def initialize(self) -> None:
|
||||
"""Initialize Redis connections"""
|
||||
try:
|
||||
logger.info("Initializing Redis connection...")
|
||||
|
||||
# Create connection pool
|
||||
self._connection_pool = redis.ConnectionPool(**self.config.get_pool_kwargs())
|
||||
self._redis_client = redis.Redis(connection_pool=self._connection_pool)
|
||||
|
||||
# Test connection
|
||||
self._redis_client.ping()
|
||||
logger.info("Redis connection initialized successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize Redis: {e}")
|
||||
raise
|
||||
|
||||
async def initialize_async(self) -> None:
|
||||
"""Initialize async Redis connections"""
|
||||
try:
|
||||
logger.info("Initializing async Redis connection...")
|
||||
|
||||
# Create async connection pool
|
||||
self._async_connection_pool = redis_async.ConnectionPool(**self.config.get_pool_kwargs())
|
||||
self._async_redis_client = redis_async.Redis(connection_pool=self._async_connection_pool)
|
||||
|
||||
# Test connection
|
||||
await self._async_redis_client.ping()
|
||||
logger.info("Async Redis connection initialized successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize async Redis: {e}")
|
||||
raise
|
||||
|
||||
@property
|
||||
def client(self) -> redis.Redis:
|
||||
"""Get synchronous Redis client"""
|
||||
if not self._redis_client:
|
||||
raise RuntimeError("Redis not initialized. Call initialize() first.")
|
||||
return self._redis_client
|
||||
|
||||
@property
|
||||
def async_client(self) -> redis_async.Redis:
|
||||
"""Get asynchronous Redis client"""
|
||||
if not self._async_redis_client:
|
||||
raise RuntimeError("Async Redis not initialized. Call initialize_async() first.")
|
||||
return self._async_redis_client
|
||||
|
||||
def test_connection(self) -> bool:
|
||||
"""Test Redis connection"""
|
||||
try:
|
||||
self.client.ping()
|
||||
logger.info("Redis connection test successful")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Redis connection test failed: {e}")
|
||||
return False
|
||||
|
||||
async def test_connection_async(self) -> bool:
|
||||
"""Test async Redis connection"""
|
||||
try:
|
||||
await self.async_client.ping()
|
||||
logger.info("Async Redis connection test successful")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Async Redis connection test failed: {e}")
|
||||
return False
|
||||
|
||||
def publish(self, channel: str, message: Union[str, Dict[str, Any]]) -> int:
|
||||
"""
|
||||
Publish message to channel
|
||||
|
||||
Args:
|
||||
channel: Redis channel name
|
||||
message: Message to publish (string or dict that will be JSON serialized)
|
||||
|
||||
Returns:
|
||||
Number of clients that received the message
|
||||
"""
|
||||
try:
|
||||
if isinstance(message, dict):
|
||||
message = json.dumps(message, default=str)
|
||||
|
||||
result = self.client.publish(channel, message)
|
||||
logger.debug(f"Published message to {channel}: {result} clients received")
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to publish message to {channel}: {e}")
|
||||
raise
|
||||
|
||||
async def publish_async(self, channel: str, message: Union[str, Dict[str, Any]]) -> int:
|
||||
"""
|
||||
Publish message to channel (async)
|
||||
|
||||
Args:
|
||||
channel: Redis channel name
|
||||
message: Message to publish (string or dict that will be JSON serialized)
|
||||
|
||||
Returns:
|
||||
Number of clients that received the message
|
||||
"""
|
||||
try:
|
||||
if isinstance(message, dict):
|
||||
message = json.dumps(message, default=str)
|
||||
|
||||
result = await self.async_client.publish(channel, message)
|
||||
logger.debug(f"Published message to {channel}: {result} clients received")
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to publish message to {channel}: {e}")
|
||||
raise
|
||||
|
||||
def subscribe(self, channels: Union[str, List[str]], handler: Callable[[str, str], None]) -> None:
|
||||
"""
|
||||
Subscribe to Redis channels with message handler
|
||||
|
||||
Args:
|
||||
channels: Channel name or list of channel names
|
||||
handler: Function to handle received messages (channel, message)
|
||||
"""
|
||||
if isinstance(channels, str):
|
||||
channels = [channels]
|
||||
|
||||
for channel in channels:
|
||||
if channel not in self._message_handlers:
|
||||
self._message_handlers[channel] = []
|
||||
self._message_handlers[channel].append(handler)
|
||||
|
||||
logger.info(f"Registered handler for channels: {channels}")
|
||||
|
||||
async def subscribe_async(self, channels: Union[str, List[str]], handler: Callable[[str, str], None]) -> None:
|
||||
"""
|
||||
Subscribe to Redis channels with message handler (async)
|
||||
|
||||
Args:
|
||||
channels: Channel name or list of channel names
|
||||
handler: Function to handle received messages (channel, message)
|
||||
"""
|
||||
if isinstance(channels, str):
|
||||
channels = [channels]
|
||||
|
||||
for channel in channels:
|
||||
if channel not in self._async_message_handlers:
|
||||
self._async_message_handlers[channel] = []
|
||||
self._async_message_handlers[channel].append(handler)
|
||||
|
||||
logger.info(f"Registered async handler for channels: {channels}")
|
||||
|
||||
def start_subscriber(self) -> None:
|
||||
"""Start synchronous message subscriber"""
|
||||
if not self._message_handlers:
|
||||
logger.warning("No message handlers registered")
|
||||
return
|
||||
|
||||
try:
|
||||
self._pubsub_client = self.client.pubsub()
|
||||
|
||||
# Subscribe to all channels with handlers
|
||||
for channel in self._message_handlers.keys():
|
||||
self._pubsub_client.subscribe(channel)
|
||||
|
||||
logger.info(f"Started subscriber for channels: {list(self._message_handlers.keys())}")
|
||||
|
||||
# Message processing loop
|
||||
for message in self._pubsub_client.listen():
|
||||
if message['type'] == 'message':
|
||||
channel = message['channel']
|
||||
data = message['data']
|
||||
|
||||
# Call all handlers for this channel
|
||||
if channel in self._message_handlers:
|
||||
for handler in self._message_handlers[channel]:
|
||||
try:
|
||||
handler(channel, data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in message handler for {channel}: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in message subscriber: {e}")
|
||||
raise
|
||||
|
||||
async def start_subscriber_async(self) -> None:
|
||||
"""Start asynchronous message subscriber"""
|
||||
if not self._async_message_handlers:
|
||||
logger.warning("No async message handlers registered")
|
||||
return
|
||||
|
||||
try:
|
||||
self._async_pubsub_client = self.async_client.pubsub()
|
||||
|
||||
# Subscribe to all channels with handlers
|
||||
for channel in self._async_message_handlers.keys():
|
||||
await self._async_pubsub_client.subscribe(channel)
|
||||
|
||||
logger.info(f"Started async subscriber for channels: {list(self._async_message_handlers.keys())}")
|
||||
|
||||
# Message processing loop
|
||||
async for message in self._async_pubsub_client.listen():
|
||||
if message['type'] == 'message':
|
||||
channel = message['channel']
|
||||
data = message['data']
|
||||
|
||||
# Call all handlers for this channel
|
||||
if channel in self._async_message_handlers:
|
||||
for handler in self._async_message_handlers[channel]:
|
||||
try:
|
||||
if asyncio.iscoroutinefunction(handler):
|
||||
await handler(channel, data)
|
||||
else:
|
||||
handler(channel, data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in async message handler for {channel}: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in async message subscriber: {e}")
|
||||
raise
|
||||
|
||||
def stop_subscriber(self) -> None:
|
||||
"""Stop synchronous message subscriber"""
|
||||
if self._pubsub_client:
|
||||
self._pubsub_client.close()
|
||||
self._pubsub_client = None
|
||||
logger.info("Stopped message subscriber")
|
||||
|
||||
async def stop_subscriber_async(self) -> None:
|
||||
"""Stop asynchronous message subscriber"""
|
||||
if self._async_pubsub_client:
|
||||
await self._async_pubsub_client.close()
|
||||
self._async_pubsub_client = None
|
||||
logger.info("Stopped async message subscriber")
|
||||
|
||||
def get_info(self) -> Dict[str, Any]:
|
||||
"""Get Redis server information"""
|
||||
try:
|
||||
return self.client.info()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get Redis info: {e}")
|
||||
return {}
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close Redis connections"""
|
||||
try:
|
||||
self.stop_subscriber()
|
||||
|
||||
if self._connection_pool:
|
||||
self._connection_pool.disconnect()
|
||||
|
||||
logger.info("Redis connections closed")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing Redis connections: {e}")
|
||||
|
||||
async def close_async(self) -> None:
|
||||
"""Close async Redis connections"""
|
||||
try:
|
||||
await self.stop_subscriber_async()
|
||||
|
||||
if self._async_connection_pool:
|
||||
await self._async_connection_pool.disconnect()
|
||||
|
||||
logger.info("Async Redis connections closed")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing async Redis connections: {e}")
|
||||
|
||||
|
||||
# Global Redis manager instance
|
||||
redis_manager = RedisManager()
|
||||
|
||||
|
||||
def get_redis_manager() -> RedisManager:
|
||||
"""Get global Redis manager instance"""
|
||||
return redis_manager
|
||||
|
||||
|
||||
def init_redis(config: Optional[RedisConfig] = None) -> RedisManager:
|
||||
"""
|
||||
Initialize global Redis manager
|
||||
|
||||
Args:
|
||||
config: Optional Redis configuration
|
||||
|
||||
Returns:
|
||||
RedisManager instance
|
||||
"""
|
||||
global redis_manager
|
||||
if config:
|
||||
redis_manager = RedisManager(config)
|
||||
redis_manager.initialize()
|
||||
return redis_manager
|
||||
|
||||
|
||||
async def init_redis_async(config: Optional[RedisConfig] = None) -> RedisManager:
|
||||
"""
|
||||
Initialize global Redis manager (async)
|
||||
|
||||
Args:
|
||||
config: Optional Redis configuration
|
||||
|
||||
Returns:
|
||||
RedisManager instance
|
||||
"""
|
||||
global redis_manager
|
||||
if config:
|
||||
redis_manager = RedisManager(config)
|
||||
await redis_manager.initialize_async()
|
||||
return redis_manager
|
||||
|
||||
|
||||
# Convenience functions for common operations
|
||||
def publish_market_data(symbol: str, data: Dict[str, Any]) -> int:
|
||||
"""Publish market data to symbol-specific channel"""
|
||||
channel = redis_manager.channels.get_symbol_channel(redis_manager.channels.market_data_ohlcv, symbol)
|
||||
return redis_manager.publish(channel, data)
|
||||
|
||||
|
||||
def publish_bot_signal(bot_id: int, signal_data: Dict[str, Any]) -> int:
|
||||
"""Publish bot signal to bot-specific channel"""
|
||||
channel = redis_manager.channels.get_bot_channel(redis_manager.channels.bot_signals, bot_id)
|
||||
return redis_manager.publish(channel, signal_data)
|
||||
|
||||
|
||||
def publish_bot_trade(bot_id: int, trade_data: Dict[str, Any]) -> int:
|
||||
"""Publish bot trade to bot-specific channel"""
|
||||
channel = redis_manager.channels.get_bot_channel(redis_manager.channels.bot_trades, bot_id)
|
||||
return redis_manager.publish(channel, trade_data)
|
||||
|
||||
|
||||
def publish_system_health(health_data: Dict[str, Any]) -> int:
|
||||
"""Publish system health status"""
|
||||
return redis_manager.publish(redis_manager.channels.system_health, health_data)
|
||||
|
||||
|
||||
def publish_dashboard_update(update_data: Dict[str, Any]) -> int:
|
||||
"""Publish dashboard update"""
|
||||
return redis_manager.publish(redis_manager.channels.dashboard_updates, update_data)
|
||||
@ -6,6 +6,7 @@
|
||||
- `database/schema_clean.sql` - Clean database schema without hypertables (actively used, includes raw_trades table)
|
||||
- `database/schema.sql` - Complete database schema with TimescaleDB hypertables (for future optimization)
|
||||
- `database/connection.py` - Database connection utility with connection pooling, session management, and raw data utilities
|
||||
- `database/redis_manager.py` - Redis connection utility with pub/sub messaging for real-time data distribution
|
||||
- `database/init/init.sql` - Docker initialization script for automatic database setup
|
||||
- `database/init/schema_clean.sql` - Copy of clean schema for Docker initialization
|
||||
- `data/okx_collector.py` - OKX API integration for real-time market data collection
|
||||
@ -21,6 +22,7 @@
|
||||
- `config/settings.py` - Centralized configuration settings using Pydantic
|
||||
- `scripts/dev.py` - Development setup and management script
|
||||
- `scripts/init_database.py` - Database initialization and verification script
|
||||
- `scripts/test_models.py` - Test script for SQLAlchemy models integration verification
|
||||
- `requirements.txt` - Python dependencies managed by UV
|
||||
- `docker-compose.yml` - Docker services configuration with TimescaleDB support
|
||||
- `tests/test_strategies.py` - Unit tests for strategy implementations
|
||||
@ -34,9 +36,9 @@
|
||||
- [x] 1.1 Install and configure PostgreSQL with Docker
|
||||
- [x] 1.2 Create database schema following the PRD specifications (market_data, bots, signals, trades, bot_performance tables)
|
||||
- [x] 1.3 Implement database connection utility with connection pooling
|
||||
- [ ] 1.4 Create database models using SQLAlchemy or similar ORM
|
||||
- [x] 1.4 Create database models using SQLAlchemy or similar ORM
|
||||
- [x] 1.5 Add proper indexes for time-series data optimization
|
||||
- [ ] 1.6 Setup Redis for pub/sub messaging
|
||||
- [x] 1.6 Setup Redis for pub/sub messaging
|
||||
- [ ] 1.7 Create database migration scripts and initial data seeding
|
||||
- [ ] 1.8 Unit test database models and connection utilities
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user