diff --git a/src/vmm/src/devices/virtio/vsock/device.rs b/src/vmm/src/devices/virtio/vsock/device.rs
index 7fe10d158ad..67bffe01423 100644
--- a/src/vmm/src/devices/virtio/vsock/device.rs
+++ b/src/vmm/src/devices/virtio/vsock/device.rs
@@ -76,13 +76,11 @@ pub struct Vsock<B> {
 
     pub rx_packet: VsockPacketRx,
     pub tx_packet: VsockPacketTx,
+    /// Last RX stall state reported to the backend: `true` when the queue was found empty
+    /// while the backend still had pending RX data.
+    rx_queue_stalled: bool,
 }
 
-// TODO: Detect / handle queue deadlock:
-// 1. If the driver halts RX queue processing, we'll need to notify `self.backend`, so that it can
-//    unregister any EPOLLIN listeners, since otherwise it will keep spinning, unable to consume its
-//    EPOLLIN events.
-
 impl<B> Vsock<B>
 where
     B: VsockBackend + Debug,
@@ -110,6 +108,7 @@ where
             device_state: DeviceState::Inactive,
             rx_packet: VsockPacketRx::new()?,
             tx_packet: VsockPacketTx::default(),
+            rx_queue_stalled: false,
         })
     }
 
@@ -210,6 +209,14 @@ where
         }
         queue.advance_used_ring_idx();
 
+        // Only notify the backend on a state change: when the queue is empty while RX data is
+        // still pending the backend cannot make progress on EPOLLIN and would keep spinning.
+        let stalled = queue.is_empty() && self.backend.has_pending_rx();
+        if stalled != self.rx_queue_stalled {
+            self.rx_queue_stalled = stalled;
+            self.backend.notify_rxq(!stalled);
+        }
+
         Ok(have_used)
     }
 
diff --git a/src/vmm/src/devices/virtio/vsock/mod.rs b/src/vmm/src/devices/virtio/vsock/mod.rs
index cc9f7746580..b622ebe7bf7 100644
--- a/src/vmm/src/devices/virtio/vsock/mod.rs
+++ b/src/vmm/src/devices/virtio/vsock/mod.rs
@@ -179,4 +179,10 @@ pub trait VsockChannel {
 /// The vsock backend, which is basically an epoll-event-driven vsock channel.
 /// Currently, the only implementation we have is `crate::devices::virtio::unix::muxer::VsockMuxer`,
 /// which translates guest-side vsock connections to host-side Unix domain socket connections.
-pub trait VsockBackend: VsockChannel + VsockEpollListener + Send {}
+pub trait VsockBackend: VsockChannel + VsockEpollListener + Send {
+    /// Notify the backend about RX queue availability changes.
+    ///
+    /// The device passes `false` when it detects a stalled RX queue and `true` once the stall
+    /// is over. The default implementation ignores the notification.
+    fn notify_rxq(&mut self, _enabled: bool) {}
+}
diff --git a/src/vmm/src/devices/virtio/vsock/unix/muxer.rs b/src/vmm/src/devices/virtio/vsock/unix/muxer.rs
index ad979b4bdeb..cfeee095553 100644
--- a/src/vmm/src/devices/virtio/vsock/unix/muxer.rs
+++ b/src/vmm/src/devices/virtio/vsock/unix/muxer.rs
@@ -108,6 +108,8 @@ pub struct VsockMuxer {
     local_port_set: HashSet<u32>,
     /// The last used host-side port.
     local_port_last: u32,
+    /// Whether the RX queue is paused.
+    rx_paused: bool,
 }
 
 impl VsockChannel for VsockMuxer {
@@ -299,7 +301,12 @@ impl VsockEpollListener for VsockMuxer {
     }
 }
 
-impl VsockBackend for VsockMuxer {}
+impl VsockBackend for VsockMuxer {
+    /// Pause RX handling while the RX queue is not enabled, and resume it otherwise.
+    fn notify_rxq(&mut self, enabled: bool) {
+        self.set_rx_paused(!enabled);
+    }
+}
 
 impl VsockMuxer {
     /// Muxer constructor.
@@ -321,6 +328,7 @@ impl VsockMuxer {
             killq: MuxerKillQ::new(),
             local_port_last: (1u32 << 30) - 1,
             local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
+            rx_paused: false,
         };
 
         // Listen on the host initiated socket, for incoming connections.
@@ -495,7 +503,7 @@ impl VsockMuxer {
             conn.as_raw_fd(),
             EpollListener::Connection {
                 key,
-                evset: conn.get_polled_evset(),
+                evset: self.adjust_conn_evset(conn.get_polled_evset()),
             },
         )
         .map(|_| {
@@ -566,6 +574,33 @@ impl VsockMuxer {
         Ok(())
     }
 
+    /// Modify the epoll listener for a given file descriptor.
+    ///
+    /// The cached event set is only updated once the epoll modification succeeded, so that
+    /// `listener_map` keeps mirroring what is actually registered with epoll.
+    fn modify_listener(&mut self, fd: RawFd, new_evset: EventSet) {
+        let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) else {
+            warn!(
+                "vsock: muxer: error modifying epoll listener for fd {:?}: not found",
+                fd
+            );
+            return;
+        };
+
+        let result = self.epoll.ctl(
+            ControlOperation::Modify,
+            fd,
+            EpollEvent::new(new_evset, u64::try_from(fd).unwrap()),
+        );
+        match result {
+            Ok(_) => *evset = new_evset,
+            Err(err) => warn!(
+                "vsock: muxer: error modifying epoll listener for fd {:?}: {:?}",
+                fd, err
+            ),
+        }
+    }
+
     /// Remove (and return) a previously registered epoll listener.
     fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> {
         let maybe_listener = self.listener_map.remove(&fd);
@@ -686,7 +721,8 @@ impl VsockMuxer {
         }
 
         let fd = conn.as_raw_fd();
-        let new_evset = conn.get_polled_evset();
+        let polled_evset = conn.get_polled_evset();
+        let new_evset = self.adjust_conn_evset(polled_evset);
         if new_evset.is_empty() {
             // If the connection no longer needs epoll notifications, remove its listener
             // from our list.
@@ -784,6 +820,69 @@ impl VsockMuxer {
             );
         }
     }
+
+    /// Remove the IN event from the given event set if the RX queue is paused.
+    fn adjust_conn_evset(&self, mut evset: EventSet) -> EventSet {
+        if self.rx_paused {
+            evset.remove(EventSet::IN);
+        }
+        evset
+    }
+
+    /// Set the RX queue to the given state and update the epoll listeners accordingly.
+    ///
+    /// The required listener changes are collected while iterating over `conn_map` and are
+    /// applied afterwards, because the listener helpers need `&mut self`.
+    fn set_rx_paused(&mut self, paused: bool) {
+        if self.rx_paused == paused {
+            return;
+        }
+        self.rx_paused = paused;
+
+        let mut updates = Vec::new();
+        let mut removes = Vec::new();
+        let mut adds = Vec::new();
+
+        for (key, conn) in self.conn_map.iter() {
+            let fd = conn.as_raw_fd();
+            let new_evset = self.adjust_conn_evset(conn.get_polled_evset());
+            match self.listener_map.get(&fd) {
+                Some(EpollListener::Connection { evset, .. }) => {
+                    if new_evset.is_empty() {
+                        removes.push(fd)
+                    } else if *evset != new_evset {
+                        updates.push((fd, new_evset))
+                    }
+                }
+                Some(_) => (),
+                None => {
+                    if !new_evset.is_empty() {
+                        adds.push((fd, *key, new_evset))
+                    }
+                }
+            }
+        }
+
+        for fd in removes {
+            self.remove_listener(fd);
+        }
+
+        for (fd, new_evset) in updates {
+            self.modify_listener(fd, new_evset)
+        }
+
+        for (fd, key, evset) in adds {
+            self.add_listener(fd, EpollListener::Connection { key, evset })
+                .unwrap_or_else(|err| {
+                    self.kill_connection(key);
+                    error!(
+                        "vsock: error adding epoll listener after RX pause change: {:?}",
+                        err
+                    );
+                    METRICS.muxer_event_fails.inc()
+                });
+        }
+    }
 }
 
 #[cfg(test)]