Skip to content

Commit 072301a

Browse files
lib: add register_buffers* functions (#313)
* lib: add `register_buffers_*` family of functions * add tests and docs * chore: update tests and names --------- Co-authored-by: Fathy Boundjadj <[email protected]>
1 parent 48d25cf commit 072301a

File tree

4 files changed

+320
-8
lines changed

4 files changed

+320
-8
lines changed

io-uring-test/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,9 @@ fn test<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
7575

7676
// register
7777
tests::register::test_register_files_sparse(&mut ring, &test)?;
78-
tests::register_buf_ring::test_register_buf_ring(&mut ring, &test)?;
7978
tests::register_buffers::test_register_buffers(&mut ring, &test)?;
79+
tests::register_buffers::test_register_buffers_update(&mut ring, &test)?;
80+
tests::register_buf_ring::test_register_buf_ring(&mut ring, &test)?;
8081
tests::register_sync_cancel::test_register_sync_cancel(&mut ring, &test)?;
8182
tests::register_sync_cancel::test_register_sync_cancel_unsubmitted(&mut ring, &test)?;
8283
tests::register_sync_cancel::test_register_sync_cancel_any(&mut ring, &test)?;

io-uring-test/src/tests/register_buffers.rs

Lines changed: 214 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,70 @@ use io_uring::{
88
};
99
use libc::iovec;
1010
use std::{
11+
fs::File,
1112
io::{self, IoSliceMut},
13+
io::{Error, Write},
1214
ops::DerefMut,
1315
os::fd::AsRawFd,
16+
os::fd::FromRawFd,
1417
};
1518

16-
/// Creates shared buffers to be registered into the [`IoUring`] instance, and then tests that they
17-
/// can actually be used through the [`WriteFixed`] and [`ReadFixed`] opcodes.
19+
use io_uring::{opcode, types};
20+
use libc::EFAULT;
21+
1822
pub fn test_register_buffers<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
1923
ring: &mut IoUring<S, C>,
2024
test: &Test,
21-
) -> io::Result<()> {
25+
) -> anyhow::Result<()> {
26+
_test_register_buffers(
27+
ring,
28+
test,
29+
"register_buffers",
30+
None,
31+
|ring, iovecs, _| unsafe { ring.submitter().register_buffers(&iovecs) },
32+
)?;
33+
_test_register_buffers(
34+
ring,
35+
test,
36+
"register_buffers2",
37+
ring.params().is_feature_resource_tagging(),
38+
|ring, iovecs, tags| unsafe { ring.submitter().register_buffers2(iovecs, tags) },
39+
)?;
40+
_test_register_buffers(
41+
ring,
42+
test,
43+
"register_buffers_sparse",
44+
ring.params().is_feature_resource_tagging(),
45+
|ring, iovecs, _| {
46+
let submitter = ring.submitter();
47+
submitter.register_buffers_sparse(iovecs.len() as _)?;
48+
unsafe { submitter.register_buffers_update(0, iovecs, None) }
49+
},
50+
)?;
51+
52+
return Ok(());
53+
}
54+
55+
fn _test_register_buffers<
56+
S: squeue::EntryMarker,
57+
C: cqueue::EntryMarker,
58+
F: FnMut(&mut IoUring<S, C>, &[iovec], &[u64]) -> io::Result<()>,
59+
P: Into<Option<bool>>,
60+
>(
61+
ring: &mut IoUring<S, C>,
62+
test: &Test,
63+
name: &str,
64+
probe: P,
65+
mut register: F,
66+
) -> anyhow::Result<()> {
2267
require!(
2368
test;
2469
test.probe.is_supported(WriteFixed::CODE);
2570
test.probe.is_supported(ReadFixed::CODE);
71+
probe.into().unwrap_or(true);
2672
);
2773

28-
println!("test register_buffers");
74+
println!("test {name}");
2975

3076
const BUF_SIZE: usize = 1 << 12; // Page size
3177
const BUFFERS: usize = 8; // The maximum number of squeue entries (in main.rs)
@@ -56,11 +102,11 @@ pub fn test_register_buffers<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
56102
// to cast these as `slices` is valid for this entire function
57103
let iovecs: &[iovec] =
58104
unsafe { std::slice::from_raw_parts(slices.as_ptr().cast(), slices.len()) };
105+
let tags = vec![0; slices.len()];
59106

60-
let submitter = ring.submitter();
61107
// Safety: Since `iovecs` is derived from valid `IoSliceMut`s, this upholds the safety contract
62108
// of `register_buffers` that the buffers are valid until the buffers are unregistered
63-
unsafe { submitter.register_buffers(iovecs)? };
109+
register(ring, iovecs, &tags).unwrap();
64110

65111
// Prepare writing the buffers out to the file
66112
(0..BUFFERS).for_each(|index| {
@@ -141,3 +187,165 @@ pub fn test_register_buffers<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
141187

142188
Ok(())
143189
}
190+
191+
const BUFFER_TAG: u64 = 0xbadcafe;
192+
const TIMEOUT_TAG: u64 = 0xbadf00d;
193+
194+
pub fn test_register_buffers_update<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
195+
ring: &mut IoUring<S, C>,
196+
test: &Test,
197+
) -> anyhow::Result<()> {
198+
require!(
199+
test;
200+
test.probe.is_supported(opcode::ReadFixed::CODE);
201+
ring.params().is_feature_resource_tagging();
202+
);
203+
204+
println!("test register_buffers_update");
205+
206+
let (read, mut write) = create_pipe()?;
207+
let mut buf = [0xde, 0xed, 0xbe, 0xef];
208+
let mut tags = [BUFFER_TAG];
209+
let iovecs = [libc::iovec {
210+
iov_len: buf.len(),
211+
iov_base: buf.as_mut_ptr() as _,
212+
}];
213+
let timeout = types::Timespec::new().nsec(50 * 1_000_000);
214+
let timeout = opcode::Timeout::new(&timeout as _)
215+
.build()
216+
.user_data(TIMEOUT_TAG)
217+
.into();
218+
let read_sqe = opcode::ReadFixed::new(
219+
types::Fd(read.as_raw_fd()),
220+
buf.as_mut_ptr(),
221+
buf.len() as _,
222+
5,
223+
)
224+
.build()
225+
.user_data(42)
226+
.into();
227+
228+
// Register a buffer table and then immediately unregister it
229+
ring.submitter().register_buffers_sparse(1)?;
230+
ring.submitter().unregister_buffers()?;
231+
232+
// Push a timeout of 50ms
233+
unsafe { ring.submission().push(&timeout).unwrap() };
234+
235+
// We should not receive any other entries than the timeout
236+
check_only_timeout(ring)?;
237+
238+
// Register a sparse buffer table of 10 elements
239+
ring.submitter().register_buffers_sparse(10)?;
240+
241+
// Try read the pipe using a sparse buffer
242+
let cqe = {
243+
write.write("yo".as_bytes())?;
244+
245+
unsafe { ring.submission().push(&read_sqe).unwrap() };
246+
247+
ring.submit_and_wait(1)?;
248+
249+
ring.completion().next().unwrap().into()
250+
};
251+
252+
// We should get the correct user_data
253+
if cqe.user_data() != 42 {
254+
return Err(anyhow::anyhow!("unexpected completion queue entry"));
255+
}
256+
257+
// EFAULT is to be expected with incorrect fixed buffers
258+
if cqe.result() != -EFAULT {
259+
return Err(anyhow::anyhow!("unexpected read result: {}", cqe.result()));
260+
}
261+
262+
// Register a buffer at the index 5
263+
unsafe {
264+
ring.submitter()
265+
.register_buffers_update(5, &iovecs, Some(&tags))?;
266+
267+
// Push a timeout of 50ms
268+
ring.submission().push(&timeout).unwrap();
269+
}
270+
271+
// We should not receive any other entries than the timeout
272+
check_only_timeout(ring)?;
273+
274+
// Register a buffer at the same index 5, but this time with an empty tag.
275+
let cqe = {
276+
tags[0] = 0;
277+
278+
unsafe {
279+
ring.submitter()
280+
.register_buffers_update(5, &iovecs, Some(&tags))?;
281+
}
282+
ring.submit_and_wait(1)?;
283+
284+
ring.completion().next().unwrap().into()
285+
};
286+
287+
// We should get a cqe with the first tag because we registered a
288+
// new buffer at an index where a buffer was already registered.
289+
if cqe.user_data() != BUFFER_TAG {
290+
return Err(anyhow::anyhow!(
291+
"expected completion queue to contain a buffer unregistered event"
292+
));
293+
}
294+
295+
// Try reading now that the buffer is registered at index 5
296+
let cqe = {
297+
unsafe {
298+
ring.submission().push(&read_sqe).unwrap();
299+
}
300+
301+
ring.submit_and_wait(1)?;
302+
303+
ring.completion().next().unwrap().into()
304+
};
305+
306+
// We should get the correct user_data
307+
if cqe.user_data() != 42 {
308+
return Err(anyhow::anyhow!("unexpected completion queue entry"));
309+
}
310+
311+
// We should read exactly two bytes
312+
if cqe.result() != 2 {
313+
return Err(anyhow::anyhow!("unexpected read result: {}", cqe.result()));
314+
}
315+
316+
// The first two bytes of `buf` should be "yo"
317+
if &buf[0..2] != "yo".as_bytes() {
318+
return Err(anyhow::anyhow!("unexpected read buffer data: {:x?}", &buf));
319+
}
320+
321+
return Ok(());
322+
}
323+
324+
/// Create a pipe and return both ends as RAII `File` handles
325+
fn create_pipe() -> io::Result<(File, File)> {
326+
let mut fds = [-1, -1];
327+
328+
unsafe {
329+
if libc::pipe(fds.as_mut_ptr()) == -1 {
330+
Err(Error::last_os_error())
331+
} else {
332+
Ok((File::from_raw_fd(fds[0]), File::from_raw_fd(fds[1])))
333+
}
334+
}
335+
}
336+
337+
/// Submit sqes and asserts the only cqe is a timeout entry
338+
fn check_only_timeout<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
339+
ring: &mut IoUring<S, C>,
340+
) -> Result<(), anyhow::Error> {
341+
ring.submit_and_wait(1)?;
342+
343+
if Into::<cqueue::Entry>::into(ring.completion().next().unwrap()).user_data() == TIMEOUT_TAG {
344+
// There should not be any more entries in the queue
345+
if ring.completion().next().is_none() {
346+
return Ok(());
347+
}
348+
}
349+
350+
Err(anyhow::anyhow!("unexpected completion queue entry"))
351+
}

src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,6 @@ impl<S: squeue::EntryMarker, C: cqueue::EntryMarker> Builder<S, C> {
354354
/// events. You are only able to [register restrictions](Submitter::register_restrictions) when
355355
/// the rings are disabled due to concurrency issues. You can enable the rings with
356356
/// [`Submitter::register_enable_rings`]. Available since 5.10.
357-
358357
pub fn setup_r_disabled(&mut self) -> &mut Self {
359358
self.params.flags |= sys::IORING_SETUP_R_DISABLED;
360359
self

src/submit.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,108 @@ impl<'a> Submitter<'a> {
193193
.map(drop)
194194
}
195195

196+
/// Update a range of fixed buffers starting at `offset`.
197+
///
198+
/// This is required to use buffers registered using
199+
/// [`register_buffers_sparse`](Self::register_buffers_sparse),
200+
/// although it can be also be used with [`register_buffers`](Self::register_buffers).
201+
///
202+
/// See [`register_buffers2`](Self::register_buffers2)
203+
/// for more information about resource tagging.
204+
///
205+
/// Available since Linux 5.13.
206+
///
207+
/// # Safety
208+
///
209+
/// Developers must ensure that the `iov_base` and `iov_len` values are valid and will
210+
/// be valid until buffers are unregistered or the ring destroyed, otherwise undefined
211+
/// behaviour may occur.
212+
pub unsafe fn register_buffers_update(
213+
&self,
214+
offset: u32,
215+
bufs: &[libc::iovec],
216+
tags: Option<&[u64]>,
217+
) -> io::Result<()> {
218+
let nr = tags
219+
.as_ref()
220+
.map_or(bufs.len(), |tags| bufs.len().min(tags.len()));
221+
222+
let rr = sys::io_uring_rsrc_update2 {
223+
nr: nr as _,
224+
data: bufs.as_ptr() as _,
225+
tags: tags.map(|tags| tags.as_ptr() as _).unwrap_or(0),
226+
offset,
227+
..Default::default()
228+
};
229+
230+
execute(
231+
self.fd.as_raw_fd(),
232+
sys::IORING_REGISTER_BUFFERS_UPDATE,
233+
cast_ptr::<sys::io_uring_rsrc_update2>(&rr).cast(),
234+
std::mem::size_of::<sys::io_uring_rsrc_update2>() as _,
235+
)
236+
.map(drop)
237+
}
238+
239+
/// Variant of [`register_buffers`](Self::register_buffers)
240+
/// with resource tagging.
241+
///
242+
/// `tags` should be the same length as `bufs` and contain the
243+
/// tag value corresponding to the buffer at the same index.
244+
///
245+
/// If a tag is zero, then tagging for this particular resource
246+
/// (a buffer in this case) is disabled. Otherwise, after the
247+
/// resource had been unregistered and it's not used anymore,
248+
/// a CQE will be posted with `user_data` set to the specified
249+
/// tag and all other fields zeroed.
250+
///
251+
/// Available since Linux 5.13.
252+
///
253+
/// # Safety
254+
///
255+
/// Developers must ensure that the `iov_base` and `iov_len` values are valid and will
256+
/// be valid until buffers are unregistered or the ring destroyed, otherwise undefined
257+
/// behaviour may occur.
258+
pub unsafe fn register_buffers2(&self, bufs: &[libc::iovec], tags: &[u64]) -> io::Result<()> {
259+
let rr = sys::io_uring_rsrc_register {
260+
nr: bufs.len().min(tags.len()) as _,
261+
data: bufs.as_ptr() as _,
262+
tags: tags.as_ptr() as _,
263+
..Default::default()
264+
};
265+
execute(
266+
self.fd.as_raw_fd(),
267+
sys::IORING_REGISTER_BUFFERS2,
268+
cast_ptr::<sys::io_uring_rsrc_register>(&rr).cast(),
269+
std::mem::size_of::<sys::io_uring_rsrc_register>() as _,
270+
)
271+
.map(drop)
272+
}
273+
274+
/// Registers an empty table of nr fixed buffers buffers.
275+
///
276+
/// These must be updated before use, using eg.
277+
/// [`register_buffers_update`](Self::register_buffers_update).
278+
///
279+
/// See [`register_buffers`](Self::register_buffers)
280+
/// for more information about fixed buffers.
281+
///
282+
/// Available since Linux 5.13.
283+
pub fn register_buffers_sparse(&self, nr: u32) -> io::Result<()> {
284+
let rr = sys::io_uring_rsrc_register {
285+
nr,
286+
flags: sys::IORING_RSRC_REGISTER_SPARSE,
287+
..Default::default()
288+
};
289+
execute(
290+
self.fd.as_raw_fd(),
291+
sys::IORING_REGISTER_BUFFERS2,
292+
cast_ptr::<sys::io_uring_rsrc_register>(&rr).cast(),
293+
std::mem::size_of::<sys::io_uring_rsrc_register>() as _,
294+
)
295+
.map(drop)
296+
}
297+
196298
/// Registers an empty file table of nr_files number of file descriptors. The sparse variant is
197299
/// available in kernels 5.19 and later.
198300
///
@@ -331,6 +433,8 @@ impl<'a> Submitter<'a> {
331433
///
332434
/// You do not need to explicitly call this before dropping the [`IoUring`](crate::IoUring), as
333435
/// it will be cleaned up by the kernel automatically.
436+
///
437+
/// Available since Linux 5.1.
334438
pub fn unregister_buffers(&self) -> io::Result<()> {
335439
execute(
336440
self.fd.as_raw_fd(),

0 commit comments

Comments
 (0)