diff --git a/Cargo.toml b/Cargo.toml
index 013d9ac3d..097897c6f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ build = "build.rs"
 
 [dependencies]
 # Arrow and DataFusion ecosystem
+arrow = "53.0.0"
 arrow-array = { version = "53.0.0" }
 arrow-flight = { version = "53.0.0", features = ["tls"] }
 arrow-ipc = { version = "53.0.0", features = ["zstd"] }
@@ -136,7 +137,6 @@ zip = { version = "2.2.0", default-features = false, features = ["deflate"] }
 
 [dev-dependencies]
 maplit = "1.0"
 rstest = "0.23.0"
-arrow = "53.0.0"
 
 [package.metadata.parseable_ui]
 assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.16/build.zip"
diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs
new file mode 100644
index 000000000..dbb26a78c
--- /dev/null
+++ b/src/alerts/alerts_utils.rs
@@ -0,0 +1,466 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use arrow_array::{Float64Array, Int64Array, RecordBatch};
+use datafusion::{
+    common::tree_node::TreeNode,
+    functions_aggregate::{
+        count::count,
+        expr_fn::avg,
+        min_max::{max, min},
+        sum::sum,
+    },
+    prelude::{col, lit, Expr},
+};
+use tracing::trace;
+
+use crate::{
+    query::{TableScanVisitor, QUERY_SESSION},
+    rbac::{
+        map::SessionKey,
+        role::{Action, Permission},
+        Users,
+    },
+    utils::time::TimeRange,
+};
+
+use super::{
+    AggregateOperation, Aggregations, AlertConfig, AlertError, AlertOperator, AlertState,
+    Conditions, ALERTS,
+};
+
+async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, AlertError> {
+    let session_state = QUERY_SESSION.state();
+    let raw_logical_plan = session_state.create_logical_plan(query).await?;
+
+    let mut visitor = TableScanVisitor::default();
+    let _ = raw_logical_plan.visit(&mut visitor);
+    Ok(visitor)
+}
+
+pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Result<(), AlertError> {
+    let tables = get_tables_from_query(query).await?;
+    let permissions = Users.get_permissions(session_key);
+
+    for table_name in tables.into_inner().iter() {
+        let mut authorized = false;
+
+        // check in permissions whether the user can run a query on this stream
+        // also, while iterating, add any filter tags for this stream
+        for permission in permissions.iter() {
+            match permission {
+                Permission::Stream(Action::All, _) => {
+                    authorized = true;
+                    break;
+                }
+                Permission::StreamWithTag(Action::Query, ref stream, _)
+                    if stream == table_name || stream == "*" =>
+                {
+                    authorized = true;
+                }
+                _ => (),
+            }
+        }
+
+        if !authorized {
+            return Err(AlertError::Unauthorized);
+        }
+    }
+
+    Ok(())
+}
+
+/// accept the alert
+///
+/// alert contains aggregate_config
+///
+/// aggregate_config contains the filters which need to be applied
+///
+/// iterate over each agg config, apply filters, then evaluate for that config
+///
+/// collect the results in the end
+///
+/// check whether notification needs to be triggered or not
+pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
+    trace!("RUNNING EVAL TASK FOR- {alert:?}");
+
+    let (start_time, end_time) = match &alert.eval_type {
+        super::EvalConfig::RollingWindow(rolling_window) => {
+            (&rolling_window.eval_start, &rolling_window.eval_end)
+        }
+    };
+
+    let session_state = QUERY_SESSION.state();
+    let raw_logical_plan = session_state.create_logical_plan(&alert.query).await?;
+
+    // TODO: Filter tags should be taken care of!!!
+    let time_range = TimeRange::parse_human_time(start_time, end_time)
+        .map_err(|err| AlertError::CustomError(err.to_string()))?;
+
+    let query = crate::query::Query {
+        raw_logical_plan,
+        time_range,
+        filter_tag: None,
+    };
+
+    // for now proceed in a similar fashion as we do in query
+    // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
+    let stream_name = if let Some(stream_name) = query.first_table_name() {
+        stream_name
+    } else {
+        return Err(AlertError::CustomError(format!(
+            "Table name not found in query- {}",
+            alert.query
+        )));
+    };
+
+    let base_df = query
+        .get_dataframe(stream_name)
+        .await
+        .map_err(|err| AlertError::CustomError(err.to_string()))?;
+
+    let mut agg_results = vec![];
+
+    let agg_filter_exprs = get_exprs(&alert.aggregate_config);
+
+    let final_res = match &alert.aggregate_config {
+        crate::alerts::Aggregations::AND((agg1, agg2))
+        | crate::alerts::Aggregations::OR((agg1, agg2)) => {
+            for ((agg_expr, filter), agg) in agg_filter_exprs.into_iter().zip([agg1, agg2]) {
+                let filtered_df = if let Some(filter) = filter {
+                    base_df.clone().filter(filter)?
+                } else {
+                    base_df.clone()
+                };
+
+                let aggregated_rows = filtered_df
+                    .aggregate(vec![], vec![agg_expr])?
+                    .collect()
+                    .await?;
+
+                let final_value = get_final_value(aggregated_rows);
+
+                // now compare
+                let res = match &agg.operator {
+                    AlertOperator::GreaterThan => final_value > agg.value,
+                    AlertOperator::LessThan => final_value < agg.value,
+                    AlertOperator::EqualTo => final_value == agg.value,
+                    AlertOperator::NotEqualTo => final_value != agg.value,
+                    AlertOperator::GreaterThanEqualTo => final_value >= agg.value,
+                    AlertOperator::LessThanEqualTo => final_value <= agg.value,
+                    _ => unreachable!(),
+                };
+
+                let message = if res {
+                    if agg.condition_config.is_some() {
+                        Some(
+                            agg.condition_config
+                                .as_ref()
+                                .unwrap()
+                                .generate_filter_message(),
+                        )
+                    } else {
+                        Some(String::default())
+                    }
+                } else {
+                    None
+                };
+
+                agg_results.push((res, message, agg, final_value));
+            }
+
+            let res = match &alert.aggregate_config {
+                Aggregations::AND(_) => agg_results.iter().all(|(r, _, _, _)| *r),
+                Aggregations::OR(_) => agg_results.iter().any(|(r, _, _, _)| *r),
+                _ => unreachable!(),
+            };
+
+            res
+        }
+        crate::alerts::Aggregations::Single(agg) => {
+            let (agg_expr, filter) = &agg_filter_exprs[0];
+            let filtered_df = if let Some(filter) = filter {
+                base_df.filter(filter.clone())?
+            } else {
+                base_df
+            };
+
+            let aggregated_rows = filtered_df
+                .aggregate(vec![], vec![agg_expr.clone()])?
+                .collect()
+                .await?;
+
+            let final_value = get_final_value(aggregated_rows);
+
+            // now compare
+            let res = match &agg.operator {
+                AlertOperator::GreaterThan => final_value > agg.value,
+                AlertOperator::LessThan => final_value < agg.value,
+                AlertOperator::EqualTo => final_value == agg.value,
+                AlertOperator::NotEqualTo => final_value != agg.value,
+                AlertOperator::GreaterThanEqualTo => final_value >= agg.value,
+                AlertOperator::LessThanEqualTo => final_value <= agg.value,
+                _ => unreachable!(),
+            };
+
+            let message = if res {
+                if agg.condition_config.is_some() {
+                    Some(
+                        agg.condition_config
+                            .as_ref()
+                            .unwrap()
+                            .generate_filter_message(),
+                    )
+                } else {
+                    Some(String::default())
+                }
+            } else {
+                None
+            };
+
+            agg_results.push((res, message, agg, final_value));
+
+            res
+        }
+    };
+
+    if final_res {
+        trace!("ALERT!!!!!!");
+
+        let mut message = String::default();
+        for (_, filter_msg, agg_config, final_value) in agg_results {
+            if let Some(msg) = filter_msg {
+                message.extend([format!(
+                    "|{}({}) WHERE ({}) {} {} (ActualValue: {})|",
+                    agg_config.agg,
+                    agg_config.column,
+                    msg,
+                    agg_config.operator,
+                    agg_config.value,
+                    final_value
+                )]);
+            } else {
+                message.extend([format!(
+                    "|{}({}) {} {} (ActualValue: {})",
+                    agg_config.agg,
+                    agg_config.column,
+                    agg_config.operator,
+                    agg_config.value,
+                    final_value
+                )]);
+            }
+        }
+
+        // let outbound_message = format!("AlertName: {}, Triggered TimeStamp: {}, Severity: {}, Message: {}",alert.title, Utc::now().to_rfc3339(), alert.severity, message);
+
+        // update state
+        ALERTS
+            .update_state(&alert.id.to_string(), AlertState::Triggered, Some(message))
+            .await?;
+    } else if alert.state.eq(&AlertState::Triggered) {
+        ALERTS
+            .update_state(&alert.id.to_string(), AlertState::Resolved, Some("".into()))
+            .await?;
+    } else {
+        ALERTS
+            .update_state(&alert.id.to_string(), AlertState::Resolved, None)
+            .await?;
+    }
+
+    Ok(())
+}
+
+fn get_final_value(aggregated_rows: Vec<RecordBatch>) -> f64 {
+    trace!("aggregated_rows-\n{aggregated_rows:?}");
+
+    if let Some(f) = aggregated_rows
+        .first()
+        .and_then(|batch| {
+            trace!("batch.column(0)-\n{:?}", batch.column(0));
+            batch.column(0).as_any().downcast_ref::<Float64Array>()
+        })
+        .map(|array| {
+            trace!("array-\n{array:?}");
+            array.value(0)
+        })
+    {
+        f
+    } else {
+        aggregated_rows
+            .first()
+            .and_then(|batch| {
+                trace!("batch.column(0)-\n{:?}", batch.column(0));
+                batch.column(0).as_any().downcast_ref::<Int64Array>()
+            })
+            .map(|array| {
+                trace!("array-\n{array:?}");
+                array.value(0)
+            })
+            .unwrap_or_default() as f64
+    }
+}
+
+/// This function accepts aggregate_config and
+/// returns a tuple of (aggregate expressions, filter expressions)
+///
+/// It calls get_filter_expr() to get filter expressions
+fn get_exprs(aggregate_config: &Aggregations) -> Vec<(Expr, Option<Expr>)> {
+    let mut agg_expr = Vec::new();
+
+    match aggregate_config {
+        Aggregations::OR((agg1, agg2)) | Aggregations::AND((agg1, agg2)) => {
+            for agg in [agg1, agg2] {
+                let filter_expr = if let Some(where_clause) = &agg.condition_config {
+                    let fe = get_filter_expr(where_clause);
+
+                    trace!("filter_expr-\n{fe:?}");
+
+                    Some(fe)
+                } else {
+                    None
+                };
+
+                let e = match agg.agg {
+                    AggregateOperation::Avg => avg(col(&agg.column)),
+                    AggregateOperation::Count => count(col(&agg.column)),
+                    AggregateOperation::Min => min(col(&agg.column)),
+                    AggregateOperation::Max => max(col(&agg.column)),
+                    AggregateOperation::Sum => sum(col(&agg.column)),
+                };
+                agg_expr.push((e, filter_expr));
+            }
+        }
+        Aggregations::Single(agg) => {
+            let filter_expr = if let Some(where_clause) = &agg.condition_config {
+                let fe = get_filter_expr(where_clause);
+
+                trace!("filter_expr-\n{fe:?}");
+
+                Some(fe)
+            } else {
+                None
+            };
+
+            let e = match agg.agg {
+                AggregateOperation::Avg => avg(col(&agg.column)),
+                AggregateOperation::Count => count(col(&agg.column)),
+                AggregateOperation::Min => min(col(&agg.column)),
+                AggregateOperation::Max => max(col(&agg.column)),
+                AggregateOperation::Sum => sum(col(&agg.column)),
+            };
+            agg_expr.push((e, filter_expr));
+        }
+    }
+    agg_expr
+}
+
+fn get_filter_expr(where_clause: &Conditions) -> Expr {
+    match where_clause {
+        crate::alerts::Conditions::AND((expr1, expr2)) => {
+            let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
+            for e in [expr1, expr2] {
+                let ex = match e.operator {
+                    AlertOperator::GreaterThan => col(&e.column).gt(lit(&e.value)),
+                    AlertOperator::LessThan => col(&e.column).lt(lit(&e.value)),
+                    AlertOperator::EqualTo => col(&e.column).eq(lit(&e.value)),
+                    AlertOperator::NotEqualTo => col(&e.column).not_eq(lit(&e.value)),
+                    AlertOperator::GreaterThanEqualTo => col(&e.column).gt_eq(lit(&e.value)),
+                    AlertOperator::LessThanEqualTo => col(&e.column).lt_eq(lit(&e.value)),
+                    AlertOperator::Like => col(&e.column).like(lit(&e.value)),
+                    AlertOperator::NotLike => col(&e.column).not_like(lit(&e.value)),
+                };
+                expr = expr.and(ex);
+            }
+            expr
+        }
+        crate::alerts::Conditions::OR((expr1, expr2)) => {
+            let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(false)));
+            for e in [expr1, expr2] {
+                let ex = match e.operator {
+                    AlertOperator::GreaterThan => col(&e.column).gt(lit(&e.value)),
+                    AlertOperator::LessThan => col(&e.column).lt(lit(&e.value)),
+                    AlertOperator::EqualTo => col(&e.column).eq(lit(&e.value)),
+                    AlertOperator::NotEqualTo => col(&e.column).not_eq(lit(&e.value)),
+                    AlertOperator::GreaterThanEqualTo => col(&e.column).gt_eq(lit(&e.value)),
+                    AlertOperator::LessThanEqualTo => col(&e.column).lt_eq(lit(&e.value)),
+                    AlertOperator::Like => col(&e.column).like(lit(&e.value)),
+                    AlertOperator::NotLike => col(&e.column).not_like(lit(&e.value)),
+                };
+                expr = expr.or(ex);
+            }
+            expr
+        }
+        crate::alerts::Conditions::Condition(expr) => match expr.operator {
+            AlertOperator::GreaterThan => col(&expr.column).gt(lit(&expr.value)),
+            AlertOperator::LessThan => col(&expr.column).lt(lit(&expr.value)),
+            AlertOperator::EqualTo => col(&expr.column).eq(lit(&expr.value)),
+            AlertOperator::NotEqualTo => col(&expr.column).not_eq(lit(&expr.value)),
+            AlertOperator::GreaterThanEqualTo => col(&expr.column).gt_eq(lit(&expr.value)),
+            AlertOperator::LessThanEqualTo => col(&expr.column).lt_eq(lit(&expr.value)),
+            AlertOperator::Like => col(&expr.column).like(lit(&expr.value)),
+            AlertOperator::NotLike => col(&expr.column).not_like(lit(&expr.value)),
+        },
+    }
+}
+
+pub async fn update_alert_state(
+    alert_id: &str,
+    session_key: &SessionKey,
+    new_state: AlertState,
+) -> Result<(), AlertError> {
+    // check if alert id exists in map
+    let alert = ALERTS.get_alert_by_id(alert_id).await?;
+
+    // validate that the user has access to the tables mentioned
+    user_auth_for_query(session_key, &alert.query).await?;
+
+    // get current state
+    let current_state = ALERTS.get_state(alert_id).await?;
+
+    match current_state {
+        AlertState::Triggered => {
+            if new_state == AlertState::Triggered {
+                let msg = format!("Not allowed to manually go from Triggered to {new_state}");
+                return Err(AlertError::InvalidStateChange(msg));
+            } else {
+                // update state on disk and in memory
+                ALERTS
+                    .update_state(alert_id, new_state, Some("".into()))
+                    .await?;
+            }
+        }
+        AlertState::Silenced => {
+            // from here, the user can only go to Resolved
+            if new_state == AlertState::Resolved {
+                // update state on disk and in memory
+                ALERTS
+                    .update_state(alert_id, new_state, Some("".into()))
+                    .await?;
+            } else {
+                let msg = format!("Not allowed to manually go from Silenced to {new_state}");
+                return Err(AlertError::InvalidStateChange(msg));
+            }
+        }
+        AlertState::Resolved => {
+            // user shouldn't logically be changing states if current state is Resolved
+            let msg = format!("Not allowed to go manually from Resolved to {new_state}");
+            return Err(AlertError::InvalidStateChange(msg));
+        }
+    }
+    Ok(())
+}
diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs
index 121bc6bb8..f8696febb 100644
--- a/src/alerts/mod.rs
+++ b/src/alerts/mod.rs
@@ -16,150 +16,67 @@
  *
  */
 
-use arrow_array::cast::as_string_array;
-use arrow_array::RecordBatch;
-use arrow_schema::DataType;
+use actix_web::http::header::ContentType;
+use actix_web::web::Json;
+use actix_web::{FromRequest, HttpRequest};
+use alerts_utils::user_auth_for_query;
 use async_trait::async_trait;
-use datafusion::arrow::compute::kernels::cast;
-use datafusion::arrow::datatypes::Schema;
-use regex::Regex;
-use serde::{Deserialize, Serialize};
-use std::fmt;
-
-pub mod parser;
-pub mod rule;
+use chrono::Utc;
+use datafusion::common::tree_node::TreeNode;
+use http::StatusCode;
+use itertools::Itertools;
+use once_cell::sync::Lazy;
+use serde_json::Error as SerdeError;
+use std::collections::{HashMap, HashSet};
+use std::fmt::{self, Display};
+use std::future::Future;
+use std::pin::Pin;
+use tokio::sync::oneshot::{Receiver, Sender};
+use tokio::sync::RwLock;
+use tokio::task::JoinHandle;
+use tracing::{trace, warn};
+
+pub mod alerts_utils;
 pub mod target;
 
-use crate::metrics::ALERTS_STATES;
 use crate::option::CONFIG;
-use crate::utils::arrow::get_field;
-use crate::utils::uid;
-use crate::{storage, utils};
+use crate::query::{TableScanVisitor, QUERY_SESSION};
+use crate::rbac::map::SessionKey;
+use crate::storage;
+use crate::storage::ObjectStorageError;
+use crate::sync::schedule_alert_task;
+use crate::utils::time::TimeRange;
+use crate::utils::{get_hash, uid};
 
-pub use self::rule::Rule;
 use self::target::Target;
 
-#[derive(Default, Debug, serde::Serialize, serde::Deserialize)]
-#[serde(rename_all = "camelCase")]
+// these types describe the scheduled task for an alert
+pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>);
+pub type ScheduledTasks = RwLock<HashMap<String, ScheduledTaskHandlers>>;
+
+pub const CURRENT_ALERTS_VERSION: &str = "v1";
+
+pub static ALERTS: Lazy<Alerts> = Lazy::new(Alerts::default);
+
+#[derive(Debug, Default)]
 pub struct Alerts {
-    pub version: AlertVerison,
-    pub alerts: Vec<Alert>,
+    pub alerts: RwLock<Vec<AlertConfig>>,
+    pub scheduled_tasks: ScheduledTasks,
 }
 
-#[derive(Default, Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Default, Debug, serde::Serialize, serde::Deserialize, Clone)]
 #[serde(rename_all = "lowercase")]
 pub enum AlertVerison {
     #[default]
     V1,
 }
 
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct Alert {
-    #[serde(default = "crate::utils::uid::gen")]
-    pub id: uid::Uid,
-    pub name: String,
-    #[serde(flatten)]
-    pub message: Message,
-    pub rule: Rule,
-    pub targets: Vec<Target>,
-}
-
-impl Alert {
-    pub fn check_alert(&self, stream_name: &str, events: RecordBatch) {
-        let resolves = self.rule.resolves(events.clone());
-
-        for (index, state) in resolves.into_iter().enumerate() {
-            match state {
-                AlertState::Listening | AlertState::Firing => (),
-                alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => {
-                    let context = self.get_context(
-                        stream_name.to_owned(),
-                        alert_state,
-                        &self.rule,
-                        events.slice(index, 1),
-                    );
-                    ALERTS_STATES
-                        .with_label_values(&[
-                            context.stream.as_str(),
-                            context.alert_info.alert_name.as_str(),
-                            context.alert_info.alert_state.to_string().as_str(),
-                        ])
-                        .inc();
-                    for target in &self.targets {
-                        target.call(context.clone());
-                    }
-                }
-            }
-        }
-    }
-
-    fn get_context(
-        &self,
-        stream_name: String,
-        alert_state: AlertState,
-        rule: &Rule,
-        event_row: RecordBatch,
-    ) -> Context {
-        let deployment_instance = format!(
-            "{}://{}",
-            CONFIG.parseable.get_scheme(),
-            CONFIG.parseable.address
-        );
-        let deployment_id = storage::StorageMetadata::global().deployment_id;
-        let deployment_mode = storage::StorageMetadata::global().mode.to_string();
-        let mut additional_labels =
-            serde_json::to_value(rule).expect("rule is perfectly deserializable");
-        utils::json::flatten::flatten_with_parent_prefix(&mut additional_labels, "rule", "_")
-            .expect("can be flattened");
-        Context::new(
-            stream_name,
-            AlertInfo::new(
-                self.name.clone(),
-                self.message.get(event_row),
-                rule.trigger_reason(),
-                alert_state,
-            ),
-            DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode),
-            additional_labels,
-        )
-    }
-}
-
-#[derive(Debug, Serialize, Deserialize, Clone)]
-#[serde(rename_all = "camelCase")]
-pub struct Message {
-    pub message: String,
-}
-
-impl Message {
-    // checks if message (with a column name) is valid (i.e. the column name is present in the schema)
-    pub fn valid(&self, schema: &Schema, column: &str) -> bool {
-        get_field(&schema.fields, column).is_some()
-    }
-
-    pub fn extract_column_names(&self) -> Vec<&str> {
-        // the message can have either no column name ({column_name} not present) or any number of {column_name} present
-        Regex::new(r"\{(.*?)\}")
-            .unwrap()
-            .captures_iter(self.message.as_str())
-            .map(|cap| cap.get(1).unwrap().as_str())
-            .collect()
-    }
-
-    /// Returns the message with the column names replaced with the values in the column.
-    fn get(&self, event: RecordBatch) -> String {
-        let mut replace_message = self.message.clone();
-        for column in self.extract_column_names() {
-            if let Some(value) = event.column_by_name(column) {
-                let arr = cast(value, &DataType::Utf8).unwrap();
-                let value = as_string_array(&arr).value(0);
-
-                replace_message =
-                    replace_message.replace(&format!("{{{column}}}"), value.to_string().as_str());
-            }
+impl From<&str> for AlertVerison {
+    fn from(value: &str) -> Self {
+        match value {
+            "v1" => Self::V1,
+            _ => unreachable!()
         }
-        replace_message
     }
 }
 
@@ -170,65 +87,64 @@ pub trait CallableTarget {
 
 #[derive(Debug, Clone)]
 pub struct Context {
-    stream: String,
     alert_info: AlertInfo,
     deployment_info: DeploymentInfo,
-    additional_labels: serde_json::Value,
+    message: String,
 }
 
 impl Context {
-    pub fn new(
-        stream: String,
-        alert_info: AlertInfo,
-        deployment_info: DeploymentInfo,
-        additional_labels: serde_json::Value,
-    ) -> Self {
+    pub fn new(alert_info: AlertInfo, deployment_info: DeploymentInfo, message: String) -> Self {
         Self {
-            stream,
             alert_info,
             deployment_info,
-            additional_labels,
+            message,
         }
     }
 
     fn default_alert_string(&self) -> String {
         format!(
-            "{} triggered on {}\nMessage: {}\nFailing Condition: {}",
+            "AlertName: {}, Triggered TimeStamp: {}, Severity: {}, Message: {}",
            self.alert_info.alert_name,
-            self.stream,
-            self.alert_info.message,
-            self.alert_info.reason
+            Utc::now().to_rfc3339(),
+            self.alert_info.severity,
+            self.message
         )
     }
 
     fn default_resolved_string(&self) -> String {
+        format!("{} is now resolved ", self.alert_info.alert_name)
+    }
+
+    fn default_silenced_string(&self) -> String {
         format!(
-            "{} on {} is now resolved ",
-            self.alert_info.alert_name, self.stream
+            "Notifications for {} have been silenced ",
+            self.alert_info.alert_name
         )
     }
 }
 
 #[derive(Debug, Clone)]
 pub struct AlertInfo {
+    alert_id: String,
     alert_name: String,
-    message: String,
-    reason: String,
+    // message: String,
+    // reason: String,
     alert_state: AlertState,
+    severity: String,
 }
 
 impl AlertInfo {
     pub fn new(
+        alert_id: String,
         alert_name: String,
-        message: String,
-        reason: String,
         alert_state: AlertState,
+        severity: String,
     ) -> Self {
         Self {
+            alert_id,
            alert_name,
-            message,
-            reason,
            alert_state,
+            severity,
        }
    }
 }
@@ -254,27 +170,680 @@ impl DeploymentInfo {
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub enum AlertType {
+    Threshold,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub enum AlertOperator {
+    #[serde(rename = ">")]
+    GreaterThan,
+    #[serde(rename = "<")]
+    LessThan,
+    #[serde(rename = "=")]
+    EqualTo,
+    #[serde(rename = "<>")]
+    NotEqualTo,
+    #[serde(rename = ">=")]
+    GreaterThanEqualTo,
+    #[serde(rename = "<=")]
+    LessThanEqualTo,
+    #[serde(rename = "like")]
+    Like,
+    #[serde(rename = "not like")]
+    NotLike,
+}
+
+impl Display for AlertOperator {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            AlertOperator::GreaterThan => write!(f, ">"),
+            AlertOperator::LessThan => write!(f, "<"),
+            AlertOperator::EqualTo => write!(f, "="),
+            AlertOperator::NotEqualTo => write!(f, "<>"),
+            AlertOperator::GreaterThanEqualTo => write!(f, ">="),
+            AlertOperator::LessThanEqualTo => write!(f, "<="),
+            AlertOperator::Like => write!(f, "like"),
+            AlertOperator::NotLike => write!(f, "not like"),
+        }
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub enum AggregateOperation {
+    Avg,
+    Count,
+    Min,
+    Max,
+    Sum,
+}
+
+impl Display for AggregateOperation {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            AggregateOperation::Avg => write!(f, "Avg"),
+            AggregateOperation::Count => write!(f, "Count"),
+            AggregateOperation::Min => write!(f, "Min"),
+            AggregateOperation::Max => write!(f, "Max"),
+            AggregateOperation::Sum => write!(f, "Sum"),
+        }
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+pub struct OperationConfig {
+    pub column: String,
+    pub operator: Option<String>,
+    pub value: Option<String>,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct FilterConfig {
+    pub conditions: Vec<ConditionConfig>,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+pub struct ConditionConfig {
+    pub column: String,
+    pub operator: AlertOperator,
+    pub value: String,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+pub enum Conditions {
+    AND((ConditionConfig, ConditionConfig)),
+    OR((ConditionConfig, ConditionConfig)),
+    Condition(ConditionConfig),
+}
+
+impl Conditions {
+    pub fn generate_filter_message(&self) -> String {
+        match self {
+            Conditions::AND((expr1, expr2)) => {
+                format!(
+                    "[{} {} {} AND {} {} {}]",
+                    expr1.column,
+                    expr1.operator,
+                    expr1.value,
+                    expr2.column,
+                    expr2.operator,
+                    expr2.value
+                )
+            }
+            Conditions::OR((expr1, expr2)) => {
+                format!(
+                    "[{} {} {} OR {} {} {}]",
+                    expr1.column,
+                    expr1.operator,
+                    expr1.value,
+                    expr2.column,
+                    expr2.operator,
+                    expr2.value
+                )
+            }
+            Conditions::Condition(expr) => {
+                format!("[{} {} {}]", expr.column, expr.operator, expr.value)
+            }
+        }
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct AggregateConfig {
+    pub agg: AggregateOperation,
+    pub condition_config: Option<Conditions>,
+    pub column: String,
+    pub operator: AlertOperator,
+    pub value: f64,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+pub enum Aggregations {
+    AND((AggregateConfig, AggregateConfig)),
+    OR((AggregateConfig, AggregateConfig)),
+    Single(AggregateConfig),
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+pub enum AggregateCondition {
+    AND,
+    OR,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct RollingWindow {
+    // x minutes (25m)
+    pub eval_start: String,
+    // should always be "now"
+    pub eval_end: String,
+    // x minutes (5m)
+    pub eval_frequency: u32,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub enum EvalConfig {
+    RollingWindow(RollingWindow),
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct AlertEval {}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Default)]
+#[serde(rename_all = "camelCase")]
 pub enum AlertState {
-    Listening,
-    SetToFiring,
-    Firing,
+    Triggered,
+    Silenced,
+    #[default]
     Resolved,
 }
 
-impl Default for AlertState {
-    fn default() -> Self {
-        Self::Listening
+impl Display for AlertState {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            AlertState::Triggered => write!(f, "Triggered"),
+            AlertState::Silenced => write!(f, "Silenced"),
+            AlertState::Resolved => write!(f, "Resolved"),
+        }
     }
 }
 
-impl fmt::Display for AlertState {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match *self {
-            AlertState::Listening => write!(f, "Listening"),
-            AlertState::SetToFiring => write!(f, "SetToFiring"),
-            AlertState::Firing => write!(f, "Firing"),
-            AlertState::Resolved => write!(f, "Resolved"),
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Default)]
+#[serde(rename_all = "camelCase")]
+pub enum Severity {
+    Critical,
+    High,
+    #[default]
+    Medium,
+    Low,
+}
+
+impl Display for Severity {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Severity::Critical => write!(f, "Critical (P0)"),
+            Severity::High => write!(f, "High (P1)"),
+            Severity::Medium => write!(f, "Medium (P2)"),
+            Severity::Low => write!(f, "Low (P3)"),
         }
     }
 }
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct AlertRequest {
+    #[serde(default = "Severity::default")]
+    pub severity: Severity,
+    pub title: String,
+    pub query: String,
+    pub alert_type: AlertType,
+    pub aggregate_config: Aggregations,
+    pub eval_type: EvalConfig,
+    pub targets: Vec<Target>,
+}
+
+impl FromRequest for AlertRequest {
+    type Error = actix_web::Error;
+    type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
+
+    fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future {
+        let body = Json::<AlertRequest>::from_request(req, payload);
+        let fut = async move {
+            let body = body.await?.into_inner();
+            Ok(body)
+        };
+
+        Box::pin(fut)
+    }
+}
+
+impl AlertRequest {
+    pub fn modify(self, alert: AlertConfig) -> AlertConfig {
+        AlertConfig {
+            version: alert.version,
+            id: alert.id,
+            severity: alert.severity,
+            title: self.title,
+            query: self.query,
+            alert_type: self.alert_type,
+            aggregate_config: self.aggregate_config,
+            eval_type: self.eval_type,
+            targets: self.targets,
+            state: AlertState::default(),
+        }
+    }
+}
+
+impl From<AlertRequest> for AlertConfig {
+    fn from(val: AlertRequest) -> AlertConfig {
+        AlertConfig {
+            version: AlertVerison::from(CURRENT_ALERTS_VERSION),
+            id: get_hash(Utc::now().timestamp_micros().to_string().as_str()),
+            severity: val.severity,
+            title: val.title,
+            query: val.query,
+            alert_type: val.alert_type,
+            aggregate_config: val.aggregate_config,
+            eval_type: val.eval_type,
+            targets: val.targets,
+            state: AlertState::default(),
+        }
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct AlertConfig {
+    pub version: AlertVerison,
+    pub id: String,
+    pub severity: Severity,
+    pub title: String,
+    pub query: String,
+    pub alert_type: AlertType,
+    pub aggregate_config: Aggregations,
+    pub eval_type: EvalConfig,
+    pub targets: Vec<Target>,
+    // for new alerts, state should be resolved
+    #[serde(default = "AlertState::default")]
+    pub state: AlertState,
+}
+
+impl AlertConfig {
+    /// Validations
+    pub async fn validate<'a>(&'_ self) -> Result<(), AlertError> {
+        // validate evalType
+        let eval_frequency = match &self.eval_type {
+            EvalConfig::RollingWindow(rolling_window) => {
+                if rolling_window.eval_end != "now" {
+                    return Err(AlertError::Metadata("evalEnd should be now"));
+                }
+
+                if humantime::parse_duration(&rolling_window.eval_start).is_err() {
+                    return Err(AlertError::Metadata(
+                        "evalStart should be of type humantime",
+                    ));
+                }
+                rolling_window.eval_frequency
+            }
+        };
+
+        // validate that target repeat notifs !> eval_frequency
+        for target in &self.targets {
+            match &target.timeout.times {
+                target::Retry::Infinite => {}
+                target::Retry::Finite(repeat) => {
+                    let notif_duration = target.timeout.interval * *repeat as u32;
+                    if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) {
+                        return Err(AlertError::Metadata(
+                            "evalFrequency should be greater than target repetition interval",
+                        ));
+                    }
+                }
+            }
+        }
+
+        let session_state = QUERY_SESSION.state();
+        let raw_logical_plan = session_state.create_logical_plan(&self.query).await?;
+
+        // create a visitor to extract the table names present in query
+        let mut visitor = TableScanVisitor::default();
+        let _ = raw_logical_plan.visit(&mut visitor);
+
+        let table = visitor.into_inner().first().unwrap().to_owned();
+
+        let lowercase = self.query.split(&table).collect_vec()[0].to_lowercase();
+
+        if lowercase
+            .strip_prefix(" ")
+            .unwrap_or(&lowercase)
+            .strip_suffix(" ")
+            .unwrap_or(&lowercase)
+            .ne("select * from")
+        {
+            return Err(AlertError::Metadata(
+                "Query needs to be select * from ",
+            ));
+        }
+
+        // TODO: Filter tags should be taken care of!!!
+        let time_range = TimeRange::parse_human_time("1m", "now")
+            .map_err(|err| AlertError::CustomError(err.to_string()))?;
+
+        let query = crate::query::Query {
+            raw_logical_plan,
+            time_range,
+            filter_tag: None,
+        };
+
+        // for now proceed in a similar fashion as we do in query
+        // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
+        let stream_name = if let Some(stream_name) = query.first_table_name() {
+            stream_name
+        } else {
+            return Err(AlertError::CustomError(format!(
+                "Table name not found in query- {}",
+                self.query
+            )));
+        };
+
+        let base_df = query
+            .get_dataframe(stream_name)
+            .await
+            .map_err(|err| AlertError::CustomError(err.to_string()))?;
+
+        // now that we have base_df, verify that it has
+        // columns from aggregate config
+        let columns = self.get_agg_config_cols();
+
+        base_df.select_columns(columns.iter().map(|c| c.as_str()).collect_vec().as_slice())?;
+        Ok(())
+    }
+
+    // collect the columns used by the aggregate config
+    fn get_agg_config_cols(&self) -> HashSet<&String> {
+        let mut columns: HashSet<&String> = HashSet::new();
+        match &self.aggregate_config {
+            Aggregations::AND((agg1, agg2)) | Aggregations::OR((agg1, agg2)) => {
+                columns.insert(&agg1.column);
+                columns.insert(&agg2.column);
+
+                if let Some(condition) = &agg1.condition_config {
+                    columns.extend(self.get_condition_cols(condition));
+                }
+            }
+            Aggregations::Single(agg) => {
+                columns.insert(&agg.column);
+
+                if let Some(condition) = &agg.condition_config {
+                    columns.extend(self.get_condition_cols(condition));
+                }
+            }
+        }
+        columns
+    }
+
+    fn get_condition_cols<'a>(&'a self, condition: &'a Conditions) -> HashSet<&'a String> {
+        let mut columns: HashSet<&String> = HashSet::new();
+        match condition {
+            Conditions::AND((c1, c2)) | Conditions::OR((c1, c2)) => {
+                columns.insert(&c1.column);
+                columns.insert(&c2.column);
+            }
+            Conditions::Condition(c) => {
+                columns.insert(&c.column);
+            }
+        }
+        columns
+    }
+
+    pub fn get_eval_frequency(&self) -> u32 {
+        match &self.eval_type {
+            EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency,
+        }
+    }
+
+    fn get_context(&self) -> Context {
+        let deployment_instance = format!(
+            "{}://{}",
+            CONFIG.parseable.get_scheme(),
+            CONFIG.parseable.address
+        );
+        let deployment_id = storage::StorageMetadata::global().deployment_id;
+        let deployment_mode = storage::StorageMetadata::global().mode.to_string();
+
+        // let additional_labels =
+        //     serde_json::to_value(rule).expect("rule is perfectly deserializable");
+        // let flatten_additional_labels =
+        //     utils::json::flatten::flatten_with_parent_prefix(additional_labels, "rule", "_")
+        //         .expect("can be flattened");
+
+        Context::new(
+            AlertInfo::new(
+                self.id.to_string(),
+                self.title.clone(),
+                self.state,
+                self.severity.clone().to_string(),
+            ),
+            DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode),
+            String::default(),
+        )
+    }
+
+    pub async fn trigger_notifications(&self, message: String) -> Result<(), AlertError> {
+        let mut context = self.get_context();
+        context.message = message;
+        for target in &self.targets {
+            trace!("Target (trigger_notifications)-\n{target:?}");
+            target.call(context.clone());
+        }
+        Ok(())
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum AlertError {
+    #[error("Storage Error: {0}")]
+    ObjectStorage(#[from] ObjectStorageError),
+    #[error("Serde Error: {0}")]
+    Serde(#[from] SerdeError),
+    #[error("Cannot perform this operation: {0}")]
+    Metadata(&'static str),
+    #[error("User is not authorized to run this query")]
+    Unauthorized,
+    #[error("ActixError: {0}")]
+    Error(#[from] actix_web::Error),
+    #[error("DataFusion Error: {0}")]
+    DatafusionError(#[from] datafusion::error::DataFusionError),
+    #[error("Error: {0}")]
+    CustomError(String),
+    #[error("Invalid State Change: {0}")]
+    InvalidStateChange(String),
+}
+
+impl actix_web::ResponseError for AlertError {
+    fn status_code(&self) -> StatusCode {
+        match self {
+            Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            Self::Serde(_) => StatusCode::BAD_REQUEST,
+            Self::Metadata(_) => StatusCode::BAD_REQUEST,
+            Self::Unauthorized => StatusCode::BAD_REQUEST,
+            Self::Error(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            Self::DatafusionError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            Self::CustomError(_) => StatusCode::BAD_REQUEST,
+            Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
+        }
+    }
+
+    fn error_response(&self) -> actix_web::HttpResponse {
+        actix_web::HttpResponse::build(self.status_code())
+            .insert_header(ContentType::plaintext())
+            .body(self.to_string())
+    }
+}
+
+impl Alerts {
+    /// Loads alerts from disk,
+    /// spawns scheduled tasks,
+    /// and evaluates them
+    pub async fn load(&self) -> Result<(), AlertError> {
+        let mut this = vec![];
+        let store = CONFIG.storage().get_object_store();
+        let all_alerts = store.get_alerts().await.unwrap_or_default();
+
+        for alert in all_alerts {
+            if alert.is_empty() {
+                continue;
+            }
+
+            let alert: AlertConfig = serde_json::from_slice(&alert)?;
+
+            let (handle, rx, tx) =
+                schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;
+
+            self.update_task(&alert.id, handle, rx, tx).await;
+
+            this.push(alert);
+        }
+
+        let mut s = self.alerts.write().await;
+        s.append(&mut this.clone());
+        drop(s);
+
+        Ok(())
+    }
+
+    /// Returns a list of alerts that the user has access to (based on query auth)
+    pub async fn list_alerts_for_user(
+        &self,
+        session: SessionKey,
+    ) -> Result<Vec<AlertConfig>, AlertError> {
+        let mut alerts: Vec<AlertConfig> = Vec::new();
+        for alert in self.alerts.read().await.iter() {
+            // filter based on whether the user can execute this query or not
+            let query = &alert.query;
+            if user_auth_for_query(&session, query).await.is_ok() {
+                alerts.push(alert.to_owned());
+            }
+        }
+
+        Ok(alerts)
+    }
+
+    /// Returns a single alert that the user has access to (based on query auth)
+    pub async fn get_alert_by_id(&self, id: &str) -> Result<AlertConfig, AlertError> {
+        let read_access = self.alerts.read().await;
+        let alert = read_access.iter().find(|a| a.id == id);
+
+        if let Some(alert) = alert {
+            Ok(alert.clone())
+        } else {
+            Err(AlertError::CustomError(format!(
+                "No alert found for the given ID- {id}"
+            )))
+        }
+    }
+
+    /// Update the in-mem vector of alerts
+    pub async fn update(&self, alert: &AlertConfig) {
+        let mut s = self.alerts.write().await;
+        s.retain(|a| a.id != alert.id);
+        s.push(alert.clone());
+    }
+
+    /// Update the state of alert
+    pub async fn update_state(
+        &self,
+        alert_id: &str,
+        new_state: AlertState,
+        trigger_notif: Option<String>,
+    ) -> Result<(), AlertError> {
+        let store = CONFIG.storage().get_object_store();
+
+        // read and modify alert
+        let mut alert = self.get_alert_by_id(alert_id).await?;
+        trace!("get alert state by id-\n{}", alert.state);
+
+        alert.state = new_state;
+
+        trace!("new state-\n{}", alert.state);
+
+        // save to disk
+        store.put_alert(alert_id, &alert).await?;
+
+        // modify in memory
+        let mut writer = self.alerts.write().await;
+        let alert_to_update = writer.iter_mut().find(|alert| alert.id == alert_id);
+        if let Some(alert) = alert_to_update {
+            trace!("in memory alert-\n{}", alert.state);
+            alert.state = new_state;
+            trace!("in memory updated alert-\n{}", alert.state);
+        };
+        drop(writer);
+
+        if trigger_notif.is_some() {
+            trace!("trigger notif on-\n{}", alert.state);
+            alert.trigger_notifications(trigger_notif.unwrap()).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Remove alert and scheduled task from disk and memory
+    pub async fn delete(&self, alert_id: &str) -> Result<(), AlertError> {
+        // delete from memory
+        let read_access = self.alerts.read().await;
+
+        let index = read_access
+            .iter()
+            .enumerate()
+            .find(|(_, alert)| alert.id == alert_id)
+            .to_owned();
+
+        if let Some((index, _)) = index {
+            // drop the read access in order to get exclusive write access
+            drop(read_access);
+            self.alerts.write().await.remove(index);
+            trace!("removed alert from memory");
+        } else {
+            warn!("Alert ID- {alert_id} not found in memory!");
+        }
+        Ok(())
+    }
+
+    /// Get state of alert using alert_id
+    pub async fn get_state(&self, alert_id: &str) -> Result<AlertState, AlertError> {
+        let read_access = self.alerts.read().await;
+        let alert = read_access.iter().find(|a| a.id == alert_id);
+
+        if let Some(alert) = alert {
+            Ok(alert.state)
+        } else {
+            let msg = format!("No alert present for ID- {alert_id}");
+            Err(AlertError::CustomError(msg))
+        }
+    }
+
+    /// Update the scheduled alert tasks in-memory map
+    pub async fn update_task(
+        &self,
+        id: &str,
+        handle: JoinHandle<()>,
+        rx: Receiver<()>,
+        tx: Sender<()>,
+    ) {
+        let mut s = self.scheduled_tasks.write().await;
+        s.remove(id);
+        s.insert(id.to_owned(), (handle, rx, tx));
+    }
+
+    /// Remove a scheduled alert task
+    pub async fn delete_task(&self, alert_id: &str) -> Result<(), AlertError> {
+        let read_access = self.scheduled_tasks.read().await;
+
+        let hashed_object = read_access.iter().find(|(id, _)| *id == alert_id);
+
+        if hashed_object.is_some() {
+            // drop the read access in order to get exclusive write access
+            drop(read_access);
+
+            // now delete from hashmap
+            let removed = self.scheduled_tasks.write().await.remove(alert_id);
+
+            if removed.is_none() {
+                trace!("Unable to remove alert task {alert_id} from hashmap");
+            }
+        } else {
+            trace!("Alert task {alert_id} not found in hashmap");
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/alerts/parser.rs b/src/alerts/parser.rs
index 562c14b07..98f9ca871 100644
--- a/src/alerts/parser.rs
+++ b/src/alerts/parser.rs
@@ -229,7 +229,7 @@ impl FromStr for CompositeRule {
 mod tests {
     use std::str::FromStr;
 
-    use crate::alerts::rule::{
+    use crate::handlers::http::alerts::rule::{
         base::{
             ops::{NumericOperator, StringOperator},
            NumericRule, StringRule,
@@ -237,6 +237,8 @@ mod tests {
         CompositeRule,
     };
 
+
+
     #[test]
     fn test_and_or_not() {
         let input = r#"key=500 and key="value" or !(key=300)"#;
diff --git a/src/alerts/target.rs b/src/alerts/target.rs
index c7e2c7586..664291aec 100644
--- a/src/alerts/target.rs
+++ b/src/alerts/target.rs
@@ -28,9 +28,9 @@ use chrono::Utc;
 use http::{header::AUTHORIZATION, HeaderMap, HeaderValue};
 use humantime_serde::re::humantime;
 use reqwest::ClientBuilder;
-use tracing::error;
+use tracing::{error, trace, warn};
 
-use crate::utils::json;
+use super::ALERTS;
 
 use super::{AlertState, CallableTarget, Context};
 
@@ -42,7 +42,13 @@ pub enum Retry {
     Finite(usize),
 }
 
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+impl Default for Retry {
+    fn default() -> Self {
+        Retry::Finite(1)
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
 #[serde(rename_all = "lowercase")]
 #[serde(try_from = "TargetVerifier")]
 pub struct Target {
@@ -54,24 +60,29 @@ pub struct Target {
 
 impl Target {
     pub fn call(&self, context: Context) {
+        trace!("target.call context- {context:?}");
         let timeout = &self.timeout;
         let resolves = context.alert_info.alert_state;
         let mut state = timeout.state.lock().unwrap();
+        trace!("target.call state- {state:?}");
+
+        state.alert_state = resolves;
 
         match resolves {
-            AlertState::SetToFiring => {
-                state.alert_state = AlertState::Firing;
+            AlertState::Triggered => {
                 if !state.timed_out {
+                    // call once and then start sleeping
+                    // reduce repeats by 1
+                    call_target(self.target.clone(), context.clone());
+                    trace!("state not timed out- {state:?}");
                     // set state
                     state.timed_out = true;
                     state.awaiting_resolve = true;
                    drop(state);
 
                    self.spawn_timeout_task(timeout, context.clone());
-                    call_target(self.target.clone(), context)
                 }
             }
-            AlertState::Resolved => {
-                state.alert_state = AlertState::Listening;
+            alert_state @ (AlertState::Resolved | AlertState::Silenced) => {
+                state.alert_state = alert_state;
                 if state.timed_out {
                     // if in timeout and resolve came in, only process if it's the first one ( awaiting resolve )
                     if state.awaiting_resolve {
@@ -84,63 +95,88 @@ impl Target {
 
                 call_target(self.target.clone(), context);
             }
-            _ => unreachable!(),
         }
     }
 
-    fn spawn_timeout_task(&self, repeat: &Timeout, alert_context: Context) {
-        let state = Arc::clone(&repeat.state);
-        let retry = repeat.times;
-        let timeout = repeat.interval;
+    fn spawn_timeout_task(&self, target_timeout: &Timeout, alert_context: Context) {
+        trace!("repeat-\n{target_timeout:?}");
+        let state = Arc::clone(&target_timeout.state);
+        let retry = target_timeout.times;
+        let timeout = target_timeout.interval;
         let target = self.target.clone();
+        let alert_id = alert_context.alert_info.alert_id.clone();
 
-        let sleep_and_check_if_call = move |timeout_state: Arc<Mutex<TimeoutState>>| {
-            async move {
-                tokio::time::sleep(timeout).await;
-                let mut state = timeout_state.lock().unwrap();
-                if state.alert_state == AlertState::Firing {
-                    // it is still firing .. sleep more and come back
-                    state.awaiting_resolve = true;
-                    true
-                } else {
-                    state.timed_out = false;
-                    false
+        let sleep_and_check_if_call =
+            move |timeout_state: Arc<Mutex<TimeoutState>>, current_state: AlertState| {
+                async move {
+                    tokio::time::sleep(timeout).await;
+
+                    let mut state = timeout_state.lock().unwrap();
+
+                    if current_state == AlertState::Triggered {
+                        // it is still firing .. sleep more and come back
+                        state.awaiting_resolve = true;
+                        true
+                    } else {
+                        state.timed_out = false;
+                        false
+                    }
                 }
-            }
-        };
+            };
 
-        actix_web::rt::spawn(async move {
+        trace!("Spawning retry task");
+        tokio::spawn(async move {
             match retry {
                 Retry::Infinite => loop {
-                    let should_call = sleep_and_check_if_call(Arc::clone(&state)).await;
+                    let current_state = if let Ok(state) = ALERTS.get_state(&alert_id).await {
+                        state
+                    } else {
+                        *state.lock().unwrap() = TimeoutState::default();
+                        warn!("Unable to fetch state for given alert_id- {alert_id}, stopping target notifs");
+                        return;
+                    };
+
+                    let should_call =
+                        sleep_and_check_if_call(Arc::clone(&state), current_state).await;
                     if should_call {
                         call_target(target.clone(), alert_context.clone())
                     }
                 },
                 Retry::Finite(times) => {
-                    for _ in 0..times {
-                        let should_call = sleep_and_check_if_call(Arc::clone(&state)).await;
+                    for _ in 0..(times - 1) {
+                        let current_state = if let Ok(state) = ALERTS.get_state(&alert_id).await {
+                            state
+                        } else {
+                            *state.lock().unwrap() = TimeoutState::default();
+                            warn!("Unable to fetch state for given alert_id- {alert_id}, stopping target notifs");
+                            return;
+                        };
+
+                        let should_call =
+                            sleep_and_check_if_call(Arc::clone(&state), current_state).await;
                         if should_call {
                            call_target(target.clone(), alert_context.clone())
                        }
                    }
 
-                    // fallback for if this task only observed FIRING on all RETRIES
-                    // Stream might be dead and sending too many alerts is not great
-                    // Send and alert stating that this alert will only work once it has seen a RESOLVE
-                    state.lock().unwrap().timed_out = false;
-                    let mut context = alert_context;
-                    context.alert_info.message = format!(
-                        "Triggering alert did not resolve itself after {times} retries, This alert is paused until it resolves");
-                    // Send and exit this task.
-                    call_target(target, context);
+                    // // fallback for if this task only observed FIRING on all RETRIES
+                    // // Stream might be dead and sending too many alerts is not great
+                    // // Send and alert stating that this alert will only work once it has seen a RESOLVE
+                    // state.lock().unwrap().timed_out = false;
+                    // let context = alert_context;
+                    // // context.alert_info.message = format!(
+                    // //     "Triggering alert did not resolve itself after {times} retries, This alert is paused until it resolves");
+                    // // Send and exit this task.
+                    // call_target(target, context);
                 }
             }
+            *state.lock().unwrap() = TimeoutState::default();
         });
     }
 }
 
 fn call_target(target: TargetType, context: Context) {
-    actix_web::rt::spawn(async move { target.call(&context).await });
+    trace!("Calling target with context- {context:?}");
+    tokio::spawn(async move { target.call(&context).await });
 }
 
 #[derive(Debug, serde::Deserialize)]
@@ -230,13 +266,15 @@ impl CallableTarget for SlackWebHook {
             .expect("Client can be constructed on this system");
 
         let alert = match payload.alert_info.alert_state {
-            AlertState::SetToFiring => {
+            AlertState::Triggered => {
                 serde_json::json!({ "text": payload.default_alert_string() })
             }
             AlertState::Resolved => {
                 serde_json::json!({ "text": payload.default_resolved_string() })
             }
-            _ => unreachable!(),
+            AlertState::Silenced => {
+                serde_json::json!({ "text": payload.default_silenced_string() })
+            }
         };
 
         if let Err(e) = client.post(&self.endpoint).json(&alert).send().await {
@@ -268,9 +306,9 @@ impl CallableTarget for OtherWebHook {
             .expect("Client can be constructed on this system");
 
         let alert = match payload.alert_info.alert_state {
-            AlertState::SetToFiring => payload.default_alert_string(),
+            AlertState::Triggered => payload.default_alert_string(),
             AlertState::Resolved => payload.default_resolved_string(),
-            _ => unreachable!(),
+            AlertState::Silenced => payload.default_silenced_string(),
         };
 
         let request = client
@@ -318,33 +356,33 @@ impl CallableTarget for AlertManager {
         let mut alerts = serde_json::json!([{
             "labels": {
                 "alertname": payload.alert_info.alert_name,
-                "stream": payload.stream,
+                // "stream": payload.stream,
                 "deployment_instance": payload.deployment_info.deployment_instance,
                 "deployment_id": payload.deployment_info.deployment_id,
                 "deployment_mode": payload.deployment_info.deployment_mode
             },
             "annotations": {
-                "message": payload.alert_info.message,
-                "reason": payload.alert_info.reason
+                "message": "MESSAGE",
+                "reason": "REASON"
             }
         }]);
 
         let alert = &mut alerts[0];
 
-        alert["labels"].as_object_mut().expect("is object").extend(
-            payload
-                .additional_labels
-                .as_object()
-                .expect("is object")
-                .iter()
-                // filter non null values for alertmanager and only pass strings
-                .filter(|(_, value)| !value.is_null())
-                .map(|(k, value)| (k.to_owned(), json::convert_to_string(value))),
-        );
+        // alert["labels"].as_object_mut().expect("is object").extend(
+        //     payload
+        //         .additional_labels
+        //         .as_object()
+        //         .expect("is object")
+        //         .iter()
+        //         // filter non null values for alertmanager and only pass strings
+        //         .filter(|(_, value)| !value.is_null())
+        //         .map(|(k, value)| (k.to_owned(), json::convert_to_string(value))),
+        // );
 
         // fill in status label accordingly
         match payload.alert_info.alert_state {
-            AlertState::SetToFiring => alert["labels"]["status"] = "firing".into(),
+            AlertState::Triggered => alert["labels"]["status"] = "triggered".into(),
             AlertState::Resolved => {
                 alert["labels"]["status"] = "resolved".into();
                 alert["annotations"]["reason"] =
@@ -353,7 +391,14 @@ impl CallableTarget for AlertManager {
                     .to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
                     .into();
             }
-            _ => unreachable!(),
+            AlertState::Silenced => {
+                alert["labels"]["status"] = "silenced".into();
+                alert["annotations"]["reason"] =
+                    serde_json::Value::String(payload.default_silenced_string());
+                // alert["endsAt"] = Utc::now()
+                //     .to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
+                //     .into();
+            }
         };
 
         if let Err(e) = client.post(&self.endpoint).json(&alerts).send().await {
@@ -362,10 +407,11 @@ impl CallableTarget for AlertManager {
     }
 }
 
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
 pub struct Timeout {
     #[serde(with = "humantime_serde")]
     pub interval: Duration,
+    #[serde(default = "Retry::default")]
     pub times: Retry,
     #[serde(skip)]
     pub state: Arc<Mutex<TimeoutState>>,
@@ -374,8 +420,8 @@ pub struct Timeout {
 impl Default for Timeout {
     fn default() -> Self {
         Self {
-            interval: Duration::from_secs(200),
-            times: Retry::Finite(5),
+            interval: Duration::from_secs(60),
+            times: Retry::default(),
             state: Arc::<Mutex<TimeoutState>>::default(),
         }
     }
diff --git a/src/analytics.rs b/src/analytics.rs
index 85b14b87a..5a2f218b6 100644
--- a/src/analytics.rs
+++ b/src/analytics.rs
@@ -290,7 +290,7 @@ async fn build_metrics() -> HashMap {
 }
 
 pub fn init_analytics_scheduler() -> anyhow::Result<()> {
-    info!("Setting up schedular for anonymous user analytics");
+    info!("Setting up scheduler for anonymous user analytics");
 
     let mut scheduler = AsyncScheduler::new();
     scheduler
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 2e9bc7359..78f70326a 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -24,7 +24,6 @@ use arrow_array::RecordBatch;
 use arrow_schema::{Field, Fields, Schema};
 use itertools::Itertools;
 use std::sync::Arc;
-use tracing::error;
 
 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
@@ -88,13 +87,6 @@ impl Event {
 
         crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
 
-        if let Err(e) = metadata::STREAM_INFO
-            .check_alerts(&self.stream_name, &self.rb)
-            .await
-        {
-            error!("Error checking for alerts. {:?}", e);
-        }
-
         Ok(())
     }
diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs
new file mode 100644
index 000000000..c5b7069af
--- /dev/null
+++ b/src/handlers/http/alerts.rs
@@ -0,0 +1,216 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use crate::{
+    option::CONFIG,
+    storage::{object_storage::alert_json_path, ALERTS_ROOT_DIRECTORY},
+    sync::schedule_alert_task,
+    utils::actix::extract_session_key_from_req,
+};
+use actix_web::{web, HttpRequest, Responder};
+use bytes::Bytes;
+use relative_path::RelativePathBuf;
+
+use crate::alerts::{
+    alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertRequest, AlertState, ALERTS,
+};
+
+// GET /alerts
+/// User needs at least read access to the stream(s) referenced in an alert
+/// Read all alerts then return alerts which satisfy the condition
+pub async fn list(req: HttpRequest) -> Result<impl Responder, AlertError> {
+    let session_key = extract_session_key_from_req(&req)?;
+    let alerts = ALERTS.list_alerts_for_user(session_key).await?;
+
+    Ok(web::Json(alerts))
+}
+
+// POST /alerts
+pub async fn post(req: HttpRequest, alert: AlertRequest) -> Result<impl Responder, AlertError> {
+    let alert: AlertConfig = alert.into();
+    alert.validate().await?;
+
+    // validate the incoming alert query
+    // does the user have access to these tables or not?
+ let session_key = extract_session_key_from_req(&req)?; + user_auth_for_query(&session_key, &alert.query).await?; + + // create scheduled tasks + let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; + + // now that we've validated that the user can run this query + // move on to saving the alert in ObjectStore + ALERTS.update(&alert).await; + + let path = alert_json_path(&alert.id.to_string()); + + let store = CONFIG.storage().get_object_store(); + let alert_bytes = serde_json::to_vec(&alert)?; + store.put_object(&path, Bytes::from(alert_bytes)).await?; + + ALERTS.update_task(&alert.id, handle, rx, tx).await; + + Ok(web::Json(alert)) +} + +// GET /alerts/{alert_id} +pub async fn get(req: HttpRequest) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let id = req + .match_info() + .get("alert_id") + .ok_or(AlertError::Metadata("No alert ID Provided"))?; + + let alert = ALERTS.get_alert_by_id(id).await?; + // validate that the user has access to the tables mentioned + user_auth_for_query(&session_key, &alert.query).await?; + + Ok(web::Json(alert)) +} + +// DELETE /alerts/{alert_id} +/// Deletion should happen from disk, sheduled tasks, then memory +pub async fn delete(req: HttpRequest) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = req + .match_info() + .get("alert_id") + .ok_or(AlertError::Metadata("No alert ID Provided"))?; + + let alert = ALERTS.get_alert_by_id(alert_id).await?; + + // validate that the user has access to the tables mentioned + user_auth_for_query(&session_key, &alert.query).await?; + + let store = CONFIG.storage().get_object_store(); + let alert_path = alert_json_path(alert_id); + + // delete from disk + store + .delete_object(&alert_path) + .await + .map_err(AlertError::ObjectStorage)?; + + // delete from disk and memory + ALERTS.delete(alert_id).await?; + + // delete the scheduled task + ALERTS.delete_task(alert_id).await?; + + Ok(format!("Deleted alert with ID- {alert_id}")) +} + +// PUT /alerts/{alert_id} +/// first save on disk, then in memory +/// then modify scheduled task +pub async fn modify(req: HttpRequest, alert: AlertRequest) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = req + .match_info() + .get("alert_id") + .ok_or(AlertError::Metadata("No alert ID Provided"))?; + + // check if alert id exists in map + let old_alert = ALERTS.get_alert_by_id(alert_id).await?; + + // validate that the user has access to the tables mentioned + // in the old as well as the modified alert + user_auth_for_query(&session_key, &old_alert.query).await?; + user_auth_for_query(&session_key, &alert.query).await?; + + let store = CONFIG.storage().get_object_store(); + + // fetch the alert object for the relevant ID + let old_alert_config: AlertConfig = serde_json::from_slice( + &store + .get_object(&RelativePathBuf::from_iter([ + ALERTS_ROOT_DIRECTORY, + &format!("{alert_id}.json"), + ])) + .await?, + )?; + + let alert = alert.modify(old_alert_config); + alert.validate().await?; + + // modify task + let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; + + // modify on disk + store.put_alert(&alert.id.to_string(), &alert).await?; + + // modify in memory + ALERTS.update(&alert).await; + + ALERTS.update_task(&alert.id, handle, rx, tx).await; + + Ok(web::Json(alert)) +} + +// PUT /alerts/{alert_id}/update_state +pub async fn update_state(req: HttpRequest, state: String) -> Result { + let session_key = 
extract_session_key_from_req(&req)?; + let alert_id = req + .match_info() + .get("alert_id") + .ok_or(AlertError::Metadata("No alert ID Provided"))?; + + // check if alert id exists in map + let alert = ALERTS.get_alert_by_id(alert_id).await?; + + // validate that the user has access to the tables mentioned + user_auth_for_query(&session_key, &alert.query).await?; + + // get current state + let current_state = ALERTS.get_state(alert_id).await?; + + let new_state: AlertState = serde_json::from_str(&state)?; + + match current_state { + AlertState::Triggered => { + if new_state == AlertState::Triggered { + let msg = format!("Not allowed to manually go from Triggered to {new_state}"); + return Err(AlertError::InvalidStateChange(msg)); + } else { + // update state on disk and in memory + ALERTS + .update_state(alert_id, new_state, Some("".into())) + .await?; + } + } + AlertState::Silenced => { + // from here, the user can only go to Resolved + if new_state == AlertState::Resolved { + // update state on disk and in memory + ALERTS + .update_state(alert_id, new_state, Some("".into())) + .await?; + } else { + let msg = format!("Not allowed to manually go from Silenced to {new_state}"); + return Err(AlertError::InvalidStateChange(msg)); + } + } + AlertState::Resolved => { + // user shouldn't logically be changing states if current state is Resolved + let msg = format!("Not allowed to go manually from Resolved to {new_state}"); + return Err(AlertError::InvalidStateChange(msg)); + } + } + + Ok("") +} diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 4e936c79d..0b8ff51ef 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -776,8 +776,8 @@ async fn fetch_cluster_metrics() -> Result, PostError> { Ok(dresses) } -pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { - info!("Setting up schedular for cluster metrics ingestion"); +pub fn init_cluster_metrics_scheduler() -> Result<(), PostError> { + info!("Setting up scheduler for cluster metrics ingestion"); let mut scheduler = AsyncScheduler::new(); scheduler .every(CLUSTER_METRICS_INTERVAL_SECONDS) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index cafe56190..61e51a78d 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -24,7 +24,6 @@ use super::modal::utils::logstream_utils::{ create_stream_and_schema_from_storage, create_update_stream, }; use super::query::update_schema_when_distributed; -use crate::alerts::Alerts; use crate::catalog::get_first_event; use crate::event::format::override_data_type; use crate::handlers::STREAM_TYPE_KEY; @@ -157,39 +156,6 @@ pub async fn schema(req: HttpRequest) -> Result { } } -pub async fn get_alert(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - let alerts = metadata::STREAM_INFO - .read() - .expect(metadata::LOCK_EXPECT) - .get(&stream_name) - .map(|metadata| { - serde_json::to_value(&metadata.alerts).expect("alerts can serialize to valid json") - }); - - let mut alerts = match alerts { - Some(alerts) => alerts, - None => { - let alerts = CONFIG - .storage() - .get_object_store() - .get_alerts(&stream_name) - .await?; - - if alerts.alerts.is_empty() { - return Err(StreamError::NoAlertsSet); - } - - serde_json::to_value(alerts).expect("alerts can serialize to valid json") - } - }; - - remove_id_from_alerts(&mut alerts); - - Ok((web::Json(alerts), StatusCode::OK)) -} - pub async fn put_stream(req: 
 pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
@@ -198,73 +164,6 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
-pub async fn put_alert(
-    req: HttpRequest,
-    body: web::Json<serde_json::Value>,
-) -> Result<impl Responder, StreamError> {
-    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-
-    let mut body = body.into_inner();
-    remove_id_from_alerts(&mut body);
-
-    let alerts: Alerts = match serde_json::from_value(body) {
-        Ok(alerts) => alerts,
-        Err(err) => {
-            return Err(StreamError::BadAlertJson {
-                stream: stream_name,
-                err,
-            })
-        }
-    };
-
-    validator::alert(&alerts)?;
-
-    if !STREAM_INFO.stream_initialized(&stream_name)? {
-        // For query mode, if the stream not found in memory map,
-        //check if it exists in the storage
-        //create stream and schema from storage
-        if CONFIG.parseable.mode == Mode::Query {
-            match create_stream_and_schema_from_storage(&stream_name).await {
-                Ok(true) => {}
-                Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
-            }
-        } else {
-            return Err(StreamError::UninitializedLogstream);
-        }
-    }
-
-    let schema = STREAM_INFO.schema(&stream_name)?;
-    for alert in &alerts.alerts {
-        for column in alert.message.extract_column_names() {
-            let is_valid = alert.message.valid(&schema, column);
-            if !is_valid {
-                return Err(StreamError::InvalidAlertMessage(
-                    alert.name.to_owned(),
-                    column.to_string(),
-                ));
-            }
-            if !alert.rule.valid_for_schema(&schema) {
-                return Err(StreamError::InvalidAlert(alert.name.to_owned()));
-            }
-        }
-    }
-
-    CONFIG
-        .storage()
-        .get_object_store()
-        .put_alerts(&stream_name, &alerts)
-        .await?;
-
-    metadata::STREAM_INFO
-        .set_alert(&stream_name, alerts)
-        .expect("alerts set on existing stream");
-
-    Ok((
-        format!("set alert configuration for log stream {stream_name}"),
-        StatusCode::OK,
-    ))
-}
-
 pub async fn get_retention(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     if !STREAM_INFO.stream_exists(&stream_name) {
@@ -473,17 +372,6 @@ pub fn first_event_at_empty(stream_name: &str) -> bool {
     true
 }
 
-fn remove_id_from_alerts(value: &mut Value) {
-    if let Some(Value::Array(alerts)) = value.get_mut("alerts") {
-        alerts
-            .iter_mut()
-            .map_while(|alert| alert.as_object_mut())
-            .for_each(|map| {
-                map.remove("id");
-            });
-    }
-}
-
 pub async fn create_stream(
     stream_name: String,
     time_partition: &str,
diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs
index 8d8db14c9..c60c7c739 100644
--- a/src/handlers/http/mod.rs
+++ b/src/handlers/http/mod.rs
@@ -28,6 +28,7 @@ use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT};
 use self::{cluster::get_ingestor_info, query::Query};
 
 pub mod about;
+pub mod alerts;
 mod audit;
 pub mod cluster;
 pub mod correlation;
diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index 0304cffce..bc0604970 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -16,10 +16,11 @@
  *
  */
 
+use crate::alerts::ALERTS;
 use crate::correlation::CORRELATIONS;
 use crate::handlers::airplane;
 use crate::handlers::http::base_path;
-use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
+use crate::handlers::http::cluster::{self, init_cluster_metrics_scheduler};
 use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
 use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
 use crate::handlers::http::{self, role};
@@ -64,6 +65,7 @@ impl ParseableServer for QueryServer {
                 .service(Server::get_oauth_webscope(oidc_client))
                 .service(Self::get_user_role_webscope())
                 .service(Server::get_metrics_webscope())
+                .service(Server::get_alerts_webscope())
                 .service(Self::get_cluster_web_scope()),
         )
         .service(Server::get_generated());
@@ -97,8 +99,18 @@ impl ParseableServer for QueryServer {
         if let Err(e) = CORRELATIONS.load().await {
             error!("{e}");
         }
-        FILTERS.load().await?;
-        DASHBOARDS.load().await?;
+        if let Err(err) = FILTERS.load().await {
+            error!("{err}")
+        }
+
+        if let Err(err) = DASHBOARDS.load().await {
+            error!("{err}")
+        }
+
+        if let Err(err) = ALERTS.load().await {
+            error!("{err}")
+        }
 
         // track all parquet files already in the data directory
         storage::retention::load_retention_from_global();
@@ -108,7 +120,7 @@ impl ParseableServer for QueryServer {
             analytics::init_analytics_scheduler()?;
         }
 
-        if matches!(init_cluster_metrics_schedular(), Ok(())) {
+        if matches!(init_cluster_metrics_scheduler(), Ok(())) {
             info!("Cluster metrics scheduler started successfully");
         }
         if let Some(hot_tier_manager) = HotTierManager::global() {
@@ -283,21 +295,6 @@ impl QueryServer {
                     .authorize_for_stream(Action::GetStreamInfo),
             ),
         )
-        .service(
-            web::resource("/alert")
-                // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream
-                .route(
-                    web::put()
-                        .to(logstream::put_alert)
-                        .authorize_for_stream(Action::PutAlert),
-                )
-                // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream
-                .route(
-                    web::get()
-                        .to(logstream::get_alert)
-                        .authorize_for_stream(Action::GetAlert),
-                ),
-        )
         .service(
             // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
             web::resource("/schema").route(
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index 556a77123..99419ad56 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -16,10 +16,12 @@
  *
  */
 
+use crate::alerts::ALERTS;
 use crate::analytics;
 use crate::correlation::CORRELATIONS;
 use crate::handlers;
 use crate::handlers::http::about;
+use crate::handlers::http::alerts;
 use crate::handlers::http::base_path;
 use crate::handlers::http::health_check;
 use crate::handlers::http::query;
@@ -79,7 +81,8 @@ impl ParseableServer for Server {
             .service(Self::get_llm_webscope())
             .service(Self::get_oauth_webscope(oidc_client))
             .service(Self::get_user_role_webscope())
-            .service(Self::get_metrics_webscope()),
+            .service(Self::get_metrics_webscope())
+            .service(Self::get_alerts_webscope()),
     )
     .service(Self::get_ingest_otel_factory())
     .service(Self::get_generated());
@@ -103,8 +106,17 @@ impl ParseableServer for Server {
         if let Err(e) = CORRELATIONS.load().await {
             error!("{e}");
         }
-        FILTERS.load().await?;
-        DASHBOARDS.load().await?;
+        if let Err(err) = FILTERS.load().await {
+            error!("{err}")
+        }
+
+        if let Err(err) = DASHBOARDS.load().await {
+            error!("{err}")
+        }
+
+        if let Err(err) = ALERTS.load().await {
+            error!("{err}")
+        }
 
         storage::retention::load_retention_from_global();
 
@@ -202,6 +214,32 @@ impl Server {
         )
     }
 
+    pub fn get_alerts_webscope() -> Scope {
+        web::scope("/alerts")
+            .service(
+                web::resource("")
+                    .route(web::get().to(alerts::list).authorize(Action::GetAlert))
+                    .route(web::post().to(alerts::post).authorize(Action::PutAlert)),
+            )
+            .service(
+                web::resource("/{alert_id}")
+                    .route(web::get().to(alerts::get).authorize(Action::GetAlert))
+                    .route(web::put().to(alerts::modify).authorize(Action::PutAlert))
+                    .route(
+                        web::delete()
+                            .to(alerts::delete)
+                            .authorize(Action::DeleteAlert),
+                    ),
+            )
+            .service(
+                web::resource("/{alert_id}/update_state").route(
+                    web::put()
+                        .to(alerts::update_state)
+                        .authorize(Action::PutAlert),
+                ),
+            )
+    }
+
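For quick reference, the scope defined above exposes the following routes (paths relative to wherever the scope is mounted; RBAC action in parentheses):

```
GET    /alerts                         -> alerts::list          (GetAlert)
POST   /alerts                         -> alerts::post          (PutAlert)
GET    /alerts/{alert_id}              -> alerts::get           (GetAlert)
PUT    /alerts/{alert_id}              -> alerts::modify        (PutAlert)
DELETE /alerts/{alert_id}              -> alerts::delete        (DeleteAlert)
PUT    /alerts/{alert_id}/update_state -> alerts::update_state  (PutAlert)
```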
     // get the dashboards web scope
     pub fn get_dashboards_webscope() -> Scope {
         web::scope("/dashboards")
@@ -323,21 +361,6 @@ impl Server {
                     .authorize_for_stream(Action::GetStreamInfo),
             ),
         )
-        .service(
-            web::resource("/alert")
-                // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream
-                .route(
-                    web::put()
-                        .to(logstream::put_alert)
-                        .authorize_for_stream(Action::PutAlert),
-                )
-                // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream
-                .route(
-                    web::get()
-                        .to(logstream::get_alert)
-                        .authorize_for_stream(Action::GetAlert),
-                ),
-        )
         .service(
             // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
             web::resource("/schema").route(
diff --git a/src/lib.rs b/src/lib.rs
index 973406cb5..52a422520 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -17,7 +17,7 @@
  */
 
 pub mod about;
-mod alerts;
+pub mod alerts;
 pub mod analytics;
 pub mod audit;
 pub mod banner;
diff --git a/src/metadata.rs b/src/metadata.rs
index 94954c4c0..cdd51898b 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -16,7 +16,6 @@
  *
  */
 
-use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
 use chrono::{Local, NaiveDateTime};
 use itertools::Itertools;
@@ -27,8 +26,7 @@ use std::collections::HashMap;
 use std::num::NonZeroU32;
 use std::sync::{Arc, RwLock};
 
-use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
-use crate::alerts::Alerts;
+use self::error::stream_info::{LoadError, MetadataError};
 use crate::catalog::snapshot::ManifestItem;
 use crate::metrics::{
     fetch_stats_from_storage, EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE,
@@ -65,7 +63,6 @@ pub enum SchemaVersion {
 pub struct LogStreamMetadata {
     pub schema_version: SchemaVersion,
     pub schema: HashMap<String, Arc<Field>>,
-    pub alerts: Alerts,
     pub retention: Option<Retention>,
     pub created_at: String,
     pub first_event_at: Option<String>,
@@ -87,32 +84,11 @@ pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding
 // 4. When first event is sent to stream (update the schema)
 // 5. When set alert API is called (update the alert)
 impl StreamInfo {
-    pub async fn check_alerts(
-        &self,
-        stream_name: &str,
-        rb: &RecordBatch,
-    ) -> Result<(), CheckAlertError> {
-        let map = self.read().expect(LOCK_EXPECT);
-        let meta = map
-            .get(stream_name)
-            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?;
-
-        for alert in &meta.alerts.alerts {
-            alert.check_alert(stream_name, rb.clone())
-        }
-
-        Ok(())
-    }
-
     pub fn stream_exists(&self, stream_name: &str) -> bool {
         let map = self.read().expect(LOCK_EXPECT);
         map.contains_key(stream_name)
     }
 
-    pub fn stream_initialized(&self, stream_name: &str) -> Result<bool, MetadataError> {
-        Ok(!self.schema(stream_name)?.fields.is_empty())
-    }
-
     pub fn get_first_event(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
         let map = self.read().expect(LOCK_EXPECT);
         map.get(stream_name)
@@ -185,15 +161,6 @@ impl StreamInfo {
         Ok(Arc::new(schema))
     }
 
-    pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> {
-        let mut map = self.write().expect(LOCK_EXPECT);
-        map.get_mut(stream_name)
-            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
-            .map(|metadata| {
-                metadata.alerts = alerts;
-            })
-    }
-
     pub fn set_retention(
         &self,
         stream_name: &str,
@@ -444,7 +411,6 @@ pub async fn load_stream_metadata_on_server_start(
     fetch_stats_from_storage(stream_name, stats).await;
     load_daily_metrics(&snapshot.manifest_list, stream_name);
 
-    let alerts = storage.get_alerts(stream_name).await?;
     let schema = update_schema_from_staging(stream_name, schema);
     let schema = HashMap::from_iter(
         schema
@@ -456,7 +422,6 @@ pub async fn load_stream_metadata_on_server_start(
     let metadata = LogStreamMetadata {
         schema_version,
         schema,
-        alerts,
         retention,
         created_at,
         first_event_at,
diff --git a/src/query/mod.rs b/src/query/mod.rs
index b86573d27..87b51e291 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -151,6 +151,16 @@ impl Query {
         Ok((results, fields))
     }
 
+    pub async fn get_dataframe(&self, stream_name: String) -> Result<DataFrame, ExecuteError> {
+        let time_partition = STREAM_INFO.get_time_partition(&stream_name)?;
+
+        let df = QUERY_SESSION
+            .execute_logical_plan(self.final_logical_plan(&time_partition))
+            .await?;
+
+        Ok(df)
+    }
+
     /// return logical plan with all time filters applied through
     fn final_logical_plan(&self, time_partition: &Option<String>) -> LogicalPlan {
         // see https://github.com/apache/arrow-datafusion/pull/8400
@@ -196,7 +206,7 @@ impl Query {
 }
 
 #[derive(Debug, Default)]
-pub(crate) struct TableScanVisitor {
+pub struct TableScanVisitor {
     tables: Vec<String>,
 }
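`get_dataframe` above hands callers a lazy DataFusion `DataFrame` with the stream's time filters already applied, so filters and aggregates can be stacked before a single `collect`. A hedged sketch of that pattern (the `status` column and the final downcast are illustrative; an ungrouped `count` yields one `Int64` row):

```rust
use arrow_array::Int64Array;
use datafusion::error::DataFusionError;
use datafusion::functions_aggregate::count::count;
use datafusion::prelude::{col, lit, DataFrame};

// Hedged sketch: count rows where status == 500 on a DataFrame obtained
// from Query::get_dataframe. "status" is an illustrative column name.
async fn count_errors(df: DataFrame) -> Result<i64, DataFusionError> {
    let batches = df
        .filter(col("status").eq(lit(500)))?            // lazy predicate
        .aggregate(vec![], vec![count(col("status"))])? // single aggregate, no grouping
        .collect()                                      // executes the plan
        .await?;

    // One batch, one row, one Int64 column for an ungrouped COUNT
    let counts = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("COUNT returns Int64");
    Ok(counts.value(0))
}
```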
diff --git a/src/rbac/role.rs b/src/rbac/role.rs
index bc1deb58e..00208631c 100644
--- a/src/rbac/role.rs
+++ b/src/rbac/role.rs
@@ -36,6 +36,7 @@ pub enum Action {
     DeleteHotTierEnabled,
     PutAlert,
     GetAlert,
+    DeleteAlert,
     PutUser,
     ListUser,
     DeleteUser,
@@ -139,6 +140,9 @@ impl RoleBuilder {
             | Action::ListFilter
             | Action::CreateFilter
             | Action::DeleteFilter
+            | Action::PutAlert
+            | Action::GetAlert
+            | Action::DeleteAlert
             | Action::GetAnalytics => Permission::Unit(action),
             Action::Ingest
             | Action::ListStream
@@ -147,8 +151,6 @@ impl RoleBuilder {
             | Action::GetStats
             | Action::GetRetention
             | Action::PutRetention
-            | Action::PutAlert
-            | Action::GetAlert
             | Action::All => Permission::Stream(action, self.stream.clone().unwrap()),
         };
         perms.push(perm);
@@ -230,6 +232,7 @@ pub mod model {
             Action::DeleteHotTierEnabled,
             Action::PutAlert,
             Action::GetAlert,
+            Action::DeleteAlert,
             Action::QueryLLM,
             Action::CreateFilter,
             Action::ListFilter,
@@ -258,6 +261,7 @@ pub mod model {
             Action::PutRetention,
             Action::PutAlert,
             Action::GetAlert,
+            Action::DeleteAlert,
             Action::GetRetention,
             Action::PutHotTierEnabled,
             Action::GetHotTierEnabled,
@@ -308,6 +312,7 @@ pub mod model {
             Action::DeleteDashboard,
             Action::GetStreamInfo,
             Action::GetUserRoles,
+            Action::GetAlert,
         ],
         stream: None,
         tag: None,
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index 64ae1f9a2..b4bf1dcc4 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -39,7 +39,7 @@ use crate::{
 };
 
 use super::{
-    LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider,
+    LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, ALERTS_ROOT_DIRECTORY,
     CORRELATIONS_ROOT_DIRECTORY, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
     STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };
@@ -302,6 +302,7 @@ impl ObjectStorage for LocalFS {
             PARSEABLE_ROOT_DIRECTORY,
             USERS_ROOT_DIR,
             CORRELATIONS_ROOT_DIRECTORY,
+            ALERTS_ROOT_DIRECTORY,
         ];
         let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
         let entries: Vec<DirEntry> = directories.try_collect().await?;
@@ -326,6 +327,7 @@ impl ObjectStorage for LocalFS {
             "lost+found",
             PARSEABLE_ROOT_DIRECTORY,
             CORRELATIONS_ROOT_DIRECTORY,
+            ALERTS_ROOT_DIRECTORY,
         ];
         let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
         let entries: Vec<DirEntry> = directories.try_collect().await?;
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 573800812..9d0f8b35e 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -48,6 +48,7 @@ pub use store_metadata::{
 };
 
 // metadata file names in a Stream prefix
+pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts";
 pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
 pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
 pub const STREAM_ROOT_DIRECTORY: &str = ".stream";
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 7252cfaf0..6a6621db2 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -21,11 +21,12 @@ use super::{
     ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
 };
 use super::{
-    Owner, ALERT_FILE_NAME, CORRELATIONS_ROOT_DIRECTORY, MANIFEST_FILE,
+    Owner, ALERTS_ROOT_DIRECTORY, CORRELATIONS_ROOT_DIRECTORY, MANIFEST_FILE,
     PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
     STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };
 
+use crate::alerts::AlertConfig;
 use crate::correlation::{CorrelationConfig, CorrelationError};
 use crate::handlers::http::modal::ingest_server::INGESTOR_META;
 use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
@@ -33,7 +34,6 @@ use crate::metadata::SchemaVersion;
 use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
 use crate::option::Mode;
 use crate::{
-    alerts::Alerts,
     catalog::{self, manifest::Manifest, snapshot::Snapshot},
     metadata::STREAM_INFO,
     metrics::{storage::StorageMetrics, STORAGE_SIZE},
@@ -213,12 +214,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         Ok(())
     }
 
-    async fn put_alerts(
+    async fn put_alert(
         &self,
-        stream_name: &str,
-        alerts: &Alerts,
+        alert_id: &str,
+        alert: &AlertConfig,
     ) -> Result<(), ObjectStorageError> {
-        self.put_object(&alert_json_path(stream_name), to_bytes(alerts))
+        self.put_object(&alert_json_path(alert_id), to_bytes(alert))
             .await
     }
 
@@ -295,21 +296,16 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         Ok(serde_json::from_slice(&schema_map)?)
     }
 
-    async fn get_alerts(&self, stream_name: &str) -> Result<Alerts, ObjectStorageError> {
-        match self.get_object(&alert_json_path(stream_name)).await {
-            Ok(alerts) => {
-                if let Ok(alerts) = serde_json::from_slice(&alerts) {
-                    Ok(alerts)
-                } else {
-                    error!("Incompatible alerts found for stream - {stream_name}. Refer https://www.parseable.io/docs/alerts for correct alert config.");
-                    Ok(Alerts::default())
-                }
-            }
-            Err(e) => match e {
-                ObjectStorageError::NoSuchKey(_) => Ok(Alerts::default()),
-                e => Err(e),
-            },
-        }
+    async fn get_alerts(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
+        let alerts_path = RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY]);
+        let alerts_bytes = self
+            .get_objects(
+                Some(&alerts_path),
+                Box::new(|file_name| file_name.ends_with(".json")),
+            )
+            .await?;
+
+        Ok(alerts_bytes)
     }
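`get_alerts` now returns raw `Vec<Bytes>` rather than typed configs, leaving deserialization to the caller (presumably `ALERTS.load()`). A hedged sketch of what that caller-side loop might look like, assuming parse failures should be skipped rather than abort the load:

```rust
use bytes::Bytes;
use tracing::error;

use crate::alerts::AlertConfig;

// Hedged sketch: turn the raw bytes returned by ObjectStorage::get_alerts()
// into AlertConfig values, skipping any file that fails to parse.
fn parse_alert_configs(raw: Vec<Bytes>) -> Vec<AlertConfig> {
    raw.iter()
        .filter_map(|bytes| match serde_json::from_slice::<AlertConfig>(bytes) {
            Ok(alert) => Some(alert),
            Err(err) => {
                error!("Skipping unparseable alert config- {err}");
                None
            }
        })
        .collect()
}
```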
 
     async fn upsert_stream_metadata(
@@ -726,8 +722,8 @@ pub fn parseable_json_path() -> RelativePathBuf {
 
 /// TODO: Needs to be updated for distributed mode
 #[inline(always)]
-fn alert_json_path(stream_name: &str) -> RelativePathBuf {
-    RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, ALERT_FILE_NAME])
+pub fn alert_json_path(alert_id: &str) -> RelativePathBuf {
+    RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY, &format!("{alert_id}.json")])
 }
 
 #[inline(always)]
diff --git a/src/sync.rs b/src/sync.rs
index 2a06d88aa..d843e3a3a 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -23,6 +23,7 @@ use tokio::task;
 use tokio::time::{interval, sleep, Duration};
 use tracing::{error, info, warn};
 
+use crate::alerts::{alerts_utils, AlertConfig, AlertError};
 use crate::option::CONFIG;
 use crate::{storage, STORAGE_UPLOAD_INTERVAL};
 
@@ -134,3 +135,64 @@ pub async fn run_local_sync() -> (
     (handle, outbox_rx, inbox_tx)
 }
 
+pub async fn schedule_alert_task(
+    eval_frequency: u32,
+    alert: AlertConfig,
+) -> Result<
+    (
+        task::JoinHandle<()>,
+        oneshot::Receiver<()>,
+        oneshot::Sender<()>,
+    ),
+    AlertError,
+> {
+    let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
+    let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
+
+    let handle = tokio::task::spawn(async move {
+        info!("new alert task started for {alert:?}");
+
+        let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
+            let mut scheduler = AsyncScheduler::new();
+            scheduler.every(eval_frequency.minutes()).run(move || {
+                let alert_val = alert.clone();
+                async move {
+                    match alerts_utils::evaluate_alert(&alert_val).await {
+                        Ok(_) => {}
+                        Err(err) => error!("Error while evaluating alert- {err}"),
+                    }
+                }
+            });
+            let mut inbox_rx = AssertUnwindSafe(inbox_rx);
+            let mut check_interval = interval(Duration::from_secs(1));
+
+            loop {
+                // Run any pending scheduled tasks
+                check_interval.tick().await;
+                scheduler.run_pending().await;
+
+                // Check inbox
+                match inbox_rx.try_recv() {
+                    Ok(_) => break,
+                    Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue,
+                    Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
+                        warn!("Inbox channel closed unexpectedly");
+                        break;
+                    }
+                }
+            }
+        }));
+
+        match result {
+            Ok(future) => {
+                future.await;
+            }
+            Err(panic_error) => {
+                error!("Panic in scheduled alert task: {:?}", panic_error);
+                let _ = outbox_tx.send(());
+            }
+        }
+    });
+    Ok((handle, outbox_rx, inbox_tx))
+}
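`schedule_alert_task` returns the task's `JoinHandle` together with both ends of a shutdown handshake: sending on the returned `Sender` (`inbox_tx`) asks the scheduler loop to break out, while the `Receiver` (`outbox_rx`) only fires on the panic path. A hedged sketch of the shutdown side, mirroring what the `ALERTS` task bookkeeping is assumed to do:

```rust
use crate::alerts::{AlertConfig, AlertError};
use crate::sync::schedule_alert_task;

// Hedged sketch: spawn an alert task, then shut it down cleanly.
async fn spawn_and_stop(alert: AlertConfig) -> Result<(), AlertError> {
    let (handle, _outbox_rx, inbox_tx) =
        schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;

    // ... later, e.g. when the alert is deleted or modified:
    let _ = inbox_tx.send(()); // request the scheduler loop to break
    let _ = handle.await;      // wait for the task to wind down
    Ok(())
}
```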
diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs
index 58809618c..f5a897764 100644
--- a/src/utils/json/flatten.rs
+++ b/src/utils/json/flatten.rs
@@ -178,24 +178,6 @@ pub fn validate_time_partition(
     }
 }
 
-// Flattens starting from only object types at TOP, e.g. with the parent_key `root` and separator `_`
-// `{ "a": { "b": 1, c: { "d": 2 } } }` becomes `{"root_a_b":1,"root_a_c_d":2}`
-pub fn flatten_with_parent_prefix(
-    nested_value: &mut Value,
-    prefix: &str,
-    separator: &str,
-) -> Result<(), JsonFlattenError> {
-    let Value::Object(nested_obj) = nested_value else {
-        return Err(JsonFlattenError::NonObjectInArray);
-    };
-
-    let mut map = Map::new();
-    flatten_object(&mut map, Some(prefix), nested_obj, separator)?;
-    *nested_obj = map;
-
-    Ok(())
-}
-
 // Flattens a nested JSON Object/Map into another target Map
 fn flatten_object(
     output_map: &mut Map<String, Value>,
diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs
index 0f5c05812..efa9cb2e2 100644
--- a/src/utils/json/mod.rs
+++ b/src/utils/json/mod.rs
@@ -89,26 +89,6 @@ pub fn convert_array_to_object(
     Ok(value_arr)
 }
 
-pub fn convert_to_string(value: &Value) -> Value {
-    match value {
-        Value::Null => Value::String("null".to_owned()),
-        Value::Bool(b) => Value::String(b.to_string()),
-        Value::Number(n) => Value::String(n.to_string()),
-        Value::String(s) => Value::String(s.to_owned()),
-        Value::Array(v) => {
-            let new_vec = v.iter().map(convert_to_string).collect();
-            Value::Array(new_vec)
-        }
-        Value::Object(map) => {
-            let new_map = map
-                .iter()
-                .map(|(k, v)| (k.clone(), convert_to_string(v)))
-                .collect();
-            Value::Object(new_map)
-        }
-    }
-}
-
 struct TrueFromStr;
 
 impl Visitor<'_> for TrueFromStr {
diff --git a/src/validator.rs b/src/validator.rs
index bfa1dae02..9584ae504 100644
--- a/src/validator.rs
+++ b/src/validator.rs
@@ -18,10 +18,7 @@
 
 use error::HotTierValidationError;
 
-use self::error::{AlertValidationError, StreamNameValidationError, UsernameValidationError};
-use crate::alerts::rule::base::{NumericRule, StringRule};
-use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule};
-use crate::alerts::{Alerts, Rule};
+use self::error::{StreamNameValidationError, UsernameValidationError};
 use crate::hottier::MIN_STREAM_HOT_TIER_SIZE_BYTES;
 use crate::option::validation::bytes_to_human_size;
 use crate::storage::StreamType;
@@ -33,50 +30,6 @@ const DENIED_NAMES: &[&str] = &[
 
 const ALLOWED_SPECIAL_CHARS: &[char] = &['-', '_'];
 
-pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> {
-    let alert_name: Vec<&str> = alerts.alerts.iter().map(|a| a.name.as_str()).collect();
-    let mut alert_name_dedup = alert_name.clone();
-    alert_name_dedup.sort();
-    alert_name_dedup.dedup();
-
-    if alert_name.len() != alert_name_dedup.len() {
-        return Err(AlertValidationError::ExistingName);
-    }
-    for alert in &alerts.alerts {
-        if alert.name.is_empty() {
-            return Err(AlertValidationError::EmptyName);
-        }
-
-        if alert.message.message.is_empty() {
-            return Err(AlertValidationError::EmptyMessage);
-        }
-        if alert.targets.is_empty() {
-            return Err(AlertValidationError::NoTarget);
-        }
-
-        if let Rule::Column(ref column_rule) = alert.rule {
-            match column_rule {
-                ColumnRule::ConsecutiveNumeric(ConsecutiveNumericRule {
-                    base_rule: NumericRule { ref column, .. },
-                    ref state,
-                })
-                | ColumnRule::ConsecutiveString(ConsecutiveStringRule {
-                    base_rule: StringRule { ref column, .. },
-                    ref state,
-                }) => {
-                    if column.is_empty() {
-                        return Err(AlertValidationError::EmptyRuleField);
-                    }
-                    if state.repeats == 0 {
-                        return Err(AlertValidationError::InvalidRuleRepeat);
-                    }
-                }
-            }
-        }
-    }
-    Ok(())
-}
-
 pub fn stream_name(stream_name: &str, stream_type: &str) -> Result<(), StreamNameValidationError> {
     if stream_name.is_empty() {
         return Err(StreamNameValidationError::EmptyName);