Skip to content

Commit 4c5784b

Browse files
committed
bugfixes
deadlock issue with alert updates
1 parent 838191a commit 4c5784b

File tree

3 files changed

+78
-86
lines changed

3 files changed

+78
-86
lines changed

src/alerts/mod.rs

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub async fn set_alert_manager(manager: Arc<dyn AlertManagerTrait>) {
9292
}
9393

9494
pub fn create_default_alerts_manager() -> Alerts {
95-
let (tx, rx) = mpsc::channel::<AlertTask>(10);
95+
let (tx, rx) = mpsc::channel::<AlertTask>(1000);
9696
let alerts = Alerts {
9797
alerts: RwLock::new(HashMap::new()),
9898
sender: tx,
@@ -1117,52 +1117,62 @@ impl AlertManagerTrait for Alerts {
11171117
new_state: AlertState,
11181118
trigger_notif: Option<String>,
11191119
) -> Result<(), AlertError> {
1120-
// let store = PARSEABLE.storage.get_object_store();
1121-
1122-
// read and modify alert
1123-
let mut write_access = self.alerts.write().await;
1124-
let mut alert: Box<dyn AlertTrait> = if let Some(alert) = write_access.get(&alert_id) {
1125-
match &alert.get_alert_type() {
1126-
AlertType::Threshold => {
1127-
Box::new(ThresholdAlert::from(alert.to_alert_config())) as Box<dyn AlertTrait>
1128-
}
1129-
AlertType::Anomaly(_) => {
1130-
return Err(AlertError::NotPresentInOSS("anomaly"));
1131-
}
1132-
AlertType::Forecast(_) => {
1133-
return Err(AlertError::NotPresentInOSS("forecast"));
1120+
let (mut alert, should_delete_task, should_create_task) = {
1121+
let read_access = self.alerts.read().await;
1122+
let alert = if let Some(alert) = read_access.get(&alert_id) {
1123+
match &alert.get_alert_type() {
1124+
AlertType::Threshold => Box::new(ThresholdAlert::from(alert.to_alert_config()))
1125+
as Box<dyn AlertTrait>,
1126+
AlertType::Anomaly(_) => {
1127+
return Err(AlertError::NotPresentInOSS("anomaly"));
1128+
}
1129+
AlertType::Forecast(_) => {
1130+
return Err(AlertError::NotPresentInOSS("forecast"));
1131+
}
11341132
}
1135-
}
1136-
} else {
1137-
return Err(AlertError::CustomError(format!(
1138-
"No alert found for the given ID- {alert_id}"
1139-
)));
1140-
};
1133+
} else {
1134+
return Err(AlertError::CustomError(format!(
1135+
"No alert found for the given ID- {alert_id}"
1136+
)));
1137+
};
11411138

1142-
// if new state is Disabled then ensure that the task is removed from list
1143-
if new_state.eq(&AlertState::Disabled) {
1144-
if alert.get_state().eq(&AlertState::Disabled) {
1139+
let current_state = *alert.get_state();
1140+
let should_delete_task =
1141+
new_state.eq(&AlertState::Disabled) && !current_state.eq(&AlertState::Disabled);
1142+
let should_create_task =
1143+
current_state.eq(&AlertState::Disabled) && new_state.eq(&AlertState::NotTriggered);
1144+
1145+
if new_state.eq(&AlertState::Disabled) && current_state.eq(&AlertState::Disabled) {
11451146
return Err(AlertError::InvalidStateChange(
11461147
"Can't disable an alert which is currently disabled".into(),
11471148
));
11481149
}
11491150

1151+
(alert, should_delete_task, should_create_task)
1152+
}; // Read lock released here
1153+
1154+
// Handle task operations without holding any locks
1155+
if should_delete_task {
11501156
self.sender
11511157
.send(AlertTask::Delete(alert_id))
11521158
.await
11531159
.map_err(|e| AlertError::CustomError(e.to_string()))?;
1154-
}
1155-
// user has resumed evals for this alert
1156-
else if alert.get_state().eq(&AlertState::Disabled)
1157-
&& new_state.eq(&AlertState::NotTriggered)
1158-
{
1160+
} else if should_create_task {
11591161
self.sender
11601162
.send(AlertTask::Create(alert.clone_box()))
11611163
.await
11621164
.map_err(|e| AlertError::CustomError(e.to_string()))?;
11631165
}
1166+
1167+
// Update the alert state
11641168
alert.update_state(new_state, trigger_notif).await?;
1165-
write_access.insert(*alert.get_id(), alert.clone_box());
1169+
1170+
// Finally, update the in-memory state with a brief write lock
1171+
{
1172+
let mut write_access = self.alerts.write().await;
1173+
write_access.insert(*alert.get_id(), alert.clone_box());
1174+
}
1175+
11661176
Ok(())
11671177
}
11681178

src/alerts/target.rs

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -270,33 +270,34 @@ impl Target {
270270
let alert_id = alert_context.alert_info.alert_id;
271271

272272
let sleep_and_check_if_call =
273-
move |timeout_state: Arc<Mutex<TimeoutState>>, current_state: AlertState| {
274-
async move {
275-
tokio::time::sleep(Duration::from_secs(timeout * 60)).await;
273+
move |timeout_state: Arc<Mutex<TimeoutState>>, current_state: AlertState| async move {
274+
tokio::time::sleep(Duration::from_secs(timeout * 60)).await;
276275

277-
let mut state = timeout_state.lock().unwrap();
276+
let mut state = timeout_state.lock().unwrap();
278277

279-
if current_state == AlertState::Triggered {
280-
// it is still firing .. sleep more and come back
281-
state.awaiting_resolve = true;
282-
true
283-
} else {
284-
state.timed_out = false;
285-
false
286-
}
278+
if current_state == AlertState::Triggered {
279+
state.awaiting_resolve = true;
280+
true
281+
} else {
282+
state.timed_out = false;
283+
false
287284
}
288285
};
289286

290287
trace!("Spawning retry task");
291288
tokio::spawn(async move {
292-
let guard = ALERTS.read().await;
293-
let alerts = if let Some(alerts) = guard.as_ref() {
294-
alerts
295-
} else {
296-
error!("No AlertManager set for alert_id: {alert_id}, stopping timeout task");
297-
*state.lock().unwrap() = TimeoutState::default();
298-
return;
299-
};
289+
// Get alerts manager reference once at the start
290+
let alerts = {
291+
let guard = ALERTS.read().await;
292+
if let Some(alerts) = guard.as_ref() {
293+
alerts.clone()
294+
} else {
295+
error!("No AlertManager set for alert_id: {alert_id}, stopping timeout task");
296+
*state.lock().unwrap() = TimeoutState::default();
297+
return;
298+
}
299+
}; // Lock released immediately
300+
300301
match retry {
301302
Retry::Infinite => loop {
302303
let current_state = if let Ok(state) = alerts.get_state(alert_id).await {
@@ -333,15 +334,6 @@ impl Target {
333334
call_target(target.clone(), alert_context.clone())
334335
}
335336
}
336-
// // fallback for if this task only observed FIRING on all RETRIES
337-
// // Stream might be dead and sending too many alerts is not great
338-
// // Send and alert stating that this alert will only work once it has seen a RESOLVE
339-
// state.lock().unwrap().timed_out = false;
340-
// let context = alert_context;
341-
// // context.alert_info.message = format!(
342-
// // "Triggering alert did not resolve itself after {times} retries, This alert is paused until it resolves");
343-
// // Send and exit this task.
344-
// call_target(target, context);
345337
}
346338
}
347339
*state.lock().unwrap() = TimeoutState::default();

src/handlers/http/alerts.rs

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -333,39 +333,38 @@ pub async fn modify_alert(
333333
let session_key = extract_session_key_from_req(&req)?;
334334
let alert_id = alert_id.into_inner();
335335

336-
let guard = ALERTS.write().await;
337-
let alerts = if let Some(alerts) = guard.as_ref() {
338-
alerts
339-
} else {
340-
return Err(AlertError::CustomError("No AlertManager set".into()));
336+
// Get alerts manager reference without holding the global lock
337+
let alerts = {
338+
let guard = ALERTS.read().await;
339+
if let Some(alerts) = guard.as_ref() {
340+
alerts.clone()
341+
} else {
342+
return Err(AlertError::CustomError("No AlertManager set".into()));
343+
}
341344
};
342345

343-
// check if alert id exists in map
346+
// Validate and prepare the new alert
344347
let alert = alerts.get_alert_by_id(alert_id).await?;
345-
346-
// validate that the user has access to the tables mentioned in the query
347348
user_auth_for_query(&session_key, alert.get_query()).await?;
348349

349-
// validate the request
350350
let mut new_config = alert_request.into().await?;
351-
352351
if &new_config.alert_type != alert.get_alert_type() {
353352
return Err(AlertError::InvalidAlertModifyRequest);
354353
}
355354

356355
user_auth_for_query(&session_key, &new_config.query).await?;
357356

358-
// calculate the `times` for notification config
357+
// Calculate notification config
359358
let eval_freq = new_config.get_eval_frequency();
360359
let notif_freq = new_config.notification_config.interval;
361360
let times = if (eval_freq / notif_freq) == 0 {
362361
1
363362
} else {
364363
(eval_freq / notif_freq) as usize
365364
};
366-
367365
new_config.notification_config.times = Retry::Finite(times);
368366

367+
// Prepare the updated config
369368
let mut old_config = alert.to_alert_config();
370369
old_config.threshold_config = new_config.threshold_config;
371370
old_config.datasets = new_config.datasets;
@@ -377,12 +376,8 @@ pub async fn modify_alert(
377376
old_config.targets = new_config.targets;
378377
old_config.title = new_config.title;
379378

380-
let threshold_alert;
381-
let new_alert: &dyn AlertTrait = match &new_config.alert_type {
382-
AlertType::Threshold => {
383-
threshold_alert = ThresholdAlert::from(old_config);
384-
&threshold_alert
385-
}
379+
let new_alert: Box<dyn AlertTrait> = match &new_config.alert_type {
380+
AlertType::Threshold => Box::new(ThresholdAlert::from(old_config)) as Box<dyn AlertTrait>,
386381
AlertType::Anomaly(_) => {
387382
return Err(AlertError::NotPresentInOSS("anomaly"));
388383
}
@@ -391,24 +386,19 @@ pub async fn modify_alert(
391386
}
392387
};
393388

394-
// save the new alert in ObjectStore
395-
alerts.update(new_alert).await;
389+
// Perform I/O operations
396390
let path = alert_json_path(*new_alert.get_id());
397391
let store = PARSEABLE.storage.get_object_store();
398392
let alert_bytes = serde_json::to_vec(&new_alert.to_alert_config())?;
399393
store.put_object(&path, Bytes::from(alert_bytes)).await?;
400394

401-
// remove the task
395+
// Now perform the atomic operations
402396
alerts.delete_task(alert_id).await?;
403-
404-
// remove alert from memory
405397
alerts.delete(alert_id).await?;
406-
407-
let config = new_alert.to_alert_config().to_response();
408-
409-
// start the task
398+
alerts.update(&*new_alert).await;
410399
alerts.start_task(new_alert.clone_box()).await?;
411400

401+
let config = new_alert.to_alert_config().to_response();
412402
Ok(web::Json(config))
413403
}
414404

0 commit comments

Comments
 (0)