Skip to content

Commit a6815a8

Browse files
committed
Separate streaming sources from polling ones
Signed-off-by: Didier Wenzek <[email protected]>
1 parent 90b633f commit a6815a8

File tree

4 files changed

+136
-147
lines changed

4 files changed

+136
-147
lines changed

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl FlowsMapper {
159159
continue;
160160
};
161161
if !self.watched_commands.contains(topic) {
162-
info!(target: "flows", "Adding input: {}", flow.input);
162+
info!(target: "flows", "Adding input: {}", flow.as_ref().input);
163163
watch_requests.push(request);
164164
}
165165
self.watched_commands.remove(topic);
@@ -316,7 +316,7 @@ impl FlowsMapper {
316316
) -> Result<(), RuntimeError> {
317317
let Some((info, flow_error)) = self.processor.registry.flow(flow_name).map(|flow| {
318318
(
319-
format!("Reconnecting input: {flow_name}: {}", flow.input),
319+
format!("Reconnecting input: {flow_name}: {}", flow.as_ref().input),
320320
flow.on_error(error),
321321
)
322322
}) else {
@@ -345,7 +345,7 @@ impl FlowsMapper {
345345
async fn on_input_eos(&mut self, flow_name: &str) -> Result<(), RuntimeError> {
346346
if let Some(flow) = self.processor.registry.flow(flow_name) {
347347
if let Some(request) = flow.watch_request() {
348-
info!(target: "flows", "Reconnecting input: {flow_name}: {}", flow.input);
348+
info!(target: "flows", "Reconnecting input: {flow_name}: {}", flow.as_ref().input);
349349
self.watch_request_sender.send(request).await?
350350
};
351351
}

crates/extensions/tedge_flows/src/connected_flow.rs

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ use crate::flow::FlowError;
55
use crate::flow::FlowInput;
66
use crate::flow::FlowResult;
77
use crate::flow::Message;
8-
use crate::input_source::CommandFlowInput;
9-
use crate::input_source::FileFlowInput;
10-
use crate::input_source::FlowSource;
11-
use crate::input_source::MqttFlowInput;
8+
use crate::input_source::CommandPollingSource;
9+
use crate::input_source::CommandStreamingSource;
10+
use crate::input_source::FilePollingSource;
11+
use crate::input_source::FileStreamingSource;
12+
use crate::input_source::PollingSource;
13+
use crate::input_source::StreamingSource;
1214
use crate::registry::FlowRegistry;
1315
use crate::registry::FlowStore;
1416
use camino::Utf8Path;
@@ -18,7 +20,8 @@ use tokio::time::Instant;
1820
/// A flow connected to a source of messages
1921
pub struct ConnectedFlow {
2022
flow: Flow,
21-
pub(crate) input: Box<dyn FlowSource>,
23+
streaming_source: Option<Box<dyn StreamingSource>>,
24+
polling_source: Option<Box<dyn PollingSource>>,
2225
}
2326

2427
impl AsRef<Flow> for ConnectedFlow {
@@ -35,21 +38,31 @@ impl AsMut<Flow> for ConnectedFlow {
3538

3639
impl ConnectedFlow {
3740
pub fn new(flow: Flow) -> Self {
38-
let name = flow.name().to_string();
39-
let input = connect(name, flow.input.clone());
40-
ConnectedFlow { flow, input }
41+
let streaming_source = streaming_source(flow.name().to_owned(), flow.input.clone());
42+
let polling_source = polling_source(flow.input.clone());
43+
ConnectedFlow {
44+
flow,
45+
streaming_source,
46+
polling_source,
47+
}
4148
}
4249

4350
pub fn name(&self) -> &str {
4451
self.flow.name()
4552
}
4653

4754
pub fn input_topic(&self) -> &str {
48-
self.input.enforced_topic().unwrap_or_default()
55+
self.flow.input.enforced_topic().unwrap_or_default()
4956
}
5057

5158
pub fn watch_request(&self) -> Option<WatchRequest> {
52-
self.input.watch_request()
59+
self.streaming_source
60+
.as_ref()
61+
.and_then(|source| source.watch_request())
62+
}
63+
64+
pub fn next_deadline(&self) -> Option<Instant> {
65+
self.polling_source.as_ref().and_then(|p| p.next_deadline())
5366
}
5467

5568
pub async fn on_source_poll(&mut self, timestamp: DateTime, now: Instant) -> FlowResult {
@@ -62,7 +75,9 @@ impl ConnectedFlow {
6275
timestamp: DateTime,
6376
now: Instant,
6477
) -> Result<Vec<Message>, FlowError> {
65-
let source = &mut self.input;
78+
let Some(source) = &mut self.polling_source.as_mut() else {
79+
return Ok(vec![]);
80+
};
6681
if !source.is_ready(now) {
6782
return Ok(vec![]);
6883
};
@@ -77,30 +92,37 @@ impl ConnectedFlow {
7792
}
7893
}
7994

80-
fn connect(flow_name: String, input: FlowInput) -> Box<dyn FlowSource> {
95+
fn streaming_source(flow_name: String, input: FlowInput) -> Option<Box<dyn StreamingSource>> {
96+
match input {
97+
FlowInput::StreamFile { topic: _, path } => {
98+
Some(Box::new(FileStreamingSource::new(flow_name, path)))
99+
}
100+
101+
FlowInput::StreamCommand { topic: _, command } => {
102+
Some(Box::new(CommandStreamingSource::new(flow_name, command)))
103+
}
104+
105+
_ => None,
106+
}
107+
}
108+
109+
fn polling_source(input: FlowInput) -> Option<Box<dyn PollingSource>> {
81110
match input {
82-
FlowInput::Mqtt { topics } => Box::new(MqttFlowInput { topics }),
83111
FlowInput::PollFile {
84112
topic,
85113
path,
86114
interval,
87-
} => Box::new(FileFlowInput::new(flow_name, topic, path, Some(interval))),
115+
} => Some(Box::new(FilePollingSource::new(topic, path, interval))),
116+
88117
FlowInput::PollCommand {
89118
topic,
90119
command,
91120
interval,
92-
} => Box::new(CommandFlowInput::new(
93-
flow_name,
94-
topic,
95-
command,
96-
Some(interval),
97-
)),
98-
FlowInput::StreamFile { topic, path } => {
99-
Box::new(FileFlowInput::new(flow_name, topic, path, None))
100-
}
101-
FlowInput::StreamCommand { topic, command } => {
102-
Box::new(CommandFlowInput::new(flow_name, topic, command, None))
103-
}
121+
} => Some(Box::new(CommandPollingSource::new(
122+
topic, command, interval,
123+
))),
124+
125+
_ => None,
104126
}
105127
}
106128

@@ -139,10 +161,7 @@ impl FlowRegistry for ConnectedFlowRegistry {
139161
.flat_map(|flow| &flow.as_ref().steps)
140162
.filter_map(|step| step.script.next_execution);
141163

142-
let source_deadlines = self
143-
.flows
144-
.flows()
145-
.filter_map(|flow| flow.input.next_deadline());
164+
let source_deadlines = self.flows.flows().filter_map(|flow| flow.next_deadline());
146165

147166
script_deadlines.chain(source_deadlines)
148167
}

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use camino::Utf8Path;
77
use camino::Utf8PathBuf;
88
use serde_json::json;
99
use serde_json::Value;
10+
use std::fmt::Display;
11+
use std::fmt::Formatter;
1012
use std::time::Duration;
1113
use tedge_mqtt_ext::MqttMessage;
1214
use tedge_mqtt_ext::Topic;
@@ -323,6 +325,28 @@ impl Flow {
323325
}
324326
}
325327

328+
impl Display for FlowInput {
329+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
330+
match self {
331+
FlowInput::Mqtt { topics } => {
332+
write!(f, "MQTT topics: {:?}", topics)
333+
}
334+
FlowInput::PollFile { path, .. } => {
335+
write!(f, "Polling file: {path}")
336+
}
337+
FlowInput::PollCommand { command, .. } => {
338+
write!(f, "Polling command: {command}")
339+
}
340+
FlowInput::StreamFile { path, .. } => {
341+
write!(f, "Streaming file: {path}")
342+
}
343+
FlowInput::StreamCommand { command, .. } => {
344+
write!(f, "Streaming command: {command}")
345+
}
346+
}
347+
}
348+
}
349+
326350
impl FlowInput {
327351
pub fn topics(&self) -> TopicFilter {
328352
match self {
@@ -331,6 +355,16 @@ impl FlowInput {
331355
}
332356
}
333357

358+
pub fn enforced_topic(&self) -> Option<&str> {
359+
match self {
360+
FlowInput::Mqtt { .. } => None,
361+
FlowInput::PollFile { topic, .. }
362+
| FlowInput::PollCommand { topic, .. }
363+
| FlowInput::StreamFile { topic, .. }
364+
| FlowInput::StreamCommand { topic, .. } => Some(topic),
365+
}
366+
}
367+
334368
pub fn accept_message(&self, message: &Message) -> bool {
335369
match self {
336370
FlowInput::Mqtt { topics } => topics.accept_topic_name(&message.topic),

0 commit comments

Comments
 (0)