Skip to content

Commit 5ca894a

Browse files
authored
fix: pipeline next_run_at with delay (openobserve#6872)
1 parent ba6ba30 commit 5ca894a

File tree

4 files changed

+347
-325
lines changed

4 files changed

+347
-325
lines changed

src/config/src/meta/alerts/mod.rs

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ impl TriggerCondition {
7070
freq_in_secs: bool,
7171
timezone_offset: i32,
7272
apply_silence: bool,
73+
start_from: Option<i64>,
7374
) -> Result<i64, anyhow::Error> {
7475
let frequency = if freq_in_secs {
7576
self.frequency
@@ -84,11 +85,16 @@ impl TriggerCondition {
8485
}
8586
_ => 0,
8687
};
88+
let start_utc = start_from.map_or(Ok(Utc::now()), |from| {
89+
chrono::DateTime::<Utc>::from_timestamp_micros(from).ok_or(anyhow::anyhow!(
90+
"Error converting start_from value to timestamp"
91+
))
92+
})?;
8793
let timezone_offset = FixedOffset::east_opt(timezone_offset * 60).unwrap();
8894
if self.frequency_type == FrequencyType::Cron {
8995
let schedule = Schedule::from_str(&self.cron)?;
9096
if apply_silence {
91-
let silence = Utc::now() + Duration::try_minutes(self.silence).unwrap();
97+
let silence = start_utc + Duration::try_minutes(self.silence).unwrap();
9298
let silence = silence.with_timezone(&timezone_offset);
9399
// Check for the cron timestamp after the silence period
94100
Ok(schedule.after(&silence).next().unwrap().timestamp_micros() + tolerance)
@@ -110,14 +116,14 @@ impl TriggerCondition {
110116
// should use the max of (frequency, silence) as the next_run_at.
111117
// Silence period is in minutes, and the frequency is in seconds.
112118
let delta = std::cmp::max(frequency, self.silence * 60);
113-
Ok(Utc::now().timestamp_micros()
119+
Ok(start_utc.timestamp_micros()
114120
+ Duration::try_seconds(delta)
115121
.unwrap()
116122
.num_microseconds()
117123
.unwrap()
118124
+ tolerance)
119125
} else {
120-
Ok(Utc::now().timestamp_micros()
126+
Ok(start_utc.timestamp_micros()
121127
+ Duration::try_seconds(frequency)
122128
.unwrap()
123129
.num_microseconds()
@@ -183,9 +189,14 @@ impl TriggerCondition {
183189
freq_in_secs: bool,
184190
timezone_offset: i32,
185191
apply_silence: bool,
192+
start_from: Option<i64>,
186193
) -> Result<i64, anyhow::Error> {
187-
let next_run_at =
188-
self.get_next_trigger_time_non_aligned(freq_in_secs, timezone_offset, apply_silence)?;
194+
let next_run_at = self.get_next_trigger_time_non_aligned(
195+
freq_in_secs,
196+
timezone_offset,
197+
apply_silence,
198+
start_from,
199+
)?;
189200
// Cron frequency is handled by the cron library, so we don't need to align it
190201
if self.frequency_type != FrequencyType::Cron {
191202
// `align_time` expects frequency in seconds, so convert if necessary
@@ -205,11 +216,22 @@ impl TriggerCondition {
205216
freq_in_secs: bool,
206217
timezone_offset: i32,
207218
apply_silence: bool,
219+
start_from: Option<i64>,
208220
) -> Result<i64, anyhow::Error> {
209221
if self.align_time {
210-
self.get_aligned_next_trigger_time(freq_in_secs, timezone_offset, apply_silence)
222+
self.get_aligned_next_trigger_time(
223+
freq_in_secs,
224+
timezone_offset,
225+
apply_silence,
226+
start_from,
227+
)
211228
} else {
212-
self.get_next_trigger_time_non_aligned(freq_in_secs, timezone_offset, apply_silence)
229+
self.get_next_trigger_time_non_aligned(
230+
freq_in_secs,
231+
timezone_offset,
232+
apply_silence,
233+
start_from,
234+
)
213235
}
214236
}
215237
}
@@ -595,7 +617,7 @@ mod test {
595617
..Default::default()
596618
};
597619
let result = condition
598-
.get_next_trigger_time_non_aligned(true, 0, false)
620+
.get_next_trigger_time_non_aligned(true, 0, false, None)
599621
.unwrap();
600622
let dt = DateTime::from_timestamp_micros(result).unwrap();
601623
let after_5_minutes = Utc::now() + Duration::minutes(5);
@@ -609,7 +631,7 @@ mod test {
609631
..Default::default()
610632
};
611633
let result = condition
612-
.get_next_trigger_time_non_aligned(true, 0, false)
634+
.get_next_trigger_time_non_aligned(true, 0, false, None)
613635
.unwrap();
614636
let dt = DateTime::from_timestamp_micros(result).unwrap();
615637
assert_eq!(dt.minute() % 5, 0);
@@ -622,7 +644,7 @@ mod test {
622644
..Default::default()
623645
};
624646
let result = condition
625-
.get_next_trigger_time_non_aligned(true, 0, true)
647+
.get_next_trigger_time_non_aligned(true, 0, true, None)
626648
.unwrap();
627649
let dt = DateTime::from_timestamp_micros(result).unwrap();
628650
// The next trigger should be after the silence period
@@ -637,7 +659,7 @@ mod test {
637659
..Default::default()
638660
};
639661
let result = condition
640-
.get_next_trigger_time_non_aligned(true, 0, false)
662+
.get_next_trigger_time_non_aligned(true, 0, false, None)
641663
.unwrap();
642664
let dt = DateTime::from_timestamp_micros(result).unwrap();
643665
// The next trigger should be within the tolerance range
@@ -659,7 +681,7 @@ mod test {
659681

660682
// Mock the current time for testing
661683
let result = condition
662-
.get_aligned_next_trigger_time(true, 0, false)
684+
.get_aligned_next_trigger_time(true, 0, false, None)
663685
.unwrap();
664686
let dt = DateTime::from_timestamp_micros(result).unwrap();
665687

@@ -682,7 +704,7 @@ mod test {
682704
// For cron expressions, the time should not be aligned by our function
683705
// as it's handled by the cron library
684706
let result = condition
685-
.get_aligned_next_trigger_time(true, 0, false)
707+
.get_aligned_next_trigger_time(true, 0, false, None)
686708
.unwrap();
687709
let dt = DateTime::from_timestamp_micros(result).unwrap();
688710

src/service/alerts/derived_streams.rs

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -143,24 +143,38 @@ pub async fn save(
143143
}
144144

145145
// Save the trigger to db
146-
let next_run_at = get_next_run_at(&derived_stream)?;
147-
let trigger = db::scheduler::Trigger {
148-
org: derived_stream.org_id.to_string(),
149-
module: db::scheduler::TriggerModule::DerivedStream,
150-
module_key: trigger_module_key,
151-
next_run_at,
152-
is_realtime: false,
153-
is_silenced: false,
154-
..Default::default()
155-
};
156-
157-
match db::scheduler::get(&trigger.org, trigger.module.clone(), &trigger.module_key).await {
158-
Ok(_) => db::scheduler::update_trigger(trigger)
159-
.await
160-
.map_err(|_| anyhow::anyhow!("Trigger already exists, but failed to update")),
161-
Err(_) => db::scheduler::push(trigger)
162-
.await
163-
.map_err(|e| anyhow::anyhow!("Error save DerivedStream trigger: {}", e)),
146+
match db::scheduler::get(
147+
&derived_stream.org_id,
148+
db::scheduler::TriggerModule::DerivedStream,
149+
&trigger_module_key,
150+
)
151+
.await
152+
{
153+
Ok(mut existing_trigger) => {
154+
let next_run_at = get_next_run_at(
155+
derived_stream.delay.unwrap_or_default(),
156+
Some(existing_trigger.next_run_at),
157+
)?;
158+
existing_trigger.next_run_at = next_run_at;
159+
db::scheduler::update_trigger(existing_trigger)
160+
.await
161+
.map_err(|_| anyhow::anyhow!("Trigger already exists, but failed to update"))
162+
}
163+
Err(_) => {
164+
let next_run_at = get_next_run_at(derived_stream.delay.unwrap_or_default(), None)?;
165+
let trigger = db::scheduler::Trigger {
166+
org: derived_stream.org_id.to_string(),
167+
module: db::scheduler::TriggerModule::DerivedStream,
168+
module_key: trigger_module_key,
169+
next_run_at,
170+
is_realtime: false,
171+
is_silenced: false,
172+
..Default::default()
173+
};
174+
db::scheduler::push(trigger)
175+
.await
176+
.map_err(|e| anyhow::anyhow!("Error save DerivedStream trigger: {}", e))
177+
}
164178
}
165179
}
166180

@@ -213,8 +227,10 @@ impl DerivedStreamExt for DerivedStream {
213227
}
214228
}
215229

216-
pub(super) fn get_next_run_at(derived_stream: &DerivedStream) -> Result<i64, anyhow::Error> {
217-
let delay_in_mins = derived_stream.delay.unwrap_or_default();
230+
fn get_next_run_at(
231+
delay_in_mins: i32,
232+
previous_next_run_at: Option<i64>,
233+
) -> Result<i64, anyhow::Error> {
218234
// validate & parse delay value
219235
if delay_in_mins < 0 {
220236
return Err(anyhow::anyhow!(
@@ -223,7 +239,12 @@ pub(super) fn get_next_run_at(derived_stream: &DerivedStream) -> Result<i64, any
223239
}
224240

225241
let delay = chrono::Duration::minutes(delay_in_mins as _);
226-
Ok(chrono::Utc::now()
242+
let supposed_next_run_at =
243+
previous_next_run_at.map_or(Ok(chrono::Utc::now()), |prev_next_run_at| {
244+
chrono::DateTime::<chrono::Utc>::from_timestamp_micros(prev_next_run_at)
245+
.ok_or(anyhow::anyhow!("Invalid previous next run at timestamp"))
246+
})?;
247+
Ok(supposed_next_run_at
227248
.checked_add_signed(delay)
228249
.ok_or(anyhow::anyhow!("DateTime arithmetic overflow"))?
229250
.timestamp_micros())

0 commit comments

Comments
 (0)