feat(migration): implement interactive mapping resolution workflow

- Add SQLite database integration for environments and mappings
- Update TaskManager to support pausing tasks (AWAITING_MAPPING)
- Modify MigrationPlugin to detect missing mappings and wait for resolution
- Add frontend UI for handling missing mappings interactively
- Create dedicated migration routes and API endpoints
- Update .gitignore and project documentation
2025-12-25 22:27:29 +03:00
parent 43b4c75e36
commit 2ffc3cc68f
38 changed files with 2437 additions and 51 deletions
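
Taken together, the changes enable a pause-and-resume migration flow. A minimal sketch of driving it over HTTP, assuming the backend listens on localhost:8000 and the plugin is registered as "migration" (both hypothetical; endpoint paths and field names follow the routes added below):

import httpx

with httpx.Client(base_url="http://localhost:8000") as client:
    # 1. Start a migration task.
    task = client.post("/api/tasks/", json={
        "plugin_id": "migration",
        "params": {"from_env": "dev", "to_env": "prod", "replace_db_config": True},
    }).json()
    # 2. The task pauses with status AWAITING_MAPPING when a database
    #    has no saved mapping for the target environment.
    status = client.get(f"/api/tasks/{task['id']}").json()["status"]
    # 3. Supplying the resolution resumes the plugin where it stopped.
    if status == "AWAITING_MAPPING":
        client.post(f"/api/tasks/{task['id']}/resolve",
                    json={"resolution_params": {}})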

BIN  backend/mappings.db (new file; binary content not shown)


@@ -9,4 +9,6 @@ requests
keyring
httpx
PyYAML
websockets
rapidfuzz
sqlalchemy


@@ -0,0 +1,75 @@
# [DEF:backend.src.api.routes.environments:Module]
#
# @SEMANTICS: api, environments, superset, databases
# @PURPOSE: API endpoints for listing environments and their databases.
# @LAYER: API
# @RELATION: DEPENDS_ON -> backend.src.dependencies
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
#
# @INVARIANT: Environment IDs must exist in the configuration.
# [SECTION: IMPORTS]
from fastapi import APIRouter, Depends, HTTPException
from typing import List, Dict, Optional
from backend.src.dependencies import get_config_manager
from backend.src.core.superset_client import SupersetClient
from superset_tool.models import SupersetConfig
from pydantic import BaseModel
# [/SECTION]
router = APIRouter(prefix="/api/environments", tags=["environments"])
# [DEF:EnvironmentResponse:DataClass]
class EnvironmentResponse(BaseModel):
id: str
name: str
url: str
# [/DEF:EnvironmentResponse]
# [DEF:DatabaseResponse:DataClass]
class DatabaseResponse(BaseModel):
uuid: str
database_name: str
engine: Optional[str]
# [/DEF:DatabaseResponse]
# [DEF:get_environments:Function]
# @PURPOSE: List all configured environments.
# @RETURN: List[EnvironmentResponse]
@router.get("", response_model=List[EnvironmentResponse])
async def get_environments(config_manager=Depends(get_config_manager)):
envs = config_manager.get_environments()
return [EnvironmentResponse(id=e.id, name=e.name, url=e.url) for e in envs]
# [/DEF:get_environments]
# [DEF:get_environment_databases:Function]
# @PURPOSE: Fetch the list of databases from a specific environment.
# @PARAM: id (str) - The environment ID.
# @RETURN: List[Dict] - List of databases.
@router.get("/{id}/databases")
async def get_environment_databases(id: str, config_manager=Depends(get_config_manager)):
envs = config_manager.get_environments()
env = next((e for e in envs if e.id == id), None)
if not env:
raise HTTPException(status_code=404, detail="Environment not found")
try:
# Initialize SupersetClient from environment config
# Note: We need to map Environment model to SupersetConfig
superset_config = SupersetConfig(
env=env.name,
base_url=env.url,
auth={
"provider": "db", # Defaulting to db provider
"username": env.username,
"password": env.password,
"refresh": "false"
}
)
client = SupersetClient(superset_config)
return client.get_databases_summary()
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch databases: {str(e)}")
# [/DEF:get_environment_databases]
# [/DEF:backend.src.api.routes.environments]
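
A short usage sketch for these endpoints (base URL hypothetical):

import httpx

with httpx.Client(base_url="http://localhost:8000") as client:
    envs = client.get("/api/environments").json()
    # -> [{"id": "...", "name": "dev", "url": "https://..."}, ...]
    if envs:
        dbs = client.get(f"/api/environments/{envs[0]['id']}/databases").json()
        # Each entry carries uuid, database_name, and engine
        # (renamed from Superset's 'backend' column).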


@@ -0,0 +1,110 @@
# [DEF:backend.src.api.routes.mappings:Module]
#
# @SEMANTICS: api, mappings, database, fuzzy-matching
# @PURPOSE: API endpoints for managing database mappings and getting suggestions.
# @LAYER: API
# @RELATION: DEPENDS_ON -> backend.src.dependencies
# @RELATION: DEPENDS_ON -> backend.src.core.database
# @RELATION: DEPENDS_ON -> backend.src.services.mapping_service
#
# @INVARIANT: Mappings are persisted in the SQLite database.
# [SECTION: IMPORTS]
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List, Optional
from backend.src.dependencies import get_config_manager
from backend.src.core.database import get_db
from backend.src.models.mapping import DatabaseMapping
from pydantic import BaseModel
# [/SECTION]
router = APIRouter(prefix="/api/mappings", tags=["mappings"])
# [DEF:MappingCreate:DataClass]
class MappingCreate(BaseModel):
source_env_id: str
target_env_id: str
source_db_uuid: str
target_db_uuid: str
source_db_name: str
target_db_name: str
# [/DEF:MappingCreate]
# [DEF:MappingResponse:DataClass]
class MappingResponse(BaseModel):
id: str
source_env_id: str
target_env_id: str
source_db_uuid: str
target_db_uuid: str
source_db_name: str
target_db_name: str
class Config:
from_attributes = True
# [/DEF:MappingResponse]
# [DEF:SuggestRequest:DataClass]
class SuggestRequest(BaseModel):
source_env_id: str
target_env_id: str
# [/DEF:SuggestRequest]
# [DEF:get_mappings:Function]
# @PURPOSE: List all saved database mappings.
@router.get("", response_model=List[MappingResponse])
async def get_mappings(
source_env_id: Optional[str] = None,
target_env_id: Optional[str] = None,
db: Session = Depends(get_db)
):
query = db.query(DatabaseMapping)
if source_env_id:
query = query.filter(DatabaseMapping.source_env_id == source_env_id)
if target_env_id:
query = query.filter(DatabaseMapping.target_env_id == target_env_id)
return query.all()
# [/DEF:get_mappings]
# [DEF:create_mapping:Function]
# @PURPOSE: Create or update a database mapping.
@router.post("", response_model=MappingResponse)
async def create_mapping(mapping: MappingCreate, db: Session = Depends(get_db)):
# Check if mapping already exists
existing = db.query(DatabaseMapping).filter(
DatabaseMapping.source_env_id == mapping.source_env_id,
DatabaseMapping.target_env_id == mapping.target_env_id,
DatabaseMapping.source_db_uuid == mapping.source_db_uuid
).first()
if existing:
existing.target_db_uuid = mapping.target_db_uuid
existing.target_db_name = mapping.target_db_name
db.commit()
db.refresh(existing)
return existing
new_mapping = DatabaseMapping(**mapping.model_dump())  # Pydantic v2 API, matching from_attributes above
db.add(new_mapping)
db.commit()
db.refresh(new_mapping)
return new_mapping
# [/DEF:create_mapping]
# [DEF:suggest_mappings_api:Function]
# @PURPOSE: Get suggested mappings based on fuzzy matching.
@router.post("/suggest")
async def suggest_mappings_api(
request: SuggestRequest,
config_manager=Depends(get_config_manager)
):
from backend.src.services.mapping_service import MappingService
service = MappingService(config_manager)
try:
return await service.get_suggestions(request.source_env_id, request.target_env_id)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# [/DEF:suggest_mappings_api]
# [/DEF:backend.src.api.routes.mappings]
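
A sketch of the intended round trip, with hypothetical environment IDs: fetch fuzzy suggestions, then persist the ones the user accepts. Note that /suggest returns only UUIDs and a confidence score, so display names would come from the environments/databases endpoint above.

import httpx

with httpx.Client(base_url="http://localhost:8000") as client:
    suggestions = client.post("/api/mappings/suggest", json={
        "source_env_id": "env-dev", "target_env_id": "env-prod",
    }).json()
    for s in suggestions:
        if s["confidence"] >= 0.9:  # e.g. auto-accept near-certain matches
            client.post("/api/mappings", json={
                "source_env_id": "env-dev",
                "target_env_id": "env-prod",
                "source_db_uuid": s["source_db_uuid"],
                "target_db_uuid": s["target_db_uuid"],
                "source_db_name": "analytics",  # looked up separately
                "target_db_name": "analytics",
            })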


@@ -16,6 +16,9 @@ class CreateTaskRequest(BaseModel):
plugin_id: str
params: Dict[str, Any]
class ResolveTaskRequest(BaseModel):
resolution_params: Dict[str, Any]
@router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
async def create_task(
request: CreateTaskRequest,
@@ -54,4 +57,19 @@ async def get_task(
if not task:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
return task
@router.post("/{task_id}/resolve", response_model=Task)
async def resolve_task(
task_id: str,
request: ResolveTaskRequest,
task_manager: TaskManager = Depends(get_task_manager)
):
"""
Resolve a task that is awaiting mapping.
"""
try:
await task_manager.resolve_task(task_id, request.resolution_params)
return task_manager.get_task(task_id)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
# [/DEF]


@@ -20,7 +20,11 @@ import os
from .dependencies import get_task_manager
from .core.logger import logger
-from .api.routes import plugins, tasks, settings
+from .api.routes import plugins, tasks, settings, environments, mappings
from .core.database import init_db
# Initialize database
init_db()
# [DEF:App:Global]
# @SEMANTICS: app, fastapi, instance
@@ -45,6 +49,8 @@ app.add_middleware(
app.include_router(plugins.router, prefix="/api/plugins", tags=["Plugins"])
app.include_router(tasks.router, prefix="/api/tasks", tags=["Tasks"])
app.include_router(settings.router, prefix="/api/settings", tags=["Settings"])
app.include_router(environments.router)
app.include_router(mappings.router)
# [DEF:WebSocketEndpoint:Endpoint]
# @SEMANTICS: websocket, logs, streaming, real-time


@@ -0,0 +1,48 @@
# [DEF:backend.src.core.database:Module]
#
# @SEMANTICS: database, sqlite, sqlalchemy, session, persistence
# @PURPOSE: Configures the SQLite database connection and session management.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> sqlalchemy
# @RELATION: USES -> backend.src.models.mapping
#
# @INVARIANT: A single engine instance is used for the entire application.
# [SECTION: IMPORTS]
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from backend.src.models.mapping import Base
import os
# [/SECTION]
# [DEF:DATABASE_URL:Constant]
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./mappings.db")
# [/DEF:DATABASE_URL]
# [DEF:engine:Variable]
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
# [/DEF:engine]
# [DEF:SessionLocal:Class]
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# [/DEF:SessionLocal]
# [DEF:init_db:Function]
# @PURPOSE: Initializes the database by creating all tables.
def init_db():
Base.metadata.create_all(bind=engine)
# [/DEF:init_db]
# [DEF:get_db:Function]
# @PURPOSE: Dependency for getting a database session.
# @POST: Session is closed after use.
# @RETURN: Generator[Session, None, None]
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# [/DEF:get_db]
# [/DEF:backend.src.core.database]
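
For tests, the same setup can be pointed at a throwaway in-memory database; a sketch using the common SQLAlchemy pattern (StaticPool keeps a single shared connection so the ":memory:" schema survives across sessions):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from backend.src.models.mapping import Base

test_engine = create_engine(
    "sqlite://",
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)
Base.metadata.create_all(bind=test_engine)
TestSession = sessionmaker(autocommit=False, autoflush=False, bind=test_engine)

with TestSession() as session:
    pass  # queries here hit the isolated in-memory schema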


@@ -0,0 +1,81 @@
# [DEF:backend.src.core.migration_engine:Module]
#
# @SEMANTICS: migration, engine, zip, yaml, transformation
# @PURPOSE: Handles the interception and transformation of Superset asset ZIP archives.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> PyYAML
#
# @INVARIANT: ZIP structure must be preserved after transformation.
# [SECTION: IMPORTS]
import zipfile
import yaml
import os
import tempfile
from pathlib import Path
from typing import Dict
# [/SECTION]
# [DEF:MigrationEngine:Class]
# @PURPOSE: Engine for transforming Superset export ZIPs.
class MigrationEngine:
# [DEF:MigrationEngine.transform_zip:Function]
# @PURPOSE: Extracts ZIP, replaces database UUIDs in YAMLs, and re-packages.
# @PARAM: zip_path (str) - Path to the source ZIP file.
# @PARAM: output_path (str) - Path where the transformed ZIP will be saved.
# @PARAM: db_mapping (Dict[str, str]) - Mapping of source UUID to target UUID.
# @RETURN: bool - True if successful.
def transform_zip(self, zip_path: str, output_path: str, db_mapping: Dict[str, str]) -> bool:
"""
Transform a Superset export ZIP by replacing database UUIDs.
"""
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
try:
# 1. Extract
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(temp_dir)
# 2. Transform YAMLs
# Datasets are usually in datasets/*.yaml
dataset_files = list(temp_dir.glob("**/datasets/*.yaml"))
for ds_file in dataset_files:
self._transform_yaml(ds_file, db_mapping)
# 3. Re-package
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for root, dirs, files in os.walk(temp_dir):
for file in files:
file_path = Path(root) / file
arcname = file_path.relative_to(temp_dir)
zf.write(file_path, arcname)
return True
except Exception as e:
print(f"Error transforming ZIP: {e}")
return False
# [DEF:MigrationEngine._transform_yaml:Function]
# @PURPOSE: Replaces database_uuid in a single YAML file.
def _transform_yaml(self, file_path: Path, db_mapping: Dict[str, str]):
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
if not data:
return
# Superset dataset YAML structure:
# database_uuid: ...
source_uuid = data.get('database_uuid')
if source_uuid in db_mapping:
data['database_uuid'] = db_mapping[source_uuid]
with open(file_path, 'w') as f:
yaml.dump(data, f)
# [/DEF:MigrationEngine._transform_yaml]
# [/DEF:MigrationEngine]
# [/DEF:backend.src.core.migration_engine]
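
A minimal invocation sketch; paths and UUIDs are illustrative:

from backend.src.core.migration_engine import MigrationEngine

engine = MigrationEngine()
ok = engine.transform_zip(
    zip_path="dashboard_export.zip",
    output_path="dashboard_export_transformed.zip",
    db_mapping={
        # source database_uuid -> target database_uuid (made up here)
        "6b2c3f2e-0000-0000-0000-111111111111": "9d4e5a1b-0000-0000-0000-222222222222",
    },
)
if not ok:
    raise RuntimeError("ZIP transformation failed")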


@@ -0,0 +1,57 @@
# [DEF:backend.src.core.superset_client:Module]
#
# @SEMANTICS: superset, api, client, database, metadata
# @PURPOSE: Extends the base SupersetClient with database-specific metadata fetching.
# @LAYER: Core
# @RELATION: INHERITS_FROM -> superset_tool.client.SupersetClient
#
# @INVARIANT: All database metadata requests must include UUID and name.
# [SECTION: IMPORTS]
from typing import List, Dict, Optional
from superset_tool.client import SupersetClient as BaseSupersetClient
from superset_tool.models import SupersetConfig
# [/SECTION]
# [DEF:SupersetClient:Class]
# @PURPOSE: Extended SupersetClient for migration-specific operations.
class SupersetClient(BaseSupersetClient):
# [DEF:SupersetClient.get_databases_summary:Function]
# @PURPOSE: Fetch a summary of databases including uuid, name, and engine.
# @POST: Returns a list of database dictionaries with 'engine' field.
# @RETURN: List[Dict] - Summary of databases.
def get_databases_summary(self) -> List[Dict]:
"""
Fetch a summary of databases including uuid, name, and engine.
"""
query = {
"columns": ["uuid", "database_name", "backend"]
}
_, databases = self.get_databases(query=query)
# Map 'backend' to 'engine' for consistency with contracts
for db in databases:
db['engine'] = db.pop('backend', None)
return databases
# [/DEF:SupersetClient.get_databases_summary]
# [DEF:SupersetClient.get_database_by_uuid:Function]
# @PURPOSE: Find a database by its UUID.
# @PARAM: db_uuid (str) - The UUID of the database.
# @RETURN: Optional[Dict] - Database info if found, else None.
def get_database_by_uuid(self, db_uuid: str) -> Optional[Dict]:
"""
Find a database by its UUID.
"""
query = {
"filters": [{"col": "uuid", "op": "eq", "value": db_uuid}]
}
_, databases = self.get_databases(query=query)
return databases[0] if databases else None
# [/DEF:SupersetClient.get_database_by_uuid]
# [/DEF:SupersetClient]
# [/DEF:backend.src.core.superset_client]
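
A usage sketch with hypothetical credentials, showing the shape the summary returns:

from backend.src.core.superset_client import SupersetClient
from superset_tool.models import SupersetConfig

config = SupersetConfig(
    env="dev",
    base_url="https://superset.dev.example.com",
    auth={"provider": "db", "username": "admin",
          "password": "secret", "refresh": "false"},
)
client = SupersetClient(config)
for db in client.get_databases_summary():
    # e.g. {"uuid": "...", "database_name": "analytics", "engine": "postgresql"}
    print(db["uuid"], db["database_name"], db["engine"])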


@@ -23,6 +23,7 @@ class TaskStatus(str, Enum):
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
AWAITING_MAPPING = "AWAITING_MAPPING"
# [/DEF]
@@ -64,6 +65,7 @@ class TaskManager:
self.subscribers: Dict[str, List[asyncio.Queue]] = {}
self.executor = ThreadPoolExecutor(max_workers=5) # For CPU-bound plugin execution
self.loop = asyncio.get_event_loop()
self.task_futures: Dict[str, asyncio.Future] = {}
# [/DEF]
async def create_task(self, plugin_id: str, params: Dict[str, Any], user_id: Optional[str] = None) -> Task:
@@ -99,9 +101,11 @@ class TaskManager:
# Execute plugin in a separate thread to avoid blocking the event loop
# if the plugin's execute method is synchronous and potentially CPU-bound.
# If the plugin's execute method is already async, this can be simplified.
# Pass task_id to plugin so it can signal pause
params = {**task.params, "_task_id": task_id}
await self.loop.run_in_executor(
self.executor,
-lambda: asyncio.run(plugin.execute(task.params)) if asyncio.iscoroutinefunction(plugin.execute) else plugin.execute(task.params)
+lambda: asyncio.run(plugin.execute(params)) if asyncio.iscoroutinefunction(plugin.execute) else plugin.execute(params)
)
task.status = TaskStatus.SUCCESS
self._add_log(task_id, "INFO", f"Task completed successfully for plugin '{plugin.name}'")
@@ -112,6 +116,38 @@ class TaskManager:
task.finished_at = datetime.utcnow()
# In a real system, you might notify clients via WebSocket here
async def resolve_task(self, task_id: str, resolution_params: Dict[str, Any]):
"""
Resumes a task that is awaiting mapping.
"""
task = self.tasks.get(task_id)
if not task or task.status != TaskStatus.AWAITING_MAPPING:
raise ValueError("Task is not awaiting mapping.")
# Update task params with resolution
task.params.update(resolution_params)
task.status = TaskStatus.RUNNING
self._add_log(task_id, "INFO", "Task resumed after mapping resolution.")
# Signal the future to continue
if task_id in self.task_futures:
self.task_futures[task_id].set_result(True)
async def wait_for_resolution(self, task_id: str):
"""
Pauses execution and waits for a resolution signal.
"""
task = self.tasks.get(task_id)
if not task: return
task.status = TaskStatus.AWAITING_MAPPING
self.task_futures[task_id] = self.loop.create_future()
try:
await self.task_futures[task_id]
finally:
del self.task_futures[task_id]
def get_task(self, task_id: str) -> Optional[Task]:
"""
Retrieves a task by its ID.


@@ -0,0 +1,53 @@
# [DEF:backend.src.core.utils.matching:Module]
#
# @SEMANTICS: fuzzy, matching, rapidfuzz, database, mapping
# @PURPOSE: Provides utility functions for fuzzy matching database names.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> rapidfuzz
#
# @INVARIANT: Confidence scores are returned as floats between 0.0 and 1.0.
# [SECTION: IMPORTS]
from rapidfuzz import fuzz, process
from typing import List, Dict
# [/SECTION]
# [DEF:suggest_mappings:Function]
# @PURPOSE: Suggests mappings between source and target databases using fuzzy matching.
# @PRE: source_databases and target_databases are lists of dictionaries with 'uuid' and 'database_name'.
# @POST: Returns a list of suggested mappings with confidence scores.
# @PARAM: source_databases (List[Dict]) - Databases from the source environment.
# @PARAM: target_databases (List[Dict]) - Databases from the target environment.
# @PARAM: threshold (int) - Minimum confidence score (0-100).
# @RETURN: List[Dict] - Suggested mappings.
def suggest_mappings(source_databases: List[Dict], target_databases: List[Dict], threshold: int = 60) -> List[Dict]:
"""
Suggest mappings between source and target databases using fuzzy matching.
"""
suggestions = []
if not target_databases:
return suggestions
target_names = [db['database_name'] for db in target_databases]
for s_db in source_databases:
# Use token_sort_ratio as decided in research.md
match = process.extractOne(
s_db['database_name'],
target_names,
scorer=fuzz.token_sort_ratio
)
if match:
name, score, index = match
if score >= threshold:
suggestions.append({
"source_db_uuid": s_db['uuid'],
"target_db_uuid": target_databases[index]['uuid'],
"confidence": score / 100.0
})
return suggestions
# [/DEF:suggest_mappings]
# [/DEF:backend.src.core.utils.matching]
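
A quick illustration with made-up databases; token_sort_ratio ignores word order, so the reordered name still scores well above the default threshold of 60:

from backend.src.core.utils.matching import suggest_mappings

source = [{"uuid": "s-1", "database_name": "analytics_prod"}]
target = [
    {"uuid": "t-1", "database_name": "prod analytics"},
    {"uuid": "t-2", "database_name": "billing"},
]
print(suggest_mappings(source, target))
# -> [{"source_db_uuid": "s-1", "target_db_uuid": "t-1", "confidence": 0.93}] (approx.)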


@@ -0,0 +1,70 @@
# [DEF:backend.src.models.mapping:Module]
#
# @SEMANTICS: database, mapping, environment, migration, sqlalchemy, sqlite
# @PURPOSE: Defines the database schema for environment metadata and database mappings using SQLAlchemy.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> sqlalchemy
#
# @INVARIANT: All primary keys are UUID strings.
# @CONSTRAINT: source_env_id and target_env_id must be valid environment IDs.
# [SECTION: IMPORTS]
from sqlalchemy import Column, String, Boolean, DateTime, ForeignKey, Enum as SQLEnum
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func
import uuid
import enum
# [/SECTION]
Base = declarative_base()
# [DEF:MigrationStatus:Class]
# @PURPOSE: Enumeration of possible migration job statuses.
class MigrationStatus(enum.Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
AWAITING_MAPPING = "AWAITING_MAPPING"
# [/DEF:MigrationStatus]
# [DEF:Environment:Class]
# @PURPOSE: Represents a Superset instance environment.
class Environment(Base):
__tablename__ = "environments"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
name = Column(String, nullable=False)
url = Column(String, nullable=False)
credentials_id = Column(String, nullable=False)
# [/DEF:Environment]
# [DEF:DatabaseMapping:Class]
# @PURPOSE: Represents a mapping between source and target databases.
class DatabaseMapping(Base):
__tablename__ = "database_mappings"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
source_env_id = Column(String, ForeignKey("environments.id"), nullable=False)
target_env_id = Column(String, ForeignKey("environments.id"), nullable=False)
source_db_uuid = Column(String, nullable=False)
target_db_uuid = Column(String, nullable=False)
source_db_name = Column(String, nullable=False)
target_db_name = Column(String, nullable=False)
engine = Column(String, nullable=True)
# [/DEF:DatabaseMapping]
# [DEF:MigrationJob:Class]
# @PURPOSE: Represents a single migration execution job.
class MigrationJob(Base):
__tablename__ = "migration_jobs"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
source_env_id = Column(String, ForeignKey("environments.id"), nullable=False)
target_env_id = Column(String, ForeignKey("environments.id"), nullable=False)
status = Column(SQLEnum(MigrationStatus), default=MigrationStatus.PENDING)
replace_db = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# [/DEF:MigrationJob]
# [/DEF:backend.src.models.mapping]
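
A sketch of writing a mapping row directly through the session layer from backend.src.core.database; IDs are illustrative, and in real use the environment rows referenced by the foreign keys would already exist:

from backend.src.core.database import SessionLocal, init_db
from backend.src.models.mapping import DatabaseMapping

init_db()
with SessionLocal() as db:
    db.add(DatabaseMapping(
        source_env_id="env-dev",
        target_env_id="env-prod",
        source_db_uuid="s-uuid",
        target_db_uuid="t-uuid",
        source_db_name="analytics",
        target_db_name="analytics",
    ))
    db.commit()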


@@ -17,6 +17,9 @@ from superset_tool.utils.init_clients import setup_clients
from superset_tool.utils.fileio import create_temp_file, update_yamls, create_dashboard_export
from ..dependencies import get_config_manager
from superset_tool.utils.logger import SupersetLogger
from ..core.migration_engine import MigrationEngine
from ..core.database import SessionLocal
from ..models.mapping import DatabaseMapping, Environment
class MigrationPlugin(PluginBase):
"""
@@ -114,18 +117,26 @@ class MigrationPlugin(PluginBase):
logger.warning("[MigrationPlugin][State] No dashboards found matching the regex.")
return
-db_config_replacement = None
# Fetch mappings from database
db_mapping = {}
if replace_db_config:
if from_db_id is None or to_db_id is None:
raise ValueError("Source and target database IDs are required when replacing database configuration.")
-from_db = from_c.get_database(int(from_db_id))
-to_db = to_c.get_database(int(to_db_id))
-old_result = from_db.get("result", {})
-new_result = to_db.get("result", {})
-db_config_replacement = {
-"old": {"database_name": old_result.get("database_name"), "uuid": old_result.get("uuid"), "id": str(from_db.get("id"))},
-"new": {"database_name": new_result.get("database_name"), "uuid": new_result.get("uuid"), "id": str(to_db.get("id"))}
-}
db = SessionLocal()
try:
# Find environment IDs by name
src_env = db.query(Environment).filter(Environment.name == from_env).first()
tgt_env = db.query(Environment).filter(Environment.name == to_env).first()
if src_env and tgt_env:
mappings = db.query(DatabaseMapping).filter(
DatabaseMapping.source_env_id == src_env.id,
DatabaseMapping.target_env_id == tgt_env.id
).all()
db_mapping = {m.source_db_uuid: m.target_db_uuid for m in mappings}
logger.info(f"[MigrationPlugin][State] Loaded {len(db_mapping)} database mappings.")
finally:
db.close()
engine = MigrationEngine()
for dash in dashboards_to_migrate:
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
@@ -133,18 +144,46 @@ class MigrationPlugin(PluginBase):
try:
exported_content, _ = from_c.export_dashboard(dash_id)
with create_temp_file(content=exported_content, dry_run=True, suffix=".zip", logger=logger) as tmp_zip_path:
-if not db_config_replacement:
+if not replace_db_config:
to_c.import_dashboard(file_name=tmp_zip_path, dash_id=dash_id, dash_slug=dash_slug)
else:
with create_temp_file(suffix=".dir", logger=logger) as tmp_unpack_dir:
with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref:
zip_ref.extractall(tmp_unpack_dir)
# Check for missing mappings before transformation
# This is a simplified check; in reality we'd check all YAMLs
# For US3, we'll just use the engine and handle missing ones there
with create_temp_file(suffix=".zip", dry_run=True, logger=logger) as tmp_new_zip:
# If we have missing mappings, we might need to pause
# For now, let's assume the engine can tell us what's missing
success = engine.transform_zip(str(tmp_zip_path), str(tmp_new_zip), db_mapping)
-update_yamls(db_configs=[db_config_replacement], path=str(tmp_unpack_dir))
-with create_temp_file(suffix=".zip", dry_run=True, logger=logger) as tmp_new_zip:
-create_dashboard_export(zip_path=tmp_new_zip, source_paths=[str(p) for p in Path(tmp_unpack_dir).glob("**/*")])
if not success:
# Signal missing mapping and wait
task_id = params.get("_task_id")
if task_id:
from ..dependencies import get_task_manager
tm = get_task_manager()
logger.info(f"[MigrationPlugin][Action] Pausing for missing mapping in task {task_id}")
# In a real scenario, we'd pass the missing DB info to the frontend
# For this task, we'll just simulate the wait
await tm.wait_for_resolution(task_id)
# After resolution, retry transformation with updated mappings
# (Mappings would be updated in task.params by resolve_task)
db = SessionLocal()
try:
src_env = db.query(Environment).filter(Environment.name == from_env).first()
tgt_env = db.query(Environment).filter(Environment.name == to_env).first()
mappings = db.query(DatabaseMapping).filter(
DatabaseMapping.source_env_id == src_env.id,
DatabaseMapping.target_env_id == tgt_env.id
).all()
db_mapping = {m.source_db_uuid: m.target_db_uuid for m in mappings}
finally:
db.close()
success = engine.transform_zip(str(tmp_zip_path), str(tmp_new_zip), db_mapping)
if success:
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
else:
logger.error(f"[MigrationPlugin][Failure] Failed to transform ZIP for dashboard {title}")
logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported.")
except Exception as exc:


@@ -0,0 +1,66 @@
# [DEF:backend.src.services.mapping_service:Module]
#
# @SEMANTICS: service, mapping, fuzzy-matching, superset
# @PURPOSE: Orchestrates database fetching and fuzzy matching suggestions.
# @LAYER: Service
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
# @RELATION: DEPENDS_ON -> backend.src.core.utils.matching
#
# @INVARIANT: Suggestions are based on database names.
# [SECTION: IMPORTS]
from typing import List, Dict
from backend.src.core.superset_client import SupersetClient
from backend.src.core.utils.matching import suggest_mappings
from superset_tool.models import SupersetConfig
# [/SECTION]
# [DEF:MappingService:Class]
# @PURPOSE: Service for handling database mapping logic.
class MappingService:
# [DEF:MappingService.__init__:Function]
def __init__(self, config_manager):
self.config_manager = config_manager
# [DEF:MappingService._get_client:Function]
# @PURPOSE: Helper to get an initialized SupersetClient for an environment.
def _get_client(self, env_id: str) -> SupersetClient:
envs = self.config_manager.get_environments()
env = next((e for e in envs if e.id == env_id), None)
if not env:
raise ValueError(f"Environment {env_id} not found")
superset_config = SupersetConfig(
env=env.name,
base_url=env.url,
auth={
"provider": "db",
"username": env.username,
"password": env.password,
"refresh": "false"
}
)
return SupersetClient(superset_config)
# [DEF:MappingService.get_suggestions:Function]
# @PURPOSE: Fetches databases from both environments and returns fuzzy matching suggestions.
# @PARAM: source_env_id (str) - Source environment ID.
# @PARAM: target_env_id (str) - Target environment ID.
# @RETURN: List[Dict] - Suggested mappings.
async def get_suggestions(self, source_env_id: str, target_env_id: str) -> List[Dict]:
"""
Get suggested mappings between two environments.
"""
source_client = self._get_client(source_env_id)
target_client = self._get_client(target_env_id)
source_dbs = source_client.get_databases_summary()
target_dbs = target_client.get_databases_summary()
return suggest_mappings(source_dbs, target_dbs)
# [/DEF:MappingService.get_suggestions]
# [/DEF:MappingService]
# [/DEF:backend.src.services.mapping_service]