- Add SQLite database integration for environments and mappings - Update TaskManager to support pausing tasks (AWAITING_MAPPING) - Modify MigrationPlugin to detect missing mappings and wait for resolution - Add frontend UI for handling missing mappings interactively - Create dedicated migration routes and API endpoints - Update .gitignore and project documentation
120 lines
4.7 KiB
Python
Executable File
120 lines
4.7 KiB
Python
Executable File
# [DEF:AppModule:Module]
|
|
# @SEMANTICS: app, main, entrypoint, fastapi
|
|
# @PURPOSE: The main entry point for the FastAPI application. It initializes the app, configures CORS, sets up dependencies, includes API routers, and defines the WebSocket endpoint for log streaming.
|
|
# @LAYER: UI (API)
|
|
# @RELATION: Depends on the dependency module and API route modules.
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add project root to sys.path to allow importing superset_tool
|
|
# Assuming app.py is in backend/src/
|
|
project_root = Path(__file__).resolve().parent.parent.parent
|
|
sys.path.append(str(project_root))
|
|
|
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import FileResponse
|
|
import asyncio
|
|
import os
|
|
|
|
from .dependencies import get_task_manager
|
|
from .core.logger import logger
|
|
from .api.routes import plugins, tasks, settings, environments, mappings
|
|
from .core.database import init_db
|
|
|
|
# Initialize database
|
|
init_db()
|
|
|
|
# [DEF:App:Global]
|
|
# @SEMANTICS: app, fastapi, instance
|
|
# @PURPOSE: The global FastAPI application instance.
|
|
app = FastAPI(
|
|
title="Superset Tools API",
|
|
description="API for managing Superset automation tools and plugins.",
|
|
version="1.0.0",
|
|
)
|
|
|
|
# Configure CORS
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"], # Adjust this in production
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
|
|
# Include API routes
|
|
app.include_router(plugins.router, prefix="/api/plugins", tags=["Plugins"])
|
|
app.include_router(tasks.router, prefix="/api/tasks", tags=["Tasks"])
|
|
app.include_router(settings.router, prefix="/api/settings", tags=["Settings"])
|
|
app.include_router(environments.router)
|
|
app.include_router(mappings.router)
|
|
|
|
# [DEF:WebSocketEndpoint:Endpoint]
|
|
# @SEMANTICS: websocket, logs, streaming, real-time
|
|
# @PURPOSE: Provides a WebSocket endpoint for clients to connect to and receive real-time log entries for a specific task.
|
|
@app.websocket("/ws/logs/{task_id}")
|
|
async def websocket_endpoint(websocket: WebSocket, task_id: str):
|
|
await websocket.accept()
|
|
logger.info(f"WebSocket connection accepted for task {task_id}")
|
|
task_manager = get_task_manager()
|
|
queue = await task_manager.subscribe_logs(task_id)
|
|
try:
|
|
# Send initial logs if any
|
|
initial_logs = task_manager.get_task_logs(task_id)
|
|
for log_entry in initial_logs:
|
|
# Convert datetime to string for JSON serialization
|
|
log_dict = log_entry.dict()
|
|
log_dict['timestamp'] = log_dict['timestamp'].isoformat()
|
|
await websocket.send_json(log_dict)
|
|
|
|
# Stream new logs
|
|
logger.info(f"Starting log stream for task {task_id}")
|
|
while True:
|
|
log_entry = await queue.get()
|
|
log_dict = log_entry.dict()
|
|
log_dict['timestamp'] = log_dict['timestamp'].isoformat()
|
|
await websocket.send_json(log_dict)
|
|
|
|
# If task is finished, we could potentially close the connection
|
|
# but let's keep it open for a bit or until the client disconnects
|
|
if "Task completed successfully" in log_entry.message or "Task failed" in log_entry.message:
|
|
# Wait a bit to ensure client receives the last message
|
|
await asyncio.sleep(2)
|
|
break
|
|
|
|
except WebSocketDisconnect:
|
|
logger.info(f"WebSocket connection disconnected for task {task_id}")
|
|
except Exception as e:
|
|
logger.error(f"WebSocket error for task {task_id}: {e}")
|
|
finally:
|
|
task_manager.unsubscribe_logs(task_id, queue)
|
|
|
|
# [/DEF]
|
|
|
|
# [DEF:StaticFiles:Mount]
|
|
# @SEMANTICS: static, frontend, spa
|
|
# @PURPOSE: Mounts the frontend build directory to serve static assets.
|
|
frontend_path = project_root / "frontend" / "build"
|
|
if frontend_path.exists():
|
|
app.mount("/_app", StaticFiles(directory=str(frontend_path / "_app")), name="static")
|
|
|
|
# Serve other static files from the root of build directory
|
|
@app.get("/{file_path:path}")
|
|
async def serve_spa(file_path: str):
|
|
full_path = frontend_path / file_path
|
|
if full_path.is_file():
|
|
return FileResponse(str(full_path))
|
|
# Fallback to index.html for SPA routing
|
|
return FileResponse(str(frontend_path / "index.html"))
|
|
else:
|
|
# [DEF:RootEndpoint:Endpoint]
|
|
# @SEMANTICS: root, healthcheck
|
|
# @PURPOSE: A simple root endpoint to confirm that the API is running.
|
|
@app.get("/")
|
|
async def read_root():
|
|
return {"message": "Superset Tools API is running (Frontend build not found)"}
|
|
# [/DEF]
|