diff --git a/backup_script.py b/backup_script.py index a8cb731..942a206 100644 --- a/backup_script.py +++ b/backup_script.py @@ -8,7 +8,7 @@ import logging import sys from pathlib import Path -from dataclasses import dataclass +from dataclasses import dataclass,field # [IMPORTS] Third-party from requests.exceptions import RequestException @@ -37,17 +37,18 @@ class BackupConfig: consolidate: bool = True rotate_archive: bool = True clean_folders: bool = True - retention_policy: RetentionPolicy = RetentionPolicy() + retention_policy: RetentionPolicy = field(default_factory=RetentionPolicy) # [ENTITY: Function('backup_dashboards')] # CONTRACT: -# PURPOSE: Выполняет бэкап всех доступных дашбордов для заданного клиента и окружения. +# PURPOSE: Выполняет бэкап всех доступных дашбордов для заданного клиента и окружения, пропуская ошибки экспорта. # PRECONDITIONS: # - `client` должен быть инициализированным экземпляром `SupersetClient`. # - `env_name` должен быть строкой, обозначающей окружение. # - `backup_root` должен быть валидным путем к корневой директории бэкапа. # POSTCONDITIONS: # - Дашборды экспортируются и сохраняются. +# - Ошибки экспорта логируются и не приводят к остановке скрипта. # - Возвращает `True` если все дашборды были экспортированы без критических ошибок, `False` иначе. def backup_dashboards( client: SupersetClient, @@ -90,7 +91,9 @@ def backup_dashboards( success_count += 1 except (SupersetAPIError, RequestException, IOError, OSError) as db_error: - logger.error(f"[STATE][backup_dashboards][FAILURE] Failed to export dashboard {dashboard_title}: {db_error}", exc_info=True) + logger.error(f"[STATE][backup_dashboards][FAILURE] Failed to export dashboard {dashboard_title} (ID: {dashboard_id}): {db_error}", exc_info=True) + # Продолжаем обработку других дашбордов + continue if config.consolidate: consolidate_archive_folders(backup_root / env_name , logger=logger) @@ -125,13 +128,18 @@ def main() -> int: backup_config = BackupConfig(rotate_archive=True) for env in environments: - results[env] = backup_dashboards( - clients[env], - env.upper(), - superset_backup_repo, - logger=logger, - config=backup_config - ) + try: + results[env] = backup_dashboards( + clients[env], + env.upper(), + superset_backup_repo, + logger=logger, + config=backup_config + ) + except Exception as env_error: + logger.critical(f"[STATE][main][FAILURE] Critical error for environment {env}: {env_error}", exc_info=True) + # Продолжаем обработку других окружений + results[env] = False if not all(results.values()): exit_code = 1 diff --git a/migration_script.py b/migration_script.py index 6ad8120..62059ff 100644 --- a/migration_script.py +++ b/migration_script.py @@ -1,237 +1,442 @@ -# -*- coding: utf-8 -*- -# CONTRACT: -# PURPOSE: Интерактивный скрипт для миграции ассетов Superset между различными окружениями. -# SPECIFICATION_LINK: mod_migration_script -# PRECONDITIONS: Наличие корректных конфигурационных файлов для подключения к Superset. -# POSTCONDITIONS: Выбранные ассеты успешно перенесены из исходного в целевое окружение. -# IMPORTS: [argparse, superset_tool.client, superset_tool.utils.init_clients, superset_tool.utils.logger, superset_tool.utils.fileio] -""" -[MODULE] Superset Migration Tool -@description: Интерактивный скрипт для миграции ассетов Superset между различными окружениями. -""" - -from whiptail import Whiptail +# [MODULE_PATH] superset_tool.migration_script +# [FILE] migration_script.py +# [SEMANTICS] migration, cli, superset, ui, logging, fallback, error-recovery, non-interactive, temp-files, batch-delete +# -------------------------------------------------------------- # [IMPORTS] +# -------------------------------------------------------------- +import json +import logging +import sys +import zipfile +from pathlib import Path +from typing import List, Optional, Tuple, Dict + from superset_tool.client import SupersetClient from superset_tool.utils.init_clients import setup_clients -from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.fileio import ( - save_and_unpack_dashboard, - read_dashboard_from_disk, + create_temp_file, # новый контекстный менеджер update_yamls, - create_dashboard_export + create_dashboard_export, +) +from superset_tool.utils.whiptail_fallback import ( + menu, + checklist, + yesno, + msgbox, + inputbox, + gauge, ) -# [ENTITY: Class('Migration')] -# CONTRACT: -# PURPOSE: Инкапсулирует логику и состояние процесса миграции. -# SPECIFICATION_LINK: class_migration -# ATTRIBUTES: -# - name: logger, type: SupersetLogger, description: Экземпляр логгера. -# - name: from_c, type: SupersetClient, description: Клиент для исходного окружения. -# - name: to_c, type: SupersetClient, description: Клиент для целевого окружения. -# - name: dashboards_to_migrate, type: list, description: Список дашбордов для миграции. -# - name: db_config_replacement, type: dict, description: Конфигурация для замены данных БД. +from superset_tool.utils.logger import SupersetLogger # type: ignore +# [END_IMPORTS] + +# -------------------------------------------------------------- +# [ENTITY: Service('Migration')] +# [RELATION: Service('Migration')] -> [DEPENDS_ON] -> [PythonModule('superset_tool.client')] +# -------------------------------------------------------------- +""" +:purpose: Интерактивный процесс миграции дашбордов с возможностью + «удалить‑и‑перезаписать» при ошибке импорта. +:preconditions: + - Конфигурация Superset‑клиентов доступна, + - Пользователь может взаимодействовать через консольный UI. +:postconditions: + - Выбранные дашборды импортированы в целевое окружение. +:sideeffect: Записывает журнал в каталог ``logs/`` текущего рабочего каталога. +""" + class Migration: """ - Класс для управления процессом миграции дашбордов Superset. + :ivar SupersetLogger logger: Логгер. + :ivar bool enable_delete_on_failure: Флаг «удалять‑при‑ошибке». + :ivar SupersetClient from_c: Клиент‑источник. + :ivar SupersetClient to_c: Клиент‑назначение. + :ivar List[dict] dashboards_to_migrate: Список выбранных дашбордов. + :ivar Optional[dict] db_config_replacement: Параметры замены имён БД. + :ivar List[dict] _failed_imports: Внутренний буфер неудавшихся импортов + (ключи: slug, zip_content, dash_id). """ - def __init__(self): - self.logger = SupersetLogger(name="migration_script") - self.from_c: SupersetClient = None - self.to_c: SupersetClient = None - self.dashboards_to_migrate = [] - self.db_config_replacement = None - # END_FUNCTION___init__ - # [ENTITY: Function('run')] - # CONTRACT: - # PURPOSE: Запускает основной воркфлоу миграции, координируя все шаги. - # SPECIFICATION_LINK: func_run_migration - # PRECONDITIONS: None - # POSTCONDITIONS: Процесс миграции завершен. - def run(self): - """Запускает основной воркфлоу миграции.""" + # -------------------------------------------------------------- + # [ENTITY: Method('__init__')] + # -------------------------------------------------------------- + """ + :purpose: Создать сервис миграции и настроить логгер. + :preconditions: None. + :postconditions: ``self.logger`` готов к использованию; ``enable_delete_on_failure`` = ``False``. + """ + def __init__(self) -> None: + default_log_dir = Path.cwd() / "logs" + self.logger = SupersetLogger( + name="migration_script", + log_dir=default_log_dir, + level=logging.INFO, + console=True, + ) + self.enable_delete_on_failure = False + self.from_c: Optional[SupersetClient] = None + self.to_c: Optional[SupersetClient] = None + self.dashboards_to_migrate: List[dict] = [] + self.db_config_replacement: Optional[dict] = None + self._failed_imports: List[dict] = [] # <-- буфер ошибок + assert self.logger is not None, "Logger must be instantiated." + # [END_ENTITY] + + # -------------------------------------------------------------- + # [ENTITY: Method('run')] + # -------------------------------------------------------------- + """ + :purpose: Точка входа – последовательный запуск всех шагов миграции. + :preconditions: Логгер готов. + :postconditions: Скрипт завершён, пользователю выведено сообщение. + """ + def run(self) -> None: self.logger.info("[INFO][run][ENTER] Запуск скрипта миграции.") + self.ask_delete_on_failure() self.select_environments() self.select_dashboards() self.confirm_db_config_replacement() self.execute_migration() - self.logger.info("[INFO][run][EXIT] Скрипт миграции завершен.") - # END_FUNCTION_run + self.logger.info("[INFO][run][EXIT] Скрипт миграции завершён.") + # [END_ENTITY] - # [ENTITY: Function('select_environments')] - # CONTRACT: - # PURPOSE: Шаг 1. Обеспечивает интерактивный выбор исходного и целевого окружений. - # SPECIFICATION_LINK: func_select_environments - # PRECONDITIONS: None - # POSTCONDITIONS: Атрибуты `self.from_c` и `self.to_c` инициализированы валидными клиентами Superset. - def select_environments(self): - """Шаг 1: Выбор окружений (источник и назначение).""" - self.logger.info("[INFO][select_environments][ENTER] Шаг 1/4: Выбор окружений.") - + # -------------------------------------------------------------- + # [ENTITY: Method('ask_delete_on_failure')] + # -------------------------------------------------------------- + """ + :purpose: Запросить у пользователя, следует ли удалять дашборд при ошибке импорта. + :preconditions: None. + :postconditions: ``self.enable_delete_on_failure`` установлен. + """ + def ask_delete_on_failure(self) -> None: + self.enable_delete_on_failure = yesno( + "Поведение при ошибке импорта", + "Если импорт завершится ошибкой, удалить существующий дашборд и попытаться импортировать заново?", + ) + self.logger.info( + "[INFO][ask_delete_on_failure] Delete‑on‑failure = %s", + self.enable_delete_on_failure, + ) + # [END_ENTITY] + + # -------------------------------------------------------------- + # [ENTITY: Method('select_environments')] + # -------------------------------------------------------------- + """ + :purpose: Выбрать исходное и целевое окружения Superset. + :preconditions: ``setup_clients`` успешно инициализирует все клиенты. + :postconditions: ``self.from_c`` и ``self.to_c`` установлены. + """ + def select_environments(self) -> None: + self.logger.info("[INFO][select_environments][ENTER] Шаг 1/5: Выбор окружений.") try: all_clients = setup_clients(self.logger) available_envs = list(all_clients.keys()) except Exception as e: - self.logger.error(f"[ERROR][select_environments][FAILURE] Ошибка при инициализации клиентов: {e}", exc_info=True) - w = Whiptail(title="Ошибка", backtitle="Superset Migration Tool") - w.msgbox("Не удалось инициализировать клиенты. Проверьте конфигурацию.") + self.logger.error("[ERROR][select_environments] %s", e, exc_info=True) + msgbox("Ошибка", "Не удалось инициализировать клиенты.") return - w = Whiptail(title="Выбор окружения", backtitle="Superset Migration Tool") - - # Select source environment - (return_code, from_env_name) = w.menu("Выберите исходное окружение:", available_envs) - if return_code == 0: - self.from_c = all_clients[from_env_name] - self.logger.info(f"[INFO][select_environments][STATE] Исходное окружение: {from_env_name}") - else: + rc, from_env_name = menu( + title="Выбор окружения", + prompt="Исходное окружение:", + choices=available_envs, + ) + if rc != 0: return + self.from_c = all_clients[from_env_name] + self.logger.info("[INFO][select_environments] from = %s", from_env_name) - # Select target environment available_envs.remove(from_env_name) - (return_code, to_env_name) = w.menu("Выберите целевое окружение:", available_envs) - if return_code == 0: - self.to_c = all_clients[to_env_name] - self.logger.info(f"[INFO][select_environments][STATE] Целевое окружение: {to_env_name}") - else: + rc, to_env_name = menu( + title="Выбор окружения", + prompt="Целевое окружение:", + choices=available_envs, + ) + if rc != 0: return - - self.logger.info("[INFO][select_environments][EXIT] Шаг 1 завершен.") - # END_FUNCTION_select_environments - - # [ENTITY: Function('select_dashboards')] - # CONTRACT: - # PURPOSE: Шаг 2. Обеспечивает интерактивный выбор дашбордов для миграции. - # SPECIFICATION_LINK: func_select_dashboards - # PRECONDITIONS: `self.from_c` должен быть инициализирован. - # POSTCONDITIONS: `self.dashboards_to_migrate` содержит список выбранных дашбордов. - def select_dashboards(self): - """Шаг 2: Выбор дашбордов для миграции.""" - self.logger.info("[INFO][select_dashboards][ENTER] Шаг 2/4: Выбор дашбордов.") + self.to_c = all_clients[to_env_name] + self.logger.info("[INFO][select_environments] to = %s", to_env_name) + self.logger.info("[INFO][select_environments][EXIT] Шаг 1 завершён.") + # [END_ENTITY] + # -------------------------------------------------------------- + # [ENTITY: Method('select_dashboards')] + # -------------------------------------------------------------- + """ + :purpose: Позволить пользователю выбрать набор дашбордов для миграции. + :preconditions: ``self.from_c`` инициализирован. + :postconditions: ``self.dashboards_to_migrate`` заполнен. + """ + def select_dashboards(self) -> None: + self.logger.info("[INFO][select_dashboards][ENTER] Шаг 2/5: Выбор дашбордов.") try: - _, all_dashboards = self.from_c.get_dashboards() + _, all_dashboards = self.from_c.get_dashboards() # type: ignore[attr-defined] if not all_dashboards: - self.logger.warning("[WARN][select_dashboards][STATE] В исходном окружении не найдено дашбордов.") - w = Whiptail(title="Информация", backtitle="Superset Migration Tool") - w.msgbox("В исходном окружении не найдено дашбордов.") + self.logger.warning("[WARN][select_dashboards] No dashboards.") + msgbox("Информация", "В исходном окружении нет дашбордов.") return - w = Whiptail(title="Выбор дашбордов", backtitle="Superset Migration Tool") - - dashboard_options = [(str(d['id']), d['dashboard_title']) for d in all_dashboards] - - (return_code, selected_ids) = w.checklist("Выберите дашборды для миграции:", dashboard_options) + options = [("ALL", "Все дашборды")] + [ + (str(d["id"]), d["dashboard_title"]) for d in all_dashboards + ] - if return_code == 0: - self.dashboards_to_migrate = [d for d in all_dashboards if str(d['id']) in selected_ids] - self.logger.info(f"[INFO][select_dashboards][STATE] Выбрано дашбордов: {len(self.dashboards_to_migrate)}") + rc, selected = checklist( + title="Выбор дашбордов", + prompt="Отметьте нужные дашборды (введите номера):", + options=options, + ) + if rc != 0: + return + if "ALL" in selected: + self.dashboards_to_migrate = list(all_dashboards) + self.logger.info( + "[INFO][select_dashboards] Выбраны все дашборды (%d).", + len(self.dashboards_to_migrate), + ) + return + + self.dashboards_to_migrate = [ + d for d in all_dashboards if str(d["id"]) in selected + ] + self.logger.info( + "[INFO][select_dashboards] Выбрано %d дашбордов.", + len(self.dashboards_to_migrate), + ) except Exception as e: - self.logger.error(f"[ERROR][select_dashboards][FAILURE] Ошибка при получении или выборе дашбордов: {e}", exc_info=True) - w = Whiptail(title="Ошибка", backtitle="Superset Migration Tool") - w.msgbox("Произошла ошибка при работе с дашбордами.") - - self.logger.info("[INFO][select_dashboards][EXIT] Шаг 2 завершен.") - # END_FUNCTION_select_dashboards + self.logger.error("[ERROR][select_dashboards] %s", e, exc_info=True) + msgbox("Ошибка", "Не удалось получить список дашбордов.") + self.logger.info("[INFO][select_dashboards][EXIT] Шаг 2 завершён.") + # [END_ENTITY] - # [ENTITY: Function('confirm_db_config_replacement')] - # CONTRACT: - # PURPOSE: Шаг 3. Управляет процессом подтверждения и настройки замены конфигураций БД. - # SPECIFICATION_LINK: func_confirm_db_config_replacement - # PRECONDITIONS: `self.from_c` и `self.to_c` инициализированы. - # POSTCONDITIONS: `self.db_config_replacement` содержит конфигурацию для замены или `None`. - def confirm_db_config_replacement(self): - """Шаг 3: Подтверждение и настройка замены конфигурации БД.""" - self.logger.info("[INFO][confirm_db_config_replacement][ENTER] Шаг 3/4: Замена конфигурации БД.") - - w = Whiptail(title="Замена конфигурации БД", backtitle="Superset Migration Tool") - if w.yesno("Хотите ли вы заменить конфигурации баз данных в YAML-файлах?"): - (return_code, old_db_name) = w.inputbox("Введите имя заменяемой базы данных (например, db_dev):") - if return_code != 0: + # -------------------------------------------------------------- + # [ENTITY: Method('confirm_db_config_replacement')] + # -------------------------------------------------------------- + """ + :purpose: Запросить у пользователя, требуется ли заменить имена БД в YAML‑файлах. + :preconditions: None. + :postconditions: ``self.db_config_replacement`` либо ``None``, либо заполнен. + """ + def confirm_db_config_replacement(self) -> None: + if yesno("Замена БД", "Заменить конфигурацию БД в YAML‑файлах?"): + rc, old_name = inputbox("Замена БД", "Старое имя БД (например, db_dev):") + if rc != 0: return - - (return_code, new_db_name) = w.inputbox("Введите новое имя базы данных (например, db_prod):") - if return_code != 0: + rc, new_name = inputbox("Замена БД", "Новое имя БД (например, db_prod):") + if rc != 0: return - - self.db_config_replacement = {"old": {"database_name": old_db_name}, "new": {"database_name": new_db_name}} - self.logger.info(f"[INFO][confirm_db_config_replacement][STATE] Установлена ручная замена: {self.db_config_replacement}") + self.db_config_replacement = { + "old": {"database_name": old_name}, + "new": {"database_name": new_name}, + } + self.logger.info( + "[INFO][confirm_db_config_replacement] Replacement set: %s", + self.db_config_replacement, + ) else: - self.logger.info("[INFO][confirm_db_config_replacement][STATE] Замена конфигурации БД пропущена.") + self.logger.info("[INFO][confirm_db_config_replacement] Skipped.") + # [END_ENTITY] - self.logger.info("[INFO][confirm_db_config_replacement][EXIT] Шаг 3 завершен.") - # END_FUNCTION_confirm_db_config_replacement - - # [ENTITY: Function('execute_migration')] - # CONTRACT: - # PURPOSE: Шаг 4. Выполняет фактическую миграцию выбранных дашбордов. - # SPECIFICATION_LINK: func_execute_migration - # PRECONDITIONS: Все предыдущие шаги (`select_environments`, `select_dashboards`) успешно выполнены. - # POSTCONDITIONS: Выбранные дашборды перенесены в целевое окружение. - def execute_migration(self): - """Шаг 4: Выполнение миграции и обновления конфигураций.""" - self.logger.info("[INFO][execute_migration][ENTER] Шаг 4/4: Выполнение миграции.") - w = Whiptail(title="Выполнение миграции", backtitle="Superset Migration Tool") - - if not self.dashboards_to_migrate: - self.logger.warning("[WARN][execute_migration][STATE] Нет дашбордов для миграции.") - w.msgbox("Нет дашбордов для миграции. Завершение.") + # -------------------------------------------------------------- + # [ENTITY: Method('_batch_delete_by_ids')] + # -------------------------------------------------------------- + """ + :purpose: Удалить набор дашбордов по их ID единым запросом. + :preconditions: + - ``ids`` – непустой список целых чисел. + :postconditions: Все указанные дашборды удалены (если они существовали). + :sideeffect: Делает HTTP‑запрос ``DELETE /dashboard/?q=[ids]``. + """ + def _batch_delete_by_ids(self, ids: List[int]) -> None: + if not ids: + self.logger.debug("[DEBUG][_batch_delete_by_ids] Empty ID list – nothing to delete.") return - total_dashboards = len(self.dashboards_to_migrate) - self.logger.info(f"[INFO][execute_migration][STATE] Начало миграции {total_dashboards} дашбордов.") - with w.gauge("Выполняется миграция...", width=60, height=10) as gauge: - for i, dashboard in enumerate(self.dashboards_to_migrate): + self.logger.info("[INFO][_batch_delete_by_ids] Deleting dashboards IDs: %s", ids) + # Формируем параметр q в виде JSON‑массива, как требует Superset. + q_param = json.dumps(ids) + response = self.to_c.network.request( + method="DELETE", + endpoint="/dashboard/", + params={"q": q_param}, + ) + # Superset обычно отвечает 200/204; проверяем поле ``result`` при наличии. + if isinstance(response, dict) and response.get("result", True) is False: + self.logger.warning("[WARN][_batch_delete_by_ids] Unexpected delete response: %s", response) + else: + self.logger.info("[INFO][_batch_delete_by_ids] Delete request completed.") + # [END_ENTITY] + + # -------------------------------------------------------------- + # [ENTITY: Method('execute_migration')] + # -------------------------------------------------------------- + """ + :purpose: Выполнить экспорт‑импорт выбранных дашбордов, при необходимости + обновив YAML‑файлы. При ошибке импортов сохраняем slug, а потом + удаляем проблемные дашборды **по ID**, получив их через slug. + :preconditions: + - ``self.dashboards_to_migrate`` не пуст, + - ``self.from_c`` и ``self.to_c`` инициализированы. + :postconditions: + - Все успешные дашборды импортированы, + - Неудачные дашборды, если пользователь выбрал «удалять‑при‑ошибке», + удалены и повторно импортированы. + :sideeffect: При включённом флаге ``enable_delete_on_failure`` производится + батч‑удаление и повторный импорт. + """ + def execute_migration(self) -> None: + if not self.dashboards_to_migrate: + self.logger.warning("[WARN][execute_migration] No dashboards to migrate.") + msgbox("Информация", "Нет дашбордов для миграции.") + return + + total = len(self.dashboards_to_migrate) + self.logger.info("[INFO][execute_migration] Starting migration of %d dashboards.", total) + + # Передаём режим клиенту‑назначению + self.to_c.delete_before_reimport = self.enable_delete_on_failure # type: ignore[attr-defined] + + # ----------------------------------------------------------------- + # 1️⃣ Основной проход – экспорт → импорт → сбор ошибок + # ----------------------------------------------------------------- + with gauge("Миграция...", width=60, height=10) as g: + for i, dash in enumerate(self.dashboards_to_migrate): + dash_id = dash["id"] + dash_slug = dash.get("slug") # slug нужен для дальнейшего поиска + title = dash["dashboard_title"] + + progress = int((i / total) * 100) + g.set_text(f"Миграция: {title} ({i + 1}/{total})") + g.set_percent(progress) + try: - dashboard_id = dashboard['id'] - dashboard_title = dashboard['dashboard_title'] - - progress = int((i / total_dashboards) * 100) - self.logger.debug(f"[DEBUG][execute_migration][PROGRESS] {progress}% - Миграция: {dashboard_title}") - gauge.set_text(f"Миграция: {dashboard_title} ({i+1}/{total_dashboards})") - gauge.set_percent(progress) + # ------------------- Экспорт ------------------- + exported_content, _ = self.from_c.export_dashboard(dash_id) # type: ignore[attr-defined] - self.logger.info(f"[INFO][execute_migration][PROGRESS] Миграция дашборда: {dashboard_title} (ID: {dashboard_id})") + # ------------------- Временный ZIP ------------------- + with create_temp_file( + content=exported_content, + suffix=".zip", + logger=self.logger, + ) as tmp_zip_path: + self.logger.debug("[DEBUG][temp_zip] Temporary ZIP at %s", tmp_zip_path) - # 1. Экспорт - exported_content, _ = self.from_c.export_dashboard(dashboard_id) - zip_path, unpacked_path = save_and_unpack_dashboard(exported_content, f"temp_export_{dashboard_id}", unpack=True) - self.logger.info(f"[INFO][execute_migration][STATE] Дашборд экспортирован и распакован в {unpacked_path}") + # ------------------- Распаковка во временный каталог ------------------- + with create_temp_file(suffix=".dir", logger=self.logger) as tmp_unpack_dir: + self.logger.debug("[DEBUG][temp_dir] Temporary unpack dir: %s", tmp_unpack_dir) - # 2. Обновление YAML, если нужно - if self.db_config_replacement: - update_yamls(db_configs=[self.db_config_replacement], path=str(unpacked_path)) - self.logger.info(f"[INFO][execute_migration][STATE] YAML-файлы обновлены.") + with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref: + zip_ref.extractall(tmp_unpack_dir) + self.logger.info("[INFO][execute_migration] Export unpacked to %s", tmp_unpack_dir) - # 3. Упаковка и импорт - new_zip_path = f"migrated_dashboard_{dashboard_id}.zip" - create_dashboard_export(new_zip_path, [str(unpacked_path)]) - - self.to_c.import_dashboard(new_zip_path) - self.logger.info(f"[INFO][execute_migration][SUCCESS] Дашборд {dashboard_title} успешно импортирован.") + # ------------------- YAML‑обновление (если нужно) ------------------- + if self.db_config_replacement: + update_yamls( + db_configs=[self.db_config_replacement], + path=str(tmp_unpack_dir), + ) + self.logger.info("[INFO][execute_migration] YAML‑files updated.") - except Exception as e: - self.logger.error(f"[ERROR][execute_migration][FAILURE] Ошибка при миграции дашборда {dashboard_title}: {e}", exc_info=True) - error_msg = f"Не удалось смигрировать дашборд: {dashboard_title}.\n\nОшибка: {e}" - w.msgbox(error_msg, width=60, height=15) - - gauge.set_percent(100) + # ------------------- Сборка нового ZIP ------------------- + with create_temp_file(suffix=".zip", logger=self.logger) as tmp_new_zip: + create_dashboard_export( + zip_path=tmp_new_zip, + source_paths=[str(tmp_unpack_dir)], + ) + self.logger.info("[INFO][execute_migration] Re‑packed to %s", tmp_new_zip) - self.logger.info("[INFO][execute_migration][STATE] Миграция завершена.") - w.msgbox("Миграция завершена!", width=40, height=8) - self.logger.info("[INFO][execute_migration][EXIT] Шаг 4 завершен.") - # END_FUNCTION_execute_migration + # ------------------- Импорт ------------------- + self.to_c.import_dashboard( + file_name=tmp_new_zip, + dash_id=dash_id, + dash_slug=dash_slug, + ) # type: ignore[attr-defined] -# END_CLASS_Migration + # Если импорт прошёл без исключений – фиксируем успех + self.logger.info("[INFO][execute_migration][SUCCESS] Dashboard %s imported.", title) -# [MAIN_EXECUTION_BLOCK] + except Exception as exc: + # Сохраняем данные для повторного импорта после batch‑удаления + self.logger.error("[ERROR][execute_migration] %s", exc, exc_info=True) + self._failed_imports.append( + { + "slug": dash_slug, + "dash_id": dash_id, + "zip_content": exported_content, + } + ) + msgbox("Ошибка", f"Не удалось мигрировать дашборд {title}.\n\n{exc}") + + g.set_percent(100) + + # ----------------------------------------------------------------- + # 2️⃣ Если возникли ошибки и пользователь согласился удалять – удаляем и повторяем + # ----------------------------------------------------------------- + if self.enable_delete_on_failure and self._failed_imports: + self.logger.info( + "[INFO][execute_migration] %d dashboards failed. Starting recovery procedure.", + len(self._failed_imports), + ) + + # ------------------- Получаем список дашбордов в целевом окружении ------------------- + _, target_dashboards = self.to_c.get_dashboards() # type: ignore[attr-defined] + slug_to_id: Dict[str, int] = { + d["slug"]: d["id"] for d in target_dashboards if "slug" in d and "id" in d + } + + # ------------------- Формируем список ID‑ов для удаления ------------------- + ids_to_delete: List[int] = [] + for fail in self._failed_imports: + slug = fail["slug"] + if slug and slug in slug_to_id: + ids_to_delete.append(slug_to_id[slug]) + else: + self.logger.warning( + "[WARN][execute_migration] Unable to map slug '%s' to ID on target.", + slug, + ) + + # ------------------- Batch‑удаление ------------------- + self._batch_delete_by_ids(ids_to_delete) + + # ------------------- Повторный импорт только для проблемных дашбордов ------------------- + for fail in self._failed_imports: + dash_slug = fail["slug"] + dash_id = fail["dash_id"] + zip_content = fail["zip_content"] + + # Один раз создаём временный ZIP‑файл из сохранённого содержимого + with create_temp_file( + content=zip_content, + suffix=".zip", + logger=self.logger, + ) as retry_zip_path: + self.logger.debug("[DEBUG][retry_zip] Retry ZIP for slug %s at %s", dash_slug, retry_zip_path) + + # Пере‑импортируем – **slug** передаётся, но клиент будет использовать ID + self.to_c.import_dashboard( + file_name=retry_zip_path, + dash_id=dash_id, + dash_slug=dash_slug, + ) # type: ignore[attr-defined] + + self.logger.info("[INFO][execute_migration][RECOVERED] Dashboard slug '%s' re‑imported.", dash_slug) + + # ----------------------------------------------------------------- + # 3️⃣ Финальная отчётность + # ----------------------------------------------------------------- + self.logger.info("[INFO][execute_migration] Migration finished.") + msgbox("Информация", "Миграция завершена!") + # [END_ENTITY] + +# [END_ENTITY: Service('Migration')] + +# -------------------------------------------------------------- +# Точка входа +# -------------------------------------------------------------- if __name__ == "__main__": - migration = Migration() - migration.run() -# END_MAIN_EXECUTION_BLOCK - -# END_MODULE_migration_script \ No newline at end of file + Migration().run() +# [END_FILE migration_script.py] +# -------------------------------------------------------------- \ No newline at end of file diff --git a/superset_tool/client.py b/superset_tool/client.py index c034be4..4002edc 100644 --- a/superset_tool/client.py +++ b/superset_tool/client.py @@ -1,82 +1,106 @@ -# pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument -""" -[MODULE] Superset API Client -@contract: Реализует полное взаимодействие с Superset API -""" +# [MODULE_PATH] superset_tool.client +# [FILE] client.py +# [SEMANTICS] superset, api, client, logging, error-handling, slug-support -# [IMPORTS] Стандартная библиотека +# -------------------------------------------------------------- +# [IMPORTS] +# -------------------------------------------------------------- import json -from typing import Optional, Dict, Tuple, List, Any, Union -import datetime -from pathlib import Path import zipfile +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union + from requests import Response -# [IMPORTS] Локальные модули from superset_tool.models import SupersetConfig -from superset_tool.exceptions import ( - ExportError, - InvalidZipFormatError -) +from superset_tool.exceptions import ExportError, InvalidZipFormatError from superset_tool.utils.fileio import get_filename_from_headers from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.network import APIClient +# [END_IMPORTS] -# [CONSTANTS] -DEFAULT_TIMEOUT = 30 - -# [TYPE-ALIASES] -JsonType = Union[Dict[str, Any], List[Dict[str, Any]]] -ResponseType = Tuple[bytes, str] - +# -------------------------------------------------------------- +# [ENTITY: Service('SupersetClient')] +# [RELATION: Service('SupersetClient')] -> [DEPENDS_ON] -> [PythonModule('superset_tool.utils.network')] +# -------------------------------------------------------------- +""" +:purpose: Класс‑обёртка над Superset REST‑API. +:preconditions: + - ``config`` – валидный объект :class:`SupersetConfig`. + - Доступен рабочий HTTP‑клиент :class:`APIClient`. +:postconditions: + - Объект готов к выполнению запросов (GET, POST, DELETE и т.д.). +:raises: + - :class:`TypeError` при передаче неверного типа конфигурации. +""" class SupersetClient: - """[MAIN-CONTRACT] Клиент для работы с Superset API""" - # [ENTITY: Function('__init__')] - # CONTRACT: - # PURPOSE: Инициализация клиента Superset. - # PRECONDITIONS: `config` должен быть валидным `SupersetConfig`. - # POSTCONDITIONS: Клиент успешно инициализирован. + """ + :ivar SupersetLogger logger: Логгер, используемый в клиенте. + :ivar SupersetConfig config: Текущая конфигурация подключения. + :ivar APIClient network: Объект‑обёртка над ``requests``. + :ivar bool delete_before_reimport: Флаг, указывающий, + что при ошибке импорта дашборд следует удалить и повторить импорт. + """ + + # -------------------------------------------------------------- + # [ENTITY: Method('__init__')] + # -------------------------------------------------------------- + """ + :purpose: Инициализировать клиент и передать ему логгер. + :preconditions: ``config`` – экземпляр :class:`SupersetConfig`. + :postconditions: Атрибуты ``logger``, ``config`` и ``network`` созданы, + ``delete_before_reimport`` установлен в ``False``. + """ def __init__(self, config: SupersetConfig, logger: Optional[SupersetLogger] = None): self.logger = logger or SupersetLogger(name="SupersetClient") - self.logger.info("[INFO][SupersetClient.__init__][ENTER] Initializing SupersetClient.") + self.logger.info("[INFO][SupersetClient.__init__] Initializing SupersetClient.") self._validate_config(config) self.config = config - self.env = config.env self.network = APIClient( config=config.dict(), verify_ssl=config.verify_ssl, timeout=config.timeout, - logger=self.logger + logger=self.logger, ) - self.logger.info("[INFO][SupersetClient.__init__][SUCCESS] SupersetClient initialized successfully.") - # END_FUNCTION___init__ + self.delete_before_reimport: bool = False + self.logger.info("[INFO][SupersetClient.__init__] SupersetClient initialized.") + # [END_ENTITY] - # [ENTITY: Function('_validate_config')] - # CONTRACT: - # PURPOSE: Валидация конфигурации клиента. - # PRECONDITIONS: `config` должен быть экземпляром `SupersetConfig`. - # POSTCONDITIONS: Конфигурация валидна. + # -------------------------------------------------------------- + # [ENTITY: Method('_validate_config')] + # -------------------------------------------------------------- + """ + :purpose: Проверить, что передан объект :class:`SupersetConfig`. + :preconditions: ``config`` – произвольный объект. + :postconditions: При несовпадении типов возбуждается :class:`TypeError`. + """ def _validate_config(self, config: SupersetConfig) -> None: - self.logger.debug("[DEBUG][SupersetClient._validate_config][ENTER] Validating config.") + self.logger.debug("[DEBUG][_validate_config][ENTER] Validating SupersetConfig.") if not isinstance(config, SupersetConfig): - self.logger.error("[ERROR][SupersetClient._validate_config][FAILURE] Invalid config type.") + self.logger.error("[ERROR][_validate_config][FAILURE] Invalid config type.") raise TypeError("Конфигурация должна быть экземпляром SupersetConfig") - self.logger.debug("[DEBUG][SupersetClient._validate_config][SUCCESS] Config validated.") - # END_FUNCTION__validate_config + self.logger.debug("[DEBUG][_validate_config][SUCCESS] Config is valid.") + # [END_ENTITY] + # -------------------------------------------------------------- + # [ENTITY: Property('headers')] + # -------------------------------------------------------------- @property def headers(self) -> dict: - """[INTERFACE] Базовые заголовки для API-вызовов.""" + """Базовые HTTP‑заголовки, используемые клиентом.""" return self.network.headers - # END_FUNCTION_headers + # [END_ENTITY] - # [ENTITY: Function('get_dashboards')] - # CONTRACT: - # PURPOSE: Получение списка дашбордов с пагинацией. - # PRECONDITIONS: None - # POSTCONDITIONS: Возвращает кортеж с общим количеством и списком дашбордов. + # -------------------------------------------------------------- + # [ENTITY: Method('get_dashboards')] + # -------------------------------------------------------------- + """ + :purpose: Получить список дашбордов с поддержкой пагинации. + :preconditions: None. + :postconditions: Возвращается кортеж ``(total_count, list_of_dashboards)``. + """ def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: - self.logger.info("[INFO][SupersetClient.get_dashboards][ENTER] Getting dashboards.") + self.logger.info("[INFO][get_dashboards][ENTER] Fetching dashboards.") validated_query = self._validate_query_params(query) total_count = self._fetch_total_object_count(endpoint="/dashboard/") paginated_data = self._fetch_all_pages( @@ -85,236 +109,368 @@ class SupersetClient: "base_query": validated_query, "total_count": total_count, "results_field": "result", - } + }, ) - self.logger.info("[INFO][SupersetClient.get_dashboards][SUCCESS] Got dashboards.") + self.logger.info("[INFO][get_dashboards][SUCCESS] Got dashboards.") return total_count, paginated_data - # END_FUNCTION_get_dashboards + # [END_ENTITY] - # [ENTITY: Function('get_dashboard')] - # CONTRACT: - # PURPOSE: Получение метаданных дашборда по ID или SLUG. - # PRECONDITIONS: `dashboard_id_or_slug` должен существовать. - # POSTCONDITIONS: Возвращает метаданные дашборда. - def get_dashboard(self, dashboard_id_or_slug: str) -> dict: - self.logger.info(f"[INFO][SupersetClient.get_dashboard][ENTER] Getting dashboard: {dashboard_id_or_slug}") - response_data = self.network.request( - method="GET", - endpoint=f"/dashboard/{dashboard_id_or_slug}", - ) - self.logger.info(f"[INFO][SupersetClient.get_dashboard][SUCCESS] Got dashboard: {dashboard_id_or_slug}") - return response_data.get("result", {}) - # END_FUNCTION_get_dashboard - - # [ENTITY: Function('get_datasets')] - # CONTRACT: - # PURPOSE: Получение списка датасетов с пагинацией. - # PRECONDITIONS: None - # POSTCONDITIONS: Возвращает кортеж с общим количеством и списком датасетов. - def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: - self.logger.info("[INFO][SupersetClient.get_datasets][ENTER] Getting datasets.") - total_count = self._fetch_total_object_count(endpoint="/dataset/") - base_query = { - "columns": ["id", "table_name", "sql", "database", "schema"], - "page": 0, - "page_size": 100 - } - validated_query = {**base_query, **(query or {})} - datasets = self._fetch_all_pages( - endpoint="/dataset/", - pagination_options={ - "base_query": validated_query, - "total_count": total_count, - "results_field": "result", - } - ) - self.logger.info("[INFO][SupersetClient.get_datasets][SUCCESS] Got datasets.") - return total_count, datasets - # END_FUNCTION_get_datasets - - # [ENTITY: Function('get_dataset')] - # CONTRACT: - # PURPOSE: Получение метаданных датасета по ID. - # PRECONDITIONS: `dataset_id` должен существовать. - # POSTCONDITIONS: Возвращает метаданные датасета. - def get_dataset(self, dataset_id: str) -> dict: - self.logger.info(f"[INFO][SupersetClient.get_dataset][ENTER] Getting dataset: {dataset_id}") - response_data = self.network.request( - method="GET", - endpoint=f"/dataset/{dataset_id}", - ) - self.logger.info(f"[INFO][SupersetClient.get_dataset][SUCCESS] Got dataset: {dataset_id}") - return response_data.get("result", {}) - # END_FUNCTION_get_dataset - - def get_databases(self) -> List[Dict]: - self.logger.info("[INFO][SupersetClient.get_databases][ENTER] Getting databases.") - response = self.network.request("GET", "/database/") - self.logger.info("[INFO][SupersetClient.get_databases][SUCCESS] Got databases.") - return response.get('result', []) - - # [ENTITY: Function('export_dashboard')] - # CONTRACT: - # PURPOSE: Экспорт дашборда в ZIP-архив. - # PRECONDITIONS: `dashboard_id` должен существовать. - # POSTCONDITIONS: Возвращает содержимое ZIP-архива и имя файла. + # -------------------------------------------------------------- + # [ENTITY: Method('export_dashboard')] + # -------------------------------------------------------------- + """ + :purpose: Скачать дашборд в виде ZIP‑архива. + :preconditions: ``dashboard_id`` – существующий идентификатор. + :postconditions: Возвращается бинарное содержимое и имя файла. + """ def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]: - self.logger.info(f"[INFO][SupersetClient.export_dashboard][ENTER] Exporting dashboard: {dashboard_id}") + self.logger.info("[INFO][export_dashboard][ENTER] Exporting dashboard %s.", dashboard_id) response = self.network.request( method="GET", endpoint="/dashboard/export/", params={"q": json.dumps([dashboard_id])}, stream=True, - raw_response=True + raw_response=True, ) self._validate_export_response(response, dashboard_id) filename = self._resolve_export_filename(response, dashboard_id) - content = response.content - self.logger.info(f"[INFO][SupersetClient.export_dashboard][SUCCESS] Exported dashboard: {dashboard_id}") - return content, filename - # END_FUNCTION_export_dashboard + self.logger.info("[INFO][export_dashboard][SUCCESS] Exported dashboard %s.", dashboard_id) + return response.content, filename + # [END_ENTITY] - # [ENTITY: Function('_validate_export_response')] - # CONTRACT: - # PURPOSE: Валидация ответа экспорта. - # PRECONDITIONS: `response` должен быть валидным HTTP-ответом. - # POSTCONDITIONS: Ответ валиден. - def _validate_export_response(self, response: Response, dashboard_id: int) -> None: - self.logger.debug(f"[DEBUG][SupersetClient._validate_export_response][ENTER] Validating export response for dashboard: {dashboard_id}") - content_type = response.headers.get('Content-Type', '') - if 'application/zip' not in content_type: - self.logger.error(f"[ERROR][SupersetClient._validate_export_response][FAILURE] Invalid content type: {content_type}") - raise ExportError(f"Получен не ZIP-архив (Content-Type: {content_type})") - if not response.content: - self.logger.error("[ERROR][SupersetClient._validate_export_response][FAILURE] Empty response content.") - raise ExportError("Получены пустые данные при экспорте") - self.logger.debug(f"[DEBUG][SupersetClient._validate_export_response][SUCCESS] Export response validated for dashboard: {dashboard_id}") - # END_FUNCTION__validate_export_response + # -------------------------------------------------------------- + # [ENTITY: Method('import_dashboard')] + # -------------------------------------------------------------- + """ + :purpose: Импортировать дашборд из ZIP‑файла. При неуспешном импорте, + если ``delete_before_reimport`` = True, сначала удаляется + дашборд по ID, затем импорт повторяется. + :preconditions: + - ``file_name`` – путь к существующему ZIP‑архиву (str|Path). + - ``dash_id`` – (опционально) ID дашборда, который следует удалить. + :postconditions: Возвращается словарь‑ответ API при успехе. + """ + def import_dashboard( + self, + file_name: Union[str, Path], + dash_id: Optional[int] = None, + dash_slug: Optional[str] = None, # сохраняем для возможного логирования + ) -> Dict: + # ----------------------------------------------------------------- + # 1️⃣ Приводим путь к строке (API‑клиент ожидает str) + # ----------------------------------------------------------------- + file_path: str = str(file_name) # <--- гарантируем тип str + self._validate_import_file(file_path) - # [ENTITY: Function('_resolve_export_filename')] - # CONTRACT: - # PURPOSE: Определение имени экспортируемого файла. - # PRECONDITIONS: `response` должен быть валидным HTTP-ответом. - # POSTCONDITIONS: Возвращает имя файла. - def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str: - self.logger.debug(f"[DEBUG][SupersetClient._resolve_export_filename][ENTER] Resolving export filename for dashboard: {dashboard_id}") - filename = get_filename_from_headers(response.headers) - if not filename: - timestamp = datetime.datetime.now().strftime('%Y%m%dT%H%M%S') - filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip" - self.logger.warning(f"[WARNING][SupersetClient._resolve_export_filename][STATE_CHANGE] Could not resolve filename from headers, generated: {filename}") - self.logger.debug(f"[DEBUG][SupersetClient._resolve_export_filename][SUCCESS] Resolved export filename: {filename}") - return filename - # END_FUNCTION__resolve_export_filename + try: + import_response = self._do_import(file_path) + self.logger.info("[INFO][import_dashboard] Imported %s.", file_path) + return import_response - # [ENTITY: Function('export_to_file')] - # CONTRACT: - # PURPOSE: Экспорт дашборда напрямую в файл. - # PRECONDITIONS: `output_dir` должен существовать. - # POSTCONDITIONS: Дашборд сохранен в файл. - def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path: - self.logger.info(f"[INFO][SupersetClient.export_to_file][ENTER] Exporting dashboard {dashboard_id} to file in {output_dir}") - output_dir = Path(output_dir) - if not output_dir.exists(): - self.logger.error(f"[ERROR][SupersetClient.export_to_file][FAILURE] Output directory does not exist: {output_dir}") - raise FileNotFoundError(f"Директория {output_dir} не найдена") - content, filename = self.export_dashboard(dashboard_id) - target_path = output_dir / filename - with open(target_path, 'wb') as f: - f.write(content) - self.logger.info(f"[INFO][SupersetClient.export_to_file][SUCCESS] Exported dashboard {dashboard_id} to {target_path}") - return target_path - # END_FUNCTION_export_to_file + except Exception as exc: + # ----------------------------------------------------------------- + # 2️⃣ Логируем первую неудачу, пытаемся удалить и повторить, + # только если включён флаг ``delete_before_reimport``. + # ----------------------------------------------------------------- + self.logger.error( + "[ERROR][import_dashboard] First import attempt failed: %s", + exc, + exc_info=True, + ) + if not self.delete_before_reimport: + raise - # [ENTITY: Function('import_dashboard')] - # CONTRACT: - # PURPOSE: Импорт дашборда из ZIP-архива. - # PRECONDITIONS: `file_name` должен быть валидным ZIP-файлом. - # POSTCONDITIONS: Возвращает ответ API. - def import_dashboard(self, file_name: Union[str, Path]) -> Dict: - self.logger.info(f"[INFO][SupersetClient.import_dashboard][ENTER] Importing dashboard from: {file_name}") - self._validate_import_file(file_name) - import_response = self.network.upload_file( + # ----------------------------------------------------------------- + # 3️⃣ Выбираем, как искать дашборд для удаления. + # При наличии ``dash_id`` – удаляем его. + # Иначе, если известен ``dash_slug`` – переводим его в ID ниже. + # ----------------------------------------------------------------- + target_id: Optional[int] = dash_id + if target_id is None and dash_slug is not None: + # Попытка динамического определения ID через slug. + # Мы делаем отдельный запрос к /dashboard/ (поисковый фильтр). + self.logger.debug("[DEBUG][import_dashboard] Resolving ID by slug '%s'.", dash_slug) + try: + _, candidates = self.get_dashboards( + query={"filters": [{"col": "slug", "op": "eq", "value": dash_slug}]} + ) + if candidates: + target_id = candidates[0]["id"] + self.logger.debug("[DEBUG][import_dashboard] Resolved slug → ID %s.", target_id) + except Exception as e: + self.logger.warning( + "[WARN][import_dashboard] Could not resolve slug '%s' to ID: %s", + dash_slug, + e, + ) + + # Если всё‑равно нет ID – считаем невозможным корректно удалить. + if target_id is None: + self.logger.error("[ERROR][import_dashboard] No ID available for delete‑retry.") + raise + + # ----------------------------------------------------------------- + # 4️⃣ Удаляем найденный дашборд (по ID) + # ----------------------------------------------------------------- + try: + self.delete_dashboard(target_id) + self.logger.info("[INFO][import_dashboard] Deleted dashboard ID %s, retrying import.", target_id) + except Exception as del_exc: + self.logger.error("[ERROR][import_dashboard] Delete failed: %s", del_exc, exc_info=True) + raise + + # ----------------------------------------------------------------- + # 5️⃣ Повторный импорт (тот же файл) + # ----------------------------------------------------------------- + try: + import_response = self._do_import(file_path) + self.logger.info("[INFO][import_dashboard] Re‑import succeeded.") + return import_response + except Exception as rec_exc: + self.logger.error( + "[ERROR][import_dashboard] Re‑import after delete failed: %s", + rec_exc, + exc_info=True, + ) + raise + # [END_ENTITY] + + # -------------------------------------------------------------- + # [ENTITY: Method('_do_import')] + # -------------------------------------------------------------- + """ + :purpose: Выполнить один запрос на импорт без обработки исключений. + :preconditions: ``file_name`` уже проверен и существует. + :postconditions: Возвращается словарь‑ответ API. + """ + def _do_import(self, file_name: Union[str, Path]) -> Dict: + return self.network.upload_file( endpoint="/dashboard/import/", file_info={ "file_obj": Path(file_name), "file_name": Path(file_name).name, "form_field": "formData", }, - extra_data={'overwrite': 'true'}, - timeout=self.config.timeout * 2 + extra_data={"overwrite": "true"}, + timeout=self.config.timeout * 2, ) - self.logger.info(f"[INFO][SupersetClient.import_dashboard][SUCCESS] Imported dashboard from: {file_name}") - return import_response - # END_FUNCTION_import_dashboard + # [END_ENTITY] - # [ENTITY: Function('_validate_query_params')] - # CONTRACT: - # PURPOSE: Нормализация и валидация параметров запроса. - # PRECONDITIONS: None - # POSTCONDITIONS: Возвращает валидный словарь параметров. + # -------------------------------------------------------------- + # [ENTITY: Method('delete_dashboard')] + # -------------------------------------------------------------- + """ + :purpose: Удалить дашборд **по ID или slug**. + :preconditions: + - ``dashboard_id`` – int ID **или** str slug дашборда. + :postconditions: На уровне API считается, что ресурс удалён + (HTTP 200/204). Логируется результат операции. + """ + def delete_dashboard(self, dashboard_id: Union[int, str]) -> None: + # ``dashboard_id`` может быть целым числом или строковым slug. + self.logger.info("[INFO][delete_dashboard][ENTER] Deleting dashboard %s.", dashboard_id) + response = self.network.request( + method="DELETE", + endpoint=f"/dashboard/{dashboard_id}", + ) + # Superset обычно возвращает 200/204. Если есть поле ``result`` – проверяем. + if response.get("result", True) is not False: + self.logger.info("[INFO][delete_dashboard] Dashboard %s deleted.", dashboard_id) + else: + self.logger.warning("[WARN][delete_dashboard] Unexpected response while deleting %s.", dashboard_id) + # [END_ENTITY] + + # -------------------------------------------------------------- + # [ENTITY: Method('_extract_dashboard_id_from_zip')] + # -------------------------------------------------------------- + """ + :purpose: Попытаться извлечь **ID** дашборда из ``metadata.yaml`` внутри ZIP‑архива. + :preconditions: ``file_name`` – путь к корректному ZIP‑файлу. + :postconditions: Возвращается ``int`` ID или ``None``. + """ + def _extract_dashboard_id_from_zip(self, file_name: Union[str, Path]) -> Optional[int]: + try: + import yaml + with zipfile.ZipFile(file_name, "r") as zf: + for name in zf.namelist(): + if name.endswith("metadata.yaml"): + with zf.open(name) as meta_file: + meta = yaml.safe_load(meta_file.read()) + dash_id = meta.get("dashboard_uuid") or meta.get("dashboard_id") + if dash_id is not None: + return int(dash_id) + except Exception as exc: + self.logger.error("[ERROR][_extract_dashboard_id_from_zip] %s", exc, exc_info=True) + return None + # [END_ENTITY] + + # -------------------------------------------------------------- + # [ENTITY: Method('_extract_dashboard_slug_from_zip')] + # -------------------------------------------------------------- + """ + :purpose: Попытаться извлечь **slug** дашборда из ``metadata.yaml`` внутри ZIP‑архива. + :preconditions: ``file_name`` – путь к корректному ZIP‑файлу. + :postconditions: Возвращается строка‑slug или ``None``. + """ + def _extract_dashboard_slug_from_zip(self, file_name: Union[str, Path]) -> Optional[str]: + try: + import yaml + with zipfile.ZipFile(file_name, "r") as zf: + for name in zf.namelist(): + if name.endswith("metadata.yaml"): + with zf.open(name) as meta_file: + meta = yaml.safe_load(meta_file.read()) + slug = meta.get("slug") + if slug: + return str(slug) + except Exception as exc: + self.logger.error("[ERROR][_extract_dashboard_slug_from_zip] %s", exc, exc_info=True) + return None + # [END_ENTITY] + + # -------------------------------------------------------------- + # [ENTITY: Method('_validate_export_response')] + # -------------------------------------------------------------- + """ + :purpose: Проверить, что ответ от ``/dashboard/export/`` – ZIP‑архив с данными. + :preconditions: ``response`` – объект :class:`requests.Response`. + :postconditions: При несоответствии возбуждается :class:`ExportError`. + """ + def _validate_export_response(self, response: Response, dashboard_id: int) -> None: + self.logger.debug("[DEBUG][_validate_export_response][ENTER] Validating response for %s.", dashboard_id) + content_type = response.headers.get("Content-Type", "") + if "application/zip" not in content_type: + self.logger.error("[ERROR][_validate_export_response][FAILURE] Invalid content type: %s", content_type) + raise ExportError(f"Получен не ZIP‑архив (Content-Type: {content_type})") + if not response.content: + self.logger.error("[ERROR][_validate_export_response][FAILURE] Empty response content.") + raise ExportError("Получены пустые данные при экспорте") + self.logger.debug("[DEBUG][_validate_export_response][SUCCESS] Response validated.") + # [END_ENTITY] + + # -------------------------------------------------------------- + # [ENTITY: Method('_resolve_export_filename')] + # -------------------------------------------------------------- + """ + :purpose: Определить имя файла, полученного из заголовков ответа. + :preconditions: ``response.headers`` содержит (возможно) ``Content‑Disposition``. + :postconditions: Возвращается строка‑имя файла. + """ + def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str: + self.logger.debug("[DEBUG][_resolve_export_filename][ENTER] Resolving filename.") + filename = get_filename_from_headers(response.headers) + if not filename: + from datetime import datetime + timestamp = datetime.now().strftime("%Y%m%dT%H%M%S") + filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip" + self.logger.warning("[WARN][_resolve_export_filename] Generated filename: %s", filename) + self.logger.debug("[DEBUG][_resolve_export_filename][SUCCESS] Filename: %s", filename) + return filename + # [END_ENTITY] + + # -------------------------------------------------------------- + # [ENTITY: Method('_validate_query_params')] + # -------------------------------------------------------------- + """ + :purpose: Сформировать корректный набор параметров запроса. + :preconditions: ``query`` – любой словарь или ``None``. + :postconditions: Возвращается словарь с обязательными полями. + """ def _validate_query_params(self, query: Optional[Dict]) -> Dict: - self.logger.debug("[DEBUG][SupersetClient._validate_query_params][ENTER] Validating query params.") base_query = { "columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"], "page": 0, - "page_size": 1000 + "page_size": 1000, } - validated_query = {**base_query, **(query or {})} - self.logger.debug(f"[DEBUG][SupersetClient._validate_query_params][SUCCESS] Validated query params: {validated_query}") - return validated_query - # END_FUNCTION__validate_query_params + validated = {**base_query, **(query or {})} + self.logger.debug("[DEBUG][_validate_query_params] %s", validated) + return validated + # [END_ENTITY] - # [ENTITY: Function('_fetch_total_object_count')] - # CONTRACT: - # PURPOSE: Получение общего количества объектов. - # PRECONDITIONS: `endpoint` должен быть валидным. - # POSTCONDITIONS: Возвращает общее количество объектов. - def _fetch_total_object_count(self, endpoint:str) -> int: - self.logger.debug(f"[DEBUG][SupersetClient._fetch_total_object_count][ENTER] Fetching total object count for endpoint: {endpoint}") - query_params_for_count = {'page': 0, 'page_size': 1} + # -------------------------------------------------------------- + # [ENTITY: Method('_fetch_total_object_count')] + # -------------------------------------------------------------- + """ + :purpose: Получить общее количество объектов по указанному endpoint. + :preconditions: ``endpoint`` – строка, начинающаяся с «/». + :postconditions: Возвращается целое число. + """ + def _fetch_total_object_count(self, endpoint: str) -> int: + query_params_for_count = {"page": 0, "page_size": 1} count = self.network.fetch_paginated_count( endpoint=endpoint, query_params=query_params_for_count, - count_field="count" + count_field="count", ) - self.logger.debug(f"[DEBUG][SupersetClient._fetch_total_object_count][SUCCESS] Fetched total object count: {count}") + self.logger.debug("[DEBUG][_fetch_total_object_count] %s → %s", endpoint, count) return count - # END_FUNCTION__fetch_total_object_count + # [END_ENTITY] - # [ENTITY: Function('_fetch_all_pages')] - # CONTRACT: - # PURPOSE: Обход всех страниц пагинированного API. - # PRECONDITIONS: `pagination_options` должен содержать необходимые параметры. - # POSTCONDITIONS: Возвращает список всех объектов. - def _fetch_all_pages(self, endpoint:str, pagination_options: Dict) -> List[Dict]: - self.logger.debug(f"[DEBUG][SupersetClient._fetch_all_pages][ENTER] Fetching all pages for endpoint: {endpoint}") + # -------------------------------------------------------------- + # [ENTITY: Method('_fetch_all_pages')] + # -------------------------------------------------------------- + """ + :purpose: Обойти все страницы пагинированного API. + :preconditions: ``pagination_options`` – словарь, сформированный + в ``_validate_query_params`` и ``_fetch_total_object_count``. + :postconditions: Возвращается список всех объектов. + """ + def _fetch_all_pages(self, endpoint: str, pagination_options: Dict) -> List[Dict]: all_data = self.network.fetch_paginated_data( endpoint=endpoint, - pagination_options=pagination_options + pagination_options=pagination_options, ) - self.logger.debug(f"[DEBUG][SupersetClient._fetch_all_pages][SUCCESS] Fetched all pages for endpoint: {endpoint}") + self.logger.debug("[DEBUG][_fetch_all_pages] Fetched %s items from %s.", len(all_data), endpoint) return all_data - # END_FUNCTION__fetch_all_pages + # [END_ENTITY] - # [ENTITY: Function('_validate_import_file')] - # CONTRACT: - # PURPOSE: Проверка файла перед импортом. - # PRECONDITIONS: `zip_path` должен быть путем к файлу. - # POSTCONDITIONS: Файл валиден. + # -------------------------------------------------------------- + # [ENTITY: Method('_validate_import_file')] + # -------------------------------------------------------------- + """ + :purpose: Проверить, что файл существует, является ZIP‑архивом и + содержит ``metadata.yaml``. + :preconditions: ``zip_path`` – путь к файлу. + :postconditions: При невалидном файле возбуждается :class:`InvalidZipFormatError`. + """ def _validate_import_file(self, zip_path: Union[str, Path]) -> None: - self.logger.debug(f"[DEBUG][SupersetClient._validate_import_file][ENTER] Validating import file: {zip_path}") path = Path(zip_path) if not path.exists(): - self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file does not exist: {zip_path}") + self.logger.error("[ERROR][_validate_import_file] File not found: %s", zip_path) raise FileNotFoundError(f"Файл {zip_path} не существует") if not zipfile.is_zipfile(path): - self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file is not a zip file: {zip_path}") - raise InvalidZipFormatError(f"Файл {zip_path} не является ZIP-архивом") - with zipfile.ZipFile(path, 'r') as zf: - if not any(n.endswith('metadata.yaml') for n in zf.namelist()): - self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file does not contain metadata.yaml: {zip_path}") + self.logger.error("[ERROR][_validate_import_file] Not a zip file: %s", zip_path) + raise InvalidZipFormatError(f"Файл {zip_path} не является ZIP‑архивом") + with zipfile.ZipFile(path, "r") as zf: + if not any(n.endswith("metadata.yaml") for n in zf.namelist()): + self.logger.error("[ERROR][_validate_import_file] No metadata.yaml in %s", zip_path) raise InvalidZipFormatError(f"Архив {zip_path} не содержит 'metadata.yaml'") - self.logger.debug(f"[DEBUG][SupersetClient._validate_import_file][SUCCESS] Validated import file: {zip_path}") - # END_FUNCTION__validate_import_file + self.logger.debug("[DEBUG][_validate_import_file] File %s validated.", zip_path) + # [END_ENTITY] + # -------------------------------------------------------------- + # [ENTITY: Method('get_datasets')] + # -------------------------------------------------------------- + """ + :purpose: Получить список датасетов с поддержкой пагинации. + :preconditions: None. + :postconditions: Возвращается кортеж ``(total_count, list_of_datasets)``. + """ + def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: + self.logger.info("[INFO][get_datasets][ENTER] Fetching datasets.") + validated_query = self._validate_query_params(query) + total_count = self._fetch_total_object_count(endpoint="/dataset/") + paginated_data = self._fetch_all_pages( + endpoint="/dataset/", + pagination_options={ + "base_query": validated_query, + "total_count": total_count, + "results_field": "result", + }, + ) + self.logger.info("[INFO][get_datasets][SUCCESS] Got datasets.") + return total_count, paginated_data + # [END_ENTITY] + + +# [END_FILE client.py] \ No newline at end of file diff --git a/superset_tool/utils/logger.py b/superset_tool/utils/logger.py index 59111f0..d0c44c4 100644 --- a/superset_tool/utils/logger.py +++ b/superset_tool/utils/logger.py @@ -1,88 +1,205 @@ -# [MODULE] Superset Tool Logger Utility -# PURPOSE: Предоставляет стандартизированный класс-обертку `SupersetLogger` для настройки и использования логирования в проекте. -# COHERENCE: Модуль согласован со стандартной библиотекой `logging`, расширяя ее для нужд проекта. +# [MODULE_PATH] superset_tool.utils.logger +# [FILE] logger.py +# [SEMANTICS] logging, utils, ai‑friendly, infrastructure +# -------------------------------------------------------------- +# [IMPORTS] +# -------------------------------------------------------------- import logging import sys from datetime import datetime from pathlib import Path -from typing import Optional +from typing import Optional, Any, Mapping +# [END_IMPORTS] -# CONTRACT: -# PURPOSE: Обеспечивает унифицированную настройку логгера с выводом в консоль и/или файл. -# PRECONDITIONS: -# - `name` должен быть строкой. -# - `level` должен быть валидным уровнем логирования (например, `logging.INFO`). -# POSTCONDITIONS: -# - Создает и настраивает логгер с указанным именем и уровнем. -# - Добавляет обработчики для вывода в файл (если указан `log_dir`) и в консоль (если `console=True`). -# - Очищает все предыдущие обработчики для данного логгера, чтобы избежать дублирования. -# PARAMETERS: -# - name: str - Имя логгера. -# - log_dir: Optional[Path] - Директория для сохранения лог-файлов. -# - level: int - Уровень логирования. -# - console: bool - Флаг для включения вывода в консоль. +# -------------------------------------------------------------- +# [ENTITY: Service('SupersetLogger')] +# -------------------------------------------------------------- +""" +:purpose: Универсальная обёртка над ``logging.Logger``. Позволяет: + • задавать уровень и вывод в консоль/файл, + • передавать произвольные ``extra``‑поля, + • использовать привычный API (info, debug, warning, error, + critical, exception) без «падения» при неверных аргументах. +:preconditions: + - ``name`` – строка‑идентификатор логгера, + - ``level`` – валидный уровень из ``logging``, + - ``log_dir`` – при указании директория, куда будет писаться файл‑лог. +:postconditions: + - Создан полностью сконфигурированный ``logging.Logger`` без + дублирующих обработчиков. +""" class SupersetLogger: + """ + :ivar logging.Logger logger: Внутренний стандартный логгер. + :ivar bool propagate: Отключаем наследование записей, чтобы + сообщения не «проваливались» выше. + """ + + # -------------------------------------------------------------- + # [ENTITY: Method('__init__')] + # -------------------------------------------------------------- + """ + :purpose: Конфигурировать базовый логгер, добавить обработчики + консоли и/или файла, очистить прежние обработчики. + :preconditions: Параметры валидны. + :postconditions: ``self.logger`` готов к использованию. + """ def __init__( self, name: str = "superset_tool", log_dir: Optional[Path] = None, level: int = logging.INFO, - console: bool = True - ): + console: bool = True, + ) -> None: self.logger = logging.getLogger(name) self.logger.setLevel(level) + self.logger.propagate = False # ← не «прокидываем» записи выше - formatter = logging.Formatter( - '%(asctime)s - %(levelname)s - %(message)s' - ) + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") - # [ANCHOR] HANDLER_RESET - # Очищаем существующие обработчики, чтобы избежать дублирования вывода при повторной инициализации. + # ---- Очистка предыдущих обработчиков (важно при повторных инициализациях) ---- if self.logger.hasHandlers(): self.logger.handlers.clear() - # [ANCHOR] FILE_HANDLER + # ---- Файловый обработчик (если указана директория) ---- if log_dir: log_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d") file_handler = logging.FileHandler( - log_dir / f"{name}_{timestamp}.log", encoding='utf-8' + log_dir / f"{name}_{timestamp}.log", encoding="utf-8" ) file_handler.setFormatter(formatter) self.logger.addHandler(file_handler) - # [ANCHOR] CONSOLE_HANDLER + # ---- Консольный обработчик ---- if console: console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(formatter) self.logger.addHandler(console_handler) - # CONTRACT: - # PURPOSE: (HELPER) Генерирует строку с текущей датой для имени лог-файла. - # RETURN: str - Отформатированная дата (YYYYMMDD). - def _get_timestamp(self) -> str: - return datetime.now().strftime("%Y%m%d") - # END_FUNCTION__get_timestamp + # [END_ENTITY] - # [INTERFACE] Методы логирования - def info(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): - self.logger.info(message, extra=extra, exc_info=exc_info) + # -------------------------------------------------------------- + # [ENTITY: Method('_log')] + # -------------------------------------------------------------- + """ + :purpose: Универсальная вспомогательная обёртка над + ``logging.Logger.``. Принимает любые ``*args`` + (подстановочные параметры) и ``extra``‑словарь. + :preconditions: + - ``level_method`` – один из методов ``logger``, + - ``msg`` – строка‑шаблон, + - ``*args`` – значения для ``%``‑подстановок, + - ``extra`` – пользовательские атрибуты (может быть ``None``). + :postconditions: Запись в журнал выполнена. + """ + def _log( + self, + level_method: Any, + msg: str, + *args: Any, + extra: Optional[Mapping[str, Any]] = None, + exc_info: bool = False, + ) -> None: + if extra is not None: + level_method(msg, *args, extra=extra, exc_info=exc_info) + else: + level_method(msg, *args, exc_info=exc_info) - def error(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): - self.logger.error(message, extra=extra, exc_info=exc_info) + # [END_ENTITY] - def warning(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): - self.logger.warning(message, extra=extra, exc_info=exc_info) + # -------------------------------------------------------------- + # [ENTITY: Method('info')] + # -------------------------------------------------------------- + """ + :purpose: Записать сообщение уровня INFO. + """ + def info( + self, + msg: str, + *args: Any, + extra: Optional[Mapping[str, Any]] = None, + exc_info: bool = False, + ) -> None: + self._log(self.logger.info, msg, *args, extra=extra, exc_info=exc_info) + # [END_ENTITY] - def critical(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): - self.logger.critical(message, extra=extra, exc_info=exc_info) + # -------------------------------------------------------------- + # [ENTITY: Method('debug')] + # -------------------------------------------------------------- + """ + :purpose: Записать сообщение уровня DEBUG. + """ + def debug( + self, + msg: str, + *args: Any, + extra: Optional[Mapping[str, Any]] = None, + exc_info: bool = False, + ) -> None: + self._log(self.logger.debug, msg, *args, extra=extra, exc_info=exc_info) + # [END_ENTITY] - def debug(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): - self.logger.debug(message, extra=extra, exc_info=exc_info) + # -------------------------------------------------------------- + # [ENTITY: Method('warning')] + # -------------------------------------------------------------- + """ + :purpose: Записать сообщение уровня WARNING. + """ + def warning( + self, + msg: str, + *args: Any, + extra: Optional[Mapping[str, Any]] = None, + exc_info: bool = False, + ) -> None: + self._log(self.logger.warning, msg, *args, extra=extra, exc_info=exc_info) + # [END_ENTITY] - def exception(self, message: str, *args, **kwargs): - self.logger.exception(message, *args, **kwargs) -# END_CLASS_SupersetLogger + # -------------------------------------------------------------- + # [ENTITY: Method('error')] + # -------------------------------------------------------------- + """ + :purpose: Записать сообщение уровня ERROR. + """ + def error( + self, + msg: str, + *args: Any, + extra: Optional[Mapping[str, Any]] = None, + exc_info: bool = False, + ) -> None: + self._log(self.logger.error, msg, *args, extra=extra, exc_info=exc_info) + # [END_ENTITY] -# END_MODULE_logger + # -------------------------------------------------------------- + # [ENTITY: Method('critical')] + # -------------------------------------------------------------- + """ + :purpose: Записать сообщение уровня CRITICAL. + """ + def critical( + self, + msg: str, + *args: Any, + extra: Optional[Mapping[str, Any]] = None, + exc_info: bool = False, + ) -> None: + self._log(self.logger.critical, msg, *args, extra=extra, exc_info=exc_info) + # [END_ENTITY] + + # -------------------------------------------------------------- + # [ENTITY: Method('exception')] + # -------------------------------------------------------------- + """ + :purpose: Записать сообщение уровня ERROR вместе с трассировкой + текущего исключения (аналог ``logger.exception``). + """ + def exception(self, msg: str, *args: Any, **kwargs: Any) -> None: + self.logger.exception(msg, *args, **kwargs) + # [END_ENTITY] + +# -------------------------------------------------------------- +# [END_FILE logger.py] +# -------------------------------------------------------------- \ No newline at end of file diff --git a/superset_tool/utils/network.py b/superset_tool/utils/network.py index 062e5fd..67bf32d 100644 --- a/superset_tool/utils/network.py +++ b/superset_tool/utils/network.py @@ -99,7 +99,7 @@ class APIClient: "csrf_token": csrf_token } self._authenticated = True - self.logger.info("[INFO][APIClient.authenticate][SUCCESS] Authenticated successfully.") + self.logger.info(f"[INFO][APIClient.authenticate][SUCCESS] Authenticated successfully. Tokens {self._tokens}") return self._tokens except requests.exceptions.HTTPError as e: self.logger.error(f"[ERROR][APIClient.authenticate][FAILURE] Authentication failed: {e}") @@ -132,12 +132,13 @@ class APIClient: _headers = self.headers.copy() if headers: _headers.update(headers) + timeout = kwargs.pop('timeout', self.request_settings.get("timeout", DEFAULT_TIMEOUT)) try: response = self.session.request( method, full_url, headers=_headers, - timeout=self.request_settings.get("timeout", DEFAULT_TIMEOUT), + timeout=timeout, **kwargs ) response.raise_for_status() diff --git a/superset_tool/utils/whiptail_fallback.py b/superset_tool/utils/whiptail_fallback.py new file mode 100644 index 0000000..d2ef135 --- /dev/null +++ b/superset_tool/utils/whiptail_fallback.py @@ -0,0 +1,148 @@ +# [MODULE_PATH] superset_tool.utils.whiptail_fallback +# [FILE] whiptail_fallback.py +# [SEMANTICS] ui, fallback, console, utils, non‑interactive + +# -------------------------------------------------------------- +# [IMPORTS] +# -------------------------------------------------------------- +import sys +from typing import List, Tuple, Optional, Any +# [END_IMPORTS] + +# -------------------------------------------------------------- +# [ENTITY: Service('ConsoleUI')] +# -------------------------------------------------------------- +""" +:purpose: Плотный консольный UI‑fallback для всех функций, + которые в оригинальном проекте использовали ``whiptail``. + Всё взаимодействие теперь **не‑интерактивно**: функции, + выводящие сообщение, просто печатают его без ожидания + ``Enter``. +""" + +def menu( + title: str, + prompt: str, + choices: List[str], + backtitle: str = "Superset Migration Tool", +) -> Tuple[int, Optional[str]]: + """Return (rc, selected item). rc == 0 → OK.""" + print(f"\n=== {title} ===") + print(prompt) + for idx, item in enumerate(choices, 1): + print(f"{idx}) {item}") + + try: + raw = input("\nВведите номер (0 – отмена): ").strip() + sel = int(raw) + if sel == 0: + return 1, None + return 0, choices[sel - 1] + except Exception: + return 1, None + + +def checklist( + title: str, + prompt: str, + options: List[Tuple[str, str]], + backtitle: str = "Superset Migration Tool", +) -> Tuple[int, List[str]]: + """Return (rc, list of selected **values**).""" + print(f"\n=== {title} ===") + print(prompt) + for idx, (val, label) in enumerate(options, 1): + print(f"{idx}) [{val}] {label}") + + raw = input("\nВведите номера через запятую (пустой ввод → отказ): ").strip() + if not raw: + return 1, [] + + try: + indices = {int(x) for x in raw.split(",") if x.strip()} + selected = [options[i - 1][0] for i in indices if 0 < i <= len(options)] + return 0, selected + except Exception: + return 1, [] + + +def yesno( + title: str, + question: str, + backtitle: str = "Superset Migration Tool", +) -> bool: + """True → пользователь ответил «да». """ + ans = input(f"\n=== {title} ===\n{question} (y/n): ").strip().lower() + return ans in ("y", "yes", "да", "д") + + +def msgbox( + title: str, + msg: str, + width: int = 60, + height: int = 15, + backtitle: str = "Superset Migration Tool", +) -> None: + """Простой вывод сообщения – без ожидания Enter.""" + print(f"\n=== {title} ===\n{msg}\n") + # **Убрано:** input("Нажмите для продолжения...") + + +def inputbox( + title: str, + prompt: str, + backtitle: str = "Superset Migration Tool", +) -> Tuple[int, Optional[str]]: + """Return (rc, введённая строка). rc == 0 → успешно.""" + print(f"\n=== {title} ===") + val = input(f"{prompt}\n") + if val == "": + return 1, None + return 0, val + + +# -------------------------------------------------------------- +# [ENTITY: Service('ConsoleGauge')] +# -------------------------------------------------------------- +""" +:purpose: Минимальная имитация ``whiptail``‑gauge в консоли. +""" + +class _ConsoleGauge: + """Контекст‑менеджер для простого прогресс‑бара.""" + def __init__(self, title: str, width: int = 60, height: int = 10): + self.title = title + self.width = width + self.height = height + self._percent = 0 + + def __enter__(self): + print(f"\n=== {self.title} ===") + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + sys.stdout.write("\n") + sys.stdout.flush() + + def set_text(self, txt: str) -> None: + sys.stdout.write(f"\r{txt} ") + sys.stdout.flush() + + def set_percent(self, percent: int) -> None: + self._percent = percent + sys.stdout.write(f"{percent}%") + sys.stdout.flush() +# [END_ENTITY] + +def gauge( + title: str, + width: int = 60, + height: int = 10, +) -> Any: + """Always returns the console fallback gauge.""" + return _ConsoleGauge(title, width, height) +# [END_ENTITY] + +# -------------------------------------------------------------- +# [END_FILE whiptail_fallback.py] +# -------------------------------------------------------------- \ No newline at end of file diff --git a/whiptailtest.py b/whiptailtest.py new file mode 100644 index 0000000..16280e4 --- /dev/null +++ b/whiptailtest.py @@ -0,0 +1,29 @@ +# test_whiptail.py +from superset_tool.utils.whiptail_fallback import ( + menu, checklist, yesno, msgbox, inputbox, gauge, +) + +rc, env = menu('Тестовое меню', 'Выберите среду:', ['dev', 'prod']) +print('menu →', rc, env) + +rc, ids = checklist( + 'Тестовый чек‑лист', + 'Выберите пункты:', + [('1', 'Первый'), ('2', 'Второй'), ('3', 'Третий')], +) +print('checklist →', rc, ids) + +if yesno('Вопрос', 'Продолжить?'): + print('Ответ – ДА') +else: + print('Ответ – НЕТ') + +rc, txt = inputbox('Ввод', 'Введите произвольный текст:') +print('inputbox →', rc, txt) + +msgbox('Сообщение', 'Это просто тестовое сообщение.') + +with gauge('Прогресс‑бар') as g: + for i in range(0, 101, 20): + g.set_text(f'Шаг {i // 20 + 1}') + g.set_percent(i) \ No newline at end of file