Skip to content

Commit e513fcc

Browse files
committed
Add base64 encoding support to tedge flows test
Signed-off-by: Didier Wenzek <[email protected]>
1 parent 14f42f5 commit e513fcc

File tree

6 files changed

+89
-34
lines changed

6 files changed

+89
-34
lines changed

crates/core/tedge/src/cli/flows/cli.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::cli::flows::decode_message;
12
use crate::cli::flows::list::ListCommand;
23
use crate::cli::flows::test::TestCommand;
34
use crate::command::BuildCommand;
@@ -8,7 +9,6 @@ use anyhow::Context;
89
use anyhow::Error;
910
use std::path::PathBuf;
1011
use tedge_config::TEdgeConfig;
11-
use tedge_flows::flow::Message;
1212
use tedge_flows::MessageProcessor;
1313

1414
#[derive(clap::Subcommand, Debug)]
@@ -46,6 +46,14 @@ pub enum TEdgeFlowsCli {
4646
#[clap(long = "final-on-interval")]
4747
final_on_interval: bool,
4848

49+
/// The input payloads are base64 encoded and have to be decoded first
50+
#[clap(long = "base64-input")]
51+
base64_input: bool,
52+
53+
/// The output payloads have to be base64 encoded before being displayed
54+
#[clap(long = "base64-output")]
55+
base64_output: bool,
56+
4957
/// Topic of the message sample
5058
///
5159
/// If none is provided, messages are read from stdin expecting a line per message:
@@ -71,16 +79,16 @@ impl BuildCommand for TEdgeFlowsCli {
7179
flows_dir,
7280
flow,
7381
final_on_interval,
82+
base64_input,
83+
base64_output,
7484
topic,
7585
payload,
7686
} => {
7787
let flows_dir = flows_dir.unwrap_or_else(|| Self::default_flows_dir(config));
7888
let message = match (topic, payload) {
79-
(Some(topic), Some(payload)) => Some(Message {
80-
topic,
81-
payload: payload.into_bytes(),
82-
timestamp: None,
83-
}),
89+
(Some(topic), Some(payload)) => {
90+
Some(decode_message(topic, payload, base64_input)?)
91+
}
8492
(Some(_), None) => Err(anyhow!("Missing sample payload"))?,
8593
(None, Some(_)) => Err(anyhow!("Missing sample topic"))?,
8694
(None, None) => None,
@@ -90,6 +98,8 @@ impl BuildCommand for TEdgeFlowsCli {
9098
flow,
9199
message,
92100
final_on_interval,
101+
base64_input,
102+
base64_output,
93103
}
94104
.into_boxed())
95105
}

crates/core/tedge/src/cli/flows/mod.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,27 @@ mod cli;
22
mod list;
33
mod test;
44

5+
use base64::prelude::*;
56
pub use cli::TEdgeFlowsCli;
7+
use tedge_flows::flow::Message;
8+
9+
fn decode_message(
10+
topic: String,
11+
payload: String,
12+
base64_payload: bool,
13+
) -> Result<Message, anyhow::Error> {
14+
let payload = if base64_payload {
15+
BASE64_STANDARD.decode(payload.as_bytes())?
16+
} else {
17+
payload.into_bytes()
18+
};
19+
Ok(Message::new(topic, payload))
20+
}
21+
22+
fn encode_message(mut message: Message, base64_payload: bool) -> Message {
23+
if base64_payload {
24+
message.payload = BASE64_STANDARD.encode(message.payload).into_bytes();
25+
};
26+
27+
message
28+
}

crates/core/tedge/src/cli/flows/test.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use crate::cli::flows::decode_message;
2+
use crate::cli::flows::encode_message;
13
use crate::cli::flows::TEdgeFlowsCli;
24
use crate::command::Command;
35
use crate::log::MaybeFancy;
@@ -15,6 +17,8 @@ pub struct TestCommand {
1517
pub flow: Option<PathBuf>,
1618
pub message: Option<Message>,
1719
pub final_on_interval: bool,
20+
pub base64_input: bool,
21+
pub base64_output: bool,
1822
}
1923

2024
#[async_trait::async_trait]
@@ -36,7 +40,7 @@ impl Command for TestCommand {
3640
self.process(&mut processor, message, &timestamp).await;
3741
} else {
3842
let mut stdin = BufReader::new(tokio::io::stdin());
39-
while let Some(message) = next_message(&mut stdin).await {
43+
while let Some(message) = next_message(self.base64_input, &mut stdin).await {
4044
let timestamp = DateTime::now();
4145
self.process(&mut processor, &message, &timestamp).await;
4246
}
@@ -61,7 +65,7 @@ impl TestCommand {
6165
.await
6266
.into_iter()
6367
.map(|(_, v)| v)
64-
.for_each(print)
68+
.for_each(|msg| print(self.base64_output, msg))
6569
}
6670

6771
async fn tick(&self, processor: &mut MessageProcessor, timestamp: &DateTime) {
@@ -70,15 +74,15 @@ impl TestCommand {
7074
.await
7175
.into_iter()
7276
.map(|(_, v)| v)
73-
.for_each(print)
77+
.for_each(|msg| print(self.base64_output, msg))
7478
}
7579
}
7680

77-
fn print(messages: Result<Vec<Message>, FlowError>) {
81+
fn print(base64_output: bool, messages: Result<Vec<Message>, FlowError>) {
7882
match messages {
7983
Ok(messages) => {
8084
for message in messages {
81-
println!("{message}");
85+
println!("{}", encode_message(message, base64_output));
8286
}
8387
}
8488
Err(err) => {
@@ -87,7 +91,7 @@ fn print(messages: Result<Vec<Message>, FlowError>) {
8791
}
8892
}
8993

90-
fn parse(line: String) -> Result<Option<Message>, Error> {
94+
fn parse(base64_input: bool, line: String) -> Result<Option<Message>, Error> {
9195
let line = line.trim();
9296
if line.is_empty() {
9397
return Ok(None);
@@ -101,12 +105,9 @@ fn parse(line: String) -> Result<Option<Message>, Error> {
101105

102106
let topic = line[1..closing_bracket].to_string();
103107
let payload = line[closing_bracket + 1..].to_string();
108+
let message = decode_message(topic, payload, base64_input)?;
104109

105-
Ok(Some(Message {
106-
topic,
107-
payload: payload.into_bytes(),
108-
timestamp: None,
109-
}))
110+
Ok(Some(message))
110111
}
111112

112113
async fn next_line(input: &mut BufReader<Stdin>) -> Option<String> {
@@ -127,9 +128,9 @@ async fn next_line(input: &mut BufReader<Stdin>) -> Option<String> {
127128
}
128129
}
129130
}
130-
async fn next_message(input: &mut BufReader<Stdin>) -> Option<Message> {
131+
async fn next_message(base64_input: bool, input: &mut BufReader<Stdin>) -> Option<Message> {
131132
let line = next_line(input).await?;
132-
match parse(line) {
133+
match parse(base64_input, line) {
133134
Ok(message) => message,
134135
Err(err) => {
135136
eprintln!("Fail to parse input message {}", err);

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -218,21 +218,26 @@ impl TryFrom<OffsetDateTime> for DateTime {
218218
}
219219

220220
impl Message {
221-
#[cfg(test)]
222-
pub(crate) fn new(topic: &str, payload: &str) -> Self {
221+
pub fn new(topic: impl ToString, payload: impl Into<Vec<u8>>) -> Self {
223222
Message {
224223
topic: topic.to_string(),
225-
payload: payload.to_string().into_bytes(),
226-
timestamp: Some(DateTime::now()),
224+
payload: payload.into(),
225+
timestamp: None,
227226
}
228227
}
229228

229+
#[cfg(test)]
230+
pub fn sent_now(mut self) -> Self {
231+
self.timestamp = Some(DateTime::now());
232+
self
233+
}
234+
230235
#[cfg(test)]
231236
pub(crate) fn new_binary(topic: &str, payload: impl Into<Vec<u8>>) -> Self {
232237
Message {
233238
topic: topic.to_string(),
234239
payload: payload.into(),
235-
timestamp: Some(DateTime::now()),
240+
timestamp: None,
236241
}
237242
}
238243

crates/extensions/tedge_flows/src/js_script.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ mod tests {
155155
let js = "export function onMessage(msg) { return [msg]; };";
156156
let (runtime, script) = runtime_with(js).await;
157157

158-
let input = Message::new("te/main/device///m/", "hello world");
158+
let input = Message::new("te/main/device///m/", "hello world").sent_now();
159159
let output = input.clone();
160160
assert_eq!(
161161
script
@@ -171,7 +171,7 @@ mod tests {
171171
let js = "export function onMessage(msg) { return msg; };";
172172
let (runtime, script) = runtime_with(js).await;
173173

174-
let input = Message::new("te/main/device///m/", "hello world");
174+
let input = Message::new("te/main/device///m/", "hello world").sent_now();
175175
let output = input.clone();
176176
assert_eq!(
177177
script
@@ -255,11 +255,10 @@ export function onMessage(message, config) {
255255
"collectd/h/memory/percent-used",
256256
"1748440192.104:19.9289468288182",
257257
);
258-
let mut output = Message::new(
258+
let output = Message::new(
259259
"te/device/main///m/collectd",
260260
r#"{"time": 1748440192.104, "memory": {"percent-used": 19.9289468288182}}"#,
261261
);
262-
output.timestamp = None;
263262
assert_eq!(
264263
script
265264
.on_message(&runtime, &DateTime::now(), &input)
@@ -279,8 +278,7 @@ export async function onMessage(message, config) {
279278
let (runtime, script) = runtime_with(js).await;
280279

281280
let input = Message::new("dummy", "content");
282-
let mut output = Message::new("foo/bar", r#"{foo:"bar"}"#);
283-
output.timestamp = None;
281+
let output = Message::new("foo/bar", r#"{foo:"bar"}"#);
284282
assert_eq!(
285283
script
286284
.on_message(&runtime, &DateTime::now(), &input)
@@ -364,8 +362,7 @@ export async function onMessage(message, config) {
364362
let (runtime, script) = runtime_with(js).await;
365363

366364
let input = Message::new_binary("encoded", [240, 159, 146, 150]);
367-
let mut output = Message::new("decoded", "💖");
368-
output.timestamp = None;
365+
let output = Message::new("decoded", "💖");
369366
assert_eq!(
370367
script
371368
.on_message(&runtime, &DateTime::now(), &input)
@@ -389,8 +386,7 @@ export async function onMessage(message, config) {
389386
let (runtime, script) = runtime_with(js).await;
390387

391388
let input = Message::new("decoded", "💖");
392-
let mut output = Message::new_binary("encoded", [240, 159, 146, 150]);
393-
output.timestamp = None;
389+
let output = Message::new_binary("encoded", [240, 159, 146, 150]);
394390
assert_eq!(
395391
script
396392
.on_message(&runtime, &DateTime::now(), &input)

tests/RobotFramework/tests/tedge_flows/tedge_flows.robot

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,26 @@ Translate complex tedge json to c8y json
3838
... ${transformed_msg}
3939
... [c8y/measurement/measurements/create] {"type":"environment","time":"2025-06-27T08:11:05.301804125Z","temperature":{"temperature":258},"location":{"latitude":32.54,"longitude":-117.67,"altitude":98.6},"pressure":{"pressure":98}}
4040

41+
Using base64 to encode tedge flows input
42+
${encoded_input} Execute Command
43+
... echo -n '{"time":"2025-06-27T08:11:05.301804125Z", "temperature": 258}' | base64 --wrap\=0
44+
${transformed_msg} Execute Command
45+
... tedge flows test --base64-input te/device/main///m/env ${encoded_input}
46+
... strip=True
47+
Should Be Equal
48+
... ${transformed_msg}
49+
... [c8y/measurement/measurements/create] {"type":"env","time":"2025-06-27T08:11:05.301804125Z","temperature":{"temperature":258}}
50+
51+
Using base64 to encode tedge flows output
52+
${encoded_output} Execute Command
53+
... echo -n '{"type":"env","time":"2025-06-27T08:11:05.301804125Z","temperature":{"temperature":258}}' | base64 --wrap\=0
54+
${transformed_msg} Execute Command
55+
... tedge flows test --base64-output te/device/main///m/env '{"time":"2025-06-27T08:11:05.301804125Z", "temperature": 258}'
56+
... strip=True
57+
Should Be Equal
58+
... ${transformed_msg}
59+
... [c8y/measurement/measurements/create] ${encoded_output}
60+
4161
Units are configured using topic metadata
4262
${transformed_msg} Execute Command
4363
... cat /etc/tedge/flows/measurements.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test

0 commit comments

Comments
 (0)