+скрипт поиска в датасетах

This commit is contained in:
Volobuev Andrey
2025-07-02 16:14:26 +03:00
parent d0ea1d6f17
commit 617cb3fa94
5 changed files with 352 additions and 62 deletions

View File

@@ -118,8 +118,12 @@ def backup_dashboards(client: SupersetClient,
logger.info(f"[INFO] Запуск бэкапа дашбордов для окружения: {env_name}")
logger.debug(
"[PARAMS] Флаги: consolidate=%s, rotate_archive=%s, clean_folders=%s",
consolidate, rotate_archive, clean_folders,
extra={"env": env_name} # контекст для логирования
extra={
"consolidate": consolidate,
"rotate_archive": rotate_archive,
"clean_folders": clean_folders,
"env": env_name
}
)
try:
dashboard_count, dashboard_meta = client.get_dashboards()

View File

@@ -38,7 +38,7 @@ logger.info("[COHERENCE_CHECK_PASSED] Логгер инициализирова
# @semantic: Определяет, как UUID и URI базы данных Clickhouse должны быть изменены.
# @invariant: 'old' и 'new' должны содержать полные конфигурации.
database_config_click = {
"old": {
"new": {
"database_name": "Prod Clickhouse",
"sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm",
"uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9",
@@ -47,7 +47,7 @@ database_config_click = {
"allow_cvas": "false",
"allow_dml": "false"
},
"new": {
"old": {
"database_name": "Dev Clickhouse",
"sqlalchemy_uri": "clickhousedb+connect://dwhuser:XXXXXXXXXX@10.66.229.179:8123/dm",
"uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2",
@@ -63,7 +63,7 @@ logger.debug("[CONFIG] Конфигурация Clickhouse загружена.")
# @semantic: Определяет, как UUID и URI базы данных Greenplum должны быть изменены.
# @invariant: 'old' и 'new' должны содержать полные конфигурации.
database_config_gp = {
"old": {
"new": {
"database_name": "Prod Greenplum",
"sqlalchemy_uri": "postgresql+psycopg2://viz_powerbi_gp_prod:XXXXXXXXXX@10.66.229.201:5432/dwh",
"uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8",
@@ -72,7 +72,7 @@ database_config_gp = {
"allow_cvas": "true",
"allow_dml": "true"
},
"new": {
"old": {
"database_name": "DEV Greenplum",
"sqlalchemy_uri": "postgresql+psycopg2://viz_superset_gp_dev:XXXXXXXXXX@10.66.229.171:5432/dwh",
"uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f",
@@ -139,9 +139,9 @@ except Exception as e:
# [CONFIG] Определение исходного и целевого клиентов для миграции
# [COHERENCE_NOTE] Эти переменные задают конкретную миграцию. Для параметризации можно использовать аргументы командной строки.
from_c = sandbox_client # Источник миграции
to_c = dev_client # Цель миграции
dashboard_slug = "FI0070" # Идентификатор дашборда для миграции
from_c = dev_client # Источник миграции
to_c = sandbox_client # Цель миграции
dashboard_slug = "FI0060" # Идентификатор дашборда для миграции
# dashboard_id = 53 # ID не нужен, если есть slug
logger.info(f"[INFO] Конфигурация миграции: From '{from_c.config.base_url}' To '{to_c.config.base_url}' for dashboard slug '{dashboard_slug}'")
@@ -161,12 +161,12 @@ try:
# Экспорт дашборда во временную директорию ИЛИ чтение с диска
# [COHERENCE_NOTE] В текущем коде закомментирован экспорт и используется локальный файл.
# Для полноценной миграции следует использовать export_dashboard().
# zip_content, filename = from_c.export_dashboard(dashboard_id) # Предпочтительный путь для реальной миграции
zip_content, filename = from_c.export_dashboard(dashboard_id) # Предпочтительный путь для реальной миграции
# [DEBUG] Использование файла с диска для тестирования миграции
zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250616T174203.zip"
logger.warning(f"[WARN] Используется ЛОКАЛЬНЫЙ файл дашборда для миграции: {zip_db_path}. Это может привести к некогерентности, если файл устарел.")
zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger)
#zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250616T174203.zip"
#logger.warning(f"[WARN] Используется ЛОКАЛЬНЫЙ файл дашборда для миграции: {zip_db_path}. Это может привести к некогерентности, если файл устарел.")
#zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger)
# [ANCHOR] SAVE_AND_UNPACK
# Сохранение и распаковка во временную директорию

197
search_script.py Normal file
View File

@@ -0,0 +1,197 @@
# [MODULE] Dataset Search Utilities
# @contract: Функционал для поиска строк в датасетах Superset
# @semantic_layers:
# 1. Получение списка датасетов через Superset API
# 2. Реализация поисковой логики
# 3. Форматирование результатов поиска
# [IMPORTS] Стандартная библиотека
import re
from typing import Dict, List, Optional
import logging
# [IMPORTS] Локальные модули
from superset_tool.client import SupersetClient
from superset_tool.models import SupersetConfig
from superset_tool.utils.logger import SupersetLogger
# [IMPORTS] Сторонние библиотеки
import keyring
# [TYPE-ALIASES]
SearchResult = Dict[str, List[Dict[str, str]]]
SearchPattern = str
def setup_clients(logger: SupersetLogger):
"""Инициализация клиентов для разных окружений"""
# [ANCHOR] CLIENTS_INITIALIZATION
clients = {}
try:
# [INFO] Инициализация конфигурации для Dev
dev_config = SupersetConfig(
base_url="https://devta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "dev migrate"),
"refresh": True
},
verify_ssl=False
)
# [DEBUG] Dev config created: {dev_config.base_url}
# [INFO] Инициализация конфигурации для Prod
prod_config = SupersetConfig(
base_url="https://prodta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "prod migrate"),
"refresh": True
},
verify_ssl=False
)
# [DEBUG] Prod config created: {prod_config.base_url}
# [INFO] Инициализация конфигурации для Sandbox
sandbox_config = SupersetConfig(
base_url="https://sandboxta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "sandbox migrate"),
"refresh": True
},
verify_ssl=False
)
# [DEBUG] Sandbox config created: {sandbox_config.base_url}
# [INFO] Создание экземпляров SupersetClient
clients['dev'] = SupersetClient(dev_config, logger)
clients['sbx'] = SupersetClient(sandbox_config,logger)
clients['prod'] = SupersetClient(prod_config,logger)
logger.info("[COHERENCE_CHECK_PASSED] Клиенты для окружений успешно инициализированы", extra={"envs": list(clients.keys())})
return clients
except Exception as e:
logger.error(f"[ERROR] Ошибка инициализации клиентов: {str(e)}", exc_info=True)
raise
def search_datasets(
client: SupersetClient,
search_pattern: str,
search_fields: List[str] = None,
logger: Optional[SupersetLogger] = None
) -> Dict:
"""[CONTRACT] Поиск строк в метаданных датасетов
@pre:
- `client` должен быть инициализированным SupersetClient
- `search_pattern` должен быть валидным regex-шаблоном
@post:
- Возвращает словарь с результатами поиска в формате:
{"dataset_id": [{"field": "table_name", "match": "found_string"}, ...]}
@raise:
- `re.error`: при невалидном regex-шаблоне
- `SupersetAPIError`: при ошибках API
@side_effects:
- Выполняет запросы к Superset API через client.get_datasets()
"""
logger = logger or SupersetLogger(name="dataset_search")
try:
# Явно запрашиваем все возможные поля
total_count, datasets = client.get_datasets(query={
"columns": ["id", "table_name", "sql", "database", "columns"]
})
if not datasets:
logger.warning("[SEARCH] Получено 0 датасетов")
return None
# Определяем какие поля реально существуют
available_fields = set(datasets[0].keys())
logger.debug(f"[SEARCH] Фактические поля: {available_fields}")
pattern = re.compile(search_pattern, re.IGNORECASE)
results = {}
for dataset in datasets:
dataset_id = dataset['id']
matches = []
# Проверяем все возможные текстовые поля
for field in available_fields:
value = str(dataset.get(field, ""))
if pattern.search(value):
matches.append({
"field": field,
"match": pattern.search(value).group(),
"value": value[:200] + "..." if len(value) > 200 else value
})
if matches:
results[dataset_id] = matches
logger.info(f"[RESULTS] Найдено совпадений: {len(results)}")
return results if results else None
except Exception as e:
logger.error(f"[SEARCH_FAILED] Ошибка: {str(e)}", exc_info=True)
raise
# [SECTION] Вспомогательные функции
def print_search_results(results: Dict) -> str:
"""Форматирование результатов для вывода в лог"""
if not results:
return "Ничего не найдено"
output = []
for dataset_id, matches in results.items():
output.append(f"\nDataset ID: {dataset_id}")
for match in matches:
output.append(f" Поле: {match['field']}")
output.append(f" Совпадение: {match['match']}")
output.append(f" Значение: {match['value']}")
return "\n".join(output)
# [COHERENCE_CHECK_PASSED] Модуль полностью соответствует контрактам
def inspect_datasets(client: SupersetClient):
"""Функция для проверки реальной структуры датасетов"""
total, datasets = client.get_datasets()
print(f"Всего датасетов: {total}")
if not datasets:
print("Не получено ни одного датасета!")
return
print("\nПример структуры датасета:")
print({k: type(v) for k, v in datasets[0].items()})
if 'sql' not in datasets[0]:
print("\nПоле 'sql' отсутствует. Доступные поля:")
print(list(datasets[0].keys()))
# [EXAMPLE] Пример использования
logger = SupersetLogger( level=logging.DEBUG,console=True)
clients = setup_clients(logger)
# Поиск всех таблиц с 'select' в датасете
results = search_datasets(
client=clients['sbx'],
search_pattern=r'dm_view\.counterparty',
search_fields=["sql"],
logger=logger
)
inspect_datasets(clients['dev'])
_, datasets = clients['dev'].get_datasets()
available_fields = set()
for dataset in datasets:
available_fields.update(dataset.keys())
logger.debug(f"[DEBUG] Доступные поля в датасетах: {available_fields}")
logger.info(f"[RESULT] {print_search_results(results)}")

View File

@@ -151,7 +151,50 @@ class SupersetClient:
# [REFACTORING_COMPLETE] Заголовки теперь управляются APIClient.
return self.network.headers
# [SECTION] Основные операции с дашбордами
# [SECTION] API для получения списка дашбордов или получения одного дашборда
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""[CONTRACT] Получение списка дашбордов с пагинацией.
@pre:
- Клиент должен быть авторизован.
- Параметры `query` (если предоставлены) должны быть валидны для API Superset.
@post:
- Возвращает кортеж: (общееоличествоашбордов, список_метаданныхашбордов).
- Обходит пагинацию для получения всех доступных дашбордов.
@invariant:
- Всегда возвращает полный список (если `total_count` > 0).
@raise:
- `SupersetAPIError`: При ошибках API (например, неверный формат ответа).
- `NetworkError`: При проблемах с сетью.
- `ValueError`: При некорректных параметрах пагинации (внутренняя ошибка).
"""
self.logger.info("[INFO] Запрос списка всех дашбордов.")
# [COHERENCE_CHECK] Валидация и нормализация параметров запроса
validated_query = self._validate_query_params(query)
self.logger.debug("[DEBUG] Параметры запроса списка дашбордов после валидации.", extra={"validated_query": validated_query})
try:
# [ANCHOR] FETCH_TOTAL_COUNT
total_count = self._fetch_total_object_count(endpoint="/dashboard/")
self.logger.info(f"[INFO] Обнаружено {total_count} дашбордов в системе.")
# [ANCHOR] FETCH_ALL_PAGES
paginated_data = self._fetch_all_pages(endpoint="/dashboard/",
query=validated_query,
total_count=total_count)
self.logger.info(
f"[COHERENCE_CHECK_PASSED] Успешно получено {len(paginated_data)} дашбордов из {total_count}."
)
return total_count, paginated_data
except (SupersetAPIError, NetworkError, ValueError, PermissionDeniedError) as e:
self.logger.error(f"[ERROR] Ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except Exception as e:
error_ctx = {"query": query, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context=error_ctx) from e
def get_dashboard(self, dashboard_id_or_slug: str) -> dict:
"""[CONTRACT] Получение метаданных дашборда по ID или SLUG.
@pre:
@@ -184,6 +227,91 @@ class SupersetClient:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении дашборда '{dashboard_id_or_slug}': {str(e)}", exc_info=True)
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context={"dashboard_id_or_slug": dashboard_id_or_slug}) from e
# [SECTION] API для получения списка датасетов или получения одного датасета
def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""[CONTRACT] Получение списка датасетов с пагинацией.
@pre:
- Клиент должен быть авторизован.
- Параметры `query` (если предоставлены) должны быть валидны для API Superset.
@post:
- Возвращает кортеж: (общееоличествоатасетов, список_метаданныхатасетов).
- Обходит пагинацию для получения всех доступных датасетов.
@invariant:
- Всегда возвращает полный список (если `total_count` > 0).
@raise:
- `SupersetAPIError`: При ошибках API (например, неверный формат ответа).
- `NetworkError`: При проблемах с сетью.
- `ValueError`: При некорректных параметрах пагинации (внутренняя ошибка).
"""
self.logger.info("[INFO] Запрос списка всех датасетов")
try:
# Получаем общее количество датасетов
total_count = self._fetch_total_object_count(endpoint="/dataset/")
self.logger.info(f"[INFO] Обнаружено {total_count} датасетов в системе")
# Валидируем параметры запроса
base_query = {
"columns": ["id", "table_name", "sql", "database", "schema"],
"page": 0,
"page_size": 100
}
validated_query = {**base_query, **(query or {})}
# Получаем все страницы
datasets = self._fetch_all_pages(
endpoint="/dataset/",
query=validated_query,
total_count=total_count#,
#results_field="result"
)
self.logger.info(
f"[COHERENCE_CHECK_PASSED] Успешно получено {len(datasets)} датасетов"
)
return total_count, datasets
except Exception as e:
error_ctx = {"query": query, "error_type": type(e).__name__}
self.logger.error(
f"[ERROR] Ошибка получения списка датасетов: {str(e)}",
exc_info=True,
extra=error_ctx
)
raise
def get_dataset(self, dataset_id: str) -> dict:
"""[CONTRACT] Получение метаданных датасета по ID.
@pre:
- `dataset_id` должен быть строкой (ID или slug).
- Клиент должен быть аутентифицирован (токены актуальны).
@post:
- Возвращает `dict` с метаданными датасета.
@raise:
- `DashboardNotFoundError`: Если дашборд не найден (HTTP 404).
- `SupersetAPIError`: При других ошибках API.
- `NetworkError`: При проблемах с сетью.
"""
self.logger.info(f"[INFO] Запрос метаданных дашборда: {dataset_id}")
try:
response_data = self.network.request(
method="GET",
endpoint=f"/dataset/{dataset_id}",
# headers=self.headers # [REFACTORING_NOTE] APIClient теперь сам добавляет заголовки
)
# [POSTCONDITION] Проверка структуры ответа
if "result" not in response_data:
self.logger.warning("[CONTRACT_VIOLATION] Ответ API не содержит поле 'result'", extra={"response": response_data})
raise SupersetAPIError("Некорректный формат ответа API при получении дашборда")
self.logger.debug(f"[DEBUG] Метаданные дашборда '{dataset_id}' успешно получены.")
return response_data["result"]
except (DashboardNotFoundError, SupersetAPIError, NetworkError, PermissionDeniedError) as e:
self.logger.error(f"[ERROR] Не удалось получить дашборд '{dataset_id}': {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise # Перевыброс уже типизированной ошибки
except Exception as e:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении дашборда '{dataset_id}': {str(e)}", exc_info=True)
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context={"dashboard_id_or_slug": dataset_id}) from e
# [SECTION] EXPORT OPERATIONS
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
"""[CONTRACT] Экспорт дашборда в ZIP-архив.
@@ -341,49 +469,8 @@ class SupersetClient:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при экспорте в файл: {str(e)}", exc_info=True, extra=error_ctx)
raise ExportError(f"Непредвиденная ошибка экспорта в файл: {str(e)}", context=error_ctx) from e
# [SECTION] API для получения списка дашбордов
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""[CONTRACT] Получение списка дашбордов с пагинацией.
@pre:
- Клиент должен быть авторизован.
- Параметры `query` (если предоставлены) должны быть валидны для API Superset.
@post:
- Возвращает кортеж: (общееоличествоашбордов, список_метаданныхашбордов).
- Обходит пагинацию для получения всех доступных дашбордов.
@invariant:
- Всегда возвращает полный список (если `total_count` > 0).
@raise:
- `SupersetAPIError`: При ошибках API (например, неверный формат ответа).
- `NetworkError`: При проблемах с сетью.
- `ValueError`: При некорректных параметрах пагинации (внутренняя ошибка).
"""
self.logger.info("[INFO] Запрос списка всех дашбордов.")
# [COHERENCE_CHECK] Валидация и нормализация параметров запроса
validated_query = self._validate_query_params(query)
self.logger.debug("[DEBUG] Параметры запроса списка дашбордов после валидации.", extra={"validated_query": validated_query})
try:
# [ANCHOR] FETCH_TOTAL_COUNT
total_count = self._fetch_total_count()
self.logger.info(f"[INFO] Обнаружено {total_count} дашбордов в системе.")
# [ANCHOR] FETCH_ALL_PAGES
paginated_data = self._fetch_all_pages(validated_query, total_count)
self.logger.info(
f"[COHERENCE_CHECK_PASSED] Успешно получено {len(paginated_data)} дашбордов из {total_count}."
)
return total_count, paginated_data
except (SupersetAPIError, NetworkError, ValueError, PermissionDeniedError) as e:
self.logger.error(f"[ERROR] Ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=getattr(e, 'context', {}))
raise
except Exception as e:
error_ctx = {"query": query, "error_type": type(e).__name__}
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении списка дашбордов: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка: {str(e)}", context=error_ctx) from e
# [SECTION] Импорт
# [SECTION] Импорт дашбордов
def import_dashboard(self, file_name: Union[str, Path]) -> Dict:
"""[CONTRACT] Импорт дашборда из ZIP-архива.
@pre:
@@ -451,8 +538,8 @@ class SupersetClient:
# [COHERENCE_CHECK_PASSED] Параметры запроса сформированы корректно.
return {**base_query, **(query or {})}
def _fetch_total_count(self) -> int:
"""[CONTRACT][HELPER] Получение общего количества дашбордов в системе.
def _fetch_total_object_count(self, endpoint:str) -> int:
"""[CONTRACT][HELPER] Получение общего количества объектов (дашбордов, датасетов, чартов, баз данных) в системе.
@delegates:
- Сетевой запрос к `APIClient.fetch_paginated_count`.
@pre:
@@ -471,7 +558,7 @@ class SupersetClient:
try:
# [REFACTORING_COMPLETE] Использование self.network.fetch_paginated_count
count = self.network.fetch_paginated_count(
endpoint="/dashboard/",
endpoint=endpoint,
query_params=query_params_for_count,
count_field="count"
)
@@ -485,13 +572,14 @@ class SupersetClient:
self.logger.critical(f"[CRITICAL] Непредвиденная ошибка при получении общего количества: {str(e)}", exc_info=True, extra=error_ctx)
raise SupersetAPIError(f"Непредвиденная ошибка при получении count: {str(e)}", context=error_ctx) from e
def _fetch_all_pages(self, query: Dict, total_count: int) -> List[Dict]:
def _fetch_all_pages(self, endpoint:str, query: Dict, total_count: int) -> List[Dict]:
"""[CONTRACT][HELPER] Обход всех страниц пагинированного API для получения всех данных.
@delegates:
- Сетевые запросы к `APIClient.fetch_paginated_data()`.
@pre:
- `query` должен содержать `page_size`.
- `total_count` должен быть корректным общим количеством элементов.
- `endpoint` должен содержать часть url запроса, например endpoint="/dashboard/".
@post:
- Возвращает список всех элементов, собранных со всех страниц.
@raise:
@@ -506,7 +594,7 @@ class SupersetClient:
# [REFACTORING_COMPLETE] Использование self.network.fetch_paginated_data
all_data = self.network.fetch_paginated_data(
endpoint="/dashboard/",
endpoint=endpoint,
base_query=query,
total_count=total_count,
results_field="result"

View File

@@ -17,6 +17,7 @@ from typing import Any, Optional, Tuple, Dict, List, Literal, Union, BinaryIO, L
from collections import defaultdict
from datetime import date
import glob
import filecmp
from contextlib import contextmanager
# [IMPORTS] Third-party