From 4cbee526b8aba77601fb08cd2f42ccbd7f8a8604 Mon Sep 17 00:00:00 2001
From: Volobuev Andrey
Date: Fri, 31 Oct 2025 15:23:23 +0300
Subject: [PATCH] fileio functions

---
 search_script.py              |   4 +-
 superset_tool/utils/fileio.py | 129 +++++++++++++++++++++++++++++-----
 2 files changed, 115 insertions(+), 18 deletions(-)

diff --git a/search_script.py b/search_script.py
index d34c83b..9bb7914 100644
--- a/search_script.py
+++ b/search_script.py
@@ -129,8 +129,8 @@ def main():
     logger = SupersetLogger(level=logging.INFO, console=True)
     clients = setup_clients(logger)
 
-    target_client = clients['dev']
-    search_query = r"match(r2.path_code, budget_reference.ref_code || '($|(\s))')"
+    target_client = clients['prod']
+    search_query = r".account_balance_by_contract"
 
     results = search_datasets(
         client=target_client,
diff --git a/superset_tool/utils/fileio.py b/superset_tool/utils/fileio.py
index 89e3788..4ba3fbf 100644
--- a/superset_tool/utils/fileio.py
+++ b/superset_tool/utils/fileio.py
@@ -152,10 +152,32 @@ def archive_exports(output_dir: str, policy: RetentionPolicy, deduplicate: bool
 # @PARAM: policy: RetentionPolicy - The retention policy.
 # @PARAM: logger: SupersetLogger - The logger.
 # @RETURN: set - The set of file paths that must be kept.
-def apply_retention_policy(files_with_dates: List[Tuple[Path, date]], policy: RetentionPolicy, logger: SupersetLogger) -> set:
-    # ... (policy application logic) ...
-    return set()
-#
+def apply_retention_policy(files_with_dates: List[Tuple[Path, date]], policy: RetentionPolicy, logger: SupersetLogger) -> set:
+    # Sort by date, newest first
+    sorted_files = sorted(files_with_dates, key=lambda x: x[1], reverse=True)
+    # Buckets for the files in each retention category
+    daily_files = []
+    weekly_files = []
+    monthly_files = []
+    today = date.today()
+    for file_path, file_date in sorted_files:
+        # Daily window
+        if (today - file_date).days < policy.daily:
+            daily_files.append(file_path)
+        # Weekly window
+        elif (today - file_date).days < policy.weekly * 7:
+            weekly_files.append(file_path)
+        # Monthly window (a month is approximated as 30 days)
+        elif (today - file_date).days < policy.monthly * 30:
+            monthly_files.append(file_path)
+    # Collect the set of files that must be kept
+    files_to_keep = set()
+    files_to_keep.update(daily_files)
+    files_to_keep.update(weekly_files[:policy.weekly])
+    files_to_keep.update(monthly_files[:policy.monthly])
+    logger.debug("[apply_retention_policy][State] Keeping %d files according to retention policy", len(files_to_keep))
+    return files_to_keep
+#
 
 #
 # @PURPOSE: Saves the binary contents of a ZIP archive to disk and optionally unpacks it.
@@ -211,10 +233,44 @@ def update_yamls(db_configs: Optional[List[Dict]] = None, path: str = "dashboard
 #
 # @PURPOSE: (Helper) Updates a single YAML file.
 # @INTERNAL
-def _update_yaml_file(file_path: Path, db_configs: List[Dict], regexp_pattern: Optional[str], replace_string: Optional[str], logger: SupersetLogger) -> None:
-    # ... (single-file update logic) ...
-    pass
-#
+def _update_yaml_file(file_path: Path, db_configs: List[Dict], regexp_pattern: Optional[str], replace_string: Optional[str], logger: SupersetLogger) -> None:
+    # Read the file contents
+    try:
+        with open(file_path, 'r', encoding='utf-8') as f:
+            content = f.read()
+    except Exception as e:
+        logger.error("[_update_yaml_file][Failure] Failed to read %s: %s", file_path, e)
+        return
+    # If a pattern and a replacement are given, apply the regex substitution first
+    if regexp_pattern and replace_string is not None:
+        try:
+            new_content = re.sub(regexp_pattern, replace_string, content)
+            if new_content != content:
+                content = new_content  # carry the regex result into the YAML step below
+                file_path.write_text(content, encoding='utf-8')
+                logger.info("[_update_yaml_file][State] Updated %s using regex pattern", file_path)
+        except Exception as e:
+            logger.error("[_update_yaml_file][Failure] Error applying regex to %s: %s", file_path, e)
+    # If DB configurations are given, replace the matching values
+    if db_configs:
+        try:
+            parsed_data = yaml.safe_load(content)
+            if not isinstance(parsed_data, dict):
+                logger.warning("[_update_yaml_file][Warning] YAML content is not a dictionary in %s", file_path)
+                return
+            # Update the parsed data
+            for config in db_configs:
+                for key, value in config.items():
+                    if key in parsed_data:
+                        old_value = parsed_data[key]
+                        parsed_data[key] = value
+                        logger.info("[_update_yaml_file][State] Changed %s.%s from %s to %s", file_path, key, old_value, value)
+            # Write the result back
+            with open(file_path, 'w', encoding='utf-8') as f:
+                yaml.dump(parsed_data, f, default_flow_style=False, allow_unicode=True)
+        except Exception as e:
+            logger.error("[_update_yaml_file][Failure] Error updating YAML in %s: %s", file_path, e)
+#
 
 #
 # @PURPOSE: Creates a ZIP archive from the given source paths.
@@ -267,14 +323,55 @@ def get_filename_from_headers(headers: dict) -> Optional[str]:
 # @PARAM: root_directory: Path - The root directory to consolidate.
 # @PARAM: logger: Optional[SupersetLogger] - Logger instance.
 # @THROW: TypeError, ValueError - If `root_directory` is invalid.
-def consolidate_archive_folders(root_directory: Path, logger: Optional[SupersetLogger] = None) -> None:
-    logger = logger or SupersetLogger(name="fileio")
-    assert isinstance(root_directory, Path), "root_directory must be a Path object."
-    assert root_directory.is_dir(), "root_directory must be an existing directory."
-
-    logger.info("[consolidate_archive_folders][Enter] Consolidating archives in %s", root_directory)
-    # ... (consolidation logic) ...
-#
+def consolidate_archive_folders(root_directory: Path, logger: Optional[SupersetLogger] = None) -> None:
+    logger = logger or SupersetLogger(name="fileio")
+    assert isinstance(root_directory, Path), "root_directory must be a Path object."
+    assert root_directory.is_dir(), "root_directory must be an existing directory."
+
+    logger.info("[consolidate_archive_folders][Enter] Consolidating archives in %s", root_directory)
+    # Collect every directory that contains archives
+    archive_dirs = []
+    for item in root_directory.iterdir():
+        if item.is_dir():
+            # Check whether the directory holds any ZIP archives
+            if any(item.glob("*.zip")):
+                archive_dirs.append(item)
+    # Group by slug (the part of the name before the first '_')
+    slug_groups = {}
+    for dir_path in archive_dirs:
+        dir_name = dir_path.name
+        slug = dir_name.split('_')[0] if '_' in dir_name else dir_name
+        if slug not in slug_groups:
+            slug_groups[slug] = []
+        slug_groups[slug].append(dir_path)
+    # Consolidate each group with more than one directory
+    for slug, dirs in slug_groups.items():
+        if len(dirs) <= 1:
+            continue
+        # Create the target directory
+        target_dir = root_directory / slug
+        target_dir.mkdir(exist_ok=True)
+        logger.info("[consolidate_archive_folders][State] Consolidating %d directories under %s", len(dirs), target_dir)
+        # Move the contents (materialize the listing, since we mutate the directory)
+        for source_dir in dirs:
+            if source_dir == target_dir:
+                continue
+            for item in list(source_dir.iterdir()):
+                dest_item = target_dir / item.name
+                try:
+                    if dest_item.exists():
+                        logger.warning("[consolidate_archive_folders][Warning] Skipping %s: %s already exists", item, dest_item)
+                        continue
+                    shutil.move(str(item), str(dest_item))
+                except Exception as e:
+                    logger.error("[consolidate_archive_folders][Failure] Failed to move %s to %s: %s", item, dest_item, e)
+            # Remove the now-empty source directory
+            try:
+                source_dir.rmdir()
+                logger.info("[consolidate_archive_folders][State] Removed source directory: %s", source_dir)
+            except Exception as e:
+                logger.error("[consolidate_archive_folders][Failure] Failed to remove source directory %s: %s", source_dir, e)
+#
 
 # --- End of module code ---
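
Reviewer note: a minimal smoke test for the new apply_retention_policy, sketching how the windowed buckets behave. It assumes RetentionPolicy can be built with integer daily/weekly/monthly fields (its definition is not part of this patch) and that SupersetLogger lives in superset_tool.utils.logger; both locations are guesses, not confirmed by the diff.

from datetime import date, timedelta
from pathlib import Path

from superset_tool.utils.fileio import apply_retention_policy, RetentionPolicy  # RetentionPolicy location assumed
from superset_tool.utils.logger import SupersetLogger  # hypothetical module path

# One synthetic export per day for the last 90 days, newest first.
today = date.today()
exports = [(Path(f"dashboards_{i:02d}.zip"), today - timedelta(days=i)) for i in range(90)]

policy = RetentionPolicy(daily=7, weekly=4, monthly=3)  # assumed constructor
keep = apply_retention_policy(exports, policy, SupersetLogger(name="demo"))

# Expected: every file younger than 7 days, plus the newest 4 files in the
# 7..27-day window and the newest 3 in the 28..89-day window.
print(sorted(p.name for p in keep))

Note that the daily window is deliberately uncapped: everything younger than policy.daily days survives, while the weekly and monthly buckets are truncated to the newest policy.weekly and policy.monthly entries.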
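
A similar sketch for consolidate_archive_folders, showing the slug grouping (everything before the first '_' in a folder name): the two sales_* export folders collapse into a single sales/ directory, while the lone finance_* folder is left alone. The logger import is again an assumed path.

import tempfile
from pathlib import Path

from superset_tool.utils.fileio import consolidate_archive_folders
from superset_tool.utils.logger import SupersetLogger  # hypothetical module path

root = Path(tempfile.mkdtemp())
for name in ("sales_2025-10-01", "sales_2025-10-15", "finance_2025-10-15"):
    d = root / name
    d.mkdir()
    # 22-byte end-of-central-directory record: the smallest valid empty ZIP
    (d / f"{name}.zip").write_bytes(b"PK\x05\x06" + b"\x00" * 18)

consolidate_archive_folders(root, SupersetLogger(name="demo"))

# Expected layout: sales/ now holds both sales archives, and the emptied
# sales_* folders are gone; finance_2025-10-15/ is untouched because only
# one folder shares the "finance" slug.
print(sorted(p.relative_to(root).as_posix() for p in root.rglob("*.zip")))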
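
Finally, a sketch of the two-stage _update_yaml_file flow: the regex substitution runs first, then the keyed replacement driven by db_configs, which exercises the fix where the regex result is carried into the YAML step instead of being overwritten by it. The helper is marked @INTERNAL, so calling it directly is for illustration only; the public update_yamls wrapper is the real entry point.

import tempfile
from pathlib import Path

from superset_tool.utils.fileio import _update_yaml_file  # internal helper, called directly only for illustration
from superset_tool.utils.logger import SupersetLogger  # hypothetical module path

yml = Path(tempfile.mkdtemp()) / "database.yaml"
yml.write_text(
    "database_name: dev_dwh\nsqlalchemy_uri: postgresql://dev-host/db\n",
    encoding="utf-8",
)

_update_yaml_file(
    file_path=yml,
    db_configs=[{"database_name": "prod_dwh"}],  # keyed replacement of a top-level value
    regexp_pattern=r"dev-host",                  # regex substitution runs first
    replace_string="prod-host",
    logger=SupersetLogger(name="demo"),
)

# Expected: database_name is prod_dwh and the URI host is prod-host.
print(yml.read_text(encoding="utf-8"))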