#!/usr/bin/env python3 """ Debug script to test the migration flow and identify where the issue occurs. """ import sys import os from pathlib import Path # Add project root to sys.path project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) def test_plugin_loader(): """Test if the plugin loader can find the migration plugin.""" print("=== Testing Plugin Loader ===") try: from backend.src.core.plugin_loader import PluginLoader from pathlib import Path plugin_dir = Path(__file__).parent / "backend" / "src" / "plugins" print(f"Plugin directory: {plugin_dir}") print(f"Directory exists: {plugin_dir.exists()}") if plugin_dir.exists(): print(f"Files in plugin directory: {list(plugin_dir.iterdir())}") plugin_loader = PluginLoader(plugin_dir=str(plugin_dir)) configs = plugin_loader.get_all_plugin_configs() print(f"Found {len(configs)} plugins:") for config in configs: print(f" - {config.name} (ID: {config.id})") migration_plugin = plugin_loader.get_plugin("superset-migration") if migration_plugin: print("✓ Migration plugin found") print(f" Name: {migration_plugin.name}") print(f" Description: {migration_plugin.description}") print(f" Schema: {migration_plugin.get_schema()}") else: print("✗ Migration plugin NOT found") return migration_plugin is not None except Exception as e: print(f"✗ Plugin loader test failed: {e}") import traceback traceback.print_exc() return False def test_task_manager(): """Test if the task manager can create tasks.""" print("\n=== Testing Task Manager ===") try: from backend.src.core.plugin_loader import PluginLoader from backend.src.core.task_manager import TaskManager from pathlib import Path plugin_dir = Path(__file__).parent / "backend" / "src" / "plugins" plugin_loader = PluginLoader(plugin_dir=str(plugin_dir)) task_manager = TaskManager(plugin_loader) # Test task creation test_params = { "from_env": "dev", "to_env": "prod", "dashboard_regex": ".*", "replace_db_config": False } print(f"Creating test task with params: {test_params}") import asyncio task = asyncio.run(task_manager.create_task("superset-migration", test_params)) print(f"✓ Task created successfully: {task.id}") print(f" Status: {task.status}") print(f" Plugin ID: {task.plugin_id}") return True except Exception as e: print(f"✗ Task manager test failed: {e}") import traceback traceback.print_exc() return False def test_migration_endpoint(): """Test the migration endpoint directly.""" print("\n=== Testing Migration Endpoint ===") try: from backend.src.api.routes.migration import execute_migration from backend.src.models.dashboard import DashboardSelection from backend.src.dependencies import get_config_manager, get_task_manager # Create a test selection selection = DashboardSelection( selected_ids=[1, 2, 3], source_env_id="ss2", target_env_id="ss1" ) print(f"Testing migration with selection: {selection.dict()}") # This would normally be called by FastAPI with dependencies # For testing, we'll call it directly config_manager = get_config_manager() task_manager = get_task_manager() import asyncio # We need to ensure TaskManager uses the SAME loop as run_and_wait loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Re-initialize TaskManager with the new loop task_manager.loop = loop async def run_and_wait(): result = await execute_migration(selection, config_manager, task_manager) print(f"✓ Migration endpoint test successful: {result}") task_id = result["task_id"] print(f"Waiting for task {task_id} to complete...") for i in range(20): await asyncio.sleep(1) task = task_manager.get_task(task_id) print(f" [{i}] Task status: {task.status}") if task.status in ["SUCCESS", "FAILED"]: print(f"Task finished with status: {task.status}") for log in task.logs: print(f" [{log.level}] {log.message}") return task.status == "SUCCESS" print("Task timed out. Current status: " + task_manager.get_task(task_id).status) return False try: return loop.run_until_complete(run_and_wait()) finally: loop.close() except Exception as e: print(f"✗ Migration endpoint test failed: {e}") import traceback traceback.print_exc() return False def main(): """Run all tests.""" print("Migration Debug Test Script") print("=" * 50) results = [] # Test 1: Plugin Loader results.append(("Plugin Loader", test_plugin_loader())) # Test 2: Task Manager results.append(("Task Manager", test_task_manager())) # Test 3: Migration Endpoint results.append(("Migration Endpoint", test_migration_endpoint())) # Summary print("\n" + "=" * 50) print("TEST RESULTS SUMMARY:") print("=" * 50) passed = 0 for test_name, result in results: status = "PASS" if result else "FAIL" print(f"{test_name}: {status}") if result: passed += 1 print(f"\nTotal: {passed}/{len(results)} tests passed") if passed == len(results): print("✓ All tests passed! The migration system appears to be working correctly.") else: print("✗ Some tests failed. Check the logs above for details.") return passed == len(results) if __name__ == "__main__": success = main() sys.exit(0 if success else 1)