diff --git a/resources/seccomp/aarch64-unknown-linux-musl.json b/resources/seccomp/aarch64-unknown-linux-musl.json index 868e7ce0e99..48d94a0f050 100644 --- a/resources/seccomp/aarch64-unknown-linux-musl.json +++ b/resources/seccomp/aarch64-unknown-linux-musl.json @@ -32,6 +32,10 @@ "syscall": "writev", "comment": "Used by the VirtIO net device to write to tap" }, + { + "syscall": "readv", + "comment": "Used by the VirtIO net device to read from tap" + }, { "syscall": "fsync" }, diff --git a/resources/seccomp/x86_64-unknown-linux-musl.json b/resources/seccomp/x86_64-unknown-linux-musl.json index e5b4b690196..75c9afa02f0 100644 --- a/resources/seccomp/x86_64-unknown-linux-musl.json +++ b/resources/seccomp/x86_64-unknown-linux-musl.json @@ -32,6 +32,10 @@ "syscall": "writev", "comment": "Used by the VirtIO net device to write to tap" }, + { + "syscall": "readv", + "comment": "Used by the VirtIO net device to read from tap" + }, { "syscall": "fsync" }, diff --git a/src/vmm/benches/queue.rs b/src/vmm/benches/queue.rs index faff57b2ebf..392d9074377 100644 --- a/src/vmm/benches/queue.rs +++ b/src/vmm/benches/queue.rs @@ -10,50 +10,9 @@ use std::num::Wrapping; use criterion::{criterion_group, criterion_main, Criterion}; use vm_memory::GuestAddress; -use vmm::devices::virtio::queue::{VIRTQ_DESC_F_NEXT, VIRTQ_DESC_F_WRITE}; -use vmm::devices::virtio::test_utils::VirtQueue; +use vmm::devices::virtio::test_utils::{set_dtable_many_chains, set_dtable_one_chain, VirtQueue}; use vmm::test_utils::single_region_mem; -/// Create one chain with n descriptors -/// Descriptor buffers will leave at the offset of 2048 bytes -/// to leave some room for queue objects. -/// We don't really care about sizes of descriptors, -/// so pick 1024. -fn set_dtable_one_chain(rxq: &VirtQueue, n: usize) { - let desc_size = 1024; - for i in 0..n { - rxq.dtable[i].set( - (2048 + desc_size * i) as u64, - desc_size as u32, - VIRTQ_DESC_F_WRITE | VIRTQ_DESC_F_NEXT, - (i + 1) as u16, - ); - } - rxq.dtable[n - 1].flags.set(VIRTQ_DESC_F_WRITE); - rxq.dtable[n - 1].next.set(0); - rxq.avail.ring[0].set(0); - rxq.avail.idx.set(n as u16); -} - -/// Create n chains with 1 descriptors each -/// Descriptor buffers will leave at the offset of 2048 bytes -/// to leave some room for queue objects. -/// We don't really care about sizes of descriptors, -/// so pick 1024. -fn set_dtable_many_chains(rxq: &VirtQueue, n: usize) { - let desc_size = 1024; - for i in 0..n { - rxq.dtable[i].set( - (2048 + desc_size * i) as u64, - desc_size as u32, - VIRTQ_DESC_F_WRITE, - 0, - ); - rxq.avail.ring[i].set(i as u16); - } - rxq.avail.idx.set(n as u16); -} - pub fn queue_benchmark(c: &mut Criterion) { let mem = single_region_mem(65562); let rxq = VirtQueue::new(GuestAddress(0), &mem, 256); diff --git a/src/vmm/src/devices/virtio/iov_ring_buffer.rs b/src/vmm/src/devices/virtio/iov_ring_buffer.rs new file mode 100644 index 00000000000..6dbfda3eae6 --- /dev/null +++ b/src/vmm/src/devices/virtio/iov_ring_buffer.rs @@ -0,0 +1,404 @@ +// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use std::os::fd::AsRawFd; + +use libc::{c_int, c_void, iovec, off_t, size_t}; +use memfd::{self, FileSeal, Memfd, MemfdOptions}; + +use super::queue::FIRECRACKER_MAX_QUEUE_SIZE; +use crate::arch::PAGE_SIZE; + +#[derive(Debug, thiserror::Error, displaydoc::Display)] +pub enum IovRingBufferError { + /// Error with memfd: {0} + Memfd(#[from] memfd::Error), + /// Error while resizing memfd: {0} + MemfdResize(std::io::Error), + /// Error calling mmap: {0} + Mmap(std::io::Error), +} + +/// ['IovRingBuffer'] is a ring buffer tailored for `struct iovec` objects. +/// +/// From the point of view of API, [`IovRingBuffer`] is a typical ring buffer that allows us to push +/// `struct iovec` objects at the end of the buffer and pop them from its beginning. +/// +/// It is tailored to store `struct iovec` objects that described memory that was passed to us from +/// the guest via a VirtIO queue. This allows us to assume the maximum size of a ring buffer (the +/// negotiated size of the queue). +// An important feature of the data structure is that it can give us a slice of all `struct iovec` +// objects in the queue, so that we can use this `&mut [iovec]` to perform operations such as +// `readv`. A typical implementation of a ring buffer allows for entries to wrap around the end of +// the underlying buffer. For example, a ring buffer with a capacity of 10 elements which +// currently holds 4 elements can look like this: +// +// tail head +// | | +// v v +// +---+---+---+---+---+---+---+---+---+---+ +// ring buffer: | C | D | | | | | | | A | B | +// +---+---+---+---+---+---+---+---+---+---+ +// +// When getting a slice for this data we should get something like that: &[A, B, C, D], which +// would require copies in order to make the elements continuous in memory. +// +// In order to avoid that and make the operation of getting a slice more efficient, we implement +// the optimization described in the "Optimization" section of the "Circular buffer" wikipedia +// entry: https://en.wikipedia.org/wiki/Circular_buffer. The optimization consists of allocating +// double the size of the virtual memory required for the buffer and map both parts on the same +// physical address. Looking at the same example as before, we should get, this picture: +// +// head | tail +// | | | +// v | v +// +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ +// | C | D | | | | | | | A | B | C | D | | | | | | | A | B | +// +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ +// First virtual page | Second virtual page +// | +// | +// +// Virtual memory +// --------------------------------------------------------------------------------------- +// Physical memory +// +// +---+---+---+---+---+---+---+---+---+---+ +// | C | D | | | | | | | A | B | +// +---+---+---+---+---+---+---+---+---+---+ +// +// Like that, the elements stored in the buffer are always laid out in contiguous virtual memory, +// so making a slice out of them does not require any copies. +#[derive(Debug)] +pub struct IovRingBuffer { + iov_ptr: *mut iovec, + start: usize, + len: usize, +} + +// SAFETY: This is `Send`. We hold sole ownership of the underlying buffer. +unsafe impl Send for IovRingBuffer {} + +impl IovRingBuffer { + /// Create a [`memfd`] object that represents a single physical page + fn create_memfd() -> Result { + // Create a sealable memfd. 
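+        // Sealing matters here: `new()` below maps two consecutive virtual pages onto
+        // this single memfd page, and resizing the backing file afterwards would break
+        // that double-mapping, so growing and shrinking are forbidden up front.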
+ let opts = MemfdOptions::default().allow_sealing(true); + let mfd = opts.create("sized-1K")?; + + // Resize to system page size. + mfd.as_file() + .set_len(PAGE_SIZE.try_into().unwrap()) + .map_err(IovRingBufferError::MemfdResize)?; + + // Add seals to prevent further resizing. + mfd.add_seals(&[FileSeal::SealShrink, FileSeal::SealGrow])?; + + // Prevent further sealing changes. + mfd.add_seal(FileSeal::SealSeal)?; + + Ok(mfd) + } + + /// Wrapper for libc's `mmap` system call + unsafe fn mmap( + addr: *mut c_void, + len: size_t, + prot: c_int, + flags: c_int, + fd: c_int, + offset: off_t, + ) -> Result<*mut c_void, IovRingBufferError> { + // SAFETY: We are calling the system call with valid arguments and properly checking its + // return value + let ptr = unsafe { libc::mmap(addr, len, prot, flags, fd, offset) }; + if ptr == libc::MAP_FAILED { + return Err(IovRingBufferError::Mmap(std::io::Error::last_os_error())); + } + + Ok(ptr) + } + + /// Allocate memory for our ring buffer + /// + /// This will allocate exactly two pages of virtual memory. In order to implement the + /// optimization that allows us to always have elements in contiguous memory we need + /// allocations at the granularity of `PAGE_SIZE`. Now, our queues are at maximum 256 + /// descriptors long and `struct iovec` looks like this: + /// + /// ```Rust + /// pub struct iovec { + /// pub iov_base: *mut ::c_void, + /// pub iov_len: ::size_t, + /// } + /// ``` + /// + /// so, it's 16 bytes long. As a result, we need a single page for holding the actual data of + /// our buffer. + fn allocate_ring_buffer_memory() -> Result<*mut c_void, IovRingBufferError> { + // The fact that we allocate two pages is due to the size of `struct iovec` times our queue + // size equals the page size. Add here a debug assertion to reflect that and ensure that we + // will adapt our logic if the assumption changes in the future. + debug_assert_eq!( + std::mem::size_of::() * usize::from(FIRECRACKER_MAX_QUEUE_SIZE), + PAGE_SIZE + ); + + // SAFETY: We are calling this function with valid arguments + unsafe { + Self::mmap( + std::ptr::null_mut(), + PAGE_SIZE * 2, + libc::PROT_NONE, + libc::MAP_PRIVATE | libc::MAP_ANONYMOUS, + -1, + 0, + ) + } + } + + /// Create a new [`IovRingBuffer`] that can hold memory described by a single VirtIO queue. 
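+    ///
+    /// Internally this reserves two consecutive virtual pages with `PROT_NONE` and then
+    /// maps both of them (`MAP_SHARED | MAP_FIXED`) onto the same memfd-backed physical
+    /// page, which is what keeps wrap-around contents contiguous in virtual memory.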
+ pub fn new() -> Result { + let memfd = Self::create_memfd()?; + let raw_memfd = memfd.as_file().as_raw_fd(); + let buffer = Self::allocate_ring_buffer_memory()?; + + // Map the first page of virtual memory to the physical page described by the memfd object + // SAFETY: We are calling this function with valid arguments + unsafe { + let _ = Self::mmap( + buffer, + PAGE_SIZE, + libc::PROT_READ | libc::PROT_WRITE, + libc::MAP_SHARED | libc::MAP_FIXED, + raw_memfd, + 0, + )?; + } + + // Map the second page of virtual memory to the physical page described by the memfd object + // SAFETY: safe because `Self::allocate_ring_buffer_memory` allocates exactly two pages for + // us + let next_page = unsafe { buffer.add(PAGE_SIZE) }; + // SAFETY: We are calling this function with valid arguments + unsafe { + let _ = Self::mmap( + next_page, + PAGE_SIZE, + libc::PROT_READ | libc::PROT_WRITE, + libc::MAP_SHARED | libc::MAP_FIXED, + raw_memfd, + 0, + )?; + } + + Ok(Self { + iov_ptr: buffer.cast(), + start: 0, + len: 0, + // head: 0, + // tail: 0, + }) + } + + /// Returns the number of `iovec` objects currently in the [`IovRingBuffer`] + #[inline(always)] + pub fn len(&self) -> usize { + self.len + } + + /// Returns `true` if the [`IovRingBuffer`] is empty, `false` otherwise + #[inline(always)] + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Returns `true` if the [`IovRingBuffer`] is full, `false` otherwise + #[inline(always)] + pub fn is_full(&self) -> bool { + self.len == usize::from(FIRECRACKER_MAX_QUEUE_SIZE) + } + + /// Adds an `iovec` in the ring buffer. + /// Panics if the queue is already full. + pub fn push_back(&mut self, iov: iovec) { + // This should NEVER happen, since our ring buffer is as big as the maximum queue size. + // We also check for the sanity of the VirtIO queues, in queue.rs, which means that if we + // ever try to add something in a full ring buffer, there is an internal bug in the device + // emulation logic. Panic here because the device is hopelessly broken. + if self.is_full() { + panic!("The number of `iovec` objects is bigger than the available space"); + } + + // SAFETY: iov_ptr is valid and tail is within bounds + unsafe { + self.iov_ptr.add(self.start + self.len).write(iov); + } + self.len += 1; + } + + /// Pop first `n` iovs from the back of the queue. + /// Panics if `n` is greater than length of the queue. + pub fn pop_back(&mut self, n: usize) { + if self.len() < n { + panic!("Attempt to pop more objects than are in the queue"); + } + // We don't need to care about case where tail will underflow + // because this can only occur if the ring overflow. + self.len -= n; + } + + /// Pop first `n` iovs from the front of the queue. + /// Panics if `n` is greater than length of the queue. + pub fn pop_front(&mut self, n: usize) { + if self.len() < n { + panic!("Attempt to pop more objects than are in the queue"); + } + self.start += n; + self.len -= n; + if usize::from(FIRECRACKER_MAX_QUEUE_SIZE) <= self.start { + self.start -= usize::from(FIRECRACKER_MAX_QUEUE_SIZE); + } + } + + /// Gets a slice of the `iovec` objects currently in the buffer. + pub fn as_slice(&self) -> &[iovec] { + // SAFETY: we create a slice which does not touch same memory twice. + // slice_start and slice_len are valid values. + unsafe { + let slice_start = self.iov_ptr.add(self.start); + let slice_len = self.len; + std::slice::from_raw_parts(slice_start, slice_len) + } + } + + /// Gets a mutable slice of the `iovec` objects currently in the buffer. 
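+    ///
+    /// Thanks to the double mapping, the returned slice stays contiguous even when
+    /// `start + len` crosses the end of the first virtual page.
+    ///
+    /// A minimal usage sketch (not a doctest, values are hypothetical):
+    ///
+    /// ```ignore
+    /// let mut ring = IovRingBuffer::new().unwrap();
+    /// ring.push_back(libc::iovec { iov_base: std::ptr::null_mut(), iov_len: 0 });
+    /// assert_eq!(ring.as_mut_slice().len(), 1);
+    /// ```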
+ pub fn as_mut_slice(&mut self) -> &mut [iovec] { + // SAFETY: we create a slice which does not touch same memory twice. + // slice_start and slice_len are valid values. + unsafe { + let slice_start = self.iov_ptr.add(self.start); + let slice_len = self.len; + std::slice::from_raw_parts_mut(slice_start, slice_len) + } + } +} + +#[cfg(test)] +#[allow(clippy::needless_range_loop)] +mod tests { + use super::*; + + #[test] + fn test_new() { + let ring_buffer = IovRingBuffer::new().unwrap(); + assert!(ring_buffer.is_empty()); + } + + fn make_iovec(id: usize, len: usize) -> iovec { + iovec { + iov_base: id as *mut libc::c_void, + iov_len: len, + } + } + + #[test] + #[should_panic] + fn test_push_back() { + let mut ring_buffer = IovRingBuffer::new().unwrap(); + assert!(ring_buffer.is_empty()); + + for i in 0..256 { + ring_buffer.push_back(make_iovec(i, i)); + assert_eq!(ring_buffer.len(), i + 1); + } + + ring_buffer.push_back(make_iovec(0, 0)); + } + + #[test] + #[should_panic] + fn test_pop_back_empty() { + let mut deque = IovRingBuffer::new().unwrap(); + assert!(deque.is_empty()); + assert!(!deque.is_full()); + + deque.pop_back(1); + } + + #[test] + fn test_pop_back() { + let mut ring_buffer = IovRingBuffer::new().unwrap(); + assert!(ring_buffer.is_empty()); + assert!(!ring_buffer.is_full()); + + for i in 0..256 { + ring_buffer.push_back(make_iovec(i, i)); + assert_eq!(ring_buffer.len(), i + 1); + } + + assert!(ring_buffer.is_full()); + assert!(!ring_buffer.is_empty()); + + ring_buffer.pop_back(256); + assert!(ring_buffer.is_empty()); + assert!(!ring_buffer.is_full()); + } + + #[test] + #[should_panic] + fn test_pop_front_empty() { + let mut ring_buffer = IovRingBuffer::new().unwrap(); + assert!(ring_buffer.is_empty()); + assert!(!ring_buffer.is_full()); + + ring_buffer.pop_front(1); + } + + #[test] + fn test_pop_front() { + let mut ring_buffer = IovRingBuffer::new().unwrap(); + assert!(ring_buffer.is_empty()); + assert!(!ring_buffer.is_full()); + + for i in 0..256 { + ring_buffer.push_back(make_iovec(i, i)); + assert_eq!(ring_buffer.len(), i + 1); + } + + assert!(ring_buffer.is_full()); + assert!(!ring_buffer.is_empty()); + + ring_buffer.pop_front(256); + assert!(ring_buffer.is_empty()); + assert!(!ring_buffer.is_full()); + } + + #[test] + fn test_as_slice() { + let mut buffer = IovRingBuffer::new().unwrap(); + assert_eq!(buffer.as_mut_slice(), &mut []); + + for i in 0..256 { + buffer.push_back(make_iovec(i, 100)); + } + + let buffer_len = buffer.len(); + let slice = buffer.as_slice(); + assert_eq!(slice.len(), buffer_len); + for i in 0..256 { + assert_eq!(slice[i], make_iovec(i, 100)); + } + + let slice = buffer.as_mut_slice(); + assert_eq!(slice.len(), buffer_len); + for i in 0..256 { + assert_eq!(slice[i], make_iovec(i, 100)); + } + + buffer.pop_front(256); + assert!(buffer.is_empty()); + assert_eq!(buffer.as_slice(), &mut []); + assert_eq!(buffer.as_mut_slice(), &mut []); + } +} diff --git a/src/vmm/src/devices/virtio/mod.rs b/src/vmm/src/devices/virtio/mod.rs index f68c2a123c9..21da50b932c 100644 --- a/src/vmm/src/devices/virtio/mod.rs +++ b/src/vmm/src/devices/virtio/mod.rs @@ -16,6 +16,7 @@ pub mod balloon; pub mod block; pub mod device; pub mod gen; +pub mod iov_ring_buffer; pub mod iovec; pub mod mmio; pub mod net; diff --git a/src/vmm/src/devices/virtio/net/device.rs b/src/vmm/src/devices/virtio/net/device.rs index feb7488cc05..d62bddda356 100755 --- a/src/vmm/src/devices/virtio/net/device.rs +++ b/src/vmm/src/devices/virtio/net/device.rs @@ -5,22 +5,22 @@ // Use of this source code 
is governed by a BSD-style license that can be // found in the THIRD-PARTY file. -use std::io::Read; use std::mem; use std::net::Ipv4Addr; +use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; use libc::EAGAIN; -use log::{error, warn}; -use vm_memory::GuestMemoryError; +use log::error; use vmm_sys_util::eventfd::EventFd; +use super::rx_buffer::RxBuffer; use crate::devices::virtio::device::{DeviceState, IrqTrigger, IrqType, VirtioDevice}; use crate::devices::virtio::gen::virtio_blk::VIRTIO_F_VERSION_1; use crate::devices::virtio::gen::virtio_net::{ virtio_net_hdr_v1, VIRTIO_NET_F_CSUM, VIRTIO_NET_F_GUEST_CSUM, VIRTIO_NET_F_GUEST_TSO4, VIRTIO_NET_F_GUEST_TSO6, VIRTIO_NET_F_GUEST_UFO, VIRTIO_NET_F_HOST_TSO4, - VIRTIO_NET_F_HOST_TSO6, VIRTIO_NET_F_HOST_UFO, VIRTIO_NET_F_MAC, + VIRTIO_NET_F_HOST_TSO6, VIRTIO_NET_F_HOST_UFO, VIRTIO_NET_F_MAC, VIRTIO_NET_F_MRG_RXBUF, }; use crate::devices::virtio::gen::virtio_ring::VIRTIO_RING_F_EVENT_IDX; use crate::devices::virtio::iovec::IoVecBuffer; @@ -29,7 +29,7 @@ use crate::devices::virtio::net::tap::Tap; use crate::devices::virtio::net::{ gen, NetError, NetQueue, MAX_BUFFER_SIZE, NET_QUEUE_SIZES, RX_INDEX, TX_INDEX, }; -use crate::devices::virtio::queue::{DescriptorChain, Queue}; +use crate::devices::virtio::queue::Queue; use crate::devices::virtio::{ActivateError, TYPE_NET}; use crate::devices::{report_net_event_fail, DeviceError}; use crate::dumbo::pdu::arp::ETH_IPV4_FRAME_LEN; @@ -40,24 +40,10 @@ use crate::mmds::ns::MmdsNetworkStack; use crate::rate_limiter::{BucketUpdate, RateLimiter, TokenType}; use crate::utils::net::mac::MacAddr; use crate::utils::u64_to_usize; -use crate::vstate::memory::{ByteValued, Bytes, GuestMemoryMmap}; +use crate::vstate::memory::{ByteValued, GuestMemoryMmap}; const FRAME_HEADER_MAX_LEN: usize = PAYLOAD_OFFSET + ETH_IPV4_FRAME_LEN; -#[derive(Debug, thiserror::Error, displaydoc::Display)] -enum FrontendError { - /// Add user. - AddUsed, - /// Descriptor chain too mall. - DescriptorChainTooSmall, - /// Empty queue. - EmptyQueue, - /// Guest memory error: {0} - GuestMemory(GuestMemoryError), - /// Read only descriptor. - ReadOnlyDescriptor, -} - pub(crate) const fn vnet_hdr_len() -> usize { mem::size_of::() } @@ -122,9 +108,9 @@ pub struct Net { pub(crate) rx_rate_limiter: RateLimiter, pub(crate) tx_rate_limiter: RateLimiter, - pub(crate) rx_deferred_frame: bool, - - rx_bytes_read: usize, + /// Used to store last RX packet size and + /// rate limit RX queue. + deferred_rx_bytes: Option, rx_frame_buf: [u8; MAX_BUFFER_SIZE], tx_frame_headers: [u8; frame_hdr_len()], @@ -143,6 +129,7 @@ pub struct Net { pub(crate) metrics: Arc, tx_buffer: IoVecBuffer, + pub rx_buffer: RxBuffer, } impl Net { @@ -163,6 +150,7 @@ impl Net { | 1 << VIRTIO_NET_F_HOST_TSO6 | 1 << VIRTIO_NET_F_HOST_UFO | 1 << VIRTIO_F_VERSION_1 + | 1 << VIRTIO_NET_F_MRG_RXBUF | 1 << VIRTIO_RING_F_EVENT_IDX; let mut config_space = ConfigSpace::default(); @@ -189,8 +177,7 @@ impl Net { queue_evts, rx_rate_limiter, tx_rate_limiter, - rx_deferred_frame: false, - rx_bytes_read: 0, + deferred_rx_bytes: None, rx_frame_buf: [0u8; MAX_BUFFER_SIZE], tx_frame_headers: [0u8; frame_hdr_len()], irq_trigger: IrqTrigger::new().map_err(NetError::EventFd)?, @@ -201,6 +188,7 @@ impl Net { mmds_ns: None, metrics: NetMetricsPerDevice::alloc(id), tx_buffer: Default::default(), + rx_buffer: RxBuffer::new()?, }) } @@ -311,126 +299,47 @@ impl Net { // Attempts to copy a single frame into the guest if there is enough // rate limiting budget. // Returns true on successful frame delivery. 
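+    // With the `RxBuffer` based path the frame bytes are already in guest memory after
+    // `readv`; this helper only charges the rate limiter for the deferred packet size
+    // and, on success, publishes the already-used descriptors to the guest.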
- fn rate_limited_rx_single_frame(&mut self) -> bool { - if !Self::rate_limiter_consume_op(&mut self.rx_rate_limiter, self.rx_bytes_read as u64) { - self.metrics.rx_rate_limiter_throttled.inc(); - return false; - } - - // Attempt frame delivery. - let success = self.write_frame_to_guest(); - - // Undo the tokens consumption if guest delivery failed. - if !success { - // revert the rate limiting budget consumption - Self::rate_limiter_replenish_op(&mut self.rx_rate_limiter, self.rx_bytes_read as u64); - } - - success - } - - /// Write a slice in a descriptor chain - /// - /// # Errors - /// - /// Returns an error if the descriptor chain is too short or - /// an inappropriate (read only) descriptor is found in the chain - fn write_to_descriptor_chain( - mem: &GuestMemoryMmap, - data: &[u8], - head: DescriptorChain, - net_metrics: &NetDeviceMetrics, - ) -> Result<(), FrontendError> { - let mut chunk = data; - let mut next_descriptor = Some(head); - - while let Some(descriptor) = &next_descriptor { - if !descriptor.is_write_only() { - return Err(FrontendError::ReadOnlyDescriptor); - } - - let len = std::cmp::min(chunk.len(), descriptor.len as usize); - match mem.write_slice(&chunk[..len], descriptor.addr) { - Ok(()) => { - net_metrics.rx_count.inc(); - chunk = &chunk[len..]; - } - Err(err) => { - error!("Failed to write slice: {:?}", err); - if let GuestMemoryError::PartialBuffer { .. } = err { - net_metrics.rx_partial_writes.inc(); - } - return Err(FrontendError::GuestMemory(err)); + fn send_deferred_rx_bytes(&mut self) -> bool { + match self.deferred_rx_bytes { + Some(bytes) => { + if Self::rate_limiter_consume_op(&mut self.rx_rate_limiter, bytes.get() as u64) { + // The packet is good to go, reset `deferred_rx_bytes`. + self.deferred_rx_bytes = None; + // Attempt frame delivery. + self.rx_buffer.notify_queue(&mut self.queues[RX_INDEX]); + true + } else { + self.metrics.rx_rate_limiter_throttled.inc(); + false } } - - // If chunk is empty we are done here. - if chunk.is_empty() { - let len = data.len() as u64; - net_metrics.rx_bytes_count.add(len); - net_metrics.rx_packets_count.inc(); - return Ok(()); - } - - next_descriptor = descriptor.next_descriptor(); + None => true, } - - warn!("Receiving buffer is too small to hold frame of current size"); - Err(FrontendError::DescriptorChainTooSmall) } - // Copies a single frame from `self.rx_frame_buf` into the guest. - fn do_write_frame_to_guest(&mut self) -> Result<(), FrontendError> { + /// Parse available RX `DescriptorChains` from the queue and + /// put them into `RxBuffer`. + pub fn parse_rx_descriptors(&mut self) { // This is safe since we checked in the event handler that the device is activated. let mem = self.device_state.mem().unwrap(); - let queue = &mut self.queues[RX_INDEX]; - let head_descriptor = queue.pop_or_enable_notification().ok_or_else(|| { - self.metrics.no_rx_avail_buffer.inc(); - FrontendError::EmptyQueue - })?; - let head_index = head_descriptor.index; - - let result = Self::write_to_descriptor_chain( - mem, - &self.rx_frame_buf[..self.rx_bytes_read], - head_descriptor, - &self.metrics, - ); - // Mark the descriptor chain as used. If an error occurred, skip the descriptor chain. - let used_len = if result.is_err() { - self.metrics.rx_fails.inc(); - 0 - } else { - // Safe to unwrap because a frame must be smaller than 2^16 bytes. 
- u32::try_from(self.rx_bytes_read).unwrap() - }; - queue.add_used(head_index, used_len).map_err(|err| { - error!("Failed to add available descriptor {}: {}", head_index, err); - FrontendError::AddUsed - })?; - - result - } - - // Copies a single frame from `self.rx_frame_buf` into the guest. In case of an error retries - // the operation if possible. Returns true if the operation was successfull. - fn write_frame_to_guest(&mut self) -> bool { - let max_iterations = self.queues[RX_INDEX].actual_size(); - for _ in 0..max_iterations { - match self.do_write_frame_to_guest() { - Ok(()) => return true, - Err(FrontendError::EmptyQueue) | Err(FrontendError::AddUsed) => { - return false; - } - Err(_) => { - // retry - continue; + while let Some(head) = queue.pop_or_enable_notification() { + let index = head.index; + // SAFETY: we are only using this `DescriptorChain` here. + if let Err(err) = unsafe { self.rx_buffer.add_chain(mem, head) } { + error!("net: Could not parse an RX descriptor: {err}"); + // Notify queue about ready descriptors. We need to do this + // to bring queue into up to date state. + self.rx_buffer.notify_queue(queue); + // Try to add the bad descriptor to the used ring. + if let Err(err) = queue.add_used(index, 0) { + error!( + "net: Failed to add available RX descriptor {index} while handling a \ + parsing error: {err}", + ); } } } - - false } // Tries to detour the frame to MMDS and if MMDS doesn't accept it, sends it on the host TAP. @@ -509,6 +418,10 @@ impl Net { // We currently prioritize packets from the MMDS over regular network packets. fn read_from_mmds_or_tap(&mut self) -> Result { + if self.rx_buffer.is_empty() { + return Ok(0); + } + if let Some(ns) = self.mmds_ns.as_mut() { if let Some(len) = ns.write_next_frame(frame_bytes_from_buf_mut(&mut self.rx_frame_buf)?) @@ -517,22 +430,56 @@ impl Net { METRICS.mmds.tx_frames.inc(); METRICS.mmds.tx_bytes.add(len as u64); init_vnet_hdr(&mut self.rx_frame_buf); + let bytes = &self.rx_frame_buf[..vnet_hdr_len() + len]; + // If the rx_buffer does not have enough capacity to hold + // MMDS packet, return a error and skip the packet. + self.rx_buffer.write(bytes)?; + // SAFETY: length of the buffer is less than u32::MAX + #[allow(clippy::cast_possible_truncation)] + // SAFETY: If `write` above succeeds, this means there was a valid + // write and we are allowed to call this function. Write must be valid, + // because `bytes` slice is at least `vnet_hdr_len` bytes long. + unsafe { + self.rx_buffer + .finish_packet(bytes.len() as u32, &mut self.queues[RX_INDEX]); + } return Ok(vnet_hdr_len() + len); } } - self.read_tap().map_err(NetError::IO) + let bytes_written = self.read_tap().map_err(NetError::IO)?; + // SAFETY: Max length of the packet we read is less than u32::MAX + #[allow(clippy::cast_possible_truncation)] + // SAFETY: If `read_tap` above succeeds, this means there was a valid + // write. The only way the write can be considered `invalid` if the `rx_buffer` + // is empty, but we check this condition at the beginning of the function. + unsafe { + self.rx_buffer + .finish_packet(bytes_written as u32, &mut self.queues[RX_INDEX]); + } + + Ok(bytes_written) } + /// Read as many frames as possible. fn process_rx(&mut self) -> Result<(), DeviceError> { - // Read as many frames as possible. 
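+        // RX now works in three steps: retry any packet the rate limiter previously
+        // blocked, refill `rx_buffer` from the available ring, then keep reading frames
+        // from MMDS/tap straight into that buffer until it runs out of space or the tap
+        // has nothing left to deliver.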
+ if !self.send_deferred_rx_bytes() { + return Ok(()); + } + + self.parse_rx_descriptors(); + loop { match self.read_from_mmds_or_tap() { + Ok(0) => { + self.metrics.no_rx_avail_buffer.inc(); + break; + } Ok(count) => { - self.rx_bytes_read = count; + self.deferred_rx_bytes = NonZeroUsize::new(count); self.metrics.rx_count.inc(); - if !self.rate_limited_rx_single_frame() { - self.rx_deferred_frame = true; + self.metrics.rx_packets_count.inc(); + if !self.send_deferred_rx_bytes() { break; } } @@ -558,26 +505,6 @@ impl Net { self.try_signal_queue(NetQueue::Rx) } - // Process the deferred frame first, then continue reading from tap. - fn handle_deferred_frame(&mut self) -> Result<(), DeviceError> { - if self.rate_limited_rx_single_frame() { - self.rx_deferred_frame = false; - // process_rx() was interrupted possibly before consuming all - // packets in the tap; try continuing now. - return self.process_rx(); - } - - self.try_signal_queue(NetQueue::Rx) - } - - fn resume_rx(&mut self) -> Result<(), DeviceError> { - if self.rx_deferred_frame { - self.handle_deferred_frame() - } else { - Ok(()) - } - } - fn process_tx(&mut self) -> Result<(), DeviceError> { // This is safe since we checked in the event handler that the device is activated. let mem = self.device_state.mem().unwrap(); @@ -636,7 +563,7 @@ impl Net { &self.metrics, ) .unwrap_or(false); - if frame_consumed_by_mmds && !self.rx_deferred_frame { + if frame_consumed_by_mmds { // MMDS consumed this frame/request, let's also try to process the response. process_rx_for_mmds = true; } @@ -715,14 +642,21 @@ impl Net { self.tx_rate_limiter.update_buckets(tx_bytes, tx_ops); } - fn read_tap(&mut self) -> std::io::Result { - self.tap.read(&mut self.rx_frame_buf) - } - fn write_tap(tap: &mut Tap, buf: &IoVecBuffer) -> std::io::Result { tap.write_iovec(buf) } + fn read_tap(&mut self) -> std::io::Result { + if self.has_feature(u64::from(VIRTIO_NET_F_MRG_RXBUF)) { + let Some(s) = self.rx_buffer.all_chains_mut_slice() else { + return Err(std::io::Error::from_raw_os_error(EAGAIN)); + }; + self.tap.read_iovec(s) + } else { + self.tap.read_iovec(self.rx_buffer.one_chain_mut_slice()) + } + } + /// Process a single RX queue event. /// /// This is called by the event manager responding to the guest adding a new @@ -734,11 +668,16 @@ impl Net { // rate limiters present but with _very high_ allowed rate error!("Failed to get rx queue event: {:?}", err); self.metrics.event_fails.inc(); - } else if self.rx_rate_limiter.is_blocked() { + return; + } else { + self.parse_rx_descriptors(); + } + + if self.rx_rate_limiter.is_blocked() { self.metrics.rx_rate_limiter_throttled.inc(); } else { // If the limiter is not blocked, resume the receiving of bytes. - self.resume_rx() + self.process_rx() .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err)); } } @@ -747,31 +686,14 @@ impl Net { // This is safe since we checked in the event handler that the device is activated. self.metrics.rx_tap_event_count.inc(); - // While there are no available RX queue buffers and there's a deferred_frame - // don't process any more incoming. Otherwise start processing a frame. In the - // process the deferred_frame flag will be set in order to avoid freezing the - // RX queue. - if self.queues[RX_INDEX].is_empty() && self.rx_deferred_frame { - self.metrics.no_rx_avail_buffer.inc(); - return; - } - // While limiter is blocked, don't process any more incoming. 
if self.rx_rate_limiter.is_blocked() { self.metrics.rx_rate_limiter_throttled.inc(); return; } - if self.rx_deferred_frame - // Process a deferred frame first if available. Don't read from tap again - // until we manage to receive this deferred frame. - { - self.handle_deferred_frame() - .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err)); - } else { - self.process_rx() - .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err)); - } + self.process_rx() + .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err)); } /// Process a single TX queue event. @@ -801,7 +723,7 @@ impl Net { match self.rx_rate_limiter.event_handler() { Ok(_) => { // There might be enough budget now to receive the frame. - self.resume_rx() + self.process_rx() .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err)); } Err(err) => { @@ -830,7 +752,7 @@ impl Net { /// Process device virtio queue(s). pub fn process_virtio_queues(&mut self) { - let _ = self.resume_rx(); + let _ = self.process_rx(); let _ = self.process_tx(); } } @@ -935,6 +857,8 @@ pub mod tests { use std::time::Duration; use std::{mem, thread}; + use libc::iovec; + use super::*; use crate::check_metric_after_block; use crate::devices::virtio::gen::virtio_ring::VIRTIO_RING_F_EVENT_IDX; @@ -942,6 +866,7 @@ pub mod tests { use crate::devices::virtio::net::device::{ frame_bytes_from_buf, frame_bytes_from_buf_mut, frame_hdr_len, init_vnet_hdr, vnet_hdr_len, }; + use crate::devices::virtio::net::rx_buffer::{header_set_num_buffers, ChainInfo}; use crate::devices::virtio::net::test_utils::test::TestHelper; use crate::devices::virtio::net::test_utils::{ default_net, if_index, inject_tap_tx_frame, set_mac, NetEvent, NetQueue, @@ -1008,6 +933,7 @@ pub mod tests { | 1 << VIRTIO_NET_F_HOST_TSO6 | 1 << VIRTIO_NET_F_HOST_UFO | 1 << VIRTIO_F_VERSION_1 + | 1 << VIRTIO_NET_F_MRG_RXBUF | 1 << VIRTIO_RING_F_EVENT_IDX; assert_eq!( @@ -1129,10 +1055,9 @@ pub mod tests { assert_eq!(th.rxq.used.idx.get(), 0); } - #[test] - fn test_rx_read_only_descriptor() { - let mem = single_region_mem(2 * MAX_BUFFER_SIZE); - let mut th = TestHelper::get_default(&mem); + fn rx_read_only_descriptor(mut th: TestHelper) { + // let mem = single_region_mem(2 * MAX_BUFFER_SIZE); + // let mut th = TestHelper::get_default(&mem); th.activate_net(); th.add_desc_chain( @@ -1144,29 +1069,29 @@ pub mod tests { (2, 1000, VIRTQ_DESC_F_WRITE), ], ); - let frame = th.check_rx_deferred_frame(1000); + let frame = th.check_rx_discard_frame(1000); th.rxq.check_used_elem(0, 0, 0); th.check_rx_queue_resume(&frame); } #[test] - fn test_rx_short_writable_descriptor() { + fn test_rx_read_only_descriptor() { let mem = single_region_mem(2 * MAX_BUFFER_SIZE); - let mut th = TestHelper::get_default(&mem); - th.activate_net(); - - th.add_desc_chain(NetQueue::Rx, 0, &[(0, 100, VIRTQ_DESC_F_WRITE)]); - let frame = th.check_rx_deferred_frame(1000); - th.rxq.check_used_elem(0, 0, 0); - - th.check_rx_queue_resume(&frame); + let th = TestHelper::get_default(&mem); + rx_read_only_descriptor(th); } #[test] - fn test_rx_partial_write() { + fn test_rx_read_only_descriptor_mrg_buf() { let mem = single_region_mem(2 * MAX_BUFFER_SIZE); let mut th = TestHelper::get_default(&mem); + // VIRTIO_NET_F_MRG_RXBUF is not enabled by default + th.net().acked_features = 1 << VIRTIO_NET_F_MRG_RXBUF; + rx_read_only_descriptor(th); + } + + fn rx_partial_write(mut th: TestHelper) { th.activate_net(); // The descriptor chain is created so that the last descriptor doesn't fit in the @@ -1176,23 +1101,40 @@ pub mod 
tests { NetQueue::Rx, offset, &[ - (0, 100, VIRTQ_DESC_F_WRITE), + (0, 110, VIRTQ_DESC_F_WRITE), (1, 50, VIRTQ_DESC_F_WRITE), (2, 4096, VIRTQ_DESC_F_WRITE), ], ); - let frame = th.check_rx_deferred_frame(1000); + let frame = th.check_rx_discard_frame(1000); th.rxq.check_used_elem(0, 0, 0); th.check_rx_queue_resume(&frame); } #[test] - fn test_rx_retry() { + fn test_rx_partial_write() { + let mem = single_region_mem(2 * MAX_BUFFER_SIZE); + let th = TestHelper::get_default(&mem); + rx_partial_write(th); + } + + #[test] + fn test_rx_partial_write_mrg_buf() { let mem = single_region_mem(2 * MAX_BUFFER_SIZE); let mut th = TestHelper::get_default(&mem); + // VIRTIO_NET_F_MRG_RXBUF is not enabled by default + th.net().acked_features = 1 << VIRTIO_NET_F_MRG_RXBUF; + rx_partial_write(th); + } + + fn rx_retry(mut th: TestHelper) { th.activate_net(); + // Even though too short descriptor chains are also + // not valid, we cannot test them as from `readv` perspective + // they are totaly fine. + // Add invalid descriptor chain - read only descriptor. th.add_desc_chain( NetQueue::Rx, @@ -1203,17 +1145,15 @@ pub mod tests { (2, 1000, VIRTQ_DESC_F_WRITE), ], ); - // Add invalid descriptor chain - too short. - th.add_desc_chain(NetQueue::Rx, 1200, &[(3, 100, VIRTQ_DESC_F_WRITE)]); // Add invalid descriptor chain - invalid memory offset. th.add_desc_chain( NetQueue::Rx, th.mem.last_addr().raw_value(), - &[(4, 1000, VIRTQ_DESC_F_WRITE)], + &[(3, 1000, VIRTQ_DESC_F_WRITE)], ); // Add valid descriptor chain. - th.add_desc_chain(NetQueue::Rx, 1300, &[(5, 1000, VIRTQ_DESC_F_WRITE)]); + th.add_desc_chain(NetQueue::Rx, 1300, &[(4, 1000, VIRTQ_DESC_F_WRITE)]); // Inject frame to tap and run epoll. let frame = inject_tap_tx_frame(&th.net(), 1000); @@ -1224,24 +1164,36 @@ pub mod tests { ); // Check that the used queue has advanced. - assert_eq!(th.rxq.used.idx.get(), 4); + assert_eq!(th.rxq.used.idx.get(), 3); assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring)); // Check that the invalid descriptor chains have been discarded th.rxq.check_used_elem(0, 0, 0); th.rxq.check_used_elem(1, 3, 0); - th.rxq.check_used_elem(2, 4, 0); // Check that the frame wasn't deferred. - assert!(!th.net().rx_deferred_frame); + assert!(th.net().deferred_rx_bytes.is_none()); // Check that the frame has been written successfully to the valid Rx descriptor chain. th.rxq - .check_used_elem(3, 5, frame.len().try_into().unwrap()); - th.rxq.dtable[5].check_data(&frame); + .check_used_elem(2, 4, frame.len().try_into().unwrap()); + th.rxq.dtable[4].check_data(&frame); } #[test] - fn test_rx_complex_desc_chain() { + fn test_rx_retry() { + let mem = single_region_mem(2 * MAX_BUFFER_SIZE); + let th = TestHelper::get_default(&mem); + rx_retry(th); + } + + #[test] + fn test_rx_retry_mrg_buf() { let mem = single_region_mem(2 * MAX_BUFFER_SIZE); let mut th = TestHelper::get_default(&mem); + // VIRTIO_NET_F_MRG_RXBUF is not enabled by default + th.net().acked_features = 1 << VIRTIO_NET_F_MRG_RXBUF; + rx_retry(th); + } + + fn rx_complex_desc_chain(mut th: TestHelper) { th.activate_net(); // Create a valid Rx avail descriptor chain with multiple descriptors. @@ -1265,7 +1217,7 @@ pub mod tests { ); // Check that the frame wasn't deferred. - assert!(!th.net().rx_deferred_frame); + assert!(th.net().deferred_rx_bytes.is_none()); // Check that the used queue has advanced. 
assert_eq!(th.rxq.used.idx.get(), 1); assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring)); @@ -1278,9 +1230,22 @@ pub mod tests { } #[test] - fn test_rx_multiple_frames() { + fn test_rx_complex_desc_chain() { + let mem = single_region_mem(2 * MAX_BUFFER_SIZE); + let th = TestHelper::get_default(&mem); + rx_complex_desc_chain(th); + } + + #[test] + fn test_rx_complex_desc_chain_mrg_buf() { let mem = single_region_mem(2 * MAX_BUFFER_SIZE); let mut th = TestHelper::get_default(&mem); + // VIRTIO_NET_F_MRG_RXBUF is not enabled by default + th.net().acked_features = 1 << VIRTIO_NET_F_MRG_RXBUF; + rx_complex_desc_chain(th); + } + + fn rx_multiple_frames(mut th: TestHelper) { th.activate_net(); // Create 2 valid Rx avail descriptor chains. Each one has enough space to fit the @@ -1305,7 +1270,7 @@ pub mod tests { ); // Check that the frames weren't deferred. - assert!(!th.net().rx_deferred_frame); + assert!(th.net().deferred_rx_bytes.is_none()); // Check that the used queue has advanced. assert_eq!(th.rxq.used.idx.get(), 2); assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring)); @@ -1321,6 +1286,70 @@ pub mod tests { th.rxq.dtable[3].check_data(&[0; 500]); } + #[test] + fn test_rx_multiple_frames() { + let mem = single_region_mem(2 * MAX_BUFFER_SIZE); + let th = TestHelper::get_default(&mem); + rx_multiple_frames(th); + } + + #[test] + fn test_rx_multiple_frames_mrg_buf() { + let mem = single_region_mem(2 * MAX_BUFFER_SIZE); + let mut th = TestHelper::get_default(&mem); + // VIRTIO_NET_F_MRG_RXBUF is not enabled by default + th.net().acked_features = 1 << VIRTIO_NET_F_MRG_RXBUF; + rx_multiple_frames(th); + } + + #[test] + fn test_rx_multiple_frames_mrg() { + let mem = single_region_mem(2 * MAX_BUFFER_SIZE); + let mut th = TestHelper::get_default(&mem); + + // VIRTIO_NET_F_MRG_RXBUF is not enabled by default + th.net().acked_features = 1 << VIRTIO_NET_F_MRG_RXBUF; + + th.activate_net(); + + // Create 2 valid avail descriptor chains. We will send + // one packet that shuld be split amound these 2 chains. + th.add_desc_chain( + NetQueue::Rx, + 0, + &[(0, 100, VIRTQ_DESC_F_WRITE), (1, 100, VIRTQ_DESC_F_WRITE)], + ); + th.add_desc_chain( + NetQueue::Rx, + 1000, + &[(2, 100, VIRTQ_DESC_F_WRITE), (3, 100, VIRTQ_DESC_F_WRITE)], + ); + // Inject frame into tap and run epoll. + let mut frame = inject_tap_tx_frame(&th.net(), 400); + // SAFETY: frame is big enough and has correct alingment. + unsafe { + header_set_num_buffers(frame.as_mut_ptr().cast(), 2); + } + check_metric_after_block!( + th.net().metrics.rx_packets_count, + 1, + th.event_manager.run_with_timeout(100).unwrap() + ); + + // Check that the frame wasn't deferred. + assert!(th.net().deferred_rx_bytes.is_none()); + // Check that the used queue has advanced. + assert_eq!(th.rxq.used.idx.get(), 2); + assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring)); + // Check that the frame was written successfully into both descriptor chains. + th.rxq.check_used_elem(0, 0, 200); + th.rxq.check_used_elem(1, 2, 200); + th.rxq.dtable[0].check_data(&frame[0..100]); + th.rxq.dtable[1].check_data(&frame[100..200]); + th.rxq.dtable[2].check_data(&frame[200..300]); + th.rxq.dtable[3].check_data(&frame[300..400]); + } + #[test] fn test_tx_missing_queue_signal() { let mem = single_region_mem(2 * MAX_BUFFER_SIZE); @@ -1605,6 +1634,19 @@ pub mod tests { fn test_mmds_detour_and_injection() { let mut net = default_net(); + // Fake available rx buffer. 
Otherwise we will not read from MMDS + // because rx_buffer.is_empty() check is before the read. + let mut fake_buffer = vec![0; 128]; + net.rx_buffer.iovecs.push_back(iovec { + iov_base: fake_buffer.as_mut_ptr().cast(), + iov_len: fake_buffer.len(), + }); + net.rx_buffer.chain_infos.push_back(ChainInfo { + head_index: 0, + chain_len: 1, + chain_capacity: 128, + }); + let src_mac = MacAddr::from_str("11:11:11:11:11:11").unwrap(); let src_ip = Ipv4Addr::new(10, 1, 2, 3); let dst_mac = MacAddr::from_str("22:22:22:22:22:22").unwrap(); @@ -1721,79 +1763,13 @@ pub mod tests { // SAFETY: its a valid fd unsafe { libc::close(th.net.lock().unwrap().tap.as_raw_fd()) }; - // The RX queue is empty and rx_deffered_frame is set. - th.net().rx_deferred_frame = true; - check_metric_after_block!( - th.net().metrics.no_rx_avail_buffer, - 1, - th.simulate_event(NetEvent::Tap) - ); - - // We need to set this here to false, otherwise the device will try to - // handle a deferred frame, it will fail and will never try to read from - // the tap. - th.net().rx_deferred_frame = false; - - // Fake an avail buffer; this time, tap reading should error out. - th.rxq.avail.idx.set(1); - check_metric_after_block!( - th.net().metrics.tap_read_fails, - 1, - th.simulate_event(NetEvent::Tap) - ); - } - - #[test] - fn test_deferred_frame() { - let mem = single_region_mem(2 * MAX_BUFFER_SIZE); - let mut th = TestHelper::get_default(&mem); - th.activate_net(); - - let rx_packets_count = th.net().metrics.rx_packets_count.count(); - let _ = inject_tap_tx_frame(&th.net(), 1000); - // Trigger a Tap event that. This should fail since there - // are not any available descriptors in the queue - check_metric_after_block!( - th.net().metrics.no_rx_avail_buffer, - 1, - th.simulate_event(NetEvent::Tap) - ); - // The frame we read from the tap should be deferred now and - // no frames should have been transmitted - assert!(th.net().rx_deferred_frame); - assert_eq!(th.net().metrics.rx_packets_count.count(), rx_packets_count); - - // Let's add a second frame, which should really have the same - // fate. - let _ = inject_tap_tx_frame(&th.net(), 1000); - - // Adding a descriptor in the queue. This should handle the first deferred - // frame. However, this should try to handle the second tap as well and fail - // since there's only one Descriptor Chain in the queue. + // Fake an avail buffer; tap reading should error out. th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]); check_metric_after_block!( - th.net().metrics.no_rx_avail_buffer, + th.net().metrics.tap_read_fails, 1, th.simulate_event(NetEvent::Tap) ); - // We should still have a deferred frame - assert!(th.net().rx_deferred_frame); - // However, we should have delivered the first frame - assert_eq!( - th.net().metrics.rx_packets_count.count(), - rx_packets_count + 1 - ); - - // Let's add one more descriptor and try to handle the last frame as well. 
- th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]); - check_metric_after_block!( - th.net().metrics.rx_packets_count, - 1, - th.simulate_event(NetEvent::RxQueue) - ); - - // We should be done with any deferred frame - assert!(!th.net().rx_deferred_frame); } #[test] @@ -1908,7 +1884,7 @@ pub mod tests { let mut rl = RateLimiter::new(1000, 0, 500, 0, 0, 0).unwrap(); // set up RX - assert!(!th.net().rx_deferred_frame); + assert!(th.net().deferred_rx_bytes.is_none()); th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]); let frame = inject_tap_tx_frame(&th.net(), 1000); @@ -1928,7 +1904,7 @@ pub mod tests { // assert that limiter is blocked assert!(th.net().rx_rate_limiter.is_blocked()); assert_eq!(th.net().metrics.rx_rate_limiter_throttled.count(), 1); - assert!(th.net().rx_deferred_frame); + assert!(th.net().deferred_rx_bytes.is_some()); // assert that no operation actually completed (limiter blocked it) assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring)); // make sure the data is still queued for processing @@ -2026,7 +2002,7 @@ pub mod tests { let mut rl = RateLimiter::new(0, 0, 0, 1, 0, 500).unwrap(); // set up RX - assert!(!th.net().rx_deferred_frame); + assert!(th.net().deferred_rx_bytes.is_none()); th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]); let frame = inject_tap_tx_frame(&th.net(), 1234); @@ -2048,7 +2024,7 @@ pub mod tests { // assert that limiter is blocked assert!(th.net().rx_rate_limiter.is_blocked()); assert!(th.net().metrics.rx_rate_limiter_throttled.count() >= 1); - assert!(th.net().rx_deferred_frame); + assert!(th.net().deferred_rx_bytes.is_some()); // assert that no operation actually completed (limiter blocked it) assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring)); // make sure the data is still queued for processing diff --git a/src/vmm/src/devices/virtio/net/mod.rs b/src/vmm/src/devices/virtio/net/mod.rs index 1a7972595ad..8c433dbab54 100644 --- a/src/vmm/src/devices/virtio/net/mod.rs +++ b/src/vmm/src/devices/virtio/net/mod.rs @@ -21,14 +21,17 @@ pub mod device; mod event_handler; pub mod metrics; pub mod persist; +pub mod rx_buffer; mod tap; pub mod test_utils; mod gen; pub use tap::{Tap, TapError}; +use vm_memory::VolatileMemoryError; pub use self::device::Net; +use self::rx_buffer::RxBufferError; /// Enum representing the Net device queue types #[derive(Debug)] @@ -50,6 +53,10 @@ pub enum NetError { EventFd(io::Error), /// IO error: {0} IO(io::Error), + /// Error writing in guest memory: {0} + GuestMemoryError(#[from] VolatileMemoryError), /// The VNET header is missing from the frame VnetHeaderMissing, + /// RxBuffer error: {0} + RxBufferError(#[from] RxBufferError), } diff --git a/src/vmm/src/devices/virtio/net/persist.rs b/src/vmm/src/devices/virtio/net/persist.rs index 54ff724ce51..502ce51edca 100644 --- a/src/vmm/src/devices/virtio/net/persist.rs +++ b/src/vmm/src/devices/virtio/net/persist.rs @@ -10,7 +10,8 @@ use std::sync::{Arc, Mutex}; use serde::{Deserialize, Serialize}; use super::device::Net; -use super::NET_NUM_QUEUES; +use super::rx_buffer::RxBufferState; +use super::{NET_NUM_QUEUES, RX_INDEX}; use crate::devices::virtio::device::DeviceState; use crate::devices::virtio::persist::{PersistError as VirtioStateError, VirtioDeviceState}; use crate::devices::virtio::queue::FIRECRACKER_MAX_QUEUE_SIZE; @@ -43,6 +44,7 @@ pub struct NetState { pub mmds_ns: Option, config_space: NetConfigSpaceState, virtio_state: VirtioDeviceState, + rx_buffer_state: RxBufferState, } /// Auxiliary 
structure for creating a device when resuming from a snapshot. @@ -83,6 +85,7 @@ impl Persist<'_> for Net { guest_mac: self.guest_mac, }, virtio_state: VirtioDeviceState::from_device(self), + rx_buffer_state: RxBufferState::from_rx_buffer(&self.rx_buffer), } } @@ -130,6 +133,13 @@ impl Persist<'_> for Net { if state.virtio_state.activated { net.device_state = DeviceState::Activated(constructor_args.mem); + + // Recreate `rx_buffer`. We do it by temporarily + // rolling back `next_avail` in the RX queue. The `next_avail` + // will be rolled forward in the `parse_rx_descriptors` method. + net.queues[RX_INDEX].next_avail -= state.rx_buffer_state.chains_count; + net.parse_rx_descriptors(); + net.rx_buffer.used_descriptors = state.rx_buffer_state.used_descriptors; } Ok(net) diff --git a/src/vmm/src/devices/virtio/net/rx_buffer.rs b/src/vmm/src/devices/virtio/net/rx_buffer.rs new file mode 100644 index 00000000000..6c907c3c5f9 --- /dev/null +++ b/src/vmm/src/devices/virtio/net/rx_buffer.rs @@ -0,0 +1,538 @@ +// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use std::mem::offset_of; +use std::num::Wrapping; + +use libc::{c_void, iovec, size_t}; +use serde::{Deserialize, Serialize}; +use vm_memory::bitmap::Bitmap; +use vm_memory::{GuestMemory, GuestMemoryError}; + +use crate::devices::virtio::gen::virtio_net::virtio_net_hdr_v1; +use crate::devices::virtio::iov_ring_buffer::{IovRingBuffer, IovRingBufferError}; +use crate::devices::virtio::net::device::vnet_hdr_len; +use crate::devices::virtio::queue::{DescriptorChain, Queue, FIRECRACKER_MAX_QUEUE_SIZE}; +use crate::logger::error; +use crate::utils::ring_buffer::RingBuffer; +use crate::vstate::memory::GuestMemoryMmap; + +/// Writes number of buffers to the [`num_buffers`] field of a virtio_net_hdr_v1 struct +/// pointed by the [`ptr`]. +/// +/// # Safety +/// Memory area needs to be big enoug for virtio_net_hdr_v1 to fit. +pub unsafe fn header_set_num_buffers(ptr: *mut virtio_net_hdr_v1, num_buffers: u16) { + debug_assert!( + ptr.is_aligned(), + "Pointer should have at least 0x2 aligment" + ); + + let ptr: *mut u8 = ptr.cast(); + let ptr = ptr.add(offset_of!(virtio_net_hdr_v1, num_buffers)); + let bytes = num_buffers.to_le_bytes(); + let ptr: *mut [u8; 2] = ptr.cast(); + ptr.write_volatile(bytes); +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RxBufferState { + pub chains_count: u16, + pub used_descriptors: u16, +} + +impl RxBufferState { + pub fn from_rx_buffer(buffer: &RxBuffer) -> Self { + // The maximum number of chains is the maximum size of the queue + // which is FIRECRACKER_MAX_QUEUE_SIZE (256). + Self { + chains_count: buffer.chain_infos.len().try_into().unwrap(), + used_descriptors: buffer.used_descriptors, + } + } +} + +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct ChainInfo { + pub head_index: u16, + pub chain_len: u16, + pub chain_capacity: u32, +} + +#[derive(Debug, thiserror::Error, displaydoc::Display)] +pub enum RxBufferError { + /// Cannot create `RxBuffer` due to error for `IovRingBuffer`: {0} + New(#[from] IovRingBufferError), + /// Guest memory error: {0} + GuestMemory(#[from] GuestMemoryError), + /// Tried to add a read-only descriptor chain to the `RxBuffer` + ReadOnlyDescriptor, + /// Tried to add too long descriptor chain. + ChainOverflow, + /// Tried to write more bytes than `RxBuffer` can hold. 
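+    ///
+    /// Only the MMDS path can hit this: tap reads are bounded by the iovec slice passed
+    /// to `readv`, while MMDS frames are copied in with [`RxBuffer::write`].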
+ WriteOverflow, +} + +/// A map of all the memory the guest has provided us with for performing RX +#[derive(Debug)] +pub struct RxBuffer { + // An ring covering all the memory we have available for receiving network + // frames. + pub iovecs: IovRingBuffer, + // Ring buffer of meta data about descriptor chains stored in the `iov_ring`. + pub chain_infos: RingBuffer, + // Total capacity this buffer holds. + pub total_capacity: u32, + // Number of descriptor chains we have used to process packets. + pub used_descriptors: u16, +} + +impl RxBuffer { + /// Create a new [`RxBuffers`] object for storing guest memory for performing RX + pub fn new() -> Result { + Ok(Self { + iovecs: IovRingBuffer::new()?, + chain_infos: RingBuffer::new_with_size(u32::from(FIRECRACKER_MAX_QUEUE_SIZE)), + used_descriptors: 0, + total_capacity: 0, + }) + } + + /// Is number of iovecs is zero. + pub fn is_empty(&self) -> bool { + self.iovecs.is_empty() + } + + /// Returns a slice of underlying iovec for the first chain + /// in the buffer. + pub fn one_chain_mut_slice(&mut self) -> &mut [iovec] { + if let Some(chain_info) = self.chain_infos.first() { + let chain_len = usize::from(chain_info.chain_len); + &mut self.iovecs.as_mut_slice()[0..chain_len] + } else { + &mut [] + } + } + + /// Returns a slice of underlying iovec for the all chains + /// in the buffer. + pub fn all_chains_mut_slice(&mut self) -> Option<&mut [iovec]> { + if self.total_capacity < u32::from(u16::MAX) { + None + } else { + Some(self.iovecs.as_mut_slice()) + } + } + + /// Add a new `DescriptorChain` that we received from the RX queue in the buffer. + /// + /// # Safety + /// The `DescriptorChain` cannot be referencing the same memory location as any other + /// `DescriptorChain`. + pub unsafe fn add_chain( + &mut self, + mem: &GuestMemoryMmap, + head: DescriptorChain, + ) -> Result<(), RxBufferError> { + let head_index = head.index; + + let mut next_descriptor = Some(head); + let mut chain_len: u16 = 0; + let mut chain_capacity: u32 = 0; + while let Some(desc) = next_descriptor { + if !desc.is_write_only() { + self.iovecs.pop_back(usize::from(chain_len)); + return Err(RxBufferError::ReadOnlyDescriptor); + } + + // We use get_slice instead of `get_host_address` here in order to have the whole + // range of the descriptor chain checked, i.e. [addr, addr + len) is a valid memory + // region in the GuestMemoryMmap. + let slice = match mem.get_slice(desc.addr, desc.len as usize) { + Ok(slice) => slice, + Err(e) => { + self.iovecs.pop_back(usize::from(chain_len)); + return Err(RxBufferError::GuestMemory(e)); + } + }; + + // We need to mark the area of guest memory that will be mutated through this + // IoVecBufferMut as dirty ahead of time, as we loose access to all + // vm-memory related information after converting down to iovecs. + slice.bitmap().mark_dirty(0, desc.len as usize); + + let iov_base = slice.ptr_guard_mut().as_ptr().cast::(); + self.iovecs.push_back(iovec { + iov_base, + iov_len: desc.len as size_t, + }); + chain_capacity = chain_capacity + .checked_add(desc.len) + .ok_or(RxBufferError::ChainOverflow)?; + chain_len += 1; + + next_descriptor = desc.next_descriptor(); + } + self.chain_infos.push_back(ChainInfo { + head_index, + chain_len, + chain_capacity, + }); + self.total_capacity += chain_capacity; + + Ok(()) + } + + /// Writes bytes from a slice into buffer. 
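+    ///
+    /// The data is spread over the queued iovecs in order, ignoring chain boundaries,
+    /// and [`RxBufferError::WriteOverflow`] is returned if it does not fit. Callers are
+    /// expected to follow up with `finish_packet` so the guest learns how many chains
+    /// the packet used.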
+ pub fn write(&mut self, mut bytes: &[u8]) -> Result<(), RxBufferError> { + for iov in self.iovecs.as_mut_slice() { + if bytes.is_empty() { + break; + } + let iov_slice_len = bytes.len().min(iov.iov_len); + // SAFETY: The user space pointer and the length were verified during + // the iovec creation. + unsafe { + std::ptr::copy_nonoverlapping(bytes.as_ptr(), iov.iov_base.cast(), iov_slice_len) + }; + bytes = &bytes[iov_slice_len..]; + } + if !bytes.is_empty() { + Err(RxBufferError::WriteOverflow) + } else { + Ok(()) + } + } + + /// Finish packet processing by removing used iovecs from the buffer and + /// writing information about used descriptor chains into the queue. + /// + /// # Safety + /// `RxBuffer` should not be empty when this method is called because it + /// assumes there is at least one chain holding data. + pub unsafe fn finish_packet(&mut self, mut bytes_written: u32, rx_queue: &mut Queue) { + // This function is called only after some bytes were written to the + // buffer. This means the iov_ring cannot be empty. + debug_assert!(!self.iovecs.is_empty()); + let header_ptr: *mut virtio_net_hdr_v1 = self.iovecs.as_slice()[0].iov_base.cast(); + let header_buff_len = self.iovecs.as_slice()[0].iov_len; + assert!( + vnet_hdr_len() <= header_buff_len, + "Network buffer should be big enough for virtio_net_hdr_v1 object" + ); + + let mut used_heads = 0; + let mut write_used = |head_index: u16, bytes_written: u32, rx_queue: &mut Queue| { + if let Err(err) = rx_queue.write_used_element( + (rx_queue.next_used + Wrapping(self.used_descriptors)).0, + head_index, + bytes_written, + ) { + error!( + "net: Failed to add used descriptor {} of length {} to RX queue: {err}", + head_index, bytes_written + ); + } + used_heads += 1; + self.used_descriptors += 1; + }; + + loop { + let iov_info = self + .chain_infos + .pop_front() + .expect("This should never happen if write to the buffer succeded."); + self.iovecs.pop_front(usize::from(iov_info.chain_len)); + self.total_capacity -= iov_info.chain_capacity; + + if bytes_written <= iov_info.chain_capacity { + write_used(iov_info.head_index, bytes_written, rx_queue); + break; + } else { + write_used(iov_info.head_index, iov_info.chain_capacity, rx_queue); + bytes_written -= iov_info.chain_capacity; + } + } + + // SAFETY: The user space pointer was verified at the point of creation and + // we verified the alignment and header buffer size. + unsafe { + header_set_num_buffers(header_ptr, used_heads); + } + } + + /// Notify queue about all descriptor chains we used to process packets so far. + pub fn notify_queue(&mut self, rx_queue: &mut Queue) { + rx_queue.advance_used_ring(self.used_descriptors); + self.used_descriptors = 0; + } +} + +#[cfg(test)] +// TODO why are we going through this? why clippy hates everything? 
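+// The allows below cover the index casts and the `for i in 0..n` indexing style
+// used throughout these tests.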
+#[allow(clippy::cast_possible_wrap)] +#[allow(clippy::needless_range_loop)] +#[allow(clippy::cast_possible_truncation)] +mod tests { + use vm_memory::GuestAddress; + + use super::*; + use crate::devices::virtio::test_utils::{ + set_dtable_many_chains, set_dtable_one_chain, VirtQueue, + }; + use crate::test_utils::single_region_mem; + + #[test] + fn test_rx_buffer_new() { + let mut buff = RxBuffer::new().unwrap(); + assert!(buff.is_empty()); + assert_eq!(buff.one_chain_mut_slice(), &mut []); + } + + #[test] + fn test_rx_buffer_add_chain() { + let mem = single_region_mem(65562 * 2); + let rxq = VirtQueue::new(GuestAddress(0), &mem, 256); + let mut queue = rxq.create_queue(); + + // Single chain with len of 16 + { + let chain_len = 16; + set_dtable_one_chain(&rxq, chain_len); + let desc = queue.pop().unwrap(); + + let mut buff = RxBuffer::new().unwrap(); + // SAFETY: safe it is a test memory + unsafe { + buff.add_chain(&mem, desc).unwrap(); + } + let slice = buff.one_chain_mut_slice(); + for i in 0..chain_len { + assert_eq!( + slice[i].iov_base as u64, + mem.get_host_address(GuestAddress((2048 + 1024 * i) as u64)) + .unwrap() as u64 + ); + assert_eq!(slice[i].iov_len, 1024); + } + assert_eq!(buff.chain_infos.len(), 1); + assert_eq!( + buff.chain_infos.items[0], + ChainInfo { + head_index: 0, + chain_len: 16, + chain_capacity: 16 * 1024, + } + ); + } + + // 64 chains of len 1 and size 1024 + // in total 64K + { + let chains = 64; + set_dtable_many_chains(&rxq, chains); + queue.next_avail = Wrapping(0); + let mut buff = RxBuffer::new().unwrap(); + while let Some(desc) = queue.pop() { + // SAFETY: safe it is a test memory + unsafe { + buff.add_chain(&mem, desc).unwrap(); + } + } + let slice = buff.all_chains_mut_slice().unwrap(); + for i in 0..chains { + assert_eq!( + slice[i].iov_base as u64, + mem.get_host_address(GuestAddress((2048 + 1024 * i) as u64)) + .unwrap() as u64 + ); + assert_eq!(slice[i].iov_len, 1024); + } + assert_eq!(buff.chain_infos.len(), chains as u32); + for (i, ci) in buff.chain_infos.items[0..16].iter().enumerate() { + assert_eq!( + *ci, + ChainInfo { + head_index: i as u16, + chain_len: 1, + chain_capacity: 1024, + } + ); + } + } + } + + #[test] + #[should_panic] + fn test_rx_buffer_write_panic() { + let mem = single_region_mem(65562); + let rxq = VirtQueue::new(GuestAddress(0), &mem, 256); + let mut queue = rxq.create_queue(); + + set_dtable_one_chain(&rxq, 1); + let desc = queue.pop().unwrap(); + + let mut buff = RxBuffer::new().unwrap(); + // SAFETY: safe it is a test memory + unsafe { + buff.add_chain(&mem, desc).unwrap(); + } + + // Write should panic, because we unwrap on error + // because we try to write more than buffer can hold. 
+        buff.write(&[69; 2 * 1024]).unwrap();
+    }
+
+    #[test]
+    fn test_rx_buffer_write_no_mrg_buf() {
+        let mem = single_region_mem(65562);
+        let rxq = VirtQueue::new(GuestAddress(0), &mem, 256);
+        let mut queue = rxq.create_queue();
+
+        set_dtable_one_chain(&rxq, 1);
+        let desc = queue.pop().unwrap();
+
+        let mut buff = RxBuffer::new().unwrap();
+        // SAFETY: safe because this is test memory
+        unsafe {
+            buff.add_chain(&mem, desc).unwrap();
+        }
+
+        // Initially the data should be all zeros
+        let slice = buff.one_chain_mut_slice();
+        let data_slice_before: &[u8] =
+            // SAFETY: safe as iovecs are verified on creation
+            unsafe { std::slice::from_raw_parts(slice[0].iov_base.cast(), slice[0].iov_len) };
+        assert_eq!(data_slice_before, &[0; 1024]);
+
+        // The write should go to the first iovec (as there is only one)
+        buff.write(&[69; 1024]).unwrap();
+
+        let slice = buff.one_chain_mut_slice();
+        let data_slice_after: &[u8] =
+            // SAFETY: safe as iovecs are verified on creation
+            unsafe { std::slice::from_raw_parts(slice[0].iov_base.cast(), slice[0].iov_len) };
+        assert_eq!(data_slice_after, &[69; 1024]);
+    }
+
+    #[test]
+    fn test_rx_buffer_write_mrg_buf() {
+        let mem = single_region_mem(65562 * 2);
+        let rxq = VirtQueue::new(GuestAddress(0), &mem, 256);
+        let mut queue = rxq.create_queue();
+
+        set_dtable_many_chains(&rxq, 64);
+
+        let mut buff = RxBuffer::new().unwrap();
+        while let Some(desc) = queue.pop() {
+            // SAFETY: safe because this is test memory
+            unsafe {
+                buff.add_chain(&mem, desc).unwrap();
+            }
+        }
+
+        // Initially the data should be all zeros
+        let slice = buff.all_chains_mut_slice().unwrap();
+        let data_slice_before: &[u8] =
+            // SAFETY: safe as iovecs are verified on creation.
+            unsafe { std::slice::from_raw_parts(slice[0].iov_base.cast(), slice[0].iov_len) };
+        assert_eq!(data_slice_before, &[0; 1024]);
+        let data_slice_before: &[u8] =
+            // SAFETY: safe as iovecs are verified on creation.
+            unsafe { std::slice::from_raw_parts(slice[1].iov_base.cast(), slice[1].iov_len) };
+        assert_eq!(data_slice_before, &[0; 1024]);
+
+        // The write should happen to both iovecs
+        buff.write(&[69; 2 * 1024]).unwrap();
+
+        let slice = buff.all_chains_mut_slice().unwrap();
+        let data_slice_after: &[u8] =
+            // SAFETY: safe as iovecs are verified on creation.
+            unsafe { std::slice::from_raw_parts(slice[0].iov_base.cast(), slice[0].iov_len) };
+        assert_eq!(data_slice_after, &[69; 1024]);
+        let data_slice_after: &[u8] =
+            // SAFETY: safe as iovecs are verified on creation.
+            unsafe { std::slice::from_raw_parts(slice[1].iov_base.cast(), slice[1].iov_len) };
+        assert_eq!(data_slice_after, &[69; 1024]);
+    }
+
+    #[test]
+    fn test_rx_buffer_finish_packet_and_notify_no_mrg_buf() {
+        let mem = single_region_mem(65562);
+        let rxq = VirtQueue::new(GuestAddress(0), &mem, 256);
+        let mut queue = rxq.create_queue();
+
+        let chain_len = 16;
+        set_dtable_one_chain(&rxq, chain_len);
+        let desc = queue.pop().unwrap();
+
+        let mut buff = RxBuffer::new().unwrap();
+        // SAFETY: safe because this is test memory
+        unsafe {
+            buff.add_chain(&mem, desc).unwrap();
+        }
+
+        let slice = buff.one_chain_mut_slice();
+        // SAFETY: The user space pointer was verified at the point of creation.
+        #[allow(clippy::transmute_ptr_to_ref)]
+        let header: &virtio_net_hdr_v1 = unsafe { std::mem::transmute(slice[0].iov_base) };
+        assert_eq!(header.num_buffers, 0);
+
+        // There is one chain in the buffer. The length of the data "written" does
+        // not really matter. We just need to check that the single chain present was
+        // popped and that the number of buffers is correctly set in the header.
+        // SAFETY: the buffer is not empty
+        unsafe {
+            buff.finish_packet(1024, &mut queue);
+        }
+        assert_eq!(buff.iovecs.len(), 0);
+        assert!(buff.is_empty());
+        assert_eq!(header.num_buffers, 1);
+
+        assert_eq!(buff.used_descriptors, 1);
+        assert_eq!(rxq.used.idx.get(), 0);
+        buff.notify_queue(&mut queue);
+        assert_eq!(buff.used_descriptors, 0);
+        assert_eq!(rxq.used.idx.get(), 1);
+    }
+
+    #[test]
+    fn test_rx_buffer_finish_packet_and_notify_mrg_buf() {
+        let mem = single_region_mem(65562);
+        let rxq = VirtQueue::new(GuestAddress(0), &mem, 256);
+        let mut queue = rxq.create_queue();
+
+        set_dtable_many_chains(&rxq, 2);
+
+        let mut buff = RxBuffer::new().unwrap();
+        while let Some(desc) = queue.pop() {
+            // SAFETY: safe because this is test memory
+            unsafe {
+                buff.add_chain(&mem, desc).unwrap();
+            }
+        }
+
+        let slice = buff.one_chain_mut_slice();
+        // SAFETY: The user space pointer was verified at the point of creation.
+        #[allow(clippy::transmute_ptr_to_ref)]
+        let header: &virtio_net_hdr_v1 = unsafe { std::mem::transmute(slice[0].iov_base) };
+        assert_eq!(header.num_buffers, 0);
+
+        // There are 2 chains in the buffer. The length of the data "written" to it
+        // is big enough to touch both chains. We need to check that both chains were
+        // popped and that the number of buffers is correctly set in the header.
+        // SAFETY: the buffer is not empty
+        unsafe {
+            buff.finish_packet(2 * 1024, &mut queue);
+        }
+        assert_eq!(buff.iovecs.len(), 0);
+        assert!(buff.is_empty());
+        assert_eq!(header.num_buffers, 2);
+
+        assert_eq!(buff.used_descriptors, 2);
+        assert_eq!(rxq.used.idx.get(), 0);
+        buff.notify_queue(&mut queue);
+        assert_eq!(buff.used_descriptors, 0);
+        assert_eq!(rxq.used.idx.get(), 2);
+    }
+}
diff --git a/src/vmm/src/devices/virtio/net/tap.rs b/src/vmm/src/devices/virtio/net/tap.rs
index 20024a1ae8e..9b826787386 100644
--- a/src/vmm/src/devices/virtio/net/tap.rs
+++ b/src/vmm/src/devices/virtio/net/tap.rs
@@ -7,10 +7,11 @@
 use std::fmt::{self, Debug};
 use std::fs::File;
-use std::io::{Error as IoError, Read};
+use std::io::Error as IoError;
 use std::os::raw::*;
 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
 
+use libc::iovec;
 use vmm_sys_util::ioctl::{ioctl_with_mut_ref, ioctl_with_ref, ioctl_with_val};
 use vmm_sys_util::{ioctl_ioc_nr, ioctl_iow_nr};
@@ -190,11 +191,21 @@ impl Tap {
         }
         Ok(usize::try_from(ret).unwrap())
     }
-}
-
-impl Read for Tap {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
-        self.tap_file.read(buf)
+    /// Read from tap into a slice of `iovec`.
+    pub(crate) fn read_iovec(&mut self, buffer: &mut [iovec]) -> Result<usize, IoError> {
+        // The buffer length is at most FIRECRACKER_MAX_QUEUE_SIZE (256), so the casts are safe.
+        #[allow(clippy::cast_possible_wrap)]
+        #[allow(clippy::cast_possible_truncation)]
+        let len = buffer.len() as i32;
+
+        // SAFETY: `readv` is safe. It is called with a valid tap fd, the iovec pointer and length
+        // are provided by the `IoVecBuffer` implementation, and we check the return value.
+        let ret = unsafe { libc::readv(self.tap_file.as_raw_fd(), buffer.as_ptr(), len) };
+        if ret == -1 {
+            return Err(IoError::last_os_error());
+        }
+        Ok(usize::try_from(ret).unwrap())
     }
 }
@@ -208,6 +219,7 @@ pub mod tests {
     #![allow(clippy::undocumented_unsafe_blocks)]
 
+    use std::io::Read;
     use std::os::unix::ffi::OsStrExt;
 
     use super::*;
@@ -297,7 +309,10 @@ pub mod tests {
         tap_traffic_simulator.push_tx_packet(packet.as_bytes());
 
         let mut buf = [0u8; PACKET_SIZE];
-        assert_eq!(tap.read(&mut buf).unwrap(), PAYLOAD_SIZE + VNET_HDR_SIZE);
+        assert_eq!(
+            tap.tap_file.read(&mut buf).unwrap(),
+            PAYLOAD_SIZE + VNET_HDR_SIZE
+        );
         assert_eq!(
             &buf[VNET_HDR_SIZE..packet.len() + VNET_HDR_SIZE],
             packet.as_bytes()
@@ -339,4 +354,35 @@ pub mod tests {
             fragment3
         );
     }
+
+    #[test]
+    fn test_read_iovec() {
+        let mut tap = Tap::open_named("").unwrap();
+        enable(&tap);
+        let tap_traffic_simulator = TapTrafficSimulator::new(if_index(&tap));
+
+        let mut buff1 = vec![0; PAYLOAD_SIZE + VNET_HDR_SIZE];
+        let mut buff2 = vec![0; 2 * PAYLOAD_SIZE];
+
+        let mut iovs = [
+            iovec {
+                iov_base: buff1.as_mut_ptr().cast(),
+                iov_len: buff1.len(),
+            },
+            iovec {
+                iov_base: buff2.as_mut_ptr().cast(),
+                iov_len: buff2.len(),
+            },
+        ];
+
+        let packet = vmm_sys_util::rand::rand_alphanumerics(2 * PAYLOAD_SIZE);
+        tap_traffic_simulator.push_tx_packet(packet.as_bytes());
+        assert_eq!(
+            tap.read_iovec(&mut iovs).unwrap(),
+            2 * PAYLOAD_SIZE + VNET_HDR_SIZE
+        );
+        assert_eq!(&buff1[VNET_HDR_SIZE..], &packet.as_bytes()[..PAYLOAD_SIZE]);
+        assert_eq!(&buff2[..PAYLOAD_SIZE], &packet.as_bytes()[PAYLOAD_SIZE..]);
+        assert_eq!(&buff2[PAYLOAD_SIZE..], &vec![0; PAYLOAD_SIZE])
+    }
 }
diff --git a/src/vmm/src/devices/virtio/net/test_utils.rs b/src/vmm/src/devices/virtio/net/test_utils.rs
index 07808bbb44b..d63ab161a69 100644
--- a/src/vmm/src/devices/virtio/net/test_utils.rs
+++ b/src/vmm/src/devices/virtio/net/test_utils.rs
@@ -4,7 +4,7 @@
 #![doc(hidden)]
 
 use std::fs::File;
-use std::mem;
+use std::mem::{self};
 use std::os::raw::c_ulong;
 use std::os::unix::io::{AsRawFd, FromRawFd};
 use std::process::Command;
@@ -12,8 +12,10 @@ use std::str::FromStr;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 
+use crate::devices::virtio::gen::virtio_net::virtio_net_hdr_v1;
 #[cfg(test)]
 use crate::devices::virtio::net::device::vnet_hdr_len;
+use crate::devices::virtio::net::rx_buffer::header_set_num_buffers;
 use crate::devices::virtio::net::tap::{IfReqBuilder, Tap};
 use crate::devices::virtio::net::Net;
 use crate::devices::virtio::queue::{Queue, QueueError};
@@ -244,17 +246,27 @@ pub fn enable(tap: &Tap) {
         .unwrap();
 }
 
+pub fn mock_frame(len: usize) -> Vec<u8> {
+    assert!(std::mem::size_of::<virtio_net_hdr_v1>() <= len);
+    // virtio_net_hdr_v1.num_buffers is a u16, so we need at least u16 alignment.
+    let layout = std::alloc::Layout::from_size_align(len, std::mem::align_of::<virtio_net_hdr_v1>()).unwrap();
+    // SAFETY:
+    // We need the mock frame to have the 0x2 alignment required by virtio_net_hdr_v1.
+    let mut mock_frame = unsafe { Vec::from_raw_parts(std::alloc::alloc_zeroed(layout), len, len) };
+    // SAFETY:
+    // The frame is bigger than the header and has the correct alignment.
+    unsafe {
+        header_set_num_buffers(mock_frame.as_mut_ptr().cast(), 1);
+    }
+    mock_frame
+}
+
 #[cfg(test)]
 pub(crate) fn inject_tap_tx_frame(net: &Net, len: usize) -> Vec<u8> {
-    use std::os::unix::ffi::OsStrExt;
-
     assert!(len >= vnet_hdr_len());
     let tap_traffic_simulator = TapTrafficSimulator::new(if_index(&net.tap));
-    let mut frame = vmm_sys_util::rand::rand_alphanumerics(len - vnet_hdr_len())
-        .as_bytes()
-        .to_vec();
-    tap_traffic_simulator.push_tx_packet(&frame);
-    frame.splice(0..0, vec![b'\0'; vnet_hdr_len()]);
+    let frame = mock_frame(len);
+    tap_traffic_simulator.push_tx_packet(&frame[vnet_hdr_len()..]);
     frame
 }
@@ -430,8 +442,8 @@ pub mod test {
             event_fd.write(1).unwrap();
         }
 
-        /// Generate a tap frame of `frame_len` and check that it is deferred
-        pub fn check_rx_deferred_frame(&mut self, frame_len: usize) -> Vec<u8> {
+        /// Generate a tap frame of `frame_len` and check that it is discarded
+        pub fn check_rx_discard_frame(&mut self, frame_len: usize) -> Vec<u8> {
            let used_idx = self.rxq.used.idx.get();
 
            // Inject frame to tap and run epoll.
@@ -441,8 +453,6 @@
                0,
                self.event_manager.run_with_timeout(100).unwrap()
            );
-            // Check that the frame has been deferred.
-            assert!(self.net().rx_deferred_frame);
            // Check that the descriptor chain has been discarded.
            assert_eq!(self.rxq.used.idx.get(), used_idx + 1);
            assert!(&self.net().irq_trigger.has_pending_irq(IrqType::Vring));
diff --git a/src/vmm/src/devices/virtio/queue.rs b/src/vmm/src/devices/virtio/queue.rs
index c183e8b2c00..50f437d2fef 100644
--- a/src/vmm/src/devices/virtio/queue.rs
+++ b/src/vmm/src/devices/virtio/queue.rs
@@ -574,8 +574,13 @@ impl Queue {
         self.next_avail -= Wrapping(1);
     }
 
-    /// Puts an available descriptor head into the used ring for use by the guest.
-    pub fn add_used(&mut self, desc_index: u16, len: u32) -> Result<(), QueueError> {
+    /// Write a used element into the used ring.
+    pub fn write_used_element(
+        &mut self,
+        ring_index: u16,
+        desc_index: u16,
+        len: u32,
+    ) -> Result<(), QueueError> {
         if self.actual_size() <= desc_index {
             error!(
                 "attempted to add out of bounds descriptor to used ring: {}",
@@ -584,7 +589,7 @@
             return Err(QueueError::DescIndexOutOfBounds(desc_index));
         }
 
-        let next_used = self.next_used.0 % self.actual_size();
+        let next_used = ring_index % self.actual_size();
         let used_element = UsedElement {
             id: u32::from(desc_index),
             len,
@@ -594,14 +599,24 @@
         unsafe {
             self.used_ring_ring_set(usize::from(next_used), used_element);
         }
+        Ok(())
+    }
 
-        self.num_added += Wrapping(1);
-        self.next_used += Wrapping(1);
+    /// Advance the used ring by `n` elements.
+    pub fn advance_used_ring(&mut self, n: u16) {
+        self.num_added += Wrapping(n);
+        self.next_used += Wrapping(n);
 
         // This fence ensures all descriptor writes are visible before the index update is.
         fence(Ordering::Release);
 
         self.used_ring_idx_set(self.next_used.0);
+    }
+
+    /// Puts an available descriptor head into the used ring for use by the guest.
+    pub fn add_used(&mut self, desc_index: u16, len: u32) -> Result<(), QueueError> {
+        self.write_used_element(self.next_used.0, desc_index, len)?;
+        self.advance_used_ring(1);
         Ok(())
     }
diff --git a/src/vmm/src/devices/virtio/test_utils.rs b/src/vmm/src/devices/virtio/test_utils.rs
index 9bb66db82ae..89b3810d8bb 100644
--- a/src/vmm/src/devices/virtio/test_utils.rs
+++ b/src/vmm/src/devices/virtio/test_utils.rs
@@ -8,6 +8,7 @@
 use std::marker::PhantomData;
 use std::mem;
 use std::sync::atomic::{AtomicUsize, Ordering};
+use super::queue::{VIRTQ_DESC_F_NEXT, VIRTQ_DESC_F_WRITE};
 use crate::devices::virtio::queue::Queue;
 use crate::test_utils::single_region_mem;
 use crate::utils::u64_to_usize;
@@ -312,6 +313,52 @@ impl<'a> VirtQueue<'a> {
     }
 }
 
+/// Create one chain with n descriptors.
+/// Descriptor buffers will live at an offset of 2048 bytes
+/// to leave some room for queue objects.
+/// We don't really care about the sizes of the descriptors,
+/// so pick 1024.
+/// Allow casts because this is test code.
+#[allow(clippy::cast_possible_wrap)]
+#[allow(clippy::cast_possible_truncation)]
+pub fn set_dtable_one_chain(rxq: &VirtQueue, n: usize) {
+    let desc_size = 1024;
+    for i in 0..n {
+        rxq.dtable[i].set(
+            (2048 + desc_size * i) as u64,
+            desc_size as u32,
+            VIRTQ_DESC_F_WRITE | VIRTQ_DESC_F_NEXT,
+            (i + 1) as u16,
+        );
+    }
+    rxq.dtable[n - 1].flags.set(VIRTQ_DESC_F_WRITE);
+    rxq.dtable[n - 1].next.set(0);
+    rxq.avail.ring[0].set(0);
+    rxq.avail.idx.set(n as u16);
+}
+
+/// Create n chains with 1 descriptor each.
+/// Descriptor buffers will live at an offset of 2048 bytes
+/// to leave some room for queue objects.
+/// We don't really care about the sizes of the descriptors,
+/// so pick 1024.
+/// Allow casts because this is test code.
+#[allow(clippy::cast_possible_wrap)]
+#[allow(clippy::cast_possible_truncation)]
+pub fn set_dtable_many_chains(rxq: &VirtQueue, n: usize) {
+    let desc_size = 1024;
+    for i in 0..n {
+        rxq.dtable[i].set(
+            (2048 + desc_size * i) as u64,
+            desc_size as u32,
+            VIRTQ_DESC_F_WRITE,
+            0,
+        );
+        rxq.avail.ring[i].set(i as u16);
+    }
+    rxq.avail.idx.set(n as u16);
+}
+
 #[cfg(test)]
 pub(crate) mod test {
diff --git a/src/vmm/src/utils/mod.rs b/src/vmm/src/utils/mod.rs
index a0ee2e90b6b..021b9cb8aeb 100644
--- a/src/vmm/src/utils/mod.rs
+++ b/src/vmm/src/utils/mod.rs
@@ -5,6 +5,8 @@
 pub mod byte_order;
 /// Module with network related helpers
 pub mod net;
+/// Module with `RingBuffer` struct
+pub mod ring_buffer;
 /// Module with external libc functions
 pub mod signal;
 /// Module with state machine
@@ -22,6 +24,15 @@ pub fn get_page_size() -> Result {
     }
 }
 
+/// Safely converts a u32 value to a usize value.
+/// This bypasses the Clippy lint check because we only support 64-bit platforms.
+#[cfg(target_pointer_width = "64")]
+#[inline]
+#[allow(clippy::cast_possible_truncation)]
+pub const fn u32_to_usize(num: u32) -> usize {
+    num as usize
+}
+
 /// Safely converts a u64 value to a usize value.
 /// This bypasses the Clippy lint check because we only support 64-bit platforms.
 #[cfg(target_pointer_width = "64")]
diff --git a/src/vmm/src/utils/ring_buffer.rs b/src/vmm/src/utils/ring_buffer.rs
new file mode 100644
index 00000000000..4994b537f8a
--- /dev/null
+++ b/src/vmm/src/utils/ring_buffer.rs
@@ -0,0 +1,163 @@
+// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+use super::u32_to_usize;
+
+/// Simple ring buffer of fixed size. Because we will probably never
+/// need buffers with a size bigger than u32::MAX,
+/// indexes in this struct are u32, so the maximum size is u32::MAX.
+/// This saves 8 bytes compared to `VecDeque` in the standard library
+/// (24 bytes vs 32 bytes).
+/// Making indexes smaller than u32 does not make sense, as the alignment
+/// of `Box` is 8 bytes.
+#[derive(Debug, Default, Clone)]
+pub struct RingBuffer<T> {
+    /// Fixed array of items.
+    pub items: Box<[T]>,
+    /// Start index.
+    pub start: u32,
+    /// Current length of the ring.
+    pub len: u32,
+}
+
+impl<T: Default + Clone> RingBuffer<T> {
+    /// Create a new ring buffer of the specified size.
+    pub fn new_with_size(size: u32) -> Self {
+        Self {
+            items: vec![T::default(); u32_to_usize(size)].into_boxed_slice(),
+            start: 0,
+            len: 0,
+        }
+    }
+
+    /// Get the number of items in the buffer.
+    pub fn len(&self) -> u32 {
+        self.len
+    }
+
+    /// Check if the ring is empty.
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    /// Check if the ring is full.
+    pub fn is_full(&self) -> bool {
+        u32_to_usize(self.len) == self.items.len()
+    }
+
+    /// Returns a reference to the first element if the ring is not empty.
+    pub fn first(&self) -> Option<&T> {
+        if self.is_empty() {
+            None
+        } else {
+            Some(&self.items[u32_to_usize(self.start)])
+        }
+    }
+
+    /// Push a new item to the end of the ring and increase
+    /// the length.
+    /// If there is no space for it, nothing will happen.
+    pub fn push_back(&mut self, item: T) {
+        if !self.is_full() {
+            let index = u32_to_usize(self.start + self.len) % self.items.len();
+            self.items[index] = item;
+            self.len += 1;
+        }
+    }
+
+    /// Pop an item from the front of the ring and return
+    /// a reference to it.
+    /// If the ring is empty, returns None.
+    pub fn pop_front(&mut self) -> Option<&T> {
+        if self.is_empty() {
+            None
+        } else {
+            let index = u32_to_usize(self.start);
+            self.start += 1;
+
+            // Need to allow this because we cast `items.len()` to u32,
+            // but this is safe as the max size of the buffer is u32::MAX.
+            #[allow(clippy::cast_possible_truncation)]
+            let items_len = self.items.len() as u32;
+
+            self.start %= items_len;
+            self.len -= 1;
+            Some(&self.items[index])
+        }
+    }
+}
+
+#[cfg(test)]
+pub mod tests {
+    use super::*;
+
+    #[test]
+    fn test_new() {
+        let a = RingBuffer::<u8>::new_with_size(69);
+        assert_eq!(a.items.len(), 69);
+        assert_eq!(a.start, 0);
+        assert_eq!(a.len, 0);
+        assert!(a.is_empty());
+        assert!(!a.is_full());
+    }
+
+    #[test]
+    fn test_push() {
+        let mut a = RingBuffer::<u8>::new_with_size(4);
+
+        a.push_back(0);
+        assert!(!a.is_empty());
+        assert!(!a.is_full());
+
+        a.push_back(1);
+        assert!(!a.is_empty());
+        assert!(!a.is_full());
+
+        a.push_back(2);
+        assert!(!a.is_empty());
+        assert!(!a.is_full());
+
+        a.push_back(3);
+        assert!(!a.is_empty());
+        assert!(a.is_full());
+
+        assert_eq!(a.items.as_ref(), &[0, 1, 2, 3]);
+
+        a.push_back(4);
+        assert!(!a.is_empty());
+        assert!(a.is_full());
+
+        assert_eq!(a.items.as_ref(), &[0, 1, 2, 3]);
+    }
+
+    #[test]
+    fn test_pop_front() {
+        let mut a = RingBuffer::<u8>::new_with_size(4);
+        a.push_back(0);
+        a.push_back(1);
+        a.push_back(2);
+        a.push_back(3);
+        assert!(!a.is_empty());
+        assert!(a.is_full());
+
+        assert_eq!(*a.pop_front().unwrap(), 0);
+        assert!(!a.is_empty());
+        assert!(!a.is_full());
+
+        assert_eq!(*a.pop_front().unwrap(), 1);
+        assert!(!a.is_empty());
+        assert!(!a.is_full());
+
+        assert_eq!(*a.pop_front().unwrap(), 2);
+        assert!(!a.is_empty());
+        assert!(!a.is_full());
+
+        assert_eq!(*a.pop_front().unwrap(), 3);
+        assert!(a.is_empty());
+        assert!(!a.is_full());
+
+        assert!(a.pop_front().is_none());
+        assert!(a.is_empty());
+        assert!(!a.is_full());
+    }
+}
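Reviewer note (not part of the patch): a minimal usage sketch of the new `RingBuffer` utility added in src/vmm/src/utils/ring_buffer.rs, assuming the public paths exactly as introduced above. It only restates the semantics the unit tests exercise: `push_back` is silently dropped once the ring is full, and `pop_front` hands items back by reference in FIFO order.

// Illustrative only; mirrors the behavior covered by the tests in ring_buffer.rs.
use vmm::utils::ring_buffer::RingBuffer;

fn main() {
    // Capacity is fixed at construction time; the element type only needs Default + Clone.
    let mut ring = RingBuffer::<u16>::new_with_size(2);

    ring.push_back(10);
    ring.push_back(20);
    ring.push_back(30); // The ring is full, so this push is silently ignored.
    assert!(ring.is_full());

    // Items come back out in FIFO order, by reference.
    assert_eq!(*ring.pop_front().unwrap(), 10);
    assert_eq!(*ring.pop_front().unwrap(), 20);
    assert!(ring.pop_front().is_none());
}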