# [MODULE] Dataset Search Utilities # @contract: Функционал для поиска строк в датасетах Superset # @semantic_layers: # 1. Получение списка датасетов через Superset API # 2. Реализация поисковой логики # 3. Форматирование результатов поиска # [IMPORTS] Стандартная библиотека import re from typing import Dict, List, Optional import logging # [IMPORTS] Локальные модули from superset_tool.client import SupersetClient from superset_tool.models import SupersetConfig from superset_tool.utils.logger import SupersetLogger # [IMPORTS] Сторонние библиотеки import keyring # [TYPE-ALIASES] SearchResult = Dict[str, List[Dict[str, str]]] SearchPattern = str def setup_clients(logger: SupersetLogger): # [FUNCTION] setup_clients # [CONTRACT] """ Инициализация клиентов SupersetClient для разных окружений (dev, sbx, prod). @pre: - `logger` является инициализированным экземпляром SupersetLogger. - Учетные данные для каждого окружения доступны через `keyring`. @post: - Возвращает словарь с инициализированными экземплярами SupersetClient для 'dev', 'sbx', 'prod'. - Каждый клиент аутентифицирован. @side_effects: - Выполняет запросы к Superset API для аутентификации. - Использует `keyring` для получения паролей. - Логирует процесс инициализации и ошибки. @raise: - Exception: При ошибке инициализации клиента или аутентификации. """ # [ANCHOR] CLIENTS_INITIALIZATION clients = {} try: # [INFO] Инициализация конфигурации для Dev dev_config = SupersetConfig( base_url="https://devta.bi.dwh.rusal.com/api/v1", auth={ "provider": "db", "username": "migrate_user", "password": keyring.get_password("system", "dev migrate"), "refresh": True }, verify_ssl=False ) # [DEBUG] Dev config created: {dev_config.base_url} # [INFO] Инициализация конфигурации для Prod prod_config = SupersetConfig( base_url="https://prodta.bi.dwh.rusal.com/api/v1", auth={ "provider": "db", "username": "migrate_user", "password": keyring.get_password("system", "prod migrate"), "refresh": True }, verify_ssl=False ) # [DEBUG] Prod config created: {prod_config.base_url} # [INFO] Инициализация конфигурации для Sandbox sandbox_config = SupersetConfig( base_url="https://sandboxta.bi.dwh.rusal.com/api/v1", auth={ "provider": "db", "username": "migrate_user", "password": keyring.get_password("system", "sandbox migrate"), "refresh": True }, verify_ssl=False ) # [DEBUG] Sandbox config created: {sandbox_config.base_url} # [INFO] Создание экземпляров SupersetClient clients['dev'] = SupersetClient(dev_config, logger) clients['sbx'] = SupersetClient(sandbox_config,logger) clients['prod'] = SupersetClient(prod_config,logger) logger.info("[COHERENCE_CHECK_PASSED] Клиенты для окружений успешно инициализированы", extra={"envs": list(clients.keys())}) return clients except Exception as e: logger.error(f"[ERROR] Ошибка инициализации клиентов: {str(e)}", exc_info=True) raise def search_datasets( client: SupersetClient, search_pattern: str, search_fields: List[str] = None, logger: Optional[SupersetLogger] = None ) -> Dict: # [FUNCTION] search_datasets """[CONTRACT] Поиск строк в метаданных датасетов @pre: - `client` должен быть инициализированным SupersetClient - `search_pattern` должен быть валидным regex-шаблоном @post: - Возвращает словарь с результатами поиска в формате: {"dataset_id": [{"field": "table_name", "match": "found_string", "value": "full_field_value"}, ...]}. @raise: - `re.error`: при невалидном regex-шаблоне - `SupersetAPIError`: при ошибках API - `AuthenticationError`: при ошибках аутентификации - `NetworkError`: при сетевых ошибках @side_effects: - Выполняет запросы к Superset API через client.get_datasets(). - Логирует процесс поиска и ошибки. """ logger = logger or SupersetLogger(name="dataset_search") try: # Явно запрашиваем все возможные поля total_count, datasets = client.get_datasets(query={ "columns": ["id", "table_name", "sql", "database", "columns"] }) if not datasets: logger.warning("[SEARCH] Получено 0 датасетов") return None # Определяем какие поля реально существуют available_fields = set(datasets[0].keys()) logger.debug(f"[SEARCH] Фактические поля: {available_fields}") pattern = re.compile(search_pattern, re.IGNORECASE) results = {} for dataset in datasets: dataset_id = dataset['id'] matches = [] # Проверяем все возможные текстовые поля for field in available_fields: value = str(dataset.get(field, "")) if pattern.search(value): matches.append({ "field": field, "match": pattern.search(value).group(), # Сохраняем полное значение поля, не усекаем "value": value }) if matches: results[dataset_id] = matches logger.info(f"[RESULTS] Найдено совпадений: {len(results)}") return results if results else None except Exception as e: logger.error(f"[SEARCH_FAILED] Ошибка: {str(e)}", exc_info=True) raise # [SECTION] Вспомогательные функции def print_search_results(results: Dict, context_lines: int = 3) -> str: # [FUNCTION] print_search_results # [CONTRACT] """ Форматирует результаты поиска для вывода, показывая фрагмент кода с контекстом. @pre: - `results` является словарем в формате {"dataset_id": [{"field": "...", "match": "...", "value": "..."}, ...]}. - `context_lines` является неотрицательным целым числом. @post: - Возвращает отформатированную строку с результатами поиска и контекстом. - Функция не изменяет входные данные. @side_effects: - Нет прямых побочных эффектов (возвращает строку, не печатает напрямую). """ if not results: return "Ничего не найдено" output = [] for dataset_id, matches in results.items(): output.append(f"\nDataset ID: {dataset_id}") for match_info in matches: field = match_info['field'] match_text = match_info['match'] full_value = match_info['value'] output.append(f" Поле: {field}") output.append(f" Совпадение: '{match_text}'") # Находим позицию совпадения в полном тексте match_start_index = full_value.find(match_text) if match_start_index == -1: # Этого не должно произойти, если search_datasets работает правильно, но для надежности output.append(" Не удалось найти совпадение в полном тексте.") continue # Разбиваем текст на строки lines = full_value.splitlines() # Находим номер строки, где находится совпадение current_index = 0 match_line_index = -1 for i, line in enumerate(lines): if current_index <= match_start_index < current_index + len(line) + 1: # +1 for newline character match_line_index = i break current_index += len(line) + 1 # +1 for newline character if match_line_index == -1: output.append(" Не удалось определить строку совпадения.") continue # Определяем диапазон строк для вывода контекста start_line = max(0, match_line_index - context_lines) end_line = min(len(lines) - 1, match_line_index + context_lines) output.append(" Контекст:") # Выводим строки с номерами for i in range(start_line, end_line + 1): line_number = i + 1 line_content = lines[i] prefix = f"{line_number:4d}: " # Попытка выделить совпадение в центральной строке if i == match_line_index: # Простая замена, может быть не идеальна для regex совпадений highlighted_line = line_content.replace(match_text, f">>>{match_text}<<<") output.append(f"{prefix}{highlighted_line}") else: output.append(f"{prefix}{line_content}") output.append("-" * 20) # Разделитель между совпадениями return "\n".join(output) def inspect_datasets(client: SupersetClient): # [FUNCTION] inspect_datasets # [CONTRACT] """ Функция для проверки реальной структуры датасетов. Предназначена в основном для отладки и исследования структуры данных. @pre: - `client` является инициализированным экземпляром SupersetClient. @post: - Выводит информацию о количестве датасетов и структуре первого датасета в консоль. - Функция не изменяет состояние клиента. @side_effects: - Вызовы к Superset API через `client.get_datasets()`. - Вывод в консоль. - Логирует процесс инспекции и ошибки. @raise: - `SupersetAPIError`: при ошибках API - `AuthenticationError`: при ошибках аутентификации - `NetworkError`: при сетевых ошибках """ total, datasets = client.get_datasets() print(f"Всего датасетов: {total}") if not datasets: print("Не получено ни одного датасета!") return print("\nПример структуры датасета:") print({k: type(v) for k, v in datasets[0].items()}) if 'sql' not in datasets[0]: print("\nПоле 'sql' отсутствует. Доступные поля:") print(list(datasets[0].keys())) # [EXAMPLE] Пример использования logger = SupersetLogger( level=logging.INFO,console=True) clients = setup_clients(logger) # Поиск всех таблиц в датасете results = search_datasets( client=clients['dev'], search_pattern=r'dm_view\.account_debt', search_fields=["sql"], logger=logger ) inspect_datasets(clients['dev']) _, datasets = clients['dev'].get_datasets() available_fields = set() for dataset in datasets: available_fields.update(dataset.keys()) logger.debug(f"[DEBUG] Доступные поля в датасетах: {available_fields}") logger.info(f"[RESULT] {print_search_results(results)}")