303 lines
18 KiB
Python
303 lines
18 KiB
Python
# -*- coding: utf-8 -*-
|
||
# CONTRACT:
|
||
# PURPOSE: Интерактивный скрипт для миграции ассетов Superset между различными окружениями.
|
||
# SPECIFICATION_LINK: mod_migration_script
|
||
# PRECONDITIONS: Наличие корректных конфигурационных файлов для подключения к Superset.
|
||
# POSTCONDITIONS: Выбранные ассеты успешно перенесены из исходного в целевое окружение.
|
||
# IMPORTS: [argparse, superset_tool.client, superset_tool.utils.init_clients, superset_tool.utils.logger, superset_tool.utils.fileio]
|
||
"""
|
||
[MODULE] Superset Migration Tool
|
||
@description: Интерактивный скрипт для миграции ассетов Superset между различными окружениями.
|
||
"""
|
||
|
||
# [IMPORTS]
|
||
from superset_tool.client import SupersetClient
|
||
from superset_tool.utils.init_clients import init_superset_clients
|
||
from superset_tool.utils.logger import SupersetLogger
|
||
from superset_tool.utils.fileio import (
|
||
save_and_unpack_dashboard,
|
||
read_dashboard_from_disk,
|
||
update_yamls,
|
||
create_dashboard_export
|
||
)
|
||
|
||
# [ENTITY: Class('Migration')]
|
||
# CONTRACT:
|
||
# PURPOSE: Инкапсулирует логику и состояние процесса миграции.
|
||
# SPECIFICATION_LINK: class_migration
|
||
# ATTRIBUTES:
|
||
# - name: logger, type: SupersetLogger, description: Экземпляр логгера.
|
||
# - name: from_c, type: SupersetClient, description: Клиент для исходного окружения.
|
||
# - name: to_c, type: SupersetClient, description: Клиент для целевого окружения.
|
||
# - name: dashboards_to_migrate, type: list, description: Список дашбордов для миграции.
|
||
# - name: db_config_replacement, type: dict, description: Конфигурация для замены данных БД.
|
||
class Migration:
|
||
"""
|
||
Класс для управления процессом миграции дашбордов Superset.
|
||
"""
|
||
def __init__(self):
|
||
self.logger = SupersetLogger(name="migration_script")
|
||
self.from_c: SupersetClient = None
|
||
self.to_c: SupersetClient = None
|
||
self.dashboards_to_migrate = []
|
||
self.db_config_replacement = None
|
||
# END_FUNCTION___init__
|
||
|
||
# [ENTITY: Function('run')]
|
||
# CONTRACT:
|
||
# PURPOSE: Запускает основной воркфлоу миграции, координируя все шаги.
|
||
# SPECIFICATION_LINK: func_run_migration
|
||
# PRECONDITIONS: None
|
||
# POSTCONDITIONS: Процесс миграции завершен.
|
||
def run(self):
|
||
"""Запускает основной воркфлоу миграции."""
|
||
self.logger.info("[INFO][run][ENTER] Запуск скрипта миграции.")
|
||
self.select_environments()
|
||
self.select_dashboards()
|
||
self.confirm_db_config_replacement()
|
||
self.execute_migration()
|
||
self.logger.info("[INFO][run][EXIT] Скрипт миграции завершен.")
|
||
# END_FUNCTION_run
|
||
|
||
# [ENTITY: Function('select_environments')]
|
||
# CONTRACT:
|
||
# PURPOSE: Шаг 1. Обеспечивает интерактивный выбор исходного и целевого окружений.
|
||
# SPECIFICATION_LINK: func_select_environments
|
||
# PRECONDITIONS: None
|
||
# POSTCONDITIONS: Атрибуты `self.from_c` и `self.to_c` инициализированы валидными клиентами Superset.
|
||
def select_environments(self):
|
||
"""Шаг 1: Выбор окружений (источник и назначение)."""
|
||
self.logger.info("[INFO][select_environments][ENTER] Шаг 1/4: Выбор окружений.")
|
||
|
||
available_envs = {"1": "DEV", "2": "PROD"}
|
||
|
||
print("Доступные окружения:")
|
||
for key, value in available_envs.items():
|
||
print(f" {key}. {value}")
|
||
|
||
while self.from_c is None:
|
||
try:
|
||
from_env_choice = input("Выберите исходное окружение (номер): ")
|
||
from_env_name = available_envs.get(from_env_choice)
|
||
if not from_env_name:
|
||
print("Неверный выбор. Попробуйте снова.")
|
||
continue
|
||
|
||
clients = init_superset_clients(self.logger, env=from_env_name.lower())
|
||
self.from_c = clients[0]
|
||
self.logger.info(f"[INFO][select_environments][STATE] Исходное окружение: {from_env_name}")
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"[ERROR][select_environments][FAILURE] Ошибка при инициализации клиента-источника: {e}", exc_info=True)
|
||
print("Не удалось инициализировать клиент. Проверьте конфигурацию.")
|
||
|
||
while self.to_c is None:
|
||
try:
|
||
to_env_choice = input("Выберите целевое окружение (номер): ")
|
||
to_env_name = available_envs.get(to_env_choice)
|
||
|
||
if not to_env_name:
|
||
print("Неверный выбор. Попробуйте снова.")
|
||
continue
|
||
|
||
if to_env_name == self.from_c.env:
|
||
print("Целевое и исходное окружения не могут совпадать.")
|
||
continue
|
||
|
||
clients = init_superset_clients(self.logger, env=to_env_name.lower())
|
||
self.to_c = clients[0]
|
||
self.logger.info(f"[INFO][select_environments][STATE] Целевое окружение: {to_env_name}")
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"[ERROR][select_environments][FAILURE] Ошибка при инициализации целевого клиента: {e}", exc_info=True)
|
||
print("Не удалось инициализировать клиент. Проверьте конфигурацию.")
|
||
self.logger.info("[INFO][select_environments][EXIT] Шаг 1 завершен.")
|
||
# END_FUNCTION_select_environments
|
||
|
||
# [ENTITY: Function('select_dashboards')]
|
||
# CONTRACT:
|
||
# PURPOSE: Шаг 2. Обеспечивает интерактивный выбор дашбордов для миграции.
|
||
# SPECIFICATION_LINK: func_select_dashboards
|
||
# PRECONDITIONS: `self.from_c` должен быть инициализирован.
|
||
# POSTCONDITIONS: `self.dashboards_to_migrate` содержит список выбранных дашбордов.
|
||
def select_dashboards(self):
|
||
"""Шаг 2: Выбор дашбордов для миграции."""
|
||
self.logger.info("[INFO][select_dashboards][ENTER] Шаг 2/4: Выбор дашбордов.")
|
||
|
||
try:
|
||
all_dashboards = self.from_c.get_dashboards()
|
||
if not all_dashboards:
|
||
self.logger.warning("[WARN][select_dashboards][STATE] В исходном окружении не найдено дашбордов.")
|
||
print("В исходном окружении не найдено дашбордов.")
|
||
return
|
||
|
||
while True:
|
||
print("\nДоступные дашборды:")
|
||
for i, dashboard in enumerate(all_dashboards):
|
||
print(f" {i + 1}. {dashboard['dashboard_title']}")
|
||
|
||
print("\nОпции:")
|
||
print(" - Введите номера дашбордов через запятую (например, 1, 3, 5).")
|
||
print(" - Введите 'все' для выбора всех дашбордов.")
|
||
print(" - Введите 'поиск <запрос>' для поиска дашбордов.")
|
||
print(" - Введите 'выход' для завершения.")
|
||
|
||
choice = input("Ваш выбор: ").lower().strip()
|
||
|
||
if choice == 'выход':
|
||
break
|
||
elif choice == 'все':
|
||
self.dashboards_to_migrate = all_dashboards
|
||
self.logger.info(f"[INFO][select_dashboards][STATE] Выбраны все дашборды: {len(self.dashboards_to_migrate)}")
|
||
break
|
||
elif choice.startswith('поиск '):
|
||
search_query = choice[6:].strip()
|
||
filtered_dashboards = [d for d in all_dashboards if search_query in d['dashboard_title'].lower()]
|
||
if not filtered_dashboards:
|
||
print("По вашему запросу ничего не найдено.")
|
||
else:
|
||
all_dashboards = filtered_dashboards
|
||
continue
|
||
else:
|
||
try:
|
||
selected_indices = [int(i.strip()) - 1 for i in choice.split(',')]
|
||
self.dashboards_to_migrate = [all_dashboards[i] for i in selected_indices if 0 <= i < len(all_dashboards)]
|
||
self.logger.info(f"[INFO][select_dashboards][STATE] Выбрано дашбордов: {len(self.dashboards_to_migrate)}")
|
||
break
|
||
except (ValueError, IndexError):
|
||
print("Неверный ввод. Пожалуйста, введите корректные номера.")
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"[ERROR][select_dashboards][FAILURE] Ошибка при получении или выборе дашбордов: {e}", exc_info=True)
|
||
print("Произошла ошибка при работе с дашбордами.")
|
||
|
||
self.logger.info("[INFO][select_dashboards][EXIT] Шаг 2 завершен.")
|
||
# END_FUNCTION_select_dashboards
|
||
|
||
# [ENTITY: Function('confirm_db_config_replacement')]
|
||
# CONTRACT:
|
||
# PURPOSE: Шаг 3. Управляет процессом подтверждения и настройки замены конфигураций БД.
|
||
# SPECIFICATION_LINK: func_confirm_db_config_replacement
|
||
# PRECONDITIONS: `self.from_c` и `self.to_c` инициализированы.
|
||
# POSTCONDITIONS: `self.db_config_replacement` содержит конфигурацию для замены или `None`.
|
||
def confirm_db_config_replacement(self):
|
||
"""Шаг 3: Подтверждение и настройка замены конфигурации БД."""
|
||
self.logger.info("[INFO][confirm_db_config_replacement][ENTER] Шаг 3/4: Замена конфигурации БД.")
|
||
|
||
while True:
|
||
choice = input("Хотите ли вы заменить конфигурации баз данных в YAML-файлах? (да/нет): ").lower().strip()
|
||
if choice in ["да", "нет"]:
|
||
break
|
||
print("Неверный ввод. Пожалуйста, введите 'да' или 'нет'.")
|
||
|
||
if choice == 'нет':
|
||
self.logger.info("[INFO][confirm_db_config_replacement][STATE] Замена конфигурации БД пропущена.")
|
||
return
|
||
|
||
# Эвристический расчет
|
||
from_env = self.from_c.env.upper()
|
||
to_env = self.to_c.env.upper()
|
||
heuristic_applied = False
|
||
|
||
if from_env == "DEV" and to_env == "PROD":
|
||
self.db_config_replacement = {"old": {"database_name": "db_dev"}, "new": {"database_name": "db_prod"}} # Пример
|
||
self.logger.info("[INFO][confirm_db_config_replacement][STATE] Применена эвристика DEV -> PROD.")
|
||
heuristic_applied = True
|
||
elif from_env == "PROD" and to_env == "DEV":
|
||
self.db_config_replacement = {"old": {"database_name": "db_prod"}, "new": {"database_name": "db_dev"}} # Пример
|
||
self.logger.info("[INFO][confirm_db_config_replacement][STATE] Применена эвристика PROD -> DEV.")
|
||
heuristic_applied = True
|
||
|
||
if heuristic_applied:
|
||
print(f"На основе эвристики будет произведена следующая замена: {self.db_config_replacement}")
|
||
confirm = input("Подтверждаете? (да/нет): ").lower().strip()
|
||
if confirm != 'да':
|
||
self.db_config_replacement = None
|
||
heuristic_applied = False
|
||
|
||
if not heuristic_applied:
|
||
print("Пожалуйста, введите детали для замены.")
|
||
old_key = input("Ключ для замены (например, database_name): ")
|
||
old_value = input(f"Старое значение для {old_key}: ")
|
||
new_value = input(f"Новое значение для {old_key}: ")
|
||
self.db_config_replacement = {"old": {old_key: old_value}, "new": {old_key: new_value}}
|
||
self.logger.info(f"[INFO][confirm_db_config_replacement][STATE] Установлена ручная замена: {self.db_config_replacement}")
|
||
|
||
self.logger.info("[INFO][confirm_db_config_replacement][EXIT] Шаг 3 завершен.")
|
||
# END_FUNCTION_confirm_db_config_replacement
|
||
|
||
# [ENTITY: Function('execute_migration')]
|
||
# CONTRACT:
|
||
# PURPOSE: Шаг 4. Выполняет фактическую миграцию выбранных дашбордов.
|
||
# SPECIFICATION_LINK: func_execute_migration
|
||
# PRECONDITIONS: Все предыдущие шаги (`select_environments`, `select_dashboards`) успешно выполнены.
|
||
# POSTCONDITIONS: Выбранные дашборды перенесены в целевое окружение.
|
||
def execute_migration(self):
|
||
"""Шаг 4: Выполнение миграции и обновления конфигураций."""
|
||
self.logger.info("[INFO][execute_migration][ENTER] Шаг 4/4: Выполнение миграции.")
|
||
|
||
if not self.dashboards_to_migrate:
|
||
self.logger.warning("[WARN][execute_migration][STATE] Нет дашбордов для миграции.")
|
||
print("Нет дашбордов для миграции. Завершение.")
|
||
return
|
||
|
||
db_configs_for_update = []
|
||
if self.db_config_replacement:
|
||
try:
|
||
from_dbs = self.from_c.get_databases()
|
||
to_dbs = self.to_c.get_databases()
|
||
|
||
# Просто пример, как можно было бы сопоставить базы данных.
|
||
# В реальном сценарии логика может быть сложнее.
|
||
for from_db in from_dbs:
|
||
for to_db in to_dbs:
|
||
# Предполагаем, что мы можем сопоставить базы по имени, заменив суффикс
|
||
if from_db['database_name'].replace(self.from_c.env.upper(), self.to_c.env.upper()) == to_db['database_name']:
|
||
db_configs_for_update.append({
|
||
"old": {"database_name": from_db['database_name']},
|
||
"new": {"database_name": to_db['database_name']}
|
||
})
|
||
self.logger.info(f"[INFO][execute_migration][STATE] Сформированы конфигурации для замены БД: {db_configs_for_update}")
|
||
except Exception as e:
|
||
self.logger.error(f"[ERROR][execute_migration][FAILURE] Не удалось получить конфигурации БД: {e}", exc_info=True)
|
||
print("Не удалось получить конфигурации БД. Миграция будет продолжена без замены.")
|
||
|
||
for dashboard in self.dashboards_to_migrate:
|
||
try:
|
||
dashboard_id = dashboard['id']
|
||
self.logger.info(f"[INFO][execute_migration][PROGRESS] Миграция дашборда: {dashboard['dashboard_title']} (ID: {dashboard_id})")
|
||
|
||
# 1. Экспорт
|
||
exported_content = self.from_c.export_dashboards(dashboard_id)
|
||
zip_path, unpacked_path = save_and_unpack_dashboard(exported_content, f"temp_export_{dashboard_id}", unpack=True)
|
||
self.logger.info(f"[INFO][execute_migration][STATE] Дашборд экспортирован и распакован в {unpacked_path}")
|
||
|
||
# 2. Обновление YAML, если нужно
|
||
if db_configs_for_update:
|
||
update_yamls(db_configs=db_configs_for_update, path=str(unpacked_path))
|
||
self.logger.info(f"[INFO][execute_migration][STATE] YAML-файлы обновлены.")
|
||
|
||
# 3. Упаковка и импорт
|
||
new_zip_path = f"migrated_dashboard_{dashboard_id}.zip"
|
||
create_dashboard_export(new_zip_path, [unpacked_path])
|
||
|
||
content_to_import, _ = read_dashboard_from_disk(new_zip_path)
|
||
self.to_c.import_dashboards(content_to_import)
|
||
self.logger.info(f"[INFO][execute_migration][SUCCESS] Дашборд {dashboard['dashboard_title']} успешно импортирован.")
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"[ERROR][execute_migration][FAILURE] Ошибка при миграции дашборда {dashboard['dashboard_title']}: {e}", exc_info=True)
|
||
print(f"Не удалось смигрировать дашборд: {dashboard['dashboard_title']}")
|
||
|
||
self.logger.info("[INFO][execute_migration][EXIT] Шаг 4 завершен.")
|
||
# END_FUNCTION_execute_migration
|
||
|
||
# END_CLASS_Migration
|
||
|
||
# [MAIN_EXECUTION_BLOCK]
|
||
if __name__ == "__main__":
|
||
migration = Migration()
|
||
migration.run()
|
||
# END_MAIN_EXECUTION_BLOCK
|
||
|
||
# END_MODULE_migration_script |