diff --git a/superset_tool/utils/dataset_mapper.py b/superset_tool/utils/dataset_mapper.py
new file mode 100644
index 0000000..8973aef
--- /dev/null
+++ b/superset_tool/utils/dataset_mapper.py
@@ -0,0 +1,230 @@
+#
+# @SEMANTICS: dataset, mapping, postgresql, xlsx, superset
+# @PURPOSE: This module updates dataset metadata (verbose_map) in Superset, sourcing it from PostgreSQL column comments or from XLSX files.
+# @DEPENDS_ON: superset_tool.client -> Uses SupersetClient to interact with the API.
+# @DEPENDS_ON: pandas -> for reading XLSX files.
+# @DEPENDS_ON: psycopg2 -> for connecting to PostgreSQL.
+#
+import pandas as pd
+import psycopg2
+from superset_tool.client import SupersetClient
+from superset_tool.utils.init_clients import setup_clients
+from superset_tool.utils.logger import SupersetLogger
+from typing import Dict, Optional
+#
+
+# --- Module code start ---
+
+#
+# @PURPOSE: Class that maps and updates verbose_map in Superset datasets.
+class DatasetMapper:
+    def __init__(self, logger: SupersetLogger):
+        self.logger = logger
+
+    #
+    # @PURPOSE: Fetches column comments from the PostgreSQL system catalog.
+    # @PRE: `db_config` must contain valid PostgreSQL connection credentials.
+    # @PRE: `table_name` and `table_schema` must be strings.
+    # @POST: Returns a dictionary mapping `column_name` -> `column_comment`.
+    # @PARAM: db_config: Dict - Database connection configuration.
+    # @PARAM: table_name: str - Table name.
+    # @PARAM: table_schema: str - Table schema.
+    # @RETURN: Dict[str, str] - Dictionary of column comments.
+    # @THROW: Exception - On connection or query errors.
+    def get_postgres_comments(self, db_config: Dict, table_name: str, table_schema: str) -> Dict[str, str]:
+        self.logger.info("[get_postgres_comments][Enter] Fetching comments from PostgreSQL for %s.%s.", table_schema, table_name)
+        # Comments may carry extra metadata after a '|' separator; only the part before it is kept.
+        # Catalog, table and schema are passed as bind parameters to avoid SQL injection.
+        query = """
+            SELECT
+                column_name,
+                CASE
+                    WHEN col_comment LIKE '%%|%%' THEN split_part(col_comment, '|', 1)
+                    ELSE col_comment
+                END AS column_comment
+            FROM (
+                SELECT
+                    cols.column_name,
+                    pg_catalog.col_description(
+                        (SELECT c.oid
+                         FROM pg_catalog.pg_class c
+                         JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+                         WHERE c.relname = cols.table_name
+                           AND n.nspname = cols.table_schema),
+                        cols.ordinal_position::int
+                    ) AS col_comment
+                FROM information_schema.columns cols
+                WHERE cols.table_catalog = %s
+                  AND cols.table_name = %s
+                  AND cols.table_schema = %s
+            ) described;
+        """
+        comments = {}
+        try:
+            with psycopg2.connect(**db_config) as conn, conn.cursor() as cursor:
+                cursor.execute(query, (db_config.get('dbname'), table_name, table_schema))
+                for row in cursor.fetchall():
+                    if row[1]:
+                        comments[row[0]] = row[1]
+            self.logger.info("[get_postgres_comments][Success] Fetched %d comments.", len(comments))
+        except Exception as e:
+            self.logger.error("[get_postgres_comments][Failure] %s", e, exc_info=True)
+            raise
+        return comments
+    #
+
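+    # Example call (a sketch; the connection values, table and comments below are hypothetical):
+    #   mapper = DatasetMapper(logger)
+    #   comments = mapper.get_postgres_comments(
+    #       {"host": "db.local", "dbname": "analytics", "user": "etl", "password": "***"},
+    #       table_name="sales", table_schema="public")
+    #   # -> {"order_id": "Order identifier", "amount": "Order amount"}
+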
+    #
+    # @PURPOSE: Loads 'column_name' -> 'column_comment' mappings from an XLSX file.
+    # @PRE: `file_path` must be a valid path to an XLSX file with 'column_name' and 'column_comment' columns.
+    # @POST: Returns a dictionary of mappings.
+    # @PARAM: file_path: str - Path to the XLSX file.
+    # @RETURN: Dict[str, str] - Dictionary of mappings.
+    # @THROW: Exception - On file read or parsing errors.
+    def load_excel_mappings(self, file_path: str) -> Dict[str, str]:
+        self.logger.info("[load_excel_mappings][Enter] Loading mappings from %s.", file_path)
+        try:
+            df = pd.read_excel(file_path)
+            mappings = df.set_index('column_name')['column_comment'].to_dict()
+            self.logger.info("[load_excel_mappings][Success] Loaded %d mappings.", len(mappings))
+            return mappings
+        except Exception as e:
+            self.logger.error("[load_excel_mappings][Failure] %s", e, exc_info=True)
+            raise
+    #
+
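+    # Expected worksheet layout (illustrative values):
+    #   column_name | column_comment
+    #   ------------+------------------
+    #   order_id    | Order identifier
+    #   amount      | Order amount
+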
+    #
+    # @PURPOSE: Main entry point: performs the mapping and updates the dataset's verbose_map in Superset.
+    # @PARAM: superset_client: SupersetClient - Superset client.
+    # @PARAM: dataset_id: int - ID of the dataset to update.
+    # @PARAM: source: str - Data source ('postgres', 'excel', 'both').
+    # @PARAM: postgres_config: Optional[Dict] - PostgreSQL connection configuration.
+    # @PARAM: excel_path: Optional[str] - Path to the XLSX file.
+    # @PARAM: table_name: Optional[str] - Table name in PostgreSQL.
+    # @PARAM: table_schema: Optional[str] - Table schema in PostgreSQL.
+    # @RELATION: CALLS -> self.get_postgres_comments
+    # @RELATION: CALLS -> self.load_excel_mappings
+    # @RELATION: CALLS -> superset_client.get_dataset
+    # @RELATION: CALLS -> superset_client.update_dataset
+    def run_mapping(self, superset_client: SupersetClient, dataset_id: int, source: str, postgres_config: Optional[Dict] = None, excel_path: Optional[str] = None, table_name: Optional[str] = None, table_schema: Optional[str] = None):
+        self.logger.info("[run_mapping][Enter] Starting dataset mapping for ID %d from source '%s'.", dataset_id, source)
+        mappings: Dict[str, str] = {}
+
+        try:
+            # Validate the source before touching the database, the file or the API.
+            if source not in ['postgres', 'excel', 'both']:
+                self.logger.error("[run_mapping][Failure] Invalid source: %s.", source)
+                return
+            if source in ['postgres', 'both']:
+                assert postgres_config and table_name and table_schema, "Postgres config is required."
+                mappings.update(self.get_postgres_comments(postgres_config, table_name, table_schema))
+            if source in ['excel', 'both']:
+                assert excel_path, "Excel path is required."
+                mappings.update(self.load_excel_mappings(excel_path))
+
+            dataset_response = superset_client.get_dataset(dataset_id)
+            dataset_data = dataset_response['result']
+
+            original_columns = dataset_data.get('columns', [])
+            updated_columns = []
+            changes_made = False
+
+            for column in original_columns:
+                col_name = column.get('column_name')
+
+                # Rebuild each column with only the fields expected in the update payload.
+                new_column = {
+                    "column_name": col_name,
+                    "id": column.get("id"),
+                    "advanced_data_type": column.get("advanced_data_type"),
+                    "description": column.get("description"),
+                    "expression": column.get("expression"),
+                    "extra": column.get("extra"),
+                    "filterable": column.get("filterable"),
+                    "groupby": column.get("groupby"),
+                    "is_active": column.get("is_active"),
+                    "is_dttm": column.get("is_dttm"),
+                    "python_date_format": column.get("python_date_format"),
+                    "type": column.get("type"),
+                    "uuid": column.get("uuid"),
+                    "verbose_name": column.get("verbose_name"),
+                }
+
+                new_column = {k: v for k, v in new_column.items() if v is not None}
+
+                if col_name in mappings:
+                    mapping_value = mappings[col_name]
+                    if isinstance(mapping_value, str) and new_column.get('verbose_name') != mapping_value:
+                        new_column['verbose_name'] = mapping_value
+                        changes_made = True
+
+                updated_columns.append(new_column)
+
+            updated_metrics = []
+            for metric in dataset_data.get("metrics", []):
+                new_metric = {
+                    "id": metric.get("id"),
+                    "metric_name": metric.get("metric_name"),
+                    "expression": metric.get("expression"),
+                    "verbose_name": metric.get("verbose_name"),
+                    "description": metric.get("description"),
+                    "d3format": metric.get("d3format"),
+                    "currency": metric.get("currency"),
+                    "extra": metric.get("extra"),
+                    "warning_text": metric.get("warning_text"),
+                    "metric_type": metric.get("metric_type"),
+                    "uuid": metric.get("uuid"),
+                }
+                updated_metrics.append({k: v for k, v in new_metric.items() if v is not None})
+
+            if changes_made:
+                payload_for_update = {
+                    "database_id": dataset_data.get("database", {}).get("id"),
+                    "table_name": dataset_data.get("table_name"),
+                    "schema": dataset_data.get("schema"),
+                    "columns": updated_columns,
+                    "owners": [owner["id"] for owner in dataset_data.get("owners", [])],
+                    "metrics": updated_metrics,
+                    "extra": dataset_data.get("extra"),
+                    "description": dataset_data.get("description"),
+                    "sql": dataset_data.get("sql"),
+                    "cache_timeout": dataset_data.get("cache_timeout"),
+                    "catalog": dataset_data.get("catalog"),
+                    "default_endpoint": dataset_data.get("default_endpoint"),
+                    "external_url": dataset_data.get("external_url"),
+                    "fetch_values_predicate": dataset_data.get("fetch_values_predicate"),
+                    "filter_select_enabled": dataset_data.get("filter_select_enabled"),
+                    "is_managed_externally": dataset_data.get("is_managed_externally"),
+                    "is_sqllab_view": dataset_data.get("is_sqllab_view"),
+                    "main_dttm_col": dataset_data.get("main_dttm_col"),
+                    "normalize_columns": dataset_data.get("normalize_columns"),
+                    "offset": dataset_data.get("offset"),
+                    "template_params": dataset_data.get("template_params"),
+                }
+
+                # Drop None values so optional fields are not overwritten with nulls.
+                payload_for_update = {k: v for k, v in payload_for_update.items() if v is not None}
+
+                superset_client.update_dataset(dataset_id, payload_for_update)
+                self.logger.info("[run_mapping][Success] Dataset %d columns' verbose_name updated.", dataset_id)
+            else:
+                self.logger.info("[run_mapping][State] No changes in columns' verbose_name, skipping update.")
+
+        except Exception as e:
+            self.logger.error("[run_mapping][Failure] %s", e, exc_info=True)
+            return
+    #
+#
+
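+# Usage sketch (illustrative, kept as a comment because how clients are constructed
+# depends on the rest of superset_tool; the dataset ID, paths and connection values
+# below are hypothetical):
+#
+#   logger = SupersetLogger()
+#   client = ...  # e.g. obtained via setup_clients() or a configured SupersetClient
+#   DatasetMapper(logger).run_mapping(
+#       client,
+#       dataset_id=123,
+#       source="both",
+#       postgres_config={"host": "db.local", "dbname": "analytics", "user": "etl", "password": "***"},
+#       excel_path="/path/to/mappings.xlsx",
+#       table_name="sales",
+#       table_schema="public",
+#   )
+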
+# --- Module code end ---
+
+#
\ No newline at end of file