Files
ss-tools/backend/src/core/migration_engine.py
2025-12-27 10:16:41 +03:00

98 lines
4.3 KiB
Python

# [DEF:backend.src.core.migration_engine:Module]
#
# @SEMANTICS: migration, engine, zip, yaml, transformation
# @PURPOSE: Handles the interception and transformation of Superset asset ZIP archives.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> PyYAML
#
# @INVARIANT: ZIP structure must be preserved after transformation.
# [SECTION: IMPORTS]
import zipfile
import yaml
import os
import shutil
import tempfile
from pathlib import Path
from typing import Dict
from .logger import logger, belief_scope
import yaml
# [/SECTION]
# [DEF:MigrationEngine:Class]
# @PURPOSE: Engine for transforming Superset export ZIPs.
class MigrationEngine:
# [DEF:MigrationEngine.transform_zip:Function]
# @PURPOSE: Extracts ZIP, replaces database UUIDs in YAMLs, and re-packages.
# @PARAM: zip_path (str) - Path to the source ZIP file.
# @PARAM: output_path (str) - Path where the transformed ZIP will be saved.
# @PARAM: db_mapping (Dict[str, str]) - Mapping of source UUID to target UUID.
# @PARAM: strip_databases (bool) - Whether to remove the databases directory from the archive.
# @RETURN: bool - True if successful.
def transform_zip(self, zip_path: str, output_path: str, db_mapping: Dict[str, str], strip_databases: bool = True) -> bool:
"""
Transform a Superset export ZIP by replacing database UUIDs.
"""
with belief_scope("MigrationEngine.transform_zip"):
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
try:
# 1. Extract
logger.info(f"[MigrationEngine.transform_zip][Action] Extracting ZIP: {zip_path}")
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(temp_dir)
# 2. Transform YAMLs
# Datasets are usually in datasets/*.yaml
dataset_files = list(temp_dir.glob("**/datasets/**/*.yaml")) + list(temp_dir.glob("**/datasets/*.yaml"))
dataset_files = list(set(dataset_files))
logger.info(f"[MigrationEngine.transform_zip][State] Found {len(dataset_files)} dataset files.")
for ds_file in dataset_files:
logger.info(f"[MigrationEngine.transform_zip][Action] Transforming dataset: {ds_file}")
self._transform_yaml(ds_file, db_mapping)
# 3. Re-package
logger.info(f"[MigrationEngine.transform_zip][Action] Re-packaging ZIP to: {output_path} (strip_databases={strip_databases})")
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for root, dirs, files in os.walk(temp_dir):
rel_root = Path(root).relative_to(temp_dir)
if strip_databases and "databases" in rel_root.parts:
logger.info(f"[MigrationEngine.transform_zip][Action] Skipping file in databases directory: {rel_root}")
continue
for file in files:
file_path = Path(root) / file
arcname = file_path.relative_to(temp_dir)
zf.write(file_path, arcname)
return True
except Exception as e:
logger.error(f"[MigrationEngine.transform_zip][Coherence:Failed] Error transforming ZIP: {e}")
return False
# [DEF:MigrationEngine._transform_yaml:Function]
# @PURPOSE: Replaces database_uuid in a single YAML file.
def _transform_yaml(self, file_path: Path, db_mapping: Dict[str, str]):
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
if not data:
return
# Superset dataset YAML structure:
# database_uuid: ...
source_uuid = data.get('database_uuid')
if source_uuid in db_mapping:
data['database_uuid'] = db_mapping[source_uuid]
with open(file_path, 'w') as f:
yaml.dump(data, f)
# [/DEF:MigrationEngine._transform_yaml]
# [/DEF:MigrationEngine]
# [/DEF:backend.src.core.migration_engine]