Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/vmm/src/devices/virtio/transport/mmio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,14 @@ impl VirtioInterrupt for IrqTrigger {
}
}

fn trigger_queues(&self, queues: &[u16]) -> Result<(), std::io::Error> {
if queues.is_empty() {
Ok(())
} else {
self.trigger_irq(IrqType::Vring)
}
}

fn notifier(&self, _interrupt_type: VirtioInterruptType) -> Option<&EventFd> {
Some(&self.irq_evt)
}
Expand Down
12 changes: 12 additions & 0 deletions src/vmm/src/devices/virtio/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ pub trait VirtioInterrupt: std::fmt::Debug + Send + Sync {
/// Trigger a VirtIO interrupt.
fn trigger(&self, interrupt_type: VirtioInterruptType) -> Result<(), std::io::Error>;

/// Trigger multiple Virtio interrupts for selected queues.
/// The caller needs to ensure that [`queues`] does not include duplicate entries to
/// avoid sending multiple interrupts for the same queue.
/// This is to allow sending a single interrupt for implementations that don't
/// distinguish different queues, like IrqTrigger, instead of sending multiple same
/// interrupts.
fn trigger_queues(&self, queues: &[u16]) -> Result<(), std::io::Error> {
queues
.iter()
.try_for_each(|&qidx| self.trigger(VirtioInterruptType::Queue(qidx)))
}

/// Get the `EventFd` (if any) that backs the underlying interrupt.
fn notifier(&self, _interrupt_type: VirtioInterruptType) -> Option<&EventFd> {
None
Expand Down
10 changes: 10 additions & 0 deletions src/vmm/src/devices/virtio/vsock/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ where
.map_err(DeviceError::FailedSignalingIrq)
}

/// Signal the guest which queues are ready to be consumed
pub fn signal_used_queues(&self, used_queues: &[u16]) -> Result<(), DeviceError> {
self.device_state
.active_state()
.expect("Device is not initialized")
.interrupt
.trigger_queues(used_queues)
.map_err(DeviceError::FailedSignalingIrq)
}

/// Walk the driver-provided RX queue buffers and attempt to fill them up with any data that we
/// have pending. Return `true` if descriptors have been added to the used ring, and `false`
/// otherwise.
Expand Down
52 changes: 32 additions & 20 deletions src/vmm/src/devices/virtio/vsock/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use super::VsockBackend;
use super::device::{EVQ_INDEX, RXQ_INDEX, TXQ_INDEX, Vsock};
use crate::devices::virtio::device::VirtioDevice;
use crate::devices::virtio::queue::InvalidAvailIdx;
use crate::devices::virtio::vsock::defs::VSOCK_NUM_QUEUES;
use crate::devices::virtio::vsock::metrics::METRICS;
use crate::logger::IncMetric;

Expand All @@ -47,49 +48,50 @@ where
const PROCESS_EVQ: u32 = 3;
const PROCESS_NOTIFY_BACKEND: u32 = 4;

pub fn handle_rxq_event(&mut self, evset: EventSet) {
pub fn handle_rxq_event(&mut self, evset: EventSet) -> Vec<u16> {
let mut used_queues = Vec::new();
if evset != EventSet::IN {
warn!("vsock: rxq unexpected event {:?}", evset);
METRICS.rx_queue_event_fails.inc();
return;
return used_queues;
}

if let Err(err) = self.queue_events[RXQ_INDEX].read() {
error!("Failed to get vsock rx queue event: {:?}", err);
METRICS.rx_queue_event_fails.inc();
} else if self.backend.has_pending_rx() {
if self.process_rx().unwrap() {
self.signal_used_queue(RXQ_INDEX)
.expect("vsock: Could not trigger device interrupt or RX queue");
used_queues.push(RXQ_INDEX.try_into().unwrap());
}
METRICS.rx_queue_event_count.inc();
}
used_queues
}

pub fn handle_txq_event(&mut self, evset: EventSet) {
pub fn handle_txq_event(&mut self, evset: EventSet) -> Vec<u16> {
let mut used_queues = Vec::new();
if evset != EventSet::IN {
warn!("vsock: txq unexpected event {:?}", evset);
METRICS.tx_queue_event_fails.inc();
return;
return used_queues;
}

if let Err(err) = self.queue_events[TXQ_INDEX].read() {
error!("Failed to get vsock tx queue event: {:?}", err);
METRICS.tx_queue_event_fails.inc();
} else {
if self.process_tx().unwrap() {
self.signal_used_queue(TXQ_INDEX)
.expect("vsock: Could not trigger device interrupt or TX queue");
used_queues.push(TXQ_INDEX.try_into().unwrap());
}
METRICS.tx_queue_event_count.inc();
// The backend may have queued up responses to the packets we sent during
// TX queue processing. If that happened, we need to fetch those responses
// and place them into RX buffers.
if self.backend.has_pending_rx() && self.process_rx().unwrap() {
self.signal_used_queue(RXQ_INDEX)
.expect("vsock: Could not trigger device interrupt or RX queue");
used_queues.push(RXQ_INDEX.try_into().unwrap());
}
}
used_queues
}

pub fn handle_evq_event(&mut self, evset: EventSet) {
Expand All @@ -106,23 +108,22 @@ where
}

/// Notify backend of new events.
pub fn notify_backend(&mut self, evset: EventSet) -> Result<(), InvalidAvailIdx> {
pub fn notify_backend(&mut self, evset: EventSet) -> Result<Vec<u16>, InvalidAvailIdx> {
let mut used_queues = Vec::new();
self.backend.notify(evset);
// After the backend has been kicked, it might've freed up some resources, so we
// can attempt to send it more data to process.
// In particular, if `self.backend.send_pkt()` halted the TX queue processing (by
// returning an error) at some point in the past, now is the time to try walking the
// TX queue again.
if self.process_tx()? {
self.signal_used_queue(TXQ_INDEX)
.expect("vsock: Could not trigger device interrupt or TX queue");
used_queues.push(TXQ_INDEX.try_into().unwrap());
}
if self.backend.has_pending_rx() && self.process_rx()? {
self.signal_used_queue(RXQ_INDEX)
.expect("vsock: Could not trigger device interrupt or RX queue");
used_queues.push(RXQ_INDEX.try_into().unwrap())
}

Ok(())
Ok(used_queues)
}

fn register_runtime_events(&self, ops: &mut EventOps) {
Expand Down Expand Up @@ -190,14 +191,25 @@ where
let evset = event.event_set();

if self.is_activated() {
match source {
Self::PROCESS_ACTIVATE => self.handle_activate_event(ops),
let used_queues = match source {
Self::PROCESS_ACTIVATE => {
self.handle_activate_event(ops);
Vec::new()
}
Self::PROCESS_RXQ => self.handle_rxq_event(evset),
Self::PROCESS_TXQ => self.handle_txq_event(evset),
Self::PROCESS_EVQ => self.handle_evq_event(evset),
Self::PROCESS_EVQ => {
self.handle_evq_event(evset);
Vec::new()
}
Self::PROCESS_NOTIFY_BACKEND => self.notify_backend(evset).unwrap(),
_ => warn!("Unexpected vsock event received: {:?}", source),
_ => {
warn!("Unexpected vsock event received: {:?}", source);
Vec::new()
}
};
self.signal_used_queues(&used_queues)
.expect("vsock: Could not trigger device interrupt");
} else {
warn!(
"Vsock: The device is not yet activated. Spurious event received: {:?}",
Expand Down