Skip to content

Commit 9407fd8

Browse files
committed
Extract flow loading logic into FlowRegistry
Signed-off-by: Didier Wenzek <[email protected]>
1 parent 2372a33 commit 9407fd8

File tree

6 files changed

+285
-224
lines changed

6 files changed

+285
-224
lines changed

crates/core/tedge/src/cli/flows/cli.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,23 +111,26 @@ impl TEdgeFlowsCli {
111111
}
112112

113113
pub async fn load_flows(flows_dir: &Utf8PathBuf) -> Result<MessageProcessor, Error> {
114-
MessageProcessor::try_new(flows_dir)
114+
let mut processor = MessageProcessor::try_new(flows_dir)
115115
.await
116-
.with_context(|| format!("loading flows and steps from {flows_dir}"))
116+
.with_context(|| format!("loading flows and steps from {flows_dir}"))?;
117+
processor.load_all_flows().await;
118+
Ok(processor)
117119
}
118120

119121
pub async fn load_file(
120122
flows_dir: &Utf8PathBuf,
121123
path: &Utf8PathBuf,
122124
) -> Result<MessageProcessor, Error> {
125+
let mut processor = MessageProcessor::try_new(flows_dir)
126+
.await
127+
.with_context(|| format!("loading flow {path}"))?;
128+
123129
if let Some("toml") = path.extension() {
124-
MessageProcessor::try_new_single_flow(flows_dir, path)
125-
.await
126-
.with_context(|| format!("loading flow {path}"))
130+
processor.load_single_flow(path).await;
127131
} else {
128-
MessageProcessor::try_new_single_step_flow(flows_dir, path)
129-
.await
130-
.with_context(|| format!("loading flow script {path}"))
132+
processor.load_single_script(path).await;
131133
}
134+
Ok(processor)
132135
}
133136
}

crates/core/tedge/src/cli/flows/list.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,21 @@ impl Command for ListCommand {
2222

2323
match &self.topic {
2424
Some(topic) => processor
25-
.flows
26-
.iter()
27-
.filter(|(_, flow)| flow.topics().accept_topic_name(topic))
25+
.registry
26+
.flows()
27+
.filter(|flow| flow.topics().accept_topic_name(topic))
2828
.for_each(Self::display),
2929

30-
None => processor.flows.iter().for_each(Self::display),
30+
None => processor.registry.flows().for_each(Self::display),
3131
}
3232

3333
Ok(())
3434
}
3535
}
3636

3737
impl ListCommand {
38-
fn display((flow_id, flow): (&String, &Flow)) {
38+
fn display(flow: &Flow) {
39+
let flow_id = flow.name();
3940
println!("{flow_id}");
4041
for step in flow.steps.iter() {
4142
println!("\t{}", step.script.path);

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ impl FlowsMapper {
151151
fn update_watched_commands(&mut self) -> Vec<WatchRequest> {
152152
let mut watch_requests = Vec::new();
153153
let mut new_watched_commands = HashSet::new();
154-
for flow in self.processor.flows.values() {
154+
for flow in self.processor.registry.flows() {
155155
let topic = flow.name();
156156
let Some(request) = flow.watch_request() else {
157157
continue;
@@ -270,7 +270,7 @@ impl FlowsMapper {
270270
flow_name: String,
271271
line: String,
272272
) -> Result<(), RuntimeError> {
273-
if let Some(flow) = self.processor.flows.get(&flow_name) {
273+
if let Some(flow) = self.processor.registry.get(&flow_name) {
274274
let topic = flow.input.enforced_topic().unwrap_or_default();
275275
let source = SourceTag::Process {
276276
flow: flow_name.clone(),
@@ -286,7 +286,7 @@ impl FlowsMapper {
286286
flow_name: &str,
287287
error: FlowError,
288288
) -> Result<(), RuntimeError> {
289-
let Some((info, flow_error)) = self.processor.flows.get(flow_name).map(|flow| {
289+
let Some((info, flow_error)) = self.processor.registry.get(flow_name).map(|flow| {
290290
(
291291
format!("Reconnecting input: {flow_name}: {}", flow.input),
292292
flow.on_error(error),
@@ -298,7 +298,7 @@ impl FlowsMapper {
298298

299299
let Some(request) = self
300300
.processor
301-
.flows
301+
.registry
302302
.get(flow_name)
303303
.and_then(|flow| flow.watch_request())
304304
else {
@@ -315,7 +315,7 @@ impl FlowsMapper {
315315
}
316316

317317
async fn on_process_eos(&mut self, flow_name: &str) -> Result<(), RuntimeError> {
318-
if let Some(flow) = self.processor.flows.get(flow_name) {
318+
if let Some(flow) = self.processor.registry.get(flow_name) {
319319
if let Some(request) = flow.watch_request() {
320320
info!(target: "flows", "Reconnecting input: {flow_name}: {}", flow.input);
321321
self.watch_request_sender.send(request).await?

crates/extensions/tedge_flows/src/lib.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod js_lib;
66
mod js_runtime;
77
mod js_script;
88
mod js_value;
9+
mod registry;
910
mod runtime;
1011
mod stats;
1112

@@ -51,10 +52,13 @@ pub struct FlowsMapperBuilder {
5152

5253
impl FlowsMapperBuilder {
5354
pub async fn try_new(config_dir: impl AsRef<Utf8Path>) -> Result<Self, LoadError> {
54-
let processor = MessageProcessor::try_new(config_dir).await?;
55+
let mut processor = MessageProcessor::try_new(config_dir).await?;
5556
let message_box = SimpleMessageBoxBuilder::new("TedgeFlows", 16);
5657
let mqtt_sender = NullSender.into();
5758
let watch_request_sender = NullSender.into();
59+
60+
processor.load_all_flows().await;
61+
5862
Ok(FlowsMapperBuilder {
5963
message_box,
6064
mqtt_sender,
@@ -81,7 +85,7 @@ impl FlowsMapperBuilder {
8185

8286
pub fn connect_fs(&mut self, fs: &mut impl MessageSource<FsWatchEvent, PathBuf>) {
8387
fs.connect_mapped_sink(
84-
self.processor.config_dir.clone().into(),
88+
self.processor.registry.config_dir().into(),
8589
&self.message_box,
8690
|msg| Some(InputMessage::FsWatchEvent(msg)),
8791
);
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
use crate::config::FlowConfig;
2+
use crate::flow::Flow;
3+
use crate::js_runtime::JsRuntime;
4+
use crate::LoadError;
5+
use camino::Utf8Path;
6+
use camino::Utf8PathBuf;
7+
use std::collections::HashMap;
8+
use std::path::Path;
9+
use tokio::fs::read_dir;
10+
use tokio::fs::read_to_string;
11+
use tracing::error;
12+
use tracing::info;
13+
use tracing::warn;
14+
15+
pub struct FlowRegistry {
16+
config_dir: Utf8PathBuf,
17+
flows: HashMap<String, Flow>,
18+
}
19+
20+
impl FlowRegistry {
21+
pub fn new(config_dir: impl AsRef<Utf8Path>) -> Self {
22+
FlowRegistry {
23+
config_dir: config_dir.as_ref().to_owned(),
24+
flows: HashMap::new(),
25+
}
26+
}
27+
28+
pub async fn load_all_flows(&mut self, js_runtime: &mut JsRuntime) {
29+
let mut flow_specs = FlowSpecs::default();
30+
flow_specs.load(&self.config_dir).await;
31+
self.flows = flow_specs.compile(js_runtime, &self.config_dir).await
32+
}
33+
34+
pub async fn load_single_flow(&mut self, js_runtime: &mut JsRuntime, flow: impl AsRef<Path>) {
35+
let mut flow_specs = FlowSpecs::default();
36+
flow_specs.load_single_flow(flow.as_ref()).await;
37+
self.flows = flow_specs.compile(js_runtime, &self.config_dir).await
38+
}
39+
40+
pub async fn load_single_script(
41+
&mut self,
42+
js_runtime: &mut JsRuntime,
43+
script: impl AsRef<Path>,
44+
) {
45+
let mut flow_specs = FlowSpecs::default();
46+
flow_specs.load_single_script(script.as_ref()).await;
47+
self.flows = flow_specs.compile(js_runtime, &self.config_dir).await
48+
}
49+
50+
pub fn config_dir(&self) -> &Utf8Path {
51+
&self.config_dir
52+
}
53+
54+
pub fn get(&self, name: &str) -> Option<&Flow> {
55+
self.flows.get(name)
56+
}
57+
58+
pub fn flows(&self) -> impl Iterator<Item = &Flow> {
59+
self.flows.values()
60+
}
61+
62+
pub fn flows_mut(&mut self) -> impl Iterator<Item = &mut Flow> {
63+
self.flows.values_mut()
64+
}
65+
66+
pub async fn reload_script(&mut self, js_runtime: &mut JsRuntime, path: Utf8PathBuf) {
67+
for flow in self.flows.values_mut() {
68+
for step in &mut flow.steps {
69+
if step.script.path() == path {
70+
match js_runtime.load_script(&mut step.script).await {
71+
Ok(()) => {
72+
info!(target: "flows", "Reloading flow script {path}");
73+
step.script.init_next_execution();
74+
}
75+
Err(e) => {
76+
error!(target: "flows", "Failed to reload flow script {path}: {e}");
77+
return;
78+
}
79+
}
80+
}
81+
}
82+
}
83+
}
84+
85+
pub async fn remove_script(&mut self, path: Utf8PathBuf) {
86+
for (flow_id, flow) in self.flows.iter() {
87+
for step in flow.steps.iter() {
88+
if step.script.path() == path {
89+
warn!(target: "flows", "Removing a script used by a flow {flow_id}: {path}");
90+
return;
91+
}
92+
}
93+
}
94+
}
95+
96+
async fn load_flow(
97+
&mut self,
98+
js_runtime: &mut JsRuntime,
99+
flow_id: String,
100+
path: Utf8PathBuf,
101+
) -> bool {
102+
let Ok(source) = tokio::fs::read_to_string(&path).await else {
103+
self.remove_flow(path).await;
104+
return false;
105+
};
106+
let config: FlowConfig = match toml::from_str(&source) {
107+
Ok(config) => config,
108+
Err(e) => {
109+
error!(target: "flows", "Failed to parse toml for flow {path}: {e}");
110+
return false;
111+
}
112+
};
113+
match config
114+
.compile(js_runtime, &self.config_dir, path.clone())
115+
.await
116+
{
117+
Ok(flow) => {
118+
self.flows.insert(flow_id, flow);
119+
true
120+
}
121+
Err(e) => {
122+
error!(target: "flows", "Failed to compile flow {path}: {e}");
123+
false
124+
}
125+
}
126+
}
127+
128+
pub fn flow_id(path: impl AsRef<Path>) -> String {
129+
format!("{}", path.as_ref().display())
130+
}
131+
132+
pub async fn add_flow(&mut self, js_runtime: &mut JsRuntime, path: Utf8PathBuf) {
133+
let flow_id = Self::flow_id(&path);
134+
if self.load_flow(js_runtime, flow_id, path.clone()).await {
135+
info!(target: "flows", "Loading flow {path}");
136+
}
137+
}
138+
139+
pub async fn remove_flow(&mut self, path: Utf8PathBuf) {
140+
let flow_id = Self::flow_id(&path);
141+
self.flows.remove(&flow_id);
142+
info!(target: "flows", "Removing flow {path}");
143+
}
144+
}
145+
146+
#[derive(Default)]
147+
struct FlowSpecs {
148+
flow_specs: HashMap<String, (Utf8PathBuf, FlowConfig)>,
149+
}
150+
151+
impl FlowSpecs {
152+
pub async fn load(&mut self, config_dir: &Utf8Path) {
153+
let Ok(mut entries) = read_dir(config_dir).await.map_err(
154+
|err| error!(target: "flows", "Failed to read flows from {config_dir}: {err}"),
155+
) else {
156+
return;
157+
};
158+
159+
while let Ok(Some(entry)) = entries.next_entry().await {
160+
let Some(path) = Utf8Path::from_path(&entry.path()).map(|p| p.to_path_buf()) else {
161+
error!(target: "flows", "Skipping non UTF8 path: {}", entry.path().display());
162+
continue;
163+
};
164+
if let Ok(file_type) = entry.file_type().await {
165+
if file_type.is_file() {
166+
if let Some("toml") = path.extension() {
167+
info!(target: "flows", "Loading flow: {path}");
168+
if let Err(err) = self.load_flow(path).await {
169+
error!(target: "flows", "Failed to load flow: {err}");
170+
}
171+
}
172+
}
173+
}
174+
}
175+
}
176+
177+
pub async fn load_single_flow(&mut self, flow: &Path) {
178+
let Some(path) = Utf8Path::from_path(flow).map(|p| p.to_path_buf()) else {
179+
error!(target: "flows", "Skipping non UTF8 path: {}", flow.display());
180+
return;
181+
};
182+
if let Err(err) = self.load_flow(&path).await {
183+
error!(target: "flows", "Failed to load flow {path}: {err}");
184+
}
185+
}
186+
187+
pub async fn load_single_script(&mut self, script: impl AsRef<Path>) {
188+
let script = script.as_ref();
189+
let Some(path) = Utf8Path::from_path(script).map(|p| p.to_path_buf()) else {
190+
error!(target: "flows", "Skipping non UTF8 path: {}", script.display());
191+
return;
192+
};
193+
let flow_id = FlowRegistry::flow_id(&path);
194+
let flow = FlowConfig::from_step(path.to_owned());
195+
self.flow_specs.insert(flow_id, (path.to_owned(), flow));
196+
}
197+
198+
async fn load_flow(&mut self, file: impl AsRef<Utf8Path>) -> Result<(), LoadError> {
199+
let path = file.as_ref();
200+
let flow_id = FlowRegistry::flow_id(path);
201+
let specs = read_to_string(path).await?;
202+
let flow: FlowConfig = toml::from_str(&specs)?;
203+
self.flow_specs.insert(flow_id, (path.to_owned(), flow));
204+
205+
Ok(())
206+
}
207+
208+
async fn compile(
209+
mut self,
210+
js_runtime: &mut JsRuntime,
211+
config_dir: &Utf8Path,
212+
) -> HashMap<String, Flow> {
213+
let mut flows = HashMap::new();
214+
for (name, (source, specs)) in self.flow_specs.drain() {
215+
match specs.compile(js_runtime, config_dir, source).await {
216+
Ok(flow) => {
217+
let _ = flows.insert(name, flow);
218+
}
219+
Err(err) => {
220+
error!(target: "flows", "Failed to compile flow {name}: {err}")
221+
}
222+
}
223+
}
224+
flows
225+
}
226+
}

0 commit comments

Comments
 (0)