Skip to content

Commit 9fbca1c

Browse files
authored
Merge pull request #3799 from didier-wenzek/refactor/flows-paths
refactor: use more specific types
2 parents ad5870a + a3ca6bb commit 9fbca1c

File tree

10 files changed

+77
-74
lines changed

10 files changed

+77
-74
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/tedge/src/cli/flows/cli.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::ConfigError;
66
use anyhow::anyhow;
77
use anyhow::Context;
88
use anyhow::Error;
9-
use std::path::PathBuf;
9+
use camino::Utf8PathBuf;
1010
use tedge_config::TEdgeConfig;
1111
use tedge_flows::flow::Message;
1212
use tedge_flows::MessageProcessor;
@@ -19,7 +19,7 @@ pub enum TEdgeFlowsCli {
1919
///
2020
/// Default to /etc/tedge/flows
2121
#[clap(long)]
22-
flows_dir: Option<PathBuf>,
22+
flows_dir: Option<Utf8PathBuf>,
2323

2424
/// List flows processing messages published on this topic
2525
///
@@ -34,13 +34,13 @@ pub enum TEdgeFlowsCli {
3434
///
3535
/// Default to /etc/tedge/flows
3636
#[clap(long)]
37-
flows_dir: Option<PathBuf>,
37+
flows_dir: Option<Utf8PathBuf>,
3838

3939
/// Path to the flow step script or TOML flow definition
4040
///
4141
/// If none is provided, applies all the matching flows
4242
#[clap(long)]
43-
flow: Option<PathBuf>,
43+
flow: Option<Utf8PathBuf>,
4444

4545
/// Trigger onInterval after all the message samples
4646
#[clap(long = "final-on-interval")]
@@ -106,25 +106,28 @@ impl BuildCommand for TEdgeFlowsCli {
106106
}
107107

108108
impl TEdgeFlowsCli {
109-
fn default_flows_dir(config: &TEdgeConfig) -> PathBuf {
110-
config.root_dir().join("flows").into()
109+
fn default_flows_dir(config: &TEdgeConfig) -> Utf8PathBuf {
110+
config.root_dir().join("flows")
111111
}
112112

113-
pub async fn load_flows(flows_dir: &PathBuf) -> Result<MessageProcessor, Error> {
113+
pub async fn load_flows(flows_dir: &Utf8PathBuf) -> Result<MessageProcessor, Error> {
114114
MessageProcessor::try_new(flows_dir)
115115
.await
116-
.with_context(|| format!("loading flows and steps from {}", flows_dir.display()))
116+
.with_context(|| format!("loading flows and steps from {flows_dir}"))
117117
}
118118

119-
pub async fn load_file(flows_dir: &PathBuf, path: &PathBuf) -> Result<MessageProcessor, Error> {
120-
if let Some("toml") = path.extension().and_then(|s| s.to_str()) {
119+
pub async fn load_file(
120+
flows_dir: &Utf8PathBuf,
121+
path: &Utf8PathBuf,
122+
) -> Result<MessageProcessor, Error> {
123+
if let Some("toml") = path.extension() {
121124
MessageProcessor::try_new_single_flow(flows_dir, path)
122125
.await
123-
.with_context(|| format!("loading flow {flow}", flow = path.display()))
126+
.with_context(|| format!("loading flow {path}"))
124127
} else {
125128
MessageProcessor::try_new_single_step_flow(flows_dir, path)
126129
.await
127-
.with_context(|| format!("loading flow script {script}", script = path.display()))
130+
.with_context(|| format!("loading flow script {path}"))
128131
}
129132
}
130133
}

crates/core/tedge/src/cli/flows/list.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@ use crate::cli::flows::TEdgeFlowsCli;
22
use crate::command::Command;
33
use crate::log::MaybeFancy;
44
use anyhow::Error;
5-
use std::path::PathBuf;
5+
use camino::Utf8PathBuf;
66
use tedge_config::TEdgeConfig;
77
use tedge_flows::flow::Flow;
88

99
pub struct ListCommand {
10-
pub flows_dir: PathBuf,
10+
pub flows_dir: Utf8PathBuf,
1111
pub topic: Option<String>,
1212
}
1313

1414
#[async_trait::async_trait]
1515
impl Command for ListCommand {
1616
fn description(&self) -> String {
17-
format!("list flows and flow steps in {}", self.flows_dir.display())
17+
format!("list flows and flow steps in {}", self.flows_dir)
1818
}
1919

2020
async fn execute(&self, _config: TEdgeConfig) -> Result<(), MaybeFancy<Error>> {
@@ -38,7 +38,7 @@ impl ListCommand {
3838
fn display((flow_id, flow): (&String, &Flow)) {
3939
println!("{flow_id}");
4040
for step in flow.steps.iter() {
41-
println!("\t{}", step.script.path.display());
41+
println!("\t{}", step.script.path);
4242
}
4343
}
4444
}

crates/core/tedge/src/cli/flows/test.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::log::MaybeFancy;
44
use anyhow::Error;
55
use base64::prelude::BASE64_STANDARD;
66
use base64::prelude::*;
7-
use std::path::PathBuf;
7+
use camino::Utf8PathBuf;
88
use tedge_config::TEdgeConfig;
99
use tedge_flows::flow::*;
1010
use tedge_flows::MessageProcessor;
@@ -13,8 +13,8 @@ use tokio::io::BufReader;
1313
use tokio::io::Stdin;
1414

1515
pub struct TestCommand {
16-
pub flows_dir: PathBuf,
17-
pub flow: Option<PathBuf>,
16+
pub flows_dir: Utf8PathBuf,
17+
pub flow: Option<Utf8PathBuf>,
1818
pub message: Option<Message>,
1919
pub final_on_interval: bool,
2020
pub base64_input: bool,
@@ -25,8 +25,8 @@ pub struct TestCommand {
2525
impl Command for TestCommand {
2626
fn description(&self) -> String {
2727
format!(
28-
"process message samples using flows and steps in {:}",
29-
self.flows_dir.display()
28+
"process message samples using flows and steps in {}",
29+
self.flows_dir
3030
)
3131
}
3232

crates/extensions/tedge_flows/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,8 @@ tokio = { workspace = true, features = ["fs", "macros", "time", "sync"] }
3232
toml = { workspace = true, features = ["parse"] }
3333
tracing = { workspace = true }
3434

35+
[dev-dependencies]
36+
tempfile = { workspace = true }
37+
3538
[lints]
3639
workspace = true

crates/extensions/tedge_flows/src/config.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use camino::Utf8PathBuf;
99
use serde::Deserialize;
1010
use serde_json::Value;
1111
use std::fmt::Debug;
12-
use std::path::Path;
1312
use std::time::Duration;
1413
use tedge_mqtt_ext::TopicFilter;
1514

@@ -75,7 +74,7 @@ impl FlowConfig {
7574
pub async fn compile(
7675
self,
7776
js_runtime: &mut JsRuntime,
78-
config_dir: &Path,
77+
config_dir: &Utf8Path,
7978
source: Utf8PathBuf,
8079
) -> Result<Flow, ConfigError> {
8180
let input = self.input.try_into()?;
@@ -98,18 +97,18 @@ impl FlowConfig {
9897
impl StepConfig {
9998
pub async fn compile(
10099
self,
101-
config_dir: &Path,
100+
config_dir: &Utf8Path,
102101
index: usize,
103102
flow: &Utf8Path,
104103
) -> Result<FlowStep, ConfigError> {
105104
let path = match self.script {
106-
ScriptSpec::JavaScript(path) if path.is_absolute() => path.into(),
107-
ScriptSpec::JavaScript(path) if path.starts_with(config_dir) => path.into(),
105+
ScriptSpec::JavaScript(path) if path.is_absolute() => path,
106+
ScriptSpec::JavaScript(path) if path.starts_with(config_dir) => path,
108107
ScriptSpec::JavaScript(path) => config_dir.join(path),
109108
};
110-
let script = JsScript::new(flow.to_owned().into(), index, path)
109+
let script = JsScript::new(flow.to_owned(), index, path)
111110
.with_config(self.config)
112-
.with_interval_secs(self.interval.as_secs());
111+
.with_interval(self.interval);
113112
let config_topics = topic_filters(self.meta_topics)?;
114113
Ok(FlowStep {
115114
script,

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -161,21 +161,21 @@ impl FlowStep {
161161
pub(crate) fn check(&self, flow: &Utf8Path) {
162162
let script = &self.script;
163163
if script.no_js_on_message_fun {
164-
warn!(target: "flows", "Flow script with no 'onMessage' function: {}", script.path.display());
164+
warn!(target: "flows", "Flow script with no 'onMessage' function: {}", script.path);
165165
}
166166
if script.no_js_on_config_update_fun && !self.config_topics.is_empty() {
167-
warn!(target: "flows", "Flow script with no 'onConfigUpdate' function: {}; but configured with 'config_topics' in {flow}", script.path.display());
167+
warn!(target: "flows", "Flow script with no 'onConfigUpdate' function: {}; but configured with 'config_topics' in {flow}", script.path);
168168
}
169-
if script.no_js_on_interval_fun && script.interval_secs != 0 {
170-
warn!(target: "flows", "Flow script with no 'onInterval' function: {}; but configured with an 'interval' in {flow}", script.path.display());
169+
if script.no_js_on_interval_fun && !script.interval.is_zero() {
170+
warn!(target: "flows", "Flow script with no 'onInterval' function: {}; but configured with an 'interval' in {flow}", script.path);
171171
}
172172
}
173173

174174
pub(crate) fn fix(&mut self) {
175175
let script = &mut self.script;
176-
if !script.no_js_on_interval_fun && script.interval_secs == 0 {
177-
// 0 as a default is not appropriate for a script with an onInterval handler
178-
script.interval_secs = 1;
176+
if !script.no_js_on_interval_fun && script.interval.is_zero() {
177+
// Zero as a default is not appropriate for a script with an onInterval handler
178+
script.interval = std::time::Duration::from_secs(1);
179179
}
180180
}
181181
}
@@ -193,8 +193,9 @@ impl DateTime {
193193
DateTime::try_from(OffsetDateTime::now_utc()).unwrap()
194194
}
195195

196-
pub fn tick_now(&self, tick_every_seconds: u64) -> bool {
197-
tick_every_seconds != 0 && (self.seconds % tick_every_seconds == 0)
196+
pub fn tick_now(&self, tick_every: std::time::Duration) -> bool {
197+
let tick_every_secs = tick_every.as_secs();
198+
tick_every_secs != 0 && (self.seconds % tick_every_secs == 0)
198199
}
199200

200201
pub fn json(&self) -> Value {

crates/extensions/tedge_flows/src/js_script.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,29 @@ use crate::flow::FlowError;
44
use crate::flow::Message;
55
use crate::js_runtime::JsRuntime;
66
use crate::js_value::JsonValue;
7-
use std::path::Path;
8-
use std::path::PathBuf;
7+
use camino::Utf8Path;
8+
use camino::Utf8PathBuf;
99
use tracing::debug;
1010

1111
#[derive(Clone)]
1212
pub struct JsScript {
1313
pub module_name: String,
14-
pub path: PathBuf,
14+
pub path: Utf8PathBuf,
1515
pub config: JsonValue,
16-
pub interval_secs: u64,
16+
pub interval: std::time::Duration,
1717
pub no_js_on_message_fun: bool,
1818
pub no_js_on_config_update_fun: bool,
1919
pub no_js_on_interval_fun: bool,
2020
}
2121

2222
impl JsScript {
23-
pub fn new(flow: PathBuf, index: usize, path: PathBuf) -> Self {
24-
let module_name = format!("{}|{}|{}", flow.display(), index, path.display());
23+
pub fn new(flow: Utf8PathBuf, index: usize, path: Utf8PathBuf) -> Self {
24+
let module_name = format!("{flow}|{index}|{path}");
2525
JsScript {
2626
module_name,
2727
path,
2828
config: JsonValue::default(),
29-
interval_secs: 0,
29+
interval: std::time::Duration::ZERO,
3030
no_js_on_message_fun: true,
3131
no_js_on_config_update_fun: true,
3232
no_js_on_interval_fun: true,
@@ -48,19 +48,16 @@ impl JsScript {
4848
}
4949
}
5050

51-
pub fn with_interval_secs(self, interval_secs: u64) -> Self {
52-
Self {
53-
interval_secs,
54-
..self
55-
}
51+
pub fn with_interval(self, interval: std::time::Duration) -> Self {
52+
Self { interval, ..self }
5653
}
5754

58-
pub fn path(&self) -> &Path {
55+
pub fn path(&self) -> &Utf8Path {
5956
&self.path
6057
}
6158

6259
pub fn source(&self) -> String {
63-
format!("{}", self.path.display())
60+
self.path.to_string()
6461
}
6562

6663
/// Transform an input message into zero, one or more output messages
@@ -134,7 +131,7 @@ impl JsScript {
134131
if self.no_js_on_interval_fun {
135132
return Ok(vec![]);
136133
}
137-
if !timestamp.tick_now(self.interval_secs) {
134+
if !timestamp.tick_now(self.interval) {
138135
return Ok(vec![]);
139136
}
140137
debug!(target: "flows", "{}: onInterval({timestamp:?})", self.module_name());

crates/extensions/tedge_flows/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ mod stats;
1010

1111
use crate::actor::FlowsMapper;
1212
pub use crate::runtime::MessageProcessor;
13+
use camino::Utf8Path;
1314
use std::convert::Infallible;
14-
use std::path::Path;
1515
use std::path::PathBuf;
1616
use tedge_actors::fan_in_message_type;
1717
use tedge_actors::Builder;
@@ -39,7 +39,7 @@ pub struct FlowsMapperBuilder {
3939
}
4040

4141
impl FlowsMapperBuilder {
42-
pub async fn try_new(config_dir: impl AsRef<Path>) -> Result<Self, LoadError> {
42+
pub async fn try_new(config_dir: impl AsRef<Utf8Path>) -> Result<Self, LoadError> {
4343
let processor = MessageProcessor::try_new(config_dir).await?;
4444
Ok(FlowsMapperBuilder {
4545
message_box: SimpleMessageBoxBuilder::new("GenMapper", 16),
@@ -68,7 +68,7 @@ impl FlowsMapperBuilder {
6868

6969
pub fn connect_fs(&mut self, fs: &mut impl MessageSource<FsWatchEvent, PathBuf>) {
7070
fs.connect_mapped_sink(
71-
self.processor.config_dir.clone(),
71+
self.processor.config_dir.clone().into(),
7272
&self.message_box,
7373
|msg| Some(InputMessage::FsWatchEvent(msg)),
7474
);

0 commit comments

Comments
 (0)