Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions ibverbs-sys/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ license = "MIT OR Apache-2.0"

exclude = ["vendor/rdma-core/build/"]

[features]
efa = []

[dependencies]
libloading = "0.8"

[build-dependencies]
bindgen = "0.71.1"
cmake = "0.1.50"
Expand Down
19 changes: 19 additions & 0 deletions ibverbs-sys/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,29 @@ fn main() {
eprintln!("run bindgen");
let bindings = bindgen::Builder::default()
.header("vendor/rdma-core/libibverbs/verbs.h")
.header("vendor/rdma-core/providers/efa/efadv.h")
Copy link
Copy Markdown
Collaborator

@xmakro xmakro Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe the bindings for the efa headers and efa specific methods should only be created if the efa feature is enabled

.clang_arg(format!("-I{built_in}/include/"))
.allowlist_function("ibv_.*")
.allowlist_function("_ibv_.*")
.allowlist_function("efadv_.*")
.allowlist_function("ibv_cq_ex_to_cq")
.allowlist_function("ibv_start_poll")
.allowlist_type("ibv_cq_ex")
.allowlist_type("ibv_poll_cq_attr")
.allowlist_function("ibv_next_poll")
.allowlist_function("ibv_end_poll")
.allowlist_function("ibv_wc_read_opcode")
.allowlist_function("ibv_wc_read_vendor_err")
.allowlist_function("ibv_wc_read_byte_len")
.allowlist_function("ibv_wc_read_imm_data")
.allowlist_function("ibv_wr_rdma_write_imm")
.allowlist_function("ibv_wr_rdma_write")
.allowlist_function("ibv_wr_send")
.allowlist_function("ibv_wr_rdma_read")
.allowlist_type("ibv_.*")
.allowlist_type("efa.*")
.allowlist_var("IBV_LINK_LAYER_.*")
.allowlist_var("EFADV_QP_DRIVER_TYPE_.*")
.bitfield_enum("ibv_access_flags")
.bitfield_enum("ibv_create_cq_wc_flags")
.bitfield_enum("ibv_device_cap_flags")
Expand All @@ -64,6 +82,7 @@ fn main() {
.bitfield_enum("ibv_qp_attr_mask")
.bitfield_enum("ibv_qp_create_send_ops_flags")
.bitfield_enum("ibv_qp_init_attr_mask")
.bitfield_enum("ibv_qp_init_attr_ex_mask")
.bitfield_enum("ibv_qp_open_attr_mask")
.bitfield_enum("ibv_raw_packet_caps")
.bitfield_enum("ibv_rx_hash_fields")
Expand Down
50 changes: 50 additions & 0 deletions ibverbs-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,58 @@
// Suppress expected warnings from bindgen-generated code.
// See https://github.com/rust-lang/rust-bindgen/issues/1651.
#![allow(deref_nullptr)]

include!(concat!(env!("OUT_DIR"), "/bindings.rs"));

// Dynamic EFA loading module
#[cfg(feature = "efa")]
mod efa_dynamic {
use libloading::{Library, Symbol};
use std::sync::OnceLock;

static EFA_LIBRARY: OnceLock<Result<Library, libloading::Error>> = OnceLock::new();

fn load_efa_library() -> Result<&'static Library, &'static libloading::Error> {
EFA_LIBRARY.get_or_init(|| {
unsafe { Library::new("libefa.so") }
}).as_ref()
}

// Function pointer types for EFA functions
type EfadvCreateQpExFn = unsafe extern "C" fn(
ibv_ctx: *mut crate::ibv_context,
attr: *mut crate::ibv_qp_init_attr_ex,
efa_attr: *mut crate::efadv_qp_init_attr,
efa_attr_size: u32,
) -> *mut crate::ibv_qp;

// Lazy-loaded EFA functions
pub struct EfaFunctions {
pub efadv_create_qp_ex: Symbol<'static, EfadvCreateQpExFn>,
}

impl EfaFunctions {
pub fn load() -> Result<Self, String> {
let lib = load_efa_library().map_err(|e| format!("Failed to load EFA library: {}", e))?;

unsafe {
Ok(Self {
efadv_create_qp_ex: lib.get(b"efadv_create_qp_ex").map_err(|e| format!("Failed to load efadv_create_qp_ex: {}", e))?,
})
}
}
}

static EFA_FUNCTIONS: OnceLock<Result<EfaFunctions, String>> = OnceLock::new();

pub fn get_efa_functions() -> Result<&'static EfaFunctions, &'static String> {
EFA_FUNCTIONS.get_or_init(|| EfaFunctions::load()).as_ref()
}
}

#[cfg(feature = "efa")]
pub use efa_dynamic::get_efa_functions;

/// An ibverb work completion.
#[repr(C)]
#[derive(Debug, Copy, Clone)]
Expand Down
3 changes: 2 additions & 1 deletion ibverbs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ license = "MIT OR Apache-2.0"

[dependencies]
ffi = { path = "../ibverbs-sys", package = "ibverbs-sys", version = "0.3.0" }
nix = { version = "0.29.0", default-features = false, features = ["fs", "poll"] }
nix = { version = "0.29.0", default-features = false, features = ["fs", "poll", "mman", "feature"] }

[dependencies.serde]
version = "1.0.100"
Expand All @@ -29,6 +29,7 @@ features = ["derive"]

[features]
default = ["serde"]
efa = ["ffi/efa"]

[dev-dependencies]
bincode = "1.3"
206 changes: 206 additions & 0 deletions ibverbs/examples/efa_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
//! EFA Read Example
//!
//! Run with `cargo run --example efa_read --features efa`

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("Starting EFA one-sided read example with separate reader/writer tasks...");

// Channels to communicate the endpoints between reader and writer.
let (reader_tx, reader_rx) = std::sync::mpsc::channel();
let (writer_tx, writer_rx) = std::sync::mpsc::channel();
// Channels to communicate the remote MRs between reader and writer.
let (reader_remote_mr_tx, reader_remote_mr_rx) = std::sync::mpsc::channel();
let (writer_remote_mr_tx, writer_remote_mr_rx) = std::sync::mpsc::channel();
// Chan to communicate that the writer is ready with data.
let (writer_is_ready_tx, writer_is_ready_rx) = std::sync::mpsc::channel();
// Chan to communicate that the reader has finished reading.
let (reader_done_tx, reader_done_rx) = std::sync::mpsc::channel();

// Spawn reader and writer tasks
let reader_handle = std::thread::spawn(move || {
reader_task(reader_tx, writer_rx, reader_remote_mr_tx, writer_remote_mr_rx, writer_is_ready_rx, reader_done_tx)
});

let writer_handle = std::thread::spawn(move || {
writer_task(writer_tx, reader_rx, writer_remote_mr_tx, reader_remote_mr_rx, writer_is_ready_tx, reader_done_rx)
});

// Wait for both tasks to complete
let reader_result = reader_handle.join().map_err(|_| "Reader thread panicked")?;
let writer_result = writer_handle.join().map_err(|_| "Writer thread panicked")?;

reader_result?;
writer_result?;

println!("Both reader and writer tasks completed successfully!");
Ok(())
}

fn reader_task(
endpoint_tx: std::sync::mpsc::Sender<ibverbs::QueuePairEndpoint>,
peer_rx: std::sync::mpsc::Receiver<ibverbs::QueuePairEndpoint>,
remote_mr_tx: std::sync::mpsc::Sender<ibverbs::RemoteMemorySlice>,
remote_mr_rx: std::sync::mpsc::Receiver<ibverbs::RemoteMemorySlice>,
writer_is_ready_rx: std::sync::mpsc::Receiver<bool>,
reader_done_tx: std::sync::mpsc::Sender<bool>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Open RDMA device - assume EFA compatibility
let ctx = match ibverbs::devices()
.unwrap()
.iter()
.nth(1) {
Some(dev) => dev.open_with_efa().unwrap(),
None => {
eprintln!("No RDMA devices found!");
return Ok(());
}
};

// Create completion queue and protection domain
let cq = ctx.create_cq(16, 0).unwrap();
let pd = ctx.alloc_pd().unwrap();

// Create EFA QP
let qp_builder_result = pd
.create_qp(&cq, &cq, ibverbs::ibv_qp_type::IBV_QPT_DRIVER)
.and_then(|mut builder| builder.set_gid_index(0).build());

let qp_builder = match qp_builder_result {
Ok(builder) => builder,
Err(e) => {
println!("EFA QP creation failed: {}. This is expected if EFA is not available.", e);
return Ok(());
}
};

let my_endpoint = qp_builder.endpoint()?;

// Allocate and register memory regions
let efa_access_flags = ibverbs::ibv_access_flags(
ibverbs::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE.0 |
ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE.0 |
ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_READ.0
);

let local_mr = pd.allocate_with_permissions(4096, efa_access_flags).unwrap();

// Exchange endpoints and memory regions
endpoint_tx.send(my_endpoint).map_err(|_| "Failed to send endpoint")?;
let peer_endpoint = peer_rx.recv().map_err(|_| "Failed to receive peer endpoint")?;
let remote_mr = remote_mr_rx.recv().unwrap();
remote_mr_tx.send(local_mr.remote()).unwrap();

// Move QP to RTR states and create remote AH
let mut qp = qp_builder.handshake(peer_endpoint).unwrap();

// Wait for writer to be ready with data
let writer_is_ready = writer_is_ready_rx.recv().unwrap();
if !writer_is_ready {
return Err("Writer is not ready".into());
}

// Post EFA read from remote memory into local memory
qp.post_read_efa(&[local_mr.slice(..4096)], remote_mr, peer_endpoint, 0, None).unwrap();

// Wait for completion
let mut completions = [ibverbs::ibv_wc::default(); 16];
loop {
let completed = cq.poll(&mut completions[..]).unwrap();
if completed.is_empty() {
continue;
}
for wr in completed {
if wr.wr_id() == 0 {
if let Some((wc_code, vendor_err)) = wr.error() {
println!("EFA read failed: wc_code={:?}, vendor_err={:?}", wc_code, vendor_err);
return Err("EFA read failed".into());
}
// Check the data that was read
let data = local_mr.inner();
let read_number = u64::from_le_bytes(data[..8].try_into().unwrap());
println!("Read number: {}", read_number);

// Notify writer that reading is complete
reader_done_tx.send(true).unwrap();
println!("Reader notified writer of completion");

return Ok(());
}
}
}
}

fn writer_task(
endpoint_tx: std::sync::mpsc::Sender<ibverbs::QueuePairEndpoint>,
peer_rx: std::sync::mpsc::Receiver<ibverbs::QueuePairEndpoint>,
remote_mr_tx: std::sync::mpsc::Sender<ibverbs::RemoteMemorySlice>,
remote_mr_rx: std::sync::mpsc::Receiver<ibverbs::RemoteMemorySlice>,
writer_is_ready_tx: std::sync::mpsc::Sender<bool>,
reader_done_rx: std::sync::mpsc::Receiver<bool>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Open RDMA device - assume EFA compatibility
let ctx = match ibverbs::devices()
.unwrap()
.iter()
.nth(2) {
Some(dev) => dev.open_with_efa().unwrap(),
None => {
eprintln!("No RDMA devices found!");
return Ok(());
}
};

// Create completion queue and protection domain
let cq = ctx.create_cq(16, 0).unwrap();
let pd = ctx.alloc_pd().unwrap();

// Create EFA QP
let qp_builder_result = pd
.create_qp(&cq, &cq, ibverbs::ibv_qp_type::IBV_QPT_DRIVER)
.and_then(|mut builder| builder.set_gid_index(0).build());

let qp_builder = match qp_builder_result {
Ok(builder) => builder,
Err(e) => {
println!("EFA QP creation failed: {}. This is expected if EFA is not available.", e);
return Ok(());
}
};

let my_endpoint = qp_builder.endpoint()?;

// Allocate and register memory regions
let efa_access_flags = ibverbs::ibv_access_flags(
ibverbs::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE.0 |
ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE.0 |
ibverbs::ibv_access_flags::IBV_ACCESS_REMOTE_READ.0
);

let mut local_mr = pd.allocate_with_permissions(4096, efa_access_flags).unwrap();

// Write a simple number to the local memory region
let test_number: u64 = 42;
local_mr.inner_mut()[..std::mem::size_of::<u64>()].copy_from_slice(&test_number.to_le_bytes());
println!("Writer prepared test number: {}", test_number);

// Exchange endpoints and memory regions
endpoint_tx.send(my_endpoint).map_err(|_| "Failed to send endpoint")?;
let peer_endpoint = peer_rx.recv().map_err(|_| "Failed to receive peer endpoint")?;
remote_mr_tx.send(local_mr.remote()).unwrap();
let _remote_mr = remote_mr_rx.recv().unwrap();

// Move QP to RTR states and create remote AH
let _qp = qp_builder.handshake(peer_endpoint).unwrap();

// Signal ready to reader
writer_is_ready_tx.send(true).unwrap();

// Writer waits for reader to complete reading
println!("Writer is ready and waiting for reader to complete...");
let reader_done = reader_done_rx.recv().unwrap();
if reader_done {
println!("Writer received completion notification from reader");
}

Ok(())
}
Loading