Skip to content

Commit 10929f5

Browse files
authored
Merge pull request #14 from karthik2804/fix_program_termination
fix handling of ctrl+c in the trigger
2 parents 6167277 + 61ecaf5 commit 10929f5

File tree

2 files changed

+59
-33
lines changed

2 files changed

+59
-33
lines changed

examples/mqtt-app/spin.toml

100755100644
Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,39 @@
1-
spin_manifest_version = "1"
2-
authors = ["Suneet Nangia <[email protected]>"]
3-
description = "Demo app to receive MQTT messages."
1+
spin_manifest_version = 2
2+
3+
[application]
44
name = "mqtt-app"
5-
trigger = { type = "mqtt", address = "mqtt://172.17.0.2:1883", username = "user", password = "password", keep_alive_interval = "30" }
65
version = "0.1.0"
6+
description = "Demo app to receive MQTT messages."
7+
authors = ["Suneet Nangia <[email protected]>"]
78

8-
# This spin app has two components mqtt-c01 and mqtt-c02, which subscribe to the two different topics from the single MQTT server.
9-
[[component]]
10-
id = "mqtt-c01"
11-
source = "target/wasm32-wasi/release/mqtt_app.wasm"
12-
allowed_outbound_hosts = ["mqtt://172.17.0.2:1883"]
13-
[component.trigger]
9+
[application.trigger.mqtt]
10+
address = "mqtt://localhost:1883"
11+
username = "user"
12+
password = "password"
13+
keep_alive_interval = "30"
14+
15+
[[trigger.mqtt]]
16+
id = "trigger-mqtt-c01"
17+
component = "mqtt-c01"
1418
topic = "messages-in01"
1519
qos = "1"
16-
[component.build]
17-
command = "cargo build --target wasm32-wasi --release"
1820

19-
[[component]]
20-
id = "mqtt-c02"
21-
source = "target/wasm32-wasi/release/mqtt_app.wasm"
22-
allowed_outbound_hosts = ["mqtt://172.17.0.2:1883"]
23-
[component.trigger]
21+
[[trigger.mqtt]]
22+
id = "trigger-mqtt-c02"
23+
component = "mqtt-c02"
2424
topic = "messages-in02"
2525
qos = "0"
26-
[component.build]
26+
27+
[component.mqtt-c01]
28+
source = "target/wasm32-wasi/release/mqtt_app.wasm"
29+
allowed_outbound_hosts = ["mqtt://localhost:1883"]
30+
31+
[component.mqtt-c01.build]
32+
command = "cargo build --target wasm32-wasi --release"
33+
34+
[component.mqtt-c02]
35+
source = "target/wasm32-wasi/release/mqtt_app.wasm"
36+
allowed_outbound_hosts = ["mqtt://localhost:1883"]
37+
38+
[component.mqtt-c02.build]
2739
command = "cargo build --target wasm32-wasi --release"

src/lib.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use anyhow::{anyhow, Context, Ok};
1+
use anyhow::{anyhow, Context};
22
use clap::Args;
3-
use paho_mqtt::Client;
3+
use paho_mqtt::AsyncClient;
44
use serde::{Deserialize, Serialize};
55
use spin_app::MetadataKey;
66
use spin_core::async_trait;
@@ -89,7 +89,7 @@ impl TriggerExecutor for MqttTrigger {
8989
let qos = config.qos.parse::<i32>()?;
9090
let topic = config.topic.clone();
9191
acc.push((component, qos, topic));
92-
Ok(acc)
92+
anyhow::Ok(acc)
9393
})?;
9494

9595
Ok(Self {
@@ -105,7 +105,8 @@ impl TriggerExecutor for MqttTrigger {
105105
async fn run(self, config: Self::RunConfig) -> anyhow::Result<()> {
106106
if config.test {
107107
for component in &self.component_configs {
108-
self.handle_mqtt_event(&component.0, "test message").await?;
108+
self.handle_mqtt_event(&component.0, b"test message".to_vec())
109+
.await?;
109110
}
110111

111112
Ok(())
@@ -144,7 +145,7 @@ impl TriggerExecutor for MqttTrigger {
144145
}
145146

146147
impl MqttTrigger {
147-
async fn handle_mqtt_event(&self, component_id: &str, message: &str) -> anyhow::Result<()> {
148+
async fn handle_mqtt_event(&self, component_id: &str, message: Vec<u8>) -> anyhow::Result<()> {
148149
// Load the guest wasm component
149150
let (instance, mut store) = self.engine.prepare_instance(component_id).await?;
150151

@@ -156,7 +157,7 @@ impl MqttTrigger {
156157
let instance = SpinMqtt::new(&mut store, &instance)?;
157158

158159
instance
159-
.call_handle_message(store, &message.as_bytes().to_vec())
160+
.call_handle_message(store, &message)
160161
.await?
161162
.map_err(|err| anyhow!("failed to execute guest: {err}"))
162163
}
@@ -168,7 +169,7 @@ impl MqttTrigger {
168169
topic: String,
169170
) -> anyhow::Result<()> {
170171
// Receive the messages here from the specific topic in mqtt broker.
171-
let client = Client::new(self.address.clone())?;
172+
let mut client = AsyncClient::new(self.address.clone())?;
172173
let conn_opts = paho_mqtt::ConnectOptionsBuilder::new()
173174
.keep_alive_interval(Duration::from_secs(self.keep_alive_interval))
174175
.user_name(&self.username)
@@ -177,19 +178,32 @@ impl MqttTrigger {
177178

178179
client
179180
.connect(conn_opts)
181+
.await
180182
.context(format!("failed to connect to {:?}", self.address))?;
181183
client
182184
.subscribe(&topic, qos)
185+
.await
183186
.context(format!("failed to subscribe to {topic:?}"))?;
184187

185-
for msg in client.start_consuming() {
186-
if let Some(msg) = msg {
187-
_ = self
188-
.handle_mqtt_event(&component_id, &msg.payload_str())
189-
.await
190-
.map_err(|err| tracing::error!("{err}"));
191-
} else {
192-
continue;
188+
// Should the buffer be bounded/configurable?
189+
let rx = client.get_stream(None);
190+
191+
loop {
192+
match rx.recv().await {
193+
Ok(Some(msg)) => {
194+
// Handle the received message
195+
_ = self
196+
.handle_mqtt_event(&component_id, msg.payload().to_vec())
197+
.await
198+
.map_err(|err| tracing::error!("{err}"));
199+
}
200+
Ok(None) => {
201+
// Todo: Figure out what this case is
202+
}
203+
Err(_) => {
204+
// Channel is empty and closed
205+
break;
206+
}
193207
}
194208
}
195209

0 commit comments

Comments
 (0)