Files
ss-tools/superset_tool/utils/fileio.py

433 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import zipfile
from pathlib import Path
from typing import Optional, Tuple, Dict, Any
import datetime
import shutil
import yaml
import tempfile
import os
from contextlib import contextmanager
from ..utils.logger import SupersetLogger
@contextmanager
def create_temp_file(
content: bytes,
suffix: str = ".zip",
logger: Optional[SupersetLogger] = None
):
"""Контекстный менеджер для создания временных файлов с логированием"""
logger = logger or SupersetLogger(name="fileio", console=False)
try:
logger.debug(f"Создание временного файла с суффиксом {suffix}")
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
tmp.write(content)
tmp.flush()
yield Path(tmp.name)
except Exception as e:
logger.error(
f"Ошибка создания временного файла: {str(e)}", exc_info=True)
raise
finally:
if Path(tmp.name).exists():
Path(tmp.name).unlink()
logger.debug(f"Временный файл {tmp.name} удален")
def save_and_unpack_dashboard(
zip_content: bytes,
output_dir: str = "dashboards",
unpack: bool = False,
original_filename: Optional[str] = None,
logger: Optional[SupersetLogger] = None
) -> Tuple[Path, Optional[Path]]:
"""Сохраняет и распаковывает дашборд с логированием"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info(f"Старт обработки дашборда. Распаковка: {unpack}")
try:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
logger.debug(f"Директория {output_path} создана/проверена")
zip_name = sanitize_filename(
original_filename) if original_filename else None
if not zip_name:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
zip_name = f"dashboard_export_{timestamp}.zip"
logger.debug(f"Сгенерировано имя файла: {zip_name}")
zip_path = output_path / zip_name
logger.info(f"Сохранение дашборда в: {zip_path}")
with open(zip_path, "wb") as f:
f.write(zip_content)
logger.info(f"Дашборд успешно сохранен: {zip_path}")
if unpack:
extract_path = output_path
logger.debug(f"Начало распаковки в: {extract_path}")
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
logger.info(f"Дашборд распакован в: {extract_path}")
return zip_path, extract_path
return zip_path, None
except Exception as e:
logger.error(f"Ошибка обработки дашборда: {str(e)}", exc_info=True)
raise RuntimeError(f"Failed to unpack dashboard: {str(e)}") from e
def create_dashboard_export(zip_name, source_paths,
exclude_extensions=None,
compress_type=zipfile.ZIP_DEFLATED,
logger: Optional[SupersetLogger] = None):
"""
Создает ZIP-архив с сохранением оригинальной структуры директорий
Параметры:
zip_name (str): Имя создаваемого ZIP-архива
source_paths (list): Список путей для добавления в архив
exclude_extensions (list): Расширения файлов для исключения (например, ['.tmp', '.log'])
compress_type: Тип сжатия (по умолчанию ZIP_DEFLATED)
"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info(f"Упаковываем дашборд {source_paths} в {zip_name}")
try:
exclude_ext = [ext.lower() for ext in exclude_extensions] if exclude_extensions else []
# Проверка существования исходных путей
for path in source_paths:
if not os.path.exists(path):
raise FileNotFoundError(f"Путь не найден: {path}")
# Сбор всех файлов с их базовыми путями
files_with_base = []
for path in source_paths:
abs_path = os.path.abspath(path)
if os.path.isfile(abs_path):
# Для файла: базовый путь = директория файла
base_path = os.path.dirname(abs_path)
files_with_base.append((abs_path, base_path))
elif os.path.isdir(abs_path):
# Для директории: базовый путь = сама директория
base_path = abs_path
for root, _, files in os.walk(abs_path):
for file in files:
full_path = os.path.join(root, file)
files_with_base.append((full_path, base_path))
# Фильтрация по расширениям
if exclude_ext:
files_with_base = [
(f, b) for f, b in files_with_base
if os.path.splitext(f)[1].lower() not in exclude_ext
]
# Создание архива
with zipfile.ZipFile(zip_name, 'w', compress_type) as zipf:
for file, base_path in files_with_base:
# Вычисляем относительный путь от base_path
rel_path = os.path.relpath(file, start=base_path)
arcname = os.path.join(Path(zip_name).stem, rel_path)
zipf.write(file, arcname=arcname)
logger.debug(f"Добавлен {arcname}")
logger.info(f"Архив создан {zip_name}")
return True
except Exception as e:
logger.error(f"\nОшибка: {str(e)}")
return False
def get_filename_from_headers(headers: dict) -> Optional[str]:
"""Извлекает имя файла из заголовков HTTP-ответа"""
content_disposition = headers.get("Content-Disposition", "")
# Пытаемся найти имя файла в кавычках
filename_match = re.findall(r'filename="(.+?)"', content_disposition)
if not filename_match:
# Пробуем без кавычек
filename_match = re.findall(r'filename=([^;]+)', content_disposition)
if filename_match:
return filename_match[0].strip('"')
return None
def sanitize_filename(filename: str) -> str:
"""Очищает имя файла от потенциально опасных символов"""
return re.sub(r'[\\/*?:"<>|]', "_", filename).strip()
def archive_exports(
output_dir: str,
daily_retention: int = 7,
weekly_retention: int = 4,
monthly_retention: int = 12,
yearly_retention: Optional[int] = None,
logger: Optional[SupersetLogger] = None
):
"""Управление историей экспортов по политике GFS (Grandfather-Father-Son)
Параметры:
:daily_retention:
:weekly_retention:
:monthly_retention:
:yearly_retention: Optional[int]
Извлекает даты из стандартного суперсетовсого архива вида, либо берет дату изменения архива
dashboard_export_20250326T121517.zip"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info(f"Старт очистки архивов в {output_dir}")
export_dir = Path(output_dir)
files_with_dates = []
# Собираем файлы с их датами
for file in export_dir.glob("*.zip"):
# Извлекаем временную метку из имени файла
try:
# Разбиваем имя файла по шаблону: dashboard_export_YYYYMMDDTHHMMSS.zip
timestamp_str = file.stem.split('_')[-1]
date_str = timestamp_str.split('T')[0] # Отделяем дату от времени
date = datetime.datetime.strptime(date_str, "%Y%m%d").date()
except (ValueError, IndexError):
# Если не удалось распарсить - используем дату изменения файла
mtime = file.stat().st_mtime
date = datetime.datetime.fromtimestamp(mtime).date()
logger.warning(f"Использована дата модификации для {file.name}")
files_with_dates.append((file, date))
try:
# Сортируем файлы по дате (новые сначала)
files_with_dates.sort(key=lambda x: x[1], reverse=True)
# Создаем группы для разных уровней резервирования
daily_groups = {}
weekly_groups = {}
monthly_groups = {}
yearly_groups = {}
for file, date in files_with_dates:
# Группировка по дням
daily_groups.setdefault(date, file)
# Группировка по неделям
year, week, _ = date.isocalendar()
weekly_groups.setdefault((year, week), file)
# Группировка по месяцам
monthly_groups.setdefault((date.year, date.month), file)
# Группировка по годам
yearly_groups.setdefault(date.year, file)
# Выбираем файлы для сохранения
keep_files = set()
# Daily - последние N дней
sorted_daily = sorted(daily_groups.keys(), reverse=True)[
:daily_retention]
keep_files.update(daily_groups[d] for d in sorted_daily)
# Weekly - последние N недель
sorted_weekly = sorted(weekly_groups.keys(), reverse=True)[
:weekly_retention]
keep_files.update(weekly_groups[w] for w in sorted_weekly)
# Monthly - последние N месяцев
sorted_monthly = sorted(monthly_groups.keys(), reverse=True)[
:monthly_retention]
keep_files.update(monthly_groups[m] for m in sorted_monthly)
# Yearly - все или последние N лет
if yearly_retention is not None:
sorted_yearly = sorted(yearly_groups.keys(), reverse=True)[
:yearly_retention]
keep_files.update(yearly_groups[y] for y in sorted_yearly)
else:
keep_files.update(yearly_groups.values())
# Удаляем неподходящие файлы и директории
for file, _ in files_with_dates:
if file not in keep_files:
file.unlink()
unpacked_dir = export_dir / file.stem
if unpacked_dir.exists():
shutil.rmtree(unpacked_dir)
logger.info(f"Очистка завершена. Сохранено {len(keep_files)} файлов")
except Exception as e:
logger.error(f"Ошибка очистки архивов: {str(e)}", exc_info=True)
raise
def determine_and_load_yaml_type(file_path):
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
if 'dashboard_title' in data and 'position' in data:
return data, 'dashboard'
elif 'sqlalchemy_uri' in data and 'database_name' in data:
return data, 'database'
elif 'table_name' in data and ('sql' in data or 'columns' in data):
return data, 'dataset'
elif 'slice_name' in data and 'viz_type' in data:
return data, 'chart'
else:
return data, 'unknown'
def update_db_yaml(
db_config: Dict = None,
path: str = "dashboards",
logger: Optional[SupersetLogger] = None
) -> None:
"""
Обновляет конфигурации в YAML-файлах баз данных, заменяя старые значения на новые
:param db_config: Словарь с параметрами для замены в формате:
{
"old": {старые_ключи: значения_для_поиска},
"new": {новые_ключи: значения_для_замены}
}
:param path: Путь к папке с YAML-файлами
"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info("Старт обновления YAML-конфигов")
try:
db_config = db_config or {}
old_config = db_config.get("old", {}) # Значения для поиска
new_config = db_config.get("new", {}) # Значения для замены
databases_dir = Path(path)
yaml_files = databases_dir.rglob("*.yaml")
for file_path in yaml_files:
try:
result = determine_and_load_yaml_type(file_path)
data, yaml_type = result if result else ({}, None)
logger.debug(f"Тип {file_path} - {yaml_type}")
updates = {}
for key in old_config:
if key in data and data[key] == old_config[key]:
new_value = new_config.get(key)
if new_value is not None and new_value != data.get(key):
updates[key] = new_value
if updates:
logger.info(f"Обновление {file_path}: {updates}")
data.update(updates)
with open(file_path, 'w') as file:
yaml.dump(
data,
file,
default_flow_style=False,
sort_keys=False
)
except Exception as e:
logger.error(f"Ошибка обработки {file_path}: {str(e)}", exc_info=True)
except Exception as e:
logger.error(f"Критическая ошибка обновления: {str(e)}", exc_info=True)
raise
def sync_for_git(
source_path: str,
destination_path: str,
dry_run: bool = False,
logger: Optional[SupersetLogger] = None
) -> None:
"""
Синхронизирует содержимое папки source_path с destination_path.
Перезаписывает файлы в destination_path файлами из source_path.
Удаляет файлы и пустые директории в destination_path, которые отсутствуют в source_path
(исключая папку .git).
:param source_path: Путь к папке с данными (источник)
:param destination_path: Путь к папке назначения
:param dry_run: Режим имитации (не вносит реальных изменений)
:param verbose: Подробный вывод операций
"""
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info("Старт перезаписи целевой папки")
source_files = set()
for root, _, files in os.walk(source_path):
for file in files:
rel_path = os.path.relpath(os.path.join(root, file), source_path)
source_files.add(rel_path)
destination_files = set()
for root, _, files in os.walk(destination_path):
for file in files:
rel_path = os.path.relpath(
os.path.join(root, file), destination_path)
destination_files.add(rel_path)
# Копирование/обновление файлов
for file in source_files:
src = os.path.join(source_path, file)
dst = os.path.join(destination_path, file)
dest_dir = os.path.dirname(dst)
os.makedirs(dest_dir, exist_ok=True)
shutil.copy2(src, dst)
logger.info(f"Копируем: {file}")
# Удаление лишних файлов
files_to_delete = destination_files - source_files
git_dir = Path(destination_path) / ".git"
for file in files_to_delete:
target = Path(destination_path) / file
# Пропускаем .git и его содержимое
try:
if git_dir in target.parents or target == git_dir:
logger.info(f"Пропускаем .git: {target}")
continue
except ValueError:
pass
try:
if target.is_file():
target.unlink()
elif target.is_dir():
shutil.rmtree(target)
except OSError as e:
logger.error(f"Ошибка удаления: {target}: {e}")
# Удаление пустых директорий
git_dir = Path(destination_path) / ".git"
deleted_dirs = set()
# Проходим снизу вверх (от вложенных директорий к корневым)
for root, dirs, files in os.walk(destination_path, topdown=False):
for dir_name in dirs:
dir_path = Path(root) / dir_name
# Пропускаем .git и его поддиректории
try:
if git_dir in dir_path.parents or dir_path == git_dir:
continue
except ValueError:
pass
# Проверяем что директория пуста и не была удалена ранее
if dir_path not in deleted_dirs and not any(dir_path.iterdir()):
try:
dir_path.rmdir()
deleted_dirs.add(dir_path)
logger.info(f"Удаляем пустую директорию: {dir_path}")
except OSError as e:
logger.error(f"Ошибка удаления: {dir_path}: {e}")