backup worked

This commit is contained in:
Volobuev Andrey
2025-10-06 13:59:30 +03:00
parent 2f8aea3620
commit 8f6b44c679
7 changed files with 1144 additions and 480 deletions

View File

@@ -8,7 +8,7 @@
import logging
import sys
from pathlib import Path
from dataclasses import dataclass
from dataclasses import dataclass,field
# [IMPORTS] Third-party
from requests.exceptions import RequestException
@@ -37,17 +37,18 @@ class BackupConfig:
consolidate: bool = True
rotate_archive: bool = True
clean_folders: bool = True
retention_policy: RetentionPolicy = RetentionPolicy()
retention_policy: RetentionPolicy = field(default_factory=RetentionPolicy)
# [ENTITY: Function('backup_dashboards')]
# CONTRACT:
# PURPOSE: Выполняет бэкап всех доступных дашбордов для заданного клиента и окружения.
# PURPOSE: Выполняет бэкап всех доступных дашбордов для заданного клиента и окружения, пропуская ошибки экспорта.
# PRECONDITIONS:
# - `client` должен быть инициализированным экземпляром `SupersetClient`.
# - `env_name` должен быть строкой, обозначающей окружение.
# - `backup_root` должен быть валидным путем к корневой директории бэкапа.
# POSTCONDITIONS:
# - Дашборды экспортируются и сохраняются.
# - Ошибки экспорта логируются и не приводят к остановке скрипта.
# - Возвращает `True` если все дашборды были экспортированы без критических ошибок, `False` иначе.
def backup_dashboards(
client: SupersetClient,
@@ -90,7 +91,9 @@ def backup_dashboards(
success_count += 1
except (SupersetAPIError, RequestException, IOError, OSError) as db_error:
logger.error(f"[STATE][backup_dashboards][FAILURE] Failed to export dashboard {dashboard_title}: {db_error}", exc_info=True)
logger.error(f"[STATE][backup_dashboards][FAILURE] Failed to export dashboard {dashboard_title} (ID: {dashboard_id}): {db_error}", exc_info=True)
# Продолжаем обработку других дашбордов
continue
if config.consolidate:
consolidate_archive_folders(backup_root / env_name , logger=logger)
@@ -125,6 +128,7 @@ def main() -> int:
backup_config = BackupConfig(rotate_archive=True)
for env in environments:
try:
results[env] = backup_dashboards(
clients[env],
env.upper(),
@@ -132,6 +136,10 @@ def main() -> int:
logger=logger,
config=backup_config
)
except Exception as env_error:
logger.critical(f"[STATE][main][FAILURE] Critical error for environment {env}: {env_error}", exc_info=True)
# Продолжаем обработку других окружений
results[env] = False
if not all(results.values()):
exit_code = 1

View File

@@ -1,237 +1,442 @@
# -*- coding: utf-8 -*-
# CONTRACT:
# PURPOSE: Интерактивный скрипт для миграции ассетов Superset между различными окружениями.
# SPECIFICATION_LINK: mod_migration_script
# PRECONDITIONS: Наличие корректных конфигурационных файлов для подключения к Superset.
# POSTCONDITIONS: Выбранные ассеты успешно перенесены из исходного в целевое окружение.
# IMPORTS: [argparse, superset_tool.client, superset_tool.utils.init_clients, superset_tool.utils.logger, superset_tool.utils.fileio]
"""
[MODULE] Superset Migration Tool
@description: Интерактивный скрипт для миграции ассетов Superset между различными окружениями.
"""
from whiptail import Whiptail
# [MODULE_PATH] superset_tool.migration_script
# [FILE] migration_script.py
# [SEMANTICS] migration, cli, superset, ui, logging, fallback, error-recovery, non-interactive, temp-files, batch-delete
# --------------------------------------------------------------
# [IMPORTS]
# --------------------------------------------------------------
import json
import logging
import sys
import zipfile
from pathlib import Path
from typing import List, Optional, Tuple, Dict
from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import setup_clients
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.fileio import (
save_and_unpack_dashboard,
read_dashboard_from_disk,
create_temp_file, # новый контекстный менеджер
update_yamls,
create_dashboard_export
create_dashboard_export,
)
from superset_tool.utils.whiptail_fallback import (
menu,
checklist,
yesno,
msgbox,
inputbox,
gauge,
)
# [ENTITY: Class('Migration')]
# CONTRACT:
# PURPOSE: Инкапсулирует логику и состояние процесса миграции.
# SPECIFICATION_LINK: class_migration
# ATTRIBUTES:
# - name: logger, type: SupersetLogger, description: Экземпляр логгера.
# - name: from_c, type: SupersetClient, description: Клиент для исходного окружения.
# - name: to_c, type: SupersetClient, description: Клиент для целевого окружения.
# - name: dashboards_to_migrate, type: list, description: Список дашбордов для миграции.
# - name: db_config_replacement, type: dict, description: Конфигурация для замены данных БД.
from superset_tool.utils.logger import SupersetLogger # type: ignore
# [END_IMPORTS]
# --------------------------------------------------------------
# [ENTITY: Service('Migration')]
# [RELATION: Service('Migration')] -> [DEPENDS_ON] -> [PythonModule('superset_tool.client')]
# --------------------------------------------------------------
"""
:purpose: Интерактивный процесс миграции дашбордов с возможностью
«удалить‑и‑перезаписать» при ошибке импорта.
:preconditions:
- Конфигурация Supersetклиентов доступна,
- Пользователь может взаимодействовать через консольный UI.
:postconditions:
- Выбранные дашборды импортированы в целевое окружение.
:sideeffect: Записывает журнал в каталог ``logs/`` текущего рабочего каталога.
"""
class Migration:
"""
Класс для управления процессом миграции дашбордов Superset.
:ivar SupersetLogger logger: Логгер.
:ivar bool enable_delete_on_failure: Флаг «удалять‑при‑ошибке».
:ivar SupersetClient from_c: Клиент‑источник.
:ivar SupersetClient to_c: Клиент‑назначение.
:ivar List[dict] dashboards_to_migrate: Список выбранных дашбордов.
:ivar Optional[dict] db_config_replacement: Параметры замены имён БД.
:ivar List[dict] _failed_imports: Внутренний буфер неудавшихся импортов
(ключи: slug, zip_content, dash_id).
"""
def __init__(self):
self.logger = SupersetLogger(name="migration_script")
self.from_c: SupersetClient = None
self.to_c: SupersetClient = None
self.dashboards_to_migrate = []
self.db_config_replacement = None
# END_FUNCTION___init__
# [ENTITY: Function('run')]
# CONTRACT:
# PURPOSE: Запускает основной воркфлоу миграции, координируя все шаги.
# SPECIFICATION_LINK: func_run_migration
# PRECONDITIONS: None
# POSTCONDITIONS: Процесс миграции завершен.
def run(self):
"""Запускает основной воркфлоу миграции."""
# --------------------------------------------------------------
# [ENTITY: Method('__init__')]
# --------------------------------------------------------------
"""
:purpose: Создать сервис миграции и настроить логгер.
:preconditions: None.
:postconditions: ``self.logger`` готов к использованию; ``enable_delete_on_failure`` = ``False``.
"""
def __init__(self) -> None:
default_log_dir = Path.cwd() / "logs"
self.logger = SupersetLogger(
name="migration_script",
log_dir=default_log_dir,
level=logging.INFO,
console=True,
)
self.enable_delete_on_failure = False
self.from_c: Optional[SupersetClient] = None
self.to_c: Optional[SupersetClient] = None
self.dashboards_to_migrate: List[dict] = []
self.db_config_replacement: Optional[dict] = None
self._failed_imports: List[dict] = [] # <-- буфер ошибок
assert self.logger is not None, "Logger must be instantiated."
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('run')]
# --------------------------------------------------------------
"""
:purpose: Точка входа последовательный запуск всех шагов миграции.
:preconditions: Логгер готов.
:postconditions: Скрипт завершён, пользователю выведено сообщение.
"""
def run(self) -> None:
self.logger.info("[INFO][run][ENTER] Запуск скрипта миграции.")
self.ask_delete_on_failure()
self.select_environments()
self.select_dashboards()
self.confirm_db_config_replacement()
self.execute_migration()
self.logger.info("[INFO][run][EXIT] Скрипт миграции завершен.")
# END_FUNCTION_run
self.logger.info("[INFO][run][EXIT] Скрипт миграции завершён.")
# [END_ENTITY]
# [ENTITY: Function('select_environments')]
# CONTRACT:
# PURPOSE: Шаг 1. Обеспечивает интерактивный выбор исходного и целевого окружений.
# SPECIFICATION_LINK: func_select_environments
# PRECONDITIONS: None
# POSTCONDITIONS: Атрибуты `self.from_c` и `self.to_c` инициализированы валидными клиентами Superset.
def select_environments(self):
"""Шаг 1: Выбор окружений (источник и назначение)."""
self.logger.info("[INFO][select_environments][ENTER] Шаг 1/4: Выбор окружений.")
# --------------------------------------------------------------
# [ENTITY: Method('ask_delete_on_failure')]
# --------------------------------------------------------------
"""
:purpose: Запросить у пользователя, следует ли удалять дашборд при ошибке импорта.
:preconditions: None.
:postconditions: ``self.enable_delete_on_failure`` установлен.
"""
def ask_delete_on_failure(self) -> None:
self.enable_delete_on_failure = yesno(
"Поведение при ошибке импорта",
"Если импорт завершится ошибкой, удалить существующий дашборд и попытаться импортировать заново?",
)
self.logger.info(
"[INFO][ask_delete_on_failure] Deleteonfailure = %s",
self.enable_delete_on_failure,
)
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('select_environments')]
# --------------------------------------------------------------
"""
:purpose: Выбрать исходное и целевое окружения Superset.
:preconditions: ``setup_clients`` успешно инициализирует все клиенты.
:postconditions: ``self.from_c`` и ``self.to_c`` установлены.
"""
def select_environments(self) -> None:
self.logger.info("[INFO][select_environments][ENTER] Шаг1/5: Выбор окружений.")
try:
all_clients = setup_clients(self.logger)
available_envs = list(all_clients.keys())
except Exception as e:
self.logger.error(f"[ERROR][select_environments][FAILURE] Ошибка при инициализации клиентов: {e}", exc_info=True)
w = Whiptail(title="Ошибка", backtitle="Superset Migration Tool")
w.msgbox("Не удалось инициализировать клиенты. Проверьте конфигурацию.")
self.logger.error("[ERROR][select_environments] %s", e, exc_info=True)
msgbox("Ошибка", "Не удалось инициализировать клиенты.")
return
w = Whiptail(title="Выбор окружения", backtitle="Superset Migration Tool")
# Select source environment
(return_code, from_env_name) = w.menu("Выберите исходное окружение:", available_envs)
if return_code == 0:
rc, from_env_name = menu(
title="Выбор окружения",
prompt="Исходное окружение:",
choices=available_envs,
)
if rc != 0:
return
self.from_c = all_clients[from_env_name]
self.logger.info(f"[INFO][select_environments][STATE] Исходное окружение: {from_env_name}")
else:
return
self.logger.info("[INFO][select_environments] from = %s", from_env_name)
# Select target environment
available_envs.remove(from_env_name)
(return_code, to_env_name) = w.menu("Выберите целевое окружение:", available_envs)
if return_code == 0:
rc, to_env_name = menu(
title="Выбор окружения",
prompt="Целевое окружение:",
choices=available_envs,
)
if rc != 0:
return
self.to_c = all_clients[to_env_name]
self.logger.info(f"[INFO][select_environments][STATE] Целевое окружение: {to_env_name}")
else:
return
self.logger.info("[INFO][select_environments][EXIT] Шаг 1 завершен.")
# END_FUNCTION_select_environments
# [ENTITY: Function('select_dashboards')]
# CONTRACT:
# PURPOSE: Шаг 2. Обеспечивает интерактивный выбор дашбордов для миграции.
# SPECIFICATION_LINK: func_select_dashboards
# PRECONDITIONS: `self.from_c` должен быть инициализирован.
# POSTCONDITIONS: `self.dashboards_to_migrate` содержит список выбранных дашбордов.
def select_dashboards(self):
"""Шаг 2: Выбор дашбордов для миграции."""
self.logger.info("[INFO][select_dashboards][ENTER] Шаг 2/4: Выбор дашбордов.")
self.logger.info("[INFO][select_environments] to = %s", to_env_name)
self.logger.info("[INFO][select_environments][EXIT] Шаг1 завершён.")
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('select_dashboards')]
# --------------------------------------------------------------
"""
:purpose: Позволить пользователю выбрать набор дашбордов для миграции.
:preconditions: ``self.from_c`` инициализирован.
:postconditions: ``self.dashboards_to_migrate`` заполнен.
"""
def select_dashboards(self) -> None:
self.logger.info("[INFO][select_dashboards][ENTER] Шаг2/5: Выбор дашбордов.")
try:
_, all_dashboards = self.from_c.get_dashboards()
_, all_dashboards = self.from_c.get_dashboards() # type: ignore[attr-defined]
if not all_dashboards:
self.logger.warning("[WARN][select_dashboards][STATE] В исходном окружении не найдено дашбордов.")
w = Whiptail(title="Информация", backtitle="Superset Migration Tool")
w.msgbox("В исходном окружении не найдено дашбордов.")
self.logger.warning("[WARN][select_dashboards] No dashboards.")
msgbox("Информация", "В исходном окружении нет дашбордов.")
return
w = Whiptail(title="Выбор дашбордов", backtitle="Superset Migration Tool")
options = [("ALL", "Все дашборды")] + [
(str(d["id"]), d["dashboard_title"]) for d in all_dashboards
]
dashboard_options = [(str(d['id']), d['dashboard_title']) for d in all_dashboards]
rc, selected = checklist(
title="Выбор дашбордов",
prompt="Отметьте нужные дашборды (введите номера):",
options=options,
)
if rc != 0:
return
(return_code, selected_ids) = w.checklist("Выберите дашборды для миграции:", dashboard_options)
if return_code == 0:
self.dashboards_to_migrate = [d for d in all_dashboards if str(d['id']) in selected_ids]
self.logger.info(f"[INFO][select_dashboards][STATE] Выбрано дашбордов: {len(self.dashboards_to_migrate)}")
if "ALL" in selected:
self.dashboards_to_migrate = list(all_dashboards)
self.logger.info(
"[INFO][select_dashboards] Выбраны все дашборды (%d).",
len(self.dashboards_to_migrate),
)
return
self.dashboards_to_migrate = [
d for d in all_dashboards if str(d["id"]) in selected
]
self.logger.info(
"[INFO][select_dashboards] Выбрано %d дашбордов.",
len(self.dashboards_to_migrate),
)
except Exception as e:
self.logger.error(f"[ERROR][select_dashboards][FAILURE] Ошибка при получении или выборе дашбордов: {e}", exc_info=True)
w = Whiptail(title="Ошибка", backtitle="Superset Migration Tool")
w.msgbox("Произошла ошибка при работе с дашбордами.")
self.logger.error("[ERROR][select_dashboards] %s", e, exc_info=True)
msgbox("Ошибка", "Не удалось получить список дашбордов.")
self.logger.info("[INFO][select_dashboards][EXIT] Шаг2 завершён.")
# [END_ENTITY]
self.logger.info("[INFO][select_dashboards][EXIT] Шаг 2 завершен.")
# END_FUNCTION_select_dashboards
# [ENTITY: Function('confirm_db_config_replacement')]
# CONTRACT:
# PURPOSE: Шаг 3. Управляет процессом подтверждения и настройки замены конфигураций БД.
# SPECIFICATION_LINK: func_confirm_db_config_replacement
# PRECONDITIONS: `self.from_c` и `self.to_c` инициализированы.
# POSTCONDITIONS: `self.db_config_replacement` содержит конфигурацию для замены или `None`.
def confirm_db_config_replacement(self):
"""Шаг 3: Подтверждение и настройка замены конфигурации БД."""
self.logger.info("[INFO][confirm_db_config_replacement][ENTER] Шаг 3/4: Замена конфигурации БД.")
w = Whiptail(title="Замена конфигурации БД", backtitle="Superset Migration Tool")
if w.yesno("Хотите ли вы заменить конфигурации баз данных в YAML-файлах?"):
(return_code, old_db_name) = w.inputbox("Введите имя заменяемой базы данных (например, db_dev):")
if return_code != 0:
# --------------------------------------------------------------
# [ENTITY: Method('confirm_db_config_replacement')]
# --------------------------------------------------------------
"""
:purpose: Запросить у пользователя, требуется ли заменить имена БД в YAMLфайлах.
:preconditions: None.
:postconditions: ``self.db_config_replacement`` либо ``None``, либо заполнен.
"""
def confirm_db_config_replacement(self) -> None:
if yesno("Замена БД", "Заменить конфигурацию БД в YAMLфайлах?"):
rc, old_name = inputbox("Замена БД", "Старое имя БД (например, db_dev):")
if rc != 0:
return
(return_code, new_db_name) = w.inputbox("Введите новое имя базы данных (например, db_prod):")
if return_code != 0:
rc, new_name = inputbox("Замена БД", "Новое имя БД (например, db_prod):")
if rc != 0:
return
self.db_config_replacement = {"old": {"database_name": old_db_name}, "new": {"database_name": new_db_name}}
self.logger.info(f"[INFO][confirm_db_config_replacement][STATE] Установлена ручная замена: {self.db_config_replacement}")
self.db_config_replacement = {
"old": {"database_name": old_name},
"new": {"database_name": new_name},
}
self.logger.info(
"[INFO][confirm_db_config_replacement] Replacement set: %s",
self.db_config_replacement,
)
else:
self.logger.info("[INFO][confirm_db_config_replacement][STATE] Замена конфигурации БД пропущена.")
self.logger.info("[INFO][confirm_db_config_replacement] Skipped.")
# [END_ENTITY]
self.logger.info("[INFO][confirm_db_config_replacement][EXIT] Шаг 3 завершен.")
# END_FUNCTION_confirm_db_config_replacement
# [ENTITY: Function('execute_migration')]
# CONTRACT:
# PURPOSE: Шаг 4. Выполняет фактическую миграцию выбранных дашбордов.
# SPECIFICATION_LINK: func_execute_migration
# PRECONDITIONS: Все предыдущие шаги (`select_environments`, `select_dashboards`) успешно выполнены.
# POSTCONDITIONS: Выбранные дашборды перенесены в целевое окружение.
def execute_migration(self):
"""Шаг 4: Выполнение миграции и обновления конфигураций."""
self.logger.info("[INFO][execute_migration][ENTER] Шаг 4/4: Выполнение миграции.")
w = Whiptail(title="Выполнение миграции", backtitle="Superset Migration Tool")
if not self.dashboards_to_migrate:
self.logger.warning("[WARN][execute_migration][STATE] Нет дашбордов для миграции.")
w.msgbox("Нет дашбордов для миграции. Завершение.")
# --------------------------------------------------------------
# [ENTITY: Method('_batch_delete_by_ids')]
# --------------------------------------------------------------
"""
:purpose: Удалить набор дашбордов по их ID единым запросом.
:preconditions:
- ``ids`` непустой список целых чисел.
:postconditions: Все указанные дашборды удалены (если они существовали).
:sideeffect: Делает HTTPзапрос ``DELETE /dashboard/?q=[ids]``.
"""
def _batch_delete_by_ids(self, ids: List[int]) -> None:
if not ids:
self.logger.debug("[DEBUG][_batch_delete_by_ids] Empty ID list nothing to delete.")
return
total_dashboards = len(self.dashboards_to_migrate)
self.logger.info(f"[INFO][execute_migration][STATE] Начало миграции {total_dashboards} дашбордов.")
with w.gauge("Выполняется миграция...", width=60, height=10) as gauge:
for i, dashboard in enumerate(self.dashboards_to_migrate):
self.logger.info("[INFO][_batch_delete_by_ids] Deleting dashboards IDs: %s", ids)
# Формируем параметр q в виде JSONмассива, как требует Superset.
q_param = json.dumps(ids)
response = self.to_c.network.request(
method="DELETE",
endpoint="/dashboard/",
params={"q": q_param},
)
# Superset обычно отвечает 200/204; проверяем поле ``result`` при наличии.
if isinstance(response, dict) and response.get("result", True) is False:
self.logger.warning("[WARN][_batch_delete_by_ids] Unexpected delete response: %s", response)
else:
self.logger.info("[INFO][_batch_delete_by_ids] Delete request completed.")
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('execute_migration')]
# --------------------------------------------------------------
"""
:purpose: Выполнить экспорт‑импорт выбранных дашбордов, при необходимости
обновив YAMLфайлы. При ошибке импортов сохраняем slug, а потом
удаляем проблемные дашборды **по ID**, получив их через slug.
:preconditions:
- ``self.dashboards_to_migrate`` не пуст,
- ``self.from_c`` и ``self.to_c`` инициализированы.
:postconditions:
- Все успешные дашборды импортированы,
- Неудачные дашборды, если пользователь выбрал «удалять‑при‑ошибке»,
удалены и повторно импортированы.
:sideeffect: При включённом флаге ``enable_delete_on_failure`` производится
батч‑удаление и повторный импорт.
"""
def execute_migration(self) -> None:
if not self.dashboards_to_migrate:
self.logger.warning("[WARN][execute_migration] No dashboards to migrate.")
msgbox("Информация", "Нет дашбордов для миграции.")
return
total = len(self.dashboards_to_migrate)
self.logger.info("[INFO][execute_migration] Starting migration of %d dashboards.", total)
# Передаём режим клиенту‑назначению
self.to_c.delete_before_reimport = self.enable_delete_on_failure # type: ignore[attr-defined]
# -----------------------------------------------------------------
# 1⃣ Основной проход экспорт → импорт → сбор ошибок
# -----------------------------------------------------------------
with gauge("Миграция...", width=60, height=10) as g:
for i, dash in enumerate(self.dashboards_to_migrate):
dash_id = dash["id"]
dash_slug = dash.get("slug") # slug нужен для дальнейшего поиска
title = dash["dashboard_title"]
progress = int((i / total) * 100)
g.set_text(f"Миграция: {title} ({i + 1}/{total})")
g.set_percent(progress)
try:
dashboard_id = dashboard['id']
dashboard_title = dashboard['dashboard_title']
# ------------------- Экспорт -------------------
exported_content, _ = self.from_c.export_dashboard(dash_id) # type: ignore[attr-defined]
progress = int((i / total_dashboards) * 100)
self.logger.debug(f"[DEBUG][execute_migration][PROGRESS] {progress}% - Миграция: {dashboard_title}")
gauge.set_text(f"Миграция: {dashboard_title} ({i+1}/{total_dashboards})")
gauge.set_percent(progress)
# ------------------- Временный ZIP -------------------
with create_temp_file(
content=exported_content,
suffix=".zip",
logger=self.logger,
) as tmp_zip_path:
self.logger.debug("[DEBUG][temp_zip] Temporary ZIP at %s", tmp_zip_path)
self.logger.info(f"[INFO][execute_migration][PROGRESS] Миграция дашборда: {dashboard_title} (ID: {dashboard_id})")
# ------------------- Распаковка во временный каталог -------------------
with create_temp_file(suffix=".dir", logger=self.logger) as tmp_unpack_dir:
self.logger.debug("[DEBUG][temp_dir] Temporary unpack dir: %s", tmp_unpack_dir)
# 1. Экспорт
exported_content, _ = self.from_c.export_dashboard(dashboard_id)
zip_path, unpacked_path = save_and_unpack_dashboard(exported_content, f"temp_export_{dashboard_id}", unpack=True)
self.logger.info(f"[INFO][execute_migration][STATE] Дашборд экспортирован и распакован в {unpacked_path}")
with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref:
zip_ref.extractall(tmp_unpack_dir)
self.logger.info("[INFO][execute_migration] Export unpacked to %s", tmp_unpack_dir)
# 2. Обновление YAML, если нужно
# ------------------- YAMLобновление (если нужно) -------------------
if self.db_config_replacement:
update_yamls(db_configs=[self.db_config_replacement], path=str(unpacked_path))
self.logger.info(f"[INFO][execute_migration][STATE] YAML-файлы обновлены.")
update_yamls(
db_configs=[self.db_config_replacement],
path=str(tmp_unpack_dir),
)
self.logger.info("[INFO][execute_migration] YAMLfiles updated.")
# 3. Упаковка и импорт
new_zip_path = f"migrated_dashboard_{dashboard_id}.zip"
create_dashboard_export(new_zip_path, [str(unpacked_path)])
# ------------------- Сборка нового ZIP -------------------
with create_temp_file(suffix=".zip", logger=self.logger) as tmp_new_zip:
create_dashboard_export(
zip_path=tmp_new_zip,
source_paths=[str(tmp_unpack_dir)],
)
self.logger.info("[INFO][execute_migration] Repacked to %s", tmp_new_zip)
self.to_c.import_dashboard(new_zip_path)
self.logger.info(f"[INFO][execute_migration][SUCCESS] Дашборд {dashboard_title} успешно импортирован.")
# ------------------- Импорт -------------------
self.to_c.import_dashboard(
file_name=tmp_new_zip,
dash_id=dash_id,
dash_slug=dash_slug,
) # type: ignore[attr-defined]
except Exception as e:
self.logger.error(f"[ERROR][execute_migration][FAILURE] Ошибка при миграции дашборда {dashboard_title}: {e}", exc_info=True)
error_msg = f"Не удалось смигрировать дашборд: {dashboard_title}.\n\nОшибка: {e}"
w.msgbox(error_msg, width=60, height=15)
# Если импорт прошёл без исключений фиксируем успех
self.logger.info("[INFO][execute_migration][SUCCESS] Dashboard %s imported.", title)
gauge.set_percent(100)
except Exception as exc:
# Сохраняем данные для повторного импорта после batchудаления
self.logger.error("[ERROR][execute_migration] %s", exc, exc_info=True)
self._failed_imports.append(
{
"slug": dash_slug,
"dash_id": dash_id,
"zip_content": exported_content,
}
)
msgbox("Ошибка", f"Не удалось мигрировать дашборд {title}.\n\n{exc}")
self.logger.info("[INFO][execute_migration][STATE] Миграция завершена.")
w.msgbox("Миграция завершена!", width=40, height=8)
self.logger.info("[INFO][execute_migration][EXIT] Шаг 4 завершен.")
# END_FUNCTION_execute_migration
g.set_percent(100)
# END_CLASS_Migration
# -----------------------------------------------------------------
# 2⃣ Если возникли ошибки и пользователь согласился удалять удаляем и повторяем
# -----------------------------------------------------------------
if self.enable_delete_on_failure and self._failed_imports:
self.logger.info(
"[INFO][execute_migration] %d dashboards failed. Starting recovery procedure.",
len(self._failed_imports),
)
# [MAIN_EXECUTION_BLOCK]
# ------------------- Получаем список дашбордов в целевом окружении -------------------
_, target_dashboards = self.to_c.get_dashboards() # type: ignore[attr-defined]
slug_to_id: Dict[str, int] = {
d["slug"]: d["id"] for d in target_dashboards if "slug" in d and "id" in d
}
# ------------------- Формируем список IDов для удаления -------------------
ids_to_delete: List[int] = []
for fail in self._failed_imports:
slug = fail["slug"]
if slug and slug in slug_to_id:
ids_to_delete.append(slug_to_id[slug])
else:
self.logger.warning(
"[WARN][execute_migration] Unable to map slug '%s' to ID on target.",
slug,
)
# ------------------- Batchудаление -------------------
self._batch_delete_by_ids(ids_to_delete)
# ------------------- Повторный импорт только для проблемных дашбордов -------------------
for fail in self._failed_imports:
dash_slug = fail["slug"]
dash_id = fail["dash_id"]
zip_content = fail["zip_content"]
# Один раз создаём временный ZIPфайл из сохранённого содержимого
with create_temp_file(
content=zip_content,
suffix=".zip",
logger=self.logger,
) as retry_zip_path:
self.logger.debug("[DEBUG][retry_zip] Retry ZIP for slug %s at %s", dash_slug, retry_zip_path)
# Пере‑импортируем **slug** передаётся, но клиент будет использовать ID
self.to_c.import_dashboard(
file_name=retry_zip_path,
dash_id=dash_id,
dash_slug=dash_slug,
) # type: ignore[attr-defined]
self.logger.info("[INFO][execute_migration][RECOVERED] Dashboard slug '%s' reimported.", dash_slug)
# -----------------------------------------------------------------
# 3⃣ Финальная отчётность
# -----------------------------------------------------------------
self.logger.info("[INFO][execute_migration] Migration finished.")
msgbox("Информация", "Миграция завершена!")
# [END_ENTITY]
# [END_ENTITY: Service('Migration')]
# --------------------------------------------------------------
# Точка входа
# --------------------------------------------------------------
if __name__ == "__main__":
migration = Migration()
migration.run()
# END_MAIN_EXECUTION_BLOCK
# END_MODULE_migration_script
Migration().run()
# [END_FILE migration_script.py]
# --------------------------------------------------------------

View File

@@ -1,82 +1,106 @@
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument
"""
[MODULE] Superset API Client
@contract: Реализует полное взаимодействие с Superset API
"""
# [MODULE_PATH] superset_tool.client
# [FILE] client.py
# [SEMANTICS] superset, api, client, logging, error-handling, slug-support
# [IMPORTS] Стандартная библиотека
# --------------------------------------------------------------
# [IMPORTS]
# --------------------------------------------------------------
import json
from typing import Optional, Dict, Tuple, List, Any, Union
import datetime
from pathlib import Path
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from requests import Response
# [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig
from superset_tool.exceptions import (
ExportError,
InvalidZipFormatError
)
from superset_tool.exceptions import ExportError, InvalidZipFormatError
from superset_tool.utils.fileio import get_filename_from_headers
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.network import APIClient
# [END_IMPORTS]
# [CONSTANTS]
DEFAULT_TIMEOUT = 30
# [TYPE-ALIASES]
JsonType = Union[Dict[str, Any], List[Dict[str, Any]]]
ResponseType = Tuple[bytes, str]
# --------------------------------------------------------------
# [ENTITY: Service('SupersetClient')]
# [RELATION: Service('SupersetClient')] -> [DEPENDS_ON] -> [PythonModule('superset_tool.utils.network')]
# --------------------------------------------------------------
"""
:purpose: Класс‑обёртка над Superset RESTAPI.
:preconditions:
- ``config`` валидный объект :class:`SupersetConfig`.
- Доступен рабочий HTTPклиент :class:`APIClient`.
:postconditions:
- Объект готов к выполнению запросов (GET, POST, DELETE и т.д.).
:raises:
- :class:`TypeError` при передаче неверного типа конфигурации.
"""
class SupersetClient:
"""[MAIN-CONTRACT] Клиент для работы с Superset API"""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация клиента Superset.
# PRECONDITIONS: `config` должен быть валидным `SupersetConfig`.
# POSTCONDITIONS: Клиент успешно инициализирован.
"""
:ivar SupersetLogger logger: Логгер, используемый в клиенте.
:ivar SupersetConfig config: Текущая конфигурация подключения.
:ivar APIClient network: Объект‑обёртка над ``requests``.
:ivar bool delete_before_reimport: Флаг, указывающий,
что при ошибке импорта дашборд следует удалить и повторить импорт.
"""
# --------------------------------------------------------------
# [ENTITY: Method('__init__')]
# --------------------------------------------------------------
"""
:purpose: Инициализировать клиент и передать ему логгер.
:preconditions: ``config`` экземпляр :class:`SupersetConfig`.
:postconditions: Атрибуты ``logger``, ``config`` и ``network`` созданы,
``delete_before_reimport`` установлен в ``False``.
"""
def __init__(self, config: SupersetConfig, logger: Optional[SupersetLogger] = None):
self.logger = logger or SupersetLogger(name="SupersetClient")
self.logger.info("[INFO][SupersetClient.__init__][ENTER] Initializing SupersetClient.")
self.logger.info("[INFO][SupersetClient.__init__] Initializing SupersetClient.")
self._validate_config(config)
self.config = config
self.env = config.env
self.network = APIClient(
config=config.dict(),
verify_ssl=config.verify_ssl,
timeout=config.timeout,
logger=self.logger
logger=self.logger,
)
self.logger.info("[INFO][SupersetClient.__init__][SUCCESS] SupersetClient initialized successfully.")
# END_FUNCTION___init__
self.delete_before_reimport: bool = False
self.logger.info("[INFO][SupersetClient.__init__] SupersetClient initialized.")
# [END_ENTITY]
# [ENTITY: Function('_validate_config')]
# CONTRACT:
# PURPOSE: Валидация конфигурации клиента.
# PRECONDITIONS: `config` должен быть экземпляром `SupersetConfig`.
# POSTCONDITIONS: Конфигурация валидна.
# --------------------------------------------------------------
# [ENTITY: Method('_validate_config')]
# --------------------------------------------------------------
"""
:purpose: Проверить, что передан объект :class:`SupersetConfig`.
:preconditions: ``config`` произвольный объект.
:postconditions: При несовпадении типов возбуждается :class:`TypeError`.
"""
def _validate_config(self, config: SupersetConfig) -> None:
self.logger.debug("[DEBUG][SupersetClient._validate_config][ENTER] Validating config.")
self.logger.debug("[DEBUG][_validate_config][ENTER] Validating SupersetConfig.")
if not isinstance(config, SupersetConfig):
self.logger.error("[ERROR][SupersetClient._validate_config][FAILURE] Invalid config type.")
self.logger.error("[ERROR][_validate_config][FAILURE] Invalid config type.")
raise TypeError("Конфигурация должна быть экземпляром SupersetConfig")
self.logger.debug("[DEBUG][SupersetClient._validate_config][SUCCESS] Config validated.")
# END_FUNCTION__validate_config
self.logger.debug("[DEBUG][_validate_config][SUCCESS] Config is valid.")
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Property('headers')]
# --------------------------------------------------------------
@property
def headers(self) -> dict:
"""[INTERFACE] Базовые заголовки для API-вызовов."""
"""Базовые HTTPзаголовки, используемые клиентом."""
return self.network.headers
# END_FUNCTION_headers
# [END_ENTITY]
# [ENTITY: Function('get_dashboards')]
# CONTRACT:
# PURPOSE: Получение списка дашбордов с пагинацией.
# PRECONDITIONS: None
# POSTCONDITIONS: Возвращает кортеж с общим количеством и списком дашбордов.
# --------------------------------------------------------------
# [ENTITY: Method('get_dashboards')]
# --------------------------------------------------------------
"""
:purpose: Получить список дашбордов с поддержкой пагинации.
:preconditions: None.
:postconditions: Возвращается кортеж ``(total_count, list_of_dashboards)``.
"""
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
self.logger.info("[INFO][SupersetClient.get_dashboards][ENTER] Getting dashboards.")
self.logger.info("[INFO][get_dashboards][ENTER] Fetching dashboards.")
validated_query = self._validate_query_params(query)
total_count = self._fetch_total_object_count(endpoint="/dashboard/")
paginated_data = self._fetch_all_pages(
@@ -85,236 +109,368 @@ class SupersetClient:
"base_query": validated_query,
"total_count": total_count,
"results_field": "result",
}
},
)
self.logger.info("[INFO][SupersetClient.get_dashboards][SUCCESS] Got dashboards.")
self.logger.info("[INFO][get_dashboards][SUCCESS] Got dashboards.")
return total_count, paginated_data
# END_FUNCTION_get_dashboards
# [END_ENTITY]
# [ENTITY: Function('get_dashboard')]
# CONTRACT:
# PURPOSE: Получение метаданных дашборда по ID или SLUG.
# PRECONDITIONS: `dashboard_id_or_slug` должен существовать.
# POSTCONDITIONS: Возвращает метаданные дашборда.
def get_dashboard(self, dashboard_id_or_slug: str) -> dict:
self.logger.info(f"[INFO][SupersetClient.get_dashboard][ENTER] Getting dashboard: {dashboard_id_or_slug}")
response_data = self.network.request(
method="GET",
endpoint=f"/dashboard/{dashboard_id_or_slug}",
)
self.logger.info(f"[INFO][SupersetClient.get_dashboard][SUCCESS] Got dashboard: {dashboard_id_or_slug}")
return response_data.get("result", {})
# END_FUNCTION_get_dashboard
# [ENTITY: Function('get_datasets')]
# CONTRACT:
# PURPOSE: Получение списка датасетов с пагинацией.
# PRECONDITIONS: None
# POSTCONDITIONS: Возвращает кортеж с общим количеством и списком датасетов.
def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
self.logger.info("[INFO][SupersetClient.get_datasets][ENTER] Getting datasets.")
total_count = self._fetch_total_object_count(endpoint="/dataset/")
base_query = {
"columns": ["id", "table_name", "sql", "database", "schema"],
"page": 0,
"page_size": 100
}
validated_query = {**base_query, **(query or {})}
datasets = self._fetch_all_pages(
endpoint="/dataset/",
pagination_options={
"base_query": validated_query,
"total_count": total_count,
"results_field": "result",
}
)
self.logger.info("[INFO][SupersetClient.get_datasets][SUCCESS] Got datasets.")
return total_count, datasets
# END_FUNCTION_get_datasets
# [ENTITY: Function('get_dataset')]
# CONTRACT:
# PURPOSE: Получение метаданных датасета по ID.
# PRECONDITIONS: `dataset_id` должен существовать.
# POSTCONDITIONS: Возвращает метаданные датасета.
def get_dataset(self, dataset_id: str) -> dict:
self.logger.info(f"[INFO][SupersetClient.get_dataset][ENTER] Getting dataset: {dataset_id}")
response_data = self.network.request(
method="GET",
endpoint=f"/dataset/{dataset_id}",
)
self.logger.info(f"[INFO][SupersetClient.get_dataset][SUCCESS] Got dataset: {dataset_id}")
return response_data.get("result", {})
# END_FUNCTION_get_dataset
def get_databases(self) -> List[Dict]:
self.logger.info("[INFO][SupersetClient.get_databases][ENTER] Getting databases.")
response = self.network.request("GET", "/database/")
self.logger.info("[INFO][SupersetClient.get_databases][SUCCESS] Got databases.")
return response.get('result', [])
# [ENTITY: Function('export_dashboard')]
# CONTRACT:
# PURPOSE: Экспорт дашборда в ZIP-архив.
# PRECONDITIONS: `dashboard_id` должен существовать.
# POSTCONDITIONS: Возвращает содержимое ZIP-архива и имя файла.
# --------------------------------------------------------------
# [ENTITY: Method('export_dashboard')]
# --------------------------------------------------------------
"""
:purpose: Скачать дашборд в виде ZIPархива.
:preconditions: ``dashboard_id`` существующий идентификатор.
:postconditions: Возвращается бинарное содержимое и имя файла.
"""
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
self.logger.info(f"[INFO][SupersetClient.export_dashboard][ENTER] Exporting dashboard: {dashboard_id}")
self.logger.info("[INFO][export_dashboard][ENTER] Exporting dashboard %s.", dashboard_id)
response = self.network.request(
method="GET",
endpoint="/dashboard/export/",
params={"q": json.dumps([dashboard_id])},
stream=True,
raw_response=True
raw_response=True,
)
self._validate_export_response(response, dashboard_id)
filename = self._resolve_export_filename(response, dashboard_id)
content = response.content
self.logger.info(f"[INFO][SupersetClient.export_dashboard][SUCCESS] Exported dashboard: {dashboard_id}")
return content, filename
# END_FUNCTION_export_dashboard
self.logger.info("[INFO][export_dashboard][SUCCESS] Exported dashboard %s.", dashboard_id)
return response.content, filename
# [END_ENTITY]
# [ENTITY: Function('_validate_export_response')]
# CONTRACT:
# PURPOSE: Валидация ответа экспорта.
# PRECONDITIONS: `response` должен быть валидным HTTP-ответом.
# POSTCONDITIONS: Ответ валиден.
def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
self.logger.debug(f"[DEBUG][SupersetClient._validate_export_response][ENTER] Validating export response for dashboard: {dashboard_id}")
content_type = response.headers.get('Content-Type', '')
if 'application/zip' not in content_type:
self.logger.error(f"[ERROR][SupersetClient._validate_export_response][FAILURE] Invalid content type: {content_type}")
raise ExportError(f"Получен не ZIP-архив (Content-Type: {content_type})")
if not response.content:
self.logger.error("[ERROR][SupersetClient._validate_export_response][FAILURE] Empty response content.")
raise ExportError("Получены пустые данные при экспорте")
self.logger.debug(f"[DEBUG][SupersetClient._validate_export_response][SUCCESS] Export response validated for dashboard: {dashboard_id}")
# END_FUNCTION__validate_export_response
# --------------------------------------------------------------
# [ENTITY: Method('import_dashboard')]
# --------------------------------------------------------------
"""
:purpose: Импортировать дашборд из ZIPфайла. При неуспешном импорте,
если ``delete_before_reimport`` = True, сначала удаляется
дашборд по ID, затем импорт повторяется.
:preconditions:
- ``file_name`` путь к существующему ZIPархиву (str|Path).
- ``dash_id`` (опционально) ID дашборда, который следует удалить.
:postconditions: Возвращается словарь‑ответ API при успехе.
"""
def import_dashboard(
self,
file_name: Union[str, Path],
dash_id: Optional[int] = None,
dash_slug: Optional[str] = None, # сохраняем для возможного логирования
) -> Dict:
# -----------------------------------------------------------------
# 1⃣ Приводим путь к строке (APIклиент ожидает str)
# -----------------------------------------------------------------
file_path: str = str(file_name) # <--- гарантируем тип str
self._validate_import_file(file_path)
# [ENTITY: Function('_resolve_export_filename')]
# CONTRACT:
# PURPOSE: Определение имени экспортируемого файла.
# PRECONDITIONS: `response` должен быть валидным HTTP-ответом.
# POSTCONDITIONS: Возвращает имя файла.
def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
self.logger.debug(f"[DEBUG][SupersetClient._resolve_export_filename][ENTER] Resolving export filename for dashboard: {dashboard_id}")
filename = get_filename_from_headers(response.headers)
if not filename:
timestamp = datetime.datetime.now().strftime('%Y%m%dT%H%M%S')
filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip"
self.logger.warning(f"[WARNING][SupersetClient._resolve_export_filename][STATE_CHANGE] Could not resolve filename from headers, generated: {filename}")
self.logger.debug(f"[DEBUG][SupersetClient._resolve_export_filename][SUCCESS] Resolved export filename: {filename}")
return filename
# END_FUNCTION__resolve_export_filename
try:
import_response = self._do_import(file_path)
self.logger.info("[INFO][import_dashboard] Imported %s.", file_path)
return import_response
# [ENTITY: Function('export_to_file')]
# CONTRACT:
# PURPOSE: Экспорт дашборда напрямую в файл.
# PRECONDITIONS: `output_dir` должен существовать.
# POSTCONDITIONS: Дашборд сохранен в файл.
def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path:
self.logger.info(f"[INFO][SupersetClient.export_to_file][ENTER] Exporting dashboard {dashboard_id} to file in {output_dir}")
output_dir = Path(output_dir)
if not output_dir.exists():
self.logger.error(f"[ERROR][SupersetClient.export_to_file][FAILURE] Output directory does not exist: {output_dir}")
raise FileNotFoundError(f"Директория {output_dir} не найдена")
content, filename = self.export_dashboard(dashboard_id)
target_path = output_dir / filename
with open(target_path, 'wb') as f:
f.write(content)
self.logger.info(f"[INFO][SupersetClient.export_to_file][SUCCESS] Exported dashboard {dashboard_id} to {target_path}")
return target_path
# END_FUNCTION_export_to_file
except Exception as exc:
# -----------------------------------------------------------------
# 2⃣ Логируем первую неудачу, пытаемся удалить и повторить,
# только если включён флаг ``delete_before_reimport``.
# -----------------------------------------------------------------
self.logger.error(
"[ERROR][import_dashboard] First import attempt failed: %s",
exc,
exc_info=True,
)
if not self.delete_before_reimport:
raise
# [ENTITY: Function('import_dashboard')]
# CONTRACT:
# PURPOSE: Импорт дашборда из ZIP-архива.
# PRECONDITIONS: `file_name` должен быть валидным ZIP-файлом.
# POSTCONDITIONS: Возвращает ответ API.
def import_dashboard(self, file_name: Union[str, Path]) -> Dict:
self.logger.info(f"[INFO][SupersetClient.import_dashboard][ENTER] Importing dashboard from: {file_name}")
self._validate_import_file(file_name)
import_response = self.network.upload_file(
# -----------------------------------------------------------------
# 3⃣ Выбираем, как искать дашборд для удаления.
# При наличии ``dash_id`` удаляем его.
# Иначе, если известен ``dash_slug`` переводим его в ID ниже.
# -----------------------------------------------------------------
target_id: Optional[int] = dash_id
if target_id is None and dash_slug is not None:
# Попытка динамического определения ID через slug.
# Мы делаем отдельный запрос к /dashboard/ (поисковый фильтр).
self.logger.debug("[DEBUG][import_dashboard] Resolving ID by slug '%s'.", dash_slug)
try:
_, candidates = self.get_dashboards(
query={"filters": [{"col": "slug", "op": "eq", "value": dash_slug}]}
)
if candidates:
target_id = candidates[0]["id"]
self.logger.debug("[DEBUG][import_dashboard] Resolved slug → ID %s.", target_id)
except Exception as e:
self.logger.warning(
"[WARN][import_dashboard] Could not resolve slug '%s' to ID: %s",
dash_slug,
e,
)
# Если всё‑равно нет ID считаем невозможным корректно удалить.
if target_id is None:
self.logger.error("[ERROR][import_dashboard] No ID available for deleteretry.")
raise
# -----------------------------------------------------------------
# 4⃣ Удаляем найденный дашборд (по ID)
# -----------------------------------------------------------------
try:
self.delete_dashboard(target_id)
self.logger.info("[INFO][import_dashboard] Deleted dashboard ID %s, retrying import.", target_id)
except Exception as del_exc:
self.logger.error("[ERROR][import_dashboard] Delete failed: %s", del_exc, exc_info=True)
raise
# -----------------------------------------------------------------
# 5⃣ Повторный импорт (тот же файл)
# -----------------------------------------------------------------
try:
import_response = self._do_import(file_path)
self.logger.info("[INFO][import_dashboard] Reimport succeeded.")
return import_response
except Exception as rec_exc:
self.logger.error(
"[ERROR][import_dashboard] Reimport after delete failed: %s",
rec_exc,
exc_info=True,
)
raise
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('_do_import')]
# --------------------------------------------------------------
"""
:purpose: Выполнить один запрос на импорт без обработки исключений.
:preconditions: ``file_name`` уже проверен и существует.
:postconditions: Возвращается словарь‑ответ API.
"""
def _do_import(self, file_name: Union[str, Path]) -> Dict:
return self.network.upload_file(
endpoint="/dashboard/import/",
file_info={
"file_obj": Path(file_name),
"file_name": Path(file_name).name,
"form_field": "formData",
},
extra_data={'overwrite': 'true'},
timeout=self.config.timeout * 2
extra_data={"overwrite": "true"},
timeout=self.config.timeout * 2,
)
self.logger.info(f"[INFO][SupersetClient.import_dashboard][SUCCESS] Imported dashboard from: {file_name}")
return import_response
# END_FUNCTION_import_dashboard
# [END_ENTITY]
# [ENTITY: Function('_validate_query_params')]
# CONTRACT:
# PURPOSE: Нормализация и валидация параметров запроса.
# PRECONDITIONS: None
# POSTCONDITIONS: Возвращает валидный словарь параметров.
# --------------------------------------------------------------
# [ENTITY: Method('delete_dashboard')]
# --------------------------------------------------------------
"""
:purpose: Удалить дашборд **по ID или slug**.
:preconditions:
- ``dashboard_id`` intID **или** strslug дашборда.
:postconditions: На уровне API считается, что ресурс удалён
(HTTP200/204). Логируется результат операции.
"""
def delete_dashboard(self, dashboard_id: Union[int, str]) -> None:
# ``dashboard_id`` может быть целым числом или строковым slug.
self.logger.info("[INFO][delete_dashboard][ENTER] Deleting dashboard %s.", dashboard_id)
response = self.network.request(
method="DELETE",
endpoint=f"/dashboard/{dashboard_id}",
)
# Superset обычно возвращает 200/204. Если есть поле ``result`` проверяем.
if response.get("result", True) is not False:
self.logger.info("[INFO][delete_dashboard] Dashboard %s deleted.", dashboard_id)
else:
self.logger.warning("[WARN][delete_dashboard] Unexpected response while deleting %s.", dashboard_id)
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('_extract_dashboard_id_from_zip')]
# --------------------------------------------------------------
"""
:purpose: Попытаться извлечь **ID** дашборда из ``metadata.yaml`` внутри ZIPархива.
:preconditions: ``file_name`` путь к корректному ZIPфайлу.
:postconditions: Возвращается ``int``ID или ``None``.
"""
def _extract_dashboard_id_from_zip(self, file_name: Union[str, Path]) -> Optional[int]:
try:
import yaml
with zipfile.ZipFile(file_name, "r") as zf:
for name in zf.namelist():
if name.endswith("metadata.yaml"):
with zf.open(name) as meta_file:
meta = yaml.safe_load(meta_file.read())
dash_id = meta.get("dashboard_uuid") or meta.get("dashboard_id")
if dash_id is not None:
return int(dash_id)
except Exception as exc:
self.logger.error("[ERROR][_extract_dashboard_id_from_zip] %s", exc, exc_info=True)
return None
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('_extract_dashboard_slug_from_zip')]
# --------------------------------------------------------------
"""
:purpose: Попытаться извлечь **slug** дашборда из ``metadata.yaml`` внутри ZIPархива.
:preconditions: ``file_name`` путь к корректному ZIPфайлу.
:postconditions: Возвращается строкаslug или ``None``.
"""
def _extract_dashboard_slug_from_zip(self, file_name: Union[str, Path]) -> Optional[str]:
try:
import yaml
with zipfile.ZipFile(file_name, "r") as zf:
for name in zf.namelist():
if name.endswith("metadata.yaml"):
with zf.open(name) as meta_file:
meta = yaml.safe_load(meta_file.read())
slug = meta.get("slug")
if slug:
return str(slug)
except Exception as exc:
self.logger.error("[ERROR][_extract_dashboard_slug_from_zip] %s", exc, exc_info=True)
return None
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('_validate_export_response')]
# --------------------------------------------------------------
"""
:purpose: Проверить, что ответ от ``/dashboard/export/`` ZIPархив с данными.
:preconditions: ``response`` объект :class:`requests.Response`.
:postconditions: При несоответствии возбуждается :class:`ExportError`.
"""
def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
self.logger.debug("[DEBUG][_validate_export_response][ENTER] Validating response for %s.", dashboard_id)
content_type = response.headers.get("Content-Type", "")
if "application/zip" not in content_type:
self.logger.error("[ERROR][_validate_export_response][FAILURE] Invalid content type: %s", content_type)
raise ExportError(f"Получен не ZIPархив (Content-Type: {content_type})")
if not response.content:
self.logger.error("[ERROR][_validate_export_response][FAILURE] Empty response content.")
raise ExportError("Получены пустые данные при экспорте")
self.logger.debug("[DEBUG][_validate_export_response][SUCCESS] Response validated.")
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('_resolve_export_filename')]
# --------------------------------------------------------------
"""
:purpose: Определить имя файла, полученного из заголовков ответа.
:preconditions: ``response.headers`` содержит (возможно) ``ContentDisposition``.
:postconditions: Возвращается строка‑имя файла.
"""
def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
self.logger.debug("[DEBUG][_resolve_export_filename][ENTER] Resolving filename.")
filename = get_filename_from_headers(response.headers)
if not filename:
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip"
self.logger.warning("[WARN][_resolve_export_filename] Generated filename: %s", filename)
self.logger.debug("[DEBUG][_resolve_export_filename][SUCCESS] Filename: %s", filename)
return filename
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('_validate_query_params')]
# --------------------------------------------------------------
"""
:purpose: Сформировать корректный набор параметров запроса.
:preconditions: ``query`` любой словарь или ``None``.
:postconditions: Возвращается словарь с обязательными полями.
"""
def _validate_query_params(self, query: Optional[Dict]) -> Dict:
self.logger.debug("[DEBUG][SupersetClient._validate_query_params][ENTER] Validating query params.")
base_query = {
"columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"],
"page": 0,
"page_size": 1000
"page_size": 1000,
}
validated_query = {**base_query, **(query or {})}
self.logger.debug(f"[DEBUG][SupersetClient._validate_query_params][SUCCESS] Validated query params: {validated_query}")
return validated_query
# END_FUNCTION__validate_query_params
validated = {**base_query, **(query or {})}
self.logger.debug("[DEBUG][_validate_query_params] %s", validated)
return validated
# [END_ENTITY]
# [ENTITY: Function('_fetch_total_object_count')]
# CONTRACT:
# PURPOSE: Получение общего количества объектов.
# PRECONDITIONS: `endpoint` должен быть валидным.
# POSTCONDITIONS: Возвращает общее количество объектов.
def _fetch_total_object_count(self, endpoint:str) -> int:
self.logger.debug(f"[DEBUG][SupersetClient._fetch_total_object_count][ENTER] Fetching total object count for endpoint: {endpoint}")
query_params_for_count = {'page': 0, 'page_size': 1}
# --------------------------------------------------------------
# [ENTITY: Method('_fetch_total_object_count')]
# --------------------------------------------------------------
"""
:purpose: Получить общее количество объектов по указанному endpoint.
:preconditions: ``endpoint`` строка, начинающаяся с «/».
:postconditions: Возвращается целое число.
"""
def _fetch_total_object_count(self, endpoint: str) -> int:
query_params_for_count = {"page": 0, "page_size": 1}
count = self.network.fetch_paginated_count(
endpoint=endpoint,
query_params=query_params_for_count,
count_field="count"
count_field="count",
)
self.logger.debug(f"[DEBUG][SupersetClient._fetch_total_object_count][SUCCESS] Fetched total object count: {count}")
self.logger.debug("[DEBUG][_fetch_total_object_count] %s%s", endpoint, count)
return count
# END_FUNCTION__fetch_total_object_count
# [END_ENTITY]
# [ENTITY: Function('_fetch_all_pages')]
# CONTRACT:
# PURPOSE: Обход всех страниц пагинированного API.
# PRECONDITIONS: `pagination_options` должен содержать необходимые параметры.
# POSTCONDITIONS: Возвращает список всех объектов.
def _fetch_all_pages(self, endpoint:str, pagination_options: Dict) -> List[Dict]:
self.logger.debug(f"[DEBUG][SupersetClient._fetch_all_pages][ENTER] Fetching all pages for endpoint: {endpoint}")
# --------------------------------------------------------------
# [ENTITY: Method('_fetch_all_pages')]
# --------------------------------------------------------------
"""
:purpose: Обойти все страницы пагинированного API.
:preconditions: ``pagination_options`` словарь, сформированный
в ``_validate_query_params`` и ``_fetch_total_object_count``.
:postconditions: Возвращается список всех объектов.
"""
def _fetch_all_pages(self, endpoint: str, pagination_options: Dict) -> List[Dict]:
all_data = self.network.fetch_paginated_data(
endpoint=endpoint,
pagination_options=pagination_options
pagination_options=pagination_options,
)
self.logger.debug(f"[DEBUG][SupersetClient._fetch_all_pages][SUCCESS] Fetched all pages for endpoint: {endpoint}")
self.logger.debug("[DEBUG][_fetch_all_pages] Fetched %s items from %s.", len(all_data), endpoint)
return all_data
# END_FUNCTION__fetch_all_pages
# [END_ENTITY]
# [ENTITY: Function('_validate_import_file')]
# CONTRACT:
# PURPOSE: Проверка файла перед импортом.
# PRECONDITIONS: `zip_path` должен быть путем к файлу.
# POSTCONDITIONS: Файл валиден.
# --------------------------------------------------------------
# [ENTITY: Method('_validate_import_file')]
# --------------------------------------------------------------
"""
:purpose: Проверить, что файл существует, является ZIPархивом и
содержит ``metadata.yaml``.
:preconditions: ``zip_path`` путь к файлу.
:postconditions: При невалидном файле возбуждается :class:`InvalidZipFormatError`.
"""
def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
self.logger.debug(f"[DEBUG][SupersetClient._validate_import_file][ENTER] Validating import file: {zip_path}")
path = Path(zip_path)
if not path.exists():
self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file does not exist: {zip_path}")
self.logger.error("[ERROR][_validate_import_file] File not found: %s", zip_path)
raise FileNotFoundError(f"Файл {zip_path} не существует")
if not zipfile.is_zipfile(path):
self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file is not a zip file: {zip_path}")
raise InvalidZipFormatError(f"Файл {zip_path} не является ZIP-архивом")
with zipfile.ZipFile(path, 'r') as zf:
if not any(n.endswith('metadata.yaml') for n in zf.namelist()):
self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file does not contain metadata.yaml: {zip_path}")
self.logger.error("[ERROR][_validate_import_file] Not a zip file: %s", zip_path)
raise InvalidZipFormatError(f"Файл {zip_path} не является ZIPархивом")
with zipfile.ZipFile(path, "r") as zf:
if not any(n.endswith("metadata.yaml") for n in zf.namelist()):
self.logger.error("[ERROR][_validate_import_file] No metadata.yaml in %s", zip_path)
raise InvalidZipFormatError(f"Архив {zip_path} не содержит 'metadata.yaml'")
self.logger.debug(f"[DEBUG][SupersetClient._validate_import_file][SUCCESS] Validated import file: {zip_path}")
# END_FUNCTION__validate_import_file
self.logger.debug("[DEBUG][_validate_import_file] File %s validated.", zip_path)
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('get_datasets')]
# --------------------------------------------------------------
"""
:purpose: Получить список датасетов с поддержкой пагинации.
:preconditions: None.
:postconditions: Возвращается кортеж ``(total_count, list_of_datasets)``.
"""
def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
self.logger.info("[INFO][get_datasets][ENTER] Fetching datasets.")
validated_query = self._validate_query_params(query)
total_count = self._fetch_total_object_count(endpoint="/dataset/")
paginated_data = self._fetch_all_pages(
endpoint="/dataset/",
pagination_options={
"base_query": validated_query,
"total_count": total_count,
"results_field": "result",
},
)
self.logger.info("[INFO][get_datasets][SUCCESS] Got datasets.")
return total_count, paginated_data
# [END_ENTITY]
# [END_FILE client.py]

View File

@@ -1,88 +1,205 @@
# [MODULE] Superset Tool Logger Utility
# PURPOSE: Предоставляет стандартизированный класс-обертку `SupersetLogger` для настройки и использования логирования в проекте.
# COHERENCE: Модуль согласован со стандартной библиотекой `logging`, расширяя ее для нужд проекта.
# [MODULE_PATH] superset_tool.utils.logger
# [FILE] logger.py
# [SEMANTICS] logging, utils, aifriendly, infrastructure
# --------------------------------------------------------------
# [IMPORTS]
# --------------------------------------------------------------
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
from typing import Optional, Any, Mapping
# [END_IMPORTS]
# CONTRACT:
# PURPOSE: Обеспечивает унифицированную настройку логгера с выводом в консоль и/или файл.
# PRECONDITIONS:
# - `name` должен быть строкой.
# - `level` должен быть валидным уровнем логирования (например, `logging.INFO`).
# POSTCONDITIONS:
# - Создает и настраивает логгер с указанным именем и уровнем.
# - Добавляет обработчики для вывода в файл (если указан `log_dir`) и в консоль (если `console=True`).
# - Очищает все предыдущие обработчики для данного логгера, чтобы избежать дублирования.
# PARAMETERS:
# - name: str - Имя логгера.
# - log_dir: Optional[Path] - Директория для сохранения лог-файлов.
# - level: int - Уровень логирования.
# - console: bool - Флаг для включения вывода в консоль.
# --------------------------------------------------------------
# [ENTITY: Service('SupersetLogger')]
# --------------------------------------------------------------
"""
:purpose: Универсальная обёртка над ``logging.Logger``. Позволяет:
• задавать уровень и вывод в консоль/файл,
• передавать произвольные ``extra``‑поля,
• использовать привычный API (info, debug, warning, error,
critical, exception) без «падения» при неверных аргументах.
:preconditions:
- ``name`` строка‑идентификатор логгера,
- ``level`` валидный уровень из ``logging``,
- ``log_dir`` при указании директория, куда будет писаться файл‑лог.
:postconditions:
- Создан полностью сконфигурированный ``logging.Logger`` без
дублирующих обработчиков.
"""
class SupersetLogger:
"""
:ivar logging.Logger logger: Внутренний стандартный логгер.
:ivar bool propagate: Отключаем наследование записей, чтобы
сообщения не «проваливались» выше.
"""
# --------------------------------------------------------------
# [ENTITY: Method('__init__')]
# --------------------------------------------------------------
"""
:purpose: Конфигурировать базовый логгер, добавить обработчики
консоли и/или файла, очистить прежние обработчики.
:preconditions: Параметры валидны.
:postconditions: ``self.logger`` готов к использованию.
"""
def __init__(
self,
name: str = "superset_tool",
log_dir: Optional[Path] = None,
level: int = logging.INFO,
console: bool = True
):
console: bool = True,
) -> None:
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
self.logger.propagate = False # ← не «прокидываем» записи выше
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
# [ANCHOR] HANDLER_RESET
# Очищаем существующие обработчики, чтобы избежать дублирования вывода при повторной инициализации.
# ---- Очистка предыдущих обработчиков (важно при повторных инициализациях) ----
if self.logger.hasHandlers():
self.logger.handlers.clear()
# [ANCHOR] FILE_HANDLER
# ---- Файловый обработчик (если указана директория) ----
if log_dir:
log_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d")
file_handler = logging.FileHandler(
log_dir / f"{name}_{timestamp}.log", encoding='utf-8'
log_dir / f"{name}_{timestamp}.log", encoding="utf-8"
)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
# [ANCHOR] CONSOLE_HANDLER
# ---- Консольный обработчик ----
if console:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# CONTRACT:
# PURPOSE: (HELPER) Генерирует строку с текущей датой для имени лог-файла.
# RETURN: str - Отформатированная дата (YYYYMMDD).
def _get_timestamp(self) -> str:
return datetime.now().strftime("%Y%m%d")
# END_FUNCTION__get_timestamp
# [END_ENTITY]
# [INTERFACE] Методы логирования
def info(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.info(message, extra=extra, exc_info=exc_info)
# --------------------------------------------------------------
# [ENTITY: Method('_log')]
# --------------------------------------------------------------
"""
:purpose: Универсальная вспомогательная обёртка над
``logging.Logger.<level>``. Принимает любые ``*args``
(подстановочные параметры) и ``extra``‑словарь.
:preconditions:
- ``level_method`` один из методов ``logger``,
- ``msg`` строка‑шаблон,
- ``*args`` значения для ``%``‑подстановок,
- ``extra`` пользовательские атрибуты (может быть ``None``).
:postconditions: Запись в журнал выполнена.
"""
def _log(
self,
level_method: Any,
msg: str,
*args: Any,
extra: Optional[Mapping[str, Any]] = None,
exc_info: bool = False,
) -> None:
if extra is not None:
level_method(msg, *args, extra=extra, exc_info=exc_info)
else:
level_method(msg, *args, exc_info=exc_info)
def error(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.error(message, extra=extra, exc_info=exc_info)
# [END_ENTITY]
def warning(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.warning(message, extra=extra, exc_info=exc_info)
# --------------------------------------------------------------
# [ENTITY: Method('info')]
# --------------------------------------------------------------
"""
:purpose: Записать сообщение уровня INFO.
"""
def info(
self,
msg: str,
*args: Any,
extra: Optional[Mapping[str, Any]] = None,
exc_info: bool = False,
) -> None:
self._log(self.logger.info, msg, *args, extra=extra, exc_info=exc_info)
# [END_ENTITY]
def critical(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.critical(message, extra=extra, exc_info=exc_info)
# --------------------------------------------------------------
# [ENTITY: Method('debug')]
# --------------------------------------------------------------
"""
:purpose: Записать сообщение уровня DEBUG.
"""
def debug(
self,
msg: str,
*args: Any,
extra: Optional[Mapping[str, Any]] = None,
exc_info: bool = False,
) -> None:
self._log(self.logger.debug, msg, *args, extra=extra, exc_info=exc_info)
# [END_ENTITY]
def debug(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.debug(message, extra=extra, exc_info=exc_info)
# --------------------------------------------------------------
# [ENTITY: Method('warning')]
# --------------------------------------------------------------
"""
:purpose: Записать сообщение уровня WARNING.
"""
def warning(
self,
msg: str,
*args: Any,
extra: Optional[Mapping[str, Any]] = None,
exc_info: bool = False,
) -> None:
self._log(self.logger.warning, msg, *args, extra=extra, exc_info=exc_info)
# [END_ENTITY]
def exception(self, message: str, *args, **kwargs):
self.logger.exception(message, *args, **kwargs)
# END_CLASS_SupersetLogger
# --------------------------------------------------------------
# [ENTITY: Method('error')]
# --------------------------------------------------------------
"""
:purpose: Записать сообщение уровня ERROR.
"""
def error(
self,
msg: str,
*args: Any,
extra: Optional[Mapping[str, Any]] = None,
exc_info: bool = False,
) -> None:
self._log(self.logger.error, msg, *args, extra=extra, exc_info=exc_info)
# [END_ENTITY]
# END_MODULE_logger
# --------------------------------------------------------------
# [ENTITY: Method('critical')]
# --------------------------------------------------------------
"""
:purpose: Записать сообщение уровня CRITICAL.
"""
def critical(
self,
msg: str,
*args: Any,
extra: Optional[Mapping[str, Any]] = None,
exc_info: bool = False,
) -> None:
self._log(self.logger.critical, msg, *args, extra=extra, exc_info=exc_info)
# [END_ENTITY]
# --------------------------------------------------------------
# [ENTITY: Method('exception')]
# --------------------------------------------------------------
"""
:purpose: Записать сообщение уровня ERROR вместе с трассировкой
текущего исключения (аналог ``logger.exception``).
"""
def exception(self, msg: str, *args: Any, **kwargs: Any) -> None:
self.logger.exception(msg, *args, **kwargs)
# [END_ENTITY]
# --------------------------------------------------------------
# [END_FILE logger.py]
# --------------------------------------------------------------

View File

@@ -99,7 +99,7 @@ class APIClient:
"csrf_token": csrf_token
}
self._authenticated = True
self.logger.info("[INFO][APIClient.authenticate][SUCCESS] Authenticated successfully.")
self.logger.info(f"[INFO][APIClient.authenticate][SUCCESS] Authenticated successfully. Tokens {self._tokens}")
return self._tokens
except requests.exceptions.HTTPError as e:
self.logger.error(f"[ERROR][APIClient.authenticate][FAILURE] Authentication failed: {e}")
@@ -132,12 +132,13 @@ class APIClient:
_headers = self.headers.copy()
if headers:
_headers.update(headers)
timeout = kwargs.pop('timeout', self.request_settings.get("timeout", DEFAULT_TIMEOUT))
try:
response = self.session.request(
method,
full_url,
headers=_headers,
timeout=self.request_settings.get("timeout", DEFAULT_TIMEOUT),
timeout=timeout,
**kwargs
)
response.raise_for_status()

View File

@@ -0,0 +1,148 @@
# [MODULE_PATH] superset_tool.utils.whiptail_fallback
# [FILE] whiptail_fallback.py
# [SEMANTICS] ui, fallback, console, utils, noninteractive
# --------------------------------------------------------------
# [IMPORTS]
# --------------------------------------------------------------
import sys
from typing import List, Tuple, Optional, Any
# [END_IMPORTS]
# --------------------------------------------------------------
# [ENTITY: Service('ConsoleUI')]
# --------------------------------------------------------------
"""
:purpose: Плотный консольный UIfallback для всех функций,
которые в оригинальном проекте использовали ``whiptail``.
Всё взаимодействие теперь **не‑интерактивно**: функции,
выводящие сообщение, просто печатают его без ожидания
``Enter``.
"""
def menu(
title: str,
prompt: str,
choices: List[str],
backtitle: str = "Superset Migration Tool",
) -> Tuple[int, Optional[str]]:
"""Return (rc, selected item). rc == 0 → OK."""
print(f"\n=== {title} ===")
print(prompt)
for idx, item in enumerate(choices, 1):
print(f"{idx}) {item}")
try:
raw = input("\nВведите номер (0 отмена): ").strip()
sel = int(raw)
if sel == 0:
return 1, None
return 0, choices[sel - 1]
except Exception:
return 1, None
def checklist(
title: str,
prompt: str,
options: List[Tuple[str, str]],
backtitle: str = "Superset Migration Tool",
) -> Tuple[int, List[str]]:
"""Return (rc, list of selected **values**)."""
print(f"\n=== {title} ===")
print(prompt)
for idx, (val, label) in enumerate(options, 1):
print(f"{idx}) [{val}] {label}")
raw = input("\nВведите номера через запятую (пустой ввод → отказ): ").strip()
if not raw:
return 1, []
try:
indices = {int(x) for x in raw.split(",") if x.strip()}
selected = [options[i - 1][0] for i in indices if 0 < i <= len(options)]
return 0, selected
except Exception:
return 1, []
def yesno(
title: str,
question: str,
backtitle: str = "Superset Migration Tool",
) -> bool:
"""True → пользователь ответил «да». """
ans = input(f"\n=== {title} ===\n{question} (y/n): ").strip().lower()
return ans in ("y", "yes", "да", "д")
def msgbox(
title: str,
msg: str,
width: int = 60,
height: int = 15,
backtitle: str = "Superset Migration Tool",
) -> None:
"""Простой вывод сообщения без ожидания Enter."""
print(f"\n=== {title} ===\n{msg}\n")
# **Убрано:** input("Нажмите <Enter> для продолжения...")
def inputbox(
title: str,
prompt: str,
backtitle: str = "Superset Migration Tool",
) -> Tuple[int, Optional[str]]:
"""Return (rc, введённая строка). rc == 0 → успешно."""
print(f"\n=== {title} ===")
val = input(f"{prompt}\n")
if val == "":
return 1, None
return 0, val
# --------------------------------------------------------------
# [ENTITY: Service('ConsoleGauge')]
# --------------------------------------------------------------
"""
:purpose: Минимальная имитация ``whiptail``gauge в консоли.
"""
class _ConsoleGauge:
"""Контекст‑менеджер для простого прогресс‑бара."""
def __init__(self, title: str, width: int = 60, height: int = 10):
self.title = title
self.width = width
self.height = height
self._percent = 0
def __enter__(self):
print(f"\n=== {self.title} ===")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
sys.stdout.write("\n")
sys.stdout.flush()
def set_text(self, txt: str) -> None:
sys.stdout.write(f"\r{txt} ")
sys.stdout.flush()
def set_percent(self, percent: int) -> None:
self._percent = percent
sys.stdout.write(f"{percent}%")
sys.stdout.flush()
# [END_ENTITY]
def gauge(
title: str,
width: int = 60,
height: int = 10,
) -> Any:
"""Always returns the console fallback gauge."""
return _ConsoleGauge(title, width, height)
# [END_ENTITY]
# --------------------------------------------------------------
# [END_FILE whiptail_fallback.py]
# --------------------------------------------------------------

29
whiptailtest.py Normal file
View File

@@ -0,0 +1,29 @@
# test_whiptail.py
from superset_tool.utils.whiptail_fallback import (
menu, checklist, yesno, msgbox, inputbox, gauge,
)
rc, env = menu('Тестовое меню', 'Выберите среду:', ['dev', 'prod'])
print('menu →', rc, env)
rc, ids = checklist(
'Тестовый чек‑лист',
'Выберите пункты:',
[('1', 'Первый'), ('2', 'Второй'), ('3', 'Третий')],
)
print('checklist →', rc, ids)
if yesno('Вопрос', 'Продолжить?'):
print('Ответ ДА')
else:
print('Ответ НЕТ')
rc, txt = inputbox('Ввод', 'Введите произвольный текст:')
print('inputbox →', rc, txt)
msgbox('Сообщение', 'Это просто тестовое сообщение.')
with gauge('Прогресс‑бар') as g:
for i in range(0, 101, 20):
g.set_text(f'Шаг {i // 20 + 1}')
g.set_percent(i)