MarketDataCollector/simple_server.py
2025-10-20 11:48:48 +08:00

134 lines
4.5 KiB
Python

from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List
import uvicorn
import os
PORT = 8051
DIRECTORY = "./data/db"
# Ensure the data directory exists (if not already handled elsewhere)
os.makedirs(DIRECTORY, exist_ok=True)
app = FastAPI()
# Pydantic model for the delete file request
class DeleteFileRequest(BaseModel):
name: str
@app.post("/api/files/list", response_model=List[str])
async def api_list_files():
"""
Lists all files in the data/db directory.
"""
try:
files = [f for f in os.listdir(DIRECTORY) if os.path.isfile(os.path.join(DIRECTORY, f)) and f.endswith(".db")]
return files
except Exception as e:
raise HTTPException(status_code=500, detail=f"An error occurred while listing files: {e}")
@app.post("/api/files/delete")
async def api_delete_file(request: DeleteFileRequest):
"""
Deletes a specified file from the data/db directory.
"""
file_path = os.path.join(DIRECTORY, request.name)
if not os.path.exists(file_path) or not os.path.isfile(file_path):
raise HTTPException(status_code=404, detail=f"File not found: {request.name}")
try:
os.remove(file_path)
return {"success": True}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Could not delete file: {e}")
# Mount static files to serve content from the database directory
app.mount("/data", StaticFiles(directory=DIRECTORY), name="data")
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/files", response_class=HTMLResponse)
async def list_files():
db_files_info = []
for root, _, files in os.walk(DIRECTORY):
for file in files:
if file.endswith(".db"):
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, DIRECTORY)
try:
file_size = os.path.getsize(file_path)
creation_time = os.path.getctime(file_path)
db_files_info.append({
"relative_path": relative_path,
"file_size": file_size,
"creation_time": creation_time
})
except Exception as e:
# Log or handle error for specific file without breaking the list
print(f"Error accessing {file_path}: {e}")
# Sort files by creation time
db_files_info.sort(key=lambda x: x["creation_time"])
files_html = ""
for file_info in db_files_info:
# convert to GB
file_size_gb = file_info["file_size"] / (1024 * 1024 * 1024)
files_html += f"""
<li>
{file_info["relative_path"]} ({file_size_gb:.3f} GB)
<a href="/data/{file_info["relative_path"]}" download="{file_info["relative_path"]}">Download</a>
<button onclick="deleteFile('{file_info["relative_path"]}')">Delete</button>
</li>
"""
return f"""
<html>
<head>
<title>Files in Directory</title>
<script src="/static/script.js"></script>
</head>
<body>
<h1>Files in {DIRECTORY}</h1>
<ul>
{files_html}
</ul>
<p><a href="/">Back to Home</a></p>
</body>
</html>
"""
@app.delete("/delete_file/{filename}")
async def delete_single_file(filename: str):
file_path = os.path.join(DIRECTORY, filename)
if os.path.exists(file_path):
try:
os.remove(file_path)
return {"message": f"File {filename} deleted successfully.", "status": "deleted"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error deleting file {filename}: {e}")
else:
raise HTTPException(status_code=404, detail=f"File {filename} not found.")
@app.get("/")
async def root():
return HTMLResponse("""
<html>
<head>
<title>Market Data Server</title>
<script src="/static/script.js"></script>
</head>
<body>
<h1>Market Data Server</h1>
<p>Welcome to the Market Data Server.</p>
<h2>Database Files</h2>
<p>Access database files directly via /data/your_file.db</p>
<p><a href="/files">List Files</a></p>
</body>
</html>
""")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=PORT)