Files
ss-tools/superset_tool/utils/fileio.py
2025-06-30 13:09:25 +03:00

954 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# [MODULE] File Operations Manager
# @desc: Управление файловыми операциями для дашбордов Superset
# @contracts:
# 1. Валидация ZIP-архивов
# 2. Работа с YAML-конфигами
# 3. Управление директориями
# @coherence:
# - Согласован с SupersetClient
# - Поддерживает все форматы экспорта Superset
# [IMPORTS] Core
import os
import re
import zipfile
from pathlib import Path
from typing import Any, Optional, Tuple, Dict, List, Literal, Union, BinaryIO, LiteralString
from collections import defaultdict
from datetime import date
import glob
from contextlib import contextmanager
# [IMPORTS] Third-party
import yaml
import shutil
import tempfile
from datetime import datetime
# [IMPORTS] Local
from ..models import DatabaseConfig
from ..exceptions import InvalidZipFormatError, DashboardNotFoundError
from ..utils.logger import SupersetLogger
# [CONSTANTS]
ALLOWED_FOLDERS = {'databases', 'datasets', 'charts', 'dashboards'}
# [CONTRACT] Временные ресурсы
@contextmanager
def create_temp_file(
content: Optional[bytes] = None,
suffix: str = ".zip",
mode: str = 'wb',
logger: Optional[SupersetLogger] = None
) -> Path:
"""[CONTEXT-MANAGER] Создание временного файла/директории
@pre:
- suffix должен быть допустимым расширением
- mode соответствует типу содержимого
@post:
- Возвращает Path созданного ресурса
- Гарантирует удаление временного файла при выходе
"""
logger = logger or SupersetLogger(name="fileio", console=False)
try:
if suffix.startswith('.dir'):
with tempfile.TemporaryDirectory(suffix=suffix) as tmp_dir:
logger.debug(f"Создана временная директория: {tmp_dir}")
yield Path(tmp_dir)
else:
with tempfile.NamedTemporaryFile(suffix=suffix, mode=mode, delete=False) as tmp:
if content:
tmp.write(content)
tmp.flush()
logger.debug(f"Создан временный файл: {tmp.name}")
yield Path(tmp.name)
except Exception as e:
logger.error(f"[TEMP_FILE_ERROR] Ошибка создания ресурса: {str(e)}", exc_info=True)
raise
finally:
if 'tmp' in locals() and Path(tmp.name).exists() and not suffix.startswith('.dir'):
Path(tmp.name).unlink(missing_ok=True)
# [SECTION] Directory Management Utilities
def remove_empty_directories(
root_dir: str,
exclude: Optional[List[str]] = None,
logger: Optional[SupersetLogger] = None
) -> int:
"""[CONTRACT] Рекурсивное удаление пустых директорий
@pre:
- root_dir должен существовать и быть директорией
- exclude не должен содержать некорректных символов
@post:
- Возвращает количество удаленных директорий
- Не удаляет директории из списка exclude
- Гарантирует рекурсивную обработку вложенных папок
"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info(f"[DIR_CLEANUP] Старт очистки пустых директорий в {root_dir}")
excluded = set(exclude or [])
removed_count = 0
root_path = Path(root_dir)
# [VALIDATION] Проверка корневой директории
if not root_path.exists():
logger.error(f"[DIR_ERROR] Директория не существует: {root_dir}")
return 0
try:
# [PROCESSING] Рекурсивный обход снизу вверх
for current_dir, _, files in os.walk(root_path, topdown=False):
current_path = Path(current_dir)
# [EXCLUSION] Пропуск исключенных директорий
if any(excluded_part in current_path.parts for excluded_part in excluded):
logger.debug(f"[DIR_SKIP] Пропущено по исключению: {current_dir}")
continue
# [REMOVAL] Удаление пустых директорий
if not any(current_path.iterdir()):
try:
current_path.rmdir()
removed_count += 1
logger.info(f"[DIR_REMOVED] Удалена пустая директория: {current_dir}")
except OSError as e:
logger.error(f"[DIR_ERROR] Ошибка удаления {current_dir}: {str(e)}")
except Exception as e:
logger.error(f"[DIR_CLEANUP_ERROR] Критическая ошибка: {str(e)}", exc_info=True)
raise
logger.info(f"[DIR_RESULT] Удалено {removed_count} пустых директорий")
return removed_count
# [SECTION] File Operations
def read_dashboard_from_disk(
file_path: str,
logger: Optional[SupersetLogger] = None
) -> Tuple[bytes, str]:
"""[CONTRACT] Чтение сохраненного дашборда с диска
@pre:
- file_path должен существовать
- Файл должен быть доступен для чтения
@post:
- Возвращает (содержимое файла, имя файла)
- Сохраняет целостность данных
"""
logger = logger or SupersetLogger(name="fileio", console=False)
try:
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"[FILE_MISSING] Файл дашборда не найден: {file_path}")
logger.info(f"[FILE_READ] Чтение дашборда с диска: {file_path}")
with open(file_path, "rb") as f:
content = f.read()
if not content:
logger.warning("[FILE_EMPTY] Файл существует, но пуст")
return content, path.name
except Exception as e:
logger.error(f"[FILE_READ_ERROR] Ошибка чтения: {str(e)}", exc_info=True)
raise
# [SECTION] Archive Management
def archive_exports(
output_dir: str,
daily_retention: int = 7,
weekly_retention: int = 4,
monthly_retention: int = 12,
logger: Optional[SupersetLogger] = None
) -> None:
"""[CONTRACT] Управление архивом экспортированных дашбордов
@pre:
- output_dir должен существовать
- Значения retention должны быть >= 0
@post:
- Сохраняет файлы согласно политике хранения
- Удаляет устаревшие архивы
- Сохраняет логическую структуру каталогов
"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info(f"[ARCHIVE] Старт очистки архивов в {output_dir}")
# [VALIDATION] Проверка параметров
if not all(isinstance(x, int) and x >= 0 for x in [daily_retention, weekly_retention, monthly_retention]):
raise ValueError("[CONFIG_ERROR] Значения retention должны быть положительными")
try:
export_dir = Path(output_dir)
files_with_dates = []
# [PROCESSING] Сбор данных о файлах
for file in export_dir.glob("*.zip"):
try:
timestamp_str = file.stem.split('_')[-1].split('T')[0]
file_date = datetime.strptime(timestamp_str, "%Y%m%d").date()
except (ValueError, IndexError):
file_date = datetime.fromtimestamp(file.stat().st_mtime).date()
logger.warning(f"[DATE_PARSE] Используется дата модификации для {file.name}")
files_with_dates.append((file, file_date))
# [PROCESSING] Применение политик хранения
keep_files = apply_retention_policy(
files_with_dates,
daily_retention,
weekly_retention,
monthly_retention,
logger
)
# [CLEANUP] Удаление устаревших файлов
for file, _ in files_with_dates:
if file not in keep_files:
file.unlink(missing_ok=True)
logger.info(f"[FILE_REMOVED] Удален архив: {file.name}")
except Exception as e:
logger.error(f"[ARCHIVE_ERROR] Ошибка обработки архивов: {str(e)}", exc_info=True)
raise
def apply_retention_policy(
files_with_dates: List[Tuple[Path, date]],
daily: int,
weekly: int,
monthly: int,
logger: SupersetLogger
) -> set:
"""[HELPER] Применение политик хранения файлов
@pre:
- files_with_dates должен содержать валидные даты
@post:
- Возвращает set файлов для сохранения
- Соответствует указанным retention-правилам
"""
# [GROUPING] Группировка файлов
daily_groups = defaultdict(list)
weekly_groups = defaultdict(list)
monthly_groups = defaultdict(list)
for file, file_date in files_with_dates:
daily_groups[file_date].append(file)
weekly_groups[(file_date.isocalendar().year, file_date.isocalendar().week)].append(file)
monthly_groups[(file_date.year, file_date.month)].append(file)
# [SELECTION] Выбор файлов для сохранения
keep_files = set()
# Daily - последние N дней
sorted_daily = sorted(daily_groups.keys(), reverse=True)[:daily]
for day in sorted_daily:
keep_files.update(daily_groups[day])
# Weekly - последние N недель
sorted_weekly = sorted(weekly_groups.keys(), reverse=True)[:weekly]
for week in sorted_weekly:
keep_files.update(weekly_groups[week])
# Monthly - последние N месяцев
sorted_monthly = sorted(monthly_groups.keys(), reverse=True)[:monthly]
for month in sorted_monthly:
keep_files.update(monthly_groups[month])
logger.debug(f"[RETENTION] Сохранено файлов: {len(keep_files)}")
return keep_files
# [CONTRACT] Сохранение и распаковка дашборда
def save_and_unpack_dashboard(
zip_content: bytes,
output_dir: Union[str, Path],
unpack: bool = False,
original_filename: Optional[str] = None,
logger: Optional[SupersetLogger] = None
) -> Tuple[Path, Optional[Path]]:
"""[OPERATION] Обработка ZIP-архива дашборда
@pre:
- zip_content должен быть валидным ZIP
- output_dir должен существовать или быть возможным для создания
@post:
- Возвращает (путь_к_архиву, путь_распаковки) или (путь_к_архиву, None)
- Сохраняет оригинальную структуру файлов
"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info(f"Старт обработки дашборда. Распаковка: {unpack}")
try:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
logger.debug(f"Директория {output_path} создана/проверена")
zip_name = sanitize_filename(original_filename) if original_filename else None
if not zip_name:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
zip_name = f"dashboard_export_{timestamp}.zip"
logger.debug(f"Сгенерировано имя файла: {zip_name}")
zip_path = output_path / zip_name
logger.info(f"Сохранение дашборда в: {zip_path}")
with open(zip_path, "wb") as f:
f.write(zip_content)
if unpack:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(output_path)
logger.info(f"Дашборд распакован в: {output_path}")
return zip_path, output_path
return zip_path, None
except zipfile.BadZipFile as e:
logger.error(f"[ZIP_ERROR] Невалидный ZIP-архив: {str(e)}")
raise InvalidZipFormatError(f"Invalid ZIP file: {str(e)}") from e
except Exception as e:
logger.error(f"[UNPACK_ERROR] Ошибка обработки: {str(e)}", exc_info=True)
raise
def print_directory(
root_dir: str,
logger: Optional[SupersetLogger] = None
) -> None:
"""[CONTRACT] Визуализация структуры директории в древовидном формате
@pre:
- root_dir должен быть валидным путем к директории
@post:
- Выводит в консоль и логи структуру директории
- Не модифицирует файловую систему
@errors:
- ValueError если путь не существует или не является директорией
"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.debug(f"[DIR_TREE] Начало построения дерева для {root_dir}")
try:
root_path = Path(root_dir)
# [VALIDATION] Проверка существования и типа
if not root_path.exists():
raise ValueError(f"Путь не существует: {root_dir}")
if not root_path.is_dir():
raise ValueError(f"Указан файл вместо директории: {root_dir}")
# [OUTPUT] Форматированный вывод
print(f"\n{root_dir}/")
with os.scandir(root_dir) as entries:
entries = sorted(entries, key=lambda e: e.name)
for idx, entry in enumerate(entries):
is_last = idx == len(entries) - 1
prefix = " └── " if is_last else " ├── "
suffix = "/" if entry.is_dir() else ""
print(f"{prefix}{entry.name}{suffix}")
logger.info(f"[DIR_TREE] Успешно построено дерево для {root_dir}")
except Exception as e:
error_msg = f"[DIR_TREE_ERROR] Ошибка визуализации: {str(e)}"
logger.error(error_msg, exc_info=True)
raise ValueError(error_msg) from e
def validate_directory_structure(
root_dir: str,
logger: Optional[SupersetLogger] = None
) -> bool:
"""[CONTRACT] Валидация структуры директории экспорта Superset
@pre:
- root_dir должен быть валидным путем
@post:
- Возвращает True если структура соответствует требованиям:
1. Ровно один подкаталог верхнего уровня
2. Наличие metadata.yaml
3. Допустимые имена поддиректорий (databases/datasets/charts/dashboards)
@errors:
- ValueError при некорректном пути
"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info(f"[DIR_VALIDATION] Валидация структуры в {root_dir}")
try:
root_path = Path(root_dir)
# [BASE VALIDATION]
if not root_path.exists():
raise ValueError(f"Директория не существует: {root_dir}")
if not root_path.is_dir():
raise ValueError(f"Требуется директория, получен файл: {root_dir}")
root_items = os.listdir(root_dir)
# [CHECK 1] Ровно один подкаталог верхнего уровня
if len(root_items) != 1:
logger.warning(f"[VALIDATION_FAIL] Ожидается 1 подкаталог, найдено {len(root_items)}")
return False
subdir_path = root_path / root_items[0]
# [CHECK 2] Должен быть подкаталог
if not subdir_path.is_dir():
logger.warning(f"[VALIDATION_FAIL] {root_items[0]} не является директорией")
return False
# [CHECK 3] Проверка metadata.yaml
if "metadata.yaml" not in os.listdir(subdir_path):
logger.warning("[VALIDATION_FAIL] Отсутствует metadata.yaml")
return False
# [CHECK 4] Валидация поддиректорий
found_folders = set()
for item in os.listdir(subdir_path):
if item == "metadata.yaml":
continue
item_path = subdir_path / item
if not item_path.is_dir():
logger.warning(f"[VALIDATION_FAIL] {item} не является директорией")
return False
if item not in ALLOWED_FOLDERS:
logger.warning(f"[VALIDATION_FAIL] Недопустимая директория: {item}")
return False
if item in found_folders:
logger.warning(f"[VALIDATION_FAIL] Дубликат директории: {item}")
return False
found_folders.add(item)
# [FINAL CHECK]
valid_structure = (
1 <= len(found_folders) <= 4 and
all(folder in ALLOWED_FOLDERS for folder in found_folders)
)
if not valid_structure:
logger.warning(
f"[VALIDATION_FAIL] Некорректный набор директорий: {found_folders}"
)
return valid_structure
except Exception as e:
error_msg = f"[DIR_VALIDATION_ERROR] Критическая ошибка: {str(e)}"
logger.error(error_msg, exc_info=True)
raise ValueError(error_msg) from e
# [CONTRACT] Создание ZIP-архива
def create_dashboard_export(
zip_path: Union[str, Path],
source_paths: List[Union[str, Path]],
exclude_extensions: Optional[List[str]] = None,
validate_source: bool = False,
logger: Optional[SupersetLogger] = None
) -> bool:
"""[OPERATION] Упаковка дашборда в архив
@pre:
- source_paths должны существовать
- Должны быть права на запись в zip_path
@post:
- Возвращает True если создание успешно
- Сохраняет оригинальную структуру папок
"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info(f"Упаковка дашбордов: {source_paths} -> {zip_path}")
try:
exclude_ext = [ext.lower() for ext in exclude_extensions] if exclude_extensions else []
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for path in source_paths:
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"Путь не найден: {path}")
for item in path.rglob('*'):
if item.is_file() and item.suffix.lower() not in exclude_ext:
arcname = item.relative_to(path.parent)
zipf.write(item, arcname)
logger.debug(f"Добавлен в архив: {arcname}")
logger.info(f"Архив создан: {zip_path}")
return True
except Exception as e:
logger.error(f"[ZIP_CREATION_ERROR] Ошибка: {str(e)}", exc_info=True)
return False
# [UTILITY] Валидация имен файлов
def sanitize_filename(filename: str) -> str:
"""[UTILITY] Очистка имени файла от опасных символов
@post:
- Возвращает безопасное имя файла без спецсимволов
"""
return re.sub(r'[\\/*?:"<>|]', "_", filename).strip()
def get_filename_from_headers(headers: dict) -> Optional[str]:
"""Извлекает имя файла из заголовков HTTP-ответа"""
content_disposition = headers.get("Content-Disposition", "")
# Пытаемся найти имя файла в кавычках
filename_match = re.findall(r'filename="(.+?)"', content_disposition)
if not filename_match:
# Пробуем без кавычек
filename_match = re.findall(r'filename=([^;]+)', content_disposition)
if filename_match:
return filename_match[0].strip('"')
return None
def determine_and_load_yaml_type(file_path):
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
if 'dashboard_title' in data and 'position' in data:
return data, 'dashboard'
elif 'sqlalchemy_uri' in data and 'database_name' in data:
return data, 'database'
elif 'table_name' in data and ('sql' in data or 'columns' in data):
return data, 'dataset'
elif 'slice_name' in data and 'viz_type' in data:
return data, 'chart'
else:
return data, 'unknown'
# [CONTRACT] Управление конфигурациями YAML
def update_yamls(
db_configs: Optional[List[Dict]] = None,
path: str = "dashboards",
regexp_pattern: Optional[LiteralString] = None,
replace_string: Optional[LiteralString] = None,
logger: Optional[SupersetLogger] = None
) -> None:
"""
[OPERATION] Обновление YAML-конфигов
@pre:
- path должен содержать валидные YAML-файлы
- db_configs должен содержать old/new состояния
@post:
- Все YAML-файлы обновлены согласно конфигурациям
- Сохраняется оригинальная структура файлов
Обновляет конфигурации в YAML-файлах баз данных, заменяя старые значения на новые.
Поддерживает два типа замен:
1. Точечную замену значений по ключам из db_config
2. Регулярные выражения для замены текста во всех строковых полях
Параметры:
:db_configs: Список словарей или словарь с параметрами для замены в формате:
{
"old": {старые_ключи: значения_для_поиска},
"new": {новые_ключи: значения_для_замены}
}
Если не указан - используется только замена по регулярным выражениям
:path: Путь к папке с YAML-файлами (по умолчанию "dashboards")
:regexp_pattern: Регулярное выражение для поиска текста (опционально)
:replace_string: Строка для замены найденного текста (используется с regexp_pattern)
:logger: Логгер для записи событий (по умолчанию создается новый)
Логирует:
- Информационные сообщения о начале процесса и успешных обновлениях
- Ошибки обработки отдельных файлов
- Критические ошибки, прерывающие выполнение
Пример использования:
update_yamls(
db_config={
"old": {"host": "old.db.example.com"},
"new": {"host": "new.db.example.com"}
},
regexp_pattern="old\.",
replace_string="new."
)
"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info("[YAML_UPDATE] Старт обновления конфигураций")
# Преобразуем единственный конфиг в список для универсальности
if isinstance(db_configs, dict):
db_configs = [db_configs]
elif db_configs is None:
db_configs = []
try:
dir = Path(path)
if not dir.exists() or not dir.is_dir():
raise FileNotFoundError(f"Путь {path} не существует или не является директорией")
yaml_files = dir.rglob("*.yaml")
for file_path in yaml_files:
try:
result = determine_and_load_yaml_type(file_path)
data, yaml_type = result if result else ({}, None)
logger.debug(f"Тип {file_path} - {yaml_type}")
updates = {}
# 1. Обработка замен по ключам из db_config (если нужно использовать только новые значения)
if db_configs:
for config in db_configs:
#Валидация конфига
if config is not None:
if "old" not in config or "new" not in config:
raise ValueError("db_config должен содержать оба раздела 'old' и 'new'")
old_config = config.get("old", {}) # Значения для поиска
new_config = config.get("new", {}) # Значения для замены
if len(old_config) != len(new_config):
raise ValueError(
f"Количество элементов в 'old' ({old_config}) и 'new' ({new_config}) не совпадает"
)
for key in old_config:
if key in data and data[key] == old_config[key]:
new_value = new_config.get(key)
if new_value is not None and new_value != data.get(key):
updates[key] = new_value # Заменяем без проверки старого значения
# 2. Регулярная замена (с исправленной функцией process_value)
if regexp_pattern:
def process_value(value: Any) -> Tuple[bool, Any]:
"""Рекурсивная обработка с флагом замены."""
matched = False
if isinstance(value, str):
new_str = re.sub(regexp_pattern, replace_string, value)
matched = (new_str != value)
return matched, new_str
elif isinstance(value, dict):
new_dict = {}
for k, v in value.items():
sub_matched, sub_val = process_value(v)
new_dict[k] = sub_val
if sub_matched:
matched = True
return matched, new_dict
elif isinstance(value, list):
new_list = []
for item in value:
sub_matched, sub_val = process_value(item)
new_list.append(sub_val)
if sub_matched:
matched = True
return matched, new_list
return False, value # Нет замены для других типов
# Применяем обработку ко всем данным
_, processed_data = process_value(data)
# Собираем обновления только для изменившихся полей
for key in processed_data:
if processed_data[key] != data.get(key):
updates[key] = processed_data[key]
if updates:
logger.info(f"Обновление {file_path}: {updates}")
data.update(updates)
with open(file_path, 'w') as file:
yaml.dump(
data,
file,
default_flow_style=False,
sort_keys=False
)
except yaml.YAMLError as e:
logger.error(f"[YAML_ERROR] Ошибка парсинга {file_path}: {str(e)}")
except Exception as e:
logger.error(f"[YAML_UPDATE_ERROR] Критическая ошибка: {str(e)}", exc_info=True)
raise
def consolidate_archive_folders(root_directory: Path, logger: Optional[SupersetLogger] = None) -> None:
"""
Consolidates dashboard folders under a root directory based on the slug (pattern MM-0080 - two latin letters - hyphen - 4 digits)
and moves the contents to the folder with the latest modification date.
Args:
root_directory (Path): The root directory containing the dashboard folders.
Raises:
TypeError: If root_directory is not a Path object.
ValueError: If root_directory is empty.
[CONTRACT]
@pre: root_directory must be a valid Path object representing an existing directory.
@post: The contents of all folders matching the slug pattern are moved to the folder with the latest modification date for each slug.
@invariant: The slug pattern remains consistent throughout the execution.
@raise: TypeError if root_directory is not a Path, ValueError if root_directory is empty.
"""
# [CONTRACT] Ensure valid input
if not isinstance(root_directory, Path):
raise TypeError("root_directory must be a Path object.")
if not root_directory:
raise ValueError("root_directory cannot be empty.")
logger.debug(f"[DEBUG] Checking root_folder: {root_directory}")
# [SECTION] Define the slug pattern
slug_pattern = re.compile(r"([A-Z]{2}-\d{4})") # Capture the first occurrence of the pattern
# [SECTION] Group folders by slug
dashboards_by_slug: dict[str, list[str]] = {}
for folder_name in glob.glob(os.path.join(root_directory, '*')):
if os.path.isdir(folder_name):
logger.debug(f"[DEBUG] Checking folder: {folder_name}") # Debug log: show the folder name being checked
match = slug_pattern.search(folder_name)
if match:
slug = match.group(1) # Extract the captured group (the slug)
logger.info(f"[INFO] Found slug: {slug} in folder: {folder_name}") #Log when slug is matched
logger.debug(f"[DEBUG] Regex match object: {match}") # Log the complete match object
if slug not in dashboards_by_slug:
dashboards_by_slug[slug] = []
dashboards_by_slug[slug].append(folder_name)
else:
logger.debug(f"[DEBUG] No slug found in folder: {folder_name}") # Debug log: show when slug is not matched
else:
logger.debug(f"[DEBUG] Not a directory: {folder_name}") #debug log for when its not a directory
# [SECTION] Check if any slugs were found
if not dashboards_by_slug:
logger.warning("[WARN] No folders found matching the slug pattern.")
return
# [SECTION] Iterate through each slug group
for slug, folder_list in dashboards_by_slug.items():
# [ACTION] Find the folder with the latest modification date
latest_folder = max(folder_list, key=os.path.getmtime)
logger.info(f"[INFO] Latest folder for slug {slug}: {latest_folder}")
# [SECTION] Move contents of other folders to the latest folder
for folder in folder_list:
if folder != latest_folder:
# [ACTION] Move contents
try:
for item in os.listdir(folder):
s = os.path.join(folder, item)
d = os.path.join(latest_folder, item)
if os.path.isdir(s):
shutil.move(s, d)
else:
shutil.move(s, d)
logger.info(f"[INFO] Moved contents of {folder} to {latest_folder}")
except Exception as e:
logger.error(f"[ERROR] Failed to move contents of {folder} to {latest_folder}: {e}", exc_info=True)
logger.info("[INFO] Dashboard consolidation completed.")
# [COHERENCE_CHECK_PASSED] Function executed successfully and all contracts were met.
def sync_for_git(
source_path: str,
destination_path: str,
dry_run: bool = False,
logger: Optional[SupersetLogger] = None
) -> None:
"""[CONTRACT] Синхронизация контента между директориями с учетом Git
@pre:
- source_path должен существовать и быть директорией
- destination_path должен быть допустимым путем
- Исходная директория должна содержать валидную структуру Superset
@post:
- Полностью заменяет содержимое destination_path (кроме .git)
- Сохраняет оригинальные разрешения файлов
- Логирует все изменения при dry_run=True
@errors:
- ValueError при несоответствии структуры source_path
- RuntimeError при ошибках файловых операций
"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info(
"[SYNC_START] Запуск синхронизации",
extra={
"source": source_path,
"destination": destination_path,
"dry_run": dry_run
}
)
try:
# [VALIDATION] Проверка исходной директории
if not validate_directory_structure(source_path, logger):
raise ValueError(f"Invalid source structure: {source_path}")
src_path = Path(source_path)
dst_path = Path(destination_path)
# [PREPARATION] Сбор информации о файлах
source_files = get_file_mapping(src_path)
destination_files = get_file_mapping(dst_path)
# [SYNC OPERATIONS]
operations = {
'copied': 0,
'removed': 0,
'skipped': 0
}
# Копирование/обновление файлов
operations.update(process_copy_operations(
src_path,
dst_path,
source_files,
destination_files,
dry_run,
logger
))
# Удаление устаревших файлов
operations.update(process_cleanup_operations(
dst_path,
source_files,
destination_files,
dry_run,
logger
))
# [RESULT]
logger.info(
"[SYNC_RESULT] Итоги синхронизации",
extra=operations
)
except Exception as e:
error_msg = f"[SYNC_FAILED] Ошибка синхронизации: {str(e)}"
logger.error(error_msg, exc_info=True)
raise RuntimeError(error_msg) from e
# [HELPER] Получение карты файлов
def get_file_mapping(root_path: Path) -> Dict[str, Path]:
"""[UTILITY] Генерация словаря файлов относительно корня
@post:
- Возвращает Dict[relative_path: Path]
- Игнорирует .git директории
"""
file_map = {}
for item in root_path.rglob("*"):
if ".git" in item.parts:
continue
rel_path = item.relative_to(root_path)
file_map[str(rel_path)] = item
return file_map
# [HELPER] Обработка копирования
def process_copy_operations(
src_path: Path,
dst_path: Path,
source_files: Dict[str, Path],
destination_files: Dict[str, Path],
dry_run: bool,
logger: SupersetLogger
) -> Dict[str, int]:
"""[OPERATION] Выполнение операций копирования
@post:
- Возвращает счетчики операций {'copied': X, 'skipped': Y}
- Создает все необходимые поддиректории
"""
counters = {'copied': 0, 'skipped': 0}
for rel_path, src_file in source_files.items():
dst_file = dst_path / rel_path
# Проверка необходимости обновления
if rel_path in destination_files:
if filecmp.cmp(src_file, dst_file, shallow=False):
counters['skipped'] += 1
continue
# Dry-run логирование
if dry_run:
logger.debug(
f"[DRY_RUN] Будет скопирован: {rel_path}",
extra={'operation': 'copy'}
)
continue
# Реальное копирование
try:
dst_file.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src_file, dst_file)
counters['copied'] += 1
logger.debug(f"Скопирован: {rel_path}")
except Exception as copy_error:
logger.error(
f"[COPY_ERROR] Ошибка копирования {rel_path}: {str(copy_error)}",
exc_info=True
)
raise
return counters
# [HELPER] Обработка удаления
def process_cleanup_operations(
dst_path: Path,
source_files: Dict[str, Path],
destination_files: Dict[str, Path],
dry_run: bool,
logger: SupersetLogger
) -> Dict[str, int]:
"""[OPERATION] Удаление устаревших файлов
@post:
- Возвращает счетчики {'removed': X}
- Гарантированно не удаляет .git
"""
counters = {'removed': 0}
files_to_delete = set(destination_files.keys()) - set(source_files.keys())
git_dir = dst_path / ".git"
for rel_path in files_to_delete:
target = dst_path / rel_path
# Защита .git
try:
if git_dir in target.parents or target == git_dir:
logger.debug(f"Сохранен .git: {target}")
continue
except ValueError: # Для случаев некорректных путей
continue
# Dry-run логирование
if dry_run:
logger.debug(
f"[DRY_RUN] Будет удален: {rel_path}",
extra={'operation': 'delete'}
)
continue
# Реальное удаление
try:
if target.is_file():
target.unlink()
elif target.is_dir():
shutil.rmtree(target)
counters['removed'] += 1
logger.debug(f"Удален: {rel_path}")
except Exception as remove_error:
logger.error(
f"[REMOVE_ERROR] Ошибка удаления {target}: {str(remove_error)}",
exc_info=True
)
raise
return counters