Skip to content

Commit 4d1cbe3

Browse files
authored
Merge pull request #2362 from suneetnangia/issue/2335/fix-swallowed-mqtt-errors
Improved rumqttc event loop check.
2 parents 36c08f2 + 7577b3c commit 4d1cbe3

File tree

1 file changed

+17
-10
lines changed
  • crates/outbound-mqtt/src

1 file changed

+17
-10
lines changed

crates/outbound-mqtt/src/lib.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ mod host_component;
33
use std::time::Duration;
44

55
use anyhow::Result;
6-
use rumqttc::AsyncClient;
6+
use rumqttc::{AsyncClient, Event, Incoming, Outgoing, QoS};
77
use spin_core::{async_trait, wasmtime::component::Resource};
88
use spin_world::v2::mqtt::{self as v2, Connection as MqttConnection, Error, Qos};
99

@@ -97,15 +97,22 @@ impl v2::HostConnection for OutboundMqtt {
9797
.await
9898
.map_err(other_error)?;
9999

100-
// Poll EventLoop once to send the message to MQTT broker or capture/throw error
101-
// We may revisit this later to manage long running connections and their issues in the connection pool.
102-
eventloop
103-
.poll()
104-
.await
105-
.map_err(|err: rumqttc::ConnectionError| {
106-
v2::Error::ConnectionFailed(err.to_string())
107-
})?;
108-
100+
// Poll event loop until outgoing publish event is iterated over to send the message to MQTT broker or capture/throw error.
101+
// We may revisit this later to manage long running connections, high throughput use cases and their issues in the connection pool.
102+
loop {
103+
let event = eventloop
104+
.poll()
105+
.await
106+
.map_err(|err| v2::Error::ConnectionFailed(err.to_string()))?;
107+
108+
match (qos, event) {
109+
(QoS::AtMostOnce, Event::Outgoing(Outgoing::Publish(_)))
110+
| (QoS::AtLeastOnce, Event::Incoming(Incoming::PubAck(_)))
111+
| (QoS::ExactlyOnce, Event::Outgoing(Outgoing::PubComp(_))) => break,
112+
113+
(_, _) => continue,
114+
}
115+
}
109116
Ok(())
110117
}
111118
.await)

0 commit comments

Comments
 (0)