Files
ss-tools/superset_tool/client.py
Volobuev Andrey 03731e2d77 add errors
2025-04-24 18:10:02 +03:00

336 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
from requests.exceptions import HTTPError
import urllib3
import json
from typing import Dict, Optional, Tuple, List, Any
from pydantic import BaseModel, Field
from .utils.fileio import *
from .exceptions import *
from .models import SupersetConfig
from .utils.logger import SupersetLogger
class SupersetClient:
def __init__(self, config: SupersetConfig):
self.config = config
self.logger = config.logger or SupersetLogger(console=False)
self.session = requests.Session()
self._setup_session()
self._authenticate()
def _setup_session(self):
adapter = requests.adapters.HTTPAdapter(
max_retries=3,
pool_connections=10,
pool_maxsize=100
)
if not self.config.verify_ssl:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.logger.debug(f"Проверка сертификатов SSL отключена")
self.session.mount('https://', adapter)
self.session.verify = self.config.verify_ssl
def _authenticate(self):
try:
# Сначала логинимся для получения access_token
login_url = f"{self.config.base_url}/security/login"
response = self.session.post(
login_url,
json={
"username": self.config.auth["username"],
"password": self.config.auth["password"],
"provider": self.config.auth["provider"],
"refresh": True
},
verify=self.config.verify_ssl
)
response.raise_for_status()
self.access_token = response.json()["access_token"]
self.logger.info(
f"Токен Bearer {self.access_token} получен c {login_url}")
# Затем получаем CSRF токен с использованием access_token
csrf_url = f"{self.config.base_url}/security/csrf_token/"
response = self.session.get(
csrf_url,
headers={"Authorization": f"Bearer {self.access_token}"},
verify=self.config.verify_ssl
)
response.raise_for_status()
self.csrf_token = response.json()["result"]
self.logger.info(
f"Токен CSRF {self.csrf_token} получен c {csrf_url}")
except HTTPError as e:
if e.response.status_code == 401:
error_msg = f"Неверные данные для аутенфикации для {login_url}" if "login" in e.request.url else f"Не удалось получить CSRF токен с {csrf_url}"
self.logger.error(f"Ошибка получения: {error_msg}")
raise AuthenticationError(
f"{error_msg}. Проверь данные аутенфикации") from e
raise
@property
def headers(self):
return {
"Authorization": f"Bearer {self.access_token}",
"X-CSRFToken": self.csrf_token,
"Referer": self.config.base_url,
"Content-Type": "application/json"
}
def get_dashboard(self, dashboard_id_or_slug: str) -> Dict:
"""
Получаем информацию по дашборду (если передан dashboard_id_or_slug)
Параметры:
:dashboard_id_or_slug - id или короткая ссылка
"""
url = f"{self.config.base_url}/dashboard/{dashboard_id_or_slug}"
self.logger.debug(f"Получаем информацию по дашборду с /{url}...")
try:
response = self.session.get(
url,
headers=self.headers,
timeout=self.config.timeout
)
response.raise_for_status()
self.logger.info(f"ОК - Получили информацию по дашборду с {response.url}")
return response.json()["result"]
except requests.exceptions.RequestException as e:
self.logger.error(
f"Ошибка при получении информации о дашборде: {str(e)}", exc_info=True)
raise SupersetAPIError(
f"Ошибка при получении информации о дашборде: {str(e)}") from e
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""
Получаем информацию по всем дашбордам с учетом пагинации.
Параметры:
query (Optional[Dict]): Дополнительные параметры запроса, включая пагинацию и фильтры.
Возвращает:
Tuple[int, List[Dict]]: Кортеж, содержащий общее количество дашбордов и список всех дашбордов.
"""
url = f"{self.config.base_url}/dashboard/"
self.logger.debug(f"Получаем информацию по дашбордам с {url}...")
modified_query: Dict = {}
all_results: List[Dict] = []
total_count: int = 0
current_page: int = 0
q_param = '{ "columns": [ "id" ], "page": 0, "page_size": 20}'
try:
response = self.session.get(
url=f"{url}?q={q_param}",
#params={"q": json.dumps(default_query)}, # Передаем такой body, иначе на prodta отдает 1 дашборд
headers=self.headers,
timeout=self.config.timeout
)
total_count = response.json()['count']
self.logger.info(
f"ОК - Получили кол-во дашбордов ({total_count}) с {url}")
self.logger.info(f"Запрос - {response.url}")
except requests.exceptions.RequestException as e:
self.logger.error(
f"Ошибка при получении кол-ва дашбордов: {str(e)}", exc_info=True)
raise SupersetAPIError(
f"Ошибка при получении кол-ва дашбордов: {str(e)}") from e
# Инициализация параметров запроса с учетом переданного query
if query:
modified_query = query.copy()
# Убедимся, что page_size установлен, если не передан
modified_query.setdefault("page_size", 20)
else:
modified_query = {
"columns": [
"slug",
"id",
"changed_on_utc",
"dashboard_title",
"published"
],
"page": 0,
"page_size": 20
}
page_size = modified_query["page_size"]
total_pages = (total_count + page_size - 1) // page_size
try:
while current_page < total_pages:
modified_query["page"] = current_page
response = self.session.get(
url,
headers=self.headers,
params={"q": json.dumps(modified_query)},
timeout=self.config.timeout
)
response.raise_for_status()
data = response.json()
all_results.extend(data.get("result", []))
current_page += 1
self.logger.info(f"ОК - Получили информацию по дашбордам с {url}")
# Проверка, достигли ли последней страницы
return total_count, all_results
except requests.exceptions.RequestException as e:
self.logger.error(
f"Ошибка при получении информации о дашбордах: {str(e)}", exc_info=True)
raise SupersetAPIError(
f"Ошибка при получении информации о дашбордах: {str(e)}") from e
def export_dashboard(self, dashboard_id: int, logger: Optional[SupersetLogger] = None) -> Tuple[bytes, str]:
"""Экспортирует дашборд из Superset в виде ZIP-архива и возвращает его содержимое с именем файла.
Параметры:
:dashboard_id (int): Идентификатор дашборда для экспорта
Возвращает:
Tuple[bytes, str]: Кортеж, содержащий:
- bytes: Бинарное содержимое ZIP-архива с дашбордом
- str: Имя файла (из заголовков ответа или в формате dashboard_{id}.zip)
Исключения:
SupersetAPIError: Вызывается при ошибках:
- Проблемы с сетью/соединением
- Невалидный ID дашборда
- Отсутствие прав доступа
- Ошибки сервера (status code >= 400)
Пример использования:
content, filename = client.export_dashboard(5)
with open(filename, 'wb') as f:
f.write(content)
Примечания:
- Для экспорта используется API Endpoint: /dashboard/export/
- Имя файла пытается извлечь из Content-Disposition заголовка
- По умолчанию возвращает имя в формате dashboard_{id}.zip
- Архив содержит JSON-метаданные и связанные элементы (датасеты, чарты)
"""
url = f"{self.config.base_url}/dashboard/export/"
params = {"q": f"[{dashboard_id}]"}
logger = logger or SupersetLogger(name="client", console=False)
self.logger.debug(f"Экспортируем дашборд ID {dashboard_id} c {url}...")
try:
response = self.session.get(
url,
headers=self.headers,
params=params,
timeout=self.config.timeout
)
response.raise_for_status()
filename = get_filename_from_headers(
response.headers) or f"dashboard_{dashboard_id}.zip"
self.logger.info(f"Дашборд сохранен в {filename}")
return response.content, filename
except requests.exceptions.RequestException as e:
self.logger.error(f"Ошибка при экспорте: {str(e)}", exc_info=True)
raise SupersetAPIError(f"Export failed: {str(e)}") from e
def import_dashboard(self, zip_path) -> Dict:
"""Импортирует дашборд в Superset из ZIP-архива с детальной обработкой ошибок.
Параметры:
zip_path (Union[str, Path]): Путь к ZIP-файлу с дашбордом
Возвращает:
dict: Ответ API в формате JSON с результатами импорта
Пример использования:
result = client.import_dashboard(Path("my_dashboard.zip"))
print(f"Импортирован дашборд: {result['title']}")
Примечания:
- Использует API Endpoint: /dashboard/import/
- Автоматически устанавливает overwrite=true для перезаписи существующих дашбордов
- Удваивает стандартный таймаут для обработки длительных операций
- Удаляет Content-Type заголовок (автоматически генерируется для multipart/form-data)
- Архив должен содержать валидные JSON-метаданные в формате Superset
- Ответ может содержать информацию о созданных/обновленных ресурсах (датасеты, чарты, владельцы)
- При конфликте имен может потребоваться ручное разрешение через параметры импорта
"""
url = f"{self.config.base_url}/dashboard/import/"
self.logger.debug(f"Импортируем дашборд ID {zip_path} на {url}...")
# Валидация входного файла
try:
if not Path(zip_path).exists():
raise FileNotFoundError(f"Файл не найден: {zip_path}")
if not zipfile.is_zipfile(zip_path):
raise InvalidZipFormatError(f"Файл не является ZIP-архивом: {zip_path}")
# Дополнительная проверка содержимого архива
with zipfile.ZipFile(zip_path) as zf:
if not any(name.endswith('metadata.yaml') for name in zf.namelist()):
raise DashboardNotFoundError("Архив не содержит metadata.yaml")
except (FileNotFoundError, InvalidZipFormatError, DashboardNotFoundError) as e:
self.logger.error(f"Ошибка валидации архива: {str(e)}", exc_info=True)
raise
headers = {
k: v for k, v in self.headers.items()
if k.lower() != 'content-type'
}
try:
with open(zip_path, 'rb') as f:
files = {
'formData': (
Path(zip_path).name,
f,
'application/x-zip-compressed'
)
}
response = self.session.post(
url,
files=files,
data={'overwrite': 'true'},
headers=headers,
timeout=self.config.timeout * 2
)
# Обработка HTTP-ошибок
if response.status_code == 404:
raise DashboardNotFoundError("Эндпоинт импорта не найден")
elif response.status_code == 403:
raise PermissionDeniedError("Недостаточно прав для импорта")
elif response.status_code >= 500:
raise SupersetServerError(f"Ошибка сервера: {response.status_code}")
response.raise_for_status()
self.logger.info(f"Дашборд успешно импортирован из {zip_path}")
return response.json()
except requests.exceptions.ConnectionError as e:
error_msg = f"Ошибка соединения: {str(e)}"
self.logger.error(error_msg, exc_info=True)
raise NetworkError(error_msg) from e
except requests.exceptions.Timeout as e:
error_msg = f"Таймаут при импорте дашборда"
self.logger.error(error_msg, exc_info=True)
raise NetworkError(error_msg) from e
except requests.exceptions.RequestException as e:
error_msg = f"Ошибка при импорте: {str(e)}"
self.logger.error(error_msg, exc_info=True)
raise DashboardImportError(error_msg) from e
except Exception as e:
error_msg = f"Неожиданная ошибка: {str(e)}"
self.logger.critical(error_msg, exc_info=True)
raise DashboardImportError(error_msg) from e