init refactor

This commit is contained in:
Volobuev Andrey
2025-07-29 17:56:15 +03:00
parent ca2357e2e2
commit f368f5ced9
3 changed files with 216 additions and 114 deletions

4
.gitignore vendored
View File

@@ -1,4 +1,6 @@
*__pycache__*
*.ps1
keyring passwords.py
logs
*logs*
*\.github*

View File

@@ -17,6 +17,7 @@ from superset_tool.client import SupersetClient
from superset_tool.utils.logger import SupersetLogger
from superset_tool.exceptions import AuthenticationError, SupersetAPIError, NetworkError, DashboardNotFoundError
from superset_tool.utils.fileio import save_and_unpack_dashboard, update_yamls, create_dashboard_export, create_temp_file, read_dashboard_from_disk
from superset_tool.utils.init_clients import setup_clients
# [IMPORTS] Стандартная библиотека
import os
@@ -84,127 +85,126 @@ database_config_gp = {
}
logger.debug("[CONFIG] Конфигурация Greenplum загружена.")
# [CONFIG] Конфигурация Superset API для Dev окружения
dev_config = SupersetConfig(
base_url="https://devta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "dev migrate"),
"refresh": True
},
logger=logger,
verify_ssl=False
)
logger.debug(f"[CONFIG] Dev SupersetConfig создан для {dev_config.base_url}")
# [CONFIG] Конфигурация Superset API для Prod окружения
prod_config = SupersetConfig(
base_url="https://prodta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "prod migrate"),
"refresh": True
},
logger=logger,
verify_ssl=False
)
logger.debug(f"[CONFIG] Prod SupersetConfig создан для {prod_config.base_url}")
# [CONFIG] Конфигурация Superset API для Sandbox окружения
sandbox_config = SupersetConfig(
base_url="https://sandboxta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "sandbox migrate"),
"refresh": True
},
logger=logger,
verify_ssl=False
)
logger.debug(f"[CONFIG] Sandbox SupersetConfig создан для {sandbox_config.base_url}")
# [INIT] Инициализация клиентов Superset API
# @invariant: Все клиенты должны быть успешно инициализированы для дальнейшей работы.
try:
dev_client = SupersetClient(dev_config)
sandbox_client = SupersetClient(sandbox_config)
prod_client = SupersetClient(prod_config) # Не используется в текущем flow, но инициализирован.
logger.info("[COHERENCE_CHECK_PASSED] Клиенты Superset успешно инициализированы.")
except Exception as e:
logger.critical(f"[CRITICAL] Ошибка инициализации клиентов Superset: {str(e)}", exc_info=True)
exit(1) # Выход из скрипта при критической ошибке инициализации
# [ANCHOR] CLIENT_SETUP
clients = setup_clients(logger)
# [CONFIG] Определение исходного и целевого клиентов для миграции
# [COHERENCE_NOTE] Эти переменные задают конкретную миграцию. Для параметризации можно использовать аргументы командной строки.
from_c = sandbox_client # Источник миграции
to_c = dev_client # Цель миграции
from_c = clients["sbx"] # Источник миграции
to_c = clients["preprod"] # Цель миграции
dashboard_slug = "FI0060" # Идентификатор дашборда для миграции
# dashboard_id = 53 # ID не нужен, если есть slug
logger.info(f"[INFO] Конфигурация миграции: From '{from_c.config.base_url}' To '{to_c.config.base_url}' for dashboard slug '{dashboard_slug}'")
try:
# [ACTION] Получение метаданных исходного дашборда
logger.info(f"[INFO] Получение метаданных дашборда '{dashboard_slug}' из исходного окружения.")
dashboard_meta = from_c.get_dashboard(dashboard_slug)
dashboard_id = dashboard_meta["id"] # Получаем ID из метаданных
logger.info(f"[INFO] Найден дашборд '{dashboard_meta['dashboard_title']}' с ID: {dashboard_id}.")
# [CONTRACT]
# Описание: Мигрирует один дашборд с from_c на to_c.
# @pre:
# - from_c и to_c должны быть инициализированы.
# @post:
# - Дашборд с from_c успешно экспортирован и импортирован в to_c.
# @raise:
# - Exception: В случае ошибки экспорта или импорта.
def migrate_dashboard (dashboard_slug=dashboard_slug,
from_c = from_c,
to_c = to_c,
logger=logger,
update_db_yaml=False):
logger.info(f"[INFO] Конфигурация миграции: From '{from_c.config.base_url}' To '{to_c.config.base_url}' for dashboard slug '{dashboard_slug}'")
# [CONTEXT_MANAGER] Работа с временной директорией для обработки архива дашборда
with create_temp_file(suffix='.dir', logger=logger) as temp_root:
logger.info(f"[INFO] Создана временная директория: {temp_root}")
# [ANCHOR] EXPORT_DASHBOARD
# Экспорт дашборда во временную директорию ИЛИ чтение с диска
# [COHERENCE_NOTE] В текущем коде закомментирован экспорт и используется локальный файл.
# Для полноценной миграции следует использовать export_dashboard().
zip_content, filename = from_c.export_dashboard(dashboard_id) # Предпочтительный путь для реальной миграции
# [DEBUG] Использование файла с диска для тестирования миграции
#zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250704T082538.zip"
#logger.warning(f"[WARN] Используется ЛОКАЛЬНЫЙ файл дашборда для миграции: {zip_db_path}. Это может привести к некогерентности, если файл устарел.")
#zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger)
# [ANCHOR] SAVE_AND_UNPACK
# Сохранение и распаковка во временную директорию
zip_path, unpacked_path = save_and_unpack_dashboard(
zip_content=zip_content,
original_filename=filename,
unpack=True,
logger=logger,
output_dir=temp_root
)
logger.info(f"[INFO] Дашборд распакован во временную директорию: {unpacked_path}")
# [ANCHOR] UPDATE_YAML_CONFIGS
# Обновление конфигураций баз данных в YAML-файлах
source_path = unpacked_path / Path(filename).stem # Путь к распакованному содержимому дашборда
db_configs_to_apply = [database_config_click, database_config_gp]
logger.info(f"[INFO] Применение трансформаций баз данных к YAML файлам в {source_path}...")
update_yamls(db_configs_to_apply, path=source_path, logger=logger)
logger.info("[INFO] YAML-файлы успешно обновлены.")
try:
# [ACTION] Получение метаданных исходного дашборда
logger.info(f"[INFO] Получение метаданных дашборда '{dashboard_slug}' из исходного окружения.")
dashboard_meta = from_c.get_dashboard(dashboard_slug)
dashboard_id = dashboard_meta["id"] # Получаем ID из метаданных
logger.info(f"[INFO] Найден дашборд '{dashboard_meta['dashboard_title']}' с ID: {dashboard_id}.")
# [ANCHOR] CREATE_NEW_EXPORT_ARCHIVE
# Создание нового экспорта дашборда из модифицированных файлов
temp_zip = temp_root / f"{dashboard_slug}_migrated.zip" # Имя файла для импорта
logger.info(f"[INFO] Создание нового ZIP-архива для импорта: {temp_zip}")
create_dashboard_export(temp_zip, [source_path], logger=logger)
logger.info("[INFO] Новый ZIP-архив дашборда готов к импорту.")
# [CONTEXT_MANAGER] Работа с временной директорией для обработки архива дашборда
with create_temp_file(suffix='.dir', logger=logger) as temp_root:
logger.info(f"[INFO] Создана временная директория: {temp_root}")
# [ANCHOR] EXPORT_DASHBOARD
# Экспорт дашборда во временную директорию ИЛИ чтение с диска
# [COHERENCE_NOTE] В текущем коде закомментирован экспорт и используется локальный файл.
# Для полноценной миграции следует использовать export_dashboard().
zip_content, filename = from_c.export_dashboard(dashboard_id) # Предпочтительный путь для реальной миграции
# [DEBUG] Использование файла с диска для тестирования миграции
#zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250704T082538.zip"
#logger.warning(f"[WARN] Используется ЛОКАЛЬНЫЙ файл дашборда для миграции: {zip_db_path}. Это может привести к некогерентности, если файл устарел.")
#zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger)
# [ANCHOR] SAVE_AND_UNPACK
# Сохранение и распаковка во временную директорию
zip_path, unpacked_path = save_and_unpack_dashboard(
zip_content=zip_content,
original_filename=filename,
unpack=True,
logger=logger,
output_dir=temp_root
)
logger.info(f"[INFO] Дашборд распакован во временную директорию: {unpacked_path}")
# [ANCHOR] UPDATE_YAML_CONFIGS
# Обновление конфигураций баз данных в YAML-файлах
if update_db_yaml:
source_path = unpacked_path / Path(filename).stem # Путь к распакованному содержимому дашборда
db_configs_to_apply = [database_config_click, database_config_gp]
logger.info(f"[INFO] Применение трансформаций баз данных к YAML файлам в {source_path}...")
update_yamls(db_configs_to_apply, path=source_path, logger=logger)
logger.info("[INFO] YAML-файлы успешно обновлены.")
# [ANCHOR] IMPORT_DASHBOARD
# Импорт обновленного дашборда в целевое окружение
logger.info(f"[INFO] Запуск импорта дашборда в целевое окружение {to_c.config.base_url}...")
import_result = to_c.import_dashboard(temp_zip)
logger.info(f"[COHERENCE_CHECK_PASSED] Дашборд '{dashboard_slug}' успешно импортирован/обновлен.", extra={"import_result": import_result})
# [ANCHOR] CREATE_NEW_EXPORT_ARCHIVE
# Создание нового экспорта дашборда из модифицированных файлов
temp_zip = temp_root / f"{dashboard_slug}_migrated.zip" # Имя файла для импорта
logger.info(f"[INFO] Создание нового ZIP-архива для импорта: {temp_zip}")
create_dashboard_export(temp_zip, [source_path], logger=logger)
logger.info("[INFO] Новый ZIP-архив дашборда готов к импорту.")
else:
temp_zip = zip_path
# [ANCHOR] IMPORT_DASHBOARD
# Импорт обновленного дашборда в целевое окружение
logger.info(f"[INFO] Запуск импорта дашборда в целевое окружение {to_c.config.base_url}...")
import_result = to_c.import_dashboard(temp_zip)
logger.info(f"[COHERENCE_CHECK_PASSED] Дашборд '{dashboard_slug}' успешно импортирован/обновлен.", extra={"import_result": import_result})
except (AuthenticationError, SupersetAPIError, NetworkError, DashboardNotFoundError) as e:
logger.error(f"[ERROR] Ошибка миграции дашборда: {str(e)}", exc_info=True, extra=e.context)
exit(1)
except Exception as e:
logger.critical(f"[CRITICAL] Фатальная и необработанная ошибка в скрипте миграции: {str(e)}", exc_info=True)
exit(1)
except (AuthenticationError, SupersetAPIError, NetworkError, DashboardNotFoundError) as e:
logger.error(f"[ERROR] Ошибка миграции дашборда: {str(e)}", exc_info=True, extra=e.context)
# exit(1)
except Exception as e:
logger.critical(f"[CRITICAL] Фатальная и необработанная ошибка в скрипте миграции: {str(e)}", exc_info=True)
# exit(1)
logger.info("[INFO] Процесс миграции завершен.")
logger.info("[INFO] Процесс миграции завершен.")
# [CONTRACT]
# Описание: Мигрирует все дашборды с from_c на to_c.
# @pre:
# - from_c и to_c должны быть инициализированы.
# @post:
# - Все дашборды с from_c успешно экспортированы и импортированы в to_c.
# @raise:
# - Exception: В случае ошибки экспорта или импорта.
def migrate_all_dashboards(from_c: SupersetClient, to_c: SupersetClient,logger=logger) -> None:
# [ACTION] Получение списка всех дашбордов из исходного окружения.
logger.info(f"[ACTION] Получение списка всех дашбордов из '{from_c.config.base_url}'")
total_dashboards, dashboards = from_c.get_dashboards()
logger.info(f"[INFO] Найдено {total_dashboards} дашбордов для миграции.")
# [ACTION] Итерация по всем дашбордам и миграция каждого из них.
for dashboard in dashboards:
dashboard_id = dashboard["id"]
dashboard_slug = dashboard["slug"]
dashboard_title = dashboard["dashboard_title"]
logger.info(f"[INFO] Начало миграции дашборда '{dashboard_title}' (ID: {dashboard_id}, Slug: {dashboard_slug}).")
if dashboard_slug:
try:
migrate_dashboard(dashboard_slug=dashboard_slug,from_c=from_c,to_c=to_c,logger=logger)
except Exception as e:
logger.error(f"[ERROR] Ошибка миграции дашборда: {str(e)}", exc_info=True, extra=e.context)
else:
logger.info(f"[INFO] Пропуск '{dashboard_title}' (ID: {dashboard_id}, Slug: {dashboard_slug}). Пустой SLUG")
logger.info(f"[INFO] Миграция всех дашбордов с '{from_c.config.base_url}' на '{to_c.config.base_url}' завершена.")
# [ACTION] Вызов функции миграции
migrate_all_dashboards(from_c, to_c)

View File

@@ -0,0 +1,100 @@
# [MODULE] Superset Init clients
# @contract: Автоматизирует процесс инициализации клиентов для использования скриптами.
# @semantic_layers:
# 1. Инициализация логгера и клиентов Superset.
# @coherence:
# - Использует `SupersetClient` для взаимодействия с API Superset.
# - Использует `SupersetLogger` для централизованного логирования.
# - Интегрируется с `keyring` для безопасного хранения паролей.
# [IMPORTS] Стандартная библиотека
import logging
from datetime import datetime
from pathlib import Path
# [IMPORTS] Сторонние библиотеки
import keyring
# [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig
from superset_tool.client import SupersetClient
from superset_tool.utils.logger import SupersetLogger
# [FUNCTION] setup_clients
# @contract: Инициализирует и возвращает SupersetClient для каждого заданного окружения.
# @pre:
# - `keyring` должен содержать необходимые пароли для "dev migrate", "prod migrate", "sandbox migrate".
# - `logger` должен быть инициализирован.
# @post:
# - Возвращает словарь {env_name: SupersetClient_instance}.
# - Логирует успешную инициализацию или ошибку.
# @raise:
# - `Exception`: При любой ошибке в процессе инициализации клиентов (например, отсутствие пароля в keyring, проблемы с сетью при первой аутентификации).
def setup_clients(logger: SupersetLogger):
"""Инициализация клиентов для разных окружений"""
# [ANCHOR] CLIENTS_INITIALIZATION
clients = {}
try:
# [INFO] Инициализация конфигурации для Dev
dev_config = SupersetConfig(
base_url="https://devta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "dev migrate"),
"refresh": True
},
verify_ssl=False
)
# [DEBUG] Dev config created: {dev_config.base_url}
# [INFO] Инициализация конфигурации для Prod
prod_config = SupersetConfig(
base_url="https://prodta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "prod migrate"),
"refresh": True
},
verify_ssl=False
)
# [DEBUG] Prod config created: {prod_config.base_url}
# [INFO] Инициализация конфигурации для Sandbox
sandbox_config = SupersetConfig(
base_url="https://sandboxta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "sandbox migrate"),
"refresh": True
},
verify_ssl=False
)
# [DEBUG] Sandbox config created: {sandbox_config.base_url}
# [INFO] Инициализация конфигурации для Preprod
preprod_config = SupersetConfig(
base_url="https://preprodta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "preprod migrate"),
"refresh": True
},
verify_ssl=False
)
# [DEBUG] Sandbox config created: {sandbox_config.base_url}
# [INFO] Создание экземпляров SupersetClient
clients['dev'] = SupersetClient(dev_config, logger)
clients['sbx'] = SupersetClient(sandbox_config,logger)
clients['prod'] = SupersetClient(prod_config,logger)
clients['preprod'] = SupersetClient(preprod_config,logger)
logger.info("[COHERENCE_CHECK_PASSED] Клиенты для окружений успешно инициализированы", extra={"envs": list(clients.keys())})
return clients
except Exception as e:
logger.error(f"[ERROR] Ошибка инициализации клиентов: {str(e)}", exc_info=True)
raise