Files
ss-tools/superset_tool/utils/dataset_mapper.py
busya 2d8cae563f feat: implement plugin architecture and application settings with Svelte UI
- Added plugin base and loader for backend extensibility
- Implemented application settings management with config persistence
- Created Svelte-based frontend with Dashboard and Settings pages
- Added API routes for plugins, tasks, and settings
- Updated documentation and specifications
- Improved project structure and developer tools
2025-12-20 20:48:18 +03:00

230 lines
13 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# [DEF:superset_tool.utils.dataset_mapper:Module]
#
# @SEMANTICS: dataset, mapping, postgresql, xlsx, superset
# @PURPOSE: Этот модуль отвечает за обновление метаданных (verbose_map) в датасетах Superset, извлекая их из PostgreSQL или XLSX-файлов.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> superset_tool.client
# @RELATION: DEPENDS_ON -> pandas
# @RELATION: DEPENDS_ON -> psycopg2
# @PUBLIC_API: DatasetMapper
# [SECTION: IMPORTS]
import pandas as pd # type: ignore
import psycopg2 # type: ignore
from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import setup_clients
from superset_tool.utils.logger import SupersetLogger
from typing import Dict, List, Optional, Any
# [/SECTION]
# [DEF:DatasetMapper:Class]
# @PURPOSE: Класс для меппинга и обновления verbose_map в датасетах Superset.
class DatasetMapper:
    """Update column ``verbose_name`` values of a Superset dataset.

    Display names are collected from PostgreSQL column comments and/or an
    XLSX file and written back through the Superset client that is passed
    to ``run_mapping``.
    """

    # Accepted values of the ``source`` argument of ``run_mapping``.
    _VALID_SOURCES = ('postgres', 'excel', 'both')

    # Column attributes copied from the fetched dataset into the update payload.
    # NOTE(review): presumably this mirrors what the dataset update endpoint
    # accepts - confirm against the Superset API before adding fields.
    _COLUMN_FIELDS = (
        "column_name",
        "id",
        "advanced_data_type",
        "description",
        "expression",
        "extra",
        "filterable",
        "groupby",
        "is_active",
        "is_dttm",
        "python_date_format",
        "type",
        "uuid",
        "verbose_name",
    )

    # Metric attributes copied from the fetched dataset into the update payload.
    _METRIC_FIELDS = (
        "id",
        "metric_name",
        "expression",
        "verbose_name",
        "description",
        "d3format",
        "currency",
        "extra",
        "warning_text",
        "metric_type",
        "uuid",
    )

    # Dataset-level attributes copied verbatim (same key) into the update payload.
    _DATASET_FIELDS = (
        "table_name",
        "schema",
        "extra",
        "description",
        "sql",
        "cache_timeout",
        "catalog",
        "default_endpoint",
        "external_url",
        "fetch_values_predicate",
        "filter_select_enabled",
        "is_managed_externally",
        "is_sqllab_view",
        "main_dttm_col",
        "normalize_columns",
        "offset",
        "template_params",
    )

    # Values are bound by the driver (three %s placeholders) instead of being
    # formatted into the text. split_part() returns the whole string when the
    # comment contains no '|', so no CASE/LIKE branch is needed; this also keeps
    # the statement free of literal '%' characters, which psycopg2 would
    # otherwise try to interpret as placeholders.
    _COLUMN_COMMENTS_SQL = """
        SELECT
            cols.column_name,
            split_part(
                pg_catalog.col_description(
                    (SELECT c.oid
                     FROM pg_catalog.pg_class c
                     JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                     WHERE c.relname = cols.table_name
                       AND n.nspname = cols.table_schema),
                    cols.ordinal_position::int
                ),
                '|',
                1
            ) AS column_comment
        FROM
            information_schema.columns cols
        WHERE cols.table_catalog = %s
          AND cols.table_name = %s
          AND cols.table_schema = %s;
    """

    def __init__(self, logger: "SupersetLogger"):
        """Store the logger used by every method of the mapper."""
        self.logger = logger

    # [DEF:DatasetMapper.get_postgres_comments:Function]
    # @PURPOSE: Reads column comments from the PostgreSQL system catalog.
    # @PRE: `db_config` holds keyword arguments accepted by `psycopg2.connect`.
    # @PRE: `table_name` and `table_schema` are strings.
    # @POST: Returns a `column_name` -> `column_comment` mapping; columns without a comment are omitted.
    # @THROW: Exception - On connection or query errors (logged, then re-raised).
    # @PARAM: db_config (Dict) - Database connection settings.
    # @PARAM: table_name (str) - Table name.
    # @PARAM: table_schema (str) - Table schema.
    # @RETURN: Dict[str, str] - Column comments keyed by column name.
    def get_postgres_comments(self, db_config: Dict, table_name: str, table_schema: str) -> Dict[str, str]:
        """Return the comment of every commented column of ``table_schema.table_name``.

        Only the part of a comment before the first ``|`` is kept.
        """
        self.logger.info("[get_postgres_comments][Enter] Fetching comments from PostgreSQL for %s.%s.", table_schema, table_name)
        # psycopg2 accepts "database" as an alias of "dbname"; honour both so the
        # catalog filter matches the database the connection is opened against.
        database_name = db_config.get('dbname') or db_config.get('database')
        query_params = (database_name, table_name, table_schema)
        try:
            connection = psycopg2.connect(**db_config)
            try:
                # The connection context manager only commits or rolls back the
                # transaction; it does not close the connection.
                with connection, connection.cursor() as cursor:
                    cursor.execute(self._COLUMN_COMMENTS_SQL, query_params)
                    comments = {row[0]: row[1] for row in cursor.fetchall() if row[1]}
            finally:
                connection.close()
            self.logger.info("[get_postgres_comments][Success] Fetched %d comments.", len(comments))
        except Exception as e:
            self.logger.error("[get_postgres_comments][Failure] %s", e, exc_info=True)
            raise
        return comments
    # [/DEF:DatasetMapper.get_postgres_comments]

    # [DEF:DatasetMapper.load_excel_mappings:Function]
    # @PURPOSE: Loads 'column_name' -> 'verbose_name' mappings from an XLSX file.
    # @PRE: `file_path` points to an XLSX file with the columns 'column_name' and 'verbose_name'.
    # @POST: Returns the mappings; rows with an empty 'column_name' or 'verbose_name' cell are skipped.
    # @THROW: Exception - On file read or parsing errors (logged, then re-raised).
    # @PARAM: file_path (str) - Path to the XLSX file.
    # @RETURN: Dict[str, str] - Mappings keyed by column name.
    def load_excel_mappings(self, file_path: str) -> Dict[str, str]:
        """Read the ``column_name`` / ``verbose_name`` columns of an XLSX file into a dict."""
        self.logger.info("[load_excel_mappings][Enter] Loading mappings from %s.", file_path)
        try:
            df = pd.read_excel(file_path)
            # Empty cells are read as NaN; drop them so they neither end up in the
            # result nor mask a value coming from another source.
            complete_rows = df.dropna(subset=['column_name', 'verbose_name'])
            mappings = complete_rows.set_index('column_name')['verbose_name'].to_dict()
            self.logger.info("[load_excel_mappings][Success] Loaded %d mappings.", len(mappings))
            return mappings
        except Exception as e:
            self.logger.error("[load_excel_mappings][Failure] %s", e, exc_info=True)
            raise
    # [/DEF:DatasetMapper.load_excel_mappings]

    # [DEF:DatasetMapper._pick_present:Function]
    # @PURPOSE: Copies the listed keys of a dict, leaving out keys that are missing or None.
    @staticmethod
    def _pick_present(record: Dict[str, Any], fields) -> Dict[str, Any]:
        """Return ``{field: record[field]}`` for every field whose value is not None."""
        picked: Dict[str, Any] = {}
        for field in fields:
            value = record.get(field)
            if value is not None:
                picked[field] = value
        return picked
    # [/DEF:DatasetMapper._pick_present]

    # [DEF:DatasetMapper._collect_mappings:Function]
    # @PURPOSE: Validates the inputs required by `source` and gathers the mappings from the selected sources.
    # @RELATION: CALLS -> self.get_postgres_comments
    # @RELATION: CALLS -> self.load_excel_mappings
    # @THROW: ValueError - When an input required by `source` is missing.
    def _collect_mappings(self, source: str, postgres_config: Optional[Dict], excel_path: Optional[str], table_name: Optional[str], table_schema: Optional[str]) -> Dict[str, str]:
        """Return the merged mappings; Excel entries override PostgreSQL ones."""
        use_postgres = source in ('postgres', 'both')
        use_excel = source in ('excel', 'both')
        # Explicit raises instead of `assert`, which is stripped under `python -O`.
        # Everything is validated before any I/O is started.
        if use_postgres and not (postgres_config and table_name and table_schema):
            raise ValueError("Postgres config is required.")
        if use_excel and not excel_path:
            raise ValueError("Excel path is required.")

        mappings: Dict[str, str] = {}
        if use_postgres:
            mappings.update(self.get_postgres_comments(postgres_config, table_name, table_schema))
        if use_excel:
            excel_mappings = self.load_excel_mappings(excel_path)
            # Only string values are ever applied to a column, so a non-string
            # cell must not replace a usable PostgreSQL comment.
            mappings.update({name: value for name, value in excel_mappings.items() if isinstance(value, str)})
        return mappings
    # [/DEF:DatasetMapper._collect_mappings]

    # [DEF:DatasetMapper._apply_mappings:Function]
    # @PURPOSE: Builds the column payload and reports whether any verbose_name differs from the stored one.
    def _apply_mappings(self, columns: List[Dict[str, Any]], mappings: Dict[str, str]):
        """Return ``(updated_columns, changes_made)`` for the given dataset columns."""
        updated_columns = []
        changes_made = False
        for column in columns:
            new_column = self._pick_present(column, self._COLUMN_FIELDS)
            mapping_value = mappings.get(column.get('column_name'))
            if isinstance(mapping_value, str) and new_column.get('verbose_name') != mapping_value:
                new_column['verbose_name'] = mapping_value
                changes_made = True
            updated_columns.append(new_column)
        return updated_columns, changes_made
    # [/DEF:DatasetMapper._apply_mappings]

    # [DEF:DatasetMapper._build_update_payload:Function]
    # @PURPOSE: Assembles the dataset update payload from the fetched dataset and the updated columns.
    def _build_update_payload(self, dataset_data: Dict[str, Any], updated_columns: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Return the update payload with every None-valued key removed."""
        # `or` (rather than a .get default) also covers keys that are present but null.
        payload: Dict[str, Any] = {
            "database_id": (dataset_data.get("database") or {}).get("id"),
            "columns": updated_columns,
            "owners": [owner["id"] for owner in dataset_data.get("owners") or []],
            "metrics": [
                self._pick_present(metric, self._METRIC_FIELDS)
                for metric in dataset_data.get("metrics") or []
            ],
        }
        for field in self._DATASET_FIELDS:
            payload[field] = dataset_data.get(field)
        return {key: value for key, value in payload.items() if value is not None}
    # [/DEF:DatasetMapper._build_update_payload]

    # [DEF:DatasetMapper.run_mapping:Function]
    # @PURPOSE: Entry point: collects the mappings and updates the columns' verbose_name of a Superset dataset.
    # @RELATION: CALLS -> self.get_postgres_comments
    # @RELATION: CALLS -> self.load_excel_mappings
    # @RELATION: CALLS -> superset_client.get_dataset
    # @RELATION: CALLS -> superset_client.update_dataset
    # @POST: Errors are logged and swallowed; the method always returns None.
    # @PARAM: superset_client (SupersetClient) - Superset client.
    # @PARAM: dataset_id (int) - ID of the dataset to update.
    # @PARAM: source (str) - Data source ('postgres', 'excel', 'both').
    # @PARAM: postgres_config (Optional[Dict]) - PostgreSQL connection settings.
    # @PARAM: excel_path (Optional[str]) - Path to the XLSX file.
    # @PARAM: table_name (Optional[str]) - Table name in PostgreSQL.
    # @PARAM: table_schema (Optional[str]) - Table schema in PostgreSQL.
    def run_mapping(self, superset_client: "SupersetClient", dataset_id: int, source: str, postgres_config: Optional[Dict] = None, excel_path: Optional[str] = None, table_name: Optional[str] = None, table_schema: Optional[str] = None):
        """Apply the collected display names to the dataset if any of them changed.

        With ``source='both'`` a string value from the XLSX file takes precedence
        over the PostgreSQL comment of the same column. The dataset is only sent
        back to Superset when at least one ``verbose_name`` differs.
        """
        self.logger.info("[run_mapping][Enter] Starting dataset mapping for ID %d from source '%s'.", dataset_id, source)
        if source not in self._VALID_SOURCES:
            self.logger.error("[run_mapping][Failure] Invalid source: %s.", source)
            return
        try:
            mappings = self._collect_mappings(source, postgres_config, excel_path, table_name, table_schema)
            dataset_data = superset_client.get_dataset(dataset_id)['result']
            updated_columns, changes_made = self._apply_mappings(dataset_data.get('columns') or [], mappings)
            if not changes_made:
                self.logger.info("[run_mapping][State] No changes in columns' verbose_name, skipping update.")
                return
            payload_for_update = self._build_update_payload(dataset_data, updated_columns)
            superset_client.update_dataset(dataset_id, payload_for_update)
            self.logger.info("[run_mapping][Success] Dataset %d columns' verbose_name updated.", dataset_id)
        except Exception as e:
            # Best-effort boundary: failures are reported through the logger only.
            self.logger.error("[run_mapping][Failure] %s", e, exc_info=True)
            return
    # [/DEF:DatasetMapper.run_mapping]
# [/DEF:DatasetMapper]
# [/DEF:superset_tool.utils.dataset_mapper]