MarketDataCollector/simple_server.py

156 lines
5.4 KiB
Python

import asyncio
import hashlib
import html
import json
import os
from typing import List
from urllib.parse import quote

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
# TCP port handed to uvicorn.run() in the __main__ guard at the bottom of the file
PORT = 8051
# Get the absolute path to the directory where this script is located
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Directory that holds the database files: <script dir>/data/db
DIRECTORY = os.path.join(SCRIPT_DIR, "data", "db")
# Ensure the data directory exists (if not already handled elsewhere).
# This runs at import time, before the StaticFiles mount on DIRECTORY is created.
os.makedirs(DIRECTORY, exist_ok=True)
# The FastAPI application that every route and mount in this module attaches to
app = FastAPI()
# Pydantic model for the delete file request.
# Despite its name it is also the request body of the /api/files/hash endpoint.
class DeleteFileRequest(BaseModel):
    """Request body that identifies a single file by name."""

    # File name as sent by the client; the endpoints join it onto DIRECTORY.
    name: str
@app.post("/api/files/list", response_model=List[str])
async def api_list_files():
    """
    Return the names of the ``.db`` files located directly in the data/db directory.

    Sub-directories are not searched and the names are returned in the order
    ``os.listdir`` yields them. Any failure while reading the directory is
    reported to the client as an HTTP 500 error.
    """
    try:
        db_names = []
        for entry in os.listdir(DIRECTORY):
            # Only database files are of interest.
            if not entry.endswith(".db"):
                continue
            # Skip directories (or anything else that is not a regular file).
            if os.path.isfile(os.path.join(DIRECTORY, entry)):
                db_names.append(entry)
        return db_names
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred while listing files: {e}")
def _sha256_of_file(path, chunk_size=1024 * 1024):
    """Return the hex SHA256 digest of the file at *path*, reading it in chunks."""
    hasher = hashlib.sha256()
    with open(path, "rb") as afile:
        while chunk := afile.read(chunk_size):
            hasher.update(chunk)
    return hasher.hexdigest()


@app.post("/api/files/hash")
async def api_file_hash(request: DeleteFileRequest):
    """
    Calculates and returns the SHA256 hash for a specific file.

    Args:
        request: Body whose ``name`` is a path relative to the data/db directory.

    Returns:
        ``{"name": <requested name>, "hash": <hex digest>}``.

    Raises:
        HTTPException: 404 if the name does not refer to a regular file inside
            the data/db directory; 500 if the file cannot be read.
    """
    # Confine the client-supplied name to DIRECTORY. os.path.join() alone lets
    # "../x" or an absolute path escape it. The check is lexical: it rejects
    # traversal in the name but does not resolve symlinks stored in DIRECTORY.
    base_dir = os.path.abspath(DIRECTORY)
    file_path = os.path.abspath(os.path.join(base_dir, request.name))
    if not file_path.startswith(base_dir + os.sep) or not os.path.isfile(file_path):
        # Same response as a missing file so nothing outside DIRECTORY is revealed.
        raise HTTPException(status_code=404, detail=f"File not found: {request.name}")
    try:
        # Database files can be very large; hash in a worker thread so the
        # event loop keeps serving other requests meanwhile.
        loop = asyncio.get_running_loop()
        digest = await loop.run_in_executor(None, _sha256_of_file, file_path)
    except OSError as e:
        raise HTTPException(status_code=500, detail=f"Could not calculate hash for file: {e}") from e
    return {"name": request.name, "hash": digest}
@app.post("/api/files/delete")
async def api_delete_file(request: DeleteFileRequest):
    """
    Deletes a specified file from the data/db directory.

    Args:
        request: Body whose ``name`` is a path relative to the data/db directory.

    Returns:
        ``{"success": True}`` once the file has been removed.

    Raises:
        HTTPException: 404 if the name does not refer to a regular file inside
            the data/db directory; 500 if the removal itself fails.
    """
    # Confine the client-supplied name to DIRECTORY. os.path.join() alone lets
    # "../x" or an absolute path escape it, which here would mean deleting
    # arbitrary files. The check is lexical (symlinks are not resolved).
    base_dir = os.path.abspath(DIRECTORY)
    file_path = os.path.abspath(os.path.join(base_dir, request.name))
    if not file_path.startswith(base_dir + os.sep) or not os.path.isfile(file_path):
        # Same response as a missing file so nothing outside DIRECTORY is revealed.
        raise HTTPException(status_code=404, detail=f"File not found: {request.name}")
    try:
        os.remove(file_path)
    except FileNotFoundError:
        # The file vanished between the check above and the removal.
        raise HTTPException(status_code=404, detail=f"File not found: {request.name}") from None
    except OSError as e:
        raise HTTPException(status_code=500, detail=f"Could not delete file: {e}") from e
    return {"success": True}
# Mount static files to serve content from the database directory
app.mount("/data", StaticFiles(directory=DIRECTORY), name="data")
# Serve the front-end assets (e.g. script.js referenced by the HTML pages).
# The directory is resolved relative to this script, like DIRECTORY, so the
# server starts regardless of the current working directory.
app.mount("/static", StaticFiles(directory=os.path.join(SCRIPT_DIR, "static")), name="static")
@app.get("/files", response_class=HTMLResponse)
async def list_files():
    """
    Render an HTML page listing every ``.db`` file under the data/db directory.

    The directory is walked recursively. Each entry shows the path relative to
    the data directory, its size in GB, a download link served by the ``/data``
    mount and a button that calls the page script's ``deleteFile()``. Entries
    are ordered by ``os.path.getctime`` (note: on Unix this is the last
    metadata change, not the creation time).

    Returns:
        The HTML document as a string.
    """
    db_files_info = []
    for dir_path, _, file_names in os.walk(DIRECTORY):
        for file_name in file_names:
            if not file_name.endswith(".db"):
                continue
            file_path = os.path.join(dir_path, file_name)
            try:
                db_files_info.append({
                    "relative_path": os.path.relpath(file_path, DIRECTORY),
                    "file_size": os.path.getsize(file_path),
                    "creation_time": os.path.getctime(file_path),
                })
            except OSError as e:
                # Log or handle error for specific file without breaking the list
                print(f"Error accessing {file_path}: {e}")
    # Sort files by creation time
    db_files_info.sort(key=lambda info: info["creation_time"])

    list_items = []
    for file_info in db_files_info:
        relative_path = file_info["relative_path"]
        # convert to GB
        file_size_gb = file_info["file_size"] / (1024 * 1024 * 1024)
        # File names are not trusted markup: escape them for every context
        # they are written into (text/attribute, URL, inline JS string).
        label = html.escape(relative_path)
        # URLs always use "/" even where os.sep is "\\".
        url = html.escape("/data/" + quote(relative_path.replace(os.sep, "/")))
        # json.dumps yields a valid JS string literal; html.escape then makes
        # it safe inside the double-quoted onclick attribute.
        js_argument = html.escape(json.dumps(relative_path))
        list_items.append(f"""
<li>
{label} ({file_size_gb:.3f} GB)
<a href="{url}" download="{label}">Download</a>
<button onclick="deleteFile({js_argument})">Delete</button>
</li>
""")
    files_html = "".join(list_items)
    return f"""
<html>
<head>
<title>Files in Directory</title>
<script src="/static/script.js"></script>
</head>
<body>
<h1>Files in {html.escape(DIRECTORY)}</h1>
<ul>
{files_html}
</ul>
<p><a href="/">Back to Home</a></p>
</body>
</html>
"""
@app.delete("/delete_file/{filename}")
async def delete_single_file(filename: str):
    """
    Delete one file from the data/db directory.

    Args:
        filename: Name of the file, taken from the URL path.

    Returns:
        A dict with a human-readable ``message`` and ``status`` ``"deleted"``.

    Raises:
        HTTPException: 404 if the name does not refer to a regular file inside
            the data/db directory; 500 if the removal itself fails.
    """
    # Confine the client-supplied name to DIRECTORY (rejects ".." and absolute
    # paths) and require a regular file, matching /api/files/delete. The check
    # is lexical: symlinks stored inside DIRECTORY are not resolved.
    base_dir = os.path.abspath(DIRECTORY)
    file_path = os.path.abspath(os.path.join(base_dir, filename))
    if not file_path.startswith(base_dir + os.sep) or not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail=f"File {filename} not found.")
    try:
        os.remove(file_path)
    except FileNotFoundError:
        # The file vanished between the check above and the removal.
        raise HTTPException(status_code=404, detail=f"File {filename} not found.") from None
    except OSError as e:
        raise HTTPException(status_code=500, detail=f"Error deleting file {filename}: {e}") from e
    return {"message": f"File {filename} deleted successfully.", "status": "deleted"}
@app.get("/")
async def root():
    """Serve the static landing page, which links to the ``/files`` listing."""
    landing_page = """
<html>
<head>
<title>Market Data Server</title>
<script src="/static/script.js"></script>
</head>
<body>
<h1>Market Data Server</h1>
<p>Welcome to the Market Data Server.</p>
<h2>Database Files</h2>
<p>Access database files directly via /data/your_file.db</p>
<p><a href="/files">List Files</a></p>
</body>
</html>
"""
    return HTMLResponse(landing_page)
# Run the server with uvicorn when this file is executed as a script.
if __name__ == "__main__":
    # host="0.0.0.0" makes the server listen on all network interfaces.
    uvicorn.run(app, host="0.0.0.0", port=PORT)