@@ -6,7 +6,6 @@ use crate::PubChannel;
66use crate :: SubChannel ;
77use futures:: channel:: mpsc;
88use futures:: channel:: oneshot;
9- use futures:: future:: Either ;
109use futures:: SinkExt ;
1110use futures:: StreamExt ;
1211use log:: error;
@@ -227,15 +226,15 @@ impl Connection {
227226 triggered_disconnect = true ;
228227 }
229228
230- let next_event = event_loop. poll ( ) ;
231- let next_permit = permits. clone ( ) . acquire_owned ( ) ;
232- tokio:: pin!( next_event) ;
233- tokio:: pin!( next_permit) ;
229+ let event = tokio:: select! {
230+ // If there is an event, we need to process that first
231+ // Otherwise we risk shutting down early
232+ // e.g. a `Publish` request from the sender is not "inflight"
233+ // but will immediately be returned by `event_loop.poll()`
234+ biased;
234235
235- let event = futures:: future:: select ( next_event. as_mut ( ) , next_permit. as_mut ( ) ) . await ;
236- let event = match event {
237- Either :: Left ( ( event, _) ) => event,
238- Either :: Right ( ( permit, _) ) => {
236+ event = event_loop. poll( ) => event,
237+ permit = permits. clone( ) . acquire_owned( ) => {
239238 // The `sender_loop` has now concluded
240239 disconnect_permit = Some ( permit. unwrap( ) ) ;
241240 continue ;
0 commit comments