
Commit a267a0c

refactor(net): replace rx_deferred_frame
Before the `readv` change, `rx_deferred_frame` signaled that a packet was present in the internal RX buffer, so we would try to send it before attempting to read a new one. With `readv` we no longer use an internal buffer for RX packets, so the meaning of a `deferred` packet has changed: a packet is now `deferred` once it has been written to guest memory. After the write we try to notify the guest about it, but only if the rate limiter allows it. To track this we now use `deferred_rx_bytes`, which holds the number of bytes the guest has not yet been notified about. `Option<NonZeroUsize>` is a convenient fit here: `Some(...)` means there are deferred bytes the guest still needs to be notified about, and `None` means we are done and can read the next packet.

Signed-off-by: Egor Lazarchuk <[email protected]>
1 parent 1821ad4 commit a267a0c
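
As a rough illustration of the bookkeeping described in the commit message (not the device code itself), the sketch below uses a hypothetical standalone `DeferredRx` type with `record_write`/`try_notify` helpers and a plain `u64` budget standing in for the real rate limiter. The point it shows: `NonZeroUsize::new` maps a zero-byte write to `None` automatically, and `Some(n)` keeps the pending byte count until the notification is allowed to go out.

use std::num::NonZeroUsize;

/// Minimal sketch of the deferred-notification state; `budget` is a
/// stand-in for the rate limiter, not Firecracker's RateLimiter API.
struct DeferredRx {
    deferred_rx_bytes: Option<NonZeroUsize>,
}

impl DeferredRx {
    /// Record how many bytes were just written to guest memory.
    /// A zero-length write naturally becomes `None` (nothing to notify).
    fn record_write(&mut self, count: usize) {
        self.deferred_rx_bytes = NonZeroUsize::new(count);
    }

    /// Try to notify the guest. Returns true when nothing is left to
    /// notify, i.e. the caller may go on and read the next packet.
    fn try_notify(&mut self, budget: &mut u64) -> bool {
        match self.deferred_rx_bytes {
            Some(bytes) => {
                let needed = bytes.get() as u64;
                if *budget >= needed {
                    *budget -= needed;             // consume rate-limiter budget
                    self.deferred_rx_bytes = None; // notification sent, reset
                    true
                } else {
                    false // throttled, the bytes stay deferred
                }
            }
            None => true, // nothing pending
        }
    }
}

fn main() {
    let mut rx = DeferredRx { deferred_rx_bytes: None };
    let mut budget = 1500u64;

    rx.record_write(1000);
    assert!(rx.try_notify(&mut budget));  // enough budget, guest notified
    rx.record_write(1000);
    assert!(!rx.try_notify(&mut budget)); // only 500 left, stays deferred
    assert!(rx.deferred_rx_bytes.is_some());
}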

File tree

1 file changed (+40, -134 lines)


src/vmm/src/devices/virtio/net/device.rs

Lines changed: 40 additions & 134 deletions
@@ -7,6 +7,7 @@
 
 use std::mem;
 use std::net::Ipv4Addr;
+use std::num::NonZeroUsize;
 use std::sync::{Arc, Mutex};
 
 use libc::EAGAIN;
@@ -107,9 +108,9 @@ pub struct Net {
     pub(crate) rx_rate_limiter: RateLimiter,
     pub(crate) tx_rate_limiter: RateLimiter,
 
-    pub(crate) rx_deferred_frame: bool,
-
-    rx_bytes_read: usize,
+    /// Used to store last RX packet size and
+    /// rate limit RX queue.
+    deferred_rx_bytes: Option<NonZeroUsize>,
     rx_frame_buf: [u8; MAX_BUFFER_SIZE],
 
     tx_frame_headers: [u8; frame_hdr_len()],
@@ -175,8 +176,7 @@ impl Net {
             queue_evts,
             rx_rate_limiter,
             tx_rate_limiter,
-            rx_deferred_frame: false,
-            rx_bytes_read: 0,
+            deferred_rx_bytes: None,
             rx_frame_buf: [0u8; MAX_BUFFER_SIZE],
             tx_frame_headers: [0u8; frame_hdr_len()],
             irq_trigger: IrqTrigger::new().map_err(NetError::EventFd)?,
@@ -298,16 +298,22 @@ impl Net {
     // Attempts to copy a single frame into the guest if there is enough
     // rate limiting budget.
     // Returns true on successful frame delivery.
-    fn rate_limited_rx_single_frame(&mut self) -> bool {
-        let rx_queue = &mut self.queues[RX_INDEX];
-        if !Self::rate_limiter_consume_op(&mut self.rx_rate_limiter, self.rx_bytes_read as u64) {
-            self.metrics.rx_rate_limiter_throttled.inc();
-            return false;
+    fn send_deferred_rx_bytes(&mut self) -> bool {
+        match self.deferred_rx_bytes {
+            Some(bytes) => {
+                if Self::rate_limiter_consume_op(&mut self.rx_rate_limiter, bytes.get() as u64) {
+                    // The packet is good to go, reset `deferred_rx_bytes`.
+                    self.deferred_rx_bytes = None;
+                    // Attempt frame delivery.
+                    self.rx_buffer.notify_queue(&mut self.queues[RX_INDEX]);
+                    true
+                } else {
+                    self.metrics.rx_rate_limiter_throttled.inc();
+                    false
+                }
+            }
+            None => true,
         }
-
-        // Attempt frame delivery.
-        self.rx_buffer.notify_queue(rx_queue);
-        true
     }
 
     /// Parse available RX `DescriptorChains` from the queue and
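
Continuing the standalone `DeferredRx` sketch from the top of this page (hypothetical helpers, not the device code): the throttle-then-retry flow that `send_deferred_rx_bytes` implements could be exercised like this, appended to the `main` shown earlier.

    // A throttled notification succeeding once the budget is refilled,
    // mirroring the rate-limiter event path in the hunks further down.
    let mut rx = DeferredRx { deferred_rx_bytes: None };
    let mut budget = 0u64;

    rx.record_write(800);
    assert!(!rx.try_notify(&mut budget)); // no budget: bytes stay deferred
    assert!(rx.deferred_rx_bytes.is_some());

    budget += 1000;                       // budget refill event fires
    assert!(rx.try_notify(&mut budget));  // notification finally goes out
    assert!(rx.deferred_rx_bytes.is_none());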
@@ -456,6 +462,10 @@ impl Net {
 
     /// Read as many frames as possible.
     fn process_rx(&mut self) -> Result<(), DeviceError> {
+        if !self.send_deferred_rx_bytes() {
+            return Ok(());
+        }
+
         self.parse_rx_descriptors();
 
         loop {
@@ -465,11 +475,10 @@
                     break;
                 }
                 Ok(count) => {
-                    self.rx_bytes_read = count;
+                    self.deferred_rx_bytes = NonZeroUsize::new(count);
                     self.metrics.rx_count.inc();
                     self.metrics.rx_packets_count.inc();
-                    if !self.rate_limited_rx_single_frame() {
-                        self.rx_deferred_frame = true;
+                    if !self.send_deferred_rx_bytes() {
                         break;
                     }
                 }
@@ -495,26 +504,6 @@
         self.try_signal_queue(NetQueue::Rx)
     }
 
-    // Process the deferred frame first, then continue reading from tap.
-    fn handle_deferred_frame(&mut self) -> Result<(), DeviceError> {
-        if self.rate_limited_rx_single_frame() {
-            self.rx_deferred_frame = false;
-            // process_rx() was interrupted possibly before consuming all
-            // packets in the tap; try continuing now.
-            return self.process_rx();
-        }
-
-        self.try_signal_queue(NetQueue::Rx)
-    }
-
-    fn resume_rx(&mut self) -> Result<(), DeviceError> {
-        if self.rx_deferred_frame {
-            self.handle_deferred_frame()
-        } else {
-            self.process_rx()
-        }
-    }
-
     fn process_tx(&mut self) -> Result<(), DeviceError> {
         // This is safe since we checked in the event handler that the device is activated.
         let mem = self.device_state.mem().unwrap();
@@ -573,7 +562,7 @@ impl Net {
                        &self.metrics,
                    )
                    .unwrap_or(false);
-                    if frame_consumed_by_mmds && !self.rx_deferred_frame {
+                    if frame_consumed_by_mmds {
                        // MMDS consumed this frame/request, let's also try to process the response.
                        process_rx_for_mmds = true;
                    }
@@ -680,7 +669,7 @@ impl Net {
             self.metrics.rx_rate_limiter_throttled.inc();
         } else {
             // If the limiter is not blocked, resume the receiving of bytes.
-            self.resume_rx()
+            self.process_rx()
                 .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
         }
     }
@@ -689,31 +678,14 @@
         // This is safe since we checked in the event handler that the device is activated.
         self.metrics.rx_tap_event_count.inc();
 
-        // While there are no available RX queue buffers and there's a deferred_frame
-        // don't process any more incoming. Otherwise start processing a frame. In the
-        // process the deferred_frame flag will be set in order to avoid freezing the
-        // RX queue.
-        if self.queues[RX_INDEX].is_empty() && self.rx_deferred_frame {
-            self.metrics.no_rx_avail_buffer.inc();
-            return;
-        }
-
         // While limiter is blocked, don't process any more incoming.
         if self.rx_rate_limiter.is_blocked() {
             self.metrics.rx_rate_limiter_throttled.inc();
             return;
         }
 
-        if self.rx_deferred_frame
-        // Process a deferred frame first if available. Don't read from tap again
-        // until we manage to receive this deferred frame.
-        {
-            self.handle_deferred_frame()
-                .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
-        } else {
-            self.process_rx()
-                .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
-        }
+        self.process_rx()
+            .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
     }
 
     /// Process a single TX queue event.
@@ -743,7 +715,7 @@ impl Net {
         match self.rx_rate_limiter.event_handler() {
             Ok(_) => {
                 // There might be enough budget now to receive the frame.
-                self.resume_rx()
+                self.process_rx()
                     .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
             }
             Err(err) => {
@@ -772,7 +744,7 @@
 
     /// Process device virtio queue(s).
     pub fn process_virtio_queues(&mut self) {
-        let _ = self.resume_rx();
+        let _ = self.process_rx();
         let _ = self.process_tx();
     }
 }
@@ -1164,7 +1136,7 @@ pub mod tests {
         th.rxq.check_used_elem(0, 0, 0);
         th.rxq.check_used_elem(1, 3, 0);
         // Check that the frame wasn't deferred.
-        assert!(!th.net().rx_deferred_frame);
+        assert!(th.net().deferred_rx_bytes.is_none());
         // Check that the frame has been written successfully to the valid Rx descriptor chain.
         th.rxq
             .check_used_elem(2, 4, frame.len().try_into().unwrap());
@@ -1198,7 +1170,7 @@ pub mod tests {
        );
 
        // Check that the frame wasn't deferred.
-        assert!(!th.net().rx_deferred_frame);
+        assert!(th.net().deferred_rx_bytes.is_none());
        // Check that the used queue has advanced.
        assert_eq!(th.rxq.used.idx.get(), 1);
        assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
@@ -1238,7 +1210,7 @@ pub mod tests {
        );
 
        // Check that the frames weren't deferred.
-        assert!(!th.net().rx_deferred_frame);
+        assert!(th.net().deferred_rx_bytes.is_none());
        // Check that the used queue has advanced.
        assert_eq!(th.rxq.used.idx.get(), 2);
        assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
@@ -1666,20 +1638,7 @@ pub mod tests {
         // SAFETY: its a valid fd
         unsafe { libc::close(th.net.lock().unwrap().tap.as_raw_fd()) };
 
-        // The RX queue is empty and rx_deffered_frame is set.
-        th.net().rx_deferred_frame = true;
-        check_metric_after_block!(
-            th.net().metrics.no_rx_avail_buffer,
-            1,
-            th.simulate_event(NetEvent::Tap)
-        );
-
-        // We need to set this here to false, otherwise the device will try to
-        // handle a deferred frame, it will fail and will never try to read from
-        // the tap.
-        th.net().rx_deferred_frame = false;
-
-        // Fake an avail buffer; this time, tap reading should error out.
+        // Fake an avail buffer; tap reading should error out.
         th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
         check_metric_after_block!(
             th.net().metrics.tap_read_fails,
@@ -1688,59 +1647,6 @@ pub mod tests {
         );
     }
 
-    #[test]
-    fn test_deferred_frame() {
-        let mem = single_region_mem(2 * MAX_BUFFER_SIZE);
-        let mut th = TestHelper::get_default(&mem);
-        th.activate_net();
-
-        let rx_packets_count = th.net().metrics.rx_packets_count.count();
-        let _ = inject_tap_tx_frame(&th.net(), 1000);
-        // Trigger a Tap event that. This should fail since there
-        // are not any available descriptors in the queue
-        check_metric_after_block!(
-            th.net().metrics.no_rx_avail_buffer,
-            1,
-            th.simulate_event(NetEvent::Tap)
-        );
-        // The frame we read from the tap should be deferred now and
-        // no frames should have been transmitted
-        assert!(th.net().rx_deferred_frame);
-        assert_eq!(th.net().metrics.rx_packets_count.count(), rx_packets_count);
-
-        // Let's add a second frame, which should really have the same
-        // fate.
-        let _ = inject_tap_tx_frame(&th.net(), 1000);
-
-        // Adding a descriptor in the queue. This should handle the first deferred
-        // frame. However, this should try to handle the second tap as well and fail
-        // since there's only one Descriptor Chain in the queue.
-        th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
-        check_metric_after_block!(
-            th.net().metrics.no_rx_avail_buffer,
-            1,
-            th.simulate_event(NetEvent::Tap)
-        );
-        // We should still have a deferred frame
-        assert!(th.net().rx_deferred_frame);
-        // However, we should have delivered the first frame
-        assert_eq!(
-            th.net().metrics.rx_packets_count.count(),
-            rx_packets_count + 1
-        );
-
-        // Let's add one more descriptor and try to handle the last frame as well.
-        th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
-        check_metric_after_block!(
-            th.net().metrics.rx_packets_count,
-            1,
-            th.simulate_event(NetEvent::RxQueue)
-        );
-
-        // We should be done with any deferred frame
-        assert!(!th.net().rx_deferred_frame);
-    }
-
     #[test]
     fn test_rx_rate_limiter_handling() {
         let mem = single_region_mem(2 * MAX_BUFFER_SIZE);
@@ -1853,7 +1759,7 @@ pub mod tests {
         let mut rl = RateLimiter::new(1000, 0, 500, 0, 0, 0).unwrap();
 
         // set up RX
-        assert!(!th.net().rx_deferred_frame);
+        assert!(th.net().deferred_rx_bytes.is_none());
         th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
 
         let frame = inject_tap_tx_frame(&th.net(), 1000);
@@ -1873,7 +1779,7 @@ pub mod tests {
            // assert that limiter is blocked
            assert!(th.net().rx_rate_limiter.is_blocked());
            assert_eq!(th.net().metrics.rx_rate_limiter_throttled.count(), 1);
-            assert!(th.net().rx_deferred_frame);
+            assert!(th.net().deferred_rx_bytes.is_some());
            // assert that no operation actually completed (limiter blocked it)
            assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
            // make sure the data is still queued for processing
@@ -1971,7 +1877,7 @@ pub mod tests {
         let mut rl = RateLimiter::new(0, 0, 0, 1, 0, 500).unwrap();
 
         // set up RX
-        assert!(!th.net().rx_deferred_frame);
+        assert!(th.net().deferred_rx_bytes.is_none());
         th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
         let frame = inject_tap_tx_frame(&th.net(), 1234);
 
@@ -1993,7 +1899,7 @@ pub mod tests {
            // assert that limiter is blocked
            assert!(th.net().rx_rate_limiter.is_blocked());
            assert!(th.net().metrics.rx_rate_limiter_throttled.count() >= 1);
-            assert!(th.net().rx_deferred_frame);
+            assert!(th.net().deferred_rx_bytes.is_some());
            // assert that no operation actually completed (limiter blocked it)
            assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
            // make sure the data is still queued for processing
