Files
ss-tools/test_migration_debug.py
2025-12-27 10:16:41 +03:00

188 lines
6.3 KiB
Python

#!/usr/bin/env python3
"""
Debug script to test the migration flow and identify where the issue occurs.
"""
import sys
import os
from pathlib import Path
# Make the directory containing this script importable so that the
# ``backend.src...`` imports used by the checks below can be found.
# ``resolve()`` turns a possibly relative ``__file__`` into an absolute path,
# so the sys.path entry does not depend on the current working directory.
project_root = Path(__file__).resolve().parent
sys.path.insert(0, str(project_root))
def test_plugin_loader():
    """Check that the plugin loader can find the migration plugin.

    Prints the plugin directory and its contents, every plugin config
    returned by ``PluginLoader.get_all_plugin_configs()``, and the name,
    description and schema of the ``superset-migration`` plugin.

    Returns:
        bool: True when ``get_plugin("superset-migration")`` returns
        something other than None. False when it returns None or when any
        exception (including a failed backend import) is raised; the
        exception and its traceback are printed.
    """
    print("=== Testing Plugin Loader ===")
    try:
        # Imported inside the try so an unimportable backend is reported as
        # a failed check rather than aborting the whole script.
        from backend.src.core.plugin_loader import PluginLoader

        plugin_dir = Path(__file__).parent / "backend" / "src" / "plugins"
        print(f"Plugin directory: {plugin_dir}")
        print(f"Directory exists: {plugin_dir.exists()}")
        if plugin_dir.exists():
            print(f"Files in plugin directory: {list(plugin_dir.iterdir())}")

        plugin_loader = PluginLoader(plugin_dir=str(plugin_dir))
        configs = plugin_loader.get_all_plugin_configs()
        print(f"Found {len(configs)} plugins:")
        for config in configs:
            print(f" - {config.name} (ID: {config.id})")

        migration_plugin = plugin_loader.get_plugin("superset-migration")
        # One flag drives both the printed verdict and the return value so
        # the two can never disagree.
        found = migration_plugin is not None
        if found:
            print("✓ Migration plugin found")
            print(f" Name: {migration_plugin.name}")
            print(f" Description: {migration_plugin.description}")
            print(f" Schema: {migration_plugin.get_schema()}")
        else:
            print("✗ Migration plugin NOT found")
        return found
    except Exception as e:
        print(f"✗ Plugin loader test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
def test_task_manager():
    """Check that the task manager can create a migration task.

    Builds a ``PluginLoader`` for the ``backend/src/plugins`` directory next
    to this script, wraps it in a ``TaskManager`` and awaits
    ``create_task("superset-migration", ...)`` with a fixed set of sample
    parameters, then prints the resulting task's id, status and plugin id.

    Returns:
        bool: True when the task is created without an exception. False
        when any exception (including a failed backend import) is raised;
        the exception and its traceback are printed.
    """
    print("\n=== Testing Task Manager ===")
    try:
        import asyncio

        # Imported inside the try so an unimportable backend is reported as
        # a failed check rather than aborting the whole script.
        from backend.src.core.plugin_loader import PluginLoader
        from backend.src.core.task_manager import TaskManager

        plugin_dir = Path(__file__).parent / "backend" / "src" / "plugins"
        plugin_loader = PluginLoader(plugin_dir=str(plugin_dir))
        task_manager = TaskManager(plugin_loader)

        # Sample parameters used only to exercise task creation.
        test_params = {
            "from_env": "dev",
            "to_env": "prod",
            "dashboard_regex": ".*",
            "replace_db_config": False,
        }
        print(f"Creating test task with params: {test_params}")

        # create_task is a coroutine; drive it to completion on a fresh loop.
        task = asyncio.run(task_manager.create_task("superset-migration", test_params))
        print(f"✓ Task created successfully: {task.id}")
        print(f" Status: {task.status}")
        print(f" Plugin ID: {task.plugin_id}")
        return True
    except Exception as e:
        print(f"✗ Task manager test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
def test_migration_endpoint():
    """Call the migration endpoint directly and wait for its task to finish.

    Builds a ``DashboardSelection``, obtains the config and task managers
    from the backend dependency helpers, calls ``execute_migration`` and then
    polls the created task once per second (up to 20 times) until its status
    is ``"SUCCESS"`` or ``"FAILED"``, printing the task logs at the end.

    Returns:
        bool: True only when the task finishes with status ``"SUCCESS"``.
        False when it finishes as ``"FAILED"``, does not finish within the
        polling window, or any exception (including a failed backend import)
        is raised; the exception and its traceback are printed.
    """
    print("\n=== Testing Migration Endpoint ===")
    try:
        import asyncio

        # Imported inside the try so an unimportable backend is reported as
        # a failed check rather than aborting the whole script.
        from backend.src.api.routes.migration import execute_migration
        from backend.src.models.dashboard import DashboardSelection
        from backend.src.dependencies import get_config_manager, get_task_manager

        # Create a test selection
        selection = DashboardSelection(
            selected_ids=[1, 2, 3],
            source_env_id="ss2",
            target_env_id="ss1",
        )
        print(f"Testing migration with selection: {selection.dict()}")

        # FastAPI would normally inject these dependencies; here the endpoint
        # function is called directly, so they are fetched by hand.
        config_manager = get_config_manager()
        task_manager = get_task_manager()

        # TaskManager must use the SAME loop that runs run_and_wait, so a
        # dedicated loop is created and handed to it.
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            task_manager.loop = loop

            async def run_and_wait():
                """Start the migration and poll its task until it finishes."""
                result = await execute_migration(selection, config_manager, task_manager)
                print(f"✓ Migration endpoint test successful: {result}")
                task_id = result["task_id"]
                print(f"Waiting for task {task_id} to complete...")
                for i in range(20):
                    await asyncio.sleep(1)
                    task = task_manager.get_task(task_id)
                    print(f" [{i}] Task status: {task.status}")
                    if task.status in ("SUCCESS", "FAILED"):
                        print(f"Task finished with status: {task.status}")
                        for log in task.logs:
                            print(f" [{log.level}] {log.message}")
                        return task.status == "SUCCESS"
                # Formatted (not concatenated) so a non-str status cannot
                # raise TypeError and mask the timeout report.
                print(f"Task timed out. Current status: {task_manager.get_task(task_id).status}")
                return False

            return loop.run_until_complete(run_and_wait())
        finally:
            loop.close()
            # Do not leave a closed loop installed as the current event loop.
            asyncio.set_event_loop(None)
    except Exception as e:
        print(f"✗ Migration endpoint test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
def main():
    """Run all checks in order and print a pass/fail summary.

    Runs the plugin loader, task manager and migration endpoint checks, then
    prints one PASS/FAIL line per check followed by the total.

    Returns:
        bool: True when every check returned a truthy result, else False.
    """
    print("Migration Debug Test Script")
    print("=" * 50)

    # Each entry is evaluated in order: plugin loader, task manager, endpoint.
    results = [
        ("Plugin Loader", test_plugin_loader()),
        ("Task Manager", test_task_manager()),
        ("Migration Endpoint", test_migration_endpoint()),
    ]

    # Summary
    print("\n" + "=" * 50)
    print("TEST RESULTS SUMMARY:")
    print("=" * 50)
    for test_name, result in results:
        status = "PASS" if result else "FAIL"
        print(f"{test_name}: {status}")

    passed = sum(1 for _, result in results if result)
    all_passed = passed == len(results)
    print(f"\nTotal: {passed}/{len(results)} tests passed")
    if all_passed:
        print("✓ All tests passed! The migration system appears to be working correctly.")
    else:
        print("✗ Some tests failed. Check the logs above for details.")
    return all_passed
# Script entry point: the process exit code mirrors the overall result
# (0 when every check passed, 1 otherwise).
if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)