Skip to content

Commit 93cc5fd

Browse files
Merge pull request #4012 from didier-wenzek/feat/configuration-shared-by-the-steps-of-a-flow
feat: The steps of a flow can share configuration values
2 parents ac1942e + f798e6d commit 93cc5fd

File tree

6 files changed

+153
-33
lines changed

6 files changed

+153
-33
lines changed

crates/extensions/c8y_mapper_ext/src/flows.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,12 @@ topic = "{errors_topic}"
110110
format!(
111111
r#"input.mqtt.topics = {input_topics:?}
112112
113+
config = {{ topic_root = "{topic_prefix}" }}
114+
113115
steps = [
114116
{{ builtin = "add-timestamp", config = {{ property = "time", format = "unix", reformat = false }} }},
115-
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}" }} }},
116-
{{ builtin = "into-c8y-measurements", config = {{ topic_root = "{topic_prefix}" }} }},
117+
{{ builtin = "cache-early-messages" }},
118+
{{ builtin = "into-c8y-measurements" }},
117119
{{ builtin = "limit-payload-size", config = {{ max_size = {max_size} }} }},
118120
]
119121
@@ -141,10 +143,12 @@ topic = "{errors_topic}"
141143
format!(
142144
r#"input.mqtt.topics = {input_topics:?}
143145
146+
config = {{ topic_root = "{topic_prefix}", c8y_prefix = "{c8y_prefix}" }}
147+
144148
steps = [
145149
{{ builtin = "add-timestamp", config = {{ property = "time", format = "rfc3339", reformat = false }} }},
146-
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}" }} }},
147-
{{ builtin = "into-c8y-events", config = {{ topic_root = "{topic_prefix}", c8y_prefix = "{c8y_prefix}", max_mqtt_payload_size = {max_mqtt_payload_size} }} }},
150+
{{ builtin = "cache-early-messages" }},
151+
{{ builtin = "into-c8y-events", config = {{ max_mqtt_payload_size = {max_mqtt_payload_size} }} }},
148152
]
149153
150154
[output.mqtt]
@@ -173,10 +177,12 @@ topic = "{errors_topic}"
173177
format!(
174178
r#"input.mqtt.topics = {input_topics:?}
175179
180+
config = {{ topic_root = "{topic_prefix}", c8y_prefix = "{c8y_prefix}" }}
181+
176182
steps = [
177183
{{ builtin = "add-timestamp", config = {{ property = "time", format = "rfc3339", reformat = false }} }},
178-
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}" }} }},
179-
{{ builtin = "into-c8y-alarms", interval = "{alarm_interval}", config = {{ topic_root = "{topic_prefix}", c8y_prefix = "{c8y_prefix}" }} }},
184+
{{ builtin = "cache-early-messages" }},
185+
{{ builtin = "into-c8y-alarms", interval = "{alarm_interval}" }},
180186
{{ builtin = "limit-payload-size", config = {{ max_size = {max_size} }} }},
181187
]
182188
@@ -204,9 +210,11 @@ topic = "{errors_topic}"
204210
format!(
205211
r#"input.mqtt.topics = {input_topics:?}
206212
213+
config = {{ topic_root = "{topic_prefix}", c8y_prefix = "{c8y_prefix}" }}
214+
207215
steps = [
208-
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}" }} }},
209-
{{ builtin = "into-c8y-health-status", config = {{ topic_root = "{topic_prefix}", main_device = "{main_device}", c8y_prefix = "{c8y_prefix}" }} }},
216+
{{ builtin = "cache-early-messages" }},
217+
{{ builtin = "into-c8y-health-status", config = {{ main_device = "{main_device}" }} }},
210218
]
211219
212220
[output.mqtt]

crates/extensions/tedge_flows/src/config.rs

Lines changed: 109 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use camino::Utf8Path;
1010
use camino::Utf8PathBuf;
1111
use glob::glob;
1212
use serde::Deserialize;
13+
use serde_json::Map;
1314
use serde_json::Value;
1415
use std::collections::HashMap;
1516
use std::fmt::Debug;
@@ -28,6 +29,10 @@ pub struct FlowConfig {
2829
description: Option<String>,
2930
tags: Option<Vec<String>>,
3031

32+
/// configuration shared by the steps of this flow
33+
#[serde(default)]
34+
config: Map<String, Value>,
35+
3136
input: InputConfig,
3237
#[serde(default)]
3338
steps: Vec<StepConfig>,
@@ -43,7 +48,7 @@ pub struct StepConfig {
4348
step: StepSpec,
4449

4550
#[serde(default)]
46-
config: Option<Value>,
51+
config: Map<String, Value>,
4752

4853
#[serde(default)]
4954
#[serde(deserialize_with = "parse_optional_human_duration")]
@@ -187,14 +192,15 @@ impl FlowConfig {
187192
let input_topic = "#".to_string();
188193
let step = StepConfig {
189194
step: StepSpec::JavaScript(script),
190-
config: None,
195+
config: Map::new(),
191196
interval: None,
192197
};
193198
Self {
194199
name: None,
195200
version: None,
196201
description: None,
197202
tags: None,
203+
config: Map::new(),
198204
input: InputConfig::Mqtt {
199205
topics: vec![input_topic],
200206
},
@@ -216,6 +222,8 @@ impl FlowConfig {
216222
let mut steps = vec![];
217223
for (i, step) in self.steps.into_iter().enumerate() {
218224
let step = step
225+
.with_shared_config(&self.config)
226+
.with_interval_as_config()
219227
.compile(rs_transformers, js_runtime, i, &source)
220228
.await?;
221229
steps.push(step);
@@ -238,6 +246,25 @@ impl FlowConfig {
238246
}
239247

240248
impl StepConfig {
249+
pub fn with_shared_config(mut self, shared_config: &Map<String, Value>) -> Self {
250+
for (k, v) in shared_config.iter() {
251+
if !self.config.contains_key(k) {
252+
self.config.insert(k.clone(), v.clone());
253+
}
254+
}
255+
self
256+
}
257+
258+
pub fn with_interval_as_config(mut self) -> Self {
259+
let key = "interval";
260+
let interval = self.interval.unwrap_or(Duration::from_secs(1));
261+
if !self.config.contains_key(key) {
262+
self.config
263+
.insert(key.to_string(), interval.as_secs().into());
264+
}
265+
self
266+
}
267+
241268
pub async fn compile(
242269
&self,
243270
rs_transformers: &BuiltinTransformers,
@@ -253,8 +280,13 @@ impl StepConfig {
253280
Self::instantiate_builtin(rs_transformers, flow, name, index)?
254281
}
255282
};
283+
let config = if self.config.is_empty() {
284+
None
285+
} else {
286+
Some(Value::Object(self.config.clone()))
287+
};
256288
let step = step
257-
.with_config(self.config.clone())?
289+
.with_config(config)?
258290
.with_interval(self.interval, flow.as_str());
259291
Ok(step)
260292
}
@@ -388,3 +420,77 @@ fn default_errors() -> OutputConfig {
388420
topic: Some("te/error".to_string()),
389421
}
390422
}
423+
424+
#[cfg(test)]
425+
mod tests {
426+
use super::*;
427+
use serde_json::json;
428+
429+
#[test]
430+
fn inherit_shared_config() {
431+
for (shared_config, step_config, merged_config) in [
432+
(json!({}), json!({"x": 1, "y": 2}), json!({"x": 1, "y": 2})),
433+
(
434+
json!({"z": 3}),
435+
json!({"x": 1, "y": 2}),
436+
json!({"x": 1, "y": 2, "z": 3}),
437+
),
438+
(
439+
json!({"z": 3, "x": 4}),
440+
json!({"x": 1, "y": 2}),
441+
json!({"x": 1, "y": 2, "z": 3}),
442+
),
443+
(
444+
json!({"x": 4}),
445+
json!({"x": 1, "y": 2}),
446+
json!({"x": 1, "y": 2}),
447+
),
448+
(json!({"x": 4}), json!({}), json!({"x": 4})),
449+
] {
450+
let shared_config = shared_config.as_object().unwrap();
451+
let step_config = step_config.as_object().unwrap();
452+
let merged_config = merged_config.as_object().unwrap();
453+
let step = StepConfig {
454+
step: StepSpec::Transformer("some-step".to_string()),
455+
config: step_config.clone(),
456+
interval: None,
457+
};
458+
assert_eq!(
459+
&step.with_shared_config(shared_config).config,
460+
merged_config
461+
);
462+
}
463+
}
464+
465+
#[test]
466+
fn with_interval_as_config() {
467+
for (interval, config, merged_config) in [
468+
(None, json!({}), json!({"interval": 1})),
469+
(
470+
Some(Duration::from_secs(5)),
471+
json!({}),
472+
json!({"interval": 5}),
473+
),
474+
(None, json!({"x": 42}), json!({"interval": 1, "x": 42})),
475+
(
476+
Some(Duration::from_secs(5)),
477+
json!({"x": 42}),
478+
json!({"interval": 5, "x": 42}),
479+
),
480+
(
481+
Some(Duration::from_secs(5)),
482+
json!({"interval": 33, "x": 42}),
483+
json!({"interval": 33, "x": 42}),
484+
),
485+
] {
486+
let config = config.as_object().unwrap();
487+
let merged_config = merged_config.as_object().unwrap();
488+
let step = StepConfig {
489+
step: StepSpec::Transformer("some-step".to_string()),
490+
config: config.clone(),
491+
interval,
492+
};
493+
assert_eq!(&step.with_interval_as_config().config, merged_config);
494+
}
495+
}
496+
}

crates/extensions/tedge_flows/src/js_value.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,20 +84,18 @@ impl JsonValue {
8484
})
8585
}
8686

87-
pub fn strings_property(&self, property: &str) -> Vec<&str> {
88-
self.property(property)
89-
.map(|v| match v {
90-
JsonValue::String(string) => vec![string.as_str()],
91-
JsonValue::Array(props) => props
92-
.iter()
93-
.filter_map(|v| match v {
94-
JsonValue::String(s) => Some(s.as_str()),
95-
_ => None,
96-
})
97-
.collect(),
98-
_ => vec![],
99-
})
100-
.unwrap_or_default()
87+
pub fn strings_property(&self, property: &str) -> Option<Vec<&str>> {
88+
self.property(property).map(|v| match v {
89+
JsonValue::String(string) => vec![string.as_str()],
90+
JsonValue::Array(props) => props
91+
.iter()
92+
.filter_map(|v| match v {
93+
JsonValue::String(s) => Some(s.as_str()),
94+
_ => None,
95+
})
96+
.collect(),
97+
_ => vec![],
98+
})
10199
}
102100

103101
pub fn number_property(&self, property: &str) -> Option<&Number> {

crates/extensions/tedge_flows/src/transformers/ignore_topics.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ impl Transformer for IgnoreTopics {
1818
}
1919

2020
fn set_config(&mut self, config: JsonValue) -> Result<(), ConfigError> {
21-
let topics = config.strings_property("topics");
22-
self.topics = crate::config::topic_filters(topics)?;
21+
if let Some(topics) = config.strings_property("topics") {
22+
self.topics = crate::config::topic_filters(topics)?;
23+
}
2324
Ok(())
2425
}
2526

crates/extensions/tedge_flows/src/transformers/update_context.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ impl Transformer for UpdateContext {
2626
}
2727

2828
fn set_config(&mut self, config: JsonValue) -> Result<(), ConfigError> {
29-
let topics = config.strings_property("topics");
30-
self.topics = crate::config::topic_filters(topics)?;
29+
if let Some(topics) = config.strings_property("topics") {
30+
self.topics = crate::config::topic_filters(topics)?;
31+
}
3132
Ok(())
3233
}
3334

docs/src/references/mappers/flows.md

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,18 +132,18 @@ type Context = {
132132
script: KVStore,
133133

134134
// A value provided by the flow configuration of that step
135-
config: any,
135+
config: unknown,
136136
}
137137

138138
type KVStore = {
139139
// List the keys for which this store holds a value
140140
keys(): string[]
141141

142142
// Get the value attached to a key (returning null, if none)
143-
get(key: string): any,
143+
get(key: string): unknown,
144144

145145
// Set the value attached to a key (removing the key if the provided value is null)
146-
set(key: string, value: any),
146+
set(key: string, value: unknown),
147147

148148
// Remove any value attache to a key
149149
remove(key: string),
@@ -177,11 +177,17 @@ A flow script can also export a `onInterval` function
177177
- This can also be a TypeScript module with a `.ts` extension.
178178
- The definition of flow defines its input, output and error sink as well as a list of transformation steps.
179179
- Each step is built either from a `script` or a `builtin` transformation
180-
- A step possibly given a config (arbitrary json that will be passed to the transformation script)
180+
- A step can possibly be given a config (an arbitrary json object that will be passed to the transformation script)
181+
- Configuration values can also be defined at the flow level,
182+
these values will be used as default configuration values by all the steps.
183+
- The pace at which a step `onInterval` function is called defaults to one second,
184+
and can be configured `{ script = "my-script.js", interval = "60s", config = { .. } }`.
181185
182186
```toml
183187
input.mqtt.topics = ["te/+/+/+/+/m/+"]
184188

189+
config = { format = "rfc3339" }
190+
185191
steps = [
186192
{ builtin = "add-timestamp" },
187193
{ script = "drop_stragglers.js", config = { max_delay = 60 } },

0 commit comments

Comments
 (0)