diff --git a/main.py b/main.py
index 81ef837..96ebece 100644
--- a/main.py
+++ b/main.py
@@ -14,6 +14,7 @@ import threading
 import queue
 import time
 import math
+import json
 
 # Set up logging
 logging.basicConfig(
@@ -73,29 +74,39 @@
     return min(workers_by_cpu, workers_by_memory)
 
 def load_data(file_path, start_date, stop_date):
-    """Load data with optimized dtypes and filtering"""
-    # Define optimized dtypes
-    dtypes = {
-        'Open': 'float32',
-        'High': 'float32',
-        'Low': 'float32',
-        'Close': 'float32',
-        'Volume': 'float32'
-    }
-
-    # Read data with original capitalized column names
-    data = pd.read_csv(file_path, dtype=dtypes)
-
-    # Convert timestamp to datetime
-    data['Timestamp'] = pd.to_datetime(data['Timestamp'], unit='s')
-
-    # Filter by date range
-    data = data[(data['Timestamp'] >= start_date) & (data['Timestamp'] <= stop_date)]
-
-    # Now convert column names to lowercase
-    data.columns = data.columns.str.lower()
-
-    return data.set_index('timestamp')
+    """Load data with optimized dtypes and filtering, supporting CSV and JSON input"""
+    # Determine file type
+    _, ext = os.path.splitext(file_path)
+    ext = ext.lower()
+    if ext == ".json":
+        with open(file_path, 'r') as f:
+            raw = json.load(f)
+        data = pd.DataFrame(raw["Data"])
+        # Convert columns to lowercase
+        data.columns = data.columns.str.lower()
+        # Convert timestamp to datetime
+        data["timestamp"] = pd.to_datetime(data["timestamp"], unit="s")
+        # Filter by date range
+        data = data[(data["timestamp"] >= start_date) & (data["timestamp"] <= stop_date)]
+        return data.set_index("timestamp")
+    else:
+        # Define optimized dtypes
+        dtypes = {
+            'Open': 'float32',
+            'High': 'float32',
+            'Low': 'float32',
+            'Close': 'float32',
+            'Volume': 'float32'
+        }
+        # Read data with original capitalized column names
+        data = pd.read_csv(file_path, dtype=dtypes)
+        # Convert timestamp to datetime
+        data['Timestamp'] = pd.to_datetime(data['Timestamp'], unit='s')
+        # Filter by date range
+        data = data[(data['Timestamp'] >= start_date) & (data['Timestamp'] <= stop_date)]
+        # Now convert column names to lowercase
+        data.columns = data.columns.str.lower()
+        return data.set_index('timestamp')
 
 def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
     """Process the entire timeframe with all stop loss values (no monthly split)"""
@@ -140,7 +151,9 @@
         "win_rate": win_rate,
         "max_drawdown": max_drawdown,
         "avg_trade": avg_trade,
-        "profit_ratio": profit_ratio,
+        "total_profit": total_profit,
+        "total_loss": total_loss,
+        "profit_ratio": profit_ratio,
         "initial_usd": initial_usd,
         "final_usd": final_usd,
     }
@@ -319,8 +332,8 @@ def write_results_per_combination_gsheet(results_rows, trade_rows, timestamp, sp
 
 if __name__ == "__main__":
     # Configuration
-    start_date = '2020-01-01'
-    stop_date = '2025-05-15'
+    start_date = '2022-01-01'
+    stop_date = '2023-01-01'
     initial_usd = 10000
     debug = False
 
@@ -328,12 +341,28 @@
     os.makedirs(results_dir, exist_ok=True)
 
     timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
-    timeframes = ["15min", "30min", "1h", "6h", "1D"]
-    stop_loss_pcts = [0.02, 0.03, 0.05]
+    timeframes = ["1min", "5min", "15min", "30min", "1h", "4h", "6h", "12h", "1D"]
+    stop_loss_pcts = [0.01, 0.02, 0.03]
 
     # Load data once
     data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date)
     logging.info(f"1min rows: {len(data_1min)}")
+
+    # Log the price at the nearest timestamp to start_date and stop_date
+    def get_nearest_price(df, target_date):
+        if len(df) == 0:
+            return None, None
+        target_ts = pd.to_datetime(target_date)
+        nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0]
+        nearest_time = df.index[nearest_idx]
+        price = df.iloc[nearest_idx]['close']
+        return nearest_time, price
+
+    nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
+    nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)
+
+    logging.info(f"Price at start_date ({start_date}) [nearest timestamp: {nearest_start_time}]: {start_price}")
+    logging.info(f"Price at stop_date ({stop_date}) [nearest timestamp: {nearest_stop_time}]: {stop_price}")
 
     # Prepare tasks
     tasks = [
@@ -347,29 +376,31 @@
     logging.info(f"Using {workers} workers for processing")
 
     # Start the background batch pusher
-    spreadsheet_name = "GlimBit Backtest Results"
-    batch_pusher = GSheetBatchPusher(results_queue, timestamp, spreadsheet_name, interval=65)
-    batch_pusher.start()
+    # spreadsheet_name = "GlimBit Backtest Results"
+    # batch_pusher = GSheetBatchPusher(results_queue, timestamp, spreadsheet_name, interval=65)
+    # batch_pusher.start()
 
     # Process tasks with optimized concurrency
     with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
         futures = {executor.submit(process_timeframe, task, debug): task for task in tasks}
         all_results_rows = []
+        all_trade_rows = []
        for future in concurrent.futures.as_completed(futures):
             results, trades = future.result()
             if results or trades:
                 all_results_rows.extend(results)
-                results_queue.put((results, trades)) # Enqueue for batch update
+                all_trade_rows.extend(trades)
+                # results_queue.put((results, trades)) # Enqueue for batch update
 
     # After all tasks, flush any remaining updates
-    batch_pusher.stop()
-    batch_pusher.join()
+    # batch_pusher.stop()
+    # batch_pusher.join()
 
     # Ensure all batches are pushed, even after 429 errors
-    while not results_queue.empty():
-        logging.info("Waiting for Google Sheets quota to reset. Retrying batch push in 60 seconds...")
-        time.sleep(65)
-        batch_pusher.push_all()
+    # while not results_queue.empty():
+    #     logging.info("Waiting for Google Sheets quota to reset. Retrying batch push in 60 seconds...")
+    #     time.sleep(65)
+    #     batch_pusher.push_all()
 
     # Write all results to a single CSV file
     combined_filename = os.path.join(results_dir, f"{timestamp}_backtest_combined.csv")
@@ -398,4 +429,31 @@
         for row in all_results_rows:
             writer.writerow(format_row(row))
 
-    logging.info(f"Combined results written to {combined_filename}")
\ No newline at end of file
+    logging.info(f"Combined results written to {combined_filename}")
+
+    # --- Write trades to separate CSVs per timeframe and stop loss ---
+    # all_trade_rows is collected in the ProcessPoolExecutor loop above.
+
+    # Group all_trade_rows by (timeframe, stop_loss_pct)
+    from collections import defaultdict
+    trades_by_combo = defaultdict(list)
+    for trade in all_trade_rows:
+        tf = trade.get("timeframe")
+        sl = trade.get("stop_loss_pct")
+        trades_by_combo[(tf, sl)].append(trade)
+
+    trades_fieldnames = [
+        "entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type"
+    ]
+
+    for (tf, sl), trades in trades_by_combo.items():
+        sl_percent = int(round(sl * 100))
+        trades_filename = os.path.join(results_dir, f"trades_{tf}_ST{sl_percent}pct.csv")
+        with open(trades_filename, "w", newline="") as csvfile:
+            writer = csv.DictWriter(csvfile, fieldnames=trades_fieldnames)
+            writer.writeheader()
+            for trade in trades:
+                writer.writerow({k: trade.get(k, "") for k in trades_fieldnames})
+        logging.info(f"Trades written to {trades_filename}")
+
+    
\ No newline at end of file
diff --git a/trend_detector_simple.py b/trend_detector_simple.py
index 48abfa2..0610ec5 100644
--- a/trend_detector_simple.py
+++ b/trend_detector_simple.py
@@ -660,6 +660,17 @@ class TrendDetectorSimple:
         meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
                               trends_arr[:,0], 0)
 
+        if debug:
+            # Count flips (ignoring 0s)
+            flips = 0
+            last = meta_trend[0]
+            for val in meta_trend[1:]:
+                if val != 0 and val != last:
+                    flips += 1
+                    last = val
+            print(f"Meta trend flips (ignoring 0): {flips}")
+            print(f"Meta trend value counts: {np.unique(meta_trend, return_counts=True)}")
+
         position = 0 # 0 = no position, 1 = long
         entry_price = 0
         usd = initial_usd
@@ -682,7 +693,8 @@
             price_low = df['low'].iloc[i]
             price_close = df['close'].iloc[i]
             date = df['timestamp'].iloc[i]
-            mt = meta_trend[i]
+            prev_mt = meta_trend[i-1]
+            curr_mt = meta_trend[i]
 
             # Check stop loss if in position
             if position == 1:
@@ -724,8 +736,8 @@
                     # Update the start index for next check
                     current_trade_min1_start_idx = current_min1_end_idx
 
-            # Entry logic
-            if position == 0 and mt == 1:
+            # Entry: only if not in position and signal changes to 1
+            if position == 0 and prev_mt != 1 and curr_mt == 1:
                 # Buy at open, apply transaction cost
                 coin = (usd * (1 - transaction_cost)) / price_open
                 entry_price = price_open
@@ -734,8 +746,8 @@
                 position = 1
                 current_trade_min1_start_idx = None # Will be set on first stop loss check
 
-            # Exit logic
-            elif position == 1 and mt == -1:
+            # Exit: only if in position and signal changes from 1 to -1
+            elif position == 1 and prev_mt == 1 and curr_mt == -1:
                 # Sell at open, apply transaction cost
                 usd = coin * price_open * (1 - transaction_cost)
                 trade_log.append({
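A minimal sketch of the JSON layout that the new JSON branch of load_data() appears to assume: a top-level "Data" key holding a list of records with a Unix-seconds timestamp plus OHLCV fields, which become lowercase open/high/low/close/volume columns after str.lower(). The file name, field names, and sample values below are illustrative assumptions, not taken from the real data:

    import json
    import pandas as pd

    # Hypothetical sample file; the actual JSON data layout may differ.
    sample = {
        "Data": [
            {"Timestamp": 1640995200, "Open": 46200.0, "High": 46300.0,
             "Low": 46100.0, "Close": 46250.0, "Volume": 12.5},
            {"Timestamp": 1640995260, "Open": 46250.0, "High": 46400.0,
             "Low": 46200.0, "Close": 46350.0, "Volume": 9.8},
        ]
    }
    with open("btcusd_sample.json", "w") as f:
        json.dump(sample, f)

    # Mirrors the JSON branch of load_data(): lowercase the columns, parse
    # epoch-seconds timestamps, and index by timestamp before date filtering.
    df = pd.DataFrame(sample["Data"])
    df.columns = df.columns.str.lower()
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
    df = df.set_index("timestamp")
    print(df[["open", "close"]])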
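For context on the prev_mt/curr_mt change in trend_detector_simple.py: entries and exits now fire only on a transition of the meta trend rather than on its level, so a position that was stopped out mid-trend is no longer re-opened on the very next bar. A toy sketch of the two conditions, with position bookkeeping, prices, and costs omitted and a made-up meta_trend sequence:

    import numpy as np

    # 0 = no consensus, 1 = up-trend, -1 = down-trend (illustrative values only).
    meta_trend = np.array([0, 1, 1, -1, -1, 1, 1, 0, 1])

    entries, exits = [], []
    for i in range(1, len(meta_trend)):
        prev_mt, curr_mt = meta_trend[i - 1], meta_trend[i]
        if prev_mt != 1 and curr_mt == 1:       # same test as the new entry condition
            entries.append(i)
        elif prev_mt == 1 and curr_mt == -1:    # same test as the new exit condition
            exits.append(i)

    print(entries, exits)  # [1, 5, 8] [3]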