Skip to content

Commit 9b67436

Browse files
committed
Passing binary payloads to flows
Signed-off-by: Didier Wenzek <[email protected]>
1 parent 3c09120 commit 9b67436

File tree

7 files changed

+95
-25
lines changed

7 files changed

+95
-25
lines changed

crates/common/mqtt_channel/src/messages.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,18 @@ impl From<String> for DebugPayload {
7878
}
7979
}
8080

81+
impl From<DebugPayload> for Vec<u8> {
82+
fn from(value: DebugPayload) -> Self {
83+
value.0
84+
}
85+
}
86+
87+
impl From<Vec<u8>> for DebugPayload {
88+
fn from(value: Vec<u8>) -> Self {
89+
DebugPayload(value)
90+
}
91+
}
92+
8193
impl AsRef<Payload> for DebugPayload {
8294
fn as_ref(&self) -> &Payload {
8395
&self.0
@@ -202,6 +214,11 @@ impl MqttMessage {
202214
pub fn payload_bytes(&self) -> &[u8] {
203215
self.payload.as_bytes()
204216
}
217+
218+
/// Split the message into a (topic, payload) pair
219+
pub fn split(self) -> (String, Payload) {
220+
(self.topic.name, self.payload.0)
221+
}
205222
}
206223

207224
impl From<MqttMessage> for Publish {

crates/core/tedge/src/cli/flows/cli.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl BuildCommand for TEdgeFlowsCli {
7878
let message = match (topic, payload) {
7979
(Some(topic), Some(payload)) => Some(Message {
8080
topic,
81-
payload,
81+
payload: payload.into_bytes(),
8282
timestamp: None,
8383
}),
8484
(Some(_), None) => Err(anyhow!("Missing sample payload"))?,

crates/core/tedge/src/cli/flows/test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ fn print(messages: Result<Vec<Message>, FlowError>) {
7878
match messages {
7979
Ok(messages) => {
8080
for message in messages {
81-
println!("[{}] {}", message.topic, message.payload);
81+
println!("{message}");
8282
}
8383
}
8484
Err(err) => {
@@ -104,7 +104,7 @@ fn parse(line: String) -> Result<Option<Message>, Error> {
104104

105105
Ok(Some(Message {
106106
topic,
107-
payload,
107+
payload: payload.into_bytes(),
108108
timestamp: None,
109109
}))
110110
}

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ pub struct DateTime {
3838
pub nanoseconds: u32,
3939
}
4040

41-
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Eq, PartialEq)]
41+
#[derive(Clone, serde::Deserialize, serde::Serialize, Eq, PartialEq)]
4242
pub struct Message {
4343
pub topic: String,
44-
pub payload: String,
44+
pub payload: Vec<u8>,
4545
pub timestamp: Option<DateTime>,
4646
}
4747

@@ -222,7 +222,16 @@ impl Message {
222222
pub(crate) fn new(topic: &str, payload: &str) -> Self {
223223
Message {
224224
topic: topic.to_string(),
225-
payload: payload.to_string(),
225+
payload: payload.to_string().into_bytes(),
226+
timestamp: Some(DateTime::now()),
227+
}
228+
}
229+
230+
#[cfg(test)]
231+
pub(crate) fn new_binary(topic: &str, payload: impl Into<Vec<u8>>) -> Self {
232+
Message {
233+
topic: topic.to_string(),
234+
payload: payload.into(),
226235
timestamp: Some(DateTime::now()),
227236
}
228237
}
@@ -234,17 +243,35 @@ impl Message {
234243
json!({"topic": self.topic, "payload": self.payload, "timestamp": null})
235244
}
236245
}
246+
247+
pub fn payload_str(&self) -> Option<&str> {
248+
std::str::from_utf8(&self.payload).ok()
249+
}
250+
}
251+
252+
impl std::fmt::Display for Message {
253+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254+
'['.fmt(f)?;
255+
self.topic.fmt(f)?;
256+
"] ".fmt(f)?;
257+
match &self.payload_str() {
258+
Some(str) => str.fmt(f),
259+
None => format!("{:?}", self.payload).fmt(f),
260+
}
261+
}
262+
}
263+
264+
impl std::fmt::Debug for Message {
265+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266+
std::fmt::Display::fmt(self, f)
267+
}
237268
}
238269

239270
impl TryFrom<MqttMessage> for Message {
240271
type Error = FlowError;
241272

242273
fn try_from(message: MqttMessage) -> Result<Self, Self::Error> {
243-
let topic = message.topic.to_string();
244-
let payload = message
245-
.payload_str()
246-
.map_err(|_| FlowError::UnsupportedMessage("Not an UTF8 payload".to_string()))?
247-
.to_string();
274+
let (topic, payload) = message.split();
248275
Ok(Message {
249276
topic,
250277
payload,

crates/extensions/tedge_flows/src/js_lib/text_encoder.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,14 @@ impl<'js> TextEncoder {
2929

3030
pub fn encode(&self, ctx: Ctx<'js>, text: Value<'js>) -> Result<TypedArray<'js, u8>> {
3131
let string = match text.as_string() {
32-
None => "".to_string(),
32+
None => {
33+
if let Some(object) = text.as_object() {
34+
if let Some(bytes) = object.as_typed_array::<u8>() {
35+
return Ok(bytes.clone());
36+
}
37+
}
38+
"".to_string()
39+
}
3340
Some(js_string) => js_string.to_string()?,
3441
};
3542
TypedArray::new(ctx.clone(), string.as_bytes())

crates/extensions/tedge_flows/src/js_script.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl JsScript {
7777
timestamp: &DateTime,
7878
message: &Message,
7979
) -> Result<Vec<Message>, FlowError> {
80-
debug!(target: "flows", "{}: onMessage({timestamp:?}, {message:?})", self.module_name());
80+
debug!(target: "flows", "{}: onMessage({timestamp:?}, {message})", self.module_name());
8181
if self.no_js_on_message_fun {
8282
return Ok(vec![message.clone()]);
8383
}
@@ -105,7 +105,7 @@ impl JsScript {
105105
js: &JsRuntime,
106106
message: &Message,
107107
) -> Result<(), FlowError> {
108-
debug!(target: "flows", "{}: onConfigUpdate({message:?})", self.module_name());
108+
debug!(target: "flows", "{}: onConfigUpdate({message})", self.module_name());
109109
if self.no_js_on_config_update_fun {
110110
return Ok(());
111111
}
@@ -354,13 +354,16 @@ export function onMessage(message) {
354354
let js = r#"
355355
export async function onMessage(message, config) {
356356
const utf8decoder = new TextDecoder();
357-
const encodedText = new Uint8Array([240, 159, 146, 150]);
358-
return [{topic:"decoded", payload: utf8decoder.decode(encodedText)}];
357+
const encodedText = message.raw_payload;
358+
console.log(encodedText);
359+
const decodedText = utf8decoder.decode(encodedText);
360+
console.log(decodedText);
361+
return [{topic:"decoded", payload: decodedText}];
359362
}
360363
"#;
361364
let (runtime, script) = runtime_with(js).await;
362365

363-
let input = Message::new("encoded", "content");
366+
let input = Message::new_binary("encoded", [240, 159, 146, 150]);
364367
let mut output = Message::new("decoded", "💖");
365368
output.timestamp = None;
366369
assert_eq!(
@@ -377,14 +380,16 @@ export async function onMessage(message, config) {
377380
let js = r#"
378381
export async function onMessage(message, config) {
379382
const utf8encoder = new TextEncoder();
383+
console.log(message.payload);
380384
const encodedText = utf8encoder.encode(message.payload);
381-
return [{topic:"encoded", payload: `[${encodedText}]`}];
385+
console.log(encodedText);
386+
return [{topic:"encoded", payload: encodedText}];
382387
}
383388
"#;
384389
let (runtime, script) = runtime_with(js).await;
385390

386391
let input = Message::new("decoded", "💖");
387-
let mut output = Message::new("encoded", "[240,159,146,150]");
392+
let mut output = Message::new_binary("encoded", [240, 159, 146, 150]);
388393
output.timestamp = None;
389394
assert_eq!(
390395
script

crates/extensions/tedge_flows/src/js_value.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,15 @@ impl JsonValue {
5353

5454
impl From<Message> for JsonValue {
5555
fn from(value: Message) -> Self {
56+
let raw_payload = JsonValue::Bytes(value.payload.clone());
57+
let payload = match String::from_utf8(value.payload) {
58+
Ok(utf8) => JsonValue::string(utf8),
59+
Err(_) => JsonValue::Null,
60+
};
5661
JsonValue::object([
5762
("topic", JsonValue::string(value.topic)),
58-
("payload", JsonValue::string(value.payload)),
63+
("payload", payload),
64+
("raw_payload", raw_payload),
5965
("timestamp", JsonValue::option(value.timestamp)),
6066
])
6167
}
@@ -114,8 +120,10 @@ impl TryFrom<BTreeMap<String, JsonValue>> for Message {
114120
let Some(JsonValue::String(topic)) = value.get("topic") else {
115121
return Err(anyhow::anyhow!("Missing message topic").into());
116122
};
117-
let Some(JsonValue::String(payload)) = value.get("payload") else {
118-
return Err(anyhow::anyhow!("Missing message payload").into());
123+
let payload = match value.get("payload") {
124+
Some(JsonValue::String(payload)) => payload.to_owned().into_bytes(),
125+
Some(JsonValue::Bytes(payload)) => payload.to_owned(),
126+
_ => return Err(anyhow::anyhow!("Missing message payload").into()),
119127
};
120128
let timestamp = value
121129
.get("timestamp")
@@ -124,7 +132,7 @@ impl TryFrom<BTreeMap<String, JsonValue>> for Message {
124132

125133
Ok(Message {
126134
topic: topic.to_owned(),
127-
payload: payload.to_owned(),
135+
payload,
128136
timestamp,
129137
})
130138
}
@@ -225,8 +233,8 @@ impl<'js> IntoJs<'js> for JsonValueRef<'_> {
225233
Ok(string.into_value())
226234
}
227235
JsonValue::Bytes(value) => {
228-
let string = rquickjs::TypedArray::new(ctx.clone(), value.clone())?;
229-
Ok(string.into_value())
236+
let bytes = rquickjs::TypedArray::new(ctx.clone(), value.clone())?;
237+
Ok(bytes.into_value())
230238
}
231239
JsonValue::Array(values) => {
232240
let array = rquickjs::Array::new(ctx.clone())?;
@@ -271,6 +279,12 @@ impl JsonValue {
271279
.unwrap_or(JsonValue::Null);
272280
return Ok(js_n);
273281
}
282+
if let Some(object) = value.as_object() {
283+
if let Some(bytes) = object.as_typed_array::<u8>() {
284+
let bytes = bytes.as_bytes().unwrap_or_default().to_vec();
285+
return Ok(JsonValue::Bytes(bytes));
286+
}
287+
}
274288
if let Some(string) = value.as_string() {
275289
return Ok(JsonValue::String(string.to_string()?));
276290
}

0 commit comments

Comments
 (0)