502 lines
28 KiB
Python
502 lines
28 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
# [PACKAGE] tools.semantic_parser
|
||
# [FILE] extract_semantics.py
|
||
# [SEMANTICS] cli, parser, xml, json, file_io, design_by_contract, structured_logging, protocol_resolver, graphrag, validation, manifest_synchronization
|
||
|
||
# [AI_NOTE]: Этот скрипт является эталонной реализацией всех четырех ключевых принципов
|
||
# семантического обогащения. Он не только проверяет код на соответствие этим правилам,
|
||
# но и сам написан с их неукоснительным соблюдением.
|
||
# Версия 2.0 добавляет функциональность синхронизации манифеста.
|
||
|
||
# [IMPORTS]
|
||
import sys
|
||
import re
|
||
import json
|
||
import argparse
|
||
import os
|
||
import logging
|
||
import xml.etree.ElementTree as ET
|
||
from typing import List, Dict, Any, Optional, Set
|
||
# [END_IMPORTS]
|
||
|
||
|
||
# [ENTITY: Class('StructuredFormatter')]
|
||
# [RELATION: Class('StructuredFormatter')] -> [INHERITS_FROM] -> [Class('logging.Formatter')]
|
||
class StructuredFormatter(logging.Formatter):
|
||
"""
|
||
@summary Форматтер для логов, реализующий стандарт AIFriendlyLogging.
|
||
@invariant Каждый лог, отформатированный этим классом, будет иметь структуру "[LEVEL][ANCHOR][STATE] message".
|
||
@sideeffect Отсутствуют.
|
||
"""
|
||
def format(self, record: logging.LogRecord) -> str:
|
||
assert record.msg is not None, "Сообщение лога не может быть None."
|
||
record.msg = f"[{record.levelname.upper()}]{record.msg}"
|
||
result = super().format(record)
|
||
assert result.startswith(f"[{record.levelname.upper()}]"), "Постусловие нарушено: лог не начинается с уровня."
|
||
return result
|
||
# [END_ENTITY: Class('StructuredFormatter')]
|
||
|
||
|
||
# [ENTITY: Class('SemanticProtocol')]
|
||
# [RELATION: Class('SemanticProtocol')] -> [DEPENDS_ON] -> [Module('xml.etree.ElementTree')]
|
||
class SemanticProtocol:
|
||
"""
|
||
@summary Загружает, разрешает <INCLUDE> и предоставляет доступ к правилам из протокола.
|
||
@description Этот класс действует как 'резолвер протоколов', рекурсивно обрабатывая
|
||
теги <INCLUDE> и объединяя правила из нескольких файлов в единый набор.
|
||
@invariant Экземпляр класса всегда содержит полный, объединенный набор правил.
|
||
@sideeffect Читает несколько файлов с диска при инициализации.
|
||
"""
|
||
def __init__(self, main_protocol_path: str):
|
||
logger.debug("[DEBUG][ENTRYPOINT][initializing_protocol] Инициализация протокола из главного файла: '%s'", main_protocol_path)
|
||
if not os.path.exists(main_protocol_path):
|
||
raise FileNotFoundError(f"Главный файл протокола не найден: {main_protocol_path}")
|
||
|
||
self.processed_paths: Set[str] = set()
|
||
self.all_rule_nodes: List[ET.Element] = []
|
||
self._resolve_and_load(main_protocol_path)
|
||
|
||
self.rules = self._parse_all_rules()
|
||
logger.info("[INFO][ACTION][resolution_complete] Разрешение протокола завершено. Всего загружено правил: %d", len(self.rules))
|
||
|
||
def _resolve_and_load(self, file_path: str):
|
||
abs_path = os.path.abspath(file_path)
|
||
if abs_path in self.processed_paths:
|
||
return
|
||
|
||
logger.info("[INFO][ACTION][resolving_includes] Обработка файла протокола: %s", abs_path)
|
||
self.processed_paths.add(abs_path)
|
||
|
||
try:
|
||
tree = ET.parse(abs_path)
|
||
root = tree.getroot()
|
||
except ET.ParseError as e:
|
||
logger.error("[ERROR][ACTION][parsing_failed] Ошибка парсинга XML в файле %s: %s", abs_path, e)
|
||
return
|
||
|
||
self.all_rule_nodes.extend(root.findall(".//Rule"))
|
||
|
||
base_dir = os.path.dirname(abs_path)
|
||
for include_node in root.findall(".//INCLUDE"):
|
||
relative_path = include_node.get("from")
|
||
if relative_path and relative_path.lower().endswith('.xml'):
|
||
included_path = os.path.join(base_dir, relative_path)
|
||
self._resolve_and_load(included_path)
|
||
|
||
def _parse_all_rules(self) -> Dict[str, Dict[str, Any]]:
|
||
rules_dict = {}
|
||
for rule_node in self.all_rule_nodes:
|
||
rule_id = rule_node.get('id')
|
||
if not rule_id: continue
|
||
definition_node = rule_node.find("Definition")
|
||
rules_dict[rule_id] = self._parse_definition(definition_node)
|
||
return rules_dict
|
||
|
||
def _parse_definition(self, node: Optional[ET.Element]) -> Optional[Dict[str, Any]]:
|
||
if node is None: return None
|
||
def_type = node.get("type")
|
||
if def_type in ("regex", "dynamic_regex", "negative_regex"):
|
||
return {"type": def_type, "pattern": node.findtext("Pattern", "")}
|
||
if def_type == "paired_regex":
|
||
return {"type": def_type, "start": node.findtext("Pattern[@name='start']", ""), "end": node.findtext("Pattern[@name='end']", "")}
|
||
if def_type == "multi_check":
|
||
checks = []
|
||
for check_node in node.findall(".//Check"):
|
||
check_data = check_node.attrib
|
||
check_data['failure_message'] = check_node.findtext("FailureMessage", "")
|
||
if check_data.get('type') == 'block_order':
|
||
check_data['preceding_pattern'] = check_node.findtext("PrecedingBlockPattern", "")
|
||
check_data['following_pattern'] = check_node.findtext("FollowingBlockPattern", "")
|
||
elif check_data.get('type') == 'kdoc_validation':
|
||
check_data['for_function'] = {t.get('name'): t.get('condition') for t in check_node.findall(".//RequiredTagsForFunction/Tag")}
|
||
check_data['for_class'] = {t.get('name'): t.get('condition') for t in check_node.findall(".//RequiredTagsForClass/Tag")}
|
||
elif check_data.get('type') == 'contract_enforcement':
|
||
condition_node = check_node.find("Condition")
|
||
check_data['kdoc_tag'] = condition_node.get('kdoc_tag')
|
||
check_data['code_must_contain'] = condition_node.get('code_must_contain')
|
||
elif check_data.get('type') == 'entity_type_validation':
|
||
check_data['valid_types'] = {t.text for t in check_node.findall(".//ValidEntityTypes/Type")}
|
||
elif check_data.get('type') == 'relation_validation':
|
||
check_data['triplet_pattern'] = check_node.findtext("TripletPattern", "")
|
||
check_data['valid_relations'] = {t.text for t in check_node.findall(".//ValidRelationTypes/Type")}
|
||
else:
|
||
check_data['pattern'] = check_node.findtext("Pattern", "")
|
||
checks.append(check_data)
|
||
return {"type": def_type, "checks": checks}
|
||
return None
|
||
|
||
def get_rule(self, rule_id: str) -> Optional[Dict[str, Any]]:
|
||
return self.rules.get(rule_id)
|
||
# [END_ENTITY: Class('SemanticProtocol')]
|
||
|
||
|
||
# [ENTITY: Class('CodeValidator')]
|
||
# [RELATION: Class('CodeValidator')] -> [DEPENDS_ON] -> [Class('SemanticProtocol')]
|
||
class CodeValidator:
|
||
"""
|
||
@summary Применяет правила из протокола к содержимому файла для поиска ошибок.
|
||
@invariant Всегда работает с валидным и загруженным экземпляром SemanticProtocol.
|
||
"""
|
||
def __init__(self, protocol: SemanticProtocol):
|
||
self.protocol = protocol
|
||
|
||
def validate(self, file_path: str, content: str, entity_blocks: List[str]) -> List[str]:
|
||
errors = []
|
||
rules = self.protocol.rules
|
||
|
||
if "AIFriendlyLogging" in rules:
|
||
errors.extend(self._validate_logging(file_path, content, rules["AIFriendlyLogging"]))
|
||
|
||
if "DesignByContract" in rules or "GraphRAG" in rules:
|
||
for entity_content in entity_blocks:
|
||
if "DesignByContract" in rules:
|
||
errors.extend(self._validate_entity_dbc(entity_content, rules["DesignByContract"]))
|
||
if "GraphRAG" in rules:
|
||
errors.extend(self._validate_entity_graphrag(entity_content, rules["GraphRAG"]))
|
||
|
||
return list(set(errors))
|
||
|
||
def _validate_logging(self, file_path: str, content: str, rule: Dict) -> List[str]:
|
||
errors = []
|
||
if rule.get('type') != 'multi_check': return []
|
||
for check in rule['checks']:
|
||
if check.get('type') == 'negative_regex_in_path' and check.get('path_contains') in file_path and re.search(check['pattern'], content):
|
||
errors.append(check['failure_message'])
|
||
elif check.get('type') == 'negative_regex' and re.search(check['pattern'], content):
|
||
errors.append(check['failure_message'])
|
||
elif check.get('type') == 'positive_regex_on_match':
|
||
for line in content.splitlines():
|
||
if re.search(check['trigger'], line) and not re.search(check['pattern'], line):
|
||
errors.append(f"{check['failure_message']} [Строка: '{line.strip()}']")
|
||
return errors
|
||
|
||
def _validate_entity_dbc(self, entity_content: str, rule: Dict) -> List[str]:
|
||
errors = []
|
||
if rule.get('type') != 'multi_check': return []
|
||
kdoc_match = re.search(r"(\/\*\*[\s\S]*?\*\/)", entity_content)
|
||
kdoc = kdoc_match.group(1) if kdoc_match else ""
|
||
signature_match = re.search(r"\s*(public\s+|private\s+|internal\s+)?(class|interface|fun|object)\s+\w+", entity_content)
|
||
is_public = not (signature_match and signature_match.group(1) and 'private' in signature_match.group(1)) if signature_match else False
|
||
|
||
for check in rule['checks']:
|
||
if not is_public and check.get('type') != 'block_order': continue # Проверки контрактов только для public
|
||
if check.get('type') == 'kdoc_validation':
|
||
is_class = bool(re.search(r"\s*(class|interface|object)", entity_content))
|
||
if is_class:
|
||
for tag, _ in check['for_class'].items():
|
||
if tag not in kdoc: errors.append(f"{check['failure_message']} ({tag})")
|
||
else: # is_function
|
||
has_params = bool(re.search(r"fun\s+\w+\s*\((.|\s)*\S(.|\s)*\)", entity_content))
|
||
returns_value = not bool(re.search(r"fun\s+\w+\(.*\)\s*:\s*Unit", entity_content) or not re.search(r"fun\s+\w+\(.*\)\s*:", entity_content))
|
||
for tag, cond in check['for_function'].items():
|
||
if tag not in kdoc and (not cond or (cond == 'has_parameters' and has_params) or (cond == 'returns_value' and returns_value)):
|
||
errors.append(f"{check['failure_message']} ({tag})")
|
||
elif check.get('type') == 'contract_enforcement' and check['kdoc_tag'] in kdoc and not re.search(check['code_must_contain'], entity_content):
|
||
errors.append(check['failure_message'])
|
||
return errors
|
||
|
||
def _validate_entity_graphrag(self, entity_content: str, rule: Dict) -> List[str]:
|
||
errors = []
|
||
if rule.get('type') != 'multi_check': return []
|
||
markup_block_match = re.search(r"^([\s\S]*?)(\/\*\*|class|interface|fun|object)", entity_content)
|
||
markup_block = markup_block_match.group(1) if markup_block_match else ""
|
||
|
||
for check in rule['checks']:
|
||
if check.get('type') == 'block_order' and "/**" in markup_block:
|
||
errors.append(check['failure_message'])
|
||
elif check.get('type') == 'entity_type_validation':
|
||
entity_match = re.search(r"//\s*\[ENTITY:\s*(?P<type>\w+)\(‘(?P<name>.*?)’\)\]", markup_block)
|
||
if entity_match and entity_match.group('type') not in check['valid_types']:
|
||
errors.append(f"{check['failure_message']} Найдено: ‘{entity_match.group('type')}’.")
|
||
elif check.get('type') == 'relation_validation':
|
||
for line in re.findall(r"//\s*\[RELATION:.*\]", markup_block):
|
||
match = re.match(check['triplet_pattern'], line)
|
||
if not match:
|
||
errors.append(f"{check['failure_message']} (неверный формат). Строка: ‘{line.strip()}’")
|
||
elif match.group('relation_type') not in check['valid_relations']:
|
||
errors.append(f"{check['failure_message']} Найдено: ‘[{match.group('relation_type')}]’.")
|
||
elif check.get('type') == 'markup_cohesion':
|
||
for line in markup_block.strip().split('\n'):
|
||
if line.strip() and not line.strip().startswith('//'):
|
||
errors.append(check['failure_message']); break
|
||
return errors
|
||
# [END_ENTITY: Class('CodeValidator')]
|
||
|
||
|
||
# [ENTITY: Class('SemanticParser')]
|
||
# [RELATION: Class('SemanticParser')] -> [DEPENDS_ON] -> [Class('SemanticProtocol')]
|
||
# [RELATION: Class('SemanticParser')] -> [CREATES_INSTANCE_OF] -> [Class('CodeValidator')]
|
||
class SemanticParser:
|
||
"""
|
||
@summary Оркестрирует процесс валидации и парсинга исходных файлов.
|
||
@invariant Всегда работает с валидным и загруженным экземпляром SemanticProtocol.
|
||
@sideeffect Читает содержимое файлов, переданных для парсинга.
|
||
"""
|
||
def __init__(self, protocol: SemanticProtocol):
|
||
assert isinstance(protocol, SemanticProtocol), "Объект protocol должен быть экземпляром SemanticProtocol."
|
||
self.protocol = protocol
|
||
self.validator = CodeValidator(protocol)
|
||
|
||
def parse_file(self, file_path: str) -> Dict[str, Any]:
|
||
logger.info("[INFO][ENTRYPOINT][parsing_file] Начало парсинга файла: '%s'", file_path)
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
content = f.read()
|
||
except Exception as e:
|
||
return {"file_path": file_path, "status": "error", "error_message": f"Не удалось прочитать файл: {e}"}
|
||
|
||
entity_rule = self.protocol.get_rule("EntityContainerization")
|
||
entity_blocks = re.findall(entity_rule['start'] + r'[\s\S]*?' + entity_rule['end'], content, re.DOTALL) if entity_rule else []
|
||
|
||
validation_errors = self.validator.validate(file_path, content, entity_blocks)
|
||
|
||
header_rule = self.protocol.get_rule("FileHeaderIntegrity")
|
||
if not re.search(header_rule['pattern'], content) if header_rule else None:
|
||
msg = "Нарушение целостности заголовка (правило FileHeaderIntegrity)."
|
||
if msg not in validation_errors: validation_errors.append(msg)
|
||
|
||
if validation_errors:
|
||
logger.warn("[WARN][ACTION][validation_failed] Файл %s не прошел валидацию: %s", file_path, " | ".join(validation_errors))
|
||
return {"file_path": file_path, "status": "error", "error_message": " | ".join(validation_errors)}
|
||
|
||
header_match = re.search(header_rule['pattern'], content)
|
||
header_data = header_match.groupdict()
|
||
file_info = {
|
||
"file_path": file_path, "status": "success",
|
||
"header": {"package": header_data.get('package','').strip(), "file_name": header_data.get('file','').strip(), "semantics_tags": [t.strip() for t in header_data.get('semantics','').split(',')]},
|
||
"entities": self._extract_entities(content)
|
||
}
|
||
|
||
logger.info("[INFO][POSTCONDITION][parsing_complete] Парсинг файла завершен. Найдено сущностей: %d", len(file_info["entities"]))
|
||
return file_info
|
||
|
||
def _extract_entities(self, content: str) -> List[Dict[str, Any]]:
|
||
entity_rule = self.protocol.get_rule("EntityContainerization")
|
||
if not entity_rule: return []
|
||
entities = []
|
||
for match in re.finditer(entity_rule['start'] + r'(?P<body>.*?)' + entity_rule['end'], content, re.DOTALL):
|
||
data = match.groupdict()
|
||
kdoc = self._parse_kdoc(data.get('body', ''))
|
||
e_type, e_name = data.get('type', 'N/A'), data.get('name', 'N/A')
|
||
type_snake = re.sub(r'(?<!^)(?=[A-Z])', '_', e_type).lower()
|
||
name_snake = re.sub(r'[^a-zA-Z0-9_]', '', e_name.replace(' ', '_')).lower()
|
||
entities.append({
|
||
"node_id": f"{type_snake}_{name_snake}", "entity_type": e_type, "entity_name": e_name,
|
||
"summary": kdoc['summary'], "description": kdoc['description'], "relations": kdoc['relations']
|
||
})
|
||
return entities
|
||
|
||
def _parse_kdoc(self, body: str) -> Dict[str, Any]:
|
||
summary_match = re.search(r"@summary\s*(.*)", body)
|
||
summary = summary_match.group(1).strip() if summary_match else ""
|
||
desc_match = re.search(r"@description\s*(.*)", body, re.DOTALL)
|
||
desc = ""
|
||
if desc_match:
|
||
lines = [re.sub(r"^\s*\*\s?", "", l).strip() for l in desc_match.group(1).strip().split('\n')]
|
||
desc = " ".join(lines)
|
||
relations = [m.groupdict() for m in re.finditer(r"[RELATION:\s*(?P<type>\w+)\s*target_id='(?P<target>.*?)']", body)]
|
||
return {"summary": summary, "description": desc, "relations": relations}
|
||
# [END_ENTITY: Class('SemanticParser')]
|
||
|
||
|
||
# [ENTITY: Class('ManifestSynchronizer')]
|
||
# [RELATION: Class('ManifestSynchronizer')] -> [DEPENDS_ON] -> [Module('xml.etree.ElementTree')]
|
||
# [RELATION: Class('ManifestSynchronizer')] -> [MODIFIES_STATE_OF] -> [DataStructure('PROJECT_MANIFEST.xml')]
|
||
class ManifestSynchronizer:
|
||
"""
|
||
@summary Управляет чтением, сравнением и обновлением PROJECT_MANIFEST.xml.
|
||
@invariant Экземпляр класса всегда работает с корректно загруженным XML-деревом.
|
||
@sideeffect Читает и может перезаписывать файл манифеста на диске.
|
||
"""
|
||
def __init__(self, manifest_path: str):
|
||
"""
|
||
@param manifest_path: Путь к файлу PROJECT_MANIFEST.xml.
|
||
@sideeffect Читает и парсит XML-файл. Вызывает исключение, если файл не найден или поврежден.
|
||
"""
|
||
require(os.path.exists(manifest_path), f"Файл манифеста не найден: {manifest_path}")
|
||
logger.info("[INFO][ENTRYPOINT][manifest_loading] Загрузка манифеста: %s", manifest_path)
|
||
self.manifest_path = manifest_path
|
||
try:
|
||
self.tree = ET.parse(manifest_path)
|
||
self.root = self.tree.getroot()
|
||
self.graph_node = self.root.find("PROJECT_GRAPH")
|
||
if self.graph_node is None:
|
||
raise ValueError("В манифесте отсутствует тег <PROJECT_GRAPH>")
|
||
except (ET.ParseError, ValueError) as e:
|
||
logger.error("[ERROR][ACTION][manifest_parsing_failed] Ошибка парсинга манифеста: %s", e)
|
||
raise ValueError(f"Ошибка парсинга манифеста: {e}")
|
||
|
||
def synchronize(self, parsed_code_data: List[Dict[str, Any]]) -> Dict[str, int]:
|
||
"""
|
||
@summary Синхронизирует состояние манифеста с состоянием кодовой базы.
|
||
@param parsed_code_data: Список словарей, представляющих состояние файлов, от SemanticParser.
|
||
@return Словарь со статистикой изменений.
|
||
@sideeffect Модифицирует внутреннее XML-дерево.
|
||
"""
|
||
stats = {"nodes_added": 0, "nodes_updated": 0, "nodes_removed": 0}
|
||
|
||
all_code_node_ids = {
|
||
entity["node_id"]
|
||
for file_data in parsed_code_data if file_data["status"] == "success"
|
||
for entity in file_data["entities"]
|
||
}
|
||
|
||
manifest_nodes_map = {node.get("id"): node for node in self.graph_node.findall("NODE")}
|
||
manifest_node_ids = set(manifest_nodes_map.keys())
|
||
|
||
# Удаление узлов, которых больше нет в коде
|
||
nodes_to_remove = manifest_node_ids - all_code_node_ids
|
||
for node_id in nodes_to_remove:
|
||
logger.debug("[DEBUG][ACTION][removing_node] Удаление устаревшего узла: %s", node_id)
|
||
self.graph_node.remove(manifest_nodes_map[node_id])
|
||
stats["nodes_removed"] += 1
|
||
|
||
# Добавление и обновление узлов
|
||
for file_data in parsed_code_data:
|
||
if file_data["status"] != "success":
|
||
continue
|
||
for entity in file_data["entities"]:
|
||
node_id = entity["node_id"]
|
||
existing_node = manifest_nodes_map.get(node_id)
|
||
|
||
if existing_node is None:
|
||
logger.debug("[DEBUG][ACTION][adding_node] Добавление нового узла: %s", node_id)
|
||
new_node = ET.SubElement(self.graph_node, "NODE", id=node_id)
|
||
self._update_node_attributes(new_node, entity, file_data)
|
||
stats["nodes_added"] += 1
|
||
else:
|
||
if self._is_update_needed(existing_node, entity, file_data):
|
||
logger.debug("[DEBUG][ACTION][updating_node] Обновление узла: %s", node_id)
|
||
self._update_node_attributes(existing_node, entity, file_data)
|
||
stats["nodes_updated"] += 1
|
||
|
||
logger.info("[INFO][POSTCONDITION][synchronization_complete] Синхронизация завершена. Статистика: %s", stats)
|
||
return stats
|
||
|
||
def _update_node_attributes(self, node: ET.Element, entity: Dict, file_data: Dict):
|
||
node.set("type", entity["entity_type"])
|
||
node.set("name", entity["entity_name"])
|
||
node.set("file_path", file_data["file_path"])
|
||
node.set("package", file_data["header"]["package"])
|
||
|
||
# Очистка и добавление дочерних тегов
|
||
for child in list(node):
|
||
node.remove(child)
|
||
|
||
ET.SubElement(node, "SUMMARY").text = entity["summary"]
|
||
ET.SubElement(node, "DESCRIPTION").text = entity["description"]
|
||
tags_node = ET.SubElement(node, "SEMANTICS_TAGS")
|
||
tags_node.text = ", ".join(file_data["header"]["semantics_tags"])
|
||
|
||
relations_node = ET.SubElement(node, "RELATIONS")
|
||
for rel in entity["relations"]:
|
||
ET.SubElement(relations_node, "RELATION", type=rel["type"], target_id=rel["target"])
|
||
|
||
def _is_update_needed(self, node: ET.Element, entity: Dict, file_data: Dict) -> bool:
|
||
# Простая проверка по нескольким ключевым полям
|
||
if node.get("type") != entity["entity_type"] or node.get("name") != entity["entity_name"]:
|
||
return True
|
||
summary_node = node.find("SUMMARY")
|
||
if summary_node is None or summary_node.text != entity["summary"]:
|
||
return True
|
||
return False
|
||
|
||
def write_xml(self):
|
||
"""
|
||
@summary Записывает измененное XML-дерево обратно в файл.
|
||
@sideeffect Перезаписывает файл манифеста на диске.
|
||
"""
|
||
require(self.tree is not None, "XML-дерево не было инициализировано.")
|
||
logger.info("[INFO][ACTION][writing_manifest] Запись изменений в файл манифеста: %s", self.manifest_path)
|
||
ET.indent(self.tree, space=" ")
|
||
self.tree.write(self.manifest_path, encoding="utf-8", xml_declaration=True)
|
||
# [END_ENTITY: Class('ManifestSynchronizer')]
|
||
|
||
|
||
# [ENTITY: Function('require')]
|
||
def require(condition: bool, message: str):
|
||
"""
|
||
@summary Проверяет предусловие и вызывает ValueError, если оно ложно.
|
||
@param condition: Условие для проверки.
|
||
@param message: Сообщение об ошибке.
|
||
@sideeffect Вызывает исключение при ложном условии.
|
||
"""
|
||
if not condition:
|
||
raise ValueError(message)
|
||
# [END_ENTITY: Function('require')]
|
||
|
||
|
||
# [ENTITY: Function('main')]
|
||
# [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('SemanticProtocol')]
|
||
# [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('SemanticParser')]
|
||
# [RELATION: Function('main')] -> [CREATES_INSTANCE_OF] -> [Class('ManifestSynchronizer')]
|
||
def main():
|
||
"""
|
||
@summary Главная точка входа в приложение.
|
||
@description Управляет жизненным циклом: парсинг аргументов, настройка логирования,
|
||
запуск парсинга файлов и синхронизации манифеста.
|
||
@sideeffect Читает аргументы командной строки, выводит результат в stdout/stderr.
|
||
"""
|
||
parser = argparse.ArgumentParser(description="Парсит .kt файлы и синхронизирует манифест проекта.")
|
||
parser.add_argument('files', nargs='+', help="Список .kt файлов для обработки.")
|
||
parser.add_argument('--protocol', required=True, help="Путь к главному файлу протокола.")
|
||
parser.add_argument('--manifest-path', required=True, help="Путь к файлу PROJECT_MANIFEST.xml.")
|
||
parser.add_argument('--update-in-place', action='store_true', help="Если указано, перезаписывает файл манифеста.")
|
||
parser.add_argument('--log-level', default='INFO', choices=['DEBUG', 'INFO', 'WARN', 'ERROR'], help="Уровень логирования.")
|
||
args = parser.parse_args()
|
||
|
||
logger.setLevel(args.log_level)
|
||
handler = logging.StreamHandler(sys.stderr)
|
||
handler.setFormatter(StructuredFormatter())
|
||
logger.addHandler(handler)
|
||
logger.info("[INFO][INITIALIZATION][configuring_logger] Логгер настроен. Уровень: %s", args.log_level)
|
||
|
||
output_report = {
|
||
"status": "failure",
|
||
"manifest_path": args.manifest_path,
|
||
"files_scanned": len(args.files),
|
||
"files_with_errors": 0,
|
||
"changes": {}
|
||
}
|
||
|
||
try:
|
||
protocol = SemanticProtocol(args.protocol)
|
||
parser_instance = SemanticParser(protocol)
|
||
|
||
parsed_results = [parser_instance.parse_file(f) for f in args.files]
|
||
output_report["files_with_errors"] = sum(1 for r in parsed_results if r["status"] == "error")
|
||
|
||
synchronizer = ManifestSynchronizer(args.manifest_path)
|
||
change_stats = synchronizer.synchronize(parsed_results)
|
||
output_report["changes"] = change_stats
|
||
|
||
if args.update_in_place:
|
||
if sum(change_stats.values()) > 0:
|
||
synchronizer.write_xml()
|
||
logger.info("[INFO][ACTION][manifest_updated] Манифест был успешно обновлен.")
|
||
else:
|
||
logger.info("[INFO][ACTION][manifest_not_updated] Изменений не было, манифест не перезаписан.")
|
||
|
||
output_report["status"] = "success"
|
||
|
||
except (FileNotFoundError, ValueError, ET.ParseError) as e:
|
||
logger.critical("[FATAL][EXECUTION][critical_error] Критическая ошибка: %s", e, exc_info=True)
|
||
output_report["error_message"] = str(e)
|
||
|
||
finally:
|
||
print(json.dumps(output_report, indent=2, ensure_ascii=False))
|
||
if output_report["status"] == "failure":
|
||
sys.exit(1)
|
||
|
||
# [END_ENTITY: Function('main')]
|
||
|
||
# [CONTRACT]
|
||
if __name__ == "__main__":
|
||
logger = logging.getLogger(__name__)
|
||
main()
|
||
# [END_CONTRACT]
|
||
|
||
# [END_FILE_extract_semantics.py]
|