diff --git a/data/common/validation.py b/data/common/validation.py deleted file mode 100644 index e820ea8..0000000 --- a/data/common/validation.py +++ /dev/null @@ -1,486 +0,0 @@ -""" -Base validation utilities for all exchanges. - -This module provides common validation patterns and base classes -that can be extended by exchange-specific validators. -""" - -import re -from datetime import datetime, timezone, timedelta -from decimal import Decimal, InvalidOperation -from typing import Dict, List, Optional, Any, Union, Pattern -from abc import ABC, abstractmethod - -from .data_types import DataValidationResult, StandardizedTrade, TradeSide - - -class ValidationResult: - """Simple validation result for individual field validation.""" - - def __init__(self, is_valid: bool, errors: List[str] = None, warnings: List[str] = None, sanitized_data: Any = None): - self.is_valid = is_valid - self.errors = errors or [] - self.warnings = warnings or [] - self.sanitized_data = sanitized_data - - -class BaseDataValidator(ABC): - """ - Abstract base class for exchange data validators. - - This class provides common validation patterns and utilities - that can be reused across different exchange implementations. - """ - - def __init__(self, - exchange_name: str, - component_name: str = "base_data_validator", - logger = None): - """ - Initialize base data validator. - - Args: - exchange_name: Name of the exchange (e.g., 'okx', 'binance') - component_name: Name for logging - logger: Logger instance. If None, no logging will be performed. - """ - self.exchange_name = exchange_name - self.component_name = component_name - self.logger = logger - - # Common validation patterns - self._numeric_pattern = re.compile(r'^-?\d*\.?\d+$') - self._trade_id_pattern = re.compile(r'^[a-zA-Z0-9_-]+$') # Flexible pattern - - # Valid trade sides - self._valid_trade_sides = {'buy', 'sell'} - - # Common price and size limits (can be overridden by subclasses) - self._min_price = Decimal('0.00000001') # 1 satoshi equivalent - self._max_price = Decimal('10000000') # 10 million - self._min_size = Decimal('0.00000001') # Minimum trade size - self._max_size = Decimal('1000000000') # 1 billion max size - - # Timestamp validation (milliseconds since epoch) - self._min_timestamp = 1000000000000 # 2001-09-09 (reasonable minimum) - self._max_timestamp = 9999999999999 # 2286-11-20 (reasonable maximum) - - if self.logger: - self.logger.debug(f"{self.component_name}: Initialized {exchange_name} data validator") - - # Abstract methods that must be implemented by subclasses - - @abstractmethod - def validate_symbol_format(self, symbol: str) -> ValidationResult: - """Validate exchange-specific symbol format.""" - pass - - @abstractmethod - def validate_websocket_message(self, message: Dict[str, Any]) -> DataValidationResult: - """Validate complete WebSocket message structure.""" - pass - - # Common validation methods available to all subclasses - - def validate_price(self, price: Union[str, int, float, Decimal]) -> ValidationResult: - """ - Validate price value with common rules. - - Args: - price: Price value to validate - - Returns: - ValidationResult with sanitized decimal price - """ - errors = [] - warnings = [] - sanitized_data = None - - try: - # Convert to Decimal for precise validation - if isinstance(price, str) and price.strip() == "": - errors.append("Empty price string") - return ValidationResult(False, errors, warnings) - - decimal_price = Decimal(str(price)) - sanitized_data = decimal_price - - # Check for negative prices - if decimal_price <= 0: - errors.append(f"Price must be positive, got {decimal_price}") - - # Check price bounds - if decimal_price < self._min_price: - warnings.append(f"Price {decimal_price} below minimum {self._min_price}") - elif decimal_price > self._max_price: - warnings.append(f"Price {decimal_price} above maximum {self._max_price}") - - # Check for excessive decimal places (warn only) - if decimal_price.as_tuple().exponent < -12: - warnings.append(f"Price has excessive decimal precision: {decimal_price}") - - except (InvalidOperation, ValueError, TypeError) as e: - errors.append(f"Invalid price value: {price} - {str(e)}") - - return ValidationResult(len(errors) == 0, errors, warnings, sanitized_data) - - def validate_size(self, size: Union[str, int, float, Decimal]) -> ValidationResult: - """ - Validate size/quantity value with common rules. - - Args: - size: Size value to validate - - Returns: - ValidationResult with sanitized decimal size - """ - errors = [] - warnings = [] - sanitized_data = None - - try: - # Convert to Decimal for precise validation - if isinstance(size, str) and size.strip() == "": - errors.append("Empty size string") - return ValidationResult(False, errors, warnings) - - decimal_size = Decimal(str(size)) - sanitized_data = decimal_size - - # Check for negative or zero sizes - if decimal_size <= 0: - errors.append(f"Size must be positive, got {decimal_size}") - - # Check size bounds - if decimal_size < self._min_size: - warnings.append(f"Size {decimal_size} below minimum {self._min_size}") - elif decimal_size > self._max_size: - warnings.append(f"Size {decimal_size} above maximum {self._max_size}") - - except (InvalidOperation, ValueError, TypeError) as e: - errors.append(f"Invalid size value: {size} - {str(e)}") - - return ValidationResult(len(errors) == 0, errors, warnings, sanitized_data) - - def validate_volume(self, volume: Union[str, int, float, Decimal]) -> ValidationResult: - """ - Validate volume value with common rules. - - Args: - volume: Volume value to validate - - Returns: - ValidationResult - """ - errors = [] - warnings = [] - - try: - decimal_volume = Decimal(str(volume)) - - # Volume can be zero (no trades in period) - if decimal_volume < 0: - errors.append(f"Volume cannot be negative, got {decimal_volume}") - - except (InvalidOperation, ValueError, TypeError) as e: - errors.append(f"Invalid volume value: {volume} - {str(e)}") - - return ValidationResult(len(errors) == 0, errors, warnings) - - def validate_trade_side(self, side: str) -> ValidationResult: - """ - Validate trade side with common rules. - - Args: - side: Trade side string - - Returns: - ValidationResult - """ - errors = [] - warnings = [] - - if not isinstance(side, str): - errors.append(f"Trade side must be string, got {type(side)}") - return ValidationResult(False, errors, warnings) - - normalized_side = side.lower() - if normalized_side not in self._valid_trade_sides: - errors.append(f"Invalid trade side: {side}. Must be 'buy' or 'sell'") - - return ValidationResult(len(errors) == 0, errors, warnings) - - def validate_timestamp(self, timestamp: Union[str, int], is_milliseconds: bool = True) -> ValidationResult: - """ - Validate timestamp value with common rules. - - Args: - timestamp: Timestamp value to validate - is_milliseconds: True if timestamp is in milliseconds, False for seconds - - Returns: - ValidationResult - """ - errors = [] - warnings = [] - - try: - # Convert to int - if isinstance(timestamp, str): - if not timestamp.isdigit(): - errors.append(f"Invalid timestamp format: {timestamp}") - return ValidationResult(False, errors, warnings) - timestamp_int = int(timestamp) - elif isinstance(timestamp, int): - timestamp_int = timestamp - else: - errors.append(f"Timestamp must be string or int, got {type(timestamp)}") - return ValidationResult(False, errors, warnings) - - # Convert to milliseconds if needed - if not is_milliseconds: - timestamp_int = timestamp_int * 1000 - - # Check timestamp bounds - if timestamp_int < self._min_timestamp: - errors.append(f"Timestamp {timestamp_int} too old") - elif timestamp_int > self._max_timestamp: - errors.append(f"Timestamp {timestamp_int} too far in future") - - # Check if timestamp is reasonable (within last year to next year) - current_time_ms = int(datetime.now(timezone.utc).timestamp() * 1000) - one_year_ms = 365 * 24 * 60 * 60 * 1000 - - if timestamp_int < (current_time_ms - one_year_ms): - warnings.append(f"Timestamp {timestamp_int} is older than 1 year") - elif timestamp_int > (current_time_ms + one_year_ms): - warnings.append(f"Timestamp {timestamp_int} is more than 1 year in future") - - except (ValueError, TypeError) as e: - errors.append(f"Invalid timestamp: {timestamp} - {str(e)}") - - return ValidationResult(len(errors) == 0, errors, warnings) - - def validate_trade_id(self, trade_id: Union[str, int]) -> ValidationResult: - """ - Validate trade ID with flexible rules. - - Args: - trade_id: Trade ID to validate - - Returns: - ValidationResult - """ - errors = [] - warnings = [] - - if isinstance(trade_id, int): - trade_id = str(trade_id) - - if not isinstance(trade_id, str): - errors.append(f"Trade ID must be string or int, got {type(trade_id)}") - return ValidationResult(False, errors, warnings) - - if not trade_id.strip(): - errors.append("Trade ID cannot be empty") - return ValidationResult(False, errors, warnings) - - # Flexible validation - allow alphanumeric, underscore, hyphen - if not self._trade_id_pattern.match(trade_id): - warnings.append(f"Trade ID has unusual format: {trade_id}") - - return ValidationResult(len(errors) == 0, errors, warnings) - - def validate_symbol_match(self, symbol: str, expected_symbol: Optional[str] = None) -> ValidationResult: - """ - Validate symbol matches expected value. - - Args: - symbol: Symbol to validate - expected_symbol: Expected symbol value - - Returns: - ValidationResult - """ - errors = [] - warnings = [] - - if not isinstance(symbol, str): - errors.append(f"Symbol must be string, got {type(symbol)}") - return ValidationResult(False, errors, warnings) - - if expected_symbol and symbol != expected_symbol: - warnings.append(f"Symbol mismatch: expected {expected_symbol}, got {symbol}") - - return ValidationResult(len(errors) == 0, errors, warnings) - - def validate_orderbook_side(self, side_data: List[List[str]], side_name: str) -> ValidationResult: - """ - Validate orderbook side (asks or bids) with common rules. - - Args: - side_data: List of price/size pairs - side_name: Name of side for error messages - - Returns: - ValidationResult with sanitized data - """ - errors = [] - warnings = [] - sanitized_data = [] - - if not isinstance(side_data, list): - errors.append(f"{side_name} must be a list") - return ValidationResult(False, errors, warnings) - - for i, level in enumerate(side_data): - if not isinstance(level, list) or len(level) < 2: - errors.append(f"{side_name}[{i}] must be a list with at least 2 elements") - continue - - # Validate price and size - price_result = self.validate_price(level[0]) - size_result = self.validate_size(level[1]) - - if not price_result.is_valid: - errors.extend([f"{side_name}[{i}] price: {error}" for error in price_result.errors]) - if not size_result.is_valid: - errors.extend([f"{side_name}[{i}] size: {error}" for error in size_result.errors]) - - # Add sanitized level - if price_result.is_valid and size_result.is_valid: - sanitized_level = [str(price_result.sanitized_data), str(size_result.sanitized_data)] - # Include additional fields if present - if len(level) > 2: - sanitized_level.extend(level[2:]) - sanitized_data.append(sanitized_level) - - return ValidationResult(len(errors) == 0, errors, warnings, sanitized_data) - - def validate_standardized_trade(self, trade: StandardizedTrade) -> DataValidationResult: - """ - Validate a standardized trade object. - - Args: - trade: StandardizedTrade object to validate - - Returns: - DataValidationResult - """ - errors = [] - warnings = [] - - try: - # Validate price - price_result = self.validate_price(trade.price) - if not price_result.is_valid: - errors.extend([f"price: {error}" for error in price_result.errors]) - warnings.extend([f"price: {warning}" for warning in price_result.warnings]) - - # Validate size - size_result = self.validate_size(trade.size) - if not size_result.is_valid: - errors.extend([f"size: {error}" for error in size_result.errors]) - warnings.extend([f"size: {warning}" for warning in size_result.warnings]) - - # Validate side - side_result = self.validate_trade_side(trade.side) - if not side_result.is_valid: - errors.extend([f"side: {error}" for error in side_result.errors]) - - # Validate trade ID - trade_id_result = self.validate_trade_id(trade.trade_id) - if not trade_id_result.is_valid: - errors.extend([f"trade_id: {error}" for error in trade_id_result.errors]) - warnings.extend([f"trade_id: {warning}" for warning in trade_id_result.warnings]) - - # Validate symbol format (exchange-specific) - symbol_result = self.validate_symbol_format(trade.symbol) - if not symbol_result.is_valid: - errors.extend([f"symbol: {error}" for error in symbol_result.errors]) - warnings.extend([f"symbol: {warning}" for warning in symbol_result.warnings]) - - # Validate timestamp - timestamp_ms = int(trade.timestamp.timestamp() * 1000) - timestamp_result = self.validate_timestamp(timestamp_ms, is_milliseconds=True) - if not timestamp_result.is_valid: - errors.extend([f"timestamp: {error}" for error in timestamp_result.errors]) - warnings.extend([f"timestamp: {warning}" for warning in timestamp_result.warnings]) - - return DataValidationResult(len(errors) == 0, errors, warnings) - - except Exception as e: - errors.append(f"Exception during trade validation: {str(e)}") - return DataValidationResult(False, errors, warnings) - - def get_validator_info(self) -> Dict[str, Any]: - """Get validator configuration information.""" - return { - 'exchange': self.exchange_name, - 'component': self.component_name, - 'limits': { - 'min_price': str(self._min_price), - 'max_price': str(self._max_price), - 'min_size': str(self._min_size), - 'max_size': str(self._max_size), - 'min_timestamp': self._min_timestamp, - 'max_timestamp': self._max_timestamp - }, - 'patterns': { - 'numeric': self._numeric_pattern.pattern, - 'trade_id': self._trade_id_pattern.pattern - } - } - - -# Utility functions for common validation patterns - -def is_valid_decimal(value: Any) -> bool: - """Check if value can be converted to a valid decimal.""" - try: - Decimal(str(value)) - return True - except (InvalidOperation, ValueError, TypeError): - return False - - -def normalize_symbol(symbol: str, exchange: str) -> str: - """ - Normalize symbol format for exchange. - - Args: - symbol: Raw symbol string - exchange: Exchange name - - Returns: - Normalized symbol string - """ - # Basic normalization - can be extended per exchange - return symbol.upper().strip() - - -def validate_required_fields(data: Dict[str, Any], required_fields: List[str]) -> List[str]: - """ - Validate that all required fields are present in data. - - Args: - data: Data dictionary to check - required_fields: List of required field names - - Returns: - List of missing field names - """ - missing_fields = [] - for field in required_fields: - if field not in data or data[field] is None: - missing_fields.append(field) - return missing_fields - - -__all__ = [ - 'ValidationResult', - 'BaseDataValidator', - 'is_valid_decimal', - 'normalize_symbol', - 'validate_required_fields' -] \ No newline at end of file diff --git a/data/common/validation/__init__.py b/data/common/validation/__init__.py new file mode 100644 index 0000000..4c43c6a --- /dev/null +++ b/data/common/validation/__init__.py @@ -0,0 +1,58 @@ +""" +Data validation utilities for exchange data. + +This package provides common validation patterns and base classes +that can be extended by exchange-specific validators. +""" + +from .result import ValidationResult, DataValidationResult +from .base import BaseDataValidator +from .field_validators import ( + validate_price, + validate_size, + validate_volume, + validate_trade_side, + validate_timestamp, + validate_trade_id, + validate_symbol_match, + validate_required_fields, + is_valid_decimal, + MIN_PRICE, + MAX_PRICE, + MIN_SIZE, + MAX_SIZE, + MIN_TIMESTAMP, + MAX_TIMESTAMP, + VALID_TRADE_SIDES, + NUMERIC_PATTERN, + TRADE_ID_PATTERN +) + +__all__ = [ + # Classes + 'ValidationResult', + 'DataValidationResult', + 'BaseDataValidator', + + # Field validation functions + 'validate_price', + 'validate_size', + 'validate_volume', + 'validate_trade_side', + 'validate_timestamp', + 'validate_trade_id', + 'validate_symbol_match', + 'validate_required_fields', + 'is_valid_decimal', + + # Constants + 'MIN_PRICE', + 'MAX_PRICE', + 'MIN_SIZE', + 'MAX_SIZE', + 'MIN_TIMESTAMP', + 'MAX_TIMESTAMP', + 'VALID_TRADE_SIDES', + 'NUMERIC_PATTERN', + 'TRADE_ID_PATTERN' +] \ No newline at end of file diff --git a/data/common/validation/base.py b/data/common/validation/base.py new file mode 100644 index 0000000..ea00a64 --- /dev/null +++ b/data/common/validation/base.py @@ -0,0 +1,255 @@ +""" +Base validator class for exchange data validation. + +This module provides the abstract base class for exchange-specific data validators, +along with common validation patterns and utilities. +""" + +from abc import ABC, abstractmethod +from typing import Dict, Any, Optional, List, Union +from decimal import Decimal +from logging import Logger + +from .result import ValidationResult, DataValidationResult +from .field_validators import ( + validate_price, + validate_size, + validate_volume, + validate_trade_side, + validate_timestamp, + validate_trade_id, + validate_symbol_match, + validate_required_fields, + MIN_PRICE, + MAX_PRICE, + MIN_SIZE, + MAX_SIZE, + MIN_TIMESTAMP, + MAX_TIMESTAMP, + VALID_TRADE_SIDES, + NUMERIC_PATTERN, + TRADE_ID_PATTERN +) + + +class BaseDataValidator(ABC): + """ + Abstract base class for exchange data validators. + + This class provides common validation patterns and utilities + that can be reused across different exchange implementations. + """ + + def __init__(self, + exchange_name: str, + component_name: str = "base_data_validator", + logger: Optional[Logger] = None): + """ + Initialize base data validator. + + Args: + exchange_name: Name of the exchange (e.g., 'okx', 'binance') + component_name: Name for logging + logger: Logger instance. If None, no logging will be performed. + """ + self.exchange_name = exchange_name + self.component_name = component_name + self.logger = logger + + # Common validation patterns + self._numeric_pattern = NUMERIC_PATTERN + self._trade_id_pattern = TRADE_ID_PATTERN + + # Valid trade sides + self._valid_trade_sides = VALID_TRADE_SIDES + + # Common price and size limits (can be overridden by subclasses) + self._min_price = MIN_PRICE + self._max_price = MAX_PRICE + self._min_size = MIN_SIZE + self._max_size = MAX_SIZE + + # Timestamp validation (milliseconds since epoch) + self._min_timestamp = MIN_TIMESTAMP + self._max_timestamp = MAX_TIMESTAMP + + if self.logger: + self.logger.debug(f"{self.component_name}: Initialized {exchange_name} data validator") + + # Abstract methods that must be implemented by subclasses + + @abstractmethod + def validate_symbol_format(self, symbol: str) -> ValidationResult: + """ + Validate exchange-specific symbol format. + + Args: + symbol: Symbol to validate + + Returns: + ValidationResult + """ + pass + + @abstractmethod + def validate_websocket_message(self, message: Dict[str, Any]) -> DataValidationResult: + """ + Validate complete WebSocket message structure. + + Args: + message: WebSocket message to validate + + Returns: + DataValidationResult + """ + pass + + # Common validation methods available to all subclasses + + def validate_price(self, price: Union[str, int, float, Decimal]) -> ValidationResult: + """ + Validate price value with common rules. + + Args: + price: Price value to validate + + Returns: + ValidationResult with sanitized decimal price + """ + return validate_price(price) + + def validate_size(self, size: Union[str, int, float, Decimal]) -> ValidationResult: + """ + Validate size/quantity value with common rules. + + Args: + size: Size value to validate + + Returns: + ValidationResult with sanitized decimal size + """ + return validate_size(size) + + def validate_volume(self, volume: Union[str, int, float, Decimal]) -> ValidationResult: + """ + Validate volume value with common rules. + + Args: + volume: Volume value to validate + + Returns: + ValidationResult + """ + return validate_volume(volume) + + def validate_trade_side(self, side: str) -> ValidationResult: + """ + Validate trade side with common rules. + + Args: + side: Trade side string + + Returns: + ValidationResult + """ + return validate_trade_side(side) + + def validate_timestamp(self, timestamp: Union[str, int], is_milliseconds: bool = True) -> ValidationResult: + """ + Validate timestamp value with common rules. + + Args: + timestamp: Timestamp value to validate + is_milliseconds: True if timestamp is in milliseconds, False for seconds + + Returns: + ValidationResult + """ + return validate_timestamp(timestamp, is_milliseconds) + + def validate_trade_id(self, trade_id: Union[str, int]) -> ValidationResult: + """ + Validate trade ID with flexible rules. + + Args: + trade_id: Trade ID to validate + + Returns: + ValidationResult + """ + return validate_trade_id(trade_id) + + def validate_symbol_match(self, symbol: str, expected_symbol: Optional[str] = None) -> ValidationResult: + """ + Validate symbol matches expected value. + + Args: + symbol: Symbol to validate + expected_symbol: Expected symbol value + + Returns: + ValidationResult + """ + return validate_symbol_match(symbol, expected_symbol) + + def validate_orderbook_side(self, side_data: List[List[str]], side_name: str) -> ValidationResult: + """ + Validate orderbook side (asks or bids) with common rules. + + Args: + side_data: List of price/size pairs + side_name: Name of side for error messages + + Returns: + ValidationResult with sanitized data + """ + errors = [] + warnings = [] + sanitized_data = [] + + if not isinstance(side_data, list): + errors.append(f"{side_name} must be a list") + return ValidationResult(False, errors, warnings) + + for i, level in enumerate(side_data): + if not isinstance(level, list) or len(level) < 2: + errors.append(f"{side_name}[{i}] must be a list with at least 2 elements") + continue + + # Validate price and size + price_result = self.validate_price(level[0]) + size_result = self.validate_size(level[1]) + + if not price_result.is_valid: + errors.extend([f"{side_name}[{i}] price: {error}" for error in price_result.errors]) + if not size_result.is_valid: + errors.extend([f"{side_name}[{i}] size: {error}" for error in size_result.errors]) + + # Add sanitized level + if price_result.is_valid and size_result.is_valid: + sanitized_level = [str(price_result.sanitized_data), str(size_result.sanitized_data)] + # Include additional fields if present + if len(level) > 2: + sanitized_level.extend(level[2:]) + sanitized_data.append(sanitized_level) + + return ValidationResult(len(errors) == 0, errors, warnings, sanitized_data) + + def get_validator_info(self) -> Dict[str, Any]: + """Get validator configuration information.""" + return { + 'exchange': self.exchange_name, + 'component': self.component_name, + 'limits': { + 'min_price': str(self._min_price), + 'max_price': str(self._max_price), + 'min_size': str(self._min_size), + 'max_size': str(self._max_size), + 'min_timestamp': self._min_timestamp, + 'max_timestamp': self._max_timestamp + }, + 'patterns': { + 'numeric': self._numeric_pattern.pattern, + 'trade_id': self._trade_id_pattern.pattern + } + } \ No newline at end of file diff --git a/data/common/validation/field_validators.py b/data/common/validation/field_validators.py new file mode 100644 index 0000000..96a1217 --- /dev/null +++ b/data/common/validation/field_validators.py @@ -0,0 +1,293 @@ +""" +Field validation functions for common data types. + +This module provides standalone validation functions for individual fields +like prices, sizes, timestamps, etc. +""" + +import re +from datetime import datetime, timezone +from decimal import Decimal, InvalidOperation +from typing import Union, List, Dict, Any, Set, Pattern + +from .result import ValidationResult + + +# Common validation patterns +NUMERIC_PATTERN: Pattern = re.compile(r'^-?\d*\.?\d+$') +TRADE_ID_PATTERN: Pattern = re.compile(r'^[a-zA-Z0-9_-]+$') + +# Common validation constants +MIN_PRICE: Decimal = Decimal('0.00000001') # 1 satoshi equivalent +MAX_PRICE: Decimal = Decimal('10000000') # 10 million +MIN_SIZE: Decimal = Decimal('0.00000001') # Minimum trade size +MAX_SIZE: Decimal = Decimal('1000000000') # 1 billion max size +MIN_TIMESTAMP: int = 1000000000000 # 2001-09-09 +MAX_TIMESTAMP: int = 9999999999999 # 2286-11-20 +VALID_TRADE_SIDES: Set[str] = {'buy', 'sell'} + + +def validate_price(price: Union[str, int, float, Decimal]) -> ValidationResult: + """ + Validate price value with common rules. + + Args: + price: Price value to validate + + Returns: + ValidationResult with sanitized decimal price + """ + errors = [] + warnings = [] + sanitized_data = None + + try: + # Convert to Decimal for precise validation + if isinstance(price, str) and price.strip() == "": + errors.append("Empty price string") + return ValidationResult(False, errors, warnings) + + decimal_price = Decimal(str(price)) + sanitized_data = decimal_price + + # Check for negative prices + if decimal_price <= 0: + errors.append(f"Price must be positive, got {decimal_price}") + + # Check price bounds + if decimal_price < MIN_PRICE: + warnings.append(f"Price {decimal_price} below minimum {MIN_PRICE}") + elif decimal_price > MAX_PRICE: + warnings.append(f"Price {decimal_price} above maximum {MAX_PRICE}") + + # Check for excessive decimal places (warn only) + if decimal_price.as_tuple().exponent < -12: + warnings.append(f"Price has excessive decimal precision: {decimal_price}") + + except (InvalidOperation, ValueError, TypeError) as e: + errors.append(f"Invalid price value: {price} - {str(e)}") + + return ValidationResult(len(errors) == 0, errors, warnings, sanitized_data) + + +def validate_size(size: Union[str, int, float, Decimal]) -> ValidationResult: + """ + Validate size/quantity value with common rules. + + Args: + size: Size value to validate + + Returns: + ValidationResult with sanitized decimal size + """ + errors = [] + warnings = [] + sanitized_data = None + + try: + # Convert to Decimal for precise validation + if isinstance(size, str) and size.strip() == "": + errors.append("Empty size string") + return ValidationResult(False, errors, warnings) + + decimal_size = Decimal(str(size)) + sanitized_data = decimal_size + + # Check for negative or zero sizes + if decimal_size <= 0: + errors.append(f"Size must be positive, got {decimal_size}") + + # Check size bounds + if decimal_size < MIN_SIZE: + warnings.append(f"Size {decimal_size} below minimum {MIN_SIZE}") + elif decimal_size > MAX_SIZE: + warnings.append(f"Size {decimal_size} above maximum {MAX_SIZE}") + + except (InvalidOperation, ValueError, TypeError) as e: + errors.append(f"Invalid size value: {size} - {str(e)}") + + return ValidationResult(len(errors) == 0, errors, warnings, sanitized_data) + + +def validate_volume(volume: Union[str, int, float, Decimal]) -> ValidationResult: + """ + Validate volume value with common rules. + + Args: + volume: Volume value to validate + + Returns: + ValidationResult + """ + errors = [] + warnings = [] + + try: + decimal_volume = Decimal(str(volume)) + + # Volume can be zero (no trades in period) + if decimal_volume < 0: + errors.append(f"Volume cannot be negative, got {decimal_volume}") + + except (InvalidOperation, ValueError, TypeError) as e: + errors.append(f"Invalid volume value: {volume} - {str(e)}") + + return ValidationResult(len(errors) == 0, errors, warnings) + + +def validate_trade_side(side: str) -> ValidationResult: + """ + Validate trade side with common rules. + + Args: + side: Trade side string + + Returns: + ValidationResult + """ + errors = [] + warnings = [] + + if not isinstance(side, str): + errors.append(f"Trade side must be string, got {type(side)}") + return ValidationResult(False, errors, warnings) + + normalized_side = side.lower() + if normalized_side not in VALID_TRADE_SIDES: + errors.append(f"Invalid trade side: {side}. Must be 'buy' or 'sell'") + + return ValidationResult(len(errors) == 0, errors, warnings) + + +def validate_timestamp(timestamp: Union[str, int], is_milliseconds: bool = True) -> ValidationResult: + """ + Validate timestamp value with common rules. + + Args: + timestamp: Timestamp value to validate + is_milliseconds: True if timestamp is in milliseconds, False for seconds + + Returns: + ValidationResult + """ + errors = [] + warnings = [] + + try: + # Convert to int + if isinstance(timestamp, str): + if not timestamp.isdigit(): + errors.append(f"Invalid timestamp format: {timestamp}") + return ValidationResult(False, errors, warnings) + timestamp_int = int(timestamp) + elif isinstance(timestamp, int): + timestamp_int = timestamp + else: + errors.append(f"Timestamp must be string or int, got {type(timestamp)}") + return ValidationResult(False, errors, warnings) + + # Convert to milliseconds if needed + if not is_milliseconds: + timestamp_int = timestamp_int * 1000 + + # Check timestamp bounds + if timestamp_int < MIN_TIMESTAMP: + errors.append(f"Timestamp {timestamp_int} too old") + elif timestamp_int > MAX_TIMESTAMP: + errors.append(f"Timestamp {timestamp_int} too far in future") + + # Check if timestamp is reasonable (within last year to next year) + current_time_ms = int(datetime.now(timezone.utc).timestamp() * 1000) + one_year_ms = 365 * 24 * 60 * 60 * 1000 + + if timestamp_int < (current_time_ms - one_year_ms): + warnings.append(f"Timestamp {timestamp_int} is older than 1 year") + elif timestamp_int > (current_time_ms + one_year_ms): + warnings.append(f"Timestamp {timestamp_int} is more than 1 year in future") + + except (ValueError, TypeError) as e: + errors.append(f"Invalid timestamp: {timestamp} - {str(e)}") + + return ValidationResult(len(errors) == 0, errors, warnings) + + +def validate_trade_id(trade_id: Union[str, int]) -> ValidationResult: + """ + Validate trade ID with flexible rules. + + Args: + trade_id: Trade ID to validate + + Returns: + ValidationResult + """ + errors = [] + warnings = [] + + if isinstance(trade_id, int): + trade_id = str(trade_id) + + if not isinstance(trade_id, str): + errors.append(f"Trade ID must be string or int, got {type(trade_id)}") + return ValidationResult(False, errors, warnings) + + if not trade_id.strip(): + errors.append("Trade ID cannot be empty") + return ValidationResult(False, errors, warnings) + + # Flexible validation - allow alphanumeric, underscore, hyphen + if not TRADE_ID_PATTERN.match(trade_id): + warnings.append(f"Trade ID has unusual format: {trade_id}") + + return ValidationResult(len(errors) == 0, errors, warnings) + + +def validate_symbol_match(symbol: str, expected_symbol: str = None) -> ValidationResult: + """ + Validate symbol matches expected value. + + Args: + symbol: Symbol to validate + expected_symbol: Expected symbol value + + Returns: + ValidationResult + """ + errors = [] + warnings = [] + + if not isinstance(symbol, str): + errors.append(f"Symbol must be string, got {type(symbol)}") + return ValidationResult(False, errors, warnings) + + if expected_symbol and symbol != expected_symbol: + warnings.append(f"Symbol mismatch: expected {expected_symbol}, got {symbol}") + + return ValidationResult(len(errors) == 0, errors, warnings) + + +def validate_required_fields(data: Dict[str, Any], required_fields: List[str]) -> List[str]: + """ + Validate that all required fields are present in data. + + Args: + data: Data dictionary to check + required_fields: List of required field names + + Returns: + List of missing field names + """ + missing_fields = [] + for field in required_fields: + if field not in data or data[field] is None: + missing_fields.append(field) + return missing_fields + + +def is_valid_decimal(value: Any) -> bool: + """Check if value can be converted to a valid decimal.""" + try: + Decimal(str(value)) + return True + except (InvalidOperation, ValueError, TypeError): + return False \ No newline at end of file diff --git a/data/common/validation/result.py b/data/common/validation/result.py new file mode 100644 index 0000000..7fbcc46 --- /dev/null +++ b/data/common/validation/result.py @@ -0,0 +1,113 @@ +""" +Validation result classes for data validation. + +This module provides result classes used to represent validation outcomes +across the validation system. +""" + +from typing import List, Any, Optional, Dict + + +class ValidationResult: + """Simple validation result for individual field validation.""" + + def __init__(self, + is_valid: bool, + errors: List[str] = None, + warnings: List[str] = None, + sanitized_data: Any = None): + """ + Initialize validation result. + + Args: + is_valid: Whether the validation passed + errors: List of error messages + warnings: List of warning messages + sanitized_data: Optional sanitized/normalized data + """ + self.is_valid = is_valid + self.errors = errors or [] + self.warnings = warnings or [] + self.sanitized_data = sanitized_data + + def __str__(self) -> str: + """String representation of validation result.""" + status = "valid" if self.is_valid else "invalid" + details = [] + if self.errors: + details.append(f"{len(self.errors)} errors") + if self.warnings: + details.append(f"{len(self.warnings)} warnings") + detail_str = f" with {', '.join(details)}" if details else "" + return f"ValidationResult: {status}{detail_str}" + + def add_error(self, error: str) -> None: + """Add an error message and set is_valid to False.""" + self.errors.append(error) + self.is_valid = False + + def add_warning(self, warning: str) -> None: + """Add a warning message.""" + self.warnings.append(warning) + + def merge(self, other: 'ValidationResult') -> None: + """Merge another validation result into this one.""" + self.is_valid = self.is_valid and other.is_valid + self.errors.extend(other.errors) + self.warnings.extend(other.warnings) + # Don't merge sanitized data - it's context specific + + +class DataValidationResult: + """Result of data validation - common across all exchanges.""" + + def __init__(self, + is_valid: bool, + errors: List[str], + warnings: List[str], + sanitized_data: Optional[Dict[str, Any]] = None): + """ + Initialize data validation result. + + Args: + is_valid: Whether the validation passed + errors: List of error messages + warnings: List of warning messages + sanitized_data: Optional sanitized/normalized data dictionary + """ + self.is_valid = is_valid + self.errors = errors + self.warnings = warnings + self.sanitized_data = sanitized_data + + def __str__(self) -> str: + """String representation of data validation result.""" + status = "valid" if self.is_valid else "invalid" + details = [] + if self.errors: + details.append(f"{len(self.errors)} errors") + if self.warnings: + details.append(f"{len(self.warnings)} warnings") + if self.sanitized_data: + details.append("has sanitized data") + detail_str = f" with {', '.join(details)}" if details else "" + return f"DataValidationResult: {status}{detail_str}" + + def add_error(self, error: str) -> None: + """Add an error message and set is_valid to False.""" + self.errors.append(error) + self.is_valid = False + + def add_warning(self, warning: str) -> None: + """Add a warning message.""" + self.warnings.append(warning) + + def merge(self, other: 'DataValidationResult') -> None: + """Merge another data validation result into this one.""" + self.is_valid = self.is_valid and other.is_valid + self.errors.extend(other.errors) + self.warnings.extend(other.warnings) + if other.sanitized_data: + if not self.sanitized_data: + self.sanitized_data = {} + self.sanitized_data.update(other.sanitized_data) \ No newline at end of file diff --git a/docs/modules/README.md b/docs/modules/README.md index 9e685ad..001c7f7 100644 --- a/docs/modules/README.md +++ b/docs/modules/README.md @@ -29,6 +29,16 @@ This section contains detailed technical documentation for all system modules in - Integration examples and patterns - Comprehensive troubleshooting guide +- **[Data Validation (`validation.md`)]** - *Robust data validation framework* + - **BaseDataValidator** abstract class for exchange-specific validation + - **Field Validators** for common market data fields + - **Validation Results** with error and warning handling + - **Exchange-Specific Validators** with custom rules + - Comprehensive test coverage + - Error handling and sanitization + - Performance optimization for high-frequency validation + - Integration examples and patterns + ### Database Operations - **[Database Operations (`database_operations.md`)]** - *Repository pattern for clean database interactions* diff --git a/docs/modules/validation.md b/docs/modules/validation.md new file mode 100644 index 0000000..2f9f2e8 --- /dev/null +++ b/docs/modules/validation.md @@ -0,0 +1,194 @@ +# Data Validation Module + +## Purpose +The data validation module provides a robust, extensible framework for validating market data across different exchanges. It ensures data consistency, type safety, and business rule compliance through a modular validation system. + +## Architecture + +### Package Structure +``` +data/common/validation/ +├── __init__.py # Public interface +├── result.py # Validation result classes +├── field_validators.py # Individual field validators +└── base.py # BaseDataValidator class +``` + +### Core Components + +#### ValidationResult +Represents the outcome of validating a single field or component: +```python +ValidationResult( + is_valid: bool, # Whether validation passed + errors: List[str] = [], # Error messages + warnings: List[str] = [], # Warning messages + sanitized_data: Any = None # Cleaned/normalized data +) +``` + +#### DataValidationResult +Represents the outcome of validating a complete data structure: +```python +DataValidationResult( + is_valid: bool, + errors: List[str], + warnings: List[str], + sanitized_data: Optional[Dict[str, Any]] = None +) +``` + +#### BaseDataValidator +Abstract base class providing common validation patterns for exchange-specific implementations: +```python +class BaseDataValidator(ABC): + def __init__(self, exchange_name: str, component_name: str, logger: Optional[Logger]) + + @abstractmethod + def validate_symbol_format(self, symbol: str) -> ValidationResult + + @abstractmethod + def validate_websocket_message(self, message: Dict[str, Any]) -> DataValidationResult +``` + +### Field Validators +Common validation functions for market data fields: +- `validate_price()`: Price value validation +- `validate_size()`: Size/quantity validation +- `validate_volume()`: Volume validation +- `validate_trade_side()`: Trade side validation +- `validate_timestamp()`: Timestamp validation +- `validate_trade_id()`: Trade ID validation +- `validate_symbol_match()`: Symbol matching validation +- `validate_required_fields()`: Required field presence validation + +## Usage Examples + +### Creating an Exchange-Specific Validator +```python +from data.common.validation import BaseDataValidator, ValidationResult + +class OKXDataValidator(BaseDataValidator): + def __init__(self, component_name: str = "okx_data_validator", logger = None): + super().__init__("okx", component_name, logger) + self._symbol_pattern = re.compile(r'^[A-Z0-9]+-[A-Z0-9]+$') + + def validate_symbol_format(self, symbol: str) -> ValidationResult: + errors = [] + warnings = [] + + if not isinstance(symbol, str): + errors.append(f"Symbol must be string, got {type(symbol)}") + return ValidationResult(False, errors, warnings) + + if not self._symbol_pattern.match(symbol): + errors.append(f"Invalid symbol format: {symbol}") + + return ValidationResult(len(errors) == 0, errors, warnings) +``` + +### Validating Trade Data +```python +def validate_trade(validator: BaseDataValidator, trade_data: Dict[str, Any]) -> None: + result = validator.validate_trade_data(trade_data) + + if not result.is_valid: + raise ValidationError(f"Trade validation failed: {result.errors}") + + if result.warnings: + logger.warning(f"Trade validation warnings: {result.warnings}") + + return result.sanitized_data +``` + +## Configuration + +### Validation Constants +The module defines several constants for validation rules: +```python +MIN_PRICE = Decimal('0.00000001') +MAX_PRICE = Decimal('1000000000') +MIN_SIZE = Decimal('0.00000001') +MAX_SIZE = Decimal('1000000000') +MIN_TIMESTAMP = 946684800000 # 2000-01-01 +MAX_TIMESTAMP = 32503680000000 # 3000-01-01 +VALID_TRADE_SIDES = {'buy', 'sell'} +``` + +### Regular Expression Patterns +```python +NUMERIC_PATTERN = re.compile(r'^-?\d*\.?\d+$') +TRADE_ID_PATTERN = re.compile(r'^[\w-]+$') +``` + +## Testing + +### Running Tests +```bash +pytest tests/test_data_validation.py -v +``` + +### Test Coverage +The validation module has comprehensive test coverage including: +- Basic validation result functionality +- Field validator functions +- Base validator class +- Exchange-specific validator implementations +- Error handling and edge cases + +## Dependencies +- Internal: + - `data.common.data_types` + - `data.base_collector` +- External: + - `typing` + - `decimal` + - `logging` + - `abc` + +## Error Handling + +### Common Validation Errors +- Invalid data type +- Value out of bounds +- Missing required fields +- Invalid format +- Symbol mismatch + +### Error Response Format +```python +{ + 'is_valid': False, + 'errors': ['Price must be positive', 'Size exceeds maximum'], + 'warnings': ['Price below recommended minimum'], + 'sanitized_data': None +} +``` + +## Best Practices + +### Implementing New Validators +1. Extend `BaseDataValidator` +2. Implement required abstract methods +3. Add exchange-specific validation rules +4. Reuse common field validators +5. Add comprehensive tests + +### Validation Guidelines +- Always sanitize input data +- Include helpful error messages +- Use warnings for non-critical issues +- Maintain type safety +- Log validation failures appropriately + +## Known Issues and Limitations +- Timestamp validation assumes millisecond precision +- Trade ID format is loosely validated +- Some exchanges may require custom numeric precision + +## Future Improvements +- Add support for custom validation rules +- Implement async validation methods +- Add validation rule configuration system +- Enhance performance for high-frequency validation +- Add more exchange-specific validators \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 8ca5b8b..f6b0ad1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ "click>=8.0.0", # For CLI commands "pytest>=8.3.5", "psutil>=7.0.0", + "tzlocal>=5.3.1", ] [project.optional-dependencies] diff --git a/tasks/refactor-common-package.md b/tasks/refactor-common-package.md index 215b73a..bc66cd8 100644 --- a/tasks/refactor-common-package.md +++ b/tasks/refactor-common-package.md @@ -38,11 +38,11 @@ - [x] 2.8 Delete the original `data/common/indicators.py` file. - [x] 2.9 Run tests to verify the indicators logic still works as expected. -- [ ] 3.0 Refactor `validation.py` for better modularity. - - [ ] 3.1 Create safety net tests for validation module. - - [ ] 3.2 Extract common validation logic into separate functions. - - [ ] 3.3 Improve error handling and validation messages. - - [ ] 3.4 Run tests to verify validation still works as expected. +- [x] 3.0 Refactor `validation.py` for better modularity. + - [x] 3.1 Create safety net tests for validation module. + - [x] 3.2 Extract common validation logic into separate functions. + - [x] 3.3 Improve error handling and validation messages. + - [x] 3.4 Run tests to verify validation still works as expected. - [ ] 4.0 Refactor `transformation.py` for better modularity. - [ ] 4.1 Create safety net tests for transformation module. diff --git a/tests/test_data_validation.py b/tests/test_data_validation.py new file mode 100644 index 0000000..f9f2cf2 --- /dev/null +++ b/tests/test_data_validation.py @@ -0,0 +1,188 @@ +""" +Tests for data validation module. + +This module provides comprehensive test coverage for the data validation utilities +and base validator class. +""" + +import pytest +from datetime import datetime, timezone +from decimal import Decimal +from typing import Dict, Any + +from data.common.validation import ( + ValidationResult, + BaseDataValidator, + is_valid_decimal, + validate_required_fields +) +from data.common.data_types import DataValidationResult, StandardizedTrade, TradeSide + + +class TestValidationResult: + """Test ValidationResult class.""" + + def test_init_with_defaults(self): + """Test initialization with default values.""" + result = ValidationResult(is_valid=True) + assert result.is_valid + assert result.errors == [] + assert result.warnings == [] + assert result.sanitized_data is None + + def test_init_with_errors(self): + """Test initialization with errors.""" + errors = ["Error 1", "Error 2"] + result = ValidationResult(is_valid=False, errors=errors) + assert not result.is_valid + assert result.errors == errors + assert result.warnings == [] + + def test_init_with_warnings(self): + """Test initialization with warnings.""" + warnings = ["Warning 1"] + result = ValidationResult(is_valid=True, warnings=warnings) + assert result.is_valid + assert result.warnings == warnings + assert result.errors == [] + + def test_init_with_sanitized_data(self): + """Test initialization with sanitized data.""" + data = {"key": "value"} + result = ValidationResult(is_valid=True, sanitized_data=data) + assert result.sanitized_data == data + + +class MockDataValidator(BaseDataValidator): + """Mock implementation of BaseDataValidator for testing.""" + + def validate_symbol_format(self, symbol: str) -> ValidationResult: + """Mock implementation of validate_symbol_format.""" + if not symbol or not isinstance(symbol, str): + return ValidationResult(False, errors=["Invalid symbol format"]) + return ValidationResult(True) + + def validate_websocket_message(self, message: Dict[str, Any]) -> DataValidationResult: + """Mock implementation of validate_websocket_message.""" + if not isinstance(message, dict): + return DataValidationResult(False, ["Invalid message format"], []) + return DataValidationResult(True, [], []) + + +class TestBaseDataValidator: + """Test BaseDataValidator class.""" + + @pytest.fixture + def validator(self): + """Create a mock validator instance.""" + return MockDataValidator("test_exchange") + + def test_validate_price(self, validator): + """Test price validation.""" + # Test valid price + result = validator.validate_price("123.45") + assert result.is_valid + assert result.sanitized_data == Decimal("123.45") + + # Test invalid price + result = validator.validate_price("invalid") + assert not result.is_valid + assert "Invalid price value" in result.errors[0] + + # Test price bounds + result = validator.validate_price("0.000000001") # Below min + assert result.is_valid # Still valid but with warning + assert "below minimum" in result.warnings[0] + + def test_validate_size(self, validator): + """Test size validation.""" + # Test valid size + result = validator.validate_size("10.5") + assert result.is_valid + assert result.sanitized_data == Decimal("10.5") + + # Test invalid size + result = validator.validate_size("-1") + assert not result.is_valid + assert "must be positive" in result.errors[0] + + def test_validate_timestamp(self, validator): + """Test timestamp validation.""" + current_time = int(datetime.now(timezone.utc).timestamp() * 1000) + + # Test valid timestamp + result = validator.validate_timestamp(current_time) + assert result.is_valid + + # Test invalid timestamp + result = validator.validate_timestamp("invalid") + assert not result.is_valid + assert "Invalid timestamp format" in result.errors[0] + + # Test old timestamp + old_timestamp = 999999999999 # Before min_timestamp + result = validator.validate_timestamp(old_timestamp) + assert not result.is_valid + assert "too old" in result.errors[0] + + def test_validate_trade_side(self, validator): + """Test trade side validation.""" + # Test valid sides + assert validator.validate_trade_side("buy").is_valid + assert validator.validate_trade_side("sell").is_valid + + # Test invalid sides + result = validator.validate_trade_side("invalid") + assert not result.is_valid + assert "Must be 'buy' or 'sell'" in result.errors[0] + + def test_validate_trade_id(self, validator): + """Test trade ID validation.""" + # Test valid trade IDs + assert validator.validate_trade_id("trade123").is_valid + assert validator.validate_trade_id("123").is_valid + assert validator.validate_trade_id("trade-123_abc").is_valid + + # Test invalid trade IDs + result = validator.validate_trade_id("") + assert not result.is_valid + assert "cannot be empty" in result.errors[0] + + def test_validate_symbol_match(self, validator): + """Test symbol matching validation.""" + # Test basic symbol validation + assert validator.validate_symbol_match("BTC-USD").is_valid + + # Test symbol mismatch + result = validator.validate_symbol_match("BTC-USD", "ETH-USD") + assert result.is_valid # Still valid but with warning + assert "mismatch" in result.warnings[0] + + # Test invalid symbol type + result = validator.validate_symbol_match(123) + assert not result.is_valid + assert "must be string" in result.errors[0] + + +def test_is_valid_decimal(): + """Test is_valid_decimal utility function.""" + # Test valid decimals + assert is_valid_decimal("123.45") + assert is_valid_decimal(123.45) + assert is_valid_decimal(Decimal("123.45")) + + # Test invalid decimals + assert not is_valid_decimal("invalid") + assert not is_valid_decimal(None) + assert not is_valid_decimal("") + + +def test_validate_required_fields(): + """Test validate_required_fields utility function.""" + data = {"field1": "value1", "field2": None, "field3": "value3"} + required = ["field1", "field2", "field4"] + + missing = validate_required_fields(data, required) + assert "field2" in missing # None value + assert "field4" in missing # Missing field + assert "field1" not in missing # Present field \ No newline at end of file