Skip to content

Commit 90b633f

Browse files
committed
Separate Flow vs ConnectedFlow
Signed-off-by: Didier Wenzek <[email protected]>
1 parent 9b689b2 commit 90b633f

File tree

8 files changed

+298
-146
lines changed

8 files changed

+298
-146
lines changed

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
use crate::connected_flow::ConnectedFlowRegistry;
12
use crate::flow::DateTime;
23
use crate::flow::FlowError;
34
use crate::flow::FlowOutput;
45
use crate::flow::FlowResult;
56
use crate::flow::Message;
67
use crate::flow::SourceTag;
7-
use crate::registry::BaseFlowRegistry;
88
use crate::registry::FlowRegistryExt;
99
use crate::runtime::MessageProcessor;
1010
use crate::InputMessage;
@@ -46,7 +46,7 @@ pub struct FlowsMapper {
4646
pub(super) watch_request_sender: DynSender<WatchRequest>,
4747
pub(super) subscriptions: TopicFilter,
4848
pub(super) watched_commands: HashSet<String>,
49-
pub(super) processor: MessageProcessor<BaseFlowRegistry>,
49+
pub(super) processor: MessageProcessor<ConnectedFlowRegistry>,
5050
pub(super) next_dump: Instant,
5151
}
5252

@@ -71,7 +71,7 @@ impl Actor for FlowsMapper {
7171
self.on_message(source, Message::from(message)).await?
7272
}
7373
InputMessage::WatchEvent(event) => {
74-
self.on_process_event(event).await?;
74+
self.on_input_event(event).await?;
7575
}
7676
InputMessage::FsWatchEvent(FsWatchEvent::Modified(path)) => {
7777
let Ok(path) = Utf8PathBuf::try_from(path) else {
@@ -166,7 +166,7 @@ impl FlowsMapper {
166166
new_watched_commands.insert(topic.to_owned());
167167
}
168168
for old_command in self.watched_commands.drain() {
169-
info!(target: "flows", "removing input: {}", old_command);
169+
info!(target: "flows", "Removing input: {}", old_command);
170170
watch_requests.push(WatchRequest::UnWatch { topic: old_command });
171171
}
172172
self.watched_commands = new_watched_commands;
@@ -208,8 +208,29 @@ impl FlowsMapper {
208208
async fn on_source_poll(&mut self) -> Result<(), RuntimeError> {
209209
let now = Instant::now();
210210
let timestamp = DateTime::now();
211-
for messages in self.processor.on_source_poll(timestamp, now).await {
212-
self.publish_result(messages).await?;
211+
212+
let mut in_messages = vec![];
213+
for flow in self.processor.registry.flows_mut() {
214+
in_messages.push(flow.on_source_poll(timestamp, now).await);
215+
}
216+
217+
for messages in in_messages {
218+
match messages {
219+
FlowResult::Ok { flow, messages, .. } => {
220+
for message in messages {
221+
if let Some(flow_output) = self
222+
.processor
223+
.on_flow_input(&flow, timestamp, &message)
224+
.await
225+
{
226+
self.publish_result(flow_output).await?;
227+
}
228+
}
229+
}
230+
poll_error => {
231+
self.publish_result(poll_error).await?;
232+
}
233+
}
213234
}
214235

215236
Ok(())
@@ -247,43 +268,48 @@ impl FlowsMapper {
247268
Ok(())
248269
}
249270

250-
async fn on_process_event(&mut self, event: WatchEvent) -> Result<(), RuntimeError> {
271+
async fn on_input_event(&mut self, event: WatchEvent) -> Result<(), RuntimeError> {
251272
match event {
252273
WatchEvent::StdoutLine { topic, line } => {
253-
self.on_process_message(topic, line).await?;
274+
self.on_input_message(topic, line).await?;
254275
}
255276
WatchEvent::StderrLine { topic, line } => {
256277
warn!(target: "flows", "Input command {topic}: {line}");
257278
}
258279
WatchEvent::Error { topic, error } => {
259280
error!(target: "flows", "Cannot monitor command: {error}");
260-
self.on_process_error(&topic, error.into()).await?;
281+
self.on_input_error(&topic, error.into()).await?;
261282
}
262283
WatchEvent::EndOfStream { topic } => {
263284
error!(target: "flows", "End of input stream: {topic}");
264-
self.on_process_eos(&topic).await?
285+
self.on_input_eos(&topic).await?
265286
}
266287
}
267288
Ok(())
268289
}
269290

270-
async fn on_process_message(
291+
async fn on_input_message(
271292
&mut self,
272293
flow_name: String,
273294
line: String,
274295
) -> Result<(), RuntimeError> {
275296
if let Some(flow) = self.processor.registry.flow(&flow_name) {
276-
let topic = flow.input.enforced_topic().unwrap_or_default();
277-
let source = SourceTag::Process {
278-
flow: flow_name.clone(),
279-
};
280-
self.on_message(source, Message::new(topic, line)).await?;
297+
let topic = flow.input_topic().to_string();
298+
let timestamp = DateTime::now();
299+
let message = Message::new(topic, line);
300+
if let Some(result) = self
301+
.processor
302+
.on_flow_input(&flow_name, timestamp, &message)
303+
.await
304+
{
305+
self.publish_result(result).await?;
306+
}
281307
}
282308

283309
Ok(())
284310
}
285311

286-
async fn on_process_error(
312+
async fn on_input_error(
287313
&mut self,
288314
flow_name: &str,
289315
error: FlowError,
@@ -316,7 +342,7 @@ impl FlowsMapper {
316342
Ok(())
317343
}
318344

319-
async fn on_process_eos(&mut self, flow_name: &str) -> Result<(), RuntimeError> {
345+
async fn on_input_eos(&mut self, flow_name: &str) -> Result<(), RuntimeError> {
320346
if let Some(flow) = self.processor.registry.flow(flow_name) {
321347
if let Some(request) = flow.watch_request() {
322348
info!(target: "flows", "Reconnecting input: {flow_name}: {}", flow.input);

crates/extensions/tedge_flows/src/config.rs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
use crate::flow::Flow;
2+
use crate::flow::FlowInput;
23
use crate::flow::FlowOutput;
34
use crate::flow::FlowStep;
4-
use crate::input_source::CommandFlowInput;
5-
use crate::input_source::FileFlowInput;
6-
use crate::input_source::FlowInput;
7-
use crate::input_source::MqttFlowInput;
85
use crate::js_runtime::JsRuntime;
96
use crate::js_script::JsScript;
107
use crate::LoadError;
@@ -177,8 +174,7 @@ impl FlowConfig {
177174
config_dir: &Utf8Path,
178175
source: Utf8PathBuf,
179176
) -> Result<Flow, ConfigError> {
180-
let flow_name = source.clone().to_string();
181-
let input = self.input.compile(flow_name)?;
177+
let input = self.input.try_into()?;
182178
let output = self.output.try_into()?;
183179
let errors = self.errors.try_into()?;
184180
let mut steps = vec![];
@@ -223,27 +219,42 @@ impl StepConfig {
223219
}
224220
}
225221

226-
impl InputConfig {
227-
pub fn compile(self, flow_name: String) -> Result<Box<dyn FlowInput>, ConfigError> {
228-
Ok(match self {
229-
InputConfig::Mqtt { topics } => Box::new(MqttFlowInput {
222+
impl TryFrom<InputConfig> for FlowInput {
223+
type Error = ConfigError;
224+
fn try_from(input: InputConfig) -> Result<Self, Self::Error> {
225+
Ok(match input {
226+
InputConfig::Mqtt { topics } => FlowInput::Mqtt {
230227
topics: topic_filters(topics)?,
231-
}),
228+
},
232229
InputConfig::File {
233230
topic,
234231
path,
235232
interval,
236233
} => {
237234
let topic = topic.unwrap_or_else(|| path.clone().to_string());
238-
Box::new(FileFlowInput::new(flow_name, topic, path, interval))
235+
match interval {
236+
Some(interval) if !interval.is_zero() => FlowInput::PollFile {
237+
topic,
238+
path,
239+
interval,
240+
},
241+
_ => FlowInput::StreamFile { topic, path },
242+
}
239243
}
240244
InputConfig::Process {
241245
topic,
242246
command,
243247
interval,
244248
} => {
245249
let topic = topic.unwrap_or_else(|| command.clone());
246-
Box::new(CommandFlowInput::new(flow_name, topic, command, interval))
250+
match interval {
251+
Some(interval) if !interval.is_zero() => FlowInput::PollCommand {
252+
topic,
253+
command,
254+
interval,
255+
},
256+
_ => FlowInput::StreamCommand { topic, command },
257+
}
247258
}
248259
})
249260
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use crate::config::ConfigError;
2+
use crate::flow::DateTime;
3+
use crate::flow::Flow;
4+
use crate::flow::FlowError;
5+
use crate::flow::FlowInput;
6+
use crate::flow::FlowResult;
7+
use crate::flow::Message;
8+
use crate::input_source::CommandFlowInput;
9+
use crate::input_source::FileFlowInput;
10+
use crate::input_source::FlowSource;
11+
use crate::input_source::MqttFlowInput;
12+
use crate::registry::FlowRegistry;
13+
use crate::registry::FlowStore;
14+
use camino::Utf8Path;
15+
use tedge_watch_ext::WatchRequest;
16+
use tokio::time::Instant;
17+
18+
/// A flow connected to a source of messages
19+
pub struct ConnectedFlow {
20+
flow: Flow,
21+
pub(crate) input: Box<dyn FlowSource>,
22+
}
23+
24+
impl AsRef<Flow> for ConnectedFlow {
25+
fn as_ref(&self) -> &Flow {
26+
&self.flow
27+
}
28+
}
29+
30+
impl AsMut<Flow> for ConnectedFlow {
31+
fn as_mut(&mut self) -> &mut Flow {
32+
&mut self.flow
33+
}
34+
}
35+
36+
impl ConnectedFlow {
37+
pub fn new(flow: Flow) -> Self {
38+
let name = flow.name().to_string();
39+
let input = connect(name, flow.input.clone());
40+
ConnectedFlow { flow, input }
41+
}
42+
43+
pub fn name(&self) -> &str {
44+
self.flow.name()
45+
}
46+
47+
pub fn input_topic(&self) -> &str {
48+
self.input.enforced_topic().unwrap_or_default()
49+
}
50+
51+
pub fn watch_request(&self) -> Option<WatchRequest> {
52+
self.input.watch_request()
53+
}
54+
55+
pub async fn on_source_poll(&mut self, timestamp: DateTime, now: Instant) -> FlowResult {
56+
let result = self.on_source_poll_steps(timestamp, now).await;
57+
self.flow.publish(result)
58+
}
59+
60+
async fn on_source_poll_steps(
61+
&mut self,
62+
timestamp: DateTime,
63+
now: Instant,
64+
) -> Result<Vec<Message>, FlowError> {
65+
let source = &mut self.input;
66+
if !source.is_ready(now) {
67+
return Ok(vec![]);
68+
};
69+
70+
let messages = source.poll(timestamp).await?;
71+
source.update_after_poll(now);
72+
Ok(messages)
73+
}
74+
75+
pub fn on_error(&self, error: FlowError) -> FlowResult {
76+
self.flow.publish(Err(error))
77+
}
78+
}
79+
80+
fn connect(flow_name: String, input: FlowInput) -> Box<dyn FlowSource> {
81+
match input {
82+
FlowInput::Mqtt { topics } => Box::new(MqttFlowInput { topics }),
83+
FlowInput::PollFile {
84+
topic,
85+
path,
86+
interval,
87+
} => Box::new(FileFlowInput::new(flow_name, topic, path, Some(interval))),
88+
FlowInput::PollCommand {
89+
topic,
90+
command,
91+
interval,
92+
} => Box::new(CommandFlowInput::new(
93+
flow_name,
94+
topic,
95+
command,
96+
Some(interval),
97+
)),
98+
FlowInput::StreamFile { topic, path } => {
99+
Box::new(FileFlowInput::new(flow_name, topic, path, None))
100+
}
101+
FlowInput::StreamCommand { topic, command } => {
102+
Box::new(CommandFlowInput::new(flow_name, topic, command, None))
103+
}
104+
}
105+
}
106+
107+
pub struct ConnectedFlowRegistry {
108+
flows: FlowStore<ConnectedFlow>,
109+
}
110+
111+
impl ConnectedFlowRegistry {
112+
pub fn new(config_dir: impl AsRef<Utf8Path>) -> Self {
113+
ConnectedFlowRegistry {
114+
flows: FlowStore::new(config_dir),
115+
}
116+
}
117+
}
118+
119+
#[async_trait::async_trait]
120+
impl FlowRegistry for ConnectedFlowRegistry {
121+
type Flow = ConnectedFlow;
122+
123+
fn compile(flow: Flow) -> Result<Self::Flow, ConfigError> {
124+
Ok(ConnectedFlow::new(flow))
125+
}
126+
127+
fn store(&self) -> &FlowStore<Self::Flow> {
128+
&self.flows
129+
}
130+
131+
fn store_mut(&mut self) -> &mut FlowStore<Self::Flow> {
132+
&mut self.flows
133+
}
134+
135+
fn deadlines(&self) -> impl Iterator<Item = Instant> + '_ {
136+
let script_deadlines = self
137+
.flows
138+
.flows()
139+
.flat_map(|flow| &flow.as_ref().steps)
140+
.filter_map(|step| step.script.next_execution);
141+
142+
let source_deadlines = self
143+
.flows
144+
.flows()
145+
.filter_map(|flow| flow.input.next_deadline());
146+
147+
script_deadlines.chain(source_deadlines)
148+
}
149+
}

0 commit comments

Comments
 (0)