Files
ss-tools/search_script.py
2025-11-13 09:54:29 +03:00

206 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# <GRACE_MODULE id="search_script" name="search_script.py">
# @SEMANTICS: search, superset, dataset, regex, file_output
# @PURPOSE: Предоставляет утилиты для поиска по текстовым паттернам в метаданных датасетов Superset.
# @DEPENDS_ON: superset_tool.client -> Для взаимодействия с API Superset.
# @DEPENDS_ON: superset_tool.utils -> Для логирования и инициализации клиентов.
# <IMPORTS>
import logging
import re
import os
from typing import Dict, Optional
from requests.exceptions import RequestException
from superset_tool.client import SupersetClient
from superset_tool.exceptions import SupersetAPIError
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.init_clients import setup_clients
# </IMPORTS>
# --- Начало кода модуля ---
# <ANCHOR id="search_datasets" type="Function">
# @PURPOSE: Выполняет поиск по строковому паттерну в метаданных всех датасетов.
# @PRE: `client` должен быть инициализированным экземпляром `SupersetClient`.
# @PRE: `search_pattern` должен быть валидной строкой регулярного выражения.
# @POST: Возвращает словарь с результатами поиска, где ключ - ID датасета, значение - список совпадений.
# @PARAM: client: SupersetClient - Клиент для доступа к API Superset.
# @PARAM: search_pattern: str - Регулярное выражение для поиска.
# @PARAM: logger: Optional[SupersetLogger] - Инстанс логгера.
# @RETURN: Optional[Dict] - Словарь с результатами или None, если ничего не найдено.
# @THROW: re.error - Если паттерн регулярного выражения невалиден.
# @THROW: SupersetAPIError, RequestException - При критических ошибках API.
# @RELATION: CALLS -> client.get_datasets
def search_datasets(
client: SupersetClient,
search_pattern: str,
logger: Optional[SupersetLogger] = None
) -> Optional[Dict]:
logger = logger or SupersetLogger(name="dataset_search")
logger.info(f"[search_datasets][Enter] Searching for pattern: '{search_pattern}'")
try:
_, datasets = client.get_datasets(query={"columns": ["id", "table_name", "sql", "database", "columns"]})
if not datasets:
logger.warning("[search_datasets][State] No datasets found.")
return None
pattern = re.compile(search_pattern, re.IGNORECASE)
results = {}
for dataset in datasets:
dataset_id = dataset.get('id')
if not dataset_id:
continue
matches = []
for field, value in dataset.items():
value_str = str(value)
if pattern.search(value_str):
match_obj = pattern.search(value_str)
matches.append({
"field": field,
"match": match_obj.group() if match_obj else "",
"value": value_str
})
if matches:
results[dataset_id] = matches
logger.info(f"[search_datasets][Success] Found matches in {len(results)} datasets.")
return results
except re.error as e:
logger.error(f"[search_datasets][Failure] Invalid regex pattern: {e}", exc_info=True)
raise
except (SupersetAPIError, RequestException) as e:
logger.critical(f"[search_datasets][Failure] Critical error during search: {e}", exc_info=True)
raise
# </ANCHOR id="search_datasets">
# <ANCHOR id="save_results_to_file" type="Function">
# @PURPOSE: Сохраняет результаты поиска в текстовый файл.
# @PRE: `results` является словарем, возвращенным `search_datasets`, или `None`.
# @PRE: `filename` должен быть допустимым путем к файлу.
# @POST: Записывает отформатированные результаты в указанный файл.
# @PARAM: results: Optional[Dict] - Словарь с результатами поиска.
# @PARAM: filename: str - Имя файла для сохранения результатов.
# @PARAM: logger: Optional[SupersetLogger] - Инстанс логгера.
# @RETURN: bool - Успешно ли выполнено сохранение.
def save_results_to_file(results: Optional[Dict], filename: str, logger: Optional[SupersetLogger] = None) -> bool:
logger = logger or SupersetLogger(name="file_writer")
logger.info(f"[save_results_to_file][Enter] Saving results to file: {filename}")
try:
formatted_report = print_search_results(results)
with open(filename, 'w', encoding='utf-8') as f:
f.write(formatted_report)
logger.info(f"[save_results_to_file][Success] Results saved to {filename}")
return True
except Exception as e:
logger.error(f"[save_results_to_file][Failure] Failed to save results to file: {e}", exc_info=True)
return False
# </ANCHOR id="save_results_to_file">
# <ANCHOR id="print_search_results" type="Function">
# @PURPOSE: Форматирует результаты поиска для читаемого вывода в консоль.
# @PRE: `results` является словарем, возвращенным `search_datasets`, или `None`.
# @POST: Возвращает отформатированную строку с результатами.
# @PARAM: results: Optional[Dict] - Словарь с результатами поиска.
# @PARAM: context_lines: int - Количество строк контекста для вывода до и после совпадения.
# @RETURN: str - Отформатированный отчет.
def print_search_results(results: Optional[Dict], context_lines: int = 3) -> str:
if not results:
return "Ничего не найдено"
output = []
for dataset_id, matches in results.items():
# Получаем информацию о базе данных для текущего датасета
database_info = ""
# Ищем поле database среди совпадений, чтобы вывести его
for match_info in matches:
if match_info['field'] == 'database':
database_info = match_info['value']
break
# Если database не найден в совпадениях, пробуем получить из других полей
if not database_info:
# Предполагаем, что база данных может быть в одном из полей, например sql или table_name
# Но для точности лучше использовать специальное поле, которое мы уже получили
pass # Пока не выводим, если не нашли явно
output.append(f"\n--- Dataset ID: {dataset_id} ---")
if database_info:
output.append(f" Database: {database_info}")
output.append("") # Пустая строка для читабельности
for match_info in matches:
field, match_text, full_value = match_info['field'], match_info['match'], match_info['value']
output.append(f" - Поле: {field}")
output.append(f" Совпадение: '{match_text}'")
lines = full_value.splitlines()
if not lines: continue
match_line_index = -1
for i, line in enumerate(lines):
if match_text in line:
match_line_index = i
break
if match_line_index != -1:
start = max(0, match_line_index - context_lines)
end = min(len(lines), match_line_index + context_lines + 1)
output.append(" Контекст:")
for i in range(start, end):
prefix = f"{i + 1:5d}: "
line_content = lines[i]
if i == match_line_index:
highlighted = line_content.replace(match_text, f">>>{match_text}<<<")
output.append(f" {prefix}{highlighted}")
else:
output.append(f" {prefix}{line_content}")
output.append("-" * 25)
return "\n".join(output)
# </ANCHOR id="print_search_results">
# <ANCHOR id="main" type="Function">
# @PURPOSE: Основная точка входа для запуска скрипта поиска.
# @RELATION: CALLS -> setup_clients
# @RELATION: CALLS -> search_datasets
# @RELATION: CALLS -> print_search_results
# @RELATION: CALLS -> save_results_to_file
def main():
logger = SupersetLogger(level=logging.INFO, console=True)
clients = setup_clients(logger)
target_client = clients['prod']
search_query = r"from dm_view.[a-z_]*"
# Генерируем имя файла на основе времени
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
output_filename = f"search_results_{timestamp}.txt"
results = search_datasets(
client=target_client,
search_pattern=search_query,
logger=logger
)
report = print_search_results(results)
logger.info(f"[main][Success] Search finished. Report:\n{report}")
# Сохраняем результаты в файл
success = save_results_to_file(results, output_filename, logger)
if success:
logger.info(f"[main][Success] Results also saved to file: {output_filename}")
else:
logger.error(f"[main][Failure] Failed to save results to file: {output_filename}")
# </ANCHOR id="main">
if __name__ == "__main__":
main()
# --- Конец кода модуля ---
# </GRACE_MODULE id="search_script">