diff --git a/Cargo.lock b/Cargo.lock index a09bcefc..790c29bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,6 +124,7 @@ version = "0.3.2+55.0" dependencies = [ "bindgen", "cmake", + "libloading", "proc-macro2", ] diff --git a/ibverbs-sys/Cargo.toml b/ibverbs-sys/Cargo.toml index 8bb76423..84478c5a 100644 --- a/ibverbs-sys/Cargo.toml +++ b/ibverbs-sys/Cargo.toml @@ -23,6 +23,12 @@ license = "MIT OR Apache-2.0" exclude = ["vendor/rdma-core/build/"] +[features] +efa = [] + +[dependencies] +libloading = "0.8" + [build-dependencies] bindgen = "0.71.1" cmake = "0.1.50" diff --git a/ibverbs-sys/build.rs b/ibverbs-sys/build.rs index 96bcac29..8cd9c46e 100644 --- a/ibverbs-sys/build.rs +++ b/ibverbs-sys/build.rs @@ -50,11 +50,29 @@ fn main() { eprintln!("run bindgen"); let bindings = bindgen::Builder::default() .header("vendor/rdma-core/libibverbs/verbs.h") + .header("vendor/rdma-core/providers/efa/efadv.h") .clang_arg(format!("-I{built_in}/include/")) .allowlist_function("ibv_.*") .allowlist_function("_ibv_.*") + .allowlist_function("efadv_.*") + .allowlist_function("ibv_cq_ex_to_cq") + .allowlist_function("ibv_start_poll") + .allowlist_type("ibv_cq_ex") + .allowlist_type("ibv_poll_cq_attr") + .allowlist_function("ibv_next_poll") + .allowlist_function("ibv_end_poll") + .allowlist_function("ibv_wc_read_opcode") + .allowlist_function("ibv_wc_read_vendor_err") + .allowlist_function("ibv_wc_read_byte_len") + .allowlist_function("ibv_wc_read_imm_data") + .allowlist_function("ibv_wr_rdma_write_imm") + .allowlist_function("ibv_wr_rdma_write") + .allowlist_function("ibv_wr_send") + .allowlist_function("ibv_wr_rdma_read") .allowlist_type("ibv_.*") + .allowlist_type("efa.*") .allowlist_var("IBV_LINK_LAYER_.*") + .allowlist_var("EFADV_QP_DRIVER_TYPE_.*") .bitfield_enum("ibv_access_flags") .bitfield_enum("ibv_create_cq_wc_flags") .bitfield_enum("ibv_device_cap_flags") @@ -64,6 +82,7 @@ fn main() { .bitfield_enum("ibv_qp_attr_mask") .bitfield_enum("ibv_qp_create_send_ops_flags") 
.bitfield_enum("ibv_qp_init_attr_mask") + .bitfield_enum("ibv_qp_init_attr_ex_mask") .bitfield_enum("ibv_qp_open_attr_mask") .bitfield_enum("ibv_raw_packet_caps") .bitfield_enum("ibv_rx_hash_fields") diff --git a/ibverbs-sys/src/lib.rs b/ibverbs-sys/src/lib.rs index 0847b08c..49610d8c 100644 --- a/ibverbs-sys/src/lib.rs +++ b/ibverbs-sys/src/lib.rs @@ -5,8 +5,58 @@ // Suppress expected warnings from bindgen-generated code. // See https://github.com/rust-lang/rust-bindgen/issues/1651. #![allow(deref_nullptr)] + include!(concat!(env!("OUT_DIR"), "/bindings.rs")); +// Dynamic EFA loading module +#[cfg(feature = "efa")] +mod efa_dynamic { + use libloading::{Library, Symbol}; + use std::sync::OnceLock; + + static EFA_LIBRARY: OnceLock> = OnceLock::new(); + + fn load_efa_library() -> Result<&'static Library, &'static libloading::Error> { + EFA_LIBRARY.get_or_init(|| { + unsafe { Library::new("libefa.so") } + }).as_ref() + } + + // Function pointer types for EFA functions + type EfadvCreateQpExFn = unsafe extern "C" fn( + ibv_ctx: *mut crate::ibv_context, + attr: *mut crate::ibv_qp_init_attr_ex, + efa_attr: *mut crate::efadv_qp_init_attr, + efa_attr_size: u32, + ) -> *mut crate::ibv_qp; + + // Lazy-loaded EFA functions + pub struct EfaFunctions { + pub efadv_create_qp_ex: Symbol<'static, EfadvCreateQpExFn>, + } + + impl EfaFunctions { + pub fn load() -> Result { + let lib = load_efa_library().map_err(|e| format!("Failed to load EFA library: {}", e))?; + + unsafe { + Ok(Self { + efadv_create_qp_ex: lib.get(b"efadv_create_qp_ex").map_err(|e| format!("Failed to load efadv_create_qp_ex: {}", e))?, + }) + } + } + } + + static EFA_FUNCTIONS: OnceLock> = OnceLock::new(); + + pub fn get_efa_functions() -> Result<&'static EfaFunctions, &'static String> { + EFA_FUNCTIONS.get_or_init(|| EfaFunctions::load()).as_ref() + } +} + +#[cfg(feature = "efa")] +pub use efa_dynamic::get_efa_functions; + /// An ibverb work completion. 
#[repr(C)] #[derive(Debug, Copy, Clone)] diff --git a/ibverbs/Cargo.toml b/ibverbs/Cargo.toml index e775276c..a82bc051 100644 --- a/ibverbs/Cargo.toml +++ b/ibverbs/Cargo.toml @@ -20,7 +20,7 @@ license = "MIT OR Apache-2.0" [dependencies] ffi = { path = "../ibverbs-sys", package = "ibverbs-sys", version = "0.3.0" } -nix = { version = "0.29.0", default-features = false, features = ["fs", "poll"] } +nix = { version = "0.29.0", default-features = false, features = ["fs", "poll", "mman", "feature"] } [dependencies.serde] version = "1.0.100" @@ -29,6 +29,7 @@ features = ["derive"] [features] default = ["serde"] +efa = ["ffi/efa"] [dev-dependencies] bincode = "1.3" diff --git a/ibverbs/examples/efa_read.rs b/ibverbs/examples/efa_read.rs new file mode 100644 index 00000000..71c740f9 --- /dev/null +++ b/ibverbs/examples/efa_read.rs @@ -0,0 +1,206 @@ +//! EFA Read Example +//! +//! Run with `cargo run --example efa_read --features efa` + +fn main() -> Result<(), Box> { + println!("Starting EFA one-sided read example with separate reader/writer tasks..."); + + // Channels to communicate the endpoints between reader and writer. + let (reader_tx, reader_rx) = std::sync::mpsc::channel(); + let (writer_tx, writer_rx) = std::sync::mpsc::channel(); + // Channels to communicate the remote MRs between reader and writer. + let (reader_remote_mr_tx, reader_remote_mr_rx) = std::sync::mpsc::channel(); + let (writer_remote_mr_tx, writer_remote_mr_rx) = std::sync::mpsc::channel(); + // Chan to communicate that the writer is ready with data. + let (writer_is_ready_tx, writer_is_ready_rx) = std::sync::mpsc::channel(); + // Chan to communicate that the reader has finished reading. 
+ let (reader_done_tx, reader_done_rx) = std::sync::mpsc::channel(); + + // Spawn reader and writer tasks + let reader_handle = std::thread::spawn(move || { + reader_task(reader_tx, writer_rx, reader_remote_mr_tx, writer_remote_mr_rx, writer_is_ready_rx, reader_done_tx) + }); + + let writer_handle = std::thread::spawn(move || { + writer_task(writer_tx, reader_rx, writer_remote_mr_tx, reader_remote_mr_rx, writer_is_ready_tx, reader_done_rx) + }); + + // Wait for both tasks to complete + let reader_result = reader_handle.join().map_err(|_| "Reader thread panicked")?; + let writer_result = writer_handle.join().map_err(|_| "Writer thread panicked")?; + + reader_result?; + writer_result?; + + println!("Both reader and writer tasks completed successfully!"); + Ok(()) +} + +fn reader_task( + endpoint_tx: std::sync::mpsc::Sender, + peer_rx: std::sync::mpsc::Receiver, + remote_mr_tx: std::sync::mpsc::Sender, + remote_mr_rx: std::sync::mpsc::Receiver, + writer_is_ready_rx: std::sync::mpsc::Receiver, + reader_done_tx: std::sync::mpsc::Sender, +) -> Result<(), Box> { + // Open RDMA device - assume EFA compatibility + let ctx = match ibverbs::devices() + .unwrap() + .iter() + .nth(1) { + Some(dev) => dev.open_with_efa().unwrap(), + None => { + eprintln!("No RDMA devices found!"); + return Ok(()); + } + }; + + // Create completion queue and protection domain + let cq = ctx.create_cq(16, 0).unwrap(); + let pd = ctx.alloc_pd().unwrap(); + + // Create EFA QP + let qp_builder_result = pd + .create_qp(&cq, &cq, ibverbs::ibv_qp_type::IBV_QPT_DRIVER) + .and_then(|mut builder| builder.set_gid_index(0).build()); + + let qp_builder = match qp_builder_result { + Ok(builder) => builder, + Err(e) => { + println!("EFA QP creation failed: {}. 
This is expected if EFA is not available.", e); + return Ok(()); + } + }; + + let my_endpoint = qp_builder.endpoint()?; + + // Allocate and register memory regions + let efa_access_flags = ibverbs::ibv_access_flags( + ibverbs::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE.0 | + ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE.0 | + ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_READ.0 + ); + + let local_mr = pd.allocate_with_permissions(4096, efa_access_flags).unwrap(); + + // Exchange endpoints and memory regions + endpoint_tx.send(my_endpoint).map_err(|_| "Failed to send endpoint")?; + let peer_endpoint = peer_rx.recv().map_err(|_| "Failed to receive peer endpoint")?; + let remote_mr = remote_mr_rx.recv().unwrap(); + remote_mr_tx.send(local_mr.remote()).unwrap(); + + // Move QP to RTR states and create remote AH + let mut qp = qp_builder.handshake(peer_endpoint).unwrap(); + + // Wait for writer to be ready with data + let writer_is_ready = writer_is_ready_rx.recv().unwrap(); + if !writer_is_ready { + return Err("Writer is not ready".into()); + } + + // Post EFA read from remote memory into local memory + qp.post_read_efa(&[local_mr.slice(..4096)], remote_mr, peer_endpoint, 0, None).unwrap(); + + // Wait for completion + let mut completions = [ibverbs::ibv_wc::default(); 16]; + loop { + let completed = cq.poll(&mut completions[..]).unwrap(); + if completed.is_empty() { + continue; + } + for wr in completed { + if wr.wr_id() == 0 { + if let Some((wc_code, vendor_err)) = wr.error() { + println!("EFA read failed: wc_code={:?}, vendor_err={:?}", wc_code, vendor_err); + return Err("EFA read failed".into()); + } + // Check the data that was read + let data = local_mr.inner(); + let read_number = u64::from_le_bytes(data[..8].try_into().unwrap()); + println!("Read number: {}", read_number); + + // Notify writer that reading is complete + reader_done_tx.send(true).unwrap(); + println!("Reader notified writer of completion"); + + return Ok(()); + } + } + } +} + +fn writer_task( 
+ endpoint_tx: std::sync::mpsc::Sender, + peer_rx: std::sync::mpsc::Receiver, + remote_mr_tx: std::sync::mpsc::Sender, + remote_mr_rx: std::sync::mpsc::Receiver, + writer_is_ready_tx: std::sync::mpsc::Sender, + reader_done_rx: std::sync::mpsc::Receiver, +) -> Result<(), Box> { + // Open RDMA device - assume EFA compatibility + let ctx = match ibverbs::devices() + .unwrap() + .iter() + .nth(2) { + Some(dev) => dev.open_with_efa().unwrap(), + None => { + eprintln!("No RDMA devices found!"); + return Ok(()); + } + }; + + // Create completion queue and protection domain + let cq = ctx.create_cq(16, 0).unwrap(); + let pd = ctx.alloc_pd().unwrap(); + + // Create EFA QP + let qp_builder_result = pd + .create_qp(&cq, &cq, ibverbs::ibv_qp_type::IBV_QPT_DRIVER) + .and_then(|mut builder| builder.set_gid_index(0).build()); + + let qp_builder = match qp_builder_result { + Ok(builder) => builder, + Err(e) => { + println!("EFA QP creation failed: {}. This is expected if EFA is not available.", e); + return Ok(()); + } + }; + + let my_endpoint = qp_builder.endpoint()?; + + // Allocate and register memory regions + let efa_access_flags = ibverbs::ibv_access_flags( + ibverbs::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE.0 | + ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE.0 | + ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_READ.0 + ); + + let mut local_mr = pd.allocate_with_permissions(4096, efa_access_flags).unwrap(); + + // Write a simple number to the local memory region + let test_number: u64 = 42; + local_mr.inner_mut()[..std::mem::size_of::()].copy_from_slice(&test_number.to_le_bytes()); + println!("Writer prepared test number: {}", test_number); + + // Exchange endpoints and memory regions + endpoint_tx.send(my_endpoint).map_err(|_| "Failed to send endpoint")?; + let peer_endpoint = peer_rx.recv().map_err(|_| "Failed to receive peer endpoint")?; + remote_mr_tx.send(local_mr.remote()).unwrap(); + let _remote_mr = remote_mr_rx.recv().unwrap(); + + // Move QP to RTR states and 
create remote AH + let _qp = qp_builder.handshake(peer_endpoint).unwrap(); + + // Signal ready to reader + writer_is_ready_tx.send(true).unwrap(); + + // Writer waits for reader to complete reading + println!("Writer is ready and waiting for reader to complete..."); + let reader_done = reader_done_rx.recv().unwrap(); + if reader_done { + println!("Writer received completion notification from reader"); + } + + Ok(()) +} diff --git a/ibverbs/examples/efa_write.rs b/ibverbs/examples/efa_write.rs new file mode 100644 index 00000000..a94f2f4b --- /dev/null +++ b/ibverbs/examples/efa_write.rs @@ -0,0 +1,211 @@ +//! EFA Write Example +//! +//! Key Notes: +//! - "handshakes" in SRD is not exactly a handshake. We really just need to create the AHs on +//! both sides and then we can use the AHs to send and receive data. This is because SRD +//! is a connectionless protocol. It does not gurantee that both sides are ready. +//! - We use the QP_EX API to create the QP. This is primarily needed so that we can create an SRD QP. +//! - All MRs must be registered before the QP is handshaked... +//! - All post_receives on receiver side must be posted before the writer sends. +//! Failure to do so will result in a `IBV_WC_RNR_RETRY_EXC_ERR` error in WCE +//! - Writes are non ordered. You must not depend on the "last" write with imm data to be received as a +//! signal of last write completion. +//! +//! Run with `cargo run --example efa_write --features efa` + +fn main() -> Result<(), Box> { + println!("Starting EFA one-sided write example with separate sender/receiver tasks..."); + + // Channels to communicate the endpoints between sender and receiver. + let (sender_tx, sender_rx) = std::sync::mpsc::channel(); + let (receiver_tx, receiver_rx) = std::sync::mpsc::channel(); + // Channels to communicate the remote MRs between sender and receiver. 
+ let (sender_remote_mr_tx, sender_remote_mr_rx) = std::sync::mpsc::channel(); + let (receiver_remote_mr_tx, receiver_remote_mr_rx) = std::sync::mpsc::channel(); + // Chan to communicate that the receiver is ready to receive data. + let (receiver_is_ready_tx, receiver_is_ready_rx) = std::sync::mpsc::channel(); + + // Spawn sender and receiver tasks + let sender_handle = std::thread::spawn(move || { + sender_task(sender_tx, receiver_rx, sender_remote_mr_tx, receiver_remote_mr_rx, receiver_is_ready_rx) + }); + + let receiver_handle = std::thread::spawn(move || { + receiver_task(receiver_tx, sender_rx, receiver_remote_mr_tx, sender_remote_mr_rx, receiver_is_ready_tx) + }); + + // Wait for both tasks to complete + let sender_result = sender_handle.join().map_err(|_| "Sender thread panicked")?; + let receiver_result = receiver_handle.join().map_err(|_| "Receiver thread panicked")?; + + sender_result?; + receiver_result?; + + println!("Both sender and receiver tasks completed successfully!"); + Ok(()) +} + +fn sender_task( + endpoint_tx: std::sync::mpsc::Sender, + peer_rx: std::sync::mpsc::Receiver, + remote_mr_tx: std::sync::mpsc::Sender, + remote_mr_rx: std::sync::mpsc::Receiver, + receiver_is_ready_rx: std::sync::mpsc::Receiver, +) -> Result<(), Box> { + // Open RDMA device - assume EFA compatibility + let ctx = match ibverbs::devices() + .unwrap() + .iter() + .nth(1) { + Some(dev) => dev.open_with_efa().unwrap(), + None => { + eprintln!("No RDMA devices found!"); + return Ok(()); + } + }; + + // Create completion queue and protection domain + let cq = ctx.create_cq(16, 0).unwrap(); + let pd = ctx.alloc_pd().unwrap(); + + // Create EFA QP + let qp_builder_result = pd + .create_qp(&cq, &cq, ibverbs::ibv_qp_type::IBV_QPT_DRIVER) + .and_then(|mut builder| builder.set_gid_index(0).build()); + + let qp_builder = match qp_builder_result { + Ok(builder) => builder, + Err(e) => { + println!("EFA QP creation failed: {}. 
This is expected if EFA is not available.", e); + return Ok(()); + } + }; + + let my_endpoint = qp_builder.endpoint()?; + + // Allocate and register memory regions + let efa_access_flags = ibverbs::ibv_access_flags( + ibverbs::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE.0 | + ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE.0 | + ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_READ.0 + ); + + let local_mr = pd.allocate_with_permissions(4096, efa_access_flags).unwrap(); + + // Exchange endpoints and memory regions + endpoint_tx.send(my_endpoint).map_err(|_| "Failed to send endpoint")?; + let peer_endpoint = peer_rx.recv().map_err(|_| "Failed to receive peer endpoint")?; + let remote_mr = remote_mr_rx.recv().unwrap(); + remote_mr_tx.send(local_mr.remote()).unwrap(); + + // Move QP to RTR states and create remote AH + let mut qp = qp_builder.handshake(peer_endpoint).unwrap(); + + // Wait for receiver to be ready + let receiver_is_ready = receiver_is_ready_rx.recv().unwrap(); + if !receiver_is_ready { + return Err("Receiver is not ready".into()); + } + + // Post EFA write with immediate data + qp.post_write(&[local_mr.slice(..4096)], remote_mr, 0, Some(0x67)).unwrap(); + + // Wait for completion + let mut completions = [ibverbs::ibv_wc::default(); 16]; + loop { + let completed = cq.poll(&mut completions[..]).unwrap(); + if completed.is_empty() { + continue; + } + for wr in completed { + if wr.wr_id() == 0 { + if let Some((wc_code, vendor_err)) = wr.error() { + println!("EFA write failed: wc_code={:?}, vendor_err={:?}", wc_code, vendor_err); + return Err("EFA write failed".into()); + } + return Ok(()); + } + } + } +} + +fn receiver_task( + endpoint_tx: std::sync::mpsc::Sender, + peer_rx: std::sync::mpsc::Receiver, + remote_mr_tx: std::sync::mpsc::Sender, + remote_mr_rx: std::sync::mpsc::Receiver, + receiver_is_ready_tx: std::sync::mpsc::Sender, +) -> Result<(), Box> { + // Open RDMA device - assume EFA compatibility + let ctx = match ibverbs::devices() + .unwrap() + .iter() 
+ .nth(2) { + Some(dev) => dev.open_with_efa().unwrap(), + None => { + eprintln!("No RDMA devices found!"); + return Ok(()); + } + }; + + // Create completion queue and protection domain + let cq = ctx.create_cq(16, 0).unwrap(); + let pd = ctx.alloc_pd().unwrap(); + + // Create EFA QP + let qp_builder_result = pd + .create_qp(&cq, &cq, ibverbs::ibv_qp_type::IBV_QPT_DRIVER) + .and_then(|mut builder| builder.set_gid_index(0).build()); + + let qp_builder = match qp_builder_result { + Ok(builder) => builder, + Err(e) => { + println!("EFA QP creation failed: {}. This is expected if EFA is not available.", e); + return Ok(()); + } + }; + + let my_endpoint = qp_builder.endpoint()?; + + // Allocate and register memory regions + let efa_access_flags = ibverbs::ibv_access_flags( + ibverbs::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE.0 | + ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE.0 | + ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_READ.0 + ); + + let local_mr = pd.allocate_with_permissions(4096, efa_access_flags).unwrap(); + + // Exchange endpoints and memory regions + endpoint_tx.send(my_endpoint).map_err(|_| "Failed to send endpoint")?; + let peer_endpoint = peer_rx.recv().map_err(|_| "Failed to receive peer endpoint")?; + remote_mr_tx.send(local_mr.remote()).unwrap(); + let _remote_mr = remote_mr_rx.recv().unwrap(); + + // Move QP to RTR states and create remote AH + let mut qp = qp_builder.handshake(peer_endpoint).unwrap(); + + // Post receive for the write operation + let dummy_buffer = pd.allocate_with_permissions(4096, efa_access_flags).unwrap(); + unsafe { qp.post_receive(&[dummy_buffer.slice(..)], 1) }.unwrap(); + + // Signal ready to sender + receiver_is_ready_tx.send(true).unwrap(); + + // Wait for completion + let mut completions = [ibverbs::ibv_wc::default(); 16]; + loop { + let completed = cq.poll(&mut completions[..]).unwrap(); + if completed.is_empty() { + continue; + } + for wr in completed { + if wr.wr_id() == 1 { + let imm_data = wr.imm_data(); + let 
imm_data_host_order = u32::from_be(imm_data.unwrap()); + println!("Received immediate data: {} (0x{:08x})", imm_data_host_order, imm_data_host_order); + return Ok(()); + } + } + } +} diff --git a/ibverbs/src/lib.rs b/ibverbs/src/lib.rs index 09a9b196..7a84ea2e 100644 --- a/ibverbs/src/lib.rs +++ b/ibverbs/src/lib.rs @@ -76,6 +76,7 @@ use std::sync::Arc; use std::time::Duration; const PORT_NUM: u8 = 1; +const UD_QKEY: u32 = 0x12345678; /// Direct access to low-level libverbs FFI. pub use ffi::ibv_gid_type; @@ -100,6 +101,13 @@ pub const DEFAULT_ACCESS_FLAGS: ffi::ibv_access_flags = ffi::ibv_access_flags( | ffi::ibv_access_flags::IBV_ACCESS_RELAXED_ORDERING.0, ); +/// Default access flags compatible with EFA. +pub const EFA_DEFAULT_ACCESS_FLAGS: ffi::ibv_access_flags = ffi::ibv_access_flags( + ffi::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE.0 + | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE.0 + | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_READ.0 +); + /// Get list of available RDMA devices. /// /// # Errors @@ -256,7 +264,16 @@ impl<'devlist> Device<'devlist> { /// - `EMFILE`: Too many files are opened by this process (from `ibv_query_gid`). /// - Other: the device is not in `ACTIVE` or `ARMED` state. pub fn open(&self) -> io::Result { - Context::with_device(*self.0) + Context::with_device(*self.0, false) + } + + /// Opens an RDMA device and creates a context for further use with EFA support enabled. + /// + /// # Errors + /// + /// Same as `open()`. + pub fn open_with_efa(&self) -> io::Result { + Context::with_device(*self.0, true) } /// Returns a string of the name, which is associated with this RDMA device. @@ -331,6 +348,7 @@ impl<'devlist> Device<'devlist> { struct ContextInner { ctx: *mut ffi::ibv_context, + enable_efa: bool, } impl ContextInner { @@ -388,14 +406,14 @@ pub struct Context { impl Context { /// Opens a context for the given device, and queries its port and gid. 
- fn with_device(dev: *mut ffi::ibv_device) -> io::Result { + fn with_device(dev: *mut ffi::ibv_device, enable_efa: bool) -> io::Result { assert!(!dev.is_null()); let ctx = unsafe { ffi::ibv_open_device(dev) }; if ctx.is_null() { return Err(io::Error::other("failed to open device")); } - let inner = Arc::new(ContextInner { ctx }); + let inner = Arc::new(ContextInner { ctx, enable_efa }); let ctx = Context { inner }; // checks that the port is active/armed. @@ -403,6 +421,11 @@ impl Context { Ok(ctx) } + /// Returns true if EFA is enabled for this context. + pub fn is_efa_enabled(&self) -> bool { + self.inner.enable_efa + } + /// Create a completion queue (CQ). /// /// When an outstanding Work Request, within a Send or Receive Queue, is completed, a Work @@ -517,13 +540,13 @@ impl Drop for CompletionQueueInner { let errno = unsafe { ffi::ibv_destroy_cq(self.cq) }; if errno != 0 { let e = io::Error::from_raw_os_error(errno); - panic!("{e}"); + panic!("Failed to destroy CQ: {e}"); } let errno = unsafe { ffi::ibv_destroy_comp_channel(self.cc) }; if errno != 0 { let e = io::Error::from_raw_os_error(errno); - panic!("{e}"); + panic!("Failed to destroy completion channel: {e}"); } } } @@ -782,7 +805,8 @@ impl QueuePairBuilder { qp_type, access: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC - || qp_type == ffi::ibv_qp_type::IBV_QPT_UC) + || qp_type == ffi::ibv_qp_type::IBV_QPT_UC + || qp_type == ffi::ibv_qp_type::IBV_QPT_DRIVER) .then_some(ffi::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE), min_rnr_timer: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(16), retry_count: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(6), @@ -802,12 +826,13 @@ impl QueuePairBuilder { /// Set the access flags for the new `QueuePair`. /// - /// Valid only for RC and UC QPs. + /// Valid only for RC, UC, and DRIVER QPs. /// /// Defaults to `IBV_ACCESS_LOCAL_WRITE`. 
pub fn set_access(&mut self, access: ffi::ibv_access_flags) -> &mut Self { if self.qp_type == ffi::ibv_qp_type::IBV_QPT_RC || self.qp_type == ffi::ibv_qp_type::IBV_QPT_UC + || self.qp_type == ffi::ibv_qp_type::IBV_QPT_DRIVER { self.access = Some(access); } @@ -1106,6 +1131,9 @@ impl QueuePairBuilder { /// This method will fail if asked to create QP of a type other than `IBV_QPT_RC` or /// `IBV_QPT_UD` associated with an SRQ. /// + /// If `enable_efa` is set to `true`, this will use EFA-specific QP creation. Otherwise, + /// it uses the standard ibverbs API. + /// /// # Errors /// /// - `EINVAL`: Invalid `ProtectionDomain`, sending or receiving `Context`, or invalid value @@ -1114,23 +1142,77 @@ impl QueuePairBuilder { /// - `ENOSYS`: QP with this Transport Service Type isn't supported by this RDMA device. /// - `EPERM`: Not enough permissions to create a QP with this Transport Service Type. pub fn build(&self) -> io::Result { - let mut attr = ffi::ibv_qp_init_attr { - qp_context: unsafe { ptr::null::().offset(self.ctx) } as *mut _, - send_cq: self.send.cq as *const _ as *mut _, - recv_cq: self.recv.cq as *const _ as *mut _, - srq: ptr::null::() as *mut _, - cap: ffi::ibv_qp_cap { - max_send_wr: self.max_send_wr, - max_recv_wr: self.max_recv_wr, - max_send_sge: self.max_send_sge, - max_recv_sge: self.max_recv_sge, - max_inline_data: self.max_inline_data, - }, - qp_type: self.qp_type, - sq_sig_all: 0, - }; - let qp = unsafe { ffi::ibv_create_qp(self.pd.pd, &mut attr as *mut _) }; + let qp = if self.pd.ctx.enable_efa { + #[cfg(feature = "efa")] { + // EFA uses extended API to create an SRD QP. 
+ let send_ops_flags = ( + ffi::ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_RDMA_WRITE.0 | + ffi::ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM.0 | + ffi::ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_RDMA_READ.0 | + ffi::ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_SEND.0 | + ffi::ibv_qp_create_send_ops_flags::IBV_QP_EX_WITH_SEND_WITH_IMM.0 + ) as u32; + let mut attr = ffi::ibv_qp_init_attr_ex { + qp_context: self.pd.ctx.ctx as *mut _, + send_cq: self.send.cq as *const _ as *mut _, + recv_cq: self.recv.cq as *const _ as *mut _, + cap: ffi::ibv_qp_cap { + max_send_wr: self.max_send_wr, + max_recv_wr: self.max_recv_wr, + max_send_sge: self.max_send_sge, + max_recv_sge: self.max_recv_sge, + max_inline_data: self.max_inline_data, + }, + qp_type: self.qp_type, + sq_sig_all: 1, + comp_mask: ( + ffi::ibv_qp_init_attr_mask::IBV_QP_INIT_ATTR_PD | + ffi::ibv_qp_init_attr_mask::IBV_QP_INIT_ATTR_SEND_OPS_FLAGS + ).0, + pd: self.pd.pd, + send_ops_flags: send_ops_flags as u64, + ..Default::default() + }; + + let mut efa_attr = ffi::efadv_qp_init_attr { + driver_qp_type: ffi::EFADV_QP_DRIVER_TYPE_SRD as u32, + sl: 0 as u8, + ..Default::default() + }; + + let qp = match ffi::get_efa_functions() { + Ok(efa_funcs) => unsafe { + (efa_funcs.efadv_create_qp_ex)(self.pd.ctx.ctx, &mut attr, &mut efa_attr as *mut _, std::mem::size_of::() as u32) + }, + Err(e) => { + return Err(io::Error::other(format!("Failed to load EFA functions: {}", e))); + } + }; + qp + } + #[cfg(not(feature = "efa"))] { + panic!("EFA feature is not enabled but ctx.enable_efa is true"); + } + } else { + // otherwise, use standard ibverbs API to create a regular QP. 
+ let mut attr = ffi::ibv_qp_init_attr { + qp_context: unsafe { ptr::null::().offset(self.ctx) } as *mut _, + send_cq: self.send.cq as *const _ as *mut _, + recv_cq: self.recv.cq as *const _ as *mut _, + srq: ptr::null::() as *mut _, + cap: ffi::ibv_qp_cap { + max_send_wr: self.max_send_wr, + max_recv_wr: self.max_recv_wr, + max_send_sge: self.max_send_sge, + max_recv_sge: self.max_recv_sge, + max_inline_data: self.max_inline_data, + }, + qp_type: self.qp_type, + sq_sig_all: 0, + }; + unsafe { ffi::ibv_create_qp(self.pd.pd, &mut attr as *mut _) } + }; if qp.is_null() { Err(io::Error::last_os_error()) } else { @@ -1139,6 +1221,8 @@ impl QueuePairBuilder { qp: QueuePair { pd: self.pd.clone(), qp, + ah: ptr::null_mut(), + remote_endpoint: None, }, gid_index: self.gid_index, traffic_class: self.traffic_class, @@ -1232,7 +1316,7 @@ pub struct PreparedQueuePair { /// endianness. #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Default, Copy, Clone, Debug, Eq, PartialEq, Hash)] -#[repr(transparent)] +#[repr(align(8))] // Ensure 8-byte alignment pub struct Gid { raw: [u8; 16], } @@ -1367,7 +1451,7 @@ impl PreparedQueuePair { }) } - /// Set up the `QueuePair` such that it is ready to exchange packets with a remote `QueuePair`. + /// Set up the `QueuePair` such that it is ready to exchange packets with a remote `QueuePair` using standard ibverbs. /// /// Internally, this uses `ibv_modify_qp` to mark the `QueuePair` as initialized /// (`IBV_QPS_INIT`), ready to receive (`IBV_QPS_RTR`), and ready to send (`IBV_QPS_RTS`). @@ -1391,7 +1475,12 @@ impl PreparedQueuePair { /// ah_attr.sl = 0; /// ah_attr.src_path_bits = 0; /// ``` - /// + /// Note(EFA): EFA is a connectionless protocol, the handshake is simply modifying the QP + /// state from INIT -> RTS. A successful handshake does NOT gurantee that the remote endpoint + /// has successfully completed its own handshake. 
As such, the sender usually needs to retry + /// the first write request, OR have some other mechanism with which to gurantee that the + /// receiver has completed its handshake first. + /// /// # Errors /// /// - `EINVAL`: Invalid value provided in `attr` or in `attr_mask`. @@ -1400,6 +1489,8 @@ impl PreparedQueuePair { /// [RDMAmojo]: http://www.rdmamojo.com/2014/01/18/connecting-queue-pairs/ pub fn handshake(self, remote: QueuePairEndpoint) -> io::Result { // init and associate with port + let mut qp = self.qp; + let qp_type = unsafe { (*qp.qp).qp_type }; let mut attr = ffi::ibv_qp_attr { qp_state: ffi::ibv_qp_state::IBV_QPS_INIT, pkey_index: 0, @@ -1409,44 +1500,52 @@ impl PreparedQueuePair { let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE | ffi::ibv_qp_attr_mask::IBV_QP_PKEY_INDEX | ffi::ibv_qp_attr_mask::IBV_QP_PORT; - if let Some(access) = self.access { - attr.qp_access_flags = access.0; - mask |= ffi::ibv_qp_attr_mask::IBV_QP_ACCESS_FLAGS; + + if qp.pd.ctx.enable_efa { + attr.qkey = UD_QKEY; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_QKEY; + attr.port_num = PORT_NUM; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_PORT; + } else { + if let Some(access) = self.access { + attr.qp_access_flags = access.0; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_ACCESS_FLAGS; + } } - let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) }; + + let errno = unsafe { ffi::ibv_modify_qp(qp.qp, &mut attr as *mut _, mask.0 as i32) }; if errno != 0 { return Err(io::Error::from_raw_os_error(errno)); } // set ready to receive + let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE; let mut attr = ffi::ibv_qp_attr { qp_state: ffi::ibv_qp_state::IBV_QPS_RTR, - // TODO: this is only valid for RC and UC - dest_qp_num: remote.num, - // TODO: this is only valid for RC and UC - ah_attr: ffi::ibv_ah_attr { - dlid: remote.lid, - sl: self.service_level, - src_path_bits: 0, - port_num: PORT_NUM, - grh: Default::default(), - ..Default::default() - }, ..Default::default() }; - 
if let Some(gid) = remote.gid { - attr.ah_attr.is_global = 1; - attr.ah_attr.grh.dgid = gid.into(); - attr.ah_attr.grh.hop_limit = 0xff; - attr.ah_attr.grh.sgid_index = self - .gid_index - .ok_or_else(|| io::Error::other("gid was set for remote but not local"))? - as u8; - attr.ah_attr.grh.traffic_class = self.traffic_class; + + if qp_type == ffi::ibv_qp_type::IBV_QPT_RC || qp_type == ffi::ibv_qp_type::IBV_QPT_UC { + attr.dest_qp_num = remote.num; + attr.ah_attr.dlid = remote.lid; + attr.ah_attr.sl = self.service_level; + attr.ah_attr.src_path_bits = 0; + attr.ah_attr.port_num = PORT_NUM; + attr.ah_attr.grh = Default::default(); + if let Some(gid) = remote.gid { + attr.ah_attr.is_global = 1; + attr.ah_attr.grh.dgid = gid.into(); + attr.ah_attr.grh.hop_limit = 0xff; + attr.ah_attr.grh.sgid_index = self + .gid_index + .ok_or_else(|| io::Error::other("gid was set for remote but not local"))? + as u8; + attr.ah_attr.grh.traffic_class = self.traffic_class; + } + mask |= ffi::ibv_qp_attr_mask::IBV_QP_DEST_QPN; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_AV; } - let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE - | ffi::ibv_qp_attr_mask::IBV_QP_AV - | ffi::ibv_qp_attr_mask::IBV_QP_DEST_QPN; + if let Some(max_dest_rd_atomic) = self.max_dest_rd_atomic { attr.max_dest_rd_atomic = max_dest_rd_atomic; mask |= ffi::ibv_qp_attr_mask::IBV_QP_MAX_DEST_RD_ATOMIC; @@ -1463,7 +1562,7 @@ impl PreparedQueuePair { attr.rq_psn = rq_psn; mask |= ffi::ibv_qp_attr_mask::IBV_QP_RQ_PSN; } - let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) }; + let errno = unsafe { ffi::ibv_modify_qp(qp.qp, &mut attr as *mut _, mask.0 as i32) }; if errno != 0 { return Err(io::Error::from_raw_os_error(errno)); } @@ -1491,12 +1590,34 @@ impl PreparedQueuePair { attr.max_rd_atomic = max_rd_atomic; mask |= ffi::ibv_qp_attr_mask::IBV_QP_MAX_QP_RD_ATOMIC; } - let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) }; + let errno = unsafe { 
ffi::ibv_modify_qp(qp.qp, &mut attr as *mut _, mask.0 as i32) }; if errno != 0 { return Err(io::Error::from_raw_os_error(errno)); } - Ok(self.qp) + if qp.pd.ctx.enable_efa { + // EFA QP requires an AH to be created for the remote endpoint. + let mut ah_attr = ffi::ibv_ah_attr { + dlid: remote.lid, + sl: 0, + src_path_bits: 0, + port_num: PORT_NUM, + is_global: 1, + static_rate: 0, + grh: ffi::ibv_global_route { + dgid: remote.gid.unwrap().into(), + ..Default::default() + }, + ..Default::default() + }; + qp.ah = unsafe { ffi::ibv_create_ah(qp.pd.pd, &mut ah_attr as *mut _) }; + if qp.ah.is_null() { + return Err(io::Error::last_os_error()); + } + qp.remote_endpoint = Some(remote); + } + + Ok(qp) } } @@ -1513,7 +1634,7 @@ impl Drop for MemoryRegionInner { let errno = unsafe { ffi::ibv_dereg_mr(self.mr) }; if errno != 0 { let e = io::Error::from_raw_os_error(errno); - panic!("{e}"); + panic!("Failed to deregister MR: {e}"); } } } @@ -1649,7 +1770,7 @@ impl Drop for ProtectionDomainInner { let errno = unsafe { ffi::ibv_dealloc_pd(self.pd) }; if errno != 0 { let e = io::Error::from_raw_os_error(errno); - panic!("{e}"); + panic!("Failed to deallocate PD: {e}"); } } } @@ -1771,7 +1892,11 @@ impl ProtectionDomain { /// - `ENOMEM`: Not enough resources (either in operating system or in RDMA device) to /// complete this operation. 
pub fn allocate(&self, n: usize) -> io::Result>> { - let access_flags = DEFAULT_ACCESS_FLAGS; + let access_flags = if self.inner.ctx.enable_efa { + EFA_DEFAULT_ACCESS_FLAGS + } else { + DEFAULT_ACCESS_FLAGS + }; self.allocate_with_permissions(n, access_flags) } @@ -1808,7 +1933,12 @@ impl ProtectionDomain { &self, data: T, ) -> io::Result> { - self.register_with_permissions(data, DEFAULT_ACCESS_FLAGS) + let access_flags = if self.inner.ctx.enable_efa { + EFA_DEFAULT_ACCESS_FLAGS + } else { + DEFAULT_ACCESS_FLAGS + }; + self.register_with_permissions(data, access_flags) } /// Registers an already allocated DMA-BUF memory region (MR) associated with this `ProtectionDomain`. @@ -1856,6 +1986,8 @@ impl ProtectionDomain { pub struct QueuePair { pd: Arc, qp: *mut ffi::ibv_qp, + ah: *mut ffi::ibv_ah, + remote_endpoint: Option, } unsafe impl Send for QueuePair {} @@ -1894,6 +2026,45 @@ impl QueuePair { /// [1]: http://www.rdmamojo.com/2013/01/26/ibv_post_send/ #[inline] pub unsafe fn post_send(&mut self, local: &[LocalMemorySlice], wr_id: u64) -> io::Result<()> { + if self.pd.ctx.enable_efa { + self.post_send_efa(local, wr_id) + } else { + self.post_send_regular(local, wr_id) + } + } + + #[inline] + /// Remote RDMA send using EFA. 
+ fn post_send_efa(&mut self, local: &[LocalMemorySlice], wr_id: u64) -> io::Result<()> { + let qp_ex_ptr = unsafe { ffi::ibv_qp_to_qp_ex(self.qp) }; + if qp_ex_ptr.is_null() { + return Err(io::Error::last_os_error()); + } + + let qp_ex = unsafe { &mut *qp_ex_ptr }; + + let errno = unsafe { + qp_ex.wr_start.unwrap()(qp_ex); + qp_ex.wr_id = wr_id; + qp_ex.comp_mask = 0; + qp_ex.wr_flags = ffi::ibv_send_flags::IBV_SEND_SIGNALED.0; + qp_ex.wr_send.unwrap()(qp_ex); + qp_ex.wr_set_sge_list.unwrap()(qp_ex, local.len(), local.as_ptr() as *mut ffi::ibv_sge); + qp_ex.wr_set_ud_addr.unwrap()(qp_ex, self.ah, self.remote_endpoint.unwrap().num, UD_QKEY); + qp_ex.wr_complete.unwrap()(qp_ex) + }; + if errno != 0 { + Err(io::Error::from_raw_os_error(errno)) + } else { + Ok(()) + } + } + + #[inline] + /// Remote RDMA send using standard ibverbs. + /// Builds an ibv_send_wr for the given local slices and posts it; + /// used when the queue pair's context does not have EFA enabled. + fn post_send_regular(&mut self, local: &[LocalMemorySlice], wr_id: u64) -> io::Result<()> { let mut wr = ffi::ibv_send_wr { wr_id, next: ptr::null::() as *mut _, @@ -2005,12 +2176,33 @@ impl QueuePair { /// Remote RDMA write. /// immediate data can be used to signal the completion of the write operation /// the other side uses post_recv on a dummy buffer and get the imm data from the work completion + /// + /// If this QueuePair was created with EFA support, this will use EFA-specific write operations. + /// Otherwise, it uses standard ibverbs RDMA write operations. pub fn post_write( &mut self, local: &[LocalMemorySlice], remote: RemoteMemorySlice, wr_id: u64, imm_data: Option, + ) -> io::Result<()> { + if self.pd.ctx.enable_efa { + self.post_write_efa(local, remote, wr_id, imm_data) + } else { + self.post_write_regular(local, remote, wr_id, imm_data) + } + } + + #[inline] + /// Remote RDMA write using standard ibverbs.
+ /// immediate data can be used to signal the completion of the write operation + /// the other side uses post_recv on a dummy buffer and get the imm data from the work completion + fn post_write_regular( + &mut self, + local: &[LocalMemorySlice], + remote: RemoteMemorySlice, + wr_id: u64, + imm_data: Option, ) -> io::Result<()> { let opcode = if imm_data.is_some() { ffi::ibv_wr_opcode::IBV_WR_RDMA_WRITE_WITH_IMM @@ -2021,6 +2213,96 @@ impl QueuePair { self._post_one_sided(local, remote, wr_id, opcode, imm_data) } + #[inline] + /// Remote RDMA write with EFA. + /// immediate data can be used to signal the completion of the write operation + /// the other side uses post_recv on a dummy buffer and get the imm data from the work completion + pub fn post_write_efa( + &mut self, + local: &[LocalMemorySlice], + remote_mr: RemoteMemorySlice, + wr_id: u64, + imm_data: Option, + ) -> io::Result<()> { + let qp_ex_ptr = unsafe { ffi::ibv_qp_to_qp_ex(self.qp) }; + if qp_ex_ptr.is_null() { + return Err(io::Error::last_os_error()); + } + + let qp_ex = unsafe { &mut *qp_ex_ptr }; + + let errno = unsafe { + qp_ex.wr_start.unwrap()(qp_ex); + qp_ex.wr_id = wr_id; + qp_ex.comp_mask = 0; + qp_ex.wr_flags = ffi::ibv_send_flags::IBV_SEND_SIGNALED.0; + if imm_data.is_some() { + qp_ex.wr_rdma_write_imm.unwrap()(qp_ex, remote_mr.rkey, remote_mr.addr, imm_data.unwrap().to_be()); + } else { + qp_ex.wr_rdma_write.unwrap()(qp_ex, remote_mr.rkey, remote_mr.addr); + }; + qp_ex.wr_set_sge_list.unwrap()(qp_ex, local.len(), local.as_ptr() as *mut ffi::ibv_sge); + qp_ex.wr_set_ud_addr.unwrap()(qp_ex, self.ah, self.remote_endpoint.unwrap().num, UD_QKEY); + qp_ex.wr_complete.unwrap()(qp_ex) + }; + if errno != 0 { + Err(io::Error::from_raw_os_error(errno)) + } else { + Ok(()) + } + } + + #[inline] + /// Get raw QP pointer (for testing/debugging purposes only). + pub fn raw_qp_ptr(&self) -> *mut ffi::ibv_qp { + self.qp + } + + /// Remote RDMA read with EFA. 
+ /// Upgrades QP to QP_EX + /// Note: EFA may not support immediate data with reads + pub fn post_read_efa( + &mut self, + local: &[LocalMemorySlice], + remote: RemoteMemorySlice, + remote_endpoint: QueuePairEndpoint, + wr_id: u64, + _imm_data: Option, + ) -> io::Result<()> { + + let qp_ex_ptr = unsafe { ffi::ibv_qp_to_qp_ex(self.qp) }; + + if qp_ex_ptr.is_null() { + return Err(io::Error::last_os_error()); + } + + let qp_ex = unsafe { &mut *qp_ex_ptr }; + + let errno = unsafe { + qp_ex.wr_start.unwrap()(qp_ex); + qp_ex.wr_id = wr_id; + // set comp_mask to 0 + qp_ex.comp_mask = 0; + // set wr_flags = IBV_SEND_SIGNALED + qp_ex.wr_flags = ffi::ibv_send_flags::IBV_SEND_SIGNALED.0; + + // Set RDMA read parameters first (following write pattern) + qp_ex.wr_rdma_read.unwrap()(qp_ex, remote.rkey, remote.addr); + + // set sge_list with num_Sge and sg_list + qp_ex.wr_set_sge_list.unwrap()(qp_ex, local.len(), local.as_ptr() as *mut ffi::ibv_sge); + // set ud_addr as remote->ah, remote_qpn, qkey + qp_ex.wr_set_ud_addr.unwrap()(qp_ex, self.ah, remote_endpoint.num, UD_QKEY); + + qp_ex.wr_complete.unwrap()(qp_ex) + }; + if errno != 0 { + Err(io::Error::from_raw_os_error(errno)) + } else { + Ok(()) + } + } + #[inline] /// Remote RDMA read. /// RDMA read does not support immediate data. @@ -2085,11 +2367,20 @@ impl QueuePair { impl Drop for QueuePair { fn drop(&mut self) { + // Destroy the address handle if it exists + if !self.ah.is_null() { + let errno = unsafe { ffi::ibv_destroy_ah(self.ah) }; + if errno != 0 { + let e = io::Error::from_raw_os_error(errno); + panic!("Failed to destroy AH: {e}"); + } + } + // TODO: ibv_destroy_qp() fails if the QP is attached to a multicast group. let errno = unsafe { ffi::ibv_destroy_qp(self.qp) }; if errno != 0 { let e = io::Error::from_raw_os_error(errno); - panic!("{e}"); + panic!("Failed to destroy QP: {e}"); } } }