Skip to content

Commit 71688c7

Browse files
new mbuf allocation
1 parent 836e686 commit 71688c7

File tree

5 files changed

+175
-44
lines changed

5 files changed

+175
-44
lines changed

core/src/dpdk/mbuf.rs

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,14 @@
1616
* SPDX-License-Identifier: Apache-2.0
1717
*/
1818

19-
use super::MEMPOOL;
20-
use crate::dpdk::{DpdkError, MempoolError};
21-
use crate::ffi::{self, ToResult};
19+
use crate::ffi::dpdk::{self, MbufPtr};
2220
use crate::packets::{Internal, Packet};
21+
use crate::rt2::Mempool;
2322
use crate::{ensure, trace};
2423
use anyhow::Result;
24+
use capsule_ffi as cffi;
2525
use std::fmt;
2626
use std::mem;
27-
use std::os::raw;
2827
use std::ptr::{self, NonNull};
2928
use std::slice;
3029
use thiserror::Error;
@@ -113,21 +112,21 @@ enum MbufInner {
113112
/// Original version of the message buffer that should be freed when it goes
114113
/// out of scope, unless the ownership of the pointer is given back to
115114
/// DPDK on transmit.
116-
Original(NonNull<ffi::rte_mbuf>),
115+
Original(NonNull<cffi::rte_mbuf>),
117116
/// A clone version of the message buffer that should not be freed when
118117
/// it goes out of scope.
119-
Clone(NonNull<ffi::rte_mbuf>),
118+
Clone(NonNull<cffi::rte_mbuf>),
120119
}
121120

122121
impl MbufInner {
123-
fn ptr(&self) -> &NonNull<ffi::rte_mbuf> {
122+
fn ptr(&self) -> &NonNull<cffi::rte_mbuf> {
124123
match self {
125124
MbufInner::Original(raw) => raw,
126125
MbufInner::Clone(raw) => raw,
127126
}
128127
}
129128

130-
fn ptr_mut(&mut self) -> &mut NonNull<ffi::rte_mbuf> {
129+
fn ptr_mut(&mut self) -> &mut NonNull<cffi::rte_mbuf> {
131130
match self {
132131
MbufInner::Original(ref mut raw) => raw,
133132
MbufInner::Clone(ref mut raw) => raw,
@@ -144,23 +143,23 @@ impl Mbuf {
144143
///
145144
/// # Errors
146145
///
147-
/// Returns `MempoolError::Exhausted` if the allocation of mbuf fails.
146+
/// Returns `MempoolPtrUnsetError` if invoked from a non lcore.
147+
/// Returns `DpdkError` if the allocation of mbuf fails.
148148
#[inline]
149149
pub fn new() -> Result<Self> {
150-
let mempool = MEMPOOL.with(|tls| tls.get());
151-
let raw =
152-
unsafe { ffi::_rte_pktmbuf_alloc(mempool).into_result(|_| MempoolError::Exhausted)? };
150+
let mut mp = Mempool::thread_local_ptr()?;
151+
let ptr = dpdk::pktmbuf_alloc(&mut mp)?;
153152

154153
Ok(Mbuf {
155-
inner: MbufInner::Original(raw),
154+
inner: MbufInner::Original(ptr.into()),
156155
})
157156
}
158157

159158
/// Creates a new message buffer from a byte array.
160159
///
161160
/// # Errors
162161
///
163-
/// Returns `MempoolError::Exhausted` if the allocation of mbuf fails.
162+
/// Returns `DpdkError` if the allocation of mbuf fails.
164163
/// Returns `BufferError::NotResized` if the byte array is larger than
165164
/// the maximum mbuf size.
166165
#[inline]
@@ -173,21 +172,29 @@ impl Mbuf {
173172

174173
/// Creates a new `Mbuf` from a raw pointer.
175174
#[inline]
176-
pub(crate) unsafe fn from_ptr(ptr: *mut ffi::rte_mbuf) -> Self {
175+
pub(crate) fn from_easyptr(ptr: MbufPtr) -> Self {
176+
Mbuf {
177+
inner: MbufInner::Original(ptr.into()),
178+
}
179+
}
180+
181+
/// Creates a new `Mbuf` from a raw pointer.
182+
#[inline]
183+
pub(crate) unsafe fn from_ptr(ptr: *mut cffi::rte_mbuf) -> Self {
177184
Mbuf {
178185
inner: MbufInner::Original(NonNull::new_unchecked(ptr)),
179186
}
180187
}
181188

182189
/// Returns the raw struct needed for FFI calls.
183190
#[inline]
184-
fn raw(&self) -> &ffi::rte_mbuf {
191+
fn raw(&self) -> &cffi::rte_mbuf {
185192
unsafe { self.inner.ptr().as_ref() }
186193
}
187194

188195
/// Returns the raw struct needed for FFI calls.
189196
#[inline]
190-
fn raw_mut(&mut self) -> &mut ffi::rte_mbuf {
197+
fn raw_mut(&mut self) -> &mut cffi::rte_mbuf {
191198
unsafe { self.inner.ptr_mut().as_mut() }
192199
}
193200

@@ -417,7 +424,18 @@ impl Mbuf {
417424
/// The `Mbuf` is consumed. It is the caller's the responsibility to
418425
/// free the raw pointer after use. Otherwise the buffer is leaked.
419426
#[inline]
420-
pub(crate) fn into_ptr(self) -> *mut ffi::rte_mbuf {
427+
pub(crate) fn into_easyptr(self) -> MbufPtr {
428+
let ptr = self.inner.ptr().clone();
429+
mem::forget(self);
430+
ptr.into()
431+
}
432+
433+
/// Acquires the underlying raw struct pointer.
434+
///
435+
/// The `Mbuf` is consumed. It is the caller's the responsibility to
436+
/// free the raw pointer after use. Otherwise the buffer is leaked.
437+
#[inline]
438+
pub(crate) fn into_ptr(self) -> *mut cffi::rte_mbuf {
421439
let ptr = self.inner.ptr().as_ptr();
422440
mem::forget(self);
423441
ptr
@@ -430,25 +448,21 @@ impl Mbuf {
430448
/// Returns `DpdkError` if the allocation of mbuf fails.
431449
pub fn alloc_bulk(len: usize) -> Result<Vec<Mbuf>> {
432450
let mut ptrs = Vec::with_capacity(len);
433-
let mempool = MEMPOOL.with(|tls| tls.get());
434-
435-
let mbufs = unsafe {
436-
ffi::_rte_pktmbuf_alloc_bulk(mempool, ptrs.as_mut_ptr(), len as raw::c_uint)
437-
.into_result(DpdkError::from_errno)?;
451+
let mut mp = Mempool::thread_local_ptr()?;
452+
dpdk::pktmbuf_alloc_bulk(&mut mp, &mut ptrs)?;
438453

439-
ptrs.set_len(len);
440-
ptrs.into_iter()
441-
.map(|ptr| Mbuf::from_ptr(ptr))
442-
.collect::<Vec<_>>()
443-
};
454+
let mbufs = ptrs.into_iter().map(Mbuf::from_easyptr).collect::<Vec<_>>();
444455

445456
Ok(mbufs)
446457
}
447458

448459
/// Frees the message buffers in bulk.
449460
pub(crate) fn free_bulk(mbufs: Vec<Mbuf>) {
450-
let ptrs = mbufs.into_iter().map(Mbuf::into_ptr).collect::<Vec<_>>();
451-
super::mbuf_free_bulk(ptrs);
461+
let mut ptrs = mbufs
462+
.into_iter()
463+
.map(Mbuf::into_easyptr)
464+
.collect::<Vec<_>>();
465+
dpdk::pktmbuf_free_bulk(&mut ptrs);
452466
}
453467
}
454468

@@ -469,9 +483,7 @@ impl Drop for Mbuf {
469483
match self.inner {
470484
MbufInner::Original(_) => {
471485
trace!("freeing mbuf@{:p}.", self.raw().buf_addr);
472-
unsafe {
473-
ffi::_rte_pktmbuf_free(self.raw_mut());
474-
}
486+
dpdk::pktmbuf_free(self.inner.ptr().clone().into());
475487
}
476488
MbufInner::Clone(_) => (),
477489
}

core/src/ffi/dpdk.rs

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ pub(crate) fn mempool_free(ptr: &mut MempoolPtr) {
131131
}
132132

133133
/// An opaque identifier for a logical execution unit of the processor.
134-
#[derive(Copy, Clone)]
134+
#[derive(Copy, Clone, Eq, Hash, PartialEq)]
135135
pub(crate) struct LcoreId(raw::c_uint);
136136

137137
impl LcoreId {
@@ -397,6 +397,79 @@ pub(crate) fn eth_tx_queue_setup(
397397
}
398398
}
399399

400+
/// A `rte_mbuf` pointer.
401+
pub(crate) type MbufPtr = EasyPtr<cffi::rte_mbuf>;
402+
403+
/// Allocates a new mbuf from a mempool.
404+
pub(crate) fn pktmbuf_alloc(mp: &mut MempoolPtr) -> Result<MbufPtr> {
405+
let ptr =
406+
unsafe { cffi::_rte_pktmbuf_alloc(mp.deref_mut()).into_result(|_| DpdkError::new())? };
407+
408+
Ok(EasyPtr(ptr))
409+
}
410+
411+
/// Allocates a bulk of mbufs.
412+
pub(crate) fn pktmbuf_alloc_bulk(mp: &mut MempoolPtr, mbufs: &mut Vec<MbufPtr>) -> Result<()> {
413+
let len = mbufs.capacity();
414+
415+
unsafe {
416+
cffi::_rte_pktmbuf_alloc_bulk(
417+
mp.deref_mut(),
418+
mbufs.as_mut_ptr() as *mut *mut cffi::rte_mbuf,
419+
len as raw::c_uint,
420+
)
421+
.into_result(DpdkError::from_errno)?;
422+
423+
mbufs.set_len(len);
424+
}
425+
426+
Ok(())
427+
}
428+
429+
/// Frees a packet mbuf back into its original mempool.
430+
pub(crate) fn pktmbuf_free(mut m: MbufPtr) {
431+
unsafe {
432+
cffi::_rte_pktmbuf_free(m.deref_mut());
433+
}
434+
}
435+
436+
/// Frees a bulk of packet mbufs back into their original mempools.
437+
pub(crate) fn pktmbuf_free_bulk(mbufs: &mut Vec<MbufPtr>) {
438+
assert!(!mbufs.is_empty());
439+
440+
let mut to_free = Vec::with_capacity(mbufs.len());
441+
let mut pool = mbufs[0].pool;
442+
443+
for mbuf in mbufs.drain(..) {
444+
if pool == mbuf.pool {
445+
to_free.push(mbuf);
446+
} else {
447+
unsafe {
448+
let len = to_free.len();
449+
cffi::_rte_mempool_put_bulk(
450+
pool,
451+
to_free.as_ptr() as *const *mut raw::c_void,
452+
len as u32,
453+
);
454+
to_free.set_len(0);
455+
}
456+
457+
pool = mbuf.pool;
458+
to_free.push(mbuf);
459+
}
460+
}
461+
462+
unsafe {
463+
let len = to_free.len();
464+
cffi::_rte_mempool_put_bulk(
465+
pool,
466+
to_free.as_ptr() as *const *mut raw::c_void,
467+
len as u32,
468+
);
469+
to_free.set_len(0);
470+
}
471+
}
472+
400473
/// An error generated in `libdpdk`.
401474
///
402475
/// When an FFI call fails, the `errno` is translated into `DpdkError`.

core/src/ffi/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,17 @@ impl<T> DerefMut for EasyPtr<T> {
160160
unsafe { self.0.as_mut() }
161161
}
162162
}
163+
164+
// delete later?
165+
impl<T> From<NonNull<T>> for EasyPtr<T> {
166+
fn from(ptr: NonNull<T>) -> Self {
167+
EasyPtr(ptr)
168+
}
169+
}
170+
171+
// delete later?
172+
impl<T> From<EasyPtr<T>> for NonNull<T> {
173+
fn from(ptr: EasyPtr<T>) -> Self {
174+
ptr.0
175+
}
176+
}

core/src/rt2/mempool.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
* SPDX-License-Identifier: Apache-2.0
1717
*/
1818

19-
use crate::ffi::dpdk::{self, MempoolPtr, SocketId};
19+
use crate::ffi::dpdk::{self, LcoreId, MempoolPtr, SocketId};
2020
use crate::ffi::AsStr;
2121
use crate::{debug, info};
2222
use anyhow::Result;
23+
use std::cell::Cell;
2324
use std::fmt;
25+
use std::ptr::{self, NonNull};
26+
use thiserror::Error;
2427

2528
/// A memory pool is an allocator of message buffers, or `Mbuf`. For best
2629
/// performance, each socket should have a dedicated `Mempool`. However,
@@ -59,6 +62,7 @@ impl Mempool {
5962
}
6063

6164
/// Returns the raw pointer.
65+
#[inline]
6266
pub(crate) fn ptr_mut(&mut self) -> &mut MempoolPtr {
6367
&mut self.ptr
6468
}
@@ -86,6 +90,19 @@ impl Mempool {
8690
pub(crate) fn socket(&self) -> SocketId {
8791
self.ptr.socket_id.into()
8892
}
93+
94+
/// Returns the thread local mempool pointer.
95+
///
96+
/// # Errors
97+
///
98+
/// Returns `MempoolPtrUnsetError` if the thread local pointer is not
99+
/// set. For example when invoked from a non-EAL thread.
100+
pub(crate) fn thread_local_ptr() -> Result<MempoolPtr> {
101+
let ptr = MEMPOOL.with(|tls| tls.get());
102+
NonNull::new(ptr)
103+
.ok_or_else(|| MempoolPtrUnsetError(LcoreId::current()).into())
104+
.map(|ptr| ptr.into())
105+
}
89106
}
90107

91108
impl fmt::Debug for Mempool {
@@ -108,6 +125,19 @@ impl Drop for Mempool {
108125
}
109126
}
110127

128+
/// The thread local mempool is not set.
129+
#[derive(Debug, Error)]
130+
#[error("thread local mempool pointer not set for {0:?}.")]
131+
pub(crate) struct MempoolPtrUnsetError(LcoreId);
132+
133+
thread_local! {
134+
/// The `Mempool` assigned to the core when the core is initialized.
135+
/// `Mbuf::new` uses this pool to allocate new buffers when executed on
136+
/// the core. For best performance, the `Mempool` and the core should
137+
/// share the same socket.
138+
pub(crate) static MEMPOOL: Cell<*mut capsule_ffi::rte_mempool> = Cell::new(ptr::null_mut());
139+
}
140+
111141
#[cfg(test)]
112142
mod tests {
113143
use super::*;

core/src/testils/mod.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ mod rvg;
2727
pub use self::packet::*;
2828
pub use self::rvg::*;
2929

30-
use crate::dpdk::{Mempool, SocketId, MEMPOOL};
31-
use crate::ffi::dpdk;
30+
use crate::ffi::dpdk::{self, SocketId};
3231
use crate::metrics;
32+
use crate::rt2::{Mempool, MEMPOOL};
33+
use std::ops::DerefMut;
3334
use std::ptr;
3435
use std::sync::Once;
36+
use std::thread;
3537

3638
static TEST_INIT: Once = Once::new();
3739

@@ -64,12 +66,11 @@ pub fn cargo_test_init() {
6466
});
6567
}
6668

67-
/// A handle that keeps the mempool in scope for the duration of the test. It
68-
/// will unset the thread-bound mempool on drop.
69+
/// A RAII guard that keeps the mempool in scope for the duration of the
70+
/// test. It will unset the thread-bound mempool on drop.
6971
#[derive(Debug)]
7072
pub struct MempoolGuard {
71-
#[allow(dead_code)]
72-
inner: Mempool,
73+
_inner: Mempool,
7374
}
7475

7576
impl Drop for MempoolGuard {
@@ -81,7 +82,8 @@ impl Drop for MempoolGuard {
8182
/// Creates a new mempool for test that automatically cleans up after the
8283
/// test completes.
8384
pub fn new_mempool(capacity: usize, cache_size: usize) -> MempoolGuard {
84-
let mut mempool = Mempool::new(capacity, cache_size, SocketId::ANY).unwrap();
85-
MEMPOOL.with(|tls| tls.set(mempool.raw_mut()));
86-
MempoolGuard { inner: mempool }
85+
let name = format!("test-mp-{:?}", thread::current().id());
86+
let mut mempool = Mempool::new(name, capacity, cache_size, SocketId::ANY).unwrap();
87+
MEMPOOL.with(|tls| tls.set(mempool.ptr_mut().deref_mut()));
88+
MempoolGuard { _inner: mempool }
8789
}

0 commit comments

Comments
 (0)