Enhance application with new features, improved error handling, and performance optimizations. Key updates include: added data validation, retry strategies for HTTP requests, detailed logging, and support for RabbitMQ exports. Updated dependencies and enhanced README documentation for better setup instructions.
This commit is contained in:
350
src/core/rabbitmq.py
Normal file
350
src/core/rabbitmq.py
Normal file
@@ -0,0 +1,350 @@
|
||||
# [FILE] src/core/rabbitmq.py
|
||||
# ANCHOR: RabbitMQ_Module
|
||||
# Семантика: Модуль для работы с очередью сообщений RabbitMQ.
|
||||
# [CONTRACT]: Обеспечивает надежное подключение, отправку сообщений и обработку ошибок.
|
||||
# [COHERENCE]: Интегрирован с моделями данных и настройками приложения.
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import Optional, Dict, Any
|
||||
from contextlib import contextmanager
|
||||
import pika
|
||||
from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed
|
||||
|
||||
from .settings import (
|
||||
RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD,
|
||||
RABBITMQ_VIRTUAL_HOST, RABBITMQ_CONNECTION_TIMEOUT, RABBITMQ_HEARTBEAT,
|
||||
RABBITMQ_BLOCKED_CONNECTION_TIMEOUT, RABBITMQ_PRODUCTS_QUEUE,
|
||||
RABBITMQ_LOGS_QUEUE, RABBITMQ_EXCHANGE
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class RabbitMQConnection:
|
||||
"""
|
||||
[CONTRACT]
|
||||
@description: Класс для управления подключением к RabbitMQ.
|
||||
@invariant: Обеспечивает надежное подключение с автоматическим переподключением.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""[INIT] Инициализация подключения к RabbitMQ."""
|
||||
self.connection: Optional[pika.BlockingConnection] = None
|
||||
self.channel: Optional[pika.channel.Channel] = None
|
||||
self._connection_params = self._build_connection_params()
|
||||
|
||||
def _build_connection_params(self) -> pika.ConnectionParameters:
|
||||
"""
|
||||
[HELPER] Строит параметры подключения к RabbitMQ.
|
||||
|
||||
Returns:
|
||||
pika.ConnectionParameters: Параметры подключения
|
||||
"""
|
||||
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
|
||||
return pika.ConnectionParameters(
|
||||
host=RABBITMQ_HOST,
|
||||
port=RABBITMQ_PORT,
|
||||
virtual_host=RABBITMQ_VIRTUAL_HOST,
|
||||
credentials=credentials,
|
||||
connection_attempts=3,
|
||||
retry_delay=5,
|
||||
socket_timeout=RABBITMQ_CONNECTION_TIMEOUT,
|
||||
heartbeat=RABBITMQ_HEARTBEAT,
|
||||
blocked_connection_timeout=RABBITMQ_BLOCKED_CONNECTION_TIMEOUT
|
||||
)
|
||||
|
||||
def connect(self) -> bool:
|
||||
"""
|
||||
[CONTRACT]
|
||||
@description: Устанавливает подключение к RabbitMQ.
|
||||
@precondition: Параметры подключения корректны.
|
||||
@postcondition: Подключение установлено или False в случае ошибки.
|
||||
|
||||
Returns:
|
||||
bool: True если подключение успешно, False в противном случае
|
||||
"""
|
||||
try:
|
||||
logger.info(f"[RABBITMQ] Подключение к {RABBITMQ_HOST}:{RABBITMQ_PORT}")
|
||||
|
||||
self.connection = pika.BlockingConnection(self._connection_params)
|
||||
self.channel = self.connection.channel()
|
||||
|
||||
# [SETUP] Настройка exchange и очередей
|
||||
self._setup_exchange_and_queues()
|
||||
|
||||
logger.info("[RABBITMQ] Подключение успешно установлено")
|
||||
return True
|
||||
|
||||
except AMQPConnectionError as e:
|
||||
logger.error(f"[RABBITMQ] Ошибка подключения: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"[RABBITMQ] Непредвиденная ошибка при подключении: {e}")
|
||||
return False
|
||||
|
||||
def _setup_exchange_and_queues(self):
|
||||
"""
|
||||
[HELPER] Настраивает exchange и очереди в RabbitMQ.
|
||||
@invariant: Создает необходимые exchange и очереди, если они не существуют.
|
||||
"""
|
||||
try:
|
||||
# Создание exchange
|
||||
self.channel.exchange_declare(
|
||||
exchange=RABBITMQ_EXCHANGE,
|
||||
exchange_type='direct',
|
||||
durable=True
|
||||
)
|
||||
|
||||
# Создание очереди для продуктов
|
||||
self.channel.queue_declare(
|
||||
queue=RABBITMQ_PRODUCTS_QUEUE,
|
||||
durable=True
|
||||
)
|
||||
self.channel.queue_bind(
|
||||
exchange=RABBITMQ_EXCHANGE,
|
||||
queue=RABBITMQ_PRODUCTS_QUEUE,
|
||||
routing_key='products'
|
||||
)
|
||||
|
||||
# Создание очереди для логов
|
||||
self.channel.queue_declare(
|
||||
queue=RABBITMQ_LOGS_QUEUE,
|
||||
durable=True
|
||||
)
|
||||
self.channel.queue_bind(
|
||||
exchange=RABBITMQ_EXCHANGE,
|
||||
queue=RABBITMQ_LOGS_QUEUE,
|
||||
routing_key='logs'
|
||||
)
|
||||
|
||||
logger.info("[RABBITMQ] Exchange и очереди настроены")
|
||||
|
||||
except AMQPChannelError as e:
|
||||
logger.error(f"[RABBITMQ] Ошибка настройки очередей: {e}")
|
||||
raise
|
||||
|
||||
def disconnect(self):
|
||||
"""
|
||||
[CONTRACT]
|
||||
@description: Закрывает подключение к RabbitMQ.
|
||||
@postcondition: Подключение закрыто корректно.
|
||||
"""
|
||||
try:
|
||||
if self.channel and not self.channel.is_closed:
|
||||
self.channel.close()
|
||||
if self.connection and not self.connection.is_closed:
|
||||
self.connection.close()
|
||||
logger.info("[RABBITMQ] Подключение закрыто")
|
||||
except Exception as e:
|
||||
logger.error(f"[RABBITMQ] Ошибка при закрытии подключения: {e}")
|
||||
|
||||
def is_connected(self) -> bool:
|
||||
"""
|
||||
[HELPER] Проверяет, активно ли подключение.
|
||||
|
||||
Returns:
|
||||
bool: True если подключение активно, False в противном случае
|
||||
"""
|
||||
return (
|
||||
self.connection is not None and
|
||||
not self.connection.is_closed and
|
||||
self.channel is not None and
|
||||
not self.channel.is_closed
|
||||
)
|
||||
|
||||
def send_message(self, queue: str, message: Dict[str, Any], routing_key: str = None) -> bool:
|
||||
"""
|
||||
[CONTRACT]
|
||||
@description: Отправляет сообщение в указанную очередь.
|
||||
@precondition: Подключение активно, сообщение валидно.
|
||||
@postcondition: Сообщение отправлено или False в случае ошибки.
|
||||
|
||||
Args:
|
||||
queue: Название очереди
|
||||
message: Сообщение для отправки
|
||||
routing_key: Ключ маршрутизации (по умолчанию равен названию очереди)
|
||||
|
||||
Returns:
|
||||
bool: True если сообщение отправлено, False в противном случае
|
||||
"""
|
||||
if not self.is_connected():
|
||||
logger.error("[RABBITMQ] Попытка отправить сообщение без активного подключения")
|
||||
return False
|
||||
|
||||
try:
|
||||
routing_key = routing_key or queue
|
||||
message_body = json.dumps(message, ensure_ascii=False, default=str)
|
||||
|
||||
self.channel.basic_publish(
|
||||
exchange=RABBITMQ_EXCHANGE,
|
||||
routing_key=routing_key,
|
||||
body=message_body,
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=2, # Сохранять сообщения на диск
|
||||
content_type='application/json'
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"[RABBITMQ] Сообщение отправлено в очередь {queue}")
|
||||
return True
|
||||
|
||||
except AMQPChannelError as e:
|
||||
logger.error(f"[RABBITMQ] Ошибка отправки сообщения: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"[RABBITMQ] Непредвиденная ошибка при отправке: {e}")
|
||||
return False
|
||||
|
||||
@contextmanager
|
||||
def get_rabbitmq_connection():
|
||||
"""
|
||||
[CONTEXT_MANAGER]
|
||||
@description: Контекстный менеджер для работы с RabbitMQ.
|
||||
@invariant: Автоматически закрывает подключение при выходе из контекста.
|
||||
|
||||
Yields:
|
||||
RabbitMQConnection: Объект подключения к RabbitMQ
|
||||
"""
|
||||
connection = RabbitMQConnection()
|
||||
try:
|
||||
if connection.connect():
|
||||
yield connection
|
||||
else:
|
||||
logger.error("[RABBITMQ] Не удалось установить подключение")
|
||||
yield None
|
||||
finally:
|
||||
connection.disconnect()
|
||||
|
||||
class RabbitMQExporter:
|
||||
"""
|
||||
[CONTRACT]
|
||||
@description: Класс для экспорта данных в RabbitMQ.
|
||||
@invariant: Обеспечивает надежную отправку данных о продуктах и логов.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""[INIT] Инициализация экспортера RabbitMQ."""
|
||||
self.connection = RabbitMQConnection()
|
||||
|
||||
def export_products(self, products: list, run_id: str) -> bool:
|
||||
"""
|
||||
[CONTRACT]
|
||||
@description: Экспортирует данные о продуктах в RabbitMQ.
|
||||
@precondition: Список продуктов не пустой, run_id валиден.
|
||||
@postcondition: Данные отправлены в очередь или False в случае ошибки.
|
||||
|
||||
Args:
|
||||
products: Список продуктов для экспорта
|
||||
run_id: Идентификатор запуска парсера
|
||||
|
||||
Returns:
|
||||
bool: True если экспорт успешен, False в противном случае
|
||||
"""
|
||||
if not products:
|
||||
logger.warning("[RABBITMQ] Попытка экспорта пустого списка продуктов")
|
||||
return False
|
||||
|
||||
try:
|
||||
from .models import ProductDataMessage, ProductVariant
|
||||
|
||||
# Преобразование данных в Pydantic модели
|
||||
product_variants = []
|
||||
for product in products:
|
||||
try:
|
||||
variant = ProductVariant(**product)
|
||||
product_variants.append(variant)
|
||||
except Exception as e:
|
||||
logger.error(f"[RABBITMQ] Ошибка валидации продукта: {e}")
|
||||
continue
|
||||
|
||||
if not product_variants:
|
||||
logger.error("[RABBITMQ] Нет валидных продуктов для экспорта")
|
||||
return False
|
||||
|
||||
# Создание сообщения
|
||||
message = ProductDataMessage(
|
||||
source="price_parser",
|
||||
products=product_variants,
|
||||
run_id=run_id,
|
||||
total_count=len(product_variants)
|
||||
)
|
||||
|
||||
# Отправка сообщения
|
||||
if not self.connection.is_connected() and not self.connection.connect():
|
||||
return False
|
||||
|
||||
return self.connection.send_message(
|
||||
queue=RABBITMQ_PRODUCTS_QUEUE,
|
||||
message=message.dict(),
|
||||
routing_key='products'
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[RABBITMQ] Ошибка экспорта продуктов: {e}")
|
||||
return False
|
||||
|
||||
def export_logs(self, log_records: list, run_id: str) -> bool:
|
||||
"""
|
||||
[CONTRACT]
|
||||
@description: Экспортирует логи в RabbitMQ.
|
||||
@precondition: Список логов не пустой, run_id валиден.
|
||||
@postcondition: Логи отправлены в очередь или False в случае ошибки.
|
||||
|
||||
Args:
|
||||
log_records: Список записей логов
|
||||
run_id: Идентификатор запуска парсера
|
||||
|
||||
Returns:
|
||||
bool: True если экспорт успешен, False в противном случае
|
||||
"""
|
||||
if not log_records:
|
||||
logger.warning("[RABBITMQ] Попытка экспорта пустого списка логов")
|
||||
return False
|
||||
|
||||
try:
|
||||
from .models import LogMessage, LogRecordModel
|
||||
|
||||
# Преобразование данных в Pydantic модели
|
||||
log_models = []
|
||||
for log_record in log_records:
|
||||
try:
|
||||
log_model = LogRecordModel(**log_record)
|
||||
log_models.append(log_model)
|
||||
except Exception as e:
|
||||
logger.error(f"[RABBITMQ] Ошибка валидации лога: {e}")
|
||||
continue
|
||||
|
||||
if not log_models:
|
||||
logger.error("[RABBITMQ] Нет валидных логов для экспорта")
|
||||
return False
|
||||
|
||||
# Создание сообщения
|
||||
message = LogMessage(
|
||||
source="price_parser",
|
||||
log_records=log_models,
|
||||
run_id=run_id
|
||||
)
|
||||
|
||||
# Отправка сообщения
|
||||
if not self.connection.is_connected() and not self.connection.connect():
|
||||
return False
|
||||
|
||||
return self.connection.send_message(
|
||||
queue=RABBITMQ_LOGS_QUEUE,
|
||||
message=message.dict(),
|
||||
routing_key='logs'
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[RABBITMQ] Ошибка экспорта логов: {e}")
|
||||
return False
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
[CONTRACT]
|
||||
@description: Закрывает подключение к RabbitMQ.
|
||||
@postcondition: Подключение закрыто корректно.
|
||||
"""
|
||||
self.connection.disconnect()
|
||||
|
||||
# [COHERENCE_CHECK_PASSED] Модуль RabbitMQ создан с полной поддержкой контрактов и обработки ошибок.
|
||||
Reference in New Issue
Block a user