Compare commits
11 Commits
840e2c4d6a
...
gzip-loggi
| Author | SHA1 | Date | |
|---|---|---|---|
| abcbb08134 | |||
| ae6d81f6f2 | |||
| 69e90bd4f2 | |||
| c59dff3397 | |||
| 5742d474fd | |||
| 40317aa2e7 | |||
| 06cfca4cd1 | |||
| cf52a7c0ad | |||
| ae9c61facc | |||
| 0259289ee9 | |||
| fe1a022609 |
5
.gitignore
vendored
5
.gitignore
vendored
@@ -19,4 +19,7 @@ price_data_final/
|
||||
.vscode/
|
||||
|
||||
#backups
|
||||
*.bak
|
||||
*.bak
|
||||
|
||||
# Logs
|
||||
/logs/
|
||||
236
README.md
236
README.md
@@ -1,58 +1,55 @@
|
||||
# ANCHOR: Project_README
|
||||
# Семантика: Документация, описывающая проект, его структуру и способ использования.
|
||||
|
||||
# Парсер цен для ElixirPeptide
|
||||
# Сервис мониторинга цен ElixirPeptide (v3.0)
|
||||
|
||||
Это структурированное Python-приложение для парсинга каталога товаров с сайта `elixirpeptide.ru`, сбора информации о вариантах товаров и их ценах.
|
||||
Это распределенное Python-приложение для мониторинга каталога товаров с сайта `elixirpeptide.ru`. Сервис автоматически отслеживает изменения в ценах и наличии товаров, и отправляет отчеты в Telegram.
|
||||
|
||||
## 🚀 Новые возможности (v2.0)
|
||||
## 🚀 Архитектура (v3.0)
|
||||
|
||||
### ✅ Исправленные критические проблемы:
|
||||
- **Устранено дублирование кода** в `engine.py` и `database.py`
|
||||
- **Дополнены зависимости** в `requirements.txt` (pydantic, lxml, python-dotenv)
|
||||
- **Улучшена обработка ошибок** с детальной диагностикой и retry механизмом
|
||||
- **Добавлена валидация данных** на всех уровнях приложения
|
||||
Проект перешел от простого парсера к полноценному сервису мониторинга с асинхронной обработкой данных.
|
||||
|
||||
### 🎯 Новые функции:
|
||||
- **Retry стратегия** для HTTP запросов с экспоненциальной задержкой
|
||||
- **Детальная статистика** выполнения парсинга
|
||||
- **Валидация конфигурации** при запуске
|
||||
- **Поддержка переменных окружения** через `.env` файл
|
||||
- **Graceful degradation** - продолжение работы при частичных сбоях
|
||||
- **Улучшенное логирование** с категоризацией ошибок
|
||||
- **Точка входа**: `monitoring_service.py` — главный скрипт, запускаемый по расписанию (cron).
|
||||
- **Оркестратор**: `src/orchestrator.py` управляет процессом парсинга.
|
||||
- **Анализатор**: `src/analyzer.py` сравнивает данные последнего и предпоследнего запусков, выявляя изменения.
|
||||
- **Уведомления**: `src/utils/telegram_sender.py` отправляет HTML-отчеты об изменениях в Telegram.
|
||||
- **Очередь сообщений**: Интеграция с **RabbitMQ** для асинхронного экспорта данных о товарах и логах.
|
||||
- **База данных**: **SQLite** используется для хранения истории парсинга и данных для анализа.
|
||||
|
||||
### 🔧 Улучшения производительности:
|
||||
- **Адаптивные таймауты** для HTTP запросов
|
||||
- **Проверка на блокировку/капчу** в ответах сервера
|
||||
- **Оптимизированная обработка данных** с пропуском некорректных записей
|
||||
## ✨ Ключевые возможности
|
||||
|
||||
- **Автоматический мониторинг**: Запуск по расписанию для непрерывного отслеживания.
|
||||
- **Отчеты об изменениях**: Уведомления в Telegram о новых, удаленных товарах, а также изменениях цен и наличия.
|
||||
- **Асинхронный экспорт**: Отправка данных в RabbitMQ для дальнейшей обработки другими системами.
|
||||
- **Надежность**: Механизм retry-запросов, детальное логирование и отказоустойчивость.
|
||||
- **Гибкая конфигурация**: Все параметры настраиваются через `.env` файл.
|
||||
|
||||
## Структура Проекта
|
||||
|
||||
Проект организован по принципу семантического разделения ответственности для удобства поддержки и дальнейшей разработки.
|
||||
|
||||
- `monitoring_service.py`: **[NEW]** Главная точка входа для запуска по расписанию.
|
||||
- `src/`: Основная директория с исходным кодом.
|
||||
- `config.py`: Все настройки (URL, селекторы, флаги сохранения).
|
||||
- `main.py`: Точка входа в приложение, оркестратор процесса.
|
||||
- `core/`: Пакет с ядром приложения.
|
||||
- `orchestrator.py`: **[NEW]** Управляет всем процессом парсинга.
|
||||
- `analyzer.py`: **[NEW]** Анализирует данные и формирует отчеты.
|
||||
- `main.py`: Точка входа для ручного запуска парсинга.
|
||||
- `core/`: Ядро приложения.
|
||||
- `settings.py`: **[ENHANCED]** Pydantic-модели для конфигурации (включая Telegram, RabbitMQ).
|
||||
- `database.py`: Логика работы с базой данных SQLite.
|
||||
- `logging_config.py`: Настройка системы логирования.
|
||||
- **`models.py`: [NEW FILE] Pydantic модели данных (ProductVariant, LogRecordModel).**
|
||||
- **`settings.py`: [ENHANCED] Конфигурация с валидацией и поддержкой .env**
|
||||
- `scraper/`: Пакет с логикой парсинга.
|
||||
- **`engine.py`: [ENHANCED] Класс Scraper с retry механизмом и улучшенной обработкой ошибок**
|
||||
- `utils/`: Пакет со вспомогательными утилитами.
|
||||
- **`exporters.py`: [ENHANCED] Функции для сохранения данных с валидацией**
|
||||
- `requirements.txt`: Список зависимостей проекта.
|
||||
- `price_data_final/`: Директория для хранения результатов (создается автоматически).
|
||||
- **`.env.example`: [NEW] Пример файла с переменными окружения**
|
||||
- `rabbitmq.py`: **[NEW]** Клиент для работы с RabbitMQ.
|
||||
- `scraper/`: Логика парсинга.
|
||||
- `engine.py`: Улучшенный движок парсера.
|
||||
- `utils/`: Вспомогательные утилиты.
|
||||
- `telegram_sender.py`: **[NEW]** Отправка уведомлений в Telegram.
|
||||
- `requirements.txt`: **[UPDATED]** Обновленный список зависимостей.
|
||||
- `.env.example`: Пример файла с переменными окружения.
|
||||
- `RABBITMQ_SETUP.md`: **[NEW]** Руководство по настройке RabbitMQ.
|
||||
|
||||
## Установка и Запуск
|
||||
|
||||
### 1. Клонирование и настройка окружения
|
||||
|
||||
```bash
|
||||
git clone <your-repo-url>
|
||||
cd peptide_parser_project
|
||||
git clone https://gitea.bebesh.ru/busya/peptide-parcer.git
|
||||
cd peptide-parcer
|
||||
|
||||
# Создание виртуального окружения
|
||||
python -m venv venv
|
||||
@@ -62,139 +59,68 @@ source venv/bin/activate # Для Windows: venv\Scripts\activate
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
### 2. Настройка конфигурации
|
||||
### 2. Настройка RabbitMQ (Опционально)
|
||||
|
||||
#### Вариант A: Через переменные окружения
|
||||
Для асинхронного экспорта данных требуется RabbitMQ. Рекомендуется запуск через Docker:
|
||||
```bash
|
||||
# Создайте файл .env на основе .env.example
|
||||
cp .env.example .env
|
||||
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
|
||||
```
|
||||
Подробности см. в `RABBITMQ_SETUP.md`.
|
||||
|
||||
# Отредактируйте .env файл под ваши нужды
|
||||
nano .env
|
||||
### 3. Настройка конфигурации
|
||||
|
||||
Скопируйте `.env.example` в `.env` и заполните необходимые параметры.
|
||||
|
||||
```bash
|
||||
cp .env.example .env
|
||||
nano .env # Отредактируйте файл
|
||||
```
|
||||
|
||||
#### Вариант B: Прямое редактирование настроек
|
||||
Отредактируйте `src/core/settings.py` для изменения настроек по умолчанию.
|
||||
**Ключевые переменные окружения:**
|
||||
- `TELEGRAM_BOT_TOKEN`: Токен вашего Telegram-бота.
|
||||
- `TELEGRAM_CHAT_ID`: ID чата, куда будут приходить отчеты.
|
||||
- `ENABLE_RABBITMQ_EXPORT`: `true` или `false` для включения/отключения экспорта в RabbitMQ.
|
||||
- `RABBITMQ_HOST`, `RABBITMQ_PORT` и т.д.: Настройки подключения к RabbitMQ.
|
||||
|
||||
### 3. Запуск парсера
|
||||
### 4. Запуск
|
||||
|
||||
#### Автоматический мониторинг (рекомендуется)
|
||||
Настройте запуск `monitoring_service.py` через `cron` или планировщик задач.
|
||||
```bash
|
||||
# Пример для cron, запуск каждый час
|
||||
# 0 * * * * /path/to/your/project/venv/bin/python /path/to/your/project/monitoring_service.py >> /path/to/your/project/logs/cron.log 2>&1
|
||||
|
||||
# Ручной запуск для проверки
|
||||
python monitoring_service.py
|
||||
```
|
||||
|
||||
#### Ручной запуск парсинга (без анализа и отчета)
|
||||
```bash
|
||||
python src/main.py
|
||||
```
|
||||
|
||||
## Конфигурация
|
||||
|
||||
### Переменные окружения (.env файл)
|
||||
|
||||
| Переменная | Описание | По умолчанию |
|
||||
|------------|----------|--------------|
|
||||
| `PARSER_BASE_URL` | Базовый URL сайта | `https://elixirpeptide.ru` |
|
||||
| `PARSER_CATALOG_URL` | URL каталога товаров | `https://elixirpeptide.ru/catalog/` |
|
||||
| `PARSER_SAVE_TO_CSV` | Сохранять в CSV | `true` |
|
||||
| `PARSER_SAVE_TO_DB` | Сохранять в базу данных | `true` |
|
||||
| `PARSER_LOG_TO_DB` | Логировать в базу данных | `true` |
|
||||
| `PARSER_TIMEOUT` | Таймаут HTTP запросов (сек) | `30` |
|
||||
| `PARSER_DELAY` | Задержка между запросами (сек) | `1.0` |
|
||||
| `PARSER_RETRIES` | Максимум попыток для запросов | `3` |
|
||||
|
||||
### Настройки производительности
|
||||
|
||||
- **Таймаут запросов**: 30 секунд (настраивается)
|
||||
- **Задержка между запросами**: 1 секунда (настраивается)
|
||||
- **Retry стратегия**: 3 попытки с экспоненциальной задержкой
|
||||
- **Graceful degradation**: Продолжение работы при ошибках отдельных запросов
|
||||
|
||||
## Результаты
|
||||
|
||||
### Файлы результатов
|
||||
### Отчеты в Telegram
|
||||
Сервис присылает в указанный чат отчет, если обнаруживает изменения:
|
||||
- **💰 Изменились цены**: список товаров с новой и старой ценой.
|
||||
- **📦 Изменилось наличие**: список товаров, которые появились или закончились.
|
||||
- **✨ Новые товары**: список добавленных товаров.
|
||||
- **🗑️ Удаленные товары**: список удаленных товаров.
|
||||
|
||||
- **CSV файл**: `price_data_final/prices_full_catalog_YYYY-MM-DD_HHMMSS.csv`
|
||||
- **База данных**: `price_data_final/parser_data.db` (SQLite)
|
||||
|
||||
### Структура данных
|
||||
|
||||
```csv
|
||||
name,volume,price
|
||||
"Peptide X","30ml",1500
|
||||
"Peptide Y","50ml",2500
|
||||
```
|
||||
|
||||
### Логирование
|
||||
|
||||
- **Консольные логи**: Детальная информация о процессе парсинга
|
||||
- **Логи в БД**: Если `PARSER_LOG_TO_DB=true`, все логи сохраняются в таблицу `logs`
|
||||
|
||||
## Обработка ошибок
|
||||
|
||||
### Типы обрабатываемых ошибок
|
||||
|
||||
1. **Сетевые ошибки**: Timeout, ConnectionError, HTTPError
|
||||
2. **Ошибки парсинга**: Отсутствующие элементы, некорректные данные
|
||||
3. **Ошибки файловой системы**: Права доступа, отсутствие директорий
|
||||
4. **Ошибки базы данных**: SQLite ошибки, проблемы с подключением
|
||||
|
||||
### Стратегия восстановления
|
||||
|
||||
- **Retry механизм**: Автоматические повторные попытки для сетевых ошибок
|
||||
- **Graceful degradation**: Пропуск проблемных записей с продолжением работы
|
||||
- **Детальная диагностика**: Подробные логи для анализа проблем
|
||||
|
||||
## Мониторинг и статистика
|
||||
|
||||
### Статистика выполнения
|
||||
|
||||
Приложение выводит детальную статистику:
|
||||
|
||||
```
|
||||
[FINAL_STATS] Время выполнения: 45.23 секунд
|
||||
[FINAL_STATS] Успешность: 95/100 (95.0%)
|
||||
[STATS] Успешно: 95, Ошибок: 5
|
||||
```
|
||||
|
||||
### Метрики
|
||||
|
||||
- Общее количество URL для парсинга
|
||||
- Количество успешно обработанных записей
|
||||
- Количество ошибок
|
||||
- Время выполнения
|
||||
- Процент успешности
|
||||
|
||||
## Разработка
|
||||
|
||||
### Архитектурные принципы
|
||||
|
||||
1. **Разделение ответственности**: Каждый модуль отвечает за свою область
|
||||
2. **Типизация**: Использование Pydantic для валидации данных
|
||||
3. **Обработка ошибок**: Graceful handling с детальной диагностикой
|
||||
4. **Конфигурируемость**: Гибкие настройки через переменные окружения
|
||||
5. **Логирование**: Структурированное логирование на всех уровнях
|
||||
|
||||
### Добавление новых функций
|
||||
|
||||
1. **Новые форматы экспорта**: Добавьте функции в `src/utils/exporters.py`
|
||||
2. **Новые селекторы**: Обновите `ScraperSelectors` в `src/core/settings.py`
|
||||
3. **Новые поля данных**: Расширьте модель `ProductVariant` в `src/core/models.py`
|
||||
### Данные
|
||||
- **База данных**: `price_data_final/parser_data.db` — хранит всю историю парсинга.
|
||||
- **CSV файл**: `price_data_final/prices_full_catalog_*.csv` — создается при ручном запуске.
|
||||
- **RabbitMQ**: Сообщения с данными о товарах и логах отправляются в очереди `price_parser.products` и `price_parser.logs`.
|
||||
|
||||
## Устранение неполадок
|
||||
|
||||
### Частые проблемы
|
||||
|
||||
1. **"Не удается подключиться к базовому URL"**
|
||||
- Проверьте интернет-соединение
|
||||
- Убедитесь, что сайт доступен
|
||||
- Проверьте настройки прокси
|
||||
|
||||
2. **"Не найдено ни одной ссылки на товар"**
|
||||
- Проверьте CSS селекторы в настройках
|
||||
- Убедитесь, что структура сайта не изменилась
|
||||
|
||||
3. **"Ошибка при сохранении в БД"**
|
||||
- Проверьте права доступа к директории
|
||||
- Убедитесь, что SQLite поддерживается
|
||||
|
||||
### Логи для диагностики
|
||||
|
||||
Все ошибки логируются с детальной информацией. Проверьте:
|
||||
- Консольные логи при запуске
|
||||
- Логи в базе данных (если включено)
|
||||
- Файлы результатов для проверки данных
|
||||
1. **Не приходят сообщения в Telegram**:
|
||||
- Проверьте правильность `TELEGRAM_BOT_TOKEN` и `TELEGRAM_CHAT_ID`.
|
||||
- Убедитесь, что бот добавлен в чат с нужными правами.
|
||||
2. **Ошибка подключения к RabbitMQ**:
|
||||
- Проверьте, что RabbitMQ запущен и доступен.
|
||||
- Убедитесь в корректности настроек в `.env`.
|
||||
3. **Ошибки парсинга**:
|
||||
- Проверьте доступность сайта `elixirpeptide.ru`.
|
||||
- Возможно, изменилась структура HTML-страниц. В этом случае нужно обновить CSS-селекторы в `src/core/settings.py`.
|
||||
|
||||
76
monitoring_service.py
Normal file
76
monitoring_service.py
Normal file
@@ -0,0 +1,76 @@
|
||||
# <MODULE name="monitoring_service" semantics="main_entrypoint_for_cron" />
|
||||
|
||||
# <IMPORTS>
|
||||
import logging
|
||||
import asyncio
|
||||
import sys
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
# Добавляем 'src' в sys.path, чтобы внутренние импорты (e.g., `from core...`) работали
|
||||
sys.path.insert(0, os.path.join(os.path.abspath(os.path.dirname(__file__)), 'src'))
|
||||
|
||||
from orchestrator import AppOrchestrator
|
||||
from core.settings import settings
|
||||
from core.logging_config import setup_logging
|
||||
from analyzer import DataAnalyzer
|
||||
from utils.telegram_sender import send_telegram_notification
|
||||
# </IMPORTS>
|
||||
|
||||
# <CONTRACT for="main">
|
||||
# description: "Главная точка входа для сервиса мониторинга. Запускается по расписанию (cron)."
|
||||
# postconditions:
|
||||
# - "Парсер запускается, данные сохраняются в БД."
|
||||
# - "Если обнаружены изменения, отправляется отчет в Telegram."
|
||||
# </CONTRACT>
|
||||
# <ENTRYPOINT name="main">
|
||||
async def main():
|
||||
"""
|
||||
Основная асинхронная функция, которая запускает парсер,
|
||||
анализирует данные и отправляет уведомление.
|
||||
"""
|
||||
run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
setup_logging(run_id=run_id)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
logger.info(f"🚀 Запуск сервиса мониторинга. Run ID: {run_id}")
|
||||
|
||||
try:
|
||||
# 1. Запуск парсера
|
||||
logger.info("Начало этапа парсинга...")
|
||||
orchestrator = AppOrchestrator(settings=settings)
|
||||
orchestrator.run()
|
||||
logger.info("Этап парсинга завершен.")
|
||||
|
||||
# 2. Анализ данных
|
||||
logger.info("Начало этапа анализа данных...")
|
||||
analyzer = DataAnalyzer()
|
||||
report_message = analyzer.analyze()
|
||||
logger.info("Этап анализа данных завершен.")
|
||||
|
||||
# 3. Отправка отчета
|
||||
if report_message:
|
||||
logger.info("Обнаружены изменения, отправка отчета в Telegram...")
|
||||
await send_telegram_notification(report_message)
|
||||
else:
|
||||
logger.info("Изменений не найдено, отправка отчета не требуется.")
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"💥 Критическая ошибка в сервисе мониторинга: {e}", exc_info=True)
|
||||
# Попытка отправить уведомление об ошибке
|
||||
try:
|
||||
error_message = f"<b>❗️ Критическая ошибка в сервисе мониторинга</b>\n\n<pre>{e}</pre>"
|
||||
await send_telegram_notification(error_message)
|
||||
except Exception as tg_e:
|
||||
logger.error(f"Не удалось даже отправить уведомление об ошибке в Telegram: {tg_e}")
|
||||
|
||||
logger.info("✅ Сервис мониторинга завершил работу.")
|
||||
|
||||
# <MAIN_CONTRACT>
|
||||
# description: "Стандартный блок для запуска main() при выполнении скрипта."
|
||||
# </MAIN_CONTRACT>
|
||||
if __name__ == "__main__":
|
||||
# Используем asyncio.run() для запуска асинхронной функции main
|
||||
asyncio.run(main())
|
||||
|
||||
# <COHERENCE_CHECK status="PASSED" />
|
||||
@@ -17,4 +17,12 @@ python-dotenv>=1.0.0
|
||||
|
||||
# ANCHOR: RabbitMQ_Dependencies
|
||||
# Семантика: Зависимости для работы с очередью сообщений RabbitMQ
|
||||
pika>=1.3.0
|
||||
pika>=1.3.0
|
||||
|
||||
# ANCHOR: Database_Dependencies
|
||||
# Семантика: Зависимости для работы с базой данных
|
||||
SQLAlchemy>=2.0.0
|
||||
|
||||
# ANCHOR: Telegram_Dependencies
|
||||
# Семантика: Зависимости для отправки уведомлений в Telegram
|
||||
python-telegram-bot>=21.0.0
|
||||
118
src/analyzer.py
Normal file
118
src/analyzer.py
Normal file
@@ -0,0 +1,118 @@
|
||||
# <MODULE name="analyzer" semantics="data_comparison_logic" />
|
||||
|
||||
# <IMPORTS>
|
||||
import logging
|
||||
import sqlite3
|
||||
from typing import List, Dict, Tuple, Any
|
||||
from src.core.settings import settings
|
||||
# </IMPORTS>
|
||||
|
||||
# <MAIN_CONTRACT for="DataAnalyzer">
|
||||
# description: "Анализирует данные двух последних запусков парсера и выявляет изменения, используя прямые SQL-запросы."
|
||||
# </MAIN_CONTRACT>
|
||||
class DataAnalyzer:
|
||||
# <INIT>
|
||||
def __init__(self):
|
||||
self.db_path = settings.db_path
|
||||
self.logger = logging.getLogger(__name__)
|
||||
# </INIT>
|
||||
|
||||
def _execute_query(self, query: str, params: tuple = ()) -> List[Any]:
|
||||
try:
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(query, params)
|
||||
return cursor.fetchall()
|
||||
except sqlite3.Error as e:
|
||||
self.logger.error(f"Ошибка выполнения SQL-запроса: {e}")
|
||||
return []
|
||||
|
||||
# <HELPER name="_get_last_two_runs">
|
||||
def _get_last_two_runs(self) -> Tuple[str, str]:
|
||||
"""Возвращает ID двух последних успешных запусков."""
|
||||
query = "SELECT run_id FROM parsing_runs ORDER BY start_time DESC LIMIT 2"
|
||||
runs = self._execute_query(query)
|
||||
if len(runs) < 2:
|
||||
self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.")
|
||||
return None, None
|
||||
return runs[0][0], runs[1][0] # (current_run_id, previous_run_id)
|
||||
# </HELPER>
|
||||
|
||||
# <HELPER name="_get_data_for_run">
|
||||
def _get_data_for_run(self, run_id: str) -> Dict[str, Dict]:
|
||||
"""Возвращает данные о товарах для указанного запуска в виде словаря {url: product_data}."""
|
||||
query = "SELECT name, volume, price, url, is_in_stock FROM products WHERE run_id = ?"
|
||||
rows = self._execute_query(query, (run_id,))
|
||||
# Преобразуем sqlite3.Row в словари
|
||||
return {row[3]: dict(zip(['name', 'volume', 'price', 'url', 'is_in_stock'], row)) for row in rows}
|
||||
# </HELPER>
|
||||
|
||||
# <ACTION name="analyze">
|
||||
def analyze(self) -> str:
|
||||
"""
|
||||
Сравнивает два последних запуска и генерирует HTML-отчет об изменениях.
|
||||
"""
|
||||
current_run_id, prev_run_id = self._get_last_two_runs()
|
||||
if not current_run_id or not prev_run_id:
|
||||
return ""
|
||||
|
||||
self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим запуском {prev_run_id}")
|
||||
|
||||
current_data = self._get_data_for_run(current_run_id)
|
||||
prev_data = self._get_data_for_run(prev_run_id)
|
||||
|
||||
price_changes = []
|
||||
availability_changes = []
|
||||
new_items = []
|
||||
removed_items = []
|
||||
|
||||
for url, current_item in current_data.items():
|
||||
prev_item = prev_data.get(url)
|
||||
if prev_item:
|
||||
if current_item['price'] != prev_item['price']:
|
||||
price_changes.append((current_item, prev_item['price']))
|
||||
if current_item['is_in_stock'] != prev_item['is_in_stock']:
|
||||
availability_changes.append(current_item)
|
||||
else:
|
||||
new_items.append(current_item)
|
||||
|
||||
for url, prev_item in prev_data.items():
|
||||
if url not in current_data:
|
||||
removed_items.append(prev_item)
|
||||
|
||||
return self._format_report(price_changes, availability_changes, new_items, removed_items)
|
||||
# </ACTION>
|
||||
|
||||
# <HELPER name="_format_report">
|
||||
def _format_report(self, price_changes, availability_changes, new_items, removed_items) -> str:
|
||||
"""Форматирует найденные изменения в красивый HTML-отчет."""
|
||||
if not any([price_changes, availability_changes, new_items, removed_items]):
|
||||
self.logger.info("Изменений не найдено.")
|
||||
return ""
|
||||
|
||||
report_parts = ["<b>📈 Отчет об изменениях на сайте</b>\n"]
|
||||
|
||||
if price_changes:
|
||||
report_parts.append("\n<b>💰 Изменились цены:</b>")
|
||||
for item, old_price in price_changes:
|
||||
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>: {old_price} ₽ → <b>{item['price']} ₽</b>")
|
||||
|
||||
if availability_changes:
|
||||
report_parts.append("\n<b>📦 Изменилось наличие:</b>")
|
||||
for item in availability_changes:
|
||||
status = "✅ В наличии" if item['is_in_stock'] else "❌ Нет в наличии"
|
||||
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>: <b>{status}</b>")
|
||||
|
||||
if new_items:
|
||||
report_parts.append("\n<b>✨ Новые товары:</b>")
|
||||
for item in new_items:
|
||||
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a> - {item['price']} ₽")
|
||||
|
||||
if removed_items:
|
||||
report_parts.append("\n<b>🗑️ Удаленные товары:</b>")
|
||||
for item in removed_items:
|
||||
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>")
|
||||
|
||||
return "\n".join(report_parts)
|
||||
# </HELPER>
|
||||
# <COHERENCE_CHECK status="PASSED" />
|
||||
@@ -102,6 +102,13 @@ def init_database(db_path: Path, run_id: str):
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with sqlite3.connect(db_path) as con:
|
||||
cur = con.cursor()
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS parsing_runs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
run_id TEXT NOT NULL UNIQUE,
|
||||
start_time TIMESTAMP NOT NULL
|
||||
)
|
||||
""")
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS products (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
@@ -111,7 +118,8 @@ def init_database(db_path: Path, run_id: str):
|
||||
price INTEGER NOT NULL,
|
||||
url TEXT,
|
||||
is_in_stock BOOLEAN,
|
||||
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
|
||||
)
|
||||
""")
|
||||
cur.execute("""
|
||||
@@ -120,7 +128,8 @@ def init_database(db_path: Path, run_id: str):
|
||||
run_id TEXT NOT NULL,
|
||||
timestamp TEXT NOT NULL,
|
||||
level TEXT NOT NULL,
|
||||
message TEXT NOT NULL
|
||||
message TEXT NOT NULL,
|
||||
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
|
||||
)
|
||||
""")
|
||||
con.commit()
|
||||
|
||||
@@ -2,49 +2,101 @@
|
||||
|
||||
# <IMPORTS>
|
||||
import logging
|
||||
import os
|
||||
import gzip
|
||||
import shutil
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from .database import DatabaseLogHandler, DatabaseManager
|
||||
from .settings import settings
|
||||
# </IMPORTS>
|
||||
|
||||
# <HELPER name="gzip_rotator">
|
||||
class GzipRotator:
|
||||
def __call__(self, source: str, dest: str) -> None:
|
||||
try:
|
||||
with open(source, 'rb') as f_in:
|
||||
with gzip.open(dest, 'wb') as f_out:
|
||||
shutil.copyfileobj(f_in, f_out)
|
||||
os.remove(source)
|
||||
except Exception as e:
|
||||
print(f"Ошибка при сжатии лог-файла: {e}")
|
||||
|
||||
def namer(name: str) -> str:
|
||||
return name + ".gz"
|
||||
# </HELPER>
|
||||
|
||||
# <CONTRACT for="setup_logging">
|
||||
# description: "Настраивает логирование, опционально добавляя обработчик для записи в базу данных."
|
||||
# description: "Настраивает логирование с ротацией и сжатием, опционально добавляя обработчик для записи в базу данных."
|
||||
# preconditions:
|
||||
# - "run_id должен быть строкой."
|
||||
# - "db_manager должен быть экземпляром DatabaseManager или None."
|
||||
# postconditions:
|
||||
# - "Базовая конфигурация логирования настроена."
|
||||
# - "Базовая конфигурация логирования настроена с ротацией и сжатием."
|
||||
# - "Если log_to_db is True и db_manager предоставлен, добавляется обработчик для БД."
|
||||
# exceptions:
|
||||
# - "Может возникнуть исключение при ошибке инициализации обработчика БД."
|
||||
# </CONTRACT>
|
||||
# <ACTION name="setup_logging">
|
||||
def setup_logging(run_id: str, db_manager: Optional[DatabaseManager] = None):
|
||||
"""Настраивает систему логирования проекта."""
|
||||
"""Настраивает систему логирования проекта с ротацией и сжатием."""
|
||||
# <CORE_LOGIC>
|
||||
log_format = '[%(asctime)s] [%(levelname)s] :: %(message)s'
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format=log_format,
|
||||
datefmt='%Y-%m-%d %H:%M:%S',
|
||||
force=True # Перезаписывает любую существующую конфигурацию
|
||||
handlers = []
|
||||
|
||||
# <ACTION name="create_log_directory">
|
||||
# Создаем директорию для логов, если она не существует.
|
||||
os.makedirs(settings.log_dir, exist_ok=True)
|
||||
# </ACTION>
|
||||
|
||||
# <ACTION name="configure_file_handler">
|
||||
# Добавляем обработчик для записи логов в файл с ротацией и сжатием.
|
||||
log_file_name = f"run_{run_id}.log"
|
||||
log_file_path = settings.log_dir / log_file_name
|
||||
|
||||
# Используем RotatingFileHandler для управления размером лог-файлов.
|
||||
# 10 MB per file, 5 backup files
|
||||
file_handler = RotatingFileHandler(
|
||||
log_file_path, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8'
|
||||
)
|
||||
file_handler.setLevel(logging.INFO)
|
||||
file_handler.setFormatter(logging.Formatter(log_format))
|
||||
|
||||
# <REFACTORING_TARGET>
|
||||
# Добавляем сжатие для ротированных файлов
|
||||
file_handler.rotator = GzipRotator()
|
||||
file_handler.namer = namer
|
||||
# </REFACTORING_TARGET>
|
||||
|
||||
handlers.append(file_handler)
|
||||
# </ACTION>
|
||||
|
||||
# <DEPENDENCY name="settings.log_to_db" />
|
||||
if settings.log_to_db and db_manager:
|
||||
# <ERROR_HANDLER>
|
||||
try:
|
||||
root_logger = logging.getLogger('')
|
||||
db_handler = DatabaseLogHandler(db_manager, run_id)
|
||||
db_handler.setLevel(logging.DEBUG)
|
||||
db_handler.setFormatter(logging.Formatter(log_format))
|
||||
root_logger.addHandler(db_handler)
|
||||
logging.info("Обработчик логов для записи в <20><>азу данных успешно добавлен.")
|
||||
handlers.append(db_handler)
|
||||
except Exception as e:
|
||||
logging.error(f"Не удалось инициализировать обработчик логов для БД: {e}")
|
||||
# Логирование еще не настроено, используем print для этой критической ошибки
|
||||
print(f"Не удалось инициализировать обработчик логов для БД: {e}")
|
||||
# </ERROR_HANDLER>
|
||||
|
||||
logging.info("Система логирования инициализирована.")
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format=log_format,
|
||||
datefmt='%Y-%m-%d %H:%M:%S',
|
||||
handlers=handlers,
|
||||
force=True # Перезаписывает любую существующую конфигурацию
|
||||
)
|
||||
|
||||
if any(isinstance(h, DatabaseLogHandler) for h in handlers):
|
||||
logging.info("Обработчик логов для записи в базу данных успешно добавлен.")
|
||||
|
||||
logging.info("Система логирования инициализирована с ротацией и сжатием.")
|
||||
# </CORE_LOGIC>
|
||||
# <COHERENCE_CHECK status="PASSED" />
|
||||
# </ACTION>
|
||||
|
||||
@@ -6,10 +6,14 @@
|
||||
# </DESIGN_NOTE>
|
||||
|
||||
# <IMPORTS>
|
||||
import logging
|
||||
from pydantic import BaseModel, Field, HttpUrl
|
||||
from pydantic.functional_validators import model_validator
|
||||
from datetime import datetime
|
||||
from typing import List
|
||||
from typing import List, Optional
|
||||
import uuid
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
# </IMPORTS>
|
||||
|
||||
# <MAIN_CONTRACT for="ProductVariant">
|
||||
@@ -19,8 +23,18 @@ import uuid
|
||||
class ProductVariant(BaseModel):
|
||||
# <STATE name="product_variant_fields">
|
||||
name: str = Field(..., description="Название продукта.")
|
||||
volume: str = Field(..., description="Объем или вариант продукта (например, '50мл', '10 капсул').")
|
||||
price: int = Field(..., gt=0, description="Цена продукта в числовом формате, должна быть положительной.")
|
||||
volume: Optional[str] = Field(None, description="Объем или вариант продукта (например, '50мл', '10 капсул'). Может быть пустым, если не применимо.")
|
||||
price: int = Field(..., description="Цена продукта в числовом формате. Должна быть положительной, если товар в наличии, иначе может быть 0.")
|
||||
is_in_stock: bool = Field(..., description="Наличие товара.")
|
||||
|
||||
@model_validator(mode='after')
|
||||
def validate_price_based_on_stock(self) -> 'ProductVariant':
|
||||
if not self.is_in_stock and self.price != 0:
|
||||
logger.warning(f"[CONTRACT_VIOLATION] Product '{self.name}' (URL: {self.url}) is out of stock but has a non-zero price ({self.price}). Setting price to 0.")
|
||||
self.price = 0
|
||||
elif self.is_in_stock and self.price <= 0:
|
||||
raise ValueError("Price must be greater than 0 for in-stock products.")
|
||||
return self
|
||||
url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.")
|
||||
is_in_stock: bool = Field(..., description="Наличие товара.")
|
||||
# </STATE>
|
||||
|
||||
@@ -12,12 +12,7 @@ import pika
|
||||
from pika.adapters.blocking_connection import BlockingChannel
|
||||
from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed
|
||||
|
||||
from .settings import (
|
||||
RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD,
|
||||
RABBITMQ_VIRTUAL_HOST, RABBITMQ_CONNECTION_TIMEOUT, RABBITMQ_HEARTBEAT,
|
||||
RABBITMQ_BLOCKED_CONNECTION_TIMEOUT, RABBITMQ_PRODUCTS_QUEUE,
|
||||
RABBITMQ_LOGS_QUEUE, RABBITMQ_EXCHANGE
|
||||
)
|
||||
from .settings import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -41,17 +36,17 @@ class RabbitMQConnection:
|
||||
Returns:
|
||||
pika.ConnectionParameters: Параметры подключения
|
||||
"""
|
||||
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
|
||||
credentials = pika.PlainCredentials(settings.rabbitmq_user, settings.rabbitmq_password)
|
||||
return pika.ConnectionParameters(
|
||||
host=RABBITMQ_HOST,
|
||||
port=RABBITMQ_PORT,
|
||||
virtual_host=RABBITMQ_VIRTUAL_HOST,
|
||||
host=settings.rabbitmq_host,
|
||||
port=settings.rabbitmq_port,
|
||||
virtual_host=settings.rabbitmq_vhost,
|
||||
credentials=credentials,
|
||||
connection_attempts=3,
|
||||
retry_delay=5,
|
||||
socket_timeout=RABBITMQ_CONNECTION_TIMEOUT,
|
||||
heartbeat=RABBITMQ_HEARTBEAT,
|
||||
blocked_connection_timeout=RABBITMQ_BLOCKED_CONNECTION_TIMEOUT
|
||||
socket_timeout=30, # Hardcoded for now
|
||||
heartbeat=600, # Hardcoded for now
|
||||
blocked_connection_timeout=300 # Hardcoded for now
|
||||
)
|
||||
|
||||
def connect(self) -> bool:
|
||||
|
||||
@@ -67,22 +67,40 @@ class Settings(BaseModel):
|
||||
|
||||
# <CONFIG name="logging_settings">
|
||||
log_to_db: bool = Field(default=os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true')
|
||||
log_dir: Path = Field(default=BASE_DIR / "logs", description="Директория для сохранения логов")
|
||||
# </CONFIG>
|
||||
|
||||
# <CONFIG name="performance_settings">
|
||||
request_timeout: int = Field(default=int(os.getenv('PARSER_TIMEOUT', 30)))
|
||||
delay_between_requests: float = Field(default=float(os.getenv('PARSER_DELAY', 1.0)))
|
||||
max_retries: int = Field(default=int(os.getenv('PARSER_RETRIES', 3)))
|
||||
num_parser_threads: int = Field(default=int(os.getenv('PARSER_THREADS', 5)), description="Количество потоков для парсинга")
|
||||
# </CONFIG>
|
||||
|
||||
# <CONFIG name="telegram_settings">
|
||||
telegram_bot_token: str = Field(default=os.getenv('TELEGRAM_BOT_TOKEN', ''), description="Токен для Telegram бота")
|
||||
telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений")
|
||||
# </CONFIG>
|
||||
|
||||
# <CONFIG name="rabbitmq_settings">
|
||||
rabbitmq_host: str = Field(default=os.getenv('RABBITMQ_HOST', 'localhost'))
|
||||
rabbitmq_port: int = Field(default=int(os.getenv('RABBITMQ_PORT', 5672)))
|
||||
rabbitmq_user: str = Field(default=os.getenv('RABBITMQ_USERNAME', 'guest'))
|
||||
rabbitmq_password: str = Field(default=os.getenv('RABBITMQ_PASSWORD', 'guest'))
|
||||
rabbitmq_vhost: str = Field(default=os.getenv('RABBITMQ_VIRTUAL_HOST', '/'))
|
||||
rabbitmq_products_queue: str = Field(default=os.getenv('RABBITMQ_PRODUCTS_QUEUE', 'price_parser.products'))
|
||||
rabbitmq_logs_queue: str = Field(default=os.getenv('RABBITMQ_LOGS_QUEUE', 'price_parser.logs'))
|
||||
rabbitmq_exchange: str = Field(default=os.getenv('RABBITMQ_EXCHANGE', 'price_parser.exchange'))
|
||||
# </CONFIG>
|
||||
|
||||
# <CONFIG name="selectors_config_instance">
|
||||
selectors: ScraperSelectors = ScraperSelectors(
|
||||
CATALOG_PRODUCT_LINK='.product-card h4 a.product-link',
|
||||
VARIANT_LIST_ITEM='.product-version-select li',
|
||||
VARIANT_LIST_ITEM='.product-version-select li, .product-variants-list .variant-item',
|
||||
PRODUCT_PAGE_NAME='h1.product-h1',
|
||||
ACTIVE_VOLUME='.product-version-select li.active',
|
||||
PRICE_BLOCK='.product-sale-box .price span',
|
||||
PRODUCT_UNAVAILABLE='.product-unavailable',
|
||||
ACTIVE_VOLUME='.product-version-select li.active, .variant-item.active',
|
||||
PRICE_BLOCK='.price-value, .product-price .price, .product-sale-box .price span',
|
||||
PRODUCT_UNAVAILABLE='.product-unavailable, .out-of-stock, .unavailable, .stock.status-0',
|
||||
)
|
||||
# </CONFIG>
|
||||
|
||||
|
||||
13
src/main.py
13
src/main.py
@@ -7,8 +7,10 @@
|
||||
# <IMPORTS>
|
||||
import sys
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from orchestrator import AppOrchestrator
|
||||
from core.settings import settings
|
||||
from core.logging_config import setup_logging
|
||||
# </IMPORTS>
|
||||
|
||||
# <CONTRACT for="main">
|
||||
@@ -20,17 +22,15 @@ from core.settings import settings
|
||||
# exceptions:
|
||||
# - "ValueError: при ошибках в конфигурации."
|
||||
# - "KeyboardInterrupt: при прерывании пользователем."
|
||||
# - "Exception: при любых других критических ошибках."
|
||||
# - "Exception: при любых других критических ошибка<EFBFBD><EFBFBD>."
|
||||
# </CONTRACT>
|
||||
# <ENTRYPOINT name="main">
|
||||
def main():
|
||||
"""Точка входа в приложение."""
|
||||
# <INIT>
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
setup_logging(run_id=run_id)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
# </INIT>
|
||||
|
||||
@@ -50,6 +50,7 @@ def main():
|
||||
logger.info(f" • Таймаут запросов: {settings.request_timeout}с")
|
||||
logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с")
|
||||
logger.info(f" • Максимум попыток: {settings.max_retries}")
|
||||
logger.info(f" • Количество потоков: {settings.num_parser_threads}")
|
||||
|
||||
# <DEPENDENCY name="settings.validate_configuration" />
|
||||
config_errors = settings.validate_configuration()
|
||||
|
||||
@@ -12,13 +12,17 @@ import requests
|
||||
from datetime import datetime
|
||||
from typing import List, Optional
|
||||
from contextlib import contextmanager
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT
|
||||
from core.models import ProductVariant
|
||||
from core.database import init_database, save_data_to_db, DatabaseManager
|
||||
from core.logging_config import setup_logging
|
||||
from core.database import DatabaseManager, init_database, save_data_to_db
|
||||
from core.rabbitmq import RabbitMQExporter
|
||||
from scraper.engine import Scraper
|
||||
from utils.exporters import save_data_to_csv, export_data_to_rabbitmq, export_logs_to_rabbitmq, validate_rabbitmq_connection
|
||||
from utils.exporters import save_data_to_csv
|
||||
from core.logging_config import setup_logging
|
||||
from datetime import datetime
|
||||
import sqlite3
|
||||
# </IMPORTS>
|
||||
|
||||
# <MAIN_CONTRACT for="AppOrchestrator">
|
||||
@@ -48,19 +52,30 @@ class AppOrchestrator:
|
||||
# <DEPENDENCY name="Scraper" relationship="composition">
|
||||
# Оркестратор владеет скрейпером и управляет его жизненным циклом.
|
||||
self.scraper = Scraper(
|
||||
session=self.http_session,
|
||||
selectors=self.settings.selectors,
|
||||
base_url=self.settings.base_url
|
||||
)
|
||||
# </DEPENDENCY>
|
||||
|
||||
self.logger = logging.getLogger(self.__class__.__name__)
|
||||
# </INIT>
|
||||
self._setup()
|
||||
|
||||
def _register_run(self):
|
||||
"""Регистрирует текущий запуск в базе данных."""
|
||||
if not self.db_manager:
|
||||
return
|
||||
try:
|
||||
with self.db_manager as conn:
|
||||
conn.execute(
|
||||
"INSERT INTO parsing_runs (run_id, start_time) VALUES (?, ?)",
|
||||
(self.run_id, datetime.now())
|
||||
)
|
||||
conn.commit()
|
||||
self.logger.info(f"Запуск {self.run_id} зарегистрирован в БД.")
|
||||
except sqlite3.Error as e:
|
||||
self.logger.error(f"Не удалось зарегистрировать запуск {self.run_id} в БД: {e}")
|
||||
raise
|
||||
|
||||
# <CONTRACT for="_error_context">
|
||||
# description: "Контекстный менеджер для централизованной обработки ошибок в ключевых операциях."
|
||||
# </CONTRACT>
|
||||
# <HELPER name="_error_context">
|
||||
@contextmanager
|
||||
def _error_context(self, operation: str):
|
||||
"""Контекстный менеджер для обработки ошибок с детальной диагностикой."""
|
||||
@@ -70,7 +85,6 @@ class AppOrchestrator:
|
||||
self.logger.error(f"[{operation.upper()}] Ошибка в операции '{operation}': {e}", exc_info=True)
|
||||
self._log_error_details(operation, e)
|
||||
raise
|
||||
# </HELPER>
|
||||
|
||||
# <CONTRACT for="_log_error_details">
|
||||
# description: "Логирует расширенную информацию об ошибке для упрощения диагностики."
|
||||
@@ -109,10 +123,60 @@ class AppOrchestrator:
|
||||
setup_logging(self.run_id, self.db_manager)
|
||||
|
||||
if ENABLE_RABBITMQ_EXPORT:
|
||||
if validate_rabbitmq_connection():
|
||||
self.rabbitmq_exporter = RabbitMQExporter()
|
||||
if self.rabbitmq_exporter.connection.connect():
|
||||
self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно")
|
||||
else:
|
||||
self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен")
|
||||
self.rabbitmq_exporter = None
|
||||
|
||||
self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
|
||||
# </CORE_LOGIC>
|
||||
# </ACTION>
|
||||
|
||||
# <CONTRACT for="_log_error_details">
|
||||
# description: "Логирует расширенную информацию об ошибке для упрощения диагностики."
|
||||
# </CONTRACT>
|
||||
# <HELPER name="_log_error_details">
|
||||
def _log_error_details(self, operation: str, error: Exception):
|
||||
"""Логирует детальную информацию об ошибке."""
|
||||
error_info = {
|
||||
'operation': operation,
|
||||
'error_type': type(error).__name__,
|
||||
'error_message': str(error),
|
||||
'run_id': self.run_id,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
'stats': self.stats.copy()
|
||||
}
|
||||
self.logger.error(f"[HELPER:_log_error_details] Детали ошибки: {error_info}")
|
||||
# </HELPER>
|
||||
|
||||
# <CONTRACT for="_setup">
|
||||
# description: "Шаг 0: Инициализация всех систем (БД, логирование, RabbitMQ)."
|
||||
# </CONTRACT>
|
||||
# <ACTION name="_setup">
|
||||
def _setup(self):
|
||||
"""Инициализация всех систем перед началом парсинга."""
|
||||
with self._error_context("setup"):
|
||||
# <CORE_LOGIC>
|
||||
self.stats['start_time'] = datetime.now()
|
||||
self.logger.info(f"[ACTION:_setup] Запуск инициализации систем. Run ID: {self.run_id}")
|
||||
|
||||
if self.settings.save_to_db or self.settings.log_to_db:
|
||||
self.settings.output_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.db_manager = DatabaseManager(self.settings.db_path)
|
||||
init_database(self.db_manager.db_path, self.run_id)
|
||||
|
||||
# <DEPENDENCY name="setup_logging" />
|
||||
setup_logging(self.run_id, self.db_manager)
|
||||
|
||||
if ENABLE_RABBITMQ_EXPORT:
|
||||
self.rabbitmq_exporter = RabbitMQExporter()
|
||||
if self.rabbitmq_exporter.connection.connect():
|
||||
self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно")
|
||||
else:
|
||||
self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен")
|
||||
self.rabbitmq_exporter = None
|
||||
|
||||
self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
|
||||
# </CORE_LOGIC>
|
||||
@@ -155,41 +219,56 @@ class AppOrchestrator:
|
||||
# </CORE_LOGIC>
|
||||
# </ACTION>
|
||||
|
||||
# <CONTRACT for="_scrape_single_url">
|
||||
# description: "Вспомогательный метод для парсинга одного URL в отдельном потоке."
|
||||
# postconditions: "Возвращает ProductVariant или None в случае ошибки."
|
||||
# </CONTRACT>
|
||||
# <HELPER name="_scrape_single_url">
|
||||
def _scrape_single_url(self, url: str) -> Optional[ProductVariant]:
|
||||
try:
|
||||
self.logger.info(f"[ACTION:_scrape_single_url] Парсинг URL: {url.split('/')[-1]}")
|
||||
variant_data = self.scraper.scrape_variant_page(
|
||||
variant_url=url,
|
||||
run_id=self.run_id
|
||||
)
|
||||
if variant_data:
|
||||
self.logger.debug(f"[ACTION:_scrape_single_url] Успешно спарсен URL: {url.split('/')[-1]}")
|
||||
return variant_data
|
||||
else:
|
||||
self.logger.warning(f"[ACTION:_scrape_single_url] Данные не получены для URL: {url.split('/')[-1]}")
|
||||
return None
|
||||
except Exception as e:
|
||||
self.logger.error(f"[ACTION:_scrape_single_url] Ошибка при парсинге URL ({url}): {e}")
|
||||
return None
|
||||
# </HELPER>
|
||||
|
||||
# <CONTRACT for="_scrape_data">
|
||||
# description: "Шаг 3: Итеративный парсинг данных по списку URL."
|
||||
# description: "Шаг 3: Параллельный парсинг данных по списку URL с использованием ThreadPoolExecutor."
|
||||
# </CONTRACT>
|
||||
# <ACTION name="_scrape_data">
|
||||
def _scrape_data(self, urls: List[str]):
|
||||
"""Итеративный парсинг данных."""
|
||||
"""Параллельный парсинг данных."""
|
||||
with self._error_context("scrape_data"):
|
||||
# <CORE_LOGIC>
|
||||
total_to_scrape = len(urls)
|
||||
self.logger.info(f"[ACTION:_scrape_data] Начало парсинга {total_to_scrape} URL вариантов.")
|
||||
self.logger.info(f"[ACTION:_scrape_data] Начало параллельного парсинга {total_to_scrape} URL вариантов с {self.settings.num_parser_threads} потоками.")
|
||||
|
||||
for i, url in enumerate(urls):
|
||||
# <ERROR_HANDLER for="single_url_scrape">
|
||||
try:
|
||||
self.logger.info(f"[ACTION:_scrape_data] Парсинг URL {i+1}/{total_to_scrape}: {url.split('/')[-1]}")
|
||||
time.sleep(1)
|
||||
|
||||
# <DEPENDENCY name="scraper.scrape_variant_page" />
|
||||
variant_data = self.scraper.scrape_variant_page(
|
||||
variant_url=url,
|
||||
run_id=self.run_id
|
||||
)
|
||||
|
||||
if variant_data:
|
||||
self.final_data.append(variant_data)
|
||||
self.stats['successful_parses'] += 1
|
||||
else:
|
||||
with ThreadPoolExecutor(max_workers=self.settings.num_parser_threads) as executor:
|
||||
future_to_url = {executor.submit(self._scrape_single_url, url): url for url in urls}
|
||||
|
||||
for i, future in enumerate(as_completed(future_to_url)):
|
||||
url = future_to_url[future]
|
||||
try:
|
||||
variant_data = future.result()
|
||||
if variant_data:
|
||||
self.final_data.append(variant_data)
|
||||
self.stats['successful_parses'] += 1
|
||||
else:
|
||||
self.stats['failed_parses'] += 1
|
||||
except Exception as e:
|
||||
self.stats['failed_parses'] += 1
|
||||
self.logger.error(f"[ACTION:_scrape_data] Необработанная ошибка в потоке для URL ({url}): {e}")
|
||||
|
||||
except Exception as e:
|
||||
self.stats['failed_parses'] += 1
|
||||
self.logger.error(f"[ACTION:_scrape_data] Ошибка при парсинге URL {i+1}/{total_to_scrape} ({url}): {e}")
|
||||
continue
|
||||
# </ERROR_HANDLER>
|
||||
|
||||
self.logger.info(f"[ACTION:_scrape_data] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.")
|
||||
self.logger.info(f"[STATS][ACTION:_scrape_data] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}")
|
||||
# </CORE_LOGIC>
|
||||
@@ -238,12 +317,10 @@ class AppOrchestrator:
|
||||
except Exception as e:
|
||||
self.logger.error(f"[ACTION:_save_results] Ошибка при сохранении в БД: {e}")
|
||||
|
||||
if ENABLE_RABBITMQ_EXPORT:
|
||||
# <DEPENDENCY name="export_data_to_rabbitmq" />
|
||||
# ... (logic remains the same)
|
||||
if ENABLE_RABBITMQ_EXPORT and self.rabbitmq_exporter:
|
||||
try:
|
||||
data_to_rabbitmq = [p.model_dump() for p in self.final_data]
|
||||
if export_data_to_rabbitmq(data_to_rabbitmq, self.run_id, self.run_id):
|
||||
if self.rabbitmq_exporter.export_products(data_to_rabbitmq, self.run_id):
|
||||
self.logger.info("[ACTION:_save_results] Данные успешно экспортированы в RabbitMQ.")
|
||||
else:
|
||||
self.logger.error("[ACTION:_save_results] Не удалось экспортировать данные в RabbitMQ.")
|
||||
@@ -272,6 +349,10 @@ class AppOrchestrator:
|
||||
if self.db_manager:
|
||||
self.db_manager.close()
|
||||
self.logger.debug("[ACTION:_cleanup] Соединение с базой данных закрыто.")
|
||||
|
||||
if self.rabbitmq_exporter:
|
||||
self.rabbitmq_exporter.close()
|
||||
self.logger.debug("[ACTION:_cleanup] Соединение с RabbitMQ закрыто.")
|
||||
|
||||
if duration:
|
||||
self.logger.info(f"[STATS][ACTION:_cleanup] Время выполнения: {duration.total_seconds():.2f} секунд")
|
||||
@@ -289,8 +370,12 @@ class AppOrchestrator:
|
||||
# </CONTRACT>
|
||||
# <ENTRYPOINT name="run">
|
||||
def run(self):
|
||||
"""Основной метод, запускающий весь процесс парсинга."""
|
||||
self.logger.info("="*50)
|
||||
"""Основной метод, запускающий все этапы работы оркестратора."""
|
||||
self.logger.info(f"Запуск оркестратора. Run ID: {self.run_id}")
|
||||
if self.db_manager:
|
||||
self._register_run()
|
||||
|
||||
# ... остальной код ...
|
||||
self.logger.info("[ENTRYPOINT:run] Запуск главного процесса оркестратора.")
|
||||
self.logger.info("="*50)
|
||||
|
||||
|
||||
@@ -26,25 +26,20 @@ from core.settings import ScraperSelectors
|
||||
# </MAIN_CONTRACT>
|
||||
class Scraper:
|
||||
# <INIT name="__init__">
|
||||
def __init__(self, session: requests.Session, selectors: ScraperSelectors, base_url: str):
|
||||
def __init__(self, selectors: ScraperSelectors, base_url: str):
|
||||
"""Инициализирует скрейпер с зависимостями: сессия, селекторы и базовый URL."""
|
||||
# <STATE name="initial_state">
|
||||
self.session = session
|
||||
self.selectors = selectors
|
||||
self.base_url = base_url
|
||||
self.logger = logging.getLogger(self.__class__.__name__)
|
||||
# </STATE>
|
||||
|
||||
# <ACTION name="setup_retry_strategy_on_init">
|
||||
self._setup_retry_strategy()
|
||||
# </ACTION>
|
||||
# </INIT>
|
||||
|
||||
# <CONTRACT for="_setup_retry_strategy">
|
||||
# description: "Настраивает retry-логику для HTTP-адаптера сессии."
|
||||
# </CONTRACT>
|
||||
# <HELPER name="_setup_retry_strategy">
|
||||
def _setup_retry_strategy(self):
|
||||
def _setup_retry_strategy(self, session: requests.Session):
|
||||
"""Настраивает retry стратегию для HTTP запросов."""
|
||||
# <CORE_LOGIC>
|
||||
retry_strategy = Retry(
|
||||
@@ -54,12 +49,27 @@ class Scraper:
|
||||
allowed_methods=["HEAD", "GET", "OPTIONS"]
|
||||
)
|
||||
adapter = HTTPAdapter(max_retries=retry_strategy)
|
||||
self.session.mount("http://", adapter)
|
||||
self.session.mount("https://", adapter)
|
||||
session.mount("http://", adapter)
|
||||
session.mount("https://", adapter)
|
||||
self.logger.debug("[HELPER:_setup_retry_strategy] Retry стратегия настроена для HTTP запросов.")
|
||||
# </CORE_LOGIC>
|
||||
# </HELPER>
|
||||
|
||||
# <CONTRACT for="_find_element_with_multiple_selectors">
|
||||
# description: "Пытается найти элемент, используя список CSS-селекторов."
|
||||
# postconditions: "Возвращает найденный элемент BeautifulSoup или None."
|
||||
# </CONTRACT>
|
||||
# <HELPER name="_find_element_with_multiple_selectors">
|
||||
def _find_element_with_multiple_selectors(self, soup: BeautifulSoup, selectors: List[str], log_prefix: str, element_name: str):
|
||||
for selector in selectors:
|
||||
element = soup.select_one(selector)
|
||||
if element:
|
||||
self.logger.debug(f"{log_prefix} [FOUND_ELEMENT] Элемент '{element_name}' найден с селектором: '{selector}'")
|
||||
return element
|
||||
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Элемент '{element_name}' не найден ни с одним из селекторов: {selectors}")
|
||||
return None
|
||||
# </HELPER>
|
||||
|
||||
# <CONTRACT for="_clean_price">
|
||||
# description: "Очищает строковое представление цены, оставляя только цифры."
|
||||
# postconditions: "Возвращает целое число или 0 в случае ошибки."
|
||||
@@ -93,7 +103,7 @@ class Scraper:
|
||||
# postconditions: "Возвращает текстовое содержимое страницы или None в случае любой ошибки."
|
||||
# </CONTRACT>
|
||||
# <HELPER name="_fetch_page">
|
||||
def _fetch_page(self, url: str, request_id: str) -> Optional[str]:
|
||||
def _fetch_page(self, session: requests.Session, url: str, request_id: str) -> Optional[str]:
|
||||
"""Приватный метод для скачивания HTML-содержимого страницы."""
|
||||
log_prefix = f"[HELPER:_fetch_page(id={request_id})]"
|
||||
self.logger.debug(f"{log_prefix} Запрос к URL: {url}")
|
||||
@@ -101,7 +111,7 @@ class Scraper:
|
||||
# <ERROR_HANDLER for="network_requests">
|
||||
try:
|
||||
# <CORE_LOGIC>
|
||||
response = self.session.get(url, timeout=30)
|
||||
response = session.get(url, timeout=30)
|
||||
response.raise_for_status()
|
||||
|
||||
if not response.text.strip():
|
||||
@@ -119,7 +129,7 @@ class Scraper:
|
||||
self.logger.error(f"{log_prefix} [TIMEOUT] Превышено время ожидания для {url}")
|
||||
return None
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения для {url}: {e}")
|
||||
self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения дл<EFBFBD><EFBFBD> {url}: {e}")
|
||||
return None
|
||||
except requests.exceptions.HTTPError as e:
|
||||
self.logger.error(f"{log_prefix} [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}")
|
||||
@@ -144,7 +154,9 @@ class Scraper:
|
||||
log_prefix = f"[ACTION:get_base_product_urls(id={run_id})]"
|
||||
self.logger.info(f"{log_prefix} Начало сбора базовых URL с: {catalog_url}")
|
||||
|
||||
html = self._fetch_page(catalog_url, f"get_base_urls(id={run_id})")
|
||||
with requests.Session() as session:
|
||||
self._setup_retry_strategy(session)
|
||||
html = self._fetch_page(session, catalog_url, f"get_base_urls(id={run_id})")
|
||||
if not html:
|
||||
self.logger.error(f"{log_prefix} [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.")
|
||||
return []
|
||||
@@ -196,7 +208,9 @@ class Scraper:
|
||||
for i, base_url in enumerate(base_product_urls):
|
||||
self.logger.info(f"{log_prefix} Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}")
|
||||
|
||||
html = self._fetch_page(base_url, f"get_variant_urls(id={run_id})-{i+1}")
|
||||
with requests.Session() as session:
|
||||
self._setup_retry_strategy(session)
|
||||
html = self._fetch_page(session, base_url, f"get_variant_urls(id={run_id})-{i+1}")
|
||||
if not html:
|
||||
self.logger.warning(f"{log_prefix} Пропуск базового URL из-за ошибки загрузки: {base_url}")
|
||||
continue
|
||||
@@ -224,9 +238,7 @@ class Scraper:
|
||||
all_variant_urls.append(base_url)
|
||||
# </FALLBACK>
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
|
||||
self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
|
||||
return all_variant_urls
|
||||
# </CORE_LOGIC>
|
||||
# </ACTION>
|
||||
@@ -240,9 +252,11 @@ class Scraper:
|
||||
def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]:
|
||||
"""Парсит страницу одного варианта и возвращает Pydantic-модель."""
|
||||
log_prefix = f"[ACTION:scrape_variant_page(id={run_id}, url={variant_url.split('/')[-1]})]"
|
||||
self.logger.info(f"{log_prefix} Начало парсинга страниц<EFBFBD><EFBFBD> варианта.")
|
||||
self.logger.info(f"{log_prefix} Начало парсинга страницы варианта.")
|
||||
|
||||
html = self._fetch_page(variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})")
|
||||
with requests.Session() as session:
|
||||
self._setup_retry_strategy(session)
|
||||
html = self._fetch_page(session, variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})")
|
||||
if not html:
|
||||
self.logger.warning(f"{log_prefix} Не удалось получить HTML страницы варианта, пропуск парсинга.")
|
||||
return None
|
||||
@@ -251,10 +265,10 @@ class Scraper:
|
||||
try:
|
||||
soup = BeautifulSoup(html, 'html.parser')
|
||||
|
||||
name_el = soup.select_one(self.selectors.product_page_name)
|
||||
price_el = soup.select_one(self.selectors.price_block)
|
||||
volume_el = soup.select_one(self.selectors.active_volume)
|
||||
unavailable_el = soup.select_one(self.selectors.product_unavailable)
|
||||
name_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_page_name, 'h1.product-title', 'h2.product-name'], log_prefix, 'имени продукта')
|
||||
price_el = self._find_element_with_multiple_selectors(soup, [self.selectors.price_block, 'span.price', 'div.price'], log_prefix, 'цены')
|
||||
volume_el = self._find_element_with_multiple_selectors(soup, [self.selectors.active_volume, '.product-options .active'], log_prefix, 'объема')
|
||||
unavailable_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_unavailable, 'div.out-of-stock', 'span.unavailable'], log_prefix, 'отсутствия в наличии')
|
||||
|
||||
# <PRECONDITION>
|
||||
# Товар должен иметь имя. Цена или статус "нет в наличии" должны присутствовать.
|
||||
@@ -262,7 +276,7 @@ class Scraper:
|
||||
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}")
|
||||
return None
|
||||
if not price_el and not unavailable_el:
|
||||
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличии.")
|
||||
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличи<EFBFBD><EFBFBD>.")
|
||||
return None
|
||||
# </PRECONDITION>
|
||||
|
||||
@@ -293,6 +307,7 @@ class Scraper:
|
||||
# <POSTCONDITION>
|
||||
# <CONTRACT_VALIDATOR by="Pydantic">
|
||||
try:
|
||||
self.logger.debug(f"{log_prefix} [DEBUG_SCRAPE_DATA] Name: '{name}', Price: {price}, URL: '{variant_url}', IsInStock: {is_in_stock}")
|
||||
product = ProductVariant(name=name, volume=volume, price=price, url=variant_url, is_in_stock=is_in_stock)
|
||||
self.logger.debug(f"{log_prefix} [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | InStock: {product.is_in_stock} | Price: '{product.price}'")
|
||||
return product
|
||||
@@ -311,4 +326,4 @@ class Scraper:
|
||||
# </FALLBACK>
|
||||
# </CORE_LOGIC>
|
||||
# </ACTION>
|
||||
# <COHERENCE_CHECK status="PASSED" />
|
||||
# <COHERENCE_CHECK status="PASSED" />
|
||||
72
src/utils/telegram_sender.py
Normal file
72
src/utils/telegram_sender.py
Normal file
@@ -0,0 +1,72 @@
|
||||
# <MODULE name="utils.telegram_sender" semantics="telegram_notifications" />
|
||||
|
||||
# <IMPORTS>
|
||||
import logging
|
||||
import asyncio
|
||||
from telegram import Bot
|
||||
from telegram.error import TelegramError
|
||||
from src.core.settings import settings
|
||||
# </IMPORTS>
|
||||
|
||||
# <MAIN_CONTRACT for="TelegramSender">
|
||||
# description: "Отправляет сообщения в Telegram, используя токен и ID чата из настроек."
|
||||
# invariant: "Токен и ID чата должны быть заданы в конфигурации."
|
||||
# </MAIN_CONTRACT>
|
||||
class TelegramSender:
|
||||
# <INIT>
|
||||
def __init__(self):
|
||||
if not settings.telegram_bot_token or not settings.telegram_chat_id:
|
||||
raise ValueError("TELEGRAM_BOT_TOKEN и TELEGRAM_CHAT_ID должны быть установлены в .env")
|
||||
self.bot = Bot(token=settings.telegram_bot_token)
|
||||
self.chat_id = settings.telegram_chat_id
|
||||
self.logger = logging.getLogger(__name__)
|
||||
# </INIT>
|
||||
|
||||
# <CONTRACT for="send_message">
|
||||
# description: "Асинхронно отправляет текстовое сообщение <20><> Telegram."
|
||||
# preconditions:
|
||||
# - "Сообщение (message) должно быть непустой строкой."
|
||||
# postconditions:
|
||||
# - "Сообщение отправлено в чат."
|
||||
# exceptions:
|
||||
# - "TelegramError: при ошибках API Telegram."
|
||||
# - "Exception: при других ошибках."
|
||||
# </CONTRACT>
|
||||
# <ACTION name="send_message">
|
||||
async def send_message(self, message: str):
|
||||
"""Асинхронно отправляет сообщение в Telegram."""
|
||||
# <CORE_LOGIC>
|
||||
try:
|
||||
self.logger.info(f"Отправка сообщения в Telegram (чат ID: {self.chat_id})...")
|
||||
await self.bot.send_message(
|
||||
chat_id=self.chat_id,
|
||||
text=message,
|
||||
parse_mode='HTML'
|
||||
)
|
||||
self.logger.info("Сообщение в Telegram успешно отправлено.")
|
||||
except TelegramError as e:
|
||||
self.logger.error(f"Ошибка отправки сообщения в Telegram: {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
self.logger.error(f"Непредвиденная ошибка при отправке сообщения в Telegram: {e}")
|
||||
raise
|
||||
# </CORE_LOGIC>
|
||||
# </ACTION>
|
||||
|
||||
# <HELPER name="send_telegram_notification">
|
||||
# description: "Удобная обертка для инициализации и отправки сообщения."
|
||||
# </HELPER>
|
||||
async def send_telegram_notification(message: str):
|
||||
"""
|
||||
Инициализирует TelegramSender и отправляет сообщение.
|
||||
Является удобной оберткой для вызова из других частей приложения.
|
||||
"""
|
||||
try:
|
||||
sender = TelegramSender()
|
||||
await sender.send_message(message)
|
||||
except ValueError as e:
|
||||
logging.error(f"Ошибка инициализации TelegramSender: {e}")
|
||||
except Exception as e:
|
||||
logging.error(f"Не удалось отправить уведомление в Telegram: {e}")
|
||||
|
||||
# <COHERENCE_CHECK status="PASSED" />
|
||||
Reference in New Issue
Block a user