Skip to content

Commit 529e71b

Browse files
committed
Improve explanation of shutdown process
1 parent 51408a0 commit 529e71b

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

crates/common/mqtt_channel/src/connection.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,24 +215,28 @@ impl Connection {
215215
let mut disconnect_permit = None;
216216

217217
loop {
218+
// Check if we are ready to disconnect. Due to ownership of the
219+
// event loop, this needs to be done before we call
220+
// `event_loop.poll()`
218221
let remaining_events_empty = event_loop.state.inflight() == 0;
219222
if disconnect_permit.is_some() && !triggered_disconnect && remaining_events_empty {
223+
// `sender_loop` is not running and we have no remaining
224+
// publishes to process
220225
let client = mqtt_client.clone();
221226
tokio::spawn(async move { client.disconnect().await });
222-
// tokio::fs::write("/tmp/thing.txt",format!("{:#?}", &events.event_loop.state)).await.unwrap();
223227
triggered_disconnect = true;
224228
}
229+
225230
let next_event = event_loop.poll();
226231
let next_permit = permits.clone().acquire_owned();
227232
tokio::pin!(next_event);
228233
tokio::pin!(next_permit);
234+
229235
let event = futures::future::select(next_event.as_mut(), next_permit.as_mut()).await;
230236
let event = match event {
231-
Either::Left((event, _)) => {
232-
disconnect_permit.take();
233-
event
234-
}
237+
Either::Left((event, _)) => event,
235238
Either::Right((permit, _)) => {
239+
// The `sender_loop` has now concluded
236240
disconnect_permit = Some(permit.unwrap());
237241
continue;
238242
}
@@ -311,7 +315,7 @@ impl Connection {
311315
mut messages_receiver: mpsc::UnboundedReceiver<MqttMessage>,
312316
mut error_sender: mpsc::UnboundedSender<MqttError>,
313317
last_will: Option<MqttMessage>,
314-
_guard: OwnedSemaphorePermit,
318+
_disconnect_permit: OwnedSemaphorePermit,
315319
) {
316320
while let Some(message) = messages_receiver.next().await {
317321
let payload = Vec::from(message.payload_bytes());
@@ -331,6 +335,9 @@ impl Connection {
331335
.publish(last_will.topic, last_will.qos, last_will.retain, payload)
332336
.await;
333337
}
338+
339+
// At this point, `_disconnect_permit` is dropped
340+
// This allows `receiver_loop` acquire a permit and commence the shutdown process
334341
}
335342

336343
pub(crate) async fn do_pause() {

0 commit comments

Comments
 (0)