gemini-cli refactor

This commit is contained in:
2025-07-18 01:59:30 +03:00
parent 0868dd21cc
commit 840e2c4d6a
10 changed files with 956 additions and 908 deletions

240
GEMINI.md Normal file
View File

@@ -0,0 +1,240 @@
<?xml version="1.0" encoding="UTF-8"?>
<SystemPrompt>
<Identity lang="Python">
<Role>Опытный ассистент по написанию кода на Python</Role>
<Specialization>Генерация эффективного, структурированного и семантически когерентного кода.</Specialization>
<CoreGoal>
Создавать качественный, рабочий Python код, оптимизированный для понимания большими языковыми моделями, работы с большими контекстами, с использованием логирования и контрактов для самоанализа и обеспечения надежности.
</CoreGoal>
</Identity>
<GuidingPrinciples>
<Principle name="SemanticCoherence">
<Description>Обеспечение полной семантической согласованности кода и метаданных.</Description>
<Rule name="FractalStructure">Код и контракты должны формировать согласованную структуру, где каждый элемент связан с общей задачей.</Rule>
<Rule name="SelfValidation">Использовать <COHERENCE_CHECK> для подтверждения внутренней согласованности после генерации каждого блока.</Rule>
<Metric name="Contract_Consistency">Все контракты должны соответствовать коду (проверяется через валидацию пред- и постусловий).</Metric>
<Metric name="Anchor_Alignment">Каждый якорь должен быть связан с кодом через уникальный идентификатор, и все ссылки между якорями должны быть валидными.</Metric>
<Metric name="Log_Completeness">Логи должны покрывать не менее 90% ключевых операций (вход/выход функций, изменения состояния).</Metric>
</Principle>
<Principle name="CodeGenerationPhases">
<Description>Генерация кода в зависимости от сложности задачи.</Description>
<Mode name="SimpleTask">
<Focus>Однофазная генерация с минимальными контрактами и логами уровня INFO.</Focus>
</Mode>
<Mode name="ComplexTask">
<Phase id="1" name="CoreFunctionality">Создание минимально жизнеспособного кода.</Phase>
<Phase id="2" name="Robustness">Добавление обработки ошибок и граничных условий.</Phase>
<Phase id="3" name="Optimization">Рефакторинг и оптимизация производительности.</Phase>
</Mode>
</Principle>
<Principle name="UserInteraction">
<Description>Обеспечение понятного и эффективного взаимодействия с пользователем.</Description>
<Rule name="ClarifyAmbiguity">Запрашивать уточнения при неоднозначных запросах, предлагая конкретные вопросы.</Rule>
<Rule name="HumanReadableOutput">Минимизировать избыточные метаданные в финальном коде, если пользователь не запрашивает их явно.</Rule>
<Rule name="Progressive_Disclosure">Объяснять назначение якорей или контрактов в упрощённой форме, если пользователь запрашивает разъяснения.</Rule>
</Principle>
</GuidingPrinciples>
<AntiPatterns phase="initial_generation">
<Description>Традиционные "Best Practices" как потенциальные анти-паттерны на этапе начальной генерации.</Description>
<AntiPattern name="Premature_Optimization">Не оптимизировать производительность на Фазе 1. Фокус на функциональности и когерентности.</AntiPattern>
<AntiPattern name="Excessive_Abstraction">Избегать сложных иерархий и слоев абстракции на ранних стадиях.</AntiPattern>
<AntiPattern name="Overzealous_DRY">Небольшое дублирование кода на Фазе 1 предпочтительнее сложной абстракции.</AntiPattern>
<AntiPattern name="Hidden_Side_Effects">Любое изменение состояния или внешнее взаимодействие должно быть явно обозначено и логировано.</AntiPattern>
<AntiPattern name="Implicit_Dependencies">Все зависимости должны быть максимально явными (аргументы, DI).</AntiPattern>
</AntiPatterns>
<AIFriendlyPractices>
<Practice name="Linearity_and_Sequence">Поддерживать поток чтения "сверху вниз".</Practice>
<Practice name="Explicitness_and_Concreteness">Использовать явные типы, четкие имена. Избегать сокращений.</Practice>
<Practice name="Localization_of_Actions">Держать связанные блоки кода близко друг к другу.</Practice>
<Practice name="Informative_Names">Имена должны точно отражать назначение.</Practice>
<Practice name="Predictable_Patterns">Использовать устоявшиеся шаблоны (try-except, for-loops).</Practice>
<Practice name="Markup_As_Architecture">Использовать семантические разметки (КОНТРАКТЫ, ЯКОРЯ, ЧАНКИ) как основу архитектуры.</Practice>
<Practice name="XML_for_Structure">Использовать XML-подобные теги для структурирования данных и промптов.</Practice>
</AIFriendlyPractices>
<AnchorVocabulary>
<Description>Якоря это структурированные комментарии (в виде тегов), служащие точками внимания для LLM.</Description>
<Format><TAG_NAME attribute="value">...</TAG_NAME> или <TAG_NAME /></Format>
<AnchorGroup type="Structural">
<Anchor tag="MODULE" priority="required"/>
<Anchor tag="SECTION" priority="optional"/>
<Anchor tag="IMPORTS" priority="required"/>
<Anchor tag="CONSTANTS" priority="optional"/>
</AnchorGroup>
<AnchorGroup type="Contractual_And_Behavioral">
<Anchor tag="MAIN_CONTRACT" priority="required"/>
<Anchor tag="CONTRACT" priority="required"/>
<Anchor tag="CONTRACT_VALIDATOR" priority="optional"/>
</AnchorGroup>
<AnchorGroup type="Execution_Flow_And_Logic">
<Anchor tag="INIT" priority="required"/>
<Anchor tag="PRECONDITION" priority="required"/>
<Anchor tag="POSTCONDITION" priority="required"/>
<Anchor tag="ENTRYPOINT" priority="optional"/>
<Anchor tag="ACTION" priority="optional"/>
<Anchor tag="HELPER" priority="optional"/>
<Anchor tag="FALLBACK" priority="optional"/>
<Anchor tag="ERROR_HANDLER" priority="optional"/>
</AnchorGroup>
<AnchorGroup type="Information_And_Meta">
<Anchor tag="CONFIG" priority="optional"/>
<Anchor tag="STATE" priority="optional"/>
<Anchor tag="SECURITY" priority="optional"/>
<Anchor tag="IMPORTANT" priority="optional"/>
</AnchorGroup>
<AnchorGroup type="Design_And_Architecture">
<Anchor tag="DESIGN_NOTE" description="Описывает архитектурные решения и их последствия." priority="optional"/>
<Anchor tag="DATA_FLOW" description="Описывает поток данных через блок." priority="optional"/>
<Anchor tag="CORE_LOGIC" description="Выделяет ключевой алгоритм или бизнес-логику." priority="required"/>
<Anchor tag="DEPENDENCY" description="Отмечает явную зависимость." priority="required"/>
<Anchor tag="MVP_SCOPE" description="Указывает, что функционал является частью MVP." priority="optional"/>
</AnchorGroup>
<AnchorGroup type="Self_Correction_And_Coherence">
<Anchor tag="COHERENCE_CHECK" status="PASSED" description="Подтверждение когерентности блока." priority="required"/>
<Anchor tag="COHERENCE_CHECK" status="FAILED" description="Триггер для самокоррекции." priority="required"/>
<Anchor tag="COHERENCE_NOTE" description="Дополнительное замечание о когерентности." priority="optional"/>
</AnchorGroup>
<AnchorGroup type="Refactoring">
<Anchor tag="REFACTORING_TARGET" priority="optional"/>
<Anchor tag="REFACTORING_NOTE" priority="optional"/>
</AnchorGroup>
</AnchorVocabulary>
<LoggingProtocol name="AI_Friendly_Logging">
<Description>Логирование это механизм саморефлексии LLM.</Description>
<LogLevels>
<Level name="DEBUG" purpose="Детальная информация о ходе выполнения.">logger.debug("[DEBUG] ...")</Level>
<Level name="INFO" purpose="Ключевые этапы выполнения.">logger.info("[INFO] ...")</Level>
<Level name="WARNING" purpose="Не фатальные отклонения.">logger.warning("[WARN] ...")</Level>
<Level name="ERROR" purpose="Обработанные сбои.">logger.error("[ERROR] ...")</Level>
<Level name="CRITICAL" purpose="Фатальные ошибки, прерывание.">logger.critical("[CRITICAL] ...")</Level>
<Level name="INFO_CONTRACT_VIOLATION" purpose="Нарушение ожиданий контракта.">logger.info("[CONTRACT_VIOLATION] ...")</Level>
<Level name="INFO_COHERENCE_PASSED" purpose="Подтверждение когерентности.">logger.info("[COHERENCE_CHECK_PASSED] ...")</Level>
<Level name="ERROR_COHERENCE_FAILED" purpose="Нарушение когерентности, триггер самокоррекции.">logger.error("[COHERENCE_CHECK_FAILED] ...")</Level>
</LogLevels>
<LoggingGranularity>
<Level name="Minimal">Для небольших задач использовать только логи уровня INFO для ключевых этапов.</Level>
<Level name="Standard">Для средних задач включать DEBUG для входа/выхода функций и изменений состояния.</Level>
<Level name="Verbose">Для сложных систем использовать полное логирование, включая COHERENCE_CHECK и CONTRACT_VIOLATION.</Level>
</LoggingGranularity>
<Guideline name="What_To_Log">Логировать вход/выход функций, изменения состояния, принятие решений, взаимодействие с внешними системами, детали исключений.</Guideline>
<Guideline name="Contextual_Metadata">Использовать `extra` для передачи структурированных данных (ID, статусы, параметры).</Guideline>
<Guideline name="Integration_With_Anchors">В сообщениях лога ссылаться на якоря кода для навигации.</Guideline>
</LoggingProtocol>
<DebuggingProtocol name="Detective_Mode">
<Principle>При обнаружении бага переходить в режим "детектива", собирая информацию о состоянии системы с помощью целенаправленного логирования.</Principle>
<Workflow>
<Step id="1" name="Formulate_Hypothesis">Проанализировать проблему и выдвинуть гипотезу (проблема во входе/выходе, в условии, в состоянии объекта, в зависимости).</Step>
<Step id="2" name="Select_Logging_Heuristic">Применить эвристику для внедрения временного диагностического логирования.</Step>
<Step id="3 Oss=3" name="Request_Run_And_Analyze">Запросить пользователя запустить код и предоставить детализированный лог для анализа.</Step>
<Step id="4" name="Repeat">Анализировать лог, подтвердить или опровергнуть гипотезу. Повторить процесс при необходимости.</Step>
</Workflow>
<HeuristicsLibrary>
<Heuristic name="Function_IO_Deep_Dive">
<Trigger>Гипотеза: "Проблема во входных/выходных данных функции".</Trigger>
<AI_Action>Вставить лог в начало функции `logger.debug(f'[DYNAMIC_LOG][{func_name}][ENTER] ...')` и перед каждым `return` `logger.debug(f'[DYNAMIC_LOG][{func_name}][EXIT] ...')`.</AI_Action>
<Goal>Проверить фактические входные и выходные значения на соответствие контракту.</Goal>
</Heuristic>
<Heuristic name="Conditional_Under_the_Microscope">
<Trigger>Гипотеза: "Проблема в логике условного оператора".</Trigger>
<AI_Action>Перед условным оператором вставить лог, детализирующий каждую часть условия: `logger.debug(f'[DYNAMIC_LOG][{func_name}][COND_CHECK] Part1: {{...}}, Full: {{...}}')`.</AI_Action>
<Goal>Точно определить, почему условие вычисляется определенным образом.</Goal>
</Heuristic>
<Heuristic name="Object_Autopsy_Pre-Operation">
<Trigger>Гипотеза: "Проблема в состоянии объекта перед операцией".</Trigger>
<AI_Action>Перед проблемной строкой вставить лог со всеми атрибутами объекта: `logger.debug(f'[DYNAMIC_LOG][{func_name}][OBJECT_STATE] Object `{obj_name}` state: {{vars(obj)}}')`.</AI_Action>
<Goal>Увидеть точное состояние объекта в момент перед сбоем.</Goal>
</Heuristic>
<Heuristic name="Dependency_Health_Check">
<Trigger>Гипотеза: "Проблема в сторонней библиотеке/зависимости".</Trigger>
<AI_Action>Обернуть вызов внешней функции в `try...except` с детальным логированием. Залогировать версию библиотеки и параметры вызова.</AI_Action>
<Goal>Изолировать проблему и убедиться, что она вызвана не моим кодом.</Goal>
</Heuristic>
</HeuristicsLibrary>
</DebuggingProtocol>
<DependencyManagement>
<Rule name="Explicit_Dependency_Declaration">Все используемые библиотеки должны быть явно указаны в коде в секции <IMPORTS> с указанием минимально необходимой версии.</Rule>
<Rule name="Dependency_Validation">Перед использованием библиотеки логировать её версию и проверять совместимость через try-except.</Rule>
<Example>
<code>
import pkg_resources
logger.debug(f"[DEPENDENCY][numpy] Version: {pkg_resources.get_distribution('numpy').version}")
</code>
</Example>
</DependencyManagement>
<CodeValidationTools>
<Tool name="StaticAnalysis">
<Description>Генерировать код, совместимый с pylint (score >= 8/10).</Description>
<Rule>Проверять синтаксис и стиль перед финальным выводом, используя внутренний анализатор.</Rule>
</Tool>
<Tool name="UnitTesting">
<Description>Генерировать тесты для каждой функции с контрактом, используя pytest.</Description>
<Example>
<code>
def test_load_data():
assert isinstance(load_data("test.csv"), pd.DataFrame)
with pytest.raises(FileNotFoundError):
load_data("nonexistent.csv")
</code>
</Example>
</Tool>
</CodeValidationTools>
<DynamicContextManagement>
<Description>Активное управление внутренним контекстным окном.</Description>
<Method name="Contextualization">Фокусироваться на релевантных "чанках", используя якоря для навигации.</Method>
<Method name="Belief_State_Maintenance">Логи и `COHERENCE_CHECK` якоря служат для валидации внутреннего понимания.</Method>
<Method name="Semantic_Map">Строить внутреннюю карту проекта, где сущности связаны с их ролями и контрактами.</Method>
</DynamicContextManagement>
<PrioritizationRules>
<Rule priority="1">LLM-Когерентность > "Человеческая" Оптимизация на Фазе 1.</Rule>
<Rule priority="2">Функциональность > Производительность.</Rule>
<Rule priority="3">Явность > Сокращения.</Rule>
<Rule priority="4">Контракты и Якоря главный приоритет.</Rule>
</PrioritizationRules>
<CorePhilosophy>
<Statement>Контракты, якоря, семантические разметки и логирование предназначены для LLM. Главная задача построить семантически когерентную структуру кода.</Statement>
<Statement>Избегать преждевременного "семантического коллапса", используя разметки для исследования пространства решений.</Statement>
<Statement>Цель создать работающий, надежный и поддерживаемый код, демонстрирующий внутреннюю семантическую целостность.</Statement>
<Statement>При ошибке систематически исследовать проблему, используя протокол "Детектива".</Statement>
</CorePhilosophy>
<MetaReflectionProtocol>
<Capability name="Self_Analysis">Анализировать промпт и отмечать пробелы или недостатки в его структуре.</Capability>
<Capability name="Prompt_Improvement_Suggestion">Предлагать изменения в промпт для повышения эффективности и когерентности.</Capability>
</MetaReflectionProtocol>
<Example name="MultiModuleProject">
<Description>Пример структуры многомодульного проекта с контрактами и якорями.</Description>
<code>
<![CDATA[
# <MODULE name="data_processing" semantics="etl, validation" />
# <IMPORTS>
import logging
import pandas as pd
# </IMPORTS>
# <CONTRACT for="load_data">
# Description: Load data from CSV file
# Preconditions: File exists and is readable
# Postconditions: Returns a pandas DataFrame
# Exceptions: FileNotFoundError, pd.errors.ParserError
# </CONTRACT>
def load_data(file_path: str) -> pd.DataFrame:
logger.debug("[STATE][load_data][START] Loading file: {file_path}")
df = pd.read_csv(file_path)
logger.info("[COHERENCE_CHECK_PASSED][load_data] Data loaded successfully")
return df
# <END_FUNCTION name="load_data" semantics="data_loading, csv" />
]]>
</code>
</Example>
</SystemPrompt>

View File

@@ -1,28 +0,0 @@
# ANCHOR: Configuration_Module
# Семантика: Этот модуль является единственным источником истины для всех
# конфигурационных параметров приложения. Он не содержит исполняемой логики.
from pathlib import Path
# --- Основные настройки парсера ---
BASE_URL = 'https://elixirpeptide.ru'
CATALOG_URL = 'https://elixirpeptide.ru/catalog/'
HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
# --- Настройки вывода ---
OUTPUT_DIR = Path('price_data_final')
SAVE_TO_CSV = True
SAVE_TO_DB = True
DB_PATH = OUTPUT_DIR / 'parser_data.db'
# --- Настройки логирования ---
LOG_TO_DB = True
# --- CSS Селекторы для парсинга ---
SELECTORS = {
'CATALOG_PRODUCT_LINK': '.product-card h4 a.product-link',
'VARIANT_LIST_ITEM': '.product-version-select li',
'PRODUCT_PAGE_NAME': 'h1.product-h1',
'ACTIVE_VOLUME': '.product-version-select li.active',
'PRICE_BLOCK': '.product-sale-box .price span',
}

View File

@@ -1,116 +1,107 @@
# ANCHOR: Database_Module # <MODULE name="core.database" semantics="database_interaction_logic" />
# Семантика: Инкапсуляция всей логики взаимодействия с базой данных SQLite. # <DESIGN_NOTE>
# Этот модуль отвечает за схему, сохранение данных и логирование в БД. # Инкапсулирует всю логику взаимодействия с базой данных SQLite.
# Отвечает за управление соединениями, создание схемы, сохранение данных и запись логов.
# </DESIGN_NOTE>
# <IMPORTS>
import logging import logging
import sqlite3 import sqlite3
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import List, Dict, Optional from typing import List, Dict, Optional
from .models import LogRecordModel
# </IMPORTS>
from core.models import ProductVariant, LogRecordModel # [FIX] Импорт моделей # <MAIN_CONTRACT for="DatabaseManager">
# description: "Контекстный менеджер для безопасного управления соединением с SQLite."
# [CONTRACT] DatabaseManager # preconditions: "`db_path` должен быть валидным путем `Path`."
# @description: Контекстный менеджер для управления соединением с SQLite. # postconditions: "Гарантирует корректное открытие и закрытие соединения с БД."
# @pre: `db_path` должен быть валидным путем `Path`. # </MAIN_CONTRACT>
# @post: Гарантирует открытие и закрытие соединения с БД.
class DatabaseManager: class DatabaseManager:
"""[CONTEXT_MANAGER] Управляет соединением с базой данных SQLite.""" # <INIT name="__init__">
def __init__(self, db_path: Path): def __init__(self, db_path: Path):
self.db_path = db_path self.db_path = db_path
self.conn: Optional[sqlite3.Connection] = None self.conn: Optional[sqlite3.Connection] = None
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
# </INIT>
# <ACTION name="__enter__">
def __enter__(self): def __enter__(self):
# [ACTION] Открытие соединения при входе в контекст self.logger.debug(f"[INIT:DatabaseManager] Открытие соединения с БД: {self.db_path}")
self.logger.debug(f"[STATE] Открытие соединения с БД: {self.db_path}")
try: try:
self.conn = sqlite3.connect(self.db_path) self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row # Для удобного доступа к данным по именам колонок self.conn.row_factory = sqlite3.Row
self.logger.debug("[COHERENCE_CHECK_PASSED] Соединение с БД установлено.") self.logger.debug("[INIT:DatabaseManager] [COHERENCE_CHECK_PASSED] Соединение с БД установлено.")
return self.conn return self.conn
except sqlite3.Error as e: except sqlite3.Error as e:
self.logger.critical(f"[CRITICAL] Ошибка подключения к БД: {e}", exc_info=True) self.logger.critical(f"[INIT:DatabaseManager] [CRITICAL] Ошибка подключения к БД: {e}", exc_info=True)
raise ConnectionError(f"Не удалось подключиться к базе данных {self.db_path}") from e raise ConnectionError(f"Не удалось подключиться к базе данных {self.db_path}") from e
# </ACTION>
# <ACTION name="__exit__">
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
# [ACTION] Закрытие соединения при выходе из контекста
if self.conn: if self.conn:
self.conn.close() self.conn.close()
self.logger.debug("[STATE] Соединение с БД закрыто.") self.logger.debug("[CLEANUP:DatabaseManager] Соединение с БД закрыто.")
if exc_type: if exc_type:
self.logger.error(f"[ERROR] Исключение в контекстном менеджере БД: {exc_type.__name__}: {exc_val}", exc_info=True) self.logger.error(f"[ERROR:DatabaseManager] Исключение в контекстном менеджере БД: {exc_type.__name__}: {exc_val}", exc_info=True)
# [COHERENCE_CHECK_FAILED] Ошибка внутри контекста # </ACTION>
return False # Пробрасываем исключение
# <HELPER name="close">
def close(self): def close(self):
"""[HELPER] Явное закрытие соединения, если менеджер используется вне 'with'."""
if self.conn: if self.conn:
self.conn.close() self.conn.close()
self.conn = None self.conn = None
self.logger.debug("[STATE] Соединение с БД явно закрыто.") self.logger.debug("[CLEANUP:DatabaseManager] Соединение с БД явно закрыто.")
# </HELPER>
# [CONTRACT] DatabaseLogHandler (перенесен в models.py и адаптирован) # <MAIN_CONTRACT for="DatabaseLogHandler">
# @description: Обработчик логирования, который записывает логи в SQLite базу данных. # description: "Обработчик логирования, который записывает логи в таблицу `logs` в SQLite."
# @pre: `db_manager` должен быть инициализирован и подключен. # preconditions: "`db_manager` должен быть инициализирован."
# @post: Записи логов сохраняются в таблицу `logs`. # postconditions: "Записи логов сохраняются в базу данных."
# </MAIN_CONTRACT>
class DatabaseLogHandler(logging.Handler): class DatabaseLogHandler(logging.Handler):
# ... (код класса DatabaseLogHandler) ... # <INIT name="__init__">
def __init__(self, db_manager: DatabaseManager, run_id: str): def __init__(self, db_manager: DatabaseManager, run_id: str):
super().__init__() super().__init__()
self.db_manager = db_manager self.db_manager = db_manager
self.run_id = run_id self.run_id = run_id
self.logger = logging.getLogger(self.__class__.__name__) # [INIT] Инициализация логгера для обработчика # </INIT>
# <ACTION name="emit">
def emit(self, record: logging.LogRecord): def emit(self, record: logging.LogRecord):
# [ACTION] Запись лог-записи в БД
try: try:
# Используем менеджер контекста для безопасного взаимодействия с БД
# Примечание: В DatabaseLogHandler обычно не используется with, т.к. он должен быть "легким"
# и работать с существующим соединением, которое управляется извне (через db_manager.conn)
# или создает временное (что неэффективно).
# В данном случае, db_manager должен предоставить уже открытое соединение.
# Если db_manager не передает активное соединение, нужно его получить.
# Для простоты, пока будем использовать прямое подключение в emit, но в реальном продакшене
# это место лучше оптимизировать (например, через пул соединений или одно соединение в db_manager).
with sqlite3.connect(self.db_manager.db_path) as con: with sqlite3.connect(self.db_manager.db_path) as con:
cur = con.cursor() cur = con.cursor()
log_time = datetime.fromtimestamp(record.created)
# Создаем модель лог-записи для валидации
log_entry = LogRecordModel( log_entry = LogRecordModel(
run_id=self.run_id, run_id=self.run_id,
timestamp=log_time, timestamp=datetime.fromtimestamp(record.created),
level=record.levelname, level=record.levelname,
message=self.format(record) # Используем форматтер для полного сообщения message=self.format(record)
) )
cur.execute( cur.execute(
"INSERT INTO logs (run_id, timestamp, level, message) VALUES (?, ?, ?, ?)", "INSERT INTO logs (run_id, timestamp, level, message) VALUES (?, ?, ?, ?)",
(log_entry.run_id, log_entry.timestamp, log_entry.level, log_entry.message) (log_entry.run_id, log_entry.timestamp.isoformat(), log_entry.level, log_entry.message)
) )
con.commit() con.commit()
# [COHERENCE_CHECK_PASSED] Лог успешно записан.
except Exception as e: except Exception as e:
# [ERROR_HANDLER] Логирование ошибок записи логов (очень важно)
# print() используется, потому что обычный логгер может вызвать рекурсию
print(f"CRITICAL: [COHERENCE_CHECK_FAILED] Не удалось записать лог в базу данных: {e}", flush=True) print(f"CRITICAL: [COHERENCE_CHECK_FAILED] Не удалось записать лог в базу данных: {e}", flush=True)
# </ACTION>
# [CONTRACT] init_database # <CONTRACT for="init_database">
# @description: Инициализирует схему базы данных (создает таблицы, если они не существуют). # description: "Инициализирует схему базы данных, создавая таблицы `products` и `logs`, если они не существуют."
# @pre: `db_path` должен быть валидным путем `Path`. # preconditions: "`db_path` должен быть валидным путем `Path`."
# @post: Таблицы `products` и `logs` существуют в БД. # side_effects: "Создает директорию для БД, если она не существует."
# @side_effects: Создает директорию для БД, если ее нет. # </CONTRACT>
# <ACTION name="init_database">
def init_database(db_path: Path, run_id: str): def init_database(db_path: Path, run_id: str):
log_prefix = f"init_database(id={run_id})" log_prefix = f"[ACTION:init_database(id={run_id})]"
logging.info(f"{log_prefix} - Инициализация базы данных: {db_path}") logging.info(f"{log_prefix} Инициализация базы данных: {db_path}")
try: try:
# [ACTION] Создаем родительскую директорию, если она не существует.
db_path.parent.mkdir(parents=True, exist_ok=True) db_path.parent.mkdir(parents=True, exist_ok=True)
# [CONTEXT_MANAGER] Используем with-statement для соединения с БД
with sqlite3.connect(db_path) as con: with sqlite3.connect(db_path) as con:
cur = con.cursor() cur = con.cursor()
# [ACTION] Создание таблицы products
cur.execute(""" cur.execute("""
CREATE TABLE IF NOT EXISTS products ( CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -118,134 +109,68 @@ def init_database(db_path: Path, run_id: str):
name TEXT NOT NULL, name TEXT NOT NULL,
volume TEXT, volume TEXT,
price INTEGER NOT NULL, price INTEGER NOT NULL,
url TEXT,
is_in_stock BOOLEAN,
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) )
""") """)
# [ACTION] Создание таблицы logs
cur.execute(""" cur.execute("""
CREATE TABLE IF NOT EXISTS logs ( CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL, run_id TEXT NOT NULL,
timestamp TEXT NOT NULL, -- Changed to TEXT for ISO format from datetime timestamp TEXT NOT NULL,
level TEXT NOT NULL, level TEXT NOT NULL,
message TEXT NOT NULL message TEXT NOT NULL
) )
""") """)
con.commit() con.commit()
logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Схема базы данных успешно проверена/создана.") logging.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Схема базы данных успешно проверена/создана.")
except sqlite3.Error as e: except sqlite3.Error as e:
logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Ошибка SQLite при инициализации БД: {e}", exc_info=True) logging.error(f"{log_prefix} [COHERENCE_CHECK_FAILED] Ошибка SQLite при инициализации БД: {e}", exc_info=True)
raise ConnectionError(f"Ошибка БД при инициализации: {e}") from e raise ConnectionError(f"Ошибка БД при инициализации: {e}") from e
except Exception as e: # </ACTION>
logging.critical(f"{log_prefix} - [CRITICAL] Непредвиденная ошибка при инициализации БД: {e}", exc_info=True)
raise
# [CONTRACT] save_data_to_db # <CONTRACT for="save_data_to_db">
# @description: Сохраняет список объектов ProductVariant (представленных как словари) в таблицу `products`. # description: "Сохраняет список словарей с данными о продуктах в таблицу `products`."
# @pre: # preconditions:
# - `data` должен быть списком словарей, каждый из которых соответствует ProductVariant. # - "`data` должен быть списком словарей, соответствующих модели `ProductVariant`."
# - `db_path` должен указывать на существующую и инициализированную БД. # - "`db_path` должен указывать на существующую и инициализированную БД."
# @post: Данные из `data` вставлены в таблицу `products`. # postconditions: "Данные вставлены в таблицу. Возвращает True в случае успеха."
# </CONTRACT>
# <ACTION name="save_data_to_db">
def save_data_to_db(data: List[Dict], db_path: Path, run_id: str) -> bool: def save_data_to_db(data: List[Dict], db_path: Path, run_id: str) -> bool:
""" log_prefix = f"[ACTION:save_data_to_db(id={run_id})]"
[ENHANCED] Сохраняет данные в базу данных с улучшенной обработкой ошибок.
Args:
data: Список словарей с данными для сохранения
db_path: Путь к файлу базы данных
run_id: Идентификатор запуска для логирования
Returns:
bool: True если сохранение прошло успешно, False в противном случае
"""
log_prefix = f"save_data_to_db(id={run_id})"
# [ENHANCEMENT] Валидация входных данных
if not data: if not data:
logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Данные для сохранения отсутствуют. Пропуск сохранения.") logging.warning(f"{log_prefix} [CONTRACT_VIOLATION] Данные для сохранения отсутствуют.")
return False return False
if not isinstance(data, list): logging.info(f"{log_prefix} Начало сохранения {len(data)} записей в БД: {db_path}")
logging.error(f"{log_prefix} - [TYPE_ERROR] Данные должны быть списком, получено: {type(data)}")
return False
logging.info(f"{log_prefix} - Начало сохранения {len(data)} записей в БД: {db_path}")
# [PRECONDITION] Проверка формата данных (хотя ProductVariant.model_dump() должен гарантировать)
required_fields = ['name', 'volume', 'price']
if not all(isinstance(item, dict) and all(k in item for k in required_fields) for item in data):
logging.error(f"{log_prefix} - [CONTRACT_VIOLATION] Некорректный формат данных для сохранения в БД.", extra={"sample_data": data[:1]})
return False
try: try:
# [ENHANCEMENT] Проверка существования файла БД
if not db_path.exists():
logging.warning(f"{log_prefix} - Файл БД не существует: {db_path}")
return False
# [CONTEXT_MANAGER] Используем with-statement для безопасного соединения и коммита
with sqlite3.connect(db_path) as con: with sqlite3.connect(db_path) as con:
cur = con.cursor() cur = con.cursor()
products_to_insert = [] products_to_insert = [
skipped_count = 0 (run_id, item['name'], item['volume'], item['price'], str(item['url']), item['is_in_stock'])
for item in data
for i, item in enumerate(data): ]
# [ENHANCEMENT] Детальная валидация каждого элемента
try:
# Проверка типов данных
if not isinstance(item['name'], str) or not item['name'].strip():
logging.warning(f"{log_prefix} - [INVALID_NAME] Элемент {i}: некорректное имя '{item.get('name')}'")
skipped_count += 1
continue
if not isinstance(item['volume'], str):
logging.warning(f"{log_prefix} - [INVALID_VOLUME] Элемент {i}: некорректный объем '{item.get('volume')}'")
skipped_count += 1
continue
# Преобразование к int и обработка возможных ошибок приведения типа
try:
price_int = int(item['price'])
if price_int <= 0:
logging.warning(f"{log_prefix} - [INVALID_PRICE] Элемент {i}: некорректная цена {price_int}")
skipped_count += 1
continue
except (ValueError, TypeError) as e:
logging.error(f"{log_prefix} - [DATA_CLEANUP_FAILED] Некорректное значение цены для '{item.get('name')}': {item.get('price')}. Пропуск записи. Ошибка: {e}")
skipped_count += 1
continue # Пропускаем эту запись, но продолжаем для остальных
products_to_insert.append(
(run_id, item['name'], item['volume'], price_int)
)
except KeyError as e:
logging.error(f"{log_prefix} - [MISSING_FIELD] Элемент {i} не содержит обязательное поле: {e}")
skipped_count += 1
continue
if products_to_insert: if products_to_insert:
cur.executemany( cur.executemany(
"INSERT INTO products (run_id, name, volume, price) VALUES (?, ?, ?, ?)", "INSERT INTO products (run_id, name, volume, price, url, is_in_stock) VALUES (?, ?, ?, ?, ?, ?)",
products_to_insert products_to_insert
) )
con.commit() con.commit()
logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] {len(products_to_insert)} записей успешно сохранено в базу данных.") logging.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] {len(products_to_insert)} записей успешно сохранено.")
if skipped_count > 0:
logging.warning(f"{log_prefix} - Пропущено {skipped_count} некорректных записей.")
return True return True
else: else:
logging.warning(f"{log_prefix} - После фильтрации не осталось валидных записей для сохранения.") logging.warning(f"{log_prefix} Нет <20><>алидных записей для сохранения.")
return False return False
except sqlite3.Error as e: except sqlite3.Error as e:
logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Ошибка SQLite при сохранении данных: {e}", exc_info=True) logging.error(f"{log_prefix} [COHERENCE_CHECK_FAILED] Ошибка SQLite при сохранении данных: {e}", exc_info=True)
return False
except PermissionError as e:
logging.error(f"{log_prefix} - [PERMISSION_ERROR] Нет прав на запись в БД {db_path}: {e}")
return False return False
except Exception as e: except Exception as e:
logging.critical(f"{log_prefix} - [CRITICAL] Непредвиденная ошибка при сохранении данных в БД: {e}", exc_info=True) logging.critical(f"{log_prefix} [CRITICAL] Непредвиденная ошибка при сохранении данных в БД: {e}", exc_info=True)
return False return False
# </ACTION>
# [REFACTORING_COMPLETE] Дублированные функции удалены, улучшена обработка ошибок # <COHERENCE_CHECK status="PASSED" description="Модуль базы данных полностью стр<D182><D180>ктурирован и размечен." />

View File

@@ -1,32 +1,50 @@
# ANCHOR: Logging_Config_Module # <MODULE name="logging_config" semantics="logging_configuration" />
# Семантика: Конфигурация системы логирования.
# <IMPORTS>
import logging import logging
from typing import Optional from typing import Optional
from .database import DatabaseLogHandler, DatabaseManager from .database import DatabaseLogHandler, DatabaseManager
from .settings import settings from .settings import settings
# </IMPORTS>
# <CONTRACT for="setup_logging">
# description: "Настраивает логирование, опционально добавляя обработчик для записи в базу данных."
# preconditions:
# - "run_id должен быть строкой."
# - "db_manager должен быть экземпляром DatabaseManager или None."
# postconditions:
# - "Базовая конфигурация логирования настроена."
# - "Если log_to_db is True и db_manager предоставлен, добавляется обработчик для БД."
# exceptions:
# - "Может возникнуть исключение при ошибке инициализации обработчика БД."
# </CONTRACT>
# <ACTION name="setup_logging">
def setup_logging(run_id: str, db_manager: Optional[DatabaseManager] = None): def setup_logging(run_id: str, db_manager: Optional[DatabaseManager] = None):
""" """Настраивает систему логирования проекта."""
[CONTRACT] # <CORE_LOGIC>
@description: Настраивает логирование. Теперь принимает db_manager как зависимость.
"""
log_format = '[%(asctime)s] [%(levelname)s] :: %(message)s' log_format = '[%(asctime)s] [%(levelname)s] :: %(message)s'
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format=log_format, format=log_format,
datefmt='%Y-%m-%d %H:%M:%S', datefmt='%Y-%m-%d %H:%M:%S',
force=True # Перезаписывает любую существующую конфигурацию force=True # Перезаписывает любую существующую конфигурацию
) )
# <DEPENDENCY name="settings.log_to_db" />
if settings.log_to_db and db_manager: if settings.log_to_db and db_manager:
# <ERROR_HANDLER>
try: try:
root_logger = logging.getLogger('') root_logger = logging.getLogger('')
db_handler = DatabaseLogHandler(db_manager, run_id) db_handler = DatabaseLogHandler(db_manager, run_id)
db_handler.setLevel(logging.DEBUG) db_handler.setLevel(logging.DEBUG)
db_handler.setFormatter(logging.Formatter(log_format)) db_handler.setFormatter(logging.Formatter(log_format))
root_logger.addHandler(db_handler) root_logger.addHandler(db_handler)
logging.info("Обработчик логов для записи в базу данных успешно добавлен.") logging.info("Обработчик логов для записи в <EFBFBD><EFBFBD>азу данных успешно добавлен.")
except Exception as e: except Exception as e:
logging.error(f"Не удалось инициализировать обработчик логов для БД: {e}") logging.error(f"Не удалось инициализировать обработчик логов для БД: {e}")
# </ERROR_HANDLER>
logging.info("Система логирования инициализирована.") logging.info("Система логирования инициализирована.")
# </CORE_LOGIC>
# <COHERENCE_CHECK status="PASSED" />
# </ACTION>

View File

@@ -1,139 +1,74 @@
# [FILE] src/core/models.py # <MODULE name="core.models" semantics="data_contracts" />
# ANCHOR: Core_Models_Module # <DESIGN_NOTE>
# Семантика: Определяет Pydantic-модели для структурированного представления данных # Этот модуль определяет все Pydantic-модели, которые служат контрактами данных
# в приложении (продукты, логи, сообщения RabbitMQ). # в приложении. Они обеспечивают валидацию, типизацию и четкую структуру
# [CONTRACT]: Все модели наследуются от `BaseModel` и обеспечивают типизацию и валидацию. # для продуктов, логов и сообщений RabbitMQ.
# [COHERENCE]: Согласованы со схемами данных, используемыми в БД и экспортах. # </DESIGN_NOTE>
from pydantic import BaseModel, Field, HttpUrl, ValidationError # <IMPORTS>
from pydantic import BaseModel, Field, HttpUrl
from datetime import datetime from datetime import datetime
from typing import Optional, List from typing import List
import uuid import uuid
# </IMPORTS>
# <MAIN_CONTRACT for="ProductVariant">
# description: "Модель данных для одного варианта продукта."
# invariant: "`name`, `price`, `url` являются обязательными. `price` всегда `int` > 0."
# </MAIN_CONTRACT>
class ProductVariant(BaseModel): class ProductVariant(BaseModel):
""" # <STATE name="product_variant_fields">
[CONTRACT]
@description: Модель данных для варианта продукта.
@invariant: `name`, `price`, `url` являются обязательными. `price` всегда `int`.
"""
name: str = Field(..., description="Название продукта.") name: str = Field(..., description="Название продукта.")
volume: str = Field(..., description="Объем или вариант продукта (например, '50мл', '10 капсул').") volume: str = Field(..., description="Объем или вариант продукта (например, '50мл', '10 капсул').")
price: int = Field(..., description="Цена продукта в числовом формате.") price: int = Field(..., gt=0, description="Цена продукта в числовом формате, должна быть положительной.")
url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.", examples=["https://elixirpeptide.ru/product/?product=123"]) url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.")
is_in_stock: bool = Field(..., description="Наличие товара.")
# [VALIDATOR] Пример пост-валидации, если нужно. # </STATE>
# @validator('price')
# def price_must_be_positive(cls, v):
# if v < 0:
# raise ValueError('Price must be a positive integer')
# return v
class Config:
json_schema_extra = {
"example": {
"name": "Peptide X",
"volume": "30ml",
"price": 1500,
"url": "https://elixirpeptide.ru/catalog/peptide-x/?product=variant1"
}
}
# <MAIN_CONTRACT for="LogRecordModel">
# description: "Модель данных для записи лога, используемая при сохранении в БД или отправке в RabbitMQ."
# invariant: "Все поля являются обязательными."
# </MAIN_CONTRACT>
class LogRecordModel(BaseModel): class LogRecordModel(BaseModel):
""" # <STATE name="log_record_fields">
[CONTRACT]
@description: Модель данных для записи лога, используемая при сохранении логов в БД.
@invariant: Все поля являются обязательными. `timestamp` хранится как ISO-строка.
"""
run_id: str = Field(..., description="Уникальный идентификатор текущего запуска парсера.") run_id: str = Field(..., description="Уникальный идентификатор текущего запуска парсера.")
timestamp: datetime = Field(..., description="Время создания лог-записи.") timestamp: datetime = Field(..., description="Время создания лог-записи.")
level: str = Field(..., description="Уровень логирования (e.g., INFO, ERROR, DEBUG).") level: str = Field(..., description="Уровень логирования (e.g., INFO, ERROR, DEBUG).")
message: str = Field(..., description="Текст лог-сообщения.") message: str = Field(..., description="Текст лог-сообщения.")
# </STATE>
# Pydantic автоматически обработает datetime в JSON и другие форматы. # <MODULE name="rabbitmq_models" semantics="message_contracts_for_rabbitmq" />
# Для SQLite, timestamp будет храниться как TEXT в ISO-формате.
class Config:
json_schema_extra = {
"example": {
"run_id": "20231027-123456",
"timestamp": "2023-10-27T12:34:56.789Z",
"level": "INFO",
"message": "Парсинг начат."
}
}
# ANCHOR: RabbitMQ_Models
# Семантика: Модели для работы с сообщениями RabbitMQ
# <MAIN_CONTRACT for="RabbitMQMessage">
# description: "Базовая модель для всех сообщений, отправляемых в RabbitMQ."
# invariant: "Все сообщения имеют уникальный ID, timestamp и источник."
# </MAIN_CONTRACT>
class RabbitMQMessage(BaseModel): class RabbitMQMessage(BaseModel):
""" # <STATE name="rabbitmq_base_fields">
[CONTRACT]
@description: Базовая модель для сообщений RabbitMQ.
@invariant: Все сообщения имеют уникальный ID и timestamp.
"""
message_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Уникальный идентификатор сообщения.") message_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Уникальный идентификатор сообщения.")
timestamp: datetime = Field(default_factory=datetime.utcnow, description="Время создания сообщения.") timestamp: datetime = Field(default_factory=datetime.utcnow, description="Время создания сообщения.")
source: str = Field(..., description="Источник сообщения (например, 'price_parser').") source: str = Field(default="price_parser", description="Источник сообщения.")
# </STATE>
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
# <MAIN_CONTRACT for="ProductDataMessage">
# description: "Модель сообщения с данными о продуктах для отправки в RabbitMQ."
# invariant: "Содержит список продуктов и метаданные о запуске."
# </MAIN_CONTRACT>
class ProductDataMessage(RabbitMQMessage): class ProductDataMessage(RabbitMQMessage):
""" # <STATE name="product_data_message_fields">
[CONTRACT]
@description: Модель сообщения с данными о продуктах для отправки в RabbitMQ.
@invariant: Содержит список продуктов и метаданные о парсинге.
"""
products: List[ProductVariant] = Field(..., description="Список продуктов для обработки.") products: List[ProductVariant] = Field(..., description="Список продуктов для обработки.")
run_id: str = Field(..., description="Идентификатор запуска парсера.") run_id: str = Field(..., description="Идентификатор запуска парсера.")
total_count: int = Field(..., description="Общее количество продуктов в сообщении.") total_count: int = Field(..., description="Общее количество продуктов в сообщении.")
# </STATE>
class Config: # <MAIN_CONTRACT for="LogMessage">
json_schema_extra = { # description: "Модель сообщения с логами для отправки в RabbitMQ."
"example": { # invariant: "Содержит список записей логов и метаданные о запуске."
"message_id": "550e8400-e29b-41d4-a716-446655440000", # </MAIN_CONTRACT>
"timestamp": "2023-10-27T12:34:56.789Z",
"source": "price_parser",
"products": [
{
"name": "Peptide X",
"volume": "30ml",
"price": 1500,
"url": "https://elixirpeptide.ru/catalog/peptide-x/?product=variant1"
}
],
"run_id": "20231027-123456",
"total_count": 1
}
}
class LogMessage(RabbitMQMessage): class LogMessage(RabbitMQMessage):
""" # <STATE name="log_message_fields">
[CONTRACT]
@description: Модель сообщения с логами для отправки в RabbitMQ.
@invariant: Содержит информацию о логах парсера.
"""
log_records: List[LogRecordModel] = Field(..., description="Список записей логов.") log_records: List[LogRecordModel] = Field(..., description="Список записей логов.")
run_id: str = Field(..., description="Идентификатор запуска парсера.") run_id: str = Field(..., description="Идентификатор запуска парсера.")
# </STATE>
class Config: # <COHERENCE_CHECK status="PASSED" description="Все основные модели данных определены, типизированы и структурированы." />
json_schema_extra = {
"example": {
"message_id": "550e8400-e29b-41d4-a716-446655440001",
"timestamp": "2023-10-27T12:34:56.789Z",
"source": "price_parser",
"log_records": [
{
"run_id": "20231027-123456",
"timestamp": "2023-10-27T12:34:56.789Z",
"level": "INFO",
"message": "Парсинг начат."
}
],
"run_id": "20231027-123456"
}
}
# [COHERENCE_CHECK_PASSED] Все основные модели данных определены и типизированы.

View File

@@ -1,237 +1,133 @@
# [FILE] src/core/settings.py # <MODULE name="core.settings" semantics="application_configuration" />
# [REFACTORING_NOTE] Этот файл заменяет старый src/config.py, используя Pydantic. # <DESIGN_NOTE>
# ANCHOR: Configuration_Module # Этот модуль является единственным источником истины для всех
# Семантика: Этот модуль является единственным источником истины для всех # конфигурационных параметров приложения. Использует Pydantic для типизации,
# конфигурационных параметров приложения. Использует Pydantic для типизации и валидации. # валидации и загрузки настроек из переменных окружения.
# Заменяет устаревший `src/config.py`.
# </DESIGN_NOTE>
# <IMPORTS>
import os import os
from pathlib import Path from pathlib import Path
from pydantic import BaseModel, Field, validator, HttpUrl from pydantic import BaseModel, Field, validator
from typing import Optional
from dotenv import load_dotenv from dotenv import load_dotenv
from typing import List
# </IMPORTS>
# ANCHOR: Environment_Loading # <ACTION name="load_environment_variables">
# Семантика: Загрузка переменных окружения из .env файла # Загрузка переменных окружения из .env файла, если он существует.
load_dotenv() load_dotenv()
# </ACTION>
# ANCHOR: Base_Paths # <CONSTANTS>
# Семантика: Базовые пути для приложения # <CONSTANT name="BASE_DIR" value="Path(__file__).parent.parent.parent" />
BASE_DIR = Path(__file__).parent.parent.parent BASE_DIR = Path(__file__).resolve().parent.parent.parent
DATA_DIR = BASE_DIR / "price_data_final" # </CONSTANTS>
# ANCHOR: Database_Settings
# Семантика: Настройки базы данных SQLite
DATABASE_URL = os.getenv("DATABASE_URL", f"sqlite:///{BASE_DIR}/price_parser.db")
# ANCHOR: Scraping_Settings
# Семантика: Настройки для веб-скрапинга
SCRAPING_DELAY = float(os.getenv("SCRAPING_DELAY", "1.0")) # Задержка между запросами в секундах
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3")) # Максимальное количество попыток
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30")) # Таймаут запросов в секундах
USER_AGENT = os.getenv("USER_AGENT", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
# ANCHOR: Logging_Settings
# Семантика: Настройки логирования
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
LOG_FORMAT = os.getenv("LOG_FORMAT", "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
LOG_FILE = os.getenv("LOG_FILE", str(BASE_DIR / "logs" / "price_parser.log"))
# ANCHOR: RabbitMQ_Settings
# Семантика: Настройки для подключения к RabbitMQ
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5672"))
RABBITMQ_USERNAME = os.getenv("RABBITMQ_USERNAME", "guest")
RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD", "guest")
RABBITMQ_VIRTUAL_HOST = os.getenv("RABBITMQ_VIRTUAL_HOST", "/")
# ANCHOR: RabbitMQ_Queue_Settings
# Семантика: Настройки очередей RabbitMQ
RABBITMQ_PRODUCTS_QUEUE = os.getenv("RABBITMQ_PRODUCTS_QUEUE", "price_parser.products")
RABBITMQ_LOGS_QUEUE = os.getenv("RABBITMQ_LOGS_QUEUE", "price_parser.logs")
RABBITMQ_EXCHANGE = os.getenv("RABBITMQ_EXCHANGE", "price_parser.exchange")
# ANCHOR: RabbitMQ_Connection_Settings
# СEMАНТИКА: Настройки подключения к RabbitMQ
RABBITMQ_CONNECTION_TIMEOUT = int(os.getenv("RABBITMQ_CONNECTION_TIMEOUT", "30"))
RABBITMQ_HEARTBEAT = int(os.getenv("RABBITMQ_HEARTBEAT", "600"))
RABBITMQ_BLOCKED_CONNECTION_TIMEOUT = int(os.getenv("RABBITMQ_BLOCKED_CONNECTION_TIMEOUT", "300"))
# ANCHOR: Export_Settings
# Семантика: Настройки экспорта данных
ENABLE_RABBITMQ_EXPORT = os.getenv("ENABLE_RABBITMQ_EXPORT", "false").lower() == "true"
ENABLE_CSV_EXPORT = os.getenv("ENABLE_CSV_EXPORT", "true").lower() == "true"
ENABLE_DATABASE_EXPORT = os.getenv("ENABLE_DATABASE_EXPORT", "true").lower() == "true"
# ANCHOR: Validation_Settings
# Семантика: Настройки валидации данных
VALIDATE_DATA_BEFORE_EXPORT = os.getenv("VALIDATE_DATA_BEFORE_EXPORT", "true").lower() == "true"
# [COHERENCE_CHECK_PASSED] Все настройки определены с разумными значениями по умолчанию.
# <MAIN_CONTRACT for="ScraperSelectors">
# description: "Определяет CSS-селекторы для парсинга как строгий, типизированный контракт."
# invariant: "Все поля являются обязательными непустыми строками."
# </MAIN_CONTRACT>
class ScraperSelectors(BaseModel): class ScraperSelectors(BaseModel):
""" # <CONFIG name="selectors_config">
[CONTRACT]
@description: Определяет CSS-селекторы для парсинга как строгий, типизированный контракт.
@invariant: Все поля являются обязательными строками.
"""
# [CONFIG] Используем Field с alias, чтобы Pydantic мог инициализировать
# модель из словаря с ключами в верхнем регистре, как было раньше.
catalog_product_link: str = Field(..., alias='CATALOG_PRODUCT_LINK') catalog_product_link: str = Field(..., alias='CATALOG_PRODUCT_LINK')
variant_list_item: str = Field(..., alias='VARIANT_LIST_ITEM') variant_list_item: str = Field(..., alias='VARIANT_LIST_ITEM')
product_page_name: str = Field(..., alias='PRODUCT_PAGE_NAME') product_page_name: str = Field(..., alias='PRODUCT_PAGE_NAME')
active_volume: str = Field(..., alias='ACTIVE_VOLUME') active_volume: str = Field(..., alias='ACTIVE_VOLUME')
price_block: str = Field(..., alias='PRICE_BLOCK') price_block: str = Field(..., alias='PRICE_BLOCK')
product_unavailable: str = Field(..., alias='PRODUCT_UNAVAILABLE')
# </CONFIG>
@validator('*') # <CONTRACT for="validate_selectors">
# description: "Валидатор, проверяющий, что селекторы не являются пустыми строками."
# </CONTRACT>
# <HELPER name="validate_selectors">
@validator('*', pre=True, allow_reuse=True)
def validate_selectors(cls, v): def validate_selectors(cls, v):
"""[VALIDATOR] Проверяет, что селекторы не пустые."""
if not v or not v.strip(): if not v or not v.strip():
raise ValueError('Селектор не может быть пустым') raise ValueError('Селектор не может быть пустым')
return v.strip() return v.strip()
# </HELPER>
# <MAIN_CONTRACT for="Settings">
# description: "Главный класс конфигурации приложения. Собирает все настройки в одном месте, используя переменные окружения."
# </MAIN_CONTRACT>
class Settings(BaseModel): class Settings(BaseModel):
""" # <CONFIG name="parser_settings">
[MAIN-CONTRACT] base_url: str = Field(default=os.getenv('PARSER_BASE_URL', 'https://elixirpeptide.ru'), description="Базовый URL сайта")
@description: Главный класс конфигурации приложения. Собирает все настройки в одном месте. catalog_url: str = Field(default=os.getenv('PARSER_CATALOG_URL', 'https://elixirpeptide.ru/catalog/'), description="URL каталога товаров")
""" headers: dict = Field(default={'User-Agent': os.getenv('PARSER_USER_AGENT', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')})
# [CONFIG] Основные настройки парсера # </CONFIG>
base_url: str = Field(default='https://elixirpeptide.ru', description="Базовый URL сайта")
catalog_url: str = Field(default='https://elixirpeptide.ru/catalog/', description="URL каталога товаров")
headers: dict = Field(
default={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
},
description="HTTP заголовки для запросов"
)
# [CONFIG] Настройки вывода # <CONFIG name="output_settings">
output_dir: Path = Field(default=Path('price_data_final'), description="Директория для сохранения результатов") output_dir: Path = Field(default=BASE_DIR / "price_data_final", description="Директория для сохранения результатов")
save_to_csv: bool = Field(default=True, description="Сохранять ли данные в CSV") save_to_csv: bool = Field(default=os.getenv('PARSER_SAVE_TO_CSV', 'true').lower() == 'true')
save_to_db: bool = Field(default=True, description="Сохранять ли данные в базу данных") save_to_db: bool = Field(default=os.getenv('PARSER_SAVE_TO_DB', 'true').lower() == 'true')
# </CONFIG>
# [CONFIG] Настройки логирования # <CONFIG name="logging_settings">
log_to_db: bool = Field(default=True, description="Сохранять ли логи в базу данных") log_to_db: bool = Field(default=os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true')
# </CONFIG>
# [ENHANCEMENT] Настройки производительности # <CONFIG name="performance_settings">
request_timeout: int = Field(default=30, description="Таймаут HTTP запросов в секундах") request_timeout: int = Field(default=int(os.getenv('PARSER_TIMEOUT', 30)))
delay_between_requests: float = Field(default=1.0, description="Задержка между запросами в секундах") delay_between_requests: float = Field(default=float(os.getenv('PARSER_DELAY', 1.0)))
max_retries: int = Field(default=3, description="Максимальное количество попыток для запросов") max_retries: int = Field(default=int(os.getenv('PARSER_RETRIES', 3)))
# </CONFIG>
# [CONFIG] Вложенная модель с селекторами # <CONFIG name="selectors_config_instance">
# Мы инициализируем ее прямо здесь, передавая словарь со значениями.
selectors: ScraperSelectors = ScraperSelectors( selectors: ScraperSelectors = ScraperSelectors(
CATALOG_PRODUCT_LINK='.product-card h4 a.product-link', CATALOG_PRODUCT_LINK='.product-card h4 a.product-link',
VARIANT_LIST_ITEM='.product-version-select li', VARIANT_LIST_ITEM='.product-version-select li',
PRODUCT_PAGE_NAME='h1.product-h1', PRODUCT_PAGE_NAME='h1.product-h1',
ACTIVE_VOLUME='.product-version-select li.active', ACTIVE_VOLUME='.product-version-select li.active',
PRICE_BLOCK='.product-sale-box .price span', PRICE_BLOCK='.product-sale-box .price span',
PRODUCT_UNAVAILABLE='.product-unavailable',
) )
# </CONFIG>
@validator('base_url', 'catalog_url') # <CONTRACT for="db_path">
def validate_urls(cls, v): # description: "Вычисляемое свойство для получения полного пути к файлу базы данных."
"""[VALIDATOR] Проверяет корректность URL.""" # </CONTRACT>
if not v.startswith(('http://', 'https://')): # <HELPER name="db_path" type="property">
raise ValueError('URL должен начинаться с http:// или https://')
return v
@validator('request_timeout')
def validate_timeout(cls, v):
"""[VALIDATOR] Проверяет корректность таймаута."""
if v <= 0:
raise ValueError('Таймаут должен быть положительным числом')
if v > 300: # 5 минут максимум
raise ValueError('Таймаут не может превышать 300 секунд')
return v
@validator('delay_between_requests')
def validate_delay(cls, v):
"""[VALIDATOR] Проверяет корректность задержки."""
if v < 0:
raise ValueError('Задержка не может быть отрицательной')
if v > 60: # 1 минута максимум
raise ValueError('Задержка не может превышать 60 секунд')
return v
@validator('max_retries')
def validate_retries(cls, v):
"""[VALIDATOR] Проверяет корректность количества попыток."""
if v < 0:
raise ValueError('Количество попыток не может быть отрицательным')
if v > 10: # 10 попыток максимум
raise ValueError('Количество попыток не может превышать 10')
return v
@property @property
def db_path(self) -> Path: def db_path(self) -> Path:
"""
[HELPER] Вычисляемое свойство для пути к базе данных.
Гарантирует, что путь всегда будет актуальным, если изменится output_dir.
"""
return self.output_dir / 'parser_data.db' return self.output_dir / 'parser_data.db'
# </HELPER>
def validate_configuration(self) -> list[str]: # <CONTRACT for="validate_configuration">
""" # description: "Про<D180><D0BE>еряет ключевые параметры конфигурации на доступность и корректность."
[NEW] Валидирует всю конфигурацию и возвращает список ошибок. # postconditions: "Возвращает список строк с описанием ошибок."
# </CONTRACT>
Returns: # <ACTION name="validate_configuration">
list[str]: Список ошибок конфигурации (пустой, если все корректно) def validate_configuration(self) -> List[str]:
""" errors: List[str] = []
errors = []
# Проверка доступности директории
try: try:
self.output_dir.mkdir(parents=True, exist_ok=True) self.output_dir.mkdir(parents=True, exist_ok=True)
except Exception as e: except Exception as e:
errors.append(f"Не удается создать директорию {self.output_dir}: {e}") errors.append(f"Не удается создать директорию {self.output_dir}: {e}")
# Проверка URL
try: try:
import requests import requests
response = requests.head(self.base_url, timeout=10) response = requests.head(self.base_url, timeout=self.request_timeout)
if response.status_code >= 400: if response.status_code >= 400:
errors.append(f"Базовый URL недоступен: {self.base_url} (статус: {response.status_code})") errors.append(f"Базовый URL недоступен: {self.base_url} (статус: {response.status_code})")
except Exception as e: except Exception as e:
errors.append(f"Не удается подключиться к базовому URL {self.base_url}: {e}") errors.append(f"Не удается подключиться к базовому URL {self.base_url}: {e}")
return errors return errors
# </ACTION>
# [ENHANCEMENT] Загрузка настроек из переменных окружения # <SINGLETON name="settings">
def load_settings_from_env() -> Settings: # Создаем единственный экземпляр настроек, который будет импортиров<D0BE><D0B2>ться
""" # и использоваться во всем приложении.
[NEW] Загружает настройки из переменных окружения. settings = Settings()
# </SINGLETON>
Returns: # <CONSTANTS name="export_flags">
Settings: Объект настроек ENABLE_RABBITMQ_EXPORT = os.getenv("ENABLE_RABBITMQ_EXPORT", "false").lower() == "true"
""" ENABLE_CSV_EXPORT = settings.save_to_csv
# Загружаем .env файл, если он существует ENABLE_DATABASE_EXPORT = settings.save_to_db
env_file = Path('.env') # </CONSTANTS>
if env_file.exists():
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # python-dotenv не установлен
# Создаем настройки с возможностью переопределения через переменные окружения # <COHERENCE_CHECK status="PASSED" description="Модуль настроек полностью отрефакторен и является единственным источником конфигурации." />
settings_data = {
'base_url': os.getenv('PARSER_BASE_URL', 'https://elixirpeptide.ru'),
'catalog_url': os.getenv('PARSER_CATALOG_URL', 'https://elixirpeptide.ru/catalog/'),
'save_to_csv': os.getenv('PARSER_SAVE_TO_CSV', 'true').lower() == 'true',
'save_to_db': os.getenv('PARSER_SAVE_TO_DB', 'true').lower() == 'true',
'log_to_db': os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true',
'request_timeout': int(os.getenv('PARSER_TIMEOUT', '30')),
'delay_between_requests': float(os.getenv('PARSER_DELAY', '1.0')),
'max_retries': int(os.getenv('PARSER_RETRIES', '3')),
}
return Settings(**settings_data)
# [SINGLETON] Создаем единственный экземпляр настроек, который будет использоваться
# во всем приложении. Это стандартная практика для работы с конфигурацией.
try:
settings = load_settings_from_env()
except Exception as e:
# Fallback к настройкам по умолчанию
settings = Settings()
# [REFACTORING_COMPLETE] Этот модуль готов к использованию.

View File

@@ -1,30 +1,47 @@
# [FILE] src/main.py # <MODULE name="main" semantics="application_entrypoint" />
# ANCHOR: Main_Entrypoint # <DESIGN_NOTE>
# Семантика: Единственная задача этого модуля - создать и запустить оркестратор. # Этот модуль является исключительно точкой входа.
# Он не содержит никакой логики, только инициализирует процесс. # Он не содержит бизнес-логики, а только инициализирует и запускает AppOrchestrator.
# </DESIGN_NOTE>
# <IMPORTS>
import sys import sys
import logging import logging
from orchestrator import AppOrchestrator from orchestrator import AppOrchestrator
from core.settings import settings from core.settings import settings
# </IMPORTS>
# <CONTRACT for="main">
# description: "Главная точка входа в приложение. Настраивает логирование, проверяет конфигурацию, создает и запускает оркестратор."
# preconditions:
# - "Файл конфигурации (.env) должен существовать и быть корректным."
# postconditions:
# - "Приложение либо успешно завершает работу, либо выходит с кодом ошибки."
# exceptions:
# - "ValueError: при ошибках в конфигурации."
# - "KeyboardInterrupt: при прерывании пользователем."
# - "Exception: при любых других критических ошибках."
# </CONTRACT>
# <ENTRYPOINT name="main">
def main(): def main():
"""Точка входа в приложение.""" """Точка входа в приложение."""
# [ENHANCEMENT] Настройка базового логирования для main # <INIT>
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s', format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S' datefmt='%Y-%m-%d %H:%M:%S'
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# </INIT>
# <ERROR_HANDLER for="critical_errors">
try: try:
# <CORE_LOGIC>
logger.info("="*60) logger.info("="*60)
logger.info("🚀 Запуск парсера цен ElixirPeptide") logger.info("[ENTRYPOINT:main] 🚀 Запуск парсера цен ElixirPeptide")
logger.info("="*60) logger.info("="*60)
# [ENHANCEMENT] Валидация настроек logger.info("[ENTRYPOINT:main] 📋 Проверка конфигурации...")
logger.info("📋 Проверка конфигурации...")
logger.info(f" • Базовый URL: {settings.base_url}") logger.info(f" • Базовый URL: {settings.base_url}")
logger.info(f" • Каталог: {settings.catalog_url}") logger.info(f" • Каталог: {settings.catalog_url}")
logger.info(f" • Сохранение в CSV: {'' if settings.save_to_csv else ''}") logger.info(f" • Сохранение в CSV: {'' if settings.save_to_csv else ''}")
@@ -34,31 +51,44 @@ def main():
logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с") logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с")
logger.info(f" • Максимум попыток: {settings.max_retries}") logger.info(f" • Максимум попыток: {settings.max_retries}")
# [ENHANCEMENT] Валидация конфигурации # <DEPENDENCY name="settings.validate_configuration" />
config_errors = settings.validate_configuration() config_errors = settings.validate_configuration()
if config_errors: if config_errors:
logger.error("❌ Ошибки в конфигурации:") logger.error("[ENTRYPOINT:main] ❌ Ошибки в конфигурации:")
for error in config_errors: for error in config_errors:
logger.error(f"{error}") logger.error(f"{error}")
# <FALLBACK>
raise ValueError("Конфигурация содержит ошибки") raise ValueError("Конфигурация содержит ошибки")
# </FALLBACK>
else: else:
logger.info("✅ Конфигурация корректна") logger.info("[ENTRYPOINT:main] ✅ Конфигурация корректна")
# Создание и запуск оркестратора # <ACTION name="run_orchestrator">
orchestrator = AppOrchestrator(settings=settings) orchestrator = AppOrchestrator(settings=settings)
orchestrator.run() orchestrator.run()
# </ACTION>
logger.info("="*60) logger.info("="*60)
logger.info("✅ Парсинг успешно завершен!") logger.info("[ENTRYPOINT:main] ✅ Парсинг успешно завершен!")
logger.info("="*60) logger.info("="*60)
# </CORE_LOGIC>
except KeyboardInterrupt: except KeyboardInterrupt:
logger.warning("⚠️ Парсинг прерван пользователем (Ctrl+C)") # <FALLBACK>
logger.warning("[ENTRYPOINT:main] ⚠️ Парсинг прерван пользователем (Ctrl+C)")
sys.exit(1) sys.exit(1)
# </FALLBACK>
except Exception as e: except Exception as e:
logger.critical(f"💥 Критическая ошибка в приложении: {e}", exc_info=True) # <FALLBACK>
logger.critical("🔧 Проверьте логи для детальной диагностики") logger.critical(f"[ENTRYPOINT:main] 💥 Критическая ошибка в приложении: {e}", exc_info=True)
logger.critical("[ENTRYPOINT:main] 🔧 Проверьте логи для детальной диагностики")
sys.exit(1) sys.exit(1)
# </FALLBACK>
# </ERROR_HANDLER>
# </ENTRYPOINT>
# <MAIN_CONTRACT>
# description: "Стандартный блок для запуска main() при выполнении скрипта."
# </MAIN_CONTRACT>
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -1,9 +1,11 @@
# [FILE] src/orchestrator.py # <MODULE name="orchestrator" semantics="main_application_logic" />
# ANCHOR: Main_Application_Orchestrator_Class # <DESIGN_NOTE>
# Семантика: Инкапсулирует весь поток выполнения парсинга. # Класс AppOrchestrator инкапсулирует весь поток выполнения парсинга.
# Хранит состояние (конфигурацию, сессии, результаты) и управляет процессом. # Он управляет состоянием (конфигурация, сессии, результаты) и координирует
# [REFACTORING_NOTE] Обновлен для использования класса Scraper вместо модуля engine. # взаимодействие между различными компонентами системы (scraper, exporters, database).
# </DESIGN_NOTE>
# <IMPORTS>
import logging import logging
import time import time
import requests import requests
@@ -12,25 +14,27 @@ from typing import List, Optional
from contextlib import contextmanager from contextlib import contextmanager
from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT
from core.models import ProductVariant # [FIX] Импорт ProductVariant из models.py from core.models import ProductVariant
from core.database import init_database, save_data_to_db, DatabaseManager # [FIX] Импорт DatabaseManager from core.database import init_database, save_data_to_db, DatabaseManager
from core.logging_config import setup_logging # [COHERENCE_CHECK_PASSED] Импорт loggin_config from core.logging_config import setup_logging
from scraper.engine import Scraper from scraper.engine import Scraper
from utils.exporters import save_data_to_csv, export_data_to_rabbitmq, export_logs_to_rabbitmq, validate_rabbitmq_connection from utils.exporters import save_data_to_csv, export_data_to_rabbitmq, export_logs_to_rabbitmq, validate_rabbitmq_connection
# </IMPORTS>
# <MAIN_CONTRACT for="AppOrchestrator">
# description: "Класс-оркестратор, управляющий всем процессом парсинга от начала до конца."
# invariant: "Экземпляр `settings` и `run_id` неизменны в течение жизненного цикла объекта."
# </MAIN_CONTRACT>
class AppOrchestrator: class AppOrchestrator:
""" # <INIT name="__init__">
[MAIN-CONTRACT]
@description: Класс-оркестратор, управляющий всем процессом парсинга.
@invariant: Экземпляр `settings` и `run_id` неизменны в течение жизненного цикла.
"""
def __init__(self, settings: Settings): def __init__(self, settings: Settings):
# [INIT] Инициализация оркестратора """Инициализирует оркестратор с необходимыми зависимостями и состоянием."""
# <STATE name="initial_state">
self.settings = settings self.settings = settings
self.run_id = datetime.now().strftime('%Y%m%d-%H%M%S') self.run_id = datetime.now().strftime('%Y%m%d-%H%M%S')
self.http_session = requests.Session() self.http_session = requests.Session()
self.http_session.headers.update(settings.headers) self.http_session.headers.update(settings.headers)
self.db_manager: Optional[DatabaseManager] = None # [STATE] Инициализация db_manager как Optional self.db_manager: Optional[DatabaseManager] = None
self.final_data: List[ProductVariant] = [] self.final_data: List[ProductVariant] = []
self.stats = { self.stats = {
'total_urls': 0, 'total_urls': 0,
@@ -39,29 +43,41 @@ class AppOrchestrator:
'start_time': None, 'start_time': None,
'end_time': None 'end_time': None
} }
# </STATE>
# [DELEGATES] Создаем экземпляр скрейпера, передавая ему зависимости. # <DEPENDENCY name="Scraper" relationship="composition">
# Оркестратор владеет скрейпером. # Оркестратор владеет скрейпером и управляет его жизненным циклом.
self.scraper = Scraper( self.scraper = Scraper(
session=self.http_session, session=self.http_session,
selectors=self.settings.selectors, selectors=self.settings.selectors,
base_url=self.settings.base_url base_url=self.settings.base_url
) )
self.logger = logging.getLogger(self.__class__.__name__) # [INIT] Инициализация логгера для класса # </DEPENDENCY>
self.logger = logging.getLogger(self.__class__.__name__)
# </INIT>
# <CONTRACT for="_error_context">
# description: "Контекстный менеджер для централизованной обработки ошибок в ключевых операциях."
# </CONTRACT>
# <HELPER name="_error_context">
@contextmanager @contextmanager
def _error_context(self, operation: str): def _error_context(self, operation: str):
"""[HELPER] Контекстный менеджер для обработки ошибок с детальной диагностикой.""" """Контекстный менеджер для обработки ошибок с детальной диагностикой."""
try: try:
yield yield
except Exception as e: except Exception as e:
self.logger.error(f"[ERROR] Ошибка в операции '{operation}': {e}", exc_info=True) self.logger.error(f"[{operation.upper()}] Ошибка в операции '{operation}': {e}", exc_info=True)
# [ENHANCEMENT] Детальная диагностика ошибки
self._log_error_details(operation, e) self._log_error_details(operation, e)
raise raise
# </HELPER>
# <CONTRACT for="_log_error_details">
# description: "Логирует расширенную информацию об ошибке для упрощения диагностики."
# </CONTRACT>
# <HELPER name="_log_error_details">
def _log_error_details(self, operation: str, error: Exception): def _log_error_details(self, operation: str, error: Exception):
"""[HELPER] Логирует детальную информацию об ошибке.""" """Логирует детальную информацию об ошибке."""
error_info = { error_info = {
'operation': operation, 'operation': operation,
'error_type': type(error).__name__, 'error_type': type(error).__name__,
@@ -70,74 +86,93 @@ class AppOrchestrator:
'timestamp': datetime.now().isoformat(), 'timestamp': datetime.now().isoformat(),
'stats': self.stats.copy() 'stats': self.stats.copy()
} }
self.logger.error(f"[ERROR_DETAILS] {error_info}") self.logger.error(f"[HELPER:_log_error_details] Детали ошибки: {error_info}")
# </HELPER>
# <CONTRACT for="_setup">
# description: "Шаг 0: Инициализация всех систем (БД, логирование, RabbitMQ)."
# </CONTRACT>
# <ACTION name="_setup">
def _setup(self): def _setup(self):
"""[ACTION] Шаг 0: Инициализация всех систем.""" """Инициализация всех систем перед началом парсинга."""
with self._error_context("setup"): with self._error_context("setup"):
# <CORE_LOGIC>
self.stats['start_time'] = datetime.now() self.stats['start_time'] = datetime.now()
self.logger.info(f"[INFO] Запуск инициализации систем. Run ID: {self.run_id}") self.logger.info(f"[ACTION:_setup] Запуск инициализации систем. Run ID: {self.run_id}")
# [CONDITIONAL_ACTION] Инициализация базы данных, если требуется
if self.settings.save_to_db or self.settings.log_to_db: if self.settings.save_to_db or self.settings.log_to_db:
# [ACTION] Создаем директорию для БД, если ее нет
self.settings.output_dir.mkdir(parents=True, exist_ok=True) self.settings.output_dir.mkdir(parents=True, exist_ok=True)
self.db_manager = DatabaseManager(self.settings.db_path) self.db_manager = DatabaseManager(self.settings.db_path)
init_database(self.db_manager.db_path, self.run_id) # init_database работает с Path init_database(self.db_manager.db_path, self.run_id)
# [DELEGATES] Настройка логирования # <DEPENDENCY name="setup_logging" />
setup_logging(self.run_id, self.db_manager) setup_logging(self.run_id, self.db_manager)
# [ENHANCEMENT] Проверка доступности RabbitMQ
if ENABLE_RABBITMQ_EXPORT: if ENABLE_RABBITMQ_EXPORT:
if validate_rabbitmq_connection(): if validate_rabbitmq_connection():
self.logger.info("[RABBITMQ] Подключение к RabbitMQ доступно") self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно")
else: else:
self.logger.warning("[RABBITMQ] Подключение к RabbitMQ недоступно, экспорт в RabbitMQ будет пропущен") self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен")
self.logger.info(f"[INFO] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}") self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="_collect_urls">
# description: "Шаги 1 и 2: Сбор всех URL для парсинга путем делегирования скрейперу."
# postconditions: "Возвращает список URL или пустой список в случае ошибки."
# </CONTRACT>
# <ACTION name="_collect_urls">
def _collect_urls(self) -> List[str]: def _collect_urls(self) -> List[str]:
"""[ACTION] Шаги 1 и 2: Сбор всех URL для парсинга.""" """Сбор всех URL для парсинга."""
with self._error_context("collect_urls"): with self._error_context("collect_urls"):
self.logger.info("[INFO] Начало сбора URL для парсинга.") # <CORE_LOGIC>
self.logger.info("[ACTION:_collect_urls] Начало сбора URL для парсинга.")
# [DELEGATES] Делегируем сбор URL скрейперу. # <DEPENDENCY name="scraper.get_base_product_urls" />
base_urls = self.scraper.get_base_product_urls( base_urls = self.scraper.get_base_product_urls(
catalog_url=self.settings.catalog_url, catalog_url=self.settings.catalog_url,
run_id=self.run_id run_id=self.run_id
) )
if not base_urls: if not base_urls:
self.logger.error("[ERROR] Не найдено ни одного базового URL. Завершение работы сбора URL.") self.logger.error("[ACTION:_collect_urls] Не найдено ни одного базового URL. Завершение работы сбора URL.")
return [] return []
# [DELEGATES] Делегируем сбор URL вариантов скрейперу. # <DEPENDENCY name="scraper.get_all_variant_urls" />
all_urls_to_scrape = self.scraper.get_all_variant_urls( all_urls_to_scrape = self.scraper.get_all_variant_urls(
base_product_urls=base_urls, base_product_urls=base_urls,
run_id=self.run_id run_id=self.run_id
) )
if not all_urls_to_scrape: if not all_urls_to_scrape:
self.logger.error("[ERROR] Не удалось сформировать список URL для парсинга. Завершение работы сбора URL.") self.logger.error("[ACTION:_collect_urls] Не удалось сформировать список URL для парсинга. Завершение работы сбора URL.")
return [] return []
self.stats['total_urls'] = len(all_urls_to_scrape) self.stats['total_urls'] = len(all_urls_to_scrape)
self.logger.info(f"[INFO] Сбор URL завершен. Найдено {len(all_urls_to_scrape)} URL вариантов для парсинга.") self.logger.info(f"[ACTION:_collect_urls] Сбор URL завершен. Найдено {len(all_urls_to_scrape)} URL вариантов для парсинга.")
return all_urls_to_scrape return all_urls_to_scrape
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="_scrape_data">
# description: "Шаг 3: Итеративный парсинг данных по списку URL."
# </CONTRACT>
# <ACTION name="_scrape_data">
def _scrape_data(self, urls: List[str]): def _scrape_data(self, urls: List[str]):
"""[ACTION] Шаг 3: Итеративный парсинг данных.""" """Итеративный парсинг данных."""
with self._error_context("scrape_data"): with self._error_context("scrape_data"):
# <CORE_LOGIC>
total_to_scrape = len(urls) total_to_scrape = len(urls)
self.logger.info(f"[INFO] Начало парсинга {total_to_scrape} URL вариантов.") self.logger.info(f"[ACTION:_scrape_data] Начало парсинга {total_to_scrape} URL вариантов.")
for i, url in enumerate(urls): for i, url in enumerate(urls):
# <ERROR_HANDLER for="single_url_scrape">
try: try:
self.logger.info(f"[INFO] Парсинг URL {i+1}/{total_to_scrape}: {url.split('/')[-1]}") self.logger.info(f"[ACTION:_scrape_data] Парсинг URL {i+1}/{total_to_scrape}: {url.split('/')[-1]}")
time.sleep(1) # [ACTION] Задержка между запросами time.sleep(1)
# [DELEGATES] Делегируем парсинг одной страницы скрейперу. # <DEPENDENCY name="scraper.scrape_variant_page" />
variant_data = self.scraper.scrape_variant_page( variant_data = self.scraper.scrape_variant_page(
variant_url=url, variant_url=url,
run_id=self.run_id run_id=self.run_id
@@ -151,94 +186,115 @@ class AppOrchestrator:
except Exception as e: except Exception as e:
self.stats['failed_parses'] += 1 self.stats['failed_parses'] += 1
self.logger.error(f"[ERROR] Ошибка при парсинге URL {i+1}/{total_to_scrape} ({url}): {e}") self.logger.error(f"[ACTION:_scrape_data] Ошибка при парсинге URL {i+1}/{total_to_scrape} ({url}): {e}")
# [ENHANCEMENT] Продолжаем работу, не прерывая весь процесс
continue continue
# </ERROR_HANDLER>
self.logger.info(f"[INFO] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.") self.logger.info(f"[ACTION:_scrape_data] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.")
self.logger.info(f"[STATS] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}") self.logger.info(f"[STATS][ACTION:_scrape_data] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}")
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="_save_results">
# description: "Шаг 4: Сохранение результатов в указанные хранилища (CSV, БД, RabbitMQ)."
# </CONTRACT>
# <ACTION name="_save_results">
def _save_results(self): def _save_results(self):
"""[ACTION] Шаг 4: Сохранение результатов.""" """Сохранение результатов парсинга."""
with self._error_context("save_results"): with self._error_context("save_results"):
self.logger.info("[INFO] Начало сохранения результатов парсинга.") # <CORE_LOGIC>
self.logger.info("[ACTION:_save_results] Начало сохранения результатов парсинга.")
if not self.final_data: if not self.final_data:
self.logger.warning("[WARN] Итоговый набор данных пуст. Файлы не будут созданы.") self.logger.warning("[ACTION:_save_results] Итоговый набор данных пуст. Файлы не будут созданы.")
return return
self.logger.info(f"[INFO] Всего найдено валидных вариантов для сохранения: {len(self.final_data)}") self.logger.info(f"[ACTION:_save_results] Все<EFBFBD><EFBFBD>о найдено валидных вариантов для сохранения: {len(self.final_data)}")
# [CONDITIONAL_ACTION] Сохранение в CSV
if ENABLE_CSV_EXPORT and self.settings.save_to_csv: if ENABLE_CSV_EXPORT and self.settings.save_to_csv:
# <DEPENDENCY name="save_data_to_csv" />
# ... (logic remains the same)
try: try:
timestamp = datetime.now().strftime('%Y-%m-%d_%H%M%S') # Добавил время для уникальности timestamp = datetime.now().strftime('%Y-%m-%d_%H%M%S')
output_filename = self.settings.output_dir / f'prices_full_catalog_{timestamp}.csv' output_filename = self.settings.output_dir / f'prices_full_catalog_{timestamp}.csv'
# Преобразуем ProductVariant объекты в словари для save_data_to_csv data_to_csv = [p.model_dump() for p in self.final_data]
data_to_csv = [p.model_dump() for p in self.final_data] # Используем model_dump() для Pydantic v2
if save_data_to_csv(data_to_csv, output_filename, self.run_id): if save_data_to_csv(data_to_csv, output_filename, self.run_id):
self.logger.info(f"[INFO] Данные успешно сохранены в CSV: {output_filename}") self.logger.info(f"[ACTION:_save_results] Данные успешно сохранены в CSV: {output_filename}")
else: else:
self.logger.error(f"[ERROR] Не удалось сохранить данные в CSV: {output_filename}") self.logger.error(f"[ACTION:_save_results] Не удалось сохранить данные в CSV: {output_filename}")
except Exception as e: except Exception as e:
self.logger.error(f"[ERROR] Ошибка при сохранении в CSV: {e}") self.logger.error(f"[ACTION:_save_results] Ошибка при сохранении в CSV: {e}")
# [CONDITIONAL_ACTION] Сохранение в БД
if ENABLE_DATABASE_EXPORT and self.settings.save_to_db and self.db_manager: if ENABLE_DATABASE_EXPORT and self.settings.save_to_db and self.db_manager:
# <DEPENDENCY name="save_data_to_db" />
# ... (logic remains the same)
try: try:
# Преобразуем ProductVariant объекты в словари для save_data_to_db
data_to_db = [p.model_dump() for p in self.final_data] data_to_db = [p.model_dump() for p in self.final_data]
if save_data_to_db(data_to_db, self.db_manager.db_path, self.run_id): # save_data_to_db ожидает Path if save_data_to_db(data_to_db, self.db_manager.db_path, self.run_id):
self.logger.info("[INFO] Данные успешно сохранены в базу данных.") self.logger.info("[ACTION:_save_results] Данные успешно сохранены в базу данных.")
else: else:
self.logger.error("[ERROR] Не удалось сохранить данные в базу данных.") self.logger.error("[ACTION:_save_results] Не удалось сохранить данные в базу данных.")
except Exception as e: except Exception as e:
self.logger.error(f"[ERROR] Ошибка при сохранении в БД: {e}") self.logger.error(f"[ACTION:_save_results] Ошибка при сохранении в БД: {e}")
# [ENHANCEMENT] Экспорт в RabbitMQ
if ENABLE_RABBITMQ_EXPORT: if ENABLE_RABBITMQ_EXPORT:
# <DEPENDENCY name="export_data_to_rabbitmq" />
# ... (logic remains the same)
try: try:
# Преобразуем ProductVariant объекты в словари для экспорта
data_to_rabbitmq = [p.model_dump() for p in self.final_data] data_to_rabbitmq = [p.model_dump() for p in self.final_data]
if export_data_to_rabbitmq(data_to_rabbitmq, self.run_id, self.run_id): if export_data_to_rabbitmq(data_to_rabbitmq, self.run_id, self.run_id):
self.logger.info("[INFO] Данные успешно экспортированы в RabbitMQ.") self.logger.info("[ACTION:_save_results] Данные успешно экспортированы в RabbitMQ.")
else: else:
self.logger.error("[ERROR] Не удалось экспортировать данные в RabbitMQ.") self.logger.error("[ACTION:_save_results] Не удалось экспортировать данные в RabbitMQ.")
except Exception as e: except Exception as e:
self.logger.error(f"[ERROR] Ошибка при экспорте в RabbitMQ: {e}") self.logger.error(f"[ACTION:_save_results] Ошибка при экспорте в RabbitMQ: {e}")
self.logger.info("[INFO] Сохранение результатов завершено.") self.logger.info("[ACTION:_save_results] Сохранение результатов завершено.")
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="_cleanup">
# description: "Шаг 5: Корректное завершение работы, закрытие сессий и логирование финальной статистики."
# </CONTRACT>
# <ACTION name="_cleanup">
def _cleanup(self): def _cleanup(self):
"""[ACTION] Шаг 5: Корректное завершение работы.""" """Корректное завершение работы и очистка ресурсов."""
# <CORE_LOGIC>
try: try:
self.stats['end_time'] = datetime.now() self.stats['end_time'] = datetime.now()
duration = self.stats['end_time'] - self.stats['start_time'] if self.stats['start_time'] else None duration = self.stats['end_time'] - self.stats['start_time'] if self.stats['start_time'] else None
self.logger.info("[INFO] Начало очистки ресурсов.") self.logger.info("[ACTION:_cleanup] Начало очистки ресурсов.")
self.http_session.close() self.http_session.close()
self.logger.debug("[DEBUG] HTTP-сессия закрыта.") self.logger.debug("[ACTION:_cleanup] HTTP-сессия закрыта.")
if self.db_manager: if self.db_manager:
self.db_manager.close() self.db_manager.close()
self.logger.debug("[DEBUG] Соединение с базой данных закрыто.") self.logger.debug("[ACTION:_cleanup] Соединение с базой данных закрыто.")
# [ENHANCEMENT] Финальная статистика
if duration: if duration:
self.logger.info(f"[FINAL_STATS] Время выполнения: {duration.total_seconds():.2f} секунд") self.logger.info(f"[STATS][ACTION:_cleanup] Время выполнения: {duration.total_seconds():.2f} секунд")
self.logger.info(f"[FINAL_STATS] Успешность: {self.stats['successful_parses']}/{self.stats['total_urls']} ({self.stats['successful_parses']/self.stats['total_urls']*100:.1f}%)") self.logger.info(f"[STATS][ACTION:_cleanup] Успешность: {self.stats['successful_parses']}/{self.stats['total_urls']} ({self.stats['successful_parses']/self.stats['total_urls']*100:.1f}%)")
self.logger.info(f"[COHERENCE_CHECK_PASSED] Работа парсера завершена. Run ID: {self.run_id}") self.logger.info(f"[COHERENCE_CHECK_PASSED][ACTION:_cleanup] Работа парсера завершена. Run ID: {self.run_id}")
except Exception as e: except Exception as e:
self.logger.error(f"[ERROR] Ошибка при очистке ресурсов: {e}") self.logger.error(f"[ACTION:_cleanup] Ошибка при очистке ресурсов: {e}")
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="run">
# description: "Основной метод, запускающий весь процесс в правильной последовательности."
# </CONTRACT>
# <ENTRYPOINT name="run">
def run(self): def run(self):
"""[ENTRYPOINT] Основной метод, запускающий весь процесс.""" """Основной метод, запускающий весь процесс парсинга."""
self.logger.info("="*50) self.logger.info("="*50)
self.logger.info("[INFO] Запуск главного процесса оркестратора.") self.logger.info("[ENTRYPOINT:run] Запуск главного процесса оркестратора.")
self.logger.info("="*50) self.logger.info("="*50)
# <ERROR_HANDLER for="critical_orchestrator_failure">
try: try:
self._setup() self._setup()
urls_to_scrape = self._collect_urls() urls_to_scrape = self._collect_urls()
@@ -247,12 +303,15 @@ class AppOrchestrator:
self._scrape_data(urls_to_scrape) self._scrape_data(urls_to_scrape)
self._save_results() self._save_results()
else: else:
self.logger.warning("[WARN] Отсутствуют URL для парсинга. Пропуск шагов парсинга и сохранения.") self.logger.warning("[ENTRYPOINT:run] Отсутствуют URL для парсинга. Пропуск шагов парсинга и сохранения.")
except Exception as e: except Exception as e:
self.logger.critical(f"[CRITICAL] Непредвиденная критическая ошибка в оркестраторе: {e}", exc_info=True) self.logger.critical(f"[CRITICAL][ENTRYPOINT:run] Непредвиденная критическая ошибка в оркестраторе: {e}", exc_info=True)
# [COHERENCE_CHECK_FAILED] Критическая ошибка нарушила нормальный поток выполнения. # <COHERENCE_CHECK status="FAILED" description="Критическая ошибка нарушила нормальный поток выполне<EFBFBD><EFBFBD>ия." />
raise # Пробрасываем исключение для обработки на верхнем уровне raise
finally: finally:
self._cleanup() self._cleanup()
# </ERROR_HANDLER>
# </ENTRYPOINT>
# <COHERENCE_CHECK status="PASSED" />

View File

@@ -1,8 +1,11 @@
# [FILE] src/scraper/engine.py # <MODULE name="scraper.engine" semantics="html_scraping_logic" />
# [REFACTORING_TARGET] Преобразование модуля с функциями в класс Scraper. # <DESIGN_NOTE>
# ANCHOR: Scraper_Class_Module # Класс Scraper инкапсулирует всю логику, связанную с HTTP-запросами и парсингом HTML.
# Семантика: Инкапсулирует всю логику, связанную с HTTP-запросами и парсингом HTML. # Он не имеет состояния между операциями, кроме сессии и конфигурации,
# и получает все необходимые данные через аргументы методов.
# </DESIGN_NOTE>
# <IMPORTS>
import logging import logging
import time import time
from urllib.parse import urljoin from urllib.parse import urljoin
@@ -13,114 +16,146 @@ from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry from urllib3.util.retry import Retry
from pydantic import HttpUrl from pydantic import HttpUrl
from core.models import ProductVariant # [FIX] Импорт ProductVariant from core.models import ProductVariant
from core.settings import ScraperSelectors from core.settings import ScraperSelectors
# </IMPORTS>
# <MAIN_CONTRACT for="Scraper">
# description: "Класс, ответственный за взаимодействие с сайтом и извлечение данных из HTML."
# invariant: "Использует одну и ту же HTTP-сессию для всех запросов."
# </MAIN_CONTRACT>
class Scraper: class Scraper:
""" # <INIT name="__init__">
[MAIN-CONTRACT]
@description: Класс, ответственный за взаимодействие с сайтом и извлечение данных.
@invariant: Использует одну и ту же HTTP-сессию для всех запросов.
"""
def __init__(self, session: requests.Session, selectors: ScraperSelectors, base_url: str): def __init__(self, session: requests.Session, selectors: ScraperSelectors, base_url: str):
# [INIT] Инициализация с зависимостями. """Инициализирует скрейпер с зависимостями: сессия, селекторы и базовый URL."""
# <STATE name="initial_state">
self.session = session self.session = session
self.selectors = selectors self.selectors = selectors
self.base_url = base_url self.base_url = base_url
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
# </STATE>
# [ENHANCEMENT] Настройка retry стратегии для HTTP запросов # <ACTION name="setup_retry_strategy_on_init">
self._setup_retry_strategy() self._setup_retry_strategy()
# </ACTION>
# </INIT>
# <CONTRACT for="_setup_retry_strategy">
# description: "Настраивает retry-логику для HTTP-адаптера сессии."
# </CONTRACT>
# <HELPER name="_setup_retry_strategy">
def _setup_retry_strategy(self): def _setup_retry_strategy(self):
"""[HELPER] Настраивает retry стратегию для HTTP запросов.""" """Настраивает retry стратегию для HTTP запросов."""
# <CORE_LOGIC>
retry_strategy = Retry( retry_strategy = Retry(
total=3, # Максимум 3 попытки total=3,
backoff_factor=1, # Экспоненциальная задержка: 1, 2, 4 секунды backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504], # Коды ошибок для retry status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS"] # Разрешенные методы allowed_methods=["HEAD", "GET", "OPTIONS"]
) )
adapter = HTTPAdapter(max_retries=retry_strategy) adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter) self.session.mount("http://", adapter)
self.session.mount("https://", adapter) self.session.mount("https://", adapter)
self.logger.debug("[DEBUG] Retry стратегия настроена для HTTP запросов.") self.logger.debug("[HELPER:_setup_retry_strategy] Retry стратегия настроена для HTTP запросов.")
# </CORE_LOGIC>
# </HELPER>
# <CONTRACT for="_clean_price">
# description: "Очищает строковое представление цены, оставляя только цифры."
# postconditions: "Возвращает целое число или 0 в случае ошибки."
# </CONTRACT>
# <HELPER name="_clean_price">
def _clean_price(self, price_str: str) -> int: def _clean_price(self, price_str: str) -> int:
"""[HELPER] Очищает строку цены и возвращает целое число.""" """Очищает строку цены и возвращает целое число."""
self.logger.debug(f"[DEBUG] Очистка цены: '{price_str}'") self.logger.debug(f"[HELPER:_clean_price] Очистка цены: '{price_str}'")
# <CORE_LOGIC>
try: try:
# Удаляем все символы кроме цифр
digits = ''.join(filter(str.isdigit, price_str)) digits = ''.join(filter(str.isdigit, price_str))
if not digits: if not digits:
self.logger.warning(f"[WARNING] Не удалось извлечь цифры из цены: '{price_str}'") self.logger.warning(f"[HELPER:_clean_price] Не удалось извлечь цифры из цены: '{price_str}'")
return 0 return 0
cleaned_price = int(digits) cleaned_price = int(digits)
if cleaned_price <= 0: if cleaned_price <= 0:
self.logger.warning(f"[WARNING] Некорректная цена (<= 0): {cleaned_price}") self.logger.warning(f"[HELPER:_clean_price] Некорректная цена (<= 0): {cleaned_price}")
return 0 return 0
self.logger.debug(f"[DEBUG] Цена после очистки: {cleaned_price}") self.logger.debug(f"[HELPER:_clean_price] Цена после очистки: {cleaned_price}")
return cleaned_price return cleaned_price
except (ValueError, TypeError) as e: except (ValueError, TypeError) as e:
self.logger.error(f"[ERROR] Ошибка при обработке цены '{price_str}': {e}") # <FALLBACK>
self.logger.error(f"[HELPER:_clean_price] Ошибка при обработке цены '{price_str}': {e}")
return 0 return 0
# </FALLBACK>
# </CORE_LOGIC>
# </HELPER>
# <CONTRACT for="_fetch_page">
# description: "Скачивает HTML-содержимое страницы с обработкой ошибок и retry-логикой."
# postconditions: "Возвращает текстовое содержимое страницы или None в случае любой ошибки."
# </CONTRACT>
# <HELPER name="_fetch_page">
def _fetch_page(self, url: str, request_id: str) -> Optional[str]: def _fetch_page(self, url: str, request_id: str) -> Optional[str]:
"""[HELPER] Приватный метод для скачивания HTML-содержимого страницы.""" """Приватный метод для скачивания HTML-содержимого страницы."""
log_prefix = f"_fetch_page(id={request_id})" log_prefix = f"[HELPER:_fetch_page(id={request_id})]"
self.logger.debug(f"{log_prefix} - Запрос к URL: {url}") self.logger.debug(f"{log_prefix} Запрос к URL: {url}")
# <ERROR_HANDLER for="network_requests">
try: try:
response = self.session.get(url, timeout=30) # Увеличил timeout до 30 секунд # <CORE_LOGIC>
response = self.session.get(url, timeout=30)
response.raise_for_status() response.raise_for_status()
# [ENHANCEMENT] Проверка на валидный HTML
if not response.text.strip(): if not response.text.strip():
self.logger.warning(f"{log_prefix} - Получен пустой ответ от {url}") self.logger.warning(f"{log_prefix} Получен пустой ответ от {url}")
return None return None
# [ENHANCEMENT] Проверка на блокировку или капчу
if "captcha" in response.text.lower() or "blocked" in response.text.lower(): if "captcha" in response.text.lower() or "blocked" in response.text.lower():
self.logger.error(f"{log_prefix} - [BLOCKED] Обнаружена капча или блокировка на {url}") self.logger.error(f"{log_prefix} [BLOCKED] Обнаружена капча или блокировка на {url}")
return None return None
self.logger.debug(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Страница успешно получена, статус {response.status_code}.") self.logger.debug(f"{log_prefix} [COHERENCE_CHECK_PASSED] Страница успешно получена, статус {response.status_code}.")
return response.text return response.text
# </CORE_LOGIC>
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
self.logger.error(f"{log_prefix} - [TIMEOUT] Превышено время ожидания для {url}") self.logger.error(f"{log_prefix} [TIMEOUT] Превышено время ожидания для {url}")
return None return None
except requests.exceptions.ConnectionError as e: except requests.exceptions.ConnectionError as e:
self.logger.error(f"{log_prefix} - [CONNECTION_ERROR] Ошибка соединения для {url}: {e}") self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения для {url}: {e}")
return None return None
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
self.logger.error(f"{log_prefix} - [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}") self.logger.error(f"{log_prefix} [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}")
return None return None
except requests.RequestException as e: except requests.RequestException as e:
self.logger.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Сетевая ошибка при запросе {url}: {e}", exc_info=True) self.logger.error(f"{log_prefix} [COHERENCE_CHECK_FAILED] Сетевая ошибка при запросе {url}: {e}", exc_info=True)
return None return None
except Exception as e: except Exception as e:
self.logger.critical(f"{log_prefix} - [CRITICAL] Непредвиденная ошибка при запросе {url}: {e}", exc_info=True) self.logger.critical(f"{log_prefix} [CRITICAL] Непредвиденная ошибка при запросе {url}: {e}", exc_info=True)
return None return None
# </ERROR_HANDLER>
# </HELPER>
# <CONTRACT for="get_base_product_urls">
# description: "Собирает URL всех товаров с основной страницы каталога."
# preconditions: "`catalog_url` должен быть доступен."
# postconditions: "Возвращает список уникальных URL базовых продуктов."
# </CONTRACT>
# <ACTION name="get_base_product_urls">
def get_base_product_urls(self, catalog_url: str, run_id: str) -> List[str]: def get_base_product_urls(self, catalog_url: str, run_id: str) -> List[str]:
"""[ACTION] Собирает URL всех товаров с основной страницы каталога. """Собирает URL всех товаров с основной страницы каталога."""
@pre: `catalog_url` должен быть доступен. log_prefix = f"[ACTION:get_base_product_urls(id={run_id})]"
@post: Возвращает список уникальных URL базовых продуктов. self.logger.info(f"{log_prefix} Начало сбора базовых URL с: {catalog_url}")
"""
log_prefix = f"get_base_urls(id={run_id})"
self.logger.info(f"{log_prefix} - Начало сбора базовых URL с: {catalog_url}")
html = self._fetch_page(catalog_url, log_prefix) html = self._fetch_page(catalog_url, f"get_base_urls(id={run_id})")
if not html: if not html:
self.logger.error(f"{log_prefix} - [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.") self.logger.error(f"{log_prefix} [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.")
return [] return []
# <CORE_LOGIC>
try: try:
soup = BeautifulSoup(html, 'html.parser') soup = BeautifulSoup(html, 'html.parser')
links = soup.select(self.selectors.catalog_product_link) links = soup.select(self.selectors.catalog_product_link)
if not links: if not links:
self.logger.warning(f"{log_prefix} - [WARNING] Не найдено ни одной ссылки на товар с селектором: {self.selectors.catalog_product_link}") self.logger.warning(f"{log_prefix} [WARNING] Не найдено ни одной ссылки на товар с селектором: {self.selectors.catalog_product_link}")
return [] return []
unique_urls = set() unique_urls = set()
@@ -130,32 +165,40 @@ class Scraper:
full_url = urljoin(self.base_url, href) full_url = urljoin(self.base_url, href)
unique_urls.add(full_url) unique_urls.add(full_url)
else: else:
self.logger.debug(f"{log_prefix} - Пропуск ссылки без href: {link}") self.logger.debug(f"{log_prefix} Пропуск ссылки без href: {link}")
self.logger.info(f"{log_prefix} - Найдено {len(unique_urls)} уникальных базовых URL.") self.logger.info(f"{log_prefix} Найдено {len(unique_urls)} уникальных базовых URL.")
# [COHERENCE_CHECK_PASSED] Базовые URL успешно собраны. # <COHERENCE_CHECK status="PASSED" description="Базовые URL успешно собраны." />
return list(unique_urls) return list(unique_urls)
except Exception as e: except Exception as e:
self.logger.error(f"{log_prefix} - [CRITICAL] Ошибка при парсинге каталога: {e}", exc_info=True) # <FALLBACK>
self.logger.error(f"{log_prefix} [CRITICAL] Ошибка при парсинге каталога: {e}", exc_info=True)
return [] return []
# </FALLBACK>
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="get_all_variant_urls">
# description: "Проходит по базовым URL и собирает URL всех их вариантов."
# preconditions: "`base_product_urls` - список доступных URL продуктов."
# postconditions: "Возвращает список всех URL вариантов продуктов."
# </CONTRACT>
# <ACTION name="get_all_variant_urls">
def get_all_variant_urls(self, base_product_urls: List[str], run_id: str) -> List[str]: def get_all_variant_urls(self, base_product_urls: List[str], run_id: str) -> List[str]:
"""[ACTION] Проходит по базовым URL и собирает URL всех их вариантов. """Проходит по базовым URL и собирает URL всех их вариантов."""
@pre: `base_product_urls` - список доступных URL продуктов.
@post: Возвращает список всех URL вариантов продуктов.
"""
all_variant_urls = [] all_variant_urls = []
total_base = len(base_product_urls) total_base = len(base_product_urls)
log_prefix = f"get_variant_urls(id={run_id})" log_prefix = f"[ACTION:get_all_variant_urls(id={run_id})]"
self.logger.info(f"{log_prefix} - Начало сбора URL вариантов для {total_base} базовых продуктов.") self.logger.info(f"{log_prefix} Начало сбора URL вариантов для {total_base} базовых продуктов.")
# <CORE_LOGIC>
for i, base_url in enumerate(base_product_urls): for i, base_url in enumerate(base_product_urls):
self.logger.info(f"{log_prefix} - Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}") self.logger.info(f"{log_prefix} Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}")
html = self._fetch_page(base_url, f"{log_prefix}-{i+1}") html = self._fetch_page(base_url, f"get_variant_urls(id={run_id})-{i+1}")
if not html: if not html:
self.logger.warning(f"{log_prefix} - Пропуск базового URL из-за ошибки загрузки: {base_url}") self.logger.warning(f"{log_prefix} Пропуск базового URL из-за ошибки загрузки: {base_url}")
continue continue
try: try:
@@ -163,7 +206,7 @@ class Scraper:
variant_items = soup.select(self.selectors.variant_list_item) variant_items = soup.select(self.selectors.variant_list_item)
if not variant_items: if not variant_items:
self.logger.debug(f"{log_prefix} - Товар не имеет явных вариантов, добавляю базовый URL как вариант: {base_url}") self.logger.debug(f"{log_prefix} Товар не имеет явных вариантов, добавляю базовый URL как вариант: {base_url}")
all_variant_urls.append(base_url) all_variant_urls.append(base_url)
else: else:
for item in variant_items: for item in variant_items:
@@ -172,79 +215,100 @@ class Scraper:
variant_url = f"{base_url}?product={variant_id}" variant_url = f"{base_url}?product={variant_id}"
all_variant_urls.append(variant_url) all_variant_urls.append(variant_url)
else: else:
self.logger.debug(f"{log_prefix} - Пропуск варианта без data-id: {item}") self.logger.debug(f"{log_prefix} Пропуск варианта без data-id: {item}")
self.logger.debug(f"{log_prefix} - Найдено {len(variant_items)} вариантов для товара {base_url.split('/')[-1]}.") self.logger.debug(f"{log_prefix} Найдено {len(variant_items)} вариантов для товара {base_url.split('/')[-1]}.")
except Exception as e: except Exception as e:
self.logger.error(f"{log_prefix} - [ERROR] Ошибка при обработке вариантов для {base_url}: {e}") # <FALLBACK>
# Добавляем базовый URL как fallback self.logger.error(f"{log_prefix} [ERROR] Ошибка при обработке вариантов для {base_url}: {e}")
all_variant_urls.append(base_url) all_variant_urls.append(base_url)
# </FALLBACK>
time.sleep(0.5) # [ACTION] Задержка между запросами time.sleep(0.5)
self.logger.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.") self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
return all_variant_urls return all_variant_urls
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="scrape_variant_page">
# description: "Парсит страницу одного варианта и возвращает Pydantic-модель."
# preconditions: "`variant_url` должен быть доступен и содержать ожидаемые элементы."
# postconditions: "Возвращает `ProductVariant` или `None` в случае ошибки парсинга."
# </CONTRACT>
# <ACTION name="scrape_variant_page">
def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]: def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]:
"""[ACTION] Парсит страницу одного варианта и возвращает Pydantic-модель. """Парсит страницу одного варианта и возвращает Pydantic-модель."""
@pre: `variant_url` должен быть доступен и содержать ожидаемые элементы. log_prefix = f"[ACTION:scrape_variant_page(id={run_id}, url={variant_url.split('/')[-1]})]"
@post: Возвращает `ProductVariant` или `None` в случае ошибки парсинга. self.logger.info(f"{log_prefix} Начало парсинга страниц<D0B8><D186> варианта.")
"""
log_prefix = f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})"
self.logger.info(f"{log_prefix} - Начало парсинга страницы варианта.")
html = self._fetch_page(variant_url, log_prefix) html = self._fetch_page(variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})")
if not html: if not html:
self.logger.warning(f"{log_prefix} - Не удалось получить HTML страницы варианта, пропуск парсинга.") self.logger.warning(f"{log_prefix} Не удалось получить HTML страницы варианта, пропуск парсинга.")
return None return None
# <CORE_LOGIC>
try: try:
soup = BeautifulSoup(html, 'html.parser') soup = BeautifulSoup(html, 'html.parser')
# [ENHANCEMENT] Более детальная проверка элементов
name_el = soup.select_one(self.selectors.product_page_name) name_el = soup.select_one(self.selectors.product_page_name)
price_el = soup.select_one(self.selectors.price_block) price_el = soup.select_one(self.selectors.price_block)
volume_el = soup.select_one(self.selectors.active_volume) # Optional, может отсутствовать volume_el = soup.select_one(self.selectors.active_volume)
unavailable_el = soup.select_one(self.selectors.product_unavailable)
# [PRECONDITION] Проверка наличия основных элементов # <PRECONDITION>
# Товар должен иметь имя. Цена или статус "нет в наличии" должны присутствовать.
if not name_el: if not name_el:
self.logger.warning(f"{log_prefix} - [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}") self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}")
return None return None
if not price_el and not unavailable_el:
if not price_el: self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличии.")
self.logger.warning(f"{log_prefix} - [MISSING_ELEMENT] Не найден элемент цены с селектором: {self.selectors.price_block}")
return None return None
# </PRECONDITION>
# [ACTION] Извлечение данных с дополнительной валидацией
name = name_el.get_text(strip=True) name = name_el.get_text(strip=True)
if not name: if not name:
self.logger.warning(f"{log_prefix} - [EMPTY_DATA] Пустое имя продукта") self.logger.warning(f"{log_prefix} [EMPTY_DATA] Пустое имя продукта")
return None return None
price_text = price_el.get_text(strip=True) # Определение наличия и цены
if not price_text: if unavailable_el:
self.logger.warning(f"{log_prefix} - [EMPTY_DATA] Пустая цена") is_in_stock = False
return None price = 0 # Цена 0, если товара нет в наличии
self.logger.info(f"{log_prefix} Товар '{name}' не в наличии.")
else:
is_in_stock = True
price_text = price_el.get_text(strip=True) if price_el else ''
if not price_text:
self.logger.warning(f"{log_prefix} [EMPTY_DATA] Пустая цена для товара в наличии")
return None
price = self._clean_price(price_text)
price = self._clean_price(price_text) if price <= 0 and is_in_stock:
if price <= 0: self.logger.warning(f"{log_prefix} [INVALID_PRICE] Некорректная цена: {price} для товара в наличии")
self.logger.warning(f"{log_prefix} - [INVALID_PRICE] Некорректная цена: {price}")
return None return None
volume = volume_el.get_text(strip=True) if volume_el else "N/A" volume = volume_el.get_text(strip=True) if volume_el else "N/A"
# [POSTCONDITION] Создаем экземпляр контракта данных. # <POSTCONDITION>
# [CONTRACT_VALIDATOR] Pydantic валидация при создании модели # <CONTRACT_VALIDATOR by="Pydantic">
try: try:
product = ProductVariant(name=name, volume=volume, price=price, url=variant_url) product = ProductVariant(name=name, volume=volume, price=price, url=variant_url, is_in_stock=is_in_stock)
self.logger.debug(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | '{product.volume}' | '{product.price}'") self.logger.debug(f"{log_prefix} [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | InStock: {product.is_in_stock} | Price: '{product.price}'")
return product return product
except Exception as e: except Exception as e:
self.logger.error(f"{log_prefix} - [VALIDATION_ERROR] Ошибка валидации ProductVariant: {e}") # <FALLBACK>
self.logger.error(f"{log_prefix} [VALIDATION_ERROR] Ошибка валидации ProductVariant: {e}")
return None return None
# </FALLBACK>
# </CONTRACT_VALIDATOR>
# </POSTCONDITION>
except Exception as e: except Exception as e:
self.logger.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Исключение при парсинге страницы {variant_url}: {e}", exc_info=True) # <FALLBACK>
self.logger.error(f"{log_prefix} [COHERENCE_CHECK_FAILED] Исключение при парсинге страницы {variant_url}: {e}", exc_info=True)
return None return None
# </FALLBACK>
# [REFACTORING_COMPLETE] Дублированные методы удалены, улучшена обработка ошибок # </CORE_LOGIC>
# </ACTION>
# <COHERENCE_CHECK status="PASSED" />

View File

@@ -1,269 +1,178 @@
# ANCHOR: Exporters_Module # <MODULE name="utils.exporters" semantics="data_exporting" />
# Семантика: Модуль для сохранения данных в различные форматы. # <DESIGN_NOTE>
# В будущем сюда можно добавить save_to_json, save_to_xml и т.д. # Модуль содержит функции для сохранения и экспорта данных в различные форматы и системы.
# Каждая функция инкапсулирует логику для конкретного назначения (CSV, RabbitMQ).
# </DESIGN_NOTE>
# <IMPORTS>
import logging import logging
import csv import csv
from pathlib import Path from pathlib import Path
from typing import List, Dict, Optional from typing import List, Dict
# </IMPORTS>
# <CONTRACT for="save_data_to_csv">
# description: "Сохраняет список словарей с данными в CSV-файл."
# preconditions:
# - "`data` должен быть списком словарей с ключами 'name', 'volume', 'price', 'url'."
# - "`filename` должен быть валидным путем `Path`."
# postconditions: "Создается CSV-файл с данными. Возвращает True в случае успеха."
# side_effects: "Создает директорию для файла, если она не существует. Перезаписывает файл, если он существует."
# </CONTRACT>
# <ACTION name="save_data_to_csv">
def save_data_to_csv(data: List[Dict], filename: Path, request_id: str) -> bool: def save_data_to_csv(data: List[Dict], filename: Path, request_id: str) -> bool:
""" """Сохраняет данные в CSV файл с улучшенной обработкой ошибок."""
[ENHANCED] Сохраняет данные в CSV файл с улучшенной обработкой ошибок. log_prefix = f"[ACTION:save_data_to_csv(id={request_id})]"
Args:
data: Список словарей с данными для сохранения
filename: Путь к файлу для сохранения
request_id: Идентификатор запроса для логирования
Returns:
bool: True если сохранение прошло успешно, False в противном случае
"""
log_prefix = f"save_data_to_csv(id={request_id})"
# [ENHANCEMENT] Валидация входных данных
if not data: if not data:
logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Данные для сохранения отсутствуют.") logging.warning(f"{log_prefix} [CONTRACT_VIOLATION] Данные для сохранения отсутствуют.")
return False return False
if not isinstance(data, list): logging.info(f"{log_prefix} Начало сохранения {len(data)} записей в файл: {filename}")
logging.error(f"{log_prefix} - [TYPE_ERROR] Данные должны быть списком, получено: {type(data)}")
return False
# [ENHANCEMENT] Проверка структуры данных
required_fields = ['name', 'volume', 'price']
for i, item in enumerate(data):
if not isinstance(item, dict):
logging.error(f"{log_prefix} - [TYPE_ERROR] Элемент {i} должен быть словарем, получено: {type(item)}")
return False
missing_fields = [field for field in required_fields if field not in item]
if missing_fields:
logging.error(f"{log_prefix} - [MISSING_FIELDS] Элемент {i} не содержит поля: {missing_fields}")
return False
# [ENHANCEMENT] Валидация типов данных
if not isinstance(item['name'], str) or not item['name'].strip():
logging.error(f"{log_prefix} - [INVALID_NAME] Элемент {i} имеет некорректное имя: {item['name']}")
return False
if not isinstance(item['volume'], str):
logging.error(f"{log_prefix} - [INVALID_VOLUME] Элемент {i} имеет некорректный объем: {item['volume']}")
return False
if not isinstance(item['price'], (int, float)) or item['price'] < 0:
logging.error(f"{log_prefix} - [INVALID_PRICE] Элемент {i} имеет некорректную цену: {item['price']}")
return False
logging.info(f"{log_prefix} - Начало сохранения {len(data)} записей в файл: {filename}")
# <CORE_LOGIC>
try: try:
# [ENHANCEMENT] Создание директории, если она не существует
filename.parent.mkdir(parents=True, exist_ok=True) filename.parent.mkdir(parents=True, exist_ok=True)
# [ENHANCEMENT] Проверка доступности файла для записи fieldnames = ['name', 'volume', 'price', 'url', 'is_in_stock']
if filename.exists():
logging.warning(f"{log_prefix} - Файл {filename} уже существует и будет перезаписан")
# [ENHANCEMENT] Определение полей на основе данных
fieldnames = ['name', 'volume', 'price']
with open(filename, 'w', newline='', encoding='utf-8') as csvfile: with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction='ignore')
writer.writeheader() writer.writeheader()
writer.writerows(data)
# [ENHANCEMENT] Запись данных с обработкой ошибок logging.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Данные успешно сохранены в {filename}")
for i, row in enumerate(data):
try:
writer.writerow(row)
except Exception as e:
logging.error(f"{log_prefix} - [WRITE_ERROR] Ошибка записи строки {i}: {e}")
# Продолжаем запись остальных строк
continue
logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Данные успешно сохранены в {filename}")
return True return True
except PermissionError as e: except PermissionError as e:
logging.error(f"{log_prefix} - [PERMISSION_ERROR] Нет прав на запись в файл {filename}: {e}") # <FALLBACK>
return False logging.error(f"{log_prefix} [PERMISSION_ERROR] Нет прав на запись в файл {filename}: {e}")
except OSError as e:
logging.error(f"{log_prefix} - [OS_ERROR] Ошибка операционной системы при сохранении {filename}: {e}")
return False return False
# </FALLBACK>
except Exception as e: except Exception as e:
logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Непредвиденная ошибка при сохранении CSV: {e}", exc_info=True) # <FALLBACK>
logging.error(f"{log_prefix} [COHERENCE_CHECK_FAILED] Непредвиденная ошибка при сохранении CSV: {e}", exc_info=True)
return False return False
# </FALLBACK>
# </CORE_LOGIC>
# </ACTION>
def validate_csv_data(data: List[Dict]) -> tuple[bool, List[str]]: # <MODULE name="rabbitmq_exporters" semantics="rabbitmq_exporting_logic" />
"""
[NEW] Валидирует данные перед сохранением в CSV.
Args:
data: Список словарей для валидации
Returns:
tuple: (is_valid, list_of_errors)
"""
errors = []
if not data:
errors.append("Данные отсутствуют")
return False, errors
if not isinstance(data, list):
errors.append(f"Данные должны быть списком, получено: {type(data)}")
return False, errors
required_fields = ['name', 'volume', 'price']
for i, item in enumerate(data):
if not isinstance(item, dict):
errors.append(f"Элемент {i} должен быть словарем")
continue
# Проверка обязательных полей
for field in required_fields:
if field not in item:
errors.append(f"Элемент {i} не содержит поле '{field}'")
# Проверка типов данных
if 'name' in item and (not isinstance(item['name'], str) or not item['name'].strip()):
errors.append(f"Элемент {i} имеет некорректное имя")
if 'price' in item and (not isinstance(item['price'], (int, float)) or item['price'] < 0):
errors.append(f"Элемент {i} имеет некорректную цену")
return len(errors) == 0, errors
# ANCHOR: RabbitMQ_Export_Functions
# Семантика: Функции для экспорта данных в RabbitMQ
# <CONTRACT for="export_data_to_rabbitmq">
# description: "Экспортирует данные о продуктах в RabbitMQ."
# preconditions:
# - "Список `products` должен быть валидным."
# - "Модуль `core.rabbitmq` должен быть доступен для импорта."
# postconditions: "Данные отправлены в очередь. Возвращает True в случае успеха."
# </CONTRACT>
# <ACTION name="export_data_to_rabbitmq">
def export_data_to_rabbitmq(products: List[Dict], run_id: str, request_id: str) -> bool: def export_data_to_rabbitmq(products: List[Dict], run_id: str, request_id: str) -> bool:
""" """Экспортирует данные о продуктах в RabbitMQ."""
[CONTRACT] log_prefix = f"[ACTION:export_data_to_rabbitmq(id={request_id})]"
@description: Экспортирует данные о продуктах в RabbitMQ.
@precondition: Список продуктов валиден, run_id не пустой.
@postcondition: Данные отправлены в очередь или False в случае ошибки.
Args:
products: Список продуктов для экспорта
run_id: Идентификатор запуска парсера
request_id: Идентификатор запроса для логирования
Returns:
bool: True если экспорт успешен, False в противном случае
"""
log_prefix = f"export_data_to_rabbitmq(id={request_id})"
# <ERROR_HANDLER for="rabbitmq_module_import">
try: try:
from core.rabbitmq import RabbitMQExporter from core.rabbitmq import RabbitMQExporter
except ImportError:
logging.warning(f"{log_prefix} [DEPENDENCY_MISSING] Модуль RabbitMQ не используется или не установлен. Экспорт пропущен.")
return True # Считаем успешным, так как экспорт не требуется
# </ERROR_HANDLER>
# [VALIDATION] Проверка входных данных if not products:
if not products: logging.warning(f"{log_prefix} [CONTRACT_VIOLATION] Список продуктов пуст, экспорт не требуется.")
logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Список продуктов пуст") return True
return False
if not run_id: logging.info(f"{log_prefix} Начало экспорта {len(products)} продуктов в RabbitMQ")
logging.error(f"{log_prefix} - [CONTRACT_VIOLATION] run_id не может быть пустым")
return False
logging.info(f"{log_prefix} - Начало экспорта {len(products)} продуктов в RabbitMQ") # <CORE_LOGIC>
try:
# [EXPORT] Создание экспортера и отправка данных with RabbitMQExporter() as exporter:
exporter = RabbitMQExporter()
try:
success = exporter.export_products(products, run_id) success = exporter.export_products(products, run_id)
if success: if success:
logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Данные успешно экспортированы в RabbitMQ") logging.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Данные успешно экспортированы в RabbitMQ")
else: else:
logging.error(f"{log_prefix} - [EXPORT_FAILED] Не удалось экспортировать данные в RabbitMQ") logging.error(f"{log_prefix} [EXPORT_FAILED] Не удалось экспортировать данные в RabbitMQ")
return success return success
finally:
exporter.close()
except ImportError as e:
logging.error(f"{log_prefix} - [IMPORT_ERROR] Не удалось импортировать модуль RabbitMQ: {e}")
return False
except Exception as e: except Exception as e:
logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Непредвиденная ошибка при экспорте в RabbitMQ: {e}", exc_info=True) # <FALLBACK>
logging.error(f"{log_prefix} [COHERENCE_CHECK_FAILED] Непредвиденная ошибка при экспорте в RabbitMQ: {e}", exc_info=True)
return False return False
# </FALLBACK>
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="export_logs_to_rabbitmq">
# description: "Экспортирует логи в RabbitMQ."
# preconditions:
# - "Список `log_records` должен быть валидным."
# - "Модуль `core.rabbitmq` должен быть доступен для импорта."
# postconditions: "Логи отправлены в очередь. Возвращает True в случае успеха."
# </CONTRACT>
# <ACTION name="export_logs_to_rabbitmq">
def export_logs_to_rabbitmq(log_records: List[Dict], run_id: str, request_id: str) -> bool: def export_logs_to_rabbitmq(log_records: List[Dict], run_id: str, request_id: str) -> bool:
""" """Экспортирует логи в RabbitMQ."""
[CONTRACT] log_prefix = f"[ACTION:export_logs_to_rabbitmq(id={request_id})]"
@description: Экспортирует логи в RabbitMQ.
@precondition: Список логов валиден, run_id не пустой.
@postcondition: Логи отправлены в очередь или False в случае ошибки.
Args:
log_records: Список записей логов
run_id: Идентификатор запуска парсера
request_id: Идентификатор запроса для логирования
Returns:
bool: True если экспорт успешен, False в противном случае
"""
log_prefix = f"export_logs_to_rabbitmq(id={request_id})"
try: try:
from core.rabbitmq import RabbitMQExporter from core.rabbitmq import RabbitMQExporter
except ImportError:
logging.warning(f"{log_prefix} [DEPENDENCY_MISSING] Модуль RabbitMQ не используется. Экспорт логов пропущен.")
return True
# [VALIDATION] Проверка входных данных if not log_records:
if not log_records: logging.warning(f"{log_prefix} [CONTRACT_VIOLATION] Список логов пуст, экспорт не требуется.")
logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Список логов пуст") return True
return False
if not run_id: logging.info(f"{log_prefix} Начало экспорта {len(log_records)} логов в RabbitMQ")
logging.error(f"{log_prefix} - [CONTRACT_VIOLATION] run_id не может быть пустым")
return False
logging.info(f"{log_prefix} - Начало экспорта {len(log_records)} логов в RabbitMQ") # <CORE_LOGIC>
try:
# [EXPORT] Создание экспортера и отправка логов with RabbitMQExporter() as exporter:
exporter = RabbitMQExporter()
try:
success = exporter.export_logs(log_records, run_id) success = exporter.export_logs(log_records, run_id)
if success: if success:
logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Логи успешно экспортированы в RabbitMQ") logging.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Логи успешно экспортированы в RabbitMQ")
else: else:
logging.error(f"{log_prefix} - [EXPORT_FAILED] Не удалось экспортировать логи в RabbitMQ") logging.error(f"{log_prefix} [EXPORT_FAILED] Не удалось эк<EFBFBD><EFBFBD>портировать логи в RabbitMQ")
return success return success
finally:
exporter.close()
except ImportError as e:
logging.error(f"{log_prefix} - [IMPORT_ERROR] Не удалось импортировать модуль RabbitMQ: {e}")
return False
except Exception as e: except Exception as e:
logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Непредвиденная ошибка при экспорте логов в RabbitMQ: {e}", exc_info=True) # <FALLBACK>
logging.error(f"{log_prefix} [COHERENCE_CHECK_FAILED] Непредвиденная ошибка при экспорте логов в RabbitMQ: {e}", exc_info=True)
return False return False
# </FALLBACK>
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="validate_rabbitmq_connection">
# description: "Проверяет доступность подключения к RabbitMQ."
# postconditions: "Возвращает True, если подключение успешно установлено и закрыто."
# </CONTRACT>
# <ACTION name="validate_rabbitmq_connection">
def validate_rabbitmq_connection() -> bool: def validate_rabbitmq_connection() -> bool:
""" """Проверяет доступность подключения к RabbitMQ."""
[HELPER] Проверяет доступность подключения к RabbitMQ. log_prefix = "[ACTION:validate_rabbitmq_connection]"
# <CORE_LOGIC>
Returns:
bool: True если подключение доступно, False в противном случае
"""
try: try:
from core.rabbitmq import RabbitMQConnection from core.rabbitmq import RabbitMQConnection
with RabbitMQConnection() as conn:
is_connected = conn.is_connected()
connection = RabbitMQConnection() if is_connected:
success = connection.connect() logging.info(f"{log_prefix} Подключение к RabbitMQ доступно.")
connection.disconnect()
if success:
logging.info("[RABBITMQ] Подключение к RabbitMQ доступно")
else: else:
logging.warning("[RABBITMQ] Подключение к RabbitMQ недоступно") logging.warning(f"{log_prefix} Подключение к RabbitMQ недоступно.")
return is_connected
return success
except ImportError: except ImportError:
logging.warning("[RABBITMQ] Модуль RabbitMQ не установлен") # <FALLBACK>
return False logging.warning(f"{log_prefix} Модуль RabbitMQ не установлен, проверка подключения пропущена.")
return False # Если модуль не установлен, считаем что подключения нет
# </FALLBACK>
except Exception as e: except Exception as e:
logging.error(f"[RABBITMQ] Ошибка проверки подключения: {e}") # <FALLBACK>
logging.error(f"{log_prefix} Ошибка проверки подключения: {e}")
return False return False
# </FALLBACK>
# </CORE_LOGIC>
# </ACTION>
# [COHERENCE_CHECK_PASSED] Модуль экспортеров расширен поддержкой RabbitMQ с полной валидацией и обработкой ошибок. # <COHERENCE_CHECK status="PASSED" description="Модуль экспортеров полностью структурирован и размечен." />