Skip to content

Commit 9eb21ce

Browse files
authored
fix: scheduled pipeline align query time instead of next_run_at (openobserve#7007)
1 parent 36c71ee commit 9eb21ce

File tree

2 files changed

+30
-69
lines changed

2 files changed

+30
-69
lines changed

src/service/alerts/derived_streams.rs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ pub async fn save(
142142
};
143143
}
144144

145+
let next_run_at = chrono::Utc::now().timestamp_micros();
145146
// Save the trigger to db
146147
match db::scheduler::get(
147148
&derived_stream.org_id,
@@ -151,17 +152,12 @@ pub async fn save(
151152
.await
152153
{
153154
Ok(mut existing_trigger) => {
154-
let next_run_at = get_next_run_at(
155-
derived_stream.delay.unwrap_or_default(),
156-
Some(existing_trigger.next_run_at),
157-
)?;
158155
existing_trigger.next_run_at = next_run_at;
159156
db::scheduler::update_trigger(existing_trigger)
160157
.await
161158
.map_err(|_| anyhow::anyhow!("Trigger already exists, but failed to update"))
162159
}
163160
Err(_) => {
164-
let next_run_at = get_next_run_at(derived_stream.delay.unwrap_or_default(), None)?;
165161
let trigger = db::scheduler::Trigger {
166162
org: derived_stream.org_id.to_string(),
167163
module: db::scheduler::TriggerModule::DerivedStream,
@@ -226,26 +222,3 @@ impl DerivedStreamExt for DerivedStream {
226222
.await
227223
}
228224
}
229-
230-
fn get_next_run_at(
231-
delay_in_mins: i32,
232-
previous_next_run_at: Option<i64>,
233-
) -> Result<i64, anyhow::Error> {
234-
// validate & parse delay value
235-
if delay_in_mins < 0 {
236-
return Err(anyhow::anyhow!(
237-
"Invalid delay value. Value must be non-negative"
238-
));
239-
}
240-
241-
let delay = chrono::Duration::minutes(delay_in_mins as _);
242-
let supposed_next_run_at =
243-
previous_next_run_at.map_or(Ok(chrono::Utc::now()), |prev_next_run_at| {
244-
chrono::DateTime::<chrono::Utc>::from_timestamp_micros(prev_next_run_at)
245-
.ok_or(anyhow::anyhow!("Invalid previous next run at timestamp"))
246-
})?;
247-
Ok(supposed_next_run_at
248-
.checked_add_signed(delay)
249-
.ok_or(anyhow::anyhow!("DateTime arithmetic overflow"))?
250-
.timestamp_micros())
251-
}

src/service/alerts/scheduler/handlers.rs

Lines changed: 29 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,7 +1014,7 @@ async fn handle_derived_stream_triggers(
10141014
chrono::Duration::try_minutes(delay_in_mins as _).and_then(|td| td.num_microseconds())
10151015
})
10161016
.unwrap_or_default();
1017-
let mut supposed_to_be_run_at = trigger.next_run_at - user_defined_delay;
1017+
let supposed_to_be_run_at = trigger.next_run_at - user_defined_delay;
10181018
let period_num_microseconds = Duration::try_minutes(derived_stream.trigger_condition.period)
10191019
.unwrap()
10201020
.num_microseconds()
@@ -1023,24 +1023,6 @@ async fn handle_derived_stream_triggers(
10231023
let (mut start, mut end) = if derived_stream.start_at.is_some() && trigger.data.is_empty() {
10241024
(derived_stream.start_at, supposed_to_be_run_at)
10251025
} else if let Some(t0) = start_time {
1026-
// Don't use only the period_num_microseconds, because, then the delay is lets say 10 secs
1027-
// The following code will make a separate query to cover the delay period of 10 secs which
1028-
// is unnecessary. Hence, we need to check how big the delay is.
1029-
// Note: For pipeline, period and frequency both have the same value.
1030-
1031-
// For derived stream, period is in minutes, so we need to convert it to seconds for
1032-
// align_time
1033-
if derived_stream.trigger_condition.align_time {
1034-
let aligned_curr_time = TriggerCondition::align_time(
1035-
supposed_to_be_run_at,
1036-
derived_stream.tz_offset,
1037-
derived_stream.trigger_condition.period * 60,
1038-
);
1039-
// conditionally modify supposed_to_be_run_at
1040-
if aligned_curr_time > t0 {
1041-
supposed_to_be_run_at = aligned_curr_time;
1042-
}
1043-
}
10441026
// If the delay is equal to or greater than the frequency, we need to ingest data one by
10451027
// one If the delay is less than the frequency, we need to ingest data for
10461028
// the "next run at" period, For example, if the current time is 5:19pm,
@@ -1057,6 +1039,18 @@ async fn handle_derived_stream_triggers(
10571039
} else {
10581040
(None, supposed_to_be_run_at)
10591041
};
1042+
// For derived stream, period is in minutes, so we need to convert it to seconds for align_time
1043+
if derived_stream.trigger_condition.align_time {
1044+
let aligned_curr_time = TriggerCondition::align_time(
1045+
end,
1046+
derived_stream.tz_offset,
1047+
derived_stream.trigger_condition.period * 60,
1048+
);
1049+
// conditionally modify supposed_to_be_run_at
1050+
if start.is_none_or(|t0| t0 < aligned_curr_time) {
1051+
end = aligned_curr_time;
1052+
}
1053+
}
10601054

10611055
// In case the scheduler background job (watch_timeout) updates the trigger retries
10621056
// (not through this handler), we need to skip to the next run at but with the same
@@ -1069,12 +1063,14 @@ async fn handle_derived_stream_triggers(
10691063
new_trigger.module_key
10701064
);
10711065
// Go to the next nun at, but use the same trigger start time
1072-
new_trigger.next_run_at = derived_stream.trigger_condition.get_next_trigger_time(
1073-
false,
1074-
derived_stream.tz_offset,
1075-
false,
1076-
Some(end + user_defined_delay),
1077-
)?;
1066+
new_trigger.next_run_at = derived_stream
1067+
.trigger_condition
1068+
.get_next_trigger_time_non_aligned(
1069+
false,
1070+
derived_stream.tz_offset,
1071+
false,
1072+
Some(trigger.next_run_at),
1073+
)?;
10781074
// Start over next time
10791075
new_trigger.retries = 0;
10801076
db::scheduler::update_trigger(new_trigger).await?;
@@ -1298,11 +1294,6 @@ async fn handle_derived_stream_triggers(
12981294
} else {
12991295
// SUCCESS: move the time range forward by frequency and continue
13001296
start = Some(trigger_results.end_time);
1301-
// There could still be some data to be processed for the current period
1302-
// so we need to move the end time forward by the period length or the
1303-
// remaining time whichever is smaller
1304-
let _end = period_num_microseconds + 1;
1305-
end += _end;
13061297
trigger_data_stream.query_took = trigger_results.query_took;
13071298
}
13081299
} else {
@@ -1316,11 +1307,6 @@ async fn handle_derived_stream_triggers(
13161307

13171308
// move the time range forward by frequency and continue
13181309
start = Some(trigger_results.end_time);
1319-
// There could still be some data to be processed for the current period
1320-
// so we need to move the end time forward by the period length or the remaining
1321-
// time whichever is smaller
1322-
let _end = period_num_microseconds + 1;
1323-
end += _end;
13241310
}
13251311
}
13261312
};
@@ -1345,12 +1331,14 @@ async fn handle_derived_stream_triggers(
13451331
&& new_trigger.retries < max_retries)
13461332
{
13471333
// Go to the next nun at, but use the same trigger start time
1348-
new_trigger.next_run_at = derived_stream.trigger_condition.get_next_trigger_time(
1349-
false,
1350-
derived_stream.tz_offset,
1351-
false,
1352-
Some(end + user_defined_delay),
1353-
)?;
1334+
new_trigger.next_run_at = derived_stream
1335+
.trigger_condition
1336+
.get_next_trigger_time_non_aligned(
1337+
false,
1338+
derived_stream.tz_offset,
1339+
false,
1340+
Some(trigger.next_run_at),
1341+
)?;
13541342

13551343
// If the trigger didn't fail, we need to reset the `retries` count.
13561344
// Only cumulative failures should be used to check with `max_retries`

0 commit comments

Comments
 (0)