Skip to content

Commit 22a4149

Browse files
committed
Replace polling with sleeps in handling intervals in tedge flows
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
1 parent 5d09818 commit 22a4149

File tree

7 files changed

+164
-60
lines changed

7 files changed

+164
-60
lines changed

crates/extensions/tedge_flows/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ async-trait = { workspace = true }
1414
camino = { workspace = true, features = ["serde1"] }
1515
cfg-if = { workspace = true }
1616
fjall = { workspace = true, optional = true }
17+
futures = { workspace = true }
1718
humantime = { workspace = true }
1819
rquickjs = { git = "https://github.com/thin-edge/rquickjs", rev = "4ed04ac9af3de453dd41ff09fdd1837c7ceb1f1c", default-features = false, features = [
1920
# disable bindgen and rely on pre-generated bindings due to problems

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use crate::InputMessage;
77
use crate::OutputMessage;
88
use async_trait::async_trait;
99
use camino::Utf8PathBuf;
10+
use futures::future::Either;
11+
use std::future::pending;
1012
use tedge_actors::Actor;
1113
use tedge_actors::MessageReceiver;
1214
use tedge_actors::RuntimeError;
@@ -16,8 +18,7 @@ use tedge_file_system_ext::FsWatchEvent;
1618
use tedge_mqtt_ext::MqttMessage;
1719
use tedge_mqtt_ext::SubscriptionDiff;
1820
use tedge_mqtt_ext::TopicFilter;
19-
use tokio::time::interval;
20-
use tokio::time::Duration;
21+
use tokio::time::sleep_until;
2122
use tracing::error;
2223
use tracing::info;
2324

@@ -34,11 +35,14 @@ impl Actor for FlowsMapper {
3435
}
3536

3637
async fn run(mut self) -> Result<(), RuntimeError> {
37-
let mut interval = interval(Duration::from_secs(1));
38-
3938
loop {
39+
let deadline_future = match self.processor.next_interval_deadline() {
40+
Some(deadline) => Either::Left(sleep_until(deadline)),
41+
None => Either::Right(pending()),
42+
};
43+
4044
tokio::select! {
41-
_ = interval.tick() => {
45+
_ = deadline_future => {
4246
let drained_messages = self.drain_db().await?;
4347
self.on_messages(MessageSource::MeaDB, drained_messages).await?;
4448

@@ -147,8 +151,7 @@ impl FlowsMapper {
147151
for (flow_id, flow_messages) in self.processor.on_interval(timestamp).await {
148152
match flow_messages {
149153
Ok(messages) => {
150-
self.publish_messages(flow_id.clone(), messages)
151-
.await?;
154+
self.publish_messages(flow_id.clone(), messages).await?;
152155
}
153156
Err(err) => {
154157
error!(target: "flows", "{flow_id}: {err}");
@@ -184,15 +187,18 @@ impl FlowsMapper {
184187
}
185188
}
186189
FlowOutput::MeaDB { output_series } => {
187-
let messages = messages.into_iter().map(|m| (DateTime::now(), m)).collect::<Vec<_>>();
188-
if let Err(err) = self
189-
.processor
190-
.database
191-
.store_many(output_series, messages)
192-
.await
193-
{
194-
error!(target: "flows", "{flow_id}: fail to persist message: {err}");
195-
}
190+
let messages = messages
191+
.into_iter()
192+
.map(|m| (DateTime::now(), m))
193+
.collect::<Vec<_>>();
194+
if let Err(err) = self
195+
.processor
196+
.database
197+
.store_many(output_series, messages)
198+
.await
199+
{
200+
error!(target: "flows", "{flow_id}: fail to persist message: {err}");
201+
}
196202
}
197203
}
198204
}

crates/extensions/tedge_flows/src/config.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,20 @@ impl FlowConfig {
118118
js_runtime.load_script(&mut step.script).await?;
119119
step.check(&source);
120120
step.fix();
121+
step.script.init_next_execution();
121122
steps.push(step);
122123
}
123124
let output = self.output.try_into()?;
124-
Ok(Flow {
125+
let mut flow = Flow {
125126
input,
126127
steps,
127128
source,
128129
output,
129-
})
130+
next_drain: None,
131+
last_drain: None,
132+
};
133+
flow.init_next_drain();
134+
Ok(flow)
130135
}
131136
}
132137

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ pub struct Flow {
2626

2727
/// Target of the transformed messages
2828
pub output: FlowOutput,
29+
30+
/// Next time to drain database for MeaDB inputs (for deadline-based wakeup)
31+
pub next_drain: Option<tokio::time::Instant>,
32+
33+
/// Last time database was drained (for frequency checking)
34+
pub last_drain: Option<DateTime>,
2935
}
3036

3137
/// A message transformation step
@@ -94,6 +100,46 @@ impl Flow {
94100
}
95101
}
96102

103+
pub fn init_next_drain(&mut self) {
104+
if let FlowInput::MeaDB { frequency, .. } = &self.input {
105+
if !frequency.is_zero() {
106+
self.next_drain = Some(tokio::time::Instant::now() + *frequency);
107+
}
108+
}
109+
}
110+
111+
pub fn should_drain_at(&mut self, timestamp: DateTime) -> bool {
112+
if let FlowInput::MeaDB { frequency, .. } = &self.input {
113+
if frequency.is_zero() {
114+
return false;
115+
}
116+
117+
// Check if enough time has passed since last drain
118+
match self.last_drain {
119+
Some(last_drain) => {
120+
let elapsed_secs = timestamp.seconds.saturating_sub(last_drain.seconds);
121+
let frequency_secs = frequency.as_secs();
122+
if elapsed_secs >= frequency_secs {
123+
self.last_drain = Some(timestamp);
124+
// Also update the deadline for the actor loop
125+
self.next_drain = Some(tokio::time::Instant::now() + *frequency);
126+
true
127+
} else {
128+
false
129+
}
130+
}
131+
None => {
132+
// First drain
133+
self.last_drain = Some(timestamp);
134+
self.next_drain = Some(tokio::time::Instant::now() + *frequency);
135+
true
136+
}
137+
}
138+
} else {
139+
false
140+
}
141+
}
142+
97143
pub fn topics(&self) -> TopicFilter {
98144
let mut topics = self.input.topics();
99145
for step in self.steps.iter() {
@@ -159,7 +205,7 @@ impl Flow {
159205
) -> Result<Vec<Message>, FlowError> {
160206
let stated_at = stats.flow_on_interval_start(self.source.as_str());
161207
let mut messages = vec![];
162-
for step in self.steps.iter() {
208+
for step in self.steps.iter_mut() {
163209
let js = step.script.source();
164210
// Process first the messages triggered upstream by the tick
165211
let mut transformed_messages = vec![];
@@ -175,7 +221,8 @@ impl Flow {
175221
transformed_messages.extend(step_output?);
176222
}
177223

178-
// Only then process the tick
224+
// Only then process the tick if it's time to execute
225+
if step.script.should_execute_interval() {
179226
let step_started_at = stats.flow_step_start(&js, "onInterval");
180227
let tick_output = step.script.on_interval(js_runtime, timestamp).await;
181228
match &tick_output {
@@ -185,6 +232,7 @@ impl Flow {
185232
Err(_) => stats.flow_step_failed(&js, "onInterval"),
186233
}
187234
transformed_messages.extend(tick_output?);
235+
}
188236

189237
// Iterate with all the messages collected at this step
190238
messages = transformed_messages;

crates/extensions/tedge_flows/src/js_script.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::js_runtime::JsRuntime;
66
use crate::js_value::JsonValue;
77
use camino::Utf8Path;
88
use camino::Utf8PathBuf;
9+
use tokio::time::Instant;
910
use tracing::debug;
1011

1112
#[derive(Clone)]
@@ -14,6 +15,7 @@ pub struct JsScript {
1415
pub path: Utf8PathBuf,
1516
pub config: JsonValue,
1617
pub interval: std::time::Duration,
18+
pub next_execution: Option<Instant>,
1719
pub no_js_on_message_fun: bool,
1820
pub no_js_on_config_update_fun: bool,
1921
pub no_js_on_interval_fun: bool,
@@ -27,6 +29,7 @@ impl JsScript {
2729
path,
2830
config: JsonValue::default(),
2931
interval: std::time::Duration::ZERO,
32+
next_execution: None,
3033
no_js_on_message_fun: true,
3134
no_js_on_config_update_fun: true,
3235
no_js_on_interval_fun: true,
@@ -116,24 +119,51 @@ impl JsScript {
116119
Ok(())
117120
}
118121

122+
/// Initialize the next execution time for this script's interval
123+
/// Should be called after the script is loaded and interval is set
124+
pub fn init_next_execution(&mut self) {
125+
if !self.no_js_on_interval_fun && !self.interval.is_zero() {
126+
self.next_execution = Some(Instant::now() + self.interval);
127+
}
128+
}
129+
130+
/// Check if this script should execute its interval function now
131+
/// Returns true and updates next_execution if it's time to execute
132+
pub fn should_execute_interval(&mut self) -> bool {
133+
if self.no_js_on_interval_fun || self.interval.is_zero() {
134+
return false;
135+
}
136+
137+
let now = Instant::now();
138+
match self.next_execution {
139+
Some(deadline) if now >= deadline => {
140+
// Time to execute - schedule next execution
141+
self.next_execution = Some(now + self.interval);
142+
true
143+
}
144+
None => {
145+
// First execution - initialize and execute
146+
self.next_execution = Some(now + self.interval);
147+
true
148+
}
149+
_ => false,
150+
}
151+
}
152+
119153
/// Trigger the onInterval function of the JS module
120154
///
121155
/// The "onInterval" function is passed 2 arguments
122156
/// - the current timestamp
123157
/// - the current flow step config
124158
///
125159
/// Return zero, one or more messages
160+
///
161+
/// Note: Caller should check should_execute_interval() before calling this
126162
pub async fn on_interval(
127163
&self,
128164
js: &JsRuntime,
129165
timestamp: DateTime,
130166
) -> Result<Vec<Message>, FlowError> {
131-
if self.no_js_on_interval_fun {
132-
return Ok(vec![]);
133-
}
134-
if !timestamp.tick_now(self.interval) {
135-
return Ok(vec![]);
136-
}
137167
debug!(target: "flows", "{}: onInterval({timestamp:?})", self.module_name());
138168
let input = vec![timestamp.into(), self.config.clone()];
139169
js.call_function(&self.module_name(), "onInterval", input)

crates/extensions/tedge_flows/src/runtime.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,20 @@ impl MessageProcessor {
155155
topics
156156
}
157157

158+
/// Get the next deadline for interval execution across all scripts and database drains
159+
/// Returns None if no scripts have intervals configured and no database drains are scheduled
160+
pub fn next_interval_deadline(&self) -> Option<tokio::time::Instant> {
161+
let script_deadlines = self
162+
.flows
163+
.values()
164+
.flat_map(|flow| &flow.steps)
165+
.filter_map(|step| step.script.next_execution);
166+
167+
let drain_deadlines = self.flows.values().filter_map(|flow| flow.next_drain);
168+
169+
script_deadlines.chain(drain_deadlines).min()
170+
}
171+
158172
pub async fn on_message(
159173
&mut self,
160174
source: MessageSource,
@@ -206,20 +220,19 @@ impl MessageProcessor {
206220
timestamp: DateTime,
207221
) -> Vec<(String, Result<Vec<(DateTime, Message)>, DatabaseError>)> {
208222
let mut out_messages = vec![];
209-
for (flow_id, flow) in self.flows.iter() {
210-
if let FlowInput::MeaDB {
211-
series: input_series,
212-
frequency: input_frequency,
213-
max_age: input_span,
214-
} = &flow.input
215-
{
216-
if timestamp.tick_now(*input_frequency) {
223+
for (flow_id, flow) in self.flows.iter_mut() {
224+
if flow.should_drain_at(timestamp) {
225+
if let FlowInput::MeaDB {
226+
series: input_series,
227+
max_age: input_span,
228+
..
229+
} = &flow.input
230+
{
217231
let cutoff_time = timestamp - *input_span;
218232
let drained_messages = self
219233
.database
220234
.drain_older_than(cutoff_time, input_series)
221-
.await
222-
.map_err(DatabaseError::from);
235+
.await;
223236
out_messages.push((flow_id.to_owned(), drained_messages));
224237
}
225238
}
@@ -241,6 +254,7 @@ impl MessageProcessor {
241254
if step.script.path() == path {
242255
match self.js_runtime.load_script(&mut step.script).await {
243256
Ok(()) => {
257+
step.script.init_next_execution();
244258
info!(target: "flows", "Reloaded flow script {path}");
245259
}
246260
Err(e) => {

0 commit comments

Comments
 (0)