Merge branch 'gemini-refactor' into 'master'

Gemini refactor

See merge request dwh_bi/superset-tools!1
This commit is contained in:
Волобуев Андрей Александрович (VolobuevAA)
2025-06-27 17:55:58 +03:00
9 changed files with 1462 additions and 651 deletions

7
.gitignore vendored
View File

@@ -1,11 +1,6 @@
__pycache__ *__pycache__*
dashboards/
БД/
*.ps1 *.ps1
*.ipynb *.ipynb
*.txt *.txt
*.zip *.zip
keyring passwords.py keyring passwords.py
Logs/
patch1.patch
test postman.py

View File

@@ -0,0 +1,78 @@
# Superset Automation Tools
## Overview
This repository contains Python scripts and a utility library (`superset_tool`) designed to automate common tasks with Apache Superset, such as:
- **Backup**: Exporting all dashboards from Superset instances to local storage.
- **Migration**: Transferring and transforming dashboards between different Superset environments (e.g., Development, Sandbox, Production).
## Project Structure
- `backup_script.py`: Main script for performing scheduled backups of Superset dashboards.
- `migration_script.py`: Main script for migrating specific dashboards between environments, including database connection remapping.
- `superset_tool/`:
- `client.py`: Python client for interacting with the Superset API.
- `exceptions.py`: Custom exception classes for structured error handling.
- `models.py`: Pydantic models for configuration data validation.
- `utils/`:
- `fileio.py`: Utilities for file system operations (archive handling, YAML parsing).
- `logger.py`: Custom logger configuration for consistent logging across the project.
- `network.py`: Low-level HTTP client for network requests, handling authentication and retries.
## Setup
### Prerequisites
- Python 3.9+
- `pip` for package management.
- `keyring` for secure password storage.
### Installation
1. **Clone the repository:**
```bash
git clone https://your-repo-link/superset-automation.git
cd superset-automation
```
2. **Install dependencies:**
```bash
pip install -r requirements.txt
```
(You might need to create `requirements.txt` with `pydantic`, `requests`, `keyring`, `PyYAML`, `urllib3`)
3. **Configure Passwords:**
Use `keyring` to store your Superset API user passwords.
Example for `backup_script.py`:
```python
import keyring
keyring.set_password("system", "dev migrate", "your_dev_password")
keyring.set_password("system", "prod migrate", "your_prod_password")
keyring.set_password("system", "sandbox migrate", "your_sandbox_password")
```
Replace `"system"` with a suitable service name if preferred.
## Usage
### Backup Script (`backup_script.py`)
To back up dashboards from configured Superset environments:
```bash
python backup_script.py
```
Backup files will be saved to `P:\Superset\010 Бекапы\` by default. Logs will be in `P:\Superset\010 Бекапы\Logs`.
### Migration Script (`migration_script.py`)
To migrate a specific dashboard:
```bash
python migration_script.py
```
**Note:** This script is currently configured to migrate a hardcoded dashboard slug (`FI0070`) and uses a local `.zip` file for the dashboard source. **For production use, you should modify the script to:**
- Accept dashboard slug/ID as command-line arguments.
- Utilize the `from_c.export_dashboard()` method instead of reading from a local file.
- Potentially parameterize `from_c` and `to_c` environments.
## Logging
Logs are configured to be written to a file within a `Logs` directory (e.g., `P:\Superset\010 Бекапы\Logs` for backups) and also output to the console. The log level is set to `INFO` by default.
## Development & Contribution
- Follow the architectural patterns (`[MODULE]`, `[CONTRACT]`, `[SECTION]`, `[ANCHOR]`) and logging guidelines for consistency.
- Ensure all new code adheres to the principles of "LLM-friendly" code generation.
- Use `Pydantic` models for data validation.
- Implement comprehensive error handling using custom exceptions.
---
[COHERENCE_CHECK_PASSED] README.md создан и согласован с модулями.

View File

@@ -1,20 +1,48 @@
# [MODULE] Superset Dashboard Backup Script
# @contract: Автоматизирует процесс резервного копирования дашбордов Superset из различных окружений.
# @semantic_layers:
# 1. Инициализация логгера и клиентов Superset.
# 2. Выполнение бэкапа для каждого окружения (DEV, SBX, PROD).
# 3. Формирование итогового отчета.
# @coherence:
# - Использует `SupersetClient` для взаимодействия с API Superset.
# - Использует `SupersetLogger` для централизованного логирования.
# - Работает с `Pathlib` для управления файлами и директориями.
# - Интегрируется с `keyring` для безопасного хранения паролей.
# [IMPORTS] Стандартная библиотека
import logging import logging
from datetime import datetime from datetime import datetime
import shutil import shutil
import keyring
import os import os
from pathlib import Path from pathlib import Path
from superset_tool.models import SupersetConfig, DatabaseConfig
# [IMPORTS] Сторонние библиотеки
import keyring
# [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig
from superset_tool.client import SupersetClient from superset_tool.client import SupersetClient
from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.fileio import save_and_unpack_dashboard, archive_exports, sanitize_filename from superset_tool.utils.fileio import save_and_unpack_dashboard, archive_exports, sanitize_filename
# [COHERENCE_CHECK_PASSED] Все необходимые модули импортированы и согласованы.
# [FUNCTION] setup_clients
# @contract: Инициализирует и возвращает SupersetClient для каждого заданного окружения.
# @pre:
# - `keyring` должен содержать необходимые пароли для "dev migrate", "prod migrate", "sandbox migrate".
# - `logger` должен быть инициализирован.
# @post:
# - Возвращает словарь {env_name: SupersetClient_instance}.
# - Логирует успешную инициализацию или ошибку.
# @raise:
# - `Exception`: При любой ошибке в процессе инициализации клиентов (например, отсутствие пароля в keyring, проблемы с сетью при первой аутентификации).
def setup_clients(logger: SupersetLogger): def setup_clients(logger: SupersetLogger):
"""Инициализация клиентов для разных окружений""" """Инициализация клиентов для разных окружений"""
# [ANCHOR] CLIENTS_INITIALIZATION
clients = {} clients = {}
try: try:
# Конфигурация для Dev # [INFO] Инициализация конфигурации для Dev
dev_config = SupersetConfig( dev_config = SupersetConfig(
base_url="https://devta.bi.dwh.rusal.com/api/v1", base_url="https://devta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -23,11 +51,11 @@ def setup_clients(logger: SupersetLogger):
"password": keyring.get_password("system", "dev migrate"), "password": keyring.get_password("system", "dev migrate"),
"refresh": True "refresh": True
}, },
logger=logger,
verify_ssl=False verify_ssl=False
) )
# [DEBUG] Dev config created: {dev_config.base_url}
# Конфигурация для Prod # [INFO] Инициализация конфигурации для Prod
prod_config = SupersetConfig( prod_config = SupersetConfig(
base_url="https://prodta.bi.dwh.rusal.com/api/v1", base_url="https://prodta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -36,11 +64,11 @@ def setup_clients(logger: SupersetLogger):
"password": keyring.get_password("system", "prod migrate"), "password": keyring.get_password("system", "prod migrate"),
"refresh": True "refresh": True
}, },
logger=logger,
verify_ssl=False verify_ssl=False
) )
# [DEBUG] Prod config created: {prod_config.base_url}
# Конфигурация для Sandbox # [INFO] Инициализация конфигурации для Sandbox
sandbox_config = SupersetConfig( sandbox_config = SupersetConfig(
base_url="https://sandboxta.bi.dwh.rusal.com/api/v1", base_url="https://sandboxta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -49,113 +77,215 @@ def setup_clients(logger: SupersetLogger):
"password": keyring.get_password("system", "sandbox migrate"), "password": keyring.get_password("system", "sandbox migrate"),
"refresh": True "refresh": True
}, },
logger=logger,
verify_ssl=False verify_ssl=False
) )
# [DEBUG] Sandbox config created: {sandbox_config.base_url}
clients['dev'] = SupersetClient(dev_config) # [INFO] Создание экземпляров SupersetClient
clients['sbx'] = SupersetClient(sandbox_config) clients['dev'] = SupersetClient(dev_config, logger)
clients['prod'] = SupersetClient(prod_config) clients['sbx'] = SupersetClient(sandbox_config,logger)
logger.info("Клиенты для окружений успешно инициализированы") clients['prod'] = SupersetClient(prod_config,logger)
logger.info("[COHERENCE_CHECK_PASSED] Клиенты для окружений успешно инициализированы", extra={"envs": list(clients.keys())})
return clients return clients
except Exception as e: except Exception as e:
logger.error(f"Ошибка инициализации клиентов: {str(e)}") logger.error(f"[ERROR] Ошибка инициализации клиентов: {str(e)}", exc_info=True)
raise raise
def backup_dashboards(client, # [FUNCTION] backup_dashboards
env_name, # @contract: Выполняет бэкап всех доступных дашбордов для заданного клиента и окружения.
backup_root, # @pre:
logger: SupersetLogger ): # - `client` должен быть инициализированным экземпляром `SupersetClient`.
"""Выполнение бэкапа дашбордов для указанного окружения""" # - `env_name` должен быть строкой, обозначающей окружение.
# - `backup_root` должен быть валидным путем к корневой директории бэкапа.
# - `logger` должен быть инициализирован.
# @post:
# - Дашборды экспортируются и сохраняются в поддиректориях `backup_root/env_name/dashboard_title`.
# - Старые экспорты архивируются.
# - Возвращает `True` если все дашборды были экспортированы без критических ошибок, `False` иначе.
# @side_effects:
# - Создает директории и файлы в файловой системе.
# - Логирует статус выполнения, успешные экспорты и ошибки.
# @exceptions:
# - `SupersetAPIError`, `NetworkError`, `DashboardNotFoundError`, `ExportError` могут быть подняты методами `SupersetClient` и будут логированы.
def backup_dashboards(client: SupersetClient, env_name: str, backup_root: Path, logger: SupersetLogger) -> bool:
"""Выполнение бэкапа дашбордов с детальным логированием ошибок"""
# [ANCHOR] DASHBOARD_BACKUP_PROCESS
logger.info(f"[INFO] Запуск бэкапа дашбордов для окружения: {env_name}")
try: try:
dashboard_count, dashboard_meta = client.get_dashboards() dashboard_count, dashboard_meta = client.get_dashboards()
total = 0 logger.info(f"[INFO] Найдено {dashboard_count} дашбордов для экспорта в {env_name}")
success = 0 if dashboard_count == 0:
logger.warning(f"[WARN] Нет дашбордов для экспорта в {env_name}. Процесс завершен.")
return True
success_count = 0
error_details = []
for db in dashboard_meta: for db in dashboard_meta:
if db['slug']: dashboard_id = db.get('id')
total += 1 dashboard_title = db.get('dashboard_title', 'Unknown Dashboard')
dashboard_title = db['dashboard_title'] dashboard_slug = db.get('slug', 'unknown-slug') # Используем slug для уникальности
# [PRECONDITION] Проверка наличия ID и slug
if not dashboard_id or not dashboard_slug:
logger.warning(
f"[SKIP] Пропущен дашборд с неполными метаданными: {dashboard_title} (ID: {dashboard_id}, Slug: {dashboard_slug})",
extra={'dashboard_meta': db}
)
continue
logger.debug(f"[DEBUG] Попытка экспорта дашборда: '{dashboard_title}' (ID: {dashboard_id})")
try:
# [ANCHOR] CREATE_DASHBOARD_DIR
# Используем slug в пути для большей уникальности и избежания конфликтов имен
dashboard_base_dir_name = sanitize_filename(f"{dashboard_slug}-{dashboard_title}")
dashboard_dir = backup_root / env_name / dashboard_base_dir_name
dashboard_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f"[DEBUG] Директория для дашборда: {dashboard_dir}")
# [ANCHOR] EXPORT_DASHBOARD_ZIP
zip_content, filename = client.export_dashboard(dashboard_id)
# [ANCHOR] SAVE_AND_UNPACK
# Сохраняем только ZIP-файл, распаковка здесь не нужна для бэкапа
save_and_unpack_dashboard(
zip_content=zip_content,
original_filename=filename,
output_dir=dashboard_dir,
unpack=False, # Только сохраняем ZIP, не распаковываем для бэкапа
logger=logger
)
logger.info(f"[INFO] Дашборд '{dashboard_title}' (ID: {dashboard_id}) успешно экспортирован.")
# [ANCHOR] ARCHIVE_OLD_BACKUPS
try: try:
dashboard_dir = Path(backup_root , env_name , sanitize_filename(dashboard_title)) archive_exports(dashboard_dir, logger=logger)
dashboard_dir.mkdir(parents=True, exist_ok=True) logger.debug(f"[DEBUG] Старые экспорты для '{dashboard_title}' архивированы.")
except Exception as cleanup_error:
zip_content, filename = client.export_dashboard(db['id']) logger.warning(
zip_path = save_and_unpack_dashboard( f"[WARN] Ошибка архивирования старых бэкапов для '{dashboard_title}': {cleanup_error}",
zip_content=zip_content, exc_info=False # Не показываем полный traceback для очистки, т.к. это второстепенно
original_filename=filename,
output_dir=dashboard_dir,
unpack=False
) )
success += 1 success_count += 1
#Очистка старых бэкапов
try: except Exception as db_error:
archive_exports(dashboard_dir) error_info = {
except Exception as cleanup_error: 'dashboard_id': dashboard_id,
raise cleanup_error 'dashboard_title': dashboard_title,
'error_message': str(db_error),
except Exception as db_error: 'env': env_name,
raise db_error 'error_type': type(db_error).__name__
return success == total }
error_details.append(error_info)
logger.error(
f"[ERROR] Ошибка экспорта дашборда '{dashboard_title}' (ID: {dashboard_id})",
extra=error_info, exc_info=True # Логируем полный traceback для ошибок экспорта
)
if error_details:
logger.error(
f"[COHERENCE_CHECK_FAILED] Итоги экспорта для {env_name}:",
extra={'success_count': success_count, 'errors': error_details, 'total_dashboards': dashboard_count}
)
return False
else:
logger.info(
f"[COHERENCE_CHECK_PASSED] Все {success_count} дашбордов для {env_name} успешно экспортированы."
)
return True
except Exception as e: except Exception as e:
logger.critical(
f"[CRITICAL] Фатальная ошибка бэкапа для окружения {env_name}: {str(e)}",
exc_info=True
)
return False return False
def main(): # [FUNCTION] main
# Инициализация логгера # @contract: Основная точка входа скрипта.
log_dir = Path("P:\\Superset\\010 Бекапы\\Logs") # @semantic: Координирует инициализацию, выполнение бэкапа и логирование результатов.
# @post:
# - Возвращает 0 при успешном выполнении, 1 при фатальной ошибке.
# @side_effects:
# - Инициализирует логгер.
# - Вызывает `setup_clients` и `backup_dashboards`.
# - Записывает логи в файл и выводит в консоль.
def main() -> int:
"""Основная функция выполнения бэкапа"""
# [ANCHOR] MAIN_EXECUTION_START
# [CONFIG] Инициализация логгера
# @invariant: Логгер должен быть доступен на протяжении всей работы скрипта.
log_dir = Path("P:\\Superset\\010 Бекапы\\Logs") # [COHERENCE_NOTE] Убедитесь, что путь доступен.
logger = SupersetLogger( logger = SupersetLogger(
log_dir=log_dir, log_dir=log_dir,
level=logging.INFO, level=logging.INFO,
console=True console=True
) )
"""Основная функция выполнения бэкапа"""
logger.info("="*50) logger.info("="*50)
logger.info("Запуск процесса бэкапа Superset") logger.info("[INFO] Запуск процесса бэкапа Superset")
logger.info("="*50) logger.info("="*50)
exit_code = 0 # [STATE] Код выхода скрипта
try: try:
# [ANCHOR] CLIENT_SETUP
clients = setup_clients(logger) clients = setup_clients(logger)
# [CONFIG] Определение корневой директории для бэкапов
# @invariant: superset_backup_repo должен быть доступен для записи.
superset_backup_repo = Path("P:\\Superset\\010 Бекапы") superset_backup_repo = Path("P:\\Superset\\010 Бекапы")
superset_backup_repo.mkdir(parents=True, exist_ok=True) # Гарантируем существование директории
logger.info(f"[INFO] Корневая директория бэкапов: {superset_backup_repo}")
# Бэкап для DEV # [ANCHOR] BACKUP_DEV_ENVIRONMENT
dev_success = backup_dashboards( dev_success = backup_dashboards(
clients['dev'], clients['dev'],
"DEV", "DEV",
superset_backup_repo, superset_backup_repo,
logger=logger logger=logger
) )
#Бэкап для Sandbox # [ANCHOR] BACKUP_SBX_ENVIRONMENT
sbx_success = backup_dashboards( sbx_success = backup_dashboards(
clients['sbx'], clients['sbx'],
"SBX", "SBX",
superset_backup_repo, superset_backup_repo,
logger=logger logger=logger
) )
#Бэкап для Прода
# [ANCHOR] BACKUP_PROD_ENVIRONMENT
prod_success = backup_dashboards( prod_success = backup_dashboards(
clients['prod'], clients['prod'],
"PROD", "PROD",
superset_backup_repo, superset_backup_repo,
logger=logger logger=logger
) )
# Итоговый отчет # [ANCHOR] FINAL_REPORT
# [INFO] Итоговый отчет о выполнении бэкапа
logger.info("="*50) logger.info("="*50)
logger.info("Итоги выполнения бэкапа:") logger.info("[INFO] Итоги выполнения бэкапа:")
logger.info(f"DEV: {'Успешно' if dev_success else 'С ошибками'}") logger.info(f"[INFO] DEV: {'Успешно' if dev_success else 'С ошибками'}")
logger.info(f"SBX: {'Успешно' if sbx_success else 'С ошибками'}") logger.info(f"[INFO] SBX: {'Успешно' if sbx_success else 'С ошибками'}")
logger.info(f"PROD: {'Успешно' if prod_success else 'С ошибками'}") logger.info(f"[INFO] PROD: {'Успешно' if prod_success else 'С ошибками'}")
logger.info(f"Полный лог доступен в: {log_dir}") logger.info(f"[INFO] Полный лог доступен в: {log_dir}")
except Exception as e:
logger.critical(f"Фатальная ошибка выполнения скрипта: {str(e)}", exc_info=True)
return 1
logger.info("Процесс бэкапа завершен")
return 0
if not (dev_success and sbx_success and prod_success):
exit_code = 1
logger.warning("[COHERENCE_CHECK_FAILED] Бэкап завершен с ошибками в одном или нескольких окружениях.")
else:
logger.info("[COHERENCE_CHECK_PASSED] Все бэкапы успешно завершены без ошибок.")
except Exception as e:
logger.critical(f"[CRITICAL] Фатальная ошибка выполнения скрипта: {str(e)}", exc_info=True)
exit_code = 1
logger.info("[INFO] Процесс бэкапа завершен")
return exit_code
# [ENTRYPOINT] Главная точка запуска скрипта
if __name__ == "__main__": if __name__ == "__main__":
exit_code = main() exit_code = main()
exit(exit_code) exit(exit_code)

View File

@@ -1,63 +1,90 @@
# [MODULE] Superset Dashboard Migration Script
# @contract: Автоматизирует процесс миграции и обновления дашбордов Superset между окружениями.
# @semantic_layers:
# 1. Конфигурация клиентов Superset для исходного и целевого окружений.
# 2. Определение правил трансформации конфигураций баз данных.
# 3. Экспорт дашборда, модификация YAML-файлов, создание нового архива и импорт.
# @coherence:
# - Использует `SupersetClient` для взаимодействия с API Superset.
# - Использует `SupersetLogger` для централизованного логирования.
# - Работает с `Pathlib` для управления файлами и директориями.
# - Интегрируется с `keyring` для безопасного хранения паролей.
# - Зависит от утилит `fileio` для обработки архивов и YAML-файлов.
# [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig from superset_tool.models import SupersetConfig
from superset_tool.client import SupersetClient from superset_tool.client import SupersetClient
from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.logger import SupersetLogger
from superset_tool.exceptions import AuthenticationError from superset_tool.exceptions import AuthenticationError, SupersetAPIError, NetworkError, DashboardNotFoundError
from superset_tool.utils.fileio import save_and_unpack_dashboard, update_yamls, create_dashboard_export, create_temp_file,read_dashboard_from_disk from superset_tool.utils.fileio import save_and_unpack_dashboard, update_yamls, create_dashboard_export, create_temp_file, read_dashboard_from_disk
# [IMPORTS] Стандартная библиотека
import os import os
import keyring import keyring
from pathlib import Path from pathlib import Path
import logging import logging
log_dir = Path("H:\\dev\\Logs") # [CONFIG] Инициализация глобального логгера
# @invariant: Логгер доступен для всех компонентов скрипта.
log_dir = Path("H:\\dev\\Logs") # [COHERENCE_NOTE] Убедитесь, что путь доступен.
logger = SupersetLogger( logger = SupersetLogger(
log_dir=log_dir, log_dir=log_dir,
level=logging.INFO, level=logging.INFO,
console=True console=True
) )
logger.info("[COHERENCE_CHECK_PASSED] Логгер инициализирован для скрипта миграции.")
database_config_click={"old": # [CONFIG] Конфигурация трансформации базы данных Clickhouse
{ # @semantic: Определяет, как UUID и URI базы данных Clickhouse должны быть изменены.
"database_name": "Prod Clickhouse", # @invariant: 'old' и 'new' должны содержать полные конфигурации.
"sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm", database_config_click = {
"uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", "old": {
"database_uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", "database_name": "Prod Clickhouse",
"allow_ctas": "false", "sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm",
"allow_cvas": "false", "uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9",
"allow_dml": "false" "database_uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9",
}, "allow_ctas": "false",
"new": { "allow_cvas": "false",
"database_name": "Dev Clickhouse", "allow_dml": "false"
"sqlalchemy_uri": "clickhousedb+connect://dwhuser:XXXXXXXXXX@10.66.229.179:8123/dm", },
"uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2", "new": {
"database_uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2", "database_name": "Dev Clickhouse",
"allow_ctas": "true", "sqlalchemy_uri": "clickhousedb+connect://dwhuser:XXXXXXXXXX@10.66.229.179:8123/dm",
"allow_cvas": "true", "uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2",
"allow_dml": "true" "database_uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2",
} "allow_ctas": "true",
} "allow_cvas": "true",
"allow_dml": "true"
}
}
logger.debug("[CONFIG] Конфигурация Clickhouse загружена.")
database_config_gp={"old": # [CONFIG] Конфигурация трансформации базы данных Greenplum
{ # @semantic: Определяет, как UUID и URI базы данных Greenplum должны быть изменены.
"database_name": "Prod Greenplum", # @invariant: 'old' и 'new' должны содержать полные конфигурации.
"sqlalchemy_uri": "postgresql+psycopg2://viz_powerbi_gp_prod:XXXXXXXXXX@10.66.229.201:5432/dwh", database_config_gp = {
"uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8", "old": {
"database_uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8", "database_name": "Prod Greenplum",
"allow_ctas": "true", "sqlalchemy_uri": "postgresql+psycopg2://viz_powerbi_gp_prod:XXXXXXXXXX@10.66.229.201:5432/dwh",
"allow_cvas": "true", "uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8",
"allow_dml": "true" "database_uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8",
}, "allow_ctas": "true",
"new": { "allow_cvas": "true",
"database_name": "DEV Greenplum", "allow_dml": "true"
"sqlalchemy_uri": "postgresql+psycopg2://viz_superset_gp_dev:XXXXXXXXXX@10.66.229.171:5432/dwh", },
"uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f", "new": {
"database_uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f", "database_name": "DEV Greenplum",
"allow_ctas": "false", "sqlalchemy_uri": "postgresql+psycopg2://viz_superset_gp_dev:XXXXXXXXXX@10.66.229.171:5432/dwh",
"allow_cvas": "false", "uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f",
"allow_dml": "false" "database_uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f",
} "allow_ctas": "false",
} "allow_cvas": "false",
"allow_dml": "false"
}
}
logger.debug("[CONFIG] Конфигурация Greenplum загружена.")
# Конфигурация для Dev # [CONFIG] Конфигурация Superset API для Dev окружения
dev_config = SupersetConfig( dev_config = SupersetConfig(
base_url="https://devta.bi.dwh.rusal.com/api/v1", base_url="https://devta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -69,8 +96,9 @@ dev_config = SupersetConfig(
logger=logger, logger=logger,
verify_ssl=False verify_ssl=False
) )
logger.debug(f"[CONFIG] Dev SupersetConfig создан для {dev_config.base_url}")
# Конфигурация для Prod # [CONFIG] Конфигурация Superset API для Prod окружения
prod_config = SupersetConfig( prod_config = SupersetConfig(
base_url="https://prodta.bi.dwh.rusal.com/api/v1", base_url="https://prodta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -82,8 +110,9 @@ prod_config = SupersetConfig(
logger=logger, logger=logger,
verify_ssl=False verify_ssl=False
) )
logger.debug(f"[CONFIG] Prod SupersetConfig создан для {prod_config.base_url}")
# Конфигурация для Sandbox # [CONFIG] Конфигурация Superset API для Sandbox окружения
sandbox_config = SupersetConfig( sandbox_config = SupersetConfig(
base_url="https://sandboxta.bi.dwh.rusal.com/api/v1", base_url="https://sandboxta.bi.dwh.rusal.com/api/v1",
auth={ auth={
@@ -95,48 +124,87 @@ sandbox_config = SupersetConfig(
logger=logger, logger=logger,
verify_ssl=False verify_ssl=False
) )
logger.debug(f"[CONFIG] Sandbox SupersetConfig создан для {sandbox_config.base_url}")
# Инициализация клиента # [INIT] Инициализация клиентов Superset API
# @invariant: Все клиенты должны быть успешно инициализированы для дальнейшей работы.
try:
dev_client = SupersetClient(dev_config)
sandbox_client = SupersetClient(sandbox_config)
prod_client = SupersetClient(prod_config) # Не используется в текущем flow, но инициализирован.
logger.info("[COHERENCE_CHECK_PASSED] Клиенты Superset успешно инициализированы.")
except Exception as e:
logger.critical(f"[CRITICAL] Ошибка инициализации клиентов Superset: {str(e)}", exc_info=True)
exit(1) # Выход из скрипта при критической ошибке инициализации
dev_client = SupersetClient(dev_config) # [CONFIG] Определение исходного и целевого клиентов для миграции
sandbox_client = SupersetClient(sandbox_config) # [COHERENCE_NOTE] Эти переменные задают конкретную миграцию. Для параметризации можно использовать аргументы командной строки.
prod_client = SupersetClient(prod_config) from_c = sandbox_client # Источник миграции
to_c = dev_client # Цель миграции
dashboard_slug = "FI0070" # Идентификатор дашборда для миграции
# dashboard_id = 53 # ID не нужен, если есть slug
from_c = sandbox_client logger.info(f"[INFO] Конфигурация миграции: From '{from_c.config.base_url}' To '{to_c.config.base_url}' for dashboard slug '{dashboard_slug}'")
to_c = dev_client
dashboard_slug = "FI0070"
dashboard_id = 53
dashboard_meta = from_c.get_dashboard(dashboard_slug) try:
#print(dashboard_meta) # [ACTION] Получение метаданных исходного дашборда
#print(dashboard_meta["dashboard_title"]) logger.info(f"[INFO] Получение метаданных дашборда '{dashboard_slug}' из исходного окружения.")
dashboard_meta = from_c.get_dashboard(dashboard_slug)
dashboard_id = dashboard_meta["id"] # Получаем ID из метаданных
logger.info(f"[INFO] Найден дашборд '{dashboard_meta['dashboard_title']}' с ID: {dashboard_id}.")
dashboard_id = dashboard_meta["id"] # [CONTEXT_MANAGER] Работа с временной директорией для обработки архива дашборда
with create_temp_file(suffix='.dir', logger=logger) as temp_root:
logger.info(f"[INFO] Создана временная директория: {temp_root}")
# [ANCHOR] EXPORT_DASHBOARD
# Экспорт дашборда во временную директорию ИЛИ чтение с диска
# [COHERENCE_NOTE] В текущем коде закомментирован экспорт и используется локальный файл.
# Для полноценной миграции следует использовать export_dashboard().
# zip_content, filename = from_c.export_dashboard(dashboard_id) # Предпочтительный путь для реальной миграции
# [DEBUG] Использование файла с диска для тестирования миграции
zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250616T174203.zip"
logger.warning(f"[WARN] Используется ЛОКАЛЬНЫЙ файл дашборда для миграции: {zip_db_path}. Это может привести к некогерентности, если файл устарел.")
zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger)
# [ANCHOR] SAVE_AND_UNPACK
# Сохранение и распаковка во временную директорию
zip_path, unpacked_path = save_and_unpack_dashboard(
zip_content=zip_content,
original_filename=filename,
unpack=True,
logger=logger,
output_dir=temp_root
)
logger.info(f"[INFO] Дашборд распакован во временную директорию: {unpacked_path}")
# [ANCHOR] UPDATE_YAML_CONFIGS
# Обновление конфигураций баз данных в YAML-файлах
source_path = unpacked_path / Path(filename).stem # Путь к распакованному содержимому дашборда
db_configs_to_apply = [database_config_click, database_config_gp]
logger.info(f"[INFO] Применение трансформаций баз данных к YAML файлам в {source_path}...")
update_yamls(db_configs_to_apply, path=source_path, logger=logger)
logger.info("[INFO] YAML-файлы успешно обновлены.")
with create_temp_file(suffix='.dir', logger=logger) as temp_root: # [ANCHOR] CREATE_NEW_EXPORT_ARCHIVE
# Экспорт дашборда во временную директорию # Создание нового экспорта дашборда из модифицированных файлов
#zip_content, filename = from_c.export_dashboard(dashboard_id, logger=logger) temp_zip = temp_root / f"{dashboard_slug}_migrated.zip" # Имя файла для импорта
zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250616T174203.zip" logger.info(f"[INFO] Создание нового ZIP-архива для импорта: {temp_zip}")
create_dashboard_export(temp_zip, [source_path], logger=logger)
logger.info("[INFO] Новый ZIP-архив дашборда готов к импорту.")
zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger) # [ANCHOR] IMPORT_DASHBOARD
# Импорт обновленного дашборда в целевое окружение
logger.info(f"[INFO] Запуск импорта дашборда в целевое окружение {to_c.config.base_url}...")
import_result = to_c.import_dashboard(temp_zip)
logger.info(f"[COHERENCE_CHECK_PASSED] Дашборд '{dashboard_slug}' успешно импортирован/обновлен.", extra={"import_result": import_result})
# Сохранение и распаковка во временную директорию except (AuthenticationError, SupersetAPIError, NetworkError, DashboardNotFoundError) as e:
zip_path, unpacked_path = save_and_unpack_dashboard( logger.error(f"[ERROR] Ошибка миграции дашборда: {str(e)}", exc_info=True, extra=e.context)
zip_content=zip_content, exit(1)
original_filename=filename, except Exception as e:
unpack=True, logger.critical(f"[CRITICAL] Фатальная и необработанная ошибка в скрипте миграции: {str(e)}", exc_info=True)
logger=logger, exit(1)
output_dir=temp_root
)
# Обновление конфигураций
source_path = unpacked_path / Path(filename).stem
update_yamls([database_config_click,database_config_gp], path=source_path, logger=logger)
# Создание нового экспорта во временной директории
temp_zip = temp_root / f"{dashboard_slug}.zip"
create_dashboard_export(temp_zip, [source_path], logger=logger)
# Импорт обновленного дашборда
to_c.import_dashboard(temp_zip)
logger.info("[INFO] Процесс миграции завершен.")

View File

@@ -1,449 +1,326 @@
# [MODULE] Superset API Client # [MODULE] Superset API Client
# @contract: Реализует полное взаимодействие с Superset API # @contract: Реализует полное взаимодействие с Superset API
# @semantic_layers: # @semantic_layers:
# 1. Авторизация/CSRF # 1. Авторизация/CSRF (делегируется `APIClient`)
# 2. Основные операции (дашборды) # 2. Основные операции (получение метаданных, список дашбордов)
# 3. Импорт/экспорт # 3. Импорт/экспорт дашбордов
# @coherence: # @coherence:
# - Согласован с models.SupersetConfig # - Согласован с `models.SupersetConfig` для конфигурации.
# - Полная обработка всех errors из exceptions.py # - Полная обработка всех ошибок из `exceptions.py` (делегируется `APIClient` и дополняется специфичными).
# - Полностью использует `utils.network.APIClient` для всех HTTP-запросов.
# [IMPORTS] Стандартная библиотека # [IMPORTS] Стандартная библиотека
import json import json
from typing import Optional, Dict, Tuple, List, Any, Literal, Union,BinaryIO from typing import Optional, Dict, Tuple, List, Any, Literal, Union
import datetime
from pathlib import Path from pathlib import Path
from requests import Response
import zipfile # Для валидации ZIP-файлов
# [IMPORTS] Сторонние библиотеки # [IMPORTS] Сторонние библиотеки (убраны requests и urllib3, т.к. они теперь в network.py)
import requests
import urllib3
from pydantic import BaseModel, Field
from requests.exceptions import HTTPError
# [IMPORTS] Локальные модули # [IMPORTS] Локальные модули
from .models import SupersetConfig from superset_tool.models import SupersetConfig
from .exceptions import ( from superset_tool.exceptions import (
AuthenticationError, AuthenticationError,
SupersetAPIError, SupersetAPIError,
DashboardNotFoundError, DashboardNotFoundError,
NetworkError, NetworkError,
PermissionDeniedError, PermissionDeniedError,
ExportError ExportError,
InvalidZipFormatError
) )
from .utils.fileio import get_filename_from_headers from superset_tool.utils.fileio import get_filename_from_headers
from .utils.logger import SupersetLogger from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.network import APIClient # [REFACTORING_TARGET] Использование APIClient
# [CONSTANTS] Логирование # [CONSTANTS] Общие константы (для информации, т.к. тайм-аут теперь в конфиге)
HTTP_METHODS = Literal['GET', 'POST', 'PUT', 'DELETE'] DEFAULT_TIMEOUT = 30 # seconds - используется как значение по умолчанию в SupersetConfig
DEFAULT_TIMEOUT = 30 # seconds
# [TYPE-ALIASES] Для сложных сигнатур # [TYPE-ALIASES] Для сложных сигнатур
JsonType = Union[Dict[str, Any], List[Dict[str, Any]]] JsonType = Union[Dict[str, Any], List[Dict[str, Any]]]
ResponseType = Tuple[bytes, str] ResponseType = Tuple[bytes, str]
# [CHECK] Валидация импортов для контрактов # [CHECK] Валидация импортов для контрактов
# [COHERENCE_CHECK_PASSED] Теперь зависимость на requests и urllib3 скрыта за APIClient
try: try:
# Проверка наличия ключевых зависимостей
assert requests.__version__ >= '2.28.0' # для retry механизмов
assert urllib3.__version__ >= '1.26.0' # для SSL warnings
# Проверка локальных модулей
from .utils.fileio import get_filename_from_headers as fileio_check from .utils.fileio import get_filename_from_headers as fileio_check
assert callable(fileio_check) assert callable(fileio_check)
from .utils.network import APIClient as network_check
assert callable(network_check)
except (ImportError, AssertionError) as imp_err: except (ImportError, AssertionError) as imp_err:
raise RuntimeError( raise RuntimeError(
f"[COHERENCE_CHECK_FAILED] Импорт не прошел валидацию: {str(imp_err)}" f"[COHERENCE_CHECK_FAILED] Импорт не прошел валидацию: {str(imp_err)}"
) from imp_err ) from imp_err
class SupersetClient: class SupersetClient:
"""[MAIN-CONTRACT] Клиент для работы с Superset API """[MAIN-CONTRACT] Клиент для работы с Superset API
@pre: @pre:
- config должен быть валидным SupersetConfig - `config` должен быть валидным `SupersetConfig`.
- Целевой API доступен - Целевой API доступен и учетные данные корректны.
@post: @post:
- Все методы возвращают данные или вызывают явные ошибки - Все методы возвращают ожидаемые данные или вызывают явные, типизированные ошибки.
- Токены автоматически обновляются - Токены для API-вызовов автоматически управляются (`APIClient`).
@invariant: @invariant:
- Сессия остается валидной между вызовами - Сессия остается валидной между вызовами.
- Все ошибки типизированы согласно exceptions.py - Все ошибки типизированы согласно `exceptions.py`.
- Все HTTP-запросы проходят через `self.network`.
""" """
def __init__(self, config: SupersetConfig): def __init__(self, config: SupersetConfig, logger: Optional[SupersetLogger] = None):
"""[INIT] Инициализация клиента """[INIT] Инициализация клиента Superset.
@semantic: @semantic:
- Создает сессию requests - Валидирует входную конфигурацию.
- Настраивает адаптеры подключения - Инициализирует внутренний `APIClient` для сетевого взаимодействия.
- Выполняет первичную аутентификацию - Выполняет первичную аутентификацию через `APIClient`.
""" """
# [PRECONDITION] Валидация конфигурации
self.logger = logger or SupersetLogger(name="SupersetClient")
self._validate_config(config) self._validate_config(config)
self.config = config self.config = config
self.logger = config.logger or SupersetLogger(name="client")
self.session = self._setup_session()
# [ANCHOR] API_CLIENT_INIT
# [REFACTORING_COMPLETE] Теперь вся сетевая логика инкапсулирована в APIClient.
# APIClient отвечает за аутентификацию, повторные попытки и обработку низкоуровневых ошибок.
self.network = APIClient(
base_url=config.base_url,
auth=config.auth,
verify_ssl=config.verify_ssl,
timeout=config.timeout,
logger=self.logger # Передаем логгер в APIClient
)
try: try:
self._authenticate() # Аутентификация выполняется в конструкторе APIClient или по первому запросу
# Для явного вызова: self.network.authenticate()
# APIClient сам управляет токенами после первого успешного входа
self.logger.info( self.logger.info(
"[COHERENCE_CHECK_PASSED] Клиент успешно инициализирован", "[COHERENCE_CHECK_PASSED] Клиент Superset успешно инициализирован",
extra={"base_url": config.base_url} extra={"base_url": config.base_url}
) )
except Exception as e: except Exception as e:
self.logger.error( self.logger.error(
"[INIT_FAILED] Ошибка инициализации клиента", "[INIT_FAILED] Ошибка инициализации клиента Superset",
exc_info=True, exc_info=True,
extra={"config": config.dict()} extra={"config_base_url": config.base_url, "error": str(e)}
) )
raise raise # Перевыброс ошибки инициализации
def _validate_config(self, config: SupersetConfig) -> None: def _validate_config(self, config: SupersetConfig) -> None:
"""[PRECONDITION] Валидация конфигурации клиента """[PRECONDITION] Валидация конфигурации клиента.
@semantic: @semantic:
- Проверяет обязательные поля - Проверяет, что `config` является экземпляром `SupersetConfig`.
- Валидирует URL и учетные данные - Проверяет обязательные поля `base_url` и `auth`.
- Логирует ошибки валидации.
@raise: @raise:
- ValueError при невалидных параметрах - `TypeError`: если `config` не является `SupersetConfig`.
- TypeError при некорректном типе - `ValueError`: если отсутствуют обязательные поля или они невалидны.
""" """
if not isinstance(config, SupersetConfig): if not isinstance(config, SupersetConfig):
self.logger.error( self.logger.error(
"[CONFIG_VALIDATION_FAILED] Некорректный тип конфигурации", "[CONTRACT_VIOLATION] Некорректный тип конфигурации",
extra={"actual_type": type(config).__name__} extra={"actual_type": type(config).__name__}
) )
raise TypeError("Конфигурация должна быть экземпляром SupersetConfig") raise TypeError("Конфигурация должна быть экземпляром SupersetConfig")
required_fields = ["base_url", "auth"] # Pydantic SupersetConfig уже выполняет основную валидацию через Field и validator.
for field in required_fields: # Здесь можно добавить дополнительные бизнес-правила или проверки доступности, если нужно.
if not getattr(config, field, None):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Отсутствует обязательное поле",
extra={"missing_field": field}
)
raise ValueError(f"Обязательное поле {field} не указано")
if not config.auth.get("username") or not config.auth.get("password"):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Не указаны учетные данные",
extra={"auth_keys": list(config.auth.keys())}
)
raise ValueError("В конфигурации должны быть указаны username и password")
# Дополнительная валидация URL
if not config.base_url.startswith(("http://", "https://")):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Некорректный URL",
extra={"base_url": config.base_url}
)
raise ValueError("base_url должен начинаться с http:// или https://")
# [CHUNK] Настройка сессии и адаптеров
def _setup_session(self) -> requests.Session:
"""[INTERNAL] Конфигурация HTTP-сессии
@coherence_check: SSL verification должен соответствовать config
"""
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
max_retries=3,
pool_connections=10,
pool_maxsize=100
)
session.mount('https://', adapter)
session.verify = self.config.verify_ssl
if not self.config.verify_ssl:
urllib3.disable_warnings()
self.logger.debug(
"[CONFIG] SSL verification отключен",
extra={"base_url": self.config.base_url}
)
return session
# [CHUNK] Процесс аутентификации
def _authenticate(self):
"""[AUTH-FLOW] Получение токенов
@semantic_steps:
1. Получение access_token
2. Получение CSRF токена
@error_handling:
- AuthenticationError при проблемах credentials
- NetworkError при проблемах связи
"""
try: try:
# [STEP 1] Получение bearer token # Попытка доступа к полям через Pydantic для проверки их существования
login_url = f"{self.config.base_url}/security/login" _ = config.base_url
response = self.session.post( _ = config.auth
login_url, _ = config.auth.get("username")
json={ _ = config.auth.get("password")
"username": self.config.auth["username"], self.logger.debug("[COHERENCE_CHECK_PASSED] Конфигурация SupersetClient прошла внутреннюю валидацию.")
"password": self.config.auth["password"], except Exception as e:
"provider": self.config.auth["provider"], self.logger.error(
"refresh": self.config.auth["refresh"] f"[CONTRACT_VIOLATION] Ошибка валидации полей конфигурации: {e}",
}, extra={"config_dict": config.dict()}
timeout=self.config.timeout
) )
raise ValueError(f"Конфигурация SupersetConfig невалидна: {e}") from e
if response.status_code == 401:
raise AuthenticationError(
"Invalid credentials",
context={
"endpoint": login_url,
"username": self.config.auth["username"],
"status_code": response.status_code
}
)
response.raise_for_status()
self.access_token = response.json()["access_token"]
# [STEP 2] Получение CSRF token
csrf_url = f"{self.config.base_url}/security/csrf_token/"
response = self.session.get(
csrf_url,
headers={"Authorization": f"Bearer {self.access_token}"},
timeout=self.config.timeout
)
response.raise_for_status()
self.csrf_token = response.json()["result"]
self.logger.info(
"[AUTH_SUCCESS] Токены успешно получены",
extra={
"access_token": f"{self.access_token[:5]}...",
"csrf_token": f"{self.csrf_token[:5]}..."
}
)
except requests.exceptions.RequestException as e:
error_context = {
"method": e.request.method,
"url": e.request.url,
"status_code": getattr(e.response, 'status_code', None)
}
if isinstance(e, (requests.Timeout, requests.ConnectionError)):
raise NetworkError("Connection failed", context=error_context) from e
raise SupersetAPIError("Auth flow failed", context=error_context) from e
@property @property
def headers(self) -> dict: def headers(self) -> dict:
"""[INTERFACE] Базовые заголовки для API-вызовов """[INTERFACE] Базовые заголовки для API-вызовов.
@semantic: Объединяет общие заголовки для всех запросов @semantic: Делегирует получение актуальных заголовков `APIClient`.
@post: Всегда возвращает актуальные токены @post: Всегда возвращает актуальные токены и CSRF-токен.
@invariant: Заголовки содержат 'Authorization' и 'X-CSRFToken'.
""" """
return { # [REFACTORING_COMPLETE] Заголовки теперь управляются APIClient.
"Authorization": f"Bearer {self.access_token}", return self.network.headers
"X-CSRFToken": self.csrf_token,
"Referer": self.config.base_url,
"Content-Type": "application/json"
}
# [MAIN-OPERATIONS] Работа с дашбордами # [SECTION] Основные операции с дашбордами
def get_dashboard(self, dashboard_id_or_slug: str) -> dict: def get_dashboard(self, dashboard_id_or_slug: str) -> dict:
"""[CONTRACT] Получение метаданных дашборда """[CONTRACT] Получение метаданных дашборда по ID или SLUG.
@pre: @pre:
- dashboard_id_or_slug должен существовать - `dashboard_id_or_slug` должен быть строкой (ID или slug).
- Токены должны быть валидны - Клиент должен быть аутентифицирован (токены актуальны).
@post: @post:
- Возвращает полные метаданные - Возвращает `dict` с метаданными дашборда.
- В случае 404 вызывает DashboardNotFoundError @raise:
- `DashboardNotFoundError`: Если дашборд не найден (HTTP 404).
- `SupersetAPIError`: При других ошибках API.
- `NetworkError`: При проблемах с сетью.
""" """
url = f"{self.config.base_url}/dashboard/{dashboard_id_or_slug}" self.logger.info(f"[INFO] Запрос метаданных дашборда: {dashboard_id_or_slug}")
try: try:
response = self.session.get( response_data = self.network.request(
url, method="GET",
headers=self.headers, endpoint=f"/dashboard/{dashboard_id_or_slug}",
timeout=self.config.timeout # headers=self.headers # [REFACTORING_NOTE] APIClient теперь сам добавляет заголовки
) )
# [POSTCONDITION] Проверка структуры ответа
if response.status_code == 404: if "result" not in response_data:
raise DashboardNotFoundError( self.logger.warning("[CONTRACT_VIOLATION] Ответ API не содержит поле 'result'", extra={"response": response_data})
dashboard_id_or_slug, raise SupersetAPIError("Некорректный формат ответа API при получении дашборда")
context={"url": url} self.logger.debug(f"[DEBUG] Метаданные дашборда '{dashboard_id_or_slug}' успешно получены.")
) return response_data["result"]
except (DashboardNotFoundError, SupersetAPIError, NetworkError, PermissionDeniedError) as e:
response.raise_for_status() self.logger.error(f"[ERROR] Не удалось получить дашборд '{dashboard_id_or_slug}': {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
return response.json()["result"] raise # Перевыброс уже типизированной ошибки
except Exception as e:
except requests.exceptions.RequestException as e: self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении дашборда '{dashboard_id_or_slug}': {str(e)}", exc_info=True)
self._handle_api_error("get_dashboard", e, url) raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context={"dashboard_id_or_slug": dashboard_id_or_slug}) from e
def export_dashboard(self, dashboard_id: int) -> tuple[bytes, str]:
"""[CONTRACT] Экспорт дашборда в ZIP
@error_handling:
- DashboardNotFoundError если дашборд не существует
- ExportError при проблемах экспорта
"""
url = f"{self.config.base_url}/dashboard/export/"
try:
response = self.session.get(
url,
headers=self.headers,
params={"q": f"[{dashboard_id}]"},
timeout=self.config.timeout
)
if response.status_code == 404:
raise DashboardNotFoundError(dashboard_id)
response.raise_for_status()
filename = (
get_filename_from_headers(response.headers)
or f"dashboard_{dashboard_id}.zip"
)
return response.content, filename
except requests.exceptions.RequestException as e:
self._handle_api_error("export_dashboard", e, url)
# [ERROR-HANDLER] Централизованная обработка ошибок
def _handle_api_error(self, method_name: str, error: Exception, url: str) -> None:
"""[UNIFIED-ERROR] Обработка API-ошибок
@semantic: Преобразует requests исключения в наши типы
"""
context = {
"method": method_name,
"url": url,
"status_code": getattr(error.response, 'status_code', None)
}
if isinstance(error, requests.Timeout):
raise NetworkError("Request timeout", context=context) from error
elif getattr(error.response, 'status_code', None) == 403:
raise PermissionDeniedError(context=context) from error
else:
raise SupersetAPIError(str(error), context=context) from error
# [SECTION] EXPORT OPERATIONS # [SECTION] EXPORT OPERATIONS
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]: def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
"""[CONTRACT] Экспорт дашборда в ZIP-архив """[CONTRACT] Экспорт дашборда в ZIP-архив.
@pre: @pre:
- dashboard_id должен существовать - `dashboard_id` должен быть целочисленным ID существующего дашборда.
- Пользователь имеет права на экспорт - Пользователь должен иметь права на экспорт.
@post: @post:
- Возвращает кортеж (бинарное содержимое, имя файла) - Возвращает кортеж: (бинарное_содержимое_zip, имя_файла).
- Имя файла извлекается из headers или генерируется - Имя файла извлекается из заголовков `Content-Disposition` или генерируется.
@errors: @raise:
- DashboardNotFoundError если дашборд не существует - `DashboardNotFoundError`: Если дашборд с `dashboard_id` не найден (HTTP 404).
- ExportError при проблемах экспорта - `ExportError`: При любых других проблемах экспорта (например, неверный тип контента, пустой ответ).
- `NetworkError`: При проблемах с сетью.
""" """
url = f"{self.config.base_url}/dashboard/export/" self.logger.info(f"[INFO] Запуск экспорта дашборда с ID: {dashboard_id}")
self.logger.debug(
"[EXPORT_START] Запуск экспорта",
extra={"dashboard_id": dashboard_id, "export_url": url}
)
try: try:
response = self._execute_export_request(dashboard_id, url) # [ANCHOR] EXECUTE_EXPORT_REQUEST
# [REFACTORING_COMPLETE] Использование self.network.request для экспорта
response = self.network.request(
method="GET",
endpoint="/dashboard/export/",
params={"q": json.dumps([dashboard_id])},
stream=True, # Используем stream для обработки больших файлов
raw_response=True # Получаем сырой объект ответа requests.Response
# headers=self.headers # APIClient сам добавляет заголовки
)
response.raise_for_status() # Проверка статуса ответа
# [ANCHOR] VALIDATE_EXPORT_RESPONSE
self._validate_export_response(response, dashboard_id) self._validate_export_response(response, dashboard_id)
# [ANCHOR] RESOLVE_FILENAME
filename = self._resolve_export_filename(response, dashboard_id) filename = self._resolve_export_filename(response, dashboard_id)
return response.content, filename
# [POSTCONDITION] Успешный экспорт
except requests.exceptions.HTTPError as http_err: content = response.content # Получаем все содержимое
error_ctx = { self.logger.info(
"dashboard_id": dashboard_id, f"[COHERENCE_CHECK_PASSED] Дашборд {dashboard_id} успешно экспортирован. Размер: {len(content)} байт, Имя файла: {filename}"
"status_code": http_err.response.status_code
}
if http_err.response.status_code == 404:
self.logger.error(
"[EXPORT_FAILED] Дашборд не найден",
extra=error_ctx
)
raise DashboardNotFoundError(dashboard_id, context=error_ctx)
raise ExportError("HTTP ошибка экспорта", context=error_ctx) from http_err
except requests.exceptions.RequestException as req_err:
error_ctx = {"dashboard_id": dashboard_id}
self.logger.error(
"[EXPORT_FAILED] Ошибка запроса",
exc_info=True,
extra=error_ctx
) )
raise ExportError("Ошибка экспорта", context=error_ctx) from req_err return content, filename
def _execute_export_request(self, dashboard_id: int, url: str) -> requests.Response: except (DashboardNotFoundError, ExportError, NetworkError, PermissionDeniedError, SupersetAPIError) as e:
"""[HELPER] Выполнение запроса экспорта # Перехват и перевыброс уже типизированных ошибок от APIClient или предыдущих валидаций
@coherence_check: self.logger.error(f"[ERROR] Ошибка экспорта дашборда {dashboard_id}: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
- Ответ должен иметь status_code 200 raise
- Content-Type: application/zip except Exception as e:
""" # Обработка любых непредвиденных ошибок
response = self.session.get( error_ctx = {"dashboard_id": dashboard_id, "error_type": type(e).__name__}
url, self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при экспорте дашборда {dashboard_id}: {str(e)}", exc_info=True, extra=error_ctx)
headers=self.headers, raise ExportError(f"Непредвиденная ошибка при экспорте: {str(e)}", context=error_ctx) from e
params={"q": f"[{dashboard_id}]"},
timeout=self.config.timeout,
stream=True
)
response.raise_for_status()
return response
def _validate_export_response(self, response: requests.Response, dashboard_id: int) -> None: # [HELPER] Метод _execute_export_request был инлайнирован в export_dashboard
"""[HELPER] Валидация ответа экспорта # Это сделано, чтобы избежать лишней абстракции, так как он просто вызывает self.network.request.
# Валидация HTTP-ответа и ошибок теперь происходит в self.network.request и последующей self.raise_for_status().
def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
"""[HELPER] Валидация ответа экспорта.
@semantic: @semantic:
- Проверка Content-Type - Проверяет, что Content-Type является `application/zip`.
- Проверка наличия данных - Проверяет, что ответ не пуст.
@raise:
- `ExportError`: При невалидном Content-Type или пустом содержимом.
""" """
if 'application/zip' not in response.headers.get('Content-Type', ''): content_type = response.headers.get('Content-Type', '')
if 'application/zip' not in content_type:
self.logger.error( self.logger.error(
"[EXPORT_VALIDATION_FAILED] Неверный Content-Type", "[CONTRACT_VIOLATION] Неверный Content-Type для экспорта",
extra={ extra={
"dashboard_id": dashboard_id, "dashboard_id": dashboard_id,
"content_type": response.headers.get('Content-Type') "expected_type": "application/zip",
"received_type": content_type
} }
) )
raise ExportError("Получен не ZIP-архив") raise ExportError(f"Получен не ZIP-архив (Content-Type: {content_type})")
if not response.content: if not response.content:
self.logger.error( self.logger.error(
"[EXPORT_VALIDATION_FAILED] Пустой ответ", "[CONTRACT_VIOLATION] Пустой ответ при экспорте дашборда",
extra={"dashboard_id": dashboard_id} extra={"dashboard_id": dashboard_id}
) )
raise ExportError("Получены пустые данные") raise ExportError("Получены пустые данные при экспорте")
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Ответ экспорта для дашборда {dashboard_id} валиден.")
def _resolve_export_filename(self, response: requests.Response, dashboard_id: int) -> str: def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
"""[HELPER] Определение имени экспортируемого файла """[HELPER] Определение имени экспортируемого файла.
@fallback: Генерирует имя если не найден заголовок @semantic:
- Пытается извлечь имя файла из заголовка `Content-Disposition`.
- Если заголовок отсутствует, генерирует имя файла на основе ID дашборда и текущей даты.
@post:
- Возвращает строку с именем файла.
""" """
filename = get_filename_from_headers(response.headers) filename = get_filename_from_headers(response.headers)
if not filename: if not filename:
filename = f"dashboard_export_{dashboard_id}_{datetime.now().strftime('%Y%m%d')}.zip" # [FALLBACK] Генерация имени файла
filename = f"dashboard_export_{dashboard_id}_{datetime.datetime.now().strftime('%Y%m%dT%H%M%S')}.zip"
self.logger.warning(
"[WARN] Не удалось извлечь имя файла из заголовков. Используется сгенерированное имя.",
extra={"generated_filename": filename, "dashboard_id": dashboard_id}
)
else:
self.logger.debug( self.logger.debug(
"[EXPORT_FALLBACK] Используется сгенерированное имя файла", "[DEBUG] Имя файла экспорта получено из заголовков.",
extra={"filename": filename} extra={"filename": filename, "dashboard_id": dashboard_id}
) )
return filename return filename
def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path: def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path:
"""[CONTRACT] Экспорт дашборда прямо в файл """[CONTRACT] Экспорт дашборда напрямую в файл.
@pre: @pre:
- output_dir должен существовать - `dashboard_id` должен быть существующим ID дашборда.
- Доступ на запись в директорию - `output_dir` должен быть валидным, существующим путем и иметь права на запись.
@post: @post:
- Возвращает Path сохраненного файла - Дашборд экспортируется и сохраняется как ZIP-файл в `output_dir`.
- Создает поддиректорию с именем дашборда - Возвращает `Path` к сохраненному файлу.
@raise:
- `FileNotFoundError`: Если `output_dir` не существует.
- `ExportError`: При ошибках экспорта или записи файла.
- `NetworkError`: При проблемах с сетью.
""" """
output_dir = Path(output_dir) output_dir = Path(output_dir)
if not output_dir.exists(): if not output_dir.exists():
self.logger.error( self.logger.error(
"[EXPORT_PRE_FAILED] Директория не существует", "[CONTRACT_VIOLATION] Целевая директория для экспорта не найдена.",
extra={"output_dir": str(output_dir)} extra={"output_dir": str(output_dir)}
) )
raise FileNotFoundError(f"Директория {output_dir} не найдена") raise FileNotFoundError(f"Директория {output_dir} не найдена")
content, filename = self.export_dashboard(dashboard_id) self.logger.info(f"[INFO] Экспорт дашборда {dashboard_id} в файл в директорию: {output_dir}")
target_path = output_dir / filename
try: try:
content, filename = self.export_dashboard(dashboard_id)
target_path = output_dir / filename
with open(target_path, 'wb') as f: with open(target_path, 'wb') as f:
f.write(content) f.write(content)
self.logger.info( self.logger.info(
"[EXPORT_SUCCESS] Дашборд сохранен на диск", "[COHERENCE_CHECK_PASSED] Дашборд успешно сохранен на диск.",
extra={ extra={
"dashboard_id": dashboard_id, "dashboard_id": dashboard_id,
"file_path": str(target_path), "file_path": str(target_path),
@@ -452,145 +329,245 @@ class SupersetClient:
) )
return target_path return target_path
except (FileNotFoundError, ExportError, NetworkError, SupersetAPIError, DashboardNotFoundError) as e:
self.logger.error(f"[ERROR] Ошибка сохранения дашборда {dashboard_id} на диск: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except IOError as io_err: except IOError as io_err:
self.logger.error( error_ctx = {"target_path": str(target_path), "dashboard_id": dashboard_id}
"[EXPORT_IO_FAILED] Ошибка записи файла", self.logger.critical(f"[CRITICAL] Ошибка записи файла для дашборда {dashboard_id}: {str(io_err)}", exc_info=True, extra=error_ctx)
exc_info=True, raise ExportError("Ошибка сохранения файла на диск") from io_err
extra={"target_path": str(target_path)} except Exception as e:
) error_ctx = {"dashboard_id": dashboard_id, "error_type": type(e).__name__}
raise ExportError("Ошибка сохранения файла") from io_err self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при экспорте в файл: {str(e)}", exc_info=True, extra=error_ctx)
raise ExportError(f"Непредвиденная ошибка экспорта в файл: {str(e)}", context=error_ctx) from e
# [SECTION] Основной интерфейс API # [SECTION] API для получения списка дашбордов
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""[CONTRACT] Получение списка дашбордов с пагинацией """[CONTRACT] Получение списка дашбордов с пагинацией.
@pre: @pre:
- Клиент должен быть авторизован - Клиент должен быть авторизован.
- Параметры пагинации должны быть валидны - Параметры `query` (если предоставлены) должны быть валидны для API Superset.
@post: @post:
- Возвращает кортеж (total_count, список метаданных) - Возвращает кортеж: (общееоличествоашбордов, список_метаданныхашбордов).
- Поддерживает кастомные query-параметры - Обходит пагинацию для получения всех доступных дашбордов.
@invariant: @invariant:
- Всегда возвращает полный список (обходит пагинацию) - Всегда возвращает полный список (если `total_count` > 0).
@raise:
- `SupersetAPIError`: При ошибках API (например, неверный формат ответа).
- `NetworkError`: При проблемах с сетью.
- `ValueError`: При некорректных параметрах пагинации (внутренняя ошибка).
""" """
url = f"{self.config.base_url}/dashboard/" self.logger.info("[INFO] Запрос списка всех дашбордов.")
self.logger.debug( # [COHERENCE_CHECK] Валидация и нормализация параметров запроса
"[API_CALL] Запрос списка дашбордов",
extra={"query": query}
)
# [COHERENCE_CHECK] Валидация параметров
validated_query = self._validate_query_params(query) validated_query = self._validate_query_params(query)
self.logger.debug("[DEBUG] Параметры запроса списка дашбордов после валидации.", extra={"validated_query": validated_query})
try: try:
# Инициализация пагинации # [ANCHOR] FETCH_TOTAL_COUNT
total_count = self._fetch_total_count(url) total_count = self._fetch_total_count()
paginated_data = self._fetch_all_pages(url, validated_query, total_count) self.logger.info(f"[INFO] Обнаружено {total_count} дашбордов в системе.")
# [ANCHOR] FETCH_ALL_PAGES
paginated_data = self._fetch_all_pages(validated_query, total_count)
self.logger.info( self.logger.info(
"[API_SUCCESS] Дашборды получены", f"[COHERENCE_CHECK_PASSED] Успешно получено {len(paginated_data)} дашбордов из {total_count}."
extra={"count": total_count}
) )
return total_count, paginated_data return total_count, paginated_data
except requests.exceptions.RequestException as e: except (SupersetAPIError, NetworkError, ValueError, PermissionDeniedError) as e:
error_ctx = {"method": "get_dashboards", "query": validated_query} self.logger.error(f"[ERROR] Ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
self._handle_api_error("Пагинация дашбордов", e, error_ctx) raise
except Exception as e:
error_ctx = {"query": query, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context=error_ctx) from e
# [SECTION] Импорт/экспорт # [SECTION] Импорт
def import_dashboard(self, zip_path: Union[str, Path]) -> Dict: def import_dashboard(self, file_name: Union[str, Path]) -> Dict:
"""[CONTRACT] Импорт дашборда из архива """[CONTRACT] Импорт дашборда из ZIP-архива.
@pre: @pre:
- Файл должен существовать и быть валидным ZIP - `file_name` должен указывать на существующий и валидный ZIP-файл Superset экспорта.
- Должны быть права на импорт - Пользователь должен иметь права на импорт дашбордов.
@post: @post:
- Возвращает метаданные импортированного дашборда - Дашборд импортируется (или обновляется, если `overwrite` включен).
- При конфликтах выполняет overwrite - Возвращает `dict` с ответом API об импорте.
@raise:
- `FileNotFoundError`: Если файл не существует.
- `InvalidZipFormatError`: Если файл не является корректным ZIP-архивом Superset.
- `PermissionDeniedError`: Если у пользователя нет прав на импорт.
- `SupersetAPIError`: При других ошибках API импорта.
- `NetworkError`: При проблемах с сетью.
""" """
self._validate_import_file(zip_path) self.logger.info(f"[INFO] Инициирован импорт дашборда из файла: {file_name}")
# [PRECONDITION] Валидация входного файла
self._validate_import_file(file_name)
try: try:
with open(zip_path, 'rb') as f: # [ANCHOR] UPLOAD_FILE_TO_API
return self._execute_import( # [REFACTORING_COMPLETE] Использование self.network.upload_file
file_obj=f, import_response = self.network.upload_file(
file_name=Path(zip_path).name endpoint="/dashboard/import/",
) file_obj=Path(file_name), # Pathlib объект, который APIClient может преобразовать в бинарный
except Exception as e: file_name=Path(file_name).name, # Имя файла для FormData
self.logger.error( form_field="formData",
"[IMPORT_FAILED] Критическая ошибка импорта", extra_data={'overwrite': 'true'}, # Предполагаем, что всегда хотим перезаписывать
exc_info=True, timeout=self.config.timeout * 2 # Удвоенный таймаут для загрузки больших файлов
extra={"file": str(zip_path)} # headers=self.headers # APIClient сам добавляет заголовки
) )
raise DashboardImportError(f"Import failed: {str(e)}") from e # [POSTCONDITION] Проверка успешного ответа импорта (Superset обычно возвращает JSON)
if not isinstance(import_response, dict) or "message" not in import_response:
self.logger.warning("[CONTRACT_VIOLATION] Неожиданный формат ответа при импорте", extra={"response": import_response})
raise SupersetAPIError("Неожиданный формат ответа после импорта дашборда.")
self.logger.info(
f"[COHERENCE_CHECK_PASSED] Дашборд из '{file_name}' успешно импортирован.",
extra={"api_message": import_response.get("message", "N/A"), "file": file_name}
)
return import_response
except (FileNotFoundError, InvalidZipFormatError, PermissionDeniedError, SupersetAPIError, NetworkError, DashboardNotFoundError) as e:
self.logger.error(f"[ERROR] Ошибка импорта дашборда из '{file_name}': {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except Exception as e:
error_ctx = {"file": file_name, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при импорте дашборда: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка импорта: {str(e)}", context=error_ctx) from e
# [SECTION] Приватные методы-помощники # [SECTION] Приватные методы-помощники
def _validate_query_params(self, query: Optional[Dict]) -> Dict: def _validate_query_params(self, query: Optional[Dict]) -> Dict:
"""[HELPER] Нормализация параметров запроса""" """[HELPER] Нормализация и валидация параметров запроса для списка дашбордов.
@semantic:
- Устанавливает значения по умолчанию для `columns`, `page`, `page_size`.
- Объединяет предоставленные `query` параметры с дефолтными.
@post:
- Возвращает словарь с полными и валидными параметрами запроса.
"""
base_query = { base_query = {
"columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"], "columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"],
"page": 0, "page": 0,
"page_size": 20 "page_size": 1000 # Достаточно большой размер страницы для обхода пагинации
} }
# [COHERENCE_CHECK_PASSED] Параметры запроса сформированы корректно.
return {**base_query, **(query or {})} return {**base_query, **(query or {})}
def _fetch_total_count(self, url: str) -> int: def _fetch_total_count(self) -> int:
"""[HELPER] Получение общего количества дашбордов""" """[CONTRACT][HELPER] Получение общего количества дашбордов в системе.
count_response = self.session.get( @delegates:
f"{url}?q={json.dumps({'columns': ['id'], 'page': 0, 'page_size': 1})}", - Сетевой запрос к `APIClient.fetch_paginated_count`.
headers=self.headers, @pre:
timeout=self.config.timeout - Клиент должен быть авторизован.
) @post:
count_response.raise_for_status() - Возвращает целочисленное количество дашбордов.
return count_response.json()['count'] @raise:
- `SupersetAPIError` или `NetworkError` при проблемах с API/сетью.
def _fetch_all_pages(self, url: str, query: Dict, total_count: int) -> List[Dict]: """
"""[HELPER] Обход всех страниц с пагинацией""" query_params_for_count = {
results = [] 'columns': ['id'],
page_size = query['page_size'] 'page': 0,
total_pages = (total_count + page_size - 1) // page_size 'page_size': 1
}
for page in range(total_pages): self.logger.debug("[DEBUG] Запрос общего количества дашбордов.")
query['page'] = page try:
response = self.session.get( # [REFACTORING_COMPLETE] Использование self.network.fetch_paginated_count
url, count = self.network.fetch_paginated_count(
headers=self.headers, endpoint="/dashboard/",
params={"q": json.dumps(query)}, query_params=query_params_for_count,
timeout=self.config.timeout count_field="count"
) )
response.raise_for_status() self.logger.debug(f"[COHERENCE_CHECK_PASSED] Получено общее количество дашбордов: {count}")
results.extend(response.json().get('result', [])) return count
except (SupersetAPIError, NetworkError, PermissionDeniedError) as e:
return results self.logger.error(f"[ERROR] Ошибка получения общего количества дашбордов: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise # Перевыброс ошибки
except Exception as e:
error_ctx = {"error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении общего количества: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка при получении count: {str(e)}", context=error_ctx) from e
def _fetch_all_pages(self, query: Dict, total_count: int) -> List[Dict]:
"""[CONTRACT][HELPER] Обход всех страниц пагинированного API для получения всех данных.
@delegates:
- Сетевые запросы к `APIClient.fetch_paginated_data()`.
@pre:
- `query` должен содержать `page_size`.
- `total_count` должен быть корректным общим количеством элементов.
@post:
- Возвращает список всех элементов, собранных со всех страниц.
@raise:
- `SupersetAPIError` или `NetworkError` при проблемах с API/сетью.
- `ValueError` при некорректных параметрах пагинации.
"""
self.logger.debug(f"[DEBUG] Запуск обхода пагинации. Всего элементов: {total_count}, query: {query}")
try:
if 'page_size' not in query or not query['page_size']:
self.logger.error("[CONTRACT_VIOLATION] Параметр 'page_size' отсутствует или неверен в query.")
raise ValueError("Отсутствует 'page_size' в query параметрах для пагинации")
# [REFACTORING_COMPLETE] Использование self.network.fetch_paginated_data
all_data = self.network.fetch_paginated_data(
endpoint="/dashboard/",
base_query=query,
total_count=total_count,
results_field="result"
)
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Успешно получено {len(all_data)} элементов со всех страниц.")
return all_data
except (SupersetAPIError, NetworkError, ValueError, PermissionDeniedError) as e:
self.logger.error(f"[ERROR] Ошибка при обходе пагинации: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except Exception as e:
error_ctx = {"query": query, "total_count": total_count, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при обходе пагинации: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка пагинации: {str(e)}", context=error_ctx) from e
def _validate_import_file(self, zip_path: Union[str, Path]) -> None: def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
"""[HELPER] Проверка файла перед импортом""" """[HELPER] Проверка файла перед импортом.
@semantic:
- Проверяет существование файла.
- Проверяет, что файл является валидным ZIP-архивом.
- Проверяет, что ZIP-архив содержит `metadata.yaml` (ключевой для экспорта Superset).
@raise:
- `FileNotFoundError`: Если файл не существует.
- `InvalidZipFormatError`: Если файл не ZIP или не содержит `metadata.yaml`.
"""
path = Path(zip_path) path = Path(zip_path)
self.logger.debug(f"[DEBUG] Валидация файла для импорта: {path}")
if not path.exists(): if not path.exists():
raise FileNotFoundError(f"[FILE_ERROR] {zip_path} не существует") self.logger.error(
"[CONTRACT_VIOLATION] Файл для импорта не найден.",
extra={"file_path": str(path)}
)
raise FileNotFoundError(f"Файл {zip_path} не существует")
if not zipfile.is_zipfile(path): if not zipfile.is_zipfile(path):
raise InvalidZipFormatError(f"[FILE_ERROR] {zip_path} не ZIP-архив") self.logger.error(
"[CONTRACT_VIOLATION] Файл не является валидным ZIP-архивом.",
extra={"file_path": str(path)}
)
raise InvalidZipFormatError(f"Файл {zip_path} не является ZIP-архивом")
with zipfile.ZipFile(path) as zf: try:
if not any(n.endswith('metadata.yaml') for n in zf.namelist()): with zipfile.ZipFile(path, 'r') as zf:
raise DashboardNotFoundError("Архив не содержит metadata.yaml") # [CONTRACT] Проверяем наличие metadata.yaml
if not any(n.endswith('metadata.yaml') for n in zf.namelist()):
def _execute_import(self, file_obj: BinaryIO, file_name: str) -> Dict: self.logger.error(
"""[HELPER] Выполнение API-запроса импорта""" "[CONTRACT_VIOLATION] ZIP-архив не содержит 'metadata.yaml'.",
url = f"{self.config.base_url}/dashboard/import/" extra={"file_path": str(path), "zip_contents": zf.namelist()[:5]} # Логируем первые 5 файлов для отладки
)
files = {'formData': (file_name, file_obj, 'application/x-zip-compressed')} raise InvalidZipFormatError(f"Архив {zip_path} не содержит 'metadata.yaml', не является корректным экспортом Superset.")
headers = {k: v for k, v in self.headers.items() if k.lower() != 'content-type'} self.logger.debug(f"[COHERENCE_CHECK_PASSED] Файл '{path}' успешно прошел валидацию для импорта.")
except zipfile.BadZipFile as e:
response = self.session.post( self.logger.error(
url, f"[CONTRACT_VIOLATION] Ошибка чтения ZIP-файла: {str(e)}",
files=files, exc_info=True, extra={"file_path": str(path)}
data={'overwrite': 'true'}, )
headers=headers, raise InvalidZipFormatError(f"Файл {zip_path} поврежден или имеет некорректный формат ZIP.") from e
timeout=self.config.timeout * 2 # Увеличенный таймаут для импорта except Exception as e:
) self.logger.critical(
f"[CRITICAL] Непредвиденная ошибка при валидации ZIP-файла: {str(e)}",
if response.status_code == 403: exc_info=True, extra={"file_path": str(path)}
raise PermissionDeniedError("Недостаточно прав для импорта") )
raise SupersetAPIError(f"Непредвиденная ошибка валидации ZIP: {str(e)}", context={"file_path": str(path)}) from e
response.raise_for_status()
return response.json()

View File

@@ -1,48 +1,66 @@
# [MODULE] Иерархия исключений # [MODULE] Иерархия исключений
# @contract: Все ошибки наследуют SupersetToolError # @contract: Все ошибки наследуют `SupersetToolError` для единой точки обработки.
# @semantic: Каждый тип соответствует конкретной проблемной области # @semantic: Каждый тип исключения соответствует конкретной проблемной области в инструменте Superset.
# @coherence: # @coherence:
# - Полное покрытие всех сценариев клиента # - Полное покрытие всех сценариев ошибок клиента и утилит.
# - Четкая классификация по уровню серьезности # - Четкая классификация по уровню серьезности (от общей до специфичной).
# - Дополнительный `context` для каждой ошибки, помогающий в диагностике.
# [IMPORTS] Exceptions # [IMPORTS] Standard library
from typing import Optional, Dict, Any from pathlib import Path
# [IMPORTS] Typing
from typing import Optional, Dict, Any,Union
class SupersetToolError(Exception): class SupersetToolError(Exception):
"""[BASE] Базовый класс ошибок инструмента """[BASE] Базовый класс для всех ошибок инструмента Superset.
@semantic: Должен содержать контекст для диагностики @semantic: Обеспечивает стандартизированный формат сообщений об ошибках с контекстом.
@invariant:
- `message` всегда присутствует.
- `context` всегда является словарем, даже если пустой.
""" """
def __init__(self, message: str, context: Optional[dict] = None): def __init__(self, message: str, context: Optional[Dict[str, Any]] = None):
# [PRECONDITION] Проверка типа контекста
if not isinstance(context, (dict, type(None))):
# [COHERENCE_CHECK_FAILED] Ошибка в передаче контекста
raise TypeError("Контекст ошибки должен быть словарем или None")
self.context = context or {} self.context = context or {}
super().__init__(f"{message} | Context: {self.context}") super().__init__(f"{message} | Context: {self.context}")
# [POSTCONDITION] Логирование создания ошибки
# Можно добавить здесь логирование, но обычно ошибки логируются в месте их перехвата/подъема,
# чтобы избежать дублирования и получить полный стек вызовов.
# [ERROR-GROUP] Проблемы аутентификации и авторизации # [ERROR-GROUP] Проблемы аутентификации и авторизации
class AuthenticationError(SupersetToolError): class AuthenticationError(SupersetToolError):
"""[AUTH] Ошибки credentials или доступа """[AUTH] Ошибки аутентификации (неверные учетные данные) или авторизации (проблемы с сессией).
@context: url, username, error_detail @context: url, username, error_detail (опционально).
""" """
def __init__(self, message="Auth failed", **context): def __init__(self, message: str = "Authentication failed", **context: Any):
super().__init__( super().__init__(
f"[AUTH_FAILURE] {message}", f"[AUTH_FAILURE] {message}",
{"type": "authentication", **context} {"type": "authentication", **context}
) )
class PermissionDeniedError(AuthenticationError): class PermissionDeniedError(AuthenticationError):
"""[AUTH] Ошибка отказа в доступе из-за недостаточных прав """[AUTH] Ошибка отказа в доступе из-за недостаточных прав пользователя.
@context: required_permission, user_roles @semantic: Указывает на то, что операция не разрешена.
@context: required_permission (опционально), user_roles (опционально), endpoint (опционально).
@invariant: Наследует от `AuthenticationError`, так как это разновидность проблемы доступа.
""" """
def __init__(self, required_permission: str, **context): def __init__(self, message: str = "Permission denied", required_permission: Optional[str] = None, **context: Any):
full_message = f"Permission denied: {required_permission}" if required_permission else message
super().__init__( super().__init__(
f"Permission denied: {required_permission}", full_message,
{"type": "authorization", "required_permission": required_permission, **context} {"type": "authorization", "required_permission": required_permission, **context}
) )
# [ERROR-GROUP] Проблемы API-вызовов # [ERROR-GROUP] Проблемы API-вызовов
class SupersetAPIError(SupersetToolError): class SupersetAPIError(SupersetToolError):
"""[API] Ошибки взаимодействия с Superset API """[API] Общие ошибки взаимодействия с Superset API.
@context: endpoint, method, status_code, response @semantic: Для ошибок, возвращаемых Superset API, или проблем с парсингом ответа.
@context: endpoint, method, status_code, response_body (опционально), error_message (из API).
""" """
def __init__(self, message="API error", **context): def __init__(self, message: str = "Superset API error", **context: Any):
super().__init__( super().__init__(
f"[API_FAILURE] {message}", f"[API_FAILURE] {message}",
{"type": "api_call", **context} {"type": "api_call", **context}
@@ -50,30 +68,44 @@ class SupersetAPIError(SupersetToolError):
# [ERROR-SUBCLASS] Детализированные ошибки API # [ERROR-SUBCLASS] Детализированные ошибки API
class ExportError(SupersetAPIError): class ExportError(SupersetAPIError):
"""[API:EXPORT] Проблемы экспорта дашбордов""" """[API:EXPORT] Проблемы, специфичные для операций экспорта дашбордов.
... @semantic: Может быть вызвано невалидным форматом ответа, ошибками Superset при экспорте.
@context: dashboard_id (опционально), details (опционально).
"""
def __init__(self, message: str = "Dashboard export failed", **context: Any):
super().__init__(f"[EXPORT_FAILURE] {message}", {"subtype": "export", **context})
class DashboardNotFoundError(SupersetAPIError): class DashboardNotFoundError(SupersetAPIError):
"""[API:404] Запрошенный ресурс не существует""" """[API:404] Запрошенный дашборд или ресурс не существует.
def __init__(self, dashboard_id, **context): @semantic: Соответствует HTTP 404 Not Found.
@context: dashboard_id_or_slug, url.
"""
def __init__(self, dashboard_id_or_slug: Union[int, str], message: str = "Dashboard not found", **context: Any):
super().__init__( super().__init__(
f"Dashboard {dashboard_id} not found", f"[NOT_FOUND] Dashboard '{dashboard_id_or_slug}' {message}",
{"dashboard_id": dashboard_id, **context} {"subtype": "not_found", "resource_id": dashboard_id_or_slug, **context}
) )
# [ERROR-SUBCLASS] Детализированные ошибки обработки файлов # [ERROR-SUBCLASS] Детализированные ошибки обработки файлов
class InvalidZipFormatError(SupersetAPIError): class InvalidZipFormatError(SupersetToolError):
"""[API:ZIP] Некорректный формат ZIP-архива """[FILE:ZIP] Некорректный формат ZIP-архива или содержимого для импорта/экспорта.
@context: file_path, expected_format, error_detail @semantic: Указывает на проблемы с целостностью или структурой ZIP-файла.
@context: file_path, expected_content (например, metadata.yaml), error_detail.
""" """
def __init__(self, file_path: str, **context): def __init__(self, message: str = "Invalid ZIP format or content", file_path: Optional[Union[str, Path]] = None, **context: Any):
super().__init__( super().__init__(
f"Invalid ZIP format for file: {file_path}", f"[FILE_ERROR] {message}",
{"type": "zip_validation", "file_path": file_path, **context} {"type": "file_validation", "file_path": str(file_path) if file_path else "N/A", **context}
) )
# [ERROR-GROUP] Системные и network-ошибки # [ERROR-GROUP] Системные и network-ошибки
class NetworkError(SupersetToolError): class NetworkError(SupersetToolError):
"""[NETWORK] Проблемы соединения или таймауты""" """[NETWORK] Проблемы соединения, таймауты, DNS-ошибки и т.п.
... @semantic: Ошибки, связанные с невозможностью установить или поддерживать сетевое соединение.
@context: url, original_exception (опционально), timeout (опционально).
"""
def __init__(self, message: str = "Network connection failed", **context: Any):
super().__init__(
f"[NETWORK_FAILURE] {message}",
{"type": "network", **context}
)

View File

@@ -1,42 +1,77 @@
# [MODULE] Сущности данных конфигурации # [MODULE] Сущности данных конфигурации
# @desc: Определяет структуры данных для работы с Superset API # @desc: Определяет структуры данных, используемые для конфигурации и трансформации в инструменте Superset.
# @contracts: # @contracts:
# - Проверка валидности URL # - Все модели наследуются от `pydantic.BaseModel` для автоматической валидации.
# - Валидация параметров аутентификации # - Валидация URL-адресов и параметров аутентификации.
# - Валидация структуры конфигурации БД для миграций.
# @coherence: # @coherence:
# - Все модели согласованы с API Superset v1 # - Все модели согласованы со схемой API Superset v1.
# - Совместимы с клиентскими методами # - Совместимы с клиентскими методами `SupersetClient` и утилитами.
# [IMPORTS] Models # [IMPORTS] Pydantic и Typing
from typing import Optional, Dict, Any from typing import Optional, Dict, Any, Union
from pydantic import BaseModel, validator,Field from pydantic import BaseModel, validator, Field, HttpUrl
# [COHERENCE_CHECK_PASSED] Все необходимые импорты для Pydantic моделей.
# [IMPORTS] Локальные модули
from .utils.logger import SupersetLogger from .utils.logger import SupersetLogger
class SupersetConfig(BaseModel): class SupersetConfig(BaseModel):
"""[CONFIG] Конфигурация подключения к Superset """[CONFIG] Конфигурация подключения к Superset API.
@semantic: Основные параметры подключения к API @semantic: Инкапсулирует основные параметры, необходимые для инициализации `SupersetClient`.
@invariant: @invariant:
- base_url должен содержать версию API (/v1/) - `base_url` должен быть валидным HTTP(S) URL и содержать `/api/v1`.
- auth должен содержать все обязательные поля - `auth` должен содержать обязательные поля для аутентификации по логину/паролю.
- `timeout` должен быть положительным числом.
""" """
base_url: str = Field(..., regex=r'.*/api/v1.*') base_url: str = Field(..., description="Базовый URL Superset API, включая версию /api/v1.", regex=r'.*/api/v1.*')
auth: dict auth: Dict[str, str] = Field(..., description="Словарь с данными для аутентификации (provider, username, password, refresh).")
verify_ssl: bool = True verify_ssl: bool = Field(True, description="Флаг для проверки SSL-сертификатов.")
timeout: int = 30 timeout: int = Field(30, description="Таймаут в секундах для HTTP-запросов.")
logger: Optional[SupersetLogger] = None logger: Optional[SupersetLogger] = Field(None, description="Экземпляр логгера для логирования внутри клиента.")
# [VALIDATOR] Проверка параметров аутентификации # [VALIDATOR] Проверка параметров аутентификации
@validator('auth') @validator('auth')
def validate_auth(cls, v): def validate_auth(cls, v: Dict[str, str]) -> Dict[str, str]:
"""[CONTRACT_VALIDATOR] Валидация словаря `auth`.
@pre:
- `v` должен быть словарем.
@post:
- Возвращает `v` если все обязательные поля присутствуют.
@raise:
- `ValueError`: Если отсутствуют обязательные поля ('provider', 'username', 'password', 'refresh').
"""
required = {'provider', 'username', 'password', 'refresh'} required = {'provider', 'username', 'password', 'refresh'}
if not required.issubset(v.keys()): if not required.issubset(v.keys()):
raise ValueError( raise ValueError(
f"[CONTRACT_VIOLATION] Auth must contain {required}" f"[CONTRACT_VIOLATION] Словарь 'auth' должен содержать поля: {required}. "
f"Отсутствующие: {required - v.keys()}"
) )
# [COHERENCE_CHECK_PASSED] Auth-конфигурация валидна.
return v
# [VALIDATOR] Проверка base_url
@validator('base_url')
def check_base_url_format(cls, v: str) -> str:
"""[CONTRACT_VALIDATOR] Валидация формата `base_url`.
@pre:
- `v` должна быть строкой.
@post:
- Возвращает `v` если это валидный URL.
@raise:
- `ValueError`: Если URL невалиден.
"""
try:
# Для Pydantic v2:
from pydantic import HttpUrl
HttpUrl(v, scheme="https") # Явное указание схемы
except ValueError:
# Для совместимости с Pydantic v1:
HttpUrl(v)
return v return v
class Config: class Config:
arbitrary_types_allowed = True arbitrary_types_allowed = True # Разрешаем Pydantic обрабатывать произвольные типы (например, SupersetLogger)
json_schema_extra = { json_schema_extra = {
"example": { "example": {
"base_url": "https://host/api/v1/", "base_url": "https://host/api/v1/",
@@ -45,28 +80,42 @@ class SupersetConfig(BaseModel):
"username": "user", "username": "user",
"password": "pass", "password": "pass",
"refresh": True "refresh": True
} },
"verify_ssl": True,
"timeout": 60
} }
} }
# [SEMANTIC-TYPE] Конфигурация БД для миграций # [SEMANTIC-TYPE] Конфигурация БД для миграций
class DatabaseConfig(BaseModel): class DatabaseConfig(BaseModel):
"""[CONFIG] Параметры трансформации БД при миграции """[CONFIG] Параметры трансформации баз данных при миграции дашбордов.
@semantic: Содержит old/new состояние для преобразования @semantic: Содержит `old` и `new` состояния конфигурации базы данных,
используемые для поиска и замены в YAML-файлах экспортированных дашбордов.
@invariant: @invariant:
- Должны быть указаны оба состояния (old/new) - `database_config` должен быть словарем с ключами 'old' и 'new'.
- UUID должен соответствовать формату - Каждое из 'old' и 'new' должно быть словарем, содержащим метаданные БД Superset.
""" """
database_config: Dict[str, Dict[str, Any]] database_config: Dict[str, Dict[str, Any]] = Field(..., description="Словарь, содержащий 'old' и 'new' конфигурации базы данных.")
logger: Optional[SupersetLogger] = None logger: Optional[SupersetLogger] = Field(None, description="Экземпляр логгера для логирования.")
@validator('database_config') @validator('database_config')
def validate_config(cls, v): def validate_config(cls, v: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""[CONTRACT_VALIDATOR] Валидация словаря `database_config`.
@pre:
- `v` должен быть словарем.
@post:
- Возвращает `v` если содержит ключи 'old' и 'new'.
@raise:
- `ValueError`: Если отсутствуют ключи 'old' или 'new'.
"""
if not {'old', 'new'}.issubset(v.keys()): if not {'old', 'new'}.issubset(v.keys()):
raise ValueError( raise ValueError(
"[COHERENCE_ERROR] Config must contain both old/new states" "[CONTRACT_VIOLATION] 'database_config' должен содержать ключи 'old' и 'new'."
) )
# Дополнительно можно добавить проверку структуры `old` и `new` на наличие `uuid`, `database_name` и т.д.
# Для простоты пока ограничимся наличием ключей 'old' и 'new'.
# [COHERENCE_CHECK_PASSED] Конфигурация базы данных для миграции валидна.
return v return v
class Config: class Config:

View File

@@ -50,6 +50,9 @@ class SupersetLogger:
def warning(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): def warning(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.warning(message, extra=extra, exc_info=exc_info) self.logger.warning(message, extra=extra, exc_info=exc_info)
def critical(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.critical(message, extra=extra, exc_info=exc_info)
def debug(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): def debug(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.debug(message, extra=extra, exc_info=exc_info) self.logger.debug(message, extra=extra, exc_info=exc_info)

View File

@@ -0,0 +1,479 @@
# [MODULE] Сетевой клиент для API
# @contract: Инкапсулирует низкоуровневую HTTP-логику, аутентификацию, повторные попытки и обработку сетевых ошибок.
# @semantic_layers:
# 1. Инициализация сессии `requests` с настройками SSL и таймаутов.
# 2. Управление аутентификацией (получение и обновление access/CSRF токенов).
# 3. Выполнение HTTP-запросов (GET, POST и т.д.) с автоматическими заголовками.
# 4. Обработка пагинации для API-ответов.
# 5. Обработка загрузки файлов.
# @coherence:
# - Полностью независим от `SupersetClient`, предоставляя ему чистый API для сетевых операций.
# - Использует `SupersetLogger` для внутреннего логирования.
# - Всегда выбрасывает типизированные исключения из `superset_tool.exceptions`.
# [IMPORTS] Стандартная библиотека
from typing import Optional, Dict, Any, BinaryIO, List, Union
import json
import io
from pathlib import Path
# [IMPORTS] Сторонние библиотеки
import requests
import urllib3 # Для отключения SSL-предупреждений
# [IMPORTS] Локальные модули
from ..exceptions import AuthenticationError, NetworkError, DashboardNotFoundError, SupersetAPIError, PermissionDeniedError
from .logger import SupersetLogger # Импорт логгера
# [CONSTANTS]
DEFAULT_RETRIES = 3
DEFAULT_BACKOFF_FACTOR = 0.5
class APIClient:
"""[NETWORK-CORE] Инкапсулирует HTTP-логику для работы с API.
@contract:
- Гарантирует retry-механизмы для запросов.
- Выполняет SSL-валидацию или отключает ее по конфигурации.
- Автоматически управляет access и CSRF токенами.
- Преобразует HTTP-ошибки в типизированные исключения `superset_tool.exceptions`.
@pre:
- `base_url` должен быть валидным URL.
- `auth` должен содержать необходимые данные для аутентификации.
- `logger` должен быть инициализирован.
@post:
- Аутентификация выполняется при первом запросе или явно через `authenticate()`.
- `self._tokens` всегда содержит актуальные access/CSRF токены после успешной аутентификации.
@invariant:
- Сессия `requests` активна и настроена.
- Все запросы используют актуальные токены.
"""
def __init__(
self,
base_url: str,
auth: Dict[str, Any],
verify_ssl: bool = True,
timeout: int = 30,
logger: Optional[SupersetLogger] = None
):
# [INIT] Основные параметры
self.base_url = base_url
self.auth = auth
self.verify_ssl = verify_ssl
self.timeout = timeout
self.logger = logger or SupersetLogger(name="APIClient") # [COHERENCE_CHECK_PASSED] Инициализация логгера
# [INIT] Сессия Requests
self.session = self._init_session()
self._tokens: Dict[str, str] = {} # [STATE] Хранилище токенов
self._authenticated = False # [STATE] Флаг аутентификации
self.logger.debug(
"[INIT] APIClient инициализирован.",
extra={"base_url": self.base_url, "verify_ssl": self.verify_ssl}
)
def _init_session(self) -> requests.Session:
"""[HELPER] Настройка сессии `requests` с адаптерами и SSL-опциями.
@semantic: Создает и конфигурирует объект `requests.Session`.
"""
session = requests.Session()
# [CONTRACT] Настройка повторных попыток
retries = requests.adapters.Retry(
total=DEFAULT_RETRIES,
backoff_factor=DEFAULT_BACKOFF_FACTOR,
status_forcelist=[500, 502, 503, 504],
allowed_methods={"HEAD", "GET", "POST", "PUT", "DELETE"}
)
session.mount('http://', requests.adapters.HTTPAdapter(max_retries=retries))
session.mount('https://', requests.adapters.HTTPAdapter(max_retries=retries))
session.verify = self.verify_ssl
if not self.verify_ssl:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.logger.warning("[SECURITY] Отключена проверка SSL-сертификатов. Не использовать в продакшене без явной необходимости.")
return session
def authenticate(self) -> Dict[str, str]:
"""[AUTH-FLOW] Получение access и CSRF токенов.
@pre:
- `self.auth` содержит валидные учетные данные.
@post:
- `self._tokens` обновлен актуальными токенами.
- Возвращает обновленные токены.
- `self._authenticated` устанавливается в `True`.
@raise:
- `AuthenticationError`: При ошибках аутентификации (неверные credentials, проблемы с API security).
- `NetworkError`: При проблемах с сетью.
"""
self.logger.info(f"[AUTH] Попытка аутентификации для {self.base_url}")
try:
# Шаг 1: Получение access_token
login_url = f"{self.base_url}/security/login"
response = self.session.post(
login_url,
json=self.auth, # Используем self.auth, который уже имеет "provider": "db", "refresh": True
timeout=self.timeout
)
response.raise_for_status() # Выбросит HTTPError для 4xx/5xx ответов
access_token = response.json()["access_token"]
self.logger.debug("[AUTH] Access token успешно получен.")
# Шаг 2: Получение CSRF токена
csrf_url = f"{self.base_url}/security/csrf_token/"
csrf_response = self.session.get(
csrf_url,
headers={"Authorization": f"Bearer {access_token}"},
timeout=self.timeout
)
csrf_response.raise_for_status()
csrf_token = csrf_response.json()["result"]
self.logger.debug("[AUTH] CSRF token успешно получен.")
# [STATE] Сохранение токенов и обновление флага
self._tokens = {
"access_token": access_token,
"csrf_token": csrf_token
}
self._authenticated = True
self.logger.info("[COHERENCE_CHECK_PASSED] Аутентификация успешно завершена.")
return self._tokens
except requests.exceptions.HTTPError as e:
error_msg = f"HTTP Error during authentication: {e.response.status_code} - {e.response.text}"
self.logger.error(f"[AUTH_FAILED] {error_msg}", exc_info=True)
if e.response.status_code == 401: # Unauthorized
raise AuthenticationError(
f"Неверные учетные данные или истекший токен.",
url=login_url, username=self.auth.get("username"),
status_code=e.response.status_code, response_text=e.response.text
) from e
elif e.response.status_code == 403: # Forbidden
raise PermissionDeniedError(
"Недостаточно прав для аутентификации.",
url=login_url, username=self.auth.get("username"),
status_code=e.response.status_code, response_text=e.response.text
) from e
else:
raise SupersetAPIError(
f"API ошибка при аутентификации: {error_msg}",
url=login_url, status_code=e.response.status_code, response_text=e.response.text
) from e
except requests.exceptions.RequestException as e:
self.logger.error(f"[NETWORK_ERROR] Сетевая ошибка при аутентификации: {str(e)}", exc_info=True)
raise NetworkError(f"Ошибка сети при аутентификации: {str(e)}", url=login_url) from e
except KeyError as e:
self.logger.error(f"[AUTH_FAILED] Некорректный формат ответа при аутентификации: {str(e)}", exc_info=True)
raise AuthenticationError(f"Некорректный формат ответа API при аутентификации: {str(e)}") from e
except Exception as e:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка аутентификации: {str(e)}", exc_info=True)
raise AuthenticationError(f"Непредвиденная ошибка аутентификации: {str(e)}") from e
@property
def headers(self) -> Dict[str, str]:
"""[INTERFACE] Возвращает стандартные заголовки с текущими токенами.
@semantic: Если токены не получены, пытается выполнить аутентификацию.
@post: Всегда возвращает словарь с 'Authorization' и 'X-CSRFToken'.
@raise: `AuthenticationError` если аутентификация невозможна.
"""
if not self._authenticated:
self.authenticate() # Попытка аутентификации при первом запросе заголовков
# [CONTRACT] Проверка наличия токенов
if not self._tokens or "access_token" not in self._tokens or "csrf_token" not in self._tokens:
self.logger.error("[CONTRACT_VIOLATION] Токены отсутствуют после попытки аутентификации.", extra={"tokens": self._tokens})
raise AuthenticationError("Не удалось получить токены для заголовков.")
return {
"Authorization": f"Bearer {self._tokens['access_token']}",
"X-CSRFToken": self._tokens["csrf_token"],
"Referer": self.base_url,
"Content-Type": "application/json"
}
def request(
self,
method: str,
endpoint: str,
headers: Optional[Dict] = None,
raw_response: bool = False,
**kwargs
) -> Union[requests.Response, Dict[str, Any]]:
"""[NETWORK-CORE] Обертка для всех HTTP-запросов к Superset API.
@semantic:
- Выполняет запрос с заданными параметрами.
- Автоматически добавляет базовые заголовки (токены, CSRF).
- Обрабатывает HTTP-ошибки и преобразует их в типизированные исключения.
- В случае 401/403, пытается обновить токен и повторить запрос один раз.
@pre:
- `method` - валидный HTTP-метод ('GET', 'POST', 'PUT', 'DELETE').
- `endpoint` - валидный путь API.
@post:
- Возвращает объект `requests.Response` (если `raw_response=True`) или `dict` (JSON-ответ).
@raise:
- `AuthenticationError`, `PermissionDeniedError`, `NetworkError`, `SupersetAPIError`, `DashboardNotFoundError`.
"""
full_url = f"{self.base_url}{endpoint}"
self.logger.debug(f"[REQUEST] Выполнение запроса: {method} {full_url}", extra={"kwargs_keys": list(kwargs.keys())})
# [STATE] Заголовки для текущего запроса
_headers = self.headers.copy() # Получаем базовые заголовки с актуальными токенами
if headers: # Объединяем с переданными кастомными заголовками (переданные имеют приоритет)
_headers.update(headers)
retries_left = 1 # Одна попытка на обновление токена
while retries_left >= 0:
try:
response = self.session.request(
method,
full_url,
headers=_headers,
#timeout=self.timeout,
**kwargs
)
response.raise_for_status() # Проверяем статус сразу
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Запрос {method} {endpoint} успешно выполнен.")
return response if raw_response else response.json()
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code
error_context = {
"method": method,
"url": full_url,
"status_code": status_code,
"response_text": e.response.text
}
if status_code in [401, 403] and retries_left > 0:
self.logger.warning(f"[AUTH_REFRESH] Токен истек или недействителен ({status_code}). Попытка обновить и повторить...", extra=error_context)
try:
self.authenticate() # Попытка обновить токены
_headers = self.headers.copy() # Обновляем заголовки с новыми токенами
if headers:
_headers.update(headers)
retries_left -= 1
continue # Повторяем цикл
except AuthenticationError as auth_err:
self.logger.error("[AUTH_FAILED] Не удалось обновить токены.", exc_info=True)
raise PermissionDeniedError("Аутентификация не удалась или права отсутствуют после обновления токена.", **error_context) from auth_err
# [ERROR_MAPPING] Преобразование стандартных HTTP-ошибок в кастомные исключения
if status_code == 404:
raise DashboardNotFoundError(endpoint, context=error_context) from e
elif status_code == 403:
raise PermissionDeniedError("Доступ запрещен.", **error_context) from e
elif status_code == 401:
raise AuthenticationError("Аутентификация не удалась.", **error_context) from e
else:
raise SupersetAPIError(f"Ошибка API: {status_code} - {e.response.text}", **error_context) from e
except requests.exceptions.Timeout as e:
self.logger.error(f"[NETWORK_ERROR] Таймаут запроса: {str(e)}", exc_info=True, extra={"url": full_url})
raise NetworkError("Таймаут запроса", url=full_url) from e
except requests.exceptions.ConnectionError as e:
self.logger.error(f"[NETWORK_ERROR] Ошибка соединения: {str(e)}", exc_info=True, extra={"url": full_url})
raise NetworkError("Ошибка соединения", url=full_url) from e
except requests.exceptions.RequestException as e:
self.logger.critical(f"[CRITICAL] Неизвестная ошибка запроса: {str(e)}", exc_info=True, extra={"url": full_url})
raise NetworkError(f"Неизвестная сетевая ошибка: {str(e)}", url=full_url) from e
except json.JSONDecodeError as e:
self.logger.error(f"[API_FAILED] Ошибка парсинга JSON ответа: {str(e)}", exc_info=True, extra={"url": full_url, "response_text_sample": response.text[:200]})
raise SupersetAPIError(f"Некорректный JSON ответ: {str(e)}", url=full_url) from e
except Exception as e:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка в APIClient.request: {str(e)}", exc_info=True, extra={"url": full_url})
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", url=full_url) from e
# [COHERENCE_CHECK_FAILED] Если дошли сюда, значит, все повторные попытки провалились
self.logger.error(f"[CONTRACT_VIOLATION] Все повторные попытки для запроса {method} {endpoint} исчерпаны.")
raise SupersetAPIError(f"Все повторные попытки запроса {method} {endpoint} исчерпаны.")
def upload_file(
self,
endpoint: str,
file_obj: Union[str, Path, BinaryIO], # Может быть Path, str или байтовый поток
file_name: str,
form_field: str = "file",
extra_data: Optional[Dict] = None,
timeout: Optional[int] = None
) -> Dict:
"""[CONTRACT] Отправка файла на сервер через POST-запрос.
@pre:
- `endpoint` - валидный API endpoint для загрузки.
- `file_obj` - путь к файлу или открытый бинарный файловый объект.
- `file_name` - имя файла для отправки в форме.
@post:
- Возвращает JSON-ответ от сервера в виде словаря.
@raise:
- `FileNotFoundError`: Если `file_obj` является путем и файл не найден.
- `PermissionDeniedError`: Если недостаточно прав.
- `SupersetAPIError`, `NetworkError`.
"""
full_url = f"{self.base_url}{endpoint}"
_headers = self.headers.copy()
# [IMPORTANT] Content-Type для files формируется requests, поэтому удаляем его из общих заголовков
_headers.pop('Content-Type', None)
files_payload = None
should_close_file = False
if isinstance(file_obj, (str, Path)):
file_path = Path(file_obj)
if not file_path.exists():
self.logger.error(f"[CONTRACT_VIOLATION] Файл для загрузки не найден: {file_path}", extra={"file_path": str(file_path)})
raise FileNotFoundError(f"Файл {file_path} не найден для загрузки.")
files_payload = {form_field: (file_name, open(file_path, 'rb'), 'application/x-zip-compressed')}
should_close_file = True
self.logger.debug(f"[UPLOAD] Загрузка файла из пути: {file_path}")
elif isinstance(file_obj, io.BytesIO): # In-memory binary file
files_payload = {form_field: (file_name, file_obj.getvalue(), 'application/x-zip-compressed')}
self.logger.debug(f"[UPLOAD] Загрузка файла из байтового потока (in-memory).")
elif hasattr(file_obj, 'read') and hasattr(file_obj, 'seek'): # Generic binary file-like object
files_payload = {form_field: (file_name, file_obj, 'application/x-zip-compressed')}
self.logger.debug(f"[UPLOAD] Загрузка файла из файлового объекта.")
else:
self.logger.error(f"[CONTRACT_VIOLATION] Неподдерживаемый тип файла для загрузки: {type(file_obj).__name__}")
raise TypeError("Неподдерживаемый тип 'file_obj'. Ожидается Path, str, io.BytesIO или другой файлоподобный объект.")
try:
response = self.session.post(
url=full_url,
files=files_payload,
data=extra_data or {},
headers=_headers,
timeout=timeout or self.timeout
)
response.raise_for_status()
# [COHERENCE_CHECK_PASSED] Файл успешно загружен.
self.logger.info(f"[UPLOAD_SUCCESS] Файл '{file_name}' успешно загружен на {endpoint}.")
return response.json()
except requests.exceptions.HTTPError as e:
error_context = {
"endpoint": endpoint,
"file": file_name,
"status_code": e.response.status_code,
"response_text": e.response.text
}
if e.response.status_code == 403:
raise PermissionDeniedError("Доступ запрещен для загрузки файла.", **error_context) from e
else:
raise SupersetAPIError(f"Ошибка API при загрузке файла: {e.response.status_code} - {e.response.text}", **error_context) from e
except requests.exceptions.RequestException as e:
error_context = {"endpoint": endpoint, "file": file_name, "error_type": type(e).__name__}
self.logger.error(f"[NETWORK_ERROR] Ошибка запроса при загрузке файла: {str(e)}", exc_info=True, extra=error_context)
raise NetworkError(f"Ошибка сети при загрузке файла: {str(e)}", url=full_url) from e
except Exception as e:
error_context = {"endpoint": endpoint, "file": file_name, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при загрузке файла: {str(e)}", exc_info=True, extra=error_context)
raise SupersetAPIError(f"Непредвиденная ошибка загрузки файла: {str(e)}", context=error_context) from e
finally:
# Закрываем файл, если он был открыт в этом методе
if should_close_file and files_payload and files_payload[form_field] and hasattr(files_payload[form_field][1], 'close'):
files_payload[form_field][1].close()
self.logger.debug(f"[UPLOAD] Закрыт файл '{file_name}'.")
def fetch_paginated_count(
self,
endpoint: str,
query_params: Dict,
count_field: str = "count",
timeout: Optional[int] = None
) -> int:
"""[CONTRACT] Получение общего количества элементов в пагинированном API.
@delegates:
- Использует `self.request` для выполнения HTTP-запроса.
@pre:
- `endpoint` должен указывать на пагинированный ресурс.
- `query_params` должны быть валидны для запроса количества.
@post:
- Возвращает целочисленное количество элементов.
@raise:
- `NetworkError`, `SupersetAPIError`, `KeyError` (если `count_field` не найден).
"""
self.logger.debug(f"[PAGINATION] Запрос количества элементов для {endpoint} с параметрами: {query_params}")
try:
response_json = self.request(
method="GET",
endpoint=endpoint,
params={"q": json.dumps(query_params)},
timeout=timeout or self.timeout
)
if count_field not in response_json:
self.logger.error(
f"[CONTRACT_VIOLATION] Ответ API для {endpoint} не содержит поле '{count_field}'",
extra={"response_keys": list(response_json.keys())}
)
raise KeyError(f"Ответ API для {endpoint} не содержит поле '{count_field}'")
count = response_json[count_field]
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Получено количество: {count} для {endpoint}.")
return count
except (KeyError, SupersetAPIError, NetworkError, PermissionDeniedError, DashboardNotFoundError) as e:
self.logger.error(f"[ERROR] Ошибка получения количества элементов для {endpoint}: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except Exception as e:
error_ctx = {"endpoint": endpoint, "params": query_params, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении количества: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка при получении count для {endpoint}: {str(e)}", context=error_ctx) from e
def fetch_paginated_data(
self,
endpoint: str,
base_query: Dict,
total_count: int,
results_field: str = "result",
timeout: Optional[int] = None
) -> List[Any]:
"""[CONTRACT] Получение всех данных с пагинированного API.
@delegates:
- Использует `self.request` для выполнения запросов по страницам.
@pre:
- `base_query` должен содержать 'page_size'.
- `total_count` должен быть корректным общим количеством элементов.
@post:
- Возвращает список всех собранных данных со всех страниц.
@raise:
- `NetworkError`, `SupersetAPIError`, `ValueError` (если `page_size` невалиден), `KeyError`.
"""
self.logger.debug(f"[PAGINATION] Запуск получения всех данных для {endpoint}. Total: {total_count}, Base Query: {base_query}")
page_size = base_query.get('page_size')
if not page_size or page_size <= 0:
self.logger.error("[CONTRACT_VIOLATION] 'page_size' в базовом запросе невалиден.", extra={"page_size": page_size})
raise ValueError("Параметр 'page_size' должен быть положительным числом.")
total_pages = (total_count + page_size - 1) // page_size
results = []
for page in range(total_pages):
query = {**base_query, 'page': page}
self.logger.debug(f"[PAGINATION] Запрос страницы {page+1}/{total_pages} для {endpoint}.")
try:
response_json = self.request(
method="GET",
endpoint=endpoint,
params={"q": json.dumps(query)},
timeout=timeout or self.timeout
)
if results_field not in response_json:
self.logger.warning(
f"[CONTRACT_VIOLATION] Ответ API для {endpoint} на странице {page} не содержит поле '{results_field}'",
extra={"response_keys": list(response_json.keys())}
)
# Если поле результатов отсутствует на одной странице, это может быть не фатально, но надо залогировать.
continue
results.extend(response_json[results_field])
except (SupersetAPIError, NetworkError, PermissionDeniedError, DashboardNotFoundError) as e:
self.logger.error(f"[ERROR] Ошибка получения страницы {page+1} для {endpoint}: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise # Пробрасываем ошибку выше, так как не можем продолжить пагинацию
except Exception as e:
error_ctx = {"endpoint": endpoint, "page": page, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении страницы {page+1} для {endpoint}: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка пагинации для {endpoint}: {str(e)}", context=error_ctx) from e
self.logger.debug(f"[COHERENCE_CHECK_PASSED] Все данные с пагинацией для {endpoint} успешно собраны. Всего элементов: {len(results)}")
return results