Skip to content

Commit ca6b061

Browse files
committed
Fixed issue around swallowing mqtt publish errors.
Signed-off-by: Suneet Nangia <[email protected]>
1 parent a8d509a commit ca6b061

File tree

1 file changed

+13
-1
lines changed
  • crates/outbound-mqtt/src

1 file changed

+13
-1
lines changed

crates/outbound-mqtt/src/lib.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,24 @@ impl v2::HostConnection for OutboundMqtt {
8888
qos: Qos,
8989
) -> Result<Result<(), Error>> {
9090
Ok(async {
91-
let (client, _) = self.get_conn(connection).await.map_err(other_error)?;
91+
let (client, eventloop) = self.get_conn(connection).await.map_err(other_error)?;
9292
let qos = convert_to_mqtt_qos_value(qos);
93+
94+
// Message published to EventLoop (not MQTT Broker)
9395
client
9496
.publish_bytes(topic, qos, false, payload.into())
9597
.await
9698
.map_err(other_error)?;
99+
100+
// Poll EventLoop once to send the message to MQTT broker or capture/throw error
101+
// We may revisit this later to manage long running connections and their issues in the connection pool.
102+
eventloop
103+
.poll()
104+
.await
105+
.map_err(|err: rumqttc::ConnectionError| {
106+
v2::Error::ConnectionFailed(err.to_string())
107+
})?;
108+
97109
Ok(())
98110
}
99111
.await)

0 commit comments

Comments
 (0)