diff --git a/backend/mappings.db b/backend/mappings.db index 99c019c..f255c8d 100644 Binary files a/backend/mappings.db and b/backend/mappings.db differ diff --git a/backend/requirements.txt b/backend/requirements.txt index a7451db..4934448 100755 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -1,14 +1,43 @@ -fastapi -uvicorn -pydantic -authlib -python-multipart -starlette -jsonschema -requests -keyring -httpx -PyYAML -websockets -rapidfuzz -sqlalchemy \ No newline at end of file +annotated-doc==0.0.4 +annotated-types==0.7.0 +anyio==4.12.0 +APScheduler==3.11.2 +attrs==25.4.0 +Authlib==1.6.6 +certifi==2025.11.12 +cffi==2.0.0 +charset-normalizer==3.4.4 +click==8.3.1 +cryptography==46.0.3 +fastapi==0.126.0 +greenlet==3.3.0 +h11==0.16.0 +httpcore==1.0.9 +httpx==0.28.1 +idna==3.11 +jaraco.classes==3.4.0 +jaraco.context==6.0.1 +jaraco.functools==4.3.0 +jeepney==0.9.0 +jsonschema==4.25.1 +jsonschema-specifications==2025.9.1 +keyring==25.7.0 +more-itertools==10.8.0 +pycparser==2.23 +pydantic==2.12.5 +pydantic_core==2.41.5 +python-multipart==0.0.21 +PyYAML==6.0.3 +RapidFuzz==3.14.3 +referencing==0.37.0 +requests==2.32.5 +rpds-py==0.30.0 +SecretStorage==3.5.0 +SQLAlchemy==2.0.45 +starlette==0.50.0 +typing-inspection==0.4.2 +typing_extensions==4.15.0 +tzlocal==5.3.1 +urllib3==2.6.2 +uvicorn==0.38.0 +websockets==15.0.1 diff --git a/backend/src/api/routes/environments.py b/backend/src/api/routes/environments.py index dde6686..4119e0d 100644 --- a/backend/src/api/routes/environments.py +++ b/backend/src/api/routes/environments.py @@ -11,19 +11,27 @@ # [SECTION: IMPORTS] from fastapi import APIRouter, Depends, HTTPException from typing import List, Dict, Optional -from backend.src.dependencies import get_config_manager +from backend.src.dependencies import get_config_manager, get_scheduler_service from backend.src.core.superset_client import SupersetClient from superset_tool.models import SupersetConfig -from pydantic import BaseModel +from pydantic import BaseModel, Field +from backend.src.core.config_models import Environment as EnvModel # [/SECTION] -router = APIRouter(prefix="/api/environments", tags=["environments"]) +router = APIRouter() + +# [DEF:ScheduleSchema:DataClass] +class ScheduleSchema(BaseModel): + enabled: bool = False + cron_expression: str = Field(..., pattern=r'^(@(annually|yearly|monthly|weekly|daily|hourly|reboot))|((((\d+,)*\d+|(\d+(\/|-)\d+)|\d+|\*) ?){5,7})$') +# [/DEF:ScheduleSchema] # [DEF:EnvironmentResponse:DataClass] class EnvironmentResponse(BaseModel): id: str name: str url: str + backup_schedule: Optional[ScheduleSchema] = None # [/DEF:EnvironmentResponse] # [DEF:DatabaseResponse:DataClass] @@ -42,9 +50,47 @@ async def get_environments(config_manager=Depends(get_config_manager)): # Ensure envs is a list if not isinstance(envs, list): envs = [] - return [EnvironmentResponse(id=e.id, name=e.name, url=e.url) for e in envs] + return [ + EnvironmentResponse( + id=e.id, + name=e.name, + url=e.url, + backup_schedule=ScheduleSchema( + enabled=e.backup_schedule.enabled, + cron_expression=e.backup_schedule.cron_expression + ) if e.backup_schedule else None + ) for e in envs + ] # [/DEF:get_environments] +# [DEF:update_environment_schedule:Function] +# @PURPOSE: Update backup schedule for an environment. +# @PARAM: id (str) - The environment ID. +# @PARAM: schedule (ScheduleSchema) - The new schedule. 
+@router.put("/{id}/schedule") +async def update_environment_schedule( + id: str, + schedule: ScheduleSchema, + config_manager=Depends(get_config_manager), + scheduler_service=Depends(get_scheduler_service) +): + envs = config_manager.get_environments() + env = next((e for e in envs if e.id == id), None) + if not env: + raise HTTPException(status_code=404, detail="Environment not found") + + # Update environment config + env.backup_schedule.enabled = schedule.enabled + env.backup_schedule.cron_expression = schedule.cron_expression + + config_manager.update_environment(id, env) + + # Refresh scheduler + scheduler_service.load_schedules() + + return {"message": "Schedule updated successfully"} +# [/DEF:update_environment_schedule] + # [DEF:get_environment_databases:Function] # @PURPOSE: Fetch the list of databases from a specific environment. # @PARAM: id (str) - The environment ID. diff --git a/backend/src/app.py b/backend/src/app.py index 1a0933d..daef363 100755 --- a/backend/src/app.py +++ b/backend/src/app.py @@ -18,14 +18,11 @@ from fastapi.responses import FileResponse import asyncio import os -from .dependencies import get_task_manager +from .dependencies import get_task_manager, get_scheduler_service from .core.logger import logger from .api.routes import plugins, tasks, settings, environments, mappings, migration from .core.database import init_db -# Initialize database -init_db() - # [DEF:App:Global] # @SEMANTICS: app, fastapi, instance # @PURPOSE: The global FastAPI application instance. @@ -35,6 +32,18 @@ app = FastAPI( version="1.0.0", ) +# Startup event +@app.on_event("startup") +async def startup_event(): + scheduler = get_scheduler_service() + scheduler.start() + +# Shutdown event +@app.on_event("shutdown") +async def shutdown_event(): + scheduler = get_scheduler_service() + scheduler.stop() + # Configure CORS app.add_middleware( CORSMiddleware, @@ -56,7 +65,7 @@ async def log_requests(request: Request, call_next): app.include_router(plugins.router, prefix="/api/plugins", tags=["Plugins"]) app.include_router(tasks.router, prefix="/api/tasks", tags=["Tasks"]) app.include_router(settings.router, prefix="/api/settings", tags=["Settings"]) -app.include_router(environments.router) +app.include_router(environments.router, prefix="/api/environments", tags=["Environments"]) app.include_router(mappings.router) app.include_router(migration.router) diff --git a/backend/src/core/config_models.py b/backend/src/core/config_models.py index 3994bda..5864e62 100755 --- a/backend/src/core/config_models.py +++ b/backend/src/core/config_models.py @@ -8,6 +8,13 @@ from pydantic import BaseModel, Field from typing import List, Optional +# [DEF:Schedule:DataClass] +# @PURPOSE: Represents a backup schedule configuration. +class Schedule(BaseModel): + enabled: bool = False + cron_expression: str = "0 0 * * *" # Default: daily at midnight +# [/DEF:Schedule] + # [DEF:Environment:DataClass] # @PURPOSE: Represents a Superset environment configuration. 
class Environment(BaseModel): @@ -17,6 +24,7 @@ class Environment(BaseModel): username: str password: str # Will be masked in UI is_default: bool = False + backup_schedule: Schedule = Field(default_factory=Schedule) # [/DEF:Environment] # [DEF:LoggingConfig:DataClass] diff --git a/backend/src/core/database.py b/backend/src/core/database.py index 96917c0..bbf9c0e 100644 --- a/backend/src/core/database.py +++ b/backend/src/core/database.py @@ -12,6 +12,8 @@ from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session from backend.src.models.mapping import Base +# Import TaskRecord to ensure it's registered with Base +from backend.src.models.task import TaskRecord import os # [/SECTION] @@ -19,18 +21,31 @@ import os DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./mappings.db") # [/DEF:DATABASE_URL] +# [DEF:TASKS_DATABASE_URL:Constant] +TASKS_DATABASE_URL = os.getenv("TASKS_DATABASE_URL", "sqlite:///./tasks.db") +# [/DEF:TASKS_DATABASE_URL] + # [DEF:engine:Variable] engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) # [/DEF:engine] +# [DEF:tasks_engine:Variable] +tasks_engine = create_engine(TASKS_DATABASE_URL, connect_args={"check_same_thread": False}) +# [/DEF:tasks_engine] + # [DEF:SessionLocal:Class] SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # [/DEF:SessionLocal] +# [DEF:TasksSessionLocal:Class] +TasksSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=tasks_engine) +# [/DEF:TasksSessionLocal] + # [DEF:init_db:Function] # @PURPOSE: Initializes the database by creating all tables. def init_db(): Base.metadata.create_all(bind=engine) + Base.metadata.create_all(bind=tasks_engine) # [/DEF:init_db] # [DEF:get_db:Function] @@ -45,4 +60,16 @@ def get_db(): db.close() # [/DEF:get_db] +# [DEF:get_tasks_db:Function] +# @PURPOSE: Dependency for getting a tasks database session. +# @POST: Session is closed after use. +# @RETURN: Generator[Session, None, None] +def get_tasks_db(): + db = TasksSessionLocal() + try: + yield db + finally: + db.close() +# [/DEF:get_tasks_db] + # [/DEF:backend.src.core.database] diff --git a/backend/src/core/scheduler.py b/backend/src/core/scheduler.py new file mode 100644 index 0000000..e9032f6 --- /dev/null +++ b/backend/src/core/scheduler.py @@ -0,0 +1,99 @@ +# [DEF:SchedulerModule:Module] +# @SEMANTICS: scheduler, apscheduler, cron, backup +# @PURPOSE: Manages scheduled tasks using APScheduler. +# @LAYER: Core +# @RELATION: Uses TaskManager to run scheduled backups. + +# [SECTION: IMPORTS] +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +from .logger import logger, belief_scope +from .config_manager import ConfigManager +from typing import Optional +import asyncio +# [/SECTION] + +# [DEF:SchedulerService:Class] +# @SEMANTICS: scheduler, service, apscheduler +# @PURPOSE: Provides a service to manage scheduled backup tasks. +class SchedulerService: + def __init__(self, task_manager, config_manager: ConfigManager): + with belief_scope("SchedulerService.__init__"): + self.task_manager = task_manager + self.config_manager = config_manager + self.scheduler = BackgroundScheduler() + self.loop = asyncio.get_event_loop() + + # [DEF:SchedulerService.start:Function] + # @PURPOSE: Starts the background scheduler and loads initial schedules. 
+ def start(self): + with belief_scope("SchedulerService.start"): + if not self.scheduler.running: + self.scheduler.start() + logger.info("Scheduler started.") + self.load_schedules() + + # [DEF:SchedulerService.stop:Function] + # @PURPOSE: Stops the background scheduler. + def stop(self): + with belief_scope("SchedulerService.stop"): + if self.scheduler.running: + self.scheduler.shutdown() + logger.info("Scheduler stopped.") + + # [DEF:SchedulerService.load_schedules:Function] + # @PURPOSE: Loads backup schedules from configuration and registers them. + def load_schedules(self): + with belief_scope("SchedulerService.load_schedules"): + # Clear existing jobs + self.scheduler.remove_all_jobs() + + config = self.config_manager.get_config() + for env in config.environments: + if env.backup_schedule and env.backup_schedule.enabled: + self.add_backup_job(env.id, env.backup_schedule.cron_expression) + + # [DEF:SchedulerService.add_backup_job:Function] + # @PURPOSE: Adds a scheduled backup job for an environment. + # @PARAM: env_id (str) - The ID of the environment. + # @PARAM: cron_expression (str) - The cron expression for the schedule. + def add_backup_job(self, env_id: str, cron_expression: str): + with belief_scope("SchedulerService.add_backup_job", f"env_id={env_id}, cron={cron_expression}"): + job_id = f"backup_{env_id}" + try: + self.scheduler.add_job( + self._trigger_backup, + CronTrigger.from_crontab(cron_expression), + id=job_id, + args=[env_id], + replace_existing=True + ) + logger.info(f"Scheduled backup job added for environment {env_id}: {cron_expression}") + except Exception as e: + logger.error(f"Failed to add backup job for environment {env_id}: {e}") + + # [DEF:SchedulerService._trigger_backup:Function] + # @PURPOSE: Triggered by the scheduler to start a backup task. + # @PARAM: env_id (str) - The ID of the environment. + def _trigger_backup(self, env_id: str): + with belief_scope("SchedulerService._trigger_backup", f"env_id={env_id}"): + logger.info(f"Triggering scheduled backup for environment {env_id}") + + # Check if a backup is already running for this environment + active_tasks = self.task_manager.get_tasks(limit=100) + for task in active_tasks: + if (task.plugin_id == "superset-backup" and + task.status in ["PENDING", "RUNNING"] and + task.params.get("environment_id") == env_id): + logger.warning(f"Backup already running for environment {env_id}. Skipping scheduled run.") + return + + # Run the backup task + # We need to run this in the event loop since create_task is async + asyncio.run_coroutine_threadsafe( + self.task_manager.create_task("superset-backup", {"environment_id": env_id}), + self.loop + ) + +# [/DEF:SchedulerService:Class] +# [/DEF:SchedulerModule:Module] \ No newline at end of file diff --git a/backend/src/core/task_manager/cleanup.py b/backend/src/core/task_manager/cleanup.py new file mode 100644 index 0000000..0a4cb3e --- /dev/null +++ b/backend/src/core/task_manager/cleanup.py @@ -0,0 +1,38 @@ +# [DEF:TaskCleanupModule:Module] +# @SEMANTICS: task, cleanup, retention +# @PURPOSE: Implements task cleanup and retention policies. +# @LAYER: Core +# @RELATION: Uses TaskPersistenceService to delete old tasks. + +from datetime import datetime, timedelta +from .persistence import TaskPersistenceService +from ..logger import logger, belief_scope +from ..config_manager import ConfigManager + +# [DEF:TaskCleanupService:Class] +# @PURPOSE: Provides methods to clean up old task records. 
+class TaskCleanupService: + def __init__(self, persistence_service: TaskPersistenceService, config_manager: ConfigManager): + self.persistence_service = persistence_service + self.config_manager = config_manager + + # [DEF:TaskCleanupService.run_cleanup:Function] + # @PURPOSE: Deletes tasks older than the configured retention period. + def run_cleanup(self): + with belief_scope("TaskCleanupService.run_cleanup"): + settings = self.config_manager.get_config().settings + retention_days = settings.task_retention_days + + # This is a simplified implementation. + # In a real scenario, we would query IDs of tasks older than retention_days. + # For now, we'll log the action. + logger.info(f"Cleaning up tasks older than {retention_days} days.") + + # Re-loading tasks to check for limit + tasks = self.persistence_service.load_tasks(limit=1000) + if len(tasks) > settings.task_retention_limit: + to_delete = [t.id for t in tasks[settings.task_retention_limit:]] + self.persistence_service.delete_tasks(to_delete) + logger.info(f"Deleted {len(to_delete)} tasks exceeding limit of {settings.task_retention_limit}") + +# [/DEF:TaskCleanupService] \ No newline at end of file diff --git a/backend/src/core/task_manager/manager.py b/backend/src/core/task_manager/manager.py index b0a9c69..839e07a 100644 --- a/backend/src/core/task_manager/manager.py +++ b/backend/src/core/task_manager/manager.py @@ -71,6 +71,7 @@ class TaskManager: task = Task(plugin_id=plugin_id, params=params, user_id=user_id) self.tasks[task.id] = task + self.persistence_service.persist_task(task) logger.info(f"Task {task.id} created and scheduled for execution") self.loop.create_task(self._run_task(task.id)) # Schedule task for execution return task @@ -89,6 +90,7 @@ class TaskManager: logger.info(f"Starting execution of task {task_id} for plugin '{plugin.name}'") task.status = TaskStatus.RUNNING task.started_at = datetime.utcnow() + self.persistence_service.persist_task(task) self._add_log(task_id, "INFO", f"Task started for plugin '{plugin.name}'") try: @@ -113,6 +115,7 @@ class TaskManager: self._add_log(task_id, "ERROR", f"Task failed: {e}", {"error_type": type(e).__name__}) finally: task.finished_at = datetime.utcnow() + self.persistence_service.persist_task(task) logger.info(f"Task {task_id} execution finished with status: {task.status}") # [/DEF:TaskManager._run_task:Function] @@ -132,6 +135,7 @@ class TaskManager: # Update task params with resolution task.params.update(resolution_params) task.status = TaskStatus.RUNNING + self.persistence_service.persist_task(task) self._add_log(task_id, "INFO", "Task resumed after mapping resolution.") # Signal the future to continue @@ -150,6 +154,7 @@ class TaskManager: if not task: return task.status = TaskStatus.AWAITING_MAPPING + self.persistence_service.persist_task(task) self.task_futures[task_id] = self.loop.create_future() try: @@ -235,6 +240,7 @@ class TaskManager: log_entry = LogEntry(level=level, message=message, context=context) task.logs.append(log_entry) + self.persistence_service.persist_task(task) # Notify subscribers if task_id in self.subscribers: @@ -266,16 +272,10 @@ class TaskManager: del self.subscribers[task_id] # [/DEF:TaskManager.unsubscribe_logs:Function] - # [DEF:TaskManager.persist_awaiting_input_tasks:Function] - # @PURPOSE: Persist tasks in AWAITING_INPUT state using persistence service. 
- def persist_awaiting_input_tasks(self) -> None: - self.persistence_service.persist_tasks(list(self.tasks.values())) - # [/DEF:TaskManager.persist_awaiting_input_tasks:Function] - # [DEF:TaskManager.load_persisted_tasks:Function] # @PURPOSE: Load persisted tasks using persistence service. def load_persisted_tasks(self) -> None: - loaded_tasks = self.persistence_service.load_tasks() + loaded_tasks = self.persistence_service.load_tasks(limit=100) for task in loaded_tasks: if task.id not in self.tasks: self.tasks[task.id] = task @@ -299,9 +299,8 @@ class TaskManager: task.status = TaskStatus.AWAITING_INPUT task.input_required = True task.input_request = input_request + self.persistence_service.persist_task(task) self._add_log(task_id, "INFO", "Task paused for user input", {"input_request": input_request}) - - self.persist_awaiting_input_tasks() # [/DEF:TaskManager.await_input:Function] # [DEF:TaskManager.resume_task_with_password:Function] @@ -326,13 +325,11 @@ class TaskManager: task.input_required = False task.input_request = None task.status = TaskStatus.RUNNING + self.persistence_service.persist_task(task) self._add_log(task_id, "INFO", "Task resumed with passwords", {"databases": list(passwords.keys())}) if task_id in self.task_futures: self.task_futures[task_id].set_result(True) - - # Remove from persistence as it's no longer awaiting input - self.persistence_service.delete_tasks([task_id]) # [/DEF:TaskManager.resume_task_with_password:Function] # [DEF:TaskManager.clear_tasks:Function] diff --git a/backend/src/core/task_manager/persistence.py b/backend/src/core/task_manager/persistence.py index 38d3abe..8abbf21 100644 --- a/backend/src/core/task_manager/persistence.py +++ b/backend/src/core/task_manager/persistence.py @@ -1,141 +1,122 @@ # [DEF:TaskPersistenceModule:Module] -# @SEMANTICS: persistence, sqlite, task, storage -# @PURPOSE: Handles the persistence of tasks, specifically those awaiting user input, to a SQLite database. +# @SEMANTICS: persistence, sqlite, sqlalchemy, task, storage +# @PURPOSE: Handles the persistence of tasks using SQLAlchemy and the tasks.db database. # @LAYER: Core # @RELATION: Used by TaskManager to save and load tasks. -# @INVARIANT: Database schema must match the Task model structure. -# @CONSTRAINT: Uses synchronous SQLite operations (blocking), should be used carefully. +# @INVARIANT: Database schema must match the TaskRecord model structure. # [SECTION: IMPORTS] -import sqlite3 -import json from datetime import datetime -from pathlib import Path -from typing import Dict, List, Optional, Any +from typing import List, Optional, Dict, Any +import json -from .models import Task, TaskStatus +from sqlalchemy.orm import Session +from backend.src.models.task import TaskRecord +from backend.src.core.database import TasksSessionLocal +from .models import Task, TaskStatus, LogEntry from ..logger import logger, belief_scope # [/SECTION] # [DEF:TaskPersistenceService:Class] -# @SEMANTICS: persistence, service, database -# @PURPOSE: Provides methods to save and load tasks from a local SQLite database. +# @SEMANTICS: persistence, service, database, sqlalchemy +# @PURPOSE: Provides methods to save and load tasks from the tasks.db database using SQLAlchemy. 
class TaskPersistenceService: - def __init__(self, db_path: Optional[Path] = None): - if db_path is None: - self.db_path = Path(__file__).parent.parent.parent.parent / "migrations.db" - else: - self.db_path = db_path - self._ensure_db_exists() + def __init__(self): + # We use TasksSessionLocal from database.py + pass - # [DEF:TaskPersistenceService._ensure_db_exists:Function] - # @PURPOSE: Ensures the database directory and table exist. - # @PRE: None. - # @POST: Database file and table are created if they didn't exist. - def _ensure_db_exists(self) -> None: - with belief_scope("TaskPersistenceService._ensure_db_exists"): - self.db_path.parent.mkdir(parents=True, exist_ok=True) - - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - - cursor.execute(""" - CREATE TABLE IF NOT EXISTS persistent_tasks ( - id TEXT PRIMARY KEY, - plugin_id TEXT NOT NULL, - status TEXT NOT NULL, - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL, - input_request JSON, - context JSON - ) - """) - conn.commit() - conn.close() - # [/DEF:TaskPersistenceService._ensure_db_exists:Function] + # [DEF:TaskPersistenceService.persist_task:Function] + # @PURPOSE: Persists or updates a single task in the database. + # @PARAM: task (Task) - The task object to persist. + def persist_task(self, task: Task) -> None: + with belief_scope("TaskPersistenceService.persist_task", f"task_id={task.id}"): + session: Session = TasksSessionLocal() + try: + record = session.query(TaskRecord).filter(TaskRecord.id == task.id).first() + if not record: + record = TaskRecord(id=task.id) + session.add(record) + + record.type = task.plugin_id + record.status = task.status.value + record.environment_id = task.params.get("environment_id") or task.params.get("source_env_id") + record.started_at = task.started_at + record.finished_at = task.finished_at + record.params = task.params + + # Store logs as JSON, converting datetime to string + record.logs = [] + for log in task.logs: + log_dict = log.dict() + if isinstance(log_dict.get('timestamp'), datetime): + log_dict['timestamp'] = log_dict['timestamp'].isoformat() + record.logs.append(log_dict) + + # Extract error if failed + if task.status == TaskStatus.FAILED: + for log in reversed(task.logs): + if log.level == "ERROR": + record.error = log.message + break + + session.commit() + except Exception as e: + session.rollback() + logger.error(f"Failed to persist task {task.id}: {e}") + finally: + session.close() + # [/DEF:TaskPersistenceService.persist_task:Function] # [DEF:TaskPersistenceService.persist_tasks:Function] - # @PURPOSE: Persists a list of tasks to the database. - # @PRE: Tasks list contains valid Task objects. - # @POST: Tasks matching the criteria (AWAITING_INPUT) are saved/updated in the DB. - # @PARAM: tasks (List[Task]) - The list of tasks to check and persist. + # @PURPOSE: Persists multiple tasks. + # @PARAM: tasks (List[Task]) - The list of tasks to persist. def persist_tasks(self, tasks: List[Task]) -> None: - with belief_scope("TaskPersistenceService.persist_tasks"): - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - - count = 0 - for task in tasks: - if task.status == TaskStatus.AWAITING_INPUT: - cursor.execute(""" - INSERT OR REPLACE INTO persistent_tasks - (id, plugin_id, status, created_at, updated_at, input_request, context) - VALUES (?, ?, ?, ?, ?, ?, ?) 
- """, ( - task.id, - task.plugin_id, - task.status.value, - task.started_at.isoformat() if task.started_at else datetime.utcnow().isoformat(), - datetime.utcnow().isoformat(), - json.dumps(task.input_request) if task.input_request else None, - json.dumps(task.params) - )) - count += 1 - - conn.commit() - conn.close() - logger.info(f"Persisted {count} tasks awaiting input.") + for task in tasks: + self.persist_task(task) # [/DEF:TaskPersistenceService.persist_tasks:Function] # [DEF:TaskPersistenceService.load_tasks:Function] - # @PURPOSE: Loads persisted tasks from the database. - # @PRE: Database exists. - # @POST: Returns a list of Task objects reconstructed from the DB. + # @PURPOSE: Loads tasks from the database. + # @PARAM: limit (int) - Max tasks to load. + # @PARAM: status (Optional[TaskStatus]) - Filter by status. # @RETURN: List[Task] - The loaded tasks. - def load_tasks(self) -> List[Task]: + def load_tasks(self, limit: int = 100, status: Optional[TaskStatus] = None) -> List[Task]: with belief_scope("TaskPersistenceService.load_tasks"): - if not self.db_path.exists(): - return [] - - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - - # Check if plugin_id column exists (migration for existing db) - cursor.execute("PRAGMA table_info(persistent_tasks)") - columns = [info[1] for info in cursor.fetchall()] - has_plugin_id = "plugin_id" in columns + session: Session = TasksSessionLocal() + try: + query = session.query(TaskRecord) + if status: + query = query.filter(TaskRecord.status == status.value) + + records = query.order_by(TaskRecord.created_at.desc()).limit(limit).all() + + loaded_tasks = [] + for record in records: + try: + logs = [] + if record.logs: + for log_data in record.logs: + # Handle timestamp conversion if it's a string + if isinstance(log_data.get('timestamp'), str): + log_data['timestamp'] = datetime.fromisoformat(log_data['timestamp']) + logs.append(LogEntry(**log_data)) - if has_plugin_id: - cursor.execute("SELECT id, plugin_id, status, created_at, input_request, context FROM persistent_tasks") - else: - cursor.execute("SELECT id, status, created_at, input_request, context FROM persistent_tasks") - - rows = cursor.fetchall() - - loaded_tasks = [] - for row in rows: - if has_plugin_id: - task_id, plugin_id, status, created_at, input_request_json, context_json = row - else: - task_id, status, created_at, input_request_json, context_json = row - plugin_id = "superset-migration" # Default fallback - - try: - task = Task( - id=task_id, - plugin_id=plugin_id, - status=TaskStatus(status), - started_at=datetime.fromisoformat(created_at), - input_required=True, - input_request=json.loads(input_request_json) if input_request_json else None, - params=json.loads(context_json) if context_json else {} - ) - loaded_tasks.append(task) - except Exception as e: - logger.error(f"Failed to load task {task_id}: {e}") - - conn.close() - return loaded_tasks + task = Task( + id=record.id, + plugin_id=record.type, + status=TaskStatus(record.status), + started_at=record.started_at, + finished_at=record.finished_at, + params=record.params or {}, + logs=logs + ) + loaded_tasks.append(task) + except Exception as e: + logger.error(f"Failed to reconstruct task {record.id}: {e}") + + return loaded_tasks + finally: + session.close() # [/DEF:TaskPersistenceService.load_tasks:Function] # [DEF:TaskPersistenceService.delete_tasks:Function] @@ -145,14 +126,16 @@ class TaskPersistenceService: if not task_ids: return with belief_scope("TaskPersistenceService.delete_tasks"): - conn = 
sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - placeholders = ', '.join('?' for _ in task_ids) - cursor.execute(f"DELETE FROM persistent_tasks WHERE id IN ({placeholders})", task_ids) - conn.commit() - conn.close() + session: Session = TasksSessionLocal() + try: + session.query(TaskRecord).filter(TaskRecord.id.in_(task_ids)).delete(synchronize_session=False) + session.commit() + except Exception as e: + session.rollback() + logger.error(f"Failed to delete tasks: {e}") + finally: + session.close() # [/DEF:TaskPersistenceService.delete_tasks:Function] # [/DEF:TaskPersistenceService:Class] - # [/DEF:TaskPersistenceModule:Module] \ No newline at end of file diff --git a/backend/src/dependencies.py b/backend/src/dependencies.py index c60b557..1902865 100755 --- a/backend/src/dependencies.py +++ b/backend/src/dependencies.py @@ -8,6 +8,8 @@ from pathlib import Path from .core.plugin_loader import PluginLoader from .core.task_manager import TaskManager from .core.config_manager import ConfigManager +from .core.scheduler import SchedulerService +from .core.database import init_db # Initialize singletons # Use absolute path relative to this file to ensure plugins are found regardless of CWD @@ -15,6 +17,9 @@ project_root = Path(__file__).parent.parent.parent config_path = project_root / "config.json" config_manager = ConfigManager(config_path=str(config_path)) +# Initialize database before any other services that might use it +init_db() + def get_config_manager() -> ConfigManager: """Dependency injector for the ConfigManager.""" return config_manager @@ -28,6 +33,9 @@ logger.info(f"Available plugins: {[config.name for config in plugin_loader.get_a task_manager = TaskManager(plugin_loader) logger.info("TaskManager initialized") +scheduler_service = SchedulerService(task_manager, config_manager) +logger.info("SchedulerService initialized") + def get_plugin_loader() -> PluginLoader: """Dependency injector for the PluginLoader.""" return plugin_loader @@ -35,4 +43,8 @@ def get_plugin_loader() -> PluginLoader: def get_task_manager() -> TaskManager: """Dependency injector for the TaskManager.""" return task_manager + +def get_scheduler_service() -> SchedulerService: + """Dependency injector for the SchedulerService.""" + return scheduler_service # [/DEF] \ No newline at end of file diff --git a/backend/src/models/task.py b/backend/src/models/task.py new file mode 100644 index 0000000..144e176 --- /dev/null +++ b/backend/src/models/task.py @@ -0,0 +1,34 @@ +# [DEF:backend.src.models.task:Module] +# +# @SEMANTICS: database, task, record, sqlalchemy, sqlite +# @PURPOSE: Defines the database schema for task execution records. +# @LAYER: Domain +# @RELATION: DEPENDS_ON -> sqlalchemy +# +# @INVARIANT: All primary keys are UUID strings. + +# [SECTION: IMPORTS] +from sqlalchemy import Column, String, DateTime, JSON, ForeignKey +from sqlalchemy.sql import func +from .mapping import Base +import uuid +# [/SECTION] + +# [DEF:TaskRecord:Class] +# @PURPOSE: Represents a persistent record of a task execution. 
+class TaskRecord(Base): + __tablename__ = "task_records" + + id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + type = Column(String, nullable=False) # e.g., "backup", "migration" + status = Column(String, nullable=False) # Enum: "PENDING", "RUNNING", "SUCCESS", "FAILED" + environment_id = Column(String, ForeignKey("environments.id"), nullable=True) + started_at = Column(DateTime(timezone=True), nullable=True) + finished_at = Column(DateTime(timezone=True), nullable=True) + logs = Column(JSON, nullable=True) # Store structured logs as JSON + error = Column(String, nullable=True) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + params = Column(JSON, nullable=True) +# [/DEF:TaskRecord] + +# [/DEF:backend.src.models.task] \ No newline at end of file diff --git a/backend/src/plugins/backup.py b/backend/src/plugins/backup.py index 60c9fe1..f938d07 100755 --- a/backend/src/plugins/backup.py +++ b/backend/src/plugins/backup.py @@ -71,8 +71,21 @@ class BackupPlugin(PluginBase): } async def execute(self, params: Dict[str, Any]): - env = params["env"] - backup_path = Path(params["backup_path"]) + config_manager = get_config_manager() + env_id = params.get("environment_id") + + # Resolve environment name if environment_id is provided + if env_id: + env_config = next((e for e in config_manager.get_environments() if e.id == env_id), None) + if env_config: + params["env"] = env_config.name + + env = params.get("env") + if not env: + raise KeyError("env") + + backup_path_str = params.get("backup_path") or config_manager.get_config().settings.backup_path + backup_path = Path(backup_path_str) logger = SupersetLogger(log_dir=backup_path / "Logs", console=True) logger.info(f"[BackupPlugin][Entry] Starting backup for {env}.") diff --git a/backend/tasks.db b/backend/tasks.db new file mode 100644 index 0000000..dd40c97 Binary files /dev/null and b/backend/tasks.db differ diff --git a/frontend/package-lock.json b/frontend/package-lock.json index c033340..1967d7e 100755 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -7,6 +7,9 @@ "": { "name": "frontend", "version": "0.0.0", + "dependencies": { + "date-fns": "^4.1.0" + }, "devDependencies": { "@sveltejs/adapter-static": "^3.0.10", "@sveltejs/kit": "^2.49.2", @@ -1279,6 +1282,16 @@ "node": ">=4" } }, + "node_modules/date-fns": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/date-fns/-/date-fns-4.1.0.tgz", + "integrity": "sha512-Ukq0owbQXxa/U3EGtsdVBkR1w7KOQ5gIBqdH2hkvknzZPYvBxb/aa6E8L7tmjFtkwZBu3UXBbjIgPo/Ez4xaNg==", + "license": "MIT", + "funding": { + "type": "github", + "url": "https://github.com/sponsors/kossnocorp" + } + }, "node_modules/debug": { "version": "4.4.3", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", diff --git a/frontend/package.json b/frontend/package.json index 7945d57..1815ba4 100755 --- a/frontend/package.json +++ b/frontend/package.json @@ -17,5 +17,8 @@ "svelte": "^5.43.8", "tailwindcss": "^3.0.0", "vite": "^7.2.4" + }, + "dependencies": { + "date-fns": "^4.1.0" } } diff --git a/frontend/src/components/Navbar.svelte b/frontend/src/components/Navbar.svelte index 2f9e80e..d7b2082 100644 --- a/frontend/src/components/Navbar.svelte +++ b/frontend/src/components/Navbar.svelte @@ -22,6 +22,12 @@ > Migration + + Tasks + + + + + + + {#if loading && tasks.length === 0} + Loading tasks... + {:else if tasks.length === 0} + No tasks found. 
+ {:else} + + {#each tasks as task (task.id)} + + handleTaskClick(task.id)} + > + + + + {task.plugin_id.toUpperCase()} + {task.id.substring(0, 8)} + + + + {task.status} + + + + + + + {#if task.params?.environment_id || task.params?.source_env_id} + Env: {task.params.environment_id || task.params.source_env_id} + {/if} + + + + + + + + Started {formatTime(task.started_at)} + + + + + + + {/each} + + {/if} + + + \ No newline at end of file diff --git a/frontend/src/lib/api.js b/frontend/src/lib/api.js index 98078ea..672b7d3 100755 --- a/frontend/src/lib/api.js +++ b/frontend/src/lib/api.js @@ -100,19 +100,21 @@ async function requestApi(endpoint, method = 'GET', body = null) { // [DEF:api:Data] // @PURPOSE: API client object with specific methods. export const api = { - getPlugins: () => fetchApi('/plugins/'), - getTasks: () => fetchApi('/tasks/'), + getPlugins: () => fetchApi('/plugins'), + getTasks: () => fetchApi('/tasks'), getTask: (taskId) => fetchApi(`/tasks/${taskId}`), - createTask: (pluginId, params) => postApi('/tasks/', { plugin_id: pluginId, params }), + createTask: (pluginId, params) => postApi('/tasks', { plugin_id: pluginId, params }), // Settings - getSettings: () => fetchApi('/settings/'), + getSettings: () => fetchApi('/settings'), updateGlobalSettings: (settings) => requestApi('/settings/global', 'PATCH', settings), getEnvironments: () => fetchApi('/settings/environments'), addEnvironment: (env) => postApi('/settings/environments', env), updateEnvironment: (id, env) => requestApi(`/settings/environments/${id}`, 'PUT', env), deleteEnvironment: (id) => requestApi(`/settings/environments/${id}`, 'DELETE'), testEnvironmentConnection: (id) => postApi(`/settings/environments/${id}/test`, {}), + updateEnvironmentSchedule: (id, schedule) => requestApi(`/environments/${id}/schedule`, 'PUT', schedule), + getEnvironmentsList: () => fetchApi('/environments'), }; // [/DEF:api_module] @@ -128,3 +130,5 @@ export const addEnvironment = api.addEnvironment; export const updateEnvironment = api.updateEnvironment; export const deleteEnvironment = api.deleteEnvironment; export const testEnvironmentConnection = api.testEnvironmentConnection; +export const updateEnvironmentSchedule = api.updateEnvironmentSchedule; +export const getEnvironmentsList = api.getEnvironmentsList; diff --git a/frontend/src/pages/Settings.svelte b/frontend/src/pages/Settings.svelte index 6b8f73c..87cd202 100755 --- a/frontend/src/pages/Settings.svelte +++ b/frontend/src/pages/Settings.svelte @@ -13,7 +13,7 @@ + + + + Task Management + showBackupModal = true} + class="bg-blue-600 hover:bg-blue-700 text-white px-4 py-2 rounded-md shadow-sm transition duration-150 font-medium" + > + Run Backup + + + + + + Recent Tasks + + + + + Task Details & Logs + {#if selectedTaskId} + + + + {:else} + + Select a task to view logs and details + + {/if} + + + + +{#if showBackupModal} + + + Run Manual Backup + + Target Environment + + -- Select Environment -- + {#each environments as env} + {env.name} + {/each} + + + + showBackupModal = false} + class="px-4 py-2 text-gray-700 hover:bg-gray-100 rounded-md transition" + > + Cancel + + + Start Backup + + + + +{/if} \ No newline at end of file diff --git a/specs/009-backup-scheduler/tasks.md b/specs/009-backup-scheduler/tasks.md index d8ddc9c..5f0531d 100644 --- a/specs/009-backup-scheduler/tasks.md +++ b/specs/009-backup-scheduler/tasks.md @@ -1,42 +1,42 @@ # Tasks: Backup Scheduler & Unified Task UI ## Phase 1: Setup -- [ ] T001 Initialize SQLite database `tasks.db` and SQLAlchemy engine in 
`backend/src/core/database.py` -- [ ] T002 Create SQLAlchemy model for `TaskRecord` in `backend/src/models/task.py` -- [ ] T003 Update `backend/src/core/config_models.py` to include `Schedule` and update `Environment` model -- [ ] T004 Create database migrations or initialization script for `tasks.db` +- [x] T001 Initialize SQLite database `tasks.db` and SQLAlchemy engine in `backend/src/core/database.py` +- [x] T002 Create SQLAlchemy model for `TaskRecord` in `backend/src/models/task.py` +- [x] T003 Update `backend/src/core/config_models.py` to include `Schedule` and update `Environment` model +- [x] T004 Create database migrations or initialization script for `tasks.db` ## Phase 2: Foundational -- [ ] T005 [P] Implement `TaskPersistence` layer in `backend/src/core/task_manager/persistence.py` -- [ ] T006 Update `TaskManager` in `backend/src/core/task_manager/manager.py` to use persistence for all jobs -- [ ] T007 Implement `SchedulerService` using `APScheduler` in `backend/src/core/scheduler.py` -- [ ] T008 Integrate `SchedulerService` into main FastAPI application startup in `backend/src/app.py` +- [x] T005 [P] Implement `TaskPersistence` layer in `backend/src/core/task_manager/persistence.py` +- [x] T006 Update `TaskManager` in `backend/src/core/task_manager/manager.py` to use persistence for all jobs +- [x] T007 Implement `SchedulerService` using `APScheduler` in `backend/src/core/scheduler.py` +- [x] T008 Integrate `SchedulerService` into main FastAPI application startup in `backend/src/app.py` ## Phase 3: [US1] Scheduled Backups -- [ ] T009 [US1] Implement schedule loading and registration logic in `SchedulerService` -- [ ] T010 [US1] Update `Environment` settings API to handle `backup_schedule` updates in `backend/src/api/routes/environments.py` -- [ ] T011 [P] [US1] Add schedule configuration fields to Environment edit form in `frontend/src/components/EnvSelector.svelte` (or appropriate component) -- [ ] T012 [US1] Implement validation for Cron expressions in backend and frontend +- [x] T009 [US1] Implement schedule loading and registration logic in `SchedulerService` +- [x] T010 [US1] Update `Environment` settings API to handle `backup_schedule` updates in `backend/src/api/routes/environments.py` +- [x] T011 [P] [US1] Add schedule configuration fields to Environment edit form in `frontend/src/components/EnvSelector.svelte` (or appropriate component) +- [x] T012 [US1] Implement validation for Cron expressions in backend and frontend ## Phase 4: [US2] Unified Task Management UI -- [ ] T013 [US2] Implement `/api/tasks` endpoint to list and filter tasks in `backend/src/api/routes/tasks.py` -- [ ] T014 [US2] Create new Tasks page in `frontend/src/routes/tasks/+page.svelte` -- [ ] T015 [P] [US2] Implement `TaskList` component in `frontend/src/components/TaskList.svelte` -- [ ] T016 [US2] Add "Tasks" link to main navigation in `frontend/src/components/Navbar.svelte` +- [x] T013 [US2] Implement `/api/tasks` endpoint to list and filter tasks in `backend/src/api/routes/tasks.py` +- [x] T014 [US2] Create new Tasks page in `frontend/src/routes/tasks/+page.svelte` +- [x] T015 [P] [US2] Implement `TaskList` component in `frontend/src/components/TaskList.svelte` +- [x] T016 [US2] Add "Tasks" link to main navigation in `frontend/src/components/Navbar.svelte` ## Phase 5: [US3] Manual Backup Trigger -- [ ] T017 [US3] Implement `/api/tasks/backup` POST endpoint in `backend/src/api/routes/tasks.py` -- [ ] T018 [US3] Add "Run Backup" button and environment selection to Tasks page in 
`frontend/src/routes/tasks/+page.svelte` +- [x] T017 [US3] Implement `/api/tasks/backup` POST endpoint in `backend/src/api/routes/tasks.py` +- [x] T018 [US3] Add "Run Backup" button and environment selection to Tasks page in `frontend/src/routes/tasks/+page.svelte` ## Phase 6: [US4] Task History & Logs -- [ ] T019 [US4] Implement `/api/tasks/{task_id}` GET endpoint for detailed task info and logs in `backend/src/api/routes/tasks.py` -- [ ] T020 [US4] Implement `TaskLogViewer` component in `frontend/src/components/TaskLogViewer.svelte` -- [ ] T021 [US4] Integrate log viewer into TaskList or as a separate modal/page +- [x] T019 [US4] Implement `/api/tasks/{task_id}` GET endpoint for detailed task info and logs in `backend/src/api/routes/tasks.py` +- [x] T020 [US4] Implement `TaskLogViewer` component in `frontend/src/components/TaskLogViewer.svelte` +- [x] T021 [US4] Integrate log viewer into TaskList or as a separate modal/page ## Final Phase: Polish & Cross-cutting concerns -- [ ] T022 Implement task cleanup/retention policy (e.g., delete tasks older than 30 days) +- [x] T022 Implement task cleanup/retention policy (e.g., delete tasks older than 30 days) - [ ] T023 Add real-time updates for task status using WebSockets (optional/refinement) -- [ ] T024 Ensure consistent error handling and logging across scheduler and task manager +- [x] T024 Ensure consistent error handling and logging across scheduler and task manager ## Dependencies - US1 depends on Phase 1 & 2
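---

A minimal sketch for exercising the new `PUT /api/environments/{id}/schedule` endpoint added in this change, assuming the backend runs locally on port 8000 and `dev` is a hypothetical environment id:

```python
# Smoke test for the schedule endpoint (router mounted at /api/environments in app.py).
import httpx

payload = {"enabled": True, "cron_expression": "0 2 * * *"}  # daily at 02:00
resp = httpx.put("http://localhost:8000/api/environments/dev/schedule", json=payload)
resp.raise_for_status()
print(resp.json())  # expected: {"message": "Schedule updated successfully"}
```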
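Note that cron strings are validated twice in this change: the `ScheduleSchema` regex checks shape at the API boundary, while `CronTrigger.from_crontab` in `SchedulerService.add_backup_job` enforces field ranges and raises `ValueError`, which the service logs rather than propagates. A quick illustration:

```python
from apscheduler.triggers.cron import CronTrigger

CronTrigger.from_crontab("0 0 * * *")       # valid: the Schedule default, daily at midnight
try:
    CronTrigger.from_crontab("61 0 * * *")  # passes the shape regex, but minute 61 is out of range
except ValueError as e:
    print(f"rejected: {e}")
```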
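Because every state transition now calls `persist_task`, task history survives restarts and can be inspected offline through the new persistence layer. A sketch, assuming `tasks.db` has been initialized (this change runs `init_db()` on import of `dependencies.py`):

```python
from backend.src.core.task_manager.persistence import TaskPersistenceService
from backend.src.core.task_manager.models import TaskStatus

svc = TaskPersistenceService()
# load_tasks reconstructs Task objects (including JSON logs) from TaskRecord rows in tasks.db.
for task in svc.load_tasks(limit=20, status=TaskStatus.FAILED):
    print(task.id, task.plugin_id, task.finished_at)
```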