diff --git a/backup_script.py b/backup_script.py index e985c81..edd022b 100644 --- a/backup_script.py +++ b/backup_script.py @@ -175,7 +175,7 @@ def backup_dashboards(client: SupersetClient, if rotate_archive: # [ANCHOR] ARCHIVE_OLD_BACKUPS try: - archive_exports(dashboard_dir, logger=logger) + archive_exports(dashboard_dir, logger=logger, deduplicate=True) logger.debug(f"[DEBUG] Старые экспорты для '{dashboard_title}' архивированы.") except Exception as cleanup_error: logger.warning( @@ -282,6 +282,7 @@ def main() -> int: clients['dev'], "DEV", superset_backup_repo, + rotate_archive=True, logger=logger ) @@ -290,6 +291,7 @@ def main() -> int: clients['sbx'], "SBX", superset_backup_repo, + rotate_archive=True, logger=logger ) @@ -298,6 +300,7 @@ def main() -> int: clients['prod'], "PROD", superset_backup_repo, + rotate_archive=True, logger=logger ) diff --git a/migration_script.py b/migration_script.py index 11a96e8..c301918 100644 --- a/migration_script.py +++ b/migration_script.py @@ -38,7 +38,7 @@ logger.info("[COHERENCE_CHECK_PASSED] Логгер инициализирова # @semantic: Определяет, как UUID и URI базы данных Clickhouse должны быть изменены. # @invariant: 'old' и 'new' должны содержать полные конфигурации. database_config_click = { - "new": { + "old": { "database_name": "Prod Clickhouse", "sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm", "uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", @@ -47,7 +47,7 @@ database_config_click = { "allow_cvas": "false", "allow_dml": "false" }, - "old": { + "new": { "database_name": "Dev Clickhouse", "sqlalchemy_uri": "clickhousedb+connect://dwhuser:XXXXXXXXXX@10.66.229.179:8123/dm", "uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2", @@ -63,7 +63,7 @@ logger.debug("[CONFIG] Конфигурация Clickhouse загружена.") # @semantic: Определяет, как UUID и URI базы данных Greenplum должны быть изменены. # @invariant: 'old' и 'new' должны содержать полные конфигурации. database_config_gp = { - "new": { + "old": { "database_name": "Prod Greenplum", "sqlalchemy_uri": "postgresql+psycopg2://viz_powerbi_gp_prod:XXXXXXXXXX@10.66.229.201:5432/dwh", "uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8", @@ -72,7 +72,7 @@ database_config_gp = { "allow_cvas": "true", "allow_dml": "true" }, - "old": { + "new": { "database_name": "DEV Greenplum", "sqlalchemy_uri": "postgresql+psycopg2://viz_superset_gp_dev:XXXXXXXXXX@10.66.229.171:5432/dwh", "uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f", @@ -140,7 +140,7 @@ except Exception as e: # [CONFIG] Определение исходного и целевого клиентов для миграции # [COHERENCE_NOTE] Эти переменные задают конкретную миграцию. Для параметризации можно использовать аргументы командной строки. from_c = dev_client # Источник миграции -to_c = sandbox_client # Цель миграции +to_c = dev_client # Цель миграции dashboard_slug = "FI0060" # Идентификатор дашборда для миграции # dashboard_id = 53 # ID не нужен, если есть slug @@ -161,12 +161,12 @@ try: # Экспорт дашборда во временную директорию ИЛИ чтение с диска # [COHERENCE_NOTE] В текущем коде закомментирован экспорт и используется локальный файл. # Для полноценной миграции следует использовать export_dashboard(). - zip_content, filename = from_c.export_dashboard(dashboard_id) # Предпочтительный путь для реальной миграции + #zip_content, filename = from_c.export_dashboard(dashboard_id) # Предпочтительный путь для реальной миграции # [DEBUG] Использование файла с диска для тестирования миграции - #zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250616T174203.zip" - #logger.warning(f"[WARN] Используется ЛОКАЛЬНЫЙ файл дашборда для миграции: {zip_db_path}. Это может привести к некогерентности, если файл устарел.") - #zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger) + zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250704T082538.zip" + logger.warning(f"[WARN] Используется ЛОКАЛЬНЫЙ файл дашборда для миграции: {zip_db_path}. Это может привести к некогерентности, если файл устарел.") + zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger) # [ANCHOR] SAVE_AND_UNPACK # Сохранение и распаковка во временную директорию diff --git a/search_script.py b/search_script.py index 1b77b57..86398b8 100644 --- a/search_script.py +++ b/search_script.py @@ -179,9 +179,9 @@ def inspect_datasets(client: SupersetClient): logger = SupersetLogger( level=logging.DEBUG,console=True) clients = setup_clients(logger) -# Поиск всех таблиц с 'select' в датасете +# Поиск всех таблиц в датасете results = search_datasets( - client=clients['sbx'], + client=clients['dev'], search_pattern=r'dm_view\.counterparty', search_fields=["sql"], logger=logger diff --git a/superset_tool/client.py b/superset_tool/client.py index 71e29f9..0c61069 100644 --- a/superset_tool/client.py +++ b/superset_tool/client.py @@ -414,7 +414,7 @@ class SupersetClient: else: self.logger.debug( "[DEBUG] Имя файла экспорта получено из заголовков.", - extra={"filename": filename, "dashboard_id": dashboard_id} + extra={"header_filename": filename, "dashboard_id": dashboard_id} ) return filename diff --git a/superset_tool/utils/fileio.py b/superset_tool/utils/fileio.py index 30f8599..f5fd332 100644 --- a/superset_tool/utils/fileio.py +++ b/superset_tool/utils/fileio.py @@ -23,6 +23,7 @@ from contextlib import contextmanager # [IMPORTS] Third-party import yaml import shutil +import zlib import tempfile from datetime import datetime @@ -164,11 +165,31 @@ def read_dashboard_from_disk( # [SECTION] Archive Management +def calculate_crc32(file_path: Path) -> str: + """[HELPER] Calculates the CRC32 checksum of a file. + @pre: + - file_path must be a valid path to a file. + @post: + - Returns the CRC32 checksum as a hexadecimal string. + @raise: + - FileNotFoundError: If the file does not exist. + - Exception: For any other file I/O errors. + """ + try: + with open(file_path, 'rb') as f: + crc32_value = zlib.crc32(f.read()) + return hex(crc32_value)[2:].zfill(8) # Convert to hex string, remove "0x", and pad with zeros + except FileNotFoundError: + raise FileNotFoundError(f"File not found: {file_path}") + except Exception as e: + raise Exception(f"Error calculating CRC32 for {file_path}: {str(e)}") + def archive_exports( output_dir: str, daily_retention: int = 7, weekly_retention: int = 4, monthly_retention: int = 12, + deduplicate: bool = False, logger: Optional[SupersetLogger] = None ) -> None: """[CONTRACT] Управление архивом экспортированных дашбордов @@ -178,29 +199,55 @@ def archive_exports( @post: - Сохраняет файлы согласно политике хранения - Удаляет устаревшие архивы - - Сохраняет логическую структуру каталогов + - Логирует все действия + @raise: + - ValueError: Если retention параметры некорректны + - Exception: При любых других ошибках """ logger = logger or SupersetLogger(name="fileio", console=False) - logger.info(f"[ARCHIVE] Старт очистки архивов в {output_dir}") + logger.info(f"[ARCHIVE] Starting archive cleanup in {output_dir}. Deduplication: {deduplicate}") # [VALIDATION] Проверка параметров if not all(isinstance(x, int) and x >= 0 for x in [daily_retention, weekly_retention, monthly_retention]): - raise ValueError("[CONFIG_ERROR] Значения retention должны быть положительными") + raise ValueError("[CONFIG_ERROR] Retention values must be positive integers.") + checksums = {} # Dictionary to store checksums and file paths try: export_dir = Path(output_dir) - files_with_dates = [] - # [PROCESSING] Сбор данных о файлах + if not export_dir.exists(): + logger.error(f"[ARCHIVE_ERROR] Directory does not exist: {export_dir}") + raise FileNotFoundError(f"Directory not found: {export_dir}") + + # [PROCESSING] Сбор информации о файлах + files_with_dates = [] for file in export_dir.glob("*.zip"): try: timestamp_str = file.stem.split('_')[-1].split('T')[0] file_date = datetime.strptime(timestamp_str, "%Y%m%d").date() + logger.debug(f"[DATE_PARSE] Файл {file.name} добавлен к анализу очистки (массив files_with_dates)") except (ValueError, IndexError): file_date = datetime.fromtimestamp(file.stat().st_mtime).date() - logger.warning(f"[DATE_PARSE] Используется дата модификации для {file.name}") + logger.warning(f"[DATE_PARSE] Using modification date for {file.name}") files_with_dates.append((file, file_date)) + + + # [DEDUPLICATION] + if deduplicate: + logger.info("[DEDUPLICATION] Starting checksum-based deduplication.") + for file in files_with_dates: + file_path = file[0] + try: + crc32_checksum = calculate_crc32(file_path) + if crc32_checksum in checksums: + # Duplicate found, delete the older file + logger.warning(f"[DEDUPLICATION] Duplicate found: {file_path}. Deleting.") + file_path.unlink() + else: + checksums[crc32_checksum] = file_path + except Exception as e: + logger.error(f"[DEDUPLICATION_ERROR] Error processing {file_path}: {str(e)}", exc_info=True) # [PROCESSING] Применение политик хранения keep_files = apply_retention_policy( @@ -212,13 +259,20 @@ def archive_exports( ) # [CLEANUP] Удаление устаревших файлов + deleted_count = 0 for file, _ in files_with_dates: if file not in keep_files: - file.unlink(missing_ok=True) - logger.info(f"[FILE_REMOVED] Удален архив: {file.name}") + try: + file.unlink() + deleted_count += 1 + logger.info(f"[FILE_REMOVED] Deleted archive: {file.name}") + except OSError as e: + logger.error(f"[FILE_ERROR] Error deleting {file.name}: {str(e)}", exc_info=True) + + logger.info(f"[ARCHIVE_RESULT] Cleanup completed. Deleted {deleted_count} archives.") except Exception as e: - logger.error(f"[ARCHIVE_ERROR] Ошибка обработки архивов: {str(e)}", exc_info=True) + logger.error(f"[ARCHIVE_ERROR] Critical error during archive cleanup: {str(e)}", exc_info=True) raise def apply_retention_policy(