Compare commits
6 Commits
cf52a7c0ad
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| ae6d81f6f2 | |||
| 69e90bd4f2 | |||
| c59dff3397 | |||
| 5742d474fd | |||
| 40317aa2e7 | |||
| 06cfca4cd1 |
236
README.md
236
README.md
@@ -1,58 +1,55 @@
|
|||||||
# ANCHOR: Project_README
|
# ANCHOR: Project_README
|
||||||
# Семантика: Документация, описывающая проект, его структуру и способ использования.
|
# Семантика: Документация, описывающая проект, его структуру и способ использования.
|
||||||
|
|
||||||
# Парсер цен для ElixirPeptide
|
# Сервис мониторинга цен ElixirPeptide (v3.0)
|
||||||
|
|
||||||
Это структурированное Python-приложение для парсинга каталога товаров с сайта `elixirpeptide.ru`, сбора информации о вариантах товаров и их ценах.
|
Это распределенное Python-приложение для мониторинга каталога товаров с сайта `elixirpeptide.ru`. Сервис автоматически отслеживает изменения в ценах и наличии товаров, и отправляет отчеты в Telegram.
|
||||||
|
|
||||||
## 🚀 Новые возможности (v2.0)
|
## 🚀 Архитектура (v3.0)
|
||||||
|
|
||||||
### ✅ Исправленные критические проблемы:
|
Проект перешел от простого парсера к полноценному сервису мониторинга с асинхронной обработкой данных.
|
||||||
- **Устранено дублирование кода** в `engine.py` и `database.py`
|
|
||||||
- **Дополнены зависимости** в `requirements.txt` (pydantic, lxml, python-dotenv)
|
|
||||||
- **Улучшена обработка ошибок** с детальной диагностикой и retry механизмом
|
|
||||||
- **Добавлена валидация данных** на всех уровнях приложения
|
|
||||||
|
|
||||||
### 🎯 Новые функции:
|
- **Точка входа**: `monitoring_service.py` — главный скрипт, запускаемый по расписанию (cron).
|
||||||
- **Retry стратегия** для HTTP запросов с экспоненциальной задержкой
|
- **Оркестратор**: `src/orchestrator.py` управляет процессом парсинга.
|
||||||
- **Детальная статистика** выполнения парсинга
|
- **Анализатор**: `src/analyzer.py` сравнивает данные последнего и предпоследнего запусков, выявляя изменения.
|
||||||
- **Валидация конфигурации** при запуске
|
- **Уведомления**: `src/utils/telegram_sender.py` отправляет HTML-отчеты об изменениях в Telegram.
|
||||||
- **Поддержка переменных окружения** через `.env` файл
|
- **Очередь сообщений**: Интеграция с **RabbitMQ** для асинхронного экспорта данных о товарах и логах.
|
||||||
- **Graceful degradation** - продолжение работы при частичных сбоях
|
- **База данных**: **SQLite** используется для хранения истории парсинга и данных для анализа.
|
||||||
- **Улучшенное логирование** с категоризацией ошибок
|
|
||||||
|
|
||||||
### 🔧 Улучшения производительности:
|
## ✨ Ключевые возможности
|
||||||
- **Адаптивные таймауты** для HTTP запросов
|
|
||||||
- **Проверка на блокировку/капчу** в ответах сервера
|
- **Автоматический мониторинг**: Запуск по расписанию для непрерывного отслеживания.
|
||||||
- **Оптимизированная обработка данных** с пропуском некорректных записей
|
- **Отчеты об изменениях**: Уведомления в Telegram о новых, удаленных товарах, а также изменениях цен и наличия.
|
||||||
|
- **Асинхронный экспорт**: Отправка данных в RabbitMQ для дальнейшей обработки другими системами.
|
||||||
|
- **Надежность**: Механизм retry-запросов, детальное логирование и отказоустойчивость.
|
||||||
|
- **Гибкая конфигурация**: Все параметры настраиваются через `.env` файл.
|
||||||
|
|
||||||
## Структура Проекта
|
## Структура Проекта
|
||||||
|
|
||||||
Проект организован по принципу семантического разделения ответственности для удобства поддержки и дальнейшей разработки.
|
- `monitoring_service.py`: **[NEW]** Главная точка входа для запуска по расписанию.
|
||||||
|
|
||||||
- `src/`: Основная директория с исходным кодом.
|
- `src/`: Основная директория с исходным кодом.
|
||||||
- `config.py`: Все настройки (URL, селекторы, флаги сохранения).
|
- `orchestrator.py`: **[NEW]** Управляет всем процессом парсинга.
|
||||||
- `main.py`: Точка входа в приложение, оркестратор процесса.
|
- `analyzer.py`: **[NEW]** Анализирует данные и формирует отчеты.
|
||||||
- `core/`: Пакет с ядром приложения.
|
- `main.py`: Точка входа для ручного запуска парсинга.
|
||||||
|
- `core/`: Ядро приложения.
|
||||||
|
- `settings.py`: **[ENHANCED]** Pydantic-модели для конфигурации (включая Telegram, RabbitMQ).
|
||||||
- `database.py`: Логика работы с базой данных SQLite.
|
- `database.py`: Логика работы с базой данных SQLite.
|
||||||
- `logging_config.py`: Настройка системы логирования.
|
- `rabbitmq.py`: **[NEW]** Клиент для работы с RabbitMQ.
|
||||||
- **`models.py`: [NEW FILE] Pydantic модели данных (ProductVariant, LogRecordModel).**
|
- `scraper/`: Логика парсинга.
|
||||||
- **`settings.py`: [ENHANCED] Конфигурация с валидацией и поддержкой .env**
|
- `engine.py`: Улучшенный движок парсера.
|
||||||
- `scraper/`: Пакет с логикой парсинга.
|
- `utils/`: Вспомогательные утилиты.
|
||||||
- **`engine.py`: [ENHANCED] Класс Scraper с retry механизмом и улучшенной обработкой ошибок**
|
- `telegram_sender.py`: **[NEW]** Отправка уведомлений в Telegram.
|
||||||
- `utils/`: Пакет со вспомогательными утилитами.
|
- `requirements.txt`: **[UPDATED]** Обновленный список зависимостей.
|
||||||
- **`exporters.py`: [ENHANCED] Функции для сохранения данных с валидацией**
|
- `.env.example`: Пример файла с переменными окружения.
|
||||||
- `requirements.txt`: Список зависимостей проекта.
|
- `RABBITMQ_SETUP.md`: **[NEW]** Руководство по настройке RabbitMQ.
|
||||||
- `price_data_final/`: Директория для хранения результатов (создается автоматически).
|
|
||||||
- **`.env.example`: [NEW] Пример файла с переменными окружения**
|
|
||||||
|
|
||||||
## Установка и Запуск
|
## Установка и Запуск
|
||||||
|
|
||||||
### 1. Клонирование и настройка окружения
|
### 1. Клонирование и настройка окружения
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
git clone <your-repo-url>
|
git clone https://gitea.bebesh.ru/busya/peptide-parcer.git
|
||||||
cd peptide_parser_project
|
cd peptide-parcer
|
||||||
|
|
||||||
# Создание виртуального окружения
|
# Создание виртуального окружения
|
||||||
python -m venv venv
|
python -m venv venv
|
||||||
@@ -62,139 +59,68 @@ source venv/bin/activate # Для Windows: venv\Scripts\activate
|
|||||||
pip install -r requirements.txt
|
pip install -r requirements.txt
|
||||||
```
|
```
|
||||||
|
|
||||||
### 2. Настройка конфигурации
|
### 2. Настройка RabbitMQ (Опционально)
|
||||||
|
|
||||||
#### Вариант A: Через переменные окружения
|
Для асинхронного экспорта данных требуется RabbitMQ. Рекомендуется запуск через Docker:
|
||||||
```bash
|
```bash
|
||||||
# Создайте файл .env на основе .env.example
|
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
|
||||||
cp .env.example .env
|
```
|
||||||
|
Подробности см. в `RABBITMQ_SETUP.md`.
|
||||||
|
|
||||||
# Отредактируйте .env файл под ваши нужды
|
### 3. Настройка конфигурации
|
||||||
nano .env
|
|
||||||
|
Скопируйте `.env.example` в `.env` и заполните необходимые параметры.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cp .env.example .env
|
||||||
|
nano .env # Отредактируйте файл
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Вариант B: Прямое редактирование настроек
|
**Ключевые переменные окружения:**
|
||||||
Отредактируйте `src/core/settings.py` для изменения настроек по умолчанию.
|
- `TELEGRAM_BOT_TOKEN`: Токен вашего Telegram-бота.
|
||||||
|
- `TELEGRAM_CHAT_ID`: ID чата, куда будут приходить отчеты.
|
||||||
|
- `ENABLE_RABBITMQ_EXPORT`: `true` или `false` для включения/отключения экспорта в RabbitMQ.
|
||||||
|
- `RABBITMQ_HOST`, `RABBITMQ_PORT` и т.д.: Настройки подключения к RabbitMQ.
|
||||||
|
|
||||||
### 3. Запуск парсера
|
### 4. Запуск
|
||||||
|
|
||||||
|
#### Автоматический мониторинг (рекомендуется)
|
||||||
|
Настройте запуск `monitoring_service.py` через `cron` или планировщик задач.
|
||||||
|
```bash
|
||||||
|
# Пример для cron, запуск каждый час
|
||||||
|
# 0 * * * * /path/to/your/project/venv/bin/python /path/to/your/project/monitoring_service.py >> /path/to/your/project/logs/cron.log 2>&1
|
||||||
|
|
||||||
|
# Ручной запуск для проверки
|
||||||
|
python monitoring_service.py
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Ручной запуск парсинга (без анализа и отчета)
|
||||||
```bash
|
```bash
|
||||||
python src/main.py
|
python src/main.py
|
||||||
```
|
```
|
||||||
|
|
||||||
## Конфигурация
|
|
||||||
|
|
||||||
### Переменные окружения (.env файл)
|
|
||||||
|
|
||||||
| Переменная | Описание | По умолчанию |
|
|
||||||
|------------|----------|--------------|
|
|
||||||
| `PARSER_BASE_URL` | Базовый URL сайта | `https://elixirpeptide.ru` |
|
|
||||||
| `PARSER_CATALOG_URL` | URL каталога товаров | `https://elixirpeptide.ru/catalog/` |
|
|
||||||
| `PARSER_SAVE_TO_CSV` | Сохранять в CSV | `true` |
|
|
||||||
| `PARSER_SAVE_TO_DB` | Сохранять в базу данных | `true` |
|
|
||||||
| `PARSER_LOG_TO_DB` | Логировать в базу данных | `true` |
|
|
||||||
| `PARSER_TIMEOUT` | Таймаут HTTP запросов (сек) | `30` |
|
|
||||||
| `PARSER_DELAY` | Задержка между запросами (сек) | `1.0` |
|
|
||||||
| `PARSER_RETRIES` | Максимум попыток для запросов | `3` |
|
|
||||||
|
|
||||||
### Настройки производительности
|
|
||||||
|
|
||||||
- **Таймаут запросов**: 30 секунд (настраивается)
|
|
||||||
- **Задержка между запросами**: 1 секунда (настраивается)
|
|
||||||
- **Retry стратегия**: 3 попытки с экспоненциальной задержкой
|
|
||||||
- **Graceful degradation**: Продолжение работы при ошибках отдельных запросов
|
|
||||||
|
|
||||||
## Результаты
|
## Результаты
|
||||||
|
|
||||||
### Файлы результатов
|
### Отчеты в Telegram
|
||||||
|
Сервис присылает в указанный чат отчет, если обнаруживает изменения:
|
||||||
|
- **💰 Изменились цены**: список товаров с новой и старой ценой.
|
||||||
|
- **📦 Изменилось наличие**: список товаров, которые появились или закончились.
|
||||||
|
- **✨ Новые товары**: список добавленных товаров.
|
||||||
|
- **🗑️ Удаленные товары**: список удаленных товаров.
|
||||||
|
|
||||||
- **CSV файл**: `price_data_final/prices_full_catalog_YYYY-MM-DD_HHMMSS.csv`
|
### Данные
|
||||||
- **База данных**: `price_data_final/parser_data.db` (SQLite)
|
- **База данных**: `price_data_final/parser_data.db` — хранит всю историю парсинга.
|
||||||
|
- **CSV файл**: `price_data_final/prices_full_catalog_*.csv` — создается при ручном запуске.
|
||||||
### Структура данных
|
- **RabbitMQ**: Сообщения с данными о товарах и логах отправляются в очереди `price_parser.products` и `price_parser.logs`.
|
||||||
|
|
||||||
```csv
|
|
||||||
name,volume,price
|
|
||||||
"Peptide X","30ml",1500
|
|
||||||
"Peptide Y","50ml",2500
|
|
||||||
```
|
|
||||||
|
|
||||||
### Логирование
|
|
||||||
|
|
||||||
- **Консольные логи**: Детальная информация о процессе парсинга
|
|
||||||
- **Логи в БД**: Если `PARSER_LOG_TO_DB=true`, все логи сохраняются в таблицу `logs`
|
|
||||||
|
|
||||||
## Обработка ошибок
|
|
||||||
|
|
||||||
### Типы обрабатываемых ошибок
|
|
||||||
|
|
||||||
1. **Сетевые ошибки**: Timeout, ConnectionError, HTTPError
|
|
||||||
2. **Ошибки парсинга**: Отсутствующие элементы, некорректные данные
|
|
||||||
3. **Ошибки файловой системы**: Права доступа, отсутствие директорий
|
|
||||||
4. **Ошибки базы данных**: SQLite ошибки, проблемы с подключением
|
|
||||||
|
|
||||||
### Стратегия восстановления
|
|
||||||
|
|
||||||
- **Retry механизм**: Автоматические повторные попытки для сетевых ошибок
|
|
||||||
- **Graceful degradation**: Пропуск проблемных записей с продолжением работы
|
|
||||||
- **Детальная диагностика**: Подробные логи для анализа проблем
|
|
||||||
|
|
||||||
## Мониторинг и статистика
|
|
||||||
|
|
||||||
### Статистика выполнения
|
|
||||||
|
|
||||||
Приложение выводит детальную статистику:
|
|
||||||
|
|
||||||
```
|
|
||||||
[FINAL_STATS] Время выполнения: 45.23 секунд
|
|
||||||
[FINAL_STATS] Успешность: 95/100 (95.0%)
|
|
||||||
[STATS] Успешно: 95, Ошибок: 5
|
|
||||||
```
|
|
||||||
|
|
||||||
### Метрики
|
|
||||||
|
|
||||||
- Общее количество URL для парсинга
|
|
||||||
- Количество успешно обработанных записей
|
|
||||||
- Количество ошибок
|
|
||||||
- Время выполнения
|
|
||||||
- Процент успешности
|
|
||||||
|
|
||||||
## Разработка
|
|
||||||
|
|
||||||
### Архитектурные принципы
|
|
||||||
|
|
||||||
1. **Разделение ответственности**: Каждый модуль отвечает за свою область
|
|
||||||
2. **Типизация**: Использование Pydantic для валидации данных
|
|
||||||
3. **Обработка ошибок**: Graceful handling с детальной диагностикой
|
|
||||||
4. **Конфигурируемость**: Гибкие настройки через переменные окружения
|
|
||||||
5. **Логирование**: Структурированное логирование на всех уровнях
|
|
||||||
|
|
||||||
### Добавление новых функций
|
|
||||||
|
|
||||||
1. **Новые форматы экспорта**: Добавьте функции в `src/utils/exporters.py`
|
|
||||||
2. **Новые селекторы**: Обновите `ScraperSelectors` в `src/core/settings.py`
|
|
||||||
3. **Новые поля данных**: Расширьте модель `ProductVariant` в `src/core/models.py`
|
|
||||||
|
|
||||||
## Устранение неполадок
|
## Устранение неполадок
|
||||||
|
|
||||||
### Частые проблемы
|
1. **Не приходят сообщения в Telegram**:
|
||||||
|
- Проверьте правильность `TELEGRAM_BOT_TOKEN` и `TELEGRAM_CHAT_ID`.
|
||||||
1. **"Не удается подключиться к базовому URL"**
|
- Убедитесь, что бот добавлен в чат с нужными правами.
|
||||||
- Проверьте интернет-соединение
|
2. **Ошибка подключения к RabbitMQ**:
|
||||||
- Убедитесь, что сайт доступен
|
- Проверьте, что RabbitMQ запущен и доступен.
|
||||||
- Проверьте настройки прокси
|
- Убедитесь в корректности настроек в `.env`.
|
||||||
|
3. **Ошибки парсинга**:
|
||||||
2. **"Не найдено ни одной ссылки на товар"**
|
- Проверьте доступность сайта `elixirpeptide.ru`.
|
||||||
- Проверьте CSS селекторы в настройках
|
- Возможно, изменилась структура HTML-страниц. В этом случае нужно обновить CSS-селекторы в `src/core/settings.py`.
|
||||||
- Убедитесь, что структура сайта не изменилась
|
|
||||||
|
|
||||||
3. **"Ошибка при сохранении в БД"**
|
|
||||||
- Проверьте права доступа к директории
|
|
||||||
- Убедитесь, что SQLite поддерживается
|
|
||||||
|
|
||||||
### Логи для диагностики
|
|
||||||
|
|
||||||
Все ошибки логируются с детальной информацией. Проверьте:
|
|
||||||
- Консольные логи при запуске
|
|
||||||
- Логи в базе данных (если включено)
|
|
||||||
- Файлы результатов для проверки данных
|
|
||||||
|
|||||||
@@ -3,12 +3,18 @@
|
|||||||
# <IMPORTS>
|
# <IMPORTS>
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from src.orchestrator import AppOrchestrator
|
|
||||||
from src.core.settings import settings
|
# Добавляем 'src' в sys.path, чтобы внутренние импорты (e.g., `from core...`) работали
|
||||||
from src.core.logging_config import setup_logging
|
sys.path.insert(0, os.path.join(os.path.abspath(os.path.dirname(__file__)), 'src'))
|
||||||
from src.analyzer import DataAnalyzer
|
|
||||||
from src.utils.telegram_sender import send_telegram_notification
|
from orchestrator import AppOrchestrator
|
||||||
|
from core.settings import settings
|
||||||
|
from core.logging_config import setup_logging
|
||||||
|
from analyzer import DataAnalyzer
|
||||||
|
from utils.telegram_sender import send_telegram_notification
|
||||||
# </IMPORTS>
|
# </IMPORTS>
|
||||||
|
|
||||||
# <CONTRACT for="main">
|
# <CONTRACT for="main">
|
||||||
|
|||||||
106
src/analyzer.py
106
src/analyzer.py
@@ -2,28 +2,36 @@
|
|||||||
|
|
||||||
# <IMPORTS>
|
# <IMPORTS>
|
||||||
import logging
|
import logging
|
||||||
from typing import List, Dict, Tuple
|
import sqlite3
|
||||||
from sqlalchemy.orm import sessionmaker
|
from typing import List, Dict, Tuple, Any
|
||||||
from src.core.models import ProductVariant, ParsingRun
|
|
||||||
from sqlalchemy import create_engine
|
|
||||||
from src.core.settings import settings
|
from src.core.settings import settings
|
||||||
# </IMPORTS>
|
# </IMPORTS>
|
||||||
|
|
||||||
# <MAIN_CONTRACT for="DataAnalyzer">
|
# <MAIN_CONTRACT for="DataAnalyzer">
|
||||||
# description: "Анализирует данные двух последних запусков парсера и выявляет изменения."
|
# description: "Анализирует данные двух последних запусков парсера и выявляет изменения, используя прямые SQL-запросы."
|
||||||
# </MAIN_CONTRACT>
|
# </MAIN_CONTRACT>
|
||||||
class DataAnalyzer:
|
class DataAnalyzer:
|
||||||
# <INIT>
|
# <INIT>
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.engine = create_engine(settings.db_path.absolute().as_uri())
|
self.db_path = settings.db_path
|
||||||
self.Session = sessionmaker(bind=self.engine)
|
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
# </INIT>
|
# </INIT>
|
||||||
|
|
||||||
|
def _execute_query(self, query: str, params: tuple = ()) -> List[Any]:
|
||||||
|
try:
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(query, params)
|
||||||
|
return cursor.fetchall()
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
self.logger.error(f"Ошибка выполнения SQL-запроса: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
# <HELPER name="_get_last_two_runs">
|
# <HELPER name="_get_last_two_runs">
|
||||||
def _get_last_two_runs(self, session) -> Tuple[str, str]:
|
def _get_last_two_runs(self) -> Tuple[str, str]:
|
||||||
"""Возвращает ID двух последних успешных запусков."""
|
"""Возвращает ID двух последних успешных запусков."""
|
||||||
runs = session.query(ParsingRun.run_id).order_by(ParsingRun.start_time.desc()).limit(2).all()
|
query = "SELECT run_id FROM parsing_runs ORDER BY start_time DESC LIMIT 2"
|
||||||
|
runs = self._execute_query(query)
|
||||||
if len(runs) < 2:
|
if len(runs) < 2:
|
||||||
self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.")
|
self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.")
|
||||||
return None, None
|
return None, None
|
||||||
@@ -31,10 +39,12 @@ class DataAnalyzer:
|
|||||||
# </HELPER>
|
# </HELPER>
|
||||||
|
|
||||||
# <HELPER name="_get_data_for_run">
|
# <HELPER name="_get_data_for_run">
|
||||||
def _get_data_for_run(self, session, run_id: str) -> Dict[str, ProductVariant]:
|
def _get_data_for_run(self, run_id: str) -> Dict[str, Dict]:
|
||||||
"""Возвращает данные о товарах для указанного запуска в виде словаря {url: ProductVariant}."""
|
"""Возвращает данные о товарах для указанного запуска в виде словаря {url: product_data}."""
|
||||||
variants = session.query(ProductVariant).filter(ProductVariant.run_id == run_id).all()
|
query = "SELECT name, volume, price, url, is_in_stock FROM products WHERE run_id = ?"
|
||||||
return {v.url: v for v in variants}
|
rows = self._execute_query(query, (run_id,))
|
||||||
|
# Преобразуем sqlite3.Row в словари
|
||||||
|
return {row[3]: dict(zip(['name', 'volume', 'price', 'url', 'is_in_stock'], row)) for row in rows}
|
||||||
# </HELPER>
|
# </HELPER>
|
||||||
|
|
||||||
# <ACTION name="analyze">
|
# <ACTION name="analyze">
|
||||||
@@ -42,42 +52,35 @@ class DataAnalyzer:
|
|||||||
"""
|
"""
|
||||||
Сравнивает два последних запуска и генерирует HTML-отчет об изменениях.
|
Сравнивает два последних запуска и генерирует HTML-отчет об изменениях.
|
||||||
"""
|
"""
|
||||||
# <CORE_LOGIC>
|
current_run_id, prev_run_id = self._get_last_two_runs()
|
||||||
with self.Session() as session:
|
if not current_run_id or not prev_run_id:
|
||||||
current_run_id, prev_run_id = self._get_last_two_runs(session)
|
return ""
|
||||||
if not current_run_id or not prev_run_id:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим з<EFBFBD><EFBFBD>пуском {prev_run_id}")
|
self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим запуском {prev_run_id}")
|
||||||
|
|
||||||
current_data = self._get_data_for_run(session, current_run_id)
|
current_data = self._get_data_for_run(current_run_id)
|
||||||
prev_data = self._get_data_for_run(session, prev_run_id)
|
prev_data = self._get_data_for_run(prev_run_id)
|
||||||
|
|
||||||
price_changes = []
|
price_changes = []
|
||||||
availability_changes = []
|
availability_changes = []
|
||||||
new_items = []
|
new_items = []
|
||||||
removed_items = []
|
removed_items = []
|
||||||
|
|
||||||
# Поиск изменений и новых товаров
|
for url, current_item in current_data.items():
|
||||||
for url, current_item in current_data.items():
|
prev_item = prev_data.get(url)
|
||||||
prev_item = prev_data.get(url)
|
if prev_item:
|
||||||
if prev_item:
|
if current_item['price'] != prev_item['price']:
|
||||||
# Изменение цены
|
price_changes.append((current_item, prev_item['price']))
|
||||||
if current_item.price != prev_item.price:
|
if current_item['is_in_stock'] != prev_item['is_in_stock']:
|
||||||
price_changes.append((current_item, prev_item.price))
|
availability_changes.append(current_item)
|
||||||
# Изменение статуса наличия
|
else:
|
||||||
if current_item.is_available != prev_item.is_available:
|
new_items.append(current_item)
|
||||||
availability_changes.append(current_item)
|
|
||||||
else:
|
for url, prev_item in prev_data.items():
|
||||||
new_items.append(current_item)
|
if url not in current_data:
|
||||||
|
removed_items.append(prev_item)
|
||||||
# Поиск удаленных товаров
|
|
||||||
for url, prev_item in prev_data.items():
|
|
||||||
if url not in current_data:
|
|
||||||
removed_items.append(prev_item)
|
|
||||||
|
|
||||||
return self._format_report(price_changes, availability_changes, new_items, removed_items)
|
return self._format_report(price_changes, availability_changes, new_items, removed_items)
|
||||||
# </CORE_LOGIC>
|
|
||||||
# </ACTION>
|
# </ACTION>
|
||||||
|
|
||||||
# <HELPER name="_format_report">
|
# <HELPER name="_format_report">
|
||||||
@@ -92,25 +95,24 @@ class DataAnalyzer:
|
|||||||
if price_changes:
|
if price_changes:
|
||||||
report_parts.append("\n<b>💰 Изменились цены:</b>")
|
report_parts.append("\n<b>💰 Изменились цены:</b>")
|
||||||
for item, old_price in price_changes:
|
for item, old_price in price_changes:
|
||||||
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: {old_price} ₽ → <b>{item.price} ₽</b>")
|
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>: {old_price} ₽ → <b>{item['price']} ₽</b>")
|
||||||
|
|
||||||
if availability_changes:
|
if availability_changes:
|
||||||
report_parts.append("\n<b>📦 Изменилось наличие:</b>")
|
report_parts.append("\n<b>📦 Изменилось наличие:</b>")
|
||||||
for item in availability_changes:
|
for item in availability_changes:
|
||||||
status = "✅ В наличии" if item.is_available else "❌ Нет в наличии"
|
status = "✅ В наличии" if item['is_in_stock'] else "❌ Нет в наличии"
|
||||||
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: <b>{status}</b>")
|
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>: <b>{status}</b>")
|
||||||
|
|
||||||
if new_items:
|
if new_items:
|
||||||
report_parts.append("\n<b>✨ Новые товары:</b>")
|
report_parts.append("\n<b>✨ Новые товары:</b>")
|
||||||
for item in new_items:
|
for item in new_items:
|
||||||
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a> - {item.price} ₽")
|
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a> - {item['price']} ₽")
|
||||||
|
|
||||||
if removed_items:
|
if removed_items:
|
||||||
report_parts.append("\n<b>🗑️ Удаленные товары:</b>")
|
report_parts.append("\n<b>🗑️ Удаленные товары:</b>")
|
||||||
for item in removed_items:
|
for item in removed_items:
|
||||||
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>")
|
report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>")
|
||||||
|
|
||||||
return "\n".join(report_parts)
|
return "\n".join(report_parts)
|
||||||
# </HELPER>
|
# </HELPER>
|
||||||
# <COHERENCE_CHECK status="PASSED" />
|
# <COHERENCE_CHECK status="PASSED" />
|
||||||
/>
|
|
||||||
@@ -102,6 +102,13 @@ def init_database(db_path: Path, run_id: str):
|
|||||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
with sqlite3.connect(db_path) as con:
|
with sqlite3.connect(db_path) as con:
|
||||||
cur = con.cursor()
|
cur = con.cursor()
|
||||||
|
cur.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS parsing_runs (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
run_id TEXT NOT NULL UNIQUE,
|
||||||
|
start_time TIMESTAMP NOT NULL
|
||||||
|
)
|
||||||
|
""")
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
CREATE TABLE IF NOT EXISTS products (
|
CREATE TABLE IF NOT EXISTS products (
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
@@ -111,7 +118,8 @@ def init_database(db_path: Path, run_id: str):
|
|||||||
price INTEGER NOT NULL,
|
price INTEGER NOT NULL,
|
||||||
url TEXT,
|
url TEXT,
|
||||||
is_in_stock BOOLEAN,
|
is_in_stock BOOLEAN,
|
||||||
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
|
||||||
)
|
)
|
||||||
""")
|
""")
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
@@ -120,7 +128,8 @@ def init_database(db_path: Path, run_id: str):
|
|||||||
run_id TEXT NOT NULL,
|
run_id TEXT NOT NULL,
|
||||||
timestamp TEXT NOT NULL,
|
timestamp TEXT NOT NULL,
|
||||||
level TEXT NOT NULL,
|
level TEXT NOT NULL,
|
||||||
message TEXT NOT NULL
|
message TEXT NOT NULL,
|
||||||
|
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
|
||||||
)
|
)
|
||||||
""")
|
""")
|
||||||
con.commit()
|
con.commit()
|
||||||
|
|||||||
@@ -12,12 +12,7 @@ import pika
|
|||||||
from pika.adapters.blocking_connection import BlockingChannel
|
from pika.adapters.blocking_connection import BlockingChannel
|
||||||
from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed
|
from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed
|
||||||
|
|
||||||
from .settings import (
|
from .settings import settings
|
||||||
RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD,
|
|
||||||
RABBITMQ_VIRTUAL_HOST, RABBITMQ_CONNECTION_TIMEOUT, RABBITMQ_HEARTBEAT,
|
|
||||||
RABBITMQ_BLOCKED_CONNECTION_TIMEOUT, RABBITMQ_PRODUCTS_QUEUE,
|
|
||||||
RABBITMQ_LOGS_QUEUE, RABBITMQ_EXCHANGE
|
|
||||||
)
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -41,17 +36,17 @@ class RabbitMQConnection:
|
|||||||
Returns:
|
Returns:
|
||||||
pika.ConnectionParameters: Параметры подключения
|
pika.ConnectionParameters: Параметры подключения
|
||||||
"""
|
"""
|
||||||
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
|
credentials = pika.PlainCredentials(settings.rabbitmq_user, settings.rabbitmq_password)
|
||||||
return pika.ConnectionParameters(
|
return pika.ConnectionParameters(
|
||||||
host=RABBITMQ_HOST,
|
host=settings.rabbitmq_host,
|
||||||
port=RABBITMQ_PORT,
|
port=settings.rabbitmq_port,
|
||||||
virtual_host=RABBITMQ_VIRTUAL_HOST,
|
virtual_host=settings.rabbitmq_vhost,
|
||||||
credentials=credentials,
|
credentials=credentials,
|
||||||
connection_attempts=3,
|
connection_attempts=3,
|
||||||
retry_delay=5,
|
retry_delay=5,
|
||||||
socket_timeout=RABBITMQ_CONNECTION_TIMEOUT,
|
socket_timeout=30, # Hardcoded for now
|
||||||
heartbeat=RABBITMQ_HEARTBEAT,
|
heartbeat=600, # Hardcoded for now
|
||||||
blocked_connection_timeout=RABBITMQ_BLOCKED_CONNECTION_TIMEOUT
|
blocked_connection_timeout=300 # Hardcoded for now
|
||||||
)
|
)
|
||||||
|
|
||||||
def connect(self) -> bool:
|
def connect(self) -> bool:
|
||||||
|
|||||||
@@ -82,6 +82,17 @@ class Settings(BaseModel):
|
|||||||
telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений")
|
telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений")
|
||||||
# </CONFIG>
|
# </CONFIG>
|
||||||
|
|
||||||
|
# <CONFIG name="rabbitmq_settings">
|
||||||
|
rabbitmq_host: str = Field(default=os.getenv('RABBITMQ_HOST', 'localhost'))
|
||||||
|
rabbitmq_port: int = Field(default=int(os.getenv('RABBITMQ_PORT', 5672)))
|
||||||
|
rabbitmq_user: str = Field(default=os.getenv('RABBITMQ_USERNAME', 'guest'))
|
||||||
|
rabbitmq_password: str = Field(default=os.getenv('RABBITMQ_PASSWORD', 'guest'))
|
||||||
|
rabbitmq_vhost: str = Field(default=os.getenv('RABBITMQ_VIRTUAL_HOST', '/'))
|
||||||
|
rabbitmq_products_queue: str = Field(default=os.getenv('RABBITMQ_PRODUCTS_QUEUE', 'price_parser.products'))
|
||||||
|
rabbitmq_logs_queue: str = Field(default=os.getenv('RABBITMQ_LOGS_QUEUE', 'price_parser.logs'))
|
||||||
|
rabbitmq_exchange: str = Field(default=os.getenv('RABBITMQ_EXCHANGE', 'price_parser.exchange'))
|
||||||
|
# </CONFIG>
|
||||||
|
|
||||||
# <CONFIG name="selectors_config_instance">
|
# <CONFIG name="selectors_config_instance">
|
||||||
selectors: ScraperSelectors = ScraperSelectors(
|
selectors: ScraperSelectors = ScraperSelectors(
|
||||||
CATALOG_PRODUCT_LINK='.product-card h4 a.product-link',
|
CATALOG_PRODUCT_LINK='.product-card h4 a.product-link',
|
||||||
|
|||||||
@@ -16,10 +16,13 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
|
|||||||
|
|
||||||
from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT
|
from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT
|
||||||
from core.models import ProductVariant
|
from core.models import ProductVariant
|
||||||
from core.database import init_database, save_data_to_db, DatabaseManager
|
from core.database import DatabaseManager, init_database, save_data_to_db
|
||||||
from core.logging_config import setup_logging
|
from core.rabbitmq import RabbitMQExporter
|
||||||
from scraper.engine import Scraper
|
from scraper.engine import Scraper
|
||||||
from utils.exporters import save_data_to_csv, export_data_to_rabbitmq, export_logs_to_rabbitmq, validate_rabbitmq_connection
|
from utils.exporters import save_data_to_csv
|
||||||
|
from core.logging_config import setup_logging
|
||||||
|
from datetime import datetime
|
||||||
|
import sqlite3
|
||||||
# </IMPORTS>
|
# </IMPORTS>
|
||||||
|
|
||||||
# <MAIN_CONTRACT for="AppOrchestrator">
|
# <MAIN_CONTRACT for="AppOrchestrator">
|
||||||
@@ -55,12 +58,24 @@ class AppOrchestrator:
|
|||||||
# </DEPENDENCY>
|
# </DEPENDENCY>
|
||||||
|
|
||||||
self.logger = logging.getLogger(self.__class__.__name__)
|
self.logger = logging.getLogger(self.__class__.__name__)
|
||||||
# </INIT>
|
self._setup()
|
||||||
|
|
||||||
|
def _register_run(self):
|
||||||
|
"""Регистрирует текущий запуск в базе данных."""
|
||||||
|
if not self.db_manager:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
with self.db_manager as conn:
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO parsing_runs (run_id, start_time) VALUES (?, ?)",
|
||||||
|
(self.run_id, datetime.now())
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
self.logger.info(f"Запуск {self.run_id} зарегистрирован в БД.")
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
self.logger.error(f"Не удалось зарегистрировать запуск {self.run_id} в БД: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
# <CONTRACT for="_error_context">
|
|
||||||
# description: "Контекстный менеджер для централизованной обработки ошибок в ключевых операциях."
|
|
||||||
# </CONTRACT>
|
|
||||||
# <HELPER name="_error_context">
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def _error_context(self, operation: str):
|
def _error_context(self, operation: str):
|
||||||
"""Контекстный менеджер для обработки ошибок с детальной диагностикой."""
|
"""Контекстный менеджер для обработки ошибок с детальной диагностикой."""
|
||||||
@@ -70,7 +85,6 @@ class AppOrchestrator:
|
|||||||
self.logger.error(f"[{operation.upper()}] Ошибка в операции '{operation}': {e}", exc_info=True)
|
self.logger.error(f"[{operation.upper()}] Ошибка в операции '{operation}': {e}", exc_info=True)
|
||||||
self._log_error_details(operation, e)
|
self._log_error_details(operation, e)
|
||||||
raise
|
raise
|
||||||
# </HELPER>
|
|
||||||
|
|
||||||
# <CONTRACT for="_log_error_details">
|
# <CONTRACT for="_log_error_details">
|
||||||
# description: "Логирует расширенную информацию об ошибке для упрощения диагностики."
|
# description: "Логирует расширенную информацию об ошибке для упрощения диагностики."
|
||||||
@@ -109,10 +123,60 @@ class AppOrchestrator:
|
|||||||
setup_logging(self.run_id, self.db_manager)
|
setup_logging(self.run_id, self.db_manager)
|
||||||
|
|
||||||
if ENABLE_RABBITMQ_EXPORT:
|
if ENABLE_RABBITMQ_EXPORT:
|
||||||
if validate_rabbitmq_connection():
|
self.rabbitmq_exporter = RabbitMQExporter()
|
||||||
|
if self.rabbitmq_exporter.connection.connect():
|
||||||
self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно")
|
self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно")
|
||||||
else:
|
else:
|
||||||
self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен")
|
self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен")
|
||||||
|
self.rabbitmq_exporter = None
|
||||||
|
|
||||||
|
self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
|
||||||
|
# </CORE_LOGIC>
|
||||||
|
# </ACTION>
|
||||||
|
|
||||||
|
# <CONTRACT for="_log_error_details">
|
||||||
|
# description: "Логирует расширенную информацию об ошибке для упрощения диагностики."
|
||||||
|
# </CONTRACT>
|
||||||
|
# <HELPER name="_log_error_details">
|
||||||
|
def _log_error_details(self, operation: str, error: Exception):
|
||||||
|
"""Логирует детальную информацию об ошибке."""
|
||||||
|
error_info = {
|
||||||
|
'operation': operation,
|
||||||
|
'error_type': type(error).__name__,
|
||||||
|
'error_message': str(error),
|
||||||
|
'run_id': self.run_id,
|
||||||
|
'timestamp': datetime.now().isoformat(),
|
||||||
|
'stats': self.stats.copy()
|
||||||
|
}
|
||||||
|
self.logger.error(f"[HELPER:_log_error_details] Детали ошибки: {error_info}")
|
||||||
|
# </HELPER>
|
||||||
|
|
||||||
|
# <CONTRACT for="_setup">
|
||||||
|
# description: "Шаг 0: Инициализация всех систем (БД, логирование, RabbitMQ)."
|
||||||
|
# </CONTRACT>
|
||||||
|
# <ACTION name="_setup">
|
||||||
|
def _setup(self):
|
||||||
|
"""Инициализация всех систем перед началом парсинга."""
|
||||||
|
with self._error_context("setup"):
|
||||||
|
# <CORE_LOGIC>
|
||||||
|
self.stats['start_time'] = datetime.now()
|
||||||
|
self.logger.info(f"[ACTION:_setup] Запуск инициализации систем. Run ID: {self.run_id}")
|
||||||
|
|
||||||
|
if self.settings.save_to_db or self.settings.log_to_db:
|
||||||
|
self.settings.output_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
self.db_manager = DatabaseManager(self.settings.db_path)
|
||||||
|
init_database(self.db_manager.db_path, self.run_id)
|
||||||
|
|
||||||
|
# <DEPENDENCY name="setup_logging" />
|
||||||
|
setup_logging(self.run_id, self.db_manager)
|
||||||
|
|
||||||
|
if ENABLE_RABBITMQ_EXPORT:
|
||||||
|
self.rabbitmq_exporter = RabbitMQExporter()
|
||||||
|
if self.rabbitmq_exporter.connection.connect():
|
||||||
|
self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно")
|
||||||
|
else:
|
||||||
|
self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен")
|
||||||
|
self.rabbitmq_exporter = None
|
||||||
|
|
||||||
self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
|
self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
|
||||||
# </CORE_LOGIC>
|
# </CORE_LOGIC>
|
||||||
@@ -253,12 +317,10 @@ class AppOrchestrator:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f"[ACTION:_save_results] Ошибка при сохранении в БД: {e}")
|
self.logger.error(f"[ACTION:_save_results] Ошибка при сохранении в БД: {e}")
|
||||||
|
|
||||||
if ENABLE_RABBITMQ_EXPORT:
|
if ENABLE_RABBITMQ_EXPORT and self.rabbitmq_exporter:
|
||||||
# <DEPENDENCY name="export_data_to_rabbitmq" />
|
|
||||||
# ... (logic remains the same)
|
|
||||||
try:
|
try:
|
||||||
data_to_rabbitmq = [p.model_dump() for p in self.final_data]
|
data_to_rabbitmq = [p.model_dump() for p in self.final_data]
|
||||||
if export_data_to_rabbitmq(data_to_rabbitmq, self.run_id, self.run_id):
|
if self.rabbitmq_exporter.export_products(data_to_rabbitmq, self.run_id):
|
||||||
self.logger.info("[ACTION:_save_results] Данные успешно экспортированы в RabbitMQ.")
|
self.logger.info("[ACTION:_save_results] Данные успешно экспортированы в RabbitMQ.")
|
||||||
else:
|
else:
|
||||||
self.logger.error("[ACTION:_save_results] Не удалось экспортировать данные в RabbitMQ.")
|
self.logger.error("[ACTION:_save_results] Не удалось экспортировать данные в RabbitMQ.")
|
||||||
@@ -287,6 +349,10 @@ class AppOrchestrator:
|
|||||||
if self.db_manager:
|
if self.db_manager:
|
||||||
self.db_manager.close()
|
self.db_manager.close()
|
||||||
self.logger.debug("[ACTION:_cleanup] Соединение с базой данных закрыто.")
|
self.logger.debug("[ACTION:_cleanup] Соединение с базой данных закрыто.")
|
||||||
|
|
||||||
|
if self.rabbitmq_exporter:
|
||||||
|
self.rabbitmq_exporter.close()
|
||||||
|
self.logger.debug("[ACTION:_cleanup] Соединение с RabbitMQ закрыто.")
|
||||||
|
|
||||||
if duration:
|
if duration:
|
||||||
self.logger.info(f"[STATS][ACTION:_cleanup] Время выполнения: {duration.total_seconds():.2f} секунд")
|
self.logger.info(f"[STATS][ACTION:_cleanup] Время выполнения: {duration.total_seconds():.2f} секунд")
|
||||||
@@ -304,8 +370,12 @@ class AppOrchestrator:
|
|||||||
# </CONTRACT>
|
# </CONTRACT>
|
||||||
# <ENTRYPOINT name="run">
|
# <ENTRYPOINT name="run">
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Основной метод, запускающий весь процесс парсинга."""
|
"""Основной метод, запускающий все этапы работы оркестратора."""
|
||||||
self.logger.info("="*50)
|
self.logger.info(f"Запуск оркестратора. Run ID: {self.run_id}")
|
||||||
|
if self.db_manager:
|
||||||
|
self._register_run()
|
||||||
|
|
||||||
|
# ... остальной код ...
|
||||||
self.logger.info("[ENTRYPOINT:run] Запуск главного процесса оркестратора.")
|
self.logger.info("[ENTRYPOINT:run] Запуск главного процесса оркестратора.")
|
||||||
self.logger.info("="*50)
|
self.logger.info("="*50)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user