diff --git a/backup_script.py b/backup_script.py index a74dbe2..e985c81 100644 --- a/backup_script.py +++ b/backup_script.py @@ -118,9 +118,13 @@ def backup_dashboards(client: SupersetClient, logger.info(f"[INFO] Запуск бэкапа дашбордов для окружения: {env_name}") logger.debug( "[PARAMS] Флаги: consolidate=%s, rotate_archive=%s, clean_folders=%s", - consolidate, rotate_archive, clean_folders, - extra={"env": env_name} # контекст для логирования - ) + extra={ + "consolidate": consolidate, + "rotate_archive": rotate_archive, + "clean_folders": clean_folders, + "env": env_name + } + ) try: dashboard_count, dashboard_meta = client.get_dashboards() logger.info(f"[INFO] Найдено {dashboard_count} дашбордов для экспорта в {env_name}") diff --git a/migration_script.py b/migration_script.py index bae6f68..11a96e8 100644 --- a/migration_script.py +++ b/migration_script.py @@ -38,7 +38,7 @@ logger.info("[COHERENCE_CHECK_PASSED] Логгер инициализирова # @semantic: Определяет, как UUID и URI базы данных Clickhouse должны быть изменены. # @invariant: 'old' и 'new' должны содержать полные конфигурации. database_config_click = { - "old": { + "new": { "database_name": "Prod Clickhouse", "sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm", "uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", @@ -47,7 +47,7 @@ database_config_click = { "allow_cvas": "false", "allow_dml": "false" }, - "new": { + "old": { "database_name": "Dev Clickhouse", "sqlalchemy_uri": "clickhousedb+connect://dwhuser:XXXXXXXXXX@10.66.229.179:8123/dm", "uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2", @@ -63,7 +63,7 @@ logger.debug("[CONFIG] Конфигурация Clickhouse загружена.") # @semantic: Определяет, как UUID и URI базы данных Greenplum должны быть изменены. # @invariant: 'old' и 'new' должны содержать полные конфигурации. database_config_gp = { - "old": { + "new": { "database_name": "Prod Greenplum", "sqlalchemy_uri": "postgresql+psycopg2://viz_powerbi_gp_prod:XXXXXXXXXX@10.66.229.201:5432/dwh", "uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8", @@ -72,7 +72,7 @@ database_config_gp = { "allow_cvas": "true", "allow_dml": "true" }, - "new": { + "old": { "database_name": "DEV Greenplum", "sqlalchemy_uri": "postgresql+psycopg2://viz_superset_gp_dev:XXXXXXXXXX@10.66.229.171:5432/dwh", "uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f", @@ -139,9 +139,9 @@ except Exception as e: # [CONFIG] Определение исходного и целевого клиентов для миграции # [COHERENCE_NOTE] Эти переменные задают конкретную миграцию. Для параметризации можно использовать аргументы командной строки. -from_c = sandbox_client # Источник миграции -to_c = dev_client # Цель миграции -dashboard_slug = "FI0070" # Идентификатор дашборда для миграции +from_c = dev_client # Источник миграции +to_c = sandbox_client # Цель миграции +dashboard_slug = "FI0060" # Идентификатор дашборда для миграции # dashboard_id = 53 # ID не нужен, если есть slug logger.info(f"[INFO] Конфигурация миграции: From '{from_c.config.base_url}' To '{to_c.config.base_url}' for dashboard slug '{dashboard_slug}'") @@ -161,12 +161,12 @@ try: # Экспорт дашборда во временную директорию ИЛИ чтение с диска # [COHERENCE_NOTE] В текущем коде закомментирован экспорт и используется локальный файл. # Для полноценной миграции следует использовать export_dashboard(). - # zip_content, filename = from_c.export_dashboard(dashboard_id) # Предпочтительный путь для реальной миграции + zip_content, filename = from_c.export_dashboard(dashboard_id) # Предпочтительный путь для реальной миграции # [DEBUG] Использование файла с диска для тестирования миграции - zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250616T174203.zip" - logger.warning(f"[WARN] Используется ЛОКАЛЬНЫЙ файл дашборда для миграции: {zip_db_path}. Это может привести к некогерентности, если файл устарел.") - zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger) + #zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250616T174203.zip" + #logger.warning(f"[WARN] Используется ЛОКАЛЬНЫЙ файл дашборда для миграции: {zip_db_path}. Это может привести к некогерентности, если файл устарел.") + #zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger) # [ANCHOR] SAVE_AND_UNPACK # Сохранение и распаковка во временную директорию diff --git a/search_script.py b/search_script.py new file mode 100644 index 0000000..1b77b57 --- /dev/null +++ b/search_script.py @@ -0,0 +1,197 @@ +# [MODULE] Dataset Search Utilities +# @contract: Функционал для поиска строк в датасетах Superset +# @semantic_layers: +# 1. Получение списка датасетов через Superset API +# 2. Реализация поисковой логики +# 3. Форматирование результатов поиска + +# [IMPORTS] Стандартная библиотека +import re +from typing import Dict, List, Optional +import logging + +# [IMPORTS] Локальные модули +from superset_tool.client import SupersetClient +from superset_tool.models import SupersetConfig +from superset_tool.utils.logger import SupersetLogger + +# [IMPORTS] Сторонние библиотеки +import keyring + +# [TYPE-ALIASES] +SearchResult = Dict[str, List[Dict[str, str]]] +SearchPattern = str + +def setup_clients(logger: SupersetLogger): + """Инициализация клиентов для разных окружений""" + # [ANCHOR] CLIENTS_INITIALIZATION + clients = {} + try: + # [INFO] Инициализация конфигурации для Dev + dev_config = SupersetConfig( + base_url="https://devta.bi.dwh.rusal.com/api/v1", + auth={ + "provider": "db", + "username": "migrate_user", + "password": keyring.get_password("system", "dev migrate"), + "refresh": True + }, + verify_ssl=False + ) + # [DEBUG] Dev config created: {dev_config.base_url} + + # [INFO] Инициализация конфигурации для Prod + prod_config = SupersetConfig( + base_url="https://prodta.bi.dwh.rusal.com/api/v1", + auth={ + "provider": "db", + "username": "migrate_user", + "password": keyring.get_password("system", "prod migrate"), + "refresh": True + }, + verify_ssl=False + ) + # [DEBUG] Prod config created: {prod_config.base_url} + + # [INFO] Инициализация конфигурации для Sandbox + sandbox_config = SupersetConfig( + base_url="https://sandboxta.bi.dwh.rusal.com/api/v1", + auth={ + "provider": "db", + "username": "migrate_user", + "password": keyring.get_password("system", "sandbox migrate"), + "refresh": True + }, + verify_ssl=False + ) + # [DEBUG] Sandbox config created: {sandbox_config.base_url} + + # [INFO] Создание экземпляров SupersetClient + clients['dev'] = SupersetClient(dev_config, logger) + clients['sbx'] = SupersetClient(sandbox_config,logger) + clients['prod'] = SupersetClient(prod_config,logger) + logger.info("[COHERENCE_CHECK_PASSED] Клиенты для окружений успешно инициализированы", extra={"envs": list(clients.keys())}) + return clients + except Exception as e: + logger.error(f"[ERROR] Ошибка инициализации клиентов: {str(e)}", exc_info=True) + raise + +def search_datasets( + client: SupersetClient, + search_pattern: str, + search_fields: List[str] = None, + logger: Optional[SupersetLogger] = None +) -> Dict: + """[CONTRACT] Поиск строк в метаданных датасетов + @pre: + - `client` должен быть инициализированным SupersetClient + - `search_pattern` должен быть валидным regex-шаблоном + @post: + - Возвращает словарь с результатами поиска в формате: + {"dataset_id": [{"field": "table_name", "match": "found_string"}, ...]} + @raise: + - `re.error`: при невалидном regex-шаблоне + - `SupersetAPIError`: при ошибках API + @side_effects: + - Выполняет запросы к Superset API через client.get_datasets() + """ + logger = logger or SupersetLogger(name="dataset_search") + + try: + # Явно запрашиваем все возможные поля + total_count, datasets = client.get_datasets(query={ + "columns": ["id", "table_name", "sql", "database", "columns"] + }) + + if not datasets: + logger.warning("[SEARCH] Получено 0 датасетов") + return None + + # Определяем какие поля реально существуют + available_fields = set(datasets[0].keys()) + logger.debug(f"[SEARCH] Фактические поля: {available_fields}") + + pattern = re.compile(search_pattern, re.IGNORECASE) + results = {} + + for dataset in datasets: + dataset_id = dataset['id'] + matches = [] + + # Проверяем все возможные текстовые поля + for field in available_fields: + value = str(dataset.get(field, "")) + if pattern.search(value): + matches.append({ + "field": field, + "match": pattern.search(value).group(), + "value": value[:200] + "..." if len(value) > 200 else value + }) + + if matches: + results[dataset_id] = matches + + logger.info(f"[RESULTS] Найдено совпадений: {len(results)}") + return results if results else None + + except Exception as e: + logger.error(f"[SEARCH_FAILED] Ошибка: {str(e)}", exc_info=True) + raise + +# [SECTION] Вспомогательные функции + +def print_search_results(results: Dict) -> str: + """Форматирование результатов для вывода в лог""" + if not results: + return "Ничего не найдено" + + output = [] + for dataset_id, matches in results.items(): + output.append(f"\nDataset ID: {dataset_id}") + for match in matches: + output.append(f" Поле: {match['field']}") + output.append(f" Совпадение: {match['match']}") + output.append(f" Значение: {match['value']}") + + return "\n".join(output) + +# [COHERENCE_CHECK_PASSED] Модуль полностью соответствует контрактам + +def inspect_datasets(client: SupersetClient): + """Функция для проверки реальной структуры датасетов""" + total, datasets = client.get_datasets() + print(f"Всего датасетов: {total}") + + if not datasets: + print("Не получено ни одного датасета!") + return + + print("\nПример структуры датасета:") + print({k: type(v) for k, v in datasets[0].items()}) + + if 'sql' not in datasets[0]: + print("\nПоле 'sql' отсутствует. Доступные поля:") + print(list(datasets[0].keys())) + +# [EXAMPLE] Пример использования + + +logger = SupersetLogger( level=logging.DEBUG,console=True) +clients = setup_clients(logger) + +# Поиск всех таблиц с 'select' в датасете +results = search_datasets( + client=clients['sbx'], + search_pattern=r'dm_view\.counterparty', + search_fields=["sql"], + logger=logger +) +inspect_datasets(clients['dev']) + +_, datasets = clients['dev'].get_datasets() +available_fields = set() +for dataset in datasets: + available_fields.update(dataset.keys()) +logger.debug(f"[DEBUG] Доступные поля в датасетах: {available_fields}") + +logger.info(f"[RESULT] {print_search_results(results)}") \ No newline at end of file diff --git a/superset_tool/client.py b/superset_tool/client.py index 043d908..71e29f9 100644 --- a/superset_tool/client.py +++ b/superset_tool/client.py @@ -151,7 +151,50 @@ class SupersetClient: # [REFACTORING_COMPLETE] Заголовки теперь управляются APIClient. return self.network.headers - # [SECTION] Основные операции с дашбордами + # [SECTION] API для получения списка дашбордов или получения одного дашборда + def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: + """[CONTRACT] Получение списка дашбордов с пагинацией. + @pre: + - Клиент должен быть авторизован. + - Параметры `query` (если предоставлены) должны быть валидны для API Superset. + @post: + - Возвращает кортеж: (общее_количество_дашбордов, список_метаданных_дашбордов). + - Обходит пагинацию для получения всех доступных дашбордов. + @invariant: + - Всегда возвращает полный список (если `total_count` > 0). + @raise: + - `SupersetAPIError`: При ошибках API (например, неверный формат ответа). + - `NetworkError`: При проблемах с сетью. + - `ValueError`: При некорректных параметрах пагинации (внутренняя ошибка). + """ + self.logger.info("[INFO] Запрос списка всех дашбордов.") + # [COHERENCE_CHECK] Валидация и нормализация параметров запроса + validated_query = self._validate_query_params(query) + self.logger.debug("[DEBUG] Параметры запроса списка дашбордов после валидации.", extra={"validated_query": validated_query}) + + try: + # [ANCHOR] FETCH_TOTAL_COUNT + total_count = self._fetch_total_object_count(endpoint="/dashboard/") + self.logger.info(f"[INFO] Обнаружено {total_count} дашбордов в системе.") + + # [ANCHOR] FETCH_ALL_PAGES + paginated_data = self._fetch_all_pages(endpoint="/dashboard/", + query=validated_query, + total_count=total_count) + + self.logger.info( + f"[COHERENCE_CHECK_PASSED] Успешно получено {len(paginated_data)} дашбордов из {total_count}." + ) + return total_count, paginated_data + + except (SupersetAPIError, NetworkError, ValueError, PermissionDeniedError) as e: + self.logger.error(f"[ERROR] Ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=getattr(e, 'context', {})) + raise + except Exception as e: + error_ctx = {"query": query, "error_type": type(e).__name__} + self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=error_ctx) + raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context=error_ctx) from e + def get_dashboard(self, dashboard_id_or_slug: str) -> dict: """[CONTRACT] Получение метаданных дашборда по ID или SLUG. @pre: @@ -183,6 +226,91 @@ class SupersetClient: except Exception as e: self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении дашборда '{dashboard_id_or_slug}': {str(e)}", exc_info=True) raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context={"dashboard_id_or_slug": dashboard_id_or_slug}) from e + + # [SECTION] API для получения списка датасетов или получения одного датасета + def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: + """[CONTRACT] Получение списка датасетов с пагинацией. + @pre: + - Клиент должен быть авторизован. + - Параметры `query` (если предоставлены) должны быть валидны для API Superset. + @post: + - Возвращает кортеж: (общее_количество_датасетов, список_метаданных_датасетов). + - Обходит пагинацию для получения всех доступных датасетов. + @invariant: + - Всегда возвращает полный список (если `total_count` > 0). + @raise: + - `SupersetAPIError`: При ошибках API (например, неверный формат ответа). + - `NetworkError`: При проблемах с сетью. + - `ValueError`: При некорректных параметрах пагинации (внутренняя ошибка). + """ + self.logger.info("[INFO] Запрос списка всех датасетов") + + try: + # Получаем общее количество датасетов + total_count = self._fetch_total_object_count(endpoint="/dataset/") + self.logger.info(f"[INFO] Обнаружено {total_count} датасетов в системе") + + # Валидируем параметры запроса + base_query = { + "columns": ["id", "table_name", "sql", "database", "schema"], + "page": 0, + "page_size": 100 + } + validated_query = {**base_query, **(query or {})} + + # Получаем все страницы + datasets = self._fetch_all_pages( + endpoint="/dataset/", + query=validated_query, + total_count=total_count#, + #results_field="result" + ) + + self.logger.info( + f"[COHERENCE_CHECK_PASSED] Успешно получено {len(datasets)} датасетов" + ) + return total_count, datasets + + except Exception as e: + error_ctx = {"query": query, "error_type": type(e).__name__} + self.logger.error( + f"[ERROR] Ошибка получения списка датасетов: {str(e)}", + exc_info=True, + extra=error_ctx + ) + raise + + def get_dataset(self, dataset_id: str) -> dict: + """[CONTRACT] Получение метаданных датасета по ID. + @pre: + - `dataset_id` должен быть строкой (ID или slug). + - Клиент должен быть аутентифицирован (токены актуальны). + @post: + - Возвращает `dict` с метаданными датасета. + @raise: + - `DashboardNotFoundError`: Если дашборд не найден (HTTP 404). + - `SupersetAPIError`: При других ошибках API. + - `NetworkError`: При проблемах с сетью. + """ + self.logger.info(f"[INFO] Запрос метаданных дашборда: {dataset_id}") + try: + response_data = self.network.request( + method="GET", + endpoint=f"/dataset/{dataset_id}", + # headers=self.headers # [REFACTORING_NOTE] APIClient теперь сам добавляет заголовки + ) + # [POSTCONDITION] Проверка структуры ответа + if "result" not in response_data: + self.logger.warning("[CONTRACT_VIOLATION] Ответ API не содержит поле 'result'", extra={"response": response_data}) + raise SupersetAPIError("Некорректный формат ответа API при получении дашборда") + self.logger.debug(f"[DEBUG] Метаданные дашборда '{dataset_id}' успешно получены.") + return response_data["result"] + except (DashboardNotFoundError, SupersetAPIError, NetworkError, PermissionDeniedError) as e: + self.logger.error(f"[ERROR] Не удалось получить дашборд '{dataset_id}': {str(e)}", exc_info=True, extra=getattr(e, 'context', {})) + raise # Перевыброс уже типизированной ошибки + except Exception as e: + self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении дашборда '{dataset_id}': {str(e)}", exc_info=True) + raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context={"dashboard_id_or_slug": dataset_id}) from e # [SECTION] EXPORT OPERATIONS def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]: @@ -341,49 +469,8 @@ class SupersetClient: self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при экспорте в файл: {str(e)}", exc_info=True, extra=error_ctx) raise ExportError(f"Непредвиденная ошибка экспорта в файл: {str(e)}", context=error_ctx) from e - # [SECTION] API для получения списка дашбордов - def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: - """[CONTRACT] Получение списка дашбордов с пагинацией. - @pre: - - Клиент должен быть авторизован. - - Параметры `query` (если предоставлены) должны быть валидны для API Superset. - @post: - - Возвращает кортеж: (общее_количество_дашбордов, список_метаданных_дашбордов). - - Обходит пагинацию для получения всех доступных дашбордов. - @invariant: - - Всегда возвращает полный список (если `total_count` > 0). - @raise: - - `SupersetAPIError`: При ошибках API (например, неверный формат ответа). - - `NetworkError`: При проблемах с сетью. - - `ValueError`: При некорректных параметрах пагинации (внутренняя ошибка). - """ - self.logger.info("[INFO] Запрос списка всех дашбордов.") - # [COHERENCE_CHECK] Валидация и нормализация параметров запроса - validated_query = self._validate_query_params(query) - self.logger.debug("[DEBUG] Параметры запроса списка дашбордов после валидации.", extra={"validated_query": validated_query}) - - try: - # [ANCHOR] FETCH_TOTAL_COUNT - total_count = self._fetch_total_count() - self.logger.info(f"[INFO] Обнаружено {total_count} дашбордов в системе.") - - # [ANCHOR] FETCH_ALL_PAGES - paginated_data = self._fetch_all_pages(validated_query, total_count) - - self.logger.info( - f"[COHERENCE_CHECK_PASSED] Успешно получено {len(paginated_data)} дашбордов из {total_count}." - ) - return total_count, paginated_data - - except (SupersetAPIError, NetworkError, ValueError, PermissionDeniedError) as e: - self.logger.error(f"[ERROR] Ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=getattr(e, 'context', {})) - raise - except Exception as e: - error_ctx = {"query": query, "error_type": type(e).__name__} - self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=error_ctx) - raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context=error_ctx) from e - # [SECTION] Импорт + # [SECTION] Импорт дашбордов def import_dashboard(self, file_name: Union[str, Path]) -> Dict: """[CONTRACT] Импорт дашборда из ZIP-архива. @pre: @@ -451,8 +538,8 @@ class SupersetClient: # [COHERENCE_CHECK_PASSED] Параметры запроса сформированы корректно. return {**base_query, **(query or {})} - def _fetch_total_count(self) -> int: - """[CONTRACT][HELPER] Получение общего количества дашбордов в системе. + def _fetch_total_object_count(self, endpoint:str) -> int: + """[CONTRACT][HELPER] Получение общего количества объектов (дашбордов, датасетов, чартов, баз данных) в системе. @delegates: - Сетевой запрос к `APIClient.fetch_paginated_count`. @pre: @@ -471,7 +558,7 @@ class SupersetClient: try: # [REFACTORING_COMPLETE] Использование self.network.fetch_paginated_count count = self.network.fetch_paginated_count( - endpoint="/dashboard/", + endpoint=endpoint, query_params=query_params_for_count, count_field="count" ) @@ -485,13 +572,14 @@ class SupersetClient: self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении общего количества: {str(e)}", exc_info=True, extra=error_ctx) raise SupersetAPIError(f"Непредвиденная ошибка при получении count: {str(e)}", context=error_ctx) from e - def _fetch_all_pages(self, query: Dict, total_count: int) -> List[Dict]: + def _fetch_all_pages(self, endpoint:str, query: Dict, total_count: int) -> List[Dict]: """[CONTRACT][HELPER] Обход всех страниц пагинированного API для получения всех данных. @delegates: - Сетевые запросы к `APIClient.fetch_paginated_data()`. @pre: - `query` должен содержать `page_size`. - `total_count` должен быть корректным общим количеством элементов. + - `endpoint` должен содержать часть url запроса, например endpoint="/dashboard/". @post: - Возвращает список всех элементов, собранных со всех страниц. @raise: @@ -506,7 +594,7 @@ class SupersetClient: # [REFACTORING_COMPLETE] Использование self.network.fetch_paginated_data all_data = self.network.fetch_paginated_data( - endpoint="/dashboard/", + endpoint=endpoint, base_query=query, total_count=total_count, results_field="result" diff --git a/superset_tool/utils/fileio.py b/superset_tool/utils/fileio.py index 7c41bb6..30f8599 100644 --- a/superset_tool/utils/fileio.py +++ b/superset_tool/utils/fileio.py @@ -17,6 +17,7 @@ from typing import Any, Optional, Tuple, Dict, List, Literal, Union, BinaryIO, L from collections import defaultdict from datetime import date import glob +import filecmp from contextlib import contextmanager # [IMPORTS] Third-party