# -*- coding: utf-8 -*- # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument """ [MODULE] File Operations Manager @contract: Предоставляет набор утилит для управления файловыми операциями. """ # [IMPORTS] Core import os import re import zipfile from pathlib import Path from typing import Any, Optional, Tuple, Dict, List, Union, LiteralString from contextlib import contextmanager import tempfile from datetime import date, datetime import glob import shutil import zlib from dataclasses import dataclass # [IMPORTS] Third-party import yaml # [IMPORTS] Local from superset_tool.exceptions import InvalidZipFormatError from superset_tool.utils.logger import SupersetLogger # [CONSTANTS] ALLOWED_FOLDERS = {'databases', 'datasets', 'charts', 'dashboards'} # CONTRACT: # PURPOSE: Контекстный менеджер для создания временного файла или директории, гарантирующий их удаление после использования. # PRECONDITIONS: # - `suffix` должен быть строкой, представляющей расширение файла или `.dir` для директории. # - `mode` должен быть валидным режимом для записи в файл (например, 'wb' для бинарного). # POSTCONDITIONS: # - Создает временный ресурс (файл или директорию). # - Возвращает объект `Path` к созданному ресурсу. # - Автоматически удаляет ресурс при выходе из контекста `with`. # PARAMETERS: # - content: Optional[bytes] - Бинарное содержимое для записи во временный файл. # - suffix: str - Суффикс для ресурса. Если `.dir`, создается директория. # - mode: str - Режим записи в файл. # - logger: Optional[SupersetLogger] - Экземпляр логгера. # YIELDS: Path - Путь к временному ресурсу. # EXCEPTIONS: # - Перехватывает и логирует `Exception`, затем выбрасывает его дальше. @contextmanager def create_temp_file( content: Optional[bytes] = None, suffix: str = ".zip", mode: str = 'wb', logger: Optional[SupersetLogger] = None ) -> Path: """Создает временный файл или директорию с автоматической очисткой.""" logger = logger or SupersetLogger(name="fileio", console=False) temp_resource_path = None is_dir = suffix.startswith('.dir') try: if is_dir: with tempfile.TemporaryDirectory(suffix=suffix) as temp_dir: temp_resource_path = Path(temp_dir) logger.debug(f"[DEBUG][TEMP_RESOURCE] Создана временная директория: {temp_resource_path}") yield temp_resource_path else: with tempfile.NamedTemporaryFile(suffix=suffix, mode=mode, delete=False) as tmp: temp_resource_path = Path(tmp.name) if content: tmp.write(content) tmp.flush() logger.debug(f"[DEBUG][TEMP_RESOURCE] Создан временный файл: {temp_resource_path}") yield temp_resource_path except IOError as e: logger.error(f"[STATE][TEMP_RESOURCE] Ошибка создания временного ресурса: {str(e)}", exc_info=True) raise finally: if temp_resource_path and temp_resource_path.exists(): if is_dir: shutil.rmtree(temp_resource_path, ignore_errors=True) logger.debug(f"[DEBUG][TEMP_CLEANUP] Удалена временная директория: {temp_resource_path}") else: temp_resource_path.unlink(missing_ok=True) logger.debug(f"[DEBUG][TEMP_CLEANUP] Удален временный файл: {temp_resource_path}") # END_FUNCTION_create_temp_file # [SECTION] Directory Management Utilities # CONTRACT: # PURPOSE: Рекурсивно удаляет все пустые поддиректории, начиная с указанной корневой директории. # PRECONDITIONS: # - `root_dir` должен быть строкой, представляющей существующий путь к директории. # POSTCONDITIONS: # - Все пустые директории внутри `root_dir` удалены. # - Непустые директории и файлы остаются нетронутыми. # PARAMETERS: # - root_dir: str - Путь к корневой директории для очистки. # - logger: Optional[SupersetLogger] - Экземпляр логгера. # RETURN: int - Количество удаленных директорий. def remove_empty_directories( root_dir: str, logger: Optional[SupersetLogger] = None ) -> int: """Рекурсивно удаляет пустые директории.""" logger = logger or SupersetLogger(name="fileio", console=False) logger.info(f"[STATE][DIR_CLEANUP] Запуск очистки пустых директорий в {root_dir}") removed_count = 0 root_path = Path(root_dir) if not root_path.is_dir(): logger.error(f"[STATE][DIR_NOT_FOUND] Директория не существует или не является директорией: {root_dir}") return 0 for current_dir, _, _ in os.walk(root_path, topdown=False): if not os.listdir(current_dir): try: os.rmdir(current_dir) removed_count += 1 logger.info(f"[STATE][DIR_REMOVED] Удалена пустая директория: {current_dir}") except OSError as e: logger.error(f"[STATE][DIR_REMOVE_FAILED] Ошибка удаления {current_dir}: {str(e)}") logger.info(f"[STATE][DIR_CLEANUP_DONE] Удалено {removed_count} пустых директорий.") return removed_count # END_FUNCTION_remove_empty_directories # [SECTION] File Operations # CONTRACT: # PURPOSE: Читает бинарное содержимое файла с диска. # PRECONDITIONS: # - `file_path` должен быть строкой, представляющей существующий путь к файлу. # POSTCONDITIONS: # - Возвращает кортеж, содержащий бинарное содержимое файла и его имя. # PARAMETERS: # - file_path: str - Путь к файлу. # - logger: Optional[SupersetLogger] - Экземпляр логгера. # RETURN: Tuple[bytes, str] - (содержимое, имя_файла). # EXCEPTIONS: # - `FileNotFoundError`, если файл не найден. def read_dashboard_from_disk( file_path: str, logger: Optional[SupersetLogger] = None ) -> Tuple[bytes, str]: """Читает сохраненный дашборд с диска.""" logger = logger or SupersetLogger(name="fileio", console=False) path = Path(file_path) if not path.is_file(): logger.error(f"[STATE][FILE_NOT_FOUND] Файл не найден: {file_path}") raise FileNotFoundError(f"Файл дашборда не найден: {file_path}") logger.info(f"[STATE][FILE_READ] Чтение файла с диска: {file_path}") content = path.read_bytes() if not content: logger.warning(f"[STATE][FILE_EMPTY] Файл {file_path} пуст.") return content, path.name # END_FUNCTION_read_dashboard_from_disk # [SECTION] Archive Management # CONTRACT: # PURPOSE: Вычисляет контрольную сумму CRC32 для файла. # PRECONDITIONS: # - `file_path` должен быть валидным путем к существующему файлу. # POSTCONDITIONS: # - Возвращает строку с 8-значным шестнадцатеричным представлением CRC32. # PARAMETERS: # - file_path: Path - Путь к файлу. # RETURN: str - Контрольная сумма CRC32. # EXCEPTIONS: # - `FileNotFoundError`, `IOError` при ошибках I/O. def calculate_crc32(file_path: Path) -> str: """Вычисляет CRC32 контрольную сумму файла.""" try: with open(file_path, 'rb') as f: crc32_value = zlib.crc32(f.read()) return f"{crc32_value:08x}" except FileNotFoundError: raise except IOError as e: raise IOError(f"Ошибка вычисления CRC32 для {file_path}: {str(e)}") from e # END_FUNCTION_calculate_crc32 @dataclass class RetentionPolicy: """Политика хранения для архивов.""" daily: int = 7 weekly: int = 4 monthly: int = 12 # CONTRACT: # PURPOSE: Управляет архивом экспортированных дашбордов, применяя политику хранения (ротацию) и дедупликацию. # PRECONDITIONS: # - `output_dir` должен быть существующей директорией. # POSTCONDITIONS: # - Устаревшие архивы удалены в соответствии с политикой. # - Дубликаты файлов (если `deduplicate=True`) удалены. # PARAMETERS: # - output_dir: str - Директория с архивами. # - policy: RetentionPolicy - Политика хранения. # - deduplicate: bool - Флаг для включения удаления дубликатов по CRC32. # - logger: Optional[SupersetLogger] - Экземпляр логгера. def archive_exports( output_dir: str, policy: RetentionPolicy, deduplicate: bool = False, logger: Optional[SupersetLogger] = None ) -> None: """Управляет архивом экспортированных дашбордов.""" logger = logger or SupersetLogger(name="fileio", console=False) output_path = Path(output_dir) if not output_path.is_dir(): logger.warning(f"[WARN][ARCHIVE] Директория архива не найдена: {output_dir}") return logger.info(f"[INFO][ARCHIVE] Запуск управления архивом в {output_dir}") # 1. Дедупликация if deduplicate: checksums = {} duplicates_removed = 0 for file_path in output_path.glob('*.zip'): try: crc32 = calculate_crc32(file_path) if crc32 in checksums: logger.info(f"[INFO][DEDUPLICATE] Найден дубликат: {file_path} (CRC32: {crc32}). Удаление.") file_path.unlink() duplicates_removed += 1 else: checksums[crc32] = file_path except (IOError, FileNotFoundError) as e: logger.error(f"[ERROR][DEDUPLICATE] Ошибка обработки файла {file_path}: {e}") logger.info(f"[INFO][DEDUPLICATE] Удалено дубликатов: {duplicates_removed}") # 2. Политика хранения try: files_with_dates = [] for file_path in output_path.glob('*.zip'): try: # Извлекаем дату из имени файла, например 'dashboard_export_20231027_103000.zip' match = re.search(r'(\d{8})', file_path.name) if match: file_date = datetime.strptime(match.group(1), "%Y%m%d").date() files_with_dates.append((file_path, file_date)) except (ValueError, IndexError) as e: logger.warning(f"[WARN][RETENTION] Не удалось извлечь дату из имени файла {file_path.name}: {e}") if not files_with_dates: logger.info("[INFO][RETENTION] Не найдено файлов для применения политики хранения.") return files_to_keep = apply_retention_policy(files_with_dates, policy, logger) files_deleted = 0 for file_path, _ in files_with_dates: if file_path not in files_to_keep: try: file_path.unlink() logger.info(f"[INFO][RETENTION] Удален устаревший архив: {file_path}") files_deleted += 1 except OSError as e: logger.error(f"[ERROR][RETENTION] Не удалось удалить файл {file_path}: {e}") logger.info(f"[INFO][RETENTION] Политика хранения применена. Удалено файлов: {files_deleted}.") except Exception as e: logger.error(f"[CRITICAL][ARCHIVE] Критическая ошибка при управлении архивом: {e}", exc_info=True) # END_FUNCTION_archive_exports # CONTRACT: # PURPOSE: (HELPER) Применяет политику хранения к списку файлов с датами. # PRECONDITIONS: # - `files_with_dates` - список кортежей (Path, date). # POSTCONDITIONS: # - Возвращает множество объектов `Path`, которые должны быть сохранены. # PARAMETERS: # - files_with_dates: List[Tuple[Path, date]] - Список файлов. # - policy: RetentionPolicy - Политика хранения. # - logger: SupersetLogger - Логгер. # RETURN: set - Множество файлов для сохранения. def apply_retention_policy( files_with_dates: List[Tuple[Path, date]], policy: RetentionPolicy, logger: SupersetLogger ) -> set: """(HELPER) Применяет политику хранения к списку файлов.""" if not files_with_dates: return set() today = date.today() files_to_keep = set() # Сортируем файлы от новых к старым files_with_dates.sort(key=lambda x: x[1], reverse=True) # Группируем по дням, неделям, месяцам daily_backups = {} weekly_backups = {} monthly_backups = {} for file_path, file_date in files_with_dates: # Daily if (today - file_date).days < policy.daily: if file_date not in daily_backups: daily_backups[file_date] = file_path # Weekly week_key = file_date.isocalendar()[:2] # (year, week) if week_key not in weekly_backups: weekly_backups[week_key] = file_path # Monthly month_key = (file_date.year, file_date.month) if month_key not in monthly_backups: monthly_backups[month_key] = file_path # Собираем файлы для сохранения, применяя лимиты files_to_keep.update(list(daily_backups.values())[:policy.daily]) files_to_keep.update(list(weekly_backups.values())[:policy.weekly]) files_to_keep.update(list(monthly_backups.values())[:policy.monthly]) logger.info(f"[INFO][RETENTION_POLICY] Файлов для сохранения после применения политики: {len(files_to_keep)}") return files_to_keep # END_FUNCTION_apply_retention_policy # CONTRACT: # PURPOSE: Сохраняет бинарное содержимое ZIP-архива на диск и опционально распаковывает его. # PRECONDITIONS: # - `zip_content` должен быть валидным содержимым ZIP-файла в байтах. # - `output_dir` должен быть путем, доступным для записи. # POSTCONDITIONS: # - ZIP-архив сохранен в `output_dir`. # - Если `unpack=True`, архив распакован в ту же директорию. # - Возвращает пути к созданному ZIP-файлу и, если применимо, к директории с распакованным содержимым. # PARAMETERS: # - zip_content: bytes - Содержимое ZIP-архива. # - output_dir: Union[str, Path] - Директория для сохранения. # - unpack: bool - Флаг, нужно ли распаковывать архив. # - original_filename: Optional[str] - Исходное имя файла. # - logger: Optional[SupersetLogger] - Экземпляр логгера. # RETURN: Tuple[Path, Optional[Path]] - (путь_к_zip, путь_к_распаковке_или_None). # EXCEPTIONS: # - `InvalidZipFormatError` при ошибке формата ZIP. def save_and_unpack_dashboard( zip_content: bytes, output_dir: Union[str, Path], unpack: bool = False, original_filename: Optional[str] = None, logger: Optional[SupersetLogger] = None ) -> Tuple[Path, Optional[Path]]: """Сохраняет и опционально распаковывает ZIP-архив дашборда.""" logger = logger or SupersetLogger(name="fileio", console=False) logger.info(f"[STATE] Старт обработки дашборда. Распаковка: {unpack}") try: output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) logger.debug(f"[DEBUG] Директория {output_path} создана/проверена") zip_name = sanitize_filename(original_filename) if original_filename else None if not zip_name: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") zip_name = f"dashboard_export_{timestamp}.zip" logger.debug(f"[DEBUG] Сгенерировано имя файла: {zip_name}") zip_path = output_path / zip_name logger.info(f"[STATE] Сохранение дашборда в: {zip_path}") with open(zip_path, "wb") as f: f.write(zip_content) if unpack: with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(output_path) logger.info(f"[STATE] Дашборд распакован в: {output_path}") return zip_path, output_path return zip_path, None except zipfile.BadZipFile as e: logger.error(f"[STATE][ZIP_ERROR] Невалидный ZIP-архив: {str(e)}") raise InvalidZipFormatError(f"Invalid ZIP file: {str(e)}") from e except Exception as e: logger.error(f"[STATE][UNPACK_ERROR] Ошибка обработки: {str(e)}", exc_info=True) raise # END_FUNCTION_save_and_unpack_dashboard # CONTRACT: # PURPOSE: (HELPER) Рекурсивно обрабатывает значения в YAML-структуре, применяя замену по регулярному выражению. # PRECONDITIONS: `value` может быть строкой, словарем или списком. # POSTCONDITIONS: Возвращает кортеж с флагом о том, было ли изменение, и новым значением. # PARAMETERS: # - name: value, type: Any, description: Значение для обработки. # - name: regexp_pattern, type: str, description: Паттерн для поиска. # - name: replace_string, type: str, description: Строка для замены. # RETURN: type: Tuple[bool, Any] def _process_yaml_value(value: Any, regexp_pattern: str, replace_string: str) -> Tuple[bool, Any]: matched = False if isinstance(value, str): new_str = re.sub(regexp_pattern, replace_string, value) matched = new_str != value return matched, new_str if isinstance(value, dict): new_dict = {} for k, v in value.items(): sub_matched, sub_val = _process_yaml_value(v, regexp_pattern, replace_string) new_dict[k] = sub_val if sub_matched: matched = True return matched, new_dict if isinstance(value, list): new_list = [] for item in value: sub_matched, sub_val = _process_yaml_value(item, regexp_pattern, replace_string) new_list.append(sub_val) if sub_matched: matched = True return matched, new_list return False, value # END_FUNCTION__process_yaml_value # CONTRACT: # PURPOSE: (HELPER) Обновляет один YAML файл на основе предоставленных конфигураций. # PRECONDITIONS: # - `file_path` - существующий YAML файл. # - `db_configs` - список словарей для замены. # POSTCONDITIONS: Файл обновлен. # PARAMETERS: # - name: file_path, type: Path, description: Путь к YAML файлу. # - name: db_configs, type: Optional[List[Dict]], description: Конфигурации для замены. # - name: regexp_pattern, type: Optional[str], description: Паттерн для поиска. # - name: replace_string, type: Optional[str], description: Строка для замены. # - name: logger, type: SupersetLogger, description: Экземпляр логгера. # RETURN: type: None def _update_yaml_file( file_path: Path, db_configs: Optional[List[Dict]], regexp_pattern: Optional[str], replace_string: Optional[str], logger: SupersetLogger ) -> None: try: with open(file_path, 'r', encoding='utf-8') as f: data = yaml.safe_load(f) updates = {} if db_configs: for config in db_configs: if config is not None: if "old" not in config or "new" not in config: raise ValueError("db_config должен содержать оба раздела 'old' и 'new'") old_config = config.get("old", {}) new_config = config.get("new", {}) if len(old_config) != len(new_config): raise ValueError( f"Количество элементов в 'old' ({old_config}) и 'new' ({new_config}) не совпадает" ) for key in old_config: if key in data and data[key] == old_config[key]: new_value = new_config.get(key) if new_value is not None and new_value != data.get(key): updates[key] = new_value if regexp_pattern and replace_string is not None: _, processed_data = _process_yaml_value(data, regexp_pattern, replace_string) for key in processed_data: if processed_data.get(key) != data.get(key): updates[key] = processed_data[key] if updates: logger.info(f"[STATE] Обновление {file_path}: {updates}") data.update(updates) with open(file_path, 'w', encoding='utf-8') as file: yaml.dump( data, file, default_flow_style=False, sort_keys=False ) except yaml.YAMLError as e: logger.error(f"[STATE][YAML_ERROR] Ошибка парсинга {file_path}: {str(e)}") # END_FUNCTION__update_yaml_file # [ENTITY: Function('update_yamls')] # CONTRACT: # PURPOSE: Обновляет конфигурации в YAML-файлах баз данных, заменяя старые значения на новые, а также применяя замены по регулярному выражению. # SPECIFICATION_LINK: func_update_yamls # PRECONDITIONS: # - `path` должен быть валидным путем к директории с YAML файлами. # - `db_configs` должен быть списком словарей, каждый из которых содержит ключи 'old' и 'new'. # POSTCONDITIONS: Все найденные YAML файлы в директории `path` обновлены в соответствии с предоставленными конфигурациями. # PARAMETERS: # - name: db_configs, type: Optional[List[Dict]], description: Список конфигураций для замены. # - name: path, type: str, description: Путь к директории с YAML файлами. # - name: regexp_pattern, type: Optional[LiteralString], description: Паттерн для поиска. # - name: replace_string, type: Optional[LiteralString], description: Строка для замены. # - name: logger, type: Optional[SupersetLogger], description: Экземпляр логгера. # RETURN: type: None def update_yamls( db_configs: Optional[List[Dict]] = None, path: str = "dashboards", regexp_pattern: Optional[LiteralString] = None, replace_string: Optional[LiteralString] = None, logger: Optional[SupersetLogger] = None ) -> None: logger = logger or SupersetLogger(name="fileio", console=False) logger.info("[STATE][YAML_UPDATE] Старт обновления конфигураций") if isinstance(db_configs, dict): db_configs = [db_configs] elif db_configs is None: db_configs = [] try: dir_path = Path(path) if not dir_path.exists() or not dir_path.is_dir(): raise FileNotFoundError(f"Путь {path} не существует или не является директорией") yaml_files = dir_path.rglob("*.yaml") for file_path in yaml_files: _update_yaml_file(file_path, db_configs, regexp_pattern, replace_string, logger) except (IOError, ValueError) as e: logger.error(f"[STATE][YAML_UPDATE_ERROR] Критическая ошибка: {str(e)}", exc_info=True) raise # END_FUNCTION_update_yamls # [ENTITY: Function('create_dashboard_export')] # CONTRACT: # PURPOSE: Создает ZIP-архив дашборда из указанных исходных путей. # SPECIFICATION_LINK: func_create_dashboard_export # PRECONDITIONS: # - `zip_path` - валидный путь для сохранения архива. # - `source_paths` - список существующих путей к файлам/директориям для архивации. # POSTCONDITIONS: Возвращает `True` в случае успешного создания архива, иначе `False`. # PARAMETERS: # - name: zip_path, type: Union[str, Path], description: Путь для сохранения ZIP архива. # - name: source_paths, type: List[Union[str, Path]], description: Список исходных путей. # - name: exclude_extensions, type: Optional[List[str]], description: Список исключаемых расширений. # - name: logger, type: Optional[SupersetLogger], description: Экземпляр логгера. # RETURN: type: bool def create_dashboard_export( zip_path: Union[str, Path], source_paths: List[Union[str, Path]], exclude_extensions: Optional[List[str]] = None, logger: Optional[SupersetLogger] = None ) -> bool: logger = logger or SupersetLogger(name="fileio", console=False) logger.info(f"[STATE] Упаковка дашбордов: {source_paths} -> {zip_path}") try: exclude_ext = [ext.lower() for ext in exclude_extensions] if exclude_extensions else [] with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for path in source_paths: path = Path(path) if not path.exists(): raise FileNotFoundError(f"Путь не найден: {path}") for item in path.rglob('*'): if item.is_file() and item.suffix.lower() not in exclude_ext: arcname = item.relative_to(path.parent) zipf.write(item, arcname) logger.debug(f"[DEBUG] Добавлен в архив: {arcname}") logger.info(f"[STATE]архив создан: {zip_path}") return True except (IOError, zipfile.BadZipFile) as e: logger.error(f"[STATE][ZIP_CREATION_ERROR] Ошибка: {str(e)}", exc_info=True) return False # END_FUNCTION_create_dashboard_export # [ENTITY: Function('sanitize_filename')] # CONTRACT: # PURPOSE: Очищает строку, предназначенную для имени файла, от недопустимых символов. # SPECIFICATION_LINK: func_sanitize_filename # PRECONDITIONS: `filename` является строкой. # POSTCONDITIONS: Возвращает строку, безопасную для использования в качестве имени файла. # PARAMETERS: # - name: filename, type: str, description: Исходное имя файла. # RETURN: type: str def sanitize_filename(filename: str) -> str: return re.sub(r'[\\/*?:"<>|]', "_", filename).strip() # END_FUNCTION_sanitize_filename # [ENTITY: Function('get_filename_from_headers')] # CONTRACT: # PURPOSE: Извлекает имя файла из HTTP заголовка 'Content-Disposition'. # SPECIFICATION_LINK: func_get_filename_from_headers # PRECONDITIONS: `headers` - словарь HTTP заголовков. # POSTCONDITIONS: Возвращает имя файла или `None`, если оно не найдено. # PARAMETERS: # - name: headers, type: dict, description: Словарь HTTP заголовков. # RETURN: type: Optional[str] def get_filename_from_headers(headers: dict) -> Optional[str]: content_disposition = headers.get("Content-Disposition", "") filename_match = re.findall(r'filename="(.+?)"', content_disposition) if not filename_match: filename_match = re.findall(r'filename=([^;]+)', content_disposition) if filename_match: return filename_match[0].strip('"') return None # END_FUNCTION_get_filename_from_headers # [ENTITY: Function('consolidate_archive_folders')] # CONTRACT: # PURPOSE: Консолидирует директории архивов дашбордов на основе общего слага в имени. # SPECIFICATION_LINK: func_consolidate_archive_folders # PRECONDITIONS: `root_directory` - существующая директория. # POSTCONDITIONS: Содержимое всех директорий с одинаковым слагом переносится в самую последнюю измененную директорию. # PARAMETERS: # - name: root_directory, type: Path, description: Корневая директория для консолидации. # - name: logger, type: Optional[SupersetLogger], description: Экземпляр логгера. # RETURN: type: None def consolidate_archive_folders(root_directory: Path, logger: Optional[SupersetLogger] = None) -> None: logger = logger or SupersetLogger(name="fileio", console=False) if not isinstance(root_directory, Path): raise TypeError("root_directory must be a Path object.") if not root_directory.is_dir(): raise ValueError("root_directory must be an existing directory.") logger.debug("[DEBUG] Checking root_folder: {root_directory}") slug_pattern = re.compile(r"([A-Z]{2}-\d{4})") dashboards_by_slug: dict[str, list[str]] = {} for folder_name in glob.glob(os.path.join(root_directory, '*')): if os.path.isdir(folder_name): logger.debug(f"[DEBUG] Checking folder: {folder_name}") match = slug_pattern.search(folder_name) if match: slug = match.group(1) logger.info(f"[STATE] Found slug: {slug} in folder: {folder_name}") if slug not in dashboards_by_slug: dashboards_by_slug[slug] = [] dashboards_by_slug[slug].append(folder_name) else: logger.debug(f"[DEBUG] No slug found in folder: {folder_name}") else: logger.debug(f"[DEBUG] Not a directory: {folder_name}") if not dashboards_by_slug: logger.warning("[STATE] No folders found matching the slug pattern.") return for slug, folder_list in dashboards_by_slug.items(): latest_folder = max(folder_list, key=os.path.getmtime) logger.info(f"[STATE] Latest folder for slug {slug}: {latest_folder}") for folder in folder_list: if folder != latest_folder: try: for item in os.listdir(folder): s = os.path.join(folder, item) d = os.path.join(latest_folder, item) shutil.move(s, d) logger.info(f"[STATE] Moved contents of {folder} to {latest_folder}") shutil.rmtree(folder) # Remove empty folder logger.info(f"[STATE] Removed empty folder: {folder}") except (IOError, shutil.Error) as e: logger.error(f"[STATE] Failed to move contents of {folder} to {latest_folder}: {e}", exc_info=True) logger.info("[STATE] Dashboard consolidation completed.") # END_FUNCTION_consolidate_archive_folders # END_MODULE_fileio