@@ -6,21 +6,40 @@ use std::task::{Context, Poll, Waker};
66
77use io_uring:: { cqueue, squeue} ;
88
9+ mod slab_list;
10+
11+ use slab:: Slab ;
12+ use slab_list:: { SlabListEntry , SlabListIndices } ;
13+
914use crate :: driver;
1015use crate :: runtime:: CONTEXT ;
1116use crate :: util:: PhantomUnsendUnsync ;
1217
18+ /// A SlabList is used to hold unserved completions.
19+ ///
20+ /// This is relevant to multi-completion Operations,
21+ /// which require an unknown number of CQE events to be
22+ /// captured before completion.
23+ pub ( crate ) type Completion = SlabListEntry < CqeResult > ;
24+
1325/// In-flight operation
14- pub ( crate ) struct Op < T : ' static > {
26+ pub ( crate ) struct Op < T : ' static , CqeType = SingleCQE > {
1527 // Operation index in the slab
1628 pub ( super ) index : usize ,
1729
1830 // Per-operation data
1931 data : Option < T > ,
2032
33+ // CqeType marker
34+ _cqe_type : PhantomData < CqeType > ,
35+
36+ // Make !Send + !Sync
2137 _phantom : PhantomUnsendUnsync ,
2238}
2339
40+ /// A Marker for Ops which expect only a single completion event
41+ pub ( crate ) struct SingleCQE ;
42+
2443pub ( crate ) trait Completable {
2544 type Output ;
2645 fn complete ( self , cqe : CqeResult ) -> Self :: Output ;
@@ -39,12 +58,15 @@ pub(crate) enum Lifecycle {
3958
4059 /// The operation has completed with a single cqe result
4160 Completed ( CqeResult ) ,
61+
62+ /// One or more completion results have been recieved
63+ /// This holds the indices uniquely identifying the list within the slab
64+ CompletionList ( SlabListIndices ) ,
4265}
4366
4467/// A single CQE entry
4568pub ( crate ) struct CqeResult {
4669 pub ( crate ) result : io:: Result < u32 > ,
47- #[ allow( dead_code) ]
4870 pub ( crate ) flags : u32 ,
4971}
5072
@@ -61,7 +83,7 @@ impl From<cqueue::Entry> for CqeResult {
6183 }
6284}
6385
64- impl < T > Op < T >
86+ impl < T , CqeType > Op < T , CqeType >
6587where
6688 T : Completable ,
6789{
7092 Op {
7193 index : inner. ops . insert ( ) ,
7294 data : Some ( data) ,
95+ _cqe_type : PhantomData ,
7396 _phantom : PhantomData ,
7497 }
7598 }
@@ -114,7 +137,7 @@ where
114137 }
115138}
116139
117- impl < T > Future for Op < T >
140+ impl < T > Future for Op < T , SingleCQE >
118141where
119142 T : Unpin + ' static + Completable ,
120143{
@@ -127,7 +150,7 @@ where
127150
128151 CONTEXT . with ( |runtime_context| {
129152 runtime_context. with_driver_mut ( |driver| {
130- let lifecycle = driver
153+ let ( lifecycle, _ ) = driver
131154 . ops
132155 . get_mut ( me. index )
133156 . expect ( "invalid internal state" ) ;
@@ -149,31 +172,57 @@ where
149172 Lifecycle :: Completed ( cqe) => {
150173 driver. ops . remove ( me. index ) ;
151174 me. index = usize:: MAX ;
152-
153175 Poll :: Ready ( me. data . take ( ) . unwrap ( ) . complete ( cqe) )
154176 }
177+ Lifecycle :: CompletionList ( ..) => {
178+ unreachable ! ( "No `more` flag set for SingleCQE" )
179+ }
155180 }
156181 } )
157182 } )
158183 }
159184}
160185
161- impl < T > Drop for Op < T > {
186+ /// The operation may have pending cqe's not yet processed.
187+ /// To manage this, the lifecycle associated with the Op may if required
188+ /// be placed in LifeCycle::Ignored state to handle cqe's which arrive after
189+ /// the Op has been dropped.
190+ impl < T , CqeType > Drop for Op < T , CqeType > {
162191 fn drop ( & mut self ) {
192+ use std:: mem;
193+
163194 CONTEXT . with ( |runtime_context| {
164195 runtime_context. with_driver_mut ( |driver| {
165- let lifecycle = match driver. ops . get_mut ( self . index ) {
166- Some ( lifecycle) => lifecycle,
167- None => return ,
196+ // Get the Op Lifecycle state from the driver
197+ let ( lifecycle, completions) = match driver. ops . get_mut ( self . index ) {
198+ Some ( val) => val,
199+ None => {
200+ // Op dropped after the driver
201+ return ;
202+ }
168203 } ;
169204
170- match lifecycle {
205+ match mem :: replace ( lifecycle, Lifecycle :: Submitted ) {
171206 Lifecycle :: Submitted | Lifecycle :: Waiting ( _) => {
172207 * lifecycle = Lifecycle :: Ignored ( Box :: new ( self . data . take ( ) ) ) ;
173208 }
174209 Lifecycle :: Completed ( ..) => {
175210 driver. ops . remove ( self . index ) ;
176211 }
212+ Lifecycle :: CompletionList ( indices) => {
213+ // Deallocate list entries, recording if more CQE's are expected
214+ let more = {
215+ let mut list = indices. into_list ( completions) ;
216+ io_uring:: cqueue:: more ( list. peek_end ( ) . unwrap ( ) . flags )
217+ // Dropping list deallocates the list entries
218+ } ;
219+ if more {
220+ // If more are expected, we have to keep the op around
221+ * lifecycle = Lifecycle :: Ignored ( Box :: new ( self . data . take ( ) ) ) ;
222+ } else {
223+ driver. ops . remove ( self . index ) ;
224+ }
225+ }
177226 Lifecycle :: Ignored ( ..) => unreachable ! ( ) ,
178227 }
179228 } )
@@ -182,21 +231,54 @@ impl<T> Drop for Op<T> {
182231}
183232
184233impl Lifecycle {
185- pub ( super ) fn complete ( & mut self , cqe : CqeResult ) -> bool {
234+ pub ( super ) fn complete ( & mut self , completions : & mut Slab < Completion > , cqe : CqeResult ) -> bool {
186235 use std:: mem;
187236
188237 match mem:: replace ( self , Lifecycle :: Submitted ) {
189- Lifecycle :: Submitted => {
190- * self = Lifecycle :: Completed ( cqe) ;
238+ x @ Lifecycle :: Submitted | x @ Lifecycle :: Waiting ( ..) => {
239+ if io_uring:: cqueue:: more ( cqe. flags ) {
240+ let mut list = SlabListIndices :: new ( ) . into_list ( completions) ;
241+ list. push ( cqe) ;
242+ * self = Lifecycle :: CompletionList ( list. into_indices ( ) ) ;
243+ } else {
244+ * self = Lifecycle :: Completed ( cqe) ;
245+ }
246+ if let Lifecycle :: Waiting ( waker) = x {
247+ // waker is woken to notify cqe has arrived
248+ // Note: Maybe defer calling until cqe with !`more` flag set?
249+ waker. wake ( ) ;
250+ }
191251 false
192252 }
193- Lifecycle :: Waiting ( waker) => {
194- * self = Lifecycle :: Completed ( cqe) ;
195- waker. wake ( ) ;
253+
254+ lifecycle @ Lifecycle :: Ignored ( ..) => {
255+ if io_uring:: cqueue:: more ( cqe. flags ) {
256+ // Not yet complete. The Op has been dropped, so we can drop the CQE
257+ // but we must keep the lifecycle alive until no more CQE's expected
258+ * self = lifecycle;
259+ false
260+ } else {
261+ // This Op has completed, we can drop
262+ true
263+ }
264+ }
265+
266+ Lifecycle :: Completed ( ..) => {
267+ // Completions with more flag set go straight onto the slab,
268+ // and are handled in Lifecycle::CompletionList.
269+ // To construct Lifecycle::Completed, a CQE with `more` flag unset was received
270+ // we shouldn't be receiving another.
271+ unreachable ! ( "invalid operation state" )
272+ }
273+
274+ Lifecycle :: CompletionList ( indices) => {
275+ // A completion list may contain CQE's with and without `more` flag set.
276+ // Only the final one may have `more` unset, although we don't check.
277+ let mut list = indices. into_list ( completions) ;
278+ list. push ( cqe) ;
279+ * self = Lifecycle :: CompletionList ( list. into_indices ( ) ) ;
196280 false
197281 }
198- Lifecycle :: Ignored ( ..) => true ,
199- Lifecycle :: Completed ( ..) => unreachable ! ( "invalid operation state" ) ,
200282 }
201283 }
202284}
@@ -377,7 +459,10 @@ mod test {
377459
378460 fn release ( ) {
379461 CONTEXT . with ( |cx| {
380- cx. with_driver_mut ( |driver| driver. ops . lifecycle . clear ( ) ) ;
462+ cx. with_driver_mut ( |driver| {
463+ driver. ops . lifecycle . clear ( ) ;
464+ driver. ops . completions . clear ( ) ;
465+ } ) ;
381466
382467 cx. unset_driver ( ) ;
383468 } ) ;
0 commit comments