From 037249c80b0c6778f4a69c4687bad61bbf36f0bd Mon Sep 17 00:00:00 2001 From: vkhinvasara-oorja Date: Tue, 17 Jun 2025 15:08:03 +0530 Subject: [PATCH 1/4] feat: add resource utilization checks to ingestion handlers --- src/handlers/http/ingest.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index c23eb6659..49f86f553 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -24,6 +24,7 @@ use arrow_array::RecordBatch; use bytes::Bytes; use chrono::Utc; use http::StatusCode; +use sysinfo::System; use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; @@ -45,6 +46,8 @@ use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; +const CPU_UTILIZATION_THRESHOLD: f32 = 90.0; +const MEMORY_UTILIZATION_THRESHOLD: f32 = 90.0; // Handler for POST /api/v1/ingest // ingests events by extracting stream name from header // creates if stream does not exist @@ -52,6 +55,8 @@ pub async fn ingest( req: HttpRequest, Json(json): Json, ) -> Result { + check_resource_thresholds()?; + let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -127,6 +132,7 @@ pub async fn ingest( } pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> { + check_resource_thresholds()?; let size: usize = body.len(); let json: StrictValue = serde_json::from_slice(&body)?; let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw(); @@ -158,6 +164,7 @@ pub async fn handle_otel_logs_ingestion( req: HttpRequest, Json(json): Json, ) -> Result { + check_resource_thresholds()?; let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -224,6 +231,7 @@ pub async fn handle_otel_metrics_ingestion( req: HttpRequest, Json(json): Json, ) -> Result { + check_resource_thresholds()?; let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -288,6 +296,7 @@ pub async fn handle_otel_traces_ingestion( req: HttpRequest, Json(json): Json, ) -> Result { + check_resource_thresholds()?; let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -354,6 +363,7 @@ pub async fn post_event( stream_name: Path, Json(json): Json, ) -> Result { + check_resource_thresholds()?; let stream_name = stream_name.into_inner(); let internal_stream_names = PARSEABLE.streams.list_internal_streams(); @@ -426,6 +436,7 @@ pub async fn push_logs_unchecked( batches: RecordBatch, stream_name: &str, ) -> Result { + check_resource_thresholds()?; let unchecked_event = event::Event { rb: batches, stream_name: stream_name.to_string(), @@ -442,6 +453,28 @@ pub async fn push_logs_unchecked( Ok(unchecked_event) } +fn check_resource_thresholds() -> Result<(), PostError> { + let mut sys = System::new_all(); + sys.refresh_cpu_usage(); + sys.refresh_memory(); + let used_memory = sys.used_memory() as f32; + let total_memory = sys.total_memory() as f32; + if total_memory == 0.0 { + return Err(PostError::ServiceUnavailable("Unable to determine memory usage".to_string())); + } + let memory_usage = (used_memory / total_memory) * 100.0; + if memory_usage > MEMORY_UTILIZATION_THRESHOLD { + return Err(PostError::ServiceUnavailable(format!("Memory is over-utilized: {:.1}%", memory_usage))); + } + + let cpu_usage = sys.global_cpu_usage(); + if cpu_usage > CPU_UTILIZATION_THRESHOLD { + return Err(PostError::ServiceUnavailable(format!("CPU is over-utilized: {}%", cpu_usage))); + } + + Ok(()) +} + #[derive(Debug, thiserror::Error)] pub enum PostError { #[error("{0}")] @@ -489,6 +522,8 @@ pub enum PostError { IncorrectLogFormat(String), #[error("Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. We recommend creating a new dataset beyond {2} for better query performance.")] FieldsCountLimitExceeded(String, usize, usize), + #[error("Service Unavailable: Server over-utilized")] + ServiceUnavailable(String), } impl actix_web::ResponseError for PostError { @@ -518,6 +553,7 @@ impl actix_web::ResponseError for PostError { PostError::KnownFormat(_) => StatusCode::BAD_REQUEST, PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST, PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST, + PostError::ServiceUnavailable(_) => StatusCode::SERVICE_UNAVAILABLE } } From a8d81262d9eeeb9333792eca2e45ff5e282ea6ec Mon Sep 17 00:00:00 2001 From: vkhinvasara-oorja Date: Tue, 17 Jun 2025 19:00:05 +0530 Subject: [PATCH 2/4] feat: add resource utilization middleware for request handling --- src/handlers/http/mod.rs | 1 + src/handlers/http/modal/mod.rs | 3 +- src/handlers/http/resource_check.rs | 76 +++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 src/handlers/http/resource_check.rs diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 874a2aed5..6b0e71cbd 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -36,6 +36,7 @@ pub mod cluster; pub mod correlation; pub mod health_check; pub mod ingest; +pub mod resource_check; mod kinesis; pub mod llm; pub mod logstream; diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 3504ef0fa..b186106b0 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -45,7 +45,7 @@ use crate::{ utils::get_node_id, }; -use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION}; +use super::{audit, cross_origin_config, health_check, resource_check, API_BASE_PATH, API_VERSION}; pub mod ingest; pub mod ingest_server; @@ -113,6 +113,7 @@ pub trait ParseableServer { .wrap(prometheus.clone()) .configure(|config| Self::configure_routes(config, oidc_client.clone())) .wrap(from_fn(health_check::check_shutdown_middleware)) + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) .wrap(from_fn(audit::audit_log_middleware)) .wrap(actix_web::middleware::Logger::default()) .wrap(actix_web::middleware::Compress::default()) diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs new file mode 100644 index 000000000..2fa3dae3d --- /dev/null +++ b/src/handlers/http/resource_check.rs @@ -0,0 +1,76 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{ + body::MessageBody, + dev::{ServiceRequest, ServiceResponse}, + error::Error, + error::ErrorServiceUnavailable, + middleware::Next, +}; +use sysinfo::System; +use tracing::warn; + +const CPU_UTILIZATION_THRESHOLD: f32 = 50.0; +const MEMORY_UTILIZATION_THRESHOLD: f32 = 50.0; + +/// Middleware to check system resource utilization before processing requests +/// Returns 503 Service Unavailable if CPU or memory usage exceeds thresholds +pub async fn check_resource_utilization_middleware( + req: ServiceRequest, + next: Next, +) -> Result, Error> { + + let mut sys = System::new_all(); + sys.refresh_cpu_usage(); + sys.refresh_memory(); + + let used_memory = sys.used_memory() as f32; + let total_memory = sys.total_memory() as f32; + + // Check memory utilization + if total_memory > 0.0 { + let memory_usage = (used_memory / total_memory) * 100.0; + if memory_usage > MEMORY_UTILIZATION_THRESHOLD { + let error_msg = format!("Memory is over-utilized: {:.1}%", memory_usage); + warn!( + "Rejecting request to {} due to high memory usage: {:.1}% (threshold: {:.1}%)", + req.path(), + memory_usage, + MEMORY_UTILIZATION_THRESHOLD + ); + return Err(ErrorServiceUnavailable(error_msg)); + } + } + + // Check CPU utilization + let cpu_usage = sys.global_cpu_usage(); + if cpu_usage > CPU_UTILIZATION_THRESHOLD { + let error_msg = format!("CPU is over-utilized: {:.1}%", cpu_usage); + warn!( + "Rejecting request to {} due to high CPU usage: {:.1}% (threshold: {:.1}%)", + req.path(), + cpu_usage, + CPU_UTILIZATION_THRESHOLD + ); + return Err(ErrorServiceUnavailable(error_msg)); + } + + // Continue processing the request if resource utilization is within limits + next.call(req).await +} From 1cee98f40fa22cb8c7988bfc1ff94a85b4f78e6a Mon Sep 17 00:00:00 2001 From: vkhinvasara-oorja Date: Tue, 17 Jun 2025 19:12:04 +0530 Subject: [PATCH 3/4] refactor: remove resource utilization checks from ingestion handlers --- src/handlers/http/ingest.rs | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 49f86f553..ac5e09e68 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -24,7 +24,6 @@ use arrow_array::RecordBatch; use bytes::Bytes; use chrono::Utc; use http::StatusCode; -use sysinfo::System; use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; @@ -45,9 +44,6 @@ use super::logstream::error::{CreateStreamError, StreamError}; use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header}; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; - -const CPU_UTILIZATION_THRESHOLD: f32 = 90.0; -const MEMORY_UTILIZATION_THRESHOLD: f32 = 90.0; // Handler for POST /api/v1/ingest // ingests events by extracting stream name from header // creates if stream does not exist @@ -55,8 +51,6 @@ pub async fn ingest( req: HttpRequest, Json(json): Json, ) -> Result { - check_resource_thresholds()?; - let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -132,7 +126,6 @@ pub async fn ingest( } pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> { - check_resource_thresholds()?; let size: usize = body.len(); let json: StrictValue = serde_json::from_slice(&body)?; let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw(); @@ -164,7 +157,6 @@ pub async fn handle_otel_logs_ingestion( req: HttpRequest, Json(json): Json, ) -> Result { - check_resource_thresholds()?; let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -231,7 +223,6 @@ pub async fn handle_otel_metrics_ingestion( req: HttpRequest, Json(json): Json, ) -> Result { - check_resource_thresholds()?; let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -296,7 +287,6 @@ pub async fn handle_otel_traces_ingestion( req: HttpRequest, Json(json): Json, ) -> Result { - check_resource_thresholds()?; let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -363,7 +353,6 @@ pub async fn post_event( stream_name: Path, Json(json): Json, ) -> Result { - check_resource_thresholds()?; let stream_name = stream_name.into_inner(); let internal_stream_names = PARSEABLE.streams.list_internal_streams(); @@ -436,7 +425,6 @@ pub async fn push_logs_unchecked( batches: RecordBatch, stream_name: &str, ) -> Result { - check_resource_thresholds()?; let unchecked_event = event::Event { rb: batches, stream_name: stream_name.to_string(), @@ -453,28 +441,6 @@ pub async fn push_logs_unchecked( Ok(unchecked_event) } -fn check_resource_thresholds() -> Result<(), PostError> { - let mut sys = System::new_all(); - sys.refresh_cpu_usage(); - sys.refresh_memory(); - let used_memory = sys.used_memory() as f32; - let total_memory = sys.total_memory() as f32; - if total_memory == 0.0 { - return Err(PostError::ServiceUnavailable("Unable to determine memory usage".to_string())); - } - let memory_usage = (used_memory / total_memory) * 100.0; - if memory_usage > MEMORY_UTILIZATION_THRESHOLD { - return Err(PostError::ServiceUnavailable(format!("Memory is over-utilized: {:.1}%", memory_usage))); - } - - let cpu_usage = sys.global_cpu_usage(); - if cpu_usage > CPU_UTILIZATION_THRESHOLD { - return Err(PostError::ServiceUnavailable(format!("CPU is over-utilized: {}%", cpu_usage))); - } - - Ok(()) -} - #[derive(Debug, thiserror::Error)] pub enum PostError { #[error("{0}")] From f623cc8bcec4749358f1a8c4fa8664c592651e40 Mon Sep 17 00:00:00 2001 From: vkhinvasara-oorja Date: Tue, 17 Jun 2025 19:14:39 +0530 Subject: [PATCH 4/4] refactor: remove ServiceUnavailable error from PostError enum --- src/handlers/http/ingest.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index ac5e09e68..c23eb6659 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -44,6 +44,7 @@ use super::logstream::error::{CreateStreamError, StreamError}; use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header}; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; + // Handler for POST /api/v1/ingest // ingests events by extracting stream name from header // creates if stream does not exist @@ -488,8 +489,6 @@ pub enum PostError { IncorrectLogFormat(String), #[error("Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. We recommend creating a new dataset beyond {2} for better query performance.")] FieldsCountLimitExceeded(String, usize, usize), - #[error("Service Unavailable: Server over-utilized")] - ServiceUnavailable(String), } impl actix_web::ResponseError for PostError { @@ -519,7 +518,6 @@ impl actix_web::ResponseError for PostError { PostError::KnownFormat(_) => StatusCode::BAD_REQUEST, PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST, PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST, - PostError::ServiceUnavailable(_) => StatusCode::SERVICE_UNAVAILABLE } }