refactor 1st stage

This commit is contained in:
Volobuev Andrey
2025-06-27 17:05:33 +03:00
parent c0a6ca7769
commit 2b35038f73
7 changed files with 1306 additions and 632 deletions

View File

@@ -0,0 +1,78 @@
# Superset Automation Tools
## Overview
This repository contains Python scripts and a utility library (`superset_tool`) designed to automate common tasks with Apache Superset, such as:
- **Backup**: Exporting all dashboards from Superset instances to local storage.
- **Migration**: Transferring and transforming dashboards between different Superset environments (e.g., Development, Sandbox, Production).
## Project Structure
- `backup_script.py`: Main script for performing scheduled backups of Superset dashboards.
- `migration_script.py`: Main script for migrating specific dashboards between environments, including database connection remapping.
- `superset_tool/`:
- `client.py`: Python client for interacting with the Superset API.
- `exceptions.py`: Custom exception classes for structured error handling.
- `models.py`: Pydantic models for configuration data validation.
- `utils/`:
- `fileio.py`: Utilities for file system operations (archive handling, YAML parsing).
- `logger.py`: Custom logger configuration for consistent logging across the project.
- `network.py`: Low-level HTTP client for network requests, handling authentication and retries.
## Setup
### Prerequisites
- Python 3.9+
- `pip` for package management.
- `keyring` for secure password storage.
### Installation
1. **Clone the repository:**
```bash
git clone https://your-repo-link/superset-automation.git
cd superset-automation
```
2. **Install dependencies:**
```bash
pip install -r requirements.txt
```
(You might need to create `requirements.txt` with `pydantic`, `requests`, `keyring`, `PyYAML`, `urllib3`)
3. **Configure Passwords:**
Use `keyring` to store your Superset API user passwords.
Example for `backup_script.py`:
```python
import keyring
keyring.set_password("system", "dev migrate", "your_dev_password")
keyring.set_password("system", "prod migrate", "your_prod_password")
keyring.set_password("system", "sandbox migrate", "your_sandbox_password")
```
Replace `"system"` with a suitable service name if preferred.
## Usage
### Backup Script (`backup_script.py`)
To back up dashboards from configured Superset environments:
```bash
python backup_script.py
```
Backup files will be saved to `P:\Superset\010 Бекапы\` by default. Logs will be in `P:\Superset\010 Бекапы\Logs`.
### Migration Script (`migration_script.py`)
To migrate a specific dashboard:
```bash
python migration_script.py
```
**Note:** This script is currently configured to migrate a hardcoded dashboard slug (`FI0070`) and uses a local `.zip` file for the dashboard source. **For production use, you should modify the script to:**
- Accept dashboard slug/ID as command-line arguments.
- Utilize the `from_c.export_dashboard()` method instead of reading from a local file.
- Potentially parameterize `from_c` and `to_c` environments.
## Logging
Logs are configured to be written to a file within a `Logs` directory (e.g., `P:\Superset\010 Бекапы\Logs` for backups) and also output to the console. The log level is set to `INFO` by default.
## Development & Contribution
- Follow the architectural patterns (`[MODULE]`, `[CONTRACT]`, `[SECTION]`, `[ANCHOR]`) and logging guidelines for consistency.
- Ensure all new code adheres to the principles of "LLM-friendly" code generation.
- Use `Pydantic` models for data validation.
- Implement comprehensive error handling using custom exceptions.
---
[COHERENCE_CHECK_PASSED] README.md создан и согласован с модулями.

View File

@@ -1,20 +1,48 @@
# [MODULE] Superset Dashboard Backup Script
# @contract: Автоматизирует процесс резервного копирования дашбордов Superset из различных окружений.
# @semantic_layers:
# 1. Инициализация логгера и клиентов Superset.
# 2. Выполнение бэкапа для каждого окружения (DEV, SBX, PROD).
# 3. Формирование итогового отчета.
# @coherence:
# - Использует `SupersetClient` для взаимодействия с API Superset.
# - Использует `SupersetLogger` для централизованного логирования.
# - Работает с `Pathlib` для управления файлами и директориями.
# - Интегрируется с `keyring` для безопасного хранения паролей.
# [IMPORTS] Стандартная библиотека
import logging import logging
from datetime import datetime from datetime import datetime
import shutil import shutil
import keyring
import os import os
from pathlib import Path from pathlib import Path
from superset_tool.models import SupersetConfig, DatabaseConfig
# [IMPORTS] Сторонние библиотеки
import keyring
# [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig
from superset_tool.client import SupersetClient from superset_tool.client import SupersetClient
from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.fileio import save_and_unpack_dashboard, archive_exports, sanitize_filename from superset_tool.utils.fileio import save_and_unpack_dashboard, archive_exports, sanitize_filename
# [COHERENCE_CHECK_PASSED] Все необходимые модули импортированы и согласованы.
# [FUNCTION] setup_clients
# @contract: Инициализирует и возвращает SupersetClient для каждого заданного окружения.
# @pre:
# - `keyring` должен содержать необходимые пароли для "dev migrate", "prod migrate", "sandbox migrate".
# - `logger` должен быть инициализирован.
# @post:
# - Возвращает словарь {env_name: SupersetClient_instance}.
# - Логирует успешную инициализацию или ошибку.
# @raise:
# - `Exception`: При любой ошибке в процессе инициализации клиентов (например, отсутствие пароля в keyring, проблемы с сетью при первой аутентификации).
def setup_clients(logger: SupersetLogger): def setup_clients(logger: SupersetLogger):
"""Инициализация клиентов для разных окружений""" """Инициализация клиентов для разных окружений"""
# [ANCHOR] CLIENTS_INITIALIZATION
clients = {} clients = {}
try: try:
# Конфигурация для Dev # [INFO] Инициализация конфигурации для Dev
dev_config = SupersetConfig( dev_config = SupersetConfig(
base_url="https://devta.bi.dwh.rusal.com/api/v1", base_url="https://devta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -23,11 +51,11 @@ def setup_clients(logger: SupersetLogger):
"password": keyring.get_password("system", "dev migrate"), "password": keyring.get_password("system", "dev migrate"),
"refresh": True "refresh": True
}, },
logger=logger,
verify_ssl=False verify_ssl=False
) )
# [DEBUG] Dev config created: {dev_config.base_url}
# Конфигурация для Prod # [INFO] Инициализация конфигурации для Prod
prod_config = SupersetConfig( prod_config = SupersetConfig(
base_url="https://prodta.bi.dwh.rusal.com/api/v1", base_url="https://prodta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -36,11 +64,11 @@ def setup_clients(logger: SupersetLogger):
"password": keyring.get_password("system", "prod migrate"), "password": keyring.get_password("system", "prod migrate"),
"refresh": True "refresh": True
}, },
logger=logger,
verify_ssl=False verify_ssl=False
) )
# [DEBUG] Prod config created: {prod_config.base_url}
# Конфигурация для Sandbox # [INFO] Инициализация конфигурации для Sandbox
sandbox_config = SupersetConfig( sandbox_config = SupersetConfig(
base_url="https://sandboxta.bi.dwh.rusal.com/api/v1", base_url="https://sandboxta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -49,92 +77,169 @@ def setup_clients(logger: SupersetLogger):
"password": keyring.get_password("system", "sandbox migrate"), "password": keyring.get_password("system", "sandbox migrate"),
"refresh": True "refresh": True
}, },
logger=logger,
verify_ssl=False verify_ssl=False
) )
# [DEBUG] Sandbox config created: {sandbox_config.base_url}
clients['dev'] = SupersetClient(dev_config) # [INFO] Создание экземпляров SupersetClient
clients['sbx'] = SupersetClient(sandbox_config) clients['dev'] = SupersetClient(dev_config, logger)
clients['prod'] = SupersetClient(prod_config) clients['sbx'] = SupersetClient(sandbox_config,logger)
logger.info("Клиенты для окружений успешно инициализированы") clients['prod'] = SupersetClient(prod_config,logger)
logger.info("[COHERENCE_CHECK_PASSED] Клиенты для окружений успешно инициализированы", extra={"envs": list(clients.keys())})
return clients return clients
except Exception as e: except Exception as e:
logger.error(f"Ошибка инициализации клиентов: {str(e)}") logger.error(f"[ERROR] Ошибка инициализации клиентов: {str(e)}", exc_info=True)
raise raise
def backup_dashboards(client, env_name, backup_root, logger): # [FUNCTION] backup_dashboards
# @contract: Выполняет бэкап всех доступных дашбордов для заданного клиента и окружения.
# @pre:
# - `client` должен быть инициализированным экземпляром `SupersetClient`.
# - `env_name` должен быть строкой, обозначающей окружение.
# - `backup_root` должен быть валидным путем к корневой директории бэкапа.
# - `logger` должен быть инициализирован.
# @post:
# - Дашборды экспортируются и сохраняются в поддиректориях `backup_root/env_name/dashboard_title`.
# - Старые экспорты архивируются.
# - Возвращает `True` если все дашборды были экспортированы без критических ошибок, `False` иначе.
# @side_effects:
# - Создает директории и файлы в файловой системе.
# - Логирует статус выполнения, успешные экспорты и ошибки.
# @exceptions:
# - `SupersetAPIError`, `NetworkError`, `DashboardNotFoundError`, `ExportError` могут быть подняты методами `SupersetClient` и будут логированы.
def backup_dashboards(client: SupersetClient, env_name: str, backup_root: Path, logger: SupersetLogger) -> bool:
"""Выполнение бэкапа дашбордов с детальным логированием ошибок""" """Выполнение бэкапа дашбордов с детальным логированием ошибок"""
# [ANCHOR] DASHBOARD_BACKUP_PROCESS
logger.info(f"[INFO] Запуск бэкапа дашбордов для окружения: {env_name}")
try: try:
dashboard_count, dashboard_meta = client.get_dashboards() dashboard_count, dashboard_meta = client.get_dashboards()
logger.info(f"[INFO] Найдено {dashboard_count} дашбордов для экспорта в {env_name}")
if dashboard_count == 0: if dashboard_count == 0:
logger.warning(f"Нет дашбордов для экспорта в {env_name}") logger.warning(f"[WARN] Нет дашбордов для экспорта в {env_name}. Процесс завершен.")
return True return True
success = 0 success_count = 0
errors = [] error_details = []
for db in dashboard_meta: for db in dashboard_meta:
if not db.get('slug'): dashboard_id = db.get('id')
dashboard_title = db.get('dashboard_title', 'Unknown Dashboard')
dashboard_slug = db.get('slug', 'unknown-slug') # Используем slug для уникальности
# [PRECONDITION] Проверка наличия ID и slug
if not dashboard_id or not dashboard_slug:
logger.warning(
f"[SKIP] Пропущен дашборд с неполными метаданными: {dashboard_title} (ID: {dashboard_id}, Slug: {dashboard_slug})",
extra={'dashboard_meta': db}
)
continue continue
try: logger.debug(f"[DEBUG] Попытка экспорта дашборда: '{dashboard_title}' (ID: {dashboard_id})")
dashboard_title = db['dashboard_title']
dashboard_dir = Path(backup_root) / env_name / sanitize_filename(dashboard_title)
dashboard_dir.mkdir(parents=True, exist_ok=True)
zip_content, filename = client.export_dashboard(db['id']) try:
# [ANCHOR] CREATE_DASHBOARD_DIR
# Используем slug в пути для большей уникальности и избежания конфликтов имен
dashboard_base_dir_name = sanitize_filename(f"{dashboard_slug}-{dashboard_title}")
dashboard_dir = backup_root / env_name / dashboard_base_dir_name
dashboard_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f"[DEBUG] Директория для дашборда: {dashboard_dir}")
# [ANCHOR] EXPORT_DASHBOARD_ZIP
zip_content, filename = client.export_dashboard(dashboard_id)
# [ANCHOR] SAVE_AND_UNPACK
# Сохраняем только ZIP-файл, распаковка здесь не нужна для бэкапа
save_and_unpack_dashboard( save_and_unpack_dashboard(
zip_content=zip_content, zip_content=zip_content,
original_filename=filename, original_filename=filename,
output_dir=dashboard_dir, output_dir=dashboard_dir,
unpack=False unpack=False, # Только сохраняем ZIP, не распаковываем для бэкапа
logger=logger
)
logger.info(f"[INFO] Дашборд '{dashboard_title}' (ID: {dashboard_id}) успешно экспортирован.")
# [ANCHOR] ARCHIVE_OLD_BACKUPS
try:
archive_exports(dashboard_dir, logger=logger)
logger.debug(f"[DEBUG] Старые экспорты для '{dashboard_title}' архивированы.")
except Exception as cleanup_error:
logger.warning(
f"[WARN] Ошибка архивирования старых бэкапов для '{dashboard_title}': {cleanup_error}",
exc_info=False # Не показываем полный traceback для очистки, т.к. это второстепенно
) )
# Архивирование старых бэкапов success_count += 1
try:
archive_exports(dashboard_dir)
except Exception as cleanup_error:
logger.warning(f"Ошибка очистки архива: {cleanup_error}")
success += 1
except Exception as db_error: except Exception as db_error:
error_info = { error_info = {
'dashboard': db.get('dashboard_title'), 'dashboard_id': dashboard_id,
'error': str(db_error), 'dashboard_title': dashboard_title,
'env': env_name 'error_message': str(db_error),
'env': env_name,
'error_type': type(db_error).__name__
} }
errors.append(error_info) error_details.append(error_info)
logger.error("Ошибка экспорта дашборда", extra=error_info) logger.error(
f"[ERROR] Ошибка экспорта дашборда '{dashboard_title}' (ID: {dashboard_id})",
extra=error_info, exc_info=True # Логируем полный traceback для ошибок экспорта
)
if errors: if error_details:
logger.error(f"Итоги экспорта для {env_name}", logger.error(
extra={'success': success, 'errors': errors, 'total': dashboard_count}) f"[COHERENCE_CHECK_FAILED] Итоги экспорта для {env_name}:",
extra={'success_count': success_count, 'errors': error_details, 'total_dashboards': dashboard_count}
return len(errors) == 0 )
return False
else:
logger.info(
f"[COHERENCE_CHECK_PASSED] Все {success_count} дашбордов для {env_name} успешно экспортированы."
)
return True
except Exception as e: except Exception as e:
logger.critical(f"Фатальная ошибка бэкапа {env_name}: {str(e)}", exc_info=True) logger.critical(
f"[CRITICAL] Фатальная ошибка бэкапа для окружения {env_name}: {str(e)}",
exc_info=True
)
return False return False
def main(): # [FUNCTION] main
# Инициализация логгера # @contract: Основная точка входа скрипта.
log_dir = Path("P:\\Superset\\010 Бекапы\\Logs") # @semantic: Координирует инициализацию, выполнение бэкапа и логирование результатов.
# @post:
# - Возвращает 0 при успешном выполнении, 1 при фатальной ошибке.
# @side_effects:
# - Инициализирует логгер.
# - Вызывает `setup_clients` и `backup_dashboards`.
# - Записывает логи в файл и выводит в консоль.
def main() -> int:
"""Основная функция выполнения бэкапа"""
# [ANCHOR] MAIN_EXECUTION_START
# [CONFIG] Инициализация логгера
# @invariant: Логгер должен быть доступен на протяжении всей работы скрипта.
log_dir = Path("P:\\Superset\\010 Бекапы\\Logs") # [COHERENCE_NOTE] Убедитесь, что путь доступен.
logger = SupersetLogger( logger = SupersetLogger(
log_dir=log_dir, log_dir=log_dir,
level=logging.INFO, level=logging.INFO,
console=True console=True
) )
"""Основная функция выполнения бэкапа"""
logger.info("="*50) logger.info("="*50)
logger.info("Запуск процесса бэкапа Superset") logger.info("[INFO] Запуск процесса бэкапа Superset")
logger.info("="*50) logger.info("="*50)
exit_code = 0 # [STATE] Код выхода скрипта
try: try:
# [ANCHOR] CLIENT_SETUP
clients = setup_clients(logger) clients = setup_clients(logger)
superset_backup_repo = Path("P:\\Superset\\010 Бекапы")
# Бэкап для DEV # [CONFIG] Определение корневой директории для бэкапов
# @invariant: superset_backup_repo должен быть доступен для записи.
superset_backup_repo = Path("P:\\Superset\\010 Бекапы")
superset_backup_repo.mkdir(parents=True, exist_ok=True) # Гарантируем существование директории
logger.info(f"[INFO] Корневая директория бэкапов: {superset_backup_repo}")
# [ANCHOR] BACKUP_DEV_ENVIRONMENT
dev_success = backup_dashboards( dev_success = backup_dashboards(
clients['dev'], clients['dev'],
"DEV", "DEV",
@@ -142,14 +247,15 @@ def main():
logger=logger logger=logger
) )
#Бэкап для Sandbox # [ANCHOR] BACKUP_SBX_ENVIRONMENT
sbx_success = backup_dashboards( sbx_success = backup_dashboards(
clients['sbx'], clients['sbx'],
"SBX", "SBX",
superset_backup_repo, superset_backup_repo,
logger=logger logger=logger
) )
#Бэкап для Прода
# [ANCHOR] BACKUP_PROD_ENVIRONMENT
prod_success = backup_dashboards( prod_success = backup_dashboards(
clients['prod'], clients['prod'],
"PROD", "PROD",
@@ -157,21 +263,29 @@ def main():
logger=logger logger=logger
) )
# Итоговый отчет # [ANCHOR] FINAL_REPORT
# [INFO] Итоговый отчет о выполнении бэкапа
logger.info("="*50) logger.info("="*50)
logger.info("Итоги выполнения бэкапа:") logger.info("[INFO] Итоги выполнения бэкапа:")
logger.info(f"DEV: {'Успешно' if dev_success else 'С ошибками'}") logger.info(f"[INFO] DEV: {'Успешно' if dev_success else 'С ошибками'}")
logger.info(f"SBX: {'Успешно' if sbx_success else 'С ошибками'}") logger.info(f"[INFO] SBX: {'Успешно' if sbx_success else 'С ошибками'}")
logger.info(f"PROD: {'Успешно' if prod_success else 'С ошибками'}") logger.info(f"[INFO] PROD: {'Успешно' if prod_success else 'С ошибками'}")
logger.info(f"Полный лог доступен в: {log_dir}") logger.info(f"[INFO] Полный лог доступен в: {log_dir}")
if not (dev_success and sbx_success and prod_success):
exit_code = 1
logger.warning("[COHERENCE_CHECK_FAILED] Бэкап завершен с ошибками в одном или нескольких окружениях.")
else:
logger.info("[COHERENCE_CHECK_PASSED] Все бэкапы успешно завершены без ошибок.")
except Exception as e: except Exception as e:
logger.critical(f"Фатальная ошибка выполнения скрипта: {str(e)}", exc_info=True) logger.critical(f"[CRITICAL] Фатальная ошибка выполнения скрипта: {str(e)}", exc_info=True)
return 1 exit_code = 1
logger.info("Процесс бэкапа завершен") logger.info("[INFO] Процесс бэкапа завершен")
return 0 return exit_code
# [ENTRYPOINT] Главная точка запуска скрипта
if __name__ == "__main__": if __name__ == "__main__":
exit_code = main() exit_code = main()
exit(exit_code) exit(exit_code)

View File

@@ -1,22 +1,44 @@
# [MODULE] Superset Dashboard Migration Script
# @contract: Автоматизирует процесс миграции и обновления дашбордов Superset между окружениями.
# @semantic_layers:
# 1. Конфигурация клиентов Superset для исходного и целевого окружений.
# 2. Определение правил трансформации конфигураций баз данных.
# 3. Экспорт дашборда, модификация YAML-файлов, создание нового архива и импорт.
# @coherence:
# - Использует `SupersetClient` для взаимодействия с API Superset.
# - Использует `SupersetLogger` для централизованного логирования.
# - Работает с `Pathlib` для управления файлами и директориями.
# - Интегрируется с `keyring` для безопасного хранения паролей.
# - Зависит от утилит `fileio` для обработки архивов и YAML-файлов.
# [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig from superset_tool.models import SupersetConfig
from superset_tool.client import SupersetClient from superset_tool.client import SupersetClient
from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.logger import SupersetLogger
from superset_tool.exceptions import AuthenticationError from superset_tool.exceptions import AuthenticationError, SupersetAPIError, NetworkError, DashboardNotFoundError
from superset_tool.utils.fileio import save_and_unpack_dashboard, update_yamls, create_dashboard_export, create_temp_file,read_dashboard_from_disk from superset_tool.utils.fileio import save_and_unpack_dashboard, update_yamls, create_dashboard_export, create_temp_file, read_dashboard_from_disk
# [IMPORTS] Стандартная библиотека
import os import os
import keyring import keyring
from pathlib import Path from pathlib import Path
import logging import logging
log_dir = Path("H:\\dev\\Logs") # [CONFIG] Инициализация глобального логгера
# @invariant: Логгер доступен для всех компонентов скрипта.
log_dir = Path("H:\\dev\\Logs") # [COHERENCE_NOTE] Убедитесь, что путь доступен.
logger = SupersetLogger( logger = SupersetLogger(
log_dir=log_dir, log_dir=log_dir,
level=logging.INFO, level=logging.INFO,
console=True console=True
) )
logger.info("[COHERENCE_CHECK_PASSED] Логгер инициализирован для скрипта миграции.")
database_config_click={"old": # [CONFIG] Конфигурация трансформации базы данных Clickhouse
{ # @semantic: Определяет, как UUID и URI базы данных Clickhouse должны быть изменены.
# @invariant: 'old' и 'new' должны содержать полные конфигурации.
database_config_click = {
"old": {
"database_name": "Prod Clickhouse", "database_name": "Prod Clickhouse",
"sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm", "sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm",
"uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", "uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9",
@@ -34,10 +56,14 @@ database_config_click={"old":
"allow_cvas": "true", "allow_cvas": "true",
"allow_dml": "true" "allow_dml": "true"
} }
} }
logger.debug("[CONFIG] Конфигурация Clickhouse загружена.")
database_config_gp={"old": # [CONFIG] Конфигурация трансформации базы данных Greenplum
{ # @semantic: Определяет, как UUID и URI базы данных Greenplum должны быть изменены.
# @invariant: 'old' и 'new' должны содержать полные конфигурации.
database_config_gp = {
"old": {
"database_name": "Prod Greenplum", "database_name": "Prod Greenplum",
"sqlalchemy_uri": "postgresql+psycopg2://viz_powerbi_gp_prod:XXXXXXXXXX@10.66.229.201:5432/dwh", "sqlalchemy_uri": "postgresql+psycopg2://viz_powerbi_gp_prod:XXXXXXXXXX@10.66.229.201:5432/dwh",
"uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8", "uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8",
@@ -55,9 +81,10 @@ database_config_gp={"old":
"allow_cvas": "false", "allow_cvas": "false",
"allow_dml": "false" "allow_dml": "false"
} }
} }
logger.debug("[CONFIG] Конфигурация Greenplum загружена.")
# Конфигурация для Dev # [CONFIG] Конфигурация Superset API для Dev окружения
dev_config = SupersetConfig( dev_config = SupersetConfig(
base_url="https://devta.bi.dwh.rusal.com/api/v1", base_url="https://devta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -69,8 +96,9 @@ dev_config = SupersetConfig(
logger=logger, logger=logger,
verify_ssl=False verify_ssl=False
) )
logger.debug(f"[CONFIG] Dev SupersetConfig создан для {dev_config.base_url}")
# Конфигурация для Prod # [CONFIG] Конфигурация Superset API для Prod окружения
prod_config = SupersetConfig( prod_config = SupersetConfig(
base_url="https://prodta.bi.dwh.rusal.com/api/v1", base_url="https://prodta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -82,8 +110,9 @@ prod_config = SupersetConfig(
logger=logger, logger=logger,
verify_ssl=False verify_ssl=False
) )
logger.debug(f"[CONFIG] Prod SupersetConfig создан для {prod_config.base_url}")
# Конфигурация для Sandbox # [CONFIG] Конфигурация Superset API для Sandbox окружения
sandbox_config = SupersetConfig( sandbox_config = SupersetConfig(
base_url="https://sandboxta.bi.dwh.rusal.com/api/v1", base_url="https://sandboxta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -95,31 +124,51 @@ sandbox_config = SupersetConfig(
logger=logger, logger=logger,
verify_ssl=False verify_ssl=False
) )
logger.debug(f"[CONFIG] Sandbox SupersetConfig создан для {sandbox_config.base_url}")
# Инициализация клиента # [INIT] Инициализация клиентов Superset API
# @invariant: Все клиенты должны быть успешно инициализированы для дальнейшей работы.
try:
dev_client = SupersetClient(dev_config)
sandbox_client = SupersetClient(sandbox_config)
prod_client = SupersetClient(prod_config) # Не используется в текущем flow, но инициализирован.
logger.info("[COHERENCE_CHECK_PASSED] Клиенты Superset успешно инициализированы.")
except Exception as e:
logger.critical(f"[CRITICAL] Ошибка инициализации клиентов Superset: {str(e)}", exc_info=True)
exit(1) # Выход из скрипта при критической ошибке инициализации
dev_client = SupersetClient(dev_config) # [CONFIG] Определение исходного и целевого клиентов для миграции
sandbox_client = SupersetClient(sandbox_config) # [COHERENCE_NOTE] Эти переменные задают конкретную миграцию. Для параметризации можно использовать аргументы командной строки.
prod_client = SupersetClient(prod_config) from_c = sandbox_client # Источник миграции
to_c = dev_client # Цель миграции
dashboard_slug = "FI0070" # Идентификатор дашборда для миграции
# dashboard_id = 53 # ID не нужен, если есть slug
from_c = sandbox_client logger.info(f"[INFO] Конфигурация миграции: From '{from_c.config.base_url}' To '{to_c.config.base_url}' for dashboard slug '{dashboard_slug}'")
to_c = dev_client
dashboard_slug = "FI0070"
dashboard_id = 53
dashboard_meta = from_c.get_dashboard(dashboard_slug) try:
#print(dashboard_meta) # [ACTION] Получение метаданных исходного дашборда
#print(dashboard_meta["dashboard_title"]) logger.info(f"[INFO] Получение метаданных дашборда '{dashboard_slug}' из исходного окружения.")
dashboard_meta = from_c.get_dashboard(dashboard_slug)
dashboard_id = dashboard_meta["id"] # Получаем ID из метаданных
logger.info(f"[INFO] Найден дашборд '{dashboard_meta['dashboard_title']}' с ID: {dashboard_id}.")
dashboard_id = dashboard_meta["id"] # [CONTEXT_MANAGER] Работа с временной директорией для обработки архива дашборда
with create_temp_file(suffix='.dir', logger=logger) as temp_root:
logger.info(f"[INFO] Создана временная директория: {temp_root}")
with create_temp_file(suffix='.dir', logger=logger) as temp_root: # [ANCHOR] EXPORT_DASHBOARD
# Экспорт дашборда во временную директорию # Экспорт дашборда во временную директорию ИЛИ чтение с диска
#zip_content, filename = from_c.export_dashboard(dashboard_id, logger=logger) # [COHERENCE_NOTE] В текущем коде закомментирован экспорт и используется локальный файл.
# Для полноценной миграции следует использовать export_dashboard().
# zip_content, filename = from_c.export_dashboard(dashboard_id) # Предпочтительный путь для реальной миграции
# [DEBUG] Использование файла с диска для тестирования миграции
zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250616T174203.zip" zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250616T174203.zip"
logger.warning(f"[WARN] Используется ЛОКАЛЬНЫЙ файл дашборда для миграции: {zip_db_path}. Это может привести к некогерентности, если файл устарел.")
zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger) zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger)
# [ANCHOR] SAVE_AND_UNPACK
# Сохранение и распаковка во временную директорию # Сохранение и распаковка во временную директорию
zip_path, unpacked_path = save_and_unpack_dashboard( zip_path, unpacked_path = save_and_unpack_dashboard(
zip_content=zip_content, zip_content=zip_content,
@@ -128,15 +177,34 @@ with create_temp_file(suffix='.dir', logger=logger) as temp_root:
logger=logger, logger=logger,
output_dir=temp_root output_dir=temp_root
) )
logger.info(f"[INFO] Дашборд распакован во временную директорию: {unpacked_path}")
# Обновление конфигураций # [ANCHOR] UPDATE_YAML_CONFIGS
source_path = unpacked_path / Path(filename).stem # Обновление конфигураций баз данных в YAML-файлах
update_yamls([database_config_click,database_config_gp], path=source_path, logger=logger) source_path = unpacked_path / Path(filename).stem # Путь к распакованному содержимому дашборда
db_configs_to_apply = [database_config_click, database_config_gp]
logger.info(f"[INFO] Применение трансформаций баз данных к YAML файлам в {source_path}...")
update_yamls(db_configs_to_apply, path=source_path, logger=logger)
logger.info("[INFO] YAML-файлы успешно обновлены.")
# Создание нового экспорта во временной директории # [ANCHOR] CREATE_NEW_EXPORT_ARCHIVE
temp_zip = temp_root / f"{dashboard_slug}.zip" # Создание нового экспорта дашборда из модифицированных файлов
temp_zip = temp_root / f"{dashboard_slug}_migrated.zip" # Имя файла для импорта
logger.info(f"[INFO] Создание нового ZIP-архива для импорта: {temp_zip}")
create_dashboard_export(temp_zip, [source_path], logger=logger) create_dashboard_export(temp_zip, [source_path], logger=logger)
logger.info("[INFO] Новый ZIP-архив дашборда готов к импорту.")
# Импорт обновленного дашборда # [ANCHOR] IMPORT_DASHBOARD
to_c.import_dashboard(temp_zip) # Импорт обновленного дашборда в целевое окружение
logger.info(f"[INFO] Запуск импорта дашборда в целевое окружение {to_c.config.base_url}...")
import_result = to_c.import_dashboard(temp_zip)
logger.info(f"[COHERENCE_CHECK_PASSED] Дашборд '{dashboard_slug}' успешно импортирован/обновлен.", extra={"import_result": import_result})
except (AuthenticationError, SupersetAPIError, NetworkError, DashboardNotFoundError) as e:
logger.error(f"[ERROR] Ошибка миграции дашборда: {str(e)}", exc_info=True, extra=e.context)
exit(1)
except Exception as e:
logger.critical(f"[CRITICAL] Фатальная и необработанная ошибка в скрипте миграции: {str(e)}", exc_info=True)
exit(1)
logger.info("[INFO] Процесс миграции завершен.")

View File

@@ -1,23 +1,23 @@
# [MODULE] Superset API Client # [MODULE] Superset API Client
# @contract: Реализует полное взаимодействие с Superset API # @contract: Реализует полное взаимодействие с Superset API
# @semantic_layers: # @semantic_layers:
# 1. Авторизация/CSRF # 1. Авторизация/CSRF (делегируется `APIClient`)
# 2. Основные операции (дашборды) # 2. Основные операции (получение метаданных, список дашбордов)
# 3. Импорт/экспорт # 3. Импорт/экспорт дашбордов
# @coherence: # @coherence:
# - Согласован с models.SupersetConfig # - Согласован с `models.SupersetConfig` для конфигурации.
# - Полная обработка всех errors из exceptions.py # - Полная обработка всех ошибок из `exceptions.py` (делегируется `APIClient` и дополняется специфичными).
# - Полностью использует `utils.network.APIClient` для всех HTTP-запросов.
# [IMPORTS] Стандартная библиотека # [IMPORTS] Стандартная библиотека
import json import json
from typing import Optional, Dict, Tuple, List, Any, Literal, Union from typing import Optional, Dict, Tuple, List, Any, Literal, Union
import datetime import datetime
from pathlib import Path from pathlib import Path
from requests import Response
import zipfile # Для валидации ZIP-файлов
# [IMPORTS] Сторонние библиотеки # [IMPORTS] Сторонние библиотеки (убраны requests и urllib3, т.к. они теперь в network.py)
import requests
import urllib3
import zipfile
# [IMPORTS] Локальные модули # [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig from superset_tool.models import SupersetConfig
@@ -32,304 +32,295 @@ from superset_tool.exceptions import (
) )
from superset_tool.utils.fileio import get_filename_from_headers from superset_tool.utils.fileio import get_filename_from_headers
from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.network import APIClient from superset_tool.utils.network import APIClient # [REFACTORING_TARGET] Использование APIClient
# [CONSTANTS] Логирование # [CONSTANTS] Общие константы (для информации, т.к. тайм-аут теперь в конфиге)
HTTP_METHODS = Literal['GET', 'POST', 'PUT', 'DELETE'] DEFAULT_TIMEOUT = 30 # seconds - используется как значение по умолчанию в SupersetConfig
DEFAULT_TIMEOUT = 30 # seconds
# [TYPE-ALIASES] Для сложных сигнатур # [TYPE-ALIASES] Для сложных сигнатур
JsonType = Union[Dict[str, Any], List[Dict[str, Any]]] JsonType = Union[Dict[str, Any], List[Dict[str, Any]]]
ResponseType = Tuple[bytes, str] ResponseType = Tuple[bytes, str]
# [CHECK] Валидация импортов для контрактов # [CHECK] Валидация импортов для контрактов
# [COHERENCE_CHECK_PASSED] Теперь зависимость на requests и urllib3 скрыта за APIClient
try: try:
# Проверка наличия ключевых зависимостей
assert requests.__version__ >= '2.28.0' # для retry механизмов
assert urllib3.__version__ >= '1.26.0' # для SSL warnings
# Проверка локальных модулей
from .utils.fileio import get_filename_from_headers as fileio_check from .utils.fileio import get_filename_from_headers as fileio_check
assert callable(fileio_check) assert callable(fileio_check)
from .utils.network import APIClient as network_check
assert callable(network_check)
except (ImportError, AssertionError) as imp_err: except (ImportError, AssertionError) as imp_err:
raise RuntimeError( raise RuntimeError(
f"[COHERENCE_CHECK_FAILED] Импорт не прошел валидацию: {str(imp_err)}" f"[COHERENCE_CHECK_FAILED] Импорт не прошел валидацию: {str(imp_err)}"
) from imp_err ) from imp_err
class SupersetClient: class SupersetClient:
"""[MAIN-CONTRACT] Клиент для работы с Superset API """[MAIN-CONTRACT] Клиент для работы с Superset API
@pre: @pre:
- config должен быть валидным SupersetConfig - `config` должен быть валидным `SupersetConfig`.
- Целевой API доступен - Целевой API доступен и учетные данные корректны.
@post: @post:
- Все методы возвращают данные или вызывают явные ошибки - Все методы возвращают ожидаемые данные или вызывают явные, типизированные ошибки.
- Токены автоматически обновляются - Токены для API-вызовов автоматически управляются (`APIClient`).
@invariant: @invariant:
- Сессия остается валидной между вызовами - Сессия остается валидной между вызовами.
- Все ошибки типизированы согласно exceptions.py - Все ошибки типизированы согласно `exceptions.py`.
- Все HTTP-запросы проходят через `self.network`.
""" """
def __init__(self, config: SupersetConfig): def __init__(self, config: SupersetConfig, logger: Optional[SupersetLogger] = None):
"""[INIT] Инициализация клиента """[INIT] Инициализация клиента Superset.
@semantic: @semantic:
- Создает сессию requests - Валидирует входную конфигурацию.
- Настраивает адаптеры подключения - Инициализирует внутренний `APIClient` для сетевого взаимодействия.
- Выполняет первичную аутентификацию - Выполняет первичную аутентификацию через `APIClient`.
""" """
# [PRECONDITION] Валидация конфигурации
self.logger = logger or SupersetLogger(name="SupersetClient")
self._validate_config(config) self._validate_config(config)
self.config = config self.config = config
self.logger = config.logger or SupersetLogger(name="client")
# [ANCHOR] API_CLIENT_INIT
# [REFACTORING_COMPLETE] Теперь вся сетевая логика инкапсулирована в APIClient.
# APIClient отвечает за аутентификацию, повторные попытки и обработку низкоуровневых ошибок.
self.network = APIClient( self.network = APIClient(
base_url=config.base_url, base_url=config.base_url,
auth=config.auth, auth=config.auth,
verify_ssl=config.verify_ssl verify_ssl=config.verify_ssl,
timeout=config.timeout,
logger=self.logger # Передаем логгер в APIClient
) )
self.tokens = self.network.authenticate()
try: try:
# Аутентификация выполняется в конструкторе APIClient или по первому запросу
# Для явного вызова: self.network.authenticate()
# APIClient сам управляет токенами после первого успешного входа
self.logger.info( self.logger.info(
"[COHERENCE_CHECK_PASSED] Клиент успешно инициализирован", "[COHERENCE_CHECK_PASSED] Клиент Superset успешно инициализирован",
extra={"base_url": config.base_url} extra={"base_url": config.base_url}
) )
except Exception as e: except Exception as e:
self.logger.error( self.logger.error(
"[INIT_FAILED] Ошибка инициализации клиента", "[INIT_FAILED] Ошибка инициализации клиента Superset",
exc_info=True, exc_info=True,
extra={"config": config.dict()} extra={"config_base_url": config.base_url, "error": str(e)}
) )
raise raise # Перевыброс ошибки инициализации
def _validate_config(self, config: SupersetConfig) -> None: def _validate_config(self, config: SupersetConfig) -> None:
"""[PRECONDITION] Валидация конфигурации клиента """[PRECONDITION] Валидация конфигурации клиента.
@semantic: @semantic:
- Проверяет обязательные поля - Проверяет, что `config` является экземпляром `SupersetConfig`.
- Валидирует URL и учетные данные - Проверяет обязательные поля `base_url` и `auth`.
- Логирует ошибки валидации.
@raise: @raise:
- ValueError при невалидных параметрах - `TypeError`: если `config` не является `SupersetConfig`.
- TypeError при некорректном типе - `ValueError`: если отсутствуют обязательные поля или они невалидны.
""" """
if not isinstance(config, SupersetConfig): if not isinstance(config, SupersetConfig):
self.logger.error( self.logger.error(
"[CONFIG_VALIDATION_FAILED] Некорректный тип конфигурации", "[CONTRACT_VIOLATION] Некорректный тип конфигурации",
extra={"actual_type": type(config).__name__} extra={"actual_type": type(config).__name__}
) )
raise TypeError("Конфигурация должна быть экземпляром SupersetConfig") raise TypeError("Конфигурация должна быть экземпляром SupersetConfig")
required_fields = ["base_url", "auth"] # Pydantic SupersetConfig уже выполняет основную валидацию через Field и validator.
for field in required_fields: # Здесь можно добавить дополнительные бизнес-правила или проверки доступности, если нужно.
if not getattr(config, field, None): try:
# Попытка доступа к полям через Pydantic для проверки их существования
_ = config.base_url
_ = config.auth
_ = config.auth.get("username")
_ = config.auth.get("password")
self.logger.debug("[COHERENCE_CHECK_PASSED] Конфигурация SupersetClient прошла внутреннюю валидацию.")
except Exception as e:
self.logger.error( self.logger.error(
"[CONFIG_VALIDATION_FAILED] Отсутствует обязательное поле", f"[CONTRACT_VIOLATION] Ошибка валидации полей конфигурации: {e}",
extra={"missing_field": field} extra={"config_dict": config.dict()}
) )
raise ValueError(f"Обязательное поле {field} не указано") raise ValueError(f"Конфигурация SupersetConfig невалидна: {e}") from e
if not config.auth.get("username") or not config.auth.get("password"):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Не указаны учетные данные",
extra={"auth_keys": list(config.auth.keys())}
)
raise ValueError("В конфигурации должны быть указаны username и password")
# Дополнительная валидация URL
if not config.base_url.startswith(("http://", "https://")):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Некорректный URL",
extra={"base_url": config.base_url}
)
raise ValueError("base_url должен начинаться с http:// или https://")
@property @property
def headers(self) -> dict: def headers(self) -> dict:
"""[INTERFACE] Базовые заголовки для API-вызовов """[INTERFACE] Базовые заголовки для API-вызовов.
@semantic: Объединяет общие заголовки для всех запросов @semantic: Делегирует получение актуальных заголовков `APIClient`.
@post: Всегда возвращает актуальные токены @post: Всегда возвращает актуальные токены и CSRF-токен.
@invariant: Заголовки содержат 'Authorization' и 'X-CSRFToken'.
""" """
return { # [REFACTORING_COMPLETE] Заголовки теперь управляются APIClient.
"Authorization": f"Bearer {self.tokens['access_token']}", return self.network.headers
"X-CSRFToken": self.tokens["csrf_token"],
"Referer": self.config.base_url,
"Content-Type": "application/json"
}
# [MAIN-OPERATIONS] Работа с дашбордами # [SECTION] Основные операции с дашбордами
def get_dashboard(self, dashboard_id_or_slug: str) -> dict: def get_dashboard(self, dashboard_id_or_slug: str) -> dict:
"""[CONTRACT] Получение метаданных дашборда """[CONTRACT] Получение метаданных дашборда по ID или SLUG.
@pre: @pre:
- dashboard_id_or_slug должен существовать - `dashboard_id_or_slug` должен быть строкой (ID или slug).
- Клиент должен быть аутентифицирован (tokens актуальны) - Клиент должен быть аутентифицирован (токены актуальны).
@post: @post:
- Возвращает dict с метаданными дашборда - Возвращает `dict` с метаданными дашборда.
- В случае 404 вызывает DashboardNotFoundError @raise:
@semantic_layers: - `DashboardNotFoundError`: Если дашборд не найден (HTTP 404).
1. Взаимодействие с API через APIClient - `SupersetAPIError`: При других ошибках API.
2. Обработка специфичных для Superset ошибок - `NetworkError`: При проблемах с сетью.
""" """
self.logger.info(f"[INFO] Запрос метаданных дашборда: {dashboard_id_or_slug}")
try: try:
response = self.network.request( response_data = self.network.request(
method="GET", method="GET",
endpoint=f"/dashboard/{dashboard_id_or_slug}", endpoint=f"/dashboard/{dashboard_id_or_slug}",
headers=self.headers # Автоматически включает токены # headers=self.headers # [REFACTORING_NOTE] APIClient теперь сам добавляет заголовки
) ).json()
return response.json()["result"] # [POSTCONDITION] Проверка структуры ответа
if "result" not in response_data:
except requests.HTTPError as e: self.logger.warning("[CONTRACT_VIOLATION] Ответ API не содержит поле 'result'", extra={"response": response_data})
if e.response.status_code == 404: raise SupersetAPIError("Некорректный формат ответа API при получении дашборда")
raise DashboardNotFoundError( self.logger.debug(f"[DEBUG] Метаданные дашборда '{dashboard_id_or_slug}' успешно получены.")
dashboard_id_or_slug, return response_data["result"]
context={"url": f"{self.config.base_url}/dashboard/{dashboard_id_or_slug}"} except (DashboardNotFoundError, SupersetAPIError, NetworkError, PermissionDeniedError) as e:
) self.logger.error(f"[ERROR] Не удалось получить дашборд '{dashboard_id_or_slug}': {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise SupersetAPIError( raise # Перевыброс уже типизированной ошибки
f"API Error: {str(e)}", except Exception as e:
status_code=e.response.status_code self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении дашборда '{dashboard_id_or_slug}': {str(e)}", exc_info=True)
) from e raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context={"dashboard_id_or_slug": dashboard_id_or_slug}) from e
# [ERROR-HANDLER] Централизованная обработка ошибок
def _handle_api_error(self, method_name: str, error: Exception, url: str) -> None:
"""[UNIFIED-ERROR] Обработка API-ошибок
@semantic: Преобразует requests исключения в наши типы
"""
context = {
"method": method_name,
"url": url,
"status_code": getattr(error.response, 'status_code', None)
}
if isinstance(error, requests.Timeout):
raise NetworkError("Request timeout", context=context) from error
elif getattr(error.response, 'status_code', None) == 403:
raise PermissionDeniedError(context=context) from error
else:
raise SupersetAPIError(str(error), context=context) from error
# [SECTION] EXPORT OPERATIONS # [SECTION] EXPORT OPERATIONS
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]: def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
"""[CONTRACT] Экспорт дашборда в ZIP-архив """[CONTRACT] Экспорт дашборда в ZIP-архив.
@pre: @pre:
- dashboard_id должен существовать - `dashboard_id` должен быть целочисленным ID существующего дашборда.
- Пользователь имеет права на экспорт - Пользователь должен иметь права на экспорт.
@post: @post:
- Возвращает кортеж (бинарное содержимое, имя файла) - Возвращает кортеж: (бинарное_содержимое_zip, имя_файла).
- Имя файла извлекается из headers или генерируется - Имя файла извлекается из заголовков `Content-Disposition` или генерируется.
@errors: @raise:
- DashboardNotFoundError если дашборд не существует - `DashboardNotFoundError`: Если дашборд с `dashboard_id` не найден (HTTP 404).
- ExportError при проблемах экспорта - `ExportError`: При любых других проблемах экспорта (например, неверный тип контента, пустой ответ).
- `NetworkError`: При проблемах с сетью.
""" """
url = f"{self.config.base_url}/dashboard/export/" self.logger.info(f"[INFO] Запуск экспорта дашборда с ID: {dashboard_id}")
self.logger.debug(
"[EXPORT_START] Запуск экспорта",
extra={"dashboard_id": dashboard_id, "export_url": url}
)
try: try:
response = self._execute_export_request(dashboard_id, url) # [ANCHOR] EXECUTE_EXPORT_REQUEST
self._validate_export_response(response, dashboard_id) # [REFACTORING_COMPLETE] Использование self.network.request для экспорта
filename = self._resolve_export_filename(response, dashboard_id)
return response.content, filename
except requests.exceptions.HTTPError as http_err:
error_ctx = {
"dashboard_id": dashboard_id,
"status_code": http_err.response.status_code
}
if http_err.response.status_code == 404:
self.logger.error(
"[EXPORT_FAILED] Дашборд не найден",
extra=error_ctx
)
raise DashboardNotFoundError(dashboard_id, context=error_ctx)
raise ExportError("HTTP ошибка экспорта", context=error_ctx) from http_err
except requests.exceptions.RequestException as req_err:
error_ctx = {"dashboard_id": dashboard_id}
self.logger.error(
"[EXPORT_FAILED] Ошибка запроса",
exc_info=True,
extra=error_ctx
)
raise ExportError("Ошибка экспорта", context=error_ctx) from req_err
def _execute_export_request(self, dashboard_id: int, url: str) -> requests.Response:
"""[HELPER] Выполнение запроса экспорта
@coherence_check:
- Ответ должен иметь status_code 200
- Content-Type: application/zip
"""
response = self.network.request( response = self.network.request(
method="GET", method="GET",
endpoint="/dashboard/export/", endpoint="/dashboard/export/",
params={"q": f"[{dashboard_id}]"}, params={"q": json.dumps([dashboard_id])},
raw_response=True # Для получения бинарного содержимого stream=True, # Используем stream для обработки больших файлов
raw_response=True # Получаем сырой объект ответа requests.Response
# headers=self.headers # APIClient сам добавляет заголовки
) )
response.raise_for_status() response.raise_for_status() # Проверка статуса ответа
return response
def _validate_export_response(self, response: requests.Response, dashboard_id: int) -> None: # [ANCHOR] VALIDATE_EXPORT_RESPONSE
"""[HELPER] Валидация ответа экспорта self._validate_export_response(response, dashboard_id)
# [ANCHOR] RESOLVE_FILENAME
filename = self._resolve_export_filename(response, dashboard_id)
# [POSTCONDITION] Успешный экспорт
content = response.content # Получаем все содержимое
self.logger.info(
f"[COHERENCE_CHECK_PASSED] Дашборд {dashboard_id} успешно экспортирован. Размер: {len(content)} байт, Имя файла: {filename}"
)
return content, filename
except (DashboardNotFoundError, ExportError, NetworkError, PermissionDeniedError, SupersetAPIError) as e:
# Перехват и перевыброс уже типизированных ошибок от APIClient или предыдущих валидаций
self.logger.error(f"[ERROR] Ошибка экспорта дашборда {dashboard_id}: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except Exception as e:
# Обработка любых непредвиденных ошибок
error_ctx = {"dashboard_id": dashboard_id, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при экспорте дашборда {dashboard_id}: {str(e)}", exc_info=True, extra=error_ctx)
raise ExportError(f"Непредвиденная ошибка при экспорте: {str(e)}", context=error_ctx) from e
# [HELPER] Метод _execute_export_request был инлайнирован в export_dashboard
# Это сделано, чтобы избежать лишней абстракции, так как он просто вызывает self.network.request.
# Валидация HTTP-ответа и ошибок теперь происходит в self.network.request и последующей self.raise_for_status().
def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
"""[HELPER] Валидация ответа экспорта.
@semantic: @semantic:
- Проверка Content-Type - Проверяет, что Content-Type является `application/zip`.
- Проверка наличия данных - Проверяет, что ответ не пуст.
@raise:
- `ExportError`: При невалидном Content-Type или пустом содержимом.
""" """
if 'application/zip' not in response.headers.get('Content-Type', ''): content_type = response.headers.get('Content-Type', '')
if 'application/zip' not in content_type:
self.logger.error( self.logger.error(
"[EXPORT_VALIDATION_FAILED] Неверный Content-Type", "[CONTRACT_VIOLATION] Неверный Content-Type для экспорта",
extra={ extra={
"dashboard_id": dashboard_id, "dashboard_id": dashboard_id,
"content_type": response.headers.get('Content-Type') "expected_type": "application/zip",
"received_type": content_type
} }
) )
raise ExportError("Получен не ZIP-архив") raise ExportError(f"Получен не ZIP-архив (Content-Type: {content_type})")
if not response.content: if not response.content:
self.logger.error( self.logger.error(
"[EXPORT_VALIDATION_FAILED] Пустой ответ", "[CONTRACT_VIOLATION] Пустой ответ при экспорте дашборда",
extra={"dashboard_id": dashboard_id} extra={"dashboard_id": dashboard_id}
) )
raise ExportError("Получены пустые данные") raise ExportError("Получены пустые данные при экспорте")
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Ответ экспорта для дашборда {dashboard_id} валиден.")
def _resolve_export_filename(self, response: requests.Response, dashboard_id: int) -> str: def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
"""[HELPER] Определение имени экспортируемого файла """[HELPER] Определение имени экспортируемого файла.
@fallback: Генерирует имя если не найден заголовок @semantic:
- Пытается извлечь имя файла из заголовка `Content-Disposition`.
- Если заголовок отсутствует, генерирует имя файла на основе ID дашборда и текущей даты.
@post:
- Возвращает строку с именем файла.
""" """
filename = get_filename_from_headers(response.headers) filename = get_filename_from_headers(response.headers)
if not filename: if not filename:
filename = f"dashboard_export_{dashboard_id}_{datetime.now().strftime('%Y%m%d')}.zip" # [FALLBACK] Генерация имени файла
filename = f"dashboard_export_{dashboard_id}_{datetime.datetime.now().strftime('%Y%m%dT%H%M%S')}.zip"
self.logger.warning(
"[WARN] Не удалось извлечь имя файла из заголовков. Используется сгенерированное имя.",
extra={"generated_filename": filename, "dashboard_id": dashboard_id}
)
else:
self.logger.debug( self.logger.debug(
"[EXPORT_FALLBACK] Используется сгенерированное имя файла", "[DEBUG] Имя файла экспорта получено из заголовков.",
extra={"filename": filename} extra={"filename": filename, "dashboard_id": dashboard_id}
) )
return filename return filename
def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path: def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path:
"""[CONTRACT] Экспорт дашборда прямо в файл """[CONTRACT] Экспорт дашборда напрямую в файл.
@pre: @pre:
- output_dir должен существовать - `dashboard_id` должен быть существующим ID дашборда.
- Доступ на запись в директорию - `output_dir` должен быть валидным, существующим путем и иметь права на запись.
@post: @post:
- Возвращает Path сохраненного файла - Дашборд экспортируется и сохраняется как ZIP-файл в `output_dir`.
- Создает поддиректорию с именем дашборда - Возвращает `Path` к сохраненному файлу.
@raise:
- `FileNotFoundError`: Если `output_dir` не существует.
- `ExportError`: При ошибках экспорта или записи файла.
- `NetworkError`: При проблемах с сетью.
""" """
output_dir = Path(output_dir) output_dir = Path(output_dir)
if not output_dir.exists(): if not output_dir.exists():
self.logger.error( self.logger.error(
"[EXPORT_PRE_FAILED] Директория не существует", "[CONTRACT_VIOLATION] Целевая директория для экспорта не найдена.",
extra={"output_dir": str(output_dir)} extra={"output_dir": str(output_dir)}
) )
raise FileNotFoundError(f"Директория {output_dir} не найдена") raise FileNotFoundError(f"Директория {output_dir} не найдена")
self.logger.info(f"[INFO] Экспорт дашборда {dashboard_id} в файл в директорию: {output_dir}")
try:
content, filename = self.export_dashboard(dashboard_id) content, filename = self.export_dashboard(dashboard_id)
target_path = output_dir / filename target_path = output_dir / filename
try:
with open(target_path, 'wb') as f: with open(target_path, 'wb') as f:
f.write(content) f.write(content)
self.logger.info( self.logger.info(
"[EXPORT_SUCCESS] Дашборд сохранен на диск", "[COHERENCE_CHECK_PASSED] Дашборд успешно сохранен на диск.",
extra={ extra={
"dashboard_id": dashboard_id, "dashboard_id": dashboard_id,
"file_path": str(target_path), "file_path": str(target_path),
@@ -338,167 +329,245 @@ class SupersetClient:
) )
return target_path return target_path
except (FileNotFoundError, ExportError, NetworkError, SupersetAPIError, DashboardNotFoundError) as e:
self.logger.error(f"[ERROR] Ошибка сохранения дашборда {dashboard_id} на диск: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except IOError as io_err: except IOError as io_err:
self.logger.error( error_ctx = {"target_path": str(target_path), "dashboard_id": dashboard_id}
"[EXPORT_IO_FAILED] Ошибка записи файла", self.logger.critical(f"[CRITICAL] Ошибка записи файла для дашборда {dashboard_id}: {str(io_err)}", exc_info=True, extra=error_ctx)
exc_info=True, raise ExportError("Ошибка сохранения файла на диск") from io_err
extra={"target_path": str(target_path)} except Exception as e:
) error_ctx = {"dashboard_id": dashboard_id, "error_type": type(e).__name__}
raise ExportError("Ошибка сохранения файла") from io_err self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при экспорте в файл: {str(e)}", exc_info=True, extra=error_ctx)
raise ExportError(f"Непредвиденная ошибка экспорта в файл: {str(e)}", context=error_ctx) from e
# [SECTION] Основной интерфейс API # [SECTION] API для получения списка дашбордов
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""[CONTRACT] Получение списка дашбордов с пагинацией """[CONTRACT] Получение списка дашбордов с пагинацией.
@pre: @pre:
- Клиент должен быть авторизован - Клиент должен быть авторизован.
- Параметры пагинации должны быть валидны - Параметры `query` (если предоставлены) должны быть валидны для API Superset.
@post: @post:
- Возвращает кортеж (total_count, список метаданных) - Возвращает кортеж: (общееоличествоашбордов, список_метаданныхашбордов).
- Поддерживает кастомные query-параметры - Обходит пагинацию для получения всех доступных дашбордов.
@invariant: @invariant:
- Всегда возвращает полный список (обходит пагинацию) - Всегда возвращает полный список (если `total_count` > 0).
@raise:
- `SupersetAPIError`: При ошибках API (например, неверный формат ответа).
- `NetworkError`: При проблемах с сетью.
- `ValueError`: При некорректных параметрах пагинации (внутренняя ошибка).
""" """
url = f"{self.config.base_url}/dashboard/" self.logger.info("[INFO] Запрос списка всех дашбордов.")
self.logger.debug( # [COHERENCE_CHECK] Валидация и нормализация параметров запроса
"[API_CALL] Запрос списка дашбордов",
extra={"query": query}
)
# [COHERENCE_CHECK] Валидация параметров
validated_query = self._validate_query_params(query) validated_query = self._validate_query_params(query)
self.logger.debug("[DEBUG] Параметры запроса списка дашбордов после валидации.", extra={"validated_query": validated_query})
try: try:
# Инициализация пагинации # [ANCHOR] FETCH_TOTAL_COUNT
total_count = self._fetch_total_count() total_count = self._fetch_total_count()
self.logger.info(f"[INFO] Обнаружено {total_count} дашбордов в системе.")
# [ANCHOR] FETCH_ALL_PAGES
paginated_data = self._fetch_all_pages(validated_query, total_count) paginated_data = self._fetch_all_pages(validated_query, total_count)
self.logger.info( self.logger.info(
"[API_SUCCESS] Дашборды получены", f"[COHERENCE_CHECK_PASSED] Успешно получено {len(paginated_data)} дашбордов из {total_count}."
extra={"count": total_count}
) )
return total_count, paginated_data return total_count, paginated_data
except requests.exceptions.RequestException as e: except (SupersetAPIError, NetworkError, ValueError, PermissionDeniedError) as e:
error_ctx = {"method": "get_dashboards", "query": validated_query} self.logger.error(f"[ERROR] Ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
self._handle_api_error("Пагинация дашбордов", e, error_ctx) raise
except Exception as e:
error_ctx = {"query": query, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context=error_ctx) from e
# [SECTION] Импорт # [SECTION] Импорт
def import_dashboard(self, file_name: Union[str, Path]) -> Dict: def import_dashboard(self, file_name: Union[str, Path]) -> Dict:
"""[CONTRACT] Импорт дашборда из архива """[CONTRACT] Импорт дашборда из ZIP-архива.
@pre: @pre:
- Файл должен существовать и быть валидным ZIP - `file_name` должен указывать на существующий и валидный ZIP-файл Superset экспорта.
- Должны быть права на импорт - Пользователь должен иметь права на импорт дашбордов.
@post: @post:
- Возвращает метаданные импортированного дашборда - Дашборд импортируется (или обновляется, если `overwrite` включен).
- При конфликтах выполняет overwrite - Возвращает `dict` с ответом API об импорте.
@raise:
- `FileNotFoundError`: Если файл не существует.
- `InvalidZipFormatError`: Если файл не является корректным ZIP-архивом Superset.
- `PermissionDeniedError`: Если у пользователя нет прав на импорт.
- `SupersetAPIError`: При других ошибках API импорта.
- `NetworkError`: При проблемах с сетью.
""" """
self.logger.info(f"[INFO] Инициирован импорт дашборда из файла: {file_name}")
# [PRECONDITION] Валидация входного файла
self._validate_import_file(file_name) self._validate_import_file(file_name)
self.logger.debug(
"[IMPORT_START] Инициирован импорт дашборда",
extra={"file": file_name}
)
try: try:
return self.network.upload_file( # [ANCHOR] UPLOAD_FILE_TO_API
# [REFACTORING_COMPLETE] Использование self.network.upload_file
import_response = self.network.upload_file(
endpoint="/dashboard/import/", endpoint="/dashboard/import/",
file_obj=file_name, file_obj=Path(file_name), # Pathlib объект, который APIClient может преобразовать в бинарный
file_name=file_name, file_name=Path(file_name).name, # Имя файла для FormData
form_field="formData", form_field="formData",
extra_data={'overwrite': 'true'}, extra_data={'overwrite': 'true'}, # Предполагаем, что всегда хотим перезаписывать
timeout=self.config.timeout * 2 timeout=self.config.timeout * 2 # Удвоенный таймаут для загрузки больших файлов
# headers=self.headers # APIClient сам добавляет заголовки
) )
# [POSTCONDITION] Проверка успешного ответа импорта (Superset обычно возвращает JSON)
if not isinstance(import_response, dict) or "message" not in import_response:
self.logger.warning("[CONTRACT_VIOLATION] Неожиданный формат ответа при импорте", extra={"response": import_response})
raise SupersetAPIError("Неожиданный формат ответа после импорта дашборда.")
except PermissionDeniedError as e: self.logger.info(
self.logger.error( f"[COHERENCE_CHECK_PASSED] Дашборд из '{file_name}' успешно импортирован.",
"[IMPORT_AUTH_FAILED] Недостаточно прав для импорта", extra={"api_message": import_response.get("message", "N/A"), "file": file_name}
exc_info=True
) )
return import_response
except (FileNotFoundError, InvalidZipFormatError, PermissionDeniedError, SupersetAPIError, NetworkError, DashboardNotFoundError) as e:
self.logger.error(f"[ERROR] Ошибка импорта дашборда из '{file_name}': {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise raise
except Exception as e: except Exception as e:
self.logger.error( error_ctx = {"file": file_name, "error_type": type(e).__name__}
"[IMPORT_FAILED] Ошибка импорта дашборда", self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при импорте дашборда: {str(e)}", exc_info=True, extra=error_ctx)
exc_info=True, raise SupersetAPIError(f"Непредвиденная ошибка импорта: {str(e)}", context=error_ctx) from e
extra={"file": file_name}
)
raise SupersetAPIError(f"Ошибка импорта: {str(e)}") from e
# [SECTION] Приватные методы-помощники # [SECTION] Приватные методы-помощники
def _validate_query_params(self, query: Optional[Dict]) -> Dict: def _validate_query_params(self, query: Optional[Dict]) -> Dict:
"""[HELPER] Нормализация параметров запроса""" """[HELPER] Нормализация и валидация параметров запроса для списка дашбордов.
@semantic:
- Устанавливает значения по умолчанию для `columns`, `page`, `page_size`.
- Объединяет предоставленные `query` параметры с дефолтными.
@post:
- Возвращает словарь с полными и валидными параметрами запроса.
"""
base_query = { base_query = {
"columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"], "columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"],
"page": 0, "page": 0,
"page_size": 20 "page_size": 1000 # Достаточно большой размер страницы для обхода пагинации
} }
# [COHERENCE_CHECK_PASSED] Параметры запроса сформированы корректно.
return {**base_query, **(query or {})} return {**base_query, **(query or {})}
def _fetch_total_count(self) -> int: def _fetch_total_count(self) -> int:
"""[CONTRACT][HELPER] Получение общего кол-ва дашбордов в системе """[CONTRACT][HELPER] Получение общего количества дашбордов в системе.
@delegates: @delegates:
- Сетевой запрос -> APIClient - Сетевой запрос к `APIClient.fetch_paginated_count`.
- Обработка ответа -> собственный метод @pre:
@errors: - Клиент должен быть авторизован.
- SupersetAPIError при проблемах с API @post:
- Возвращает целочисленное количество дашбордов.
@raise:
- `SupersetAPIError` или `NetworkError` при проблемах с API/сетью.
""" """
query_params = { query_params_for_count = {
'columns': ['id'], 'columns': ['id'],
'page': 0, 'page': 0,
'page_size': 1 'page_size': 1
} }
self.logger.debug("[DEBUG] Запрос общего количества дашбордов.")
try: try:
return self.network.fetch_paginated_count( # [REFACTORING_COMPLETE] Использование self.network.fetch_paginated_count
count = self.network.fetch_paginated_count(
endpoint="/dashboard/", endpoint="/dashboard/",
query_params=query_params, query_params=query_params_for_count,
count_field="count" count_field="count"
) )
except requests.exceptions.RequestException as e: self.logger.debug(f"[COHERENCE_CHECK_PASSED] Получено общее количество дашбордов: {count}")
raise SupersetAPIError(f"Ошибка получения количества дашбордов: {str(e)}") return count
except (SupersetAPIError, NetworkError, PermissionDeniedError) as e:
self.logger.error(f"[ERROR] Ошибка получения общего количества дашбордов: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise # Перевыброс ошибки
except Exception as e:
error_ctx = {"error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении общего количества: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка при получении count: {str(e)}", context=error_ctx) from e
def _fetch_all_pages(self, query: Dict, total_count: int) -> List[Dict]: def _fetch_all_pages(self, query: Dict, total_count: int) -> List[Dict]:
"""[HELPER] Обход всех страниц с пагинацией""" """[CONTRACT][HELPER] Обход всех страниц пагинированного API для получения всех данных.
"""[CONTRACT] Получение всех данных с пагинированного API
@delegates: @delegates:
- Сетевые запросы -> APIClient.fetch_paginated_data() - Сетевые запросы к `APIClient.fetch_paginated_data()`.
@params: @pre:
query: оригинальный query-объект (без page) - `query` должен содержать `page_size`.
total_count: общее количество элементов - `total_count` должен быть корректным общим количеством элементов.
@return: @post:
Список всех элементов - Возвращает список всех элементов, собранных со всех страниц.
@errors: @raise:
- SupersetAPIError: проблемы с API - `SupersetAPIError` или `NetworkError` при проблемах с API/сетью.
- ValueError: некорректные параметры пагинации - `ValueError` при некорректных параметрах пагинации.
""" """
self.logger.debug(f"[DEBUG] Запуск обхода пагинации. Всего элементов: {total_count}, query: {query}")
try: try:
if not query.get('page_size'): if 'page_size' not in query or not query['page_size']:
raise ValueError("Отсутствует page_size в query параметрах") self.logger.error("[CONTRACT_VIOLATION] Параметр 'page_size' отсутствует или неверен в query.")
raise ValueError("Отсутствует 'page_size' в query параметрах для пагинации")
return self.network.fetch_paginated_data( # [REFACTORING_COMPLETE] Использование self.network.fetch_paginated_data
all_data = self.network.fetch_paginated_data(
endpoint="/dashboard/", endpoint="/dashboard/",
base_query=query, base_query=query,
total_count=total_count, total_count=total_count,
results_field="result" results_field="result"
) )
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Успешно получено {len(all_data)} элементов со всех страниц.")
return all_data
except (requests.exceptions.RequestException, ValueError) as e: except (SupersetAPIError, NetworkError, ValueError, PermissionDeniedError) as e:
error_ctx = { self.logger.error(f"[ERROR] Ошибка при обходе пагинации: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
"query": query, raise
"total_count": total_count, except Exception as e:
"error": str(e) error_ctx = {"query": query, "total_count": total_count, "error_type": type(e).__name__}
} self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при обходе пагинации: {str(e)}", exc_info=True, extra=error_ctx)
self.logger.error("[PAGINATION_ERROR]", extra=error_ctx) raise SupersetAPIError(f"Непредвиденная ошибка пагинации: {str(e)}", context=error_ctx) from e
raise SupersetAPIError(f"Ошибка пагинации: {str(e)}") from e
def _validate_import_file(self, zip_path: Union[str, Path]) -> None: def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
"""[HELPER] Проверка файла перед импортом""" """[HELPER] Проверка файла перед импортом.
@semantic:
- Проверяет существование файла.
- Проверяет, что файл является валидным ZIP-архивом.
- Проверяет, что ZIP-архив содержит `metadata.yaml` (ключевой для экспорта Superset).
@raise:
- `FileNotFoundError`: Если файл не существует.
- `InvalidZipFormatError`: Если файл не ZIP или не содержит `metadata.yaml`.
"""
path = Path(zip_path) path = Path(zip_path)
self.logger.debug(f"[DEBUG] Валидация файла для импорта: {path}")
if not path.exists(): if not path.exists():
raise FileNotFoundError(f"[FILE_ERROR] {zip_path} не существует") self.logger.error(
"[CONTRACT_VIOLATION] Файл для импорта не найден.",
extra={"file_path": str(path)}
)
raise FileNotFoundError(f"Файл {zip_path} не существует")
if not zipfile.is_zipfile(path): if not zipfile.is_zipfile(path):
raise InvalidZipFormatError(f"[FILE_ERROR] {zip_path} не ZIP-архив") self.logger.error(
"[CONTRACT_VIOLATION] Файл не является валидным ZIP-архивом.",
extra={"file_path": str(path)}
)
raise InvalidZipFormatError(f"Файл {zip_path} не является ZIP-архивом")
with zipfile.ZipFile(path) as zf: try:
with zipfile.ZipFile(path, 'r') as zf:
# [CONTRACT] Проверяем наличие metadata.yaml
if not any(n.endswith('metadata.yaml') for n in zf.namelist()): if not any(n.endswith('metadata.yaml') for n in zf.namelist()):
raise DashboardNotFoundError("Архив не содержит metadata.yaml") self.logger.error(
"[CONTRACT_VIOLATION] ZIP-архив не содержит 'metadata.yaml'.",
extra={"file_path": str(path), "zip_contents": zf.namelist()[:5]} # Логируем первые 5 файлов для отладки
)
raise InvalidZipFormatError(f"Архив {zip_path} не содержит 'metadata.yaml', не является корректным экспортом Superset.")
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Файл '{path}' успешно прошел валидацию для импорта.")
except zipfile.BadZipFile as e:
self.logger.error(
f"[CONTRACT_VIOLATION] Ошибка чтения ZIP-файла: {str(e)}",
exc_info=True, extra={"file_path": str(path)}
)
raise InvalidZipFormatError(f"Файл {zip_path} поврежден или имеет некорректный формат ZIP.") from e
except Exception as e:
self.logger.critical(
f"[CRITICAL] Непредвиденная ошибка при валидации ZIP-файла: {str(e)}",
exc_info=True, extra={"file_path": str(path)}
)
raise SupersetAPIError(f"Непредвиденная ошибка валидации ZIP: {str(e)}", context={"file_path": str(path)}) from e

View File

@@ -1,48 +1,66 @@
# [MODULE] Иерархия исключений # [MODULE] Иерархия исключений
# @contract: Все ошибки наследуют SupersetToolError # @contract: Все ошибки наследуют `SupersetToolError` для единой точки обработки.
# @semantic: Каждый тип соответствует конкретной проблемной области # @semantic: Каждый тип исключения соответствует конкретной проблемной области в инструменте Superset.
# @coherence: # @coherence:
# - Полное покрытие всех сценариев клиента # - Полное покрытие всех сценариев ошибок клиента и утилит.
# - Четкая классификация по уровню серьезности # - Четкая классификация по уровню серьезности (от общей до специфичной).
# - Дополнительный `context` для каждой ошибки, помогающий в диагностике.
# [IMPORTS] Exceptions # [IMPORTS] Standard library
from typing import Optional, Dict, Any from pathlib import Path
# [IMPORTS] Typing
from typing import Optional, Dict, Any,Union
class SupersetToolError(Exception): class SupersetToolError(Exception):
"""[BASE] Базовый класс ошибок инструмента """[BASE] Базовый класс для всех ошибок инструмента Superset.
@semantic: Должен содержать контекст для диагностики @semantic: Обеспечивает стандартизированный формат сообщений об ошибках с контекстом.
@invariant:
- `message` всегда присутствует.
- `context` всегда является словарем, даже если пустой.
""" """
def __init__(self, message: str, context: Optional[dict] = None): def __init__(self, message: str, context: Optional[Dict[str, Any]] = None):
# [PRECONDITION] Проверка типа контекста
if not isinstance(context, (dict, type(None))):
# [COHERENCE_CHECK_FAILED] Ошибка в передаче контекста
raise TypeError("Контекст ошибки должен быть словарем или None")
self.context = context or {} self.context = context or {}
super().__init__(f"{message} | Context: {self.context}") super().__init__(f"{message} | Context: {self.context}")
# [POSTCONDITION] Логирование создания ошибки
# Можно добавить здесь логирование, но обычно ошибки логируются в месте их перехвата/подъема,
# чтобы избежать дублирования и получить полный стек вызовов.
# [ERROR-GROUP] Проблемы аутентификации и авторизации # [ERROR-GROUP] Проблемы аутентификации и авторизации
class AuthenticationError(SupersetToolError): class AuthenticationError(SupersetToolError):
"""[AUTH] Ошибки credentials или доступа """[AUTH] Ошибки аутентификации (неверные учетные данные) или авторизации (проблемы с сессией).
@context: url, username, error_detail @context: url, username, error_detail (опционально).
""" """
def __init__(self, message="Auth failed", **context): def __init__(self, message: str = "Authentication failed", **context: Any):
super().__init__( super().__init__(
f"[AUTH_FAILURE] {message}", f"[AUTH_FAILURE] {message}",
{"type": "authentication", **context} {"type": "authentication", **context}
) )
class PermissionDeniedError(AuthenticationError): class PermissionDeniedError(AuthenticationError):
"""[AUTH] Ошибка отказа в доступе из-за недостаточных прав """[AUTH] Ошибка отказа в доступе из-за недостаточных прав пользователя.
@context: required_permission, user_roles @semantic: Указывает на то, что операция не разрешена.
@context: required_permission (опционально), user_roles (опционально), endpoint (опционально).
@invariant: Наследует от `AuthenticationError`, так как это разновидность проблемы доступа.
""" """
def __init__(self, required_permission: str, **context): def __init__(self, message: str = "Permission denied", required_permission: Optional[str] = None, **context: Any):
full_message = f"Permission denied: {required_permission}" if required_permission else message
super().__init__( super().__init__(
f"Permission denied: {required_permission}", full_message,
{"type": "authorization", "required_permission": required_permission, **context} {"type": "authorization", "required_permission": required_permission, **context}
) )
# [ERROR-GROUP] Проблемы API-вызовов # [ERROR-GROUP] Проблемы API-вызовов
class SupersetAPIError(SupersetToolError): class SupersetAPIError(SupersetToolError):
"""[API] Ошибки взаимодействия с Superset API """[API] Общие ошибки взаимодействия с Superset API.
@context: endpoint, method, status_code, response @semantic: Для ошибок, возвращаемых Superset API, или проблем с парсингом ответа.
@context: endpoint, method, status_code, response_body (опционально), error_message (из API).
""" """
def __init__(self, message="API error", **context): def __init__(self, message: str = "Superset API error", **context: Any):
super().__init__( super().__init__(
f"[API_FAILURE] {message}", f"[API_FAILURE] {message}",
{"type": "api_call", **context} {"type": "api_call", **context}
@@ -50,30 +68,44 @@ class SupersetAPIError(SupersetToolError):
# [ERROR-SUBCLASS] Детализированные ошибки API # [ERROR-SUBCLASS] Детализированные ошибки API
class ExportError(SupersetAPIError): class ExportError(SupersetAPIError):
"""[API:EXPORT] Проблемы экспорта дашбордов""" """[API:EXPORT] Проблемы, специфичные для операций экспорта дашбордов.
... @semantic: Может быть вызвано невалидным форматом ответа, ошибками Superset при экспорте.
@context: dashboard_id (опционально), details (опционально).
"""
def __init__(self, message: str = "Dashboard export failed", **context: Any):
super().__init__(f"[EXPORT_FAILURE] {message}", {"subtype": "export", **context})
class DashboardNotFoundError(SupersetAPIError): class DashboardNotFoundError(SupersetAPIError):
"""[API:404] Запрошенный ресурс не существует""" """[API:404] Запрошенный дашборд или ресурс не существует.
def __init__(self, dashboard_id, **context): @semantic: Соответствует HTTP 404 Not Found.
@context: dashboard_id_or_slug, url.
"""
def __init__(self, dashboard_id_or_slug: Union[int, str], message: str = "Dashboard not found", **context: Any):
super().__init__( super().__init__(
f"Dashboard {dashboard_id} not found", f"[NOT_FOUND] Dashboard '{dashboard_id_or_slug}' {message}",
{"dashboard_id": dashboard_id, **context} {"subtype": "not_found", "resource_id": dashboard_id_or_slug, **context}
) )
# [ERROR-SUBCLASS] Детализированные ошибки обработки файлов # [ERROR-SUBCLASS] Детализированные ошибки обработки файлов
class InvalidZipFormatError(SupersetAPIError): class InvalidZipFormatError(SupersetToolError):
"""[API:ZIP] Некорректный формат ZIP-архива """[FILE:ZIP] Некорректный формат ZIP-архива или содержимого для импорта/экспорта.
@context: file_path, expected_format, error_detail @semantic: Указывает на проблемы с целостностью или структурой ZIP-файла.
@context: file_path, expected_content (например, metadata.yaml), error_detail.
""" """
def __init__(self, file_path: str, **context): def __init__(self, message: str = "Invalid ZIP format or content", file_path: Optional[Union[str, Path]] = None, **context: Any):
super().__init__( super().__init__(
f"Invalid ZIP format for file: {file_path}", f"[FILE_ERROR] {message}",
{"type": "zip_validation", "file_path": file_path, **context} {"type": "file_validation", "file_path": str(file_path) if file_path else "N/A", **context}
) )
# [ERROR-GROUP] Системные и network-ошибки # [ERROR-GROUP] Системные и network-ошибки
class NetworkError(SupersetToolError): class NetworkError(SupersetToolError):
"""[NETWORK] Проблемы соединения или таймауты""" """[NETWORK] Проблемы соединения, таймауты, DNS-ошибки и т.п.
... @semantic: Ошибки, связанные с невозможностью установить или поддерживать сетевое соединение.
@context: url, original_exception (опционально), timeout (опционально).
"""
def __init__(self, message: str = "Network connection failed", **context: Any):
super().__init__(
f"[NETWORK_FAILURE] {message}",
{"type": "network", **context}
)

View File

@@ -1,42 +1,77 @@
# [MODULE] Сущности данных конфигурации # [MODULE] Сущности данных конфигурации
# @desc: Определяет структуры данных для работы с Superset API # @desc: Определяет структуры данных, используемые для конфигурации и трансформации в инструменте Superset.
# @contracts: # @contracts:
# - Проверка валидности URL # - Все модели наследуются от `pydantic.BaseModel` для автоматической валидации.
# - Валидация параметров аутентификации # - Валидация URL-адресов и параметров аутентификации.
# - Валидация структуры конфигурации БД для миграций.
# @coherence: # @coherence:
# - Все модели согласованы с API Superset v1 # - Все модели согласованы со схемой API Superset v1.
# - Совместимы с клиентскими методами # - Совместимы с клиентскими методами `SupersetClient` и утилитами.
# [IMPORTS] Models # [IMPORTS] Pydantic и Typing
from typing import Optional, Dict, Any from typing import Optional, Dict, Any, Union
from pydantic import BaseModel, validator,Field from pydantic import BaseModel, validator, Field, HttpUrl
# [COHERENCE_CHECK_PASSED] Все необходимые импорты для Pydantic моделей.
# [IMPORTS] Локальные модули
from .utils.logger import SupersetLogger from .utils.logger import SupersetLogger
class SupersetConfig(BaseModel): class SupersetConfig(BaseModel):
"""[CONFIG] Конфигурация подключения к Superset """[CONFIG] Конфигурация подключения к Superset API.
@semantic: Основные параметры подключения к API @semantic: Инкапсулирует основные параметры, необходимые для инициализации `SupersetClient`.
@invariant: @invariant:
- base_url должен содержать версию API (/v1/) - `base_url` должен быть валидным HTTP(S) URL и содержать `/api/v1`.
- auth должен содержать все обязательные поля - `auth` должен содержать обязательные поля для аутентификации по логину/паролю.
- `timeout` должен быть положительным числом.
""" """
base_url: str = Field(..., regex=r'.*/api/v1.*') base_url: str = Field(..., description="Базовый URL Superset API, включая версию /api/v1.", regex=r'.*/api/v1.*')
auth: dict auth: Dict[str, str] = Field(..., description="Словарь с данными для аутентификации (provider, username, password, refresh).")
verify_ssl: bool = True verify_ssl: bool = Field(True, description="Флаг для проверки SSL-сертификатов.")
timeout: int = 30 timeout: int = Field(30, description="Таймаут в секундах для HTTP-запросов.")
logger: Optional[SupersetLogger] = None logger: Optional[SupersetLogger] = Field(None, description="Экземпляр логгера для логирования внутри клиента.")
# [VALIDATOR] Проверка параметров аутентификации # [VALIDATOR] Проверка параметров аутентификации
@validator('auth') @validator('auth')
def validate_auth(cls, v): def validate_auth(cls, v: Dict[str, str]) -> Dict[str, str]:
"""[CONTRACT_VALIDATOR] Валидация словаря `auth`.
@pre:
- `v` должен быть словарем.
@post:
- Возвращает `v` если все обязательные поля присутствуют.
@raise:
- `ValueError`: Если отсутствуют обязательные поля ('provider', 'username', 'password', 'refresh').
"""
required = {'provider', 'username', 'password', 'refresh'} required = {'provider', 'username', 'password', 'refresh'}
if not required.issubset(v.keys()): if not required.issubset(v.keys()):
raise ValueError( raise ValueError(
f"[CONTRACT_VIOLATION] Auth must contain {required}" f"[CONTRACT_VIOLATION] Словарь 'auth' должен содержать поля: {required}. "
f"Отсутствующие: {required - v.keys()}"
) )
# [COHERENCE_CHECK_PASSED] Auth-конфигурация валидна.
return v
# [VALIDATOR] Проверка base_url
@validator('base_url')
def check_base_url_format(cls, v: str) -> str:
"""[CONTRACT_VALIDATOR] Валидация формата `base_url`.
@pre:
- `v` должна быть строкой.
@post:
- Возвращает `v` если это валидный URL.
@raise:
- `ValueError`: Если URL невалиден.
"""
try:
# Для Pydantic v2:
from pydantic import HttpUrl
HttpUrl(v, scheme="https") # Явное указание схемы
except ValueError:
# Для совместимости с Pydantic v1:
HttpUrl(v)
return v return v
class Config: class Config:
arbitrary_types_allowed = True arbitrary_types_allowed = True # Разрешаем Pydantic обрабатывать произвольные типы (например, SupersetLogger)
json_schema_extra = { json_schema_extra = {
"example": { "example": {
"base_url": "https://host/api/v1/", "base_url": "https://host/api/v1/",
@@ -45,28 +80,42 @@ class SupersetConfig(BaseModel):
"username": "user", "username": "user",
"password": "pass", "password": "pass",
"refresh": True "refresh": True
} },
"verify_ssl": True,
"timeout": 60
} }
} }
# [SEMANTIC-TYPE] Конфигурация БД для миграций # [SEMANTIC-TYPE] Конфигурация БД для миграций
class DatabaseConfig(BaseModel): class DatabaseConfig(BaseModel):
"""[CONFIG] Параметры трансформации БД при миграции """[CONFIG] Параметры трансформации баз данных при миграции дашбордов.
@semantic: Содержит old/new состояние для преобразования @semantic: Содержит `old` и `new` состояния конфигурации базы данных,
используемые для поиска и замены в YAML-файлах экспортированных дашбордов.
@invariant: @invariant:
- Должны быть указаны оба состояния (old/new) - `database_config` должен быть словарем с ключами 'old' и 'new'.
- UUID должен соответствовать формату - Каждое из 'old' и 'new' должно быть словарем, содержащим метаданные БД Superset.
""" """
database_config: Dict[str, Dict[str, Any]] database_config: Dict[str, Dict[str, Any]] = Field(..., description="Словарь, содержащий 'old' и 'new' конфигурации базы данных.")
logger: Optional[SupersetLogger] = None logger: Optional[SupersetLogger] = Field(None, description="Экземпляр логгера для логирования.")
@validator('database_config') @validator('database_config')
def validate_config(cls, v): def validate_config(cls, v: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""[CONTRACT_VALIDATOR] Валидация словаря `database_config`.
@pre:
- `v` должен быть словарем.
@post:
- Возвращает `v` если содержит ключи 'old' и 'new'.
@raise:
- `ValueError`: Если отсутствуют ключи 'old' или 'new'.
"""
if not {'old', 'new'}.issubset(v.keys()): if not {'old', 'new'}.issubset(v.keys()):
raise ValueError( raise ValueError(
"[COHERENCE_ERROR] Config must contain both old/new states" "[CONTRACT_VIOLATION] 'database_config' должен содержать ключи 'old' и 'new'."
) )
# Дополнительно можно добавить проверку структуры `old` и `new` на наличие `uuid`, `database_name` и т.д.
# Для простоты пока ограничимся наличием ключей 'old' и 'new'.
# [COHERENCE_CHECK_PASSED] Конфигурация базы данных для миграции валидна.
return v return v
class Config: class Config:

View File

@@ -1,134 +1,377 @@
from typing import Optional, Dict, Any,BinaryIO,List # [MODULE] Сетевой клиент для API
import requests # @contract: Инкапсулирует низкоуровневую HTTP-логику, аутентификацию, повторные попытки и обработку сетевых ошибок.
# @semantic_layers:
# 1. Инициализация сессии `requests` с настройками SSL и таймаутов.
# 2. Управление аутентификацией (получение и обновление access/CSRF токенов).
# 3. Выполнение HTTP-запросов (GET, POST и т.д.) с автоматическими заголовками.
# 4. Обработка пагинации для API-ответов.
# 5. Обработка загрузки файлов.
# @coherence:
# - Полностью независим от `SupersetClient`, предоставляя ему чистый API для сетевых операций.
# - Использует `SupersetLogger` для внутреннего логирования.
# - Всегда выбрасывает типизированные исключения из `superset_tool.exceptions`.
# [IMPORTS] Стандартная библиотека
from typing import Optional, Dict, Any, BinaryIO, List, Union
import json import json
import urllib3 import io
from ..exceptions import AuthenticationError, NetworkError,DashboardNotFoundError,SupersetAPIError,PermissionDeniedError from pathlib import Path
# [IMPORTS] Сторонние библиотеки
import requests
import urllib3 # Для отключения SSL-предупреждений
# [IMPORTS] Локальные модули
from ..exceptions import AuthenticationError, NetworkError, DashboardNotFoundError, SupersetAPIError, PermissionDeniedError
from .logger import SupersetLogger # Импорт логгера
# [CONSTANTS]
DEFAULT_RETRIES = 3
DEFAULT_BACKOFF_FACTOR = 0.5
class APIClient: class APIClient:
"""[NETWORK-CORE] Инкапсулирует HTTP-логику для работы с API. """[NETWORK-CORE] Инкапсулирует HTTP-логику для работы с API.
@contract: Гарантирует retry, SSL-валидацию и стандартные заголовки. @contract:
- Гарантирует retry-механизмы для запросов.
- Выполняет SSL-валидацию или отключает ее по конфигурации.
- Автоматически управляет access и CSRF токенами.
- Преобразует HTTP-ошибки в типизированные исключения `superset_tool.exceptions`.
@pre:
- `base_url` должен быть валидным URL.
- `auth` должен содержать необходимые данные для аутентификации.
- `logger` должен быть инициализирован.
@post:
- Аутентификация выполняется при первом запросе или явно через `authenticate()`.
- `self._tokens` всегда содержит актуальные access/CSRF токены после успешной аутентификации.
@invariant:
- Сессия `requests` активна и настроена.
- Все запросы используют актуальные токены.
""" """
def __init__( def __init__(
self, self,
base_url: str, base_url: str,
auth: Dict[str, Any], auth: Dict[str, Any],
verify_ssl: bool = False, verify_ssl: bool = True,
timeout: int = 30 timeout: int = 30,
logger: Optional[SupersetLogger] = None
): ):
# [INIT] Основные параметры
self.base_url = base_url self.base_url = base_url
self.auth = auth self.auth = auth
self.session = self._init_session(verify_ssl) self.verify_ssl = verify_ssl
self.timeout = timeout self.timeout = timeout
self.logger = logger or SupersetLogger(name="APIClient") # [COHERENCE_CHECK_PASSED] Инициализация логгера
def _init_session(self, verify_ssl: bool) -> requests.Session: # [INIT] Сессия Requests
"""[NETWORK-INIT] Настройка сессии с адаптерами.""" self.session = self._init_session()
self._tokens: Dict[str, str] = {} # [STATE] Хранилище токенов
self._authenticated = False # [STATE] Флаг аутентификации
self.logger.debug(
"[INIT] APIClient инициализирован.",
extra={"base_url": self.base_url, "verify_ssl": self.verify_ssl}
)
def _init_session(self) -> requests.Session:
"""[HELPER] Настройка сессии `requests` с адаптерами и SSL-опциями.
@semantic: Создает и конфигурирует объект `requests.Session`.
"""
session = requests.Session() session = requests.Session()
session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3)) # [CONTRACT] Настройка повторных попыток
session.verify = verify_ssl retries = requests.adapters.Retry(
if not verify_ssl: total=DEFAULT_RETRIES,
urllib3.disable_warnings() backoff_factor=DEFAULT_BACKOFF_FACTOR,
status_forcelist=[500, 502, 503, 504],
allowed_methods={"HEAD", "GET", "POST", "PUT", "DELETE"}
)
session.mount('http://', requests.adapters.HTTPAdapter(max_retries=retries))
session.mount('https://', requests.adapters.HTTPAdapter(max_retries=retries))
session.verify = self.verify_ssl
if not self.verify_ssl:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.logger.warning("[SECURITY] Отключена проверка SSL-сертификатов. Не использовать в продакшене без явной необходимости.")
return session return session
def authenticate(self) -> Dict[str, str]: def authenticate(self) -> Dict[str, str]:
"""[AUTH-FLOW] Получение access и CSRF токенов.""" """[AUTH-FLOW] Получение access и CSRF токенов.
@pre:
- `self.auth` содержит валидные учетные данные.
@post:
- `self._tokens` обновлен актуальными токенами.
- Возвращает обновленные токены.
- `self._authenticated` устанавливается в `True`.
@raise:
- `AuthenticationError`: При ошибках аутентификации (неверные credentials, проблемы с API security).
- `NetworkError`: При проблемах с сетью.
"""
self.logger.info(f"[AUTH] Попытка аутентификации для {self.base_url}")
try: try:
# Шаг 1: Получение access_token
login_url = f"{self.base_url}/security/login"
response = self.session.post( response = self.session.post(
f"{self.base_url}/security/login", login_url,
json={**self.auth, "provider": "db", "refresh": True}, json=self.auth, # Используем self.auth, который уже имеет "provider": "db", "refresh": True
timeout=self.timeout timeout=self.timeout
) )
response.raise_for_status() response.raise_for_status() # Выбросит HTTPError для 4xx/5xx ответов
access_token = response.json()["access_token"] access_token = response.json()["access_token"]
self.logger.debug("[AUTH] Access token успешно получен.")
# Шаг 2: Получение CSRF токена
csrf_url = f"{self.base_url}/security/csrf_token/"
csrf_response = self.session.get( csrf_response = self.session.get(
f"{self.base_url}/security/csrf_token/", csrf_url,
headers={"Authorization": f"Bearer {access_token}"}, headers={"Authorization": f"Bearer {access_token}"},
timeout=self.timeout timeout=self.timeout
) )
csrf_response.raise_for_status() csrf_response.raise_for_status()
csrf_token = csrf_response.json()["result"]
self.logger.debug("[AUTH] CSRF token успешно получен.")
# [STATE] Сохранение токенов и обновление флага
self._tokens = {
"access_token": access_token,
"csrf_token": csrf_token
}
self._authenticated = True
self.logger.info("[COHERENCE_CHECK_PASSED] Аутентификация успешно завершена.")
return self._tokens
except requests.exceptions.HTTPError as e:
error_msg = f"HTTP Error during authentication: {e.response.status_code} - {e.response.text}"
self.logger.error(f"[AUTH_FAILED] {error_msg}", exc_info=True)
if e.response.status_code == 401: # Unauthorized
raise AuthenticationError(
f"Неверные учетные данные или истекший токен.",
url=login_url, username=self.auth.get("username"),
status_code=e.response.status_code, response_text=e.response.text
) from e
elif e.response.status_code == 403: # Forbidden
raise PermissionDeniedError(
"Недостаточно прав для аутентификации.",
url=login_url, username=self.auth.get("username"),
status_code=e.response.status_code, response_text=e.response.text
) from e
else:
raise SupersetAPIError(
f"API ошибка при аутентификации: {error_msg}",
url=login_url, status_code=e.response.status_code, response_text=e.response.text
) from e
except requests.exceptions.RequestException as e:
self.logger.error(f"[NETWORK_ERROR] Сетевая ошибка при аутентификации: {str(e)}", exc_info=True)
raise NetworkError(f"Ошибка сети при аутентификации: {str(e)}", url=login_url) from e
except KeyError as e:
self.logger.error(f"[AUTH_FAILED] Некорректный формат ответа при аутентификации: {str(e)}", exc_info=True)
raise AuthenticationError(f"Некорректный формат ответа API при аутентификации: {str(e)}") from e
except Exception as e:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка аутентификации: {str(e)}", exc_info=True)
raise AuthenticationError(f"Непредвиденная ошибка аутентификации: {str(e)}") from e
@property
def headers(self) -> Dict[str, str]:
"""[INTERFACE] Возвращает стандартные заголовки с текущими токенами.
@semantic: Если токены не получены, пытается выполнить аутентификацию.
@post: Всегда возвращает словарь с 'Authorization' и 'X-CSRFToken'.
@raise: `AuthenticationError` если аутентификация невозможна.
"""
if not self._authenticated:
self.authenticate() # Попытка аутентификации при первом запросе заголовков
# [CONTRACT] Проверка наличия токенов
if not self._tokens or "access_token" not in self._tokens or "csrf_token" not in self._tokens:
self.logger.error("[CONTRACT_VIOLATION] Токены отсутствуют после попытки аутентификации.", extra={"tokens": self._tokens})
raise AuthenticationError("Не удалось получить токены для заголовков.")
return { return {
"access_token": access_token, "Authorization": f"Bearer {self._tokens['access_token']}",
"csrf_token": csrf_response.json()["result"] "X-CSRFToken": self._tokens["csrf_token"],
"Referer": self.base_url,
"Content-Type": "application/json"
} }
except requests.exceptions.RequestException as e:
raise NetworkError(f"Auth failed: {str(e)}")
def request( def request(
self, self,
method: str, method: str,
endpoint: str, endpoint: str,
headers: Optional[Dict] = None, headers: Optional[Dict] = None,
raw_response: bool = False,
**kwargs **kwargs
) -> requests.Response: ) -> Union[requests.Response, Dict[str, Any]]:
"""[NETWORK-CORE] Обертка для запросов с обработкой ошибок.""" """[NETWORK-CORE] Обертка для всех HTTP-запросов к Superset API.
@semantic:
- Выполняет запрос с заданными параметрами.
- Автоматически добавляет базовые заголовки (токены, CSRF).
- Обрабатывает HTTP-ошибки и преобразует их в типизированные исключения.
- В случае 401/403, пытается обновить токен и повторить запрос один раз.
@pre:
- `method` - валидный HTTP-метод ('GET', 'POST', 'PUT', 'DELETE').
- `endpoint` - валидный путь API.
@post:
- Возвращает объект `requests.Response` (если `raw_response=True`) или `dict` (JSON-ответ).
@raise:
- `AuthenticationError`, `PermissionDeniedError`, `NetworkError`, `SupersetAPIError`, `DashboardNotFoundError`.
"""
full_url = f"{self.base_url}{endpoint}"
self.logger.debug(f"[REQUEST] Выполнение запроса: {method} {full_url}", extra={"kwargs_keys": list(kwargs.keys())})
# [STATE] Заголовки для текущего запроса
_headers = self.headers.copy() # Получаем базовые заголовки с актуальными токенами
if headers: # Объединяем с переданными кастомными заголовками (переданные имеют приоритет)
_headers.update(headers)
retries_left = 1 # Одна попытка на обновление токена
while retries_left >= 0:
try: try:
response = self.session.request( response = self.session.request(
method, method,
f"{self.base_url}{endpoint}", full_url,
headers=headers, headers=_headers,
timeout=self.timeout, #timeout=self.timeout,
**kwargs **kwargs
) )
response.raise_for_status() response.raise_for_status() # Проверяем статус сразу
return response self.logger.debug(f"[COHERENCE_CHECK_PASSED] Запрос {method} {endpoint} успешно выполнен.")
return response if raw_response else response.json()
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
if e.response.status_code == 404: status_code = e.response.status_code
raise DashboardNotFoundError(endpoint) error_context = {
raise SupersetAPIError(str(e)) "method": method,
"url": full_url,
"status_code": status_code,
"response_text": e.response.text
}
if status_code in [401, 403] and retries_left > 0:
self.logger.warning(f"[AUTH_REFRESH] Токен истек или недействителен ({status_code}). Попытка обновить и повторить...", extra=error_context)
try:
self.authenticate() # Попытка обновить токены
_headers = self.headers.copy() # Обновляем заголовки с новыми токенами
if headers:
_headers.update(headers)
retries_left -= 1
continue # Повторяем цикл
except AuthenticationError as auth_err:
self.logger.error("[AUTH_FAILED] Не удалось обновить токены.", exc_info=True)
raise PermissionDeniedError("Аутентификация не удалась или права отсутствуют после обновления токена.", **error_context) from auth_err
# [ERROR_MAPPING] Преобразование стандартных HTTP-ошибок в кастомные исключения
if status_code == 404:
raise DashboardNotFoundError(endpoint, context=error_context) from e
elif status_code == 403:
raise PermissionDeniedError("Доступ запрещен.", **error_context) from e
elif status_code == 401:
raise AuthenticationError("Аутентификация не удалась.", **error_context) from e
else:
raise SupersetAPIError(f"Ошибка API: {status_code} - {e.response.text}", **error_context) from e
except requests.exceptions.Timeout as e:
self.logger.error(f"[NETWORK_ERROR] Таймаут запроса: {str(e)}", exc_info=True, extra={"url": full_url})
raise NetworkError("Таймаут запроса", url=full_url) from e
except requests.exceptions.ConnectionError as e:
self.logger.error(f"[NETWORK_ERROR] Ошибка соединения: {str(e)}", exc_info=True, extra={"url": full_url})
raise NetworkError("Ошибка соединения", url=full_url) from e
except requests.exceptions.RequestException as e:
self.logger.critical(f"[CRITICAL] Неизвестная ошибка запроса: {str(e)}", exc_info=True, extra={"url": full_url})
raise NetworkError(f"Неизвестная сетевая ошибка: {str(e)}", url=full_url) from e
except json.JSONDecodeError as e:
self.logger.error(f"[API_FAILED] Ошибка парсинга JSON ответа: {str(e)}", exc_info=True, extra={"url": full_url, "response_text_sample": response.text[:200]})
raise SupersetAPIError(f"Некорректный JSON ответ: {str(e)}", url=full_url) from e
except Exception as e:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка в APIClient.request: {str(e)}", exc_info=True, extra={"url": full_url})
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", url=full_url) from e
# [COHERENCE_CHECK_FAILED] Если дошли сюда, значит, все повторные попытки провалились
self.logger.error(f"[CONTRACT_VIOLATION] Все повторные попытки для запроса {method} {endpoint} исчерпаны.")
raise SupersetAPIError(f"Все повторные попытки запроса {method} {endpoint} исчерпаны.")
def upload_file( def upload_file(
self, self,
endpoint: str, endpoint: str,
file_obj: BinaryIO, file_obj: Union[str, Path, BinaryIO], # Может быть Path, str или байтовый поток
file_name: str, file_name: str,
form_field: str = "file", form_field: str = "file",
extra_data: Optional[Dict] = None, extra_data: Optional[Dict] = None,
timeout: Optional[int] = None timeout: Optional[int] = None
) -> Dict: ) -> Dict:
"""[NETWORK] Отправка файла на сервер """[CONTRACT] Отправка файла на сервер через POST-запрос.
@params: @pre:
endpoint: API endpoint - `endpoint` - валидный API endpoint для загрузки.
file_obj: файловый объект - `file_obj` - путь к файлу или открытый бинарный файловый объект.
file_name: имя файла - `file_name` - имя файла для отправки в форме.
form_field: имя поля формы @post:
extra_data: дополнительные данные - Возвращает JSON-ответ от сервера в виде словаря.
timeout: таймаут запроса @raise:
@return: - `FileNotFoundError`: Если `file_obj` является путем и файл не найден.
Ответ сервера (JSON) - `PermissionDeniedError`: Если недостаточно прав.
- `SupersetAPIError`, `NetworkError`.
""" """
files = {form_field: (file_name, file_obj, 'application/x-zip-compressed')} full_url = f"{self.base_url}{endpoint}"
headers = { _headers = self.headers.copy()
k: v for k, v in self.headers.items() # [IMPORTANT] Content-Type для files формируется requests, поэтому удаляем его из общих заголовков
if k.lower() != 'content-type' _headers.pop('Content-Type', None)
}
files_payload = None
should_close_file = False
if isinstance(file_obj, (str, Path)):
file_path = Path(file_obj)
if not file_path.exists():
self.logger.error(f"[CONTRACT_VIOLATION] Файл для загрузки не найден: {file_path}", extra={"file_path": str(file_path)})
raise FileNotFoundError(f"Файл {file_path} не найден для загрузки.")
files_payload = {form_field: (file_name, open(file_path, 'rb'), 'application/x-zip-compressed')}
should_close_file = True
self.logger.debug(f"[UPLOAD] Загрузка файла из пути: {file_path}")
elif isinstance(file_obj, io.BytesIO): # In-memory binary file
files_payload = {form_field: (file_name, file_obj.getvalue(), 'application/x-zip-compressed')}
self.logger.debug(f"[UPLOAD] Загрузка файла из байтового потока (in-memory).")
elif hasattr(file_obj, 'read') and hasattr(file_obj, 'seek'): # Generic binary file-like object
files_payload = {form_field: (file_name, file_obj, 'application/x-zip-compressed')}
self.logger.debug(f"[UPLOAD] Загрузка файла из файлового объекта.")
else:
self.logger.error(f"[CONTRACT_VIOLATION] Неподдерживаемый тип файла для загрузки: {type(file_obj).__name__}")
raise TypeError("Неподдерживаемый тип 'file_obj'. Ожидается Path, str, io.BytesIO или другой файлоподобный объект.")
try: try:
response = self.session.post( response = self.session.post(
url=f"{self.base_url}{endpoint}", url=full_url,
files=files, files=files_payload,
data=extra_data or {}, data=extra_data or {},
headers=headers, headers=_headers,
timeout=timeout or self.timeout timeout=timeout or self.timeout
) )
if response.status_code == 403:
raise PermissionDeniedError("Доступ запрещен")
response.raise_for_status() response.raise_for_status()
# [COHERENCE_CHECK_PASSED] Файл успешно загружен.
self.logger.info(f"[UPLOAD_SUCCESS] Файл '{file_name}' успешно загружен на {endpoint}.")
return response.json() return response.json()
except requests.exceptions.RequestException as e: except requests.exceptions.HTTPError as e:
error_ctx = { error_context = {
"endpoint": endpoint, "endpoint": endpoint,
"file": file_name, "file": file_name,
"status_code": getattr(e.response, 'status_code', None) "status_code": e.response.status_code,
"response_text": e.response.text
} }
self.logger.error( if e.response.status_code == 403:
"[NETWORK_ERROR] Ошибка загрузки файла", raise PermissionDeniedError("Доступ запрещен для загрузки файла.", **error_context) from e
extra=error_ctx else:
) raise SupersetAPIError(f"Ошибка API при загрузке файла: {e.response.status_code} - {e.response.text}", **error_context) from e
raise except requests.exceptions.RequestException as e:
error_context = {"endpoint": endpoint, "file": file_name, "error_type": type(e).__name__}
self.logger.error(f"[NETWORK_ERROR] Ошибка запроса при загрузке файла: {str(e)}", exc_info=True, extra=error_context)
raise NetworkError(f"Ошибка сети при загрузке файла: {str(e)}", url=full_url) from e
except Exception as e:
error_context = {"endpoint": endpoint, "file": file_name, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при загрузке файла: {str(e)}", exc_info=True, extra=error_context)
raise SupersetAPIError(f"Непредвиденная ошибка загрузки файла: {str(e)}", context=error_context) from e
finally:
# Закрываем файл, если он был открыт в этом методе
if should_close_file and files_payload and files_payload[form_field] and hasattr(files_payload[form_field][1], 'close'):
files_payload[form_field][1].close()
self.logger.debug(f"[UPLOAD] Закрыт файл '{file_name}'.")
def fetch_paginated_count( def fetch_paginated_count(
self, self,
@@ -137,39 +380,44 @@ class APIClient:
count_field: str = "count", count_field: str = "count",
timeout: Optional[int] = None timeout: Optional[int] = None
) -> int: ) -> int:
"""[NETWORK] Получение общего количества элементов в пагинированном API """[CONTRACT] Получение общего количества элементов в пагинированном API.
@params: @delegates:
endpoint: API endpoint без query-параметров - Использует `self.request` для выполнения HTTP-запроса.
query_params: параметры для пагинации @pre:
count_field: поле с количеством в ответе - `endpoint` должен указывать на пагинированный ресурс.
timeout: таймаут запроса - `query_params` должны быть валидны для запроса количества.
@return: @post:
Общее количество элементов - Возвращает целочисленное количество элементов.
@errors: @raise:
- NetworkError: проблемы с соединением - `NetworkError`, `SupersetAPIError`, `KeyError` (если `count_field` не найден).
- KeyError: некорректный формат ответа
""" """
self.logger.debug(f"[PAGINATION] Запрос количества элементов для {endpoint} с параметрами: {query_params}")
try: try:
response = self.request( response_json = self.request(
method="GET", method="GET",
endpoint=endpoint, endpoint=endpoint,
params={"q": json.dumps(query_params)}, params={"q": json.dumps(query_params)},
timeout=timeout or self.timeout timeout=timeout or self.timeout
) )
if count_field not in response: if count_field not in response_json:
raise KeyError(f"Ответ API не содержит поле {count_field}") self.logger.error(
f"[CONTRACT_VIOLATION] Ответ API для {endpoint} не содержит поле '{count_field}'",
extra={"response_keys": list(response_json.keys())}
)
raise KeyError(f"Ответ API для {endpoint} не содержит поле '{count_field}'")
return response[count_field] count = response_json[count_field]
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Получено количество: {count} для {endpoint}.")
return count
except requests.exceptions.RequestException as e: except (KeyError, SupersetAPIError, NetworkError, PermissionDeniedError, DashboardNotFoundError) as e:
error_ctx = { self.logger.error(f"[ERROR] Ошибка получения количества элементов для {endpoint}: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
"endpoint": endpoint, raise
"params": query_params, except Exception as e:
"error": str(e) error_ctx = {"endpoint": endpoint, "params": query_params, "error_type": type(e).__name__}
} self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении количества: {str(e)}", exc_info=True, extra=error_ctx)
self.logger.error("[PAGINATION_ERROR]", extra=error_ctx) raise SupersetAPIError(f"Непредвиденная ошибка при получении count для {endpoint}: {str(e)}", context=error_ctx) from e
raise NetworkError(f"Ошибка пагинации: {str(e)}") from e
def fetch_paginated_data( def fetch_paginated_data(
self, self,
@@ -179,37 +427,53 @@ class APIClient:
results_field: str = "result", results_field: str = "result",
timeout: Optional[int] = None timeout: Optional[int] = None
) -> List[Any]: ) -> List[Any]:
"""[NETWORK] Получение всех данных с пагинированного API """[CONTRACT] Получение всех данных с пагинированного API.
@params: @delegates:
endpoint: API endpoint - Использует `self.request` для выполнения запросов по страницам.
base_query: базовые параметры запроса (без page) @pre:
total_count: общее количество элементов - `base_query` должен содержать 'page_size'.
results_field: поле с данными в ответе - `total_count` должен быть корректным общим количеством элементов.
timeout: таймаут для запросов @post:
@return: - Возвращает список всех собранных данных со всех страниц.
Собранные данные со всех страниц @raise:
- `NetworkError`, `SupersetAPIError`, `ValueError` (если `page_size` невалиден), `KeyError`.
""" """
page_size = base_query['page_size'] self.logger.debug(f"[PAGINATION] Запуск получения всех данных для {endpoint}. Total: {total_count}, Base Query: {base_query}")
page_size = base_query.get('page_size')
if not page_size or page_size <= 0:
self.logger.error("[CONTRACT_VIOLATION] 'page_size' в базовом запросе невалиден.", extra={"page_size": page_size})
raise ValueError("Параметр 'page_size' должен быть положительным числом.")
total_pages = (total_count + page_size - 1) // page_size total_pages = (total_count + page_size - 1) // page_size
results = [] results = []
for page in range(total_pages): for page in range(total_pages):
query = {**base_query, 'page': page} query = {**base_query, 'page': page}
self.logger.debug(f"[PAGINATION] Запрос страницы {page+1}/{total_pages} для {endpoint}.")
response = self._execute_request( try:
response_json = self.request(
method="GET", method="GET",
endpoint=endpoint, endpoint=endpoint,
params={"q": json.dumps(query)}, params={"q": json.dumps(query)},
timeout=timeout or self.timeout timeout=timeout or self.timeout
) )
if results_field not in response: if results_field not in response_json:
self.logger.warning( self.logger.warning(
f"Ответ не содержит поле {results_field}", f"[CONTRACT_VIOLATION] Ответ API для {endpoint} на странице {page} не содержит поле '{results_field}'",
extra={"response": response.keys()} extra={"response_keys": list(response_json.keys())}
) )
# Если поле результатов отсутствует на одной странице, это может быть не фатально, но надо залогировать.
continue continue
results.extend(response[results_field]) results.extend(response_json[results_field])
except (SupersetAPIError, NetworkError, PermissionDeniedError, DashboardNotFoundError) as e:
self.logger.error(f"[ERROR] Ошибка получения страницы {page+1} для {endpoint}: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise # Пробрасываем ошибку выше, так как не можем продолжить пагинацию
except Exception as e:
error_ctx = {"endpoint": endpoint, "page": page, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении страницы {page+1} для {endpoint}: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка пагинации для {endpoint}: {str(e)}", context=error_ctx) from e
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Все данные с пагинацией для {endpoint} успешно собраны. Всего элементов: {len(results)}")
return results return results