# [DEF:SchedulerModule:Module]
# @SEMANTICS: scheduler, apscheduler, cron, backup
# @PURPOSE: Manages scheduled tasks using APScheduler.
# @LAYER: Core
# @RELATION: Uses TaskManager to run scheduled backups.

# [SECTION: IMPORTS]
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from .logger import logger, belief_scope
from .config_manager import ConfigManager
from typing import Optional
import asyncio
# [/SECTION]


# [DEF:SchedulerService:Class]
# @SEMANTICS: scheduler, service, apscheduler
# @PURPOSE: Provides a service to manage scheduled backup tasks.
class SchedulerService:
    # [DEF:SchedulerService.__init__:Function]
    # @PURPOSE: Stores collaborators, creates the background scheduler and captures the event loop.
    # @PARAM: task_manager - Object exposing get_tasks(limit=...) and an awaitable create_task(plugin_id, params).
    # @PARAM: config_manager (ConfigManager) - Source of environment backup schedules.
    def __init__(self, task_manager, config_manager: ConfigManager):
        with belief_scope("SchedulerService.__init__"):
            self.task_manager = task_manager
            self.config_manager = config_manager
            self.scheduler = BackgroundScheduler()
            # The loop is captured once, here; _trigger_backup later submits
            # coroutines to it via run_coroutine_threadsafe.
            # NOTE(review): presumably the service is constructed on the thread
            # that owns the application's event loop - confirm against the caller.
            self.loop = asyncio.get_event_loop()
    # [/DEF:SchedulerService.__init__:Function]

    # [DEF:SchedulerService.start:Function]
    # @PURPOSE: Starts the background scheduler and loads initial schedules.
    def start(self) -> None:
        with belief_scope("SchedulerService.start"):
            # Nothing happens when the scheduler is already running, so
            # repeated calls neither restart it nor reload the schedules.
            if not self.scheduler.running:
                self.scheduler.start()
                logger.info("Scheduler started.")
                self.load_schedules()
    # [/DEF:SchedulerService.start:Function]

    # [DEF:SchedulerService.stop:Function]
    # @PURPOSE: Stops the background scheduler.
    def stop(self) -> None:
        with belief_scope("SchedulerService.stop"):
            if self.scheduler.running:
                self.scheduler.shutdown()
                logger.info("Scheduler stopped.")
    # [/DEF:SchedulerService.stop:Function]

    # [DEF:SchedulerService.load_schedules:Function]
    # @PURPOSE: Loads backup schedules from configuration and registers them.
    def load_schedules(self) -> None:
        with belief_scope("SchedulerService.load_schedules"):
            # Clear existing jobs so the registered set mirrors the current
            # configuration exactly.
            self.scheduler.remove_all_jobs()

            config = self.config_manager.get_config()
            for env in config.environments:
                schedule = env.backup_schedule
                if schedule and schedule.enabled:
                    self.add_backup_job(env.id, schedule.cron_expression)
    # [/DEF:SchedulerService.load_schedules:Function]

    # [DEF:SchedulerService.add_backup_job:Function]
    # @PURPOSE: Adds a scheduled backup job for an environment.
    # @PARAM: env_id (str) - The ID of the environment.
    # @PARAM: cron_expression (str) - The cron expression for the schedule.
    def add_backup_job(self, env_id: str, cron_expression: str) -> None:
        with belief_scope("SchedulerService.add_backup_job", f"env_id={env_id}, cron={cron_expression}"):
            job_id = f"backup_{env_id}"
            try:
                self.scheduler.add_job(
                    self._trigger_backup,
                    CronTrigger.from_crontab(cron_expression),
                    id=job_id,
                    args=[env_id],
                    replace_existing=True,
                )
                logger.info(f"Scheduled backup job added for environment {env_id}: {cron_expression}")
            except Exception as e:
                # Deliberately broad: a failure here is logged and not
                # re-raised, so load_schedules carries on with the remaining
                # environments.
                logger.error(f"Failed to add backup job for environment {env_id}: {e}")
    # [/DEF:SchedulerService.add_backup_job:Function]

    # [DEF:SchedulerService._trigger_backup:Function]
    # @PURPOSE: Triggered by the scheduler to start a backup task.
    # @PARAM: env_id (str) - The ID of the environment.
    def _trigger_backup(self, env_id: str) -> None:
        with belief_scope("SchedulerService._trigger_backup", f"env_id={env_id}"):
            logger.info(f"Triggering scheduled backup for environment {env_id}")

            # Check if a backup is already pending or running for this environment.
            # NOTE(review): only the tasks returned by get_tasks(limit=100) are
            # inspected; whether that window always contains every active task
            # is not visible from this module.
            recent_tasks = self.task_manager.get_tasks(limit=100)
            already_active = any(
                task.plugin_id == "superset-backup"
                and task.status in ("PENDING", "RUNNING")
                and task.params.get("environment_id") == env_id
                for task in recent_tasks
            )
            if already_active:
                logger.warning(f"Backup already running for environment {env_id}. Skipping scheduled run.")
                return

            # create_task is async, so the coroutine is handed to the captured
            # event loop instead of being awaited here.
            future = asyncio.run_coroutine_threadsafe(
                self.task_manager.create_task("superset-backup", {"environment_id": env_id}),
                self.loop,
            )
            # Without a callback the outcome of the coroutine would be dropped
            # and a failing create_task would go unnoticed.
            future.add_done_callback(lambda done: self._log_backup_result(env_id, done))
    # [/DEF:SchedulerService._trigger_backup:Function]

    # [DEF:SchedulerService._log_backup_result:Function]
    # @PURPOSE: Logs cancellation or failure of a scheduled backup task submission.
    # @PARAM: env_id (str) - The ID of the environment the backup was requested for.
    # @PARAM: future - The future returned by asyncio.run_coroutine_threadsafe.
    def _log_backup_result(self, env_id: str, future) -> None:
        with belief_scope("SchedulerService._log_backup_result", f"env_id={env_id}"):
            # cancelled() must be checked first: exception() raises
            # CancelledError on a cancelled future.
            if future.cancelled():
                logger.warning(f"Scheduled backup for environment {env_id} was cancelled before it started.")
                return

            error = future.exception()
            if error is not None:
                logger.error(f"Scheduled backup for environment {env_id} failed to start: {error}")
    # [/DEF:SchedulerService._log_backup_result:Function]
# [/DEF:SchedulerService:Class]
# [/DEF:SchedulerModule:Module]