monitoring refactor

2025-07-20 09:29:19 +03:00
parent 40317aa2e7
commit 5742d474fd
5 changed files with 129 additions and 83 deletions

View File

@@ -2,28 +2,36 @@
 # <IMPORTS>
 import logging
-from typing import List, Dict, Tuple
-from sqlalchemy.orm import sessionmaker
-from src.core.models import ProductVariant, ParsingRun
-from sqlalchemy import create_engine
+import sqlite3
+from typing import List, Dict, Tuple, Any
 from src.core.settings import settings
 # </IMPORTS>
 # <MAIN_CONTRACT for="DataAnalyzer">
-# description: "Analyzes the data from the two most recent parser runs and detects changes."
+# description: "Analyzes the data from the two most recent parser runs and detects changes using direct SQL queries."
 # </MAIN_CONTRACT>
 class DataAnalyzer:
     # <INIT>
     def __init__(self):
-        self.engine = create_engine(settings.db_path.absolute().as_uri())
-        self.Session = sessionmaker(bind=self.engine)
+        self.db_path = settings.db_path
         self.logger = logging.getLogger(__name__)
     # </INIT>
+    def _execute_query(self, query: str, params: tuple = ()) -> List[Any]:
+        try:
+            with sqlite3.connect(self.db_path) as conn:
+                cursor = conn.cursor()
+                cursor.execute(query, params)
+                return cursor.fetchall()
+        except sqlite3.Error as e:
+            self.logger.error(f"Failed to execute SQL query: {e}")
+            return []
     # <HELPER name="_get_last_two_runs">
-    def _get_last_two_runs(self, session) -> Tuple[str, str]:
+    def _get_last_two_runs(self) -> Tuple[str, str]:
         """Returns the IDs of the two most recent successful runs."""
-        runs = session.query(ParsingRun.run_id).order_by(ParsingRun.start_time.desc()).limit(2).all()
+        query = "SELECT run_id FROM parsing_runs ORDER BY start_time DESC LIMIT 2"
+        runs = self._execute_query(query)
         if len(runs) < 2:
             self.logger.warning("Fewer than two runs found. Comparison is not possible.")
             return None, None
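A side note on the helpers just above: _execute_query swallows sqlite3.Error and returns an empty list, and _get_last_two_runs is annotated Tuple[str, str] even though it can return (None, None); the hunk also cuts off before the successful return. A standalone sketch of the same query with a stricter signature, assuming the parsing_runs schema created later in this commit (the function name and db_path parameter are illustrative, not part of the diff):

import sqlite3
from typing import Optional, Tuple

def get_last_two_runs(db_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Return the run_id of the two newest runs, or (None, None) if fewer than two exist."""
    with sqlite3.connect(db_path) as conn:
        rows = conn.execute(
            "SELECT run_id FROM parsing_runs ORDER BY start_time DESC LIMIT 2"
        ).fetchall()
    if len(rows) < 2:
        return None, None
    # fetchall() returns plain tuples; run_id is the only selected column
    return rows[0][0], rows[1][0]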
@@ -31,10 +39,12 @@ class DataAnalyzer:
     # </HELPER>
     # <HELPER name="_get_data_for_run">
-    def _get_data_for_run(self, session, run_id: str) -> Dict[str, ProductVariant]:
-        """Returns product data for the given run as a {url: ProductVariant} dictionary."""
-        variants = session.query(ProductVariant).filter(ProductVariant.run_id == run_id).all()
-        return {v.url: v for v in variants}
+    def _get_data_for_run(self, run_id: str) -> Dict[str, Dict]:
+        """Returns product data for the given run as a {url: product_data} dictionary."""
+        query = "SELECT name, volume, price, url, is_in_stock FROM products WHERE run_id = ?"
+        rows = self._execute_query(query, (run_id,))
+        # Convert the result tuples into dicts keyed by column name
+        return {row[3]: dict(zip(['name', 'volume', 'price', 'url', 'is_in_stock'], row)) for row in rows}
     # </HELPER>
     # <ACTION name="analyze">
@@ -42,42 +52,35 @@ class DataAnalyzer:
""" """
Сравнивает два последних запуска и генерирует HTML-отчет об изменениях. Сравнивает два последних запуска и генерирует HTML-отчет об изменениях.
""" """
# <CORE_LOGIC> current_run_id, prev_run_id = self._get_last_two_runs()
with self.Session() as session: if not current_run_id or not prev_run_id:
current_run_id, prev_run_id = self._get_last_two_runs(session) return ""
if not current_run_id or not prev_run_id:
return ""
self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим з<EFBFBD><EFBFBD>пуском {prev_run_id}") self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим запуском {prev_run_id}")
current_data = self._get_data_for_run(session, current_run_id) current_data = self._get_data_for_run(current_run_id)
prev_data = self._get_data_for_run(session, prev_run_id) prev_data = self._get_data_for_run(prev_run_id)
price_changes = [] price_changes = []
availability_changes = [] availability_changes = []
new_items = [] new_items = []
removed_items = [] removed_items = []
# Поиск изменений и новых товаров for url, current_item in current_data.items():
for url, current_item in current_data.items(): prev_item = prev_data.get(url)
prev_item = prev_data.get(url) if prev_item:
if prev_item: if current_item['price'] != prev_item['price']:
# Изменение цены price_changes.append((current_item, prev_item['price']))
if current_item.price != prev_item.price: if current_item['is_in_stock'] != prev_item['is_in_stock']:
price_changes.append((current_item, prev_item.price)) availability_changes.append(current_item)
# Изменение статуса наличия else:
if current_item.is_available != prev_item.is_available: new_items.append(current_item)
availability_changes.append(current_item)
else:
new_items.append(current_item)
# Поиск удаленных товаров for url, prev_item in prev_data.items():
for url, prev_item in prev_data.items(): if url not in current_data:
if url not in current_data: removed_items.append(prev_item)
removed_items.append(prev_item)
return self._format_report(price_changes, availability_changes, new_items, removed_items) return self._format_report(price_changes, availability_changes, new_items, removed_items)
# </CORE_LOGIC>
# </ACTION> # </ACTION>
# <HELPER name="_format_report"> # <HELPER name="_format_report">
@@ -92,23 +95,23 @@ class DataAnalyzer:
         if price_changes:
             report_parts.append("\n<b>💰 Price changes:</b>")
             for item, old_price in price_changes:
-                report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: {old_price} ₽ → <b>{item.price} ₽</b>")
+                report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>: {old_price} ₽ → <b>{item['price']} ₽</b>")
         if availability_changes:
             report_parts.append("\n<b>📦 Availability changed:</b>")
             for item in availability_changes:
-                status = "In stock" if item.is_available else "❌ Out of stock"
-                report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>: <b>{status}</b>")
+                status = "In stock" if item['is_in_stock'] else "❌ Out of stock"
+                report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>: <b>{status}</b>")
         if new_items:
             report_parts.append("\n<b>✨ New items:</b>")
             for item in new_items:
-                report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a> - {item.price}")
+                report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a> - {item['price']}")
         if removed_items:
             report_parts.append("\n<b>🗑️ Removed items:</b>")
             for item in removed_items:
-                report_parts.append(f" • <a href='{item.url}'>{item.name} ({item.volume})</a>")
+                report_parts.append(f" • <a href='{item['url']}'>{item['name']} ({item['volume']})</a>")
         return "\n".join(report_parts)
     # </HELPER>
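The zip-based conversion in _get_data_for_run works because the SELECT column order is fixed. If the project ever prefers to let sqlite3 do that mapping, a row_factory does the same job; a minimal sketch, not part of this commit (function name and db_path parameter are illustrative):

import sqlite3

def fetch_products_as_dicts(db_path: str, run_id: str) -> dict:
    """Return {url: product_dict} for one run using sqlite3.Row instead of zip()."""
    with sqlite3.connect(db_path) as conn:
        conn.row_factory = sqlite3.Row  # rows become mapping-like objects keyed by column name
        rows = conn.execute(
            "SELECT name, volume, price, url, is_in_stock FROM products WHERE run_id = ?",
            (run_id,),
        ).fetchall()
    return {row["url"]: dict(row) for row in rows}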

View File

@@ -102,6 +102,13 @@ def init_database(db_path: Path, run_id: str):
     db_path.parent.mkdir(parents=True, exist_ok=True)
     with sqlite3.connect(db_path) as con:
         cur = con.cursor()
+        cur.execute("""
+            CREATE TABLE IF NOT EXISTS parsing_runs (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                run_id TEXT NOT NULL UNIQUE,
+                start_time TIMESTAMP NOT NULL
+            )
+        """)
         cur.execute("""
             CREATE TABLE IF NOT EXISTS products (
                 id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -111,7 +118,8 @@ def init_database(db_path: Path, run_id: str):
                 price INTEGER NOT NULL,
                 url TEXT,
                 is_in_stock BOOLEAN,
-                parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
             )
         """)
         cur.execute("""
@@ -120,7 +128,8 @@ def init_database(db_path: Path, run_id: str):
                 run_id TEXT NOT NULL,
                 timestamp TEXT NOT NULL,
                 level TEXT NOT NULL,
-                message TEXT NOT NULL
+                message TEXT NOT NULL,
+                FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
             )
         """)
         con.commit()
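One caveat for the new FOREIGN KEY clauses: SQLite parses them but does not enforce them unless each connection enables enforcement explicitly. A minimal sketch of what that could look like, offered as an assumption about how connections might be opened rather than as part of this commit:

import sqlite3

def connect_with_fk(db_path) -> sqlite3.Connection:
    """Open a connection with foreign key enforcement turned on (it is off by default in SQLite)."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA foreign_keys = ON")
    return conn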

View File

@@ -12,12 +12,7 @@ import pika
 from pika.adapters.blocking_connection import BlockingChannel
 from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed
-from .settings import (
-    RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD,
-    RABBITMQ_VIRTUAL_HOST, RABBITMQ_CONNECTION_TIMEOUT, RABBITMQ_HEARTBEAT,
-    RABBITMQ_BLOCKED_CONNECTION_TIMEOUT, RABBITMQ_PRODUCTS_QUEUE,
-    RABBITMQ_LOGS_QUEUE, RABBITMQ_EXCHANGE
-)
+from .settings import settings
 logger = logging.getLogger(__name__)
@@ -41,17 +36,17 @@ class RabbitMQConnection:
         Returns:
             pika.ConnectionParameters: Connection parameters
         """
-        credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
+        credentials = pika.PlainCredentials(settings.rabbitmq_user, settings.rabbitmq_password)
         return pika.ConnectionParameters(
-            host=RABBITMQ_HOST,
-            port=RABBITMQ_PORT,
-            virtual_host=RABBITMQ_VIRTUAL_HOST,
+            host=settings.rabbitmq_host,
+            port=settings.rabbitmq_port,
+            virtual_host=settings.rabbitmq_vhost,
             credentials=credentials,
             connection_attempts=3,
             retry_delay=5,
-            socket_timeout=RABBITMQ_CONNECTION_TIMEOUT,
-            heartbeat=RABBITMQ_HEARTBEAT,
-            blocked_connection_timeout=RABBITMQ_BLOCKED_CONNECTION_TIMEOUT
+            socket_timeout=30,  # hardcoded for now
+            heartbeat=600,  # hardcoded for now
+            blocked_connection_timeout=300  # hardcoded for now
         )
     def connect(self) -> bool:
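The three values hardcoded above could later move next to the rabbitmq_* fields this commit adds to Settings. A hedged sketch of what those extra fields might look like, reusing the environment variable names of the constants that were just removed (the field names themselves are assumptions, not part of the diff):

import os
from pydantic import BaseModel, Field

class RabbitMQTimeoutSettings(BaseModel):
    # Hypothetical fields mirroring the values hardcoded in the connection parameters above
    rabbitmq_connection_timeout: int = Field(default=int(os.getenv('RABBITMQ_CONNECTION_TIMEOUT', 30)))
    rabbitmq_heartbeat: int = Field(default=int(os.getenv('RABBITMQ_HEARTBEAT', 600)))
    rabbitmq_blocked_connection_timeout: int = Field(default=int(os.getenv('RABBITMQ_BLOCKED_CONNECTION_TIMEOUT', 300)))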

View File

@@ -82,6 +82,17 @@ class Settings(BaseModel):
     telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="Chat ID for sending notifications")
     # </CONFIG>
+    # <CONFIG name="rabbitmq_settings">
+    rabbitmq_host: str = Field(default=os.getenv('RABBITMQ_HOST', 'localhost'))
+    rabbitmq_port: int = Field(default=int(os.getenv('RABBITMQ_PORT', 5672)))
+    rabbitmq_user: str = Field(default=os.getenv('RABBITMQ_USERNAME', 'guest'))
+    rabbitmq_password: str = Field(default=os.getenv('RABBITMQ_PASSWORD', 'guest'))
+    rabbitmq_vhost: str = Field(default=os.getenv('RABBITMQ_VIRTUAL_HOST', '/'))
+    rabbitmq_products_queue: str = Field(default=os.getenv('RABBITMQ_PRODUCTS_QUEUE', 'price_parser.products'))
+    rabbitmq_logs_queue: str = Field(default=os.getenv('RABBITMQ_LOGS_QUEUE', 'price_parser.logs'))
+    rabbitmq_exchange: str = Field(default=os.getenv('RABBITMQ_EXCHANGE', 'price_parser.exchange'))
+    # </CONFIG>
     # <CONFIG name="selectors_config_instance">
     selectors: ScraperSelectors = ScraperSelectors(
         CATALOG_PRODUCT_LINK='.product-card h4 a.product-link',
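For reference, the new fields read plain environment variables when the Settings class body is evaluated, so any overrides have to be in place before the settings module is imported. A small usage sketch with placeholder values (the import path is assumed to match the one used by the analyzer above):

import os

# Placeholder values; set these before importing the settings module,
# because the Field defaults call os.getenv() at class definition time.
os.environ["RABBITMQ_HOST"] = "rabbitmq.internal"
os.environ["RABBITMQ_PORT"] = "5672"
os.environ["RABBITMQ_USERNAME"] = "parser"
os.environ["RABBITMQ_PASSWORD"] = "change-me"

from src.core.settings import settings  # assumed import path

print(settings.rabbitmq_host, settings.rabbitmq_port)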

View File

@@ -16,10 +16,12 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
 from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT
 from core.models import ProductVariant
-from core.database import init_database, save_data_to_db, DatabaseManager
-from core.logging_config import setup_logging
+from core.database import DatabaseManager, init_database, save_data_to_db
+from core.rabbitmq import RabbitMQExporter
 from scraper.engine import Scraper
-from utils.exporters import save_data_to_csv, export_data_to_rabbitmq, export_logs_to_rabbitmq, validate_rabbitmq_connection
+from utils.exporters import save_data_to_csv
+from datetime import datetime
+import sqlite3
 # </IMPORTS>
 # <MAIN_CONTRACT for="AppOrchestrator">
@@ -54,15 +56,33 @@ class AppOrchestrator:
         )
         # </DEPENDENCY>
+        self.run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
         self.logger = logging.getLogger(self.__class__.__name__)
-    # </INIT>
-    # <CONTRACT for="_error_context">
-    # description: "Context manager for centralized error handling in key operations."
-    # </CONTRACT>
-    # <HELPER name="_error_context">
-    @contextmanager
-    def _error_context(self, operation: str):
+        self.db_manager: Optional[DatabaseManager] = None
+        self.rabbitmq_exporter: Optional[RabbitMQExporter] = None
+        self.scraper = Scraper(
+            selectors=self.settings.selectors,
+            base_url=self.settings.base_url
+        )
+        self._setup()
+    def _register_run(self):
+        """Registers the current run in the database."""
+        if not self.db_manager:
+            return
+        try:
+            with self.db_manager as conn:
+                conn.execute(
+                    "INSERT INTO parsing_runs (run_id, start_time) VALUES (?, ?)",
+                    (self.run_id, datetime.now())
+                )
+                conn.commit()
+            self.logger.info(f"Run {self.run_id} registered in the database.")
+        except sqlite3.Error as e:
+            self.logger.error(f"Failed to register run {self.run_id} in the database: {e}")
+            raise
+    def _setup(self):
         """Context manager for error handling with detailed diagnostics."""
         try:
             yield
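The INSERT in _register_run is what later feeds DataAnalyzer._get_last_two_runs(): each run adds one row to parsing_runs and the analyzer orders by start_time. A self-contained sketch of that round trip against an in-memory database, with the schema copied from init_database above and illustrative run IDs:

import sqlite3
from datetime import datetime, timedelta

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE parsing_runs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        run_id TEXT NOT NULL UNIQUE,
        start_time TIMESTAMP NOT NULL
    )
""")

# Register two runs the way _register_run() does (sqlite3's default adapter stores the datetimes as ISO-8601 text)
now = datetime.now()
conn.execute("INSERT INTO parsing_runs (run_id, start_time) VALUES (?, ?)",
             ("20250719-120000", now - timedelta(days=1)))
conn.execute("INSERT INTO parsing_runs (run_id, start_time) VALUES (?, ?)",
             ("20250720-092919", now))
conn.commit()

# The query DataAnalyzer._get_last_two_runs() issues
rows = conn.execute(
    "SELECT run_id FROM parsing_runs ORDER BY start_time DESC LIMIT 2"
).fetchall()
print(rows)  # [('20250720-092919',), ('20250719-120000',)]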
@@ -109,10 +129,12 @@ class AppOrchestrator:
             setup_logging(self.run_id, self.db_manager)
             if ENABLE_RABBITMQ_EXPORT:
-                if validate_rabbitmq_connection():
+                self.rabbitmq_exporter = RabbitMQExporter()
+                if self.rabbitmq_exporter.connection.connect():
                     self.logger.info("[ACTION:_setup] RabbitMQ connection is available")
                 else:
                     self.logger.warning("[ACTION:_setup] RabbitMQ connection is unavailable, export will be skipped")
+                    self.rabbitmq_exporter = None
             self.logger.info(f"[ACTION:_setup] Orchestrator started. Architecture v2.0. Run ID: {self.run_id}")
             # </CORE_LOGIC>
@@ -253,12 +275,10 @@ class AppOrchestrator:
         except Exception as e:
             self.logger.error(f"[ACTION:_save_results] Error while saving to the database: {e}")
-        if ENABLE_RABBITMQ_EXPORT:
-            # <DEPENDENCY name="export_data_to_rabbitmq" />
-            # ... (logic remains the same)
+        if ENABLE_RABBITMQ_EXPORT and self.rabbitmq_exporter:
             try:
                 data_to_rabbitmq = [p.model_dump() for p in self.final_data]
-                if export_data_to_rabbitmq(data_to_rabbitmq, self.run_id, self.run_id):
+                if self.rabbitmq_exporter.export_products(data_to_rabbitmq, self.run_id):
                     self.logger.info("[ACTION:_save_results] Data successfully exported to RabbitMQ.")
                 else:
                     self.logger.error("[ACTION:_save_results] Failed to export data to RabbitMQ.")
@@ -288,6 +308,10 @@ class AppOrchestrator:
             self.db_manager.close()
             self.logger.debug("[ACTION:_cleanup] Database connection closed.")
+        if self.rabbitmq_exporter:
+            self.rabbitmq_exporter.close()
+            self.logger.debug("[ACTION:_cleanup] RabbitMQ connection closed.")
         if duration:
             self.logger.info(f"[STATS][ACTION:_cleanup] Execution time: {duration.total_seconds():.2f} seconds")
             self.logger.info(f"[STATS][ACTION:_cleanup] Success rate: {self.stats['successful_parses']}/{self.stats['total_urls']} ({self.stats['successful_parses']/self.stats['total_urls']*100:.1f}%)")
@@ -304,8 +328,12 @@ class AppOrchestrator:
     # </CONTRACT>
     # <ENTRYPOINT name="run">
     def run(self):
-        """Main method that launches the whole parsing process."""
-        self.logger.info("="*50)
+        """Main method that launches all stages of the orchestrator."""
+        self.logger.info(f"Starting orchestrator. Run ID: {self.run_id}")
+        if self.db_manager:
+            self._register_run()
+        # ... rest of the code ...
         self.logger.info("[ENTRYPOINT:run] Starting the orchestrator's main process.")
         self.logger.info("="*50)