Skip to content

Commit 227a4ee

Browse files
Merge pull request #4001 from didier-wenzek/refactor/deprecate-flow-message-string-payload
breaking change: message payloads are passed as binary to flow scripts
2 parents 714f374 + aeb125c commit 227a4ee

File tree

17 files changed

+86
-48
lines changed

17 files changed

+86
-48
lines changed

crates/extensions/tedge_flows/src/js_script.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,10 @@ mod tests {
184184
async fn collectd_script() {
185185
let js = r#"
186186
export function onMessage(message, context) {
187-
const { topic = "topic/not/set" } = context.config;
187+
const { topic = "topic/not/set" } = context.config
188+
const td = new globalThis.TextDecoder()
188189
let groups = message.topic.split( '/')
189-
let data = message.payload.split(':')
190+
let data = td.decode(message.payload).split(':')
190191
191192
let group = groups[2]
192193
let measurement = groups[3]
@@ -335,7 +336,7 @@ export function onMessage(message) {
335336
let js = r#"
336337
export async function onMessage(message) {
337338
const utf8decoder = new TextDecoder();
338-
const encodedText = message.raw_payload;
339+
const encodedText = message.payload;
339340
console.log(encodedText);
340341
const decodedText = utf8decoder.decode(encodedText);
341342
console.log(decodedText);
@@ -360,8 +361,10 @@ export async function onMessage(message) {
360361
let js = r#"
361362
export async function onMessage(message) {
362363
const utf8encoder = new TextEncoder();
363-
console.log(message.payload);
364-
const encodedText = utf8encoder.encode(message.payload);
364+
const utf8decoder = new TextDecoder();
365+
const payload = utf8decoder.decode(message.payload);
366+
console.log(payload);
367+
const encodedText = utf8encoder.encode(payload);
365368
console.log(encodedText);
366369
return [{topic:"encoded", payload: encodedText}];
367370
}
@@ -382,10 +385,11 @@ export async function onMessage(message) {
382385
#[tokio::test]
383386
async fn decode_utf8_with_bom_and_invalid_chars() {
384387
let js = r#"
388+
const utf8 = new TextDecoder();
389+
385390
export async function onMessage(message) {
386-
const utf8decoder = new TextDecoder();
387-
const encodedText = message.raw_payload;
388-
const decodedText = utf8decoder.decode(encodedText);
391+
const encodedText = message.payload;
392+
const decodedText = utf8.decode(encodedText);
389393
return [{topic:"decoded", payload: decodedText}];
390394
}
391395
"#;
@@ -408,10 +412,12 @@ export async function onMessage(message) {
408412
let js = r#"
409413
export async function onMessage(message) {
410414
const utf8encoder = new TextEncoder();
415+
const utf8decoder = new TextDecoder();
416+
const payload = utf8decoder.decode(message.payload);
411417
const u8array = new Uint8Array(8);
412-
const result = utf8encoder.encodeInto(message.payload, u8array);
418+
const result = utf8encoder.encodeInto(payload, u8array);
413419
console.log(result);
414-
utf8encoder.encodeInto(message.payload, u8array.subarray(4));
420+
utf8encoder.encodeInto(payload, u8array.subarray(4));
415421
return [{topic:"encoded", payload: u8array}];
416422
}
417423
"#;
@@ -435,7 +441,7 @@ export async function onMessage(message) {
435441
const te = new globalThis.TextEncoder();
436442
const td = new globalThis.TextDecoder();
437443
438-
const encodedText = message.raw_payload;
444+
const encodedText = message.payload;
439445
const decodedText = td.decode(encodedText);
440446
const finalPayload = te.encode(decodedText + decodedText);
441447
return [{topic:"decoded", payload: finalPayload}];
@@ -458,7 +464,7 @@ export async function onMessage(message) {
458464
async fn reading_raw_integers() {
459465
let js = r#"
460466
export async function onMessage(message) {
461-
const measurements = new Uint32Array(message.raw_payload.buffer);
467+
const measurements = new Uint32Array(message.payload.buffer);
462468
463469
const tedge_json = {
464470
time: measurements[0],

crates/extensions/tedge_flows/src/js_value.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,14 +115,10 @@ impl JsonValue {
115115
impl From<Message> for JsonValue {
116116
fn from(value: Message) -> Self {
117117
let raw_payload = JsonValue::Bytes(value.payload.clone());
118-
let payload = match String::from_utf8(value.payload) {
119-
Ok(utf8) => JsonValue::string(utf8),
120-
Err(_) => JsonValue::Null,
121-
};
118+
122119
JsonValue::object([
123120
("topic", JsonValue::string(value.topic)),
124-
("payload", payload),
125-
("raw_payload", raw_payload),
121+
("payload", raw_payload),
126122
("time", JsonValue::option(value.timestamp)),
127123
])
128124
}

docs/src/references/mappers/flows.md

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,30 @@ interface FlowStep {
7474
}
7575
```
7676

77-
A message contains the message topic and payload as well as a processing timestamp.
78-
The bytes of the raw message payload are also accessible as an array of unsigned bytes:
77+
A message has three attributes: a `topic`, a `payload` and a processing timestamp.
7978

8079
```ts
8180
type Message = {
8281
topic: string,
83-
payload: string,
84-
raw_payload: Uint8Array,
82+
payload: Uint8Array,
8583
time: Date
8684
}
8785
```
8886
87+
:::note
88+
The message `payload` is an array of unsigned bytes that has to be explicitly converted to a string when appropriate:
89+
90+
```js
91+
const utf8 = new TextDecoder();
92+
93+
export function onMessage(message) {
94+
let string_payload = utf8.decode(message.payload)
95+
let json_payload = JSON.parse(string_payload)
96+
// ..
97+
}
98+
```
99+
:::
100+
89101
### Context
90102

91103
The `context` object passed to `onMessage()` and `onInterval()` gives scripts and flows a way to share data.
@@ -126,9 +138,9 @@ The `context.config` is an object freely defined by the step module, to provide
126138
127139
The `onMessage` function is called for each message to be transformed
128140
- The arguments passed to the function are:
129-
- The message `{ topic: string, payload: string, raw_payload: Uint8Array, time: Date }`
141+
- The message `{ topic: string, payload: Uint8Array, time: Date }`
130142
- A context object with the config and state
131-
- The function is expected to return zero, one or many transformed messages `[{ topic: string, payload: string }]`
143+
- The function is expected to return zero, one or many transformed messages `[{ topic: string, payload: Uint8Array | string }]`
132144
- An exception can be thrown if the input message cannot be transformed.
133145
134146
A flow script can also export a `onInterval` function
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1+
const utf8 = new TextDecoder()
2+
13
export function onMessage(message, context) {
24
let topic = message.topic.split( '/')
35
if (topic.length < 5) {
46
throw new Error("Not a thin-edge entity registration");
57
}
68
let entity_id = `${topic[1]}/${topic[2]}/${topic[3]}/${topic[4]}`
79

8-
let entity = JSON.parse(message.payload)
10+
let entity = JSON.parse(utf8.decode(message.payload))
911
context.mapper.set(entity_id, entity)
1012

1113
console.log("Entity metadata", entity_id, entity)
1214
return [{
1315
topic: "te/infos",
1416
payload: `New entity: ${entity.name}`
1517
}]
16-
}
18+
}

tests/RobotFramework/tests/azure/custom-az-v2.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
const utf8 = new TextDecoder()
2+
13
export function onMessage(message, context) {
24
let topic = message.topic.split('/')
35
let mea_type = topic[6];
46

5-
let mea = JSON.parse(message.payload)
7+
let mea = JSON.parse(utf8.decode(message.payload))
68
if (mea_type) {
79
mea.type = mea_type
810
}
@@ -21,4 +23,4 @@ export function onMessage(message, context) {
2123
topic: message.topic,
2224
payload: JSON.stringify(mea)
2325
}]
24-
}
26+
}
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
const utf8 = new TextDecoder()
2+
13
export function onMessage(message) {
24
let topic = message.topic.split('/')
35
let mea_type = topic[6];
46

5-
let mea = JSON.parse(message.payload)
7+
let mea = JSON.parse(utf8.decode(message.payload))
68
if (mea_type) {
79
mea.type = mea_type
810
}
@@ -11,4 +13,4 @@ export function onMessage(message) {
1113
topic: message.topic,
1214
payload: JSON.stringify(mea)
1315
}]
14-
}
16+
}

tests/RobotFramework/tests/cumulocity/flows/collectd.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
const utf8 = new TextDecoder()
2+
13
export function onMessage(message, context) {
24
const { topic = "te/device/main///m/collectd" } = context.config;
35
let groups = message.topic.split('/')
4-
let data = message.payload.split(':')
6+
let data = utf8.decode(message.payload).split(':')
57

68
if (groups.length < 4) {
79
throw new Error("Not a collectd topic");
@@ -20,4 +22,4 @@ export function onMessage(message, context) {
2022
topic: topic,
2123
payload: `{"time": ${time}, "${group}": {"${measurement}": ${value}}}`
2224
}]
23-
}
25+
}

tests/RobotFramework/tests/cumulocity/flows/custom-measurements.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
const utf8 = new TextDecoder()
2+
13
export function onMessage(message, context) {
24
const topic = message.topic.split('/');
35
const name = topic[6];
4-
const value = JSON.parse(message.payload);
6+
const value = JSON.parse(utf8.decode(message.payload));
57
if (!Number.isFinite(value)) {
68
throw new Error("Invalid payload. Only numbers are accepted");
79
}
@@ -31,4 +33,4 @@ export function onMessage(message, context) {
3133
topic: "c8y/measurement/measurements/create",
3234
payload: JSON.stringify(c8y_msg),
3335
}]
34-
}
36+
}

tests/RobotFramework/tests/tedge_flows/flows/average.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ class State {
77
static agg_for_topic = {}
88
}
99

10+
const utf8 = new TextDecoder();
11+
1012
export function onMessage (message) {
1113
let topic = message.topic
12-
let payload = JSON.parse(message.payload)
14+
let payload = JSON.parse(utf8.decode(message.payload))
1315
let agg_payload = State.agg_for_topic[topic]
1416
if (agg_payload) {
1517
for (let [k, v] of Object.entries(payload)) {
@@ -105,4 +107,4 @@ export function onInterval() {
105107

106108
State.agg_for_topic = {}
107109
return messages
108-
}
110+
}

tests/RobotFramework/tests/tedge_flows/flows/collectd-to-te.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
const utf8 = new TextDecoder();
2+
13
export function onMessage(message, context) {
24
const { topic = "te/device/main///m/collectd" } = context.config;
35
let groups = message.topic.split('/')
4-
let data = message.payload.split(':')
6+
let data = utf8.decode(message.payload).split(':')
57

68
if (groups.length < 4) {
79
throw new Error("Not a collectd topic");
@@ -20,4 +22,4 @@ export function onMessage(message, context) {
2022
topic: topic,
2123
payload: `{"time": ${time}, "${group}": {"${measurement}": ${value}}}`
2224
}]
23-
}
25+
}

0 commit comments

Comments
 (0)