TCPDashboard/tests/test_exchange_factory.py
Vasily.onl 4510181b39 Add OKX data collector implementation and modular exchange architecture
- Introduced the `OKXCollector` and `OKXWebSocketClient` classes for real-time market data collection from the OKX exchange.
- Implemented a factory pattern for creating exchange-specific collectors, enhancing modularity and scalability.
- Added configuration support for the OKX collector in `config/okx_config.json`.
- Updated documentation to reflect the new modular architecture and provide guidance on using the OKX collector.
- Created unit tests for the OKX collector and exchange factory to ensure functionality and reliability.
- Enhanced logging and error handling throughout the new implementation for improved monitoring and debugging.
2025-05-31 20:49:31 +08:00

126 lines
4.2 KiB
Python

#!/usr/bin/env python3
"""
Test script for exchange factory pattern.
This script demonstrates how to use the new exchange factory
to create collectors from different exchanges.
"""
import asyncio
import sys
from pathlib import Path
# Add project root to Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from data.exchanges import (
ExchangeFactory,
ExchangeCollectorConfig,
create_okx_collector,
get_supported_exchanges
)
from data.base_collector import DataType
from database.connection import init_database
from utils.logger import get_logger
async def test_factory_pattern():
"""Test the exchange factory pattern."""
logger = get_logger("factory_test", verbose=True)
try:
# Initialize database
logger.info("Initializing database...")
init_database()
# Test 1: Show supported exchanges
logger.info("=== Supported Exchanges ===")
supported = get_supported_exchanges()
logger.info(f"Supported exchanges: {supported}")
# Test 2: Create collector using factory
logger.info("=== Testing Exchange Factory ===")
config = ExchangeCollectorConfig(
exchange='okx',
symbol='BTC-USDT',
data_types=[DataType.TRADE, DataType.ORDERBOOK],
auto_restart=True,
health_check_interval=30.0,
store_raw_data=True
)
# Validate configuration
is_valid = ExchangeFactory.validate_config(config)
logger.info(f"Configuration valid: {is_valid}")
if is_valid:
# Create collector using factory
collector = ExchangeFactory.create_collector(config)
logger.info(f"Created collector: {type(collector).__name__}")
logger.info(f"Collector symbol: {collector.symbols}")
logger.info(f"Collector data types: {[dt.value for dt in collector.data_types]}")
# Test 3: Create collector using convenience function
logger.info("=== Testing Convenience Function ===")
okx_collector = create_okx_collector(
symbol='ETH-USDT',
data_types=[DataType.TRADE],
auto_restart=False
)
logger.info(f"Created OKX collector: {type(okx_collector).__name__}")
logger.info(f"OKX collector symbol: {okx_collector.symbols}")
# Test 4: Create multiple collectors
logger.info("=== Testing Multiple Collectors ===")
configs = [
ExchangeCollectorConfig('okx', 'BTC-USDT', [DataType.TRADE]),
ExchangeCollectorConfig('okx', 'ETH-USDT', [DataType.ORDERBOOK]),
ExchangeCollectorConfig('okx', 'SOL-USDT', [DataType.TRADE, DataType.ORDERBOOK])
]
collectors = ExchangeFactory.create_multiple_collectors(configs)
logger.info(f"Created {len(collectors)} collectors:")
for i, collector in enumerate(collectors):
logger.info(f" {i+1}. {type(collector).__name__} - {collector.symbols}")
# Test 5: Get exchange capabilities
logger.info("=== Exchange Capabilities ===")
okx_pairs = ExchangeFactory.get_supported_pairs('okx')
okx_data_types = ExchangeFactory.get_supported_data_types('okx')
logger.info(f"OKX supported pairs: {okx_pairs}")
logger.info(f"OKX supported data types: {okx_data_types}")
logger.info("All factory tests completed successfully!")
return True
except Exception as e:
logger.error(f"Factory test failed: {e}")
return False
async def main():
"""Main test function."""
logger = get_logger("main", verbose=True)
logger.info("Testing exchange factory pattern...")
success = await test_factory_pattern()
if success:
logger.info("Factory tests completed successfully!")
else:
logger.error("Factory tests failed!")
return success
if __name__ == "__main__":
try:
success = asyncio.run(main())
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\nTest interrupted by user")
sys.exit(1)
except Exception as e:
print(f"Test failed with error: {e}")
sys.exit(1)