Skip to content

Commit fd59be1

Browse files
committed
Add a --processing-time option to tedge flows test
Signed-off-by: Didier Wenzek <[email protected]>
1 parent 4b14e91 commit fd59be1

File tree

6 files changed

+35
-6
lines changed

6 files changed

+35
-6
lines changed

crates/core/tedge/src/cli/flows/cli.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use anyhow::Error;
99
use camino::Utf8PathBuf;
1010
use tedge_config::TEdgeConfig;
1111
use tedge_flows::BaseFlowRegistry;
12+
use tedge_flows::DateTime;
1213
use tedge_flows::Message;
1314
use tedge_flows::MessageProcessor;
1415

@@ -47,6 +48,11 @@ pub enum TEdgeFlowsCli {
4748
#[clap(long = "final-on-interval")]
4849
final_on_interval: bool,
4950

51+
/// Processing time to be used for the test
52+
#[clap(long = "processing-time")]
53+
#[arg(value_parser = parse_date)]
54+
processing_time: Option<DateTime>,
55+
5056
/// The input payloads are base64 encoded and have to be decoded first
5157
#[clap(long = "base64-input")]
5258
base64_input: bool,
@@ -80,6 +86,7 @@ impl BuildCommand for TEdgeFlowsCli {
8086
flows_dir,
8187
flow,
8288
final_on_interval,
89+
processing_time,
8390
base64_input,
8491
base64_output,
8592
topic,
@@ -97,6 +104,7 @@ impl BuildCommand for TEdgeFlowsCli {
97104
flow,
98105
message,
99106
final_on_interval,
107+
processing_time,
100108
base64_input,
101109
base64_output,
102110
}
@@ -137,3 +145,9 @@ impl TEdgeFlowsCli {
137145
Ok(processor)
138146
}
139147
}
148+
149+
fn parse_date(src: &str) -> Result<DateTime, String> {
150+
let time = humantime::parse_rfc3339_weak(src)
151+
.map_err(|e| format!("Unable to parse RFC3339 date: {e}"))?;
152+
time.try_into()
153+
}

crates/core/tedge/src/cli/flows/test.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub struct TestCommand {
2222
pub flow: Option<Utf8PathBuf>,
2323
pub message: Option<Message>,
2424
pub final_on_interval: bool,
25+
pub processing_time: Option<DateTime>,
2526
pub base64_input: bool,
2627
pub base64_output: bool,
2728
}
@@ -41,13 +42,13 @@ impl Command for TestCommand {
4142
Some(flow) => TEdgeFlowsCli::load_file(&self.flows_dir, flow).await?,
4243
};
4344
if let Some(message) = &self.message {
44-
let timestamp = DateTime::now();
45+
let timestamp = self.processing_time.unwrap_or_else(|| DateTime::now());
4546
self.process(&mut processor, message.clone(), timestamp)
4647
.await;
4748
} else {
4849
let mut stdin = BufReader::new(tokio::io::stdin());
4950
while let Some(message) = next_message(&mut stdin).await {
50-
let timestamp = DateTime::now();
51+
let timestamp = self.processing_time.unwrap_or_else(|| DateTime::now());
5152
self.process(&mut processor, message, timestamp).await;
5253
}
5354
}

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use serde_json::Value;
1010
use std::fmt::Display;
1111
use std::fmt::Formatter;
1212
use std::time::Duration;
13+
use std::time::SystemTime;
1314
use tedge_mqtt_ext::MqttMessage;
1415
use tedge_mqtt_ext::Topic;
1516
use tedge_mqtt_ext::TopicFilter;
@@ -409,6 +410,20 @@ impl DateTime {
409410
}
410411
}
411412

413+
impl TryFrom<SystemTime> for DateTime {
414+
type Error = String;
415+
416+
fn try_from(time: SystemTime) -> Result<Self, Self::Error> {
417+
match time.duration_since(SystemTime::UNIX_EPOCH) {
418+
Ok(elapsed) => Ok(DateTime {
419+
seconds: elapsed.as_secs(),
420+
nanoseconds: 0,
421+
}),
422+
Err(_) => Err("SystemTime before UNIX EPOCH!".to_string()),
423+
}
424+
}
425+
}
426+
412427
impl TryFrom<OffsetDateTime> for DateTime {
413428
type Error = FlowError;
414429

tests/RobotFramework/tests/tedge_flows/flows/add_timestamp.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ export function onMessage (message) {
22
let payload = JSON.parse(message.payload)
33
if (!payload.time) {
44
let timestamp = message.timestamp
5-
payload.time = timestamp.seconds + (timestamp.nanoseconds / 1e9)
5+
payload.time = timestamp.getTime() / 1000;
66
}
77

88
return [{

tests/RobotFramework/tests/tedge_flows/flows/drop_stragglers.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ export function onMessage (message, config) {
1212
}
1313

1414
let timestamp = message.timestamp
15-
let time = timestamp.seconds + (timestamp.nanoseconds / 1e9)
1615
let max = time + (config.max_advance || 1);
1716
let min = time - (config.max_delay || 10);
1817

tests/RobotFramework/tests/tedge_flows/tedge_flows.robot

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ Test Tags theme:tedge_flows
1010

1111
*** Test Cases ***
1212
Add missing timestamps
13-
${transformed_msg} Execute Command tedge flows test te/device/main///m/ '{}'
14-
Should Contain ${transformed_msg} item=time
13+
${transformed_msg} Execute Command tedge flows test --processing-time "2025-06-27 11:31:02" te/device/main///m/ '{}'
14+
Should Contain ${transformed_msg} item="time":"2025-06-27T11:31:02.000Z"
1515

1616
Convert timestamps to ISO
1717
${transformed_msg} Execute Command tedge flows test te/device/main///m/ '{"time": 1751023862.000}'

0 commit comments

Comments
 (0)