diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index a80fd076e..6429f519b 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -19,7 +19,7 @@ use actix_web::http::header::ContentType; use arrow_schema::{DataType, Schema}; use async_trait::async_trait; -use chrono::Utc; +use chrono::{DateTime, Utc}; use datafusion::logical_expr::{LogicalPlan, Projection}; use datafusion::sql::sqlparser::parser::ParserError; use derive_more::FromStrError; @@ -197,6 +197,14 @@ pub enum AlertType { Threshold, } +impl Display for AlertType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertType::Threshold => write!(f, "threshold"), + } + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub enum AlertOperator { @@ -528,6 +536,7 @@ pub struct AlertRequest { pub threshold_config: ThresholdConfig, pub eval_config: EvalConfig, pub targets: Vec, + pub tags: Option>, } impl AlertRequest { @@ -536,17 +545,21 @@ impl AlertRequest { for id in &self.targets { TARGETS.get_target_by_id(id).await?; } + let datasets = resolve_stream_names(&self.query)?; let config = AlertConfig { version: AlertVerison::from(CURRENT_ALERTS_VERSION), id: Ulid::new(), severity: self.severity, title: self.title, query: self.query, + datasets, alert_type: self.alert_type, threshold_config: self.threshold_config, eval_config: self.eval_config, targets: self.targets, state: AlertState::default(), + created: Utc::now(), + tags: self.tags, }; Ok(config) } @@ -561,6 +574,7 @@ pub struct AlertConfig { pub severity: Severity, pub title: String, pub query: String, + pub datasets: Vec, pub alert_type: AlertType, pub threshold_config: ThresholdConfig, pub eval_config: EvalConfig, @@ -568,6 +582,8 @@ pub struct AlertConfig { // for new alerts, state should be resolved #[serde(default)] pub state: AlertState, + pub created: DateTime, + pub tags: Option>, } impl AlertConfig { @@ -580,6 +596,7 @@ impl AlertConfig { let alert_info = format!("Alert '{}' (ID: {})", basic_fields.title, basic_fields.id); let query = Self::build_query_from_v1(alert_json, &alert_info).await?; + let datasets = resolve_stream_names(&query)?; let threshold_config = Self::extract_threshold_config(alert_json, &alert_info)?; let eval_config = Self::extract_eval_config(alert_json, &alert_info)?; let targets = Self::extract_targets(alert_json, &alert_info)?; @@ -592,11 +609,14 @@ impl AlertConfig { severity: basic_fields.severity, title: basic_fields.title, query, + datasets, alert_type: AlertType::Threshold, threshold_config, eval_config, targets, state, + created: Utc::now(), + tags: None, }; // Save the migrated alert back to storage @@ -1183,6 +1203,65 @@ impl AlertConfig { } Ok(()) } + + /// create a summary of the dashboard + /// used for listing dashboards + pub fn to_summary(&self) -> serde_json::Map { + let mut map = serde_json::Map::new(); + + map.insert( + "title".to_string(), + serde_json::Value::String(self.title.clone()), + ); + + map.insert( + "created".to_string(), + serde_json::Value::String(self.created.to_string()), + ); + + map.insert( + "alertType".to_string(), + serde_json::Value::String(self.alert_type.to_string()), + ); + + map.insert( + "id".to_string(), + serde_json::Value::String(self.id.to_string()), + ); + + map.insert( + "severity".to_string(), + serde_json::Value::String(self.severity.to_string()), + ); + + map.insert( + "state".to_string(), + serde_json::Value::String(self.state.to_string()), + ); + + if let Some(tags) = &self.tags { + map.insert( + "tags".to_string(), + serde_json::Value::Array( + tags.iter() + .map(|tag| serde_json::Value::String(tag.clone())) + .collect(), + ), + ); + } + + map.insert( + "datasets".to_string(), + serde_json::Value::Array( + self.datasets + .iter() + .map(|dataset| serde_json::Value::String(dataset.clone())) + .collect(), + ), + ); + + map + } } #[derive(Debug, thiserror::Error)] @@ -1221,6 +1300,8 @@ pub enum AlertError { ParserError(#[from] ParserError), #[error("Invalid alert query")] InvalidAlertQuery, + #[error("Invalid query parameter")] + InvalidQueryParameter, } impl actix_web::ResponseError for AlertError { @@ -1243,6 +1324,7 @@ impl actix_web::ResponseError for AlertError { Self::TargetInUse => StatusCode::CONFLICT, Self::ParserError(_) => StatusCode::BAD_REQUEST, Self::InvalidAlertQuery => StatusCode::BAD_REQUEST, + Self::InvalidQueryParameter => StatusCode::BAD_REQUEST, } } @@ -1350,6 +1432,7 @@ impl Alerts { pub async fn list_alerts_for_user( &self, session: SessionKey, + tags: Vec, ) -> Result, AlertError> { let mut alerts: Vec = Vec::new(); for (_, alert) in self.alerts.read().await.iter() { @@ -1358,6 +1441,17 @@ impl Alerts { alerts.push(alert.to_owned()); } } + if tags.is_empty() { + return Ok(alerts); + } + // filter alerts based on tags + alerts.retain(|alert| { + if let Some(alert_tags) = &alert.tags { + alert_tags.iter().any(|tag| tags.contains(tag)) + } else { + false + } + }); Ok(alerts) } @@ -1456,6 +1550,20 @@ impl Alerts { Ok(()) } + + /// List tags from all alerts + /// This function returns a list of unique tags from all alerts + pub async fn list_tags(&self) -> Vec { + let alerts = self.alerts.read().await; + let mut tags = alerts + .iter() + .filter_map(|(_, alert)| alert.tags.as_ref()) + .flat_map(|t| t.iter().cloned()) + .collect::>(); + tags.sort(); + tags.dedup(); + tags + } } #[derive(Debug, Serialize)] diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index a8f5ebe5d..98ee18610 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -16,7 +16,7 @@ * */ -use std::str::FromStr; +use std::{collections::HashMap, str::FromStr}; use crate::{ parseable::PARSEABLE, @@ -38,9 +38,28 @@ use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState}; /// Read all alerts then return alerts which satisfy the condition pub async fn list(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req)?; - let alerts = ALERTS.list_alerts_for_user(session_key).await?; + let query_map = web::Query::>::from_query(req.query_string()) + .map_err(|_| AlertError::InvalidQueryParameter)?; + let mut tags_list = Vec::new(); + if !query_map.is_empty() { + if let Some(tags) = query_map.get("tags") { + tags_list = tags + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + if tags_list.is_empty() { + return Err(AlertError::InvalidQueryParameter); + } + } + } - Ok(web::Json(alerts)) + let alerts = ALERTS.list_alerts_for_user(session_key, tags_list).await?; + let alerts_summary = alerts + .iter() + .map(|alert| alert.to_summary()) + .collect::>(); + Ok(web::Json(alerts_summary)) } // POST /alerts @@ -154,3 +173,8 @@ pub async fn update_state( let alert = ALERTS.get_alert_by_id(alert_id).await?; Ok(web::Json(alert)) } + +pub async fn list_tags() -> Result { + let tags = ALERTS.list_tags().await; + Ok(web::Json(tags)) +} diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 1bdbe3908..88eb51a11 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -269,6 +269,13 @@ impl Server { .authorize(Action::DeleteAlert), ), ) + .service( + web::resource("/list_tags").route( + web::get() + .to(alerts::list_tags) + .authorize(Action::ListDashboard), + ), + ) } pub fn get_targets_webscope() -> Scope { diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index ac7e5ce4f..e7c6eaaac 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -356,7 +356,7 @@ async fn get_alert_titles( query_value: &str, ) -> Result, PrismHomeError> { let alerts = ALERTS - .list_alerts_for_user(key.clone()) + .list_alerts_for_user(key.clone(), vec![]) .await? .iter() .filter_map(|alert| {