add monitoring service & Telegram sender

This commit is contained in:
2025-07-20 08:34:28 +03:00
parent 0259289ee9
commit ae9c61facc
5 changed files with 276 additions and 5 deletions

70
monitoring_service.py Normal file
View File

@@ -0,0 +1,70 @@
# <MODULE name="monitoring_service" semantics="main_entrypoint_for_cron" />
# <IMPORTS>
import logging
import asyncio
from datetime import datetime
from src.orchestrator import AppOrchestrator
from src.core.settings import settings
from src.core.logging_config import setup_logging
from src.analyzer import DataAnalyzer
from src.utils.telegram_sender import send_telegram_notification
# </IMPORTS>
# <CONTRACT for="main">
# description: "Главная точка входа для сервиса мониторинга. Запускается по расписанию (cron)."
# postconditions:
# - "Парсер запускается, данные сохраняются в БД."
# - "Если обнаружены изменения, отправляется отчет в Telegram."
# </CONTRACT>
# <ENTRYPOINT name="main">
async def main():
"""
Основная асинхронная функция, которая запускает парсер,
анализирует данные и отправляет уведомление.
"""
run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
setup_logging(run_id=run_id)
logger = logging.getLogger(__name__)
logger.info(f"🚀 Запуск сервиса мониторинга. Run ID: {run_id}")
try:
# 1. Запуск парсера
logger.info("Начало этапа парсинга...")
orchestrator = AppOrchestrator(settings=settings)
orchestrator.run()
logger.info("Этап парсинга завершен.")
# 2. Анализ данных
logger.info("Начало этапа анализа данных...")
analyzer = DataAnalyzer()
report_message = analyzer.analyze()
logger.info("Этап анализа данных завершен.")
# 3. Отправка отчета
if report_message:
logger.info("Обнаружены изменения, отправка отчета в Telegram...")
await send_telegram_notification(report_message)
else:
logger.info("Изменений не найдено, отправка отчета не требуется.")
except Exception as e:
logger.critical(f"💥 Критическая ошибка в сервисе мониторинга: {e}", exc_info=True)
# Попытка отправить уведомление об ошибке
try:
error_message = f"<b>❗️ Критическая ошибка в сервисе мониторинга</b>\n\n<pre>{e}</pre>"
await send_telegram_notification(error_message)
except Exception as tg_e:
logger.error(f"Не удалось даже отправить уведомление об ошибке в Telegram: {tg_e}")
logger.info("✅ Сервис мониторинга завершил работу.")
# <MAIN_CONTRACT>
# description: "Стандартный блок для запуска main() при выполнении скрипта."
# </MAIN_CONTRACT>
if __name__ == "__main__":
# Используем asyncio.run() для запуска асинхронной функции main
asyncio.run(main())
# <COHERENCE_CHECK status="PASSED" />

View File

@@ -17,4 +17,12 @@ python-dotenv>=1.0.0
# ANCHOR: RabbitMQ_Dependencies
# Семантика: Зависимости для работы с очередью сообщений RabbitMQ
pika>=1.3.0
pika>=1.3.0
# ANCHOR: Database_Dependencies
# Семантика: Зависимости для работы с базой данных
SQLAlchemy>=2.0.0
# ANCHOR: Telegram_Dependencies
# Семантика: Зависимости для отправки уведомлений в Telegram
python-telegram-bot>=21.0.0

116
src/analyzer.py Normal file
View File

@@ -0,0 +1,116 @@
# <MODULE name="analyzer" semantics="data_comparison_logic" />
# <IMPORTS>
import logging
from typing import List, Dict, Tuple
from sqlalchemy.orm import sessionmaker
from src.core.models import ProductVariant, ParsingRun
from sqlalchemy import create_engine
from src.core.settings import settings
# </IMPORTS>
# <MAIN_CONTRACT for="DataAnalyzer">
# description: "Анализирует данные двух последних запусков парсера и выявляет изменения."
# </MAIN_CONTRACT>
class DataAnalyzer:
# <INIT>
def __init__(self):
self.engine = create_engine(settings.db_path.absolute().as_uri())
self.Session = sessionmaker(bind=self.engine)
self.logger = logging.getLogger(__name__)
# </INIT>
# <HELPER name="_get_last_two_runs">
def _get_last_two_runs(self, session) -> Tuple[str, str]:
"""Возвращает ID двух последних успешных запусков."""
runs = session.query(ParsingRun.run_id).order_by(ParsingRun.start_time.desc()).limit(2).all()
if len(runs) < 2:
self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.")
return None, None
return runs[0][0], runs[1][0] # (current_run_id, previous_run_id)
# </HELPER>
# <HELPER name="_get_data_for_run">
def _get_data_for_run(self, session, run_id: str) -> Dict[str, ProductVariant]:
"""Возвращает данные о товарах для указанного запуска в виде словаря {url: ProductVariant}."""
variants = session.query(ProductVariant).filter(ProductVariant.run_id == run_id).all()
return {v.url: v for v in variants}
# </HELPER>
# <ACTION name="analyze">
def analyze(self) -> str:
"""
Сравнивает два последних запуска и генерирует HTML-отчет об изменениях.
"""
# <CORE_LOGIC>
with self.Session() as session:
current_run_id, prev_run_id = self._get_last_two_runs(session)
if not current_run_id or not prev_run_id:
return ""
self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим з<><D0B7>пуском {prev_run_id}")
current_data = self._get_data_for_run(session, current_run_id)
prev_data = self._get_data_for_run(session, prev_run_id)
price_changes = []
availability_changes = []
new_items = []
removed_items = []
# Поиск изменений и новых товаров
for url, current_item in current_data.items():
prev_item = prev_data.get(url)
if prev_item:
# Изменение цены
if current_item.price != prev_item.price:
price_changes.append((current_item, prev_item.price))
# Изменение статуса наличия
if current_item.is_available != prev_item.is_available:
availability_changes.append(current_item)
else:
new_items.append(current_item)
# Поиск удаленных товаров
for url, prev_item in prev_data.items():
if url not in current_data:
removed_items.append(prev_item)
return self._format_report(price_changes, availability_changes, new_items, removed_items)
# </CORE_LOGIC>
# </ACTION>
# <HELPER name="_format_report">
def _format_report(self, price_changes, availability_changes, new_items, removed_items) -> str:
"""Форматирует найденные изменения в красивый HTML-отчет."""
if not any([price_changes, availability_changes, new_items, removed_items]):
self.logger.info("Изменений не найдено.")
return ""
report_parts = ["<b>📈 Отчет об изменениях на сайте</b>\n"]
if price_changes:
report_parts.append("\n<b>💰 Изменились цены:</b>")
for item, old_price in price_changes:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: {old_price} ₽ → <b>{item.price} ₽</b>")
if availability_changes:
report_parts.append("\n<b>📦 Изменилось наличие:</b>")
for item in availability_changes:
status = "В наличии" if item.is_available else "❌ Нет в наличии"
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: <b>{status}</b>")
if new_items:
report_parts.append("\n<b>✨ Новые товары:</b>")
for item in new_items:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a> - {item.price}")
if removed_items:
report_parts.append("\n<b>🗑️ Удаленные товары:</b>")
for item in removed_items:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>")
return "\n".join(report_parts)
# </HELPER>
# <COHERENCE_CHECK status="PASSED" />
/>

View File

@@ -77,14 +77,19 @@ class Settings(BaseModel):
num_parser_threads: int = Field(default=int(os.getenv('PARSER_THREADS', 5)), description="Количество потоков для парсинга")
# </CONFIG>
# <CONFIG name="telegram_settings">
telegram_bot_token: str = Field(default=os.getenv('TELEGRAM_BOT_TOKEN', ''), description="Токен для Telegram бота")
telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений")
# </CONFIG>
# <CONFIG name="selectors_config_instance">
selectors: ScraperSelectors = ScraperSelectors(
CATALOG_PRODUCT_LINK='.product-card h4 a.product-link',
VARIANT_LIST_ITEM='.product-version-select li',
VARIANT_LIST_ITEM='.product-version-select li, .product-variants-list .variant-item',
PRODUCT_PAGE_NAME='h1.product-h1',
ACTIVE_VOLUME='.product-version-select li.active',
PRICE_BLOCK='.product-sale-box .price span, .price-value, .product-price, .price',
PRODUCT_UNAVAILABLE='.product-unavailable, .out-of-stock-message, .unavailable-message, .stock-status.out-of-stock, li.not-available, div.disabled',
ACTIVE_VOLUME='.product-version-select li.active, .variant-item.active',
PRICE_BLOCK='.price-value, .product-price .price, .product-sale-box .price span',
PRODUCT_UNAVAILABLE='.product-unavailable, .out-of-stock, .unavailable, .stock.status-0',
)
# </CONFIG>

View File

@@ -0,0 +1,72 @@
# <MODULE name="utils.telegram_sender" semantics="telegram_notifications" />
# <IMPORTS>
import logging
import asyncio
from telegram import Bot
from telegram.error import TelegramError
from src.core.settings import settings
# </IMPORTS>
# <MAIN_CONTRACT for="TelegramSender">
# description: "Отправляет сообщения в Telegram, используя токен и ID чата из настроек."
# invariant: "Токен и ID чата должны быть заданы в конфигурации."
# </MAIN_CONTRACT>
class TelegramSender:
# <INIT>
def __init__(self):
if not settings.telegram_bot_token or not settings.telegram_chat_id:
raise ValueError("TELEGRAM_BOT_TOKEN и TELEGRAM_CHAT_ID должны быть установлены в .env")
self.bot = Bot(token=settings.telegram_bot_token)
self.chat_id = settings.telegram_chat_id
self.logger = logging.getLogger(__name__)
# </INIT>
# <CONTRACT for="send_message">
# description: "Асинхронно отправляет текстовое сообщение <20><> Telegram."
# preconditions:
# - "Сообщение (message) должно быть непустой строкой."
# postconditions:
# - "Сообщение отправлено в чат."
# exceptions:
# - "TelegramError: при ошибках API Telegram."
# - "Exception: при других ошибках."
# </CONTRACT>
# <ACTION name="send_message">
async def send_message(self, message: str):
"""Асинхронно отправляет сообщение в Telegram."""
# <CORE_LOGIC>
try:
self.logger.info(f"Отправка сообщения в Telegram (чат ID: {self.chat_id})...")
await self.bot.send_message(
chat_id=self.chat_id,
text=message,
parse_mode='HTML'
)
self.logger.info("Сообщение в Telegram успешно отправлено.")
except TelegramError as e:
self.logger.error(f"Ошибка отправки сообщения в Telegram: {e}")
raise
except Exception as e:
self.logger.error(f"Непредвиденная ошибка при отправке сообщения в Telegram: {e}")
raise
# </CORE_LOGIC>
# </ACTION>
# <HELPER name="send_telegram_notification">
# description: "Удобная обертка для инициализации и отправки сообщения."
# </HELPER>
async def send_telegram_notification(message: str):
"""
Инициализирует TelegramSender и отправляет сообщение.
Является удобной оберткой для вызова из других частей приложения.
"""
try:
sender = TelegramSender()
await sender.send_message(message)
except ValueError as e:
logging.error(f"Ошибка инициализации TelegramSender: {e}")
except Exception as e:
logging.error(f"Не удалось отправить уведомление в Telegram: {e}")
# <COHERENCE_CHECK status="PASSED" />