# [DEF:superset_tool.utils.network:Module]
#
# @SEMANTICS: network, http, client, api, requests, session, authentication
# @PURPOSE: Encapsulates the low-level HTTP logic used to talk to the Superset API:
#           authentication, session management, retry logic and error handling.
# @LAYER: Infra
# @RELATION: DEPENDS_ON -> superset_tool.exceptions
# @RELATION: DEPENDS_ON -> superset_tool.utils.logger
# @RELATION: DEPENDS_ON -> requests
# @PUBLIC_API: APIClient

# [SECTION: IMPORTS]
from typing import Optional, Dict, Any, List, NoReturn, Union, cast
import json
import io
from pathlib import Path

import requests
from requests.adapters import HTTPAdapter
import urllib3
from urllib3.util.retry import Retry

from superset_tool.exceptions import AuthenticationError, NetworkError, DashboardNotFoundError, SupersetAPIError, PermissionDeniedError
from superset_tool.utils.logger import SupersetLogger
# [/SECTION]


# [DEF:APIClient:Class]
# @PURPOSE: Encapsulates HTTP logic for the Superset API: session, authentication and request handling.
class APIClient:
    """HTTP client for the Superset REST API.

    Holds a configured ``requests.Session`` (with retries), lazily authenticates on
    first use of :attr:`headers`, and translates ``requests`` errors into the
    project's exception types.
    """

    # Default per-request timeout in seconds, used when none is supplied.
    DEFAULT_TIMEOUT = 30

    # [DEF:APIClient.__init__:Function]
    # @PURPOSE: Initialize the client with configuration, an HTTP session and a logger.
    # @PARAM: config (Dict[str, Any]) - Configuration; reads "base_url" and "auth".
    # @PARAM: verify_ssl (bool) - Whether to verify TLS certificates.
    # @PARAM: timeout (int) - Default request timeout in seconds.
    # @PARAM: logger (Optional[SupersetLogger]) - Logger; a new one is created if omitted.
    def __init__(self, config: Dict[str, Any], verify_ssl: bool = True, timeout: int = DEFAULT_TIMEOUT, logger: Optional[SupersetLogger] = None):
        """Create the client.

        Args:
            config: Mapping with ``"base_url"`` (prefix for every endpoint) and
                ``"auth"`` (JSON body posted to ``/security/login``).
            verify_ssl: Whether the session verifies TLS certificates.
            timeout: Default timeout (seconds) applied to requests that do not
                specify their own.
            logger: Optional logger; defaults to ``SupersetLogger(name="APIClient")``.
        """
        self.logger = logger or SupersetLogger(name="APIClient")
        self.logger.info("[APIClient.__init__][Entry] Initializing APIClient.")
        self.base_url: str = config.get("base_url", "")
        self.auth = config.get("auth")
        # Must be set before _init_session(), which reads it.
        self.request_settings = {"verify_ssl": verify_ssl, "timeout": timeout}
        self.session = self._init_session()
        self._tokens: Dict[str, str] = {}
        self._authenticated = False
        self.logger.info("[APIClient.__init__][Exit] APIClient initialized.")
    # [/DEF:APIClient.__init__]

    # [DEF:APIClient._init_session:Function]
    # @PURPOSE: Create and configure a `requests.Session` with retry logic.
    # @RETURN: requests.Session - The configured session.
    def _init_session(self) -> requests.Session:
        """Build a session that retries up to 3 times on 500/502/503/504 with backoff.

        When SSL verification is disabled, urllib3's InsecureRequestWarning is
        silenced process-wide and a warning is logged.
        """
        session = requests.Session()
        retries = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
        adapter = HTTPAdapter(max_retries=retries)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        if not self.request_settings["verify_ssl"]:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
            self.logger.warning("[_init_session][State] SSL verification disabled.")
        session.verify = self.request_settings["verify_ssl"]
        return session
    # [/DEF:APIClient._init_session]

    # [DEF:APIClient.authenticate:Function]
    # @PURPOSE: Authenticate against the Superset API and obtain the access and CSRF tokens.
    # @POST: `self._tokens` is populated and `self._authenticated` is set to `True`.
    # @RETURN: Dict[str, str] - Token dictionary.
    # @THROW: AuthenticationError, NetworkError - on failure.
    def authenticate(self) -> Dict[str, str]:
        """Log in and fetch a CSRF token.

        Posts ``self.auth`` to ``{base_url}/security/login``, then GETs
        ``{base_url}/security/csrf_token/`` with the bearer token.

        Returns:
            ``{"access_token": ..., "csrf_token": ...}``.

        Raises:
            AuthenticationError: if either call returns an HTTP error status.
            NetworkError: on transport errors, or when a response body is not
                JSON or lacks the expected key.
        """
        self.logger.info("[authenticate][Enter] Authenticating to %s", self.base_url)
        try:
            login_url = f"{self.base_url}/security/login"
            response = self.session.post(login_url, json=self.auth, timeout=self.request_settings["timeout"])
            response.raise_for_status()
            access_token = response.json()["access_token"]

            csrf_url = f"{self.base_url}/security/csrf_token/"
            csrf_response = self.session.get(csrf_url, headers={"Authorization": f"Bearer {access_token}"}, timeout=self.request_settings["timeout"])
            csrf_response.raise_for_status()

            self._tokens = {"access_token": access_token, "csrf_token": csrf_response.json()["result"]}
            self._authenticated = True
            self.logger.info("[authenticate][Exit] Authenticated successfully.")
            return self._tokens
        except requests.exceptions.HTTPError as e:
            raise AuthenticationError(f"Authentication failed: {e}") from e
        # ValueError covers a non-JSON body on every requests version (older versions raise a
        # plain json/simplejson decode error that is not a RequestException).
        except (requests.exceptions.RequestException, KeyError, ValueError) as e:
            raise NetworkError(f"Network or parsing error during authentication: {e}") from e
    # [/DEF:APIClient.authenticate]

    # [DEF:APIClient.headers:Function]
    # @PURPOSE: Return HTTP headers for authenticated requests.
    @property
    def headers(self) -> Dict[str, str]:
        """Headers for authenticated JSON requests; authenticates on first access."""
        if not self._authenticated:
            self.authenticate()
        return {
            "Authorization": f"Bearer {self._tokens['access_token']}",
            "X-CSRFToken": self._tokens.get("csrf_token", ""),
            "Referer": self.base_url,
            "Content-Type": "application/json",
        }
    # [/DEF:APIClient.headers]

    # [DEF:APIClient.request:Function]
    # @PURPOSE: Perform a generic HTTP request against the API.
    # @RETURN: `requests.Response` if `raw_response=True`, otherwise the decoded JSON body.
    # @THROW: SupersetAPIError, NetworkError and their subclasses.
    # @PARAM: method (str) - HTTP method.
    # @PARAM: endpoint (str) - API endpoint, appended to base_url.
    # @PARAM: headers (Optional[Dict]) - Extra headers merged over the auth headers.
    # @PARAM: raw_response (bool) - Whether to return the raw response.
    def request(self, method: str, endpoint: str, headers: Optional[Dict] = None, raw_response: bool = False, **kwargs) -> Union[requests.Response, Dict[str, Any]]:
        """Send ``method`` to ``{base_url}{endpoint}`` with authenticated headers.

        Extra ``kwargs`` are forwarded to ``requests.Session.request``. If no
        ``timeout`` is given, the client-wide default is applied.

        Raises:
            DashboardNotFoundError / PermissionDeniedError / AuthenticationError /
            SupersetAPIError: for HTTP error statuses (see ``_handle_http_error``).
            NetworkError: for transport failures.
        """
        full_url = f"{self.base_url}{endpoint}"
        _headers = self.headers.copy()
        if headers:
            _headers.update(headers)
        # requests has no default timeout, so a stalled server would block forever.
        # Fall back to the configured value; an explicit caller value (even None) wins.
        kwargs.setdefault("timeout", self.request_settings["timeout"])
        try:
            response = self.session.request(method, full_url, headers=_headers, **kwargs)
            response.raise_for_status()
            # NOTE: on requests >= 2.27 a non-JSON body raises requests.exceptions.JSONDecodeError,
            # a RequestException subclass, so it is reported through _handle_network_error below.
            return response if raw_response else response.json()
        except requests.exceptions.HTTPError as e:
            self._handle_http_error(e, endpoint)
        except requests.exceptions.RequestException as e:
            self._handle_network_error(e, full_url)
    # [/DEF:APIClient.request]

    # [DEF:APIClient._handle_http_error:Function]
    # @PURPOSE: (Helper) Convert HTTP errors into project-specific exceptions.
    # @PARAM: e (requests.exceptions.HTTPError) - The error.
    # @PARAM: endpoint (str) - The endpoint that failed.
    def _handle_http_error(self, e: requests.exceptions.HTTPError, endpoint: str) -> NoReturn:
        """Always raise: 404 -> DashboardNotFoundError (for any endpoint),
        403 -> PermissionDeniedError, 401 -> AuthenticationError, else SupersetAPIError."""
        status_code = e.response.status_code
        if status_code == 404:
            raise DashboardNotFoundError(endpoint) from e
        if status_code == 403:
            raise PermissionDeniedError() from e
        if status_code == 401:
            raise AuthenticationError() from e
        raise SupersetAPIError(f"API Error {status_code}: {e.response.text}") from e
    # [/DEF:APIClient._handle_http_error]

    # [DEF:APIClient._handle_network_error:Function]
    # @PURPOSE: (Helper) Convert transport errors into `NetworkError`.
    # @PARAM: e (requests.exceptions.RequestException) - The error.
    # @PARAM: url (str) - The URL that failed.
    def _handle_network_error(self, e: requests.exceptions.RequestException, url: str) -> NoReturn:
        """Always raise NetworkError with a short message describing the failure kind."""
        if isinstance(e, requests.exceptions.Timeout):
            msg = "Request timeout"
        elif isinstance(e, requests.exceptions.ConnectionError):
            msg = "Connection error"
        else:
            msg = f"Unknown network error: {e}"
        raise NetworkError(msg, url=url) from e
    # [/DEF:APIClient._handle_network_error]

    # [DEF:APIClient.upload_file:Function]
    # @PURPOSE: Upload a file to the server via multipart/form-data.
    # @RETURN: The API response as a dictionary.
    # @THROW: SupersetAPIError, NetworkError, TypeError.
    # @PARAM: endpoint (str) - API endpoint.
    # @PARAM: file_info (Dict[str, Any]) - File description (see docstring).
    # @PARAM: extra_data (Optional[Dict]) - Extra form fields.
    # @PARAM: timeout (Optional[int]) - Request timeout.
    def upload_file(self, endpoint: str, file_info: Dict[str, Any], extra_data: Optional[Dict] = None, timeout: Optional[int] = None) -> Dict:
        """Upload a file as one multipart part.

        ``file_info`` keys:
            file_obj: path (``str``/``Path``), ``io.BytesIO``, or raw ``bytes``/``bytearray``.
            file_name: name sent for the part; defaults to the path's basename when
                ``file_obj`` is a path (otherwise ``None`` as before).
            form_field: multipart field name (default ``"file"``).
            content_type: MIME type of the part (default ``"application/x-zip-compressed"``).

        Raises:
            TypeError: if ``file_obj`` is of an unsupported type.
        """
        full_url = f"{self.base_url}{endpoint}"
        # Drop the JSON Content-Type so requests can set multipart/form-data with its boundary.
        _headers = self.headers.copy()
        _headers.pop('Content-Type', None)

        file_obj = file_info.get("file_obj")
        file_name = file_info.get("file_name")
        form_field = file_info.get("form_field", "file")
        content_type = file_info.get("content_type", "application/x-zip-compressed")

        if isinstance(file_obj, (str, Path)):
            with open(file_obj, 'rb') as f:
                content = f.read()
            if file_name is None:
                file_name = Path(file_obj).name
        elif isinstance(file_obj, io.BytesIO):
            content = file_obj.getvalue()
        elif isinstance(file_obj, (bytes, bytearray)):
            content = bytes(file_obj)
        else:
            raise TypeError(f"Unsupported file_obj type: {type(file_obj)}")

        files_payload = {form_field: (file_name, content, content_type)}
        return self._perform_upload(full_url, files_payload, extra_data, _headers, timeout)
    # [/DEF:APIClient.upload_file]

    # [DEF:APIClient._perform_upload:Function]
    # @PURPOSE: (Helper) Perform the POST request carrying the file.
    # @PARAM: url (str) - Full URL.
    # @PARAM: files (Dict) - Multipart files payload.
    # @PARAM: data (Optional[Dict]) - Extra form fields.
    # @PARAM: headers (Dict) - Request headers.
    # @PARAM: timeout (Optional[int]) - Timeout; falsy values fall back to the client default.
    # @RETURN: Dict - Decoded JSON response.
    def _perform_upload(self, url: str, files: Dict, data: Optional[Dict], headers: Dict, timeout: Optional[int]) -> Dict:
        """POST the multipart payload and decode the JSON response.

        Raises:
            SupersetAPIError: on an HTTP error status, or when a successful
                response body is not valid JSON.
            NetworkError: on transport failures.
        """
        try:
            response = self.session.post(url, files=files, data=data or {}, headers=headers, timeout=timeout or self.request_settings["timeout"])
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise SupersetAPIError(f"API error during upload: {e.response.text}") from e
        except requests.exceptions.RequestException as e:
            raise NetworkError(f"Network error during upload: {e}", url=url) from e

        # Decode once for every 2xx status. ValueError also covers
        # requests.exceptions.JSONDecodeError, which subclasses it.
        try:
            return response.json()
        except ValueError as json_e:
            self.logger.debug("[_perform_upload][Debug] Response is not valid JSON: %s...", response.text[:200])
            raise SupersetAPIError(f"API error during upload: Response is not valid JSON: {json_e}") from json_e
    # [/DEF:APIClient._perform_upload]

    # [DEF:APIClient.fetch_paginated_count:Function]
    # @PURPOSE: Fetch the total number of items for pagination.
    # @PARAM: endpoint (str) - API endpoint.
    # @PARAM: query_params (Dict) - Query, sent JSON-encoded as the `q` parameter.
    # @PARAM: count_field (str) - Response field holding the count.
    # @RETURN: int - The count (0 if the field is missing).
    def fetch_paginated_count(self, endpoint: str, query_params: Dict, count_field: str = "count") -> int:
        """GET ``endpoint`` with ``q=<json query>`` and return ``response[count_field]`` (default 0)."""
        response_json = cast(Dict[str, Any], self.request("GET", endpoint, params={"q": json.dumps(query_params)}))
        return response_json.get(count_field, 0)
    # [/DEF:APIClient.fetch_paginated_count]

    # [DEF:APIClient.fetch_paginated_data:Function]
    # @PURPOSE: Collect data from every page of a paginated endpoint.
    # @PARAM: endpoint (str) - API endpoint.
    # @PARAM: pagination_options (Dict[str, Any]) - Keys: base_query, total_count, results_field.
    # @RETURN: List[Any] - Concatenated results.
    def fetch_paginated_data(self, endpoint: str, pagination_options: Dict[str, Any]) -> List[Any]:
        """Fetch ceil(total_count / page_size) pages (0-based ``page``) and concatenate
        ``response[results_field]`` from each.

        ``page_size`` is read from ``pagination_options["base_query"]``.

        Raises:
            ValueError: if ``page_size`` is missing or not positive.
        """
        base_query = pagination_options["base_query"]
        total_count = pagination_options["total_count"]
        results_field = pagination_options["results_field"]
        page_size = base_query.get('page_size')
        # Explicit check rather than assert: asserts are stripped under `python -O`.
        if not page_size or page_size <= 0:
            raise ValueError("'page_size' must be a positive number.")

        results: List[Any] = []
        page_count = (total_count + page_size - 1) // page_size  # ceiling division
        for page in range(page_count):
            query = {**base_query, 'page': page}
            response_json = cast(Dict[str, Any], self.request("GET", endpoint, params={"q": json.dumps(query)}))
            results.extend(response_json.get(results_field, []))
        return results
    # [/DEF:APIClient.fetch_paginated_data]

# [/DEF:APIClient]

# [/DEF:superset_tool.utils.network]