diff --git a/superset_tool/README.md b/README.md similarity index 100% rename from superset_tool/README.md rename to README.md diff --git a/migration_script.py b/migration_script.py index 9aa1218..bd7f03a 100644 --- a/migration_script.py +++ b/migration_script.py @@ -2,7 +2,7 @@ from superset_tool.models import SupersetConfig from superset_tool.client import SupersetClient from superset_tool.utils.logger import SupersetLogger from superset_tool.exceptions import AuthenticationError -from superset_tool.utils.fileio import save_and_unpack_dashboard, update_db_yaml, create_dashboard_export, create_temp_file +from superset_tool.utils.fileio import save_and_unpack_dashboard, update_yamls, create_dashboard_export, create_temp_file,read_dashboard_from_disk import os import keyring from pathlib import Path @@ -15,7 +15,7 @@ logger = SupersetLogger( console=True ) -database_config_click={"new": +database_config_click={"old": { "database_name": "Prod Clickhouse", "sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm", @@ -25,7 +25,7 @@ database_config_click={"new": "allow_cvas": "false", "allow_dml": "false" }, - "old": { + "new": { "database_name": "Dev Clickhouse", "sqlalchemy_uri": "clickhousedb+connect://dwhuser:XXXXXXXXXX@10.66.229.179:8123/dm", "uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2", @@ -36,7 +36,7 @@ database_config_click={"new": } } -database_config_gp={"new": +database_config_gp={"old": { "database_name": "Prod Greenplum", "sqlalchemy_uri": "postgresql+psycopg2://viz_powerbi_gp_prod:XXXXXXXXXX@10.66.229.201:5432/dwh", @@ -46,7 +46,7 @@ database_config_gp={"new": "allow_cvas": "true", "allow_dml": "true" }, - "old": { + "new": { "database_name": "DEV Greenplum", "sqlalchemy_uri": "postgresql+psycopg2://viz_superset_gp_dev:XXXXXXXXXX@10.66.229.171:5432/dwh", "uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f", @@ -102,9 +102,9 @@ dev_client = SupersetClient(dev_config) sandbox_client = SupersetClient(sandbox_config) prod_client = SupersetClient(prod_config) -from_c = dev_client -to_c = sandbox_client -dashboard_slug = "FI0050" +from_c = sandbox_client +to_c = dev_client +dashboard_slug = "FI0070" dashboard_id = 53 dashboard_meta = from_c.get_dashboard(dashboard_slug) @@ -115,8 +115,11 @@ dashboard_id = dashboard_meta["id"] with create_temp_file(suffix='.dir', logger=logger) as temp_root: # Экспорт дашборда во временную директорию - zip_content, filename = from_c.export_dashboard(dashboard_id, logger=logger) - + #zip_content, filename = from_c.export_dashboard(dashboard_id, logger=logger) + zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250616T174203.zip" + + zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger) + # Сохранение и распаковка во временную директорию zip_path, unpacked_path = save_and_unpack_dashboard( zip_content=zip_content, @@ -128,8 +131,7 @@ with create_temp_file(suffix='.dir', logger=logger) as temp_root: # Обновление конфигураций source_path = unpacked_path / Path(filename).stem - update_db_yaml(database_config_click, path=source_path, logger=logger) - update_db_yaml(database_config_gp, path=source_path, logger=logger) + update_yamls([database_config_click,database_config_gp], path=source_path, logger=logger) # Создание нового экспорта во временной директории temp_zip = temp_root / f"{dashboard_slug}.zip" diff --git a/superset_tool/client.py b/superset_tool/client.py index 8f91c6f..da24783 100644 --- a/superset_tool/client.py +++ b/superset_tool/client.py @@ -1,39 +1,177 @@ -import requests -from requests.exceptions import HTTPError -import urllib3 +# [MODULE] Superset API Client +# @contract: Реализует полное взаимодействие с Superset API +# @semantic_layers: +# 1. Авторизация/CSRF +# 2. Основные операции (дашборды) +# 3. Импорт/экспорт +# @coherence: +# - Согласован с models.SupersetConfig +# - Полная обработка всех errors из exceptions.py + +# [IMPORTS] Стандартная библиотека import json -from typing import Dict, Optional, Tuple, List, Any +from typing import Optional, Dict, Tuple, List, Any, Literal, Union,BinaryIO +from pathlib import Path + +# [IMPORTS] Сторонние библиотеки +import requests +import urllib3 from pydantic import BaseModel, Field -from .utils.fileio import * -from .exceptions import * +from requests.exceptions import HTTPError + +# [IMPORTS] Локальные модули from .models import SupersetConfig +from .exceptions import ( + AuthenticationError, + SupersetAPIError, + DashboardNotFoundError, + NetworkError, + PermissionDeniedError, + ExportError +) +from .utils.fileio import get_filename_from_headers from .utils.logger import SupersetLogger -class SupersetClient: - def __init__(self, config: SupersetConfig): - self.config = config - self.logger = config.logger or SupersetLogger(console=False) - self.session = requests.Session() - self._setup_session() - self._authenticate() +# [CONSTANTS] Логирование +HTTP_METHODS = Literal['GET', 'POST', 'PUT', 'DELETE'] +DEFAULT_TIMEOUT = 30 # seconds - def _setup_session(self): +# [TYPE-ALIASES] Для сложных сигнатур +JsonType = Union[Dict[str, Any], List[Dict[str, Any]]] +ResponseType = Tuple[bytes, str] + +# [CHECK] Валидация импортов для контрактов +try: + # Проверка наличия ключевых зависимостей + assert requests.__version__ >= '2.28.0' # для retry механизмов + assert urllib3.__version__ >= '1.26.0' # для SSL warnings + + # Проверка локальных модулей + from .utils.fileio import get_filename_from_headers as fileio_check + assert callable(fileio_check) + +except (ImportError, AssertionError) as imp_err: + raise RuntimeError( + f"[COHERENCE_CHECK_FAILED] Импорт не прошел валидацию: {str(imp_err)}" + ) from imp_err + + + +class SupersetClient: + """[MAIN-CONTRACT] Клиент для работы с Superset API + @pre: + - config должен быть валидным SupersetConfig + - Целевой API доступен + @post: + - Все методы возвращают данные или вызывают явные ошибки + - Токены автоматически обновляются + @invariant: + - Сессия остается валидной между вызовами + - Все ошибки типизированы согласно exceptions.py + """ + + def __init__(self, config: SupersetConfig): + """[INIT] Инициализация клиента + @semantic: + - Создает сессию requests + - Настраивает адаптеры подключения + - Выполняет первичную аутентификацию + """ + self._validate_config(config) + self.config = config + self.logger = config.logger or SupersetLogger(name="client") + self.session = self._setup_session() + + try: + self._authenticate() + self.logger.info( + "[COHERENCE_CHECK_PASSED] Клиент успешно инициализирован", + extra={"base_url": config.base_url} + ) + except Exception as e: + self.logger.error( + "[INIT_FAILED] Ошибка инициализации клиента", + exc_info=True, + extra={"config": config.dict()} + ) + raise + + def _validate_config(self, config: SupersetConfig) -> None: + """[PRECONDITION] Валидация конфигурации клиента + @semantic: + - Проверяет обязательные поля + - Валидирует URL и учетные данные + @raise: + - ValueError при невалидных параметрах + - TypeError при некорректном типе + """ + if not isinstance(config, SupersetConfig): + self.logger.error( + "[CONFIG_VALIDATION_FAILED] Некорректный тип конфигурации", + extra={"actual_type": type(config).__name__} + ) + raise TypeError("Конфигурация должна быть экземпляром SupersetConfig") + + required_fields = ["base_url", "auth"] + for field in required_fields: + if not getattr(config, field, None): + self.logger.error( + "[CONFIG_VALIDATION_FAILED] Отсутствует обязательное поле", + extra={"missing_field": field} + ) + raise ValueError(f"Обязательное поле {field} не указано") + + if not config.auth.get("username") or not config.auth.get("password"): + self.logger.error( + "[CONFIG_VALIDATION_FAILED] Не указаны учетные данные", + extra={"auth_keys": list(config.auth.keys())} + ) + raise ValueError("В конфигурации должны быть указаны username и password") + + # Дополнительная валидация URL + if not config.base_url.startswith(("http://", "https://")): + self.logger.error( + "[CONFIG_VALIDATION_FAILED] Некорректный URL", + extra={"base_url": config.base_url} + ) + raise ValueError("base_url должен начинаться с http:// или https://") + + # [CHUNK] Настройка сессии и адаптеров + def _setup_session(self) -> requests.Session: + """[INTERNAL] Конфигурация HTTP-сессии + @coherence_check: SSL verification должен соответствовать config + """ + session = requests.Session() adapter = requests.adapters.HTTPAdapter( max_retries=3, pool_connections=10, pool_maxsize=100 ) - + + session.mount('https://', adapter) + session.verify = self.config.verify_ssl + if not self.config.verify_ssl: - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - self.logger.debug(f"Проверка сертификатов SSL отключена") - - self.session.mount('https://', adapter) - self.session.verify = self.config.verify_ssl + urllib3.disable_warnings() + self.logger.debug( + "[CONFIG] SSL verification отключен", + extra={"base_url": self.config.base_url} + ) + + return session + # [CHUNK] Процесс аутентификации def _authenticate(self): + """[AUTH-FLOW] Получение токенов + @semantic_steps: + 1. Получение access_token + 2. Получение CSRF токена + @error_handling: + - AuthenticationError при проблемах credentials + - NetworkError при проблемах связи + """ try: - # Сначала логинимся для получения access_token + # [STEP 1] Получение bearer token login_url = f"{self.config.base_url}/security/login" response = self.session.post( login_url, @@ -41,38 +179,59 @@ class SupersetClient: "username": self.config.auth["username"], "password": self.config.auth["password"], "provider": self.config.auth["provider"], - "refresh": True + "refresh": self.config.auth["refresh"] }, - verify=self.config.verify_ssl + timeout=self.config.timeout ) + + if response.status_code == 401: + raise AuthenticationError( + "Invalid credentials", + context={ + "endpoint": login_url, + "username": self.config.auth["username"], + "status_code": response.status_code + } + ) + response.raise_for_status() self.access_token = response.json()["access_token"] - self.logger.info( - f"Токен Bearer {self.access_token} получен c {login_url}") - # Затем получаем CSRF токен с использованием access_token + # [STEP 2] Получение CSRF token csrf_url = f"{self.config.base_url}/security/csrf_token/" - response = self.session.get( csrf_url, headers={"Authorization": f"Bearer {self.access_token}"}, - verify=self.config.verify_ssl + timeout=self.config.timeout ) response.raise_for_status() - self.csrf_token = response.json()["result"] + self.logger.info( - f"Токен CSRF {self.csrf_token} получен c {csrf_url}") - except HTTPError as e: - if e.response.status_code == 401: - error_msg = f"Неверные данные для аутенфикации для {login_url}" if "login" in e.request.url else f"Не удалось получить CSRF токен с {csrf_url}" - self.logger.error(f"Ошибка получения: {error_msg}") - raise AuthenticationError( - f"{error_msg}. Проверь данные аутенфикации") from e - raise + "[AUTH_SUCCESS] Токены успешно получены", + extra={ + "access_token": f"{self.access_token[:5]}...", + "csrf_token": f"{self.csrf_token[:5]}..." + } + ) + + except requests.exceptions.RequestException as e: + error_context = { + "method": e.request.method, + "url": e.request.url, + "status_code": getattr(e.response, 'status_code', None) + } + + if isinstance(e, (requests.Timeout, requests.ConnectionError)): + raise NetworkError("Connection failed", context=error_context) from e + raise SupersetAPIError("Auth flow failed", context=error_context) from e @property - def headers(self): + def headers(self) -> dict: + """[INTERFACE] Базовые заголовки для API-вызовов + @semantic: Объединяет общие заголовки для всех запросов + @post: Всегда возвращает актуальные токены + """ return { "Authorization": f"Bearer {self.access_token}", "X-CSRFToken": self.csrf_token, @@ -80,256 +239,358 @@ class SupersetClient: "Content-Type": "application/json" } - def get_dashboard(self, dashboard_id_or_slug: str) -> Dict: - """ - Получаем информацию по дашборду (если передан dashboard_id_or_slug) - Параметры: - :dashboard_id_or_slug - id или короткая ссылка + # [MAIN-OPERATIONS] Работа с дашбордами + def get_dashboard(self, dashboard_id_or_slug: str) -> dict: + """[CONTRACT] Получение метаданных дашборда + @pre: + - dashboard_id_or_slug должен существовать + - Токены должны быть валидны + @post: + - Возвращает полные метаданные + - В случае 404 вызывает DashboardNotFoundError """ url = f"{self.config.base_url}/dashboard/{dashboard_id_or_slug}" - self.logger.debug(f"Получаем информацию по дашборду с /{url}...") - + try: response = self.session.get( url, headers=self.headers, timeout=self.config.timeout ) - response.raise_for_status() - self.logger.info(f"ОК - Получили информацию по дашборду с {response.url}") - - return response.json()["result"] - except requests.exceptions.RequestException as e: - self.logger.error( - f"Ошибка при получении информации о дашборде: {str(e)}", exc_info=True) - raise SupersetAPIError( - f"Ошибка при получении информации о дашборде: {str(e)}") from e - - def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: - """ - Получаем информацию по всем дашбордам с учетом пагинации. - - Параметры: - query (Optional[Dict]): Дополнительные параметры запроса, включая пагинацию и фильтры. - - Возвращает: - Tuple[int, List[Dict]]: Кортеж, содержащий общее количество дашбордов и список всех дашбордов. - """ - url = f"{self.config.base_url}/dashboard/" - self.logger.debug(f"Получаем информацию по дашбордам с {url}...") - modified_query: Dict = {} - all_results: List[Dict] = [] - total_count: int = 0 - current_page: int = 0 - q_param = '{ "columns": [ "id" ], "page": 0, "page_size": 20}' - try: - response = self.session.get( - url=f"{url}?q={q_param}", - #params={"q": json.dumps(default_query)}, # Передаем такой body, иначе на prodta отдает 1 дашборд - headers=self.headers, - timeout=self.config.timeout - ) - total_count = response.json()['count'] - self.logger.info( - f"ОК - Получили кол-во дашбордов ({total_count}) с {url}") - self.logger.info(f"Запрос - {response.url}") - except requests.exceptions.RequestException as e: - self.logger.error( - f"Ошибка при получении кол-ва дашбордов: {str(e)}", exc_info=True) - raise SupersetAPIError( - f"Ошибка при получении кол-ва дашбордов: {str(e)}") from e - # Инициализация параметров запроса с учетом переданного query - - if query: - modified_query = query.copy() - # Убедимся, что page_size установлен, если не передан - modified_query.setdefault("page_size", 20) - else: - modified_query = { - "columns": [ - "slug", - "id", - "changed_on_utc", - "dashboard_title", - "published" - ], - "page": 0, - "page_size": 20 - } - - page_size = modified_query["page_size"] - - total_pages = (total_count + page_size - 1) // page_size - - try: - while current_page < total_pages: - modified_query["page"] = current_page - response = self.session.get( - url, - headers=self.headers, - params={"q": json.dumps(modified_query)}, - timeout=self.config.timeout + + if response.status_code == 404: + raise DashboardNotFoundError( + dashboard_id_or_slug, + context={"url": url} ) - response.raise_for_status() - data = response.json() - all_results.extend(data.get("result", [])) - current_page += 1 - - self.logger.info(f"ОК - Получили информацию по дашбордам с {url}") - # Проверка, достигли ли последней страницы - return total_count, all_results + + response.raise_for_status() + return response.json()["result"] + except requests.exceptions.RequestException as e: - self.logger.error( - f"Ошибка при получении информации о дашбордах: {str(e)}", exc_info=True) - raise SupersetAPIError( - f"Ошибка при получении информации о дашбордах: {str(e)}") from e + self._handle_api_error("get_dashboard", e, url) - def export_dashboard(self, dashboard_id: int, logger: Optional[SupersetLogger] = None) -> Tuple[bytes, str]: - """Экспортирует дашборд из Superset в виде ZIP-архива и возвращает его содержимое с именем файла. - - Параметры: - :dashboard_id (int): Идентификатор дашборда для экспорта - - Возвращает: - Tuple[bytes, str]: Кортеж, содержащий: - - bytes: Бинарное содержимое ZIP-архива с дашбордом - - str: Имя файла (из заголовков ответа или в формате dashboard_{id}.zip) - - Исключения: - SupersetAPIError: Вызывается при ошибках: - - Проблемы с сетью/соединением - - Невалидный ID дашборда - - Отсутствие прав доступа - - Ошибки сервера (status code >= 400) - - Пример использования: - content, filename = client.export_dashboard(5) - with open(filename, 'wb') as f: - f.write(content) - - Примечания: - - Для экспорта используется API Endpoint: /dashboard/export/ - - Имя файла пытается извлечь из Content-Disposition заголовка - - По умолчанию возвращает имя в формате dashboard_{id}.zip - - Архив содержит JSON-метаданные и связанные элементы (датасеты, чарты) - """ + def export_dashboard(self, dashboard_id: int) -> tuple[bytes, str]: + """[CONTRACT] Экспорт дашборда в ZIP + @error_handling: + - DashboardNotFoundError если дашборд не существует + - ExportError при проблемах экспорта + """ url = f"{self.config.base_url}/dashboard/export/" - params = {"q": f"[{dashboard_id}]"} - logger = logger or SupersetLogger(name="client", console=False) - self.logger.debug(f"Экспортируем дашборд ID {dashboard_id} c {url}...") - + try: response = self.session.get( url, headers=self.headers, - params=params, + params={"q": f"[{dashboard_id}]"}, timeout=self.config.timeout ) + + if response.status_code == 404: + raise DashboardNotFoundError(dashboard_id) + response.raise_for_status() + + filename = ( + get_filename_from_headers(response.headers) + or f"dashboard_{dashboard_id}.zip" + ) + return response.content, filename + + except requests.exceptions.RequestException as e: + self._handle_api_error("export_dashboard", e, url) - filename = get_filename_from_headers( - response.headers) or f"dashboard_{dashboard_id}.zip" - self.logger.info(f"Дашборд сохранен в {filename}") + # [ERROR-HANDLER] Централизованная обработка ошибок + def _handle_api_error(self, method_name: str, error: Exception, url: str) -> None: + """[UNIFIED-ERROR] Обработка API-ошибок + @semantic: Преобразует requests исключения в наши типы + """ + context = { + "method": method_name, + "url": url, + "status_code": getattr(error.response, 'status_code', None) + } + + if isinstance(error, requests.Timeout): + raise NetworkError("Request timeout", context=context) from error + elif getattr(error.response, 'status_code', None) == 403: + raise PermissionDeniedError(context=context) from error + else: + raise SupersetAPIError(str(error), context=context) from error + + # [SECTION] EXPORT OPERATIONS + def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]: + """[CONTRACT] Экспорт дашборда в ZIP-архив + @pre: + - dashboard_id должен существовать + - Пользователь имеет права на экспорт + @post: + - Возвращает кортеж (бинарное содержимое, имя файла) + - Имя файла извлекается из headers или генерируется + @errors: + - DashboardNotFoundError если дашборд не существует + - ExportError при проблемах экспорта + """ + url = f"{self.config.base_url}/dashboard/export/" + self.logger.debug( + "[EXPORT_START] Запуск экспорта", + extra={"dashboard_id": dashboard_id, "export_url": url} + ) + + try: + response = self._execute_export_request(dashboard_id, url) + self._validate_export_response(response, dashboard_id) + + filename = self._resolve_export_filename(response, dashboard_id) return response.content, filename - except requests.exceptions.RequestException as e: - self.logger.error(f"Ошибка при экспорте: {str(e)}", exc_info=True) - raise SupersetAPIError(f"Export failed: {str(e)}") from e + except requests.exceptions.HTTPError as http_err: + error_ctx = { + "dashboard_id": dashboard_id, + "status_code": http_err.response.status_code + } + if http_err.response.status_code == 404: + self.logger.error( + "[EXPORT_FAILED] Дашборд не найден", + extra=error_ctx + ) + raise DashboardNotFoundError(dashboard_id, context=error_ctx) + raise ExportError("HTTP ошибка экспорта", context=error_ctx) from http_err - def import_dashboard(self, zip_path) -> Dict: - """Импортирует дашборд в Superset из ZIP-архива с детальной обработкой ошибок. + except requests.exceptions.RequestException as req_err: + error_ctx = {"dashboard_id": dashboard_id} + self.logger.error( + "[EXPORT_FAILED] Ошибка запроса", + exc_info=True, + extra=error_ctx + ) + raise ExportError("Ошибка экспорта", context=error_ctx) from req_err - Параметры: - zip_path (Union[str, Path]): Путь к ZIP-файлу с дашбордом - - Возвращает: - dict: Ответ API в формате JSON с результатами импорта - - Пример использования: - result = client.import_dashboard(Path("my_dashboard.zip")) - print(f"Импортирован дашборд: {result['title']}") - - Примечания: - - Использует API Endpoint: /dashboard/import/ - - Автоматически устанавливает overwrite=true для перезаписи существующих дашбордов - - Удваивает стандартный таймаут для обработки длительных операций - - Удаляет Content-Type заголовок (автоматически генерируется для multipart/form-data) - - Архив должен содержать валидные JSON-метаданные в формате Superset - - Ответ может содержать информацию о созданных/обновленных ресурсах (датасеты, чарты, владельцы) - - При конфликте имен может потребоваться ручное разрешение через параметры импорта + def _execute_export_request(self, dashboard_id: int, url: str) -> requests.Response: + """[HELPER] Выполнение запроса экспорта + @coherence_check: + - Ответ должен иметь status_code 200 + - Content-Type: application/zip """ - url = f"{self.config.base_url}/dashboard/import/" - self.logger.debug(f"Импортируем дашборд ID {zip_path} на {url}...") - - # Валидация входного файла + response = self.session.get( + url, + headers=self.headers, + params={"q": f"[{dashboard_id}]"}, + timeout=self.config.timeout, + stream=True + ) + response.raise_for_status() + return response + + def _validate_export_response(self, response: requests.Response, dashboard_id: int) -> None: + """[HELPER] Валидация ответа экспорта + @semantic: + - Проверка Content-Type + - Проверка наличия данных + """ + if 'application/zip' not in response.headers.get('Content-Type', ''): + self.logger.error( + "[EXPORT_VALIDATION_FAILED] Неверный Content-Type", + extra={ + "dashboard_id": dashboard_id, + "content_type": response.headers.get('Content-Type') + } + ) + raise ExportError("Получен не ZIP-архив") + + if not response.content: + self.logger.error( + "[EXPORT_VALIDATION_FAILED] Пустой ответ", + extra={"dashboard_id": dashboard_id} + ) + raise ExportError("Получены пустые данные") + + def _resolve_export_filename(self, response: requests.Response, dashboard_id: int) -> str: + """[HELPER] Определение имени экспортируемого файла + @fallback: Генерирует имя если не найден заголовок + """ + filename = get_filename_from_headers(response.headers) + if not filename: + filename = f"dashboard_export_{dashboard_id}_{datetime.now().strftime('%Y%m%d')}.zip" + self.logger.debug( + "[EXPORT_FALLBACK] Используется сгенерированное имя файла", + extra={"filename": filename} + ) + return filename + + def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path: + """[CONTRACT] Экспорт дашборда прямо в файл + @pre: + - output_dir должен существовать + - Доступ на запись в директорию + @post: + - Возвращает Path сохраненного файла + - Создает поддиректорию с именем дашборда + """ + output_dir = Path(output_dir) + if not output_dir.exists(): + self.logger.error( + "[EXPORT_PRE_FAILED] Директория не существует", + extra={"output_dir": str(output_dir)} + ) + raise FileNotFoundError(f"Директория {output_dir} не найдена") + + content, filename = self.export_dashboard(dashboard_id) + target_path = output_dir / filename + try: - if not Path(zip_path).exists(): - raise FileNotFoundError(f"Файл не найден: {zip_path}") + with open(target_path, 'wb') as f: + f.write(content) - if not zipfile.is_zipfile(zip_path): - raise InvalidZipFormatError(f"Файл не является ZIP-архивом: {zip_path}") + self.logger.info( + "[EXPORT_SUCCESS] Дашборд сохранен на диск", + extra={ + "dashboard_id": dashboard_id, + "file_path": str(target_path), + "file_size": len(content) + } + ) + return target_path + + except IOError as io_err: + self.logger.error( + "[EXPORT_IO_FAILED] Ошибка записи файла", + exc_info=True, + extra={"target_path": str(target_path)} + ) + raise ExportError("Ошибка сохранения файла") from io_err + + # [SECTION] Основной интерфейс API + def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: + """[CONTRACT] Получение списка дашбордов с пагинацией + @pre: + - Клиент должен быть авторизован + - Параметры пагинации должны быть валидны + @post: + - Возвращает кортеж (total_count, список метаданных) + - Поддерживает кастомные query-параметры + @invariant: + - Всегда возвращает полный список (обходит пагинацию) + """ + url = f"{self.config.base_url}/dashboard/" + self.logger.debug( + "[API_CALL] Запрос списка дашбордов", + extra={"query": query} + ) + + # [COHERENCE_CHECK] Валидация параметров + validated_query = self._validate_query_params(query) + + try: + # Инициализация пагинации + total_count = self._fetch_total_count(url) + paginated_data = self._fetch_all_pages(url, validated_query, total_count) - # Дополнительная проверка содержимого архива - with zipfile.ZipFile(zip_path) as zf: - if not any(name.endswith('metadata.yaml') for name in zf.namelist()): - raise DashboardNotFoundError("Архив не содержит metadata.yaml") - - except (FileNotFoundError, InvalidZipFormatError, DashboardNotFoundError) as e: - self.logger.error(f"Ошибка валидации архива: {str(e)}", exc_info=True) - raise - - headers = { - k: v for k, v in self.headers.items() - if k.lower() != 'content-type' - } + self.logger.info( + "[API_SUCCESS] Дашборды получены", + extra={"count": total_count} + ) + return total_count, paginated_data + + except requests.exceptions.RequestException as e: + error_ctx = {"method": "get_dashboards", "query": validated_query} + self._handle_api_error("Пагинация дашбордов", e, error_ctx) + # [SECTION] Импорт/экспорт + def import_dashboard(self, zip_path: Union[str, Path]) -> Dict: + """[CONTRACT] Импорт дашборда из архива + @pre: + - Файл должен существовать и быть валидным ZIP + - Должны быть права на импорт + @post: + - Возвращает метаданные импортированного дашборда + - При конфликтах выполняет overwrite + """ + self._validate_import_file(zip_path) + try: with open(zip_path, 'rb') as f: - files = { - 'formData': ( - Path(zip_path).name, - f, - 'application/x-zip-compressed' - ) - } - - response = self.session.post( - url, - files=files, - data={'overwrite': 'true'}, - headers=headers, - timeout=self.config.timeout * 2 + return self._execute_import( + file_obj=f, + file_name=Path(zip_path).name ) - - # Обработка HTTP-ошибок - if response.status_code == 404: - raise DashboardNotFoundError("Эндпоинт импорта не найден") - elif response.status_code == 403: - raise PermissionDeniedError("Недостаточно прав для импорта") - elif response.status_code >= 500: - raise SupersetServerError(f"Ошибка сервера: {response.status_code}") - - response.raise_for_status() - - self.logger.info(f"Дашборд успешно импортирован из {zip_path}") - return response.json() - - except requests.exceptions.ConnectionError as e: - error_msg = f"Ошибка соединения: {str(e)}" - self.logger.error(error_msg, exc_info=True) - raise NetworkError(error_msg) from e - - except requests.exceptions.Timeout as e: - error_msg = f"Таймаут при импорте дашборда" - self.logger.error(error_msg, exc_info=True) - raise NetworkError(error_msg) from e - - except requests.exceptions.RequestException as e: - error_msg = f"Ошибка при импорте: {str(e)}" - self.logger.error(error_msg, exc_info=True) - raise DashboardImportError(error_msg) from e - except Exception as e: - error_msg = f"Неожиданная ошибка: {str(e)}" - self.logger.critical(error_msg, exc_info=True) - raise DashboardImportError(error_msg) from e + self.logger.error( + "[IMPORT_FAILED] Критическая ошибка импорта", + exc_info=True, + extra={"file": str(zip_path)} + ) + raise DashboardImportError(f"Import failed: {str(e)}") from e + + # [SECTION] Приватные методы-помощники + def _validate_query_params(self, query: Optional[Dict]) -> Dict: + """[HELPER] Нормализация параметров запроса""" + base_query = { + "columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"], + "page": 0, + "page_size": 20 + } + return {**base_query, **(query or {})} + + def _fetch_total_count(self, url: str) -> int: + """[HELPER] Получение общего количества дашбордов""" + count_response = self.session.get( + f"{url}?q={json.dumps({'columns': ['id'], 'page': 0, 'page_size': 1})}", + headers=self.headers, + timeout=self.config.timeout + ) + count_response.raise_for_status() + return count_response.json()['count'] + + def _fetch_all_pages(self, url: str, query: Dict, total_count: int) -> List[Dict]: + """[HELPER] Обход всех страниц с пагинацией""" + results = [] + page_size = query['page_size'] + total_pages = (total_count + page_size - 1) // page_size + + for page in range(total_pages): + query['page'] = page + response = self.session.get( + url, + headers=self.headers, + params={"q": json.dumps(query)}, + timeout=self.config.timeout + ) + response.raise_for_status() + results.extend(response.json().get('result', [])) + + return results + + def _validate_import_file(self, zip_path: Union[str, Path]) -> None: + """[HELPER] Проверка файла перед импортом""" + path = Path(zip_path) + if not path.exists(): + raise FileNotFoundError(f"[FILE_ERROR] {zip_path} не существует") + + if not zipfile.is_zipfile(path): + raise InvalidZipFormatError(f"[FILE_ERROR] {zip_path} не ZIP-архив") + + with zipfile.ZipFile(path) as zf: + if not any(n.endswith('metadata.yaml') for n in zf.namelist()): + raise DashboardNotFoundError("Архив не содержит metadata.yaml") + + def _execute_import(self, file_obj: BinaryIO, file_name: str) -> Dict: + """[HELPER] Выполнение API-запроса импорта""" + url = f"{self.config.base_url}/dashboard/import/" + + files = {'formData': (file_name, file_obj, 'application/x-zip-compressed')} + headers = {k: v for k, v in self.headers.items() if k.lower() != 'content-type'} + + response = self.session.post( + url, + files=files, + data={'overwrite': 'true'}, + headers=headers, + timeout=self.config.timeout * 2 # Увеличенный таймаут для импорта + ) + + if response.status_code == 403: + raise PermissionDeniedError("Недостаточно прав для импорта") + + response.raise_for_status() + return response.json() \ No newline at end of file diff --git a/superset_tool/exceptions.py b/superset_tool/exceptions.py index 18e0d76..732867a 100644 --- a/superset_tool/exceptions.py +++ b/superset_tool/exceptions.py @@ -1,32 +1,79 @@ +# [MODULE] Иерархия исключений +# @contract: Все ошибки наследуют SupersetToolError +# @semantic: Каждый тип соответствует конкретной проблемной области +# @coherence: +# - Полное покрытие всех сценариев клиента +# - Четкая классификация по уровню серьезности + +# [IMPORTS] Exceptions +from typing import Optional, Dict, Any + class SupersetToolError(Exception): - """Base exception class for all tool errors""" + """[BASE] Базовый класс ошибок инструмента + @semantic: Должен содержать контекст для диагностики + """ + def __init__(self, message: str, context: Optional[dict] = None): + self.context = context or {} + super().__init__(f"{message} | Context: {self.context}") +# [ERROR-GROUP] Проблемы аутентификации и авторизации class AuthenticationError(SupersetToolError): - """Authentication related errors""" + """[AUTH] Ошибки credentials или доступа + @context: url, username, error_detail + """ + def __init__(self, message="Auth failed", **context): + super().__init__( + f"[AUTH_FAILURE] {message}", + {"type": "authentication", **context} + ) +class PermissionDeniedError(AuthenticationError): + """[AUTH] Ошибка отказа в доступе из-за недостаточных прав + @context: required_permission, user_roles + """ + def __init__(self, required_permission: str, **context): + super().__init__( + f"Permission denied: {required_permission}", + {"type": "authorization", "required_permission": required_permission, **context} + ) + +# [ERROR-GROUP] Проблемы API-вызовов class SupersetAPIError(SupersetToolError): - """General API communication errors""" + """[API] Ошибки взаимодействия с Superset API + @context: endpoint, method, status_code, response + """ + def __init__(self, message="API error", **context): + super().__init__( + f"[API_FAILURE] {message}", + {"type": "api_call", **context} + ) -class ExportError(SupersetToolError): - """Dashboard export errors""" +# [ERROR-SUBCLASS] Детализированные ошибки API +class ExportError(SupersetAPIError): + """[API:EXPORT] Проблемы экспорта дашбордов""" + ... -class ImportError(SupersetToolError): - """Dashboard import errors""" +class DashboardNotFoundError(SupersetAPIError): + """[API:404] Запрошенный ресурс не существует""" + def __init__(self, dashboard_id, **context): + super().__init__( + f"Dashboard {dashboard_id} not found", + {"dashboard_id": dashboard_id, **context} + ) + +# [ERROR-SUBCLASS] Детализированные ошибки обработки файлов +class InvalidZipFormatError(SupersetAPIError): + """[API:ZIP] Некорректный формат ZIP-архива + @context: file_path, expected_format, error_detail + """ + def __init__(self, file_path: str, **context): + super().__init__( + f"Invalid ZIP format for file: {file_path}", + {"type": "zip_validation", "file_path": file_path, **context} + ) -class InvalidZipFormatError(SupersetToolError): - "Archive zip errors" - -class DashboardNotFoundError(SupersetToolError): - "404 error" - -class PermissionDeniedError(SupersetToolError): - "403 error" - -class SupersetServerError(SupersetToolError): - "500 error" +# [ERROR-GROUP] Системные и network-ошибки class NetworkError(SupersetToolError): - "Network errors" - -class DashboardImportError(SupersetToolError): - "Api import errors" \ No newline at end of file + """[NETWORK] Проблемы соединения или таймауты""" + ... \ No newline at end of file diff --git a/superset_tool/models.py b/superset_tool/models.py index 9de2424..047143b 100644 --- a/superset_tool/models.py +++ b/superset_tool/models.py @@ -1,21 +1,98 @@ -# models.py -from pydantic import BaseModel, validator -from typing import Optional +# [MODULE] Сущности данных конфигурации +# @desc: Определяет структуры данных для работы с Superset API +# @contracts: +# - Проверка валидности URL +# - Валидация параметров аутентификации +# @coherence: +# - Все модели согласованы с API Superset v1 +# - Совместимы с клиентскими методами + +# [IMPORTS] Models +from typing import Optional, Dict, Any +from pydantic import BaseModel, validator,Field from .utils.logger import SupersetLogger class SupersetConfig(BaseModel): - base_url: str + """[CONFIG] Конфигурация подключения к Superset + @semantic: Основные параметры подключения к API + @invariant: + - base_url должен содержать версию API (/v1/) + - auth должен содержать все обязательные поля + """ + base_url: str = Field(..., regex=r'.*/api/v1.*') auth: dict verify_ssl: bool = True timeout: int = 30 logger: Optional[SupersetLogger] = None - class Config: - arbitrary_types_allowed = True # Разрешаем произвольные типы + # [VALIDATOR] Проверка параметров аутентификации + @validator('auth') + def validate_auth(cls, v): + required = {'provider', 'username', 'password', 'refresh'} + if not required.issubset(v.keys()): + raise ValueError( + f"[CONTRACT_VIOLATION] Auth must contain {required}" + ) + return v + class Config: + arbitrary_types_allowed = True + json_schema_extra = { + "example": { + "base_url": "https://host/api/v1/", + "auth": { + "provider": "db", + "username": "user", + "password": "pass", + "refresh": True + } + } + } + + +# [SEMANTIC-TYPE] Конфигурация БД для миграций class DatabaseConfig(BaseModel): - database_config: dict + """[CONFIG] Параметры трансформации БД при миграции + @semantic: Содержит old/new состояние для преобразования + @invariant: + - Должны быть указаны оба состояния (old/new) + - UUID должен соответствовать формату + """ + database_config: Dict[str, Dict[str, Any]] logger: Optional[SupersetLogger] = None + @validator('database_config') + def validate_config(cls, v): + if not {'old', 'new'}.issubset(v.keys()): + raise ValueError( + "[COHERENCE_ERROR] Config must contain both old/new states" + ) + return v + class Config: - arbitrary_types_allowed = True \ No newline at end of file + arbitrary_types_allowed = True + json_schema_extra = { + "example": { + "database_config": { + "old": + { + "database_name": "Prod Clickhouse", + "sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm", + "uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", + "database_uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", + "allow_ctas": "false", + "allow_cvas": "false", + "allow_dml": "false" + }, + "new": { + "database_name": "Dev Clickhouse", + "sqlalchemy_uri": "clickhousedb+connect://dwhuser:XXXXXXXXXX@10.66.229.179:8123/dm", + "uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2", + "database_uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2", + "allow_ctas": "true", + "allow_cvas": "true", + "allow_dml": "true" + } + } + } + } \ No newline at end of file diff --git a/superset_tool/utils/fileio.py b/superset_tool/utils/fileio.py index 4143a1a..f309dc9 100644 --- a/superset_tool/utils/fileio.py +++ b/superset_tool/utils/fileio.py @@ -1,35 +1,60 @@ +# [MODULE] File Operations Manager +# @desc: Управление файловыми операциями для дашбордов Superset +# @contracts: +# 1. Валидация ZIP-архивов +# 2. Работа с YAML-конфигами +# 3. Управление директориями +# @coherence: +# - Согласован с SupersetClient +# - Поддерживает все форматы экспорта Superset + +# [IMPORTS] Core +import os import re import zipfile from pathlib import Path -from typing import Optional, Tuple, Dict, Any -import datetime -import shutil -import yaml -import tempfile -import os +from typing import Any, Optional, Tuple, Dict, List, Literal, Union, BinaryIO, LiteralString +from collections import defaultdict +from datetime import date from contextlib import contextmanager + +# [IMPORTS] Third-party +import yaml +import shutil +import tempfile +from datetime import datetime + +# [IMPORTS] Local +from ..models import DatabaseConfig +from ..exceptions import InvalidZipFormatError, DashboardNotFoundError from ..utils.logger import SupersetLogger +# [CONSTANTS] +ALLOWED_FOLDERS = {'databases', 'datasets', 'charts', 'dashboards'} + +# [CONTRACT] Временные ресурсы @contextmanager def create_temp_file( content: Optional[bytes] = None, suffix: str = ".zip", mode: str = 'wb', logger: Optional[SupersetLogger] = None -): - """Расширенный контекстный менеджер для временных файлов/директорий""" +) -> Path: + """[CONTEXT-MANAGER] Создание временного файла/директории + @pre: + - suffix должен быть допустимым расширением + - mode соответствует типу содержимого + @post: + - Возвращает Path созданного ресурса + - Гарантирует удаление временного файла при выходе + """ logger = logger or SupersetLogger(name="fileio", console=False) try: - logger.debug(f"Создание временного ресурса с суффиксом {suffix}") - - # Для директорий if suffix.startswith('.dir'): with tempfile.TemporaryDirectory(suffix=suffix) as tmp_dir: logger.debug(f"Создана временная директория: {tmp_dir}") yield Path(tmp_dir) - - # Для файлов else: with tempfile.NamedTemporaryFile(suffix=suffix, mode=mode, delete=False) as tmp: if content: @@ -38,22 +63,223 @@ def create_temp_file( logger.debug(f"Создан временный файл: {tmp.name}") yield Path(tmp.name) except Exception as e: - logger.error(f"Ошибка создания временного ресурса: {str(e)}", exc_info=True) + logger.error(f"[TEMP_FILE_ERROR] Ошибка создания ресурса: {str(e)}", exc_info=True) raise finally: if 'tmp' in locals() and Path(tmp.name).exists() and not suffix.startswith('.dir'): - Path(tmp.name).unlink() - logger.debug(f"Временный файл {tmp.name} удален") + Path(tmp.name).unlink(missing_ok=True) + +# [SECTION] Directory Management Utilities + +def remove_empty_directories( + root_dir: str, + exclude: Optional[List[str]] = None, + logger: Optional[SupersetLogger] = None +) -> int: + """[CONTRACT] Рекурсивное удаление пустых директорий + @pre: + - root_dir должен существовать и быть директорией + - exclude не должен содержать некорректных символов + @post: + - Возвращает количество удаленных директорий + - Не удаляет директории из списка exclude + - Гарантирует рекурсивную обработку вложенных папок + """ + logger = logger or SupersetLogger(name="fileio", console=False) + logger.info(f"[DIR_CLEANUP] Старт очистки пустых директорий в {root_dir}") + + excluded = set(exclude or []) + removed_count = 0 + root_path = Path(root_dir) + + # [VALIDATION] Проверка корневой директории + if not root_path.exists(): + logger.error(f"[DIR_ERROR] Директория не существует: {root_dir}") + return 0 + + try: + # [PROCESSING] Рекурсивный обход снизу вверх + for current_dir, _, files in os.walk(root_path, topdown=False): + current_path = Path(current_dir) + + # [EXCLUSION] Пропуск исключенных директорий + if any(excluded_part in current_path.parts for excluded_part in excluded): + logger.debug(f"[DIR_SKIP] Пропущено по исключению: {current_dir}") + continue + + # [REMOVAL] Удаление пустых директорий + if not any(current_path.iterdir()): + try: + current_path.rmdir() + removed_count += 1 + logger.info(f"[DIR_REMOVED] Удалена пустая директория: {current_dir}") + except OSError as e: + logger.error(f"[DIR_ERROR] Ошибка удаления {current_dir}: {str(e)}") + + except Exception as e: + logger.error(f"[DIR_CLEANUP_ERROR] Критическая ошибка: {str(e)}", exc_info=True) + raise + + logger.info(f"[DIR_RESULT] Удалено {removed_count} пустых директорий") + return removed_count +# [SECTION] File Operations + +def read_dashboard_from_disk( + file_path: str, + logger: Optional[SupersetLogger] = None +) -> Tuple[bytes, str]: + """[CONTRACT] Чтение сохраненного дашборда с диска + @pre: + - file_path должен существовать + - Файл должен быть доступен для чтения + @post: + - Возвращает (содержимое файла, имя файла) + - Сохраняет целостность данных + """ + logger = logger or SupersetLogger(name="fileio", console=False) + + try: + path = Path(file_path) + if not path.exists(): + raise FileNotFoundError(f"[FILE_MISSING] Файл дашборда не найден: {file_path}") + + logger.info(f"[FILE_READ] Чтение дашборда с диска: {file_path}") + + with open(file_path, "rb") as f: + content = f.read() + + if not content: + logger.warning("[FILE_EMPTY] Файл существует, но пуст") + + return content, path.name + + except Exception as e: + logger.error(f"[FILE_READ_ERROR] Ошибка чтения: {str(e)}", exc_info=True) + raise + + +# [SECTION] Archive Management + +def archive_exports( + output_dir: str, + daily_retention: int = 7, + weekly_retention: int = 4, + monthly_retention: int = 12, + logger: Optional[SupersetLogger] = None +) -> None: + """[CONTRACT] Управление архивом экспортированных дашбордов + @pre: + - output_dir должен существовать + - Значения retention должны быть >= 0 + @post: + - Сохраняет файлы согласно политике хранения + - Удаляет устаревшие архивы + - Сохраняет логическую структуру каталогов + """ + logger = logger or SupersetLogger(name="fileio", console=False) + logger.info(f"[ARCHIVE] Старт очистки архивов в {output_dir}") + + # [VALIDATION] Проверка параметров + if not all(isinstance(x, int) and x >= 0 for x in [daily_retention, weekly_retention, monthly_retention]): + raise ValueError("[CONFIG_ERROR] Значения retention должны быть положительными") + + try: + export_dir = Path(output_dir) + files_with_dates = [] + + # [PROCESSING] Сбор данных о файлах + for file in export_dir.glob("*.zip"): + try: + timestamp_str = file.stem.split('_')[-1].split('T')[0] + file_date = datetime.strptime(timestamp_str, "%Y%m%d").date() + except (ValueError, IndexError): + file_date = datetime.fromtimestamp(file.stat().st_mtime).date() + logger.warning(f"[DATE_PARSE] Используется дата модификации для {file.name}") + + files_with_dates.append((file, file_date)) + + # [PROCESSING] Применение политик хранения + keep_files = apply_retention_policy( + files_with_dates, + daily_retention, + weekly_retention, + monthly_retention, + logger + ) + + # [CLEANUP] Удаление устаревших файлов + for file, _ in files_with_dates: + if file not in keep_files: + file.unlink(missing_ok=True) + logger.info(f"[FILE_REMOVED] Удален архив: {file.name}") + + except Exception as e: + logger.error(f"[ARCHIVE_ERROR] Ошибка обработки архивов: {str(e)}", exc_info=True) + raise + +def apply_retention_policy( + files_with_dates: List[Tuple[Path, date]], + daily: int, + weekly: int, + monthly: int, + logger: SupersetLogger +) -> set: + """[HELPER] Применение политик хранения файлов + @pre: + - files_with_dates должен содержать валидные даты + @post: + - Возвращает set файлов для сохранения + - Соответствует указанным retention-правилам + """ + # [GROUPING] Группировка файлов + daily_groups = defaultdict(list) + weekly_groups = defaultdict(list) + monthly_groups = defaultdict(list) + + for file, file_date in files_with_dates: + daily_groups[file_date].append(file) + weekly_groups[(file_date.isocalendar().year, file_date.isocalendar().week)].append(file) + monthly_groups[(file_date.year, file_date.month)].append(file) + + # [SELECTION] Выбор файлов для сохранения + keep_files = set() + + # Daily - последние N дней + sorted_daily = sorted(daily_groups.keys(), reverse=True)[:daily] + for day in sorted_daily: + keep_files.update(daily_groups[day]) + + # Weekly - последние N недель + sorted_weekly = sorted(weekly_groups.keys(), reverse=True)[:weekly] + for week in sorted_weekly: + keep_files.update(weekly_groups[week]) + + # Monthly - последние N месяцев + sorted_monthly = sorted(monthly_groups.keys(), reverse=True)[:monthly] + for month in sorted_monthly: + keep_files.update(monthly_groups[month]) + + logger.debug(f"[RETENTION] Сохранено файлов: {len(keep_files)}") + return keep_files + +# [CONTRACT] Сохранение и распаковка дашборда def save_and_unpack_dashboard( zip_content: bytes, - output_dir: str = "dashboards", + output_dir: Union[str, Path], unpack: bool = False, original_filename: Optional[str] = None, logger: Optional[SupersetLogger] = None ) -> Tuple[Path, Optional[Path]]: - """Сохраняет и распаковывает дашборд с логированием""" + """[OPERATION] Обработка ZIP-архива дашборда + @pre: + - zip_content должен быть валидным ZIP + - output_dir должен существовать или быть возможным для создания + @post: + - Возвращает (путь_к_архиву, путь_распаковки) или (путь_к_архиву, None) + - Сохраняет оригинальную структуру файлов + """ logger = logger or SupersetLogger(name="fileio", console=False) logger.info(f"Старт обработки дашборда. Распаковка: {unpack}") @@ -62,10 +288,9 @@ def save_and_unpack_dashboard( output_path.mkdir(parents=True, exist_ok=True) logger.debug(f"Директория {output_path} создана/проверена") - zip_name = sanitize_filename( - original_filename) if original_filename else None + zip_name = sanitize_filename(original_filename) if original_filename else None if not zip_name: - timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") zip_name = f"dashboard_export_{timestamp}.zip" logger.debug(f"Сгенерировано имя файла: {zip_name}") @@ -75,157 +300,200 @@ def save_and_unpack_dashboard( with open(zip_path, "wb") as f: f.write(zip_content) - logger.info(f"Дашборд успешно сохранен: {zip_path}") - if unpack: - extract_path = output_path - logger.debug(f"Начало распаковки в: {extract_path}") - with zipfile.ZipFile(zip_path, 'r') as zip_ref: - zip_ref.extractall(extract_path) - - logger.info(f"Дашборд распакован в: {extract_path}") - return zip_path, extract_path + zip_ref.extractall(output_path) + logger.info(f"Дашборд распакован в: {output_path}") + return zip_path, output_path return zip_path, None + except zipfile.BadZipFile as e: + logger.error(f"[ZIP_ERROR] Невалидный ZIP-архив: {str(e)}") + raise InvalidZipFormatError(f"Invalid ZIP file: {str(e)}") from e except Exception as e: - logger.error(f"Ошибка обработки дашборда: {str(e)}", exc_info=True) - raise RuntimeError(f"Failed to unpack dashboard: {str(e)}") from e + logger.error(f"[UNPACK_ERROR] Ошибка обработки: {str(e)}", exc_info=True) + raise -def print_directory(root_dir): - if not os.path.isdir(root_dir): - print(f"Error: '{root_dir}' is not a valid directory") - return - - # Печатаем корневую директорию - print(f"{root_dir}/") - - # Получаем список элементов в корневой директории - with os.scandir(root_dir) as entries: - for entry in entries: - # Определяем отступ и форматирование - line = " ├── " if entry.name != sorted(os.listdir(root_dir))[-1] else " └── " - suffix = "/" if entry.is_dir() else "" - print(f"{line}{entry.name}{suffix}") - -def validate_directory_structure(root_dir): - # Проверяем корневую папку - root_items = os.listdir(root_dir) - if len(root_items) != 1: - return False - - subdir_name = root_items[0] - subdir_path = os.path.join(root_dir, subdir_name) - if not os.path.isdir(subdir_path): - return False - - # Проверяем вложенную папку - subdir_items = os.listdir(subdir_path) - - # Проверяем наличие metadata.yaml - if 'metadata.yaml' not in subdir_items: - return False - if not os.path.isfile(os.path.join(subdir_path, 'metadata.yaml')): - return False - - # Проверяем допустимые папки - allowed_folders = {'databases', 'datasets', 'charts', 'dashboards'} - found_folders = set() - - for item in subdir_items: - item_path = os.path.join(subdir_path, item) - - # Пропускаем файл метаданных - if item == 'metadata.yaml': - continue - - # Проверяем что элемент является папкой - if not os.path.isdir(item_path): - return False - - # Проверяем допустимость имени папки - if item not in allowed_folders: - return False - - # Проверяем уникальность папки - if item in found_folders: - return False - found_folders.add(item) - - # Проверяем количество папок - if not 1 <= len(found_folders) <= 4: - return False - - return True - -def create_dashboard_export(zip_name, source_paths, - exclude_extensions=None, - compress_type=zipfile.ZIP_DEFLATED, - logger: Optional[SupersetLogger] = None): - """ - Создает ZIP-архив с сохранением оригинальной структуры директорий - - Параметры: - zip_name (str): Имя создаваемого ZIP-архива - source_paths (list): Список путей для добавления в архив - exclude_extensions (list): Расширения файлов для исключения (например, ['.tmp', '.log']) - compress_type: Тип сжатия (по умолчанию ZIP_DEFLATED) +def print_directory( + root_dir: str, + logger: Optional[SupersetLogger] = None +) -> None: + """[CONTRACT] Визуализация структуры директории в древовидном формате + @pre: + - root_dir должен быть валидным путем к директории + @post: + - Выводит в консоль и логи структуру директории + - Не модифицирует файловую систему + @errors: + - ValueError если путь не существует или не является директорией """ logger = logger or SupersetLogger(name="fileio", console=False) + logger.debug(f"[DIR_TREE] Начало построения дерева для {root_dir}") - for path in source_paths: - if not validate_directory_structure(path): - logger.error(f"Некорректная структура директории: {path} [1]") - logger.error(print_directory(path)) + try: + root_path = Path(root_dir) - logger.info(f"Упаковываем дашборд {source_paths} в {zip_name}") + # [VALIDATION] Проверка существования и типа + if not root_path.exists(): + raise ValueError(f"Путь не существует: {root_dir}") + if not root_path.is_dir(): + raise ValueError(f"Указан файл вместо директории: {root_dir}") + + # [OUTPUT] Форматированный вывод + print(f"\n{root_dir}/") + with os.scandir(root_dir) as entries: + entries = sorted(entries, key=lambda e: e.name) + for idx, entry in enumerate(entries): + is_last = idx == len(entries) - 1 + prefix = " └── " if is_last else " ├── " + suffix = "/" if entry.is_dir() else "" + print(f"{prefix}{entry.name}{suffix}") + + logger.info(f"[DIR_TREE] Успешно построено дерево для {root_dir}") + + except Exception as e: + error_msg = f"[DIR_TREE_ERROR] Ошибка визуализации: {str(e)}" + logger.error(error_msg, exc_info=True) + raise ValueError(error_msg) from e + + +def validate_directory_structure( + root_dir: str, + logger: Optional[SupersetLogger] = None +) -> bool: + """[CONTRACT] Валидация структуры директории экспорта Superset + @pre: + - root_dir должен быть валидным путем + @post: + - Возвращает True если структура соответствует требованиям: + 1. Ровно один подкаталог верхнего уровня + 2. Наличие metadata.yaml + 3. Допустимые имена поддиректорий (databases/datasets/charts/dashboards) + @errors: + - ValueError при некорректном пути + """ + logger = logger or SupersetLogger(name="fileio", console=False) + logger.info(f"[DIR_VALIDATION] Валидация структуры в {root_dir}") + + try: + root_path = Path(root_dir) + + # [BASE VALIDATION] + if not root_path.exists(): + raise ValueError(f"Директория не существует: {root_dir}") + if not root_path.is_dir(): + raise ValueError(f"Требуется директория, получен файл: {root_dir}") + + root_items = os.listdir(root_dir) + + # [CHECK 1] Ровно один подкаталог верхнего уровня + if len(root_items) != 1: + logger.warning(f"[VALIDATION_FAIL] Ожидается 1 подкаталог, найдено {len(root_items)}") + return False + + subdir_path = root_path / root_items[0] + + # [CHECK 2] Должен быть подкаталог + if not subdir_path.is_dir(): + logger.warning(f"[VALIDATION_FAIL] {root_items[0]} не является директорией") + return False + + # [CHECK 3] Проверка metadata.yaml + if "metadata.yaml" not in os.listdir(subdir_path): + logger.warning("[VALIDATION_FAIL] Отсутствует metadata.yaml") + return False + + # [CHECK 4] Валидация поддиректорий + found_folders = set() + for item in os.listdir(subdir_path): + if item == "metadata.yaml": + continue + + item_path = subdir_path / item + if not item_path.is_dir(): + logger.warning(f"[VALIDATION_FAIL] {item} не является директорией") + return False + + if item not in ALLOWED_FOLDERS: + logger.warning(f"[VALIDATION_FAIL] Недопустимая директория: {item}") + return False + + if item in found_folders: + logger.warning(f"[VALIDATION_FAIL] Дубликат директории: {item}") + return False + + found_folders.add(item) + + # [FINAL CHECK] + valid_structure = ( + 1 <= len(found_folders) <= 4 and + all(folder in ALLOWED_FOLDERS for folder in found_folders) + ) + + if not valid_structure: + logger.warning( + f"[VALIDATION_FAIL] Некорректный набор директорий: {found_folders}" + ) + + return valid_structure + + except Exception as e: + error_msg = f"[DIR_VALIDATION_ERROR] Критическая ошибка: {str(e)}" + logger.error(error_msg, exc_info=True) + raise ValueError(error_msg) from e + +# [CONTRACT] Создание ZIP-архива +def create_dashboard_export( + zip_path: Union[str, Path], + source_paths: List[Union[str, Path]], + exclude_extensions: Optional[List[str]] = None, + validate_source: bool = False, + logger: Optional[SupersetLogger] = None +) -> bool: + """[OPERATION] Упаковка дашборда в архив + @pre: + - source_paths должны существовать + - Должны быть права на запись в zip_path + @post: + - Возвращает True если создание успешно + - Сохраняет оригинальную структуру папок + """ + logger = logger or SupersetLogger(name="fileio", console=False) + logger.info(f"Упаковка дашбордов: {source_paths} -> {zip_path}") + try: exclude_ext = [ext.lower() for ext in exclude_extensions] if exclude_extensions else [] - # Проверка существования исходных путей - for path in source_paths: - if not os.path.exists(path): - raise FileNotFoundError(f"Путь не найден: {path}") - - # Сбор всех файлов с их базовыми путями - files_with_base = [] - for path in source_paths: - abs_path = os.path.abspath(path) - if os.path.isfile(abs_path): - # Для файла: базовый путь = директория файла - base_path = os.path.dirname(abs_path) - files_with_base.append((abs_path, base_path)) - elif os.path.isdir(abs_path): - # Для директории: базовый путь = сама директория - base_path = abs_path - for root, _, files in os.walk(abs_path): - for file in files: - full_path = os.path.join(root, file) - files_with_base.append((full_path, base_path)) - - # Фильтрация по расширениям - if exclude_ext: - files_with_base = [ - (f, b) for f, b in files_with_base - if os.path.splitext(f)[1].lower() not in exclude_ext - ] - - # Создание архива - with zipfile.ZipFile(zip_name, 'w', compress_type) as zipf: - for file, base_path in files_with_base: - # Вычисляем относительный путь от base_path - rel_path = os.path.relpath(file, start=base_path) - arcname = os.path.join(Path(zip_name).stem, rel_path) - zipf.write(file, arcname=arcname) - logger.debug(f"Добавлен {arcname}") + with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: + for path in source_paths: + path = Path(path) + if not path.exists(): + raise FileNotFoundError(f"Путь не найден: {path}") - logger.info(f"Архив создан {zip_name}") + for item in path.rglob('*'): + if item.is_file() and item.suffix.lower() not in exclude_ext: + arcname = item.relative_to(path.parent) + zipf.write(item, arcname) + logger.debug(f"Добавлен в архив: {arcname}") + + logger.info(f"Архив создан: {zip_path}") return True + except Exception as e: - logger.error(f"\nОшибка: {str(e)}") + logger.error(f"[ZIP_CREATION_ERROR] Ошибка: {str(e)}", exc_info=True) return False + +# [UTILITY] Валидация имен файлов +def sanitize_filename(filename: str) -> str: + """[UTILITY] Очистка имени файла от опасных символов + @post: + - Возвращает безопасное имя файла без спецсимволов + """ + return re.sub(r'[\\/*?:"<>|]', "_", filename).strip() + + def get_filename_from_headers(headers: dict) -> Optional[str]: """Извлекает имя файла из заголовков HTTP-ответа""" content_disposition = headers.get("Content-Disposition", "") @@ -240,110 +508,6 @@ def get_filename_from_headers(headers: dict) -> Optional[str]: return filename_match[0].strip('"') return None -def sanitize_filename(filename: str) -> str: - """Очищает имя файла от потенциально опасных символов""" - return re.sub(r'[\\/*?:"<>|]', "_", filename).strip() - -def archive_exports( - output_dir: str, - daily_retention: int = 7, - weekly_retention: int = 4, - monthly_retention: int = 12, - yearly_retention: Optional[int] = None, - logger: Optional[SupersetLogger] = None -): - """Управление историей экспортов по политике GFS (Grandfather-Father-Son) - Параметры: - :daily_retention: - :weekly_retention: - :monthly_retention: - :yearly_retention: Optional[int] - Извлекает даты из стандартного суперсетовсого архива вида, либо берет дату изменения архива - dashboard_export_20250326T121517.zip""" - - logger = logger or SupersetLogger(name="fileio", console=False) - logger.info(f"Старт очистки архивов в {output_dir}") - export_dir = Path(output_dir) - files_with_dates = [] - - # Собираем файлы с их датами - for file in export_dir.glob("*.zip"): - # Извлекаем временную метку из имени файла - try: - # Разбиваем имя файла по шаблону: dashboard_export_YYYYMMDDTHHMMSS.zip - timestamp_str = file.stem.split('_')[-1] - date_str = timestamp_str.split('T')[0] # Отделяем дату от времени - date = datetime.datetime.strptime(date_str, "%Y%m%d").date() - except (ValueError, IndexError): - # Если не удалось распарсить - используем дату изменения файла - mtime = file.stat().st_mtime - date = datetime.datetime.fromtimestamp(mtime).date() - logger.warning(f"Использована дата модификации для {file.name}") - - files_with_dates.append((file, date)) - try: - # Сортируем файлы по дате (новые сначала) - files_with_dates.sort(key=lambda x: x[1], reverse=True) - - # Создаем группы для разных уровней резервирования - daily_groups = {} - weekly_groups = {} - monthly_groups = {} - yearly_groups = {} - - for file, date in files_with_dates: - # Группировка по дням - daily_groups.setdefault(date, file) - - # Группировка по неделям - year, week, _ = date.isocalendar() - weekly_groups.setdefault((year, week), file) - - # Группировка по месяцам - monthly_groups.setdefault((date.year, date.month), file) - - # Группировка по годам - yearly_groups.setdefault(date.year, file) - - # Выбираем файлы для сохранения - keep_files = set() - - # Daily - последние N дней - sorted_daily = sorted(daily_groups.keys(), reverse=True)[ - :daily_retention] - keep_files.update(daily_groups[d] for d in sorted_daily) - - # Weekly - последние N недель - sorted_weekly = sorted(weekly_groups.keys(), reverse=True)[ - :weekly_retention] - keep_files.update(weekly_groups[w] for w in sorted_weekly) - - # Monthly - последние N месяцев - sorted_monthly = sorted(monthly_groups.keys(), reverse=True)[ - :monthly_retention] - keep_files.update(monthly_groups[m] for m in sorted_monthly) - - # Yearly - все или последние N лет - if yearly_retention is not None: - sorted_yearly = sorted(yearly_groups.keys(), reverse=True)[ - :yearly_retention] - keep_files.update(yearly_groups[y] for y in sorted_yearly) - else: - keep_files.update(yearly_groups.values()) - - # Удаляем неподходящие файлы и директории - for file, _ in files_with_dates: - if file not in keep_files: - file.unlink() - unpacked_dir = export_dir / file.stem - if unpacked_dir.exists(): - shutil.rmtree(unpacked_dir) - logger.info(f"Очистка завершена. Сохранено {len(keep_files)} файлов") - - except Exception as e: - logger.error(f"Ошибка очистки архивов: {str(e)}", exc_info=True) - raise - def determine_and_load_yaml_type(file_path): with open(file_path, 'r') as f: data = yaml.safe_load(f) @@ -359,31 +523,72 @@ def determine_and_load_yaml_type(file_path): else: return data, 'unknown' -def update_db_yaml( - db_config: Dict = None, +# [CONTRACT] Управление конфигурациями YAML +def update_yamls( + db_configs: Optional[List[Dict]] = None, path: str = "dashboards", + regexp_pattern: Optional[LiteralString] = None, + replace_string: Optional[LiteralString] = None, logger: Optional[SupersetLogger] = None ) -> None: """ - Обновляет конфигурации в YAML-файлах баз данных, заменяя старые значения на новые + [OPERATION] Обновление YAML-конфигов + @pre: + - path должен содержать валидные YAML-файлы + - db_configs должен содержать old/new состояния + @post: + - Все YAML-файлы обновлены согласно конфигурациям + - Сохраняется оригинальная структура файлов - :param db_config: Словарь с параметрами для замены в формате: + Обновляет конфигурации в YAML-файлах баз данных, заменяя старые значения на новые. + Поддерживает два типа замен: + 1. Точечную замену значений по ключам из db_config + 2. Регулярные выражения для замены текста во всех строковых полях + + Параметры: + :db_configs: Список словарей или словарь с параметрами для замены в формате: { "old": {старые_ключи: значения_для_поиска}, "new": {новые_ключи: значения_для_замены} } - :param path: Путь к папке с YAML-файлами + Если не указан - используется только замена по регулярным выражениям + :path: Путь к папке с YAML-файлами (по умолчанию "dashboards") + :regexp_pattern: Регулярное выражение для поиска текста (опционально) + :replace_string: Строка для замены найденного текста (используется с regexp_pattern) + :logger: Логгер для записи событий (по умолчанию создается новый) + + Логирует: + - Информационные сообщения о начале процесса и успешных обновлениях + - Ошибки обработки отдельных файлов + - Критические ошибки, прерывающие выполнение + + Пример использования: + update_yamls( + db_config={ + "old": {"host": "old.db.example.com"}, + "new": {"host": "new.db.example.com"} + }, + regexp_pattern="old\.", + replace_string="new." + ) """ + logger = logger or SupersetLogger(name="fileio", console=False) - logger.info("Старт обновления YAML-конфигов") + logger.info("[YAML_UPDATE] Старт обновления конфигураций") + # Преобразуем единственный конфиг в список для универсальности + if isinstance(db_configs, dict): + db_configs = [db_configs] + elif db_configs is None: + db_configs = [] + try: - db_config = db_config or {} - old_config = db_config.get("old", {}) # Значения для поиска - new_config = db_config.get("new", {}) # Значения для замены + dir = Path(path) - databases_dir = Path(path) - yaml_files = databases_dir.rglob("*.yaml") + if not dir.exists() or not dir.is_dir(): + raise FileNotFoundError(f"Путь {path} не существует или не является директорией") + + yaml_files = dir.rglob("*.yaml") for file_path in yaml_files: try: @@ -393,11 +598,62 @@ def update_db_yaml( logger.debug(f"Тип {file_path} - {yaml_type}") updates = {} - for key in old_config: - if key in data and data[key] == old_config[key]: - new_value = new_config.get(key) - if new_value is not None and new_value != data.get(key): - updates[key] = new_value + + # 1. Обработка замен по ключам из db_config (если нужно использовать только новые значения) + if db_configs: + for config in db_configs: + #Валидация конфига + if config is not None: + if "old" not in config or "new" not in config: + raise ValueError("db_config должен содержать оба раздела 'old' и 'new'") + + old_config = config.get("old", {}) # Значения для поиска + new_config = config.get("new", {}) # Значения для замены + + if len(old_config) != len(new_config): + raise ValueError( + f"Количество элементов в 'old' ({old_config}) и 'new' ({new_config}) не совпадает" + ) + + for key in old_config: + if key in data and data[key] == old_config[key]: + new_value = new_config.get(key) + if new_value is not None and new_value != data.get(key): + updates[key] = new_value # Заменяем без проверки старого значения + + # 2. Регулярная замена (с исправленной функцией process_value) + if regexp_pattern: + def process_value(value: Any) -> Tuple[bool, Any]: + """Рекурсивная обработка с флагом замены.""" + matched = False + if isinstance(value, str): + new_str = re.sub(regexp_pattern, replace_string, value) + matched = (new_str != value) + return matched, new_str + elif isinstance(value, dict): + new_dict = {} + for k, v in value.items(): + sub_matched, sub_val = process_value(v) + new_dict[k] = sub_val + if sub_matched: + matched = True + return matched, new_dict + elif isinstance(value, list): + new_list = [] + for item in value: + sub_matched, sub_val = process_value(item) + new_list.append(sub_val) + if sub_matched: + matched = True + return matched, new_list + return False, value # Нет замены для других типов + + # Применяем обработку ко всем данным + _, processed_data = process_value(data) + # Собираем обновления только для изменившихся полей + for key in processed_data: + if processed_data[key] != data.get(key): + updates[key] = processed_data[key] if updates: logger.info(f"Обновление {file_path}: {updates}") @@ -411,11 +667,11 @@ def update_db_yaml( sort_keys=False ) - except Exception as e: - logger.error(f"Ошибка обработки {file_path}: {str(e)}", exc_info=True) + except yaml.YAMLError as e: + logger.error(f"[YAML_ERROR] Ошибка парсинга {file_path}: {str(e)}") except Exception as e: - logger.error(f"Критическая ошибка обновления: {str(e)}", exc_info=True) + logger.error(f"[YAML_UPDATE_ERROR] Критическая ошибка: {str(e)}", exc_info=True) raise def sync_for_git( @@ -424,92 +680,194 @@ def sync_for_git( dry_run: bool = False, logger: Optional[SupersetLogger] = None ) -> None: - """ - Синхронизирует содержимое папки source_path с destination_path. - - Перезаписывает файлы в destination_path файлами из source_path. - Удаляет файлы и пустые директории в destination_path, которые отсутствуют в source_path - (исключая папку .git). - - :param source_path: Путь к папке с данными (источник) - :param destination_path: Путь к папке назначения - :param dry_run: Режим имитации (не вносит реальных изменений) - :param verbose: Подробный вывод операций + """[CONTRACT] Синхронизация контента между директориями с учетом Git + @pre: + - source_path должен существовать и быть директорией + - destination_path должен быть допустимым путем + - Исходная директория должна содержать валидную структуру Superset + @post: + - Полностью заменяет содержимое destination_path (кроме .git) + - Сохраняет оригинальные разрешения файлов + - Логирует все изменения при dry_run=True + @errors: + - ValueError при несоответствии структуры source_path + - RuntimeError при ошибках файловых операций """ logger = logger or SupersetLogger(name="fileio", console=False) - logger.info("Старт перезаписи целевой папки") - source_files = set() + logger.info( + "[SYNC_START] Запуск синхронизации", + extra={ + "source": source_path, + "destination": destination_path, + "dry_run": dry_run + } + ) - for root, _, files in os.walk(source_path): - for file in files: - rel_path = os.path.relpath(os.path.join(root, file), source_path) - source_files.add(rel_path) + try: + # [VALIDATION] Проверка исходной директории + if not validate_directory_structure(source_path, logger): + raise ValueError(f"Invalid source structure: {source_path}") - destination_files = set() - for root, _, files in os.walk(destination_path): - for file in files: - rel_path = os.path.relpath( - os.path.join(root, file), destination_path) - destination_files.add(rel_path) + src_path = Path(source_path) + dst_path = Path(destination_path) + + # [PREPARATION] Сбор информации о файлах + source_files = get_file_mapping(src_path) + destination_files = get_file_mapping(dst_path) - # Копирование/обновление файлов - for file in source_files: - src = os.path.join(source_path, file) - dst = os.path.join(destination_path, file) - dest_dir = os.path.dirname(dst) + # [SYNC OPERATIONS] + operations = { + 'copied': 0, + 'removed': 0, + 'skipped': 0 + } - os.makedirs(dest_dir, exist_ok=True) + # Копирование/обновление файлов + operations.update(process_copy_operations( + src_path, + dst_path, + source_files, + destination_files, + dry_run, + logger + )) - shutil.copy2(src, dst) - logger.info(f"Копируем: {file}") + # Удаление устаревших файлов + operations.update(process_cleanup_operations( + dst_path, + source_files, + destination_files, + dry_run, + logger + )) - # Удаление лишних файлов - files_to_delete = destination_files - source_files - git_dir = Path(destination_path) / ".git" + # [RESULT] + logger.info( + "[SYNC_RESULT] Итоги синхронизации", + extra=operations + ) - for file in files_to_delete: - target = Path(destination_path) / file + except Exception as e: + error_msg = f"[SYNC_FAILED] Ошибка синхронизации: {str(e)}" + logger.error(error_msg, exc_info=True) + raise RuntimeError(error_msg) from e - # Пропускаем .git и его содержимое + +# [HELPER] Получение карты файлов +def get_file_mapping(root_path: Path) -> Dict[str, Path]: + """[UTILITY] Генерация словаря файлов относительно корня + @post: + - Возвращает Dict[relative_path: Path] + - Игнорирует .git директории + """ + file_map = {} + for item in root_path.rglob("*"): + if ".git" in item.parts: + continue + rel_path = item.relative_to(root_path) + file_map[str(rel_path)] = item + return file_map + + +# [HELPER] Обработка копирования +def process_copy_operations( + src_path: Path, + dst_path: Path, + source_files: Dict[str, Path], + destination_files: Dict[str, Path], + dry_run: bool, + logger: SupersetLogger +) -> Dict[str, int]: + """[OPERATION] Выполнение операций копирования + @post: + - Возвращает счетчики операций {'copied': X, 'skipped': Y} + - Создает все необходимые поддиректории + """ + counters = {'copied': 0, 'skipped': 0} + + for rel_path, src_file in source_files.items(): + dst_file = dst_path / rel_path + + # Проверка необходимости обновления + if rel_path in destination_files: + if filecmp.cmp(src_file, dst_file, shallow=False): + counters['skipped'] += 1 + continue + + # Dry-run логирование + if dry_run: + logger.debug( + f"[DRY_RUN] Будет скопирован: {rel_path}", + extra={'operation': 'copy'} + ) + continue + + # Реальное копирование + try: + dst_file.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(src_file, dst_file) + counters['copied'] += 1 + logger.debug(f"Скопирован: {rel_path}") + except Exception as copy_error: + logger.error( + f"[COPY_ERROR] Ошибка копирования {rel_path}: {str(copy_error)}", + exc_info=True + ) + raise + + return counters + + +# [HELPER] Обработка удаления +def process_cleanup_operations( + dst_path: Path, + source_files: Dict[str, Path], + destination_files: Dict[str, Path], + dry_run: bool, + logger: SupersetLogger +) -> Dict[str, int]: + """[OPERATION] Удаление устаревших файлов + @post: + - Возвращает счетчики {'removed': X} + - Гарантированно не удаляет .git + """ + counters = {'removed': 0} + files_to_delete = set(destination_files.keys()) - set(source_files.keys()) + git_dir = dst_path / ".git" + + for rel_path in files_to_delete: + target = dst_path / rel_path + + # Защита .git try: if git_dir in target.parents or target == git_dir: - logger.info(f"Пропускаем .git: {target}") + logger.debug(f"Сохранен .git: {target}") continue - except ValueError: - pass + except ValueError: # Для случаев некорректных путей + continue + # Dry-run логирование + if dry_run: + logger.debug( + f"[DRY_RUN] Будет удален: {rel_path}", + extra={'operation': 'delete'} + ) + continue + + # Реальное удаление try: if target.is_file(): target.unlink() elif target.is_dir(): shutil.rmtree(target) - except OSError as e: - logger.error(f"Ошибка удаления: {target}: {e}") + counters['removed'] += 1 + logger.debug(f"Удален: {rel_path}") + except Exception as remove_error: + logger.error( + f"[REMOVE_ERROR] Ошибка удаления {target}: {str(remove_error)}", + exc_info=True + ) + raise - # Удаление пустых директорий - - git_dir = Path(destination_path) / ".git" - deleted_dirs = set() - - # Проходим снизу вверх (от вложенных директорий к корневым) - for root, dirs, files in os.walk(destination_path, topdown=False): - for dir_name in dirs: - dir_path = Path(root) / dir_name - - # Пропускаем .git и его поддиректории - try: - if git_dir in dir_path.parents or dir_path == git_dir: - continue - except ValueError: - pass - - # Проверяем что директория пуста и не была удалена ранее - if dir_path not in deleted_dirs and not any(dir_path.iterdir()): - - try: - dir_path.rmdir() - deleted_dirs.add(dir_path) - logger.info(f"Удаляем пустую директорию: {dir_path}") - except OSError as e: - logger.error(f"Ошибка удаления: {dir_path}: {e}") + return counters diff --git a/superset_tool/utils/logger.py b/superset_tool/utils/logger.py index c7264e6..acbb78d 100644 --- a/superset_tool/utils/logger.py +++ b/superset_tool/utils/logger.py @@ -42,20 +42,17 @@ class SupersetLogger: def _get_timestamp(self) -> str: return datetime.now().strftime("%Y%m%d") - def info(self, message: str, exc_info: bool = False): - self.logger.info(message, exc_info=exc_info) + def info(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): + self.logger.info(message, extra=extra, exc_info=exc_info) - def error(self, message: str, exc_info: bool = False): - self.logger.error(message, exc_info=exc_info) + def error(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): + self.logger.error(message, extra=extra, exc_info=exc_info) - def warning(self, message: str, exc_info: bool = False): - self.logger.warning(message, exc_info=exc_info) + def warning(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): + self.logger.warning(message, extra=extra, exc_info=exc_info) - def debug(self, message: str, exc_info: bool = False): - self.logger.debug(message, exc_info=exc_info) + def debug(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): + self.logger.debug(message, extra=extra, exc_info=exc_info) def exception(self, message: str): self.logger.exception(message) - - def critical(self, message: str, exc_info: bool = False): - self.logger.critical(message, exc_info=exc_info) diff --git a/test api.py b/test api.py deleted file mode 100644 index 770cdff..0000000 --- a/test api.py +++ /dev/null @@ -1,113 +0,0 @@ -import logging -from datetime import datetime -import shutil -import os -import keyring -from pathlib import Path -from superset_tool.models import SupersetConfig, DatabaseConfig -from superset_tool.client import SupersetClient -from superset_tool.utils.fileio import save_and_unpack_dashboard, archive_exports - -# Настройка логирования -LOG_DIR = Path("P:\\Superset\\010 Бекапы\\Logs") -LOG_DIR.mkdir(exist_ok=True, parents=True) -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(LOG_DIR / f"superset_backup_{datetime.now().strftime('%Y%m%d')}.log"), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -def setup_clients(): - """Инициализация клиентов для разных окружений""" - clients = {} - try: - - # Конфигурация для Prod - prod_config = SupersetConfig( - base_url="https://prodta.bi.dwh.rusal.com/api/v1", - auth={ - "provider": "db", - "username": "migrate_user", - "password": keyring.get_password("system", "prod migrate"), - "refresh": True - }, - verify_ssl=False - ) - # Конфигурация для Dev - dev_config = SupersetConfig( - base_url="https://devta.bi.dwh.rusal.com/api/v1", - auth={ - "provider": "db", - "username": "migrate_user", - "password": keyring.get_password("system", "dev migrate"), - "refresh": True - }, - verify_ssl=False - ) - - - # Конфигурация для Sandbox - sandbox_config = SupersetConfig( - base_url="https://sandboxta.bi.dwh.rusal.com/api/v1", - auth={ - "provider": "db", - "username": "migrate_user", - "password": keyring.get_password("system", "sandbox migrate"), - "refresh": True - }, - verify_ssl=False - ) - try: - clients['dev'] = SupersetClient(dev_config) - except Exception as e: - logger.error(f"Ошибка инициализации клиента: {str(e)}") - # try: - # clients['sbx'] = SupersetClient(sandbox_config) - # except Exception as e: - # logger.error(f"Ошибка инициализации клиента: {str(e)}") - try: - clients['prod'] = SupersetClient(prod_config) - except Exception as e: - logger.error(f"Ошибка инициализации клиента: {str(e)}") - - return clients - except Exception as e: - logger.error(f"Ошибка инициализации клиентов: {str(e)}") - raise - -def backup_dashboards(client, env_name, backup_root): - """Выполнение бэкапа дашбордов для указанного окружения""" - #logger.info(f"Начало бэкапа для окружения {env_name}") - - #print(client.get_dashboards()) - print(client.get_dashboard("IM0010")) - - - -clients = setup_clients() -superset_backup_repo = Path("P:\\Superset\\010 Бекапы") - -# Бэкап для DEV -dev_success = backup_dashboards( - clients['dev'], - "DEV", - superset_backup_repo -) - - -# Бэкап для Sandbox -# sbx_success = backup_dashboards( -# clients['sbx'], -# "SBX", -# superset_backup_repo -# ) - -prod_success = backup_dashboards( - clients['prod'], - "PROD", - superset_backup_repo -)