# Listing metadata: 156 lines, 5.4 KiB, Python.
import asyncio
import hashlib
import html
import json
import os
from typing import List
from urllib.parse import quote

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

# TCP port the server listens on when this file is executed directly.
PORT = 8051

# Anchor all paths to the directory containing this script rather than to the
# current working directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DIRECTORY = os.path.join(SCRIPT_DIR, "data", "db")

# Create the data directory up front; this is a no-op when it already exists.
os.makedirs(DIRECTORY, exist_ok=True)

app = FastAPI()

# Pydantic model for requests that target a single file by name
class DeleteFileRequest(BaseModel):
    """
    JSON request body identifying one file.

    Despite its name, this model is the body of both the hash endpoint and
    the delete endpoint.
    """

    # File name; the endpoints join it onto the data directory.
    name: str

@app.post("/api/files/list", response_model=List[str])
async def api_list_files():
    """
    Return the names of the ``.db`` files directly inside the data directory.

    Only regular files at the top level are reported; subdirectories are not
    descended into.

    Raises:
        HTTPException: 500 if anything goes wrong while listing.
    """
    try:
        db_names = []
        for entry in os.listdir(DIRECTORY):
            if not entry.endswith(".db"):
                continue
            if os.path.isfile(os.path.join(DIRECTORY, entry)):
                db_names.append(entry)
        return db_names
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred while listing files: {e}")

def _sha256_of_file(path):
    """Return the hex SHA-256 digest of the file at *path*, read in chunks."""
    hasher = hashlib.sha256()
    with open(path, "rb") as afile:
        while chunk := afile.read(8192):
            hasher.update(chunk)
    return hasher.hexdigest()


@app.post("/api/files/hash")
async def api_file_hash(request: DeleteFileRequest):
    """
    Calculates and returns the SHA256 hash for a specific file.

    Args:
        request: Body whose ``name`` is a path relative to the data directory.

    Returns:
        ``{"name": <requested name>, "hash": <hex digest>}``.

    Raises:
        HTTPException: 400 if ``name`` resolves outside the data directory,
            404 if it is not an existing regular file, 500 if reading fails.
    """
    file_path = os.path.join(DIRECTORY, request.name)

    # ``name`` is client-controlled: reject absolute paths, ``..`` segments or
    # symlinks that would let the caller read files outside the data directory.
    base_dir = os.path.realpath(DIRECTORY)
    try:
        resolved = os.path.realpath(file_path)
        inside = os.path.commonpath([base_dir, resolved]) == base_dir
    except ValueError:
        # Embedded NUL bytes, or a different drive on Windows.
        inside = False
    if not inside:
        raise HTTPException(status_code=400, detail=f"Invalid file name: {request.name}")

    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail=f"File not found: {request.name}")

    try:
        # Hash in a worker thread: reading a large file here would otherwise
        # stall the event loop for every other request.
        loop = asyncio.get_running_loop()
        digest = await loop.run_in_executor(None, _sha256_of_file, file_path)
    except OSError as e:
        raise HTTPException(status_code=500, detail=f"Could not calculate hash for file: {e}") from e

    return {"name": request.name, "hash": digest}

@app.post("/api/files/delete")
async def api_delete_file(request: DeleteFileRequest):
    """
    Deletes a specified file from the data/db directory.

    Args:
        request: Body whose ``name`` is a path relative to the data directory.

    Returns:
        ``{"success": True}`` once the file has been removed.

    Raises:
        HTTPException: 400 if ``name`` resolves outside the data directory,
            404 if it is not an existing regular file, 500 if removal fails.
    """
    file_path = os.path.join(DIRECTORY, request.name)

    # ``name`` is client-controlled: without this check an absolute path or
    # ``..`` segments would let the caller delete arbitrary files.
    base_dir = os.path.realpath(DIRECTORY)
    try:
        resolved = os.path.realpath(file_path)
        inside = os.path.commonpath([base_dir, resolved]) == base_dir
    except ValueError:
        # Embedded NUL bytes, or a different drive on Windows.
        inside = False
    if not inside:
        raise HTTPException(status_code=400, detail=f"Invalid file name: {request.name}")

    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail=f"File not found: {request.name}")

    try:
        os.remove(file_path)
    except OSError as e:
        raise HTTPException(status_code=500, detail=f"Could not delete file: {e}") from e

    return {"success": True}

# Mount static files to serve content from the database directory
app.mount("/data", StaticFiles(directory=DIRECTORY), name="data")

# Front-end assets (the pages reference /static/script.js). Anchored to the
# script's directory, like the data directory, so startup does not depend on
# the working directory the server is launched from.
# NOTE(review): assumes the "static" folder lives next to this script - confirm.
app.mount("/static", StaticFiles(directory=os.path.join(SCRIPT_DIR, "static")), name="static")

def _collect_db_files():
    """Return one record per ``.db`` file under DIRECTORY, sorted by ctime."""
    records = []
    for root, _, files in os.walk(DIRECTORY):
        for file in files:
            if not file.endswith(".db"):
                continue
            file_path = os.path.join(root, file)
            try:
                records.append({
                    "relative_path": os.path.relpath(file_path, DIRECTORY),
                    "file_size": os.path.getsize(file_path),
                    # Creation time on Windows; last metadata change on Unix.
                    "creation_time": os.path.getctime(file_path),
                })
            except OSError as e:
                # Skip a file that cannot be inspected instead of failing the
                # whole listing.
                print(f"Error accessing {file_path}: {e}")

    # Sort files by creation time
    records.sort(key=lambda x: x["creation_time"])
    return records


def _render_file_item(file_info):
    """Render one ``<li>`` for a file record, escaping the name for each context."""
    relative_path = file_info["relative_path"]
    # convert to GB
    file_size_gb = file_info["file_size"] / (1024 * 1024 * 1024)

    # File names are untrusted text: escape them separately for HTML text and
    # attributes, for the URL path, and for the JavaScript string argument.
    label = html.escape(relative_path)
    href = html.escape("/data/" + quote(relative_path.replace(os.sep, "/")))
    js_arg = html.escape(json.dumps(relative_path))
    return f"""
            <li>
                {label} ({file_size_gb:.3f} GB)
                <a href="{href}" download="{label}">Download</a>
                <button onclick="deleteFile({js_arg})">Delete</button>
            </li>
    """


@app.get("/files", response_class=HTMLResponse)
async def list_files():
    """
    Render an HTML page listing every ``.db`` file under the data directory.

    Files are found recursively and ordered by ``os.path.getctime``. Each
    entry shows its size in GB, a download link under ``/data/`` and a Delete
    button that calls the ``deleteFile`` function expected from
    ``/static/script.js``.

    Returns:
        The page markup as a string, sent as an HTML response.
    """
    files_html = "".join(_render_file_item(info) for info in _collect_db_files())

    return f"""
    <html>
        <head>
            <title>Files in Directory</title>
            <script src="/static/script.js"></script>
        </head>
        <body>
            <h1>Files in {html.escape(DIRECTORY)}</h1>
            <ul>
                {files_html}
            </ul>
            <p><a href="/">Back to Home</a></p>
        </body>
    </html>
    """

@app.delete("/delete_file/{filename}")
async def delete_single_file(filename: str):
    """
    Delete one file from the data directory, named by the URL path segment.

    Args:
        filename: Name of the file, relative to the data directory.

    Returns:
        A dict with a human-readable ``message`` and ``status`` of ``"deleted"``.

    Raises:
        HTTPException: 400 if ``filename`` resolves outside the data directory,
            404 if it is not an existing regular file, 500 if removal fails.
    """
    file_path = os.path.join(DIRECTORY, filename)

    # ``filename`` is client-controlled: ``..`` (or backslash-separated
    # segments on Windows) must not escape the data directory.
    base_dir = os.path.realpath(DIRECTORY)
    try:
        resolved = os.path.realpath(file_path)
        inside = os.path.commonpath([base_dir, resolved]) == base_dir
    except ValueError:
        # Embedded NUL bytes, or a different drive on Windows.
        inside = False
    if not inside:
        raise HTTPException(status_code=400, detail=f"Invalid file name: {filename}")

    # isfile (not exists) so that a directory is reported as 404 rather than
    # failing inside os.remove with a 500.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail=f"File {filename} not found.")

    try:
        os.remove(file_path)
    except OSError as e:
        raise HTTPException(status_code=500, detail=f"Error deleting file {filename}: {e}") from e

    return {"message": f"File {filename} deleted successfully.", "status": "deleted"}

@app.get("/")
async def root():
    """
    Serve the static landing page, which links to the file listing at /files.
    """
    landing_page = """
    <html>
        <head>
            <title>Market Data Server</title>
            <script src="/static/script.js"></script>
        </head>
        <body>
            <h1>Market Data Server</h1>
            <p>Welcome to the Market Data Server.</p>
            <h2>Database Files</h2>
            <p>Access database files directly via /data/your_file.db</p>
            <p><a href="/files">List Files</a></p>
        </body>
    </html>
    """
    return HTMLResponse(landing_page)

if __name__ == "__main__":
    # Start the server only when this file is run as a script.
    # NOTE(review): host 0.0.0.0 listens on every network interface and the
    # delete endpoints in this file perform no authentication - confirm this
    # exposure is intended.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=PORT,
    )