# [MODULE] Dataset Search Utilities # @contract: Функционал для поиска строк в датасетах Superset # @semantic_layers: # 1. Получение списка датасетов через Superset API # 2. Реализация поисковой логики # 3. Форматирование результатов поиска # [IMPORTS] Стандартная библиотека import re from typing import Dict, List, Optional import logging # [IMPORTS] Локальные модули from superset_tool.client import SupersetClient from superset_tool.models import SupersetConfig from superset_tool.utils.logger import SupersetLogger # [IMPORTS] Сторонние библиотеки import keyring # [TYPE-ALIASES] SearchResult = Dict[str, List[Dict[str, str]]] SearchPattern = str def setup_clients(logger: SupersetLogger): """Инициализация клиентов для разных окружений""" # [ANCHOR] CLIENTS_INITIALIZATION clients = {} try: # [INFO] Инициализация конфигурации для Dev dev_config = SupersetConfig( base_url="https://devta.bi.dwh.rusal.com/api/v1", auth={ "provider": "db", "username": "migrate_user", "password": keyring.get_password("system", "dev migrate"), "refresh": True }, verify_ssl=False ) # [DEBUG] Dev config created: {dev_config.base_url} # [INFO] Инициализация конфигурации для Prod prod_config = SupersetConfig( base_url="https://prodta.bi.dwh.rusal.com/api/v1", auth={ "provider": "db", "username": "migrate_user", "password": keyring.get_password("system", "prod migrate"), "refresh": True }, verify_ssl=False ) # [DEBUG] Prod config created: {prod_config.base_url} # [INFO] Инициализация конфигурации для Sandbox sandbox_config = SupersetConfig( base_url="https://sandboxta.bi.dwh.rusal.com/api/v1", auth={ "provider": "db", "username": "migrate_user", "password": keyring.get_password("system", "sandbox migrate"), "refresh": True }, verify_ssl=False ) # [DEBUG] Sandbox config created: {sandbox_config.base_url} # [INFO] Создание экземпляров SupersetClient clients['dev'] = SupersetClient(dev_config, logger) clients['sbx'] = SupersetClient(sandbox_config,logger) clients['prod'] = SupersetClient(prod_config,logger) logger.info("[COHERENCE_CHECK_PASSED] Клиенты для окружений успешно инициализированы", extra={"envs": list(clients.keys())}) return clients except Exception as e: logger.error(f"[ERROR] Ошибка инициализации клиентов: {str(e)}", exc_info=True) raise def search_datasets( client: SupersetClient, search_pattern: str, search_fields: List[str] = None, logger: Optional[SupersetLogger] = None ) -> Dict: """[CONTRACT] Поиск строк в метаданных датасетов @pre: - `client` должен быть инициализированным SupersetClient - `search_pattern` должен быть валидным regex-шаблоном @post: - Возвращает словарь с результатами поиска в формате: {"dataset_id": [{"field": "table_name", "match": "found_string"}, ...]} @raise: - `re.error`: при невалидном regex-шаблоне - `SupersetAPIError`: при ошибках API @side_effects: - Выполняет запросы к Superset API через client.get_datasets() """ logger = logger or SupersetLogger(name="dataset_search") try: # Явно запрашиваем все возможные поля total_count, datasets = client.get_datasets(query={ "columns": ["id", "table_name", "sql", "database", "columns"] }) if not datasets: logger.warning("[SEARCH] Получено 0 датасетов") return None # Определяем какие поля реально существуют available_fields = set(datasets[0].keys()) logger.debug(f"[SEARCH] Фактические поля: {available_fields}") pattern = re.compile(search_pattern, re.IGNORECASE) results = {} for dataset in datasets: dataset_id = dataset['id'] matches = [] # Проверяем все возможные текстовые поля for field in available_fields: value = str(dataset.get(field, "")) if pattern.search(value): matches.append({ "field": field, "match": pattern.search(value).group(), "value": value[:200] + "..." if len(value) > 200 else value }) if matches: results[dataset_id] = matches logger.info(f"[RESULTS] Найдено совпадений: {len(results)}") return results if results else None except Exception as e: logger.error(f"[SEARCH_FAILED] Ошибка: {str(e)}", exc_info=True) raise # [SECTION] Вспомогательные функции def print_search_results(results: Dict) -> str: """Форматирование результатов для вывода в лог""" if not results: return "Ничего не найдено" output = [] for dataset_id, matches in results.items(): output.append(f"\nDataset ID: {dataset_id}") for match in matches: output.append(f" Поле: {match['field']}") output.append(f" Совпадение: {match['match']}") output.append(f" Значение: {match['value']}") return "\n".join(output) # [COHERENCE_CHECK_PASSED] Модуль полностью соответствует контрактам def inspect_datasets(client: SupersetClient): """Функция для проверки реальной структуры датасетов""" total, datasets = client.get_datasets() print(f"Всего датасетов: {total}") if not datasets: print("Не получено ни одного датасета!") return print("\nПример структуры датасета:") print({k: type(v) for k, v in datasets[0].items()}) if 'sql' not in datasets[0]: print("\nПоле 'sql' отсутствует. Доступные поля:") print(list(datasets[0].keys())) # [EXAMPLE] Пример использования logger = SupersetLogger( level=logging.DEBUG,console=True) clients = setup_clients(logger) # Поиск всех таблиц в датасете results = search_datasets( client=clients['dev'], search_pattern=r'dm_view\.counterparty', search_fields=["sql"], logger=logger ) inspect_datasets(clients['dev']) _, datasets = clients['dev'].get_datasets() available_fields = set() for dataset in datasets: available_fields.update(dataset.keys()) logger.debug(f"[DEBUG] Доступные поля в датасетах: {available_fields}") logger.info(f"[RESULT] {print_search_results(results)}")