# pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument
"""
[MODULE] Superset API Client
@contract: Implements the full interaction with the Superset API
"""
# [IMPORTS] Standard library
import json
from typing import Optional, Dict, Tuple, List, Any, Union
import datetime
from pathlib import Path
import zipfile
from requests import Response
# [IMPORTS] Local modules
from superset_tool.models import SupersetConfig
from superset_tool.exceptions import (
    ExportError,
    InvalidZipFormatError
)
from superset_tool.utils.fileio import get_filename_from_headers
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.network import APIClient
# [CONSTANTS]
DEFAULT_TIMEOUT = 30
# [TYPE-ALIASES]
JsonType = Union[Dict[str, Any], List[Dict[str, Any]]]
ResponseType = Tuple[bytes, str]
class SupersetClient:
"""[MAIN-CONTRACT] Клиент для работы с Superset API"""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация клиента Superset.
# PRECONDITIONS: `config` должен быть валидным `SupersetConfig`.
# POSTCONDITIONS: Клиент успешно инициализирован.
    def __init__(self, config: SupersetConfig, logger: Optional[SupersetLogger] = None):
        self.logger = logger or SupersetLogger(name="SupersetClient")
        self.logger.info("[INFO][SupersetClient.__init__][ENTER] Initializing SupersetClient.")
        self._validate_config(config)
        self.config = config
        self.env = config.env
        self.network = APIClient(
            config=config.dict(),
            verify_ssl=config.verify_ssl,
            timeout=config.timeout,
            logger=self.logger
        )
        self.logger.info("[INFO][SupersetClient.__init__][SUCCESS] SupersetClient initialized successfully.")
    # END_FUNCTION___init__
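    # [USAGE-SKETCH] A minimal, hypothetical construction example; the
    # SupersetConfig keyword arguments are assumptions inferred from the
    # attribute access above (env, verify_ssl, timeout), not a confirmed
    # constructor signature:
    #
    #     config = SupersetConfig(env="dev", verify_ssl=True, timeout=DEFAULT_TIMEOUT)
    #     client = SupersetClient(config, logger=SupersetLogger(name="demo"))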
    # [ENTITY: Function('_validate_config')]
    # CONTRACT:
    # PURPOSE: Validate the client configuration.
    # PRECONDITIONS: `config` must be an instance of `SupersetConfig`.
    # POSTCONDITIONS: The configuration is valid.
    def _validate_config(self, config: SupersetConfig) -> None:
        self.logger.debug("[DEBUG][SupersetClient._validate_config][ENTER] Validating config.")
        if not isinstance(config, SupersetConfig):
            self.logger.error("[ERROR][SupersetClient._validate_config][FAILURE] Invalid config type.")
            raise TypeError("Config must be an instance of SupersetConfig")
        self.logger.debug("[DEBUG][SupersetClient._validate_config][SUCCESS] Config validated.")
    # END_FUNCTION__validate_config
    @property
    def headers(self) -> dict:
        """[INTERFACE] Base headers for API calls."""
        return self.network.headers
    # END_FUNCTION_headers
    # [ENTITY: Function('get_dashboards')]
    # CONTRACT:
    # PURPOSE: Fetch the list of dashboards with pagination.
    # PRECONDITIONS: None
    # POSTCONDITIONS: Returns a tuple of the total count and the list of dashboards.
    def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
        self.logger.info("[INFO][SupersetClient.get_dashboards][ENTER] Getting dashboards.")
        validated_query = self._validate_query_params(query)
        total_count = self._fetch_total_object_count(endpoint="/dashboard/")
        paginated_data = self._fetch_all_pages(
            endpoint="/dashboard/",
            pagination_options={
                "base_query": validated_query,
                "total_count": total_count,
                "results_field": "result",
            }
        )
        self.logger.info("[INFO][SupersetClient.get_dashboards][SUCCESS] Got dashboards.")
        return total_count, paginated_data
    # END_FUNCTION_get_dashboards
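    # [USAGE-SKETCH] Hypothetical call; `query` entries override the defaults
    # built in _validate_query_params, and the result is (total_count, rows).
    # The filter syntax shown is the usual Superset list-endpoint form and is
    # an assumption about what the underlying APIClient accepts:
    #
    #     total, dashboards = client.get_dashboards(
    #         query={"filters": [{"col": "dashboard_title", "opr": "ct", "value": "Sales"}]}
    #     )
    #     published = [d for d in dashboards if d.get("published")]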
    # [ENTITY: Function('get_dashboard')]
    # CONTRACT:
    # PURPOSE: Fetch dashboard metadata by ID or slug.
    # PRECONDITIONS: `dashboard_id_or_slug` must exist.
    # POSTCONDITIONS: Returns the dashboard metadata.
    def get_dashboard(self, dashboard_id_or_slug: str) -> dict:
        self.logger.info(f"[INFO][SupersetClient.get_dashboard][ENTER] Getting dashboard: {dashboard_id_or_slug}")
        response_data = self.network.request(
            method="GET",
            endpoint=f"/dashboard/{dashboard_id_or_slug}",
        )
        self.logger.info(f"[INFO][SupersetClient.get_dashboard][SUCCESS] Got dashboard: {dashboard_id_or_slug}")
        return response_data.get("result", {})
    # END_FUNCTION_get_dashboard
    # [ENTITY: Function('get_datasets')]
    # CONTRACT:
    # PURPOSE: Fetch the list of datasets with pagination.
    # PRECONDITIONS: None
    # POSTCONDITIONS: Returns a tuple of the total count and the list of datasets.
    def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
        self.logger.info("[INFO][SupersetClient.get_datasets][ENTER] Getting datasets.")
        total_count = self._fetch_total_object_count(endpoint="/dataset/")
        base_query = {
            "columns": ["id", "table_name", "sql", "database", "schema"],
            "page": 0,
            "page_size": 100
        }
        validated_query = {**base_query, **(query or {})}
        datasets = self._fetch_all_pages(
            endpoint="/dataset/",
            pagination_options={
                "base_query": validated_query,
                "total_count": total_count,
                "results_field": "result",
            }
        )
        self.logger.info("[INFO][SupersetClient.get_datasets][SUCCESS] Got datasets.")
        return total_count, datasets
    # END_FUNCTION_get_datasets
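    # [USAGE-SKETCH] Hypothetical call; by default each row carries the
    # columns requested in base_query above (id, table_name, sql, database,
    # schema):
    #
    #     total, datasets = client.get_datasets()
    #     virtual = [d for d in datasets if d.get("sql")]  # SQL-backed (virtual) datasets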
    # [ENTITY: Function('get_dataset')]
    # CONTRACT:
    # PURPOSE: Fetch dataset metadata by ID.
    # PRECONDITIONS: `dataset_id` must exist.
    # POSTCONDITIONS: Returns the dataset metadata.
    def get_dataset(self, dataset_id: str) -> dict:
        self.logger.info(f"[INFO][SupersetClient.get_dataset][ENTER] Getting dataset: {dataset_id}")
        response_data = self.network.request(
            method="GET",
            endpoint=f"/dataset/{dataset_id}",
        )
        self.logger.info(f"[INFO][SupersetClient.get_dataset][SUCCESS] Got dataset: {dataset_id}")
        return response_data.get("result", {})
    # END_FUNCTION_get_dataset
    # [ENTITY: Function('get_databases')]
    # CONTRACT:
    # PURPOSE: Fetch the list of databases.
    # PRECONDITIONS: None
    # POSTCONDITIONS: Returns the list of databases.
    def get_databases(self) -> List[Dict]:
        self.logger.info("[INFO][SupersetClient.get_databases][ENTER] Getting databases.")
        response = self.network.request("GET", "/database/")
        self.logger.info("[INFO][SupersetClient.get_databases][SUCCESS] Got databases.")
        return response.get('result', [])
    # END_FUNCTION_get_databases
    # [ENTITY: Function('export_dashboard')]
    # CONTRACT:
    # PURPOSE: Export a dashboard as a ZIP archive.
    # PRECONDITIONS: `dashboard_id` must exist.
    # POSTCONDITIONS: Returns the ZIP archive contents and the file name.
    def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
        self.logger.info(f"[INFO][SupersetClient.export_dashboard][ENTER] Exporting dashboard: {dashboard_id}")
        response = self.network.request(
            method="GET",
            endpoint="/dashboard/export/",
            params={"q": json.dumps([dashboard_id])},
            stream=True,
            raw_response=True
        )
        self._validate_export_response(response, dashboard_id)
        filename = self._resolve_export_filename(response, dashboard_id)
        content = response.content
        self.logger.info(f"[INFO][SupersetClient.export_dashboard][SUCCESS] Exported dashboard: {dashboard_id}")
        return content, filename
    # END_FUNCTION_export_dashboard
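    # [USAGE-SKETCH] Hypothetical export call; `content` holds the raw ZIP
    # bytes and `filename` comes from the Content-Disposition header (or a
    # generated timestamped fallback):
    #
    #     content, filename = client.export_dashboard(42)
    #     Path(filename).write_bytes(content)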
    # [ENTITY: Function('_validate_export_response')]
    # CONTRACT:
    # PURPOSE: Validate the export response.
    # PRECONDITIONS: `response` must be a valid HTTP response.
    # POSTCONDITIONS: The response is valid.
    def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
        self.logger.debug(f"[DEBUG][SupersetClient._validate_export_response][ENTER] Validating export response for dashboard: {dashboard_id}")
        content_type = response.headers.get('Content-Type', '')
        if 'application/zip' not in content_type:
            self.logger.error(f"[ERROR][SupersetClient._validate_export_response][FAILURE] Invalid content type: {content_type}")
            raise ExportError(f"Response is not a ZIP archive (Content-Type: {content_type})")
        if not response.content:
            self.logger.error("[ERROR][SupersetClient._validate_export_response][FAILURE] Empty response content.")
            raise ExportError("Export returned empty data")
        self.logger.debug(f"[DEBUG][SupersetClient._validate_export_response][SUCCESS] Export response validated for dashboard: {dashboard_id}")
    # END_FUNCTION__validate_export_response
    # [ENTITY: Function('_resolve_export_filename')]
    # CONTRACT:
    # PURPOSE: Determine the name of the exported file.
    # PRECONDITIONS: `response` must be a valid HTTP response.
    # POSTCONDITIONS: Returns the file name.
    def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
        self.logger.debug(f"[DEBUG][SupersetClient._resolve_export_filename][ENTER] Resolving export filename for dashboard: {dashboard_id}")
        filename = get_filename_from_headers(response.headers)
        if not filename:
            timestamp = datetime.datetime.now().strftime('%Y%m%dT%H%M%S')
            filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip"
            self.logger.warning(f"[WARNING][SupersetClient._resolve_export_filename][STATE_CHANGE] Could not resolve filename from headers, generated: {filename}")
        self.logger.debug(f"[DEBUG][SupersetClient._resolve_export_filename][SUCCESS] Resolved export filename: {filename}")
        return filename
    # END_FUNCTION__resolve_export_filename
    # [ENTITY: Function('export_to_file')]
    # CONTRACT:
    # PURPOSE: Export a dashboard directly to a file.
    # PRECONDITIONS: `output_dir` must exist.
    # POSTCONDITIONS: The dashboard is saved to a file.
    def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path:
        self.logger.info(f"[INFO][SupersetClient.export_to_file][ENTER] Exporting dashboard {dashboard_id} to file in {output_dir}")
        output_dir = Path(output_dir)
        if not output_dir.exists():
            self.logger.error(f"[ERROR][SupersetClient.export_to_file][FAILURE] Output directory does not exist: {output_dir}")
            raise FileNotFoundError(f"Directory {output_dir} not found")
        content, filename = self.export_dashboard(dashboard_id)
        target_path = output_dir / filename
        with open(target_path, 'wb') as f:
            f.write(content)
        self.logger.info(f"[INFO][SupersetClient.export_to_file][SUCCESS] Exported dashboard {dashboard_id} to {target_path}")
        return target_path
    # END_FUNCTION_export_to_file
    # [ENTITY: Function('import_dashboard')]
    # CONTRACT:
    # PURPOSE: Import a dashboard from a ZIP archive.
    # PRECONDITIONS: `file_name` must be a valid ZIP file.
    # POSTCONDITIONS: Returns the API response.
    def import_dashboard(self, file_name: Union[str, Path]) -> Dict:
        self.logger.info(f"[INFO][SupersetClient.import_dashboard][ENTER] Importing dashboard from: {file_name}")
        self._validate_import_file(file_name)
        import_response = self.network.upload_file(
            endpoint="/dashboard/import/",
            file_info={
                "file_obj": Path(file_name),
                "file_name": Path(file_name).name,
                "form_field": "formData",
            },
            extra_data={'overwrite': 'true'},
            timeout=self.config.timeout * 2
        )
        self.logger.info(f"[INFO][SupersetClient.import_dashboard][SUCCESS] Imported dashboard from: {file_name}")
        return import_response
    # END_FUNCTION_import_dashboard
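    # [USAGE-SKETCH] Hypothetical import call; the archive must contain a
    # metadata.yaml (enforced by _validate_import_file) and existing objects
    # are overwritten because extra_data sends overwrite='true':
    #
    #     result = client.import_dashboard("dashboard_export_42.zip")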
    # [ENTITY: Function('_validate_query_params')]
    # CONTRACT:
    # PURPOSE: Normalize and validate query parameters.
    # PRECONDITIONS: None
    # POSTCONDITIONS: Returns a valid dict of parameters.
    def _validate_query_params(self, query: Optional[Dict]) -> Dict:
        self.logger.debug("[DEBUG][SupersetClient._validate_query_params][ENTER] Validating query params.")
        base_query = {
            "columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"],
            "page": 0,
            "page_size": 1000
        }
        validated_query = {**base_query, **(query or {})}
        self.logger.debug(f"[DEBUG][SupersetClient._validate_query_params][SUCCESS] Validated query params: {validated_query}")
        return validated_query
    # END_FUNCTION__validate_query_params
    # [ENTITY: Function('_fetch_total_object_count')]
    # CONTRACT:
    # PURPOSE: Fetch the total number of objects.
    # PRECONDITIONS: `endpoint` must be valid.
    # POSTCONDITIONS: Returns the total number of objects.
    def _fetch_total_object_count(self, endpoint: str) -> int:
        self.logger.debug(f"[DEBUG][SupersetClient._fetch_total_object_count][ENTER] Fetching total object count for endpoint: {endpoint}")
        query_params_for_count = {'page': 0, 'page_size': 1}
        count = self.network.fetch_paginated_count(
            endpoint=endpoint,
            query_params=query_params_for_count,
            count_field="count"
        )
        self.logger.debug(f"[DEBUG][SupersetClient._fetch_total_object_count][SUCCESS] Fetched total object count: {count}")
        return count
    # END_FUNCTION__fetch_total_object_count
    # [ENTITY: Function('_fetch_all_pages')]
    # CONTRACT:
    # PURPOSE: Walk all pages of a paginated API.
    # PRECONDITIONS: `pagination_options` must contain the required parameters.
    # POSTCONDITIONS: Returns the list of all objects.
    def _fetch_all_pages(self, endpoint: str, pagination_options: Dict) -> List[Dict]:
        self.logger.debug(f"[DEBUG][SupersetClient._fetch_all_pages][ENTER] Fetching all pages for endpoint: {endpoint}")
        all_data = self.network.fetch_paginated_data(
            endpoint=endpoint,
            pagination_options=pagination_options
        )
        self.logger.debug(f"[DEBUG][SupersetClient._fetch_all_pages][SUCCESS] Fetched all pages for endpoint: {endpoint}")
        return all_data
    # END_FUNCTION__fetch_all_pages
    # [ENTITY: Function('_validate_import_file')]
    # CONTRACT:
    # PURPOSE: Check the file before import.
    # PRECONDITIONS: `zip_path` must be a path to a file.
    # POSTCONDITIONS: The file is valid.
    def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
        self.logger.debug(f"[DEBUG][SupersetClient._validate_import_file][ENTER] Validating import file: {zip_path}")
        path = Path(zip_path)
        if not path.exists():
            self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file does not exist: {zip_path}")
            raise FileNotFoundError(f"File {zip_path} does not exist")
        if not zipfile.is_zipfile(path):
            self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file is not a zip file: {zip_path}")
            raise InvalidZipFormatError(f"File {zip_path} is not a ZIP archive")
        with zipfile.ZipFile(path, 'r') as zf:
            if not any(n.endswith('metadata.yaml') for n in zf.namelist()):
                self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file does not contain metadata.yaml: {zip_path}")
                raise InvalidZipFormatError(f"Archive {zip_path} does not contain 'metadata.yaml'")
        self.logger.debug(f"[DEBUG][SupersetClient._validate_import_file][SUCCESS] Validated import file: {zip_path}")
    # END_FUNCTION__validate_import_file
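# [USAGE-SKETCH] A hedged end-to-end example: export a dashboard from one
# environment and re-import it into another. The SupersetConfig keyword
# arguments below are assumptions inferred from attribute access in this
# module (env, verify_ssl, timeout), not a confirmed constructor signature.
if __name__ == "__main__":
    src = SupersetClient(SupersetConfig(env="dev"))    # hypothetical kwargs
    dst = SupersetClient(SupersetConfig(env="prod"))   # hypothetical kwargs
    archive_path = src.export_to_file(dashboard_id=1, output_dir=".")
    dst.import_dashboard(archive_path)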