Compare commits

...

11 Commits

Author SHA1 Message Date
abcbb08134 add gzip logging 2025-07-23 18:39:32 +03:00
ae6d81f6f2 readme update 2025-07-21 10:37:12 +03:00
69e90bd4f2 1 2025-07-20 09:47:12 +03:00
c59dff3397 error fix 2025-07-20 09:38:10 +03:00
5742d474fd monitoring refactor 2025-07-20 09:29:19 +03:00
40317aa2e7 end tag error 2025-07-20 09:07:07 +03:00
06cfca4cd1 +fix monitor 2025-07-20 09:01:59 +03:00
cf52a7c0ad fix(logging): Ensure file logger is correctly initialized 2025-07-20 08:35:40 +03:00
ae9c61facc add monitoring service & Telegram sender 2025-07-20 08:34:28 +03:00
0259289ee9 + 2025-07-19 01:04:04 +03:00
fe1a022609 multithread 2025-07-19 01:03:47 +03:00
14 changed files with 657 additions and 265 deletions

5
.gitignore vendored
View File

@@ -19,4 +19,7 @@ price_data_final/
.vscode/ .vscode/
#backups #backups
*.bak *.bak
# Logs
/logs/

236
README.md
View File

@@ -1,58 +1,55 @@
# ANCHOR: Project_README # ANCHOR: Project_README
# Семантика: Документация, описывающая проект, его структуру и способ использования. # Семантика: Документация, описывающая проект, его структуру и способ использования.
# Парсер цен для ElixirPeptide # Сервис мониторинга цен ElixirPeptide (v3.0)
Это структурированное Python-приложение для парсинга каталога товаров с сайта `elixirpeptide.ru`, сбора информации о вариантах товаров и их ценах. Это распределенное Python-приложение для мониторинга каталога товаров с сайта `elixirpeptide.ru`. Сервис автоматически отслеживает изменения в ценах и наличии товаров, и отправляет отчеты в Telegram.
## 🚀 Новые возможности (v2.0) ## 🚀 Архитектура (v3.0)
### ✅ Исправленные критические проблемы: Проект перешел от простого парсера к полноценному сервису мониторинга с асинхронной обработкой данных.
- **Устранено дублирование кода** в `engine.py` и `database.py`
- **Дополнены зависимости** в `requirements.txt` (pydantic, lxml, python-dotenv)
- **Улучшена обработка ошибок** с детальной диагностикой и retry механизмом
- **Добавлена валидация данных** на всех уровнях приложения
### 🎯 Новые функции: - **Точка входа**: `monitoring_service.py` — главный скрипт, запускаемый по расписанию (cron).
- **Retry стратегия** для HTTP запросов с экспоненциальной задержкой - **Оркестратор**: `src/orchestrator.py` управляет процессом парсинга.
- **Детальная статистика** выполнения парсинга - **Анализатор**: `src/analyzer.py` сравнивает данные последнего и предпоследнего запусков, выявляя изменения.
- **Валидация конфигурации** при запуске - **Уведомления**: `src/utils/telegram_sender.py` отправляет HTML-отчеты об изменениях в Telegram.
- **Поддержка переменных окружения** через `.env` файл - **Очередь сообщений**: Интеграция с **RabbitMQ** для асинхронного экспорта данных о товарах и логах.
- **Graceful degradation** - продолжение работы при частичных сбоях - **База данных**: **SQLite** используется для хранения истории парсинга и данных для анализа.
- **Улучшенное логирование** с категоризацией ошибок
### 🔧 Улучшения производительности: ## ✨ Ключевые возможности
- **Адаптивные таймауты** для HTTP запросов
- **Проверка на блокировку/капчу** в ответах сервера - **Автоматический мониторинг**: Запуск по расписанию для непрерывного отслеживания.
- **Оптимизированная обработка данных** с пропуском некорректных записей - **Отчеты об изменениях**: Уведомления в Telegram о новых, удаленных товарах, а также изменениях цен и наличия.
- **Асинхронный экспорт**: Отправка данных в RabbitMQ для дальнейшей обработки другими системами.
- **Надежность**: Механизм retry-запросов, детальное логирование и отказоустойчивость.
- **Гибкая конфигурация**: Все параметры настраиваются через `.env` файл.
## Структура Проекта ## Структура Проекта
Проект организован по принципу семантического разделения ответственности для удобства поддержки и дальнейшей разработки. - `monitoring_service.py`: **[NEW]** Главная точка входа для запуска по расписанию.
- `src/`: Основная директория с исходным кодом. - `src/`: Основная директория с исходным кодом.
- `config.py`: Все настройки (URL, селекторы, флаги сохранения). - `orchestrator.py`: **[NEW]** Управляет всем процессом парсинга.
- `main.py`: Точка входа в приложение, оркестратор процесса. - `analyzer.py`: **[NEW]** Анализирует данные и формирует отчеты.
- `core/`: Пакет с ядром приложения. - `main.py`: Точка входа для ручного запуска парсинга.
- `core/`: Ядро приложения.
- `settings.py`: **[ENHANCED]** Pydantic-модели для конфигурации (включая Telegram, RabbitMQ).
- `database.py`: Логика работы с базой данных SQLite. - `database.py`: Логика работы с базой данных SQLite.
- `logging_config.py`: Настройка системы логирования. - `rabbitmq.py`: **[NEW]** Клиент для работы с RabbitMQ.
- **`models.py`: [NEW FILE] Pydantic модели данных (ProductVariant, LogRecordModel).** - `scraper/`: Логика парсинга.
- **`settings.py`: [ENHANCED] Конфигурация с валидацией и поддержкой .env** - `engine.py`: Улучшенный движок парсера.
- `scraper/`: Пакет с логикой парсинга. - `utils/`: Вспомогательные утилиты.
- **`engine.py`: [ENHANCED] Класс Scraper с retry механизмом и улучшенной обработкой ошибок** - `telegram_sender.py`: **[NEW]** Отправка уведомлений в Telegram.
- `utils/`: Пакет со вспомогательными утилитами. - `requirements.txt`: **[UPDATED]** Обновленный список зависимостей.
- **`exporters.py`: [ENHANCED] Функции для сохранения данных с валидацией** - `.env.example`: Пример файла с переменными окружения.
- `requirements.txt`: Список зависимостей проекта. - `RABBITMQ_SETUP.md`: **[NEW]** Руководство по настройке RabbitMQ.
- `price_data_final/`: Директория для хранения результатов (создается автоматически).
- **`.env.example`: [NEW] Пример файла с переменными окружения**
## Установка и Запуск ## Установка и Запуск
### 1. Клонирование и настройка окружения ### 1. Клонирование и настройка окружения
```bash ```bash
git clone <your-repo-url> git clone https://gitea.bebesh.ru/busya/peptide-parcer.git
cd peptide_parser_project cd peptide-parcer
# Создание виртуального окружения # Создание виртуального окружения
python -m venv venv python -m venv venv
@@ -62,139 +59,68 @@ source venv/bin/activate # Для Windows: venv\Scripts\activate
pip install -r requirements.txt pip install -r requirements.txt
``` ```
### 2. Настройка конфигурации ### 2. Настройка RabbitMQ (Опционально)
#### Вариант A: Через переменные окружения Для асинхронного экспорта данных требуется RabbitMQ. Рекомендуется запуск через Docker:
```bash ```bash
# Создайте файл .env на основе .env.example docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
cp .env.example .env ```
Подробности см. в `RABBITMQ_SETUP.md`.
# Отредактируйте .env файл под ваши нужды ### 3. Настройка конфигурации
nano .env
Скопируйте `.env.example` в `.env` и заполните необходимые параметры.
```bash
cp .env.example .env
nano .env # Отредактируйте файл
``` ```
#### Вариант B: Прямое редактирование настроек **Ключевые переменные окружения:**
Отредактируйте `src/core/settings.py` для изменения настроек по умолчанию. - `TELEGRAM_BOT_TOKEN`: Токен вашего Telegram-бота.
- `TELEGRAM_CHAT_ID`: ID чата, куда будут приходить отчеты.
- `ENABLE_RABBITMQ_EXPORT`: `true` или `false` для включения/отключения экспорта в RabbitMQ.
- `RABBITMQ_HOST`, `RABBITMQ_PORT` и т.д.: Настройки подключения к RabbitMQ.
### 3. Запуск парсера ### 4. Запуск
#### Автоматический мониторинг (рекомендуется)
Настройте запуск `monitoring_service.py` через `cron` или планировщик задач.
```bash
# Пример для cron, запуск каждый час
# 0 * * * * /path/to/your/project/venv/bin/python /path/to/your/project/monitoring_service.py >> /path/to/your/project/logs/cron.log 2>&1
# Ручной запуск для проверки
python monitoring_service.py
```
#### Ручной запуск парсинга (без анализа и отчета)
```bash ```bash
python src/main.py python src/main.py
``` ```
## Конфигурация
### Переменные окружения (.env файл)
| Переменная | Описание | По умолчанию |
|------------|----------|--------------|
| `PARSER_BASE_URL` | Базовый URL сайта | `https://elixirpeptide.ru` |
| `PARSER_CATALOG_URL` | URL каталога товаров | `https://elixirpeptide.ru/catalog/` |
| `PARSER_SAVE_TO_CSV` | Сохранять в CSV | `true` |
| `PARSER_SAVE_TO_DB` | Сохранять в базу данных | `true` |
| `PARSER_LOG_TO_DB` | Логировать в базу данных | `true` |
| `PARSER_TIMEOUT` | Таймаут HTTP запросов (сек) | `30` |
| `PARSER_DELAY` | Задержка между запросами (сек) | `1.0` |
| `PARSER_RETRIES` | Максимум попыток для запросов | `3` |
### Настройки производительности
- **Таймаут запросов**: 30 секунд (настраивается)
- **Задержка между запросами**: 1 секунда (настраивается)
- **Retry стратегия**: 3 попытки с экспоненциальной задержкой
- **Graceful degradation**: Продолжение работы при ошибках отдельных запросов
## Результаты ## Результаты
### Файлы результатов ### Отчеты в Telegram
Сервис присылает в указанный чат отчет, если обнаруживает изменения:
- **💰 Изменились цены**: список товаров с новой и старой ценой.
- **📦 Изменилось наличие**: список товаров, которые появились или закончились.
- **✨ Новые товары**: список добавленных товаров.
- **🗑️ Удаленные товары**: список удаленных товаров.
- **CSV файл**: `price_data_final/prices_full_catalog_YYYY-MM-DD_HHMMSS.csv` ### Данные
- **База данных**: `price_data_final/parser_data.db` (SQLite) - **База данных**: `price_data_final/parser_data.db` — хранит всю историю парсинга.
- **CSV файл**: `price_data_final/prices_full_catalog_*.csv` — создается при ручном запуске.
### Структура данных - **RabbitMQ**: Сообщения с данными о товарах и логах отправляются в очереди `price_parser.products` и `price_parser.logs`.
```csv
name,volume,price
"Peptide X","30ml",1500
"Peptide Y","50ml",2500
```
### Логирование
- **Консольные логи**: Детальная информация о процессе парсинга
- **Логи в БД**: Если `PARSER_LOG_TO_DB=true`, все логи сохраняются в таблицу `logs`
## Обработка ошибок
### Типы обрабатываемых ошибок
1. **Сетевые ошибки**: Timeout, ConnectionError, HTTPError
2. **Ошибки парсинга**: Отсутствующие элементы, некорректные данные
3. **Ошибки файловой системы**: Права доступа, отсутствие директорий
4. **Ошибки базы данных**: SQLite ошибки, проблемы с подключением
### Стратегия восстановления
- **Retry механизм**: Автоматические повторные попытки для сетевых ошибок
- **Graceful degradation**: Пропуск проблемных записей с продолжением работы
- **Детальная диагностика**: Подробные логи для анализа проблем
## Мониторинг и статистика
### Статистика выполнения
Приложение выводит детальную статистику:
```
[FINAL_STATS] Время выполнения: 45.23 секунд
[FINAL_STATS] Успешность: 95/100 (95.0%)
[STATS] Успешно: 95, Ошибок: 5
```
### Метрики
- Общее количество URL для парсинга
- Количество успешно обработанных записей
- Количество ошибок
- Время выполнения
- Процент успешности
## Разработка
### Архитектурные принципы
1. **Разделение ответственности**: Каждый модуль отвечает за свою область
2. **Типизация**: Использование Pydantic для валидации данных
3. **Обработка ошибок**: Graceful handling с детальной диагностикой
4. **Конфигурируемость**: Гибкие настройки через переменные окружения
5. **Логирование**: Структурированное логирование на всех уровнях
### Добавление новых функций
1. **Новые форматы экспорта**: Добавьте функции в `src/utils/exporters.py`
2. **Новые селекторы**: Обновите `ScraperSelectors` в `src/core/settings.py`
3. **Новые поля данных**: Расширьте модель `ProductVariant` в `src/core/models.py`
## Устранение неполадок ## Устранение неполадок
### Частые проблемы 1. **Не приходят сообщения в Telegram**:
- Проверьте правильность `TELEGRAM_BOT_TOKEN` и `TELEGRAM_CHAT_ID`.
1. **"Не удается подключиться к базовому URL"** - Убедитесь, что бот добавлен в чат с нужными правами.
- Проверьте интернет-соединение 2. **Ошибка подключения к RabbitMQ**:
- Убедитесь, что сайт доступен - Проверьте, что RabbitMQ запущен и доступен.
- Проверьте настройки прокси - Убедитесь в корректности настроек в `.env`.
3. **Ошибки парсинга**:
2. **"Не найдено ни одной ссылки на товар"** - Проверьте доступность сайта `elixirpeptide.ru`.
- Проверьте CSS селекторы в настройках - Возможно, изменилась структура HTML-страниц. В этом случае нужно обновить CSS-селекторы в `src/core/settings.py`.
- Убедитесь, что структура сайта не изменилась
3. **"Ошибка при сохранении в БД"**
- Проверьте права доступа к директории
- Убедитесь, что SQLite поддерживается
### Логи для диагностики
Все ошибки логируются с детальной информацией. Проверьте:
- Консольные логи при запуске
- Логи в базе данных (если включено)
- Файлы результатов для проверки данных

76
monitoring_service.py Normal file
View File

@@ -0,0 +1,76 @@
# <MODULE name="monitoring_service" semantics="main_entrypoint_for_cron" />
# <IMPORTS>
import logging
import asyncio
import sys
import os
from datetime import datetime
# Добавляем 'src' в sys.path, чтобы внутренние импорты (e.g., `from core...`) работали
sys.path.insert(0, os.path.join(os.path.abspath(os.path.dirname(__file__)), 'src'))
from orchestrator import AppOrchestrator
from core.settings import settings
from core.logging_config import setup_logging
from analyzer import DataAnalyzer
from utils.telegram_sender import send_telegram_notification
# </IMPORTS>
# <CONTRACT for="main">
# description: "Главная точка входа для сервиса мониторинга. Запускается по расписанию (cron)."
# postconditions:
# - "Парсер запускается, данные сохраняются в БД."
# - "Если обнаружены изменения, отправляется отчет в Telegram."
# </CONTRACT>
# <ENTRYPOINT name="main">
async def main():
"""
Основная асинхронная функция, которая запускает парсер,
анализирует данные и отправляет уведомление.
"""
run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
setup_logging(run_id=run_id)
logger = logging.getLogger(__name__)
logger.info(f"🚀 Запуск сервиса мониторинга. Run ID: {run_id}")
try:
# 1. Запуск парсера
logger.info("Начало этапа парсинга...")
orchestrator = AppOrchestrator(settings=settings)
orchestrator.run()
logger.info("Этап парсинга завершен.")
# 2. Анализ данных
logger.info("Начало этапа анализа данных...")
analyzer = DataAnalyzer()
report_message = analyzer.analyze()
logger.info("Этап анализа данных завершен.")
# 3. Отправка отчета
if report_message:
logger.info("Обнаружены изменения, отправка отчета в Telegram...")
await send_telegram_notification(report_message)
else:
logger.info("Изменений не найдено, отправка отчета не требуется.")
except Exception as e:
logger.critical(f"💥 Критическая ошибка в сервисе мониторинга: {e}", exc_info=True)
# Попытка отправить уведомление об ошибке
try:
error_message = f"<b>❗️ Критическая ошибка в сервисе мониторинга</b>\n\n<pre>{e}</pre>"
await send_telegram_notification(error_message)
except Exception as tg_e:
logger.error(f"Не удалось даже отправить уведомление об ошибке в Telegram: {tg_e}")
logger.info("✅ Сервис мониторинга завершил работу.")
# <MAIN_CONTRACT>
# description: "Стандартный блок для запуска main() при выполнении скрипта."
# </MAIN_CONTRACT>
if __name__ == "__main__":
# Используем asyncio.run() для запуска асинхронной функции main
asyncio.run(main())
# <COHERENCE_CHECK status="PASSED" />

View File

@@ -17,4 +17,12 @@ python-dotenv>=1.0.0
# ANCHOR: RabbitMQ_Dependencies # ANCHOR: RabbitMQ_Dependencies
# Семантика: Зависимости для работы с очередью сообщений RabbitMQ # Семантика: Зависимости для работы с очередью сообщений RabbitMQ
pika>=1.3.0 pika>=1.3.0
# ANCHOR: Database_Dependencies
# Семантика: Зависимости для работы с базой данных
SQLAlchemy>=2.0.0
# ANCHOR: Telegram_Dependencies
# Семантика: Зависимости для отправки уведомлений в Telegram
python-telegram-bot>=21.0.0

118
src/analyzer.py Normal file
View File

@@ -0,0 +1,118 @@
# <MODULE name="analyzer" semantics="data_comparison_logic" />
# <IMPORTS>
import logging
import sqlite3
from typing import List, Dict, Tuple, Any
from src.core.settings import settings
# </IMPORTS>
# <MAIN_CONTRACT for="DataAnalyzer">
# description: "Анализирует данные двух последних запусков парсера и выявляет изменения, используя прямые SQL-запросы."
# </MAIN_CONTRACT>
class DataAnalyzer:
# <INIT>
def __init__(self):
self.db_path = settings.db_path
self.logger = logging.getLogger(__name__)
# </INIT>
def _execute_query(self, query: str, params: tuple = ()) -> List[Any]:
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return cursor.fetchall()
except sqlite3.Error as e:
self.logger.error(f"Ошибка выполнения SQL-запроса: {e}")
return []
# <HELPER name="_get_last_two_runs">
def _get_last_two_runs(self) -> Tuple[str, str]:
"""Возвращает ID двух последних успешных запусков."""
query = "SELECT run_id FROM parsing_runs ORDER BY start_time DESC LIMIT 2"
runs = self._execute_query(query)
if len(runs) < 2:
self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.")
return None, None
return runs[0][0], runs[1][0] # (current_run_id, previous_run_id)
# </HELPER>
# <HELPER name="_get_data_for_run">
def _get_data_for_run(self, run_id: str) -> Dict[str, Dict]:
"""Возвращает данные о товарах для указанного запуска в виде словаря {url: product_data}."""
query = "SELECT name, volume, price, url, is_in_stock FROM products WHERE run_id = ?"
rows = self._execute_query(query, (run_id,))
# Преобразуем sqlite3.Row в словари
return {row[3]: dict(zip(['name', 'volume', 'price', 'url', 'is_in_stock'], row)) for row in rows}
# </HELPER>
# <ACTION name="analyze">
def analyze(self) -> str:
"""
Сравнивает два последних запуска и генерирует HTML-отчет об изменениях.
"""
current_run_id, prev_run_id = self._get_last_two_runs()
if not current_run_id or not prev_run_id:
return ""
self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим запуском {prev_run_id}")
current_data = self._get_data_for_run(current_run_id)
prev_data = self._get_data_for_run(prev_run_id)
price_changes = []
availability_changes = []
new_items = []
removed_items = []
for url, current_item in current_data.items():
prev_item = prev_data.get(url)
if prev_item:
if current_item['price'] != prev_item['price']:
price_changes.append((current_item, prev_item['price']))
if current_item['is_in_stock'] != prev_item['is_in_stock']:
availability_changes.append(current_item)
else:
new_items.append(current_item)
for url, prev_item in prev_data.items():
if url not in current_data:
removed_items.append(prev_item)
return self._format_report(price_changes, availability_changes, new_items, removed_items)
# </ACTION>
# <HELPER name="_format_report">
def _format_report(self, price_changes, availability_changes, new_items, removed_items) -> str:
"""Форматирует найденные изменения в красивый HTML-отчет."""
if not any([price_changes, availability_changes, new_items, removed_items]):
self.logger.info("Изменений не найдено.")
return ""
report_parts = ["<b>📈 Отчет об изменениях на сайте</b>\n"]
if price_changes:
report_parts.append("\n<b>💰 Изменились цены:</b>")
for item, old_price in price_changes:
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>: {old_price} ₽ → <b>{item['price']} ₽</b>")
if availability_changes:
report_parts.append("\n<b>📦 Изменилось наличие:</b>")
for item in availability_changes:
status = "В наличии" if item['is_in_stock'] else "❌ Нет в наличии"
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>: <b>{status}</b>")
if new_items:
report_parts.append("\n<b>✨ Новые товары:</b>")
for item in new_items:
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a> - {item['price']}")
if removed_items:
report_parts.append("\n<b>🗑️ Удаленные товары:</b>")
for item in removed_items:
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>")
return "\n".join(report_parts)
# </HELPER>
# <COHERENCE_CHECK status="PASSED" />

View File

@@ -102,6 +102,13 @@ def init_database(db_path: Path, run_id: str):
db_path.parent.mkdir(parents=True, exist_ok=True) db_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(db_path) as con: with sqlite3.connect(db_path) as con:
cur = con.cursor() cur = con.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS parsing_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL UNIQUE,
start_time TIMESTAMP NOT NULL
)
""")
cur.execute(""" cur.execute("""
CREATE TABLE IF NOT EXISTS products ( CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -111,7 +118,8 @@ def init_database(db_path: Path, run_id: str):
price INTEGER NOT NULL, price INTEGER NOT NULL,
url TEXT, url TEXT,
is_in_stock BOOLEAN, is_in_stock BOOLEAN,
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
) )
""") """)
cur.execute(""" cur.execute("""
@@ -120,7 +128,8 @@ def init_database(db_path: Path, run_id: str):
run_id TEXT NOT NULL, run_id TEXT NOT NULL,
timestamp TEXT NOT NULL, timestamp TEXT NOT NULL,
level TEXT NOT NULL, level TEXT NOT NULL,
message TEXT NOT NULL message TEXT NOT NULL,
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
) )
""") """)
con.commit() con.commit()

View File

@@ -2,49 +2,101 @@
# <IMPORTS> # <IMPORTS>
import logging import logging
import os
import gzip
import shutil
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional from typing import Optional
from .database import DatabaseLogHandler, DatabaseManager from .database import DatabaseLogHandler, DatabaseManager
from .settings import settings from .settings import settings
# </IMPORTS> # </IMPORTS>
# <HELPER name="gzip_rotator">
class GzipRotator:
def __call__(self, source: str, dest: str) -> None:
try:
with open(source, 'rb') as f_in:
with gzip.open(dest, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(source)
except Exception as e:
print(f"Ошибка при сжатии лог-файла: {e}")
def namer(name: str) -> str:
return name + ".gz"
# </HELPER>
# <CONTRACT for="setup_logging"> # <CONTRACT for="setup_logging">
# description: "Настраивает логирование, опционально добавляя обработчик для записи в базу данных." # description: "Настраивает логирование с ротацией и сжатием, опционально добавляя обработчик для записи в базу данных."
# preconditions: # preconditions:
# - "run_id должен быть строкой." # - "run_id должен быть строкой."
# - "db_manager должен быть экземпляром DatabaseManager или None." # - "db_manager должен быть экземпляром DatabaseManager или None."
# postconditions: # postconditions:
# - "Базовая конфигурация логирования настроена." # - "Базовая конфигурация логирования настроена с ротацией и сжатием."
# - "Если log_to_db is True и db_manager предоставлен, добавляется обработчик для БД." # - "Если log_to_db is True и db_manager предоставлен, добавляется обработчик для БД."
# exceptions: # exceptions:
# - "Может возникнуть исключение при ошибке инициализации обработчика БД." # - "Может возникнуть исключение при ошибке инициализации обработчика БД."
# </CONTRACT> # </CONTRACT>
# <ACTION name="setup_logging"> # <ACTION name="setup_logging">
def setup_logging(run_id: str, db_manager: Optional[DatabaseManager] = None): def setup_logging(run_id: str, db_manager: Optional[DatabaseManager] = None):
"""Настраивает систему логирования проекта.""" """Настраивает систему логирования проекта с ротацией и сжатием."""
# <CORE_LOGIC> # <CORE_LOGIC>
log_format = '[%(asctime)s] [%(levelname)s] :: %(message)s' log_format = '[%(asctime)s] [%(levelname)s] :: %(message)s'
logging.basicConfig( handlers = []
level=logging.INFO,
format=log_format, # <ACTION name="create_log_directory">
datefmt='%Y-%m-%d %H:%M:%S', # Создаем директорию для логов, если она не существует.
force=True # Перезаписывает любую существующую конфигурацию os.makedirs(settings.log_dir, exist_ok=True)
# </ACTION>
# <ACTION name="configure_file_handler">
# Добавляем обработчик для записи логов в файл с ротацией и сжатием.
log_file_name = f"run_{run_id}.log"
log_file_path = settings.log_dir / log_file_name
# Используем RotatingFileHandler для управления размером лог-файлов.
# 10 MB per file, 5 backup files
file_handler = RotatingFileHandler(
log_file_path, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8'
) )
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter(log_format))
# <REFACTORING_TARGET>
# Добавляем сжатие для ротированных файлов
file_handler.rotator = GzipRotator()
file_handler.namer = namer
# </REFACTORING_TARGET>
handlers.append(file_handler)
# </ACTION>
# <DEPENDENCY name="settings.log_to_db" /> # <DEPENDENCY name="settings.log_to_db" />
if settings.log_to_db and db_manager: if settings.log_to_db and db_manager:
# <ERROR_HANDLER> # <ERROR_HANDLER>
try: try:
root_logger = logging.getLogger('')
db_handler = DatabaseLogHandler(db_manager, run_id) db_handler = DatabaseLogHandler(db_manager, run_id)
db_handler.setLevel(logging.DEBUG) db_handler.setLevel(logging.DEBUG)
db_handler.setFormatter(logging.Formatter(log_format)) db_handler.setFormatter(logging.Formatter(log_format))
root_logger.addHandler(db_handler) handlers.append(db_handler)
logging.info("Обработчик логов для записи в <20><>азу данных успешно добавлен.")
except Exception as e: except Exception as e:
logging.error(f"Не удалось инициализировать обработчик логов для БД: {e}") # Логирование еще не настроено, используем print для этой критической ошибки
print(f"Не удалось инициализировать обработчик логов для БД: {e}")
# </ERROR_HANDLER> # </ERROR_HANDLER>
logging.info("Система логирования инициализирована.") logging.basicConfig(
level=logging.DEBUG,
format=log_format,
datefmt='%Y-%m-%d %H:%M:%S',
handlers=handlers,
force=True # Перезаписывает любую существующую конфигурацию
)
if any(isinstance(h, DatabaseLogHandler) for h in handlers):
logging.info("Обработчик логов для записи в базу данных успешно добавлен.")
logging.info("Система логирования инициализирована с ротацией и сжатием.")
# </CORE_LOGIC> # </CORE_LOGIC>
# <COHERENCE_CHECK status="PASSED" /> # <COHERENCE_CHECK status="PASSED" />
# </ACTION> # </ACTION>

View File

@@ -6,10 +6,14 @@
# </DESIGN_NOTE> # </DESIGN_NOTE>
# <IMPORTS> # <IMPORTS>
import logging
from pydantic import BaseModel, Field, HttpUrl from pydantic import BaseModel, Field, HttpUrl
from pydantic.functional_validators import model_validator
from datetime import datetime from datetime import datetime
from typing import List from typing import List, Optional
import uuid import uuid
logger = logging.getLogger(__name__)
# </IMPORTS> # </IMPORTS>
# <MAIN_CONTRACT for="ProductVariant"> # <MAIN_CONTRACT for="ProductVariant">
@@ -19,8 +23,18 @@ import uuid
class ProductVariant(BaseModel): class ProductVariant(BaseModel):
# <STATE name="product_variant_fields"> # <STATE name="product_variant_fields">
name: str = Field(..., description="Название продукта.") name: str = Field(..., description="Название продукта.")
volume: str = Field(..., description="Объем или вариант продукта (например, '50мл', '10 капсул').") volume: Optional[str] = Field(None, description="Объем или вариант продукта (например, '50мл', '10 капсул'). Может быть пустым, если не применимо.")
price: int = Field(..., gt=0, description="Цена продукта в числовом формате, должна быть положительной.") price: int = Field(..., description="Цена продукта в числовом формате. Должна быть положительной, если товар в наличии, иначе может быть 0.")
is_in_stock: bool = Field(..., description="Наличие товара.")
@model_validator(mode='after')
def validate_price_based_on_stock(self) -> 'ProductVariant':
if not self.is_in_stock and self.price != 0:
logger.warning(f"[CONTRACT_VIOLATION] Product '{self.name}' (URL: {self.url}) is out of stock but has a non-zero price ({self.price}). Setting price to 0.")
self.price = 0
elif self.is_in_stock and self.price <= 0:
raise ValueError("Price must be greater than 0 for in-stock products.")
return self
url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.") url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.")
is_in_stock: bool = Field(..., description="Наличие товара.") is_in_stock: bool = Field(..., description="Наличие товара.")
# </STATE> # </STATE>

View File

@@ -12,12 +12,7 @@ import pika
from pika.adapters.blocking_connection import BlockingChannel from pika.adapters.blocking_connection import BlockingChannel
from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed
from .settings import ( from .settings import settings
RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD,
RABBITMQ_VIRTUAL_HOST, RABBITMQ_CONNECTION_TIMEOUT, RABBITMQ_HEARTBEAT,
RABBITMQ_BLOCKED_CONNECTION_TIMEOUT, RABBITMQ_PRODUCTS_QUEUE,
RABBITMQ_LOGS_QUEUE, RABBITMQ_EXCHANGE
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -41,17 +36,17 @@ class RabbitMQConnection:
Returns: Returns:
pika.ConnectionParameters: Параметры подключения pika.ConnectionParameters: Параметры подключения
""" """
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) credentials = pika.PlainCredentials(settings.rabbitmq_user, settings.rabbitmq_password)
return pika.ConnectionParameters( return pika.ConnectionParameters(
host=RABBITMQ_HOST, host=settings.rabbitmq_host,
port=RABBITMQ_PORT, port=settings.rabbitmq_port,
virtual_host=RABBITMQ_VIRTUAL_HOST, virtual_host=settings.rabbitmq_vhost,
credentials=credentials, credentials=credentials,
connection_attempts=3, connection_attempts=3,
retry_delay=5, retry_delay=5,
socket_timeout=RABBITMQ_CONNECTION_TIMEOUT, socket_timeout=30, # Hardcoded for now
heartbeat=RABBITMQ_HEARTBEAT, heartbeat=600, # Hardcoded for now
blocked_connection_timeout=RABBITMQ_BLOCKED_CONNECTION_TIMEOUT blocked_connection_timeout=300 # Hardcoded for now
) )
def connect(self) -> bool: def connect(self) -> bool:

View File

@@ -67,22 +67,40 @@ class Settings(BaseModel):
# <CONFIG name="logging_settings"> # <CONFIG name="logging_settings">
log_to_db: bool = Field(default=os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true') log_to_db: bool = Field(default=os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true')
log_dir: Path = Field(default=BASE_DIR / "logs", description="Директория для сохранения логов")
# </CONFIG> # </CONFIG>
# <CONFIG name="performance_settings"> # <CONFIG name="performance_settings">
request_timeout: int = Field(default=int(os.getenv('PARSER_TIMEOUT', 30))) request_timeout: int = Field(default=int(os.getenv('PARSER_TIMEOUT', 30)))
delay_between_requests: float = Field(default=float(os.getenv('PARSER_DELAY', 1.0))) delay_between_requests: float = Field(default=float(os.getenv('PARSER_DELAY', 1.0)))
max_retries: int = Field(default=int(os.getenv('PARSER_RETRIES', 3))) max_retries: int = Field(default=int(os.getenv('PARSER_RETRIES', 3)))
num_parser_threads: int = Field(default=int(os.getenv('PARSER_THREADS', 5)), description="Количество потоков для парсинга")
# </CONFIG>
# <CONFIG name="telegram_settings">
telegram_bot_token: str = Field(default=os.getenv('TELEGRAM_BOT_TOKEN', ''), description="Токен для Telegram бота")
telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений")
# </CONFIG>
# <CONFIG name="rabbitmq_settings">
rabbitmq_host: str = Field(default=os.getenv('RABBITMQ_HOST', 'localhost'))
rabbitmq_port: int = Field(default=int(os.getenv('RABBITMQ_PORT', 5672)))
rabbitmq_user: str = Field(default=os.getenv('RABBITMQ_USERNAME', 'guest'))
rabbitmq_password: str = Field(default=os.getenv('RABBITMQ_PASSWORD', 'guest'))
rabbitmq_vhost: str = Field(default=os.getenv('RABBITMQ_VIRTUAL_HOST', '/'))
rabbitmq_products_queue: str = Field(default=os.getenv('RABBITMQ_PRODUCTS_QUEUE', 'price_parser.products'))
rabbitmq_logs_queue: str = Field(default=os.getenv('RABBITMQ_LOGS_QUEUE', 'price_parser.logs'))
rabbitmq_exchange: str = Field(default=os.getenv('RABBITMQ_EXCHANGE', 'price_parser.exchange'))
# </CONFIG> # </CONFIG>
# <CONFIG name="selectors_config_instance"> # <CONFIG name="selectors_config_instance">
selectors: ScraperSelectors = ScraperSelectors( selectors: ScraperSelectors = ScraperSelectors(
CATALOG_PRODUCT_LINK='.product-card h4 a.product-link', CATALOG_PRODUCT_LINK='.product-card h4 a.product-link',
VARIANT_LIST_ITEM='.product-version-select li', VARIANT_LIST_ITEM='.product-version-select li, .product-variants-list .variant-item',
PRODUCT_PAGE_NAME='h1.product-h1', PRODUCT_PAGE_NAME='h1.product-h1',
ACTIVE_VOLUME='.product-version-select li.active', ACTIVE_VOLUME='.product-version-select li.active, .variant-item.active',
PRICE_BLOCK='.product-sale-box .price span', PRICE_BLOCK='.price-value, .product-price .price, .product-sale-box .price span',
PRODUCT_UNAVAILABLE='.product-unavailable', PRODUCT_UNAVAILABLE='.product-unavailable, .out-of-stock, .unavailable, .stock.status-0',
) )
# </CONFIG> # </CONFIG>

View File

@@ -7,8 +7,10 @@
# <IMPORTS> # <IMPORTS>
import sys import sys
import logging import logging
from datetime import datetime
from orchestrator import AppOrchestrator from orchestrator import AppOrchestrator
from core.settings import settings from core.settings import settings
from core.logging_config import setup_logging
# </IMPORTS> # </IMPORTS>
# <CONTRACT for="main"> # <CONTRACT for="main">
@@ -20,17 +22,15 @@ from core.settings import settings
# exceptions: # exceptions:
# - "ValueError: при ошибках в конфигурации." # - "ValueError: при ошибках в конфигурации."
# - "KeyboardInterrupt: при прерывании пользователем." # - "KeyboardInterrupt: при прерывании пользователем."
# - "Exception: при любых других критических ошибках." # - "Exception: при любых других критических ошибка<EFBFBD><EFBFBD>."
# </CONTRACT> # </CONTRACT>
# <ENTRYPOINT name="main"> # <ENTRYPOINT name="main">
def main(): def main():
"""Точка входа в приложение.""" """Точка входа в приложение."""
# <INIT> # <INIT>
logging.basicConfig( run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
level=logging.INFO, setup_logging(run_id=run_id)
format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# </INIT> # </INIT>
@@ -50,6 +50,7 @@ def main():
logger.info(f" • Таймаут запросов: {settings.request_timeout}с") logger.info(f" • Таймаут запросов: {settings.request_timeout}с")
logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с") logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с")
logger.info(f" • Максимум попыток: {settings.max_retries}") logger.info(f" • Максимум попыток: {settings.max_retries}")
logger.info(f" • Количество потоков: {settings.num_parser_threads}")
# <DEPENDENCY name="settings.validate_configuration" /> # <DEPENDENCY name="settings.validate_configuration" />
config_errors = settings.validate_configuration() config_errors = settings.validate_configuration()

View File

@@ -12,13 +12,17 @@ import requests
from datetime import datetime from datetime import datetime
from typing import List, Optional from typing import List, Optional
from contextlib import contextmanager from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed
from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT
from core.models import ProductVariant from core.models import ProductVariant
from core.database import init_database, save_data_to_db, DatabaseManager from core.database import DatabaseManager, init_database, save_data_to_db
from core.logging_config import setup_logging from core.rabbitmq import RabbitMQExporter
from scraper.engine import Scraper from scraper.engine import Scraper
from utils.exporters import save_data_to_csv, export_data_to_rabbitmq, export_logs_to_rabbitmq, validate_rabbitmq_connection from utils.exporters import save_data_to_csv
from core.logging_config import setup_logging
from datetime import datetime
import sqlite3
# </IMPORTS> # </IMPORTS>
# <MAIN_CONTRACT for="AppOrchestrator"> # <MAIN_CONTRACT for="AppOrchestrator">
@@ -48,19 +52,30 @@ class AppOrchestrator:
# <DEPENDENCY name="Scraper" relationship="composition"> # <DEPENDENCY name="Scraper" relationship="composition">
# Оркестратор владеет скрейпером и управляет его жизненным циклом. # Оркестратор владеет скрейпером и управляет его жизненным циклом.
self.scraper = Scraper( self.scraper = Scraper(
session=self.http_session,
selectors=self.settings.selectors, selectors=self.settings.selectors,
base_url=self.settings.base_url base_url=self.settings.base_url
) )
# </DEPENDENCY> # </DEPENDENCY>
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
# </INIT> self._setup()
def _register_run(self):
"""Регистрирует текущий запуск в базе данных."""
if not self.db_manager:
return
try:
with self.db_manager as conn:
conn.execute(
"INSERT INTO parsing_runs (run_id, start_time) VALUES (?, ?)",
(self.run_id, datetime.now())
)
conn.commit()
self.logger.info(f"Запуск {self.run_id} зарегистрирован в БД.")
except sqlite3.Error as e:
self.logger.error(f"Не удалось зарегистрировать запуск {self.run_id} в БД: {e}")
raise
# <CONTRACT for="_error_context">
# description: "Контекстный менеджер для централизованной обработки ошибок в ключевых операциях."
# </CONTRACT>
# <HELPER name="_error_context">
@contextmanager @contextmanager
def _error_context(self, operation: str): def _error_context(self, operation: str):
"""Контекстный менеджер для обработки ошибок с детальной диагностикой.""" """Контекстный менеджер для обработки ошибок с детальной диагностикой."""
@@ -70,7 +85,6 @@ class AppOrchestrator:
self.logger.error(f"[{operation.upper()}] Ошибка в операции '{operation}': {e}", exc_info=True) self.logger.error(f"[{operation.upper()}] Ошибка в операции '{operation}': {e}", exc_info=True)
self._log_error_details(operation, e) self._log_error_details(operation, e)
raise raise
# </HELPER>
# <CONTRACT for="_log_error_details"> # <CONTRACT for="_log_error_details">
# description: "Логирует расширенную информацию об ошибке для упрощения диагностики." # description: "Логирует расширенную информацию об ошибке для упрощения диагностики."
@@ -109,10 +123,60 @@ class AppOrchestrator:
setup_logging(self.run_id, self.db_manager) setup_logging(self.run_id, self.db_manager)
if ENABLE_RABBITMQ_EXPORT: if ENABLE_RABBITMQ_EXPORT:
if validate_rabbitmq_connection(): self.rabbitmq_exporter = RabbitMQExporter()
if self.rabbitmq_exporter.connection.connect():
self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно") self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно")
else: else:
self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен") self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен")
self.rabbitmq_exporter = None
self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="_log_error_details">
# description: "Логирует расширенную информацию об ошибке для упрощения диагностики."
# </CONTRACT>
# <HELPER name="_log_error_details">
def _log_error_details(self, operation: str, error: Exception):
"""Логирует детальную информацию об ошибке."""
error_info = {
'operation': operation,
'error_type': type(error).__name__,
'error_message': str(error),
'run_id': self.run_id,
'timestamp': datetime.now().isoformat(),
'stats': self.stats.copy()
}
self.logger.error(f"[HELPER:_log_error_details] Детали ошибки: {error_info}")
# </HELPER>
# <CONTRACT for="_setup">
# description: "Шаг 0: Инициализация всех систем (БД, логирование, RabbitMQ)."
# </CONTRACT>
# <ACTION name="_setup">
def _setup(self):
"""Инициализация всех систем перед началом парсинга."""
with self._error_context("setup"):
# <CORE_LOGIC>
self.stats['start_time'] = datetime.now()
self.logger.info(f"[ACTION:_setup] Запуск инициализации систем. Run ID: {self.run_id}")
if self.settings.save_to_db or self.settings.log_to_db:
self.settings.output_dir.mkdir(parents=True, exist_ok=True)
self.db_manager = DatabaseManager(self.settings.db_path)
init_database(self.db_manager.db_path, self.run_id)
# <DEPENDENCY name="setup_logging" />
setup_logging(self.run_id, self.db_manager)
if ENABLE_RABBITMQ_EXPORT:
self.rabbitmq_exporter = RabbitMQExporter()
if self.rabbitmq_exporter.connection.connect():
self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно")
else:
self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен")
self.rabbitmq_exporter = None
self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}") self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
# </CORE_LOGIC> # </CORE_LOGIC>
@@ -155,41 +219,56 @@ class AppOrchestrator:
# </CORE_LOGIC> # </CORE_LOGIC>
# </ACTION> # </ACTION>
# <CONTRACT for="_scrape_single_url">
# description: "Вспомогательный метод для парсинга одного URL в отдельном потоке."
# postconditions: "Возвращает ProductVariant или None в случае ошибки."
# </CONTRACT>
# <HELPER name="_scrape_single_url">
def _scrape_single_url(self, url: str) -> Optional[ProductVariant]:
try:
self.logger.info(f"[ACTION:_scrape_single_url] Парсинг URL: {url.split('/')[-1]}")
variant_data = self.scraper.scrape_variant_page(
variant_url=url,
run_id=self.run_id
)
if variant_data:
self.logger.debug(f"[ACTION:_scrape_single_url] Успешно спарсен URL: {url.split('/')[-1]}")
return variant_data
else:
self.logger.warning(f"[ACTION:_scrape_single_url] Данные не получены для URL: {url.split('/')[-1]}")
return None
except Exception as e:
self.logger.error(f"[ACTION:_scrape_single_url] Ошибка при парсинге URL ({url}): {e}")
return None
# </HELPER>
# <CONTRACT for="_scrape_data"> # <CONTRACT for="_scrape_data">
# description: "Шаг 3: Итеративный парсинг данных по списку URL." # description: "Шаг 3: Параллельный парсинг данных по списку URL с использованием ThreadPoolExecutor."
# </CONTRACT> # </CONTRACT>
# <ACTION name="_scrape_data"> # <ACTION name="_scrape_data">
def _scrape_data(self, urls: List[str]): def _scrape_data(self, urls: List[str]):
"""Итеративный парсинг данных.""" """Параллельный парсинг данных."""
with self._error_context("scrape_data"): with self._error_context("scrape_data"):
# <CORE_LOGIC> # <CORE_LOGIC>
total_to_scrape = len(urls) total_to_scrape = len(urls)
self.logger.info(f"[ACTION:_scrape_data] Начало парсинга {total_to_scrape} URL вариантов.") self.logger.info(f"[ACTION:_scrape_data] Начало параллельного парсинга {total_to_scrape} URL вариантов с {self.settings.num_parser_threads} потоками.")
for i, url in enumerate(urls): with ThreadPoolExecutor(max_workers=self.settings.num_parser_threads) as executor:
# <ERROR_HANDLER for="single_url_scrape"> future_to_url = {executor.submit(self._scrape_single_url, url): url for url in urls}
try:
self.logger.info(f"[ACTION:_scrape_data] Парсинг URL {i+1}/{total_to_scrape}: {url.split('/')[-1]}") for i, future in enumerate(as_completed(future_to_url)):
time.sleep(1) url = future_to_url[future]
try:
# <DEPENDENCY name="scraper.scrape_variant_page" /> variant_data = future.result()
variant_data = self.scraper.scrape_variant_page( if variant_data:
variant_url=url, self.final_data.append(variant_data)
run_id=self.run_id self.stats['successful_parses'] += 1
) else:
self.stats['failed_parses'] += 1
if variant_data: except Exception as e:
self.final_data.append(variant_data)
self.stats['successful_parses'] += 1
else:
self.stats['failed_parses'] += 1 self.stats['failed_parses'] += 1
self.logger.error(f"[ACTION:_scrape_data] Необработанная ошибка в потоке для URL ({url}): {e}")
except Exception as e:
self.stats['failed_parses'] += 1
self.logger.error(f"[ACTION:_scrape_data] Ошибка при парсинге URL {i+1}/{total_to_scrape} ({url}): {e}")
continue
# </ERROR_HANDLER>
self.logger.info(f"[ACTION:_scrape_data] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.") self.logger.info(f"[ACTION:_scrape_data] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.")
self.logger.info(f"[STATS][ACTION:_scrape_data] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}") self.logger.info(f"[STATS][ACTION:_scrape_data] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}")
# </CORE_LOGIC> # </CORE_LOGIC>
@@ -238,12 +317,10 @@ class AppOrchestrator:
except Exception as e: except Exception as e:
self.logger.error(f"[ACTION:_save_results] Ошибка при сохранении в БД: {e}") self.logger.error(f"[ACTION:_save_results] Ошибка при сохранении в БД: {e}")
if ENABLE_RABBITMQ_EXPORT: if ENABLE_RABBITMQ_EXPORT and self.rabbitmq_exporter:
# <DEPENDENCY name="export_data_to_rabbitmq" />
# ... (logic remains the same)
try: try:
data_to_rabbitmq = [p.model_dump() for p in self.final_data] data_to_rabbitmq = [p.model_dump() for p in self.final_data]
if export_data_to_rabbitmq(data_to_rabbitmq, self.run_id, self.run_id): if self.rabbitmq_exporter.export_products(data_to_rabbitmq, self.run_id):
self.logger.info("[ACTION:_save_results] Данные успешно экспортированы в RabbitMQ.") self.logger.info("[ACTION:_save_results] Данные успешно экспортированы в RabbitMQ.")
else: else:
self.logger.error("[ACTION:_save_results] Не удалось экспортировать данные в RabbitMQ.") self.logger.error("[ACTION:_save_results] Не удалось экспортировать данные в RabbitMQ.")
@@ -272,6 +349,10 @@ class AppOrchestrator:
if self.db_manager: if self.db_manager:
self.db_manager.close() self.db_manager.close()
self.logger.debug("[ACTION:_cleanup] Соединение с базой данных закрыто.") self.logger.debug("[ACTION:_cleanup] Соединение с базой данных закрыто.")
if self.rabbitmq_exporter:
self.rabbitmq_exporter.close()
self.logger.debug("[ACTION:_cleanup] Соединение с RabbitMQ закрыто.")
if duration: if duration:
self.logger.info(f"[STATS][ACTION:_cleanup] Время выполнения: {duration.total_seconds():.2f} секунд") self.logger.info(f"[STATS][ACTION:_cleanup] Время выполнения: {duration.total_seconds():.2f} секунд")
@@ -289,8 +370,12 @@ class AppOrchestrator:
# </CONTRACT> # </CONTRACT>
# <ENTRYPOINT name="run"> # <ENTRYPOINT name="run">
def run(self): def run(self):
"""Основной метод, запускающий весь процесс парсинга.""" """Основной метод, запускающий все этапы работы оркестратора."""
self.logger.info("="*50) self.logger.info(f"Запуск оркестратора. Run ID: {self.run_id}")
if self.db_manager:
self._register_run()
# ... остальной код ...
self.logger.info("[ENTRYPOINT:run] Запуск главного процесса оркестратора.") self.logger.info("[ENTRYPOINT:run] Запуск главного процесса оркестратора.")
self.logger.info("="*50) self.logger.info("="*50)

View File

@@ -26,25 +26,20 @@ from core.settings import ScraperSelectors
# </MAIN_CONTRACT> # </MAIN_CONTRACT>
class Scraper: class Scraper:
# <INIT name="__init__"> # <INIT name="__init__">
def __init__(self, session: requests.Session, selectors: ScraperSelectors, base_url: str): def __init__(self, selectors: ScraperSelectors, base_url: str):
"""Инициализирует скрейпер с зависимостями: сессия, селекторы и базовый URL.""" """Инициализирует скрейпер с зависимостями: сессия, селекторы и базовый URL."""
# <STATE name="initial_state"> # <STATE name="initial_state">
self.session = session
self.selectors = selectors self.selectors = selectors
self.base_url = base_url self.base_url = base_url
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
# </STATE> # </STATE>
# <ACTION name="setup_retry_strategy_on_init">
self._setup_retry_strategy()
# </ACTION>
# </INIT> # </INIT>
# <CONTRACT for="_setup_retry_strategy"> # <CONTRACT for="_setup_retry_strategy">
# description: "Настраивает retry-логику для HTTP-адаптера сессии." # description: "Настраивает retry-логику для HTTP-адаптера сессии."
# </CONTRACT> # </CONTRACT>
# <HELPER name="_setup_retry_strategy"> # <HELPER name="_setup_retry_strategy">
def _setup_retry_strategy(self): def _setup_retry_strategy(self, session: requests.Session):
"""Настраивает retry стратегию для HTTP запросов.""" """Настраивает retry стратегию для HTTP запросов."""
# <CORE_LOGIC> # <CORE_LOGIC>
retry_strategy = Retry( retry_strategy = Retry(
@@ -54,12 +49,27 @@ class Scraper:
allowed_methods=["HEAD", "GET", "OPTIONS"] allowed_methods=["HEAD", "GET", "OPTIONS"]
) )
adapter = HTTPAdapter(max_retries=retry_strategy) adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter) session.mount("http://", adapter)
self.session.mount("https://", adapter) session.mount("https://", adapter)
self.logger.debug("[HELPER:_setup_retry_strategy] Retry стратегия настроена для HTTP запросов.") self.logger.debug("[HELPER:_setup_retry_strategy] Retry стратегия настроена для HTTP запросов.")
# </CORE_LOGIC> # </CORE_LOGIC>
# </HELPER> # </HELPER>
# <CONTRACT for="_find_element_with_multiple_selectors">
# description: "Пытается найти элемент, используя список CSS-селекторов."
# postconditions: "Возвращает найденный элемент BeautifulSoup или None."
# </CONTRACT>
# <HELPER name="_find_element_with_multiple_selectors">
def _find_element_with_multiple_selectors(self, soup: BeautifulSoup, selectors: List[str], log_prefix: str, element_name: str):
for selector in selectors:
element = soup.select_one(selector)
if element:
self.logger.debug(f"{log_prefix} [FOUND_ELEMENT] Элемент '{element_name}' найден с селектором: '{selector}'")
return element
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Элемент '{element_name}' не найден ни с одним из селекторов: {selectors}")
return None
# </HELPER>
# <CONTRACT for="_clean_price"> # <CONTRACT for="_clean_price">
# description: "Очищает строковое представление цены, оставляя только цифры." # description: "Очищает строковое представление цены, оставляя только цифры."
# postconditions: "Возвращает целое число или 0 в случае ошибки." # postconditions: "Возвращает целое число или 0 в случае ошибки."
@@ -93,7 +103,7 @@ class Scraper:
# postconditions: "Возвращает текстовое содержимое страницы или None в случае любой ошибки." # postconditions: "Возвращает текстовое содержимое страницы или None в случае любой ошибки."
# </CONTRACT> # </CONTRACT>
# <HELPER name="_fetch_page"> # <HELPER name="_fetch_page">
def _fetch_page(self, url: str, request_id: str) -> Optional[str]: def _fetch_page(self, session: requests.Session, url: str, request_id: str) -> Optional[str]:
"""Приватный метод для скачивания HTML-содержимого страницы.""" """Приватный метод для скачивания HTML-содержимого страницы."""
log_prefix = f"[HELPER:_fetch_page(id={request_id})]" log_prefix = f"[HELPER:_fetch_page(id={request_id})]"
self.logger.debug(f"{log_prefix} Запрос к URL: {url}") self.logger.debug(f"{log_prefix} Запрос к URL: {url}")
@@ -101,7 +111,7 @@ class Scraper:
# <ERROR_HANDLER for="network_requests"> # <ERROR_HANDLER for="network_requests">
try: try:
# <CORE_LOGIC> # <CORE_LOGIC>
response = self.session.get(url, timeout=30) response = session.get(url, timeout=30)
response.raise_for_status() response.raise_for_status()
if not response.text.strip(): if not response.text.strip():
@@ -119,7 +129,7 @@ class Scraper:
self.logger.error(f"{log_prefix} [TIMEOUT] Превышено время ожидания для {url}") self.logger.error(f"{log_prefix} [TIMEOUT] Превышено время ожидания для {url}")
return None return None
except requests.exceptions.ConnectionError as e: except requests.exceptions.ConnectionError as e:
self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения для {url}: {e}") self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения дл<EFBFBD><EFBFBD> {url}: {e}")
return None return None
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
self.logger.error(f"{log_prefix} [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}") self.logger.error(f"{log_prefix} [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}")
@@ -144,7 +154,9 @@ class Scraper:
log_prefix = f"[ACTION:get_base_product_urls(id={run_id})]" log_prefix = f"[ACTION:get_base_product_urls(id={run_id})]"
self.logger.info(f"{log_prefix} Начало сбора базовых URL с: {catalog_url}") self.logger.info(f"{log_prefix} Начало сбора базовых URL с: {catalog_url}")
html = self._fetch_page(catalog_url, f"get_base_urls(id={run_id})") with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, catalog_url, f"get_base_urls(id={run_id})")
if not html: if not html:
self.logger.error(f"{log_prefix} [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.") self.logger.error(f"{log_prefix} [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.")
return [] return []
@@ -196,7 +208,9 @@ class Scraper:
for i, base_url in enumerate(base_product_urls): for i, base_url in enumerate(base_product_urls):
self.logger.info(f"{log_prefix} Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}") self.logger.info(f"{log_prefix} Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}")
html = self._fetch_page(base_url, f"get_variant_urls(id={run_id})-{i+1}") with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, base_url, f"get_variant_urls(id={run_id})-{i+1}")
if not html: if not html:
self.logger.warning(f"{log_prefix} Пропуск базового URL из-за ошибки загрузки: {base_url}") self.logger.warning(f"{log_prefix} Пропуск базового URL из-за ошибки загрузки: {base_url}")
continue continue
@@ -224,9 +238,7 @@ class Scraper:
all_variant_urls.append(base_url) all_variant_urls.append(base_url)
# </FALLBACK> # </FALLBACK>
time.sleep(0.5) self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
return all_variant_urls return all_variant_urls
# </CORE_LOGIC> # </CORE_LOGIC>
# </ACTION> # </ACTION>
@@ -240,9 +252,11 @@ class Scraper:
def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]: def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]:
"""Парсит страницу одного варианта и возвращает Pydantic-модель.""" """Парсит страницу одного варианта и возвращает Pydantic-модель."""
log_prefix = f"[ACTION:scrape_variant_page(id={run_id}, url={variant_url.split('/')[-1]})]" log_prefix = f"[ACTION:scrape_variant_page(id={run_id}, url={variant_url.split('/')[-1]})]"
self.logger.info(f"{log_prefix} Начало парсинга страниц<EFBFBD><EFBFBD> варианта.") self.logger.info(f"{log_prefix} Начало парсинга страницы варианта.")
html = self._fetch_page(variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})") with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})")
if not html: if not html:
self.logger.warning(f"{log_prefix} Не удалось получить HTML страницы варианта, пропуск парсинга.") self.logger.warning(f"{log_prefix} Не удалось получить HTML страницы варианта, пропуск парсинга.")
return None return None
@@ -251,10 +265,10 @@ class Scraper:
try: try:
soup = BeautifulSoup(html, 'html.parser') soup = BeautifulSoup(html, 'html.parser')
name_el = soup.select_one(self.selectors.product_page_name) name_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_page_name, 'h1.product-title', 'h2.product-name'], log_prefix, 'имени продукта')
price_el = soup.select_one(self.selectors.price_block) price_el = self._find_element_with_multiple_selectors(soup, [self.selectors.price_block, 'span.price', 'div.price'], log_prefix, 'цены')
volume_el = soup.select_one(self.selectors.active_volume) volume_el = self._find_element_with_multiple_selectors(soup, [self.selectors.active_volume, '.product-options .active'], log_prefix, 'объема')
unavailable_el = soup.select_one(self.selectors.product_unavailable) unavailable_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_unavailable, 'div.out-of-stock', 'span.unavailable'], log_prefix, 'отсутствия в наличии')
# <PRECONDITION> # <PRECONDITION>
# Товар должен иметь имя. Цена или статус "нет в наличии" должны присутствовать. # Товар должен иметь имя. Цена или статус "нет в наличии" должны присутствовать.
@@ -262,7 +276,7 @@ class Scraper:
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}") self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}")
return None return None
if not price_el and not unavailable_el: if not price_el and not unavailable_el:
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличии.") self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличи<EFBFBD><EFBFBD>.")
return None return None
# </PRECONDITION> # </PRECONDITION>
@@ -293,6 +307,7 @@ class Scraper:
# <POSTCONDITION> # <POSTCONDITION>
# <CONTRACT_VALIDATOR by="Pydantic"> # <CONTRACT_VALIDATOR by="Pydantic">
try: try:
self.logger.debug(f"{log_prefix} [DEBUG_SCRAPE_DATA] Name: '{name}', Price: {price}, URL: '{variant_url}', IsInStock: {is_in_stock}")
product = ProductVariant(name=name, volume=volume, price=price, url=variant_url, is_in_stock=is_in_stock) product = ProductVariant(name=name, volume=volume, price=price, url=variant_url, is_in_stock=is_in_stock)
self.logger.debug(f"{log_prefix} [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | InStock: {product.is_in_stock} | Price: '{product.price}'") self.logger.debug(f"{log_prefix} [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | InStock: {product.is_in_stock} | Price: '{product.price}'")
return product return product
@@ -311,4 +326,4 @@ class Scraper:
# </FALLBACK> # </FALLBACK>
# </CORE_LOGIC> # </CORE_LOGIC>
# </ACTION> # </ACTION>
# <COHERENCE_CHECK status="PASSED" /> # <COHERENCE_CHECK status="PASSED" />

View File

@@ -0,0 +1,72 @@
# <MODULE name="utils.telegram_sender" semantics="telegram_notifications" />
# <IMPORTS>
import logging
import asyncio
from telegram import Bot
from telegram.error import TelegramError
from src.core.settings import settings
# </IMPORTS>
# <MAIN_CONTRACT for="TelegramSender">
# description: "Отправляет сообщения в Telegram, используя токен и ID чата из настроек."
# invariant: "Токен и ID чата должны быть заданы в конфигурации."
# </MAIN_CONTRACT>
class TelegramSender:
# <INIT>
def __init__(self):
if not settings.telegram_bot_token or not settings.telegram_chat_id:
raise ValueError("TELEGRAM_BOT_TOKEN и TELEGRAM_CHAT_ID должны быть установлены в .env")
self.bot = Bot(token=settings.telegram_bot_token)
self.chat_id = settings.telegram_chat_id
self.logger = logging.getLogger(__name__)
# </INIT>
# <CONTRACT for="send_message">
# description: "Асинхронно отправляет текстовое сообщение <20><> Telegram."
# preconditions:
# - "Сообщение (message) должно быть непустой строкой."
# postconditions:
# - "Сообщение отправлено в чат."
# exceptions:
# - "TelegramError: при ошибках API Telegram."
# - "Exception: при других ошибках."
# </CONTRACT>
# <ACTION name="send_message">
async def send_message(self, message: str):
"""Асинхронно отправляет сообщение в Telegram."""
# <CORE_LOGIC>
try:
self.logger.info(f"Отправка сообщения в Telegram (чат ID: {self.chat_id})...")
await self.bot.send_message(
chat_id=self.chat_id,
text=message,
parse_mode='HTML'
)
self.logger.info("Сообщение в Telegram успешно отправлено.")
except TelegramError as e:
self.logger.error(f"Ошибка отправки сообщения в Telegram: {e}")
raise
except Exception as e:
self.logger.error(f"Непредвиденная ошибка при отправке сообщения в Telegram: {e}")
raise
# </CORE_LOGIC>
# </ACTION>
# <HELPER name="send_telegram_notification">
# description: "Удобная обертка для инициализации и отправки сообщения."
# </HELPER>
async def send_telegram_notification(message: str):
"""
Инициализирует TelegramSender и отправляет сообщение.
Является удобной оберткой для вызова из других частей приложения.
"""
try:
sender = TelegramSender()
await sender.send_message(message)
except ValueError as e:
logging.error(f"Ошибка инициализации TelegramSender: {e}")
except Exception as e:
logging.error(f"Не удалось отправить уведомление в Telegram: {e}")
# <COHERENCE_CHECK status="PASSED" />