# [DEF:migration_script:Module] # # @SEMANTICS: migration, cli, superset, ui, logging, error-recovery, batch-delete # @PURPOSE: Предоставляет интерактивный CLI для миграции дашбордов Superset между окружениями с возможностью восстановления после ошибок. # @LAYER: App # @RELATION: DEPENDS_ON -> superset_tool.client # @RELATION: DEPENDS_ON -> superset_tool.utils # @PUBLIC_API: Migration # [SECTION: IMPORTS] import json import logging import sys import zipfile import re from pathlib import Path from typing import List, Optional, Tuple, Dict from superset_tool.client import SupersetClient from superset_tool.utils.init_clients import setup_clients from superset_tool.utils.fileio import create_temp_file, update_yamls, create_dashboard_export from superset_tool.utils.whiptail_fallback import menu, checklist, yesno, msgbox, inputbox, gauge from superset_tool.utils.logger import SupersetLogger # [/SECTION] # [DEF:Migration:Class] # @PURPOSE: Инкапсулирует логику интерактивной миграции дашбордов с возможностью «удалить‑и‑перезаписать» при ошибке импорта. # @RELATION: CREATES_INSTANCE_OF -> SupersetLogger # @RELATION: USES -> SupersetClient class Migration: """ Интерактивный процесс миграции дашбордов. """ # [DEF:Migration.__init__:Function] # @PURPOSE: Инициализирует сервис миграции, настраивает логгер и начальные состояния. # @POST: `self.logger` готов к использованию; `enable_delete_on_failure` = `False`. def __init__(self) -> None: default_log_dir = Path.cwd() / "logs" self.logger = SupersetLogger( name="migration_script", log_dir=default_log_dir, level=logging.INFO, console=True, ) self.enable_delete_on_failure = False self.from_c: Optional[SupersetClient] = None self.to_c: Optional[SupersetClient] = None self.dashboards_to_migrate: List[dict] = [] self.db_config_replacement: Optional[dict] = None self._failed_imports: List[dict] = [] # [/DEF:Migration.__init__:Function] # [DEF:Migration.run:Function] # @PURPOSE: Точка входа – последовательный запуск всех шагов миграции. # @PRE: Логгер готов. # @POST: Скрипт завершён, пользователю выведено сообщение. # @RELATION: CALLS -> self.ask_delete_on_failure # @RELATION: CALLS -> self.select_environments # @RELATION: CALLS -> self.select_dashboards # @RELATION: CALLS -> self.confirm_db_config_replacement # @RELATION: CALLS -> self.execute_migration def run(self) -> None: self.logger.info("[run][Entry] Запуск скрипта миграции.") self.ask_delete_on_failure() self.select_environments() self.select_dashboards() self.confirm_db_config_replacement() self.execute_migration() self.logger.info("[run][Exit] Скрипт миграции завершён.") # [/DEF:Migration.run:Function] # [DEF:Migration.ask_delete_on_failure:Function] # @PURPOSE: Запрашивает у пользователя, следует ли удалять дашборд при ошибке импорта. # @POST: `self.enable_delete_on_failure` установлен. # @RELATION: CALLS -> yesno def ask_delete_on_failure(self) -> None: self.enable_delete_on_failure = yesno( "Поведение при ошибке импорта", "Если импорт завершится ошибкой, удалить существующий дашборд и попытаться импортировать заново?", ) self.logger.info( "[ask_delete_on_failure][State] Delete-on-failure = %s", self.enable_delete_on_failure, ) # [/DEF:Migration.ask_delete_on_failure:Function] # [DEF:Migration.select_environments:Function] # @PURPOSE: Позволяет пользователю выбрать исходное и целевое окружения Superset. # @PRE: `setup_clients` успешно инициализирует все клиенты. # @POST: `self.from_c` и `self.to_c` установлены. # @RELATION: CALLS -> setup_clients # @RELATION: CALLS -> menu def select_environments(self) -> None: self.logger.info("[select_environments][Entry] Шаг 1/5: Выбор окружений.") try: all_clients = setup_clients(self.logger) available_envs = list(all_clients.keys()) except Exception as e: self.logger.error("[select_environments][Failure] %s", e, exc_info=True) msgbox("Ошибка", "Не удалось инициализировать клиенты.") return rc, from_env_name = menu( title="Выбор окружения", prompt="Исходное окружение:", choices=available_envs, ) if rc != 0 or from_env_name is None: self.logger.info("[select_environments][State] Source environment selection cancelled.") return self.from_c = all_clients[from_env_name] self.logger.info("[select_environments][State] from = %s", from_env_name) available_envs.remove(from_env_name) rc, to_env_name = menu( title="Выбор окружения", prompt="Целевое окружение:", choices=available_envs, ) if rc != 0 or to_env_name is None: self.logger.info("[select_environments][State] Target environment selection cancelled.") return self.to_c = all_clients[to_env_name] self.logger.info("[select_environments][State] to = %s", to_env_name) self.logger.info("[select_environments][Exit] Шаг 1 завершён.") # [/DEF:Migration.select_environments:Function] # [DEF:Migration.select_dashboards:Function] # @PURPOSE: Позволяет пользователю выбрать набор дашбордов для миграции. # @PRE: `self.from_c` инициализирован. # @POST: `self.dashboards_to_migrate` заполнен. # @RELATION: CALLS -> self.from_c.get_dashboards # @RELATION: CALLS -> checklist def select_dashboards(self) -> None: self.logger.info("[select_dashboards][Entry] Шаг 2/5: Выбор дашбордов.") if self.from_c is None: self.logger.error("[select_dashboards][Failure] Source client not initialized.") msgbox("Ошибка", "Исходное окружение не выбрано.") return try: _, all_dashboards = self.from_c.get_dashboards() if not all_dashboards: self.logger.warning("[select_dashboards][State] No dashboards.") msgbox("Информация", "В исходном окружении нет дашбордов.") return rc, regex = inputbox("Поиск", "Введите регулярное выражение для поиска дашбордов:") if rc != 0: return # Ensure regex is a string and perform case‑insensitive search regex_str = str(regex) filtered_dashboards = [ d for d in all_dashboards if re.search(regex_str, d["dashboard_title"], re.IGNORECASE) ] options = [("ALL", "Все дашборды")] + [ (str(d["id"]), d["dashboard_title"]) for d in filtered_dashboards ] rc, selected = checklist( title="Выбор дашбордов", prompt="Отметьте нужные дашборды (введите номера):", options=options, ) if rc != 0: return if "ALL" in selected: self.dashboards_to_migrate = filtered_dashboards else: self.dashboards_to_migrate = [ d for d in filtered_dashboards if str(d["id"]) in selected ] self.logger.info( "[select_dashboards][State] Выбрано %d дашбордов.", len(self.dashboards_to_migrate), ) except Exception as e: self.logger.error("[select_dashboards][Failure] %s", e, exc_info=True) msgbox("Ошибка", "Не удалось получить список дашбордов.") self.logger.info("[select_dashboards][Exit] Шаг 2 завершён.") # [/DEF:Migration.select_dashboards:Function] # [DEF:Migration.confirm_db_config_replacement:Function] # @PURPOSE: Запрашивает у пользователя, требуется ли заменить имена БД в YAML-файлах. # @POST: `self.db_config_replacement` либо `None`, либо заполнен. # @RELATION: CALLS -> yesno # @RELATION: CALLS -> self._select_databases def confirm_db_config_replacement(self) -> None: if yesno("Замена БД", "Заменить конфигурацию БД в YAML‑файлах?"): old_db, new_db = self._select_databases() if not old_db or not new_db: self.logger.info("[confirm_db_config_replacement][State] Selection cancelled.") return print(f"old_db: {old_db}") old_result = old_db.get("result", {}) new_result = new_db.get("result", {}) self.db_config_replacement = { "old": { "database_name": old_result.get("database_name"), "uuid": old_result.get("uuid"), "database_uuid": old_result.get("uuid"), "id": str(old_db.get("id")) }, "new": { "database_name": new_result.get("database_name"), "uuid": new_result.get("uuid"), "database_uuid": new_result.get("uuid"), "id": str(new_db.get("id")) } } self.logger.info("[confirm_db_config_replacement][State] Replacement set: %s", self.db_config_replacement) else: self.logger.info("[confirm_db_config_replacement][State] Skipped.") # [/DEF:Migration.confirm_db_config_replacement:Function] # [DEF:Migration._select_databases:Function] # @PURPOSE: Позволяет пользователю выбрать исходную и целевую БД через API. # @POST: Возвращает кортеж (старая БД, новая БД) или (None, None) при отмене. # @RELATION: CALLS -> self.from_c.get_databases # @RELATION: CALLS -> self.to_c.get_databases # @RELATION: CALLS -> self.from_c.get_database # @RELATION: CALLS -> self.to_c.get_database # @RELATION: CALLS -> menu def _select_databases(self) -> Tuple[Optional[Dict], Optional[Dict]]: self.logger.info("[_select_databases][Entry] Selecting databases from both environments.") if self.from_c is None or self.to_c is None: self.logger.error("[_select_databases][Failure] Source or target client not initialized.") msgbox("Ошибка", "Исходное или целевое окружение не выбрано.") return None, None # Получаем список БД из обоих окружений try: _, from_dbs = self.from_c.get_databases() _, to_dbs = self.to_c.get_databases() except Exception as e: self.logger.error("[_select_databases][Failure] Failed to fetch databases: %s", e) msgbox("Ошибка", "Не удалось получить список баз данных.") return None, None # Формируем список для выбора # По Swagger документации, в ответе API поле называется "database_name" from_choices = [] for db in from_dbs: db_name = db.get("database_name", "Без имени") from_choices.append((str(db["id"]), f"{db_name} (ID: {db['id']})")) to_choices = [] for db in to_dbs: db_name = db.get("database_name", "Без имени") to_choices.append((str(db["id"]), f"{db_name} (ID: {db['id']})")) # Показываем список БД для исходного окружения rc, from_sel = menu( title="Выбор исходной БД", prompt="Выберите исходную БД:", choices=[f"{name}" for id, name in from_choices] ) if rc != 0: return None, None # Определяем выбранную БД from_db_id = from_choices[[choice[1] for choice in from_choices].index(from_sel)][0] # Получаем полную информацию о выбранной БД из исходного окружения try: from_db = self.from_c.get_database(int(from_db_id)) except Exception as e: self.logger.error("[_select_databases][Failure] Failed to fetch database details: %s", e) msgbox("Ошибка", "Не удалось получить информацию о выбранной базе данных.") return None, None # Показываем список БД для целевого окружения rc, to_sel = menu( title="Выбор целевой БД", prompt="Выберите целевую БД:", choices=[f"{name}" for id, name in to_choices] ) if rc != 0: return None, None # Определяем выбранную БД to_db_id = to_choices[[choice[1] for choice in to_choices].index(to_sel)][0] # Получаем полную информацию о выбранной БД из целевого окружения try: to_db = self.to_c.get_database(int(to_db_id)) except Exception as e: self.logger.error("[_select_databases][Failure] Failed to fetch database details: %s", e) msgbox("Ошибка", "Не удалось получить информацию о выбранной базе данных.") return None, None self.logger.info("[_select_databases][Exit] Selected databases: %s -> %s", from_db.get("database_name", "Без имени"), to_db.get("database_name", "Без имени")) return from_db, to_db # [/DEF:Migration._select_databases:Function] # [DEF:Migration._batch_delete_by_ids:Function] # @PURPOSE: Удаляет набор дашбордов по их ID единым запросом. # @PRE: `ids` – непустой список целых чисел. # @POST: Все указанные дашборды удалены (если они существовали). # @RELATION: CALLS -> self.to_c.network.request # @PARAM: ids (List[int]) - Список ID дашбордов для удаления. def _batch_delete_by_ids(self, ids: List[int]) -> None: if not ids: self.logger.debug("[_batch_delete_by_ids][Skip] Empty ID list – nothing to delete.") return if self.to_c is None: self.logger.error("[_batch_delete_by_ids][Failure] Target client not initialized.") msgbox("Ошибка", "Целевое окружение не выбрано.") return self.logger.info("[_batch_delete_by_ids][Entry] Deleting dashboards IDs: %s", ids) q_param = json.dumps(ids) response = self.to_c.network.request(method="DELETE", endpoint="/dashboard/", params={"q": q_param}) if isinstance(response, dict) and response.get("result", True) is False: self.logger.warning("[_batch_delete_by_ids][Warning] Unexpected delete response: %s", response) else: self.logger.info("[_batch_delete_by_ids][Success] Delete request completed.") # [/DEF:Migration._batch_delete_by_ids:Function] # [DEF:Migration.execute_migration:Function] # @PURPOSE: Выполняет экспорт-импорт дашбордов, обрабатывает ошибки и, при необходимости, выполняет процедуру восстановления. # @PRE: `self.dashboards_to_migrate` не пуст; `self.from_c` и `self.to_c` инициализированы. # @POST: Успешные дашборды импортированы; неудачные - восстановлены или залогированы. # @RELATION: CALLS -> self.from_c.export_dashboard # @RELATION: CALLS -> create_temp_file # @RELATION: CALLS -> update_yamls # @RELATION: CALLS -> create_dashboard_export # @RELATION: CALLS -> self.to_c.import_dashboard # @RELATION: CALLS -> self._batch_delete_by_ids def execute_migration(self) -> None: if not self.dashboards_to_migrate: self.logger.warning("[execute_migration][Skip] No dashboards to migrate.") msgbox("Информация", "Нет дашбордов для миграции.") return if self.from_c is None or self.to_c is None: self.logger.error("[execute_migration][Failure] Source or target client not initialized.") msgbox("Ошибка", "Исходное или целевое окружение не выбрано.") return total = len(self.dashboards_to_migrate) self.logger.info("[execute_migration][Entry] Starting migration of %d dashboards.", total) self.to_c.delete_before_reimport = self.enable_delete_on_failure with gauge("Миграция...", width=60, height=10) as g: for i, dash in enumerate(self.dashboards_to_migrate): dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"] g.set_text(f"Миграция: {title} ({i + 1}/{total})") g.set_percent(int((i / total) * 100)) exported_content = None # Initialize exported_content try: exported_content, _ = self.from_c.export_dashboard(dash_id) with create_temp_file(content=exported_content, dry_run=True, suffix=".zip", logger=self.logger) as tmp_zip_path, \ create_temp_file(suffix=".dir", logger=self.logger) as tmp_unpack_dir: if not self.db_config_replacement: self.to_c.import_dashboard(file_name=tmp_zip_path, dash_id=dash_id, dash_slug=dash_slug) else: with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref: zip_ref.extractall(tmp_unpack_dir) if self.db_config_replacement: update_yamls(db_configs=[self.db_config_replacement], path=str(tmp_unpack_dir)) with create_temp_file(suffix=".zip", dry_run=True, logger=self.logger) as tmp_new_zip: create_dashboard_export(zip_path=tmp_new_zip, source_paths=[str(p) for p in Path(tmp_unpack_dir).glob("**/*")]) self.to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug) self.logger.info("[execute_migration][Success] Dashboard %s imported.", title) except Exception as exc: self.logger.error("[execute_migration][Failure] %s", exc, exc_info=True) self._failed_imports.append({"slug": dash_slug, "dash_id": dash_id, "zip_content": exported_content}) msgbox("Ошибка", f"Не удалось мигрировать дашборд {title}.\n\n{exc}") g.set_percent(100) if self.enable_delete_on_failure and self._failed_imports: self.logger.info("[execute_migration][Recovery] %d dashboards failed. Starting recovery.", len(self._failed_imports)) _, target_dashboards = self.to_c.get_dashboards() slug_to_id = {d["slug"]: d["id"] for d in target_dashboards if "slug" in d and "id" in d} ids_to_delete = [slug_to_id[f["slug"]] for f in self._failed_imports if f["slug"] in slug_to_id] self._batch_delete_by_ids(ids_to_delete) for fail in self._failed_imports: with create_temp_file(content=fail["zip_content"], suffix=".zip", logger=self.logger) as retry_zip: self.to_c.import_dashboard(file_name=retry_zip, dash_id=fail["dash_id"], dash_slug=fail["slug"]) self.logger.info("[execute_migration][Recovered] Dashboard slug '%s' re-imported.", fail["slug"]) self.logger.info("[execute_migration][Exit] Migration finished.") msgbox("Информация", "Миграция завершена!") # [/DEF:Migration.execute_migration:Function] # [/DEF:Migration:Class] if __name__ == "__main__": Migration().run() # [/DEF:migration_script:Module]