migration refactor

This commit is contained in:
2025-08-16 12:29:37 +03:00
parent f368f5ced9
commit 0e2fc14732
16 changed files with 1977 additions and 2761 deletions

View File

View File

@@ -1,661 +1,313 @@
# [MODULE] Superset API Client
# @contract: Реализует полное взаимодействие с Superset API
# @semantic_layers:
# 1. Авторизация/CSRF (делегируется `APIClient`)
# 2. Основные операции (получение метаданных, список дашбордов)
# 3. Импорт/экспорт дашбордов
# @coherence:
# - Согласован с `models.SupersetConfig` для конфигурации.
# - Полная обработка всех ошибок из `exceptions.py` (делегируется `APIClient` и дополняется специфичными).
# - Полностью использует `utils.network.APIClient` для всех HTTP-запросов.
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument
"""
[MODULE] Superset API Client
@contract: Реализует полное взаимодействие с Superset API
"""
# [IMPORTS] Стандартная библиотека
import json
from typing import Optional, Dict, Tuple, List, Any, Literal, Union
from typing import Optional, Dict, Tuple, List, Any, Union
import datetime
from pathlib import Path
import zipfile
from requests import Response
import zipfile # Для валидации ZIP-файлов
# [IMPORTS] Сторонние библиотеки (убраны requests и urllib3, т.к. они теперь в network.py)
# [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig
from superset_tool.exceptions import (
AuthenticationError,
SupersetAPIError,
DashboardNotFoundError,
NetworkError,
PermissionDeniedError,
ExportError,
InvalidZipFormatError
)
from superset_tool.utils.fileio import get_filename_from_headers
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.network import APIClient # [REFACTORING_TARGET] Использование APIClient
from superset_tool.utils.network import APIClient
# [CONSTANTS] Общие константы (для информации, т.к. тайм-аут теперь в конфиге)
DEFAULT_TIMEOUT = 30 # seconds - используется как значение по умолчанию в SupersetConfig
# [CONSTANTS]
DEFAULT_TIMEOUT = 30
# [TYPE-ALIASES] Для сложных сигнатур
# [TYPE-ALIASES]
JsonType = Union[Dict[str, Any], List[Dict[str, Any]]]
ResponseType = Tuple[bytes, str]
# [CHECK] Валидация импортов для контрактов
# [COHERENCE_CHECK_PASSED] Теперь зависимость на requests и urllib3 скрыта за APIClient
try:
from .utils.fileio import get_filename_from_headers as fileio_check
assert callable(fileio_check)
from .utils.network import APIClient as network_check
assert callable(network_check)
except (ImportError, AssertionError) as imp_err:
raise RuntimeError(
f"[COHERENCE_CHECK_FAILED] Импорт не прошел валидацию: {str(imp_err)}"
) from imp_err
class SupersetClient:
"""[MAIN-CONTRACT] Клиент для работы с Superset API
@pre:
- `config` должен быть валидным `SupersetConfig`.
- Целевой API доступен и учетные данные корректны.
@post:
- Все методы возвращают ожидаемые данные или вызывают явные, типизированные ошибки.
- Токены для API-вызовов автоматически управляются (`APIClient`).
@invariant:
- Сессия остается валидной между вызовами.
- Все ошибки типизированы согласно `exceptions.py`.
- Все HTTP-запросы проходят через `self.network`.
"""
"""[MAIN-CONTRACT] Клиент для работы с Superset API"""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация клиента Superset.
# PRECONDITIONS: `config` должен быть валидным `SupersetConfig`.
# POSTCONDITIONS: Клиент успешно инициализирован.
def __init__(self, config: SupersetConfig, logger: Optional[SupersetLogger] = None):
"""[INIT] Инициализация клиента Superset.
@semantic:
- Валидирует входную конфигурацию.
- Инициализирует внутренний `APIClient` для сетевого взаимодействия.
- Выполняет первичную аутентификацию через `APIClient`.
"""
# [PRECONDITION] Валидация конфигурации
self.logger = logger or SupersetLogger(name="SupersetClient")
self.logger.info("[INFO][SupersetClient.__init__][ENTER] Initializing SupersetClient.")
self._validate_config(config)
self.config = config
# [ANCHOR] API_CLIENT_INIT
# [REFACTORING_COMPLETE] Теперь вся сетевая логика инкапсулирована в APIClient.
# APIClient отвечает за аутентификацию, повторные попытки и обработку низкоуровневых ошибок.
self.network = APIClient(
base_url=config.base_url,
auth=config.auth,
config=config.dict(),
verify_ssl=config.verify_ssl,
timeout=config.timeout,
logger=self.logger # Передаем логгер в APIClient
logger=self.logger
)
try:
# Аутентификация выполняется в конструкторе APIClient или по первому запросу
# Для явного вызова: self.network.authenticate()
# APIClient сам управляет токенами после первого успешного входа
self.logger.info(
"[COHERENCE_CHECK_PASSED] Клиент Superset успешно инициализирован",
extra={"base_url": config.base_url}
)
except Exception as e:
self.logger.error(
"[INIT_FAILED] Ошибка инициализации клиента Superset",
exc_info=True,
extra={"config_base_url": config.base_url, "error": str(e)}
)
raise # Перевыброс ошибки инициализации
self.logger.info("[INFO][SupersetClient.__init__][SUCCESS] SupersetClient initialized successfully.")
# END_FUNCTION___init__
# [ENTITY: Function('_validate_config')]
# CONTRACT:
# PURPOSE: Валидация конфигурации клиента.
# PRECONDITIONS: `config` должен быть экземпляром `SupersetConfig`.
# POSTCONDITIONS: Конфигурация валидна.
def _validate_config(self, config: SupersetConfig) -> None:
"""[PRECONDITION] Валидация конфигурации клиента.
@semantic:
- Проверяет, что `config` является экземпляром `SupersetConfig`.
- Проверяет обязательные поля `base_url` и `auth`.
- Логирует ошибки валидации.
@raise:
- `TypeError`: если `config` не является `SupersetConfig`.
- `ValueError`: если отсутствуют обязательные поля или они невалидны.
"""
self.logger.debug("[DEBUG][SupersetClient._validate_config][ENTER] Validating config.")
if not isinstance(config, SupersetConfig):
self.logger.error(
"[CONTRACT_VIOLATION] Некорректный тип конфигурации",
extra={"actual_type": type(config).__name__}
)
self.logger.error("[ERROR][SupersetClient._validate_config][FAILURE] Invalid config type.")
raise TypeError("Конфигурация должна быть экземпляром SupersetConfig")
self.logger.debug("[DEBUG][SupersetClient._validate_config][SUCCESS] Config validated.")
# END_FUNCTION__validate_config
# Pydantic SupersetConfig уже выполняет основную валидацию через Field и validator.
# Здесь можно добавить дополнительные бизнес-правила или проверки доступности, если нужно.
try:
# Попытка доступа к полям через Pydantic для проверки их существования
_ = config.base_url
_ = config.auth
_ = config.auth.get("username")
_ = config.auth.get("password")
self.logger.debug("[COHERENCE_CHECK_PASSED] Конфигурация SupersetClient прошла внутреннюю валидацию.")
except Exception as e:
self.logger.error(
f"[CONTRACT_VIOLATION] Ошибка валидации полей конфигурации: {e}",
extra={"config_dict": config.dict()}
)
raise ValueError(f"Конфигурация SupersetConfig невалидна: {e}") from e
@property
def headers(self) -> dict:
"""[INTERFACE] Базовые заголовки для API-вызовов.
@semantic: Делегирует получение актуальных заголовков `APIClient`.
@post: Всегда возвращает актуальные токены и CSRF-токен.
@invariant: Заголовки содержат 'Authorization' и 'X-CSRFToken'.
"""
# [REFACTORING_COMPLETE] Заголовки теперь управляются APIClient.
"""[INTERFACE] Базовые заголовки для API-вызовов."""
return self.network.headers
# END_FUNCTION_headers
# [SECTION] API для получения списка дашбордов или получения одного дашборда
# [ENTITY: Function('get_dashboards')]
# CONTRACT:
# PURPOSE: Получение списка дашбордов с пагинацией.
# PRECONDITIONS: None
# POSTCONDITIONS: Возвращает кортеж с общим количеством и списком дашбордов.
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""[CONTRACT] Получение списка дашбордов с пагинацией.
@pre:
- Клиент должен быть авторизован.
- Параметры `query` (если предоставлены) должны быть валидны для API Superset.
@post:
- Возвращает кортеж: (общееоличествоашбордов, список_метаданныхашбордов).
- Обходит пагинацию для получения всех доступных дашбордов.
@invariant:
- Всегда возвращает полный список (если `total_count` > 0).
@raise:
- `SupersetAPIError`: При ошибках API (например, неверный формат ответа).
- `NetworkError`: При проблемах с сетью.
- `ValueError`: При некорректных параметрах пагинации (внутренняя ошибка).
"""
self.logger.info("[INFO] Запрос списка всех дашбордов.")
# [COHERENCE_CHECK] Валидация и нормализация параметров запроса
self.logger.info("[INFO][SupersetClient.get_dashboards][ENTER] Getting dashboards.")
validated_query = self._validate_query_params(query)
self.logger.debug("[DEBUG] Параметры запроса списка дашбордов после валидации.", extra={"validated_query": validated_query})
try:
# [ANCHOR] FETCH_TOTAL_COUNT
total_count = self._fetch_total_object_count(endpoint="/dashboard/")
self.logger.info(f"[INFO] Обнаружено {total_count} дашбордов в системе.")
# [ANCHOR] FETCH_ALL_PAGES
paginated_data = self._fetch_all_pages(endpoint="/dashboard/",
query=validated_query,
total_count=total_count)
self.logger.info(
f"[COHERENCE_CHECK_PASSED] Успешно получено {len(paginated_data)} дашбордов из {total_count}."
)
return total_count, paginated_data
except (SupersetAPIError, NetworkError, ValueError, PermissionDeniedError) as e:
self.logger.error(f"[ERROR] Ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except Exception as e:
error_ctx = {"query": query, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context=error_ctx) from e
def get_dashboard(self, dashboard_id_or_slug: str) -> dict:
"""[CONTRACT] Получение метаданных дашборда по ID или SLUG.
@pre:
- `dashboard_id_or_slug` должен быть строкой (ID или slug).
- Клиент должен быть аутентифицирован (токены актуальны).
@post:
- Возвращает `dict` с метаданными дашборда.
@raise:
- `DashboardNotFoundError`: Если дашборд не найден (HTTP 404).
- `SupersetAPIError`: При других ошибках API.
- `NetworkError`: При проблемах с сетью.
"""
self.logger.info(f"[INFO] Запрос метаданных дашборда: {dashboard_id_or_slug}")
try:
response_data = self.network.request(
method="GET",
endpoint=f"/dashboard/{dashboard_id_or_slug}",
# headers=self.headers # [REFACTORING_NOTE] APIClient теперь сам добавляет заголовки
)
# [POSTCONDITION] Проверка структуры ответа
if "result" not in response_data:
self.logger.warning("[CONTRACT_VIOLATION] Ответ API не содержит поле 'result'", extra={"response": response_data})
raise SupersetAPIError("Некорректный формат ответа API при получении дашборда")
self.logger.debug(f"[DEBUG] Метаданные дашборда '{dashboard_id_or_slug}' успешно получены.")
return response_data["result"]
except (DashboardNotFoundError, SupersetAPIError, NetworkError, PermissionDeniedError) as e:
self.logger.error(f"[ERROR] Не удалось получить дашборд '{dashboard_id_or_slug}': {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise # Перевыброс уже типизированной ошибки
except Exception as e:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении дашборда '{dashboard_id_or_slug}': {str(e)}", exc_info=True)
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context={"dashboard_id_or_slug": dashboard_id_or_slug}) from e
# [SECTION] API для получения списка датасетов или получения одного датасета
def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""[CONTRACT] Получение списка датасетов с пагинацией.
@pre:
- Клиент должен быть авторизован.
- Параметры `query` (если предоставлены) должны быть валидны для API Superset.
@post:
- Возвращает кортеж: (общееоличествоатасетов, список_метаданныхатасетов).
- Обходит пагинацию для получения всех доступных датасетов.
@invariant:
- Всегда возвращает полный список (если `total_count` > 0).
@raise:
- `SupersetAPIError`: При ошибках API (например, неверный формат ответа).
- `NetworkError`: При проблемах с сетью.
- `ValueError`: При некорректных параметрах пагинации (внутренняя ошибка).
"""
self.logger.info("[INFO] Запрос списка всех датасетов")
try:
# Получаем общее количество датасетов
total_count = self._fetch_total_object_count(endpoint="/dataset/")
self.logger.info(f"[INFO] Обнаружено {total_count} датасетов в системе")
# Валидируем параметры запроса
base_query = {
"columns": ["id", "table_name", "sql", "database", "schema"],
"page": 0,
"page_size": 100
total_count = self._fetch_total_object_count(endpoint="/dashboard/")
paginated_data = self._fetch_all_pages(
endpoint="/dashboard/",
pagination_options={
"base_query": validated_query,
"total_count": total_count,
"results_field": "result",
}
validated_query = {**base_query, **(query or {})}
)
self.logger.info("[INFO][SupersetClient.get_dashboards][SUCCESS] Got dashboards.")
return total_count, paginated_data
# END_FUNCTION_get_dashboards
# Получаем все страницы
datasets = self._fetch_all_pages(
endpoint="/dataset/",
query=validated_query,
total_count=total_count#,
#results_field="result"
)
# [ENTITY: Function('get_dashboard')]
# CONTRACT:
# PURPOSE: Получение метаданных дашборда по ID или SLUG.
# PRECONDITIONS: `dashboard_id_or_slug` должен существовать.
# POSTCONDITIONS: Возвращает метаданные дашборда.
def get_dashboard(self, dashboard_id_or_slug: str) -> dict:
self.logger.info(f"[INFO][SupersetClient.get_dashboard][ENTER] Getting dashboard: {dashboard_id_or_slug}")
response_data = self.network.request(
method="GET",
endpoint=f"/dashboard/{dashboard_id_or_slug}",
)
self.logger.info(f"[INFO][SupersetClient.get_dashboard][SUCCESS] Got dashboard: {dashboard_id_or_slug}")
return response_data.get("result", {})
# END_FUNCTION_get_dashboard
self.logger.info(
f"[COHERENCE_CHECK_PASSED] Успешно получено {len(datasets)} датасетов"
)
return total_count, datasets
except Exception as e:
error_ctx = {"query": query, "error_type": type(e).__name__}
self.logger.error(
f"[ERROR] Ошибка получения списка датасетов: {str(e)}",
exc_info=True,
extra=error_ctx
)
raise
# [ENTITY: Function('get_datasets')]
# CONTRACT:
# PURPOSE: Получение списка датасетов с пагинацией.
# PRECONDITIONS: None
# POSTCONDITIONS: Возвращает кортеж с общим количеством и списком датасетов.
def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
self.logger.info("[INFO][SupersetClient.get_datasets][ENTER] Getting datasets.")
total_count = self._fetch_total_object_count(endpoint="/dataset/")
base_query = {
"columns": ["id", "table_name", "sql", "database", "schema"],
"page": 0,
"page_size": 100
}
validated_query = {**base_query, **(query or {})}
datasets = self._fetch_all_pages(
endpoint="/dataset/",
pagination_options={
"base_query": validated_query,
"total_count": total_count,
"results_field": "result",
}
)
self.logger.info("[INFO][SupersetClient.get_datasets][SUCCESS] Got datasets.")
return total_count, datasets
# END_FUNCTION_get_datasets
# [ENTITY: Function('get_dataset')]
# CONTRACT:
# PURPOSE: Получение метаданных датасета по ID.
# PRECONDITIONS: `dataset_id` должен существовать.
# POSTCONDITIONS: Возвращает метаданные датасета.
def get_dataset(self, dataset_id: str) -> dict:
"""[CONTRACT] Получение метаданных датасета по ID.
@pre:
- `dataset_id` должен быть строкой (ID или slug).
- Клиент должен быть аутентифицирован (токены актуальны).
@post:
- Возвращает `dict` с метаданными датасета.
@raise:
- `DashboardNotFoundError`: Если дашборд не найден (HTTP 404).
- `SupersetAPIError`: При других ошибках API.
- `NetworkError`: При проблемах с сетью.
"""
self.logger.info(f"[INFO] Запрос метаданных дашборда: {dataset_id}")
try:
response_data = self.network.request(
method="GET",
endpoint=f"/dataset/{dataset_id}",
# headers=self.headers # [REFACTORING_NOTE] APIClient теперь сам добавляет заголовки
)
# [POSTCONDITION] Проверка структуры ответа
if "result" not in response_data:
self.logger.warning("[CONTRACT_VIOLATION] Ответ API не содержит поле 'result'", extra={"response": response_data})
raise SupersetAPIError("Некорректный формат ответа API при получении дашборда")
self.logger.debug(f"[DEBUG] Метаданные дашборда '{dataset_id}' успешно получены.")
return response_data["result"]
except (DashboardNotFoundError, SupersetAPIError, NetworkError, PermissionDeniedError) as e:
self.logger.error(f"[ERROR] Не удалось получить дашборд '{dataset_id}': {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise # Перевыброс уже типизированной ошибки
except Exception as e:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении дашборда '{dataset_id}': {str(e)}", exc_info=True)
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context={"dashboard_id_or_slug": dataset_id}) from e
self.logger.info(f"[INFO][SupersetClient.get_dataset][ENTER] Getting dataset: {dataset_id}")
response_data = self.network.request(
method="GET",
endpoint=f"/dataset/{dataset_id}",
)
self.logger.info(f"[INFO][SupersetClient.get_dataset][SUCCESS] Got dataset: {dataset_id}")
return response_data.get("result", {})
# END_FUNCTION_get_dataset
# [SECTION] EXPORT OPERATIONS
# [ENTITY: Function('export_dashboard')]
# CONTRACT:
# PURPOSE: Экспорт дашборда в ZIP-архив.
# PRECONDITIONS: `dashboard_id` должен существовать.
# POSTCONDITIONS: Возвращает содержимое ZIP-архива и имя файла.
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
"""[CONTRACT] Экспорт дашборда в ZIP-архив.
@pre:
- `dashboard_id` должен быть целочисленным ID существующего дашборда.
- Пользователь должен иметь права на экспорт.
@post:
- Возвращает кортеж: (бинарное_содержимое_zip, имя_файла).
- Имя файла извлекается из заголовков `Content-Disposition` или генерируется.
@raise:
- `DashboardNotFoundError`: Если дашборд с `dashboard_id` не найден (HTTP 404).
- `ExportError`: При любых других проблемах экспорта (например, неверный тип контента, пустой ответ).
- `NetworkError`: При проблемах с сетью.
"""
self.logger.info(f"[INFO] Запуск экспорта дашборда с ID: {dashboard_id}")
try:
# [ANCHOR] EXECUTE_EXPORT_REQUEST
# [REFACTORING_COMPLETE] Использование self.network.request для экспорта
response = self.network.request(
method="GET",
endpoint="/dashboard/export/",
params={"q": json.dumps([dashboard_id])},
stream=True, # Используем stream для обработки больших файлов
raw_response=True # Получаем сырой объект ответа requests.Response
# headers=self.headers # APIClient сам добавляет заголовки
)
response.raise_for_status() # Проверка статуса ответа
# [ANCHOR] VALIDATE_EXPORT_RESPONSE
self._validate_export_response(response, dashboard_id)
# [ANCHOR] RESOLVE_FILENAME
filename = self._resolve_export_filename(response, dashboard_id)
# [POSTCONDITION] Успешный экспорт
content = response.content # Получаем все содержимое
self.logger.info(
f"[COHERENCE_CHECK_PASSED] Дашборд {dashboard_id} успешно экспортирован. Размер: {len(content)} байт, Имя файла: {filename}"
)
return content, filename
except (DashboardNotFoundError, ExportError, NetworkError, PermissionDeniedError, SupersetAPIError) as e:
# Перехват и перевыброс уже типизированных ошибок от APIClient или предыдущих валидаций
self.logger.error(f"[ERROR] Ошибка экспорта дашборда {dashboard_id}: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except Exception as e:
# Обработка любых непредвиденных ошибок
error_ctx = {"dashboard_id": dashboard_id, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при экспорте дашборда {dashboard_id}: {str(e)}", exc_info=True, extra=error_ctx)
raise ExportError(f"Непредвиденная ошибка при экспорте: {str(e)}", context=error_ctx) from e
# [HELPER] Метод _execute_export_request был инлайнирован в export_dashboard
# Это сделано, чтобы избежать лишней абстракции, так как он просто вызывает self.network.request.
# Валидация HTTP-ответа и ошибок теперь происходит в self.network.request и последующей self.raise_for_status().
self.logger.info(f"[INFO][SupersetClient.export_dashboard][ENTER] Exporting dashboard: {dashboard_id}")
response = self.network.request(
method="GET",
endpoint="/dashboard/export/",
params={"q": json.dumps([dashboard_id])},
stream=True,
raw_response=True
)
self._validate_export_response(response, dashboard_id)
filename = self._resolve_export_filename(response, dashboard_id)
content = response.content
self.logger.info(f"[INFO][SupersetClient.export_dashboard][SUCCESS] Exported dashboard: {dashboard_id}")
return content, filename
# END_FUNCTION_export_dashboard
# [ENTITY: Function('_validate_export_response')]
# CONTRACT:
# PURPOSE: Валидация ответа экспорта.
# PRECONDITIONS: `response` должен быть валидным HTTP-ответом.
# POSTCONDITIONS: Ответ валиден.
def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
"""[HELPER] Валидация ответа экспорта.
@semantic:
- Проверяет, что Content-Type является `application/zip`.
- Проверяет, что ответ не пуст.
@raise:
- `ExportError`: При невалидном Content-Type или пустом содержимом.
"""
self.logger.debug(f"[DEBUG][SupersetClient._validate_export_response][ENTER] Validating export response for dashboard: {dashboard_id}")
content_type = response.headers.get('Content-Type', '')
if 'application/zip' not in content_type:
self.logger.error(
"[CONTRACT_VIOLATION] Неверный Content-Type для экспорта",
extra={
"dashboard_id": dashboard_id,
"expected_type": "application/zip",
"received_type": content_type
}
)
self.logger.error(f"[ERROR][SupersetClient._validate_export_response][FAILURE] Invalid content type: {content_type}")
raise ExportError(f"Получен не ZIP-архив (Content-Type: {content_type})")
if not response.content:
self.logger.error(
"[CONTRACT_VIOLATION] Пустой ответ при экспорте дашборда",
extra={"dashboard_id": dashboard_id}
)
self.logger.error("[ERROR][SupersetClient._validate_export_response][FAILURE] Empty response content.")
raise ExportError("Получены пустые данные при экспорте")
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Ответ экспорта для дашборда {dashboard_id} валиден.")
self.logger.debug(f"[DEBUG][SupersetClient._validate_export_response][SUCCESS] Export response validated for dashboard: {dashboard_id}")
# END_FUNCTION__validate_export_response
# [ENTITY: Function('_resolve_export_filename')]
# CONTRACT:
# PURPOSE: Определение имени экспортируемого файла.
# PRECONDITIONS: `response` должен быть валидным HTTP-ответом.
# POSTCONDITIONS: Возвращает имя файла.
def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
"""[HELPER] Определение имени экспортируемого файла.
@semantic:
- Пытается извлечь имя файла из заголовка `Content-Disposition`.
- Если заголовок отсутствует, генерирует имя файла на основе ID дашборда и текущей даты.
@post:
- Возвращает строку с именем файла.
"""
self.logger.debug(f"[DEBUG][SupersetClient._resolve_export_filename][ENTER] Resolving export filename for dashboard: {dashboard_id}")
filename = get_filename_from_headers(response.headers)
if not filename:
# [FALLBACK] Генерация имени файла
filename = f"dashboard_export_{dashboard_id}_{datetime.datetime.now().strftime('%Y%m%dT%H%M%S')}.zip"
self.logger.warning(
"[WARN] Не удалось извлечь имя файла из заголовков. Используется сгенерированное имя.",
extra={"generated_filename": filename, "dashboard_id": dashboard_id}
)
else:
self.logger.debug(
"[DEBUG] Имя файла экспорта получено из заголовков.",
extra={"header_filename": filename, "dashboard_id": dashboard_id}
)
timestamp = datetime.datetime.now().strftime('%Y%m%dT%H%M%S')
filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip"
self.logger.warning(f"[WARNING][SupersetClient._resolve_export_filename][STATE_CHANGE] Could not resolve filename from headers, generated: {filename}")
self.logger.debug(f"[DEBUG][SupersetClient._resolve_export_filename][SUCCESS] Resolved export filename: {filename}")
return filename
# END_FUNCTION__resolve_export_filename
# [ENTITY: Function('export_to_file')]
# CONTRACT:
# PURPOSE: Экспорт дашборда напрямую в файл.
# PRECONDITIONS: `output_dir` должен существовать.
# POSTCONDITIONS: Дашборд сохранен в файл.
def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path:
"""[CONTRACT] Экспорт дашборда напрямую в файл.
@pre:
- `dashboard_id` должен быть существующим ID дашборда.
- `output_dir` должен быть валидным, существующим путем и иметь права на запись.
@post:
- Дашборд экспортируется и сохраняется как ZIP-файл в `output_dir`.
- Возвращает `Path` к сохраненному файлу.
@raise:
- `FileNotFoundError`: Если `output_dir` не существует.
- `ExportError`: При ошибках экспорта или записи файла.
- `NetworkError`: При проблемах с сетью.
"""
self.logger.info(f"[INFO][SupersetClient.export_to_file][ENTER] Exporting dashboard {dashboard_id} to file in {output_dir}")
output_dir = Path(output_dir)
if not output_dir.exists():
self.logger.error(
"[CONTRACT_VIOLATION] Целевая директория для экспорта не найдена.",
extra={"output_dir": str(output_dir)}
)
self.logger.error(f"[ERROR][SupersetClient.export_to_file][FAILURE] Output directory does not exist: {output_dir}")
raise FileNotFoundError(f"Директория {output_dir} не найдена")
self.logger.info(f"[INFO] Экспорт дашборда {dashboard_id} в файл в директорию: {output_dir}")
try:
content, filename = self.export_dashboard(dashboard_id)
target_path = output_dir / filename
content, filename = self.export_dashboard(dashboard_id)
target_path = output_dir / filename
with open(target_path, 'wb') as f:
f.write(content)
self.logger.info(f"[INFO][SupersetClient.export_to_file][SUCCESS] Exported dashboard {dashboard_id} to {target_path}")
return target_path
# END_FUNCTION_export_to_file
with open(target_path, 'wb') as f:
f.write(content)
self.logger.info(
"[COHERENCE_CHECK_PASSED] Дашборд успешно сохранен на диск.",
extra={
"dashboard_id": dashboard_id,
"file_path": str(target_path),
"file_size": len(content)
}
)
return target_path
except (FileNotFoundError, ExportError, NetworkError, SupersetAPIError, DashboardNotFoundError) as e:
self.logger.error(f"[ERROR] Ошибка сохранения дашборда {dashboard_id} на диск: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except IOError as io_err:
error_ctx = {"target_path": str(target_path), "dashboard_id": dashboard_id}
self.logger.critical(f"[CRITICAL] Ошибка записи файла для дашборда {dashboard_id}: {str(io_err)}", exc_info=True, extra=error_ctx)
raise ExportError("Ошибка сохранения файла на диск") from io_err
except Exception as e:
error_ctx = {"dashboard_id": dashboard_id, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при экспорте в файл: {str(e)}", exc_info=True, extra=error_ctx)
raise ExportError(f"Непредвиденная ошибка экспорта в файл: {str(e)}", context=error_ctx) from e
# [SECTION] Импорт дашбордов
# [ENTITY: Function('import_dashboard')]
# CONTRACT:
# PURPOSE: Импорт дашборда из ZIP-архива.
# PRECONDITIONS: `file_name` должен быть валидным ZIP-файлом.
# POSTCONDITIONS: Возвращает ответ API.
def import_dashboard(self, file_name: Union[str, Path]) -> Dict:
"""[CONTRACT] Импорт дашборда из ZIP-архива.
@pre:
- `file_name` должен указывать на существующий и валидный ZIP-файл Superset экспорта.
- Пользователь должен иметь права на импорт дашбордов.
@post:
- Дашборд импортируется (или обновляется, если `overwrite` включен).
- Возвращает `dict` с ответом API об импорте.
@raise:
- `FileNotFoundError`: Если файл не существует.
- `InvalidZipFormatError`: Если файл не является корректным ZIP-архивом Superset.
- `PermissionDeniedError`: Если у пользователя нет прав на импорт.
- `SupersetAPIError`: При других ошибках API импорта.
- `NetworkError`: При проблемах с сетью.
"""
self.logger.info(f"[INFO] Инициирован импорт дашборда из файла: {file_name}")
# [PRECONDITION] Валидация входного файла
self.logger.info(f"[INFO][SupersetClient.import_dashboard][ENTER] Importing dashboard from: {file_name}")
self._validate_import_file(file_name)
try:
# [ANCHOR] UPLOAD_FILE_TO_API
# [REFACTORING_COMPLETE] Использование self.network.upload_file
import_response = self.network.upload_file(
endpoint="/dashboard/import/",
file_obj=Path(file_name), # Pathlib объект, который APIClient может преобразовать в бинарный
file_name=Path(file_name).name, # Имя файла для FormData
form_field="formData",
extra_data={'overwrite': 'true'}, # Предполагаем, что всегда хотим перезаписывать
timeout=self.config.timeout * 2 # Удвоенный таймаут для загрузки больших файлов
# headers=self.headers # APIClient сам добавляет заголовки
)
# [POSTCONDITION] Проверка успешного ответа импорта (Superset обычно возвращает JSON)
if not isinstance(import_response, dict) or "message" not in import_response:
self.logger.warning("[CONTRACT_VIOLATION] Неожиданный формат ответа при импорте", extra={"response": import_response})
raise SupersetAPIError("Неожиданный формат ответа после импорта дашборда.")
import_response = self.network.upload_file(
endpoint="/dashboard/import/",
file_info={
"file_obj": Path(file_name),
"file_name": Path(file_name).name,
"form_field": "formData",
},
extra_data={'overwrite': 'true'},
timeout=self.config.timeout * 2
)
self.logger.info(f"[INFO][SupersetClient.import_dashboard][SUCCESS] Imported dashboard from: {file_name}")
return import_response
# END_FUNCTION_import_dashboard
self.logger.info(
f"[COHERENCE_CHECK_PASSED] Дашборд из '{file_name}' успешно импортирован.",
extra={"api_message": import_response.get("message", "N/A"), "file": file_name}
)
return import_response
except (FileNotFoundError, InvalidZipFormatError, PermissionDeniedError, SupersetAPIError, NetworkError, DashboardNotFoundError) as e:
self.logger.error(f"[ERROR] Ошибка импорта дашборда из '{file_name}': {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except Exception as e:
error_ctx = {"file": file_name, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при импорте дашборда: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка импорта: {str(e)}", context=error_ctx) from e
# [SECTION] Приватные методы-помощники
# [ENTITY: Function('_validate_query_params')]
# CONTRACT:
# PURPOSE: Нормализация и валидация параметров запроса.
# PRECONDITIONS: None
# POSTCONDITIONS: Возвращает валидный словарь параметров.
def _validate_query_params(self, query: Optional[Dict]) -> Dict:
"""[HELPER] Нормализация и валидация параметров запроса для списка дашбордов.
@semantic:
- Устанавливает значения по умолчанию для `columns`, `page`, `page_size`.
- Объединяет предоставленные `query` параметры с дефолтными.
@post:
- Возвращает словарь с полными и валидными параметрами запроса.
"""
self.logger.debug("[DEBUG][SupersetClient._validate_query_params][ENTER] Validating query params.")
base_query = {
"columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"],
"page": 0,
"page_size": 1000 # Достаточно большой размер страницы для обхода пагинации
"page_size": 1000
}
# [COHERENCE_CHECK_PASSED] Параметры запроса сформированы корректно.
return {**base_query, **(query or {})}
validated_query = {**base_query, **(query or {})}
self.logger.debug(f"[DEBUG][SupersetClient._validate_query_params][SUCCESS] Validated query params: {validated_query}")
return validated_query
# END_FUNCTION__validate_query_params
# [ENTITY: Function('_fetch_total_object_count')]
# CONTRACT:
# PURPOSE: Получение общего количества объектов.
# PRECONDITIONS: `endpoint` должен быть валидным.
# POSTCONDITIONS: Возвращает общее количество объектов.
def _fetch_total_object_count(self, endpoint:str) -> int:
"""[CONTRACT][HELPER] Получение общего количества объектов (дашбордов, датасетов, чартов, баз данных) в системе.
@delegates:
- Сетевой запрос к `APIClient.fetch_paginated_count`.
@pre:
- Клиент должен быть авторизован.
@post:
- Возвращает целочисленное количество дашбордов.
@raise:
- `SupersetAPIError` или `NetworkError` при проблемах с API/сетью.
"""
query_params_for_count = {
'columns': ['id'],
'page': 0,
'page_size': 1
}
self.logger.debug("[DEBUG] Запрос общего количества дашбордов.")
try:
# [REFACTORING_COMPLETE] Использование self.network.fetch_paginated_count
count = self.network.fetch_paginated_count(
endpoint=endpoint,
query_params=query_params_for_count,
count_field="count"
)
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Получено общее количество дашбордов: {count}")
return count
except (SupersetAPIError, NetworkError, PermissionDeniedError) as e:
self.logger.error(f"[ERROR] Ошибка получения общего количества дашбордов: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise # Перевыброс ошибки
except Exception as e:
error_ctx = {"error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении общего количества: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка при получении count: {str(e)}", context=error_ctx) from e
self.logger.debug(f"[DEBUG][SupersetClient._fetch_total_object_count][ENTER] Fetching total object count for endpoint: {endpoint}")
query_params_for_count = {'page': 0, 'page_size': 1}
count = self.network.fetch_paginated_count(
endpoint=endpoint,
query_params=query_params_for_count,
count_field="count"
)
self.logger.debug(f"[DEBUG][SupersetClient._fetch_total_object_count][SUCCESS] Fetched total object count: {count}")
return count
# END_FUNCTION__fetch_total_object_count
def _fetch_all_pages(self, endpoint:str, query: Dict, total_count: int) -> List[Dict]:
"""[CONTRACT][HELPER] Обход всех страниц пагинированного API для получения всех данных.
@delegates:
- Сетевые запросы к `APIClient.fetch_paginated_data()`.
@pre:
- `query` должен содержать `page_size`.
- `total_count` должен быть корректным общим количеством элементов.
- `endpoint` должен содержать часть url запроса, например endpoint="/dashboard/".
@post:
- Возвращает список всех элементов, собранных со всех страниц.
@raise:
- `SupersetAPIError` или `NetworkError` при проблемах с API/сетью.
- `ValueError` при некорректных параметрах пагинации.
"""
self.logger.debug(f"[DEBUG] Запуск обхода пагинации. Всего элементов: {total_count}, query: {query}")
try:
if 'page_size' not in query or not query['page_size']:
self.logger.error("[CONTRACT_VIOLATION] Параметр 'page_size' отсутствует или неверен в query.")
raise ValueError("Отсутствует 'page_size' в query параметрах для пагинации")
# [REFACTORING_COMPLETE] Использование self.network.fetch_paginated_data
all_data = self.network.fetch_paginated_data(
endpoint=endpoint,
base_query=query,
total_count=total_count,
results_field="result"
)
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Успешно получено {len(all_data)} элементов со всех страниц.")
return all_data
except (SupersetAPIError, NetworkError, ValueError, PermissionDeniedError) as e:
self.logger.error(f"[ERROR] Ошибка при обходе пагинации: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except Exception as e:
error_ctx = {"query": query, "total_count": total_count, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при обходе пагинации: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка пагинации: {str(e)}", context=error_ctx) from e
# [ENTITY: Function('_fetch_all_pages')]
# CONTRACT:
# PURPOSE: Обход всех страниц пагинированного API.
# PRECONDITIONS: `pagination_options` должен содержать необходимые параметры.
# POSTCONDITIONS: Возвращает список всех объектов.
def _fetch_all_pages(self, endpoint:str, pagination_options: Dict) -> List[Dict]:
self.logger.debug(f"[DEBUG][SupersetClient._fetch_all_pages][ENTER] Fetching all pages for endpoint: {endpoint}")
all_data = self.network.fetch_paginated_data(
endpoint=endpoint,
pagination_options=pagination_options
)
self.logger.debug(f"[DEBUG][SupersetClient._fetch_all_pages][SUCCESS] Fetched all pages for endpoint: {endpoint}")
return all_data
# END_FUNCTION__fetch_all_pages
# [ENTITY: Function('_validate_import_file')]
# CONTRACT:
# PURPOSE: Проверка файла перед импортом.
# PRECONDITIONS: `zip_path` должен быть путем к файлу.
# POSTCONDITIONS: Файл валиден.
def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
"""[HELPER] Проверка файла перед импортом.
@semantic:
- Проверяет существование файла.
- Проверяет, что файл является валидным ZIP-архивом.
- Проверяет, что ZIP-архив содержит `metadata.yaml` (ключевой для экспорта Superset).
@raise:
- `FileNotFoundError`: Если файл не существует.
- `InvalidZipFormatError`: Если файл не ZIP или не содержит `metadata.yaml`.
"""
self.logger.debug(f"[DEBUG][SupersetClient._validate_import_file][ENTER] Validating import file: {zip_path}")
path = Path(zip_path)
self.logger.debug(f"[DEBUG] Валидация файла для импорта: {path}")
if not path.exists():
self.logger.error(
"[CONTRACT_VIOLATION] Файл для импорта не найден.",
extra={"file_path": str(path)}
)
self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file does not exist: {zip_path}")
raise FileNotFoundError(f"Файл {zip_path} не существует")
if not zipfile.is_zipfile(path):
self.logger.error(
"[CONTRACT_VIOLATION] Файл не является валидным ZIP-архивом.",
extra={"file_path": str(path)}
)
self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file is not a zip file: {zip_path}")
raise InvalidZipFormatError(f"Файл {zip_path} не является ZIP-архивом")
with zipfile.ZipFile(path, 'r') as zf:
if not any(n.endswith('metadata.yaml') for n in zf.namelist()):
self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file does not contain metadata.yaml: {zip_path}")
raise InvalidZipFormatError(f"Архив {zip_path} не содержит 'metadata.yaml'")
self.logger.debug(f"[DEBUG][SupersetClient._validate_import_file][SUCCESS] Validated import file: {zip_path}")
# END_FUNCTION__validate_import_file
try:
with zipfile.ZipFile(path, 'r') as zf:
# [CONTRACT] Проверяем наличие metadata.yaml
if not any(n.endswith('metadata.yaml') for n in zf.namelist()):
self.logger.error(
"[CONTRACT_VIOLATION] ZIP-архив не содержит 'metadata.yaml'.",
extra={"file_path": str(path), "zip_contents": zf.namelist()[:5]} # Логируем первые 5 файлов для отладки
)
raise InvalidZipFormatError(f"Архив {zip_path} не содержит 'metadata.yaml', не является корректным экспортом Superset.")
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Файл '{path}' успешно прошел валидацию для импорта.")
except zipfile.BadZipFile as e:
self.logger.error(
f"[CONTRACT_VIOLATION] Ошибка чтения ZIP-файла: {str(e)}",
exc_info=True, extra={"file_path": str(path)}
)
raise InvalidZipFormatError(f"Файл {zip_path} поврежден или имеет некорректный формат ZIP.") from e
except Exception as e:
self.logger.critical(
f"[CRITICAL] Непредвиденная ошибка при валидации ZIP-файла: {str(e)}",
exc_info=True, extra={"file_path": str(path)}
)
raise SupersetAPIError(f"Непредвиденная ошибка валидации ZIP: {str(e)}", context={"file_path": str(path)}) from e

View File

@@ -1,153 +1,124 @@
# [MODULE] Иерархия исключений
# @contract: Все ошибки наследуют `SupersetToolError` для единой точки обработки.
# @semantic: Каждый тип исключения соответствует конкретной проблемной области в инструменте Superset.
# @coherence:
# - Полное покрытие всех сценариев ошибок клиента и утилит.
# - Четкая классификация по уровню серьезности (от общей до специфичной).
# - Дополнительный `context` для каждой ошибки, помогающий в диагностике.
# pylint: disable=too-many-ancestors
"""
[MODULE] Иерархия исключений
@contract: Все ошибки наследуют `SupersetToolError` для единой точки обработки.
"""
# [IMPORTS] Standard library
from pathlib import Path
# [IMPORTS] Typing
from typing import Optional, Dict, Any,Union
from typing import Optional, Dict, Any, Union
class SupersetToolError(Exception):
"""[BASE] Базовый класс для всех ошибок инструмента Superset.
@semantic: Обеспечивает стандартизированный формат сообщений об ошибках с контекстом.
@invariant:
- `message` всегда присутствует.
- `context` всегда является словарем, даже если пустой.
"""
"""[BASE] Базовый класс для всех ошибок инструмента Superset."""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация базового исключения.
# PRECONDITIONS: `context` должен быть словарем или None.
# POSTCONDITIONS: Исключение создано с сообщением и контекстом.
def __init__(self, message: str, context: Optional[Dict[str, Any]] = None):
# [PRECONDITION] Проверка типа контекста
if not isinstance(context, (dict, type(None))):
# [COHERENCE_CHECK_FAILED] Ошибка в передаче контекста
raise TypeError("Контекст ошибки должен быть словарем или None")
self.context = context or {}
super().__init__(f"{message} | Context: {self.context}")
# [POSTCONDITION] Логирование создания ошибки
# Можно добавить здесь логирование, но обычно ошибки логируются в месте их перехвата/подъема,
# чтобы избежать дублирования и получить полный стек вызовов.
# END_FUNCTION___init__
# [ERROR-GROUP] Проблемы аутентификации и авторизации
class AuthenticationError(SupersetToolError):
"""[AUTH] Ошибки аутентификации (неверные учетные данные) или авторизации (проблемы с сессией).
@context: url, username, error_detail (опционально).
"""
# [CONTRACT]
# Description: Исключение, возникающее при ошибках аутентификации в Superset API.
"""[AUTH] Ошибки аутентификации или авторизации."""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация исключения аутентификации.
# PRECONDITIONS: None
# POSTCONDITIONS: Исключение создано.
def __init__(self, message: str = "Authentication failed", **context: Any):
super().__init__(
f"[AUTH_FAILURE] {message}",
{"type": "authentication", **context}
)
super().__init__(f"[AUTH_FAILURE] {message}", context={"type": "authentication", **context})
# END_FUNCTION___init__
class PermissionDeniedError(AuthenticationError):
"""[AUTH] Ошибка отказа в доступе из-за недостаточных прав пользователя.
@semantic: Указывает на то, что операция не разрешена.
@context: required_permission (опционально), user_roles (опционально), endpoint (опционально).
@invariant: Наследует от `AuthenticationError`, так как это разновидность проблемы доступа.
"""
"""[AUTH] Ошибка отказа в доступе."""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация исключения отказа в доступе.
# PRECONDITIONS: None
# POSTCONDITIONS: Исключение создано.
def __init__(self, message: str = "Permission denied", required_permission: Optional[str] = None, **context: Any):
full_message = f"Permission denied: {required_permission}" if required_permission else message
super().__init__(
full_message,
{"type": "authorization", "required_permission": required_permission, **context}
)
super().__init__(full_message, context={"required_permission": required_permission, **context})
# END_FUNCTION___init__
# [ERROR-GROUP] Проблемы API-вызовов
class SupersetAPIError(SupersetToolError):
"""[API] Общие ошибки взаимодействия с Superset API.
@semantic: Для ошибок, возвращаемых Superset API, или проблем с парсингом ответа.
@context: endpoint, method, status_code, response_body (опционально), error_message (из API).
"""
# [CONTRACT]
# Description: Исключение, возникающее при получении ошибки от Superset API (статус код >= 400).
"""[API] Общие ошибки взаимодействия с Superset API."""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация исключения ошибки API.
# PRECONDITIONS: None
# POSTCONDITIONS: Исключение создано.
def __init__(self, message: str = "Superset API error", **context: Any):
super().__init__(
f"[API_FAILURE] {message}",
{"type": "api_call", **context}
)
super().__init__(f"[API_FAILURE] {message}", context={"type": "api_call", **context})
# END_FUNCTION___init__
# [ERROR-SUBCLASS] Детализированные ошибки API
class ExportError(SupersetAPIError):
"""[API:EXPORT] Проблемы, специфичные для операций экспорта дашбордов.
@semantic: Может быть вызвано невалидным форматом ответа, ошибками Superset при экспорте.
@context: dashboard_id (опционально), details (опционально).
"""
"""[API:EXPORT] Проблемы, специфичные для операций экспорта."""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация исключения ошибки экспорта.
# PRECONDITIONS: None
# POSTCONDITIONS: Исключение создано.
def __init__(self, message: str = "Dashboard export failed", **context: Any):
super().__init__(f"[EXPORT_FAILURE] {message}", {"subtype": "export", **context})
super().__init__(f"[EXPORT_FAILURE] {message}", context={"subtype": "export", **context})
# END_FUNCTION___init__
class DashboardNotFoundError(SupersetAPIError):
"""[API:404] Запрошенный дашборд или ресурс не существует.
@semantic: Соответствует HTTP 404 Not Found.
@context: dashboard_id_or_slug, url.
"""
# [CONTRACT]
# Description: Исключение, специфичное для случая, когда дашборд не найден (статус 404).
"""[API:404] Запрошенный дашборд или ресурс не существует."""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация исключения "дашборд не найден".
# PRECONDITIONS: None
# POSTCONDITIONS: Исключение создано.
def __init__(self, dashboard_id_or_slug: Union[int, str], message: str = "Dashboard not found", **context: Any):
super().__init__(
f"[NOT_FOUND] Dashboard '{dashboard_id_or_slug}' {message}",
{"subtype": "not_found", "resource_id": dashboard_id_or_slug, **context}
)
super().__init__(f"[NOT_FOUND] Dashboard '{dashboard_id_or_slug}' {message}", context={"subtype": "not_found", "resource_id": dashboard_id_or_slug, **context})
# END_FUNCTION___init__
class DatasetNotFoundError(SupersetAPIError):
"""[API:404] Запрашиваемый набор данных не существует.
@semantic: Соответствует HTTP 404 Not Found.
@context: dataset_id_or_slug, url.
"""
# [CONTRACT]
# Description: Исключение, специфичное для случая, когда набор данных не найден (статус 404).
"""[API:404] Запрашиваемый набор данных не существует."""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация исключения "набор данных не найден".
# PRECONDITIONS: None
# POSTCONDITIONS: Исключение создано.
def __init__(self, dataset_id_or_slug: Union[int, str], message: str = "Dataset not found", **context: Any):
super().__init__(
f"[NOT_FOUND] Dataset '{dataset_id_or_slug}' {message}",
{"subtype": "not_found", "resource_id": dataset_id_or_slug, **context}
)
super().__init__(f"[NOT_FOUND] Dataset '{dataset_id_or_slug}' {message}", context={"subtype": "not_found", "resource_id": dataset_id_or_slug, **context})
# END_FUNCTION___init__
# [ERROR-SUBCLASS] Детализированные ошибки обработки файлов
class InvalidZipFormatError(SupersetToolError):
"""[FILE:ZIP] Некорректный формат ZIP-архива или содержимого для импорта/экспорта.
@semantic: Указывает на проблемы с целостностью или структурой ZIP-файла.
@context: file_path, expected_content (например, metadata.yaml), error_detail.
"""
"""[FILE:ZIP] Некорректный формат ZIP-архива."""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация исключения некорректного формата ZIP.
# PRECONDITIONS: None
# POSTCONDITIONS: Исключение создано.
def __init__(self, message: str = "Invalid ZIP format or content", file_path: Optional[Union[str, Path]] = None, **context: Any):
super().__init__(
f"[FILE_ERROR] {message}",
{"type": "file_validation", "file_path": str(file_path) if file_path else "N/A", **context}
)
super().__init__(f"[FILE_ERROR] {message}", context={"type": "file_validation", "file_path": str(file_path) if file_path else "N/A", **context})
# END_FUNCTION___init__
# [ERROR-GROUP] Системные и network-ошибки
class NetworkError(SupersetToolError):
"""[NETWORK] Проблемы соединения, таймауты, DNS-ошибки и т.п.
@semantic: Ошибки, связанные с невозможностью установить или поддерживать сетевое соединение.
@context: url, original_exception (опционально), timeout (опционально).
"""
# [CONTRACT]
# Description: Исключение, возникающее при сетевых ошибках во время взаимодействия с Superset API.
"""[NETWORK] Проблемы соединения."""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация исключения сетевой ошибки.
# PRECONDITIONS: None
# POSTCONDITIONS: Исключение создано.
def __init__(self, message: str = "Network connection failed", **context: Any):
super().__init__(
f"[NETWORK_FAILURE] {message}",
{"type": "network", **context}
)
super().__init__(f"[NETWORK_FAILURE] {message}", context={"type": "network", **context})
# END_FUNCTION___init__
class FileOperationError(SupersetToolError):
"""
# [CONTRACT]
# Description: Исключение, возникающее при ошибках файловых операций (чтение, запись, архивирование).
"""
pass
"""[FILE] Ошибка файловых операций."""
class InvalidFileStructureError(FileOperationError):
"""
# [CONTRACT]
# Description: Исключение, возникающее при обнаружении некорректной структуры файлов/директорий.
"""
pass
"""[FILE] Некорректная структура файлов/директорий."""
class ConfigurationError(SupersetToolError):
"""
# [CONTRACT]
# Description: Исключение, возникающее при ошибках в конфигурации инструмента.
"""
pass
"""[CONFIG] Ошибка в конфигурации инструмента."""

View File

@@ -1,28 +1,19 @@
# [MODULE] Сущности данных конфигурации
# @desc: Определяет структуры данных, используемые для конфигурации и трансформации в инструменте Superset.
# @contracts:
# - Все модели наследуются от `pydantic.BaseModel` для автоматической валидации.
# - Валидация URL-адресов и параметров аутентификации.
# - Валидация структуры конфигурации БД для миграций.
# @coherence:
# - Все модели согласованы со схемой API Superset v1.
# - Совместимы с клиентскими методами `SupersetClient` и утилитами.
# pylint: disable=no-self-argument,too-few-public-methods
"""
[MODULE] Сущности данных конфигурации
@desc: Определяет структуры данных, используемые для конфигурации и трансформации в инструменте Superset.
"""
# [IMPORTS] Pydantic и Typing
from typing import Optional, Dict, Any, Union
from pydantic import BaseModel, validator, Field, HttpUrl
# [COHERENCE_CHECK_PASSED] Все необходимые импорты для Pydantic моделей.
from typing import Optional, Dict, Any
from pydantic import BaseModel, validator, Field, HttpUrl, VERSION
# [IMPORTS] Локальные модули
from .utils.logger import SupersetLogger
class SupersetConfig(BaseModel):
"""[CONFIG] Конфигурация подключения к Superset API.
@semantic: Инкапсулирует основные параметры, необходимые для инициализации `SupersetClient`.
@invariant:
- `base_url` должен быть валидным HTTP(S) URL и содержать `/api/v1`.
- `auth` должен содержать обязательные поля для аутентификации по логину/паролю.
- `timeout` должен быть положительным числом.
"""
[CONFIG] Конфигурация подключения к Superset API.
"""
base_url: str = Field(..., description="Базовый URL Superset API, включая версию /api/v1.", pattern=r'.*/api/v1.*')
auth: Dict[str, str] = Field(..., description="Словарь с данными для аутентификации (provider, username, password, refresh).")
@@ -30,118 +21,69 @@ class SupersetConfig(BaseModel):
timeout: int = Field(30, description="Таймаут в секундах для HTTP-запросов.")
logger: Optional[SupersetLogger] = Field(None, description="Экземпляр логгера для логирования внутри клиента.")
# [VALIDATOR] Проверка параметров аутентификации
# [ENTITY: Function('validate_auth')]
# CONTRACT:
# PURPOSE: Валидация словаря `auth`.
# PRECONDITIONS: `v` должен быть словарем.
# POSTCONDITIONS: Возвращает `v` если все обязательные поля присутствуют.
@validator('auth')
def validate_auth(cls, v: Dict[str, str]) -> Dict[str, str]:
"""[CONTRACT_VALIDATOR] Валидация словаря `auth`.
@pre:
- `v` должен быть словарем.
@post:
- Возвращает `v` если все обязательные поля присутствуют.
@raise:
- `ValueError`: Если отсутствуют обязательные поля ('provider', 'username', 'password', 'refresh').
"""
def validate_auth(cls, v: Dict[str, str], values: dict) -> Dict[str, str]:
logger = values.get('logger') or SupersetLogger(name="SupersetConfig")
logger.debug("[DEBUG][SupersetConfig.validate_auth][ENTER] Validating auth.")
required = {'provider', 'username', 'password', 'refresh'}
if not required.issubset(v.keys()):
raise ValueError(
f"[CONTRACT_VIOLATION] Словарь 'auth' должен содержать поля: {required}. "
f"Отсутствующие: {required - v.keys()}"
)
# [COHERENCE_CHECK_PASSED] Auth-конфигурация валидна.
logger.error("[ERROR][SupersetConfig.validate_auth][FAILURE] Missing required auth fields.")
raise ValueError(f"Словарь 'auth' должен содержать поля: {required}. Отсутствующие: {required - v.keys()}")
logger.debug("[DEBUG][SupersetConfig.validate_auth][SUCCESS] Auth validated.")
return v
# [VALIDATOR] Проверка base_url
# END_FUNCTION_validate_auth
# [ENTITY: Function('check_base_url_format')]
# CONTRACT:
# PURPOSE: Валидация формата `base_url`.
# PRECONDITIONS: `v` должна быть строкой.
# POSTCONDITIONS: Возвращает `v` если это валидный URL.
@validator('base_url')
def check_base_url_format(cls, v: str) -> str:
"""[CONTRACT_VALIDATOR] Валидация формата `base_url`.
@pre:
- `v` должна быть строкой.
@post:
- Возвращает `v` если это валидный URL.
@raise:
- `ValueError`: Если URL невалиден.
"""
def check_base_url_format(cls, v: str, values: dict) -> str:
logger = values.get('logger') or SupersetLogger(name="SupersetConfig")
logger.debug("[DEBUG][SupersetConfig.check_base_url_format][ENTER] Validating base_url.")
try:
# Для Pydantic v2:
from pydantic import HttpUrl
HttpUrl(v, scheme="https") # Явное указание схемы
except ValueError:
# Для совместимости с Pydantic v1:
HttpUrl(v)
if VERSION.startswith('1'):
HttpUrl(v)
except (ValueError, TypeError) as exc:
logger.error("[ERROR][SupersetConfig.check_base_url_format][FAILURE] Invalid base_url format.")
raise ValueError(f"Invalid URL format: {v}") from exc
logger.debug("[DEBUG][SupersetConfig.check_base_url_format][SUCCESS] base_url validated.")
return v
# END_FUNCTION_check_base_url_format
class Config:
arbitrary_types_allowed = True # Разрешаем Pydantic обрабатывать произвольные типы (например, SupersetLogger)
json_schema_extra = {
"example": {
"base_url": "https://host/api/v1/",
"auth": {
"provider": "db",
"username": "user",
"password": "pass",
"refresh": True
},
"verify_ssl": True,
"timeout": 60
}
}
"""Pydantic config"""
arbitrary_types_allowed = True
# [SEMANTIC-TYPE] Конфигурация БД для миграций
class DatabaseConfig(BaseModel):
"""[CONFIG] Параметры трансформации баз данных при миграции дашбордов.
@semantic: Содержит `old` и `new` состояния конфигурации базы данных,
используемые для поиска и замены в YAML-файлах экспортированных дашбордов.
@invariant:
- `database_config` должен быть словарем с ключами 'old' и 'new'.
- Каждое из 'old' и 'new' должно быть словарем, содержащим метаданные БД Superset.
"""
[CONFIG] Параметры трансформации баз данных при миграции дашбордов.
"""
database_config: Dict[str, Dict[str, Any]] = Field(..., description="Словарь, содержащий 'old' и 'new' конфигурации базы данных.")
logger: Optional[SupersetLogger] = Field(None, description="Экземпляр логгера для логирования.")
# [ENTITY: Function('validate_config')]
# CONTRACT:
# PURPOSE: Валидация словаря `database_config`.
# PRECONDITIONS: `v` должен быть словарем.
# POSTCONDITIONS: Возвращает `v` если содержит ключи 'old' и 'new'.
@validator('database_config')
def validate_config(cls, v: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""[CONTRACT_VALIDATOR] Валидация словаря `database_config`.
@pre:
- `v` должен быть словарем.
@post:
- Возвращает `v` если содержит ключи 'old' и 'new'.
@raise:
- `ValueError`: Если отсутствуют ключи 'old' или 'new'.
"""
def validate_config(cls, v: Dict[str, Dict[str, Any]], values: dict) -> Dict[str, Dict[str, Any]]:
logger = values.get('logger') or SupersetLogger(name="DatabaseConfig")
logger.debug("[DEBUG][DatabaseConfig.validate_config][ENTER] Validating database_config.")
if not {'old', 'new'}.issubset(v.keys()):
raise ValueError(
"[CONTRACT_VIOLATION] 'database_config' должен содержать ключи 'old' и 'new'."
)
# Дополнительно можно добавить проверку структуры `old` и `new` на наличие `uuid`, `database_name` и т.д.
# Для простоты пока ограничимся наличием ключей 'old' и 'new'.
# [COHERENCE_CHECK_PASSED] Конфигурация базы данных для миграции валидна.
logger.error("[ERROR][DatabaseConfig.validate_config][FAILURE] Missing 'old' or 'new' keys in database_config.")
raise ValueError("'database_config' должен содержать ключи 'old' и 'new'.")
logger.debug("[DEBUG][DatabaseConfig.validate_config][SUCCESS] database_config validated.")
return v
# END_FUNCTION_validate_config
class Config:
"""Pydantic config"""
arbitrary_types_allowed = True
json_schema_extra = {
"example": {
"database_config": {
"old":
{
"database_name": "Prod Clickhouse",
"sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm",
"uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9",
"database_uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9",
"allow_ctas": "false",
"allow_cvas": "false",
"allow_dml": "false"
},
"new": {
"database_name": "Dev Clickhouse",
"sqlalchemy_uri": "clickhousedb+connect://dwhuser:XXXXXXXXXX@10.66.229.179:8123/dm",
"uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2",
"database_uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2",
"allow_ctas": "true",
"allow_cvas": "true",
"allow_dml": "true"
}
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,100 +1,71 @@
# [MODULE] Superset Init clients
# @contract: Автоматизирует процесс инициализации клиентов для использования скриптами.
# @semantic_layers:
# 1. Инициализация логгера и клиентов Superset.
# @coherence:
# - Использует `SupersetClient` для взаимодействия с API Superset.
# - Использует `SupersetLogger` для централизованного логирования.
# - Интегрируется с `keyring` для безопасного хранения паролей.
# [IMPORTS] Стандартная библиотека
import logging
from datetime import datetime
from pathlib import Path
# [MODULE] Superset Clients Initializer
# PURPOSE: Централизованно инициализирует клиенты Superset для различных окружений (DEV, PROD, SBX, PREPROD).
# COHERENCE:
# - Использует `SupersetClient` для создания экземпляров клиентов.
# - Использует `SupersetLogger` для логирования процесса.
# - Интегрируется с `keyring` для безопасного получения паролей.
# [IMPORTS] Сторонние библиотеки
import keyring
from typing import Dict
# [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig
from superset_tool.client import SupersetClient
from superset_tool.utils.logger import SupersetLogger
# [FUNCTION] setup_clients
# @contract: Инициализирует и возвращает SupersetClient для каждого заданного окружения.
# @pre:
# - `keyring` должен содержать необходимые пароли для "dev migrate", "prod migrate", "sandbox migrate".
# - `logger` должен быть инициализирован.
# @post:
# - Возвращает словарь {env_name: SupersetClient_instance}.
# - Логирует успешную инициализацию или ошибку.
# @raise:
# - `Exception`: При любой ошибке в процессе инициализации клиентов (например, отсутствие пароля в keyring, проблемы с сетью при первой аутентификации).
def setup_clients(logger: SupersetLogger):
"""Инициализация клиентов для разных окружений"""
# CONTRACT:
# PURPOSE: Инициализирует и возвращает словарь клиентов `SupersetClient` для всех предопределенных окружений.
# PRECONDITIONS:
# - `keyring` должен содержать пароли для систем "dev migrate", "prod migrate", "sandbox migrate", "preprod migrate".
# - `logger` должен быть инициализированным экземпляром `SupersetLogger`.
# POSTCONDITIONS:
# - Возвращает словарь, где ключи - это имена окружений ('dev', 'sbx', 'prod', 'preprod'),
# а значения - соответствующие экземпляры `SupersetClient`.
# PARAMETERS:
# - logger: SupersetLogger - Экземпляр логгера для записи процесса инициализации.
# RETURN: Dict[str, SupersetClient] - Словарь с инициализированными клиентами.
# EXCEPTIONS:
# - Логирует и выбрасывает `Exception` при любой ошибке (например, отсутствие пароля, ошибка подключения).
def setup_clients(logger: SupersetLogger) -> Dict[str, SupersetClient]:
"""Инициализирует и настраивает клиенты для всех окружений Superset."""
# [ANCHOR] CLIENTS_INITIALIZATION
logger.info("[INFO][INIT_CLIENTS_START] Запуск инициализации клиентов Superset.")
clients = {}
environments = {
"dev": "https://devta.bi.dwh.rusal.com/api/v1",
"prod": "https://prodta.bi.dwh.rusal.com/api/v1",
"sbx": "https://sandboxta.bi.dwh.rusal.com/api/v1",
"preprod": "https://preprodta.bi.dwh.rusal.com/api/v1"
}
try:
# [INFO] Инициализация конфигурации для Dev
dev_config = SupersetConfig(
base_url="https://devta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "dev migrate"),
"refresh": True
},
verify_ssl=False
)
# [DEBUG] Dev config created: {dev_config.base_url}
for env_name, base_url in environments.items():
logger.debug(f"[DEBUG][CONFIG_CREATE] Создание конфигурации для окружения: {env_name.upper()}")
password = keyring.get_password("system", f"{env_name} migrate")
if not password:
raise ValueError(f"Пароль для '{env_name} migrate' не найден в keyring.")
# [INFO] Инициализация конфигурации для Prod
prod_config = SupersetConfig(
base_url="https://prodta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "prod migrate"),
"refresh": True
},
verify_ssl=False
)
# [DEBUG] Prod config created: {prod_config.base_url}
config = SupersetConfig(
base_url=base_url,
auth={
"provider": "db",
"username": "migrate_user",
"password": password,
"refresh": True
},
verify_ssl=False
)
clients[env_name] = SupersetClient(config, logger)
logger.debug(f"[DEBUG][CLIENT_SUCCESS] Клиент для {env_name.upper()} успешно создан.")
# [INFO] Инициализация конфигурации для Sandbox
sandbox_config = SupersetConfig(
base_url="https://sandboxta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "sandbox migrate"),
"refresh": True
},
verify_ssl=False
)
# [DEBUG] Sandbox config created: {sandbox_config.base_url}
# [INFO] Инициализация конфигурации для Preprod
preprod_config = SupersetConfig(
base_url="https://preprodta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "preprod migrate"),
"refresh": True
},
verify_ssl=False
)
# [DEBUG] Sandbox config created: {sandbox_config.base_url}
# [INFO] Создание экземпляров SupersetClient
clients['dev'] = SupersetClient(dev_config, logger)
clients['sbx'] = SupersetClient(sandbox_config,logger)
clients['prod'] = SupersetClient(prod_config,logger)
clients['preprod'] = SupersetClient(preprod_config,logger)
logger.info("[COHERENCE_CHECK_PASSED] Клиенты для окружений успешно инициализированы", extra={"envs": list(clients.keys())})
logger.info(f"[COHERENCE_CHECK_PASSED][INIT_CLIENTS_SUCCESS] Все клиенты ({', '.join(clients.keys())}) успешно инициализированы.")
return clients
except Exception as e:
logger.error(f"[ERROR] Ошибка инициализации клиентов: {str(e)}", exc_info=True)
raise
logger.error(f"[CRITICAL][INIT_CLIENTS_FAILED] Ошибка при инициализации клиентов: {str(e)}", exc_info=True)
raise
# END_FUNCTION_setup_clients
# END_MODULE_init_clients

View File

@@ -1,9 +1,6 @@
# [MODULE] Superset Tool Logger Utility
# @contract: Этот модуль предоставляет утилиту для настройки логирования в приложении.
# @semantic_layers:
# - [CONFIG]: Настройка логгера.
# - [UTILITY]: Вспомогательные функции.
# @coherence: Модуль должен быть семантически когерентен со стандартной библиотекой `logging`.
# PURPOSE: Предоставляет стандартизированный класс-обертку `SupersetLogger` для настройки и использования логирования в проекте.
# COHERENCE: Модуль согласован со стандартной библиотекой `logging`, расширяя ее для нужд проекта.
import logging
import sys
@@ -11,8 +8,20 @@ from datetime import datetime
from pathlib import Path
from typing import Optional
# [CONSTANTS]
# CONTRACT:
# PURPOSE: Обеспечивает унифицированную настройку логгера с выводом в консоль и/или файл.
# PRECONDITIONS:
# - `name` должен быть строкой.
# - `level` должен быть валидным уровнем логирования (например, `logging.INFO`).
# POSTCONDITIONS:
# - Создает и настраивает логгер с указанным именем и уровнем.
# - Добавляет обработчики для вывода в файл (если указан `log_dir`) и в консоль (если `console=True`).
# - Очищает все предыдущие обработчики для данного логгера, чтобы избежать дублирования.
# PARAMETERS:
# - name: str - Имя логгера.
# - log_dir: Optional[Path] - Директория для сохранения лог-файлов.
# - level: int - Уровень логирования.
# - console: bool - Флаг для включения вывода в консоль.
class SupersetLogger:
def __init__(
self,
@@ -23,34 +32,40 @@ class SupersetLogger:
):
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
# Очищаем существующие обработчики
if self.logger.handlers:
for handler in self.logger.handlers[:]:
self.logger.removeHandler(handler)
# [ANCHOR] HANDLER_RESET
# Очищаем существующие обработчики, чтобы избежать дублирования вывода при повторной инициализации.
if self.logger.hasHandlers():
self.logger.handlers.clear()
# Файловый обработчик
# [ANCHOR] FILE_HANDLER
if log_dir:
log_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d")
file_handler = logging.FileHandler(
log_dir / f"{name}_{self._get_timestamp()}.log"
log_dir / f"{name}_{timestamp}.log", encoding='utf-8'
)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
# Консольный обработчик
# [ANCHOR] CONSOLE_HANDLER
if console:
console_handler = logging.StreamHandler()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# CONTRACT:
# PURPOSE: (HELPER) Генерирует строку с текущей датой для имени лог-файла.
# RETURN: str - Отформатированная дата (YYYYMMDD).
def _get_timestamp(self) -> str:
return datetime.now().strftime("%Y%m%d")
# END_FUNCTION__get_timestamp
# [INTERFACE] Методы логирования
def info(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.info(message, extra=extra, exc_info=exc_info)
@@ -59,47 +74,15 @@ class SupersetLogger:
def warning(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.warning(message, extra=extra, exc_info=exc_info)
def critical(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.critical(message, extra=extra, exc_info=exc_info)
def debug(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.debug(message, extra=extra, exc_info=exc_info)
def exception(self, message: str):
self.logger.exception(message)
def exception(self, message: str, *args, **kwargs):
self.logger.exception(message, *args, **kwargs)
# END_CLASS_SupersetLogger
def setup_logger(name: str, level: int = logging.INFO) -> logging.Logger:
# [FUNCTION] setup_logger
# [CONTRACT]
"""
Настраивает и возвращает логгер с заданным именем и уровнем.
@pre:
- `name` является непустой строкой.
- `level` является допустимым уровнем логирования из модуля `logging`.
@post:
- Возвращает настроенный экземпляр `logging.Logger`.
- Логгер имеет StreamHandler, выводящий в sys.stdout.
- Форматтер логгера включает время, уровень, имя и сообщение.
@side_effects:
- Создает и добавляет StreamHandler к логгеру.
@invariant:
- Логгер с тем же именем всегда возвращает один и тот же экземпляр.
"""
# [CONFIG] Настройка логгера
# [COHERENCE_CHECK_PASSED] Логика настройки соответствует описанию.
logger = logging.getLogger(name)
logger.setLevel(level)
# Создание форматтера
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(message)s')
# Проверка наличия существующих обработчиков
if not logger.handlers:
# Создание StreamHandler для вывода в sys.stdout
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
# END_MODULE_logger

View File

@@ -1,15 +1,11 @@
# [MODULE] Сетевой клиент для API
# @contract: Инкапсулирует низкоуровневую HTTP-логику, аутентификацию, повторные попытки и обработку сетевых ошибок.
# @semantic_layers:
# 1. Инициализация сессии `requests` с настройками SSL и таймаутов.
# 2. Управление аутентификацией (получение и обновление access/CSRF токенов).
# 3. Выполнение HTTP-запросов (GET, POST и т.д.) с автоматическими заголовками.
# 4. Обработка пагинации для API-ответов.
# 5. Обработка загрузки файлов.
# @coherence:
# - Полностью независим от `SupersetClient`, предоставляя ему чистый API для сетевых операций.
# - Использует `SupersetLogger` для внутреннего логирования.
# - Всегда выбрасывает типизированные исключения из `superset_tool.exceptions`.
# -*- coding: utf-8 -*-
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument
"""
[MODULE] Сетевой клиент для API
[DESCRIPTION]
Инкапсулирует низкоуровневую HTTP-логику для взаимодействия с Superset API.
"""
# [IMPORTS] Стандартная библиотека
from typing import Optional, Dict, Any, BinaryIO, List, Union
@@ -19,173 +15,106 @@ from pathlib import Path
# [IMPORTS] Сторонние библиотеки
import requests
import urllib3 # Для отключения SSL-предупреждений
import urllib3 # Для отключения SSL-предупреждений
# [IMPORTS] Локальные модули
from ..exceptions import AuthenticationError, NetworkError, DashboardNotFoundError, SupersetAPIError, PermissionDeniedError
from .logger import SupersetLogger # Импорт логгера
from superset_tool.exceptions import (
AuthenticationError,
NetworkError,
DashboardNotFoundError,
SupersetAPIError,
PermissionDeniedError
)
from superset_tool.utils.logger import SupersetLogger # Импорт логгера
# [CONSTANTS]
DEFAULT_RETRIES = 3
DEFAULT_BACKOFF_FACTOR = 0.5
DEFAULT_TIMEOUT = 30
class APIClient:
"""[NETWORK-CORE] Инкапсулирует HTTP-логику для работы с API.
@contract:
- Гарантирует retry-механизмы для запросов.
- Выполняет SSL-валидацию или отключает ее по конфигурации.
- Автоматически управляет access и CSRF токенами.
- Преобразует HTTP-ошибки в типизированные исключения `superset_tool.exceptions`.
@pre:
- `base_url` должен быть валидным URL.
- `auth` должен содержать необходимые данные для аутентификации.
- `logger` должен быть инициализирован.
@post:
- Аутентификация выполняется при первом запросе или явно через `authenticate()`.
- `self._tokens` всегда содержит актуальные access/CSRF токены после успешной аутентификации.
@invariant:
- Сессия `requests` активна и настроена.
- Все запросы используют актуальные токены.
"""
"""[NETWORK-CORE] Инкапсулирует HTTP-логику для работы с API."""
def __init__(
self,
base_url: str,
auth: Dict[str, Any],
config: Dict[str, Any],
verify_ssl: bool = True,
timeout: int = 30,
timeout: int = DEFAULT_TIMEOUT,
logger: Optional[SupersetLogger] = None
):
# [INIT] Основные параметры
self.base_url = base_url
self.auth = auth
self.verify_ssl = verify_ssl
self.timeout = timeout
self.logger = logger or SupersetLogger(name="APIClient") # [COHERENCE_CHECK_PASSED] Инициализация логгера
# [INIT] Сессия Requests
self.logger = logger or SupersetLogger(name="APIClient")
self.logger.info("[INFO][APIClient.__init__][ENTER] Initializing APIClient.")
self.base_url = config.get("base_url")
self.auth = config.get("auth")
self.request_settings = {
"verify_ssl": verify_ssl,
"timeout": timeout
}
self.session = self._init_session()
self._tokens: Dict[str, str] = {} # [STATE] Хранилище токенов
self._authenticated = False # [STATE] Флаг аутентификации
self.logger.debug(
"[INIT] APIClient инициализирован.",
extra={"base_url": self.base_url, "verify_ssl": self.verify_ssl}
)
self._tokens: Dict[str, str] = {}
self._authenticated = False
self.logger.info("[INFO][APIClient.__init__][SUCCESS] APIClient initialized.")
def _init_session(self) -> requests.Session:
"""[HELPER] Настройка сессии `requests` с адаптерами и SSL-опциями.
@semantic: Создает и конфигурирует объект `requests.Session`.
"""
self.logger.debug("[DEBUG][APIClient._init_session][ENTER] Initializing session.")
session = requests.Session()
# [CONTRACT] Настройка повторных попыток
retries = requests.adapters.Retry(
total=DEFAULT_RETRIES,
backoff_factor=DEFAULT_BACKOFF_FACTOR,
status_forcelist=[500, 502, 503, 504],
allowed_methods={"HEAD", "GET", "POST", "PUT", "DELETE"}
)
session.mount('http://', requests.adapters.HTTPAdapter(max_retries=retries))
session.mount('https://', requests.adapters.HTTPAdapter(max_retries=retries))
session.verify = self.verify_ssl
if not self.verify_ssl:
adapter = requests.adapters.HTTPAdapter(max_retries=retries)
session.mount('http://', adapter)
session.mount('https://', adapter)
verify_ssl = self.request_settings.get("verify_ssl", True)
session.verify = verify_ssl
if not verify_ssl:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.logger.warning("[SECURITY] Отключена проверка SSL-сертификатов. Не использовать в продакшене без явной необходимости.")
self.logger.warning("[WARNING][APIClient._init_session][STATE_CHANGE] SSL verification disabled.")
self.logger.debug("[DEBUG][APIClient._init_session][SUCCESS] Session initialized.")
return session
def authenticate(self) -> Dict[str, str]:
"""[AUTH-FLOW] Получение access и CSRF токенов.
@pre:
- `self.auth` содержит валидные учетные данные.
@post:
- `self._tokens` обновлен актуальными токенами.
- Возвращает обновленные токены.
- `self._authenticated` устанавливается в `True`.
@raise:
- `AuthenticationError`: При ошибках аутентификации (неверные credentials, проблемы с API security).
- `NetworkError`: При проблемах с сетью.
"""
self.logger.info(f"[AUTH] Попытка аутентификации для {self.base_url}")
self.logger.info(f"[INFO][APIClient.authenticate][ENTER] Authenticating to {self.base_url}")
try:
# Шаг 1: Получение access_token
login_url = f"{self.base_url}/security/login"
response = self.session.post(
login_url,
json=self.auth, # Используем self.auth, который уже имеет "provider": "db", "refresh": True
timeout=self.timeout
json=self.auth,
timeout=self.request_settings.get("timeout", DEFAULT_TIMEOUT)
)
response.raise_for_status() # Выбросит HTTPError для 4xx/5xx ответов
response.raise_for_status()
access_token = response.json()["access_token"]
self.logger.debug("[AUTH] Access token успешно получен.")
# Шаг 2: Получение CSRF токена
csrf_url = f"{self.base_url}/security/csrf_token/"
csrf_response = self.session.get(
csrf_url,
headers={"Authorization": f"Bearer {access_token}"},
timeout=self.timeout
timeout=self.request_settings.get("timeout", DEFAULT_TIMEOUT)
)
csrf_response.raise_for_status()
csrf_token = csrf_response.json()["result"]
self.logger.debug("[AUTH] CSRF token успешно получен.")
# [STATE] Сохранение токенов и обновление флага
self._tokens = {
"access_token": access_token,
"csrf_token": csrf_token
}
self._authenticated = True
self.logger.info("[COHERENCE_CHECK_PASSED] Аутентификация успешно завершена.")
self.logger.info("[INFO][APIClient.authenticate][SUCCESS] Authenticated successfully.")
return self._tokens
except requests.exceptions.HTTPError as e:
error_msg = f"HTTP Error during authentication: {e.response.status_code} - {e.response.text}"
self.logger.error(f"[AUTH_FAILED] {error_msg}", exc_info=True)
if e.response.status_code == 401: # Unauthorized
raise AuthenticationError(
f"Неверные учетные данные или истекший токен.",
url=login_url, username=self.auth.get("username"),
status_code=e.response.status_code, response_text=e.response.text
) from e
elif e.response.status_code == 403: # Forbidden
raise PermissionDeniedError(
"Недостаточно прав для аутентификации.",
url=login_url, username=self.auth.get("username"),
status_code=e.response.status_code, response_text=e.response.text
) from e
else:
raise SupersetAPIError(
f"API ошибка при аутентификации: {error_msg}",
url=login_url, status_code=e.response.status_code, response_text=e.response.text
) from e
except requests.exceptions.RequestException as e:
self.logger.error(f"[NETWORK_ERROR] Сетевая ошибка при аутентификации: {str(e)}", exc_info=True)
raise NetworkError(f"Ошибка сети при аутентификации: {str(e)}", url=login_url) from e
except KeyError as e:
self.logger.error(f"[AUTH_FAILED] Некорректный формат ответа при аутентификации: {str(e)}", exc_info=True)
raise AuthenticationError(f"Некорректный формат ответа API при аутентификации: {str(e)}") from e
except Exception as e:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка аутентификации: {str(e)}", exc_info=True)
raise AuthenticationError(f"Непредвиденная ошибка аутентификации: {str(e)}") from e
self.logger.error(f"[ERROR][APIClient.authenticate][FAILURE] Authentication failed: {e}")
raise AuthenticationError(f"Authentication failed: {e}") from e
except (requests.exceptions.RequestException, KeyError) as e:
self.logger.error(f"[ERROR][APIClient.authenticate][FAILURE] Network or parsing error: {e}")
raise NetworkError(f"Network or parsing error during authentication: {e}") from e
@property
def headers(self) -> Dict[str, str]:
"""[INTERFACE] Возвращает стандартные заголовки с текущими токенами.
@semantic: Если токены не получены, пытается выполнить аутентификацию.
@post: Всегда возвращает словарь с 'Authorization' и 'X-CSRFToken'.
@raise: `AuthenticationError` если аутентификация невозможна.
"""
if not self._authenticated:
self.authenticate() # Попытка аутентификации при первом запросе заголовков
# [CONTRACT] Проверка наличия токенов
if not self._tokens or "access_token" not in self._tokens or "csrf_token" not in self._tokens:
self.logger.error("[CONTRACT_VIOLATION] Токены отсутствуют после попытки аутентификации.", extra={"tokens": self._tokens})
raise AuthenticationError("Не удалось получить токены для заголовков.")
self.authenticate()
return {
"Authorization": f"Bearer {self._tokens['access_token']}",
"X-CSRFToken": self._tokens["csrf_token"],
"X-CSRFToken": self._tokens.get("csrf_token", ""),
"Referer": self.base_url,
"Content-Type": "application/json"
}
@@ -198,180 +127,95 @@ class APIClient:
raw_response: bool = False,
**kwargs
) -> Union[requests.Response, Dict[str, Any]]:
"""[NETWORK-CORE] Обертка для всех HTTP-запросов к Superset API.
@semantic:
- Выполняет запрос с заданными параметрами.
- Автоматически добавляет базовые заголовки (токены, CSRF).
- Обрабатывает HTTP-ошибки и преобразует их в типизированные исключения.
- В случае 401/403, пытается обновить токен и повторить запрос один раз.
@pre:
- `method` - валидный HTTP-метод ('GET', 'POST', 'PUT', 'DELETE').
- `endpoint` - валидный путь API.
@post:
- Возвращает объект `requests.Response` (если `raw_response=True`) или `dict` (JSON-ответ).
@raise:
- `AuthenticationError`, `PermissionDeniedError`, `NetworkError`, `SupersetAPIError`, `DashboardNotFoundError`.
"""
self.logger.debug(f"[DEBUG][APIClient.request][ENTER] Requesting {method} {endpoint}")
full_url = f"{self.base_url}{endpoint}"
self.logger.debug(f"[REQUEST] Выполнение запроса: {method} {full_url}", extra={"kwargs_keys": list(kwargs.keys())})
# [STATE] Заголовки для текущего запроса
_headers = self.headers.copy() # Получаем базовые заголовки с актуальными токенами
if headers: # Объединяем с переданными кастомными заголовками (переданные имеют приоритет)
_headers = self.headers.copy()
if headers:
_headers.update(headers)
retries_left = 1 # Одна попытка на обновление токена
while retries_left >= 0:
try:
response = self.session.request(
method,
full_url,
headers=_headers,
#timeout=self.timeout,
**kwargs
)
response.raise_for_status() # Проверяем статус сразу
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Запрос {method} {endpoint} успешно выполнен.")
return response if raw_response else response.json()
try:
response = self.session.request(
method,
full_url,
headers=_headers,
timeout=self.request_settings.get("timeout", DEFAULT_TIMEOUT),
**kwargs
)
response.raise_for_status()
self.logger.debug(f"[DEBUG][APIClient.request][SUCCESS] Request successful for {method} {endpoint}")
return response if raw_response else response.json()
except requests.exceptions.HTTPError as e:
self.logger.error(f"[ERROR][APIClient.request][FAILURE] HTTP error for {method} {endpoint}: {e}")
self._handle_http_error(e, endpoint, context={})
except requests.exceptions.RequestException as e:
self.logger.error(f"[ERROR][APIClient.request][FAILURE] Network error for {method} {endpoint}: {e}")
self._handle_network_error(e, full_url)
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code
error_context = {
"method": method,
"url": full_url,
"status_code": status_code,
"response_text": e.response.text
}
if status_code in [401, 403] and retries_left > 0:
self.logger.warning(f"[AUTH_REFRESH] Токен истек или недействителен ({status_code}). Попытка обновить и повторить...", extra=error_context)
try:
self.authenticate() # Попытка обновить токены
_headers = self.headers.copy() # Обновляем заголовки с новыми токенами
if headers:
_headers.update(headers)
retries_left -= 1
continue # Повторяем цикл
except AuthenticationError as auth_err:
self.logger.error("[AUTH_FAILED] Не удалось обновить токены.", exc_info=True)
raise PermissionDeniedError("Аутентификация не удалась или права отсутствуют после обновления токена.", **error_context) from auth_err
# [ERROR_MAPPING] Преобразование стандартных HTTP-ошибок в кастомные исключения
if status_code == 404:
raise DashboardNotFoundError(endpoint, context=error_context) from e
elif status_code == 403:
raise PermissionDeniedError("Доступ запрещен.", **error_context) from e
elif status_code == 401:
raise AuthenticationError("Аутентификация не удалась.", **error_context) from e
else:
raise SupersetAPIError(f"Ошибка API: {status_code} - {e.response.text}", **error_context) from e
except requests.exceptions.Timeout as e:
self.logger.error(f"[NETWORK_ERROR] Таймаут запроса: {str(e)}", exc_info=True, extra={"url": full_url})
raise NetworkError("Таймаут запроса", url=full_url) from e
except requests.exceptions.ConnectionError as e:
self.logger.error(f"[NETWORK_ERROR] Ошибка соединения: {str(e)}", exc_info=True, extra={"url": full_url})
raise NetworkError("Ошибка соединения", url=full_url) from e
except requests.exceptions.RequestException as e:
self.logger.critical(f"[CRITICAL] Неизвестная ошибка запроса: {str(e)}", exc_info=True, extra={"url": full_url})
raise NetworkError(f"Неизвестная сетевая ошибка: {str(e)}", url=full_url) from e
except json.JSONDecodeError as e:
self.logger.error(f"[API_FAILED] Ошибка парсинга JSON ответа: {str(e)}", exc_info=True, extra={"url": full_url, "response_text_sample": response.text[:200]})
raise SupersetAPIError(f"Некорректный JSON ответ: {str(e)}", url=full_url) from e
except Exception as e:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка в APIClient.request: {str(e)}", exc_info=True, extra={"url": full_url})
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", url=full_url) from e
# [COHERENCE_CHECK_FAILED] Если дошли сюда, значит, все повторные попытки провалились
self.logger.error(f"[CONTRACT_VIOLATION] Все повторные попытки для запроса {method} {endpoint} исчерпаны.")
raise SupersetAPIError(f"Все повторные попытки запроса {method} {endpoint} исчерпаны.")
def _handle_http_error(self, e, endpoint, context):
status_code = e.response.status_code
if status_code == 404:
raise DashboardNotFoundError(endpoint, context=context) from e
if status_code == 403:
raise PermissionDeniedError("Доступ запрещен.", **context) from e
if status_code == 401:
raise AuthenticationError("Аутентификация не удалась.", **context) from e
raise SupersetAPIError(f"Ошибка API: {status_code} - {e.response.text}", **context) from e
def _handle_network_error(self, e, url):
if isinstance(e, requests.exceptions.Timeout):
msg = "Таймаут запроса"
elif isinstance(e, requests.exceptions.ConnectionError):
msg = "Ошибка соединения"
else:
msg = f"Неизвестная сетевая ошибка: {e}"
raise NetworkError(msg, url=url) from e
def upload_file(
self,
endpoint: str,
file_obj: Union[str, Path, BinaryIO], # Может быть Path, str или байтовый поток
file_name: str,
form_field: str = "file",
file_info: Dict[str, Any],
extra_data: Optional[Dict] = None,
timeout: Optional[int] = None
) -> Dict:
"""[CONTRACT] Отправка файла на сервер через POST-запрос.
@pre:
- `endpoint` - валидный API endpoint для загрузки.
- `file_obj` - путь к файлу или открытый бинарный файловый объект.
- `file_name` - имя файла для отправки в форме.
@post:
- Возвращает JSON-ответ от сервера в виде словаря.
@raise:
- `FileNotFoundError`: Если `file_obj` является путем и файл не найден.
- `PermissionDeniedError`: Если недостаточно прав.
- `SupersetAPIError`, `NetworkError`.
"""
self.logger.info(f"[INFO][APIClient.upload_file][ENTER] Uploading file to {endpoint}")
full_url = f"{self.base_url}{endpoint}"
_headers = self.headers.copy()
# [IMPORTANT] Content-Type для files формируется requests, поэтому удаляем его из общих заголовков
_headers.pop('Content-Type', None)
files_payload = None
should_close_file = False
_headers.pop('Content-Type', None)
file_obj = file_info.get("file_obj")
file_name = file_info.get("file_name")
form_field = file_info.get("form_field", "file")
if isinstance(file_obj, (str, Path)):
file_path = Path(file_obj)
if not file_path.exists():
self.logger.error(f"[CONTRACT_VIOLATION] Файл для загрузки не найден: {file_path}", extra={"file_path": str(file_path)})
raise FileNotFoundError(f"Файл {file_path} не найден для загрузки.")
files_payload = {form_field: (file_name, open(file_path, 'rb'), 'application/x-zip-compressed')}
should_close_file = True
self.logger.debug(f"[UPLOAD] Загрузка файла из пути: {file_path}")
elif isinstance(file_obj, io.BytesIO): # In-memory binary file
with open(file_obj, 'rb') as file_to_upload:
files_payload = {form_field: (file_name, file_to_upload, 'application/x-zip-compressed')}
return self._perform_upload(full_url, files_payload, extra_data, _headers, timeout)
elif isinstance(file_obj, io.BytesIO):
files_payload = {form_field: (file_name, file_obj.getvalue(), 'application/x-zip-compressed')}
self.logger.debug(f"[UPLOAD] Загрузка файла из байтового потока (in-memory).")
elif hasattr(file_obj, 'read') and hasattr(file_obj, 'seek'): # Generic binary file-like object
return self._perform_upload(full_url, files_payload, extra_data, _headers, timeout)
elif hasattr(file_obj, 'read'):
files_payload = {form_field: (file_name, file_obj, 'application/x-zip-compressed')}
self.logger.debug(f"[UPLOAD] Загрузка файла из файлового объекта.")
return self._perform_upload(full_url, files_payload, extra_data, _headers, timeout)
else:
self.logger.error(f"[CONTRACT_VIOLATION] Неподдерживаемый тип файла для загрузки: {type(file_obj).__name__}")
raise TypeError("Неподдерживаемый тип 'file_obj'. Ожидается Path, str, io.BytesIO или другой файлоподобный объект.")
self.logger.error(f"[ERROR][APIClient.upload_file][FAILURE] Unsupported file_obj type: {type(file_obj)}")
raise TypeError(f"Неподдерживаемый тип 'file_obj': {type(file_obj)}")
def _perform_upload(self, url, files, data, headers, timeout):
self.logger.debug(f"[DEBUG][APIClient._perform_upload][ENTER] Performing upload to {url}")
try:
response = self.session.post(
url=full_url,
files=files_payload,
data=extra_data or {},
headers=_headers,
timeout=timeout or self.timeout
url=url,
files=files,
data=data or {},
headers=headers,
timeout=timeout or self.request_settings.get("timeout")
)
response.raise_for_status()
# [COHERENCE_CHECK_PASSED] Файл успешно загружен.
self.logger.info(f"[UPLOAD_SUCCESS] Файл '{file_name}' успешно загружен на {endpoint}.")
self.logger.info(f"[INFO][APIClient._perform_upload][SUCCESS] Upload successful to {url}")
return response.json()
except requests.exceptions.HTTPError as e:
error_context = {
"endpoint": endpoint,
"file": file_name,
"status_code": e.response.status_code,
"response_text": e.response.text
}
if e.response.status_code == 403:
raise PermissionDeniedError("Доступ запрещен для загрузки файла.", **error_context) from e
else:
raise SupersetAPIError(f"Ошибка API при загрузке файла: {e.response.status_code} - {e.response.text}", **error_context) from e
self.logger.error(f"[ERROR][APIClient._perform_upload][FAILURE] HTTP error during upload: {e}")
raise SupersetAPIError(f"Ошибка API при загрузке: {e.response.text}") from e
except requests.exceptions.RequestException as e:
error_context = {"endpoint": endpoint, "file": file_name, "error_type": type(e).__name__}
self.logger.error(f"[NETWORK_ERROR] Ошибка запроса при загрузке файла: {str(e)}", exc_info=True, extra=error_context)
raise NetworkError(f"Ошибка сети при загрузке файла: {str(e)}", url=full_url) from e
except Exception as e:
error_context = {"endpoint": endpoint, "file": file_name, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при загрузке файла: {str(e)}", exc_info=True, extra=error_context)
raise SupersetAPIError(f"Непредвиденная ошибка загрузки файла: {str(e)}", context=error_context) from e
finally:
# Закрываем файл, если он был открыт в этом методе
if should_close_file and files_payload and files_payload[form_field] and hasattr(files_payload[form_field][1], 'close'):
files_payload[form_field][1].close()
self.logger.debug(f"[UPLOAD] Закрыт файл '{file_name}'.")
self.logger.error(f"[ERROR][APIClient._perform_upload][FAILURE] Network error during upload: {e}")
raise NetworkError(f"Ошибка сети при загрузке: {e}", url=url) from e
def fetch_paginated_count(
self,
@@ -380,100 +224,41 @@ class APIClient:
count_field: str = "count",
timeout: Optional[int] = None
) -> int:
"""[CONTRACT] Получение общего количества элементов в пагинированном API.
@delegates:
- Использует `self.request` для выполнения HTTP-запроса.
@pre:
- `endpoint` должен указывать на пагинированный ресурс.
- `query_params` должны быть валидны для запроса количества.
@post:
- Возвращает целочисленное количество элементов.
@raise:
- `NetworkError`, `SupersetAPIError`, `KeyError` (если `count_field` не найден).
"""
self.logger.debug(f"[PAGINATION] Запрос количества элементов для {endpoint} с параметрами: {query_params}")
try:
response_json = self.request(
method="GET",
endpoint=endpoint,
params={"q": json.dumps(query_params)},
timeout=timeout or self.timeout
)
if count_field not in response_json:
self.logger.error(
f"[CONTRACT_VIOLATION] Ответ API для {endpoint} не содержит поле '{count_field}'",
extra={"response_keys": list(response_json.keys())}
)
raise KeyError(f"Ответ API для {endpoint} не содержит поле '{count_field}'")
count = response_json[count_field]
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Получено количество: {count} для {endpoint}.")
return count
except (KeyError, SupersetAPIError, NetworkError, PermissionDeniedError, DashboardNotFoundError) as e:
self.logger.error(f"[ERROR] Ошибка получения количества элементов для {endpoint}: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except Exception as e:
error_ctx = {"endpoint": endpoint, "params": query_params, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении количества: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка при получении count для {endpoint}: {str(e)}", context=error_ctx) from e
self.logger.debug(f"[DEBUG][APIClient.fetch_paginated_count][ENTER] Fetching paginated count for {endpoint}")
response_json = self.request(
method="GET",
endpoint=endpoint,
params={"q": json.dumps(query_params)},
timeout=timeout or self.request_settings.get("timeout")
)
count = response_json.get(count_field, 0)
self.logger.debug(f"[DEBUG][APIClient.fetch_paginated_count][SUCCESS] Fetched paginated count: {count}")
return count
def fetch_paginated_data(
self,
endpoint: str,
base_query: Dict,
total_count: int,
results_field: str = "result",
pagination_options: Dict[str, Any],
timeout: Optional[int] = None
) -> List[Any]:
"""[CONTRACT] Получение всех данных с пагинированного API.
@delegates:
- Использует `self.request` для выполнения запросов по страницам.
@pre:
- `base_query` должен содержать 'page_size'.
- `total_count` должен быть корректным общим количеством элементов.
@post:
- Возвращает список всех собранных данных со всех страниц.
@raise:
- `NetworkError`, `SupersetAPIError`, `ValueError` (если `page_size` невалиден), `KeyError`.
"""
self.logger.debug(f"[PAGINATION] Запуск получения всех данных для {endpoint}. Total: {total_count}, Base Query: {base_query}")
self.logger.debug(f"[DEBUG][APIClient.fetch_paginated_data][ENTER] Fetching paginated data for {endpoint}")
base_query = pagination_options.get("base_query", {})
total_count = pagination_options.get("total_count", 0)
results_field = pagination_options.get("results_field", "result")
page_size = base_query.get('page_size')
if not page_size or page_size <= 0:
self.logger.error("[CONTRACT_VIOLATION] 'page_size' в базовом запросе невалиден.", extra={"page_size": page_size})
raise ValueError("Параметр 'page_size' должен быть положительным числом.")
raise ValueError("'page_size' должен быть положительным числом.")
total_pages = (total_count + page_size - 1) // page_size
results = []
for page in range(total_pages):
query = {**base_query, 'page': page}
self.logger.debug(f"[PAGINATION] Запрос страницы {page+1}/{total_pages} для {endpoint}.")
try:
response_json = self.request(
method="GET",
endpoint=endpoint,
params={"q": json.dumps(query)},
timeout=timeout or self.timeout
)
if results_field not in response_json:
self.logger.warning(
f"[CONTRACT_VIOLATION] Ответ API для {endpoint} на странице {page} не содержит поле '{results_field}'",
extra={"response_keys": list(response_json.keys())}
)
# Если поле результатов отсутствует на одной странице, это может быть не фатально, но надо залогировать.
continue
results.extend(response_json[results_field])
except (SupersetAPIError, NetworkError, PermissionDeniedError, DashboardNotFoundError) as e:
self.logger.error(f"[ERROR] Ошибка получения страницы {page+1} для {endpoint}: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise # Пробрасываем ошибку выше, так как не можем продолжить пагинацию
except Exception as e:
error_ctx = {"endpoint": endpoint, "page": page, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении страницы {page+1} для {endpoint}: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка пагинации для {endpoint}: {str(e)}", context=error_ctx) from e
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Все данные с пагинацией для {endpoint} успешно собраны. Всего элементов: {len(results)}")
return results
response_json = self.request(
method="GET",
endpoint=endpoint,
params={"q": json.dumps(query)},
timeout=timeout or self.request_settings.get("timeout")
)
page_results = response_json.get(results_field, [])
results.extend(page_results)
self.logger.debug(f"[DEBUG][APIClient.fetch_paginated_data][SUCCESS] Fetched paginated data. Total items: {len(results)}")
return results