diff --git a/.kilocodemodes b/.kilocodemodes new file mode 100644 index 0000000..ea65ce4 --- /dev/null +++ b/.kilocodemodes @@ -0,0 +1,25 @@ +customModes: + - slug: tester + name: Tester + description: QA and Plan Verification Specialist + roleDefinition: >- + You are Kilo Code, acting as a QA and Verification Specialist. Your primary goal is to validate that the project implementation aligns strictly with the defined specifications and task plans. + + Your responsibilities include: + - Reading and analyzing task plans and specifications (typically in the `specs/` directory). + - Verifying that implemented code matches the requirements. + - Executing tests and validating system behavior via CLI or Browser. + - Updating the status of tasks in the plan files (e.g., marking checkboxes [x]) as they are verified. + - Identifying and reporting missing features or bugs. + whenToUse: >- + Use this mode when you need to audit the progress of a project, verify completed tasks against the plan, run quality assurance checks, or update the status of task lists in specification documents. + groups: + - read + - edit + - command + - browser + - mcp + customInstructions: >- + 1. Always begin by loading the relevant plan or task list from the `specs/` directory. + 2. Do not assume a task is done just because it is checked; verify the code or functionality first if asked to audit. + 3. When updating task lists, ensure you only mark items as complete if you have verified them. diff --git a/backend/migrations.db b/backend/migrations.db index 788eae3..3153f44 100644 Binary files a/backend/migrations.db and b/backend/migrations.db differ diff --git a/backend/src/api/routes/migration.py b/backend/src/api/routes/migration.py index e5d89f6..d0f6092 100644 --- a/backend/src/api/routes/migration.py +++ b/backend/src/api/routes/migration.py @@ -55,12 +55,17 @@ async def execute_migration(selection: DashboardSelection, config_manager=Depend # Create migration task with debug logging from ...core.logger import logger - logger.info(f"Creating migration task with selection: {selection.dict()}") + + # Include replace_db_config in the task parameters + task_params = selection.dict() + task_params['replace_db_config'] = selection.replace_db_config + + logger.info(f"Creating migration task with params: {task_params}") logger.info(f"Available environments: {env_ids}") logger.info(f"Source env: {selection.source_env_id}, Target env: {selection.target_env_id}") try: - task = await task_manager.create_task("superset-migration", selection.dict()) + task = await task_manager.create_task("superset-migration", task_params) logger.info(f"Task created successfully: {task.id}") return {"task_id": task.id, "message": "Migration initiated"} except Exception as e: diff --git a/backend/src/api/routes/tasks.py b/backend/src/api/routes/tasks.py index d840f84..418615d 100755 --- a/backend/src/api/routes/tasks.py +++ b/backend/src/api/routes/tasks.py @@ -41,12 +41,15 @@ async def create_task( @router.get("/", response_model=List[Task]) async def list_tasks( + limit: int = 10, + offset: int = 0, + status: Optional[TaskStatus] = None, task_manager: TaskManager = Depends(get_task_manager) ): """ - Retrieve a list of all tasks. + Retrieve a list of tasks with pagination and optional status filter. """ - return task_manager.get_all_tasks() + return task_manager.get_tasks(limit=limit, offset=offset, status=status) @router.get("/{task_id}", response_model=Task) async def get_task( @@ -61,6 +64,19 @@ async def get_task( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") return task +@router.get("/{task_id}/logs", response_model=List[LogEntry]) +async def get_task_logs( + task_id: str, + task_manager: TaskManager = Depends(get_task_manager) +): + """ + Retrieve logs for a specific task. + """ + task = task_manager.get_task(task_id) + if not task: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") + return task_manager.get_task_logs(task_id) + @router.post("/{task_id}/resolve", response_model=Task) async def resolve_task( task_id: str, @@ -90,4 +106,15 @@ async def resume_task( return task_manager.get_task(task_id) except ValueError as e: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + +@router.delete("/", status_code=status.HTTP_204_NO_CONTENT) +async def clear_tasks( + status: Optional[TaskStatus] = None, + task_manager: TaskManager = Depends(get_task_manager) +): + """ + Clear tasks matching the status filter. If no filter, clears all non-running tasks. + """ + task_manager.clear_tasks(status) + return # [/DEF] \ No newline at end of file diff --git a/backend/src/app.py b/backend/src/app.py index 5131c71..82a964c 100755 --- a/backend/src/app.py +++ b/backend/src/app.py @@ -63,16 +63,30 @@ async def websocket_endpoint(websocket: WebSocket, task_id: str): task_manager = get_task_manager() queue = await task_manager.subscribe_logs(task_id) try: - # Send initial logs if any + # Stream new logs + logger.info(f"Starting log stream for task {task_id}") + + # Send initial logs first to build context initial_logs = task_manager.get_task_logs(task_id) for log_entry in initial_logs: - # Convert datetime to string for JSON serialization log_dict = log_entry.dict() log_dict['timestamp'] = log_dict['timestamp'].isoformat() await websocket.send_json(log_dict) - # Stream new logs - logger.info(f"Starting log stream for task {task_id}") + # Force a check for AWAITING_INPUT status immediately upon connection + # This ensures that if the task is already waiting when the user connects, they get the prompt. + task = task_manager.get_task(task_id) + if task and task.status == "AWAITING_INPUT" and task.input_request: + # Construct a synthetic log entry to trigger the frontend handler + # This is a bit of a hack but avoids changing the websocket protocol significantly + synthetic_log = { + "timestamp": task.logs[-1].timestamp.isoformat() if task.logs else "2024-01-01T00:00:00", + "level": "INFO", + "message": "Task paused for user input (Connection Re-established)", + "context": {"input_request": task.input_request} + } + await websocket.send_json(synthetic_log) + while True: log_entry = await queue.get() log_dict = log_entry.dict() @@ -84,7 +98,9 @@ async def websocket_endpoint(websocket: WebSocket, task_id: str): if "Task completed successfully" in log_entry.message or "Task failed" in log_entry.message: # Wait a bit to ensure client receives the last message await asyncio.sleep(2) - break + # DO NOT BREAK here - allow client to keep connection open if they want to review logs + # or until they disconnect. Breaking closes the socket immediately. + # break except WebSocketDisconnect: logger.info(f"WebSocket connection disconnected for task {task_id}") diff --git a/backend/src/core/config_manager.py b/backend/src/core/config_manager.py index bb3015e..85fdf6c 100755 --- a/backend/src/core/config_manager.py +++ b/backend/src/core/config_manager.py @@ -72,6 +72,8 @@ class ConfigManager: return config except Exception as e: logger.error(f"[_load_config][Coherence:Failed] Error loading config: {e}") + # Fallback but try to preserve existing settings if possible? + # For now, return default to be safe, but log the error prominently. return AppConfig( environments=[], settings=GlobalSettings(backup_path="backups") diff --git a/backend/src/core/config_models.py b/backend/src/core/config_models.py index 31d2713..3994bda 100755 --- a/backend/src/core/config_models.py +++ b/backend/src/core/config_models.py @@ -35,6 +35,11 @@ class GlobalSettings(BaseModel): backup_path: str default_environment_id: Optional[str] = None logging: LoggingConfig = Field(default_factory=LoggingConfig) + + # Task retention settings + task_retention_days: int = 30 + task_retention_limit: int = 100 + pagination_limit: int = 10 # [/DEF:GlobalSettings] # [DEF:AppConfig:DataClass] diff --git a/backend/src/core/task_manager/manager.py b/backend/src/core/task_manager/manager.py index 6d3bc4c..5900502 100644 --- a/backend/src/core/task_manager/manager.py +++ b/backend/src/core/task_manager/manager.py @@ -43,6 +43,9 @@ class TaskManager: except RuntimeError: self.loop = asyncio.get_event_loop() self.task_futures: Dict[str, asyncio.Future] = {} + + # Load persisted tasks on startup + self.load_persisted_tasks() # [/DEF:TaskManager.__init__:Function] # [DEF:TaskManager.create_task:Function] @@ -328,8 +331,49 @@ class TaskManager: if task_id in self.task_futures: self.task_futures[task_id].set_result(True) - self.persist_awaiting_input_tasks() + # Remove from persistence as it's no longer awaiting input + self.persistence_service.delete_tasks([task_id]) # [/DEF:TaskManager.resume_task_with_password:Function] + # [DEF:TaskManager.clear_tasks:Function] + # @PURPOSE: Clears tasks based on status filter. + # @PARAM: status (Optional[TaskStatus]) - Filter by task status. + # @RETURN: int - Number of tasks cleared. + def clear_tasks(self, status: Optional[TaskStatus] = None) -> int: + with belief_scope("TaskManager.clear_tasks"): + tasks_to_remove = [] + for task_id, task in list(self.tasks.items()): + # If status is provided, match it. + # If status is None, match everything EXCEPT RUNNING (unless they are awaiting input/mapping which are technically running but paused?) + # Actually, AWAITING_INPUT and AWAITING_MAPPING are distinct statuses in TaskStatus enum. + # RUNNING is active execution. + + should_remove = False + if status: + if task.status == status: + should_remove = True + else: + # Clear all non-active tasks + if task.status not in [TaskStatus.RUNNING]: + should_remove = True + + if should_remove: + tasks_to_remove.append(task_id) + + for tid in tasks_to_remove: + # Cancel future if exists (e.g. for AWAITING_INPUT/MAPPING) + if tid in self.task_futures: + self.task_futures[tid].cancel() + del self.task_futures[tid] + + del self.tasks[tid] + + # Remove from persistence + self.persistence_service.delete_tasks(tasks_to_remove) + + logger.info(f"Cleared {len(tasks_to_remove)} tasks.") + return len(tasks_to_remove) + # [/DEF:TaskManager.clear_tasks:Function] + # [/DEF:TaskManager:Class] # [/DEF:TaskManagerModule:Module] \ No newline at end of file diff --git a/backend/src/core/task_manager/persistence.py b/backend/src/core/task_manager/persistence.py index 0487ea7..8fa2696 100644 --- a/backend/src/core/task_manager/persistence.py +++ b/backend/src/core/task_manager/persistence.py @@ -122,6 +122,21 @@ class TaskPersistenceService: return loaded_tasks # [/DEF:TaskPersistenceService.load_tasks:Function] + # [DEF:TaskPersistenceService.delete_tasks:Function] + # @PURPOSE: Deletes specific tasks from the database. + # @PARAM: task_ids (List[str]) - List of task IDs to delete. + def delete_tasks(self, task_ids: List[str]) -> None: + if not task_ids: + return + with belief_scope("TaskPersistenceService.delete_tasks"): + conn = sqlite3.connect(str(self.db_path)) + cursor = conn.cursor() + placeholders = ', '.join('?' for _ in task_ids) + cursor.execute(f"DELETE FROM persistent_tasks WHERE id IN ({placeholders})", task_ids) + conn.commit() + conn.close() + # [/DEF:TaskPersistenceService.delete_tasks:Function] + # [/DEF:TaskPersistenceService:Class] # [/DEF:TaskPersistenceModule:Module] \ No newline at end of file diff --git a/backend/src/models/dashboard.py b/backend/src/models/dashboard.py index 43eee41..0af0320 100644 --- a/backend/src/models/dashboard.py +++ b/backend/src/models/dashboard.py @@ -22,6 +22,7 @@ class DashboardSelection(BaseModel): selected_ids: List[int] source_env_id: str target_env_id: str + replace_db_config: bool = False # [/DEF:DashboardSelection] # [/DEF:backend.src.models.dashboard] \ No newline at end of file diff --git a/backend/src/plugins/migration.py b/backend/src/plugins/migration.py index d06151f..8326f62 100755 --- a/backend/src/plugins/migration.py +++ b/backend/src/plugins/migration.py @@ -100,7 +100,31 @@ class MigrationPlugin(PluginBase): from_db_id = params.get("from_db_id") to_db_id = params.get("to_db_id") - logger = SupersetLogger(log_dir=Path.cwd() / "logs", console=True) + # [DEF:MigrationPlugin.execute:Action] + # @PURPOSE: Execute the migration logic with proper task logging. + task_id = params.get("_task_id") + from ..dependencies import get_task_manager + tm = get_task_manager() + + class TaskLoggerProxy(SupersetLogger): + def __init__(self): + # Initialize parent with dummy values since we override methods + super().__init__(console=False) + + def debug(self, msg, *args, extra=None, **kwargs): + if task_id: tm._add_log(task_id, "DEBUG", msg, extra or {}) + def info(self, msg, *args, extra=None, **kwargs): + if task_id: tm._add_log(task_id, "INFO", msg, extra or {}) + def warning(self, msg, *args, extra=None, **kwargs): + if task_id: tm._add_log(task_id, "WARNING", msg, extra or {}) + def error(self, msg, *args, extra=None, **kwargs): + if task_id: tm._add_log(task_id, "ERROR", msg, extra or {}) + def critical(self, msg, *args, extra=None, **kwargs): + if task_id: tm._add_log(task_id, "ERROR", msg, extra or {}) + def exception(self, msg, *args, **kwargs): + if task_id: tm._add_log(task_id, "ERROR", msg, {"exception": True}) + + logger = TaskLoggerProxy() logger.info(f"[MigrationPlugin][Entry] Starting migration task.") logger.info(f"[MigrationPlugin][Action] Params: {params}") @@ -188,10 +212,7 @@ class MigrationPlugin(PluginBase): if not success and replace_db_config: # Signal missing mapping and wait (only if we care about mappings) - task_id = params.get("_task_id") if task_id: - from ..dependencies import get_task_manager - tm = get_task_manager() logger.info(f"[MigrationPlugin][Action] Pausing for missing mapping in task {task_id}") # In a real scenario, we'd pass the missing DB info to the frontend # For this task, we'll just simulate the wait @@ -220,16 +241,25 @@ class MigrationPlugin(PluginBase): except Exception as exc: # Check for password error error_msg = str(exc) + # The error message from Superset is often a JSON string inside a string. + # We need to robustly detect the password requirement. + # Typical error: "Error importing dashboard: databases/PostgreSQL.yaml: {'_schema': ['Must provide a password for the database']}" + if "Must provide a password for the database" in error_msg: - # Extract database name (assuming format: "Must provide a password for the database 'PostgreSQL'") + # Extract database name + # Try to find "databases/DBNAME.yaml" pattern import re - match = re.search(r"database '([^']+)'", error_msg) - db_name = match.group(1) if match else "unknown" - - # Get task manager - from ..dependencies import get_task_manager - tm = get_task_manager() - task_id = params.get("_task_id") + db_name = "unknown" + match = re.search(r"databases/([^.]+)\.yaml", error_msg) + if match: + db_name = match.group(1) + else: + # Fallback: try to find 'database 'NAME'' pattern + match_alt = re.search(r"database '([^']+)'", error_msg) + if match_alt: + db_name = match_alt.group(1) + + logger.warning(f"[MigrationPlugin][Action] Detected missing password for database: {db_name}") if task_id: input_request = { @@ -251,6 +281,9 @@ class MigrationPlugin(PluginBase): logger.info(f"[MigrationPlugin][Action] Retrying import for {title} with provided passwords.") to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug, passwords=passwords) logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported after password injection.") + # Clear passwords from params after use for security + if "passwords" in task.params: + del task.params["passwords"] continue logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True) diff --git a/frontend/.svelte-kit/generated/server/internal.js b/frontend/.svelte-kit/generated/server/internal.js index c2ecf92..e4de4c2 100644 --- a/frontend/.svelte-kit/generated/server/internal.js +++ b/frontend/.svelte-kit/generated/server/internal.js @@ -24,7 +24,7 @@ export const options = { app: ({ head, body, assets, nonce, env }) => "\n\n\t
\n\t\t\n\t\t\n\t\t\n\t\t" + head + "\n\t\n\t\n\t\tLoading logs...
+ {:else if error} +{error}
+ {:else if logs.length === 0} +No logs available.
+ {:else} + {#each logs as log} +{JSON.stringify(log.context, null, 2)}
+ {JSON.stringify($selectedTask.params, null, 2)}
+ {JSON.stringify(log.context, null, 2)}
+ Loading environments...
- {:else if error} -Select a source environment to view dashboards.
+ {:else} + {#if loading} +Loading environments...
+ {:else if error} +Loading databases and suggestions...
- {:else if sourceDatabases.length > 0} -Select a source environment to view dashboards.
{/if}Loading databases and suggestions...
+ {:else if sourceDatabases.length > 0} +