Files
peptide-parcer/src/core/database.py
2025-07-20 09:29:19 +03:00

186 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# <MODULE name="core.database" semantics="database_interaction_logic" />
# <DESIGN_NOTE>
# Инкапсулирует всю логику взаимодействия с базой данных SQLite.
# Отвечает за управление соединениями, создание схемы, сохранение данных и запись логов.
# </DESIGN_NOTE>
# <IMPORTS>
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
from .models import LogRecordModel
# </IMPORTS>
# <MAIN_CONTRACT for="DatabaseManager">
# description: "Контекстный менеджер для безопасного управления соединением с SQLite."
# preconditions: "`db_path` должен быть валидным путем `Path`."
# postconditions: "Гарантирует корректное открытие и закрытие соединения с БД."
# </MAIN_CONTRACT>
class DatabaseManager:
# <INIT name="__init__">
def __init__(self, db_path: Path):
self.db_path = db_path
self.conn: Optional[sqlite3.Connection] = None
self.logger = logging.getLogger(self.__class__.__name__)
# </INIT>
# <ACTION name="__enter__">
def __enter__(self):
self.logger.debug(f"[INIT:DatabaseManager] Открытие соединения с БД: {self.db_path}")
try:
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
self.logger.debug("[INIT:DatabaseManager] [COHERENCE_CHECK_PASSED] Соединение с БД установлено.")
return self.conn
except sqlite3.Error as e:
self.logger.critical(f"[INIT:DatabaseManager] [CRITICAL] Ошибка подключения к БД: {e}", exc_info=True)
raise ConnectionError(f"Не удалось подключиться к базе данных {self.db_path}") from e
# </ACTION>
# <ACTION name="__exit__">
def __exit__(self, exc_type, exc_val, exc_tb):
if self.conn:
self.conn.close()
self.logger.debug("[CLEANUP:DatabaseManager] Соединение с БД закрыто.")
if exc_type:
self.logger.error(f"[ERROR:DatabaseManager] Исключение в контекстном менеджере БД: {exc_type.__name__}: {exc_val}", exc_info=True)
# </ACTION>
# <HELPER name="close">
def close(self):
if self.conn:
self.conn.close()
self.conn = None
self.logger.debug("[CLEANUP:DatabaseManager] Соединение с БД явно закрыто.")
# </HELPER>
# <MAIN_CONTRACT for="DatabaseLogHandler">
# description: "Обработчик логирования, который записывает логи в таблицу `logs` в SQLite."
# preconditions: "`db_manager` должен быть инициализирован."
# postconditions: "Записи логов сохраняются в базу данных."
# </MAIN_CONTRACT>
class DatabaseLogHandler(logging.Handler):
# <INIT name="__init__">
def __init__(self, db_manager: DatabaseManager, run_id: str):
super().__init__()
self.db_manager = db_manager
self.run_id = run_id
# </INIT>
# <ACTION name="emit">
def emit(self, record: logging.LogRecord):
try:
with sqlite3.connect(self.db_manager.db_path) as con:
cur = con.cursor()
log_entry = LogRecordModel(
run_id=self.run_id,
timestamp=datetime.fromtimestamp(record.created),
level=record.levelname,
message=self.format(record)
)
cur.execute(
"INSERT INTO logs (run_id, timestamp, level, message) VALUES (?, ?, ?, ?)",
(log_entry.run_id, log_entry.timestamp.isoformat(), log_entry.level, log_entry.message)
)
con.commit()
except Exception as e:
print(f"CRITICAL: [COHERENCE_CHECK_FAILED] Не удалось записать лог в базу данных: {e}", flush=True)
# </ACTION>
# <CONTRACT for="init_database">
# description: "Инициализирует схему базы данных, создавая таблицы `products` и `logs`, если они не существуют."
# preconditions: "`db_path` должен быть валидным путем `Path`."
# side_effects: "Создает директорию для БД, если она не существует."
# </CONTRACT>
# <ACTION name="init_database">
def init_database(db_path: Path, run_id: str):
log_prefix = f"[ACTION:init_database(id={run_id})]"
logging.info(f"{log_prefix} Инициализация базы данных: {db_path}")
try:
db_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(db_path) as con:
cur = con.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS parsing_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL UNIQUE,
start_time TIMESTAMP NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
name TEXT NOT NULL,
volume TEXT,
price INTEGER NOT NULL,
url TEXT,
is_in_stock BOOLEAN,
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
FOREIGN KEY (run_id) REFERENCES parsing_runs (run_id)
)
""")
con.commit()
logging.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Схема базы данных успешно проверена/создана.")
except sqlite3.Error as e:
logging.error(f"{log_prefix} [COHERENCE_CHECK_FAILED] Ошибка SQLite при инициализации БД: {e}", exc_info=True)
raise ConnectionError(f"Ошибка БД при инициализации: {e}") from e
# </ACTION>
# <CONTRACT for="save_data_to_db">
# description: "Сохраняет список словарей с данными о продуктах в таблицу `products`."
# preconditions:
# - "`data` должен быть списком словарей, соответствующих модели `ProductVariant`."
# - "`db_path` должен указывать на существующую и инициализированную БД."
# postconditions: "Данные вставлены в таблицу. Возвращает True в случае успеха."
# </CONTRACT>
# <ACTION name="save_data_to_db">
def save_data_to_db(data: List[Dict], db_path: Path, run_id: str) -> bool:
log_prefix = f"[ACTION:save_data_to_db(id={run_id})]"
if not data:
logging.warning(f"{log_prefix} [CONTRACT_VIOLATION] Данные для сохранения отсутствуют.")
return False
logging.info(f"{log_prefix} Начало сохранения {len(data)} записей в БД: {db_path}")
try:
with sqlite3.connect(db_path) as con:
cur = con.cursor()
products_to_insert = [
(run_id, item['name'], item['volume'], item['price'], str(item['url']), item['is_in_stock'])
for item in data
]
if products_to_insert:
cur.executemany(
"INSERT INTO products (run_id, name, volume, price, url, is_in_stock) VALUES (?, ?, ?, ?, ?, ?)",
products_to_insert
)
con.commit()
logging.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] {len(products_to_insert)} записей успешно сохранено.")
return True
else:
logging.warning(f"{log_prefix} Нет <20><>алидных записей для сохранения.")
return False
except sqlite3.Error as e:
logging.error(f"{log_prefix} [COHERENCE_CHECK_FAILED] Ошибка SQLite при сохранении данных: {e}", exc_info=True)
return False
except Exception as e:
logging.critical(f"{log_prefix} [CRITICAL] Непредвиденная ошибка при сохранении данных в БД: {e}", exc_info=True)
return False
# </ACTION>
# <COHERENCE_CHECK status="PASSED" description="Модуль базы данных полностью стр<D182><D180>ктурирован и размечен." />