Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/vmm/src/devices/virtio/vsock/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,9 @@ pub struct Vsock<B> {

pub rx_packet: VsockPacketRx,
pub tx_packet: VsockPacketTx,
rx_queue_stalled: bool,
}

// TODO: Detect / handle queue deadlock:
// 1. If the driver halts RX queue processing, we'll need to notify `self.backend`, so that it can
// unregister any EPOLLIN listeners, since otherwise it will keep spinning, unable to consume its
// EPOLLIN events.

impl<B> Vsock<B>
where
B: VsockBackend + Debug,
Expand Down Expand Up @@ -110,6 +106,7 @@ where
device_state: DeviceState::Inactive,
rx_packet: VsockPacketRx::new()?,
tx_packet: VsockPacketTx::default(),
rx_queue_stalled: false,
})
}

Expand Down Expand Up @@ -210,6 +207,12 @@ where
}
queue.advance_used_ring_idx();

let stalled = queue.is_empty() && self.backend.has_pending_rx();
if stalled != self.rx_queue_stalled {
self.rx_queue_stalled = stalled;
self.backend.notify_rxq(!stalled);
}

Ok(have_used)
}

Expand Down
5 changes: 4 additions & 1 deletion src/vmm/src/devices/virtio/vsock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,7 @@ pub trait VsockChannel {
/// The vsock backend, which is basically an epoll-event-driven vsock channel.
/// Currently, the only implementation we have is `crate::devices::virtio::unix::muxer::VsockMuxer`,
/// which translates guest-side vsock connections to host-side Unix domain socket connections.
pub trait VsockBackend: VsockChannel + VsockEpollListener + Send {}
pub trait VsockBackend: VsockChannel + VsockEpollListener + Send {
/// Notify the backend about RX queue availability changes.
fn notify_rxq(&mut self, enabled: bool) {}
}
99 changes: 96 additions & 3 deletions src/vmm/src/devices/virtio/vsock/unix/muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub struct VsockMuxer {
local_port_set: HashSet<u32>,
/// The last used host-side port.
local_port_last: u32,
/// Whether the RX queue is paused.
rx_paused: bool,
}

impl VsockChannel for VsockMuxer {
Expand Down Expand Up @@ -299,7 +301,11 @@ impl VsockEpollListener for VsockMuxer {
}
}

impl VsockBackend for VsockMuxer {}
impl VsockBackend for VsockMuxer {
fn notify_rxq(&mut self, enabled: bool) {
self.set_rx_paused(!enabled);
}
}

impl VsockMuxer {
/// Muxer constructor.
Expand All @@ -321,6 +327,7 @@ impl VsockMuxer {
killq: MuxerKillQ::new(),
local_port_last: (1u32 << 30) - 1,
local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
rx_paused: false,
};

// Listen on the host initiated socket, for incoming connections.
Expand Down Expand Up @@ -495,7 +502,7 @@ impl VsockMuxer {
conn.as_raw_fd(),
EpollListener::Connection {
key,
evset: conn.get_polled_evset(),
evset: self.adjust_conn_evset(conn.get_polled_evset()),
},
)
.map(|_| {
Expand Down Expand Up @@ -566,6 +573,31 @@ impl VsockMuxer {
Ok(())
}

/// Modify the epoll listener for a given file descriptor.
fn modify_listener(&mut self, fd: RawFd, new_evset: EventSet) {
let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) else {
warn!(
"vsock: muxer: error modifying epoll listener for fd {:?}: not found",
fd
);
return;
};

*evset = new_evset;
self.epoll
.ctl(
ControlOperation::Modify,
fd,
EpollEvent::new(new_evset, u64::try_from(fd).unwrap()),
)
.unwrap_or_else(|err| {
warn!(
"vsock: muxer: error modifying epoll listener for fd {:?}: {:?}",
fd, err
);
});
}

/// Remove (and return) a previously registered epoll listener.
fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> {
let maybe_listener = self.listener_map.remove(&fd);
Expand Down Expand Up @@ -686,7 +718,8 @@ impl VsockMuxer {
}

let fd = conn.as_raw_fd();
let new_evset = conn.get_polled_evset();
let polled_evset = conn.get_polled_evset();
let new_evset = self.adjust_conn_evset(polled_evset);
if new_evset.is_empty() {
// If the connection no longer needs epoll notifications, remove its listener
// from our list.
Expand Down Expand Up @@ -784,6 +817,66 @@ impl VsockMuxer {
);
}
}

/// Remove the IN event from the given event set if the RX queue is paused.
fn adjust_conn_evset(&self, mut evset: EventSet) -> EventSet {
if self.rx_paused {
evset.remove(EventSet::IN);
}
evset
}

/// Set the RX queue to the given state and update the epoll listeners accordingly.
fn set_rx_paused(&mut self, paused: bool) {
if self.rx_paused == paused {
return;
}
self.rx_paused = paused;

let mut updates = Vec::new();
let mut removes = Vec::new();
let mut adds = Vec::new();

for (key, conn) in self.conn_map.iter() {
let fd = conn.as_raw_fd();
let new_evset = self.adjust_conn_evset(conn.get_polled_evset());
match self.listener_map.get(&fd) {
Some(EpollListener::Connection { evset, .. }) => {
if new_evset.is_empty() {
removes.push(fd)
} else if *evset != new_evset {
updates.push((fd, new_evset))
}
}
Some(_) => (),
None => {
if !new_evset.is_empty() {
adds.push((fd, *key, new_evset))
}
}
}
}

for fd in removes {
self.remove_listener(fd);
}

for (fd, new_evset) in updates {
self.modify_listener(fd, new_evset)
}

for (fd, key, evset) in adds {
self.add_listener(fd, EpollListener::Connection { key, evset })
.unwrap_or_else(|err| {
self.kill_connection(key);
error!(
"vsock: error adding epoll listener after RX pause change: {:?}",
err
);
METRICS.muxer_event_fails.inc()
});
}
}
}

#[cfg(test)]
Expand Down