231 lines
9.6 KiB
Python
Executable File
231 lines
9.6 KiB
Python
Executable File
# [DEF:ConfigManagerModule:Module]
#
# @SEMANTICS: config, manager, persistence, json
# @PURPOSE: Manages application configuration, including loading/saving to JSON and CRUD for environments.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> ConfigModels
# @RELATION: CALLS -> logger
# @RELATION: WRITES_TO -> config.json
#
# @INVARIANT: Configuration must always be valid according to AppConfig model.
# @PUBLIC_API: ConfigManager
|
|
# [SECTION: IMPORTS]
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
from .config_models import AppConfig, Environment, GlobalSettings
|
|
from .logger import logger
|
|
# [/SECTION]
|
|
|
|
# [DEF:ConfigManager:Class]
|
|
# @PURPOSE: A class to handle application configuration persistence and management.
|
|
# @RELATION: WRITES_TO -> config.json
|
|
class ConfigManager:
    """Load, hold and persist the application configuration as JSON.

    The manager keeps a single in-memory ``AppConfig`` in ``self.config`` and
    writes it back to ``self.config_path`` after every mutating call
    (settings update, environment add/update/delete).
    """

    # Value of ``Environment.password`` that ``update_environment`` interprets
    # as "keep the password that is already stored".
    _MASKED_PASSWORD = "********"

    # [DEF:__init__:Function]
    # @PURPOSE: Initializes the ConfigManager.
    # @PRE: isinstance(config_path, str) and len(config_path) > 0
    # @POST: self.config is an instance of AppConfig
    # @PARAM: config_path (str) - Path to the configuration file.
    def __init__(self, config_path: str = "config.json"):
        # 1. Runtime check of @PRE
        assert isinstance(config_path, str) and config_path, "config_path must be a non-empty string"

        logger.info(f"[ConfigManager][Entry] Initializing with {config_path}")

        # 2. Logic implementation
        self.config_path = Path(config_path)
        self.config: AppConfig = self._load_config()

        # 3. Runtime check of @POST
        assert isinstance(self.config, AppConfig), "self.config must be an instance of AppConfig"

        logger.info("[ConfigManager][Exit] Initialized")
    # [/DEF:__init__]

    # [DEF:_default_config:Function]
    # @PURPOSE: Builds the configuration used when no valid config file is available.
    # @RETURN: AppConfig - No environments; settings.backup_path is "backups".
    @staticmethod
    def _default_config() -> AppConfig:
        return AppConfig(
            environments=[],
            settings=GlobalSettings(backup_path="backups"),
        )
    # [/DEF:_default_config]

    # [DEF:_load_config:Function]
    # @PURPOSE: Loads the configuration from disk or creates a default one.
    # @POST: isinstance(return, AppConfig)
    # @RETURN: AppConfig - The loaded or default configuration.
    def _load_config(self) -> AppConfig:
        logger.debug(f"[_load_config][Entry] Loading from {self.config_path}")

        if not self.config_path.exists():
            # First run: persist the defaults so the file exists from now on.
            logger.info("[_load_config][Action] Config file not found. Creating default.")
            default_config = self._default_config()
            self._save_config_to_disk(default_config)
            return default_config

        try:
            # JSON is read as UTF-8 explicitly instead of the platform locale encoding.
            with open(self.config_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            config = AppConfig(**data)
            logger.info("[_load_config][Coherence:OK] Configuration loaded")
            return config
        except Exception as e:
            # Best-effort fallback: an unreadable or invalid file yields in-memory
            # defaults. The file itself is not touched here; it is only rewritten
            # by the next call that saves.
            logger.error(f"[_load_config][Coherence:Failed] Error loading config: {e}")
            return self._default_config()
    # [/DEF:_load_config]

    # [DEF:_save_config_to_disk:Function]
    # @PURPOSE: Saves the provided configuration object to disk.
    # @PRE: isinstance(config, AppConfig)
    # @PARAM: config (AppConfig) - The configuration to save.
    # NOTE: I/O and serialization errors are logged, not raised.
    def _save_config_to_disk(self, config: AppConfig):
        logger.debug(f"[_save_config_to_disk][Entry] Saving to {self.config_path}")

        # 1. Runtime check of @PRE
        assert isinstance(config, AppConfig), "config must be an instance of AppConfig"

        # 2. Logic implementation
        try:
            # Serialize BEFORE opening the file: open(..., "w") truncates at once,
            # so a serialization error must not leave an empty or partial file.
            payload = json.dumps(config.dict(), indent=4)
            with open(self.config_path, "w", encoding="utf-8") as f:
                f.write(payload)
            logger.info("[_save_config_to_disk][Action] Configuration saved")
        except Exception as e:
            logger.error(f"[_save_config_to_disk][Coherence:Failed] Failed to save: {e}")
    # [/DEF:_save_config_to_disk]

    # [DEF:save:Function]
    # @PURPOSE: Saves the current configuration state to disk.
    def save(self):
        self._save_config_to_disk(self.config)
    # [/DEF:save]

    # [DEF:get_config:Function]
    # @PURPOSE: Returns the current configuration.
    # @RETURN: AppConfig - The current configuration (the live object, not a copy).
    def get_config(self) -> AppConfig:
        return self.config
    # [/DEF:get_config]

    # [DEF:update_global_settings:Function]
    # @PURPOSE: Updates the global settings and persists the change.
    # @PRE: isinstance(settings, GlobalSettings)
    # @PARAM: settings (GlobalSettings) - The new global settings.
    def update_global_settings(self, settings: GlobalSettings):
        logger.info("[update_global_settings][Entry] Updating settings")

        # 1. Runtime check of @PRE
        assert isinstance(settings, GlobalSettings), "settings must be an instance of GlobalSettings"

        # 2. Logic implementation
        self.config.settings = settings
        self.save()

        logger.info("[update_global_settings][Exit] Settings updated")
    # [/DEF:update_global_settings]

    # [DEF:validate_path:Function]
    # @PURPOSE: Validates that a path is a writable directory, creating it if missing.
    # @PARAM: path (str) - The path to validate.
    # @RETURN: tuple (bool, str) - (is_valid, message)
    def validate_path(self, path: str) -> tuple[bool, str]:
        p = os.path.abspath(path)

        if not os.path.exists(p):
            try:
                os.makedirs(p, exist_ok=True)
            except (OSError, ValueError) as e:
                return False, f"Path does not exist and could not be created: {e}"
        elif not os.path.isdir(p):
            # Missing paths are created as directories above, so an existing
            # regular file at this location cannot be accepted as valid.
            return False, "Path exists but is not a directory"

        if not os.access(p, os.W_OK):
            return False, "Path is not writable"

        return True, "Path is valid and writable"
    # [/DEF:validate_path]

    # [DEF:get_environments:Function]
    # @PURPOSE: Returns the list of configured environments.
    # @RETURN: List[Environment] - List of environments (the live list, not a copy).
    def get_environments(self) -> List[Environment]:
        return self.config.environments
    # [/DEF:get_environments]

    # [DEF:has_environments:Function]
    # @PURPOSE: Checks if at least one environment is configured.
    # @RETURN: bool - True if at least one environment exists.
    def has_environments(self) -> bool:
        return len(self.config.environments) > 0
    # [/DEF:has_environments]

    # [DEF:add_environment:Function]
    # @PURPOSE: Adds a new environment to the configuration, replacing any entry with the same id.
    # @PRE: isinstance(env, Environment)
    # @PARAM: env (Environment) - The environment to add.
    def add_environment(self, env: Environment):
        # 1. Runtime check of @PRE
        # Checked before the entry log: the log line reads env.id, which would
        # otherwise raise AttributeError for a wrong-typed argument.
        assert isinstance(env, Environment), "env must be an instance of Environment"

        logger.info(f"[add_environment][Entry] Adding environment {env.id}")

        # 2. Logic implementation
        # Drop any existing entry with the same ID, then append the new one.
        self.config.environments = [e for e in self.config.environments if e.id != env.id]
        self.config.environments.append(env)
        self.save()

        logger.info("[add_environment][Exit] Environment added")
    # [/DEF:add_environment]

    # [DEF:update_environment:Function]
    # @PURPOSE: Updates an existing environment.
    # @PRE: isinstance(env_id, str) and len(env_id) > 0 and isinstance(updated_env, Environment)
    # @PARAM: env_id (str) - The ID of the environment to update.
    # @PARAM: updated_env (Environment) - The updated environment data.
    # @RETURN: bool - True if updated, False otherwise.
    # NOTE: updated_env.password is overwritten in place when it equals the mask.
    def update_environment(self, env_id: str, updated_env: Environment) -> bool:
        logger.info(f"[update_environment][Entry] Updating {env_id}")

        # 1. Runtime check of @PRE
        assert env_id and isinstance(env_id, str), "env_id must be a non-empty string"
        assert isinstance(updated_env, Environment), "updated_env must be an instance of Environment"

        # 2. Logic implementation
        for i, env in enumerate(self.config.environments):
            if env.id == env_id:
                # If password is masked, keep the old one
                if updated_env.password == self._MASKED_PASSWORD:
                    updated_env.password = env.password

                self.config.environments[i] = updated_env
                self.save()
                logger.info(f"[update_environment][Coherence:OK] Updated {env_id}")
                return True

        logger.warning(f"[update_environment][Coherence:Failed] Environment {env_id} not found")
        return False
    # [/DEF:update_environment]

    # [DEF:delete_environment:Function]
    # @PURPOSE: Deletes an environment by ID.
    # @PRE: isinstance(env_id, str) and len(env_id) > 0
    # @PARAM: env_id (str) - The ID of the environment to delete.
    def delete_environment(self, env_id: str):
        logger.info(f"[delete_environment][Entry] Deleting {env_id}")

        # 1. Runtime check of @PRE
        assert env_id and isinstance(env_id, str), "env_id must be a non-empty string"

        # 2. Logic implementation
        original_count = len(self.config.environments)
        self.config.environments = [e for e in self.config.environments if e.id != env_id]

        # Only touch the disk when something was actually removed.
        if len(self.config.environments) < original_count:
            self.save()
            logger.info(f"[delete_environment][Action] Deleted {env_id}")
        else:
            logger.warning(f"[delete_environment][Coherence:Failed] Environment {env_id} not found")
    # [/DEF:delete_environment]
|
|
|
|
# [/DEF:ConfigManager]
|
|
|
|
# [/DEF:ConfigManagerModule]
|