Enhance data loading in main.py to support both CSV and JSON input formats. Update date filtering and logging for better traceability. Refactor trade collection so that all trades are captured and written to separate CSV files per timeframe and stop-loss percentage. Adjust main execution parameters for broader timeframe analysis.

Simon Moisy 2025-05-19 17:35:17 +08:00
parent 170751db0e
commit 0eb7fc77f9
2 changed files with 130 additions and 44 deletions

main.py

@@ -14,6 +14,7 @@ import threading
import queue
import time
import math
import json
# Set up logging
logging.basicConfig(
@@ -73,29 +74,39 @@ def get_optimal_workers():
return min(workers_by_cpu, workers_by_memory)
def load_data(file_path, start_date, stop_date):
"""Load data with optimized dtypes and filtering"""
# Define optimized dtypes
dtypes = {
'Open': 'float32',
'High': 'float32',
'Low': 'float32',
'Close': 'float32',
'Volume': 'float32'
}
# Read data with original capitalized column names
data = pd.read_csv(file_path, dtype=dtypes)
# Convert timestamp to datetime
data['Timestamp'] = pd.to_datetime(data['Timestamp'], unit='s')
# Filter by date range
data = data[(data['Timestamp'] >= start_date) & (data['Timestamp'] <= stop_date)]
# Now convert column names to lowercase
data.columns = data.columns.str.lower()
return data.set_index('timestamp')
"""Load data with optimized dtypes and filtering, supporting CSV and JSON input"""
# Determine file type
_, ext = os.path.splitext(file_path)
ext = ext.lower()
if ext == ".json":
with open(file_path, 'r') as f:
raw = json.load(f)
data = pd.DataFrame(raw["Data"])
# Convert columns to lowercase
data.columns = data.columns.str.lower()
# Convert timestamp to datetime
data["timestamp"] = pd.to_datetime(data["timestamp"], unit="s")
# Filter by date range
data = data[(data["timestamp"] >= start_date) & (data["timestamp"] <= stop_date)]
return data.set_index("timestamp")
else:
# Define optimized dtypes
dtypes = {
'Open': 'float32',
'High': 'float32',
'Low': 'float32',
'Close': 'float32',
'Volume': 'float32'
}
# Read data with original capitalized column names
data = pd.read_csv(file_path, dtype=dtypes)
# Convert timestamp to datetime
data['Timestamp'] = pd.to_datetime(data['Timestamp'], unit='s')
# Filter by date range
data = data[(data['Timestamp'] >= start_date) & (data['Timestamp'] <= stop_date)]
# Now convert column names to lowercase
data.columns = data.columns.str.lower()
return data.set_index('timestamp')
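For reference, a minimal sketch of calling the updated loader for both formats; the JSON layout ({"Data": [...]} with capitalized OHLCV keys and a Unix-seconds Timestamp) is inferred from the branch above, and the .json path is illustrative only:
# CSV path is the one used below in __main__; the JSON path is hypothetical.
csv_df = load_data('./data/btcusd_1-min_data.csv', '2022-01-01', '2023-01-01')
json_df = load_data('./data/btcusd_1-min_data.json', '2022-01-01', '2023-01-01')
# Both return a DataFrame with lowercase columns indexed by timestamp. Note that the
# JSON branch keeps pandas' default float64 dtypes, while the CSV branch downcasts
# the OHLCV columns to float32.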
def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
"""Process the entire timeframe with all stop loss values (no monthly split)"""
@@ -140,7 +151,9 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"win_rate": win_rate,
"max_drawdown": max_drawdown,
"avg_trade": avg_trade,
"profit_ratio": profit_ratio,
"total_profit": total_profit,
"total_loss": total_loss,
"profit_ratio": profit_ratio,
"initial_usd": initial_usd,
"final_usd": final_usd,
}
@@ -319,8 +332,8 @@ def write_results_per_combination_gsheet(results_rows, trade_rows, timestamp, sp
if __name__ == "__main__":
# Configuration
start_date = '2020-01-01'
stop_date = '2025-05-15'
start_date = '2022-01-01'
stop_date = '2023-01-01'
initial_usd = 10000
debug = False
@@ -328,12 +341,28 @@ if __name__ == "__main__":
os.makedirs(results_dir, exist_ok=True)
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
timeframes = ["15min", "30min", "1h", "6h", "1D"]
stop_loss_pcts = [0.02, 0.03, 0.05]
timeframes = ["1min", "5min", "15min", "30min", "1h", "4h", "6h", "12h", "1D"]
stop_loss_pcts = [0.01, 0.02, 0.03]
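For scale, this grid is 9 timeframes × 3 stop-loss levels, i.e. 27 (timeframe, stop_loss_pct) combinations, which is the key used for the per-combination trade CSVs written at the end; an illustrative enumeration:
from itertools import product
combos = list(product(timeframes, stop_loss_pcts))  # 27 (timeframe, stop_loss_pct) pairs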
# Load data once
data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date)
logging.info(f"1min rows: {len(data_1min)}")
# Log the price at the nearest timestamp to start_date and stop_date
def get_nearest_price(df, target_date):
if len(df) == 0:
return None, None
target_ts = pd.to_datetime(target_date)
nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0]
nearest_time = df.index[nearest_idx]
price = df.iloc[nearest_idx]['close']
return nearest_time, price
nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)
logging.info(f"Price at start_date ({start_date}) [nearest timestamp: {nearest_start_time}]: {start_price}")
logging.info(f"Price at stop_date ({stop_date}) [nearest timestamp: {nearest_stop_time}]: {stop_price}")
# Prepare tasks
tasks = [
@@ -347,29 +376,31 @@ if __name__ == "__main__":
logging.info(f"Using {workers} workers for processing")
# Start the background batch pusher
spreadsheet_name = "GlimBit Backtest Results"
batch_pusher = GSheetBatchPusher(results_queue, timestamp, spreadsheet_name, interval=65)
batch_pusher.start()
# spreadsheet_name = "GlimBit Backtest Results"
# batch_pusher = GSheetBatchPusher(results_queue, timestamp, spreadsheet_name, interval=65)
# batch_pusher.start()
# Process tasks with optimized concurrency
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(process_timeframe, task, debug): task for task in tasks}
all_results_rows = []
all_trade_rows = []
for future in concurrent.futures.as_completed(futures):
results, trades = future.result()
if results or trades:
all_results_rows.extend(results)
results_queue.put((results, trades)) # Enqueue for batch update
all_trade_rows.extend(trades)
# results_queue.put((results, trades)) # Enqueue for batch update
# After all tasks, flush any remaining updates
batch_pusher.stop()
batch_pusher.join()
# batch_pusher.stop()
# batch_pusher.join()
# Ensure all batches are pushed, even after 429 errors
while not results_queue.empty():
logging.info("Waiting for Google Sheets quota to reset. Retrying batch push in 60 seconds...")
time.sleep(65)
batch_pusher.push_all()
# while not results_queue.empty():
# logging.info("Waiting for Google Sheets quota to reset. Retrying batch push in 60 seconds...")
# time.sleep(65)
# batch_pusher.push_all()
# Write all results to a single CSV file
combined_filename = os.path.join(results_dir, f"{timestamp}_backtest_combined.csv")
@@ -398,4 +429,47 @@ if __name__ == "__main__":
for row in all_results_rows:
writer.writerow(format_row(row))
logging.info(f"Combined results written to {combined_filename}")
logging.info(f"Combined results written to {combined_filename}")
# --- Write trades to separate CSVs per timeframe and stop loss ---
# Trades were already collected into all_trade_rows in the ProcessPoolExecutor loop above.
# Now, group all_trade_rows by (timeframe, stop_loss_pct)
from collections import defaultdict
trades_by_combo = defaultdict(list)
for trade in all_trade_rows:
tf = trade.get("timeframe")
sl = trade.get("stop_loss_pct")
trades_by_combo[(tf, sl)].append(trade)
trades_fieldnames = [
"entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type"
]
for (tf, sl), trades in trades_by_combo.items():
sl_percent = int(round(sl * 100))
trades_filename = os.path.join(results_dir, f"trades_{tf}_ST{sl_percent}pct.csv")
with open(trades_filename, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=trades_fieldnames)
writer.writeheader()
for trade in trades:
writer.writerow({k: trade.get(k, "") for k in trades_fieldnames})
logging.info(f"Trades written to {trades_filename}")

(second changed file)

@@ -660,6 +660,17 @@ class TrendDetectorSimple:
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
if debug:
# Count flips (ignoring 0s)
flips = 0
last = meta_trend[0]
for val in meta_trend[1:]:
if val != 0 and val != last:
flips += 1
last = val
print(f"Meta trend flips (ignoring 0): {flips}")
print(f"Meta trend value counts: {np.unique(meta_trend, return_counts=True)}")
position = 0 # 0 = no position, 1 = long
entry_price = 0
usd = initial_usd
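As an aside, the debug flip counter above can be vectorized; the sketch below assumes, as the "ignoring 0" label suggests, that zeros neither count as flips nor reset the comparison value, and it matches the loop whenever meta_trend starts with a non-zero value:
nz = meta_trend[meta_trend != 0]            # drop neutral (0) readings
flips = int(np.count_nonzero(np.diff(nz)))  # changes between consecutive non-zero values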
@@ -682,7 +693,8 @@ class TrendDetectorSimple:
price_low = df['low'].iloc[i]
price_close = df['close'].iloc[i]
date = df['timestamp'].iloc[i]
mt = meta_trend[i]
prev_mt = meta_trend[i-1]
curr_mt = meta_trend[i]
# Check stop loss if in position
if position == 1:
@@ -724,8 +736,8 @@
# Update the start index for next check
current_trade_min1_start_idx = current_min1_end_idx
# Entry logic
if position == 0 and mt == 1:
# Entry: only if not in position and signal changes to 1
if position == 0 and prev_mt != 1 and curr_mt == 1:
# Buy at open, apply transaction cost
coin = (usd * (1 - transaction_cost)) / price_open
entry_price = price_open
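The entry condition (and, in the next hunk, the exit condition) changes from level-triggered to edge-triggered. The practical difference shows up after a stop-loss exit mid-uptrend: with the old check (mt == 1) the strategy would re-enter on the very next bar while meta_trend is still 1, whereas the new check (prev_mt != 1 and curr_mt == 1) waits for the signal to leave 1 and turn 1 again. A small illustration with an assumed signal:
# Assumed signal after a stop-loss exit while the trend is still up (position is 0 again):
# meta_trend:  1   1   0   1
# old entry (mt == 1):                       re-enters at the first bar shown
# new entry (prev_mt != 1 and curr_mt == 1): re-enters only at the last bar (0 -> 1 transition)
Note that the new exit similarly requires prev_mt == 1, so a 1 -> 0 -> -1 sequence no longer triggers a signal exit; only the stop loss would close that position.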
@@ -734,8 +746,8 @@
position = 1
current_trade_min1_start_idx = None # Will be set on first stop loss check
# Exit logic
elif position == 1 and mt == -1:
# Exit: only if in position and signal changes from 1 to -1
elif position == 1 and prev_mt == 1 and curr_mt == -1:
# Sell at open, apply transaction cost
usd = coin * price_open * (1 - transaction_cost)
trade_log.append({