""" Strategy Signal Validation Pipeline This module provides validation, filtering, and quality assessment for strategy-generated signals to ensure reliability and consistency. """ from typing import List, Dict, Any, Optional, Tuple from datetime import datetime, timezone from dataclasses import dataclass from .data_types import StrategySignal, SignalType, StrategyResult from utils.logger import get_logger @dataclass class ValidationConfig: """Configuration for signal validation.""" min_confidence: float = 0.0 max_confidence: float = 1.0 required_metadata_fields: List[str] = None allowed_signal_types: List[SignalType] = None price_tolerance_percent: float = 5.0 # Max price deviation from market def __post_init__(self): if self.required_metadata_fields is None: self.required_metadata_fields = [] if self.allowed_signal_types is None: self.allowed_signal_types = list(SignalType) class StrategySignalValidator: """ Validates strategy signals for quality, consistency, and compliance. Provides comprehensive validation including confidence checks, signal type validation, price reasonableness, and metadata validation. """ def __init__(self, config: ValidationConfig = None): """ Initialize signal validator. Args: config: Validation configuration """ self.config = config or ValidationConfig() self.logger = get_logger() # Validation statistics self._validation_stats = { 'total_signals_validated': 0, 'valid_signals': 0, 'invalid_signals': 0, 'validation_errors': {} } def validate_signal(self, signal: StrategySignal) -> Tuple[bool, List[str]]: """ Validate a single strategy signal. Args: signal: Signal to validate Returns: Tuple of (is_valid, list_of_errors) """ errors = [] self._validation_stats['total_signals_validated'] += 1 # Validate confidence if not (self.config.min_confidence <= signal.confidence <= self.config.max_confidence): errors.append(f"Invalid confidence {signal.confidence}, must be between {self.config.min_confidence} and {self.config.max_confidence}") # Validate signal type if signal.signal_type not in self.config.allowed_signal_types: errors.append(f"Signal type {signal.signal_type} not in allowed types") # Validate price if signal.price <= 0: errors.append(f"Invalid price {signal.price}, must be positive") # Validate required metadata if self.config.required_metadata_fields: if not signal.metadata: errors.append(f"Missing required metadata fields: {self.config.required_metadata_fields}") else: missing_fields = [field for field in self.config.required_metadata_fields if field not in signal.metadata] if missing_fields: errors.append(f"Missing required metadata fields: {missing_fields}") # Update statistics is_valid = len(errors) == 0 if is_valid: self._validation_stats['valid_signals'] += 1 else: self._validation_stats['invalid_signals'] += 1 for error in errors: error_type = error.split(':')[0] if ':' in error else error self._validation_stats['validation_errors'][error_type] = \ self._validation_stats['validation_errors'].get(error_type, 0) + 1 return is_valid, errors def validate_signals_batch(self, signals: List[StrategySignal]) -> Tuple[List[StrategySignal], List[StrategySignal]]: """ Validate multiple signals and return valid and invalid lists. Args: signals: List of signals to validate Returns: Tuple of (valid_signals, invalid_signals) """ valid_signals = [] invalid_signals = [] for signal in signals: is_valid, errors = self.validate_signal(signal) if is_valid: valid_signals.append(signal) else: invalid_signals.append(signal) self.logger.debug(f"Invalid signal filtered out: {errors}") return valid_signals, invalid_signals def filter_signals_by_confidence( self, signals: List[StrategySignal], min_confidence: float = None ) -> List[StrategySignal]: """ Filter signals by minimum confidence threshold. Args: signals: List of signals to filter min_confidence: Minimum confidence threshold (uses config if None) Returns: Filtered list of signals """ threshold = min_confidence if min_confidence is not None else self.config.min_confidence filtered_signals = [signal for signal in signals if signal.confidence >= threshold] self.logger.debug(f"Filtered {len(signals) - len(filtered_signals)} signals below confidence {threshold}") return filtered_signals def filter_signals_by_type( self, signals: List[StrategySignal], allowed_types: List[SignalType] = None ) -> List[StrategySignal]: """ Filter signals by allowed signal types. Args: signals: List of signals to filter allowed_types: Allowed signal types (uses config if None) Returns: Filtered list of signals """ types = allowed_types if allowed_types is not None else self.config.allowed_signal_types filtered_signals = [signal for signal in signals if signal.signal_type in types] self.logger.debug(f"Filtered {len(signals) - len(filtered_signals)} signals by type") return filtered_signals def get_validation_statistics(self) -> Dict[str, Any]: """Get comprehensive validation statistics.""" stats = self._validation_stats.copy() if stats['total_signals_validated'] > 0: stats['validation_success_rate'] = stats['valid_signals'] / stats['total_signals_validated'] stats['validation_failure_rate'] = stats['invalid_signals'] / stats['total_signals_validated'] else: stats['validation_success_rate'] = 0.0 stats['validation_failure_rate'] = 0.0 return stats def transform_signal_confidence( self, signal: StrategySignal, confidence_multiplier: float = 1.0, max_confidence: float = None ) -> StrategySignal: """ Transform signal confidence with multiplier and cap. Args: signal: Signal to transform confidence_multiplier: Multiplier for confidence max_confidence: Maximum confidence cap (uses config if None) Returns: Transformed signal with updated confidence """ max_conf = max_confidence if max_confidence is not None else self.config.max_confidence # Create new signal with transformed confidence new_confidence = min(signal.confidence * confidence_multiplier, max_conf) transformed_signal = StrategySignal( timestamp=signal.timestamp, symbol=signal.symbol, timeframe=signal.timeframe, signal_type=signal.signal_type, price=signal.price, confidence=new_confidence, metadata=signal.metadata.copy() if signal.metadata else None ) return transformed_signal def enrich_signal_metadata( self, signal: StrategySignal, additional_metadata: Dict[str, Any] ) -> StrategySignal: """ Enrich signal with additional metadata. Args: signal: Signal to enrich additional_metadata: Additional metadata to add Returns: Signal with enriched metadata """ # Merge metadata enriched_metadata = signal.metadata.copy() if signal.metadata else {} enriched_metadata.update(additional_metadata) enriched_signal = StrategySignal( timestamp=signal.timestamp, symbol=signal.symbol, timeframe=signal.timeframe, signal_type=signal.signal_type, price=signal.price, confidence=signal.confidence, metadata=enriched_metadata ) return enriched_signal def transform_signals_batch( self, signals: List[StrategySignal], confidence_multiplier: float = 1.0, additional_metadata: Dict[str, Any] = None ) -> List[StrategySignal]: """ Apply transformations to multiple signals. Args: signals: List of signals to transform confidence_multiplier: Confidence multiplier additional_metadata: Additional metadata to add Returns: List of transformed signals """ transformed_signals = [] for signal in signals: # Apply confidence transformation transformed_signal = self.transform_signal_confidence(signal, confidence_multiplier) # Apply metadata enrichment if provided if additional_metadata: transformed_signal = self.enrich_signal_metadata(transformed_signal, additional_metadata) transformed_signals.append(transformed_signal) self.logger.debug(f"Transformed {len(signals)} signals") return transformed_signals def calculate_signal_quality_metrics(self, signals: List[StrategySignal]) -> Dict[str, Any]: """ Calculate comprehensive quality metrics for signals. Args: signals: List of signals to analyze Returns: Dictionary containing quality metrics """ if not signals: return {'error': 'No signals provided for quality analysis'} # Basic metrics total_signals = len(signals) confidence_values = [signal.confidence for signal in signals] # Signal type distribution signal_type_counts = {} for signal in signals: signal_type_counts[signal.signal_type.value] = signal_type_counts.get(signal.signal_type.value, 0) + 1 # Confidence metrics avg_confidence = sum(confidence_values) / total_signals min_confidence = min(confidence_values) max_confidence = max(confidence_values) # Quality scoring (0-100) high_confidence_signals = sum(1 for conf in confidence_values if conf >= 0.7) quality_score = (high_confidence_signals / total_signals) * 100 # Metadata completeness signals_with_metadata = sum(1 for signal in signals if signal.metadata) metadata_completeness = (signals_with_metadata / total_signals) * 100 return { 'total_signals': total_signals, 'signal_type_distribution': signal_type_counts, 'confidence_metrics': { 'average': round(avg_confidence, 3), 'minimum': round(min_confidence, 3), 'maximum': round(max_confidence, 3), 'high_confidence_count': high_confidence_signals, 'high_confidence_percentage': round((high_confidence_signals / total_signals) * 100, 1) }, 'quality_score': round(quality_score, 1), 'metadata_completeness_percentage': round(metadata_completeness, 1), 'recommendations': self._generate_quality_recommendations(signals) } def _generate_quality_recommendations(self, signals: List[StrategySignal]) -> List[str]: """Generate quality improvement recommendations.""" recommendations = [] confidence_values = [signal.confidence for signal in signals] avg_confidence = sum(confidence_values) / len(confidence_values) if avg_confidence < 0.5: recommendations.append("Consider increasing confidence thresholds or improving signal generation logic") signals_with_metadata = sum(1 for signal in signals if signal.metadata) if signals_with_metadata / len(signals) < 0.8: recommendations.append("Enhance metadata collection to improve signal traceability") signal_types = set(signal.signal_type for signal in signals) if len(signal_types) == 1: recommendations.append("Consider diversifying signal types for better strategy coverage") return recommendations if recommendations else ["Signal quality appears good - no specific recommendations"] def generate_validation_report(self) -> Dict[str, Any]: """Generate comprehensive validation report.""" stats = self.get_validation_statistics() return { 'report_timestamp': datetime.now(timezone.utc).isoformat(), 'validation_summary': { 'total_validated': stats['total_signals_validated'], 'success_rate': f"{stats.get('validation_success_rate', 0) * 100:.1f}%", 'failure_rate': f"{stats.get('validation_failure_rate', 0) * 100:.1f}%" }, 'error_analysis': stats.get('validation_errors', {}), 'configuration': { 'min_confidence': self.config.min_confidence, 'max_confidence': self.config.max_confidence, 'allowed_signal_types': [st.value for st in self.config.allowed_signal_types], 'required_metadata_fields': self.config.required_metadata_fields }, 'health_status': 'good' if stats.get('validation_success_rate', 0) >= 0.8 else 'needs_attention' }