Files
ss-tools/superset_tool/client.py
2026-01-19 00:07:06 +03:00

509 lines
33 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# [DEF:superset_tool.client:Module]
#
# @SEMANTICS: superset, api, client, rest, http, dashboard, dataset, import, export
# @PURPOSE: Предоставляет высокоуровневый клиент для взаимодействия с Superset REST API, инкапсулируя логику запросов, обработку ошибок и пагинацию.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> superset_tool.models
# @RELATION: DEPENDS_ON -> superset_tool.exceptions
# @RELATION: DEPENDS_ON -> superset_tool.utils
#
# @INVARIANT: All network operations must use the internal APIClient instance.
# @CONSTRAINT: No direct use of 'requests' library outside of APIClient.
# @PUBLIC_API: SupersetClient
# [SECTION: IMPORTS]
import json
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from requests import Response
from superset_tool.models import SupersetConfig
from superset_tool.exceptions import ExportError, InvalidZipFormatError
from superset_tool.utils.fileio import get_filename_from_headers
from superset_tool.utils.logger import SupersetLogger, belief_scope
from superset_tool.utils.network import APIClient
# [/SECTION]
# [DEF:SupersetClient:Class]
# @PURPOSE: Класс-обёртка над Superset REST API, предоставляющий методы для работы с дашбордами и датасетами.
# @RELATION: CREATES_INSTANCE_OF -> APIClient
# @RELATION: USES -> SupersetConfig
class SupersetClient:
# [DEF:__init__:Function]
# @PURPOSE: Инициализирует клиент, проверяет конфигурацию и создает сетевой клиент.
# @PRE: `config` должен быть валидным объектом SupersetConfig.
# @POST: Атрибуты `logger`, `config`, и `network` созданы и готовы к работе.
# @PARAM: config (SupersetConfig) - Конфигурация подключения.
# @PARAM: logger (Optional[SupersetLogger]) - Экземпляр логгера.
def __init__(self, config: SupersetConfig, logger: Optional[SupersetLogger] = None):
with belief_scope("__init__"):
self.logger = logger or SupersetLogger(name="SupersetClient")
self.logger.info("[SupersetClient.__init__][Enter] Initializing SupersetClient.")
self._validate_config(config)
self.config = config
self.network = APIClient(
config=config.dict(),
verify_ssl=config.verify_ssl,
timeout=config.timeout,
logger=self.logger,
)
self.delete_before_reimport: bool = False
self.logger.info("[SupersetClient.__init__][Exit] SupersetClient initialized.")
# [/DEF:__init__:Function]
# [DEF:_validate_config:Function]
# @PURPOSE: Проверяет, что переданный объект конфигурации имеет корректный тип.
# @PRE: `config` должен быть передан.
# @POST: Если проверка пройдена, выполнение продолжается.
# @THROW: TypeError - Если `config` не является экземпляром `SupersetConfig`.
# @PARAM: config (SupersetConfig) - Объект для проверки.
def _validate_config(self, config: SupersetConfig) -> None:
with belief_scope("_validate_config"):
self.logger.debug("[_validate_config][Enter] Validating SupersetConfig.")
assert isinstance(config, SupersetConfig), "Конфигурация должна быть экземпляром SupersetConfig"
self.logger.debug("[_validate_config][Exit] Config is valid.")
# [/DEF:_validate_config:Function]
@property
# [DEF:headers:Function]
# @PURPOSE: Возвращает базовые HTTP-заголовки, используемые сетевым клиентом.
# @PRE: self.network должен быть инициализирован.
# @POST: Возвращаемый словарь содержит актуальные заголовки, включая токен авторизации.
def headers(self) -> dict:
with belief_scope("headers"):
return self.network.headers
# [/DEF:headers:Function]
# [DEF:get_dashboards:Function]
# @PURPOSE: Получает полный список дашбордов, автоматически обрабатывая пагинацию.
# @RELATION: CALLS -> self._fetch_total_object_count
# @RELATION: CALLS -> self._fetch_all_pages
# @PRE: self.network должен быть инициализирован.
# @POST: Возвращаемый список содержит все дашборды, доступные по API.
# @THROW: APIError - В случае ошибки сетевого запроса.
# @PARAM: query (Optional[Dict]) - Дополнительные параметры запроса для API.
# @RETURN: Tuple[int, List[Dict]] - Кортеж (общее количество, список дашбордов).
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_dashboards"):
assert self.network, "[get_dashboards][PRE] Network client must be initialized."
self.logger.info("[get_dashboards][Enter] Fetching dashboards.")
validated_query = self._validate_query_params(query or {})
if 'columns' not in validated_query:
validated_query['columns'] = ["slug", "id", "changed_on_utc", "dashboard_title", "published"]
total_count = self._fetch_total_object_count(endpoint="/dashboard/")
paginated_data = self._fetch_all_pages(
endpoint="/dashboard/",
pagination_options={"base_query": validated_query, "total_count": total_count, "results_field": "result"},
)
self.logger.info("[get_dashboards][Exit] Found %d dashboards.", total_count)
return total_count, paginated_data
validated_query['columns'] = ["slug", "id", "changed_on_utc", "dashboard_title", "published"]
total_count = self._fetch_total_object_count(endpoint="/dashboard/")
paginated_data = self._fetch_all_pages(
endpoint="/dashboard/",
pagination_options={"base_query": validated_query, "total_count": total_count, "results_field": "result"},
)
self.logger.info("[get_dashboards][Exit] Found %d dashboards.", total_count)
return total_count, paginated_data
# [/DEF:get_dashboards:Function]
# [DEF:export_dashboard:Function]
# @PURPOSE: Экспортирует дашборд в виде ZIP-архива.
# @RELATION: CALLS -> self.network.request
# @PRE: dashboard_id должен быть положительным целым числом.
# @POST: Возвращает бинарное содержимое ZIP-архива и имя файла.
# @THROW: ExportError - Если экспорт завершился неудачей.
# @PARAM: dashboard_id (int) - ID дашборда для экспорта.
# @RETURN: Tuple[bytes, str] - Бинарное содержимое ZIP-архива и имя файла.
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
with belief_scope("export_dashboard"):
assert isinstance(dashboard_id, int) and dashboard_id > 0, "[export_dashboard][PRE] dashboard_id must be a positive integer."
self.logger.info("[export_dashboard][Enter] Exporting dashboard %s.", dashboard_id)
response = self.network.request(
method="GET",
endpoint="/dashboard/export/",
params={"q": json.dumps([dashboard_id])},
stream=True,
raw_response=True,
)
response = cast(Response, response)
self._validate_export_response(response, dashboard_id)
filename = self._resolve_export_filename(response, dashboard_id)
self.logger.info("[export_dashboard][Exit] Exported dashboard %s to %s.", dashboard_id, filename)
return response.content, filename
# [/DEF:export_dashboard:Function]
# [DEF:import_dashboard:Function]
# @PURPOSE: Импортирует дашборд из ZIP-файла с возможностью автоматического удаления и повторной попытки при ошибке.
# @RELATION: CALLS -> self._do_import
# @RELATION: CALLS -> self.delete_dashboard
# @RELATION: CALLS -> self.get_dashboards
# @PRE: Файл, указанный в `file_name`, должен существовать и быть валидным ZIP-архивом Superset.
# @POST: Дашборд успешно импортирован, возвращен ответ API.
# @THROW: FileNotFoundError - Если файл не найден.
# @THROW: InvalidZipFormatError - Если файл не является валидным ZIP-архивом Superset.
# @PARAM: file_name (Union[str, Path]) - Путь к ZIP-архиву.
# @PARAM: dash_id (Optional[int]) - ID дашборда для удаления при сбое.
# @PARAM: dash_slug (Optional[str]) - Slug дашборда для поиска ID, если ID не предоставлен.
# @RETURN: Dict - Ответ API в случае успеха.
def import_dashboard(self, file_name: Union[str, Path], dash_id: Optional[int] = None, dash_slug: Optional[str] = None) -> Dict:
with belief_scope("import_dashboard"):
assert file_name, "[import_dashboard][PRE] file_name must be provided."
file_path = str(file_name)
self._validate_import_file(file_path)
try:
return self._do_import(file_path)
except Exception as exc:
self.logger.error("[import_dashboard][Failure] First import attempt failed: %s", exc, exc_info=True)
if not self.delete_before_reimport:
raise
target_id = self._resolve_target_id_for_delete(dash_id, dash_slug)
if target_id is None:
self.logger.error("[import_dashboard][Failure] No ID available for delete-retry.")
raise
self.delete_dashboard(target_id)
self.logger.info("[import_dashboard][State] Deleted dashboard ID %s, retrying import.", target_id)
return self._do_import(file_path)
# [/DEF:import_dashboard:Function]
# [DEF:_resolve_target_id_for_delete:Function]
# @PURPOSE: Определяет ID дашборда для удаления, используя ID или slug.
# @PARAM: dash_id (Optional[int]) - ID дашборда.
# @PARAM: dash_slug (Optional[str]) - Slug дашборда.
# @PRE: По крайней мере один из параметров (dash_id или dash_slug) должен быть предоставлен.
# @POST: Возвращает ID дашборда, если найден, иначе None.
# @THROW: APIError - В случае ошибки сетевого запроса при поиске по slug.
# @RETURN: Optional[int] - Найденный ID или None.
def _resolve_target_id_for_delete(self, dash_id: Optional[int], dash_slug: Optional[str]) -> Optional[int]:
with belief_scope("_resolve_target_id_for_delete"):
assert dash_id is not None or dash_slug is not None, "[_resolve_target_id_for_delete][PRE] At least one of ID or slug must be provided."
if dash_id is not None:
return dash_id
if dash_slug is not None:
self.logger.debug("[_resolve_target_id_for_delete][State] Resolving ID by slug '%s'.", dash_slug)
try:
_, candidates = self.get_dashboards(query={"filters": [{"col": "slug", "op": "eq", "value": dash_slug}]})
if candidates:
target_id = candidates[0]["id"]
self.logger.debug("[_resolve_target_id_for_delete][Success] Resolved slug to ID %s.", target_id)
return target_id
except Exception as e:
self.logger.warning("[_resolve_target_id_for_delete][Warning] Could not resolve slug '%s' to ID: %s", dash_slug, e)
return None
self.logger.debug("[_resolve_target_id_for_delete][State] Resolving ID by slug '%s'.", dash_slug)
try:
_, candidates = self.get_dashboards(query={"filters": [{"col": "slug", "op": "eq", "value": dash_slug}]})
if candidates:
target_id = candidates[0]["id"]
self.logger.debug("[_resolve_target_id_for_delete][Success] Resolved slug to ID %s.", target_id)
return target_id
except Exception as e:
self.logger.warning("[_resolve_target_id_for_delete][Warning] Could not resolve slug '%s' to ID: %s", dash_slug, e)
return None
# [/DEF:_resolve_target_id_for_delete:Function]
# [DEF:_do_import:Function]
# @PURPOSE: Выполняет один запрос на импорт без обработки исключений.
# @PRE: Файл должен существовать.
# @POST: Файл успешно загружен, возвращен ответ API.
# @THROW: FileNotFoundError - Если файл не существует.
# @PARAM: file_name (Union[str, Path]) - Путь к файлу.
# @RETURN: Dict - Ответ API.
def _do_import(self, file_name: Union[str, Path]) -> Dict:
with belief_scope("_do_import"):
self.logger.debug(f"[_do_import][State] Uploading file: {file_name}")
file_path = Path(file_name)
if file_path.exists():
self.logger.debug(f"[_do_import][State] File size: {file_path.stat().st_size} bytes")
else:
self.logger.error(f"[_do_import][Failure] File does not exist: {file_name}")
raise FileNotFoundError(f"File does not exist: {file_name}")
return self.network.upload_file(
endpoint="/dashboard/import/",
file_info={"file_obj": file_path, "file_name": file_path.name, "form_field": "formData"},
extra_data={"overwrite": "true"},
timeout=self.config.timeout * 2,
)
# [/DEF:_do_import:Function]
# [DEF:delete_dashboard:Function]
# @PURPOSE: Удаляет дашборд по его ID или slug.
# @RELATION: CALLS -> self.network.request
# @PRE: dashboard_id должен быть предоставлен.
# @POST: Дашборд удален или залогировано предупреждение.
# @THROW: APIError - В случае ошибки сетевого запроса.
# @PARAM: dashboard_id (Union[int, str]) - ID или slug дашборда.
def delete_dashboard(self, dashboard_id: Union[int, str]) -> None:
with belief_scope("delete_dashboard"):
assert dashboard_id, "[delete_dashboard][PRE] dashboard_id must be provided."
self.logger.info("[delete_dashboard][Enter] Deleting dashboard %s.", dashboard_id)
response = self.network.request(method="DELETE", endpoint=f"/dashboard/{dashboard_id}")
response = cast(Dict, response)
if response.get("result", True) is not False:
self.logger.info("[delete_dashboard][Success] Dashboard %s deleted.", dashboard_id)
else:
self.logger.warning("[delete_dashboard][Warning] Unexpected response while deleting %s: %s", dashboard_id, response)
# [/DEF:delete_dashboard:Function]
# [DEF:_extract_dashboard_id_from_zip:Function]
# @PURPOSE: Извлекает ID дашборда из `metadata.yaml` внутри ZIP-архива.
# @PARAM: file_name (Union[str, Path]) - Путь к ZIP-файлу.
# @PRE: Файл, указанный в `file_name`, должен быть валидным ZIP-архивом.
# @POST: Возвращает ID дашборда, если найден в metadata.yaml, иначе None.
# @THROW: ImportError - Если не установлен `yaml`.
# @RETURN: Optional[int] - ID дашборда или None.
def _extract_dashboard_id_from_zip(self, file_name: Union[str, Path]) -> Optional[int]:
with belief_scope("_extract_dashboard_id_from_zip"):
assert zipfile.is_zipfile(file_name), "[_extract_dashboard_id_from_zip][PRE] file_name must be a valid zip file."
try:
import yaml
with zipfile.ZipFile(file_name, "r") as zf:
for name in zf.namelist():
if name.endswith("metadata.yaml"):
with zf.open(name) as meta_file:
meta = yaml.safe_load(meta_file)
dash_id = meta.get("dashboard_uuid") or meta.get("dashboard_id")
if dash_id: return int(dash_id)
except Exception as exc:
self.logger.error("[_extract_dashboard_id_from_zip][Failure] %s", exc, exc_info=True)
return None
# [/DEF:_extract_dashboard_id_from_zip:Function]
# [DEF:_extract_dashboard_slug_from_zip:Function]
# @PURPOSE: Извлекает slug дашборда из `metadata.yaml` внутри ZIP-архива.
# @PARAM: file_name (Union[str, Path]) - Путь к ZIP-файлу.
# @PRE: Файл, указанный в `file_name`, должен быть валидным ZIP-архивом.
# @POST: Возвращает slug дашборда, если найден в metadata.yaml, иначе None.
# @THROW: ImportError - Если не установлен `yaml`.
# @RETURN: Optional[str] - Slug дашборда или None.
def _extract_dashboard_slug_from_zip(self, file_name: Union[str, Path]) -> Optional[str]:
with belief_scope("_extract_dashboard_slug_from_zip"):
assert zipfile.is_zipfile(file_name), "[_extract_dashboard_slug_from_zip][PRE] file_name must be a valid zip file."
try:
import yaml
with zipfile.ZipFile(file_name, "r") as zf:
for name in zf.namelist():
if name.endswith("metadata.yaml"):
with zf.open(name) as meta_file:
meta = yaml.safe_load(meta_file)
if slug := meta.get("slug"):
return str(slug)
except Exception as exc:
self.logger.error("[_extract_dashboard_slug_from_zip][Failure] %s", exc, exc_info=True)
return None
# [/DEF:_extract_dashboard_slug_from_zip:Function]
# [DEF:_validate_export_response:Function]
# @PURPOSE: Проверяет, что HTTP-ответ на экспорт является валидным ZIP-архивом.
# @PRE: response должен быть объектом requests.Response.
# @POST: Проверка пройдена, если ответ является непустым ZIP-архивом.
# @THROW: ExportError - Если ответ не является ZIP-архивом или пуст.
# @PARAM: response (Response) - HTTP ответ.
# @PARAM: dashboard_id (int) - ID дашборда.
def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
with belief_scope("_validate_export_response"):
assert isinstance(response, Response), "[_validate_export_response][PRE] response must be a requests.Response object."
content_type = response.headers.get("Content-Type", "")
if "application/zip" not in content_type:
raise ExportError(f"Получен не ZIP-архив (Content-Type: {content_type})")
if not response.content:
raise ExportError("Получены пустые данные при экспорте")
# [/DEF:_validate_export_response:Function]
# [DEF:_resolve_export_filename:Function]
# @PURPOSE: Определяет имя файла для экспорта из заголовков или генерирует его.
# @PRE: response должен быть объектом requests.Response.
# @POST: Возвращает непустое имя файла.
# @PARAM: response (Response) - HTTP ответ.
# @PARAM: dashboard_id (int) - ID дашборда.
# @RETURN: str - Имя файла.
def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
with belief_scope("_resolve_export_filename"):
assert isinstance(response, Response), "[_resolve_export_filename][PRE] response must be a requests.Response object."
filename = get_filename_from_headers(dict(response.headers))
if not filename:
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip"
self.logger.warning("[_resolve_export_filename][Warning] Generated filename: %s", filename)
return filename
# [/DEF:_resolve_export_filename:Function]
# [DEF:_validate_query_params:Function]
# @PURPOSE: Формирует корректный набор параметров запроса с пагинацией.
# @PARAM: query (Optional[Dict]) - Исходные параметры.
# @PRE: query, если предоставлен, должен быть словарем.
# @POST: Возвращает словарь, содержащий базовые параметры пагинации, объединенные с `query`.
# @RETURN: Dict - Валидированные параметры.
def _validate_query_params(self, query: Optional[Dict]) -> Dict:
with belief_scope("_validate_query_params"):
assert query is None or isinstance(query, dict), "[_validate_query_params][PRE] query must be a dictionary or None."
base_query = {"page": 0, "page_size": 1000}
return {**base_query, **(query or {})}
# [/DEF:_validate_query_params:Function]
# [DEF:_fetch_total_object_count:Function]
# @PURPOSE: Получает общее количество объектов по указанному эндпоинту для пагинации.
# @PARAM: endpoint (str) - API эндпоинт.
# @PRE: endpoint должен быть непустой строкой.
# @POST: Возвращает общее количество объектов (>= 0).
# @THROW: APIError - В случае ошибки сетевого запроса.
# @RETURN: int - Количество объектов.
def _fetch_total_object_count(self, endpoint: str) -> int:
with belief_scope("_fetch_total_object_count"):
assert endpoint and isinstance(endpoint, str), "[_fetch_total_object_count][PRE] endpoint must be a non-empty string."
return self.network.fetch_paginated_count(
endpoint=endpoint,
query_params={"page": 0, "page_size": 1},
count_field="count",
)
# [/DEF:_fetch_total_object_count:Function]
# [DEF:_fetch_all_pages:Function]
# @PURPOSE: Итерируется по всем страницам пагинированного API и собирает все данные.
# @PARAM: endpoint (str) - API эндпоинт.
# @PARAM: pagination_options (Dict) - Опции пагинации.
# @PRE: endpoint должен быть непустой строкой, pagination_options - словарем.
# @POST: Возвращает полный список объектов.
# @THROW: APIError - В случае ошибки сетевого запроса.
# @RETURN: List[Dict] - Список всех объектов.
def _fetch_all_pages(self, endpoint: str, pagination_options: Dict) -> List[Dict]:
with belief_scope("_fetch_all_pages"):
assert endpoint and isinstance(endpoint, str), "[_fetch_all_pages][PRE] endpoint must be a non-empty string."
assert isinstance(pagination_options, dict), "[_fetch_all_pages][PRE] pagination_options must be a dictionary."
return self.network.fetch_paginated_data(endpoint=endpoint, pagination_options=pagination_options)
# [/DEF:_fetch_all_pages:Function]
# [DEF:_validate_import_file:Function]
# @PURPOSE: Проверяет, что файл существует, является ZIP-архивом и содержит `metadata.yaml`.
# @PRE: zip_path должен быть предоставлен.
# @POST: Проверка пройдена, если файл существует, является ZIP и содержит `metadata.yaml`.
# @THROW: FileNotFoundError - Если файл не найден.
# @THROW: InvalidZipFormatError - Если файл не является ZIP или не содержит `metadata.yaml`.
# @PARAM: zip_path (Union[str, Path]) - Путь к файлу.
def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
with belief_scope("_validate_import_file"):
assert zip_path, "[_validate_import_file][PRE] zip_path must be provided."
path = Path(zip_path)
assert path.exists(), f"Файл {zip_path} не существует"
assert zipfile.is_zipfile(path), f"Файл {zip_path} не является ZIP-архивом"
with zipfile.ZipFile(path, "r") as zf:
assert any(n.endswith("metadata.yaml") for n in zf.namelist()), f"Архив {zip_path} не содержит 'metadata.yaml'"
# [/DEF:_validate_import_file:Function]
# [DEF:get_datasets:Function]
# @PURPOSE: Получает полный список датасетов, автоматически обрабатывая пагинацию.
# @RELATION: CALLS -> self._fetch_total_object_count
# @RELATION: CALLS -> self._fetch_all_pages
# @PARAM: query (Optional[Dict]) - Дополнительные параметры запроса.
# @PRE: self.network должен быть инициализирован.
# @POST: Возвращаемый список содержит все датасеты, доступные по API.
# @THROW: APIError - В случае ошибки сетевого запроса.
# @RETURN: Tuple[int, List[Dict]] - Кортеж (общее количество, список датасетов).
def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_datasets"):
assert self.network, "[get_datasets][PRE] Network client must be initialized."
self.logger.info("[get_datasets][Enter] Fetching datasets.")
validated_query = self._validate_query_params(query)
total_count = self._fetch_total_object_count(endpoint="/dataset/")
paginated_data = self._fetch_all_pages(
endpoint="/dataset/",
pagination_options={"base_query": validated_query, "total_count": total_count, "results_field": "result"},
)
self.logger.info("[get_datasets][Exit] Found %d datasets.", total_count)
return total_count, paginated_data
# [/DEF:get_datasets:Function]
# [DEF:get_databases:Function]
# @PURPOSE: Получает полный список баз данных, автоматически обрабатывая пагинацию.
# @RELATION: CALLS -> self._fetch_total_object_count
# @RELATION: CALLS -> self._fetch_all_pages
# @PARAM: query (Optional[Dict]) - Дополнительные параметры запроса.
# @PRE: self.network должен быть инициализирован.
# @POST: Возвращаемый список содержит все базы данных, доступные по API.
# @THROW: APIError - В случае ошибки сетевого запроса.
# @RETURN: Tuple[int, List[Dict]] - Кортеж (общее количество, список баз данных).
def get_databases(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_databases"):
assert self.network, "[get_databases][PRE] Network client must be initialized."
self.logger.info("[get_databases][Enter] Fetching databases.")
validated_query = self._validate_query_params(query or {})
if 'columns' not in validated_query:
validated_query['columns'] = []
total_count = self._fetch_total_object_count(endpoint="/database/")
paginated_data = self._fetch_all_pages(
endpoint="/database/",
pagination_options={"base_query": validated_query, "total_count": total_count, "results_field": "result"},
)
self.logger.info("[get_databases][Exit] Found %d databases.", total_count)
return total_count, paginated_data
# [/DEF:get_databases:Function]
# [DEF:get_dataset:Function]
# @PURPOSE: Получает информацию о конкретном датасете по его ID.
# @RELATION: CALLS -> self.network.request
# @PARAM: dataset_id (int) - ID датасета.
# @PRE: dataset_id должен быть положительным целым числом.
# @POST: Возвращает словарь с информацией о датасете.
# @THROW: APIError - В случае ошибки сетевого запроса или если датасет не найден.
# @RETURN: Dict - Информация о датасете.
def get_dataset(self, dataset_id: int) -> Dict:
with belief_scope("get_dataset"):
assert isinstance(dataset_id, int) and dataset_id > 0, "[get_dataset][PRE] dataset_id must be a positive integer."
self.logger.info("[get_dataset][Enter] Fetching dataset %s.", dataset_id)
response = self.network.request(method="GET", endpoint=f"/dataset/{dataset_id}")
response = cast(Dict, response)
self.logger.info("[get_dataset][Exit] Got dataset %s.", dataset_id)
return response
# [/DEF:get_dataset:Function]
# [DEF:get_database:Function]
# @PURPOSE: Получает информацию о конкретной базе данных по её ID.
# @RELATION: CALLS -> self.network.request
# @PARAM: database_id (int) - ID базы данных.
# @PRE: database_id должен быть положительным целым числом.
# @POST: Возвращает словарь с информацией о базе данных.
# @THROW: APIError - В случае ошибки сетевого запроса или если база данных не найдена.
# @RETURN: Dict - Информация о базе данных.
def get_database(self, database_id: int) -> Dict:
with belief_scope("get_database"):
assert isinstance(database_id, int) and database_id > 0, "[get_database][PRE] database_id must be a positive integer."
self.logger.info("[get_database][Enter] Fetching database %s.", database_id)
response = self.network.request(method="GET", endpoint=f"/database/{database_id}")
response = cast(Dict, response)
self.logger.info("[get_database][Exit] Got database %s.", database_id)
return response
# [/DEF:get_database:Function]
# [DEF:update_dataset:Function]
# @PURPOSE: Обновляет данные датасета по его ID.
# @RELATION: CALLS -> self.network.request
# @PARAM: dataset_id (int) - ID датасета.
# @PARAM: data (Dict) - Данные для обновления.
# @PRE: dataset_id должен быть положительным целым числом, data - непустым словарем.
# @POST: Датасет успешно обновлен, возвращен ответ API.
# @THROW: APIError - В случае ошибки сетевого запроса.
# @RETURN: Dict - Ответ API.
def update_dataset(self, dataset_id: int, data: Dict) -> Dict:
with belief_scope("update_dataset"):
assert isinstance(dataset_id, int) and dataset_id > 0, "[update_dataset][PRE] dataset_id must be a positive integer."
assert isinstance(data, dict) and data, "[update_dataset][PRE] data must be a non-empty dictionary."
self.logger.info("[update_dataset][Enter] Updating dataset %s.", dataset_id)
response = self.network.request(
method="PUT",
endpoint=f"/dataset/{dataset_id}",
data=json.dumps(data),
headers={'Content-Type': 'application/json'}
)
response = cast(Dict, response)
self.logger.info("[update_dataset][Exit] Updated dataset %s.", dataset_id)
return response
# [/DEF:update_dataset:Function]
# [/DEF:SupersetClient:Class]
# [/DEF:superset_tool.client:Module]