import re import zipfile from pathlib import Path from typing import Optional, Tuple, Dict import datetime import shutil import yaml import tempfile import os from contextlib import contextmanager @contextmanager def create_temp_file(content: bytes, suffix: str = ".zip"): try: with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp: tmp.write(content) tmp.flush() yield Path(tmp.name) finally: if Path(tmp.name).exists(): Path(tmp.name).unlink() def save_and_unpack_dashboard( zip_content: bytes, output_dir: str = "dashboards", unpack: bool = False, original_filename: Optional[str] = None ) -> Tuple[Path, Path]: """Сохраняет и распаковывает дашборд с учетом оригинального имени""" try: output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) # Генерируем имя файла if original_filename: zip_name = sanitize_filename(original_filename) else: timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") zip_name = f"dashboard_export_{timestamp}.zip" zip_path = output_path / zip_name # Сохраняем ZIP-файл with open(zip_path, "wb") as f: f.write(zip_content) if unpack: # Распаковываем в поддиректорию с именем архива #extract_dir_name = zip_path.stem extract_path = output_path with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(extract_path) return zip_path, extract_path return zip_path except Exception as e: raise RuntimeError(f"Failed to unpack dashboard: {str(e)}") from e def get_filename_from_headers(headers: dict) -> Optional[str]: """Извлекает имя файла из заголовков HTTP-ответа""" content_disposition = headers.get("Content-Disposition", "") # Пытаемся найти имя файла в кавычках filename_match = re.findall(r'filename="(.+?)"', content_disposition) if not filename_match: # Пробуем без кавычек filename_match = re.findall(r'filename=([^;]+)', content_disposition) if filename_match: return filename_match[0].strip('"') return None def sanitize_filename(filename: str) -> str: """Очищает имя файла от потенциально опасных символов""" return re.sub(r'[\\/*?:"<>|]', "_", filename).strip() def archive_exports( output_dir: str, daily_retention: int = 7, weekly_retention: int = 4, monthly_retention: int = 12, yearly_retention: Optional[int] = None ): """Управление историей экспортов по политике GFS (Grandfather-Father-Son) Параметры: :daily_retention: :weekly_retention: :monthly_retention: :yearly_retention: Optional[int] Извлекает даты из стандартного суперсетовсого архива вида, либо берет дату изменения архива dashboard_export_20250326T121517.zip""" export_dir = Path(output_dir) files_with_dates = [] # Собираем файлы с их датами for file in export_dir.glob("*.zip"): # Извлекаем временную метку из имени файла try: # Разбиваем имя файла по шаблону: dashboard_export_YYYYMMDDTHHMMSS.zip timestamp_str = file.stem.split('_')[-1] date_str = timestamp_str.split('T')[0] # Отделяем дату от времени date = datetime.datetime.strptime(date_str, "%Y%m%d").date() except (ValueError, IndexError): # Если не удалось распарсить - используем дату изменения файла mtime = file.stat().st_mtime date = datetime.datetime.fromtimestamp(mtime).date() files_with_dates.append((file, date)) # Сортируем файлы по дате (новые сначала) files_with_dates.sort(key=lambda x: x[1], reverse=True) # Создаем группы для разных уровней резервирования daily_groups = {} weekly_groups = {} monthly_groups = {} yearly_groups = {} for file, date in files_with_dates: # Группировка по дням daily_groups.setdefault(date, file) # Группировка по неделям year, week, _ = date.isocalendar() weekly_groups.setdefault((year, week), file) # Группировка по месяцам monthly_groups.setdefault((date.year, date.month), file) # Группировка по годам yearly_groups.setdefault(date.year, file) # Выбираем файлы для сохранения keep_files = set() # Daily - последние N дней sorted_daily = sorted(daily_groups.keys(), reverse=True)[:daily_retention] keep_files.update(daily_groups[d] for d in sorted_daily) # Weekly - последние N недель sorted_weekly = sorted(weekly_groups.keys(), reverse=True)[:weekly_retention] keep_files.update(weekly_groups[w] for w in sorted_weekly) # Monthly - последние N месяцев sorted_monthly = sorted(monthly_groups.keys(), reverse=True)[:monthly_retention] keep_files.update(monthly_groups[m] for m in sorted_monthly) # Yearly - все или последние N лет if yearly_retention is not None: sorted_yearly = sorted(yearly_groups.keys(), reverse=True)[:yearly_retention] keep_files.update(yearly_groups[y] for y in sorted_yearly) else: keep_files.update(yearly_groups.values()) # Удаляем неподходящие файлы и директории for file, _ in files_with_dates: if file not in keep_files: file.unlink() # unpacked_dir = export_dir / file.stem # if unpacked_dir.exists(): # shutil.rmtree(unpacked_dir) def determine_and_load_yaml_type(file_path): with open(file_path, 'r') as f: data = yaml.safe_load(f) if 'dashboard_title' in data and 'position' in data: return data, 'dashboard' elif 'sqlalchemy_uri' in data and 'database_name' in data: return data, 'database' elif 'table_name' in data and ('sql' in data or 'columns' in data): return data, 'dataset' elif 'slice_name' in data and 'viz_type' in data: return data, 'chart' else: return data, 'unknown' def update_db_yaml(db_config: Dict = None, path: str = "dashboards", verbose: bool = False) -> None: """ Обновляет конфигурации в YAML-файлах баз данных согласно переданному словарю замен :param db_config: Словарь с параметрами для замены (ключ: значение) :param path: Путь к папке с YAML-файлами (по умолчанию 'databases') """ # Устанавливаем дефолтные значения db_config = db_config or {} # Ищем все YAML-файлы в указанной папке databases_dir = Path(path) yaml_files = databases_dir.rglob("*.yaml") for file_path in yaml_files: try: # Чтение и загрузка YAML data, yaml_type = determine_and_load_yaml_type(file_path) or {} # Обновляем только существующие ключи updates = { k: v for k, v in db_config.items() if ( k in data # ключ есть в data and data[k] != v # значение отличается and ( # для database — все ключи (yaml_type == "database") or # для dataset — исключаем uuid (yaml_type == "dataset" and k != "uuid") ) ) } # Обновляем data data.update(updates) if verbose and updates: print(f"Обработан {file_path}") print(updates) # Сохранение с сохранением структуры файла with open(file_path, 'w') as file: yaml.dump( data, file, default_flow_style=False, sort_keys=False, allow_unicode=True ) except Exception as e: print(f"Ошибка при обработке файла {file_path}: {str(e)}") def sync_for_git( source_path: str, destination_path: str, dry_run: bool = False, verbose: bool = False ) -> None: """ Синхронизирует содержимое папки source_path с destination_path. Перезаписывает файлы в destination_path файлами из source_path. Удаляет файлы и пустые директории в destination_path, которые отсутствуют в source_path (исключая папку .git). :param source_path: Путь к папке с данными (источник) :param destination_path: Путь к папке назначения :param dry_run: Режим имитации (не вносит реальных изменений) :param verbose: Подробный вывод операций """ source_files = set() for root, _, files in os.walk(source_path): for file in files: rel_path = os.path.relpath(os.path.join(root, file), source_path) source_files.add(rel_path) destination_files = set() for root, _, files in os.walk(destination_path): for file in files: rel_path = os.path.relpath(os.path.join(root, file), destination_path) destination_files.add(rel_path) # Копирование/обновление файлов for file in source_files: src = os.path.join(source_path, file) dst = os.path.join(destination_path, file) dest_dir = os.path.dirname(dst) if verbose: status = "[DRY-RUN] " if dry_run else "" print(f"{status}Creating directory: {dest_dir}") if not dry_run: os.makedirs(dest_dir, exist_ok=True) if verbose: status = "[DRY-RUN] " if dry_run else "" print(f"{status}Copying: {file}") if not dry_run: shutil.copy2(src, dst) # Удаление лишних файлов files_to_delete = destination_files - source_files git_dir = Path(destination_path) / ".git" for file in files_to_delete: target = Path(destination_path) / file # Пропускаем .git и его содержимое try: if git_dir in target.parents or target == git_dir: if verbose: print(f"Skipping .git item: {target}") continue except ValueError: pass if verbose: action = "Would delete" if dry_run else "Deleting" print(f"{action}: {target}") if not dry_run: try: if target.is_file(): target.unlink() elif target.is_dir(): shutil.rmtree(target) except OSError as e: print(f"Error deleting {target}: {e}") # Удаление пустых директорий if verbose: print("\nChecking for empty directories...") git_dir = Path(destination_path) / ".git" deleted_dirs = set() # Проходим снизу вверх (от вложенных директорий к корневым) for root, dirs, files in os.walk(destination_path, topdown=False): for dir_name in dirs: dir_path = Path(root) / dir_name # Пропускаем .git и его поддиректории try: if git_dir in dir_path.parents or dir_path == git_dir: continue except ValueError: pass # Проверяем что директория пуста и не была удалена ранее if dir_path not in deleted_dirs and not any(dir_path.iterdir()): if verbose: status = "[DRY-RUN] " if dry_run else "" print(f"{status}Deleting empty directory: {dir_path}") if not dry_run: try: dir_path.rmdir() deleted_dirs.add(dir_path) except OSError as e: print(f"Error deleting directory {dir_path}: {e}")