Skip to content
Draft
26 changes: 25 additions & 1 deletion src/aleph/vm/controllers/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ class QemuVMConfiguration(BaseModel):
image_path: str
monitor_socket_path: Path
qmp_socket_path: Path
qga_socket_path: Path
vcpu_count: int
mem_size_mb: int
interface_name: str | None = None
host_volumes: list[QemuVMHostVolume]
gpus: list[QemuGPU]
incoming_migration_port: int | None = None # Port for incoming migration mode


class QemuConfidentialVMConfiguration(BaseModel):
Expand All @@ -47,6 +49,7 @@ class QemuConfidentialVMConfiguration(BaseModel):
image_path: str
monitor_socket_path: Path
qmp_socket_path: Path
qga_socket_path: Path
vcpu_count: int
mem_size_mb: int
interface_name: str | None = None
Expand All @@ -71,9 +74,30 @@ class Configuration(BaseModel):
hypervisor: HypervisorType = HypervisorType.firecracker


def get_controller_configuration_path(vm_hash: str) -> Path:
    """Return the location of the controller configuration file of a VM.

    The file sits directly under ``settings.EXECUTION_ROOT`` and is named
    ``<vm_hash>-controller.json``.
    """
    file_name = f"{vm_hash}-controller.json"
    return Path(f"{settings.EXECUTION_ROOT}/{file_name}")


def load_controller_configuration(vm_hash: str) -> Configuration | None:
    """Load VM configuration from the controller service configuration file.

    The file is read directly and a missing file is detected through the
    resulting ``FileNotFoundError`` rather than a prior ``exists()`` check, so a
    file that disappears between the check and the read cannot crash the caller.

    :param vm_hash: The VM hash identifying the configuration file
    :return: The Configuration object, or None if the file doesn't exist
    """
    config_file_path = get_controller_configuration_path(vm_hash)

    try:
        raw_configuration = config_file_path.read_text()
    except FileNotFoundError:
        logger.warning("Controller configuration file not found for %s", vm_hash)
        return None

    # Validation errors are deliberately left to propagate: a present but
    # malformed file is not the same situation as a missing one.
    return Configuration.model_validate_json(raw_configuration)


def save_controller_configuration(vm_hash: str, configuration: Configuration) -> Path:
"""Save VM configuration to be used by the controller service"""
config_file_path = Path(f"{settings.EXECUTION_ROOT}/{vm_hash}-controller.json")
config_file_path = get_controller_configuration_path(vm_hash)
with config_file_path.open("w") as controller_config_file:
controller_config_file.write(
configuration.model_dump_json(
Expand Down
7 changes: 5 additions & 2 deletions src/aleph/vm/controllers/firecracker/executable.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,11 @@ async def wait_for_init(self) -> None:
May be empty."""
return

async def configure(self):
"""Configure the VM by saving controller service configuration"""
async def configure(self, incoming_migration_port: int | None = None):
"""Configure the VM by saving controller service configuration.

Note: incoming_migration_port is ignored for Firecracker VMs as they don't support live migration.
"""
if self.persistent:
firecracker_config_path = await self.fvm.save_configuration_file(self._firecracker_config)
vm_configuration = VMConfiguration(
Expand Down
8 changes: 6 additions & 2 deletions src/aleph/vm/controllers/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ async def wait_for_init(self) -> None:
May be empty."""
pass

async def configure(self) -> None:
"""Configuration done after the VM process is started"""
async def configure(self, incoming_migration_port: int | None = None) -> None:
"""Configuration done after the VM process is started.

:param incoming_migration_port: Optional port for incoming migration (QEMU only).
When set, the VM is configured to wait for migration data instead of booting normally.
"""
raise NotImplementedError()

async def load_configuration(self) -> None:
Expand Down
221 changes: 214 additions & 7 deletions src/aleph/vm/controllers/qemu/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,65 @@
import base64
import time
from enum import Enum

import qmp
import yaml
from pydantic import BaseModel


class VmRunStatus(str, Enum):
    """Run states a QEMU VM can report.

    Members also derive from ``str``, so each one compares equal to the raw
    status string it wraps (for example ``VmRunStatus.RUNNING == "running"``).
    """

    DEBUG = "debug"
    FINISH_MIGRATE = "finish-migrate"
    INMIGRATE = "inmigrate"
    INTERNAL_ERROR = "internal-error"
    IO_ERROR = "io-error"
    PAUSED = "paused"
    POSTMIGRATE = "postmigrate"
    PRELAUNCH = "prelaunch"
    RESTORE_VM = "restore-vm"
    RUNNING = "running"
    SAVE_VM = "save-vm"
    SHUTDOWN = "shutdown"
    SUSPENDED = "suspended"
    WATCHDOG = "watchdog"
    GUEST_PANICKED = "guest-panicked"
    COLO = "colo"


class VmStatus(BaseModel):
    """Parsed reply of the QEMU ``query-status`` command."""

    status: VmRunStatus
    running: bool
    singlestep: bool = False

    @property
    def is_running(self) -> bool:
        """Tell whether the VM is flagged as running and in the ``running`` state."""
        if not self.running:
            return False
        return self.status == VmRunStatus.RUNNING

    @property
    def is_migrating(self) -> bool:
        """Tell whether the VM is in one of the migration-related states."""
        migration_states = (
            VmRunStatus.INMIGRATE,
            VmRunStatus.POSTMIGRATE,
            VmRunStatus.FINISH_MIGRATE,
        )
        return self.status in migration_states

    @property
    def is_error(self) -> bool:
        """Tell whether the VM is in a state this client treats as an error.

        Note that ``shutdown`` is counted here alongside the failure states.
        """
        error_states = (
            VmRunStatus.INTERNAL_ERROR,
            VmRunStatus.IO_ERROR,
            VmRunStatus.GUEST_PANICKED,
            VmRunStatus.SHUTDOWN,
        )
        return self.status in error_states


class VmSevInfo(BaseModel):
enabled: bool
api_major: int
Expand All @@ -18,11 +76,19 @@ def __init__(self, vm):
if not (vm.qmp_socket_path and vm.qmp_socket_path.exists()):
msg = "VM is not running"
raise Exception(msg)
client = qmp.QEMUMonitorProtocol(str(vm.qmp_socket_path))
client.connect()

# qmp_client = qmp.QEMUMonitorProtocol(address=("localhost", vm.qmp_port))
self.qmp_client = client
qmp_client = qmp.QEMUMonitorProtocol(str(vm.qmp_socket_path))
qmp_client.connect()
self.qmp_client = qmp_client

# QGA (QEMU Guest Agent) uses a separate communication channel over virtio-serial.
# The wire protocol is JSON-based and compatible with the QMP library.
if vm.qga_socket_path and vm.qga_socket_path.exists():
qga_client = qmp.QEMUMonitorProtocol(str(vm.qga_socket_path))
qga_client.connect()
self.qga_client = qga_client
else:
self.qga_client = None

def __enter__(self):
return self
Expand All @@ -32,6 +98,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def close(self) -> None:
    """Close the QMP connection and, when present, the guest-agent connection.

    The guest-agent socket is released even if closing the QMP socket raises,
    so a failure on one channel does not leak the other. Any exception from the
    QMP close still propagates to the caller.
    """
    try:
        self.qmp_client.close()
    finally:
        if self.qga_client:
            self.qga_client.close()

def query_sev_info(self) -> VmSevInfo:
caps = self.qmp_client.command("query-sev")
Expand Down Expand Up @@ -68,9 +136,148 @@ def continue_execution(self) -> None:
"""
self.qmp_client.command("cont")

def query_status(self) -> VmStatus:
    """
    Get running status.

    Sends the QMP ``query-status`` command and validates the reply into a
    ``VmStatus`` model.

    :return: VmStatus with status, running, and singlestep fields
    """
    result = self.qmp_client.command("query-status")
    return VmStatus.model_validate(result)

def migrate(self, destination_uri: str, bandwidth_limit_mbps: int | None = None) -> None:
"""
Start live migration with block and incremental mode.

Uses: migrate -d -b -i (detach, block, incremental)
uri format: tcp:HOST:PORT

:param destination_uri: The destination URI (e.g., "tcp:192.168.1.100:4444")
:param bandwidth_limit_mbps: Optional bandwidth limit in MB/s
"""
if bandwidth_limit_mbps:
self.qmp_client.command("migrate_set_speed", value=bandwidth_limit_mbps * 1024 * 1024)

# Enable migration capabilities for better performance and reliability
capabilities = [
{"capability": "xbzrle", "state": True}, # Compression
{"capability": "auto-converge", "state": True}, # Auto-converge for busy VMs
]
self.qmp_client.command("migrate-set-capabilities", capabilities=capabilities)

# Start migration with block migration (copies disk + memory)
# Note: 'blk' and 'inc' are deprecated in newer QEMU, but we use them for compatibility
self.qmp_client.command("migrate", uri=destination_uri, blk=True, inc=True)

def query_migrate(self) -> dict:
    """
    Fetch the current migration status via the QMP ``query-migrate`` command.

    The reply is returned unchanged. Keys it may carry:
    - status: 'none', 'setup', 'cancelling', 'cancelled', 'active', 'postcopy-active',
    'postcopy-paused', 'postcopy-recover', 'completed', 'failed', 'colo', 'pre-switchover'
    - total-time: Total elapsed time (ms)
    - downtime: Downtime (ms) for completed migrations
    - expected-downtime: Expected downtime
    - ram: RAM migration stats (transferred, remaining, total, etc.)
    - disk: Disk migration stats
    """
    migration_info = self.qmp_client.command("query-migrate")
    return migration_info

def migrate_cancel(self) -> None:
    """Ask QEMU to abort the migration in progress (QMP ``migrate_cancel``)."""
    qmp_connection = self.qmp_client
    qmp_connection.command("migrate_cancel")

def _get_qga_client(self) -> qmp.QEMUMonitorProtocol:
    """Return the guest-agent client.

    :raises Exception: when no guest-agent client is set on this instance.
    """
    if self.qga_client:
        return self.qga_client
    msg = "QEMU Guest Agent socket is not available"
    raise Exception(msg)

def guest_exec(self, command: str, args: list[str] | None = None, capture_output: bool = True) -> dict:
"""
Execute a command in the guest via qemu-guest-agent.

:param command: The command/path to execute (e.g., "/bin/bash")
:param args: Arguments to pass to the command
:param capture_output: Whether to capture stdout/stderr
:return: Dict with 'pid' key for the started process
"""
qga = self._get_qga_client()
exec_args = {"path": command, "capture-output": capture_output}
if args:
exec_args["arg"] = args
return qga.command("guest-exec", **exec_args)

def guest_exec_status(self, pid: int) -> dict:
    """
    Look up the state of a process previously started with ``guest_exec``.

    :param pid: The PID returned by guest_exec
    :return: Dict with 'exited', 'exitcode', 'out-data', 'err-data' keys
    """
    guest_agent = self._get_qga_client()
    status_reply = guest_agent.command("guest-exec-status", pid=pid)
    return status_reply

def reconfigure_guest_network(
    self,
    new_ip: str,
    gateway: str,
    nameservers: list[str],
    interface: str = "eth0",
) -> dict:
    """
    Push new network settings into the guest through qemu-guest-agent.

    A netplan document is generated from the arguments, written inside the
    guest to ``/etc/netplan/50-cloud-init.yaml`` and applied with
    ``netplan apply``. The process is only started here; its outcome has to be
    polled separately using the returned PID.

    :param new_ip: New IP address with CIDR notation (e.g., "10.0.0.5/24")
    :param gateway: Gateway IP address (e.g., "10.0.0.1")
    :param nameservers: List of DNS server IPs (e.g., ["8.8.8.8", "8.8.4.4"])
    :param interface: Network interface name (default: "eth0")
    :return: Dict with 'pid' key for the started process
    """
    # Key order matters: the YAML is dumped with sort_keys=False.
    interface_settings = {
        "addresses": [new_ip],
        "routes": [{"to": "default", "via": gateway}],
        "nameservers": {"addresses": nameservers},
    }
    netplan_document = {
        "network": {
            "version": 2,
            "ethernets": {interface: interface_settings},
        },
    }
    netplan_yaml = yaml.safe_dump(netplan_document, default_flow_style=False, sort_keys=False)

    # The YAML travels base64-encoded so no shell quoting/escaping of its
    # content is needed inside the script.
    config_b64 = base64.b64encode(netplan_yaml.encode()).decode()

    script = f"""
echo '{config_b64}' | base64 -d > /etc/netplan/50-cloud-init.yaml
netplan apply
"""

    shell_arguments = ["-c", script]
    return self.guest_exec("/bin/bash", shell_arguments)

def wait_for_guest_agent(self, timeout_seconds: int = 60) -> bool:
    """
    Wait for the qemu-guest-agent to become available.

    The agent is probed with ``guest-ping`` until one probe succeeds or the
    timeout elapses. Between failed probes the method sleeps for up to one
    second, never longer than the time left before the deadline.

    :param timeout_seconds: Maximum time to wait
    :return: True if agent is available, False if timeout
    :raises Exception: if no guest-agent client is available on this instance
    """
    qga = self._get_qga_client()
    deadline = time.monotonic() + timeout_seconds
    while (remaining := deadline - time.monotonic()) > 0:
        try:
            # Try to ping the guest agent
            qga.command("guest-ping")
        except Exception:
            # Any failure is treated as "agent not ready yet" (deliberate
            # best-effort polling); retry until the deadline.
            time.sleep(min(1.0, remaining))
        else:
            return True
    return False
13 changes: 10 additions & 3 deletions src/aleph/vm/controllers/qemu/cloudinit.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ def get_hostname_from_hash(vm_hash: ItemHash) -> str:
return base64.b32encode(item_hash_binary).decode().strip("=").lower()


def encode_user_data(hostname, ssh_authorized_keys, has_gpu: bool = False) -> bytes:
def encode_user_data(hostname, ssh_authorized_keys, has_gpu: bool = False, install_guest_agent: bool = True) -> bytes:
"""Creates user data configuration file for cloud-init tool"""
config: dict[str, str | bool | list[str] | list[list[str]]] = {
"hostname": hostname,
"disable_root": False,
"ssh_pwauth": False,
"ssh_authorized_keys": ssh_authorized_keys,
"resize_rootfs": True,
"package_update": True,
}

# Add kernel boot parameters for GPU instances to speed up PCI enumeration
Expand All @@ -65,6 +66,10 @@ def encode_user_data(hostname, ssh_authorized_keys, has_gpu: bool = False) -> by
],
]

if install_guest_agent:
config["packages"] = ["qemu-guest-agent"]
config["runcmd"] = ["systemctl start qemu-guest-agent.service"]

cloud_config_header = "#cloud-config\n"
config_output = yaml.safe_dump(config, default_flow_style=False, sort_keys=False)
content = (cloud_config_header + config_output).encode()
Expand Down Expand Up @@ -116,13 +121,14 @@ async def create_cloud_init_drive_image(
route,
ssh_authorized_keys,
has_gpu: bool = False,
install_guest_agent: bool = True,
):
with (
NamedTemporaryFile() as user_data_config_file,
NamedTemporaryFile() as network_config_file,
NamedTemporaryFile() as metadata_config_file,
):
user_data = encode_user_data(hostname, ssh_authorized_keys, has_gpu=has_gpu)
user_data = encode_user_data(hostname, ssh_authorized_keys, has_gpu=has_gpu, install_guest_agent=install_guest_agent)
user_data_config_file.write(user_data)
user_data_config_file.flush()
network_config = create_network_file(ip, ipv6, ipv6_gateway, nameservers, route)
Expand All @@ -145,7 +151,7 @@ async def create_cloud_init_drive_image(


class CloudInitMixin(AlephVmControllerInterface):
async def _create_cloud_init_drive(self) -> Drive:
async def _create_cloud_init_drive(self, install_guest_agent: bool = True) -> Drive:
"""Creates the cloud-init volume to configure and set up the VM"""
ssh_authorized_keys = self.resources.message_content.authorized_keys or []
if settings.USE_DEVELOPER_SSH_KEYS:
Expand Down Expand Up @@ -175,6 +181,7 @@ async def _create_cloud_init_drive(self) -> Drive:
route,
ssh_authorized_keys,
has_gpu=has_gpu,
install_guest_agent=install_guest_agent,
)

return Drive(
Expand Down
Loading
Loading