@@ -39,6 +39,8 @@ mod write;
3939
4040mod writev;
4141
42+ use crate :: driver:: op:: Lifecycle ;
43+ use io_uring:: opcode:: AsyncCancel ;
4244use io_uring:: IoUring ;
4345use slab:: Slab ;
4446use std:: io;
@@ -75,6 +77,8 @@ impl Driver {
7577 self . uring . submit_and_wait ( 1 )
7678 }
7779
80+ // only used in tests rn
81+ #[ allow( unused) ]
7882 fn num_operations ( & self ) -> usize {
7983 self . ops . lifecycle . len ( )
8084 }
@@ -107,9 +111,10 @@ impl Driver {
107111 Err ( ref e) if e. raw_os_error ( ) == Some ( libc:: EBUSY ) => {
108112 self . tick ( ) ;
109113 }
110- Err ( e) => {
114+ Err ( e) if e . raw_os_error ( ) != Some ( libc :: EINTR ) => {
111115 return Err ( e) ;
112116 }
117+ _ => continue ,
113118 }
114119 }
115120 }
@@ -121,9 +126,54 @@ impl AsRawFd for Driver {
121126 }
122127}
123128
129+ /// Drop the driver, cancelling any in-progress ops and waiting for them to terminate.
130+ ///
131+ /// This first cancels all ops and then waits for them to be moved to the completed lifecycle phase.
132+ ///
133+ /// It is possible for this to be run without previously dropping the runtime, but this should only
134+ /// be possible in the case of [`std::process::exit`].
135+ ///
136+ /// This depends on us knowing when ops are completed and done firing.
137+ /// When multishot ops are added (support exists but none are implemented), a way to know if such
138+ /// an op is finished MUST be added, otherwise our shutdown process is unsound.
124139impl Drop for Driver {
125140 fn drop ( & mut self ) {
126- while self . num_operations ( ) > 0 {
141+ // get all ops in flight for cancellation
142+ while !self . uring . submission ( ) . is_empty ( ) {
143+ self . submit ( ) . expect ( "Internal error when dropping driver" ) ;
144+ }
145+
146+ // pre-determine what to cancel
147+ let mut cancellable_ops = Vec :: new ( ) ;
148+ for ( id, cycle) in self . ops . lifecycle . iter ( ) {
149+ // don't cancel completed items
150+ if !matches ! ( cycle, Lifecycle :: Completed ( _) ) {
151+ cancellable_ops. push ( id) ;
152+ }
153+ }
154+
155+ // cancel all ops
156+ for id in cancellable_ops {
157+ unsafe {
158+ while self
159+ . uring
160+ . submission ( )
161+ . push ( & AsyncCancel :: new ( id as u64 ) . build ( ) . user_data ( u64:: MAX ) )
162+ . is_err ( )
163+ {
164+ self . submit ( ) . expect ( "Internal error when dropping driver" ) ;
165+ }
166+ }
167+ }
168+
169+ // TODO: add a way to know if a multishot op is done sending completions
170+ // SAFETY: this is currently unsound for multishot ops
171+ while !self
172+ . ops
173+ . lifecycle
174+ . iter ( )
175+ . all ( |( _, cycle) | matches ! ( cycle, Lifecycle :: Completed ( _) ) )
176+ {
127177 // If waiting fails, ignore the error. The wait will be attempted
128178 // again on the next loop.
129179 let _ = self . wait ( ) ;
@@ -167,7 +217,9 @@ impl Ops {
167217
168218impl Drop for Ops {
169219 fn drop ( & mut self ) {
170- assert ! ( self . lifecycle. is_empty( ) ) ;
171- assert ! ( self . completions. is_empty( ) ) ;
220+ assert ! ( self
221+ . lifecycle
222+ . iter( )
223+ . all( |( _, cycle) | matches!( cycle, Lifecycle :: Completed ( _) ) ) )
172224 }
173225}
0 commit comments