archive_exports rework

This commit is contained in:
2025-07-11 16:29:20 +03:00
parent 5ff7c2aca9
commit d23eef096a
6 changed files with 443 additions and 37 deletions

View File

@@ -35,6 +35,8 @@ class AuthenticationError(SupersetToolError):
"""[AUTH] Ошибки аутентификации (неверные учетные данные) или авторизации (проблемы с сессией).
@context: url, username, error_detail (опционально).
"""
# [CONTRACT]
# Description: Исключение, возникающее при ошибках аутентификации в Superset API.
def __init__(self, message: str = "Authentication failed", **context: Any):
super().__init__(
f"[AUTH_FAILURE] {message}",
@@ -60,6 +62,8 @@ class SupersetAPIError(SupersetToolError):
@semantic: Для ошибок, возвращаемых Superset API, или проблем с парсингом ответа.
@context: endpoint, method, status_code, response_body (опционально), error_message (из API).
"""
# [CONTRACT]
# Description: Исключение, возникающее при получении ошибки от Superset API (статус код >= 400).
def __init__(self, message: str = "Superset API error", **context: Any):
super().__init__(
f"[API_FAILURE] {message}",
@@ -80,12 +84,27 @@ class DashboardNotFoundError(SupersetAPIError):
@semantic: Соответствует HTTP 404 Not Found.
@context: dashboard_id_or_slug, url.
"""
# [CONTRACT]
# Description: Исключение, специфичное для случая, когда дашборд не найден (статус 404).
def __init__(self, dashboard_id_or_slug: Union[int, str], message: str = "Dashboard not found", **context: Any):
super().__init__(
f"[NOT_FOUND] Dashboard '{dashboard_id_or_slug}' {message}",
{"subtype": "not_found", "resource_id": dashboard_id_or_slug, **context}
)
class DatasetNotFoundError(SupersetAPIError):
"""[API:404] Запрашиваемый набор данных не существует.
@semantic: Соответствует HTTP 404 Not Found.
@context: dataset_id_or_slug, url.
"""
# [CONTRACT]
# Description: Исключение, специфичное для случая, когда набор данных не найден (статус 404).
def __init__(self, dataset_id_or_slug: Union[int, str], message: str = "Dataset not found", **context: Any):
super().__init__(
f"[NOT_FOUND] Dataset '{dataset_id_or_slug}' {message}",
{"subtype": "not_found", "resource_id": dataset_id_or_slug, **context}
)
# [ERROR-SUBCLASS] Детализированные ошибки обработки файлов
class InvalidZipFormatError(SupersetToolError):
"""[FILE:ZIP] Некорректный формат ZIP-архива или содержимого для импорта/экспорта.
@@ -104,8 +123,31 @@ class NetworkError(SupersetToolError):
@semantic: Ошибки, связанные с невозможностью установить или поддерживать сетевое соединение.
@context: url, original_exception (опционально), timeout (опционально).
"""
# [CONTRACT]
# Description: Исключение, возникающее при сетевых ошибках во время взаимодействия с Superset API.
def __init__(self, message: str = "Network connection failed", **context: Any):
super().__init__(
f"[NETWORK_FAILURE] {message}",
{"type": "network", **context}
)
class FileOperationError(SupersetToolError):
"""
# [CONTRACT]
# Description: Исключение, возникающее при ошибках файловых операций (чтение, запись, архивирование).
"""
pass
class InvalidFileStructureError(FileOperationError):
"""
# [CONTRACT]
# Description: Исключение, возникающее при обнаружении некорректной структуры файлов/директорий.
"""
pass
class ConfigurationError(SupersetToolError):
"""
# [CONTRACT]
# Description: Исключение, возникающее при ошибках в конфигурации инструмента.
"""
pass

View File

@@ -192,20 +192,21 @@ def archive_exports(
deduplicate: bool = False,
logger: Optional[SupersetLogger] = None
) -> None:
"""[CONTRACT] Управление архивом экспортированных дашбордов
@pre:
- output_dir должен существовать
- Значения retention должны быть >= 0
@post:
- Сохраняет файлы согласно политике хранения
- Удаляет устаревшие архивы
- Логирует все действия
@raise:
- ValueError: Если retention параметры некорректны
- Exception: При любых других ошибках
"""
# [CONTRACT] Управление архивом экспортированных дашбордов
# @pre:
# - output_dir должен существовать
# - Значения retention должны быть >= 0
# @post:
# - Сохраняет файлы согласно политике хранения
# - Удаляет устаревшие архивы
# - Логирует все действия
# @raise:
# - ValueError: Если retention параметры некорректны
# - Exception: При любых других ошибках
logger = logger or SupersetLogger(name="fileio", console=False)
logger.info(f"[ARCHIVE] Starting archive cleanup in {output_dir}. Deduplication: {deduplicate}")
# [DEBUG_ARCHIVE] Log input parameters
logger.debug(f"[DEBUG_ARCHIVE] archive_exports called with: output_dir={output_dir}, daily={daily_retention}, weekly={weekly_retention}, monthly={monthly_retention}, deduplicate={deduplicate}")
# [VALIDATION] Проверка параметров
if not all(isinstance(x, int) and x >= 0 for x in [daily_retention, weekly_retention, monthly_retention]):
@@ -221,35 +222,54 @@ def archive_exports(
# [PROCESSING] Сбор информации о файлах
files_with_dates = []
for file in export_dir.glob("*.zip"):
zip_files_in_dir = list(export_dir.glob("*.zip"))
# [DEBUG_ARCHIVE] Log number of zip files found
logger.debug(f"[DEBUG_ARCHIVE] Found {len(zip_files_in_dir)} zip files in {export_dir}")
for file in zip_files_in_dir:
# [DEBUG_ARCHIVE] Log file being processed
logger.debug(f"[DEBUG_ARCHIVE] Processing file: {file.name}")
try:
timestamp_str = file.stem.split('_')[-1].split('T')[0]
file_date = datetime.strptime(timestamp_str, "%Y%m%d").date()
logger.debug(f"[DATE_PARSE] Файл {file.name} добавлен к анализу очистки (массив files_with_dates)")
# [DEBUG_ARCHIVE] Log parsed date
logger.debug(f"[DEBUG_ARCHIVE] Parsed date for {file.name}: {file_date}")
except (ValueError, IndexError):
file_date = datetime.fromtimestamp(file.stat().st_mtime).date()
logger.warning(f"[DATE_PARSE] Using modification date for {file.name}")
# [DEBUG_ARCHIVE] Log parsed date (modification date)
logger.debug(f"[DEBUG_ARCHIVE] Parsed date for {file.name} (mod date): {file_date}")
files_with_dates.append((file, file_date))
# [DEDUPLICATION]
if deduplicate:
logger.info("[DEDUPLICATION] Starting checksum-based deduplication.")
logger.info("Начало дедупликации на основе контрольных сумм.")
for file in files_with_dates:
file_path = file[0]
# [DEBUG_ARCHIVE] Log file being checked for deduplication
logger.debug(f"[DEBUG_ARCHIVE][DEDUPLICATION] Checking file: {file_path.name}")
try:
crc32_checksum = calculate_crc32(file_path)
if crc32_checksum in checksums:
# Duplicate found, delete the older file
logger.warning(f"[DEDUPLICATION] Duplicate found: {file_path}. Deleting.")
# [DEBUG_ARCHIVE][DEDUPLICATION] Log duplicate found and deletion attempt
logger.debug(f"[DEBUG_ARCHIVE][DEDUPLICATION] Duplicate found: {file_path.name}. Checksum: {crc32_checksum}. Attempting deletion.")
file_path.unlink()
else:
checksums[crc32_checksum] = file_path
# [DEBUG_ARCHIVE][DEDUPLICATION] Log file kept after deduplication check
logger.debug(f"[DEBUG_ARCHIVE][DEDUPLICATION] Keeping file: {file_path.name}. Checksum: {crc32_checksum}.")
except Exception as e:
logger.error(f"[DEDUPLICATION_ERROR] Error processing {file_path}: {str(e)}", exc_info=True)
# [PROCESSING] Применение политик хранения
# [DEBUG_ARCHIVE] Log files before retention policy
logger.debug(f"[DEBUG_ARCHIVE] Files with dates before retention policy: {[f.name for f, d in files_with_dates]}")
keep_files = apply_retention_policy(
files_with_dates,
daily_retention,
@@ -257,17 +277,26 @@ def archive_exports(
monthly_retention,
logger
)
# [DEBUG_ARCHIVE] Log files to keep after retention policy
logger.debug(f"[DEBUG_ARCHIVE] Files to keep after retention policy: {[f.name for f in keep_files]}")
# [CLEANUP] Удаление устаревших файлов
deleted_count = 0
for file, _ in files_with_dates:
# [DEBUG_ARCHIVE] Check file for deletion
logger.debug(f"[DEBUG_ARCHIVE] Checking file for deletion: {file.name}. Should keep: {file in keep_files}")
if file not in keep_files:
try:
# [DEBUG_ARCHIVE][FILE_REMOVED_ATTEMPT] Log deletion attempt
logger.info(f"[DEBUG_ARCHIVE][FILE_REMOVED_ATTEMPT] Attempting to delete archive: {file.name}")
file.unlink()
deleted_count += 1
logger.info(f"[FILE_REMOVED] Deleted archive: {file.name}")
except OSError as e:
logger.error(f"[FILE_ERROR] Error deleting {file.name}: {str(e)}", exc_info=True)
# [DEBUG_ARCHIVE][FILE_ERROR] Log deletion error
logger.error(f"[DEBUG_ARCHIVE][FILE_ERROR] Error deleting {file.name}: {str(e)}", exc_info=True)
logger.info(f"[ARCHIVE_RESULT] Cleanup completed. Deleted {deleted_count} archives.")

View File

@@ -1,9 +1,18 @@
# utils/logger.py
# [MODULE] Superset Tool Logger Utility
# @contract: Этот модуль предоставляет утилиту для настройки логирования в приложении.
# @semantic_layers:
# - [CONFIG]: Настройка логгера.
# - [UTILITY]: Вспомогательные функции.
# @coherence: Модуль должен быть семантически когерентен со стандартной библиотекой `logging`.
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
# [CONSTANTS]
class SupersetLogger:
def __init__(
self,
@@ -59,3 +68,38 @@ class SupersetLogger:
def exception(self, message: str):
self.logger.exception(message)
def setup_logger(name: str, level: int = logging.INFO) -> logging.Logger:
# [FUNCTION] setup_logger
# [CONTRACT]
"""
Настраивает и возвращает логгер с заданным именем и уровнем.
@pre:
- `name` является непустой строкой.
- `level` является допустимым уровнем логирования из модуля `logging`.
@post:
- Возвращает настроенный экземпляр `logging.Logger`.
- Логгер имеет StreamHandler, выводящий в sys.stdout.
- Форматтер логгера включает время, уровень, имя и сообщение.
@side_effects:
- Создает и добавляет StreamHandler к логгеру.
@invariant:
- Логгер с тем же именем всегда возвращает один и тот же экземпляр.
"""
# [CONFIG] Настройка логгера
# [COHERENCE_CHECK_PASSED] Логика настройки соответствует описанию.
logger = logging.getLogger(name)
logger.setLevel(level)
# Создание форматтера
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(message)s')
# Проверка наличия существующих обработчиков
if not logger.handlers:
# Создание StreamHandler для вывода в sys.stdout
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger