backup worked
This commit is contained in:
38
backend/src/core/task_manager/cleanup.py
Normal file
38
backend/src/core/task_manager/cleanup.py
Normal file
@@ -0,0 +1,38 @@
|
||||
# [DEF:TaskCleanupModule:Module]
|
||||
# @SEMANTICS: task, cleanup, retention
|
||||
# @PURPOSE: Implements task cleanup and retention policies.
|
||||
# @LAYER: Core
|
||||
# @RELATION: Uses TaskPersistenceService to delete old tasks.
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from .persistence import TaskPersistenceService
|
||||
from ..logger import logger, belief_scope
|
||||
from ..config_manager import ConfigManager
|
||||
|
||||
# [DEF:TaskCleanupService:Class]
# @PURPOSE: Provides methods to clean up old task records.
class TaskCleanupService:
    # [DEF:TaskCleanupService.__init__:Function]
    # @PURPOSE: Stores the collaborators used by the cleanup run.
    # @PARAM: persistence_service (TaskPersistenceService) - Used to load and delete persisted tasks.
    # @PARAM: config_manager (ConfigManager) - Provides the retention settings.
    def __init__(self, persistence_service: TaskPersistenceService, config_manager: ConfigManager):
        self.persistence_service = persistence_service
        self.config_manager = config_manager
    # [/DEF:TaskCleanupService.__init__:Function]

    # [DEF:TaskCleanupService.run_cleanup:Function]
    # @PURPOSE: Deletes finished tasks older than the configured retention period,
    #           then trims the remaining tasks down to the configured maximum count.
    # @PRE: settings expose task_retention_days and task_retention_limit.
    # @POST: Among the (at most 1000) loaded tasks, finished tasks older than
    #        task_retention_days and tasks beyond task_retention_limit have been
    #        handed to the persistence service for deletion.
    def run_cleanup(self):
        with belief_scope("TaskCleanupService.run_cleanup"):
            settings = self.config_manager.get_config().settings
            retention_days = settings.task_retention_days
            retention_limit = settings.task_retention_limit

            logger.info(f"Cleaning up tasks older than {retention_days} days.")

            # Only the first 1000 tasks returned by the persistence service are
            # inspected per run; anything beyond that is picked up by later runs.
            tasks = self.persistence_service.load_tasks(limit=1000)

            # --- Age-based retention -------------------------------------
            # A missing or non-positive retention period disables this step
            # instead of raising or wiping every finished task.
            expired_ids = []
            remaining = tasks
            if retention_days is not None and retention_days > 0:
                # NOTE(review): assumes finished_at is a naive UTC datetime, as
                # produced by datetime.utcnow() in the task manager - confirm
                # the database does not return timezone-aware values.
                cutoff = datetime.utcnow() - timedelta(days=retention_days)
                remaining = []
                for task in tasks:
                    finished_at = task.finished_at
                    # Tasks without finished_at have not completed yet and are
                    # never considered expired.
                    if finished_at is not None and finished_at < cutoff:
                        expired_ids.append(task.id)
                    else:
                        remaining.append(task)

            if expired_ids:
                self.persistence_service.delete_tasks(expired_ids)
                logger.info(f"Deleted {len(expired_ids)} tasks older than {retention_days} days")

            # --- Count-based retention -----------------------------------
            # Keep the first `retention_limit` surviving tasks in the order
            # returned by load_tasks. A missing or negative limit disables this
            # step (a negative slice index would otherwise pick the wrong tasks).
            if retention_limit is not None and retention_limit >= 0 and len(remaining) > retention_limit:
                to_delete = [t.id for t in remaining[retention_limit:]]
                self.persistence_service.delete_tasks(to_delete)
                logger.info(f"Deleted {len(to_delete)} tasks exceeding limit of {retention_limit}")
    # [/DEF:TaskCleanupService.run_cleanup:Function]

# [/DEF:TaskCleanupService]
|
||||
@@ -71,6 +71,7 @@ class TaskManager:
|
||||
|
||||
task = Task(plugin_id=plugin_id, params=params, user_id=user_id)
|
||||
self.tasks[task.id] = task
|
||||
self.persistence_service.persist_task(task)
|
||||
logger.info(f"Task {task.id} created and scheduled for execution")
|
||||
self.loop.create_task(self._run_task(task.id)) # Schedule task for execution
|
||||
return task
|
||||
@@ -89,6 +90,7 @@ class TaskManager:
|
||||
logger.info(f"Starting execution of task {task_id} for plugin '{plugin.name}'")
|
||||
task.status = TaskStatus.RUNNING
|
||||
task.started_at = datetime.utcnow()
|
||||
self.persistence_service.persist_task(task)
|
||||
self._add_log(task_id, "INFO", f"Task started for plugin '{plugin.name}'")
|
||||
|
||||
try:
|
||||
@@ -113,6 +115,7 @@ class TaskManager:
|
||||
self._add_log(task_id, "ERROR", f"Task failed: {e}", {"error_type": type(e).__name__})
|
||||
finally:
|
||||
task.finished_at = datetime.utcnow()
|
||||
self.persistence_service.persist_task(task)
|
||||
logger.info(f"Task {task_id} execution finished with status: {task.status}")
|
||||
# [/DEF:TaskManager._run_task:Function]
|
||||
|
||||
@@ -132,6 +135,7 @@ class TaskManager:
|
||||
# Update task params with resolution
|
||||
task.params.update(resolution_params)
|
||||
task.status = TaskStatus.RUNNING
|
||||
self.persistence_service.persist_task(task)
|
||||
self._add_log(task_id, "INFO", "Task resumed after mapping resolution.")
|
||||
|
||||
# Signal the future to continue
|
||||
@@ -150,6 +154,7 @@ class TaskManager:
|
||||
if not task: return
|
||||
|
||||
task.status = TaskStatus.AWAITING_MAPPING
|
||||
self.persistence_service.persist_task(task)
|
||||
self.task_futures[task_id] = self.loop.create_future()
|
||||
|
||||
try:
|
||||
@@ -235,6 +240,7 @@ class TaskManager:
|
||||
|
||||
log_entry = LogEntry(level=level, message=message, context=context)
|
||||
task.logs.append(log_entry)
|
||||
self.persistence_service.persist_task(task)
|
||||
|
||||
# Notify subscribers
|
||||
if task_id in self.subscribers:
|
||||
@@ -266,16 +272,10 @@ class TaskManager:
|
||||
del self.subscribers[task_id]
|
||||
# [/DEF:TaskManager.unsubscribe_logs:Function]
|
||||
|
||||
# [DEF:TaskManager.persist_awaiting_input_tasks:Function]
|
||||
# @PURPOSE: Persist tasks in AWAITING_INPUT state using persistence service.
|
||||
def persist_awaiting_input_tasks(self) -> None:
|
||||
self.persistence_service.persist_tasks(list(self.tasks.values()))
|
||||
# [/DEF:TaskManager.persist_awaiting_input_tasks:Function]
|
||||
|
||||
# [DEF:TaskManager.load_persisted_tasks:Function]
|
||||
# @PURPOSE: Load persisted tasks using persistence service.
|
||||
def load_persisted_tasks(self) -> None:
|
||||
loaded_tasks = self.persistence_service.load_tasks()
|
||||
loaded_tasks = self.persistence_service.load_tasks(limit=100)
|
||||
for task in loaded_tasks:
|
||||
if task.id not in self.tasks:
|
||||
self.tasks[task.id] = task
|
||||
@@ -299,9 +299,8 @@ class TaskManager:
|
||||
task.status = TaskStatus.AWAITING_INPUT
|
||||
task.input_required = True
|
||||
task.input_request = input_request
|
||||
self.persistence_service.persist_task(task)
|
||||
self._add_log(task_id, "INFO", "Task paused for user input", {"input_request": input_request})
|
||||
|
||||
self.persist_awaiting_input_tasks()
|
||||
# [/DEF:TaskManager.await_input:Function]
|
||||
|
||||
# [DEF:TaskManager.resume_task_with_password:Function]
|
||||
@@ -326,13 +325,11 @@ class TaskManager:
|
||||
task.input_required = False
|
||||
task.input_request = None
|
||||
task.status = TaskStatus.RUNNING
|
||||
self.persistence_service.persist_task(task)
|
||||
self._add_log(task_id, "INFO", "Task resumed with passwords", {"databases": list(passwords.keys())})
|
||||
|
||||
if task_id in self.task_futures:
|
||||
self.task_futures[task_id].set_result(True)
|
||||
|
||||
# Remove from persistence as it's no longer awaiting input
|
||||
self.persistence_service.delete_tasks([task_id])
|
||||
# [/DEF:TaskManager.resume_task_with_password:Function]
|
||||
|
||||
# [DEF:TaskManager.clear_tasks:Function]
|
||||
|
||||
@@ -1,141 +1,122 @@
|
||||
# [DEF:TaskPersistenceModule:Module]
|
||||
# @SEMANTICS: persistence, sqlite, task, storage
|
||||
# @PURPOSE: Handles the persistence of tasks, specifically those awaiting user input, to a SQLite database.
|
||||
# @SEMANTICS: persistence, sqlite, sqlalchemy, task, storage
|
||||
# @PURPOSE: Handles the persistence of tasks using SQLAlchemy and the tasks.db database.
|
||||
# @LAYER: Core
|
||||
# @RELATION: Used by TaskManager to save and load tasks.
|
||||
# @INVARIANT: Database schema must match the Task model structure.
|
||||
# @CONSTRAINT: Uses synchronous SQLite operations (blocking), should be used carefully.
|
||||
# @INVARIANT: Database schema must match the TaskRecord model structure.
|
||||
|
||||
# [SECTION: IMPORTS]
|
||||
import sqlite3
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any
|
||||
from typing import List, Optional, Dict, Any
|
||||
import json
|
||||
|
||||
from .models import Task, TaskStatus
|
||||
from sqlalchemy.orm import Session
|
||||
from backend.src.models.task import TaskRecord
|
||||
from backend.src.core.database import TasksSessionLocal
|
||||
from .models import Task, TaskStatus, LogEntry
|
||||
from ..logger import logger, belief_scope
|
||||
# [/SECTION]
|
||||
|
||||
# [DEF:TaskPersistenceService:Class]
|
||||
# @SEMANTICS: persistence, service, database
|
||||
# @PURPOSE: Provides methods to save and load tasks from a local SQLite database.
|
||||
# @SEMANTICS: persistence, service, database, sqlalchemy
|
||||
# @PURPOSE: Provides methods to save and load tasks from the tasks.db database using SQLAlchemy.
|
||||
class TaskPersistenceService:
|
||||
def __init__(self, db_path: Optional[Path] = None):
|
||||
if db_path is None:
|
||||
self.db_path = Path(__file__).parent.parent.parent.parent / "migrations.db"
|
||||
else:
|
||||
self.db_path = db_path
|
||||
self._ensure_db_exists()
|
||||
def __init__(self):
|
||||
# We use TasksSessionLocal from database.py
|
||||
pass
|
||||
|
||||
# [DEF:TaskPersistenceService._ensure_db_exists:Function]
|
||||
# @PURPOSE: Ensures the database directory and table exist.
|
||||
# @PRE: None.
|
||||
# @POST: Database file and table are created if they didn't exist.
|
||||
def _ensure_db_exists(self) -> None:
|
||||
with belief_scope("TaskPersistenceService._ensure_db_exists"):
|
||||
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
conn = sqlite3.connect(str(self.db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS persistent_tasks (
|
||||
id TEXT PRIMARY KEY,
|
||||
plugin_id TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
input_request JSON,
|
||||
context JSON
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
# [/DEF:TaskPersistenceService._ensure_db_exists:Function]
|
||||
# [DEF:TaskPersistenceService.persist_task:Function]
# @PURPOSE: Persists or updates a single task in the database.
# @POST: The TaskRecord with the task's id is created if missing, refreshed from the
#        task and committed. On any failure the transaction is rolled back and the
#        error is logged; the exception is not propagated to the caller.
# @PARAM: task (Task) - The task object to persist.
def persist_task(self, task: Task) -> None:
    with belief_scope("TaskPersistenceService.persist_task", f"task_id={task.id}"):
        db: Session = TasksSessionLocal()
        try:
            # Upsert: reuse the stored row when present, otherwise start a new one.
            row = db.query(TaskRecord).filter(TaskRecord.id == task.id).first()
            if not row:
                row = TaskRecord(id=task.id)
                db.add(row)

            params = task.params
            row.type = task.plugin_id
            row.status = task.status.value
            # Either parameter name may carry the environment identifier.
            row.environment_id = params.get("environment_id") or params.get("source_env_id")
            row.started_at = task.started_at
            row.finished_at = task.finished_at
            row.params = params

            # Logs are stored as JSON, so datetime timestamps become ISO strings.
            serialised_logs = []
            for entry in task.logs:
                payload = entry.dict()
                stamp = payload.get("timestamp")
                if isinstance(stamp, datetime):
                    payload["timestamp"] = stamp.isoformat()
                serialised_logs.append(payload)
            row.logs = serialised_logs

            # For failed tasks, surface the most recent ERROR log message.
            if task.status == TaskStatus.FAILED:
                latest_error = next(
                    (entry for entry in reversed(task.logs) if entry.level == "ERROR"),
                    None,
                )
                if latest_error is not None:
                    row.error = latest_error.message

            db.commit()
        except Exception as exc:
            db.rollback()
            logger.error(f"Failed to persist task {task.id}: {exc}")
        finally:
            db.close()
# [/DEF:TaskPersistenceService.persist_task:Function]
|
||||
|
||||
# [DEF:TaskPersistenceService.persist_tasks:Function]
|
||||
# @PURPOSE: Persists a list of tasks to the database.
|
||||
# @PRE: Tasks list contains valid Task objects.
|
||||
# @POST: Tasks matching the criteria (AWAITING_INPUT) are saved/updated in the DB.
|
||||
# @PARAM: tasks (List[Task]) - The list of tasks to check and persist.
|
||||
# @PURPOSE: Persists multiple tasks.
|
||||
# @PARAM: tasks (List[Task]) - The list of tasks to persist.
|
||||
def persist_tasks(self, tasks: List[Task]) -> None:
|
||||
with belief_scope("TaskPersistenceService.persist_tasks"):
|
||||
conn = sqlite3.connect(str(self.db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
count = 0
|
||||
for task in tasks:
|
||||
if task.status == TaskStatus.AWAITING_INPUT:
|
||||
cursor.execute("""
|
||||
INSERT OR REPLACE INTO persistent_tasks
|
||||
(id, plugin_id, status, created_at, updated_at, input_request, context)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
task.id,
|
||||
task.plugin_id,
|
||||
task.status.value,
|
||||
task.started_at.isoformat() if task.started_at else datetime.utcnow().isoformat(),
|
||||
datetime.utcnow().isoformat(),
|
||||
json.dumps(task.input_request) if task.input_request else None,
|
||||
json.dumps(task.params)
|
||||
))
|
||||
count += 1
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
logger.info(f"Persisted {count} tasks awaiting input.")
|
||||
for task in tasks:
|
||||
self.persist_task(task)
|
||||
# [/DEF:TaskPersistenceService.persist_tasks:Function]
|
||||
|
||||
# [DEF:TaskPersistenceService.load_tasks:Function]
|
||||
# @PURPOSE: Loads persisted tasks from the database.
|
||||
# @PRE: Database exists.
|
||||
# @POST: Returns a list of Task objects reconstructed from the DB.
|
||||
# @PURPOSE: Loads tasks from the database.
|
||||
# @PARAM: limit (int) - Max tasks to load.
|
||||
# @PARAM: status (Optional[TaskStatus]) - Filter by status.
|
||||
# @RETURN: List[Task] - The loaded tasks.
|
||||
def load_tasks(self) -> List[Task]:
|
||||
def load_tasks(self, limit: int = 100, status: Optional[TaskStatus] = None) -> List[Task]:
|
||||
with belief_scope("TaskPersistenceService.load_tasks"):
|
||||
if not self.db_path.exists():
|
||||
return []
|
||||
|
||||
conn = sqlite3.connect(str(self.db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Check if plugin_id column exists (migration for existing db)
|
||||
cursor.execute("PRAGMA table_info(persistent_tasks)")
|
||||
columns = [info[1] for info in cursor.fetchall()]
|
||||
has_plugin_id = "plugin_id" in columns
|
||||
session: Session = TasksSessionLocal()
|
||||
try:
|
||||
query = session.query(TaskRecord)
|
||||
if status:
|
||||
query = query.filter(TaskRecord.status == status.value)
|
||||
|
||||
records = query.order_by(TaskRecord.created_at.desc()).limit(limit).all()
|
||||
|
||||
loaded_tasks = []
|
||||
for record in records:
|
||||
try:
|
||||
logs = []
|
||||
if record.logs:
|
||||
for log_data in record.logs:
|
||||
# Handle timestamp conversion if it's a string
|
||||
if isinstance(log_data.get('timestamp'), str):
|
||||
log_data['timestamp'] = datetime.fromisoformat(log_data['timestamp'])
|
||||
logs.append(LogEntry(**log_data))
|
||||
|
||||
if has_plugin_id:
|
||||
cursor.execute("SELECT id, plugin_id, status, created_at, input_request, context FROM persistent_tasks")
|
||||
else:
|
||||
cursor.execute("SELECT id, status, created_at, input_request, context FROM persistent_tasks")
|
||||
|
||||
rows = cursor.fetchall()
|
||||
|
||||
loaded_tasks = []
|
||||
for row in rows:
|
||||
if has_plugin_id:
|
||||
task_id, plugin_id, status, created_at, input_request_json, context_json = row
|
||||
else:
|
||||
task_id, status, created_at, input_request_json, context_json = row
|
||||
plugin_id = "superset-migration" # Default fallback
|
||||
|
||||
try:
|
||||
task = Task(
|
||||
id=task_id,
|
||||
plugin_id=plugin_id,
|
||||
status=TaskStatus(status),
|
||||
started_at=datetime.fromisoformat(created_at),
|
||||
input_required=True,
|
||||
input_request=json.loads(input_request_json) if input_request_json else None,
|
||||
params=json.loads(context_json) if context_json else {}
|
||||
)
|
||||
loaded_tasks.append(task)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load task {task_id}: {e}")
|
||||
|
||||
conn.close()
|
||||
return loaded_tasks
|
||||
task = Task(
|
||||
id=record.id,
|
||||
plugin_id=record.type,
|
||||
status=TaskStatus(record.status),
|
||||
started_at=record.started_at,
|
||||
finished_at=record.finished_at,
|
||||
params=record.params or {},
|
||||
logs=logs
|
||||
)
|
||||
loaded_tasks.append(task)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to reconstruct task {record.id}: {e}")
|
||||
|
||||
return loaded_tasks
|
||||
finally:
|
||||
session.close()
|
||||
# [/DEF:TaskPersistenceService.load_tasks:Function]
|
||||
|
||||
# [DEF:TaskPersistenceService.delete_tasks:Function]
|
||||
@@ -145,14 +126,16 @@ class TaskPersistenceService:
|
||||
if not task_ids:
|
||||
return
|
||||
with belief_scope("TaskPersistenceService.delete_tasks"):
|
||||
conn = sqlite3.connect(str(self.db_path))
|
||||
cursor = conn.cursor()
|
||||
placeholders = ', '.join('?' for _ in task_ids)
|
||||
cursor.execute(f"DELETE FROM persistent_tasks WHERE id IN ({placeholders})", task_ids)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
session: Session = TasksSessionLocal()
|
||||
try:
|
||||
session.query(TaskRecord).filter(TaskRecord.id.in_(task_ids)).delete(synchronize_session=False)
|
||||
session.commit()
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
logger.error(f"Failed to delete tasks: {e}")
|
||||
finally:
|
||||
session.close()
|
||||
# [/DEF:TaskPersistenceService.delete_tasks:Function]
|
||||
|
||||
# [/DEF:TaskPersistenceService:Class]
|
||||
|
||||
# [/DEF:TaskPersistenceModule:Module]
|
||||
Reference in New Issue
Block a user