#!/usr/bin/env python3 # -*- coding: utf-8 -*- # [PACKAGE] tools.semantic_parser # [FILE] extract_semantics.py # [SEMANTICS] cli, parser, xml, json, file_io, design_by_contract, structured_logging, protocol_resolver, graphrag, validation, manifest_synchronization # [AI_NOTE]: Этот скрипт является эталонной реализацией всех четырех ключевых принципов # семантического обогащения. Он не только проверяет код на соответствие этим правилам, # но и сам написан с их неукоснительным соблюдением. # Версия 2.0 добавляет функциональность синхронизации манифеста. # [IMPORTS] import sys import re import json import argparse import os import logging import xml.etree.ElementTree as ET from typing import List, Dict, Any, Optional, Set # [END_IMPORTS] # [ENTITY: Class('StructuredFormatter')] # [RELATION: Class('StructuredFormatter')] -> [INHERITS_FROM] -> [Class('logging.Formatter')] class StructuredFormatter(logging.Formatter): """ @summary Форматтер для логов, реализующий стандарт AIFriendlyLogging. @invariant Каждый лог, отформатированный этим классом, будет иметь структуру "[LEVEL][ANCHOR][STATE] message". @sideeffect Отсутствуют. """ def format(self, record: logging.LogRecord) -> str: assert record.msg is not None, "Сообщение лога не может быть None." record.msg = f"[{record.levelname.upper()}]{record.msg}" result = super().format(record) assert result.startswith(f"[{record.levelname.upper()}]"), "Постусловие нарушено: лог не начинается с уровня." return result # [END_ENTITY: Class('StructuredFormatter')] # [ENTITY: Class('SemanticProtocol')] # [RELATION: Class('SemanticProtocol')] -> [DEPENDS_ON] -> [Module('xml.etree.ElementTree')] class SemanticProtocol: """ @summary Загружает, разрешает и предоставляет доступ к правилам из протокола. @description Этот класс действует как 'резолвер протоколов', рекурсивно обрабатывая теги и объединяя правила из нескольких файлов в единый набор. @invariant Экземпляр класса всегда содержит полный, объединенный набор правил. @sideeffect Читает несколько файлов с диска при инициализации. """ def __init__(self, main_protocol_path: str): logger.debug("[DEBUG][ENTRYPOINT][initializing_protocol] Инициализация протокола из главного файла: '%s'", main_protocol_path) if not os.path.exists(main_protocol_path): raise FileNotFoundError(f"Главный файл протокола не найден: {main_protocol_path}") self.processed_paths: Set[str] = set() self.all_rule_nodes: List[ET.Element] = [] self._resolve_and_load(main_protocol_path) self.rules = self._parse_all_rules() logger.info("[INFO][ACTION][resolution_complete] Разрешение протокола завершено. Всего загружено правил: %d", len(self.rules)) def _resolve_and_load(self, file_path: str): abs_path = os.path.abspath(file_path) if abs_path in self.processed_paths: return logger.info("[INFO][ACTION][resolving_includes] Обработка файла протокола: %s", abs_path) self.processed_paths.add(abs_path) try: tree = ET.parse(abs_path) root = tree.getroot() except ET.ParseError as e: logger.error("[ERROR][ACTION][parsing_failed] Ошибка парсинга XML в файле %s: %s", abs_path, e) return self.all_rule_nodes.extend(root.findall(".//Rule")) base_dir = os.path.dirname(abs_path) for include_node in root.findall(".//INCLUDE"): relative_path = include_node.get("from") if relative_path and relative_path.lower().endswith('.xml'): included_path = os.path.join(base_dir, relative_path) self._resolve_and_load(included_path) def _parse_all_rules(self) -> Dict[str, Dict[str, Any]]: rules_dict = {} for rule_node in self.all_rule_nodes: rule_id = rule_node.get('id') if not rule_id: continue definition_node = rule_node.find("Definition") rules_dict[rule_id] = self._parse_definition(definition_node) return rules_dict def _parse_definition(self, node: Optional[ET.Element]) -> Optional[Dict[str, Any]]: if node is None: return None def_type = node.get("type") if def_type in ("regex", "dynamic_regex", "negative_regex"): return {"type": def_type, "pattern": node.findtext("Pattern", "")} if def_type == "paired_regex": return {"type": def_type, "start": node.findtext("Pattern[@name='start']", ""), "end": node.findtext("Pattern[@name='end']", "")} if def_type == "multi_check": checks = [] for check_node in node.findall(".//Check"): check_data = check_node.attrib check_data['failure_message'] = check_node.findtext("FailureMessage", "") if check_data.get('type') == 'block_order': check_data['preceding_pattern'] = check_node.findtext("PrecedingBlockPattern", "") check_data['following_pattern'] = check_node.findtext("FollowingBlockPattern", "") elif check_data.get('type') == 'kdoc_validation': check_data['for_function'] = {t.get('name'): t.get('condition') for t in check_node.findall(".//RequiredTagsForFunction/Tag")} check_data['for_class'] = {t.get('name'): t.get('condition') for t in check_node.findall(".//RequiredTagsForClass/Tag")} elif check_data.get('type') == 'contract_enforcement': condition_node = check_node.find("Condition") check_data['kdoc_tag'] = condition_node.get('kdoc_tag') check_data['code_must_contain'] = condition_node.get('code_must_contain') elif check_data.get('type') == 'entity_type_validation': check_data['valid_types'] = {t.text for t in check_node.findall(".//ValidEntityTypes/Type")} elif check_data.get('type') == 'relation_validation': check_data['triplet_pattern'] = check_node.findtext("TripletPattern", "") check_data['valid_relations'] = {t.text for t in check_node.findall(".//ValidRelationTypes/Type")} else: check_data['pattern'] = check_node.findtext("Pattern", "") checks.append(check_data) return {"type": def_type, "checks": checks} return None def get_rule(self, rule_id: str) -> Optional[Dict[str, Any]]: return self.rules.get(rule_id) # [END_ENTITY: Class('SemanticProtocol')] # [ENTITY: Class('CodeValidator')] # [RELATION: Class('CodeValidator')] -> [DEPENDS_ON] -> [Class('SemanticProtocol')] class CodeValidator: """ @summary Применяет правила из протокола к содержимому файла для поиска ошибок. @invariant Всегда работает с валидным и загруженным экземпляром SemanticProtocol. """ def __init__(self, protocol: SemanticProtocol): self.protocol = protocol def validate(self, file_path: str, content: str, entity_blocks: List[str]) -> List[str]: errors = [] rules = self.protocol.rules if "AIFriendlyLogging" in rules: errors.extend(self._validate_logging(file_path, content, rules["AIFriendlyLogging"])) if "DesignByContract" in rules or "GraphRAG" in rules: for entity_content in entity_blocks: if "DesignByContract" in rules: errors.extend(self._validate_entity_dbc(entity_content, rules["DesignByContract"])) if "GraphRAG" in rules: errors.extend(self._validate_entity_graphrag(entity_content, rules["GraphRAG"])) return list(set(errors)) def _validate_logging(self, file_path: str, content: str, rule: Dict) -> List[str]: errors = [] if rule.get('type') != 'multi_check': return [] for check in rule['checks']: if check.get('type') == 'negative_regex_in_path' and check.get('path_contains') in file_path and re.search(check['pattern'], content): errors.append(check['failure_message']) elif check.get('type') == 'negative_regex' and re.search(check['pattern'], content): errors.append(check['failure_message']) elif check.get('type') == 'positive_regex_on_match': for line in content.splitlines(): if re.search(check['trigger'], line) and not re.search(check['pattern'], line): errors.append(f"{check['failure_message']} [Строка: '{line.strip()}']") return errors def _validate_entity_dbc(self, entity_content: str, rule: Dict) -> List[str]: errors = [] if rule.get('type') != 'multi_check': return [] kdoc_match = re.search(r"(\/\*\*[\s\S]*?\*\/)", entity_content) kdoc = kdoc_match.group(1) if kdoc_match else "" signature_match = re.search(r"\s*(public\s+|private\s+|internal\s+)?(class|interface|fun|object)\s+\w+", entity_content) is_public = not (signature_match and signature_match.group(1) and 'private' in signature_match.group(1)) if signature_match else False for check in rule['checks']: if not is_public and check.get('type') != 'block_order': continue # Проверки контрактов только для public if check.get('type') == 'kdoc_validation': is_class = bool(re.search(r"\s*(class|interface|object)", entity_content)) if is_class: for tag, _ in check['for_class'].items(): if tag not in kdoc: errors.append(f"{check['failure_message']} ({tag})") else: # is_function has_params = bool(re.search(r"fun\s+\w+\s*\((.|\s)*\S(.|\s)*\)", entity_content)) returns_value = not bool(re.search(r"fun\s+\w+\(.*\)\s*:\s*Unit", entity_content) or not re.search(r"fun\s+\w+\(.*\)\s*:", entity_content)) for tag, cond in check['for_function'].items(): if tag not in kdoc and (not cond or (cond == 'has_parameters' and has_params) or (cond == 'returns_value' and returns_value)): errors.append(f"{check['failure_message']} ({tag})") elif check.get('type') == 'contract_enforcement' and check['kdoc_tag'] in kdoc and not re.search(check['code_must_contain'], entity_content): errors.append(check['failure_message']) return errors def _validate_entity_graphrag(self, entity_content: str, rule: Dict) -> List[str]: errors = [] if rule.get('type') != 'multi_check': return [] markup_block_match = re.search(r"^([\s\S]*?)(\/\*\*|class|interface|fun|object)", entity_content) markup_block = markup_block_match.group(1) if markup_block_match else "" for check in rule['checks']: if check.get('type') == 'block_order' and "/**" in markup_block: errors.append(check['failure_message']) elif check.get('type') == 'entity_type_validation': entity_match = re.search(r"//\s*\[ENTITY:\s*(?P\w+)\(‘(?P.*?)’\)\]", markup_block) if entity_match and entity_match.group('type') not in check['valid_types']: errors.append(f"{check['failure_message']} Найдено: ‘{entity_match.group('type')}’.") elif check.get('type') == 'relation_validation': for line in re.findall(r"//\s*\[RELATION:.*\]", markup_block): match = re.match(check['triplet_pattern'], line) if not match: errors.append(f"{check['failure_message']} (неверный формат). Строка: ‘{line.strip()}’") elif match.group('relation_type') not in check['valid_relations']: errors.append(f"{check['failure_message']} Найдено: ‘[{match.group('relation_type')}]’.") elif check.get('type') == 'markup_cohesion': for line in markup_block.strip().split('\n'): if line.strip() and not line.strip().startswith('//'): errors.append(check['failure_message']); break return errors # [END_ENTITY: Class('CodeValidator')] # [ENTITY: Class('SemanticParser')] # [RELATION: Class('SemanticParser')] -> [DEPENDS_ON] -> [Class('SemanticProtocol')] # [RELATION: Class('SemanticParser')] -> [CREATES_INSTANCE_OF] -> [Class('CodeValidator')] class SemanticParser: """ @summary Оркестрирует процесс валидации и парсинга исходных файлов. @invariant Всегда работает с валидным и загруженным экземпляром SemanticProtocol. @sideeffect Читает содержимое файлов, переданных для парсинга. """ def __init__(self, protocol: SemanticProtocol): assert isinstance(protocol, SemanticProtocol), "Объект protocol должен быть экземпляром SemanticProtocol." self.protocol = protocol self.validator = CodeValidator(protocol) def parse_file(self, file_path: str) -> Dict[str, Any]: logger.info("[INFO][ENTRYPOINT][parsing_file] Начало парсинга файла: '%s'", file_path) try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() except Exception as e: return {"file_path": file_path, "status": "error", "error_message": f"Не удалось прочитать файл: {e}"} entity_rule = self.protocol.get_rule("EntityContainerization") entity_blocks = re.findall(entity_rule['start'] + r'[\s\S]*?' + entity_rule['end'], content, re.DOTALL) if entity_rule else [] validation_errors = self.validator.validate(file_path, content, entity_blocks) header_rule = self.protocol.get_rule("FileHeaderIntegrity") if not re.search(header_rule['pattern'], content) if header_rule else None: msg = "Нарушение целостности заголовка (правило FileHeaderIntegrity)." if msg not in validation_errors: validation_errors.append(msg) if validation_errors: logger.warn("[WARN][ACTION][validation_failed] Файл %s не прошел валидацию: %s", file_path, " | ".join(validation_errors)) return {"file_path": file_path, "status": "error", "error_message": " | ".join(validation_errors)} header_match = re.search(header_rule['pattern'], content) header_data = header_match.groupdict() file_info = { "file_path": file_path, "status": "success", "header": {"package": header_data.get('package','').strip(), "file_name": header_data.get('file','').strip(), "semantics_tags": [t.strip() for t in header_data.get('semantics','').split(',')]}, "entities": self._extract_entities(content) } logger.info("[INFO][POSTCONDITION][parsing_complete] Парсинг файла завершен. Найдено сущностей: %d", len(file_info["entities"])) return file_info def _extract_entities(self, content: str) -> List[Dict[str, Any]]: entity_rule = self.protocol.get_rule("EntityContainerization") if not entity_rule: return [] entities = [] for match in re.finditer(entity_rule['start'] + r'(?P.*?)' + entity_rule['end'], content, re.DOTALL): data = match.groupdict() kdoc = self._parse_kdoc(data.get('body', '')) e_type, e_name = data.get('type', 'N/A'), data.get('name', 'N/A') type_snake = re.sub(r'(? Dict[str, Any]: summary_match = re.search(r"@summary\s*(.*)", body) summary = summary_match.group(1).strip() if summary_match else "" desc_match = re.search(r"@description\s*(.*)", body, re.DOTALL) desc = "" if desc_match: lines = [re.sub(r"^\s*\*\s?", "", l).strip() for l in desc_match.group(1).strip().split('\n')] desc = " ".join(lines) relations = [m.groupdict() for m in re.finditer(r"[RELATION:\s*(?P\w+)\s*target_id='(?P.*?)']", body)] return {"summary": summary, "description": desc, "relations": relations} # [END_ENTITY: Class('SemanticParser')] # [ENTITY: Class('ManifestSynchronizer')] # [RELATION: Class('ManifestSynchronizer')] -> [DEPENDS_ON] -> [Module('xml.etree.ElementTree')] # [RELATION: Class('ManifestSynchronizer')] -> [MODIFIES_STATE_OF] -> [DataStructure('PROJECT_MANIFEST.xml')] class ManifestSynchronizer: """ @summary Управляет чтением, сравнением и обновлением PROJECT_MANIFEST.xml. @invariant Экземпляр класса всегда работает с корректно загруженным XML-деревом. @sideeffect Читает и может перезаписывать файл манифеста на диске. """ def __init__(self, manifest_path: str): """ @param manifest_path: Путь к файлу PROJECT_MANIFEST.xml. @sideeffect Читает и парсит XML-файл. Вызывает исключение, если файл не найден или поврежден. """ require(os.path.exists(manifest_path), f"Файл манифеста не найден: {manifest_path}") logger.info("[INFO][ENTRYPOINT][manifest_loading] Загрузка манифеста: %s", manifest_path) self.manifest_path = manifest_path try: self.tree = ET.parse(manifest_path) self.root = self.tree.getroot() self.graph_node = self.root.find("PROJECT_GRAPH") if self.graph_node is None: raise ValueError("В манифесте отсутствует тег ") except (ET.ParseError, ValueError) as e: logger.error("[ERROR][ACTION][manifest_parsing_failed] Ошибка парсинга манифеста: %s", e) raise ValueError(f"Ошибка парсинга манифеста: {e}") def synchronize(self, parsed_code_data: List[Dict[str, Any]]) -> Dict[str, int]: """ @summary Синхронизирует состояние манифеста с состоянием кодовой базы. @param parsed_code_data: Список словарей, представляющих состояние файлов, от SemanticParser. @return Словарь со статистикой изменений. @sideeffect Модифицирует внутреннее XML-дерево. """ stats = {"nodes_added": 0, "nodes_updated": 0, "nodes_removed": 0} all_code_node_ids = { entity["node_id"] for file_data in parsed_code_data if file_data["status"] == "success" for entity in file_data["entities"] } manifest_nodes_map = {node.get("id"): node for node in self.graph_node.findall("NODE")} manifest_node_ids = set(manifest_nodes_map.keys()) # Удаление узлов, которых больше нет в коде nodes_to_remove = manifest_node_ids - all_code_node_ids for node_id in nodes_to_remove: logger.debug("[DEBUG][ACTION][removing_node] Удаление устаревшего узла: %s", node_id) self.graph_node.remove(manifest_nodes_map[node_id]) stats["nodes_removed"] += 1 # Добавление и обновление узлов for file_data in parsed_code_data: if file_data["status"] != "success": continue for entity in file_data["entities"]: node_id = entity["node_id"] existing_node = manifest_nodes_map.get(node_id) if existing_node is None: logger.debug("[DEBUG][ACTION][adding_node] Добавление нового узла: %s", node_id) new_node = ET.SubElement(self.graph_node, "NODE", id=node_id) self._update_node_attributes(new_node, entity, file_data) stats["nodes_added"] += 1 else: if self._is_update_needed(existing_node, entity, file_data): logger.debug("[DEBUG][ACTION][updating_node] Обновление узла: %s", node_id) self._update_node_attributes(existing_node, entity, file_data) stats["nodes_updated"] += 1 logger.info("[INFO][POSTCONDITION][synchronization_complete] Синхронизация завершена. Статистика: %s", stats) return stats def _update_node_attributes(self, node: ET.Element, entity: Dict, file_data: Dict): node.set("type", entity["entity_type"]) node.set("name", entity["entity_name"]) node.set("file_path", file_data["file_path"]) node.set("package", file_data["header"]["package"]) # Очистка и добавление дочерних тегов for child in list(node): node.remove(child) ET.SubElement(node, "SUMMARY").text = entity["summary"] ET.SubElement(node, "DESCRIPTION").text = entity["description"] tags_node = ET.SubElement(node, "SEMANTICS_TAGS") tags_node.text = ", ".join(file_data["header"]["semantics_tags"]) relations_node = ET.SubElement(node, "RELATIONS") for rel in entity["relations"]: ET.SubElement(relations_node, "RELATION", type=rel["type"], target_id=rel["target"]) def _is_update_needed(self, node: ET.Element, entity: Dict, file_data: Dict) -> bool: # Простая проверка по нескольким ключевым полям if node.get("type") != entity["entity_type"] or node.get("name") != entity["entity_name"]: return True summary_node = node.find("SUMMARY") if summary_node is None or summary_node.text != entity["summary"]: return True return False def write_xml(self): """ @summary Записывает измененное XML-дерево обратно в файл. @sideeffect Перезаписывает файл манифеста на диске. """ require(self.tree is not None, "XML-дерево не было инициализировано.") logger.info("[INFO][ACTION][writing_manifest] Запись изменений в файл манифеста: %s", self.manifest_path) ET.indent(self.tree, space=" ") self.tree.write(self.manifest_path, encoding="utf-8", xml_declaration=True) # [END_ENTITY: Class('ManifestSynchronizer')] # [ENTITY: Function('require')] def require(condition: bool, message: str): """ @summary Проверяет предусловие и вызывает ValueError, если оно ложно. @param condition: Условие для проверки. @param message: Сообщение об ошибке. @sideeffect Вызывает исключение при ложном условии. """ if not condition: raise ValueError(message) # [END_ENTITY: Function('require')] # [ENTITY: Function('main')] # [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('SemanticProtocol')] # [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('SemanticParser')] # [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('ManifestSynchronizer')] def main(): """ @summary Главная точка входа в приложение. @description Управляет жизненным циклом: парсинг аргументов, настройка логирования, запуск парсинга файлов и синхронизации манифеста. @sideeffect Читает аргументы командной строки, выводит результат в stdout/stderr. """ parser = argparse.ArgumentParser(description="Парсит .kt файлы и синхронизирует манифест проекта.") parser.add_argument('files', nargs='+', help="Список .kt файлов для обработки.") parser.add_argument('--protocol', required=True, help="Путь к главному файлу протокола.") parser.add_argument('--manifest-path', required=True, help="Путь к файлу PROJECT_MANIFEST.xml.") parser.add_argument('--update-in-place', action='store_true', help="Если указано, перезаписывает файл манифеста.") parser.add_argument('--log-level', default='INFO', choices=['DEBUG', 'INFO', 'WARN', 'ERROR'], help="Уровень логирования.") args = parser.parse_args() logger.setLevel(args.log_level) handler = logging.StreamHandler(sys.stderr) handler.setFormatter(StructuredFormatter()) logger.addHandler(handler) logger.info("[INFO][INITIALIZATION][configuring_logger] Логгер настроен. Уровень: %s", args.log_level) output_report = { "status": "failure", "manifest_path": args.manifest_path, "files_scanned": len(args.files), "files_with_errors": 0, "changes": {} } try: protocol = SemanticProtocol(args.protocol) parser_instance = SemanticParser(protocol) parsed_results = [parser_instance.parse_file(f) for f in args.files] output_report["files_with_errors"] = sum(1 for r in parsed_results if r["status"] == "error") synchronizer = ManifestSynchronizer(args.manifest_path) change_stats = synchronizer.synchronize(parsed_results) output_report["changes"] = change_stats if args.update_in_place: if sum(change_stats.values()) > 0: synchronizer.write_xml() logger.info("[INFO][ACTION][manifest_updated] Манифест был успешно обновлен.") else: logger.info("[INFO][ACTION][manifest_not_updated] Изменений не было, манифест не перезаписан.") output_report["status"] = "success" except (FileNotFoundError, ValueError, ET.ParseError) as e: logger.critical("[FATAL][EXECUTION][critical_error] Критическая ошибка: %s", e, exc_info=True) output_report["error_message"] = str(e) finally: print(json.dumps(output_report, indent=2, ensure_ascii=False)) if output_report["status"] == "failure": sys.exit(1) # [END_ENTITY: Function('main')] # [CONTRACT] if __name__ == "__main__": logger = logging.getLogger(__name__) main() # [END_CONTRACT] # [END_FILE_extract_semantics.py]