diff --git a/search_script.py b/search_script.py
index d34c83b..9bb7914 100644
--- a/search_script.py
+++ b/search_script.py
@@ -129,8 +129,8 @@ def main():
logger = SupersetLogger(level=logging.INFO, console=True)
clients = setup_clients(logger)
- target_client = clients['dev']
- search_query = r"match(r2.path_code, budget_reference.ref_code || '($|(\s))')"
+ target_client = clients['prod']
+ search_query = r".account_balance_by_contract"
results = search_datasets(
client=target_client,
diff --git a/superset_tool/utils/fileio.py b/superset_tool/utils/fileio.py
index 89e3788..4ba3fbf 100644
--- a/superset_tool/utils/fileio.py
+++ b/superset_tool/utils/fileio.py
@@ -152,10 +152,32 @@ def archive_exports(output_dir: str, policy: RetentionPolicy, deduplicate: bool
# @PARAM: policy: RetentionPolicy - Retention policy.
# @PARAM: logger: SupersetLogger - Logger.
# @RETURN: set - Set of file paths that must be kept.
-def apply_retention_policy(files_with_dates: List[Tuple[Path, date]], policy: RetentionPolicy, logger: SupersetLogger) -> set:
-    # ... (policy application logic) ...
- return set()
-#
+def apply_retention_policy(files_with_dates: List[Tuple[Path, date]], policy: RetentionPolicy, logger: SupersetLogger) -> set:
+    # Sort by date, newest first, so the slices below keep the most recent files
+    sorted_files = sorted(files_with_dates, key=lambda x: x[1], reverse=True)
+    # Buckets for the three retention windows
+    daily_files = []
+    weekly_files = []
+    monthly_files = []
+    today = date.today()
+    for file_path, file_date in sorted_files:
+        age_days = (today - file_date).days
+        # Daily window: keep every file newer than `policy.daily` days
+        if age_days < policy.daily:
+            daily_files.append(file_path)
+        # Weekly window: candidates up to `policy.weekly` weeks old
+        elif age_days < policy.weekly * 7:
+            weekly_files.append(file_path)
+        # Monthly window: candidates up to `policy.monthly` months old (30-day months)
+        elif age_days < policy.monthly * 30:
+            monthly_files.append(file_path)
+    # Keep all daily files plus the newest `policy.weekly` weekly and
+    # `policy.monthly` monthly candidates; anything older is dropped
+    files_to_keep = set()
+    files_to_keep.update(daily_files)
+    files_to_keep.update(weekly_files[:policy.weekly])
+    files_to_keep.update(monthly_files[:policy.monthly])
+ logger.debug("[apply_retention_policy][State] Keeping %d files according to retention policy", len(files_to_keep))
+ return files_to_keep
+#
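+# Illustrative usage sketch (not part of the original module). It assumes only
+# what the code above already relies on: RetentionPolicy exposes integer
+# `daily`, `weekly` and `monthly` fields, and `date` is imported at module level.
+#
+#   files = [(p, date.fromtimestamp(p.stat().st_mtime)) for p in export_dir.glob("*.zip")]
+#   keep = apply_retention_policy(files, policy, logger)
+#   for path, _ in files:
+#       if path not in keep:
+#           path.unlink()
+#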
#
# @PURPOSE: Saves the binary content of a ZIP archive to disk and optionally unpacks it.
@@ -211,10 +233,44 @@ def update_yamls(db_configs: Optional[List[Dict]] = None, path: str = "dashboard
#
# @PURPOSE: (Helper) Updates a single YAML file.
# @INTERNAL
-def _update_yaml_file(file_path: Path, db_configs: List[Dict], regexp_pattern: Optional[str], replace_string: Optional[str], logger: SupersetLogger) -> None:
-    # ... (logic for updating a single file) ...
- pass
-#
+def _update_yaml_file(file_path: Path, db_configs: List[Dict], regexp_pattern: Optional[str], replace_string: Optional[str], logger: SupersetLogger) -> None:
+    # Read the file contents
+ try:
+ with open(file_path, 'r', encoding='utf-8') as f:
+ content = f.read()
+ except Exception as e:
+ logger.error("[_update_yaml_file][Failure] Failed to read %s: %s", file_path, e)
+ return
+    # If both a regex pattern and a replacement string are given, apply the substitution
+    if regexp_pattern and replace_string:
+        try:
+            new_content = re.sub(regexp_pattern, replace_string, content)
+            if new_content != content:
+                with open(file_path, 'w', encoding='utf-8') as f:
+                    f.write(new_content)
+                logger.info("[_update_yaml_file][State] Updated %s using regex pattern", file_path)
+                # Keep `content` in sync so the db_configs pass below parses the substituted text
+                content = new_content
+        except Exception as e:
+            logger.error("[_update_yaml_file][Failure] Error applying regex to %s: %s", file_path, e)
+    # If database configs are given, update the corresponding top-level YAML values
+ if db_configs:
+ try:
+ parsed_data = yaml.safe_load(content)
+ if not isinstance(parsed_data, dict):
+ logger.warning("[_update_yaml_file][Warning] YAML content is not a dictionary in %s", file_path)
+ return
+            # Update values for keys that already exist in the YAML
+ for config in db_configs:
+ for key, value in config.items():
+ if key in parsed_data:
+ old_value = parsed_data[key]
+ parsed_data[key] = value
+ logger.info("[_update_yaml_file][State] Changed %s.%s from %s to %s", file_path, key, old_value, value)
+            # Write the updated data back to the file
+ with open(file_path, 'w', encoding='utf-8') as f:
+ yaml.dump(parsed_data, f, default_flow_style=False, allow_unicode=True)
+ except Exception as e:
+ logger.error("[_update_yaml_file][Failure] Error updating YAML in %s: %s", file_path, e)
+#
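+# Illustrative usage sketch (not part of the original module). The path and the
+# `sqlalchemy_uri` key below are hypothetical; `db_configs` entries are plain
+# dicts whose keys are matched against top-level YAML keys, and `re`/`yaml` are
+# assumed to be imported at module level.
+#
+#   _update_yaml_file(
+#       Path("dashboard_export/databases/analytics_db.yaml"),
+#       db_configs=[{"sqlalchemy_uri": "postgresql://user:***@new-host/db"}],
+#       regexp_pattern=None,
+#       replace_string=None,
+#       logger=SupersetLogger(name="fileio"),
+#   )
+#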
#
# @PURPOSE: Creates a ZIP archive from the specified source paths.
@@ -267,14 +323,55 @@ def get_filename_from_headers(headers: dict) -> Optional[str]:
# @PARAM: root_directory: Path - Root directory to consolidate.
# @PARAM: logger: Optional[SupersetLogger] - Logger instance.
# @THROW: TypeError, ValueError - If `root_directory` is invalid.
-def consolidate_archive_folders(root_directory: Path, logger: Optional[SupersetLogger] = None) -> None:
- logger = logger or SupersetLogger(name="fileio")
- assert isinstance(root_directory, Path), "root_directory must be a Path object."
- assert root_directory.is_dir(), "root_directory must be an existing directory."
-
- logger.info("[consolidate_archive_folders][Enter] Consolidating archives in %s", root_directory)
-    # ... (consolidation logic) ...
-#
+def consolidate_archive_folders(root_directory: Path, logger: Optional[SupersetLogger] = None) -> None:
+ logger = logger or SupersetLogger(name="fileio")
+ assert isinstance(root_directory, Path), "root_directory must be a Path object."
+ assert root_directory.is_dir(), "root_directory must be an existing directory."
+
+ logger.info("[consolidate_archive_folders][Enter] Consolidating archives in %s", root_directory)
+    # Collect all directories that contain ZIP archives
+ archive_dirs = []
+ for item in root_directory.iterdir():
+ if item.is_dir():
+            # Check whether the directory contains any ZIP archives
+ if any(item.glob("*.zip")):
+ archive_dirs.append(item)
+    # Group directories by slug: the part of the name before the first '_'
+    slug_groups = {}
+    for dir_path in archive_dirs:
+        dir_name = dir_path.name
+        slug = dir_name.split('_')[0]
+ if slug not in slug_groups:
+ slug_groups[slug] = []
+ slug_groups[slug].append(dir_path)
+    # Consolidate each group that contains more than one directory
+ for slug, dirs in slug_groups.items():
+ if len(dirs) <= 1:
+ continue
+        # Create (or reuse) the target directory for this slug
+ target_dir = root_directory / slug
+ target_dir.mkdir(exist_ok=True)
+ logger.info("[consolidate_archive_folders][State] Consolidating %d directories under %s", len(dirs), target_dir)
+        # Move the contents of every source directory into the target
+        for source_dir in dirs:
+            if source_dir == target_dir:
+                continue
+            for item in source_dir.iterdir():
+                dest_item = target_dir / item.name
+                try:
+                    # shutil.move handles both files and directories
+                    shutil.move(str(item), str(dest_item))
+                except Exception as e:
+                    logger.error("[consolidate_archive_folders][Failure] Failed to move %s to %s: %s", item, dest_item, e)
+            # Remove the source directory; rmdir() fails if anything was left behind
+ try:
+ source_dir.rmdir()
+ logger.info("[consolidate_archive_folders][State] Removed source directory: %s", source_dir)
+ except Exception as e:
+ logger.error("[consolidate_archive_folders][Failure] Failed to remove source directory %s: %s", source_dir, e)
+#
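+# Illustrative usage sketch (not part of the original module). The directory
+# names are hypothetical: "sales_2024-01" and "sales_2024-02" share the slug
+# "sales", so their contents would be merged into <root>/sales. `shutil` is
+# assumed to be imported at module level.
+#
+#   consolidate_archive_folders(Path("/backups/superset"), logger=SupersetLogger(name="fileio"))
+#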
# --- End of module code ---