WIP: Staged all changes

2025-12-19 22:40:28 +03:00
parent 8f4b469c96
commit ce703322c2
64 changed files with 5985 additions and 833 deletions

9
backend/requirements.txt Normal file

@@ -0,0 +1,9 @@
fastapi
uvicorn[standard]
pydantic
authlib
python-multipart
starlette
jsonschema
requests
keyring

52
backend/src/api/auth.py Normal file

@@ -0,0 +1,52 @@
# [DEF:AuthModule:Module]
# @SEMANTICS: auth, authentication, adfs, oauth, middleware
# @PURPOSE: Implements ADFS authentication using Authlib for FastAPI. It provides a dependency to protect endpoints.
# @LAYER: UI (API)
# @RELATION: Used by API routers to protect endpoints that require authentication.
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2AuthorizationCodeBearer
from authlib.integrations.starlette_client import OAuth
from starlette.config import Config
# Placeholder ADFS configuration. In a real app these values would come from a
# secure source (environment variables, a secrets manager, etc.).
config = Config(environ={
    "ADFS_CLIENT_ID": "your-client-id",
    "ADFS_CLIENT_SECRET": "your-client-secret",
    "ADFS_SERVER_METADATA_URL": "https://your-adfs-server/.well-known/openid-configuration",
})
oauth = OAuth(config)
oauth.register(
name='adfs',
server_metadata_url=config('ADFS_SERVER_METADATA_URL'),
client_kwargs={'scope': 'openid profile email'}
)
oauth2_scheme = OAuth2AuthorizationCodeBearer(
authorizationUrl="https://your-adfs-server/adfs/oauth2/authorize",
tokenUrl="https://your-adfs-server/adfs/oauth2/token",
)
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""
Dependency to get the current user from the ADFS token.
This is a placeholder and needs to be fully implemented.
"""
# In a real implementation, you would:
# 1. Validate the token with ADFS.
# 2. Fetch user information.
# 3. Create a user object.
# For now, we'll just check if a token exists.
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
# A real implementation would return a user object.
return {"placeholder_user": "user@example.com"}
# [/DEF]
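
A minimal sketch of what the real token validation could look like, building on the imports above (HTTPException, status) and Authlib's JOSE helpers. The JWKS URL is an assumption, and issuer/audience checks are left out for brevity:

# Hedged sketch: decode and verify the bearer token against the ADFS signing keys.
# The JWKS URL below is an assumed placeholder, mirroring the placeholders above.
import requests
from authlib.jose import JsonWebKey, jwt
from authlib.jose.errors import JoseError

ADFS_JWKS_URL = "https://your-adfs-server/adfs/discovery/keys"  # assumed endpoint

def validate_adfs_token(token: str) -> dict:
    """Return the token claims if the signature and standard claims check out."""
    jwks = requests.get(ADFS_JWKS_URL, timeout=10).json()
    key_set = JsonWebKey.import_key_set(jwks)
    try:
        claims = jwt.decode(token, key_set)
        claims.validate()  # checks exp/nbf and any configured claim options
        return dict(claims)
    except JoseError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token: {exc}",
            headers={"WWW-Authenticate": "Bearer"},
        )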


@@ -0,0 +1,22 @@
# [DEF:PluginsRouter:Module]
# @SEMANTICS: api, router, plugins, list
# @PURPOSE: Defines the FastAPI router for plugin-related endpoints, allowing clients to list available plugins.
# @LAYER: UI (API)
# @RELATION: Depends on the PluginLoader and PluginConfig. It is included by the main app.
from typing import List
from fastapi import APIRouter, Depends
from ...core.plugin_base import PluginConfig
from ...dependencies import get_plugin_loader
router = APIRouter()
@router.get("/", response_model=List[PluginConfig])
async def list_plugins(
plugin_loader = Depends(get_plugin_loader)
):
"""
Retrieve a list of all available plugins.
"""
return plugin_loader.get_all_plugin_configs()
# [/DEF]


@@ -0,0 +1,57 @@
# [DEF:TasksRouter:Module]
# @SEMANTICS: api, router, tasks, create, list, get
# @PURPOSE: Defines the FastAPI router for task-related endpoints, allowing clients to create, list, and get the status of tasks.
# @LAYER: UI (API)
# @RELATION: Depends on the TaskManager. It is included by the main app.
from typing import List, Dict, Any
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from ...core.task_manager import TaskManager, Task
from ...dependencies import get_task_manager
router = APIRouter()
class CreateTaskRequest(BaseModel):
plugin_id: str
params: Dict[str, Any]
@router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
async def create_task(
request: CreateTaskRequest,
task_manager: TaskManager = Depends(get_task_manager)
):
"""
Create and start a new task for a given plugin.
"""
try:
task = await task_manager.create_task(
plugin_id=request.plugin_id,
params=request.params
)
return task
except ValueError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
@router.get("/", response_model=List[Task])
async def list_tasks(
task_manager: TaskManager = Depends(get_task_manager)
):
"""
Retrieve a list of all tasks.
"""
return task_manager.get_all_tasks()
@router.get("/{task_id}", response_model=Task)
async def get_task(
task_id: str,
task_manager: TaskManager = Depends(get_task_manager)
):
"""
Retrieve the details of a specific task.
"""
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
return task
# [/DEF]
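
The create_task handler above defers schema validation (see the comment in TaskManager.create_task); a minimal sketch of how this route could validate params against the plugin's JSON schema, reusing the module's imports and the get_plugin_loader factory from dependencies.py:

# Hedged sketch: validate request.params against the plugin's JSON schema
# before queueing the task. get_plugin_loader comes from the dependencies
# module added elsewhere in this commit.
from jsonschema import ValidationError, validate

from ...dependencies import get_plugin_loader

def validate_task_params(plugin_id: str, params: Dict[str, Any]) -> None:
    plugin = get_plugin_loader().get_plugin(plugin_id)
    if plugin is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Plugin not found")
    try:
        validate(instance=params, schema=plugin.get_schema())
    except ValidationError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Invalid parameters: {exc.message}",
        )

# Inside create_task, before calling task_manager.create_task:
#     validate_task_params(request.plugin_id, request.params)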

77
backend/src/app.py Normal file

@@ -0,0 +1,77 @@
# [DEF:AppModule:Module]
# @SEMANTICS: app, main, entrypoint, fastapi
# @PURPOSE: The main entry point for the FastAPI application. It initializes the app, configures CORS, sets up dependencies, includes API routers, and defines the WebSocket endpoint for log streaming.
# @LAYER: UI (API)
# @RELATION: Depends on the dependency module and API route modules.
import sys
from pathlib import Path
# Add project root to sys.path to allow importing superset_tool
# Assuming app.py is in backend/src/
project_root = Path(__file__).resolve().parent.parent.parent
sys.path.append(str(project_root))
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
from fastapi.middleware.cors import CORSMiddleware
import asyncio
from .dependencies import get_task_manager
from .core.logger import logger
from .api.routes import plugins, tasks
# [DEF:App:Global]
# @SEMANTICS: app, fastapi, instance
# @PURPOSE: The global FastAPI application instance.
app = FastAPI(
title="Superset Tools API",
description="API for managing Superset automation tools and plugins.",
version="1.0.0",
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Adjust this in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API routes
app.include_router(plugins.router, prefix="/plugins", tags=["Plugins"])
app.include_router(tasks.router, prefix="/tasks", tags=["Tasks"])
# [DEF:WebSocketEndpoint:Endpoint]
# @SEMANTICS: websocket, logs, streaming, real-time
# @PURPOSE: Provides a WebSocket endpoint for clients to connect to and receive real-time log entries for a specific task.
@app.websocket("/ws/logs/{task_id}")
async def websocket_endpoint(websocket: WebSocket, task_id: str, task_manager=Depends(get_task_manager)):
await websocket.accept()
logger.info(f"WebSocket connection established for task {task_id}")
try:
# Send initial logs if any
initial_logs = task_manager.get_task_logs(task_id)
for log_entry in initial_logs:
await websocket.send_json(log_entry.dict())
# Keep connection alive, ideally stream new logs as they come
# This part requires a more sophisticated log streaming mechanism (e.g., queues, pub/sub)
# For now, it will just keep the connection open and send initial logs.
while True:
await asyncio.sleep(1) # Keep connection alive, send heartbeat or check for new logs
# In a real system, new logs would be pushed here
except WebSocketDisconnect:
logger.info(f"WebSocket connection disconnected for task {task_id}")
except Exception as e:
logger.error(f"WebSocket error for task {task_id}: {e}")
# [/DEF]
# [DEF:RootEndpoint:Endpoint]
# @SEMANTICS: root, healthcheck
# @PURPOSE: A simple root endpoint to confirm that the API is running.
@app.get("/")
async def read_root():
return {"message": "Superset Tools API is running"}
# [/DEF]
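
For reference, a client could consume the per-task log stream like this; a minimal sketch assuming the third-party websockets package is installed (it is not listed directly in requirements.txt) and the API runs on localhost:8000:

# Hedged sketch of a client tailing the log stream for one task.
import asyncio
import websockets  # assumed extra dependency

async def tail_task_logs(task_id: str) -> None:
    uri = f"ws://localhost:8000/ws/logs/{task_id}"
    async with websockets.connect(uri) as ws:
        while True:
            print(await ws.recv())  # each message is a JSON-encoded LogEntry

# asyncio.run(tail_task_logs("<task-id>"))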


@@ -0,0 +1,92 @@
# [DEF:LoggerModule:Module]
# @SEMANTICS: logging, websocket, streaming, handler
# @PURPOSE: Configures the application's logging system, including a custom handler for buffering logs and streaming them over WebSockets.
# @LAYER: Core
# @RELATION: Used by the main application and other modules to log events. The WebSocketLogHandler is used by the WebSocket endpoint in app.py.
import logging
from datetime import datetime
from typing import Dict, Any, List, Optional
from collections import deque
from pydantic import BaseModel, Field
# LogEntry mirrors the model defined in task_manager.py so log payloads stay consistent
# [DEF:LogEntry:Class]
# @SEMANTICS: log, entry, record, pydantic
# @PURPOSE: A Pydantic model representing a single, structured log entry. This is a re-definition for consistency, as it's also defined in task_manager.py.
class LogEntry(BaseModel):
timestamp: datetime = Field(default_factory=datetime.utcnow)
level: str
message: str
context: Optional[Dict[str, Any]] = None
# [/DEF]
# [DEF:WebSocketLogHandler:Class]
# @SEMANTICS: logging, handler, websocket, buffer
# @PURPOSE: A custom logging handler that captures log records into a buffer. It is designed to be extended for real-time log streaming over WebSockets.
class WebSocketLogHandler(logging.Handler):
"""
A logging handler that stores log records and can be extended to send them
over WebSockets.
"""
def __init__(self, capacity: int = 1000):
super().__init__()
self.log_buffer: deque[LogEntry] = deque(maxlen=capacity)
# In a real implementation, you'd have a way to manage active WebSocket connections
# e.g., self.active_connections: Set[WebSocket] = set()
def emit(self, record: logging.LogRecord):
try:
log_entry = LogEntry(
level=record.levelname,
message=self.format(record),
context={
"name": record.name,
"pathname": record.pathname,
"lineno": record.lineno,
"funcName": record.funcName,
"process": record.process,
"thread": record.thread,
}
)
self.log_buffer.append(log_entry)
# Here you would typically send the log_entry to all active WebSocket connections
# for real-time streaming to the frontend.
# Example: for ws in self.active_connections: await ws.send_json(log_entry.dict())
except Exception:
self.handleError(record)
def get_recent_logs(self) -> List[LogEntry]:
"""
Returns a list of recent log entries from the buffer.
"""
return list(self.log_buffer)
# [/DEF]
# [DEF:Logger:Global]
# @SEMANTICS: logger, global, instance
# @PURPOSE: The global logger instance for the application, configured with both a console handler and the custom WebSocket handler.
logger = logging.getLogger("superset_tools_app")
logger.setLevel(logging.INFO)
# Create a formatter
formatter = logging.Formatter(
'[%(asctime)s][%(levelname)s][%(name)s] %(message)s'
)
# Add console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# Add WebSocket log handler
websocket_log_handler = WebSocketLogHandler()
websocket_log_handler.setFormatter(formatter)
logger.addHandler(websocket_log_handler)
# Example usage:
# logger.info("Application started", extra={"context_key": "context_value"})
# logger.error("An error occurred", exc_info=True)
# [/DEF]
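
The emit() comments above leave real-time streaming unimplemented; one possible bridge from the synchronous logging call to async WebSocket consumers is per-connection asyncio queues. A minimal sketch building on WebSocketLogHandler above (the subscribe/unsubscribe names are illustrative, not part of this commit):

# Hedged sketch: a streaming variant of WebSocketLogHandler. Each WebSocket
# connection subscribes with its own asyncio.Queue; emit() fans entries out to
# those queues via call_soon_threadsafe, so it stays safe to call from worker threads.
import asyncio
from typing import Set

class StreamingLogHandler(WebSocketLogHandler):
    def __init__(self, loop: asyncio.AbstractEventLoop, capacity: int = 1000):
        super().__init__(capacity=capacity)
        self.loop = loop
        self.subscribers: Set[asyncio.Queue] = set()

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.add(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self.subscribers.discard(queue)

    def emit(self, record: logging.LogRecord) -> None:
        super().emit(record)  # keep the buffered copy
        if self.log_buffer:
            entry = self.log_buffer[-1]
            for queue in list(self.subscribers):
                self.loop.call_soon_threadsafe(queue.put_nowait, entry)

# The WebSocket endpoint in app.py could then consume it roughly as:
#     queue = handler.subscribe()
#     try:
#         while True:
#             entry = await queue.get()
#             await websocket.send_json(entry.dict())
#     finally:
#         handler.unsubscribe(queue)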


@@ -0,0 +1,71 @@
from abc import ABC, abstractmethod
from typing import Dict, Any
from pydantic import BaseModel, Field
# [DEF:PluginBase:Class]
# @SEMANTICS: plugin, interface, base, abstract
# @PURPOSE: Defines the abstract base class that all plugins must implement to be recognized by the system. It enforces a common structure for plugin metadata and execution.
# @LAYER: Core
# @RELATION: Used by PluginLoader to identify valid plugins.
# @INVARIANT: All plugins MUST inherit from this class.
class PluginBase(ABC):
"""
Base class for all plugins.
Plugins must inherit from this class and implement the abstract methods.
"""
@property
@abstractmethod
def id(self) -> str:
"""A unique identifier for the plugin."""
pass
@property
@abstractmethod
def name(self) -> str:
"""A human-readable name for the plugin."""
pass
@property
@abstractmethod
def description(self) -> str:
"""A brief description of what the plugin does."""
pass
@property
@abstractmethod
def version(self) -> str:
"""The version of the plugin."""
pass
@abstractmethod
def get_schema(self) -> Dict[str, Any]:
"""
Returns the JSON schema for the plugin's input parameters.
This schema will be used to generate the frontend form.
"""
pass
@abstractmethod
async def execute(self, params: Dict[str, Any]):
"""
Executes the plugin's logic.
The `params` argument will be validated against the schema returned by `get_schema()`.
"""
pass
# [/DEF]
# [DEF:PluginConfig:Class]
# @SEMANTICS: plugin, config, schema, pydantic
# @PURPOSE: A Pydantic model used to represent the validated configuration and metadata of a loaded plugin. This object is what gets exposed to the API layer.
# @LAYER: Core
# @RELATION: Instantiated by PluginLoader after validating a PluginBase instance.
class PluginConfig(BaseModel):
"""Pydantic model for plugin configuration."""
id: str = Field(..., description="Unique identifier for the plugin")
name: str = Field(..., description="Human-readable name for the plugin")
description: str = Field(..., description="Brief description of what the plugin does")
version: str = Field(..., description="Version of the plugin")
input_schema: Dict[str, Any] = Field(..., description="JSON schema for input parameters", alias="schema")
# [/DEF]
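
For orientation, a hypothetical minimal plugin satisfying this contract might look as follows (the real plugins added in this commit live under backend/src/plugins):

# Hedged sketch: the smallest useful PluginBase implementation.
class EchoPlugin(PluginBase):
    @property
    def id(self) -> str:
        return "echo"

    @property
    def name(self) -> str:
        return "Echo"

    @property
    def description(self) -> str:
        return "Prints the message it receives."

    @property
    def version(self) -> str:
        return "0.1.0"

    def get_schema(self) -> Dict[str, Any]:
        # This schema drives the auto-generated frontend form.
        return {
            "type": "object",
            "properties": {"message": {"type": "string", "title": "Message"}},
            "required": ["message"],
        }

    async def execute(self, params: Dict[str, Any]):
        print(f"echo: {params['message']}")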


@@ -0,0 +1,123 @@
import importlib.util
import os
import sys
from typing import Dict, Type, List, Optional
from .plugin_base import PluginBase, PluginConfig
from jsonschema import validators
# [DEF:PluginLoader:Class]
# @SEMANTICS: plugin, loader, dynamic, import
# @PURPOSE: Scans a specified directory for Python modules, dynamically loads them, and registers any classes that are valid implementations of the PluginBase interface.
# @LAYER: Core
# @RELATION: Depends on PluginBase. It is used by the main application to discover and manage available plugins.
class PluginLoader:
"""
Scans a directory for Python modules, loads them, and identifies classes
that inherit from PluginBase.
"""
def __init__(self, plugin_dir: str):
self.plugin_dir = plugin_dir
self._plugins: Dict[str, PluginBase] = {}
self._plugin_configs: Dict[str, PluginConfig] = {}
self._load_plugins()
def _load_plugins(self):
"""
Scans the plugin directory, imports modules, and registers valid plugins.
"""
if not os.path.exists(self.plugin_dir):
os.makedirs(self.plugin_dir)
        # Plugin modules are loaded under the package name "src.plugins.<name>"
        # (see _load_module), so their relative imports (e.g. "from ..core ...")
        # resolve to "src.core...". That requires the directory *containing* the
        # "src" package to be importable. Assuming plugin_dir is something like
        # "backend/src/plugins", that directory is two levels up.
        package_root = os.path.abspath(os.path.join(self.plugin_dir, os.pardir, os.pardir))
        if package_root not in sys.path:
            sys.path.insert(0, package_root)
for filename in os.listdir(self.plugin_dir):
if filename.endswith(".py") and filename != "__init__.py":
module_name = filename[:-3]
file_path = os.path.join(self.plugin_dir, filename)
self._load_module(module_name, file_path)
def _load_module(self, module_name: str, file_path: str):
"""
Loads a single Python module and extracts PluginBase subclasses.
"""
package_name = f"src.plugins.{module_name}"
spec = importlib.util.spec_from_file_location(package_name, file_path)
if spec is None or spec.loader is None:
print(f"Could not load module spec for {package_name}") # Replace with proper logging
return
module = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(module)
except Exception as e:
print(f"Error loading plugin module {module_name}: {e}") # Replace with proper logging
return
for attribute_name in dir(module):
attribute = getattr(module, attribute_name)
if (
isinstance(attribute, type)
and issubclass(attribute, PluginBase)
and attribute is not PluginBase
):
try:
plugin_instance = attribute()
self._register_plugin(plugin_instance)
except Exception as e:
print(f"Error instantiating plugin {attribute_name} in {module_name}: {e}") # Replace with proper logging
def _register_plugin(self, plugin_instance: PluginBase):
"""
Registers a valid plugin instance.
"""
plugin_id = plugin_instance.id
if plugin_id in self._plugins:
print(f"Warning: Duplicate plugin ID '{plugin_id}' found. Skipping.") # Replace with proper logging
return
try:
schema = plugin_instance.get_schema()
# Basic validation to ensure it's a dictionary
if not isinstance(schema, dict):
raise TypeError("get_schema() must return a dictionary.")
plugin_config = PluginConfig(
id=plugin_instance.id,
name=plugin_instance.name,
description=plugin_instance.description,
version=plugin_instance.version,
schema=schema,
)
            # Validate that the plugin's schema is itself a well-formed JSON Schema.
            # (jsonschema.validate() would check *data* against a schema; that
            # happens later, when a task is created with actual parameters.)
            validators.validator_for(schema).check_schema(schema)
self._plugins[plugin_id] = plugin_instance
self._plugin_configs[plugin_id] = plugin_config
print(f"Plugin '{plugin_instance.name}' (ID: {plugin_id}) loaded successfully.") # Replace with proper logging
except Exception as e:
print(f"Error validating plugin '{plugin_instance.name}' (ID: {plugin_id}): {e}") # Replace with proper logging
def get_plugin(self, plugin_id: str) -> Optional[PluginBase]:
"""
Returns a loaded plugin instance by its ID.
"""
return self._plugins.get(plugin_id)
def get_all_plugin_configs(self) -> List[PluginConfig]:
"""
Returns a list of all loaded plugin configurations.
"""
return list(self._plugin_configs.values())
def has_plugin(self, plugin_id: str) -> bool:
"""
Checks if a plugin with the given ID is loaded.
"""
return plugin_id in self._plugins


@@ -0,0 +1,131 @@
# [DEF:TaskManagerModule:Module]
# @SEMANTICS: task, manager, lifecycle, execution, state
# @PURPOSE: Manages the lifecycle of tasks, including their creation, execution, and state tracking. It uses a thread pool to run plugins asynchronously.
# @LAYER: Core
# @RELATION: Depends on PluginLoader to get plugin instances. It is used by the API layer to create and query tasks.
import asyncio
import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, Any, List, Optional
from concurrent.futures import ThreadPoolExecutor
from pydantic import BaseModel, Field
# Assuming PluginBase and PluginConfig are defined in plugin_base.py
# from .plugin_base import PluginBase, PluginConfig # Not needed here, TaskManager interacts with the PluginLoader
# [DEF:TaskStatus:Enum]
# @SEMANTICS: task, status, state, enum
# @PURPOSE: Defines the possible states a task can be in during its lifecycle.
class TaskStatus(str, Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
# [/DEF]
# [DEF:LogEntry:Class]
# @SEMANTICS: log, entry, record, pydantic
# @PURPOSE: A Pydantic model representing a single, structured log entry associated with a task.
class LogEntry(BaseModel):
timestamp: datetime = Field(default_factory=datetime.utcnow)
level: str
message: str
context: Optional[Dict[str, Any]] = None
# [/DEF]
# [DEF:Task:Class]
# @SEMANTICS: task, job, execution, state, pydantic
# @PURPOSE: A Pydantic model representing a single execution instance of a plugin, including its status, parameters, and logs.
class Task(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
plugin_id: str
status: TaskStatus = TaskStatus.PENDING
started_at: Optional[datetime] = None
finished_at: Optional[datetime] = None
user_id: Optional[str] = None
logs: List[LogEntry] = Field(default_factory=list)
params: Dict[str, Any] = Field(default_factory=dict)
# [/DEF]
# [DEF:TaskManager:Class]
# @SEMANTICS: task, manager, lifecycle, execution, state
# @PURPOSE: Manages the lifecycle of tasks, including their creation, execution, and state tracking.
class TaskManager:
"""
Manages the lifecycle of tasks, including their creation, execution, and state tracking.
"""
    def __init__(self, plugin_loader):
        self.plugin_loader = plugin_loader
        self.tasks: Dict[str, Task] = {}
        # Thread pool for plugins whose execute() is synchronous and CPU-bound.
        # The running event loop is looked up when tasks are scheduled, so the
        # manager can safely be instantiated at import time.
        self.executor = ThreadPoolExecutor(max_workers=5)
async def create_task(self, plugin_id: str, params: Dict[str, Any], user_id: Optional[str] = None) -> Task:
"""
Creates and queues a new task for execution.
"""
if not self.plugin_loader.has_plugin(plugin_id):
raise ValueError(f"Plugin with ID '{plugin_id}' not found.")
plugin = self.plugin_loader.get_plugin(plugin_id)
# Validate params against plugin schema (this will be done at a higher level, e.g., API route)
# For now, a basic check
if not isinstance(params, dict):
raise ValueError("Task parameters must be a dictionary.")
task = Task(plugin_id=plugin_id, params=params, user_id=user_id)
self.tasks[task.id] = task
        asyncio.create_task(self._run_task(task.id))  # Schedule execution on the running event loop
return task
async def _run_task(self, task_id: str):
"""
Internal method to execute a task.
"""
task = self.tasks[task_id]
plugin = self.plugin_loader.get_plugin(task.plugin_id)
task.status = TaskStatus.RUNNING
task.started_at = datetime.utcnow()
task.logs.append(LogEntry(level="INFO", message=f"Task started for plugin '{plugin.name}'"))
        try:
            # Async plugins run directly on the event loop; synchronous (and
            # potentially CPU-bound) plugins are pushed to the thread pool so
            # they do not block other requests.
            if asyncio.iscoroutinefunction(plugin.execute):
                await plugin.execute(task.params)
            else:
                await asyncio.get_running_loop().run_in_executor(
                    self.executor, plugin.execute, task.params
                )
task.status = TaskStatus.SUCCESS
task.logs.append(LogEntry(level="INFO", message=f"Task completed successfully for plugin '{plugin.name}'"))
except Exception as e:
task.status = TaskStatus.FAILED
task.logs.append(LogEntry(level="ERROR", message=f"Task failed: {e}", context={"error_type": type(e).__name__}))
finally:
task.finished_at = datetime.utcnow()
# In a real system, you might notify clients via WebSocket here
def get_task(self, task_id: str) -> Optional[Task]:
"""
Retrieves a task by its ID.
"""
return self.tasks.get(task_id)
def get_all_tasks(self) -> List[Task]:
"""
Retrieves all registered tasks.
"""
return list(self.tasks.values())
def get_task_logs(self, task_id: str) -> List[LogEntry]:
"""
Retrieves logs for a specific task.
"""
task = self.tasks.get(task_id)
        return task.logs if task else []
# [/DEF]


@@ -0,0 +1,24 @@
# [DEF:Dependencies:Module]
# @SEMANTICS: dependency, injection, singleton, factory
# @PURPOSE: Manages the creation and provision of shared application dependencies, such as the PluginLoader and TaskManager, to avoid circular imports.
# @LAYER: Core
# @RELATION: Used by the main app and API routers to get access to shared instances.
from pathlib import Path
from .core.plugin_loader import PluginLoader
from .core.task_manager import TaskManager
# Initialize singletons
# Use absolute path relative to this file to ensure plugins are found regardless of CWD
plugin_dir = Path(__file__).parent / "plugins"
plugin_loader = PluginLoader(plugin_dir=str(plugin_dir))
task_manager = TaskManager(plugin_loader)
def get_plugin_loader() -> PluginLoader:
"""Dependency injector for the PluginLoader."""
return plugin_loader
def get_task_manager() -> TaskManager:
"""Dependency injector for the TaskManager."""
return task_manager
# [/DEF]
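
Exposing these factories also makes the app straightforward to test: FastAPI's dependency_overrides can swap in fakes. A minimal sketch, assuming the backend is importable as the src package and the Starlette test client's HTTP dependency is installed:

# Hedged sketch: replacing the TaskManager in a test via dependency overrides.
from fastapi.testclient import TestClient

from src.app import app
from src.dependencies import get_task_manager

class FakeTaskManager:
    def get_all_tasks(self):
        return []

def test_list_tasks_with_fake_manager():
    app.dependency_overrides[get_task_manager] = lambda: FakeTaskManager()
    try:
        client = TestClient(app)
        response = client.get("/tasks/")
        assert response.status_code == 200
        assert response.json() == []
    finally:
        app.dependency_overrides.clear()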


@@ -0,0 +1,121 @@
# [DEF:BackupPlugin:Module]
# @SEMANTICS: backup, superset, automation, dashboard, plugin
# @PURPOSE: A plugin that provides functionality to back up Superset dashboards.
# @LAYER: App
# @RELATION: IMPLEMENTS -> PluginBase
# @RELATION: DEPENDS_ON -> superset_tool.client
# @RELATION: DEPENDS_ON -> superset_tool.utils
from typing import Dict, Any
from pathlib import Path
from requests.exceptions import RequestException
from ..core.plugin_base import PluginBase
from superset_tool.client import SupersetClient
from superset_tool.exceptions import SupersetAPIError
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.fileio import (
save_and_unpack_dashboard,
archive_exports,
sanitize_filename,
consolidate_archive_folders,
remove_empty_directories,
RetentionPolicy
)
from superset_tool.utils.init_clients import setup_clients
class BackupPlugin(PluginBase):
"""
A plugin to back up Superset dashboards.
"""
@property
def id(self) -> str:
return "superset-backup"
@property
def name(self) -> str:
return "Superset Dashboard Backup"
@property
def description(self) -> str:
return "Backs up all dashboards from a Superset instance."
@property
def version(self) -> str:
return "1.0.0"
def get_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"env": {
"type": "string",
"title": "Environment",
"description": "The Superset environment to back up (e.g., 'dev', 'prod').",
"enum": ["dev", "sbx", "prod", "preprod"],
},
"backup_path": {
"type": "string",
"title": "Backup Path",
"description": "The root directory to save backups to.",
"default": "P:\\Superset\\010 Бекапы"
}
},
"required": ["env", "backup_path"],
}
async def execute(self, params: Dict[str, Any]):
env = params["env"]
backup_path = Path(params["backup_path"])
logger = SupersetLogger(log_dir=backup_path / "Logs", console=True)
logger.info(f"[BackupPlugin][Entry] Starting backup for {env}.")
try:
clients = setup_clients(logger)
client = clients[env]
dashboard_count, dashboard_meta = client.get_dashboards()
logger.info(f"[BackupPlugin][Progress] Found {dashboard_count} dashboards to export in {env}.")
if dashboard_count == 0:
logger.info("[BackupPlugin][Exit] No dashboards to back up.")
return
for db in dashboard_meta:
dashboard_id = db.get('id')
dashboard_title = db.get('dashboard_title', 'Unknown Dashboard')
if not dashboard_id:
continue
try:
dashboard_base_dir_name = sanitize_filename(f"{dashboard_title}")
dashboard_dir = backup_path / env.upper() / dashboard_base_dir_name
dashboard_dir.mkdir(parents=True, exist_ok=True)
zip_content, filename = client.export_dashboard(dashboard_id)
save_and_unpack_dashboard(
zip_content=zip_content,
original_filename=filename,
output_dir=dashboard_dir,
unpack=False,
logger=logger
)
archive_exports(str(dashboard_dir), policy=RetentionPolicy(), logger=logger)
except (SupersetAPIError, RequestException, IOError, OSError) as db_error:
logger.error(f"[BackupPlugin][Failure] Failed to export dashboard {dashboard_title} (ID: {dashboard_id}): {db_error}", exc_info=True)
continue
consolidate_archive_folders(backup_path / env.upper(), logger=logger)
remove_empty_directories(str(backup_path / env.upper()), logger=logger)
logger.info(f"[BackupPlugin][CoherenceCheck:Passed] Backup logic completed for {env}.")
except (RequestException, IOError, KeyError) as e:
logger.critical(f"[BackupPlugin][Failure] Fatal error during backup for {env}: {e}", exc_info=True)
raise e
# [/DEF:BackupPlugin]


@@ -0,0 +1,150 @@
# [DEF:MigrationPlugin:Module]
# @SEMANTICS: migration, superset, automation, dashboard, plugin
# @PURPOSE: A plugin that provides functionality to migrate Superset dashboards between environments.
# @LAYER: App
# @RELATION: IMPLEMENTS -> PluginBase
# @RELATION: DEPENDS_ON -> superset_tool.client
# @RELATION: DEPENDS_ON -> superset_tool.utils
from typing import Dict, Any, List
from pathlib import Path
import zipfile
import re
from ..core.plugin_base import PluginBase
from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import setup_clients
from superset_tool.utils.fileio import create_temp_file, update_yamls, create_dashboard_export
from superset_tool.utils.logger import SupersetLogger
class MigrationPlugin(PluginBase):
"""
A plugin to migrate Superset dashboards between environments.
"""
@property
def id(self) -> str:
return "superset-migration"
@property
def name(self) -> str:
return "Superset Dashboard Migration"
@property
def description(self) -> str:
return "Migrates dashboards between Superset environments."
@property
def version(self) -> str:
return "1.0.0"
def get_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"from_env": {
"type": "string",
"title": "Source Environment",
"description": "The environment to migrate from.",
"enum": ["dev", "sbx", "prod", "preprod"],
},
"to_env": {
"type": "string",
"title": "Target Environment",
"description": "The environment to migrate to.",
"enum": ["dev", "sbx", "prod", "preprod"],
},
"dashboard_regex": {
"type": "string",
"title": "Dashboard Regex",
"description": "A regular expression to filter dashboards to migrate.",
},
"replace_db_config": {
"type": "boolean",
"title": "Replace DB Config",
"description": "Whether to replace the database configuration.",
"default": False,
},
"from_db_id": {
"type": "integer",
"title": "Source DB ID",
"description": "The ID of the source database to replace (if replacing).",
},
"to_db_id": {
"type": "integer",
"title": "Target DB ID",
"description": "The ID of the target database to replace with (if replacing).",
},
},
"required": ["from_env", "to_env", "dashboard_regex"],
}
async def execute(self, params: Dict[str, Any]):
from_env = params["from_env"]
to_env = params["to_env"]
dashboard_regex = params["dashboard_regex"]
replace_db_config = params.get("replace_db_config", False)
from_db_id = params.get("from_db_id")
to_db_id = params.get("to_db_id")
logger = SupersetLogger(log_dir=Path.cwd() / "logs", console=True)
logger.info(f"[MigrationPlugin][Entry] Starting migration from {from_env} to {to_env}.")
try:
all_clients = setup_clients(logger)
from_c = all_clients[from_env]
to_c = all_clients[to_env]
_, all_dashboards = from_c.get_dashboards()
regex_str = str(dashboard_regex)
dashboards_to_migrate = [
d for d in all_dashboards if re.search(regex_str, d["dashboard_title"], re.IGNORECASE)
]
if not dashboards_to_migrate:
logger.warning("[MigrationPlugin][State] No dashboards found matching the regex.")
return
db_config_replacement = None
if replace_db_config:
if from_db_id is None or to_db_id is None:
raise ValueError("Source and target database IDs are required when replacing database configuration.")
from_db = from_c.get_database(int(from_db_id))
to_db = to_c.get_database(int(to_db_id))
old_result = from_db.get("result", {})
new_result = to_db.get("result", {})
db_config_replacement = {
"old": {"database_name": old_result.get("database_name"), "uuid": old_result.get("uuid"), "id": str(from_db.get("id"))},
"new": {"database_name": new_result.get("database_name"), "uuid": new_result.get("uuid"), "id": str(to_db.get("id"))}
}
for dash in dashboards_to_migrate:
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
try:
exported_content, _ = from_c.export_dashboard(dash_id)
with create_temp_file(content=exported_content, dry_run=True, suffix=".zip", logger=logger) as tmp_zip_path:
if not db_config_replacement:
to_c.import_dashboard(file_name=tmp_zip_path, dash_id=dash_id, dash_slug=dash_slug)
else:
with create_temp_file(suffix=".dir", logger=logger) as tmp_unpack_dir:
with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref:
zip_ref.extractall(tmp_unpack_dir)
update_yamls(db_configs=[db_config_replacement], path=str(tmp_unpack_dir))
with create_temp_file(suffix=".zip", dry_run=True, logger=logger) as tmp_new_zip:
create_dashboard_export(zip_path=tmp_new_zip, source_paths=[str(p) for p in Path(tmp_unpack_dir).glob("**/*")])
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported.")
except Exception as exc:
logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True)
logger.info("[MigrationPlugin][Exit] Migration finished.")
except Exception as e:
logger.critical(f"[MigrationPlugin][Failure] Fatal error during migration: {e}", exc_info=True)
raise e
# [/DEF:MigrationPlugin]