Semantic markup update: add [DEF]/[/DEF] annotations, @PURPOSE/@PRE/@POST/@RETURN doc tags, and belief_scope instrumentation to the backup, debug, mapper, migration, and search plugins

This commit is contained in:
2026-01-18 21:29:54 +03:00
parent 11c59fb420
commit 76baeb1038
85 changed files with 7020 additions and 5953 deletions

View File

@@ -11,6 +11,7 @@ from pathlib import Path
from requests.exceptions import RequestException
from ..core.plugin_base import PluginBase
from ..core.logger import belief_scope
from superset_tool.client import SupersetClient
from superset_tool.exceptions import SupersetAPIError
from superset_tool.utils.logger import SupersetLogger
@@ -33,24 +34,58 @@ class BackupPlugin(PluginBase):
"""
@property
# [DEF:id:Function]
# @PURPOSE: Returns the unique identifier for the backup plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string ID.
# @RETURN: str - "superset-backup"
def id(self) -> str:
return "superset-backup"
with belief_scope("id"):
return "superset-backup"
# [/DEF:id:Function]
@property
# [DEF:name:Function]
# @PURPOSE: Returns the human-readable name of the backup plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string name.
# @RETURN: str - Plugin name.
def name(self) -> str:
return "Superset Dashboard Backup"
with belief_scope("name"):
return "Superset Dashboard Backup"
# [/DEF:name:Function]
@property
# [DEF:description:Function]
# @PURPOSE: Returns a description of the backup plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string description.
# @RETURN: str - Plugin description.
def description(self) -> str:
return "Backs up all dashboards from a Superset instance."
with belief_scope("description"):
return "Backs up all dashboards from a Superset instance."
# [/DEF:description:Function]
@property
# [DEF:version:Function]
# @PURPOSE: Returns the version of the backup plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string version.
# @RETURN: str - "1.0.0"
def version(self) -> str:
return "1.0.0"
with belief_scope("version"):
return "1.0.0"
# [/DEF:version:Function]
# [DEF:get_schema:Function]
# @PURPOSE: Returns the JSON schema for backup plugin parameters.
# @PRE: Plugin instance exists.
# @POST: Returns dictionary schema.
# @RETURN: Dict[str, Any] - JSON schema.
def get_schema(self) -> Dict[str, Any]:
config_manager = get_config_manager()
envs = [e.name for e in config_manager.get_environments()]
with belief_scope("get_schema"):
config_manager = get_config_manager()
envs = [e.name for e in config_manager.get_environments()]
default_path = config_manager.get_config().settings.backup_path
return {
@@ -71,79 +106,87 @@ class BackupPlugin(PluginBase):
},
"required": ["env", "backup_path"],
}
# [/DEF:get_schema:Function]
# [DEF:execute:Function]
# @PURPOSE: Executes the dashboard backup logic.
# @PARAM: params (Dict[str, Any]) - Backup parameters (env, backup_path).
# @PRE: Target environment must be configured. params must be a dictionary.
# @POST: All dashboards are exported and archived.
async def execute(self, params: Dict[str, Any]):
config_manager = get_config_manager()
env_id = params.get("environment_id")
# Resolve environment name if environment_id is provided
if env_id:
env_config = next((e for e in config_manager.get_environments() if e.id == env_id), None)
if env_config:
params["env"] = env_config.name
env = params.get("env")
if not env:
raise KeyError("env")
backup_path_str = params.get("backup_path") or config_manager.get_config().settings.backup_path
backup_path = Path(backup_path_str)
logger = SupersetLogger(log_dir=backup_path / "Logs", console=True)
logger.info(f"[BackupPlugin][Entry] Starting backup for {env}.")
try:
with belief_scope("execute"):
config_manager = get_config_manager()
if not config_manager.has_environments():
raise ValueError("No Superset environments configured. Please add an environment in Settings.")
env_id = params.get("environment_id")
# Resolve environment name if environment_id is provided
if env_id:
env_config = next((e for e in config_manager.get_environments() if e.id == env_id), None)
if env_config:
params["env"] = env_config.name
env = params.get("env")
if not env:
raise KeyError("env")
backup_path_str = params.get("backup_path") or config_manager.get_config().settings.backup_path
backup_path = Path(backup_path_str)
logger = SupersetLogger(log_dir=backup_path / "Logs", console=True)
logger.info(f"[BackupPlugin][Entry] Starting backup for {env}.")
try:
config_manager = get_config_manager()
if not config_manager.has_environments():
raise ValueError("No Superset environments configured. Please add an environment in Settings.")
clients = setup_clients(logger, custom_envs=config_manager.get_environments())
client = clients.get(env)
clients = setup_clients(logger, custom_envs=config_manager.get_environments())
client = clients.get(env)
if not client:
raise ValueError(f"Environment '{env}' not found in configuration.")
dashboard_count, dashboard_meta = client.get_dashboards()
logger.info(f"[BackupPlugin][Progress] Found {dashboard_count} dashboards to export in {env}.")
if not client:
raise ValueError(f"Environment '{env}' not found in configuration.")
dashboard_count, dashboard_meta = client.get_dashboards()
logger.info(f"[BackupPlugin][Progress] Found {dashboard_count} dashboards to export in {env}.")
if dashboard_count == 0:
logger.info("[BackupPlugin][Exit] No dashboards to back up.")
return
if dashboard_count == 0:
logger.info("[BackupPlugin][Exit] No dashboards to back up.")
return
for db in dashboard_meta:
dashboard_id = db.get('id')
dashboard_title = db.get('dashboard_title', 'Unknown Dashboard')
if not dashboard_id:
continue
for db in dashboard_meta:
dashboard_id = db.get('id')
dashboard_title = db.get('dashboard_title', 'Unknown Dashboard')
if not dashboard_id:
continue
try:
dashboard_base_dir_name = sanitize_filename(f"{dashboard_title}")
dashboard_dir = backup_path / env.upper() / dashboard_base_dir_name
dashboard_dir.mkdir(parents=True, exist_ok=True)
try:
dashboard_base_dir_name = sanitize_filename(f"{dashboard_title}")
dashboard_dir = backup_path / env.upper() / dashboard_base_dir_name
dashboard_dir.mkdir(parents=True, exist_ok=True)
zip_content, filename = client.export_dashboard(dashboard_id)
zip_content, filename = client.export_dashboard(dashboard_id)
save_and_unpack_dashboard(
zip_content=zip_content,
original_filename=filename,
output_dir=dashboard_dir,
unpack=False,
logger=logger
)
save_and_unpack_dashboard(
zip_content=zip_content,
original_filename=filename,
output_dir=dashboard_dir,
unpack=False,
logger=logger
)
archive_exports(str(dashboard_dir), policy=RetentionPolicy(), logger=logger)
archive_exports(str(dashboard_dir), policy=RetentionPolicy(), logger=logger)
except (SupersetAPIError, RequestException, IOError, OSError) as db_error:
logger.error(f"[BackupPlugin][Failure] Failed to export dashboard {dashboard_title} (ID: {dashboard_id}): {db_error}", exc_info=True)
continue
consolidate_archive_folders(backup_path / env.upper(), logger=logger)
remove_empty_directories(str(backup_path / env.upper()), logger=logger)
except (SupersetAPIError, RequestException, IOError, OSError) as db_error:
logger.error(f"[BackupPlugin][Failure] Failed to export dashboard {dashboard_title} (ID: {dashboard_id}): {db_error}", exc_info=True)
continue
consolidate_archive_folders(backup_path / env.upper(), logger=logger)
remove_empty_directories(str(backup_path / env.upper()), logger=logger)
logger.info(f"[BackupPlugin][CoherenceCheck:Passed] Backup logic completed for {env}.")
logger.info(f"[BackupPlugin][CoherenceCheck:Passed] Backup logic completed for {env}.")
except (RequestException, IOError, KeyError) as e:
logger.critical(f"[BackupPlugin][Failure] Fatal error during backup for {env}: {e}", exc_info=True)
raise e
except (RequestException, IOError, KeyError) as e:
logger.critical(f"[BackupPlugin][Failure] Fatal error during backup for {env}: {e}", exc_info=True)
raise e
# [/DEF:execute:Function]
# [/DEF:BackupPlugin:Class]
# [/DEF:BackupPlugin:Module]

View File

@@ -20,25 +20,57 @@ class DebugPlugin(PluginBase):
"""
@property
# [DEF:id:Function]
# @PURPOSE: Returns the unique identifier for the debug plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string ID.
# @RETURN: str - "system-debug"
def id(self) -> str:
return "system-debug"
with belief_scope("id"):
return "system-debug"
# [/DEF:id:Function]
@property
# [DEF:name:Function]
# @PURPOSE: Returns the human-readable name of the debug plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string name.
# @RETURN: str - Plugin name.
def name(self) -> str:
return "System Debug"
with belief_scope("name"):
return "System Debug"
# [/DEF:name:Function]
@property
# [DEF:description:Function]
# @PURPOSE: Returns a description of the debug plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string description.
# @RETURN: str - Plugin description.
def description(self) -> str:
return "Run system diagnostics and debug Superset API responses."
with belief_scope("description"):
return "Run system diagnostics and debug Superset API responses."
# [/DEF:description:Function]
@property
# [DEF:version:Function]
# @PURPOSE: Returns the version of the debug plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string version.
# @RETURN: str - "1.0.0"
def version(self) -> str:
return "1.0.0"
with belief_scope("version"):
return "1.0.0"
# [/DEF:version:Function]
# [DEF:DebugPlugin.get_schema:Function]
# [DEF:get_schema:Function]
# @PURPOSE: Returns the JSON schema for the debug plugin parameters.
# @PRE: Plugin instance exists.
# @POST: Returns dictionary schema.
# @RETURN: Dict[str, Any] - JSON schema.
def get_schema(self) -> Dict[str, Any]:
return {
with belief_scope("get_schema"):
return {
"type": "object",
"properties": {
"action": {
@@ -70,12 +102,16 @@ class DebugPlugin(PluginBase):
},
"required": ["action"]
}
# [/DEF:DebugPlugin.get_schema:Function]
# [/DEF:get_schema:Function]
# [DEF:DebugPlugin.execute:Function]
# [DEF:execute:Function]
# @PURPOSE: Executes the debug logic.
# @PARAM: params (Dict[str, Any]) - Debug parameters.
# @PRE: action must be provided in params.
# @POST: Debug action is executed and results returned.
# @RETURN: Dict[str, Any] - Execution results.
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
with belief_scope("DebugPlugin.execute", f"params={params}"):
with belief_scope("execute"):
action = params.get("action")
if action == "test-db-api":
@@ -84,16 +120,17 @@ class DebugPlugin(PluginBase):
return await self._get_dataset_structure(params)
else:
raise ValueError(f"Unknown action: {action}")
# [/DEF:DebugPlugin.execute:Function]
# [/DEF:execute:Function]
# [DEF:DebugPlugin._test_db_api:Function]
# [DEF:_test_db_api:Function]
# @PURPOSE: Tests database API connectivity for source and target environments.
# @PRE: source_env and target_env params exist.
# @PRE: source_env and target_env params exist in params.
# @POST: Returns DB counts for both envs.
# @PARAM: params (Dict) - Plugin parameters.
# @RETURN: Dict - Comparison results.
async def _test_db_api(self, params: Dict[str, Any]) -> Dict[str, Any]:
source_env_name = params.get("source_env")
with belief_scope("_test_db_api"):
source_env_name = params.get("source_env")
target_env_name = params.get("target_env")
if not source_env_name or not target_env_name:
@@ -117,16 +154,17 @@ class DebugPlugin(PluginBase):
}
return results
# [/DEF:DebugPlugin._test_db_api:Function]
# [/DEF:_test_db_api:Function]
# [DEF:DebugPlugin._get_dataset_structure:Function]
# [DEF:_get_dataset_structure:Function]
# @PURPOSE: Retrieves the structure of a dataset.
# @PRE: env and dataset_id params exist.
# @PRE: env and dataset_id params exist in params.
# @POST: Returns dataset JSON structure.
# @PARAM: params (Dict) - Plugin parameters.
# @RETURN: Dict - Dataset structure.
async def _get_dataset_structure(self, params: Dict[str, Any]) -> Dict[str, Any]:
env_name = params.get("env")
with belief_scope("_get_dataset_structure"):
env_name = params.get("env")
dataset_id = params.get("dataset_id")
if not env_name or dataset_id is None:
@@ -143,7 +181,7 @@ class DebugPlugin(PluginBase):
dataset_response = client.get_dataset(dataset_id)
return dataset_response.get('result') or {}
# [/DEF:DebugPlugin._get_dataset_structure:Function]
# [/DEF:_get_dataset_structure:Function]
# [/DEF:DebugPlugin:Class]
# [/DEF:DebugPluginModule:Module]

View File

@@ -24,25 +24,57 @@ class MapperPlugin(PluginBase):
"""
@property
# [DEF:id:Function]
# @PURPOSE: Returns the unique identifier for the mapper plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string ID.
# @RETURN: str - "dataset-mapper"
def id(self) -> str:
return "dataset-mapper"
with belief_scope("id"):
return "dataset-mapper"
# [/DEF:id:Function]
@property
# [DEF:name:Function]
# @PURPOSE: Returns the human-readable name of the mapper plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string name.
# @RETURN: str - Plugin name.
def name(self) -> str:
return "Dataset Mapper"
with belief_scope("name"):
return "Dataset Mapper"
# [/DEF:name:Function]
@property
# [DEF:description:Function]
# @PURPOSE: Returns a description of the mapper plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string description.
# @RETURN: str - Plugin description.
def description(self) -> str:
return "Map dataset column verbose names using PostgreSQL comments or Excel files."
with belief_scope("description"):
return "Map dataset column verbose names using PostgreSQL comments or Excel files."
# [/DEF:description:Function]
@property
# [DEF:version:Function]
# @PURPOSE: Returns the version of the mapper plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string version.
# @RETURN: str - "1.0.0"
def version(self) -> str:
return "1.0.0"
with belief_scope("version"):
return "1.0.0"
# [/DEF:version:Function]
# [DEF:MapperPlugin.get_schema:Function]
# [DEF:get_schema:Function]
# @PURPOSE: Returns the JSON schema for the mapper plugin parameters.
# @PRE: Plugin instance exists.
# @POST: Returns dictionary schema.
# @RETURN: Dict[str, Any] - JSON schema.
def get_schema(self) -> Dict[str, Any]:
return {
with belief_scope("get_schema"):
return {
"type": "object",
"properties": {
"env": {
@@ -85,14 +117,16 @@ class MapperPlugin(PluginBase):
},
"required": ["env", "dataset_id", "source"]
}
# [/DEF:MapperPlugin.get_schema:Function]
# [/DEF:get_schema:Function]
# [DEF:MapperPlugin.execute:Function]
# [DEF:execute:Function]
# @PURPOSE: Executes the dataset mapping logic.
# @PRE: Params contain valid 'env', 'dataset_id', and 'source'.
# @PARAM: params (Dict[str, Any]) - Mapping parameters.
# @PRE: Params contain valid 'env', 'dataset_id', and 'source'. params must be a dictionary.
# @POST: Updates the dataset in Superset.
# @RETURN: Dict[str, Any] - Execution status.
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
with belief_scope("MapperPlugin.execute", f"params={params}"):
with belief_scope("execute"):
env_name = params.get("env")
dataset_id = params.get("dataset_id")
source = params.get("source")
@@ -158,7 +192,7 @@ class MapperPlugin(PluginBase):
except Exception as e:
logger.error(f"[MapperPlugin.execute][Failure] Mapping failed: {e}")
raise
# [/DEF:MapperPlugin.execute:Function]
# [/DEF:execute:Function]
# [/DEF:MapperPlugin:Class]
# [/DEF:MapperPluginModule:Module]

View File

@@ -12,6 +12,7 @@ import zipfile
import re
from ..core.plugin_base import PluginBase
from ..core.logger import belief_scope
from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import setup_clients
from superset_tool.utils.fileio import create_temp_file, update_yamls, create_dashboard_export
@@ -29,23 +30,57 @@ class MigrationPlugin(PluginBase):
"""
@property
# [DEF:id:Function]
# @PURPOSE: Returns the unique identifier for the migration plugin.
# @PRE: None.
# @POST: Returns "superset-migration".
# @RETURN: str - "superset-migration"
def id(self) -> str:
return "superset-migration"
with belief_scope("id"):
return "superset-migration"
# [/DEF:id:Function]
@property
# [DEF:name:Function]
# @PURPOSE: Returns the human-readable name of the migration plugin.
# @PRE: None.
# @POST: Returns the plugin name.
# @RETURN: str - Plugin name.
def name(self) -> str:
return "Superset Dashboard Migration"
with belief_scope("name"):
return "Superset Dashboard Migration"
# [/DEF:name:Function]
@property
# [DEF:description:Function]
# @PURPOSE: Returns a description of the migration plugin.
# @PRE: None.
# @POST: Returns the plugin description.
# @RETURN: str - Plugin description.
def description(self) -> str:
return "Migrates dashboards between Superset environments."
with belief_scope("description"):
return "Migrates dashboards between Superset environments."
# [/DEF:description:Function]
@property
# [DEF:version:Function]
# @PURPOSE: Returns the version of the migration plugin.
# @PRE: None.
# @POST: Returns "1.0.0".
# @RETURN: str - "1.0.0"
def version(self) -> str:
return "1.0.0"
with belief_scope("version"):
return "1.0.0"
# [/DEF:version:Function]
# [DEF:get_schema:Function]
# @PURPOSE: Returns the JSON schema for migration plugin parameters.
# @PRE: Config manager is available.
# @POST: Returns a valid JSON schema dictionary.
# @RETURN: Dict[str, Any] - JSON schema.
def get_schema(self) -> Dict[str, Any]:
config_manager = get_config_manager()
with belief_scope("get_schema"):
config_manager = get_config_manager()
envs = [e.name for e in config_manager.get_environments()]
return {
@@ -87,11 +122,18 @@ class MigrationPlugin(PluginBase):
},
"required": ["from_env", "to_env", "dashboard_regex"],
}
# [/DEF:get_schema:Function]
# [DEF:execute:Function]
# @PURPOSE: Executes the dashboard migration logic.
# @PARAM: params (Dict[str, Any]) - Migration parameters.
# @PRE: Source and target environments must be configured.
# @POST: Selected dashboards are migrated.
async def execute(self, params: Dict[str, Any]):
source_env_id = params.get("source_env_id")
target_env_id = params.get("target_env_id")
selected_ids = params.get("selected_ids")
with belief_scope("MigrationPlugin.execute"):
source_env_id = params.get("source_env_id")
target_env_id = params.get("target_env_id")
selected_ids = params.get("selected_ids")
# Legacy support or alternative params
from_env_name = params.get("from_env")
@@ -109,29 +151,77 @@ class MigrationPlugin(PluginBase):
tm = get_task_manager()
class TaskLoggerProxy(SupersetLogger):
# [DEF:__init__:Function]
# @PURPOSE: Initializes the proxy logger.
# @PRE: None.
# @POST: Instance is initialized.
def __init__(self):
# Initialize parent with dummy values since we override methods
super().__init__(console=False)
with belief_scope("__init__"):
# Initialize parent with dummy values since we override methods
super().__init__(console=False)
# [/DEF:__init__:Function]
# [DEF:debug:Function]
# @PURPOSE: Logs a debug message to the task manager.
# @PRE: msg is a string.
# @POST: Log is added to task manager if task_id exists.
def debug(self, msg, *args, extra=None, **kwargs):
if task_id: tm._add_log(task_id, "DEBUG", msg, extra or {})
with belief_scope("debug"):
if task_id: tm._add_log(task_id, "DEBUG", msg, extra or {})
# [/DEF:debug:Function]
# [DEF:info:Function]
# @PURPOSE: Logs an info message to the task manager.
# @PRE: msg is a string.
# @POST: Log is added to task manager if task_id exists.
def info(self, msg, *args, extra=None, **kwargs):
if task_id: tm._add_log(task_id, "INFO", msg, extra or {})
with belief_scope("info"):
if task_id: tm._add_log(task_id, "INFO", msg, extra or {})
# [/DEF:info:Function]
# [DEF:warning:Function]
# @PURPOSE: Logs a warning message to the task manager.
# @PRE: msg is a string.
# @POST: Log is added to task manager if task_id exists.
def warning(self, msg, *args, extra=None, **kwargs):
if task_id: tm._add_log(task_id, "WARNING", msg, extra or {})
with belief_scope("warning"):
if task_id: tm._add_log(task_id, "WARNING", msg, extra or {})
# [/DEF:warning:Function]
# [DEF:error:Function]
# @PURPOSE: Logs an error message to the task manager.
# @PRE: msg is a string.
# @POST: Log is added to task manager if task_id exists.
def error(self, msg, *args, extra=None, **kwargs):
if task_id: tm._add_log(task_id, "ERROR", msg, extra or {})
with belief_scope("error"):
if task_id: tm._add_log(task_id, "ERROR", msg, extra or {})
# [/DEF:error:Function]
# [DEF:critical:Function]
# @PURPOSE: Logs a critical message to the task manager.
# @PRE: msg is a string.
# @POST: Log is added to task manager if task_id exists.
def critical(self, msg, *args, extra=None, **kwargs):
if task_id: tm._add_log(task_id, "ERROR", msg, extra or {})
with belief_scope("critical"):
if task_id: tm._add_log(task_id, "ERROR", msg, extra or {})
# [/DEF:critical:Function]
# [DEF:exception:Function]
# @PURPOSE: Logs an exception message to the task manager.
# @PRE: msg is a string.
# @POST: Log is added to task manager if task_id exists.
def exception(self, msg, *args, **kwargs):
if task_id: tm._add_log(task_id, "ERROR", msg, {"exception": True})
with belief_scope("exception"):
if task_id: tm._add_log(task_id, "ERROR", msg, {"exception": True})
# [/DEF:exception:Function]
logger = TaskLoggerProxy()
logger.info(f"[MigrationPlugin][Entry] Starting migration task.")
logger.info(f"[MigrationPlugin][Action] Params: {params}")
try:
config_manager = get_config_manager()
with belief_scope("execute"):
config_manager = get_config_manager()
environments = config_manager.get_environments()
# Resolve environments
@@ -289,12 +379,12 @@ class MigrationPlugin(PluginBase):
continue
logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True)
# [/DEF:MigrationPlugin.execute:Action]
logger.info("[MigrationPlugin][Exit] Migration finished.")
logger.info("[MigrationPlugin][Exit] Migration finished.")
except Exception as e:
logger.critical(f"[MigrationPlugin][Failure] Fatal error during migration: {e}", exc_info=True)
raise e
# [/DEF:MigrationPlugin.execute:Action]
# [/DEF:execute:Function]
# [/DEF:MigrationPlugin:Class]
# [/DEF:MigrationPlugin:Module]

View File

@@ -21,25 +21,57 @@ class SearchPlugin(PluginBase):
"""
@property
# [DEF:id:Function]
# @PURPOSE: Returns the unique identifier for the search plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string ID.
# @RETURN: str - "search-datasets"
def id(self) -> str:
return "search-datasets"
with belief_scope("id"):
return "search-datasets"
# [/DEF:id:Function]
@property
# [DEF:name:Function]
# @PURPOSE: Returns the human-readable name of the search plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string name.
# @RETURN: str - Plugin name.
def name(self) -> str:
return "Search Datasets"
with belief_scope("name"):
return "Search Datasets"
# [/DEF:name:Function]
@property
# [DEF:description:Function]
# @PURPOSE: Returns a description of the search plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string description.
# @RETURN: str - Plugin description.
def description(self) -> str:
return "Search for text patterns across all datasets in a specific environment."
with belief_scope("description"):
return "Search for text patterns across all datasets in a specific environment."
# [/DEF:description:Function]
@property
# [DEF:version:Function]
# @PURPOSE: Returns the version of the search plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string version.
# @RETURN: str - "1.0.0"
def version(self) -> str:
return "1.0.0"
with belief_scope("version"):
return "1.0.0"
# [/DEF:version:Function]
# [DEF:SearchPlugin.get_schema:Function]
# [DEF:get_schema:Function]
# @PURPOSE: Returns the JSON schema for the search plugin parameters.
# @PRE: Plugin instance exists.
# @POST: Returns dictionary schema.
# @RETURN: Dict[str, Any] - JSON schema.
def get_schema(self) -> Dict[str, Any]:
return {
with belief_scope("get_schema"):
return {
"type": "object",
"properties": {
"env": {
@@ -55,12 +87,14 @@ class SearchPlugin(PluginBase):
},
"required": ["env", "query"]
}
# [/DEF:SearchPlugin.get_schema:Function]
# [/DEF:get_schema:Function]
# [DEF:SearchPlugin.execute:Function]
# [DEF:execute:Function]
# @PURPOSE: Executes the dataset search logic.
# @PARAM: params (Dict[str, Any]) - Search parameters.
# @PRE: Params contain valid 'env' and 'query'.
# @POST: Returns a dictionary with count and results list.
# @RETURN: Dict[str, Any] - Search results.
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
with belief_scope("SearchPlugin.execute", f"params={params}"):
env_name = params.get("env")
@@ -124,38 +158,45 @@ class SearchPlugin(PluginBase):
except Exception as e:
logger.error(f"[SearchPlugin.execute][Failure] Error during search: {e}")
raise
# [/DEF:SearchPlugin.execute:Function]
# [/DEF:execute:Function]
# [DEF:SearchPlugin._get_context:Function]
# [DEF:_get_context:Function]
# @PURPOSE: Extracts a small context around the match for display.
# @PARAM: text (str) - The full text to extract context from.
# @PARAM: match_text (str) - The matched text pattern.
# @PARAM: context_lines (int) - Number of lines of context to include.
# @PRE: text and match_text must be strings.
# @POST: Returns context string.
# @RETURN: str - Extracted context.
def _get_context(self, text: str, match_text: str, context_lines: int = 1) -> str:
"""
Extracts a small context around the match for display.
"""
if not match_text:
return text[:100] + "..." if len(text) > 100 else text
with belief_scope("_get_context"):
if not match_text:
return text[:100] + "..." if len(text) > 100 else text
lines = text.splitlines()
lines = text.splitlines()
match_line_index = -1
for i, line in enumerate(lines):
if match_text in line:
match_line_index = i
break
if match_line_index != -1:
start = max(0, match_line_index - context_lines)
end = min(len(lines), match_line_index + context_lines + 1)
context = []
for i in range(start, end):
line_content = lines[i]
if i == match_line_index:
context.append(f"==> {line_content}")
else:
context.append(f" {line_content}")
return "\n".join(context)
return text[:100] + "..." if len(text) > 100 else text
# [/DEF:SearchPlugin._get_context:Function]
if match_line_index != -1:
start = max(0, match_line_index - context_lines)
end = min(len(lines), match_line_index + context_lines + 1)
context = []
for i in range(start, end):
line_content = lines[i]
if i == match_line_index:
context.append(f"==> {line_content}")
else:
context.append(f" {line_content}")
return "\n".join(context)
return text[:100] + "..." if len(text) > 100 else text
# [/DEF:_get_context:Function]
# [/DEF:SearchPlugin:Class]
# [/DEF:SearchPluginModule:Module]