update documentator promt

This commit is contained in:
2025-09-08 16:23:03 +03:00
parent 3b2f9d894e
commit aa69776807
18 changed files with 739 additions and 246 deletions

501
extract_semantics.py Normal file
View File

@@ -0,0 +1,501 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# [PACKAGE] tools.semantic_parser
# [FILE] extract_semantics.py
# [SEMANTICS] cli, parser, xml, json, file_io, design_by_contract, structured_logging, protocol_resolver, graphrag, validation, manifest_synchronization
# [AI_NOTE]: Этот скрипт является эталонной реализацией всех четырех ключевых принципов
# семантического обогащения. Он не только проверяет код на соответствие этим правилам,
# но и сам написан с их неукоснительным соблюдением.
# Версия 2.0 добавляет функциональность синхронизации манифеста.
# [IMPORTS]
import sys
import re
import json
import argparse
import os
import logging
import xml.etree.ElementTree as ET
from typing import List, Dict, Any, Optional, Set
# [END_IMPORTS]
# [ENTITY: Class('StructuredFormatter')]
# [RELATION: Class('StructuredFormatter')] -> [INHERITS_FROM] -> [Class('logging.Formatter')]
class StructuredFormatter(logging.Formatter):
"""
@summary Форматтер для логов, реализующий стандарт AIFriendlyLogging.
@invariant Каждый лог, отформатированный этим классом, будет иметь структуру "[LEVEL][ANCHOR][STATE] message".
@sideeffect Отсутствуют.
"""
def format(self, record: logging.LogRecord) -> str:
assert record.msg is not None, "Сообщение лога не может быть None."
record.msg = f"[{record.levelname.upper()}]{record.msg}"
result = super().format(record)
assert result.startswith(f"[{record.levelname.upper()}]"), "Постусловие нарушено: лог не начинается с уровня."
return result
# [END_ENTITY: Class('StructuredFormatter')]
# [ENTITY: Class('SemanticProtocol')]
# [RELATION: Class('SemanticProtocol')] -> [DEPENDS_ON] -> [Module('xml.etree.ElementTree')]
class SemanticProtocol:
"""
@summary Загружает, разрешает <INCLUDE> и предоставляет доступ к правилам из протокола.
@description Этот класс действует как 'резолвер протоколов', рекурсивно обрабатывая
теги <INCLUDE> и объединяя правила из нескольких файлов в единый набор.
@invariant Экземпляр класса всегда содержит полный, объединенный набор правил.
@sideeffect Читает несколько файлов с диска при инициализации.
"""
def __init__(self, main_protocol_path: str):
logger.debug("[DEBUG][ENTRYPOINT][initializing_protocol] Инициализация протокола из главного файла: '%s'", main_protocol_path)
if not os.path.exists(main_protocol_path):
raise FileNotFoundError(f"Главный файл протокола не найден: {main_protocol_path}")
self.processed_paths: Set[str] = set()
self.all_rule_nodes: List[ET.Element] = []
self._resolve_and_load(main_protocol_path)
self.rules = self._parse_all_rules()
logger.info("[INFO][ACTION][resolution_complete] Разрешение протокола завершено. Всего загружено правил: %d", len(self.rules))
def _resolve_and_load(self, file_path: str):
abs_path = os.path.abspath(file_path)
if abs_path in self.processed_paths:
return
logger.info("[INFO][ACTION][resolving_includes] Обработка файла протокола: %s", abs_path)
self.processed_paths.add(abs_path)
try:
tree = ET.parse(abs_path)
root = tree.getroot()
except ET.ParseError as e:
logger.error("[ERROR][ACTION][parsing_failed] Ошибка парсинга XML в файле %s: %s", abs_path, e)
return
self.all_rule_nodes.extend(root.findall(".//Rule"))
base_dir = os.path.dirname(abs_path)
for include_node in root.findall(".//INCLUDE"):
relative_path = include_node.get("from")
if relative_path and relative_path.lower().endswith('.xml'):
included_path = os.path.join(base_dir, relative_path)
self._resolve_and_load(included_path)
def _parse_all_rules(self) -> Dict[str, Dict[str, Any]]:
rules_dict = {}
for rule_node in self.all_rule_nodes:
rule_id = rule_node.get('id')
if not rule_id: continue
definition_node = rule_node.find("Definition")
rules_dict[rule_id] = self._parse_definition(definition_node)
return rules_dict
def _parse_definition(self, node: Optional[ET.Element]) -> Optional[Dict[str, Any]]:
if node is None: return None
def_type = node.get("type")
if def_type in ("regex", "dynamic_regex", "negative_regex"):
return {"type": def_type, "pattern": node.findtext("Pattern", "")}
if def_type == "paired_regex":
return {"type": def_type, "start": node.findtext("Pattern[@name='start']", ""), "end": node.findtext("Pattern[@name='end']", "")}
if def_type == "multi_check":
checks = []
for check_node in node.findall(".//Check"):
check_data = check_node.attrib
check_data['failure_message'] = check_node.findtext("FailureMessage", "")
if check_data.get('type') == 'block_order':
check_data['preceding_pattern'] = check_node.findtext("PrecedingBlockPattern", "")
check_data['following_pattern'] = check_node.findtext("FollowingBlockPattern", "")
elif check_data.get('type') == 'kdoc_validation':
check_data['for_function'] = {t.get('name'): t.get('condition') for t in check_node.findall(".//RequiredTagsForFunction/Tag")}
check_data['for_class'] = {t.get('name'): t.get('condition') for t in check_node.findall(".//RequiredTagsForClass/Tag")}
elif check_data.get('type') == 'contract_enforcement':
condition_node = check_node.find("Condition")
check_data['kdoc_tag'] = condition_node.get('kdoc_tag')
check_data['code_must_contain'] = condition_node.get('code_must_contain')
elif check_data.get('type') == 'entity_type_validation':
check_data['valid_types'] = {t.text for t in check_node.findall(".//ValidEntityTypes/Type")}
elif check_data.get('type') == 'relation_validation':
check_data['triplet_pattern'] = check_node.findtext("TripletPattern", "")
check_data['valid_relations'] = {t.text for t in check_node.findall(".//ValidRelationTypes/Type")}
else:
check_data['pattern'] = check_node.findtext("Pattern", "")
checks.append(check_data)
return {"type": def_type, "checks": checks}
return None
def get_rule(self, rule_id: str) -> Optional[Dict[str, Any]]:
return self.rules.get(rule_id)
# [END_ENTITY: Class('SemanticProtocol')]
# [ENTITY: Class('CodeValidator')]
# [RELATION: Class('CodeValidator')] -> [DEPENDS_ON] -> [Class('SemanticProtocol')]
class CodeValidator:
"""
@summary Применяет правила из протокола к содержимому файла для поиска ошибок.
@invariant Всегда работает с валидным и загруженным экземпляром SemanticProtocol.
"""
def __init__(self, protocol: SemanticProtocol):
self.protocol = protocol
def validate(self, file_path: str, content: str, entity_blocks: List[str]) -> List[str]:
errors = []
rules = self.protocol.rules
if "AIFriendlyLogging" in rules:
errors.extend(self._validate_logging(file_path, content, rules["AIFriendlyLogging"]))
if "DesignByContract" in rules or "GraphRAG" in rules:
for entity_content in entity_blocks:
if "DesignByContract" in rules:
errors.extend(self._validate_entity_dbc(entity_content, rules["DesignByContract"]))
if "GraphRAG" in rules:
errors.extend(self._validate_entity_graphrag(entity_content, rules["GraphRAG"]))
return list(set(errors))
def _validate_logging(self, file_path: str, content: str, rule: Dict) -> List[str]:
errors = []
if rule.get('type') != 'multi_check': return []
for check in rule['checks']:
if check.get('type') == 'negative_regex_in_path' and check.get('path_contains') in file_path and re.search(check['pattern'], content):
errors.append(check['failure_message'])
elif check.get('type') == 'negative_regex' and re.search(check['pattern'], content):
errors.append(check['failure_message'])
elif check.get('type') == 'positive_regex_on_match':
for line in content.splitlines():
if re.search(check['trigger'], line) and not re.search(check['pattern'], line):
errors.append(f"{check['failure_message']} [Строка: '{line.strip()}']")
return errors
def _validate_entity_dbc(self, entity_content: str, rule: Dict) -> List[str]:
errors = []
if rule.get('type') != 'multi_check': return []
kdoc_match = re.search(r"(\/\*\*[\s\S]*?\*\/)", entity_content)
kdoc = kdoc_match.group(1) if kdoc_match else ""
signature_match = re.search(r"\s*(public\s+|private\s+|internal\s+)?(class|interface|fun|object)\s+\w+", entity_content)
is_public = not (signature_match and signature_match.group(1) and 'private' in signature_match.group(1)) if signature_match else False
for check in rule['checks']:
if not is_public and check.get('type') != 'block_order': continue # Проверки контрактов только для public
if check.get('type') == 'kdoc_validation':
is_class = bool(re.search(r"\s*(class|interface|object)", entity_content))
if is_class:
for tag, _ in check['for_class'].items():
if tag not in kdoc: errors.append(f"{check['failure_message']} ({tag})")
else: # is_function
has_params = bool(re.search(r"fun\s+\w+\s*\((.|\s)*\S(.|\s)*\)", entity_content))
returns_value = not bool(re.search(r"fun\s+\w+\(.*\)\s*:\s*Unit", entity_content) or not re.search(r"fun\s+\w+\(.*\)\s*:", entity_content))
for tag, cond in check['for_function'].items():
if tag not in kdoc and (not cond or (cond == 'has_parameters' and has_params) or (cond == 'returns_value' and returns_value)):
errors.append(f"{check['failure_message']} ({tag})")
elif check.get('type') == 'contract_enforcement' and check['kdoc_tag'] in kdoc and not re.search(check['code_must_contain'], entity_content):
errors.append(check['failure_message'])
return errors
def _validate_entity_graphrag(self, entity_content: str, rule: Dict) -> List[str]:
errors = []
if rule.get('type') != 'multi_check': return []
markup_block_match = re.search(r"^([\s\S]*?)(\/\*\*|class|interface|fun|object)", entity_content)
markup_block = markup_block_match.group(1) if markup_block_match else ""
for check in rule['checks']:
if check.get('type') == 'block_order' and "/**" in markup_block:
errors.append(check['failure_message'])
elif check.get('type') == 'entity_type_validation':
entity_match = re.search(r"//\s*\[ENTITY:\s*(?P<type>\w+)\((?P<name>.*?)\)\]", markup_block)
if entity_match and entity_match.group('type') not in check['valid_types']:
errors.append(f"{check['failure_message']} Найдено: {entity_match.group('type')}.")
elif check.get('type') == 'relation_validation':
for line in re.findall(r"//\s*\[RELATION:.*\]", markup_block):
match = re.match(check['triplet_pattern'], line)
if not match:
errors.append(f"{check['failure_message']} (неверный формат). Строка: {line.strip()}")
elif match.group('relation_type') not in check['valid_relations']:
errors.append(f"{check['failure_message']} Найдено: [{match.group('relation_type')}].")
elif check.get('type') == 'markup_cohesion':
for line in markup_block.strip().split('\n'):
if line.strip() and not line.strip().startswith('//'):
errors.append(check['failure_message']); break
return errors
# [END_ENTITY: Class('CodeValidator')]
# [ENTITY: Class('SemanticParser')]
# [RELATION: Class('SemanticParser')] -> [DEPENDS_ON] -> [Class('SemanticProtocol')]
# [RELATION: Class('SemanticParser')] -> [CREATES_INSTANCE_OF] -> [Class('CodeValidator')]
class SemanticParser:
"""
@summary Оркестрирует процесс валидации и парсинга исходных файлов.
@invariant Всегда работает с валидным и загруженным экземпляром SemanticProtocol.
@sideeffect Читает содержимое файлов, переданных для парсинга.
"""
def __init__(self, protocol: SemanticProtocol):
assert isinstance(protocol, SemanticProtocol), "Объект protocol должен быть экземпляром SemanticProtocol."
self.protocol = protocol
self.validator = CodeValidator(protocol)
def parse_file(self, file_path: str) -> Dict[str, Any]:
logger.info("[INFO][ENTRYPOINT][parsing_file] Начало парсинга файла: '%s'", file_path)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
return {"file_path": file_path, "status": "error", "error_message": f"Не удалось прочитать файл: {e}"}
entity_rule = self.protocol.get_rule("EntityContainerization")
entity_blocks = re.findall(entity_rule['start'] + r'[\s\S]*?' + entity_rule['end'], content, re.DOTALL) if entity_rule else []
validation_errors = self.validator.validate(file_path, content, entity_blocks)
header_rule = self.protocol.get_rule("FileHeaderIntegrity")
if not re.search(header_rule['pattern'], content) if header_rule else None:
msg = "Нарушение целостности заголовка (правило FileHeaderIntegrity)."
if msg not in validation_errors: validation_errors.append(msg)
if validation_errors:
logger.warn("[WARN][ACTION][validation_failed] Файл %s не прошел валидацию: %s", file_path, " | ".join(validation_errors))
return {"file_path": file_path, "status": "error", "error_message": " | ".join(validation_errors)}
header_match = re.search(header_rule['pattern'], content)
header_data = header_match.groupdict()
file_info = {
"file_path": file_path, "status": "success",
"header": {"package": header_data.get('package','').strip(), "file_name": header_data.get('file','').strip(), "semantics_tags": [t.strip() for t in header_data.get('semantics','').split(',')]},
"entities": self._extract_entities(content)
}
logger.info("[INFO][POSTCONDITION][parsing_complete] Парсинг файла завершен. Найдено сущностей: %d", len(file_info["entities"]))
return file_info
def _extract_entities(self, content: str) -> List[Dict[str, Any]]:
entity_rule = self.protocol.get_rule("EntityContainerization")
if not entity_rule: return []
entities = []
for match in re.finditer(entity_rule['start'] + r'(?P<body>.*?)' + entity_rule['end'], content, re.DOTALL):
data = match.groupdict()
kdoc = self._parse_kdoc(data.get('body', ''))
e_type, e_name = data.get('type', 'N/A'), data.get('name', 'N/A')
type_snake = re.sub(r'(?<!^)(?=[A-Z])', '_', e_type).lower()
name_snake = re.sub(r'[^a-zA-Z0-9_]', '', e_name.replace(' ', '_')).lower()
entities.append({
"node_id": f"{type_snake}_{name_snake}", "entity_type": e_type, "entity_name": e_name,
"summary": kdoc['summary'], "description": kdoc['description'], "relations": kdoc['relations']
})
return entities
def _parse_kdoc(self, body: str) -> Dict[str, Any]:
summary_match = re.search(r"@summary\s*(.*)", body)
summary = summary_match.group(1).strip() if summary_match else ""
desc_match = re.search(r"@description\s*(.*)", body, re.DOTALL)
desc = ""
if desc_match:
lines = [re.sub(r"^\s*\*\s?", "", l).strip() for l in desc_match.group(1).strip().split('\n')]
desc = " ".join(lines)
relations = [m.groupdict() for m in re.finditer(r"[RELATION:\s*(?P<type>\w+)\s*target_id='(?P<target>.*?)']", body)]
return {"summary": summary, "description": desc, "relations": relations}
# [END_ENTITY: Class('SemanticParser')]
# [ENTITY: Class('ManifestSynchronizer')]
# [RELATION: Class('ManifestSynchronizer')] -> [DEPENDS_ON] -> [Module('xml.etree.ElementTree')]
# [RELATION: Class('ManifestSynchronizer')] -> [MODIFIES_STATE_OF] -> [DataStructure('PROJECT_MANIFEST.xml')]
class ManifestSynchronizer:
"""
@summary Управляет чтением, сравнением и обновлением PROJECT_MANIFEST.xml.
@invariant Экземпляр класса всегда работает с корректно загруженным XML-деревом.
@sideeffect Читает и может перезаписывать файл манифеста на диске.
"""
def __init__(self, manifest_path: str):
"""
@param manifest_path: Путь к файлу PROJECT_MANIFEST.xml.
@sideeffect Читает и парсит XML-файл. Вызывает исключение, если файл не найден или поврежден.
"""
require(os.path.exists(manifest_path), f"Файл манифеста не найден: {manifest_path}")
logger.info("[INFO][ENTRYPOINT][manifest_loading] Загрузка манифеста: %s", manifest_path)
self.manifest_path = manifest_path
try:
self.tree = ET.parse(manifest_path)
self.root = self.tree.getroot()
self.graph_node = self.root.find("PROJECT_GRAPH")
if self.graph_node is None:
raise ValueError("В манифесте отсутствует тег <PROJECT_GRAPH>")
except (ET.ParseError, ValueError) as e:
logger.error("[ERROR][ACTION][manifest_parsing_failed] Ошибка парсинга манифеста: %s", e)
raise ValueError(f"Ошибка парсинга манифеста: {e}")
def synchronize(self, parsed_code_data: List[Dict[str, Any]]) -> Dict[str, int]:
"""
@summary Синхронизирует состояние манифеста с состоянием кодовой базы.
@param parsed_code_data: Список словарей, представляющих состояние файлов, от SemanticParser.
@return Словарь со статистикой изменений.
@sideeffect Модифицирует внутреннее XML-дерево.
"""
stats = {"nodes_added": 0, "nodes_updated": 0, "nodes_removed": 0}
all_code_node_ids = {
entity["node_id"]
for file_data in parsed_code_data if file_data["status"] == "success"
for entity in file_data["entities"]
}
manifest_nodes_map = {node.get("id"): node for node in self.graph_node.findall("NODE")}
manifest_node_ids = set(manifest_nodes_map.keys())
# Удаление узлов, которых больше нет в коде
nodes_to_remove = manifest_node_ids - all_code_node_ids
for node_id in nodes_to_remove:
logger.debug("[DEBUG][ACTION][removing_node] Удаление устаревшего узла: %s", node_id)
self.graph_node.remove(manifest_nodes_map[node_id])
stats["nodes_removed"] += 1
# Добавление и обновление узлов
for file_data in parsed_code_data:
if file_data["status"] != "success":
continue
for entity in file_data["entities"]:
node_id = entity["node_id"]
existing_node = manifest_nodes_map.get(node_id)
if existing_node is None:
logger.debug("[DEBUG][ACTION][adding_node] Добавление нового узла: %s", node_id)
new_node = ET.SubElement(self.graph_node, "NODE", id=node_id)
self._update_node_attributes(new_node, entity, file_data)
stats["nodes_added"] += 1
else:
if self._is_update_needed(existing_node, entity, file_data):
logger.debug("[DEBUG][ACTION][updating_node] Обновление узла: %s", node_id)
self._update_node_attributes(existing_node, entity, file_data)
stats["nodes_updated"] += 1
logger.info("[INFO][POSTCONDITION][synchronization_complete] Синхронизация завершена. Статистика: %s", stats)
return stats
def _update_node_attributes(self, node: ET.Element, entity: Dict, file_data: Dict):
node.set("type", entity["entity_type"])
node.set("name", entity["entity_name"])
node.set("file_path", file_data["file_path"])
node.set("package", file_data["header"]["package"])
# Очистка и добавление дочерних тегов
for child in list(node):
node.remove(child)
ET.SubElement(node, "SUMMARY").text = entity["summary"]
ET.SubElement(node, "DESCRIPTION").text = entity["description"]
tags_node = ET.SubElement(node, "SEMANTICS_TAGS")
tags_node.text = ", ".join(file_data["header"]["semantics_tags"])
relations_node = ET.SubElement(node, "RELATIONS")
for rel in entity["relations"]:
ET.SubElement(relations_node, "RELATION", type=rel["type"], target_id=rel["target"])
def _is_update_needed(self, node: ET.Element, entity: Dict, file_data: Dict) -> bool:
# Простая проверка по нескольким ключевым полям
if node.get("type") != entity["entity_type"] or node.get("name") != entity["entity_name"]:
return True
summary_node = node.find("SUMMARY")
if summary_node is None or summary_node.text != entity["summary"]:
return True
return False
def write_xml(self):
"""
@summary Записывает измененное XML-дерево обратно в файл.
@sideeffect Перезаписывает файл манифеста на диске.
"""
require(self.tree is not None, "XML-дерево не было инициализировано.")
logger.info("[INFO][ACTION][writing_manifest] Запись изменений в файл манифеста: %s", self.manifest_path)
ET.indent(self.tree, space=" ")
self.tree.write(self.manifest_path, encoding="utf-8", xml_declaration=True)
# [END_ENTITY: Class('ManifestSynchronizer')]
# [ENTITY: Function('require')]
def require(condition: bool, message: str):
"""
@summary Проверяет предусловие и вызывает ValueError, если оно ложно.
@param condition: Условие для проверки.
@param message: Сообщение об ошибке.
@sideeffect Вызывает исключение при ложном условии.
"""
if not condition:
raise ValueError(message)
# [END_ENTITY: Function('require')]
# [ENTITY: Function('main')]
# [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('SemanticProtocol')]
# [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('SemanticParser')]
# [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('ManifestSynchronizer')]
def main():
"""
@summary Главная точка входа в приложение.
@description Управляет жизненным циклом: парсинг аргументов, настройка логирования,
запуск парсинга файлов и синхронизации манифеста.
@sideeffect Читает аргументы командной строки, выводит результат в stdout/stderr.
"""
parser = argparse.ArgumentParser(description="Парсит .kt файлы и синхронизирует манифест проекта.")
parser.add_argument('files', nargs='+', help="Список .kt файлов для обработки.")
parser.add_argument('--protocol', required=True, help="Путь к главному файлу протокола.")
parser.add_argument('--manifest-path', required=True, help="Путь к файлу PROJECT_MANIFEST.xml.")
parser.add_argument('--update-in-place', action='store_true', help="Если указано, перезаписывает файл манифеста.")
parser.add_argument('--log-level', default='INFO', choices=['DEBUG', 'INFO', 'WARN', 'ERROR'], help="Уровень логирования.")
args = parser.parse_args()
logger.setLevel(args.log_level)
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(StructuredFormatter())
logger.addHandler(handler)
logger.info("[INFO][INITIALIZATION][configuring_logger] Логгер настроен. Уровень: %s", args.log_level)
output_report = {
"status": "failure",
"manifest_path": args.manifest_path,
"files_scanned": len(args.files),
"files_with_errors": 0,
"changes": {}
}
try:
protocol = SemanticProtocol(args.protocol)
parser_instance = SemanticParser(protocol)
parsed_results = [parser_instance.parse_file(f) for f in args.files]
output_report["files_with_errors"] = sum(1 for r in parsed_results if r["status"] == "error")
synchronizer = ManifestSynchronizer(args.manifest_path)
change_stats = synchronizer.synchronize(parsed_results)
output_report["changes"] = change_stats
if args.update_in_place:
if sum(change_stats.values()) > 0:
synchronizer.write_xml()
logger.info("[INFO][ACTION][manifest_updated] Манифест был успешно обновлен.")
else:
logger.info("[INFO][ACTION][manifest_not_updated] Изменений не было, манифест не перезаписан.")
output_report["status"] = "success"
except (FileNotFoundError, ValueError, ET.ParseError) as e:
logger.critical("[FATAL][EXECUTION][critical_error] Критическая ошибка: %s", e, exc_info=True)
output_report["error_message"] = str(e)
finally:
print(json.dumps(output_report, indent=2, ensure_ascii=False))
if output_report["status"] == "failure":
sys.exit(1)
# [END_ENTITY: Function('main')]
# [CONTRACT]
if __name__ == "__main__":
logger = logging.getLogger(__name__)
main()
# [END_CONTRACT]
# [END_FILE_extract_semantics.py]