# Source: Cycles/cycle_detector.py (Python; the repository viewer listed it as 249 lines, 10 KiB).

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import argrelextrema
class CycleDetector:
    """
    Detect price cycles from swing lows/highs in OHLC-style price data.

    On construction the input frame is copied, optionally resampled to a
    weekly or monthly timeframe, and annotated with boolean 'swing_low' and
    'swing_high' columns. The annotated frame is available as ``self.data``.
    """

    # Aggregation applied to each known price column when resampling.
    _RESAMPLE_RULES = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
    }

    def __init__(self, data, timeframe='daily'):
        """
        Initialize the CycleDetector with price data.
        Parameters:
        - data: DataFrame with 'high' and 'low' columns (used for swing
          detection) and a 'close' column (used for plotting). A 'date' or
          'datetime' column is needed for resampling, find_cycle_lows() and
          plot_cycles(). 'open' and 'volume' are optional.
        - timeframe: 'daily', 'weekly', or 'monthly'. Any other value leaves
          the data at its original frequency.
        """
        self.data = data.copy()
        self.timeframe = timeframe
        # Ensure we have a consistent date column name
        if 'datetime' in self.data.columns and 'date' not in self.data.columns:
            self.data.rename(columns={'datetime': 'date'}, inplace=True)
        # Convert data to specified timeframe if needed
        if timeframe == 'weekly' and 'date' in self.data.columns:
            self.data = self._convert_data(self.data, 'W')
        elif timeframe == 'monthly' and 'date' in self.data.columns:
            # NOTE(review): newer pandas releases prefer the 'ME' alias for
            # month-end resampling - confirm against the supported pandas version.
            self.data = self._convert_data(self.data, 'M')
        # Add columns for local minima and maxima detection
        self._add_swing_points()

    def _convert_data(self, data, timeframe):
        """
        Resample 'data' to the pandas offset alias given in 'timeframe'.
        Parameters:
        - data: DataFrame with a 'date' column; it is not modified
        - timeframe: pandas offset alias such as 'W' or 'M'
        Returns:
        - DataFrame with 'date' as a regular column and one aggregated column
          for each of open/high/low/close/volume present in 'data'
        """
        # Only aggregate columns that exist, so 'open'/'volume' are optional.
        rules = {
            column: how
            for column, how in self._RESAMPLE_RULES.items()
            if column in data.columns
        }
        # assign() returns a new frame, leaving the caller's frame untouched.
        indexed = data.assign(date=pd.to_datetime(data['date'])).set_index('date')
        return indexed.resample(timeframe).agg(rules).reset_index()

    def _add_swing_points(self, window=5):
        """
        Identify swing points (local minima and maxima).

        Adds boolean 'swing_low' (from 'low') and 'swing_high' (from 'high')
        columns to self.data.
        Parameters:
        - window: The window size for local minima/maxima detection (number
          of bars compared on each side)
        """
        # The set_index/reset_index round trip is kept so the resulting frame
        # layout matches earlier behavior ('date' first, fresh integer index).
        if 'date' in self.data.columns:
            self.data.set_index('date', inplace=True)
        row_count = len(self.data)
        detectors = (
            ('low', 'swing_low', np.less),
            ('high', 'swing_high', np.greater),
        )
        for price_col, flag_col, comparator in detectors:
            extrema = argrelextrema(
                self.data[price_col].values, comparator, order=window
            )[0]
            flags = np.zeros(row_count, dtype=bool)
            flags[extrema] = True
            self.data[flag_col] = flags
        self.data.reset_index(inplace=True)

    def find_cycle_lows(self):
        """Return the 'date' values of all swing lows (the cycle lows)."""
        return self.data.loc[self.data['swing_low'], 'date'].values

    def calculate_cycle_lengths(self):
        """Return the number of bars between each pair of consecutive swing lows."""
        low_positions = np.flatnonzero(self.data['swing_low'].to_numpy())
        return np.diff(low_positions)

    def get_average_cycle_length(self):
        """
        Calculate the average cycle length.
        Returns:
        - Mean number of bars between consecutive swing lows, or None when
          there are fewer than two swing lows
        """
        cycle_lengths = self.calculate_cycle_lengths()
        if len(cycle_lengths) > 0:
            return np.mean(cycle_lengths)
        return None

    def get_cycle_window(self, tolerance=0.10):
        """
        Get the cycle window with the specified tolerance.
        Parameters:
        - tolerance: The tolerance as a fraction of the average (default: 0.10, i.e. 10%)
        Returns:
        - tuple: (min_cycle_length, avg_cycle_length, max_cycle_length),
          or None when no average cycle length is available
        """
        avg_length = self.get_average_cycle_length()
        if avg_length is None:
            return None
        return (
            avg_length * (1 - tolerance),
            avg_length,
            avg_length * (1 + tolerance),
        )

    def detect_two_drives_pattern(self, lookback=10):
        """
        Detect 2-drives pattern: a swing low, counter trend bounce, and a lower low.

        A swing low qualifies when the previous swing low lies at most
        'lookback' bars earlier, the current low is lower than the previous
        one, and at least one bar strictly between the two lows has a high
        above the previous low.
        Parameters:
        - lookback: Number of periods to look back
        Returns:
        - list: Indices (row positions) where 2-drives patterns are detected
        """
        lows = self.data['low'].to_numpy()
        highs = self.data['high'].to_numpy()
        low_positions = np.flatnonzero(self.data['swing_low'].to_numpy())
        row_count = len(self.data)
        patterns = []
        for prev_pos, pos in zip(low_positions[:-1], low_positions[1:]):
            # Same scan range as before: skip lows without a full lookback
            # window and never report the final row.
            if pos < lookback or pos >= row_count - 1:
                continue
            # The earlier low must fall inside the lookback window.
            if pos - prev_pos > lookback:
                continue
            if not lows[pos] < lows[prev_pos]:
                continue
            # Bounce check over the bars strictly between the two lows.
            between_highs = highs[prev_pos + 1:pos]
            if np.any(between_highs > lows[prev_pos]):
                patterns.append(int(pos))
        return patterns

    def detect_v_shaped_lows(self, window=5, threshold=0.02):
        """
        Detect V-shaped cycle lows (sharp decline followed by sharp rise).
        Parameters:
        - window: Window to look for sharp price changes
        - threshold: Fractional change threshold to consider 'sharp'
        Returns:
        - list: Indices (row positions) where V-shaped patterns are detected
        """
        highs = self.data['high']
        lows = self.data['low']
        row_count = len(self.data)
        patterns = []
        for pos in np.flatnonzero(self.data['swing_low'].to_numpy()):
            # Need enough data points before and after
            if pos < window or pos + window >= row_count:
                continue
            before_highs = highs.iloc[pos - window:pos]
            after_highs = highs.iloc[pos + 1:pos + window + 1]
            # A non-positive window yields empty slices; nothing to measure.
            if len(before_highs) == 0 or len(after_highs) == 0:
                continue
            low_price = lows.iloc[pos]
            peak_before = before_highs.max()
            # Decline is measured from the highest high before the low,
            # rise from the low up to the highest high after it.
            decline = (peak_before - low_price) / peak_before
            rise = (after_highs.max() - low_price) / low_price
            # Both decline and rise must exceed threshold to be considered V-shaped
            if decline > threshold and rise > threshold:
                patterns.append(int(pos))
        return patterns

    def _cycle_summary_text(self):
        """Build the second title line describing average cycle length and window."""
        avg_cycle = self.get_average_cycle_length()
        if avg_cycle is None:
            # Fewer than two swing lows: there is no cycle length to format.
            return "Average Cycle Length: n/a"
        cycle_window = self.get_cycle_window()
        window_text = f"Tolerance Window: [{cycle_window[0]:.2f} - {cycle_window[2]:.2f}]"
        return f"Average Cycle Length: {avg_cycle:.2f} periods, {window_text}"

    def plot_cycles(self, pattern_detection=None, title_suffix=''):
        """
        Plot the price data with cycle lows and detected patterns.
        Parameters:
        - pattern_detection: 'two_drives', 'v_shape', a collection containing
          either of them, or None to plot no pattern overlay
        - title_suffix: Optional suffix for the plot title
        """
        plt.figure(figsize=(14, 7))
        # Determine the date column name (could be 'date' or 'datetime')
        date_col = 'date' if 'date' in self.data.columns else 'datetime'
        # Plot price data
        plt.plot(self.data[date_col], self.data['close'], label='Close Price')
        # Calculate a consistent vertical position for indicators based on price range
        price_range = self.data['close'].max() - self.data['close'].min()
        indicator_offset = price_range * 0.01  # 1% of price range
        # Plot cycle lows at a fixed offset below the low price
        swing_lows = self.data[self.data['swing_low']]
        plt.scatter(swing_lows[date_col], swing_lows['low'] - indicator_offset,
                    color='green', marker='^', s=100, label='Cycle Lows')
        # None means "no overlay"; an empty tuple keeps the membership tests valid.
        requested = () if pattern_detection is None else pattern_detection
        overlays = (
            ('two_drives', self.detect_two_drives_pattern, 'red', 'Two Drives Pattern'),
            ('v_shape', self.detect_v_shaped_lows, 'purple', 'V-Shape Pattern'),
        )
        for key, detector, color, label in overlays:
            if key not in requested:
                continue
            pattern_indices = detector()
            if pattern_indices:
                patterns = self.data.iloc[pattern_indices]
                plt.scatter(patterns[date_col], patterns['low'] - indicator_offset * 2,
                            color=color, marker='o', s=150, label=label)
        # Add cycle lengths and averages
        plt.title(f"Detected Cycles - {self.timeframe.capitalize()} Timeframe {title_suffix}\n"
                  f"{self._cycle_summary_text()}")
        plt.legend()
        plt.grid(True)
        plt.show()
# Usage example:
# 1. Load your data
# data = pd.read_csv('your_price_data.csv')
# 2. Create cycle detector instances for different timeframes
# weekly_detector = CycleDetector(data, timeframe='weekly')
# daily_detector = CycleDetector(data, timeframe='daily')
# 3. Analyze cycles
# weekly_cycle_length = weekly_detector.get_average_cycle_length()
# daily_cycle_length = daily_detector.get_average_cycle_length()
# 4. Detect patterns
# two_drives = weekly_detector.detect_two_drives_pattern()
# v_shapes = daily_detector.detect_v_shaped_lows()
# 5. Visualize
# weekly_detector.plot_cycles(pattern_detection='two_drives')
# daily_detector.plot_cycles(pattern_detection='v_shape')