Files
ss-tools/superset_tool/client.py
Volobuev Andrey 04fa28f086 fractal refactor
2025-06-26 17:53:04 +03:00

596 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# [MODULE] Superset API Client
# @contract: Реализует полное взаимодействие с Superset API
# @semantic_layers:
# 1. Авторизация/CSRF
# 2. Основные операции (дашборды)
# 3. Импорт/экспорт
# @coherence:
# - Согласован с models.SupersetConfig
# - Полная обработка всех errors из exceptions.py
# [IMPORTS] Стандартная библиотека
import json
from typing import Optional, Dict, Tuple, List, Any, Literal, Union,BinaryIO
from pathlib import Path
# [IMPORTS] Сторонние библиотеки
import requests
import urllib3
from pydantic import BaseModel, Field
from requests.exceptions import HTTPError
# [IMPORTS] Локальные модули
from .models import SupersetConfig
from .exceptions import (
AuthenticationError,
SupersetAPIError,
DashboardNotFoundError,
NetworkError,
PermissionDeniedError,
ExportError
)
from .utils.fileio import get_filename_from_headers
from .utils.logger import SupersetLogger
# [CONSTANTS] Логирование
HTTP_METHODS = Literal['GET', 'POST', 'PUT', 'DELETE']
DEFAULT_TIMEOUT = 30 # seconds
# [TYPE-ALIASES] Для сложных сигнатур
JsonType = Union[Dict[str, Any], List[Dict[str, Any]]]
ResponseType = Tuple[bytes, str]
# [CHECK] Валидация импортов для контрактов
try:
# Проверка наличия ключевых зависимостей
assert requests.__version__ >= '2.28.0' # для retry механизмов
assert urllib3.__version__ >= '1.26.0' # для SSL warnings
# Проверка локальных модулей
from .utils.fileio import get_filename_from_headers as fileio_check
assert callable(fileio_check)
except (ImportError, AssertionError) as imp_err:
raise RuntimeError(
f"[COHERENCE_CHECK_FAILED] Импорт не прошел валидацию: {str(imp_err)}"
) from imp_err
class SupersetClient:
"""[MAIN-CONTRACT] Клиент для работы с Superset API
@pre:
- config должен быть валидным SupersetConfig
- Целевой API доступен
@post:
- Все методы возвращают данные или вызывают явные ошибки
- Токены автоматически обновляются
@invariant:
- Сессия остается валидной между вызовами
- Все ошибки типизированы согласно exceptions.py
"""
def __init__(self, config: SupersetConfig):
"""[INIT] Инициализация клиента
@semantic:
- Создает сессию requests
- Настраивает адаптеры подключения
- Выполняет первичную аутентификацию
"""
self._validate_config(config)
self.config = config
self.logger = config.logger or SupersetLogger(name="client")
self.session = self._setup_session()
try:
self._authenticate()
self.logger.info(
"[COHERENCE_CHECK_PASSED] Клиент успешно инициализирован",
extra={"base_url": config.base_url}
)
except Exception as e:
self.logger.error(
"[INIT_FAILED] Ошибка инициализации клиента",
exc_info=True,
extra={"config": config.dict()}
)
raise
def _validate_config(self, config: SupersetConfig) -> None:
"""[PRECONDITION] Валидация конфигурации клиента
@semantic:
- Проверяет обязательные поля
- Валидирует URL и учетные данные
@raise:
- ValueError при невалидных параметрах
- TypeError при некорректном типе
"""
if not isinstance(config, SupersetConfig):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Некорректный тип конфигурации",
extra={"actual_type": type(config).__name__}
)
raise TypeError("Конфигурация должна быть экземпляром SupersetConfig")
required_fields = ["base_url", "auth"]
for field in required_fields:
if not getattr(config, field, None):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Отсутствует обязательное поле",
extra={"missing_field": field}
)
raise ValueError(f"Обязательное поле {field} не указано")
if not config.auth.get("username") or not config.auth.get("password"):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Не указаны учетные данные",
extra={"auth_keys": list(config.auth.keys())}
)
raise ValueError("В конфигурации должны быть указаны username и password")
# Дополнительная валидация URL
if not config.base_url.startswith(("http://", "https://")):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Некорректный URL",
extra={"base_url": config.base_url}
)
raise ValueError("base_url должен начинаться с http:// или https://")
# [CHUNK] Настройка сессии и адаптеров
def _setup_session(self) -> requests.Session:
"""[INTERNAL] Конфигурация HTTP-сессии
@coherence_check: SSL verification должен соответствовать config
"""
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
max_retries=3,
pool_connections=10,
pool_maxsize=100
)
session.mount('https://', adapter)
session.verify = self.config.verify_ssl
if not self.config.verify_ssl:
urllib3.disable_warnings()
self.logger.debug(
"[CONFIG] SSL verification отключен",
extra={"base_url": self.config.base_url}
)
return session
# [CHUNK] Процесс аутентификации
def _authenticate(self):
"""[AUTH-FLOW] Получение токенов
@semantic_steps:
1. Получение access_token
2. Получение CSRF токена
@error_handling:
- AuthenticationError при проблемах credentials
- NetworkError при проблемах связи
"""
try:
# [STEP 1] Получение bearer token
login_url = f"{self.config.base_url}/security/login"
response = self.session.post(
login_url,
json={
"username": self.config.auth["username"],
"password": self.config.auth["password"],
"provider": self.config.auth["provider"],
"refresh": self.config.auth["refresh"]
},
timeout=self.config.timeout
)
if response.status_code == 401:
raise AuthenticationError(
"Invalid credentials",
context={
"endpoint": login_url,
"username": self.config.auth["username"],
"status_code": response.status_code
}
)
response.raise_for_status()
self.access_token = response.json()["access_token"]
# [STEP 2] Получение CSRF token
csrf_url = f"{self.config.base_url}/security/csrf_token/"
response = self.session.get(
csrf_url,
headers={"Authorization": f"Bearer {self.access_token}"},
timeout=self.config.timeout
)
response.raise_for_status()
self.csrf_token = response.json()["result"]
self.logger.info(
"[AUTH_SUCCESS] Токены успешно получены",
extra={
"access_token": f"{self.access_token[:5]}...",
"csrf_token": f"{self.csrf_token[:5]}..."
}
)
except requests.exceptions.RequestException as e:
error_context = {
"method": e.request.method,
"url": e.request.url,
"status_code": getattr(e.response, 'status_code', None)
}
if isinstance(e, (requests.Timeout, requests.ConnectionError)):
raise NetworkError("Connection failed", context=error_context) from e
raise SupersetAPIError("Auth flow failed", context=error_context) from e
@property
def headers(self) -> dict:
"""[INTERFACE] Базовые заголовки для API-вызовов
@semantic: Объединяет общие заголовки для всех запросов
@post: Всегда возвращает актуальные токены
"""
return {
"Authorization": f"Bearer {self.access_token}",
"X-CSRFToken": self.csrf_token,
"Referer": self.config.base_url,
"Content-Type": "application/json"
}
# [MAIN-OPERATIONS] Работа с дашбордами
def get_dashboard(self, dashboard_id_or_slug: str) -> dict:
"""[CONTRACT] Получение метаданных дашборда
@pre:
- dashboard_id_or_slug должен существовать
- Токены должны быть валидны
@post:
- Возвращает полные метаданные
- В случае 404 вызывает DashboardNotFoundError
"""
url = f"{self.config.base_url}/dashboard/{dashboard_id_or_slug}"
try:
response = self.session.get(
url,
headers=self.headers,
timeout=self.config.timeout
)
if response.status_code == 404:
raise DashboardNotFoundError(
dashboard_id_or_slug,
context={"url": url}
)
response.raise_for_status()
return response.json()["result"]
except requests.exceptions.RequestException as e:
self._handle_api_error("get_dashboard", e, url)
def export_dashboard(self, dashboard_id: int) -> tuple[bytes, str]:
"""[CONTRACT] Экспорт дашборда в ZIP
@error_handling:
- DashboardNotFoundError если дашборд не существует
- ExportError при проблемах экспорта
"""
url = f"{self.config.base_url}/dashboard/export/"
try:
response = self.session.get(
url,
headers=self.headers,
params={"q": f"[{dashboard_id}]"},
timeout=self.config.timeout
)
if response.status_code == 404:
raise DashboardNotFoundError(dashboard_id)
response.raise_for_status()
filename = (
get_filename_from_headers(response.headers)
or f"dashboard_{dashboard_id}.zip"
)
return response.content, filename
except requests.exceptions.RequestException as e:
self._handle_api_error("export_dashboard", e, url)
# [ERROR-HANDLER] Централизованная обработка ошибок
def _handle_api_error(self, method_name: str, error: Exception, url: str) -> None:
"""[UNIFIED-ERROR] Обработка API-ошибок
@semantic: Преобразует requests исключения в наши типы
"""
context = {
"method": method_name,
"url": url,
"status_code": getattr(error.response, 'status_code', None)
}
if isinstance(error, requests.Timeout):
raise NetworkError("Request timeout", context=context) from error
elif getattr(error.response, 'status_code', None) == 403:
raise PermissionDeniedError(context=context) from error
else:
raise SupersetAPIError(str(error), context=context) from error
# [SECTION] EXPORT OPERATIONS
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
"""[CONTRACT] Экспорт дашборда в ZIP-архив
@pre:
- dashboard_id должен существовать
- Пользователь имеет права на экспорт
@post:
- Возвращает кортеж (бинарное содержимое, имя файла)
- Имя файла извлекается из headers или генерируется
@errors:
- DashboardNotFoundError если дашборд не существует
- ExportError при проблемах экспорта
"""
url = f"{self.config.base_url}/dashboard/export/"
self.logger.debug(
"[EXPORT_START] Запуск экспорта",
extra={"dashboard_id": dashboard_id, "export_url": url}
)
try:
response = self._execute_export_request(dashboard_id, url)
self._validate_export_response(response, dashboard_id)
filename = self._resolve_export_filename(response, dashboard_id)
return response.content, filename
except requests.exceptions.HTTPError as http_err:
error_ctx = {
"dashboard_id": dashboard_id,
"status_code": http_err.response.status_code
}
if http_err.response.status_code == 404:
self.logger.error(
"[EXPORT_FAILED] Дашборд не найден",
extra=error_ctx
)
raise DashboardNotFoundError(dashboard_id, context=error_ctx)
raise ExportError("HTTP ошибка экспорта", context=error_ctx) from http_err
except requests.exceptions.RequestException as req_err:
error_ctx = {"dashboard_id": dashboard_id}
self.logger.error(
"[EXPORT_FAILED] Ошибка запроса",
exc_info=True,
extra=error_ctx
)
raise ExportError("Ошибка экспорта", context=error_ctx) from req_err
def _execute_export_request(self, dashboard_id: int, url: str) -> requests.Response:
"""[HELPER] Выполнение запроса экспорта
@coherence_check:
- Ответ должен иметь status_code 200
- Content-Type: application/zip
"""
response = self.session.get(
url,
headers=self.headers,
params={"q": f"[{dashboard_id}]"},
timeout=self.config.timeout,
stream=True
)
response.raise_for_status()
return response
def _validate_export_response(self, response: requests.Response, dashboard_id: int) -> None:
"""[HELPER] Валидация ответа экспорта
@semantic:
- Проверка Content-Type
- Проверка наличия данных
"""
if 'application/zip' not in response.headers.get('Content-Type', ''):
self.logger.error(
"[EXPORT_VALIDATION_FAILED] Неверный Content-Type",
extra={
"dashboard_id": dashboard_id,
"content_type": response.headers.get('Content-Type')
}
)
raise ExportError("Получен не ZIP-архив")
if not response.content:
self.logger.error(
"[EXPORT_VALIDATION_FAILED] Пустой ответ",
extra={"dashboard_id": dashboard_id}
)
raise ExportError("Получены пустые данные")
def _resolve_export_filename(self, response: requests.Response, dashboard_id: int) -> str:
"""[HELPER] Определение имени экспортируемого файла
@fallback: Генерирует имя если не найден заголовок
"""
filename = get_filename_from_headers(response.headers)
if not filename:
filename = f"dashboard_export_{dashboard_id}_{datetime.now().strftime('%Y%m%d')}.zip"
self.logger.debug(
"[EXPORT_FALLBACK] Используется сгенерированное имя файла",
extra={"filename": filename}
)
return filename
def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path:
"""[CONTRACT] Экспорт дашборда прямо в файл
@pre:
- output_dir должен существовать
- Доступ на запись в директорию
@post:
- Возвращает Path сохраненного файла
- Создает поддиректорию с именем дашборда
"""
output_dir = Path(output_dir)
if not output_dir.exists():
self.logger.error(
"[EXPORT_PRE_FAILED] Директория не существует",
extra={"output_dir": str(output_dir)}
)
raise FileNotFoundError(f"Директория {output_dir} не найдена")
content, filename = self.export_dashboard(dashboard_id)
target_path = output_dir / filename
try:
with open(target_path, 'wb') as f:
f.write(content)
self.logger.info(
"[EXPORT_SUCCESS] Дашборд сохранен на диск",
extra={
"dashboard_id": dashboard_id,
"file_path": str(target_path),
"file_size": len(content)
}
)
return target_path
except IOError as io_err:
self.logger.error(
"[EXPORT_IO_FAILED] Ошибка записи файла",
exc_info=True,
extra={"target_path": str(target_path)}
)
raise ExportError("Ошибка сохранения файла") from io_err
# [SECTION] Основной интерфейс API
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""[CONTRACT] Получение списка дашбордов с пагинацией
@pre:
- Клиент должен быть авторизован
- Параметры пагинации должны быть валидны
@post:
- Возвращает кортеж (total_count, список метаданных)
- Поддерживает кастомные query-параметры
@invariant:
- Всегда возвращает полный список (обходит пагинацию)
"""
url = f"{self.config.base_url}/dashboard/"
self.logger.debug(
"[API_CALL] Запрос списка дашбордов",
extra={"query": query}
)
# [COHERENCE_CHECK] Валидация параметров
validated_query = self._validate_query_params(query)
try:
# Инициализация пагинации
total_count = self._fetch_total_count(url)
paginated_data = self._fetch_all_pages(url, validated_query, total_count)
self.logger.info(
"[API_SUCCESS] Дашборды получены",
extra={"count": total_count}
)
return total_count, paginated_data
except requests.exceptions.RequestException as e:
error_ctx = {"method": "get_dashboards", "query": validated_query}
self._handle_api_error("Пагинация дашбордов", e, error_ctx)
# [SECTION] Импорт/экспорт
def import_dashboard(self, zip_path: Union[str, Path]) -> Dict:
"""[CONTRACT] Импорт дашборда из архива
@pre:
- Файл должен существовать и быть валидным ZIP
- Должны быть права на импорт
@post:
- Возвращает метаданные импортированного дашборда
- При конфликтах выполняет overwrite
"""
self._validate_import_file(zip_path)
try:
with open(zip_path, 'rb') as f:
return self._execute_import(
file_obj=f,
file_name=Path(zip_path).name
)
except Exception as e:
self.logger.error(
"[IMPORT_FAILED] Критическая ошибка импорта",
exc_info=True,
extra={"file": str(zip_path)}
)
raise DashboardImportError(f"Import failed: {str(e)}") from e
# [SECTION] Приватные методы-помощники
def _validate_query_params(self, query: Optional[Dict]) -> Dict:
"""[HELPER] Нормализация параметров запроса"""
base_query = {
"columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"],
"page": 0,
"page_size": 20
}
return {**base_query, **(query or {})}
def _fetch_total_count(self, url: str) -> int:
"""[HELPER] Получение общего количества дашбордов"""
count_response = self.session.get(
f"{url}?q={json.dumps({'columns': ['id'], 'page': 0, 'page_size': 1})}",
headers=self.headers,
timeout=self.config.timeout
)
count_response.raise_for_status()
return count_response.json()['count']
def _fetch_all_pages(self, url: str, query: Dict, total_count: int) -> List[Dict]:
"""[HELPER] Обход всех страниц с пагинацией"""
results = []
page_size = query['page_size']
total_pages = (total_count + page_size - 1) // page_size
for page in range(total_pages):
query['page'] = page
response = self.session.get(
url,
headers=self.headers,
params={"q": json.dumps(query)},
timeout=self.config.timeout
)
response.raise_for_status()
results.extend(response.json().get('result', []))
return results
def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
"""[HELPER] Проверка файла перед импортом"""
path = Path(zip_path)
if not path.exists():
raise FileNotFoundError(f"[FILE_ERROR] {zip_path} не существует")
if not zipfile.is_zipfile(path):
raise InvalidZipFormatError(f"[FILE_ERROR] {zip_path} не ZIP-архив")
with zipfile.ZipFile(path) as zf:
if not any(n.endswith('metadata.yaml') for n in zf.namelist()):
raise DashboardNotFoundError("Архив не содержит metadata.yaml")
def _execute_import(self, file_obj: BinaryIO, file_name: str) -> Dict:
"""[HELPER] Выполнение API-запроса импорта"""
url = f"{self.config.base_url}/dashboard/import/"
files = {'formData': (file_name, file_obj, 'application/x-zip-compressed')}
headers = {k: v for k, v in self.headers.items() if k.lower() != 'content-type'}
response = self.session.post(
url,
files=files,
data={'overwrite': 'true'},
headers=headers,
timeout=self.config.timeout * 2 # Увеличенный таймаут для импорта
)
if response.status_code == 403:
raise PermissionDeniedError("Недостаточно прав для импорта")
response.raise_for_status()
return response.json()