mapper + lint

This commit is contained in:
2025-10-06 18:49:40 +03:00
parent b550cb38ff
commit 74b7779e45
18 changed files with 4512 additions and 2250 deletions

131
dataset_mapper.py Normal file
View File

@@ -0,0 +1,131 @@
# <GRACE_MODULE id="dataset_mapper" name="dataset_mapper.py">
# @SEMANTICS: dataset, mapping, postgresql, xlsx, superset
# @PURPOSE: Этот модуль отвечает за обновление метаданных (verbose_map) в датасетах Superset, извлекая их из PostgreSQL или XLSX-файлов.
# @DEPENDS_ON: superset_tool.client -> Использует SupersetClient для взаимодействия с API.
# @DEPENDS_ON: pandas -> для чтения XLSX-файлов.
# @DEPENDS_ON: psycopg2 -> для подключения к PostgreSQL.
# <IMPORTS>
import pandas as pd
import psycopg2
from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import setup_clients
from superset_tool.utils.logger import SupersetLogger
from typing import Dict, List, Optional, Any
# </IMPORTS>
# --- Начало кода модуля ---
# <ANCHOR id="DatasetMapper" type="Class">
# @PURPOSE: Класс для меппинга и обновления verbose_map в датасетах Superset.
class DatasetMapper:
def __init__(self, logger: SupersetLogger):
self.logger = logger
# <ANCHOR id="DatasetMapper.get_postgres_comments" type="Function">
# @PURPOSE: Извлекает комментарии к колонкам из системного каталога PostgreSQL.
# @PRE: `db_config` должен содержать валидные креды для подключения к PostgreSQL.
# @PRE: `table_name` и `table_schema` должны быть строками.
# @POST: Возвращается словарь с меппингом `column_name` -> `column_comment`.
# @PARAM: db_config: Dict - Конфигурация для подключения к БД.
# @PARAM: table_name: str - Имя таблицы.
# @PARAM: table_schema: str - Схема таблицы.
# @RETURN: Dict[str, str] - Словарь с комментариями к колонкам.
# @THROW: Exception - При ошибках подключения или выполнения запроса к БД.
def get_postgres_comments(self, db_config: Dict, table_name: str, table_schema: str) -> Dict[str, str]:
self.logger.info("[get_postgres_comments][Enter] Fetching comments from PostgreSQL for %s.%s.", table_schema, table_name)
query = f"""
SELECT cols.column_name, pg_catalog.col_description(c.oid, cols.ordinal_position::int) AS column_comment
FROM information_schema.columns cols
JOIN pg_catalog.pg_class c ON c.relname = cols.table_name
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace AND n.nspname = cols.table_schema
WHERE cols.table_catalog = '{db_config.get('dbname')}' AND cols.table_name = '{table_name}' AND cols.table_schema = '{table_schema}';
"""
comments = {}
try:
with psycopg2.connect(**db_config) as conn, conn.cursor() as cursor:
cursor.execute(query)
for row in cursor.fetchall():
if row[1]:
comments[row[0]] = row[1]
self.logger.info("[get_postgres_comments][Success] Fetched %d comments.", len(comments))
except Exception as e:
self.logger.error("[get_postgres_comments][Failure] %s", e, exc_info=True)
raise
return comments
# </ANCHOR id="DatasetMapper.get_postgres_comments">
# <ANCHOR id="DatasetMapper.load_excel_mappings" type="Function">
# @PURPOSE: Загружает меппинги 'column_name' -> 'column_comment' из XLSX файла.
# @PRE: `file_path` должен быть валидным путем к XLSX файлу с колонками 'column_name' и 'column_comment'.
# @POST: Возвращается словарь с меппингами.
# @PARAM: file_path: str - Путь к XLSX файлу.
# @RETURN: Dict[str, str] - Словарь с меппингами.
# @THROW: Exception - При ошибках чтения файла или парсинга.
def load_excel_mappings(self, file_path: str) -> Dict[str, str]:
self.logger.info("[load_excel_mappings][Enter] Loading mappings from %s.", file_path)
try:
df = pd.read_excel(file_path)
mappings = df.set_index('column_name')['column_comment'].to_dict()
self.logger.info("[load_excel_mappings][Success] Loaded %d mappings.", len(mappings))
return mappings
except Exception as e:
self.logger.error("[load_excel_mappings][Failure] %s", e, exc_info=True)
raise
# </ANCHOR id="DatasetMapper.load_excel_mappings">
# <ANCHOR id="DatasetMapper.run_mapping" type="Function">
# @PURPOSE: Основная функция для выполнения меппинга и обновления verbose_map датасета в Superset.
# @PARAM: superset_client: SupersetClient - Клиент Superset.
# @PARAM: dataset_id: int - ID датасета для обновления.
# @PARAM: source: str - Источник данных ('postgres', 'excel', 'both').
# @PARAM: postgres_config: Optional[Dict] - Конфигурация для подключения к PostgreSQL.
# @PARAM: excel_path: Optional[str] - Путь к XLSX файлу.
# @PARAM: table_name: Optional[str] - Имя таблицы в PostgreSQL.
# @PARAM: table_schema: Optional[str] - Схема таблицы в PostgreSQL.
# @RELATION: CALLS -> self.get_postgres_comments
# @RELATION: CALLS -> self.load_excel_mappings
# @RELATION: CALLS -> superset_client.get_dataset
# @RELATION: CALLS -> superset_client.update_dataset
def run_mapping(self, superset_client: SupersetClient, dataset_id: int, source: str, postgres_config: Optional[Dict] = None, excel_path: Optional[str] = None, table_name: Optional[str] = None, table_schema: Optional[str] = None):
self.logger.info("[run_mapping][Enter] Starting dataset mapping for ID %d from source '%s'.", dataset_id, source)
mappings: Dict[str, str] = {}
try:
if source in ['postgres', 'both']:
assert postgres_config and table_name and table_schema, "Postgres config is required."
mappings.update(self.get_postgres_comments(postgres_config, table_name, table_schema))
if source in ['excel', 'both']:
assert excel_path, "Excel path is required."
mappings.update(self.load_excel_mappings(excel_path))
if source not in ['postgres', 'excel', 'both']:
self.logger.error("[run_mapping][Failure] Invalid source: %s.", source)
return
dataset_response = superset_client.get_dataset(dataset_id)
dataset_data = dataset_response['result']
original_verbose_map = dataset_data.get('verbose_map', {}).copy()
new_verbose_map = original_verbose_map.copy()
for column in dataset_data.get('columns', []):
column_name = column.get('column_name')
if column_name in mappings:
new_verbose_map[column_name] = mappings[column_name]
if original_verbose_map != new_verbose_map:
dataset_data['verbose_map'] = new_verbose_map
superset_client.update_dataset(dataset_id, {'verbose_map': new_verbose_map})
self.logger.info("[run_mapping][Success] Dataset %d verbose_map updated.", dataset_id)
else:
self.logger.info("[run_mapping][State] No changes in verbose_map, skipping update.")
except (AssertionError, FileNotFoundError, Exception) as e:
self.logger.error("[run_mapping][Failure] %s", e, exc_info=True)
return
# </ANCHOR id="DatasetMapper.run_mapping">
# </ANCHOR id="DatasetMapper">
# --- Конец кода модуля ---
# </GRACE_MODULE id="dataset_mapper">