from fastapi import FastAPI, HTTPException from fastapi.responses import FileResponse, HTMLResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from typing import List import uvicorn import os import hashlib PORT = 8051 DIRECTORY = "./data/db" # Ensure the data directory exists (if not already handled elsewhere) os.makedirs(DIRECTORY, exist_ok=True) app = FastAPI() # Pydantic model for the delete file request class DeleteFileRequest(BaseModel): name: str class FileInfo(BaseModel): name: str hash: str @app.post("/api/files/list", response_model=List[FileInfo]) async def api_list_files(): """ Lists all files and their SHA256 hashes in the data/db directory. """ try: files_with_hashes = [] for f in os.listdir(DIRECTORY): file_path = os.path.join(DIRECTORY, f) if os.path.isfile(file_path) and f.endswith(".db"): hasher = hashlib.sha256() with open(file_path, "rb") as afile: buf = afile.read() hasher.update(buf) files_with_hashes.append({"name": f, "hash": hasher.hexdigest()}) return files_with_hashes except Exception as e: raise HTTPException(status_code=500, detail=f"An error occurred while listing files: {e}") @app.post("/api/files/delete") async def api_delete_file(request: DeleteFileRequest): """ Deletes a specified file from the data/db directory. """ file_path = os.path.join(DIRECTORY, request.name) if not os.path.exists(file_path) or not os.path.isfile(file_path): raise HTTPException(status_code=404, detail=f"File not found: {request.name}") try: os.remove(file_path) return {"success": True} except Exception as e: raise HTTPException(status_code=500, detail=f"Could not delete file: {e}") # Mount static files to serve content from the database directory app.mount("/data", StaticFiles(directory=DIRECTORY), name="data") app.mount("/static", StaticFiles(directory="static"), name="static") @app.get("/files", response_class=HTMLResponse) async def list_files(): db_files_info = [] for root, _, files in os.walk(DIRECTORY): for file in files: if file.endswith(".db"): file_path = os.path.join(root, file) relative_path = os.path.relpath(file_path, DIRECTORY) try: file_size = os.path.getsize(file_path) creation_time = os.path.getctime(file_path) db_files_info.append({ "relative_path": relative_path, "file_size": file_size, "creation_time": creation_time }) except Exception as e: # Log or handle error for specific file without breaking the list print(f"Error accessing {file_path}: {e}") # Sort files by creation time db_files_info.sort(key=lambda x: x["creation_time"]) files_html = "" for file_info in db_files_info: # convert to GB file_size_gb = file_info["file_size"] / (1024 * 1024 * 1024) files_html += f"""
Welcome to the Market Data Server.
Access database files directly via /data/your_file.db
""") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=PORT)