Skip to content

Commit 9b689b2

Browse files
committed
Add trait FlowRegistry
The plan is to have two implementations - The BaseFlowRegistry which provides no specific support to poll or stream sources. - The ConnectedFlowRegistry which handles sources Signed-off-by: Didier Wenzek <[email protected]>
1 parent 9407fd8 commit 9b689b2

File tree

9 files changed

+294
-182
lines changed

9 files changed

+294
-182
lines changed

crates/core/tedge/src/cli/flows/cli.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ use anyhow::Context;
88
use anyhow::Error;
99
use camino::Utf8PathBuf;
1010
use tedge_config::TEdgeConfig;
11-
use tedge_flows::flow::Message;
11+
use tedge_flows::BaseFlowRegistry;
12+
use tedge_flows::Message;
1213
use tedge_flows::MessageProcessor;
1314

1415
#[derive(clap::Subcommand, Debug)]
@@ -110,8 +111,10 @@ impl TEdgeFlowsCli {
110111
config.root_dir().join("flows")
111112
}
112113

113-
pub async fn load_flows(flows_dir: &Utf8PathBuf) -> Result<MessageProcessor, Error> {
114-
let mut processor = MessageProcessor::try_new(flows_dir)
114+
pub async fn load_flows(
115+
flows_dir: &Utf8PathBuf,
116+
) -> Result<MessageProcessor<BaseFlowRegistry>, Error> {
117+
let mut processor = MessageProcessor::with_base_registry(flows_dir)
115118
.await
116119
.with_context(|| format!("loading flows and steps from {flows_dir}"))?;
117120
processor.load_all_flows().await;
@@ -121,8 +124,8 @@ impl TEdgeFlowsCli {
121124
pub async fn load_file(
122125
flows_dir: &Utf8PathBuf,
123126
path: &Utf8PathBuf,
124-
) -> Result<MessageProcessor, Error> {
125-
let mut processor = MessageProcessor::try_new(flows_dir)
127+
) -> Result<MessageProcessor<BaseFlowRegistry>, Error> {
128+
let mut processor = MessageProcessor::with_base_registry(flows_dir)
126129
.await
127130
.with_context(|| format!("loading flow {path}"))?;
128131

crates/core/tedge/src/cli/flows/list.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use crate::log::MaybeFancy;
44
use anyhow::Error;
55
use camino::Utf8PathBuf;
66
use tedge_config::TEdgeConfig;
7-
use tedge_flows::flow::Flow;
7+
use tedge_flows::Flow;
8+
use tedge_flows::FlowRegistryExt;
89

910
pub struct ListCommand {
1011
pub flows_dir: Utf8PathBuf,

crates/core/tedge/src/cli/flows/test.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@ use base64::prelude::BASE64_STANDARD;
66
use base64::prelude::*;
77
use camino::Utf8PathBuf;
88
use tedge_config::TEdgeConfig;
9-
use tedge_flows::flow::*;
9+
use tedge_flows::BaseFlowRegistry;
10+
use tedge_flows::DateTime;
11+
use tedge_flows::FlowResult;
12+
use tedge_flows::Message;
1013
use tedge_flows::MessageProcessor;
14+
use tedge_flows::SourceTag;
1115
use tokio::io::AsyncBufReadExt;
1216
use tokio::io::BufReader;
1317
use tokio::io::Stdin;
@@ -61,7 +65,7 @@ impl Command for TestCommand {
6165
impl TestCommand {
6266
async fn process(
6367
&self,
64-
processor: &mut MessageProcessor,
68+
processor: &mut MessageProcessor<BaseFlowRegistry>,
6569
mut message: Message,
6670
timestamp: DateTime,
6771
) {
@@ -82,7 +86,12 @@ impl TestCommand {
8286
.for_each(|msg| self.print_messages(msg))
8387
}
8488

85-
async fn tick(&self, processor: &mut MessageProcessor, timestamp: DateTime, now: Instant) {
89+
async fn tick(
90+
&self,
91+
processor: &mut MessageProcessor<BaseFlowRegistry>,
92+
timestamp: DateTime,
93+
now: Instant,
94+
) {
8695
processor
8796
.on_interval(timestamp, now)
8897
.await

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use crate::flow::FlowOutput;
44
use crate::flow::FlowResult;
55
use crate::flow::Message;
66
use crate::flow::SourceTag;
7+
use crate::registry::BaseFlowRegistry;
8+
use crate::registry::FlowRegistryExt;
79
use crate::runtime::MessageProcessor;
810
use crate::InputMessage;
911
use crate::Tick;
@@ -44,7 +46,7 @@ pub struct FlowsMapper {
4446
pub(super) watch_request_sender: DynSender<WatchRequest>,
4547
pub(super) subscriptions: TopicFilter,
4648
pub(super) watched_commands: HashSet<String>,
47-
pub(super) processor: MessageProcessor,
49+
pub(super) processor: MessageProcessor<BaseFlowRegistry>,
4850
pub(super) next_dump: Instant,
4951
}
5052

@@ -174,16 +176,16 @@ impl FlowsMapper {
174176
async fn notify_flows_status(&mut self) -> Result<(), RuntimeError> {
175177
let status = "enabled";
176178
let now = OffsetDateTime::now_utc();
177-
for flow in self.processor.flows.keys() {
178-
let status = Self::flow_status(flow, status, &now);
179+
for flow in self.processor.registry.flows() {
180+
let status = Self::flow_status(flow.name(), status, &now);
179181
self.mqtt_sender.send(status).await?;
180182
}
181183
Ok(())
182184
}
183185

184186
async fn update_flow_status(&mut self, flow: &str) -> Result<(), RuntimeError> {
185187
let now = OffsetDateTime::now_utc();
186-
let status = if self.processor.flows.contains_key(flow) {
188+
let status = if self.processor.registry.contains_flow(flow) {
187189
"updated"
188190
} else {
189191
"removed"
@@ -270,7 +272,7 @@ impl FlowsMapper {
270272
flow_name: String,
271273
line: String,
272274
) -> Result<(), RuntimeError> {
273-
if let Some(flow) = self.processor.registry.get(&flow_name) {
275+
if let Some(flow) = self.processor.registry.flow(&flow_name) {
274276
let topic = flow.input.enforced_topic().unwrap_or_default();
275277
let source = SourceTag::Process {
276278
flow: flow_name.clone(),
@@ -286,7 +288,7 @@ impl FlowsMapper {
286288
flow_name: &str,
287289
error: FlowError,
288290
) -> Result<(), RuntimeError> {
289-
let Some((info, flow_error)) = self.processor.registry.get(flow_name).map(|flow| {
291+
let Some((info, flow_error)) = self.processor.registry.flow(flow_name).map(|flow| {
290292
(
291293
format!("Reconnecting input: {flow_name}: {}", flow.input),
292294
flow.on_error(error),
@@ -299,7 +301,7 @@ impl FlowsMapper {
299301
let Some(request) = self
300302
.processor
301303
.registry
302-
.get(flow_name)
304+
.flow(flow_name)
303305
.and_then(|flow| flow.watch_request())
304306
else {
305307
return Ok(());
@@ -315,7 +317,7 @@ impl FlowsMapper {
315317
}
316318

317319
async fn on_process_eos(&mut self, flow_name: &str) -> Result<(), RuntimeError> {
318-
if let Some(flow) = self.processor.registry.get(flow_name) {
320+
if let Some(flow) = self.processor.registry.flow(flow_name) {
319321
if let Some(request) = flow.watch_request() {
320322
info!(target: "flows", "Reconnecting input: {flow_name}: {}", flow.input);
321323
self.watch_request_sender.send(request).await?

crates/extensions/tedge_flows/src/config.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,15 @@ use camino::Utf8Path;
1212
use camino::Utf8PathBuf;
1313
use serde::Deserialize;
1414
use serde_json::Value;
15+
use std::collections::HashMap;
1516
use std::fmt::Debug;
1617
use std::time::Duration;
1718
use tedge_mqtt_ext::Topic;
1819
use tedge_mqtt_ext::TopicFilter;
20+
use tokio::fs::read_dir;
21+
use tokio::fs::read_to_string;
22+
use tracing::error;
23+
use tracing::info;
1924

2025
#[derive(Deserialize)]
2126
pub struct FlowConfig {
@@ -101,6 +106,53 @@ pub enum ConfigError {
101106
}
102107

103108
impl FlowConfig {
109+
pub async fn load_all_flows(config_dir: &Utf8Path) -> HashMap<Utf8PathBuf, FlowConfig> {
110+
let mut flows = HashMap::new();
111+
let Ok(mut entries) = read_dir(config_dir).await.map_err(
112+
|err| error!(target: "flows", "Failed to read flows from {config_dir}: {err}"),
113+
) else {
114+
return flows;
115+
};
116+
117+
while let Ok(Some(entry)) = entries.next_entry().await {
118+
let Some(path) = Utf8Path::from_path(&entry.path()).map(|p| p.to_path_buf()) else {
119+
error!(target: "flows", "Skipping non UTF8 path: {}", entry.path().display());
120+
continue;
121+
};
122+
if let Ok(file_type) = entry.file_type().await {
123+
if file_type.is_file() {
124+
if let Some("toml") = path.extension() {
125+
info!(target: "flows", "Loading flow: {path}");
126+
if let Some(flow) = FlowConfig::load_single_flow(&path).await {
127+
flows.insert(path.clone(), flow);
128+
}
129+
}
130+
}
131+
}
132+
}
133+
flows
134+
}
135+
136+
pub async fn load_single_flow(flow: &Utf8Path) -> Option<FlowConfig> {
137+
match FlowConfig::load_flow(flow).await {
138+
Ok(flow) => Some(flow),
139+
Err(err) => {
140+
error!(target: "flows", "Failed to load flow {flow}: {err}");
141+
None
142+
}
143+
}
144+
}
145+
146+
pub fn wrap_script_into_flow(script: &Utf8Path) -> FlowConfig {
147+
FlowConfig::from_step(script.to_owned())
148+
}
149+
150+
async fn load_flow(path: &Utf8Path) -> Result<FlowConfig, LoadError> {
151+
let specs = read_to_string(path).await?;
152+
let flow: FlowConfig = toml::from_str(&specs)?;
153+
Ok(flow)
154+
}
155+
104156
pub fn from_step(script: Utf8PathBuf) -> Self {
105157
let input_topic = "#".to_string();
106158
let step = StepConfig {

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,18 @@ pub enum FlowError {
117117
Anyhow(#[from] anyhow::Error),
118118
}
119119

120+
impl AsRef<Flow> for Flow {
121+
fn as_ref(&self) -> &Flow {
122+
self
123+
}
124+
}
125+
126+
impl AsMut<Flow> for Flow {
127+
fn as_mut(&mut self) -> &mut Flow {
128+
self
129+
}
130+
}
131+
120132
impl Flow {
121133
pub fn name(&self) -> &str {
122134
self.source.as_str()
@@ -176,7 +188,7 @@ impl Flow {
176188
Ok(messages)
177189
}
178190

179-
pub fn accept_message(&mut self, source: &SourceTag, message: &Message) -> bool {
191+
pub fn accept_message(&self, source: &SourceTag, message: &Message) -> bool {
180192
self.input.accept_message(source, message)
181193
}
182194

crates/extensions/tedge_flows/src/lib.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
mod actor;
22
mod config;
3-
pub mod flow;
3+
mod flow;
44
mod input_source;
55
mod js_lib;
66
mod js_runtime;
@@ -12,6 +12,9 @@ mod stats;
1212

1313
use crate::actor::FlowsMapper;
1414
use crate::actor::STATS_DUMP_INTERVAL;
15+
pub use crate::flow::*;
16+
pub use crate::registry::BaseFlowRegistry;
17+
pub use crate::registry::FlowRegistryExt;
1518
pub use crate::runtime::MessageProcessor;
1619
use camino::Utf8Path;
1720
use std::collections::HashSet;
@@ -38,6 +41,7 @@ use tedge_watch_ext::WatchEvent;
3841
use tedge_watch_ext::WatchRequest;
3942
use tokio::time::Instant;
4043
use tracing::error;
44+
4145
fan_in_message_type!(InputMessage[MqttMessage, WatchEvent, FsWatchEvent, Tick]: Clone, Debug, Eq, PartialEq);
4246

4347
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -47,12 +51,13 @@ pub struct FlowsMapperBuilder {
4751
message_box: SimpleMessageBoxBuilder<InputMessage, SubscriptionDiff>,
4852
mqtt_sender: DynSender<MqttMessage>,
4953
watch_request_sender: DynSender<WatchRequest>,
50-
processor: MessageProcessor,
54+
processor: MessageProcessor<BaseFlowRegistry>,
5155
}
5256

5357
impl FlowsMapperBuilder {
5458
pub async fn try_new(config_dir: impl AsRef<Utf8Path>) -> Result<Self, LoadError> {
55-
let mut processor = MessageProcessor::try_new(config_dir).await?;
59+
let registry = BaseFlowRegistry::new(config_dir);
60+
let mut processor = MessageProcessor::try_new(registry).await?;
5661
let message_box = SimpleMessageBoxBuilder::new("TedgeFlows", 16);
5762
let mqtt_sender = NullSender.into();
5863
let watch_request_sender = NullSender.into();

0 commit comments

Comments
 (0)