Files
ss-tools/migration_script.py
2025-10-06 18:49:40 +03:00

278 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# <GRACE_MODULE id="migration_script" name="migration_script.py">
# @SEMANTICS: migration, cli, superset, ui, logging, error-recovery, batch-delete
# @PURPOSE: Предоставляет интерактивный CLI для миграции дашбордов Superset между окружениями с возможностью восстановления после ошибок.
# @DEPENDS_ON: superset_tool.client -> Для взаимодействия с API Superset.
# @DEPENDS_ON: superset_tool.utils -> Для инициализации клиентов, работы с файлами, UI и логирования.
# <IMPORTS>
import json
import logging
import sys
import zipfile
from pathlib import Path
from typing import List, Optional, Tuple, Dict
from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import setup_clients
from superset_tool.utils.fileio import create_temp_file, update_yamls, create_dashboard_export
from superset_tool.utils.whiptail_fallback import menu, checklist, yesno, msgbox, inputbox, gauge
from superset_tool.utils.logger import SupersetLogger
# </IMPORTS>
# --- Начало кода модуля ---
# <ANCHOR id="Migration" type="Class">
# @PURPOSE: Инкапсулирует логику интерактивной миграции дашбордов с возможностью «удалить‑и‑перезаписать» при ошибке импорта.
# @RELATION: CREATES_INSTANCE_OF -> SupersetLogger
# @RELATION: USES -> SupersetClient
class Migration:
"""
Интерактивный процесс миграции дашбордов.
"""
def __init__(self) -> None:
# <ANCHOR id="Migration.__init__" type="Function">
# @PURPOSE: Инициализирует сервис миграции, настраивает логгер и начальные состояния.
# @POST: `self.logger` готов к использованию; `enable_delete_on_failure` = `False`.
default_log_dir = Path.cwd() / "logs"
self.logger = SupersetLogger(
name="migration_script",
log_dir=default_log_dir,
level=logging.INFO,
console=True,
)
self.enable_delete_on_failure = False
self.from_c: Optional[SupersetClient] = None
self.to_c: Optional[SupersetClient] = None
self.dashboards_to_migrate: List[dict] = []
self.db_config_replacement: Optional[dict] = None
self._failed_imports: List[dict] = []
assert self.logger is not None, "Logger must be instantiated."
# </ANCHOR id="Migration.__init__">
# <ANCHOR id="Migration.run" type="Function">
# @PURPOSE: Точка входа последовательный запуск всех шагов миграции.
# @PRE: Логгер готов.
# @POST: Скрипт завершён, пользователю выведено сообщение.
# @RELATION: CALLS -> self.ask_delete_on_failure
# @RELATION: CALLS -> self.select_environments
# @RELATION: CALLS -> self.select_dashboards
# @RELATION: CALLS -> self.confirm_db_config_replacement
# @RELATION: CALLS -> self.execute_migration
def run(self) -> None:
self.logger.info("[run][Entry] Запуск скрипта миграции.")
self.ask_delete_on_failure()
self.select_environments()
self.select_dashboards()
self.confirm_db_config_replacement()
self.execute_migration()
self.logger.info("[run][Exit] Скрипт миграции завершён.")
# </ANCHOR id="Migration.run">
# <ANCHOR id="Migration.ask_delete_on_failure" type="Function">
# @PURPOSE: Запрашивает у пользователя, следует ли удалять дашборд при ошибке импорта.
# @POST: `self.enable_delete_on_failure` установлен.
# @RELATION: CALLS -> yesno
def ask_delete_on_failure(self) -> None:
self.enable_delete_on_failure = yesno(
"Поведение при ошибке импорта",
"Если импорт завершится ошибкой, удалить существующий дашборд и попытаться импортировать заново?",
)
self.logger.info(
"[ask_delete_on_failure][State] Delete-on-failure = %s",
self.enable_delete_on_failure,
)
# </ANCHOR id="Migration.ask_delete_on_failure">
# <ANCHOR id="Migration.select_environments" type="Function">
# @PURPOSE: Позволяет пользователю выбрать исходное и целевое окружения Superset.
# @PRE: `setup_clients` успешно инициализирует все клиенты.
# @POST: `self.from_c` и `self.to_c` установлены.
# @RELATION: CALLS -> setup_clients
# @RELATION: CALLS -> menu
def select_environments(self) -> None:
self.logger.info("[select_environments][Entry] Шаг 1/5: Выбор окружений.")
try:
all_clients = setup_clients(self.logger)
available_envs = list(all_clients.keys())
except Exception as e:
self.logger.error("[select_environments][Failure] %s", e, exc_info=True)
msgbox("Ошибка", "Не удалось инициализировать клиенты.")
return
rc, from_env_name = menu(
title="Выбор окружения",
prompt="Исходное окружение:",
choices=available_envs,
)
if rc != 0:
return
self.from_c = all_clients[from_env_name]
self.logger.info("[select_environments][State] from = %s", from_env_name)
available_envs.remove(from_env_name)
rc, to_env_name = menu(
title="Выбор окружения",
prompt="Целевое окружение:",
choices=available_envs,
)
if rc != 0:
return
self.to_c = all_clients[to_env_name]
self.logger.info("[select_environments][State] to = %s", to_env_name)
self.logger.info("[select_environments][Exit] Шаг 1 завершён.")
# </ANCHOR id="Migration.select_environments">
# <ANCHOR id="Migration.select_dashboards" type="Function">
# @PURPOSE: Позволяет пользователю выбрать набор дашбордов для миграции.
# @PRE: `self.from_c` инициализирован.
# @POST: `self.dashboards_to_migrate` заполнен.
# @RELATION: CALLS -> self.from_c.get_dashboards
# @RELATION: CALLS -> checklist
def select_dashboards(self) -> None:
self.logger.info("[select_dashboards][Entry] Шаг 2/5: Выбор дашбордов.")
try:
_, all_dashboards = self.from_c.get_dashboards()
if not all_dashboards:
self.logger.warning("[select_dashboards][State] No dashboards.")
msgbox("Информация", "В исходном окружении нет дашбордов.")
return
options = [("ALL", "Все дашборды")] + [
(str(d["id"]), d["dashboard_title"]) for d in all_dashboards
]
rc, selected = checklist(
title="Выбор дашбордов",
prompt="Отметьте нужные дашборды (введите номера):",
options=options,
)
if rc != 0:
return
if "ALL" in selected:
self.dashboards_to_migrate = list(all_dashboards)
else:
self.dashboards_to_migrate = [
d for d in all_dashboards if str(d["id"]) in selected
]
self.logger.info(
"[select_dashboards][State] Выбрано %d дашбордов.",
len(self.dashboards_to_migrate),
)
except Exception as e:
self.logger.error("[select_dashboards][Failure] %s", e, exc_info=True)
msgbox("Ошибка", "Не удалось получить список дашбордов.")
self.logger.info("[select_dashboards][Exit] Шаг 2 завершён.")
# </ANCHOR id="Migration.select_dashboards">
# <ANCHOR id="Migration.confirm_db_config_replacement" type="Function">
# @PURPOSE: Запрашивает у пользователя, требуется ли заменить имена БД в YAML-файлах.
# @POST: `self.db_config_replacement` либо `None`, либо заполнен.
# @RELATION: CALLS -> yesno
# @RELATION: CALLS -> inputbox
def confirm_db_config_replacement(self) -> None:
if yesno("Замена БД", "Заменить конфигурацию БД в YAMLфайлах?"):
rc, old_name = inputbox("Замена БД", "Старое имя БД (например, db_dev):")
if rc != 0: return
rc, new_name = inputbox("Замена БД", "Новое имя БД (например, db_prod):")
if rc != 0: return
self.db_config_replacement = { "old": {"database_name": old_name}, "new": {"database_name": new_name} }
self.logger.info("[confirm_db_config_replacement][State] Replacement set: %s", self.db_config_replacement)
else:
self.logger.info("[confirm_db_config_replacement][State] Skipped.")
# </ANCHOR id="Migration.confirm_db_config_replacement">
# <ANCHOR id="Migration._batch_delete_by_ids" type="Function">
# @PURPOSE: Удаляет набор дашбордов по их ID единым запросом.
# @PRE: `ids` непустой список целых чисел.
# @POST: Все указанные дашборды удалены (если они существовали).
# @PARAM: ids: List[int] - Список ID дашбордов для удаления.
# @RELATION: CALLS -> self.to_c.network.request
def _batch_delete_by_ids(self, ids: List[int]) -> None:
if not ids:
self.logger.debug("[_batch_delete_by_ids][Skip] Empty ID list nothing to delete.")
return
self.logger.info("[_batch_delete_by_ids][Entry] Deleting dashboards IDs: %s", ids)
q_param = json.dumps(ids)
response = self.to_c.network.request(method="DELETE", endpoint="/dashboard/", params={"q": q_param})
if isinstance(response, dict) and response.get("result", True) is False:
self.logger.warning("[_batch_delete_by_ids][Warning] Unexpected delete response: %s", response)
else:
self.logger.info("[_batch_delete_by_ids][Success] Delete request completed.")
# </ANCHOR id="Migration._batch_delete_by_ids">
# <ANCHOR id="Migration.execute_migration" type="Function">
# @PURPOSE: Выполняет экспорт-импорт дашбордов, обрабатывает ошибки и, при необходимости, выполняет процедуру восстановления.
# @PRE: `self.dashboards_to_migrate` не пуст; `self.from_c` и `self.to_c` инициализированы.
# @POST: Успешные дашборды импортированы; неудачные - восстановлены или залогированы.
# @RELATION: CALLS -> self.from_c.export_dashboard
# @RELATION: CALLS -> create_temp_file
# @RELATION: CALLS -> update_yamls
# @RELATION: CALLS -> create_dashboard_export
# @RELATION: CALLS -> self.to_c.import_dashboard
# @RELATION: CALLS -> self._batch_delete_by_ids
def execute_migration(self) -> None:
if not self.dashboards_to_migrate:
self.logger.warning("[execute_migration][Skip] No dashboards to migrate.")
msgbox("Информация", "Нет дашбордов для миграции.")
return
total = len(self.dashboards_to_migrate)
self.logger.info("[execute_migration][Entry] Starting migration of %d dashboards.", total)
self.to_c.delete_before_reimport = self.enable_delete_on_failure
with gauge("Миграция...", width=60, height=10) as g:
for i, dash in enumerate(self.dashboards_to_migrate):
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
g.set_text(f"Миграция: {title} ({i + 1}/{total})")
g.set_percent(int((i / total) * 100))
try:
exported_content, _ = self.from_c.export_dashboard(dash_id)
with create_temp_file(content=exported_content, suffix=".zip", logger=self.logger) as tmp_zip_path, \
create_temp_file(suffix=".dir", logger=self.logger) as tmp_unpack_dir:
with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref:
zip_ref.extractall(tmp_unpack_dir)
if self.db_config_replacement:
update_yamls(db_configs=[self.db_config_replacement], path=str(tmp_unpack_dir))
with create_temp_file(suffix=".zip", logger=self.logger) as tmp_new_zip:
create_dashboard_export(zip_path=tmp_new_zip, source_paths=[str(tmp_unpack_dir)])
self.to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
self.logger.info("[execute_migration][Success] Dashboard %s imported.", title)
except Exception as exc:
self.logger.error("[execute_migration][Failure] %s", exc, exc_info=True)
self._failed_imports.append({"slug": dash_slug, "dash_id": dash_id, "zip_content": exported_content})
msgbox("Ошибка", f"Не удалось мигрировать дашборд {title}.\n\n{exc}")
g.set_percent(100)
if self.enable_delete_on_failure and self._failed_imports:
self.logger.info("[execute_migration][Recovery] %d dashboards failed. Starting recovery.", len(self._failed_imports))
_, target_dashboards = self.to_c.get_dashboards()
slug_to_id = {d["slug"]: d["id"] for d in target_dashboards if "slug" in d and "id" in d}
ids_to_delete = [slug_to_id[f["slug"]] for f in self._failed_imports if f["slug"] in slug_to_id]
self._batch_delete_by_ids(ids_to_delete)
for fail in self._failed_imports:
with create_temp_file(content=fail["zip_content"], suffix=".zip", logger=self.logger) as retry_zip:
self.to_c.import_dashboard(file_name=retry_zip, dash_id=fail["dash_id"], dash_slug=fail["slug"])
self.logger.info("[execute_migration][Recovered] Dashboard slug '%s' re-imported.", fail["slug"])
self.logger.info("[execute_migration][Exit] Migration finished.")
msgbox("Информация", "Миграция завершена!")
# </ANCHOR id="Migration.execute_migration">
# </ANCHOR id="Migration">
# --- Конец кода модуля ---
if __name__ == "__main__":
Migration().run()
# </GRACE_MODULE id="migration_script">