# pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument,invalid-name,redefined-outer-name
"""
[MODULE] Dataset Search Utilities
@contract: Provides functionality for searching text patterns in Superset dataset metadata.
"""

# [IMPORTS] Standard library
import logging
import re
from typing import Dict, Optional

# [IMPORTS] Third-party
from requests.exceptions import RequestException

# [IMPORTS] Local modules
from superset_tool.client import SupersetClient
from superset_tool.exceptions import SupersetAPIError
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.init_clients import setup_clients


# [ENTITY: Function('search_datasets')]
# CONTRACT:
# PURPOSE: Searches the metadata of all datasets for a string pattern.
# PRECONDITIONS:
# - `client` must be an initialized `SupersetClient` instance.
# - `search_pattern` must be a valid regular expression string.
# POSTCONDITIONS:
# - Returns a dictionary with the search results.
def search_datasets(
    client: SupersetClient,
    search_pattern: str,
    logger: Optional[SupersetLogger] = None
) -> Optional[Dict]:
    """Search the metadata of every dataset for a regular-expression pattern.

    The pattern is compiled case-insensitively before any API call is made,
    so an invalid expression fails fast. Each dataset's own fields are
    converted to strings and scanned; ``None`` values are skipped so that a
    missing value is never reported as a match on the text ``"None"``.

    Args:
        client: Object exposing ``get_datasets(query=...)``; the call must
            return a two-item result whose second item is the sequence of
            datasets, each of them a mapping.
        search_pattern: Regular expression source text.
        logger: Logger to report progress to; a ``SupersetLogger`` named
            ``"dataset_search"`` is created when omitted.

    Returns:
        ``None`` when the client returns no datasets; otherwise a dict
        mapping dataset id to a list of ``{"field", "match", "value"}``
        dicts, one per matching field (only the first match in each field).
        Datasets without matches, or with a falsy ``id``, are omitted.

    Raises:
        re.error: ``search_pattern`` is not a valid regular expression.
        SupersetAPIError: Re-raised after logging.
        RequestException: Re-raised after logging.
    """
    logger = logger or SupersetLogger(name="dataset_search")
    logger.info(f"[STATE][search_datasets][ENTER] Searching for pattern: '{search_pattern}'")
    try:
        # Compile first: a broken pattern should not cost a full API fetch.
        pattern = re.compile(search_pattern, re.IGNORECASE)

        _, datasets = client.get_datasets(query={
            "columns": ["id", "table_name", "sql", "database", "columns"]
        })

        if not datasets:
            logger.warning("[STATE][search_datasets][EMPTY] No datasets found.")
            return None

        results = {}
        for dataset in datasets:
            dataset_id = dataset.get('id')
            if not dataset_id:
                continue

            matches = []
            # Walk this dataset's own fields (in their stored order) instead
            # of the key set of the first dataset, so no field is missed and
            # the order of reported matches is deterministic.
            for field, raw_value in dataset.items():
                if raw_value is None:
                    # str(None) == "None" would produce bogus matches.
                    continue
                value = str(raw_value)
                match_obj = pattern.search(value)
                if match_obj is not None:
                    matches.append({
                        "field": field,
                        "match": match_obj.group(),
                        "value": value
                    })

            if matches:
                results[dataset_id] = matches

        logger.info(f"[STATE][search_datasets][SUCCESS] Found matches in {len(results)} datasets.")
        return results

    except re.error as e:
        logger.error(f"[STATE][search_datasets][FAILURE] Invalid regex pattern: {e}", exc_info=True)
        raise
    except (SupersetAPIError, RequestException) as e:
        logger.critical(f"[STATE][search_datasets][FAILURE] Critical error during search: {e}", exc_info=True)
        raise
# END_FUNCTION_search_datasets


# [ENTITY: Function('print_search_results')]
# CONTRACT:
# PURPOSE: Formats the search results for readable console output.
# PRECONDITIONS:
# - `results` is the dictionary returned by `search_datasets`, or `None`.
# POSTCONDITIONS:
# - Returns a formatted string with the results.
def _format_match_context(full_value: str, match_text: str, context_lines: int) -> list:
    """Return the report lines showing ``match_text`` with its neighbouring lines.

    An empty list is returned when there is nothing meaningful to show: the
    match is empty, or no single line of ``full_value`` contains it.
    """
    # An empty match is "contained" in every line and would make
    # ``str.replace`` insert the markers between all characters.
    if not match_text:
        return []

    lines = full_value.splitlines()
    match_line_index = next(
        (i for i, line in enumerate(lines) if match_text in line), -1
    )
    if match_line_index == -1:
        return []

    # A negative radius would yield an empty window; treat it as zero so the
    # matching line itself is always shown.
    radius = max(context_lines, 0)
    start_line = max(0, match_line_index - radius)
    end_line = min(len(lines), match_line_index + radius + 1)

    context = [" Контекст:"]
    for i in range(start_line, end_line):
        prefix = f"{i + 1:5d}: "
        line_content = lines[i]
        if i == match_line_index:
            line_content = line_content.replace(match_text, f">>>{match_text}<<<")
        context.append(f" {prefix}{line_content}")
    return context


def print_search_results(results: Optional[Dict], context_lines: int = 3) -> str:
    """Format search results as a human-readable multi-line report.

    Despite its name the function prints nothing; it returns the text.

    Args:
        results: Mapping of dataset id to a list of match dicts with the
            keys ``"field"``, ``"match"`` and ``"value"``, or ``None``.
        context_lines: Number of lines shown before and after the line that
            contains the match; negative values are treated as ``0``.

    Returns:
        The report text, or the "nothing found" message when ``results`` is
        ``None`` or empty. Every match entry is closed by a separator line.
    """
    if not results:
        return "Ничего не найдено"

    output = []
    for dataset_id, matches in results.items():
        output.append(f"\n--- Dataset ID: {dataset_id} ---")
        for match_info in matches:
            match_text = match_info['match']
            output.append(f" - Поле: {match_info['field']}")
            output.append(f" Совпадение: '{match_text}'")
            output.extend(
                _format_match_context(match_info['value'], match_text, context_lines)
            )
            output.append("-" * 25)

    return "\n".join(output)
# END_FUNCTION_print_search_results


# [ENTITY: Function('main')]
# CONTRACT:
# PURPOSE: Main entry point of the script.
# PRECONDITIONS: None
# POSTCONDITIONS: None
def main():
    """Search the ``dev`` Superset instance for a SQL fragment and log the report."""
    logger = SupersetLogger(level=logging.INFO, console=True)
    clients = setup_clients(logger)

    target_client = clients['dev']

    # The fragment is literal SQL, not a regular expression. Unescaped, its
    # parentheses and `||` form a group with an empty alternative, so the
    # pattern would match any text that merely contains "match".
    sql_fragment = r"match(r2.path_code, budget_reference.ref_code || '($|(\s))')"
    search_query = re.escape(sql_fragment)

    results = search_datasets(
        client=target_client,
        search_pattern=search_query,
        logger=logger
    )

    report = print_search_results(results)
    logger.info(f"[STATE][main][SUCCESS] Search finished. Report:\n{report}")
# END_FUNCTION_main


if __name__ == "__main__":
    main()