Skip to content

Commit 33fd1a9

Browse files
committed
fixup! Add base64 encoding support to tedge flows test
1 parent 93014c5 commit 33fd1a9

File tree

3 files changed

+40
-52
lines changed

3 files changed

+40
-52
lines changed

crates/core/tedge/src/cli/flows/cli.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::cli::flows::decode_message;
21
use crate::cli::flows::list::ListCommand;
32
use crate::cli::flows::test::TestCommand;
43
use crate::command::BuildCommand;
@@ -9,6 +8,7 @@ use anyhow::Context;
98
use anyhow::Error;
109
use std::path::PathBuf;
1110
use tedge_config::TEdgeConfig;
11+
use tedge_flows::flow::Message;
1212
use tedge_flows::MessageProcessor;
1313

1414
#[derive(clap::Subcommand, Debug)]
@@ -86,9 +86,7 @@ impl BuildCommand for TEdgeFlowsCli {
8686
} => {
8787
let flows_dir = flows_dir.unwrap_or_else(|| Self::default_flows_dir(config));
8888
let message = match (topic, payload) {
89-
(Some(topic), Some(payload)) => {
90-
Some(decode_message(topic, payload, base64_input)?)
91-
}
89+
(Some(topic), Some(payload)) => Some(Message::new(topic, payload)),
9290
(Some(_), None) => Err(anyhow!("Missing sample payload"))?,
9391
(None, Some(_)) => Err(anyhow!("Missing sample topic"))?,
9492
(None, None) => None,

crates/core/tedge/src/cli/flows/mod.rs

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,4 @@ mod cli;
22
mod list;
33
mod test;
44

5-
use base64::prelude::*;
65
pub use cli::TEdgeFlowsCli;
7-
use tedge_flows::flow::Message;
8-
9-
fn decode_message(
10-
topic: String,
11-
payload: String,
12-
base64_payload: bool,
13-
) -> Result<Message, anyhow::Error> {
14-
let payload = if base64_payload {
15-
BASE64_STANDARD.decode(payload.as_bytes())?
16-
} else {
17-
payload.into_bytes()
18-
};
19-
Ok(Message::new(topic, payload))
20-
}
21-
22-
fn encode_message(mut message: Message, base64_payload: bool) -> Message {
23-
if base64_payload {
24-
message.payload = BASE64_STANDARD.encode(message.payload).into_bytes();
25-
};
26-
27-
message
28-
}

crates/core/tedge/src/cli/flows/test.rs

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use crate::cli::flows::decode_message;
2-
use crate::cli::flows::encode_message;
31
use crate::cli::flows::TEdgeFlowsCli;
42
use crate::command::Command;
53
use crate::log::MaybeFancy;
64
use anyhow::Error;
5+
use base64::prelude::BASE64_STANDARD;
6+
use base64::prelude::*;
77
use std::path::PathBuf;
88
use tedge_config::TEdgeConfig;
99
use tedge_flows::flow::*;
@@ -37,12 +37,13 @@ impl Command for TestCommand {
3737
};
3838
if let Some(message) = &self.message {
3939
let timestamp = DateTime::now();
40-
self.process(&mut processor, message, &timestamp).await;
40+
self.process(&mut processor, message.clone(), &timestamp)
41+
.await;
4142
} else {
4243
let mut stdin = BufReader::new(tokio::io::stdin());
43-
while let Some(message) = next_message(self.base64_input, &mut stdin).await {
44+
while let Some(message) = next_message(&mut stdin).await {
4445
let timestamp = DateTime::now();
45-
self.process(&mut processor, &message, &timestamp).await;
46+
self.process(&mut processor, message, &timestamp).await;
4647
}
4748
}
4849
if self.final_on_interval {
@@ -57,41 +58,52 @@ impl TestCommand {
5758
async fn process(
5859
&self,
5960
processor: &mut MessageProcessor,
60-
message: &Message,
61+
mut message: Message,
6162
timestamp: &DateTime,
6263
) {
64+
if self.base64_input {
65+
match BASE64_STANDARD.decode(message.payload) {
66+
Ok(decoded) => message.payload = decoded,
67+
Err(err) => {
68+
tracing::error!("Cannot decode message: {}", err);
69+
return;
70+
}
71+
}
72+
};
6373
processor
64-
.on_message(timestamp, message)
74+
.on_message(timestamp, &message)
6575
.await
6676
.into_iter()
67-
.map(|(_, v)| v)
68-
.for_each(|msg| print(self.base64_output, msg))
77+
.for_each(|msg| self.print_messages(msg))
6978
}
7079

7180
async fn tick(&self, processor: &mut MessageProcessor, timestamp: &DateTime) {
7281
processor
7382
.on_interval(timestamp)
7483
.await
7584
.into_iter()
76-
.map(|(_, v)| v)
77-
.for_each(|msg| print(self.base64_output, msg))
85+
.for_each(|msg| self.print_messages(msg))
7886
}
79-
}
8087

81-
fn print(base64_output: bool, messages: Result<Vec<Message>, FlowError>) {
82-
match messages {
83-
Ok(messages) => {
84-
for message in messages {
85-
println!("{}", encode_message(message, base64_output));
88+
fn print_messages(&self, (flow, messages): (String, Result<Vec<Message>, FlowError>)) {
89+
match messages {
90+
Ok(mut messages) => {
91+
if self.base64_output {
92+
for message in messages.iter_mut() {
93+
message.payload = BASE64_STANDARD.encode(&message.payload).into_bytes();
94+
}
95+
}
96+
for message in messages {
97+
println!("{}", message);
98+
}
99+
}
100+
Err(err) => {
101+
tracing::error!("Error in {flow}: {}", err)
86102
}
87-
}
88-
Err(err) => {
89-
eprintln!("Error: {}", err)
90103
}
91104
}
92105
}
93-
94-
fn parse(base64_input: bool, line: String) -> Result<Option<Message>, Error> {
106+
fn parse_input(line: String) -> Result<Option<Message>, Error> {
95107
let line = line.trim();
96108
if line.is_empty() {
97109
return Ok(None);
@@ -105,7 +117,7 @@ fn parse(base64_input: bool, line: String) -> Result<Option<Message>, Error> {
105117

106118
let topic = line[1..closing_bracket].to_string();
107119
let payload = line[closing_bracket + 1..].to_string();
108-
let message = decode_message(topic, payload, base64_input)?;
120+
let message = Message::new(topic, payload);
109121

110122
Ok(Some(message))
111123
}
@@ -128,9 +140,10 @@ async fn next_line(input: &mut BufReader<Stdin>) -> Option<String> {
128140
}
129141
}
130142
}
131-
async fn next_message(base64_input: bool, input: &mut BufReader<Stdin>) -> Option<Message> {
143+
144+
async fn next_message(input: &mut BufReader<Stdin>) -> Option<Message> {
132145
let line = next_line(input).await?;
133-
match parse(base64_input, line) {
146+
match parse_input(line) {
134147
Ok(message) => message,
135148
Err(err) => {
136149
eprintln!("Fail to parse input message {}", err);

0 commit comments

Comments
 (0)