Files
ss-tools/superset_tool/utils/network.py
2026-01-18 21:29:54 +03:00

266 lines
15 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# [DEF:superset_tool.utils.network:Module]
#
# @SEMANTICS: network, http, client, api, requests, session, authentication
# @PURPOSE: Инкапсулирует низкоуровневую HTTP-логику для взаимодействия с Superset API, включая аутентификацию, управление сессией, retry-логику и обработку ошибок.
# @LAYER: Infra
# @RELATION: DEPENDS_ON -> superset_tool.exceptions
# @RELATION: DEPENDS_ON -> superset_tool.utils.logger
# @RELATION: DEPENDS_ON -> requests
# @PUBLIC_API: APIClient
# [SECTION: IMPORTS]
from typing import Optional, Dict, Any, List, Union, cast
import json
import io
from pathlib import Path
import requests
from requests.adapters import HTTPAdapter
import urllib3
from superset_tool.utils.logger import belief_scope
from urllib3.util.retry import Retry
from superset_tool.exceptions import AuthenticationError, NetworkError, DashboardNotFoundError, SupersetAPIError, PermissionDeniedError
from superset_tool.utils.logger import SupersetLogger
# [/SECTION]
# [DEF:APIClient:Class]
# @PURPOSE: Инкапсулирует HTTP-логику для работы с API, включая сессии, аутентификацию, и обработку запросов.
class APIClient:
DEFAULT_TIMEOUT = 30
# [DEF:__init__:Function]
# @PURPOSE: Инициализирует API клиент с конфигурацией, сессией и логгером.
# @PARAM: config (Dict[str, Any]) - Конфигурация.
# @PARAM: verify_ssl (bool) - Проверять ли SSL.
# @PARAM: timeout (int) - Таймаут запросов.
# @PARAM: logger (Optional[SupersetLogger]) - Логгер.
# @PRE: config must contain 'base_url' and 'auth'.
# @POST: APIClient instance is initialized with a session.
def __init__(self, config: Dict[str, Any], verify_ssl: bool = True, timeout: int = DEFAULT_TIMEOUT, logger: Optional[SupersetLogger] = None):
with belief_scope("__init__"):
self.logger = logger or SupersetLogger(name="APIClient")
self.logger.info("[APIClient.__init__][Entry] Initializing APIClient.")
self.base_url: str = config.get("base_url", "")
self.auth = config.get("auth")
self.request_settings = {"verify_ssl": verify_ssl, "timeout": timeout}
self.session = self._init_session()
self._tokens: Dict[str, str] = {}
self._authenticated = False
self.logger.info("[APIClient.__init__][Exit] APIClient initialized.")
# [/DEF:__init__:Function]
# [DEF:_init_session:Function]
# @PURPOSE: Создает и настраивает `requests.Session` с retry-логикой.
# @PRE: self.request_settings must be initialized.
# @POST: Returns a configured requests.Session instance.
# @RETURN: requests.Session - Настроенная сессия.
def _init_session(self) -> requests.Session:
with belief_scope("_init_session"):
session = requests.Session()
retries = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retries)
session.mount('http://', adapter)
session.mount('https://', adapter)
if not self.request_settings["verify_ssl"]:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.logger.warning("[_init_session][State] SSL verification disabled.")
session.verify = self.request_settings["verify_ssl"]
return session
# [/DEF:_init_session:Function]
# [DEF:authenticate:Function]
# @PURPOSE: Выполняет аутентификацию в Superset API и получает access и CSRF токены.
# @PRE: self.auth and self.base_url must be valid.
# @POST: `self._tokens` заполнен, `self._authenticated` установлен в `True`.
# @RETURN: Dict[str, str] - Словарь с токенами.
# @THROW: AuthenticationError, NetworkError - при ошибках.
def authenticate(self) -> Dict[str, str]:
with belief_scope("authenticate"):
self.logger.info("[authenticate][Enter] Authenticating to %s", self.base_url)
try:
login_url = f"{self.base_url}/security/login"
response = self.session.post(login_url, json=self.auth, timeout=self.request_settings["timeout"])
response.raise_for_status()
access_token = response.json()["access_token"]
csrf_url = f"{self.base_url}/security/csrf_token/"
csrf_response = self.session.get(csrf_url, headers={"Authorization": f"Bearer {access_token}"}, timeout=self.request_settings["timeout"])
csrf_response.raise_for_status()
self._tokens = {"access_token": access_token, "csrf_token": csrf_response.json()["result"]}
self._authenticated = True
self.logger.info("[authenticate][Exit] Authenticated successfully.")
return self._tokens
except requests.exceptions.HTTPError as e:
raise AuthenticationError(f"Authentication failed: {e}") from e
except (requests.exceptions.RequestException, KeyError) as e:
raise NetworkError(f"Network or parsing error during authentication: {e}") from e
# [/DEF:authenticate:Function]
@property
# [DEF:headers:Function]
# @PURPOSE: Возвращает HTTP-заголовки для аутентифицированных запросов.
# @PRE: APIClient is initialized and authenticated or can be authenticated.
# @POST: Returns headers including auth tokens.
def headers(self) -> Dict[str, str]:
with belief_scope("headers"):
if not self._authenticated: self.authenticate()
return {
"Authorization": f"Bearer {self._tokens['access_token']}",
"X-CSRFToken": self._tokens.get("csrf_token", ""),
"Referer": self.base_url,
"Content-Type": "application/json"
}
# [/DEF:headers:Function]
# [DEF:request:Function]
# @PURPOSE: Выполняет универсальный HTTP-запрос к API.
# @PARAM: method (str) - HTTP метод.
# @PARAM: endpoint (str) - API эндпоинт.
# @PARAM: headers (Optional[Dict]) - Дополнительные заголовки.
# @PARAM: raw_response (bool) - Возвращать ли сырой ответ.
# @PRE: method and endpoint must be strings.
# @POST: Returns response content or raw Response object.
# @RETURN: `requests.Response` если `raw_response=True`, иначе `dict`.
# @THROW: SupersetAPIError, NetworkError и их подклассы.
def request(self, method: str, endpoint: str, headers: Optional[Dict] = None, raw_response: bool = False, **kwargs) -> Union[requests.Response, Dict[str, Any]]:
with belief_scope("request"):
full_url = f"{self.base_url}{endpoint}"
_headers = self.headers.copy()
if headers: _headers.update(headers)
try:
response = self.session.request(method, full_url, headers=_headers, **kwargs)
response.raise_for_status()
return response if raw_response else response.json()
except requests.exceptions.HTTPError as e:
self._handle_http_error(e, endpoint)
except requests.exceptions.RequestException as e:
self._handle_network_error(e, full_url)
# [/DEF:request:Function]
# [DEF:_handle_http_error:Function]
# @PURPOSE: (Helper) Преобразует HTTP ошибки в кастомные исключения.
# @PARAM: e (requests.exceptions.HTTPError) - Ошибка.
# @PARAM: endpoint (str) - Эндпоинт.
# @PRE: e must be a valid HTTPError with a response.
# @POST: Raises a specific SupersetAPIError or subclass.
def _handle_http_error(self, e: requests.exceptions.HTTPError, endpoint: str):
with belief_scope("_handle_http_error"):
status_code = e.response.status_code
if status_code == 404: raise DashboardNotFoundError(endpoint) from e
if status_code == 403: raise PermissionDeniedError() from e
if status_code == 401: raise AuthenticationError() from e
raise SupersetAPIError(f"API Error {status_code}: {e.response.text}") from e
# [/DEF:_handle_http_error:Function]
# [DEF:_handle_network_error:Function]
# @PURPOSE: (Helper) Преобразует сетевые ошибки в `NetworkError`.
# @PARAM: e (requests.exceptions.RequestException) - Ошибка.
# @PARAM: url (str) - URL.
# @PRE: e must be a RequestException.
# @POST: Raises a NetworkError.
def _handle_network_error(self, e: requests.exceptions.RequestException, url: str):
with belief_scope("_handle_network_error"):
if isinstance(e, requests.exceptions.Timeout): msg = "Request timeout"
elif isinstance(e, requests.exceptions.ConnectionError): msg = "Connection error"
else: msg = f"Unknown network error: {e}"
raise NetworkError(msg, url=url) from e
# [/DEF:_handle_network_error:Function]
# [DEF:upload_file:Function]
# @PURPOSE: Загружает файл на сервер через multipart/form-data.
# @PARAM: endpoint (str) - Эндпоинт.
# @PARAM: file_info (Dict[str, Any]) - Информация о файле.
# @PARAM: extra_data (Optional[Dict]) - Дополнительные данные.
# @PARAM: timeout (Optional[int]) - Таймаут.
# @PRE: file_info must contain 'file_obj' and 'file_name'.
# @POST: File is uploaded and response returned.
# @RETURN: Ответ API в виде словаря.
# @THROW: SupersetAPIError, NetworkError, TypeError.
def upload_file(self, endpoint: str, file_info: Dict[str, Any], extra_data: Optional[Dict] = None, timeout: Optional[int] = None) -> Dict:
with belief_scope("upload_file"):
full_url = f"{self.base_url}{endpoint}"
_headers = self.headers.copy(); _headers.pop('Content-Type', None)
file_obj, file_name, form_field = file_info.get("file_obj"), file_info.get("file_name"), file_info.get("form_field", "file")
files_payload = {}
if isinstance(file_obj, (str, Path)):
with open(file_obj, 'rb') as f:
files_payload = {form_field: (file_name, f.read(), 'application/x-zip-compressed')}
elif isinstance(file_obj, io.BytesIO):
files_payload = {form_field: (file_name, file_obj.getvalue(), 'application/x-zip-compressed')}
else:
raise TypeError(f"Unsupported file_obj type: {type(file_obj)}")
return self._perform_upload(full_url, files_payload, extra_data, _headers, timeout)
# [/DEF:upload_file:Function]
# [DEF:_perform_upload:Function]
# @PURPOSE: (Helper) Выполняет POST запрос с файлом.
# @PARAM: url (str) - URL.
# @PARAM: files (Dict) - Файлы.
# @PARAM: data (Optional[Dict]) - Данные.
# @PARAM: headers (Dict) - Заголовки.
# @PARAM: timeout (Optional[int]) - Таймаут.
# @PRE: url, files, and headers must be provided.
# @POST: POST request is performed and JSON response returned.
# @RETURN: Dict - Ответ.
def _perform_upload(self, url: str, files: Dict, data: Optional[Dict], headers: Dict, timeout: Optional[int]) -> Dict:
with belief_scope("_perform_upload"):
try:
response = self.session.post(url, files=files, data=data or {}, headers=headers, timeout=timeout or self.request_settings["timeout"])
response.raise_for_status()
# Добавляем логирование для отладки
if response.status_code == 200:
try:
return response.json()
except Exception as json_e:
self.logger.debug(f"[_perform_upload][Debug] Response is not valid JSON: {response.text[:200]}...")
raise SupersetAPIError(f"API error during upload: Response is not valid JSON: {json_e}") from json_e
return response.json()
except requests.exceptions.HTTPError as e:
raise SupersetAPIError(f"API error during upload: {e.response.text}") from e
except requests.exceptions.RequestException as e:
raise NetworkError(f"Network error during upload: {e}", url=url) from e
# [/DEF:_perform_upload:Function]
# [DEF:fetch_paginated_count:Function]
# @PURPOSE: Получает общее количество элементов для пагинации.
# @PARAM: endpoint (str) - Эндпоинт.
# @PARAM: query_params (Dict) - Параметры запроса.
# @PARAM: count_field (str) - Поле с количеством.
# @PRE: query_params must be a dictionary.
# @POST: Returns total count of items.
# @RETURN: int - Количество.
def fetch_paginated_count(self, endpoint: str, query_params: Dict, count_field: str = "count") -> int:
with belief_scope("fetch_paginated_count"):
response_json = cast(Dict[str, Any], self.request("GET", endpoint, params={"q": json.dumps(query_params)}))
return response_json.get(count_field, 0)
# [/DEF:fetch_paginated_count:Function]
# [DEF:fetch_paginated_data:Function]
# @PURPOSE: Автоматически собирает данные со всех страниц пагинированного эндпоинта.
# @PARAM: endpoint (str) - Эндпоинт.
# @PARAM: pagination_options (Dict[str, Any]) - Опции пагинации.
# @PRE: pagination_options must contain 'base_query', 'total_count', 'results_field'.
# @POST: Returns all items across all pages.
# @RETURN: List[Any] - Список данных.
def fetch_paginated_data(self, endpoint: str, pagination_options: Dict[str, Any]) -> List[Any]:
with belief_scope("fetch_paginated_data"):
base_query, total_count = pagination_options["base_query"], pagination_options["total_count"]
results_field, page_size = pagination_options["results_field"], base_query.get('page_size')
assert page_size and page_size > 0, "'page_size' must be a positive number."
results = []
for page in range((total_count + page_size - 1) // page_size):
query = {**base_query, 'page': page}
response_json = cast(Dict[str, Any], self.request("GET", endpoint, params={"q": json.dumps(query)}))
results.extend(response_json.get(results_field, []))
return results
# [/DEF:fetch_paginated_data:Function]
# [/DEF:APIClient:Class]
# [/DEF:superset_tool.utils.network:Module]