TCPDashboard/tests/database/test_redis_manager.py
Vasily.onl fe9d8e75ed Refactor Redis management and enhance system health callbacks
- Replaced the `RedisManager` class with a more modular `SyncRedisManager` and `AsyncRedisManager`, improving the separation of synchronous and asynchronous operations.
- Updated the `system_health.py` callbacks to utilize the new `get_sync_redis_manager` function for Redis interactions, simplifying the connection process.
- Enhanced error handling and logging in Redis status checks, providing clearer feedback on connection issues.
- Revised the setup documentation to reflect changes in Redis connection testing, ensuring clarity for users.

These updates improve the maintainability and reliability of Redis interactions within the system, aligning with best practices for modular design.
2025-06-07 00:27:17 +08:00

108 lines
3.9 KiB
Python

import asyncio
import unittest
from unittest.mock import MagicMock, AsyncMock, patch
from database.redis_manager import (
RedisConfig,
SyncRedisManager,
AsyncRedisManager,
publish_market_data,
get_sync_redis_manager
)
class TestRedisManagers(unittest.TestCase):
def setUp(self):
"""Set up mock configs and managers for each test."""
self.config = RedisConfig()
@patch('redis.Redis')
@patch('redis.ConnectionPool')
def test_sync_manager_initialization(self, mock_pool, mock_redis):
"""Test that SyncRedisManager initializes correctly."""
mock_redis_instance = mock_redis.return_value
manager = SyncRedisManager(self.config)
manager.initialize()
mock_pool.assert_called_once_with(**self.config.get_pool_kwargs())
mock_redis.assert_called_once_with(connection_pool=mock_pool.return_value)
mock_redis_instance.ping.assert_called_once()
self.assertIsNotNone(manager.client)
@patch('redis.asyncio.Redis')
@patch('redis.asyncio.ConnectionPool')
def test_async_manager_initialization(self, mock_pool, mock_redis_class):
"""Test that AsyncRedisManager initializes correctly."""
async def run_test():
mock_redis_instance = AsyncMock()
mock_redis_class.return_value = mock_redis_instance
manager = AsyncRedisManager(self.config)
await manager.initialize()
mock_pool.assert_called_once_with(**self.config.get_pool_kwargs())
mock_redis_class.assert_called_once_with(connection_pool=mock_pool.return_value)
mock_redis_instance.ping.assert_awaited_once()
self.assertIsNotNone(manager.async_client)
asyncio.run(run_test())
def test_sync_caching(self):
"""Test set, get, and delete operations for SyncRedisManager."""
manager = SyncRedisManager(self.config)
manager._redis_client = MagicMock()
# Test set
manager.set("key1", {"data": "value1"}, ex=60)
manager.client.set.assert_called_once_with("key1", '{"data": "value1"}', ex=60)
# Test get
manager.client.get.return_value = '{"data": "value1"}'
result = manager.get("key1")
self.assertEqual(result, {"data": "value1"})
# Test delete
manager.delete("key1")
manager.client.delete.assert_called_once_with("key1")
def test_async_caching(self):
"""Test async set, get, and delete for AsyncRedisManager."""
async def run_test():
manager = AsyncRedisManager(self.config)
manager._async_redis_client = AsyncMock()
# Test set
await manager.set("key2", "value2", ex=30)
manager.async_client.set.assert_awaited_once_with("key2", '"value2"', ex=30)
# Test get
manager.async_client.get.return_value = '"value2"'
result = await manager.get("key2")
self.assertEqual(result, "value2")
# Test delete
await manager.delete("key2")
manager.async_client.delete.assert_awaited_once_with("key2")
asyncio.run(run_test())
@patch('database.redis_manager.sync_redis_manager', new_callable=MagicMock)
def test_publish_market_data_convenience_func(self, mock_global_manager):
"""Test the publish_market_data convenience function."""
symbol = "BTC/USDT"
data = {"price": 100}
# This setup is needed because the global manager is patched
mock_global_manager.channels = get_sync_redis_manager().channels
publish_market_data(symbol, data)
expected_channel = mock_global_manager.channels.get_symbol_channel(
mock_global_manager.channels.market_data_ohlcv, symbol
)
mock_global_manager.publish.assert_called_once_with(expected_channel, data)
if __name__ == '__main__':
unittest.main()