rabbitmq
This commit is contained in:
@@ -9,6 +9,7 @@ import json
|
||||
from typing import Optional, Dict, Any
|
||||
from contextlib import contextmanager
|
||||
import pika
|
||||
from pika.adapters.blocking_connection import BlockingChannel
|
||||
from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed
|
||||
|
||||
from .settings import (
|
||||
@@ -30,7 +31,7 @@ class RabbitMQConnection:
|
||||
def __init__(self):
|
||||
"""[INIT] Инициализация подключения к RabbitMQ."""
|
||||
self.connection: Optional[pika.BlockingConnection] = None
|
||||
self.channel: Optional[pika.channel.Channel] = None
|
||||
self.channel: Optional[BlockingChannel] = None
|
||||
self._connection_params = self._build_connection_params()
|
||||
|
||||
def _build_connection_params(self) -> pika.ConnectionParameters:
|
||||
@@ -87,6 +88,9 @@ class RabbitMQConnection:
|
||||
[HELPER] Настраивает exchange и очереди в RabbitMQ.
|
||||
@invariant: Создает необходимые exchange и очереди, если они не существуют.
|
||||
"""
|
||||
if not self.channel:
|
||||
raise AMQPChannelError("Channel is not initialized")
|
||||
|
||||
try:
|
||||
# Создание exchange
|
||||
self.channel.exchange_declare(
|
||||
@@ -152,7 +156,7 @@ class RabbitMQConnection:
|
||||
not self.channel.is_closed
|
||||
)
|
||||
|
||||
def send_message(self, queue: str, message: Dict[str, Any], routing_key: str = None) -> bool:
|
||||
def send_message(self, queue: str, message: Dict[str, Any], routing_key: Optional[str] = None) -> bool:
|
||||
"""
|
||||
[CONTRACT]
|
||||
@description: Отправляет сообщение в указанную очередь.
|
||||
@@ -167,7 +171,7 @@ class RabbitMQConnection:
|
||||
Returns:
|
||||
bool: True если сообщение отправлено, False в противном случае
|
||||
"""
|
||||
if not self.is_connected():
|
||||
if not self.is_connected() or not self.channel:
|
||||
logger.error("[RABBITMQ] Попытка отправить сообщение без активного подключения")
|
||||
return False
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ from bs4 import BeautifulSoup
|
||||
from typing import List, Optional
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util.retry import Retry
|
||||
from pydantic import HttpUrl
|
||||
|
||||
from core.models import ProductVariant # [FIX] Импорт ProductVariant
|
||||
from core.settings import ScraperSelectors
|
||||
@@ -125,7 +126,7 @@ class Scraper:
|
||||
unique_urls = set()
|
||||
for link in links:
|
||||
href = link.get('href')
|
||||
if href:
|
||||
if href and isinstance(href, str):
|
||||
full_url = urljoin(self.base_url, href)
|
||||
unique_urls.add(full_url)
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user