106 lines
3.5 KiB
Python

import json
import logging
import typer
from pathlib import Path
from datetime import datetime, timezone
import threading
from db_interpreter import DBInterpreter
from ohlc_processor import OHLCProcessor
from strategy import Strategy
from desktop_app import MainWindow
import sys
from PySide6.QtWidgets import QApplication
from PySide6.QtCore import QTimer
def _select_db_paths(db_paths, start_date, end_date):
    """Return the database files whose filename date lies in [start_date, end_date).

    The date is read from the 3rd-5th dash-separated parts of the file stem and
    parsed as ``%y%m%d`` in UTC. Files with fewer than five parts, or whose date
    parts cannot be parsed, are skipped with a warning so that one stray file
    matching the glob does not abort the whole run.

    Args:
        db_paths: Iterable of ``Path`` objects to filter (input order is kept).
        start_date: Timezone-aware inclusive lower bound.
        end_date: Timezone-aware exclusive upper bound.

    Returns:
        A list of the paths that fall inside the requested window.
    """
    selected = []
    for db_path in db_paths:
        name_parts = db_path.name.split(".")[0].split("-")
        if len(name_parts) < 5:
            logging.warning("Unexpected filename format: %s", db_path.name)
            continue
        try:
            db_date = datetime.strptime(
                "".join(name_parts[2:5]), "%y%m%d"
            ).replace(tzinfo=timezone.utc)
        except ValueError:
            logging.warning("Cannot parse date from filename: %s", db_path.name)
            continue
        if start_date <= db_date < end_date:
            selected.append(db_path)
    return selected


def main(
    instrument: str = typer.Argument(..., help="Instrument to backtest, e.g. BTC-USDT"),
    start_date: str = typer.Argument(..., help="Start date, e.g. 2025-07-01"),
    end_date: str = typer.Argument(..., help="End date, e.g. 2025-08-01"),
    timeframe_minutes: int = typer.Option(15, "--timeframe-minutes", help="Timeframe in minutes"),
    ui: bool = typer.Option(False, "--ui/--no-ui", help="Enable UI"),
):
    """Backtest the strategy on recorded data for one instrument and date range."""
    # Parameters: start_date is inclusive, end_date is exclusive, both given as
    # YYYY-MM-DD and interpreted as UTC midnight. With --ui the data is processed
    # on a background thread while a Qt window polls the processor once a second.
    logging.basicConfig(
        filename="strategy.log",
        level=logging.DEBUG,
        format="%(asctime)s %(levelname)s %(message)s",
    )

    # Keep the parsed bounds in separate names instead of rebinding the str
    # parameters to datetime objects.
    start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)

    # Relative path: resolved against the current working directory.
    databases_path = Path("../data/OKX")
    if not databases_path.exists():
        logging.error("Database path does not exist: %s", databases_path)
        return

    db_paths = sorted(databases_path.glob(f"{instrument}*.db"))
    if not db_paths:
        logging.error(
            "No database files found for instrument %s in %s", instrument, databases_path
        )
        return
    logging.info(
        "Found %d database files: %s", len(db_paths), [p.name for p in db_paths]
    )

    processor = OHLCProcessor(aggregate_window_seconds=60 * timeframe_minutes)
    strategy = Strategy(
        lookback=30,
        min_volume_factor=1.0,
        confirm_break_signal_high=True,  # or False to disable
        debug=True,
        debug_level=2,  # 1=key events, 2=per-bar detail
        debug_every_n_bars=100,
    )

    def process_data():
        """Stream the selected databases through processor and strategy, then print stats."""
        db_to_process = _select_db_paths(db_paths, start_dt, end_dt)
        total = len(db_to_process)
        for i, db_path in enumerate(db_to_process):
            print(f"{i}/{total}")
            db_interpreter = DBInterpreter(db_path)
            for orderbook_update, trades in db_interpreter.stream():
                processor.update_orderbook(orderbook_update)
                processor.process_trades(trades)
                strategy.process(processor)

        # Final flush once all files are consumed, followed by one more strategy
        # pass (presumably to emit and evaluate the last partial bar -- confirm
        # against OHLCProcessor.flush).
        processor.flush()
        strategy.process(processor)

        # Best-effort finalisation: a failure here must not prevent the stats
        # from being printed, but it is logged instead of silently dropped.
        try:
            strategy.on_finish(processor)  # optional: flat at last close
        except Exception:
            logging.exception("strategy.on_finish failed; reporting stats without it")

        print(json.dumps(strategy.get_stats(), indent=2))

    if not ui:
        process_data()
        return

    # NOTE(review): in UI mode `processor` is mutated by the worker thread while
    # the timer callback reads it from the Qt event loop, with no locking visible
    # here -- confirm OHLCProcessor / MainWindow.update_data tolerate this.
    # daemon=True: the worker does not keep the process alive after the window closes.
    data_thread = threading.Thread(target=process_data, daemon=True)
    data_thread.start()

    app = QApplication(sys.argv)
    desktop_app = MainWindow()
    desktop_app.show()

    # Refresh the window from the shared processor once per second.
    timer = QTimer()
    timer.timeout.connect(lambda: desktop_app.update_data(processor, 30))
    timer.start(1000)

    app.exec()
# Script entry point: only when this file is executed directly (not imported),
# hand `main` to Typer, which builds the command-line interface from its signature.
if __name__ == "__main__":
    typer.run(main)