153 lines
5.9 KiB
Python
153 lines
5.9 KiB
Python
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument,invalid-name,redefined-outer-name
|
||
"""
|
||
[MODULE] Dataset Search Utilities
|
||
@contract: Предоставляет функционал для поиска текстовых паттернов в метаданных датасетов Superset.
|
||
"""
|
||
|
||
# [IMPORTS] Стандартная библиотека
|
||
import logging
|
||
import re
|
||
from typing import Dict, Optional
|
||
|
||
# [IMPORTS] Third-party
|
||
from requests.exceptions import RequestException
|
||
|
||
# [IMPORTS] Локальные модули
|
||
from superset_tool.client import SupersetClient
|
||
from superset_tool.exceptions import SupersetAPIError
|
||
from superset_tool.utils.logger import SupersetLogger
|
||
from superset_tool.utils.init_clients import setup_clients
|
||
|
||
# [ENTITY: Function('search_datasets')]
# CONTRACT:
# PURPOSE: Searches the metadata of all datasets for a regular-expression pattern.
# PRECONDITIONS:
# - `client` must be an initialized `SupersetClient` instance.
# - `search_pattern` must be a valid regular-expression string.
# POSTCONDITIONS:
# - Returns a dict with the search results, or `None` when no datasets were returned.
def search_datasets(
    client: "SupersetClient",
    search_pattern: str,
    logger: Optional["SupersetLogger"] = None
) -> Optional[Dict]:
    """Search every field of every dataset for a case-insensitive regex match.

    Args:
        client: Object exposing ``get_datasets(query=...)``; the second element
            of its return value is used as the list of dataset dicts.
        search_pattern: Regular expression, compiled with ``re.IGNORECASE``.
        logger: Logger to report progress to; a ``SupersetLogger`` named
            ``"dataset_search"`` is created when omitted.

    Returns:
        ``None`` when the client returned no datasets. Otherwise a dict mapping
        dataset id to a list of ``{"field", "match", "value"}`` dicts, one per
        field whose stringified value matched (first match only). Datasets
        without matches are omitted, so the dict may be empty.

    Raises:
        re.error: ``search_pattern`` is not a valid regular expression.
        SupersetAPIError, RequestException: Re-raised after logging when the
            dataset listing fails.
    """
    logger = logger or SupersetLogger(name="dataset_search")
    logger.info(f"[STATE][search_datasets][ENTER] Searching for pattern: '{search_pattern}'")
    try:
        # Compile before touching the API so an invalid pattern fails fast,
        # even when the server would have returned no datasets.
        pattern = re.compile(search_pattern, re.IGNORECASE)

        _, datasets = client.get_datasets(query={
            "columns": ["id", "table_name", "sql", "database", "columns"]
        })

        if not datasets:
            logger.warning("[STATE][search_datasets][EMPTY] No datasets found.")
            return None

        results = {}
        for dataset in datasets:
            dataset_id = dataset.get('id')
            if not dataset_id:
                continue

            matches = []
            # Walk each record's own fields (in dict order) rather than the
            # key set of the first record, so no field is silently ignored
            # and the order of reported matches is deterministic.
            for field, raw_value in dataset.items():
                if raw_value is None:
                    # str(None) == "None" would yield bogus hits for patterns
                    # such as "none" or "on".
                    continue
                value = str(raw_value)
                match_obj = pattern.search(value)
                if match_obj:
                    matches.append({
                        "field": field,
                        "match": match_obj.group(),
                        "value": value
                    })

            if matches:
                results[dataset_id] = matches

        logger.info(f"[STATE][search_datasets][SUCCESS] Found matches in {len(results)} datasets.")
        return results

    except re.error as e:
        logger.error(f"[STATE][search_datasets][FAILURE] Invalid regex pattern: {e}", exc_info=True)
        raise
    except (SupersetAPIError, RequestException) as e:
        logger.critical(f"[STATE][search_datasets][FAILURE] Critical error during search: {e}", exc_info=True)
        raise
# END_FUNCTION_search_datasets
|
||
|
||
# [ENTITY: Function('_format_match_context')]
# PURPOSE: Private helper that renders the context lines for a single match.
def _format_match_context(full_value: str, match_text: str, context_lines: int) -> list:
    """Return the numbered context lines around the first line holding the match.

    Returns an empty list when the match cannot be located on any single line.
    """
    lines = full_value.splitlines()
    # A match may span several lines; only its first non-empty line can be
    # located inside one of the split lines.
    needle = next((part for part in match_text.splitlines() if part), "")
    if not needle:
        # An empty needle is "in" every line, and str.replace("", marker)
        # would insert the marker between all characters of the line.
        return []

    match_line_index = next(
        (i for i, line in enumerate(lines) if needle in line), -1
    )
    if match_line_index == -1:
        return []

    start_line = max(0, match_line_index - context_lines)
    end_line = min(len(lines), match_line_index + context_lines + 1)

    context = [" Контекст:"]
    for i in range(start_line, end_line):
        line_content = lines[i]
        if i == match_line_index:
            # Mark every occurrence of the matched text on the matching line.
            line_content = line_content.replace(needle, f">>>{needle}<<<")
        context.append(f" {i + 1:5d}: {line_content}")
    return context


# [ENTITY: Function('print_search_results')]
# CONTRACT:
# PURPOSE: Formats search results into a readable report for console output.
# PRECONDITIONS:
# - `results` is the dict returned by `search_datasets`, or `None`.
# POSTCONDITIONS:
# - Returns the formatted report as a string.
def print_search_results(results: Optional[Dict], context_lines: int = 3) -> str:
    """Build a text report from search results.

    Args:
        results: Mapping of dataset id to a list of match dicts with the keys
            ``"field"``, ``"match"`` and ``"value"``; ``None`` or an empty
            mapping yields the "nothing found" message.
        context_lines: Number of lines shown before and after the matching
            line.

    Returns:
        The report as one newline-joined string. Each dataset section starts
        with a header and ends with a dashed separator. Despite the function's
        name, nothing is printed here.
    """
    if not results:
        return "Ничего не найдено"

    output = []
    for dataset_id, matches in results.items():
        output.append(f"\n--- Dataset ID: {dataset_id} ---")
        for match_info in matches:
            field = match_info['field']
            match_text = match_info['match']
            full_value = match_info['value']

            output.append(f" - Поле: {field}")
            output.append(f" Совпадение: '{match_text}'")
            output.extend(_format_match_context(full_value, match_text, context_lines))
        output.append("-" * 25)
    return "\n".join(output)
# END_FUNCTION_print_search_results
|
||
|
||
# [ENTITY: Function('main')]
# CONTRACT:
# PURPOSE: Main entry point of the script.
# PRECONDITIONS: None
# POSTCONDITIONS: None
def main():
    """Search the 'dev' client's datasets for a fixed SQL fragment and log the report."""
    logger = SupersetLogger(level=logging.INFO, console=True)
    clients = setup_clients(logger)

    target_client = clients['dev']
    # The text to locate is a literal SQL fragment. Used as a raw regex, its
    # parentheses, dots, `$` and `||` are metacharacters (`||` even adds empty
    # alternatives), so it would match any occurrence of "match". Escape it so
    # the fragment is searched for verbatim.
    sql_fragment = r"match(r2.path_code, budget_reference.ref_code || '($|(\s))')"
    search_query = re.escape(sql_fragment)

    results = search_datasets(
        client=target_client,
        search_pattern=search_query,
        logger=logger
    )

    report = print_search_results(results)
    logger.info(f"[STATE][main][SUCCESS] Search finished. Report:\n{report}")
# END_FUNCTION_main
|
||
|
||
# Run the search only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|