Compare commits

...

4 Commits

Author SHA1 Message Date
cf52a7c0ad fix(logging): Ensure file logger is correctly initialized 2025-07-20 08:35:40 +03:00
ae9c61facc add monitoring service & Telegram sender 2025-07-20 08:34:28 +03:00
0259289ee9 + 2025-07-19 01:04:04 +03:00
fe1a022609 multithread 2025-07-19 01:03:47 +03:00
11 changed files with 418 additions and 76 deletions

3
.gitignore vendored
View File

@@ -20,3 +20,6 @@ price_data_final/
#backups #backups
*.bak *.bak
# Logs
/logs/

70
monitoring_service.py Normal file
View File

@@ -0,0 +1,70 @@
# <MODULE name="monitoring_service" semantics="main_entrypoint_for_cron" />
# <IMPORTS>
import logging
import asyncio
from datetime import datetime
from src.orchestrator import AppOrchestrator
from src.core.settings import settings
from src.core.logging_config import setup_logging
from src.analyzer import DataAnalyzer
from src.utils.telegram_sender import send_telegram_notification
# </IMPORTS>
# <CONTRACT for="main">
# description: "Главная точка входа для сервиса мониторинга. Запускается по расписанию (cron)."
# postconditions:
# - "Парсер запускается, данные сохраняются в БД."
# - "Если обнаружены изменения, отправляется отчет в Telegram."
# </CONTRACT>
# <ENTRYPOINT name="main">
async def main():
"""
Основная асинхронная функция, которая запускает парсер,
анализирует данные и отправляет уведомление.
"""
run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
setup_logging(run_id=run_id)
logger = logging.getLogger(__name__)
logger.info(f"🚀 Запуск сервиса мониторинга. Run ID: {run_id}")
try:
# 1. Запуск парсера
logger.info("Начало этапа парсинга...")
orchestrator = AppOrchestrator(settings=settings)
orchestrator.run()
logger.info("Этап парсинга завершен.")
# 2. Анализ данных
logger.info("Начало этапа анализа данных...")
analyzer = DataAnalyzer()
report_message = analyzer.analyze()
logger.info("Этап анализа данных завершен.")
# 3. Отправка отчета
if report_message:
logger.info("Обнаружены изменения, отправка отчета в Telegram...")
await send_telegram_notification(report_message)
else:
logger.info("Изменений не найдено, отправка отчета не требуется.")
except Exception as e:
logger.critical(f"💥 Критическая ошибка в сервисе мониторинга: {e}", exc_info=True)
# Попытка отправить уведомление об ошибке
try:
error_message = f"<b>❗️ Критическая ошибка в сервисе мониторинга</b>\n\n<pre>{e}</pre>"
await send_telegram_notification(error_message)
except Exception as tg_e:
logger.error(f"Не удалось даже отправить уведомление об ошибке в Telegram: {tg_e}")
logger.info("✅ Сервис мониторинга завершил работу.")
# <MAIN_CONTRACT>
# description: "Стандартный блок для запуска main() при выполнении скрипта."
# </MAIN_CONTRACT>
if __name__ == "__main__":
# Используем asyncio.run() для запуска асинхронной функции main
asyncio.run(main())
# <COHERENCE_CHECK status="PASSED" />

View File

@@ -18,3 +18,11 @@ python-dotenv>=1.0.0
# ANCHOR: RabbitMQ_Dependencies # ANCHOR: RabbitMQ_Dependencies
# Семантика: Зависимости для работы с очередью сообщений RabbitMQ # Семантика: Зависимости для работы с очередью сообщений RabbitMQ
pika>=1.3.0 pika>=1.3.0
# ANCHOR: Database_Dependencies
# Семантика: Зависимости для работы с базой данных
SQLAlchemy>=2.0.0
# ANCHOR: Telegram_Dependencies
# Семантика: Зависимости для отправки уведомлений в Telegram
python-telegram-bot>=21.0.0

116
src/analyzer.py Normal file
View File

@@ -0,0 +1,116 @@
# <MODULE name="analyzer" semantics="data_comparison_logic" />
# <IMPORTS>
import logging
from typing import List, Dict, Tuple
from sqlalchemy.orm import sessionmaker
from src.core.models import ProductVariant, ParsingRun
from sqlalchemy import create_engine
from src.core.settings import settings
# </IMPORTS>
# <MAIN_CONTRACT for="DataAnalyzer">
# description: "Анализирует данные двух последних запусков парсера и выявляет изменения."
# </MAIN_CONTRACT>
class DataAnalyzer:
# <INIT>
def __init__(self):
self.engine = create_engine(settings.db_path.absolute().as_uri())
self.Session = sessionmaker(bind=self.engine)
self.logger = logging.getLogger(__name__)
# </INIT>
# <HELPER name="_get_last_two_runs">
def _get_last_two_runs(self, session) -> Tuple[str, str]:
"""Возвращает ID двух последних успешных запусков."""
runs = session.query(ParsingRun.run_id).order_by(ParsingRun.start_time.desc()).limit(2).all()
if len(runs) < 2:
self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.")
return None, None
return runs[0][0], runs[1][0] # (current_run_id, previous_run_id)
# </HELPER>
# <HELPER name="_get_data_for_run">
def _get_data_for_run(self, session, run_id: str) -> Dict[str, ProductVariant]:
"""Возвращает данные о товарах для указанного запуска в виде словаря {url: ProductVariant}."""
variants = session.query(ProductVariant).filter(ProductVariant.run_id == run_id).all()
return {v.url: v for v in variants}
# </HELPER>
# <ACTION name="analyze">
def analyze(self) -> str:
"""
Сравнивает два последних запуска и генерирует HTML-отчет об изменениях.
"""
# <CORE_LOGIC>
with self.Session() as session:
current_run_id, prev_run_id = self._get_last_two_runs(session)
if not current_run_id or not prev_run_id:
return ""
self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим з<><D0B7>пуском {prev_run_id}")
current_data = self._get_data_for_run(session, current_run_id)
prev_data = self._get_data_for_run(session, prev_run_id)
price_changes = []
availability_changes = []
new_items = []
removed_items = []
# Поиск изменений и новых товаров
for url, current_item in current_data.items():
prev_item = prev_data.get(url)
if prev_item:
# Изменение цены
if current_item.price != prev_item.price:
price_changes.append((current_item, prev_item.price))
# Изменение статуса наличия
if current_item.is_available != prev_item.is_available:
availability_changes.append(current_item)
else:
new_items.append(current_item)
# Поиск удаленных товаров
for url, prev_item in prev_data.items():
if url not in current_data:
removed_items.append(prev_item)
return self._format_report(price_changes, availability_changes, new_items, removed_items)
# </CORE_LOGIC>
# </ACTION>
# <HELPER name="_format_report">
def _format_report(self, price_changes, availability_changes, new_items, removed_items) -> str:
"""Форматирует найденные изменения в красивый HTML-отчет."""
if not any([price_changes, availability_changes, new_items, removed_items]):
self.logger.info("Изменений не найдено.")
return ""
report_parts = ["<b>📈 Отчет об изменениях на сайте</b>\n"]
if price_changes:
report_parts.append("\n<b>💰 Изменились цены:</b>")
for item, old_price in price_changes:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: {old_price} ₽ → <b>{item.price} ₽</b>")
if availability_changes:
report_parts.append("\n<b>📦 Изменилось наличие:</b>")
for item in availability_changes:
status = "В наличии" if item.is_available else "❌ Нет в наличии"
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: <b>{status}</b>")
if new_items:
report_parts.append("\n<b>✨ Новые товары:</b>")
for item in new_items:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a> - {item.price}")
if removed_items:
report_parts.append("\n<b>🗑️ Удаленные товары:</b>")
for item in removed_items:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>")
return "\n".join(report_parts)
# </HELPER>
# <COHERENCE_CHECK status="PASSED" />
/>

View File

@@ -2,6 +2,8 @@
# <IMPORTS> # <IMPORTS>
import logging import logging
import os
from pathlib import Path
from typing import Optional from typing import Optional
from .database import DatabaseLogHandler, DatabaseManager from .database import DatabaseLogHandler, DatabaseManager
from .settings import settings from .settings import settings
@@ -23,27 +25,46 @@ def setup_logging(run_id: str, db_manager: Optional[DatabaseManager] = None):
"""Настраивает систему логирования проекта.""" """Настраивает систему логирования проекта."""
# <CORE_LOGIC> # <CORE_LOGIC>
log_format = '[%(asctime)s] [%(levelname)s] :: %(message)s' log_format = '[%(asctime)s] [%(levelname)s] :: %(message)s'
logging.basicConfig( handlers = []
level=logging.INFO,
format=log_format, # <ACTION name="create_log_directory">
datefmt='%Y-%m-%d %H:%M:%S', # Создаем директорию для логов, если она не существует.
force=True # Перезаписывает любую существующую конфигурацию os.makedirs(settings.log_dir, exist_ok=True)
) # </ACTION>
# <ACTION name="configure_file_handler">
# Добавляем обработчик для записи логов в файл.
log_file_name = f"run_{run_id}.log"
file_handler = logging.FileHandler(settings.log_dir / log_file_name, mode='w', encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter(log_format))
handlers.append(file_handler)
# </ACTION>
# <DEPENDENCY name="settings.log_to_db" /> # <DEPENDENCY name="settings.log_to_db" />
if settings.log_to_db and db_manager: if settings.log_to_db and db_manager:
# <ERROR_HANDLER> # <ERROR_HANDLER>
try: try:
root_logger = logging.getLogger('')
db_handler = DatabaseLogHandler(db_manager, run_id) db_handler = DatabaseLogHandler(db_manager, run_id)
db_handler.setLevel(logging.DEBUG) db_handler.setLevel(logging.DEBUG)
db_handler.setFormatter(logging.Formatter(log_format)) db_handler.setFormatter(logging.Formatter(log_format))
root_logger.addHandler(db_handler) handlers.append(db_handler)
logging.info("Обработчик логов для записи в <20><>азу данных успешно добавлен.")
except Exception as e: except Exception as e:
logging.error(f"Не удалось инициализировать обработчик логов для БД: {e}") # Логирование еще не настроено, используем print для этой критической ошибки
print(f"Не удалось инициализировать обработчик логов для БД: {e}")
# </ERROR_HANDLER> # </ERROR_HANDLER>
logging.basicConfig(
level=logging.DEBUG,
format=log_format,
datefmt='%Y-%m-%d %H:%M:%S',
handlers=handlers,
force=True # Перезаписывает любую существующую конфигурацию
)
if any(isinstance(h, DatabaseLogHandler) for h in handlers):
logging.info("Обработчик логов для записи в базу данных успешно добавлен.")
logging.info("Система логирования инициализирована.") logging.info("Система логирования инициализирована.")
# </CORE_LOGIC> # </CORE_LOGIC>
# <COHERENCE_CHECK status="PASSED" /> # <COHERENCE_CHECK status="PASSED" />

View File

@@ -6,10 +6,14 @@
# </DESIGN_NOTE> # </DESIGN_NOTE>
# <IMPORTS> # <IMPORTS>
import logging
from pydantic import BaseModel, Field, HttpUrl from pydantic import BaseModel, Field, HttpUrl
from pydantic.functional_validators import model_validator
from datetime import datetime from datetime import datetime
from typing import List from typing import List, Optional
import uuid import uuid
logger = logging.getLogger(__name__)
# </IMPORTS> # </IMPORTS>
# <MAIN_CONTRACT for="ProductVariant"> # <MAIN_CONTRACT for="ProductVariant">
@@ -19,8 +23,18 @@ import uuid
class ProductVariant(BaseModel): class ProductVariant(BaseModel):
# <STATE name="product_variant_fields"> # <STATE name="product_variant_fields">
name: str = Field(..., description="Название продукта.") name: str = Field(..., description="Название продукта.")
volume: str = Field(..., description="Объем или вариант продукта (например, '50мл', '10 капсул').") volume: Optional[str] = Field(None, description="Объем или вариант продукта (например, '50мл', '10 капсул'). Может быть пустым, если не применимо.")
price: int = Field(..., gt=0, description="Цена продукта в числовом формате, должна быть положительной.") price: int = Field(..., description="Цена продукта в числовом формате. Должна быть положительной, если товар в наличии, иначе может быть 0.")
is_in_stock: bool = Field(..., description="Наличие товара.")
@model_validator(mode='after')
def validate_price_based_on_stock(self) -> 'ProductVariant':
if not self.is_in_stock and self.price != 0:
logger.warning(f"[CONTRACT_VIOLATION] Product '{self.name}' (URL: {self.url}) is out of stock but has a non-zero price ({self.price}). Setting price to 0.")
self.price = 0
elif self.is_in_stock and self.price <= 0:
raise ValueError("Price must be greater than 0 for in-stock products.")
return self
url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.") url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.")
is_in_stock: bool = Field(..., description="Наличие товара.") is_in_stock: bool = Field(..., description="Наличие товара.")
# </STATE> # </STATE>

View File

@@ -67,22 +67,29 @@ class Settings(BaseModel):
# <CONFIG name="logging_settings"> # <CONFIG name="logging_settings">
log_to_db: bool = Field(default=os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true') log_to_db: bool = Field(default=os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true')
log_dir: Path = Field(default=BASE_DIR / "logs", description="Директория для сохранения логов")
# </CONFIG> # </CONFIG>
# <CONFIG name="performance_settings"> # <CONFIG name="performance_settings">
request_timeout: int = Field(default=int(os.getenv('PARSER_TIMEOUT', 30))) request_timeout: int = Field(default=int(os.getenv('PARSER_TIMEOUT', 30)))
delay_between_requests: float = Field(default=float(os.getenv('PARSER_DELAY', 1.0))) delay_between_requests: float = Field(default=float(os.getenv('PARSER_DELAY', 1.0)))
max_retries: int = Field(default=int(os.getenv('PARSER_RETRIES', 3))) max_retries: int = Field(default=int(os.getenv('PARSER_RETRIES', 3)))
num_parser_threads: int = Field(default=int(os.getenv('PARSER_THREADS', 5)), description="Количество потоков для парсинга")
# </CONFIG>
# <CONFIG name="telegram_settings">
telegram_bot_token: str = Field(default=os.getenv('TELEGRAM_BOT_TOKEN', ''), description="Токен для Telegram бота")
telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений")
# </CONFIG> # </CONFIG>
# <CONFIG name="selectors_config_instance"> # <CONFIG name="selectors_config_instance">
selectors: ScraperSelectors = ScraperSelectors( selectors: ScraperSelectors = ScraperSelectors(
CATALOG_PRODUCT_LINK='.product-card h4 a.product-link', CATALOG_PRODUCT_LINK='.product-card h4 a.product-link',
VARIANT_LIST_ITEM='.product-version-select li', VARIANT_LIST_ITEM='.product-version-select li, .product-variants-list .variant-item',
PRODUCT_PAGE_NAME='h1.product-h1', PRODUCT_PAGE_NAME='h1.product-h1',
ACTIVE_VOLUME='.product-version-select li.active', ACTIVE_VOLUME='.product-version-select li.active, .variant-item.active',
PRICE_BLOCK='.product-sale-box .price span', PRICE_BLOCK='.price-value, .product-price .price, .product-sale-box .price span',
PRODUCT_UNAVAILABLE='.product-unavailable', PRODUCT_UNAVAILABLE='.product-unavailable, .out-of-stock, .unavailable, .stock.status-0',
) )
# </CONFIG> # </CONFIG>

View File

@@ -7,8 +7,10 @@
# <IMPORTS> # <IMPORTS>
import sys import sys
import logging import logging
from datetime import datetime
from orchestrator import AppOrchestrator from orchestrator import AppOrchestrator
from core.settings import settings from core.settings import settings
from core.logging_config import setup_logging
# </IMPORTS> # </IMPORTS>
# <CONTRACT for="main"> # <CONTRACT for="main">
@@ -20,17 +22,15 @@ from core.settings import settings
# exceptions: # exceptions:
# - "ValueError: при ошибках в конфигурации." # - "ValueError: при ошибках в конфигурации."
# - "KeyboardInterrupt: при прерывании пользователем." # - "KeyboardInterrupt: при прерывании пользователем."
# - "Exception: при любых других критических ошибках." # - "Exception: при любых других критических ошибка<EFBFBD><EFBFBD>."
# </CONTRACT> # </CONTRACT>
# <ENTRYPOINT name="main"> # <ENTRYPOINT name="main">
def main(): def main():
"""Точка входа в приложение.""" """Точка входа в приложение."""
# <INIT> # <INIT>
logging.basicConfig( run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
level=logging.INFO, setup_logging(run_id=run_id)
format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# </INIT> # </INIT>
@@ -50,6 +50,7 @@ def main():
logger.info(f" • Таймаут запросов: {settings.request_timeout}с") logger.info(f" • Таймаут запросов: {settings.request_timeout}с")
logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с") logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с")
logger.info(f" • Максимум попыток: {settings.max_retries}") logger.info(f" • Максимум попыток: {settings.max_retries}")
logger.info(f" • Количество потоков: {settings.num_parser_threads}")
# <DEPENDENCY name="settings.validate_configuration" /> # <DEPENDENCY name="settings.validate_configuration" />
config_errors = settings.validate_configuration() config_errors = settings.validate_configuration()

View File

@@ -12,6 +12,7 @@ import requests
from datetime import datetime from datetime import datetime
from typing import List, Optional from typing import List, Optional
from contextlib import contextmanager from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed
from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT
from core.models import ProductVariant from core.models import ProductVariant
@@ -48,7 +49,6 @@ class AppOrchestrator:
# <DEPENDENCY name="Scraper" relationship="composition"> # <DEPENDENCY name="Scraper" relationship="composition">
# Оркестратор владеет скрейпером и управляет его жизненным циклом. # Оркестратор владеет скрейпером и управляет его жизненным циклом.
self.scraper = Scraper( self.scraper = Scraper(
session=self.http_session,
selectors=self.settings.selectors, selectors=self.settings.selectors,
base_url=self.settings.base_url base_url=self.settings.base_url
) )
@@ -155,40 +155,55 @@ class AppOrchestrator:
# </CORE_LOGIC> # </CORE_LOGIC>
# </ACTION> # </ACTION>
# <CONTRACT for="_scrape_data"> # <CONTRACT for="_scrape_single_url">
# description: "Шаг 3: Итеративный парсинг данных по списку URL." # description: "Вспомогательный метод для парсинга одного URL в отдельном потоке."
# postconditions: "Возвращает ProductVariant или None в случае ошибки."
# </CONTRACT> # </CONTRACT>
# <ACTION name="_scrape_data"> # <HELPER name="_scrape_single_url">
def _scrape_data(self, urls: List[str]): def _scrape_single_url(self, url: str) -> Optional[ProductVariant]:
"""Итеративный парсинг данных."""
with self._error_context("scrape_data"):
# <CORE_LOGIC>
total_to_scrape = len(urls)
self.logger.info(f"[ACTION:_scrape_data] Начало парсинга {total_to_scrape} URL вариантов.")
for i, url in enumerate(urls):
# <ERROR_HANDLER for="single_url_scrape">
try: try:
self.logger.info(f"[ACTION:_scrape_data] Парсинг URL {i+1}/{total_to_scrape}: {url.split('/')[-1]}") self.logger.info(f"[ACTION:_scrape_single_url] Парсинг URL: {url.split('/')[-1]}")
time.sleep(1)
# <DEPENDENCY name="scraper.scrape_variant_page" />
variant_data = self.scraper.scrape_variant_page( variant_data = self.scraper.scrape_variant_page(
variant_url=url, variant_url=url,
run_id=self.run_id run_id=self.run_id
) )
if variant_data:
self.logger.debug(f"[ACTION:_scrape_single_url] Успешно спарсен URL: {url.split('/')[-1]}")
return variant_data
else:
self.logger.warning(f"[ACTION:_scrape_single_url] Данные не получены для URL: {url.split('/')[-1]}")
return None
except Exception as e:
self.logger.error(f"[ACTION:_scrape_single_url] Ошибка при парсинге URL ({url}): {e}")
return None
# </HELPER>
# <CONTRACT for="_scrape_data">
# description: "Шаг 3: Параллельный парсинг данных по списку URL с использованием ThreadPoolExecutor."
# </CONTRACT>
# <ACTION name="_scrape_data">
def _scrape_data(self, urls: List[str]):
"""Параллельный парсинг данных."""
with self._error_context("scrape_data"):
# <CORE_LOGIC>
total_to_scrape = len(urls)
self.logger.info(f"[ACTION:_scrape_data] Начало параллельного парсинга {total_to_scrape} URL вариантов с {self.settings.num_parser_threads} потоками.")
with ThreadPoolExecutor(max_workers=self.settings.num_parser_threads) as executor:
future_to_url = {executor.submit(self._scrape_single_url, url): url for url in urls}
for i, future in enumerate(as_completed(future_to_url)):
url = future_to_url[future]
try:
variant_data = future.result()
if variant_data: if variant_data:
self.final_data.append(variant_data) self.final_data.append(variant_data)
self.stats['successful_parses'] += 1 self.stats['successful_parses'] += 1
else: else:
self.stats['failed_parses'] += 1 self.stats['failed_parses'] += 1
except Exception as e: except Exception as e:
self.stats['failed_parses'] += 1 self.stats['failed_parses'] += 1
self.logger.error(f"[ACTION:_scrape_data] Ошибка при парсинге URL {i+1}/{total_to_scrape} ({url}): {e}") self.logger.error(f"[ACTION:_scrape_data] Необработанная ошибка в потоке для URL ({url}): {e}")
continue
# </ERROR_HANDLER>
self.logger.info(f"[ACTION:_scrape_data] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.") self.logger.info(f"[ACTION:_scrape_data] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.")
self.logger.info(f"[STATS][ACTION:_scrape_data] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}") self.logger.info(f"[STATS][ACTION:_scrape_data] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}")

View File

@@ -26,25 +26,20 @@ from core.settings import ScraperSelectors
# </MAIN_CONTRACT> # </MAIN_CONTRACT>
class Scraper: class Scraper:
# <INIT name="__init__"> # <INIT name="__init__">
def __init__(self, session: requests.Session, selectors: ScraperSelectors, base_url: str): def __init__(self, selectors: ScraperSelectors, base_url: str):
"""Инициализирует скрейпер с зависимостями: сессия, селекторы и базовый URL.""" """Инициализирует скрейпер с зависимостями: сессия, селекторы и базовый URL."""
# <STATE name="initial_state"> # <STATE name="initial_state">
self.session = session
self.selectors = selectors self.selectors = selectors
self.base_url = base_url self.base_url = base_url
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
# </STATE> # </STATE>
# <ACTION name="setup_retry_strategy_on_init">
self._setup_retry_strategy()
# </ACTION>
# </INIT> # </INIT>
# <CONTRACT for="_setup_retry_strategy"> # <CONTRACT for="_setup_retry_strategy">
# description: "Настраивает retry-логику для HTTP-адаптера сессии." # description: "Настраивает retry-логику для HTTP-адаптера сессии."
# </CONTRACT> # </CONTRACT>
# <HELPER name="_setup_retry_strategy"> # <HELPER name="_setup_retry_strategy">
def _setup_retry_strategy(self): def _setup_retry_strategy(self, session: requests.Session):
"""Настраивает retry стратегию для HTTP запросов.""" """Настраивает retry стратегию для HTTP запросов."""
# <CORE_LOGIC> # <CORE_LOGIC>
retry_strategy = Retry( retry_strategy = Retry(
@@ -54,12 +49,27 @@ class Scraper:
allowed_methods=["HEAD", "GET", "OPTIONS"] allowed_methods=["HEAD", "GET", "OPTIONS"]
) )
adapter = HTTPAdapter(max_retries=retry_strategy) adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter) session.mount("http://", adapter)
self.session.mount("https://", adapter) session.mount("https://", adapter)
self.logger.debug("[HELPER:_setup_retry_strategy] Retry стратегия настроена для HTTP запросов.") self.logger.debug("[HELPER:_setup_retry_strategy] Retry стратегия настроена для HTTP запросов.")
# </CORE_LOGIC> # </CORE_LOGIC>
# </HELPER> # </HELPER>
# <CONTRACT for="_find_element_with_multiple_selectors">
# description: "Пытается найти элемент, используя список CSS-селекторов."
# postconditions: "Возвращает найденный элемент BeautifulSoup или None."
# </CONTRACT>
# <HELPER name="_find_element_with_multiple_selectors">
def _find_element_with_multiple_selectors(self, soup: BeautifulSoup, selectors: List[str], log_prefix: str, element_name: str):
for selector in selectors:
element = soup.select_one(selector)
if element:
self.logger.debug(f"{log_prefix} [FOUND_ELEMENT] Элемент '{element_name}' найден с селектором: '{selector}'")
return element
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Элемент '{element_name}' не найден ни с одним из селекторов: {selectors}")
return None
# </HELPER>
# <CONTRACT for="_clean_price"> # <CONTRACT for="_clean_price">
# description: "Очищает строковое представление цены, оставляя только цифры." # description: "Очищает строковое представление цены, оставляя только цифры."
# postconditions: "Возвращает целое число или 0 в случае ошибки." # postconditions: "Возвращает целое число или 0 в случае ошибки."
@@ -93,7 +103,7 @@ class Scraper:
# postconditions: "Возвращает текстовое содержимое страницы или None в случае любой ошибки." # postconditions: "Возвращает текстовое содержимое страницы или None в случае любой ошибки."
# </CONTRACT> # </CONTRACT>
# <HELPER name="_fetch_page"> # <HELPER name="_fetch_page">
def _fetch_page(self, url: str, request_id: str) -> Optional[str]: def _fetch_page(self, session: requests.Session, url: str, request_id: str) -> Optional[str]:
"""Приватный метод для скачивания HTML-содержимого страницы.""" """Приватный метод для скачивания HTML-содержимого страницы."""
log_prefix = f"[HELPER:_fetch_page(id={request_id})]" log_prefix = f"[HELPER:_fetch_page(id={request_id})]"
self.logger.debug(f"{log_prefix} Запрос к URL: {url}") self.logger.debug(f"{log_prefix} Запрос к URL: {url}")
@@ -101,7 +111,7 @@ class Scraper:
# <ERROR_HANDLER for="network_requests"> # <ERROR_HANDLER for="network_requests">
try: try:
# <CORE_LOGIC> # <CORE_LOGIC>
response = self.session.get(url, timeout=30) response = session.get(url, timeout=30)
response.raise_for_status() response.raise_for_status()
if not response.text.strip(): if not response.text.strip():
@@ -119,7 +129,7 @@ class Scraper:
self.logger.error(f"{log_prefix} [TIMEOUT] Превышено время ожидания для {url}") self.logger.error(f"{log_prefix} [TIMEOUT] Превышено время ожидания для {url}")
return None return None
except requests.exceptions.ConnectionError as e: except requests.exceptions.ConnectionError as e:
self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения для {url}: {e}") self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения дл<EFBFBD><EFBFBD> {url}: {e}")
return None return None
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
self.logger.error(f"{log_prefix} [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}") self.logger.error(f"{log_prefix} [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}")
@@ -144,7 +154,9 @@ class Scraper:
log_prefix = f"[ACTION:get_base_product_urls(id={run_id})]" log_prefix = f"[ACTION:get_base_product_urls(id={run_id})]"
self.logger.info(f"{log_prefix} Начало сбора базовых URL с: {catalog_url}") self.logger.info(f"{log_prefix} Начало сбора базовых URL с: {catalog_url}")
html = self._fetch_page(catalog_url, f"get_base_urls(id={run_id})") with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, catalog_url, f"get_base_urls(id={run_id})")
if not html: if not html:
self.logger.error(f"{log_prefix} [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.") self.logger.error(f"{log_prefix} [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.")
return [] return []
@@ -196,7 +208,9 @@ class Scraper:
for i, base_url in enumerate(base_product_urls): for i, base_url in enumerate(base_product_urls):
self.logger.info(f"{log_prefix} Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}") self.logger.info(f"{log_prefix} Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}")
html = self._fetch_page(base_url, f"get_variant_urls(id={run_id})-{i+1}") with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, base_url, f"get_variant_urls(id={run_id})-{i+1}")
if not html: if not html:
self.logger.warning(f"{log_prefix} Пропуск базового URL из-за ошибки загрузки: {base_url}") self.logger.warning(f"{log_prefix} Пропуск базового URL из-за ошибки загрузки: {base_url}")
continue continue
@@ -224,8 +238,6 @@ class Scraper:
all_variant_urls.append(base_url) all_variant_urls.append(base_url)
# </FALLBACK> # </FALLBACK>
time.sleep(0.5)
self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.") self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
return all_variant_urls return all_variant_urls
# </CORE_LOGIC> # </CORE_LOGIC>
@@ -240,9 +252,11 @@ class Scraper:
def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]: def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]:
"""Парсит страницу одного варианта и возвращает Pydantic-модель.""" """Парсит страницу одного варианта и возвращает Pydantic-модель."""
log_prefix = f"[ACTION:scrape_variant_page(id={run_id}, url={variant_url.split('/')[-1]})]" log_prefix = f"[ACTION:scrape_variant_page(id={run_id}, url={variant_url.split('/')[-1]})]"
self.logger.info(f"{log_prefix} Начало парсинга страниц<EFBFBD><EFBFBD> варианта.") self.logger.info(f"{log_prefix} Начало парсинга страницы варианта.")
html = self._fetch_page(variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})") with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})")
if not html: if not html:
self.logger.warning(f"{log_prefix} Не удалось получить HTML страницы варианта, пропуск парсинга.") self.logger.warning(f"{log_prefix} Не удалось получить HTML страницы варианта, пропуск парсинга.")
return None return None
@@ -251,10 +265,10 @@ class Scraper:
try: try:
soup = BeautifulSoup(html, 'html.parser') soup = BeautifulSoup(html, 'html.parser')
name_el = soup.select_one(self.selectors.product_page_name) name_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_page_name, 'h1.product-title', 'h2.product-name'], log_prefix, 'имени продукта')
price_el = soup.select_one(self.selectors.price_block) price_el = self._find_element_with_multiple_selectors(soup, [self.selectors.price_block, 'span.price', 'div.price'], log_prefix, 'цены')
volume_el = soup.select_one(self.selectors.active_volume) volume_el = self._find_element_with_multiple_selectors(soup, [self.selectors.active_volume, '.product-options .active'], log_prefix, 'объема')
unavailable_el = soup.select_one(self.selectors.product_unavailable) unavailable_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_unavailable, 'div.out-of-stock', 'span.unavailable'], log_prefix, 'отсутствия в наличии')
# <PRECONDITION> # <PRECONDITION>
# Товар должен иметь имя. Цена или статус "нет в наличии" должны присутствовать. # Товар должен иметь имя. Цена или статус "нет в наличии" должны присутствовать.
@@ -262,7 +276,7 @@ class Scraper:
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}") self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}")
return None return None
if not price_el and not unavailable_el: if not price_el and not unavailable_el:
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличии.") self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличи<EFBFBD><EFBFBD>.")
return None return None
# </PRECONDITION> # </PRECONDITION>
@@ -293,6 +307,7 @@ class Scraper:
# <POSTCONDITION> # <POSTCONDITION>
# <CONTRACT_VALIDATOR by="Pydantic"> # <CONTRACT_VALIDATOR by="Pydantic">
try: try:
self.logger.debug(f"{log_prefix} [DEBUG_SCRAPE_DATA] Name: '{name}', Price: {price}, URL: '{variant_url}', IsInStock: {is_in_stock}")
product = ProductVariant(name=name, volume=volume, price=price, url=variant_url, is_in_stock=is_in_stock) product = ProductVariant(name=name, volume=volume, price=price, url=variant_url, is_in_stock=is_in_stock)
self.logger.debug(f"{log_prefix} [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | InStock: {product.is_in_stock} | Price: '{product.price}'") self.logger.debug(f"{log_prefix} [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | InStock: {product.is_in_stock} | Price: '{product.price}'")
return product return product

View File

@@ -0,0 +1,72 @@
# <MODULE name="utils.telegram_sender" semantics="telegram_notifications" />
# <IMPORTS>
import logging
import asyncio
from telegram import Bot
from telegram.error import TelegramError
from src.core.settings import settings
# </IMPORTS>
# <MAIN_CONTRACT for="TelegramSender">
# description: "Отправляет сообщения в Telegram, используя токен и ID чата из настроек."
# invariant: "Токен и ID чата должны быть заданы в конфигурации."
# </MAIN_CONTRACT>
class TelegramSender:
# <INIT>
def __init__(self):
if not settings.telegram_bot_token or not settings.telegram_chat_id:
raise ValueError("TELEGRAM_BOT_TOKEN и TELEGRAM_CHAT_ID должны быть установлены в .env")
self.bot = Bot(token=settings.telegram_bot_token)
self.chat_id = settings.telegram_chat_id
self.logger = logging.getLogger(__name__)
# </INIT>
# <CONTRACT for="send_message">
# description: "Асинхронно отправляет текстовое сообщение <20><> Telegram."
# preconditions:
# - "Сообщение (message) должно быть непустой строкой."
# postconditions:
# - "Сообщение отправлено в чат."
# exceptions:
# - "TelegramError: при ошибках API Telegram."
# - "Exception: при других ошибках."
# </CONTRACT>
# <ACTION name="send_message">
async def send_message(self, message: str):
"""Асинхронно отправляет сообщение в Telegram."""
# <CORE_LOGIC>
try:
self.logger.info(f"Отправка сообщения в Telegram (чат ID: {self.chat_id})...")
await self.bot.send_message(
chat_id=self.chat_id,
text=message,
parse_mode='HTML'
)
self.logger.info("Сообщение в Telegram успешно отправлено.")
except TelegramError as e:
self.logger.error(f"Ошибка отправки сообщения в Telegram: {e}")
raise
except Exception as e:
self.logger.error(f"Непредвиденная ошибка при отправке сообщения в Telegram: {e}")
raise
# </CORE_LOGIC>
# </ACTION>
# <HELPER name="send_telegram_notification">
# description: "Удобная обертка для инициализации и отправки сообщения."
# </HELPER>
async def send_telegram_notification(message: str):
"""
Инициализирует TelegramSender и отправляет сообщение.
Является удобной оберткой для вызова из других частей приложения.
"""
try:
sender = TelegramSender()
await sender.send_message(message)
except ValueError as e:
logging.error(f"Ошибка инициализации TelegramSender: {e}")
except Exception as e:
logging.error(f"Не удалось отправить уведомление в Telegram: {e}")
# <COHERENCE_CHECK status="PASSED" />