diff --git a/src/analyzer.py b/src/analyzer.py
index 5b4dfe4..226e30c 100644
--- a/src/analyzer.py
+++ b/src/analyzer.py
@@ -2,28 +2,36 @@
#
import logging
-from typing import List, Dict, Tuple
-from sqlalchemy.orm import sessionmaker
-from src.core.models import ProductVariant, ParsingRun
-from sqlalchemy import create_engine
+import sqlite3
+from typing import List, Dict, Tuple, Any
from src.core.settings import settings
#
#
-# description: "Анализирует данные двух последних запусков парсера и выявляет изменения."
+# description: "Анализирует данные двух последних запусков парсера и выявляет изменения, используя прямые SQL-запросы."
#
class DataAnalyzer:
#
def __init__(self):
- self.engine = create_engine(settings.db_path.absolute().as_uri())
- self.Session = sessionmaker(bind=self.engine)
+ self.db_path = settings.db_path
self.logger = logging.getLogger(__name__)
#
+ def _execute_query(self, query: str, params: tuple = ()) -> List[Any]:
+ try:
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
+ cursor.execute(query, params)
+ return cursor.fetchall()
+ except sqlite3.Error as e:
+ self.logger.error(f"Ошибка выполнения SQL-запроса: {e}")
+ return []
+
#
- def _get_last_two_runs(self, session) -> Tuple[str, str]:
+ def _get_last_two_runs(self) -> Tuple[str, str]:
"""Возвращает ID двух последних успешных запусков."""
- runs = session.query(ParsingRun.run_id).order_by(ParsingRun.start_time.desc()).limit(2).all()
+ query = "SELECT run_id FROM parsing_runs ORDER BY start_time DESC LIMIT 2"
+ runs = self._execute_query(query)
if len(runs) < 2:
self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.")
return None, None
@@ -31,10 +39,12 @@ class DataAnalyzer:
#
#
- def _get_data_for_run(self, session, run_id: str) -> Dict[str, ProductVariant]:
- """Возвращает данные о товарах для указанного запуска в виде словаря {url: ProductVariant}."""
- variants = session.query(ProductVariant).filter(ProductVariant.run_id == run_id).all()
- return {v.url: v for v in variants}
+ def _get_data_for_run(self, run_id: str) -> Dict[str, Dict]:
+ """Возвращает данные о товарах для указанного запуска в виде словаря {url: product_data}."""
+ query = "SELECT name, volume, price, url, is_in_stock FROM products WHERE run_id = ?"
+ rows = self._execute_query(query, (run_id,))
+ # Преобразуем sqlite3.Row в словари
+ return {row[3]: dict(zip(['name', 'volume', 'price', 'url', 'is_in_stock'], row)) for row in rows}
#
#
@@ -42,42 +52,35 @@ class DataAnalyzer:
"""
Сравнивает два последних запуска и генерирует HTML-отчет об изменениях.
"""
- #
- with self.Session() as session:
- current_run_id, prev_run_id = self._get_last_two_runs(session)
- if not current_run_id or not prev_run_id:
- return ""
+ current_run_id, prev_run_id = self._get_last_two_runs()
+ if not current_run_id or not prev_run_id:
+ return ""
- self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим з��пуском {prev_run_id}")
-
- current_data = self._get_data_for_run(session, current_run_id)
- prev_data = self._get_data_for_run(session, prev_run_id)
+ self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим запуском {prev_run_id}")
+
+ current_data = self._get_data_for_run(current_run_id)
+ prev_data = self._get_data_for_run(prev_run_id)
- price_changes = []
- availability_changes = []
- new_items = []
- removed_items = []
+ price_changes = []
+ availability_changes = []
+ new_items = []
+ removed_items = []
- # Поиск изменений и новых товаров
- for url, current_item in current_data.items():
- prev_item = prev_data.get(url)
- if prev_item:
- # Изменение цены
- if current_item.price != prev_item.price:
- price_changes.append((current_item, prev_item.price))
- # Изменение статуса наличия
- if current_item.is_available != prev_item.is_available:
- availability_changes.append(current_item)
- else:
- new_items.append(current_item)
-
- # Поиск удаленных товаров
- for url, prev_item in prev_data.items():
- if url not in current_data:
- removed_items.append(prev_item)
+ for url, current_item in current_data.items():
+ prev_item = prev_data.get(url)
+ if prev_item:
+ if current_item['price'] != prev_item['price']:
+ price_changes.append((current_item, prev_item['price']))
+ if current_item['is_in_stock'] != prev_item['is_in_stock']:
+ availability_changes.append(current_item)
+ else:
+ new_items.append(current_item)
+
+ for url, prev_item in prev_data.items():
+ if url not in current_data:
+ removed_items.append(prev_item)
- return self._format_report(price_changes, availability_changes, new_items, removed_items)
- #
+ return self._format_report(price_changes, availability_changes, new_items, removed_items)
#
#
@@ -92,24 +95,24 @@ class DataAnalyzer:
if price_changes:
report_parts.append("\n💰 Изменились цены:")
for item, old_price in price_changes:
- report_parts.append(f" • {item.name} ({item.volume}): {old_price} ₽ → {item.price} ₽")
+ report_parts.append(f" • {item['name']} ({item['volume']}): {old_price} ₽ → {item['price']} ₽")
if availability_changes:
report_parts.append("\n📦 Изменилось наличие:")
for item in availability_changes:
- status = "✅ В наличии" if item.is_available else "❌ Нет в наличии"
- report_parts.append(f" • {item.name} ({item.volume}): {status}")
+ status = "✅ В наличии" if item['is_in_stock'] else "❌ Нет в наличии"
+ report_parts.append(f" • {item['name']} ({item['volume']}): {status}")
if new_items:
report_parts.append("\n✨ Новые товары:")
for item in new_items:
- report_parts.append(f" • {item.name} ({item.volume}) - {item.price} ₽")
+ report_parts.append(f" • {item['name']} ({item['volume']}) - {item['price']} ₽")
if removed_items:
report_parts.append("\n🗑️ Удаленные товары:")
for item in removed_items:
- report_parts.append(f" • {item.name} ({item.volume})")
+ report_parts.append(f" • {item['name']} ({item['volume']})")
return "\n".join(report_parts)
#
-#
+#
\ No newline at end of file
diff --git a/src/core/database.py b/src/core/database.py
index ed26d8b..1b6e0b1 100644
--- a/src/core/database.py
+++ b/src/core/database.py
@@ -102,6 +102,13 @@ def init_database(db_path: Path, run_id: str):
db_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(db_path) as con:
cur = con.cursor()
+ cur.execute("""
+ CREATE TABLE IF NOT EXISTS parsing_runs (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ run_id TEXT NOT NULL UNIQUE,
+ start_time TIMESTAMP NOT NULL
+ )
+ """)
cur.execute("""
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -111,7 +118,8 @@ def init_database(db_path: Path, run_id: str):
price INTEGER NOT NULL,
url TEXT,
is_in_stock BOOLEAN,
- parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
)
""")
cur.execute("""
@@ -120,7 +128,8 @@ def init_database(db_path: Path, run_id: str):
run_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
level TEXT NOT NULL,
- message TEXT NOT NULL
+ message TEXT NOT NULL,
+ FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
)
""")
con.commit()
diff --git a/src/core/rabbitmq.py b/src/core/rabbitmq.py
index d36a7c7..6f81674 100644
--- a/src/core/rabbitmq.py
+++ b/src/core/rabbitmq.py
@@ -12,12 +12,7 @@ import pika
from pika.adapters.blocking_connection import BlockingChannel
from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed
-from .settings import (
- RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD,
- RABBITMQ_VIRTUAL_HOST, RABBITMQ_CONNECTION_TIMEOUT, RABBITMQ_HEARTBEAT,
- RABBITMQ_BLOCKED_CONNECTION_TIMEOUT, RABBITMQ_PRODUCTS_QUEUE,
- RABBITMQ_LOGS_QUEUE, RABBITMQ_EXCHANGE
-)
+from .settings import settings
logger = logging.getLogger(__name__)
@@ -41,17 +36,17 @@ class RabbitMQConnection:
Returns:
pika.ConnectionParameters: Параметры подключения
"""
- credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
+ credentials = pika.PlainCredentials(settings.rabbitmq_user, settings.rabbitmq_password)
return pika.ConnectionParameters(
- host=RABBITMQ_HOST,
- port=RABBITMQ_PORT,
- virtual_host=RABBITMQ_VIRTUAL_HOST,
+ host=settings.rabbitmq_host,
+ port=settings.rabbitmq_port,
+ virtual_host=settings.rabbitmq_vhost,
credentials=credentials,
connection_attempts=3,
retry_delay=5,
- socket_timeout=RABBITMQ_CONNECTION_TIMEOUT,
- heartbeat=RABBITMQ_HEARTBEAT,
- blocked_connection_timeout=RABBITMQ_BLOCKED_CONNECTION_TIMEOUT
+ socket_timeout=30, # Hardcoded for now
+ heartbeat=600, # Hardcoded for now
+ blocked_connection_timeout=300 # Hardcoded for now
)
def connect(self) -> bool:
diff --git a/src/core/settings.py b/src/core/settings.py
index d5cc207..3162985 100644
--- a/src/core/settings.py
+++ b/src/core/settings.py
@@ -82,6 +82,17 @@ class Settings(BaseModel):
telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений")
#
+ #
+ rabbitmq_host: str = Field(default=os.getenv('RABBITMQ_HOST', 'localhost'))
+ rabbitmq_port: int = Field(default=int(os.getenv('RABBITMQ_PORT', 5672)))
+ rabbitmq_user: str = Field(default=os.getenv('RABBITMQ_USERNAME', 'guest'))
+ rabbitmq_password: str = Field(default=os.getenv('RABBITMQ_PASSWORD', 'guest'))
+ rabbitmq_vhost: str = Field(default=os.getenv('RABBITMQ_VIRTUAL_HOST', '/'))
+ rabbitmq_products_queue: str = Field(default=os.getenv('RABBITMQ_PRODUCTS_QUEUE', 'price_parser.products'))
+ rabbitmq_logs_queue: str = Field(default=os.getenv('RABBITMQ_LOGS_QUEUE', 'price_parser.logs'))
+ rabbitmq_exchange: str = Field(default=os.getenv('RABBITMQ_EXCHANGE', 'price_parser.exchange'))
+ #
+
#
selectors: ScraperSelectors = ScraperSelectors(
CATALOG_PRODUCT_LINK='.product-card h4 a.product-link',
diff --git a/src/orchestrator.py b/src/orchestrator.py
index eca6089..908c4b4 100644
--- a/src/orchestrator.py
+++ b/src/orchestrator.py
@@ -16,10 +16,12 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT
from core.models import ProductVariant
-from core.database import init_database, save_data_to_db, DatabaseManager
-from core.logging_config import setup_logging
+from core.database import DatabaseManager, init_database, save_data_to_db
+from core.rabbitmq import RabbitMQExporter
from scraper.engine import Scraper
-from utils.exporters import save_data_to_csv, export_data_to_rabbitmq, export_logs_to_rabbitmq, validate_rabbitmq_connection
+from utils.exporters import save_data_to_csv
+from datetime import datetime
+import sqlite3
#
#
@@ -54,15 +56,33 @@ class AppOrchestrator:
)
#
+ self.run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
self.logger = logging.getLogger(self.__class__.__name__)
- #
+ self.db_manager: Optional[DatabaseManager] = None
+ self.rabbitmq_exporter: Optional[RabbitMQExporter] = None
+ self.scraper = Scraper(
+ selectors=self.settings.selectors,
+ base_url=self.settings.base_url
+ )
+ self._setup()
- #
- # description: "Контекстный менеджер для централизованной обработки ошибок в ключевых операциях."
- #
- #
- @contextmanager
- def _error_context(self, operation: str):
+ def _register_run(self):
+ """Регистрирует текущий запуск в базе данных."""
+ if not self.db_manager:
+ return
+ try:
+ with self.db_manager as conn:
+ conn.execute(
+ "INSERT INTO parsing_runs (run_id, start_time) VALUES (?, ?)",
+ (self.run_id, datetime.now())
+ )
+ conn.commit()
+ self.logger.info(f"Запуск {self.run_id} зарегистрирован в БД.")
+ except sqlite3.Error as e:
+ self.logger.error(f"Не удалось зарегистрировать запуск {self.run_id} в БД: {e}")
+ raise
+
+ def _setup(self):
"""Контекстный менеджер для обработки ошибок с детальной диагностикой."""
try:
yield
@@ -109,10 +129,12 @@ class AppOrchestrator:
setup_logging(self.run_id, self.db_manager)
if ENABLE_RABBITMQ_EXPORT:
- if validate_rabbitmq_connection():
+ self.rabbitmq_exporter = RabbitMQExporter()
+ if self.rabbitmq_exporter.connection.connect():
self.logger.info("[ACTION:_setup] Подключение к RabbitMQ доступно")
else:
self.logger.warning("[ACTION:_setup] Подключение к RabbitMQ недоступно, экспорт будет пропущен")
+ self.rabbitmq_exporter = None
self.logger.info(f"[ACTION:_setup] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}")
#
@@ -253,12 +275,10 @@ class AppOrchestrator:
except Exception as e:
self.logger.error(f"[ACTION:_save_results] Ошибка при сохранении в БД: {e}")
- if ENABLE_RABBITMQ_EXPORT:
- #
- # ... (logic remains the same)
+ if ENABLE_RABBITMQ_EXPORT and self.rabbitmq_exporter:
try:
data_to_rabbitmq = [p.model_dump() for p in self.final_data]
- if export_data_to_rabbitmq(data_to_rabbitmq, self.run_id, self.run_id):
+ if self.rabbitmq_exporter.export_products(data_to_rabbitmq, self.run_id):
self.logger.info("[ACTION:_save_results] Данные успешно экспортированы в RabbitMQ.")
else:
self.logger.error("[ACTION:_save_results] Не удалось экспортировать данные в RabbitMQ.")
@@ -287,6 +307,10 @@ class AppOrchestrator:
if self.db_manager:
self.db_manager.close()
self.logger.debug("[ACTION:_cleanup] Соединение с базой данных закрыто.")
+
+ if self.rabbitmq_exporter:
+ self.rabbitmq_exporter.close()
+ self.logger.debug("[ACTION:_cleanup] Соединение с RabbitMQ закрыто.")
if duration:
self.logger.info(f"[STATS][ACTION:_cleanup] Время выполнения: {duration.total_seconds():.2f} секунд")
@@ -304,8 +328,12 @@ class AppOrchestrator:
#
#
def run(self):
- """Основной метод, запускающий весь процесс парсинга."""
- self.logger.info("="*50)
+ """Основной метод, запускающий все этапы работы оркестратора."""
+ self.logger.info(f"Запуск оркестратора. Run ID: {self.run_id}")
+ if self.db_manager:
+ self._register_run()
+
+ # ... остальной код ...
self.logger.info("[ENTRYPOINT:run] Запуск главного процесса оркестратора.")
self.logger.info("="*50)