diff --git a/cycle_detector.py b/cycle_detector.py index 2192a0b..a86f014 100644 --- a/cycle_detector.py +++ b/cycle_detector.py @@ -1,248 +1,248 @@ -import pandas as pd -import numpy as np -import matplotlib.pyplot as plt -from scipy.signal import argrelextrema - -class CycleDetector: - def __init__(self, data, timeframe='daily'): - """ - Initialize the CycleDetector with price data. - - Parameters: - - data: DataFrame with at least 'date' or 'datetime' and 'close' columns - - timeframe: 'daily', 'weekly', or 'monthly' - """ - self.data = data.copy() - self.timeframe = timeframe - - # Ensure we have a consistent date column name - if 'datetime' in self.data.columns and 'date' not in self.data.columns: - self.data.rename(columns={'datetime': 'date'}, inplace=True) - - # Convert data to specified timeframe if needed - if timeframe == 'weekly' and 'date' in self.data.columns: - self.data = self._convert_data(self.data, 'W') - elif timeframe == 'monthly' and 'date' in self.data.columns: - self.data = self._convert_data(self.data, 'M') - - # Add columns for local minima and maxima detection - self._add_swing_points() - - def _convert_data(self, data, timeframe): - """Convert daily data to 'timeframe' timeframe.""" - data['date'] = pd.to_datetime(data['date']) - data.set_index('date', inplace=True) - weekly = data.resample(timeframe).agg({ - 'open': 'first', - 'high': 'max', - 'low': 'min', - 'close': 'last', - 'volume': 'sum' - }) - return weekly.reset_index() - - - def _add_swing_points(self, window=5): - """ - Identify swing points (local minima and maxima). - - Parameters: - - window: The window size for local minima/maxima detection - """ - # Set the index to make calculations easier - if 'date' in self.data.columns: - self.data.set_index('date', inplace=True) - - # Detect local minima (swing lows) - min_idx = argrelextrema(self.data['low'].values, np.less, order=window)[0] - self.data['swing_low'] = False - self.data.iloc[min_idx, self.data.columns.get_loc('swing_low')] = True - - # Detect local maxima (swing highs) - max_idx = argrelextrema(self.data['high'].values, np.greater, order=window)[0] - self.data['swing_high'] = False - self.data.iloc[max_idx, self.data.columns.get_loc('swing_high')] = True - - # Reset index - self.data.reset_index(inplace=True) - - def find_cycle_lows(self): - """Find all swing lows which represent cycle lows.""" - swing_low_dates = self.data[self.data['swing_low']]['date'].values - return swing_low_dates - - def calculate_cycle_lengths(self): - """Calculate the lengths of each cycle between consecutive lows.""" - swing_low_indices = np.where(self.data['swing_low'])[0] - cycle_lengths = np.diff(swing_low_indices) - return cycle_lengths - - def get_average_cycle_length(self): - """Calculate the average cycle length.""" - cycle_lengths = self.calculate_cycle_lengths() - if len(cycle_lengths) > 0: - return np.mean(cycle_lengths) - return None - - def get_cycle_window(self, tolerance=0.10): - """ - Get the cycle window with the specified tolerance. - - Parameters: - - tolerance: The tolerance as a percentage (default: 10%) - - Returns: - - tuple: (min_cycle_length, avg_cycle_length, max_cycle_length) - """ - avg_length = self.get_average_cycle_length() - if avg_length is not None: - min_length = avg_length * (1 - tolerance) - max_length = avg_length * (1 + tolerance) - return (min_length, avg_length, max_length) - return None - - def detect_two_drives_pattern(self, lookback=10): - """ - Detect 2-drives pattern: a swing low, counter trend bounce, and a lower low. - - Parameters: - - lookback: Number of periods to look back - - Returns: - - list: Indices where 2-drives patterns are detected - """ - patterns = [] - - for i in range(lookback, len(self.data) - 1): - if not self.data.iloc[i]['swing_low']: - continue - - # Get the segment of data to check for pattern - segment = self.data.iloc[i-lookback:i+1] - swing_lows = segment[segment['swing_low']]['low'].values - - if len(swing_lows) >= 2 and swing_lows[-1] < swing_lows[-2]: - # Check if there was a bounce between the two lows - between_lows = segment.iloc[-len(swing_lows):-1] - if len(between_lows) > 0 and max(between_lows['high']) > swing_lows[-2]: - patterns.append(i) - - return patterns - - def detect_v_shaped_lows(self, window=5, threshold=0.02): - """ - Detect V-shaped cycle lows (sharp decline followed by sharp rise). - - Parameters: - - window: Window to look for sharp price changes - - threshold: Percentage change threshold to consider 'sharp' - - Returns: - - list: Indices where V-shaped patterns are detected - """ - patterns = [] - - # Find all swing lows - swing_low_indices = np.where(self.data['swing_low'])[0] - - for idx in swing_low_indices: - # Need enough data points before and after - if idx < window or idx + window >= len(self.data): - continue - - # Get the low price at this swing low - low_price = self.data.iloc[idx]['low'] - - # Check for sharp decline before low (at least window bars before) - before_segment = self.data.iloc[max(0, idx-window):idx] - if len(before_segment) > 0: - max_before = before_segment['high'].max() - decline = (max_before - low_price) / max_before - - # Check for sharp rise after low (at least window bars after) - after_segment = self.data.iloc[idx+1:min(len(self.data), idx+window+1)] - if len(after_segment) > 0: - max_after = after_segment['high'].max() - rise = (max_after - low_price) / low_price - - # Both decline and rise must exceed threshold to be considered V-shaped - if decline > threshold and rise > threshold: - patterns.append(idx) - - return patterns - - def plot_cycles(self, pattern_detection=None, title_suffix=''): - """ - Plot the price data with cycle lows and detected patterns. - - Parameters: - - pattern_detection: 'two_drives', 'v_shape', or None - - title_suffix: Optional suffix for the plot title - """ - plt.figure(figsize=(14, 7)) - - # Determine the date column name (could be 'date' or 'datetime') - date_col = 'date' if 'date' in self.data.columns else 'datetime' - - # Plot price data - plt.plot(self.data[date_col], self.data['close'], label='Close Price') - - # Calculate a consistent vertical position for indicators based on price range - price_range = self.data['close'].max() - self.data['close'].min() - indicator_offset = price_range * 0.01 # 1% of price range - - # Plot cycle lows (now at a fixed offset below the low price) - swing_lows = self.data[self.data['swing_low']] - plt.scatter(swing_lows[date_col], swing_lows['low'] - indicator_offset, - color='green', marker='^', s=100, label='Cycle Lows') - - # Plot specific patterns if requested - if 'two_drives' in pattern_detection: - pattern_indices = self.detect_two_drives_pattern() - if pattern_indices: - patterns = self.data.iloc[pattern_indices] - plt.scatter(patterns[date_col], patterns['low'] - indicator_offset * 2, - color='red', marker='o', s=150, label='Two Drives Pattern') - - elif 'v_shape' in pattern_detection: - pattern_indices = self.detect_v_shaped_lows() - if pattern_indices: - patterns = self.data.iloc[pattern_indices] - plt.scatter(patterns[date_col], patterns['low'] - indicator_offset * 2, - color='purple', marker='o', s=150, label='V-Shape Pattern') - - # Add cycle lengths and averages - cycle_lengths = self.calculate_cycle_lengths() - avg_cycle = self.get_average_cycle_length() - cycle_window = self.get_cycle_window() - - window_text = "" - if cycle_window: - window_text = f"Tolerance Window: [{cycle_window[0]:.2f} - {cycle_window[2]:.2f}]" - - plt.title(f"Detected Cycles - {self.timeframe.capitalize()} Timeframe {title_suffix}\n" - f"Average Cycle Length: {avg_cycle:.2f} periods, {window_text}") - - plt.legend() - plt.grid(True) - plt.show() - -# Usage example: -# 1. Load your data -# data = pd.read_csv('your_price_data.csv') - -# 2. Create cycle detector instances for different timeframes -# weekly_detector = CycleDetector(data, timeframe='weekly') -# daily_detector = CycleDetector(data, timeframe='daily') - -# 3. Analyze cycles -# weekly_cycle_length = weekly_detector.get_average_cycle_length() -# daily_cycle_length = daily_detector.get_average_cycle_length() - -# 4. Detect patterns -# two_drives = weekly_detector.detect_two_drives_pattern() -# v_shapes = daily_detector.detect_v_shaped_lows() - -# 5. Visualize -# weekly_detector.plot_cycles(pattern_detection='two_drives') -# daily_detector.plot_cycles(pattern_detection='v_shape') +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt +from scipy.signal import argrelextrema + +class CycleDetector: + def __init__(self, data, timeframe='daily'): + """ + Initialize the CycleDetector with price data. + + Parameters: + - data: DataFrame with at least 'date' or 'datetime' and 'close' columns + - timeframe: 'daily', 'weekly', or 'monthly' + """ + self.data = data.copy() + self.timeframe = timeframe + + # Ensure we have a consistent date column name + if 'datetime' in self.data.columns and 'date' not in self.data.columns: + self.data.rename(columns={'datetime': 'date'}, inplace=True) + + # Convert data to specified timeframe if needed + if timeframe == 'weekly' and 'date' in self.data.columns: + self.data = self._convert_data(self.data, 'W') + elif timeframe == 'monthly' and 'date' in self.data.columns: + self.data = self._convert_data(self.data, 'M') + + # Add columns for local minima and maxima detection + self._add_swing_points() + + def _convert_data(self, data, timeframe): + """Convert daily data to 'timeframe' timeframe.""" + data['date'] = pd.to_datetime(data['date']) + data.set_index('date', inplace=True) + weekly = data.resample(timeframe).agg({ + 'open': 'first', + 'high': 'max', + 'low': 'min', + 'close': 'last', + 'volume': 'sum' + }) + return weekly.reset_index() + + + def _add_swing_points(self, window=5): + """ + Identify swing points (local minima and maxima). + + Parameters: + - window: The window size for local minima/maxima detection + """ + # Set the index to make calculations easier + if 'date' in self.data.columns: + self.data.set_index('date', inplace=True) + + # Detect local minima (swing lows) + min_idx = argrelextrema(self.data['low'].values, np.less, order=window)[0] + self.data['swing_low'] = False + self.data.iloc[min_idx, self.data.columns.get_loc('swing_low')] = True + + # Detect local maxima (swing highs) + max_idx = argrelextrema(self.data['high'].values, np.greater, order=window)[0] + self.data['swing_high'] = False + self.data.iloc[max_idx, self.data.columns.get_loc('swing_high')] = True + + # Reset index + self.data.reset_index(inplace=True) + + def find_cycle_lows(self): + """Find all swing lows which represent cycle lows.""" + swing_low_dates = self.data[self.data['swing_low']]['date'].values + return swing_low_dates + + def calculate_cycle_lengths(self): + """Calculate the lengths of each cycle between consecutive lows.""" + swing_low_indices = np.where(self.data['swing_low'])[0] + cycle_lengths = np.diff(swing_low_indices) + return cycle_lengths + + def get_average_cycle_length(self): + """Calculate the average cycle length.""" + cycle_lengths = self.calculate_cycle_lengths() + if len(cycle_lengths) > 0: + return np.mean(cycle_lengths) + return None + + def get_cycle_window(self, tolerance=0.10): + """ + Get the cycle window with the specified tolerance. + + Parameters: + - tolerance: The tolerance as a percentage (default: 10%) + + Returns: + - tuple: (min_cycle_length, avg_cycle_length, max_cycle_length) + """ + avg_length = self.get_average_cycle_length() + if avg_length is not None: + min_length = avg_length * (1 - tolerance) + max_length = avg_length * (1 + tolerance) + return (min_length, avg_length, max_length) + return None + + def detect_two_drives_pattern(self, lookback=10): + """ + Detect 2-drives pattern: a swing low, counter trend bounce, and a lower low. + + Parameters: + - lookback: Number of periods to look back + + Returns: + - list: Indices where 2-drives patterns are detected + """ + patterns = [] + + for i in range(lookback, len(self.data) - 1): + if not self.data.iloc[i]['swing_low']: + continue + + # Get the segment of data to check for pattern + segment = self.data.iloc[i-lookback:i+1] + swing_lows = segment[segment['swing_low']]['low'].values + + if len(swing_lows) >= 2 and swing_lows[-1] < swing_lows[-2]: + # Check if there was a bounce between the two lows + between_lows = segment.iloc[-len(swing_lows):-1] + if len(between_lows) > 0 and max(between_lows['high']) > swing_lows[-2]: + patterns.append(i) + + return patterns + + def detect_v_shaped_lows(self, window=5, threshold=0.02): + """ + Detect V-shaped cycle lows (sharp decline followed by sharp rise). + + Parameters: + - window: Window to look for sharp price changes + - threshold: Percentage change threshold to consider 'sharp' + + Returns: + - list: Indices where V-shaped patterns are detected + """ + patterns = [] + + # Find all swing lows + swing_low_indices = np.where(self.data['swing_low'])[0] + + for idx in swing_low_indices: + # Need enough data points before and after + if idx < window or idx + window >= len(self.data): + continue + + # Get the low price at this swing low + low_price = self.data.iloc[idx]['low'] + + # Check for sharp decline before low (at least window bars before) + before_segment = self.data.iloc[max(0, idx-window):idx] + if len(before_segment) > 0: + max_before = before_segment['high'].max() + decline = (max_before - low_price) / max_before + + # Check for sharp rise after low (at least window bars after) + after_segment = self.data.iloc[idx+1:min(len(self.data), idx+window+1)] + if len(after_segment) > 0: + max_after = after_segment['high'].max() + rise = (max_after - low_price) / low_price + + # Both decline and rise must exceed threshold to be considered V-shaped + if decline > threshold and rise > threshold: + patterns.append(idx) + + return patterns + + def plot_cycles(self, pattern_detection=None, title_suffix=''): + """ + Plot the price data with cycle lows and detected patterns. + + Parameters: + - pattern_detection: 'two_drives', 'v_shape', or None + - title_suffix: Optional suffix for the plot title + """ + plt.figure(figsize=(14, 7)) + + # Determine the date column name (could be 'date' or 'datetime') + date_col = 'date' if 'date' in self.data.columns else 'datetime' + + # Plot price data + plt.plot(self.data[date_col], self.data['close'], label='Close Price') + + # Calculate a consistent vertical position for indicators based on price range + price_range = self.data['close'].max() - self.data['close'].min() + indicator_offset = price_range * 0.01 # 1% of price range + + # Plot cycle lows (now at a fixed offset below the low price) + swing_lows = self.data[self.data['swing_low']] + plt.scatter(swing_lows[date_col], swing_lows['low'] - indicator_offset, + color='green', marker='^', s=100, label='Cycle Lows') + + # Plot specific patterns if requested + if 'two_drives' in pattern_detection: + pattern_indices = self.detect_two_drives_pattern() + if pattern_indices: + patterns = self.data.iloc[pattern_indices] + plt.scatter(patterns[date_col], patterns['low'] - indicator_offset * 2, + color='red', marker='o', s=150, label='Two Drives Pattern') + + elif 'v_shape' in pattern_detection: + pattern_indices = self.detect_v_shaped_lows() + if pattern_indices: + patterns = self.data.iloc[pattern_indices] + plt.scatter(patterns[date_col], patterns['low'] - indicator_offset * 2, + color='purple', marker='o', s=150, label='V-Shape Pattern') + + # Add cycle lengths and averages + cycle_lengths = self.calculate_cycle_lengths() + avg_cycle = self.get_average_cycle_length() + cycle_window = self.get_cycle_window() + + window_text = "" + if cycle_window: + window_text = f"Tolerance Window: [{cycle_window[0]:.2f} - {cycle_window[2]:.2f}]" + + plt.title(f"Detected Cycles - {self.timeframe.capitalize()} Timeframe {title_suffix}\n" + f"Average Cycle Length: {avg_cycle:.2f} periods, {window_text}") + + plt.legend() + plt.grid(True) + plt.show() + +# Usage example: +# 1. Load your data +# data = pd.read_csv('your_price_data.csv') + +# 2. Create cycle detector instances for different timeframes +# weekly_detector = CycleDetector(data, timeframe='weekly') +# daily_detector = CycleDetector(data, timeframe='daily') + +# 3. Analyze cycles +# weekly_cycle_length = weekly_detector.get_average_cycle_length() +# daily_cycle_length = daily_detector.get_average_cycle_length() + +# 4. Detect patterns +# two_drives = weekly_detector.detect_two_drives_pattern() +# v_shapes = daily_detector.detect_v_shaped_lows() + +# 5. Visualize +# weekly_detector.plot_cycles(pattern_detection='two_drives') +# daily_detector.plot_cycles(pattern_detection='v_shape') diff --git a/main.py b/main.py index 4df366c..06a343b 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,7 @@ from cycle_detector import CycleDetector # Load data from CSV file instead of database data = pd.read_csv('data/btcusd_1-day_data.csv') + # Convert datetime column to datetime type start_date = pd.to_datetime('2025-04-01') stop_date = pd.to_datetime('2025-05-06') diff --git a/trend_detector_macd.py b/trend_detector_macd.py index e3bf538..045f604 100644 --- a/trend_detector_macd.py +++ b/trend_detector_macd.py @@ -1,259 +1,259 @@ -import pandas as pd -import numpy as np -import ta -import matplotlib.pyplot as plt -import matplotlib.dates as mdates -import logging -import mplfinance as mpf -from matplotlib.patches import Rectangle - -class TrendDetectorMACD: - def __init__(self, data, verbose=False): - self.data = data - self.verbose = verbose - # Configure logging - logging.basicConfig(level=logging.INFO if verbose else logging.WARNING, - format='%(asctime)s - %(levelname)s - %(message)s') - self.logger = logging.getLogger('TrendDetector') - - # Convert data to pandas DataFrame if it's not already - if not isinstance(self.data, pd.DataFrame): - if isinstance(self.data, list): - self.logger.info("Converting list to DataFrame") - self.data = pd.DataFrame({'close': self.data}) - else: - self.logger.error("Invalid data format provided") - raise ValueError("Data must be a pandas DataFrame or a list") - - self.logger.info(f"Initialized TrendDetector with {len(self.data)} data points") - - def detect_trends_MACD_signal(self): - self.logger.info("Starting trend detection") - if len(self.data) < 3: - self.logger.warning("Not enough data points for trend detection") - return {"error": "Not enough data points for trend detection"} - - # Create a copy of the DataFrame to avoid modifying the original - df = self.data.copy() - self.logger.info("Created copy of input data") - - # If 'close' column doesn't exist, try to use a relevant column - if 'close' not in df.columns and len(df.columns) > 0: - self.logger.info(f"'close' column not found, using {df.columns[0]} instead") - df['close'] = df[df.columns[0]] # Use the first column as 'close' - - # Add trend indicators - self.logger.info("Calculating MACD indicators") - # Moving Average Convergence Divergence (MACD) - df['macd'] = ta.trend.macd(df['close']) - df['macd_signal'] = ta.trend.macd_signal(df['close']) - df['macd_diff'] = ta.trend.macd_diff(df['close']) - - # Directional Movement Index (DMI) - if all(col in df.columns for col in ['high', 'low', 'close']): - self.logger.info("Calculating ADX indicators") - df['adx'] = ta.trend.adx(df['high'], df['low'], df['close']) - df['adx_pos'] = ta.trend.adx_pos(df['high'], df['low'], df['close']) - df['adx_neg'] = ta.trend.adx_neg(df['high'], df['low'], df['close']) - - # Identify trend changes - self.logger.info("Identifying trend changes") - df['trend'] = np.where(df['macd'] > df['macd_signal'], 'up', 'down') - df['trend_change'] = df['trend'] != df['trend'].shift(1) - - # Generate trend segments - self.logger.info("Generating trend segments") - trends = [] - trend_start = 0 - - for i in range(1, len(df)): - - if df['trend_change'].iloc[i]: - if i > trend_start: - trends.append({ - "type": df['trend'].iloc[i-1], - "start_index": trend_start, - "end_index": i-1, - "start_value": df['close'].iloc[trend_start], - "end_value": df['close'].iloc[i-1] - }) - trend_start = i - - # Add the last trend - if trend_start < len(df): - trends.append({ - "type": df['trend'].iloc[-1], - "start_index": trend_start, - "end_index": len(df)-1, - "start_value": df['close'].iloc[trend_start], - "end_value": df['close'].iloc[-1] - }) - - self.logger.info(f"Detected {len(trends)} trend segments") - return trends - - def get_strongest_trend(self): - self.logger.info("Finding strongest trend") - trends = self.detect_trends_MACD_signal() - if isinstance(trends, dict) and "error" in trends: - self.logger.warning(f"Error in trend detection: {trends['error']}") - return trends - - if not trends: - self.logger.info("No significant trends detected") - return {"message": "No significant trends detected"} - - strongest = max(trends, key=lambda x: abs(x["end_value"] - x["start_value"])) - self.logger.info(f"Strongest trend: {strongest['type']} from index {strongest['start_index']} to {strongest['end_index']}") - return strongest - - def plot_trends(self, trends): - """ - Plot price data with identified trends highlighted using candlestick charts. - """ - self.logger.info("Plotting trends with candlesticks") - if isinstance(trends, dict) and "error" in trends: - self.logger.error(trends["error"]) - print(trends["error"]) - return - - if not trends: - self.logger.warning("No significant trends detected for plotting") - print("No significant trends detected") - return - - # Create a figure with 2 subplots that share the x-axis - fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8), gridspec_kw={'height_ratios': [2, 1]}, sharex=True) - self.logger.info("Creating plot figure with shared x-axis") - - # Prepare data for candlestick chart - df = self.data.copy() - - # Ensure required columns exist for candlestick - required_cols = ['open', 'high', 'low', 'close'] - if not all(col in df.columns for col in required_cols): - self.logger.warning("Missing required columns for candlestick. Defaulting to line chart.") - if 'close' in df.columns: - ax1.plot(df.index if 'datetime' not in df.columns else df['datetime'], - df['close'], color='black', alpha=0.7, linewidth=1, label='Price') - else: - ax1.plot(df.index if 'datetime' not in df.columns else df['datetime'], - df[df.columns[0]], color='black', alpha=0.7, linewidth=1, label='Price') - else: - # Get x values (dates if available, otherwise indices) - if 'datetime' in df.columns: - x_label = 'Date' - # Format date axis - ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d')) - ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d')) - fig.autofmt_xdate() - self.logger.info("Using datetime for x-axis") - - # For candlestick, ensure datetime is the index - if df.index.name != 'datetime': - df = df.set_index('datetime') - else: - x_label = 'Index' - self.logger.info("Using index for x-axis") - - # Plot candlestick chart - up_color = 'green' - down_color = 'red' - - # Draw candlesticks manually - width = 0.6 - for i in range(len(df)): - # Get OHLC values for this candle - open_val = df['open'].iloc[i] - close_val = df['close'].iloc[i] - high_val = df['high'].iloc[i] - low_val = df['low'].iloc[i] - idx = df.index[i] - - # Determine candle color - color = up_color if close_val >= open_val else down_color - - # Plot candle body - body_height = abs(close_val - open_val) - bottom = min(open_val, close_val) - rect = Rectangle((i - width/2, bottom), width, body_height, color=color, alpha=0.8) - ax1.add_patch(rect) - - # Plot candle wicks - ax1.plot([i, i], [low_val, high_val], color='black', linewidth=1) - - # Set appropriate x-axis limits - ax1.set_xlim(-0.5, len(df) - 0.5) - - # Highlight each trend with a different color - self.logger.info("Highlighting trends on plot") - for trend in trends: - start_idx = trend['start_index'] - end_idx = trend['end_index'] - trend_type = trend['type'] - - # Get x-coordinates for trend plotting - x_start = start_idx - x_end = end_idx - - # Get y-coordinates for trend line - if 'close' in df.columns: - y_start = df['close'].iloc[start_idx] - y_end = df['close'].iloc[end_idx] - else: - y_start = df[df.columns[0]].iloc[start_idx] - y_end = df[df.columns[0]].iloc[end_idx] - - # Choose color based on trend type - color = 'green' if trend_type == 'up' else 'red' - - # Plot trend line - ax1.plot([x_start, x_end], [y_start, y_end], color=color, linewidth=2, - label=f"{trend_type.capitalize()} Trend" if f"{trend_type.capitalize()} Trend" not in ax1.get_legend_handles_labels()[1] else "") - - # Add markers at start and end points - ax1.scatter(x_start, y_start, color=color, marker='o', s=50) - ax1.scatter(x_end, y_end, color=color, marker='s', s=50) - - # Configure first subplot - ax1.set_title('Price with Trends (Candlestick)', fontsize=16) - ax1.set_ylabel('Price', fontsize=14) - ax1.grid(alpha=0.3) - ax1.legend() - - # Create MACD in second subplot - self.logger.info("Creating MACD subplot") - - # Calculate MACD indicators if not already present - if 'macd' not in df.columns: - if 'close' not in df.columns and len(df.columns) > 0: - df['close'] = df[df.columns[0]] - - df['macd'] = ta.trend.macd(df['close']) - df['macd_signal'] = ta.trend.macd_signal(df['close']) - df['macd_diff'] = ta.trend.macd_diff(df['close']) - - # Plot MACD components on second subplot - x_indices = np.arange(len(df)) - ax2.plot(x_indices, df['macd'], label='MACD', color='blue') - ax2.plot(x_indices, df['macd_signal'], label='Signal', color='orange') - - # Plot MACD histogram - for i in range(len(df)): - if df['macd_diff'].iloc[i] >= 0: - ax2.bar(i, df['macd_diff'].iloc[i], color='green', alpha=0.5, width=0.8) - else: - ax2.bar(i, df['macd_diff'].iloc[i], color='red', alpha=0.5, width=0.8) - - ax2.set_title('MACD Indicator', fontsize=16) - ax2.set_xlabel(x_label, fontsize=14) - ax2.set_ylabel('MACD', fontsize=14) - ax2.grid(alpha=0.3) - ax2.legend() - - # Enable synchronized zooming - plt.tight_layout() - plt.subplots_adjust(hspace=0.1) - plt.show() - - return plt +import pandas as pd +import numpy as np +import ta +import matplotlib.pyplot as plt +import matplotlib.dates as mdates +import logging +import mplfinance as mpf +from matplotlib.patches import Rectangle + +class TrendDetectorMACD: + def __init__(self, data, verbose=False): + self.data = data + self.verbose = verbose + # Configure logging + logging.basicConfig(level=logging.INFO if verbose else logging.WARNING, + format='%(asctime)s - %(levelname)s - %(message)s') + self.logger = logging.getLogger('TrendDetector') + + # Convert data to pandas DataFrame if it's not already + if not isinstance(self.data, pd.DataFrame): + if isinstance(self.data, list): + self.logger.info("Converting list to DataFrame") + self.data = pd.DataFrame({'close': self.data}) + else: + self.logger.error("Invalid data format provided") + raise ValueError("Data must be a pandas DataFrame or a list") + + self.logger.info(f"Initialized TrendDetector with {len(self.data)} data points") + + def detect_trends_MACD_signal(self): + self.logger.info("Starting trend detection") + if len(self.data) < 3: + self.logger.warning("Not enough data points for trend detection") + return {"error": "Not enough data points for trend detection"} + + # Create a copy of the DataFrame to avoid modifying the original + df = self.data.copy() + self.logger.info("Created copy of input data") + + # If 'close' column doesn't exist, try to use a relevant column + if 'close' not in df.columns and len(df.columns) > 0: + self.logger.info(f"'close' column not found, using {df.columns[0]} instead") + df['close'] = df[df.columns[0]] # Use the first column as 'close' + + # Add trend indicators + self.logger.info("Calculating MACD indicators") + # Moving Average Convergence Divergence (MACD) + df['macd'] = ta.trend.macd(df['close']) + df['macd_signal'] = ta.trend.macd_signal(df['close']) + df['macd_diff'] = ta.trend.macd_diff(df['close']) + + # Directional Movement Index (DMI) + if all(col in df.columns for col in ['high', 'low', 'close']): + self.logger.info("Calculating ADX indicators") + df['adx'] = ta.trend.adx(df['high'], df['low'], df['close']) + df['adx_pos'] = ta.trend.adx_pos(df['high'], df['low'], df['close']) + df['adx_neg'] = ta.trend.adx_neg(df['high'], df['low'], df['close']) + + # Identify trend changes + self.logger.info("Identifying trend changes") + df['trend'] = np.where(df['macd'] > df['macd_signal'], 'up', 'down') + df['trend_change'] = df['trend'] != df['trend'].shift(1) + + # Generate trend segments + self.logger.info("Generating trend segments") + trends = [] + trend_start = 0 + + for i in range(1, len(df)): + + if df['trend_change'].iloc[i]: + if i > trend_start: + trends.append({ + "type": df['trend'].iloc[i-1], + "start_index": trend_start, + "end_index": i-1, + "start_value": df['close'].iloc[trend_start], + "end_value": df['close'].iloc[i-1] + }) + trend_start = i + + # Add the last trend + if trend_start < len(df): + trends.append({ + "type": df['trend'].iloc[-1], + "start_index": trend_start, + "end_index": len(df)-1, + "start_value": df['close'].iloc[trend_start], + "end_value": df['close'].iloc[-1] + }) + + self.logger.info(f"Detected {len(trends)} trend segments") + return trends + + def get_strongest_trend(self): + self.logger.info("Finding strongest trend") + trends = self.detect_trends_MACD_signal() + if isinstance(trends, dict) and "error" in trends: + self.logger.warning(f"Error in trend detection: {trends['error']}") + return trends + + if not trends: + self.logger.info("No significant trends detected") + return {"message": "No significant trends detected"} + + strongest = max(trends, key=lambda x: abs(x["end_value"] - x["start_value"])) + self.logger.info(f"Strongest trend: {strongest['type']} from index {strongest['start_index']} to {strongest['end_index']}") + return strongest + + def plot_trends(self, trends): + """ + Plot price data with identified trends highlighted using candlestick charts. + """ + self.logger.info("Plotting trends with candlesticks") + if isinstance(trends, dict) and "error" in trends: + self.logger.error(trends["error"]) + print(trends["error"]) + return + + if not trends: + self.logger.warning("No significant trends detected for plotting") + print("No significant trends detected") + return + + # Create a figure with 2 subplots that share the x-axis + fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8), gridspec_kw={'height_ratios': [2, 1]}, sharex=True) + self.logger.info("Creating plot figure with shared x-axis") + + # Prepare data for candlestick chart + df = self.data.copy() + + # Ensure required columns exist for candlestick + required_cols = ['open', 'high', 'low', 'close'] + if not all(col in df.columns for col in required_cols): + self.logger.warning("Missing required columns for candlestick. Defaulting to line chart.") + if 'close' in df.columns: + ax1.plot(df.index if 'datetime' not in df.columns else df['datetime'], + df['close'], color='black', alpha=0.7, linewidth=1, label='Price') + else: + ax1.plot(df.index if 'datetime' not in df.columns else df['datetime'], + df[df.columns[0]], color='black', alpha=0.7, linewidth=1, label='Price') + else: + # Get x values (dates if available, otherwise indices) + if 'datetime' in df.columns: + x_label = 'Date' + # Format date axis + ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d')) + ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d')) + fig.autofmt_xdate() + self.logger.info("Using datetime for x-axis") + + # For candlestick, ensure datetime is the index + if df.index.name != 'datetime': + df = df.set_index('datetime') + else: + x_label = 'Index' + self.logger.info("Using index for x-axis") + + # Plot candlestick chart + up_color = 'green' + down_color = 'red' + + # Draw candlesticks manually + width = 0.6 + for i in range(len(df)): + # Get OHLC values for this candle + open_val = df['open'].iloc[i] + close_val = df['close'].iloc[i] + high_val = df['high'].iloc[i] + low_val = df['low'].iloc[i] + idx = df.index[i] + + # Determine candle color + color = up_color if close_val >= open_val else down_color + + # Plot candle body + body_height = abs(close_val - open_val) + bottom = min(open_val, close_val) + rect = Rectangle((i - width/2, bottom), width, body_height, color=color, alpha=0.8) + ax1.add_patch(rect) + + # Plot candle wicks + ax1.plot([i, i], [low_val, high_val], color='black', linewidth=1) + + # Set appropriate x-axis limits + ax1.set_xlim(-0.5, len(df) - 0.5) + + # Highlight each trend with a different color + self.logger.info("Highlighting trends on plot") + for trend in trends: + start_idx = trend['start_index'] + end_idx = trend['end_index'] + trend_type = trend['type'] + + # Get x-coordinates for trend plotting + x_start = start_idx + x_end = end_idx + + # Get y-coordinates for trend line + if 'close' in df.columns: + y_start = df['close'].iloc[start_idx] + y_end = df['close'].iloc[end_idx] + else: + y_start = df[df.columns[0]].iloc[start_idx] + y_end = df[df.columns[0]].iloc[end_idx] + + # Choose color based on trend type + color = 'green' if trend_type == 'up' else 'red' + + # Plot trend line + ax1.plot([x_start, x_end], [y_start, y_end], color=color, linewidth=2, + label=f"{trend_type.capitalize()} Trend" if f"{trend_type.capitalize()} Trend" not in ax1.get_legend_handles_labels()[1] else "") + + # Add markers at start and end points + ax1.scatter(x_start, y_start, color=color, marker='o', s=50) + ax1.scatter(x_end, y_end, color=color, marker='s', s=50) + + # Configure first subplot + ax1.set_title('Price with Trends (Candlestick)', fontsize=16) + ax1.set_ylabel('Price', fontsize=14) + ax1.grid(alpha=0.3) + ax1.legend() + + # Create MACD in second subplot + self.logger.info("Creating MACD subplot") + + # Calculate MACD indicators if not already present + if 'macd' not in df.columns: + if 'close' not in df.columns and len(df.columns) > 0: + df['close'] = df[df.columns[0]] + + df['macd'] = ta.trend.macd(df['close']) + df['macd_signal'] = ta.trend.macd_signal(df['close']) + df['macd_diff'] = ta.trend.macd_diff(df['close']) + + # Plot MACD components on second subplot + x_indices = np.arange(len(df)) + ax2.plot(x_indices, df['macd'], label='MACD', color='blue') + ax2.plot(x_indices, df['macd_signal'], label='Signal', color='orange') + + # Plot MACD histogram + for i in range(len(df)): + if df['macd_diff'].iloc[i] >= 0: + ax2.bar(i, df['macd_diff'].iloc[i], color='green', alpha=0.5, width=0.8) + else: + ax2.bar(i, df['macd_diff'].iloc[i], color='red', alpha=0.5, width=0.8) + + ax2.set_title('MACD Indicator', fontsize=16) + ax2.set_xlabel(x_label, fontsize=14) + ax2.set_ylabel('MACD', fontsize=14) + ax2.grid(alpha=0.3) + ax2.legend() + + # Enable synchronized zooming + plt.tight_layout() + plt.subplots_adjust(hspace=0.1) + plt.show() + + return plt diff --git a/trend_detector_simple.py b/trend_detector_simple.py index 141998c..5e92e05 100644 --- a/trend_detector_simple.py +++ b/trend_detector_simple.py @@ -1,205 +1,255 @@ -import pandas as pd -import numpy as np -import logging -from scipy.signal import find_peaks -import matplotlib.dates as mdates -from scipy import stats - -class TrendDetectorSimple: - def __init__(self, data, verbose=False): - """ - Initialize the TrendDetectorSimple class. - - Parameters: - - data: pandas DataFrame containing price data - - verbose: boolean, whether to display detailed logging information - """ - - self.data = data - self.verbose = verbose - - # Configure logging - logging.basicConfig(level=logging.INFO if verbose else logging.WARNING, - format='%(asctime)s - %(levelname)s - %(message)s') - self.logger = logging.getLogger('TrendDetectorSimple') - - # Convert data to pandas DataFrame if it's not already - if not isinstance(self.data, pd.DataFrame): - if isinstance(self.data, list): - self.logger.info("Converting list to DataFrame") - self.data = pd.DataFrame({'close': self.data}) - else: - self.logger.error("Invalid data format provided") - raise ValueError("Data must be a pandas DataFrame or a list") - - self.logger.info(f"Initialized TrendDetectorSimple with {len(self.data)} data points") - - def detect_trends(self): - """ - Detect trends by identifying local minima and maxima in the price data - using scipy.signal.find_peaks. - - Parameters: - - prominence: float, required prominence of peaks (relative to the price range) - - width: int, required width of peaks in data points - - Returns: - - DataFrame with columns for timestamps, prices, and trend indicators - """ - self.logger.info(f"Detecting trends") - - df = self.data.copy() - close_prices = df['close'].values - - max_peaks, _ = find_peaks(close_prices) - min_peaks, _ = find_peaks(-close_prices) - - self.logger.info(f"Found {len(min_peaks)} local minima and {len(max_peaks)} local maxima") - - df['is_min'] = False - df['is_max'] = False - - for peak in max_peaks: - df.at[peak, 'is_max'] = True - for peak in min_peaks: - df.at[peak, 'is_min'] = True - - result = df[['datetime', 'close', 'is_min', 'is_max']].copy() - - # Perform linear regression on min_peaks and max_peaks - self.logger.info("Performing linear regression on min and max peaks") - min_prices = df['close'].iloc[min_peaks].values - max_prices = df['close'].iloc[max_peaks].values - - # Linear regression for min peaks if we have at least 2 points - min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices) - # Linear regression for max peaks if we have at least 2 points - max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices) - - # Calculate Simple Moving Averages (SMA) for 7 and 15 periods - self.logger.info("Calculating SMA-7 and SMA-15") - - # Calculate SMA values and exclude NaN values - sma_7 = df['close'].rolling(window=7).mean().dropna().values - sma_15 = df['close'].rolling(window=15).mean().dropna().values - - # Add SMA values to regression_results - analysis_results = {} - analysis_results['linear_regression'] = { - 'min': { - 'slope': min_slope, - 'intercept': min_intercept, - 'r_squared': min_r_value ** 2 - }, - 'max': { - 'slope': max_slope, - 'intercept': max_intercept, - 'r_squared': max_r_value ** 2 - } - } - analysis_results['sma'] = { - '7': sma_7, - '15': sma_15 - } - - self.logger.info(f"Min peaks regression: slope={min_slope:.4f}, intercept={min_intercept:.4f}, r²={min_r_value**2:.4f}") - self.logger.info(f"Max peaks regression: slope={max_slope:.4f}, intercept={max_intercept:.4f}, r²={max_r_value**2:.4f}") - - return result, analysis_results - - def plot_trends(self, trend_data, analysis_results): - """ - Plot the price data with detected trends using a candlestick chart. - - Parameters: - - trend_data: DataFrame, the output from detect_trends(). If None, detect_trends() will be called. - - Returns: - - None (displays the plot) - """ - import matplotlib.pyplot as plt - from matplotlib.patches import Rectangle - - # Create the figure and axis - fig, ax = plt.subplots(figsize=(12, 8)) - - # Create a copy of the data - df = self.data.copy() - - # Plot candlestick chart - up_color = 'green' - down_color = 'red' - - # Draw candlesticks manually - width = 0.6 - x_values = range(len(df)) - - for i in range(len(df)): - # Get OHLC values for this candle - open_val = df['open'].iloc[i] - close_val = df['close'].iloc[i] - high_val = df['high'].iloc[i] - low_val = df['low'].iloc[i] - - # Determine candle color - color = up_color if close_val >= open_val else down_color - - # Plot candle body - body_height = abs(close_val - open_val) - bottom = min(open_val, close_val) - rect = Rectangle((i - width/2, bottom), width, body_height, color=color, alpha=0.8) - ax.add_patch(rect) - - # Plot candle wicks - ax.plot([i, i], [low_val, high_val], color='black', linewidth=1) - - min_indices = trend_data.index[trend_data['is_min'] == True].tolist() - if min_indices: - min_y = [df['close'].iloc[i] for i in min_indices] - ax.scatter(min_indices, min_y, color='darkred', s=200, marker='^', label='Local Minima', zorder=100) - - max_indices = trend_data.index[trend_data['is_max'] == True].tolist() - if max_indices: - max_y = [df['close'].iloc[i] for i in max_indices] - ax.scatter(max_indices, max_y, color='darkgreen', s=200, marker='v', label='Local Maxima', zorder=100) - - if analysis_results: - x_vals = np.arange(len(df)) - # Minima regression line (support) - min_slope = analysis_results['linear_regression']['min']['slope'] - min_intercept = analysis_results['linear_regression']['min']['intercept'] - min_line = min_slope * x_vals + min_intercept - ax.plot(x_vals, min_line, 'g--', linewidth=2, label='Minima Regression') - - # Maxima regression line (resistance) - max_slope = analysis_results['linear_regression']['max']['slope'] - max_intercept = analysis_results['linear_regression']['max']['intercept'] - max_line = max_slope * x_vals + max_intercept - ax.plot(x_vals, max_line, 'r--', linewidth=2, label='Maxima Regression') - - # SMA-7 line - sma_7 = analysis_results['sma']['7'] - ax.plot(x_vals, sma_7, 'y-', linewidth=2, label='SMA-7') - - # SMA-15 line - # sma_15 = analysis_results['sma']['15'] - # valid_idx_15 = ~np.isnan(sma_15) - # ax.plot(x_vals[valid_idx_15], sma_15[valid_idx_15], 'm-', linewidth=2, label='SMA-15') - - # Set title and labels - ax.set_title('Price Candlestick Chart with Local Minima and Maxima', fontsize=14) - ax.set_xlabel('Date', fontsize=12) - ax.set_ylabel('Price', fontsize=12) - - # Set appropriate x-axis limits - ax.set_xlim(-0.5, len(df) - 0.5) - - # Add a legend - ax.legend(loc='best') - - # Adjust layout - plt.tight_layout() - - # Show the plot - plt.show() +import pandas as pd +import numpy as np +import logging +from scipy.signal import find_peaks +import matplotlib.dates as mdates +from scipy import stats +from scipy import stats + +class TrendDetectorSimple: + def __init__(self, data, verbose=False): + """ + Initialize the TrendDetectorSimple class. + + Parameters: + - data: pandas DataFrame containing price data + - verbose: boolean, whether to display detailed logging information + """ + + self.data = data + self.verbose = verbose + + # Plot style configuration + self.plot_style = 'dark_background' + self.bg_color = '#181C27' + self.plot_size = (12, 8) + + # Candlestick configuration + self.candle_width = 0.6 + self.candle_up_color = '#089981' + self.candle_down_color = '#F23645' + self.candle_alpha = 0.8 + self.wick_width = 1 + + # Marker configuration + self.min_marker = '^' + self.min_color = 'red' + self.min_size = 100 + self.max_marker = 'v' + self.max_color = 'green' + self.max_size = 100 + self.marker_zorder = 100 + + # Line configuration + self.line_width = 2 + self.min_line_style = 'g--' # green dashed + self.max_line_style = 'r--' # red dashed + self.sma7_line_style = 'y-' # yellow solid + self.sma15_line_style = 'm-' # magenta solid + + # Text configuration + self.title_size = 14 + self.title_color = 'white' + self.axis_label_size = 12 + self.axis_label_color = 'white' + + # Legend configuration + self.legend_loc = 'best' + self.legend_bg_color = '#333333' + + # Configure logging + logging.basicConfig(level=logging.INFO if verbose else logging.WARNING, + format='%(asctime)s - %(levelname)s - %(message)s') + self.logger = logging.getLogger('TrendDetectorSimple') + + # Convert data to pandas DataFrame if it's not already + if not isinstance(self.data, pd.DataFrame): + if isinstance(self.data, list): + self.logger.info("Converting list to DataFrame") + self.data = pd.DataFrame({'close': self.data}) + else: + self.logger.error("Invalid data format provided") + raise ValueError("Data must be a pandas DataFrame or a list") + + self.logger.info(f"Initialized TrendDetectorSimple with {len(self.data)} data points") + + def detect_trends(self): + def detect_trends(self): + """ + Detect trends by identifying local minima and maxima in the price data + using scipy.signal.find_peaks. + + Parameters: + - prominence: float, required prominence of peaks (relative to the price range) + - width: int, required width of peaks in data points + + Returns: + - DataFrame with columns for timestamps, prices, and trend indicators + """ + self.logger.info(f"Detecting trends") + self.logger.info(f"Detecting trends") + + df = self.data.copy() + close_prices = df['close'].values + + max_peaks, _ = find_peaks(close_prices) + min_peaks, _ = find_peaks(-close_prices) + max_peaks, _ = find_peaks(close_prices) + min_peaks, _ = find_peaks(-close_prices) + + self.logger.info(f"Found {len(min_peaks)} local minima and {len(max_peaks)} local maxima") + + df['is_min'] = False + df['is_max'] = False + + for peak in max_peaks: + df.at[peak, 'is_max'] = True + for peak in min_peaks: + df.at[peak, 'is_min'] = True + + result = df[['datetime', 'close', 'is_min', 'is_max']].copy() + + # Perform linear regression on min_peaks and max_peaks + self.logger.info("Performing linear regression on min and max peaks") + min_prices = df['close'].iloc[min_peaks].values + max_prices = df['close'].iloc[max_peaks].values + + # Linear regression for min peaks if we have at least 2 points + min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices) + # Linear regression for max peaks if we have at least 2 points + max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices) + + # Calculate Simple Moving Averages (SMA) for 7 and 15 periods + self.logger.info("Calculating SMA-7 and SMA-15") + + sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values + sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values + + analysis_results = {} + analysis_results['linear_regression'] = { + 'min': { + 'slope': min_slope, + 'intercept': min_intercept, + 'r_squared': min_r_value ** 2 + }, + 'max': { + 'slope': max_slope, + 'intercept': max_intercept, + 'r_squared': max_r_value ** 2 + } + } + analysis_results['sma'] = { + '7': sma_7, + '15': sma_15 + } + + self.logger.info(f"Min peaks regression: slope={min_slope:.4f}, intercept={min_intercept:.4f}, r²={min_r_value**2:.4f}") + self.logger.info(f"Max peaks regression: slope={max_slope:.4f}, intercept={max_intercept:.4f}, r²={max_r_value**2:.4f}") + + return result, analysis_results + + def plot_trends(self, trend_data, analysis_results): + def plot_trends(self, trend_data, analysis_results): + """ + Plot the price data with detected trends using a candlestick chart. + + Parameters: + - trend_data: DataFrame, the output from detect_trends(). If None, detect_trends() will be called. + + Returns: + - None (displays the plot) + """ + import matplotlib.pyplot as plt + from matplotlib.patches import Rectangle + + # Create the figure and axis with specified background + plt.style.use(self.plot_style) + fig, ax = plt.subplots(figsize=self.plot_size) + + # Set the custom background color + fig.patch.set_facecolor(self.bg_color) + ax.set_facecolor(self.bg_color) + + # Create a copy of the data + df = self.data.copy() + + # Draw candlesticks manually + x_values = range(len(df)) + + for i in range(len(df)): + # Get OHLC values for this candle + open_val = df['open'].iloc[i] + close_val = df['close'].iloc[i] + high_val = df['high'].iloc[i] + low_val = df['low'].iloc[i] + + # Determine candle color + color = self.candle_up_color if close_val >= open_val else self.candle_down_color + + # Plot candle body + body_height = abs(close_val - open_val) + bottom = min(open_val, close_val) + rect = Rectangle((i - self.candle_width/2, bottom), self.candle_width, body_height, + color=color, alpha=self.candle_alpha) + ax.add_patch(rect) + + # Plot candle wicks + ax.plot([i, i], [low_val, high_val], color=color, linewidth=self.wick_width) + + min_indices = trend_data.index[trend_data['is_min'] == True].tolist() + if min_indices: + min_y = [df['close'].iloc[i] for i in min_indices] + ax.scatter(min_indices, min_y, color=self.min_color, s=self.min_size, + marker=self.min_marker, label='Local Minima', zorder=self.marker_zorder) + + max_indices = trend_data.index[trend_data['is_max'] == True].tolist() + if max_indices: + max_y = [df['close'].iloc[i] for i in max_indices] + ax.scatter(max_indices, max_y, color=self.max_color, s=self.max_size, + marker=self.max_marker, label='Local Maxima', zorder=self.marker_zorder) + + if analysis_results: + x_vals = np.arange(len(df)) + # Minima regression line (support) + min_slope = analysis_results['linear_regression']['min']['slope'] + min_intercept = analysis_results['linear_regression']['min']['intercept'] + min_line = min_slope * x_vals + min_intercept + ax.plot(x_vals, min_line, self.min_line_style, linewidth=self.line_width, + label='Minima Regression') + + # Maxima regression line (resistance) + max_slope = analysis_results['linear_regression']['max']['slope'] + max_intercept = analysis_results['linear_regression']['max']['intercept'] + max_line = max_slope * x_vals + max_intercept + ax.plot(x_vals, max_line, self.max_line_style, linewidth=self.line_width, + label='Maxima Regression') + + # SMA-7 line + sma_7 = analysis_results['sma']['7'] + ax.plot(x_vals, sma_7, self.sma7_line_style, linewidth=self.line_width, + label='SMA-7') + + # SMA-15 line + sma_15 = analysis_results['sma']['15'] + valid_idx_15 = ~np.isnan(sma_15) + ax.plot(x_vals[valid_idx_15], sma_15[valid_idx_15], self.sma15_line_style, + linewidth=self.line_width, label='SMA-15') + + # Set title and labels + ax.set_title('Price Candlestick Chart with Local Minima and Maxima', + fontsize=self.title_size, color=self.title_color) + ax.set_xlabel('Date', fontsize=self.axis_label_size, color=self.axis_label_color) + ax.set_ylabel('Price', fontsize=self.axis_label_size, color=self.axis_label_color) + + # Set appropriate x-axis limits + ax.set_xlim(-0.5, len(df) - 0.5) + + # Add a legend + ax.legend(loc=self.legend_loc, facecolor=self.legend_bg_color) + + # Adjust layout + plt.tight_layout() + + # Show the plot + plt.show() \ No newline at end of file