#
#
# Инкапсулирует всю логику взаимодействия с базой данных SQLite.
# Отвечает за управление соединениями, создание схемы, сохранение данных и запись логов.
#
#
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
from .models import LogRecordModel
#
#
# description: "Контекстный менеджер для безопасного управления соединением с SQLite."
# preconditions: "`db_path` должен быть валидным путем `Path`."
# postconditions: "Гарантирует корректное открытие и закрытие соединения с БД."
#
class DatabaseManager:
#
def __init__(self, db_path: Path):
self.db_path = db_path
self.conn: Optional[sqlite3.Connection] = None
self.logger = logging.getLogger(self.__class__.__name__)
#
#
def __enter__(self):
self.logger.debug(f"[INIT:DatabaseManager] Открытие соединения с БД: {self.db_path}")
try:
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
self.logger.debug("[INIT:DatabaseManager] [COHERENCE_CHECK_PASSED] Соединение с БД установлено.")
return self.conn
except sqlite3.Error as e:
self.logger.critical(f"[INIT:DatabaseManager] [CRITICAL] Ошибка подключения к БД: {e}", exc_info=True)
raise ConnectionError(f"Не удалось подключиться к базе данных {self.db_path}") from e
#
#
def __exit__(self, exc_type, exc_val, exc_tb):
if self.conn:
self.conn.close()
self.logger.debug("[CLEANUP:DatabaseManager] Соединение с БД закрыто.")
if exc_type:
self.logger.error(f"[ERROR:DatabaseManager] Исключение в контекстном менеджере БД: {exc_type.__name__}: {exc_val}", exc_info=True)
#
#
def close(self):
if self.conn:
self.conn.close()
self.conn = None
self.logger.debug("[CLEANUP:DatabaseManager] Соединение с БД явно закрыто.")
#
#
# description: "Обработчик логирования, который записывает логи в таблицу `logs` в SQLite."
# preconditions: "`db_manager` должен быть инициализирован."
# postconditions: "Записи логов сохраняются в базу данных."
#
class DatabaseLogHandler(logging.Handler):
#
def __init__(self, db_manager: DatabaseManager, run_id: str):
super().__init__()
self.db_manager = db_manager
self.run_id = run_id
#
#
def emit(self, record: logging.LogRecord):
try:
with sqlite3.connect(self.db_manager.db_path) as con:
cur = con.cursor()
log_entry = LogRecordModel(
run_id=self.run_id,
timestamp=datetime.fromtimestamp(record.created),
level=record.levelname,
message=self.format(record)
)
cur.execute(
"INSERT INTO logs (run_id, timestamp, level, message) VALUES (?, ?, ?, ?)",
(log_entry.run_id, log_entry.timestamp.isoformat(), log_entry.level, log_entry.message)
)
con.commit()
except Exception as e:
print(f"CRITICAL: [COHERENCE_CHECK_FAILED] Не удалось записать лог в базу данных: {e}", flush=True)
#
#
# description: "Инициализирует схему базы данных, создавая таблицы `products` и `logs`, если они не существуют."
# preconditions: "`db_path` должен быть валидным путем `Path`."
# side_effects: "Создает директорию для БД, если она не существует."
#
#
def init_database(db_path: Path, run_id: str):
log_prefix = f"[ACTION:init_database(id={run_id})]"
logging.info(f"{log_prefix} Инициализация базы данных: {db_path}")
try:
db_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(db_path) as con:
cur = con.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS parsing_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL UNIQUE,
start_time TIMESTAMP NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
name TEXT NOT NULL,
volume TEXT,
price INTEGER NOT NULL,
url TEXT,
is_in_stock BOOLEAN,
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
)
""")
con.commit()
logging.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Схема базы данных успешно проверена/создана.")
except sqlite3.Error as e:
logging.error(f"{log_prefix} [COHERENCE_CHECK_FAILED] Ошибка SQLite при инициализации БД: {e}", exc_info=True)
raise ConnectionError(f"Ошибка БД при инициализации: {e}") from e
#
#
# description: "Сохраняет список словарей с данными о продуктах в таблицу `products`."
# preconditions:
# - "`data` должен быть списком словарей, соответствующих модели `ProductVariant`."
# - "`db_path` должен указывать на существующую и инициализированную БД."
# postconditions: "Данные вставлены в таблицу. Возвращает True в случае успеха."
#
#
def save_data_to_db(data: List[Dict], db_path: Path, run_id: str) -> bool:
log_prefix = f"[ACTION:save_data_to_db(id={run_id})]"
if not data:
logging.warning(f"{log_prefix} [CONTRACT_VIOLATION] Данные для сохранения отсутствуют.")
return False
logging.info(f"{log_prefix} Начало сохранения {len(data)} записей в БД: {db_path}")
try:
with sqlite3.connect(db_path) as con:
cur = con.cursor()
products_to_insert = [
(run_id, item['name'], item['volume'], item['price'], str(item['url']), item['is_in_stock'])
for item in data
]
if products_to_insert:
cur.executemany(
"INSERT INTO products (run_id, name, volume, price, url, is_in_stock) VALUES (?, ?, ?, ?, ?, ?)",
products_to_insert
)
con.commit()
logging.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] {len(products_to_insert)} записей успешно сохранено.")
return True
else:
logging.warning(f"{log_prefix} Нет ��алидных записей для сохранения.")
return False
except sqlite3.Error as e:
logging.error(f"{log_prefix} [COHERENCE_CHECK_FAILED] Ошибка SQLite при сохранении данных: {e}", exc_info=True)
return False
except Exception as e:
logging.critical(f"{log_prefix} [CRITICAL] Непредвиденная ошибка при сохранении данных в БД: {e}", exc_info=True)
return False
#
#