Skip to content

Commit ecdb27f

Browse files
authored
bugfixes (#1395)
bugfixes fixed state update
1 parent a8c1f80 commit ecdb27f

File tree

5 files changed

+87
-88
lines changed

5 files changed

+87
-88
lines changed

src/alerts/alert_types.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::time::Duration;
2020

2121
use chrono::{DateTime, Utc};
2222
use tonic::async_trait;
23+
use tracing::trace;
2324
use ulid::Ulid;
2425

2526
use crate::{
@@ -204,8 +205,48 @@ impl AlertTrait for ThresholdAlert {
204205
Box::new(self.clone())
205206
}
206207

207-
fn set_state(&mut self, new_state: AlertState) {
208-
self.state = new_state
208+
async fn update_state(
209+
&mut self,
210+
is_manual: bool,
211+
new_state: AlertState,
212+
trigger_notif: Option<String>,
213+
) -> Result<(), AlertError> {
214+
let store = PARSEABLE.storage.get_object_store();
215+
match self.state {
216+
AlertState::Triggered => {
217+
if is_manual
218+
&& new_state != AlertState::Resolved
219+
&& new_state != AlertState::Silenced
220+
{
221+
let msg = format!("Not allowed to manually go from Triggered to {new_state}");
222+
return Err(AlertError::InvalidStateChange(msg));
223+
}
224+
}
225+
AlertState::Silenced => {
226+
if is_manual && new_state != AlertState::Resolved {
227+
let msg = format!("Not allowed to manually go from Silenced to {new_state}");
228+
return Err(AlertError::InvalidStateChange(msg));
229+
}
230+
}
231+
AlertState::Resolved => {
232+
if is_manual {
233+
let msg = format!("Not allowed to go manually from Resolved to {new_state}");
234+
return Err(AlertError::InvalidStateChange(msg));
235+
}
236+
}
237+
}
238+
// update state in memory
239+
self.state = new_state;
240+
// update on disk
241+
store.put_alert(self.id, &self.to_alert_config()).await?;
242+
243+
if trigger_notif.is_some() {
244+
trace!("trigger notif on-\n{}", self.state);
245+
self.to_alert_config()
246+
.trigger_notifications(trigger_notif.unwrap())
247+
.await?;
248+
}
249+
Ok(())
209250
}
210251
}
211252

src/alerts/alerts_utils.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ async fn execute_remote_query(query: &str, time_range: &TimeRange) -> Result<f64
137137
fn convert_result_to_f64(result_value: serde_json::Value) -> Result<f64, AlertError> {
138138
// due to the previous validations, we can be sure that we get an array of objects with just one entry
139139
// [{"countField": Number(1120.251)}]
140-
if let Some(array_val) = result_value.as_array().filter(|arr| !arr.is_empty())
140+
if let Some(array_val) = result_value.as_array()
141+
&& !array_val.is_empty()
141142
&& let Some(object) = array_val[0].as_object()
142143
{
143144
let values = object.values().map(|v| v.as_f64().unwrap()).collect_vec();

src/alerts/mod.rs

Lines changed: 26 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -488,54 +488,6 @@ impl Display for AlertState {
488488
}
489489
}
490490

491-
impl AlertState {
492-
pub async fn update_state(
493-
&self,
494-
new_state: AlertState,
495-
alert_id: Ulid,
496-
) -> Result<(), AlertError> {
497-
match self {
498-
AlertState::Triggered => {
499-
if new_state == AlertState::Triggered {
500-
let msg = format!("Not allowed to manually go from Triggered to {new_state}");
501-
return Err(AlertError::InvalidStateChange(msg));
502-
} else {
503-
// update state on disk and in memory
504-
let guard = ALERTS.read().await;
505-
let alerts = guard.as_ref().ok_or_else(|| {
506-
AlertError::CustomError("Alert manager not initialized".into())
507-
})?;
508-
alerts
509-
.update_state(alert_id, new_state, Some("".into()))
510-
.await?;
511-
}
512-
}
513-
AlertState::Silenced => {
514-
// from here, the user can only go to Resolved
515-
if new_state == AlertState::Resolved {
516-
// update state on disk and in memory
517-
let guard = ALERTS.read().await;
518-
let alerts = guard.as_ref().ok_or_else(|| {
519-
AlertError::CustomError("Alert manager not initialized".into())
520-
})?;
521-
alerts
522-
.update_state(alert_id, new_state, Some("".into()))
523-
.await?;
524-
} else {
525-
let msg = format!("Not allowed to manually go from Silenced to {new_state}");
526-
return Err(AlertError::InvalidStateChange(msg));
527-
}
528-
}
529-
AlertState::Resolved => {
530-
// user shouldn't logically be changing states if current state is Resolved
531-
let msg = format!("Not allowed to go manually from Resolved to {new_state}");
532-
return Err(AlertError::InvalidStateChange(msg));
533-
}
534-
}
535-
Ok(())
536-
}
537-
}
538-
539491
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Default)]
540492
#[serde(rename_all = "camelCase")]
541493
pub enum Severity {
@@ -1530,11 +1482,11 @@ impl AlertManagerTrait for Alerts {
15301482
Ok(alerts)
15311483
}
15321484

1533-
/// Returns a sigle alert that the user has access to (based on query auth)
1534-
async fn get_alert_by_id(&self, id: Ulid) -> Result<AlertConfig, AlertError> {
1485+
/// Returns a single alert that the user has access to (based on query auth)
1486+
async fn get_alert_by_id(&self, id: Ulid) -> Result<Box<dyn AlertTrait>, AlertError> {
15351487
let read_access = self.alerts.read().await;
15361488
if let Some(alert) = read_access.get(&id) {
1537-
Ok(alert.to_alert_config())
1489+
Ok(alert.clone_box())
15381490
} else {
15391491
Err(AlertError::CustomError(format!(
15401492
"No alert found for the given ID- {id}"
@@ -1557,31 +1509,33 @@ impl AlertManagerTrait for Alerts {
15571509
new_state: AlertState,
15581510
trigger_notif: Option<String>,
15591511
) -> Result<(), AlertError> {
1560-
let store = PARSEABLE.storage.get_object_store();
1512+
// let store = PARSEABLE.storage.get_object_store();
15611513

15621514
// read and modify alert
1563-
let mut alert = self.get_alert_by_id(alert_id).await?;
1564-
trace!("get alert state by id-\n{}", alert.state);
1565-
1566-
alert.state = new_state;
1567-
1568-
trace!("new state-\n{}", alert.state);
1569-
1570-
// save to disk
1571-
store.put_alert(alert_id, &alert).await?;
1572-
1573-
// modify in memory
1574-
let mut writer = self.alerts.write().await;
1575-
if let Some(alert) = writer.get_mut(&alert_id) {
1576-
trace!("in memory alert-\n{}", alert.get_state());
1577-
alert.set_state(new_state);
1578-
trace!("in memory updated alert-\n{}", alert.get_state());
1515+
let mut write_access = self.alerts.write().await;
1516+
let mut alert: Box<dyn AlertTrait> = if let Some(alert) = write_access.get(&alert_id) {
1517+
match &alert.get_alert_type() {
1518+
AlertType::Threshold => {
1519+
Box::new(ThresholdAlert::from(alert.to_alert_config())) as Box<dyn AlertTrait>
1520+
}
1521+
AlertType::Anomaly => {
1522+
return Err(AlertError::NotPresentInOSS("anomaly".into()));
1523+
}
1524+
AlertType::Forecast => {
1525+
return Err(AlertError::NotPresentInOSS("forecast".into()));
1526+
}
1527+
}
1528+
} else {
1529+
return Err(AlertError::CustomError(format!(
1530+
"No alert found for the given ID- {alert_id}"
1531+
)));
15791532
};
1580-
drop(writer);
15811533

1582-
if trigger_notif.is_some() {
1583-
trace!("trigger notif on-\n{}", alert.state);
1584-
alert.trigger_notifications(trigger_notif.unwrap()).await?;
1534+
let current_state = alert.get_state();
1535+
1536+
if current_state.ne(&new_state) {
1537+
alert.update_state(false, new_state, trigger_notif).await?;
1538+
write_access.insert(*alert.get_id(), alert.clone_box());
15851539
}
15861540

15871541
Ok(())

src/alerts/traits.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ use ulid::Ulid;
3030
pub trait AlertTrait: Debug + Send + Sync {
3131
async fn eval_alert(&self) -> Result<(bool, f64), AlertError>;
3232
async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError>;
33+
async fn update_state(
34+
&mut self,
35+
is_manual: bool,
36+
new_state: AlertState,
37+
trigger_notif: Option<String>,
38+
) -> Result<(), AlertError>;
3339
fn get_id(&self) -> &Ulid;
3440
fn get_severity(&self) -> &Severity;
3541
fn get_title(&self) -> &str;
@@ -46,7 +52,6 @@ pub trait AlertTrait: Debug + Send + Sync {
4652
fn get_datasets(&self) -> &Vec<String>;
4753
fn to_alert_config(&self) -> AlertConfig;
4854
fn clone_box(&self) -> Box<dyn AlertTrait>;
49-
fn set_state(&mut self, new_state: AlertState);
5055
}
5156

5257
#[async_trait]
@@ -57,7 +62,7 @@ pub trait AlertManagerTrait: Send + Sync {
5762
session: SessionKey,
5863
tags: Vec<String>,
5964
) -> Result<Vec<AlertConfig>, AlertError>;
60-
async fn get_alert_by_id(&self, id: Ulid) -> Result<AlertConfig, AlertError>;
65+
async fn get_alert_by_id(&self, id: Ulid) -> Result<Box<dyn AlertTrait>, AlertError>;
6166
async fn update(&self, alert: &dyn AlertTrait);
6267
async fn update_state(
6368
&self,

src/handlers/http/alerts.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,9 @@ pub async fn get(req: HttpRequest, alert_id: Path<Ulid>) -> Result<impl Responde
132132

133133
let alert = alerts.get_alert_by_id(alert_id).await?;
134134
// validate that the user has access to the tables mentioned in the query
135-
user_auth_for_query(&session_key, &alert.query).await?;
135+
user_auth_for_query(&session_key, alert.get_query()).await?;
136136

137-
Ok(web::Json(alert))
137+
Ok(web::Json(alert.to_alert_config()))
138138
}
139139

140140
// DELETE /alerts/{alert_id}
@@ -153,7 +153,7 @@ pub async fn delete(req: HttpRequest, alert_id: Path<Ulid>) -> Result<impl Respo
153153
let alert = alerts.get_alert_by_id(alert_id).await?;
154154

155155
// validate that the user has access to the tables mentioned in the query
156-
user_auth_for_query(&session_key, &alert.query).await?;
156+
user_auth_for_query(&session_key, alert.get_query()).await?;
157157

158158
let store = PARSEABLE.storage.get_object_store();
159159
let alert_path = alert_json_path(alert_id);
@@ -191,9 +191,9 @@ pub async fn update_state(
191191
};
192192

193193
// check if alert id exists in map
194-
let alert = alerts.get_alert_by_id(alert_id).await?;
194+
let mut alert = alerts.get_alert_by_id(alert_id).await?;
195195
// validate that the user has access to the tables mentioned in the query
196-
user_auth_for_query(&session_key, &alert.query).await?;
196+
user_auth_for_query(&session_key, alert.get_query()).await?;
197197

198198
let query_string = req.query_string();
199199

@@ -212,14 +212,12 @@ pub async fn update_state(
212212
));
213213
}
214214

215-
// get current state
216-
let current_state = alerts.get_state(alert_id).await?;
217-
218215
let new_state = AlertState::from_str(state_value)?;
216+
alert.update_state(true, new_state, Some("".into())).await?;
219217

220-
current_state.update_state(new_state, alert_id).await?;
221-
let alert = alerts.get_alert_by_id(alert_id).await?;
222-
Ok(web::Json(alert))
218+
alerts.update(&*alert).await;
219+
220+
Ok(web::Json(alert.to_alert_config()))
223221
}
224222

225223
pub async fn list_tags() -> Result<impl Responder, AlertError> {

0 commit comments

Comments
 (0)