Skip to content

Commit d83411f

Browse files
authored
Merge pull request #26 from karthik2804/add-topic-name
add topic name metadata calling the guest handler
2 parents a8e0955 + ed6e1c9 commit d83411f

File tree

6 files changed

+33
-16
lines changed

6 files changed

+33
-16
lines changed

examples/mqtt-app/src/lib.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
use chrono::{DateTime, Utc};
2-
use spin_mqtt_sdk::{mqtt_component, Payload};
2+
use spin_mqtt_sdk::{mqtt_component, Metadata, Payload};
33

44
#[mqtt_component]
5-
async fn handle_message(message: Payload) -> anyhow::Result<()> {
5+
async fn handle_message(message: Payload, metadata: Metadata) -> anyhow::Result<()> {
66
let datetime: DateTime<Utc> = std::time::SystemTime::now().into();
77
let formatted_time = datetime.format("%Y-%m-%d %H:%M:%S.%f").to_string();
88

99
println!(
10-
"{:?} Message received by wasm component: '{}'",
10+
"{:?} Message received by wasm component: '{}' on topic '{}'",
1111
formatted_time,
12-
String::from_utf8_lossy(&message)
12+
String::from_utf8_lossy(&message),
13+
metadata.topic
1314
);
1415
Ok(())
1516
}

sdk/macro/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ pub fn mqtt_component(_attr: TokenStream, item: TokenStream) -> TokenStream {
1717
#preamble
1818
}
1919
impl self::preamble::Guest for preamble::Mqtt {
20-
fn handle_message(payload: ::spin_mqtt_sdk::Payload) -> ::std::result::Result<(), ::spin_mqtt_sdk::Error> {
20+
fn handle_message(payload: ::spin_mqtt_sdk::Payload, metadata: ::spin_mqtt_sdk::Metadata) -> ::std::result::Result<(), ::spin_mqtt_sdk::Error> {
2121
::spin_mqtt_sdk::executor::run(async move {
22-
match super::#func_name(payload)#await_postfix {
22+
match super::#func_name(payload, metadata)#await_postfix {
2323
::std::result::Result::Ok(()) => ::std::result::Result::Ok(()),
2424
::std::result::Result::Err(e) => {
2525
eprintln!("{}", e);

sdk/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ pub mod wit {
1717
pub use wit_bindgen;
1818

1919
#[doc(inline)]
20-
pub use wit::spin::mqtt_trigger::spin_mqtt_types::{Error, Payload};
20+
pub use wit::spin::mqtt_trigger::spin_mqtt_types::{Error, Metadata, Payload};

spin-mqtt.wit

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,20 @@ interface spin-mqtt-types {
1515
exactly-once,
1616
}
1717

18+
// metadata associated with the payload
19+
record metadata {
20+
topic: string,
21+
}
22+
1823
/// The message payload.
1924
type payload = list<u8>;
2025
}
2126

2227
world spin-mqtt {
23-
use spin-mqtt-types.{error, payload};
28+
use spin-mqtt-types.{error, metadata, payload};
2429

2530
/// The entrypoint for a Mqtt handler in wasm component
26-
export handle-message: func(message: payload) -> result<_, error>;
31+
export handle-message: func(message: payload, metadata: metadata) -> result<_, error>;
2732
}
2833

2934
world spin-mqtt-sdk {

src/lib.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ wasmtime::component::bindgen!({
1515
async: true,
1616
});
1717

18+
use spin::mqtt_trigger::spin_mqtt_types as mqtt_types;
19+
1820
pub(crate) type RuntimeData = ();
1921
pub(crate) type _Store = spin_core::Store<RuntimeData>;
2022

@@ -115,7 +117,7 @@ impl TriggerExecutor for MqttTrigger {
115117
async fn run(self, config: Self::RunConfig) -> anyhow::Result<()> {
116118
if config.test {
117119
for component in &self.component_configs {
118-
self.handle_mqtt_event(&component.0, b"test message".to_vec())
120+
self.handle_mqtt_event(&component.0, b"test message".to_vec(), "test".to_string())
119121
.await?;
120122
}
121123

@@ -155,15 +157,20 @@ impl TriggerExecutor for MqttTrigger {
155157
}
156158

157159
impl MqttTrigger {
158-
async fn handle_mqtt_event(&self, component_id: &str, message: Vec<u8>) -> anyhow::Result<()> {
160+
async fn handle_mqtt_event(
161+
&self,
162+
component_id: &str,
163+
message: Vec<u8>,
164+
topic: String,
165+
) -> anyhow::Result<()> {
159166
// Load the guest wasm component
160167
let (instance, mut store) = self.engine.prepare_instance(component_id).await?;
161168

162169
// SpinMqtt is auto generated by bindgen as per WIT files referenced above.
163170
let instance = SpinMqtt::new(&mut store, &instance)?;
164171

165172
instance
166-
.call_handle_message(store, &message)
173+
.call_handle_message(store, &message, &mqtt_types::Metadata { topic })
167174
.await?
168175
.map_err(|err| anyhow!("failed to execute guest: {err}"))
169176
}
@@ -199,7 +206,11 @@ impl MqttTrigger {
199206
Ok(Some(msg)) => {
200207
// Handle the received message
201208
_ = self
202-
.handle_mqtt_event(&component_id, msg.payload().to_vec())
209+
.handle_mqtt_event(
210+
&component_id,
211+
msg.payload().to_vec(),
212+
msg.topic().to_owned(),
213+
)
203214
.await
204215
.map_err(|err| tracing::error!("{err}"));
205216
}
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use spin_mqtt_sdk::{mqtt_component, Payload};
21
use chrono::{DateTime, Utc};
2+
use spin_mqtt_sdk::{mqtt_component, Metadata, Payload};
33

44
#[mqtt_component]
5-
fn handle_message(message: Payload) -> anyhow::Result<()> {
5+
async fn handle_message(message: Payload, metadata: Metadata) -> anyhow::Result<()> {
66
let datetime: DateTime<Utc> = std::time::SystemTime::now().into();
77
let formatted_time = datetime.format("%Y-%m-%d %H:%M:%S.%f").to_string();
88

@@ -12,4 +12,4 @@ fn handle_message(message: Payload) -> anyhow::Result<()> {
1212
String::from_utf8_lossy(&message)
1313
);
1414
Ok(())
15-
}
15+
}

0 commit comments

Comments
 (0)