Skip to content

Commit 593725e

Browse files
committed
Replace polling with sleeps in handling intervals in tedge flows
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
1 parent 2db635d commit 593725e

File tree

2 files changed

+16
-20
lines changed

2 files changed

+16
-20
lines changed

crates/extensions/tedge_flows/src/runtime.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ impl MessageProcessor {
168168
script_deadlines.chain(drain_deadlines)
169169
}
170170

171-
/// Get the next deadline for interval execution across all scripts
172-
/// Returns None if no scripts have intervals configured
171+
/// Get the next deadline for interval execution across all scripts and database drains
172+
/// Returns None if no scripts have intervals configured and no database drains are scheduled
173173
pub fn next_interval_deadline(&self) -> Option<tokio::time::Instant> {
174174
self.deadlines().min()
175175
}
@@ -239,18 +239,16 @@ impl MessageProcessor {
239239
if flow.should_drain_at(timestamp) {
240240
if let FlowInput::MeaDB {
241241
series: input_series,
242-
frequency: input_frequency,
243242
max_age: input_span,
243+
..
244244
} = &flow.input
245245
{
246-
if timestamp.tick_now(*input_frequency) {
247246
let cutoff_time = timestamp - *input_span;
248-
let drained_messages = self
249-
.database
250-
.drain_older_than(cutoff_time, input_series)
251-
.await;
252-
out_messages.push((flow_id.to_owned(), drained_messages));
253-
}
247+
let drained_messages = self
248+
.database
249+
.drain_older_than(cutoff_time, input_series)
250+
.await;
251+
out_messages.push((flow_id.to_owned(), drained_messages));
254252
}
255253
}
256254
}

crates/extensions/tedge_flows/tests/mea_db_integration.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -679,8 +679,8 @@ async fn real_actor_database_to_mqtt_integration() {
679679
// Config: Database input → MQTT output with very short timings for fast test
680680
let drain_config = r#"
681681
input.db.series = "sensor-data"
682-
input.db.frequency = "1s"
683-
input.db.max_age = "2s"
682+
input.db.frequency = "1ms"
683+
input.db.max_age = "2ms"
684684
685685
steps = [
686686
{ script = "identity.js" }
@@ -720,10 +720,10 @@ async fn real_actor_database_to_mqtt_integration() {
720720

721721
tokio::spawn(flows_actor.run());
722722

723-
// Wait for at least 3 seconds for the actor's interval timer to trigger draining
724-
// At T=0, T=1s, T=2s the message won't be drained (not old enough)
725-
// At T=3s the message will be drained (3s old > 2s max_age)
726-
sleep(Duration::from_millis(3200)).await;
723+
// Wait for at least 3 milliseconds for the actor's interval timer to trigger draining
724+
// At T=0, T=1ms, T=2ms the message won't be drained (not old enough)
725+
// At T=3ms the message will be drained (3ms old > 2ms max_age)
726+
sleep(Duration::from_millis(3)).await;
727727

728728
// Check for MQTT output messages from the actor
729729
let mut received_messages = vec![];
@@ -806,8 +806,7 @@ async fn flow_that_outputs_multiple_messages_persists_all_to_database() {
806806
return messages;
807807
}
808808
"#;
809-
std::fs::write(config_dir.join("splitter.js"), js_content)
810-
.expect("Failed to write JS file");
809+
std::fs::write(config_dir.join("splitter.js"), js_content).expect("Failed to write JS file");
811810

812811
// Create flow config that outputs multiple messages to database
813812
let config = r#"
@@ -819,8 +818,7 @@ async fn flow_that_outputs_multiple_messages_persists_all_to_database() {
819818
820819
output.db.series = "split-sensor-data"
821820
"#;
822-
std::fs::write(config_dir.join("split_flow.toml"), config)
823-
.expect("Failed to write config");
821+
std::fs::write(config_dir.join("split_flow.toml"), config).expect("Failed to write config");
824822

825823
// Build FlowsMapper actor with mock MQTT
826824
let mut flows_builder = FlowsMapperBuilder::try_new(Utf8Path::from_path(config_dir).unwrap())

0 commit comments

Comments
 (0)