Files
ss-tools/superset_tool/utils/network.py
2025-10-06 18:49:40 +03:00

198 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# <GRACE_MODULE id="superset_tool.utils.network" name="network.py">
# @SEMANTICS: network, http, client, api, requests, session, authentication
# @PURPOSE: Инкапсулирует низкоуровневую HTTP-логику для взаимодействия с Superset API, включая аутентификацию, управление сессией, retry-логику и обработку ошибок.
# @DEPENDS_ON: superset_tool.exceptions -> Для генерации специфичных сетевых и API ошибок.
# @DEPENDS_ON: superset_tool.utils.logger -> Для детального логирования сетевых операций.
# @DEPENDS_ON: requests -> Основа для выполнения HTTP-запросов.
# <IMPORTS>
from typing import Optional, Dict, Any, List, Union
import json
import io
from pathlib import Path
import requests
import urllib3
from superset_tool.exceptions import AuthenticationError, NetworkError, DashboardNotFoundError, SupersetAPIError, PermissionDeniedError
from superset_tool.utils.logger import SupersetLogger
# </IMPORTS>
# --- Начало кода модуля ---
# <ANCHOR id="APIClient" type="Class">
# @PURPOSE: Инкапсулирует HTTP-логику для работы с API, включая сессии, аутентификацию, и обработку запросов.
class APIClient:
DEFAULT_TIMEOUT = 30
def __init__(self, config: Dict[str, Any], verify_ssl: bool = True, timeout: int = DEFAULT_TIMEOUT, logger: Optional[SupersetLogger] = None):
# <ANCHOR id="APIClient.__init__" type="Function">
# @PURPOSE: Инициализирует API клиент с конфигурацией, сессией и логгером.
self.logger = logger or SupersetLogger(name="APIClient")
self.logger.info("[APIClient.__init__][Enter] Initializing APIClient.")
self.base_url = config.get("base_url")
self.auth = config.get("auth")
self.request_settings = {"verify_ssl": verify_ssl, "timeout": timeout}
self.session = self._init_session()
self._tokens: Dict[str, str] = {}
self._authenticated = False
self.logger.info("[APIClient.__init__][Exit] APIClient initialized.")
# </ANCHOR>
def _init_session(self) -> requests.Session:
# <ANCHOR id="APIClient._init_session" type="Function">
# @PURPOSE: Создает и настраивает `requests.Session` с retry-логикой.
# @INTERNAL
session = requests.Session()
retries = requests.adapters.Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = requests.adapters.HTTPAdapter(max_retries=retries)
session.mount('http://', adapter)
session.mount('https://', adapter)
if not self.request_settings["verify_ssl"]:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.logger.warning("[_init_session][State] SSL verification disabled.")
session.verify = self.request_settings["verify_ssl"]
return session
# </ANCHOR>
def authenticate(self) -> Dict[str, str]:
# <ANCHOR id="APIClient.authenticate" type="Function">
# @PURPOSE: Выполняет аутентификацию в Superset API и получает access и CSRF токены.
# @POST: `self._tokens` заполнен, `self._authenticated` установлен в `True`.
# @RETURN: Словарь с токенами.
# @THROW: AuthenticationError, NetworkError - при ошибках.
self.logger.info("[authenticate][Enter] Authenticating to %s", self.base_url)
try:
login_url = f"{self.base_url}/security/login"
response = self.session.post(login_url, json=self.auth, timeout=self.request_settings["timeout"])
response.raise_for_status()
access_token = response.json()["access_token"]
csrf_url = f"{self.base_url}/security/csrf_token/"
csrf_response = self.session.get(csrf_url, headers={"Authorization": f"Bearer {access_token}"}, timeout=self.request_settings["timeout"])
csrf_response.raise_for_status()
self._tokens = {"access_token": access_token, "csrf_token": csrf_response.json()["result"]}
self._authenticated = True
self.logger.info("[authenticate][Exit] Authenticated successfully.")
return self._tokens
except requests.exceptions.HTTPError as e:
raise AuthenticationError(f"Authentication failed: {e}") from e
except (requests.exceptions.RequestException, KeyError) as e:
raise NetworkError(f"Network or parsing error during authentication: {e}") from e
# </ANCHOR>
@property
def headers(self) -> Dict[str, str]:
# <ANCHOR id="APIClient.headers" type="Property">
# @PURPOSE: Возвращает HTTP-заголовки для аутентифицированных запросов.
if not self._authenticated: self.authenticate()
return {
"Authorization": f"Bearer {self._tokens['access_token']}",
"X-CSRFToken": self._tokens.get("csrf_token", ""),
"Referer": self.base_url,
"Content-Type": "application/json"
}
# </ANCHOR>
def request(self, method: str, endpoint: str, headers: Optional[Dict] = None, raw_response: bool = False, **kwargs) -> Union[requests.Response, Dict[str, Any]]:
# <ANCHOR id="APIClient.request" type="Function">
# @PURPOSE: Выполняет универсальный HTTP-запрос к API.
# @RETURN: `requests.Response` если `raw_response=True`, иначе `dict`.
# @THROW: SupersetAPIError, NetworkError и их подклассы.
full_url = f"{self.base_url}{endpoint}"
_headers = self.headers.copy()
if headers: _headers.update(headers)
try:
response = self.session.request(method, full_url, headers=_headers, **kwargs)
response.raise_for_status()
return response if raw_response else response.json()
except requests.exceptions.HTTPError as e:
self._handle_http_error(e, endpoint)
except requests.exceptions.RequestException as e:
self._handle_network_error(e, full_url)
# </ANCHOR>
def _handle_http_error(self, e: requests.exceptions.HTTPError, endpoint: str):
# <ANCHOR id="APIClient._handle_http_error" type="Function">
# @PURPOSE: (Helper) Преобразует HTTP ошибки в кастомные исключения.
# @INTERNAL
status_code = e.response.status_code
if status_code == 404: raise DashboardNotFoundError(endpoint) from e
if status_code == 403: raise PermissionDeniedError() from e
if status_code == 401: raise AuthenticationError() from e
raise SupersetAPIError(f"API Error {status_code}: {e.response.text}") from e
# </ANCHOR>
def _handle_network_error(self, e: requests.exceptions.RequestException, url: str):
# <ANCHOR id="APIClient._handle_network_error" type="Function">
# @PURPOSE: (Helper) Преобразует сетевые ошибки в `NetworkError`.
# @INTERNAL
if isinstance(e, requests.exceptions.Timeout): msg = "Request timeout"
elif isinstance(e, requests.exceptions.ConnectionError): msg = "Connection error"
else: msg = f"Unknown network error: {e}"
raise NetworkError(msg, url=url) from e
# </ANCHOR>
def upload_file(self, endpoint: str, file_info: Dict[str, Any], extra_data: Optional[Dict] = None, timeout: Optional[int] = None) -> Dict:
# <ANCHOR id="APIClient.upload_file" type="Function">
# @PURPOSE: Загружает файл на сервер через multipart/form-data.
# @RETURN: Ответ API в виде словаря.
# @THROW: SupersetAPIError, NetworkError, TypeError.
full_url = f"{self.base_url}{endpoint}"
_headers = self.headers.copy(); _headers.pop('Content-Type', None)
file_obj, file_name, form_field = file_info.get("file_obj"), file_info.get("file_name"), file_info.get("form_field", "file")
files_payload = {}
if isinstance(file_obj, (str, Path)):
with open(file_obj, 'rb') as f:
files_payload = {form_field: (file_name, f.read(), 'application/x-zip-compressed')}
elif isinstance(file_obj, io.BytesIO):
files_payload = {form_field: (file_name, file_obj.getvalue(), 'application/x-zip-compressed')}
else:
raise TypeError(f"Unsupported file_obj type: {type(file_obj)}")
return self._perform_upload(full_url, files_payload, extra_data, _headers, timeout)
# </ANCHOR>
def _perform_upload(self, url: str, files: Dict, data: Optional[Dict], headers: Dict, timeout: Optional[int]) -> Dict:
# <ANCHOR id="APIClient._perform_upload" type="Function">
# @PURPOSE: (Helper) Выполняет POST запрос с файлом.
# @INTERNAL
try:
response = self.session.post(url, files=files, data=data or {}, headers=headers, timeout=timeout or self.request_settings["timeout"])
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
raise SupersetAPIError(f"API error during upload: {e.response.text}") from e
except requests.exceptions.RequestException as e:
raise NetworkError(f"Network error during upload: {e}", url=url) from e
# </ANCHOR>
def fetch_paginated_count(self, endpoint: str, query_params: Dict, count_field: str = "count") -> int:
# <ANCHOR id="APIClient.fetch_paginated_count" type="Function">
# @PURPOSE: Получает общее количество элементов для пагинации.
response_json = self.request("GET", endpoint, params={"q": json.dumps(query_params)})
return response_json.get(count_field, 0)
# </ANCHOR>
def fetch_paginated_data(self, endpoint: str, pagination_options: Dict[str, Any]) -> List[Any]:
# <ANCHOR id="APIClient.fetch_paginated_data" type="Function">
# @PURPOSE: Автоматически собирает данные со всех страниц пагинированного эндпоинта.
base_query, total_count = pagination_options["base_query"], pagination_options["total_count"]
results_field, page_size = pagination_options["results_field"], base_query.get('page_size')
assert page_size and page_size > 0, "'page_size' must be a positive number."
results = []
for page in range((total_count + page_size - 1) // page_size):
query = {**base_query, 'page': page}
response_json = self.request("GET", endpoint, params={"q": json.dumps(query)})
results.extend(response_json.get(results_field, []))
return results
# </ANCHOR>
# </ANCHOR id="APIClient">
# --- Конец кода модуля ---
# </GRACE_MODULE id="superset_tool.utils.network">