Files
homebox_lens/extract_semantics.py
2025-09-08 16:23:03 +03:00

502 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# [PACKAGE] tools.semantic_parser
# [FILE] extract_semantics.py
# [SEMANTICS] cli, parser, xml, json, file_io, design_by_contract, structured_logging, protocol_resolver, graphrag, validation, manifest_synchronization
# [AI_NOTE]: Этот скрипт является эталонной реализацией всех четырех ключевых принципов
# семантического обогащения. Он не только проверяет код на соответствие этим правилам,
# но и сам написан с их неукоснительным соблюдением.
# Версия 2.0 добавляет функциональность синхронизации манифеста.
# [IMPORTS]
import sys
import re
import json
import argparse
import os
import logging
import xml.etree.ElementTree as ET
from typing import List, Dict, Any, Optional, Set
# [END_IMPORTS]
# [ENTITY: Class('StructuredFormatter')]
# [RELATION: Class('StructuredFormatter')] -> [INHERITS_FROM] -> [Class('logging.Formatter')]
class StructuredFormatter(logging.Formatter):
"""
@summary Форматтер для логов, реализующий стандарт AIFriendlyLogging.
@invariant Каждый лог, отформатированный этим классом, будет иметь структуру "[LEVEL][ANCHOR][STATE] message".
@sideeffect Отсутствуют.
"""
def format(self, record: logging.LogRecord) -> str:
assert record.msg is not None, "Сообщение лога не может быть None."
record.msg = f"[{record.levelname.upper()}]{record.msg}"
result = super().format(record)
assert result.startswith(f"[{record.levelname.upper()}]"), "Постусловие нарушено: лог не начинается с уровня."
return result
# [END_ENTITY: Class('StructuredFormatter')]
# [ENTITY: Class('SemanticProtocol')]
# [RELATION: Class('SemanticProtocol')] -> [DEPENDS_ON] -> [Module('xml.etree.ElementTree')]
class SemanticProtocol:
"""
@summary Загружает, разрешает <INCLUDE> и предоставляет доступ к правилам из протокола.
@description Этот класс действует как 'резолвер протоколов', рекурсивно обрабатывая
теги <INCLUDE> и объединяя правила из нескольких файлов в единый набор.
@invariant Экземпляр класса всегда содержит полный, объединенный набор правил.
@sideeffect Читает несколько файлов с диска при инициализации.
"""
def __init__(self, main_protocol_path: str):
logger.debug("[DEBUG][ENTRYPOINT][initializing_protocol] Инициализация протокола из главного файла: '%s'", main_protocol_path)
if not os.path.exists(main_protocol_path):
raise FileNotFoundError(f"Главный файл протокола не найден: {main_protocol_path}")
self.processed_paths: Set[str] = set()
self.all_rule_nodes: List[ET.Element] = []
self._resolve_and_load(main_protocol_path)
self.rules = self._parse_all_rules()
logger.info("[INFO][ACTION][resolution_complete] Разрешение протокола завершено. Всего загружено правил: %d", len(self.rules))
def _resolve_and_load(self, file_path: str):
abs_path = os.path.abspath(file_path)
if abs_path in self.processed_paths:
return
logger.info("[INFO][ACTION][resolving_includes] Обработка файла протокола: %s", abs_path)
self.processed_paths.add(abs_path)
try:
tree = ET.parse(abs_path)
root = tree.getroot()
except ET.ParseError as e:
logger.error("[ERROR][ACTION][parsing_failed] Ошибка парсинга XML в файле %s: %s", abs_path, e)
return
self.all_rule_nodes.extend(root.findall(".//Rule"))
base_dir = os.path.dirname(abs_path)
for include_node in root.findall(".//INCLUDE"):
relative_path = include_node.get("from")
if relative_path and relative_path.lower().endswith('.xml'):
included_path = os.path.join(base_dir, relative_path)
self._resolve_and_load(included_path)
def _parse_all_rules(self) -> Dict[str, Dict[str, Any]]:
rules_dict = {}
for rule_node in self.all_rule_nodes:
rule_id = rule_node.get('id')
if not rule_id: continue
definition_node = rule_node.find("Definition")
rules_dict[rule_id] = self._parse_definition(definition_node)
return rules_dict
def _parse_definition(self, node: Optional[ET.Element]) -> Optional[Dict[str, Any]]:
if node is None: return None
def_type = node.get("type")
if def_type in ("regex", "dynamic_regex", "negative_regex"):
return {"type": def_type, "pattern": node.findtext("Pattern", "")}
if def_type == "paired_regex":
return {"type": def_type, "start": node.findtext("Pattern[@name='start']", ""), "end": node.findtext("Pattern[@name='end']", "")}
if def_type == "multi_check":
checks = []
for check_node in node.findall(".//Check"):
check_data = check_node.attrib
check_data['failure_message'] = check_node.findtext("FailureMessage", "")
if check_data.get('type') == 'block_order':
check_data['preceding_pattern'] = check_node.findtext("PrecedingBlockPattern", "")
check_data['following_pattern'] = check_node.findtext("FollowingBlockPattern", "")
elif check_data.get('type') == 'kdoc_validation':
check_data['for_function'] = {t.get('name'): t.get('condition') for t in check_node.findall(".//RequiredTagsForFunction/Tag")}
check_data['for_class'] = {t.get('name'): t.get('condition') for t in check_node.findall(".//RequiredTagsForClass/Tag")}
elif check_data.get('type') == 'contract_enforcement':
condition_node = check_node.find("Condition")
check_data['kdoc_tag'] = condition_node.get('kdoc_tag')
check_data['code_must_contain'] = condition_node.get('code_must_contain')
elif check_data.get('type') == 'entity_type_validation':
check_data['valid_types'] = {t.text for t in check_node.findall(".//ValidEntityTypes/Type")}
elif check_data.get('type') == 'relation_validation':
check_data['triplet_pattern'] = check_node.findtext("TripletPattern", "")
check_data['valid_relations'] = {t.text for t in check_node.findall(".//ValidRelationTypes/Type")}
else:
check_data['pattern'] = check_node.findtext("Pattern", "")
checks.append(check_data)
return {"type": def_type, "checks": checks}
return None
def get_rule(self, rule_id: str) -> Optional[Dict[str, Any]]:
return self.rules.get(rule_id)
# [END_ENTITY: Class('SemanticProtocol')]
# [ENTITY: Class('CodeValidator')]
# [RELATION: Class('CodeValidator')] -> [DEPENDS_ON] -> [Class('SemanticProtocol')]
class CodeValidator:
"""
@summary Применяет правила из протокола к содержимому файла для поиска ошибок.
@invariant Всегда работает с валидным и загруженным экземпляром SemanticProtocol.
"""
def __init__(self, protocol: SemanticProtocol):
self.protocol = protocol
def validate(self, file_path: str, content: str, entity_blocks: List[str]) -> List[str]:
errors = []
rules = self.protocol.rules
if "AIFriendlyLogging" in rules:
errors.extend(self._validate_logging(file_path, content, rules["AIFriendlyLogging"]))
if "DesignByContract" in rules or "GraphRAG" in rules:
for entity_content in entity_blocks:
if "DesignByContract" in rules:
errors.extend(self._validate_entity_dbc(entity_content, rules["DesignByContract"]))
if "GraphRAG" in rules:
errors.extend(self._validate_entity_graphrag(entity_content, rules["GraphRAG"]))
return list(set(errors))
def _validate_logging(self, file_path: str, content: str, rule: Dict) -> List[str]:
errors = []
if rule.get('type') != 'multi_check': return []
for check in rule['checks']:
if check.get('type') == 'negative_regex_in_path' and check.get('path_contains') in file_path and re.search(check['pattern'], content):
errors.append(check['failure_message'])
elif check.get('type') == 'negative_regex' and re.search(check['pattern'], content):
errors.append(check['failure_message'])
elif check.get('type') == 'positive_regex_on_match':
for line in content.splitlines():
if re.search(check['trigger'], line) and not re.search(check['pattern'], line):
errors.append(f"{check['failure_message']} [Строка: '{line.strip()}']")
return errors
def _validate_entity_dbc(self, entity_content: str, rule: Dict) -> List[str]:
errors = []
if rule.get('type') != 'multi_check': return []
kdoc_match = re.search(r"(\/\*\*[\s\S]*?\*\/)", entity_content)
kdoc = kdoc_match.group(1) if kdoc_match else ""
signature_match = re.search(r"\s*(public\s+|private\s+|internal\s+)?(class|interface|fun|object)\s+\w+", entity_content)
is_public = not (signature_match and signature_match.group(1) and 'private' in signature_match.group(1)) if signature_match else False
for check in rule['checks']:
if not is_public and check.get('type') != 'block_order': continue # Проверки контрактов только для public
if check.get('type') == 'kdoc_validation':
is_class = bool(re.search(r"\s*(class|interface|object)", entity_content))
if is_class:
for tag, _ in check['for_class'].items():
if tag not in kdoc: errors.append(f"{check['failure_message']} ({tag})")
else: # is_function
has_params = bool(re.search(r"fun\s+\w+\s*\((.|\s)*\S(.|\s)*\)", entity_content))
returns_value = not bool(re.search(r"fun\s+\w+\(.*\)\s*:\s*Unit", entity_content) or not re.search(r"fun\s+\w+\(.*\)\s*:", entity_content))
for tag, cond in check['for_function'].items():
if tag not in kdoc and (not cond or (cond == 'has_parameters' and has_params) or (cond == 'returns_value' and returns_value)):
errors.append(f"{check['failure_message']} ({tag})")
elif check.get('type') == 'contract_enforcement' and check['kdoc_tag'] in kdoc and not re.search(check['code_must_contain'], entity_content):
errors.append(check['failure_message'])
return errors
def _validate_entity_graphrag(self, entity_content: str, rule: Dict) -> List[str]:
errors = []
if rule.get('type') != 'multi_check': return []
markup_block_match = re.search(r"^([\s\S]*?)(\/\*\*|class|interface|fun|object)", entity_content)
markup_block = markup_block_match.group(1) if markup_block_match else ""
for check in rule['checks']:
if check.get('type') == 'block_order' and "/**" in markup_block:
errors.append(check['failure_message'])
elif check.get('type') == 'entity_type_validation':
entity_match = re.search(r"//\s*\[ENTITY:\s*(?P<type>\w+)\((?P<name>.*?)\)\]", markup_block)
if entity_match and entity_match.group('type') not in check['valid_types']:
errors.append(f"{check['failure_message']} Найдено: {entity_match.group('type')}.")
elif check.get('type') == 'relation_validation':
for line in re.findall(r"//\s*\[RELATION:.*\]", markup_block):
match = re.match(check['triplet_pattern'], line)
if not match:
errors.append(f"{check['failure_message']} (неверный формат). Строка: {line.strip()}")
elif match.group('relation_type') not in check['valid_relations']:
errors.append(f"{check['failure_message']} Найдено: [{match.group('relation_type')}].")
elif check.get('type') == 'markup_cohesion':
for line in markup_block.strip().split('\n'):
if line.strip() and not line.strip().startswith('//'):
errors.append(check['failure_message']); break
return errors
# [END_ENTITY: Class('CodeValidator')]
# [ENTITY: Class('SemanticParser')]
# [RELATION: Class('SemanticParser')] -> [DEPENDS_ON] -> [Class('SemanticProtocol')]
# [RELATION: Class('SemanticParser')] -> [CREATES_INSTANCE_OF] -> [Class('CodeValidator')]
class SemanticParser:
"""
@summary Оркестрирует процесс валидации и парсинга исходных файлов.
@invariant Всегда работает с валидным и загруженным экземпляром SemanticProtocol.
@sideeffect Читает содержимое файлов, переданных для парсинга.
"""
def __init__(self, protocol: SemanticProtocol):
assert isinstance(protocol, SemanticProtocol), "Объект protocol должен быть экземпляром SemanticProtocol."
self.protocol = protocol
self.validator = CodeValidator(protocol)
def parse_file(self, file_path: str) -> Dict[str, Any]:
logger.info("[INFO][ENTRYPOINT][parsing_file] Начало парсинга файла: '%s'", file_path)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
return {"file_path": file_path, "status": "error", "error_message": f"Не удалось прочитать файл: {e}"}
entity_rule = self.protocol.get_rule("EntityContainerization")
entity_blocks = re.findall(entity_rule['start'] + r'[\s\S]*?' + entity_rule['end'], content, re.DOTALL) if entity_rule else []
validation_errors = self.validator.validate(file_path, content, entity_blocks)
header_rule = self.protocol.get_rule("FileHeaderIntegrity")
if not re.search(header_rule['pattern'], content) if header_rule else None:
msg = "Нарушение целостности заголовка (правило FileHeaderIntegrity)."
if msg not in validation_errors: validation_errors.append(msg)
if validation_errors:
logger.warn("[WARN][ACTION][validation_failed] Файл %s не прошел валидацию: %s", file_path, " | ".join(validation_errors))
return {"file_path": file_path, "status": "error", "error_message": " | ".join(validation_errors)}
header_match = re.search(header_rule['pattern'], content)
header_data = header_match.groupdict()
file_info = {
"file_path": file_path, "status": "success",
"header": {"package": header_data.get('package','').strip(), "file_name": header_data.get('file','').strip(), "semantics_tags": [t.strip() for t in header_data.get('semantics','').split(',')]},
"entities": self._extract_entities(content)
}
logger.info("[INFO][POSTCONDITION][parsing_complete] Парсинг файла завершен. Найдено сущностей: %d", len(file_info["entities"]))
return file_info
def _extract_entities(self, content: str) -> List[Dict[str, Any]]:
entity_rule = self.protocol.get_rule("EntityContainerization")
if not entity_rule: return []
entities = []
for match in re.finditer(entity_rule['start'] + r'(?P<body>.*?)' + entity_rule['end'], content, re.DOTALL):
data = match.groupdict()
kdoc = self._parse_kdoc(data.get('body', ''))
e_type, e_name = data.get('type', 'N/A'), data.get('name', 'N/A')
type_snake = re.sub(r'(?<!^)(?=[A-Z])', '_', e_type).lower()
name_snake = re.sub(r'[^a-zA-Z0-9_]', '', e_name.replace(' ', '_')).lower()
entities.append({
"node_id": f"{type_snake}_{name_snake}", "entity_type": e_type, "entity_name": e_name,
"summary": kdoc['summary'], "description": kdoc['description'], "relations": kdoc['relations']
})
return entities
def _parse_kdoc(self, body: str) -> Dict[str, Any]:
summary_match = re.search(r"@summary\s*(.*)", body)
summary = summary_match.group(1).strip() if summary_match else ""
desc_match = re.search(r"@description\s*(.*)", body, re.DOTALL)
desc = ""
if desc_match:
lines = [re.sub(r"^\s*\*\s?", "", l).strip() for l in desc_match.group(1).strip().split('\n')]
desc = " ".join(lines)
relations = [m.groupdict() for m in re.finditer(r"[RELATION:\s*(?P<type>\w+)\s*target_id='(?P<target>.*?)']", body)]
return {"summary": summary, "description": desc, "relations": relations}
# [END_ENTITY: Class('SemanticParser')]
# [ENTITY: Class('ManifestSynchronizer')]
# [RELATION: Class('ManifestSynchronizer')] -> [DEPENDS_ON] -> [Module('xml.etree.ElementTree')]
# [RELATION: Class('ManifestSynchronizer')] -> [MODIFIES_STATE_OF] -> [DataStructure('PROJECT_MANIFEST.xml')]
class ManifestSynchronizer:
"""
@summary Управляет чтением, сравнением и обновлением PROJECT_MANIFEST.xml.
@invariant Экземпляр класса всегда работает с корректно загруженным XML-деревом.
@sideeffect Читает и может перезаписывать файл манифеста на диске.
"""
def __init__(self, manifest_path: str):
"""
@param manifest_path: Путь к файлу PROJECT_MANIFEST.xml.
@sideeffect Читает и парсит XML-файл. Вызывает исключение, если файл не найден или поврежден.
"""
require(os.path.exists(manifest_path), f"Файл манифеста не найден: {manifest_path}")
logger.info("[INFO][ENTRYPOINT][manifest_loading] Загрузка манифеста: %s", manifest_path)
self.manifest_path = manifest_path
try:
self.tree = ET.parse(manifest_path)
self.root = self.tree.getroot()
self.graph_node = self.root.find("PROJECT_GRAPH")
if self.graph_node is None:
raise ValueError("В манифесте отсутствует тег <PROJECT_GRAPH>")
except (ET.ParseError, ValueError) as e:
logger.error("[ERROR][ACTION][manifest_parsing_failed] Ошибка парсинга манифеста: %s", e)
raise ValueError(f"Ошибка парсинга манифеста: {e}")
def synchronize(self, parsed_code_data: List[Dict[str, Any]]) -> Dict[str, int]:
"""
@summary Синхронизирует состояние манифеста с состоянием кодовой базы.
@param parsed_code_data: Список словарей, представляющих состояние файлов, от SemanticParser.
@return Словарь со статистикой изменений.
@sideeffect Модифицирует внутреннее XML-дерево.
"""
stats = {"nodes_added": 0, "nodes_updated": 0, "nodes_removed": 0}
all_code_node_ids = {
entity["node_id"]
for file_data in parsed_code_data if file_data["status"] == "success"
for entity in file_data["entities"]
}
manifest_nodes_map = {node.get("id"): node for node in self.graph_node.findall("NODE")}
manifest_node_ids = set(manifest_nodes_map.keys())
# Удаление узлов, которых больше нет в коде
nodes_to_remove = manifest_node_ids - all_code_node_ids
for node_id in nodes_to_remove:
logger.debug("[DEBUG][ACTION][removing_node] Удаление устаревшего узла: %s", node_id)
self.graph_node.remove(manifest_nodes_map[node_id])
stats["nodes_removed"] += 1
# Добавление и обновление узлов
for file_data in parsed_code_data:
if file_data["status"] != "success":
continue
for entity in file_data["entities"]:
node_id = entity["node_id"]
existing_node = manifest_nodes_map.get(node_id)
if existing_node is None:
logger.debug("[DEBUG][ACTION][adding_node] Добавление нового узла: %s", node_id)
new_node = ET.SubElement(self.graph_node, "NODE", id=node_id)
self._update_node_attributes(new_node, entity, file_data)
stats["nodes_added"] += 1
else:
if self._is_update_needed(existing_node, entity, file_data):
logger.debug("[DEBUG][ACTION][updating_node] Обновление узла: %s", node_id)
self._update_node_attributes(existing_node, entity, file_data)
stats["nodes_updated"] += 1
logger.info("[INFO][POSTCONDITION][synchronization_complete] Синхронизация завершена. Статистика: %s", stats)
return stats
def _update_node_attributes(self, node: ET.Element, entity: Dict, file_data: Dict):
node.set("type", entity["entity_type"])
node.set("name", entity["entity_name"])
node.set("file_path", file_data["file_path"])
node.set("package", file_data["header"]["package"])
# Очистка и добавление дочерних тегов
for child in list(node):
node.remove(child)
ET.SubElement(node, "SUMMARY").text = entity["summary"]
ET.SubElement(node, "DESCRIPTION").text = entity["description"]
tags_node = ET.SubElement(node, "SEMANTICS_TAGS")
tags_node.text = ", ".join(file_data["header"]["semantics_tags"])
relations_node = ET.SubElement(node, "RELATIONS")
for rel in entity["relations"]:
ET.SubElement(relations_node, "RELATION", type=rel["type"], target_id=rel["target"])
def _is_update_needed(self, node: ET.Element, entity: Dict, file_data: Dict) -> bool:
# Простая проверка по нескольким ключевым полям
if node.get("type") != entity["entity_type"] or node.get("name") != entity["entity_name"]:
return True
summary_node = node.find("SUMMARY")
if summary_node is None or summary_node.text != entity["summary"]:
return True
return False
def write_xml(self):
"""
@summary Записывает измененное XML-дерево обратно в файл.
@sideeffect Перезаписывает файл манифеста на диске.
"""
require(self.tree is not None, "XML-дерево не было инициализировано.")
logger.info("[INFO][ACTION][writing_manifest] Запись изменений в файл манифеста: %s", self.manifest_path)
ET.indent(self.tree, space=" ")
self.tree.write(self.manifest_path, encoding="utf-8", xml_declaration=True)
# [END_ENTITY: Class('ManifestSynchronizer')]
# [ENTITY: Function('require')]
def require(condition: bool, message: str):
"""
@summary Проверяет предусловие и вызывает ValueError, если оно ложно.
@param condition: Условие для проверки.
@param message: Сообщение об ошибке.
@sideeffect Вызывает исключение при ложном условии.
"""
if not condition:
raise ValueError(message)
# [END_ENTITY: Function('require')]
# [ENTITY: Function('main')]
# [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('SemanticProtocol')]
# [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('SemanticParser')]
# [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('ManifestSynchronizer')]
def main():
"""
@summary Главная точка входа в приложение.
@description Управляет жизненным циклом: парсинг аргументов, настройка логирования,
запуск парсинга файлов и синхронизации манифеста.
@sideeffect Читает аргументы командной строки, выводит результат в stdout/stderr.
"""
parser = argparse.ArgumentParser(description="Парсит .kt файлы и синхронизирует манифест проекта.")
parser.add_argument('files', nargs='+', help="Список .kt файлов для обработки.")
parser.add_argument('--protocol', required=True, help="Путь к главному файлу протокола.")
parser.add_argument('--manifest-path', required=True, help="Путь к файлу PROJECT_MANIFEST.xml.")
parser.add_argument('--update-in-place', action='store_true', help="Если указано, перезаписывает файл манифеста.")
parser.add_argument('--log-level', default='INFO', choices=['DEBUG', 'INFO', 'WARN', 'ERROR'], help="Уровень логирования.")
args = parser.parse_args()
logger.setLevel(args.log_level)
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(StructuredFormatter())
logger.addHandler(handler)
logger.info("[INFO][INITIALIZATION][configuring_logger] Логгер настроен. Уровень: %s", args.log_level)
output_report = {
"status": "failure",
"manifest_path": args.manifest_path,
"files_scanned": len(args.files),
"files_with_errors": 0,
"changes": {}
}
try:
protocol = SemanticProtocol(args.protocol)
parser_instance = SemanticParser(protocol)
parsed_results = [parser_instance.parse_file(f) for f in args.files]
output_report["files_with_errors"] = sum(1 for r in parsed_results if r["status"] == "error")
synchronizer = ManifestSynchronizer(args.manifest_path)
change_stats = synchronizer.synchronize(parsed_results)
output_report["changes"] = change_stats
if args.update_in_place:
if sum(change_stats.values()) > 0:
synchronizer.write_xml()
logger.info("[INFO][ACTION][manifest_updated] Манифест был успешно обновлен.")
else:
logger.info("[INFO][ACTION][manifest_not_updated] Изменений не было, манифест не перезаписан.")
output_report["status"] = "success"
except (FileNotFoundError, ValueError, ET.ParseError) as e:
logger.critical("[FATAL][EXECUTION][critical_error] Критическая ошибка: %s", e, exc_info=True)
output_report["error_message"] = str(e)
finally:
print(json.dumps(output_report, indent=2, ensure_ascii=False))
if output_report["status"] == "failure":
sys.exit(1)
# [END_ENTITY: Function('main')]
# [CONTRACT]
if __name__ == "__main__":
logger = logging.getLogger(__name__)
main()
# [END_CONTRACT]
# [END_FILE_extract_semantics.py]