Skip to content

Commit 4b14e91

Browse files
committed
Use JS Date for message timestamps
Signed-off-by: Didier Wenzek <[email protected]>
1 parent a8a6f83 commit 4b14e91

File tree

4 files changed

+48
-66
lines changed

4 files changed

+48
-66
lines changed

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -404,11 +404,6 @@ impl DateTime {
404404
DateTime::try_from(OffsetDateTime::now_utc()).unwrap()
405405
}
406406

407-
pub fn tick_now(&self, tick_every: std::time::Duration) -> bool {
408-
let tick_every_secs = tick_every.as_secs();
409-
tick_every_secs != 0 && (self.seconds % tick_every_secs == 0)
410-
}
411-
412407
pub fn json(&self) -> Value {
413408
json!({"seconds": self.seconds, "nanoseconds": self.nanoseconds})
414409
}
@@ -450,12 +445,6 @@ impl Message {
450445
}
451446
}
452447

453-
#[cfg(test)]
454-
pub fn sent_now(mut self) -> Self {
455-
self.timestamp = Some(DateTime::now());
456-
self
457-
}
458-
459448
pub fn json(&self) -> Value {
460449
if let Some(timestamp) = &self.timestamp {
461450
json!({"topic": self.topic, "payload": self.payload, "timestamp": timestamp.json()})

crates/extensions/tedge_flows/src/js_script.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ mod tests {
181181
let js = "export function onMessage(msg) { return [msg]; };";
182182
let (runtime, script) = runtime_with(js).await;
183183

184-
let input = Message::new("te/main/device///m/", "hello world").sent_now();
184+
let input = Message::new("te/main/device///m/", "hello world");
185185
let output = input.clone();
186186
assert_eq!(
187187
script
@@ -197,7 +197,7 @@ mod tests {
197197
let js = "export function onMessage(msg) { return msg; };";
198198
let (runtime, script) = runtime_with(js).await;
199199

200-
let input = Message::new("te/main/device///m/", "hello world").sent_now();
200+
let input = Message::new("te/main/device///m/", "hello world");
201201
let output = input.clone();
202202
assert_eq!(
203203
script
@@ -314,6 +314,37 @@ export async function onMessage(message, config) {
314314
);
315315
}
316316

317+
#[tokio::test]
318+
async fn using_date() {
319+
let js = r#"
320+
export function onMessage(message, config) {
321+
let time = message.timestamp;
322+
return {
323+
"topic": message.topic,
324+
"payload": JSON.stringify({
325+
"milliseconds": time.getTime(),
326+
"date": time.toUTCString(),
327+
})
328+
}
329+
}
330+
"#;
331+
let (runtime, script) = runtime_with(js).await;
332+
333+
let datetime = DateTime {
334+
seconds: 1763050414,
335+
nanoseconds: 0,
336+
};
337+
let input = Message::new("clock", "");
338+
let output = Message::new(
339+
"clock",
340+
r#"{"milliseconds":1763050414000,"date":"Thu, 13 Nov 2025 16:13:34 GMT"}"#.to_string(),
341+
);
342+
assert_eq!(
343+
script.on_message(&runtime, datetime, &input).await.unwrap(),
344+
vec![output]
345+
);
346+
}
347+
317348
#[tokio::test]
318349
async fn using_unknown_function() {
319350
let js = r#"

crates/extensions/tedge_flows/src/js_value.rs

Lines changed: 13 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,18 @@ use rquickjs::Ctx;
55
use rquickjs::FromJs;
66
use rquickjs::IntoJs;
77
use rquickjs::Value;
8+
use serde_json::json;
89
use std::collections::BTreeMap;
910

10-
/// Akin to serde_json::Value with an extra case for binary data
11-
#[derive(Clone, Debug)]
11+
/// Akin to serde_json::Value with extra cases for date and binary data
12+
#[derive(Clone, Debug, Eq, PartialEq)]
1213
pub enum JsonValue {
1314
Null,
1415
Bool(bool),
1516
Number(serde_json::Number),
1617
String(String),
1718
Bytes(Vec<u8>), // <= This case motivates the use of JsonValue vs serde_json::Value
19+
DateTime(DateTime),
1820
Array(Vec<JsonValue>),
1921
Object(BTreeMap<String, JsonValue>),
2022
}
@@ -30,10 +32,6 @@ impl JsonValue {
3032
JsonValue::String(value.to_string())
3133
}
3234

33-
fn number(value: impl Into<serde_json::Number>) -> Self {
34-
JsonValue::Number(value.into())
35-
}
36-
3735
fn option(value: Option<impl Into<JsonValue>>) -> Self {
3836
value.map(|v| v.into()).unwrap_or(JsonValue::Null)
3937
}
@@ -69,10 +67,7 @@ impl From<Message> for JsonValue {
6967

7068
impl From<DateTime> for JsonValue {
7169
fn from(value: DateTime) -> Self {
72-
JsonValue::object([
73-
("seconds", JsonValue::number(value.seconds)),
74-
("nanoseconds", JsonValue::number(value.nanoseconds)),
75-
])
70+
JsonValue::DateTime(value)
7671
}
7772
}
7873

@@ -101,6 +96,7 @@ impl From<JsonValue> for serde_json::Value {
10196
JsonValue::Number(n) => serde_json::Value::Number(n),
10297
JsonValue::String(s) => serde_json::Value::String(s),
10398
JsonValue::Bytes(b) => serde_json::Value::String(format!("0x {b:?}")),
99+
JsonValue::DateTime(t) => json!({ "seconds": t.seconds, "nanos": t.nanoseconds }),
104100
JsonValue::Array(a) => {
105101
serde_json::Value::Array(a.into_iter().map(serde_json::Value::from).collect())
106102
}
@@ -131,15 +127,11 @@ impl TryFrom<BTreeMap<String, JsonValue>> for Message {
131127
.into())
132128
}
133129
};
134-
let timestamp = value
135-
.get("timestamp")
136-
.map(|t| DateTime::try_from(t.clone()))
137-
.transpose()?;
138130

139131
Ok(Message {
140132
topic: topic.to_owned(),
141133
payload,
142-
timestamp,
134+
timestamp: None,
143135
})
144136
}
145137
}
@@ -157,37 +149,6 @@ impl TryFrom<JsonValue> for Message {
157149
}
158150
}
159151

160-
impl TryFrom<JsonValue> for DateTime {
161-
type Error = FlowError;
162-
163-
fn try_from(value: JsonValue) -> Result<Self, Self::Error> {
164-
let JsonValue::Object(object) = value else {
165-
return Err(
166-
anyhow::anyhow!("Expect a timestamp object with seconds and nanoseconds").into(),
167-
);
168-
};
169-
DateTime::try_from(object)
170-
}
171-
}
172-
173-
impl TryFrom<BTreeMap<String, JsonValue>> for DateTime {
174-
type Error = FlowError;
175-
176-
fn try_from(value: BTreeMap<String, JsonValue>) -> Result<Self, Self::Error> {
177-
let Some(JsonValue::Number(seconds)) = value.get("seconds") else {
178-
return Err(anyhow::anyhow!("Missing timestamp seconds").into());
179-
};
180-
let Some(JsonValue::Number(nanoseconds)) = value.get("nanoseconds") else {
181-
return Err(anyhow::anyhow!("Missing timestamp nanoseconds").into());
182-
};
183-
184-
Ok(DateTime {
185-
seconds: seconds.as_u64().unwrap_or_default(),
186-
nanoseconds: nanoseconds.as_u64().unwrap_or_default() as u32,
187-
})
188-
}
189-
}
190-
191152
impl TryFrom<JsonValue> for Vec<Message> {
192153
type Error = FlowError;
193154

@@ -242,6 +203,12 @@ impl<'js> IntoJs<'js> for JsonValueRef<'_> {
242203
let bytes = rquickjs::TypedArray::new(ctx.clone(), value.clone())?;
243204
Ok(bytes.into_value())
244205
}
206+
JsonValue::DateTime(value) => {
207+
let seconds = value.seconds;
208+
let milliseconds = value.nanoseconds / 1_000_000;
209+
let time: Value<'js> = ctx.eval(format!("new Date({seconds}{milliseconds:03})"))?;
210+
Ok(time)
211+
}
245212
JsonValue::Array(values) => {
246213
let array = rquickjs::Array::new(ctx.clone())?;
247214
for (i, value) in values.iter().enumerate() {

docs/src/references/mappers/flows.md

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,7 @@ type Message = {
7979
topic: string,
8080
payload: string,
8181
raw_payload: Uint8Array,
82-
timestamp: Timestamp
83-
}
84-
85-
type Timestamp = {
86-
seconds: number,
87-
nanoseconds: number
82+
timestamp: Date
8883
}
8984
```
9085
@@ -93,7 +88,7 @@ These values are configured by the flow and can be dynamically updated on recept
9388
9489
The `onMessage` function is called for each message to be transformed
9590
- The arguments passed to the function are:
96-
- The message `{ topic: string, payload: string, raw_payload: Uint8Array, timestamp: { seconds: u64, nanoseconds: u32 } }`
91+
- The message `{ topic: string, payload: string, raw_payload: Uint8Array, timestamp: Date }`
9792
- The config as read from the flow config or updated by the script
9893
- The function is expected to return zero, one or many transformed messages `[{ topic: string, payload: string }]`
9994
- An exception can be thrown if the input message cannot be transformed.

0 commit comments

Comments
 (0)