Skip to content

Commit 7577b3c

Browse files
committed
Added support for all QoS levels in event polling.
Signed-off-by: Suneet Nangia <[email protected]>
1 parent 0746884 commit 7577b3c

File tree

1 file changed

+8
-3
lines changed
  • crates/outbound-mqtt/src

1 file changed

+8
-3
lines changed

crates/outbound-mqtt/src/lib.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ mod host_component;
33
use std::time::Duration;
44

55
use anyhow::Result;
6-
use rumqttc::AsyncClient;
6+
use rumqttc::{AsyncClient, Event, Incoming, Outgoing, QoS};
77
use spin_core::{async_trait, wasmtime::component::Resource};
88
use spin_world::v2::mqtt::{self as v2, Connection as MqttConnection, Error, Qos};
99

@@ -105,10 +105,15 @@ impl v2::HostConnection for OutboundMqtt {
105105
.await
106106
.map_err(|err| v2::Error::ConnectionFailed(err.to_string()))?;
107107

108-
if let rumqttc::Event::Outgoing(rumqttc::Outgoing::Publish(_)) = event {
109-
return Ok(());
108+
match (qos, event) {
109+
(QoS::AtMostOnce, Event::Outgoing(Outgoing::Publish(_)))
110+
| (QoS::AtLeastOnce, Event::Incoming(Incoming::PubAck(_)))
111+
| (QoS::ExactlyOnce, Event::Outgoing(Outgoing::PubComp(_))) => break,
112+
113+
(_, _) => continue,
110114
}
111115
}
116+
Ok(())
112117
}
113118
.await)
114119
}

0 commit comments

Comments
 (0)