Files
ss-tools/migration_script.py
2025-08-16 12:29:37 +03:00

303 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# CONTRACT:
# PURPOSE: Интерактивный скрипт для миграции ассетов Superset между различными окружениями.
# SPECIFICATION_LINK: mod_migration_script
# PRECONDITIONS: Наличие корректных конфигурационных файлов для подключения к Superset.
# POSTCONDITIONS: Выбранные ассеты успешно перенесены из исходного в целевое окружение.
# IMPORTS: [argparse, superset_tool.client, superset_tool.utils.init_clients, superset_tool.utils.logger, superset_tool.utils.fileio]
"""
[MODULE] Superset Migration Tool
@description: Интерактивный скрипт для миграции ассетов Superset между различными окружениями.
"""
# [IMPORTS]
from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import init_superset_clients
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.fileio import (
save_and_unpack_dashboard,
read_dashboard_from_disk,
update_yamls,
create_dashboard_export
)
# [ENTITY: Class('Migration')]
# CONTRACT:
# PURPOSE: Инкапсулирует логику и состояние процесса миграции.
# SPECIFICATION_LINK: class_migration
# ATTRIBUTES:
# - name: logger, type: SupersetLogger, description: Экземпляр логгера.
# - name: from_c, type: SupersetClient, description: Клиент для исходного окружения.
# - name: to_c, type: SupersetClient, description: Клиент для целевого окружения.
# - name: dashboards_to_migrate, type: list, description: Список дашбордов для миграции.
# - name: db_config_replacement, type: dict, description: Конфигурация для замены данных БД.
class Migration:
"""
Класс для управления процессом миграции дашбордов Superset.
"""
def __init__(self):
self.logger = SupersetLogger(name="migration_script")
self.from_c: SupersetClient = None
self.to_c: SupersetClient = None
self.dashboards_to_migrate = []
self.db_config_replacement = None
# END_FUNCTION___init__
# [ENTITY: Function('run')]
# CONTRACT:
# PURPOSE: Запускает основной воркфлоу миграции, координируя все шаги.
# SPECIFICATION_LINK: func_run_migration
# PRECONDITIONS: None
# POSTCONDITIONS: Процесс миграции завершен.
def run(self):
"""Запускает основной воркфлоу миграции."""
self.logger.info("[INFO][run][ENTER] Запуск скрипта миграции.")
self.select_environments()
self.select_dashboards()
self.confirm_db_config_replacement()
self.execute_migration()
self.logger.info("[INFO][run][EXIT] Скрипт миграции завершен.")
# END_FUNCTION_run
# [ENTITY: Function('select_environments')]
# CONTRACT:
# PURPOSE: Шаг 1. Обеспечивает интерактивный выбор исходного и целевого окружений.
# SPECIFICATION_LINK: func_select_environments
# PRECONDITIONS: None
# POSTCONDITIONS: Атрибуты `self.from_c` и `self.to_c` инициализированы валидными клиентами Superset.
def select_environments(self):
"""Шаг 1: Выбор окружений (источник и назначение)."""
self.logger.info("[INFO][select_environments][ENTER] Шаг 1/4: Выбор окружений.")
available_envs = {"1": "DEV", "2": "PROD"}
print("Доступные окружения:")
for key, value in available_envs.items():
print(f" {key}. {value}")
while self.from_c is None:
try:
from_env_choice = input("Выберите исходное окружение (номер): ")
from_env_name = available_envs.get(from_env_choice)
if not from_env_name:
print("Неверный выбор. Попробуйте снова.")
continue
clients = init_superset_clients(self.logger, env=from_env_name.lower())
self.from_c = clients[0]
self.logger.info(f"[INFO][select_environments][STATE] Исходное окружение: {from_env_name}")
except Exception as e:
self.logger.error(f"[ERROR][select_environments][FAILURE] Ошибка при инициализации клиента-источника: {e}", exc_info=True)
print("Не удалось инициализировать клиент. Проверьте конфигурацию.")
while self.to_c is None:
try:
to_env_choice = input("Выберите целевое окружение (номер): ")
to_env_name = available_envs.get(to_env_choice)
if not to_env_name:
print("Неверный выбор. Попробуйте снова.")
continue
if to_env_name == self.from_c.env:
print("Целевое и исходное окружения не могут совпадать.")
continue
clients = init_superset_clients(self.logger, env=to_env_name.lower())
self.to_c = clients[0]
self.logger.info(f"[INFO][select_environments][STATE] Целевое окружение: {to_env_name}")
except Exception as e:
self.logger.error(f"[ERROR][select_environments][FAILURE] Ошибка при инициализации целевого клиента: {e}", exc_info=True)
print("Не удалось инициализировать клиент. Проверьте конфигурацию.")
self.logger.info("[INFO][select_environments][EXIT] Шаг 1 завершен.")
# END_FUNCTION_select_environments
# [ENTITY: Function('select_dashboards')]
# CONTRACT:
# PURPOSE: Шаг 2. Обеспечивает интерактивный выбор дашбордов для миграции.
# SPECIFICATION_LINK: func_select_dashboards
# PRECONDITIONS: `self.from_c` должен быть инициализирован.
# POSTCONDITIONS: `self.dashboards_to_migrate` содержит список выбранных дашбордов.
def select_dashboards(self):
"""Шаг 2: Выбор дашбордов для миграции."""
self.logger.info("[INFO][select_dashboards][ENTER] Шаг 2/4: Выбор дашбордов.")
try:
all_dashboards = self.from_c.get_dashboards()
if not all_dashboards:
self.logger.warning("[WARN][select_dashboards][STATE] В исходном окружении не найдено дашбордов.")
print("В исходном окружении не найдено дашбордов.")
return
while True:
print("\nДоступные дашборды:")
for i, dashboard in enumerate(all_dashboards):
print(f" {i + 1}. {dashboard['dashboard_title']}")
print("\nОпции:")
print(" - Введите номера дашбордов через запятую (например, 1, 3, 5).")
print(" - Введите 'все' для выбора всех дашбордов.")
print(" - Введите 'поиск <запрос>' для поиска дашбордов.")
print(" - Введите 'выход' для завершения.")
choice = input("Ваш выбор: ").lower().strip()
if choice == 'выход':
break
elif choice == 'все':
self.dashboards_to_migrate = all_dashboards
self.logger.info(f"[INFO][select_dashboards][STATE] Выбраны все дашборды: {len(self.dashboards_to_migrate)}")
break
elif choice.startswith('поиск '):
search_query = choice[6:].strip()
filtered_dashboards = [d for d in all_dashboards if search_query in d['dashboard_title'].lower()]
if not filtered_dashboards:
print("По вашему запросу ничего не найдено.")
else:
all_dashboards = filtered_dashboards
continue
else:
try:
selected_indices = [int(i.strip()) - 1 for i in choice.split(',')]
self.dashboards_to_migrate = [all_dashboards[i] for i in selected_indices if 0 <= i < len(all_dashboards)]
self.logger.info(f"[INFO][select_dashboards][STATE] Выбрано дашбордов: {len(self.dashboards_to_migrate)}")
break
except (ValueError, IndexError):
print("Неверный ввод. Пожалуйста, введите корректные номера.")
except Exception as e:
self.logger.error(f"[ERROR][select_dashboards][FAILURE] Ошибка при получении или выборе дашбордов: {e}", exc_info=True)
print("Произошла ошибка при работе с дашбордами.")
self.logger.info("[INFO][select_dashboards][EXIT] Шаг 2 завершен.")
# END_FUNCTION_select_dashboards
# [ENTITY: Function('confirm_db_config_replacement')]
# CONTRACT:
# PURPOSE: Шаг 3. Управляет процессом подтверждения и настройки замены конфигураций БД.
# SPECIFICATION_LINK: func_confirm_db_config_replacement
# PRECONDITIONS: `self.from_c` и `self.to_c` инициализированы.
# POSTCONDITIONS: `self.db_config_replacement` содержит конфигурацию для замены или `None`.
def confirm_db_config_replacement(self):
"""Шаг 3: Подтверждение и настройка замены конфигурации БД."""
self.logger.info("[INFO][confirm_db_config_replacement][ENTER] Шаг 3/4: Замена конфигурации БД.")
while True:
choice = input("Хотите ли вы заменить конфигурации баз данных в YAML-файлах? (да/нет): ").lower().strip()
if choice in ["да", "нет"]:
break
print("Неверный ввод. Пожалуйста, введите 'да' или 'нет'.")
if choice == 'нет':
self.logger.info("[INFO][confirm_db_config_replacement][STATE] Замена конфигурации БД пропущена.")
return
# Эвристический расчет
from_env = self.from_c.env.upper()
to_env = self.to_c.env.upper()
heuristic_applied = False
if from_env == "DEV" and to_env == "PROD":
self.db_config_replacement = {"old": {"database_name": "db_dev"}, "new": {"database_name": "db_prod"}} # Пример
self.logger.info("[INFO][confirm_db_config_replacement][STATE] Применена эвристика DEV -> PROD.")
heuristic_applied = True
elif from_env == "PROD" and to_env == "DEV":
self.db_config_replacement = {"old": {"database_name": "db_prod"}, "new": {"database_name": "db_dev"}} # Пример
self.logger.info("[INFO][confirm_db_config_replacement][STATE] Применена эвристика PROD -> DEV.")
heuristic_applied = True
if heuristic_applied:
print(f"На основе эвристики будет произведена следующая замена: {self.db_config_replacement}")
confirm = input("Подтверждаете? (да/нет): ").lower().strip()
if confirm != 'да':
self.db_config_replacement = None
heuristic_applied = False
if not heuristic_applied:
print("Пожалуйста, введите детали для замены.")
old_key = input("Ключ для замены (например, database_name): ")
old_value = input(f"Старое значение для {old_key}: ")
new_value = input(f"Новое значение для {old_key}: ")
self.db_config_replacement = {"old": {old_key: old_value}, "new": {old_key: new_value}}
self.logger.info(f"[INFO][confirm_db_config_replacement][STATE] Установлена ручная замена: {self.db_config_replacement}")
self.logger.info("[INFO][confirm_db_config_replacement][EXIT] Шаг 3 завершен.")
# END_FUNCTION_confirm_db_config_replacement
# [ENTITY: Function('execute_migration')]
# CONTRACT:
# PURPOSE: Шаг 4. Выполняет фактическую миграцию выбранных дашбордов.
# SPECIFICATION_LINK: func_execute_migration
# PRECONDITIONS: Все предыдущие шаги (`select_environments`, `select_dashboards`) успешно выполнены.
# POSTCONDITIONS: Выбранные дашборды перенесены в целевое окружение.
def execute_migration(self):
"""Шаг 4: Выполнение миграции и обновления конфигураций."""
self.logger.info("[INFO][execute_migration][ENTER] Шаг 4/4: Выполнение миграции.")
if not self.dashboards_to_migrate:
self.logger.warning("[WARN][execute_migration][STATE] Нет дашбордов для миграции.")
print("Нет дашбордов для миграции. Завершение.")
return
db_configs_for_update = []
if self.db_config_replacement:
try:
from_dbs = self.from_c.get_databases()
to_dbs = self.to_c.get_databases()
# Просто пример, как можно было бы сопоставить базы данных.
# В реальном сценарии логика может быть сложнее.
for from_db in from_dbs:
for to_db in to_dbs:
# Предполагаем, что мы можем сопоставить базы по имени, заменив суффикс
if from_db['database_name'].replace(self.from_c.env.upper(), self.to_c.env.upper()) == to_db['database_name']:
db_configs_for_update.append({
"old": {"database_name": from_db['database_name']},
"new": {"database_name": to_db['database_name']}
})
self.logger.info(f"[INFO][execute_migration][STATE] Сформированы конфигурации для замены БД: {db_configs_for_update}")
except Exception as e:
self.logger.error(f"[ERROR][execute_migration][FAILURE] Не удалось получить конфигурации БД: {e}", exc_info=True)
print("Не удалось получить конфигурации БД. Миграция будет продолжена без замены.")
for dashboard in self.dashboards_to_migrate:
try:
dashboard_id = dashboard['id']
self.logger.info(f"[INFO][execute_migration][PROGRESS] Миграция дашборда: {dashboard['dashboard_title']} (ID: {dashboard_id})")
# 1. Экспорт
exported_content = self.from_c.export_dashboards(dashboard_id)
zip_path, unpacked_path = save_and_unpack_dashboard(exported_content, f"temp_export_{dashboard_id}", unpack=True)
self.logger.info(f"[INFO][execute_migration][STATE] Дашборд экспортирован и распакован в {unpacked_path}")
# 2. Обновление YAML, если нужно
if db_configs_for_update:
update_yamls(db_configs=db_configs_for_update, path=str(unpacked_path))
self.logger.info(f"[INFO][execute_migration][STATE] YAML-файлы обновлены.")
# 3. Упаковка и импорт
new_zip_path = f"migrated_dashboard_{dashboard_id}.zip"
create_dashboard_export(new_zip_path, [unpacked_path])
content_to_import, _ = read_dashboard_from_disk(new_zip_path)
self.to_c.import_dashboards(content_to_import)
self.logger.info(f"[INFO][execute_migration][SUCCESS] Дашборд {dashboard['dashboard_title']} успешно импортирован.")
except Exception as e:
self.logger.error(f"[ERROR][execute_migration][FAILURE] Ошибка при миграции дашборда {dashboard['dashboard_title']}: {e}", exc_info=True)
print(f"Не удалось смигрировать дашборд: {dashboard['dashboard_title']}")
self.logger.info("[INFO][execute_migration][EXIT] Шаг 4 завершен.")
# END_FUNCTION_execute_migration
# END_CLASS_Migration
# [MAIN_EXECUTION_BLOCK]
if __name__ == "__main__":
migration = Migration()
migration.run()
# END_MAIN_EXECUTION_BLOCK
# END_MODULE_migration_script