001-migration-ui-redesign (#3)

Reviewed-on: #3
2025-12-26 18:17:58 +03:00
parent 4448352ef9
commit a43f8fb021
38 changed files with 2434 additions and 51 deletions

View File

@@ -0,0 +1,48 @@
# [DEF:backend.src.core.database:Module]
#
# @SEMANTICS: database, sqlite, sqlalchemy, session, persistence
# @PURPOSE: Configures the SQLite database connection and session management.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> sqlalchemy
# @RELATION: USES -> backend.src.models.mapping
#
# @INVARIANT: A single engine instance is used for the entire application.
# [SECTION: IMPORTS]
import os
from typing import Generator

from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

from backend.src.models.mapping import Base
# [/SECTION]
# [DEF:DATABASE_URL:Constant]
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./mappings.db")
# [/DEF:DATABASE_URL]
# [DEF:engine:Variable]
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
# [/DEF:engine]
# [DEF:SessionLocal:Class]
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# [/DEF:SessionLocal]
# [DEF:init_db:Function]
# @PURPOSE: Initializes the database by creating all tables.
def init_db():
Base.metadata.create_all(bind=engine)
# [/DEF:init_db]
# [DEF:get_db:Function]
# @PURPOSE: Dependency for getting a database session.
# @POST: Session is closed after use.
# @RETURN: Generator[Session, None, None]
def get_db() -> Generator[Session, None, None]:
db = SessionLocal()
try:
yield db
finally:
db.close()
# [/DEF:get_db]
# [/DEF:backend.src.core.database]
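
As a usage sketch: the generator shape of get_db matches FastAPI's yield-dependency pattern, though FastAPI itself is not imported here. The route below is hypothetical and not part of this commit:

# Hypothetical route; assumes FastAPI, which this module does not import.
from fastapi import Depends, FastAPI
from sqlalchemy import text
from sqlalchemy.orm import Session

from backend.src.core.database import get_db, init_db

app = FastAPI()
init_db()  # create tables once at startup

@app.get("/health/db")
def db_health(db: Session = Depends(get_db)):
    # Trivial round-trip; get_db's finally block closes the session
    # after the response is sent.
    db.execute(text("SELECT 1"))
    return {"db": "ok"}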

View File

@@ -0,0 +1,81 @@
# [DEF:backend.src.core.migration_engine:Module]
#
# @SEMANTICS: migration, engine, zip, yaml, transformation
# @PURPOSE: Handles the interception and transformation of Superset asset ZIP archives.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> PyYAML
#
# @INVARIANT: ZIP structure must be preserved after transformation.
# [SECTION: IMPORTS]
import logging
import os
import tempfile
import zipfile
from pathlib import Path
from typing import Dict

import yaml
# [/SECTION]

logger = logging.getLogger(__name__)
# [DEF:MigrationEngine:Class]
# @PURPOSE: Engine for transforming Superset export ZIPs.
class MigrationEngine:
# [DEF:MigrationEngine.transform_zip:Function]
# @PURPOSE: Extracts ZIP, replaces database UUIDs in YAMLs, and re-packages.
# @PARAM: zip_path (str) - Path to the source ZIP file.
# @PARAM: output_path (str) - Path where the transformed ZIP will be saved.
# @PARAM: db_mapping (Dict[str, str]) - Mapping of source UUID to target UUID.
# @RETURN: bool - True if successful.
def transform_zip(self, zip_path: str, output_path: str, db_mapping: Dict[str, str]) -> bool:
"""
Transform a Superset export ZIP by replacing database UUIDs.
"""
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
try:
# 1. Extract
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(temp_dir)
# 2. Transform YAMLs
# Datasets are usually in datasets/*.yaml
dataset_files = list(temp_dir.glob("**/datasets/*.yaml"))
for ds_file in dataset_files:
self._transform_yaml(ds_file, db_mapping)
# 3. Re-package
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for root, dirs, files in os.walk(temp_dir):
for file in files:
file_path = Path(root) / file
arcname = file_path.relative_to(temp_dir)
zf.write(file_path, arcname)
return True
            except Exception as e:
                logger.error("Error transforming ZIP: %s", e)
                return False
# [DEF:MigrationEngine._transform_yaml:Function]
# @PURPOSE: Replaces database_uuid in a single YAML file.
    def _transform_yaml(self, file_path: Path, db_mapping: Dict[str, str]):
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        if not data:
            return
        # Superset dataset YAML structure:
        #   database_uuid: ...
        source_uuid = data.get('database_uuid')
        if source_uuid in db_mapping:
            data['database_uuid'] = db_mapping[source_uuid]
            with open(file_path, 'w', encoding='utf-8') as f:
                # sort_keys=False keeps Superset's original key order so the
                # rewritten YAML stays as close to the export as possible.
                yaml.dump(data, f, sort_keys=False)
# [/DEF:MigrationEngine._transform_yaml]
# [/DEF:MigrationEngine]
# [/DEF:backend.src.core.migration_engine]
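
A minimal invocation sketch for the engine above; the file names and UUIDs are placeholders, not values from this change:

# Hypothetical invocation; paths and UUIDs are illustrative only.
from backend.src.core.migration_engine import MigrationEngine

engine = MigrationEngine()
ok = engine.transform_zip(
    zip_path="dashboard_export.zip",
    output_path="dashboard_export_remapped.zip",
    db_mapping={
        # source env database UUID -> target env database UUID
        "11111111-1111-1111-1111-111111111111": "22222222-2222-2222-2222-222222222222",
    },
)
print("transformed" if ok else "failed")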

View File

@@ -0,0 +1,57 @@
# [DEF:backend.src.core.superset_client:Module]
#
# @SEMANTICS: superset, api, client, database, metadata
# @PURPOSE: Extends the base SupersetClient with database-specific metadata fetching.
# @LAYER: Core
# @RELATION: INHERITS_FROM -> superset_tool.client.SupersetClient
#
# @INVARIANT: All database metadata requests must include UUID and name.
# [SECTION: IMPORTS]
from typing import Dict, List, Optional
from superset_tool.client import SupersetClient as BaseSupersetClient
from superset_tool.models import SupersetConfig
# [/SECTION]
# [DEF:SupersetClient:Class]
# @PURPOSE: Extended SupersetClient for migration-specific operations.
class SupersetClient(BaseSupersetClient):
# [DEF:SupersetClient.get_databases_summary:Function]
# @PURPOSE: Fetch a summary of databases including uuid, name, and engine.
# @POST: Returns a list of database dictionaries with 'engine' field.
# @RETURN: List[Dict] - Summary of databases.
def get_databases_summary(self) -> List[Dict]:
"""
Fetch a summary of databases including uuid, name, and engine.
"""
query = {
"columns": ["uuid", "database_name", "backend"]
}
_, databases = self.get_databases(query=query)
# Map 'backend' to 'engine' for consistency with contracts
for db in databases:
db['engine'] = db.pop('backend', None)
return databases
# [/DEF:SupersetClient.get_databases_summary]
# [DEF:SupersetClient.get_database_by_uuid:Function]
# @PURPOSE: Find a database by its UUID.
# @PARAM: db_uuid (str) - The UUID of the database.
# @RETURN: Optional[Dict] - Database info if found, else None.
def get_database_by_uuid(self, db_uuid: str) -> Optional[Dict]:
"""
Find a database by its UUID.
"""
query = {
"filters": [{"col": "uuid", "op": "eq", "value": db_uuid}]
}
_, databases = self.get_databases(query=query)
return databases[0] if databases else None
# [/DEF:SupersetClient.get_database_by_uuid]
# [/DEF:SupersetClient]
# [/DEF:backend.src.core.superset_client]
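
Illustrative use of the helpers above; how a SupersetConfig is constructed lives in superset_tool and is not shown in this change, so the placeholder below is an assumption:

# Hypothetical usage; the SupersetConfig fields are defined in superset_tool
# and are not part of this commit.
from superset_tool.models import SupersetConfig
from backend.src.core.superset_client import SupersetClient

config = SupersetConfig(...)  # placeholder: real fields depend on superset_tool
client = SupersetClient(config)

for db in client.get_databases_summary():
    print(db["uuid"], db["database_name"], db["engine"])

match = client.get_database_by_uuid("11111111-1111-1111-1111-111111111111")
print("found" if match else "not found")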

View File

@@ -23,6 +23,7 @@ class TaskStatus(str, Enum):
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
AWAITING_MAPPING = "AWAITING_MAPPING"
# [/DEF]
@@ -64,6 +65,7 @@ class TaskManager:
self.subscribers: Dict[str, List[asyncio.Queue]] = {}
self.executor = ThreadPoolExecutor(max_workers=5) # For CPU-bound plugin execution
self.loop = asyncio.get_event_loop()
self.task_futures: Dict[str, asyncio.Future] = {}
# [/DEF]
async def create_task(self, plugin_id: str, params: Dict[str, Any], user_id: Optional[str] = None) -> Task:
@@ -99,9 +101,11 @@ class TaskManager:
# Execute plugin in a separate thread to avoid blocking the event loop
# if the plugin's execute method is synchronous and potentially CPU-bound.
# If the plugin's execute method is already async, this can be simplified.
# Pass task_id to plugin so it can signal pause
params = {**task.params, "_task_id": task_id}
await self.loop.run_in_executor(
self.executor,
lambda: asyncio.run(plugin.execute(task.params)) if asyncio.iscoroutinefunction(plugin.execute) else plugin.execute(task.params)
lambda: asyncio.run(plugin.execute(params)) if asyncio.iscoroutinefunction(plugin.execute) else plugin.execute(params)
)
task.status = TaskStatus.SUCCESS
self._add_log(task_id, "INFO", f"Task completed successfully for plugin '{plugin.name}'")
@@ -112,6 +116,38 @@ class TaskManager:
task.finished_at = datetime.utcnow()
# In a real system, you might notify clients via WebSocket here
async def resolve_task(self, task_id: str, resolution_params: Dict[str, Any]):
"""
Resumes a task that is awaiting mapping.
"""
task = self.tasks.get(task_id)
if not task or task.status != TaskStatus.AWAITING_MAPPING:
            raise ValueError(f"Task {task_id} is not awaiting mapping.")
# Update task params with resolution
task.params.update(resolution_params)
task.status = TaskStatus.RUNNING
self._add_log(task_id, "INFO", "Task resumed after mapping resolution.")
# Signal the future to continue
if task_id in self.task_futures:
self.task_futures[task_id].set_result(True)
async def wait_for_resolution(self, task_id: str):
"""
Pauses execution and waits for a resolution signal.
"""
task = self.tasks.get(task_id)
        if not task:
            return
task.status = TaskStatus.AWAITING_MAPPING
self.task_futures[task_id] = self.loop.create_future()
try:
await self.task_futures[task_id]
finally:
            self.task_futures.pop(task_id, None)
def get_task(self, task_id: str) -> Optional[Task]:
"""
Retrieves a task by its ID.
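
Together, these two methods form the pause/resume handshake behind AWAITING_MAPPING: the plugin parks itself on wait_for_resolution, and the API layer later calls resolve_task with the operator's mapping. Below is a minimal sketch of both halves, assuming a shared task_manager singleton and a plugin that receives the injected _task_id (the plugin shape and route are hypothetical). Note the future-based handshake only works if the plugin coroutine runs on the same event loop that owns task_futures:

# Hypothetical plugin-side half of the handshake.
class MigrationPlugin:  # assumed plugin shape, not part of this commit
    async def execute(self, params):
        task_id = params["_task_id"]  # injected by TaskManager
        if "db_mapping" not in params:
            # Park this task until an operator supplies a mapping;
            # status flips to AWAITING_MAPPING while we wait.
            await task_manager.wait_for_resolution(task_id)
            # resolve_task merged the resolution into the task's params.
            params = task_manager.get_task(task_id).params
        # ... continue the migration using params["db_mapping"] ...

# Hypothetical API-side half, e.g. invoked from a mapping-resolution route:
async def resolve_mapping(task_id: str, mapping: dict):
    await task_manager.resolve_task(task_id, {"db_mapping": mapping})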

View File

@@ -0,0 +1,53 @@
# [DEF:backend.src.core.utils.matching:Module]
#
# @SEMANTICS: fuzzy, matching, rapidfuzz, database, mapping
# @PURPOSE: Provides utility functions for fuzzy matching database names.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> rapidfuzz
#
# @INVARIANT: Confidence scores are returned as floats between 0.0 and 1.0.
# [SECTION: IMPORTS]
from rapidfuzz import fuzz, process
from typing import List, Dict
# [/SECTION]
# [DEF:suggest_mappings:Function]
# @PURPOSE: Suggests mappings between source and target databases using fuzzy matching.
# @PRE: source_databases and target_databases are lists of dictionaries with 'uuid' and 'database_name'.
# @POST: Returns a list of suggested mappings with confidence scores.
# @PARAM: source_databases (List[Dict]) - Databases from the source environment.
# @PARAM: target_databases (List[Dict]) - Databases from the target environment.
# @PARAM: threshold (int) - Minimum confidence score (0-100).
# @RETURN: List[Dict] - Suggested mappings.
def suggest_mappings(source_databases: List[Dict], target_databases: List[Dict], threshold: int = 60) -> List[Dict]:
"""
Suggest mappings between source and target databases using fuzzy matching.
"""
suggestions = []
if not target_databases:
return suggestions
target_names = [db['database_name'] for db in target_databases]
for s_db in source_databases:
# Use token_sort_ratio as decided in research.md
match = process.extractOne(
s_db['database_name'],
target_names,
scorer=fuzz.token_sort_ratio
)
if match:
            _, score, index = match
if score >= threshold:
suggestions.append({
"source_db_uuid": s_db['uuid'],
"target_db_uuid": target_databases[index]['uuid'],
"confidence": score / 100.0
})
return suggestions
# [/DEF:suggest_mappings]
# [/DEF:backend.src.core.utils.matching]
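
A worked example (names and UUIDs made up): token_sort_ratio sorts tokens before comparing, so a word-order swap still scores 100 and maps with confidence 1.0:

# Illustrative input; UUIDs and names are made up.
source = [{"uuid": "s-1", "database_name": "analytics prod"}]
target = [
    {"uuid": "t-1", "database_name": "prod analytics"},
    {"uuid": "t-2", "database_name": "staging"},
]
print(suggest_mappings(source, target))
# -> [{'source_db_uuid': 's-1', 'target_db_uuid': 't-1', 'confidence': 1.0}]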