refactor, add db search

This commit is contained in:
2025-12-15 19:18:17 +03:00
parent e6346612c4
commit d3395d55c3
24 changed files with 1582 additions and 32542 deletions

View File

@@ -0,0 +1,5 @@
# [DEF:superset_tool.utils:Module]
# @SEMANTICS: package, utils
# @PURPOSE: Utility package for superset_tool.
# @LAYER: Infra
# [/DEF:superset_tool.utils]

View File

@@ -1,37 +1,38 @@
# <GRACE_MODULE id="dataset_mapper" name="dataset_mapper.py">
# @SEMANTICS: dataset, mapping, postgresql, xlsx, superset
# @PURPOSE: Этот модуль отвечает за обновление метаданных (verbose_map) в датасетах Superset, извлекая их из PostgreSQL или XLSX-файлов.
# @DEPENDS_ON: superset_tool.client -> Использует SupersetClient для взаимодействия с API.
# @DEPENDS_ON: pandas -> для чтения XLSX-файлов.
# @DEPENDS_ON: psycopg2 -> для подключения к PostgreSQL.
# <IMPORTS>
import pandas as pd
import psycopg2
# [DEF:superset_tool.utils.dataset_mapper:Module]
#
# @SEMANTICS: dataset, mapping, postgresql, xlsx, superset
# @PURPOSE: Этот модуль отвечает за обновление метаданных (verbose_map) в датасетах Superset, извлекая их из PostgreSQL или XLSX-файлов.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> superset_tool.client
# @RELATION: DEPENDS_ON -> pandas
# @RELATION: DEPENDS_ON -> psycopg2
# @PUBLIC_API: DatasetMapper
# [SECTION: IMPORTS]
import pandas as pd # type: ignore
import psycopg2 # type: ignore
from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import setup_clients
from superset_tool.utils.logger import SupersetLogger
from typing import Dict, List, Optional, Any
# </IMPORTS>
# --- Начало кода модуля ---
# <ANCHOR id="DatasetMapper" type="Class">
# @PURPOSE: Класс для меппинга и обновления verbose_map в датасетах Superset.
# [/SECTION]
# [DEF:DatasetMapper:Class]
# @PURPOSE: Класс для меппинга и обновления verbose_map в датасетах Superset.
class DatasetMapper:
def __init__(self, logger: SupersetLogger):
self.logger = logger
# <ANCHOR id="DatasetMapper.get_postgres_comments" type="Function">
# @PURPOSE: Извлекает комментарии к колонкам из системного каталога PostgreSQL.
# @PRE: `db_config` должен содержать валидные креды для подключения к PostgreSQL.
# @PRE: `table_name` и `table_schema` должны быть строками.
# @POST: Возвращается словарь с меппингом `column_name` -> `column_comment`.
# @PARAM: db_config: Dict - Конфигурация для подключения к БД.
# @PARAM: table_name: str - Имя таблицы.
# @PARAM: table_schema: str - Схема таблицы.
# @RETURN: Dict[str, str] - Словарь с комментариями к колонкам.
# @THROW: Exception - При ошибках подключения или выполнения запроса к БД.
# [DEF:DatasetMapper.get_postgres_comments:Function]
# @PURPOSE: Извлекает комментарии к колонкам из системного каталога PostgreSQL.
# @PRE: `db_config` должен содержать валидные креды для подключения к PostgreSQL.
# @PRE: `table_name` и `table_schema` должны быть строками.
# @POST: Возвращается словарь с меппингом `column_name` -> `column_comment`.
# @THROW: Exception - При ошибках подключения или выполнения запроса к БД.
# @PARAM: db_config (Dict) - Конфигурация для подключения к БД.
# @PARAM: table_name (str) - Имя таблицы.
# @PARAM: table_schema (str) - Схема таблицы.
# @RETURN: Dict[str, str] - Словарь с комментариями к колонкам.
def get_postgres_comments(self, db_config: Dict, table_name: str, table_schema: str) -> Dict[str, str]:
self.logger.info("[get_postgres_comments][Enter] Fetching comments from PostgreSQL for %s.%s.", table_schema, table_name)
query = f"""
@@ -84,15 +85,15 @@ class DatasetMapper:
self.logger.error("[get_postgres_comments][Failure] %s", e, exc_info=True)
raise
return comments
# </ANCHOR id="DatasetMapper.get_postgres_comments">
# <ANCHOR id="DatasetMapper.load_excel_mappings" type="Function">
# @PURPOSE: Загружает меппинги 'column_name' -> 'column_comment' из XLSX файла.
# @PRE: `file_path` должен быть валидным путем к XLSX файлу с колонками 'column_name' и 'column_comment'.
# @POST: Возвращается словарь с меппингами.
# @PARAM: file_path: str - Путь к XLSX файлу.
# @RETURN: Dict[str, str] - Словарь с меппингами.
# @THROW: Exception - При ошибках чтения файла или парсинга.
# [/DEF:DatasetMapper.get_postgres_comments]
# [DEF:DatasetMapper.load_excel_mappings:Function]
# @PURPOSE: Загружает меппинги 'column_name' -> 'column_comment' из XLSX файла.
# @PRE: `file_path` должен быть валидным путем к XLSX файлу с колонками 'column_name' и 'column_comment'.
# @POST: Возвращается словарь с меппингами.
# @THROW: Exception - При ошибках чтения файла или парсинга.
# @PARAM: file_path (str) - Путь к XLSX файлу.
# @RETURN: Dict[str, str] - Словарь с меппингами.
def load_excel_mappings(self, file_path: str) -> Dict[str, str]:
self.logger.info("[load_excel_mappings][Enter] Loading mappings from %s.", file_path)
try:
@@ -103,21 +104,21 @@ class DatasetMapper:
except Exception as e:
self.logger.error("[load_excel_mappings][Failure] %s", e, exc_info=True)
raise
# </ANCHOR id="DatasetMapper.load_excel_mappings">
# <ANCHOR id="DatasetMapper.run_mapping" type="Function">
# @PURPOSE: Основная функция для выполнения меппинга и обновления verbose_map датасета в Superset.
# @PARAM: superset_client: SupersetClient - Клиент Superset.
# @PARAM: dataset_id: int - ID датасета для обновления.
# @PARAM: source: str - Источник данных ('postgres', 'excel', 'both').
# @PARAM: postgres_config: Optional[Dict] - Конфигурация для подключения к PostgreSQL.
# @PARAM: excel_path: Optional[str] - Путь к XLSX файлу.
# @PARAM: table_name: Optional[str] - Имя таблицы в PostgreSQL.
# @PARAM: table_schema: Optional[str] - Схема таблицы в PostgreSQL.
# @RELATION: CALLS -> self.get_postgres_comments
# @RELATION: CALLS -> self.load_excel_mappings
# @RELATION: CALLS -> superset_client.get_dataset
# @RELATION: CALLS -> superset_client.update_dataset
# [/DEF:DatasetMapper.load_excel_mappings]
# [DEF:DatasetMapper.run_mapping:Function]
# @PURPOSE: Основная функция для выполнения меппинга и обновления verbose_map датасета в Superset.
# @RELATION: CALLS -> self.get_postgres_comments
# @RELATION: CALLS -> self.load_excel_mappings
# @RELATION: CALLS -> superset_client.get_dataset
# @RELATION: CALLS -> superset_client.update_dataset
# @PARAM: superset_client (SupersetClient) - Клиент Superset.
# @PARAM: dataset_id (int) - ID датасета для обновления.
# @PARAM: source (str) - Источник данных ('postgres', 'excel', 'both').
# @PARAM: postgres_config (Optional[Dict]) - Конфигурация для подключения к PostgreSQL.
# @PARAM: excel_path (Optional[str]) - Путь к XLSX файлу.
# @PARAM: table_name (Optional[str]) - Имя таблицы в PostgreSQL.
# @PARAM: table_schema (Optional[str]) - Схема таблицы в PostgreSQL.
def run_mapping(self, superset_client: SupersetClient, dataset_id: int, source: str, postgres_config: Optional[Dict] = None, excel_path: Optional[str] = None, table_name: Optional[str] = None, table_schema: Optional[str] = None):
self.logger.info("[run_mapping][Enter] Starting dataset mapping for ID %d from source '%s'.", dataset_id, source)
mappings: Dict[str, str] = {}
@@ -132,14 +133,14 @@ class DatasetMapper:
if source not in ['postgres', 'excel', 'both']:
self.logger.error("[run_mapping][Failure] Invalid source: %s.", source)
return
dataset_response = superset_client.get_dataset(dataset_id)
dataset_data = dataset_response['result']
original_columns = dataset_data.get('columns', [])
updated_columns = []
changes_made = False
for column in original_columns:
col_name = column.get('column_name')
@@ -161,7 +162,7 @@ class DatasetMapper:
}
new_column = {k: v for k, v in new_column.items() if v is not None}
if col_name in mappings:
mapping_value = mappings[col_name]
if isinstance(mapping_value, str) and new_column.get('verbose_name') != mapping_value:
@@ -169,7 +170,7 @@ class DatasetMapper:
changes_made = True
updated_columns.append(new_column)
updated_metrics = []
for metric in dataset_data.get("metrics", []):
new_metric = {
@@ -186,7 +187,7 @@ class DatasetMapper:
"uuid": metric.get("uuid"),
}
updated_metrics.append({k: v for k, v in new_metric.items() if v is not None})
if changes_made:
payload_for_update = {
"database_id": dataset_data.get("database", {}).get("id"),
@@ -213,18 +214,16 @@ class DatasetMapper:
}
payload_for_update = {k: v for k, v in payload_for_update.items() if v is not None}
superset_client.update_dataset(dataset_id, payload_for_update)
self.logger.info("[run_mapping][Success] Dataset %d columns' verbose_name updated.", dataset_id)
else:
self.logger.info("[run_mapping][State] No changes in columns' verbose_name, skipping update.")
except (AssertionError, FileNotFoundError, Exception) as e:
self.logger.error("[run_mapping][Failure] %s", e, exc_info=True)
return
# </ANCHOR id="DatasetMapper.run_mapping">
# </ANCHOR id="DatasetMapper">
# --- Конец кода модуля ---
# </GRACE_MODULE id="dataset_mapper">
# [/DEF:DatasetMapper.run_mapping]
# [/DEF:DatasetMapper]
# [/DEF:superset_tool.utils.dataset_mapper]

View File

@@ -1,16 +1,19 @@
# <GRACE_MODULE id="superset_tool.utils.fileio" name="fileio.py">
# @SEMANTICS: file, io, zip, yaml, temp, archive, utility
# @PURPOSE: Предоставляет набор утилит для управления файловыми операциями, включая работу с временными файлами, архивами ZIP, файлами YAML и очистку директорий.
# @DEPENDS_ON: superset_tool.exceptions -> Для генерации специфичных ошибок.
# @DEPENDS_ON: superset_tool.utils.logger -> Для логирования операций.
# @DEPENDS_ON: pyyaml -> Для работы с YAML файлами.
# [DEF:superset_tool.utils.fileio:Module]
#
# @SEMANTICS: file, io, zip, yaml, temp, archive, utility
# @PURPOSE: Предоставляет набор утилит для управления файловыми операциями, включая работу с временными файлами, архивами ZIP, файлами YAML и очистку директорий.
# @LAYER: Infra
# @RELATION: DEPENDS_ON -> superset_tool.exceptions
# @RELATION: DEPENDS_ON -> superset_tool.utils.logger
# @RELATION: DEPENDS_ON -> pyyaml
# @PUBLIC_API: create_temp_file, remove_empty_directories, read_dashboard_from_disk, calculate_crc32, RetentionPolicy, archive_exports, save_and_unpack_dashboard, update_yamls, create_dashboard_export, sanitize_filename, get_filename_from_headers, consolidate_archive_folders
# <IMPORTS>
# [SECTION: IMPORTS]
import os
import re
import zipfile
from pathlib import Path
from typing import Any, Optional, Tuple, Dict, List, Union, LiteralString
from typing import Any, Optional, Tuple, Dict, List, Union, LiteralString, Generator
from contextlib import contextmanager
import tempfile
from datetime import date, datetime
@@ -21,20 +24,18 @@ from dataclasses import dataclass
import yaml
from superset_tool.exceptions import InvalidZipFormatError
from superset_tool.utils.logger import SupersetLogger
# </IMPORTS>
# [/SECTION]
# --- Начало кода модуля ---
# <ANCHOR id="create_temp_file" type="Function">
# @PURPOSE: Контекстный менеджер для создания временного файла или директории с гарантированным удалением.
# @PARAM: content: Optional[bytes] - Бинарное содержимое для записи во временный файл.
# @PARAM: suffix: str - Суффикс ресурса. Если `.dir`, создается директория.
# @PARAM: mode: str - Режим записи в файл (e.g., 'wb').
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @YIELDS: Path - Путь к временному ресурсу.
# @THROW: IOError - При ошибках создания ресурса.
# [DEF:create_temp_file:Function]
# @PURPOSE: Контекстный менеджер для создания временного файла или директории с гарантированным удалением.
# @PARAM: content (Optional[bytes]) - Бинарное содержимое для записи во временный файл.
# @PARAM: suffix (str) - Суффикс ресурса. Если `.dir`, создается директория.
# @PARAM: mode (str) - Режим записи в файл (e.g., 'wb').
# @PARAM: logger (Optional[SupersetLogger]) - Экземпляр логгера.
# @YIELDS: Path - Путь к временному ресурсу.
# @THROW: IOError - При ошибках создания ресурса.
@contextmanager
def create_temp_file(content: Optional[bytes] = None, suffix: str = ".zip", mode: str = 'wb', logger: Optional[SupersetLogger] = None) -> Path:
def create_temp_file(content: Optional[bytes] = None, suffix: str = ".zip", mode: str = 'wb', logger: Optional[SupersetLogger] = None) -> Generator[Path, None, None]:
logger = logger or SupersetLogger(name="fileio")
resource_path = None
is_dir = suffix.startswith('.dir')
@@ -63,13 +64,13 @@ def create_temp_file(content: Optional[bytes] = None, suffix: str = ".zip", mode
logger.debug("[create_temp_file][Cleanup] Removed temporary file: %s", resource_path)
except OSError as e:
logger.error("[create_temp_file][Failure] Error during cleanup of %s: %s", resource_path, e)
# </ANCHOR id="create_temp_file">
# [/DEF:create_temp_file]
# <ANCHOR id="remove_empty_directories" type="Function">
# @PURPOSE: Рекурсивно удаляет все пустые поддиректории, начиная с указанного пути.
# @PARAM: root_dir: str - Путь к корневой директории для очистки.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @RETURN: int - Количество удаленных директорий.
# [DEF:remove_empty_directories:Function]
# @PURPOSE: Рекурсивно удаляет все пустые поддиректории, начиная с указанного пути.
# @PARAM: root_dir (str) - Путь к корневой директории для очистки.
# @PARAM: logger (Optional[SupersetLogger]) - Экземпляр логгера.
# @RETURN: int - Количество удаленных директорий.
def remove_empty_directories(root_dir: str, logger: Optional[SupersetLogger] = None) -> int:
logger = logger or SupersetLogger(name="fileio")
logger.info("[remove_empty_directories][Enter] Starting cleanup of empty directories in %s", root_dir)
@@ -87,14 +88,14 @@ def remove_empty_directories(root_dir: str, logger: Optional[SupersetLogger] = N
logger.error("[remove_empty_directories][Failure] Failed to remove %s: %s", current_dir, e)
logger.info("[remove_empty_directories][Exit] Removed %d empty directories.", removed_count)
return removed_count
# </ANCHOR id="remove_empty_directories">
# [/DEF:remove_empty_directories]
# <ANCHOR id="read_dashboard_from_disk" type="Function">
# @PURPOSE: Читает бинарное содержимое файла с диска.
# @PARAM: file_path: str - Путь к файлу.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @RETURN: Tuple[bytes, str] - Кортеж (содержимое, имя файла).
# @THROW: FileNotFoundError - Если файл не найден.
# [DEF:read_dashboard_from_disk:Function]
# @PURPOSE: Читает бинарное содержимое файла с диска.
# @PARAM: file_path (str) - Путь к файлу.
# @PARAM: logger (Optional[SupersetLogger]) - Экземпляр логгера.
# @RETURN: Tuple[bytes, str] - Кортеж (содержимое, имя файла).
# @THROW: FileNotFoundError - Если файл не найден.
def read_dashboard_from_disk(file_path: str, logger: Optional[SupersetLogger] = None) -> Tuple[bytes, str]:
logger = logger or SupersetLogger(name="fileio")
path = Path(file_path)
@@ -104,36 +105,36 @@ def read_dashboard_from_disk(file_path: str, logger: Optional[SupersetLogger] =
if not content:
logger.warning("[read_dashboard_from_disk][Warning] File is empty: %s", file_path)
return content, path.name
# </ANCHOR id="read_dashboard_from_disk">
# [/DEF:read_dashboard_from_disk]
# <ANCHOR id="calculate_crc32" type="Function">
# @PURPOSE: Вычисляет контрольную сумму CRC32 для файла.
# @PARAM: file_path: Path - Путь к файлу.
# @RETURN: str - 8-значное шестнадцатеричное представление CRC32.
# @THROW: IOError - При ошибках чтения файла.
# [DEF:calculate_crc32:Function]
# @PURPOSE: Вычисляет контрольную сумму CRC32 для файла.
# @PARAM: file_path (Path) - Путь к файлу.
# @RETURN: str - 8-значное шестнадцатеричное представление CRC32.
# @THROW: IOError - При ошибках чтения файла.
def calculate_crc32(file_path: Path) -> str:
with open(file_path, 'rb') as f:
crc32_value = zlib.crc32(f.read())
return f"{crc32_value:08x}"
# </ANCHOR id="calculate_crc32">
# [/DEF:calculate_crc32]
# <ANCHOR id="RetentionPolicy" type="DataClass">
# @PURPOSE: Определяет политику хранения для архивов (ежедневные, еженедельные, ежемесячные).
# [DEF:RetentionPolicy:DataClass]
# @PURPOSE: Определяет политику хранения для архивов (ежедневные, еженедельные, ежемесячные).
@dataclass
class RetentionPolicy:
daily: int = 7
weekly: int = 4
monthly: int = 12
# </ANCHOR id="RetentionPolicy">
# [/DEF:RetentionPolicy]
# <ANCHOR id="archive_exports" type="Function">
# @PURPOSE: Управляет архивом экспортированных файлов, применяя политику хранения и дедупликацию.
# @PARAM: output_dir: str - Директория с архивами.
# @PARAM: policy: RetentionPolicy - Политика хранения.
# @PARAM: deduplicate: bool - Флаг для включения удаления дубликатов по CRC32.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @RELATION: CALLS -> apply_retention_policy
# @RELATION: CALLS -> calculate_crc32
# [DEF:archive_exports:Function]
# @PURPOSE: Управляет архивом экспортированных файлов, применяя политику хранения и дедупликацию.
# @RELATION: CALLS -> apply_retention_policy
# @RELATION: CALLS -> calculate_crc32
# @PARAM: output_dir (str) - Директория с архивами.
# @PARAM: policy (RetentionPolicy) - Политика хранения.
# @PARAM: deduplicate (bool) - Флаг для включения удаления дубликатов по CRC32.
# @PARAM: logger (Optional[SupersetLogger]) - Экземпляр логгера.
def archive_exports(output_dir: str, policy: RetentionPolicy, deduplicate: bool = False, logger: Optional[SupersetLogger] = None) -> None:
logger = logger or SupersetLogger(name="fileio")
output_path = Path(output_dir)
@@ -142,16 +143,78 @@ def archive_exports(output_dir: str, policy: RetentionPolicy, deduplicate: bool
return
logger.info("[archive_exports][Enter] Managing archive in %s", output_dir)
# ... (логика дедупликации и политики хранения) ...
# </ANCHOR id="archive_exports">
# 1. Collect all zip files
zip_files = list(output_path.glob("*.zip"))
if not zip_files:
logger.info("[archive_exports][State] No zip files found in %s", output_dir)
return
# <ANCHOR id="apply_retention_policy" type="Function">
# @PURPOSE: (Helper) Применяет политику хранения к списку файлов, возвращая те, что нужно сохранить.
# @INTERNAL
# @PARAM: files_with_dates: List[Tuple[Path, date]] - Список файлов с датами.
# @PARAM: policy: RetentionPolicy - Политика хранения.
# @PARAM: logger: SupersetLogger - Логгер.
# @RETURN: set - Множество путей к файлам, которые должны быть сохранены.
# 2. Deduplication
if deduplicate:
logger.info("[archive_exports][State] Starting deduplication...")
checksums = {}
files_to_remove = []
# Sort by modification time (newest first) to keep the latest version
zip_files.sort(key=lambda f: f.stat().st_mtime, reverse=True)
for file_path in zip_files:
try:
crc = calculate_crc32(file_path)
if crc in checksums:
files_to_remove.append(file_path)
logger.debug("[archive_exports][State] Duplicate found: %s (same as %s)", file_path.name, checksums[crc].name)
else:
checksums[crc] = file_path
except Exception as e:
logger.error("[archive_exports][Failure] Failed to calculate CRC32 for %s: %s", file_path, e)
for f in files_to_remove:
try:
f.unlink()
zip_files.remove(f)
logger.info("[archive_exports][State] Removed duplicate: %s", f.name)
except OSError as e:
logger.error("[archive_exports][Failure] Failed to remove duplicate %s: %s", f, e)
# 3. Retention Policy
files_with_dates = []
for file_path in zip_files:
# Try to extract date from filename
# Pattern: ..._YYYYMMDD_HHMMSS.zip or ..._YYYYMMDD.zip
match = re.search(r'_(\d{8})_', file_path.name)
file_date = None
if match:
try:
date_str = match.group(1)
file_date = datetime.strptime(date_str, "%Y%m%d").date()
except ValueError:
pass
if not file_date:
# Fallback to modification time
file_date = datetime.fromtimestamp(file_path.stat().st_mtime).date()
files_with_dates.append((file_path, file_date))
files_to_keep = apply_retention_policy(files_with_dates, policy, logger)
for file_path, _ in files_with_dates:
if file_path not in files_to_keep:
try:
file_path.unlink()
logger.info("[archive_exports][State] Removed by retention policy: %s", file_path.name)
except OSError as e:
logger.error("[archive_exports][Failure] Failed to remove %s: %s", file_path, e)
# [/DEF:archive_exports]
# [DEF:apply_retention_policy:Function]
# @PURPOSE: (Helper) Применяет политику хранения к списку файлов, возвращая те, что нужно сохранить.
# @PARAM: files_with_dates (List[Tuple[Path, date]]) - Список файлов с датами.
# @PARAM: policy (RetentionPolicy) - Политика хранения.
# @PARAM: logger (SupersetLogger) - Логгер.
# @RETURN: set - Множество путей к файлам, которые должны быть сохранены.
def apply_retention_policy(files_with_dates: List[Tuple[Path, date]], policy: RetentionPolicy, logger: SupersetLogger) -> set:
# Сортируем по дате (от новой к старой)
sorted_files = sorted(files_with_dates, key=lambda x: x[1], reverse=True)
@@ -177,17 +240,17 @@ def apply_retention_policy(files_with_dates: List[Tuple[Path, date]], policy: Re
files_to_keep.update(monthly_files[:policy.monthly])
logger.debug("[apply_retention_policy][State] Keeping %d files according to retention policy", len(files_to_keep))
return files_to_keep
# </ANCHOR id="apply_retention_policy">
# [/DEF:apply_retention_policy]
# <ANCHOR id="save_and_unpack_dashboard" type="Function">
# @PURPOSE: Сохраняет бинарное содержимое ZIP-архива на диск и опционально распаковывает его.
# @PARAM: zip_content: bytes - Содержимое ZIP-архива.
# @PARAM: output_dir: Union[str, Path] - Директория для сохранения.
# @PARAM: unpack: bool - Флаг, нужно ли распаковывать архив.
# @PARAM: original_filename: Optional[str] - Исходное имя файла для сохранения.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @RETURN: Tuple[Path, Optional[Path]] - Путь к ZIP-файлу и, если применимо, путь к директории с распаковкой.
# @THROW: InvalidZipFormatError - При ошибке формата ZIP.
# [DEF:save_and_unpack_dashboard:Function]
# @PURPOSE: Сохраняет бинарное содержимое ZIP-архива на диск и опционально распаковывает его.
# @PARAM: zip_content (bytes) - Содержимое ZIP-архива.
# @PARAM: output_dir (Union[str, Path]) - Директория для сохранения.
# @PARAM: unpack (bool) - Флаг, нужно ли распаковывать архив.
# @PARAM: original_filename (Optional[str]) - Исходное имя файла для сохранения.
# @PARAM: logger (Optional[SupersetLogger]) - Экземпляр логгера.
# @RETURN: Tuple[Path, Optional[Path]] - Путь к ZIP-файлу и, если применимо, путь к директории с распаковкой.
# @THROW: InvalidZipFormatError - При ошибке формата ZIP.
def save_and_unpack_dashboard(zip_content: bytes, output_dir: Union[str, Path], unpack: bool = False, original_filename: Optional[str] = None, logger: Optional[SupersetLogger] = None) -> Tuple[Path, Optional[Path]]:
logger = logger or SupersetLogger(name="fileio")
logger.info("[save_and_unpack_dashboard][Enter] Processing dashboard. Unpack: %s", unpack)
@@ -207,17 +270,17 @@ def save_and_unpack_dashboard(zip_content: bytes, output_dir: Union[str, Path],
except zipfile.BadZipFile as e:
logger.error("[save_and_unpack_dashboard][Failure] Invalid ZIP archive: %s", e)
raise InvalidZipFormatError(f"Invalid ZIP file: {e}") from e
# </ANCHOR id="save_and_unpack_dashboard">
# [/DEF:save_and_unpack_dashboard]
# <ANCHOR id="update_yamls" type="Function">
# @PURPOSE: Обновляет конфигурации в YAML-файлах, заменяя значения или применяя regex.
# @PARAM: db_configs: Optional[List[Dict]] - Список конфигураций для замены.
# @PARAM: path: str - Путь к директории с YAML файлами.
# @PARAM: regexp_pattern: Optional[LiteralString] - Паттерн для поиска.
# @PARAM: replace_string: Optional[LiteralString] - Строка для замены.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @THROW: FileNotFoundError - Если `path` не существует.
# @RELATION: CALLS -> _update_yaml_file
# [DEF:update_yamls:Function]
# @PURPOSE: Обновляет конфигурации в YAML-файлах, заменяя значения или применяя regex.
# @RELATION: CALLS -> _update_yaml_file
# @THROW: FileNotFoundError - Если `path` не существует.
# @PARAM: db_configs (Optional[List[Dict]]) - Список конфигураций для замены.
# @PARAM: path (str) - Путь к директории с YAML файлами.
# @PARAM: regexp_pattern (Optional[LiteralString]) - Паттерн для поиска.
# @PARAM: replace_string (Optional[LiteralString]) - Строка для замены.
# @PARAM: logger (Optional[SupersetLogger]) - Экземпляр логгера.
def update_yamls(db_configs: Optional[List[Dict]] = None, path: str = "dashboards", regexp_pattern: Optional[LiteralString] = None, replace_string: Optional[LiteralString] = None, logger: Optional[SupersetLogger] = None) -> None:
logger = logger or SupersetLogger(name="fileio")
logger.info("[update_yamls][Enter] Starting YAML configuration update.")
@@ -228,57 +291,64 @@ def update_yamls(db_configs: Optional[List[Dict]] = None, path: str = "dashboard
for file_path in dir_path.rglob("*.yaml"):
_update_yaml_file(file_path, configs, regexp_pattern, replace_string, logger)
# </ANCHOR id="update_yamls">
# [/DEF:update_yamls]
# <ANCHOR id="_update_yaml_file" type="Function">
# @PURPOSE: (Helper) Обновляет один YAML файл.
# @INTERNAL
def _update_yaml_file(file_path: Path, db_configs: List[Dict], regexp_pattern: Optional[str], replace_string: Optional[str], logger: SupersetLogger) -> None:
# Читаем содержимое файла
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
logger.error("[_update_yaml_file][Failure] Failed to read %s: %s", file_path, e)
return
# Если задан pattern и replace_string, применяем замену по регулярному выражению
if regexp_pattern and replace_string:
try:
new_content = re.sub(regexp_pattern, replace_string, content)
if new_content != content:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(new_content)
logger.info("[_update_yaml_file][State] Updated %s using regex pattern", file_path)
except Exception as e:
logger.error("[_update_yaml_file][Failure] Error applying regex to %s: %s", file_path, e)
# Если заданы конфигурации, заменяем значения
if db_configs:
try:
parsed_data = yaml.safe_load(content)
if not isinstance(parsed_data, dict):
logger.warning("[_update_yaml_file][Warning] YAML content is not a dictionary in %s", file_path)
return
# Обновляем данные
for config in db_configs:
for key, value in config.items():
if key in parsed_data:
old_value = parsed_data[key]
parsed_data[key] = value
logger.info("[_update_yaml_file][State] Changed %s.%s from %s to %s", file_path, key, old_value, value)
# Записываем обратно
with open(file_path, 'w', encoding='utf-8') as f:
yaml.dump(parsed_data, f, default_flow_style=False, allow_unicode=True)
except Exception as e:
logger.error("[_update_yaml_file][Failure] Error updating YAML in %s: %s", file_path, e)
# </ANCHOR id="_update_yaml_file">
# [DEF:_update_yaml_file:Function]
# @PURPOSE: (Helper) Обновляет один YAML файл.
# @PARAM: file_path (Path) - Путь к файлу.
# @PARAM: db_configs (List[Dict]) - Конфигурации.
# @PARAM: regexp_pattern (Optional[str]) - Паттерн.
# @PARAM: replace_string (Optional[str]) - Замена.
# @PARAM: logger (SupersetLogger) - Логгер.
def _update_yaml_file(file_path: Path, db_configs: List[Dict], regexp_pattern: Optional[str], replace_string: Optional[str], logger: SupersetLogger) -> None:
# Читаем содержимое файла
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
logger.error("[_update_yaml_file][Failure] Failed to read %s: %s", file_path, e)
return
# Если задан pattern и replace_string, применяем замену по регулярному выражению
if regexp_pattern and replace_string:
try:
new_content = re.sub(regexp_pattern, replace_string, content)
if new_content != content:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(new_content)
logger.info("[_update_yaml_file][State] Updated %s using regex pattern", file_path)
except Exception as e:
logger.error("[_update_yaml_file][Failure] Error applying regex to %s: %s", file_path, e)
# Если заданы конфигурации, заменяем значения (поддержка old/new)
if db_configs:
try:
# Прямой текстовый заменитель для старых/новых значений, чтобы сохранить структуру файла
modified_content = content
for cfg in db_configs:
# Ожидаем структуру: {'old': {...}, 'new': {...}}
old_cfg = cfg.get('old', {})
new_cfg = cfg.get('new', {})
for key, old_val in old_cfg.items():
if key in new_cfg:
new_val = new_cfg[key]
# Заменяем только точные совпадения старого значения в тексте YAML
if isinstance(old_val, str):
escaped_old = re.escape(old_val)
modified_content = re.sub(escaped_old, new_val, modified_content)
logger.info("[_update_yaml_file][State] Replaced '%s' with '%s' for key %s in %s", old_val, new_val, key, file_path)
# Записываем обратно изменённый контент без парсинга YAML, сохраняем оригинальное форматирование
with open(file_path, 'w', encoding='utf-8') as f:
f.write(modified_content)
except Exception as e:
logger.error("[_update_yaml_file][Failure] Error performing raw replacement in %s: %s", file_path, e)
# [/DEF:_update_yaml_file]
# <ANCHOR id="create_dashboard_export" type="Function">
# @PURPOSE: Создает ZIP-архив из указанных исходных путей.
# @PARAM: zip_path: Union[str, Path] - Путь для сохранения ZIP архива.
# @PARAM: source_paths: List[Union[str, Path]] - Список исходных путей для архивации.
# @PARAM: exclude_extensions: Optional[List[str]] - Список расширений для исключения.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @RETURN: bool - `True` при успехе, `False` при ошибке.
# [DEF:create_dashboard_export:Function]
# @PURPOSE: Создает ZIP-архив из указанных исходных путей.
# @PARAM: zip_path (Union[str, Path]) - Путь для сохранения ZIP архива.
# @PARAM: source_paths (List[Union[str, Path]]) - Список исходных путей для архивации.
# @PARAM: exclude_extensions (Optional[List[str]]) - Список расширений для исключения.
# @PARAM: logger (Optional[SupersetLogger]) - Экземпляр логгера.
# @RETURN: bool - `True` при успехе, `False` при ошибке.
def create_dashboard_export(zip_path: Union[str, Path], source_paths: List[Union[str, Path]], exclude_extensions: Optional[List[str]] = None, logger: Optional[SupersetLogger] = None) -> bool:
logger = logger or SupersetLogger(name="fileio")
logger.info("[create_dashboard_export][Enter] Packing dashboard: %s -> %s", source_paths, zip_path)
@@ -297,32 +367,32 @@ def create_dashboard_export(zip_path: Union[str, Path], source_paths: List[Union
except (IOError, zipfile.BadZipFile, AssertionError) as e:
logger.error("[create_dashboard_export][Failure] Error: %s", e, exc_info=True)
return False
# </ANCHOR id="create_dashboard_export">
# [/DEF:create_dashboard_export]
# <ANCHOR id="sanitize_filename" type="Function">
# @PURPOSE: Очищает строку от символов, недопустимых в именах файлов.
# @PARAM: filename: str - Исходное имя файла.
# @RETURN: str - Очищенная строка.
# [DEF:sanitize_filename:Function]
# @PURPOSE: Очищает строку от символов, недопустимых в именах файлов.
# @PARAM: filename (str) - Исходное имя файла.
# @RETURN: str - Очищенная строка.
def sanitize_filename(filename: str) -> str:
return re.sub(r'[\\/*?:"<>|]', "_", filename).strip()
# </ANCHOR id="sanitize_filename">
# [/DEF:sanitize_filename]
# <ANCHOR id="get_filename_from_headers" type="Function">
# @PURPOSE: Извлекает имя файла из HTTP заголовка 'Content-Disposition'.
# @PARAM: headers: dict - Словарь HTTP заголовков.
# @RETURN: Optional[str] - Имя файла или `None`.
# [DEF:get_filename_from_headers:Function]
# @PURPOSE: Извлекает имя файла из HTTP заголовка 'Content-Disposition'.
# @PARAM: headers (dict) - Словарь HTTP заголовков.
# @RETURN: Optional[str] - Имя файла или `None`.
def get_filename_from_headers(headers: dict) -> Optional[str]:
content_disposition = headers.get("Content-Disposition", "")
if match := re.search(r'filename="?([^"]+)"?', content_disposition):
return match.group(1).strip()
return None
# </ANCHOR id="get_filename_from_headers">
# [/DEF:get_filename_from_headers]
# <ANCHOR id="consolidate_archive_folders" type="Function">
# @PURPOSE: Консолидирует директории архивов на основе общего слага в имени.
# @PARAM: root_directory: Path - Корневая директория для консолидации.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @THROW: TypeError, ValueError - Если `root_directory` невалиден.
# [DEF:consolidate_archive_folders:Function]
# @PURPOSE: Консолидирует директории архивов на основе общего слага в имени.
# @THROW: TypeError, ValueError - Если `root_directory` невалиден.
# @PARAM: root_directory (Path) - Корневая директория для консолидации.
# @PARAM: logger (Optional[SupersetLogger]) - Экземпляр логгера.
def consolidate_archive_folders(root_directory: Path, logger: Optional[SupersetLogger] = None) -> None:
logger = logger or SupersetLogger(name="fileio")
assert isinstance(root_directory, Path), "root_directory must be a Path object."
@@ -371,8 +441,6 @@ def consolidate_archive_folders(root_directory: Path, logger: Optional[SupersetL
logger.info("[consolidate_archive_folders][State] Removed source directory: %s", source_dir)
except Exception as e:
logger.error("[consolidate_archive_folders][Failure] Failed to remove source directory %s: %s", source_dir, e)
# </ANCHOR id="consolidate_archive_folders">
# [/DEF:consolidate_archive_folders]
# --- Конец кода модуля ---
# </GRACE_MODULE id="superset_tool.utils.fileio">
# [/DEF:superset_tool.utils.fileio]

View File

@@ -1,40 +1,43 @@
# <GRACE_MODULE id="superset_tool.utils.init_clients" name="init_clients.py">
# @SEMANTICS: utility, factory, client, initialization, configuration
# @PURPOSE: Централизованно инициализирует клиенты Superset для различных окружений (DEV, PROD, SBX, PREPROD), используя `keyring` для безопасного доступа к паролям.
# @DEPENDS_ON: superset_tool.models -> Использует SupersetConfig для создания конфигураций.
# @DEPENDS_ON: superset_tool.client -> Создает экземпляры SupersetClient.
# @DEPENDS_ON: keyring -> Для безопасного получения паролей.
# [DEF:superset_tool.utils.init_clients:Module]
#
# @SEMANTICS: utility, factory, client, initialization, configuration
# @PURPOSE: Централизованно инициализирует клиенты Superset для различных окружений (DEV, PROD, SBX, PREPROD), используя `keyring` для безопасного доступа к паролям.
# @LAYER: Infra
# @RELATION: DEPENDS_ON -> superset_tool.models
# @RELATION: DEPENDS_ON -> superset_tool.client
# @RELATION: DEPENDS_ON -> keyring
# @PUBLIC_API: setup_clients
# <IMPORTS>
# [SECTION: IMPORTS]
import keyring
from typing import Dict
from superset_tool.models import SupersetConfig
from superset_tool.client import SupersetClient
from superset_tool.utils.logger import SupersetLogger
# </IMPORTS>
# [/SECTION]
# --- Начало кода модуля ---
# <ANCHOR id="setup_clients" type="Function">
# @PURPOSE: Инициализирует и возвращает словарь клиентов `SupersetClient` для всех предопределенных окружений.
# @PRE: `keyring` должен содержать пароли для систем "dev migrate", "prod migrate", "sbx migrate", "preprod migrate".
# @PRE: `logger` должен быть валидным экземпляром `SupersetLogger`.
# @POST: Возвращает словарь с инициализированными клиентами.
# @PARAM: logger: SupersetLogger - Экземпляр логгера для записи процесса.
# @RETURN: Dict[str, SupersetClient] - Словарь, где ключ - имя окружения, значение - `SupersetClient`.
# @THROW: ValueError - Если пароль для окружения не найден в `keyring`.
# @THROW: Exception - При любых других ошибках инициализации.
# @RELATION: CREATES_INSTANCE_OF -> SupersetConfig
# @RELATION: CREATES_INSTANCE_OF -> SupersetClient
# [DEF:setup_clients:Function]
# @PURPOSE: Инициализирует и возвращает словарь клиентов `SupersetClient` для всех предопределенных окружений.
# @PRE: `keyring` должен содержать пароли для систем "dev migrate", "prod migrate", "sbx migrate", "preprod migrate".
# @PRE: `logger` должен быть валидным экземпляром `SupersetLogger`.
# @POST: Возвращает словарь с инициализированными клиентами.
# @THROW: ValueError - Если пароль для окружения не найден в `keyring`.
# @THROW: Exception - При любых других ошибках инициализации.
# @RELATION: CREATES_INSTANCE_OF -> SupersetConfig
# @RELATION: CREATES_INSTANCE_OF -> SupersetClient
# @PARAM: logger (SupersetLogger) - Экземпляр логгера для записи процесса.
# @RETURN: Dict[str, SupersetClient] - Словарь, где ключ - имя окружения, значение - `SupersetClient`.
def setup_clients(logger: SupersetLogger) -> Dict[str, SupersetClient]:
logger.info("[setup_clients][Enter] Starting Superset clients initialization.")
clients = {}
environments = {
"dev": "https://devta.bi.dwh.rusal.com/api/v1/",
"prod": "https://prodta.bi.dwh.rusal.com/api/v1/",
"sbx": "https://sandboxta.bi.dwh.rusal.com/api/v1/",
"preprod": "https://preprodta.bi.dwh.rusal.com/api/v1/"
"dev": "https://devta.bi.dwh.rusal.com/api/v1",
"prod": "https://prodta.bi.dwh.rusal.com/api/v1",
"sbx": "https://sandboxta.bi.dwh.rusal.com/api/v1",
"preprod": "https://preprodta.bi.dwh.rusal.com/api/v1",
"uatta": "https://uatta.bi.dwh.rusal.com/api/v1",
"dev5":"https://dev.bi.dwh.rusal.com/api/v1"
}
try:
@@ -60,8 +63,6 @@ def setup_clients(logger: SupersetLogger) -> Dict[str, SupersetClient]:
except Exception as e:
logger.critical("[setup_clients][Failure] Critical error during client initialization: %s", e, exc_info=True)
raise
# </ANCHOR id="setup_clients">
# [/DEF:setup_clients]
# --- Конец кода модуля ---
# </GRACE_MODULE id="superset_tool.utils.init_clients">
# [/DEF:superset_tool.utils.init_clients]

View File

@@ -1,29 +1,34 @@
# <GRACE_MODULE id="superset_tool.utils.logger" name="logger.py">
# @SEMANTICS: logging, utility, infrastructure, wrapper
# @PURPOSE: Предоставляет универсальную обёртку над стандартным `logging.Logger` для унифицированного создания и управления логгерами с выводом в консоль и/или файл.
# [DEF:superset_tool.utils.logger:Module]
#
# @SEMANTICS: logging, utility, infrastructure, wrapper
# @PURPOSE: Предоставляет универсальную обёртку над стандартным `logging.Logger` для унифицированного создания и управления логгерами с выводом в консоль и/или файл.
# @LAYER: Infra
# @RELATION: WRAPS -> logging.Logger
#
# @INVARIANT: Логгер всегда должен иметь имя.
# @PUBLIC_API: SupersetLogger
# <IMPORTS>
# [SECTION: IMPORTS]
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional, Any, Mapping
# </IMPORTS>
# [/SECTION]
# --- Начало кода модуля ---
# <ANCHOR id="SupersetLogger" type="Class">
# @PURPOSE: Обёртка над `logging.Logger`, которая упрощает конфигурацию и использование логгеров.
# @RELATION: WRAPS -> logging.Logger
# [DEF:SupersetLogger:Class]
# @PURPOSE: Обёртка над `logging.Logger`, которая упрощает конфигурацию и использование логгеров.
# @RELATION: WRAPS -> logging.Logger
class SupersetLogger:
# [DEF:SupersetLogger.__init__:Function]
# @PURPOSE: Конфигурирует и инициализирует логгер, добавляя обработчики для файла и/или консоли.
# @PRE: Если log_dir указан, путь должен быть валидным (или создаваемым).
# @POST: `self.logger` готов к использованию с настроенными обработчиками.
# @PARAM: name (str) - Идентификатор логгера.
# @PARAM: log_dir (Optional[Path]) - Директория для сохранения лог-файлов.
# @PARAM: level (int) - Уровень логирования (e.g., `logging.INFO`).
# @PARAM: console (bool) - Флаг для включения вывода в консоль.
def __init__(self, name: str = "superset_tool", log_dir: Optional[Path] = None, level: int = logging.INFO, console: bool = True) -> None:
# <ANCHOR id="SupersetLogger.__init__" type="Function">
# @PURPOSE: Конфигурирует и инициализирует логгер, добавляя обработчики для файла и/или консоли.
# @PARAM: name: str - Идентификатор логгера.
# @PARAM: log_dir: Optional[Path] - Директория для сохранения лог-файлов.
# @PARAM: level: int - Уровень логирования (e.g., `logging.INFO`).
# @PARAM: console: bool - Флаг для включения вывода в консоль.
# @POST: `self.logger` готов к использованию с настроенными обработчиками.
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
self.logger.propagate = False
@@ -44,52 +49,55 @@ class SupersetLogger:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# </ANCHOR id="SupersetLogger.__init__">
# [/DEF:SupersetLogger.__init__]
# <ANCHOR id="SupersetLogger._log" type="Function">
# @PURPOSE: (Helper) Универсальный метод для вызова соответствующего уровня логирования.
# @INTERNAL
# [DEF:SupersetLogger._log:Function]
# @PURPOSE: (Helper) Универсальный метод для вызова соответствующего уровня логирования.
# @PARAM: level_method (Any) - Метод логгера (info, debug, etc).
# @PARAM: msg (str) - Сообщение.
# @PARAM: args (Any) - Аргументы форматирования.
# @PARAM: extra (Optional[Mapping[str, Any]]) - Дополнительные данные.
# @PARAM: exc_info (bool) - Добавлять ли информацию об исключении.
def _log(self, level_method: Any, msg: str, *args: Any, extra: Optional[Mapping[str, Any]] = None, exc_info: bool = False) -> None:
level_method(msg, *args, extra=extra, exc_info=exc_info)
# </ANCHOR id="SupersetLogger._log">
# [/DEF:SupersetLogger._log]
# <ANCHOR id="SupersetLogger.info" type="Function">
# @PURPOSE: Записывает сообщение уровня INFO.
# [DEF:SupersetLogger.info:Function]
# @PURPOSE: Записывает сообщение уровня INFO.
def info(self, msg: str, *args: Any, extra: Optional[Mapping[str, Any]] = None, exc_info: bool = False) -> None:
self._log(self.logger.info, msg, *args, extra=extra, exc_info=exc_info)
# </ANCHOR id="SupersetLogger.info">
# [/DEF:SupersetLogger.info]
# <ANCHOR id="SupersetLogger.debug" type="Function">
# @PURPOSE: Записывает сообщение уровня DEBUG.
# [DEF:SupersetLogger.debug:Function]
# @PURPOSE: Записывает сообщение уровня DEBUG.
def debug(self, msg: str, *args: Any, extra: Optional[Mapping[str, Any]] = None, exc_info: bool = False) -> None:
self._log(self.logger.debug, msg, *args, extra=extra, exc_info=exc_info)
# </ANCHOR id="SupersetLogger.debug">
# [/DEF:SupersetLogger.debug]
# <ANCHOR id="SupersetLogger.warning" type="Function">
# @PURPOSE: Записывает сообщение уровня WARNING.
# [DEF:SupersetLogger.warning:Function]
# @PURPOSE: Записывает сообщение уровня WARNING.
def warning(self, msg: str, *args: Any, extra: Optional[Mapping[str, Any]] = None, exc_info: bool = False) -> None:
self._log(self.logger.warning, msg, *args, extra=extra, exc_info=exc_info)
# </ANCHOR id="SupersetLogger.warning">
# [/DEF:SupersetLogger.warning]
# <ANCHOR id="SupersetLogger.error" type="Function">
# @PURPOSE: Записывает сообщение уровня ERROR.
# [DEF:SupersetLogger.error:Function]
# @PURPOSE: Записывает сообщение уровня ERROR.
def error(self, msg: str, *args: Any, extra: Optional[Mapping[str, Any]] = None, exc_info: bool = False) -> None:
self._log(self.logger.error, msg, *args, extra=extra, exc_info=exc_info)
# </ANCHOR id="SupersetLogger.error">
# [/DEF:SupersetLogger.error]
# <ANCHOR id="SupersetLogger.critical" type="Function">
# @PURPOSE: Записывает сообщение уровня CRITICAL.
# [DEF:SupersetLogger.critical:Function]
# @PURPOSE: Записывает сообщение уровня CRITICAL.
def critical(self, msg: str, *args: Any, extra: Optional[Mapping[str, Any]] = None, exc_info: bool = False) -> None:
self._log(self.logger.critical, msg, *args, extra=extra, exc_info=exc_info)
# </ANCHOR id="SupersetLogger.critical">
# [/DEF:SupersetLogger.critical]
# <ANCHOR id="SupersetLogger.exception" type="Function">
# @PURPOSE: Записывает сообщение уровня ERROR вместе с трассировкой стека текущего исключения.
# [DEF:SupersetLogger.exception:Function]
# @PURPOSE: Записывает сообщение уровня ERROR вместе с трассировкой стека текущего исключения.
def exception(self, msg: str, *args: Any, **kwargs: Any) -> None:
self.logger.exception(msg, *args, **kwargs)
# </ANCHOR id="SupersetLogger.exception">
# </ANCHOR id="SupersetLogger">
# [/DEF:SupersetLogger.exception]
# --- Конец кода модуля ---
# [/DEF:SupersetLogger]
# </GRACE_MODULE id="superset_tool.utils.logger">
# [/DEF:superset_tool.utils.logger]

View File

@@ -1,49 +1,56 @@
# <GRACE_MODULE id="superset_tool.utils.network" name="network.py">
# @SEMANTICS: network, http, client, api, requests, session, authentication
# @PURPOSE: Инкапсулирует низкоуровневую HTTP-логику для взаимодействия с Superset API, включая аутентификацию, управление сессией, retry-логику и обработку ошибок.
# @DEPENDS_ON: superset_tool.exceptions -> Для генерации специфичных сетевых и API ошибок.
# @DEPENDS_ON: superset_tool.utils.logger -> Для детального логирования сетевых операций.
# @DEPENDS_ON: requests -> Основа для выполнения HTTP-запросов.
# [DEF:superset_tool.utils.network:Module]
#
# @SEMANTICS: network, http, client, api, requests, session, authentication
# @PURPOSE: Инкапсулирует низкоуровневую HTTP-логику для взаимодействия с Superset API, включая аутентификацию, управление сессией, retry-логику и обработку ошибок.
# @LAYER: Infra
# @RELATION: DEPENDS_ON -> superset_tool.exceptions
# @RELATION: DEPENDS_ON -> superset_tool.utils.logger
# @RELATION: DEPENDS_ON -> requests
# @PUBLIC_API: APIClient
# <IMPORTS>
from typing import Optional, Dict, Any, List, Union
# [SECTION: IMPORTS]
from typing import Optional, Dict, Any, List, Union, cast
import json
import io
from pathlib import Path
import requests
from requests.adapters import HTTPAdapter
import urllib3
from urllib3.util.retry import Retry
from superset_tool.exceptions import AuthenticationError, NetworkError, DashboardNotFoundError, SupersetAPIError, PermissionDeniedError
from superset_tool.utils.logger import SupersetLogger
# </IMPORTS>
# [/SECTION]
# --- Начало кода модуля ---
# <ANCHOR id="APIClient" type="Class">
# @PURPOSE: Инкапсулирует HTTP-логику для работы с API, включая сессии, аутентификацию, и обработку запросов.
# [DEF:APIClient:Class]
# @PURPOSE: Инкапсулирует HTTP-логику для работы с API, включая сессии, аутентификацию, и обработку запросов.
class APIClient:
DEFAULT_TIMEOUT = 30
# [DEF:APIClient.__init__:Function]
# @PURPOSE: Инициализирует API клиент с конфигурацией, сессией и логгером.
# @PARAM: config (Dict[str, Any]) - Конфигурация.
# @PARAM: verify_ssl (bool) - Проверять ли SSL.
# @PARAM: timeout (int) - Таймаут запросов.
# @PARAM: logger (Optional[SupersetLogger]) - Логгер.
def __init__(self, config: Dict[str, Any], verify_ssl: bool = True, timeout: int = DEFAULT_TIMEOUT, logger: Optional[SupersetLogger] = None):
# <ANCHOR id="APIClient.__init__" type="Function">
# @PURPOSE: Инициализирует API клиент с конфигурацией, сессией и логгером.
self.logger = logger or SupersetLogger(name="APIClient")
self.logger.info("[APIClient.__init__][Enter] Initializing APIClient.")
self.base_url = config.get("base_url")
self.logger.info("[APIClient.__init__][Entry] Initializing APIClient.")
self.base_url: str = config.get("base_url", "")
self.auth = config.get("auth")
self.request_settings = {"verify_ssl": verify_ssl, "timeout": timeout}
self.session = self._init_session()
self._tokens: Dict[str, str] = {}
self._authenticated = False
self.logger.info("[APIClient.__init__][Exit] APIClient initialized.")
# </ANCHOR>
# [/DEF:APIClient.__init__]
# [DEF:APIClient._init_session:Function]
# @PURPOSE: Создает и настраивает `requests.Session` с retry-логикой.
# @RETURN: requests.Session - Настроенная сессия.
def _init_session(self) -> requests.Session:
# <ANCHOR id="APIClient._init_session" type="Function">
# @PURPOSE: Создает и настраивает `requests.Session` с retry-логикой.
# @INTERNAL
session = requests.Session()
retries = requests.adapters.Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = requests.adapters.HTTPAdapter(max_retries=retries)
retries = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retries)
session.mount('http://', adapter)
session.mount('https://', adapter)
if not self.request_settings["verify_ssl"]:
@@ -51,14 +58,14 @@ class APIClient:
self.logger.warning("[_init_session][State] SSL verification disabled.")
session.verify = self.request_settings["verify_ssl"]
return session
# </ANCHOR>
# [/DEF:APIClient._init_session]
# [DEF:APIClient.authenticate:Function]
# @PURPOSE: Выполняет аутентификацию в Superset API и получает access и CSRF токены.
# @POST: `self._tokens` заполнен, `self._authenticated` установлен в `True`.
# @RETURN: Dict[str, str] - Словарь с токенами.
# @THROW: AuthenticationError, NetworkError - при ошибках.
def authenticate(self) -> Dict[str, str]:
# <ANCHOR id="APIClient.authenticate" type="Function">
# @PURPOSE: Выполняет аутентификацию в Superset API и получает access и CSRF токены.
# @POST: `self._tokens` заполнен, `self._authenticated` установлен в `True`.
# @RETURN: Словарь с токенами.
# @THROW: AuthenticationError, NetworkError - при ошибках.
self.logger.info("[authenticate][Enter] Authenticating to %s", self.base_url)
try:
login_url = f"{self.base_url}/security/login"
@@ -78,12 +85,12 @@ class APIClient:
raise AuthenticationError(f"Authentication failed: {e}") from e
except (requests.exceptions.RequestException, KeyError) as e:
raise NetworkError(f"Network or parsing error during authentication: {e}") from e
# </ANCHOR>
# [/DEF:APIClient.authenticate]
@property
def headers(self) -> Dict[str, str]:
# <ANCHOR id="APIClient.headers" type="Property">
# @PURPOSE: Возвращает HTTP-заголовки для аутентифицированных запросов.
# [DEF:APIClient.headers:Function]
# @PURPOSE: Возвращает HTTP-заголовки для аутентифицированных запросов.
if not self._authenticated: self.authenticate()
return {
"Authorization": f"Bearer {self._tokens['access_token']}",
@@ -91,13 +98,17 @@ class APIClient:
"Referer": self.base_url,
"Content-Type": "application/json"
}
# </ANCHOR>
# [/DEF:APIClient.headers]
# [DEF:APIClient.request:Function]
# @PURPOSE: Выполняет универсальный HTTP-запрос к API.
# @RETURN: `requests.Response` если `raw_response=True`, иначе `dict`.
# @THROW: SupersetAPIError, NetworkError и их подклассы.
# @PARAM: method (str) - HTTP метод.
# @PARAM: endpoint (str) - API эндпоинт.
# @PARAM: headers (Optional[Dict]) - Дополнительные заголовки.
# @PARAM: raw_response (bool) - Возвращать ли сырой ответ.
def request(self, method: str, endpoint: str, headers: Optional[Dict] = None, raw_response: bool = False, **kwargs) -> Union[requests.Response, Dict[str, Any]]:
# <ANCHOR id="APIClient.request" type="Function">
# @PURPOSE: Выполняет универсальный HTTP-запрос к API.
# @RETURN: `requests.Response` если `raw_response=True`, иначе `dict`.
# @THROW: SupersetAPIError, NetworkError и их подклассы.
full_url = f"{self.base_url}{endpoint}"
_headers = self.headers.copy()
if headers: _headers.update(headers)
@@ -110,34 +121,40 @@ class APIClient:
self._handle_http_error(e, endpoint)
except requests.exceptions.RequestException as e:
self._handle_network_error(e, full_url)
# </ANCHOR>
# [/DEF:APIClient.request]
# [DEF:APIClient._handle_http_error:Function]
# @PURPOSE: (Helper) Преобразует HTTP ошибки в кастомные исключения.
# @PARAM: e (requests.exceptions.HTTPError) - Ошибка.
# @PARAM: endpoint (str) - Эндпоинт.
def _handle_http_error(self, e: requests.exceptions.HTTPError, endpoint: str):
# <ANCHOR id="APIClient._handle_http_error" type="Function">
# @PURPOSE: (Helper) Преобразует HTTP ошибки в кастомные исключения.
# @INTERNAL
status_code = e.response.status_code
if status_code == 404: raise DashboardNotFoundError(endpoint) from e
if status_code == 403: raise PermissionDeniedError() from e
if status_code == 401: raise AuthenticationError() from e
raise SupersetAPIError(f"API Error {status_code}: {e.response.text}") from e
# </ANCHOR>
# [/DEF:APIClient._handle_http_error]
# [DEF:APIClient._handle_network_error:Function]
# @PURPOSE: (Helper) Преобразует сетевые ошибки в `NetworkError`.
# @PARAM: e (requests.exceptions.RequestException) - Ошибка.
# @PARAM: url (str) - URL.
def _handle_network_error(self, e: requests.exceptions.RequestException, url: str):
# <ANCHOR id="APIClient._handle_network_error" type="Function">
# @PURPOSE: (Helper) Преобразует сетевые ошибки в `NetworkError`.
# @INTERNAL
if isinstance(e, requests.exceptions.Timeout): msg = "Request timeout"
elif isinstance(e, requests.exceptions.ConnectionError): msg = "Connection error"
else: msg = f"Unknown network error: {e}"
raise NetworkError(msg, url=url) from e
# </ANCHOR>
# [/DEF:APIClient._handle_network_error]
# [DEF:APIClient.upload_file:Function]
# @PURPOSE: Загружает файл на сервер через multipart/form-data.
# @RETURN: Ответ API в виде словаря.
# @THROW: SupersetAPIError, NetworkError, TypeError.
# @PARAM: endpoint (str) - Эндпоинт.
# @PARAM: file_info (Dict[str, Any]) - Информация о файле.
# @PARAM: extra_data (Optional[Dict]) - Дополнительные данные.
# @PARAM: timeout (Optional[int]) - Таймаут.
def upload_file(self, endpoint: str, file_info: Dict[str, Any], extra_data: Optional[Dict] = None, timeout: Optional[int] = None) -> Dict:
# <ANCHOR id="APIClient.upload_file" type="Function">
# @PURPOSE: Загружает файл на сервер через multipart/form-data.
# @RETURN: Ответ API в виде словаря.
# @THROW: SupersetAPIError, NetworkError, TypeError.
full_url = f"{self.base_url}{endpoint}"
_headers = self.headers.copy(); _headers.pop('Content-Type', None)
@@ -153,32 +170,51 @@ class APIClient:
raise TypeError(f"Unsupported file_obj type: {type(file_obj)}")
return self._perform_upload(full_url, files_payload, extra_data, _headers, timeout)
# </ANCHOR>
# [/DEF:APIClient.upload_file]
# [DEF:APIClient._perform_upload:Function]
# @PURPOSE: (Helper) Выполняет POST запрос с файлом.
# @PARAM: url (str) - URL.
# @PARAM: files (Dict) - Файлы.
# @PARAM: data (Optional[Dict]) - Данные.
# @PARAM: headers (Dict) - Заголовки.
# @PARAM: timeout (Optional[int]) - Таймаут.
# @RETURN: Dict - Ответ.
def _perform_upload(self, url: str, files: Dict, data: Optional[Dict], headers: Dict, timeout: Optional[int]) -> Dict:
# <ANCHOR id="APIClient._perform_upload" type="Function">
# @PURPOSE: (Helper) Выполняет POST запрос с файлом.
# @INTERNAL
try:
response = self.session.post(url, files=files, data=data or {}, headers=headers, timeout=timeout or self.request_settings["timeout"])
response.raise_for_status()
# Добавляем логирование для отладки
if response.status_code == 200:
try:
return response.json()
except Exception as json_e:
self.logger.debug(f"[_perform_upload][Debug] Response is not valid JSON: {response.text[:200]}...")
raise SupersetAPIError(f"API error during upload: Response is not valid JSON: {json_e}") from json_e
return response.json()
except requests.exceptions.HTTPError as e:
raise SupersetAPIError(f"API error during upload: {e.response.text}") from e
except requests.exceptions.RequestException as e:
raise NetworkError(f"Network error during upload: {e}", url=url) from e
# </ANCHOR>
# [/DEF:APIClient._perform_upload]
# [DEF:APIClient.fetch_paginated_count:Function]
# @PURPOSE: Получает общее количество элементов для пагинации.
# @PARAM: endpoint (str) - Эндпоинт.
# @PARAM: query_params (Dict) - Параметры запроса.
# @PARAM: count_field (str) - Поле с количеством.
# @RETURN: int - Количество.
def fetch_paginated_count(self, endpoint: str, query_params: Dict, count_field: str = "count") -> int:
# <ANCHOR id="APIClient.fetch_paginated_count" type="Function">
# @PURPOSE: Получает общее количество элементов для пагинации.
response_json = self.request("GET", endpoint, params={"q": json.dumps(query_params)})
response_json = cast(Dict[str, Any], self.request("GET", endpoint, params={"q": json.dumps(query_params)}))
return response_json.get(count_field, 0)
# </ANCHOR>
# [/DEF:APIClient.fetch_paginated_count]
# [DEF:APIClient.fetch_paginated_data:Function]
# @PURPOSE: Автоматически собирает данные со всех страниц пагинированного эндпоинта.
# @PARAM: endpoint (str) - Эндпоинт.
# @PARAM: pagination_options (Dict[str, Any]) - Опции пагинации.
# @RETURN: List[Any] - Список данных.
def fetch_paginated_data(self, endpoint: str, pagination_options: Dict[str, Any]) -> List[Any]:
# <ANCHOR id="APIClient.fetch_paginated_data" type="Function">
# @PURPOSE: Автоматически собирает данные со всех страниц пагинированного эндпоинта.
base_query, total_count = pagination_options["base_query"], pagination_options["total_count"]
results_field, page_size = pagination_options["results_field"], base_query.get('page_size')
assert page_size and page_size > 0, "'page_size' must be a positive number."
@@ -186,13 +222,11 @@ class APIClient:
results = []
for page in range((total_count + page_size - 1) // page_size):
query = {**base_query, 'page': page}
response_json = self.request("GET", endpoint, params={"q": json.dumps(query)})
response_json = cast(Dict[str, Any], self.request("GET", endpoint, params={"q": json.dumps(query)}))
results.extend(response_json.get(results_field, []))
return results
# </ANCHOR>
# [/DEF:APIClient.fetch_paginated_data]
# </ANCHOR id="APIClient">
# [/DEF:APIClient]
# --- Конец кода модуля ---
# </GRACE_MODULE id="superset_tool.utils.network">
# [/DEF:superset_tool.utils.network]

View File

@@ -1,20 +1,21 @@
# <GRACE_MODULE id="superset_tool.utils.whiptail_fallback" name="whiptail_fallback.py">
# @SEMANTICS: ui, fallback, console, utility, interactive
# @PURPOSE: Предоставляет плотный консольный UI-fallback для интерактивных диалогов, имитируя `whiptail` для систем, где он недоступен.
# [DEF:superset_tool.utils.whiptail_fallback:Module]
#
# @SEMANTICS: ui, fallback, console, utility, interactive
# @PURPOSE: Предоставляет плотный консольный UI-fallback для интерактивных диалогов, имитируя `whiptail` для систем, где он недоступен.
# @LAYER: UI
# @PUBLIC_API: menu, checklist, yesno, msgbox, inputbox, gauge
# <IMPORTS>
# [SECTION: IMPORTS]
import sys
from typing import List, Tuple, Optional, Any
# </IMPORTS>
# [/SECTION]
# --- Начало кода модуля ---
# <ANCHOR id="menu" type="Function">
# @PURPOSE: Отображает меню выбора и возвращает выбранный элемент.
# @PARAM: title: str - Заголовок меню.
# @PARAM: prompt: str - Приглашение к вводу.
# @PARAM: choices: List[str] - Список вариантов для выбора.
# @RETURN: Tuple[int, Optional[str]] - Кортеж (код возврата, выбранный элемент). rc=0 - успех.
# [DEF:menu:Function]
# @PURPOSE: Отображает меню выбора и возвращает выбранный элемент.
# @PARAM: title (str) - Заголовок меню.
# @PARAM: prompt (str) - Приглашение к вводу.
# @PARAM: choices (List[str]) - Список вариантов для выбора.
# @RETURN: Tuple[int, Optional[str]] - Кортеж (код возврата, выбранный элемент). rc=0 - успех.
def menu(title: str, prompt: str, choices: List[str], **kwargs) -> Tuple[int, Optional[str]]:
print(f"\n=== {title} ===\n{prompt}")
for idx, item in enumerate(choices, 1):
@@ -25,14 +26,14 @@ def menu(title: str, prompt: str, choices: List[str], **kwargs) -> Tuple[int, Op
return (0, choices[sel - 1]) if 0 < sel <= len(choices) else (1, None)
except (ValueError, IndexError):
return 1, None
# </ANCHOR id="menu">
# [/DEF:menu]
# <ANCHOR id="checklist" type="Function">
# @PURPOSE: Отображает список с возможностью множественного выбора.
# @PARAM: title: str - Заголовок.
# @PARAM: prompt: str - Приглашение к вводу.
# @PARAM: options: List[Tuple[str, str]] - Список кортежей (значение, метка).
# @RETURN: Tuple[int, List[str]] - Кортеж (код возврата, список выбранных значений).
# [DEF:checklist:Function]
# @PURPOSE: Отображает список с возможностью множественного выбора.
# @PARAM: title (str) - Заголовок.
# @PARAM: prompt (str) - Приглашение к вводу.
# @PARAM: options (List[Tuple[str, str]]) - Список кортежей (значение, метка).
# @RETURN: Tuple[int, List[str]] - Кортеж (код возврата, список выбранных значений).
def checklist(title: str, prompt: str, options: List[Tuple[str, str]], **kwargs) -> Tuple[int, List[str]]:
print(f"\n=== {title} ===\n{prompt}")
for idx, (val, label) in enumerate(options, 1):
@@ -45,40 +46,39 @@ def checklist(title: str, prompt: str, options: List[Tuple[str, str]], **kwargs)
return 0, selected_values
except (ValueError, IndexError):
return 1, []
# </ANCHOR id="checklist">
# [/DEF:checklist]
# <ANCHOR id="yesno" type="Function">
# @PURPOSE: Задает вопрос с ответом да/нет.
# @PARAM: title: str - Заголовок.
# @PARAM: question: str - Вопрос для пользователя.
# @RETURN: bool - `True`, если пользователь ответил "да".
# [DEF:yesno:Function]
# @PURPOSE: Задает вопрос с ответом да/нет.
# @PARAM: title (str) - Заголовок.
# @PARAM: question (str) - Вопрос для пользователя.
# @RETURN: bool - `True`, если пользователь ответил "да".
def yesno(title: str, question: str, **kwargs) -> bool:
ans = input(f"\n=== {title} ===\n{question} (y/n): ").strip().lower()
return ans in ("y", "yes", "да", "д")
# </ANCHOR id="yesno">
# [/DEF:yesno]
# <ANCHOR id="msgbox" type="Function">
# @PURPOSE: Отображает информационное сообщение.
# @PARAM: title: str - Заголовок.
# @PARAM: msg: str - Текст сообщения.
# [DEF:msgbox:Function]
# @PURPOSE: Отображает информационное сообщение.
# @PARAM: title (str) - Заголовок.
# @PARAM: msg (str) - Текст сообщения.
def msgbox(title: str, msg: str, **kwargs) -> None:
print(f"\n=== {title} ===\n{msg}\n")
# </ANCHOR id="msgbox">
# [/DEF:msgbox]
# <ANCHOR id="inputbox" type="Function">
# @PURPOSE: Запрашивает у пользователя текстовый ввод.
# @PARAM: title: str - Заголовок.
# @PARAM: prompt: str - Приглашение к вводу.
# @RETURN: Tuple[int, Optional[str]] - Кортеж (код возврата, введенная строка).
# [DEF:inputbox:Function]
# @PURPOSE: Запрашивает у пользователя текстовый ввод.
# @PARAM: title (str) - Заголовок.
# @PARAM: prompt (str) - Приглашение к вводу.
# @RETURN: Tuple[int, Optional[str]] - Кортеж (код возврата, введенная строка).
def inputbox(title: str, prompt: str, **kwargs) -> Tuple[int, Optional[str]]:
print(f"\n=== {title} ===")
val = input(f"{prompt}\n")
return (0, val) if val else (1, None)
# </ANCHOR id="inputbox">
# [/DEF:inputbox]
# <ANCHOR id="_ConsoleGauge" type="Class">
# @PURPOSE: Контекстный менеджер для имитации `whiptail gauge` в консоли.
# @INTERNAL
# [DEF:_ConsoleGauge:Class]
# @PURPOSE: Контекстный менеджер для имитации `whiptail gauge` в консоли.
class _ConsoleGauge:
def __init__(self, title: str, **kwargs):
self.title = title
@@ -91,16 +91,14 @@ class _ConsoleGauge:
sys.stdout.write(f"\r{txt} "); sys.stdout.flush()
def set_percent(self, percent: int) -> None:
sys.stdout.write(f"{percent}%"); sys.stdout.flush()
# </ANCHOR id="_ConsoleGauge">
# [/DEF:_ConsoleGauge]
# <ANCHOR id="gauge" type="Function">
# @PURPOSE: Создает и возвращает экземпляр `_ConsoleGauge`.
# @PARAM: title: str - Заголовок для индикатора прогресса.
# @RETURN: _ConsoleGauge - Экземпляр контекстного менеджера.
# [DEF:gauge:Function]
# @PURPOSE: Создает и возвращает экземпляр `_ConsoleGauge`.
# @PARAM: title (str) - Заголовок для индикатора прогресса.
# @RETURN: _ConsoleGauge - Экземпляр контекстного менеджера.
def gauge(title: str, **kwargs) -> _ConsoleGauge:
return _ConsoleGauge(title, **kwargs)
# </ANCHOR id="gauge">
# [/DEF:gauge]
# --- Конец кода модуля ---
# </GRACE_MODULE id="superset_tool.utils.whiptail_fallback">
# [/DEF:superset_tool.utils.whiptail_fallback]