2026-01-12 12:33:51 +03:00
parent 7a9b1a190a
commit 696aac32e7
30 changed files with 1511 additions and 593 deletions

View File

@@ -1 +1 @@
from . import plugins, tasks, settings
from . import plugins, tasks, settings, connections

View File

@@ -0,0 +1,78 @@
# [DEF:ConnectionsRouter:Module]
# @SEMANTICS: api, router, connections, database
# @PURPOSE: Defines the FastAPI router for managing external database connections.
# @LAYER: UI (API)
# @RELATION: Depends on SQLAlchemy session.
# @CONSTRAINT: Must use belief_scope for logging.
# [SECTION: IMPORTS]
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from ...core.database import get_db
from ...models.connection import ConnectionConfig
from pydantic import BaseModel, Field
from datetime import datetime
from ...core.logger import logger, belief_scope
# [/SECTION]
router = APIRouter()
# [DEF:ConnectionSchema:Class]
class ConnectionSchema(BaseModel):
id: str
name: str
type: str
host: Optional[str] = None
port: Optional[int] = None
database: Optional[str] = None
username: Optional[str] = None
created_at: datetime
class Config:
orm_mode = True
# [DEF:ConnectionCreate:Class]
class ConnectionCreate(BaseModel):
name: str
type: str
host: Optional[str] = None
port: Optional[int] = None
database: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
# [DEF:list_connections:Function]
@router.get("", response_model=List[ConnectionSchema])
async def list_connections(db: Session = Depends(get_db)):
with belief_scope("ConnectionsRouter.list_connections"):
connections = db.query(ConnectionConfig).all()
return connections
# [DEF:create_connection:Function]
@router.post("", response_model=ConnectionSchema, status_code=status.HTTP_201_CREATED)
async def create_connection(connection: ConnectionCreate, db: Session = Depends(get_db)):
with belief_scope("ConnectionsRouter.create_connection", f"name={connection.name}"):
db_connection = ConnectionConfig(**connection.dict())
db.add(db_connection)
db.commit()
db.refresh(db_connection)
logger.info(f"[ConnectionsRouter.create_connection][Success] Created connection {db_connection.id}")
return db_connection
# [DEF:delete_connection:Function]
@router.delete("/{connection_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_connection(connection_id: str, db: Session = Depends(get_db)):
with belief_scope("ConnectionsRouter.delete_connection", f"id={connection_id}"):
db_connection = db.query(ConnectionConfig).filter(ConnectionConfig.id == connection_id).first()
if not db_connection:
logger.error(f"[ConnectionsRouter.delete_connection][State] Connection {connection_id} not found")
raise HTTPException(status_code=404, detail="Connection not found")
db.delete(db_connection)
db.commit()
logger.info(f"[ConnectionsRouter.delete_connection][Success] Deleted connection {connection_id}")
return
# [/DEF:ConnectionsRouter:Module]
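The endpoints above can be exercised over HTTP once the router is mounted under /api/settings/connections (see main.py below). A minimal sketch using httpx; the host, port, and connection details are assumptions:

import httpx

BASE = "http://localhost:8000/api/settings/connections"  # assumed host/port

# Create a connection. The password is accepted on create but never
# echoed back: ConnectionSchema (the response model) omits it.
created = httpx.post(BASE, json={
    "name": "analytics-db",
    "type": "postgres",
    "host": "db.internal",
    "port": 5432,
    "database": "analytics",
    "username": "mapper",
    "password": "secret",
}).json()

print(httpx.get(BASE).json())            # list all saved connections
httpx.delete(f"{BASE}/{created['id']}")  # 204 on success, 404 if missing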

View File

@@ -20,7 +20,7 @@ import os
from .dependencies import get_task_manager, get_scheduler_service
from .core.logger import logger
from .api.routes import plugins, tasks, settings, environments, mappings, migration
from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections
from .core.database import init_db
# [DEF:App:Global]
@@ -66,6 +66,7 @@ async def log_requests(request: Request, call_next):
app.include_router(plugins.router, prefix="/api/plugins", tags=["Plugins"])
app.include_router(tasks.router, prefix="/api/tasks", tags=["Tasks"])
app.include_router(settings.router, prefix="/api/settings", tags=["Settings"])
app.include_router(connections.router, prefix="/api/settings/connections", tags=["Connections"])
app.include_router(environments.router, prefix="/api/environments", tags=["Environments"])
app.include_router(mappings.router)
app.include_router(migration.router)

View File

@@ -12,8 +12,9 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from backend.src.models.mapping import Base
# Import TaskRecord to ensure it's registered with Base
# Import models to ensure they're registered with Base
from backend.src.models.task import TaskRecord
from backend.src.models.connection import ConnectionConfig
import os
# [/SECTION]

View File

@@ -78,6 +78,28 @@ class SupersetClient(BaseSupersetClient):
return result
# [/DEF:SupersetClient.get_dashboards_summary:Function]
# [DEF:SupersetClient.get_dataset:Function]
# @PURPOSE: Fetch full dataset structure including columns and metrics.
# @PARAM: dataset_id (int) - The ID of the dataset.
# @RETURN: Dict - The dataset metadata.
def get_dataset(self, dataset_id: int) -> Dict:
"""
Fetch full dataset structure.
"""
return self.network.get(f"/api/v1/dataset/{dataset_id}").json()
# [/DEF:SupersetClient.get_dataset:Function]
# [DEF:SupersetClient.update_dataset:Function]
# @PURPOSE: Update dataset metadata.
# @PARAM: dataset_id (int) - The ID of the dataset.
# @PARAM: data (Dict) - The payload for update.
def update_dataset(self, dataset_id: int, data: Dict):
"""
Update dataset metadata.
"""
self.network.put(f"/api/v1/dataset/{dataset_id}", json=data)
# [/DEF:SupersetClient.update_dataset:Function]
# [/DEF:SupersetClient:Class]
# [/DEF:backend.src.core.superset_client:Module]
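A read-modify-write sketch built on the two new helpers. Per the DebugPlugin later in this commit, the GET response wraps the dataset under a "result" key; the client instance, dataset ID, and updated field are assumptions:

# Assumes `client` is an authenticated SupersetClient.
dataset = client.get_dataset(42).get("result", {})
print(dataset.get("table_name"))

# PUT back a partial payload (the accepted field set is an assumption).
client.update_dataset(42, {"description": "Reviewed 2026-01-12"})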

View File

@@ -98,9 +98,9 @@ class TaskManager:
params = {**task.params, "_task_id": task_id}
if asyncio.iscoroutinefunction(plugin.execute):
await plugin.execute(params)
task.result = await plugin.execute(params)
else:
await self.loop.run_in_executor(
task.result = await self.loop.run_in_executor(
self.executor,
plugin.execute,
params

View File

@@ -51,6 +51,7 @@ class Task(BaseModel):
params: Dict[str, Any] = Field(default_factory=dict)
input_required: bool = False
input_request: Optional[Dict[str, Any]] = None
result: Optional[Dict[str, Any]] = None
# [DEF:Task.__init__:Function]
# @PURPOSE: Initializes the Task model and validates input_request for AWAITING_INPUT status.

View File

@@ -43,6 +43,7 @@ class TaskPersistenceService:
record.started_at = task.started_at
record.finished_at = task.finished_at
record.params = task.params
record.result = task.result
# Store logs as JSON, converting datetime to string
record.logs = []
@@ -108,6 +109,7 @@ class TaskPersistenceService:
started_at=record.started_at,
finished_at=record.finished_at,
params=record.params or {},
result=record.result,
logs=logs
)
loaded_tasks.append(task)

View File

@@ -0,0 +1,34 @@
# [DEF:backend.src.models.connection:Module]
#
# @SEMANTICS: database, connection, configuration, sqlalchemy, sqlite
# @PURPOSE: Defines the database schema for external database connection configurations.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> sqlalchemy
#
# @INVARIANT: All primary keys are UUID strings.
# [SECTION: IMPORTS]
from sqlalchemy import Column, String, Integer, DateTime
from sqlalchemy.sql import func
from .mapping import Base
import uuid
# [/SECTION]
# [DEF:ConnectionConfig:Class]
# @PURPOSE: Stores credentials for external databases used for column mapping.
class ConnectionConfig(Base):
__tablename__ = "connection_configs"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
name = Column(String, nullable=False)
type = Column(String, nullable=False) # e.g., "postgres"
host = Column(String, nullable=True)
port = Column(Integer, nullable=True)
database = Column(String, nullable=True)
username = Column(String, nullable=True)
password = Column(String, nullable=True) # Stored as-is in this revision; encrypt or obfuscate before production use
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# [/DEF:ConnectionConfig:Class]
# [/DEF:backend.src.models.connection:Module]
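A minimal sketch of writing a row through this model directly, assuming the SessionLocal factory from backend.src.core.database (the same one MapperPlugin imports later in this commit):

from backend.src.core.database import SessionLocal
from backend.src.models.connection import ConnectionConfig

db = SessionLocal()
try:
    conn = ConnectionConfig(
        name="analytics-db", type="postgres", host="db.internal",
        port=5432, database="analytics", username="mapper", password="secret",
    )
    db.add(conn)
    db.commit()
    print(conn.id)  # UUID string filled in by the column default
finally:
    db.close()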

View File

@@ -27,6 +27,7 @@ class TaskRecord(Base):
finished_at = Column(DateTime(timezone=True), nullable=True)
logs = Column(JSON, nullable=True) # Store structured logs as JSON
error = Column(String, nullable=True)
result = Column(JSON, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
params = Column(JSON, nullable=True)
# [/DEF:TaskRecord:Class]

View File

@@ -0,0 +1,137 @@
# [DEF:DebugPluginModule:Module]
# @SEMANTICS: plugin, debug, api, database, superset
# @PURPOSE: Implements a plugin for system diagnostics and debugging Superset API responses.
# @LAYER: Plugins
# @RELATION: Inherits from PluginBase. Uses SupersetClient from core.
# @CONSTRAINT: Must use belief_scope for logging.
# [SECTION: IMPORTS]
from typing import Dict, Any, Optional
from ..core.plugin_base import PluginBase
from ..core.superset_client import SupersetClient
from ..core.logger import logger, belief_scope
# [/SECTION]
# [DEF:DebugPlugin:Class]
# @PURPOSE: Plugin for system diagnostics and debugging.
class DebugPlugin(PluginBase):
"""
Plugin for system diagnostics and debugging.
"""
@property
def id(self) -> str:
return "system-debug"
@property
def name(self) -> str:
return "System Debug"
@property
def description(self) -> str:
return "Run system diagnostics and debug Superset API responses."
@property
def version(self) -> str:
return "1.0.0"
# [DEF:DebugPlugin.get_schema:Function]
# @PURPOSE: Returns the JSON schema for the debug plugin parameters.
def get_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"action": {
"type": "string",
"title": "Action",
"enum": ["test-db-api", "get-dataset-structure"],
"default": "test-db-api"
},
"env": {
"type": "string",
"title": "Environment",
"description": "The Superset environment (for dataset structure)."
},
"dataset_id": {
"type": "integer",
"title": "Dataset ID",
"description": "The ID of the dataset (for dataset structure)."
},
"source_env": {
"type": "string",
"title": "Source Environment",
"description": "Source env for DB API test."
},
"target_env": {
"type": "string",
"title": "Target Environment",
"description": "Target env for DB API test."
}
},
"required": ["action"]
}
# [/DEF:DebugPlugin.get_schema:Function]
# [DEF:DebugPlugin.execute:Function]
# @PURPOSE: Executes the debug logic.
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
with belief_scope("DebugPlugin.execute", f"params={params}"):
action = params.get("action")
if action == "test-db-api":
return await self._test_db_api(params)
elif action == "get-dataset-structure":
return await self._get_dataset_structure(params)
else:
raise ValueError(f"Unknown action: {action}")
# [/DEF:DebugPlugin.execute:Function]
# [DEF:DebugPlugin._test_db_api:Function]
# @PURPOSE: Compares database listings between two environments via the Superset API.
async def _test_db_api(self, params: Dict[str, Any]) -> Dict[str, Any]:
source_env_name = params.get("source_env")
target_env_name = params.get("target_env")
if not source_env_name or not target_env_name:
raise ValueError("source_env and target_env are required for test-db-api")
from ..dependencies import get_config_manager
config_manager = get_config_manager()
results = {}
for name in [source_env_name, target_env_name]:
env_config = config_manager.get_environment(name)
if not env_config:
raise ValueError(f"Environment '{name}' not found.")
client = SupersetClient(env_config)
client.authenticate()
count, dbs = client.get_databases()
results[name] = {
"count": count,
"databases": dbs
}
return results
# [/DEF:DebugPlugin._test_db_api:Function]
# [DEF:DebugPlugin._get_dataset_structure:Function]
# @PURPOSE: Fetches the full structure of a single dataset from the given environment.
async def _get_dataset_structure(self, params: Dict[str, Any]) -> Dict[str, Any]:
env_name = params.get("env")
dataset_id = params.get("dataset_id")
if not env_name or dataset_id is None:
raise ValueError("env and dataset_id are required for get-dataset-structure")
from ..dependencies import get_config_manager
config_manager = get_config_manager()
env_config = config_manager.get_environment(env_name)
if not env_config:
raise ValueError(f"Environment '{env_name}' not found.")
client = SupersetClient(env_config)
client.authenticate()
dataset_response = client.get_dataset(dataset_id)
return dataset_response.get('result') or {}
# [/DEF:DebugPlugin._get_dataset_structure:Function]
# [/DEF:DebugPlugin:Class]
# [/DEF:DebugPluginModule:Module]
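The plugin can also be driven directly, outside the TaskManager. A sketch with illustrative values; the module path and environment name are assumptions:

import asyncio
from backend.src.plugins.debug_plugin import DebugPlugin  # path is an assumption

result = asyncio.run(DebugPlugin().execute({
    "action": "get-dataset-structure",
    "env": "dev",        # must exist in the config manager
    "dataset_id": 42,    # illustrative ID
}))
print(list(result.keys()))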

View File

@@ -0,0 +1,164 @@
# [DEF:MapperPluginModule:Module]
# @SEMANTICS: plugin, mapper, datasets, postgresql, excel
# @PURPOSE: Implements a plugin for mapping dataset columns using external database connections or Excel files.
# @LAYER: Plugins
# @RELATION: Inherits from PluginBase. Uses DatasetMapper from superset_tool.
# @CONSTRAINT: Must use belief_scope for logging.
# [SECTION: IMPORTS]
from typing import Dict, Any, Optional
from ..core.plugin_base import PluginBase
from ..core.superset_client import SupersetClient
from ..core.logger import logger, belief_scope
from ..core.database import SessionLocal
from ..models.connection import ConnectionConfig
from superset_tool.utils.dataset_mapper import DatasetMapper
from superset_tool.utils.logger import SupersetLogger
# [/SECTION]
# [DEF:MapperPlugin:Class]
# @PURPOSE: Plugin for mapping dataset column verbose names.
class MapperPlugin(PluginBase):
"""
Plugin for mapping dataset column verbose names.
"""
@property
def id(self) -> str:
return "dataset-mapper"
@property
def name(self) -> str:
return "Dataset Mapper"
@property
def description(self) -> str:
return "Map dataset column verbose names using PostgreSQL comments or Excel files."
@property
def version(self) -> str:
return "1.0.0"
# [DEF:MapperPlugin.get_schema:Function]
# @PURPOSE: Returns the JSON schema for the mapper plugin parameters.
def get_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"env": {
"type": "string",
"title": "Environment",
"description": "The Superset environment (e.g., 'dev')."
},
"dataset_id": {
"type": "integer",
"title": "Dataset ID",
"description": "The ID of the dataset to update."
},
"source": {
"type": "string",
"title": "Mapping Source",
"enum": ["postgres", "excel"],
"default": "postgres"
},
"connection_id": {
"type": "string",
"title": "Saved Connection",
"description": "The ID of a saved database connection (for postgres source)."
},
"table_name": {
"type": "string",
"title": "Table Name",
"description": "Target table name in PostgreSQL."
},
"table_schema": {
"type": "string",
"title": "Table Schema",
"description": "Target table schema in PostgreSQL.",
"default": "public"
},
"excel_path": {
"type": "string",
"title": "Excel Path",
"description": "Path to the Excel file (for excel source)."
}
},
"required": ["env", "dataset_id", "source"]
}
# [/DEF:MapperPlugin.get_schema:Function]
# [DEF:MapperPlugin.execute:Function]
# @PURPOSE: Executes the dataset mapping logic.
# @PRE: Params contain valid 'env', 'dataset_id', and 'source'.
# @POST: Updates the dataset in Superset.
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
with belief_scope("MapperPlugin.execute", f"params={params}"):
env_name = params.get("env")
dataset_id = params.get("dataset_id")
source = params.get("source")
if not env_name or dataset_id is None or not source:
logger.error("[MapperPlugin.execute][State] Missing required parameters.")
raise ValueError("Missing required parameters: env, dataset_id, source")
# Get config and initialize client
from ..dependencies import get_config_manager
config_manager = get_config_manager()
env_config = config_manager.get_environment(env_name)
if not env_config:
logger.error(f"[MapperPlugin.execute][State] Environment '{env_name}' not found.")
raise ValueError(f"Environment '{env_name}' not found in configuration.")
client = SupersetClient(env_config)
client.authenticate()
postgres_config = None
if source == "postgres":
connection_id = params.get("connection_id")
if not connection_id:
logger.error("[MapperPlugin.execute][State] connection_id is required for postgres source.")
raise ValueError("connection_id is required for postgres source.")
# Load connection from DB
db = SessionLocal()
try:
conn_config = db.query(ConnectionConfig).filter(ConnectionConfig.id == connection_id).first()
if not conn_config:
logger.error(f"[MapperPlugin.execute][State] Connection {connection_id} not found.")
raise ValueError(f"Connection {connection_id} not found.")
postgres_config = {
'dbname': conn_config.database,
'user': conn_config.username,
'password': conn_config.password,
'host': conn_config.host,
'port': str(conn_config.port) if conn_config.port else '5432'
}
finally:
db.close()
logger.info(f"[MapperPlugin.execute][Action] Starting mapping for dataset {dataset_id} in {env_name}")
# Use internal SupersetLogger for DatasetMapper
s_logger = SupersetLogger(name="dataset_mapper_plugin")
mapper = DatasetMapper(s_logger)
try:
mapper.run_mapping(
superset_client=client,
dataset_id=dataset_id,
source=source,
postgres_config=postgres_config,
excel_path=params.get("excel_path"),
table_name=params.get("table_name"),
table_schema=params.get("table_schema") or "public"
)
logger.info(f"[MapperPlugin.execute][Success] Mapping completed for dataset {dataset_id}")
return {"status": "success", "dataset_id": dataset_id}
except Exception as e:
logger.error(f"[MapperPlugin.execute][Failure] Mapping failed: {e}")
raise
# [/DEF:MapperPlugin.execute:Function]
# [/DEF:MapperPlugin:Class]
# [/DEF:MapperPluginModule:Module]
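Two parameter payloads accepted by MapperPlugin.execute(), matching get_schema() above; the IDs, names, and paths are illustrative:

postgres_params = {
    "env": "dev",
    "dataset_id": 42,
    "source": "postgres",
    "connection_id": "<uuid from /api/settings/connections>",
    "table_name": "sales",
    "table_schema": "public",
}

excel_params = {
    "env": "dev",
    "dataset_id": 42,
    "source": "excel",
    "excel_path": "/data/mappings/sales_columns.xlsx",
}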

View File

@@ -0,0 +1,161 @@
# [DEF:SearchPluginModule:Module]
# @SEMANTICS: plugin, search, datasets, regex, superset
# @PURPOSE: Implements a plugin for searching text patterns across all datasets in a specific Superset environment.
# @LAYER: Plugins
# @RELATION: Inherits from PluginBase. Uses SupersetClient from core.
# @CONSTRAINT: Must use belief_scope for logging.
# [SECTION: IMPORTS]
import re
from typing import Dict, Any, List, Optional
from ..core.plugin_base import PluginBase
from ..core.superset_client import SupersetClient
from ..core.logger import logger, belief_scope
# [/SECTION]
# [DEF:SearchPlugin:Class]
# @PURPOSE: Plugin for searching text patterns in Superset datasets.
class SearchPlugin(PluginBase):
"""
Plugin for searching text patterns in Superset datasets.
"""
@property
def id(self) -> str:
return "search-datasets"
@property
def name(self) -> str:
return "Search Datasets"
@property
def description(self) -> str:
return "Search for text patterns across all datasets in a specific environment."
@property
def version(self) -> str:
return "1.0.0"
# [DEF:SearchPlugin.get_schema:Function]
# @PURPOSE: Returns the JSON schema for the search plugin parameters.
def get_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"env": {
"type": "string",
"title": "Environment",
"description": "The Superset environment to search in (e.g., 'dev', 'prod')."
},
"query": {
"type": "string",
"title": "Search Query (Regex)",
"description": "The regex pattern to search for."
}
},
"required": ["env", "query"]
}
# [/DEF:SearchPlugin.get_schema:Function]
# [DEF:SearchPlugin.execute:Function]
# @PURPOSE: Executes the dataset search logic.
# @PRE: Params contain valid 'env' and 'query'.
# @POST: Returns a dictionary with count and results list.
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
with belief_scope("SearchPlugin.execute", f"params={params}"):
env_name = params.get("env")
search_query = params.get("query")
if not env_name or not search_query:
logger.error("[SearchPlugin.execute][State] Missing required parameters.")
raise ValueError("Missing required parameters: env, query")
# Get config and initialize client
from ..dependencies import get_config_manager
config_manager = get_config_manager()
env_config = config_manager.get_environment(env_name)
if not env_config:
logger.error(f"[SearchPlugin.execute][State] Environment '{env_name}' not found.")
raise ValueError(f"Environment '{env_name}' not found in configuration.")
client = SupersetClient(env_config)
client.authenticate()
logger.info(f"[SearchPlugin.execute][Action] Searching for pattern: '{search_query}' in environment: {env_name}")
try:
# Ported logic from search_script.py
_, datasets = client.get_datasets(query={"columns": ["id", "table_name", "sql", "database", "columns"]})
if not datasets:
logger.warning("[SearchPlugin.execute][State] No datasets found.")
return {"count": 0, "results": []}
pattern = re.compile(search_query, re.IGNORECASE)
results = []
for dataset in datasets:
dataset_id = dataset.get('id')
dataset_name = dataset.get('table_name', 'Unknown')
if not dataset_id:
continue
for field, value in dataset.items():
value_str = str(value)
match_obj = pattern.search(value_str)
if match_obj:
results.append({
"dataset_id": dataset_id,
"dataset_name": dataset_name,
"field": field,
"match_context": self._get_context(value_str, match_obj.group()),
"full_value": value_str
})
logger.info(f"[SearchPlugin.execute][Success] Found matches in {len(results)} locations.")
return {
"count": len(results),
"results": results
}
except re.error as e:
logger.error(f"[SearchPlugin.execute][Failure] Invalid regex pattern: {e}")
raise ValueError(f"Invalid regex pattern: {e}")
except Exception as e:
logger.error(f"[SearchPlugin.execute][Failure] Error during search: {e}")
raise
# [/DEF:SearchPlugin.execute:Function]
# [DEF:SearchPlugin._get_context:Function]
# @PURPOSE: Extracts a small context around the match for display.
def _get_context(self, text: str, match_text: str, context_lines: int = 1) -> str:
"""
Extracts a small context around the match for display.
"""
if not match_text:
return text[:100] + "..." if len(text) > 100 else text
lines = text.splitlines()
match_line_index = -1
for i, line in enumerate(lines):
if match_text in line:
match_line_index = i
break
if match_line_index != -1:
start = max(0, match_line_index - context_lines)
end = min(len(lines), match_line_index + context_lines + 1)
context = []
for i in range(start, end):
line_content = lines[i]
if i == match_line_index:
context.append(f"==> {line_content}")
else:
context.append(f" {line_content}")
return "\n".join(context)
return text[:100] + "..." if len(text) > 100 else text
# [/DEF:SearchPlugin._get_context:Function]
# [/DEF:SearchPlugin:Class]
# [/DEF:SearchPluginModule:Module]
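A sketch of running the search directly; the module path, environment, and pattern are assumptions:

import asyncio
from backend.src.plugins.search_plugin import SearchPlugin  # path is an assumption

found = asyncio.run(SearchPlugin().execute({
    "env": "dev",                    # must exist in the config manager
    "query": r"legacy_schema\.\w+",  # any valid regex; matching is case-insensitive
}))
print(found["count"])
for hit in found["results"]:
    print(hit["dataset_id"], hit["dataset_name"], hit["field"])
    print(hit["match_context"])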