Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lib/hypervisor/cloudhypervisor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"fmt"
"syscall"
"time"

"github.com/onkernel/hypeman/lib/hypervisor"
"github.com/onkernel/hypeman/lib/logger"
"github.com/onkernel/hypeman/lib/paths"
"github.com/onkernel/hypeman/lib/vmm"
"gvisor.dev/gvisor/pkg/cleanup"
Expand Down Expand Up @@ -100,17 +102,22 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s
// RestoreVM starts Cloud Hypervisor and restores VM state from a snapshot.
// The VM is in paused state after restore; caller should call Resume() to continue execution.
func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, socketPath string, snapshotPath string) (int, hypervisor.Hypervisor, error) {
log := logger.FromContext(ctx)
startTime := time.Now()

// Validate version
chVersion := vmm.CHVersion(version)
if !vmm.IsVersionSupported(chVersion) {
return 0, nil, fmt.Errorf("unsupported cloud-hypervisor version: %s", version)
}

// 1. Start the Cloud Hypervisor process
processStartTime := time.Now()
pid, err := vmm.StartProcess(ctx, p, chVersion, socketPath)
if err != nil {
return 0, nil, fmt.Errorf("start process: %w", err)
}
log.DebugContext(ctx, "CH process started", "pid", pid, "duration_ms", time.Since(processStartTime).Milliseconds())

// Setup cleanup to kill the process if subsequent steps fail
cu := cleanup.Make(func() {
Expand All @@ -125,6 +132,7 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string,
}

// 3. Restore from snapshot via HTTP API
restoreAPIStart := time.Now()
sourceURL := "file://" + snapshotPath
restoreConfig := vmm.RestoreConfig{
SourceUrl: sourceURL,
Expand All @@ -137,9 +145,11 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string,
if resp.StatusCode() != 204 {
return 0, nil, fmt.Errorf("restore failed with status %d: %s", resp.StatusCode(), string(resp.Body))
}
log.DebugContext(ctx, "CH restore API complete", "duration_ms", time.Since(restoreAPIStart).Milliseconds())

// Success - release cleanup to prevent killing the process
cu.Release()
log.DebugContext(ctx, "CH restore complete", "pid", pid, "total_duration_ms", time.Since(startTime).Milliseconds())
return pid, hv, nil
}

Expand Down
181 changes: 175 additions & 6 deletions lib/hypervisor/qemu/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package qemu

import (
"context"
"encoding/json"
"fmt"
"net"
"os"
Expand All @@ -14,10 +15,26 @@ import (
"time"

"github.com/onkernel/hypeman/lib/hypervisor"
"github.com/onkernel/hypeman/lib/logger"
"github.com/onkernel/hypeman/lib/paths"
"gvisor.dev/gvisor/pkg/cleanup"
)

// Timing parameters for QEMU socket probing and migration waits.
const (
	// socketDialTimeout caps a single connection attempt to the QMP socket.
	socketDialTimeout time.Duration = 100 * time.Millisecond

	// socketPollInterval is the pause between successive QMP socket probes.
	socketPollInterval time.Duration = 50 * time.Millisecond

	// socketWaitTimeout bounds the total time spent waiting for the QMP socket
	// to accept connections after the QEMU process has been started.
	socketWaitTimeout time.Duration = 10 * time.Second

	// migrationTimeout bounds the wait for a migration to complete.
	migrationTimeout time.Duration = 30 * time.Second
)

// init registers "qemu.sock" as the socket name for the QEMU hypervisor type.
func init() {
hypervisor.RegisterSocketName(hypervisor.TypeQEMU, "qemu.sock")
}
Expand Down Expand Up @@ -91,6 +108,8 @@ func (s *Starter) GetVersion(p *paths.Paths) (string, error) {
// StartVM launches QEMU with the VM configuration and returns a Hypervisor client.
// QEMU receives all configuration via command-line arguments at process start.
func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, socketPath string, config hypervisor.VMConfig) (int, hypervisor.Hypervisor, error) {
log := logger.FromContext(ctx)

// Get binary path
binaryPath, err := s.GetBinaryPath(p, version)
if err != nil {
Expand Down Expand Up @@ -154,7 +173,7 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s
defer cu.Clean()

// Wait for socket to be ready
if err := waitForSocket(socketPath, 10*time.Second); err != nil {
if err := waitForSocket(socketPath, socketWaitTimeout); err != nil {
vmmLogPath := filepath.Join(logsDir, "vmm.log")
if logData, readErr := os.ReadFile(vmmLogPath); readErr == nil && len(logData) > 0 {
return 0, nil, fmt.Errorf("%w; vmm.log: %s", err, string(logData))
Expand All @@ -168,15 +187,165 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s
return 0, nil, fmt.Errorf("create client: %w", err)
}

// Save config for potential restore later
// QEMU migration files only contain memory state, not device config
if err := saveVMConfig(instanceDir, config); err != nil {
// Non-fatal - restore just won't work
log.WarnContext(ctx, "failed to save VM config for restore", "error", err)
}

// Success - release cleanup to prevent killing the process
cu.Release()
return pid, hv, nil
}

// RestoreVM starts QEMU and restores VM state from a snapshot.
// The VM is in paused state after restore; caller should call Resume() to continue execution.
//
// snapshotPath is the snapshot directory. It must contain the saved VM config
// (vmConfigFile) and the "memory" migration stream, which is fed to QEMU via
// "-incoming exec:cat < <path>". The QMP socket is created at socketPath, and
// QEMU's stdout/stderr are appended to logs/vmm.log next to that socket.
//
// On success it returns the QEMU process ID and a connected Hypervisor client.
// If any step after process start fails, the process is killed with SIGKILL.
func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, socketPath string, snapshotPath string) (int, hypervisor.Hypervisor, error) {
	log := logger.FromContext(ctx)
	startTime := time.Now()

	// Get binary path
	binaryPath, err := s.GetBinaryPath(p, version)
	if err != nil {
		return 0, nil, fmt.Errorf("get binary: %w", err)
	}

	// Check if socket is already in use
	if isSocketInUse(socketPath) {
		return 0, nil, fmt.Errorf("socket already in use, QEMU may be running at %s", socketPath)
	}

	// Remove stale socket if exists. Best effort: the error is deliberately
	// ignored because the file usually does not exist.
	_ = os.Remove(socketPath)

	// Load saved VM config from snapshot directory
	// QEMU requires exact same command-line args as when snapshot was taken
	configLoadStart := time.Now()
	config, err := loadVMConfig(snapshotPath)
	if err != nil {
		return 0, nil, fmt.Errorf("load vm config from snapshot: %w", err)
	}
	log.DebugContext(ctx, "loaded VM config from snapshot", "duration_ms", time.Since(configLoadStart).Milliseconds())

	instanceDir := filepath.Dir(socketPath)

	// Build command arguments: QMP socket + VM configuration + incoming migration
	args := []string{
		"-chardev", fmt.Sprintf("socket,id=qmp,path=%s,server=on,wait=off", socketPath),
		"-mon", "chardev=qmp,mode=control",
	}
	// Append VM configuration as command-line arguments
	args = append(args, BuildArgs(config)...)

	// Add incoming migration flag to restore from snapshot.
	// The exec: transport hands its command to /bin/sh -c, so the path is
	// single-quoted to survive spaces and shell metacharacters; an embedded
	// single quote is written as '\'' (close quote, escaped quote, reopen).
	memoryFile := filepath.Join(snapshotPath, "memory")
	quoted := make([]byte, 0, len(memoryFile)+2)
	quoted = append(quoted, '\'')
	for i := 0; i < len(memoryFile); i++ {
		if memoryFile[i] == '\'' {
			quoted = append(quoted, `'\''`...)
			continue
		}
		quoted = append(quoted, memoryFile[i])
	}
	quoted = append(quoted, '\'')
	incomingURI := "exec:cat < " + string(quoted)
	args = append(args, "-incoming", incomingURI)

	// Create command
	cmd := exec.Command(binaryPath, args...)

	// Daemonize: detach from parent process group
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
	}

	// Redirect stdout/stderr to VMM log file
	logsDir := filepath.Join(instanceDir, "logs")
	if err := os.MkdirAll(logsDir, 0755); err != nil {
		return 0, nil, fmt.Errorf("create logs directory: %w", err)
	}

	vmmLogFile, err := os.OpenFile(
		filepath.Join(logsDir, "vmm.log"),
		os.O_CREATE|os.O_WRONLY|os.O_APPEND,
		0644,
	)
	if err != nil {
		return 0, nil, fmt.Errorf("create vmm log: %w", err)
	}
	defer vmmLogFile.Close()

	cmd.Stdout = vmmLogFile
	cmd.Stderr = vmmLogFile

	processStartTime := time.Now()
	if err := cmd.Start(); err != nil {
		return 0, nil, fmt.Errorf("start qemu: %w", err)
	}

	pid := cmd.Process.Pid
	log.DebugContext(ctx, "QEMU process started", "pid", pid, "duration_ms", time.Since(processStartTime).Milliseconds())

	// Setup cleanup to kill the process if subsequent steps fail
	cu := cleanup.Make(func() {
		syscall.Kill(pid, syscall.SIGKILL)
	})
	defer cu.Clean()

	// Wait for socket to be ready (same bound as StartVM)
	socketWaitStart := time.Now()
	if err := waitForSocket(socketPath, socketWaitTimeout); err != nil {
		// Attach whatever QEMU printed so the caller can see why it never came up.
		vmmLogPath := filepath.Join(logsDir, "vmm.log")
		if logData, readErr := os.ReadFile(vmmLogPath); readErr == nil && len(logData) > 0 {
			return 0, nil, fmt.Errorf("%w; vmm.log: %s", err, string(logData))
		}
		return 0, nil, err
	}
	log.DebugContext(ctx, "QMP socket ready", "duration_ms", time.Since(socketWaitStart).Milliseconds())

	// Create QMP client
	hv, err := New(socketPath)
	if err != nil {
		return 0, nil, fmt.Errorf("create client: %w", err)
	}

	// Wait for incoming migration to complete
	// QEMU loads the migration data from the exec subprocess
	// After loading, VM is in paused state and ready for 'cont'
	migrationWaitStart := time.Now()
	if err := hv.client.WaitMigration(ctx, migrationTimeout); err != nil {
		return 0, nil, fmt.Errorf("wait for migration: %w", err)
	}
	log.DebugContext(ctx, "migration complete", "duration_ms", time.Since(migrationWaitStart).Milliseconds())

	// Success - release cleanup to prevent killing the process
	cu.Release()
	log.DebugContext(ctx, "QEMU restore complete", "pid", pid, "total_duration_ms", time.Since(startTime).Milliseconds())
	return pid, hv, nil
}

// vmConfigFile is the name of the file where VM config is saved for restore.
// saveVMConfig writes it as JSON and loadVMConfig reads it back.
const vmConfigFile = "qemu-config.json"

// saveVMConfig serializes config as indented JSON and writes it to
// vmConfigFile inside instanceDir (mode 0644).
// QEMU migration files only contain memory state, so this file is what a
// later restore uses to rebuild the VM configuration.
// It returns a wrapped error if marshalling or writing fails.
func saveVMConfig(instanceDir string, config hypervisor.VMConfig) error {
	encoded, marshalErr := json.MarshalIndent(config, "", " ")
	if marshalErr != nil {
		return fmt.Errorf("marshal config: %w", marshalErr)
	}

	target := filepath.Join(instanceDir, vmConfigFile)
	writeErr := os.WriteFile(target, encoded, 0644)
	if writeErr != nil {
		return fmt.Errorf("write config: %w", writeErr)
	}
	return nil
}

// loadVMConfig reads vmConfigFile from instanceDir and decodes its JSON
// content into a hypervisor.VMConfig.
// Despite the parameter name, any directory holding the file works;
// RestoreVM passes the snapshot directory.
// On a read or decode failure it returns a zero VMConfig and a wrapped error.
func loadVMConfig(instanceDir string) (hypervisor.VMConfig, error) {
	raw, readErr := os.ReadFile(filepath.Join(instanceDir, vmConfigFile))
	if readErr != nil {
		return hypervisor.VMConfig{}, fmt.Errorf("read config: %w", readErr)
	}

	var loaded hypervisor.VMConfig
	if decodeErr := json.Unmarshal(raw, &loaded); decodeErr != nil {
		return hypervisor.VMConfig{}, fmt.Errorf("unmarshal config: %w", decodeErr)
	}
	return loaded, nil
}

// qemuBinaryName returns the QEMU binary name for the host architecture.
Expand Down Expand Up @@ -205,7 +374,7 @@ func qemuInstallHint() string {

// isSocketInUse checks if a Unix socket is actively being used
func isSocketInUse(socketPath string) bool {
conn, err := net.DialTimeout("unix", socketPath, 100*time.Millisecond)
conn, err := net.DialTimeout("unix", socketPath, socketDialTimeout)
if err != nil {
return false
}
Expand All @@ -217,12 +386,12 @@ func isSocketInUse(socketPath string) bool {
// waitForSocket polls socketPath until a Unix-socket connection succeeds or
// timeout elapses. Each attempt is capped at socketDialTimeout and failed
// attempts are spaced socketPollInterval apart. The probe connection is closed
// immediately; only reachability is checked.
// It returns nil once a connection succeeds, or an error if the deadline
// passes first.
func waitForSocket(socketPath string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		conn, err := net.DialTimeout("unix", socketPath, socketDialTimeout)
		if err == nil {
			conn.Close()
			return nil
		}
		time.Sleep(socketPollInterval)
	}
	return fmt.Errorf("timeout waiting for socket")
}
42 changes: 37 additions & 5 deletions lib/hypervisor/qemu/qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package qemu
import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"github.com/digitalocean/go-qemu/qemu"
Expand Down Expand Up @@ -36,8 +38,8 @@ var _ hypervisor.Hypervisor = (*QEMU)(nil)
// Capabilities returns the features supported by QEMU.
func (q *QEMU) Capabilities() hypervisor.Capabilities {
return hypervisor.Capabilities{
SupportsSnapshot: false, // Not implemented in first pass
SupportsHotplugMemory: false, // Not implemented in first pass
SupportsSnapshot: true, // Uses QMP migrate file:// for snapshot
SupportsHotplugMemory: false, // Not implemented - balloon not configured
SupportsPause: true,
SupportsVsock: true,
SupportsGPUPassthrough: true,
Expand Down Expand Up @@ -119,10 +121,40 @@ func (q *QEMU) Resume(ctx context.Context) error {
return nil
}

// Snapshot creates a VM snapshot.
// Not implemented in first pass.
// Snapshot creates a VM snapshot using QEMU's migrate-to-file mechanism.
// The VM state is saved to destPath/memory file.
// The VM config is copied to destPath for restore (QEMU requires exact arg match).
func (q *QEMU) Snapshot(ctx context.Context, destPath string) error {
return fmt.Errorf("snapshot not supported by QEMU implementation")
// QEMU uses migrate to file for snapshots
// The "file:" protocol is deprecated in QEMU 7.2+, use "exec:cat > path" instead
memoryFile := destPath + "/memory"
uri := "exec:cat > " + memoryFile
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shell command paths not escaped for spaces/metacharacters

The QEMU exec: migration protocol runs commands through /bin/sh -c, but the file paths in uri := "exec:cat > " + memoryFile and incomingURI := "exec:cat < " + memoryFile are not quoted or escaped. If the data directory path contains spaces or shell metacharacters (e.g., /home/user/My Data/hypeman), the shell will misparse the command, causing snapshot and restore operations to fail silently or produce unexpected behavior.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

follow up

if err := q.client.Migrate(uri); err != nil {
Remove(q.socketPath)
return fmt.Errorf("migrate: %w", err)
}

// Wait for migration to complete
if err := q.client.WaitMigration(ctx, migrationTimeout); err != nil {
Remove(q.socketPath)
return fmt.Errorf("wait migration: %w", err)
}

// Copy VM config from instance dir to snapshot dir
// QEMU restore requires exact same command-line args as when snapshot was taken
instanceDir := filepath.Dir(q.socketPath)
srcConfig := filepath.Join(instanceDir, vmConfigFile)
dstConfig := filepath.Join(destPath, vmConfigFile)

configData, err := os.ReadFile(srcConfig)
if err != nil {
return fmt.Errorf("read vm config for snapshot: %w", err)
}
if err := os.WriteFile(dstConfig, configData, 0644); err != nil {
return fmt.Errorf("write vm config to snapshot: %w", err)
}

return nil
}

// ResizeMemory changes the VM's memory allocation.
Expand Down
Loading