Skip to content

Commit ecd3ec4

Browse files
committed
Rewrite reconnection logic for MQTT client in hopes that it fixes flaky tests
1 parent e6cbf08 commit ecd3ec4

File tree

1 file changed

+19
-71
lines changed

1 file changed

+19
-71
lines changed

src/io/mqtt/client.rs

Lines changed: 19 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use futures::{FutureExt, StreamExt, stream::BoxStream};
55
use paho_mqtt::{self as mqtt};
66
use std::fmt::Debug;
77
use std::time::Duration;
8-
use tracing::{Level, debug, info, instrument, warn};
8+
use tracing::{Level, debug, info, instrument};
99
use uuid::Uuid;
1010

1111
/* An interface for creating the MQTT client that can be used
@@ -171,7 +171,8 @@ mod paho {
171171

172172
let opts = mqtt::ConnectOptionsBuilder::new_v3()
173173
.keep_alive_interval(Duration::from_secs(30))
174-
.clean_session(true)
174+
.clean_session(false)
175+
.automatic_reconnect(Duration::from_millis(500), Duration::from_secs(60))
175176
.finalize();
176177

177178
// Error means requester has gone away - we don't care in that case
@@ -184,85 +185,32 @@ mod paho {
184185
#[instrument(level=Level::INFO, skip(client))]
185186
fn message_stream(
186187
mut client: mqtt::AsyncClient,
187-
max_reconnect_attempts: u32,
188+
_max_reconnect_attempts: u32,
188189
) -> BoxStream<'static, MqttMessage> {
189190
// Note: Important that we call get_stream before the async block, otherwise
190191
// we risk MQTT client receiving messages before stream is registered
191192
let stream = client.get_stream(10);
192193
Box::pin(stream! {
193-
let mut reconnect_attempts = 0;
194194
let mut stream = stream;
195-
196-
loop {
197-
// Inner loop to read from current stream
198-
loop {
199-
match stream.next().await {
200-
Some(msg) => {
201-
match msg {
202-
Some(message) => {
203-
debug!(?message, topic = message.topic(), "Received MQTT message");
204-
let message = MqttMessage::new(
205-
message.topic().to_string(),
206-
message.payload_str().to_string(),
207-
message.qos() as i32,
208-
);
209-
yield message;
210-
reconnect_attempts = 0; // Reset counter on successful message
211-
}
212-
None => {
213-
debug!("MQTT connection lost, will attempt reconnect");
214-
break; // Break inner loop, try reconnect
215-
}
216-
}
217-
}
218-
None => {
219-
break; // Break inner loop, try reconnect
220-
}
221-
}
222-
}
223-
224-
// Stream exhausted, check if we should reconnect
225-
if max_reconnect_attempts == 0 {
226-
warn!("Connection lost. Reconnection disabled (max_reconnect_attempts=0), stopping MQTT stream");
227-
break;
228-
}
229-
230-
warn!("Connection lost. Attempting reconnect...");
231-
reconnect_attempts += 1;
232-
233-
if reconnect_attempts > max_reconnect_attempts {
234-
warn!("Max reconnection attempts ({}) reached, stopping MQTT stream", max_reconnect_attempts);
235-
break;
236-
}
237-
238-
// Add timeout to reconnection attempt (shorter timeout to prevent hanging)
239-
let reconnect_future = client.reconnect();
240-
let timeout_future = smol::Timer::after(Duration::from_millis(500));
241-
242-
futures::select! {
243-
result = FutureExt::fuse(reconnect_future) => {
244-
match result {
245-
Ok(_) => {
246-
info!("MQTT client reconnected successfully after {} attempts", reconnect_attempts);
247-
stream = client.get_stream(10);
248-
continue; // Continue outer loop with new connection
249-
}
250-
Err(err) => {
251-
warn!(?err, attempt = reconnect_attempts, "MQTT client reconnection failed");
252-
// Add small delay before next attempt or termination
253-
smol::Timer::after(Duration::from_millis(100)).await;
254-
break; // Break outer loop, terminate stream
255-
}
256-
}
195+
while let Some(msg) = stream.next().await {
196+
match msg {
197+
Some(message) => {
198+
debug!(?message, topic = message.topic(), "Received MQTT message");
199+
let message = MqttMessage::new(
200+
message.topic().to_string(),
201+
message.payload_str().to_string(),
202+
message.qos() as i32,
203+
);
204+
yield message;
257205
}
258-
_ = FutureExt::fuse(timeout_future) => {
259-
warn!("MQTT reconnection timeout after 500ms, attempt {}/{}", reconnect_attempts, max_reconnect_attempts);
260-
// Add small delay before next attempt or termination
261-
smol::Timer::after(Duration::from_millis(100)).await;
262-
break; // Break outer loop, terminate stream
206+
None => {
207+
debug!("MQTT connection lost, waiting for auto-reconnect");
208+
smol::Timer::after(Duration::from_millis(200)).await;
209+
stream = client.get_stream(10);
263210
}
264211
}
265212
}
213+
debug!("MQTT stream ended permanently. Disconnecting.");
266214
})
267215
}
268216

0 commit comments

Comments
 (0)