Compare commits

..

4 Commits

Author SHA1 Message Date
cf52a7c0ad fix(logging): Ensure file logger is correctly initialized 2025-07-20 08:35:40 +03:00
ae9c61facc add monitoring service & Telegram sender 2025-07-20 08:34:28 +03:00
0259289ee9 + 2025-07-19 01:04:04 +03:00
fe1a022609 multithread 2025-07-19 01:03:47 +03:00
11 changed files with 418 additions and 76 deletions

5
.gitignore vendored
View File

@@ -19,4 +19,7 @@ price_data_final/
.vscode/
#backups
*.bak
*.bak
# Logs
/logs/

70
monitoring_service.py Normal file
View File

@@ -0,0 +1,70 @@
# <MODULE name="monitoring_service" semantics="main_entrypoint_for_cron" />
# <IMPORTS>
import logging
import asyncio
from datetime import datetime
from src.orchestrator import AppOrchestrator
from src.core.settings import settings
from src.core.logging_config import setup_logging
from src.analyzer import DataAnalyzer
from src.utils.telegram_sender import send_telegram_notification
# </IMPORTS>
# <CONTRACT for="main">
# description: "Главная точка входа для сервиса мониторинга. Запускается по расписанию (cron)."
# postconditions:
# - "Парсер запускается, данные сохраняются в БД."
# - "Если обнаружены изменения, отправляется отчет в Telegram."
# </CONTRACT>
# <ENTRYPOINT name="main">
async def main():
"""
Основная асинхронная функция, которая запускает парсер,
анализирует данные и отправляет уведомление.
"""
run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
setup_logging(run_id=run_id)
logger = logging.getLogger(__name__)
logger.info(f"🚀 Запуск сервиса мониторинга. Run ID: {run_id}")
try:
# 1. Запуск парсера
logger.info("Начало этапа парсинга...")
orchestrator = AppOrchestrator(settings=settings)
orchestrator.run()
logger.info("Этап парсинга завершен.")
# 2. Анализ данных
logger.info("Начало этапа анализа данных...")
analyzer = DataAnalyzer()
report_message = analyzer.analyze()
logger.info("Этап анализа данных завершен.")
# 3. Отправка отчета
if report_message:
logger.info("Обнаружены изменения, отправка отчета в Telegram...")
await send_telegram_notification(report_message)
else:
logger.info("Изменений не найдено, отправка отчета не требуется.")
except Exception as e:
logger.critical(f"💥 Критическая ошибка в сервисе мониторинга: {e}", exc_info=True)
# Попытка отправить уведомление об ошибке
try:
error_message = f"<b>❗️ Критическая ошибка в сервисе мониторинга</b>\n\n<pre>{e}</pre>"
await send_telegram_notification(error_message)
except Exception as tg_e:
logger.error(f"Не удалось даже отправить уведомление об ошибке в Telegram: {tg_e}")
logger.info("✅ Сервис мониторинга завершил работу.")
# <MAIN_CONTRACT>
# description: "Стандартный блок для запуска main() при выполнении скрипта."
# </MAIN_CONTRACT>
if __name__ == "__main__":
# Используем asyncio.run() для запуска асинхронной функции main
asyncio.run(main())
# <COHERENCE_CHECK status="PASSED" />

View File

@@ -17,4 +17,12 @@ python-dotenv>=1.0.0
# ANCHOR: RabbitMQ_Dependencies
# Семантика: Зависимости для работы с очередью сообщений RabbitMQ
pika>=1.3.0
pika>=1.3.0
# ANCHOR: Database_Dependencies
# Семантика: Зависимости для работы с базой данных
SQLAlchemy>=2.0.0
# ANCHOR: Telegram_Dependencies
# Семантика: Зависимости для отправки уведомлений в Telegram
python-telegram-bot>=21.0.0

116
src/analyzer.py Normal file
View File

@@ -0,0 +1,116 @@
# <MODULE name="analyzer" semantics="data_comparison_logic" />
# <IMPORTS>
import logging
from typing import List, Dict, Tuple
from sqlalchemy.orm import sessionmaker
from src.core.models import ProductVariant, ParsingRun
from sqlalchemy import create_engine
from src.core.settings import settings
# </IMPORTS>
# <MAIN_CONTRACT for="DataAnalyzer">
# description: "Анализирует данные двух последних запусков парсера и выявляет изменения."
# </MAIN_CONTRACT>
class DataAnalyzer:
# <INIT>
def __init__(self):
self.engine = create_engine(settings.db_path.absolute().as_uri())
self.Session = sessionmaker(bind=self.engine)
self.logger = logging.getLogger(__name__)
# </INIT>
# <HELPER name="_get_last_two_runs">
def _get_last_two_runs(self, session) -> Tuple[str, str]:
"""Возвращает ID двух последних успешных запусков."""
runs = session.query(ParsingRun.run_id).order_by(ParsingRun.start_time.desc()).limit(2).all()
if len(runs) < 2:
self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.")
return None, None
return runs[0][0], runs[1][0] # (current_run_id, previous_run_id)
# </HELPER>
# <HELPER name="_get_data_for_run">
def _get_data_for_run(self, session, run_id: str) -> Dict[str, ProductVariant]:
"""Возвращает данные о товарах для указанного запуска в виде словаря {url: ProductVariant}."""
variants = session.query(ProductVariant).filter(ProductVariant.run_id == run_id).all()
return {v.url: v for v in variants}
# </HELPER>
# <ACTION name="analyze">
def analyze(self) -> str:
"""
Сравнивает два последних запуска и генерирует HTML-отчет об изменениях.
"""
# <CORE_LOGIC>
with self.Session() as session:
current_run_id, prev_run_id = self._get_last_two_runs(session)
if not current_run_id or not prev_run_id:
return ""
self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим з<><D0B7>пуском {prev_run_id}")
current_data = self._get_data_for_run(session, current_run_id)
prev_data = self._get_data_for_run(session, prev_run_id)
price_changes = []
availability_changes = []
new_items = []
removed_items = []
# Поиск изменений и новых товаров
for url, current_item in current_data.items():
prev_item = prev_data.get(url)
if prev_item:
# Изменение цены
if current_item.price != prev_item.price:
price_changes.append((current_item, prev_item.price))
# Изменение статуса наличия
if current_item.is_available != prev_item.is_available:
availability_changes.append(current_item)
else:
new_items.append(current_item)
# Поиск удаленных товаров
for url, prev_item in prev_data.items():
if url not in current_data:
removed_items.append(prev_item)
return self._format_report(price_changes, availability_changes, new_items, removed_items)
# </CORE_LOGIC>
# </ACTION>
# <HELPER name="_format_report">
def _format_report(self, price_changes, availability_changes, new_items, removed_items) -> str:
"""Форматирует найденные изменения в красивый HTML-отчет."""
if not any([price_changes, availability_changes, new_items, removed_items]):
self.logger.info("Изменений не найдено.")
return ""
report_parts = ["<b>📈 Отчет об изменениях на сайте</b>\n"]
if price_changes:
report_parts.append("\n<b>💰 Изменились цены:</b>")
for item, old_price in price_changes:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: {old_price} ₽ → <b>{item.price} ₽</b>")
if availability_changes:
report_parts.append("\n<b>📦 Изменилось наличие:</b>")
for item in availability_changes:
status = "В наличии" if item.is_available else "❌ Нет в наличии"
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: <b>{status}</b>")
if new_items:
report_parts.append("\n<b>✨ Новые товары:</b>")
for item in new_items:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a> - {item.price}")
if removed_items:
report_parts.append("\n<b>🗑️ Удаленные товары:</b>")
for item in removed_items:
report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>")
return "\n".join(report_parts)
# </HELPER>
# <COHERENCE_CHECK status="PASSED" />
/>

View File

@@ -2,6 +2,8 @@
# <IMPORTS>
import logging
import os
from pathlib import Path
from typing import Optional
from .database import DatabaseLogHandler, DatabaseManager
from .settings import settings
@@ -23,27 +25,46 @@ def setup_logging(run_id: str, db_manager: Optional[DatabaseManager] = None):
"""Настраивает систему логирования проекта."""
# <CORE_LOGIC>
log_format = '[%(asctime)s] [%(levelname)s] :: %(message)s'
logging.basicConfig(
level=logging.INFO,
format=log_format,
datefmt='%Y-%m-%d %H:%M:%S',
force=True # Перезаписывает любую существующую конфигурацию
)
handlers = []
# <ACTION name="create_log_directory">
# Создаем директорию для логов, если она не существует.
os.makedirs(settings.log_dir, exist_ok=True)
# </ACTION>
# <ACTION name="configure_file_handler">
# Добавляем обработчик для записи логов в файл.
log_file_name = f"run_{run_id}.log"
file_handler = logging.FileHandler(settings.log_dir / log_file_name, mode='w', encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter(log_format))
handlers.append(file_handler)
# </ACTION>
# <DEPENDENCY name="settings.log_to_db" />
if settings.log_to_db and db_manager:
# <ERROR_HANDLER>
try:
root_logger = logging.getLogger('')
db_handler = DatabaseLogHandler(db_manager, run_id)
db_handler.setLevel(logging.DEBUG)
db_handler.setFormatter(logging.Formatter(log_format))
root_logger.addHandler(db_handler)
logging.info("Обработчик логов для записи в <20><>азу данных успешно добавлен.")
handlers.append(db_handler)
except Exception as e:
logging.error(f"Не удалось инициализировать обработчик логов для БД: {e}")
# Логирование еще не настроено, используем print для этой критической ошибки
print(f"Не удалось инициализировать обработчик логов для БД: {e}")
# </ERROR_HANDLER>
logging.basicConfig(
level=logging.DEBUG,
format=log_format,
datefmt='%Y-%m-%d %H:%M:%S',
handlers=handlers,
force=True # Перезаписывает любую существующую конфигурацию
)
if any(isinstance(h, DatabaseLogHandler) for h in handlers):
logging.info("Обработчик логов для записи в базу данных успешно добавлен.")
logging.info("Система логирования инициализирована.")
# </CORE_LOGIC>
# <COHERENCE_CHECK status="PASSED" />

View File

@@ -6,10 +6,14 @@
# </DESIGN_NOTE>
# <IMPORTS>
import logging
from pydantic import BaseModel, Field, HttpUrl
from pydantic.functional_validators import model_validator
from datetime import datetime
from typing import List
from typing import List, Optional
import uuid
logger = logging.getLogger(__name__)
# </IMPORTS>
# <MAIN_CONTRACT for="ProductVariant">
@@ -19,8 +23,18 @@ import uuid
class ProductVariant(BaseModel):
# <STATE name="product_variant_fields">
name: str = Field(..., description="Название продукта.")
volume: str = Field(..., description="Объем или вариант продукта (например, '50мл', '10 капсул').")
price: int = Field(..., gt=0, description="Цена продукта в числовом формате, должна быть положительной.")
volume: Optional[str] = Field(None, description="Объем или вариант продукта (например, '50мл', '10 капсул'). Может быть пустым, если не применимо.")
price: int = Field(..., description="Цена продукта в числовом формате. Должна быть положительной, если товар в наличии, иначе может быть 0.")
is_in_stock: bool = Field(..., description="Наличие товара.")
@model_validator(mode='after')
def validate_price_based_on_stock(self) -> 'ProductVariant':
if not self.is_in_stock and self.price != 0:
logger.warning(f"[CONTRACT_VIOLATION] Product '{self.name}' (URL: {self.url}) is out of stock but has a non-zero price ({self.price}). Setting price to 0.")
self.price = 0
elif self.is_in_stock and self.price <= 0:
raise ValueError("Price must be greater than 0 for in-stock products.")
return self
url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.")
is_in_stock: bool = Field(..., description="Наличие товара.")
# </STATE>

View File

@@ -67,22 +67,29 @@ class Settings(BaseModel):
# <CONFIG name="logging_settings">
log_to_db: bool = Field(default=os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true')
log_dir: Path = Field(default=BASE_DIR / "logs", description="Директория для сохранения логов")
# </CONFIG>
# <CONFIG name="performance_settings">
request_timeout: int = Field(default=int(os.getenv('PARSER_TIMEOUT', 30)))
delay_between_requests: float = Field(default=float(os.getenv('PARSER_DELAY', 1.0)))
max_retries: int = Field(default=int(os.getenv('PARSER_RETRIES', 3)))
num_parser_threads: int = Field(default=int(os.getenv('PARSER_THREADS', 5)), description="Количество потоков для парсинга")
# </CONFIG>
# <CONFIG name="telegram_settings">
telegram_bot_token: str = Field(default=os.getenv('TELEGRAM_BOT_TOKEN', ''), description="Токен для Telegram бота")
telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений")
# </CONFIG>
# <CONFIG name="selectors_config_instance">
selectors: ScraperSelectors = ScraperSelectors(
CATALOG_PRODUCT_LINK='.product-card h4 a.product-link',
VARIANT_LIST_ITEM='.product-version-select li',
VARIANT_LIST_ITEM='.product-version-select li, .product-variants-list .variant-item',
PRODUCT_PAGE_NAME='h1.product-h1',
ACTIVE_VOLUME='.product-version-select li.active',
PRICE_BLOCK='.product-sale-box .price span',
PRODUCT_UNAVAILABLE='.product-unavailable',
ACTIVE_VOLUME='.product-version-select li.active, .variant-item.active',
PRICE_BLOCK='.price-value, .product-price .price, .product-sale-box .price span',
PRODUCT_UNAVAILABLE='.product-unavailable, .out-of-stock, .unavailable, .stock.status-0',
)
# </CONFIG>

View File

@@ -7,8 +7,10 @@
# <IMPORTS>
import sys
import logging
from datetime import datetime
from orchestrator import AppOrchestrator
from core.settings import settings
from core.logging_config import setup_logging
# </IMPORTS>
# <CONTRACT for="main">
@@ -20,17 +22,15 @@ from core.settings import settings
# exceptions:
# - "ValueError: при ошибках в конфигурации."
# - "KeyboardInterrupt: при прерывании пользователем."
# - "Exception: при любых других критических ошибках."
# - "Exception: при любых других критических ошибка<EFBFBD><EFBFBD>."
# </CONTRACT>
# <ENTRYPOINT name="main">
def main():
"""Точка входа в приложение."""
# <INIT>
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
setup_logging(run_id=run_id)
logger = logging.getLogger(__name__)
# </INIT>
@@ -50,6 +50,7 @@ def main():
logger.info(f" • Таймаут запросов: {settings.request_timeout}с")
logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с")
logger.info(f" • Максимум попыток: {settings.max_retries}")
logger.info(f" • Количество потоков: {settings.num_parser_threads}")
# <DEPENDENCY name="settings.validate_configuration" />
config_errors = settings.validate_configuration()

View File

@@ -12,6 +12,7 @@ import requests
from datetime import datetime
from typing import List, Optional
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed
from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT
from core.models import ProductVariant
@@ -48,7 +49,6 @@ class AppOrchestrator:
# <DEPENDENCY name="Scraper" relationship="composition">
# Оркестратор владеет скрейпером и управляет его жизненным циклом.
self.scraper = Scraper(
session=self.http_session,
selectors=self.settings.selectors,
base_url=self.settings.base_url
)
@@ -155,41 +155,56 @@ class AppOrchestrator:
# </CORE_LOGIC>
# </ACTION>
# <CONTRACT for="_scrape_single_url">
# description: "Вспомогательный метод для парсинга одного URL в отдельном потоке."
# postconditions: "Возвращает ProductVariant или None в случае ошибки."
# </CONTRACT>
# <HELPER name="_scrape_single_url">
def _scrape_single_url(self, url: str) -> Optional[ProductVariant]:
try:
self.logger.info(f"[ACTION:_scrape_single_url] Парсинг URL: {url.split('/')[-1]}")
variant_data = self.scraper.scrape_variant_page(
variant_url=url,
run_id=self.run_id
)
if variant_data:
self.logger.debug(f"[ACTION:_scrape_single_url] Успешно спарсен URL: {url.split('/')[-1]}")
return variant_data
else:
self.logger.warning(f"[ACTION:_scrape_single_url] Данные не получены для URL: {url.split('/')[-1]}")
return None
except Exception as e:
self.logger.error(f"[ACTION:_scrape_single_url] Ошибка при парсинге URL ({url}): {e}")
return None
# </HELPER>
# <CONTRACT for="_scrape_data">
# description: "Шаг 3: Итеративный парсинг данных по списку URL."
# description: "Шаг 3: Параллельный парсинг данных по списку URL с использованием ThreadPoolExecutor."
# </CONTRACT>
# <ACTION name="_scrape_data">
def _scrape_data(self, urls: List[str]):
"""Итеративный парсинг данных."""
"""Параллельный парсинг данных."""
with self._error_context("scrape_data"):
# <CORE_LOGIC>
total_to_scrape = len(urls)
self.logger.info(f"[ACTION:_scrape_data] Начало парсинга {total_to_scrape} URL вариантов.")
self.logger.info(f"[ACTION:_scrape_data] Начало параллельного парсинга {total_to_scrape} URL вариантов с {self.settings.num_parser_threads} потоками.")
for i, url in enumerate(urls):
# <ERROR_HANDLER for="single_url_scrape">
try:
self.logger.info(f"[ACTION:_scrape_data] Парсинг URL {i+1}/{total_to_scrape}: {url.split('/')[-1]}")
time.sleep(1)
# <DEPENDENCY name="scraper.scrape_variant_page" />
variant_data = self.scraper.scrape_variant_page(
variant_url=url,
run_id=self.run_id
)
if variant_data:
self.final_data.append(variant_data)
self.stats['successful_parses'] += 1
else:
with ThreadPoolExecutor(max_workers=self.settings.num_parser_threads) as executor:
future_to_url = {executor.submit(self._scrape_single_url, url): url for url in urls}
for i, future in enumerate(as_completed(future_to_url)):
url = future_to_url[future]
try:
variant_data = future.result()
if variant_data:
self.final_data.append(variant_data)
self.stats['successful_parses'] += 1
else:
self.stats['failed_parses'] += 1
except Exception as e:
self.stats['failed_parses'] += 1
self.logger.error(f"[ACTION:_scrape_data] Необработанная ошибка в потоке для URL ({url}): {e}")
except Exception as e:
self.stats['failed_parses'] += 1
self.logger.error(f"[ACTION:_scrape_data] Ошибка при парсинге URL {i+1}/{total_to_scrape} ({url}): {e}")
continue
# </ERROR_HANDLER>
self.logger.info(f"[ACTION:_scrape_data] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.")
self.logger.info(f"[STATS][ACTION:_scrape_data] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}")
# </CORE_LOGIC>

View File

@@ -26,25 +26,20 @@ from core.settings import ScraperSelectors
# </MAIN_CONTRACT>
class Scraper:
# <INIT name="__init__">
def __init__(self, session: requests.Session, selectors: ScraperSelectors, base_url: str):
def __init__(self, selectors: ScraperSelectors, base_url: str):
"""Инициализирует скрейпер с зависимостями: сессия, селекторы и базовый URL."""
# <STATE name="initial_state">
self.session = session
self.selectors = selectors
self.base_url = base_url
self.logger = logging.getLogger(self.__class__.__name__)
# </STATE>
# <ACTION name="setup_retry_strategy_on_init">
self._setup_retry_strategy()
# </ACTION>
# </INIT>
# <CONTRACT for="_setup_retry_strategy">
# description: "Настраивает retry-логику для HTTP-адаптера сессии."
# </CONTRACT>
# <HELPER name="_setup_retry_strategy">
def _setup_retry_strategy(self):
def _setup_retry_strategy(self, session: requests.Session):
"""Настраивает retry стратегию для HTTP запросов."""
# <CORE_LOGIC>
retry_strategy = Retry(
@@ -54,12 +49,27 @@ class Scraper:
allowed_methods=["HEAD", "GET", "OPTIONS"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
session.mount("http://", adapter)
session.mount("https://", adapter)
self.logger.debug("[HELPER:_setup_retry_strategy] Retry стратегия настроена для HTTP запросов.")
# </CORE_LOGIC>
# </HELPER>
# <CONTRACT for="_find_element_with_multiple_selectors">
# description: "Пытается найти элемент, используя список CSS-селекторов."
# postconditions: "Возвращает найденный элемент BeautifulSoup или None."
# </CONTRACT>
# <HELPER name="_find_element_with_multiple_selectors">
def _find_element_with_multiple_selectors(self, soup: BeautifulSoup, selectors: List[str], log_prefix: str, element_name: str):
for selector in selectors:
element = soup.select_one(selector)
if element:
self.logger.debug(f"{log_prefix} [FOUND_ELEMENT] Элемент '{element_name}' найден с селектором: '{selector}'")
return element
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Элемент '{element_name}' не найден ни с одним из селекторов: {selectors}")
return None
# </HELPER>
# <CONTRACT for="_clean_price">
# description: "Очищает строковое представление цены, оставляя только цифры."
# postconditions: "Возвращает целое число или 0 в случае ошибки."
@@ -93,7 +103,7 @@ class Scraper:
# postconditions: "Возвращает текстовое содержимое страницы или None в случае любой ошибки."
# </CONTRACT>
# <HELPER name="_fetch_page">
def _fetch_page(self, url: str, request_id: str) -> Optional[str]:
def _fetch_page(self, session: requests.Session, url: str, request_id: str) -> Optional[str]:
"""Приватный метод для скачивания HTML-содержимого страницы."""
log_prefix = f"[HELPER:_fetch_page(id={request_id})]"
self.logger.debug(f"{log_prefix} Запрос к URL: {url}")
@@ -101,7 +111,7 @@ class Scraper:
# <ERROR_HANDLER for="network_requests">
try:
# <CORE_LOGIC>
response = self.session.get(url, timeout=30)
response = session.get(url, timeout=30)
response.raise_for_status()
if not response.text.strip():
@@ -119,7 +129,7 @@ class Scraper:
self.logger.error(f"{log_prefix} [TIMEOUT] Превышено время ожидания для {url}")
return None
except requests.exceptions.ConnectionError as e:
self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения для {url}: {e}")
self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения дл<EFBFBD><EFBFBD> {url}: {e}")
return None
except requests.exceptions.HTTPError as e:
self.logger.error(f"{log_prefix} [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}")
@@ -144,7 +154,9 @@ class Scraper:
log_prefix = f"[ACTION:get_base_product_urls(id={run_id})]"
self.logger.info(f"{log_prefix} Начало сбора базовых URL с: {catalog_url}")
html = self._fetch_page(catalog_url, f"get_base_urls(id={run_id})")
with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, catalog_url, f"get_base_urls(id={run_id})")
if not html:
self.logger.error(f"{log_prefix} [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.")
return []
@@ -196,7 +208,9 @@ class Scraper:
for i, base_url in enumerate(base_product_urls):
self.logger.info(f"{log_prefix} Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}")
html = self._fetch_page(base_url, f"get_variant_urls(id={run_id})-{i+1}")
with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, base_url, f"get_variant_urls(id={run_id})-{i+1}")
if not html:
self.logger.warning(f"{log_prefix} Пропуск базового URL из-за ошибки загрузки: {base_url}")
continue
@@ -224,9 +238,7 @@ class Scraper:
all_variant_urls.append(base_url)
# </FALLBACK>
time.sleep(0.5)
self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
return all_variant_urls
# </CORE_LOGIC>
# </ACTION>
@@ -240,9 +252,11 @@ class Scraper:
def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]:
"""Парсит страницу одного варианта и возвращает Pydantic-модель."""
log_prefix = f"[ACTION:scrape_variant_page(id={run_id}, url={variant_url.split('/')[-1]})]"
self.logger.info(f"{log_prefix} Начало парсинга страниц<EFBFBD><EFBFBD> варианта.")
self.logger.info(f"{log_prefix} Начало парсинга страницы варианта.")
html = self._fetch_page(variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})")
with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})")
if not html:
self.logger.warning(f"{log_prefix} Не удалось получить HTML страницы варианта, пропуск парсинга.")
return None
@@ -251,10 +265,10 @@ class Scraper:
try:
soup = BeautifulSoup(html, 'html.parser')
name_el = soup.select_one(self.selectors.product_page_name)
price_el = soup.select_one(self.selectors.price_block)
volume_el = soup.select_one(self.selectors.active_volume)
unavailable_el = soup.select_one(self.selectors.product_unavailable)
name_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_page_name, 'h1.product-title', 'h2.product-name'], log_prefix, 'имени продукта')
price_el = self._find_element_with_multiple_selectors(soup, [self.selectors.price_block, 'span.price', 'div.price'], log_prefix, 'цены')
volume_el = self._find_element_with_multiple_selectors(soup, [self.selectors.active_volume, '.product-options .active'], log_prefix, 'объема')
unavailable_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_unavailable, 'div.out-of-stock', 'span.unavailable'], log_prefix, 'отсутствия в наличии')
# <PRECONDITION>
# Товар должен иметь имя. Цена или статус "нет в наличии" должны присутствовать.
@@ -262,7 +276,7 @@ class Scraper:
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}")
return None
if not price_el and not unavailable_el:
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличии.")
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличи<EFBFBD><EFBFBD>.")
return None
# </PRECONDITION>
@@ -293,6 +307,7 @@ class Scraper:
# <POSTCONDITION>
# <CONTRACT_VALIDATOR by="Pydantic">
try:
self.logger.debug(f"{log_prefix} [DEBUG_SCRAPE_DATA] Name: '{name}', Price: {price}, URL: '{variant_url}', IsInStock: {is_in_stock}")
product = ProductVariant(name=name, volume=volume, price=price, url=variant_url, is_in_stock=is_in_stock)
self.logger.debug(f"{log_prefix} [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | InStock: {product.is_in_stock} | Price: '{product.price}'")
return product
@@ -311,4 +326,4 @@ class Scraper:
# </FALLBACK>
# </CORE_LOGIC>
# </ACTION>
# <COHERENCE_CHECK status="PASSED" />
# <COHERENCE_CHECK status="PASSED" />

View File

@@ -0,0 +1,72 @@
# <MODULE name="utils.telegram_sender" semantics="telegram_notifications" />
# <IMPORTS>
import logging
import asyncio
from telegram import Bot
from telegram.error import TelegramError
from src.core.settings import settings
# </IMPORTS>
# <MAIN_CONTRACT for="TelegramSender">
# description: "Отправляет сообщения в Telegram, используя токен и ID чата из настроек."
# invariant: "Токен и ID чата должны быть заданы в конфигурации."
# </MAIN_CONTRACT>
class TelegramSender:
# <INIT>
def __init__(self):
if not settings.telegram_bot_token or not settings.telegram_chat_id:
raise ValueError("TELEGRAM_BOT_TOKEN и TELEGRAM_CHAT_ID должны быть установлены в .env")
self.bot = Bot(token=settings.telegram_bot_token)
self.chat_id = settings.telegram_chat_id
self.logger = logging.getLogger(__name__)
# </INIT>
# <CONTRACT for="send_message">
# description: "Асинхронно отправляет текстовое сообщение <20><> Telegram."
# preconditions:
# - "Сообщение (message) должно быть непустой строкой."
# postconditions:
# - "Сообщение отправлено в чат."
# exceptions:
# - "TelegramError: при ошибках API Telegram."
# - "Exception: при других ошибках."
# </CONTRACT>
# <ACTION name="send_message">
async def send_message(self, message: str):
"""Асинхронно отправляет сообщение в Telegram."""
# <CORE_LOGIC>
try:
self.logger.info(f"Отправка сообщения в Telegram (чат ID: {self.chat_id})...")
await self.bot.send_message(
chat_id=self.chat_id,
text=message,
parse_mode='HTML'
)
self.logger.info("Сообщение в Telegram успешно отправлено.")
except TelegramError as e:
self.logger.error(f"Ошибка отправки сообщения в Telegram: {e}")
raise
except Exception as e:
self.logger.error(f"Непредвиденная ошибка при отправке сообщения в Telegram: {e}")
raise
# </CORE_LOGIC>
# </ACTION>
# <HELPER name="send_telegram_notification">
# description: "Удобная обертка для инициализации и отправки сообщения."
# </HELPER>
async def send_telegram_notification(message: str):
"""
Инициализирует TelegramSender и отправляет сообщение.
Является удобной оберткой для вызова из других частей приложения.
"""
try:
sender = TelegramSender()
await sender.send_message(message)
except ValueError as e:
logging.error(f"Ошибка инициализации TelegramSender: {e}")
except Exception as e:
logging.error(f"Не удалось отправить уведомление в Telegram: {e}")
# <COHERENCE_CHECK status="PASSED" />