Files
ss-tools/search_script.py

205 lines
10 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# [DEF:search_script:Module]
#
# @SEMANTICS: search, superset, dataset, regex, file_output
# @PURPOSE: Предоставляет утилиты для поиска по текстовым паттернам в метаданных датасетов Superset.
# @LAYER: App
# @RELATION: DEPENDS_ON -> superset_tool.client
# @RELATION: DEPENDS_ON -> superset_tool.utils
# @PUBLIC_API: search_datasets, save_results_to_file, print_search_results, main
# [SECTION: IMPORTS]
import logging
import re
import os
from typing import Dict, Optional
from requests.exceptions import RequestException
from superset_tool.client import SupersetClient
from superset_tool.exceptions import SupersetAPIError
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.init_clients import setup_clients
# [/SECTION]
# [DEF:search_datasets:Function]
# @PURPOSE: Выполняет поиск по строковому паттерну в метаданных всех датасетов.
# @PRE: `client` должен быть инициализированным экземпляром `SupersetClient`.
# @PRE: `search_pattern` должен быть валидной строкой регулярного выражения.
# @POST: Возвращает словарь с результатами поиска, где ключ - ID датасета, значение - список совпадений.
# @RELATION: CALLS -> client.get_datasets
# @THROW: re.error - Если паттерн регулярного выражения невалиден.
# @THROW: SupersetAPIError, RequestException - При критических ошибках API.
# @PARAM: client (SupersetClient) - Клиент для доступа к API Superset.
# @PARAM: search_pattern (str) - Регулярное выражение для поиска.
# @PARAM: logger (Optional[SupersetLogger]) - Инстанс логгера.
# @RETURN: Optional[Dict] - Словарь с результатами или None, если ничего не найдено.
def search_datasets(
client: SupersetClient,
search_pattern: str,
logger: Optional[SupersetLogger] = None
) -> Optional[Dict]:
logger = logger or SupersetLogger(name="dataset_search")
logger.info(f"[search_datasets][Enter] Searching for pattern: '{search_pattern}'")
try:
_, datasets = client.get_datasets(query={"columns": ["id", "table_name", "sql", "database", "columns"]})
if not datasets:
logger.warning("[search_datasets][State] No datasets found.")
return None
pattern = re.compile(search_pattern, re.IGNORECASE)
results = {}
for dataset in datasets:
dataset_id = dataset.get('id')
if not dataset_id:
continue
matches = []
for field, value in dataset.items():
value_str = str(value)
if pattern.search(value_str):
match_obj = pattern.search(value_str)
matches.append({
"field": field,
"match": match_obj.group() if match_obj else "",
"value": value_str
})
if matches:
results[dataset_id] = matches
logger.info(f"[search_datasets][Success] Found matches in {len(results)} datasets.")
return results
except re.error as e:
logger.error(f"[search_datasets][Failure] Invalid regex pattern: {e}", exc_info=True)
raise
except (SupersetAPIError, RequestException) as e:
logger.critical(f"[search_datasets][Failure] Critical error during search: {e}", exc_info=True)
raise
# [/DEF:search_datasets:Function]
# [DEF:save_results_to_file:Function]
# @PURPOSE: Сохраняет результаты поиска в текстовый файл.
# @PRE: `results` является словарем, возвращенным `search_datasets`, или `None`.
# @PRE: `filename` должен быть допустимым путем к файлу.
# @POST: Записывает отформатированные результаты в указанный файл.
# @PARAM: results (Optional[Dict]) - Словарь с результатами поиска.
# @PARAM: filename (str) - Имя файла для сохранения результатов.
# @PARAM: logger (Optional[SupersetLogger]) - Инстанс логгера.
# @RETURN: bool - Успешно ли выполнено сохранение.
def save_results_to_file(results: Optional[Dict], filename: str, logger: Optional[SupersetLogger] = None) -> bool:
logger = logger or SupersetLogger(name="file_writer")
logger.info(f"[save_results_to_file][Enter] Saving results to file: {filename}")
try:
formatted_report = print_search_results(results)
with open(filename, 'w', encoding='utf-8') as f:
f.write(formatted_report)
logger.info(f"[save_results_to_file][Success] Results saved to {filename}")
return True
except Exception as e:
logger.error(f"[save_results_to_file][Failure] Failed to save results to file: {e}", exc_info=True)
return False
# [/DEF:save_results_to_file:Function]
# [DEF:print_search_results:Function]
# @PURPOSE: Форматирует результаты поиска для читаемого вывода в консоль.
# @PRE: `results` является словарем, возвращенным `search_datasets`, или `None`.
# @POST: Возвращает отформатированную строку с результатами.
# @PARAM: results (Optional[Dict]) - Словарь с результатами поиска.
# @PARAM: context_lines (int) - Количество строк контекста для вывода до и после совпадения.
# @RETURN: str - Отформатированный отчет.
def print_search_results(results: Optional[Dict], context_lines: int = 3) -> str:
if not results:
return "Ничего не найдено"
output = []
for dataset_id, matches in results.items():
# Получаем информацию о базе данных для текущего датасета
database_info = ""
# Ищем поле database среди совпадений, чтобы вывести его
for match_info in matches:
if match_info['field'] == 'database':
database_info = match_info['value']
break
# Если database не найден в совпадениях, пробуем получить из других полей
if not database_info:
# Предполагаем, что база данных может быть в одном из полей, например sql или table_name
# Но для точности лучше использовать специальное поле, которое мы уже получили
pass # Пока не выводим, если не нашли явно
output.append(f"\n--- Dataset ID: {dataset_id} ---")
if database_info:
output.append(f" Database: {database_info}")
output.append("") # Пустая строка для читабельности
for match_info in matches:
field, match_text, full_value = match_info['field'], match_info['match'], match_info['value']
output.append(f" - Поле: {field}")
output.append(f" Совпадение: '{match_text}'")
lines = full_value.splitlines()
if not lines: continue
match_line_index = -1
for i, line in enumerate(lines):
if match_text in line:
match_line_index = i
break
if match_line_index != -1:
start = max(0, match_line_index - context_lines)
end = min(len(lines), match_line_index + context_lines + 1)
output.append(" Контекст:")
for i in range(start, end):
prefix = f"{i + 1:5d}: "
line_content = lines[i]
if i == match_line_index:
highlighted = line_content.replace(match_text, f">>>{match_text}<<<")
output.append(f" {prefix}{highlighted}")
else:
output.append(f" {prefix}{line_content}")
output.append("-" * 25)
return "\n".join(output)
# [/DEF:print_search_results:Function]
# [DEF:main:Function]
# @PURPOSE: Основная точка входа для запуска скрипта поиска.
# @RELATION: CALLS -> setup_clients
# @RELATION: CALLS -> search_datasets
# @RELATION: CALLS -> print_search_results
# @RELATION: CALLS -> save_results_to_file
def main():
logger = SupersetLogger(level=logging.INFO, console=True)
clients = setup_clients(logger)
target_client = clients['dev5']
search_query = r"from dm(_view)*.account_debt"
# Генерируем имя файла на основе времени
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
output_filename = f"search_results_{timestamp}.txt"
results = search_datasets(
client=target_client,
search_pattern=search_query,
logger=logger
)
report = print_search_results(results)
logger.info(f"[main][Success] Search finished. Report:\n{report}")
# Сохраняем результаты в файл
success = save_results_to_file(results, output_filename, logger)
if success:
logger.info(f"[main][Success] Results also saved to file: {output_filename}")
else:
logger.error(f"[main][Failure] Failed to save results to file: {output_filename}")
# [/DEF:main:Function]
if __name__ == "__main__":
main()
# [/DEF:search_script:Module]