205 lines
10 KiB
Python
Executable File
205 lines
10 KiB
Python
Executable File
# [DEF:search_script:Module]
|
||
#
|
||
# @SEMANTICS: search, superset, dataset, regex, file_output
|
||
# @PURPOSE: Предоставляет утилиты для поиска по текстовым паттернам в метаданных датасетов Superset.
|
||
# @LAYER: App
|
||
# @RELATION: DEPENDS_ON -> superset_tool.client
|
||
# @RELATION: DEPENDS_ON -> superset_tool.utils
|
||
# @PUBLIC_API: search_datasets, save_results_to_file, print_search_results, main
|
||
|
||
# [SECTION: IMPORTS]
|
||
import logging
|
||
import re
|
||
import os
|
||
from typing import Dict, Optional
|
||
from requests.exceptions import RequestException
|
||
from superset_tool.client import SupersetClient
|
||
from superset_tool.exceptions import SupersetAPIError
|
||
from superset_tool.utils.logger import SupersetLogger
|
||
from superset_tool.utils.init_clients import setup_clients
|
||
# [/SECTION]
|
||
|
||
# [DEF:search_datasets:Function]
|
||
# @PURPOSE: Выполняет поиск по строковому паттерну в метаданных всех датасетов.
|
||
# @PRE: `client` должен быть инициализированным экземпляром `SupersetClient`.
|
||
# @PRE: `search_pattern` должен быть валидной строкой регулярного выражения.
|
||
# @POST: Возвращает словарь с результатами поиска, где ключ - ID датасета, значение - список совпадений.
|
||
# @RELATION: CALLS -> client.get_datasets
|
||
# @THROW: re.error - Если паттерн регулярного выражения невалиден.
|
||
# @THROW: SupersetAPIError, RequestException - При критических ошибках API.
|
||
# @PARAM: client (SupersetClient) - Клиент для доступа к API Superset.
|
||
# @PARAM: search_pattern (str) - Регулярное выражение для поиска.
|
||
# @PARAM: logger (Optional[SupersetLogger]) - Инстанс логгера.
|
||
# @RETURN: Optional[Dict] - Словарь с результатами или None, если ничего не найдено.
|
||
def search_datasets(
|
||
client: SupersetClient,
|
||
search_pattern: str,
|
||
logger: Optional[SupersetLogger] = None
|
||
) -> Optional[Dict]:
|
||
logger = logger or SupersetLogger(name="dataset_search")
|
||
logger.info(f"[search_datasets][Enter] Searching for pattern: '{search_pattern}'")
|
||
try:
|
||
_, datasets = client.get_datasets(query={"columns": ["id", "table_name", "sql", "database", "columns"]})
|
||
|
||
if not datasets:
|
||
logger.warning("[search_datasets][State] No datasets found.")
|
||
return None
|
||
|
||
pattern = re.compile(search_pattern, re.IGNORECASE)
|
||
results = {}
|
||
|
||
for dataset in datasets:
|
||
dataset_id = dataset.get('id')
|
||
if not dataset_id:
|
||
continue
|
||
|
||
matches = []
|
||
for field, value in dataset.items():
|
||
value_str = str(value)
|
||
if pattern.search(value_str):
|
||
match_obj = pattern.search(value_str)
|
||
matches.append({
|
||
"field": field,
|
||
"match": match_obj.group() if match_obj else "",
|
||
"value": value_str
|
||
})
|
||
|
||
if matches:
|
||
results[dataset_id] = matches
|
||
|
||
logger.info(f"[search_datasets][Success] Found matches in {len(results)} datasets.")
|
||
return results
|
||
|
||
except re.error as e:
|
||
logger.error(f"[search_datasets][Failure] Invalid regex pattern: {e}", exc_info=True)
|
||
raise
|
||
except (SupersetAPIError, RequestException) as e:
|
||
logger.critical(f"[search_datasets][Failure] Critical error during search: {e}", exc_info=True)
|
||
raise
|
||
# [/DEF:search_datasets:Function]
|
||
|
||
# [DEF:save_results_to_file:Function]
|
||
# @PURPOSE: Сохраняет результаты поиска в текстовый файл.
|
||
# @PRE: `results` является словарем, возвращенным `search_datasets`, или `None`.
|
||
# @PRE: `filename` должен быть допустимым путем к файлу.
|
||
# @POST: Записывает отформатированные результаты в указанный файл.
|
||
# @PARAM: results (Optional[Dict]) - Словарь с результатами поиска.
|
||
# @PARAM: filename (str) - Имя файла для сохранения результатов.
|
||
# @PARAM: logger (Optional[SupersetLogger]) - Инстанс логгера.
|
||
# @RETURN: bool - Успешно ли выполнено сохранение.
|
||
def save_results_to_file(results: Optional[Dict], filename: str, logger: Optional[SupersetLogger] = None) -> bool:
|
||
logger = logger or SupersetLogger(name="file_writer")
|
||
logger.info(f"[save_results_to_file][Enter] Saving results to file: {filename}")
|
||
try:
|
||
formatted_report = print_search_results(results)
|
||
with open(filename, 'w', encoding='utf-8') as f:
|
||
f.write(formatted_report)
|
||
logger.info(f"[save_results_to_file][Success] Results saved to {filename}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"[save_results_to_file][Failure] Failed to save results to file: {e}", exc_info=True)
|
||
return False
|
||
# [/DEF:save_results_to_file:Function]
|
||
|
||
# [DEF:print_search_results:Function]
|
||
# @PURPOSE: Форматирует результаты поиска для читаемого вывода в консоль.
|
||
# @PRE: `results` является словарем, возвращенным `search_datasets`, или `None`.
|
||
# @POST: Возвращает отформатированную строку с результатами.
|
||
# @PARAM: results (Optional[Dict]) - Словарь с результатами поиска.
|
||
# @PARAM: context_lines (int) - Количество строк контекста для вывода до и после совпадения.
|
||
# @RETURN: str - Отформатированный отчет.
|
||
def print_search_results(results: Optional[Dict], context_lines: int = 3) -> str:
|
||
if not results:
|
||
return "Ничего не найдено"
|
||
|
||
output = []
|
||
for dataset_id, matches in results.items():
|
||
# Получаем информацию о базе данных для текущего датасета
|
||
database_info = ""
|
||
# Ищем поле database среди совпадений, чтобы вывести его
|
||
for match_info in matches:
|
||
if match_info['field'] == 'database':
|
||
database_info = match_info['value']
|
||
break
|
||
# Если database не найден в совпадениях, пробуем получить из других полей
|
||
if not database_info:
|
||
# Предполагаем, что база данных может быть в одном из полей, например sql или table_name
|
||
# Но для точности лучше использовать специальное поле, которое мы уже получили
|
||
pass # Пока не выводим, если не нашли явно
|
||
|
||
output.append(f"\n--- Dataset ID: {dataset_id} ---")
|
||
if database_info:
|
||
output.append(f" Database: {database_info}")
|
||
output.append("") # Пустая строка для читабельности
|
||
|
||
for match_info in matches:
|
||
field, match_text, full_value = match_info['field'], match_info['match'], match_info['value']
|
||
output.append(f" - Поле: {field}")
|
||
output.append(f" Совпадение: '{match_text}'")
|
||
|
||
lines = full_value.splitlines()
|
||
if not lines: continue
|
||
|
||
match_line_index = -1
|
||
for i, line in enumerate(lines):
|
||
if match_text in line:
|
||
match_line_index = i
|
||
break
|
||
|
||
if match_line_index != -1:
|
||
start = max(0, match_line_index - context_lines)
|
||
end = min(len(lines), match_line_index + context_lines + 1)
|
||
output.append(" Контекст:")
|
||
for i in range(start, end):
|
||
prefix = f"{i + 1:5d}: "
|
||
line_content = lines[i]
|
||
if i == match_line_index:
|
||
highlighted = line_content.replace(match_text, f">>>{match_text}<<<")
|
||
output.append(f" {prefix}{highlighted}")
|
||
else:
|
||
output.append(f" {prefix}{line_content}")
|
||
output.append("-" * 25)
|
||
return "\n".join(output)
|
||
# [/DEF:print_search_results:Function]
|
||
|
||
# [DEF:main:Function]
|
||
# @PURPOSE: Основная точка входа для запуска скрипта поиска.
|
||
# @RELATION: CALLS -> setup_clients
|
||
# @RELATION: CALLS -> search_datasets
|
||
# @RELATION: CALLS -> print_search_results
|
||
# @RELATION: CALLS -> save_results_to_file
|
||
def main():
|
||
logger = SupersetLogger(level=logging.INFO, console=True)
|
||
clients = setup_clients(logger)
|
||
|
||
target_client = clients['dev5']
|
||
search_query = r"from dm(_view)*.account_debt"
|
||
|
||
# Генерируем имя файла на основе времени
|
||
import datetime
|
||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
output_filename = f"search_results_{timestamp}.txt"
|
||
|
||
results = search_datasets(
|
||
client=target_client,
|
||
search_pattern=search_query,
|
||
logger=logger
|
||
)
|
||
|
||
report = print_search_results(results)
|
||
|
||
logger.info(f"[main][Success] Search finished. Report:\n{report}")
|
||
|
||
# Сохраняем результаты в файл
|
||
success = save_results_to_file(results, output_filename, logger)
|
||
if success:
|
||
logger.info(f"[main][Success] Results also saved to file: {output_filename}")
|
||
else:
|
||
logger.error(f"[main][Failure] Failed to save results to file: {output_filename}")
|
||
# [/DEF:main:Function]
|
||
|
||
if __name__ == "__main__":
|
||
main()
|
||
|
||
# [/DEF:search_script:Module]
|