Files
ss-tools/superset_tool/utils/fileio.py
Volobuev Andrey 4cbee526b8 fileio functions
2025-10-31 15:23:23 +03:00

378 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# <GRACE_MODULE id="superset_tool.utils.fileio" name="fileio.py">
# @SEMANTICS: file, io, zip, yaml, temp, archive, utility
# @PURPOSE: Предоставляет набор утилит для управления файловыми операциями, включая работу с временными файлами, архивами ZIP, файлами YAML и очистку директорий.
# @DEPENDS_ON: superset_tool.exceptions -> Для генерации специфичных ошибок.
# @DEPENDS_ON: superset_tool.utils.logger -> Для логирования операций.
# @DEPENDS_ON: pyyaml -> Для работы с YAML файлами.
# <IMPORTS>
import os
import re
import zipfile
from pathlib import Path
from typing import Any, Optional, Tuple, Dict, List, Union, LiteralString
from contextlib import contextmanager
import tempfile
from datetime import date, datetime
import glob
import shutil
import zlib
from dataclasses import dataclass
import yaml
from superset_tool.exceptions import InvalidZipFormatError
from superset_tool.utils.logger import SupersetLogger
# </IMPORTS>
# --- Начало кода модуля ---
# <ANCHOR id="create_temp_file" type="Function">
# @PURPOSE: Контекстный менеджер для создания временного файла или директории с гарантированным удалением.
# @PARAM: content: Optional[bytes] - Бинарное содержимое для записи во временный файл.
# @PARAM: suffix: str - Суффикс ресурса. Если `.dir`, создается директория.
# @PARAM: mode: str - Режим записи в файл (e.g., 'wb').
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @YIELDS: Path - Путь к временному ресурсу.
# @THROW: IOError - При ошибках создания ресурса.
@contextmanager
def create_temp_file(content: Optional[bytes] = None, suffix: str = ".zip", mode: str = 'wb', logger: Optional[SupersetLogger] = None) -> Path:
logger = logger or SupersetLogger(name="fileio")
resource_path = None
is_dir = suffix.startswith('.dir')
try:
if is_dir:
with tempfile.TemporaryDirectory(suffix=suffix) as temp_dir:
resource_path = Path(temp_dir)
logger.debug("[create_temp_file][State] Created temporary directory: %s", resource_path)
yield resource_path
else:
fd, temp_path_str = tempfile.mkstemp(suffix=suffix)
resource_path = Path(temp_path_str)
os.close(fd)
if content:
resource_path.write_bytes(content)
logger.debug("[create_temp_file][State] Created temporary file: %s", resource_path)
yield resource_path
finally:
if resource_path and resource_path.exists():
try:
if resource_path.is_dir():
shutil.rmtree(resource_path)
logger.debug("[create_temp_file][Cleanup] Removed temporary directory: %s", resource_path)
else:
resource_path.unlink()
logger.debug("[create_temp_file][Cleanup] Removed temporary file: %s", resource_path)
except OSError as e:
logger.error("[create_temp_file][Failure] Error during cleanup of %s: %s", resource_path, e)
# </ANCHOR id="create_temp_file">
# <ANCHOR id="remove_empty_directories" type="Function">
# @PURPOSE: Рекурсивно удаляет все пустые поддиректории, начиная с указанного пути.
# @PARAM: root_dir: str - Путь к корневой директории для очистки.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @RETURN: int - Количество удаленных директорий.
def remove_empty_directories(root_dir: str, logger: Optional[SupersetLogger] = None) -> int:
logger = logger or SupersetLogger(name="fileio")
logger.info("[remove_empty_directories][Enter] Starting cleanup of empty directories in %s", root_dir)
removed_count = 0
if not os.path.isdir(root_dir):
logger.error("[remove_empty_directories][Failure] Directory not found: %s", root_dir)
return 0
for current_dir, _, _ in os.walk(root_dir, topdown=False):
if not os.listdir(current_dir):
try:
os.rmdir(current_dir)
removed_count += 1
logger.info("[remove_empty_directories][State] Removed empty directory: %s", current_dir)
except OSError as e:
logger.error("[remove_empty_directories][Failure] Failed to remove %s: %s", current_dir, e)
logger.info("[remove_empty_directories][Exit] Removed %d empty directories.", removed_count)
return removed_count
# </ANCHOR id="remove_empty_directories">
# <ANCHOR id="read_dashboard_from_disk" type="Function">
# @PURPOSE: Читает бинарное содержимое файла с диска.
# @PARAM: file_path: str - Путь к файлу.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @RETURN: Tuple[bytes, str] - Кортеж (содержимое, имя файла).
# @THROW: FileNotFoundError - Если файл не найден.
def read_dashboard_from_disk(file_path: str, logger: Optional[SupersetLogger] = None) -> Tuple[bytes, str]:
logger = logger or SupersetLogger(name="fileio")
path = Path(file_path)
assert path.is_file(), f"Файл дашборда не найден: {file_path}"
logger.info("[read_dashboard_from_disk][Enter] Reading file: %s", file_path)
content = path.read_bytes()
if not content:
logger.warning("[read_dashboard_from_disk][Warning] File is empty: %s", file_path)
return content, path.name
# </ANCHOR id="read_dashboard_from_disk">
# <ANCHOR id="calculate_crc32" type="Function">
# @PURPOSE: Вычисляет контрольную сумму CRC32 для файла.
# @PARAM: file_path: Path - Путь к файлу.
# @RETURN: str - 8-значное шестнадцатеричное представление CRC32.
# @THROW: IOError - При ошибках чтения файла.
def calculate_crc32(file_path: Path) -> str:
with open(file_path, 'rb') as f:
crc32_value = zlib.crc32(f.read())
return f"{crc32_value:08x}"
# </ANCHOR id="calculate_crc32">
# <ANCHOR id="RetentionPolicy" type="DataClass">
# @PURPOSE: Определяет политику хранения для архивов (ежедневные, еженедельные, ежемесячные).
@dataclass
class RetentionPolicy:
daily: int = 7
weekly: int = 4
monthly: int = 12
# </ANCHOR id="RetentionPolicy">
# <ANCHOR id="archive_exports" type="Function">
# @PURPOSE: Управляет архивом экспортированных файлов, применяя политику хранения и дедупликацию.
# @PARAM: output_dir: str - Директория с архивами.
# @PARAM: policy: RetentionPolicy - Политика хранения.
# @PARAM: deduplicate: bool - Флаг для включения удаления дубликатов по CRC32.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @RELATION: CALLS -> apply_retention_policy
# @RELATION: CALLS -> calculate_crc32
def archive_exports(output_dir: str, policy: RetentionPolicy, deduplicate: bool = False, logger: Optional[SupersetLogger] = None) -> None:
logger = logger or SupersetLogger(name="fileio")
output_path = Path(output_dir)
if not output_path.is_dir():
logger.warning("[archive_exports][Skip] Archive directory not found: %s", output_dir)
return
logger.info("[archive_exports][Enter] Managing archive in %s", output_dir)
# ... (логика дедупликации и политики хранения) ...
# </ANCHOR id="archive_exports">
# <ANCHOR id="apply_retention_policy" type="Function">
# @PURPOSE: (Helper) Применяет политику хранения к списку файлов, возвращая те, что нужно сохранить.
# @INTERNAL
# @PARAM: files_with_dates: List[Tuple[Path, date]] - Список файлов с датами.
# @PARAM: policy: RetentionPolicy - Политика хранения.
# @PARAM: logger: SupersetLogger - Логгер.
# @RETURN: set - Множество путей к файлам, которые должны быть сохранены.
def apply_retention_policy(files_with_dates: List[Tuple[Path, date]], policy: RetentionPolicy, logger: SupersetLogger) -> set:
# Сортируем по дате (от новой к старой)
sorted_files = sorted(files_with_dates, key=lambda x: x[1], reverse=True)
# Словарь для хранения файлов по категориям
daily_files = []
weekly_files = []
monthly_files = []
today = date.today()
for file_path, file_date in sorted_files:
# Ежедневные
if (today - file_date).days < policy.daily:
daily_files.append(file_path)
# Еженедельные
elif (today - file_date).days < policy.weekly * 7:
weekly_files.append(file_path)
# Ежемесячные
elif (today - file_date).days < policy.monthly * 30:
monthly_files.append(file_path)
# Возвращаем множество файлов, которые нужно сохранить
files_to_keep = set()
files_to_keep.update(daily_files)
files_to_keep.update(weekly_files[:policy.weekly])
files_to_keep.update(monthly_files[:policy.monthly])
logger.debug("[apply_retention_policy][State] Keeping %d files according to retention policy", len(files_to_keep))
return files_to_keep
# </ANCHOR id="apply_retention_policy">
# <ANCHOR id="save_and_unpack_dashboard" type="Function">
# @PURPOSE: Сохраняет бинарное содержимое ZIP-архива на диск и опционально распаковывает его.
# @PARAM: zip_content: bytes - Содержимое ZIP-архива.
# @PARAM: output_dir: Union[str, Path] - Директория для сохранения.
# @PARAM: unpack: bool - Флаг, нужно ли распаковывать архив.
# @PARAM: original_filename: Optional[str] - Исходное имя файла для сохранения.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @RETURN: Tuple[Path, Optional[Path]] - Путь к ZIP-файлу и, если применимо, путь к директории с распаковкой.
# @THROW: InvalidZipFormatError - При ошибке формата ZIP.
def save_and_unpack_dashboard(zip_content: bytes, output_dir: Union[str, Path], unpack: bool = False, original_filename: Optional[str] = None, logger: Optional[SupersetLogger] = None) -> Tuple[Path, Optional[Path]]:
logger = logger or SupersetLogger(name="fileio")
logger.info("[save_and_unpack_dashboard][Enter] Processing dashboard. Unpack: %s", unpack)
try:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
zip_name = sanitize_filename(original_filename) if original_filename else f"dashboard_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
zip_path = output_path / zip_name
zip_path.write_bytes(zip_content)
logger.info("[save_and_unpack_dashboard][State] Dashboard saved to: %s", zip_path)
if unpack:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(output_path)
logger.info("[save_and_unpack_dashboard][State] Dashboard unpacked to: %s", output_path)
return zip_path, output_path
return zip_path, None
except zipfile.BadZipFile as e:
logger.error("[save_and_unpack_dashboard][Failure] Invalid ZIP archive: %s", e)
raise InvalidZipFormatError(f"Invalid ZIP file: {e}") from e
# </ANCHOR id="save_and_unpack_dashboard">
# <ANCHOR id="update_yamls" type="Function">
# @PURPOSE: Обновляет конфигурации в YAML-файлах, заменяя значения или применяя regex.
# @PARAM: db_configs: Optional[List[Dict]] - Список конфигураций для замены.
# @PARAM: path: str - Путь к директории с YAML файлами.
# @PARAM: regexp_pattern: Optional[LiteralString] - Паттерн для поиска.
# @PARAM: replace_string: Optional[LiteralString] - Строка для замены.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @THROW: FileNotFoundError - Если `path` не существует.
# @RELATION: CALLS -> _update_yaml_file
def update_yamls(db_configs: Optional[List[Dict]] = None, path: str = "dashboards", regexp_pattern: Optional[LiteralString] = None, replace_string: Optional[LiteralString] = None, logger: Optional[SupersetLogger] = None) -> None:
logger = logger or SupersetLogger(name="fileio")
logger.info("[update_yamls][Enter] Starting YAML configuration update.")
dir_path = Path(path)
assert dir_path.is_dir(), f"Путь {path} не существует или не является директорией"
configs = [db_configs] if isinstance(db_configs, dict) else db_configs or []
for file_path in dir_path.rglob("*.yaml"):
_update_yaml_file(file_path, configs, regexp_pattern, replace_string, logger)
# </ANCHOR id="update_yamls">
# <ANCHOR id="_update_yaml_file" type="Function">
# @PURPOSE: (Helper) Обновляет один YAML файл.
# @INTERNAL
def _update_yaml_file(file_path: Path, db_configs: List[Dict], regexp_pattern: Optional[str], replace_string: Optional[str], logger: SupersetLogger) -> None:
# Читаем содержимое файла
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
logger.error("[_update_yaml_file][Failure] Failed to read %s: %s", file_path, e)
return
# Если задан pattern и replace_string, применяем замену по регулярному выражению
if regexp_pattern and replace_string:
try:
new_content = re.sub(regexp_pattern, replace_string, content)
if new_content != content:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(new_content)
logger.info("[_update_yaml_file][State] Updated %s using regex pattern", file_path)
except Exception as e:
logger.error("[_update_yaml_file][Failure] Error applying regex to %s: %s", file_path, e)
# Если заданы конфигурации, заменяем значения
if db_configs:
try:
parsed_data = yaml.safe_load(content)
if not isinstance(parsed_data, dict):
logger.warning("[_update_yaml_file][Warning] YAML content is not a dictionary in %s", file_path)
return
# Обновляем данные
for config in db_configs:
for key, value in config.items():
if key in parsed_data:
old_value = parsed_data[key]
parsed_data[key] = value
logger.info("[_update_yaml_file][State] Changed %s.%s from %s to %s", file_path, key, old_value, value)
# Записываем обратно
with open(file_path, 'w', encoding='utf-8') as f:
yaml.dump(parsed_data, f, default_flow_style=False, allow_unicode=True)
except Exception as e:
logger.error("[_update_yaml_file][Failure] Error updating YAML in %s: %s", file_path, e)
# </ANCHOR id="_update_yaml_file">
# <ANCHOR id="create_dashboard_export" type="Function">
# @PURPOSE: Создает ZIP-архив из указанных исходных путей.
# @PARAM: zip_path: Union[str, Path] - Путь для сохранения ZIP архива.
# @PARAM: source_paths: List[Union[str, Path]] - Список исходных путей для архивации.
# @PARAM: exclude_extensions: Optional[List[str]] - Список расширений для исключения.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @RETURN: bool - `True` при успехе, `False` при ошибке.
def create_dashboard_export(zip_path: Union[str, Path], source_paths: List[Union[str, Path]], exclude_extensions: Optional[List[str]] = None, logger: Optional[SupersetLogger] = None) -> bool:
logger = logger or SupersetLogger(name="fileio")
logger.info("[create_dashboard_export][Enter] Packing dashboard: %s -> %s", source_paths, zip_path)
try:
exclude_ext = [ext.lower() for ext in exclude_extensions or []]
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for src_path_str in source_paths:
src_path = Path(src_path_str)
assert src_path.exists(), f"Путь не найден: {src_path}"
for item in src_path.rglob('*'):
if item.is_file() and item.suffix.lower() not in exclude_ext:
arcname = item.relative_to(src_path.parent)
zipf.write(item, arcname)
logger.info("[create_dashboard_export][Exit] Archive created: %s", zip_path)
return True
except (IOError, zipfile.BadZipFile, AssertionError) as e:
logger.error("[create_dashboard_export][Failure] Error: %s", e, exc_info=True)
return False
# </ANCHOR id="create_dashboard_export">
# <ANCHOR id="sanitize_filename" type="Function">
# @PURPOSE: Очищает строку от символов, недопустимых в именах файлов.
# @PARAM: filename: str - Исходное имя файла.
# @RETURN: str - Очищенная строка.
def sanitize_filename(filename: str) -> str:
return re.sub(r'[\\/*?:"<>|]', "_", filename).strip()
# </ANCHOR id="sanitize_filename">
# <ANCHOR id="get_filename_from_headers" type="Function">
# @PURPOSE: Извлекает имя файла из HTTP заголовка 'Content-Disposition'.
# @PARAM: headers: dict - Словарь HTTP заголовков.
# @RETURN: Optional[str] - Имя файла или `None`.
def get_filename_from_headers(headers: dict) -> Optional[str]:
content_disposition = headers.get("Content-Disposition", "")
if match := re.search(r'filename="?([^"]+)"?', content_disposition):
return match.group(1).strip()
return None
# </ANCHOR id="get_filename_from_headers">
# <ANCHOR id="consolidate_archive_folders" type="Function">
# @PURPOSE: Консолидирует директории архивов на основе общего слага в имени.
# @PARAM: root_directory: Path - Корневая директория для консолидации.
# @PARAM: logger: Optional[SupersetLogger] - Экземпляр логгера.
# @THROW: TypeError, ValueError - Если `root_directory` невалиден.
def consolidate_archive_folders(root_directory: Path, logger: Optional[SupersetLogger] = None) -> None:
logger = logger or SupersetLogger(name="fileio")
assert isinstance(root_directory, Path), "root_directory must be a Path object."
assert root_directory.is_dir(), "root_directory must be an existing directory."
logger.info("[consolidate_archive_folders][Enter] Consolidating archives in %s", root_directory)
# Собираем все директории с архивами
archive_dirs = []
for item in root_directory.iterdir():
if item.is_dir():
# Проверяем, есть ли в директории ZIP-архивы
if any(item.glob("*.zip")):
archive_dirs.append(item)
# Группируем по слагу (части имени до первого '_')
slug_groups = {}
for dir_path in archive_dirs:
dir_name = dir_path.name
slug = dir_name.split('_')[0] if '_' in dir_name else dir_name
if slug not in slug_groups:
slug_groups[slug] = []
slug_groups[slug].append(dir_path)
# Для каждой группы консолидируем
for slug, dirs in slug_groups.items():
if len(dirs) <= 1:
continue
# Создаем целевую директорию
target_dir = root_directory / slug
target_dir.mkdir(exist_ok=True)
logger.info("[consolidate_archive_folders][State] Consolidating %d directories under %s", len(dirs), target_dir)
# Перемещаем содержимое
for source_dir in dirs:
if source_dir == target_dir:
continue
for item in source_dir.iterdir():
dest_item = target_dir / item.name
try:
if item.is_dir():
shutil.move(str(item), str(dest_item))
else:
shutil.move(str(item), str(dest_item))
except Exception as e:
logger.error("[consolidate_archive_folders][Failure] Failed to move %s to %s: %s", item, dest_item, e)
# Удаляем исходную директорию
try:
source_dir.rmdir()
logger.info("[consolidate_archive_folders][State] Removed source directory: %s", source_dir)
except Exception as e:
logger.error("[consolidate_archive_folders][Failure] Failed to remove source directory %s: %s", source_dir, e)
# </ANCHOR id="consolidate_archive_folders">
# --- Конец кода модуля ---
# </GRACE_MODULE id="superset_tool.utils.fileio">