Files
ss-tools/superset_tool/client.py
Volobuev Andrey 625b50a6d2 Backup
2025-04-08 16:38:58 +03:00

305 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
from requests.exceptions import HTTPError
import urllib3
import json
from typing import Dict, Optional, Tuple, List, Any
from pydantic import BaseModel, Field
from .utils.fileio import *
from .exceptions import *
from .models import SupersetConfig
from .utils.logger import SupersetLogger
class SupersetClient:
def __init__(self, config: SupersetConfig):
self.config = config
self.logger = config.logger or SupersetLogger(console=False)
self.session = requests.Session()
self._setup_session()
self._authenticate()
def _setup_session(self):
adapter = requests.adapters.HTTPAdapter(
max_retries=3,
pool_connections=10,
pool_maxsize=100
)
if not self.config.verify_ssl:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.logger.debug(f"Проверка сертификатов SSL отключена")
self.session.mount('https://', adapter)
self.session.verify = self.config.verify_ssl
def _authenticate(self):
try:
# Сначала логинимся для получения access_token
login_url = f"{self.config.base_url}/security/login"
response = self.session.post(
login_url,
json={
"username": self.config.auth["username"],
"password": self.config.auth["password"],
"provider": self.config.auth["provider"],
"refresh": True
},
verify=self.config.verify_ssl
)
response.raise_for_status()
self.access_token = response.json()["access_token"]
self.logger.info(
f"Токен Bearer {self.access_token} получен c {login_url}")
# Затем получаем CSRF токен с использованием access_token
csrf_url = f"{self.config.base_url}/security/csrf_token/"
response = self.session.get(
csrf_url,
headers={"Authorization": f"Bearer {self.access_token}"},
verify=self.config.verify_ssl
)
response.raise_for_status()
self.csrf_token = response.json()["result"]
self.logger.info(
f"Токен CSRF {self.csrf_token} получен c {csrf_url}")
except HTTPError as e:
if e.response.status_code == 401:
error_msg = f"Неверные данные для аутенфикации для {login_url}" if "login" in e.request.url else f"Не удалось получить CSRF токен с {csrf_url}"
self.logger.error(f"Ошибка получения: {error_msg}")
raise AuthenticationError(
f"{error_msg}. Проверь данные аутенфикации") from e
raise
@property
def headers(self):
return {
"Authorization": f"Bearer {self.access_token}",
"X-CSRFToken": self.csrf_token,
"Referer": self.config.base_url,
"Content-Type": "application/json"
}
def get_dashboard(self, dashboard_id_or_slug: str) -> Dict:
"""
Получаем информацию по дашборду (если передан dashboard_id_or_slug)
Параметры:
:dashboard_id_or_slug - id или короткая ссылка
"""
url = f"{self.config.base_url}/dashboard/{dashboard_id_or_slug}"
self.logger.debug(f"Получаем информацию по дашборду с /{url}...")
try:
response = self.session.get(
url,
headers=self.headers,
timeout=self.config.timeout
)
response.raise_for_status()
self.logger.info(f"ОК - Получили информацию по дашборду с {response.url}")
return response.json()["result"]
except requests.exceptions.RequestException as e:
self.logger.error(
f"Ошибка при получении информации о дашборде: {str(e)}", exc_info=True)
raise SupersetAPIError(
f"Ошибка при получении информации о дашборде: {str(e)}") from e
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""
Получаем информацию по всем дашбордам с учетом пагинации.
Параметры:
query (Optional[Dict]): Дополнительные параметры запроса, включая пагинацию и фильтры.
Возвращает:
Tuple[int, List[Dict]]: Кортеж, содержащий общее количество дашбордов и список всех дашбордов.
"""
url = f"{self.config.base_url}/dashboard/"
self.logger.debug(f"Получаем информацию по дашбордам с {url}...")
modified_query: Dict = {}
all_results: List[Dict] = []
total_count: int = 0
current_page: int = 0
q_param = '{ "columns": [ "id" ], "page": 0, "page_size": 20}'
try:
response = self.session.get(
url=f"{url}?q={q_param}",
#params={"q": json.dumps(default_query)}, # Передаем такой body, иначе на prodta отдает 1 дашборд
headers=self.headers,
timeout=self.config.timeout
)
total_count = response.json()['count']
self.logger.info(
f"ОК - Получили кол-во дашбордов ({total_count}) с {url}")
self.logger.info(f"Запрос - {response.url}")
except requests.exceptions.RequestException as e:
self.logger.error(
f"Ошибка при получении кол-ва дашбордов: {str(e)}", exc_info=True)
raise SupersetAPIError(
f"Ошибка при получении кол-ва дашбордов: {str(e)}") from e
# Инициализация параметров запроса с учетом переданного query
if query:
modified_query = query.copy()
# Убедимся, что page_size установлен, если не передан
modified_query.setdefault("page_size", 20)
else:
modified_query = {
"columns": [
"slug",
"id",
"changed_on_utc",
"dashboard_title",
"published"
],
"page": 0,
"page_size": 20
}
page_size = modified_query["page_size"]
total_pages = (total_count + page_size - 1) // page_size
try:
while current_page < total_pages:
modified_query["page"] = current_page
response = self.session.get(
url,
headers=self.headers,
params={"q": json.dumps(modified_query)},
timeout=self.config.timeout
)
response.raise_for_status()
data = response.json()
all_results.extend(data.get("result", []))
current_page += 1
self.logger.info(f"ОК - Получили информацию по дашбордам с {url}")
# Проверка, достигли ли последней страницы
return total_count, all_results
except requests.exceptions.RequestException as e:
self.logger.error(
f"Ошибка при получении информации о дашбордах: {str(e)}", exc_info=True)
raise SupersetAPIError(
f"Ошибка при получении информации о дашбордах: {str(e)}") from e
def export_dashboard(self, dashboard_id: int, logger: Optional[SupersetLogger] = None) -> Tuple[bytes, str]:
"""Экспортирует дашборд из Superset в виде ZIP-архива и возвращает его содержимое с именем файла.
Параметры:
:dashboard_id (int): Идентификатор дашборда для экспорта
Возвращает:
Tuple[bytes, str]: Кортеж, содержащий:
- bytes: Бинарное содержимое ZIP-архива с дашбордом
- str: Имя файла (из заголовков ответа или в формате dashboard_{id}.zip)
Исключения:
SupersetAPIError: Вызывается при ошибках:
- Проблемы с сетью/соединением
- Невалидный ID дашборда
- Отсутствие прав доступа
- Ошибки сервера (status code >= 400)
Пример использования:
content, filename = client.export_dashboard(5)
with open(filename, 'wb') as f:
f.write(content)
Примечания:
- Для экспорта используется API Endpoint: /dashboard/export/
- Имя файла пытается извлечь из Content-Disposition заголовка
- По умолчанию возвращает имя в формате dashboard_{id}.zip
- Архив содержит JSON-метаданные и связанные элементы (датасеты, чарты)
"""
url = f"{self.config.base_url}/dashboard/export/"
params = {"q": f"[{dashboard_id}]"}
logger = logger or SupersetLogger(name="client", console=False)
self.logger.debug(f"Экспортируем дашборд ID {dashboard_id} c {url}...")
try:
response = self.session.get(
url,
headers=self.headers,
params=params,
timeout=self.config.timeout
)
response.raise_for_status()
filename = get_filename_from_headers(
response.headers) or f"dashboard_{dashboard_id}.zip"
self.logger.info(f"Дашборд сохранен в {filename}")
return response.content, filename
except requests.exceptions.RequestException as e:
self.logger.error(f"Ошибка при экспорте: {str(e)}", exc_info=True)
raise SupersetAPIError(f"Export failed: {str(e)}") from e
def import_dashboard(self, zip_path) -> Dict:
"""Импортирует дашборд в Superset из ZIP-архива.
Параметры:
zip_path (Path): Путь к ZIP-файлу с дашбордом для импорта
Возвращает:
dict: Ответ API в формате JSON с результатами импорта
Исключения:
RuntimeError: Вызывается при:
- Ошибках сети/соединения
- Невалидном формате ZIP-архива
- Конфликте прав доступа
- Ошибках сервера (status code >= 400)
- Попытке перезаписи без соответствующих прав
Пример использования:
result = client.import_dashboard(Path("my_dashboard.zip"))
print(f"Импортирован дашборд: {result['title']}")
Примечания:
- Использует API Endpoint: /dashboard/import/
- Автоматически устанавливает overwrite=true для перезаписи существующих дашбордов
- Удваивает стандартный таймаут для обработки длительных операций
- Удаляет Content-Type заголовок (автоматически генерируется для multipart/form-data)
- Архив должен содержать валидные JSON-метаданные в формате Superset
- Ответ может содержать информацию о созданных/обновленных ресурсах (датасеты, чарты, владельцы)
- При конфликте имен может потребоваться ручное разрешение через параметры импорта
"""
url = f"{self.config.base_url}/dashboard/import/"
self.logger.debug(f"Импортируем дашборд ID {zip_path} на {url}...")
headers_without_content_type = {
k: v for k, v in self.headers.items() if k.lower() != 'content-type'}
zip_name = zip_path.name
# Подготавливаем данные для multipart/form-data
with open(zip_path, 'rb') as f:
files = {
'formData': (
zip_name, # Имя файла
f, # Файловый объект
'application/x-zip-compressed' # MIME-тип из curl
)
}
# Отправляем запрос
response = self.session.post(
url,
files=files,
data={'overwrite': 'true'},
headers=headers_without_content_type,
timeout=self.config.timeout * 2 # Longer timeout for imports
)
# Обрабатываем ответ
try:
response.raise_for_status()
self.logger.info(f"Дашборд импортирован успешно")
return response.json()
except requests.exceptions.HTTPError as e:
self.logger.error(f"Ошибка при импорте: {str(e)}", exc_info=True)
error_detail = f"{e.response.status_code} {e.response.reason}"
if e.response.text:
error_detail += f"\nТело ответа: {e.response.text}"
raise RuntimeError(f"Ошибка импорта: {error_detail}") from e