From 373ed59dce846b59533f9fa904f7eb46fc34dc84 Mon Sep 17 00:00:00 2001 From: Volobuev Andrey Date: Tue, 7 Oct 2025 14:33:54 +0300 Subject: [PATCH] mapper --- dataset_mapper.py | 131 --------------------------------------- get_dataset_structure.py | 69 +++++++++++++++++++++ run_mapper.py | 2 +- 3 files changed, 70 insertions(+), 132 deletions(-) delete mode 100644 dataset_mapper.py create mode 100644 get_dataset_structure.py diff --git a/dataset_mapper.py b/dataset_mapper.py deleted file mode 100644 index 09cee40..0000000 --- a/dataset_mapper.py +++ /dev/null @@ -1,131 +0,0 @@ -# -# @SEMANTICS: dataset, mapping, postgresql, xlsx, superset -# @PURPOSE: Этот модуль отвечает за обновление метаданных (verbose_map) в датасетах Superset, извлекая их из PostgreSQL или XLSX-файлов. -# @DEPENDS_ON: superset_tool.client -> Использует SupersetClient для взаимодействия с API. -# @DEPENDS_ON: pandas -> для чтения XLSX-файлов. -# @DEPENDS_ON: psycopg2 -> для подключения к PostgreSQL. - -# -import pandas as pd -import psycopg2 -from superset_tool.client import SupersetClient -from superset_tool.utils.init_clients import setup_clients -from superset_tool.utils.logger import SupersetLogger -from typing import Dict, List, Optional, Any -# - -# --- Начало кода модуля --- - -# -# @PURPOSE: Класс для меппинга и обновления verbose_map в датасетах Superset. -class DatasetMapper: - def __init__(self, logger: SupersetLogger): - self.logger = logger - - # - # @PURPOSE: Извлекает комментарии к колонкам из системного каталога PostgreSQL. - # @PRE: `db_config` должен содержать валидные креды для подключения к PostgreSQL. - # @PRE: `table_name` и `table_schema` должны быть строками. - # @POST: Возвращается словарь с меппингом `column_name` -> `column_comment`. - # @PARAM: db_config: Dict - Конфигурация для подключения к БД. - # @PARAM: table_name: str - Имя таблицы. - # @PARAM: table_schema: str - Схема таблицы. - # @RETURN: Dict[str, str] - Словарь с комментариями к колонкам. - # @THROW: Exception - При ошибках подключения или выполнения запроса к БД. - def get_postgres_comments(self, db_config: Dict, table_name: str, table_schema: str) -> Dict[str, str]: - self.logger.info("[get_postgres_comments][Enter] Fetching comments from PostgreSQL for %s.%s.", table_schema, table_name) - query = f""" - SELECT cols.column_name, pg_catalog.col_description(c.oid, cols.ordinal_position::int) AS column_comment - FROM information_schema.columns cols - JOIN pg_catalog.pg_class c ON c.relname = cols.table_name - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace AND n.nspname = cols.table_schema - WHERE cols.table_catalog = '{db_config.get('dbname')}' AND cols.table_name = '{table_name}' AND cols.table_schema = '{table_schema}'; - """ - comments = {} - try: - with psycopg2.connect(**db_config) as conn, conn.cursor() as cursor: - cursor.execute(query) - for row in cursor.fetchall(): - if row[1]: - comments[row[0]] = row[1] - self.logger.info("[get_postgres_comments][Success] Fetched %d comments.", len(comments)) - except Exception as e: - self.logger.error("[get_postgres_comments][Failure] %s", e, exc_info=True) - raise - return comments - # - - # - # @PURPOSE: Загружает меппинги 'column_name' -> 'column_comment' из XLSX файла. - # @PRE: `file_path` должен быть валидным путем к XLSX файлу с колонками 'column_name' и 'column_comment'. - # @POST: Возвращается словарь с меппингами. - # @PARAM: file_path: str - Путь к XLSX файлу. - # @RETURN: Dict[str, str] - Словарь с меппингами. - # @THROW: Exception - При ошибках чтения файла или парсинга. - def load_excel_mappings(self, file_path: str) -> Dict[str, str]: - self.logger.info("[load_excel_mappings][Enter] Loading mappings from %s.", file_path) - try: - df = pd.read_excel(file_path) - mappings = df.set_index('column_name')['column_comment'].to_dict() - self.logger.info("[load_excel_mappings][Success] Loaded %d mappings.", len(mappings)) - return mappings - except Exception as e: - self.logger.error("[load_excel_mappings][Failure] %s", e, exc_info=True) - raise - # - - # - # @PURPOSE: Основная функция для выполнения меппинга и обновления verbose_map датасета в Superset. - # @PARAM: superset_client: SupersetClient - Клиент Superset. - # @PARAM: dataset_id: int - ID датасета для обновления. - # @PARAM: source: str - Источник данных ('postgres', 'excel', 'both'). - # @PARAM: postgres_config: Optional[Dict] - Конфигурация для подключения к PostgreSQL. - # @PARAM: excel_path: Optional[str] - Путь к XLSX файлу. - # @PARAM: table_name: Optional[str] - Имя таблицы в PostgreSQL. - # @PARAM: table_schema: Optional[str] - Схема таблицы в PostgreSQL. - # @RELATION: CALLS -> self.get_postgres_comments - # @RELATION: CALLS -> self.load_excel_mappings - # @RELATION: CALLS -> superset_client.get_dataset - # @RELATION: CALLS -> superset_client.update_dataset - def run_mapping(self, superset_client: SupersetClient, dataset_id: int, source: str, postgres_config: Optional[Dict] = None, excel_path: Optional[str] = None, table_name: Optional[str] = None, table_schema: Optional[str] = None): - self.logger.info("[run_mapping][Enter] Starting dataset mapping for ID %d from source '%s'.", dataset_id, source) - mappings: Dict[str, str] = {} - - try: - if source in ['postgres', 'both']: - assert postgres_config and table_name and table_schema, "Postgres config is required." - mappings.update(self.get_postgres_comments(postgres_config, table_name, table_schema)) - if source in ['excel', 'both']: - assert excel_path, "Excel path is required." - mappings.update(self.load_excel_mappings(excel_path)) - if source not in ['postgres', 'excel', 'both']: - self.logger.error("[run_mapping][Failure] Invalid source: %s.", source) - return - - dataset_response = superset_client.get_dataset(dataset_id) - dataset_data = dataset_response['result'] - - original_verbose_map = dataset_data.get('verbose_map', {}).copy() - new_verbose_map = original_verbose_map.copy() - - for column in dataset_data.get('columns', []): - column_name = column.get('column_name') - if column_name in mappings: - new_verbose_map[column_name] = mappings[column_name] - - if original_verbose_map != new_verbose_map: - dataset_data['verbose_map'] = new_verbose_map - superset_client.update_dataset(dataset_id, {'verbose_map': new_verbose_map}) - self.logger.info("[run_mapping][Success] Dataset %d verbose_map updated.", dataset_id) - else: - self.logger.info("[run_mapping][State] No changes in verbose_map, skipping update.") - - except (AssertionError, FileNotFoundError, Exception) as e: - self.logger.error("[run_mapping][Failure] %s", e, exc_info=True) - return - # -# - -# --- Конец кода модуля --- - -# \ No newline at end of file diff --git a/get_dataset_structure.py b/get_dataset_structure.py new file mode 100644 index 0000000..4c17045 --- /dev/null +++ b/get_dataset_structure.py @@ -0,0 +1,69 @@ +# +# @SEMANTICS: superset, dataset, structure, debug, json +# @PURPOSE: Этот модуль предназначен для получения и сохранения структуры данных датасета из Superset. Он используется для отладки и анализа данных, возвращаемых API. +# @DEPENDS_ON: superset_tool.client -> Использует SupersetClient для взаимодействия с API. +# @DEPENDS_ON: superset_tool.utils.init_clients -> Для инициализации клиентов Superset. +# @DEPENDS_ON: superset_tool.utils.logger -> Для логирования. + +# +import argparse +import json +from superset_tool.utils.init_clients import setup_clients +from superset_tool.utils.logger import SupersetLogger +# + +# --- Начало кода модуля --- + +# +# @PURPOSE: Получает структуру датасета из Superset и сохраняет ее в JSON-файл. +# @PARAM: env: str - Среда (dev, prod, и т.д.) для подключения. +# @PARAM: dataset_id: int - ID датасета для получения. +# @PARAM: output_path: str - Путь для сохранения JSON-файла. +# @RELATION: CALLS -> setup_clients +# @RELATION: CALLS -> superset_client.get_dataset +def get_and_save_dataset(env: str, dataset_id: int, output_path: str): + """ + Получает структуру датасета и сохраняет в файл. + """ + logger = SupersetLogger(name="DatasetStructureRetriever") + logger.info("[get_and_save_dataset][Enter] Starting to fetch dataset structure for ID %d from env '%s'.", dataset_id, env) + + try: + clients = setup_clients(logger=logger) + superset_client = clients.get(env) + if not superset_client: + logger.error("[get_and_save_dataset][Failure] Environment '%s' not found.", env) + return + + dataset_response = superset_client.get_dataset(dataset_id) + dataset_data = dataset_response.get('result') + + if not dataset_data: + logger.error("[get_and_save_dataset][Failure] No result in dataset response.") + return + + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(dataset_data, f, ensure_ascii=False, indent=4) + + logger.info("[get_and_save_dataset][Success] Dataset structure saved to %s.", output_path) + + except Exception as e: + logger.error("[get_and_save_dataset][Failure] An error occurred: %s", e, exc_info=True) + +# + +# +# @PURPOSE: Точка входа для CLI. Парсит аргументы и запускает получение структуры датасета. +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Получение структуры датасета из Superset.") + parser.add_argument("--dataset-id", required=True, type=int, help="ID датасета.") + parser.add_argument("--env", required=True, help="Среда для подключения (например, dev).") + parser.add_argument("--output-path", default="dataset_structure.json", help="Путь для сохранения JSON-файла.") + args = parser.parse_args() + + get_and_save_dataset(args.env, args.dataset_id, args.output_path) +# + +# --- Конец кода модуля --- + +# \ No newline at end of file diff --git a/run_mapper.py b/run_mapper.py index fbf9ebb..99d405a 100644 --- a/run_mapper.py +++ b/run_mapper.py @@ -8,7 +8,7 @@ import argparse from superset_tool.utils.init_clients import setup_clients from superset_tool.utils.logger import SupersetLogger -from dataset_mapper import DatasetMapper +from superset_tool.utils.dataset_mapper import DatasetMapper # # --- Начало кода модуля ---