# # @SEMANTICS: file, io, zip, yaml, temp, archive, utility # @PURPOSE: Предоставляет набор утилит для управления файловыми операциями, включая работу с временными файлами, архивами ZIP, файлами YAML и очистку директорий. # @DEPENDS_ON: superset_tool.exceptions -> Для генерации специфичных ошибок. # @DEPENDS_ON: superset_tool.utils.logger -> Для логирования операций. # @DEPENDS_ON: pyyaml -> Для работы с YAML файлами. # import os import re import zipfile from pathlib import Path from typing import Any, Optional, Tuple, Dict, List, Union, LiteralString from contextlib import contextmanager import tempfile from datetime import date, datetime import glob import shutil import zlib from dataclasses import dataclass import yaml from superset_tool.exceptions import InvalidZipFormatError from superset_tool.utils.logger import SupersetLogger # # --- Начало кода модуля --- # # @PURPOSE: Контекстный менеджер для создания временного файла или директории с гарантированным удалением. # @PARAM: content: Optional[bytes] - Бинарное содержимое для записи во временный файл. # @PARAM: suffix: str - Суффикс ресурса. Если `.dir`, создается директория. # @PARAM: mode: str - Режим записи в файл (e.g., 'wb'). # @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера. # @YIELDS: Path - Путь к временному ресурсу. # @THROW: IOError - При ошибках создания ресурса. @contextmanager def create_temp_file(content: Optional[bytes] = None, suffix: str = ".zip", mode: str = 'wb', logger: Optional[SupersetLogger] = None) -> Path: logger = logger or SupersetLogger(name="fileio") resource_path = None is_dir = suffix.startswith('.dir') try: if is_dir: with tempfile.TemporaryDirectory(suffix=suffix) as temp_dir: resource_path = Path(temp_dir) logger.debug("[create_temp_file][State] Created temporary directory: %s", resource_path) yield resource_path else: fd, temp_path_str = tempfile.mkstemp(suffix=suffix) resource_path = Path(temp_path_str) os.close(fd) if content: resource_path.write_bytes(content) logger.debug("[create_temp_file][State] Created temporary file: %s", resource_path) yield resource_path finally: if resource_path and resource_path.exists(): try: if resource_path.is_dir(): shutil.rmtree(resource_path) logger.debug("[create_temp_file][Cleanup] Removed temporary directory: %s", resource_path) else: resource_path.unlink() logger.debug("[create_temp_file][Cleanup] Removed temporary file: %s", resource_path) except OSError as e: logger.error("[create_temp_file][Failure] Error during cleanup of %s: %s", resource_path, e) # # # @PURPOSE: Рекурсивно удаляет все пустые поддиректории, начиная с указанного пути. # @PARAM: root_dir: str - Путь к корневой директории для очистки. # @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера. # @RETURN: int - Количество удаленных директорий. def remove_empty_directories(root_dir: str, logger: Optional[SupersetLogger] = None) -> int: logger = logger or SupersetLogger(name="fileio") logger.info("[remove_empty_directories][Enter] Starting cleanup of empty directories in %s", root_dir) removed_count = 0 if not os.path.isdir(root_dir): logger.error("[remove_empty_directories][Failure] Directory not found: %s", root_dir) return 0 for current_dir, _, _ in os.walk(root_dir, topdown=False): if not os.listdir(current_dir): try: os.rmdir(current_dir) removed_count += 1 logger.info("[remove_empty_directories][State] Removed empty directory: %s", current_dir) except OSError as e: logger.error("[remove_empty_directories][Failure] Failed to remove %s: %s", current_dir, e) logger.info("[remove_empty_directories][Exit] Removed %d empty directories.", removed_count) return removed_count # # # @PURPOSE: Читает бинарное содержимое файла с диска. # @PARAM: file_path: str - Путь к файлу. # @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера. # @RETURN: Tuple[bytes, str] - Кортеж (содержимое, имя файла). # @THROW: FileNotFoundError - Если файл не найден. def read_dashboard_from_disk(file_path: str, logger: Optional[SupersetLogger] = None) -> Tuple[bytes, str]: logger = logger or SupersetLogger(name="fileio") path = Path(file_path) assert path.is_file(), f"Файл дашборда не найден: {file_path}" logger.info("[read_dashboard_from_disk][Enter] Reading file: %s", file_path) content = path.read_bytes() if not content: logger.warning("[read_dashboard_from_disk][Warning] File is empty: %s", file_path) return content, path.name # # # @PURPOSE: Вычисляет контрольную сумму CRC32 для файла. # @PARAM: file_path: Path - Путь к файлу. # @RETURN: str - 8-значное шестнадцатеричное представление CRC32. # @THROW: IOError - При ошибках чтения файла. def calculate_crc32(file_path: Path) -> str: with open(file_path, 'rb') as f: crc32_value = zlib.crc32(f.read()) return f"{crc32_value:08x}" # # # @PURPOSE: Определяет политику хранения для архивов (ежедневные, еженедельные, ежемесячные). @dataclass class RetentionPolicy: daily: int = 7 weekly: int = 4 monthly: int = 12 # # # @PURPOSE: Управляет архивом экспортированных файлов, применяя политику хранения и дедупликацию. # @PARAM: output_dir: str - Директория с архивами. # @PARAM: policy: RetentionPolicy - Политика хранения. # @PARAM: deduplicate: bool - Флаг для включения удаления дубликатов по CRC32. # @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера. # @RELATION: CALLS -> apply_retention_policy # @RELATION: CALLS -> calculate_crc32 def archive_exports(output_dir: str, policy: RetentionPolicy, deduplicate: bool = False, logger: Optional[SupersetLogger] = None) -> None: logger = logger or SupersetLogger(name="fileio") output_path = Path(output_dir) if not output_path.is_dir(): logger.warning("[archive_exports][Skip] Archive directory not found: %s", output_dir) return logger.info("[archive_exports][Enter] Managing archive in %s", output_dir) # ... (логика дедупликации и политики хранения) ... # # # @PURPOSE: (Helper) Применяет политику хранения к списку файлов, возвращая те, что нужно сохранить. # @INTERNAL # @PARAM: files_with_dates: List[Tuple[Path, date]] - Список файлов с датами. # @PARAM: policy: RetentionPolicy - Политика хранения. # @PARAM: logger: SupersetLogger - Логгер. # @RETURN: set - Множество путей к файлам, которые должны быть сохранены. def apply_retention_policy(files_with_dates: List[Tuple[Path, date]], policy: RetentionPolicy, logger: SupersetLogger) -> set: # ... (логика применения политики) ... return set() # # # @PURPOSE: Сохраняет бинарное содержимое ZIP-архива на диск и опционально распаковывает его. # @PARAM: zip_content: bytes - Содержимое ZIP-архива. # @PARAM: output_dir: Union[str, Path] - Директория для сохранения. # @PARAM: unpack: bool - Флаг, нужно ли распаковывать архив. # @PARAM: original_filename: Optional[str] - Исходное имя файла для сохранения. # @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера. # @RETURN: Tuple[Path, Optional[Path]] - Путь к ZIP-файлу и, если применимо, путь к директории с распаковкой. # @THROW: InvalidZipFormatError - При ошибке формата ZIP. def save_and_unpack_dashboard(zip_content: bytes, output_dir: Union[str, Path], unpack: bool = False, original_filename: Optional[str] = None, logger: Optional[SupersetLogger] = None) -> Tuple[Path, Optional[Path]]: logger = logger or SupersetLogger(name="fileio") logger.info("[save_and_unpack_dashboard][Enter] Processing dashboard. Unpack: %s", unpack) try: output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) zip_name = sanitize_filename(original_filename) if original_filename else f"dashboard_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip" zip_path = output_path / zip_name zip_path.write_bytes(zip_content) logger.info("[save_and_unpack_dashboard][State] Dashboard saved to: %s", zip_path) if unpack: with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(output_path) logger.info("[save_and_unpack_dashboard][State] Dashboard unpacked to: %s", output_path) return zip_path, output_path return zip_path, None except zipfile.BadZipFile as e: logger.error("[save_and_unpack_dashboard][Failure] Invalid ZIP archive: %s", e) raise InvalidZipFormatError(f"Invalid ZIP file: {e}") from e # # # @PURPOSE: Обновляет конфигурации в YAML-файлах, заменяя значения или применяя regex. # @PARAM: db_configs: Optional[List[Dict]] - Список конфигураций для замены. # @PARAM: path: str - Путь к директории с YAML файлами. # @PARAM: regexp_pattern: Optional[LiteralString] - Паттерн для поиска. # @PARAM: replace_string: Optional[LiteralString] - Строка для замены. # @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера. # @THROW: FileNotFoundError - Если `path` не существует. # @RELATION: CALLS -> _update_yaml_file def update_yamls(db_configs: Optional[List[Dict]] = None, path: str = "dashboards", regexp_pattern: Optional[LiteralString] = None, replace_string: Optional[LiteralString] = None, logger: Optional[SupersetLogger] = None) -> None: logger = logger or SupersetLogger(name="fileio") logger.info("[update_yamls][Enter] Starting YAML configuration update.") dir_path = Path(path) assert dir_path.is_dir(), f"Путь {path} не существует или не является директорией" configs = [db_configs] if isinstance(db_configs, dict) else db_configs or [] for file_path in dir_path.rglob("*.yaml"): _update_yaml_file(file_path, configs, regexp_pattern, replace_string, logger) # # # @PURPOSE: (Helper) Обновляет один YAML файл. # @INTERNAL def _update_yaml_file(file_path: Path, db_configs: List[Dict], regexp_pattern: Optional[str], replace_string: Optional[str], logger: SupersetLogger) -> None: # ... (логика обновления одного файла) ... pass # # # @PURPOSE: Создает ZIP-архив из указанных исходных путей. # @PARAM: zip_path: Union[str, Path] - Путь для сохранения ZIP архива. # @PARAM: source_paths: List[Union[str, Path]] - Список исходных путей для архивации. # @PARAM: exclude_extensions: Optional[List[str]] - Список расширений для исключения. # @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера. # @RETURN: bool - `True` при успехе, `False` при ошибке. def create_dashboard_export(zip_path: Union[str, Path], source_paths: List[Union[str, Path]], exclude_extensions: Optional[List[str]] = None, logger: Optional[SupersetLogger] = None) -> bool: logger = logger or SupersetLogger(name="fileio") logger.info("[create_dashboard_export][Enter] Packing dashboard: %s -> %s", source_paths, zip_path) try: exclude_ext = [ext.lower() for ext in exclude_extensions or []] with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for src_path_str in source_paths: src_path = Path(src_path_str) assert src_path.exists(), f"Путь не найден: {src_path}" for item in src_path.rglob('*'): if item.is_file() and item.suffix.lower() not in exclude_ext: arcname = item.relative_to(src_path.parent) zipf.write(item, arcname) logger.info("[create_dashboard_export][Exit] Archive created: %s", zip_path) return True except (IOError, zipfile.BadZipFile, AssertionError) as e: logger.error("[create_dashboard_export][Failure] Error: %s", e, exc_info=True) return False # # # @PURPOSE: Очищает строку от символов, недопустимых в именах файлов. # @PARAM: filename: str - Исходное имя файла. # @RETURN: str - Очищенная строка. def sanitize_filename(filename: str) -> str: return re.sub(r'[\\/*?:"<>|]', "_", filename).strip() # # # @PURPOSE: Извлекает имя файла из HTTP заголовка 'Content-Disposition'. # @PARAM: headers: dict - Словарь HTTP заголовков. # @RETURN: Optional[str] - Имя файла или `None`. def get_filename_from_headers(headers: dict) -> Optional[str]: content_disposition = headers.get("Content-Disposition", "") if match := re.search(r'filename="?([^"]+)"?', content_disposition): return match.group(1).strip() return None # # # @PURPOSE: Консолидирует директории архивов на основе общего слага в имени. # @PARAM: root_directory: Path - Корневая директория для консолидации. # @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера. # @THROW: TypeError, ValueError - Если `root_directory` невалиден. def consolidate_archive_folders(root_directory: Path, logger: Optional[SupersetLogger] = None) -> None: logger = logger or SupersetLogger(name="fileio") assert isinstance(root_directory, Path), "root_directory must be a Path object." assert root_directory.is_dir(), "root_directory must be an existing directory." logger.info("[consolidate_archive_folders][Enter] Consolidating archives in %s", root_directory) # ... (логика консолидации) ... # # --- Конец кода модуля --- #