Skip to content

Commit 82e42e2

Browse files
Merge pull request #3863 from didier-wenzek/feat/tedge-flows-timestamp
feat: Use JS Date for tedge flows timestamps
2 parents d4a9e3e + 3ffad4a commit 82e42e2

File tree

17 files changed

+160
-176
lines changed

17 files changed

+160
-176
lines changed

crates/core/tedge/src/cli/flows/cli.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use anyhow::anyhow;
77
use anyhow::Context;
88
use anyhow::Error;
99
use camino::Utf8PathBuf;
10+
use std::time::SystemTime;
1011
use tedge_config::TEdgeConfig;
1112
use tedge_flows::BaseFlowRegistry;
1213
use tedge_flows::Message;
@@ -47,6 +48,11 @@ pub enum TEdgeFlowsCli {
4748
#[clap(long = "final-on-interval")]
4849
final_on_interval: bool,
4950

51+
/// Processing time to be used for the test
52+
#[clap(long = "processing-time")]
53+
#[arg(value_parser = humantime::parse_rfc3339_weak)]
54+
processing_time: Option<SystemTime>,
55+
5056
/// The input payloads are base64 encoded and have to be decoded first
5157
#[clap(long = "base64-input")]
5258
base64_input: bool,
@@ -80,6 +86,7 @@ impl BuildCommand for TEdgeFlowsCli {
8086
flows_dir,
8187
flow,
8288
final_on_interval,
89+
processing_time,
8390
base64_input,
8491
base64_output,
8592
topic,
@@ -97,6 +104,7 @@ impl BuildCommand for TEdgeFlowsCli {
97104
flow,
98105
message,
99106
final_on_interval,
107+
processing_time,
100108
base64_input,
101109
base64_output,
102110
}

crates/core/tedge/src/cli/flows/test.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use anyhow::Error;
55
use base64::prelude::BASE64_STANDARD;
66
use base64::prelude::*;
77
use camino::Utf8PathBuf;
8+
use std::time::SystemTime;
89
use tedge_config::TEdgeConfig;
910
use tedge_flows::BaseFlowRegistry;
10-
use tedge_flows::DateTime;
1111
use tedge_flows::FlowResult;
1212
use tedge_flows::Message;
1313
use tedge_flows::MessageProcessor;
@@ -22,6 +22,7 @@ pub struct TestCommand {
2222
pub flow: Option<Utf8PathBuf>,
2323
pub message: Option<Message>,
2424
pub final_on_interval: bool,
25+
pub processing_time: Option<SystemTime>,
2526
pub base64_input: bool,
2627
pub base64_output: bool,
2728
}
@@ -41,18 +42,18 @@ impl Command for TestCommand {
4142
Some(flow) => TEdgeFlowsCli::load_file(&self.flows_dir, flow).await?,
4243
};
4344
if let Some(message) = &self.message {
44-
let timestamp = DateTime::now();
45+
let timestamp = self.processing_time.unwrap_or_else(SystemTime::now);
4546
self.process(&mut processor, message.clone(), timestamp)
4647
.await;
4748
} else {
4849
let mut stdin = BufReader::new(tokio::io::stdin());
4950
while let Some(message) = next_message(&mut stdin).await {
50-
let timestamp = DateTime::now();
51+
let timestamp = self.processing_time.unwrap_or_else(SystemTime::now);
5152
self.process(&mut processor, message, timestamp).await;
5253
}
5354
}
5455
if self.final_on_interval {
55-
let timestamp = DateTime::now();
56+
let timestamp = SystemTime::now();
5657
let now = processor
5758
.last_interval_deadline()
5859
.unwrap_or_else(Instant::now);
@@ -67,7 +68,7 @@ impl TestCommand {
6768
&self,
6869
processor: &mut MessageProcessor<BaseFlowRegistry>,
6970
mut message: Message,
70-
timestamp: DateTime,
71+
timestamp: SystemTime,
7172
) {
7273
if self.base64_input {
7374
match BASE64_STANDARD.decode(message.payload) {
@@ -89,7 +90,7 @@ impl TestCommand {
8990
async fn tick(
9091
&self,
9192
processor: &mut MessageProcessor<BaseFlowRegistry>,
92-
timestamp: DateTime,
93+
timestamp: SystemTime,
9394
now: Instant,
9495
) {
9596
processor

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::connected_flow::ConnectedFlowRegistry;
2-
use crate::flow::DateTime;
32
use crate::flow::FlowError;
43
use crate::flow::FlowOutput;
54
use crate::flow::FlowResult;
@@ -16,6 +15,7 @@ use serde_json::json;
1615
use std::cmp::min;
1716
use std::collections::HashSet;
1817
use std::time::Duration;
18+
use std::time::SystemTime;
1919
use tedge_actors::Actor;
2020
use tedge_actors::DynSender;
2121
use tedge_actors::MessageReceiver;
@@ -207,7 +207,7 @@ impl FlowsMapper {
207207

208208
async fn on_source_poll(&mut self) -> Result<(), RuntimeError> {
209209
let now = Instant::now();
210-
let timestamp = DateTime::now();
210+
let timestamp = SystemTime::now();
211211

212212
let mut in_messages = vec![];
213213
for flow in self.processor.registry.flows_mut() {
@@ -241,7 +241,7 @@ impl FlowsMapper {
241241
source: SourceTag,
242242
message: Message,
243243
) -> Result<(), RuntimeError> {
244-
let timestamp = DateTime::now();
244+
let timestamp = SystemTime::now();
245245
for messages in self
246246
.processor
247247
.on_message(timestamp, &source, &message)
@@ -255,7 +255,7 @@ impl FlowsMapper {
255255

256256
async fn on_interval(&mut self) -> Result<(), RuntimeError> {
257257
let now = Instant::now();
258-
let timestamp = DateTime::now();
258+
let timestamp = SystemTime::now();
259259
if self.next_dump <= now {
260260
self.processor.dump_memory_stats().await;
261261
self.processor.dump_processing_stats().await;
@@ -295,7 +295,7 @@ impl FlowsMapper {
295295
) -> Result<(), RuntimeError> {
296296
if let Some(flow) = self.processor.registry.flow(&flow_name) {
297297
let topic = flow.input_topic().to_string();
298-
let timestamp = DateTime::now();
298+
let timestamp = SystemTime::now();
299299
let message = Message::new(topic, line);
300300
if let Some(result) = self
301301
.processor

crates/extensions/tedge_flows/src/connected_flow.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::config::ConfigError;
2-
use crate::flow::DateTime;
32
use crate::flow::Flow;
43
use crate::flow::FlowError;
54
use crate::flow::FlowInput;
@@ -14,6 +13,7 @@ use crate::input_source::StreamingSource;
1413
use crate::registry::FlowRegistry;
1514
use crate::registry::FlowStore;
1615
use camino::Utf8Path;
16+
use std::time::SystemTime;
1717
use tedge_watch_ext::WatchRequest;
1818
use tokio::time::Instant;
1919

@@ -65,14 +65,14 @@ impl ConnectedFlow {
6565
self.polling_source.as_ref().and_then(|p| p.next_deadline())
6666
}
6767

68-
pub async fn on_source_poll(&mut self, timestamp: DateTime, now: Instant) -> FlowResult {
68+
pub async fn on_source_poll(&mut self, timestamp: SystemTime, now: Instant) -> FlowResult {
6969
let result = self.on_source_poll_steps(timestamp, now).await;
7070
self.flow.publish(result)
7171
}
7272

7373
async fn on_source_poll_steps(
7474
&mut self,
75-
timestamp: DateTime,
75+
timestamp: SystemTime,
7676
now: Instant,
7777
) -> Result<Vec<Message>, FlowError> {
7878
let Some(source) = &mut self.polling_source.as_mut() else {

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 15 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ use serde_json::Value;
1010
use std::fmt::Display;
1111
use std::fmt::Formatter;
1212
use std::time::Duration;
13+
use std::time::SystemTime;
1314
use tedge_mqtt_ext::MqttMessage;
1415
use tedge_mqtt_ext::Topic;
1516
use tedge_mqtt_ext::TopicFilter;
1617
use tedge_watch_ext::WatchError;
17-
use time::OffsetDateTime;
1818
use tokio::time::Instant;
1919
use tracing::error;
2020
use tracing::warn;
@@ -112,17 +112,11 @@ impl FlowResult {
112112
}
113113
}
114114

115-
#[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize, Eq, PartialEq)]
116-
pub struct DateTime {
117-
pub seconds: u64,
118-
pub nanoseconds: u32,
119-
}
120-
121115
#[derive(Clone, serde::Deserialize, serde::Serialize, Eq, PartialEq)]
122116
pub struct Message {
123117
pub topic: String,
124118
pub payload: Vec<u8>,
125-
pub timestamp: Option<DateTime>,
119+
pub timestamp: Option<SystemTime>,
126120
}
127121

128122
#[derive(thiserror::Error, Debug)]
@@ -198,7 +192,7 @@ impl Flow {
198192
&mut self,
199193
js_runtime: &JsRuntime,
200194
stats: &mut Counter,
201-
timestamp: DateTime,
195+
timestamp: SystemTime,
202196
message: &Message,
203197
) -> FlowResult {
204198
let stated_at = stats.flow_on_message_start(self.name());
@@ -218,7 +212,7 @@ impl Flow {
218212
&mut self,
219213
js_runtime: &JsRuntime,
220214
stats: &mut Counter,
221-
timestamp: DateTime,
215+
timestamp: SystemTime,
222216
message: &Message,
223217
) -> Result<Vec<Message>, FlowError> {
224218
let mut messages = vec![message.clone()];
@@ -246,7 +240,7 @@ impl Flow {
246240
&mut self,
247241
js_runtime: &JsRuntime,
248242
stats: &mut Counter,
249-
timestamp: DateTime,
243+
timestamp: SystemTime,
250244
now: Instant,
251245
) -> FlowResult {
252246
let stated_at = stats.flow_on_interval_start(self.name());
@@ -266,7 +260,7 @@ impl Flow {
266260
&mut self,
267261
js_runtime: &JsRuntime,
268262
stats: &mut Counter,
269-
timestamp: DateTime,
263+
timestamp: SystemTime,
270264
now: Instant,
271265
) -> Result<Vec<Message>, FlowError> {
272266
let mut messages = vec![];
@@ -399,36 +393,6 @@ impl FlowStep {
399393
}
400394
}
401395

402-
impl DateTime {
403-
pub fn now() -> Self {
404-
DateTime::try_from(OffsetDateTime::now_utc()).unwrap()
405-
}
406-
407-
pub fn tick_now(&self, tick_every: std::time::Duration) -> bool {
408-
let tick_every_secs = tick_every.as_secs();
409-
tick_every_secs != 0 && (self.seconds % tick_every_secs == 0)
410-
}
411-
412-
pub fn json(&self) -> Value {
413-
json!({"seconds": self.seconds, "nanoseconds": self.nanoseconds})
414-
}
415-
}
416-
417-
impl TryFrom<OffsetDateTime> for DateTime {
418-
type Error = FlowError;
419-
420-
fn try_from(value: OffsetDateTime) -> Result<Self, Self::Error> {
421-
let seconds = u64::try_from(value.unix_timestamp()).map_err(|err| {
422-
FlowError::UnsupportedMessage(format!("failed to convert timestamp: {}", err))
423-
})?;
424-
425-
Ok(DateTime {
426-
seconds,
427-
nanoseconds: value.nanosecond(),
428-
})
429-
}
430-
}
431-
432396
impl Message {
433397
pub fn new(topic: impl ToString, payload: impl Into<Vec<u8>>) -> Self {
434398
Message {
@@ -441,7 +405,7 @@ impl Message {
441405
pub fn with_timestamp(
442406
topic: impl ToString,
443407
payload: impl Into<Vec<u8>>,
444-
timestamp: DateTime,
408+
timestamp: SystemTime,
445409
) -> Self {
446410
Message {
447411
topic: topic.to_string(),
@@ -450,15 +414,9 @@ impl Message {
450414
}
451415
}
452416

453-
#[cfg(test)]
454-
pub fn sent_now(mut self) -> Self {
455-
self.timestamp = Some(DateTime::now());
456-
self
457-
}
458-
459417
pub fn json(&self) -> Value {
460418
if let Some(timestamp) = &self.timestamp {
461-
json!({"topic": self.topic, "payload": self.payload, "timestamp": timestamp.json()})
419+
json!({"topic": self.topic, "payload": self.payload, "timestamp": epoch_ms(timestamp)})
462420
} else {
463421
json!({"topic": self.topic, "payload": self.payload, "timestamp": null})
464422
}
@@ -508,6 +466,13 @@ impl TryFrom<Message> for MqttMessage {
508466
}
509467
}
510468

469+
pub(crate) fn epoch_ms(time: &SystemTime) -> u128 {
470+
let duration = time
471+
.duration_since(SystemTime::UNIX_EPOCH)
472+
.expect("SystemTime after UNIX EPOCH");
473+
duration.as_millis()
474+
}
475+
511476
pub fn error_from_js(err: LoadError) -> FlowError {
512477
FlowError::IncorrectSetting(format!("{err:#}"))
513478
}

crates/extensions/tedge_flows/src/input_source.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use crate::flow::DateTime;
21
use crate::flow::Message;
32
use async_trait::async_trait;
43
use camino::Utf8PathBuf;
54
use std::time::Duration;
5+
use std::time::SystemTime;
66
use tedge_watch_ext::WatchRequest;
77
use tokio::time::Instant;
88

@@ -16,7 +16,7 @@ pub trait StreamingSource: Send + Sync {
1616
#[async_trait]
1717
pub trait PollingSource: Send + Sync {
1818
/// Poll the source for any available messages at the given timestamp
19-
async fn poll(&mut self, timestamp: DateTime) -> Result<Vec<Message>, PollingSourceError>;
19+
async fn poll(&mut self, timestamp: SystemTime) -> Result<Vec<Message>, PollingSourceError>;
2020

2121
/// Get the next deadline when this source should be polled
2222
/// Returns None if the source doesn't have scheduled polling
@@ -53,7 +53,7 @@ impl CommandPollingSource {
5353

5454
#[async_trait]
5555
impl PollingSource for CommandPollingSource {
56-
async fn poll(&mut self, timestamp: DateTime) -> Result<Vec<Message>, PollingSourceError> {
56+
async fn poll(&mut self, timestamp: SystemTime) -> Result<Vec<Message>, PollingSourceError> {
5757
let output = tedge_watch_ext::command_output(&self.command)
5858
.await
5959
.map_err(|err| PollingSourceError::CannotPoll {
@@ -119,7 +119,7 @@ impl FilePollingSource {
119119

120120
#[async_trait]
121121
impl PollingSource for FilePollingSource {
122-
async fn poll(&mut self, timestamp: DateTime) -> Result<Vec<Message>, PollingSourceError> {
122+
async fn poll(&mut self, timestamp: SystemTime) -> Result<Vec<Message>, PollingSourceError> {
123123
let output = tokio::fs::read_to_string(&self.path).await.map_err(|err| {
124124
PollingSourceError::CannotPoll {
125125
resource: self.path.clone().to_string(),

0 commit comments

Comments
 (0)