diff --git a/.gitignore b/.gitignore index dccf889..f2746a5 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,6 @@ dashboards/ *.ps1 *.ipynb *.txt +*.zip keyring passwords.py +Logs/ \ No newline at end of file diff --git a/backup_script.py b/backup_script.py index a328ec1..e0445ba 100644 --- a/backup_script.py +++ b/backup_script.py @@ -6,22 +6,11 @@ import os from pathlib import Path from superset_tool.models import SupersetConfig, DatabaseConfig from superset_tool.client import SupersetClient +from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.fileio import save_and_unpack_dashboard, archive_exports, sanitize_filename -# Настройка логирования -LOG_DIR = Path("P:\\Superset\\010 Бекапы\\Logs") -LOG_DIR.mkdir(exist_ok=True, parents=True) -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(LOG_DIR / f"superset_backup_{datetime.now().strftime('%Y%m%d')}.log"), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) -def setup_clients(): +def setup_clients(logger: SupersetLogger): """Инициализация клиентов для разных окружений""" clients = {} try: @@ -34,11 +23,12 @@ def setup_clients(): "password": keyring.get_password("system", "dev migrate"), "refresh": True }, + logger=logger, verify_ssl=False ) # Конфигурация для Prod - sandbox_config = SupersetConfig( + prod_config = SupersetConfig( base_url="https://prodta.bi.dwh.rusal.com/api/v1", auth={ "provider": "db", @@ -46,11 +36,12 @@ def setup_clients(): "password": keyring.get_password("system", "prod migrate"), "refresh": True }, + logger=logger, verify_ssl=False ) # Конфигурация для Sandbox - prod_config = SupersetConfig( + sandbox_config = SupersetConfig( base_url="https://sandboxta.bi.dwh.rusal.com/api/v1", auth={ "provider": "db", @@ -58,20 +49,24 @@ def setup_clients(): "password": keyring.get_password("system", "sandbox migrate"), "refresh": True }, + logger=logger, verify_ssl=False ) clients['dev'] = SupersetClient(dev_config) clients['sbx'] = SupersetClient(sandbox_config) + clients['prod'] = SupersetClient(prod_config) logger.info("Клиенты для окружений успешно инициализированы") return clients except Exception as e: logger.error(f"Ошибка инициализации клиентов: {str(e)}") raise -def backup_dashboards(client, env_name, backup_root): +def backup_dashboards(client, + env_name, + backup_root, + logger: SupersetLogger ): """Выполнение бэкапа дашбордов для указанного окружения""" - logger.info(f"Начало бэкапа для окружения {env_name}") try: dashboard_count, dashboard_meta = client.get_dashboards() total = 0 @@ -93,48 +88,57 @@ def backup_dashboards(client, env_name, backup_root): unpack=False ) - logger.info(f"[{env_name}] Дашборд {dashboard_title} сохранен в {zip_path}") success += 1 - #Очистка старых бэкапов try: archive_exports(dashboard_dir) - logger.debug(f"[{env_name}] Выполнена очистка для {dashboard_title}") except Exception as cleanup_error: - logger.error(f"[{env_name}] Ошибка очистки {dashboard_title}: {str(cleanup_error)}") + raise cleanup_error except Exception as db_error: - logger.error(f"[{env_name}] Ошибка обработки дашборда {dashboard_title}: {str(db_error)}", - exc_info=True) - - logger.info(f"Бэкап {env_name} завершен. Успешно: {success}/{total}. Всего на сервере - {dashboard_count}") + raise db_error return success == total except Exception as e: - logger.error(f"Критическая ошибка при бэкапе {env_name}: {str(e)}", exc_info=True) return False def main(): + # Инициализация логгера + log_dir = Path("P:\\Superset\\010 Бекапы\\Logs") + logger = SupersetLogger( + log_dir=log_dir, + level=logging.INFO, + console=True + ) """Основная функция выполнения бэкапа""" logger.info("="*50) logger.info("Запуск процесса бэкапа Superset") logger.info("="*50) try: - clients = setup_clients() + clients = setup_clients(logger) superset_backup_repo = Path("P:\\Superset\\010 Бекапы") # Бэкап для DEV dev_success = backup_dashboards( clients['dev'], "DEV", - superset_backup_repo + superset_backup_repo, + logger=logger ) - # Бэкап для Sandbox + #Бэкап для Sandbox sbx_success = backup_dashboards( clients['sbx'], "SBX", - superset_backup_repo + superset_backup_repo, + logger=logger + ) + #Бэкап для Прода + prod_success = backup_dashboards( + clients['prod'], + "PROD", + superset_backup_repo, + logger=logger ) # Итоговый отчет @@ -142,7 +146,8 @@ def main(): logger.info("Итоги выполнения бэкапа:") logger.info(f"DEV: {'Успешно' if dev_success else 'С ошибками'}") logger.info(f"SBX: {'Успешно' if sbx_success else 'С ошибками'}") - logger.info(f"Полный лог доступен в: {LOG_DIR}") + logger.info(f"PROD: {'Успешно' if prod_success else 'С ошибками'}") + logger.info(f"Полный лог доступен в: {log_dir}") except Exception as e: logger.critical(f"Фатальная ошибка выполнения скрипта: {str(e)}", exc_info=True) diff --git a/migration_script.py b/migration_script.py index b464d77..02a371d 100644 --- a/migration_script.py +++ b/migration_script.py @@ -1,31 +1,62 @@ from superset_tool.models import SupersetConfig from superset_tool.client import SupersetClient +from superset_tool.utils.logger import SupersetLogger from superset_tool.exceptions import AuthenticationError -from superset_tool.utils.fileio import save_and_unpack_dashboard, update_db_yaml, archive_exports, sync_for_git +from superset_tool.utils.fileio import save_and_unpack_dashboard, update_db_yaml, create_dashboard_export import os import keyring from pathlib import Path +import logging +log_dir = Path("H:\\dev\\Logs") +logger = SupersetLogger( + log_dir=log_dir, + level=logging.INFO, + console=True +) -database_config={"PROD": +database_config_click={"new": { "database_name": "Prod Clickhouse", "sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm", "uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", + "database_uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", "allow_ctas": "true", "allow_cvas": "true", "allow_dml": "true" }, - "DEV": { + "old": { "database_name": "Dev Clickhouse", "sqlalchemy_uri": "clickhousedb+connect://dwhuser:XXXXXXXXXX@10.66.229.179:8123/dm", - "uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", - "database_uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", + "uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2", + "database_uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2", "allow_ctas": "true", "allow_cvas": "true", "allow_dml": "true" } } + +database_config_gp={"new": + { + "database_name": "Prod Greenplum", + "sqlalchemy_uri": "postgresql+psycopg2://viz_powerbi_gp_prod:XXXXXXXXXX@10.66.229.201:5432/dwh", + "uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8", + "database_uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8", + "allow_ctas": "true", + "allow_cvas": "true", + "allow_dml": "true" + }, + "old": { + "database_name": "DEV Greenplum", + "sqlalchemy_uri": "postgresql+psycopg2://viz_superset_gp_dev:XXXXXXXXXX@10.66.229.171:5432/dwh", + "uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f", + "database_uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f", + "allow_ctas": "false", + "allow_cvas": "false", + "allow_dml": "false" + } + } + # Конфигурация для Dev dev_config = SupersetConfig( base_url="https://devta.bi.dwh.rusal.com/api/v1", @@ -35,6 +66,7 @@ dev_config = SupersetConfig( "password": keyring.get_password("system", "dev migrate"), "refresh": True }, + logger=logger, verify_ssl=False ) @@ -47,6 +79,7 @@ prod_config = SupersetConfig( "password": keyring.get_password("system", "prod migrate"), "refresh": True }, + logger=logger, verify_ssl=False ) @@ -59,41 +92,45 @@ sandbox_config = SupersetConfig( "password": keyring.get_password("system", "sandbox migrate"), "refresh": True }, + logger=logger, verify_ssl=False ) # Инициализация клиента dev_client = SupersetClient(dev_config) -#prod_client = SupersetClient(prod_config) +sandbox_client = SupersetClient(sandbox_config) +prod_client = SupersetClient(prod_config) -dashboard_slug = "IM0010" +from_c = dev_client +to_c = sandbox_client +dashboard_slug = "FI0050" #dashboard_id = 53 -dashboard_meta = dev_client.get_dashboard(dashboard_slug) -print(dashboard_meta) -# print(dashboard_meta["dashboard_title"]) +dashboard_meta = from_c.get_dashboard(dashboard_slug) +#print(dashboard_meta) +print(dashboard_meta["dashboard_title"]) -#dashboard_id = dashboard_meta["id"] -# zip_content, filename = prod_client.export_dashboard(dashboard_id) -# superset_repo = "H:\\Superset\\repo\\" -# # print(f"Экспортируем дашборд ID = {dashboard_id}...") -# # #Сохранение и распаковка -# zip_path, unpacked_path = save_and_unpack_dashboard( -# zip_content=zip_content, -# original_filename=filename, -# output_dir=f"dashboards\{dashboard_slug}" -# ) -# dest_path = os.path.join(superset_repo,dashboard_slug) -# source_path = os.path.join(unpacked_path,Path(filename).stem) -# print(dest_path) -# print(source_path) -# sync_for_git(source_path=source_path,destination_path=dest_path,verbose=True) +dashboard_id = dashboard_meta["id"] +zip_content, filename = from_c.export_dashboard(dashboard_id, logger=logger) +superset_repo = Path("H:\\dev\\dashboards\\") +# print(f"Экспортируем дашборд ID = {dashboard_id}...") +# #Сохранение и распаковка +zip_path, unpacked_path = save_and_unpack_dashboard( + zip_content=zip_content, + original_filename=filename, + unpack=True, + logger=logger, + output_dir=os.path.join(superset_repo,dashboard_slug) +) +dest_path = os.path.join(superset_repo,dashboard_slug) +source_path = os.path.join(unpacked_path,Path(filename).stem) -# print(f"Сохранено в: {zip_path}") -# print(f"Распаковано в: {unpacked_path}") -# update_db_yaml(prod_config.database_config, path = unpacked_path,verbose=False) +update_db_yaml(database_config_click, path = source_path, logger=logger) +update_db_yaml(database_config_gp, path = source_path, logger=logger) -# prod_client.import_dashboard(zip_path) +create_dashboard_export(f"{dashboard_slug}.zip",[source_path],logger=logger) + +zip_path = Path(f"{dashboard_slug}.zip") +to_c.import_dashboard(zip_path) -# archive_exports("dashboards", max_files=3) diff --git a/superset_tool/client.py b/superset_tool/client.py index 604131f..f76740a 100644 --- a/superset_tool/client.py +++ b/superset_tool/client.py @@ -7,11 +7,13 @@ from pydantic import BaseModel, Field from .utils.fileio import * from .exceptions import * from .models import SupersetConfig +from .utils.logger import SupersetLogger class SupersetClient: def __init__(self, config: SupersetConfig): self.config = config + self.logger = config.logger or SupersetLogger(console=False) self.session = requests.Session() self._setup_session() self._authenticate() @@ -25,6 +27,7 @@ class SupersetClient: if not self.config.verify_ssl: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + self.logger.debug(f"Проверка сертификатов SSL отключена") self.session.mount('https://', adapter) self.session.verify = self.config.verify_ssl @@ -45,20 +48,25 @@ class SupersetClient: ) response.raise_for_status() self.access_token = response.json()["access_token"] - + self.logger.info(f"Токен Bearer {self.access_token} получен c {login_url}") + # Затем получаем CSRF токен с использованием access_token csrf_url = f"{self.config.base_url}/security/csrf_token/" + response = self.session.get( csrf_url, headers={"Authorization": f"Bearer {self.access_token}"}, verify=self.config.verify_ssl ) response.raise_for_status() + self.csrf_token = response.json()["result"] + self.logger.info(f"Токен CSRF {self.csrf_token} получен c {csrf_url}") except HTTPError as e: if e.response.status_code == 401: - error_msg = "Invalid credentials" if "login" in e.request.url else "CSRF token fetch failed" - raise AuthenticationError(f"{error_msg}. Check auth configuration") from e + error_msg = f"Неверные данные для аутенфикации для {login_url}" if "login" in e.request.url else f"Не удалось получить CSRF токен с {csrf_url}" + self.logger.error(f"Ошибка получения: {error_msg}") + raise AuthenticationError(f"{error_msg}. Проверь данные аутенфикации") from e raise @@ -78,7 +86,7 @@ class SupersetClient: :dashboard_id_or_slug - id или короткая ссылка """ url = f"{self.config.base_url}/dashboard/{dashboard_id_or_slug}" - + self.logger.debug(f"Получаем информацию по дашборду с /{url}...") try: response = self.session.get( url, @@ -86,9 +94,11 @@ class SupersetClient: timeout=self.config.timeout ) response.raise_for_status() + self.logger.info(f"ОК - Получили информацию по дашборду с {url}") return response.json()["result"] except requests.exceptions.RequestException as e: - raise SupersetAPIError(f"Failed to get dashboard: {str(e)}") from e + self.logger.error(f"Ошибка при получении информации о дашборде: {str(e)}", exc_info=True) + raise SupersetAPIError(f"Ошибка при получении информации о дашборде: {str(e)}") from e def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: @@ -102,11 +112,12 @@ class SupersetClient: Tuple[int, List[Dict]]: Кортеж, содержащий общее количество дашбордов и список всех дашбордов. """ url = f"{self.config.base_url}/dashboard/" + self.logger.debug(f"Получаем информацию по дашбордам с {url}...") modified_query: Dict = {} all_results: List[Dict] = [] total_count: int = 0 current_page: int = 0 - + try: total_count = self.session.get( url, @@ -114,7 +125,9 @@ class SupersetClient: headers=self.headers, timeout=self.config.timeout ).json()['count'] + self.logger.info(f"ОК - Получили кол-во дашбордов ({total_count}) с {url}") except requests.exceptions.RequestException as e: + self.logger.error(f"Ошибка при получении кол-ва дашбордов: {str(e)}", exc_info=True) raise SupersetAPIError(f"Ошибка при получении кол-ва дашбордов: {str(e)}") from e #Инициализация параметров запроса с учетом переданного query @@ -151,18 +164,19 @@ class SupersetClient: params={"q": json.dumps(modified_query)} , timeout=self.config.timeout ) - response.raise_for_status() data = response.json() all_results.extend(data.get("result", [])) - current_page += 1 + + self.logger.info(f"ОК - Получили информацию по дашбордам с {url}") # Проверка, достигли ли последней страницы return total_count, all_results except requests.exceptions.RequestException as e: - raise SupersetAPIError(f"Ошибка при получении дашбордов: {str(e)}") from e + self.logger.error(f"Ошибка при получении информации о дашбордах: {str(e)}", exc_info=True) + raise SupersetAPIError(f"Ошибка при получении информации о дашбордах: {str(e)}") from e - def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]: + def export_dashboard(self, dashboard_id: int, logger: Optional[SupersetLogger] = None) -> Tuple[bytes, str]: """Экспортирует дашборд из Superset в виде ZIP-архива и возвращает его содержимое с именем файла. Параметры: @@ -193,6 +207,8 @@ class SupersetClient: """ url = f"{self.config.base_url}/dashboard/export/" params = {"q": f"[{dashboard_id}]"} + logger = logger or SupersetLogger(name="client", console=False) + self.logger.debug(f"Экспортируем дашборд ID {dashboard_id} c {url}...") try: response = self.session.get( @@ -204,9 +220,11 @@ class SupersetClient: response.raise_for_status() filename = get_filename_from_headers(response.headers) or f"dashboard_{dashboard_id}.zip" + self.logger.info(f"Дашборд сохранен в {filename}") return response.content, filename except requests.exceptions.RequestException as e: + self.logger.error(f"Ошибка при экспорте: {str(e)}", exc_info=True) raise SupersetAPIError(f"Export failed: {str(e)}") from e @@ -241,7 +259,7 @@ class SupersetClient: - При конфликте имен может потребоваться ручное разрешение через параметры импорта """ url = f"{self.config.base_url}/dashboard/import/" - + self.logger.debug(f"Импортируем дашборд ID {zip_path} на {url}...") headers_without_content_type = {k: v for k, v in self.headers.items() if k.lower() != 'content-type'} zip_name = zip_path.name @@ -266,8 +284,10 @@ class SupersetClient: # Обрабатываем ответ try: response.raise_for_status() + self.logger.info(f"Дашборд импортирован успешно") return response.json() except requests.exceptions.HTTPError as e: + self.logger.error(f"Ошибка при импорте: {str(e)}", exc_info=True) error_detail = f"{e.response.status_code} {e.response.reason}" if e.response.text: error_detail += f"\nТело ответа: {e.response.text}" diff --git a/superset_tool/models.py b/superset_tool/models.py index 412e9f3..9de2424 100644 --- a/superset_tool/models.py +++ b/superset_tool/models.py @@ -1,10 +1,21 @@ -from pydantic import BaseModel +# models.py +from pydantic import BaseModel, validator +from typing import Optional +from .utils.logger import SupersetLogger class SupersetConfig(BaseModel): base_url: str - auth: dict # Словарь с параметрами аутентификации + auth: dict verify_ssl: bool = True timeout: int = 30 + logger: Optional[SupersetLogger] = None + + class Config: + arbitrary_types_allowed = True # Разрешаем произвольные типы class DatabaseConfig(BaseModel): - database_config: dict \ No newline at end of file + database_config: dict + logger: Optional[SupersetLogger] = None + + class Config: + arbitrary_types_allowed = True \ No newline at end of file diff --git a/superset_tool/utils/fileio.py b/superset_tool/utils/fileio.py index 87e3282..9d506c4 100644 --- a/superset_tool/utils/fileio.py +++ b/superset_tool/utils/fileio.py @@ -1,66 +1,147 @@ import re import zipfile from pathlib import Path -from typing import Optional, Tuple, Dict +from typing import Optional, Tuple, Dict, Any import datetime import shutil import yaml import tempfile import os from contextlib import contextmanager +from ..utils.logger import SupersetLogger @contextmanager -def create_temp_file(content: bytes, suffix: str = ".zip"): +def create_temp_file( + content: bytes, + suffix: str = ".zip", + logger: Optional[SupersetLogger] = None +): + """Контекстный менеджер для создания временных файлов с логированием""" + logger = logger or SupersetLogger(name="fileio", console=False) try: + logger.debug(f"Создание временного файла с суффиксом {suffix}") with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp: tmp.write(content) tmp.flush() yield Path(tmp.name) + except Exception as e: + logger.error( + f"Ошибка создания временного файла: {str(e)}", exc_info=True) + raise finally: if Path(tmp.name).exists(): Path(tmp.name).unlink() + logger.debug(f"Временный файл {tmp.name} удален") + def save_and_unpack_dashboard( zip_content: bytes, output_dir: str = "dashboards", unpack: bool = False, - original_filename: Optional[str] = None -) -> Tuple[Path, Path]: - """Сохраняет и распаковывает дашборд с учетом оригинального имени""" + original_filename: Optional[str] = None, + logger: Optional[SupersetLogger] = None +) -> Tuple[Path, Optional[Path]]: + """Сохраняет и распаковывает дашборд с логированием""" + logger = logger or SupersetLogger(name="fileio", console=False) + logger.info(f"Старт обработки дашборда. Распаковка: {unpack}") + try: output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) + logger.debug(f"Директория {output_path} создана/проверена") - # Генерируем имя файла - if original_filename: - zip_name = sanitize_filename(original_filename) - else: + zip_name = sanitize_filename( + original_filename) if original_filename else None + if not zip_name: timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") zip_name = f"dashboard_export_{timestamp}.zip" + logger.debug(f"Сгенерировано имя файла: {zip_name}") zip_path = output_path / zip_name + logger.info(f"Сохранение дашборда в: {zip_path}") - # Сохраняем ZIP-файл with open(zip_path, "wb") as f: f.write(zip_content) - + + logger.info(f"Дашборд успешно сохранен: {zip_path}") + if unpack: - # Распаковываем в поддиректорию с именем архива - #extract_dir_name = zip_path.stem - extract_path = output_path + extract_path = output_path + logger.debug(f"Начало распаковки в: {extract_path}") with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(extract_path) - + + logger.info(f"Дашборд распакован в: {extract_path}") return zip_path, extract_path - - return zip_path - + + return zip_path, None except Exception as e: + logger.error(f"Ошибка обработки дашборда: {str(e)}", exc_info=True) raise RuntimeError(f"Failed to unpack dashboard: {str(e)}") from e +def create_dashboard_export(zip_name, source_paths, + exclude_extensions=None, + compress_type=zipfile.ZIP_DEFLATED, + logger: Optional[SupersetLogger] = None): + """ + Создает ZIP-архив с сохранением оригинальной структуры директорий + + Параметры: + zip_name (str): Имя создаваемого ZIP-архива + source_paths (list): Список путей для добавления в архив + exclude_extensions (list): Расширения файлов для исключения (например, ['.tmp', '.log']) + compress_type: Тип сжатия (по умолчанию ZIP_DEFLATED) + """ + logger = logger or SupersetLogger(name="fileio", console=False) + logger.info(f"Упаковываем дашборд {source_paths} в {zip_name}") + try: + exclude_ext = [ext.lower() for ext in exclude_extensions] if exclude_extensions else [] + + # Проверка существования исходных путей + for path in source_paths: + if not os.path.exists(path): + raise FileNotFoundError(f"Путь не найден: {path}") + + # Сбор всех файлов с их базовыми путями + files_with_base = [] + for path in source_paths: + abs_path = os.path.abspath(path) + if os.path.isfile(abs_path): + # Для файла: базовый путь = директория файла + base_path = os.path.dirname(abs_path) + files_with_base.append((abs_path, base_path)) + elif os.path.isdir(abs_path): + # Для директории: базовый путь = сама директория + base_path = abs_path + for root, _, files in os.walk(abs_path): + for file in files: + full_path = os.path.join(root, file) + files_with_base.append((full_path, base_path)) + + # Фильтрация по расширениям + if exclude_ext: + files_with_base = [ + (f, b) for f, b in files_with_base + if os.path.splitext(f)[1].lower() not in exclude_ext + ] + + # Создание архива + with zipfile.ZipFile(zip_name, 'w', compress_type) as zipf: + for file, base_path in files_with_base: + # Вычисляем относительный путь от base_path + rel_path = os.path.relpath(file, start=base_path) + arcname = os.path.join(Path(zip_name).stem, rel_path) + zipf.write(file, arcname=arcname) + logger.debug(f"Добавлен {arcname}") + + logger.info(f"Архив создан {zip_name}") + return True + except Exception as e: + logger.error(f"\nОшибка: {str(e)}") + return False def get_filename_from_headers(headers: dict) -> Optional[str]: """Извлекает имя файла из заголовков HTTP-ответа""" @@ -76,18 +157,17 @@ def get_filename_from_headers(headers: dict) -> Optional[str]: return filename_match[0].strip('"') return None - def sanitize_filename(filename: str) -> str: """Очищает имя файла от потенциально опасных символов""" return re.sub(r'[\\/*?:"<>|]', "_", filename).strip() - def archive_exports( output_dir: str, daily_retention: int = 7, weekly_retention: int = 4, monthly_retention: int = 12, - yearly_retention: Optional[int] = None + yearly_retention: Optional[int] = None, + logger: Optional[SupersetLogger] = None ): """Управление историей экспортов по политике GFS (Grandfather-Father-Son) Параметры: @@ -97,10 +177,12 @@ def archive_exports( :yearly_retention: Optional[int] Извлекает даты из стандартного суперсетовсого архива вида, либо берет дату изменения архива dashboard_export_20250326T121517.zip""" - + + logger = logger or SupersetLogger(name="fileio", console=False) + logger.info(f"Старт очистки архивов в {output_dir}") export_dir = Path(output_dir) files_with_dates = [] - + # Собираем файлы с их датами for file in export_dir.glob("*.zip"): # Извлекаем временную метку из имени файла @@ -113,62 +195,71 @@ def archive_exports( # Если не удалось распарсить - используем дату изменения файла mtime = file.stat().st_mtime date = datetime.datetime.fromtimestamp(mtime).date() - + logger.warning(f"Использована дата модификации для {file.name}") + files_with_dates.append((file, date)) - - # Сортируем файлы по дате (новые сначала) - files_with_dates.sort(key=lambda x: x[1], reverse=True) - - # Создаем группы для разных уровней резервирования - daily_groups = {} - weekly_groups = {} - monthly_groups = {} - yearly_groups = {} + try: + # Сортируем файлы по дате (новые сначала) + files_with_dates.sort(key=lambda x: x[1], reverse=True) - for file, date in files_with_dates: - # Группировка по дням - daily_groups.setdefault(date, file) - - # Группировка по неделям - year, week, _ = date.isocalendar() - weekly_groups.setdefault((year, week), file) - - # Группировка по месяцам - monthly_groups.setdefault((date.year, date.month), file) - - # Группировка по годам - yearly_groups.setdefault(date.year, file) - - # Выбираем файлы для сохранения - keep_files = set() - - # Daily - последние N дней - sorted_daily = sorted(daily_groups.keys(), reverse=True)[:daily_retention] - keep_files.update(daily_groups[d] for d in sorted_daily) - - # Weekly - последние N недель - sorted_weekly = sorted(weekly_groups.keys(), reverse=True)[:weekly_retention] - keep_files.update(weekly_groups[w] for w in sorted_weekly) - - # Monthly - последние N месяцев - sorted_monthly = sorted(monthly_groups.keys(), reverse=True)[:monthly_retention] - keep_files.update(monthly_groups[m] for m in sorted_monthly) - - # Yearly - все или последние N лет - if yearly_retention is not None: - sorted_yearly = sorted(yearly_groups.keys(), reverse=True)[:yearly_retention] - keep_files.update(yearly_groups[y] for y in sorted_yearly) - else: - keep_files.update(yearly_groups.values()) - - # Удаляем неподходящие файлы и директории - for file, _ in files_with_dates: - if file not in keep_files: - file.unlink() - # unpacked_dir = export_dir / file.stem - # if unpacked_dir.exists(): - # shutil.rmtree(unpacked_dir) + # Создаем группы для разных уровней резервирования + daily_groups = {} + weekly_groups = {} + monthly_groups = {} + yearly_groups = {} + for file, date in files_with_dates: + # Группировка по дням + daily_groups.setdefault(date, file) + + # Группировка по неделям + year, week, _ = date.isocalendar() + weekly_groups.setdefault((year, week), file) + + # Группировка по месяцам + monthly_groups.setdefault((date.year, date.month), file) + + # Группировка по годам + yearly_groups.setdefault(date.year, file) + + # Выбираем файлы для сохранения + keep_files = set() + + # Daily - последние N дней + sorted_daily = sorted(daily_groups.keys(), reverse=True)[ + :daily_retention] + keep_files.update(daily_groups[d] for d in sorted_daily) + + # Weekly - последние N недель + sorted_weekly = sorted(weekly_groups.keys(), reverse=True)[ + :weekly_retention] + keep_files.update(weekly_groups[w] for w in sorted_weekly) + + # Monthly - последние N месяцев + sorted_monthly = sorted(monthly_groups.keys(), reverse=True)[ + :monthly_retention] + keep_files.update(monthly_groups[m] for m in sorted_monthly) + + # Yearly - все или последние N лет + if yearly_retention is not None: + sorted_yearly = sorted(yearly_groups.keys(), reverse=True)[ + :yearly_retention] + keep_files.update(yearly_groups[y] for y in sorted_yearly) + else: + keep_files.update(yearly_groups.values()) + + # Удаляем неподходящие файлы и директории + for file, _ in files_with_dates: + if file not in keep_files: + file.unlink() + unpacked_dir = export_dir / file.stem + if unpacked_dir.exists(): + shutil.rmtree(unpacked_dir) + logger.info(f"Очистка завершена. Сохранено {len(keep_files)} файлов") + + except Exception as e: + logger.error(f"Ошибка очистки архивов: {str(e)}", exc_info=True) + raise def determine_and_load_yaml_type(file_path): with open(file_path, 'r') as f: @@ -185,67 +276,70 @@ def determine_and_load_yaml_type(file_path): else: return data, 'unknown' - -def update_db_yaml(db_config: Dict = None, path: str = "dashboards", verbose: bool = False) -> None: +def update_db_yaml( + db_config: Dict = None, + path: str = "dashboards", + logger: Optional[SupersetLogger] = None +) -> None: """ - Обновляет конфигурации в YAML-файлах баз данных согласно переданному словарю замен + Обновляет конфигурации в YAML-файлах баз данных, заменяя старые значения на новые - :param db_config: Словарь с параметрами для замены (ключ: значение) - :param path: Путь к папке с YAML-файлами (по умолчанию 'databases') + :param db_config: Словарь с параметрами для замены в формате: + { + "old": {старые_ключи: значения_для_поиска}, + "new": {новые_ключи: значения_для_замены} + } + :param path: Путь к папке с YAML-файлами """ - # Устанавливаем дефолтные значения - db_config = db_config or {} + logger = logger or SupersetLogger(name="fileio", console=False) + logger.info("Старт обновления YAML-конфигов") - # Ищем все YAML-файлы в указанной папке - databases_dir = Path(path) - yaml_files = databases_dir.rglob("*.yaml") + try: + db_config = db_config or {} + old_config = db_config.get("old", {}) # Значения для поиска + new_config = db_config.get("new", {}) # Значения для замены - for file_path in yaml_files: - try: - # Чтение и загрузка YAML + databases_dir = Path(path) + yaml_files = databases_dir.rglob("*.yaml") - data, yaml_type = determine_and_load_yaml_type(file_path) or {} + for file_path in yaml_files: + try: + result = determine_and_load_yaml_type(file_path) + + data, yaml_type = result if result else ({}, None) + logger.debug(f"Тип {file_path} - {yaml_type}") - # Обновляем только существующие ключи - updates = { - k: v - for k, v in db_config.items() - if ( - k in data # ключ есть в data - and data[k] != v # значение отличается - and ( - # для database — все ключи - (yaml_type == "database") or - # для dataset — исключаем uuid - (yaml_type == "dataset" and k != "uuid") - ) - ) - } + updates = {} + for key in old_config: + if key in data and data[key] == old_config[key]: + new_value = new_config.get(key) + if new_value is not None and new_value != data.get(key): + updates[key] = new_value - # Обновляем data - data.update(updates) - if verbose and updates: - print(f"Обработан {file_path}") - print(updates) + if updates: + logger.info(f"Обновление {file_path}: {updates}") + data.update(updates) + + with open(file_path, 'w') as file: + yaml.dump( + data, + file, + default_flow_style=False, + sort_keys=False + ) - # Сохранение с сохранением структуры файла - with open(file_path, 'w') as file: - yaml.dump( - data, - file, - default_flow_style=False, - sort_keys=False, - allow_unicode=True - ) + except Exception as e: + logger.error(f"Ошибка обработки {file_path}: {str(e)}", exc_info=True) - except Exception as e: - print(f"Ошибка при обработке файла {file_path}: {str(e)}") + except Exception as e: + logger.error(f"Критическая ошибка обновления: {str(e)}", exc_info=True) + raise def sync_for_git( source_path: str, destination_path: str, dry_run: bool = False, - verbose: bool = False + logger: Optional[SupersetLogger] = None ) -> None: """ Синхронизирует содержимое папки source_path с destination_path. @@ -259,7 +353,10 @@ def sync_for_git( :param dry_run: Режим имитации (не вносит реальных изменений) :param verbose: Подробный вывод операций """ + logger = logger or SupersetLogger(name="fileio", console=False) + logger.info("Старт перезаписи целевой папки") source_files = set() + for root, _, files in os.walk(source_path): for file in files: rel_path = os.path.relpath(os.path.join(root, file), source_path) @@ -268,7 +365,8 @@ def sync_for_git( destination_files = set() for root, _, files in os.walk(destination_path): for file in files: - rel_path = os.path.relpath(os.path.join(root, file), destination_path) + rel_path = os.path.relpath( + os.path.join(root, file), destination_path) destination_files.add(rel_path) # Копирование/обновление файлов @@ -277,19 +375,10 @@ def sync_for_git( dst = os.path.join(destination_path, file) dest_dir = os.path.dirname(dst) - if verbose: - status = "[DRY-RUN] " if dry_run else "" - print(f"{status}Creating directory: {dest_dir}") + os.makedirs(dest_dir, exist_ok=True) - if not dry_run: - os.makedirs(dest_dir, exist_ok=True) - - if verbose: - status = "[DRY-RUN] " if dry_run else "" - print(f"{status}Copying: {file}") - - if not dry_run: - shutil.copy2(src, dst) + shutil.copy2(src, dst) + logger.info(f"Копируем: {file}") # Удаление лишних файлов files_to_delete = destination_files - source_files @@ -297,33 +386,25 @@ def sync_for_git( for file in files_to_delete: target = Path(destination_path) / file - + # Пропускаем .git и его содержимое try: if git_dir in target.parents or target == git_dir: - if verbose: - print(f"Skipping .git item: {target}") + logger.info(f"Пропускаем .git: {target}") continue except ValueError: pass - if verbose: - action = "Would delete" if dry_run else "Deleting" - print(f"{action}: {target}") - - if not dry_run: - try: - if target.is_file(): - target.unlink() - elif target.is_dir(): - shutil.rmtree(target) - except OSError as e: - print(f"Error deleting {target}: {e}") + try: + if target.is_file(): + target.unlink() + elif target.is_dir(): + shutil.rmtree(target) + except OSError as e: + logger.error(f"Ошибка удаления: {target}: {e}") # Удаление пустых директорий - if verbose: - print("\nChecking for empty directories...") - + git_dir = Path(destination_path) / ".git" deleted_dirs = set() @@ -331,25 +412,21 @@ def sync_for_git( for root, dirs, files in os.walk(destination_path, topdown=False): for dir_name in dirs: dir_path = Path(root) / dir_name - + # Пропускаем .git и его поддиректории try: if git_dir in dir_path.parents or dir_path == git_dir: continue except ValueError: pass - + # Проверяем что директория пуста и не была удалена ранее if dir_path not in deleted_dirs and not any(dir_path.iterdir()): - if verbose: - status = "[DRY-RUN] " if dry_run else "" - print(f"{status}Deleting empty directory: {dir_path}") - - if not dry_run: - try: - dir_path.rmdir() - deleted_dirs.add(dir_path) - except OSError as e: - print(f"Error deleting directory {dir_path}: {e}") + try: + dir_path.rmdir() + deleted_dirs.add(dir_path) + logger.info(f"Удаляем пустую директорию: {dir_path}") + except OSError as e: + logger.error(f"Ошибка удаления: {dir_path}: {e}") diff --git a/superset_tool/utils/logger.py b/superset_tool/utils/logger.py index 6954707..c7264e6 100644 --- a/superset_tool/utils/logger.py +++ b/superset_tool/utils/logger.py @@ -1,5 +1,61 @@ # utils/logger.py -def configure_logger(): - logger = logging.getLogger("superset_migration") - # Настройка формата, обработчиков и уровня логирования - return logger \ No newline at end of file +import logging +from datetime import datetime +from pathlib import Path +from typing import Optional + +class SupersetLogger: + def __init__( + self, + name: str = "superset_tool", + log_dir: Optional[Path] = None, + level: int = logging.INFO, + console: bool = True + ): + self.logger = logging.getLogger(name) + self.logger.setLevel(level) + + formatter = logging.Formatter( + '%(asctime)s - %(levelname)s - %(message)s' + ) + + # Очищаем существующие обработчики + if self.logger.handlers: + for handler in self.logger.handlers[:]: + self.logger.removeHandler(handler) + + # Файловый обработчик + if log_dir: + log_dir.mkdir(parents=True, exist_ok=True) + file_handler = logging.FileHandler( + log_dir / f"{name}_{self._get_timestamp()}.log" + ) + file_handler.setFormatter(formatter) + self.logger.addHandler(file_handler) + + # Консольный обработчик + if console: + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + self.logger.addHandler(console_handler) + + def _get_timestamp(self) -> str: + return datetime.now().strftime("%Y%m%d") + + def info(self, message: str, exc_info: bool = False): + self.logger.info(message, exc_info=exc_info) + + def error(self, message: str, exc_info: bool = False): + self.logger.error(message, exc_info=exc_info) + + def warning(self, message: str, exc_info: bool = False): + self.logger.warning(message, exc_info=exc_info) + + def debug(self, message: str, exc_info: bool = False): + self.logger.debug(message, exc_info=exc_info) + + def exception(self, message: str): + self.logger.exception(message) + + def critical(self, message: str, exc_info: bool = False): + self.logger.critical(message, exc_info=exc_info) diff --git a/test api.py b/test api.py index b649211..0d4f0c9 100644 --- a/test api.py +++ b/test api.py @@ -25,6 +25,18 @@ def setup_clients(): """Инициализация клиентов для разных окружений""" clients = {} try: + + # Конфигурация для Prod + prod_config = SupersetConfig( + base_url="https://prodta.bi.dwh.rusal.com/api/v1", + auth={ + "provider": "db", + "username": "migrate_user", + "password": keyring.get_password("system", "prod migrate"), + "refresh": True + }, + verify_ssl=False + ) # Конфигурация для Dev dev_config = SupersetConfig( base_url="https://devta.bi.dwh.rusal.com/api/v1", @@ -37,17 +49,6 @@ def setup_clients(): verify_ssl=False ) - # Конфигурация для Prod - prod_config = SupersetConfig( - base_url="https://prodta.bi.dwh.rusal.com/api/v1", - auth={ - "provider": "db", - "username": "migrate_user", - "password": keyring.get_password("system", "prod migrate"), - "refresh": True - }, - verify_ssl=False - ) # Конфигурация для Sandbox sandbox_config = SupersetConfig( @@ -60,10 +61,19 @@ def setup_clients(): }, verify_ssl=False ) - - clients['dev'] = SupersetClient(dev_config) - clients['sbx'] = SupersetClient(sandbox_config) - logger.info("Клиенты для окружений успешно инициализированы") + try: + clients['dev'] = SupersetClient(dev_config) + except Exception as e: + logger.error(f"Ошибка инициализации клиента: {str(e)}") + try: + clients['sbx'] = SupersetClient(sandbox_config) + except Exception as e: + logger.error(f"Ошибка инициализации клиента: {str(e)}") + try: + clients['prod'] = SupersetClient(prod_config) + except Exception as e: + logger.error(f"Ошибка инициализации клиента: {str(e)}") + return clients except Exception as e: logger.error(f"Ошибка инициализации клиентов: {str(e)}") @@ -71,9 +81,9 @@ def setup_clients(): def backup_dashboards(client, env_name, backup_root): """Выполнение бэкапа дашбордов для указанного окружения""" - logger.info(f"Начало бэкапа для окружения {env_name}") + #logger.info(f"Начало бэкапа для окружения {env_name}") - print(client.get_dashboards()) + #print(client.get_dashboards()) # dashboard_count,dashboard_meta = client.get_dashboards() # total = 0 # success = 0 @@ -100,9 +110,16 @@ dev_success = backup_dashboards( superset_backup_repo ) + # Бэкап для Sandbox -# sbx_success = backup_dashboards( -# clients['sbx'], -# "SBX", -# superset_backup_repo -# ) \ No newline at end of file +sbx_success = backup_dashboards( + clients['sbx'], + "SBX", + superset_backup_repo +) + +prod_success = backup_dashboards( + clients['prod'], + "PROD", + superset_backup_repo +)