refactor complete

This commit is contained in:
2026-01-22 17:37:17 +03:00
parent 203ce446f4
commit d99a13d91f
41 changed files with 1789 additions and 3052 deletions

View File

@@ -42,4 +42,5 @@ urllib3==2.6.2
uvicorn==0.38.0
websockets==15.0.1
pandas
psycopg2-binary
psycopg2-binary
openpyxl

View File

@@ -11,12 +11,11 @@
# [SECTION: IMPORTS]
from fastapi import APIRouter, Depends, HTTPException
from typing import List, Dict, Optional
from backend.src.dependencies import get_config_manager, get_scheduler_service
from backend.src.core.superset_client import SupersetClient
from superset_tool.models import SupersetConfig
from ...dependencies import get_config_manager, get_scheduler_service
from ...core.superset_client import SupersetClient
from pydantic import BaseModel, Field
from backend.src.core.config_models import Environment as EnvModel
from backend.src.core.logger import belief_scope
from ...core.config_models import Environment as EnvModel
from ...core.logger import belief_scope
# [/SECTION]
router = APIRouter()
@@ -114,18 +113,7 @@ async def get_environment_databases(id: str, config_manager=Depends(get_config_m
try:
# Initialize SupersetClient from environment config
# Note: We need to map Environment model to SupersetConfig
superset_config = SupersetConfig(
env=env.name,
base_url=env.url,
auth={
"provider": "db", # Defaulting to db provider
"username": env.username,
"password": env.password,
"refresh": "false"
}
)
client = SupersetClient(superset_config)
client = SupersetClient(env)
return client.get_databases_summary()
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch databases: {str(e)}")
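The endpoint now builds the client straight from the stored Environment and forwards the result of get_databases_summary(). A rough sketch of the response shape, assuming a single Postgres database is registered (values are illustrative):
# Illustrative response: get_databases_summary() renames Superset's 'backend' field to 'engine'.
[
    {"uuid": "6f2a0c3e-...", "database_name": "analytics_dwh", "engine": "postgresql"}
]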

View File

@@ -13,9 +13,9 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List, Optional
from backend.src.dependencies import get_config_manager
from backend.src.core.database import get_db
from backend.src.models.mapping import DatabaseMapping
from ...dependencies import get_config_manager
from ...core.database import get_db
from ...models.mapping import DatabaseMapping
from pydantic import BaseModel
# [/SECTION]

View File

@@ -7,10 +7,9 @@
from fastapi import APIRouter, Depends, HTTPException
from typing import List, Dict
from backend.src.dependencies import get_config_manager, get_task_manager
from backend.src.models.dashboard import DashboardMetadata, DashboardSelection
from backend.src.core.superset_client import SupersetClient
from superset_tool.models import SupersetConfig
from ...dependencies import get_config_manager, get_task_manager
from ...models.dashboard import DashboardMetadata, DashboardSelection
from ...core.superset_client import SupersetClient
router = APIRouter(prefix="/api", tags=["migration"])
@@ -27,14 +26,7 @@ async def get_dashboards(env_id: str, config_manager=Depends(get_config_manager)
if not env:
raise HTTPException(status_code=404, detail="Environment not found")
config = SupersetConfig(
env=env.name,
base_url=env.url,
auth={'provider': 'db', 'username': env.username, 'password': env.password, 'refresh': False},
verify_ssl=True,
timeout=30
)
client = SupersetClient(config)
client = SupersetClient(env)
dashboards = client.get_dashboards_summary()
return dashboards
# [/DEF:get_dashboards:Function]

View File

@@ -17,7 +17,6 @@ from ...dependencies import get_config_manager
from ...core.config_manager import ConfigManager
from ...core.logger import logger, belief_scope
from ...core.superset_client import SupersetClient
from superset_tool.models import SupersetConfig
import os
# [/SECTION]
@@ -28,7 +27,7 @@ router = APIRouter()
# @PRE: Config manager is available.
# @POST: Returns masked AppConfig.
# @RETURN: AppConfig - The current configuration.
@router.get("/", response_model=AppConfig)
@router.get("", response_model=AppConfig)
async def get_settings(config_manager: ConfigManager = Depends(get_config_manager)):
with belief_scope("get_settings"):
logger.info("[get_settings][Entry] Fetching all settings")
@@ -85,17 +84,7 @@ async def add_environment(
# Validate connection before adding
try:
superset_config = SupersetConfig(
env=env.name,
base_url=env.url,
auth={
"provider": "db",
"username": env.username,
"password": env.password,
"refresh": "true"
}
)
client = SupersetClient(config=superset_config)
client = SupersetClient(env)
client.get_dashboards(query={"page_size": 1})
except Exception as e:
logger.error(f"[add_environment][Coherence:Failed] Connection validation failed: {e}")
@@ -130,17 +119,7 @@ async def update_environment(
# Validate connection before updating
try:
superset_config = SupersetConfig(
env=env_to_validate.name,
base_url=env_to_validate.url,
auth={
"provider": "db",
"username": env_to_validate.username,
"password": env_to_validate.password,
"refresh": "true"
}
)
client = SupersetClient(config=superset_config)
client = SupersetClient(env_to_validate)
client.get_dashboards(query={"page_size": 1})
except Exception as e:
logger.error(f"[update_environment][Coherence:Failed] Connection validation failed: {e}")
@@ -187,21 +166,8 @@ async def test_environment_connection(
raise HTTPException(status_code=404, detail=f"Environment {id} not found")
try:
# Create SupersetConfig
# Note: SupersetConfig expects 'auth' dict with specific keys
superset_config = SupersetConfig(
env=env.name,
base_url=env.url,
auth={
"provider": "db", # Defaulting to db for now
"username": env.username,
"password": env.password,
"refresh": "true"
}
)
# Initialize client (this will trigger authentication)
client = SupersetClient(config=superset_config)
client = SupersetClient(env)
# Try a simple request to verify
client.get_dashboards(query={"page_size": 1})

View File

@@ -6,10 +6,8 @@
import sys
from pathlib import Path
# Add project root to sys.path to allow importing superset_tool
# Assuming app.py is in backend/src/
# project_root is used for static files mounting
project_root = Path(__file__).resolve().parent.parent.parent
sys.path.append(str(project_root))
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
@@ -86,7 +84,7 @@ async def log_requests(request: Request, call_next):
app.include_router(plugins.router, prefix="/api/plugins", tags=["Plugins"])
app.include_router(tasks.router, prefix="/api/tasks", tags=["Tasks"])
app.include_router(settings.router, prefix="/api/settings", tags=["Settings"])
app.include_router(connections.router, prefix="/api/connections", tags=["Connections"])
app.include_router(connections.router, prefix="/api/settings/connections", tags=["Connections"])
app.include_router(environments.router, prefix="/api/environments", tags=["Environments"])
app.include_router(mappings.router)
app.include_router(migration.router)

View File

@@ -23,6 +23,8 @@ class Environment(BaseModel):
url: str
username: str
password: str # Will be masked in UI
verify_ssl: bool = True
timeout: int = 30
is_default: bool = False
backup_schedule: Schedule = Field(default_factory=Schedule)
# [/DEF:Environment:DataClass]
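A minimal sketch of constructing the extended model with the new verify_ssl and timeout fields (values are illustrative; any other required fields of Environment, such as its name, are assumed to be filled in the same way):
from backend.src.core.config_models import Environment

env = Environment(
    name="staging",
    url="https://superset.staging.example.com",
    username="admin",
    password="secret",
    verify_ssl=False,  # new field: allow self-signed certificates
    timeout=60,        # new field: per-request timeout in seconds (defaults to 30)
)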

View File

@@ -29,11 +29,10 @@ class BeliefFormatter(logging.Formatter):
# @PARAM: record (logging.LogRecord) - The log record to format.
# @RETURN: str - The formatted log message.
def format(self, record):
msg = super().format(record)
anchor_id = getattr(_belief_state, 'anchor_id', None)
if anchor_id:
msg = f"[{anchor_id}][Action] {msg}"
return msg
record.msg = f"[{anchor_id}][Action] {record.msg}"
return super().format(record)
# [/DEF:format:Function]
# [/DEF:BeliefFormatter:Class]
@@ -193,6 +192,18 @@ class WebSocketLogHandler(logging.Handler):
# @SEMANTICS: logger, global, instance
# @PURPOSE: The global logger instance for the application, configured with both a console handler and the custom WebSocket handler.
logger = logging.getLogger("superset_tools_app")
# [DEF:believed:Function]
# @PURPOSE: A decorator that wraps a function in a belief scope.
# @PARAM: anchor_id (str) - The identifier for the semantic block.
def believed(anchor_id: str):
def decorator(func):
def wrapper(*args, **kwargs):
with belief_scope(anchor_id):
return func(*args, **kwargs)
return wrapper
return decorator
# [/DEF:believed:Function]
logger.setLevel(logging.INFO)
# Create a formatter
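A quick usage sketch of the new believed decorator (the function name is hypothetical). Any log call made inside the wrapped function is prefixed with the anchor by BeliefFormatter:
@believed("sync_dashboards")
def sync_dashboards():
    logger.info("starting sync")  # emitted as "[sync_dashboards][Action] starting sync"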

View File

@@ -1,82 +1,102 @@
# [DEF:backend.src.core.superset_client:Module]
#
# @SEMANTICS: superset, api, client, database, metadata
# @PURPOSE: Extends the base SupersetClient with database-specific metadata fetching.
# @SEMANTICS: superset, api, client, rest, http, dashboard, dataset, import, export
# @PURPOSE: Provides a high-level client for interacting with the Superset REST API, encapsulating request logic, error handling, and pagination.
# @LAYER: Core
# @RELATION: INHERITS_FROM -> superset_tool.client.SupersetClient
# @RELATION: USES -> backend.src.core.utils.network.APIClient
# @RELATION: USES -> backend.src.core.config_models.Environment
#
# @INVARIANT: All database metadata requests must include UUID and name.
# @INVARIANT: All network operations must use the internal APIClient instance.
# @PUBLIC_API: SupersetClient
# [SECTION: IMPORTS]
from typing import List, Dict, Optional, Tuple
from .logger import belief_scope
from superset_tool.client import SupersetClient as BaseSupersetClient
from superset_tool.models import SupersetConfig
import json
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from requests import Response
from .logger import logger as app_logger, belief_scope
from .utils.network import APIClient, SupersetAPIError, AuthenticationError, DashboardNotFoundError, NetworkError
from .utils.fileio import get_filename_from_headers
from .config_models import Environment
# [/SECTION]
# [DEF:SupersetClient:Class]
# @PURPOSE: Extended SupersetClient for migration-specific operations.
class SupersetClient(BaseSupersetClient):
# @PURPOSE: Wrapper class over the Superset REST API that provides methods for working with dashboards and datasets.
class SupersetClient:
# [DEF:__init__:Function]
# @PURPOSE: Initializes the client, validates the configuration, and creates the network client.
# @PRE: `env` must be a valid Environment object.
# @POST: The `env` and `network` attributes are created and ready for use.
# @PARAM: env (Environment) - The environment configuration.
def __init__(self, env: Environment):
with belief_scope("__init__"):
app_logger.info("[SupersetClient.__init__][Enter] Initializing SupersetClient for env %s.", env.name)
self.env = env
# Construct auth payload expected by Superset API
auth_payload = {
"username": env.username,
"password": env.password,
"provider": "db",
"refresh": "true"
}
self.network = APIClient(
config={
"base_url": env.url,
"auth": auth_payload
},
verify_ssl=env.verify_ssl,
timeout=env.timeout
)
self.delete_before_reimport: bool = False
app_logger.info("[SupersetClient.__init__][Exit] SupersetClient initialized.")
# [/DEF:__init__:Function]
# [DEF:authenticate:Function]
# @PURPOSE: Authenticates the client using the configured credentials.
# @PRE: self.network must be initialized with valid auth configuration.
# @POST: Client is authenticated and tokens are stored.
# @RETURN: Dict[str, str] - Authentication tokens.
def authenticate(self):
def authenticate(self) -> Dict[str, str]:
with belief_scope("SupersetClient.authenticate"):
return self.network.authenticate()
# [DEF:get_databases_summary:Function]
# @PURPOSE: Fetch a summary of databases including uuid, name, and engine.
# @PRE: self.network must be initialized and authenticated.
# @POST: Returns a list of database dictionaries with 'engine' field.
# @RETURN: List[Dict] - Summary of databases.
def get_databases_summary(self) -> List[Dict]:
with belief_scope("SupersetClient.get_databases_summary"):
"""
Fetch a summary of databases including uuid, name, and engine.
"""
query = {
"columns": ["uuid", "database_name", "backend"]
}
_, databases = self.get_databases(query=query)
# Map 'backend' to 'engine' for consistency with contracts
for db in databases:
db['engine'] = db.pop('backend', None)
return databases
# [/DEF:get_databases_summary:Function]
# [/DEF:authenticate:Function]
# [DEF:get_database_by_uuid:Function]
# @PURPOSE: Find a database by its UUID.
# @PRE: db_uuid must be a string.
# @POST: Returns database metadata if found.
# @PARAM: db_uuid (str) - The UUID of the database.
# @RETURN: Optional[Dict] - Database info if found, else None.
def get_database_by_uuid(self, db_uuid: str) -> Optional[Dict]:
with belief_scope("SupersetClient.get_database_by_uuid", f"uuid={db_uuid}"):
"""
Find a database by its UUID.
"""
query = {
"filters": [{"col": "uuid", "op": "eq", "value": db_uuid}]
}
_, databases = self.get_databases(query=query)
return databases[0] if databases else None
# [/DEF:get_database_by_uuid:Function]
@property
# [DEF:headers:Function]
# @PURPOSE: Returns the base HTTP headers used by the network client.
def headers(self) -> dict:
with belief_scope("headers"):
return self.network.headers
# [/DEF:headers:Function]
# [SECTION: DASHBOARD OPERATIONS]
# [DEF:get_dashboards:Function]
# @PURPOSE: Fetches the full list of dashboards, handling pagination automatically.
# @PARAM: query (Optional[Dict]) - Additional query parameters for the API.
# @RETURN: Tuple[int, List[Dict]] - A tuple of (total count, list of dashboards).
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_dashboards"):
app_logger.info("[get_dashboards][Enter] Fetching dashboards.")
validated_query = self._validate_query_params(query or {})
if 'columns' not in validated_query:
validated_query['columns'] = ["slug", "id", "changed_on_utc", "dashboard_title", "published"]
total_count = self._fetch_total_object_count(endpoint="/dashboard/")
paginated_data = self._fetch_all_pages(
endpoint="/dashboard/",
pagination_options={"base_query": validated_query, "total_count": total_count, "results_field": "result"},
)
app_logger.info("[get_dashboards][Exit] Found %d dashboards.", total_count)
return total_count, paginated_data
# [/DEF:get_dashboards:Function]
# [DEF:get_dashboards_summary:Function]
# @PURPOSE: Fetches dashboard metadata optimized for the grid.
# @PRE: self.network must be authenticated.
# @POST: Returns a list of dashboard dictionaries mapped to the grid schema.
# @RETURN: List[Dict]
def get_dashboards_summary(self) -> List[Dict]:
with belief_scope("SupersetClient.get_dashboards_summary"):
"""
Fetches dashboard metadata optimized for the grid.
Returns a list of dictionaries mapped to DashboardMetadata fields.
"""
query = {
"columns": ["id", "dashboard_title", "changed_on_utc", "published"]
}
@@ -94,34 +114,287 @@ class SupersetClient(BaseSupersetClient):
return result
# [/DEF:get_dashboards_summary:Function]
# [DEF:export_dashboard:Function]
# @PURPOSE: Exports a dashboard as a ZIP archive.
# @PARAM: dashboard_id (int) - The ID of the dashboard to export.
# @RETURN: Tuple[bytes, str] - The binary ZIP content and the file name.
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
with belief_scope("export_dashboard"):
app_logger.info("[export_dashboard][Enter] Exporting dashboard %s.", dashboard_id)
response = self.network.request(
method="GET",
endpoint="/dashboard/export/",
params={"q": json.dumps([dashboard_id])},
stream=True,
raw_response=True,
)
response = cast(Response, response)
self._validate_export_response(response, dashboard_id)
filename = self._resolve_export_filename(response, dashboard_id)
app_logger.info("[export_dashboard][Exit] Exported dashboard %s to %s.", dashboard_id, filename)
return response.content, filename
# [/DEF:export_dashboard:Function]
# [DEF:import_dashboard:Function]
# @PURPOSE: Imports a dashboard from a ZIP file.
# @PARAM: file_name (Union[str, Path]) - Path to the ZIP archive.
# @PARAM: dash_id (Optional[int]) - Dashboard ID to delete if the first import attempt fails.
# @PARAM: dash_slug (Optional[str]) - Dashboard slug used to resolve the ID.
# @RETURN: Dict - The API response on success.
def import_dashboard(self, file_name: Union[str, Path], dash_id: Optional[int] = None, dash_slug: Optional[str] = None) -> Dict:
with belief_scope("import_dashboard"):
file_path = str(file_name)
self._validate_import_file(file_path)
try:
return self._do_import(file_path)
except Exception as exc:
app_logger.error("[import_dashboard][Failure] First import attempt failed: %s", exc, exc_info=True)
if not self.delete_before_reimport:
raise
target_id = self._resolve_target_id_for_delete(dash_id, dash_slug)
if target_id is None:
app_logger.error("[import_dashboard][Failure] No ID available for delete-retry.")
raise
self.delete_dashboard(target_id)
app_logger.info("[import_dashboard][State] Deleted dashboard ID %s, retrying import.", target_id)
return self._do_import(file_path)
# [/DEF:import_dashboard:Function]
# [DEF:delete_dashboard:Function]
# @PURPOSE: Deletes a dashboard by its ID or slug.
# @PARAM: dashboard_id (Union[int, str]) - The dashboard ID or slug.
def delete_dashboard(self, dashboard_id: Union[int, str]) -> None:
with belief_scope("delete_dashboard"):
app_logger.info("[delete_dashboard][Enter] Deleting dashboard %s.", dashboard_id)
response = self.network.request(method="DELETE", endpoint=f"/dashboard/{dashboard_id}")
response = cast(Dict, response)
if response.get("result", True) is not False:
app_logger.info("[delete_dashboard][Success] Dashboard %s deleted.", dashboard_id)
else:
app_logger.warning("[delete_dashboard][Warning] Unexpected response while deleting %s: %s", dashboard_id, response)
# [/DEF:delete_dashboard:Function]
# [/SECTION]
# [SECTION: DATASET OPERATIONS]
# [DEF:get_datasets:Function]
# @PURPOSE: Fetches the full list of datasets, handling pagination automatically.
# @PARAM: query (Optional[Dict]) - Additional query parameters.
# @RETURN: Tuple[int, List[Dict]] - A tuple of (total count, list of datasets).
def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_datasets"):
app_logger.info("[get_datasets][Enter] Fetching datasets.")
validated_query = self._validate_query_params(query)
total_count = self._fetch_total_object_count(endpoint="/dataset/")
paginated_data = self._fetch_all_pages(
endpoint="/dataset/",
pagination_options={"base_query": validated_query, "total_count": total_count, "results_field": "result"},
)
app_logger.info("[get_datasets][Exit] Found %d datasets.", total_count)
return total_count, paginated_data
# [/DEF:get_datasets:Function]
# [DEF:get_dataset:Function]
# @PURPOSE: Fetch full dataset structure including columns and metrics.
# @PRE: dataset_id must be a valid integer.
# @POST: Returns full dataset metadata from Superset API.
# @PARAM: dataset_id (int) - The ID of the dataset.
# @RETURN: Dict - The dataset metadata.
# @PURPOSE: Fetches a specific dataset by its ID.
# @PARAM: dataset_id (int) - The dataset ID.
# @RETURN: Dict - The dataset metadata.
def get_dataset(self, dataset_id: int) -> Dict:
with belief_scope("SupersetClient.get_dataset", f"id={dataset_id}"):
"""
Fetch full dataset structure.
"""
return self.network.get(f"/api/v1/dataset/{dataset_id}").json()
app_logger.info("[get_dataset][Enter] Fetching dataset %s.", dataset_id)
response = self.network.request(method="GET", endpoint=f"/dataset/{dataset_id}")
response = cast(Dict, response)
app_logger.info("[get_dataset][Exit] Got dataset %s.", dataset_id)
return response
# [/DEF:get_dataset:Function]
# [DEF:update_dataset:Function]
# @PURPOSE: Update dataset metadata.
# @PRE: dataset_id must be valid, data must be a valid Superset dataset payload.
# @POST: Dataset is updated in Superset.
# @PARAM: dataset_id (int) - The ID of the dataset.
# @PARAM: data (Dict) - The payload for update.
def update_dataset(self, dataset_id: int, data: Dict):
# @PURPOSE: Updates a dataset by its ID.
# @PARAM: dataset_id (int) - The dataset ID.
# @PARAM: data (Dict) - The update payload.
# @RETURN: Dict - The API response.
def update_dataset(self, dataset_id: int, data: Dict) -> Dict:
with belief_scope("SupersetClient.update_dataset", f"id={dataset_id}"):
"""
Update dataset metadata.
"""
self.network.put(f"/api/v1/dataset/{dataset_id}", json=data)
app_logger.info("[update_dataset][Enter] Updating dataset %s.", dataset_id)
response = self.network.request(
method="PUT",
endpoint=f"/dataset/{dataset_id}",
data=json.dumps(data),
headers={'Content-Type': 'application/json'}
)
response = cast(Dict, response)
app_logger.info("[update_dataset][Exit] Updated dataset %s.", dataset_id)
return response
# [/DEF:update_dataset:Function]
# [/SECTION]
# [SECTION: DATABASE OPERATIONS]
# [DEF:get_databases:Function]
# @PURPOSE: Fetches the full list of databases.
# @PARAM: query (Optional[Dict]) - Additional query parameters.
# @RETURN: Tuple[int, List[Dict]] - A tuple of (total count, list of databases).
def get_databases(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_databases"):
app_logger.info("[get_databases][Enter] Fetching databases.")
validated_query = self._validate_query_params(query or {})
if 'columns' not in validated_query:
validated_query['columns'] = []
total_count = self._fetch_total_object_count(endpoint="/database/")
paginated_data = self._fetch_all_pages(
endpoint="/database/",
pagination_options={"base_query": validated_query, "total_count": total_count, "results_field": "result"},
)
app_logger.info("[get_databases][Exit] Found %d databases.", total_count)
return total_count, paginated_data
# [/DEF:get_databases:Function]
# [DEF:get_database:Function]
# @PURPOSE: Fetches a specific database by its ID.
# @PARAM: database_id (int) - The database ID.
# @RETURN: Dict - The database metadata.
def get_database(self, database_id: int) -> Dict:
with belief_scope("get_database"):
app_logger.info("[get_database][Enter] Fetching database %s.", database_id)
response = self.network.request(method="GET", endpoint=f"/database/{database_id}")
response = cast(Dict, response)
app_logger.info("[get_database][Exit] Got database %s.", database_id)
return response
# [/DEF:get_database:Function]
# [DEF:get_databases_summary:Function]
# @PURPOSE: Fetch a summary of databases including uuid, name, and engine.
# @RETURN: List[Dict] - Summary of databases.
def get_databases_summary(self) -> List[Dict]:
with belief_scope("SupersetClient.get_databases_summary"):
query = {
"columns": ["uuid", "database_name", "backend"]
}
_, databases = self.get_databases(query=query)
# Map 'backend' to 'engine' for consistency with contracts
for db in databases:
db['engine'] = db.pop('backend', None)
return databases
# [/DEF:get_databases_summary:Function]
# [DEF:get_database_by_uuid:Function]
# @PURPOSE: Find a database by its UUID.
# @PARAM: db_uuid (str) - The UUID of the database.
# @RETURN: Optional[Dict] - Database info if found, else None.
def get_database_by_uuid(self, db_uuid: str) -> Optional[Dict]:
with belief_scope("SupersetClient.get_database_by_uuid", f"uuid={db_uuid}"):
query = {
"filters": [{"col": "uuid", "op": "eq", "value": db_uuid}]
}
_, databases = self.get_databases(query=query)
return databases[0] if databases else None
# [/DEF:get_database_by_uuid:Function]
# [/SECTION]
# [SECTION: HELPERS]
# [DEF:_resolve_target_id_for_delete:Function]
def _resolve_target_id_for_delete(self, dash_id: Optional[int], dash_slug: Optional[str]) -> Optional[int]:
with belief_scope("_resolve_target_id_for_delete"):
if dash_id is not None:
return dash_id
if dash_slug is not None:
app_logger.debug("[_resolve_target_id_for_delete][State] Resolving ID by slug '%s'.", dash_slug)
try:
_, candidates = self.get_dashboards(query={"filters": [{"col": "slug", "op": "eq", "value": dash_slug}]})
if candidates:
target_id = candidates[0]["id"]
app_logger.debug("[_resolve_target_id_for_delete][Success] Resolved slug to ID %s.", target_id)
return target_id
except Exception as e:
app_logger.warning("[_resolve_target_id_for_delete][Warning] Could not resolve slug '%s' to ID: %s", dash_slug, e)
return None
# [/DEF:_resolve_target_id_for_delete:Function]
# [DEF:_do_import:Function]
def _do_import(self, file_name: Union[str, Path]) -> Dict:
with belief_scope("_do_import"):
app_logger.debug(f"[_do_import][State] Uploading file: {file_name}")
file_path = Path(file_name)
if not file_path.exists():
app_logger.error(f"[_do_import][Failure] File does not exist: {file_name}")
raise FileNotFoundError(f"File does not exist: {file_name}")
return self.network.upload_file(
endpoint="/dashboard/import/",
file_info={"file_obj": file_path, "file_name": file_path.name, "form_field": "formData"},
extra_data={"overwrite": "true"},
timeout=self.env.timeout * 2,
)
# [/DEF:_do_import:Function]
# [DEF:_validate_export_response:Function]
def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
with belief_scope("_validate_export_response"):
content_type = response.headers.get("Content-Type", "")
if "application/zip" not in content_type:
raise SupersetAPIError(f"Получен не ZIP-архив (Content-Type: {content_type})")
if not response.content:
raise SupersetAPIError("Получены пустые данные при экспорте")
# [/DEF:_validate_export_response:Function]
# [DEF:_resolve_export_filename:Function]
def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
with belief_scope("_resolve_export_filename"):
filename = get_filename_from_headers(dict(response.headers))
if not filename:
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip"
app_logger.warning("[_resolve_export_filename][Warning] Generated filename: %s", filename)
return filename
# [/DEF:_resolve_export_filename:Function]
# [DEF:_validate_query_params:Function]
def _validate_query_params(self, query: Optional[Dict]) -> Dict:
with belief_scope("_validate_query_params"):
base_query = {"page": 0, "page_size": 1000}
return {**base_query, **(query or {})}
# [/DEF:_validate_query_params:Function]
# [DEF:_fetch_total_object_count:Function]
def _fetch_total_object_count(self, endpoint: str) -> int:
with belief_scope("_fetch_total_object_count"):
return self.network.fetch_paginated_count(
endpoint=endpoint,
query_params={"page": 0, "page_size": 1},
count_field="count",
)
# [/DEF:_fetch_total_object_count:Function]
# [DEF:_fetch_all_pages:Function]
def _fetch_all_pages(self, endpoint: str, pagination_options: Dict) -> List[Dict]:
with belief_scope("_fetch_all_pages"):
return self.network.fetch_paginated_data(endpoint=endpoint, pagination_options=pagination_options)
# [/DEF:_fetch_all_pages:Function]
# [DEF:_validate_import_file:Function]
def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
with belief_scope("_validate_import_file"):
path = Path(zip_path)
if not path.exists():
raise FileNotFoundError(f"Файл {zip_path} не существует")
if not zipfile.is_zipfile(path):
raise SupersetAPIError(f"Файл {zip_path} не является ZIP-архивом")
with zipfile.ZipFile(path, "r") as zf:
if not any(n.endswith("metadata.yaml") for n in zf.namelist()):
raise SupersetAPIError(f"Архив {zip_path} не содержит 'metadata.yaml'")
# [/DEF:_validate_import_file:Function]
# [/SECTION]
# [/DEF:SupersetClient:Class]
# [/DEF:backend.src.core.superset_client:Module]
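A hedged usage sketch of the refactored client, assuming an Environment object obtained from the config manager as in the routes above (the dashboard ID is illustrative):
client = SupersetClient(env)                     # env: backend.src.core.config_models.Environment
total, dashboards = client.get_dashboards(query={"page_size": 100})
for db in client.get_databases_summary():
    print(db["database_name"], db["engine"])
content, filename = client.export_dashboard(dashboard_id=42)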

View File

@@ -0,0 +1,237 @@
# [DEF:backend.core.utils.dataset_mapper:Module]
#
# @SEMANTICS: dataset, mapping, postgresql, xlsx, superset
# @PURPOSE: This module updates dataset metadata (verbose_map) in Superset, sourcing it from PostgreSQL column comments or XLSX files.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> backend.core.superset_client
# @RELATION: DEPENDS_ON -> pandas
# @RELATION: DEPENDS_ON -> psycopg2
# @PUBLIC_API: DatasetMapper
# [SECTION: IMPORTS]
import pandas as pd # type: ignore
import psycopg2 # type: ignore
from typing import Dict, List, Optional, Any
from ..logger import logger as app_logger, belief_scope
# [/SECTION]
# [DEF:DatasetMapper:Class]
# @PURPOSE: Maps and updates verbose_map entries in Superset datasets.
class DatasetMapper:
# [DEF:__init__:Function]
# @PURPOSE: Initializes the mapper.
# @POST: The DatasetMapper object is initialized.
def __init__(self):
pass
# [/DEF:__init__:Function]
# [DEF:get_postgres_comments:Function]
# @PURPOSE: Extracts column comments from the PostgreSQL system catalog.
# @PRE: db_config must contain valid connection parameters (host, port, user, password, dbname).
# @PRE: table_name and table_schema must be strings.
# @POST: Returns a dictionary keyed by column name with the comments from the database as values.
# @THROW: Exception - On connection errors or query failures.
# @PARAM: db_config (Dict) - Database connection configuration.
# @PARAM: table_name (str) - Table name.
# @PARAM: table_schema (str) - Table schema.
# @RETURN: Dict[str, str] - A dictionary of column comments.
def get_postgres_comments(self, db_config: Dict, table_name: str, table_schema: str) -> Dict[str, str]:
with belief_scope("Fetch comments from PostgreSQL"):
app_logger.info("[get_postgres_comments][Enter] Fetching comments from PostgreSQL for %s.%s.", table_schema, table_name)
query = """
SELECT
cols.column_name,
CASE
WHEN pg_catalog.col_description(
(SELECT c.oid
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = cols.table_name
AND n.nspname = cols.table_schema),
cols.ordinal_position::int
) LIKE '%%|%%' THEN
split_part(
pg_catalog.col_description(
(SELECT c.oid
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = cols.table_name
AND n.nspname = cols.table_schema),
cols.ordinal_position::int
),
'|',
1
)
ELSE
pg_catalog.col_description(
(SELECT c.oid
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = cols.table_name
AND n.nspname = cols.table_schema),
cols.ordinal_position::int
)
END AS column_comment
FROM
information_schema.columns cols
WHERE cols.table_catalog = %(dbname)s AND cols.table_name = %(table_name)s AND cols.table_schema = %(table_schema)s;
"""
comments = {}
try:
with psycopg2.connect(**db_config) as conn, conn.cursor() as cursor:
# Bind dbname/table_name/table_schema as query parameters to avoid SQL injection
cursor.execute(query, {"dbname": db_config.get("dbname"), "table_name": table_name, "table_schema": table_schema})
for row in cursor.fetchall():
if row[1]:
comments[row[0]] = row[1]
app_logger.info("[get_postgres_comments][Success] Fetched %d comments.", len(comments))
except Exception as e:
app_logger.error("[get_postgres_comments][Failure] %s", e, exc_info=True)
raise
return comments
# [/DEF:get_postgres_comments:Function]
# [DEF:load_excel_mappings:Function]
# @PURPOSE: Loads 'column_name' -> 'verbose_name' mappings from an XLSX file.
# @PRE: file_path must point to an existing XLSX file.
# @POST: Returns a dictionary with the mappings from the file.
# @THROW: Exception - On file read or parsing errors.
# @PARAM: file_path (str) - Path to the XLSX file.
# @RETURN: Dict[str, str] - A dictionary with the mappings.
def load_excel_mappings(self, file_path: str) -> Dict[str, str]:
with belief_scope("Load mappings from Excel"):
app_logger.info("[load_excel_mappings][Enter] Loading mappings from %s.", file_path)
try:
df = pd.read_excel(file_path)
mappings = df.set_index('column_name')['verbose_name'].to_dict()
app_logger.info("[load_excel_mappings][Success] Loaded %d mappings.", len(mappings))
return mappings
except Exception as e:
app_logger.error("[load_excel_mappings][Failure] %s", e, exc_info=True)
raise
# [/DEF:load_excel_mappings:Function]
# [DEF:run_mapping:Function]
# @PURPOSE: Main entry point that performs the mapping and updates a dataset's verbose_map in Superset.
# @PRE: superset_client must be authenticated.
# @PRE: dataset_id must be an existing ID in Superset.
# @POST: If changes are found, the dataset is updated in Superset via the API.
# @RELATION: CALLS -> self.get_postgres_comments
# @RELATION: CALLS -> self.load_excel_mappings
# @RELATION: CALLS -> superset_client.get_dataset
# @RELATION: CALLS -> superset_client.update_dataset
# @PARAM: superset_client (Any) - The Superset client.
# @PARAM: dataset_id (int) - The ID of the dataset to update.
# @PARAM: source (str) - Data source ('postgres', 'excel', 'both').
# @PARAM: postgres_config (Optional[Dict]) - PostgreSQL connection configuration.
# @PARAM: excel_path (Optional[str]) - Path to the XLSX file.
# @PARAM: table_name (Optional[str]) - Table name in PostgreSQL.
# @PARAM: table_schema (Optional[str]) - Table schema in PostgreSQL.
def run_mapping(self, superset_client: Any, dataset_id: int, source: str, postgres_config: Optional[Dict] = None, excel_path: Optional[str] = None, table_name: Optional[str] = None, table_schema: Optional[str] = None):
with belief_scope(f"Run dataset mapping for ID {dataset_id}"):
app_logger.info("[run_mapping][Enter] Starting dataset mapping for ID %d from source '%s'.", dataset_id, source)
mappings: Dict[str, str] = {}
try:
if source in ['postgres', 'both']:
assert postgres_config and table_name and table_schema, "Postgres config is required."
mappings.update(self.get_postgres_comments(postgres_config, table_name, table_schema))
if source in ['excel', 'both']:
assert excel_path, "Excel path is required."
mappings.update(self.load_excel_mappings(excel_path))
if source not in ['postgres', 'excel', 'both']:
app_logger.error("[run_mapping][Failure] Invalid source: %s.", source)
return
dataset_response = superset_client.get_dataset(dataset_id)
dataset_data = dataset_response['result']
original_columns = dataset_data.get('columns', [])
updated_columns = []
changes_made = False
for column in original_columns:
col_name = column.get('column_name')
new_column = {
"column_name": col_name,
"id": column.get("id"),
"advanced_data_type": column.get("advanced_data_type"),
"description": column.get("description"),
"expression": column.get("expression"),
"extra": column.get("extra"),
"filterable": column.get("filterable"),
"groupby": column.get("groupby"),
"is_active": column.get("is_active"),
"is_dttm": column.get("is_dttm"),
"python_date_format": column.get("python_date_format"),
"type": column.get("type"),
"uuid": column.get("uuid"),
"verbose_name": column.get("verbose_name"),
}
new_column = {k: v for k, v in new_column.items() if v is not None}
if col_name in mappings:
mapping_value = mappings[col_name]
if isinstance(mapping_value, str) and new_column.get('verbose_name') != mapping_value:
new_column['verbose_name'] = mapping_value
changes_made = True
updated_columns.append(new_column)
updated_metrics = []
for metric in dataset_data.get("metrics", []):
new_metric = {
"id": metric.get("id"),
"metric_name": metric.get("metric_name"),
"expression": metric.get("expression"),
"verbose_name": metric.get("verbose_name"),
"description": metric.get("description"),
"d3format": metric.get("d3format"),
"currency": metric.get("currency"),
"extra": metric.get("extra"),
"warning_text": metric.get("warning_text"),
"metric_type": metric.get("metric_type"),
"uuid": metric.get("uuid"),
}
updated_metrics.append({k: v for k, v in new_metric.items() if v is not None})
if changes_made:
payload_for_update = {
"database_id": dataset_data.get("database", {}).get("id"),
"table_name": dataset_data.get("table_name"),
"schema": dataset_data.get("schema"),
"columns": updated_columns,
"owners": [owner["id"] for owner in dataset_data.get("owners", [])],
"metrics": updated_metrics,
"extra": dataset_data.get("extra"),
"description": dataset_data.get("description"),
"sql": dataset_data.get("sql"),
"cache_timeout": dataset_data.get("cache_timeout"),
"catalog": dataset_data.get("catalog"),
"default_endpoint": dataset_data.get("default_endpoint"),
"external_url": dataset_data.get("external_url"),
"fetch_values_predicate": dataset_data.get("fetch_values_predicate"),
"filter_select_enabled": dataset_data.get("filter_select_enabled"),
"is_managed_externally": dataset_data.get("is_managed_externally"),
"is_sqllab_view": dataset_data.get("is_sqllab_view"),
"main_dttm_col": dataset_data.get("main_dttm_col"),
"normalize_columns": dataset_data.get("normalize_columns"),
"offset": dataset_data.get("offset"),
"template_params": dataset_data.get("template_params"),
}
payload_for_update = {k: v for k, v in payload_for_update.items() if v is not None}
superset_client.update_dataset(dataset_id, payload_for_update)
app_logger.info("[run_mapping][Success] Dataset %d columns' verbose_name updated.", dataset_id)
else:
app_logger.info("[run_mapping][State] No changes in columns' verbose_name, skipping update.")
except (AssertionError, FileNotFoundError, Exception) as e:
app_logger.error("[run_mapping][Failure] %s", e, exc_info=True)
return
# [/DEF:run_mapping:Function]
# [/DEF:DatasetMapper:Class]
# [/DEF:backend.core.utils.dataset_mapper:Module]
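A minimal usage sketch of DatasetMapper (connection values and the dataset ID are illustrative; superset_client is an authenticated SupersetClient from the module above):
mapper = DatasetMapper()
mapper.run_mapping(
    superset_client=client,
    dataset_id=42,                                   # illustrative dataset ID
    source="both",                                   # merge PostgreSQL comments with Excel overrides
    postgres_config={"host": "db.local", "port": 5432, "user": "readonly", "password": "secret", "dbname": "dwh"},
    excel_path="mappings.xlsx",
    table_name="sales",
    table_schema="public",
)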

View File

@@ -0,0 +1,486 @@
# [DEF:backend.core.utils.fileio:Module]
#
# @SEMANTICS: file, io, zip, yaml, temp, archive, utility
# @PURPOSE: Provides a set of utilities for file management, including temporary files, ZIP archives, YAML files, and directory cleanup.
# @LAYER: Infra
# @RELATION: DEPENDS_ON -> backend.src.core.logger
# @RELATION: DEPENDS_ON -> pyyaml
# @PUBLIC_API: create_temp_file, remove_empty_directories, read_dashboard_from_disk, calculate_crc32, RetentionPolicy, archive_exports, save_and_unpack_dashboard, update_yamls, create_dashboard_export, sanitize_filename, get_filename_from_headers, consolidate_archive_folders
# [SECTION: IMPORTS]
import os
import re
import zipfile
from pathlib import Path
from typing import Any, Optional, Tuple, Dict, List, Union, LiteralString, Generator
from contextlib import contextmanager
import tempfile
from datetime import date, datetime
import shutil
import zlib
from dataclasses import dataclass
import yaml
from ..logger import logger as app_logger, belief_scope
# [/SECTION]
# [DEF:InvalidZipFormatError:Class]
class InvalidZipFormatError(Exception):
pass
# [DEF:create_temp_file:Function]
# @PURPOSE: Context manager that creates a temporary file or directory with guaranteed cleanup.
# @PRE: suffix must be a string that determines the resource type.
# @POST: The temporary resource is created and its path yielded; it is removed on context exit unless dry_run is True.
# @PARAM: content (Optional[bytes]) - Binary content to write to the temporary file.
# @PARAM: suffix (str) - Resource suffix. If it starts with `.dir`, a directory is created.
# @PARAM: mode (str) - File write mode (e.g., 'wb').
# @PARAM: dry_run (bool) - If True, the resource is kept after the context exits.
# @YIELDS: Path - Path to the temporary resource.
# @THROW: IOError - On resource creation errors.
@contextmanager
def create_temp_file(content: Optional[bytes] = None, suffix: str = ".zip", mode: str = 'wb', dry_run = False) -> Generator[Path, None, None]:
with belief_scope("Create temporary resource"):
resource_path = None
is_dir = suffix.startswith('.dir')
try:
if is_dir:
with tempfile.TemporaryDirectory(suffix=suffix) as temp_dir:
resource_path = Path(temp_dir)
app_logger.debug("[create_temp_file][State] Created temporary directory: %s", resource_path)
yield resource_path
else:
fd, temp_path_str = tempfile.mkstemp(suffix=suffix)
resource_path = Path(temp_path_str)
os.close(fd)
if content:
resource_path.write_bytes(content)
app_logger.debug("[create_temp_file][State] Created temporary file: %s", resource_path)
yield resource_path
finally:
if resource_path and resource_path.exists() and not dry_run:
try:
if resource_path.is_dir():
shutil.rmtree(resource_path)
app_logger.debug("[create_temp_file][Cleanup] Removed temporary directory: %s", resource_path)
else:
resource_path.unlink()
app_logger.debug("[create_temp_file][Cleanup] Removed temporary file: %s", resource_path)
except OSError as e:
app_logger.error("[create_temp_file][Failure] Error during cleanup of %s: %s", resource_path, e)
# [/DEF:create_temp_file:Function]
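A short sketch of the context manager in both modes (zip_bytes and upload are placeholders); the resource is removed on exit unless dry_run=True:
with create_temp_file(content=zip_bytes, suffix=".zip") as tmp_zip:
    upload(tmp_zip)                                  # hypothetical consumer of the temporary file
with create_temp_file(suffix=".dir") as tmp_dir:
    (tmp_dir / "metadata.yaml").write_text("version: 1.0.0")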
# [DEF:remove_empty_directories:Function]
# @PURPOSE: Recursively removes all empty subdirectories under the given path.
# @PRE: root_dir must be a path to an existing directory.
# @POST: All empty subdirectories are removed and their count is returned.
# @PARAM: root_dir (str) - Path to the root directory to clean.
# @RETURN: int - The number of removed directories.
def remove_empty_directories(root_dir: str) -> int:
with belief_scope(f"Remove empty directories in {root_dir}"):
app_logger.info("[remove_empty_directories][Enter] Starting cleanup of empty directories in %s", root_dir)
removed_count = 0
if not os.path.isdir(root_dir):
app_logger.error("[remove_empty_directories][Failure] Directory not found: %s", root_dir)
return 0
for current_dir, _, _ in os.walk(root_dir, topdown=False):
if not os.listdir(current_dir):
try:
os.rmdir(current_dir)
removed_count += 1
app_logger.info("[remove_empty_directories][State] Removed empty directory: %s", current_dir)
except OSError as e:
app_logger.error("[remove_empty_directories][Failure] Failed to remove %s: %s", current_dir, e)
app_logger.info("[remove_empty_directories][Exit] Removed %d empty directories.", removed_count)
return removed_count
# [/DEF:remove_empty_directories:Function]
# [DEF:read_dashboard_from_disk:Function]
# @PURPOSE: Reads a file's binary content from disk.
# @PRE: file_path must point to an existing file.
# @POST: Returns the content bytes and the file name.
# @PARAM: file_path (str) - Path to the file.
# @RETURN: Tuple[bytes, str] - A tuple of (content, file name).
# @THROW: FileNotFoundError - If the file is not found.
def read_dashboard_from_disk(file_path: str) -> Tuple[bytes, str]:
with belief_scope(f"Read dashboard from {file_path}"):
path = Path(file_path)
assert path.is_file(), f"Файл дашборда не найден: {file_path}"
app_logger.info("[read_dashboard_from_disk][Enter] Reading file: %s", file_path)
content = path.read_bytes()
if not content:
app_logger.warning("[read_dashboard_from_disk][Warning] File is empty: %s", file_path)
return content, path.name
# [/DEF:read_dashboard_from_disk:Function]
# [DEF:calculate_crc32:Function]
# @PURPOSE: Computes the CRC32 checksum of a file.
# @PRE: file_path must be a Path object pointing to an existing file.
# @POST: Returns an 8-character hex CRC32 string.
# @PARAM: file_path (Path) - Path to the file.
# @RETURN: str - The 8-character hexadecimal CRC32 representation.
# @THROW: IOError - On file read errors.
def calculate_crc32(file_path: Path) -> str:
with belief_scope(f"Calculate CRC32 for {file_path}"):
with open(file_path, 'rb') as f:
crc32_value = zlib.crc32(f.read())
return f"{crc32_value:08x}"
# [/DEF:calculate_crc32:Function]
# [SECTION: DATA_CLASSES]
# [DEF:RetentionPolicy:DataClass]
# @PURPOSE: Defines the retention policy for archives (daily, weekly, monthly).
@dataclass
class RetentionPolicy:
daily: int = 7
weekly: int = 4
monthly: int = 12
# [/DEF:RetentionPolicy:DataClass]
# [/SECTION]
# [DEF:archive_exports:Function]
# @PURPOSE: Manages the archive of exported files, applying the retention policy and optional deduplication.
# @PRE: output_dir must be a path to an existing directory.
# @POST: Old or duplicate archives are removed according to the policy.
# @RELATION: CALLS -> apply_retention_policy
# @RELATION: CALLS -> calculate_crc32
# @PARAM: output_dir (str) - Directory containing the archives.
# @PARAM: policy (RetentionPolicy) - The retention policy.
# @PARAM: deduplicate (bool) - Whether to remove duplicates by CRC32 checksum.
def archive_exports(output_dir: str, policy: RetentionPolicy, deduplicate: bool = False) -> None:
with belief_scope(f"Archive exports in {output_dir}"):
output_path = Path(output_dir)
if not output_path.is_dir():
app_logger.warning("[archive_exports][Skip] Archive directory not found: %s", output_dir)
return
app_logger.info("[archive_exports][Enter] Managing archive in %s", output_dir)
# 1. Collect all zip files
zip_files = list(output_path.glob("*.zip"))
if not zip_files:
app_logger.info("[archive_exports][State] No zip files found in %s", output_dir)
return
# 2. Deduplication
if deduplicate:
app_logger.info("[archive_exports][State] Starting deduplication...")
checksums = {}
files_to_remove = []
# Sort by modification time (newest first) to keep the latest version
zip_files.sort(key=lambda f: f.stat().st_mtime, reverse=True)
for file_path in zip_files:
try:
crc = calculate_crc32(file_path)
if crc in checksums:
files_to_remove.append(file_path)
app_logger.debug("[archive_exports][State] Duplicate found: %s (same as %s)", file_path.name, checksums[crc].name)
else:
checksums[crc] = file_path
except Exception as e:
app_logger.error("[archive_exports][Failure] Failed to calculate CRC32 for %s: %s", file_path, e)
for f in files_to_remove:
try:
f.unlink()
zip_files.remove(f)
app_logger.info("[archive_exports][State] Removed duplicate: %s", f.name)
except OSError as e:
app_logger.error("[archive_exports][Failure] Failed to remove duplicate %s: %s", f, e)
# 3. Retention Policy
files_with_dates = []
for file_path in zip_files:
# Try to extract date from filename
# Pattern: ..._YYYYMMDD_HHMMSS.zip or ..._YYYYMMDD.zip
match = re.search(r'_(\d{8})_', file_path.name)
file_date = None
if match:
try:
date_str = match.group(1)
file_date = datetime.strptime(date_str, "%Y%m%d").date()
except ValueError:
pass
if not file_date:
# Fallback to modification time
file_date = datetime.fromtimestamp(file_path.stat().st_mtime).date()
files_with_dates.append((file_path, file_date))
files_to_keep = apply_retention_policy(files_with_dates, policy)
for file_path, _ in files_with_dates:
if file_path not in files_to_keep:
try:
file_path.unlink()
app_logger.info("[archive_exports][State] Removed by retention policy: %s", file_path.name)
except OSError as e:
app_logger.error("[archive_exports][Failure] Failed to remove %s: %s", file_path, e)
# [/DEF:archive_exports:Function]
# [DEF:apply_retention_policy:Function]
# @PURPOSE: (Helper) Applies the retention policy to a list of files, returning those that should be kept.
# @PRE: files_with_dates is a list of (Path, date) tuples.
# @POST: Returns a set of files to keep.
# @PARAM: files_with_dates (List[Tuple[Path, date]]) - List of files with their dates.
# @PARAM: policy (RetentionPolicy) - The retention policy.
# @RETURN: set - The set of file paths that should be kept.
def apply_retention_policy(files_with_dates: List[Tuple[Path, date]], policy: RetentionPolicy) -> set:
with belief_scope("Apply retention policy"):
# Sort by date (newest first)
sorted_files = sorted(files_with_dates, key=lambda x: x[1], reverse=True)
# Buckets for files by retention category
daily_files = []
weekly_files = []
monthly_files = []
today = date.today()
for file_path, file_date in sorted_files:
# Daily
if (today - file_date).days < policy.daily:
daily_files.append(file_path)
# Weekly
elif (today - file_date).days < policy.weekly * 7:
weekly_files.append(file_path)
# Monthly
elif (today - file_date).days < policy.monthly * 30:
monthly_files.append(file_path)
# Return the set of files to keep
files_to_keep = set()
files_to_keep.update(daily_files)
files_to_keep.update(weekly_files[:policy.weekly])
files_to_keep.update(monthly_files[:policy.monthly])
app_logger.debug("[apply_retention_policy][State] Keeping %d files according to retention policy", len(files_to_keep))
return files_to_keep
# [/DEF:apply_retention_policy:Function]
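A worked sketch of how the buckets resolve under the default policy (files_with_dates is the list built in archive_exports; day counts follow the comparisons above):
# With RetentionPolicy(daily=7, weekly=4, monthly=12):
# - every file newer than 7 days is kept;
# - files 7-27 days old form the weekly bucket, of which the 4 newest are kept;
# - files 28-359 days old form the monthly bucket, of which the 12 newest are kept;
# - anything older falls outside all buckets and is removed by archive_exports.
files_to_keep = apply_retention_policy(files_with_dates, RetentionPolicy())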
# [DEF:save_and_unpack_dashboard:Function]
# @PURPOSE: Saves binary ZIP content to disk and optionally unpacks it.
# @PRE: zip_content must be the bytes of a valid ZIP archive.
# @POST: The ZIP file is saved and, if unpack=True, extracted into output_dir.
# @PARAM: zip_content (bytes) - The ZIP archive content.
# @PARAM: output_dir (Union[str, Path]) - Target directory.
# @PARAM: unpack (bool) - Whether to unpack the archive.
# @PARAM: original_filename (Optional[str]) - Original file name to save under.
# @RETURN: Tuple[Path, Optional[Path]] - Path to the ZIP file and, if applicable, the extraction directory.
# @THROW: InvalidZipFormatError - On invalid ZIP format.
def save_and_unpack_dashboard(zip_content: bytes, output_dir: Union[str, Path], unpack: bool = False, original_filename: Optional[str] = None) -> Tuple[Path, Optional[Path]]:
with belief_scope("Save and unpack dashboard"):
app_logger.info("[save_and_unpack_dashboard][Enter] Processing dashboard. Unpack: %s", unpack)
try:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
zip_name = sanitize_filename(original_filename) if original_filename else f"dashboard_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
zip_path = output_path / zip_name
zip_path.write_bytes(zip_content)
app_logger.info("[save_and_unpack_dashboard][State] Dashboard saved to: %s", zip_path)
if unpack:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(output_path)
app_logger.info("[save_and_unpack_dashboard][State] Dashboard unpacked to: %s", output_path)
return zip_path, output_path
return zip_path, None
except zipfile.BadZipFile as e:
app_logger.error("[save_and_unpack_dashboard][Failure] Invalid ZIP archive: %s", e)
raise InvalidZipFormatError(f"Invalid ZIP file: {e}") from e
# [/DEF:save_and_unpack_dashboard:Function]
# [DEF:update_yamls:Function]
# @PURPOSE: Updates configurations in YAML files by replacing values or applying a regex.
# @PRE: path must be an existing directory.
# @POST: All YAML files in the directory are updated according to the given parameters.
# @RELATION: CALLS -> _update_yaml_file
# @THROW: FileNotFoundError - If `path` does not exist.
# @PARAM: db_configs (Optional[List[Dict]]) - List of replacement configurations.
# @PARAM: path (str) - Path to the directory with YAML files.
# @PARAM: regexp_pattern (Optional[LiteralString]) - Search pattern.
# @PARAM: replace_string (Optional[LiteralString]) - Replacement string.
def update_yamls(db_configs: Optional[List[Dict[str, Any]]] = None, path: str = "dashboards", regexp_pattern: Optional[LiteralString] = None, replace_string: Optional[LiteralString] = None) -> None:
with belief_scope("Update YAML configurations"):
app_logger.info("[update_yamls][Enter] Starting YAML configuration update.")
dir_path = Path(path)
assert dir_path.is_dir(), f"Путь {path} не существует или не является директорией"
configs: List[Dict[str, Any]] = db_configs or []
for file_path in dir_path.rglob("*.yaml"):
_update_yaml_file(file_path, configs, regexp_pattern, replace_string)
# [/DEF:update_yamls:Function]
# [DEF:_update_yaml_file:Function]
# @PURPOSE: (Helper) Updates a single YAML file.
# @PRE: file_path must be a Path object pointing to an existing YAML file.
# @POST: The file is updated according to the given configurations or regular expression.
# @PARAM: file_path (Path) - Path to the file.
# @PARAM: db_configs (List[Dict]) - Replacement configurations.
# @PARAM: regexp_pattern (Optional[str]) - Search pattern.
# @PARAM: replace_string (Optional[str]) - Replacement string.
def _update_yaml_file(file_path: Path, db_configs: List[Dict[str, Any]], regexp_pattern: Optional[str], replace_string: Optional[str]) -> None:
with belief_scope(f"Update YAML file: {file_path}"):
# Read the file content
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
app_logger.error("[_update_yaml_file][Failure] Failed to read %s: %s", file_path, e)
return
# If a pattern and replace_string are provided, apply a regex replacement
if regexp_pattern and replace_string:
try:
new_content = re.sub(regexp_pattern, replace_string, content)
if new_content != content:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(new_content)
app_logger.info("[_update_yaml_file][State] Updated %s using regex pattern", file_path)
# Carry the regex result forward so the db_configs pass below starts from the updated content
content = new_content
except Exception as e:
app_logger.error("[_update_yaml_file][Failure] Error applying regex to %s: %s", file_path, e)
# If configurations are provided, replace values (old/new structure supported)
if db_configs:
try:
# Plain-text replacement of old/new values to preserve the file structure
modified_content = content
for cfg in db_configs:
# Expected structure: {'old': {...}, 'new': {...}}
old_cfg = cfg.get('old', {})
new_cfg = cfg.get('new', {})
for key, old_val in old_cfg.items():
if key in new_cfg:
new_val = new_cfg[key]
# Replace only exact matches of the old value in the YAML text, using the key for context
if isinstance(old_val, str):
# Look for the pattern: key: "value" or key: value
key_pattern = re.escape(key)
val_pattern = re.escape(old_val)
# Groups: 1=key+separator, 2=opening quote (optional), 3=value, 4=closing quote (optional)
pattern = rf'({key_pattern}\s*:\s*)(["\']?)({val_pattern})(["\']?)'
# [DEF:replacer:Function]
# @PURPOSE: Replacement function that preserves quotes if they were present.
# @PRE: match must be a regex match object.
# @POST: Returns a string with the new value, preserving the prefix and quotes.
def replacer(match):
prefix = match.group(1)
quote_open = match.group(2)
quote_close = match.group(4)
return f"{prefix}{quote_open}{new_val}{quote_close}"
# [/DEF:replacer:Function]
modified_content = re.sub(pattern, replacer, modified_content)
app_logger.info("[_update_yaml_file][State] Replaced '%s' with '%s' for key %s in %s", old_val, new_val, key, file_path)
# Write the modified content back without parsing the YAML, preserving the original formatting
with open(file_path, 'w', encoding='utf-8') as f:
f.write(modified_content)
except Exception as e:
app_logger.error("[_update_yaml_file][Failure] Error performing raw replacement in %s: %s", file_path, e)
# [/DEF:_update_yaml_file:Function]
# [DEF:create_dashboard_export:Function]
# @PURPOSE: Creates a ZIP archive from the given source paths.
# @PRE: source_paths must contain existing paths.
# @POST: The ZIP archive is created at zip_path.
# @PARAM: zip_path (Union[str, Path]) - Path where the ZIP archive is written.
# @PARAM: source_paths (List[Union[str, Path]]) - List of source paths to archive.
# @PARAM: exclude_extensions (Optional[List[str]]) - List of extensions to exclude.
# @RETURN: bool - `True` on success, `False` on error.
def create_dashboard_export(zip_path: Union[str, Path], source_paths: List[Union[str, Path]], exclude_extensions: Optional[List[str]] = None) -> bool:
with belief_scope(f"Create dashboard export: {zip_path}"):
app_logger.info("[create_dashboard_export][Enter] Packing dashboard: %s -> %s", source_paths, zip_path)
try:
exclude_ext = [ext.lower() for ext in exclude_extensions or []]
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for src_path_str in source_paths:
src_path = Path(src_path_str)
assert src_path.exists(), f"Путь не найден: {src_path}"
for item in src_path.rglob('*'):
if item.is_file() and item.suffix.lower() not in exclude_ext:
arcname = item.relative_to(src_path.parent)
zipf.write(item, arcname)
app_logger.info("[create_dashboard_export][Exit] Archive created: %s", zip_path)
return True
except (IOError, zipfile.BadZipFile, AssertionError) as e:
app_logger.error("[create_dashboard_export][Failure] Error: %s", e, exc_info=True)
return False
# [/DEF:create_dashboard_export:Function]
# [DEF:sanitize_filename:Function]
# @PURPOSE: Strips characters that are not allowed in file names.
# @PRE: filename must be a string.
# @POST: Returns the string without special characters.
# @PARAM: filename (str) - The original file name.
# @RETURN: str - The sanitized string.
def sanitize_filename(filename: str) -> str:
with belief_scope(f"Sanitize filename: {filename}"):
return re.sub(r'[\\/*?:"<>|]', "_", filename).strip()
# [/DEF:sanitize_filename:Function]
# [DEF:get_filename_from_headers:Function]
# @PURPOSE: Extracts the file name from the HTTP 'Content-Disposition' header.
# @PRE: headers must be a dictionary of headers.
# @POST: Returns the file name, or None if the header is missing.
# @PARAM: headers (dict) - Dictionary of HTTP headers.
# @RETURN: Optional[str] - The file name or `None`.
def get_filename_from_headers(headers: dict) -> Optional[str]:
with belief_scope("Get filename from headers"):
content_disposition = headers.get("Content-Disposition", "")
if match := re.search(r'filename="?([^"]+)"?', content_disposition):
return match.group(1).strip()
return None
# [/DEF:get_filename_from_headers:Function]
# [DEF:consolidate_archive_folders:Function]
# @PURPOSE: Consolidates archive directories that share a common slug in their names.
# @PRE: root_directory must be a Path object pointing to an existing directory.
# @POST: Directories with the same prefix are merged into one.
# @THROW: TypeError, ValueError - If `root_directory` is invalid.
# @PARAM: root_directory (Path) - The root directory to consolidate.
def consolidate_archive_folders(root_directory: Path) -> None:
with belief_scope(f"Consolidate archives in {root_directory}"):
assert isinstance(root_directory, Path), "root_directory must be a Path object."
assert root_directory.is_dir(), "root_directory must be an existing directory."
app_logger.info("[consolidate_archive_folders][Enter] Consolidating archives in %s", root_directory)
# Collect all directories containing archives
archive_dirs = []
for item in root_directory.iterdir():
if item.is_dir():
# Check whether the directory contains ZIP archives
if any(item.glob("*.zip")):
archive_dirs.append(item)
# Group by slug (the part of the name before the first '_')
slug_groups = {}
for dir_path in archive_dirs:
dir_name = dir_path.name
slug = dir_name.split('_')[0] if '_' in dir_name else dir_name
if slug not in slug_groups:
slug_groups[slug] = []
slug_groups[slug].append(dir_path)
# Consolidate each group
for slug, dirs in slug_groups.items():
if len(dirs) <= 1:
continue
# Create the target directory
target_dir = root_directory / slug
target_dir.mkdir(exist_ok=True)
app_logger.info("[consolidate_archive_folders][State] Consolidating %d directories under %s", len(dirs), target_dir)
# Move the contents
for source_dir in dirs:
if source_dir == target_dir:
continue
for item in source_dir.iterdir():
dest_item = target_dir / item.name
try:
# shutil.move handles both files and directories, so no branching is needed
shutil.move(str(item), str(dest_item))
except Exception as e:
app_logger.error("[consolidate_archive_folders][Failure] Failed to move %s to %s: %s", item, dest_item, e)
# Remove the now-empty source directory
try:
source_dir.rmdir()
app_logger.info("[consolidate_archive_folders][State] Removed source directory: %s", source_dir)
except Exception as e:
app_logger.error("[consolidate_archive_folders][Failure] Failed to remove source directory %s: %s", source_dir, e)
# [/DEF:consolidate_archive_folders:Function]
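# Example of the consolidation effect (directory names are hypothetical):
#   before: backups/PROD/my-dash_2024-01-01/   backups/PROD/my-dash_2024-02-01/
#   after:  backups/PROD/my-dash/              (contents of both moved here)
# Only directories that contain at least one *.zip are considered, and the slug is
# everything before the first '_' in the directory name.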
# [/DEF:backend.core.utils.fileio:Module]

View File

@@ -0,0 +1,286 @@
# [DEF:backend.core.utils.network:Module]
#
# @SEMANTICS: network, http, client, api, requests, session, authentication
# @PURPOSE: Encapsulates the low-level HTTP logic for interacting with the Superset API, including authentication, session management, retry logic, and error handling.
# @LAYER: Infra
# @RELATION: DEPENDS_ON -> backend.src.core.logger
# @RELATION: DEPENDS_ON -> requests
# @PUBLIC_API: APIClient
# [SECTION: IMPORTS]
from typing import Optional, Dict, Any, List, Union, cast
import json
import io
from pathlib import Path
import requests
from requests.adapters import HTTPAdapter
import urllib3
from urllib3.util.retry import Retry
from ..logger import logger as app_logger, belief_scope
# [/SECTION]
# [DEF:SupersetAPIError:Class]
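# @PURPOSE: Base exception for Superset API failures; attaches structured context to the message.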
class SupersetAPIError(Exception):
def __init__(self, message: str = "Superset API error", **context: Any):
self.context = context
super().__init__(f"[API_FAILURE] {message} | Context: {self.context}")
# [DEF:AuthenticationError:Class]
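# @PURPOSE: Raised when authentication against the Superset API fails (e.g. HTTP 401).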
class AuthenticationError(SupersetAPIError):
def __init__(self, message: str = "Authentication failed", **context: Any):
super().__init__(message, type="authentication", **context)
# [DEF:PermissionDeniedError:Class]
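# @PURPOSE: Raised when the API rejects a request due to insufficient permissions (HTTP 403).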
class PermissionDeniedError(AuthenticationError):
def __init__(self, message: str = "Permission denied", **context: Any):
super().__init__(message, **context)
# [DEF:DashboardNotFoundError:Class]
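# @PURPOSE: Raised when the requested dashboard or resource cannot be found (HTTP 404).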
class DashboardNotFoundError(SupersetAPIError):
def __init__(self, resource_id: Union[int, str], message: str = "Dashboard not found", **context: Any):
super().__init__(f"Dashboard '{resource_id}' {message}", subtype="not_found", resource_id=resource_id, **context)
# [DEF:NetworkError:Class]
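# @PURPOSE: Raised for transport-level failures such as timeouts and connection errors.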
class NetworkError(Exception):
def __init__(self, message: str = "Network connection failed", **context: Any):
self.context = context
super().__init__(f"[NETWORK_FAILURE] {message} | Context: {self.context}")
# [DEF:APIClient:Class]
# @PURPOSE: Encapsulates HTTP logic for working with the API, including sessions, authentication, and request handling.
class APIClient:
DEFAULT_TIMEOUT = 30
# [DEF:__init__:Function]
# @PURPOSE: Initializes the API client with its configuration and an HTTP session.
# @PARAM: config (Dict[str, Any]) - Client configuration dictionary.
# @PARAM: verify_ssl (bool) - Whether to verify SSL certificates.
# @PARAM: timeout (int) - Request timeout in seconds.
# @PRE: config must contain 'base_url' and 'auth'.
# @POST: APIClient instance is initialized with a session.
def __init__(self, config: Dict[str, Any], verify_ssl: bool = True, timeout: int = DEFAULT_TIMEOUT):
with belief_scope("__init__"):
app_logger.info("[APIClient.__init__][Entry] Initializing APIClient.")
self.base_url: str = config.get("base_url", "")
self.auth = config.get("auth")
self.request_settings = {"verify_ssl": verify_ssl, "timeout": timeout}
self.session = self._init_session()
self._tokens: Dict[str, str] = {}
self._authenticated = False
app_logger.info("[APIClient.__init__][Exit] APIClient initialized.")
# [/DEF:__init__:Function]
# [DEF:_init_session:Function]
# @PURPOSE: Creates and configures a `requests.Session` with retry logic.
# @PRE: self.request_settings must be initialized.
# @POST: Returns a configured requests.Session instance.
# @RETURN: requests.Session - The configured session.
def _init_session(self) -> requests.Session:
with belief_scope("_init_session"):
session = requests.Session()
retries = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retries)
session.mount('http://', adapter)
session.mount('https://', adapter)
if not self.request_settings["verify_ssl"]:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
app_logger.warning("[_init_session][State] SSL verification disabled.")
session.verify = self.request_settings["verify_ssl"]
return session
# [/DEF:_init_session:Function]
# [DEF:authenticate:Function]
# @PURPOSE: Authenticates against the Superset API and obtains the access and CSRF tokens.
# @PRE: self.auth and self.base_url must be valid.
# @POST: `self._tokens` is populated and `self._authenticated` is set to `True`.
# @RETURN: Dict[str, str] - Dictionary of tokens.
# @THROW: AuthenticationError, NetworkError - on failure.
def authenticate(self) -> Dict[str, str]:
with belief_scope("authenticate"):
app_logger.info("[authenticate][Enter] Authenticating to %s", self.base_url)
try:
login_url = f"{self.base_url}/security/login"
response = self.session.post(login_url, json=self.auth, timeout=self.request_settings["timeout"])
response.raise_for_status()
access_token = response.json()["access_token"]
csrf_url = f"{self.base_url}/security/csrf_token/"
csrf_response = self.session.get(csrf_url, headers={"Authorization": f"Bearer {access_token}"}, timeout=self.request_settings["timeout"])
csrf_response.raise_for_status()
self._tokens = {"access_token": access_token, "csrf_token": csrf_response.json()["result"]}
self._authenticated = True
app_logger.info("[authenticate][Exit] Authenticated successfully.")
return self._tokens
except requests.exceptions.HTTPError as e:
raise AuthenticationError(f"Authentication failed: {e}") from e
except (requests.exceptions.RequestException, KeyError) as e:
raise NetworkError(f"Network or parsing error during authentication: {e}") from e
# [/DEF:authenticate:Function]
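# On success the returned mapping has the shape:
#   {"access_token": "<JWT access token>", "csrf_token": "<CSRF token>"}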
@property
# [DEF:headers:Function]
# @PURPOSE: Returns the HTTP headers for authenticated requests.
# @PRE: APIClient is initialized and authenticated or can be authenticated.
# @POST: Returns headers including auth tokens.
def headers(self) -> Dict[str, str]:
with belief_scope("headers"):
if not self._authenticated: self.authenticate()
return {
"Authorization": f"Bearer {self._tokens['access_token']}",
"X-CSRFToken": self._tokens.get("csrf_token", ""),
"Referer": self.base_url,
"Content-Type": "application/json"
}
# [/DEF:headers:Function]
# [DEF:request:Function]
# @PURPOSE: Performs a generic HTTP request against the API.
# @PARAM: method (str) - HTTP method.
# @PARAM: endpoint (str) - API endpoint.
# @PARAM: headers (Optional[Dict]) - Additional headers.
# @PARAM: raw_response (bool) - Whether to return the raw response.
# @PRE: method and endpoint must be strings.
# @POST: Returns response content or raw Response object.
# @RETURN: `requests.Response` if `raw_response=True`, otherwise `dict`.
# @THROW: SupersetAPIError, NetworkError, and their subclasses.
def request(self, method: str, endpoint: str, headers: Optional[Dict] = None, raw_response: bool = False, **kwargs) -> Union[requests.Response, Dict[str, Any]]:
with belief_scope("request"):
full_url = f"{self.base_url}{endpoint}"
_headers = self.headers.copy()
if headers: _headers.update(headers)
try:
response = self.session.request(method, full_url, headers=_headers, **kwargs)
response.raise_for_status()
return response if raw_response else response.json()
except requests.exceptions.HTTPError as e:
self._handle_http_error(e, endpoint)
except requests.exceptions.RequestException as e:
self._handle_network_error(e, full_url)
# [/DEF:request:Function]
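# Illustrative calls (endpoint values are examples; base_url is assumed to already
# carry the /api/v1 prefix, since authenticate() builds /security/login directly from it):
#   payload = client.request("GET", "/dashboard/1")                    # parsed JSON dict
#   resp = client.request("GET", "/dashboard/1", raw_response=True)    # requests.Response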
# [DEF:_handle_http_error:Function]
# @PURPOSE: (Helper) Converts HTTP errors into custom exceptions.
# @PARAM: e (requests.exceptions.HTTPError) - The error.
# @PARAM: endpoint (str) - The endpoint.
# @PRE: e must be a valid HTTPError with a response.
# @POST: Raises a specific SupersetAPIError or subclass.
def _handle_http_error(self, e: requests.exceptions.HTTPError, endpoint: str):
with belief_scope("_handle_http_error"):
status_code = e.response.status_code
if status_code == 404: raise DashboardNotFoundError(endpoint) from e
if status_code == 403: raise PermissionDeniedError() from e
if status_code == 401: raise AuthenticationError() from e
raise SupersetAPIError(f"API Error {status_code}: {e.response.text}") from e
# [/DEF:_handle_http_error:Function]
# [DEF:_handle_network_error:Function]
# @PURPOSE: (Helper) Converts network errors into `NetworkError`.
# @PARAM: e (requests.exceptions.RequestException) - The error.
# @PARAM: url (str) - The URL.
# @PRE: e must be a RequestException.
# @POST: Raises a NetworkError.
def _handle_network_error(self, e: requests.exceptions.RequestException, url: str):
with belief_scope("_handle_network_error"):
if isinstance(e, requests.exceptions.Timeout): msg = "Request timeout"
elif isinstance(e, requests.exceptions.ConnectionError): msg = "Connection error"
else: msg = f"Unknown network error: {e}"
raise NetworkError(msg, url=url) from e
# [/DEF:_handle_network_error:Function]
# [DEF:upload_file:Function]
# @PURPOSE: Uploads a file to the server via multipart/form-data.
# @PARAM: endpoint (str) - The endpoint.
# @PARAM: file_info (Dict[str, Any]) - File information.
# @PARAM: extra_data (Optional[Dict]) - Additional form data.
# @PARAM: timeout (Optional[int]) - Request timeout.
# @PRE: file_info must contain 'file_obj' and 'file_name'.
# @POST: File is uploaded and response returned.
# @RETURN: The API response as a dictionary.
# @THROW: SupersetAPIError, NetworkError, TypeError.
def upload_file(self, endpoint: str, file_info: Dict[str, Any], extra_data: Optional[Dict] = None, timeout: Optional[int] = None) -> Dict:
with belief_scope("upload_file"):
full_url = f"{self.base_url}{endpoint}"
_headers = self.headers.copy()
_headers.pop('Content-Type', None)
file_obj, file_name, form_field = file_info.get("file_obj"), file_info.get("file_name"), file_info.get("form_field", "file")
files_payload = {}
if isinstance(file_obj, (str, Path)):
with open(file_obj, 'rb') as f:
files_payload = {form_field: (file_name, f.read(), 'application/x-zip-compressed')}
elif isinstance(file_obj, io.BytesIO):
files_payload = {form_field: (file_name, file_obj.getvalue(), 'application/x-zip-compressed')}
else:
raise TypeError(f"Unsupported file_obj type: {type(file_obj)}")
return self._perform_upload(full_url, files_payload, extra_data, _headers, timeout)
# [/DEF:upload_file:Function]
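# Illustrative file_info payload (the multipart field name depends on the target
# endpoint; "formData" is what Superset import endpoints typically expect, so treat
# it as an assumption here):
#   client.upload_file("/dashboard/import/", {
#       "file_obj": Path("backups/MyDashboard.zip"),
#       "file_name": "MyDashboard.zip",
#       "form_field": "formData",
#   }, extra_data={"overwrite": "true"})
#   # An in-memory io.BytesIO object is accepted as file_obj as well.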
# [DEF:_perform_upload:Function]
# @PURPOSE: (Helper) Performs the POST request with the file payload.
# @PARAM: url (str) - The URL.
# @PARAM: files (Dict) - Files payload.
# @PARAM: data (Optional[Dict]) - Form data.
# @PARAM: headers (Dict) - Headers.
# @PARAM: timeout (Optional[int]) - Request timeout.
# @PRE: url, files, and headers must be provided.
# @POST: POST request is performed and JSON response returned.
# @RETURN: Dict - The response.
def _perform_upload(self, url: str, files: Dict, data: Optional[Dict], headers: Dict, timeout: Optional[int]) -> Dict:
with belief_scope("_perform_upload"):
try:
response = self.session.post(url, files=files, data=data or {}, headers=headers, timeout=timeout or self.request_settings["timeout"])
response.raise_for_status()
try:
return response.json()
except ValueError as json_e:
app_logger.debug(f"[_perform_upload][Debug] Response is not valid JSON: {response.text[:200]}...")
raise SupersetAPIError(f"API error during upload: Response is not valid JSON: {json_e}") from json_e
except requests.exceptions.HTTPError as e:
raise SupersetAPIError(f"API error during upload: {e.response.text}") from e
except requests.exceptions.RequestException as e:
raise NetworkError(f"Network error during upload: {e}", url=url) from e
# [/DEF:_perform_upload:Function]
# [DEF:fetch_paginated_count:Function]
# @PURPOSE: Retrieves the total item count for pagination.
# @PARAM: endpoint (str) - The endpoint.
# @PARAM: query_params (Dict) - Query parameters.
# @PARAM: count_field (str) - The field holding the count.
# @PRE: query_params must be a dictionary.
# @POST: Returns total count of items.
# @RETURN: int - The total count.
def fetch_paginated_count(self, endpoint: str, query_params: Dict, count_field: str = "count") -> int:
with belief_scope("fetch_paginated_count"):
response_json = cast(Dict[str, Any], self.request("GET", endpoint, params={"q": json.dumps(query_params)}))
return response_json.get(count_field, 0)
# [/DEF:fetch_paginated_count:Function]
# [DEF:fetch_paginated_data:Function]
# @PURPOSE: Automatically collects data from all pages of a paginated endpoint.
# @PARAM: endpoint (str) - The endpoint.
# @PARAM: pagination_options (Dict[str, Any]) - Pagination options.
# @PRE: pagination_options must contain 'base_query', 'total_count', 'results_field'.
# @POST: Returns all items across all pages.
# @RETURN: List[Any] - The collected items.
def fetch_paginated_data(self, endpoint: str, pagination_options: Dict[str, Any]) -> List[Any]:
with belief_scope("fetch_paginated_data"):
base_query, total_count = pagination_options["base_query"], pagination_options["total_count"]
results_field, page_size = pagination_options["results_field"], base_query.get('page_size')
assert page_size and page_size > 0, "'page_size' must be a positive number."
results = []
for page in range((total_count + page_size - 1) // page_size):
query = {**base_query, 'page': page}
response_json = cast(Dict[str, Any], self.request("GET", endpoint, params={"q": json.dumps(query)}))
results.extend(response_json.get(results_field, []))
return results
# [/DEF:fetch_paginated_data:Function]
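# Illustrative pagination flow ("result" is the results field used by Superset list
# endpoints; treat it as an assumption for other APIs):
#   base_query = {"page_size": 100}
#   total = client.fetch_paginated_count("/dashboard/", base_query)
#   dashboards = client.fetch_paginated_data("/dashboard/", {
#       "base_query": base_query, "total_count": total, "results_field": "result"})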
# [/DEF:APIClient:Class]
# [/DEF:backend.core.utils.network:Module]
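# Hedged end-to-end sketch: the URL and credentials are placeholders, and the auth
# payload shape is only assumed to match what authenticate() posts to /security/login.
# Guarded so it never runs on import.
if __name__ == "__main__":
    demo_client = APIClient(
        config={
            "base_url": "http://localhost:8088/api/v1",
            "auth": {"provider": "db", "username": "admin", "password": "admin", "refresh": True},
        },
        verify_ssl=False,
    )
    demo_client.authenticate()
    # request() returns the parsed JSON body by default
    first_page = demo_client.request("GET", "/dashboard/", params={"q": json.dumps({"page_size": 1})})
    print(f"Dashboards visible to this user: {first_page.get('count', 0)}")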

View File

@@ -12,10 +12,9 @@ from requests.exceptions import RequestException
from ..core.plugin_base import PluginBase
from ..core.logger import belief_scope
from superset_tool.client import SupersetClient
from superset_tool.exceptions import SupersetAPIError
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.fileio import (
from ..core.superset_client import SupersetClient
from ..core.utils.network import SupersetAPIError
from ..core.utils.fileio import (
save_and_unpack_dashboard,
archive_exports,
sanitize_filename,
@@ -23,7 +22,6 @@ from superset_tool.utils.fileio import (
remove_empty_directories,
RetentionPolicy
)
from superset_tool.utils.init_clients import setup_clients
from ..dependencies import get_config_manager
# [DEF:BackupPlugin:Class]
@@ -131,25 +129,25 @@ class BackupPlugin(PluginBase):
backup_path_str = params.get("backup_path") or config_manager.get_config().settings.backup_path
backup_path = Path(backup_path_str)
logger = SupersetLogger(log_dir=backup_path / "Logs", console=True)
logger.info(f"[BackupPlugin][Entry] Starting backup for {env}.")
from ..core.logger import logger as app_logger
app_logger.info(f"[BackupPlugin][Entry] Starting backup for {env}.")
try:
config_manager = get_config_manager()
if not config_manager.has_environments():
raise ValueError("No Superset environments configured. Please add an environment in Settings.")
clients = setup_clients(logger, custom_envs=config_manager.get_environments())
client = clients.get(env)
if not client:
env_config = config_manager.get_environment(env)
if not env_config:
raise ValueError(f"Environment '{env}' not found in configuration.")
client = SupersetClient(env_config)
dashboard_count, dashboard_meta = client.get_dashboards()
logger.info(f"[BackupPlugin][Progress] Found {dashboard_count} dashboards to export in {env}.")
app_logger.info(f"[BackupPlugin][Progress] Found {dashboard_count} dashboards to export in {env}.")
if dashboard_count == 0:
logger.info("[BackupPlugin][Exit] No dashboards to back up.")
app_logger.info("[BackupPlugin][Exit] No dashboards to back up.")
return
for db in dashboard_meta:
@@ -169,23 +167,22 @@ class BackupPlugin(PluginBase):
zip_content=zip_content,
original_filename=filename,
output_dir=dashboard_dir,
unpack=False,
logger=logger
unpack=False
)
archive_exports(str(dashboard_dir), policy=RetentionPolicy(), logger=logger)
archive_exports(str(dashboard_dir), policy=RetentionPolicy())
except (SupersetAPIError, RequestException, IOError, OSError) as db_error:
logger.error(f"[BackupPlugin][Failure] Failed to export dashboard {dashboard_title} (ID: {dashboard_id}): {db_error}", exc_info=True)
app_logger.error(f"[BackupPlugin][Failure] Failed to export dashboard {dashboard_title} (ID: {dashboard_id}): {db_error}", exc_info=True)
continue
consolidate_archive_folders(backup_path / env.upper(), logger=logger)
remove_empty_directories(str(backup_path / env.upper()), logger=logger)
consolidate_archive_folders(backup_path / env.upper())
remove_empty_directories(str(backup_path / env.upper()))
logger.info(f"[BackupPlugin][CoherenceCheck:Passed] Backup logic completed for {env}.")
app_logger.info(f"[BackupPlugin][CoherenceCheck:Passed] Backup logic completed for {env}.")
except (RequestException, IOError, KeyError) as e:
logger.critical(f"[BackupPlugin][Failure] Fatal error during backup for {env}: {e}", exc_info=True)
app_logger.critical(f"[BackupPlugin][Failure] Fatal error during backup for {env}: {e}", exc_info=True)
raise e
# [/DEF:execute:Function]
# [/DEF:BackupPlugin:Class]

View File

@@ -145,19 +145,7 @@ class DebugPlugin(PluginBase):
if not env_config:
raise ValueError(f"Environment '{name}' not found.")
# Map Environment model to SupersetConfig
from superset_tool.models import SupersetConfig
superset_config = SupersetConfig(
env=env_config.name,
base_url=env_config.url,
auth={
"provider": "db", # Defaulting to db provider
"username": env_config.username,
"password": env_config.password,
"refresh": "false"
}
)
client = SupersetClient(superset_config)
client = SupersetClient(env_config)
client.authenticate()
count, dbs = client.get_databases()
results[name] = {
@@ -188,19 +176,7 @@ class DebugPlugin(PluginBase):
if not env_config:
raise ValueError(f"Environment '{env_name}' not found.")
# Map Environment model to SupersetConfig
from superset_tool.models import SupersetConfig
superset_config = SupersetConfig(
env=env_config.name,
base_url=env_config.url,
auth={
"provider": "db", # Defaulting to db provider
"username": env_config.username,
"password": env_config.password,
"refresh": "false"
}
)
client = SupersetClient(superset_config)
client = SupersetClient(env_config)
client.authenticate()
dataset_response = client.get_dataset(dataset_id)

View File

@@ -12,8 +12,7 @@ from ..core.superset_client import SupersetClient
from ..core.logger import logger, belief_scope
from ..core.database import SessionLocal
from ..models.connection import ConnectionConfig
from superset_tool.utils.dataset_mapper import DatasetMapper
from superset_tool.utils.logger import SupersetLogger
from ..core.utils.dataset_mapper import DatasetMapper
# [/SECTION]
# [DEF:MapperPlugin:Class]
@@ -137,25 +136,13 @@ class MapperPlugin(PluginBase):
# Get config and initialize client
from ..dependencies import get_config_manager
from superset_tool.models import SupersetConfig
config_manager = get_config_manager()
env_config = config_manager.get_environment(env_name)
if not env_config:
logger.error(f"[MapperPlugin.execute][State] Environment '{env_name}' not found.")
raise ValueError(f"Environment '{env_name}' not found in configuration.")
# Map Environment model to SupersetConfig
superset_config = SupersetConfig(
env=env_config.name,
base_url=env_config.url,
auth={
"provider": "db", # Defaulting to db provider
"username": env_config.username,
"password": env_config.password,
"refresh": "false"
}
)
client = SupersetClient(superset_config)
client = SupersetClient(env_config)
client.authenticate()
postgres_config = None
@@ -185,9 +172,7 @@ class MapperPlugin(PluginBase):
logger.info(f"[MapperPlugin.execute][Action] Starting mapping for dataset {dataset_id} in {env_name}")
# Use internal SupersetLogger for DatasetMapper
s_logger = SupersetLogger(name="dataset_mapper_plugin")
mapper = DatasetMapper(s_logger)
mapper = DatasetMapper()
try:
mapper.run_mapping(

View File

@@ -13,11 +13,9 @@ import re
from ..core.plugin_base import PluginBase
from ..core.logger import belief_scope
from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import setup_clients
from superset_tool.utils.fileio import create_temp_file, update_yamls, create_dashboard_export
from ..core.superset_client import SupersetClient
from ..core.utils.fileio import create_temp_file, update_yamls, create_dashboard_export
from ..dependencies import get_config_manager
from superset_tool.utils.logger import SupersetLogger
from ..core.migration_engine import MigrationEngine
from ..core.database import SessionLocal
from ..models.mapping import DatabaseMapping, Environment
@@ -150,7 +148,7 @@ class MigrationPlugin(PluginBase):
from ..dependencies import get_task_manager
tm = get_task_manager()
class TaskLoggerProxy(SupersetLogger):
class TaskLoggerProxy:
# [DEF:__init__:Function]
# @PURPOSE: Initializes the proxy logger.
# @PRE: None.
@@ -158,7 +156,7 @@ class MigrationPlugin(PluginBase):
def __init__(self):
with belief_scope("__init__"):
# Initialize parent with dummy values since we override methods
super().__init__(console=False)
pass
# [/DEF:__init__:Function]
# [DEF:debug:Function]
@@ -246,9 +244,8 @@ class MigrationPlugin(PluginBase):
logger.info(f"[MigrationPlugin][State] Resolved environments: {from_env_name} -> {to_env_name}")
all_clients = setup_clients(logger, custom_envs=environments)
from_c = all_clients.get(from_env_name)
to_c = all_clients.get(to_env_name)
from_c = SupersetClient(src_env)
to_c = SupersetClient(tgt_env)
if not from_c or not to_c:
raise ValueError(f"Clients not initialized for environments: {from_env_name}, {to_env_name}")

View File

@@ -106,25 +106,13 @@ class SearchPlugin(PluginBase):
# Get config and initialize client
from ..dependencies import get_config_manager
from superset_tool.models import SupersetConfig
config_manager = get_config_manager()
env_config = config_manager.get_environment(env_name)
if not env_config:
logger.error(f"[SearchPlugin.execute][State] Environment '{env_name}' not found.")
raise ValueError(f"Environment '{env_name}' not found in configuration.")
# Map Environment model to SupersetConfig
superset_config = SupersetConfig(
env=env_config.name,
base_url=env_config.url,
auth={
"provider": "db", # Defaulting to db provider
"username": env_config.username,
"password": env_config.password,
"refresh": "false"
}
)
client = SupersetClient(superset_config)
client = SupersetClient(env_config)
client.authenticate()
logger.info(f"[SearchPlugin.execute][Action] Searching for pattern: '{search_query}' in environment: {env_name}")

View File

@@ -13,7 +13,6 @@ from typing import List, Dict
from backend.src.core.logger import belief_scope
from backend.src.core.superset_client import SupersetClient
from backend.src.core.utils.matching import suggest_mappings
from superset_tool.models import SupersetConfig
# [/SECTION]
# [DEF:MappingService:Class]
@@ -43,17 +42,7 @@ class MappingService:
if not env:
raise ValueError(f"Environment {env_id} not found")
superset_config = SupersetConfig(
env=env.name,
base_url=env.url,
auth={
"provider": "db",
"username": env.username,
"password": env.password,
"refresh": "false"
}
)
return SupersetClient(superset_config)
return SupersetClient(env)
# [/DEF:_get_client:Function]
# [DEF:get_suggestions:Function]

Binary file not shown.

View File

@@ -1,99 +0,0 @@
#!/usr/bin/env python3
"""Test script to verify the fixes for SupersetClient initialization."""
import sys
sys.path.insert(0, '.')
from src.core.config_manager import ConfigManager
from src.core.config_models import Environment
from src.plugins.search import SearchPlugin
from src.plugins.mapper import MapperPlugin
from src.plugins.debug import DebugPlugin
def test_config_manager():
"""Test ConfigManager methods."""
print("Testing ConfigManager...")
try:
config_manager = ConfigManager()
print(f" ConfigManager initialized")
# Test get_environment method
if hasattr(config_manager, 'get_environment'):
print(f" get_environment method exists")
# Add a test environment if none exists
if not config_manager.has_environments():
test_env = Environment(
id="test-env",
name="Test Environment",
url="http://localhost:8088",
username="admin",
password="admin"
)
config_manager.add_environment(test_env)
print(f" Added test environment: {test_env.name}")
# Test retrieving environment
envs = config_manager.get_environments()
if envs:
test_env_id = envs[0].id
env_config = config_manager.get_environment(test_env_id)
print(f" Successfully retrieved environment: {env_config.name}")
return True
else:
print(f" No environments available (add one in settings)")
return False
except Exception as e:
print(f" Error: {e}")
return False
def test_plugins():
"""Test plugin initialization."""
print("\nTesting plugins...")
plugins = [
("Search Plugin", SearchPlugin()),
("Mapper Plugin", MapperPlugin()),
("Debug Plugin", DebugPlugin())
]
all_ok = True
for name, plugin in plugins:
print(f"\nTesting {name}...")
try:
plugin_id = plugin.id
plugin_name = plugin.name
plugin_version = plugin.version
schema = plugin.get_schema()
print(f" ✓ ID: {plugin_id}")
print(f" ✓ Name: {plugin_name}")
print(f" ✓ Version: {plugin_version}")
print(f" ✓ Schema: {schema}")
except Exception as e:
print(f" ✗ Error: {e}")
all_ok = False
return all_ok
def main():
"""Main test function."""
print("=" * 50)
print("Superset Tools Fix Verification")
print("=" * 50)
config_ok = test_config_manager()
plugins_ok = test_plugins()
print("\n" + "=" * 50)
if config_ok and plugins_ok:
print("✅ All fixes verified successfully!")
else:
print("❌ Some tests failed")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,5 +1,5 @@
import pytest
from backend.src.core.logger import belief_scope, logger
from src.core.logger import belief_scope, logger
# [DEF:test_belief_scope_logs_entry_action_exit:Function]

View File

@@ -1,62 +1,23 @@
import pytest
from superset_tool.models import SupersetConfig
from superset_tool.utils.logger import belief_scope
from src.core.config_models import Environment
from src.core.logger import belief_scope
# [DEF:test_superset_config_url_normalization:Function]
# @PURPOSE: Tests that SupersetConfig correctly normalizes the base URL.
# @PRE: SupersetConfig class is available.
# @POST: URL normalization is verified.
def test_superset_config_url_normalization():
with belief_scope("test_superset_config_url_normalization"):
auth = {
"provider": "db",
"username": "admin",
"password": "password",
"refresh": "token"
}
# Test with /api/v1 already present
config = SupersetConfig(
env="dev",
base_url="http://localhost:8088/api/v1",
auth=auth
# [DEF:test_environment_model:Function]
# @PURPOSE: Tests that Environment model correctly stores values.
# @PRE: Environment class is available.
# @POST: Values are verified.
def test_environment_model():
with belief_scope("test_environment_model"):
env = Environment(
id="test-id",
name="test-env",
url="http://localhost:8088/api/v1",
username="admin",
password="password"
)
assert config.base_url == "http://localhost:8088/api/v1"
# Test without /api/v1
config = SupersetConfig(
env="dev",
base_url="http://localhost:8088",
auth=auth
)
assert config.base_url == "http://localhost:8088/api/v1"
# Test with trailing slash
config = SupersetConfig(
env="dev",
base_url="http://localhost:8088/",
auth=auth
)
assert config.base_url == "http://localhost:8088/api/v1"
assert env.id == "test-id"
assert env.name == "test-env"
assert env.url == "http://localhost:8088/api/v1"
# [/DEF:test_superset_config_url_normalization:Function]
# [DEF:test_superset_config_invalid_url:Function]
# @PURPOSE: Tests that SupersetConfig raises ValueError for invalid URLs.
# @PRE: SupersetConfig class is available.
# @POST: ValueError is raised for invalid URLs.
def test_superset_config_invalid_url():
with belief_scope("test_superset_config_invalid_url"):
auth = {
"provider": "db",
"username": "admin",
"password": "password",
"refresh": "token"
}
with pytest.raises(ValueError, match="Must start with http:// or https://"):
SupersetConfig(
env="dev",
base_url="localhost:8088",
auth=auth
)
# [/DEF:test_superset_config_invalid_url:Function]