Skip to content

Commit 349fa34

Browse files
committed
vmm: live migration asynchronization (dedicated thread)
This puts the send-migration action into a dedicated thread. 1. The send-migration call will exit sooner (just dispatch the migration) 2. The VMM can respond to VmInfo calls as the migration is ongoing Currently missing functionality: - query migration progress - prevent VM change events during the life migration (device hot plug etc.) The commit was inspired by @ljcore [0] but adapted by me. [0] cloud-hypervisor#7038 Signed-off-by: Philipp Schuster <philipp.schuster@cyberus-technology.de> On-behalf-of: SAP philipp.schuster@sap.com
1 parent 1498fee commit 349fa34

File tree

1 file changed

+194
-43
lines changed

1 file changed

+194
-43
lines changed

vmm/src/lib.rs

Lines changed: 194 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::os::unix::net::{UnixListener, UnixStream};
2626
use std::panic::AssertUnwindSafe;
2727
use std::path::PathBuf;
2828
use std::rc::Rc;
29-
use std::sync::mpsc::{Receiver, RecvError, SendError, Sender};
29+
use std::sync::mpsc::{Receiver, RecvError, SendError, Sender, TryRecvError};
3030
use std::sync::{Arc, Mutex};
3131
#[cfg(not(target_arch = "riscv64"))]
3232
use std::time::{Duration, Instant};
@@ -238,6 +238,7 @@ pub enum EpollDispatch {
238238
Api = 2,
239239
ActivateVirtioDevices = 3,
240240
Debug = 4,
241+
CheckMigration = 5,
241242
Unknown,
242243
}
243244

@@ -250,6 +251,7 @@ impl From<u64> for EpollDispatch {
250251
2 => Api,
251252
3 => ActivateVirtioDevices,
252253
4 => Debug,
254+
5 => CheckMigration,
253255
_ => Unknown,
254256
}
255257
}
@@ -698,6 +700,49 @@ impl MigrationState {
698700
}
699701
}
700702

703+
struct VmGuard {
704+
vm: Option<Arc<Mutex<Vm>>>,
705+
sender: std::sync::mpsc::Sender<Option<Vm>>,
706+
is_local: bool,
707+
check_migration_evt: Option<EventFd>,
708+
}
709+
710+
impl Drop for VmGuard {
711+
fn drop(&mut self) {
712+
// Return the VM only if the migration fails
713+
if let Some(vm) = self.vm.take()
714+
&& let Ok(mutex) = Arc::try_unwrap(vm)
715+
&& let Ok(mut inner_vm) = mutex.into_inner()
716+
{
717+
// Stop logging dirty pages only for non-local migrations
718+
if !self.is_local
719+
&& let Err(e) = inner_vm.stop_dirty_log()
720+
{
721+
error!("Failed to stop dirty log in guard: {:?}", e);
722+
}
723+
724+
// If VM is paused, attempt to resume
725+
if inner_vm.get_state().unwrap_or(VmState::Paused) == VmState::Paused
726+
&& let Err(e) = inner_vm.resume()
727+
{
728+
error!("Failed to resume VM in guard: {:?}", e);
729+
}
730+
731+
info!("VmGuard returning VM to parent thread");
732+
if let Err(e) = self.sender.send(Some(inner_vm)) {
733+
error!("Failed to return VM to parent thread: {:?}", e);
734+
}
735+
736+
// Trigger the event of checking migration results
737+
if let Some(evt) = &self.check_migration_evt
738+
&& let Err(e) = evt.write(1)
739+
{
740+
error!("Failed to trigger check migration event: {:?}", e);
741+
}
742+
}
743+
}
744+
}
745+
701746
pub struct VmmThreadHandle {
702747
pub thread_handle: thread::JoinHandle<Result<()>>,
703748
#[cfg(feature = "dbus_api")]
@@ -725,6 +770,8 @@ pub struct Vmm {
725770
original_termios_opt: Arc<Mutex<Option<termios>>>,
726771
console_resize_pipe: Option<Arc<File>>,
727772
console_info: Option<ConsoleInfo>,
773+
migration_receiver: Option<Receiver<Option<Vm>>>,
774+
check_migration_evt: EventFd,
728775
}
729776

730777
impl Vmm {
@@ -782,14 +829,14 @@ impl Vmm {
782829
.name("vmm_signal_handler".to_string())
783830
.spawn(move || {
784831
if !signal_handler_seccomp_filter.is_empty() && let Err(e) = apply_filter(&signal_handler_seccomp_filter)
785-
.map_err(Error::ApplySeccompFilter)
786-
{
787-
error!("Error applying seccomp filter: {:?}", e);
788-
exit_evt.write(1).ok();
789-
return;
790-
}
832+
.map_err(Error::ApplySeccompFilter)
833+
{
834+
error!("Error applying seccomp filter: {:?}", e);
835+
exit_evt.write(1).ok();
836+
return;
837+
}
791838

792-
if landlock_enable{
839+
if landlock_enable {
793840
match Landlock::new() {
794841
Ok(landlock) => {
795842
let _ = landlock.restrict_self().map_err(Error::ApplyLandlock).map_err(|e| {
@@ -807,11 +854,11 @@ impl Vmm {
807854
std::panic::catch_unwind(AssertUnwindSafe(|| {
808855
Vmm::signal_handler(signals, original_termios_opt, &exit_evt);
809856
}))
810-
.map_err(|_| {
811-
error!("vmm signal_handler thread panicked");
812-
exit_evt.write(1).ok()
813-
})
814-
.ok();
857+
.map_err(|_| {
858+
error!("vmm signal_handler thread panicked");
859+
exit_evt.write(1).ok()
860+
})
861+
.ok();
815862
})
816863
.map_err(Error::SignalHandlerSpawn)?,
817864
);
@@ -834,6 +881,7 @@ impl Vmm {
834881
let mut epoll = EpollContext::new().map_err(Error::Epoll)?;
835882
let reset_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?;
836883
let activate_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?;
884+
let check_migration_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?;
837885

838886
epoll
839887
.add_event(&exit_evt, EpollDispatch::Exit)
@@ -856,6 +904,10 @@ impl Vmm {
856904
.add_event(&debug_evt, EpollDispatch::Debug)
857905
.map_err(Error::Epoll)?;
858906

907+
epoll
908+
.add_event(&check_migration_evt, EpollDispatch::CheckMigration)
909+
.map_err(Error::Epoll)?;
910+
859911
Ok(Vmm {
860912
epoll,
861913
exit_evt,
@@ -876,6 +928,8 @@ impl Vmm {
876928
original_termios_opt: Arc::new(Mutex::new(None)),
877929
console_resize_pipe: None,
878930
console_info: None,
931+
migration_receiver: None,
932+
check_migration_evt,
879933
})
880934
}
881935

@@ -1580,6 +1634,31 @@ impl Vmm {
15801634
}
15811635
}
15821636

1637+
fn check_migration_result(&mut self) {
1638+
if let Some(receiver) = &self.migration_receiver {
1639+
match receiver.try_recv() {
1640+
Ok(Some(vm)) => {
1641+
info!("Migration failed, restoring VM");
1642+
// Restore VM on migration failure
1643+
self.vm = Some(vm);
1644+
self.migration_receiver = None;
1645+
}
1646+
Ok(None) => {
1647+
info!("Migration completed successfully, VM transferred");
1648+
// VM successfully transferred, clean up receiver
1649+
self.migration_receiver = None;
1650+
}
1651+
Err(TryRecvError::Empty) => {
1652+
// Migration still in progress, continue waiting
1653+
}
1654+
Err(TryRecvError::Disconnected) => {
1655+
warn!("Migration channel disconnected unexpectedly");
1656+
self.migration_receiver = None;
1657+
}
1658+
}
1659+
}
1660+
}
1661+
15831662
fn control_loop(
15841663
&mut self,
15851664
api_receiver: Rc<Receiver<ApiRequest>>,
@@ -1673,8 +1752,17 @@ impl Vmm {
16731752
}
16741753
#[cfg(not(feature = "guest_debug"))]
16751754
EpollDispatch::Debug => {}
1755+
EpollDispatch::CheckMigration => {
1756+
info!("VM migration check event");
1757+
// Consume the event.
1758+
self.check_migration_evt
1759+
.read()
1760+
.map_err(Error::EventFdRead)?;
1761+
self.check_migration_result();
1762+
}
16761763
}
16771764
}
1765+
self.check_migration_result();
16781766
}
16791767

16801768
// Trigger the termination of the signal_handler thread
@@ -2523,6 +2611,13 @@ impl RequestHandler for Vmm {
25232611
send_data_migration.destination_url, send_data_migration.local
25242612
);
25252613

2614+
// Check if there is already a migration in progress
2615+
if self.migration_receiver.is_some() {
2616+
return Err(MigratableError::MigrateSend(anyhow!(
2617+
"Another migration is already in progress"
2618+
)));
2619+
}
2620+
25262621
if !self
25272622
.vm_config
25282623
.as_ref()
@@ -2537,42 +2632,98 @@ impl RequestHandler for Vmm {
25372632
)));
25382633
}
25392634

2540-
if let Some(vm) = self.vm.as_mut() {
2541-
Self::send_migration(
2542-
vm,
2543-
#[cfg(all(feature = "kvm", target_arch = "x86_64"))]
2544-
self.hypervisor.clone(),
2545-
send_data_migration.clone(),
2546-
)
2547-
.map_err(|migration_err| {
2548-
error!("Migration failed: {:?}", migration_err);
2635+
// Create a channel for VM ownership transfer
2636+
let (sender, receiver) = std::sync::mpsc::channel();
2637+
self.migration_receiver = Some(receiver);
25492638

2550-
// Stop logging dirty pages only for non-local migrations
2551-
if !send_data_migration.local
2552-
&& let Err(e) = vm.stop_dirty_log()
2553-
{
2554-
return e;
2555-
}
2556-
2557-
if vm.get_state().unwrap() == VmState::Paused
2558-
&& let Err(e) = vm.resume()
2559-
{
2560-
return e;
2561-
}
2639+
// Take VM ownership
2640+
let vm = self
2641+
.vm
2642+
.take()
2643+
.ok_or(MigratableError::MigrateSend(anyhow!("VM is not running")))?;
25622644

2563-
migration_err
2564-
})?;
2645+
#[cfg(all(feature = "kvm", target_arch = "x86_64"))]
2646+
let hypervisor = self.hypervisor.clone();
2647+
let send_data = send_data_migration.clone();
2648+
2649+
// Clone required event descriptors
2650+
let exit_evt = match self.exit_evt.try_clone() {
2651+
Ok(evt) => evt,
2652+
Err(e) => {
2653+
// Return VM immediately if clone fails
2654+
self.vm = Some(vm);
2655+
return Err(MigratableError::MigrateSend(anyhow!(
2656+
"Failed to clone exit eventfd: {}",
2657+
e
2658+
)));
2659+
}
2660+
};
25652661

2566-
// Shutdown the VM after the migration succeeded
2567-
self.exit_evt.write(1).map_err(|e| {
2568-
MigratableError::MigrateSend(anyhow!(
2569-
"Failed shutting down the VM after migration: {:?}",
2662+
let check_migration_evt = match self.check_migration_evt.try_clone() {
2663+
Ok(evt) => evt,
2664+
Err(e) => {
2665+
// Return VM immediately if clone fails
2666+
self.vm = Some(vm);
2667+
return Err(MigratableError::MigrateSend(anyhow!(
2668+
"Failed to clone exit eventfd: {}",
25702669
e
2571-
))
2670+
)));
2671+
}
2672+
};
2673+
2674+
let vm = Arc::new(Mutex::new(vm));
2675+
let vm_clone = Arc::clone(&vm);
2676+
2677+
// Start migration thread
2678+
if let Err(e) = thread::Builder::new()
2679+
.name("migration".into())
2680+
.spawn(move || {
2681+
// Create a VM guard to return the VM in case of an exception
2682+
let vm_guard = VmGuard {
2683+
vm: Some(vm),
2684+
sender: sender.clone(),
2685+
is_local: send_data.local,
2686+
check_migration_evt: Some(check_migration_evt),
2687+
};
2688+
2689+
let result = {
2690+
let vm_ref = vm_guard.vm.as_ref().unwrap();
2691+
let mut vm_lock = vm_ref.lock().unwrap();
2692+
2693+
Self::send_migration(
2694+
&mut vm_lock,
2695+
#[cfg(all(feature = "kvm", target_arch = "x86_64"))]
2696+
hypervisor,
2697+
send_data.clone(),
2698+
)
2699+
};
2700+
2701+
match result {
2702+
Ok(_) => {
2703+
// Shutdown the VM after the migration succeeded
2704+
if let Err(e) = exit_evt.write(1) {
2705+
error!("Failed shutting down the VM after migration: {}", e);
2706+
}
2707+
}
2708+
Err(e) => {
2709+
error!("Migration failed: {:?}", e);
2710+
// VmGuard's drop will handle the VM return
2711+
}
2712+
}
25722713
})
2573-
} else {
2574-
Err(MigratableError::MigrateSend(anyhow!("VM is not running")))
2714+
{
2715+
// Return VM if thread creation fails
2716+
if let Ok(mutex) = Arc::try_unwrap(vm_clone) {
2717+
if let Ok(inner_vm) = mutex.into_inner() {
2718+
self.vm = Some(inner_vm);
2719+
}
2720+
}
2721+
return Err(MigratableError::MigrateSend(anyhow!(
2722+
"Failed to spawn migration thread: {}",
2723+
e
2724+
)));
25752725
}
2726+
Ok(())
25762727
}
25772728
}
25782729

0 commit comments

Comments
 (0)