Skip to content

Commit bb5f526

Browse files
committed
Deprecate adhoc DateTime in favor of std SystemTime
Signed-off-by: Didier Wenzek <[email protected]>
1 parent fd59be1 commit bb5f526

File tree

9 files changed

+75
-122
lines changed

9 files changed

+75
-122
lines changed

crates/core/tedge/src/cli/flows/cli.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ use anyhow::anyhow;
77
use anyhow::Context;
88
use anyhow::Error;
99
use camino::Utf8PathBuf;
10+
use std::time::SystemTime;
1011
use tedge_config::TEdgeConfig;
1112
use tedge_flows::BaseFlowRegistry;
12-
use tedge_flows::DateTime;
1313
use tedge_flows::Message;
1414
use tedge_flows::MessageProcessor;
1515

@@ -50,8 +50,8 @@ pub enum TEdgeFlowsCli {
5050

5151
/// Processing time to be used for the test
5252
#[clap(long = "processing-time")]
53-
#[arg(value_parser = parse_date)]
54-
processing_time: Option<DateTime>,
53+
#[arg(value_parser = humantime::parse_rfc3339_weak)]
54+
processing_time: Option<SystemTime>,
5555

5656
/// The input payloads are base64 encoded and have to be decoded first
5757
#[clap(long = "base64-input")]
@@ -145,9 +145,3 @@ impl TEdgeFlowsCli {
145145
Ok(processor)
146146
}
147147
}
148-
149-
fn parse_date(src: &str) -> Result<DateTime, String> {
150-
let time = humantime::parse_rfc3339_weak(src)
151-
.map_err(|e| format!("Unable to parse RFC3339 date: {e}"))?;
152-
time.try_into()
153-
}

crates/core/tedge/src/cli/flows/test.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use anyhow::Error;
55
use base64::prelude::BASE64_STANDARD;
66
use base64::prelude::*;
77
use camino::Utf8PathBuf;
8+
use std::time::SystemTime;
89
use tedge_config::TEdgeConfig;
910
use tedge_flows::BaseFlowRegistry;
10-
use tedge_flows::DateTime;
1111
use tedge_flows::FlowResult;
1212
use tedge_flows::Message;
1313
use tedge_flows::MessageProcessor;
@@ -22,7 +22,7 @@ pub struct TestCommand {
2222
pub flow: Option<Utf8PathBuf>,
2323
pub message: Option<Message>,
2424
pub final_on_interval: bool,
25-
pub processing_time: Option<DateTime>,
25+
pub processing_time: Option<SystemTime>,
2626
pub base64_input: bool,
2727
pub base64_output: bool,
2828
}
@@ -42,18 +42,18 @@ impl Command for TestCommand {
4242
Some(flow) => TEdgeFlowsCli::load_file(&self.flows_dir, flow).await?,
4343
};
4444
if let Some(message) = &self.message {
45-
let timestamp = self.processing_time.unwrap_or_else(|| DateTime::now());
45+
let timestamp = self.processing_time.unwrap_or_else(SystemTime::now);
4646
self.process(&mut processor, message.clone(), timestamp)
4747
.await;
4848
} else {
4949
let mut stdin = BufReader::new(tokio::io::stdin());
5050
while let Some(message) = next_message(&mut stdin).await {
51-
let timestamp = self.processing_time.unwrap_or_else(|| DateTime::now());
51+
let timestamp = self.processing_time.unwrap_or_else(SystemTime::now);
5252
self.process(&mut processor, message, timestamp).await;
5353
}
5454
}
5555
if self.final_on_interval {
56-
let timestamp = DateTime::now();
56+
let timestamp = SystemTime::now();
5757
let now = processor
5858
.last_interval_deadline()
5959
.unwrap_or_else(Instant::now);
@@ -68,7 +68,7 @@ impl TestCommand {
6868
&self,
6969
processor: &mut MessageProcessor<BaseFlowRegistry>,
7070
mut message: Message,
71-
timestamp: DateTime,
71+
timestamp: SystemTime,
7272
) {
7373
if self.base64_input {
7474
match BASE64_STANDARD.decode(message.payload) {
@@ -90,7 +90,7 @@ impl TestCommand {
9090
async fn tick(
9191
&self,
9292
processor: &mut MessageProcessor<BaseFlowRegistry>,
93-
timestamp: DateTime,
93+
timestamp: SystemTime,
9494
now: Instant,
9595
) {
9696
processor

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::connected_flow::ConnectedFlowRegistry;
2-
use crate::flow::DateTime;
32
use crate::flow::FlowError;
43
use crate::flow::FlowOutput;
54
use crate::flow::FlowResult;
@@ -16,6 +15,7 @@ use serde_json::json;
1615
use std::cmp::min;
1716
use std::collections::HashSet;
1817
use std::time::Duration;
18+
use std::time::SystemTime;
1919
use tedge_actors::Actor;
2020
use tedge_actors::DynSender;
2121
use tedge_actors::MessageReceiver;
@@ -207,7 +207,7 @@ impl FlowsMapper {
207207

208208
async fn on_source_poll(&mut self) -> Result<(), RuntimeError> {
209209
let now = Instant::now();
210-
let timestamp = DateTime::now();
210+
let timestamp = SystemTime::now();
211211

212212
let mut in_messages = vec![];
213213
for flow in self.processor.registry.flows_mut() {
@@ -241,7 +241,7 @@ impl FlowsMapper {
241241
source: SourceTag,
242242
message: Message,
243243
) -> Result<(), RuntimeError> {
244-
let timestamp = DateTime::now();
244+
let timestamp = SystemTime::now();
245245
for messages in self
246246
.processor
247247
.on_message(timestamp, &source, &message)
@@ -255,7 +255,7 @@ impl FlowsMapper {
255255

256256
async fn on_interval(&mut self) -> Result<(), RuntimeError> {
257257
let now = Instant::now();
258-
let timestamp = DateTime::now();
258+
let timestamp = SystemTime::now();
259259
if self.next_dump <= now {
260260
self.processor.dump_memory_stats().await;
261261
self.processor.dump_processing_stats().await;
@@ -295,7 +295,7 @@ impl FlowsMapper {
295295
) -> Result<(), RuntimeError> {
296296
if let Some(flow) = self.processor.registry.flow(&flow_name) {
297297
let topic = flow.input_topic().to_string();
298-
let timestamp = DateTime::now();
298+
let timestamp = SystemTime::now();
299299
let message = Message::new(topic, line);
300300
if let Some(result) = self
301301
.processor

crates/extensions/tedge_flows/src/connected_flow.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::config::ConfigError;
2-
use crate::flow::DateTime;
32
use crate::flow::Flow;
43
use crate::flow::FlowError;
54
use crate::flow::FlowInput;
@@ -14,6 +13,7 @@ use crate::input_source::StreamingSource;
1413
use crate::registry::FlowRegistry;
1514
use crate::registry::FlowStore;
1615
use camino::Utf8Path;
16+
use std::time::SystemTime;
1717
use tedge_watch_ext::WatchRequest;
1818
use tokio::time::Instant;
1919

@@ -65,14 +65,14 @@ impl ConnectedFlow {
6565
self.polling_source.as_ref().and_then(|p| p.next_deadline())
6666
}
6767

68-
pub async fn on_source_poll(&mut self, timestamp: DateTime, now: Instant) -> FlowResult {
68+
pub async fn on_source_poll(&mut self, timestamp: SystemTime, now: Instant) -> FlowResult {
6969
let result = self.on_source_poll_steps(timestamp, now).await;
7070
self.flow.publish(result)
7171
}
7272

7373
async fn on_source_poll_steps(
7474
&mut self,
75-
timestamp: DateTime,
75+
timestamp: SystemTime,
7676
now: Instant,
7777
) -> Result<Vec<Message>, FlowError> {
7878
let Some(source) = &mut self.polling_source.as_mut() else {

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 14 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use tedge_mqtt_ext::MqttMessage;
1515
use tedge_mqtt_ext::Topic;
1616
use tedge_mqtt_ext::TopicFilter;
1717
use tedge_watch_ext::WatchError;
18-
use time::OffsetDateTime;
1918
use tokio::time::Instant;
2019
use tracing::error;
2120
use tracing::warn;
@@ -113,17 +112,11 @@ impl FlowResult {
113112
}
114113
}
115114

116-
#[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize, Eq, PartialEq)]
117-
pub struct DateTime {
118-
pub seconds: u64,
119-
pub nanoseconds: u32,
120-
}
121-
122115
#[derive(Clone, serde::Deserialize, serde::Serialize, Eq, PartialEq)]
123116
pub struct Message {
124117
pub topic: String,
125118
pub payload: Vec<u8>,
126-
pub timestamp: Option<DateTime>,
119+
pub timestamp: Option<SystemTime>,
127120
}
128121

129122
#[derive(thiserror::Error, Debug)]
@@ -199,7 +192,7 @@ impl Flow {
199192
&mut self,
200193
js_runtime: &JsRuntime,
201194
stats: &mut Counter,
202-
timestamp: DateTime,
195+
timestamp: SystemTime,
203196
message: &Message,
204197
) -> FlowResult {
205198
let stated_at = stats.flow_on_message_start(self.name());
@@ -219,7 +212,7 @@ impl Flow {
219212
&mut self,
220213
js_runtime: &JsRuntime,
221214
stats: &mut Counter,
222-
timestamp: DateTime,
215+
timestamp: SystemTime,
223216
message: &Message,
224217
) -> Result<Vec<Message>, FlowError> {
225218
let mut messages = vec![message.clone()];
@@ -247,7 +240,7 @@ impl Flow {
247240
&mut self,
248241
js_runtime: &JsRuntime,
249242
stats: &mut Counter,
250-
timestamp: DateTime,
243+
timestamp: SystemTime,
251244
now: Instant,
252245
) -> FlowResult {
253246
let stated_at = stats.flow_on_interval_start(self.name());
@@ -267,7 +260,7 @@ impl Flow {
267260
&mut self,
268261
js_runtime: &JsRuntime,
269262
stats: &mut Counter,
270-
timestamp: DateTime,
263+
timestamp: SystemTime,
271264
now: Instant,
272265
) -> Result<Vec<Message>, FlowError> {
273266
let mut messages = vec![];
@@ -400,45 +393,6 @@ impl FlowStep {
400393
}
401394
}
402395

403-
impl DateTime {
404-
pub fn now() -> Self {
405-
DateTime::try_from(OffsetDateTime::now_utc()).unwrap()
406-
}
407-
408-
pub fn json(&self) -> Value {
409-
json!({"seconds": self.seconds, "nanoseconds": self.nanoseconds})
410-
}
411-
}
412-
413-
impl TryFrom<SystemTime> for DateTime {
414-
type Error = String;
415-
416-
fn try_from(time: SystemTime) -> Result<Self, Self::Error> {
417-
match time.duration_since(SystemTime::UNIX_EPOCH) {
418-
Ok(elapsed) => Ok(DateTime {
419-
seconds: elapsed.as_secs(),
420-
nanoseconds: 0,
421-
}),
422-
Err(_) => Err("SystemTime before UNIX EPOCH!".to_string()),
423-
}
424-
}
425-
}
426-
427-
impl TryFrom<OffsetDateTime> for DateTime {
428-
type Error = FlowError;
429-
430-
fn try_from(value: OffsetDateTime) -> Result<Self, Self::Error> {
431-
let seconds = u64::try_from(value.unix_timestamp()).map_err(|err| {
432-
FlowError::UnsupportedMessage(format!("failed to convert timestamp: {}", err))
433-
})?;
434-
435-
Ok(DateTime {
436-
seconds,
437-
nanoseconds: value.nanosecond(),
438-
})
439-
}
440-
}
441-
442396
impl Message {
443397
pub fn new(topic: impl ToString, payload: impl Into<Vec<u8>>) -> Self {
444398
Message {
@@ -451,7 +405,7 @@ impl Message {
451405
pub fn with_timestamp(
452406
topic: impl ToString,
453407
payload: impl Into<Vec<u8>>,
454-
timestamp: DateTime,
408+
timestamp: SystemTime,
455409
) -> Self {
456410
Message {
457411
topic: topic.to_string(),
@@ -462,7 +416,7 @@ impl Message {
462416

463417
pub fn json(&self) -> Value {
464418
if let Some(timestamp) = &self.timestamp {
465-
json!({"topic": self.topic, "payload": self.payload, "timestamp": timestamp.json()})
419+
json!({"topic": self.topic, "payload": self.payload, "timestamp": epoch_ms(timestamp)})
466420
} else {
467421
json!({"topic": self.topic, "payload": self.payload, "timestamp": null})
468422
}
@@ -512,6 +466,13 @@ impl TryFrom<Message> for MqttMessage {
512466
}
513467
}
514468

469+
pub(crate) fn epoch_ms(time: &SystemTime) -> u128 {
470+
let duration = time
471+
.duration_since(SystemTime::UNIX_EPOCH)
472+
.expect("SystemTime after UNIX EPOCH");
473+
duration.as_millis()
474+
}
475+
515476
pub fn error_from_js(err: LoadError) -> FlowError {
516477
FlowError::IncorrectSetting(format!("{err:#}"))
517478
}

crates/extensions/tedge_flows/src/input_source.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use crate::flow::DateTime;
21
use crate::flow::Message;
32
use async_trait::async_trait;
43
use camino::Utf8PathBuf;
54
use std::time::Duration;
5+
use std::time::SystemTime;
66
use tedge_watch_ext::WatchRequest;
77
use tokio::time::Instant;
88

@@ -16,7 +16,7 @@ pub trait StreamingSource: Send + Sync {
1616
#[async_trait]
1717
pub trait PollingSource: Send + Sync {
1818
/// Poll the source for any available messages at the given timestamp
19-
async fn poll(&mut self, timestamp: DateTime) -> Result<Vec<Message>, PollingSourceError>;
19+
async fn poll(&mut self, timestamp: SystemTime) -> Result<Vec<Message>, PollingSourceError>;
2020

2121
/// Get the next deadline when this source should be polled
2222
/// Returns None if the source doesn't have scheduled polling
@@ -53,7 +53,7 @@ impl CommandPollingSource {
5353

5454
#[async_trait]
5555
impl PollingSource for CommandPollingSource {
56-
async fn poll(&mut self, timestamp: DateTime) -> Result<Vec<Message>, PollingSourceError> {
56+
async fn poll(&mut self, timestamp: SystemTime) -> Result<Vec<Message>, PollingSourceError> {
5757
let output = tedge_watch_ext::command_output(&self.command)
5858
.await
5959
.map_err(|err| PollingSourceError::CannotPoll {
@@ -119,7 +119,7 @@ impl FilePollingSource {
119119

120120
#[async_trait]
121121
impl PollingSource for FilePollingSource {
122-
async fn poll(&mut self, timestamp: DateTime) -> Result<Vec<Message>, PollingSourceError> {
122+
async fn poll(&mut self, timestamp: SystemTime) -> Result<Vec<Message>, PollingSourceError> {
123123
let output = tokio::fs::read_to_string(&self.path).await.map_err(|err| {
124124
PollingSourceError::CannotPoll {
125125
resource: self.path.clone().to_string(),

0 commit comments

Comments
 (0)