From 759e0390bd1c285099ae2f6141839c9cbdcf0d64 Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 4 Aug 2025 16:13:01 +0530 Subject: [PATCH 1/5] Updates to alerts - separated out enums, traits, and structs - added ability to pause alert evals (new endpoints to pause and resume) - added snooze for notifications (time dependent) - removed masking from targets fow now --- src/alerts/alert_enums.rs | 250 ++++++++ src/alerts/alert_structs.rs | 302 ++++++++++ src/alerts/{traits.rs => alert_traits.rs} | 41 +- src/alerts/alert_types.rs | 159 +++-- src/alerts/alerts_utils.rs | 28 +- src/alerts/mod.rs | 670 ++++------------------ src/alerts/target.rs | 43 +- src/handlers/http/alerts.rs | 192 +++---- src/handlers/http/modal/server.rs | 16 +- src/handlers/http/targets.rs | 14 +- src/prism/home/mod.rs | 2 +- src/query/mod.rs | 2 +- src/storage/object_storage.rs | 30 +- src/sync.rs | 4 +- 14 files changed, 962 insertions(+), 791 deletions(-) create mode 100644 src/alerts/alert_enums.rs create mode 100644 src/alerts/alert_structs.rs rename src/alerts/{traits.rs => alert_traits.rs} (70%) diff --git a/src/alerts/alert_enums.rs b/src/alerts/alert_enums.rs new file mode 100644 index 000000000..f07db3c44 --- /dev/null +++ b/src/alerts/alert_enums.rs @@ -0,0 +1,250 @@ +use std::fmt::{self, Display}; + +use derive_more::derive::FromStr; +use ulid::Ulid; + +use crate::alerts::{alert_structs::RollingWindow, alert_traits::AlertTrait}; + +pub enum AlertTask { + Create(Box), + Delete(Ulid), +} + +#[derive(Default, Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "lowercase")] +pub enum AlertVersion { + V1, + #[default] + V2, +} + +impl From<&str> for AlertVersion { + fn from(value: &str) -> Self { + match value { + "v1" => Self::V1, + "v2" => Self::V2, + _ => Self::V2, // default to v2 + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Default)] +#[serde(rename_all = "camelCase")] +pub enum Severity { + Critical, + High, + #[default] + Medium, + Low, +} + +impl Display for Severity { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Severity::Critical => write!(f, "Critical"), + Severity::High => write!(f, "High"), + Severity::Medium => write!(f, "Medium"), + Severity::Low => write!(f, "Low"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub enum LogicalOperator { + And, + Or, +} + +impl Display for LogicalOperator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + LogicalOperator::And => write!(f, "AND"), + LogicalOperator::Or => write!(f, "OR"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum AlertType { + Threshold, + Anomaly, + Forecast, +} + +impl Display for AlertType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertType::Threshold => write!(f, "threshold"), + AlertType::Anomaly => write!(f, "anomaly"), + AlertType::Forecast => write!(f, "forecast"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub enum AlertOperator { + #[serde(rename = ">")] + GreaterThan, + #[serde(rename = "<")] + LessThan, + #[serde(rename = "=")] + Equal, + #[serde(rename = "!=")] + NotEqual, + #[serde(rename = ">=")] + GreaterThanOrEqual, + #[serde(rename = "<=")] + LessThanOrEqual, +} + +impl Display for AlertOperator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertOperator::GreaterThan => write!(f, ">"), + AlertOperator::LessThan => write!(f, "<"), + AlertOperator::Equal => write!(f, "="), + AlertOperator::NotEqual => write!(f, "!="), + AlertOperator::GreaterThanOrEqual => write!(f, ">="), + AlertOperator::LessThanOrEqual => write!(f, "<="), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub enum WhereConfigOperator { + #[serde(rename = "=")] + Equal, + #[serde(rename = "!=")] + NotEqual, + #[serde(rename = "<")] + LessThan, + #[serde(rename = ">")] + GreaterThan, + #[serde(rename = "<=")] + LessThanOrEqual, + #[serde(rename = ">=")] + GreaterThanOrEqual, + #[serde(rename = "is null")] + IsNull, + #[serde(rename = "is not null")] + IsNotNull, + #[serde(rename = "ilike")] + ILike, + #[serde(rename = "contains")] + Contains, + #[serde(rename = "begins with")] + BeginsWith, + #[serde(rename = "ends with")] + EndsWith, + #[serde(rename = "does not contain")] + DoesNotContain, + #[serde(rename = "does not begin with")] + DoesNotBeginWith, + #[serde(rename = "does not end with")] + DoesNotEndWith, +} + +impl WhereConfigOperator { + /// Convert the enum value to its string representation + pub fn as_str(&self) -> &'static str { + match self { + Self::Equal => "=", + Self::NotEqual => "!=", + Self::LessThan => "<", + Self::GreaterThan => ">", + Self::LessThanOrEqual => "<=", + Self::GreaterThanOrEqual => ">=", + Self::IsNull => "is null", + Self::IsNotNull => "is not null", + Self::ILike => "ilike", + Self::Contains => "contains", + Self::BeginsWith => "begins with", + Self::EndsWith => "ends with", + Self::DoesNotContain => "does not contain", + Self::DoesNotBeginWith => "does not begin with", + Self::DoesNotEndWith => "does not end with", + } + } +} + +impl Display for WhereConfigOperator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // We can reuse our as_str method to get the string representation + write!(f, "{}", self.as_str()) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub enum AggregateFunction { + Avg, + Count, + CountDistinct, + Min, + Max, + Sum, +} + +impl Display for AggregateFunction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AggregateFunction::Avg => write!(f, "Avg"), + AggregateFunction::Count => write!(f, "Count"), + AggregateFunction::CountDistinct => write!(f, "CountDistinct"), + AggregateFunction::Min => write!(f, "Min"), + AggregateFunction::Max => write!(f, "Max"), + AggregateFunction::Sum => write!(f, "Sum"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub enum EvalConfig { + RollingWindow(RollingWindow), +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Default, FromStr)] +#[serde(rename_all = "camelCase")] +pub enum AlertState { + Triggered, + #[default] + NotTriggered, + Paused, +} + +impl Display for AlertState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertState::Triggered => write!(f, "triggered"), + AlertState::Paused => write!(f, "paused"), + AlertState::NotTriggered => write!(f, "not-triggered"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Default)] +#[serde(rename_all = "camelCase")] +pub enum NotificationState { + #[default] + Notify, + /// Snoozed means the alert will evaluate but no notifications will be sent out + /// + /// It is a state which can only be set manually + /// + /// user needs to pass the timestamp or the duration (in human time) till which the alert is silenced + Snoozed(String), +} + +impl Display for NotificationState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + NotificationState::Notify => write!(f, "notify"), + NotificationState::Snoozed(end_time) => write!(f, "snoozed till {end_time}"), + } + } +} diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs new file mode 100644 index 000000000..0695ef45a --- /dev/null +++ b/src/alerts/alert_structs.rs @@ -0,0 +1,302 @@ +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +use serde::Serialize; +use tokio::sync::{RwLock, mpsc}; +use ulid::Ulid; + +use crate::{ + alerts::{ + AlertError, CURRENT_ALERTS_VERSION, + alert_enums::{ + AlertOperator, AlertState, AlertTask, AlertType, AlertVersion, EvalConfig, + LogicalOperator, NotificationState, Severity, WhereConfigOperator, + }, + alert_traits::AlertTrait, + target::TARGETS, + }, + query::resolve_stream_names, +}; + +/// Helper struct for basic alert fields during migration +pub struct BasicAlertFields { + pub id: Ulid, + pub title: String, + pub severity: Severity, +} + +#[derive(Debug)] +pub struct Alerts { + pub alerts: RwLock>>, + pub sender: mpsc::Sender, +} + +#[derive(Debug, Clone)] +pub struct Context { + pub alert_info: AlertInfo, + pub deployment_info: DeploymentInfo, + pub message: String, +} + +impl Context { + pub fn new(alert_info: AlertInfo, deployment_info: DeploymentInfo, message: String) -> Self { + Self { + alert_info, + deployment_info, + message, + } + } + + pub(crate) fn default_alert_string(&self) -> String { + format!( + "AlertName: {}\nTriggered TimeStamp: {}\nSeverity: {}\n{}", + self.alert_info.alert_name, + Utc::now().to_rfc3339(), + self.alert_info.severity, + self.message + ) + } + + pub(crate) fn default_resolved_string(&self) -> String { + format!("{} is now `not-triggered` ", self.alert_info.alert_name) + } + + pub(crate) fn default_paused_string(&self) -> String { + format!( + "{} is now `paused`. No more evals will be run till it is `paused`.", + self.alert_info.alert_name + ) + } + + // fn default_silenced_string(&self) -> String { + // format!( + // "Notifications for {} have been silenced ", + // self.alert_info.alert_name + // ) + // } +} + +#[derive(Debug, Clone)] +pub struct AlertInfo { + pub alert_id: Ulid, + pub alert_name: String, + // message: String, + // reason: String, + pub alert_state: AlertState, + pub notification_state: NotificationState, + pub severity: String, +} + +impl AlertInfo { + pub fn new( + alert_id: Ulid, + alert_name: String, + alert_state: AlertState, + notification_state: NotificationState, + severity: String, + ) -> Self { + Self { + alert_id, + alert_name, + alert_state, + notification_state, + severity, + } + } +} + +#[derive(Debug, Clone)] +pub struct DeploymentInfo { + pub deployment_instance: String, + pub deployment_id: Ulid, + pub deployment_mode: String, +} + +impl DeploymentInfo { + pub fn new(deployment_instance: String, deployment_id: Ulid, deployment_mode: String) -> Self { + Self { + deployment_instance, + deployment_id, + deployment_mode, + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +pub struct OperationConfig { + pub column: String, + pub operator: Option, + pub value: Option, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct FilterConfig { + pub conditions: Vec, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +pub struct ConditionConfig { + pub column: String, + pub operator: WhereConfigOperator, + pub value: Option, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Conditions { + pub operator: Option, + pub condition_config: Vec, +} + +impl Conditions { + pub fn generate_filter_message(&self) -> String { + match &self.operator { + Some(op) => match op { + LogicalOperator::And | LogicalOperator::Or => { + let expr1 = &self.condition_config[0]; + let expr2 = &self.condition_config[1]; + let expr1_msg = if expr1.value.as_ref().is_some_and(|v| !v.is_empty()) { + format!( + "{} {} {}", + expr1.column, + expr1.operator, + expr1.value.as_ref().unwrap() + ) + } else { + format!("{} {}", expr1.column, expr1.operator) + }; + + let expr2_msg = if expr2.value.as_ref().is_some_and(|v| !v.is_empty()) { + format!( + "{} {} {}", + expr2.column, + expr2.operator, + expr2.value.as_ref().unwrap() + ) + } else { + format!("{} {}", expr2.column, expr2.operator) + }; + + format!("[{expr1_msg} {op} {expr2_msg}]") + } + }, + None => { + let expr = &self.condition_config[0]; + if let Some(val) = &expr.value { + format!("{} {} {}", expr.column, expr.operator, val) + } else { + format!("{} {}", expr.column, expr.operator) + } + } + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct GroupBy { + pub columns: Vec, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ThresholdConfig { + pub operator: AlertOperator, + pub value: f64, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct RollingWindow { + // x minutes (25m) + pub eval_start: String, + // should always be "now" + pub eval_end: String, + // x minutes (5m) + pub eval_frequency: u64, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct AlertRequest { + #[serde(default = "Severity::default")] + pub severity: Severity, + pub title: String, + pub query: String, + pub alert_type: AlertType, + pub threshold_config: ThresholdConfig, + pub eval_config: EvalConfig, + pub targets: Vec, + pub tags: Option>, +} + +impl AlertRequest { + pub async fn into(self) -> Result { + // Validate that all target IDs exist + for id in &self.targets { + TARGETS.get_target_by_id(id).await?; + } + let datasets = resolve_stream_names(&self.query)?; + let config = AlertConfig { + version: AlertVersion::from(CURRENT_ALERTS_VERSION), + id: Ulid::new(), + severity: self.severity, + title: self.title, + query: self.query, + datasets, + alert_type: self.alert_type, + threshold_config: self.threshold_config, + eval_config: self.eval_config, + targets: self.targets, + state: AlertState::default(), + notification_state: NotificationState::Notify, + created: Utc::now(), + tags: self.tags, + }; + Ok(config) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct AlertConfig { + pub version: AlertVersion, + #[serde(default)] + pub id: Ulid, + pub severity: Severity, + pub title: String, + pub query: String, + pub datasets: Vec, + pub alert_type: AlertType, + pub threshold_config: ThresholdConfig, + pub eval_config: EvalConfig, + pub targets: Vec, + // for new alerts, state should be resolved + #[serde(default)] + pub state: AlertState, + pub notification_state: NotificationState, + pub created: DateTime, + pub tags: Option>, +} + +#[derive(Debug, Serialize)] +pub struct AlertsSummary { + pub total: u64, + pub triggered: AlertsInfoByState, + pub paused: AlertsInfoByState, + pub resolved: AlertsInfoByState, +} + +#[derive(Debug, Serialize)] +pub struct AlertsInfoByState { + pub total: u64, + pub alert_info: Vec, +} + +#[derive(Debug, Serialize)] +pub struct AlertsInfo { + pub title: String, + pub id: Ulid, + pub severity: Severity, +} diff --git a/src/alerts/traits.rs b/src/alerts/alert_traits.rs similarity index 70% rename from src/alerts/traits.rs rename to src/alerts/alert_traits.rs index 303f6d42f..ec019b10a 100644 --- a/src/alerts/traits.rs +++ b/src/alerts/alert_traits.rs @@ -18,22 +18,44 @@ use crate::{ alerts::{ - AlertConfig, AlertError, AlertState, AlertType, EvalConfig, Severity, ThresholdConfig, + AlertConfig, AlertError, AlertState, AlertType, EvalConfig, Severity, + alert_enums::NotificationState, + alert_structs::{Context, ThresholdConfig}, }, rbac::map::SessionKey, }; +use chrono::{DateTime, Utc}; use std::{collections::HashMap, fmt::Debug}; use tonic::async_trait; use ulid::Ulid; +/// A trait to handle different types of messages built by different alert types +pub trait MessageCreation { + fn create_threshold_message(&self, actual_value: f64) -> Result; + fn create_anomaly_message( + &self, + actual_value: f64, + lower_bound: f64, + upper_bound: f64, + ) -> Result; + fn create_forecast_message( + &self, + forecasted_time: DateTime, + forecasted_value: f64, + ) -> Result; +} + #[async_trait] pub trait AlertTrait: Debug + Send + Sync { - async fn eval_alert(&self) -> Result<(bool, f64), AlertError>; + async fn eval_alert(&self) -> Result, AlertError>; async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError>; + async fn update_notification_state( + &mut self, + new_notification_state: NotificationState, + ) -> Result<(), AlertError>; async fn update_state( &mut self, - is_manual: bool, - new_state: AlertState, + alert_state: AlertState, trigger_notif: Option, ) -> Result<(), AlertError>; fn get_id(&self) -> &Ulid; @@ -52,6 +74,7 @@ pub trait AlertTrait: Debug + Send + Sync { fn get_datasets(&self) -> &Vec; fn to_alert_config(&self) -> AlertConfig; fn clone_box(&self) -> Box; + // fn get_alert_message(&self) -> Result; } #[async_trait] @@ -70,6 +93,11 @@ pub trait AlertManagerTrait: Send + Sync { new_state: AlertState, trigger_notif: Option, ) -> Result<(), AlertError>; + async fn update_notification_state( + &self, + alert_id: Ulid, + new_notification_state: NotificationState, + ) -> Result<(), AlertError>; async fn delete(&self, alert_id: Ulid) -> Result<(), AlertError>; async fn get_state(&self, alert_id: Ulid) -> Result; async fn start_task(&self, alert: Box) -> Result<(), AlertError>; @@ -77,3 +105,8 @@ pub trait AlertManagerTrait: Send + Sync { async fn list_tags(&self) -> Vec; async fn get_all_alerts(&self) -> HashMap>; } + +#[async_trait] +pub trait CallableTarget { + async fn call(&self, payload: &Context); +} diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs index 81fbcbfb9..18e58ab58 100644 --- a/src/alerts/alert_types.rs +++ b/src/alerts/alert_types.rs @@ -16,21 +16,22 @@ * */ -use std::time::Duration; +use std::{str::FromStr, time::Duration}; use chrono::{DateTime, Utc}; use tonic::async_trait; -use tracing::trace; +use tracing::{info, trace, warn}; use ulid::Ulid; use crate::{ alerts::{ AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity, ThresholdConfig, + alert_enums::NotificationState, + alert_traits::{AlertTrait, MessageCreation}, alerts_utils::{evaluate_condition, execute_alert_query, extract_time_range}, is_query_aggregate, target::{self, TARGETS}, - traits::AlertTrait, }, handlers::http::query::create_streams_for_distributed, option::Mode, @@ -56,6 +57,7 @@ pub struct ThresholdAlert { // for new alerts, state should be resolved #[serde(default)] pub state: AlertState, + pub notification_state: NotificationState, pub created: DateTime, pub tags: Option>, pub datasets: Vec, @@ -63,7 +65,7 @@ pub struct ThresholdAlert { #[async_trait] impl AlertTrait for ThresholdAlert { - async fn eval_alert(&self) -> Result<(bool, f64), AlertError> { + async fn eval_alert(&self) -> Result, AlertError> { let time_range = extract_time_range(&self.eval_config)?; let final_value = execute_alert_query(self.get_query(), &time_range).await?; let result = evaluate_condition( @@ -71,7 +73,14 @@ impl AlertTrait for ThresholdAlert { final_value, self.threshold_config.value, ); - Ok((result, final_value)) + + let message = if result { + // generate message + Some(self.create_threshold_message(final_value)?) + } else { + None + }; + Ok(message) } async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError> { @@ -136,6 +145,70 @@ impl AlertTrait for ThresholdAlert { Ok(()) } + async fn update_notification_state( + &mut self, + new_notification_state: NotificationState, + ) -> Result<(), AlertError> { + let store = PARSEABLE.storage.get_object_store(); + // update state in memory + self.notification_state = new_notification_state; + // update on disk + store.put_alert(self.id, &self.to_alert_config()).await?; + + Ok(()) + } + + async fn update_state( + &mut self, + new_state: AlertState, + trigger_notif: Option, + ) -> Result<(), AlertError> { + let store = PARSEABLE.storage.get_object_store(); + if self.state.eq(&AlertState::Paused) { + warn!( + "Alert- {} has been Paused. No evals will be done till it is unpaused.", + self.id + ); + // update state in memory + self.state = new_state; + + // update on disk + store.put_alert(self.id, &self.to_alert_config()).await?; + // The task should have already been removed from the list of running tasks + return Ok(()); + } + + match &mut self.notification_state { + NotificationState::Notify => {} + NotificationState::Snoozed(till_time) => { + // if now > till_time, modify notif state to notify and proceed + let now = Utc::now(); + let till = DateTime::::from_str(till_time) + .map_err(|e| AlertError::CustomError(e.to_string()))?; + if now > till { + info!( + "Modifying alert notif state from snoozed to notify- Now= {now}, Snooze till= {till}" + ); + self.notification_state = NotificationState::Notify; + } + } + } + + // update state in memory + self.state = new_state; + + // update on disk + store.put_alert(self.id, &self.to_alert_config()).await?; + + if trigger_notif.is_some() && self.notification_state.eq(&NotificationState::Notify) { + trace!("trigger notif on-\n{}", self.state); + self.to_alert_config() + .trigger_notifications(trigger_notif.unwrap()) + .await?; + } + Ok(()) + } + fn get_id(&self) -> &Ulid { &self.id } @@ -204,49 +277,41 @@ impl AlertTrait for ThresholdAlert { fn clone_box(&self) -> Box { Box::new(self.clone()) } +} - async fn update_state( - &mut self, - is_manual: bool, - new_state: AlertState, - trigger_notif: Option, - ) -> Result<(), AlertError> { - let store = PARSEABLE.storage.get_object_store(); - match self.state { - AlertState::Triggered => { - if is_manual - && new_state != AlertState::Resolved - && new_state != AlertState::Silenced - { - let msg = format!("Not allowed to manually go from Triggered to {new_state}"); - return Err(AlertError::InvalidStateChange(msg)); - } - } - AlertState::Silenced => { - if is_manual && new_state != AlertState::Resolved { - let msg = format!("Not allowed to manually go from Silenced to {new_state}"); - return Err(AlertError::InvalidStateChange(msg)); - } - } - AlertState::Resolved => { - if is_manual { - let msg = format!("Not allowed to go manually from Resolved to {new_state}"); - return Err(AlertError::InvalidStateChange(msg)); - } - } - } - // update state in memory - self.state = new_state; - // update on disk - store.put_alert(self.id, &self.to_alert_config()).await?; +impl MessageCreation for ThresholdAlert { + fn create_threshold_message(&self, actual_value: f64) -> Result { + Ok(format!( + "Alert Triggered: {}\n\nThreshold: ({} {})\nCurrent Value: {}\nEvaluation Window: {} | Frequency: {}\n\nQuery:\n{}", + self.get_id(), + self.get_threshold_config().operator, + self.get_threshold_config().value, + actual_value, + self.get_eval_window(), + self.get_eval_frequency(), + self.get_query() + )) + } - if trigger_notif.is_some() { - trace!("trigger notif on-\n{}", self.state); - self.to_alert_config() - .trigger_notifications(trigger_notif.unwrap()) - .await?; - } - Ok(()) + fn create_anomaly_message( + &self, + _forecast_value: f64, + _lower_bound: f64, + _upper_bound: f64, + ) -> Result { + Err(AlertError::Unimplemented( + "Anomaly message creation is not allowed for Threshold alert".into(), + )) + } + + fn create_forecast_message( + &self, + _forecast_time: DateTime, + _forecast_value: f64, + ) -> Result { + Err(AlertError::Unimplemented( + "Forecast message creation is not allowed for Threshold alert".into(), + )) } } @@ -263,6 +328,7 @@ impl From for ThresholdAlert { eval_config: value.eval_config, targets: value.targets, state: value.state, + notification_state: value.notification_state, created: value.created, tags: value.tags, datasets: value.datasets, @@ -283,6 +349,7 @@ impl From for AlertConfig { eval_config: val.eval_config, targets: val.targets, state: val.state, + notification_state: val.notification_state, created: val.created, tags: val.tags, datasets: val.datasets, diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index be9676d37..2a49b109a 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -28,7 +28,7 @@ use itertools::Itertools; use tracing::trace; use crate::{ - alerts::{AlertTrait, Conditions, LogicalOperator, WhereConfigOperator}, + alerts::{AlertTrait, LogicalOperator, WhereConfigOperator, alert_structs::Conditions}, handlers::http::{ cluster::send_query_request, query::{Query, create_streams_for_distributed}, @@ -55,9 +55,9 @@ use super::{ALERTS, AlertError, AlertOperator, AlertState}; pub async fn evaluate_alert(alert: &dyn AlertTrait) -> Result<(), AlertError> { trace!("RUNNING EVAL TASK FOR- {alert:?}"); - let (result, final_value) = alert.eval_alert().await?; + let message = alert.eval_alert().await?; - update_alert_state(alert, result, final_value).await + update_alert_state(alert, message).await } /// Extract time range from alert evaluation configuration @@ -163,8 +163,7 @@ pub fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) async fn update_alert_state( alert: &dyn AlertTrait, - final_res: bool, - actual_value: f64, + message: Option, ) -> Result<(), AlertError> { let guard = ALERTS.write().await; let alerts = if let Some(alerts) = guard.as_ref() { @@ -173,20 +172,9 @@ async fn update_alert_state( return Err(AlertError::CustomError("No AlertManager set".into())); }; - if final_res { - let message = format!( - "Alert Triggered: {}\n\nThreshold: ({} {})\nCurrent Value: {}\nEvaluation Window: {} | Frequency: {}\n\nQuery:\n{}", - alert.get_id(), - alert.get_threshold_config().operator, - alert.get_threshold_config().value, - actual_value, - alert.get_eval_window(), - alert.get_eval_frequency(), - alert.get_query() - ); - + if let Some(msg) = message { alerts - .update_state(*alert.get_id(), AlertState::Triggered, Some(message)) + .update_state(*alert.get_id(), AlertState::Triggered, Some(msg)) .await } else if alerts .get_state(*alert.get_id()) @@ -194,11 +182,11 @@ async fn update_alert_state( .eq(&AlertState::Triggered) { alerts - .update_state(*alert.get_id(), AlertState::Resolved, Some("".into())) + .update_state(*alert.get_id(), AlertState::NotTriggered, Some("".into())) .await } else { alerts - .update_state(*alert.get_id(), AlertState::Resolved, None) + .update_state(*alert.get_id(), AlertState::NotTriggered, None) .await } } diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 5597f71c3..35173ddf0 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -19,17 +19,14 @@ use actix_web::http::header::ContentType; use arrow_schema::{ArrowError, DataType, Schema}; use async_trait::async_trait; -use chrono::{DateTime, Utc}; +use chrono::Utc; use datafusion::logical_expr::{LogicalPlan, Projection}; use datafusion::sql::sqlparser::parser::ParserError; use derive_more::FromStrError; -use derive_more::derive::FromStr; use http::StatusCode; -// use once_cell::sync::Lazy; -use serde::Serialize; use serde_json::{Error as SerdeError, Value as JsonValue}; use std::collections::HashMap; -use std::fmt::{self, Debug, Display}; +use std::fmt::Debug; use std::sync::Arc; use std::thread; use std::time::Duration; @@ -39,14 +36,24 @@ use tokio::task::JoinHandle; use tracing::{error, trace, warn}; use ulid::Ulid; +pub mod alert_enums; +pub mod alert_structs; +pub mod alert_traits; pub mod alert_types; pub mod alerts_utils; pub mod target; -pub mod traits; +pub use crate::alerts::alert_enums::{ + AggregateFunction, AlertOperator, AlertState, AlertTask, AlertType, AlertVersion, EvalConfig, + LogicalOperator, NotificationState, Severity, WhereConfigOperator, +}; +pub use crate::alerts::alert_structs::{ + AlertConfig, AlertInfo, AlertRequest, Alerts, AlertsInfo, AlertsInfoByState, AlertsSummary, + BasicAlertFields, Context, DeploymentInfo, RollingWindow, ThresholdConfig, +}; +use crate::alerts::alert_traits::{AlertManagerTrait, AlertTrait}; use crate::alerts::alert_types::ThresholdAlert; use crate::alerts::target::TARGETS; -use crate::alerts::traits::{AlertManagerTrait, AlertTrait}; use crate::handlers::http::fetch_schema; use crate::handlers::http::query::create_streams_for_distributed; use crate::option::Mode; @@ -58,13 +65,6 @@ use crate::storage::{ALERTS_ROOT_DIRECTORY, ObjectStorageError}; use crate::sync::alert_runtime; use crate::utils::user_auth_for_query; -/// Helper struct for basic alert fields during migration -struct BasicAlertFields { - id: Ulid, - title: String, - severity: Severity, -} - // these types describe the scheduled task for an alert pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>); @@ -100,516 +100,6 @@ pub fn create_default_alerts_manager() -> Alerts { alerts } -// pub static ALERTS: Lazy = Lazy::new(|| { -// let (tx, rx) = mpsc::channel::(10); -// let alerts = Alerts { -// alerts: RwLock::new(HashMap::new()), -// sender: tx, -// }; - -// thread::spawn(|| alert_runtime(rx)); - -// alerts -// }); - -#[derive(Debug)] -pub struct Alerts { - pub alerts: RwLock>>, - pub sender: mpsc::Sender, -} - -pub enum AlertTask { - Create(Box), - Delete(Ulid), -} - -#[derive(Default, Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "lowercase")] -pub enum AlertVersion { - V1, - #[default] - V2, -} - -impl From<&str> for AlertVersion { - fn from(value: &str) -> Self { - match value { - "v1" => Self::V1, - "v2" => Self::V2, - _ => Self::V2, // default to v2 - } - } -} - -#[async_trait] -pub trait CallableTarget { - async fn call(&self, payload: &Context); -} - -#[derive(Debug, Clone)] -pub struct Context { - alert_info: AlertInfo, - deployment_info: DeploymentInfo, - message: String, -} - -impl Context { - pub fn new(alert_info: AlertInfo, deployment_info: DeploymentInfo, message: String) -> Self { - Self { - alert_info, - deployment_info, - message, - } - } - - fn default_alert_string(&self) -> String { - format!( - "AlertName: {}\nTriggered TimeStamp: {}\nSeverity: {}\n{}", - self.alert_info.alert_name, - Utc::now().to_rfc3339(), - self.alert_info.severity, - self.message - ) - } - - fn default_resolved_string(&self) -> String { - format!("{} is now resolved ", self.alert_info.alert_name) - } - - fn default_silenced_string(&self) -> String { - format!( - "Notifications for {} have been silenced ", - self.alert_info.alert_name - ) - } -} - -#[derive(Debug, Clone)] -pub struct AlertInfo { - alert_id: Ulid, - alert_name: String, - // message: String, - // reason: String, - alert_state: AlertState, - severity: String, -} - -impl AlertInfo { - pub fn new( - alert_id: Ulid, - alert_name: String, - alert_state: AlertState, - severity: String, - ) -> Self { - Self { - alert_id, - alert_name, - alert_state, - severity, - } - } -} - -#[derive(Debug, Clone)] -pub struct DeploymentInfo { - deployment_instance: String, - deployment_id: Ulid, - deployment_mode: String, -} - -impl DeploymentInfo { - pub fn new(deployment_instance: String, deployment_id: Ulid, deployment_mode: String) -> Self { - Self { - deployment_instance, - deployment_id, - deployment_mode, - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] -#[serde(rename_all = "camelCase")] -pub enum AlertType { - Threshold, - Anomaly, - Forecast, -} - -impl Display for AlertType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - AlertType::Threshold => write!(f, "threshold"), - AlertType::Anomaly => write!(f, "anomaly"), - AlertType::Forecast => write!(f, "forecast"), - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub enum AlertOperator { - #[serde(rename = ">")] - GreaterThan, - #[serde(rename = "<")] - LessThan, - #[serde(rename = "=")] - Equal, - #[serde(rename = "!=")] - NotEqual, - #[serde(rename = ">=")] - GreaterThanOrEqual, - #[serde(rename = "<=")] - LessThanOrEqual, -} - -impl Display for AlertOperator { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - AlertOperator::GreaterThan => write!(f, ">"), - AlertOperator::LessThan => write!(f, "<"), - AlertOperator::Equal => write!(f, "="), - AlertOperator::NotEqual => write!(f, "!="), - AlertOperator::GreaterThanOrEqual => write!(f, ">="), - AlertOperator::LessThanOrEqual => write!(f, "<="), - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr, PartialEq, Eq)] -#[serde(rename_all = "camelCase")] -pub enum WhereConfigOperator { - #[serde(rename = "=")] - Equal, - #[serde(rename = "!=")] - NotEqual, - #[serde(rename = "<")] - LessThan, - #[serde(rename = ">")] - GreaterThan, - #[serde(rename = "<=")] - LessThanOrEqual, - #[serde(rename = ">=")] - GreaterThanOrEqual, - #[serde(rename = "is null")] - IsNull, - #[serde(rename = "is not null")] - IsNotNull, - #[serde(rename = "ilike")] - ILike, - #[serde(rename = "contains")] - Contains, - #[serde(rename = "begins with")] - BeginsWith, - #[serde(rename = "ends with")] - EndsWith, - #[serde(rename = "does not contain")] - DoesNotContain, - #[serde(rename = "does not begin with")] - DoesNotBeginWith, - #[serde(rename = "does not end with")] - DoesNotEndWith, -} - -impl WhereConfigOperator { - /// Convert the enum value to its string representation - pub fn as_str(&self) -> &'static str { - match self { - Self::Equal => "=", - Self::NotEqual => "!=", - Self::LessThan => "<", - Self::GreaterThan => ">", - Self::LessThanOrEqual => "<=", - Self::GreaterThanOrEqual => ">=", - Self::IsNull => "is null", - Self::IsNotNull => "is not null", - Self::ILike => "ilike", - Self::Contains => "contains", - Self::BeginsWith => "begins with", - Self::EndsWith => "ends with", - Self::DoesNotContain => "does not contain", - Self::DoesNotBeginWith => "does not begin with", - Self::DoesNotEndWith => "does not end with", - } - } -} - -impl Display for WhereConfigOperator { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // We can reuse our as_str method to get the string representation - write!(f, "{}", self.as_str()) - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub enum AggregateFunction { - Avg, - Count, - CountDistinct, - Min, - Max, - Sum, -} - -impl Display for AggregateFunction { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - AggregateFunction::Avg => write!(f, "Avg"), - AggregateFunction::Count => write!(f, "Count"), - AggregateFunction::CountDistinct => write!(f, "CountDistinct"), - AggregateFunction::Min => write!(f, "Min"), - AggregateFunction::Max => write!(f, "Max"), - AggregateFunction::Sum => write!(f, "Sum"), - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -pub struct OperationConfig { - pub column: String, - pub operator: Option, - pub value: Option, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct FilterConfig { - pub conditions: Vec, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -pub struct ConditionConfig { - pub column: String, - pub operator: WhereConfigOperator, - pub value: Option, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Conditions { - pub operator: Option, - pub condition_config: Vec, -} - -impl Conditions { - pub fn generate_filter_message(&self) -> String { - match &self.operator { - Some(op) => match op { - LogicalOperator::And | LogicalOperator::Or => { - let expr1 = &self.condition_config[0]; - let expr2 = &self.condition_config[1]; - let expr1_msg = if expr1.value.as_ref().is_some_and(|v| !v.is_empty()) { - format!( - "{} {} {}", - expr1.column, - expr1.operator, - expr1.value.as_ref().unwrap() - ) - } else { - format!("{} {}", expr1.column, expr1.operator) - }; - - let expr2_msg = if expr2.value.as_ref().is_some_and(|v| !v.is_empty()) { - format!( - "{} {} {}", - expr2.column, - expr2.operator, - expr2.value.as_ref().unwrap() - ) - } else { - format!("{} {}", expr2.column, expr2.operator) - }; - - format!("[{expr1_msg} {op} {expr2_msg}]") - } - }, - None => { - let expr = &self.condition_config[0]; - if let Some(val) = &expr.value { - format!("{} {} {}", expr.column, expr.operator, val) - } else { - format!("{} {}", expr.column, expr.operator) - } - } - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct GroupBy { - pub columns: Vec, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct ThresholdConfig { - pub operator: AlertOperator, - pub value: f64, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct RollingWindow { - // x minutes (25m) - pub eval_start: String, - // should always be "now" - pub eval_end: String, - // x minutes (5m) - pub eval_frequency: u64, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub enum EvalConfig { - RollingWindow(RollingWindow), -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct AlertEval {} - -#[derive( - Debug, - serde::Serialize, - serde::Deserialize, - Clone, - Copy, - PartialEq, - Eq, - PartialOrd, - Ord, - Default, - FromStr, -)] -#[serde(rename_all = "camelCase")] -pub enum AlertState { - Triggered, - Silenced, - #[default] - Resolved, -} - -impl Display for AlertState { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - AlertState::Triggered => write!(f, "Triggered"), - AlertState::Silenced => write!(f, "Silenced"), - AlertState::Resolved => write!(f, "Resolved"), - } - } -} - -#[derive( - Debug, - serde::Serialize, - serde::Deserialize, - Clone, - Copy, - PartialEq, - Eq, - PartialOrd, - Ord, - Default, - FromStr, -)] -#[serde(rename_all = "camelCase")] -pub enum Severity { - Critical, - High, - #[default] - Medium, - Low, -} - -impl Display for Severity { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Severity::Critical => write!(f, "Critical"), - Severity::High => write!(f, "High"), - Severity::Medium => write!(f, "Medium"), - Severity::Low => write!(f, "Low"), - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub enum LogicalOperator { - And, - Or, -} - -impl Display for LogicalOperator { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - LogicalOperator::And => write!(f, "AND"), - LogicalOperator::Or => write!(f, "OR"), - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct AlertRequest { - #[serde(default = "Severity::default")] - pub severity: Severity, - pub title: String, - pub query: String, - pub alert_type: AlertType, - pub threshold_config: ThresholdConfig, - pub eval_config: EvalConfig, - pub targets: Vec, - pub tags: Option>, -} - -impl AlertRequest { - pub async fn into(self) -> Result { - // Validate that all target IDs exist - for id in &self.targets { - TARGETS.get_target_by_id(id).await?; - } - let datasets = resolve_stream_names(&self.query)?; - let config = AlertConfig { - version: AlertVersion::from(CURRENT_ALERTS_VERSION), - id: Ulid::new(), - severity: self.severity, - title: self.title, - query: self.query, - datasets, - alert_type: self.alert_type, - threshold_config: self.threshold_config, - eval_config: self.eval_config, - targets: self.targets, - state: AlertState::default(), - created: Utc::now(), - tags: self.tags, - }; - Ok(config) - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct AlertConfig { - pub version: AlertVersion, - #[serde(default)] - pub id: Ulid, - pub severity: Severity, - pub title: String, - pub query: String, - pub datasets: Vec, - pub alert_type: AlertType, - pub threshold_config: ThresholdConfig, - pub eval_config: EvalConfig, - pub targets: Vec, - // for new alerts, state should be resolved - #[serde(default)] - pub state: AlertState, - pub created: DateTime, - pub tags: Option>, -} - impl AlertConfig { /// Migration function to convert v1 alerts to v2 structure pub async fn migrate_from_v1( @@ -639,6 +129,7 @@ impl AlertConfig { eval_config, targets, state, + notification_state: NotificationState::Notify, created: Utc::now(), tags: None, }; @@ -1068,9 +559,9 @@ impl AlertConfig { let state_str = alert_json["state"].as_str().unwrap_or("resolved"); match state_str.to_lowercase().as_str() { "triggered" => AlertState::Triggered, - "silenced" => AlertState::Silenced, - "resolved" => AlertState::Resolved, - _ => AlertState::Resolved, + // "silenced" => AlertState::Silenced, + "resolved" => AlertState::NotTriggered, + _ => AlertState::NotTriggered, } } @@ -1180,7 +671,8 @@ impl AlertConfig { self.id, self.title.clone(), self.state, - self.severity.to_string(), + alert_enums::NotificationState::Notify, + self.severity.clone().to_string(), ), DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode), String::default(), @@ -1332,12 +824,14 @@ pub enum AlertError { ParserError(#[from] ParserError), #[error("Invalid alert query")] InvalidAlertQuery, - #[error("Invalid query parameter: {0}")] - InvalidQueryParameter(String), + #[error("Invalid query parameter")] + InvalidQueryParameter, #[error("{0}")] ArrowError(#[from] ArrowError), #[error("Upgrade to Parseable Enterprise for {0} type alerts")] NotPresentInOSS(String), + #[error("{0}")] + Unimplemented(String), } impl actix_web::ResponseError for AlertError { @@ -1360,8 +854,9 @@ impl actix_web::ResponseError for AlertError { Self::TargetInUse => StatusCode::CONFLICT, Self::ParserError(_) => StatusCode::BAD_REQUEST, Self::InvalidAlertQuery => StatusCode::BAD_REQUEST, - Self::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST, + Self::InvalidQueryParameter => StatusCode::BAD_REQUEST, Self::ArrowError(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Unimplemented(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::NotPresentInOSS(_) => StatusCode::BAD_REQUEST, } } @@ -1453,7 +948,12 @@ impl AlertManagerTrait for Alerts { } }; - // Create alert task + // Create alert task iff alert's state is not paused + if alert.get_state().eq(&AlertState::Paused) { + map.insert(*alert.get_id(), alert); + return Ok(()); + } + match self.sender.send(AlertTask::Create(alert.clone_box())).await { Ok(_) => {} Err(e) => { @@ -1555,12 +1055,59 @@ impl AlertManagerTrait for Alerts { ))); }; - let current_state = alert.get_state(); - - if current_state.ne(&new_state) { - alert.update_state(false, new_state, trigger_notif).await?; - write_access.insert(*alert.get_id(), alert.clone_box()); + // if new state is Paused then ensure that the task is removed from list + if new_state.eq(&AlertState::Paused) { + self.sender + .send(AlertTask::Delete(alert_id)) + .await + .map_err(|e| AlertError::CustomError(e.to_string()))?; } + // user has resumed evals for this alert + else if alert.get_state().eq(&AlertState::Paused) + && new_state.eq(&AlertState::NotTriggered) + { + self.sender + .send(AlertTask::Create(alert.clone_box())) + .await + .map_err(|e| AlertError::CustomError(e.to_string()))?; + } + alert.update_state(new_state, trigger_notif).await?; + write_access.insert(*alert.get_id(), alert.clone_box()); + Ok(()) + } + + /// Update the notification state of alert + async fn update_notification_state( + &self, + alert_id: Ulid, + new_notification_state: NotificationState, + ) -> Result<(), AlertError> { + // let store = PARSEABLE.storage.get_object_store(); + + // read and modify alert + let mut write_access = self.alerts.write().await; + let mut alert: Box = if let Some(alert) = write_access.get(&alert_id) { + match &alert.get_alert_type() { + AlertType::Threshold => { + Box::new(ThresholdAlert::from(alert.to_alert_config())) as Box + } + AlertType::Anomaly => { + return Err(AlertError::NotPresentInOSS("anomaly".into())); + } + AlertType::Forecast => { + return Err(AlertError::NotPresentInOSS("forecast".into())); + } + } + } else { + return Err(AlertError::CustomError(format!( + "No alert found for the given ID- {alert_id}" + ))); + }; + + alert + .update_notification_state(new_notification_state) + .await?; + write_access.insert(*alert.get_id(), alert.clone_box()); Ok(()) } @@ -1626,27 +1173,6 @@ impl AlertManagerTrait for Alerts { } } -#[derive(Debug, Serialize)] -pub struct AlertsSummary { - total: u64, - triggered: AlertsInfoByState, - silenced: AlertsInfoByState, - resolved: AlertsInfoByState, -} - -#[derive(Debug, Serialize)] -pub struct AlertsInfoByState { - total: u64, - alert_info: Vec, -} - -#[derive(Debug, Serialize)] -pub struct AlertsInfo { - title: String, - id: Ulid, - severity: Severity, -} - // TODO: add RBAC pub async fn get_alerts_summary() -> Result { let guard = ALERTS.read().await; @@ -1660,9 +1186,9 @@ pub async fn get_alerts_summary() -> Result { let mut triggered = 0; let mut resolved = 0; - let mut silenced = 0; + let mut paused = 0; let mut triggered_alerts: Vec = Vec::new(); - let mut silenced_alerts: Vec = Vec::new(); + let mut paused_alerts: Vec = Vec::new(); let mut resolved_alerts: Vec = Vec::new(); // find total alerts for each state @@ -1674,23 +1200,23 @@ pub async fn get_alerts_summary() -> Result { triggered_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: *alert.get_severity(), + severity: alert.get_severity().clone(), }); } - AlertState::Silenced => { - silenced += 1; - silenced_alerts.push(AlertsInfo { + AlertState::Paused => { + paused += 1; + paused_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: *alert.get_severity(), + severity: alert.get_severity().clone(), }); } - AlertState::Resolved => { + AlertState::NotTriggered => { resolved += 1; resolved_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: *alert.get_severity(), + severity: alert.get_severity().clone(), }); } } @@ -1700,8 +1226,8 @@ pub async fn get_alerts_summary() -> Result { triggered_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity)); triggered_alerts.truncate(5); - silenced_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity)); - silenced_alerts.truncate(5); + paused_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity)); + paused_alerts.truncate(5); resolved_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity)); resolved_alerts.truncate(5); @@ -1712,9 +1238,9 @@ pub async fn get_alerts_summary() -> Result { total: triggered, alert_info: triggered_alerts, }, - silenced: AlertsInfoByState { - total: silenced, - alert_info: silenced_alerts, + paused: AlertsInfoByState { + total: paused, + alert_info: paused_alerts, }, resolved: AlertsInfoByState { total: resolved, diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 12b7ded54..47026ecf4 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -36,11 +36,15 @@ use tracing::{error, trace, warn}; use ulid::Ulid; use url::Url; -use crate::{alerts::AlertError, parseable::PARSEABLE, storage::object_storage::target_json_path}; +use crate::{ + alerts::{AlertError, alert_traits::CallableTarget}, + parseable::PARSEABLE, + storage::object_storage::target_json_path, +}; use super::ALERTS; -use super::{AlertState, CallableTarget, Context}; +use super::{AlertState, Context}; pub static TARGETS: Lazy = Lazy::new(|| TargetConfigs { target_configs: RwLock::new(HashMap::new()), @@ -236,7 +240,7 @@ impl Target { // call once and then start sleeping // reduce repeats by 1 call_target(self.target.clone(), context.clone()); - trace!("state not timed out- {state:?}"); + // trace!("state not timed out- {state:?}"); // set state state.timed_out = true; state.awaiting_resolve = true; @@ -244,7 +248,7 @@ impl Target { self.spawn_timeout_task(timeout, context.clone()); } } - alert_state @ (AlertState::Resolved | AlertState::Silenced) => { + alert_state @ AlertState::NotTriggered => { state.alert_state = alert_state; if state.timed_out { // if in timeout and resolve came in, only process if it's the first one ( awaiting resolve ) @@ -258,6 +262,9 @@ impl Target { call_target(self.target.clone(), context); } + // do not send out any notifs + // (an eval should not have run!) + AlertState::Paused => {} } } @@ -447,11 +454,11 @@ impl CallableTarget for SlackWebHook { AlertState::Triggered => { serde_json::json!({ "text": payload.default_alert_string() }) } - AlertState::Resolved => { + AlertState::NotTriggered => { serde_json::json!({ "text": payload.default_resolved_string() }) } - AlertState::Silenced => { - serde_json::json!({ "text": payload.default_silenced_string() }) + AlertState::Paused => { + serde_json::json!({ "text": payload.default_paused_string() }) } }; @@ -485,8 +492,8 @@ impl CallableTarget for OtherWebHook { let alert = match payload.alert_info.alert_state { AlertState::Triggered => payload.default_alert_string(), - AlertState::Resolved => payload.default_resolved_string(), - AlertState::Silenced => payload.default_silenced_string(), + AlertState::NotTriggered => payload.default_resolved_string(), + AlertState::Paused => payload.default_paused_string(), }; let request = client @@ -562,7 +569,7 @@ impl CallableTarget for AlertManager { // fill in status label accordingly match payload.alert_info.alert_state { AlertState::Triggered => alert["labels"]["status"] = "triggered".into(), - AlertState::Resolved => { + AlertState::NotTriggered => { alert["labels"]["status"] = "resolved".into(); alert["annotations"]["reason"] = serde_json::Value::String(payload.default_resolved_string()); @@ -570,14 +577,14 @@ impl CallableTarget for AlertManager { .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) .into(); } - AlertState::Silenced => { - alert["labels"]["status"] = "silenced".into(); - alert["annotations"]["reason"] = - serde_json::Value::String(payload.default_silenced_string()); - // alert["endsAt"] = Utc::now() - // .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) - // .into(); - } + AlertState::Paused => alert["labels"]["status"] = "paused".into(), // AlertState::Silenced => { + // alert["labels"]["status"] = "silenced".into(); + // alert["annotations"]["reason"] = + // serde_json::Value::String(payload.default_silenced_string()); + // // alert["endsAt"] = Utc::now() + // // .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) + // // .into(); + // } }; if let Err(e) = client diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index c86b2041c..61d0d92ff 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -16,13 +16,19 @@ * */ -use std::{collections::HashMap, str::FromStr}; +use std::collections::HashMap; use crate::{ - alerts::{AlertType, alert_types::ThresholdAlert, traits::AlertTrait}, + alerts::{ + ALERTS, AlertError, AlertState, + alert_enums::{AlertType, NotificationState}, + alert_structs::{AlertConfig, AlertRequest}, + alert_traits::AlertTrait, + alert_types::ThresholdAlert, + }, parseable::PARSEABLE, storage::object_storage::alert_json_path, - utils::{actix::extract_session_key_from_req, user_auth_for_query}, + utils::{actix::extract_session_key_from_req, time::TimeRange, user_auth_for_query}, }; use actix_web::{ HttpRequest, Responder, @@ -31,28 +37,15 @@ use actix_web::{ use bytes::Bytes; use ulid::Ulid; -use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState, Severity}; - // GET /alerts /// User needs at least a read access to the stream(s) that is being referenced in an alert /// Read all alerts then return alerts which satisfy the condition -/// Supports pagination with optional query parameters: -/// - tags: comma-separated list of tags to filter alerts -/// - offset: number of alerts to skip (default: 0) -/// - limit: maximum number of alerts to return (default: 100, max: 1000) pub async fn list(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req)?; let query_map = web::Query::>::from_query(req.query_string()) - .map_err(|_| AlertError::InvalidQueryParameter("malformed query parameters".to_string()))?; - + .map_err(|_| AlertError::InvalidQueryParameter)?; let mut tags_list = Vec::new(); - let mut offset = 0usize; - let mut limit = 100usize; // Default limit - const MAX_LIMIT: usize = 1000; // Maximum allowed limit - - // Parse query parameters if !query_map.is_empty() { - // Parse tags parameter if let Some(tags) = query_map.get("tags") { tags_list = tags .split(',') @@ -60,34 +53,10 @@ pub async fn list(req: HttpRequest) -> Result { .filter(|s| !s.is_empty()) .collect(); if tags_list.is_empty() { - return Err(AlertError::InvalidQueryParameter( - "empty tags not allowed with query param tags".to_string(), - )); - } - } - - // Parse offset parameter - if let Some(offset_str) = query_map.get("offset") { - offset = offset_str.parse().map_err(|_| { - AlertError::InvalidQueryParameter("offset is not a valid number".to_string()) - })?; - } - - // Parse limit parameter - if let Some(limit_str) = query_map.get("limit") { - limit = limit_str.parse().map_err(|_| { - AlertError::InvalidQueryParameter("limit is not a valid number".to_string()) - })?; - - // Validate limit bounds - if limit == 0 || limit > MAX_LIMIT { - return Err(AlertError::InvalidQueryParameter( - "limit should be between 1 and 1000".to_string(), - )); + return Err(AlertError::InvalidQueryParameter); } } } - let guard = ALERTS.read().await; let alerts = if let Some(alerts) = guard.as_ref() { alerts @@ -96,51 +65,11 @@ pub async fn list(req: HttpRequest) -> Result { }; let alerts = alerts.list_alerts_for_user(session_key, tags_list).await?; - let mut alerts_summary = alerts + let alerts_summary = alerts .iter() .map(|alert| alert.to_summary()) .collect::>(); - - // Sort by state priority (Triggered > Silenced > Resolved) then by severity (Critical > High > Medium > Low) - alerts_summary.sort_by(|a, b| { - // Parse state and severity from JSON values back to enums - let state_a = a - .get("state") - .and_then(|v| v.as_str()) - .and_then(|s| s.parse::().ok()) - .unwrap_or(AlertState::Resolved); // Default to lowest priority - - let state_b = b - .get("state") - .and_then(|v| v.as_str()) - .and_then(|s| s.parse::().ok()) - .unwrap_or(AlertState::Resolved); - - let severity_a = a - .get("severity") - .and_then(|v| v.as_str()) - .and_then(|s| s.parse::().ok()) - .unwrap_or(Severity::Low); // Default to lowest priority - - let severity_b = b - .get("severity") - .and_then(|v| v.as_str()) - .and_then(|s| s.parse::().ok()) - .unwrap_or(Severity::Low); - - // First sort by state, then by severity - state_a - .cmp(&state_b) - .then_with(|| severity_a.cmp(&severity_b)) - }); - - let paginated_alerts = alerts_summary - .into_iter() - .skip(offset) - .take(limit) - .collect::>(); - - Ok(web::Json(paginated_alerts)) + Ok(web::Json(alerts_summary)) } // POST /alerts @@ -251,13 +180,27 @@ pub async fn delete(req: HttpRequest, alert_id: Path) -> Result, + Json(mut new_notification_state): Json, ) -> Result { let session_key = extract_session_key_from_req(&req)?; let alert_id = alert_id.into_inner(); + // validate notif state + match &mut new_notification_state { + NotificationState::Notify => {} + NotificationState::Snoozed(till_time) => { + let time_range = TimeRange::parse_human_time(till_time, "now") + .map_err(|e| AlertError::CustomError(format!("Invalid time value passed- {e}")))?; + + // using the difference between start and end times, calculate the new end time + let delta = time_range.end - time_range.start; + *till_time = (time_range.end + delta).to_rfc3339(); + } + } + let guard = ALERTS.write().await; let alerts = if let Some(alerts) = guard.as_ref() { alerts @@ -266,31 +209,74 @@ pub async fn update_state( }; // check if alert id exists in map - let mut alert = alerts.get_alert_by_id(alert_id).await?; + let alert = alerts.get_alert_by_id(alert_id).await?; // validate that the user has access to the tables mentioned in the query user_auth_for_query(&session_key, alert.get_query()).await?; - let query_string = req.query_string(); + alerts + .update_notification_state(alert_id, new_notification_state) + .await?; + let alert = alerts.get_alert_by_id(alert_id).await?; - if query_string.is_empty() { - return Err(AlertError::InvalidStateChange( - "No query string provided".to_string(), - )); - } + Ok(web::Json(alert.to_alert_config())) +} - let tokens = query_string.split('=').collect::>(); - let state_key = tokens[0]; - let state_value = tokens[1]; - if state_key != "state" { - return Err(AlertError::InvalidStateChange( - "Invalid query parameter".to_string(), - )); - } +// PUT /alerts/{alert_id}/pause +/// first save on disk, then in memory +/// then modify scheduled task +pub async fn pause_alert( + req: HttpRequest, + alert_id: Path, +) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = alert_id.into_inner(); + + let guard = ALERTS.write().await; + let alerts = if let Some(alerts) = guard.as_ref() { + alerts + } else { + return Err(AlertError::CustomError("No AlertManager set".into())); + }; - let new_state = AlertState::from_str(state_value)?; - alert.update_state(true, new_state, Some("".into())).await?; + // check if alert id exists in map + let alert = alerts.get_alert_by_id(alert_id).await?; + // validate that the user has access to the tables mentioned in the query + user_auth_for_query(&session_key, alert.get_query()).await?; + + alerts + .update_state(alert_id, AlertState::Paused, Some("".into())) + .await?; + let alert = alerts.get_alert_by_id(alert_id).await?; - alerts.update(&*alert).await; + Ok(web::Json(alert.to_alert_config())) +} + +// PUT /alerts/{alert_id}/resume +/// first save on disk, then in memory +/// then modify scheduled task +pub async fn resume_alert( + req: HttpRequest, + alert_id: Path, +) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = alert_id.into_inner(); + + let guard = ALERTS.write().await; + let alerts = if let Some(alerts) = guard.as_ref() { + alerts + } else { + return Err(AlertError::CustomError("No AlertManager set".into())); + }; + + // check if alert id exists in map + let alert = alerts.get_alert_by_id(alert_id).await?; + // validate that the user has access to the tables mentioned in the query + user_auth_for_query(&session_key, alert.get_query()).await?; + + alerts + .update_state(alert_id, AlertState::NotTriggered, Some("".into())) + .await?; + let alert = alerts.get_alert_by_id(alert_id).await?; Ok(web::Json(alert.to_alert_config())) } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 3e8170e41..b11e3897f 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -267,7 +267,7 @@ impl Server { .route(web::get().to(alerts::get).authorize(Action::GetAlert)) .route( web::put() - .to(alerts::update_state) + .to(alerts::update_notification_state) .authorize(Action::PutAlert), ) .route( @@ -276,6 +276,20 @@ impl Server { .authorize(Action::DeleteAlert), ), ) + .service( + web::resource("/{alert_id}/pause").route( + web::put() + .to(alerts::pause_alert) + .authorize(Action::PutAlert), + ), + ) + .service( + web::resource("/{alert_id}/resume").route( + web::put() + .to(alerts::resume_alert) + .authorize(Action::PutAlert), + ), + ) } pub fn get_targets_webscope() -> Scope { diff --git a/src/handlers/http/targets.rs b/src/handlers/http/targets.rs index 41cad80c7..3c5958a56 100644 --- a/src/handlers/http/targets.rs +++ b/src/handlers/http/targets.rs @@ -19,7 +19,8 @@ pub async fn post( // add to the map TARGETS.update(target.clone()).await?; - Ok(web::Json(target.mask())) + // Ok(web::Json(target.mask())) + Ok(web::Json(target)) } // GET /targets @@ -29,7 +30,7 @@ pub async fn list(_req: HttpRequest) -> Result { .list() .await? .into_iter() - .map(|t| t.mask()) + // .map(|t| t.mask()) .collect_vec(); Ok(web::Json(list)) @@ -41,7 +42,8 @@ pub async fn get(_req: HttpRequest, target_id: Path) -> Result RelativePathBuf { #[inline(always)] pub fn manifest_path(prefix: &str) -> RelativePathBuf { - let hostname = hostname::get() - .unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string())) - .into_string() - .unwrap_or_else(|_| Ulid::new().to_string()) - .matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_') - .collect::(); - - if PARSEABLE.options.mode == Mode::Ingest { - let id = INGESTOR_META - .get() - .unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT)) - .get_node_id(); - - let manifest_file_name = format!("ingestor.{hostname}.{id}.{MANIFEST_FILE}"); - RelativePathBuf::from_iter([prefix, &manifest_file_name]) - } else { - let manifest_file_name = format!("{hostname}.{MANIFEST_FILE}"); - RelativePathBuf::from_iter([prefix, &manifest_file_name]) + match &PARSEABLE.options.mode { + Mode::Ingest => { + let id = INGESTOR_META + .get() + .unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT)) + .get_node_id(); + let manifest_file_name = format!("ingestor.{id}.{MANIFEST_FILE}"); + RelativePathBuf::from_iter([prefix, &manifest_file_name]) + } + _ => RelativePathBuf::from_iter([prefix, MANIFEST_FILE]), } } diff --git a/src/sync.rs b/src/sync.rs index bfc6ba88d..382165d9c 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -27,7 +27,8 @@ use tokio::time::{Duration, Instant, interval_at, sleep}; use tokio::{select, task}; use tracing::{error, info, trace, warn}; -use crate::alerts::{AlertTask, alerts_utils}; +use crate::alerts::alert_enums::AlertTask; +use crate::alerts::alerts_utils; use crate::parseable::PARSEABLE; use crate::storage::object_storage::sync_all_streams; use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL}; @@ -308,6 +309,7 @@ pub async fn alert_runtime(mut rx: mpsc::Receiver) -> Result<(), anyh loop { match alerts_utils::evaluate_alert(&*alert).await { Ok(_) => { + warn!(evaluated_alert_id=?alert.get_id()); retry_counter = 0; } Err(err) => { From a331597fe91a1d1227bf0d81c088696a569e7107 Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 4 Aug 2025 16:28:19 +0530 Subject: [PATCH 2/5] updates: - separated `NotificationConfig` from target config --- src/alerts/alert_enums.rs | 18 ++++++++++++++++++ src/alerts/alert_structs.rs | 32 ++++++++++++++++++++++++++++++-- src/alerts/alert_types.rs | 27 +++++++++++++-------------- src/alerts/mod.rs | 28 +++++++++++++--------------- src/alerts/target.rs | 18 ++++++------------ 5 files changed, 80 insertions(+), 43 deletions(-) diff --git a/src/alerts/alert_enums.rs b/src/alerts/alert_enums.rs index f07db3c44..e426067d6 100644 --- a/src/alerts/alert_enums.rs +++ b/src/alerts/alert_enums.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::fmt::{self, Display}; use derive_more::derive::FromStr; diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs index 0695ef45a..21bbae989 100644 --- a/src/alerts/alert_structs.rs +++ b/src/alerts/alert_structs.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::collections::HashMap; use chrono::{DateTime, Utc}; @@ -13,7 +31,7 @@ use crate::{ LogicalOperator, NotificationState, Severity, WhereConfigOperator, }, alert_traits::AlertTrait, - target::TARGETS, + target::{NotificationConfig, TARGETS}, }, query::resolve_stream_names, }; @@ -36,14 +54,21 @@ pub struct Context { pub alert_info: AlertInfo, pub deployment_info: DeploymentInfo, pub message: String, + pub notification_config: NotificationConfig, } impl Context { - pub fn new(alert_info: AlertInfo, deployment_info: DeploymentInfo, message: String) -> Self { + pub fn new( + alert_info: AlertInfo, + deployment_info: DeploymentInfo, + notification_config: NotificationConfig, + message: String, + ) -> Self { Self { alert_info, deployment_info, message, + notification_config, } } @@ -226,6 +251,7 @@ pub struct AlertRequest { pub query: String, pub alert_type: AlertType, pub threshold_config: ThresholdConfig, + pub notification_config: NotificationConfig, pub eval_config: EvalConfig, pub targets: Vec, pub tags: Option>, @@ -251,6 +277,7 @@ impl AlertRequest { targets: self.targets, state: AlertState::default(), notification_state: NotificationState::Notify, + notification_config: self.notification_config, created: Utc::now(), tags: self.tags, }; @@ -276,6 +303,7 @@ pub struct AlertConfig { #[serde(default)] pub state: AlertState, pub notification_state: NotificationState, + pub notification_config: NotificationConfig, pub created: DateTime, pub tags: Option>, } diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs index 18e58ab58..e849cf294 100644 --- a/src/alerts/alert_types.rs +++ b/src/alerts/alert_types.rs @@ -31,7 +31,7 @@ use crate::{ alert_traits::{AlertTrait, MessageCreation}, alerts_utils::{evaluate_condition, execute_alert_query, extract_time_range}, is_query_aggregate, - target::{self, TARGETS}, + target::{self, NotificationConfig}, }, handlers::http::query::create_streams_for_distributed, option::Mode, @@ -58,6 +58,7 @@ pub struct ThresholdAlert { #[serde(default)] pub state: AlertState, pub notification_state: NotificationState, + pub notification_config: NotificationConfig, pub created: DateTime, pub tags: Option>, pub datasets: Vec, @@ -105,19 +106,15 @@ impl AlertTrait for ThresholdAlert { }; // validate that target repeat notifs !> eval_frequency - for target_id in &self.targets { - let target = TARGETS.get_target_by_id(target_id).await?; - match &target.notification_config.times { - target::Retry::Infinite => {} - target::Retry::Finite(repeat) => { - let notif_duration = - Duration::from_secs(60 * target.notification_config.interval) - * *repeat as u32; - if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) { - return Err(AlertError::Metadata( - "evalFrequency should be greater than target repetition interval", - )); - } + match &self.notification_config.times { + target::Retry::Infinite => {} + target::Retry::Finite(repeat) => { + let notif_duration = + Duration::from_secs(60 * self.notification_config.interval) * *repeat as u32; + if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) { + return Err(AlertError::Metadata( + "evalFrequency should be greater than target repetition interval", + )); } } } @@ -329,6 +326,7 @@ impl From for ThresholdAlert { targets: value.targets, state: value.state, notification_state: value.notification_state, + notification_config: value.notification_config, created: value.created, tags: value.tags, datasets: value.datasets, @@ -350,6 +348,7 @@ impl From for AlertConfig { targets: val.targets, state: val.state, notification_state: val.notification_state, + notification_config: val.notification_config, created: val.created, tags: val.tags, datasets: val.datasets, diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 35173ddf0..f700f8ab9 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -53,7 +53,7 @@ pub use crate::alerts::alert_structs::{ }; use crate::alerts::alert_traits::{AlertManagerTrait, AlertTrait}; use crate::alerts::alert_types::ThresholdAlert; -use crate::alerts::target::TARGETS; +use crate::alerts::target::{NotificationConfig, TARGETS}; use crate::handlers::http::fetch_schema; use crate::handlers::http::query::create_streams_for_distributed; use crate::option::Mode; @@ -130,6 +130,7 @@ impl AlertConfig { targets, state, notification_state: NotificationState::Notify, + notification_config: NotificationConfig::default(), created: Utc::now(), tags: None, }; @@ -603,19 +604,15 @@ impl AlertConfig { }; // validate that target repeat notifs !> eval_frequency - for target_id in &self.targets { - let target = TARGETS.get_target_by_id(target_id).await?; - match &target.notification_config.times { - target::Retry::Infinite => {} - target::Retry::Finite(repeat) => { - let notif_duration = - Duration::from_secs(60 * target.notification_config.interval) - * *repeat as u32; - if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) { - return Err(AlertError::Metadata( - "evalFrequency should be greater than target repetition interval", - )); - } + match &self.notification_config.times { + target::Retry::Infinite => {} + target::Retry::Finite(repeat) => { + let notif_duration = + Duration::from_secs(60 * self.notification_config.interval) * *repeat as u32; + if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) { + return Err(AlertError::Metadata( + "evalFrequency should be greater than target repetition interval", + )); } } } @@ -675,6 +672,7 @@ impl AlertConfig { self.severity.clone().to_string(), ), DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode), + self.notification_config.clone(), String::default(), ) } @@ -951,7 +949,7 @@ impl AlertManagerTrait for Alerts { // Create alert task iff alert's state is not paused if alert.get_state().eq(&AlertState::Paused) { map.insert(*alert.get_id(), alert); - return Ok(()); + continue; } match self.sender.send(AlertTask::Create(alert.clone_box())).await { diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 47026ecf4..65b8e6e5e 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -151,7 +151,6 @@ pub struct Target { pub name: String, #[serde(flatten)] pub target: TargetType, - pub notification_config: Timeout, #[serde(default = "Ulid::new")] pub id: Ulid, } @@ -170,7 +169,6 @@ impl Target { "name":self.name, "type":"slack", "endpoint":masked_endpoint, - "notificationConfig":self.notification_config, "id":self.id }) } @@ -187,7 +185,6 @@ impl Target { "endpoint":masked_endpoint, "headers":other_web_hook.headers, "skipTlsCheck":other_web_hook.skip_tls_check, - "notificationConfig":self.notification_config, "id":self.id }) } @@ -207,7 +204,6 @@ impl Target { "username":auth.username, "password":password, "skipTlsCheck":alert_manager.skip_tls_check, - "notificationConfig":self.notification_config, "id":self.id }) } else { @@ -218,7 +214,6 @@ impl Target { "username":Value::Null, "password":Value::Null, "skipTlsCheck":alert_manager.skip_tls_check, - "notificationConfig":self.notification_config, "id":self.id }) } @@ -228,7 +223,7 @@ impl Target { pub fn call(&self, context: Context) { trace!("target.call context- {context:?}"); - let timeout = &self.notification_config; + let timeout = context.notification_config.clone(); let resolves = context.alert_info.alert_state; let mut state = timeout.state.lock().unwrap(); trace!("target.call state- {state:?}"); @@ -245,7 +240,7 @@ impl Target { state.timed_out = true; state.awaiting_resolve = true; drop(state); - self.spawn_timeout_task(timeout, context.clone()); + self.spawn_timeout_task(&timeout, context.clone()); } } alert_state @ AlertState::NotTriggered => { @@ -268,7 +263,7 @@ impl Target { } } - fn spawn_timeout_task(&self, target_timeout: &Timeout, alert_context: Context) { + fn spawn_timeout_task(&self, target_timeout: &NotificationConfig, alert_context: Context) { trace!("repeat-\n{target_timeout:?}"); let state = Arc::clone(&target_timeout.state); let retry = target_timeout.times; @@ -383,7 +378,7 @@ impl TryFrom for Target { type Error = String; fn try_from(value: TargetVerifier) -> Result { - let mut timeout = Timeout::default(); + let mut timeout = NotificationConfig::default(); // Default is Infinite in case of alertmanager if matches!(value.target, TargetType::AlertManager(_)) { @@ -405,7 +400,6 @@ impl TryFrom for Target { Ok(Target { name: value.name, target: value.target, - notification_config: timeout, id: value.id, }) } @@ -599,7 +593,7 @@ impl CallableTarget for AlertManager { } #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -pub struct Timeout { +pub struct NotificationConfig { pub interval: u64, #[serde(default = "Retry::default")] pub times: Retry, @@ -607,7 +601,7 @@ pub struct Timeout { pub state: Arc>, } -impl Default for Timeout { +impl Default for NotificationConfig { fn default() -> Self { Self { interval: 1, From 9e7716f40f15e4431202a065eb866f7a43c55a67 Mon Sep 17 00:00:00 2001 From: parmesant Date: Tue, 5 Aug 2025 11:07:26 +0530 Subject: [PATCH 3/5] Update sync.rs Signed-off-by: parmesant --- src/sync.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sync.rs b/src/sync.rs index 382165d9c..f5ffdb998 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -309,7 +309,6 @@ pub async fn alert_runtime(mut rx: mpsc::Receiver) -> Result<(), anyh loop { match alerts_utils::evaluate_alert(&*alert).await { Ok(_) => { - warn!(evaluated_alert_id=?alert.get_id()); retry_counter = 0; } Err(err) => { From f6bfdf812f3d415e759aa033332419f3a56247af Mon Sep 17 00:00:00 2001 From: parmesant Date: Tue, 5 Aug 2025 14:24:42 +0530 Subject: [PATCH 4/5] Update alert_structs.rs add serde default Signed-off-by: parmesant --- src/alerts/alert_structs.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs index 21bbae989..f248ee276 100644 --- a/src/alerts/alert_structs.rs +++ b/src/alerts/alert_structs.rs @@ -251,6 +251,7 @@ pub struct AlertRequest { pub query: String, pub alert_type: AlertType, pub threshold_config: ThresholdConfig, + #[serde(default)] pub notification_config: NotificationConfig, pub eval_config: EvalConfig, pub targets: Vec, From 15d8fa87a1512b8f441b56f3ce50ca3821a605b2 Mon Sep 17 00:00:00 2001 From: anant Date: Fri, 8 Aug 2025 17:13:04 +0530 Subject: [PATCH 5/5] Multiple changes Added endpoints- - PATCH /{alertid}/enable -> enable a disabled alert - PATCH /{alertid}/disable -> disable an alert - PATCH /{alertid}/update_notification_state -> update the notification state (notify, mute) - PUT /{alertid}/evaluate_alert -> run manual eval for an alert - PUT /{alertid} -> modify an alert --- src/about.rs | 8 +- src/alerts/alert_enums.rs | 36 ++- src/alerts/alert_structs.rs | 58 +++- src/alerts/alert_traits.rs | 7 +- src/alerts/alert_types.rs | 140 +++++++--- src/alerts/alerts_utils.rs | 231 ++++++++++++---- src/alerts/mod.rs | 261 +++++++++++------- src/alerts/target.rs | 25 +- src/catalog/manifest.rs | 9 +- src/catalog/mod.rs | 44 +-- src/event/format/json.rs | 6 +- src/handlers/http/alerts.rs | 191 +++++++++++-- src/handlers/http/audit.rs | 8 +- src/handlers/http/cluster/mod.rs | 58 ++-- src/handlers/http/logstream.rs | 8 +- .../http/modal/query/querier_logstream.rs | 8 +- src/handlers/http/modal/server.rs | 26 +- src/handlers/http/modal/ssl_acceptor.rs | 20 +- src/handlers/http/modal/utils/ingest_utils.rs | 45 +-- src/handlers/http/query.rs | 2 +- src/handlers/livetail.rs | 2 +- src/hottier.rs | 9 +- src/livetail.rs | 8 +- src/metadata.rs | 36 ++- src/migration/metadata_migration.rs | 24 +- src/otel/otel_utils.rs | 8 +- src/parseable/streams.rs | 50 ++-- src/query/mod.rs | 8 +- src/query/stream_schema_provider.rs | 28 +- src/stats.rs | 14 +- src/storage/object_storage.rs | 87 +++--- 31 files changed, 980 insertions(+), 485 deletions(-) diff --git a/src/about.rs b/src/about.rs index f2f4593b5..e2471c962 100644 --- a/src/about.rs +++ b/src/about.rs @@ -112,10 +112,10 @@ pub fn print_about( current_version, ); // " " " " - if let Some(latest_release) = latest_release { - if latest_release.version > current_version { - print_latest_release(latest_release); - } + if let Some(latest_release) = latest_release + && latest_release.version > current_version + { + print_latest_release(latest_release); } eprintln!( diff --git a/src/alerts/alert_enums.rs b/src/alerts/alert_enums.rs index e426067d6..10cf487e2 100644 --- a/src/alerts/alert_enums.rs +++ b/src/alerts/alert_enums.rs @@ -16,12 +16,20 @@ * */ -use std::fmt::{self, Display}; +use std::{ + fmt::{self, Display}, + str::FromStr, +}; +use chrono::{DateTime, Utc}; use derive_more::derive::FromStr; +use serde::ser::Error; use ulid::Ulid; -use crate::alerts::{alert_structs::RollingWindow, alert_traits::AlertTrait}; +use crate::alerts::{ + alert_structs::{AnomalyConfig, ForecastConfig, RollingWindow}, + alert_traits::AlertTrait, +}; pub enum AlertTask { Create(Box), @@ -87,16 +95,16 @@ impl Display for LogicalOperator { #[serde(rename_all = "camelCase")] pub enum AlertType { Threshold, - Anomaly, - Forecast, + Anomaly(AnomalyConfig), + Forecast(ForecastConfig), } impl Display for AlertType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { AlertType::Threshold => write!(f, "threshold"), - AlertType::Anomaly => write!(f, "anomaly"), - AlertType::Forecast => write!(f, "forecast"), + AlertType::Anomaly(_) => write!(f, "anomaly"), + AlertType::Forecast(_) => write!(f, "forecast"), } } } @@ -232,14 +240,14 @@ pub enum AlertState { Triggered, #[default] NotTriggered, - Paused, + Disabled, } impl Display for AlertState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { AlertState::Triggered => write!(f, "triggered"), - AlertState::Paused => write!(f, "paused"), + AlertState::Disabled => write!(f, "disabled"), AlertState::NotTriggered => write!(f, "not-triggered"), } } @@ -255,14 +263,22 @@ pub enum NotificationState { /// It is a state which can only be set manually /// /// user needs to pass the timestamp or the duration (in human time) till which the alert is silenced - Snoozed(String), + Mute(String), } impl Display for NotificationState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { NotificationState::Notify => write!(f, "notify"), - NotificationState::Snoozed(end_time) => write!(f, "snoozed till {end_time}"), + NotificationState::Mute(till_time) => { + let till = match till_time.as_str() { + "indefinite" => DateTime::::MAX_UTC.to_rfc3339(), + _ => DateTime::::from_str(till_time) + .map_err(|e| std::fmt::Error::custom(e.to_string()))? + .to_rfc3339(), + }; + write!(f, "{till}") + } } } } diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs index f248ee276..b734d3150 100644 --- a/src/alerts/alert_structs.rs +++ b/src/alerts/alert_structs.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use tokio::sync::{RwLock, mpsc}; use ulid::Ulid; @@ -86,9 +86,9 @@ impl Context { format!("{} is now `not-triggered` ", self.alert_info.alert_name) } - pub(crate) fn default_paused_string(&self) -> String { + pub(crate) fn default_disabled_string(&self) -> String { format!( - "{} is now `paused`. No more evals will be run till it is `paused`.", + "{} is now `disabled`. No more evals will be run till it is `disabled`.", self.alert_info.alert_name ) } @@ -242,6 +242,16 @@ pub struct RollingWindow { pub eval_frequency: u64, } +impl Default for RollingWindow { + fn default() -> Self { + Self { + eval_start: "10m".into(), + eval_end: "now".into(), + eval_frequency: 10, + } + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct AlertRequest { @@ -329,3 +339,45 @@ pub struct AlertsInfo { pub id: Ulid, pub severity: Severity, } + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct ForecastConfig { + pub historic_duration: String, + pub forecast_duration: String, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct AnomalyConfig { + pub historic_duration: String, +} + +/// Result structure for alert query execution with group support +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AlertQueryResult { + /// List of group results, each containing group values and the aggregate value + pub groups: Vec, + /// True if this is a simple query without GROUP BY (single group with empty group_values) + pub is_simple_query: bool, +} + +/// Result for a single group in a GROUP BY query +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GroupResult { + /// The group-by column values (empty for non-GROUP BY queries) + pub group_values: HashMap, + /// The aggregate function value for this group + pub aggregate_value: f64, +} + +impl AlertQueryResult { + /// Get the single aggregate value for simple queries (backward compatibility) + pub fn get_single_value(&self) -> f64 { + if self.is_simple_query && !self.groups.is_empty() { + self.groups[0].aggregate_value + } else { + 0.0 + } + } +} diff --git a/src/alerts/alert_traits.rs b/src/alerts/alert_traits.rs index ec019b10a..aa1133f82 100644 --- a/src/alerts/alert_traits.rs +++ b/src/alerts/alert_traits.rs @@ -65,16 +65,15 @@ pub trait AlertTrait: Debug + Send + Sync { fn get_alert_type(&self) -> &AlertType; fn get_threshold_config(&self) -> &ThresholdConfig; fn get_eval_config(&self) -> &EvalConfig; - fn get_targets(&self) -> &Vec; + fn get_targets(&self) -> &[Ulid]; fn get_state(&self) -> &AlertState; - fn get_eval_window(&self) -> String; + fn get_eval_window(&self) -> &str; fn get_eval_frequency(&self) -> u64; fn get_created(&self) -> String; fn get_tags(&self) -> &Option>; - fn get_datasets(&self) -> &Vec; + fn get_datasets(&self) -> &[String]; fn to_alert_config(&self) -> AlertConfig; fn clone_box(&self) -> Box; - // fn get_alert_message(&self) -> Result; } #[async_trait] diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs index e849cf294..439c4c5c9 100644 --- a/src/alerts/alert_types.rs +++ b/src/alerts/alert_types.rs @@ -28,13 +28,13 @@ use crate::{ AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity, ThresholdConfig, alert_enums::NotificationState, + alert_structs::GroupResult, alert_traits::{AlertTrait, MessageCreation}, alerts_utils::{evaluate_condition, execute_alert_query, extract_time_range}, - is_query_aggregate, + get_number_of_agg_exprs, target::{self, NotificationConfig}, }, handlers::http::query::create_streams_for_distributed, - option::Mode, parseable::PARSEABLE, query::resolve_stream_names, rbac::map::SessionKey, @@ -68,31 +68,49 @@ pub struct ThresholdAlert { impl AlertTrait for ThresholdAlert { async fn eval_alert(&self) -> Result, AlertError> { let time_range = extract_time_range(&self.eval_config)?; - let final_value = execute_alert_query(self.get_query(), &time_range).await?; - let result = evaluate_condition( - &self.threshold_config.operator, - final_value, - self.threshold_config.value, - ); + let query_result = execute_alert_query(self.get_query(), &time_range).await?; + + if query_result.is_simple_query { + // Handle simple queries + let final_value = query_result.get_single_value(); + let result = evaluate_condition( + &self.threshold_config.operator, + final_value, + self.threshold_config.value, + ); - let message = if result { - // generate message - Some(self.create_threshold_message(final_value)?) + let message = if result { + Some(self.create_threshold_message(final_value)?) + } else { + None + }; + Ok(message) } else { - None - }; - Ok(message) - } + // Handle GROUP BY queries - evaluate each group + let mut breached_groups = Vec::new(); + + for group in &query_result.groups { + let result = evaluate_condition( + &self.threshold_config.operator, + group.aggregate_value, + self.threshold_config.value, + ); + + if result { + breached_groups.push(group.clone()); + } + } - async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError> { - // validate alert type - // Anomaly is only allowed in Prism - if self.alert_type.eq(&AlertType::Anomaly) && PARSEABLE.options.mode != Mode::Prism { - return Err(AlertError::CustomError( - "Anomaly alert is only allowed on Prism mode".into(), - )); + let message = if !breached_groups.is_empty() { + Some(self.create_group_message(&breached_groups)?) + } else { + None + }; + Ok(message) } + } + async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError> { // validate evalType let eval_frequency = match &self.eval_config { EvalConfig::RollingWindow(rolling_window) => { @@ -121,23 +139,28 @@ impl AlertTrait for ThresholdAlert { // validate that the query is valid if self.query.is_empty() { - return Err(AlertError::InvalidAlertQuery); + return Err(AlertError::InvalidAlertQuery("Empty query".into())); } let tables = resolve_stream_names(&self.query)?; if tables.is_empty() { - return Err(AlertError::InvalidAlertQuery); + return Err(AlertError::InvalidAlertQuery( + "No tables found in query".into(), + )); } create_streams_for_distributed(tables) .await - .map_err(|_| AlertError::InvalidAlertQuery)?; + .map_err(|_| AlertError::InvalidAlertQuery("Invalid tables".into()))?; // validate that the user has access to the tables mentioned in the query user_auth_for_query(session_key, &self.query).await?; // validate that the alert query is valid and can be evaluated - if !is_query_aggregate(&self.query).await? { - return Err(AlertError::InvalidAlertQuery); + let num_aggrs = get_number_of_agg_exprs(&self.query).await?; + if num_aggrs != 1 { + return Err(AlertError::InvalidAlertQuery(format!( + "Found {num_aggrs} aggregate expressions, only 1 allowed" + ))); } Ok(()) } @@ -161,9 +184,9 @@ impl AlertTrait for ThresholdAlert { trigger_notif: Option, ) -> Result<(), AlertError> { let store = PARSEABLE.storage.get_object_store(); - if self.state.eq(&AlertState::Paused) { + if self.state.eq(&AlertState::Disabled) { warn!( - "Alert- {} has been Paused. No evals will be done till it is unpaused.", + "Alert- {} is currently Disabled. Updating state to {new_state}.", self.id ); // update state in memory @@ -177,11 +200,14 @@ impl AlertTrait for ThresholdAlert { match &mut self.notification_state { NotificationState::Notify => {} - NotificationState::Snoozed(till_time) => { + NotificationState::Mute(till_time) => { // if now > till_time, modify notif state to notify and proceed let now = Utc::now(); - let till = DateTime::::from_str(till_time) - .map_err(|e| AlertError::CustomError(e.to_string()))?; + let till = match till_time.as_str() { + "indefinite" => DateTime::::MAX_UTC, + _ => DateTime::::from_str(till_time) + .map_err(|e| AlertError::CustomError(e.to_string()))?, + }; if now > till { info!( "Modifying alert notif state from snoozed to notify- Now= {now}, Snooze till= {till}" @@ -234,7 +260,7 @@ impl AlertTrait for ThresholdAlert { &self.eval_config } - fn get_targets(&self) -> &Vec { + fn get_targets(&self) -> &[Ulid] { &self.targets } @@ -248,21 +274,21 @@ impl AlertTrait for ThresholdAlert { } } - fn get_eval_window(&self) -> String { + fn get_eval_window(&self) -> &str { match &self.eval_config { - EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_start.clone(), + EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_start.as_str(), } } fn get_created(&self) -> String { - self.created.to_string() + self.created.to_rfc3339() } fn get_tags(&self) -> &Option> { &self.tags } - fn get_datasets(&self) -> &Vec { + fn get_datasets(&self) -> &[String] { &self.datasets } @@ -355,3 +381,43 @@ impl From for AlertConfig { } } } + +impl ThresholdAlert { + fn create_group_message(&self, breached_groups: &[GroupResult]) -> Result { + let mut message = format!( + "Alert Triggered: {}\n\nThreshold: ({} {})\nEvaluation Window: {} | Frequency: {}\n\n", + self.get_id(), + self.get_threshold_config().operator, + self.get_threshold_config().value, + self.get_eval_window(), + self.get_eval_frequency() + ); + + message.push_str(&format!( + "Alerting Groups ({} total):\n", + breached_groups.len() + )); + + for (index, group) in breached_groups.iter().enumerate() { + message.push_str(&format!("{}. ", index + 1)); + + if group.group_values.is_empty() { + message.push_str("[No GROUP BY]"); + } else { + let group_desc = group + .group_values + .iter() + .map(|(key, value)| format!("{}: {}", key, value)) + .collect::>() + .join(", "); + message.push_str(&group_desc); + } + + message.push_str(&format!(" → Value: {}\n", group.aggregate_value)); + } + + message.push_str(&format!("\nQuery:\n{}", self.get_query())); + + Ok(message) + } +} diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index 2a49b109a..85af83914 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -16,19 +16,22 @@ * */ -use std::fmt::Display; +use std::{collections::HashMap, fmt::Display}; use actix_web::Either; -use arrow_array::{Float64Array, Int64Array, RecordBatch}; +use arrow_array::{Array, Float64Array, Int64Array, RecordBatch}; use datafusion::{ - logical_expr::Literal, + logical_expr::{Literal, LogicalPlan}, prelude::{Expr, lit}, }; -use itertools::Itertools; use tracing::trace; use crate::{ - alerts::{AlertTrait, LogicalOperator, WhereConfigOperator, alert_structs::Conditions}, + alerts::{ + AlertTrait, LogicalOperator, WhereConfigOperator, + alert_structs::{AlertQueryResult, Conditions, GroupResult}, + extract_aggregate_aliases, + }, handlers::http::{ cluster::send_query_request, query::{Query, create_streams_for_distributed}, @@ -70,8 +73,11 @@ pub fn extract_time_range(eval_config: &super::EvalConfig) -> Result Result { +/// Execute the alert query based on the current mode and return structured group results +pub async fn execute_alert_query( + query: &str, + time_range: &TimeRange, +) -> Result { match PARSEABLE.options.mode { Mode::All | Mode::Query => execute_local_query(query, time_range).await, Mode::Prism => execute_remote_query(query, time_range).await, @@ -83,7 +89,10 @@ pub async fn execute_alert_query(query: &str, time_range: &TimeRange) -> Result< } /// Execute alert query locally (Query/All mode) -async fn execute_local_query(query: &str, time_range: &TimeRange) -> Result { +async fn execute_local_query( + query: &str, + time_range: &TimeRange, +) -> Result { let session_state = QUERY_SESSION.state(); let tables = resolve_stream_names(query)?; @@ -93,7 +102,7 @@ async fn execute_local_query(query: &str, time_range: &TimeRange) -> Result Result Result { +async fn execute_remote_query( + query: &str, + time_range: &TimeRange, +) -> Result { + let session_state = QUERY_SESSION.state(); + let raw_logical_plan = session_state.create_logical_plan(query).await?; + let query_request = Query { query: query.to_string(), start_time: time_range.start.to_rfc3339(), @@ -130,24 +145,111 @@ async fn execute_remote_query(query: &str, time_range: &TimeRange) -> Result Result { - // due to the previous validations, we can be sure that we get an array of objects with just one entry - // [{"countField": Number(1120.251)}] - if let Some(array_val) = result_value.as_array() - && !array_val.is_empty() - && let Some(object) = array_val[0].as_object() - { - let values = object.values().map(|v| v.as_f64().unwrap()).collect_vec(); - Ok(values[0]) +/// Convert JSON result value to AlertQueryResult +/// Handles both simple queries and GROUP BY queries with multiple rows +fn convert_result_to_group_results( + result_value: serde_json::Value, + plan: LogicalPlan, +) -> Result { + let array_val = result_value + .as_array() + .ok_or_else(|| AlertError::CustomError("Expected array in query result".to_string()))?; + + let aggregate_aliases = extract_aggregate_aliases(&plan); + + if array_val.is_empty() || aggregate_aliases.is_empty() { + return Ok(AlertQueryResult { + groups: vec![], + is_simple_query: true, + }); + } + + // take the first entry and extract the column name / alias + let (agg_condition, alias) = &aggregate_aliases[0]; + + let aggregate_key = if let Some(alias) = alias { + alias } else { - Err(AlertError::CustomError( - "Query result is not a number or response is empty".to_string(), - )) + agg_condition + }; + + // Find the aggregate column from the first row + let first_row = array_val[0] + .as_object() + .ok_or_else(|| AlertError::CustomError("Expected object in query result".to_string()))?; + + let is_simple_query = first_row.len() == 1; + let mut groups = Vec::new(); + + // Process each row as a separate group + for row in array_val { + if let Some(object) = row.as_object() { + let mut group_values = HashMap::new(); + let mut aggregate_value = 0.0; + + for (key, value) in object { + if key == aggregate_key { + aggregate_value = value.as_f64().ok_or_else(|| { + AlertError::CustomError(format!( + "Non-numeric value found in aggregate column '{}'", + aggregate_key + )) + })?; + } else { + // This is a GROUP BY column + group_values + .insert(key.clone(), value.to_string().trim_matches('"').to_string()); + } + } + + groups.push(GroupResult { + group_values, + aggregate_value, + }); + } + } + + Ok(AlertQueryResult { + groups, + is_simple_query, + }) +} + +/// Extract numeric value from an Arrow array at the given row index +fn extract_numeric_value(column: &dyn Array, row_index: usize) -> f64 { + if let Some(float_array) = column.as_any().downcast_ref::() { + if !float_array.is_null(row_index) { + return float_array.value(row_index); + } + } else if let Some(int_array) = column.as_any().downcast_ref::() + && !int_array.is_null(row_index) + { + return int_array.value(row_index) as f64; + } + 0.0 +} + +/// Extract string value from an Arrow array at the given row index +fn extract_string_value(column: &dyn Array, row_index: usize) -> String { + use arrow_array::StringArray; + + if let Some(string_array) = column.as_any().downcast_ref::() { + if !string_array.is_null(row_index) { + return string_array.value(row_index).to_string(); + } + } else if let Some(int_array) = column.as_any().downcast_ref::() { + if !int_array.is_null(row_index) { + return int_array.value(row_index).to_string(); + } + } else if let Some(float_array) = column.as_any().downcast_ref::() + && !float_array.is_null(row_index) + { + return float_array.value(row_index).to_string(); } + "null".to_string() } pub fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> bool { @@ -191,33 +293,64 @@ async fn update_alert_state( } } -fn get_final_value(records: Vec) -> f64 { +/// Extract group results from record batches, supporting both simple and GROUP BY queries +fn extract_group_results(records: Vec, plan: LogicalPlan) -> AlertQueryResult { trace!("records-\n{records:?}"); - if let Some(f) = records - .first() - .and_then(|batch| { - trace!("batch.column(0)-\n{:?}", batch.column(0)); - batch.column(0).as_any().downcast_ref::() - }) - .map(|array| { - trace!("array-\n{array:?}"); - array.value(0) - }) - { - f + let aggregate_aliases = extract_aggregate_aliases(&plan); + + // since there is going to be only one aggregate, we'll check if it is empty + if aggregate_aliases.is_empty() || records.is_empty() { + return AlertQueryResult { + groups: vec![], + is_simple_query: true, + }; + } + + // take the first entry and extract the column name / alias + let (agg_condition, alias) = &aggregate_aliases[0]; + + let alias = if let Some(alias) = alias { + alias } else { - records - .first() - .and_then(|batch| { - trace!("batch.column(0)-\n{:?}", batch.column(0)); - batch.column(0).as_any().downcast_ref::() - }) - .map(|array| { - trace!("array-\n{array:?}"); - array.value(0) - }) - .unwrap_or_default() as f64 + agg_condition + }; + + let first_batch = &records[0]; + let schema = first_batch.schema(); + + // Determine if this is a simple query (no GROUP BY) or a grouped query + let is_simple_query = schema.fields().len() == 1; + + let mut groups = Vec::new(); + + for batch in &records { + for row_index in 0..batch.num_rows() { + let mut group_values = HashMap::new(); + let mut aggregate_value = 0.0; + + // Extract values for each column + for (col_index, field) in schema.fields().iter().enumerate() { + let column = batch.column(col_index); + if field.name().eq(alias) { + aggregate_value = extract_numeric_value(column, row_index) + } else { + // This is a GROUP BY column + let value = extract_string_value(column, row_index); + group_values.insert(field.name().clone(), value); + } + } + + groups.push(GroupResult { + group_values, + aggregate_value, + }); + } + } + + AlertQueryResult { + groups, + is_simple_query, } } diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index f700f8ab9..35e488298 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -21,6 +21,7 @@ use arrow_schema::{ArrowError, DataType, Schema}; use async_trait::async_trait; use chrono::Utc; use datafusion::logical_expr::{LogicalPlan, Projection}; +use datafusion::prelude::Expr; use datafusion::sql::sqlparser::parser::ParserError; use derive_more::FromStrError; use http::StatusCode; @@ -29,7 +30,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; use std::thread; -use std::time::Duration; +// use std::time::Duration; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::{RwLock, mpsc}; use tokio::task::JoinHandle; @@ -55,8 +56,8 @@ use crate::alerts::alert_traits::{AlertManagerTrait, AlertTrait}; use crate::alerts::alert_types::ThresholdAlert; use crate::alerts::target::{NotificationConfig, TARGETS}; use crate::handlers::http::fetch_schema; -use crate::handlers::http::query::create_streams_for_distributed; -use crate::option::Mode; +// use crate::handlers::http::query::create_streams_for_distributed; +// use crate::option::Mode; use crate::parseable::{PARSEABLE, StreamNotFound}; use crate::query::{QUERY_SESSION, resolve_stream_names}; use crate::rbac::map::SessionKey; @@ -581,65 +582,6 @@ impl AlertConfig { Ok(()) } - /// Validations - pub async fn validate(&self, session_key: SessionKey) -> Result<(), AlertError> { - // validate alert type - // Anomaly is only allowed in Prism - if self.alert_type.eq(&AlertType::Anomaly) && PARSEABLE.options.mode != Mode::Prism { - return Err(AlertError::CustomError( - "Anomaly alert is only allowed on Prism mode".into(), - )); - } - - // validate evalType - let eval_frequency = match &self.eval_config { - EvalConfig::RollingWindow(rolling_window) => { - if humantime::parse_duration(&rolling_window.eval_start).is_err() { - return Err(AlertError::Metadata( - "evalStart should be of type humantime", - )); - } - rolling_window.eval_frequency - } - }; - - // validate that target repeat notifs !> eval_frequency - match &self.notification_config.times { - target::Retry::Infinite => {} - target::Retry::Finite(repeat) => { - let notif_duration = - Duration::from_secs(60 * self.notification_config.interval) * *repeat as u32; - if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) { - return Err(AlertError::Metadata( - "evalFrequency should be greater than target repetition interval", - )); - } - } - } - - // validate that the query is valid - if self.query.is_empty() { - return Err(AlertError::InvalidAlertQuery); - } - - let tables = resolve_stream_names(&self.query)?; - if tables.is_empty() { - return Err(AlertError::InvalidAlertQuery); - } - create_streams_for_distributed(tables) - .await - .map_err(|_| AlertError::InvalidAlertQuery)?; - - // validate that the user has access to the tables mentioned in the query - user_auth_for_query(&session_key, &self.query).await?; - - // validate that the alert query is valid and can be evaluated - if !is_query_aggregate(&self.query).await? { - return Err(AlertError::InvalidAlertQuery); - } - Ok(()) - } - pub fn get_eval_frequency(&self) -> u64 { match &self.eval_config { EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency, @@ -708,6 +650,11 @@ impl AlertConfig { serde_json::Value::String(self.alert_type.to_string()), ); + map.insert( + "notificationState".to_string(), + serde_json::Value::String(self.notification_state.to_string()), + ); + map.insert( "id".to_string(), serde_json::Value::String(self.id.to_string()), @@ -749,7 +696,7 @@ impl AlertConfig { } /// Check if a query is an aggregate query that returns a single value without executing it -pub async fn is_query_aggregate(query: &str) -> Result { +pub async fn get_number_of_agg_exprs(query: &str) -> Result { let session_state = QUERY_SESSION.state(); // Parse the query into a logical plan @@ -759,21 +706,139 @@ pub async fn is_query_aggregate(query: &str) -> Result { .map_err(|err| AlertError::CustomError(format!("Failed to parse query: {err}")))?; // Check if the plan structure indicates an aggregate query - Ok(is_logical_plan_aggregate(&logical_plan)) + _get_number_of_agg_exprs(&logical_plan) +} + +/// Extract the projection which deals with aggregation +pub async fn get_aggregate_projection(query: &str) -> Result { + let session_state = QUERY_SESSION.state(); + + // Parse the query into a logical plan + let logical_plan = session_state + .create_logical_plan(query) + .await + .map_err(|err| AlertError::CustomError(format!("Failed to parse query: {err}")))?; + + // Check if the plan structure indicates an aggregate query + _get_aggregate_projection(&logical_plan) +} + +fn _get_aggregate_projection(plan: &LogicalPlan) -> Result { + match plan { + LogicalPlan::Aggregate(agg) => { + // let fields = exprlist_to_fields(&agg.aggr_expr, &agg.input)?; + match &agg.aggr_expr[0] { + datafusion::prelude::Expr::Alias(alias) => Ok(alias.name.clone()), + _ => Ok(agg.aggr_expr[0].name_for_alias()?), + } + } + // Projection over aggregate: SELECT COUNT(*) as total, SELECT AVG(col) as average + LogicalPlan::Projection(Projection { input, .. }) => _get_aggregate_projection(input), + // Do not consider any aggregates inside a subquery or recursive CTEs + LogicalPlan::Subquery(_) | LogicalPlan::RecursiveQuery(_) => { + Err(AlertError::InvalidAlertQuery("Subquery not allowed".into())) + } + // Recursively check wrapped plans (Filter, Limit, Sort, etc.) + _ => { + // Use inputs() method to get all input plans and recursively search + for input in plan.inputs() { + if let Ok(result) = _get_aggregate_projection(input) { + return Ok(result); + } + } + Err(AlertError::InvalidAlertQuery( + "No aggregate projection found".into(), + )) + } + } +} + +/// Extracts aliases for aggregate functions from a DataFusion logical plan +pub fn extract_aggregate_aliases(plan: &LogicalPlan) -> Vec<(String, Option)> { + let mut aliases = Vec::new(); + + if let LogicalPlan::Projection(projection) = plan { + // Check if this projection contains aliased aggregates + for expr in &projection.expr { + if let Some((agg_name, alias)) = extract_alias_from_expr(expr) { + aliases.push((agg_name, alias)); + } + } + } + + // Recursively check child plans + for input in plan.inputs() { + aliases.extend(extract_aggregate_aliases(input)); + } + + aliases +} + +/// Extracts aggregate function name and alias from an expression +fn extract_alias_from_expr(expr: &Expr) -> Option<(String, Option)> { + match expr { + Expr::Alias(alias_expr) => { + // This is an aliased expression + let alias_name = alias_expr.name.clone(); + + match alias_expr.expr.as_ref() { + Expr::AggregateFunction(agg_func) => { + let agg_name = format!("{:?}", agg_func.func); + Some((agg_name, Some(alias_name))) + } + // Handle other aggregate expressions like Count, etc. + _ => { + // Check if the inner expression is an aggregate + let expr_str = format!("{:?}", alias_expr.expr); + if expr_str.contains("count") + || expr_str.contains("sum") + || expr_str.contains("avg") + || expr_str.contains("min") + || expr_str.contains("max") + { + Some((expr_str, Some(alias_name))) + } else { + None + } + } + } + } + Expr::AggregateFunction(agg_func) => { + // Unaliased aggregate function + let agg_name = format!("{:?}", agg_func.func); + Some((agg_name, None)) + } + Expr::Column(column_expr) => { + // This might be an un-aliased aggregate expression + if column_expr.name().contains("count") + || column_expr.name().contains("sum") + || column_expr.name().contains("avg") + || column_expr.name().contains("min") + || column_expr.name().contains("max") + { + Some((column_expr.name.clone(), None)) + } else { + None + } + } + _ => None, + } } /// Analyze a logical plan to determine if it represents an aggregate query -pub fn is_logical_plan_aggregate(plan: &LogicalPlan) -> bool { +/// +/// Returns the number of aggregate expressions found in the plan +fn _get_number_of_agg_exprs(plan: &LogicalPlan) -> Result { match plan { // Direct aggregate: SELECT COUNT(*), AVG(col), etc. - LogicalPlan::Aggregate(_) => true, + LogicalPlan::Aggregate(agg) => Ok(agg.aggr_expr.len()), // Projection over aggregate: SELECT COUNT(*) as total, SELECT AVG(col) as average - LogicalPlan::Projection(Projection { input, expr, .. }) => { - // Check if input contains an aggregate and we have exactly one expression - let is_aggregate_input = is_logical_plan_aggregate(input); - let single_expr = expr.len() == 1; - is_aggregate_input && single_expr + LogicalPlan::Projection(Projection { input, .. }) => _get_number_of_agg_exprs(input), + + // Do not consider any aggregates inside a subquery or recursive CTEs + LogicalPlan::Subquery(_) | LogicalPlan::RecursiveQuery(_) => { + Err(AlertError::InvalidAlertQuery("Subquery not allowed".into())) } // Recursively check wrapped plans (Filter, Limit, Sort, etc.) @@ -781,7 +846,8 @@ pub fn is_logical_plan_aggregate(plan: &LogicalPlan) -> bool { // Use inputs() method to get all input plans plan.inputs() .iter() - .any(|input| is_logical_plan_aggregate(input)) + .map(|input| _get_number_of_agg_exprs(input)) + .sum() } } } @@ -820,16 +886,18 @@ pub enum AlertError { TargetInUse, #[error("{0}")] ParserError(#[from] ParserError), - #[error("Invalid alert query")] - InvalidAlertQuery, + #[error("Invalid alert query: {0}")] + InvalidAlertQuery(String), #[error("Invalid query parameter")] InvalidQueryParameter, #[error("{0}")] ArrowError(#[from] ArrowError), #[error("Upgrade to Parseable Enterprise for {0} type alerts")] - NotPresentInOSS(String), + NotPresentInOSS(&'static str), #[error("{0}")] Unimplemented(String), + #[error("{0}")] + ValidationFailure(String), } impl actix_web::ResponseError for AlertError { @@ -851,8 +919,9 @@ impl actix_web::ResponseError for AlertError { Self::InvalidTargetModification(_) => StatusCode::BAD_REQUEST, Self::TargetInUse => StatusCode::CONFLICT, Self::ParserError(_) => StatusCode::BAD_REQUEST, - Self::InvalidAlertQuery => StatusCode::BAD_REQUEST, + Self::InvalidAlertQuery(_) => StatusCode::BAD_REQUEST, Self::InvalidQueryParameter => StatusCode::BAD_REQUEST, + Self::ValidationFailure(_) => StatusCode::BAD_REQUEST, Self::ArrowError(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::Unimplemented(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::NotPresentInOSS(_) => StatusCode::BAD_REQUEST, @@ -934,20 +1003,20 @@ impl AlertManagerTrait for Alerts { AlertType::Threshold => { Box::new(ThresholdAlert::from(alert)) as Box } - AlertType::Anomaly => { + AlertType::Anomaly(_) => { return Err(anyhow::Error::msg( - AlertError::NotPresentInOSS("anomaly".into()).to_string(), + AlertError::NotPresentInOSS("anomaly").to_string(), )); } - AlertType::Forecast => { + AlertType::Forecast(_) => { return Err(anyhow::Error::msg( - AlertError::NotPresentInOSS("forecast".into()).to_string(), + AlertError::NotPresentInOSS("forecast").to_string(), )); } }; // Create alert task iff alert's state is not paused - if alert.get_state().eq(&AlertState::Paused) { + if alert.get_state().eq(&AlertState::Disabled) { map.insert(*alert.get_id(), alert); continue; } @@ -1040,11 +1109,11 @@ impl AlertManagerTrait for Alerts { AlertType::Threshold => { Box::new(ThresholdAlert::from(alert.to_alert_config())) as Box } - AlertType::Anomaly => { - return Err(AlertError::NotPresentInOSS("anomaly".into())); + AlertType::Anomaly(_) => { + return Err(AlertError::NotPresentInOSS("anomaly")); } - AlertType::Forecast => { - return Err(AlertError::NotPresentInOSS("forecast".into())); + AlertType::Forecast(_) => { + return Err(AlertError::NotPresentInOSS("forecast")); } } } else { @@ -1053,15 +1122,21 @@ impl AlertManagerTrait for Alerts { ))); }; - // if new state is Paused then ensure that the task is removed from list - if new_state.eq(&AlertState::Paused) { + // if new state is Disabled then ensure that the task is removed from list + if new_state.eq(&AlertState::Disabled) { + if alert.get_state().eq(&AlertState::Disabled) { + return Err(AlertError::InvalidStateChange( + "Can't disable an alert which is currently disabled".into(), + )); + } + self.sender .send(AlertTask::Delete(alert_id)) .await .map_err(|e| AlertError::CustomError(e.to_string()))?; } // user has resumed evals for this alert - else if alert.get_state().eq(&AlertState::Paused) + else if alert.get_state().eq(&AlertState::Disabled) && new_state.eq(&AlertState::NotTriggered) { self.sender @@ -1089,11 +1164,11 @@ impl AlertManagerTrait for Alerts { AlertType::Threshold => { Box::new(ThresholdAlert::from(alert.to_alert_config())) as Box } - AlertType::Anomaly => { - return Err(AlertError::NotPresentInOSS("anomaly".into())); + AlertType::Anomaly(_) => { + return Err(AlertError::NotPresentInOSS("anomaly")); } - AlertType::Forecast => { - return Err(AlertError::NotPresentInOSS("forecast".into())); + AlertType::Forecast(_) => { + return Err(AlertError::NotPresentInOSS("forecast")); } } } else { @@ -1201,7 +1276,7 @@ pub async fn get_alerts_summary() -> Result { severity: alert.get_severity().clone(), }); } - AlertState::Paused => { + AlertState::Disabled => { paused += 1; paused_alerts.push(AlertsInfo { title: alert.get_title().to_string(), diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 65b8e6e5e..647105bff 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -259,7 +259,7 @@ impl Target { } // do not send out any notifs // (an eval should not have run!) - AlertState::Paused => {} + AlertState::Disabled => {} } } @@ -451,8 +451,8 @@ impl CallableTarget for SlackWebHook { AlertState::NotTriggered => { serde_json::json!({ "text": payload.default_resolved_string() }) } - AlertState::Paused => { - serde_json::json!({ "text": payload.default_paused_string() }) + AlertState::Disabled => { + serde_json::json!({ "text": payload.default_disabled_string() }) } }; @@ -487,7 +487,7 @@ impl CallableTarget for OtherWebHook { let alert = match payload.alert_info.alert_state { AlertState::Triggered => payload.default_alert_string(), AlertState::NotTriggered => payload.default_resolved_string(), - AlertState::Paused => payload.default_paused_string(), + AlertState::Disabled => payload.default_disabled_string(), }; let request = client @@ -571,14 +571,14 @@ impl CallableTarget for AlertManager { .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) .into(); } - AlertState::Paused => alert["labels"]["status"] = "paused".into(), // AlertState::Silenced => { - // alert["labels"]["status"] = "silenced".into(); - // alert["annotations"]["reason"] = - // serde_json::Value::String(payload.default_silenced_string()); - // // alert["endsAt"] = Utc::now() - // // .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) - // // .into(); - // } + AlertState::Disabled => alert["labels"]["status"] = "paused".into(), // AlertState::Silenced => { + // alert["labels"]["status"] = "silenced".into(); + // alert["annotations"]["reason"] = + // serde_json::Value::String(payload.default_silenced_string()); + // // alert["endsAt"] = Utc::now() + // // .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) + // // .into(); + // } }; if let Err(e) = client @@ -595,6 +595,7 @@ impl CallableTarget for AlertManager { #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] pub struct NotificationConfig { pub interval: u64, + #[serde(skip)] #[serde(default = "Retry::default")] pub times: Retry, #[serde(skip)] diff --git a/src/catalog/manifest.rs b/src/catalog/manifest.rs index ad5b32422..b091e7b0a 100644 --- a/src/catalog/manifest.rs +++ b/src/catalog/manifest.rs @@ -112,13 +112,12 @@ pub fn create_from_parquet_file( let columns = column_statistics(row_groups); manifest_file.columns = columns.into_values().collect(); let mut sort_orders = sort_order(row_groups); - if let Some(last_sort_order) = sort_orders.pop() { - if sort_orders + if let Some(last_sort_order) = sort_orders.pop() + && sort_orders .into_iter() .all(|sort_order| sort_order == last_sort_order) - { - manifest_file.sort_order_id = last_sort_order; - } + { + manifest_file.sort_order_id = last_sort_order; } Ok(manifest_file) diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index a7c37e116..349d35b4f 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -268,27 +268,27 @@ async fn create_manifest( ..Manifest::default() }; let mut first_event_at = PARSEABLE.get_stream(stream_name)?.get_first_event(); - if first_event_at.is_none() { - if let Some(first_event) = manifest.files.first() { - let time_partition = &meta.time_partition; - let lower_bound = match time_partition { - Some(time_partition) => { - let (lower_bound, _) = get_file_bounds(first_event, time_partition.to_string()); - lower_bound - } - None => { - let (lower_bound, _) = - get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string()); - lower_bound - } - }; - first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339()); - match PARSEABLE.get_stream(stream_name) { - Ok(stream) => stream.set_first_event_at(first_event_at.as_ref().unwrap()), - Err(err) => error!( - "Failed to update first_event_at in streaminfo for stream {stream_name:?}, error = {err:?}" - ), + if first_event_at.is_none() + && let Some(first_event) = manifest.files.first() + { + let time_partition = &meta.time_partition; + let lower_bound = match time_partition { + Some(time_partition) => { + let (lower_bound, _) = get_file_bounds(first_event, time_partition.to_string()); + lower_bound } + None => { + let (lower_bound, _) = + get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string()); + lower_bound + } + }; + first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339()); + match PARSEABLE.get_stream(stream_name) { + Ok(stream) => stream.set_first_event_at(first_event_at.as_ref().unwrap()), + Err(err) => error!( + "Failed to update first_event_at in streaminfo for stream {stream_name:?}, error = {err:?}" + ), } } @@ -366,8 +366,8 @@ pub async fn get_first_event( Mode::All | Mode::Ingest => { // get current snapshot let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event(); - if stream_first_event.is_some() { - first_event_at = stream_first_event.unwrap(); + if let Some(event) = stream_first_event { + first_event_at = event; } else { let mut meta = storage.get_object_store_format(stream_name).await?; let meta_clone = meta.clone(); diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 68cd50d26..9d8515950 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -315,10 +315,8 @@ fn valid_type( fn validate_int(value: &Value, static_schema_flag: bool) -> bool { // allow casting string to int for static schema - if static_schema_flag { - if let Value::String(s) = value { - return s.trim().parse::().is_ok(); - } + if static_schema_flag && let Value::String(s) = value { + return s.trim().parse::().is_ok(); } value.is_i64() } diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index 61d0d92ff..ae87ae676 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -25,6 +25,7 @@ use crate::{ alert_structs::{AlertConfig, AlertRequest}, alert_traits::AlertTrait, alert_types::ThresholdAlert, + target::Retry, }, parseable::PARSEABLE, storage::object_storage::alert_json_path, @@ -45,16 +46,16 @@ pub async fn list(req: HttpRequest) -> Result { let query_map = web::Query::>::from_query(req.query_string()) .map_err(|_| AlertError::InvalidQueryParameter)?; let mut tags_list = Vec::new(); - if !query_map.is_empty() { - if let Some(tags) = query_map.get("tags") { - tags_list = tags - .split(',') - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) - .collect(); - if tags_list.is_empty() { - return Err(AlertError::InvalidQueryParameter); - } + if !query_map.is_empty() + && let Some(tags) = query_map.get("tags") + { + tags_list = tags + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + if tags_list.is_empty() { + return Err(AlertError::InvalidQueryParameter); } } let guard = ALERTS.read().await; @@ -77,7 +78,29 @@ pub async fn post( req: HttpRequest, Json(alert): Json, ) -> Result { - let alert: AlertConfig = alert.into().await?; + let mut alert: AlertConfig = alert.into().await?; + + if alert.get_eval_frequency().eq(&0) { + return Err(AlertError::ValidationFailure( + "Eval frequency cannot be 0".into(), + )); + } + if alert.notification_config.interval.eq(&0) { + return Err(AlertError::ValidationFailure( + "Notification interval cannot be 0".into(), + )); + } + + // calculate the `times` for notification config + let eval_freq = alert.get_eval_frequency(); + let notif_freq = alert.notification_config.interval; + let times = if (eval_freq / notif_freq) == 0 { + 1 + } else { + (eval_freq / notif_freq) as usize + }; + + alert.notification_config.times = Retry::Finite(times); let threshold_alert; let alert: &dyn AlertTrait = match &alert.alert_type { @@ -85,11 +108,11 @@ pub async fn post( threshold_alert = ThresholdAlert::from(alert); &threshold_alert } - AlertType::Anomaly => { - return Err(AlertError::NotPresentInOSS("anomaly".into())); + AlertType::Anomaly(_) => { + return Err(AlertError::NotPresentInOSS("anomaly")); } - AlertType::Forecast => { - return Err(AlertError::NotPresentInOSS("forecast".into())); + AlertType::Forecast(_) => { + return Err(AlertError::NotPresentInOSS("forecast")); } }; @@ -177,7 +200,7 @@ pub async fn delete(req: HttpRequest, alert_id: Path) -> Result {} - NotificationState::Snoozed(till_time) => { + NotificationState::Mute(till_time) => { let time_range = TimeRange::parse_human_time(till_time, "now") .map_err(|e| AlertError::CustomError(format!("Invalid time value passed- {e}")))?; @@ -221,10 +244,10 @@ pub async fn update_notification_state( Ok(web::Json(alert.to_alert_config())) } -// PUT /alerts/{alert_id}/pause +// PATCH /alerts/{alert_id}/disable /// first save on disk, then in memory /// then modify scheduled task -pub async fn pause_alert( +pub async fn disable_alert( req: HttpRequest, alert_id: Path, ) -> Result { @@ -244,17 +267,17 @@ pub async fn pause_alert( user_auth_for_query(&session_key, alert.get_query()).await?; alerts - .update_state(alert_id, AlertState::Paused, Some("".into())) + .update_state(alert_id, AlertState::Disabled, Some("".into())) .await?; let alert = alerts.get_alert_by_id(alert_id).await?; Ok(web::Json(alert.to_alert_config())) } -// PUT /alerts/{alert_id}/resume +// PATCH /alerts/{alert_id}/enable /// first save on disk, then in memory /// then modify scheduled task -pub async fn resume_alert( +pub async fn enable_alert( req: HttpRequest, alert_id: Path, ) -> Result { @@ -270,6 +293,14 @@ pub async fn resume_alert( // check if alert id exists in map let alert = alerts.get_alert_by_id(alert_id).await?; + + // only run if alert is disabled + if alert.get_state().ne(&AlertState::Disabled) { + return Err(AlertError::InvalidStateChange( + "Can't enable an alert which is not currently disabled".into(), + )); + } + // validate that the user has access to the tables mentioned in the query user_auth_for_query(&session_key, alert.get_query()).await?; @@ -281,6 +312,122 @@ pub async fn resume_alert( Ok(web::Json(alert.to_alert_config())) } +// PUT /alerts/{alert_id} +/// first save on disk, then in memory +/// then modify scheduled task +pub async fn modify_alert( + req: HttpRequest, + alert_id: Path, + Json(alert_request): Json, +) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = alert_id.into_inner(); + + let guard = ALERTS.write().await; + let alerts = if let Some(alerts) = guard.as_ref() { + alerts + } else { + return Err(AlertError::CustomError("No AlertManager set".into())); + }; + + // check if alert id exists in map + let alert = alerts.get_alert_by_id(alert_id).await?; + + // validate that the user has access to the tables mentioned in the query + + user_auth_for_query(&session_key, alert.get_query()).await?; + // validate the request + let mut new_config = alert_request.into().await?; + + if &new_config.alert_type != alert.get_alert_type() { + return Err(AlertError::InvalidAlertModifyRequest); + } + + user_auth_for_query(&session_key, &new_config.query).await?; + + // calculate the `times` for notification config + let eval_freq = new_config.get_eval_frequency(); + let notif_freq = new_config.notification_config.interval; + let times = if (eval_freq / notif_freq) == 0 { + 1 + } else { + (eval_freq / notif_freq) as usize + }; + + new_config.notification_config.times = Retry::Finite(times); + + let mut old_config = alert.to_alert_config(); + old_config.threshold_config = new_config.threshold_config; + old_config.datasets = new_config.datasets; + old_config.eval_config = new_config.eval_config; + old_config.notification_config = new_config.notification_config; + old_config.query = new_config.query; + old_config.severity = new_config.severity; + old_config.tags = new_config.tags; + old_config.targets = new_config.targets; + old_config.title = new_config.title; + + let threshold_alert; + let new_alert: &dyn AlertTrait = match &new_config.alert_type { + AlertType::Threshold => { + threshold_alert = ThresholdAlert::from(old_config); + &threshold_alert + } + AlertType::Anomaly(_) => { + return Err(AlertError::NotPresentInOSS("anomaly")); + } + AlertType::Forecast(_) => { + return Err(AlertError::NotPresentInOSS("forecast")); + } + }; + + // remove the task + alerts.delete_task(alert_id).await?; + + // remove alert from memory + alerts.delete(alert_id).await?; + + // move on to saving the alert in ObjectStore + alerts.update(new_alert).await; + + let path = alert_json_path(*new_alert.get_id()); + + let store = PARSEABLE.storage.get_object_store(); + let alert_bytes = serde_json::to_vec(&new_alert.to_alert_config())?; + store.put_object(&path, Bytes::from(alert_bytes)).await?; + + let config = new_alert.to_alert_config(); + + // start the task + alerts.start_task(new_alert.clone_box()).await?; + + Ok(web::Json(config)) +} + +// PUT /alerts/{alert_id}/evaluate_alert +pub async fn evaluate_alert(alert_id: Path) -> Result { + let alert_id = alert_id.into_inner(); + + let guard = ALERTS.write().await; + let alerts = if let Some(alerts) = guard.as_ref() { + alerts + } else { + return Err(AlertError::CustomError("No AlertManager set".into())); + }; + + let alert = alerts.get_alert_by_id(alert_id).await?; + + let config = alert.to_alert_config(); + + // remove task + alerts.delete_task(alert_id).await?; + + // add the task back again so that it evaluates right now + alerts.start_task(alert).await?; + + Ok(Json(config)) +} + pub async fn list_tags() -> Result { let guard = ALERTS.read().await; let alerts = if let Some(alerts) = guard.as_ref() { diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs index f30e89646..6bdfc3b34 100644 --- a/src/handlers/http/audit.rs +++ b/src/handlers/http/audit.rs @@ -51,10 +51,10 @@ pub async fn audit_log_middleware( log_builder = log_builder.with_stream(message.common_attributes.x_p_stream); } else if let Some(stream) = req.match_info().get("logstream") { log_builder = log_builder.with_stream(stream); - } else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY) { - if let Ok(stream) = value.to_str() { - log_builder = log_builder.with_stream(stream); - } + } else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY) + && let Ok(stream) = value.to_str() + { + log_builder = log_builder.with_stream(stream); } // Get username and authorization method diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 568f912a8..c7c146daa 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -1073,25 +1073,25 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { .run(move || async { let result: Result<(), PostError> = async { let cluster_metrics = fetch_cluster_metrics().await; - if let Ok(metrics) = cluster_metrics { - if !metrics.is_empty() { - info!("Cluster metrics fetched successfully from all ingestors"); - if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) { - if matches!( - ingest_internal_stream( - INTERNAL_STREAM_NAME.to_string(), - bytes::Bytes::from(metrics_bytes), - ) - .await, - Ok(()) - ) { - info!("Cluster metrics successfully ingested into internal stream"); - } else { - error!("Failed to ingest cluster metrics into internal stream"); - } + if let Ok(metrics) = cluster_metrics + && !metrics.is_empty() + { + info!("Cluster metrics fetched successfully from all ingestors"); + if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) { + if matches!( + ingest_internal_stream( + INTERNAL_STREAM_NAME.to_string(), + bytes::Bytes::from(metrics_bytes), + ) + .await, + Ok(()) + ) { + info!("Cluster metrics successfully ingested into internal stream"); } else { - error!("Failed to serialize cluster metrics"); + error!("Failed to ingest cluster metrics into internal stream"); } + } else { + error!("Failed to serialize cluster metrics"); } } Ok(()) @@ -1186,21 +1186,21 @@ async fn get_available_querier() -> Result { }); // Find the next available querier using round-robin strategy - if let Some(selected_domain) = select_next_querier(&mut map).await { - if let Some(status) = map.get_mut(&selected_domain) { - status.available = false; - status.last_used = Some(Instant::now()); - return Ok(status.metadata.clone()); - } + if let Some(selected_domain) = select_next_querier(&mut map).await + && let Some(status) = map.get_mut(&selected_domain) + { + status.available = false; + status.last_used = Some(Instant::now()); + return Ok(status.metadata.clone()); } // If no querier is available, use least-recently-used strategy - if let Some(selected_domain) = select_least_recently_used_querier(&mut map) { - if let Some(status) = map.get_mut(&selected_domain) { - status.available = false; - status.last_used = Some(Instant::now()); - return Ok(status.metadata.clone()); - } + if let Some(selected_domain) = select_least_recently_used_querier(&mut map) + && let Some(status) = map.get_mut(&selected_domain) + { + status.available = false; + status.last_used = Some(Instant::now()); + return Ok(status.metadata.clone()); } // If no querier is available, return an error diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index ab68fce41..5c5bdd4ad 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -68,10 +68,10 @@ pub async fn delete(stream_name: Path) -> Result) -> Result = cluster::get_node_info(NodeType::Ingestor) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index b11e3897f..b629bbab1 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -267,7 +267,7 @@ impl Server { .route(web::get().to(alerts::get).authorize(Action::GetAlert)) .route( web::put() - .to(alerts::update_notification_state) + .to(alerts::modify_alert) .authorize(Action::PutAlert), ) .route( @@ -277,16 +277,30 @@ impl Server { ), ) .service( - web::resource("/{alert_id}/pause").route( - web::put() - .to(alerts::pause_alert) + web::resource("/{alert_id}/disable").route( + web::patch() + .to(alerts::disable_alert) + .authorize(Action::PutAlert), + ), + ) + .service( + web::resource("/{alert_id}/enable").route( + web::patch() + .to(alerts::enable_alert) + .authorize(Action::PutAlert), + ), + ) + .service( + web::resource("/{alert_id}/update_notification_state").route( + web::patch() + .to(alerts::update_notification_state) .authorize(Action::PutAlert), ), ) .service( - web::resource("/{alert_id}/resume").route( + web::resource("/{alert_id}/evaluate_alert").route( web::put() - .to(alerts::resume_alert) + .to(alerts::evaluate_alert) .authorize(Action::PutAlert), ), ) diff --git a/src/handlers/http/modal/ssl_acceptor.rs b/src/handlers/http/modal/ssl_acceptor.rs index 850b4868b..7d9744d0b 100644 --- a/src/handlers/http/modal/ssl_acceptor.rs +++ b/src/handlers/http/modal/ssl_acceptor.rs @@ -38,17 +38,17 @@ pub fn get_ssl_acceptor( let mut certs = rustls_pemfile::certs(cert_file).collect::, _>>()?; // Load CA certificates from the directory - if let Some(other_cert_dir) = other_certs { - if other_cert_dir.is_dir() { - for entry in fs::read_dir(other_cert_dir)? { - let path = entry.unwrap().path(); + if let Some(other_cert_dir) = other_certs + && other_cert_dir.is_dir() + { + for entry in fs::read_dir(other_cert_dir)? { + let path = entry.unwrap().path(); - if path.is_file() { - let other_cert_file = &mut BufReader::new(File::open(&path)?); - let mut other_certs = rustls_pemfile::certs(other_cert_file) - .collect::, _>>()?; - certs.append(&mut other_certs); - } + if path.is_file() { + let other_cert_file = &mut BufReader::new(File::open(&path)?); + let mut other_certs = rustls_pemfile::certs(other_cert_file) + .collect::, _>>()?; + certs.append(&mut other_certs); } } } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 2e84cbbf9..79a058678 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -174,34 +174,35 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap MAX_FIELD_VALUE_LENGTH { - warn!( - "Header value for '{}' exceeds maximum length, truncating", - header_name - ); - &value[..MAX_FIELD_VALUE_LENGTH] - } else { - value - }; - p_custom_fields.insert(key.to_string(), truncated_value.to_string()); - } else { + if header_name.starts_with("x-p-") + && !IGNORE_HEADERS.contains(&header_name) + && let Ok(value) = header_value.to_str() + { + let key = header_name.trim_start_matches("x-p-"); + if !key.is_empty() { + // Truncate value if it exceeds the maximum length + let truncated_value = if value.len() > MAX_FIELD_VALUE_LENGTH { warn!( - "Ignoring header with empty key after prefix: {}", + "Header value for '{}' exceeds maximum length, truncating", header_name ); - } + &value[..MAX_FIELD_VALUE_LENGTH] + } else { + value + }; + p_custom_fields.insert(key.to_string(), truncated_value.to_string()); + } else { + warn!( + "Ignoring header with empty key after prefix: {}", + header_name + ); } } - if header_name == LOG_SOURCE_KEY { - if let Ok(value) = header_value.to_str() { - p_custom_fields.insert(FORMAT_KEY.to_string(), value.to_string()); - } + if header_name == LOG_SOURCE_KEY + && let Ok(value) = header_value.to_str() + { + p_custom_fields.insert(FORMAT_KEY.to_string(), value.to_string()); } } diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index bd2870138..dd8d1dc10 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -346,7 +346,7 @@ pub async fn get_counts( let body = counts_request.into_inner(); // does user have access to table? - user_auth_for_datasets(&permissions, &[body.stream.clone()]).await?; + user_auth_for_datasets(&permissions, std::slice::from_ref(&body.stream)).await?; // if the user has given a sql query (counts call with filters applied), then use this flow // this could include filters or group by diff --git a/src/handlers/livetail.rs b/src/handlers/livetail.rs index 517b56d7c..90e0b0844 100644 --- a/src/handlers/livetail.rs +++ b/src/handlers/livetail.rs @@ -273,7 +273,7 @@ fn extract_basic_auth(header: &MetadataMap) -> Option { .and_then(|value| Credentials::from_header(value.to_string()).ok()) } -fn extract_cookie(header: &MetadataMap) -> Option { +fn extract_cookie(header: &MetadataMap) -> Option> { // extract the cookie from the request let cookies = header.get_all("cookie"); let cookies: Vec<_> = cookies diff --git a/src/hottier.rs b/src/hottier.rs index 45c2af65f..b01a344d3 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -588,11 +588,10 @@ impl HotTierManager { if let (Some(download_date_time), Some(delete_date_time)) = ( extract_datetime(download_file_path.to_str().unwrap()), extract_datetime(path_to_delete.to_str().unwrap()), - ) { - if download_date_time <= delete_date_time { - delete_successful = false; - break 'loop_files; - } + ) && download_date_time <= delete_date_time + { + delete_successful = false; + break 'loop_files; } fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; diff --git a/src/livetail.rs b/src/livetail.rs index 5723ec829..59827df31 100644 --- a/src/livetail.rs +++ b/src/livetail.rs @@ -160,10 +160,10 @@ impl Stream for ReceiverPipe { // drop sender on map when going out of scope impl Drop for ReceiverPipe { fn drop(&mut self) { - if let Some(map) = self._ref.upgrade() { - if let Some(pipes) = map.write().unwrap().get_mut(&self.stream) { - pipes.retain(|x| x.id != self.id) - } + if let Some(map) = self._ref.upgrade() + && let Some(pipes) = map.write().unwrap().get_mut(&self.stream) + { + pipes.retain(|x| x.id != self.id) } } } diff --git a/src/metadata.rs b/src/metadata.rs index 9b706993a..1e7061bfb 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -143,25 +143,23 @@ pub async fn update_data_type_time_partition( schema: &mut Schema, time_partition: Option<&String>, ) -> anyhow::Result<()> { - if let Some(time_partition) = time_partition { - if let Ok(time_partition_field) = schema.field_with_name(time_partition) { - if time_partition_field.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None) - { - let mut fields = schema - .fields() - .iter() - .filter(|field| field.name() != time_partition) - .cloned() - .collect::>>(); - let time_partition_field = Arc::new(Field::new( - time_partition, - DataType::Timestamp(TimeUnit::Millisecond, None), - true, - )); - fields.push(time_partition_field); - *schema = Schema::new(fields); - } - } + if let Some(time_partition) = time_partition + && let Ok(time_partition_field) = schema.field_with_name(time_partition) + && time_partition_field.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None) + { + let mut fields = schema + .fields() + .iter() + .filter(|field| field.name() != time_partition) + .cloned() + .collect::>>(); + let time_partition_field = Arc::new(Field::new( + time_partition, + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + )); + fields.push(time_partition_field); + *schema = Schema::new(fields); } Ok(()) diff --git a/src/migration/metadata_migration.rs b/src/migration/metadata_migration.rs index 9ee38708b..bc02a056b 100644 --- a/src/migration/metadata_migration.rs +++ b/src/migration/metadata_migration.rs @@ -85,10 +85,10 @@ pub fn v2_v3(mut storage_metadata: JsonValue) -> JsonValue { if !privileges.is_empty() { for privilege in privileges.iter_mut() { let privilege_value = privilege.get_mut("privilege"); - if let Some(value) = privilege_value { - if value.as_str().unwrap() == "ingester" { - *value = JsonValue::String("ingestor".to_string()); - } + if let Some(value) = privilege_value + && value.as_str().unwrap() == "ingester" + { + *value = JsonValue::String("ingestor".to_string()); } } let role_name = @@ -124,10 +124,10 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue { }; for privilege in privileges.iter_mut() { let privilege_value = privilege.get_mut("privilege"); - if let Some(value) = privilege_value { - if value.as_str().unwrap() == "ingester" { - *value = JsonValue::String("ingestor".to_string()); - } + if let Some(value) = privilege_value + && value.as_str().unwrap() == "ingester" + { + *value = JsonValue::String("ingestor".to_string()); } } } @@ -185,10 +185,10 @@ pub fn v5_v6(mut storage_metadata: JsonValue) -> JsonValue { for (_, role_permissions) in roles.iter_mut() { if let JsonValue::Array(permissions) = role_permissions { for permission in permissions.iter_mut() { - if let JsonValue::Object(perm_obj) = permission { - if let Some(JsonValue::Object(resource)) = perm_obj.get_mut("resource") { - resource.remove("tag"); - } + if let JsonValue::Object(perm_obj) = permission + && let Some(JsonValue::Object(resource)) = perm_obj.get_mut("resource") + { + resource.remove("tag"); } } } diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 0442795ca..d94054046 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -169,10 +169,10 @@ pub fn insert_if_some(map: &mut Map, key: &str, opti } pub fn insert_number_if_some(map: &mut Map, key: &str, option: &Option) { - if let Some(value) = option { - if let Some(number) = serde_json::Number::from_f64(*value) { - map.insert(key.to_string(), Value::Number(number)); - } + if let Some(value) = option + && let Some(number) = serde_json::Number::from_f64(*value) + { + map.insert(key.to_string(), Value::Number(number)); } } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index f20341618..ac2fce266 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -449,16 +449,16 @@ impl Stream { let mut schemas: Vec = Vec::new(); for file in dir.flatten() { - if let Some(ext) = file.path().extension() { - if ext.eq("schema") { - let file = File::open(file.path()).expect("Schema File should exist"); - - let schema = match serde_json::from_reader(file) { - Ok(schema) => schema, - Err(_) => continue, - }; - schemas.push(schema); - } + if let Some(ext) = file.path().extension() + && ext.eq("schema") + { + let file = File::open(file.path()).expect("Schema File should exist"); + + let schema = match serde_json::from_reader(file) { + Ok(schema) => schema, + Err(_) => continue, + }; + schemas.push(schema); } } @@ -742,26 +742,26 @@ impl Stream { } // After deleting the last file, try to remove the inprocess directory if empty - if i == arrow_files.len() - 1 { - if let Some(parent_dir) = file.parent() { - match fs::read_dir(parent_dir) { - Ok(mut entries) => { - if entries.next().is_none() { - if let Err(err) = fs::remove_dir(parent_dir) { - warn!( - "Failed to remove inprocess directory {}: {err}", - parent_dir.display() - ); - } - } - } - Err(err) => { + if i == arrow_files.len() - 1 + && let Some(parent_dir) = file.parent() + { + match fs::read_dir(parent_dir) { + Ok(mut entries) => { + if entries.next().is_none() + && let Err(err) = fs::remove_dir(parent_dir) + { warn!( - "Failed to read inprocess directory {}: {err}", + "Failed to remove inprocess directory {}: {err}", parent_dir.display() ); } } + Err(err) => { + warn!( + "Failed to read inprocess directory {}: {err}", + parent_dir.display() + ); + } } } } diff --git a/src/query/mod.rs b/src/query/mod.rs index 4ed3862d3..dea173db2 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -290,10 +290,10 @@ impl Query { name: alias_name, .. }) => { - if let Expr::Column(Column { name, .. }) = &**inner_expr { - if name.to_lowercase() == "count(*)" { - return Some(alias_name); - } + if let Expr::Column(Column { name, .. }) = &**inner_expr + && name.to_lowercase() == "count(*)" + { + return Some(alias_name); } None } diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 14274f13b..949b0f591 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -563,20 +563,20 @@ impl TableProvider for StandardTableProvider { } // Hot tier data fetch - if let Some(hot_tier_manager) = HotTierManager::global() { - if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) { - self.get_hottier_exectuion_plan( - &mut execution_plans, - hot_tier_manager, - &mut manifest_files, - projection, - filters, - limit, - state, - time_partition.clone(), - ) - .await?; - } + if let Some(hot_tier_manager) = HotTierManager::global() + && hot_tier_manager.check_stream_hot_tier_exists(&self.stream) + { + self.get_hottier_exectuion_plan( + &mut execution_plans, + hot_tier_manager, + &mut manifest_files, + projection, + filters, + limit, + state, + time_partition.clone(), + ) + .await?; } if manifest_files.is_empty() { QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc(); diff --git a/src/stats.rs b/src/stats.rs index eed0d9703..5a167cc39 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -160,10 +160,10 @@ pub async fn update_deleted_stats( .with_label_values(&["data", stream_name, "parquet"]) .sub(storage_size); let stats = get_current_stats(stream_name, "json"); - if let Some(stats) = stats { - if let Err(e) = storage.put_stats(stream_name, &stats).await { - warn!("Error updating stats to objectstore due to error [{}]", e); - } + if let Some(stats) = stats + && let Err(e) = storage.put_stats(stream_name, &stats).await + { + warn!("Error updating stats to objectstore due to error [{}]", e); } Ok(()) @@ -209,10 +209,8 @@ fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) { // Check if all prefix elements are present in label values let all_prefixes_found = prefix.iter().all(|p| label_map.values().any(|v| v == p)); - if all_prefixes_found { - if let Err(err) = metrics.remove(&label_map) { - warn!("Error removing metric with labels {:?}: {err}", label_map); - } + if all_prefixes_found && let Err(err) = metrics.remove(&label_map) { + warn!("Error removing metric with labels {:?}: {err}", label_map); } } } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 1f32d9518..e247518ad 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -602,53 +602,52 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { .await .into_iter() .next() + && !stream_metadata_obs.is_empty() { - if !stream_metadata_obs.is_empty() { - for stream_metadata_bytes in stream_metadata_obs.iter() { - let stream_ob_metadata = - serde_json::from_slice::(stream_metadata_bytes)?; - all_log_sources.extend(stream_ob_metadata.log_source.clone()); - } + for stream_metadata_bytes in stream_metadata_obs.iter() { + let stream_ob_metadata = + serde_json::from_slice::(stream_metadata_bytes)?; + all_log_sources.extend(stream_ob_metadata.log_source.clone()); + } - // Merge log sources - let mut merged_log_sources: Vec = Vec::new(); - let mut log_source_map: HashMap> = HashMap::new(); + // Merge log sources + let mut merged_log_sources: Vec = Vec::new(); + let mut log_source_map: HashMap> = HashMap::new(); - for log_source_entry in all_log_sources { - let log_source_format = log_source_entry.log_source_format; - let fields = log_source_entry.fields; + for log_source_entry in all_log_sources { + let log_source_format = log_source_entry.log_source_format; + let fields = log_source_entry.fields; - log_source_map - .entry(log_source_format) - .or_default() - .extend(fields); - } + log_source_map + .entry(log_source_format) + .or_default() + .extend(fields); + } - for (log_source_format, fields) in log_source_map { - merged_log_sources.push(LogSourceEntry { - log_source_format, - fields: fields.into_iter().collect(), - }); - } + for (log_source_format, fields) in log_source_map { + merged_log_sources.push(LogSourceEntry { + log_source_format, + fields: fields.into_iter().collect(), + }); + } - let stream_ob_metadata = - serde_json::from_slice::(&stream_metadata_obs[0])?; - let stream_metadata = ObjectStoreFormat { - stats: FullStats::default(), - snapshot: Snapshot::default(), - log_source: merged_log_sources, - ..stream_ob_metadata - }; - - let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into(); - self.put_object( - &stream_json_path(stream_name), - stream_metadata_bytes.clone(), - ) - .await?; + let stream_ob_metadata = + serde_json::from_slice::(&stream_metadata_obs[0])?; + let stream_metadata = ObjectStoreFormat { + stats: FullStats::default(), + snapshot: Snapshot::default(), + log_source: merged_log_sources, + ..stream_ob_metadata + }; - return Ok(stream_metadata_bytes); - } + let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into(); + self.put_object( + &stream_json_path(stream_name), + stream_metadata_bytes.clone(), + ) + .await?; + + return Ok(stream_metadata_bytes); } Ok(Bytes::new()) } @@ -887,10 +886,10 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { if stats_calculated { // perform local sync for the `pstats` dataset task::spawn(async move { - if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME) { - if let Err(err) = stats_stream.flush_and_convert(false, false) { - error!("Failed in local sync for dataset stats stream: {err}"); - } + if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME) + && let Err(err) = stats_stream.flush_and_convert(false, false) + { + error!("Failed in local sync for dataset stats stream: {err}"); } }); }