This commit is contained in:
2025-07-19 01:04:04 +03:00
parent fe1a022609
commit 0259289ee9
6 changed files with 107 additions and 57 deletions

5
.gitignore vendored
View File

@@ -19,4 +19,7 @@ price_data_final/
.vscode/ .vscode/
#backups #backups
*.bak *.bak
# Logs
/logs/

View File

@@ -6,10 +6,14 @@
# </DESIGN_NOTE> # </DESIGN_NOTE>
# <IMPORTS> # <IMPORTS>
import logging
from pydantic import BaseModel, Field, HttpUrl from pydantic import BaseModel, Field, HttpUrl
from pydantic.functional_validators import model_validator
from datetime import datetime from datetime import datetime
from typing import List from typing import List, Optional
import uuid import uuid
logger = logging.getLogger(__name__)
# </IMPORTS> # </IMPORTS>
# <MAIN_CONTRACT for="ProductVariant"> # <MAIN_CONTRACT for="ProductVariant">
@@ -19,8 +23,18 @@ import uuid
class ProductVariant(BaseModel): class ProductVariant(BaseModel):
# <STATE name="product_variant_fields"> # <STATE name="product_variant_fields">
name: str = Field(..., description="Название продукта.") name: str = Field(..., description="Название продукта.")
volume: str = Field(..., description="Объем или вариант продукта (например, '50мл', '10 капсул').") volume: Optional[str] = Field(None, description="Объем или вариант продукта (например, '50мл', '10 капсул'). Может быть пустым, если не применимо.")
price: int = Field(..., gt=0, description="Цена продукта в числовом формате, должна быть положительной.") price: int = Field(..., description="Цена продукта в числовом формате. Должна быть положительной, если товар в наличии, иначе может быть 0.")
is_in_stock: bool = Field(..., description="Наличие товара.")
@model_validator(mode='after')
def validate_price_based_on_stock(self) -> 'ProductVariant':
if not self.is_in_stock and self.price != 0:
logger.warning(f"[CONTRACT_VIOLATION] Product '{self.name}' (URL: {self.url}) is out of stock but has a non-zero price ({self.price}). Setting price to 0.")
self.price = 0
elif self.is_in_stock and self.price <= 0:
raise ValueError("Price must be greater than 0 for in-stock products.")
return self
url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.") url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.")
is_in_stock: bool = Field(..., description="Наличие товара.") is_in_stock: bool = Field(..., description="Наличие товара.")
# </STATE> # </STATE>

View File

@@ -67,12 +67,14 @@ class Settings(BaseModel):
# <CONFIG name="logging_settings"> # <CONFIG name="logging_settings">
log_to_db: bool = Field(default=os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true') log_to_db: bool = Field(default=os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true')
log_dir: Path = Field(default=BASE_DIR / "logs", description="Директория для сохранения логов")
# </CONFIG> # </CONFIG>
# <CONFIG name="performance_settings"> # <CONFIG name="performance_settings">
request_timeout: int = Field(default=int(os.getenv('PARSER_TIMEOUT', 30))) request_timeout: int = Field(default=int(os.getenv('PARSER_TIMEOUT', 30)))
delay_between_requests: float = Field(default=float(os.getenv('PARSER_DELAY', 1.0))) delay_between_requests: float = Field(default=float(os.getenv('PARSER_DELAY', 1.0)))
max_retries: int = Field(default=int(os.getenv('PARSER_RETRIES', 3))) max_retries: int = Field(default=int(os.getenv('PARSER_RETRIES', 3)))
num_parser_threads: int = Field(default=int(os.getenv('PARSER_THREADS', 5)), description="Количество потоков для парсинга")
# </CONFIG> # </CONFIG>
# <CONFIG name="selectors_config_instance"> # <CONFIG name="selectors_config_instance">
@@ -81,8 +83,8 @@ class Settings(BaseModel):
VARIANT_LIST_ITEM='.product-version-select li', VARIANT_LIST_ITEM='.product-version-select li',
PRODUCT_PAGE_NAME='h1.product-h1', PRODUCT_PAGE_NAME='h1.product-h1',
ACTIVE_VOLUME='.product-version-select li.active', ACTIVE_VOLUME='.product-version-select li.active',
PRICE_BLOCK='.product-sale-box .price span', PRICE_BLOCK='.product-sale-box .price span, .price-value, .product-price, .price',
PRODUCT_UNAVAILABLE='.product-unavailable', PRODUCT_UNAVAILABLE='.product-unavailable, .out-of-stock-message, .unavailable-message, .stock-status.out-of-stock, li.not-available, div.disabled',
) )
# </CONFIG> # </CONFIG>

View File

@@ -50,6 +50,7 @@ def main():
logger.info(f" • Таймаут запросов: {settings.request_timeout}с") logger.info(f" • Таймаут запросов: {settings.request_timeout}с")
logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с") logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с")
logger.info(f" • Максимум попыток: {settings.max_retries}") logger.info(f" • Максимум попыток: {settings.max_retries}")
logger.info(f" • Количество потоков: {settings.num_parser_threads}")
# <DEPENDENCY name="settings.validate_configuration" /> # <DEPENDENCY name="settings.validate_configuration" />
config_errors = settings.validate_configuration() config_errors = settings.validate_configuration()

View File

@@ -12,6 +12,7 @@ import requests
from datetime import datetime from datetime import datetime
from typing import List, Optional from typing import List, Optional
from contextlib import contextmanager from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed
from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT
from core.models import ProductVariant from core.models import ProductVariant
@@ -48,7 +49,6 @@ class AppOrchestrator:
# <DEPENDENCY name="Scraper" relationship="composition"> # <DEPENDENCY name="Scraper" relationship="composition">
# Оркестратор владеет скрейпером и управляет его жизненным циклом. # Оркестратор владеет скрейпером и управляет его жизненным циклом.
self.scraper = Scraper( self.scraper = Scraper(
session=self.http_session,
selectors=self.settings.selectors, selectors=self.settings.selectors,
base_url=self.settings.base_url base_url=self.settings.base_url
) )
@@ -155,41 +155,56 @@ class AppOrchestrator:
# </CORE_LOGIC> # </CORE_LOGIC>
# </ACTION> # </ACTION>
# <CONTRACT for="_scrape_single_url">
# description: "Вспомогательный метод для парсинга одного URL в отдельном потоке."
# postconditions: "Возвращает ProductVariant или None в случае ошибки."
# </CONTRACT>
# <HELPER name="_scrape_single_url">
def _scrape_single_url(self, url: str) -> Optional[ProductVariant]:
try:
self.logger.info(f"[ACTION:_scrape_single_url] Парсинг URL: {url.split('/')[-1]}")
variant_data = self.scraper.scrape_variant_page(
variant_url=url,
run_id=self.run_id
)
if variant_data:
self.logger.debug(f"[ACTION:_scrape_single_url] Успешно спарсен URL: {url.split('/')[-1]}")
return variant_data
else:
self.logger.warning(f"[ACTION:_scrape_single_url] Данные не получены для URL: {url.split('/')[-1]}")
return None
except Exception as e:
self.logger.error(f"[ACTION:_scrape_single_url] Ошибка при парсинге URL ({url}): {e}")
return None
# </HELPER>
# <CONTRACT for="_scrape_data"> # <CONTRACT for="_scrape_data">
# description: "Шаг 3: Итеративный парсинг данных по списку URL." # description: "Шаг 3: Параллельный парсинг данных по списку URL с использованием ThreadPoolExecutor."
# </CONTRACT> # </CONTRACT>
# <ACTION name="_scrape_data"> # <ACTION name="_scrape_data">
def _scrape_data(self, urls: List[str]): def _scrape_data(self, urls: List[str]):
"""Итеративный парсинг данных.""" """Параллельный парсинг данных."""
with self._error_context("scrape_data"): with self._error_context("scrape_data"):
# <CORE_LOGIC> # <CORE_LOGIC>
total_to_scrape = len(urls) total_to_scrape = len(urls)
self.logger.info(f"[ACTION:_scrape_data] Начало парсинга {total_to_scrape} URL вариантов.") self.logger.info(f"[ACTION:_scrape_data] Начало параллельного парсинга {total_to_scrape} URL вариантов с {self.settings.num_parser_threads} потоками.")
for i, url in enumerate(urls): with ThreadPoolExecutor(max_workers=self.settings.num_parser_threads) as executor:
# <ERROR_HANDLER for="single_url_scrape"> future_to_url = {executor.submit(self._scrape_single_url, url): url for url in urls}
try:
self.logger.info(f"[ACTION:_scrape_data] Парсинг URL {i+1}/{total_to_scrape}: {url.split('/')[-1]}") for i, future in enumerate(as_completed(future_to_url)):
time.sleep(1) url = future_to_url[future]
try:
# <DEPENDENCY name="scraper.scrape_variant_page" /> variant_data = future.result()
variant_data = self.scraper.scrape_variant_page( if variant_data:
variant_url=url, self.final_data.append(variant_data)
run_id=self.run_id self.stats['successful_parses'] += 1
) else:
self.stats['failed_parses'] += 1
if variant_data: except Exception as e:
self.final_data.append(variant_data)
self.stats['successful_parses'] += 1
else:
self.stats['failed_parses'] += 1 self.stats['failed_parses'] += 1
self.logger.error(f"[ACTION:_scrape_data] Необработанная ошибка в потоке для URL ({url}): {e}")
except Exception as e:
self.stats['failed_parses'] += 1
self.logger.error(f"[ACTION:_scrape_data] Ошибка при парсинге URL {i+1}/{total_to_scrape} ({url}): {e}")
continue
# </ERROR_HANDLER>
self.logger.info(f"[ACTION:_scrape_data] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.") self.logger.info(f"[ACTION:_scrape_data] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.")
self.logger.info(f"[STATS][ACTION:_scrape_data] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}") self.logger.info(f"[STATS][ACTION:_scrape_data] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}")
# </CORE_LOGIC> # </CORE_LOGIC>

View File

@@ -26,25 +26,20 @@ from core.settings import ScraperSelectors
# </MAIN_CONTRACT> # </MAIN_CONTRACT>
class Scraper: class Scraper:
# <INIT name="__init__"> # <INIT name="__init__">
def __init__(self, session: requests.Session, selectors: ScraperSelectors, base_url: str): def __init__(self, selectors: ScraperSelectors, base_url: str):
"""Инициализирует скрейпер с зависимостями: сессия, селекторы и базовый URL.""" """Инициализирует скрейпер с зависимостями: сессия, селекторы и базовый URL."""
# <STATE name="initial_state"> # <STATE name="initial_state">
self.session = session
self.selectors = selectors self.selectors = selectors
self.base_url = base_url self.base_url = base_url
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
# </STATE> # </STATE>
# <ACTION name="setup_retry_strategy_on_init">
self._setup_retry_strategy()
# </ACTION>
# </INIT> # </INIT>
# <CONTRACT for="_setup_retry_strategy"> # <CONTRACT for="_setup_retry_strategy">
# description: "Настраивает retry-логику для HTTP-адаптера сессии." # description: "Настраивает retry-логику для HTTP-адаптера сессии."
# </CONTRACT> # </CONTRACT>
# <HELPER name="_setup_retry_strategy"> # <HELPER name="_setup_retry_strategy">
def _setup_retry_strategy(self): def _setup_retry_strategy(self, session: requests.Session):
"""Настраивает retry стратегию для HTTP запросов.""" """Настраивает retry стратегию для HTTP запросов."""
# <CORE_LOGIC> # <CORE_LOGIC>
retry_strategy = Retry( retry_strategy = Retry(
@@ -54,12 +49,27 @@ class Scraper:
allowed_methods=["HEAD", "GET", "OPTIONS"] allowed_methods=["HEAD", "GET", "OPTIONS"]
) )
adapter = HTTPAdapter(max_retries=retry_strategy) adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter) session.mount("http://", adapter)
self.session.mount("https://", adapter) session.mount("https://", adapter)
self.logger.debug("[HELPER:_setup_retry_strategy] Retry стратегия настроена для HTTP запросов.") self.logger.debug("[HELPER:_setup_retry_strategy] Retry стратегия настроена для HTTP запросов.")
# </CORE_LOGIC> # </CORE_LOGIC>
# </HELPER> # </HELPER>
# <CONTRACT for="_find_element_with_multiple_selectors">
# description: "Пытается найти элемент, используя список CSS-селекторов."
# postconditions: "Возвращает найденный элемент BeautifulSoup или None."
# </CONTRACT>
# <HELPER name="_find_element_with_multiple_selectors">
def _find_element_with_multiple_selectors(self, soup: BeautifulSoup, selectors: List[str], log_prefix: str, element_name: str):
for selector in selectors:
element = soup.select_one(selector)
if element:
self.logger.debug(f"{log_prefix} [FOUND_ELEMENT] Элемент '{element_name}' найден с селектором: '{selector}'")
return element
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Элемент '{element_name}' не найден ни с одним из селекторов: {selectors}")
return None
# </HELPER>
# <CONTRACT for="_clean_price"> # <CONTRACT for="_clean_price">
# description: "Очищает строковое представление цены, оставляя только цифры." # description: "Очищает строковое представление цены, оставляя только цифры."
# postconditions: "Возвращает целое число или 0 в случае ошибки." # postconditions: "Возвращает целое число или 0 в случае ошибки."
@@ -93,7 +103,7 @@ class Scraper:
# postconditions: "Возвращает текстовое содержимое страницы или None в случае любой ошибки." # postconditions: "Возвращает текстовое содержимое страницы или None в случае любой ошибки."
# </CONTRACT> # </CONTRACT>
# <HELPER name="_fetch_page"> # <HELPER name="_fetch_page">
def _fetch_page(self, url: str, request_id: str) -> Optional[str]: def _fetch_page(self, session: requests.Session, url: str, request_id: str) -> Optional[str]:
"""Приватный метод для скачивания HTML-содержимого страницы.""" """Приватный метод для скачивания HTML-содержимого страницы."""
log_prefix = f"[HELPER:_fetch_page(id={request_id})]" log_prefix = f"[HELPER:_fetch_page(id={request_id})]"
self.logger.debug(f"{log_prefix} Запрос к URL: {url}") self.logger.debug(f"{log_prefix} Запрос к URL: {url}")
@@ -101,7 +111,7 @@ class Scraper:
# <ERROR_HANDLER for="network_requests"> # <ERROR_HANDLER for="network_requests">
try: try:
# <CORE_LOGIC> # <CORE_LOGIC>
response = self.session.get(url, timeout=30) response = session.get(url, timeout=30)
response.raise_for_status() response.raise_for_status()
if not response.text.strip(): if not response.text.strip():
@@ -119,7 +129,7 @@ class Scraper:
self.logger.error(f"{log_prefix} [TIMEOUT] Превышено время ожидания для {url}") self.logger.error(f"{log_prefix} [TIMEOUT] Превышено время ожидания для {url}")
return None return None
except requests.exceptions.ConnectionError as e: except requests.exceptions.ConnectionError as e:
self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения для {url}: {e}") self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения дл<EFBFBD><EFBFBD> {url}: {e}")
return None return None
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
self.logger.error(f"{log_prefix} [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}") self.logger.error(f"{log_prefix} [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}")
@@ -144,7 +154,9 @@ class Scraper:
log_prefix = f"[ACTION:get_base_product_urls(id={run_id})]" log_prefix = f"[ACTION:get_base_product_urls(id={run_id})]"
self.logger.info(f"{log_prefix} Начало сбора базовых URL с: {catalog_url}") self.logger.info(f"{log_prefix} Начало сбора базовых URL с: {catalog_url}")
html = self._fetch_page(catalog_url, f"get_base_urls(id={run_id})") with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, catalog_url, f"get_base_urls(id={run_id})")
if not html: if not html:
self.logger.error(f"{log_prefix} [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.") self.logger.error(f"{log_prefix} [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.")
return [] return []
@@ -196,7 +208,9 @@ class Scraper:
for i, base_url in enumerate(base_product_urls): for i, base_url in enumerate(base_product_urls):
self.logger.info(f"{log_prefix} Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}") self.logger.info(f"{log_prefix} Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}")
html = self._fetch_page(base_url, f"get_variant_urls(id={run_id})-{i+1}") with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, base_url, f"get_variant_urls(id={run_id})-{i+1}")
if not html: if not html:
self.logger.warning(f"{log_prefix} Пропуск базового URL из-за ошибки загрузки: {base_url}") self.logger.warning(f"{log_prefix} Пропуск базового URL из-за ошибки загрузки: {base_url}")
continue continue
@@ -224,9 +238,7 @@ class Scraper:
all_variant_urls.append(base_url) all_variant_urls.append(base_url)
# </FALLBACK> # </FALLBACK>
time.sleep(0.5) self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
return all_variant_urls return all_variant_urls
# </CORE_LOGIC> # </CORE_LOGIC>
# </ACTION> # </ACTION>
@@ -240,9 +252,11 @@ class Scraper:
def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]: def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]:
"""Парсит страницу одного варианта и возвращает Pydantic-модель.""" """Парсит страницу одного варианта и возвращает Pydantic-модель."""
log_prefix = f"[ACTION:scrape_variant_page(id={run_id}, url={variant_url.split('/')[-1]})]" log_prefix = f"[ACTION:scrape_variant_page(id={run_id}, url={variant_url.split('/')[-1]})]"
self.logger.info(f"{log_prefix} Начало парсинга страниц<EFBFBD><EFBFBD> варианта.") self.logger.info(f"{log_prefix} Начало парсинга страницы варианта.")
html = self._fetch_page(variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})") with requests.Session() as session:
self._setup_retry_strategy(session)
html = self._fetch_page(session, variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})")
if not html: if not html:
self.logger.warning(f"{log_prefix} Не удалось получить HTML страницы варианта, пропуск парсинга.") self.logger.warning(f"{log_prefix} Не удалось получить HTML страницы варианта, пропуск парсинга.")
return None return None
@@ -251,10 +265,10 @@ class Scraper:
try: try:
soup = BeautifulSoup(html, 'html.parser') soup = BeautifulSoup(html, 'html.parser')
name_el = soup.select_one(self.selectors.product_page_name) name_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_page_name, 'h1.product-title', 'h2.product-name'], log_prefix, 'имени продукта')
price_el = soup.select_one(self.selectors.price_block) price_el = self._find_element_with_multiple_selectors(soup, [self.selectors.price_block, 'span.price', 'div.price'], log_prefix, 'цены')
volume_el = soup.select_one(self.selectors.active_volume) volume_el = self._find_element_with_multiple_selectors(soup, [self.selectors.active_volume, '.product-options .active'], log_prefix, 'объема')
unavailable_el = soup.select_one(self.selectors.product_unavailable) unavailable_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_unavailable, 'div.out-of-stock', 'span.unavailable'], log_prefix, 'отсутствия в наличии')
# <PRECONDITION> # <PRECONDITION>
# Товар должен иметь имя. Цена или статус "нет в наличии" должны присутствовать. # Товар должен иметь имя. Цена или статус "нет в наличии" должны присутствовать.
@@ -262,7 +276,7 @@ class Scraper:
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}") self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}")
return None return None
if not price_el and not unavailable_el: if not price_el and not unavailable_el:
self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличии.") self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличи<EFBFBD><EFBFBD>.")
return None return None
# </PRECONDITION> # </PRECONDITION>
@@ -293,6 +307,7 @@ class Scraper:
# <POSTCONDITION> # <POSTCONDITION>
# <CONTRACT_VALIDATOR by="Pydantic"> # <CONTRACT_VALIDATOR by="Pydantic">
try: try:
self.logger.debug(f"{log_prefix} [DEBUG_SCRAPE_DATA] Name: '{name}', Price: {price}, URL: '{variant_url}', IsInStock: {is_in_stock}")
product = ProductVariant(name=name, volume=volume, price=price, url=variant_url, is_in_stock=is_in_stock) product = ProductVariant(name=name, volume=volume, price=price, url=variant_url, is_in_stock=is_in_stock)
self.logger.debug(f"{log_prefix} [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | InStock: {product.is_in_stock} | Price: '{product.price}'") self.logger.debug(f"{log_prefix} [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | InStock: {product.is_in_stock} | Price: '{product.price}'")
return product return product
@@ -311,4 +326,4 @@ class Scraper:
# </FALLBACK> # </FALLBACK>
# </CORE_LOGIC> # </CORE_LOGIC>
# </ACTION> # </ACTION>
# <COHERENCE_CHECK status="PASSED" /> # <COHERENCE_CHECK status="PASSED" />