Files
ss-tools/superset_tool/client.py
Volobuev Andrey c0a6ca7769 2
2025-06-27 15:20:29 +03:00

505 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# [MODULE] Superset API Client
# @contract: Реализует полное взаимодействие с Superset API
# @semantic_layers:
# 1. Авторизация/CSRF
# 2. Основные операции (дашборды)
# 3. Импорт/экспорт
# @coherence:
# - Согласован с models.SupersetConfig
# - Полная обработка всех errors из exceptions.py
# [IMPORTS] Стандартная библиотека
import json
from typing import Optional, Dict, Tuple, List, Any, Literal, Union
import datetime
from pathlib import Path
# [IMPORTS] Сторонние библиотеки
import requests
import urllib3
import zipfile
# [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig
from superset_tool.exceptions import (
AuthenticationError,
SupersetAPIError,
DashboardNotFoundError,
NetworkError,
PermissionDeniedError,
ExportError,
InvalidZipFormatError
)
from superset_tool.utils.fileio import get_filename_from_headers
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.network import APIClient
# [CONSTANTS] Логирование
HTTP_METHODS = Literal['GET', 'POST', 'PUT', 'DELETE']
DEFAULT_TIMEOUT = 30 # seconds
# [TYPE-ALIASES] Для сложных сигнатур
JsonType = Union[Dict[str, Any], List[Dict[str, Any]]]
ResponseType = Tuple[bytes, str]
# [CHECK] Валидация импортов для контрактов
try:
# Проверка наличия ключевых зависимостей
assert requests.__version__ >= '2.28.0' # для retry механизмов
assert urllib3.__version__ >= '1.26.0' # для SSL warnings
# Проверка локальных модулей
from .utils.fileio import get_filename_from_headers as fileio_check
assert callable(fileio_check)
except (ImportError, AssertionError) as imp_err:
raise RuntimeError(
f"[COHERENCE_CHECK_FAILED] Импорт не прошел валидацию: {str(imp_err)}"
) from imp_err
class SupersetClient:
"""[MAIN-CONTRACT] Клиент для работы с Superset API
@pre:
- config должен быть валидным SupersetConfig
- Целевой API доступен
@post:
- Все методы возвращают данные или вызывают явные ошибки
- Токены автоматически обновляются
@invariant:
- Сессия остается валидной между вызовами
- Все ошибки типизированы согласно exceptions.py
"""
def __init__(self, config: SupersetConfig):
"""[INIT] Инициализация клиента
@semantic:
- Создает сессию requests
- Настраивает адаптеры подключения
- Выполняет первичную аутентификацию
"""
self._validate_config(config)
self.config = config
self.logger = config.logger or SupersetLogger(name="client")
self.network = APIClient(
base_url=config.base_url,
auth=config.auth,
verify_ssl=config.verify_ssl
)
self.tokens = self.network.authenticate()
try:
self.logger.info(
"[COHERENCE_CHECK_PASSED] Клиент успешно инициализирован",
extra={"base_url": config.base_url}
)
except Exception as e:
self.logger.error(
"[INIT_FAILED] Ошибка инициализации клиента",
exc_info=True,
extra={"config": config.dict()}
)
raise
def _validate_config(self, config: SupersetConfig) -> None:
"""[PRECONDITION] Валидация конфигурации клиента
@semantic:
- Проверяет обязательные поля
- Валидирует URL и учетные данные
@raise:
- ValueError при невалидных параметрах
- TypeError при некорректном типе
"""
if not isinstance(config, SupersetConfig):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Некорректный тип конфигурации",
extra={"actual_type": type(config).__name__}
)
raise TypeError("Конфигурация должна быть экземпляром SupersetConfig")
required_fields = ["base_url", "auth"]
for field in required_fields:
if not getattr(config, field, None):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Отсутствует обязательное поле",
extra={"missing_field": field}
)
raise ValueError(f"Обязательное поле {field} не указано")
if not config.auth.get("username") or not config.auth.get("password"):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Не указаны учетные данные",
extra={"auth_keys": list(config.auth.keys())}
)
raise ValueError("В конфигурации должны быть указаны username и password")
# Дополнительная валидация URL
if not config.base_url.startswith(("http://", "https://")):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Некорректный URL",
extra={"base_url": config.base_url}
)
raise ValueError("base_url должен начинаться с http:// или https://")
@property
def headers(self) -> dict:
"""[INTERFACE] Базовые заголовки для API-вызовов
@semantic: Объединяет общие заголовки для всех запросов
@post: Всегда возвращает актуальные токены
"""
return {
"Authorization": f"Bearer {self.tokens['access_token']}",
"X-CSRFToken": self.tokens["csrf_token"],
"Referer": self.config.base_url,
"Content-Type": "application/json"
}
# [MAIN-OPERATIONS] Работа с дашбордами
def get_dashboard(self, dashboard_id_or_slug: str) -> dict:
"""[CONTRACT] Получение метаданных дашборда
@pre:
- dashboard_id_or_slug должен существовать
- Клиент должен быть аутентифицирован (tokens актуальны)
@post:
- Возвращает dict с метаданными дашборда
- В случае 404 вызывает DashboardNotFoundError
@semantic_layers:
1. Взаимодействие с API через APIClient
2. Обработка специфичных для Superset ошибок
"""
try:
response = self.network.request(
method="GET",
endpoint=f"/dashboard/{dashboard_id_or_slug}",
headers=self.headers # Автоматически включает токены
)
return response.json()["result"]
except requests.HTTPError as e:
if e.response.status_code == 404:
raise DashboardNotFoundError(
dashboard_id_or_slug,
context={"url": f"{self.config.base_url}/dashboard/{dashboard_id_or_slug}"}
)
raise SupersetAPIError(
f"API Error: {str(e)}",
status_code=e.response.status_code
) from e
# [ERROR-HANDLER] Централизованная обработка ошибок
def _handle_api_error(self, method_name: str, error: Exception, url: str) -> None:
"""[UNIFIED-ERROR] Обработка API-ошибок
@semantic: Преобразует requests исключения в наши типы
"""
context = {
"method": method_name,
"url": url,
"status_code": getattr(error.response, 'status_code', None)
}
if isinstance(error, requests.Timeout):
raise NetworkError("Request timeout", context=context) from error
elif getattr(error.response, 'status_code', None) == 403:
raise PermissionDeniedError(context=context) from error
else:
raise SupersetAPIError(str(error), context=context) from error
# [SECTION] EXPORT OPERATIONS
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
"""[CONTRACT] Экспорт дашборда в ZIP-архив
@pre:
- dashboard_id должен существовать
- Пользователь имеет права на экспорт
@post:
- Возвращает кортеж (бинарное содержимое, имя файла)
- Имя файла извлекается из headers или генерируется
@errors:
- DashboardNotFoundError если дашборд не существует
- ExportError при проблемах экспорта
"""
url = f"{self.config.base_url}/dashboard/export/"
self.logger.debug(
"[EXPORT_START] Запуск экспорта",
extra={"dashboard_id": dashboard_id, "export_url": url}
)
try:
response = self._execute_export_request(dashboard_id, url)
self._validate_export_response(response, dashboard_id)
filename = self._resolve_export_filename(response, dashboard_id)
return response.content, filename
except requests.exceptions.HTTPError as http_err:
error_ctx = {
"dashboard_id": dashboard_id,
"status_code": http_err.response.status_code
}
if http_err.response.status_code == 404:
self.logger.error(
"[EXPORT_FAILED] Дашборд не найден",
extra=error_ctx
)
raise DashboardNotFoundError(dashboard_id, context=error_ctx)
raise ExportError("HTTP ошибка экспорта", context=error_ctx) from http_err
except requests.exceptions.RequestException as req_err:
error_ctx = {"dashboard_id": dashboard_id}
self.logger.error(
"[EXPORT_FAILED] Ошибка запроса",
exc_info=True,
extra=error_ctx
)
raise ExportError("Ошибка экспорта", context=error_ctx) from req_err
def _execute_export_request(self, dashboard_id: int, url: str) -> requests.Response:
"""[HELPER] Выполнение запроса экспорта
@coherence_check:
- Ответ должен иметь status_code 200
- Content-Type: application/zip
"""
response = self.network.request(
method="GET",
endpoint="/dashboard/export/",
params={"q": f"[{dashboard_id}]"},
raw_response=True # Для получения бинарного содержимого
)
response.raise_for_status()
return response
def _validate_export_response(self, response: requests.Response, dashboard_id: int) -> None:
"""[HELPER] Валидация ответа экспорта
@semantic:
- Проверка Content-Type
- Проверка наличия данных
"""
if 'application/zip' not in response.headers.get('Content-Type', ''):
self.logger.error(
"[EXPORT_VALIDATION_FAILED] Неверный Content-Type",
extra={
"dashboard_id": dashboard_id,
"content_type": response.headers.get('Content-Type')
}
)
raise ExportError("Получен не ZIP-архив")
if not response.content:
self.logger.error(
"[EXPORT_VALIDATION_FAILED] Пустой ответ",
extra={"dashboard_id": dashboard_id}
)
raise ExportError("Получены пустые данные")
def _resolve_export_filename(self, response: requests.Response, dashboard_id: int) -> str:
"""[HELPER] Определение имени экспортируемого файла
@fallback: Генерирует имя если не найден заголовок
"""
filename = get_filename_from_headers(response.headers)
if not filename:
filename = f"dashboard_export_{dashboard_id}_{datetime.now().strftime('%Y%m%d')}.zip"
self.logger.debug(
"[EXPORT_FALLBACK] Используется сгенерированное имя файла",
extra={"filename": filename}
)
return filename
def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path:
"""[CONTRACT] Экспорт дашборда прямо в файл
@pre:
- output_dir должен существовать
- Доступ на запись в директорию
@post:
- Возвращает Path сохраненного файла
- Создает поддиректорию с именем дашборда
"""
output_dir = Path(output_dir)
if not output_dir.exists():
self.logger.error(
"[EXPORT_PRE_FAILED] Директория не существует",
extra={"output_dir": str(output_dir)}
)
raise FileNotFoundError(f"Директория {output_dir} не найдена")
content, filename = self.export_dashboard(dashboard_id)
target_path = output_dir / filename
try:
with open(target_path, 'wb') as f:
f.write(content)
self.logger.info(
"[EXPORT_SUCCESS] Дашборд сохранен на диск",
extra={
"dashboard_id": dashboard_id,
"file_path": str(target_path),
"file_size": len(content)
}
)
return target_path
except IOError as io_err:
self.logger.error(
"[EXPORT_IO_FAILED] Ошибка записи файла",
exc_info=True,
extra={"target_path": str(target_path)}
)
raise ExportError("Ошибка сохранения файла") from io_err
# [SECTION] Основной интерфейс API
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""[CONTRACT] Получение списка дашбордов с пагинацией
@pre:
- Клиент должен быть авторизован
- Параметры пагинации должны быть валидны
@post:
- Возвращает кортеж (total_count, список метаданных)
- Поддерживает кастомные query-параметры
@invariant:
- Всегда возвращает полный список (обходит пагинацию)
"""
url = f"{self.config.base_url}/dashboard/"
self.logger.debug(
"[API_CALL] Запрос списка дашбордов",
extra={"query": query}
)
# [COHERENCE_CHECK] Валидация параметров
validated_query = self._validate_query_params(query)
try:
# Инициализация пагинации
total_count = self._fetch_total_count()
paginated_data = self._fetch_all_pages(validated_query, total_count)
self.logger.info(
"[API_SUCCESS] Дашборды получены",
extra={"count": total_count}
)
return total_count, paginated_data
except requests.exceptions.RequestException as e:
error_ctx = {"method": "get_dashboards", "query": validated_query}
self._handle_api_error("Пагинация дашбордов", e, error_ctx)
# [SECTION] Импорт
def import_dashboard(self, file_name: Union[str, Path]) -> Dict:
"""[CONTRACT] Импорт дашборда из архива
@pre:
- Файл должен существовать и быть валидным ZIP
- Должны быть права на импорт
@post:
- Возвращает метаданные импортированного дашборда
- При конфликтах выполняет overwrite
"""
self._validate_import_file(file_name)
self.logger.debug(
"[IMPORT_START] Инициирован импорт дашборда",
extra={"file": file_name}
)
try:
return self.network.upload_file(
endpoint="/dashboard/import/",
file_obj=file_name,
file_name=file_name,
form_field="formData",
extra_data={'overwrite': 'true'},
timeout=self.config.timeout * 2
)
except PermissionDeniedError as e:
self.logger.error(
"[IMPORT_AUTH_FAILED] Недостаточно прав для импорта",
exc_info=True
)
raise
except Exception as e:
self.logger.error(
"[IMPORT_FAILED] Ошибка импорта дашборда",
exc_info=True,
extra={"file": file_name}
)
raise SupersetAPIError(f"Ошибка импорта: {str(e)}") from e
# [SECTION] Приватные методы-помощники
def _validate_query_params(self, query: Optional[Dict]) -> Dict:
"""[HELPER] Нормализация параметров запроса"""
base_query = {
"columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"],
"page": 0,
"page_size": 20
}
return {**base_query, **(query or {})}
def _fetch_total_count(self) -> int:
"""[CONTRACT][HELPER] Получение общего кол-ва дашбордов в системе
@delegates:
- Сетевой запрос -> APIClient
- Обработка ответа -> собственный метод
@errors:
- SupersetAPIError при проблемах с API
"""
query_params = {
'columns': ['id'],
'page': 0,
'page_size': 1
}
try:
return self.network.fetch_paginated_count(
endpoint="/dashboard/",
query_params=query_params,
count_field="count"
)
except requests.exceptions.RequestException as e:
raise SupersetAPIError(f"Ошибка получения количества дашбордов: {str(e)}")
def _fetch_all_pages(self, query: Dict, total_count: int) -> List[Dict]:
"""[HELPER] Обход всех страниц с пагинацией"""
"""[CONTRACT] Получение всех данных с пагинированного API
@delegates:
- Сетевые запросы -> APIClient.fetch_paginated_data()
@params:
query: оригинальный query-объект (без page)
total_count: общее количество элементов
@return:
Список всех элементов
@errors:
- SupersetAPIError: проблемы с API
- ValueError: некорректные параметры пагинации
"""
try:
if not query.get('page_size'):
raise ValueError("Отсутствует page_size в query параметрах")
return self.network.fetch_paginated_data(
endpoint="/dashboard/",
base_query=query,
total_count=total_count,
results_field="result"
)
except (requests.exceptions.RequestException, ValueError) as e:
error_ctx = {
"query": query,
"total_count": total_count,
"error": str(e)
}
self.logger.error("[PAGINATION_ERROR]", extra=error_ctx)
raise SupersetAPIError(f"Ошибка пагинации: {str(e)}") from e
def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
"""[HELPER] Проверка файла перед импортом"""
path = Path(zip_path)
if not path.exists():
raise FileNotFoundError(f"[FILE_ERROR] {zip_path} не существует")
if not zipfile.is_zipfile(path):
raise InvalidZipFormatError(f"[FILE_ERROR] {zip_path} не ZIP-архив")
with zipfile.ZipFile(path) as zf:
if not any(n.endswith('metadata.yaml') for n in zf.namelist()):
raise DashboardNotFoundError("Архив не содержит metadata.yaml")