Files
ss-tools/backend/src/plugins/search.py
2026-01-22 17:37:17 +03:00

202 lines
8.0 KiB
Python

# [DEF:SearchPluginModule:Module]
# @SEMANTICS: plugin, search, datasets, regex, superset
# @PURPOSE: Implements a plugin for searching text patterns across all datasets in a specific Superset environment.
# @LAYER: Plugins
# @RELATION: Inherits from PluginBase. Uses SupersetClient from core.
# @CONSTRAINT: Must use belief_scope for logging.
# [SECTION: IMPORTS]
import re
from typing import Dict, Any, List, Optional
from ..core.plugin_base import PluginBase
from ..core.superset_client import SupersetClient
from ..core.logger import logger, belief_scope
# [/SECTION]
# [DEF:SearchPlugin:Class]
# @PURPOSE: Plugin for searching text patterns in Superset datasets.
class SearchPlugin(PluginBase):
    """
    Plugin for searching text patterns in Superset datasets.

    The plugin fetches datasets through ``SupersetClient``, converts every
    top-level field of each dataset to a string and reports the fields in
    which a case-insensitive regex matches.
    """

    @property
    # [DEF:id:Function]
    # @PURPOSE: Returns the unique identifier for the search plugin.
    # @PRE: Plugin instance exists.
    # @POST: Returns string ID.
    # @RETURN: str - "search-datasets"
    def id(self) -> str:
        with belief_scope("id"):
            return "search-datasets"
    # [/DEF:id:Function]

    @property
    # [DEF:name:Function]
    # @PURPOSE: Returns the human-readable name of the search plugin.
    # @PRE: Plugin instance exists.
    # @POST: Returns string name.
    # @RETURN: str - Plugin name.
    def name(self) -> str:
        with belief_scope("name"):
            return "Search Datasets"
    # [/DEF:name:Function]

    @property
    # [DEF:description:Function]
    # @PURPOSE: Returns a description of the search plugin.
    # @PRE: Plugin instance exists.
    # @POST: Returns string description.
    # @RETURN: str - Plugin description.
    def description(self) -> str:
        with belief_scope("description"):
            return "Search for text patterns across all datasets in a specific environment."
    # [/DEF:description:Function]

    @property
    # [DEF:version:Function]
    # @PURPOSE: Returns the version of the search plugin.
    # @PRE: Plugin instance exists.
    # @POST: Returns string version.
    # @RETURN: str - "1.0.0"
    def version(self) -> str:
        with belief_scope("version"):
            return "1.0.0"
    # [/DEF:version:Function]

    # [DEF:get_schema:Function]
    # @PURPOSE: Returns the JSON schema for the search plugin parameters.
    # @PRE: Plugin instance exists.
    # @POST: Returns dictionary schema.
    # @RETURN: Dict[str, Any] - JSON schema.
    def get_schema(self) -> Dict[str, Any]:
        with belief_scope("get_schema"):
            return {
                "type": "object",
                "properties": {
                    "env": {
                        "type": "string",
                        "title": "Environment",
                        "description": "The Superset environment to search in (e.g., 'dev', 'prod')."
                    },
                    "query": {
                        "type": "string",
                        "title": "Search Query (Regex)",
                        "description": "The regex pattern to search for."
                    }
                },
                "required": ["env", "query"]
            }
    # [/DEF:get_schema:Function]

    # [DEF:execute:Function]
    # @PURPOSE: Executes the dataset search logic.
    # @PARAM: params (Dict[str, Any]) - Search parameters.
    # @PRE: Params contain valid 'env' and 'query'.
    # @POST: Returns a dictionary with count and results list.
    # @RETURN: Dict[str, Any] - Search results.
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Search every dataset of one environment for a regex pattern.

        ``params`` must provide ``env`` (environment name looked up in the
        config manager) and ``query`` (regex, compiled with ``re.IGNORECASE``).

        Returns ``{"count": int, "results": list}`` where each result holds
        ``dataset_id``, ``dataset_name``, ``field``, ``match_context`` and
        ``full_value`` for one matching dataset field.

        Raises ``ValueError`` when a parameter is missing, the environment is
        not configured, or the regex does not compile. Any other exception
        raised while fetching or scanning datasets is logged and re-raised.
        """
        with belief_scope("SearchPlugin.execute", f"params={params}"):
            env_name = params.get("env")
            search_query = params.get("query")
            if not env_name or not search_query:
                logger.error("[SearchPlugin.execute][State] Missing required parameters.")
                raise ValueError("Missing required parameters: env, query")

            # Get config and initialize client.
            # Imported here rather than at module level, as in the original code.
            from ..dependencies import get_config_manager
            config_manager = get_config_manager()
            env_config = config_manager.get_environment(env_name)
            if not env_config:
                logger.error(f"[SearchPlugin.execute][State] Environment '{env_name}' not found.")
                raise ValueError(f"Environment '{env_name}' not found in configuration.")

            # Validate the pattern before any client work so that a bad regex
            # is rejected without authenticating or downloading datasets.
            try:
                pattern = re.compile(search_query, re.IGNORECASE)
            except re.error as e:
                logger.error(f"[SearchPlugin.execute][Failure] Invalid regex pattern: {e}")
                raise ValueError(f"Invalid regex pattern: {e}") from e

            # NOTE(review): the client calls below are not awaited; if they
            # perform blocking I/O they will block the event loop — confirm.
            client = SupersetClient(env_config)
            client.authenticate()
            logger.info(f"[SearchPlugin.execute][Action] Searching for pattern: '{search_query}' in environment: {env_name}")
            try:
                # Ported logic from search_script.py
                _, datasets = client.get_datasets(query={"columns": ["id", "table_name", "sql", "database", "columns"]})
                if not datasets:
                    logger.warning("[SearchPlugin.execute][State] No datasets found.")
                    return {"count": 0, "results": []}

                results = []
                for dataset in datasets:
                    dataset_id = dataset.get('id')
                    if not dataset_id:
                        continue
                    dataset_name = dataset.get('table_name', 'Unknown')
                    for field, value in dataset.items():
                        # str(None) is "None"; searching it would report
                        # unset fields as hits for patterns such as "none".
                        if value is None:
                            continue
                        value_str = str(value)
                        # Search once and reuse the match object.
                        match_obj = pattern.search(value_str)
                        if match_obj is None:
                            continue
                        results.append({
                            "dataset_id": dataset_id,
                            "dataset_name": dataset_name,
                            "field": field,
                            "match_context": self._get_context(value_str, match_obj.group()),
                            "full_value": value_str
                        })

                logger.info(f"[SearchPlugin.execute][Success] Found matches in {len(results)} locations.")
                return {
                    "count": len(results),
                    "results": results
                }
            except Exception as e:
                logger.error(f"[SearchPlugin.execute][Failure] Error during search: {e}")
                raise
    # [/DEF:execute:Function]

    # [DEF:_preview:Function]
    # @PURPOSE: Truncates text to a short preview for display.
    # @PARAM: text (str) - The text to truncate.
    # @PARAM: limit (int) - Maximum number of characters kept before the ellipsis.
    # @PRE: text must be a string.
    # @POST: Returns text unchanged, or its first `limit` characters followed by "...".
    # @RETURN: str - Preview string.
    @staticmethod
    def _preview(text: str, limit: int = 100) -> str:
        """
        Return ``text`` unchanged if it has at most ``limit`` characters,
        otherwise its first ``limit`` characters followed by ``"..."``.
        """
        return text[:limit] + "..." if len(text) > limit else text
    # [/DEF:_preview:Function]

    # [DEF:_get_context:Function]
    # @PURPOSE: Extracts a small context around the match for display.
    # @PARAM: text (str) - The full text to extract context from.
    # @PARAM: match_text (str) - The matched text pattern.
    # @PARAM: context_lines (int) - Number of lines of context to include.
    # @PRE: text and match_text must be strings.
    # @POST: Returns context string.
    # @RETURN: str - Extracted context.
    def _get_context(self, text: str, match_text: str, context_lines: int = 1) -> str:
        """
        Extracts a small context around the match for display.

        The line on which the first occurrence of ``match_text`` starts is
        prefixed with ``"==> "``; up to ``context_lines`` lines before and
        after it are included, prefixed with a single space. Lines are joined
        with ``"\\n"``. When ``match_text`` is empty or does not occur in
        ``text``, a preview of at most 100 characters of ``text`` is returned.
        """
        with belief_scope("_get_context"):
            if not match_text:
                return self._preview(text)

            match_offset = text.find(match_text)
            if match_offset == -1:
                return self._preview(text)

            # Locate the line by character offset instead of testing each line
            # for containment: a match that spans a line break is never a
            # substring of any single line.
            match_line_index = -1
            consumed = 0
            for i, raw_line in enumerate(text.splitlines(keepends=True)):
                consumed += len(raw_line)
                if match_offset < consumed:
                    match_line_index = i
                    break

            lines = text.splitlines()
            start = max(0, match_line_index - context_lines)
            end = min(len(lines), match_line_index + context_lines + 1)
            context = [
                f"==> {lines[i]}" if i == match_line_index else f" {lines[i]}"
                for i in range(start, end)
            ]
            return "\n".join(context)
    # [/DEF:_get_context:Function]
# [/DEF:SearchPlugin:Class]
# [/DEF:SearchPluginModule:Module]