Skip to content

Commit ecaaa3e

Browse files
ollie-etlollie-etl
andauthored
Support multi completion events: v2 (tokio-rs#130)
* Add Marker SingleCqe This is used to parameterize Op<T, CqeType>. It is now possible to specialize blanket implementations, such as Future, to classes of Ops * Add CompletionList Adds infrastructure to store Completions in a slab backed indexed linked list * Add CompletionList variant Provides the infrastructure for receiving multiple CQE's for a single Op's lifecycle * fmt * clippy * Add the overlooked Lifecycle::Ignored state logic * Newtype Cqe result tuple * Add comments and generalize * Make singly linked * Add some tests * fmt * Add missing cfg(test) * Fix over-zealous refactoring * Update no_op complete signature * fmt * Address nits * More docs Co-authored-by: ollie-etl <Oliver [email protected]>
1 parent c069cc6 commit ecaaa3e

File tree

3 files changed

+287
-23
lines changed

3 files changed

+287
-23
lines changed

src/driver/mod.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ struct Ops {
5656
// When dropping the driver, all in-flight operations must have completed. This
5757
// type wraps the slab and ensures that, on drop, the slab is empty.
5858
lifecycle: Slab<op::Lifecycle>,
59+
60+
/// Received but unserviced Op completions
61+
completions: Slab<op::Completion>,
5962
}
6063

6164
impl Driver {
@@ -133,11 +136,15 @@ impl Ops {
133136
fn new() -> Ops {
134137
Ops {
135138
lifecycle: Slab::with_capacity(64),
139+
completions: Slab::with_capacity(64),
136140
}
137141
}
138142

139-
fn get_mut(&mut self, index: usize) -> Option<&mut op::Lifecycle> {
140-
self.lifecycle.get_mut(index)
143+
fn get_mut(&mut self, index: usize) -> Option<(&mut op::Lifecycle, &mut Slab<op::Completion>)> {
144+
let completions = &mut self.completions;
145+
self.lifecycle
146+
.get_mut(index)
147+
.map(|lifecycle| (lifecycle, completions))
141148
}
142149

143150
// Insert a new operation
@@ -151,7 +158,8 @@ impl Ops {
151158
}
152159

153160
fn complete(&mut self, index: usize, cqe: op::CqeResult) {
154-
if self.lifecycle[index].complete(cqe) {
161+
let completions = &mut self.completions;
162+
if self.lifecycle[index].complete(completions, cqe) {
155163
self.lifecycle.remove(index);
156164
}
157165
}
@@ -160,5 +168,6 @@ impl Ops {
160168
impl Drop for Ops {
161169
fn drop(&mut self) {
162170
assert!(self.lifecycle.is_empty());
171+
assert!(self.completions.is_empty());
163172
}
164173
}

src/driver/op.rs

Lines changed: 105 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,40 @@ use std::task::{Context, Poll, Waker};
66

77
use io_uring::{cqueue, squeue};
88

9+
mod slab_list;
10+
11+
use slab::Slab;
12+
use slab_list::{SlabListEntry, SlabListIndices};
13+
914
use crate::driver;
1015
use crate::runtime::CONTEXT;
1116
use crate::util::PhantomUnsendUnsync;
1217

18+
/// A SlabList is used to hold unserved completions.
19+
///
20+
/// This is relevant to multi-completion Operations,
21+
/// which require an unknown number of CQE events to be
22+
/// captured before completion.
23+
pub(crate) type Completion = SlabListEntry<CqeResult>;
24+
1325
/// In-flight operation
14-
pub(crate) struct Op<T: 'static> {
26+
pub(crate) struct Op<T: 'static, CqeType = SingleCQE> {
1527
// Operation index in the slab
1628
pub(super) index: usize,
1729

1830
// Per-operation data
1931
data: Option<T>,
2032

33+
// CqeType marker
34+
_cqe_type: PhantomData<CqeType>,
35+
36+
// Make !Send + !Sync
2137
_phantom: PhantomUnsendUnsync,
2238
}
2339

40+
/// A Marker for Ops which expect only a single completion event
41+
pub(crate) struct SingleCQE;
42+
2443
pub(crate) trait Completable {
2544
type Output;
2645
fn complete(self, cqe: CqeResult) -> Self::Output;
@@ -39,12 +58,15 @@ pub(crate) enum Lifecycle {
3958

4059
/// The operation has completed with a single cqe result
4160
Completed(CqeResult),
61+
62+
/// One or more completion results have been recieved
63+
/// This holds the indices uniquely identifying the list within the slab
64+
CompletionList(SlabListIndices),
4265
}
4366

4467
/// A single CQE entry
4568
pub(crate) struct CqeResult {
4669
pub(crate) result: io::Result<u32>,
47-
#[allow(dead_code)]
4870
pub(crate) flags: u32,
4971
}
5072

@@ -61,7 +83,7 @@ impl From<cqueue::Entry> for CqeResult {
6183
}
6284
}
6385

64-
impl<T> Op<T>
86+
impl<T, CqeType> Op<T, CqeType>
6587
where
6688
T: Completable,
6789
{
@@ -70,6 +92,7 @@ where
7092
Op {
7193
index: inner.ops.insert(),
7294
data: Some(data),
95+
_cqe_type: PhantomData,
7396
_phantom: PhantomData,
7497
}
7598
}
@@ -114,7 +137,7 @@ where
114137
}
115138
}
116139

117-
impl<T> Future for Op<T>
140+
impl<T> Future for Op<T, SingleCQE>
118141
where
119142
T: Unpin + 'static + Completable,
120143
{
@@ -127,7 +150,7 @@ where
127150

128151
CONTEXT.with(|runtime_context| {
129152
runtime_context.with_driver_mut(|driver| {
130-
let lifecycle = driver
153+
let (lifecycle, _) = driver
131154
.ops
132155
.get_mut(me.index)
133156
.expect("invalid internal state");
@@ -149,31 +172,57 @@ where
149172
Lifecycle::Completed(cqe) => {
150173
driver.ops.remove(me.index);
151174
me.index = usize::MAX;
152-
153175
Poll::Ready(me.data.take().unwrap().complete(cqe))
154176
}
177+
Lifecycle::CompletionList(..) => {
178+
unreachable!("No `more` flag set for SingleCQE")
179+
}
155180
}
156181
})
157182
})
158183
}
159184
}
160185

161-
impl<T> Drop for Op<T> {
186+
/// The operation may have pending cqe's not yet processed.
187+
/// To manage this, the lifecycle associated with the Op may if required
188+
/// be placed in LifeCycle::Ignored state to handle cqe's which arrive after
189+
/// the Op has been dropped.
190+
impl<T, CqeType> Drop for Op<T, CqeType> {
162191
fn drop(&mut self) {
192+
use std::mem;
193+
163194
CONTEXT.with(|runtime_context| {
164195
runtime_context.with_driver_mut(|driver| {
165-
let lifecycle = match driver.ops.get_mut(self.index) {
166-
Some(lifecycle) => lifecycle,
167-
None => return,
196+
// Get the Op Lifecycle state from the driver
197+
let (lifecycle, completions) = match driver.ops.get_mut(self.index) {
198+
Some(val) => val,
199+
None => {
200+
// Op dropped after the driver
201+
return;
202+
}
168203
};
169204

170-
match lifecycle {
205+
match mem::replace(lifecycle, Lifecycle::Submitted) {
171206
Lifecycle::Submitted | Lifecycle::Waiting(_) => {
172207
*lifecycle = Lifecycle::Ignored(Box::new(self.data.take()));
173208
}
174209
Lifecycle::Completed(..) => {
175210
driver.ops.remove(self.index);
176211
}
212+
Lifecycle::CompletionList(indices) => {
213+
// Deallocate list entries, recording if more CQE's are expected
214+
let more = {
215+
let mut list = indices.into_list(completions);
216+
io_uring::cqueue::more(list.peek_end().unwrap().flags)
217+
// Dropping list deallocates the list entries
218+
};
219+
if more {
220+
// If more are expected, we have to keep the op around
221+
*lifecycle = Lifecycle::Ignored(Box::new(self.data.take()));
222+
} else {
223+
driver.ops.remove(self.index);
224+
}
225+
}
177226
Lifecycle::Ignored(..) => unreachable!(),
178227
}
179228
})
@@ -182,21 +231,54 @@ impl<T> Drop for Op<T> {
182231
}
183232

184233
impl Lifecycle {
185-
pub(super) fn complete(&mut self, cqe: CqeResult) -> bool {
234+
pub(super) fn complete(&mut self, completions: &mut Slab<Completion>, cqe: CqeResult) -> bool {
186235
use std::mem;
187236

188237
match mem::replace(self, Lifecycle::Submitted) {
189-
Lifecycle::Submitted => {
190-
*self = Lifecycle::Completed(cqe);
238+
x @ Lifecycle::Submitted | x @ Lifecycle::Waiting(..) => {
239+
if io_uring::cqueue::more(cqe.flags) {
240+
let mut list = SlabListIndices::new().into_list(completions);
241+
list.push(cqe);
242+
*self = Lifecycle::CompletionList(list.into_indices());
243+
} else {
244+
*self = Lifecycle::Completed(cqe);
245+
}
246+
if let Lifecycle::Waiting(waker) = x {
247+
// waker is woken to notify cqe has arrived
248+
// Note: Maybe defer calling until cqe with !`more` flag set?
249+
waker.wake();
250+
}
191251
false
192252
}
193-
Lifecycle::Waiting(waker) => {
194-
*self = Lifecycle::Completed(cqe);
195-
waker.wake();
253+
254+
lifecycle @ Lifecycle::Ignored(..) => {
255+
if io_uring::cqueue::more(cqe.flags) {
256+
// Not yet complete. The Op has been dropped, so we can drop the CQE
257+
// but we must keep the lifecycle alive until no more CQE's expected
258+
*self = lifecycle;
259+
false
260+
} else {
261+
// This Op has completed, we can drop
262+
true
263+
}
264+
}
265+
266+
Lifecycle::Completed(..) => {
267+
// Completions with more flag set go straight onto the slab,
268+
// and are handled in Lifecycle::CompletionList.
269+
// To construct Lifecycle::Completed, a CQE with `more` flag unset was received
270+
// we shouldn't be receiving another.
271+
unreachable!("invalid operation state")
272+
}
273+
274+
Lifecycle::CompletionList(indices) => {
275+
// A completion list may contain CQE's with and without `more` flag set.
276+
// Only the final one may have `more` unset, although we don't check.
277+
let mut list = indices.into_list(completions);
278+
list.push(cqe);
279+
*self = Lifecycle::CompletionList(list.into_indices());
196280
false
197281
}
198-
Lifecycle::Ignored(..) => true,
199-
Lifecycle::Completed(..) => unreachable!("invalid operation state"),
200282
}
201283
}
202284
}
@@ -377,7 +459,10 @@ mod test {
377459

378460
fn release() {
379461
CONTEXT.with(|cx| {
380-
cx.with_driver_mut(|driver| driver.ops.lifecycle.clear());
462+
cx.with_driver_mut(|driver| {
463+
driver.ops.lifecycle.clear();
464+
driver.ops.completions.clear();
465+
});
381466

382467
cx.unset_driver();
383468
});

0 commit comments

Comments
 (0)