188 lines
6.3 KiB
Python
188 lines
6.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Debug script to test the migration flow and identify where the issue occurs.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
from pathlib import Path
|
|
|
|
# Put this script's own directory at the front of sys.path so that the
# ``backend.src...`` imports performed by the checks resolve when the script
# is launched directly.
project_root = Path(__file__).parent
sys.path.insert(0, os.fspath(project_root))
|
|
|
|
def test_plugin_loader():
    """Check that the plugin loader can locate the migration plugin.

    Prints the plugin directory (``backend/src/plugins`` next to this
    script), its contents when it exists, every plugin config reported by
    the loader, and the details of the ``superset-migration`` plugin.

    Returns:
        bool: True when ``get_plugin("superset-migration")`` returned
        something other than None; False when it returned None or when any
        step raised (the error and traceback are printed).
    """
    print("=== Testing Plugin Loader ===")
    try:
        from backend.src.core.plugin_loader import PluginLoader
        from pathlib import Path

        plugins_path = Path(__file__).parent / "backend" / "src" / "plugins"
        print(f"Plugin directory: {plugins_path}")
        print(f"Directory exists: {plugins_path.exists()}")

        if plugins_path.exists():
            print(f"Files in plugin directory: {list(plugins_path.iterdir())}")

        loader = PluginLoader(plugin_dir=str(plugins_path))
        discovered = loader.get_all_plugin_configs()

        print(f"Found {len(discovered)} plugins:")
        for cfg in discovered:
            print(f" - {cfg.name} (ID: {cfg.id})")

        plugin = loader.get_plugin("superset-migration")
        if not plugin:
            print("✗ Migration plugin NOT found")
        else:
            print("✓ Migration plugin found")
            print(f" Name: {plugin.name}")
            print(f" Description: {plugin.description}")
            print(f" Schema: {plugin.get_schema()}")

        # The result is decided by identity with None, independently of the
        # truthiness test used for the report above.
        return plugin is not None

    except Exception as exc:
        # Debug-script boundary: report the failure and let the run continue.
        print(f"✗ Plugin loader test failed: {exc}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
def test_task_manager():
    """Check that the task manager can create a migration task.

    Builds a ``PluginLoader`` over ``backend/src/plugins`` next to this
    script, hands it to a ``TaskManager`` and runs ``create_task`` for the
    ``superset-migration`` plugin to completion with ``asyncio.run``.

    Returns:
        bool: True once the task was created and its id, status and plugin
        id were printed; False if anything raised (the error and traceback
        are printed).
    """
    print("\n=== Testing Task Manager ===")
    try:
        from backend.src.core.plugin_loader import PluginLoader
        from backend.src.core.task_manager import TaskManager
        from pathlib import Path
        import asyncio

        plugins_path = Path(__file__).parent / "backend" / "src" / "plugins"
        manager = TaskManager(PluginLoader(plugin_dir=str(plugins_path)))

        # Parameters for the task under test.
        params = dict(
            from_env="dev",
            to_env="prod",
            dashboard_regex=".*",
            replace_db_config=False,
        )

        print(f"Creating test task with params: {params}")
        created = asyncio.run(manager.create_task("superset-migration", params))
        print(f"✓ Task created successfully: {created.id}")
        print(f" Status: {created.status}")
        print(f" Plugin ID: {created.plugin_id}")

        return True

    except Exception as exc:
        # Debug-script boundary: report the failure and let the run continue.
        print(f"✗ Task manager test failed: {exc}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
def test_migration_endpoint():
    """Call the migration route handler directly and poll the created task.

    Builds a ``DashboardSelection``, resolves the config and task managers
    through the dependency helpers, awaits ``execute_migration`` on a
    dedicated event loop, then polls the returned task once per second for
    up to 20 iterations, printing its status and, once finished, its logs.

    Returns:
        bool: True only when the task reached ``"SUCCESS"`` within the
        polling window; False when it ended as ``"FAILED"``, when polling
        timed out, or when anything raised (the error and traceback are
        printed).
    """
    print("\n=== Testing Migration Endpoint ===")
    try:
        from backend.src.api.routes.migration import execute_migration
        from backend.src.models.dashboard import DashboardSelection
        from backend.src.dependencies import get_config_manager, get_task_manager

        import asyncio

        # Create a test selection
        selection = DashboardSelection(
            selected_ids=[1, 2, 3],
            source_env_id="ss2",
            target_env_id="ss1",
        )

        print(f"Testing migration with selection: {selection.dict()}")

        # FastAPI would normally inject these dependencies; for this direct
        # call they are resolved by hand.
        config_manager = get_config_manager()
        task_manager = get_task_manager()

        async def run_and_wait():
            result = await execute_migration(selection, config_manager, task_manager)
            print(f"✓ Migration endpoint test successful: {result}")

            task_id = result["task_id"]
            print(f"Waiting for task {task_id} to complete...")

            for i in range(20):
                await asyncio.sleep(1)
                task = task_manager.get_task(task_id)
                print(f" [{i}] Task status: {task.status}")
                if task.status in ("SUCCESS", "FAILED"):
                    print(f"Task finished with status: {task.status}")
                    for log in task.logs:
                        print(f" [{log.level}] {log.message}")
                    return task.status == "SUCCESS"

            # Formatted like the other status prints: string concatenation
            # would raise TypeError for a status that is not a plain str and
            # hide the timeout behind an unrelated traceback.
            print(f"Task timed out. Current status: {task_manager.get_task(task_id).status}")
            return False

        # The task manager must use the SAME loop that drives run_and_wait.
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            # NOTE(review): presumably TaskManager schedules its work on
            # ``self.loop`` - confirm against the TaskManager implementation.
            task_manager.loop = loop
            return loop.run_until_complete(run_and_wait())
        finally:
            loop.close()
            # Do not leave a closed loop installed as the thread's current
            # event loop (the same cleanup asyncio.run performs).
            asyncio.set_event_loop(None)

    except Exception as e:
        # Debug-script boundary: report the failure and let the run continue.
        print(f"✗ Migration endpoint test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
def main():
    """Run every check in order and print a pass/fail summary.

    Returns:
        bool: True when all checks passed, False otherwise.
    """
    banner = "=" * 50
    print("Migration Debug Test Script")
    print(banner)

    # The list literal is evaluated left to right, so the checks run in the
    # order: plugin loader, task manager, migration endpoint.
    outcomes = [
        ("Plugin Loader", test_plugin_loader()),
        ("Task Manager", test_task_manager()),
        ("Migration Endpoint", test_migration_endpoint()),
    ]

    # Summary
    print("\n" + banner)
    print("TEST RESULTS SUMMARY:")
    print(banner)

    for label, ok in outcomes:
        verdict = "PASS" if ok else "FAIL"
        print(f"{label}: {verdict}")

    passed = sum(1 for _, ok in outcomes if ok)
    total = len(outcomes)
    print(f"\nTotal: {passed}/{total} tests passed")

    all_passed = passed == total
    if not all_passed:
        print("✗ Some tests failed. Check the logs above for details.")
    else:
        print("✓ All tests passed! The migration system appears to be working correctly.")

    return all_passed
|
|
|
|
if __name__ == "__main__":
    # Exit status 0 when every check passed, 1 otherwise.
    sys.exit(int(not main()))