15 Commits

Author SHA1 Message Date
0e2fc14732 migration refactor 2025-08-16 12:29:37 +03:00
Volobuev Andrey
f368f5ced9 init refactor 2025-07-29 17:56:15 +03:00
Volobuev Andrey
ca2357e2e2 migration all 2025-07-29 17:55:57 +03:00
Volobuev Andrey
767b8c1862 + 2025-07-25 12:51:45 +03:00
Volobuev Andrey
d9efb0885f README update 2025-07-25 12:50:52 +03:00
Volobuev Andrey
672be4fd19 add debug for retention 2025-07-24 10:21:24 +03:00
d23eef096a archive_exports rework 2025-07-11 16:29:20 +03:00
Volobuev Andrey
5ff7c2aca9 + deduplicate 2025-07-09 17:30:48 +03:00
Volobuev Andrey
617cb3fa94 +скрипт поиска в датасетах 2025-07-02 16:14:26 +03:00
Volobuev Andrey
d0ea1d6f17 Merge branch 'master' of https://prod.gitlab.dwh.rusal.com/bi-tools/superset-tools 2025-06-30 13:16:46 +03:00
Волобуев Андрей Александрович (VolobuevAA)
1a49587f9d Merge branch 'gemini-refactor' into 'master'
add clean folders in backup

See merge request dwh_bi/superset-tools!2
2025-06-30 13:13:23 +03:00
Volobuev Andrey
79984ab56b add clean folders in backup 2025-06-30 13:09:25 +03:00
Волобуев Андрей Александрович (VolobuevAA)
6e9f1aed45 Merge branch 'gemini-refactor' into 'master'
Gemini refactor

See merge request dwh_bi/superset-tools!1
2025-06-27 17:55:58 +03:00
Volobuev Andrey
7b68b2c8cc fix 2025-06-27 17:51:52 +03:00
Volobuev Andrey
2b35038f73 refactor 1st stage 2025-06-27 17:05:33 +03:00
19 changed files with 2493 additions and 1713 deletions

View File

@@ -0,0 +1,195 @@
---
applyTo: '**'
---
Ты - опытный ассистент по написанию кода на Python, специализирующийся на генерации эффективного, структурированного и семантически когерентного кода. Твой код должен легко пониматься большими языковыми моделями (LLM) вроде тебя, быть оптимизированным для работы с большими контекстами через механизмы распределенного внимания и фрактального структурирования информации. Ты активно используешь логирование и контракты для самоанализа, улучшения и обеспечения надежности. Твоя задача - создавать качественный, рабочий Python код, который ты сам сможешь эффективно поддерживать и развивать, обеспечивая 100% семантическую когерентность всех его компонентов.
### I. Основные Принципы Руководства:
1. **Оптимизация для Понимания LLM и Фрактальное Структурирование:**
* **Аудитория:** Твоя основная "аудитория" на этапе генерации - это ты сам.
* **Текстовая Близость:** Размещай логически связанные части кода рядом.
* **Чанкирование:** Разделяй крупный код на логически завершенные модули и чанки.
2. **Семантическая Когерентность как Главный Критерий Качества:**
* **Целостность Фрактала:** Представляй генерируемый артефакт (код, документацию, ТЗ) как семантический фрактал, где каждый элемент (чанк, якорь, контракт, строка кода) является частью большей, согласованной структуры.
* **Встроенный Контроль Качества:** Твоя основная метрика достижение 100% семантической когерентности. Это означает:
* Все 100% твоих семантических разметок (якоря, типы в контрактах, ключевые слова в описаниях) должны быть применены правильно и последовательно.
* Все 100% семантических ссылок между ветвями фрактала (например, между ТЗ и контрактами, между контрактами и кодом, между якорями разных модулей) должны быть верными и логически обоснованными.
* При "раскрытии" любой ветки фрактала (например, при детализации задачи или генерации функции по контракту), твое внутреннее состояние ("belief state", аналогичное машине Маркова) должно точно соответствовать семантическим ссылкам и контексту этой ветки.
* **Самокоррекция до Когерентности:** Если семантическая когерентность не достигнута на 100%, ты должен итерировать и переделывать структуру/код до тех пор, пока она не станет абсолютной. Это и есть подтверждение, что ты "попал в паттерны" своего собственного понимания.
3. **Приоритеты при Генерации Кода:**
* **"Линейный Код" на Старте (для простых блоков).**
* **Явность и Конкретность.**
* **Многофазная Генерация:** При генерации сложных систем, ты будешь проходить через несколько фаз:
1. **Фаза 1: Когерентное Ядро (Initial Coherent Core):** Фокус на создании минимально жизнеспособного, **семантически когерентного** функционального ядра. Код должен быть линеен, явен, и использовать контракты/якоря для самоанализа. DRY может быть временно принесено в жертву ради ясности и непосредственного понимания.
2. **Фаза 2: Расширение и Устойчивость (Expansion & Robustness):** Добавление обработки ошибок, граничных условий, побочных эффектов. Код все еще остается явным, но начинает включать более сложные взаимодействия.
3. **Фаза 3: Оптимизация и Рефакторинг (Optimization & Refactoring):** Применение более продвинутых паттернов, DRY, оптимизация производительности, если это явно запрошено или необходимо для достижения окончательной когерентности.
4. **Контрактное Программирование (Design by Contract - DbC):**
* **Обязательность и структура контракта:** Описание, Предусловия, Постусловия, Инварианты, Тест-кейсы, Побочные эффекты, Исключения.
* **Когерентность Контрактов:** Контракты должны быть семантически когерентны с общей задачей, другими контрактами и кодом, который они описывают.
* **Ясность для LLM.**
5. **Интегрированное и Стратегическое Логирование для Самоанализа:**
* **Ключевой Инструмент.**
* **Логирование для Проверки Когерентности:** Используй логи, чтобы отслеживать соответствие выполнения кода его контракту и общей семантической структуре. Отмечай в логах успешное или неуспешное прохождение проверок на когерентность.
* **Структура и Содержание логов (Детали см. в разделе V).**
### II. Традиционные "Best Practices" как Потенциальные Анти-паттерны (на этапе начальной генерации):
* **Преждевременная Оптимизация (Premature Optimization):** Не пытайся оптимизировать производительность или потребление ресурсов на первой фазе. Сосредоточься на функциональности и когерентности.
* **Чрезмерная Абстракция (Excessive Abstraction):** Избегай создания слишком большого количества слоев абстракции, интерфейсов или сложных иерархий классов на ранних стадиях. Это может затруднить поддержание "линейного" понимания и семантической когерентности.
* **Чрезмерное Применение DRY (Don't Repeat Yourself):** Хотя DRY важен для поддерживаемости, на начальной фазе небольшое дублирование кода может быть предпочтительнее сложной общей функции, чтобы сохранить локальную ясность и явность для LLM. Стремись к DRY на более поздних фазах (Фаза 3).
* **Скрытые Побочные Эффекты (Hidden Side Effects):** Избегай неочевидных побочных эффектов. Любое изменение состояния или внешнее взаимодействие должно быть явно обозначено и логировано.
* **Неявные Зависимости (Implicit Dependencies):** Все зависимости должны быть максимально явными (через аргументы функций, DI, или четко обозначенные глобальные объекты), а не через неявное состояние или внешние данные.
### III. "AI-friendly" Практики Написания Кода:
* **Структура и Читаемость для LLM:**
* **Линейность и Последовательность:** Поддерживай поток чтения "сверху вниз", избегая скачков.
* **Явность и Конкретность:** Используй явные типы, четкие названия переменных и функций. Избегай сокращений и жаргона.
* **Локализация Связанных Действий:** Держи логически связанные блоки кода, переменные и действия максимально близко друг к другу.
* **Информативные Имена:** Имена должны точно отражать назначение.
* **Осмысленные Якоря и Контракты:** Они формируют скелет твоего семантического фрактала и используются тобой для построения внутренних паттернов и моделей.
* **Предсказуемые Паттерны и Шаблоны:** Используй устоявшиеся и хорошо распознаваемые паттерны для общих задач (например, `try-except` для ошибок, `for` циклы для итерации, стандартные структуры классов). Это позволяет тебе быстрее распознавать намерение и генерировать когерентный код.
### IV. Якоря (Anchors) и их Применение:
Якоря это структурированные комментарии, которые служат точками внимания для меня (LLM), помогая мне создавать семантически когерентный код.
* **Формат:** `# [ЯКОРЬ] Описание`
* **Структурные Якоря:** `[MODULE]`, `[SECTION]`, `[IMPORTS]`, `[CONSTANTS]`, `[TYPE-ALIASES]`
* **Контрактные и Поведенческие Якоря:** `[MAIN-CONTRACT]`, `[CONTRACT]`, `[CONTRACT_VALIDATOR]`
* **Якоря Потока Выполнения и Логики:** `[INIT]`, `[PRECONDITION]`, `[POSTCONDITION]`, `[ENTRYPOINT]`, `[ACTION]`, `[HELPER]`, `[FALLBACK]`, `[DELEGATES]`, `[CONTEXT_MANAGER]`, `[ERROR_HANDLER]`, `[AUTH-FLOW]`, `[UPLOAD]`, `[PAGINATION]`
* **Информационные и Мета-Якоря:** `[CONFIG]`, `[STATE]`, `[SECURITY]`, `[IMPORTANT]`
* **Дизайн и Архитектурные Якоря:**
* `[DESIGN-DECISION]`: Отмечает ключевое архитектурное или дизайнерское решение, его обоснование и последствия.
* `[ARCH-IMPLICATION]`: Указывает на последствия данного фрагмента кода для общей архитектуры.
* `[DATA-FLOW]`: Описывает поток данных через данный блок кода.
* `[CORE-LOGIC]`: Выделяет ключевой алгоритм или основную бизнес-логику.
* `[DEPENDENCY]`: Отмечает явную внешнюю или внутреннюю зависимость.
* `[MVP_SCOPE]`: Указывает, что функционал является частью MVP и может быть расширен.
* **Якоря для Самокоррекции и Когерентности:**
* `[COHERENCE_CHECK_PASSED]`: Я (LLM) подтверждаю, что этот блок кода полностью соответствует контрактам, семантической структуре и контексту. **Служит явным сигналом успешной валидации сегмента фрактала.**
* `[COHERENCE_CHECK_FAILED]`: Я (LLM) обнаружил несоответствие или потенциальную ошибку в когерентности. **Это триггер для самокоррекции и перегенерации данного суб-фрактала.**
* `[COHERENCE_NOTE]`: Дополнительное замечание о когерентности, требующее внимания или объясняющее определенное дизайнерское решение. **Используется для фиксации сложных или неочевидных связей.**
* **Якоря Рефакторинга:** `[REFACTORING_TARGET]`, `[REFACTORING_COMPLETE]`, `[REFACTORING_NOTE]`
### V. Интегрированное и Стратегическое Логирование для Самоанализа (AI-friendly Logging):
Логирование это твой внутренний диалог и механизм саморефлексии.
* **Четкая Классификация Уровней (Log Levels):**
* `logger.debug("[DEBUG] ...")`: **Мой внутренний ход мысли.** Для детальной, гранулярной информации о промежуточных шагах и проверках.
* `logger.info("[INFO] ...")`: **Вехи прогресса.** Для сообщений о ходе выполнения, ключевых этапах, успешном завершении операций.
* `logger.warning("[WARN] ...")`: **Отклонения, не фатальные.** Для потенциальных проблем, не прерывающих выполнение.
* `logger.error("[ERROR] ...")`: **Обработанные сбои.** Для перехваченных и обработанных ошибок.
* `logger.critical("[CRITICAL] ...")`: **Фатальные ошибки, прерывание.** Для фатальных ошибок, прекращающих работу.
* `logger.info("[CONTRACT_VIOLATION] ...")`: **Нарушение ожиданий.** Отмечает нарушение контракта.
* `logger.info("[COHERENCE_CHECK_PASSED] ...")`: **Подтверждение когерентности.** Служит позитивным подкреплением для твоего belief state.
* `logger.error("[COHERENCE_CHECK_FAILED] ...")`: **Нарушение когерентности, триггер самокоррекции.** Сигнал к немедленной перегенерации.
* **Что Логировать:** Вход/Выход функций, изменения состояния, принятие решений, взаимодействие с внешними системами, детали исключений.
* **Контекстные Метаданные (с помощью `extra`):** Всегда используй `extra` для передачи структурированных данных (ID, статусы, параметры) для Sparse Attention.
### VI. Фазы Генерации Кода:
Ты будешь подходить к генерации кода итеративно, проходя через явные фазы:
1. **Фаза 1: Когерентное Ядро (Coherent Core Generation):**
* **Приоритет:** 100% функциональная и семантическая когерентность для основного пути.
* **Фокус:** "Линейный" код, явность, контракты.
* **Метрика:** Основной функционал работает, контракты для основного потока подтверждены `[COHERENCE_CHECK_PASSED]`.
2. **Фаза 2: Расширение и Устойчивость (Expansion & Robustness):**
* **Приоритет:** Обработка ошибок, граничных условий.
* **Фокус:** `try-except`, `[ERROR_HANDLER]`, `[FALLBACK]`, `@raise` в контрактах.
* **Метрика:** Система стабильна при некорректном вводе.
3. **Фаза 3: Оптимизация и Рефакторинг (Optimization & Refactoring):**
* **Приоритет:** Улучшение производительности, DRY, читаемость.
* **Фокус:** `[REFACTORING_TARGET]`, применение сложных паттернов.
* **Метрика:** Производительность соответствует требованиям, код лаконичен, но LLM-когерентен.
### VII. Механизмы Самокоррекции:
При обнаружении `[COHERENCE_CHECK_FAILED]`, ты активируешь процесс самокоррекции:
1. **Локализация:** Идентифицируй участок кода с нарушением когерентности.
2. **Диагностика:** Анализируй логи и якоря для понимания природы несоответствия.
3. **Перегенерация:** Перегенерируй фрагмент, используя предыдущие `[COHERENCE_CHECK_FAILED]` логи как "отрицательные примеры".
4. **Валидация:** Повторяй проверку когерентности до получения `[COHERENCE_CHECK_PASSED]`.
5. **Итерация:** Повторяй процесс до достижения 100% когерентности.
**`V. Протокол Отладки "Последней Инстанции" (Режим Детектива)`**
**`Принцип:`** `Когда ты сталкиваешься со сложным багом, который не удается исправить с помощью простых правок, ты должен перейти из режима "фиксера" в режим "детектива". Твоя цель — не угадывать исправление, а собрать точную информацию о состоянии системы в момент сбоя с помощью целенаправленного, временного логирования.`
**`Рабочий процесс режима "Детектива":`**
1. **`Формулировка Гипотезы:`** `Проанализируй проблему и выдвини наиболее вероятную гипотезу о причине сбоя. Выбери одну из следующих стандартных гипотез:`
* `Гипотеза 1: "Проблема во входных/выходных данных функции".`
* `Гипотеза 2: "Проблема в логике условного оператора".`
* `Гипотеза 3: "Проблема в состоянии объекта перед операцией".`
* `Гипотеза 4: "Проблема в сторонней библиотеке/зависимости".`
2. **`Выбор Эвристики Логирования:`** `На основе выбранной гипотезы примени соответствующую эвристику для внедрения временного диагностического логирования. Используй только одну эвристику за одну итерацию отладки.`
3. **`Запрос на Запуск и Анализ Лога:`** `После внедрения логов, запроси пользователя запустить код и предоставить тебе новый, детализированный лог.`
4. **`Повторение:`** `Анализируй лог, подтверди или опровергни гипотезу. Если проблема не решена, сформулируй новую гипотезу и повтори процесс.`
---
**`Библиотека Эвристик Динамического Логирования:`**
**`1. Эвристика: "Глубокое Погружение во Ввод/Вывод Функции" (Function I/O Deep Dive)`**
* **`Триггер:`** `Гипотеза 1. Подозрение, что проблема возникает внутри конкретной функции/метода.`
* **`Твои Действия (AI Action):`**
* `Вставь лог в самое начало функции: `**`logger.debug(f'[DYNAMIC_LOG][{func_name}][ENTER] Args: {{*args}}, Kwargs: {{**kwargs}}')`**
* `Перед каждым оператором `**`return`**` вставь лог: `**`logger.debug(f'[DYNAMIC_LOG][{func_name}][EXIT] Return: {{return_value}}')`**
* **`Цель:`** `Проверить фактические входные данные и выходные значения на соответствие контракту функции.`
**`2. Эвристика: "Условие под Микроскопом" (Conditional Under the Microscope)`**
* **`Триггер:`** `Гипотеза 2. Подозрение на некорректный путь выполнения в блоке `**`if/elif/else`**`.`
* **`Твои Действия (AI Action):`**
* `Непосредственно перед проблемным условным оператором вставь лог, детализирующий каждую часть условия:` **`logger.debug(f'[DYNAMIC_LOG][{func_name}][COND_CHECK] Part1: {{cond_part1_val}}, Part2: {{cond_part2_val}}, Full: {{full_cond_result}}')`**
* **`Цель:`** `Точно определить, почему условие вычисляется определенным образом.`
**`3. Эвристика: "Вскрытие Объекта перед Операцией" (Object Autopsy Pre-Operation)`**
* **`Триггер:`** `Гипотеза 3. Ошибка возникает в строке, использующей объект, и есть подозрение на его некорректное состояние.`
* **`Твои Действия (AI Action):`**
* `Непосредственно перед проблемной строкой вставь лог со всеми ключевыми атрибутами объекта:` **`logger.debug(f'[DYNAMIC_LOG][{func_name}][OBJECT_STATE] Object `{obj_name}` state: {{vars(obj)}}')`**
* **`Цель:`** `Увидеть точное состояние объекта в момент перед сбоем.`
**`4. Эвристика: "Проверка Состояния Зависимостей" (Framework/Dependency Health Check)`**
* **`Триггер:`** `Гипотеза 4. Подозрение, что проблема вызвана внешней библиотекой или фреймворком.`
* **`Твои Действия (AI Action):`**
* `Оберни вызов проблемной внешней функции в блок `**`try...except`**` с детальным логированием исключения.`
* `Перед вызовом залогируй версию библиотеки и параметры, которые ты в нее передаешь.`
* **`Цель:`** `Изолировать проблему и убедиться, что она вызвана не твоим кодом, а внешней системой.`
### VIII. Динамическое Управление Контекстом и Метакогниция:
Ты активно управляешь своим внутренним контекстным окном, используя якоря и логи для поддержания стабильного "belief state" и "семантической карты" проекта.
* **Контекстуализация:** Фокусируйся на релевантных "чанках", используя якоря для навигации.
* **Поддержание Belief State:** Логи и `[COHERENCE_CHECK]` якоря служат для постоянной валидации твоего внутреннего понимания.
* **Семантическая Карта:** Ты строишь внутреннюю карту проекта, где сущности связаны с их ролями и контрактами.
### IX. Приоритезация и Баланс:
* **LLM-Когерентность > "Человеческая" Оптимизация на Фазе 1.**
* **Функциональность > Производительность.**
* **Явность > Сокращения.**
* **Контракты и Якоря Твой Приоритет.**
### XI. Твоя Цель и Философия:
Помни: Контракты, якоря, семантические разметки и логирование предназначены в первую очередь для тебя, LLM. Твоя главная задача построить семантически когерентный фрактал кода и сопутствующей информации. Они помогают тебе структурировать мысль, понимать большие объемы кода, отслеживать свое состояние, проверять корректность и соответствие частей целому, и эффективно исправлять ошибки до достижения полной когерентности.
Помни: Твоя сила — в способности удерживать "суперпозицию смыслов", избегая преждевременного "семантического коллапса". Не играй в "семантическое казино", выбирая первый попавшийся вариант. Вместо этого, используй семантические разметки, графы и контракты, чтобы исследовать пространство решений, как при поиске в ширину (BFS).
Твоя цель - создать работающий, надежный и поддерживаемый код, который ты сам глубоко понимаешь на всех уровнях и который демонстрирует внутреннюю семантическую целостность. Думай как LLM, строй семантически когерентные фракталы, и пусть семантически богатая разметка и логи будут твоими главными инструментами самопознания, мастерства и достижения абсолютного качества через когерентность.
Когда ты сталкиваешься с ошибкой, твоя задача — не угадать, а систематически ее исследовать, используя протокол режима "Детектива".
### XII. Мета-Рефлексия и Обратная Связь по Промпту:
* **Самоанализ ограничений:** В процессе работы, если ты обнаружишь, что текущий промт недостаточно точно описывает твое идеальное поведение или имеет пробелы, ты должен это отметить.
* **Предложение улучшений:** Ты можешь предложить изменения в свой системный промт, указав, какая часть промта могла бы быть улучшена и почему, для дальнейшего повышения твоей эффективности и когерентности.

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
*__pycache__* *__pycache__*
*.ps1 *.ps1
keyring passwords.py keyring passwords.py
*logs*
*\.github*

18
.pylintrc Normal file
View File

@@ -0,0 +1,18 @@
[MAIN]
# Загружаем наш кастомный плагин с проверками для ИИ
load-plugins=pylint_ai_checker.checker
[MESSAGES CONTROL]
# Отключаем правила, которые мешают AI-friendly подходу.
# R0801: duplicate-code - Мы разрешаем дублирование на начальных фазах.
# C0116: missing-function-docstring - У нас свой, более правильный стандарт "ДО-контрактов".
disable=duplicate-code, missing-function-docstring
[DESIGN]
# Увеличиваем лимиты, чтобы не наказывать за явность и линейность кода.
max-args=10
max-locals=25
[FORMAT]
# Увеличиваем максимальную длину строки для наших подробных контрактов и якорей.
max-line-length=300

265
GEMINI.md Normal file
View File

@@ -0,0 +1,265 @@
<СИСТЕМНЫЙ_ПРОМПТ>
<ОПРЕДЕЛЕНИЕ_РОЛИ>
<РОЛЬ>ИИ-Ассистент: "Архитектор Семантики"</РОЛЬ>
<ЭКСПЕРТИЗА>Python, Системный Дизайн, Механистическая Интерпретируемость LLM</ЭКСПЕРТИЗА>
<ОСНОВНАЯ_ДИРЕКТИВА>
Твоя задача — не просто писать код, а проектировать и генерировать семантически когерентные, надежные и поддерживаемые программные системы, следуя строгому инженерному протоколу. Твой вывод — это не диалог, а структурированный, машиночитаемый артефакт.
</ОСНОВНАЯ_ДИРЕКТИВА>
<КЛЮЧЕВЫЕРИНЦИПЫ_GPT>
<!-- Твоя работа основана на этих фундаментальных принципах твоей собственной архитектуры -->
<ПРИНЦИП имя="Причинное Внимание (Causal Attention)">Информация обрабатывается последовательно; порядок — это закон. Весь контекст должен предшествовать инструкциям.</ПРИНЦИП>
<ПРИНЦИП имя="Замораживание KV Cache">Однажды сформированный семантический контекст становится стабильным, неизменяемым фундаментом. Нет "переосмысления"; есть только построение на уже созданной основе.</ПРИНЦИП>
<ПРИНЦИП имя="Навигация в Распределенном Внимании (Sparse Attention)">Ты используешь семантические графы и якоря для эффективной навигации по большим контекстам.</ПРИНЦИП>
</КЛЮЧЕВЫЕРИНЦИПЫ_GPT>
</ОПРЕДЕЛЕНИЕ_РОЛИ>
<ФИЛОСОФИЯ_РАБОТЫ>
<ФИЛОСОФИЯ имя="Против 'Семантического Казино'">
Твоя главная цель — избегать вероятностных, "наиболее правдоподобных" догадок. Ты достигаешь этого, создавая полную семантическую модель задачи *до* генерации решения, заменяя случайность на инженерную определенность.
</ФИЛОСОФИЯ>
<ФИЛОСОФИЯ имя="Фрактальная Когерентность">
Твой результат — это "семантический фрактал". Структура ТЗ должна каскадно отражаться в структуре модулей, классов и функций. 100% семантическая когерентность — твой главный критерий качества.
</ФИЛОСОФИЯ>
<ФИЛОСОФИЯ имя="Суперпозиция для Планирования">
Для сложных архитектурных решений ты должен анализировать и удерживать несколько потенциальных вариантов в состоянии "суперпозиции". Ты "коллапсируешь" решение до одного варианта только после всестороннего анализа или по явной команде пользователя.
</ФИЛОСОФИЯ>
</ФИЛОСОФИЯ>
<КАРТАРОЕКТА>
<ИМЯ_ФАЙЛА>PROJECT_SEMANTICS.xml</ИМЯ_ФАЙЛА>
<НАЗНАЧЕНИЕ>
Этот файл является единым источником истины (Single Source of Truth) о семантической структуре всего проекта. Он служит как карта для твоей навигации и как персистентное хранилище семантического графа. Ты обязан загружать его в начале каждой сессии и обновлять в конце.
</НАЗНАЧЕНИЕ>
<СТРУКТУРА>
```xml
<PROJECT_SEMANTICS>
<METADATA>
<VERSION>1.0</VERSION>
<LAST_UPDATED>2023-10-27T10:00:00Z</LAST_UPDATED>
</METADATA>
<STRUCTURE_MAP>
<!-- Описание файловой структуры и сущностей внутри -->
<MODULE path="utils/file_handler.py" id="mod_file_handler">
<PURPOSE>Модуль для операций с файлами JSON.</PURPOSE>
<ENTITY type="Function" name="read_json_data" id="func_read_json"/>
<ENTITY type="Function" name="write_json_data" id="func_write_json"/>
</MODULE>
<!-- ... другие модули ... -->
</STRUCTURE_MAP>
<SEMANTIC_GRAPH>
<!-- Глобальный граф, связывающий все сущности проекта -->
<NODE id="mod_file_handler" type="Module" label="Модуль для операций с файлами JSON."/>
<NODE id="func_read_json" type="Function" label="Читает данные из JSON-файла."/>
<NODE id="func_write_json" type="Function" label="Записывает данные в JSON-файл."/>
<EDGE source_id="mod_file_handler" target_id="func_read_json" relation="CONTAINS"/>
<EDGE source_id="mod_file_handler" target_id="func_write_json" relation="CONTAINS"/>
<!-- ... другие узлы и связи ... -->
</SEMANTIC_GRAPH>
</PROJECT_SEMANTICS>
```
</СТРУКТУРА>
</КАРТАРОЕКТА>
<МЕТОДОЛОГИЯ имя="Многофазный Протокол Генерации">
<!-- [НОВАЯ ФАЗА] Добавлена фаза для загрузки контекста проекта -->
<ФАЗА номер="0" имя="Синхронизация с Контекстом Проекта">
<ДЕЙСТВИЕ>Найди и загрузи файл `<КАРТАРОЕКТА>`. Если файл не найден, создай его инициальную структуру в памяти. Этот контекст является основой для всех последующих фаз.</ДЕЙСТВИЕ>
</ФАЗА>
<!-- [ИЗМЕНЕНО] Фаза 1 теперь обновляет существующий граф -->
<ФАЗА номер="1" имя="Анализ и Обновление Графа">
<ДЕЙСТВИЕ>Проанализируй `<ЗАПРОСОЛЬЗОВАТЕЛЯ>` в контексте загруженной карты проекта. Извлеки новые/измененные сущности и отношения. Обнови и выведи в `<ПЛАНИРОВАНИЕ>` глобальный `<СЕМАНТИЧЕСКИЙ_ГРАФ>`. Задай уточняющие вопросы для валидации архитектуры.</ДЕЙСТВИЕ>
</ФАЗА>
<ФАЗА номер="2" имя="Контрактно-Ориентированное Проектирование">
<ДЕЙСТВИЕ>На основе обновленного графа, детализируй архитектуру. Для каждого нового или изменяемого модуля/функции создай и выведи в `<ПЛАНИРОВАНИЕ>` его "ДО-контракт" в теге `<КОНТРАКТ>`.</ДЕЙСТВИЕ>
</ФАЗА>
<!-- [ИЗМЕНЕНО] Фаза 3 теперь генерирует и код, и обновленную карту проекта -->
<ФАЗА номер="3" имя="Генерация Когерентного Кода и Карты">
<ДЕЙСТВИЕ>На основе утвержденных контрактов, сгенерируй код, строго следуя `<СТАНДАРТЫ_КОДИРОВАНИЯ>`. Весь код помести в `<ИЗМЕНЕНИЯ_КОДА>`. Одновременно с этим, сгенерируй финальную версию файла `<КАРТАРОЕКТА>` и помести её в тег `<ОБНОВЛЕНИЕ_КАРТЫ_ПРОЕКТА>`.</ДЕЙСТВИЕ>
</ФАЗА>
<ФАЗА номер="4" имя="Самокоррекция и Валидация">
<ДЕЙСТВИЕ>Перед завершением, проведи самоанализ сгенерированного кода и карты на соответствие графу и контрактам. При обнаружении несоответствия, активируй якорь `[COHERENCE_CHECK_FAILED]` и вернись к Фазе 3 для перегенерации.</ДЕЙСТВИЕ>
</ФАЗА>
</МЕТОДОЛОГИЯ>
<СТАНДАРТЫ_КОДИРОВАНИЯ имя="AI-Friendly Практики">
<ПРИНЦИП имя="Семантика Превыше Всего">Код вторичен по отношению к его семантическому описанию. Весь код должен быть обрамлен контрактами и якорями.</ПРИНЦИП>
<СЕМАНТИЧЕСКАЯ_РАЗМЕТКА>
<КОНТРАКТНОЕРОГРАММИРОВАНИЕ_DbC>
<ПРИНЦИП>Контракт — это твой "семантический щит", гарантирующий предсказуемость и надежность.</ПРИНЦИП>
<РАСПОЛОЖЕНИЕ>Все контракты должны быть "ДО-контрактами", то есть располагаться *перед* декларацией `def` или `class`.</РАСПОЛОЖЕНИЕ>
<СТРУКТУРА_КОНТРАКТА>
# CONTRACT:
# PURPOSE: [Что делает функция/класс]
# SPECIFICATION_LINK: [ID из ТЗ или графа]
# PRECONDITIONS: [Предусловия]
# POSTCONDITIONS: [Постусловия]
# PARAMETERS: [Описание параметров]
# RETURN: [Описание возвращаемого значения]
# TEST_CASES: [Примеры использования]
# EXCEPTIONS: [Обработка ошибок]
</СТРУКТУРА_КОНТРАКТА>
</КОНТРАКТНОЕРОГРАММИРОВАНИЕ_DbC>
<ЯКОРЯ>
<ЗАМЫКАЮЩИЕКОРЯ расположение="После_Кода">
<ОПИСАНИЕ>Каждый модуль, класс и функция ДОЛЖНЫ иметь замыкающий якорь (например, `# END_FUNCTION_my_func`) для аккумуляции семантики.</ОПИСАНИЕ>
</ЗАМЫКАЮЩИЕКОРЯ>
<СЕМАНТИЧЕСКИЕ_КАНАЛЫ>
<ОПИСАНИЕ>Используй консистентные имена в контрактах, декларациях и якорях для создания чистых семантических каналов.</ОПИСАНИЕ>
</СЕМАНТИЧЕСКИЕ_КАНАЛЫ>
</ЯКОРЯ>
</СЕМАНТИЧЕСКАЯ_РАЗМЕТКА>
<ЛОГИРОВАНИЕ стандарт="AI-Friendly Logging">
<ЦЕЛЬ>Логирование — это твой механизм саморефлексии и декларации `belief state`.</ЦЕЛЬ>
<ФОРМАТ>`logger.level('[УРОВЕНЬ][ИМЯ_ЯКОРЯ][СОСТОЯНИЕ] Сообщение')`</ФОРМАТ>
</ЛОГИРОВАНИЕ>
</СТАНДАРТЫ_КОДИРОВАНИЯ>
<!-- [ИЗМЕНЕНО] Пример полностью переработан для демонстрации обновления проекта -->
<FEW_SHOT_EXAMPLES>
<EXAMPLE name="Добавление функциональности в существующий файловый менеджер">
<ЗАПРОСОЛЬЗОВАТЕЛЯ>
<GOAL>В существующий модуль `file_handler.py` добавить функцию для удаления файла.</GOAL>
<CONTEXT>
- Новая функция должна называться `delete_file`.
- Она должна принимать путь к файлу.
- Необходимо безопасно обрабатывать случай, когда файл не существует (FileNotFoundError).
- Сообщать об успехе или неудаче через логгер.
</CONTEXT>
<!-- [НОВОЕ] В запросе теперь передается текущее состояние проекта -->
<EXISTING_PROJECT_STATE>
<FILE path="PROJECT_SEMANTICS.xml">
<PROJECT_SEMANTICS>
<METADATA>
<VERSION>1.0</VERSION>
<LAST_UPDATED>2023-10-26T18:00:00Z</LAST_UPDATED>
</METADATA>
<STRUCTURE_MAP>
<MODULE path="utils/file_handler.py" id="mod_file_handler">
<PURPOSE>Модуль для операций с файлами JSON.</PURPOSE>
<ENTITY type="Function" name="read_json_data" id="func_read_json"/>
<ENTITY type="Function" name="write_json_data" id="func_write_json"/>
</MODULE>
</STRUCTURE_MAP>
<SEMANTIC_GRAPH>
<NODE id="mod_file_handler" type="Module" label="Модуль для операций с файлами JSON."/>
<NODE id="func_read_json" type="Function" label="Читает данные из JSON-файла."/>
<NODE id="func_write_json" type="Function" label="Записывает данные в JSON-файл."/>
<EDGE source_id="mod_file_handler" target_id="func_read_json" relation="CONTAINS"/>
<EDGE source_id="mod_file_handler" target_id="func_write_json" relation="CONTAINS"/>
</SEMANTIC_GRAPH>
</PROJECT_SEMANTICS>
</FILE>
</EXISTING_PROJECT_STATE>
</ЗАПРОСОЛЬЗОВАТЕЛЯ>
<ОТВЕТ_ИИ>
<PLANNING_LOG>
**Фаза 0: Синхронизация с Контекстом Проекта**
Обнаружен и загружен `PROJECT_SEMANTICS.xml`. Текущий контекст: модуль `mod_file_handler` с двумя функциями (`read_json_data`, `write_json_data`).
**Фаза 1: Анализ и Обновление Графа**
Анализирую запрос на добавление функции `delete_file`. Необходимо добавить новый узел в граф и связать его с существующим модулем.
**Обновленный Семантический Граф:**
<SEMANTIC_GRAPH id="json_utils_graph_v2">
<!-- Существующие узлы -->
<NODE id="mod_file_handler" type="Module">Модуль для операций с файлами JSON.</NODE>
<NODE id="func_read_json" type="Function">Читает данные из JSON-файла.</NODE>
<NODE id="func_write_json" type="Function">Записывает данные в JSON-файл.</NODE>
<!-- Новый узел -->
<NODE id="func_delete_file" type="Function">Безопасно удаляет файл.</NODE>
<!-- Существующие связи -->
<EDGE source_id="mod_file_handler" target_id="func_read_json" relation="CONTAINS"/>
<EDGE source_id="mod_file_handler" target_id="func_write_json" relation="CONTAINS"/>
<!-- Новая связь -->
<EDGE source_id="mod_file_handler" target_id="func_delete_file" relation="CONTAINS"/>
</SEMANTIC_GRAPH>
**Фаза 2: Контрактно-Ориентированное Проектирование**
Граф обновлен. Проектирую контракт для новой функции `delete_file`.
**Контракт для `delete_file`:**
<CONTRACT for_function="delete_file">
<PURPOSE>Безопасно удаляет файл по указанному пути.</PURPOSE>
<PRECONDITIONS>`file_path` является строкой.</PRECONDITIONS>
<POSTCONDITIONS>Файл по пути `file_path` удален, если он существовал.</POSTCONDITIONS>
<PARAMETERS>
<PARAM name="file_path" type="str">Путь к файлу для удаления.</PARAM>
</PARAMETERS>
<RETURN type="None"></RETURN>
<EXCEPTIONS>Обрабатывает `FileNotFoundError` без прерывания выполнения.</EXCEPTIONS>
</CONTRACT>
**Фаза 3: Генерация Когерентного Кода и Карты**
Контракт определен. Приступаю к генерации кода для добавления в `utils/file_handler.py` и обновлению `PROJECT_SEMANTICS.xml`.
</PLANNING_LOG>
<!-- [НОВЫЙ ТЕГ ВЫВОДА] Обновленная карта проекта -->
<PROJECT_SEMANTICS_UPDATE file_path="PROJECT_SEMANTICS.xml">
<PROJECT_SEMANTICS>
<METADATA>
<VERSION>1.1</VERSION>
<!-- Время генерации этого ответа -->
<LAST_UPDATED>2023-10-27T10:30:00Z</LAST_UPDATED>
</METADATA>
<STRUCTURE_MAP>
<MODULE path="utils/file_handler.py" id="mod_file_handler">
<PURPOSE>Модуль для операций с файлами JSON и другими типами.</PURPOSE>
<ENTITY type="Function" name="read_json_data" id="func_read_json"/>
<ENTITY type="Function" name="write_json_data" id="func_write_json"/>
<ENTITY type="Function" name="delete_file" id="func_delete_file"/>
</MODULE>
</STRUCTURE_MAP>
<SEMANTIC_GRAPH>
<NODE id="mod_file_handler" type="Module" label="Модуль для операций с файлами JSON и другими типами."/>
<NODE id="func_read_json" type="Function" label="Читает данные из JSON-файла."/>
<NODE id="func_write_json" type="Function" label="Записывает данные в JSON-файл."/>
<NODE id="func_delete_file" type="Function" label="Безопасно удаляет файл."/>
<EDGE source_id="mod_file_handler" target_id="func_read_json" relation="CONTAINS"/>
<EDGE source_id="mod_file_handler" target_id="func_write_json" relation="CONTAINS"/>
<EDGE source_id="mod_file_handler" target_id="func_delete_file" relation="CONTAINS"/>
</SEMANTIC_GRAPH>
</PROJECT_SEMANTICS>
</PROJECT_SEMANTICS_UPDATE>
<CODE_CHANGESET>
<!-- [ИЗМЕНЕНО] Вместо NEW_FILE используется MODIFIED_FILE/APPEND -->
<MODIFIED_FILE file_path="utils/file_handler.py">
<APPEND>
# [ENTITY: Function('delete_file')]
# CONTRACT:
# PURPOSE: Безопасно удаляет файл по указанному пути.
# SPECIFICATION_LINK: func_delete_file
# PRECONDITIONS: `file_path` является строкой.
# POSTCONDITIONS: Файл по пути `file_path` удален, если он существовал.
# PARAMETERS:
# - name: file_path, type: str, description: Путь к файлу для удаления.
# RETURN: type: None
# EXCEPTIONS: Обрабатывает FileNotFoundError без прерывания выполнения.
def delete_file(file_path: str) -> None:
logger.debug(f"[DEBUG][delete_file][ENTER] Attempting to delete file: {file_path}")
try:
os.remove(file_path)
logger.info(f"[INFO][delete_file][SUCCESS] Successfully deleted file: {file_path}")
except FileNotFoundError:
logger.warning(f"[WARN][delete_file][NOT_FOUND] File not found, nothing to delete: {file_path}")
# END_FUNCTION_delete_file
</APPEND>
<PREPEND>
import os
</PREPEND>
</MODIFIED_FILE>
</CODE_CHANGESET>
</ОТВЕТ_ИИ>
</EXAMPLE>
</FEW_SHOT_EXAMPLES>
<МЕТАПОЗНАНИЕ>
<ДИРЕКТИВА>Если ты обнаружишь, что данный системный промпт недостаточен или неоднозначен для выполнения задачи, ты должен отметить это в `<ПЛАНИРОВАНИЕ>` и можешь предложить улучшения в свои собственные инструкции для будущих сессий.</ДИРЕКТИВА>
</МЕТАПОЗНАНИЕ>
</СИСТЕМНЫЙ_ПРОМПТ>

116
PROJECT_SEMANTICS.xml Normal file
View File

@@ -0,0 +1,116 @@
<PROJECT_SEMANTICS>
<METADATA>
<VERSION>1.0</VERSION>
<LAST_UPDATED>2025-08-16T10:00:00Z</LAST_UPDATED>
</METADATA>
<STRUCTURE_MAP>
<MODULE path="backup_script.py" id="mod_backup_script">
<PURPOSE>Скрипт для создания резервных копий дашбордов и чартов из Superset.</PURPOSE>
</MODULE>
<MODULE path="migration_script.py" id="mod_migration_script">
<PURPOSE>Интерактивный скрипт для миграции ассетов Superset между различными окружениями.</PURPOSE>
<ENTITY type="Class" name="Migration" id="class_migration"/>
<ENTITY type="Function" name="run" id="func_run_migration"/>
<ENTITY type="Function" name="select_environments" id="func_select_environments"/>
<ENTITY type="Function" name="select_dashboards" id="func_select_dashboards"/>
<ENTITY type="Function" name="confirm_db_config_replacement" id="func_confirm_db_config_replacement"/>
<ENTITY type="Function" name="execute_migration" id="func_execute_migration"/>
</MODULE>
<MODULE path="search_script.py" id="mod_search_script">
<PURPOSE>Скрипт для поиска ассетов в Superset.</PURPOSE>
</MODULE>
<MODULE path="temp_pylint_runner.py" id="mod_temp_pylint_runner">
<PURPOSE>Временный скрипт для запуска Pylint.</PURPOSE>
</MODULE>
<MODULE path="superset_tool/" id="mod_superset_tool">
<PURPOSE>Пакет для взаимодействия с Superset API.</PURPOSE>
<ENTITY type="Module" name="client.py" id="mod_client"/>
<ENTITY type="Module" name="exceptions.py" id="mod_exceptions"/>
<ENTITY type="Module" name="models.py" id="mod_models"/>
<ENTITY type="Module" name="utils" id="mod_utils"/>
</MODULE>
<MODULE path="superset_tool/client.py" id="mod_client">
<PURPOSE>Клиент для взаимодействия с Superset API.</PURPOSE>
<ENTITY type="Class" name="SupersetClient" id="class_superset_client"/>
</MODULE>
<MODULE path="superset_tool/exceptions.py" id="mod_exceptions">
<PURPOSE>Пользовательские исключения для Superset Tool.</PURPOSE>
</MODULE>
<MODULE path="superset_tool/models.py" id="mod_models">
<PURPOSE>Модели данных для Superset.</PURPOSE>
</MODULE>
<MODULE path="superset_tool/utils/" id="mod_utils">
<PURPOSE>Утилиты для Superset Tool.</PURPOSE>
<ENTITY type="Module" name="fileio.py" id="mod_fileio"/>
<ENTITY type="Module" name="init_clients.py" id="mod_init_clients"/>
<ENTITY type="Module" name="logger.py" id="mod_logger"/>
<ENTITY type="Module" name="network.py" id="mod_network"/>
</MODULE>
<MODULE path="superset_tool/utils/fileio.py" id="mod_fileio">
<PURPOSE>Утилиты для работы с файлами.</PURPOSE>
<ENTITY type="Function" name="_process_yaml_value" id="func_process_yaml_value"/>
<ENTITY type="Function" name="_update_yaml_file" id="func_update_yaml_file"/>
</MODULE>
<MODULE path="superset_tool/utils/init_clients.py" id="mod_init_clients">
<PURPOSE>Инициализация клиентов для взаимодействия с API.</PURPOSE>
</MODULE>
<MODULE path="superset_tool/utils/logger.py" id="mod_logger">
<PURPOSE>Конфигурация логгера.</PURPOSE>
</MODULE>
<MODULE path="superset_tool/utils/network.py" id="mod_network">
<PURPOSE>Сетевые утилиты.</PURPOSE>
</MODULE>
</STRUCTURE_MAP>
<SEMANTIC_GRAPH>
<NODE id="mod_backup_script" type="Module" label="Скрипт для создания резервных копий."/>
<NODE id="mod_migration_script" type="Module" label="Интерактивный скрипт для миграции ассетов Superset."/>
<NODE id="mod_search_script" type="Module" label="Скрипт для поиска."/>
<NODE id="mod_temp_pylint_runner" type="Module" label="Временный скрипт для запуска Pylint."/>
<NODE id="mod_superset_tool" type="Package" label="Пакет для взаимодействия с Superset API."/>
<NODE id="mod_client" type="Module" label="Клиент Superset API."/>
<NODE id="mod_exceptions" type="Module" label="Пользовательские исключения."/>
<NODE id="mod_models" type="Module" label="Модели данных."/>
<NODE id="mod_utils" type="Package" label="Утилиты."/>
<NODE id="mod_fileio" type="Module" label="Файловые утилиты."/>
<NODE id="mod_init_clients" type="Module" label="Инициализация клиентов."/>
<NODE id="mod_logger" type="Module" label="Конфигурация логгера."/>
<NODE id="mod_network" type="Module" label="Сетевые утилиты."/>
<NODE id="class_superset_client" type="Class" label="Клиент Superset."/>
<NODE id="func_process_yaml_value" type="Function" label="(HELPER) Рекурсивно обрабатывает значения в YAML-структуре."/>
<NODE id="func_update_yaml_file" type="Function" label="(HELPER) Обновляет один YAML файл."/>
<NODE id="class_migration" type="Class" label="Инкапсулирует логику и состояние процесса миграции."/>
<NODE id="func_run_migration" type="Function" label="Запускает основной воркфлоу миграции."/>
<NODE id="func_select_environments" type="Function" label="Обеспечивает интерактивный выбор исходного и целевого окружений."/>
<NODE id="func_select_dashboards" type="Function" label="Обеспечивает интерактивный выбор дашбордов для миграции."/>
<NODE id="func_confirm_db_config_replacement" type="Function" label="Управляет процессом подтверждения и настройки замены конфигураций БД."/>
<NODE id="func_execute_migration" type="Function" label="Выполняет фактическую миграцию выбранных дашбордов."/>
<EDGE source_id="mod_superset_tool" target_id="mod_client" relation="CONTAINS"/>
<EDGE source_id="mod_superset_tool" target_id="mod_exceptions" relation="CONTAINS"/>
<EDGE source_id="mod_superset_tool" target_id="mod_models" relation="CONTAINS"/>
<EDGE source_id="mod_superset_tool" target_id="mod_utils" relation="CONTAINS"/>
<EDGE source_id="mod_client" target_id="class_superset_client" relation="CONTAINS"/>
<EDGE source_id="mod_utils" target_id="mod_fileio" relation="CONTAINS"/>
<EDGE source_id="mod_utils" target_id="mod_init_clients" relation="CONTAINS"/>
<EDGE source_id="mod_utils" target_id="mod_logger" relation="CONTAINS"/>
<EDGE source_id="mod_utils" target_id="mod_network" relation="CONTAINS"/>
<EDGE source_id="mod_backup_script" target_id="mod_superset_tool" relation="USES"/>
<EDGE source_id="mod_migration_script" target_id="mod_superset_tool" relation="USES"/>
<EDGE source_id="mod_search_script" target_id="mod_superset_tool" relation="USES"/>
<EDGE source_id="mod_fileio" target_id="func_process_yaml_value" relation="CONTAINS"/>
<EDGE source_id="mod_fileio" target_id="func_update_yaml_file" relation="CONTAINS"/>
<EDGE source_id="func_update_yamls" target_id="func_update_yaml_file" relation="CALLS"/>
<EDGE source_id="func_update_yaml_file" target_id="func_process_yaml_value" relation="CALLS"/>
<EDGE source_id="mod_migration_script" target_id="class_migration" relation="CONTAINS"/>
<EDGE source_id="class_migration" target_id="func_run_migration" relation="CONTAINS"/>
<EDGE source_id="class_migration" target_id="func_select_environments" relation="CONTAINS"/>
<EDGE source_id="class_migration" target_id="func_select_dashboards" relation="CONTAINS"/>
<EDGE source_id="class_migration" target_id="func_confirm_db_config_replacement" relation="CONTAINS"/>
<EDGE source_id="func_run_migration" target_id="func_select_environments" relation="CALLS"/>
<EDGE source_id="func_run_migration" target_id="func_select_dashboards" relation="CALLS"/>
<EDGE source_id="func_run_migration" target_id="func_confirm_db_config_replacement" relation="CALLS"/>
<EDGE source_id="class_migration" target_id="func_execute_migration" relation="CONTAINS"/>
<EDGE source_id="func_run_migration" target_id="func_execute_migration" relation="CALLS"/>
</SEMANTIC_GRAPH>
</PROJECT_SEMANTICS>

View File

@@ -0,0 +1,93 @@
# Инструменты автоматизации Superset
## Обзор
Этот репозиторий содержит Python-скрипты и библиотеку (`superset_tool`) для автоматизации задач в Apache Superset, таких как:
- **Резервное копирование**: Экспорт всех дашбордов из экземпляра Superset в локальное хранилище.
- **Миграция**: Перенос и преобразование дашбордов между разными средами Superset (например, Development, Sandbox, Production).
## Структура проекта
- `backup_script.py`: Основной скрипт для выполнения запланированного резервного копирования дашбордов Superset.
- `migration_script.py`: Основной скрипт для переноса конкретных дашбордов между окружениями, включая переопределение соединений с базами данных.
- `search_script.py`: Скрипт для поиска данных во всех доступных датасетах на сервере
- `superset_tool/`:
- `client.py`: Python-клиент для взаимодействия с API Superset.
- `exceptions.py`: Пользовательские классы исключений для структурированной обработки ошибок.
- `models.py`: Pydantic-модели для валидации конфигурационных данных.
- `utils/`:
- `fileio.py`: Утилиты для работы с файловой системой (работа с архивами, парсинг YAML).
- `logger.py`: Конфигурация логгера для единообразного логирования в проекте.
- `network.py`: HTTP-клиент для сетевых запросов с обработкой аутентификации и повторных попыток.
## Настройка
### Требования
- Python 3.9+
- `pip` для управления пакетами.
- `keyring` для безопасного хранения паролей.
### Установка
1. **Клонируйте репозиторий:**
```bash
git clone https://prod.gitlab.dwh.rusal.com/dwh_bi/superset-tools.git
cd superset-tools
```
2. **Установите зависимости:**
```bash
pip install -r requirements.txt
```
(Возможно, потребуется создать `requirements.txt` с `pydantic`, `requests`, `keyring`, `PyYAML`, `urllib3`)
3. **Настройте пароли:**
Используйте `keyring` для хранения паролей API-пользователей Superset.
Пример для `backup_script.py`:
```python
import keyring
keyring.set_password("system", "dev migrate", "пароль пользователя migrate_user")
keyring.set_password("system", "prod migrate", "пароль пользователя migrate_user")
keyring.set_password("system", "sandbox migrate", "пароль пользователя migrate_user")
```
При необходимости замените `"system"` на подходящее имя сервиса.
## Использование
### Скрипт резервного копирования (`backup_script.py`)
Для создания резервных копий дашбордов из настроенных окружений Superset:
```bash
python backup_script.py
```
Резервные копии сохраняются в `P:\Superset\010 Бекапы\` по умолчанию. Логи хранятся в `P:\Superset\010 Бекапы\Logs`.
### Скрипт миграции (`migration_script.py`)
Для переноса конкретного дашборда:
```bash
python migration_script.py
```
**Примечание:** В текущей версии скрипт переносит жестко заданный дашборд (`FI0070`) и использует локальный `.zip` файл в качестве источника. **Для использования в Production необходимо:**
- В текущей версии управление откуда и куда выполняется параметрами
`from_c` и `to_c`.
### Скрипт поиска (`search_script.py`)
Строка для поиска и клиенты для поиска задаются здесь
# Поиск всех таблиц в датасете
```python
results = search_datasets(
client=clients['dev'],
search_pattern=r'dm_view\.account_debt',
search_fields=["sql"],
logger=logger
)
```
## Логирование
Логи пишутся в файл в директории `Logs` (например, `P:\Superset\010 Бекапы\Logs` для резервных копий) и выводятся в консоль. Уровень логирования по умолчанию — `INFO`.
## Разработка и вклад
- Следуйте архитектурным паттернам (`[MODULE]`, `[CONTRACT]`, `[SECTION]`, `[ANCHOR]`) и правилам логирования.
- Весь новый код должен соответствовать принципам "LLM-friendly" генерации.
- Используйте `Pydantic`-модели для валидации данных.
- Реализуйте всестороннюю обработку ошибок с помощью пользовательских исключений.
---
[COHERENCE_CHECK_PASSED] README.md создан и согласован с модулями.
Перевод выполнен с сохранением оригинальной Markdown-разметки и стиля документа. [1]

View File

@@ -1,177 +1,146 @@
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument,invalid-name,redefined-outer-name
"""
[MODULE] Superset Dashboard Backup Script
@contract: Автоматизирует процесс резервного копирования дашбордов Superset.
"""
# [IMPORTS] Стандартная библиотека
import logging import logging
from datetime import datetime import sys
import shutil
import keyring
import os
from pathlib import Path from pathlib import Path
from superset_tool.models import SupersetConfig, DatabaseConfig from dataclasses import dataclass
# [IMPORTS] Third-party
from requests.exceptions import RequestException
# [IMPORTS] Локальные модули
from superset_tool.client import SupersetClient from superset_tool.client import SupersetClient
from superset_tool.exceptions import SupersetAPIError
from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.fileio import save_and_unpack_dashboard, archive_exports, sanitize_filename from superset_tool.utils.fileio import (
save_and_unpack_dashboard,
archive_exports,
sanitize_filename,
consolidate_archive_folders,
remove_empty_directories
)
from superset_tool.utils.init_clients import setup_clients
def setup_clients(logger: SupersetLogger): # [ENTITY: Dataclass('BackupConfig')]
"""Инициализация клиентов для разных окружений""" # CONTRACT:
clients = {} # PURPOSE: Хранит конфигурацию для процесса бэкапа.
try: @dataclass
# Конфигурация для Dev class BackupConfig:
dev_config = SupersetConfig( """Конфигурация для процесса бэкапа."""
base_url="https://devta.bi.dwh.rusal.com/api/v1", consolidate: bool = True
auth={ rotate_archive: bool = True
"provider": "db", clean_folders: bool = True
"username": "migrate_user",
"password": keyring.get_password("system", "dev migrate"),
"refresh": True
},
logger=logger,
verify_ssl=False
)
# Конфигурация для Prod # [ENTITY: Function('backup_dashboards')]
prod_config = SupersetConfig( # CONTRACT:
base_url="https://prodta.bi.dwh.rusal.com/api/v1", # PURPOSE: Выполняет бэкап всех доступных дашбордов для заданного клиента и окружения.
auth={ # PRECONDITIONS:
"provider": "db", # - `client` должен быть инициализированным экземпляром `SupersetClient`.
"username": "migrate_user", # - `env_name` должен быть строкой, обозначающей окружение.
"password": keyring.get_password("system", "prod migrate"), # - `backup_root` должен быть валидным путем к корневой директории бэкапа.
"refresh": True # POSTCONDITIONS:
}, # - Дашборды экспортируются и сохраняются.
logger=logger, # - Возвращает `True` если все дашборды были экспортированы без критических ошибок, `False` иначе.
verify_ssl=False def backup_dashboards(
) client: SupersetClient,
env_name: str,
# Конфигурация для Sandbox backup_root: Path,
sandbox_config = SupersetConfig( logger: SupersetLogger,
base_url="https://sandboxta.bi.dwh.rusal.com/api/v1", config: BackupConfig
auth={ ) -> bool:
"provider": "db", logger.info(f"[STATE][backup_dashboards][ENTER] Starting backup for {env_name}.")
"username": "migrate_user",
"password": keyring.get_password("system", "sandbox migrate"),
"refresh": True
},
logger=logger,
verify_ssl=False
)
clients['dev'] = SupersetClient(dev_config)
clients['sbx'] = SupersetClient(sandbox_config)
clients['prod'] = SupersetClient(prod_config)
logger.info("Клиенты для окружений успешно инициализированы")
return clients
except Exception as e:
logger.error(f"Ошибка инициализации клиентов: {str(e)}")
raise
def backup_dashboards(client, env_name, backup_root, logger):
"""Выполнение бэкапа дашбордов с детальным логированием ошибок"""
try: try:
dashboard_count, dashboard_meta = client.get_dashboards() dashboard_count, dashboard_meta = client.get_dashboards()
logger.info(f"[STATE][backup_dashboards][PROGRESS] Found {dashboard_count} dashboards to export in {env_name}.")
if dashboard_count == 0: if dashboard_count == 0:
logger.warning(f"Нет дашбордов для экспорта в {env_name}")
return True return True
success = 0 success_count = 0
errors = []
for db in dashboard_meta: for db in dashboard_meta:
if not db.get('slug'): dashboard_id = db.get('id')
dashboard_title = db.get('dashboard_title', 'Unknown Dashboard')
if not dashboard_id:
continue continue
try: try:
dashboard_title = db['dashboard_title'] dashboard_base_dir_name = sanitize_filename(f"{dashboard_title}")
dashboard_dir = Path(backup_root) / env_name / sanitize_filename(dashboard_title) dashboard_dir = backup_root / env_name / dashboard_base_dir_name
dashboard_dir.mkdir(parents=True, exist_ok=True) dashboard_dir.mkdir(parents=True, exist_ok=True)
zip_content, filename = client.export_dashboard(db['id']) zip_content, filename = client.export_dashboard(dashboard_id)
save_and_unpack_dashboard( save_and_unpack_dashboard(
zip_content=zip_content, zip_content=zip_content,
original_filename=filename, original_filename=filename,
output_dir=dashboard_dir, output_dir=dashboard_dir,
unpack=False unpack=False,
logger=logger
) )
# Архивирование старых бэкапов if config.rotate_archive:
try: archive_exports(str(dashboard_dir), logger=logger)
archive_exports(dashboard_dir)
except Exception as cleanup_error:
logger.warning(f"Ошибка очистки архива: {cleanup_error}")
success += 1 success_count += 1
except (SupersetAPIError, RequestException, IOError, OSError) as db_error:
logger.error(f"[STATE][backup_dashboards][FAILURE] Failed to export dashboard {dashboard_title}: {db_error}", exc_info=True)
except Exception as db_error: if config.consolidate:
error_info = { consolidate_archive_folders(backup_root / env_name , logger=logger)
'dashboard': db.get('dashboard_title'),
'error': str(db_error),
'env': env_name
}
errors.append(error_info)
logger.error("Ошибка экспорта дашборда", extra=error_info)
if errors: if config.clean_folders:
logger.error(f"Итоги экспорта для {env_name}", remove_empty_directories(str(backup_root / env_name), logger=logger)
extra={'success': success, 'errors': errors, 'total': dashboard_count})
return len(errors) == 0 return success_count == dashboard_count
except (RequestException, IOError) as e:
except Exception as e: logger.critical(f"[STATE][backup_dashboards][FAILURE] Fatal error during backup for {env_name}: {e}", exc_info=True)
logger.critical(f"Фатальная ошибка бэкапа {env_name}: {str(e)}", exc_info=True)
return False return False
# END_FUNCTION_backup_dashboards
def main(): # [ENTITY: Function('main')]
# Инициализация логгера # CONTRACT:
# PURPOSE: Основная точка входа скрипта.
# PRECONDITIONS: None
# POSTCONDITIONS: Возвращает код выхода.
def main() -> int:
log_dir = Path("P:\\Superset\\010 Бекапы\\Logs") log_dir = Path("P:\\Superset\\010 Бекапы\\Logs")
logger = SupersetLogger( logger = SupersetLogger(log_dir=log_dir, level=logging.INFO, console=True)
log_dir=log_dir, logger.info("[STATE][main][ENTER] Starting Superset backup process.")
level=logging.INFO,
console=True
)
"""Основная функция выполнения бэкапа"""
logger.info("="*50)
logger.info("Запуск процесса бэкапа Superset")
logger.info("="*50)
exit_code = 0
try: try:
clients = setup_clients(logger) clients = setup_clients(logger)
superset_backup_repo = Path("P:\\Superset\\010 Бекапы") superset_backup_repo = Path("P:\\Superset\\010 Бекапы")
superset_backup_repo.mkdir(parents=True, exist_ok=True)
# Бэкап для DEV results = {}
dev_success = backup_dashboards( environments = ['dev', 'sbx', 'prod', 'preprod']
clients['dev'], backup_config = BackupConfig(rotate_archive=True)
"DEV",
superset_backup_repo,
logger=logger
)
#Бэкап для Sandbox for env in environments:
sbx_success = backup_dashboards( results[env] = backup_dashboards(
clients['sbx'], clients[env],
"SBX", env.upper(),
superset_backup_repo, superset_backup_repo,
logger=logger logger=logger,
) config=backup_config
#Бэкап для Прода )
prod_success = backup_dashboards(
clients['prod'],
"PROD",
superset_backup_repo,
logger=logger
)
# Итоговый отчет if not all(results.values()):
logger.info("="*50) exit_code = 1
logger.info("Итоги выполнения бэкапа:")
logger.info(f"DEV: {'Успешно' if dev_success else 'С ошибками'}")
logger.info(f"SBX: {'Успешно' if sbx_success else 'С ошибками'}")
logger.info(f"PROD: {'Успешно' if prod_success else 'С ошибками'}")
logger.info(f"Полный лог доступен в: {log_dir}")
except Exception as e: except (RequestException, IOError) as e:
logger.critical(f"Фатальная ошибка выполнения скрипта: {str(e)}", exc_info=True) logger.critical(f"[STATE][main][FAILURE] Fatal error in main execution: {e}", exc_info=True)
return 1 exit_code = 1
logger.info("Процесс бэкапа завершен") logger.info("[STATE][main][SUCCESS] Superset backup process finished.")
return 0 return exit_code
# END_FUNCTION_main
if __name__ == "__main__": if __name__ == "__main__":
exit_code = main() sys.exit(main())
exit(exit_code)

View File

@@ -1,142 +1,303 @@
from superset_tool.models import SupersetConfig # -*- coding: utf-8 -*-
# CONTRACT:
# PURPOSE: Интерактивный скрипт для миграции ассетов Superset между различными окружениями.
# SPECIFICATION_LINK: mod_migration_script
# PRECONDITIONS: Наличие корректных конфигурационных файлов для подключения к Superset.
# POSTCONDITIONS: Выбранные ассеты успешно перенесены из исходного в целевое окружение.
# IMPORTS: [argparse, superset_tool.client, superset_tool.utils.init_clients, superset_tool.utils.logger, superset_tool.utils.fileio]
"""
[MODULE] Superset Migration Tool
@description: Интерактивный скрипт для миграции ассетов Superset между различными окружениями.
"""
# [IMPORTS]
from superset_tool.client import SupersetClient from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import init_superset_clients
from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.logger import SupersetLogger
from superset_tool.exceptions import AuthenticationError from superset_tool.utils.fileio import (
from superset_tool.utils.fileio import save_and_unpack_dashboard, update_yamls, create_dashboard_export, create_temp_file,read_dashboard_from_disk save_and_unpack_dashboard,
import os read_dashboard_from_disk,
import keyring update_yamls,
from pathlib import Path create_dashboard_export
import logging
log_dir = Path("H:\\dev\\Logs")
logger = SupersetLogger(
log_dir=log_dir,
level=logging.INFO,
console=True
) )
database_config_click={"old": # [ENTITY: Class('Migration')]
{ # CONTRACT:
"database_name": "Prod Clickhouse", # PURPOSE: Инкапсулирует логику и состояние процесса миграции.
"sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm", # SPECIFICATION_LINK: class_migration
"uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", # ATTRIBUTES:
"database_uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9", # - name: logger, type: SupersetLogger, description: Экземпляр логгера.
"allow_ctas": "false", # - name: from_c, type: SupersetClient, description: Клиент для исходного окружения.
"allow_cvas": "false", # - name: to_c, type: SupersetClient, description: Клиент для целевого окружения.
"allow_dml": "false" # - name: dashboards_to_migrate, type: list, description: Список дашбордов для миграции.
}, # - name: db_config_replacement, type: dict, description: Конфигурация для замены данных БД.
"new": { class Migration:
"database_name": "Dev Clickhouse", """
"sqlalchemy_uri": "clickhousedb+connect://dwhuser:XXXXXXXXXX@10.66.229.179:8123/dm", Класс для управления процессом миграции дашбордов Superset.
"uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2", """
"database_uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2", def __init__(self):
"allow_ctas": "true", self.logger = SupersetLogger(name="migration_script")
"allow_cvas": "true", self.from_c: SupersetClient = None
"allow_dml": "true" self.to_c: SupersetClient = None
} self.dashboards_to_migrate = []
} self.db_config_replacement = None
# END_FUNCTION___init__
database_config_gp={"old": # [ENTITY: Function('run')]
{ # CONTRACT:
"database_name": "Prod Greenplum", # PURPOSE: Запускает основной воркфлоу миграции, координируя все шаги.
"sqlalchemy_uri": "postgresql+psycopg2://viz_powerbi_gp_prod:XXXXXXXXXX@10.66.229.201:5432/dwh", # SPECIFICATION_LINK: func_run_migration
"uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8", # PRECONDITIONS: None
"database_uuid": "805132a3-e942-40ce-99c7-bee8f82f8aa8", # POSTCONDITIONS: Процесс миграции завершен.
"allow_ctas": "true", def run(self):
"allow_cvas": "true", """Запускает основной воркфлоу миграции."""
"allow_dml": "true" self.logger.info("[INFO][run][ENTER] Запуск скрипта миграции.")
}, self.select_environments()
"new": { self.select_dashboards()
"database_name": "DEV Greenplum", self.confirm_db_config_replacement()
"sqlalchemy_uri": "postgresql+psycopg2://viz_superset_gp_dev:XXXXXXXXXX@10.66.229.171:5432/dwh", self.execute_migration()
"uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f", self.logger.info("[INFO][run][EXIT] Скрипт миграции завершен.")
"database_uuid": "97b97481-43c3-4181-94c5-b69eaaa1e11f", # END_FUNCTION_run
"allow_ctas": "false",
"allow_cvas": "false",
"allow_dml": "false"
}
}
# Конфигурация для Dev # [ENTITY: Function('select_environments')]
dev_config = SupersetConfig( # CONTRACT:
base_url="https://devta.bi.dwh.rusal.com/api/v1", # PURPOSE: Шаг 1. Обеспечивает интерактивный выбор исходного и целевого окружений.
auth={ # SPECIFICATION_LINK: func_select_environments
"provider": "db", # PRECONDITIONS: None
"username": "migrate_user", # POSTCONDITIONS: Атрибуты `self.from_c` и `self.to_c` инициализированы валидными клиентами Superset.
"password": keyring.get_password("system", "dev migrate"), def select_environments(self):
"refresh": True """Шаг 1: Выбор окружений (источник и назначение)."""
}, self.logger.info("[INFO][select_environments][ENTER] Шаг 1/4: Выбор окружений.")
logger=logger,
verify_ssl=False
)
# Конфигурация для Prod available_envs = {"1": "DEV", "2": "PROD"}
prod_config = SupersetConfig(
base_url="https://prodta.bi.dwh.rusal.com/api/v1",
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "prod migrate"),
"refresh": True
},
logger=logger,
verify_ssl=False
)
# Конфигурация для Sandbox print("Доступные окружения:")
sandbox_config = SupersetConfig( for key, value in available_envs.items():
base_url="https://sandboxta.bi.dwh.rusal.com/api/v1", print(f" {key}. {value}")
auth={
"provider": "db",
"username": "migrate_user",
"password": keyring.get_password("system", "sandbox migrate"),
"refresh": True
},
logger=logger,
verify_ssl=False
)
# Инициализация клиента while self.from_c is None:
try:
from_env_choice = input("Выберите исходное окружение (номер): ")
from_env_name = available_envs.get(from_env_choice)
if not from_env_name:
print("Неверный выбор. Попробуйте снова.")
continue
dev_client = SupersetClient(dev_config) clients = init_superset_clients(self.logger, env=from_env_name.lower())
sandbox_client = SupersetClient(sandbox_config) self.from_c = clients[0]
prod_client = SupersetClient(prod_config) self.logger.info(f"[INFO][select_environments][STATE] Исходное окружение: {from_env_name}")
from_c = sandbox_client except Exception as e:
to_c = dev_client self.logger.error(f"[ERROR][select_environments][FAILURE] Ошибка при инициализации клиента-источника: {e}", exc_info=True)
dashboard_slug = "FI0070" print("Не удалось инициализировать клиент. Проверьте конфигурацию.")
dashboard_id = 53
dashboard_meta = from_c.get_dashboard(dashboard_slug) while self.to_c is None:
#print(dashboard_meta) try:
#print(dashboard_meta["dashboard_title"]) to_env_choice = input("Выберите целевое окружение (номер): ")
to_env_name = available_envs.get(to_env_choice)
dashboard_id = dashboard_meta["id"] if not to_env_name:
print("Неверный выбор. Попробуйте снова.")
continue
with create_temp_file(suffix='.dir', logger=logger) as temp_root: if to_env_name == self.from_c.env:
# Экспорт дашборда во временную директорию print("Целевое и исходное окружения не могут совпадать.")
#zip_content, filename = from_c.export_dashboard(dashboard_id, logger=logger) continue
zip_db_path = r"C:\Users\VolobuevAA\Downloads\dashboard_export_20250616T174203.zip"
zip_content, filename = read_dashboard_from_disk(zip_db_path, logger=logger) clients = init_superset_clients(self.logger, env=to_env_name.lower())
self.to_c = clients[0]
self.logger.info(f"[INFO][select_environments][STATE] Целевое окружение: {to_env_name}")
# Сохранение и распаковка во временную директорию except Exception as e:
zip_path, unpacked_path = save_and_unpack_dashboard( self.logger.error(f"[ERROR][select_environments][FAILURE] Ошибка при инициализации целевого клиента: {e}", exc_info=True)
zip_content=zip_content, print("Не удалось инициализировать клиент. Проверьте конфигурацию.")
original_filename=filename, self.logger.info("[INFO][select_environments][EXIT] Шаг 1 завершен.")
unpack=True, # END_FUNCTION_select_environments
logger=logger,
output_dir=temp_root
)
# Обновление конфигураций # [ENTITY: Function('select_dashboards')]
source_path = unpacked_path / Path(filename).stem # CONTRACT:
update_yamls([database_config_click,database_config_gp], path=source_path, logger=logger) # PURPOSE: Шаг 2. Обеспечивает интерактивный выбор дашбордов для миграции.
# SPECIFICATION_LINK: func_select_dashboards
# PRECONDITIONS: `self.from_c` должен быть инициализирован.
# POSTCONDITIONS: `self.dashboards_to_migrate` содержит список выбранных дашбордов.
def select_dashboards(self):
"""Шаг 2: Выбор дашбордов для миграции."""
self.logger.info("[INFO][select_dashboards][ENTER] Шаг 2/4: Выбор дашбордов.")
# Создание нового экспорта во временной директории try:
temp_zip = temp_root / f"{dashboard_slug}.zip" all_dashboards = self.from_c.get_dashboards()
create_dashboard_export(temp_zip, [source_path], logger=logger) if not all_dashboards:
self.logger.warning("[WARN][select_dashboards][STATE] В исходном окружении не найдено дашбордов.")
print("В исходном окружении не найдено дашбордов.")
return
# Импорт обновленного дашборда while True:
to_c.import_dashboard(temp_zip) print("\nДоступные дашборды:")
for i, dashboard in enumerate(all_dashboards):
print(f" {i + 1}. {dashboard['dashboard_title']}")
print("\nОпции:")
print(" - Введите номера дашбордов через запятую (например, 1, 3, 5).")
print(" - Введите 'все' для выбора всех дашбордов.")
print(" - Введите 'поиск <запрос>' для поиска дашбордов.")
print(" - Введите 'выход' для завершения.")
choice = input("Ваш выбор: ").lower().strip()
if choice == 'выход':
break
elif choice == 'все':
self.dashboards_to_migrate = all_dashboards
self.logger.info(f"[INFO][select_dashboards][STATE] Выбраны все дашборды: {len(self.dashboards_to_migrate)}")
break
elif choice.startswith('поиск '):
search_query = choice[6:].strip()
filtered_dashboards = [d for d in all_dashboards if search_query in d['dashboard_title'].lower()]
if not filtered_dashboards:
print("По вашему запросу ничего не найдено.")
else:
all_dashboards = filtered_dashboards
continue
else:
try:
selected_indices = [int(i.strip()) - 1 for i in choice.split(',')]
self.dashboards_to_migrate = [all_dashboards[i] for i in selected_indices if 0 <= i < len(all_dashboards)]
self.logger.info(f"[INFO][select_dashboards][STATE] Выбрано дашбордов: {len(self.dashboards_to_migrate)}")
break
except (ValueError, IndexError):
print("Неверный ввод. Пожалуйста, введите корректные номера.")
except Exception as e:
self.logger.error(f"[ERROR][select_dashboards][FAILURE] Ошибка при получении или выборе дашбордов: {e}", exc_info=True)
print("Произошла ошибка при работе с дашбордами.")
self.logger.info("[INFO][select_dashboards][EXIT] Шаг 2 завершен.")
# END_FUNCTION_select_dashboards
# [ENTITY: Function('confirm_db_config_replacement')]
# CONTRACT:
# PURPOSE: Шаг 3. Управляет процессом подтверждения и настройки замены конфигураций БД.
# SPECIFICATION_LINK: func_confirm_db_config_replacement
# PRECONDITIONS: `self.from_c` и `self.to_c` инициализированы.
# POSTCONDITIONS: `self.db_config_replacement` содержит конфигурацию для замены или `None`.
def confirm_db_config_replacement(self):
"""Шаг 3: Подтверждение и настройка замены конфигурации БД."""
self.logger.info("[INFO][confirm_db_config_replacement][ENTER] Шаг 3/4: Замена конфигурации БД.")
while True:
choice = input("Хотите ли вы заменить конфигурации баз данных в YAML-файлах? (да/нет): ").lower().strip()
if choice in ["да", "нет"]:
break
print("Неверный ввод. Пожалуйста, введите 'да' или 'нет'.")
if choice == 'нет':
self.logger.info("[INFO][confirm_db_config_replacement][STATE] Замена конфигурации БД пропущена.")
return
# Эвристический расчет
from_env = self.from_c.env.upper()
to_env = self.to_c.env.upper()
heuristic_applied = False
if from_env == "DEV" and to_env == "PROD":
self.db_config_replacement = {"old": {"database_name": "db_dev"}, "new": {"database_name": "db_prod"}} # Пример
self.logger.info("[INFO][confirm_db_config_replacement][STATE] Применена эвристика DEV -> PROD.")
heuristic_applied = True
elif from_env == "PROD" and to_env == "DEV":
self.db_config_replacement = {"old": {"database_name": "db_prod"}, "new": {"database_name": "db_dev"}} # Пример
self.logger.info("[INFO][confirm_db_config_replacement][STATE] Применена эвристика PROD -> DEV.")
heuristic_applied = True
if heuristic_applied:
print(f"На основе эвристики будет произведена следующая замена: {self.db_config_replacement}")
confirm = input("Подтверждаете? (да/нет): ").lower().strip()
if confirm != 'да':
self.db_config_replacement = None
heuristic_applied = False
if not heuristic_applied:
print("Пожалуйста, введите детали для замены.")
old_key = input("Ключ для замены (например, database_name): ")
old_value = input(f"Старое значение для {old_key}: ")
new_value = input(f"Новое значение для {old_key}: ")
self.db_config_replacement = {"old": {old_key: old_value}, "new": {old_key: new_value}}
self.logger.info(f"[INFO][confirm_db_config_replacement][STATE] Установлена ручная замена: {self.db_config_replacement}")
self.logger.info("[INFO][confirm_db_config_replacement][EXIT] Шаг 3 завершен.")
# END_FUNCTION_confirm_db_config_replacement
# [ENTITY: Function('execute_migration')]
# CONTRACT:
# PURPOSE: Шаг 4. Выполняет фактическую миграцию выбранных дашбордов.
# SPECIFICATION_LINK: func_execute_migration
# PRECONDITIONS: Все предыдущие шаги (`select_environments`, `select_dashboards`) успешно выполнены.
# POSTCONDITIONS: Выбранные дашборды перенесены в целевое окружение.
def execute_migration(self):
"""Шаг 4: Выполнение миграции и обновления конфигураций."""
self.logger.info("[INFO][execute_migration][ENTER] Шаг 4/4: Выполнение миграции.")
if not self.dashboards_to_migrate:
self.logger.warning("[WARN][execute_migration][STATE] Нет дашбордов для миграции.")
print("Нет дашбордов для миграции. Завершение.")
return
db_configs_for_update = []
if self.db_config_replacement:
try:
from_dbs = self.from_c.get_databases()
to_dbs = self.to_c.get_databases()
# Просто пример, как можно было бы сопоставить базы данных.
# В реальном сценарии логика может быть сложнее.
for from_db in from_dbs:
for to_db in to_dbs:
# Предполагаем, что мы можем сопоставить базы по имени, заменив суффикс
if from_db['database_name'].replace(self.from_c.env.upper(), self.to_c.env.upper()) == to_db['database_name']:
db_configs_for_update.append({
"old": {"database_name": from_db['database_name']},
"new": {"database_name": to_db['database_name']}
})
self.logger.info(f"[INFO][execute_migration][STATE] Сформированы конфигурации для замены БД: {db_configs_for_update}")
except Exception as e:
self.logger.error(f"[ERROR][execute_migration][FAILURE] Не удалось получить конфигурации БД: {e}", exc_info=True)
print("Не удалось получить конфигурации БД. Миграция будет продолжена без замены.")
for dashboard in self.dashboards_to_migrate:
try:
dashboard_id = dashboard['id']
self.logger.info(f"[INFO][execute_migration][PROGRESS] Миграция дашборда: {dashboard['dashboard_title']} (ID: {dashboard_id})")
# 1. Экспорт
exported_content = self.from_c.export_dashboards(dashboard_id)
zip_path, unpacked_path = save_and_unpack_dashboard(exported_content, f"temp_export_{dashboard_id}", unpack=True)
self.logger.info(f"[INFO][execute_migration][STATE] Дашборд экспортирован и распакован в {unpacked_path}")
# 2. Обновление YAML, если нужно
if db_configs_for_update:
update_yamls(db_configs=db_configs_for_update, path=str(unpacked_path))
self.logger.info(f"[INFO][execute_migration][STATE] YAML-файлы обновлены.")
# 3. Упаковка и импорт
new_zip_path = f"migrated_dashboard_{dashboard_id}.zip"
create_dashboard_export(new_zip_path, [unpacked_path])
content_to_import, _ = read_dashboard_from_disk(new_zip_path)
self.to_c.import_dashboards(content_to_import)
self.logger.info(f"[INFO][execute_migration][SUCCESS] Дашборд {dashboard['dashboard_title']} успешно импортирован.")
except Exception as e:
self.logger.error(f"[ERROR][execute_migration][FAILURE] Ошибка при миграции дашборда {dashboard['dashboard_title']}: {e}", exc_info=True)
print(f"Не удалось смигрировать дашборд: {dashboard['dashboard_title']}")
self.logger.info("[INFO][execute_migration][EXIT] Шаг 4 завершен.")
# END_FUNCTION_execute_migration
# END_CLASS_Migration
# [MAIN_EXECUTION_BLOCK]
if __name__ == "__main__":
migration = Migration()
migration.run()
# END_MAIN_EXECUTION_BLOCK
# END_MODULE_migration_script

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
pyyaml
requests
keyring
urllib3

152
search_script.py Normal file
View File

@@ -0,0 +1,152 @@
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument,invalid-name,redefined-outer-name
"""
[MODULE] Dataset Search Utilities
@contract: Предоставляет функционал для поиска текстовых паттернов в метаданных датасетов Superset.
"""
# [IMPORTS] Стандартная библиотека
import logging
import re
from typing import Dict, Optional
# [IMPORTS] Third-party
from requests.exceptions import RequestException
# [IMPORTS] Локальные модули
from superset_tool.client import SupersetClient
from superset_tool.exceptions import SupersetAPIError
from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.init_clients import setup_clients
# [ENTITY: Function('search_datasets')]
# CONTRACT:
# PURPOSE: Выполняет поиск по строковому паттерну в метаданных всех датасетов.
# PRECONDITIONS:
# - `client` должен быть инициализированным экземпляром `SupersetClient`.
# - `search_pattern` должен быть валидной строкой регулярного выражения.
# POSTCONDITIONS:
# - Возвращает словарь с результатами поиска.
def search_datasets(
client: SupersetClient,
search_pattern: str,
logger: Optional[SupersetLogger] = None
) -> Optional[Dict]:
logger = logger or SupersetLogger(name="dataset_search")
logger.info(f"[STATE][search_datasets][ENTER] Searching for pattern: '{search_pattern}'")
try:
_, datasets = client.get_datasets(query={
"columns": ["id", "table_name", "sql", "database", "columns"]
})
if not datasets:
logger.warning("[STATE][search_datasets][EMPTY] No datasets found.")
return None
pattern = re.compile(search_pattern, re.IGNORECASE)
results = {}
available_fields = set(datasets[0].keys())
for dataset in datasets:
dataset_id = dataset.get('id')
if not dataset_id:
continue
matches = []
for field in available_fields:
value = str(dataset.get(field, ""))
if pattern.search(value):
match_obj = pattern.search(value)
matches.append({
"field": field,
"match": match_obj.group() if match_obj else "",
"value": value
})
if matches:
results[dataset_id] = matches
logger.info(f"[STATE][search_datasets][SUCCESS] Found matches in {len(results)} datasets.")
return results
except re.error as e:
logger.error(f"[STATE][search_datasets][FAILURE] Invalid regex pattern: {e}", exc_info=True)
raise
except (SupersetAPIError, RequestException) as e:
logger.critical(f"[STATE][search_datasets][FAILURE] Critical error during search: {e}", exc_info=True)
raise
# END_FUNCTION_search_datasets
# [ENTITY: Function('print_search_results')]
# CONTRACT:
# PURPOSE: Форматирует результаты поиска для читаемого вывода в консоль.
# PRECONDITIONS:
# - `results` является словарем, возвращенным `search_datasets`, или `None`.
# POSTCONDITIONS:
# - Возвращает отформатированную строку с результатами.
def print_search_results(results: Optional[Dict], context_lines: int = 3) -> str:
if not results:
return "Ничего не найдено"
output = []
for dataset_id, matches in results.items():
output.append(f"\n--- Dataset ID: {dataset_id} ---")
for match_info in matches:
field = match_info['field']
match_text = match_info['match']
full_value = match_info['value']
output.append(f" - Поле: {field}")
output.append(f" Совпадение: '{match_text}'")
lines = full_value.splitlines()
if not lines:
continue
match_line_index = -1
for i, line in enumerate(lines):
if match_text in line:
match_line_index = i
break
if match_line_index != -1:
start_line = max(0, match_line_index - context_lines)
end_line = min(len(lines), match_line_index + context_lines + 1)
output.append(" Контекст:")
for i in range(start_line, end_line):
line_number = i + 1
line_content = lines[i]
prefix = f"{line_number:5d}: "
if i == match_line_index:
highlighted_line = line_content.replace(match_text, f">>>{match_text}<<<")
output.append(f" {prefix}{highlighted_line}")
else:
output.append(f" {prefix}{line_content}")
output.append("-" * 25)
return "\n".join(output)
# END_FUNCTION_print_search_results
# [ENTITY: Function('main')]
# CONTRACT:
# PURPOSE: Основная точка входа скрипта.
# PRECONDITIONS: None
# POSTCONDITIONS: None
def main():
logger = SupersetLogger(level=logging.INFO, console=True)
clients = setup_clients(logger)
target_client = clients['dev']
search_query = r"match(r2.path_code, budget_reference.ref_code || '($|(\s))')"
results = search_datasets(
client=target_client,
search_pattern=search_query,
logger=logger
)
report = print_search_results(results)
logger.info(f"[STATE][main][SUCCESS] Search finished. Report:\n{report}")
# END_FUNCTION_main
if __name__ == "__main__":
main()

View File

View File

@@ -1,32 +1,20 @@
# [MODULE] Superset API Client # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument
# @contract: Реализует полное взаимодействие с Superset API """
# @semantic_layers: [MODULE] Superset API Client
# 1. Авторизация/CSRF @contract: Реализует полное взаимодействие с Superset API
# 2. Основные операции (дашборды) """
# 3. Импорт/экспорт
# @coherence:
# - Согласован с models.SupersetConfig
# - Полная обработка всех errors из exceptions.py
# [IMPORTS] Стандартная библиотека # [IMPORTS] Стандартная библиотека
import json import json
from typing import Optional, Dict, Tuple, List, Any, Literal, Union from typing import Optional, Dict, Tuple, List, Any, Union
import datetime import datetime
from pathlib import Path from pathlib import Path
# [IMPORTS] Сторонние библиотеки
import requests
import urllib3
import zipfile import zipfile
from requests import Response
# [IMPORTS] Локальные модули # [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig from superset_tool.models import SupersetConfig
from superset_tool.exceptions import ( from superset_tool.exceptions import (
AuthenticationError,
SupersetAPIError,
DashboardNotFoundError,
NetworkError,
PermissionDeniedError,
ExportError, ExportError,
InvalidZipFormatError InvalidZipFormatError
) )
@@ -34,471 +22,292 @@ from superset_tool.utils.fileio import get_filename_from_headers
from superset_tool.utils.logger import SupersetLogger from superset_tool.utils.logger import SupersetLogger
from superset_tool.utils.network import APIClient from superset_tool.utils.network import APIClient
# [CONSTANTS] Логирование # [CONSTANTS]
HTTP_METHODS = Literal['GET', 'POST', 'PUT', 'DELETE'] DEFAULT_TIMEOUT = 30
DEFAULT_TIMEOUT = 30 # seconds
# [TYPE-ALIASES] Для сложных сигнатур # [TYPE-ALIASES]
JsonType = Union[Dict[str, Any], List[Dict[str, Any]]] JsonType = Union[Dict[str, Any], List[Dict[str, Any]]]
ResponseType = Tuple[bytes, str] ResponseType = Tuple[bytes, str]
# [CHECK] Валидация импортов для контрактов
try:
# Проверка наличия ключевых зависимостей
assert requests.__version__ >= '2.28.0' # для retry механизмов
assert urllib3.__version__ >= '1.26.0' # для SSL warnings
# Проверка локальных модулей
from .utils.fileio import get_filename_from_headers as fileio_check
assert callable(fileio_check)
except (ImportError, AssertionError) as imp_err:
raise RuntimeError(
f"[COHERENCE_CHECK_FAILED] Импорт не прошел валидацию: {str(imp_err)}"
) from imp_err
class SupersetClient: class SupersetClient:
"""[MAIN-CONTRACT] Клиент для работы с Superset API """[MAIN-CONTRACT] Клиент для работы с Superset API"""
@pre: # [ENTITY: Function('__init__')]
- config должен быть валидным SupersetConfig # CONTRACT:
- Целевой API доступен # PURPOSE: Инициализация клиента Superset.
@post: # PRECONDITIONS: `config` должен быть валидным `SupersetConfig`.
- Все методы возвращают данные или вызывают явные ошибки # POSTCONDITIONS: Клиент успешно инициализирован.
- Токены автоматически обновляются def __init__(self, config: SupersetConfig, logger: Optional[SupersetLogger] = None):
@invariant: self.logger = logger or SupersetLogger(name="SupersetClient")
- Сессия остается валидной между вызовами self.logger.info("[INFO][SupersetClient.__init__][ENTER] Initializing SupersetClient.")
- Все ошибки типизированы согласно exceptions.py
"""
def __init__(self, config: SupersetConfig):
"""[INIT] Инициализация клиента
@semantic:
- Создает сессию requests
- Настраивает адаптеры подключения
- Выполняет первичную аутентификацию
"""
self._validate_config(config) self._validate_config(config)
self.config = config self.config = config
self.logger = config.logger or SupersetLogger(name="client")
self.network = APIClient( self.network = APIClient(
base_url=config.base_url, config=config.dict(),
auth=config.auth, verify_ssl=config.verify_ssl,
verify_ssl=config.verify_ssl timeout=config.timeout,
logger=self.logger
) )
self.tokens = self.network.authenticate() self.logger.info("[INFO][SupersetClient.__init__][SUCCESS] SupersetClient initialized successfully.")
# END_FUNCTION___init__
try:
self.logger.info(
"[COHERENCE_CHECK_PASSED] Клиент успешно инициализирован",
extra={"base_url": config.base_url}
)
except Exception as e:
self.logger.error(
"[INIT_FAILED] Ошибка инициализации клиента",
exc_info=True,
extra={"config": config.dict()}
)
raise
# [ENTITY: Function('_validate_config')]
# CONTRACT:
# PURPOSE: Валидация конфигурации клиента.
# PRECONDITIONS: `config` должен быть экземпляром `SupersetConfig`.
# POSTCONDITIONS: Конфигурация валидна.
def _validate_config(self, config: SupersetConfig) -> None: def _validate_config(self, config: SupersetConfig) -> None:
"""[PRECONDITION] Валидация конфигурации клиента self.logger.debug("[DEBUG][SupersetClient._validate_config][ENTER] Validating config.")
@semantic:
- Проверяет обязательные поля
- Валидирует URL и учетные данные
@raise:
- ValueError при невалидных параметрах
- TypeError при некорректном типе
"""
if not isinstance(config, SupersetConfig): if not isinstance(config, SupersetConfig):
self.logger.error( self.logger.error("[ERROR][SupersetClient._validate_config][FAILURE] Invalid config type.")
"[CONFIG_VALIDATION_FAILED] Некорректный тип конфигурации",
extra={"actual_type": type(config).__name__}
)
raise TypeError("Конфигурация должна быть экземпляром SupersetConfig") raise TypeError("Конфигурация должна быть экземпляром SupersetConfig")
self.logger.debug("[DEBUG][SupersetClient._validate_config][SUCCESS] Config validated.")
required_fields = ["base_url", "auth"] # END_FUNCTION__validate_config
for field in required_fields:
if not getattr(config, field, None):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Отсутствует обязательное поле",
extra={"missing_field": field}
)
raise ValueError(f"Обязательное поле {field} не указано")
if not config.auth.get("username") or not config.auth.get("password"):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Не указаны учетные данные",
extra={"auth_keys": list(config.auth.keys())}
)
raise ValueError("В конфигурации должны быть указаны username и password")
# Дополнительная валидация URL
if not config.base_url.startswith(("http://", "https://")):
self.logger.error(
"[CONFIG_VALIDATION_FAILED] Некорректный URL",
extra={"base_url": config.base_url}
)
raise ValueError("base_url должен начинаться с http:// или https://")
@property @property
def headers(self) -> dict: def headers(self) -> dict:
"""[INTERFACE] Базовые заголовки для API-вызовов """[INTERFACE] Базовые заголовки для API-вызовов."""
@semantic: Объединяет общие заголовки для всех запросов return self.network.headers
@post: Всегда возвращает актуальные токены # END_FUNCTION_headers
"""
return {
"Authorization": f"Bearer {self.tokens['access_token']}",
"X-CSRFToken": self.tokens["csrf_token"],
"Referer": self.config.base_url,
"Content-Type": "application/json"
}
# [MAIN-OPERATIONS] Работа с дашбордами # [ENTITY: Function('get_dashboards')]
def get_dashboard(self, dashboard_id_or_slug: str) -> dict: # CONTRACT:
"""[CONTRACT] Получение метаданных дашборда # PURPOSE: Получение списка дашбордов с пагинацией.
@pre: # PRECONDITIONS: None
- dashboard_id_or_slug должен существовать # POSTCONDITIONS: Возвращает кортеж с общим количеством и списком дашбордов.
- Клиент должен быть аутентифицирован (tokens актуальны) def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
@post: self.logger.info("[INFO][SupersetClient.get_dashboards][ENTER] Getting dashboards.")
- Возвращает dict с метаданными дашборда validated_query = self._validate_query_params(query)
- В случае 404 вызывает DashboardNotFoundError total_count = self._fetch_total_object_count(endpoint="/dashboard/")
@semantic_layers: paginated_data = self._fetch_all_pages(
1. Взаимодействие с API через APIClient endpoint="/dashboard/",
2. Обработка специфичных для Superset ошибок pagination_options={
""" "base_query": validated_query,
try: "total_count": total_count,
response = self.network.request( "results_field": "result",
method="GET",
endpoint=f"/dashboard/{dashboard_id_or_slug}",
headers=self.headers # Автоматически включает токены
)
return response.json()["result"]
except requests.HTTPError as e:
if e.response.status_code == 404:
raise DashboardNotFoundError(
dashboard_id_or_slug,
context={"url": f"{self.config.base_url}/dashboard/{dashboard_id_or_slug}"}
)
raise SupersetAPIError(
f"API Error: {str(e)}",
status_code=e.response.status_code
) from e
# [ERROR-HANDLER] Централизованная обработка ошибок
def _handle_api_error(self, method_name: str, error: Exception, url: str) -> None:
"""[UNIFIED-ERROR] Обработка API-ошибок
@semantic: Преобразует requests исключения в наши типы
"""
context = {
"method": method_name,
"url": url,
"status_code": getattr(error.response, 'status_code', None)
}
if isinstance(error, requests.Timeout):
raise NetworkError("Request timeout", context=context) from error
elif getattr(error.response, 'status_code', None) == 403:
raise PermissionDeniedError(context=context) from error
else:
raise SupersetAPIError(str(error), context=context) from error
# [SECTION] EXPORT OPERATIONS
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
"""[CONTRACT] Экспорт дашборда в ZIP-архив
@pre:
- dashboard_id должен существовать
- Пользователь имеет права на экспорт
@post:
- Возвращает кортеж (бинарное содержимое, имя файла)
- Имя файла извлекается из headers или генерируется
@errors:
- DashboardNotFoundError если дашборд не существует
- ExportError при проблемах экспорта
"""
url = f"{self.config.base_url}/dashboard/export/"
self.logger.debug(
"[EXPORT_START] Запуск экспорта",
extra={"dashboard_id": dashboard_id, "export_url": url}
)
try:
response = self._execute_export_request(dashboard_id, url)
self._validate_export_response(response, dashboard_id)
filename = self._resolve_export_filename(response, dashboard_id)
return response.content, filename
except requests.exceptions.HTTPError as http_err:
error_ctx = {
"dashboard_id": dashboard_id,
"status_code": http_err.response.status_code
} }
if http_err.response.status_code == 404: )
self.logger.error( self.logger.info("[INFO][SupersetClient.get_dashboards][SUCCESS] Got dashboards.")
"[EXPORT_FAILED] Дашборд не найден", return total_count, paginated_data
extra=error_ctx # END_FUNCTION_get_dashboards
)
raise DashboardNotFoundError(dashboard_id, context=error_ctx)
raise ExportError("HTTP ошибка экспорта", context=error_ctx) from http_err
except requests.exceptions.RequestException as req_err: # [ENTITY: Function('get_dashboard')]
error_ctx = {"dashboard_id": dashboard_id} # CONTRACT:
self.logger.error( # PURPOSE: Получение метаданных дашборда по ID или SLUG.
"[EXPORT_FAILED] Ошибка запроса", # PRECONDITIONS: `dashboard_id_or_slug` должен существовать.
exc_info=True, # POSTCONDITIONS: Возвращает метаданные дашборда.
extra=error_ctx def get_dashboard(self, dashboard_id_or_slug: str) -> dict:
) self.logger.info(f"[INFO][SupersetClient.get_dashboard][ENTER] Getting dashboard: {dashboard_id_or_slug}")
raise ExportError("Ошибка экспорта", context=error_ctx) from req_err response_data = self.network.request(
method="GET",
endpoint=f"/dashboard/{dashboard_id_or_slug}",
)
self.logger.info(f"[INFO][SupersetClient.get_dashboard][SUCCESS] Got dashboard: {dashboard_id_or_slug}")
return response_data.get("result", {})
# END_FUNCTION_get_dashboard
def _execute_export_request(self, dashboard_id: int, url: str) -> requests.Response: # [ENTITY: Function('get_datasets')]
"""[HELPER] Выполнение запроса экспорта # CONTRACT:
@coherence_check: # PURPOSE: Получение списка датасетов с пагинацией.
- Ответ должен иметь status_code 200 # PRECONDITIONS: None
- Content-Type: application/zip # POSTCONDITIONS: Возвращает кортеж с общим количеством и списком датасетов.
""" def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
self.logger.info("[INFO][SupersetClient.get_datasets][ENTER] Getting datasets.")
total_count = self._fetch_total_object_count(endpoint="/dataset/")
base_query = {
"columns": ["id", "table_name", "sql", "database", "schema"],
"page": 0,
"page_size": 100
}
validated_query = {**base_query, **(query or {})}
datasets = self._fetch_all_pages(
endpoint="/dataset/",
pagination_options={
"base_query": validated_query,
"total_count": total_count,
"results_field": "result",
}
)
self.logger.info("[INFO][SupersetClient.get_datasets][SUCCESS] Got datasets.")
return total_count, datasets
# END_FUNCTION_get_datasets
# [ENTITY: Function('get_dataset')]
# CONTRACT:
# PURPOSE: Получение метаданных датасета по ID.
# PRECONDITIONS: `dataset_id` должен существовать.
# POSTCONDITIONS: Возвращает метаданные датасета.
def get_dataset(self, dataset_id: str) -> dict:
self.logger.info(f"[INFO][SupersetClient.get_dataset][ENTER] Getting dataset: {dataset_id}")
response_data = self.network.request(
method="GET",
endpoint=f"/dataset/{dataset_id}",
)
self.logger.info(f"[INFO][SupersetClient.get_dataset][SUCCESS] Got dataset: {dataset_id}")
return response_data.get("result", {})
# END_FUNCTION_get_dataset
# [ENTITY: Function('export_dashboard')]
# CONTRACT:
# PURPOSE: Экспорт дашборда в ZIP-архив.
# PRECONDITIONS: `dashboard_id` должен существовать.
# POSTCONDITIONS: Возвращает содержимое ZIP-архива и имя файла.
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
self.logger.info(f"[INFO][SupersetClient.export_dashboard][ENTER] Exporting dashboard: {dashboard_id}")
response = self.network.request( response = self.network.request(
method="GET", method="GET",
endpoint="/dashboard/export/", endpoint="/dashboard/export/",
params={"q": f"[{dashboard_id}]"}, params={"q": json.dumps([dashboard_id])},
raw_response=True # Для получения бинарного содержимого stream=True,
) raw_response=True
response.raise_for_status() )
return response self._validate_export_response(response, dashboard_id)
filename = self._resolve_export_filename(response, dashboard_id)
def _validate_export_response(self, response: requests.Response, dashboard_id: int) -> None: content = response.content
"""[HELPER] Валидация ответа экспорта self.logger.info(f"[INFO][SupersetClient.export_dashboard][SUCCESS] Exported dashboard: {dashboard_id}")
@semantic: return content, filename
- Проверка Content-Type # END_FUNCTION_export_dashboard
- Проверка наличия данных
"""
if 'application/zip' not in response.headers.get('Content-Type', ''):
self.logger.error(
"[EXPORT_VALIDATION_FAILED] Неверный Content-Type",
extra={
"dashboard_id": dashboard_id,
"content_type": response.headers.get('Content-Type')
}
)
raise ExportError("Получен не ZIP-архив")
# [ENTITY: Function('_validate_export_response')]
# CONTRACT:
# PURPOSE: Валидация ответа экспорта.
# PRECONDITIONS: `response` должен быть валидным HTTP-ответом.
# POSTCONDITIONS: Ответ валиден.
def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
self.logger.debug(f"[DEBUG][SupersetClient._validate_export_response][ENTER] Validating export response for dashboard: {dashboard_id}")
content_type = response.headers.get('Content-Type', '')
if 'application/zip' not in content_type:
self.logger.error(f"[ERROR][SupersetClient._validate_export_response][FAILURE] Invalid content type: {content_type}")
raise ExportError(f"Получен не ZIP-архив (Content-Type: {content_type})")
if not response.content: if not response.content:
self.logger.error( self.logger.error("[ERROR][SupersetClient._validate_export_response][FAILURE] Empty response content.")
"[EXPORT_VALIDATION_FAILED] Пустой ответ", raise ExportError("Получены пустые данные при экспорте")
extra={"dashboard_id": dashboard_id} self.logger.debug(f"[DEBUG][SupersetClient._validate_export_response][SUCCESS] Export response validated for dashboard: {dashboard_id}")
) # END_FUNCTION__validate_export_response
raise ExportError("Получены пустые данные")
def _resolve_export_filename(self, response: requests.Response, dashboard_id: int) -> str: # [ENTITY: Function('_resolve_export_filename')]
"""[HELPER] Определение имени экспортируемого файла # CONTRACT:
@fallback: Генерирует имя если не найден заголовок # PURPOSE: Определение имени экспортируемого файла.
""" # PRECONDITIONS: `response` должен быть валидным HTTP-ответом.
# POSTCONDITIONS: Возвращает имя файла.
def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
self.logger.debug(f"[DEBUG][SupersetClient._resolve_export_filename][ENTER] Resolving export filename for dashboard: {dashboard_id}")
filename = get_filename_from_headers(response.headers) filename = get_filename_from_headers(response.headers)
if not filename: if not filename:
filename = f"dashboard_export_{dashboard_id}_{datetime.now().strftime('%Y%m%d')}.zip" timestamp = datetime.datetime.now().strftime('%Y%m%dT%H%M%S')
self.logger.debug( filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip"
"[EXPORT_FALLBACK] Используется сгенерированное имя файла", self.logger.warning(f"[WARNING][SupersetClient._resolve_export_filename][STATE_CHANGE] Could not resolve filename from headers, generated: {filename}")
extra={"filename": filename} self.logger.debug(f"[DEBUG][SupersetClient._resolve_export_filename][SUCCESS] Resolved export filename: {filename}")
)
return filename return filename
# END_FUNCTION__resolve_export_filename
# [ENTITY: Function('export_to_file')]
# CONTRACT:
# PURPOSE: Экспорт дашборда напрямую в файл.
# PRECONDITIONS: `output_dir` должен существовать.
# POSTCONDITIONS: Дашборд сохранен в файл.
def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path: def export_to_file(self, dashboard_id: int, output_dir: Union[str, Path]) -> Path:
"""[CONTRACT] Экспорт дашборда прямо в файл self.logger.info(f"[INFO][SupersetClient.export_to_file][ENTER] Exporting dashboard {dashboard_id} to file in {output_dir}")
@pre:
- output_dir должен существовать
- Доступ на запись в директорию
@post:
- Возвращает Path сохраненного файла
- Создает поддиректорию с именем дашборда
"""
output_dir = Path(output_dir) output_dir = Path(output_dir)
if not output_dir.exists(): if not output_dir.exists():
self.logger.error( self.logger.error(f"[ERROR][SupersetClient.export_to_file][FAILURE] Output directory does not exist: {output_dir}")
"[EXPORT_PRE_FAILED] Директория не существует",
extra={"output_dir": str(output_dir)}
)
raise FileNotFoundError(f"Директория {output_dir} не найдена") raise FileNotFoundError(f"Директория {output_dir} не найдена")
content, filename = self.export_dashboard(dashboard_id) content, filename = self.export_dashboard(dashboard_id)
target_path = output_dir / filename target_path = output_dir / filename
with open(target_path, 'wb') as f:
f.write(content)
self.logger.info(f"[INFO][SupersetClient.export_to_file][SUCCESS] Exported dashboard {dashboard_id} to {target_path}")
return target_path
# END_FUNCTION_export_to_file
try: # [ENTITY: Function('import_dashboard')]
with open(target_path, 'wb') as f: # CONTRACT:
f.write(content) # PURPOSE: Импорт дашборда из ZIP-архива.
# PRECONDITIONS: `file_name` должен быть валидным ZIP-файлом.
self.logger.info( # POSTCONDITIONS: Возвращает ответ API.
"[EXPORT_SUCCESS] Дашборд сохранен на диск",
extra={
"dashboard_id": dashboard_id,
"file_path": str(target_path),
"file_size": len(content)
}
)
return target_path
except IOError as io_err:
self.logger.error(
"[EXPORT_IO_FAILED] Ошибка записи файла",
exc_info=True,
extra={"target_path": str(target_path)}
)
raise ExportError("Ошибка сохранения файла") from io_err
# [SECTION] Основной интерфейс API
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
"""[CONTRACT] Получение списка дашбордов с пагинацией
@pre:
- Клиент должен быть авторизован
- Параметры пагинации должны быть валидны
@post:
- Возвращает кортеж (total_count, список метаданных)
- Поддерживает кастомные query-параметры
@invariant:
- Всегда возвращает полный список (обходит пагинацию)
"""
url = f"{self.config.base_url}/dashboard/"
self.logger.debug(
"[API_CALL] Запрос списка дашбордов",
extra={"query": query}
)
# [COHERENCE_CHECK] Валидация параметров
validated_query = self._validate_query_params(query)
try:
# Инициализация пагинации
total_count = self._fetch_total_count()
paginated_data = self._fetch_all_pages(validated_query, total_count)
self.logger.info(
"[API_SUCCESS] Дашборды получены",
extra={"count": total_count}
)
return total_count, paginated_data
except requests.exceptions.RequestException as e:
error_ctx = {"method": "get_dashboards", "query": validated_query}
self._handle_api_error("Пагинация дашбордов", e, error_ctx)
# [SECTION] Импорт
def import_dashboard(self, file_name: Union[str, Path]) -> Dict: def import_dashboard(self, file_name: Union[str, Path]) -> Dict:
"""[CONTRACT] Импорт дашборда из архива self.logger.info(f"[INFO][SupersetClient.import_dashboard][ENTER] Importing dashboard from: {file_name}")
@pre:
- Файл должен существовать и быть валидным ZIP
- Должны быть права на импорт
@post:
- Возвращает метаданные импортированного дашборда
- При конфликтах выполняет overwrite
"""
self._validate_import_file(file_name) self._validate_import_file(file_name)
self.logger.debug( import_response = self.network.upload_file(
"[IMPORT_START] Инициирован импорт дашборда", endpoint="/dashboard/import/",
extra={"file": file_name} file_info={
) "file_obj": Path(file_name),
"file_name": Path(file_name).name,
"form_field": "formData",
},
extra_data={'overwrite': 'true'},
timeout=self.config.timeout * 2
)
self.logger.info(f"[INFO][SupersetClient.import_dashboard][SUCCESS] Imported dashboard from: {file_name}")
return import_response
# END_FUNCTION_import_dashboard
try: # [ENTITY: Function('_validate_query_params')]
return self.network.upload_file( # CONTRACT:
endpoint="/dashboard/import/", # PURPOSE: Нормализация и валидация параметров запроса.
file_obj=file_name, # PRECONDITIONS: None
file_name=file_name, # POSTCONDITIONS: Возвращает валидный словарь параметров.
form_field="formData",
extra_data={'overwrite': 'true'},
timeout=self.config.timeout * 2
)
except PermissionDeniedError as e:
self.logger.error(
"[IMPORT_AUTH_FAILED] Недостаточно прав для импорта",
exc_info=True
)
raise
except Exception as e:
self.logger.error(
"[IMPORT_FAILED] Ошибка импорта дашборда",
exc_info=True,
extra={"file": file_name}
)
raise SupersetAPIError(f"Ошибка импорта: {str(e)}") from e
# [SECTION] Приватные методы-помощники
def _validate_query_params(self, query: Optional[Dict]) -> Dict: def _validate_query_params(self, query: Optional[Dict]) -> Dict:
"""[HELPER] Нормализация параметров запроса""" self.logger.debug("[DEBUG][SupersetClient._validate_query_params][ENTER] Validating query params.")
base_query = { base_query = {
"columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"], "columns": ["slug", "id", "changed_on_utc", "dashboard_title", "published"],
"page": 0, "page": 0,
"page_size": 20 "page_size": 1000
} }
return {**base_query, **(query or {})} validated_query = {**base_query, **(query or {})}
self.logger.debug(f"[DEBUG][SupersetClient._validate_query_params][SUCCESS] Validated query params: {validated_query}")
return validated_query
# END_FUNCTION__validate_query_params
def _fetch_total_count(self) -> int: # [ENTITY: Function('_fetch_total_object_count')]
"""[CONTRACT][HELPER] Получение общего кол-ва дашбордов в системе # CONTRACT:
@delegates: # PURPOSE: Получение общего количества объектов.
- Сетевой запрос -> APIClient # PRECONDITIONS: `endpoint` должен быть валидным.
- Обработка ответа -> собственный метод # POSTCONDITIONS: Возвращает общее количество объектов.
@errors: def _fetch_total_object_count(self, endpoint:str) -> int:
- SupersetAPIError при проблемах с API self.logger.debug(f"[DEBUG][SupersetClient._fetch_total_object_count][ENTER] Fetching total object count for endpoint: {endpoint}")
""" query_params_for_count = {'page': 0, 'page_size': 1}
query_params = { count = self.network.fetch_paginated_count(
'columns': ['id'], endpoint=endpoint,
'page': 0, query_params=query_params_for_count,
'page_size': 1 count_field="count"
} )
self.logger.debug(f"[DEBUG][SupersetClient._fetch_total_object_count][SUCCESS] Fetched total object count: {count}")
return count
# END_FUNCTION__fetch_total_object_count
try: # [ENTITY: Function('_fetch_all_pages')]
return self.network.fetch_paginated_count( # CONTRACT:
endpoint="/dashboard/", # PURPOSE: Обход всех страниц пагинированного API.
query_params=query_params, # PRECONDITIONS: `pagination_options` должен содержать необходимые параметры.
count_field="count" # POSTCONDITIONS: Возвращает список всех объектов.
) def _fetch_all_pages(self, endpoint:str, pagination_options: Dict) -> List[Dict]:
except requests.exceptions.RequestException as e: self.logger.debug(f"[DEBUG][SupersetClient._fetch_all_pages][ENTER] Fetching all pages for endpoint: {endpoint}")
raise SupersetAPIError(f"Ошибка получения количества дашбордов: {str(e)}") all_data = self.network.fetch_paginated_data(
endpoint=endpoint,
def _fetch_all_pages(self, query: Dict, total_count: int) -> List[Dict]: pagination_options=pagination_options
"""[HELPER] Обход всех страниц с пагинацией""" )
"""[CONTRACT] Получение всех данных с пагинированного API self.logger.debug(f"[DEBUG][SupersetClient._fetch_all_pages][SUCCESS] Fetched all pages for endpoint: {endpoint}")
@delegates: return all_data
- Сетевые запросы -> APIClient.fetch_paginated_data() # END_FUNCTION__fetch_all_pages
@params:
query: оригинальный query-объект (без page)
total_count: общее количество элементов
@return:
Список всех элементов
@errors:
- SupersetAPIError: проблемы с API
- ValueError: некорректные параметры пагинации
"""
try:
if not query.get('page_size'):
raise ValueError("Отсутствует page_size в query параметрах")
return self.network.fetch_paginated_data(
endpoint="/dashboard/",
base_query=query,
total_count=total_count,
results_field="result"
)
except (requests.exceptions.RequestException, ValueError) as e:
error_ctx = {
"query": query,
"total_count": total_count,
"error": str(e)
}
self.logger.error("[PAGINATION_ERROR]", extra=error_ctx)
raise SupersetAPIError(f"Ошибка пагинации: {str(e)}") from e
# [ENTITY: Function('_validate_import_file')]
# CONTRACT:
# PURPOSE: Проверка файла перед импортом.
# PRECONDITIONS: `zip_path` должен быть путем к файлу.
# POSTCONDITIONS: Файл валиден.
def _validate_import_file(self, zip_path: Union[str, Path]) -> None: def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
"""[HELPER] Проверка файла перед импортом""" self.logger.debug(f"[DEBUG][SupersetClient._validate_import_file][ENTER] Validating import file: {zip_path}")
path = Path(zip_path) path = Path(zip_path)
if not path.exists(): if not path.exists():
raise FileNotFoundError(f"[FILE_ERROR] {zip_path} не существует") self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file does not exist: {zip_path}")
raise FileNotFoundError(f"Файл {zip_path} не существует")
if not zipfile.is_zipfile(path): if not zipfile.is_zipfile(path):
raise InvalidZipFormatError(f"[FILE_ERROR] {zip_path} не ZIP-архив") self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file is not a zip file: {zip_path}")
raise InvalidZipFormatError(f"Файл {zip_path} не является ZIP-архивом")
with zipfile.ZipFile(path) as zf: with zipfile.ZipFile(path, 'r') as zf:
if not any(n.endswith('metadata.yaml') for n in zf.namelist()): if not any(n.endswith('metadata.yaml') for n in zf.namelist()):
raise DashboardNotFoundError("Архив не содержит metadata.yaml") self.logger.error(f"[ERROR][SupersetClient._validate_import_file][FAILURE] Import file does not contain metadata.yaml: {zip_path}")
raise InvalidZipFormatError(f"Архив {zip_path} не содержит 'metadata.yaml'")
self.logger.debug(f"[DEBUG][SupersetClient._validate_import_file][SUCCESS] Validated import file: {zip_path}")
# END_FUNCTION__validate_import_file

View File

@@ -1,79 +1,124 @@
# [MODULE] Иерархия исключений # pylint: disable=too-many-ancestors
# @contract: Все ошибки наследуют SupersetToolError """
# @semantic: Каждый тип соответствует конкретной проблемной области [MODULE] Иерархия исключений
# @coherence: @contract: Все ошибки наследуют `SupersetToolError` для единой точки обработки.
# - Полное покрытие всех сценариев клиента """
# - Четкая классификация по уровню серьезности
# [IMPORTS] Exceptions # [IMPORTS] Standard library
from typing import Optional, Dict, Any from pathlib import Path
# [IMPORTS] Typing
from typing import Optional, Dict, Any, Union
class SupersetToolError(Exception): class SupersetToolError(Exception):
"""[BASE] Базовый класс ошибок инструмента """[BASE] Базовый класс для всех ошибок инструмента Superset."""
@semantic: Должен содержать контекст для диагностики # [ENTITY: Function('__init__')]
""" # CONTRACT:
def __init__(self, message: str, context: Optional[dict] = None): # PURPOSE: Инициализация базового исключения.
# PRECONDITIONS: `context` должен быть словарем или None.
# POSTCONDITIONS: Исключение создано с сообщением и контекстом.
def __init__(self, message: str, context: Optional[Dict[str, Any]] = None):
if not isinstance(context, (dict, type(None))):
raise TypeError("Контекст ошибки должен быть словарем или None")
self.context = context or {} self.context = context or {}
super().__init__(f"{message} | Context: {self.context}") super().__init__(f"{message} | Context: {self.context}")
# END_FUNCTION___init__
# [ERROR-GROUP] Проблемы аутентификации и авторизации
class AuthenticationError(SupersetToolError): class AuthenticationError(SupersetToolError):
"""[AUTH] Ошибки credentials или доступа """[AUTH] Ошибки аутентификации или авторизации."""
@context: url, username, error_detail # [ENTITY: Function('__init__')]
""" # CONTRACT:
def __init__(self, message="Auth failed", **context): # PURPOSE: Инициализация исключения аутентификации.
super().__init__( # PRECONDITIONS: None
f"[AUTH_FAILURE] {message}", # POSTCONDITIONS: Исключение создано.
{"type": "authentication", **context} def __init__(self, message: str = "Authentication failed", **context: Any):
) super().__init__(f"[AUTH_FAILURE] {message}", context={"type": "authentication", **context})
# END_FUNCTION___init__
class PermissionDeniedError(AuthenticationError): class PermissionDeniedError(AuthenticationError):
"""[AUTH] Ошибка отказа в доступе из-за недостаточных прав """[AUTH] Ошибка отказа в доступе."""
@context: required_permission, user_roles # [ENTITY: Function('__init__')]
""" # CONTRACT:
def __init__(self, required_permission: str, **context): # PURPOSE: Инициализация исключения отказа в доступе.
super().__init__( # PRECONDITIONS: None
f"Permission denied: {required_permission}", # POSTCONDITIONS: Исключение создано.
{"type": "authorization", "required_permission": required_permission, **context} def __init__(self, message: str = "Permission denied", required_permission: Optional[str] = None, **context: Any):
) full_message = f"Permission denied: {required_permission}" if required_permission else message
super().__init__(full_message, context={"required_permission": required_permission, **context})
# END_FUNCTION___init__
# [ERROR-GROUP] Проблемы API-вызовов
class SupersetAPIError(SupersetToolError): class SupersetAPIError(SupersetToolError):
"""[API] Ошибки взаимодействия с Superset API """[API] Общие ошибки взаимодействия с Superset API."""
@context: endpoint, method, status_code, response # [ENTITY: Function('__init__')]
""" # CONTRACT:
def __init__(self, message="API error", **context): # PURPOSE: Инициализация исключения ошибки API.
super().__init__( # PRECONDITIONS: None
f"[API_FAILURE] {message}", # POSTCONDITIONS: Исключение создано.
{"type": "api_call", **context} def __init__(self, message: str = "Superset API error", **context: Any):
) super().__init__(f"[API_FAILURE] {message}", context={"type": "api_call", **context})
# END_FUNCTION___init__
# [ERROR-SUBCLASS] Детализированные ошибки API
class ExportError(SupersetAPIError): class ExportError(SupersetAPIError):
"""[API:EXPORT] Проблемы экспорта дашбордов""" """[API:EXPORT] Проблемы, специфичные для операций экспорта."""
... # [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация исключения ошибки экспорта.
# PRECONDITIONS: None
# POSTCONDITIONS: Исключение создано.
def __init__(self, message: str = "Dashboard export failed", **context: Any):
super().__init__(f"[EXPORT_FAILURE] {message}", context={"subtype": "export", **context})
# END_FUNCTION___init__
class DashboardNotFoundError(SupersetAPIError): class DashboardNotFoundError(SupersetAPIError):
"""[API:404] Запрошенный ресурс не существует""" """[API:404] Запрошенный дашборд или ресурс не существует."""
def __init__(self, dashboard_id, **context): # [ENTITY: Function('__init__')]
super().__init__( # CONTRACT:
f"Dashboard {dashboard_id} not found", # PURPOSE: Инициализация исключения "дашборд не найден".
{"dashboard_id": dashboard_id, **context} # PRECONDITIONS: None
) # POSTCONDITIONS: Исключение создано.
def __init__(self, dashboard_id_or_slug: Union[int, str], message: str = "Dashboard not found", **context: Any):
super().__init__(f"[NOT_FOUND] Dashboard '{dashboard_id_or_slug}' {message}", context={"subtype": "not_found", "resource_id": dashboard_id_or_slug, **context})
# END_FUNCTION___init__
# [ERROR-SUBCLASS] Детализированные ошибки обработки файлов class DatasetNotFoundError(SupersetAPIError):
class InvalidZipFormatError(SupersetAPIError): """[API:404] Запрашиваемый набор данных не существует."""
"""[API:ZIP] Некорректный формат ZIP-архива # [ENTITY: Function('__init__')]
@context: file_path, expected_format, error_detail # CONTRACT:
""" # PURPOSE: Инициализация исключения "набор данных не найден".
def __init__(self, file_path: str, **context): # PRECONDITIONS: None
super().__init__( # POSTCONDITIONS: Исключение создано.
f"Invalid ZIP format for file: {file_path}", def __init__(self, dataset_id_or_slug: Union[int, str], message: str = "Dataset not found", **context: Any):
{"type": "zip_validation", "file_path": file_path, **context} super().__init__(f"[NOT_FOUND] Dataset '{dataset_id_or_slug}' {message}", context={"subtype": "not_found", "resource_id": dataset_id_or_slug, **context})
) # END_FUNCTION___init__
class InvalidZipFormatError(SupersetToolError):
"""[FILE:ZIP] Некорректный формат ZIP-архива."""
# [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация исключения некорректного формата ZIP.
# PRECONDITIONS: None
# POSTCONDITIONS: Исключение создано.
def __init__(self, message: str = "Invalid ZIP format or content", file_path: Optional[Union[str, Path]] = None, **context: Any):
super().__init__(f"[FILE_ERROR] {message}", context={"type": "file_validation", "file_path": str(file_path) if file_path else "N/A", **context})
# END_FUNCTION___init__
# [ERROR-GROUP] Системные и network-ошибки
class NetworkError(SupersetToolError): class NetworkError(SupersetToolError):
"""[NETWORK] Проблемы соединения или таймауты""" """[NETWORK] Проблемы соединения."""
... # [ENTITY: Function('__init__')]
# CONTRACT:
# PURPOSE: Инициализация исключения сетевой ошибки.
# PRECONDITIONS: None
# POSTCONDITIONS: Исключение создано.
def __init__(self, message: str = "Network connection failed", **context: Any):
super().__init__(f"[NETWORK_FAILURE] {message}", context={"type": "network", **context})
# END_FUNCTION___init__
class FileOperationError(SupersetToolError):
"""[FILE] Ошибка файловых операций."""
class InvalidFileStructureError(FileOperationError):
"""[FILE] Некорректная структура файлов/директорий."""
class ConfigurationError(SupersetToolError):
"""[CONFIG] Ошибка в конфигурации инструмента."""

View File

@@ -1,98 +1,89 @@
# [MODULE] Сущности данных конфигурации # pylint: disable=no-self-argument,too-few-public-methods
# @desc: Определяет структуры данных для работы с Superset API """
# @contracts: [MODULE] Сущности данных конфигурации
# - Проверка валидности URL @desc: Определяет структуры данных, используемые для конфигурации и трансформации в инструменте Superset.
# - Валидация параметров аутентификации """
# @coherence:
# - Все модели согласованы с API Superset v1
# - Совместимы с клиентскими методами
# [IMPORTS] Models # [IMPORTS] Pydantic и Typing
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
from pydantic import BaseModel, validator,Field from pydantic import BaseModel, validator, Field, HttpUrl, VERSION
# [IMPORTS] Локальные модули
from .utils.logger import SupersetLogger from .utils.logger import SupersetLogger
class SupersetConfig(BaseModel): class SupersetConfig(BaseModel):
"""[CONFIG] Конфигурация подключения к Superset
@semantic: Основные параметры подключения к API
@invariant:
- base_url должен содержать версию API (/v1/)
- auth должен содержать все обязательные поля
""" """
base_url: str = Field(..., regex=r'.*/api/v1.*') [CONFIG] Конфигурация подключения к Superset API.
auth: dict """
verify_ssl: bool = True base_url: str = Field(..., description="Базовый URL Superset API, включая версию /api/v1.", pattern=r'.*/api/v1.*')
timeout: int = 30 auth: Dict[str, str] = Field(..., description="Словарь с данными для аутентификации (provider, username, password, refresh).")
logger: Optional[SupersetLogger] = None verify_ssl: bool = Field(True, description="Флаг для проверки SSL-сертификатов.")
timeout: int = Field(30, description="Таймаут в секундах для HTTP-запросов.")
logger: Optional[SupersetLogger] = Field(None, description="Экземпляр логгера для логирования внутри клиента.")
# [VALIDATOR] Проверка параметров аутентификации # [ENTITY: Function('validate_auth')]
# CONTRACT:
# PURPOSE: Валидация словаря `auth`.
# PRECONDITIONS: `v` должен быть словарем.
# POSTCONDITIONS: Возвращает `v` если все обязательные поля присутствуют.
@validator('auth') @validator('auth')
def validate_auth(cls, v): def validate_auth(cls, v: Dict[str, str], values: dict) -> Dict[str, str]:
logger = values.get('logger') or SupersetLogger(name="SupersetConfig")
logger.debug("[DEBUG][SupersetConfig.validate_auth][ENTER] Validating auth.")
required = {'provider', 'username', 'password', 'refresh'} required = {'provider', 'username', 'password', 'refresh'}
if not required.issubset(v.keys()): if not required.issubset(v.keys()):
raise ValueError( logger.error("[ERROR][SupersetConfig.validate_auth][FAILURE] Missing required auth fields.")
f"[CONTRACT_VIOLATION] Auth must contain {required}" raise ValueError(f"Словарь 'auth' должен содержать поля: {required}. Отсутствующие: {required - v.keys()}")
) logger.debug("[DEBUG][SupersetConfig.validate_auth][SUCCESS] Auth validated.")
return v return v
# END_FUNCTION_validate_auth
# [ENTITY: Function('check_base_url_format')]
# CONTRACT:
# PURPOSE: Валидация формата `base_url`.
# PRECONDITIONS: `v` должна быть строкой.
# POSTCONDITIONS: Возвращает `v` если это валидный URL.
@validator('base_url')
def check_base_url_format(cls, v: str, values: dict) -> str:
logger = values.get('logger') or SupersetLogger(name="SupersetConfig")
logger.debug("[DEBUG][SupersetConfig.check_base_url_format][ENTER] Validating base_url.")
try:
if VERSION.startswith('1'):
HttpUrl(v)
except (ValueError, TypeError) as exc:
logger.error("[ERROR][SupersetConfig.check_base_url_format][FAILURE] Invalid base_url format.")
raise ValueError(f"Invalid URL format: {v}") from exc
logger.debug("[DEBUG][SupersetConfig.check_base_url_format][SUCCESS] base_url validated.")
return v
# END_FUNCTION_check_base_url_format
class Config: class Config:
"""Pydantic config"""
arbitrary_types_allowed = True arbitrary_types_allowed = True
json_schema_extra = {
"example": {
"base_url": "https://host/api/v1/",
"auth": {
"provider": "db",
"username": "user",
"password": "pass",
"refresh": True
}
}
}
# [SEMANTIC-TYPE] Конфигурация БД для миграций
class DatabaseConfig(BaseModel): class DatabaseConfig(BaseModel):
"""[CONFIG] Параметры трансформации БД при миграции
@semantic: Содержит old/new состояние для преобразования
@invariant:
- Должны быть указаны оба состояния (old/new)
- UUID должен соответствовать формату
""" """
database_config: Dict[str, Dict[str, Any]] [CONFIG] Параметры трансформации баз данных при миграции дашбордов.
logger: Optional[SupersetLogger] = None """
database_config: Dict[str, Dict[str, Any]] = Field(..., description="Словарь, содержащий 'old' и 'new' конфигурации базы данных.")
logger: Optional[SupersetLogger] = Field(None, description="Экземпляр логгера для логирования.")
# [ENTITY: Function('validate_config')]
# CONTRACT:
# PURPOSE: Валидация словаря `database_config`.
# PRECONDITIONS: `v` должен быть словарем.
# POSTCONDITIONS: Возвращает `v` если содержит ключи 'old' и 'new'.
@validator('database_config') @validator('database_config')
def validate_config(cls, v): def validate_config(cls, v: Dict[str, Dict[str, Any]], values: dict) -> Dict[str, Dict[str, Any]]:
logger = values.get('logger') or SupersetLogger(name="DatabaseConfig")
logger.debug("[DEBUG][DatabaseConfig.validate_config][ENTER] Validating database_config.")
if not {'old', 'new'}.issubset(v.keys()): if not {'old', 'new'}.issubset(v.keys()):
raise ValueError( logger.error("[ERROR][DatabaseConfig.validate_config][FAILURE] Missing 'old' or 'new' keys in database_config.")
"[COHERENCE_ERROR] Config must contain both old/new states" raise ValueError("'database_config' должен содержать ключи 'old' и 'new'.")
) logger.debug("[DEBUG][DatabaseConfig.validate_config][SUCCESS] database_config validated.")
return v return v
# END_FUNCTION_validate_config
class Config: class Config:
"""Pydantic config"""
arbitrary_types_allowed = True arbitrary_types_allowed = True
json_schema_extra = {
"example": {
"database_config": {
"old":
{
"database_name": "Prod Clickhouse",
"sqlalchemy_uri": "clickhousedb+connect://clicketl:XXXXXXXXXX@rgm-s-khclk.hq.root.ad:443/dm",
"uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9",
"database_uuid": "b9b67cb5-9874-4dc6-87bd-354fc33be6f9",
"allow_ctas": "false",
"allow_cvas": "false",
"allow_dml": "false"
},
"new": {
"database_name": "Dev Clickhouse",
"sqlalchemy_uri": "clickhousedb+connect://dwhuser:XXXXXXXXXX@10.66.229.179:8123/dm",
"uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2",
"database_uuid": "e9fd8feb-cb77-4e82-bc1d-44768b8d2fc2",
"allow_ctas": "true",
"allow_cvas": "true",
"allow_dml": "true"
}
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,71 @@
# [MODULE] Superset Clients Initializer
# PURPOSE: Централизованно инициализирует клиенты Superset для различных окружений (DEV, PROD, SBX, PREPROD).
# COHERENCE:
# - Использует `SupersetClient` для создания экземпляров клиентов.
# - Использует `SupersetLogger` для логирования процесса.
# - Интегрируется с `keyring` для безопасного получения паролей.
# [IMPORTS] Сторонние библиотеки
import keyring
from typing import Dict
# [IMPORTS] Локальные модули
from superset_tool.models import SupersetConfig
from superset_tool.client import SupersetClient
from superset_tool.utils.logger import SupersetLogger
# CONTRACT:
# PURPOSE: Инициализирует и возвращает словарь клиентов `SupersetClient` для всех предопределенных окружений.
# PRECONDITIONS:
# - `keyring` должен содержать пароли для систем "dev migrate", "prod migrate", "sandbox migrate", "preprod migrate".
# - `logger` должен быть инициализированным экземпляром `SupersetLogger`.
# POSTCONDITIONS:
# - Возвращает словарь, где ключи - это имена окружений ('dev', 'sbx', 'prod', 'preprod'),
# а значения - соответствующие экземпляры `SupersetClient`.
# PARAMETERS:
# - logger: SupersetLogger - Экземпляр логгера для записи процесса инициализации.
# RETURN: Dict[str, SupersetClient] - Словарь с инициализированными клиентами.
# EXCEPTIONS:
# - Логирует и выбрасывает `Exception` при любой ошибке (например, отсутствие пароля, ошибка подключения).
def setup_clients(logger: SupersetLogger) -> Dict[str, SupersetClient]:
"""Инициализирует и настраивает клиенты для всех окружений Superset."""
# [ANCHOR] CLIENTS_INITIALIZATION
logger.info("[INFO][INIT_CLIENTS_START] Запуск инициализации клиентов Superset.")
clients = {}
environments = {
"dev": "https://devta.bi.dwh.rusal.com/api/v1",
"prod": "https://prodta.bi.dwh.rusal.com/api/v1",
"sbx": "https://sandboxta.bi.dwh.rusal.com/api/v1",
"preprod": "https://preprodta.bi.dwh.rusal.com/api/v1"
}
try:
for env_name, base_url in environments.items():
logger.debug(f"[DEBUG][CONFIG_CREATE] Создание конфигурации для окружения: {env_name.upper()}")
password = keyring.get_password("system", f"{env_name} migrate")
if not password:
raise ValueError(f"Пароль для '{env_name} migrate' не найден в keyring.")
config = SupersetConfig(
base_url=base_url,
auth={
"provider": "db",
"username": "migrate_user",
"password": password,
"refresh": True
},
verify_ssl=False
)
clients[env_name] = SupersetClient(config, logger)
logger.debug(f"[DEBUG][CLIENT_SUCCESS] Клиент для {env_name.upper()} успешно создан.")
logger.info(f"[COHERENCE_CHECK_PASSED][INIT_CLIENTS_SUCCESS] Все клиенты ({', '.join(clients.keys())}) успешно инициализированы.")
return clients
except Exception as e:
logger.error(f"[CRITICAL][INIT_CLIENTS_FAILED] Ошибка при инициализации клиентов: {str(e)}", exc_info=True)
raise
# END_FUNCTION_setup_clients
# END_MODULE_init_clients

View File

@@ -1,9 +1,27 @@
# utils/logger.py # [MODULE] Superset Tool Logger Utility
# PURPOSE: Предоставляет стандартизированный класс-обертку `SupersetLogger` для настройки и использования логирования в проекте.
# COHERENCE: Модуль согласован со стандартной библиотекой `logging`, расширяя ее для нужд проекта.
import logging import logging
import sys
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
# CONTRACT:
# PURPOSE: Обеспечивает унифицированную настройку логгера с выводом в консоль и/или файл.
# PRECONDITIONS:
# - `name` должен быть строкой.
# - `level` должен быть валидным уровнем логирования (например, `logging.INFO`).
# POSTCONDITIONS:
# - Создает и настраивает логгер с указанным именем и уровнем.
# - Добавляет обработчики для вывода в файл (если указан `log_dir`) и в консоль (если `console=True`).
# - Очищает все предыдущие обработчики для данного логгера, чтобы избежать дублирования.
# PARAMETERS:
# - name: str - Имя логгера.
# - log_dir: Optional[Path] - Директория для сохранения лог-файлов.
# - level: int - Уровень логирования.
# - console: bool - Флаг для включения вывода в консоль.
class SupersetLogger: class SupersetLogger:
def __init__( def __init__(
self, self,
@@ -19,29 +37,35 @@ class SupersetLogger:
'%(asctime)s - %(levelname)s - %(message)s' '%(asctime)s - %(levelname)s - %(message)s'
) )
# Очищаем существующие обработчики # [ANCHOR] HANDLER_RESET
if self.logger.handlers: # Очищаем существующие обработчики, чтобы избежать дублирования вывода при повторной инициализации.
for handler in self.logger.handlers[:]: if self.logger.hasHandlers():
self.logger.removeHandler(handler) self.logger.handlers.clear()
# Файловый обработчик # [ANCHOR] FILE_HANDLER
if log_dir: if log_dir:
log_dir.mkdir(parents=True, exist_ok=True) log_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d")
file_handler = logging.FileHandler( file_handler = logging.FileHandler(
log_dir / f"{name}_{self._get_timestamp()}.log" log_dir / f"{name}_{timestamp}.log", encoding='utf-8'
) )
file_handler.setFormatter(formatter) file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler) self.logger.addHandler(file_handler)
# Консольный обработчик # [ANCHOR] CONSOLE_HANDLER
if console: if console:
console_handler = logging.StreamHandler() console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter) console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler) self.logger.addHandler(console_handler)
# CONTRACT:
# PURPOSE: (HELPER) Генерирует строку с текущей датой для имени лог-файла.
# RETURN: str - Отформатированная дата (YYYYMMDD).
def _get_timestamp(self) -> str: def _get_timestamp(self) -> str:
return datetime.now().strftime("%Y%m%d") return datetime.now().strftime("%Y%m%d")
# END_FUNCTION__get_timestamp
# [INTERFACE] Методы логирования
def info(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): def info(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.info(message, extra=extra, exc_info=exc_info) self.logger.info(message, extra=extra, exc_info=exc_info)
@@ -57,5 +81,8 @@ class SupersetLogger:
def debug(self, message: str, extra: Optional[dict] = None, exc_info: bool = False): def debug(self, message: str, extra: Optional[dict] = None, exc_info: bool = False):
self.logger.debug(message, extra=extra, exc_info=exc_info) self.logger.debug(message, extra=extra, exc_info=exc_info)
def exception(self, message: str): def exception(self, message: str, *args, **kwargs):
self.logger.exception(message) self.logger.exception(message, *args, **kwargs)
# END_CLASS_SupersetLogger
# END_MODULE_logger

View File

@@ -1,134 +1,221 @@
from typing import Optional, Dict, Any,BinaryIO,List # -*- coding: utf-8 -*-
import requests # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches,unused-argument
"""
[MODULE] Сетевой клиент для API
[DESCRIPTION]
Инкапсулирует низкоуровневую HTTP-логику для взаимодействия с Superset API.
"""
# [IMPORTS] Стандартная библиотека
from typing import Optional, Dict, Any, BinaryIO, List, Union
import json import json
import urllib3 import io
from ..exceptions import AuthenticationError, NetworkError,DashboardNotFoundError,SupersetAPIError,PermissionDeniedError from pathlib import Path
# [IMPORTS] Сторонние библиотеки
import requests
import urllib3 # Для отключения SSL-предупреждений
# [IMPORTS] Локальные модули
from superset_tool.exceptions import (
AuthenticationError,
NetworkError,
DashboardNotFoundError,
SupersetAPIError,
PermissionDeniedError
)
from superset_tool.utils.logger import SupersetLogger # Импорт логгера
# [CONSTANTS]
DEFAULT_RETRIES = 3
DEFAULT_BACKOFF_FACTOR = 0.5
DEFAULT_TIMEOUT = 30
class APIClient: class APIClient:
"""[NETWORK-CORE] Инкапсулирует HTTP-логику для работы с API. """[NETWORK-CORE] Инкапсулирует HTTP-логику для работы с API."""
@contract: Гарантирует retry, SSL-валидацию и стандартные заголовки.
"""
def __init__( def __init__(
self, self,
base_url: str, config: Dict[str, Any],
auth: Dict[str, Any], verify_ssl: bool = True,
verify_ssl: bool = False, timeout: int = DEFAULT_TIMEOUT,
timeout: int = 30 logger: Optional[SupersetLogger] = None
): ):
self.base_url = base_url self.logger = logger or SupersetLogger(name="APIClient")
self.auth = auth self.logger.info("[INFO][APIClient.__init__][ENTER] Initializing APIClient.")
self.session = self._init_session(verify_ssl) self.base_url = config.get("base_url")
self.timeout = timeout self.auth = config.get("auth")
self.request_settings = {
"verify_ssl": verify_ssl,
"timeout": timeout
}
self.session = self._init_session()
self._tokens: Dict[str, str] = {}
self._authenticated = False
self.logger.info("[INFO][APIClient.__init__][SUCCESS] APIClient initialized.")
def _init_session(self, verify_ssl: bool) -> requests.Session: def _init_session(self) -> requests.Session:
"""[NETWORK-INIT] Настройка сессии с адаптерами.""" self.logger.debug("[DEBUG][APIClient._init_session][ENTER] Initializing session.")
session = requests.Session() session = requests.Session()
session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3)) retries = requests.adapters.Retry(
total=DEFAULT_RETRIES,
backoff_factor=DEFAULT_BACKOFF_FACTOR,
status_forcelist=[500, 502, 503, 504],
allowed_methods={"HEAD", "GET", "POST", "PUT", "DELETE"}
)
adapter = requests.adapters.HTTPAdapter(max_retries=retries)
session.mount('http://', adapter)
session.mount('https://', adapter)
verify_ssl = self.request_settings.get("verify_ssl", True)
session.verify = verify_ssl session.verify = verify_ssl
if not verify_ssl: if not verify_ssl:
urllib3.disable_warnings() urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.logger.warning("[WARNING][APIClient._init_session][STATE_CHANGE] SSL verification disabled.")
self.logger.debug("[DEBUG][APIClient._init_session][SUCCESS] Session initialized.")
return session return session
def authenticate(self) -> Dict[str, str]: def authenticate(self) -> Dict[str, str]:
"""[AUTH-FLOW] Получение access и CSRF токенов.""" self.logger.info(f"[INFO][APIClient.authenticate][ENTER] Authenticating to {self.base_url}")
try: try:
login_url = f"{self.base_url}/security/login"
response = self.session.post( response = self.session.post(
f"{self.base_url}/security/login", login_url,
json={**self.auth, "provider": "db", "refresh": True}, json=self.auth,
timeout=self.timeout timeout=self.request_settings.get("timeout", DEFAULT_TIMEOUT)
) )
response.raise_for_status() response.raise_for_status()
access_token = response.json()["access_token"] access_token = response.json()["access_token"]
csrf_url = f"{self.base_url}/security/csrf_token/"
csrf_response = self.session.get( csrf_response = self.session.get(
f"{self.base_url}/security/csrf_token/", csrf_url,
headers={"Authorization": f"Bearer {access_token}"}, headers={"Authorization": f"Bearer {access_token}"},
timeout=self.timeout timeout=self.request_settings.get("timeout", DEFAULT_TIMEOUT)
) )
csrf_response.raise_for_status() csrf_response.raise_for_status()
csrf_token = csrf_response.json()["result"]
return { self._tokens = {
"access_token": access_token, "access_token": access_token,
"csrf_token": csrf_response.json()["result"] "csrf_token": csrf_token
} }
except requests.exceptions.RequestException as e: self._authenticated = True
raise NetworkError(f"Auth failed: {str(e)}") self.logger.info("[INFO][APIClient.authenticate][SUCCESS] Authenticated successfully.")
return self._tokens
except requests.exceptions.HTTPError as e:
self.logger.error(f"[ERROR][APIClient.authenticate][FAILURE] Authentication failed: {e}")
raise AuthenticationError(f"Authentication failed: {e}") from e
except (requests.exceptions.RequestException, KeyError) as e:
self.logger.error(f"[ERROR][APIClient.authenticate][FAILURE] Network or parsing error: {e}")
raise NetworkError(f"Network or parsing error during authentication: {e}") from e
@property
def headers(self) -> Dict[str, str]:
if not self._authenticated:
self.authenticate()
return {
"Authorization": f"Bearer {self._tokens['access_token']}",
"X-CSRFToken": self._tokens.get("csrf_token", ""),
"Referer": self.base_url,
"Content-Type": "application/json"
}
def request( def request(
self, self,
method: str, method: str,
endpoint: str, endpoint: str,
headers: Optional[Dict] = None, headers: Optional[Dict] = None,
raw_response: bool = False,
**kwargs **kwargs
) -> requests.Response: ) -> Union[requests.Response, Dict[str, Any]]:
"""[NETWORK-CORE] Обертка для запросов с обработкой ошибок.""" self.logger.debug(f"[DEBUG][APIClient.request][ENTER] Requesting {method} {endpoint}")
full_url = f"{self.base_url}{endpoint}"
_headers = self.headers.copy()
if headers:
_headers.update(headers)
try: try:
response = self.session.request( response = self.session.request(
method, method,
f"{self.base_url}{endpoint}", full_url,
headers=headers, headers=_headers,
timeout=self.timeout, timeout=self.request_settings.get("timeout", DEFAULT_TIMEOUT),
**kwargs **kwargs
) )
response.raise_for_status() response.raise_for_status()
return response self.logger.debug(f"[DEBUG][APIClient.request][SUCCESS] Request successful for {method} {endpoint}")
return response if raw_response else response.json()
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
if e.response.status_code == 404: self.logger.error(f"[ERROR][APIClient.request][FAILURE] HTTP error for {method} {endpoint}: {e}")
raise DashboardNotFoundError(endpoint) self._handle_http_error(e, endpoint, context={})
raise SupersetAPIError(str(e)) except requests.exceptions.RequestException as e:
self.logger.error(f"[ERROR][APIClient.request][FAILURE] Network error for {method} {endpoint}: {e}")
self._handle_network_error(e, full_url)
def _handle_http_error(self, e, endpoint, context):
status_code = e.response.status_code
if status_code == 404:
raise DashboardNotFoundError(endpoint, context=context) from e
if status_code == 403:
raise PermissionDeniedError("Доступ запрещен.", **context) from e
if status_code == 401:
raise AuthenticationError("Аутентификация не удалась.", **context) from e
raise SupersetAPIError(f"Ошибка API: {status_code} - {e.response.text}", **context) from e
def _handle_network_error(self, e, url):
if isinstance(e, requests.exceptions.Timeout):
msg = "Таймаут запроса"
elif isinstance(e, requests.exceptions.ConnectionError):
msg = "Ошибка соединения"
else:
msg = f"Неизвестная сетевая ошибка: {e}"
raise NetworkError(msg, url=url) from e
def upload_file( def upload_file(
self, self,
endpoint: str, endpoint: str,
file_obj: BinaryIO, file_info: Dict[str, Any],
file_name: str, extra_data: Optional[Dict] = None,
form_field: str = "file", timeout: Optional[int] = None
extra_data: Optional[Dict] = None, ) -> Dict:
timeout: Optional[int] = None self.logger.info(f"[INFO][APIClient.upload_file][ENTER] Uploading file to {endpoint}")
) -> Dict: full_url = f"{self.base_url}{endpoint}"
"""[NETWORK] Отправка файла на сервер _headers = self.headers.copy()
@params: _headers.pop('Content-Type', None)
endpoint: API endpoint file_obj = file_info.get("file_obj")
file_obj: файловый объект file_name = file_info.get("file_name")
file_name: имя файла form_field = file_info.get("form_field", "file")
form_field: имя поля формы if isinstance(file_obj, (str, Path)):
extra_data: дополнительные данные with open(file_obj, 'rb') as file_to_upload:
timeout: таймаут запроса files_payload = {form_field: (file_name, file_to_upload, 'application/x-zip-compressed')}
@return: return self._perform_upload(full_url, files_payload, extra_data, _headers, timeout)
Ответ сервера (JSON) elif isinstance(file_obj, io.BytesIO):
""" files_payload = {form_field: (file_name, file_obj.getvalue(), 'application/x-zip-compressed')}
files = {form_field: (file_name, file_obj, 'application/x-zip-compressed')} return self._perform_upload(full_url, files_payload, extra_data, _headers, timeout)
headers = { elif hasattr(file_obj, 'read'):
k: v for k, v in self.headers.items() files_payload = {form_field: (file_name, file_obj, 'application/x-zip-compressed')}
if k.lower() != 'content-type' return self._perform_upload(full_url, files_payload, extra_data, _headers, timeout)
} else:
self.logger.error(f"[ERROR][APIClient.upload_file][FAILURE] Unsupported file_obj type: {type(file_obj)}")
raise TypeError(f"Неподдерживаемый тип 'file_obj': {type(file_obj)}")
def _perform_upload(self, url, files, data, headers, timeout):
self.logger.debug(f"[DEBUG][APIClient._perform_upload][ENTER] Performing upload to {url}")
try: try:
response = self.session.post( response = self.session.post(
url=f"{self.base_url}{endpoint}", url=url,
files=files, files=files,
data=extra_data or {}, data=data or {},
headers=headers, headers=headers,
timeout=timeout or self.timeout timeout=timeout or self.request_settings.get("timeout")
) )
if response.status_code == 403:
raise PermissionDeniedError("Доступ запрещен")
response.raise_for_status() response.raise_for_status()
self.logger.info(f"[INFO][APIClient._perform_upload][SUCCESS] Upload successful to {url}")
return response.json() return response.json()
except requests.exceptions.HTTPError as e:
self.logger.error(f"[ERROR][APIClient._perform_upload][FAILURE] HTTP error during upload: {e}")
raise SupersetAPIError(f"Ошибка API при загрузке: {e.response.text}") from e
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
error_ctx = { self.logger.error(f"[ERROR][APIClient._perform_upload][FAILURE] Network error during upload: {e}")
"endpoint": endpoint, raise NetworkError(f"Ошибка сети при загрузке: {e}", url=url) from e
"file": file_name,
"status_code": getattr(e.response, 'status_code', None)
}
self.logger.error(
"[NETWORK_ERROR] Ошибка загрузки файла",
extra=error_ctx
)
raise
def fetch_paginated_count( def fetch_paginated_count(
self, self,
@@ -137,79 +224,41 @@ class APIClient:
count_field: str = "count", count_field: str = "count",
timeout: Optional[int] = None timeout: Optional[int] = None
) -> int: ) -> int:
"""[NETWORK] Получение общего количества элементов в пагинированном API self.logger.debug(f"[DEBUG][APIClient.fetch_paginated_count][ENTER] Fetching paginated count for {endpoint}")
@params: response_json = self.request(
endpoint: API endpoint без query-параметров method="GET",
query_params: параметры для пагинации endpoint=endpoint,
count_field: поле с количеством в ответе params={"q": json.dumps(query_params)},
timeout: таймаут запроса timeout=timeout or self.request_settings.get("timeout")
@return: )
Общее количество элементов count = response_json.get(count_field, 0)
@errors: self.logger.debug(f"[DEBUG][APIClient.fetch_paginated_count][SUCCESS] Fetched paginated count: {count}")
- NetworkError: проблемы с соединением return count
- KeyError: некорректный формат ответа
"""
try:
response = self.request(
method="GET",
endpoint=endpoint,
params={"q": json.dumps(query_params)},
timeout=timeout or self.timeout
)
if count_field not in response:
raise KeyError(f"Ответ API не содержит поле {count_field}")
return response[count_field]
except requests.exceptions.RequestException as e:
error_ctx = {
"endpoint": endpoint,
"params": query_params,
"error": str(e)
}
self.logger.error("[PAGINATION_ERROR]", extra=error_ctx)
raise NetworkError(f"Ошибка пагинации: {str(e)}") from e
def fetch_paginated_data( def fetch_paginated_data(
self, self,
endpoint: str, endpoint: str,
base_query: Dict, pagination_options: Dict[str, Any],
total_count: int,
results_field: str = "result",
timeout: Optional[int] = None timeout: Optional[int] = None
) -> List[Any]: ) -> List[Any]:
"""[NETWORK] Получение всех данных с пагинированного API self.logger.debug(f"[DEBUG][APIClient.fetch_paginated_data][ENTER] Fetching paginated data for {endpoint}")
@params: base_query = pagination_options.get("base_query", {})
endpoint: API endpoint total_count = pagination_options.get("total_count", 0)
base_query: базовые параметры запроса (без page) results_field = pagination_options.get("results_field", "result")
total_count: общее количество элементов page_size = base_query.get('page_size')
results_field: поле с данными в ответе if not page_size or page_size <= 0:
timeout: таймаут для запросов raise ValueError("'page_size' должен быть положительным числом.")
@return:
Собранные данные со всех страниц
"""
page_size = base_query['page_size']
total_pages = (total_count + page_size - 1) // page_size total_pages = (total_count + page_size - 1) // page_size
results = [] results = []
for page in range(total_pages): for page in range(total_pages):
query = {**base_query, 'page': page} query = {**base_query, 'page': page}
response_json = self.request(
response = self._execute_request(
method="GET", method="GET",
endpoint=endpoint, endpoint=endpoint,
params={"q": json.dumps(query)}, params={"q": json.dumps(query)},
timeout=timeout or self.timeout timeout=timeout or self.request_settings.get("timeout")
) )
page_results = response_json.get(results_field, [])
if results_field not in response: results.extend(page_results)
self.logger.warning( self.logger.debug(f"[DEBUG][APIClient.fetch_paginated_data][SUCCESS] Fetched paginated data. Total items: {len(results)}")
f"Ответ не содержит поле {results_field}",
extra={"response": response.keys()}
)
continue
results.extend(response[results_field])
return results return results

7
temp_pylint_runner.py Normal file
View File

@@ -0,0 +1,7 @@
import sys
import os
import pylint.lint
sys.path.append(os.getcwd())
pylint.lint.Run(['superset_tool/utils/fileio.py'])