# [DEF:search_script:Module] # # @SEMANTICS: search, superset, dataset, regex, file_output # @PURPOSE: Предоставляет утилиты для поиска по текстовым паттернам в метаданных датасетов Superset. # @LAYER: App # @RELATION: DEPENDS_ON -> superset_tool.client # @RELATION: DEPENDS_ON -> superset_tool.utils # @PUBLIC_API: search_datasets, save_results_to_file, print_search_results, main # [SECTION: IMPORTS] import logging import re import os from typing import Dict, Optional from requests.exceptions import RequestException from superset_tool.client import SupersetClient from superset_tool.exceptions import SupersetAPIError from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.init_clients import setup_clients # [/SECTION] # [DEF:search_datasets:Function] # @PURPOSE: Выполняет поиск по строковому паттерну в метаданных всех датасетов. # @PRE: `client` должен быть инициализированным экземпляром `SupersetClient`. # @PRE: `search_pattern` должен быть валидной строкой регулярного выражения. # @POST: Возвращает словарь с результатами поиска, где ключ - ID датасета, значение - список совпадений. # @RELATION: CALLS -> client.get_datasets # @THROW: re.error - Если паттерн регулярного выражения невалиден. # @THROW: SupersetAPIError, RequestException - При критических ошибках API. # @PARAM: client (SupersetClient) - Клиент для доступа к API Superset. # @PARAM: search_pattern (str) - Регулярное выражение для поиска. # @PARAM: logger (Optional[SupersetLogger]) - Инстанс логгера. # @RETURN: Optional[Dict] - Словарь с результатами или None, если ничего не найдено. def search_datasets( client: SupersetClient, search_pattern: str, logger: Optional[SupersetLogger] = None ) -> Optional[Dict]: logger = logger or SupersetLogger(name="dataset_search") logger.info(f"[search_datasets][Enter] Searching for pattern: '{search_pattern}'") try: _, datasets = client.get_datasets(query={"columns": ["id", "table_name", "sql", "database", "columns"]}) if not datasets: logger.warning("[search_datasets][State] No datasets found.") return None pattern = re.compile(search_pattern, re.IGNORECASE) results = {} for dataset in datasets: dataset_id = dataset.get('id') if not dataset_id: continue matches = [] for field, value in dataset.items(): value_str = str(value) if pattern.search(value_str): match_obj = pattern.search(value_str) matches.append({ "field": field, "match": match_obj.group() if match_obj else "", "value": value_str }) if matches: results[dataset_id] = matches logger.info(f"[search_datasets][Success] Found matches in {len(results)} datasets.") return results except re.error as e: logger.error(f"[search_datasets][Failure] Invalid regex pattern: {e}", exc_info=True) raise except (SupersetAPIError, RequestException) as e: logger.critical(f"[search_datasets][Failure] Critical error during search: {e}", exc_info=True) raise # [/DEF:search_datasets] # [DEF:save_results_to_file:Function] # @PURPOSE: Сохраняет результаты поиска в текстовый файл. # @PRE: `results` является словарем, возвращенным `search_datasets`, или `None`. # @PRE: `filename` должен быть допустимым путем к файлу. # @POST: Записывает отформатированные результаты в указанный файл. # @PARAM: results (Optional[Dict]) - Словарь с результатами поиска. # @PARAM: filename (str) - Имя файла для сохранения результатов. # @PARAM: logger (Optional[SupersetLogger]) - Инстанс логгера. # @RETURN: bool - Успешно ли выполнено сохранение. def save_results_to_file(results: Optional[Dict], filename: str, logger: Optional[SupersetLogger] = None) -> bool: logger = logger or SupersetLogger(name="file_writer") logger.info(f"[save_results_to_file][Enter] Saving results to file: {filename}") try: formatted_report = print_search_results(results) with open(filename, 'w', encoding='utf-8') as f: f.write(formatted_report) logger.info(f"[save_results_to_file][Success] Results saved to {filename}") return True except Exception as e: logger.error(f"[save_results_to_file][Failure] Failed to save results to file: {e}", exc_info=True) return False # [/DEF:save_results_to_file] # [DEF:print_search_results:Function] # @PURPOSE: Форматирует результаты поиска для читаемого вывода в консоль. # @PRE: `results` является словарем, возвращенным `search_datasets`, или `None`. # @POST: Возвращает отформатированную строку с результатами. # @PARAM: results (Optional[Dict]) - Словарь с результатами поиска. # @PARAM: context_lines (int) - Количество строк контекста для вывода до и после совпадения. # @RETURN: str - Отформатированный отчет. def print_search_results(results: Optional[Dict], context_lines: int = 3) -> str: if not results: return "Ничего не найдено" output = [] for dataset_id, matches in results.items(): # Получаем информацию о базе данных для текущего датасета database_info = "" # Ищем поле database среди совпадений, чтобы вывести его for match_info in matches: if match_info['field'] == 'database': database_info = match_info['value'] break # Если database не найден в совпадениях, пробуем получить из других полей if not database_info: # Предполагаем, что база данных может быть в одном из полей, например sql или table_name # Но для точности лучше использовать специальное поле, которое мы уже получили pass # Пока не выводим, если не нашли явно output.append(f"\n--- Dataset ID: {dataset_id} ---") if database_info: output.append(f" Database: {database_info}") output.append("") # Пустая строка для читабельности for match_info in matches: field, match_text, full_value = match_info['field'], match_info['match'], match_info['value'] output.append(f" - Поле: {field}") output.append(f" Совпадение: '{match_text}'") lines = full_value.splitlines() if not lines: continue match_line_index = -1 for i, line in enumerate(lines): if match_text in line: match_line_index = i break if match_line_index != -1: start = max(0, match_line_index - context_lines) end = min(len(lines), match_line_index + context_lines + 1) output.append(" Контекст:") for i in range(start, end): prefix = f"{i + 1:5d}: " line_content = lines[i] if i == match_line_index: highlighted = line_content.replace(match_text, f">>>{match_text}<<<") output.append(f" {prefix}{highlighted}") else: output.append(f" {prefix}{line_content}") output.append("-" * 25) return "\n".join(output) # [/DEF:print_search_results] # [DEF:main:Function] # @PURPOSE: Основная точка входа для запуска скрипта поиска. # @RELATION: CALLS -> setup_clients # @RELATION: CALLS -> search_datasets # @RELATION: CALLS -> print_search_results # @RELATION: CALLS -> save_results_to_file def main(): logger = SupersetLogger(level=logging.INFO, console=True) clients = setup_clients(logger) target_client = clients['dev5'] search_query = r"from dm(_view)*.account_debt" # Генерируем имя файла на основе времени import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") output_filename = f"search_results_{timestamp}.txt" results = search_datasets( client=target_client, search_pattern=search_query, logger=logger ) report = print_search_results(results) logger.info(f"[main][Success] Search finished. Report:\n{report}") # Сохраняем результаты в файл success = save_results_to_file(results, output_filename, logger) if success: logger.info(f"[main][Success] Results also saved to file: {output_filename}") else: logger.error(f"[main][Failure] Failed to save results to file: {output_filename}") # [/DEF:main] if __name__ == "__main__": main() # [/DEF:search_script]