Compare commits

..

6 Commits

Author SHA1 Message Date
ae6d81f6f2 readme update 2025-07-21 10:37:12 +03:00
69e90bd4f2 1 2025-07-20 09:47:12 +03:00
c59dff3397 error fix 2025-07-20 09:38:10 +03:00
5742d474fd monitoring refactor 2025-07-20 09:29:19 +03:00
40317aa2e7 end tag error 2025-07-20 09:07:07 +03:00
06cfca4cd1 +fix monitor 2025-07-20 09:01:59 +03:00
7 changed files with 262 additions and 243 deletions

236
README.md
View File

@@ -1,58 +1,55 @@
# ANCHOR: Project_README # ANCHOR: Project_README
# Семантика: Документация, описывающая проект, его структуру и способ использования. # Семантика: Документация, описывающая проект, его структуру и способ использования.
# Парсер цен для ElixirPeptide # Сервис мониторинга цен ElixirPeptide (v3.0)
Это структурированное Python-приложение для парсинга каталога товаров с сайта `elixirpeptide.ru`, сбора информации о вариантах товаров и их ценах. Это распределенное Python-приложение для мониторинга каталога товаров с сайта `elixirpeptide.ru`. Сервис автоматически отслеживает изменения в ценах и наличии товаров, и отправляет отчеты в Telegram.
## 🚀 Новые возможности (v2.0) ## 🚀 Архитектура (v3.0)
### ✅ Исправленные критические проблемы: Проект перешел от простого парсера к полноценному сервису мониторинга с асинхронной обработкой данных.
- **Устранено дублирование кода** в `engine.py` и `database.py`
- **Дополнены зависимости** в `requirements.txt` (pydantic, lxml, python-dotenv)
- **Улучшена обработка ошибок** с детальной диагностикой и retry механизмом
- **Добавлена валидация данных** на всех уровнях приложения
### 🎯 Новые функции: - **Точка входа**: `monitoring_service.py` — главный скрипт, запускаемый по расписанию (cron).
- **Retry стратегия** для HTTP запросов с экспоненциальной задержкой - **Оркестратор**: `src/orchestrator.py` управляет процессом парсинга.
- **Детальная статистика** выполнения парсинга - **Анализатор**: `src/analyzer.py` сравнивает данные последнего и предпоследнего запусков, выявляя изменения.
- **Валидация конфигурации** при запуске - **Уведомления**: `src/utils/telegram_sender.py` отправляет HTML-отчеты об изменениях в Telegram.
- **Поддержка переменных окружения** через `.env` файл - **Очередь сообщений**: Интеграция с **RabbitMQ** для асинхронного экспорта данных о товарах и логах.
- **Graceful degradation** - продолжение работы при частичных сбоях - **База данных**: **SQLite** используется для хранения истории парсинга и данных для анализа.
- **Улучшенное логирование** с категоризацией ошибок
### 🔧 Улучшения производительности: ## ✨ Ключевые возможности
- **Адаптивные таймауты** для HTTP запросов
- **Проверка на блокировку/капчу** в ответах сервера - **Автоматический мониторинг**: Запуск по расписанию для непрерывного отслеживания.
- **Оптимизированная обработка данных** с пропуском некорректных записей - **Отчеты об изменениях**: Уведомления в Telegram о новых, удаленных товарах, а также изменениях цен и наличия.
- **Асинхронный экспорт**: Отправка данных в RabbitMQ для дальнейшей обработки другими системами.
- **Надежность**: Механизм retry-запросов, детальное логирование и отказоустойчивость.
- **Гибкая конфигурация**: Все параметры настраиваются через `.env` файл.
## Структура Проекта ## Структура Проекта
Проект организован по принципу семантического разделения ответственности для удобства поддержки и дальнейшей разработки. - `monitoring_service.py`: **[NEW]** Главная точка входа для запуска по расписанию.
- `src/`: Основная директория с исходным кодом. - `src/`: Основная директория с исходным кодом.
- `config.py`: Все настройки (URL, селекторы, флаги сохранения). - `orchestrator.py`: **[NEW]** Управляет всем процессом парсинга.
- `main.py`: Точка входа в приложение, оркестратор процесса. - `analyzer.py`: **[NEW]** Анализирует данные и формирует отчеты.
- `core/`: Пакет с ядром приложения. - `main.py`: Точка входа для ручного запуска парсинга.
- `core/`: Ядро приложения.
- `settings.py`: **[ENHANCED]** Pydantic-модели для конфигурации (включая Telegram, RabbitMQ).
- `database.py`: Логика работы с базой данных SQLite. - `database.py`: Логика работы с базой данных SQLite.
- `logging_config.py`: Настройка системы логирования. - `rabbitmq.py`: **[NEW]** Клиент для работы с RabbitMQ.
- **`models.py`: [NEW FILE] Pydantic модели данных (ProductVariant, LogRecordModel).** - `scraper/`: Логика парсинга.
- **`settings.py`: [ENHANCED] Конфигурация с валидацией и поддержкой .env** - `engine.py`: Улучшенный движок парсера.
- `scraper/`: Пакет с логикой парсинга. - `utils/`: Вспомогательные утилиты.
- **`engine.py`: [ENHANCED] Класс Scraper с retry механизмом и улучшенной обработкой ошибок** - `telegram_sender.py`: **[NEW]** Отправка уведомлений в Telegram.
- `utils/`: Пакет со вспомогательными утилитами. - `requirements.txt`: **[UPDATED]** Обновленный список зависимостей.
- **`exporters.py`: [ENHANCED] Функции для сохранения данных с валидацией** - `.env.example`: Пример файла с переменными окружения.
- `requirements.txt`: Список зависимостей проекта. - `RABBITMQ_SETUP.md`: **[NEW]** Руководство по настройке RabbitMQ.
- `price_data_final/`: Директория для хранения результатов (создается автоматически).
- **`.env.example`: [NEW] Пример файла с переменными окружения**
## Установка и Запуск ## Установка и Запуск
### 1. Клонирование и настройка окружения ### 1. Клонирование и настройка окружения
```bash ```bash
git clone <your-repo-url> git clone https://gitea.bebesh.ru/busya/peptide-parcer.git
cd peptide_parser_project cd peptide-parcer
# Создание виртуального окружения # Создание виртуального окружения
python -m venv venv python -m venv venv
@@ -62,139 +59,68 @@ source venv/bin/activate # Для Windows: venv\Scripts\activate
pip install -r requirements.txt pip install -r requirements.txt
``` ```
### 2. Настройка конфигурации ### 2. Настройка RabbitMQ (Опционально)
#### Вариант A: Через переменные окружения Для асинхронного экспорта данных требуется RabbitMQ. Рекомендуется запуск через Docker:
```bash ```bash
# Создайте файл .env на основе .env.example docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
cp .env.example .env ```
Подробности см. в `RABBITMQ_SETUP.md`.
# Отредактируйте .env файл под ваши нужды ### 3. Настройка конфигурации
nano .env
Скопируйте `.env.example` в `.env` и заполните необходимые параметры.
```bash
cp .env.example .env
nano .env # Отредактируйте файл
``` ```
#### Вариант B: Прямое редактирование настроек **Ключевые переменные окружения:**
Отредактируйте `src/core/settings.py` для изменения настроек по умолчанию. - `TELEGRAM_BOT_TOKEN`: Токен вашего Telegram-бота.
- `TELEGRAM_CHAT_ID`: ID чата, куда будут приходить отчеты.
- `ENABLE_RABBITMQ_EXPORT`: `true` или `false` для включения/отключения экспорта в RabbitMQ.
- `RABBITMQ_HOST`, `RABBITMQ_PORT` и т.д.: Настройки подключения к RabbitMQ.
### 3. Запуск парсера ### 4. Запуск
#### Автоматический мониторинг (рекомендуется)
Настройте запуск `monitoring_service.py` через `cron` или планировщик задач.
```bash
# Пример для cron, запуск каждый час
# 0 * * * * /path/to/your/project/venv/bin/python /path/to/your/project/monitoring_service.py >> /path/to/your/project/logs/cron.log 2>&1
# Ручной запуск для проверки
python monitoring_service.py
```
#### Ручной запуск парсинга (без анализа и отчета)
```bash ```bash
python src/main.py python src/main.py
``` ```
## Конфигурация
### Переменные окружения (.env файл)
| Переменная | Описание | По умолчанию |
|------------|----------|--------------|
| `PARSER_BASE_URL` | Базовый URL сайта | `https://elixirpeptide.ru` |
| `PARSER_CATALOG_URL` | URL каталога товаров | `https://elixirpeptide.ru/catalog/` |
| `PARSER_SAVE_TO_CSV` | Сохранять в CSV | `true` |
| `PARSER_SAVE_TO_DB` | Сохранять в базу данных | `true` |
| `PARSER_LOG_TO_DB` | Логировать в базу данных | `true` |
| `PARSER_TIMEOUT` | Таймаут HTTP запросов (сек) | `30` |
| `PARSER_DELAY` | Задержка между запросами (сек) | `1.0` |
| `PARSER_RETRIES` | Максимум попыток для запросов | `3` |
### Настройки производительности
- **Таймаут запросов**: 30 секунд (настраивается)
- **Задержка между запросами**: 1 секунда (настраивается)
- **Retry стратегия**: 3 попытки с экспоненциальной задержкой
- **Graceful degradation**: Продолжение работы при ошибках отдельных запросов
## Результаты ## Результаты
### Файлы результатов ### Отчеты в Telegram
Сервис присылает в указанный чат отчет, если обнаруживает изменения:
- **💰 Изменились цены**: список товаров с новой и старой ценой.
- **📦 Изменилось наличие**: список товаров, которые появились или закончились.
- **✨ Новые товары**: список добавленных товаров.
- **🗑️ Удаленные товары**: список удаленных товаров.
- **CSV файл**: `price_data_final/prices_full_catalog_YYYY-MM-DD_HHMMSS.csv` ### Данные
- **База данных**: `price_data_final/parser_data.db` (SQLite) - **База данных**: `price_data_final/parser_data.db` — хранит всю историю парсинга.
- **CSV файл**: `price_data_final/prices_full_catalog_*.csv` — создается при ручном запуске.
### Структура данных - **RabbitMQ**: Сообщения с данными о товарах и логах отправляются в очереди `price_parser.products` и `price_parser.logs`.
```csv
name,volume,price
"Peptide X","30ml",1500
"Peptide Y","50ml",2500
```
### Логирование
- **Консольные логи**: Детальная информация о процессе парсинга
- **Логи в БД**: Если `PARSER_LOG_TO_DB=true`, все логи сохраняются в таблицу `logs`
## Обработка ошибок
### Типы обрабатываемых ошибок
1. **Сетевые ошибки**: Timeout, ConnectionError, HTTPError
2. **Ошибки парсинга**: Отсутствующие элементы, некорректные данные
3. **Ошибки файловой системы**: Права доступа, отсутствие директорий
4. **Ошибки базы данных**: SQLite ошибки, проблемы с подключением
### Стратегия восстановления
- **Retry механизм**: Автоматические повторные попытки для сетевых ошибок
- **Graceful degradation**: Пропуск проблемных записей с продолжением работы
- **Детальная диагностика**: Подробные логи для анализа проблем
## Мониторинг и статистика
### Статистика выполнения
Приложение выводит детальную статистику:
```
[FINAL_STATS] Время выполнения: 45.23 секунд
[FINAL_STATS] Успешность: 95/100 (95.0%)
[STATS] Успешно: 95, Ошибок: 5
```
### Метрики
- Общее количество URL для парсинга
- Количество успешно обработанных записей
- Количество ошибок
- Время выполнения
- Процент успешности
## Разработка
### Архитектурные принципы
1. **Разделение ответственности**: Каждый модуль отвечает за свою область
2. **Типизация**: Использование Pydantic для валидации данных
3. **Обработка ошибок**: Graceful handling с детальной диагностикой
4. **Конфигурируемость**: Гибкие настройки через переменные окружения
5. **Логирование**: Структурированное логирование на всех уровнях
### Добавление новых функций
1. **Новые форматы экспорта**: Добавьте функции в `src/utils/exporters.py`
2. **Новые селекторы**: Обновите `ScraperSelectors` в `src/core/settings.py`
3. **Новые поля данных**: Расширьте модель `ProductVariant` в `src/core/models.py`
## Устранение неполадок ## Устранение неполадок
### Частые проблемы 1. **Не приходят сообщения в Telegram**:
- Проверьте правильность `TELEGRAM_BOT_TOKEN` и `TELEGRAM_CHAT_ID`.
1. **"Не удается подключиться к базовому URL"** - Убедитесь, что бот добавлен в чат с нужными правами.
- Проверьте интернет-соединение 2. **Ошибка подключения к RabbitMQ**:
- Убедитесь, что сайт доступен - Проверьте, что RabbitMQ запущен и доступен.
- Проверьте настройки прокси - Убедитесь в корректности настроек в `.env`.
3. **Ошибки парсинга**:
2. **"Не найдено ни одной ссылки на товар"** - Проверьте доступность сайта `elixirpeptide.ru`.
- Проверьте CSS селекторы в настройках - Возможно, изменилась структура HTML-страниц. В этом случае нужно обновить CSS-селекторы в `src/core/settings.py`.
- Убедитесь, что структура сайта не изменилась
3. **"Ошибка при сохранении в БД"**
- Проверьте права доступа к директории
- Убедитесь, что SQLite поддерживается
### Логи для диагностики
Все ошибки логируются с детальной информацией. Проверьте:
- Консольные логи при запуске
- Логи в базе данных (если включено)
- Файлы результатов для проверки данных

View File

@@ -3,12 +3,18 @@
# <IMPORTS> # <IMPORTS>
import logging import logging
import asyncio import asyncio
import sys
import os
from datetime import datetime from datetime import datetime
from src.orchestrator import AppOrchestrator
from src.core.settings import settings # Добавляем 'src' в sys.path, чтобы внутренние импорты (e.g., `from core...`) работали
from src.core.logging_config import setup_logging sys.path.insert(0, os.path.join(os.path.abspath(os.path.dirname(__file__)), 'src'))
from src.analyzer import DataAnalyzer
from src.utils.telegram_sender import send_telegram_notification from orchestrator import AppOrchestrator
from core.settings import settings
from core.logging_config import setup_logging
from analyzer import DataAnalyzer
from utils.telegram_sender import send_telegram_notification
# </IMPORTS> # </IMPORTS>
# <CONTRACT for="main"> # <CONTRACT for="main">

View File

@@ -2,28 +2,36 @@
# <IMPORTS> # <IMPORTS>
import logging import logging
from typing import List, Dict, Tuple import sqlite3
from sqlalchemy.orm import sessionmaker from typing import List, Dict, Tuple, Any
from src.core.models import ProductVariant, ParsingRun
from sqlalchemy import create_engine
from src.core.settings import settings from src.core.settings import settings
# </IMPORTS> # </IMPORTS>
# <MAIN_CONTRACT for="DataAnalyzer"> # <MAIN_CONTRACT for="DataAnalyzer">
# description: "Анализирует данные двух последних запусков парсера и выявляет изменения." # description: "Анализирует данные двух последних запусков парсера и выявляет изменения, используя прямые SQL-запросы."
# </MAIN_CONTRACT> # </MAIN_CONTRACT>
class DataAnalyzer: class DataAnalyzer:
# <INIT> # <INIT>
def __init__(self): def __init__(self):
self.engine = create_engine(settings.db_path.absolute().as_uri()) self.db_path = settings.db_path
self.Session = sessionmaker(bind=self.engine)
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
# </INIT> # </INIT>
def _execute_query(self, query: str, params: tuple = ()) -> List[Any]:
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return cursor.fetchall()
except sqlite3.Error as e:
self.logger.error(f"Ошибка выполнения SQL-запроса: {e}")
return []
# <HELPER name="_get_last_two_runs"> # <HELPER name="_get_last_two_runs">
def _get_last_two_runs(self, session) -> Tuple[str, str]: def _get_last_two_runs(self) -> Tuple[str, str]:
"""Возвращает ID двух последних успешных запусков.""" """Возвращает ID двух последних успешных запусков."""
runs = session.query(ParsingRun.run_id).order_by(ParsingRun.start_time.desc()).limit(2).all() query = "SELECT run_id FROM parsing_runs ORDER BY start_time DESC LIMIT 2"
runs = self._execute_query(query)
if len(runs) < 2: if len(runs) < 2:
self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.") self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.")
return None, None return None, None
@@ -31,10 +39,12 @@ class DataAnalyzer:
# </HELPER> # </HELPER>
# <HELPER name="_get_data_for_run"> # <HELPER name="_get_data_for_run">
def _get_data_for_run(self, session, run_id: str) -> Dict[str, ProductVariant]: def _get_data_for_run(self, run_id: str) -> Dict[str, Dict]:
"""Возвращает данные о товарах для указанного запуска в виде словаря {url: ProductVariant}.""" """Возвращает данные о товарах для указанного запуска в виде словаря {url: product_data}."""
variants = session.query(ProductVariant).filter(ProductVariant.run_id == run_id).all() query = "SELECT name, volume, price, url, is_in_stock FROM products WHERE run_id = ?"
return {v.url: v for v in variants} rows = self._execute_query(query, (run_id,))
# Преобразуем sqlite3.Row в словари
return {row[3]: dict(zip(['name', 'volume', 'price', 'url', 'is_in_stock'], row)) for row in rows}
# </HELPER> # </HELPER>
# <ACTION name="analyze"> # <ACTION name="analyze">
@@ -42,42 +52,35 @@ class DataAnalyzer:
""" """
Сравнивает два последних запуска и генерирует HTML-отчет об изменениях. Сравнивает два последних запуска и генерирует HTML-отчет об изменениях.
""" """
# <CORE_LOGIC> current_run_id, prev_run_id = self._get_last_two_runs()
with self.Session() as session: if not current_run_id or not prev_run_id:
current_run_id, prev_run_id = self._get_last_two_runs(session) return ""
if not current_run_id or not prev_run_id:
return ""
self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим з<EFBFBD><EFBFBD>пуском {prev_run_id}") self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим запуском {prev_run_id}")
current_data = self._get_data_for_run(session, current_run_id) current_data = self._get_data_for_run(current_run_id)
prev_data = self._get_data_for_run(session, prev_run_id) prev_data = self._get_data_for_run(prev_run_id)
price_changes = [] price_changes = []
availability_changes = [] availability_changes = []
new_items = [] new_items = []
removed_items = [] removed_items = []
# Поиск изменений и новых товаров for url, current_item in current_data.items():
for url, current_item in current_data.items(): prev_item = prev_data.get(url)
prev_item = prev_data.get(url) if prev_item:
if prev_item: if current_item['price'] != prev_item['price']:
# Изменение цены price_changes.append((current_item, prev_item['price']))
if current_item.price != prev_item.price: if current_item['is_in_stock'] != prev_item['is_in_stock']:
price_changes.append((current_item, prev_item.price)) availability_changes.append(current_item)
# Изменение статуса наличия else:
if current_item.is_available != prev_item.is_available: new_items.append(current_item)
availability_changes.append(current_item)
else:
new_items.append(current_item)
# Поиск удаленных товаров for url, prev_item in prev_data.items():
for url, prev_item in prev_data.items(): if url not in current_data:
if url not in current_data: removed_items.append(prev_item)
removed_items.append(prev_item)
return self._format_report(price_changes, availability_changes, new_items, removed_items) return self._format_report(price_changes, availability_changes, new_items, removed_items)
# </CORE_LOGIC>
# </ACTION> # </ACTION>
# <HELPER name="_format_report"> # <HELPER name="_format_report">
@@ -92,25 +95,24 @@ class DataAnalyzer:
if price_changes: if price_changes:
report_parts.append("\n<b>💰 Изменились цены:</b>") report_parts.append("\n<b>💰 Изменились цены:</b>")
for item, old_price in price_changes: for item, old_price in price_changes:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: {old_price} ₽ → <b>{item.price} ₽</b>") report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>: {old_price} ₽ → <b>{item['price']} ₽</b>")
if availability_changes: if availability_changes:
report_parts.append("\n<b>📦 Изменилось наличие:</b>") report_parts.append("\n<b>📦 Изменилось наличие:</b>")
for item in availability_changes: for item in availability_changes:
status = "В наличии" if item.is_available else "❌ Нет в наличии" status = "В наличии" if item['is_in_stock'] else "❌ Нет в наличии"
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: <b>{status}</b>") report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>: <b>{status}</b>")
if new_items: if new_items:
report_parts.append("\n<b>✨ Новые товары:</b>") report_parts.append("\n<b>✨ Новые товары:</b>")
for item in new_items: for item in new_items:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a> - {item.price}") report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a> - {item['price']}")
if removed_items: if removed_items:
report_parts.append("\n<b>🗑️ Удаленные товары:</b>") report_parts.append("\n<b>🗑️ Удаленные товары:</b>")
for item in removed_items: for item in removed_items:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>") report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>")
return "\n".join(report_parts) return "\n".join(report_parts)
# </HELPER> # </HELPER>
# <COHERENCE_CHECK status="PASSED" /> # <COHERENCE_CHECK status="PASSED" />
/>

View File

@@ -102,6 +102,13 @@ def init_database(db_path: Path, run_id: str):
db_path.parent.mkdir(parents=True, exist_ok=True) db_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(db_path) as con: with sqlite3.connect(db_path) as con:
cur = con.cursor() cur = con.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS parsing_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL UNIQUE,
start_time TIMESTAMP NOT NULL
)
""")
cur.execute(""" cur.execute("""
CREATE TABLE IF NOT EXISTS products ( CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -111,7 +118,8 @@ def init_database(db_path: Path, run_id: str):
price INTEGER NOT NULL, price INTEGER NOT NULL,
url TEXT, url TEXT,
is_in_stock BOOLEAN, is_in_stock BOOLEAN,
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
) )
""") """)
cur.execute(""" cur.execute("""
@@ -120,7 +128,8 @@ def init_database(db_path: Path, run_id: str):
run_id TEXT NOT NULL, run_id TEXT NOT NULL,
timestamp TEXT NOT NULL, timestamp TEXT NOT NULL,
level TEXT NOT NULL, level TEXT NOT NULL,
message TEXT NOT NULL message TEXT NOT NULL,
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
) )
""") """)
con.commit() con.commit()

View File

@@ -12,12 +12,7 @@ import pika
from pika.adapters.blocking_connection import BlockingChannel from pika.adapters.blocking_connection import BlockingChannel
from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed
from .settings import ( from .settings import settings
RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD,
RABBITMQ_VIRTUAL_HOST, RABBITMQ_CONNECTION_TIMEOUT, RABBITMQ_HEARTBEAT,
RABBITMQ_BLOCKED_CONNECTION_TIMEOUT, RABBITMQ_PRODUCTS_QUEUE,
RABBITMQ_LOGS_QUEUE, RABBITMQ_EXCHANGE
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -41,17 +36,17 @@ class RabbitMQConnection:
Returns: Returns:
pika.ConnectionParameters: Параметры подключения pika.ConnectionParameters: Параметры подключения
""" """
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) credentials = pika.PlainCredentials(settings.rabbitmq_user, settings.rabbitmq_password)
return pika.ConnectionParameters( return pika.ConnectionParameters(
host=RABBITMQ_HOST, host=settings.rabbitmq_host,
port=RABBITMQ_PORT, port=settings.rabbitmq_port,
virtual_host=RABBITMQ_VIRTUAL_HOST, virtual_host=settings.rabbitmq_vhost,
credentials=credentials, credentials=credentials,
connection_attempts=3, connection_attempts=3,
retry_delay=5, retry_delay=5,
socket_timeout=RABBITMQ_CONNECTION_TIMEOUT, socket_timeout=30, # Hardcoded for now
heartbeat=RABBITMQ_HEARTBEAT, heartbeat=600, # Hardcoded for now
blocked_connection_timeout=RABBITMQ_BLOCKED_CONNECTION_TIMEOUT blocked_connection_timeout=300 # Hardcoded for now
) )
def connect(self) -> bool: def connect(self) -> bool:

View File

@@ -82,6 +82,17 @@ class Settings(BaseModel):
telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений") telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений")
# </CONFIG> # </CONFIG>
# <CONFIG name="rabbitmq_settings">
rabbitmq_host: str = Field(default=os.getenv('RABBITMQ_HOST', 'localhost'))
rabbitmq_port: int = Field(default=int(os.getenv('RABBITMQ_PORT', 5672)))
rabbitmq_user: str = Field(default=os.getenv('RABBITMQ_USERNAME', 'guest'))
rabbitmq_password: str = Field(default=os.getenv('RABBITMQ_PASSWORD', 'guest'))
rabbitmq_vhost: str = Field(default=os.getenv('RABBITMQ_VIRTUAL_HOST', '/'))
rabbitmq_products_queue: str = Field(default=os.getenv('RABBITMQ_PRODUCTS_QUEUE', 'price_parser.products'))
rabbitmq_logs_queue: str = Field(default=os.getenv('RABBITMQ_LOGS_QUEUE', 'price_parser.logs'))
rabbitmq_exchange: str = Field(default=os.getenv('RABBITMQ_EXCHANGE', 'price_parser.exchange'))
# </CONFIG>
# <CONFIG name="selectors_config_instance"> # <CONFIG name="selectors_config_instance">
selectors: ScraperSelectors = ScraperSelectors( selectors: ScraperSelectors = ScraperSelectors(
CATALOG_PRODUCT_LINK='.product-card h4 a.product-link', CATALOG_PRODUCT_LINK='.product-card h4 a.product-link',

View File

@@ -16,10 +16,13 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT
from core.models import ProductVariant from core.models import ProductVariant
from core.database import init_database, save_data_to_db, DatabaseManager from core.database import DatabaseManager, init_database, save_data_to_db
from core.logging_config import setup_logging from core.rabbitmq import RabbitMQExporter
from scraper.engine import Scraper from scraper.engine import Scraper
from utils.exporters import save_data_to_csv, export_data_to_rabbitmq, export_logs_to_rabbitmq, validate_rabbitmq_connection from utils.exporters import save_data_to_csv
from core.logging_config import setup_logging
from datetime import datetime
import sqlite3
# </IMPORTS> # </IMPORTS>
# <MAIN_CONTRACT for="AppOrchestrator"> # <MAIN_CONTRACT for="AppOrchestrator">
@@ -55,12 +58,24 @@ class AppOrchestrator:
# </DEPENDENCY> # </DEPENDENCY>
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
# </INIT> self._setup()
def _register_run(self):
"""Регистрирует текущий запуск в базе данных."""
if not self.db_manager:
return
try:
with self.db_manager as conn:
conn.execute(
"INSERT INTO parsing_runs (run_id, start_time) VALUES (?, ?)",
(self.run_id, datetime.now())
)
conn.commit()
self.logger.info(f"Запуск {self.run_id} зарегистрирован в БД.")
except sqlite3.Error as e:
self.logger.error(f"Не удалось зарегистрировать запуск {self.run_id} в БД: {e}")
raise
# <CONTRACT for="_error_context">
# description: "Контекстный менеджер для централизованной обработки ошибок в ключевых операциях."
# </CONTRACT>
# <HELPER name="_error_context">
@contextmanager @contextmanager
def _error_context(self, operation: str): def _error_context(self, operation: str):
"""Контекстный менеджер для обработки ошибок с детальной диагностикой.""" """Контекстный менеджер для обработки ошибок с детальной диагностикой."""
@@ -70,7 +85,6 @@ class AppOrchestrator:
self.logger.error(f"[{operation.upper()}] Ошибка в операции '{operation}': {e}", exc_info=True) self.logger.error(f"[{operation.upper()}] Ошибка в операции '{operation}': {e}", exc_info=True)
self._log_error_details(operation, e) self._log_error_details(operation, e)
raise raise
# </HELPER>
# <CONTRACT for="_log_error_details"> # <CONTRACT for="_log_error_details">
# description: "Логирует расширенную информацию об ошибке для упрощения диагностики." # description: "Логирует расширенную информацию об ошибке для упрощения диагностики."
@@ -109,10 +123,60 @@ class AppOrchestrator:
setup_logging(self.run_id, self.db_manager) setup_logging(self.run_id, self.db_manager)
if ENABLE_RABBITMQ_EXPORT: if ENABLE_RABBITMQ_EXPORT:
if validate_rabbitmq_connection(): self.rabbitmq_exporter = RabbitMQExporter()
if self.rabbitmq_exporter.connection.connect():
self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно") self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно")
else: else:
self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен") self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен")
self.rabbitmq_exporter = None
self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="_log_error_details">
# description: "Логирует расширенную информацию об ошибке для упрощения диагностики."
# </CONTRACT>
# <HELPER name="_log_error_details">
def _log_error_details(self, operation: str, error: Exception):
"""Логирует детальную информацию об ошибке."""
error_info = {
'operation': operation,
'error_type': type(error).__name__,
'error_message': str(error),
'run_id': self.run_id,
'timestamp': datetime.now().isoformat(),
'stats': self.stats.copy()
}
self.logger.error(f"[HELPER:_log_error_details] Детали ошибки: {error_info}")
# </HELPER>
# <CONTRACT for="_setup">
# description: "Шаг 0: Инициализация всех систем (БД, логирование, RabbitMQ)."
# </CONTRACT>
# <ACTION name="_setup">
def _setup(self):
"""Инициализация всех систем перед началом парсинга."""
with self._error_context("setup"):
# <CORE_LOGIC>
self.stats['start_time'] = datetime.now()
self.logger.info(f"[ACTION:_setup] Запуск инициализации систем. Run ID: {self.run_id}")
if self.settings.save_to_db or self.settings.log_to_db:
self.settings.output_dir.mkdir(parents=True, exist_ok=True)
self.db_manager = DatabaseManager(self.settings.db_path)
init_database(self.db_manager.db_path, self.run_id)
# <DEPENDENCY name="setup_logging" />
setup_logging(self.run_id, self.db_manager)
if ENABLE_RABBITMQ_EXPORT:
self.rabbitmq_exporter = RabbitMQExporter()
if self.rabbitmq_exporter.connection.connect():
self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно")
else:
self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен")
self.rabbitmq_exporter = None
self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}") self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
# </CORE_LOGIC> # </CORE_LOGIC>
@@ -253,12 +317,10 @@ class AppOrchestrator:
except Exception as e: except Exception as e:
self.logger.error(f"[ACTION:_save_results] Ошибка при сохранении в БД: {e}") self.logger.error(f"[ACTION:_save_results] Ошибка при сохранении в БД: {e}")
if ENABLE_RABBITMQ_EXPORT: if ENABLE_RABBITMQ_EXPORT and self.rabbitmq_exporter:
# <DEPENDENCY name="export_data_to_rabbitmq" />
# ... (logic remains the same)
try: try:
data_to_rabbitmq = [p.model_dump() for p in self.final_data] data_to_rabbitmq = [p.model_dump() for p in self.final_data]
if export_data_to_rabbitmq(data_to_rabbitmq, self.run_id, self.run_id): if self.rabbitmq_exporter.export_products(data_to_rabbitmq, self.run_id):
self.logger.info("[ACTION:_save_results] Данные успешно экспортированы в RabbitMQ.") self.logger.info("[ACTION:_save_results] Данные успешно экспортированы в RabbitMQ.")
else: else:
self.logger.error("[ACTION:_save_results] Не удалось экспортировать данные в RabbitMQ.") self.logger.error("[ACTION:_save_results] Не удалось экспортировать данные в RabbitMQ.")
@@ -288,6 +350,10 @@ class AppOrchestrator:
self.db_manager.close() self.db_manager.close()
self.logger.debug("[ACTION:_cleanup] Соединение с базой данных закрыто.") self.logger.debug("[ACTION:_cleanup] Соединение с базой данных закрыто.")
if self.rabbitmq_exporter:
self.rabbitmq_exporter.close()
self.logger.debug("[ACTION:_cleanup] Соединение с RabbitMQ закрыто.")
if duration: if duration:
self.logger.info(f"[STATS][ACTION:_cleanup] Время выполнения: {duration.total_seconds():.2f} секунд") self.logger.info(f"[STATS][ACTION:_cleanup] Время выполнения: {duration.total_seconds():.2f} секунд")
self.logger.info(f"[STATS][ACTION:_cleanup] Успешность: {self.stats['successful_parses']}/{self.stats['total_urls']} ({self.stats['successful_parses']/self.stats['total_urls']*100:.1f}%)") self.logger.info(f"[STATS][ACTION:_cleanup] Успешность: {self.stats['successful_parses']}/{self.stats['total_urls']} ({self.stats['successful_parses']/self.stats['total_urls']*100:.1f}%)")
@@ -304,8 +370,12 @@ class AppOrchestrator:
# </CONTRACT> # </CONTRACT>
# <ENTRYPOINT name="run"> # <ENTRYPOINT name="run">
def run(self): def run(self):
"""Основной метод, запускающий весь процесс парсинга.""" """Основной метод, запускающий все этапы работы оркестратора."""
self.logger.info("="*50) self.logger.info(f"Запуск оркестратора. Run ID: {self.run_id}")
if self.db_manager:
self._register_run()
# ... остальной код ...
self.logger.info("[ENTRYPOINT:run] Запуск главного процесса оркестратора.") self.logger.info("[ENTRYPOINT:run] Запуск главного процесса оркестратора.")
self.logger.info("="*50) self.logger.info("="*50)