From 5742d474fdcf0a498ff7c31444ad9d3fee64e24f Mon Sep 17 00:00:00 2001 From: busya Date: Sun, 20 Jul 2025 09:29:19 +0300 Subject: [PATCH] monitoring refactor --- src/analyzer.py | 105 ++++++++++++++++++++++--------------------- src/core/database.py | 13 +++++- src/core/rabbitmq.py | 21 ++++----- src/core/settings.py | 11 +++++ src/orchestrator.py | 62 ++++++++++++++++++------- 5 files changed, 129 insertions(+), 83 deletions(-) diff --git a/src/analyzer.py b/src/analyzer.py index 5b4dfe4..226e30c 100644 --- a/src/analyzer.py +++ b/src/analyzer.py @@ -2,28 +2,36 @@ # import logging -from typing import List, Dict, Tuple -from sqlalchemy.orm import sessionmaker -from src.core.models import ProductVariant, ParsingRun -from sqlalchemy import create_engine +import sqlite3 +from typing import List, Dict, Tuple, Any from src.core.settings import settings # # -# description: "Анализирует данные двух последних запусков парсера и выявляет изменения." +# description: "Анализирует данные двух последних запусков парсера и выявляет изменения, используя прямые SQL-запросы." # class DataAnalyzer: # def __init__(self): - self.engine = create_engine(settings.db_path.absolute().as_uri()) - self.Session = sessionmaker(bind=self.engine) + self.db_path = settings.db_path self.logger = logging.getLogger(__name__) # + def _execute_query(self, query: str, params: tuple = ()) -> List[Any]: + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute(query, params) + return cursor.fetchall() + except sqlite3.Error as e: + self.logger.error(f"Ошибка выполнения SQL-запроса: {e}") + return [] + # - def _get_last_two_runs(self, session) -> Tuple[str, str]: + def _get_last_two_runs(self) -> Tuple[str, str]: """Возвращает ID двух последних успешных запусков.""" - runs = session.query(ParsingRun.run_id).order_by(ParsingRun.start_time.desc()).limit(2).all() + query = "SELECT run_id FROM parsing_runs ORDER BY start_time DESC LIMIT 2" + runs = self._execute_query(query) if len(runs) < 2: self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.") return None, None @@ -31,10 +39,12 @@ class DataAnalyzer: # # - def _get_data_for_run(self, session, run_id: str) -> Dict[str, ProductVariant]: - """Возвращает данные о товарах для указанного запуска в виде словаря {url: ProductVariant}.""" - variants = session.query(ProductVariant).filter(ProductVariant.run_id == run_id).all() - return {v.url: v for v in variants} + def _get_data_for_run(self, run_id: str) -> Dict[str, Dict]: + """Возвращает данные о товарах для указанного запуска в виде словаря {url: product_data}.""" + query = "SELECT name, volume, price, url, is_in_stock FROM products WHERE run_id = ?" + rows = self._execute_query(query, (run_id,)) + # Преобразуем sqlite3.Row в словари + return {row[3]: dict(zip(['name', 'volume', 'price', 'url', 'is_in_stock'], row)) for row in rows} # # @@ -42,42 +52,35 @@ class DataAnalyzer: """ Сравнивает два последних запуска и генерирует HTML-отчет об изменениях. """ - # - with self.Session() as session: - current_run_id, prev_run_id = self._get_last_two_runs(session) - if not current_run_id or not prev_run_id: - return "" + current_run_id, prev_run_id = self._get_last_two_runs() + if not current_run_id or not prev_run_id: + return "" - self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим з��пуском {prev_run_id}") - - current_data = self._get_data_for_run(session, current_run_id) - prev_data = self._get_data_for_run(session, prev_run_id) + self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим запуском {prev_run_id}") + + current_data = self._get_data_for_run(current_run_id) + prev_data = self._get_data_for_run(prev_run_id) - price_changes = [] - availability_changes = [] - new_items = [] - removed_items = [] + price_changes = [] + availability_changes = [] + new_items = [] + removed_items = [] - # Поиск изменений и новых товаров - for url, current_item in current_data.items(): - prev_item = prev_data.get(url) - if prev_item: - # Изменение цены - if current_item.price != prev_item.price: - price_changes.append((current_item, prev_item.price)) - # Изменение статуса наличия - if current_item.is_available != prev_item.is_available: - availability_changes.append(current_item) - else: - new_items.append(current_item) - - # Поиск удаленных товаров - for url, prev_item in prev_data.items(): - if url not in current_data: - removed_items.append(prev_item) + for url, current_item in current_data.items(): + prev_item = prev_data.get(url) + if prev_item: + if current_item['price'] != prev_item['price']: + price_changes.append((current_item, prev_item['price'])) + if current_item['is_in_stock'] != prev_item['is_in_stock']: + availability_changes.append(current_item) + else: + new_items.append(current_item) + + for url, prev_item in prev_data.items(): + if url not in current_data: + removed_items.append(prev_item) - return self._format_report(price_changes, availability_changes, new_items, removed_items) - # + return self._format_report(price_changes, availability_changes, new_items, removed_items) # # @@ -92,24 +95,24 @@ class DataAnalyzer: if price_changes: report_parts.append("\n💰 Изменились цены:") for item, old_price in price_changes: - report_parts.append(f" • {item.name} ({item.volume}): {old_price} ₽ → {item.price} ₽") + report_parts.append(f" • {item['name']} ({item['volume']}): {old_price} ₽ → {item['price']} ₽") if availability_changes: report_parts.append("\n📦 Изменилось наличие:") for item in availability_changes: - status = "✅ В наличии" if item.is_available else "❌ Нет в наличии" - report_parts.append(f" • {item.name} ({item.volume}): {status}") + status = "✅ В наличии" if item['is_in_stock'] else "❌ Нет в наличии" + report_parts.append(f" • {item['name']} ({item['volume']}): {status}") if new_items: report_parts.append("\n✨ Новые товары:") for item in new_items: - report_parts.append(f" • {item.name} ({item.volume}) - {item.price} ₽") + report_parts.append(f" • {item['name']} ({item['volume']}) - {item['price']} ₽") if removed_items: report_parts.append("\n🗑️ Удаленные товары:") for item in removed_items: - report_parts.append(f" • {item.name} ({item.volume})") + report_parts.append(f" • {item['name']} ({item['volume']})") return "\n".join(report_parts) # -# +# \ No newline at end of file diff --git a/src/core/database.py b/src/core/database.py index ed26d8b..1b6e0b1 100644 --- a/src/core/database.py +++ b/src/core/database.py @@ -102,6 +102,13 @@ def init_database(db_path: Path, run_id: str): db_path.parent.mkdir(parents=True, exist_ok=True) with sqlite3.connect(db_path) as con: cur = con.cursor() + cur.execute(""" + CREATE TABLE IF NOT EXISTS parsing_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + run_id TEXT NOT NULL UNIQUE, + start_time TIMESTAMP NOT NULL + ) + """) cur.execute(""" CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -111,7 +118,8 @@ def init_database(db_path: Path, run_id: str): price INTEGER NOT NULL, url TEXT, is_in_stock BOOLEAN, - parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id) ) """) cur.execute(""" @@ -120,7 +128,8 @@ def init_database(db_path: Path, run_id: str): run_id TEXT NOT NULL, timestamp TEXT NOT NULL, level TEXT NOT NULL, - message TEXT NOT NULL + message TEXT NOT NULL, + FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id) ) """) con.commit() diff --git a/src/core/rabbitmq.py b/src/core/rabbitmq.py index d36a7c7..6f81674 100644 --- a/src/core/rabbitmq.py +++ b/src/core/rabbitmq.py @@ -12,12 +12,7 @@ import pika from pika.adapters.blocking_connection import BlockingChannel from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed -from .settings import ( - RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD, - RABBITMQ_VIRTUAL_HOST, RABBITMQ_CONNECTION_TIMEOUT, RABBITMQ_HEARTBEAT, - RABBITMQ_BLOCKED_CONNECTION_TIMEOUT, RABBITMQ_PRODUCTS_QUEUE, - RABBITMQ_LOGS_QUEUE, RABBITMQ_EXCHANGE -) +from .settings import settings logger = logging.getLogger(__name__) @@ -41,17 +36,17 @@ class RabbitMQConnection: Returns: pika.ConnectionParameters: Параметры подключения """ - credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) + credentials = pika.PlainCredentials(settings.rabbitmq_user, settings.rabbitmq_password) return pika.ConnectionParameters( - host=RABBITMQ_HOST, - port=RABBITMQ_PORT, - virtual_host=RABBITMQ_VIRTUAL_HOST, + host=settings.rabbitmq_host, + port=settings.rabbitmq_port, + virtual_host=settings.rabbitmq_vhost, credentials=credentials, connection_attempts=3, retry_delay=5, - socket_timeout=RABBITMQ_CONNECTION_TIMEOUT, - heartbeat=RABBITMQ_HEARTBEAT, - blocked_connection_timeout=RABBITMQ_BLOCKED_CONNECTION_TIMEOUT + socket_timeout=30, # Hardcoded for now + heartbeat=600, # Hardcoded for now + blocked_connection_timeout=300 # Hardcoded for now ) def connect(self) -> bool: diff --git a/src/core/settings.py b/src/core/settings.py index d5cc207..3162985 100644 --- a/src/core/settings.py +++ b/src/core/settings.py @@ -82,6 +82,17 @@ class Settings(BaseModel): telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений") # + # + rabbitmq_host: str = Field(default=os.getenv('RABBITMQ_HOST', 'localhost')) + rabbitmq_port: int = Field(default=int(os.getenv('RABBITMQ_PORT', 5672))) + rabbitmq_user: str = Field(default=os.getenv('RABBITMQ_USERNAME', 'guest')) + rabbitmq_password: str = Field(default=os.getenv('RABBITMQ_PASSWORD', 'guest')) + rabbitmq_vhost: str = Field(default=os.getenv('RABBITMQ_VIRTUAL_HOST', '/')) + rabbitmq_products_queue: str = Field(default=os.getenv('RABBITMQ_PRODUCTS_QUEUE', 'price_parser.products')) + rabbitmq_logs_queue: str = Field(default=os.getenv('RABBITMQ_LOGS_QUEUE', 'price_parser.logs')) + rabbitmq_exchange: str = Field(default=os.getenv('RABBITMQ_EXCHANGE', 'price_parser.exchange')) + # + # selectors: ScraperSelectors = ScraperSelectors( CATALOG_PRODUCT_LINK='.product-card h4 a.product-link', diff --git a/src/orchestrator.py b/src/orchestrator.py index eca6089..908c4b4 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -16,10 +16,12 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT from core.models import ProductVariant -from core.database import init_database, save_data_to_db, DatabaseManager -from core.logging_config import setup_logging +from core.database import DatabaseManager, init_database, save_data_to_db +from core.rabbitmq import RabbitMQExporter from scraper.engine import Scraper -from utils.exporters import save_data_to_csv, export_data_to_rabbitmq, export_logs_to_rabbitmq, validate_rabbitmq_connection +from utils.exporters import save_data_to_csv +from datetime import datetime +import sqlite3 # # @@ -54,15 +56,33 @@ class AppOrchestrator: ) # + self.run_id = datetime.now().strftime("%Y%m%d-%H%M%S") self.logger = logging.getLogger(self.__class__.__name__) - # + self.db_manager: Optional[DatabaseManager] = None + self.rabbitmq_exporter: Optional[RabbitMQExporter] = None + self.scraper = Scraper( + selectors=self.settings.selectors, + base_url=self.settings.base_url + ) + self._setup() - # - # description: "Контекстный менеджер для централизованной обработки ошибок в ключевых операциях." - # - # - @contextmanager - def _error_context(self, operation: str): + def _register_run(self): + """Регистрирует текущий запуск в базе данных.""" + if not self.db_manager: + return + try: + with self.db_manager as conn: + conn.execute( + "INSERT INTO parsing_runs (run_id, start_time) VALUES (?, ?)", + (self.run_id, datetime.now()) + ) + conn.commit() + self.logger.info(f"Запуск {self.run_id} зарегистрирован в БД.") + except sqlite3.Error as e: + self.logger.error(f"Не удалось зарегистрировать запуск {self.run_id} в БД: {e}") + raise + + def _setup(self): """Контекстный менеджер для обработки ошибок с детальной диагностикой.""" try: yield @@ -109,10 +129,12 @@ class AppOrchestrator: setup_logging(self.run_id, self.db_manager) if ENABLE_RABBITMQ_EXPORT: - if validate_rabbitmq_connection(): + self.rabbitmq_exporter = RabbitMQExporter() + if self.rabbitmq_exporter.connection.connect(): self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно") else: self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен") + self.rabbitmq_exporter = None self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}") # @@ -253,12 +275,10 @@ class AppOrchestrator: except Exception as e: self.logger.error(f"[ACTION:_save_results] Ошибка при сохранении в БД: {e}") - if ENABLE_RABBITMQ_EXPORT: - # - # ... (logic remains the same) + if ENABLE_RABBITMQ_EXPORT and self.rabbitmq_exporter: try: data_to_rabbitmq = [p.model_dump() for p in self.final_data] - if export_data_to_rabbitmq(data_to_rabbitmq, self.run_id, self.run_id): + if self.rabbitmq_exporter.export_products(data_to_rabbitmq, self.run_id): self.logger.info("[ACTION:_save_results] Данные успешно экспортированы в RabbitMQ.") else: self.logger.error("[ACTION:_save_results] Не удалось экспортировать данные в RabbitMQ.") @@ -287,6 +307,10 @@ class AppOrchestrator: if self.db_manager: self.db_manager.close() self.logger.debug("[ACTION:_cleanup] Соединение с базой данных закрыто.") + + if self.rabbitmq_exporter: + self.rabbitmq_exporter.close() + self.logger.debug("[ACTION:_cleanup] Соединение с RabbitMQ закрыто.") if duration: self.logger.info(f"[STATS][ACTION:_cleanup] Время выполнения: {duration.total_seconds():.2f} секунд") @@ -304,8 +328,12 @@ class AppOrchestrator: # # def run(self): - """Основной метод, запускающий весь процесс парсинга.""" - self.logger.info("="*50) + """Основной метод, запускающий все этапы работы оркестратора.""" + self.logger.info(f"Запуск оркестратора. Run ID: {self.run_id}") + if self.db_manager: + self._register_run() + + # ... остальной код ... self.logger.info("[ENTRYPOINT:run] Запуск главного процесса оркестратора.") self.logger.info("="*50)