# [DEF:MapperPluginModule:Module]
# @SEMANTICS: plugin, mapper, datasets, postgresql, excel
# @PURPOSE: Implements a plugin for mapping dataset columns using external database connections or Excel files.
# @LAYER: Plugins
# @RELATION: Inherits from PluginBase. Uses DatasetMapper from superset_tool.
# @CONSTRAINT: Must use belief_scope for logging.

# [SECTION: IMPORTS]
from typing import Dict, Any, Optional
from ..core.plugin_base import PluginBase
from ..core.superset_client import SupersetClient
from ..core.logger import logger, belief_scope
from ..core.database import SessionLocal
from ..models.connection import ConnectionConfig
from ..core.utils.dataset_mapper import DatasetMapper
# [/SECTION]


# [DEF:MapperPlugin:Class]
# @PURPOSE: Plugin for mapping dataset columns verbose names.
class MapperPlugin(PluginBase):
    """
    Plugin for mapping dataset columns verbose names.
    """

    @property
    # [DEF:id:Function]
    # @PURPOSE: Returns the unique identifier for the mapper plugin.
    # @PRE: Plugin instance exists.
    # @POST: Returns string ID.
    # @RETURN: str - "dataset-mapper"
    def id(self) -> str:
        """Return the plugin identifier, ``"dataset-mapper"``."""
        with belief_scope("id"):
            return "dataset-mapper"
    # [/DEF:id:Function]

    @property
    # [DEF:name:Function]
    # @PURPOSE: Returns the human-readable name of the mapper plugin.
    # @PRE: Plugin instance exists.
    # @POST: Returns string name.
    # @RETURN: str - Plugin name.
    def name(self) -> str:
        """Return the display name, ``"Dataset Mapper"``."""
        with belief_scope("name"):
            return "Dataset Mapper"
    # [/DEF:name:Function]

    @property
    # [DEF:description:Function]
    # @PURPOSE: Returns a description of the mapper plugin.
    # @PRE: Plugin instance exists.
    # @POST: Returns string description.
    # @RETURN: str - Plugin description.
    def description(self) -> str:
        """Return a one-sentence description of the plugin."""
        with belief_scope("description"):
            return "Map dataset column verbose names using PostgreSQL comments or Excel files."
    # [/DEF:description:Function]

    @property
    # [DEF:version:Function]
    # @PURPOSE: Returns the version of the mapper plugin.
    # @PRE: Plugin instance exists.
    # @POST: Returns string version.
    # @RETURN: str - "1.0.0"
    def version(self) -> str:
        """Return the plugin version string, ``"1.0.0"``."""
        with belief_scope("version"):
            return "1.0.0"
    # [/DEF:version:Function]

    # [DEF:get_schema:Function]
    # @PURPOSE: Returns the JSON schema for the mapper plugin parameters.
    # @PRE: Plugin instance exists.
    # @POST: Returns dictionary schema.
    # @RETURN: Dict[str, Any] - JSON schema.
    def get_schema(self) -> Dict[str, Any]:
        """Return the JSON schema describing the parameters read by ``execute``.

        ``env``, ``dataset_id`` and ``source`` are listed as required; the
        remaining properties apply to one of the two mapping sources.
        """
        with belief_scope("get_schema"):
            return {
                "type": "object",
                "properties": {
                    "env": {
                        "type": "string",
                        "title": "Environment",
                        "description": "The Superset environment (e.g., 'dev')."
                    },
                    "dataset_id": {
                        "type": "integer",
                        "title": "Dataset ID",
                        "description": "The ID of the dataset to update."
                    },
                    "source": {
                        "type": "string",
                        "title": "Mapping Source",
                        "enum": ["postgres", "excel"],
                        "default": "postgres"
                    },
                    "connection_id": {
                        "type": "string",
                        "title": "Saved Connection",
                        "description": "The ID of a saved database connection (for postgres source)."
                    },
                    "table_name": {
                        "type": "string",
                        "title": "Table Name",
                        "description": "Target table name in PostgreSQL."
                    },
                    "table_schema": {
                        "type": "string",
                        "title": "Table Schema",
                        "description": "Target table schema in PostgreSQL.",
                        "default": "public"
                    },
                    "excel_path": {
                        "type": "string",
                        "title": "Excel Path",
                        "description": "Path to the Excel file (for excel source)."
                    }
                },
                "required": ["env", "dataset_id", "source"]
            }
    # [/DEF:get_schema:Function]

    # [DEF:execute:Function]
    # @PURPOSE: Executes the dataset mapping logic.
    # @PARAM: params (Dict[str, Any]) - Mapping parameters.
    # @PRE: Params contain valid 'env', 'dataset_id', and 'source'. params must be a dictionary.
    # @POST: Updates the dataset in Superset.
    # @RETURN: Dict[str, Any] - Execution status.
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Validate ``params``, build the source configuration and run the mapping.

        The actual column mapping is delegated to ``DatasetMapper.run_mapping``.

        Returns:
            ``{"status": "success", "dataset_id": dataset_id}`` once
            ``run_mapping`` returns without raising.

        Raises:
            ValueError: If ``env``, ``dataset_id`` or ``source`` is missing, the
                environment is not configured, ``connection_id`` is missing or
                unknown for the ``postgres`` source, or ``excel_path`` is
                missing for the ``excel`` source.
            Exception: Anything raised by ``run_mapping`` is logged and
                re-raised unchanged.
        """
        with belief_scope("execute"):
            env_name = params.get("env")
            dataset_id = params.get("dataset_id")
            source = params.get("source")

            # dataset_id is compared against None so that an ID of 0 is accepted.
            if not env_name or dataset_id is None or not source:
                logger.error("[MapperPlugin.execute][State] Missing required parameters.")
                raise ValueError("Missing required parameters: env, dataset_id, source")

            # Get config and initialize client.
            # Imported at call time rather than at module import time.
            from ..dependencies import get_config_manager
            config_manager = get_config_manager()
            env_config = config_manager.get_environment(env_name)
            if not env_config:
                logger.error(f"[MapperPlugin.execute][State] Environment '{env_name}' not found.")
                raise ValueError(f"Environment '{env_name}' not found in configuration.")

            client = SupersetClient(env_config)
            client.authenticate()

            postgres_config = None
            if source == "postgres":
                connection_id = params.get("connection_id")
                if not connection_id:
                    logger.error("[MapperPlugin.execute][State] connection_id is required for postgres source.")
                    raise ValueError("connection_id is required for postgres source.")

                # Load connection from DB; the session is closed even when the
                # lookup raises.
                db = SessionLocal()
                try:
                    conn_config = db.query(ConnectionConfig).filter(ConnectionConfig.id == connection_id).first()
                    if not conn_config:
                        logger.error(f"[MapperPlugin.execute][State] Connection {connection_id} not found.")
                        raise ValueError(f"Connection {connection_id} not found.")

                    postgres_config = {
                        'dbname': conn_config.database,
                        'user': conn_config.username,
                        'password': conn_config.password,
                        'host': conn_config.host,
                        # Port is passed as a string; '5432' is used when none is stored.
                        'port': str(conn_config.port) if conn_config.port else '5432'
                    }
                finally:
                    db.close()
            elif source == "excel":
                # Mirror the postgres check: reject a missing path here with a
                # clear message instead of handing None to the mapper.
                if not params.get("excel_path"):
                    logger.error("[MapperPlugin.execute][State] excel_path is required for excel source.")
                    raise ValueError("excel_path is required for excel source.")

            logger.info(f"[MapperPlugin.execute][Action] Starting mapping for dataset {dataset_id} in {env_name}")

            mapper = DatasetMapper()

            try:
                mapper.run_mapping(
                    superset_client=client,
                    dataset_id=dataset_id,
                    source=source,
                    postgres_config=postgres_config,
                    excel_path=params.get("excel_path"),
                    table_name=params.get("table_name"),
                    # An absent or empty schema falls back to "public".
                    table_schema=params.get("table_schema") or "public"
                )
                logger.info(f"[MapperPlugin.execute][Success] Mapping completed for dataset {dataset_id}")
                return {"status": "success", "dataset_id": dataset_id}
            except Exception as e:
                # Log at the plugin boundary, then let the caller see the original error.
                logger.error(f"[MapperPlugin.execute][Failure] Mapping failed: {e}")
                raise
    # [/DEF:execute:Function]
# [/DEF:MapperPlugin:Class]
# [/DEF:MapperPluginModule:Module]