Skip to content

Commit c585353

Browse files
authored
bearer: Inject SubmissionFlags (#41)
* bearer: Inject SubmissionFlags * Remove dead comments
1 parent 90fbba6 commit c585353

File tree

10 files changed

+136
-18
lines changed

10 files changed

+136
-18
lines changed

io-uring-bearer/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ pub enum UringBearerError {
2121
Submission(String),
2222
/// Something wrong Slab
2323
Slab(String),
24+
/// Invalid Submission Flags encountered
25+
SubmissionFlags,
2426
/// Slab was able to store the value but not get it? This is a bug.
2527
SlabBugSetGet(&'static str),
2628
/// Register handles error
@@ -64,6 +66,7 @@ impl Display for UringBearerError {
6466
Self::SubmissionPush => write!(f, "Submisionn push error. Is the squeue full?"),
6567
Self::Submission(s) => write!(f, "Submission: {}", s),
6668
Self::Slab(s) => write!(f, "Slab: {}", s),
69+
Self::SubmissionFlags => write!(f, "Invalid flags set in Submission"),
6770
Self::SlabBugSetGet(s) => write!(f, "Slab Bug: {}", s),
6871
Self::RegisterHandles(s) => write!(f, "Register Handles: {}", s),
6972
Self::Slabbable(e) => write!(f, "Slabbable: {}", e),

io-uring-bearer/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ pub(crate) mod fixed;
3333
mod capacity;
3434
pub use capacity::BearerCapacityKind;
3535

36+
//-----------------------------------------------
37+
// Submission types
38+
//-----------------------------------------------
39+
mod submission;
40+
#[doc(inline)]
41+
pub use submission::SubmissionFlags;
42+
3643
//-----------------------------------------------
3744
// Completion types
3845
//-----------------------------------------------

io-uring-bearer/src/submission.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
//! Submission related types
2+
3+
use crate::error::UringBearerError;
4+
use io_uring::squeue::Flags as IoUringFlags;
5+
6+
/// See Linux io_uring_sqe_set_flags(3) for IOSQE_ASYNC for the respective documentation.
7+
/// These flags may be set for each submission queue entry. By default all flags are Off.
8+
#[derive(Clone, Debug, Default, PartialEq)]
9+
pub struct SubmissionFlags {
10+
pub(crate) bits: u8,
11+
}
12+
13+
impl SubmissionFlags {
14+
/// Submission of this entry will not start before all the prior entries are completed.
15+
/// Submissions after this entry will not complete before this entry is completed.
16+
#[inline]
17+
pub fn on_io_drain(mut self) -> Self {
18+
self.bits |= IoUringFlags::IO_DRAIN.bits();
19+
self
20+
}
21+
/// Submission is linked to the next after this so it will not start before this is completed.
22+
#[inline]
23+
pub fn on_io_link(mut self) -> Self {
24+
self.bits |= IoUringFlags::IO_LINK.bits();
25+
self
26+
}
27+
/// Same as on_io_link but does not sever regardless of completion result.
28+
// TODO(doc): what does this actually mean - e.g. will next one get cancelled if this fails ? etc.
29+
#[inline]
30+
pub fn on_io_hard_link(mut self) -> Self {
31+
self.bits |= IoUringFlags::IO_HARDLINK.bits();
32+
self
33+
}
34+
/// Issue direct async submission over non-blocking directly. See Linux io_uring_sqe_set_flags(3).
35+
#[inline]
36+
pub fn on_async(mut self) -> Self {
37+
self.bits |= IoUringFlags::ASYNC.bits();
38+
self
39+
}
40+
/// Signal to kernel to select a buffer from a given group id.
41+
#[inline]
42+
pub fn on_buffer_select(mut self) -> Self {
43+
self.bits |= IoUringFlags::BUFFER_SELECT.bits();
44+
self
45+
}
46+
/// Skip all the completion events when sucessfull out of this submission.
47+
#[inline]
48+
pub fn on_skip_success(mut self) -> Self {
49+
self.bits |= IoUringFlags::SKIP_SUCCESS.bits();
50+
self
51+
}
52+
/// Convert to [`io-uring::squeue::Flags`]
53+
#[inline]
54+
pub fn to_io_uring_flags(&self) -> Result<io_uring::squeue::Flags, UringBearerError> {
55+
match io_uring::squeue::Flags::from_bits(self.bits) {
56+
None => Err(UringBearerError::SubmissionFlags),
57+
Some(ret) => Ok(ret),
58+
}
59+
}
60+
}

io-uring-bearer/src/uring.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use io_uring::IoUring;
2222

2323
use crate::completion::SubmissionRecordStatus;
2424
use crate::fixed::FixedFdRegister;
25+
use crate::SubmissionFlags;
2526

2627
use crate::slab::BuffersRec;
2728
use crate::slab::FutexRec;
@@ -177,34 +178,52 @@ impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
177178
.map_err(|e| UringBearerError::Submission(e.to_string()))
178179
}
179180
/// Push a general Op implementing OpCode trait (see io-uring-opcode)
180-
pub fn push_op<Op: OpCode<C>>(&mut self, op: Op) -> Result<usize, UringBearerError> {
181+
pub fn push_op<Op: OpCode<C>>(
182+
&mut self,
183+
op: Op,
184+
flags: Option<SubmissionFlags>,
185+
) -> Result<usize, UringBearerError> {
181186
let key = self
182187
.fd_slab
183188
.take_next_with(Completion::Op(op.submission()?))
184189
.map_err(UringBearerError::Slabbable)?;
185190

186-
match self._push_to_completion(key) {
191+
match self._push_to_completion(key, flags) {
187192
Err(e) => Err(e),
188193
Ok(()) => Ok(key),
189194
}
190195
}
191196
/// Push a pending typed Completion directly
192-
pub fn push_op_typed(&mut self, op: Completion<C>) -> Result<usize, UringBearerError> {
197+
pub fn push_op_typed(
198+
&mut self,
199+
op: Completion<C>,
200+
flags: Option<SubmissionFlags>,
201+
) -> Result<usize, UringBearerError> {
193202
let key = self
194203
.fd_slab
195204
.take_next_with(op)
196205
.map_err(UringBearerError::Slabbable)?;
197206

198-
match self._push_to_completion(key) {
207+
match self._push_to_completion(key, flags) {
199208
Err(e) => Err(e),
200209
Ok(()) => Ok(key),
201210
}
202211
}
203212
#[inline]
204-
pub(crate) fn _push_to_completion(&mut self, idx: usize) -> Result<(), UringBearerError> {
213+
pub(crate) fn _push_to_completion(
214+
&mut self,
215+
idx: usize,
216+
in_flags: Option<SubmissionFlags>,
217+
) -> Result<(), UringBearerError> {
205218
let iou = &mut self.io_uring;
206219
let mut s_queue = iou.submission();
207220

221+
let flags = match in_flags {
222+
None => SubmissionFlags::default(),
223+
Some(fl) => fl,
224+
}
225+
.to_io_uring_flags()?;
226+
208227
let completion_rec = self
209228
.fd_slab
210229
.slot_get_mut(idx)
@@ -216,7 +235,7 @@ impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
216235
return Err(UringBearerError::InvalidOwnership(completion.owner(), idx));
217236
}
218237
completion.force_owner_kernel();
219-
completion.entry().user_data(idx as u64)
238+
completion.entry().flags(flags).user_data(idx as u64)
220239
}
221240
_ => return Err(UringBearerError::SlabBugSetGet("Submisison not found?")),
222241
};

io-uring-bearer/src/uring/accept_multi.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@
33
use super::UringBearer;
44
use crate::error::UringBearerError;
55
use crate::Completion;
6+
use crate::SubmissionFlags;
67
use io_uring_opcode::OpExtAcceptMulti;
78
use io_uring_opcode::{OpCode, OpCompletion};
89
use slabbable::Slabbable;
910

1011
impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
1112
/// Push a AcceptMulti implementing OpCode + traits (see io-uring-opcode)
12-
pub fn push_accept_multi<Op>(&mut self, op: Op) -> Result<usize, UringBearerError>
13+
pub fn push_accept_multi<Op>(
14+
&mut self,
15+
op: Op,
16+
flags: Option<SubmissionFlags>,
17+
) -> Result<usize, UringBearerError>
1318
where
1419
Op: OpCode<C> + OpExtAcceptMulti,
1520
{
@@ -18,7 +23,7 @@ impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
1823
.take_next_with(Completion::AcceptMulti(op.submission()?))
1924
.map_err(UringBearerError::Slabbable)?;
2025

21-
match self._push_to_completion(key) {
26+
match self._push_to_completion(key, flags) {
2227
Err(e) => Err(e),
2328
Ok(()) => Ok(key),
2429
}

io-uring-bearer/src/uring/connect.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@
33
use super::UringBearer;
44
use crate::error::UringBearerError;
55
use crate::Completion;
6+
use crate::SubmissionFlags;
67
use io_uring_opcode::OpExtConnect;
78
use io_uring_opcode::{OpCode, OpCompletion};
89
use slabbable::Slabbable;
910

1011
impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
1112
/// Push a Connect implementing OpCode + traits (see io-uring-opcode)
12-
pub fn push_connect<Op>(&mut self, op: Op) -> Result<usize, UringBearerError>
13+
pub fn push_connect<Op>(
14+
&mut self,
15+
op: Op,
16+
flags: Option<SubmissionFlags>,
17+
) -> Result<usize, UringBearerError>
1318
where
1419
Op: OpCode<C> + OpExtConnect,
1520
{
@@ -18,7 +23,7 @@ impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
1823
.take_next_with(Completion::Connect(op.submission()?))
1924
.map_err(UringBearerError::Slabbable)?;
2025

21-
match self._push_to_completion(key) {
26+
match self._push_to_completion(key, flags) {
2227
Err(e) => Err(e),
2328
Ok(()) => Ok(key),
2429
}

io-uring-bearer/src/uring/epoll_ctl.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@
33
use super::UringBearer;
44
use crate::error::UringBearerError;
55
use crate::Completion;
6+
use crate::SubmissionFlags;
67
use io_uring_opcode::OpExtEpollCtl;
78
use io_uring_opcode::{OpCode, OpCompletion};
89
use slabbable::Slabbable;
910

1011
impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
1112
/// Push an EpollCtl implementing OpCode + traits (see io-uring-opcode)
12-
pub fn push_epoll_ctl<Op>(&mut self, op: Op) -> Result<usize, UringBearerError>
13+
pub fn push_epoll_ctl<Op>(
14+
&mut self,
15+
op: Op,
16+
flags: Option<SubmissionFlags>,
17+
) -> Result<usize, UringBearerError>
1318
where
1419
Op: OpCode<C> + OpExtEpollCtl,
1520
{
@@ -18,7 +23,7 @@ impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
1823
.take_next_with(Completion::EpollCtl(op.submission()?))
1924
.map_err(UringBearerError::Slabbable)?;
2025

21-
match self._push_to_completion(key) {
26+
match self._push_to_completion(key, flags) {
2227
Err(e) => Err(e),
2328
Ok(()) => Ok(key),
2429
}

io-uring-bearer/src/uring/recv.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,19 @@ use crate::Completion;
55
use crate::UringBearer;
66

77
use crate::slab::{RecvMultiRec, RecvRec};
8+
use crate::SubmissionFlags;
89

910
use io_uring_opcode::OpCompletion;
1011
use slabbable::Slabbable;
1112

1213
impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
1314
/// Add Recv pending Completion. The referenced buffers index must hold only one buffer.
14-
pub fn add_recv(&mut self, fixed_fd: u32, buf_idx: usize) -> Result<usize, UringBearerError> {
15+
pub fn add_recv(
16+
&mut self,
17+
fixed_fd: u32,
18+
buf_idx: usize,
19+
flags: Option<SubmissionFlags>,
20+
) -> Result<usize, UringBearerError> {
1521
let taken_buf = self.take_one_mutable_buffer(buf_idx)?;
1622
if !self._fixed_fd_validate(fixed_fd) {
1723
return Err(UringBearerError::FdNotRegistered(fixed_fd));
@@ -21,14 +27,15 @@ impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
2127
.take_next_with(Completion::Recv(RecvRec::new(fixed_fd as u32, taken_buf)))
2228
.map_err(UringBearerError::Slabbable)?;
2329

24-
self._push_to_completion(key)?;
30+
self._push_to_completion(key, flags)?;
2531
Ok(key)
2632
}
2733
/// Add RecvMulti pending Completion
2834
pub fn add_recv_multi(
2935
&mut self,
3036
fixed_fd: u32,
3137
buf_group: u16,
38+
flags: Option<SubmissionFlags>,
3239
) -> Result<usize, UringBearerError> {
3340
if !self._fixed_fd_validate(fixed_fd) {
3441
return Err(UringBearerError::FdNotRegistered(fixed_fd));
@@ -41,7 +48,7 @@ impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
4148
)))
4249
.map_err(UringBearerError::Slabbable)?;
4350

44-
self._push_to_completion(key)?;
51+
self._push_to_completion(key, flags)?;
4552
Ok(key)
4653
}
4754
}

io-uring-bearer/src/uring/send_zc.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use crate::uring::UringBearerError;
44
use crate::Completion;
5+
use crate::SubmissionFlags;
56
use crate::UringBearer;
67

78
use crate::slab::SendZcRec;
@@ -18,6 +19,7 @@ impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
1819
fixed_fd: u32,
1920
buf_idx: usize,
2021
kernel_index: u16,
22+
flags: Option<SubmissionFlags>,
2123
) -> Result<usize, UringBearerError> {
2224
let taken_buf = self.take_one_immutable_buffer(buf_idx, kernel_index)?;
2325
if !self._fixed_fd_validate(fixed_fd) {
@@ -31,7 +33,7 @@ impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
3133
)))
3234
.map_err(UringBearerError::Slabbable)?;
3335

34-
self._push_to_completion(key)?;
36+
self._push_to_completion(key, flags)?;
3537
Ok(key)
3638
}
3739
}

io-uring-bearer/src/uring/socket.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@
33
use super::UringBearer;
44
use crate::error::UringBearerError;
55
use crate::Completion;
6+
use crate::SubmissionFlags;
67
use io_uring_opcode::OpExtSocket;
78
use io_uring_opcode::{OpCode, OpCompletion};
89
use slabbable::Slabbable;
910

1011
impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
1112
/// Push a Socket implementing OpCode + traits (see io-uring-opcode)
12-
pub fn push_socket<Op>(&mut self, op: Op) -> Result<usize, UringBearerError>
13+
pub fn push_socket<Op>(
14+
&mut self,
15+
op: Op,
16+
flags: Option<SubmissionFlags>,
17+
) -> Result<usize, UringBearerError>
1318
where
1419
Op: OpCode<C> + OpExtSocket,
1520
{
@@ -18,7 +23,7 @@ impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
1823
.take_next_with(Completion::Socket(op.submission()?))
1924
.map_err(UringBearerError::Slabbable)?;
2025

21-
match self._push_to_completion(key) {
26+
match self._push_to_completion(key, flags) {
2227
Err(e) => Err(e),
2328
Ok(()) => Ok(key),
2429
}

0 commit comments

Comments
 (0)