diff --git a/.gitignore b/.gitignore
index 36d39bb..5a5cca2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,11 +1,6 @@
-__pycache__
-dashboards/
-БД/
+*__pycache__*
 *.ps1
 *.ipynb
 *.txt
 *.zip
 keyring passwords.py
-Logs/
-patch1.patch
-test postman.py
diff --git a/backup_script.py b/backup_script.py
index e0445ba..bd3a00d 100644
--- a/backup_script.py
+++ b/backup_script.py
@@ -62,43 +62,59 @@ def setup_clients(logger: SupersetLogger):
         logger.error(f"Client initialization error: {str(e)}")
         raise
 
-def backup_dashboards(client,
-                      env_name,
-                      backup_root,
-                      logger: SupersetLogger):
-    """Back up dashboards for the given environment"""
+def backup_dashboards(client, env_name, backup_root, logger):
+    """Back up dashboards with detailed error logging"""
     try:
         dashboard_count, dashboard_meta = client.get_dashboards()
-        total = 0
+        if dashboard_count == 0:
+            logger.warning(f"No dashboards to export in {env_name}")
+            return True
+
         success = 0
+        errors = []
         for db in dashboard_meta:
-            if db['slug']:
-                total += 1
+            if not db.get('slug'):
+                continue
+
+            try:
                 dashboard_title = db['dashboard_title']
+                dashboard_dir = Path(backup_root) / env_name / sanitize_filename(dashboard_title)
+                dashboard_dir.mkdir(parents=True, exist_ok=True)
+
+                zip_content, filename = client.export_dashboard(db['id'])
+                save_and_unpack_dashboard(
+                    zip_content=zip_content,
+                    original_filename=filename,
+                    output_dir=dashboard_dir,
+                    unpack=False
+                )
+
+                # Archive older backups
                 try:
-                    dashboard_dir = Path(backup_root , env_name , sanitize_filename(dashboard_title))
-                    dashboard_dir.mkdir(parents=True, exist_ok=True)
-
-                    zip_content, filename = client.export_dashboard(db['id'])
-                    zip_path = save_and_unpack_dashboard(
-                        zip_content=zip_content,
-                        original_filename=filename,
-                        output_dir=dashboard_dir,
-                        unpack=False
-                    )
-
-                    success += 1
-                    # Clean up old backups
-                    try:
-                        archive_exports(dashboard_dir)
-                    except Exception as cleanup_error:
-                        raise cleanup_error
-
-                except Exception as db_error:
-                    raise db_error
-        return success == total
+                    archive_exports(dashboard_dir)
+                except Exception as cleanup_error:
+                    logger.warning(f"Archive cleanup error: {cleanup_error}")
+
+                success += 1
+
+            except Exception as db_error:
+                error_info = {
+                    'dashboard': db.get('dashboard_title'),
+                    'error': str(db_error),
+                    'env': env_name
+                }
+                errors.append(error_info)
+                logger.error("Dashboard export error", extra=error_info)
+
+        if errors:
+            logger.error(f"Export summary for {env_name}",
+                         extra={'success': success, 'errors': errors, 'total': dashboard_count})
+
+        return len(errors) == 0
+
     except Exception as e:
+        logger.critical(f"Fatal backup error in {env_name}: {str(e)}", exc_info=True)
         return False
 
 def main():
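The rewritten `backup_dashboards` switches from fail-fast (re-raising the first `db_error`) to collect-and-report semantics: every dashboard is attempted, failures are accumulated, and the return value reflects the whole batch. A minimal sketch of that pattern in isolation — `process_all`, `export_one` and the sample items are hypothetical stand-ins, not part of this patch:

    import logging

    def process_all(items, export_one, logger: logging.Logger) -> bool:
        """Collect-and-report: one failing item must not abort the batch."""
        success, errors = 0, []
        for item in items:
            try:
                export_one(item)  # hypothetical per-item export
                success += 1
            except Exception as exc:
                errors.append({"item": item, "error": str(exc)})
        if errors:
            logger.error("finished with %d error(s): %s", len(errors), errors)
        return not errors  # True only if every item succeeded
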
diff --git a/superset_tool/client.py b/superset_tool/client.py
index da24783..f2db131 100644
--- a/superset_tool/client.py
+++ b/superset_tool/client.py
@@ -10,27 +10,29 @@
 # [IMPORTS] Standard library
 import json
-from typing import Optional, Dict, Tuple, List, Any, Literal, Union,BinaryIO
+from typing import Optional, Dict, Tuple, List, Any, Literal, Union
+import datetime
+import zipfile
 from pathlib import Path
 
 # [IMPORTS] Third-party libraries
 import requests
 import urllib3
-from pydantic import BaseModel, Field
-from requests.exceptions import HTTPError
 
 # [IMPORTS] Local modules
-from .models import SupersetConfig
-from .exceptions import (
+from superset_tool.models import SupersetConfig
+from superset_tool.exceptions import (
     AuthenticationError,
     SupersetAPIError,
     DashboardNotFoundError,
     NetworkError,
     PermissionDeniedError,
-    ExportError
+    ExportError,
+    InvalidZipFormatError
 )
-from .utils.fileio import get_filename_from_headers
-from .utils.logger import SupersetLogger
+from superset_tool.utils.fileio import get_filename_from_headers
+from superset_tool.utils.logger import SupersetLogger
+from superset_tool.utils.network import APIClient
 
 # [CONSTANTS] Logging
 HTTP_METHODS = Literal['GET', 'POST', 'PUT', 'DELETE']
@@ -80,10 +82,14 @@ class SupersetClient:
         self._validate_config(config)
         self.config = config
         self.logger = config.logger or SupersetLogger(name="client")
-        self.session = self._setup_session()
+        self.network = APIClient(
+            base_url=config.base_url,
+            auth=config.auth,
+            verify_ssl=config.verify_ssl,
+            timeout=config.timeout,
+            logger=self.logger
+        )
+        self.tokens = self.network.authenticate()
 
         try:
-            self._authenticate()
             self.logger.info(
                 "[COHERENCE_CHECK_PASSED] Client initialized successfully",
                 extra={"base_url": config.base_url}
@@ -136,96 +142,6 @@ class SupersetClient:
             )
             raise ValueError("base_url must start with http:// or https://")
 
-    # [CHUNK] Session and adapter setup
-    def _setup_session(self) -> requests.Session:
-        """[INTERNAL] HTTP session configuration
-        @coherence_check: SSL verification must match the config
-        """
-        session = requests.Session()
-        adapter = requests.adapters.HTTPAdapter(
-            max_retries=3,
-            pool_connections=10,
-            pool_maxsize=100
-        )
-
-        session.mount('https://', adapter)
-        session.verify = self.config.verify_ssl
-
-        if not self.config.verify_ssl:
-            urllib3.disable_warnings()
-            self.logger.debug(
-                "[CONFIG] SSL verification disabled",
-                extra={"base_url": self.config.base_url}
-            )
-
-        return session
-
-    # [CHUNK] Authentication flow
-    def _authenticate(self):
-        """[AUTH-FLOW] Token acquisition
-        @semantic_steps:
-        1. Obtain the access_token
-        2. Obtain the CSRF token
-        @error_handling:
-        - AuthenticationError for credential problems
-        - NetworkError for connectivity problems
-        """
-        try:
-            # [STEP 1] Obtain the bearer token
-            login_url = f"{self.config.base_url}/security/login"
-            response = self.session.post(
-                login_url,
-                json={
-                    "username": self.config.auth["username"],
-                    "password": self.config.auth["password"],
-                    "provider": self.config.auth["provider"],
-                    "refresh": self.config.auth["refresh"]
-                },
-                timeout=self.config.timeout
-            )
-
-            if response.status_code == 401:
-                raise AuthenticationError(
-                    "Invalid credentials",
-                    context={
-                        "endpoint": login_url,
-                        "username": self.config.auth["username"],
-                        "status_code": response.status_code
-                    }
-                )
-
-            response.raise_for_status()
-            self.access_token = response.json()["access_token"]
-
-            # [STEP 2] Obtain the CSRF token
-            csrf_url = f"{self.config.base_url}/security/csrf_token/"
-            response = self.session.get(
-                csrf_url,
-                headers={"Authorization": f"Bearer {self.access_token}"},
-                timeout=self.config.timeout
-            )
-            response.raise_for_status()
-            self.csrf_token = response.json()["result"]
-
-            self.logger.info(
-                "[AUTH_SUCCESS] Tokens obtained",
-                extra={
-                    "access_token": f"{self.access_token[:5]}...",
-                    "csrf_token": f"{self.csrf_token[:5]}..."
-                }
-            )
-
-        except requests.exceptions.RequestException as e:
-            error_context = {
-                "method": e.request.method,
-                "url": e.request.url,
-                "status_code": getattr(e.response, 'status_code', None)
-            }
-
-            if isinstance(e, (requests.Timeout, requests.ConnectionError)):
-                raise NetworkError("Connection failed", context=error_context) from e
-            raise SupersetAPIError("Auth flow failed", context=error_context) from e
-
     @property
     def headers(self) -> dict:
         """[INTERFACE] Base headers for API calls
         @pre: Client must be authenticated
         @post: Always returns the current tokens
         """
         return {
-            "Authorization": f"Bearer {self.access_token}",
-            "X-CSRFToken": self.csrf_token,
+            "Authorization": f"Bearer {self.tokens['access_token']}",
+            "X-CSRFToken": self.tokens["csrf_token"],
             "Referer": self.config.base_url,
             "Content-Type": "application/json"
         }
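With `_setup_session` and `_authenticate` removed, the token pair is produced once by `APIClient.authenticate()` and merely referenced here. A sketch of the resulting flow from the caller's side, assuming a placeholder URL and credentials:

    from superset_tool.utils.network import APIClient

    network = APIClient(
        base_url="https://superset.example.com/api/v1",    # placeholder
        auth={"username": "admin", "password": "secret"},  # placeholder
    )
    tokens = network.authenticate()  # {"access_token": ..., "csrf_token": ...}
    headers = {
        "Authorization": f"Bearer {tokens['access_token']}",
        "X-CSRFToken": tokens["csrf_token"],
    }
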
@@ -244,62 +160,33 @@ class SupersetClient:
         """[CONTRACT] Fetch dashboard metadata
         @pre:
         - dashboard_id_or_slug must exist
-        - Tokens must be valid
+        - Client must be authenticated (tokens are current)
         @post:
-        - Returns the full metadata
+        - Returns a dict with the dashboard metadata
         - Raises DashboardNotFoundError on 404
+        @semantic_layers:
+        1. API interaction via APIClient
+        2. Handling of Superset-specific errors
         """
-        url = f"{self.config.base_url}/dashboard/{dashboard_id_or_slug}"
-
         try:
-            response = self.session.get(
-                url,
-                headers=self.headers,
-                timeout=self.config.timeout
-            )
-
-            if response.status_code == 404:
-                raise DashboardNotFoundError(
-                    dashboard_id_or_slug,
-                    context={"url": url}
-                )
-
-            response.raise_for_status()
+            response = self.network.request(
+                method="GET",
+                endpoint=f"/dashboard/{dashboard_id_or_slug}",
+                headers=self.headers  # automatically includes the tokens
+            )
             return response.json()["result"]
 
-        except requests.exceptions.RequestException as e:
-            self._handle_api_error("get_dashboard", e, url)
+        except DashboardNotFoundError:
+            # APIClient.request already maps 404 to DashboardNotFoundError;
+            # re-raise with the full dashboard URL attached as context
+            raise DashboardNotFoundError(
+                dashboard_id_or_slug,
+                context={"url": f"{self.config.base_url}/dashboard/{dashboard_id_or_slug}"}
+            )
 
-    def export_dashboard(self, dashboard_id: int) -> tuple[bytes, str]:
-        """[CONTRACT] Export a dashboard to ZIP
-        @error_handling:
-        - DashboardNotFoundError if the dashboard does not exist
-        - ExportError on export problems
-        """
-        url = f"{self.config.base_url}/dashboard/export/"
-
-        try:
-            response = self.session.get(
-                url,
-                headers=self.headers,
-                params={"q": f"[{dashboard_id}]"},
-                timeout=self.config.timeout
-            )
-
-            if response.status_code == 404:
-                raise DashboardNotFoundError(dashboard_id)
-
-            response.raise_for_status()
-
-            filename = (
-                get_filename_from_headers(response.headers)
-                or f"dashboard_{dashboard_id}.zip"
-            )
-            return response.content, filename
-
-        except requests.exceptions.RequestException as e:
-            self._handle_api_error("export_dashboard", e, url)
-
     # [ERROR-HANDLER] Centralized error handling
     def _handle_api_error(self, method_name: str, error: Exception, url: str) -> None:
         """[UNIFIED-ERROR] API error handling
@@ -372,13 +259,12 @@ class SupersetClient:
         - The response must have status_code 200
         - Content-Type: application/zip
         """
-        response = self.session.get(
-            url,
-            headers=self.headers,
-            params={"q": f"[{dashboard_id}]"},
-            timeout=self.config.timeout,
-            stream=True
-        )
+        response = self.network.request(
+            method="GET",
+            endpoint="/dashboard/export/",
+            headers=self.headers,
+            params={"q": f"[{dashboard_id}]"},
+            raw_response=True  # stream the binary ZIP payload
+        )
         response.raise_for_status()
         return response
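Because `raw_response=True` makes the network layer hand back the raw `requests.Response` with streaming enabled, a caller can write the ZIP to disk without buffering it in memory. A hypothetical usage sketch (the `client` instance, dashboard id 42, and output path are illustrative):

    response = client.network.request(
        method="GET",
        endpoint="/dashboard/export/",
        headers=client.headers,
        params={"q": "[42]"},  # export dashboard id 42
        raw_response=True,
    )
    with open("dashboard_42.zip", "wb") as fh:
        for chunk in response.iter_content(chunk_size=8192):
            fh.write(chunk)
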
f"[{dashboard_id}]"}, + raw_response=True # Для получения бинарного содержимого + ) response.raise_for_status() return response @@ -483,8 +369,8 @@ class SupersetClient: try: # Инициализация пагинации - total_count = self._fetch_total_count(url) - paginated_data = self._fetch_all_pages(url, validated_query, total_count) + total_count = self._fetch_total_count() + paginated_data = self._fetch_all_pages(validated_query, total_count) self.logger.info( "[API_SUCCESS] Дашборды получены", @@ -496,8 +382,8 @@ class SupersetClient: error_ctx = {"method": "get_dashboards", "query": validated_query} self._handle_api_error("Пагинация дашбордов", e, error_ctx) - # [SECTION] Импорт/экспорт - def import_dashboard(self, zip_path: Union[str, Path]) -> Dict: + # [SECTION] Импорт + def import_dashboard(self, file_name: Union[str, Path]) -> Dict: """[CONTRACT] Импорт дашборда из архива @pre: - Файл должен существовать и быть валидным ZIP @@ -506,21 +392,36 @@ class SupersetClient: - Возвращает метаданные импортированного дашборда - При конфликтах выполняет overwrite """ - self._validate_import_file(zip_path) + self._validate_import_file(file_name) + self.logger.debug( + "[IMPORT_START] Инициирован импорт дашборда", + extra={"file": file_name} + ) try: - with open(zip_path, 'rb') as f: - return self._execute_import( - file_obj=f, - file_name=Path(zip_path).name - ) + return self.network.upload_file( + endpoint="/dashboard/import/", + file_obj=file_name, + file_name=file_name, + form_field="formData", + extra_data={'overwrite': 'true'}, + timeout=self.config.timeout * 2 + ) + + except PermissionDeniedError as e: + self.logger.error( + "[IMPORT_AUTH_FAILED] Недостаточно прав для импорта", + exc_info=True + ) + raise + except Exception as e: self.logger.error( - "[IMPORT_FAILED] Критическая ошибка импорта", + "[IMPORT_FAILED] Ошибка импорта дашборда", exc_info=True, - extra={"file": str(zip_path)} + extra={"file": file_name} ) - raise DashboardImportError(f"Import failed: {str(e)}") from e + raise SupersetAPIError(f"Ошибка импорта: {str(e)}") from e # [SECTION] Приватные методы-помощники def _validate_query_params(self, query: Optional[Dict]) -> Dict: @@ -532,34 +433,62 @@ class SupersetClient: } return {**base_query, **(query or {})} - def _fetch_total_count(self, url: str) -> int: - """[HELPER] Получение общего количества дашбордов""" - count_response = self.session.get( - f"{url}?q={json.dumps({'columns': ['id'], 'page': 0, 'page_size': 1})}", - headers=self.headers, - timeout=self.config.timeout - ) - count_response.raise_for_status() - return count_response.json()['count'] - - def _fetch_all_pages(self, url: str, query: Dict, total_count: int) -> List[Dict]: - """[HELPER] Обход всех страниц с пагинацией""" - results = [] - page_size = query['page_size'] - total_pages = (total_count + page_size - 1) // page_size - - for page in range(total_pages): - query['page'] = page - response = self.session.get( - url, - headers=self.headers, - params={"q": json.dumps(query)}, - timeout=self.config.timeout - ) - response.raise_for_status() - results.extend(response.json().get('result', [])) + def _fetch_total_count(self) -> int: + """[CONTRACT][HELPER] Получение общего кол-ва дашбордов в системе + @delegates: + - Сетевой запрос -> APIClient + - Обработка ответа -> собственный метод + @errors: + - SupersetAPIError при проблемах с API + """ + query_params = { + 'columns': ['id'], + 'page': 0, + 'page_size': 1 + } - return results + try: + return self.network.fetch_paginated_count( + endpoint="/dashboard/", + 
 
     # [SECTION] Private helper methods
     def _validate_query_params(self, query: Optional[Dict]) -> Dict:
@@ -532,34 +433,62 @@ class SupersetClient:
         }
         return {**base_query, **(query or {})}
 
-    def _fetch_total_count(self, url: str) -> int:
-        """[HELPER] Get the total number of dashboards"""
-        count_response = self.session.get(
-            f"{url}?q={json.dumps({'columns': ['id'], 'page': 0, 'page_size': 1})}",
-            headers=self.headers,
-            timeout=self.config.timeout
-        )
-        count_response.raise_for_status()
-        return count_response.json()['count']
-
-    def _fetch_all_pages(self, url: str, query: Dict, total_count: int) -> List[Dict]:
-        """[HELPER] Walk all pages of a paginated endpoint"""
-        results = []
-        page_size = query['page_size']
-        total_pages = (total_count + page_size - 1) // page_size
-
-        for page in range(total_pages):
-            query['page'] = page
-            response = self.session.get(
-                url,
-                headers=self.headers,
-                params={"q": json.dumps(query)},
-                timeout=self.config.timeout
-            )
-            response.raise_for_status()
-            results.extend(response.json().get('result', []))
-
-        return results
+    def _fetch_total_count(self) -> int:
+        """[CONTRACT][HELPER] Get the total number of dashboards in the system
+        @delegates:
+        - Network request -> APIClient
+        - Response handling -> this method
+        @errors:
+        - SupersetAPIError on API problems
+        """
+        query_params = {
+            'columns': ['id'],
+            'page': 0,
+            'page_size': 1
+        }
+
+        try:
+            return self.network.fetch_paginated_count(
+                endpoint="/dashboard/",
+                query_params=query_params,
+                headers=self.headers,
+                count_field="count"
+            )
+        except (NetworkError, KeyError) as e:
+            raise SupersetAPIError(f"Failed to fetch dashboard count: {str(e)}") from e
+
+    def _fetch_all_pages(self, query: Dict, total_count: int) -> List[Dict]:
+        """[CONTRACT][HELPER] Fetch all data from a paginated API
+        @delegates:
+        - Network requests -> APIClient.fetch_paginated_data()
+        @params:
+            query: the original query object (without page)
+            total_count: total number of items
+        @return:
+            List of all items
+        @errors:
+        - SupersetAPIError: API problems
+        - ValueError: invalid pagination parameters
+        """
+        try:
+            if not query.get('page_size'):
+                raise ValueError("page_size is missing from the query parameters")
+
+            return self.network.fetch_paginated_data(
+                endpoint="/dashboard/",
+                base_query=query,
+                total_count=total_count,
+                headers=self.headers,
+                results_field="result"
+            )
+
+        except (SupersetAPIError, ValueError) as e:
+            error_ctx = {
+                "query": query,
+                "total_count": total_count,
+                "error": str(e)
+            }
+            self.logger.error("[PAGINATION_ERROR]", extra=error_ctx)
+            raise SupersetAPIError(f"Pagination error: {str(e)}") from e
 
     def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
         """[HELPER] Validate the file before import"""
@@ -573,24 +502,3 @@ class SupersetClient:
         with zipfile.ZipFile(path) as zf:
             if not any(n.endswith('metadata.yaml') for n in zf.namelist()):
                 raise DashboardNotFoundError("Archive does not contain metadata.yaml")
-
-    def _execute_import(self, file_obj: BinaryIO, file_name: str) -> Dict:
-        """[HELPER] Execute the import API request"""
-        url = f"{self.config.base_url}/dashboard/import/"
-
-        files = {'formData': (file_name, file_obj, 'application/x-zip-compressed')}
-        headers = {k: v for k, v in self.headers.items() if k.lower() != 'content-type'}
-
-        response = self.session.post(
-            url,
-            files=files,
-            data={'overwrite': 'true'},
-            headers=headers,
-            timeout=self.config.timeout * 2  # extended timeout for imports
-        )
-
-        if response.status_code == 403:
-            raise PermissionDeniedError("Insufficient permissions for import")
-
-        response.raise_for_status()
-        return response.json()
\ No newline at end of file
diff --git a/superset_tool/utils/logger.py b/superset_tool/utils/logger.py
index acbb78d..5d92360 100644
--- a/superset_tool/utils/logger.py
+++ b/superset_tool/utils/logger.py
@@ -50,6 +50,9 @@ class SupersetLogger:
 
     def warning(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
         self.logger.warning(message, extra=extra, exc_info=exc_info)
+
+    def critical(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
+        self.logger.critical(message, extra=extra, exc_info=exc_info)
 
     def debug(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
         self.logger.debug(message, extra=extra, exc_info=exc_info)
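The new `critical` wrapper mirrors the existing level methods, so the fatal path added to `backup_script.py` can carry structured context the same way `error` and `warning` do. A small usage sketch (the message and `extra` payload are illustrative):

    log = SupersetLogger(name="backup")
    try:
        raise RuntimeError("disk full")  # simulated fatal condition
    except RuntimeError as exc:
        log.critical(f"Backup aborted: {exc}", extra={"env": "prod"}, exc_info=True)
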
+ """ + def __init__( + self, + base_url: str, + auth: Dict[str, Any], + verify_ssl: bool = False, + timeout: int = 30 + ): + self.base_url = base_url + self.auth = auth + self.session = self._init_session(verify_ssl) + self.timeout = timeout + + def _init_session(self, verify_ssl: bool) -> requests.Session: + """[NETWORK-INIT] Настройка сессии с адаптерами.""" + session = requests.Session() + session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3)) + session.verify = verify_ssl + if not verify_ssl: + urllib3.disable_warnings() + return session + + def authenticate(self) -> Dict[str, str]: + """[AUTH-FLOW] Получение access и CSRF токенов.""" + try: + response = self.session.post( + f"{self.base_url}/security/login", + json={**self.auth, "provider": "db", "refresh": True}, + timeout=self.timeout + ) + response.raise_for_status() + access_token = response.json()["access_token"] + + csrf_response = self.session.get( + f"{self.base_url}/security/csrf_token/", + headers={"Authorization": f"Bearer {access_token}"}, + timeout=self.timeout + ) + csrf_response.raise_for_status() + + return { + "access_token": access_token, + "csrf_token": csrf_response.json()["result"] + } + except requests.exceptions.RequestException as e: + raise NetworkError(f"Auth failed: {str(e)}") + + def request( + self, + method: str, + endpoint: str, + headers: Optional[Dict] = None, + **kwargs + ) -> requests.Response: + """[NETWORK-CORE] Обертка для запросов с обработкой ошибок.""" + try: + response = self.session.request( + method, + f"{self.base_url}{endpoint}", + headers=headers, + timeout=self.timeout, + **kwargs + ) + response.raise_for_status() + return response + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + raise DashboardNotFoundError(endpoint) + raise SupersetAPIError(str(e)) + + def upload_file( + self, + endpoint: str, + file_obj: BinaryIO, + file_name: str, + form_field: str = "file", + extra_data: Optional[Dict] = None, + timeout: Optional[int] = None + ) -> Dict: + """[NETWORK] Отправка файла на сервер + @params: + endpoint: API endpoint + file_obj: файловый объект + file_name: имя файла + form_field: имя поля формы + extra_data: дополнительные данные + timeout: таймаут запроса + @return: + Ответ сервера (JSON) + """ + files = {form_field: (file_name, file_obj, 'application/x-zip-compressed')} + headers = { + k: v for k, v in self.headers.items() + if k.lower() != 'content-type' + } + + try: + response = self.session.post( + url=f"{self.base_url}{endpoint}", + files=files, + data=extra_data or {}, + headers=headers, + timeout=timeout or self.timeout + ) + + if response.status_code == 403: + raise PermissionDeniedError("Доступ запрещен") + + response.raise_for_status() + return response.json() + + except requests.exceptions.RequestException as e: + error_ctx = { + "endpoint": endpoint, + "file": file_name, + "status_code": getattr(e.response, 'status_code', None) + } + self.logger.error( + "[NETWORK_ERROR] Ошибка загрузки файла", + extra=error_ctx + ) + raise + + def fetch_paginated_count( + self, + endpoint: str, + query_params: Dict, + count_field: str = "count", + timeout: Optional[int] = None + ) -> int: + """[NETWORK] Получение общего количества элементов в пагинированном API + @params: + endpoint: API endpoint без query-параметров + query_params: параметры для пагинации + count_field: поле с количеством в ответе + timeout: таймаут запроса + @return: + Общее количество элементов + @errors: + - NetworkError: проблемы с соединением + - KeyError: 
+    def upload_file(
+        self,
+        endpoint: str,
+        file_obj: BinaryIO,
+        file_name: str,
+        form_field: str = "file",
+        extra_data: Optional[Dict] = None,
+        headers: Optional[Dict] = None,
+        timeout: Optional[int] = None
+    ) -> Dict:
+        """[NETWORK] Upload a file to the server
+        @params:
+            endpoint: API endpoint
+            file_obj: file object
+            file_name: file name
+            form_field: form field name
+            extra_data: additional form data
+            headers: request headers (Content-Type is stripped so that
+                     requests can set the multipart boundary itself)
+            timeout: request timeout
+        @return:
+            Server response (JSON)
+        """
+        files = {form_field: (file_name, file_obj, 'application/x-zip-compressed')}
+        headers = {
+            k: v for k, v in (headers or {}).items()
+            if k.lower() != 'content-type'
+        }
+
+        try:
+            response = self.session.post(
+                url=f"{self.base_url}{endpoint}",
+                files=files,
+                data=extra_data or {},
+                headers=headers,
+                timeout=timeout or self.timeout
+            )
+
+            if response.status_code == 403:
+                raise PermissionDeniedError("Access denied")
+
+            response.raise_for_status()
+            return response.json()
+
+        except requests.exceptions.RequestException as e:
+            error_ctx = {
+                "endpoint": endpoint,
+                "file": file_name,
+                "status_code": getattr(e.response, 'status_code', None)
+            }
+            self.logger.error(
+                "[NETWORK_ERROR] File upload error",
+                extra=error_ctx
+            )
+            raise
+
+    def fetch_paginated_count(
+        self,
+        endpoint: str,
+        query_params: Dict,
+        count_field: str = "count",
+        headers: Optional[Dict] = None,
+        timeout: Optional[int] = None
+    ) -> int:
+        """[NETWORK] Get the total number of items from a paginated API
+        @params:
+            endpoint: API endpoint without query parameters
+            query_params: pagination parameters
+            count_field: field holding the count in the response
+            headers: request headers
+            timeout: request timeout
+        @return:
+            Total number of items
+        @errors:
+        - NetworkError: connection problems
+        - KeyError: unexpected response format
+        """
+        try:
+            response = self.request(
+                method="GET",
+                endpoint=endpoint,
+                headers=headers,
+                params={"q": json.dumps(query_params)},
+                timeout=timeout or self.timeout
+            )
+            data = response.json()
+
+            if count_field not in data:
+                raise KeyError(f"API response does not contain the field {count_field}")
+
+            return data[count_field]
+
+        except requests.exceptions.RequestException as e:
+            error_ctx = {
+                "endpoint": endpoint,
+                "params": query_params,
+                "error": str(e)
+            }
+            self.logger.error("[PAGINATION_ERROR]", extra=error_ctx)
+            raise NetworkError(f"Pagination error: {str(e)}") from e
+
+    def fetch_paginated_data(
+        self,
+        endpoint: str,
+        base_query: Dict,
+        total_count: int,
+        results_field: str = "result",
+        headers: Optional[Dict] = None,
+        timeout: Optional[int] = None
+    ) -> List[Any]:
+        """[NETWORK] Fetch all data from a paginated API
+        @params:
+            endpoint: API endpoint
+            base_query: base query parameters (without page)
+            total_count: total number of items
+            results_field: field holding the data in the response
+            headers: request headers
+            timeout: timeout for each request
+        @return:
+            The data collected from all pages
+        """
+        page_size = base_query['page_size']
+        total_pages = (total_count + page_size - 1) // page_size
+        results = []
+
+        for page in range(total_pages):
+            query = {**base_query, 'page': page}
+
+            response = self.request(
+                method="GET",
+                endpoint=endpoint,
+                headers=headers,
+                params={"q": json.dumps(query)},
+                timeout=timeout or self.timeout
+            )
+            data = response.json()
+
+            if results_field not in data:
+                self.logger.warning(
+                    f"Response does not contain the field {results_field}",
+                    extra={"response_fields": list(data.keys())}
+                )
+                continue
+
+            results.extend(data[results_field])
+
+        return results
\ No newline at end of file
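
One property of `fetch_paginated_data` worth calling out: the page count is derived with ceiling division, so a partial final page is always fetched and an exact multiple adds no empty request. A quick check of the arithmetic:

    page_size = 25
    assert (101 + page_size - 1) // page_size == 5  # 4 full pages + 1 partial
    assert (100 + page_size - 1) // page_size == 4  # exact multiple: no extra page
    assert (1 + page_size - 1) // page_size == 1    # any non-zero count needs a page
    assert (0 + page_size - 1) // page_size == 0    # zero items: no requests at all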