Skip to content

Commit 3ffad4a

Browse files
committed
Rename time Message and onInterval timestamp
Signed-off-by: Didier Wenzek <[email protected]>
1 parent c4aa07f commit 3ffad4a

File tree

8 files changed

+21
-25
lines changed

8 files changed

+21
-25
lines changed

crates/extensions/tedge_flows/src/js_script.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ export async function onMessage(message, config) {
319319
async fn using_date() {
320320
let js = r#"
321321
export function onMessage(message, config) {
322-
let time = message.timestamp;
322+
let time = message.time;
323323
return {
324324
"topic": message.topic,
325325
"payload": JSON.stringify({

crates/extensions/tedge_flows/src/js_value.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl From<Message> for JsonValue {
6161
("topic", JsonValue::string(value.topic)),
6262
("payload", payload),
6363
("raw_payload", raw_payload),
64-
("timestamp", JsonValue::option(value.timestamp)),
64+
("time", JsonValue::option(value.timestamp)),
6565
])
6666
}
6767
}

docs/src/references/mappers/flows.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,22 +64,22 @@ interface FlowStep {
6464
onMessage(message: Message, config: object): null | Message | Message[],
6565

6666
// called at regular intervals to produce aggregated messages
67-
onInterval(timestamp: Timestamp, config: object): null | Message | Message[],
67+
onInterval(time: Date, config: object): null | Message | Message[],
6868

6969
// update the step config given a config update message
7070
onConfigUpdate(message: Message, config: object): object
7171
}
7272
```
7373

74-
A message contains the message topic and payload as well as an ingestion timestamp.
74+
A message contains the message topic and payload as well as a processing timestamp.
7575
The bytes of the raw message payload are also accessible as an array of unsigned bytes:
7676

7777
```ts
7878
type Message = {
7979
topic: string,
8080
payload: string,
8181
raw_payload: Uint8Array,
82-
timestamp: Date
82+
time: Date
8383
}
8484
```
8585
@@ -88,7 +88,7 @@ These values are configured by the flow and can be dynamically updated on recept
8888
8989
The `onMessage` function is called for each message to be transformed
9090
- The arguments passed to the function are:
91-
- The message `{ topic: string, payload: string, raw_payload: Uint8Array, timestamp: Date }`
91+
- The message `{ topic: string, payload: string, raw_payload: Uint8Array, time: Date }`
9292
- The config as read from the flow config or updated by the script
9393
- The function is expected to return zero, one or many transformed messages `[{ topic: string, payload: string }]`
9494
- An exception can be thrown if the input message cannot be transformed.

tests/RobotFramework/tests/tedge_flows/flows/add_timestamp.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
export function onMessage (message) {
22
let payload = JSON.parse(message.payload)
33
if (!payload.time) {
4-
let timestamp = message.timestamp
5-
payload.time = timestamp.getTime() / 1000;
4+
let time = message.time
5+
payload.time = time.getTime() / 1000;
66
}
77

88
return [{

tests/RobotFramework/tests/tedge_flows/flows/circuit-breaker.js

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ class State {
1414
static batch = [0]
1515
}
1616

17-
1817
export function onMessage (message, config) {
1918
State.total += 1
2019
State.batch[0] += 1
@@ -45,8 +44,7 @@ export function onMessage (message, config) {
4544
}
4645
}
4746

48-
49-
export function onInterval(timestamp, config) {
47+
export function onInterval(time, config) {
5048
let max_batch_count = config.tick_count || 10
5149
let new_batch_count = State.batch.unshift(0)
5250
if (new_batch_count > max_batch_count) {
@@ -61,5 +59,4 @@ export function onInterval(timestamp, config) {
6159
} else {
6260
return []
6361
}
64-
6562
}

tests/RobotFramework/tests/tedge_flows/flows/count-messages.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export function onMessage (message) {
1111
return []
1212
}
1313

14-
export function onInterval(timestamp, config) {
14+
export function onInterval(time, config) {
1515
let message = {
1616
topic: config.topic || "te/error",
1717
payload: JSON.stringify(State.count_per_topic)
Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
11
// Reject any message that is too old, too new or with no timestamp
22
export function onMessage (message, config) {
33
let payload = JSON.parse(message.payload)
4-
let msg_time = payload.time
5-
if (!msg_time) {
4+
if (!payload.time) {
65
return []
76
}
87

9-
let msg_timestamp = msg_time
8+
let msg_time = payload.time
109
if (typeof(msg_time) === "string") {
11-
msg_timestamp = Date.parse(msg_time) / 1e3
10+
msg_time = Date.parse(msg_time) / 1e3
1211
}
1312

14-
let timestamp = message.timestamp
13+
let time = message.time
1514
let max = time + (config.max_advance || 1);
1615
let min = time - (config.max_delay || 10);
1716

18-
if (min <= msg_timestamp && msg_timestamp <= max) {
17+
if (min <= msg_time && msg_time <= max) {
1918
return [message]
2019
} else {
21-
return [{"topic":" te/error", "payload":`straggler rejected on ${message.topic} with time=${msg_timestamp} at ${time}`}]
20+
return [{"topic":" te/error", "payload":`straggler rejected on ${message.topic} with time=${msg_time} at ${time}`}]
2221
}
2322
}

tests/RobotFramework/tests/tedge_flows/flows/get-localtime.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
export function onMessage (message) {
22
let payload = JSON.parse(message.payload)
3-
let timestamp = message.timestamp
4-
let tzOffset = timestamp.getTimezoneOffset() * 60000;
3+
let time = message.time
4+
let tzOffset = time.getTimezoneOffset() * 60000;
55

6-
payload.time = timestamp.toString();
7-
payload.utc = timestamp.toISOString();
8-
payload.local = (new Date(timestamp.getTime() - tzOffset)).toISOString().slice(0, -1);
6+
payload.time = time.toString();
7+
payload.utc = time.toISOString();
8+
payload.local = (new Date(time.getTime() - tzOffset)).toISOString().slice(0, -1);
99

1010
return {
1111
topic: message.topic,

0 commit comments

Comments
 (0)