diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index 3b46375d3..2e2e8a443 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -347,11 +347,7 @@ pub fn get_filter_string(where_clause: &Conditions) -> Result { LogicalOperator::And => { let mut exprs = vec![]; for condition in &where_clause.condition_config { - if condition - .value - .as_ref() - .is_some_and(|v| !v.trim().is_empty()) - { + if condition.value.as_ref().is_some_and(|v| !v.is_empty()) { // ad-hoc error check in case value is some and operator is either `is null` or `is not null` if condition.operator.eq(&WhereConfigOperator::IsNull) || condition.operator.eq(&WhereConfigOperator::IsNotNull) @@ -359,21 +355,70 @@ pub fn get_filter_string(where_clause: &Conditions) -> Result { return Err("value must be null when operator is either `is null` or `is not null`" .into()); } - let value = NumberOrString::from_string( - condition.value.as_ref().unwrap().to_owned(), - ); - match value { - NumberOrString::Number(val) => exprs.push(format!( - "\"{}\" {} {}", - condition.column, condition.operator, val - )), - NumberOrString::String(val) => exprs.push(format!( - "\"{}\" {} '{}'", - condition.column, - condition.operator, - val.replace('\'', "''") - )), - } + + let value = condition.value.as_ref().unwrap(); + + let operator_and_value = match condition.operator { + WhereConfigOperator::Contains => { + let escaped_value = value + .replace("'", "\\'") + .replace('%', "\\%") + .replace('_', "\\_"); + format!("LIKE '%{escaped_value}%' ESCAPE '\\'") + } + WhereConfigOperator::DoesNotContain => { + let escaped_value = value + .replace("'", "\\'") + .replace('%', "\\%") + .replace('_', "\\_"); + format!("NOT LIKE '%{escaped_value}%' ESCAPE '\\'") + } + WhereConfigOperator::ILike => { + let escaped_value = value + .replace("'", "\\'") + .replace('%', "\\%") + .replace('_', "\\_"); + format!("ILIKE '%{escaped_value}%' ESCAPE '\\'") + } + WhereConfigOperator::BeginsWith => { + let escaped_value = value + .replace("'", "\\'") + .replace('%', "\\%") + .replace('_', "\\_"); + format!("LIKE '{escaped_value}%' ESCAPE '\\'") + } + WhereConfigOperator::DoesNotBeginWith => { + let escaped_value = value + .replace("'", "\\'") + .replace('%', "\\%") + .replace('_', "\\_"); + format!("NOT LIKE '{escaped_value}%' ESCAPE '\\'") + } + WhereConfigOperator::EndsWith => { + let escaped_value = value + .replace("'", "\\'") + .replace('%', "\\%") + .replace('_', "\\_"); + format!("LIKE '%{escaped_value}' ESCAPE '\\'") + } + WhereConfigOperator::DoesNotEndWith => { + let escaped_value = value + .replace("'", "\\'") + .replace('%', "\\%") + .replace('_', "\\_"); + format!("NOT LIKE '%{escaped_value}' ESCAPE '\\'") + } + _ => { + let value = match NumberOrString::from_string(value.to_owned()) { + NumberOrString::Number(val) => format!("{val}"), + NumberOrString::String(val) => { + format!("'{}'", val) + } + }; + format!("{} {}", condition.operator, value) + } + }; + exprs.push(format!("\"{}\" {}", condition.column, operator_and_value)) } else { exprs.push(format!("\"{}\" {}", condition.column, condition.operator)) } @@ -393,7 +438,7 @@ fn match_alert_operator(expr: &ConditionConfig) -> Expr { // the form accepts value as a string // if it can be parsed as a number, then parse it // else keep it as a string - if expr.value.as_ref().is_some_and(|v| !v.trim().is_empty()) { + if expr.value.as_ref().is_some_and(|v| !v.is_empty()) { let value = expr.value.as_ref().unwrap(); let value = NumberOrString::from_string(value.clone()); diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 814d1bb4b..be1aa1c21 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -343,7 +343,7 @@ impl Conditions { LogicalOperator::And | LogicalOperator::Or => { let expr1 = &self.condition_config[0]; let expr2 = &self.condition_config[1]; - let expr1_msg = if expr1.value.as_ref().is_some_and(|v| !v.trim().is_empty()) { + let expr1_msg = if expr1.value.as_ref().is_some_and(|v| !v.is_empty()) { format!( "{} {} {}", expr1.column, @@ -354,7 +354,7 @@ impl Conditions { format!("{} {}", expr1.column, expr1.operator) }; - let expr2_msg = if expr2.value.as_ref().is_some_and(|v| !v.trim().is_empty()) { + let expr2_msg = if expr2.value.as_ref().is_some_and(|v| !v.is_empty()) { format!( "{} {} {}", expr2.column, @@ -671,18 +671,13 @@ impl AlertConfig { WhereConfigOperator::IsNull | WhereConfigOperator::IsNotNull ); - if needs_no_value - && condition - .value - .as_ref() - .is_some_and(|v| !v.trim().is_empty()) - { + if needs_no_value && condition.value.as_ref().is_some_and(|v| !v.is_empty()) { return Err(AlertError::CustomError( "value must be null when operator is either `is null` or `is not null`" .into(), )); } - if !needs_no_value && condition.value.as_ref().is_none_or(|v| v.trim().is_empty()) { + if !needs_no_value && condition.value.as_ref().is_none_or(|v| v.is_empty()) { return Err(AlertError::CustomError( "value must not be null when operator is neither `is null` nor `is not null`" .into(), diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 6b0e71cbd..69890736d 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -36,7 +36,6 @@ pub mod cluster; pub mod correlation; pub mod health_check; pub mod ingest; -pub mod resource_check; mod kinesis; pub mod llm; pub mod logstream; @@ -47,6 +46,7 @@ pub mod prism_home; pub mod prism_logstream; pub mod query; pub mod rbac; +pub mod resource_check; pub mod role; pub mod users; pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index c202a92c6..e0d1c85c9 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -19,9 +19,9 @@ use std::sync::Arc; use std::thread; +use actix_web::middleware::from_fn; use actix_web::web; use actix_web::Scope; -use actix_web::middleware::from_fn; use actix_web_prometheus::PrometheusMetrics; use async_trait::async_trait; use base64::Engine; @@ -68,10 +68,9 @@ impl ParseableServer for IngestServer { .service( // Base path "{url}/api/v1" web::scope(&base_path()) - .service( - Server::get_ingest_factory() - .wrap(from_fn(resource_check::check_resource_utilization_middleware)) - ) + .service(Server::get_ingest_factory().wrap(from_fn( + resource_check::check_resource_utilization_middleware, + ))) .service(Self::logstream_api()) .service(Server::get_about_factory()) .service(Self::analytics_factory()) @@ -81,10 +80,9 @@ impl ParseableServer for IngestServer { .service(Server::get_metrics_webscope()) .service(Server::get_readiness_factory()), ) - .service( - Server::get_ingest_otel_factory() - .wrap(from_fn(resource_check::check_resource_utilization_middleware)) - ); + .service(Server::get_ingest_otel_factory().wrap(from_fn( + resource_check::check_resource_utilization_middleware, + ))); } async fn load_metadata(&self) -> anyhow::Result> { @@ -231,7 +229,9 @@ impl IngestServer { .to(ingest::post_event) .authorize_for_stream(Action::Ingest), ) - .wrap(from_fn(resource_check::check_resource_utilization_middleware)), + .wrap(from_fn( + resource_check::check_resource_utilization_middleware, + )), ) .service( web::resource("/sync") diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 75b10ce7b..d40ab73df 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -54,10 +54,9 @@ impl ParseableServer for QueryServer { .service( web::scope(&base_path()) .service(Server::get_correlation_webscope()) - .service( - Server::get_query_factory() - .wrap(from_fn(resource_check::check_resource_utilization_middleware)) - ) + .service(Server::get_query_factory().wrap(from_fn( + resource_check::check_resource_utilization_middleware, + ))) .service(Server::get_liveness_factory()) .service(Server::get_readiness_factory()) .service(Server::get_about_factory()) @@ -70,10 +69,9 @@ impl ParseableServer for QueryServer { .service(Server::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) .service(Server::get_roles_webscope()) - .service( - Server::get_counts_webscope() - .wrap(from_fn(resource_check::check_resource_utilization_middleware)) - ) + .service(Server::get_counts_webscope().wrap(from_fn( + resource_check::check_resource_utilization_middleware, + ))) .service(Server::get_metrics_webscope()) .service(Server::get_alerts_webscope()) .service(Self::get_cluster_web_scope()), diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index ece5374de..0eff13882 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -25,8 +25,8 @@ use crate::handlers::http::alerts; use crate::handlers::http::base_path; use crate::handlers::http::health_check; use crate::handlers::http::prism_base_path; -use crate::handlers::http::resource_check; use crate::handlers::http::query; +use crate::handlers::http::resource_check; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; use crate::hottier::HotTierManager; @@ -36,9 +36,9 @@ use crate::storage; use crate::sync; use crate::sync::sync_start; +use actix_web::middleware::from_fn; use actix_web::web; use actix_web::web::resource; -use actix_web::middleware::from_fn; use actix_web::Resource; use actix_web::Scope; use actix_web_prometheus::PrometheusMetrics; @@ -73,14 +73,12 @@ impl ParseableServer for Server { .service( web::scope(&base_path()) .service(Self::get_correlation_webscope()) - .service( - Self::get_query_factory() - .wrap(from_fn(resource_check::check_resource_utilization_middleware)) - ) - .service( - Self::get_ingest_factory() - .wrap(from_fn(resource_check::check_resource_utilization_middleware)) - ) + .service(Self::get_query_factory().wrap(from_fn( + resource_check::check_resource_utilization_middleware, + ))) + .service(Self::get_ingest_factory().wrap(from_fn( + resource_check::check_resource_utilization_middleware, + ))) .service(Self::get_liveness_factory()) .service(Self::get_readiness_factory()) .service(Self::get_about_factory()) @@ -93,10 +91,9 @@ impl ParseableServer for Server { .service(Self::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) .service(Self::get_roles_webscope()) - .service( - Self::get_counts_webscope() - .wrap(from_fn(resource_check::check_resource_utilization_middleware)) - ) + .service(Self::get_counts_webscope().wrap(from_fn( + resource_check::check_resource_utilization_middleware, + ))) .service(Self::get_alerts_webscope()) .service(Self::get_metrics_webscope()), ) @@ -106,10 +103,9 @@ impl ParseableServer for Server { .service(Server::get_prism_logstream()) .service(Server::get_prism_datasets()), ) - .service( - Self::get_ingest_otel_factory() - .wrap(from_fn(resource_check::check_resource_utilization_middleware)) - ) + .service(Self::get_ingest_otel_factory().wrap(from_fn( + resource_check::check_resource_utilization_middleware, + ))) .service(Self::get_generated()); } @@ -367,14 +363,16 @@ impl Server { .route( web::put() .to(logstream::put_stream) - .authorize_for_stream(Action::CreateStream) + .authorize_for_stream(Action::CreateStream), ) // POST "/logstream/{logstream}" ==> Post logs to given log stream .route( web::post() .to(ingest::post_event) .authorize_for_stream(Action::Ingest) - .wrap(from_fn(resource_check::check_resource_utilization_middleware)), + .wrap(from_fn( + resource_check::check_resource_utilization_middleware, + )), ) // DELETE "/logstream/{logstream}" ==> Delete log stream .route( @@ -383,7 +381,7 @@ impl Server { .authorize_for_stream(Action::DeleteStream), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), - ) + ) .service( // GET "/logstream/{logstream}/info" ==> Get info for given log stream web::resource("/info").route( diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs index e1f08285f..f5cdb67c4 100644 --- a/src/handlers/http/resource_check.rs +++ b/src/handlers/http/resource_check.rs @@ -25,13 +25,17 @@ use actix_web::{ error::ErrorServiceUnavailable, middleware::Next, }; -use tokio::{select, time::{interval, Duration}}; -use tracing::{warn, trace, info}; +use tokio::{ + select, + time::{interval, Duration}, +}; +use tracing::{info, trace, warn}; -use crate::analytics::{SYS_INFO, refresh_sys_info}; +use crate::analytics::{refresh_sys_info, SYS_INFO}; use crate::parseable::PARSEABLE; -static RESOURCE_CHECK_ENABLED:LazyLock> = LazyLock::new(|| Arc::new(AtomicBool::new(false))); +static RESOURCE_CHECK_ENABLED: LazyLock> = + LazyLock::new(|| Arc::new(AtomicBool::new(false))); /// Spawn a background task to monitor system resources pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { @@ -39,17 +43,19 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { let resource_check_interval = PARSEABLE.options.resource_check_interval; let mut check_interval = interval(Duration::from_secs(resource_check_interval)); let mut shutdown_rx = shutdown_rx; - + let cpu_threshold = PARSEABLE.options.cpu_utilization_threshold; let memory_threshold = PARSEABLE.options.memory_utilization_threshold; - - info!("Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%", - cpu_threshold, memory_threshold); + + info!( + "Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%", + cpu_threshold, memory_threshold + ); loop { select! { _ = check_interval.tick() => { trace!("Checking system resource utilization..."); - + refresh_sys_info(); let (used_memory, total_memory, cpu_usage) = tokio::task::spawn_blocking(|| { let sys = SYS_INFO.lock().unwrap(); @@ -58,36 +64,36 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { let cpu_usage = sys.global_cpu_usage(); (used_memory, total_memory, cpu_usage) }).await.unwrap(); - + let mut resource_ok = true; - + // Calculate memory usage percentage let memory_usage = if total_memory > 0.0 { (used_memory / total_memory) * 100.0 } else { 0.0 }; - + // Log current resource usage every few checks for debugging - info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)", - cpu_usage, memory_usage, - used_memory / 1024.0 / 1024.0 / 1024.0, + info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)", + cpu_usage, memory_usage, + used_memory / 1024.0 / 1024.0 / 1024.0, total_memory / 1024.0 / 1024.0 / 1024.0); - + // Check memory utilization if memory_usage > memory_threshold { - warn!("High memory usage detected: {:.1}% (threshold: {:.1}%)", + warn!("High memory usage detected: {:.1}% (threshold: {:.1}%)", memory_usage, memory_threshold); resource_ok = false; } - + // Check CPU utilization if cpu_usage > cpu_threshold { - warn!("High CPU usage detected: {:.1}% (threshold: {:.1}%)", + warn!("High CPU usage detected: {:.1}% (threshold: {:.1}%)", cpu_usage, cpu_threshold); resource_ok = false; } - + let previous_state = RESOURCE_CHECK_ENABLED.load(std::sync::atomic::Ordering::SeqCst); RESOURCE_CHECK_ENABLED.store(resource_ok, std::sync::atomic::Ordering::SeqCst); @@ -115,12 +121,14 @@ pub async fn check_resource_utilization_middleware( req: ServiceRequest, next: Next, ) -> Result, Error> { - let resource_ok = RESOURCE_CHECK_ENABLED.load(std::sync::atomic::Ordering::SeqCst); if !resource_ok { let error_msg = "Server resources over-utilized"; - warn!("Rejecting request to {} due to resource constraints", req.path()); + warn!( + "Rejecting request to {} due to resource constraints", + req.path() + ); return Err(ErrorServiceUnavailable(error_msg)); }