# [DEF:superset_tool.utils.dataset_mapper:Module]
#
# @SEMANTICS: dataset, mapping, postgresql, xlsx, superset
# @PURPOSE: This module updates dataset metadata (verbose_map) in Superset, sourcing it from PostgreSQL or XLSX files.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> superset_tool.client
# @RELATION: DEPENDS_ON -> pandas
# @RELATION: DEPENDS_ON -> psycopg2
# @PUBLIC_API: DatasetMapper

# [SECTION: IMPORTS]
import pandas as pd  # type: ignore
import psycopg2  # type: ignore

from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import setup_clients
from superset_tool.utils.logger import SupersetLogger
from typing import Dict, List, Optional, Any
# [/SECTION]


# [DEF:DatasetMapper:Class]
# @PURPOSE: Maps and updates verbose_map entries in Superset datasets.
class DatasetMapper:

    def __init__(self, logger: SupersetLogger):
        self.logger = logger

    # [DEF:DatasetMapper.get_postgres_comments:Function]
    # @PURPOSE: Fetches column comments from the PostgreSQL system catalog.
    # @PRE: `db_config` must contain valid PostgreSQL connection credentials.
    # @PRE: `table_name` and `table_schema` must be strings.
    # @POST: Returns a dictionary mapping `column_name` -> `column_comment`.
    # @THROW: Exception - On connection or query execution errors.
    # @PARAM: db_config (Dict) - Database connection configuration.
    # @PARAM: table_name (str) - Table name.
    # @PARAM: table_schema (str) - Table schema.
    # @RETURN: Dict[str, str] - Dictionary of column comments.
    def get_postgres_comments(self, db_config: Dict, table_name: str, table_schema: str) -> Dict[str, str]:
        self.logger.info(
            "[get_postgres_comments][Enter] Fetching comments from PostgreSQL for %s.%s.",
            table_schema, table_name,
        )
        # Catalog, table and schema are passed as bind parameters rather than
        # interpolated into the SQL string; literal '%' signs are therefore
        # escaped as '%%' for psycopg2.
        query = """
            SELECT
                cols.column_name,
                CASE
                    WHEN pg_catalog.col_description(
                        (SELECT c.oid
                         FROM pg_catalog.pg_class c
                         JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                         WHERE c.relname = cols.table_name AND n.nspname = cols.table_schema),
                        cols.ordinal_position::int
                    ) LIKE '%%|%%'
                    THEN split_part(
                        pg_catalog.col_description(
                            (SELECT c.oid
                             FROM pg_catalog.pg_class c
                             JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                             WHERE c.relname = cols.table_name AND n.nspname = cols.table_schema),
                            cols.ordinal_position::int
                        ),
                        '|', 1
                    )
                    ELSE pg_catalog.col_description(
                        (SELECT c.oid
                         FROM pg_catalog.pg_class c
                         JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                         WHERE c.relname = cols.table_name AND n.nspname = cols.table_schema),
                        cols.ordinal_position::int
                    )
                END AS column_comment
            FROM information_schema.columns cols
            WHERE cols.table_catalog = %s
              AND cols.table_name = %s
              AND cols.table_schema = %s;
        """
        comments: Dict[str, str] = {}
        try:
            with psycopg2.connect(**db_config) as conn, conn.cursor() as cursor:
                cursor.execute(query, (db_config.get('dbname'), table_name, table_schema))
                for row in cursor.fetchall():
                    if row[1]:
                        comments[row[0]] = row[1]
            self.logger.info("[get_postgres_comments][Success] Fetched %d comments.", len(comments))
        except Exception as e:
            self.logger.error("[get_postgres_comments][Failure] %s", e, exc_info=True)
            raise
        return comments
    # [/DEF:DatasetMapper.get_postgres_comments]
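    # A hedged illustration of the comment convention the query above relies on:
    # when a PostgreSQL column comment contains a '|', only the part before the
    # first '|' is used as the verbose name. Schema, table and column names are
    # hypothetical.
    #
    #   COMMENT ON COLUMN public.orders.total_amount IS 'Order total|internal note';
    #   -- get_postgres_comments(...) would then yield {'total_amount': 'Order total'}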
    # [DEF:DatasetMapper.load_excel_mappings:Function]
    # @PURPOSE: Loads 'column_name' -> 'verbose_name' mappings from an XLSX file.
    # @PRE: `file_path` must be a valid path to an XLSX file with 'column_name' and 'verbose_name' columns.
    # @POST: Returns a dictionary with the mappings.
    # @THROW: Exception - On file read or parsing errors.
    # @PARAM: file_path (str) - Path to the XLSX file.
    # @RETURN: Dict[str, str] - Dictionary with the mappings.
    def load_excel_mappings(self, file_path: str) -> Dict[str, str]:
        self.logger.info("[load_excel_mappings][Enter] Loading mappings from %s.", file_path)
        try:
            df = pd.read_excel(file_path)
            mappings = df.set_index('column_name')['verbose_name'].to_dict()
            self.logger.info("[load_excel_mappings][Success] Loaded %d mappings.", len(mappings))
            return mappings
        except Exception as e:
            self.logger.error("[load_excel_mappings][Failure] %s", e, exc_info=True)
            raise
    # [/DEF:DatasetMapper.load_excel_mappings]
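    # A hedged sketch of the spreadsheet layout load_excel_mappings() expects:
    # one sheet with a 'column_name' column and a 'verbose_name' column. The
    # file name and values below are hypothetical.
    #
    #   column_name   | verbose_name
    #   ------------- | ------------
    #   total_amount  | Order total
    #   created_at    | Created at
    #
    #   mapper.load_excel_mappings("mappings.xlsx")
    #   # -> {'total_amount': 'Order total', 'created_at': 'Created at'}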
    # [DEF:DatasetMapper.run_mapping:Function]
    # @PURPOSE: Main entry point: performs the mapping and updates the dataset's verbose_map in Superset.
    # @RELATION: CALLS -> self.get_postgres_comments
    # @RELATION: CALLS -> self.load_excel_mappings
    # @RELATION: CALLS -> superset_client.get_dataset
    # @RELATION: CALLS -> superset_client.update_dataset
    # @PARAM: superset_client (SupersetClient) - Superset client.
    # @PARAM: dataset_id (int) - ID of the dataset to update.
    # @PARAM: source (str) - Data source ('postgres', 'excel', 'both').
    # @PARAM: postgres_config (Optional[Dict]) - PostgreSQL connection configuration.
    # @PARAM: excel_path (Optional[str]) - Path to the XLSX file.
    # @PARAM: table_name (Optional[str]) - PostgreSQL table name.
    # @PARAM: table_schema (Optional[str]) - PostgreSQL table schema.
    def run_mapping(
        self,
        superset_client: SupersetClient,
        dataset_id: int,
        source: str,
        postgres_config: Optional[Dict] = None,
        excel_path: Optional[str] = None,
        table_name: Optional[str] = None,
        table_schema: Optional[str] = None,
    ):
        self.logger.info(
            "[run_mapping][Enter] Starting dataset mapping for ID %d from source '%s'.",
            dataset_id, source,
        )
        mappings: Dict[str, str] = {}
        try:
            # Validate the source before doing any work.
            if source not in ['postgres', 'excel', 'both']:
                self.logger.error("[run_mapping][Failure] Invalid source: %s.", source)
                return

            # Collect column_name -> verbose_name mappings from the requested sources.
            if source in ['postgres', 'both']:
                assert postgres_config and table_name and table_schema, "Postgres config is required."
                mappings.update(self.get_postgres_comments(postgres_config, table_name, table_schema))
            if source in ['excel', 'both']:
                assert excel_path, "Excel path is required."
                mappings.update(self.load_excel_mappings(excel_path))

            dataset_response = superset_client.get_dataset(dataset_id)
            dataset_data = dataset_response['result']
            original_columns = dataset_data.get('columns', [])

            # Rebuild the column payload, applying new verbose names where mapped.
            updated_columns = []
            changes_made = False
            for column in original_columns:
                col_name = column.get('column_name')
                new_column = {
                    "column_name": col_name,
                    "id": column.get("id"),
                    "advanced_data_type": column.get("advanced_data_type"),
                    "description": column.get("description"),
                    "expression": column.get("expression"),
                    "extra": column.get("extra"),
                    "filterable": column.get("filterable"),
                    "groupby": column.get("groupby"),
                    "is_active": column.get("is_active"),
                    "is_dttm": column.get("is_dttm"),
                    "python_date_format": column.get("python_date_format"),
                    "type": column.get("type"),
                    "uuid": column.get("uuid"),
                    "verbose_name": column.get("verbose_name"),
                }
                new_column = {k: v for k, v in new_column.items() if v is not None}
                if col_name in mappings:
                    mapping_value = mappings[col_name]
                    if isinstance(mapping_value, str) and new_column.get('verbose_name') != mapping_value:
                        new_column['verbose_name'] = mapping_value
                        changes_made = True
                updated_columns.append(new_column)

            # Metrics are passed through unchanged (minus None values) so the
            # update payload stays complete.
            updated_metrics = []
            for metric in dataset_data.get("metrics", []):
                new_metric = {
                    "id": metric.get("id"),
                    "metric_name": metric.get("metric_name"),
                    "expression": metric.get("expression"),
                    "verbose_name": metric.get("verbose_name"),
                    "description": metric.get("description"),
                    "d3format": metric.get("d3format"),
                    "currency": metric.get("currency"),
                    "extra": metric.get("extra"),
                    "warning_text": metric.get("warning_text"),
                    "metric_type": metric.get("metric_type"),
                    "uuid": metric.get("uuid"),
                }
                updated_metrics.append({k: v for k, v in new_metric.items() if v is not None})

            if changes_made:
                payload_for_update = {
                    "database_id": dataset_data.get("database", {}).get("id"),
                    "table_name": dataset_data.get("table_name"),
                    "schema": dataset_data.get("schema"),
                    "columns": updated_columns,
                    "owners": [owner["id"] for owner in dataset_data.get("owners", [])],
                    "metrics": updated_metrics,
                    "extra": dataset_data.get("extra"),
                    "description": dataset_data.get("description"),
                    "sql": dataset_data.get("sql"),
                    "cache_timeout": dataset_data.get("cache_timeout"),
                    "catalog": dataset_data.get("catalog"),
                    "default_endpoint": dataset_data.get("default_endpoint"),
                    "external_url": dataset_data.get("external_url"),
                    "fetch_values_predicate": dataset_data.get("fetch_values_predicate"),
                    "filter_select_enabled": dataset_data.get("filter_select_enabled"),
                    "is_managed_externally": dataset_data.get("is_managed_externally"),
                    "is_sqllab_view": dataset_data.get("is_sqllab_view"),
                    "main_dttm_col": dataset_data.get("main_dttm_col"),
                    "normalize_columns": dataset_data.get("normalize_columns"),
                    "offset": dataset_data.get("offset"),
                    "template_params": dataset_data.get("template_params"),
                }
                payload_for_update = {k: v for k, v in payload_for_update.items() if v is not None}
                superset_client.update_dataset(dataset_id, payload_for_update)
                self.logger.info("[run_mapping][Success] Dataset %d columns' verbose_name updated.", dataset_id)
            else:
                self.logger.info("[run_mapping][State] No changes in columns' verbose_name, skipping update.")
        except Exception as e:  # also covers AssertionError and FileNotFoundError
            self.logger.error("[run_mapping][Failure] %s", e, exc_info=True)
            return
    # [/DEF:DatasetMapper.run_mapping]
# [/DEF:DatasetMapper]
# [/DEF:superset_tool.utils.dataset_mapper]
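
# [SECTION: USAGE_EXAMPLE]
# A minimal usage sketch. Constructing the SupersetClient / SupersetLogger
# instances is omitted because their initialization is project-specific
# (see superset_tool.utils.init_clients.setup_clients); the dataset ID,
# credentials and file path below are hypothetical.
#
#   mapper = DatasetMapper(logger)
#   mapper.run_mapping(
#       superset_client=superset_client,
#       dataset_id=123,
#       source="both",
#       postgres_config={"dbname": "analytics", "host": "localhost",
#                        "user": "superset", "password": "***"},
#       excel_path="mappings.xlsx",
#       table_name="orders",
#       table_schema="public",
#   )
# [/SECTION]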