apply patch

This commit is contained in:
2025-07-03 21:03:21 +03:00
parent 54827a5152
commit 0ddd9f0683
6 changed files with 527 additions and 88 deletions

7
.gitignore vendored
View File

@@ -1,5 +1,5 @@
# Python # Python
__pycache__/ **/__pycache__/
*.pyc *.pyc
*.pyo *.pyo
*.pyd *.pyd
@@ -16,4 +16,7 @@ price_data_final/
# IDEs # IDEs
.idea/ .idea/
.vscode/ .vscode/
#backups
*.bak

View File

@@ -15,6 +15,7 @@
- `core/`: Пакет с ядром приложения. - `core/`: Пакет с ядром приложения.
- `database.py`: Логика работы с базой данных SQLite. - `database.py`: Логика работы с базой данных SQLite.
- `logging_config.py`: Настройка системы логирования. - `logging_config.py`: Настройка системы логирования.
- **`models.py`: [NEW FILE] Pydantic модели данных (ProductVariant, LogRecordModel).**
- `scraper/`: Пакет с логикой парсинга. - `scraper/`: Пакет с логикой парсинга.
- `engine.py`: Функции для скачивания и анализа HTML-страниц. - `engine.py`: Функции для скачивания и анализа HTML-страниц.
- `utils/`: Пакет со вспомогательными утилитами. - `utils/`: Пакет со вспомогательными утилитами.

View File

@@ -6,63 +6,238 @@ import logging
import sqlite3 import sqlite3
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import List, Dict from typing import List, Dict, Optional
# Контракты для функций здесь остаются такими же, как в предыдущей версии. from src.core.models import ProductVariant, LogRecordModel # [FIX] Импорт моделей
class DatabaseLogHandler(logging.Handler): # [CONTRACT] DatabaseManager
# ... (код класса DatabaseLogHandler без изменений) ... # @description: Контекстный менеджер для управления соединением с SQLite.
def __init__(self, db_path: Path, run_id: str): # @pre: `db_path` должен быть валидным путем `Path`.
super().__init__() # @post: Гарантирует открытие и закрытие соединения с БД.
class DatabaseManager:
"""[CONTEXT_MANAGER] Управляет соединением с базой данных SQLite."""
def __init__(self, db_path: Path):
self.db_path = db_path self.db_path = db_path
self.conn: Optional[sqlite3.Connection] = None
self.logger = logging.getLogger(self.__class__.__name__)
def __enter__(self):
# [ACTION] Открытие соединения при входе в контекст
self.logger.debug(f"[STATE] Открытие соединения с БД: {self.db_path}")
try:
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row # Для удобного доступа к данным по именам колонок
self.logger.debug("[COHERENCE_CHECK_PASSED] Соединение с БД установлено.")
return self.conn
except sqlite3.Error as e:
self.logger.critical(f"[CRITICAL] Ошибка подключения к БД: {e}", exc_info=True)
raise ConnectionError(f"Не удалось подключиться к базе данных {self.db_path}") from e
def __exit__(self, exc_type, exc_val, exc_tb):
# [ACTION] Закрытие соединения при выходе из контекста
if self.conn:
self.conn.close()
self.logger.debug("[STATE] Соединение с БД закрыто.")
if exc_type:
self.logger.error(f"[ERROR] Исключение в контекстном менеджере БД: {exc_type.__name__}: {exc_val}", exc_info=True)
# [COHERENCE_CHECK_FAILED] Ошибка внутри контекста
return False # Пробрасываем исключение
def close(self):
"""[HELPER] Явное закрытие соединения, если менеджер используется вне 'with'."""
if self.conn:
self.conn.close()
self.conn = None
self.logger.debug("[STATE] Соединение с БД явно закрыто.")
# [CONTRACT] DatabaseLogHandler (перенесен в models.py и адаптирован)
# @description: Обработчик логирования, который записывает логи в SQLite базу данных.
# @pre: `db_manager` должен быть инициализирован и подключен.
# @post: Записи логов сохраняются в таблицу `logs`.
class DatabaseLogHandler(logging.Handler):
# ... (код класса DatabaseLogHandler) ...
def __init__(self, db_manager: DatabaseManager, run_id: str):
super().__init__()
self.db_manager = db_manager
self.run_id = run_id self.run_id = run_id
self.logger = logging.getLogger(self.__class__.__name__) # [INIT] Инициализация логгера для обработчика
def emit(self, record: logging.LogRecord): def emit(self, record: logging.LogRecord):
# [ACTION] Запись лог-записи в БД
try: try:
con = sqlite3.connect(self.db_path) # Используем менеджер контекста для безопасного взаимодействия с БД
cur = con.cursor() # Примечание: В DatabaseLogHandler обычно не используется with, т.к. он должен быть "легким"
log_time = datetime.fromtimestamp(record.created) # и работать с существующим соединением, которое управляется извне (через db_manager.conn)
cur.execute( # или создает временное (что неэффективно).
"INSERT INTO logs (run_id, timestamp, level, message) VALUES (?, ?, ?, ?)", # В данном случае, db_manager должен предоставить уже открытое соединение.
(self.run_id, log_time, record.levelname, self.format(record)) # Если db_manager не передает активное соединение, нужно его получить.
) # Для простоты, пока будем использовать прямое подключение в emit, но в реальном продакшене
con.commit() # это место лучше оптимизировать (например, через пул соединений или одно соединение в db_manager).
con.close()
except Exception as e:
print(f"CRITICAL: Failed to write log to database: {e}")
def init_database(db_path: Path, request_id: str): with sqlite3.connect(self.db_manager.db_path) as con:
# ... (код функции init_database без изменений) ... cur = con.cursor()
log_prefix = f"init_database(id={request_id})" log_time = datetime.fromtimestamp(record.created)
# Создаем модель лог-записи для валидации
log_entry = LogRecordModel(
run_id=self.run_id,
timestamp=log_time,
level=record.levelname,
message=self.format(record) # Используем форматтер для полного сообщения
)
cur.execute(
"INSERT INTO logs (run_id, timestamp, level, message) VALUES (?, ?, ?, ?)",
(log_entry.run_id, log_entry.timestamp, log_entry.level, log_entry.message)
)
con.commit()
# [COHERENCE_CHECK_PASSED] Лог успешно записан.
except Exception as e:
# [ERROR_HANDLER] Логирование ошибок записи логов (очень важно)
# print() используется, потому что обычный логгер может вызвать рекурсию
print(f"CRITICAL: [COHERENCE_CHECK_FAILED] Не удалось записать лог в базу данных: {e}", flush=True)
# [CONTRACT] init_database
# @description: Инициализирует схему базы данных (создает таблицы, если они не существуют).
# @pre: `db_path` должен быть валидным путем `Path`.
# @post: Таблицы `products` и `logs` существуют в БД.
# @side_effects: Создает директорию для БД, если ее нет.
def init_database(db_path: Path, run_id: str):
log_prefix = f"init_database(id={run_id})"
logging.info(f"{log_prefix} - Инициализация базы данных: {db_path}") logging.info(f"{log_prefix} - Инициализация базы данных: {db_path}")
try: try:
# [ACTION] Создаем родительскую директорию, если она не существует.
db_path.parent.mkdir(parents=True, exist_ok=True) db_path.parent.mkdir(parents=True, exist_ok=True)
con = sqlite3.connect(db_path) # [CONTEXT_MANAGER] Используем with-statement для соединения с БД
cur = con.cursor() with sqlite3.connect(db_path) as con:
cur.execute(""" cur = con.cursor()
CREATE TABLE IF NOT EXISTS products ( # [ACTION] Создание таблицы products
id INTEGER PRIMARY KEY AUTOINCREMENT, cur.execute("""
run_id TEXT NOT NULL, CREATE TABLE IF NOT EXISTS products (
name TEXT NOT NULL, id INTEGER PRIMARY KEY AUTOINCREMENT,
volume TEXT, run_id TEXT NOT NULL,
price INTEGER NOT NULL, name TEXT NOT NULL,
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP volume TEXT,
) price INTEGER NOT NULL,
""") parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
cur.execute(""" )
CREATE TABLE IF NOT EXISTS logs ( """)
id INTEGER PRIMARY KEY AUTOINCREMENT, # [ACTION] Создание таблицы logs
run_id TEXT NOT NULL, cur.execute("""
timestamp TIMESTAMP NOT NULL, CREATE TABLE IF NOT EXISTS logs (
level TEXT NOT NULL, id INTEGER PRIMARY KEY AUTOINCREMENT,
message TEXT NOT NULL run_id TEXT NOT NULL,
) timestamp TEXT NOT NULL, -- Changed to TEXT for ISO format from datetime
""") level TEXT NOT NULL,
con.commit() message TEXT NOT NULL
con.close() )
""")
con.commit()
logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Схема базы данных успешно проверена/создана.") logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Схема базы данных успешно проверена/создана.")
except sqlite3.Error as e:
logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Ошибка SQLite при инициализации БД: {e}", exc_info=True)
raise ConnectionError(f"Ошибка БД при инициализации: {e}") from e
except Exception as e: except Exception as e:
logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Ошибка при инициализации БД: {e}") logging.critical(f"{log_prefix} - [CRITICAL] Непредвиденная ошибка при инициализации БД: {e}", exc_info=True)
raise
# [CONTRACT] save_data_to_db
# @description: Сохраняет список объектов ProductVariant (представленных как словари) в таблицу `products`.
# @pre:
# - `data` должен быть списком словарей, каждый из которых соответствует ProductVariant.
# - `db_path` должен указывать на существующую и инициализированную БД.
# @post: Данные из `data` вставлены в таблицу `products`.
def save_data_to_db(data: List[Dict], db_path: Path, run_id: str):
log_prefix = f"save_data_to_db(id={run_id})"
if not data:
logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Данные для сохранения отсутствуют. Пропуск сохранения.")
return
logging.info(f"{log_prefix} - Начало сохранения {len(data)} записей в БД: {db_path}")
# [PRECONDITION] Проверка формата данных (хотя ProductVariant.model_dump() должен гарантировать)
if not all(isinstance(item, dict) and all(k in item for k in ['name', 'volume', 'price']) for item in data):
logging.error(f"{log_prefix} - [CONTRACT_VIOLATION] Некорректный формат данных для сохранения в БД.", extra={"sample_data": data[:1]})
raise ValueError("Данные для сохранения в БД не соответствуют ожидаемому формату ProductVariant.")
try:
# [CONTEXT_MANAGER] Используем with-statement для безопасного соединения и коммита
with sqlite3.connect(db_path) as con:
cur = con.cursor()
products_to_insert = []
for item in data:
# Преобразование к int и обработка возможных ошибок приведения типа
try:
price_int = int(item['price'])
except (ValueError, TypeError) as e:
logging.error(f"{log_prefix} - [DATA_CLEANUP_FAILED] Некорректное значение цены для '{item.get('name')}': {item.get('price')}. Пропуск записи. Ошибка: {e}")
# [COHERENCE_CHECK_FAILED] Данные не соответствуют схеме
continue # Пропускаем эту запись, но продолжаем для остальных
products_to_insert.append(
(run_id, item['name'], item['volume'], price_int)
)
if products_to_insert:
cur.executemany(
"INSERT INTO products (run_id, name, volume, price) VALUES (?, ?, ?, ?)",
products_to_insert
)
con.commit()
logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] {len(products_to_insert)} записей успешно сохранено в базу данных.")
else:
logging.warning(f"{log_prefix} - После фильтрации не осталось валидных записей для сохранения.")
except sqlite3.Error as e:
logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Ошибка SQLite при сохранении данных: {e}", exc_info=True)
raise ConnectionError(f"Ошибка БД при сохранении: {e}") from e
except Exception as e:
logging.critical(f"{log_prefix} - [CRITICAL] Непредвиденная ошибка при сохранении данных в БД: {e}", exc_info=True)
raise
# [CONTRACT] save_data_to_db
# @description: Сохраняет список объектов ProductVariant (представленных как словари) в таблицу `products`.
# @pre:
# - `data` должен быть списком словарей, каждый из которых соответствует ProductVariant.
# - `db_path` должен указывать на существующую и инициализированную БД.
# @post: Данные из `data` вставлены в таблицу `products`.
def save_data_to_db(data: List[Dict], db_path: Path, run_id: str):
log_prefix = f"save_data_to_db(id={run_id})"
if not data:
logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Данные для сохранения отсутствуют. Пропуск сохранения.")
return
logging.info(f"{log_prefix} - Начало сохранения {len(data)} записей в БД: {db_path}")
# [PRECONDITION] Проверка формата данных (хотя ProductVariant.model_dump() должен гарантировать)
if not all(isinstance(item, dict) and all(k in item for k in ['name', 'volume', 'price']) for item in data):
logging.error(f"{log_prefix} - [CONTRACT_VIOLATION] Некорректный формат данных для сохранения в БД.", extra={"sample_data": data[:1]})
raise ValueError("Данные для сохранения в БД не соответствуют ожидаемому формату ProductVariant.")
try:
# [CONTEXT_MANAGER] Используем with-statement для безопасного соединения и коммита
with sqlite3.connect(db_path) as con:
cur = con.cursor()
products_to_insert = []
for item in data:
# Преобразование к int и обработка возможных ошибок приведения типа
try:
price_int = int(item['price'])
except (ValueError, TypeError) as e:
logging.error(f"{log_prefix} - [DATA_CLEANUP_FAILED] Некорректное значение цены для '{item.get('name')}': {item.get('price')}. Пропуск записи. Ошибка: {e}")
# [COHERENCE_CHECK_FAILED] Данные не соответствуют схеме
continue # Пропускаем эту запись, но продолжаем для остальных
products_to_insert.append(
(run_id, item['name'], item['volume'], price_int)
)
if products_to_insert:
cur.executemany(
"INSERT INTO products (run_id, name, volume, price) VALUES (?, ?, ?, ?)",
products_to_insert
)
con.commit()
logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] {len(products_to_insert)} записей успешно сохранено в базу данных.")
else:
logging.warning(f"{log_prefix} - После фильтрации не осталось валидных записей для сохранения.")
except sqlite3.Error as e:
logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Ошибка SQLite при сохранении данных: {e}", exc_info=True)
raise ConnectionError(f"Ошибка БД при сохранении: {e}") from e
except Exception as e:
logging.critical(f"{log_prefix} - [CRITICAL] Непредвиденная ошибка при сохранении данных в БД: {e}", exc_info=True)
raise raise
def save_data_to_db(data: List[Dict], db_path: Path, run_id: str): def save_data_to_db(data: List[Dict], db_path: Path, run_id: str):

64
src/core/models.py Normal file
View File

@@ -0,0 +1,64 @@
# [FILE] src/core/models.py
# ANCHOR: Core_Models_Module
# Семантика: Определяет Pydantic-модели для структурированного представления данных
# в приложении (продукты, логи).
# [CONTRACT]: Все модели наследуются от `BaseModel` и обеспечивают типизацию и валидацию.
# [COHERENCE]: Согласованы со схемами данных, используемыми в БД и экспортах.
from pydantic import BaseModel, Field, HttpUrl, ValidationError
from datetime import datetime
from typing import Optional
class ProductVariant(BaseModel):
"""
[CONTRACT]
@description: Модель данных для варианта продукта.
@invariant: `name`, `price`, `url` являются обязательными. `price` всегда `int`.
"""
name: str = Field(..., description="Название продукта.")
volume: str = Field(..., description="Объем или вариант продукта (например, '50мл', '10 капсул').")
price: int = Field(..., description="Цена продукта в числовом формате.")
url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.", examples=["https://elixirpeptide.ru/product/?product=123"])
# [VALIDATOR] Пример пост-валидации, если нужно.
# @validator('price')
# def price_must_be_positive(cls, v):
# if v < 0:
# raise ValueError('Price must be a positive integer')
# return v
class Config:
json_schema_extra = {
"example": {
"name": "Peptide X",
"volume": "30ml",
"price": 1500,
"url": "https://elixirpeptide.ru/catalog/peptide-x/?product=variant1"
}
}
class LogRecordModel(BaseModel):
"""
[CONTRACT]
@description: Модель данных для записи лога, используемая при сохранении логов в БД.
@invariant: Все поля являются обязательными. `timestamp` хранится как ISO-строка.
"""
run_id: str = Field(..., description="Уникальный идентификатор текущего запуска парсера.")
timestamp: datetime = Field(..., description="Время создания лог-записи.")
level: str = Field(..., description="Уровень логирования (e.g., INFO, ERROR, DEBUG).")
message: str = Field(..., description="Текст лог-сообщения.")
# Pydantic автоматически обработает datetime в JSON и другие форматы.
# Для SQLite, timestamp будет храниться как TEXT в ISO-формате.
class Config:
json_schema_extra = {
"example": {
"run_id": "20231027-123456",
"timestamp": "2023-10-27T12:34:56.789Z",
"level": "INFO",
"message": "Парсинг начат."
}
}
# [COHERENCE_CHECK_PASSED] Все основные модели данных определены и типизированы.

View File

@@ -8,13 +8,13 @@ import logging
import time import time
import requests import requests
from datetime import datetime from datetime import datetime
from typing import List from typing import List, Optional
from src.core.settings import Settings from src.core.settings import Settings
from src.core.models import ProductVariant from src.core.models import ProductVariant # [FIX] Импорт ProductVariant из models.py
from src.core.logging_config import setup_logging from src.core.database import init_database, save_data_to_db, DatabaseManager # [FIX] Импорт DatabaseManager
from src.core.database import init_database, save_data_to_db, DatabaseManager from src.core.logging_config import setup_logging # [COHERENCE_CHECK_PASSED] Импорт loggin_config
from src.scraper.engine import Scraper # [FIX] Импортируем класс Scraper from src.scraper.engine import Scraper
from src.utils.exporters import save_data_to_csv from src.utils.exporters import save_data_to_csv
class AppOrchestrator: class AppOrchestrator:
@@ -29,7 +29,7 @@ class AppOrchestrator:
self.run_id = datetime.now().strftime('%Y%m%d-%H%M%S') self.run_id = datetime.now().strftime('%Y%m%d-%H%M%S')
self.http_session = requests.Session() self.http_session = requests.Session()
self.http_session.headers.update(settings.headers) self.http_session.headers.update(settings.headers)
self.db_manager = None self.db_manager: Optional[DatabaseManager] = None # [STATE] Инициализация db_manager как Optional
self.final_data: List[ProductVariant] = [] self.final_data: List[ProductVariant] = []
# [DELEGATES] Создаем экземпляр скрейпера, передавая ему зависимости. # [DELEGATES] Создаем экземпляр скрейпера, передавая ему зависимости.
@@ -39,43 +39,49 @@ class AppOrchestrator:
selectors=self.settings.selectors, selectors=self.settings.selectors,
base_url=self.settings.base_url base_url=self.settings.base_url
) )
self.logger = logging.getLogger(self.__class__.__name__) # [INIT] Инициализация логгера для класса
def _setup(self): def _setup(self):
"""[ACTION] Шаг 0: Инициализация всех систем.""" """[ACTION] Шаг 0: Инициализация всех систем."""
self.logger.info(f"[INFO] Запуск инициализации систем. Run ID: {self.run_id}")
# [CONDITIONAL_ACTION] Инициализация базы данных, если требуется
if self.settings.save_to_db or self.settings.log_to_db: if self.settings.save_to_db or self.settings.log_to_db:
# [ACTION] Создаем директорию для БД, если ее нет
self.settings.output_dir.mkdir(parents=True, exist_ok=True)
self.db_manager = DatabaseManager(self.settings.db_path) self.db_manager = DatabaseManager(self.settings.db_path)
init_database(self.db_manager, self.run_id) init_database(self.db_manager.db_path, self.run_id) # init_database работает с Path
# [DELEGATES] Настройка логирования
setup_logging(self.run_id, self.db_manager) setup_logging(self.run_id, self.db_manager)
logging.info(f"Запуск парсера. Архитектура v2.0. Run ID: {self.run_id}") self.logger.info(f"[INFO] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
def _collect_urls(self) -> List[str]: def _collect_urls(self) -> List[str]:
"""[ACTION] Шаги 1 и 2: Сбор всех URL для парсинга.""" """[ACTION] Шаги 1 и 2: Сбор всех URL для парсинга."""
self.logger.info("[INFO] Начало сбора URL для парсинга.")
# [DELEGATES] Делегируем сбор URL скрейперу. # [DELEGATES] Делегируем сбор URL скрейперу.
base_urls = self.scraper.get_base_product_urls( base_urls = self.scraper.get_base_product_urls(
catalog_url=self.settings.catalog_url, catalog_url=self.settings.catalog_url,
run_id=self.run_id run_id=self.run_id
) )
if not base_urls: if not base_urls:
logging.error("Не найдено ни одного базового URL. Завершение работы.") self.logger.error("[ERROR] Не найдено ни одного базового URL. Завершение работы сбора URL.")
return [] return []
# [DELEGATES] Делегируем сбор URL вариантов скрейперу. # [DELEGATES] Делегируем сбор URL вариантов скрейперу.
all_urls_to_scrape = self.scraper.get_all_variant_urls( all_urls_to_scrape = self.scraper.get_all_variant_urls(
base_product_urls=base_urls, base_product_urls=base_urls,
run_id=self.run_id run_id=self.run_id
) )
if not all_urls_to_scrape: if not all_urls_to_scrape:
logging.error("Не удалось сформировать список URL для парсинга. Завершение работы.") self.logger.error("[ERROR] Не удалось сформировать список URL для парсинга. Завершение работы сбора URL.")
self.logger.info(f"[INFO] Сбор URL завершен. Найдено {len(all_urls_to_scrape)} URL вариантов для парсинга.")
return all_urls_to_scrape return all_urls_to_scrape
def _scrape_data(self, urls: List[str]): def _scrape_data(self, urls: List[str]):
"""[ACTION] Шаг 3: Итеративный парсинг данных.""" """[ACTION] Шаг 3: Итеративный парсинг данных."""
total_to_scrape = len(urls) total_to_scrape = len(urls)
self.logger.info(f"[INFO] Начало парсинга {total_to_scrape} URL вариантов.")
for i, url in enumerate(urls): for i, url in enumerate(urls):
logging.info(f"Парсинг URL {i+1}/{total_to_scrape}") self.logger.info(f"[INFO] Парсинг URL {i+1}/{total_to_scrape}: {url.split('/')[-1]}")
time.sleep(1) # Задержка между запросами time.sleep(1) # [ACTION] Задержка между запросами
# [DELEGATES] Делегируем парсинг одной страницы скрейперу. # [DELEGATES] Делегируем парсинг одной страницы скрейперу.
variant_data = self.scraper.scrape_variant_page( variant_data = self.scraper.scrape_variant_page(
variant_url=url, variant_url=url,
@@ -83,35 +89,58 @@ class AppOrchestrator:
) )
if variant_data: if variant_data:
self.final_data.append(variant_data) self.final_data.append(variant_data)
self.logger.info(f"[INFO] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.")
def _save_results(self): def _save_results(self):
"""[ACTION] Шаг 4: Сохранение результатов.""" """[ACTION] Шаг 4: Сохранение результатов."""
self.logger.info("[INFO] Начало сохранения результатов парсинга.")
if not self.final_data: if not self.final_data:
logging.warning("Итоговый набор данных пуст. Файлы не будут созданы.") self.logger.warning("[WARN] Итоговый набор данных пуст. Файлы не будут созданы.")
return return
logging.info(f"Сбор данных завершен. Всего найдено валидных вариантов: {len(self.final_data)}") self.logger.info(f"[INFO] Всего найдено валидных вариантов для сохранения: {len(self.final_data)}")
# [CONDITIONAL_ACTION] Сохранение в CSV
if self.settings.save_to_csv: if self.settings.save_to_csv:
timestamp = datetime.now().strftime('%Y-%m-%d') timestamp = datetime.now().strftime('%Y-%m-%d_%H%M%S') # Добавил время для уникальности
output_filename = self.settings.output_dir / f'prices_full_catalog_{timestamp}.csv' output_filename = self.settings.output_dir / f'prices_full_catalog_{timestamp}.csv'
save_data_to_csv(self.final_data, output_filename, self.run_id) # Преобразуем ProductVariant объекты в словари для save_data_to_csv
data_to_csv = [p.model_dump() for p in self.final_data] # Используем model_dump() для Pydantic v2
save_data_to_csv(data_to_csv, output_filename, self.run_id)
self.logger.info(f"[INFO] Данные сохранены в CSV: {output_filename}")
# [CONDITIONAL_ACTION] Сохранение в БД
if self.settings.save_to_db and self.db_manager: if self.settings.save_to_db and self.db_manager:
save_data_to_db(self.final_data, self.db_manager, self.run_id) # Преобразуем ProductVariant объекты в словари для save_data_to_db
data_to_db = [p.model_dump() for p in self.final_data]
save_data_to_db(data_to_db, self.db_manager.db_path, self.run_id) # save_data_to_db ожидает Path
self.logger.info("[INFO] Данные сохранены в базу данных.")
self.logger.info("[INFO] Сохранение результатов завершено.")
def _cleanup(self): def _cleanup(self):
"""[ACTION] Шаг 5: Корректное завершение работы.""" """[ACTION] Шаг 5: Корректное завершение работы."""
self.logger.info("[INFO] Начало очистки ресурсов.")
self.http_session.close() self.http_session.close()
self.logger.debug("[DEBUG] HTTP-сессия закрыта.")
if self.db_manager: if self.db_manager:
self.db_manager.close() self.db_manager.close()
logging.info(f"Работа парсера завершена. Run ID: {self.run_id}") self.logger.debug("[DEBUG] Соединение с базой данных закрыто.")
self.logger.info(f"[COHERENCE_CHECK_PASSED] Работа парсера завершена. Run ID: {self.run_id}")
def run(self): def run(self):
"""[ENTRYPOINT] Основной метод, запускающий весь процесс.""" """[ENTRYPOINT] Основной метод, запускающий весь процесс."""
self._setup() self.logger.info("="*50)
urls_to_scrape = self._collect_urls() self.logger.info("[INFO] Запуск главного процесса оркестратора.")
if urls_to_scrape: self.logger.info("="*50)
self._scrape_data(urls_to_scrape) try:
self._save_results() self._setup()
self._cleanup() urls_to_scrape = self._collect_urls()
if urls_to_scrape:
self._scrape_data(urls_to_scrape)
self._save_results()
else:
self.logger.warning("[WARN] Отсутствуют URL для парсинга. Пропуск шагов парсинга и сохранения.")
except Exception as e:
self.logger.critical(f"[CRITICAL] Непредвиденная критическая ошибка в оркестраторе: {e}", exc_info=True)
# [COHERENCE_CHECK_FAILED] Критическая ошибка нарушила нормальный поток выполнения.
finally:
self._cleanup()

View File

@@ -10,7 +10,7 @@ import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from typing import List, Optional from typing import List, Optional
from src.core.models import ProductVariant from src.core.models import ProductVariant # [FIX] Импорт ProductVariant
from src.core.settings import ScraperSelectors from src.core.settings import ScraperSelectors
class Scraper: class Scraper:
@@ -28,8 +28,11 @@ class Scraper:
def _clean_price(self, price_str: str) -> int: def _clean_price(self, price_str: str) -> int:
"""[HELPER] Очищает строку цены и возвращает целое число.""" """[HELPER] Очищает строку цены и возвращает целое число."""
self.logger.debug(f"[DEBUG] Очистка цены: '{price_str}'")
digits = ''.join(filter(str.isdigit, price_str)) digits = ''.join(filter(str.isdigit, price_str))
return int(digits) if digits else 0 cleaned_price = int(digits) if digits else 0
self.logger.debug(f"[DEBUG] Цена после очистки: {cleaned_price}")
return cleaned_price
def _fetch_page(self, url: str, request_id: str) -> Optional[str]: def _fetch_page(self, url: str, request_id: str) -> Optional[str]:
"""[HELPER] Приватный метод для скачивания HTML-содержимого страницы.""" """[HELPER] Приватный метод для скачивания HTML-содержимого страницы."""
@@ -41,7 +44,181 @@ class Scraper:
self.logger.debug(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Страница успешно получена, статус {response.status_code}.") self.logger.debug(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Страница успешно получена, статус {response.status_code}.")
return response.text return response.text
except requests.RequestException as e: except requests.RequestException as e:
self.logger.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Сетевая ошибка: {e}") self.logger.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Сетевая ошибка при запросе {url}: {e}", exc_info=True)
return None
def get_base_product_urls(self, catalog_url: str, run_id: str) -> List[str]:
"""[ACTION] Собирает URL всех товаров с основной страницы каталога.
@pre: `catalog_url` должен быть доступен.
@post: Возвращает список уникальных URL базовых продуктов.
"""
log_prefix = f"get_base_urls(id={run_id})"
self.logger.info(f"{log_prefix} - Начало сбора базовых URL с: {catalog_url}")
html = self._fetch_page(catalog_url, log_prefix)
if not html:
self.logger.warning(f"{log_prefix} - Не удалось получить HTML страницы каталога, возвращаю пустой список.")
return []
soup = BeautifulSoup(html, 'html.parser')
links = soup.select(self.selectors.catalog_product_link)
unique_urls = {urljoin(self.base_url, link.get('href')) for link in links if link.get('href')}
self.logger.info(f"{log_prefix} - Найдено {len(unique_urls)} уникальных базовых URL.")
# [COHERENCE_CHECK_PASSED] Базовые URL успешно собраны.
return list(unique_urls)
def get_all_variant_urls(self, base_product_urls: List[str], run_id: str) -> List[str]:
"""[ACTION] Проходит по базовым URL и собирает URL всех их вариантов.
@pre: `base_product_urls` - список доступных URL продуктов.
@post: Возвращает список всех URL вариантов продуктов.
"""
all_variant_urls = []
total_base = len(base_product_urls)
log_prefix = f"get_variant_urls(id={run_id})"
self.logger.info(f"{log_prefix} - Начало сбора URL вариантов для {total_base} базовых продуктов.")
for i, base_url in enumerate(base_product_urls):
self.logger.info(f"{log_prefix} - Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}")
html = self._fetch_page(base_url, f"{log_prefix}-{i+1}")
if not html:
self.logger.warning(f"{log_prefix} - Пропуск базового URL из-за ошибки загрузки: {base_url}")
continue
soup = BeautifulSoup(html, 'html.parser')
variant_items = soup.select(self.selectors.variant_list_item)
if not variant_items:
self.logger.debug(f"{log_prefix} - Товар не имеет явных вариантов, добавляю базовый URL как вариант: {base_url}")
all_variant_urls.append(base_url)
else:
for item in variant_items:
variant_id = item.get('data-id')
if variant_id:
variant_url = f"{base_url}?product={variant_id}"
all_variant_urls.append(variant_url)
self.logger.debug(f"{log_prefix} - Найдено {len(variant_items)} вариантов для товара {base_url.split('/')[-1]}.")
time.sleep(0.5) # [ACTION] Задержка между запросами
self.logger.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
return all_variant_urls
def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]:
"""[ACTION] Парсит страницу одного варианта и возвращает Pydantic-модель.
@pre: `variant_url` должен быть доступен и содержать ожидаемые элементы.
@post: Возвращает `ProductVariant` или `None` в случае ошибки парсинга.
"""
log_prefix = f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})"
self.logger.info(f"{log_prefix} - Начало парсинга страницы варианта.")
html = self._fetch_page(variant_url, log_prefix)
if not html:
self.logger.warning(f"{log_prefix} - Не удалось получить HTML страницы варианта, пропуск парсинга.")
return None
soup = BeautifulSoup(html, 'html.parser')
try:
name_el = soup.select_one(self.selectors.product_page_name)
price_el = soup.select_one(self.selectors.price_block)
volume_el = soup.select_one(self.selectors.active_volume) # Optional, может отсутствовать
# [PRECONDITION] Проверка наличия основных элементов
if not (name_el and price_el):
self.logger.warning(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Не найдены базовые элементы (Имя продукта или Блок цены). Пропуск URL: {variant_url}.")
return None
# [ACTION] Извлечение данных
name = name_el.get_text(strip=True)
price = self._clean_price(price_el.get_text(strip=True))
volume = volume_el.get_text(strip=True) if volume_el else "N/A"
# [POSTCONDITION] Создаем экземпляр контракта данных.
# [CONTRACT_VALIDATOR] Pydantic валидация при создании модели
product = ProductVariant(name=name, volume=volume, price=price, url=variant_url)
self.logger.debug(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | '{product.volume}' | '{product.price}'")
return product
except Exception as e:
self.logger.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Исключение при парсинге страницы {variant_url}: {e}", exc_info=True)
return None
def get_base_product_urls(self, catalog_url: str, run_id: str) -> List[str]:
"""[ACTION] Собирает URL всех товаров с основной страницы каталога.
@pre: `catalog_url` должен быть доступен.
@post: Возвращает список уникальных URL базовых продуктов.
"""
log_prefix = f"get_base_urls(id={run_id})"
self.logger.info(f"{log_prefix} - Начало сбора базовых URL с: {catalog_url}")
html = self._fetch_page(catalog_url, log_prefix)
if not html:
self.logger.warning(f"{log_prefix} - Не удалось получить HTML страницы каталога, возвращаю пустой список.")
return []
soup = BeautifulSoup(html, 'html.parser')
links = soup.select(self.selectors.catalog_product_link)
unique_urls = {urljoin(self.base_url, link.get('href')) for link in links if link.get('href')}
self.logger.info(f"{log_prefix} - Найдено {len(unique_urls)} уникальных базовых URL.")
# [COHERENCE_CHECK_PASSED] Базовые URL успешно собраны.
return list(unique_urls)
def get_all_variant_urls(self, base_product_urls: List[str], run_id: str) -> List[str]:
"""[ACTION] Проходит по базовым URL и собирает URL всех их вариантов.
@pre: `base_product_urls` - список доступных URL продуктов.
@post: Возвращает список всех URL вариантов продуктов.
"""
all_variant_urls = []
total_base = len(base_product_urls)
log_prefix = f"get_variant_urls(id={run_id})"
self.logger.info(f"{log_prefix} - Начало сбора URL вариантов для {total_base} базовых продуктов.")
for i, base_url in enumerate(base_product_urls):
self.logger.info(f"{log_prefix} - Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}")
html = self._fetch_page(base_url, f"{log_prefix}-{i+1}")
if not html:
self.logger.warning(f"{log_prefix} - Пропуск базового URL из-за ошибки загрузки: {base_url}")
continue
soup = BeautifulSoup(html, 'html.parser')
variant_items = soup.select(self.selectors.variant_list_item)
if not variant_items:
self.logger.debug(f"{log_prefix} - Товар не имеет явных вариантов, добавляю базовый URL как вариант: {base_url}")
all_variant_urls.append(base_url)
else:
for item in variant_items:
variant_id = item.get('data-id')
if variant_id:
variant_url = f"{base_url}?product={variant_id}"
all_variant_urls.append(variant_url)
self.logger.debug(f"{log_prefix} - Найдено {len(variant_items)} вариантов для товара {base_url.split('/')[-1]}.")
time.sleep(0.5) # [ACTION] Задержка между запросами
self.logger.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
return all_variant_urls
def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]:
"""[ACTION] Парсит страницу одного варианта и возвращает Pydantic-модель.
@pre: `variant_url` должен быть доступен и содержать ожидаемые элементы.
@post: Возвращает `ProductVariant` или `None` в случае ошибки парсинга.
"""
log_prefix = f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})"
self.logger.info(f"{log_prefix} - Начало парсинга страницы варианта.")
html = self._fetch_page(variant_url, log_prefix)
if not html:
self.logger.warning(f"{log_prefix} - Не удалось получить HTML страницы варианта, пропуск парсинга.")
return None
soup = BeautifulSoup(html, 'html.parser')
try:
name_el = soup.select_one(self.selectors.product_page_name)
price_el = soup.select_one(self.selectors.price_block)
volume_el = soup.select_one(self.selectors.active_volume) # Optional, может отсутствовать
# [PRECONDITION] Проверка наличия основных элементов
if not (name_el and price_el):
self.logger.warning(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Не найдены базовые элементы (Имя продукта или Блок цены). Пропуск URL: {variant_url}.")
return None
# [ACTION] Извлечение данных
name = name_el.get_text(strip=True)
price = self._clean_price(price_el.get_text(strip=True))
volume = volume_el.get_text(strip=True) if volume_el else "N/A"
# [POSTCONDITION] Создаем экземпляр контракта данных.
# [CONTRACT_VALIDATOR] Pydantic валидация при создании модели
product = ProductVariant(name=name, volume=volume, price=price, url=variant_url)
self.logger.debug(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | '{product.volume}' | '{product.price}'")
return product
except Exception as e:
self.logger.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Исключение при парсинге страницы {variant_url}: {e}", exc_info=True)
return None return None
def get_base_product_urls(self, catalog_url: str, run_id: str) -> List[str]: def get_base_product_urls(self, catalog_url: str, run_id: str) -> List[str]:
@@ -51,11 +228,9 @@ class Scraper:
html = self._fetch_page(catalog_url, log_prefix) html = self._fetch_page(catalog_url, log_prefix)
if not html: if not html:
return [] return []
soup = BeautifulSoup(html, 'html.parser') soup = BeautifulSoup(html, 'html.parser')
links = soup.select(self.selectors.catalog_product_link) links = soup.select(self.selectors.catalog_product_link)
unique_urls = {urljoin(self.base_url, link.get('href')) for link in links if link.get('href')} unique_urls = {urljoin(self.base_url, link.get('href')) for link in links if link.get('href')}
self.logger.info(f"{log_prefix} - Найдено {len(unique_urls)} уникальных базовых URL.") self.logger.info(f"{log_prefix} - Найдено {len(unique_urls)} уникальных базовых URL.")
return list(unique_urls) return list(unique_urls)
@@ -73,7 +248,6 @@ class Scraper:
soup = BeautifulSoup(html, 'html.parser') soup = BeautifulSoup(html, 'html.parser')
variant_items = soup.select(self.selectors.variant_list_item) variant_items = soup.select(self.selectors.variant_list_item)
if not variant_items: if not variant_items:
self.logger.debug(f"{log_prefix} - Товар без вариантов, используется базовый URL: {base_url}") self.logger.debug(f"{log_prefix} - Товар без вариантов, используется базовый URL: {base_url}")
all_variant_urls.append(base_url) all_variant_urls.append(base_url)
@@ -84,9 +258,7 @@ class Scraper:
variant_url = f"{base_url}?product={variant_id}" variant_url = f"{base_url}?product={variant_id}"
all_variant_urls.append(variant_url) all_variant_urls.append(variant_url)
self.logger.debug(f"{log_prefix} - Найдено {len(variant_items)} вариантов для товара.") self.logger.debug(f"{log_prefix} - Найдено {len(variant_items)} вариантов для товара.")
time.sleep(0.5) time.sleep(0.5)
self.logger.info(f"Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.") self.logger.info(f"Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.")
return all_variant_urls return all_variant_urls
@@ -96,20 +268,15 @@ class Scraper:
html = self._fetch_page(variant_url, log_prefix) html = self._fetch_page(variant_url, log_prefix)
if not html: if not html:
return None return None
soup = BeautifulSoup(html, 'html.parser') soup = BeautifulSoup(html, 'html.parser')
try: try:
name_el = soup.select_one(self.selectors.product_page_name) name_el = soup.select_one(self.selectors.product_page_name)
price_el = soup.select_one(self.selectors.price_block) price_el = soup.select_one(self.selectors.price_block)
if not (name_el and price_el): if not (name_el and price_el):
self.logger.warning(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Не найдены базовые элементы (Имя или Цена). Пропуск URL.") self.logger.warning(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Не найдены базовые элементы (Имя или Цена). Пропуск URL.")
return None return None
name = name_el.get_text(strip=True) name = name_el.get_text(strip=True)
price = self._clean_price(price_el.get_text(strip=True)) price = self._clean_price(price_el.get_text(strip=True))
volume_el = soup.select_one(self.selectors.active_volume) volume_el = soup.select_one(self.selectors.active_volume)
volume = volume_el.get_text(strip=True) if volume_el else "N/A" volume = volume_el.get_text(strip=True) if volume_el else "N/A"