diff --git a/src/aleph/vm/controllers/configuration.py b/src/aleph/vm/controllers/configuration.py index d5a5e6a99..60c487baf 100644 --- a/src/aleph/vm/controllers/configuration.py +++ b/src/aleph/vm/controllers/configuration.py @@ -34,11 +34,13 @@ class QemuVMConfiguration(BaseModel): image_path: str monitor_socket_path: Path qmp_socket_path: Path + qga_socket_path: Path vcpu_count: int mem_size_mb: int interface_name: str | None = None host_volumes: list[QemuVMHostVolume] gpus: list[QemuGPU] + incoming_migration_port: int | None = None # Port for incoming migration mode class QemuConfidentialVMConfiguration(BaseModel): @@ -47,6 +49,7 @@ class QemuConfidentialVMConfiguration(BaseModel): image_path: str monitor_socket_path: Path qmp_socket_path: Path + qga_socket_path: Path vcpu_count: int mem_size_mb: int interface_name: str | None = None @@ -71,9 +74,30 @@ class Configuration(BaseModel): hypervisor: HypervisorType = HypervisorType.firecracker +def get_controller_configuration_path(vm_hash: str) -> Path: + """Get the path to the controller configuration file for a VM.""" + return Path(f"{settings.EXECUTION_ROOT}/{vm_hash}-controller.json") + + +def load_controller_configuration(vm_hash: str) -> Configuration | None: + """Load VM configuration from the controller service configuration file. 
+ + :param vm_hash: The VM hash identifying the configuration file + :return: The Configuration object, or None if the file doesn't exist + """ + config_file_path = get_controller_configuration_path(vm_hash) + + if not config_file_path.exists(): + logger.warning(f"Controller configuration file not found for {vm_hash}") + return None + + with config_file_path.open("r") as f: + return Configuration.model_validate_json(f.read()) + + def save_controller_configuration(vm_hash: str, configuration: Configuration) -> Path: """Save VM configuration to be used by the controller service""" - config_file_path = Path(f"{settings.EXECUTION_ROOT}/{vm_hash}-controller.json") + config_file_path = get_controller_configuration_path(vm_hash) with config_file_path.open("w") as controller_config_file: controller_config_file.write( configuration.model_dump_json( diff --git a/src/aleph/vm/controllers/firecracker/executable.py b/src/aleph/vm/controllers/firecracker/executable.py index 9e96e44b8..a17e23082 100644 --- a/src/aleph/vm/controllers/firecracker/executable.py +++ b/src/aleph/vm/controllers/firecracker/executable.py @@ -309,8 +309,11 @@ async def wait_for_init(self) -> None: May be empty.""" return - async def configure(self): - """Configure the VM by saving controller service configuration""" + async def configure(self, incoming_migration_port: int | None = None): + """Configure the VM by saving controller service configuration. + + Note: incoming_migration_port is ignored for Firecracker VMs as they don't support live migration. 
+ """ if self.persistent: firecracker_config_path = await self.fvm.save_configuration_file(self._firecracker_config) vm_configuration = VMConfiguration( diff --git a/src/aleph/vm/controllers/interface.py b/src/aleph/vm/controllers/interface.py index bff265a2b..e3a0c6ecb 100644 --- a/src/aleph/vm/controllers/interface.py +++ b/src/aleph/vm/controllers/interface.py @@ -72,8 +72,12 @@ async def wait_for_init(self) -> None: May be empty.""" pass - async def configure(self) -> None: - """Configuration done after the VM process is started""" + async def configure(self, incoming_migration_port: int | None = None) -> None: + """Configuration done after the VM process is started. + + :param incoming_migration_port: Optional port for incoming migration (QEMU only). + When set, the VM is configured to wait for migration data instead of booting normally. + """ raise NotImplementedError() async def load_configuration(self) -> None: diff --git a/src/aleph/vm/controllers/qemu/client.py b/src/aleph/vm/controllers/qemu/client.py index c98899d5b..50dd89fe8 100644 --- a/src/aleph/vm/controllers/qemu/client.py +++ b/src/aleph/vm/controllers/qemu/client.py @@ -1,7 +1,65 @@ +import base64 +import time +from enum import Enum + import qmp +import yaml from pydantic import BaseModel +class VmRunStatus(str, Enum): + """QEMU VM run status values.""" + + DEBUG = "debug" + FINISH_MIGRATE = "finish-migrate" + INMIGRATE = "inmigrate" + INTERNAL_ERROR = "internal-error" + IO_ERROR = "io-error" + PAUSED = "paused" + POSTMIGRATE = "postmigrate" + PRELAUNCH = "prelaunch" + RESTORE_VM = "restore-vm" + RUNNING = "running" + SAVE_VM = "save-vm" + SHUTDOWN = "shutdown" + SUSPENDED = "suspended" + WATCHDOG = "watchdog" + GUEST_PANICKED = "guest-panicked" + COLO = "colo" + + +class VmStatus(BaseModel): + """Response from QEMU query-status command.""" + + status: VmRunStatus + running: bool + singlestep: bool = False + + @property + def is_running(self) -> bool: + """Check if VM is actively running.""" + 
return self.running and self.status == VmRunStatus.RUNNING + + @property + def is_migrating(self) -> bool: + """Check if VM is in migration-related state.""" + return self.status in ( + VmRunStatus.INMIGRATE, + VmRunStatus.POSTMIGRATE, + VmRunStatus.FINISH_MIGRATE, + ) + + @property + def is_error(self) -> bool: + """Check if VM is in an error state.""" + return self.status in ( + VmRunStatus.INTERNAL_ERROR, + VmRunStatus.IO_ERROR, + VmRunStatus.GUEST_PANICKED, + VmRunStatus.SHUTDOWN, + ) + + class VmSevInfo(BaseModel): enabled: bool api_major: int @@ -18,11 +76,19 @@ def __init__(self, vm): if not (vm.qmp_socket_path and vm.qmp_socket_path.exists()): msg = "VM is not running" raise Exception(msg) - client = qmp.QEMUMonitorProtocol(str(vm.qmp_socket_path)) - client.connect() - # qmp_client = qmp.QEMUMonitorProtocol(address=("localhost", vm.qmp_port)) - self.qmp_client = client + qmp_client = qmp.QEMUMonitorProtocol(str(vm.qmp_socket_path)) + qmp_client.connect() + self.qmp_client = qmp_client + + # QGA (QEMU Guest Agent) uses a separate communication channel over virtio-serial. + # The wire protocol is JSON-based and compatible with the QMP library. + if vm.qga_socket_path and vm.qga_socket_path.exists(): + qga_client = qmp.QEMUMonitorProtocol(str(vm.qga_socket_path)) + qga_client.connect() + self.qga_client = qga_client + else: + self.qga_client = None def __enter__(self): return self @@ -32,6 +98,8 @@ def __exit__(self, exc_type, exc_val, exc_tb): def close(self) -> None: self.qmp_client.close() + if self.qga_client: + self.qga_client.close() def query_sev_info(self) -> VmSevInfo: caps = self.qmp_client.command("query-sev") @@ -68,9 +136,148 @@ def continue_execution(self) -> None: """ self.qmp_client.command("cont") - def query_status(self) -> None: + def query_status(self) -> VmStatus: """ Get running status. 
+ + :return: VmStatus with status, running, and singlestep fields + """ + result = self.qmp_client.command("query-status") + return VmStatus.model_validate(result) + + def migrate(self, destination_uri: str, bandwidth_limit_mbps: int | None = None) -> None: + """ + Start live migration with block and incremental mode. + + Uses: migrate -d -b -i (detach, block, incremental) + uri format: tcp:HOST:PORT + + :param destination_uri: The destination URI (e.g., "tcp:192.168.1.100:4444") + :param bandwidth_limit_mbps: Optional bandwidth limit in MB/s + """ + if bandwidth_limit_mbps: + self.qmp_client.command("migrate_set_speed", value=bandwidth_limit_mbps * 1024 * 1024) + + # Enable migration capabilities for better performance and reliability + capabilities = [ + {"capability": "xbzrle", "state": True}, # Compression + {"capability": "auto-converge", "state": True}, # Auto-converge for busy VMs + ] + self.qmp_client.command("migrate-set-capabilities", capabilities=capabilities) + + # Start migration with block migration (copies disk + memory) + # Note: 'blk' and 'inc' are deprecated in newer QEMU, but we use them for compatibility + self.qmp_client.command("migrate", uri=destination_uri, blk=True, inc=True) + + def query_migrate(self) -> dict: + """ + Query migration status. + + Returns dict with keys: + - status: 'none', 'setup', 'cancelling', 'cancelled', 'active', 'postcopy-active', + 'postcopy-paused', 'postcopy-recover', 'completed', 'failed', 'colo', 'pre-switchover' + - total-time: Total elapsed time (ms) + - downtime: Downtime (ms) for completed migrations + - expected-downtime: Expected downtime + - ram: RAM migration stats (transferred, remaining, total, etc.) 
+ - disk: Disk migration stats + """ + return self.qmp_client.command("query-migrate") + + def migrate_cancel(self) -> None: + """Cancel ongoing migration.""" + self.qmp_client.command("migrate_cancel") + + def _get_qga_client(self) -> qmp.QEMUMonitorProtocol: + """Get the QGA client, raising an error if not available.""" + if not self.qga_client: + msg = "QEMU Guest Agent socket is not available" + raise Exception(msg) + return self.qga_client + + def guest_exec(self, command: str, args: list[str] | None = None, capture_output: bool = True) -> dict: + """ + Execute a command in the guest via qemu-guest-agent. + + :param command: The command/path to execute (e.g., "/bin/bash") + :param args: Arguments to pass to the command + :param capture_output: Whether to capture stdout/stderr + :return: Dict with 'pid' key for the started process + """ + qga = self._get_qga_client() + exec_args = {"path": command, "capture-output": capture_output} + if args: + exec_args["arg"] = args + return qga.command("guest-exec", **exec_args) + + def guest_exec_status(self, pid: int) -> dict: + """ + Get the status of a guest-exec command. + + :param pid: The PID returned by guest_exec + :return: Dict with 'exited', 'exitcode', 'out-data', 'err-data' keys + """ + qga = self._get_qga_client() + return qga.command("guest-exec-status", pid=pid) + + def reconfigure_guest_network( + self, + new_ip: str, + gateway: str, + nameservers: list[str], + interface: str = "eth0", + ) -> dict: + """ + Reconfigure guest network via qemu-guest-agent after migration. + + This updates the netplan configuration inside the guest VM with the new + network settings and applies them. 
+ + :param new_ip: New IP address with CIDR notation (e.g., "10.0.0.5/24") + :param gateway: Gateway IP address (e.g., "10.0.0.1") + :param nameservers: List of DNS server IPs (e.g., ["8.8.8.8", "8.8.4.4"]) + :param interface: Network interface name (default: "eth0") + :return: Dict with 'pid' key for the started process + """ + network_config = { + "network": { + "version": 2, + "ethernets": { + interface: { + "addresses": [new_ip], + "routes": [{"to": "default", "via": gateway}], + "nameservers": {"addresses": nameservers}, + }, + }, + }, + } + netplan_yaml = yaml.safe_dump(network_config, default_flow_style=False, sort_keys=False) + + # Create a script that writes the netplan config and applies it + # Use base64 encoding to avoid escaping issues + config_b64 = base64.b64encode(netplan_yaml.encode()).decode() + + script = f""" + echo '{config_b64}' | base64 -d > /etc/netplan/50-cloud-init.yaml + netplan apply + """ + + return self.guest_exec("/bin/bash", ["-c", script]) + + def wait_for_guest_agent(self, timeout_seconds: int = 60) -> bool: + """ + Wait for the qemu-guest-agent to become available. 
+ + :param timeout_seconds: Maximum time to wait + :return: True if agent is available, False if timeout """ - # {'status': 'prelaunch', 'singlestep': False, 'running': False} - return self.qmp_client.command("query-status") + qga = self._get_qga_client() + start_time = time.monotonic() + while time.monotonic() - start_time < timeout_seconds: + try: + # Try to ping the guest agent + qga.command("guest-ping") + return True + except Exception: + time.sleep(1) + return False diff --git a/src/aleph/vm/controllers/qemu/cloudinit.py b/src/aleph/vm/controllers/qemu/cloudinit.py index f212b3e6e..2e879d5d7 100644 --- a/src/aleph/vm/controllers/qemu/cloudinit.py +++ b/src/aleph/vm/controllers/qemu/cloudinit.py @@ -32,7 +32,7 @@ def get_hostname_from_hash(vm_hash: ItemHash) -> str: return base64.b32encode(item_hash_binary).decode().strip("=").lower() -def encode_user_data(hostname, ssh_authorized_keys, has_gpu: bool = False) -> bytes: +def encode_user_data(hostname, ssh_authorized_keys, has_gpu: bool = False, install_guest_agent: bool = True) -> bytes: """Creates user data configuration file for cloud-init tool""" config: dict[str, str | bool | list[str] | list[list[str]]] = { "hostname": hostname, @@ -40,6 +40,7 @@ def encode_user_data(hostname, ssh_authorized_keys, has_gpu: bool = False) -> by "ssh_pwauth": False, "ssh_authorized_keys": ssh_authorized_keys, "resize_rootfs": True, + "package_update": True, } # Add kernel boot parameters for GPU instances to speed up PCI enumeration @@ -65,6 +66,10 @@ def encode_user_data(hostname, ssh_authorized_keys, has_gpu: bool = False) -> by ], ] + if install_guest_agent: + config["packages"] = ["qemu-guest-agent"] + config["runcmd"] = ["systemctl start qemu-guest-agent.service"] + cloud_config_header = "#cloud-config\n" config_output = yaml.safe_dump(config, default_flow_style=False, sort_keys=False) content = (cloud_config_header + config_output).encode() @@ -116,13 +121,14 @@ async def create_cloud_init_drive_image( route, 
ssh_authorized_keys, has_gpu: bool = False, + install_guest_agent: bool = True, ): with ( NamedTemporaryFile() as user_data_config_file, NamedTemporaryFile() as network_config_file, NamedTemporaryFile() as metadata_config_file, ): - user_data = encode_user_data(hostname, ssh_authorized_keys, has_gpu=has_gpu) + user_data = encode_user_data(hostname, ssh_authorized_keys, has_gpu=has_gpu, install_guest_agent=install_guest_agent) user_data_config_file.write(user_data) user_data_config_file.flush() network_config = create_network_file(ip, ipv6, ipv6_gateway, nameservers, route) @@ -145,7 +151,7 @@ async def create_cloud_init_drive_image( class CloudInitMixin(AlephVmControllerInterface): - async def _create_cloud_init_drive(self) -> Drive: + async def _create_cloud_init_drive(self, install_guest_agent: bool = True) -> Drive: """Creates the cloud-init volume to configure and set up the VM""" ssh_authorized_keys = self.resources.message_content.authorized_keys or [] if settings.USE_DEVELOPER_SSH_KEYS: @@ -175,6 +181,7 @@ async def _create_cloud_init_drive(self) -> Drive: route, ssh_authorized_keys, has_gpu=has_gpu, + install_guest_agent=install_guest_agent, ) return Drive( diff --git a/src/aleph/vm/controllers/qemu/instance.py b/src/aleph/vm/controllers/qemu/instance.py index 054478b42..e6bfa30d0 100644 --- a/src/aleph/vm/controllers/qemu/instance.py +++ b/src/aleph/vm/controllers/qemu/instance.py @@ -168,10 +168,18 @@ def to_dict(self): async def setup(self): pass - async def configure(self): - """Configure the VM by saving controller service configuration""" + async def configure(self, incoming_migration_port: int | None = None): + """Configure the VM by saving controller service configuration. + + :param incoming_migration_port: Optional port for incoming migration. When set, + the VM configuration will include the -incoming flag to wait for migration + data from a source host instead of booting normally. 
+ """ + if incoming_migration_port is not None: + logger.debug(f"Configuring {self} for incoming migration on port {incoming_migration_port}") + else: + logger.debug(f"Making Qemu configuration: {self}") - logger.debug(f"Making Qemu configuration: {self} ") monitor_socket_path = settings.EXECUTION_ROOT / (str(self.vm_hash) + "-monitor.socket") cloud_init_drive = await self._create_cloud_init_drive() @@ -192,6 +200,7 @@ async def configure(self): image_path=image_path, monitor_socket_path=monitor_socket_path, qmp_socket_path=self.qmp_socket_path, + qga_socket_path=self.qga_socket_path, vcpu_count=vcpu_count, mem_size_mb=mem_size_mb, interface_name=interface_name, @@ -204,6 +213,7 @@ async def configure(self): for volume in self.resources.volumes ], gpus=[QemuGPU(pci_host=gpu.pci_host, supports_x_vga=gpu.supports_x_vga) for gpu in self.resources.gpus], + incoming_migration_port=incoming_migration_port, ) configuration = Configuration( @@ -227,6 +237,10 @@ def save_controller_configuration(self): def qmp_socket_path(self) -> Path: return settings.EXECUTION_ROOT / f"{self.vm_hash}-qmp.socket" + @property + def qga_socket_path(self) -> Path: + return settings.EXECUTION_ROOT / f"{self.vm_hash}-qga.socket" + async def start(self): # Start via systemd not here raise NotImplementedError() @@ -243,3 +257,9 @@ async def teardown(self): if self.tap_interface: await self.tap_interface.delete() await self.stop_guest_api() + + def get_ip(self) -> str | None: + """Get the guest IP address.""" + if self.tap_interface: + return str(self.tap_interface.guest_ip) + return None diff --git a/src/aleph/vm/controllers/qemu_confidential/instance.py b/src/aleph/vm/controllers/qemu_confidential/instance.py index 37986b10c..1ccfe72f5 100644 --- a/src/aleph/vm/controllers/qemu_confidential/instance.py +++ b/src/aleph/vm/controllers/qemu_confidential/instance.py @@ -83,13 +83,15 @@ def __init__( async def setup(self): pass - async def configure(self): - """Configure the VM by saving controller 
service configuration""" + async def configure(self, incoming_migration_port: int | None = None): + """Configure the VM by saving controller service configuration. - logger.debug(f"Making Qemu configuration: {self} ") + Note: incoming_migration_port is ignored for confidential VMs as they don't support live migration. + """ + logger.debug(f"Making Qemu configuration: {self}") monitor_socket_path = settings.EXECUTION_ROOT / (str(self.vm_id) + "-monitor.socket") - cloud_init_drive = await self._create_cloud_init_drive() + cloud_init_drive = await self._create_cloud_init_drive(install_guest_agent=False) image_path = str(self.resources.rootfs_path) firmware_path = str(self.resources.firmware_path) diff --git a/src/aleph/vm/hypervisors/qemu/qemuvm.py b/src/aleph/vm/hypervisors/qemu/qemuvm.py index 2758fada7..e80eaba8f 100644 --- a/src/aleph/vm/hypervisors/qemu/qemuvm.py +++ b/src/aleph/vm/hypervisors/qemu/qemuvm.py @@ -23,6 +23,7 @@ class QemuVM: image_path: str monitor_socket_path: Path qmp_socket_path: Path + qga_socket_path: Path vcpu_count: int mem_size_mb: int interface_name: str @@ -31,6 +32,7 @@ class QemuVM: gpus: list[QemuGPU] journal_stdout: TextIO | None journal_stderr: TextIO | None + incoming_migration_port: int | None = None def __repr__(self) -> str: if self.qemu_process: @@ -44,6 +46,7 @@ def __init__(self, vm_hash, config: QemuVMConfiguration): self.image_path = config.image_path self.monitor_socket_path = config.monitor_socket_path self.qmp_socket_path = config.qmp_socket_path + self.qga_socket_path = config.qga_socket_path self.vcpu_count = config.vcpu_count self.mem_size_mb = config.mem_size_mb self.interface_name = config.interface_name @@ -57,6 +60,7 @@ def __init__(self, vm_hash, config: QemuVMConfiguration): for volume in config.host_volumes ] self.gpus = config.gpus + self.incoming_migration_port = config.incoming_migration_port @property def _journal_stdout_name(self) -> str: @@ -89,7 +93,7 @@ async def start( "-smp", str(self.vcpu_count), 
"-drive", - f"file={self.image_path},media=disk,if=virtio", + f"file={self.image_path},media=disk,if=virtio,file.locking=off", # To debug you can pass gtk or curses instead "-display", "none", @@ -98,6 +102,13 @@ async def start( # Listen for commands on this socket "-monitor", f"unix:{self.monitor_socket_path},server,nowait", + # Qemu Guest Agent communication channel options + "-device", + "virtio-serial", + "-chardev", + f"socket,path={self.qga_socket_path},server=on,wait=off,id=qga0", + "-device", + "virtserialport,chardev=qga0,name=org.qemu.guest_agent.0", # Listen for commands on this socket (QMP protocol in json). Supervisor use it to send shutdown or start # command "-qmp", @@ -118,12 +129,31 @@ async def start( ] if self.interface_name: # script=no, downscript=no tell qemu not to try to set up the network itself - args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={self.interface_name},script=no,downscript=no"] + # Split network devices in two to be able to use rombar=0 to allow live migration between different hosts + args += [ + "-device", + "virtio-net-pci,netdev=net0,rombar=0", + "-netdev", + f"tap,id=net0,ifname={self.interface_name},script=no,downscript=no", + ] if self.cloud_init_drive_path: args += ["-cdrom", f"{self.cloud_init_drive_path}"] + # Add incoming migration flag if specified (destination mode) + if self.incoming_migration_port is not None: + args += ["-incoming", f"tcp:0.0.0.0:{self.incoming_migration_port}"] + args += self._get_host_volumes_args() + + # Add CPU configuration for migration compatibility + # Use migratable=on to ensure CPU features are compatible across different hosts + # Note: GPU mode uses its own -cpu flag in _get_gpu_args() with host-phys-bits-limit + # Note: Specify -machine type pc-i440fx-6.2 (Qemu from Ubuntu 22.04) + # as less as possible to allow migration between different hosts + if not self.gpus: + args += ["-machine", "pc-i440fx-6.2", "-cpu", "host,migratable=on"] + args += self._get_gpu_args() 
print(*args) @@ -149,6 +179,9 @@ def _get_host_volumes_args(self): return args def _get_gpu_args(self): + if not self.gpus: + return [] + args = [ # Use host-phys-bits-limit argument for GPU support. TODO: Investigate how to get the correct bits size "-cpu", diff --git a/src/aleph/vm/models.py b/src/aleph/vm/models.py index 6de50a934..a7fab662d 100644 --- a/src/aleph/vm/models.py +++ b/src/aleph/vm/models.py @@ -6,6 +6,8 @@ from collections.abc import Callable, Coroutine from dataclasses import dataclass from datetime import datetime, timezone +from enum import Enum +from typing import Optional from aleph_message.models import ( ExecutableContent, @@ -51,6 +53,17 @@ logger = logging.getLogger(__name__) +class MigrationState(str, Enum): + """State of VM migration process.""" + + NONE = "none" # No migration in progress + PREPARING = "preparing" # Destination preparing to receive + WAITING = "waiting" # Destination waiting for data + MIGRATING = "migrating" # Source sending data + COMPLETED = "completed" # Migration completed successfully + FAILED = "failed" # Migration failed + + @dataclass class VmExecutionTimes: defined_at: datetime @@ -100,6 +113,10 @@ class VmExecution: mapped_ports: dict[int, dict] # Port redirect to the VM record: ExecutionRecord | None = None + # Migration state tracking + migration_state: MigrationState = MigrationState.NONE + migration_port: int | None = None # Port used for migration (on destination) + async def fetch_port_redirect_config_and_setup(self): if not self.is_instance: return @@ -410,7 +427,7 @@ def create( return vm - async def start(self): + async def start(self, incoming_migration_port: int | None = None): assert self.vm, "The VM attribute has to be set before calling start()" self.times.starting_at = datetime.now(tz=timezone.utc) @@ -421,7 +438,11 @@ async def start(self): # for persistent and instances we will use SystemD manager if not self.persistent: await self.vm.start() - await self.vm.configure() + + # Configure the 
VM, passing migration port if specified + # When migration port is set, QEMU starts with -incoming flag + await self.vm.configure(incoming_migration_port) + await self.vm.start_guest_api() # Start VM and snapshots automatically @@ -434,7 +455,11 @@ async def start(self): await self.wait_for_init() await self.vm.load_configuration() self.times.started_at = datetime.now(tz=timezone.utc) - else: + elif incoming_migration_port is None: + # Only wait for boot on non-migration VMs. + # Migration-receiving VMs are in "inmigrate" state and won't respond to ping + # until migration completes. The migration finalization monitor handles + # waiting for migration completion instead. self.init_task = asyncio.create_task(self.non_blocking_wait_for_boot()) if self.vm and self.vm.support_snapshot and self.snapshot_manager: diff --git a/src/aleph/vm/orchestrator/supervisor.py b/src/aleph/vm/orchestrator/supervisor.py index 4be9fe0b4..e651c762a 100644 --- a/src/aleph/vm/orchestrator/supervisor.py +++ b/src/aleph/vm/orchestrator/supervisor.py @@ -50,6 +50,7 @@ status_public_config, update_allocations, ) +from .views.migration import allocate_migration, migration_start from .views.operator import ( operate_confidential_initialize, operate_confidential_inject_secret, @@ -170,6 +171,9 @@ def setup_webapp(pool: VmPool | None): web.post("/control/allocations", update_allocations), web.post("/control/network/recreate", recreate_network), web.post("/control/proxy/regenerate", regenerate_proxy), + # Migration endpoints (scheduler-only, uses ALLOCATION_TOKEN_HASH auth) + web.post("/control/migrate", allocate_migration), + web.post("/control/machine/{ref}/migration/start", migration_start), # Raise an HTTP Error 404 if attempting to access an unknown URL within these paths. 
web.get("/about/{suffix:.*}", http_not_found), web.get("/control/{suffix:.*}", http_not_found), diff --git a/src/aleph/vm/orchestrator/views/migration.py b/src/aleph/vm/orchestrator/views/migration.py new file mode 100644 index 000000000..3eded65fc --- /dev/null +++ b/src/aleph/vm/orchestrator/views/migration.py @@ -0,0 +1,621 @@ +""" +Migration endpoints for live VM migration between CRN hosts. + +These endpoints are called by the scheduler to coordinate VM migration: +1. POST /control/migrate - Prepare destination to receive migration +2. POST /control/machine/{ref}/migration/start - Start migration from source +""" + +import asyncio +import logging +import time +from datetime import datetime, timezone +from http import HTTPStatus + +import pydantic +from aiohttp import web +from aleph_message.models import ItemHash, MessageType +from aleph_message.models.execution.environment import HypervisorType +from pydantic import BaseModel + +from aleph.vm.conf import settings +from aleph.vm.controllers.configuration import ( + QemuVMConfiguration, + load_controller_configuration, + save_controller_configuration, +) +from aleph.vm.controllers.qemu.client import QemuVmClient +from aleph.vm.models import MigrationState, VmExecution +from aleph.vm.orchestrator.messages import load_updated_message +from aleph.vm.pool import VmPool +from aleph.vm.utils import cors_allow_all, create_task_log_exceptions, dumps_for_json + +from . import authenticate_api_request +from .operator import get_execution_or_404, get_itemhash_or_400 + +logger = logging.getLogger(__name__) + +# Lock to prevent concurrent migration operations +migration_lock: asyncio.Lock | None = None + +# Track background migration finalization tasks +_migration_finalization_tasks: dict[ItemHash, asyncio.Task] = {} + + +def _clear_incoming_migration_port(vm_hash: ItemHash) -> None: + """ + Clear the incoming_migration_port from the controller configuration file. 
+ + After a successful migration, the destination VM should no longer have the + -incoming flag set. This ensures that if the VM service restarts, it will + boot normally instead of waiting for migration data. + + :param vm_hash: The VM hash identifying the configuration file + """ + try: + configuration = load_controller_configuration(vm_hash) + if configuration is None: + logger.warning(f"Controller configuration file not found for {vm_hash}, skipping incoming port cleanup") + return + + # Check if this is a QEMU VM configuration with incoming_migration_port + vm_config = configuration.vm_configuration + if isinstance(vm_config, QemuVMConfiguration) and vm_config.incoming_migration_port is not None: + vm_config.incoming_migration_port = None + save_controller_configuration(vm_hash, configuration) + logger.info(f"Updated controller configuration for {vm_hash}: cleared incoming_migration_port") + else: + logger.debug(f"No incoming_migration_port found in configuration for {vm_hash}") + + except Exception as e: + logger.error(f"Failed to clear incoming_migration_port from configuration for {vm_hash}: {e}") + + +class AllocateMigrationRequest(BaseModel): + """Request body for POST /control/migrate.""" + + vm_hash: str + migration_port: int + + +class MigrationStartRequest(BaseModel): + """Request body for POST /control/machine/{ref}/migration/start.""" + + destination_host: str + destination_port: int + bandwidth_limit_mbps: int | None = None + + +@cors_allow_all +async def allocate_migration(request: web.Request) -> web.Response: + """ + POST /control/migrate + + Prepare destination host to receive migrating VM. + Called by the scheduler before initiating migration from source. 
+ + Auth: X-Auth-Signature header (scheduler token, same as /control/allocations) + + Body: {"vm_hash": "abc123...", "migration_port": 4444} + + This endpoint: + - Validates scheduler authentication (ALLOCATION_TOKEN_HASH) + - Fetches VM message from Aleph network + - Validates this is a QEMU instance + - Creates destination disk image (sparse QCOW2) + - Sets up network (TAP interface, firewall rules) + - Prepares QEMU configuration with -incoming tcp:0.0.0.0:PORT flag + + Returns: {"status": "ready", "migration_host": "...", "migration_port": 4444, "vm_hash": "..."} + """ + if not authenticate_api_request(request): + return web.HTTPUnauthorized(text="Authentication token received is invalid") + + global migration_lock + if migration_lock is None: + migration_lock = asyncio.Lock() + + try: + data = await request.json() + params = AllocateMigrationRequest.model_validate(data) + except pydantic.ValidationError as error: + return web.json_response(data=error.json(), status=HTTPStatus.BAD_REQUEST) + + pool: VmPool = request.app["vm_pool"] + vm_hash = ItemHash(params.vm_hash) + + async with migration_lock: + # Check if VM already exists on this host + existing = pool.executions.get(vm_hash) + if existing and existing.is_running: + return web.json_response( + {"status": "error", "error": "VM already running on this host", "vm_hash": str(vm_hash)}, + status=HTTPStatus.CONFLICT, + ) + + try: + # Fetch VM message from Aleph network + message, original_message = await load_updated_message(vm_hash) + + # Validate it's an instance + if message.type != MessageType.instance: + return web.json_response( + {"status": "error", "error": "Message is not an instance", "vm_hash": str(vm_hash)}, + status=HTTPStatus.BAD_REQUEST, + ) + + # Validate it's a QEMU instance + hypervisor = message.content.environment.hypervisor or HypervisorType.firecracker + if hypervisor != HypervisorType.qemu: + return web.json_response( + { + "status": "error", + "error": "Live migration only supported for 
QEMU instances", + "vm_hash": str(vm_hash), + }, + status=HTTPStatus.BAD_REQUEST, + ) + + # Reject confidential VMs - they cannot be live-migrated + if message.content.environment.trusted_execution is not None: + return web.json_response( + { + "status": "error", + "error": "Live migration is not supported for confidential VMs", + "vm_hash": str(vm_hash), + }, + status=HTTPStatus.BAD_REQUEST, + ) + + # Create VM prepared for incoming migration + execution = await pool.create_a_vm( + vm_hash=vm_hash, + message=message.content, + original=original_message.content, + persistent=True, + incoming_migration_port=params.migration_port, + ) + + # Get the host IP from network configuration + migration_host = pool.network.host_ipv4 if pool.network else "0.0.0.0" + + logger.info(f"Prepared VM {vm_hash} for incoming migration on {migration_host}:{params.migration_port}") + + # Start background task to monitor migration completion and finalize + _start_migration_finalization_task(execution, pool) + + return web.json_response( + { + "status": "ready", + "migration_host": migration_host, + "migration_port": params.migration_port, + "vm_hash": str(vm_hash), + }, + status=HTTPStatus.OK, + dumps=dumps_for_json, + ) + + except ValueError as error: + logger.error(f"Failed to prepare migration for {vm_hash}: {error}") + return web.json_response( + {"status": "error", "error": str(error), "vm_hash": str(vm_hash)}, + status=HTTPStatus.BAD_REQUEST, + ) + except Exception as error: + logger.exception(f"Failed to prepare migration for {vm_hash}: {error}") + return web.json_response( + {"status": "error", "error": f"Failed to prepare migration: {error}", "vm_hash": str(vm_hash)}, + status=HTTPStatus.INTERNAL_SERVER_ERROR, + ) + + +@cors_allow_all +async def migration_start(request: web.Request) -> web.Response: + """ + POST /control/machine/{ref}/migration/start + + Start migration from source to destination. + Called by the scheduler after destination is prepared. 
+ + Auth: X-Auth-Signature header (scheduler token, same as /control/allocations) + + Body: {"destination_host": "...", "destination_port": 4444, "bandwidth_limit_mbps": 100} + + This endpoint: + - Validates scheduler authentication (ALLOCATION_TOKEN_HASH) + - Validates running VM + - Sends QMP migrate command (with block migration) + - Polls query-migrate until completion + - After completion: auto-cleanup source VM + + Returns: {"status": "completed", "total_time_ms": ..., "downtime_ms": ..., "transferred_bytes": ...} + """ + if not authenticate_api_request(request): + return web.HTTPUnauthorized(text="Authentication token received is invalid") + + vm_hash = get_itemhash_or_400(request.match_info) + + try: + data = await request.json() + params = MigrationStartRequest.model_validate(data) + except pydantic.ValidationError as error: + return web.json_response(data=error.json(), status=HTTPStatus.BAD_REQUEST) + + pool: VmPool = request.app["vm_pool"] + execution: VmExecution = get_execution_or_404(vm_hash, pool) + + # Validate VM is running + if not execution.is_running: + return web.json_response( + {"status": "error", "error": "VM is not running", "vm_hash": str(vm_hash)}, + status=HTTPStatus.BAD_REQUEST, + ) + + # Validate it's a QEMU instance + if execution.hypervisor != HypervisorType.qemu: + return web.json_response( + {"status": "error", "error": "Live migration only supported for QEMU instances", "vm_hash": str(vm_hash)}, + status=HTTPStatus.BAD_REQUEST, + ) + + # Reject confidential VMs - they cannot be live-migrated + if execution.is_confidential: + return web.json_response( + { + "status": "error", + "error": "Live migration is not supported for confidential VMs", + "vm_hash": str(vm_hash), + }, + status=HTTPStatus.BAD_REQUEST, + ) + + # Check that VM object exists + if not execution.vm: + return web.json_response( + {"status": "error", "error": "VM not properly initialized", "vm_hash": str(vm_hash)}, + status=HTTPStatus.BAD_REQUEST, + ) + + try: + # 
Update migration state + execution.migration_state = MigrationState.MIGRATING + + # Connect to QMP + vm_client = QemuVmClient(execution.vm) + + # Build destination URI + destination_uri = f"tcp:{params.destination_host}:{params.destination_port}" + logger.info(f"Starting migration of {vm_hash} to {destination_uri}") + + # Start migration + start_time = time.monotonic() + vm_client.migrate(destination_uri, bandwidth_limit_mbps=params.bandwidth_limit_mbps) + + # Poll for migration completion + migration_result = await _wait_for_migration_completion(vm_client, vm_hash) + + if migration_result["status"] == "completed": + total_time_ms = int((time.monotonic() - start_time) * 1000) + + # Extract stats from migration result + downtime_ms = migration_result.get("downtime", 0) + transferred_bytes = migration_result.get("ram", {}).get("transferred", 0) + if "disk" in migration_result: + transferred_bytes += migration_result["disk"].get("transferred", 0) + + logger.info( + f"Migration of {vm_hash} completed in {total_time_ms}ms, " + f"downtime: {downtime_ms}ms, transferred: {transferred_bytes} bytes" + ) + + # Cleanup source VM after successful migration + execution.migration_state = MigrationState.COMPLETED + await _cleanup_source_vm(pool, execution) + + return web.json_response( + { + "status": "completed", + "vm_hash": str(vm_hash), + "total_time_ms": total_time_ms, + "downtime_ms": downtime_ms, + "transferred_bytes": transferred_bytes, + }, + status=HTTPStatus.OK, + dumps=dumps_for_json, + ) + else: + # Migration failed + execution.migration_state = MigrationState.FAILED + error_msg = migration_result.get("error-desc", "Unknown error") + logger.error(f"Migration of {vm_hash} failed: {error_msg}") + return web.json_response( + {"status": "error", "error": f"Migration failed: {error_msg}", "vm_hash": str(vm_hash)}, + status=HTTPStatus.INTERNAL_SERVER_ERROR, + ) + + except Exception as error: + execution.migration_state = MigrationState.FAILED + logger.exception(f"Migration 
of {vm_hash} failed: {error}") + return web.json_response( + {"status": "error", "error": f"Migration failed: {error}", "vm_hash": str(vm_hash)}, + status=HTTPStatus.INTERNAL_SERVER_ERROR, + ) + + +async def _wait_for_migration_completion( + vm_client: QemuVmClient, + vm_hash: ItemHash, + poll_interval: float = 1.0, + timeout: float = 3600.0, +) -> dict: + """ + Poll migration status until completion or timeout. + + :param vm_client: QMP client for the VM + :param vm_hash: VM hash for logging + :param poll_interval: Seconds between status checks + :param timeout: Maximum time to wait in seconds + :return: Final migration status dict + """ + start_time = time.monotonic() + + while True: + elapsed = time.monotonic() - start_time + if elapsed > timeout: + logger.warning(f"Migration of {vm_hash} timed out after {timeout}s") + vm_client.migrate_cancel() + return {"status": "failed", "error-desc": f"Migration timed out after {timeout}s"} + + status = vm_client.query_migrate() + migration_status = status.get("status", "unknown") + + logger.debug(f"Migration status for {vm_hash}: {migration_status}") + + if migration_status == "completed": + return status + elif migration_status in ("failed", "cancelled"): + return status + elif migration_status in ("active", "setup", "pre-switchover", "postcopy-active"): + # Migration in progress, log progress + if "ram" in status: + ram = status["ram"] + transferred = ram.get("transferred", 0) + total = ram.get("total", 0) + if total > 0: + progress = (transferred / total) * 100 + logger.debug(f"Migration progress for {vm_hash}: {progress:.1f}%") + + await asyncio.sleep(poll_interval) + + +async def _cleanup_source_vm(pool: VmPool, execution: VmExecution) -> None: + """ + Cleanup source VM after successful migration. + + This stops the VM and removes it from the pool. + The VM on the source has already been paused by QEMU during migration. 
+ + :param pool: The VM pool + :param execution: The VM execution to cleanup + """ + vm_hash = execution.vm_hash + logger.info(f"Cleaning up source VM {vm_hash} after migration") + + try: + # Stop the VM (this also handles network teardown) + await pool.stop_vm(vm_hash) + + # Remove from pool + pool.forget_vm(vm_hash) + + logger.info(f"Source VM {vm_hash} cleanup completed") + except Exception as error: + logger.error(f"Error cleaning up source VM {vm_hash}: {error}") + # Don't raise - migration was successful, cleanup failure is non-fatal + + +def _start_migration_finalization_task(execution: VmExecution, pool: VmPool) -> None: + """ + Start a background task to monitor migration completion and finalize on destination. + + This task monitors the VM status until it transitions from "inmigrate" to "running", + then reconfigures the guest network with the new IP address. + + :param execution: The VM execution waiting for migration + :param pool: The VM pool + """ + vm_hash = execution.vm_hash + + # Cancel any existing task for this VM + if vm_hash in _migration_finalization_tasks: + _migration_finalization_tasks[vm_hash].cancel() + + task = create_task_log_exceptions( + _finalize_migration_on_destination(execution, pool), + name=f"migration-finalize-{vm_hash}", + ) + _migration_finalization_tasks[vm_hash] = task + + +async def _finalize_migration_on_destination( + execution: VmExecution, + pool: VmPool, + poll_interval: float = 10.0, + timeout: float = 3600.0, +) -> None: + """ + Monitor migration completion on destination and reconfigure guest network. + + This background task: + 1. Waits for the VM to transition from "inmigrate" to "running" status + 2. Waits for the guest agent to become available + 3. 
Reconfigures the guest network with the new IP address + + :param execution: The VM execution waiting for migration + :param pool: The VM pool + :param poll_interval: Seconds between status checks + :param timeout: Maximum time to wait in seconds + """ + vm_hash = execution.vm_hash + logger.info(f"Starting migration finalization monitor for {vm_hash}") + + start_time = time.monotonic() + + try: + # Wait for QMP socket to be available + while not execution.vm or not execution.vm.qmp_socket_path.exists(): + if time.monotonic() - start_time > timeout: + logger.error(f"Timeout waiting for QMP socket for {vm_hash}") + execution.migration_state = MigrationState.FAILED + return + await asyncio.sleep(poll_interval) + + # Monitor VM status until migration completes + # We need to check both: + # 1. VM status is "running" (CPU is executing) + # 2. Migration status is "completed" (all data including disk blocks transferred) + migration_complete = False + while not migration_complete: + elapsed = time.monotonic() - start_time + if elapsed > timeout: + logger.error(f"Migration finalization timed out for {vm_hash} after {timeout}s") + execution.migration_state = MigrationState.FAILED + return + + try: + vm_client = QemuVmClient(execution.vm) + status = vm_client.query_status() + + logger.debug(f"Destination VM {vm_hash} status: {status.status.value}, running: {status.running}") + + if status.is_error: + logger.error(f"VM {vm_hash} in error state: {status.status.value}") + execution.migration_state = MigrationState.FAILED + vm_client.close() + return + + if status.is_running: + # VM is running, but we also need to verify migration data transfer is complete + # This is important for block migration where disk data may still be transferring + migrate_info = vm_client.query_migrate() + migrate_status = migrate_info.get("status", "unknown") + + logger.debug(f"Destination VM {vm_hash} migration status: {migrate_status}") + + if migrate_status == "completed": + logger.info(f"Migration 
completed for {vm_hash}, VM is running and all data transferred") + vm_client.close() + migration_complete = True + break + elif migrate_status in ("active", "postcopy-active", "pre-switchover"): + # Migration still in progress (disk blocks still transferring) + logger.debug(f"VM {vm_hash} running but migration still active: {migrate_status}") + elif migrate_status in ("failed", "cancelled"): + logger.error(f"Migration failed for {vm_hash}: {migrate_status}") + execution.migration_state = MigrationState.FAILED + vm_client.close() + return + elif migrate_status == "none": + # No migration info available, VM is running normally + # This can happen if the destination doesn't track incoming migration status + logger.info(f"Migration completed for {vm_hash}, VM is running (no migration info)") + vm_client.close() + migration_complete = True + break + + vm_client.close() + except Exception as e: + logger.debug(f"Could not query VM status for {vm_hash}: {e}") + + await asyncio.sleep(poll_interval) + + # Migration completed, now reconfigure guest network + await _reconfigure_guest_network(execution) + + # Clear the incoming_migration_port from the controller configuration + # This ensures the VM will boot normally if the service restarts + _clear_incoming_migration_port(vm_hash) + + # Update migration state and mark as started + execution.migration_state = MigrationState.COMPLETED + execution.times.started_at = datetime.now(tz=timezone.utc) + logger.info(f"Migration finalization completed for {vm_hash}") + + except asyncio.CancelledError: + logger.info(f"Migration finalization task cancelled for {vm_hash}") + raise + except Exception as error: + logger.exception(f"Error during migration finalization for {vm_hash}: {error}") + execution.migration_state = MigrationState.FAILED + finally: + # Clean up task reference + if vm_hash in _migration_finalization_tasks: + del _migration_finalization_tasks[vm_hash] + + +async def _reconfigure_guest_network( + execution: VmExecution, + 
guest_agent_timeout: int = 120, +) -> None: + """ + Reconfigure guest network after migration completes. + + This connects to the guest via qemu-guest-agent and updates the netplan + configuration with the new IP address assigned on this host. + + :param execution: The VM execution + :param guest_agent_timeout: Timeout for guest agent availability + """ + vm_hash = execution.vm_hash + + if not execution.vm or not execution.vm.tap_interface: + logger.warning(f"Cannot reconfigure network for {vm_hash}: no tap interface") + return + + tap = execution.vm.tap_interface + new_ip = tap.guest_ip.with_prefixlen # e.g., "10.0.0.5/24" + gateway = str(tap.host_ip.ip) # e.g., "10.0.0.1" + nameservers = list(settings.DNS_NAMESERVERS) if hasattr(settings, "DNS_NAMESERVERS") else ["8.8.8.8", "8.8.4.4"] + + logger.info(f"Reconfiguring guest network for {vm_hash}: IP={new_ip}, gateway={gateway}") + + try: + vm_client = QemuVmClient(execution.vm) + + # Wait for guest agent to be available + logger.debug(f"Waiting for guest agent on {vm_hash}") + if not vm_client.wait_for_guest_agent(timeout_seconds=guest_agent_timeout): + logger.warning(f"Guest agent not available for {vm_hash}, skipping network reconfiguration") + vm_client.close() + return + + # Reconfigure the network + result = vm_client.reconfigure_guest_network( + new_ip=str(new_ip), + gateway=gateway, + nameservers=nameservers, + ) + + logger.info(f"Network reconfiguration initiated for {vm_hash}, pid={result.get('pid')}") + + # Wait a moment for the command to complete + await asyncio.sleep(2) + + # Check if the command completed successfully + if "pid" in result: + try: + status = vm_client.guest_exec_status(result["pid"]) + if status.get("exited") and status.get("exitcode", -1) == 0: + logger.info(f"Network reconfiguration successful for {vm_hash}") + elif status.get("exited"): + logger.warning( + f"Network reconfiguration may have failed for {vm_hash}, " + f"exit code: {status.get('exitcode')}" + ) + except Exception as e: 
+ logger.debug(f"Could not get guest-exec status for {vm_hash}: {e}") + + vm_client.close() + + except Exception as error: + logger.error(f"Failed to reconfigure guest network for {vm_hash}: {error}") diff --git a/src/aleph/vm/orchestrator/views/operator.py b/src/aleph/vm/orchestrator/views/operator.py index 9601c37e5..97a80ea87 100644 --- a/src/aleph/vm/orchestrator/views/operator.py +++ b/src/aleph/vm/orchestrator/views/operator.py @@ -460,10 +460,10 @@ async def operate_confidential_inject_secret(request: web.Request, authenticated vm_client.continue_execution() status = vm_client.query_status() - print(status["status"] != "running") + print(status.status != "running") return web.json_response( - data={"status": status}, + data={"status": status.model_dump()}, status=200, dumps=dumps_for_json, ) diff --git a/src/aleph/vm/pool.py b/src/aleph/vm/pool.py index ed99e7877..7bfb66d9d 100644 --- a/src/aleph/vm/pool.py +++ b/src/aleph/vm/pool.py @@ -34,7 +34,7 @@ from aleph.vm.vm_type import VmType from .haproxy import fetch_list_and_update -from .models import ExecutableContent, VmExecution +from .models import ExecutableContent, MigrationState, VmExecution from .network.firewall import setup_nftables_for_vm logger = logging.getLogger(__name__) @@ -137,19 +137,46 @@ def calculate_available_disk(self) -> int: return available_space async def create_a_vm( - self, vm_hash: ItemHash, message: ExecutableContent, original: ExecutableContent, persistent: bool + self, + vm_hash: ItemHash, + message: ExecutableContent, + original: ExecutableContent, + persistent: bool, + incoming_migration_port: int | None = None, ) -> VmExecution: - """Create a new VM from an Aleph function or instance message.""" + """Create a new VM from an Aleph function or instance message. 
+ + :param vm_hash: The hash of the VM message + :param message: The executable content of the VM + :param original: The original executable content + :param persistent: Whether the VM should be persistent + :param incoming_migration_port: Optional port for incoming migration. When set, + the VM is prepared to receive migration data from a source host with QEMU's + -incoming flag. The VM will wait for migration data instead of booting normally. + :return: The VmExecution object + """ + is_migration = incoming_migration_port is not None + async with self.creation_lock: - # Check if an execution is already present for this VM, then return it. - # Do not `await` in this section. - current_execution = self.get_running_vm(vm_hash) + # Check if an execution is already present for this VM + current_execution = self.executions.get(vm_hash) if current_execution: - return current_execution + if is_migration: + # For migration, we must not have an existing VM + msg = f"VM {vm_hash} already exists on this host" + raise ValueError(msg) + # For normal creation, return existing execution if running + if current_execution.is_running and not current_execution.is_stopping: + current_execution.cancel_expiration() + return current_execution # Check if there are sufficient resources available before creating the VM check_sufficient_resources(self.calculate_available_disk(), message) + # For migration, force persistent=True since migrated VMs are always persistent instances + if is_migration: + persistent = True + execution = VmExecution( vm_hash=vm_hash, message=message, @@ -158,16 +185,24 @@ async def create_a_vm( systemd_manager=self.systemd_manager, persistent=persistent, ) + + # Set migration state if this is a migration + if is_migration: + execution.migration_state = MigrationState.PREPARING + execution.migration_port = incoming_migration_port + self.executions[vm_hash] = execution resources = set() try: + # GPU handling (not used for migration as it's QEMU instances only) if 
message.requirements and message.requirements.gpu: # Ensure we have the necessary GPU for the user by reserving them resources = self.find_resources_available_for_user(message, message.address) # First assign Host GPUs from the available execution.prepare_gpus(list(resources)) # Prepare VM general Resources and also the GPUs + await execution.prepare() vm_id = self.get_unique_vm_id() @@ -179,20 +214,30 @@ async def create_a_vm( if self.network.interface_exists(vm_id): await tap_interface.delete() await self.network.create_tap(vm_id, tap_interface) - else: tap_interface = None execution.create(vm_id=vm_id, tap_interface=tap_interface) - await execution.start() + + # Start the VM, passing migration port if this is a migration + await execution.start(incoming_migration_port) + if execution.is_instance: await execution.fetch_port_redirect_config_and_setup() - # clear the user reservations + # Update migration state after successful start + if is_migration: + execution.migration_state = MigrationState.WAITING + logger.info(f"VM {vm_hash} prepared for incoming migration on port {incoming_migration_port}") + + # Clear the user reservations for resource in resources: if resource in self.reservations: del self.reservations[resource] + except Exception: + if is_migration: + execution.migration_state = MigrationState.FAILED if execution.is_instance: # ensure the VM is removed from the pool on creation error await execution.removed_all_ports_redirection() diff --git a/tests/supervisor/test_qemu_client.py b/tests/supervisor/test_qemu_client.py new file mode 100644 index 000000000..0a81805a9 --- /dev/null +++ b/tests/supervisor/test_qemu_client.py @@ -0,0 +1,416 @@ +"""Tests for the QEMU VM client and status models.""" + +import pytest + +from aleph.vm.controllers.qemu.client import VmRunStatus, VmStatus + + +class TestVmRunStatus: + """Tests for the VmRunStatus enum.""" + + def test_all_status_values_exist(self): + """Test that all expected QEMU status values are defined.""" + 
expected_statuses = [ + "debug", + "finish-migrate", + "inmigrate", + "internal-error", + "io-error", + "paused", + "postmigrate", + "prelaunch", + "restore-vm", + "running", + "save-vm", + "shutdown", + "suspended", + "watchdog", + "guest-panicked", + "colo", + ] + for status in expected_statuses: + assert status in [s.value for s in VmRunStatus] + + def test_status_string_values(self): + """Test that status values match QEMU's expected strings.""" + assert VmRunStatus.RUNNING.value == "running" + assert VmRunStatus.INMIGRATE.value == "inmigrate" + assert VmRunStatus.PAUSED.value == "paused" + assert VmRunStatus.PRELAUNCH.value == "prelaunch" + + +class TestVmStatus: + """Tests for the VmStatus model.""" + + def test_create_running_status(self): + """Test creating a running VM status.""" + status = VmStatus(status=VmRunStatus.RUNNING, running=True, singlestep=False) + assert status.status == VmRunStatus.RUNNING + assert status.running is True + assert status.singlestep is False + + def test_create_from_dict(self): + """Test creating VmStatus from a dict (like QMP response).""" + qmp_response = {"status": "running", "running": True, "singlestep": False} + status = VmStatus.model_validate(qmp_response) + assert status.status == VmRunStatus.RUNNING + assert status.running is True + + def test_create_inmigrate_status(self): + """Test creating an inmigrate VM status.""" + qmp_response = {"status": "inmigrate", "running": False, "singlestep": False} + status = VmStatus.model_validate(qmp_response) + assert status.status == VmRunStatus.INMIGRATE + assert status.running is False + + def test_is_running_property(self): + """Test the is_running property.""" + # VM is running + running_status = VmStatus(status=VmRunStatus.RUNNING, running=True) + assert running_status.is_running is True + + # VM has running status but running=False (shouldn't happen, but test anyway) + weird_status = VmStatus(status=VmRunStatus.RUNNING, running=False) + assert weird_status.is_running is 
False + + # VM is paused + paused_status = VmStatus(status=VmRunStatus.PAUSED, running=False) + assert paused_status.is_running is False + + def test_is_migrating_property(self): + """Test the is_migrating property.""" + # Incoming migration + inmigrate_status = VmStatus(status=VmRunStatus.INMIGRATE, running=False) + assert inmigrate_status.is_migrating is True + + # Post migration + postmigrate_status = VmStatus(status=VmRunStatus.POSTMIGRATE, running=False) + assert postmigrate_status.is_migrating is True + + # Finish migration + finish_status = VmStatus(status=VmRunStatus.FINISH_MIGRATE, running=False) + assert finish_status.is_migrating is True + + # Running (not migrating) + running_status = VmStatus(status=VmRunStatus.RUNNING, running=True) + assert running_status.is_migrating is False + + def test_is_error_property(self): + """Test the is_error property.""" + # Internal error + error_status = VmStatus(status=VmRunStatus.INTERNAL_ERROR, running=False) + assert error_status.is_error is True + + # IO error + io_error_status = VmStatus(status=VmRunStatus.IO_ERROR, running=False) + assert io_error_status.is_error is True + + # Guest panicked + panic_status = VmStatus(status=VmRunStatus.GUEST_PANICKED, running=False) + assert panic_status.is_error is True + + # Shutdown + shutdown_status = VmStatus(status=VmRunStatus.SHUTDOWN, running=False) + assert shutdown_status.is_error is True + + # Running (not an error) + running_status = VmStatus(status=VmRunStatus.RUNNING, running=True) + assert running_status.is_error is False + + def test_singlestep_default(self): + """Test that singlestep defaults to False.""" + qmp_response = {"status": "running", "running": True} + status = VmStatus.model_validate(qmp_response) + assert status.singlestep is False + + def test_model_dump(self): + """Test that model_dump works for JSON serialization.""" + status = VmStatus(status=VmRunStatus.RUNNING, running=True, singlestep=False) + dumped = status.model_dump() + assert 
dumped["status"] == VmRunStatus.RUNNING + assert dumped["running"] is True + assert dumped["singlestep"] is False + + +def _make_mock_vm(mocker): + """Create a mock VM with both QMP and QGA socket paths.""" + mock_vm = mocker.Mock() + mock_vm.qmp_socket_path = mocker.Mock() + mock_vm.qmp_socket_path.exists.return_value = True + mock_vm.qga_socket_path = mocker.Mock() + mock_vm.qga_socket_path.exists.return_value = True + return mock_vm + + +class TestQemuVmClientMocked: + """Tests for QemuVmClient methods using mocks.""" + + def test_migrate_builds_correct_uri(self, mocker): + """Test that migrate method builds the correct destination URI.""" + mock_qmp = mocker.Mock() + mock_qga = mocker.Mock() + mock_vm = _make_mock_vm(mocker) + + mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + side_effect=[mock_qmp, mock_qga], + ) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + client.migrate("tcp:192.168.1.100:4444") + + # Verify migrate was called on QMP client with correct parameters + mock_qmp.command.assert_any_call("migrate-set-capabilities", capabilities=mocker.ANY) + mock_qmp.command.assert_any_call("migrate", uri="tcp:192.168.1.100:4444", blk=True, inc=True) + + def test_migrate_with_bandwidth_limit(self, mocker): + """Test that migrate sets bandwidth limit when specified.""" + mock_qmp = mocker.Mock() + mock_qga = mocker.Mock() + mock_vm = _make_mock_vm(mocker) + + mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + side_effect=[mock_qmp, mock_qga], + ) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + client.migrate("tcp:192.168.1.100:4444", bandwidth_limit_mbps=100) + + # Verify bandwidth limit was set on QMP client (100 MB/s = 100 * 1024 * 1024 bytes/s) + mock_qmp.command.assert_any_call("migrate_set_speed", value=100 * 1024 * 1024) + + def test_query_migrate_returns_dict(self, mocker): + """Test that 
query_migrate returns the migration status dict.""" + mock_qmp = mocker.Mock() + mock_qmp.command.return_value = { + "status": "active", + "ram": {"transferred": 1000000, "total": 5000000}, + } + mock_qga = mocker.Mock() + mock_vm = _make_mock_vm(mocker) + + mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + side_effect=[mock_qmp, mock_qga], + ) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + result = client.query_migrate() + + assert result["status"] == "active" + assert result["ram"]["transferred"] == 1000000 + + def test_query_status_returns_vm_status(self, mocker): + """Test that query_status returns a VmStatus object.""" + mock_qmp = mocker.Mock() + mock_qmp.command.return_value = {"status": "running", "running": True, "singlestep": False} + mock_qga = mocker.Mock() + mock_vm = _make_mock_vm(mocker) + + mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + side_effect=[mock_qmp, mock_qga], + ) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + status = client.query_status() + + assert isinstance(status, VmStatus) + assert status.status == VmRunStatus.RUNNING + assert status.is_running is True + + def test_migrate_cancel(self, mocker): + """Test that migrate_cancel calls the correct QMP command.""" + mock_qmp = mocker.Mock() + mock_qga = mocker.Mock() + mock_vm = _make_mock_vm(mocker) + + mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + side_effect=[mock_qmp, mock_qga], + ) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + client.migrate_cancel() + + mock_qmp.command.assert_called_with("migrate_cancel") + + def test_guest_exec(self, mocker): + """Test that guest_exec calls the correct QGA command.""" + mock_qmp = mocker.Mock() + mock_qga = mocker.Mock() + mock_qga.command.return_value = {"pid": 12345} + mock_vm = _make_mock_vm(mocker) + 
+ mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + side_effect=[mock_qmp, mock_qga], + ) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + result = client.guest_exec("/bin/bash", ["-c", "echo test"]) + + assert result["pid"] == 12345 + # Verify the command was sent through QGA, not QMP + mock_qga.command.assert_called_with( + "guest-exec", path="/bin/bash", arg=["-c", "echo test"], **{"capture-output": True} + ) + mock_qmp.command.assert_not_called() + + def test_guest_exec_status(self, mocker): + """Test that guest_exec_status returns command status via QGA.""" + mock_qmp = mocker.Mock() + mock_qga = mocker.Mock() + mock_qga.command.return_value = {"exited": True, "exitcode": 0} + mock_vm = _make_mock_vm(mocker) + + mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + side_effect=[mock_qmp, mock_qga], + ) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + result = client.guest_exec_status(12345) + + assert result["exited"] is True + assert result["exitcode"] == 0 + # Verify the command was sent through QGA, not QMP + mock_qga.command.assert_called_with("guest-exec-status", pid=12345) + mock_qmp.command.assert_not_called() + + def test_reconfigure_guest_network(self, mocker): + """Test that reconfigure_guest_network creates correct netplan config via QGA.""" + mock_qmp = mocker.Mock() + mock_qga = mocker.Mock() + mock_qga.command.return_value = {"pid": 12345} + mock_vm = _make_mock_vm(mocker) + + mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + side_effect=[mock_qmp, mock_qga], + ) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + result = client.reconfigure_guest_network( + new_ip="10.0.0.5/24", + gateway="10.0.0.1", + nameservers=["8.8.8.8", "8.8.4.4"], + ) + + assert result["pid"] == 12345 + # Verify guest-exec was called on QGA with 
bash + call_args = mock_qga.command.call_args + assert call_args[0][0] == "guest-exec" + assert call_args[1]["path"] == "/bin/bash" + mock_qmp.command.assert_not_called() + + def test_wait_for_guest_agent_success(self, mocker): + """Test wait_for_guest_agent returns True when agent responds via QGA.""" + mock_qmp = mocker.Mock() + mock_qga = mocker.Mock() + mock_qga.command.return_value = {} # guest-ping returns empty dict + mock_vm = _make_mock_vm(mocker) + + mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + side_effect=[mock_qmp, mock_qga], + ) + mocker.patch("aleph.vm.controllers.qemu.client.time.sleep") + mocker.patch("aleph.vm.controllers.qemu.client.time.monotonic", side_effect=[0, 1]) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + result = client.wait_for_guest_agent(timeout_seconds=10) + + assert result is True + mock_qga.command.assert_called_with("guest-ping") + + def test_wait_for_guest_agent_timeout(self, mocker): + """Test wait_for_guest_agent returns False on timeout.""" + mock_qmp = mocker.Mock() + mock_qga = mocker.Mock() + mock_qga.command.side_effect = Exception("Guest agent not available") + mock_vm = _make_mock_vm(mocker) + + mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + side_effect=[mock_qmp, mock_qga], + ) + mocker.patch("aleph.vm.controllers.qemu.client.time.sleep") + # Simulate time passing + mocker.patch("aleph.vm.controllers.qemu.client.time.monotonic", side_effect=[0, 5, 11]) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + result = client.wait_for_guest_agent(timeout_seconds=10) + + assert result is False + + def test_client_raises_on_missing_socket(self, mocker): + """Test that client raises exception when QMP socket doesn't exist.""" + mock_vm = mocker.Mock() + mock_vm.qmp_socket_path = mocker.Mock() + mock_vm.qmp_socket_path.exists.return_value = False + + from 
aleph.vm.controllers.qemu.client import QemuVmClient + + with pytest.raises(Exception, match="VM is not running"): + QemuVmClient(mock_vm) + + def test_guest_exec_raises_when_qga_unavailable(self, mocker): + """Test that guest_exec raises when QGA socket is not available.""" + mock_qmp = mocker.Mock() + mock_vm = mocker.Mock() + mock_vm.qmp_socket_path = mocker.Mock() + mock_vm.qmp_socket_path.exists.return_value = True + mock_vm.qga_socket_path = mocker.Mock() + mock_vm.qga_socket_path.exists.return_value = False + + mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + return_value=mock_qmp, + ) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + with pytest.raises(Exception, match="QEMU Guest Agent socket is not available"): + client.guest_exec("/bin/bash", ["-c", "echo test"]) + + def test_close_closes_both_clients(self, mocker): + """Test that close() closes both QMP and QGA connections.""" + mock_qmp = mocker.Mock() + mock_qga = mocker.Mock() + mock_vm = _make_mock_vm(mocker) + + mocker.patch( + "aleph.vm.controllers.qemu.client.qmp.QEMUMonitorProtocol", + side_effect=[mock_qmp, mock_qga], + ) + + from aleph.vm.controllers.qemu.client import QemuVmClient + + client = QemuVmClient(mock_vm) + client.close() + + mock_qmp.close.assert_called_once() + mock_qga.close.assert_called_once() diff --git a/tests/supervisor/views/test_migration.py b/tests/supervisor/views/test_migration.py new file mode 100644 index 000000000..1c0fe11f5 --- /dev/null +++ b/tests/supervisor/views/test_migration.py @@ -0,0 +1,561 @@ +"""Tests for the migration views and endpoints.""" + +import asyncio +from http import HTTPStatus +from unittest import mock + +import pytest +from aiohttp.test_utils import TestClient +from aleph_message.models import ItemHash +from aleph_message.models.execution.environment import HypervisorType + +from aleph.vm.conf import settings +from aleph.vm.models import MigrationState +from 
aleph.vm.orchestrator.supervisor import setup_webapp +from aleph.vm.storage import get_message + + +@pytest.fixture +def mock_vm_hash(): + """Return a valid VM hash for testing.""" + return ItemHash(settings.FAKE_INSTANCE_ID) + + +@pytest.fixture +def mock_scheduler_auth(mocker): + """Mock the scheduler authentication to always pass.""" + mocker.patch( + "aleph.vm.orchestrator.views.migration.authenticate_api_request", + return_value=True, + ) + + +class TestAllocateMigrationEndpoint: + """Tests for POST /control/migrate endpoint.""" + + @pytest.mark.asyncio + async def test_allocate_migration_unauthorized(self, aiohttp_client, mocker): + """Test that unauthorized requests are rejected.""" + mocker.patch( + "aleph.vm.orchestrator.views.migration.authenticate_api_request", + return_value=False, + ) + + pool = mocker.Mock(executions={}) + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + "/control/migrate", + json={"vm_hash": "a" * 64, "migration_port": 4444}, + ) + + assert response.status == HTTPStatus.UNAUTHORIZED + + @pytest.mark.asyncio + async def test_allocate_migration_invalid_request(self, aiohttp_client, mocker, mock_scheduler_auth): + """Test that invalid request body is rejected.""" + pool = mocker.Mock(executions={}) + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + # Missing migration_port + response = await client.post( + "/control/migrate", + json={"vm_hash": "a" * 64}, + ) + + assert response.status == HTTPStatus.BAD_REQUEST + + @pytest.mark.asyncio + async def test_allocate_migration_vm_already_exists( + self, aiohttp_client, mocker, mock_scheduler_auth, mock_vm_hash + ): + """Test that allocate fails if VM already exists on host.""" + pool = mocker.Mock( + executions={ + mock_vm_hash: mocker.Mock(is_running=True), + } + ) + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + 
"/control/migrate", + json={"vm_hash": str(mock_vm_hash), "migration_port": 4444}, + ) + + assert response.status == HTTPStatus.CONFLICT + data = await response.json() + assert data["status"] == "error" + assert "already running" in data["error"] + + @pytest.mark.asyncio + async def test_allocate_migration_not_instance(self, aiohttp_client, mocker, mock_scheduler_auth, mock_vm_hash): + """Test that allocate fails for non-instance messages.""" + from aleph_message.models import MessageType + + mock_message = mocker.Mock() + mock_message.type = MessageType.program # Not an instance + + mocker.patch( + "aleph.vm.orchestrator.views.migration.load_updated_message", + return_value=(mock_message, mock_message), + ) + + pool = mocker.Mock(executions={}) + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + "/control/migrate", + json={"vm_hash": str(mock_vm_hash), "migration_port": 4444}, + ) + + assert response.status == HTTPStatus.BAD_REQUEST + data = await response.json() + assert "not an instance" in data["error"] + + @pytest.mark.asyncio + async def test_allocate_migration_not_qemu(self, aiohttp_client, mocker, mock_scheduler_auth, mock_vm_hash): + """Test that allocate fails for non-QEMU instances.""" + from aleph_message.models import MessageType + + mock_message = mocker.Mock() + mock_message.type = MessageType.instance + mock_message.content = mocker.Mock() + mock_message.content.environment = mocker.Mock() + mock_message.content.environment.hypervisor = HypervisorType.firecracker + + mocker.patch( + "aleph.vm.orchestrator.views.migration.load_updated_message", + return_value=(mock_message, mock_message), + ) + + pool = mocker.Mock(executions={}) + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + "/control/migrate", + json={"vm_hash": str(mock_vm_hash), "migration_port": 4444}, + ) + + assert response.status == HTTPStatus.BAD_REQUEST + 
data = await response.json() + assert "QEMU" in data["error"] + + @pytest.mark.asyncio + async def test_allocate_migration_success(self, aiohttp_client, mocker, mock_scheduler_auth, mock_vm_hash): + """Test successful migration allocation.""" + from aleph_message.models import MessageType + + mock_message = mocker.Mock() + mock_message.type = MessageType.instance + mock_message.content = mocker.Mock() + mock_message.content.environment = mocker.Mock() + mock_message.content.environment.hypervisor = HypervisorType.qemu + + mocker.patch( + "aleph.vm.orchestrator.views.migration.load_updated_message", + return_value=(mock_message, mock_message), + ) + + mock_execution = mocker.Mock() + mock_execution.vm_hash = mock_vm_hash + + pool = mocker.AsyncMock() + pool.executions = {} + pool.network = mocker.Mock(host_ipv4="192.168.1.100") + pool.create_a_vm = mocker.AsyncMock(return_value=mock_execution) + + # Mock the finalization task + mocker.patch( + "aleph.vm.orchestrator.views.migration._start_migration_finalization_task", + ) + + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + "/control/migrate", + json={"vm_hash": str(mock_vm_hash), "migration_port": 4444}, + ) + + assert response.status == HTTPStatus.OK + data = await response.json() + assert data["status"] == "ready" + assert data["migration_port"] == 4444 + assert data["migration_host"] == "192.168.1.100" + + # Verify create_a_vm was called with correct parameters + pool.create_a_vm.assert_called_once() + call_kwargs = pool.create_a_vm.call_args[1] + assert call_kwargs["incoming_migration_port"] == 4444 + assert call_kwargs["persistent"] is True + + +class TestMigrationStartEndpoint: + """Tests for POST /control/machine/{ref}/migration/start endpoint.""" + + @pytest.mark.asyncio + async def test_migration_start_unauthorized(self, aiohttp_client, mocker, mock_vm_hash): + """Test that unauthorized requests are rejected.""" + mocker.patch( + 
"aleph.vm.orchestrator.views.migration.authenticate_api_request", + return_value=False, + ) + + pool = mocker.Mock(executions={}) + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + f"/control/machine/{mock_vm_hash}/migration/start", + json={"destination_host": "192.168.1.100", "destination_port": 4444}, + ) + + assert response.status == HTTPStatus.UNAUTHORIZED + + @pytest.mark.asyncio + async def test_migration_start_vm_not_found(self, aiohttp_client, mocker, mock_scheduler_auth, mock_vm_hash): + """Test that migration fails if VM not found.""" + pool = mocker.Mock(executions={}) + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + f"/control/machine/{mock_vm_hash}/migration/start", + json={"destination_host": "192.168.1.100", "destination_port": 4444}, + ) + + assert response.status == HTTPStatus.NOT_FOUND + + @pytest.mark.asyncio + async def test_migration_start_vm_not_running(self, aiohttp_client, mocker, mock_scheduler_auth, mock_vm_hash): + """Test that migration fails if VM is not running.""" + pool = mocker.Mock( + executions={ + mock_vm_hash: mocker.Mock( + vm_hash=mock_vm_hash, + is_running=False, + is_stopping=False, + ), + } + ) + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + f"/control/machine/{mock_vm_hash}/migration/start", + json={"destination_host": "192.168.1.100", "destination_port": 4444}, + ) + + assert response.status == HTTPStatus.BAD_REQUEST + data = await response.json() + assert "not running" in data["error"] + + @pytest.mark.asyncio + async def test_migration_start_not_qemu(self, aiohttp_client, mocker, mock_scheduler_auth, mock_vm_hash): + """Test that migration fails for non-QEMU VMs.""" + pool = mocker.Mock( + executions={ + mock_vm_hash: mocker.Mock( + vm_hash=mock_vm_hash, + is_running=True, + is_stopping=False, + 
hypervisor=HypervisorType.firecracker, + ), + } + ) + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + f"/control/machine/{mock_vm_hash}/migration/start", + json={"destination_host": "192.168.1.100", "destination_port": 4444}, + ) + + assert response.status == HTTPStatus.BAD_REQUEST + data = await response.json() + assert "QEMU" in data["error"] + + @pytest.mark.asyncio + async def test_migration_start_success(self, aiohttp_client, mocker, mock_scheduler_auth, mock_vm_hash): + """Test successful migration start.""" + mock_vm = mocker.Mock() + mock_vm.qmp_socket_path = mocker.Mock() + mock_vm.qmp_socket_path.exists.return_value = True + + mock_execution = mocker.Mock( + vm_hash=mock_vm_hash, + is_running=True, + is_stopping=False, + hypervisor=HypervisorType.qemu, + vm=mock_vm, + migration_state=MigrationState.NONE, + ) + + pool = mocker.AsyncMock() + pool.executions = {mock_vm_hash: mock_execution} + + # Mock QemuVmClient + mock_client = mocker.Mock() + mock_client.migrate = mocker.Mock() + mock_client.query_migrate = mocker.Mock( + return_value={ + "status": "completed", + "downtime": 50, + "ram": {"transferred": 1000000}, + } + ) + mocker.patch( + "aleph.vm.orchestrator.views.migration.QemuVmClient", + return_value=mock_client, + ) + + # Mock the wait function to return immediately + mocker.patch( + "aleph.vm.orchestrator.views.migration._wait_for_migration_completion", + return_value={ + "status": "completed", + "downtime": 50, + "ram": {"transferred": 1000000}, + }, + ) + + # Mock cleanup + mocker.patch( + "aleph.vm.orchestrator.views.migration._cleanup_source_vm", + ) + + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + f"/control/machine/{mock_vm_hash}/migration/start", + json={"destination_host": "192.168.1.100", "destination_port": 4444}, + ) + + assert response.status == HTTPStatus.OK + data = await response.json() + assert 
data["status"] == "completed" + assert "total_time_ms" in data + assert data["downtime_ms"] == 50 + assert data["transferred_bytes"] == 1000000 + + # Verify migrate was called + mock_client.migrate.assert_called_once_with( + "tcp:192.168.1.100:4444", + bandwidth_limit_mbps=None, + ) + + @pytest.mark.asyncio + async def test_migration_start_with_bandwidth_limit( + self, aiohttp_client, mocker, mock_scheduler_auth, mock_vm_hash + ): + """Test migration with bandwidth limit.""" + mock_vm = mocker.Mock() + mock_vm.qmp_socket_path = mocker.Mock() + mock_vm.qmp_socket_path.exists.return_value = True + + mock_execution = mocker.Mock( + vm_hash=mock_vm_hash, + is_running=True, + is_stopping=False, + hypervisor=HypervisorType.qemu, + vm=mock_vm, + migration_state=MigrationState.NONE, + ) + + pool = mocker.AsyncMock() + pool.executions = {mock_vm_hash: mock_execution} + + mock_client = mocker.Mock() + mock_client.migrate = mocker.Mock() + mocker.patch( + "aleph.vm.orchestrator.views.migration.QemuVmClient", + return_value=mock_client, + ) + + mocker.patch( + "aleph.vm.orchestrator.views.migration._wait_for_migration_completion", + return_value={"status": "completed", "ram": {"transferred": 1000000}}, + ) + mocker.patch("aleph.vm.orchestrator.views.migration._cleanup_source_vm") + + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + f"/control/machine/{mock_vm_hash}/migration/start", + json={ + "destination_host": "192.168.1.100", + "destination_port": 4444, + "bandwidth_limit_mbps": 100, + }, + ) + + assert response.status == HTTPStatus.OK + + # Verify migrate was called with bandwidth limit + mock_client.migrate.assert_called_once_with( + "tcp:192.168.1.100:4444", + bandwidth_limit_mbps=100, + ) + + @pytest.mark.asyncio + async def test_migration_start_failure(self, aiohttp_client, mocker, mock_scheduler_auth, mock_vm_hash): + """Test migration failure handling.""" + mock_vm = mocker.Mock() + 
mock_vm.qmp_socket_path = mocker.Mock() + mock_vm.qmp_socket_path.exists.return_value = True + + mock_execution = mocker.Mock( + vm_hash=mock_vm_hash, + is_running=True, + is_stopping=False, + hypervisor=HypervisorType.qemu, + vm=mock_vm, + migration_state=MigrationState.NONE, + ) + + pool = mocker.AsyncMock() + pool.executions = {mock_vm_hash: mock_execution} + + mock_client = mocker.Mock() + mocker.patch( + "aleph.vm.orchestrator.views.migration.QemuVmClient", + return_value=mock_client, + ) + + mocker.patch( + "aleph.vm.orchestrator.views.migration._wait_for_migration_completion", + return_value={"status": "failed", "error-desc": "Connection refused"}, + ) + + app = setup_webapp(pool=pool) + client: TestClient = await aiohttp_client(app) + + response = await client.post( + f"/control/machine/{mock_vm_hash}/migration/start", + json={"destination_host": "192.168.1.100", "destination_port": 4444}, + ) + + assert response.status == HTTPStatus.INTERNAL_SERVER_ERROR + data = await response.json() + assert data["status"] == "error" + assert "Connection refused" in data["error"] + assert mock_execution.migration_state == MigrationState.FAILED + + +class TestMigrationHelperFunctions: + """Tests for migration helper functions.""" + + @pytest.mark.asyncio + async def test_wait_for_migration_completion_success(self, mocker): + """Test waiting for migration to complete successfully.""" + from aleph.vm.orchestrator.views.migration import _wait_for_migration_completion + + mock_client = mocker.Mock() + mock_client.query_migrate = mocker.Mock( + side_effect=[ + {"status": "active", "ram": {"transferred": 500000, "total": 1000000}}, + {"status": "active", "ram": {"transferred": 800000, "total": 1000000}}, + {"status": "completed", "downtime": 50, "ram": {"transferred": 1000000, "total": 1000000}}, + ] + ) + + result = await _wait_for_migration_completion( + mock_client, + ItemHash("a" * 64), + poll_interval=0.01, + ) + + assert result["status"] == "completed" + assert 
mock_client.query_migrate.call_count == 3 + + @pytest.mark.asyncio + async def test_wait_for_migration_completion_failure(self, mocker): + """Test waiting for migration that fails.""" + from aleph.vm.orchestrator.views.migration import _wait_for_migration_completion + + mock_client = mocker.Mock() + mock_client.query_migrate = mocker.Mock(return_value={"status": "failed", "error-desc": "Connection lost"}) + + result = await _wait_for_migration_completion( + mock_client, + ItemHash("a" * 64), + poll_interval=0.01, + ) + + assert result["status"] == "failed" + + @pytest.mark.asyncio + async def test_wait_for_migration_completion_timeout(self, mocker): + """Test migration timeout handling.""" + from aleph.vm.orchestrator.views.migration import _wait_for_migration_completion + + mock_client = mocker.Mock() + mock_client.query_migrate = mocker.Mock( + return_value={"status": "active", "ram": {"transferred": 500000, "total": 1000000}} + ) + mock_client.migrate_cancel = mocker.Mock() + + result = await _wait_for_migration_completion( + mock_client, + ItemHash("a" * 64), + poll_interval=0.01, + timeout=0.05, + ) + + assert result["status"] == "failed" + assert "timed out" in result["error-desc"] + mock_client.migrate_cancel.assert_called_once() + + @pytest.mark.asyncio + async def test_cleanup_source_vm(self, mocker): + """Test source VM cleanup after migration.""" + from aleph.vm.orchestrator.views.migration import _cleanup_source_vm + + mock_execution = mocker.Mock() + mock_execution.vm_hash = ItemHash("a" * 64) + + pool = mocker.AsyncMock() + + await _cleanup_source_vm(pool, mock_execution) + + pool.stop_vm.assert_called_once_with(mock_execution.vm_hash) + pool.forget_vm.assert_called_once_with(mock_execution.vm_hash) + + @pytest.mark.asyncio + async def test_cleanup_source_vm_handles_errors(self, mocker): + """Test that cleanup errors don't raise exceptions.""" + from aleph.vm.orchestrator.views.migration import _cleanup_source_vm + + mock_execution = mocker.Mock() + 
mock_execution.vm_hash = ItemHash("a" * 64) + + pool = mocker.AsyncMock() + pool.stop_vm = mocker.AsyncMock(side_effect=Exception("Stop failed")) + + # Should not raise + await _cleanup_source_vm(pool, mock_execution) + + +class TestMigrationState: + """Tests for MigrationState enum.""" + + def test_migration_state_values(self): + """Test that all migration states have correct values.""" + assert MigrationState.NONE.value == "none" + assert MigrationState.PREPARING.value == "preparing" + assert MigrationState.WAITING.value == "waiting" + assert MigrationState.MIGRATING.value == "migrating" + assert MigrationState.COMPLETED.value == "completed" + assert MigrationState.FAILED.value == "failed" + + def test_migration_state_is_string_enum(self): + """Test that MigrationState is a string enum.""" + assert isinstance(MigrationState.NONE, str) + assert MigrationState.FAILED == "failed"