Skip to content

Commit 633c167

Browse files
committed
Move db draining logic out of actor in tedge-flows
1 parent 593725e commit 633c167

File tree

7 files changed

+350
-119
lines changed

7 files changed

+350
-119
lines changed

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,7 @@ impl Actor for FlowsMapper {
4444

4545
tokio::select! {
4646
_ = deadline_future => {
47-
let drained_messages = self.drain_db().await?;
48-
self.on_messages(MessageSource::MeaDB, drained_messages).await?;
49-
47+
self.poll_ready_sources().await?;
5048
self.on_interval().await?;
5149
}
5250
message = self.messages.recv() => {
@@ -132,17 +130,6 @@ impl FlowsMapper {
132130
Ok(())
133131
}
134132

135-
async fn on_messages(
136-
&mut self,
137-
source: MessageSource,
138-
messages: Vec<(DateTime, Message)>,
139-
) -> Result<(), RuntimeError> {
140-
for (timestamp, message) in messages {
141-
self.on_message(source, timestamp, message).await?
142-
}
143-
Ok(())
144-
}
145-
146133
async fn on_interval(&mut self) -> Result<(), RuntimeError> {
147134
let now = Instant::now();
148135
let timestamp = DateTime::now();
@@ -196,6 +183,8 @@ impl FlowsMapper {
196183
if let Err(err) = self
197184
.processor
198185
.database
186+
.lock()
187+
.await
199188
.store_many(output_series, messages)
200189
.await
201190
{
@@ -207,22 +196,47 @@ impl FlowsMapper {
207196
Ok(())
208197
}
209198

210-
async fn drain_db(&mut self) -> Result<Vec<(DateTime, Message)>, RuntimeError> {
199+
async fn poll_ready_sources(&mut self) -> Result<(), RuntimeError> {
211200
let timestamp = DateTime::now();
212-
let mut messages = vec![];
213-
for (flow_id, flow_messages) in self.processor.drain_db(timestamp).await {
214-
match flow_messages {
215-
Ok(flow_messages) => {
216-
for (t, m) in flow_messages.iter() {
217-
info!(target: "flows", "drained: @{}.{} [{}]", t.seconds, t.nanoseconds, m.topic);
201+
202+
// Collect flow IDs with ready sources
203+
let ready_flows: Vec<String> = self
204+
.processor
205+
.flows
206+
.iter()
207+
.filter_map(|(flow_id, flow)| {
208+
flow.input_source
209+
.as_ref()
210+
.filter(|source| source.is_ready(timestamp))
211+
.map(|_| flow_id.clone())
212+
})
213+
.collect();
214+
215+
// Poll each ready source and process messages
216+
for flow_id in ready_flows {
217+
if let Some(flow) = self.processor.flows.get_mut(&flow_id) {
218+
if let Some(source) = &mut flow.input_source {
219+
match source.poll(timestamp).await {
220+
Ok(messages) => {
221+
for (t, m) in messages.iter() {
222+
info!(target: "flows", "drained: @{}.{} [{}]", t.seconds, t.nanoseconds, m.topic);
223+
}
224+
source.update_after_poll(timestamp);
225+
226+
// Process the messages through the flow
227+
for (msg_timestamp, message) in messages {
228+
self.on_message(MessageSource::MeaDB, msg_timestamp, message)
229+
.await?;
230+
}
231+
}
232+
Err(err) => {
233+
error!(target: "flows", "{flow_id}: Failed to poll source: {err}");
234+
}
218235
}
219-
messages.extend(flow_messages);
220-
}
221-
Err(err) => {
222-
error!(target: "flows", "{flow_id}: {err}");
223236
}
224237
}
225238
}
226-
Ok(messages)
239+
240+
Ok(())
227241
}
228242
}

crates/extensions/tedge_flows/src/config.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use crate::database::MeaDb;
12
use crate::flow::Flow;
23
use crate::flow::FlowInput;
34
use crate::flow::FlowOutput;
45
use crate::flow::FlowStep;
6+
use crate::input_source::DatabaseSource;
57
use crate::js_runtime::JsRuntime;
68
use crate::js_script::JsScript;
79
use crate::LoadError;
@@ -10,8 +12,10 @@ use camino::Utf8PathBuf;
1012
use serde::Deserialize;
1113
use serde_json::Value;
1214
use std::fmt::Debug;
15+
use std::sync::Arc;
1316
use std::time::Duration;
1417
use tedge_mqtt_ext::TopicFilter;
18+
use tokio::sync::Mutex;
1519

1620
#[derive(Deserialize)]
1721
pub struct FlowConfig {
@@ -110,8 +114,25 @@ impl FlowConfig {
110114
js_runtime: &mut JsRuntime,
111115
config_dir: &Utf8Path,
112116
source: Utf8PathBuf,
117+
database: Arc<Mutex<Box<dyn MeaDb>>>,
113118
) -> Result<Flow, ConfigError> {
114119
let input = self.input.try_into()?;
120+
121+
// Create input source for MeaDB inputs
122+
let input_source = match &input {
123+
FlowInput::MeaDB {
124+
series,
125+
frequency,
126+
max_age,
127+
} => Some(Box::new(DatabaseSource::new(
128+
database.clone(),
129+
series.clone(),
130+
*frequency,
131+
*max_age,
132+
)) as Box<dyn crate::input_source::InputSource>),
133+
FlowInput::Mqtt { .. } => None,
134+
};
135+
115136
let mut steps = vec![];
116137
for (i, step) in self.steps.into_iter().enumerate() {
117138
let mut step = step.compile(config_dir, i, &source).await?;
@@ -122,15 +143,13 @@ impl FlowConfig {
122143
steps.push(step);
123144
}
124145
let output = self.output.try_into()?;
125-
let mut flow = Flow {
146+
let flow = Flow {
126147
input,
148+
input_source,
127149
steps,
128150
source,
129151
output,
130-
next_drain: None,
131-
last_drain: None,
132152
};
133-
flow.init_next_drain();
134153
Ok(flow)
135154
}
136155
}

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 4 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::ops::Sub;
22
use std::time::Duration;
33

4+
use crate::input_source::InputSource;
45
use crate::js_runtime::JsRuntime;
56
use crate::js_script::JsScript;
67
use crate::stats::Counter;
@@ -16,24 +17,20 @@ use tokio::time::Instant;
1617
use tracing::warn;
1718

1819
/// A chain of transformation of MQTT messages
19-
#[derive(Debug)]
2020
pub struct Flow {
2121
/// The source topics
2222
pub input: FlowInput,
2323

24+
/// Input source for polling (e.g., database drains)
25+
pub input_source: Option<Box<dyn InputSource>>,
26+
2427
/// Transformation steps to apply in order to the messages
2528
pub steps: Vec<FlowStep>,
2629

2730
pub source: Utf8PathBuf,
2831

2932
/// Target of the transformed messages
3033
pub output: FlowOutput,
31-
32-
/// Next time to drain database for MeaDB inputs (for deadline-based wakeup)
33-
pub next_drain: Option<tokio::time::Instant>,
34-
35-
/// Last time database was drained (for frequency checking)
36-
pub last_drain: Option<DateTime>,
3734
}
3835

3936
/// A message transformation step
@@ -104,45 +101,6 @@ impl Flow {
104101
}
105102
}
106103

107-
pub fn init_next_drain(&mut self) {
108-
if let FlowInput::MeaDB { frequency, .. } = &self.input {
109-
if !frequency.is_zero() {
110-
self.next_drain = Some(tokio::time::Instant::now() + *frequency);
111-
}
112-
}
113-
}
114-
115-
pub fn should_drain_at(&mut self, timestamp: DateTime) -> bool {
116-
if let FlowInput::MeaDB { frequency, .. } = &self.input {
117-
if frequency.is_zero() {
118-
return false;
119-
}
120-
121-
// Check if enough time has passed since last drain
122-
match self.last_drain {
123-
Some(last_drain) => {
124-
let elapsed_secs = timestamp.seconds.saturating_sub(last_drain.seconds);
125-
let frequency_secs = frequency.as_secs();
126-
if elapsed_secs >= frequency_secs {
127-
self.last_drain = Some(timestamp);
128-
// Also update the deadline for the actor loop
129-
self.next_drain = Some(tokio::time::Instant::now() + *frequency);
130-
true
131-
} else {
132-
false
133-
}
134-
}
135-
None => {
136-
// First drain
137-
self.last_drain = Some(timestamp);
138-
self.next_drain = Some(tokio::time::Instant::now() + *frequency);
139-
true
140-
}
141-
}
142-
} else {
143-
false
144-
}
145-
}
146104

147105
pub fn topics(&self) -> TopicFilter {
148106
let mut topics = self.input.topics();

0 commit comments

Comments
 (0)