Skip to content
Closed
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 214 additions & 1 deletion ibverbs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ use std::time::Duration;

const PORT_NUM: u8 = 1;

/// The doorbell batching API(s) will process at most this many work requests at a time,
/// to prevent inefficient dynamic heap allocations.
pub const DOORBELL_BATCH_LIMIT: usize = 32;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of specifying this as a constant, could we make this a template argument to the post(&[SendWR]) function so that the user has control over it. we can default to 32


/// Direct access to low-level libverbs FFI.
pub use ffi::ibv_gid_type;
pub use ffi::ibv_mtu;
Expand Down Expand Up @@ -1532,6 +1536,10 @@ impl<T> MemoryRegion<T> {
&mut self.data
}

/// Shared (read-only) borrow of the registered data; immutable counterpart of `inner`.
fn inner_immut(&self) -> &T {
    &self.data
}

/// Make a subslice of this memory region.
pub fn slice(&self, bounds: impl RangeBounds<usize>) -> LocalMemorySlice {
let (addr, length) = calc_addr_len(
Expand All @@ -1546,6 +1554,49 @@ impl<T> MemoryRegion<T> {
};
LocalMemorySlice { _sge: sge }
}

/// Make a convenient `MemorySlicer`, useful for buffer pools that need to own `MemoryRegions`.
pub fn mk_slicer(&self) -> MemorySlicer {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed? it seems unrelated to the doorbell batching, so would be good to make a different PR and discuss there

MemorySlicer {
addr: unsafe { *self.mr }.addr as u64,
bytes_len: unsafe { *self.mr }.length,
lkey: unsafe { *self.mr }.lkey,
}
}
}

/// A non-owning utility wrapper to compute subslices of a preregistered memory region. Useful
/// for safe buffer pooling wherein the buffer pool can own the original `MemoryRegion`.
pub struct MemorySlicer {
    // Base address of the registered region, as captured from the verbs registration.
    addr: u64,
    // Total byte length of the registered region; subslice bounds are computed against it.
    bytes_len: usize,
    // Local protection key of the registration; copied into every SGE this slicer produces.
    lkey: u32,
}
impl MemorySlicer {
    /// Make a subslice of this memory region.
    pub fn slice(&self, bounds: impl RangeBounds<usize>) -> LocalMemorySlice {
        // Resolve the (possibly open-ended) bounds against the region's base and length.
        let (addr, length) = calc_addr_len(bounds, self.addr, self.bytes_len);
        LocalMemorySlice {
            _sge: ffi::ibv_sge {
                addr,
                length,
                lkey: self.lkey,
            },
        }
    }
}

/// A `MemoryRegion` with buffer (slice) semantics, convenient to pass in to a buffer pool structure
/// to own.
pub struct OwnedMemoryRegion<T: AsRef<[u8]> + AsMut<[u8]>>(pub MemoryRegion<T>);
impl<T: AsRef<[u8]> + AsMut<[u8]>> AsRef<[u8]> for OwnedMemoryRegion<T> {
    /// Borrow the registered data's contents as an immutable byte slice.
    fn as_ref(&self) -> &[u8] {
        self.0.inner_immut().as_ref()
    }
}
impl<T: AsRef<[u8]> + AsMut<[u8]>> AsMut<[u8]> for OwnedMemoryRegion<T> {
    /// Borrow the registered data's contents as a mutable byte slice.
    fn as_mut(&mut self) -> &mut [u8] {
        self.0.inner().as_mut()
    }
}

/// Local memory slice.
Expand Down Expand Up @@ -1604,7 +1655,9 @@ impl RemoteMemoryRegion {
/// Remote memory slice.
#[derive(Debug, Default, Copy, Clone)]
pub struct RemoteMemorySlice {
addr: u64,
/// Memory address of the registered region. Visible to enable user bookkeeping for
/// doorbell batching.
pub addr: u64,
#[allow(unused)]
length: u32,
rkey: u32,
Expand Down Expand Up @@ -2070,6 +2123,166 @@ impl QueuePair {
Ok(())
}
}

/// Post multiple one-sided, non-vectored requests of the same type (i.e., all WRITE
/// or all READ) at once, exploiting doorbell batching, up to `DOORBELL_BATCH_LIMIT`
/// at a time.
///
/// `is_read` selects READ vs WRITE for the entire batch; per-request immediate data
/// (`wrids_imms[i].1`) takes effect only for writes (RDMA READ carries no immediate).
/// An empty batch is a successful no-op.
///
/// On success, returns `None`; on failure, returns the error and the index of the
/// first request that failed to be posted (all prior ones were successfully posted).
/// Mismatched input slice lengths are reported as `InvalidInput` at index 0.
pub fn post_one_sided_batch_single_type(
    &mut self,
    is_read: bool,
    // NOTE: only read here; `&` (not `&mut`) keeps this consistent with the
    // vectored variant and is backward compatible for `&mut` callers.
    wrids_imms: &[(u64, Option<u32>)],
    locals: &mut [LocalMemorySlice],
    remotes: &[RemoteMemorySlice],
) -> Option<(usize, io::Error)> {
    if wrids_imms.len() != remotes.len() || locals.len() != remotes.len() {
        return Some((0, io::Error::from(io::ErrorKind::InvalidInput)));
    }

    let num_wrs = std::cmp::min(wrids_imms.len(), DOORBELL_BATCH_LIMIT);
    // Guard the empty batch: without this we would hand a default-initialized
    // work request to the device, which is never what the caller intended.
    if num_wrs == 0 {
        return None;
    }
    let mut wrs = [ffi::ibv_send_wr::default(); DOORBELL_BATCH_LIMIT];
    let base_wr_ptr = wrs.as_mut_ptr();
    for ((((idx, wr), (wr_id, imm_data)), local), remote) in wrs
        .iter_mut()
        .enumerate()
        .zip(wrids_imms)
        .zip(locals)
        .zip(remotes)
        .take(num_wrs)
    {
        let (opcode, anon_1) = if is_read {
            (ffi::ibv_wr_opcode::IBV_WR_RDMA_READ, Default::default())
        } else if let Some(imm_data) = imm_data {
            (
                ffi::ibv_wr_opcode::IBV_WR_RDMA_WRITE_WITH_IMM,
                // Immediate data travels on the wire in network byte order.
                ffi::ibv_send_wr__bindgen_ty_1 {
                    imm_data: imm_data.to_be(),
                },
            )
        } else {
            (ffi::ibv_wr_opcode::IBV_WR_RDMA_WRITE, Default::default())
        };

        *wr = ffi::ibv_send_wr {
            wr_id: *wr_id,
            // Chain the WRs so a single doorbell covers the whole batch; the
            // last entry terminates the list with a null `next`.
            next: if idx < num_wrs - 1 {
                unsafe { base_wr_ptr.add(idx + 1) }
            } else {
                ptr::null_mut()
            },
            sg_list: local as *mut LocalMemorySlice as *mut ffi::ibv_sge,
            num_sge: 1i32,
            opcode,
            send_flags: ffi::ibv_send_flags::IBV_SEND_SIGNALED.0,
            wr: ffi::ibv_send_wr__bindgen_ty_2 {
                rdma: ffi::ibv_send_wr__bindgen_ty_2__bindgen_ty_1 {
                    remote_addr: remote.addr,
                    rkey: remote.rkey,
                },
            },
            qp_type: Default::default(),
            __bindgen_anon_1: anon_1,
            __bindgen_anon_2: Default::default(),
        };
    }
    let mut bad_wr: *mut ffi::ibv_send_wr = ptr::null_mut();

    let ctx = unsafe { *self.qp }.context;
    let ops = &mut unsafe { *ctx }.ops;
    // SAFETY: `wrs[..num_wrs]` is a valid, null-terminated WR chain that
    // outlives this call; `self.qp` is the queue pair this handle owns.
    let errno =
        unsafe { ops.post_send.as_mut().unwrap()(self.qp, base_wr_ptr, &mut bad_wr as *mut _) };
    if errno != 0 {
        // On failure the driver sets `bad_wr` to the first rejected WR in our
        // array; everything before it was posted successfully.
        let bad_idx = unsafe { bad_wr.offset_from(wrs.as_ptr()) } as usize;
        Some((bad_idx, io::Error::from_raw_os_error(errno)))
    } else {
        None
    }
}

/// Post multiple one-sided, vectored requests of the same type (i.e., all WRITE
/// or all READ) at once, exploiting doorbell batching, up to `DOORBELL_BATCH_LIMIT`
/// at a time.
///
/// `is_read` selects READ vs WRITE for the entire batch; per-request immediate data
/// (`wrids_imms[i].1`) takes effect only for writes (RDMA READ carries no immediate).
/// Each `locals[i]` is the scatter/gather list for request `i`. An empty batch is a
/// successful no-op.
///
/// On success, returns `None`; on failure, returns the error and the index of the
/// first request that failed to be posted (all prior ones were successfully posted).
/// Mismatched input slice lengths are reported as `InvalidInput` at index 0.
pub fn post_one_sided_batch_single_type_vectored(
    &mut self,
    is_read: bool,
    wrids_imms: &[(u64, Option<u32>)],
    locals: &[Vec<LocalMemorySlice>],
    remotes: &[RemoteMemorySlice],
) -> Option<(usize, io::Error)> {
    if wrids_imms.len() != remotes.len() || locals.len() != remotes.len() {
        return Some((0, io::Error::from(io::ErrorKind::InvalidInput)));
    }

    let num_wrs = std::cmp::min(wrids_imms.len(), DOORBELL_BATCH_LIMIT);
    // Guard the empty batch: without this we would hand a default-initialized
    // work request to the device, which is never what the caller intended.
    if num_wrs == 0 {
        return None;
    }
    let mut wrs = [ffi::ibv_send_wr::default(); DOORBELL_BATCH_LIMIT];
    let base_wr_ptr = wrs.as_mut_ptr();
    for ((((idx, wr), (wr_id, imm_data)), local), remote) in wrs
        .iter_mut()
        .enumerate()
        .zip(wrids_imms)
        .zip(locals)
        .zip(remotes)
        .take(num_wrs)
    {
        let (opcode, anon_1) = if is_read {
            (ffi::ibv_wr_opcode::IBV_WR_RDMA_READ, Default::default())
        } else if let Some(imm_data) = imm_data {
            (
                ffi::ibv_wr_opcode::IBV_WR_RDMA_WRITE_WITH_IMM,
                // Immediate data travels on the wire in network byte order.
                ffi::ibv_send_wr__bindgen_ty_1 {
                    imm_data: imm_data.to_be(),
                },
            )
        } else {
            (ffi::ibv_wr_opcode::IBV_WR_RDMA_WRITE, Default::default())
        };

        *wr = ffi::ibv_send_wr {
            wr_id: *wr_id,
            // Chain the WRs so a single doorbell covers the whole batch; the
            // last entry terminates the list with a null `next`.
            next: if idx < num_wrs - 1 {
                unsafe { base_wr_ptr.add(idx + 1) }
            } else {
                ptr::null_mut()
            },
            // `LocalMemorySlice` wraps a single `ibv_sge`, so a contiguous Vec
            // of slices is a valid SGE array for the driver.
            sg_list: local.as_ptr() as *mut ffi::ibv_sge,
            num_sge: local.len() as i32,
            opcode,
            send_flags: ffi::ibv_send_flags::IBV_SEND_SIGNALED.0,
            wr: ffi::ibv_send_wr__bindgen_ty_2 {
                rdma: ffi::ibv_send_wr__bindgen_ty_2__bindgen_ty_1 {
                    remote_addr: remote.addr,
                    rkey: remote.rkey,
                },
            },
            qp_type: Default::default(),
            __bindgen_anon_1: anon_1,
            __bindgen_anon_2: Default::default(),
        };
    }
    let mut bad_wr: *mut ffi::ibv_send_wr = ptr::null_mut();

    let ctx = unsafe { *self.qp }.context;
    let ops = &mut unsafe { *ctx }.ops;
    // SAFETY: `wrs[..num_wrs]` is a valid, null-terminated WR chain that
    // outlives this call; `self.qp` is the queue pair this handle owns.
    let errno =
        unsafe { ops.post_send.as_mut().unwrap()(self.qp, base_wr_ptr, &mut bad_wr as *mut _) };
    if errno != 0 {
        // On failure the driver sets `bad_wr` to the first rejected WR in our
        // array; everything before it was posted successfully.
        let bad_idx = unsafe { bad_wr.offset_from(wrs.as_ptr()) } as usize;
        Some((bad_idx, io::Error::from_raw_os_error(errno)))
    } else {
        None
    }
}
}

impl Drop for QueuePair {
Expand Down